From e6e068ce60a073700701dd2a7b2b59218d59a2c3 Mon Sep 17 00:00:00 2001 From: Clint Wylie Date: Fri, 7 Sep 2018 13:17:49 -0700 Subject: [PATCH] Add support for 'maxTotalRows' to incremental publishing kafka indexing task and appenderator based realtime task (#6129) * resolves #5898 by adding maxTotalRows to incremental publishing kafka index task and appenderator based realtime indexing task, as available in IndexTask * address review comments * changes due to review * merge fail --- .../extensions-core/kafka-ingestion.md | 11 +- ...ementalPublishingKafkaIndexTaskRunner.java | 9 +- .../indexing/kafka/KafkaTuningConfig.java | 19 ++ .../kafka/supervisor/KafkaSupervisorSpec.java | 1 + .../KafkaSupervisorTuningConfig.java | 3 + .../indexing/kafka/KafkaIndexTaskTest.java | 207 +++++++++++++---- .../indexing/kafka/KafkaTuningConfigTest.java | 7 + .../kafka/supervisor/KafkaSupervisorTest.java | 1 + .../RealtimeAppenderatorTuningConfig.java | 32 ++- .../AppenderatorDriverRealtimeIndexTask.java | 6 +- .../druid/indexing/common/task/IndexTask.java | 2 + ...penderatorDriverRealtimeIndexTaskTest.java | 218 +++++++++++++----- .../realtime/appenderator/Appenderator.java | 2 +- .../appenderator/AppenderatorConfig.java | 26 +++ .../AppenderatorDriverAddResult.java | 10 + .../appenderator/AppenderatorImpl.java | 78 +++---- .../druid/segment/realtime/plumber/Sink.java | 10 +- 17 files changed, 463 insertions(+), 179 deletions(-) diff --git a/docs/content/development/extensions-core/kafka-ingestion.md b/docs/content/development/extensions-core/kafka-ingestion.md index 4c51b0562a4..7fbacf4c587 100644 --- a/docs/content/development/extensions-core/kafka-ingestion.md +++ b/docs/content/development/extensions-core/kafka-ingestion.md @@ -117,7 +117,8 @@ The tuningConfig is optional and default parameters will be used if no tuningCon |`type`|String|The indexing task type, this should always be `kafka`.|yes| |`maxRowsInMemory`|Integer|The number of rows to aggregate before persisting. This number is the post-aggregation rows, so it is not equivalent to the number of input events, but the number of aggregated rows that those events result in. This is used to manage the required JVM heap size. Maximum heap memory usage for indexing scales with maxRowsInMemory * (2 + maxPendingPersists). Normally user does not need to set this, but depending on the nature of data, if rows are short in terms of bytes, user may not want to store a million rows in memory and this value should be set.|no (default == 1000000)| |`maxBytesInMemory`|Long|The number of bytes to aggregate in heap memory before persisting. This is based on a rough estimate of memory usage and not actual usage. Normally this is computed internally and user does not need to set it. The maximum heap memory usage for indexing is maxBytesInMemory * (2 + maxPendingPersists). |no (default == One-sixth of max JVM memory)| -|`maxRowsPerSegment`|Integer|The number of rows to aggregate into a segment; this number is post-aggregation rows. Handoff will happen either if `maxRowsPerSegment` is hit or every `intermediateHandoffPeriod`, whichever happens earlier.|no (default == 5000000)| +|`maxRowsPerSegment`|Integer|The number of rows to aggregate into a segment; this number is post-aggregation rows. Handoff will happen either if `maxRowsPerSegment` or `maxTotalRows` is hit or every `intermediateHandoffPeriod`, whichever happens earlier.|no (default == 5000000)| +|`maxTotalRows`|Long|The number of rows to aggregate across all segments; this number is post-aggregation rows. Handoff will happen either if `maxRowsPerSegment` or `maxTotalRows` is hit or every `intermediateHandoffPeriod`, whichever happens earlier.|no (default == unlimited)| |`intermediatePersistPeriod`|ISO8601 Period|The period that determines the rate at which intermediate persists occur.|no (default == PT10M)| |`maxPendingPersists`|Integer|Maximum number of persists that can be pending but not started. If this limit would be exceeded by a new intermediate persist, ingestion will block until the currently-running persist finishes. Maximum heap memory usage for indexing scales with maxRowsInMemory * (2 + maxPendingPersists).|no (default == 0, meaning one persist can be running concurrently with ingestion, and none can be queued up)| |`indexSpec`|Object|Tune how data is indexed, see 'IndexSpec' below for more details.|no| @@ -131,7 +132,7 @@ The tuningConfig is optional and default parameters will be used if no tuningCon |`shutdownTimeout`|ISO8601 Period|How long to wait for the supervisor to attempt a graceful shutdown of tasks before exiting.|no (default == PT80S)| |`offsetFetchPeriod`|ISO8601 Period|How often the supervisor queries Kafka and the indexing tasks to fetch current offsets and calculate lag.|no (default == PT30S, min == PT5S)| |`segmentWriteOutMediumFactory`|String|Segment write-out medium to use when creating segments. See [Additional Peon Configuration: SegmentWriteOutMediumFactory](../../configuration/index.html#segmentwriteoutmediumfactory) for explanation and available options.|no (not specified by default, the value from `druid.peon.defaultSegmentWriteOutMediumFactory` is used)| -|`intermediateHandoffPeriod`|ISO8601 Period|How often the tasks should hand off segments. Handoff will happen either if `maxRowsPerSegment` is hit or every `intermediateHandoffPeriod`, whichever happens earlier.|no (default == P2147483647D)| +|`intermediateHandoffPeriod`|ISO8601 Period|How often the tasks should hand off segments. Handoff will happen either if `maxRowsPerSegment` or `maxTotalRows` is hit or every `intermediateHandoffPeriod`, whichever happens earlier.|no (default == P2147483647D)| #### IndexSpec @@ -314,10 +315,10 @@ In this way, configuration changes can be applied without requiring any pause in ### On the Subject of Segments Each Kafka Indexing Task puts events consumed from Kafka partitions assigned to it in a single segment for each segment -granular interval until maxRowsPerSegment or intermediateHandoffPeriod limit is reached, at this point a new partition +granular interval until maxRowsPerSegment, maxTotalRows or intermediateHandoffPeriod limit is reached, at this point a new partition for this segment granularity is created for further events. Kafka Indexing Task also does incremental hand-offs which -means that all the segments created by a task will not be held up till the task duration is over. As soon as maxRowsPerSegment -or intermediateHandoffPeriod limit is hit, all the segments held by the task at that point in time will be handed-off +means that all the segments created by a task will not be held up till the task duration is over. As soon as maxRowsPerSegment, +maxTotalRows or intermediateHandoffPeriod limit is hit, all the segments held by the task at that point in time will be handed-off and new set of segments will be created for further events. This means that the task can run for longer durations of time without accumulating old segments locally on Middle Manager nodes and it is encouraged to do so. diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/IncrementalPublishingKafkaIndexTaskRunner.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/IncrementalPublishingKafkaIndexTaskRunner.java index 7e4ed13b3ee..96167571f72 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/IncrementalPublishingKafkaIndexTaskRunner.java +++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/IncrementalPublishingKafkaIndexTaskRunner.java @@ -431,9 +431,6 @@ public class IncrementalPublishingKafkaIndexTaskRunner implements KafkaIndexTask // if stop is requested or task's end offset is set by call to setEndOffsets method with finish set to true if (stopRequested.get() || sequences.get(sequences.size() - 1).isCheckpointed()) { status = Status.PUBLISHING; - } - - if (stopRequested.get()) { break; } @@ -530,10 +527,8 @@ public class IncrementalPublishingKafkaIndexTaskRunner implements KafkaIndexTask if (addResult.isOk()) { // If the number of rows in the segment exceeds the threshold after adding a row, // move the segment out from the active segments of BaseAppenderatorDriver to make a new segment. - if (addResult.getNumRowsInSegment() > tuningConfig.getMaxRowsPerSegment()) { - if (!sequenceToUse.isCheckpointed()) { - sequenceToCheckpoint = sequenceToUse; - } + if (addResult.isPushRequired(tuningConfig) && !sequenceToUse.isCheckpointed()) { + sequenceToCheckpoint = sequenceToUse; } isPersistRequired |= addResult.isPersistRequired(); } else { diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaTuningConfig.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaTuningConfig.java index f28413c4453..fd5e666166b 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaTuningConfig.java +++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaTuningConfig.java @@ -40,6 +40,8 @@ public class KafkaTuningConfig implements TuningConfig, AppenderatorConfig private final int maxRowsInMemory; private final long maxBytesInMemory; private final int maxRowsPerSegment; + @Nullable + private final Long maxTotalRows; private final Period intermediatePersistPeriod; private final File basePersistDirectory; @Deprecated @@ -61,6 +63,7 @@ public class KafkaTuningConfig implements TuningConfig, AppenderatorConfig @JsonProperty("maxRowsInMemory") @Nullable Integer maxRowsInMemory, @JsonProperty("maxBytesInMemory") @Nullable Long maxBytesInMemory, @JsonProperty("maxRowsPerSegment") @Nullable Integer maxRowsPerSegment, + @JsonProperty("maxTotalRows") @Nullable Long maxTotalRows, @JsonProperty("intermediatePersistPeriod") @Nullable Period intermediatePersistPeriod, @JsonProperty("basePersistDirectory") @Nullable File basePersistDirectory, @JsonProperty("maxPendingPersists") @Nullable Integer maxPendingPersists, @@ -85,6 +88,7 @@ public class KafkaTuningConfig implements TuningConfig, AppenderatorConfig // initializing this to 0, it will be lazily initialized to a value // @see server.src.main.java.org.apache.druid.segment.indexing.TuningConfigs#getMaxBytesInMemoryOrDefault(long) this.maxBytesInMemory = maxBytesInMemory == null ? 0 : maxBytesInMemory; + this.maxTotalRows = maxTotalRows; this.intermediatePersistPeriod = intermediatePersistPeriod == null ? defaults.getIntermediatePersistPeriod() : intermediatePersistPeriod; @@ -123,6 +127,7 @@ public class KafkaTuningConfig implements TuningConfig, AppenderatorConfig config.maxRowsInMemory, config.maxBytesInMemory, config.maxRowsPerSegment, + config.maxTotalRows, config.intermediatePersistPeriod, config.basePersistDirectory, config.maxPendingPersists, @@ -153,12 +158,22 @@ public class KafkaTuningConfig implements TuningConfig, AppenderatorConfig return maxBytesInMemory; } + @Override @JsonProperty public int getMaxRowsPerSegment() { return maxRowsPerSegment; } + + @JsonProperty + @Override + @Nullable + public Long getMaxTotalRows() + { + return maxTotalRows; + } + @Override @JsonProperty public Period getIntermediatePersistPeriod() @@ -255,6 +270,7 @@ public class KafkaTuningConfig implements TuningConfig, AppenderatorConfig maxRowsInMemory, maxBytesInMemory, maxRowsPerSegment, + maxTotalRows, intermediatePersistPeriod, dir, maxPendingPersists, @@ -284,6 +300,7 @@ public class KafkaTuningConfig implements TuningConfig, AppenderatorConfig return maxRowsInMemory == that.maxRowsInMemory && maxRowsPerSegment == that.maxRowsPerSegment && maxBytesInMemory == that.maxBytesInMemory && + Objects.equals(maxTotalRows, that.maxTotalRows) && maxPendingPersists == that.maxPendingPersists && reportParseExceptions == that.reportParseExceptions && handoffConditionTimeout == that.handoffConditionTimeout && @@ -305,6 +322,7 @@ public class KafkaTuningConfig implements TuningConfig, AppenderatorConfig maxRowsInMemory, maxRowsPerSegment, maxBytesInMemory, + maxTotalRows, intermediatePersistPeriod, basePersistDirectory, maxPendingPersists, @@ -326,6 +344,7 @@ public class KafkaTuningConfig implements TuningConfig, AppenderatorConfig return "KafkaTuningConfig{" + "maxRowsInMemory=" + maxRowsInMemory + ", maxRowsPerSegment=" + maxRowsPerSegment + + ", maxTotalRows=" + maxTotalRows + ", maxBytesInMemory=" + maxBytesInMemory + ", intermediatePersistPeriod=" + intermediatePersistPeriod + ", basePersistDirectory=" + basePersistDirectory + diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorSpec.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorSpec.java index 160ed4bad17..fc620ea61ac 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorSpec.java +++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorSpec.java @@ -97,6 +97,7 @@ public class KafkaSupervisorSpec implements SupervisorSpec null, null, null, + null, null ); this.ioConfig = Preconditions.checkNotNull(ioConfig, "ioConfig"); diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTuningConfig.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTuningConfig.java index 6ff590f809b..3416941df78 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTuningConfig.java +++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTuningConfig.java @@ -43,6 +43,7 @@ public class KafkaSupervisorTuningConfig extends KafkaTuningConfig @JsonProperty("maxRowsInMemory") Integer maxRowsInMemory, @JsonProperty("maxBytesInMemory") Long maxBytesInMemory, @JsonProperty("maxRowsPerSegment") Integer maxRowsPerSegment, + @JsonProperty("maxTotalRows") Long maxTotalRows, @JsonProperty("intermediatePersistPeriod") Period intermediatePersistPeriod, @JsonProperty("basePersistDirectory") File basePersistDirectory, @JsonProperty("maxPendingPersists") Integer maxPendingPersists, @@ -69,6 +70,7 @@ public class KafkaSupervisorTuningConfig extends KafkaTuningConfig maxRowsInMemory, maxBytesInMemory, maxRowsPerSegment, + maxTotalRows, intermediatePersistPeriod, basePersistDirectory, maxPendingPersists, @@ -134,6 +136,7 @@ public class KafkaSupervisorTuningConfig extends KafkaTuningConfig return "KafkaSupervisorTuningConfig{" + "maxRowsInMemory=" + getMaxRowsInMemory() + ", maxRowsPerSegment=" + getMaxRowsPerSegment() + + ", maxTotalRows=" + getMaxTotalRows() + ", maxBytesInMemory=" + TuningConfigs.getMaxBytesInMemoryOrDefault(getMaxBytesInMemory()) + ", intermediatePersistPeriod=" + getIntermediatePersistPeriod() + ", basePersistDirectory=" + getBasePersistDirectory() + diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java index a5d450fc656..611533bbcfb 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java @@ -104,7 +104,6 @@ import org.apache.druid.query.Query; import org.apache.druid.query.QueryRunner; import org.apache.druid.query.QueryRunnerFactoryConglomerate; import org.apache.druid.query.QueryToolChest; -import org.apache.druid.query.QueryWatcher; import org.apache.druid.query.Result; import org.apache.druid.query.SegmentDescriptor; import org.apache.druid.query.aggregation.AggregatorFactory; @@ -169,7 +168,6 @@ import java.util.Map; import java.util.Objects; import java.util.Set; import java.util.TreeMap; -import java.util.concurrent.Callable; import java.util.concurrent.Executor; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; @@ -200,6 +198,7 @@ public class KafkaIndexTaskTest private boolean resetOffsetAutomatically = false; private boolean doHandoff = true; private Integer maxRowsPerSegment = null; + private Long maxTotalRows = null; private Period intermediateHandoffPeriod = null; private TaskToolboxFactory toolboxFactory; @@ -214,6 +213,8 @@ public class KafkaIndexTaskTest private File reportsFile; private RowIngestionMetersFactory rowIngestionMetersFactory; + private int handoffCount = 0; + // This should be removed in versions greater that 0.12.x // isIncrementalHandoffSupported should always be set to true in those later versions @Parameterized.Parameters(name = "isIncrementalHandoffSupported = {0}") @@ -476,7 +477,7 @@ public class KafkaIndexTaskTest } final String baseSequenceName = "sequence0"; // as soon as any segment has more than one record, incremental publishing should happen - maxRowsPerSegment = 1; + maxRowsPerSegment = 2; // Insert data try (final KafkaProducer kafkaProducer = kafkaServer.newProducer()) { @@ -560,6 +561,125 @@ public class KafkaIndexTaskTest Assert.assertEquals(ImmutableList.of("f"), readSegmentColumn("dim1", desc7)); } + @Test(timeout = 60_000L) + public void testIncrementalHandOffMaxTotalRows() throws Exception + { + if (!isIncrementalHandoffSupported) { + return; + } + final String baseSequenceName = "sequence0"; + // incremental publish should happen every 3 records + maxRowsPerSegment = Integer.MAX_VALUE; + maxTotalRows = 3L; + + // Insert data + int numToAdd = records.size() - 2; + + try (final KafkaProducer kafkaProducer = kafkaServer.newProducer()) { + for (int i = 0; i < numToAdd; i++) { + kafkaProducer.send(records.get(i)).get(); + } + + Map consumerProps = kafkaServer.consumerProperties(); + consumerProps.put("max.poll.records", "1"); + + final KafkaPartitions startPartitions = new KafkaPartitions(topic, ImmutableMap.of(0, 0L, 1, 0L)); + final KafkaPartitions checkpoint1 = new KafkaPartitions(topic, ImmutableMap.of(0, 3L, 1, 0L)); + final KafkaPartitions checkpoint2 = new KafkaPartitions(topic, ImmutableMap.of(0, 10L, 1, 0L)); + + final KafkaPartitions endPartitions = new KafkaPartitions(topic, ImmutableMap.of(0, 10L, 1, 2L)); + final KafkaIndexTask task = createTask( + null, + new KafkaIOConfig( + 0, + baseSequenceName, + startPartitions, + endPartitions, + consumerProps, + true, + null, + null, + false + ) + ); + final ListenableFuture future = runTask(task); + while (task.getRunner().getStatus() != KafkaIndexTask.Status.PAUSED) { + Thread.sleep(10); + } + final Map currentOffsets = ImmutableMap.copyOf(task.getRunner().getCurrentOffsets()); + + Assert.assertTrue(checkpoint1.getPartitionOffsetMap().equals(currentOffsets)); + task.getRunner().setEndOffsets(currentOffsets, false); + + while (task.getRunner().getStatus() != KafkaIndexTask.Status.PAUSED) { + Thread.sleep(10); + } + + // add remaining records + for (int i = numToAdd; i < records.size(); i++) { + kafkaProducer.send(records.get(i)).get(); + } + final Map nextOffsets = ImmutableMap.copyOf(task.getRunner().getCurrentOffsets()); + + Assert.assertTrue(checkpoint2.getPartitionOffsetMap().equals(nextOffsets)); + task.getRunner().setEndOffsets(nextOffsets, false); + + Assert.assertEquals(TaskState.SUCCESS, future.get().getStatusCode()); + + Assert.assertEquals(2, checkpointRequestsHash.size()); + Assert.assertTrue( + checkpointRequestsHash.contains( + Objects.hash( + DATA_SCHEMA.getDataSource(), + 0, + new KafkaDataSourceMetadata(startPartitions), + new KafkaDataSourceMetadata(new KafkaPartitions(topic, currentOffsets)) + ) + ) + ); + Assert.assertTrue( + checkpointRequestsHash.contains( + Objects.hash( + DATA_SCHEMA.getDataSource(), + 0, + new KafkaDataSourceMetadata(new KafkaPartitions(topic, currentOffsets)), + new KafkaDataSourceMetadata(new KafkaPartitions(topic, nextOffsets)) + ) + ) + ); + + // Check metrics + Assert.assertEquals(8, task.getRunner().getRowIngestionMeters().getProcessed()); + Assert.assertEquals(3, task.getRunner().getRowIngestionMeters().getUnparseable()); + Assert.assertEquals(1, task.getRunner().getRowIngestionMeters().getThrownAway()); + + // Check published metadata + SegmentDescriptor desc1 = SD(task, "2008/P1D", 0); + SegmentDescriptor desc2 = SD(task, "2009/P1D", 0); + SegmentDescriptor desc3 = SD(task, "2010/P1D", 0); + SegmentDescriptor desc4 = SD(task, "2011/P1D", 0); + SegmentDescriptor desc5 = SD(task, "2011/P1D", 1); + SegmentDescriptor desc6 = SD(task, "2012/P1D", 0); + SegmentDescriptor desc7 = SD(task, "2013/P1D", 0); + Assert.assertEquals(ImmutableSet.of(desc1, desc2, desc3, desc4, desc5, desc6, desc7), publishedDescriptors()); + Assert.assertEquals( + new KafkaDataSourceMetadata(new KafkaPartitions(topic, ImmutableMap.of(0, 10L, 1, 2L))), + metadataStorageCoordinator.getDataSourceMetadata(DATA_SCHEMA.getDataSource()) + ); + + // Check segments in deep storage + Assert.assertEquals(ImmutableList.of("a"), readSegmentColumn("dim1", desc1)); + Assert.assertEquals(ImmutableList.of("b"), readSegmentColumn("dim1", desc2)); + Assert.assertEquals(ImmutableList.of("c"), readSegmentColumn("dim1", desc3)); + Assert.assertTrue((ImmutableList.of("d", "e").equals(readSegmentColumn("dim1", desc4)) + && ImmutableList.of("h").equals(readSegmentColumn("dim1", desc5))) || + (ImmutableList.of("d", "h").equals(readSegmentColumn("dim1", desc4)) + && ImmutableList.of("e").equals(readSegmentColumn("dim1", desc5)))); + Assert.assertEquals(ImmutableList.of("g"), readSegmentColumn("dim1", desc6)); + Assert.assertEquals(ImmutableList.of("f"), readSegmentColumn("dim1", desc7)); + } + } + @Test(timeout = 60_000L) public void testTimeBasedIncrementalHandOff() throws Exception { @@ -1821,23 +1941,18 @@ public class KafkaIndexTaskTest runningTasks.add(task); } return taskExec.submit( - new Callable() - { - @Override - public TaskStatus call() - { - try { - if (task.isReady(toolbox.getTaskActionClient())) { - return task.run(toolbox); - } else { - throw new ISE("Task is not ready"); - } - } - catch (Exception e) { - log.warn(e, "Task failed"); - return TaskStatus.failure(task.getId()); + () -> { + try { + if (task.isReady(toolbox.getTaskActionClient())) { + return task.run(toolbox); + } else { + throw new ISE("Task is not ready"); } } + catch (Exception e) { + log.warn(e, "Task failed"); + return TaskStatus.failure(task.getId()); + } } ); } @@ -1884,6 +1999,7 @@ public class KafkaIndexTaskTest 1000, null, maxRowsPerSegment, + maxTotalRows, new Period("P1Y"), null, null, @@ -1928,6 +2044,7 @@ public class KafkaIndexTaskTest 1000, null, maxRowsPerSegment, + null, new Period("P1Y"), null, null, @@ -1995,13 +2112,8 @@ public class KafkaIndexTaskTest new TimeseriesQueryRunnerFactory( new TimeseriesQueryQueryToolChest(queryRunnerDecorator), new TimeseriesQueryEngine(), - new QueryWatcher() - { - @Override - public void registerQuery(Query query, ListenableFuture future) - { - // do nothing - } + (query, future) -> { + // do nothing } ) ) @@ -2084,37 +2196,30 @@ public class KafkaIndexTaskTest taskStorage, taskActionToolbox ); - final SegmentHandoffNotifierFactory handoffNotifierFactory = new SegmentHandoffNotifierFactory() + final SegmentHandoffNotifierFactory handoffNotifierFactory = dataSource -> new SegmentHandoffNotifier() { @Override - public SegmentHandoffNotifier createSegmentHandoffNotifier(String dataSource) + public boolean registerSegmentHandoffCallback( + SegmentDescriptor descriptor, Executor exec, Runnable handOffRunnable + ) { - return new SegmentHandoffNotifier() - { - @Override - public boolean registerSegmentHandoffCallback( - SegmentDescriptor descriptor, Executor exec, Runnable handOffRunnable - ) - { - if (doHandoff) { - // Simulate immediate handoff - exec.execute(handOffRunnable); - } - return true; - } + if (doHandoff) { + // Simulate immediate handoff + exec.execute(handOffRunnable); + } + return true; + } - @Override - public void start() - { - //Noop - } + @Override + public void start() + { + //Noop + } - @Override - public void close() - { - //Noop - } - }; + @Override + public void close() + { + //Noop } }; final LocalDataSegmentPusherConfig dataSegmentPusherConfig = new LocalDataSegmentPusherConfig(); diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaTuningConfigTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaTuningConfigTest.java index 5dff8e4a41c..69cf186a3c3 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaTuningConfigTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaTuningConfigTest.java @@ -58,6 +58,7 @@ public class KafkaTuningConfigTest Assert.assertNotNull(config.getBasePersistDirectory()); Assert.assertEquals(1000000, config.getMaxRowsInMemory()); Assert.assertEquals(5_000_000, config.getMaxRowsPerSegment()); + Assert.assertEquals(null, config.getMaxTotalRows()); Assert.assertEquals(new Period("PT10M"), config.getIntermediatePersistPeriod()); Assert.assertEquals(0, config.getMaxPendingPersists()); Assert.assertEquals(new IndexSpec(), config.getIndexSpec()); @@ -73,6 +74,7 @@ public class KafkaTuningConfigTest + " \"basePersistDirectory\": \"/tmp/xxx\",\n" + " \"maxRowsInMemory\": 100,\n" + " \"maxRowsPerSegment\": 100,\n" + + " \"maxTotalRows\": 1000,\n" + " \"intermediatePersistPeriod\": \"PT1H\",\n" + " \"maxPendingPersists\": 100,\n" + " \"reportParseExceptions\": true,\n" @@ -92,6 +94,8 @@ public class KafkaTuningConfigTest Assert.assertEquals(new File("/tmp/xxx"), config.getBasePersistDirectory()); Assert.assertEquals(100, config.getMaxRowsInMemory()); Assert.assertEquals(100, config.getMaxRowsPerSegment()); + Assert.assertNotEquals(null, config.getMaxTotalRows()); + Assert.assertEquals(1000, config.getMaxTotalRows().longValue()); Assert.assertEquals(new Period("PT1H"), config.getIntermediatePersistPeriod()); Assert.assertEquals(0, config.getMaxPendingPersists()); Assert.assertEquals(true, config.isReportParseExceptions()); @@ -105,6 +109,7 @@ public class KafkaTuningConfigTest 1, null, 2, + 10L, new Period("PT3S"), new File("/tmp/xxx"), 4, @@ -123,6 +128,8 @@ public class KafkaTuningConfigTest Assert.assertEquals(1, copy.getMaxRowsInMemory()); Assert.assertEquals(2, copy.getMaxRowsPerSegment()); + Assert.assertNotEquals(null, copy.getMaxTotalRows()); + Assert.assertEquals(10L, copy.getMaxTotalRows().longValue()); Assert.assertEquals(new Period("PT3S"), copy.getIntermediatePersistPeriod()); Assert.assertEquals(new File("/tmp/xxx"), copy.getBasePersistDirectory()); Assert.assertEquals(0, copy.getMaxPendingPersists()); diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java index 5d773db661d..964f5f7cac6 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java @@ -194,6 +194,7 @@ public class KafkaSupervisorTest extends EasyMockSupport 1000, null, 50000, + null, new Period("P1Y"), new File("/test"), null, diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/index/RealtimeAppenderatorTuningConfig.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/index/RealtimeAppenderatorTuningConfig.java index 500f21f13e3..a5c8e7d5032 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/index/RealtimeAppenderatorTuningConfig.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/index/RealtimeAppenderatorTuningConfig.java @@ -53,8 +53,10 @@ public class RealtimeAppenderatorTuningConfig implements TuningConfig, Appendera } private final int maxRowsInMemory; - private final int maxRowsPerSegment; private final long maxBytesInMemory; + private final int maxRowsPerSegment; + @Nullable + private final Long maxTotalRows; private final Period intermediatePersistPeriod; private final File basePersistDirectory; private final int maxPendingPersists; @@ -73,8 +75,9 @@ public class RealtimeAppenderatorTuningConfig implements TuningConfig, Appendera @JsonCreator public RealtimeAppenderatorTuningConfig( @JsonProperty("maxRowsInMemory") Integer maxRowsInMemory, - @JsonProperty("maxRowsPerSegment") @Nullable Integer maxRowsPerSegment, @JsonProperty("maxBytesInMemory") @Nullable Long maxBytesInMemory, + @JsonProperty("maxRowsPerSegment") @Nullable Integer maxRowsPerSegment, + @JsonProperty("maxTotalRows") @Nullable Long maxTotalRows, @JsonProperty("intermediatePersistPeriod") Period intermediatePersistPeriod, @JsonProperty("basePersistDirectory") File basePersistDirectory, @JsonProperty("maxPendingPersists") Integer maxPendingPersists, @@ -94,6 +97,7 @@ public class RealtimeAppenderatorTuningConfig implements TuningConfig, Appendera // initializing this to 0, it will be lazily intialized to a value // @see server.src.main.java.org.apache.druid.segment.indexing.TuningConfigs#getMaxBytesInMemoryOrDefault(long) this.maxBytesInMemory = maxBytesInMemory == null ? 0 : maxBytesInMemory; + this.maxTotalRows = maxTotalRows; this.intermediatePersistPeriod = intermediatePersistPeriod == null ? defaultIntermediatePersistPeriod : intermediatePersistPeriod; @@ -105,8 +109,8 @@ public class RealtimeAppenderatorTuningConfig implements TuningConfig, Appendera ? defaultReportParseExceptions : reportParseExceptions; this.publishAndHandoffTimeout = publishAndHandoffTimeout == null - ? defaultPublishAndHandoffTimeout - : publishAndHandoffTimeout; + ? defaultPublishAndHandoffTimeout + : publishAndHandoffTimeout; Preconditions.checkArgument(this.publishAndHandoffTimeout >= 0, "publishAndHandoffTimeout must be >= 0"); this.alertTimeout = alertTimeout == null ? defaultAlertTimeout : alertTimeout; @@ -117,12 +121,16 @@ public class RealtimeAppenderatorTuningConfig implements TuningConfig, Appendera this.maxParseExceptions = 0; this.maxSavedParseExceptions = maxSavedParseExceptions == null ? 0 : Math.min(1, maxSavedParseExceptions); } else { - this.maxParseExceptions = maxParseExceptions == null ? TuningConfig.DEFAULT_MAX_PARSE_EXCEPTIONS : maxParseExceptions; + this.maxParseExceptions = maxParseExceptions == null + ? TuningConfig.DEFAULT_MAX_PARSE_EXCEPTIONS + : maxParseExceptions; this.maxSavedParseExceptions = maxSavedParseExceptions == null ? TuningConfig.DEFAULT_MAX_SAVED_PARSE_EXCEPTIONS : maxSavedParseExceptions; } - this.logParseExceptions = logParseExceptions == null ? TuningConfig.DEFAULT_LOG_PARSE_EXCEPTIONS : logParseExceptions; + this.logParseExceptions = logParseExceptions == null + ? TuningConfig.DEFAULT_LOG_PARSE_EXCEPTIONS + : logParseExceptions; } @Override @@ -138,12 +146,21 @@ public class RealtimeAppenderatorTuningConfig implements TuningConfig, Appendera return maxBytesInMemory; } + @Override @JsonProperty public int getMaxRowsPerSegment() { return maxRowsPerSegment; } + @Override + @JsonProperty + @Nullable + public Long getMaxTotalRows() + { + return maxTotalRows; + } + @Override @JsonProperty public Period getIntermediatePersistPeriod() @@ -227,8 +244,9 @@ public class RealtimeAppenderatorTuningConfig implements TuningConfig, Appendera { return new RealtimeAppenderatorTuningConfig( maxRowsInMemory, - maxRowsPerSegment, maxBytesInMemory, + maxRowsPerSegment, + maxTotalRows, intermediatePersistPeriod, dir, maxPendingPersists, diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java index 49a9333567e..11d14bb416f 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java @@ -322,9 +322,8 @@ public class AppenderatorDriverRealtimeIndexTask extends AbstractTask implements AppenderatorDriverAddResult addResult = driver.add(inputRow, sequenceName, committerSupplier); if (addResult.isOk()) { - if (addResult.getNumRowsInSegment() >= tuningConfig.getMaxRowsPerSegment()) { + if (addResult.isPushRequired(tuningConfig)) { publishSegments(driver, publisher, committerSupplier, sequenceName); - sequenceNumber++; sequenceName = makeSequenceName(getId(), sequenceNumber); } @@ -542,7 +541,8 @@ public class AppenderatorDriverRealtimeIndexTask extends AbstractTask implements private Map getTaskCompletionUnparseableEvents() { Map unparseableEventsMap = Maps.newHashMap(); - List buildSegmentsParseExceptionMessages = IndexTaskUtils.getMessagesFromSavedParseExceptions(savedParseExceptions); + List buildSegmentsParseExceptionMessages = IndexTaskUtils.getMessagesFromSavedParseExceptions( + savedParseExceptions); if (buildSegmentsParseExceptionMessages != null) { unparseableEventsMap.put(RowIngestionMeters.BUILD_SEGMENTS, buildSegmentsParseExceptionMessages); } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java index 788dc852a4a..5d6bef1b859 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java @@ -1485,6 +1485,8 @@ public class IndexTask extends AbstractTask implements ChatHandler } @JsonProperty + @Override + @Nullable public Long getMaxTotalRows() { return maxTotalRows; diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTaskTest.java index 1efcd94da19..da6cd0b4293 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTaskTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTaskTest.java @@ -102,7 +102,6 @@ import org.apache.druid.query.QueryPlus; import org.apache.druid.query.QueryRunner; import org.apache.druid.query.QueryRunnerFactoryConglomerate; import org.apache.druid.query.QueryToolChest; -import org.apache.druid.query.QueryWatcher; import org.apache.druid.query.Result; import org.apache.druid.query.SegmentDescriptor; import org.apache.druid.query.aggregation.AggregatorFactory; @@ -158,7 +157,6 @@ import java.util.List; import java.util.Map; import java.util.Optional; import java.util.Set; -import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CountDownLatch; @@ -528,6 +526,73 @@ public class AppenderatorDriverRealtimeIndexTaskTest Assert.assertEquals(TaskState.SUCCESS, taskStatus.getStatusCode()); } + @Test(timeout = 60_000L) + public void testMaxTotalRows() throws Exception + { + // Expect 2 segments as we will hit maxTotalRows + expectPublishedSegments(2); + + final AppenderatorDriverRealtimeIndexTask task = + makeRealtimeTask(null, Integer.MAX_VALUE, 1500L); + final ListenableFuture statusFuture = runTask(task); + + // Wait for firehose to show up, it starts off null. + while (task.getFirehose() == null) { + Thread.sleep(50); + } + + final TestFirehose firehose = (TestFirehose) task.getFirehose(); + + // maxTotalRows is 1500 + for (int i = 0; i < 2000; i++) { + firehose.addRows( + ImmutableList.of( + ImmutableMap.of("t", now.getMillis(), "dim1", "foo-" + i, "met1", "1") + ) + ); + } + + // Stop the firehose, this will drain out existing events. + firehose.close(); + + // Wait for publish. + Collection publishedSegments = awaitSegments(); + + // Check metrics. + Assert.assertEquals(2000, task.getRowIngestionMeters().getProcessed()); + Assert.assertEquals(0, task.getRowIngestionMeters().getThrownAway()); + Assert.assertEquals(0, task.getRowIngestionMeters().getUnparseable()); + + // Do some queries. + Assert.assertEquals(2000, sumMetric(task, null, "rows").longValue()); + Assert.assertEquals(2000, sumMetric(task, null, "met1").longValue()); + + awaitHandoffs(); + + Assert.assertEquals(2, publishedSegments.size()); + for (DataSegment publishedSegment : publishedSegments) { + Pair executorRunnablePair = handOffCallbacks.get( + new SegmentDescriptor( + publishedSegment.getInterval(), + publishedSegment.getVersion(), + publishedSegment.getShardSpec().getPartitionNum() + ) + ); + Assert.assertNotNull( + publishedSegment + " missing from handoff callbacks: " + handOffCallbacks, + executorRunnablePair + ); + + // Simulate handoff. + executorRunnablePair.lhs.execute(executorRunnablePair.rhs); + } + handOffCallbacks.clear(); + + // Wait for the task to finish. + final TaskStatus taskStatus = statusFuture.get(); + Assert.assertEquals(TaskState.SUCCESS, taskStatus.getStatusCode()); + } + @Test(timeout = 60_000L) public void testTransformSpec() throws Exception { @@ -1209,35 +1274,65 @@ public class AppenderatorDriverRealtimeIndexTaskTest taskLockbox.syncFromStorage(); final TaskToolbox toolbox = taskToolboxFactory.build(task); return taskExec.submit( - new Callable() - { - @Override - public TaskStatus call() throws Exception - { - try { - if (task.isReady(toolbox.getTaskActionClient())) { - return task.run(toolbox); - } else { - throw new ISE("Task is not ready"); - } - } - catch (Exception e) { - log.warn(e, "Task failed"); - throw e; + () -> { + try { + if (task.isReady(toolbox.getTaskActionClient())) { + return task.run(toolbox); + } else { + throw new ISE("Task is not ready"); } } + catch (Exception e) { + log.warn(e, "Task failed"); + throw e; + } } ); } private AppenderatorDriverRealtimeIndexTask makeRealtimeTask(final String taskId) { - return makeRealtimeTask(taskId, TransformSpec.NONE, true, 0, true, 0, 1); + return makeRealtimeTask( + taskId, + TransformSpec.NONE, + true, + 0, + true, + 0, + 1 + ); + } + + private AppenderatorDriverRealtimeIndexTask makeRealtimeTask( + final String taskId, + final Integer maxRowsPerSegment, + final Long maxTotalRows + ) + { + return makeRealtimeTask( + taskId, + TransformSpec.NONE, + true, + 0, + true, + 0, + 1, + maxRowsPerSegment, + maxTotalRows + ); } private AppenderatorDriverRealtimeIndexTask makeRealtimeTask(final String taskId, boolean reportParseExceptions) { - return makeRealtimeTask(taskId, TransformSpec.NONE, reportParseExceptions, 0, true, null, 1); + return makeRealtimeTask( + taskId, + TransformSpec.NONE, + reportParseExceptions, + 0, + true, + null, + 1 + ); } private AppenderatorDriverRealtimeIndexTask makeRealtimeTask( @@ -1249,6 +1344,32 @@ public class AppenderatorDriverRealtimeIndexTaskTest final Integer maxParseExceptions, final Integer maxSavedParseExceptions ) + { + + return makeRealtimeTask( + taskId, + transformSpec, + reportParseExceptions, + handoffTimeout, + logParseExceptions, + maxParseExceptions, + maxSavedParseExceptions, + 1000, + null + ); + } + + private AppenderatorDriverRealtimeIndexTask makeRealtimeTask( + final String taskId, + final TransformSpec transformSpec, + final boolean reportParseExceptions, + final long handoffTimeout, + final Boolean logParseExceptions, + final Integer maxParseExceptions, + final Integer maxSavedParseExceptions, + final Integer maxRowsPerSegment, + final Long maxTotalRows + ) { ObjectMapper objectMapper = new DefaultObjectMapper(); DataSchema dataSchema = new DataSchema( @@ -1283,9 +1404,10 @@ public class AppenderatorDriverRealtimeIndexTaskTest null ); RealtimeAppenderatorTuningConfig tuningConfig = new RealtimeAppenderatorTuningConfig( - 1000, 1000, null, + maxRowsPerSegment, + maxTotalRows, null, null, null, @@ -1425,49 +1547,37 @@ public class AppenderatorDriverRealtimeIndexTaskTest new TimeseriesQueryRunnerFactory( new TimeseriesQueryQueryToolChest(queryRunnerDecorator), new TimeseriesQueryEngine(), - new QueryWatcher() - { - @Override - public void registerQuery(Query query, ListenableFuture future) - { - // do nothing - } + (query, future) -> { + // do nothing } ) ) ); handOffCallbacks = new ConcurrentHashMap<>(); - final SegmentHandoffNotifierFactory handoffNotifierFactory = new SegmentHandoffNotifierFactory() + final SegmentHandoffNotifierFactory handoffNotifierFactory = dataSource -> new SegmentHandoffNotifier() { @Override - public SegmentHandoffNotifier createSegmentHandoffNotifier(String dataSource) + public boolean registerSegmentHandoffCallback( + SegmentDescriptor descriptor, Executor exec, Runnable handOffRunnable + ) { - return new SegmentHandoffNotifier() - { - @Override - public boolean registerSegmentHandoffCallback( - SegmentDescriptor descriptor, Executor exec, Runnable handOffRunnable - ) - { - handOffCallbacks.put(descriptor, new Pair<>(exec, handOffRunnable)); - handoffLatch.countDown(); - return true; - } - - @Override - public void start() - { - //Noop - } - - @Override - public void close() - { - //Noop - } - - }; + handOffCallbacks.put(descriptor, new Pair<>(exec, handOffRunnable)); + handoffLatch.countDown(); + return true; } + + @Override + public void start() + { + //Noop + } + + @Override + public void close() + { + //Noop + } + }; final TestUtils testUtils = new TestUtils(); rowIngestionMetersFactory = testUtils.getRowIngestionMetersFactory(); diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/Appenderator.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/Appenderator.java index f33ed1b5d1b..3026eb4ded9 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/Appenderator.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/Appenderator.java @@ -119,7 +119,7 @@ public interface Appenderator extends QuerySegmentWalker, Closeable int getRowCount(SegmentIdentifier identifier); /** - * Returns the number of total rows in this appenderator. + * Returns the number of total rows in this appenderator of all segments pending push. * * @return total number of rows */ diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorConfig.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorConfig.java index 15ff7d9d21f..f09726ab60e 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorConfig.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorConfig.java @@ -30,12 +30,38 @@ public interface AppenderatorConfig { boolean isReportParseExceptions(); + /** + * Maximum number of rows in memory before persisting to local storage + */ int getMaxRowsInMemory(); + /** + * Maximum number of bytes (estimated) to store in memory before persisting to local storage + */ long getMaxBytesInMemory(); int getMaxPendingPersists(); + /** + * Maximum number of rows in a single segment before pushing to deep storage + */ + default int getMaxRowsPerSegment() + { + return Integer.MAX_VALUE; + } + + /** + * Maximum number of rows across all segments before pushing to deep storage + */ + @Nullable + default Long getMaxTotalRows() + { + throw new UnsupportedOperationException("maxTotalRows is not implemented."); + } + + /** + * Period that sets frequency to persist to local storage if no other thresholds are met + */ Period getIntermediatePersistPeriod(); IndexSpec getIndexSpec(); diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorDriverAddResult.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorDriverAddResult.java index 0dc1ab52cd7..e80658ee2cc 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorDriverAddResult.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorDriverAddResult.java @@ -100,6 +100,16 @@ public class AppenderatorDriverAddResult return isPersistRequired; } + public boolean isPushRequired(AppenderatorConfig tuningConfig) + { + boolean overThreshold = getNumRowsInSegment() >= tuningConfig.getMaxRowsPerSegment(); + Long maxTotal = tuningConfig.getMaxTotalRows(); + if (maxTotal != null) { + overThreshold |= getTotalNumRowsInAppenderator() >= maxTotal; + } + return overThreshold; + } + @Nullable public ParseException getParseException() { diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorImpl.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorImpl.java index e245967daf8..808c8ff1dfb 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorImpl.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorImpl.java @@ -83,7 +83,6 @@ import org.joda.time.Interval; import javax.annotation.Nullable; import java.io.Closeable; import java.io.File; -import java.io.FilenameFilter; import java.io.IOException; import java.nio.channels.FileChannel; import java.nio.channels.FileLock; @@ -91,11 +90,9 @@ import java.nio.file.StandardOpenOption; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; -import java.util.Comparator; import java.util.List; import java.util.Map; import java.util.Set; -import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; @@ -264,8 +261,8 @@ public class AppenderatorImpl implements Appenderator final int numAddedRows = sinkRowsInMemoryAfterAdd - sinkRowsInMemoryBeforeAdd; rowsCurrentlyInMemory.addAndGet(numAddedRows); - totalRows.addAndGet(numAddedRows); bytesCurrentlyInMemory.addAndGet(bytesInMemoryAfterAdd - bytesInMemoryBeforeAdd); + totalRows.addAndGet(numAddedRows); boolean isPersistRequired = false; boolean persist = false; @@ -438,20 +435,15 @@ public class AppenderatorImpl implements Appenderator if (persistExecutor != null) { final ListenableFuture uncommitFuture = persistExecutor.submit( - new Callable() - { - @Override - public Object call() throws Exception - { - try { - commitLock.lock(); - objectMapper.writeValue(computeCommitFile(), Committed.nil()); - } - finally { - commitLock.unlock(); - } - return null; + () -> { + try { + commitLock.lock(); + objectMapper.writeValue(computeCommitFile(), Committed.nil()); } + finally { + commitLock.unlock(); + } + return null; } ); @@ -610,7 +602,9 @@ public class AppenderatorImpl implements Appenderator throw new ISE("No sink for identifier: %s", identifier); } theSinks.put(identifier, sink); - sink.finishWriting(); + if (sink.finishWriting()) { + totalRows.addAndGet(-sink.getNumRows()); + } } return Futures.transform( @@ -656,8 +650,8 @@ public class AppenderatorImpl implements Appenderator * Merge segment, push to deep storage. Should only be used on segments that have been fully persisted. Must only * be run in the single-threaded pushExecutor. * - * @param identifier sink identifier - * @param sink sink to push + * @param identifier sink identifier + * @param sink sink to push * @param useUniquePath true if the segment should be written to a path with a unique identifier * * @return segment descriptor, or null if the sink is no longer valid @@ -986,6 +980,8 @@ public class AppenderatorImpl implements Appenderator commitLock.unlock(); } + int rowsSoFar = 0; + log.info("Loading sinks from[%s]: %s", baseDir, committed.getHydrants().keySet()); for (File sinkDir : files) { @@ -1011,26 +1007,12 @@ public class AppenderatorImpl implements Appenderator // To avoid reading and listing of "merged" dir and other special files final File[] sinkFiles = sinkDir.listFiles( - new FilenameFilter() - { - @Override - public boolean accept(File dir, String fileName) - { - return !(Ints.tryParse(fileName) == null); - } - } + (dir, fileName) -> !(Ints.tryParse(fileName) == null) ); Arrays.sort( sinkFiles, - new Comparator() - { - @Override - public int compare(File o1, File o2) - { - return Ints.compare(Integer.parseInt(o1.getName()), Integer.parseInt(o2.getName())); - } - } + (o1, o2) -> Ints.compare(Integer.parseInt(o1.getName()), Integer.parseInt(o2.getName())) ); List hydrants = Lists.newArrayList(); @@ -1074,6 +1056,7 @@ public class AppenderatorImpl implements Appenderator null, hydrants ); + rowsSoFar += currSink.getNumRows(); sinks.put(identifier, currSink); sinkTimeline.add( currSink.getInterval(), @@ -1094,14 +1077,7 @@ public class AppenderatorImpl implements Appenderator final Set loadedSinks = Sets.newHashSet( Iterables.transform( sinks.keySet(), - new Function() - { - @Override - public String apply(SegmentIdentifier input) - { - return input.getIdentifierAsString(); - } - } + input -> input.getIdentifierAsString() ) ); final Set missingSinks = Sets.difference(committed.getHydrants().keySet(), loadedSinks); @@ -1109,6 +1085,7 @@ public class AppenderatorImpl implements Appenderator throw new ISE("Missing committed sinks [%s]", Joiner.on(", ").join(missingSinks)); } + totalRows.set(rowsSoFar); return committed.getMetadata(); } @@ -1119,16 +1096,17 @@ public class AppenderatorImpl implements Appenderator ) { // Ensure no future writes will be made to this sink. - sink.finishWriting(); + if (sink.finishWriting()) { + // Decrement this sink's rows from the counters. we only count active sinks so that we don't double decrement, + // i.e. those that haven't been persisted for *InMemory counters, or pushed to deep storage for the total counter. + rowsCurrentlyInMemory.addAndGet(-sink.getNumRowsInMemory()); + bytesCurrentlyInMemory.addAndGet(-sink.getBytesInMemory()); + totalRows.addAndGet(-sink.getNumRows()); + } // Mark this identifier as dropping, so no future push tasks will pick it up. droppingSinks.add(identifier); - // Decrement this sink's rows from rowsCurrentlyInMemory (we only count active sinks). - rowsCurrentlyInMemory.addAndGet(-sink.getNumRowsInMemory()); - totalRows.addAndGet(-sink.getNumRows()); - bytesCurrentlyInMemory.addAndGet(-sink.getBytesInMemory()); - // Wait for any outstanding pushes to finish, then abandon the segment inside the persist thread. return Futures.transform( pushBarrier(), diff --git a/server/src/main/java/org/apache/druid/segment/realtime/plumber/Sink.java b/server/src/main/java/org/apache/druid/segment/realtime/plumber/Sink.java index e2f66f675d2..c11d25ddff1 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/plumber/Sink.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/plumber/Sink.java @@ -223,12 +223,20 @@ public class Sink implements Iterable return !writable; } - public void finishWriting() + /** + * Marks sink as 'finished', preventing further writes. + * @return 'true' if sink was sucessfully finished, 'false' if sink was already finished + */ + public boolean finishWriting() { synchronized (hydrantLock) { + if (!writable) { + return false; + } writable = false; clearDedupCache(); } + return true; } public DataSegment getSegment()