From a5e9b14be0e4968358f1153d673c7c9a53ad6c70 Mon Sep 17 00:00:00 2001 From: kaisun2000 <52840222+kaisun2000@users.noreply.github.com> Date: Mon, 1 Jan 2024 21:38:28 -0800 Subject: [PATCH] Add delay before the peon drops the segments after publishing them (#15373) Currently in the realtime ingestion (Kafka/Kinesis) case, after publishing the segments, upon acknowledgement from the coordinator that the segments are already placed in some historicals, the peon would unannounce the segments (basically saying the segments are not in this peon anymore to the whole cluster) and drop the segments from the cache and sink timeline in one shot. In-transit queries from brokers that still think the segments are in the peon can get a NullPointerException when the peon is unsetting the hydrants in the sinks. The fix lets the peon wait for a configurable delay period before dropping the segments and removing them from the cache after the peon unannounces the segments. This delayed approach is similar to how the historicals handle segments moving out. 
--- .../druid/indexing/common/TaskToolbox.java | 18 +++ .../indexing/common/TaskToolboxFactory.java | 5 + .../AppenderatorDriverRealtimeIndexTask.java | 1 + .../SeekableStreamIndexTask.java | 1 + .../indexing/common/TaskToolboxTest.java | 9 ++ ...penderatorDriverRealtimeIndexTaskTest.java | 1 + .../common/task/RealtimeIndexTaskTest.java | 1 + .../common/task/TestAppenderatorsManager.java | 3 + .../SingleTaskBackgroundRunnerTest.java | 1 + .../indexing/overlord/TaskLifecycleTest.java | 1 + .../overlord/TestTaskToolboxFactory.java | 1 + .../SeekableStreamIndexTaskTestBase.java | 1 + .../worker/WorkerTaskManagerTest.java | 1 + .../worker/WorkerTaskMonitorTest.java | 1 + .../realtime/appenderator/Appenderators.java | 3 + .../appenderator/AppenderatorsManager.java | 2 + .../DefaultRealtimeAppenderatorFactory.java | 1 + ...DummyForInjectionAppenderatorsManager.java | 2 + .../PeonAppenderatorsManager.java | 3 + .../appenderator/StreamAppenderator.java | 80 +++++++++-- .../UnifiedIndexerAppenderatorsManager.java | 3 + .../appenderator/StreamAppenderatorTest.java | 98 +++++++++++++ .../StreamAppenderatorTester.java | 134 +++++++++++++----- 23 files changed, 319 insertions(+), 52 deletions(-) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskToolbox.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskToolbox.java index 9ebfb96b567..3e1ac720bb9 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskToolbox.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskToolbox.java @@ -55,6 +55,7 @@ import org.apache.druid.segment.loading.DataSegmentKiller; import org.apache.druid.segment.loading.DataSegmentMover; import org.apache.druid.segment.loading.DataSegmentPusher; import org.apache.druid.segment.loading.SegmentCacheManager; +import org.apache.druid.segment.loading.SegmentLoaderConfig; import org.apache.druid.segment.realtime.appenderator.AppenderatorsManager; import 
org.apache.druid.segment.realtime.appenderator.UnifiedIndexerAppenderatorsManager; import org.apache.druid.segment.realtime.firehose.ChatHandlerProvider; @@ -78,6 +79,7 @@ import java.util.Collection; */ public class TaskToolbox { + private final SegmentLoaderConfig segmentLoaderConfig; private final TaskConfig config; private final DruidNode taskExecutorNode; private final TaskActionClient taskActionClient; @@ -130,6 +132,7 @@ public class TaskToolbox private final String attemptId; public TaskToolbox( + SegmentLoaderConfig segmentLoaderConfig, TaskConfig config, DruidNode taskExecutorNode, TaskActionClient taskActionClient, @@ -171,6 +174,7 @@ public class TaskToolbox String attemptId ) { + this.segmentLoaderConfig = segmentLoaderConfig; this.config = config; this.taskExecutorNode = taskExecutorNode; this.taskActionClient = taskActionClient; @@ -213,6 +217,11 @@ public class TaskToolbox this.attemptId = attemptId; } + public SegmentLoaderConfig getSegmentLoaderConfig() + { + return segmentLoaderConfig; + } + public TaskConfig getConfig() { return config; @@ -504,6 +513,7 @@ public class TaskToolbox public static class Builder { + private SegmentLoaderConfig segmentLoaderConfig; private TaskConfig config; private DruidNode taskExecutorNode; private TaskActionClient taskActionClient; @@ -550,6 +560,7 @@ public class TaskToolbox public Builder(TaskToolbox other) { + this.segmentLoaderConfig = other.segmentLoaderConfig; this.config = other.config; this.taskExecutorNode = other.taskExecutorNode; this.taskActionClient = other.taskActionClient; @@ -589,6 +600,12 @@ public class TaskToolbox this.shuffleClient = other.shuffleClient; } + public Builder config(final SegmentLoaderConfig segmentLoaderConfig) + { + this.segmentLoaderConfig = segmentLoaderConfig; + return this; + } + public Builder config(final TaskConfig config) { this.config = config; @@ -826,6 +843,7 @@ public class TaskToolbox public TaskToolbox build() { return new TaskToolbox( + segmentLoaderConfig, 
config, taskExecutorNode, taskActionClient, diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskToolboxFactory.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskToolboxFactory.java index 288d89919b9..f2df3ddc3a3 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskToolboxFactory.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskToolboxFactory.java @@ -56,6 +56,7 @@ import org.apache.druid.segment.loading.DataSegmentArchiver; import org.apache.druid.segment.loading.DataSegmentKiller; import org.apache.druid.segment.loading.DataSegmentMover; import org.apache.druid.segment.loading.DataSegmentPusher; +import org.apache.druid.segment.loading.SegmentLoaderConfig; import org.apache.druid.segment.realtime.appenderator.AppenderatorsManager; import org.apache.druid.segment.realtime.firehose.ChatHandlerProvider; import org.apache.druid.server.DruidNode; @@ -72,6 +73,7 @@ import java.util.function.Function; */ public class TaskToolboxFactory { + private final SegmentLoaderConfig segmentLoaderConfig; private final TaskConfig config; private final DruidNode taskExecutorNode; private final TaskActionClientFactory taskActionClientFactory; @@ -115,6 +117,7 @@ public class TaskToolboxFactory @Inject public TaskToolboxFactory( + SegmentLoaderConfig segmentLoadConfig, TaskConfig config, @Parent DruidNode taskExecutorNode, TaskActionClientFactory taskActionClientFactory, @@ -155,6 +158,7 @@ public class TaskToolboxFactory @AttemptId String attemptId ) { + this.segmentLoaderConfig = segmentLoadConfig; this.config = config; this.taskExecutorNode = taskExecutorNode; this.taskActionClientFactory = taskActionClientFactory; @@ -210,6 +214,7 @@ public class TaskToolboxFactory final File taskWorkDir = config.getTaskWorkDir(task.getId()); return new TaskToolbox.Builder() .config(config) + .config(segmentLoaderConfig) .taskExecutorNode(taskExecutorNode) 
.taskActionClient(taskActionClientFactory.create(task)) .emitter(emitter) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java index 3a599dd485b..9e8817fc5cc 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java @@ -775,6 +775,7 @@ public class AppenderatorDriverRealtimeIndexTask extends AbstractTask implements ) { return toolbox.getAppenderatorsManager().createRealtimeAppenderatorForTask( + null, getId(), dataSchema, tuningConfig.withBasePersistDirectory(toolbox.getPersistDir()), diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTask.java index d74ee5c0be2..c881b3814e3 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTask.java @@ -187,6 +187,7 @@ public abstract class SeekableStreamIndexTask { + droppingSinks.remove(identifier); + sinkTimeline.remove( + sink.getInterval(), + sink.getVersion(), + identifier.getShardSpec().createChunk(sink) + ); + for (FireHydrant hydrant : sink) { + if (cache != null) { + cache.close(SinkQuerySegmentWalker.makeHydrantCacheIdentifier(hydrant)); + } + hydrant.swapSegment(null); } - hydrant.swapSegment(null); - } - if (removeOnDiskData) { - removeDirectory(computePersistDir(identifier)); - } + if (removeOnDiskData) { + removeDirectory(computePersistDir(identifier)); + } - log.info("Dropped segment[%s].", identifier); + log.info("Dropped segment[%s].", 
identifier); + }; + + if (segmentLoaderConfig == null) { + log.info( + "Unannounced segment[%s]", + identifier + ); + removeRunnable.run(); + } else { + log.info( + "Unannounced segment[%s], scheduling drop in [%d] millisecs", + identifier, + segmentLoaderConfig.getDropSegmentDelayMillis() + ); + // Keep the segments in the cache and sinkTimeline for dropSegmentDelay after unannouncing the segments + // This way, in transit queries which still see the segments in this peon would be able to query the + // segments and not throw NullPtr exceptions. + exec.schedule( + removeRunnable, + segmentLoaderConfig.getDropSegmentDelayMillis(), + TimeUnit.MILLISECONDS + ); + } return null; } diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/UnifiedIndexerAppenderatorsManager.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/UnifiedIndexerAppenderatorsManager.java index 0a728eb890c..b9be326c822 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/UnifiedIndexerAppenderatorsManager.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/UnifiedIndexerAppenderatorsManager.java @@ -62,6 +62,7 @@ import org.apache.druid.segment.indexing.DataSchema; import org.apache.druid.segment.join.JoinableFactory; import org.apache.druid.segment.join.JoinableFactoryWrapper; import org.apache.druid.segment.loading.DataSegmentPusher; +import org.apache.druid.segment.loading.SegmentLoaderConfig; import org.apache.druid.segment.realtime.FireDepartmentMetrics; import org.apache.druid.segment.realtime.plumber.Sink; import org.apache.druid.segment.writeout.SegmentWriteOutMediumFactory; @@ -149,6 +150,7 @@ public class UnifiedIndexerAppenderatorsManager implements AppenderatorsManager @Override public Appenderator createRealtimeAppenderatorForTask( + SegmentLoaderConfig segmentLoaderConfig, String taskId, DataSchema schema, AppenderatorConfig config, @@ -177,6 +179,7 @@ public class 
UnifiedIndexerAppenderatorsManager implements AppenderatorsManager ); Appenderator appenderator = new StreamAppenderator( + null, taskId, schema, rewriteAppenderatorConfigMemoryLimits(config), diff --git a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorTest.java b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorTest.java index 2e05cb9053f..bf3458b0975 100644 --- a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorTest.java +++ b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorTest.java @@ -61,6 +61,9 @@ import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; public class StreamAppenderatorTest extends InitializedNullHandlingTest @@ -950,6 +953,101 @@ public class StreamAppenderatorTest extends InitializedNullHandlingTest } } + @Test + public void testDelayedDrop() throws Exception + { + class TestScheduledThreadPoolExecutor extends ScheduledThreadPoolExecutor + { + ScheduledFuture scheduledFuture; + + public TestScheduledThreadPoolExecutor() + { + super(1); + } + + @Override + public ScheduledFuture schedule( + Runnable command, + long delay, TimeUnit unit + ) + { + ScheduledFuture future = super.schedule(command, delay, unit); + scheduledFuture = future; + return future; + } + + ScheduledFuture getLastScheduledFuture() + { + return scheduledFuture; + } + } + + try ( + final StreamAppenderatorTester tester = + new StreamAppenderatorTester.Builder().maxRowsInMemory(2) + .basePersistDirectory(temporaryFolder.newFolder()) + .enablePushFailure(true) + .withSegmentDropDelayInMilli(1000) + .build()) { + final Appenderator appenderator = 
tester.getAppenderator(); + TestScheduledThreadPoolExecutor testExec = new TestScheduledThreadPoolExecutor(); + ((StreamAppenderator) appenderator).setExec(testExec); + + appenderator.startJob(); + appenderator.add(IDENTIFIERS.get(0), ir("2000", "foo", 1), Suppliers.ofInstance(Committers.nil())); + appenderator.add(IDENTIFIERS.get(0), ir("2000", "foo", 2), Suppliers.ofInstance(Committers.nil())); + appenderator.add(IDENTIFIERS.get(1), ir("2000", "foo", 4), Suppliers.ofInstance(Committers.nil())); + appenderator.add(IDENTIFIERS.get(2), ir("2001", "foo", 8), Suppliers.ofInstance(Committers.nil())); + appenderator.add(IDENTIFIERS.get(2), ir("2001T01", "foo", 16), Suppliers.ofInstance(Committers.nil())); + appenderator.add(IDENTIFIERS.get(2), ir("2001T02", "foo", 32), Suppliers.ofInstance(Committers.nil())); + appenderator.add(IDENTIFIERS.get(2), ir("2001T03", "foo", 64), Suppliers.ofInstance(Committers.nil())); + + // Query1: 2000/2001 + final TimeseriesQuery query1 = Druids.newTimeseriesQueryBuilder() + .dataSource(StreamAppenderatorTester.DATASOURCE) + .intervals(ImmutableList.of(Intervals.of("2000/2001"))) + .aggregators( + Arrays.asList( + new LongSumAggregatorFactory("count", "count"), + new LongSumAggregatorFactory("met", "met") + ) + ) + .granularity(Granularities.DAY) + .build(); + + appenderator.drop(IDENTIFIERS.get(0)).get(); + + // segment 0 won't be dropped immediately + final List> results1 = + QueryPlus.wrap(query1).run(appenderator, ResponseContext.createEmpty()).toList(); + Assert.assertEquals( + "query1", + ImmutableList.of( + new Result<>( + DateTimes.of("2000"), + new TimeseriesResultValue(ImmutableMap.of("count", 3L, "met", 7L)) + ) + ), + results1 + ); + + // segment 0 would eventually be dropped at some time after 1 secs drop delay + testExec.getLastScheduledFuture().get(5000, TimeUnit.MILLISECONDS); + + final List> results = QueryPlus.wrap(query1) + .run(appenderator, ResponseContext.createEmpty()) + .toList(); + List> expectedResults = + 
ImmutableList.of( + new Result<>( + DateTimes.of("2000"), + new TimeseriesResultValue(ImmutableMap.of("count", 1L, "met", 4L)) + ) + ); + Assert.assertEquals("query after dropped", expectedResults, results); + } + } + @Test public void testQueryByIntervals() throws Exception { diff --git a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorTester.java b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorTester.java index 217c90116c3..3663af38b01 100644 --- a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorTester.java +++ b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorTester.java @@ -63,6 +63,7 @@ import org.apache.druid.segment.indexing.RealtimeTuningConfig; import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec; import org.apache.druid.segment.join.NoopJoinableFactory; import org.apache.druid.segment.loading.DataSegmentPusher; +import org.apache.druid.segment.loading.SegmentLoaderConfig; import org.apache.druid.segment.realtime.FireDepartmentMetrics; import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory; import org.apache.druid.server.coordination.NoopDataSegmentAnnouncer; @@ -93,6 +94,7 @@ public class StreamAppenderatorTester implements AutoCloseable private final List pushedSegments = new CopyOnWriteArrayList<>(); public StreamAppenderatorTester( + final int delayInMilli, final int maxRowsInMemory, final long maxSizeInBytes, final File basePersistDirectory, @@ -209,43 +211,93 @@ public class StreamAppenderatorTester implements AutoCloseable throw new UnsupportedOperationException(); } }; - appenderator = Appenderators.createRealtime( - schema.getDataSource(), - schema, - tuningConfig, - metrics, - dataSegmentPusher, - objectMapper, - indexIO, - indexMerger, - new DefaultQueryRunnerFactoryConglomerate( - ImmutableMap.of( - TimeseriesQuery.class, new 
TimeseriesQueryRunnerFactory( - new TimeseriesQueryQueryToolChest(), - new TimeseriesQueryEngine(), - QueryRunnerTestHelper.NOOP_QUERYWATCHER - ), - ScanQuery.class, new ScanQueryRunnerFactory( - new ScanQueryQueryToolChest( - new ScanQueryConfig(), - new DefaultGenericQueryMetricsFactory() - ), - new ScanQueryEngine(), - new ScanQueryConfig() - ) - ) - ), - new NoopDataSegmentAnnouncer(), - emitter, - new ForwardingQueryProcessingPool(queryExecutor), - NoopJoinableFactory.INSTANCE, - MapCache.create(2048), - new CacheConfig(), - new CachePopulatorStats(), - rowIngestionMeters, - new ParseExceptionHandler(rowIngestionMeters, false, Integer.MAX_VALUE, 0), - true - ); + if (delayInMilli <= 0) { + appenderator = Appenderators.createRealtime( + null, + schema.getDataSource(), + schema, + tuningConfig, + metrics, + dataSegmentPusher, + objectMapper, + indexIO, + indexMerger, + new DefaultQueryRunnerFactoryConglomerate( + ImmutableMap.of( + TimeseriesQuery.class, new TimeseriesQueryRunnerFactory( + new TimeseriesQueryQueryToolChest(), + new TimeseriesQueryEngine(), + QueryRunnerTestHelper.NOOP_QUERYWATCHER + ), + ScanQuery.class, new ScanQueryRunnerFactory( + new ScanQueryQueryToolChest( + new ScanQueryConfig(), + new DefaultGenericQueryMetricsFactory() + ), + new ScanQueryEngine(), + new ScanQueryConfig() + ) + ) + ), + new NoopDataSegmentAnnouncer(), + emitter, + new ForwardingQueryProcessingPool(queryExecutor), + NoopJoinableFactory.INSTANCE, + MapCache.create(2048), + new CacheConfig(), + new CachePopulatorStats(), + rowIngestionMeters, + new ParseExceptionHandler(rowIngestionMeters, false, Integer.MAX_VALUE, 0), + true + ); + } else { + SegmentLoaderConfig segmentLoaderConfig = new SegmentLoaderConfig() + { + @Override + public int getDropSegmentDelayMillis() + { + return delayInMilli; + } + }; + appenderator = Appenderators.createRealtime( + segmentLoaderConfig, + schema.getDataSource(), + schema, + tuningConfig, + metrics, + dataSegmentPusher, + objectMapper, + 
indexIO, + indexMerger, + new DefaultQueryRunnerFactoryConglomerate( + ImmutableMap.of( + TimeseriesQuery.class, new TimeseriesQueryRunnerFactory( + new TimeseriesQueryQueryToolChest(), + new TimeseriesQueryEngine(), + QueryRunnerTestHelper.NOOP_QUERYWATCHER + ), + ScanQuery.class, new ScanQueryRunnerFactory( + new ScanQueryQueryToolChest( + new ScanQueryConfig(), + new DefaultGenericQueryMetricsFactory() + ), + new ScanQueryEngine(), + new ScanQueryConfig() + ) + ) + ), + new NoopDataSegmentAnnouncer(), + emitter, + new ForwardingQueryProcessingPool(queryExecutor), + NoopJoinableFactory.INSTANCE, + MapCache.create(2048), + new CacheConfig(), + new CachePopulatorStats(), + rowIngestionMeters, + new ParseExceptionHandler(rowIngestionMeters, false, Integer.MAX_VALUE, 0), + true + ); + } } private long getDefaultMaxBytesInMemory() @@ -305,6 +357,7 @@ public class StreamAppenderatorTester implements AutoCloseable private boolean enablePushFailure; private RowIngestionMeters rowIngestionMeters; private boolean skipBytesInMemoryOverheadCheck; + private int delayInMilli = 0; public Builder maxRowsInMemory(final int maxRowsInMemory) { @@ -342,9 +395,16 @@ public class StreamAppenderatorTester implements AutoCloseable return this; } + public Builder withSegmentDropDelayInMilli(int delayInMilli) + { + this.delayInMilli = delayInMilli; + return this; + } + public StreamAppenderatorTester build() { return new StreamAppenderatorTester( + delayInMilli, maxRowsInMemory, maxSizeInBytes, Preconditions.checkNotNull(basePersistDirectory, "basePersistDirectory"),