Add delay before the peon drops the segments after publishing them (#15373)

Currently in the realtime ingestion (Kafka/Kinesis) case, after publishing the segments and receiving acknowledgement from the coordinator that the segments have been loaded by historicals, the peon unannounces the segments (telling the rest of the cluster that the segments are no longer served by this peon) and drops them from its cache and sink timeline in one shot.

In-transit queries from brokers that still think the segments are on the peon can then hit a NullPointerException while the peon is unsetting the hydrants in the sinks.

The fix lets the peon wait for a configurable delay after unannouncing the segments before dropping them and removing them from the cache.

This delayed approach is similar to how historicals handle segments that are being moved off of them.
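The core of the change (in StreamAppenderator) is to hand the existing removal runnable to a single-threaded scheduled executor instead of running it inline. The following is a minimal sketch of that flow, with illustrative names (DelayedSegmentDropSketch, dropAfterUnannounce) rather than the exact production signatures:

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

class DelayedSegmentDropSketch
{
  // Single-threaded scheduler, mirroring the "StreamAppenderSegmentRemoval-%s" executor added in the patch.
  private final ScheduledExecutorService exec = Executors.newSingleThreadScheduledExecutor();

  void dropAfterUnannounce(String segmentId, Runnable removeRunnable, long dropDelayMillis)
  {
    if (dropDelayMillis <= 0) {
      // No delay configured: drop immediately, as the peon did before this change.
      removeRunnable.run();
    } else {
      // Keep the segment in the cache and sink timeline for the delay window so
      // in-transit broker queries can still be served instead of failing.
      exec.schedule(removeRunnable, dropDelayMillis, TimeUnit.MILLISECONDS);
    }
  }
}

In the actual patch the delay comes from SegmentLoaderConfig#getDropSegmentDelayMillis, and a null SegmentLoaderConfig (the non-streaming paths) falls back to the immediate drop, as shown in the StreamAppenderator diff below.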
kaisun2000 authored 2024-01-01 21:38:28 -08:00, committed by GitHub
parent cce539495d
commit a5e9b14be0
23 changed files with 319 additions and 52 deletions

View File

@ -55,6 +55,7 @@ import org.apache.druid.segment.loading.DataSegmentKiller;
import org.apache.druid.segment.loading.DataSegmentMover;
import org.apache.druid.segment.loading.DataSegmentPusher;
import org.apache.druid.segment.loading.SegmentCacheManager;
import org.apache.druid.segment.loading.SegmentLoaderConfig;
import org.apache.druid.segment.realtime.appenderator.AppenderatorsManager;
import org.apache.druid.segment.realtime.appenderator.UnifiedIndexerAppenderatorsManager;
import org.apache.druid.segment.realtime.firehose.ChatHandlerProvider;
@ -78,6 +79,7 @@ import java.util.Collection;
*/
public class TaskToolbox
{
private final SegmentLoaderConfig segmentLoaderConfig;
private final TaskConfig config;
private final DruidNode taskExecutorNode;
private final TaskActionClient taskActionClient;
@ -130,6 +132,7 @@ public class TaskToolbox
private final String attemptId;
public TaskToolbox(
SegmentLoaderConfig segmentLoaderConfig,
TaskConfig config,
DruidNode taskExecutorNode,
TaskActionClient taskActionClient,
@ -171,6 +174,7 @@ public class TaskToolbox
String attemptId
)
{
this.segmentLoaderConfig = segmentLoaderConfig;
this.config = config;
this.taskExecutorNode = taskExecutorNode;
this.taskActionClient = taskActionClient;
@ -213,6 +217,11 @@ public class TaskToolbox
this.attemptId = attemptId;
}
public SegmentLoaderConfig getSegmentLoaderConfig()
{
return segmentLoaderConfig;
}
public TaskConfig getConfig()
{
return config;
@ -504,6 +513,7 @@ public class TaskToolbox
public static class Builder
{
private SegmentLoaderConfig segmentLoaderConfig;
private TaskConfig config;
private DruidNode taskExecutorNode;
private TaskActionClient taskActionClient;
@ -550,6 +560,7 @@ public class TaskToolbox
public Builder(TaskToolbox other)
{
this.segmentLoaderConfig = other.segmentLoaderConfig;
this.config = other.config;
this.taskExecutorNode = other.taskExecutorNode;
this.taskActionClient = other.taskActionClient;
@ -589,6 +600,12 @@ public class TaskToolbox
this.shuffleClient = other.shuffleClient;
}
public Builder config(final SegmentLoaderConfig segmentLoaderConfig)
{
this.segmentLoaderConfig = segmentLoaderConfig;
return this;
}
public Builder config(final TaskConfig config)
{
this.config = config;
@ -826,6 +843,7 @@ public class TaskToolbox
public TaskToolbox build()
{
return new TaskToolbox(
segmentLoaderConfig,
config,
taskExecutorNode,
taskActionClient,

View File

@ -56,6 +56,7 @@ import org.apache.druid.segment.loading.DataSegmentArchiver;
import org.apache.druid.segment.loading.DataSegmentKiller;
import org.apache.druid.segment.loading.DataSegmentMover;
import org.apache.druid.segment.loading.DataSegmentPusher;
import org.apache.druid.segment.loading.SegmentLoaderConfig;
import org.apache.druid.segment.realtime.appenderator.AppenderatorsManager;
import org.apache.druid.segment.realtime.firehose.ChatHandlerProvider;
import org.apache.druid.server.DruidNode;
@ -72,6 +73,7 @@ import java.util.function.Function;
*/
public class TaskToolboxFactory
{
private final SegmentLoaderConfig segmentLoaderConfig;
private final TaskConfig config;
private final DruidNode taskExecutorNode;
private final TaskActionClientFactory taskActionClientFactory;
@ -115,6 +117,7 @@ public class TaskToolboxFactory
@Inject
public TaskToolboxFactory(
SegmentLoaderConfig segmentLoadConfig,
TaskConfig config,
@Parent DruidNode taskExecutorNode,
TaskActionClientFactory taskActionClientFactory,
@ -155,6 +158,7 @@ public class TaskToolboxFactory
@AttemptId String attemptId
)
{
this.segmentLoaderConfig = segmentLoadConfig;
this.config = config;
this.taskExecutorNode = taskExecutorNode;
this.taskActionClientFactory = taskActionClientFactory;
@ -210,6 +214,7 @@ public class TaskToolboxFactory
final File taskWorkDir = config.getTaskWorkDir(task.getId());
return new TaskToolbox.Builder()
.config(config)
.config(segmentLoaderConfig)
.taskExecutorNode(taskExecutorNode)
.taskActionClient(taskActionClientFactory.create(task))
.emitter(emitter)

View File

@ -775,6 +775,7 @@ public class AppenderatorDriverRealtimeIndexTask extends AbstractTask implements
)
{
return toolbox.getAppenderatorsManager().createRealtimeAppenderatorForTask(
null,
getId(),
dataSchema,
tuningConfig.withBasePersistDirectory(toolbox.getPersistDir()),

View File

@ -187,6 +187,7 @@ public abstract class SeekableStreamIndexTask<PartitionIdType, SequenceOffsetTyp
)
{
return toolbox.getAppenderatorsManager().createRealtimeAppenderatorForTask(
toolbox.getSegmentLoaderConfig(),
getId(),
dataSchema,
tuningConfig.withBasePersistDirectory(toolbox.getPersistDir()),

View File

@ -48,6 +48,7 @@ import org.apache.druid.segment.loading.DataSegmentArchiver;
import org.apache.druid.segment.loading.DataSegmentKiller;
import org.apache.druid.segment.loading.DataSegmentMover;
import org.apache.druid.segment.loading.DataSegmentPusher;
import org.apache.druid.segment.loading.SegmentLoaderConfig;
import org.apache.druid.segment.loading.SegmentLocalCacheManager;
import org.apache.druid.segment.realtime.appenderator.AppenderatorsManager;
import org.apache.druid.segment.realtime.appenderator.UnifiedIndexerAppenderatorsManager;
@ -94,6 +95,7 @@ public class TaskToolboxTest
private IndexIO mockIndexIO = EasyMock.createMock(IndexIO.class);
private Cache mockCache = EasyMock.createMock(Cache.class);
private CacheConfig mockCacheConfig = EasyMock.createMock(CacheConfig.class);
private SegmentLoaderConfig segmentLoaderConfig = EasyMock.createMock(SegmentLoaderConfig.class);
@Rule
public TemporaryFolder temporaryFolder = new TemporaryFolder();
@ -115,6 +117,7 @@ public class TaskToolboxTest
.build();
taskToolbox = new TaskToolboxFactory(
segmentLoaderConfig,
taskConfig,
new DruidNode("druid/middlemanager", "localhost", false, 8091, null, true, false),
mockTaskActionClientFactory,
@ -162,6 +165,12 @@ public class TaskToolboxTest
Assert.assertEquals(mockDataSegmentArchiver, taskToolbox.build(task).getDataSegmentArchiver());
}
@Test
public void testGetSegmentLoaderConfig()
{
Assert.assertEquals(segmentLoaderConfig, taskToolbox.build(task).getSegmentLoaderConfig());
}
@Test
public void testGetSegmentAnnouncer()
{

View File

@ -1606,6 +1606,7 @@ public class AppenderatorDriverRealtimeIndexTaskTest extends InitializedNullHand
};
final TestUtils testUtils = new TestUtils();
taskToolboxFactory = new TaskToolboxFactory(
null,
taskConfig,
new DruidNode("druid/middlemanager", "localhost", false, 8091, null, true, false),
taskActionClientFactory,

View File

@ -979,6 +979,7 @@ public class RealtimeIndexTaskTest extends InitializedNullHandlingTest
};
final TestUtils testUtils = new TestUtils();
final TaskToolboxFactory toolboxFactory = new TaskToolboxFactory(
null,
taskConfig,
null, // taskExecutorNode
taskActionClientFactory,

View File

@ -36,6 +36,7 @@ import org.apache.druid.segment.incremental.RowIngestionMeters;
import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.segment.join.JoinableFactory;
import org.apache.druid.segment.loading.DataSegmentPusher;
import org.apache.druid.segment.loading.SegmentLoaderConfig;
import org.apache.druid.segment.realtime.FireDepartmentMetrics;
import org.apache.druid.segment.realtime.appenderator.Appenderator;
import org.apache.druid.segment.realtime.appenderator.AppenderatorConfig;
@ -50,6 +51,7 @@ public class TestAppenderatorsManager implements AppenderatorsManager
@Override
public Appenderator createRealtimeAppenderatorForTask(
SegmentLoaderConfig segmentLoaderConfig,
String taskId,
DataSchema schema,
AppenderatorConfig config,
@ -72,6 +74,7 @@ public class TestAppenderatorsManager implements AppenderatorsManager
)
{
realtimeAppenderator = Appenderators.createRealtime(
segmentLoaderConfig,
taskId,
schema,
config,

View File

@ -97,6 +97,7 @@ public class SingleTaskBackgroundRunnerTest
final ServiceEmitter emitter = new NoopServiceEmitter();
EmittingLogger.registerEmitter(emitter);
final TaskToolboxFactory toolboxFactory = new TaskToolboxFactory(
null,
taskConfig,
null,
EasyMock.createMock(TaskActionClientFactory.class),

View File

@ -619,6 +619,7 @@ public class TaskLifecycleTest extends InitializedNullHandlingTest
.build();
return new TaskToolboxFactory(
null,
taskConfig,
new DruidNode("druid/middlemanager", "localhost", false, 8091, null, true, false),
tac,

View File

@ -77,6 +77,7 @@ public class TestTaskToolboxFactory extends TaskToolboxFactory
)
{
super(
null,
bob.config,
bob.taskExecutorNode,
bob.taskActionClientFactory,

View File

@ -662,6 +662,7 @@ public abstract class SeekableStreamIndexTaskTestBase extends EasyMockSupport
final DataSegmentPusher dataSegmentPusher = new LocalDataSegmentPusher(dataSegmentPusherConfig);
toolboxFactory = new TaskToolboxFactory(
null,
taskConfig,
null, // taskExecutorNode
taskActionClientFactory,

View File

@ -128,6 +128,7 @@ public class WorkerTaskManagerTest
jsonMapper,
new TestTaskRunner(
new TaskToolboxFactory(
null,
taskConfig,
null,
taskActionClientFactory,

View File

@ -170,6 +170,7 @@ public class WorkerTaskMonitorTest
jsonMapper,
new SingleTaskBackgroundRunner(
new TaskToolboxFactory(
null,
taskConfig,
null,
taskActionClientFactory,

View File

@ -35,6 +35,7 @@ import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.segment.join.JoinableFactory;
import org.apache.druid.segment.join.JoinableFactoryWrapper;
import org.apache.druid.segment.loading.DataSegmentPusher;
import org.apache.druid.segment.loading.SegmentLoaderConfig;
import org.apache.druid.segment.realtime.FireDepartmentMetrics;
import org.apache.druid.server.coordination.DataSegmentAnnouncer;
import org.apache.druid.server.coordination.NoopDataSegmentAnnouncer;
@ -43,6 +44,7 @@ import org.apache.druid.timeline.VersionedIntervalTimeline;
public class Appenderators
{
public static Appenderator createRealtime(
SegmentLoaderConfig segmentLoaderConfig,
String id,
DataSchema schema,
AppenderatorConfig config,
@ -65,6 +67,7 @@ public class Appenderators
)
{
return new StreamAppenderator(
segmentLoaderConfig,
id,
schema,
config,

View File

@ -36,6 +36,7 @@ import org.apache.druid.segment.incremental.RowIngestionMeters;
import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.segment.join.JoinableFactory;
import org.apache.druid.segment.loading.DataSegmentPusher;
import org.apache.druid.segment.loading.SegmentLoaderConfig;
import org.apache.druid.segment.realtime.FireDepartmentMetrics;
import org.apache.druid.server.coordination.DataSegmentAnnouncer;
import org.joda.time.Interval;
@ -64,6 +65,7 @@ public interface AppenderatorsManager
* used for query processing.
*/
Appenderator createRealtimeAppenderatorForTask(
SegmentLoaderConfig segmentLoaderConfig,
String taskId,
DataSchema schema,
AppenderatorConfig config,

View File

@ -96,6 +96,7 @@ public class DefaultRealtimeAppenderatorFactory implements AppenderatorFactory
{
final RowIngestionMeters rowIngestionMeters = new NoopRowIngestionMeters();
return Appenderators.createRealtime(
null,
schema.getDataSource(),
schema,
config.withBasePersistDirectory(

View File

@ -37,6 +37,7 @@ import org.apache.druid.segment.incremental.RowIngestionMeters;
import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.segment.join.JoinableFactory;
import org.apache.druid.segment.loading.DataSegmentPusher;
import org.apache.druid.segment.loading.SegmentLoaderConfig;
import org.apache.druid.segment.realtime.FireDepartmentMetrics;
import org.apache.druid.server.coordination.DataSegmentAnnouncer;
import org.joda.time.Interval;
@ -55,6 +56,7 @@ public class DummyForInjectionAppenderatorsManager implements AppenderatorsManag
@Override
public Appenderator createRealtimeAppenderatorForTask(
SegmentLoaderConfig segmentLoaderConfig,
String taskId,
DataSchema schema,
AppenderatorConfig config,

View File

@ -37,6 +37,7 @@ import org.apache.druid.segment.incremental.RowIngestionMeters;
import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.segment.join.JoinableFactory;
import org.apache.druid.segment.loading.DataSegmentPusher;
import org.apache.druid.segment.loading.SegmentLoaderConfig;
import org.apache.druid.segment.realtime.FireDepartmentMetrics;
import org.apache.druid.server.coordination.DataSegmentAnnouncer;
import org.joda.time.Interval;
@ -61,6 +62,7 @@ public class PeonAppenderatorsManager implements AppenderatorsManager
@Override
public Appenderator createRealtimeAppenderatorForTask(
SegmentLoaderConfig segmentLoaderConfig,
String taskId,
DataSchema schema,
AppenderatorConfig config,
@ -88,6 +90,7 @@ public class PeonAppenderatorsManager implements AppenderatorsManager
throw new ISE("A batch appenderator was already created for this peon's task.");
} else {
realtimeAppenderator = Appenderators.createRealtime(
segmentLoaderConfig,
taskId,
schema,
config,

View File

@ -67,6 +67,7 @@ import org.apache.druid.segment.incremental.ParseExceptionHandler;
import org.apache.druid.segment.incremental.RowIngestionMeters;
import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.segment.loading.DataSegmentPusher;
import org.apache.druid.segment.loading.SegmentLoaderConfig;
import org.apache.druid.segment.realtime.FireDepartmentMetrics;
import org.apache.druid.segment.realtime.FireHydrant;
import org.apache.druid.segment.realtime.plumber.Sink;
@ -95,6 +96,8 @@ import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
@ -170,6 +173,9 @@ public class StreamAppenderator implements Appenderator
private volatile Throwable persistError;
private final SegmentLoaderConfig segmentLoaderConfig;
private ScheduledExecutorService exec;
/**
* This constructor allows the caller to provide its own SinkQuerySegmentWalker.
*
@ -180,6 +186,7 @@ public class StreamAppenderator implements Appenderator
* Appenderators.
*/
StreamAppenderator(
SegmentLoaderConfig segmentLoaderConfig,
String id,
DataSchema schema,
AppenderatorConfig tuningConfig,
@ -196,6 +203,7 @@ public class StreamAppenderator implements Appenderator
boolean useMaxMemoryEstimates
)
{
this.segmentLoaderConfig = segmentLoaderConfig;
this.myId = id;
this.schema = Preconditions.checkNotNull(schema, "schema");
this.tuningConfig = Preconditions.checkNotNull(tuningConfig, "tuningConfig");
@ -221,6 +229,20 @@ public class StreamAppenderator implements Appenderator
maxBytesTuningConfig = tuningConfig.getMaxBytesInMemoryOrDefault();
skipBytesInMemoryOverheadCheck = tuningConfig.isSkipBytesInMemoryOverheadCheck();
this.useMaxMemoryEstimates = useMaxMemoryEstimates;
this.exec = Executors.newScheduledThreadPool(
1,
Execs.makeThreadFactory("StreamAppenderSegmentRemoval-%s")
);
}
@VisibleForTesting
void setExec(ScheduledExecutorService testExec)
{
if (exec != null) {
exec.shutdown();
}
exec = testExec;
}
@Override
@ -1170,6 +1192,10 @@ public class StreamAppenderator implements Appenderator
if (intermediateTempExecutor != null) {
intermediateTempExecutor.shutdownNow();
}
if (exec != null) {
exec.shutdownNow();
}
}
private void resetNextFlush()
@ -1400,6 +1426,7 @@ public class StreamAppenderator implements Appenderator
.emit();
}
Runnable removeRunnable = () -> {
droppingSinks.remove(identifier);
sinkTimeline.remove(
sink.getInterval(),
@ -1418,6 +1445,29 @@ public class StreamAppenderator implements Appenderator
}
log.info("Dropped segment[%s].", identifier);
};
if (segmentLoaderConfig == null) {
log.info(
"Unannounced segment[%s]",
identifier
);
removeRunnable.run();
} else {
log.info(
"Unannounced segment[%s], scheduling drop in [%d] millisecs",
identifier,
segmentLoaderConfig.getDropSegmentDelayMillis()
);
// Keep the segments in the cache and sinkTimeline for dropSegmentDelay after unannouncing the segments
// This way, in transit queries which still see the segments in this peon would be able to query the
// segments and not throw NullPtr exceptions.
exec.schedule(
removeRunnable,
segmentLoaderConfig.getDropSegmentDelayMillis(),
TimeUnit.MILLISECONDS
);
}
return null;
}

View File

@ -62,6 +62,7 @@ import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.segment.join.JoinableFactory;
import org.apache.druid.segment.join.JoinableFactoryWrapper;
import org.apache.druid.segment.loading.DataSegmentPusher;
import org.apache.druid.segment.loading.SegmentLoaderConfig;
import org.apache.druid.segment.realtime.FireDepartmentMetrics;
import org.apache.druid.segment.realtime.plumber.Sink;
import org.apache.druid.segment.writeout.SegmentWriteOutMediumFactory;
@ -149,6 +150,7 @@ public class UnifiedIndexerAppenderatorsManager implements AppenderatorsManager
@Override
public Appenderator createRealtimeAppenderatorForTask(
SegmentLoaderConfig segmentLoaderConfig,
String taskId,
DataSchema schema,
AppenderatorConfig config,
@ -177,6 +179,7 @@ public class UnifiedIndexerAppenderatorsManager implements AppenderatorsManager
);
Appenderator appenderator = new StreamAppenderator(
null,
taskId,
schema,
rewriteAppenderatorConfigMemoryLimits(config),

View File

@ -61,6 +61,9 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
public class StreamAppenderatorTest extends InitializedNullHandlingTest
@ -950,6 +953,101 @@ public class StreamAppenderatorTest extends InitializedNullHandlingTest
}
}
@Test
public void testDelayedDrop() throws Exception
{
class TestScheduledThreadPoolExecutor extends ScheduledThreadPoolExecutor
{
ScheduledFuture<?> scheduledFuture;
public TestScheduledThreadPoolExecutor()
{
super(1);
}
@Override
public ScheduledFuture<?> schedule(
Runnable command,
long delay, TimeUnit unit
)
{
ScheduledFuture<?> future = super.schedule(command, delay, unit);
scheduledFuture = future;
return future;
}
ScheduledFuture<?> getLastScheduledFuture()
{
return scheduledFuture;
}
}
try (
final StreamAppenderatorTester tester =
new StreamAppenderatorTester.Builder().maxRowsInMemory(2)
.basePersistDirectory(temporaryFolder.newFolder())
.enablePushFailure(true)
.withSegmentDropDelayInMilli(1000)
.build()) {
final Appenderator appenderator = tester.getAppenderator();
TestScheduledThreadPoolExecutor testExec = new TestScheduledThreadPoolExecutor();
((StreamAppenderator) appenderator).setExec(testExec);
appenderator.startJob();
appenderator.add(IDENTIFIERS.get(0), ir("2000", "foo", 1), Suppliers.ofInstance(Committers.nil()));
appenderator.add(IDENTIFIERS.get(0), ir("2000", "foo", 2), Suppliers.ofInstance(Committers.nil()));
appenderator.add(IDENTIFIERS.get(1), ir("2000", "foo", 4), Suppliers.ofInstance(Committers.nil()));
appenderator.add(IDENTIFIERS.get(2), ir("2001", "foo", 8), Suppliers.ofInstance(Committers.nil()));
appenderator.add(IDENTIFIERS.get(2), ir("2001T01", "foo", 16), Suppliers.ofInstance(Committers.nil()));
appenderator.add(IDENTIFIERS.get(2), ir("2001T02", "foo", 32), Suppliers.ofInstance(Committers.nil()));
appenderator.add(IDENTIFIERS.get(2), ir("2001T03", "foo", 64), Suppliers.ofInstance(Committers.nil()));
// Query1: 2000/2001
final TimeseriesQuery query1 = Druids.newTimeseriesQueryBuilder()
.dataSource(StreamAppenderatorTester.DATASOURCE)
.intervals(ImmutableList.of(Intervals.of("2000/2001")))
.aggregators(
Arrays.asList(
new LongSumAggregatorFactory("count", "count"),
new LongSumAggregatorFactory("met", "met")
)
)
.granularity(Granularities.DAY)
.build();
appenderator.drop(IDENTIFIERS.get(0)).get();
// segment 0 won't be dropped immediately
final List<Result<TimeseriesResultValue>> results1 =
QueryPlus.wrap(query1).run(appenderator, ResponseContext.createEmpty()).toList();
Assert.assertEquals(
"query1",
ImmutableList.of(
new Result<>(
DateTimes.of("2000"),
new TimeseriesResultValue(ImmutableMap.of("count", 3L, "met", 7L))
)
),
results1
);
// segment 0 would eventually be dropped at some time after 1 secs drop delay
testExec.getLastScheduledFuture().get(5000, TimeUnit.MILLISECONDS);
final List<Result<TimeseriesResultValue>> results = QueryPlus.wrap(query1)
.run(appenderator, ResponseContext.createEmpty())
.toList();
List<Result<TimeseriesResultValue>> expectedResults =
ImmutableList.of(
new Result<>(
DateTimes.of("2000"),
new TimeseriesResultValue(ImmutableMap.of("count", 1L, "met", 4L))
)
);
Assert.assertEquals("query after dropped", expectedResults, results);
}
}
@Test
public void testQueryByIntervals() throws Exception
{

View File

@ -63,6 +63,7 @@ import org.apache.druid.segment.indexing.RealtimeTuningConfig;
import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
import org.apache.druid.segment.join.NoopJoinableFactory;
import org.apache.druid.segment.loading.DataSegmentPusher;
import org.apache.druid.segment.loading.SegmentLoaderConfig;
import org.apache.druid.segment.realtime.FireDepartmentMetrics;
import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory;
import org.apache.druid.server.coordination.NoopDataSegmentAnnouncer;
@ -93,6 +94,7 @@ public class StreamAppenderatorTester implements AutoCloseable
private final List<DataSegment> pushedSegments = new CopyOnWriteArrayList<>();
public StreamAppenderatorTester(
final int delayInMilli,
final int maxRowsInMemory,
final long maxSizeInBytes,
final File basePersistDirectory,
@ -209,7 +211,9 @@ public class StreamAppenderatorTester implements AutoCloseable
throw new UnsupportedOperationException();
}
};
if (delayInMilli <= 0) {
appenderator = Appenderators.createRealtime(
null,
schema.getDataSource(),
schema,
tuningConfig,
@ -246,6 +250,54 @@ public class StreamAppenderatorTester implements AutoCloseable
new ParseExceptionHandler(rowIngestionMeters, false, Integer.MAX_VALUE, 0),
true
);
} else {
SegmentLoaderConfig segmentLoaderConfig = new SegmentLoaderConfig()
{
@Override
public int getDropSegmentDelayMillis()
{
return delayInMilli;
}
};
appenderator = Appenderators.createRealtime(
segmentLoaderConfig,
schema.getDataSource(),
schema,
tuningConfig,
metrics,
dataSegmentPusher,
objectMapper,
indexIO,
indexMerger,
new DefaultQueryRunnerFactoryConglomerate(
ImmutableMap.of(
TimeseriesQuery.class, new TimeseriesQueryRunnerFactory(
new TimeseriesQueryQueryToolChest(),
new TimeseriesQueryEngine(),
QueryRunnerTestHelper.NOOP_QUERYWATCHER
),
ScanQuery.class, new ScanQueryRunnerFactory(
new ScanQueryQueryToolChest(
new ScanQueryConfig(),
new DefaultGenericQueryMetricsFactory()
),
new ScanQueryEngine(),
new ScanQueryConfig()
)
)
),
new NoopDataSegmentAnnouncer(),
emitter,
new ForwardingQueryProcessingPool(queryExecutor),
NoopJoinableFactory.INSTANCE,
MapCache.create(2048),
new CacheConfig(),
new CachePopulatorStats(),
rowIngestionMeters,
new ParseExceptionHandler(rowIngestionMeters, false, Integer.MAX_VALUE, 0),
true
);
}
}
private long getDefaultMaxBytesInMemory()
@ -305,6 +357,7 @@ public class StreamAppenderatorTester implements AutoCloseable
private boolean enablePushFailure;
private RowIngestionMeters rowIngestionMeters;
private boolean skipBytesInMemoryOverheadCheck;
private int delayInMilli = 0;
public Builder maxRowsInMemory(final int maxRowsInMemory)
{
@ -342,9 +395,16 @@ public class StreamAppenderatorTester implements AutoCloseable
return this;
}
public Builder withSegmentDropDelayInMilli(int delayInMilli)
{
this.delayInMilli = delayInMilli;
return this;
}
public StreamAppenderatorTester build()
{
return new StreamAppenderatorTester(
delayInMilli,
maxRowsInMemory,
maxSizeInBytes,
Preconditions.checkNotNull(basePersistDirectory, "basePersistDirectory"),