From 142742f291daaf1ac9afea319cacbbe2a3077952 Mon Sep 17 00:00:00 2001
From: Clint Wylie
Date: Mon, 16 Mar 2020 21:39:53 -0700
Subject: [PATCH] add kinesis lag metric (#9509)

* add kinesis lag metric
* fixes
* heh
* do it right this time
* more test
* split out supervisor report lags into lagMillis, remove latest offsets from kinesis supervisor report since always null, review stuffs
---
 .../kafka/supervisor/KafkaSupervisor.java | 112 +++----
 .../KafkaSupervisorReportPayload.java | 2 +
 .../KafkaSupervisorTuningConfig.java | 3 +-
 .../kafka/supervisor/KafkaSupervisorTest.java | 6 +-
 .../kinesis/KinesisRecordSupplier.java | 79 ++++-
 .../kinesis/supervisor/KinesisSupervisor.java | 48 ++-
 .../KinesisSupervisorReportPayload.java | 16 +-
 .../KinesisSupervisorTuningConfig.java | 17 +-
 .../kinesis/KinesisIndexTaskTest.java | 5 +
 .../KinesisIndexTaskTuningConfigTest.java | 1 +
 .../kinesis/KinesisRecordSupplierTest.java | 110 ++++++-
 .../supervisor/KinesisSupervisorTest.java | 118 ++++++-
 .../supervisor/SeekableStreamSupervisor.java | 161 ++++++++--
 ...SeekableStreamSupervisorReportPayload.java | 25 +-
 .../SeekableStreamSupervisorTuningConfig.java | 5 +-
 .../supervisor/TaskReportData.java | 17 +-
 .../SeekableStreamSupervisorStateTest.java | 288 +++++++++++++++++-
 17 files changed, 854 insertions(+), 159 deletions(-)

diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java
index 99c8105a671..f5cec3d9147 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java
@@ -50,14 +50,13 @@
 import org.apache.druid.indexing.seekablestream.common.StreamPartition;
 import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor;
 import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorIOConfig;
 import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorReportPayload;
-import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.emitter.EmittingLogger;
 import org.apache.druid.java.util.emitter.service.ServiceEmitter;
-import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
 import org.apache.druid.server.metrics.DruidMonitorSchedulerConfig;
 import org.joda.time.DateTime;
+import javax.annotation.Nullable;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
@@ -65,8 +64,6 @@
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
 import java.util.TreeMap;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 /**
@@ -85,9 +82,6 @@ public class KafkaSupervisor extends SeekableStreamSupervisor
 };
 private static final EmittingLogger log = new EmittingLogger(KafkaSupervisor.class);
- private static final long MINIMUM_GET_OFFSET_PERIOD_MILLIS = 5000;
- private static final long INITIAL_GET_OFFSET_DELAY_MILLIS = 15000;
- private static final long INITIAL_EMIT_LAG_METRIC_DELAY_MILLIS = 25000;
 private static final Long NOT_SET = -1L;
 private static final Long END_OF_PARTITION = Long.MAX_VALUE;
@@ -132,28 +126,6 @@ public class KafkaSupervisor extends SeekableStreamSupervisor
return new KafkaRecordSupplier(spec.getIoConfig().getConsumerProperties(), sortingMapper); } - @Override - protected void scheduleReporting(ScheduledExecutorService reportingExec) - { - KafkaSupervisorIOConfig ioConfig = spec.getIoConfig(); - KafkaSupervisorTuningConfig tuningConfig = spec.getTuningConfig(); - reportingExec.scheduleAtFixedRate( - updateCurrentAndLatestOffsets(), - ioConfig.getStartDelay().getMillis() + INITIAL_GET_OFFSET_DELAY_MILLIS, // wait for tasks to start up - Math.max( - tuningConfig.getOffsetFetchPeriod().getMillis(), MINIMUM_GET_OFFSET_PERIOD_MILLIS - ), - TimeUnit.MILLISECONDS - ); - - reportingExec.scheduleAtFixedRate( - emitLag(), - ioConfig.getStartDelay().getMillis() + INITIAL_EMIT_LAG_METRIC_DELAY_MILLIS, // wait for tasks to start up - monitorSchedulerConfig.getEmitterPeriod().getMillis(), - TimeUnit.MILLISECONDS - ); - } - @Override protected int getTaskGroupIdForPartition(Integer partitionId) { @@ -179,7 +151,7 @@ public class KafkaSupervisor extends SeekableStreamSupervisor ) { KafkaSupervisorIOConfig ioConfig = spec.getIoConfig(); - Map partitionLag = getLagPerPartition(getHighestCurrentOffsets()); + Map partitionLag = getRecordLagPerPartition(getHighestCurrentOffsets()); return new KafkaSupervisorReportPayload( spec.getDataSchema().getDataSource(), ioConfig.getTopic(), @@ -267,11 +239,38 @@ public class KafkaSupervisor extends SeekableStreamSupervisor return taskList; } + @Override + protected Map getPartitionRecordLag() + { + Map highestCurrentOffsets = getHighestCurrentOffsets(); + + if (latestSequenceFromStream == null) { + return null; + } + + if (!latestSequenceFromStream.keySet().equals(highestCurrentOffsets.keySet())) { + log.warn( + "Lag metric: Kafka partitions %s do not match task partitions %s", + latestSequenceFromStream.keySet(), + highestCurrentOffsets.keySet() + ); + } + + return getRecordLagPerPartition(highestCurrentOffsets); + } + + @Nullable + @Override + protected Map getPartitionTimeLag() + { + // time lag not currently support with kafka + return null; + } @Override // suppress use of CollectionUtils.mapValues() since the valueMapper function is dependent on map key here @SuppressWarnings("SSBasedInspection") - protected Map getLagPerPartition(Map currentOffsets) + protected Map getRecordLagPerPartition(Map currentOffsets) { return currentOffsets .entrySet() @@ -288,6 +287,12 @@ public class KafkaSupervisor extends SeekableStreamSupervisor ); } + @Override + protected Map getTimeLagPerPartition(Map currentOffsets) + { + return null; + } + @Override protected KafkaDataSourceMetadata createDataSourceMetaDataForReset(String topic, Map map) { @@ -300,51 +305,6 @@ public class KafkaSupervisor extends SeekableStreamSupervisor return KafkaSequenceNumber.of(seq); } - private Runnable emitLag() - { - return () -> { - try { - Map highestCurrentOffsets = getHighestCurrentOffsets(); - String dataSource = spec.getDataSchema().getDataSource(); - - if (latestSequenceFromStream == null) { - throw new ISE("Latest offsets from Kafka have not been fetched"); - } - - if (!latestSequenceFromStream.keySet().equals(highestCurrentOffsets.keySet())) { - log.warn( - "Lag metric: Kafka partitions %s do not match task partitions %s", - latestSequenceFromStream.keySet(), - highestCurrentOffsets.keySet() - ); - } - - Map partitionLags = getLagPerPartition(highestCurrentOffsets); - long maxLag = 0, totalLag = 0, avgLag; - for (long lag : partitionLags.values()) { - if (lag > maxLag) { - maxLag = lag; - } - totalLag += lag; - } - avgLag = partitionLags.size() == 
0 ? 0 : totalLag / partitionLags.size(); - - emitter.emit( - ServiceMetricEvent.builder().setDimension("dataSource", dataSource).build("ingest/kafka/lag", totalLag) - ); - emitter.emit( - ServiceMetricEvent.builder().setDimension("dataSource", dataSource).build("ingest/kafka/maxLag", maxLag) - ); - emitter.emit( - ServiceMetricEvent.builder().setDimension("dataSource", dataSource).build("ingest/kafka/avgLag", avgLag) - ); - } - catch (Exception e) { - log.warn(e, "Unable to compute Kafka lag"); - } - }; - } - @Override protected Long getNotSetMarker() { diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorReportPayload.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorReportPayload.java index 768468c933c..d7e639caea9 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorReportPayload.java +++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorReportPayload.java @@ -55,6 +55,8 @@ public class KafkaSupervisorReportPayload extends SeekableStreamSupervisorReport latestOffsets, minimumLag, aggregateLag, + null, + null, offsetsLastUpdated, suspended, healthy, diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTuningConfig.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTuningConfig.java index 5c00988e860..2dca7f6edfc 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTuningConfig.java +++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTuningConfig.java @@ -34,8 +34,6 @@ import java.io.File; public class KafkaSupervisorTuningConfig extends KafkaIndexTaskTuningConfig implements SeekableStreamSupervisorTuningConfig { - private static final String DEFAULT_OFFSET_FETCH_PERIOD = "PT30S"; - private final Integer workerThreads; private final Integer chatThreads; private final Long chatRetries; @@ -181,6 +179,7 @@ public class KafkaSupervisorTuningConfig extends KafkaIndexTaskTuningConfig ); } + @Override @JsonProperty public Duration getOffsetFetchPeriod() { diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java index 0b29b65a628..a29fab1a504 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java @@ -1371,7 +1371,7 @@ public class KafkaSupervisorTest extends EasyMockSupport supervisor.start(); supervisor.runInternal(); - supervisor.updateCurrentAndLatestOffsets().run(); + supervisor.updateCurrentAndLatestOffsets(); SupervisorReport report = supervisor.getStatus(); verifyAll(); @@ -1481,7 +1481,7 @@ public class KafkaSupervisorTest extends EasyMockSupport supervisor.start(); supervisor.runInternal(); - supervisor.updateCurrentAndLatestOffsets().run(); + supervisor.updateCurrentAndLatestOffsets(); SupervisorReport report = supervisor.getStatus(); verifyAll(); @@ 
-1625,7 +1625,7 @@ public class KafkaSupervisorTest extends EasyMockSupport supervisor.start(); supervisor.runInternal(); - supervisor.updateCurrentAndLatestOffsets().run(); + supervisor.updateCurrentAndLatestOffsets(); SupervisorReport report = supervisor.getStatus(); verifyAll(); diff --git a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplier.java b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplier.java index b71f485fe26..e7ec59a7344 100644 --- a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplier.java +++ b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplier.java @@ -44,9 +44,11 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Iterables; +import com.google.common.collect.Maps; import com.google.common.collect.Queues; import org.apache.druid.common.aws.AWSCredentialsConfig; import org.apache.druid.common.aws.AWSCredentialsUtils; +import org.apache.druid.indexing.kinesis.supervisor.KinesisSupervisor; import org.apache.druid.indexing.seekablestream.common.OrderedPartitionableRecord; import org.apache.druid.indexing.seekablestream.common.RecordSupplier; import org.apache.druid.indexing.seekablestream.common.StreamException; @@ -113,9 +115,9 @@ public class KinesisRecordSupplier implements RecordSupplier private volatile boolean started; private volatile boolean stopRequested; - PartitionResource( - StreamPartition streamPartition - ) + private volatile long currentLagMillis; + + PartitionResource(StreamPartition streamPartition) { this.streamPartition = streamPartition; } @@ -148,6 +150,53 @@ public class KinesisRecordSupplier implements RecordSupplier stopRequested = true; } + long getPartitionTimeLag() + { + return currentLagMillis; + } + + long getPartitionTimeLag(String offset) + { + // if not started (fetching records in background), fetch lag ourself with a throw-away iterator + if (!started) { + try { + final String iteratorType; + final String offsetToUse; + if (offset == null || KinesisSupervisor.NOT_SET.equals(offset)) { + // this should probably check if will start processing earliest or latest rather than assuming earliest + // if latest we could skip this because latest will not be behind latest so lag is 0. 
+ iteratorType = ShardIteratorType.TRIM_HORIZON.toString(); + offsetToUse = null; + } else { + iteratorType = ShardIteratorType.AT_SEQUENCE_NUMBER.toString(); + offsetToUse = offset; + } + String shardIterator = kinesis.getShardIterator( + streamPartition.getStream(), + streamPartition.getPartitionId(), + iteratorType, + offsetToUse + ).getShardIterator(); + + GetRecordsResult recordsResult = kinesis.getRecords( + new GetRecordsRequest().withShardIterator(shardIterator).withLimit(recordsPerFetch) + ); + + currentLagMillis = recordsResult.getMillisBehindLatest(); + return currentLagMillis; + } + catch (Exception ex) { + // eat it + log.warn( + ex, + "Failed to determine partition lag for partition %s of stream %s", + streamPartition.getPartitionId(), + streamPartition.getStream() + ); + } + } + return currentLagMillis; + } private Runnable getRecordRunnable() { @@ -191,11 +240,14 @@ public class KinesisRecordSupplier implements RecordSupplier recordsResult = kinesis.getRecords(new GetRecordsRequest().withShardIterator( shardIterator).withLimit(recordsPerFetch)); + currentLagMillis = recordsResult.getMillisBehindLatest(); + // list will come back empty if there are no records for (Record kinesisRecord : recordsResult.getRecords()) { final List data; + if (deaggregate) { if (deaggregateHandle == null || getDataHandle == null) { throw new ISE("deaggregateHandle or getDataHandle is null!"); @@ -637,6 +689,27 @@ public class KinesisRecordSupplier implements RecordSupplier this.closed = true; } + // this is only used for tests + @VisibleForTesting + Map getPartitionTimeLag() + { + return partitionResources.entrySet() + .stream() + .collect( + Collectors.toMap(k -> k.getKey().getPartitionId(), k -> k.getValue().getPartitionTimeLag()) + ); + } + + public Map getPartitionTimeLag(Map currentOffsets) + { + Map partitionLag = Maps.newHashMapWithExpectedSize(currentOffsets.size()); + for (Map.Entry, PartitionResource> partition : partitionResources.entrySet()) { + final String partitionId = partition.getKey().getPartitionId(); + partitionLag.put(partitionId, partition.getValue().getPartitionTimeLag(currentOffsets.get(partitionId))); + } + return partitionLag; + } + private void seekInternal(StreamPartition partition, String sequenceNumber, ShardIteratorType iteratorEnum) { PartitionResource resource = partitionResources.get(partition); diff --git a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java index f15006750b3..13f94a5d345 100644 --- a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java +++ b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java @@ -64,7 +64,6 @@ import java.util.List; import java.util.Map; import java.util.Set; import java.util.TreeMap; -import java.util.concurrent.ScheduledExecutorService; /** * Supervisor responsible for managing the KinesisIndexTask for a single dataSource. At a high level, the class accepts a @@ -73,8 +72,6 @@ import java.util.concurrent.ScheduledExecutorService; * and the list of running indexing tasks and ensures that all partitions are being read from and that there are enough * tasks to satisfy the desired number of replicas. As tasks complete, new tasks are queued to process the next range of * Kinesis sequences. - *

- * the Kinesis supervisor does not yet support lag calculations */ public class KinesisSupervisor extends SeekableStreamSupervisor { @@ -85,9 +82,10 @@ public class KinesisSupervisor extends SeekableStreamSupervisor { }; - private static final String NOT_SET = "-1"; + public static final String NOT_SET = "-1"; private final KinesisSupervisorSpec spec; private final AWSCredentialsConfig awsCredentialsConfig; + private volatile Map currentPartitionTimeLag; public KinesisSupervisor( final TaskStorage taskStorage, @@ -114,6 +112,7 @@ public class KinesisSupervisor extends SeekableStreamSupervisor this.spec = spec; this.awsCredentialsConfig = awsCredentialsConfig; + this.currentPartitionTimeLag = null; } @Override @@ -215,12 +214,6 @@ public class KinesisSupervisor extends SeekableStreamSupervisor ); } - @Override - protected void scheduleReporting(ScheduledExecutorService reportingExec) - { - // not yet implemented, see issue #6739 - } - /** * We hash the shard ID string, and then use the first four bytes of the hash as an int % task count */ @@ -277,6 +270,7 @@ public class KinesisSupervisor extends SeekableStreamSupervisor ) { KinesisSupervisorIOConfig ioConfig = spec.getIoConfig(); + Map partitionLag = getTimeLagPerPartition(getHighestCurrentOffsets()); return new KinesisSupervisorReportPayload( spec.getDataSchema().getDataSource(), ioConfig.getStream(), @@ -287,17 +281,26 @@ public class KinesisSupervisor extends SeekableStreamSupervisor stateManager.isHealthy(), stateManager.getSupervisorState().getBasicState(), stateManager.getSupervisorState(), - stateManager.getExceptionEvents() + stateManager.getExceptionEvents(), + includeOffsets ? partitionLag : null, + includeOffsets ? partitionLag.values().stream().mapToLong(x -> Math.max(x, 0)).sum() : null ); } - // not yet supported, will be implemented in the future + // not yet supported, will be implemented in the future maybe? 
need a way to get record count between current + // sequence and latest sequence @Override - protected Map getLagPerPartition(Map currentOffsets) + protected Map getRecordLagPerPartition(Map currentOffsets) { return ImmutableMap.of(); } + @Override + protected Map getTimeLagPerPartition(Map currentOffsets) + { + return ((KinesisRecordSupplier) recordSupplier).getPartitionTimeLag(currentOffsets); + } + @Override protected SeekableStreamDataSourceMetadata createDataSourceMetaDataForReset( String stream, @@ -315,10 +318,24 @@ public class KinesisSupervisor extends SeekableStreamSupervisor @Override protected void updateLatestSequenceFromStream( - RecordSupplier recordSupplier, Set> streamPartitions + RecordSupplier recordSupplier, + Set> streamPartitions ) { - // do nothing + KinesisRecordSupplier supplier = (KinesisRecordSupplier) recordSupplier; + currentPartitionTimeLag = supplier.getPartitionTimeLag(getHighestCurrentOffsets()); + } + + @Override + protected Map getPartitionRecordLag() + { + return null; + } + + @Override + protected Map getPartitionTimeLag() + { + return currentPartitionTimeLag; } @Override @@ -457,5 +474,4 @@ public class KinesisSupervisor extends SeekableStreamSupervisor return new KinesisDataSourceMetadata(newSequences); } - } diff --git a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorReportPayload.java b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorReportPayload.java index 9a4ee86937d..89527a17bcd 100644 --- a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorReportPayload.java +++ b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorReportPayload.java @@ -22,8 +22,9 @@ package org.apache.druid.indexing.kinesis.supervisor; import org.apache.druid.indexing.overlord.supervisor.SupervisorStateManager; import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorReportPayload; -import java.util.Collections; +import javax.annotation.Nullable; import java.util.List; +import java.util.Map; public class KinesisSupervisorReportPayload extends SeekableStreamSupervisorReportPayload { @@ -37,7 +38,9 @@ public class KinesisSupervisorReportPayload extends SeekableStreamSupervisorRepo boolean healthy, SupervisorStateManager.State state, SupervisorStateManager.State detailedState, - List recentErrors + List recentErrors, + @Nullable Map minimumLagMillis, + @Nullable Long aggregateLagMillis ) { super( @@ -46,10 +49,12 @@ public class KinesisSupervisorReportPayload extends SeekableStreamSupervisorRepo partitions, replicas, durationSeconds, - Collections.emptyMap(), - Collections.emptyMap(), null, null, + null, + minimumLagMillis, + aggregateLagMillis, + null, suspended, healthy, state, @@ -74,7 +79,8 @@ public class KinesisSupervisorReportPayload extends SeekableStreamSupervisorRepo ", state=" + getState() + ", detailedState=" + getDetailedState() + ", recentErrors=" + getRecentErrors() + + (getMinimumLagMillis() != null ? ", minimumLagMillis=" + getMinimumLagMillis() : "") + + (getAggregateLagMillis() != null ? 
", aggregateLagMillis=" + getAggregateLagMillis() : "") + '}'; } - } diff --git a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTuningConfig.java b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTuningConfig.java index bc3bbd2314a..8c1a5fa1670 100644 --- a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTuningConfig.java +++ b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTuningConfig.java @@ -39,6 +39,7 @@ public class KinesisSupervisorTuningConfig extends KinesisIndexTaskTuningConfig private final Duration httpTimeout; private final Duration shutdownTimeout; private final Duration repartitionTransitionDuration; + private final Duration offsetFetchPeriod; public static KinesisSupervisorTuningConfig defaultConfig() { @@ -73,6 +74,7 @@ public class KinesisSupervisorTuningConfig extends KinesisIndexTaskTuningConfig null, null, null, + null, null ); } @@ -108,7 +110,8 @@ public class KinesisSupervisorTuningConfig extends KinesisIndexTaskTuningConfig @JsonProperty("maxSavedParseExceptions") @Nullable Integer maxSavedParseExceptions, @JsonProperty("maxRecordsPerPoll") @Nullable Integer maxRecordsPerPoll, @JsonProperty("intermediateHandoffPeriod") Period intermediateHandoffPeriod, - @JsonProperty("repartitionTransitionDuration") Period repartitionTransitionDuration + @JsonProperty("repartitionTransitionDuration") Period repartitionTransitionDuration, + @JsonProperty("offsetFetchPeriod") Period offsetFetchPeriod ) { super( @@ -151,6 +154,10 @@ public class KinesisSupervisorTuningConfig extends KinesisIndexTaskTuningConfig repartitionTransitionDuration, DEFAULT_REPARTITION_TRANSITION_DURATION ); + this.offsetFetchPeriod = SeekableStreamSupervisorTuningConfig.defaultDuration( + offsetFetchPeriod, + DEFAULT_OFFSET_FETCH_PERIOD + ); } @Override @@ -194,6 +201,13 @@ public class KinesisSupervisorTuningConfig extends KinesisIndexTaskTuningConfig return repartitionTransitionDuration; } + @Override + @JsonProperty + public Duration getOffsetFetchPeriod() + { + return offsetFetchPeriod; + } + @Override public String toString() { @@ -261,5 +275,4 @@ public class KinesisSupervisorTuningConfig extends KinesisIndexTaskTuningConfig getIntermediateHandoffPeriod() ); } - } diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java index 7867522a54e..03302c0538a 100644 --- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java +++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java @@ -2034,8 +2034,13 @@ public class KinesisIndexTaskTest extends SeekableStreamIndexTaskTestBase .andReturn(Collections.emptyList()) .anyTimes(); + EasyMock.expect(recordSupplier.getPartitionTimeLag(EasyMock.anyObject())) + .andReturn(null) + .anyTimes(); + replayAll(); + final KinesisIndexTask task1 = createTask( "task1", new KinesisIndexTaskIOConfig( diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTuningConfigTest.java 
b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTuningConfigTest.java index 13cfd74c722..15e56c032df 100644 --- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTuningConfigTest.java +++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTuningConfigTest.java @@ -312,6 +312,7 @@ public class KinesisIndexTaskTuningConfigTest null, null, null, + null, null ); KinesisIndexTaskTuningConfig copy = (KinesisIndexTaskTuningConfig) original.convertToTaskTuningConfig(); diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplierTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplierTest.java index 3bb675e464e..76de9a95139 100644 --- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplierTest.java +++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplierTest.java @@ -48,6 +48,7 @@ import org.junit.Test; import java.nio.ByteBuffer; import java.util.Collections; import java.util.List; +import java.util.Map; import java.util.Set; import java.util.stream.Collectors; @@ -59,6 +60,15 @@ public class KinesisRecordSupplierTest extends EasyMockSupport private static final String SHARD_ID0 = "0"; private static final String SHARD1_ITERATOR = "1"; private static final String SHARD0_ITERATOR = "0"; + + private static final Long SHARD0_LAG_MILLIS = 100L; + private static final Long SHARD1_LAG_MILLIS = 200L; + private static Map SHARDS_LAG_MILLIS = + ImmutableMap.of(SHARD_ID0, SHARD0_LAG_MILLIS, SHARD_ID1, SHARD1_LAG_MILLIS); + private static final List SHARD0_RECORDS = ImmutableList.of( + new Record().withData(jb("2008", "a", "y", "10", "20.0", "1.0")).withSequenceNumber("0"), + new Record().withData(jb("2009", "b", "y", "10", "20.0", "1.0")).withSequenceNumber("1") + ); private static final List SHARD1_RECORDS = ImmutableList.of( new Record().withData(jb("2011", "d", "y", "10", "20.0", "1.0")).withSequenceNumber("0"), new Record().withData(jb("2011", "e", "y", "10", "20.0", "1.0")).withSequenceNumber("1"), @@ -71,10 +81,6 @@ public class KinesisRecordSupplierTest extends EasyMockSupport new Record().withData(jb("2012", "g", "y", "10", "20.0", "1.0")).withSequenceNumber("8"), new Record().withData(jb("2011", "h", "y", "10", "20.0", "1.0")).withSequenceNumber("9") ); - private static final List SHARD0_RECORDS = ImmutableList.of( - new Record().withData(jb("2008", "a", "y", "10", "20.0", "1.0")).withSequenceNumber("0"), - new Record().withData(jb("2009", "b", "y", "10", "20.0", "1.0")).withSequenceNumber("1") - ); private static final List ALL_RECORDS = ImmutableList.builder() .addAll(SHARD0_RECORDS.stream() .map(x -> new OrderedPartitionableRecord<>( @@ -104,6 +110,7 @@ public class KinesisRecordSupplierTest extends EasyMockSupport .toList())) .build(); + private static ByteBuffer jb(String timestamp, String dim1, String dim2, String dimLong, String dimFloat, String met1) { try { @@ -262,6 +269,8 @@ public class KinesisRecordSupplierTest extends EasyMockSupport EasyMock.expect(getRecordsResult1.getRecords()).andReturn(SHARD1_RECORDS).once(); EasyMock.expect(getRecordsResult0.getNextShardIterator()).andReturn(null).anyTimes(); 
EasyMock.expect(getRecordsResult1.getNextShardIterator()).andReturn(null).anyTimes(); + EasyMock.expect(getRecordsResult0.getMillisBehindLatest()).andReturn(SHARD0_LAG_MILLIS).once(); + EasyMock.expect(getRecordsResult1.getMillisBehindLatest()).andReturn(SHARD1_LAG_MILLIS).once(); replayAll(); @@ -299,6 +308,7 @@ public class KinesisRecordSupplierTest extends EasyMockSupport Assert.assertEquals(partitions, recordSupplier.getAssignment()); Assert.assertTrue(polledRecords.containsAll(ALL_RECORDS)); + Assert.assertEquals(SHARDS_LAG_MILLIS, recordSupplier.getPartitionTimeLag()); } @Test @@ -335,6 +345,8 @@ public class KinesisRecordSupplierTest extends EasyMockSupport EasyMock.expect(getRecordsResult1.getRecords()).andReturn(SHARD1_RECORDS.subList(2, SHARD1_RECORDS.size())).once(); EasyMock.expect(getRecordsResult0.getNextShardIterator()).andReturn(null).anyTimes(); EasyMock.expect(getRecordsResult1.getNextShardIterator()).andReturn(null).anyTimes(); + EasyMock.expect(getRecordsResult0.getMillisBehindLatest()).andReturn(SHARD0_LAG_MILLIS).once(); + EasyMock.expect(getRecordsResult1.getMillisBehindLatest()).andReturn(SHARD1_LAG_MILLIS).once(); replayAll(); @@ -374,7 +386,7 @@ public class KinesisRecordSupplierTest extends EasyMockSupport Assert.assertEquals(9, polledRecords.size()); Assert.assertTrue(polledRecords.containsAll(ALL_RECORDS.subList(4, 12))); Assert.assertTrue(polledRecords.containsAll(ALL_RECORDS.subList(1, 2))); - + Assert.assertEquals(SHARDS_LAG_MILLIS, recordSupplier.getPartitionTimeLag()); } @@ -499,6 +511,8 @@ public class KinesisRecordSupplierTest extends EasyMockSupport EasyMock.expect(getRecordsResult0.getRecords()).andReturn(SHARD1_RECORDS.subList(7, SHARD1_RECORDS.size())).once(); EasyMock.expect(getRecordsResult1.getNextShardIterator()).andReturn(null).anyTimes(); EasyMock.expect(getRecordsResult0.getNextShardIterator()).andReturn(null).anyTimes(); + EasyMock.expect(getRecordsResult0.getMillisBehindLatest()).andReturn(SHARD0_LAG_MILLIS).once(); + EasyMock.expect(getRecordsResult1.getMillisBehindLatest()).andReturn(SHARD1_LAG_MILLIS).once(); replayAll(); @@ -534,6 +548,9 @@ public class KinesisRecordSupplierTest extends EasyMockSupport firstRecord ); + // only one partition in this test. first results come from getRecordsResult1, which has SHARD1_LAG_MILLIS + Assert.assertEquals(ImmutableMap.of(SHARD_ID1, SHARD1_LAG_MILLIS), recordSupplier.getPartitionTimeLag()); + recordSupplier.seek(StreamPartition.of(STREAM, SHARD_ID1), "7"); recordSupplier.start(); @@ -541,9 +558,12 @@ public class KinesisRecordSupplierTest extends EasyMockSupport Thread.sleep(100); } + OrderedPartitionableRecord record2 = recordSupplier.poll(POLL_TIMEOUT_MILLIS).get(0); Assert.assertEquals(ALL_RECORDS.get(9), record2); + // only one partition in this test. 
second results come from getRecordsResult0, which has SHARD0_LAG_MILLIS + Assert.assertEquals(ImmutableMap.of(SHARD_ID1, SHARD0_LAG_MILLIS), recordSupplier.getPartitionTimeLag()); verifyAll(); } @@ -581,6 +601,8 @@ public class KinesisRecordSupplierTest extends EasyMockSupport EasyMock.expect(getRecordsResult1.getRecords()).andReturn(SHARD1_RECORDS).once(); EasyMock.expect(getRecordsResult0.getNextShardIterator()).andReturn(null).anyTimes(); EasyMock.expect(getRecordsResult1.getNextShardIterator()).andReturn(null).anyTimes(); + EasyMock.expect(getRecordsResult0.getMillisBehindLatest()).andReturn(SHARD0_LAG_MILLIS).once(); + EasyMock.expect(getRecordsResult1.getMillisBehindLatest()).andReturn(SHARD1_LAG_MILLIS).once(); replayAll(); @@ -618,6 +640,83 @@ public class KinesisRecordSupplierTest extends EasyMockSupport Assert.assertEquals(partitions, recordSupplier.getAssignment()); Assert.assertTrue(polledRecords.containsAll(ALL_RECORDS)); + Assert.assertEquals(SHARDS_LAG_MILLIS, recordSupplier.getPartitionTimeLag()); + } + + + @Test + public void getPartitionTimeLag() throws InterruptedException + { + EasyMock.expect(kinesis.getShardIterator( + EasyMock.anyObject(), + EasyMock.eq(SHARD_ID0), + EasyMock.anyString(), + EasyMock.anyString() + )).andReturn( + getShardIteratorResult0).anyTimes(); + + EasyMock.expect(kinesis.getShardIterator( + EasyMock.anyObject(), + EasyMock.eq(SHARD_ID1), + EasyMock.anyString(), + EasyMock.anyString() + )).andReturn( + getShardIteratorResult1).anyTimes(); + + EasyMock.expect(getShardIteratorResult0.getShardIterator()).andReturn(SHARD0_ITERATOR).anyTimes(); + EasyMock.expect(getShardIteratorResult1.getShardIterator()).andReturn(SHARD1_ITERATOR).anyTimes(); + EasyMock.expect(kinesis.getRecords(generateGetRecordsReq(SHARD0_ITERATOR, recordsPerFetch))) + .andReturn(getRecordsResult0) + .anyTimes(); + EasyMock.expect(kinesis.getRecords(generateGetRecordsReq(SHARD1_ITERATOR, recordsPerFetch))) + .andReturn(getRecordsResult1) + .anyTimes(); + EasyMock.expect(getRecordsResult0.getRecords()).andReturn(SHARD0_RECORDS).once(); + EasyMock.expect(getRecordsResult1.getRecords()).andReturn(SHARD1_RECORDS).once(); + EasyMock.expect(getRecordsResult0.getNextShardIterator()).andReturn(null).anyTimes(); + EasyMock.expect(getRecordsResult1.getNextShardIterator()).andReturn(null).anyTimes(); + EasyMock.expect(getRecordsResult0.getMillisBehindLatest()).andReturn(SHARD0_LAG_MILLIS).once(); + EasyMock.expect(getRecordsResult1.getMillisBehindLatest()).andReturn(SHARD1_LAG_MILLIS).once(); + + replayAll(); + + Set> partitions = ImmutableSet.of( + StreamPartition.of(STREAM, SHARD_ID0), + StreamPartition.of(STREAM, SHARD_ID1) + ); + + + recordSupplier = new KinesisRecordSupplier( + kinesis, + recordsPerFetch, + 0, + 2, + true, + 100, + 5000, + 5000, + 60000, + 100 + ); + + recordSupplier.assign(partitions); + recordSupplier.seekToEarliest(partitions); + recordSupplier.start(); + + for (int i = 0; i < 10 && recordSupplier.bufferSize() < 12; i++) { + Thread.sleep(100); + } + + Map offsts = ImmutableMap.of( + SHARD_ID1, SHARD1_RECORDS.get(0).getSequenceNumber(), + SHARD_ID0, SHARD0_RECORDS.get(0).getSequenceNumber() + ); + Map timeLag = recordSupplier.getPartitionTimeLag(offsts); + + verifyAll(); + + Assert.assertEquals(partitions, recordSupplier.getAssignment()); + Assert.assertEquals(SHARDS_LAG_MILLIS, timeLag); } /** @@ -637,4 +736,5 @@ public class KinesisRecordSupplierTest extends EasyMockSupport return retVal; } } + } diff --git 
a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java index ad267e8a1ea..94ca6bf693b 100644 --- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java +++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java @@ -128,7 +128,7 @@ public class KinesisSupervisorTest extends EasyMockSupport private static final StreamPartition SHARD0_PARTITION = StreamPartition.of(STREAM, SHARD_ID0); private static final StreamPartition SHARD1_PARTITION = StreamPartition.of(STREAM, SHARD_ID1); private static final StreamPartition SHARD2_PARTITION = StreamPartition.of(STREAM, SHARD_ID2); - + private static final Map TIME_LAG = ImmutableMap.of(SHARD_ID1, 9000L, SHARD_ID0, 1234L); private static DataSchema dataSchema; private KinesisRecordSupplier supervisorRecordSupplier; @@ -198,6 +198,7 @@ public class KinesisSupervisorTest extends EasyMockSupport null, null, null, + null, null ); rowIngestionMetersFactory = new TestUtils().getRowIngestionMetersFactory(); @@ -230,6 +231,9 @@ public class KinesisSupervisorTest extends EasyMockSupport EasyMock.expect(supervisorRecordSupplier.getEarliestSequenceNumber(EasyMock.anyObject())).andReturn("0").anyTimes(); supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString()); EasyMock.expectLastCall().anyTimes(); + EasyMock.expect(supervisorRecordSupplier.getPartitionTimeLag(EasyMock.anyObject())) + .andReturn(TIME_LAG) + .atLeastOnce(); Capture captured = Capture.newInstance(); EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes(); @@ -297,6 +301,9 @@ public class KinesisSupervisorTest extends EasyMockSupport EasyMock.expect(supervisorRecordSupplier.getEarliestSequenceNumber(EasyMock.anyObject())).andReturn("0").anyTimes(); supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString()); EasyMock.expectLastCall().anyTimes(); + EasyMock.expect(supervisorRecordSupplier.getPartitionTimeLag(EasyMock.anyObject())) + .andReturn(TIME_LAG) + .atLeastOnce(); Capture captured = Capture.newInstance(CaptureType.ALL); EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes(); @@ -356,6 +363,9 @@ public class KinesisSupervisorTest extends EasyMockSupport EasyMock.expect(supervisorRecordSupplier.getEarliestSequenceNumber(EasyMock.anyObject())).andReturn("0").anyTimes(); supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString()); EasyMock.expectLastCall().anyTimes(); + EasyMock.expect(supervisorRecordSupplier.getPartitionTimeLag(EasyMock.anyObject())) + .andReturn(TIME_LAG) + .atLeastOnce(); Capture captured = Capture.newInstance(CaptureType.ALL); EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes(); @@ -433,6 +443,9 @@ public class KinesisSupervisorTest extends EasyMockSupport EasyMock.expect(supervisorRecordSupplier.getEarliestSequenceNumber(EasyMock.anyObject())).andReturn("0").anyTimes(); supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString()); EasyMock.expectLastCall().anyTimes(); + EasyMock.expect(supervisorRecordSupplier.getPartitionTimeLag(EasyMock.anyObject())) + .andReturn(TIME_LAG) + .atLeastOnce(); Capture captured = Capture.newInstance(CaptureType.ALL); 
EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes(); @@ -485,6 +498,9 @@ public class KinesisSupervisorTest extends EasyMockSupport EasyMock.expect(supervisorRecordSupplier.getEarliestSequenceNumber(EasyMock.anyObject())).andReturn("0").anyTimes(); supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString()); EasyMock.expectLastCall().anyTimes(); + EasyMock.expect(supervisorRecordSupplier.getPartitionTimeLag(EasyMock.anyObject())) + .andReturn(TIME_LAG) + .atLeastOnce(); Capture captured = Capture.newInstance(CaptureType.ALL); EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes(); @@ -544,6 +560,9 @@ public class KinesisSupervisorTest extends EasyMockSupport EasyMock.expect(supervisorRecordSupplier.getLatestSequenceNumber(EasyMock.anyObject())).andReturn("100").anyTimes(); supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString()); EasyMock.expectLastCall().anyTimes(); + EasyMock.expect(supervisorRecordSupplier.getPartitionTimeLag(EasyMock.anyObject())) + .andReturn(TIME_LAG) + .atLeastOnce(); Capture captured = Capture.newInstance(); EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes(); @@ -641,6 +660,9 @@ public class KinesisSupervisorTest extends EasyMockSupport EasyMock.expect(supervisorRecordSupplier.getLatestSequenceNumber(EasyMock.anyObject())).andReturn("100").anyTimes(); supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString()); EasyMock.expectLastCall().anyTimes(); + EasyMock.expect(supervisorRecordSupplier.getPartitionTimeLag(EasyMock.anyObject())) + .andReturn(TIME_LAG) + .atLeastOnce(); // non KinesisIndexTask (don't kill) Task id2 = new RealtimeIndexTask( @@ -707,6 +729,9 @@ public class KinesisSupervisorTest extends EasyMockSupport EasyMock.expect(supervisorRecordSupplier.getLatestSequenceNumber(EasyMock.anyObject())).andReturn("100").anyTimes(); supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString()); EasyMock.expectLastCall().anyTimes(); + EasyMock.expect(supervisorRecordSupplier.getPartitionTimeLag(EasyMock.anyObject())) + .andReturn(TIME_LAG) + .atLeastOnce(); Task id1 = createKinesisIndexTask( "id1", @@ -819,7 +844,9 @@ public class KinesisSupervisorTest extends EasyMockSupport EasyMock.expect(supervisorRecordSupplier.getLatestSequenceNumber(EasyMock.anyObject())).andReturn("100").anyTimes(); supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString()); EasyMock.expectLastCall().anyTimes(); - + EasyMock.expect(supervisorRecordSupplier.getPartitionTimeLag(EasyMock.anyObject())) + .andReturn(TIME_LAG) + .atLeastOnce(); Capture captured = Capture.newInstance(CaptureType.ALL); EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes(); @@ -931,6 +958,9 @@ public class KinesisSupervisorTest extends EasyMockSupport EasyMock.expect(supervisorRecordSupplier.getLatestSequenceNumber(EasyMock.anyObject())).andReturn("100").anyTimes(); supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString()); EasyMock.expectLastCall().anyTimes(); + EasyMock.expect(supervisorRecordSupplier.getPartitionTimeLag(EasyMock.anyObject())) + .andReturn(TIME_LAG) + .atLeastOnce(); DateTime now = DateTimes.nowUtc(); DateTime maxi = now.plusMinutes(60); @@ -1068,6 +1098,9 @@ public class KinesisSupervisorTest extends EasyMockSupport EasyMock.expect(supervisorRecordSupplier.getLatestSequenceNumber(EasyMock.anyObject())).andReturn("100").anyTimes(); 
supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString()); EasyMock.expectLastCall().anyTimes(); + EasyMock.expect(supervisorRecordSupplier.getPartitionTimeLag(EasyMock.anyObject())) + .andReturn(TIME_LAG) + .atLeastOnce(); Capture captured = Capture.newInstance(CaptureType.ALL); EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes(); @@ -1194,6 +1227,9 @@ public class KinesisSupervisorTest extends EasyMockSupport EasyMock.expect(supervisorRecordSupplier.getLatestSequenceNumber(SHARD0_PARTITION)).andReturn("1").anyTimes(); supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString()); EasyMock.expectLastCall().anyTimes(); + EasyMock.expect(supervisorRecordSupplier.getPartitionTimeLag(EasyMock.anyObject())) + .andReturn(TIME_LAG) + .atLeastOnce(); final Capture firstTasks = Capture.newInstance(CaptureType.ALL); EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes(); @@ -1305,6 +1341,7 @@ public class KinesisSupervisorTest extends EasyMockSupport public void testDiscoverExistingPublishingTask() throws Exception { final TaskLocation location = new TaskLocation("testHost", 1234, -1); + final Map timeLag = ImmutableMap.of(SHARD_ID1, 0L, SHARD_ID0, 20000000L); supervisor = getTestableSupervisor(1, 1, true, "PT1H", null, null); @@ -1323,6 +1360,9 @@ public class KinesisSupervisorTest extends EasyMockSupport EasyMock.expect(supervisorRecordSupplier.getLatestSequenceNumber(SHARD0_PARTITION)).andReturn("1").anyTimes(); supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString()); EasyMock.expectLastCall().anyTimes(); + EasyMock.expect(supervisorRecordSupplier.getPartitionTimeLag(EasyMock.anyObject())) + .andReturn(timeLag) + .atLeastOnce(); Task task = createKinesisIndexTask( "id1", @@ -1394,7 +1434,7 @@ public class KinesisSupervisorTest extends EasyMockSupport supervisor.start(); supervisor.runInternal(); - supervisor.updateCurrentAndLatestOffsets().run(); + supervisor.updateCurrentAndLatestOffsets(); SupervisorReport report = supervisor.getStatus(); verifyAll(); @@ -1412,6 +1452,9 @@ public class KinesisSupervisorTest extends EasyMockSupport Assert.assertEquals(SupervisorStateManager.BasicState.RUNNING, payload.getDetailedState()); Assert.assertEquals(0, payload.getRecentErrors().size()); + Assert.assertEquals(timeLag, payload.getMinimumLagMillis()); + Assert.assertEquals(20000000L, (long) payload.getAggregateLagMillis()); + TaskReportData publishingReport = payload.getPublishingTasks().get(0); Assert.assertEquals("id1", publishingReport.getId()); @@ -1463,6 +1506,7 @@ public class KinesisSupervisorTest extends EasyMockSupport public void testDiscoverExistingPublishingTaskWithDifferentPartitionAllocation() throws Exception { final TaskLocation location = new TaskLocation("testHost", 1234, -1); + final Map timeLag = ImmutableMap.of(SHARD_ID1, 9000L, SHARD_ID0, 1234L); supervisor = getTestableSupervisor(1, 1, true, "PT1H", null, null); supervisorRecordSupplier.assign(EasyMock.anyObject()); @@ -1480,6 +1524,9 @@ public class KinesisSupervisorTest extends EasyMockSupport EasyMock.expect(supervisorRecordSupplier.getLatestSequenceNumber(SHARD0_PARTITION)).andReturn("1").anyTimes(); supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString()); EasyMock.expectLastCall().anyTimes(); + EasyMock.expect(supervisorRecordSupplier.getPartitionTimeLag(EasyMock.anyObject())) + .andReturn(timeLag) + .atLeastOnce(); Task task = createKinesisIndexTask( "id1", @@ -1540,7 +1587,7 @@ public class 
KinesisSupervisorTest extends EasyMockSupport supervisor.start(); supervisor.runInternal(); - supervisor.updateCurrentAndLatestOffsets().run(); + supervisor.updateCurrentAndLatestOffsets(); SupervisorReport report = supervisor.getStatus(); verifyAll(); @@ -1557,6 +1604,9 @@ public class KinesisSupervisorTest extends EasyMockSupport Assert.assertEquals(1, payload.getPublishingTasks().size()); Assert.assertEquals(SupervisorStateManager.BasicState.RUNNING, payload.getDetailedState()); Assert.assertEquals(0, payload.getRecentErrors().size()); + Assert.assertEquals(timeLag, payload.getMinimumLagMillis()); + Assert.assertEquals(9000L + 1234L, (long) payload.getAggregateLagMillis()); + TaskReportData publishingReport = payload.getPublishingTasks().get(0); @@ -1611,6 +1661,7 @@ public class KinesisSupervisorTest extends EasyMockSupport final TaskLocation location1 = new TaskLocation("testHost", 1234, -1); final TaskLocation location2 = new TaskLocation("testHost2", 145, -1); final DateTime startTime = DateTimes.nowUtc(); + final Map timeLag = ImmutableMap.of(SHARD_ID0, 100L, SHARD_ID1, 200L); supervisor = getTestableSupervisor(1, 1, true, "PT1H", null, null); @@ -1629,6 +1680,9 @@ public class KinesisSupervisorTest extends EasyMockSupport EasyMock.expect(supervisorRecordSupplier.getLatestSequenceNumber(SHARD0_PARTITION)).andReturn("1").anyTimes(); supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString()); EasyMock.expectLastCall().anyTimes(); + EasyMock.expect(supervisorRecordSupplier.getPartitionTimeLag(EasyMock.anyObject())) + .andReturn(timeLag) + .atLeastOnce(); Task id1 = createKinesisIndexTask( "id1", DATASOURCE, @@ -1733,7 +1787,7 @@ public class KinesisSupervisorTest extends EasyMockSupport supervisor.start(); supervisor.runInternal(); - supervisor.updateCurrentAndLatestOffsets().run(); + supervisor.updateCurrentAndLatestOffsets(); SupervisorReport report = supervisor.getStatus(); verifyAll(); @@ -1768,6 +1822,7 @@ public class KinesisSupervisorTest extends EasyMockSupport SHARD_ID0, "1" ), activeReport.getCurrentOffsets()); + Assert.assertEquals(timeLag, activeReport.getLagMillis()); Assert.assertEquals("id1", publishingReport.getId()); Assert.assertEquals(ImmutableMap.of( @@ -1803,6 +1858,9 @@ public class KinesisSupervisorTest extends EasyMockSupport EasyMock.expect(supervisorRecordSupplier.getLatestSequenceNumber(SHARD0_PARTITION)).andReturn("1").anyTimes(); supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString()); EasyMock.expectLastCall().anyTimes(); + EasyMock.expect(supervisorRecordSupplier.getPartitionTimeLag(EasyMock.anyObject())) + .andReturn(TIME_LAG) + .atLeastOnce(); Capture captured = Capture.newInstance(CaptureType.ALL); EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes(); @@ -1885,6 +1943,9 @@ public class KinesisSupervisorTest extends EasyMockSupport EasyMock.expect(supervisorRecordSupplier.getLatestSequenceNumber(SHARD0_PARTITION)).andReturn("1").anyTimes(); supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString()); EasyMock.expectLastCall().anyTimes(); + EasyMock.expect(supervisorRecordSupplier.getPartitionTimeLag(EasyMock.anyObject())) + .andReturn(TIME_LAG) + .atLeastOnce(); Capture captured = Capture.newInstance(CaptureType.ALL); EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes(); @@ -1994,6 +2055,10 @@ public class KinesisSupervisorTest extends EasyMockSupport supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString()); 
EasyMock.expectLastCall().anyTimes(); + EasyMock.expect(supervisorRecordSupplier.getPartitionTimeLag(EasyMock.anyObject())) + .andReturn(TIME_LAG) + .atLeastOnce(); + Capture captured = Capture.newInstance(CaptureType.ALL); EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes(); EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes(); @@ -2141,6 +2206,9 @@ public class KinesisSupervisorTest extends EasyMockSupport EasyMock.expect(supervisorRecordSupplier.getLatestSequenceNumber(SHARD0_PARTITION)).andReturn("1").anyTimes(); supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString()); EasyMock.expectLastCall().anyTimes(); + EasyMock.expect(supervisorRecordSupplier.getPartitionTimeLag(EasyMock.anyObject())) + .andReturn(TIME_LAG) + .atLeastOnce(); Task id1 = createKinesisIndexTask( "id1", @@ -2549,6 +2617,9 @@ public class KinesisSupervisorTest extends EasyMockSupport EasyMock.expect(supervisorRecordSupplier.getLatestSequenceNumber(SHARD0_PARTITION)).andReturn("1").anyTimes(); supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString()); EasyMock.expectLastCall().anyTimes(); + EasyMock.expect(supervisorRecordSupplier.getPartitionTimeLag(EasyMock.anyObject())) + .andReturn(TIME_LAG) + .atLeastOnce(); Task id1 = createKinesisIndexTask( "id1", @@ -2775,6 +2846,10 @@ public class KinesisSupervisorTest extends EasyMockSupport EasyMock.expect(supervisorRecordSupplier.getLatestSequenceNumber(SHARD0_PARTITION)).andReturn("1").anyTimes(); supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString()); EasyMock.expectLastCall().anyTimes(); + EasyMock.expect(supervisorRecordSupplier.getPartitionTimeLag(EasyMock.anyObject())) + .andReturn(TIME_LAG) + .atLeastOnce(); + EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes(); EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes(); EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of(id1, id2, id3)).anyTimes(); @@ -2929,6 +3004,9 @@ public class KinesisSupervisorTest extends EasyMockSupport EasyMock.expect(supervisorRecordSupplier.getLatestSequenceNumber(SHARD0_PARTITION)).andReturn("1").anyTimes(); supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString()); EasyMock.expectLastCall().anyTimes(); + EasyMock.expect(supervisorRecordSupplier.getPartitionTimeLag(EasyMock.anyObject())) + .andReturn(TIME_LAG) + .atLeastOnce(); EasyMock.expect(taskRunner.getRunningTasks()).andReturn(workItems).anyTimes(); EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes(); @@ -3182,7 +3260,9 @@ public class KinesisSupervisorTest extends EasyMockSupport EasyMock.expect(supervisorRecordSupplier.getLatestSequenceNumber(SHARD0_PARTITION)).andReturn("1").anyTimes(); supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString()); EasyMock.expectLastCall().anyTimes(); - + EasyMock.expect(supervisorRecordSupplier.getPartitionTimeLag(EasyMock.anyObject())) + .andReturn(TIME_LAG) + .atLeastOnce(); Task id1 = createKinesisIndexTask( "id1", @@ -3431,6 +3511,9 @@ public class KinesisSupervisorTest extends EasyMockSupport EasyMock.expect(supervisorRecordSupplier.getLatestSequenceNumber(EasyMock.anyObject())).andReturn("100").anyTimes(); supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString()); EasyMock.expectLastCall().anyTimes(); + 
EasyMock.expect(supervisorRecordSupplier.getPartitionTimeLag(EasyMock.anyObject())) + .andReturn(TIME_LAG) + .atLeastOnce(); Task task = createKinesisIndexTask( "id2", @@ -3527,6 +3610,9 @@ public class KinesisSupervisorTest extends EasyMockSupport EasyMock.expect(supervisorRecordSupplier.getLatestSequenceNumber(EasyMock.anyObject())).andReturn("100").anyTimes(); supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString()); EasyMock.expectLastCall().anyTimes(); + EasyMock.expect(supervisorRecordSupplier.getPartitionTimeLag(EasyMock.anyObject())) + .andReturn(TIME_LAG) + .atLeastOnce(); Task task = createKinesisIndexTask( "id1", @@ -3643,6 +3729,7 @@ public class KinesisSupervisorTest extends EasyMockSupport null, 42, // This property is different from tuningConfig null, + null, null ); @@ -3768,6 +3855,9 @@ public class KinesisSupervisorTest extends EasyMockSupport EasyMock.expect(supervisorRecordSupplier.getLatestSequenceNumber(EasyMock.anyObject())).andReturn("100").anyTimes(); supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString()); EasyMock.expectLastCall().anyTimes(); + EasyMock.expect(supervisorRecordSupplier.getPartitionTimeLag(EasyMock.anyObject())) + .andReturn(TIME_LAG) + .atLeastOnce(); Capture captured = Capture.newInstance(CaptureType.ALL); EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes(); @@ -3876,6 +3966,9 @@ public class KinesisSupervisorTest extends EasyMockSupport supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString()); EasyMock.expectLastCall().anyTimes(); + EasyMock.expect(supervisorRecordSupplier.getPartitionTimeLag(EasyMock.anyObject())) + .andReturn(TIME_LAG) + .atLeastOnce(); Capture postSplitCaptured = Capture.newInstance(CaptureType.ALL); @@ -4051,6 +4144,9 @@ public class KinesisSupervisorTest extends EasyMockSupport supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString()); EasyMock.expectLastCall().anyTimes(); + EasyMock.expect(supervisorRecordSupplier.getPartitionTimeLag(EasyMock.anyObject())) + .andReturn(TIME_LAG) + .atLeastOnce(); Capture postSplitCaptured = Capture.newInstance(CaptureType.ALL); @@ -4214,6 +4310,9 @@ public class KinesisSupervisorTest extends EasyMockSupport EasyMock.expect(supervisorRecordSupplier.getLatestSequenceNumber(EasyMock.anyObject())).andReturn("100").anyTimes(); supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString()); EasyMock.expectLastCall().anyTimes(); + EasyMock.expect(supervisorRecordSupplier.getPartitionTimeLag(EasyMock.anyObject())) + .andReturn(TIME_LAG) + .atLeastOnce(); Capture captured = Capture.newInstance(CaptureType.ALL); EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes(); @@ -4333,6 +4432,9 @@ public class KinesisSupervisorTest extends EasyMockSupport supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString()); EasyMock.expectLastCall().anyTimes(); + EasyMock.expect(supervisorRecordSupplier.getPartitionTimeLag(EasyMock.anyObject())) + .andReturn(TIME_LAG) + .atLeastOnce(); Capture postMergeCaptured = Capture.newInstance(CaptureType.ALL); @@ -4487,6 +4589,9 @@ public class KinesisSupervisorTest extends EasyMockSupport supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString()); EasyMock.expectLastCall().anyTimes(); + EasyMock.expect(supervisorRecordSupplier.getPartitionTimeLag(EasyMock.anyObject())) + .andReturn(TIME_LAG) + .atLeastOnce(); Capture postSplitCaptured = Capture.newInstance(CaptureType.ALL); @@ -4694,6 +4799,7 @@ public 
class KinesisSupervisorTest extends EasyMockSupport null, null, null, + null, null ); diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java index 95e08029f9f..948b687ea56 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java @@ -77,6 +77,8 @@ import org.apache.druid.java.util.common.RetryUtils; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.concurrent.Execs; import org.apache.druid.java.util.emitter.EmittingLogger; +import org.apache.druid.java.util.emitter.service.ServiceEmitter; +import org.apache.druid.java.util.emitter.service.ServiceMetricEvent; import org.apache.druid.metadata.EntryExistsException; import org.apache.druid.segment.indexing.DataSchema; import org.joda.time.DateTime; @@ -106,6 +108,7 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.BiConsumer; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -128,6 +131,10 @@ public abstract class SeekableStreamSupervisor recordSupplier; + protected volatile RecordSupplier recordSupplier; private volatile boolean started = false; private volatile boolean stopped = false; private volatile boolean lifecycleStarted = false; + private final ServiceEmitter emitter; public SeekableStreamSupervisor( final String supervisorId, @@ -502,6 +510,7 @@ public abstract class SeekableStreamSupervisor { - try { - updateCurrentOffsets(); - updateLatestOffsetsFromStream(); - sequenceLastUpdated = DateTimes.nowUtc(); - } - catch (Exception e) { - log.warn(e, "Exception while getting current/latest sequences"); - } - }; + try { + updateCurrentOffsets(); + updateLatestOffsetsFromStream(); + sequenceLastUpdated = DateTimes.nowUtc(); + } + catch (Exception e) { + log.warn(e, "Exception while getting current/latest sequences"); + } } private void updateCurrentOffsets() throws InterruptedException, ExecutionException, TimeoutException @@ -3158,18 +3167,35 @@ public abstract class SeekableStreamSupervisor> partitions ); + /** + * Gets 'lag' of currently processed offset behind latest offset as a measure of difference between offsets. + */ + @Nullable + protected abstract Map getPartitionRecordLag(); + + /** + * Gets 'lag' of currently processed offset behind latest offset as a measure of the difference in time inserted. + */ + @Nullable + protected abstract Map getPartitionTimeLag(); + protected Map getHighestCurrentOffsets() { - return activelyReadingTaskGroups - .values() - .stream() - .flatMap(taskGroup -> taskGroup.tasks.entrySet().stream()) - .flatMap(taskData -> taskData.getValue().currentSequences.entrySet().stream()) - .collect(Collectors.toMap( - Entry::getKey, - Entry::getValue, - (v1, v2) -> makeSequenceNumber(v1).compareTo(makeSequenceNumber(v2)) > 0 ? 
v1 : v2 - )); + if (!spec.isSuspended() || activelyReadingTaskGroups.size() > 0 || pendingCompletionTaskGroups.size() > 0) { + return activelyReadingTaskGroups + .values() + .stream() + .flatMap(taskGroup -> taskGroup.tasks.entrySet().stream()) + .flatMap(taskData -> taskData.getValue().currentSequences.entrySet().stream()) + .collect(Collectors.toMap( + Entry::getKey, + Entry::getValue, + (v1, v2) -> makeSequenceNumber(v1).compareTo(makeSequenceNumber(v2)) > 0 ? v1 : v2 + )); + } else { + // if supervisor is suspended, no tasks are likely running so use offsets in metadata, if exist + return getOffsetsFromMetadataStorage(); + } } private OrderedSequenceNumber makeSequenceNumber(SequenceOffsetType seq) @@ -3352,18 +3378,42 @@ public abstract class SeekableStreamSupervisor lag */ - protected abstract Map getLagPerPartition(Map currentOffsets); + protected abstract Map getRecordLagPerPartition( + Map currentOffsets + ); + + protected abstract Map getTimeLagPerPartition( + Map currentOffsets + ); /** * returns an instance of a specific Kinesis/Kafka recordSupplier @@ -3397,6 +3447,61 @@ public abstract class SeekableStreamSupervisor partitionRecordLags = getPartitionRecordLag(); + Map partitionTimeLags = getPartitionTimeLag(); + + if (partitionRecordLags == null && partitionTimeLags == null) { + throw new ISE("Latest offsets have not been fetched"); + } + final String type = spec.getType(); + + BiConsumer, String> emitFn = (partitionLags, suffix) -> { + if (partitionLags == null) { + return; + } + + long maxLag = 0, totalLag = 0, avgLag; + for (long lag : partitionLags.values()) { + if (lag > maxLag) { + maxLag = lag; + } + totalLag += lag; + } + avgLag = partitionLags.size() == 0 ? 0 : totalLag / partitionLags.size(); + + emitter.emit( + ServiceMetricEvent.builder() + .setDimension("dataSource", dataSource) + .build(StringUtils.format("ingest/%s/lag%s", type, suffix), totalLag) + ); + emitter.emit( + ServiceMetricEvent.builder() + .setDimension("dataSource", dataSource) + .build(StringUtils.format("ingest/%s/maxLag%s", type, suffix), maxLag) + ); + emitter.emit( + ServiceMetricEvent.builder() + .setDimension("dataSource", dataSource) + .build(StringUtils.format("ingest/%s/avgLag%s", type, suffix), avgLag) + ); + }; + + // this should probably really be /count or /records or something.. but keeping like this for backwards compat + emitFn.accept(partitionRecordLags, ""); + emitFn.accept(partitionTimeLags, "/time"); + } + catch (Exception e) { + log.warn(e, "Unable to compute lag"); + } + } + /** * a special sequence number that is used to indicate that the sequence offset * for a particular partition has not yet been calculated by the supervisor. 
When diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorReportPayload.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorReportPayload.java index 7436488cf11..7d69c0c8ee7 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorReportPayload.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorReportPayload.java @@ -41,8 +41,10 @@ public abstract class SeekableStreamSupervisorReportPayload activeTasks; private final List publishingTasks; private final Map latestOffsets; - private final Map minimumLag; + private final Map minimumLag; private final Long aggregateLag; + private final Map minimumLagMillis; + private final Long aggregateLagMillis; private final DateTime offsetsLastUpdated; private final boolean suspended; private final boolean healthy; @@ -57,8 +59,10 @@ public abstract class SeekableStreamSupervisorReportPayload latestOffsets, - @Nullable Map minimumLag, + @Nullable Map minimumLag, @Nullable Long aggregateLag, + @Nullable Map minimumLagMillis, + @Nullable Long aggregateLagMillis, @Nullable DateTime offsetsLastUpdated, boolean suspended, boolean healthy, @@ -77,6 +81,8 @@ public abstract class SeekableStreamSupervisorReportPayload getMinimumLag() + public Map getMinimumLag() { return minimumLag; } @@ -168,6 +174,19 @@ public abstract class SeekableStreamSupervisorReportPayload getMinimumLagMillis() + { + return minimumLagMillis; + } + @JsonProperty public DateTime getOffsetsLastUpdated() { diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorTuningConfig.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorTuningConfig.java index 381a5639644..733f1258ccb 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorTuningConfig.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorTuningConfig.java @@ -26,7 +26,7 @@ import org.joda.time.Period; public interface SeekableStreamSupervisorTuningConfig { - + String DEFAULT_OFFSET_FETCH_PERIOD = "PT30S"; int DEFAULT_CHAT_RETRIES = 8; String DEFAULT_HTTP_TIMEOUT = "PT10S"; String DEFAULT_SHUTDOWN_TIMEOUT = "PT80S"; @@ -55,5 +55,8 @@ public interface SeekableStreamSupervisorTuningConfig @JsonProperty Duration getRepartitionTransitionDuration(); + @JsonProperty + Duration getOffsetFetchPeriod(); + SeekableStreamIndexTaskTuningConfig convertToTaskTuningConfig(); } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/TaskReportData.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/TaskReportData.java index 0b18e58ee67..3edf7124fd2 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/TaskReportData.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/TaskReportData.java @@ -34,7 +34,8 @@ public class TaskReportData private final Long remainingSeconds; private final TaskType type; private final Map currentOffsets; - private final Map lag; + private final Map lag; + private final Map lagMillis; public TaskReportData( String id, @@ -43,7 +44,8 @@ public class TaskReportData 
@Nullable DateTime startTime, Long remainingSeconds, TaskType type, - @Nullable Map lag + @Nullable Map lag, + @Nullable Map lagMillis ) { this.id = id; @@ -53,6 +55,7 @@ public class TaskReportData this.remainingSeconds = remainingSeconds; this.type = type; this.lag = lag; + this.lagMillis = lagMillis; } @JsonProperty @@ -95,11 +98,18 @@ public class TaskReportData @JsonProperty @JsonInclude(JsonInclude.Include.NON_NULL) - public Map getLag() + public Map getLag() { return lag; } + @JsonProperty + @JsonInclude(JsonInclude.Include.NON_NULL) + public Map getLagMillis() + { + return lagMillis; + } + @Override public String toString() { @@ -110,6 +120,7 @@ public class TaskReportData ", startTime=" + startTime + ", remainingSeconds=" + remainingSeconds + (lag != null ? ", lag=" + lag : "") + + (lagMillis != null ? ", lagMillis=" + lagMillis : "") + '}'; } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java index 57aaec30f06..2e5d74af439 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java @@ -61,12 +61,16 @@ import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervi import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.granularity.Granularities; import org.apache.druid.java.util.common.parsers.JSONPathSpec; +import org.apache.druid.java.util.emitter.core.Event; +import org.apache.druid.metadata.EntryExistsException; import org.apache.druid.query.aggregation.AggregatorFactory; import org.apache.druid.query.aggregation.CountAggregatorFactory; import org.apache.druid.segment.TestHelper; import org.apache.druid.segment.indexing.DataSchema; import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec; import org.apache.druid.segment.realtime.firehose.ChatHandlerProvider; +import org.apache.druid.server.metrics.DruidMonitorSchedulerConfig; +import org.apache.druid.server.metrics.NoopServiceEmitter; import org.apache.druid.server.security.AuthorizerMapper; import org.easymock.EasyMock; import org.easymock.EasyMockSupport; @@ -85,8 +89,10 @@ import java.util.List; import java.util.Map; import java.util.Set; import java.util.TreeMap; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.Executor; import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; public class SeekableStreamSupervisorStateTest extends EasyMockSupport { @@ -110,6 +116,8 @@ public class SeekableStreamSupervisorStateTest extends EasyMockSupport private RowIngestionMetersFactory rowIngestionMetersFactory; private SupervisorStateManagerConfig supervisorConfig; + private TestEmitter emitter; + @Before public void setupTest() { @@ -127,11 +135,14 @@ public class SeekableStreamSupervisorStateTest extends EasyMockSupport supervisorConfig = new SupervisorStateManagerConfig(); + emitter = new TestEmitter(); + EasyMock.expect(spec.getSupervisorStateManagerConfig()).andReturn(supervisorConfig).anyTimes(); EasyMock.expect(spec.getDataSchema()).andReturn(getDataSchema()).anyTimes(); EasyMock.expect(spec.getIoConfig()).andReturn(getIOConfig()).anyTimes(); 
EasyMock.expect(spec.getTuningConfig()).andReturn(getTuningConfig()).anyTimes(); + EasyMock.expect(spec.getEmitter()).andReturn(emitter).anyTimes(); EasyMock.expect(taskClientFactory.build( EasyMock.anyObject(), @@ -552,6 +563,173 @@ public class SeekableStreamSupervisorStateTest extends EasyMockSupport verifyAll(); } + + @Test + public void testEmitBothLag() throws Exception + { + expectEmitterSupervisor(false); + + CountDownLatch latch = new CountDownLatch(1); + TestEmittingTestSeekableStreamSupervisor supervisor = new TestEmittingTestSeekableStreamSupervisor( + latch, + ImmutableMap.of("1", 100L, "2", 250L, "3", 500L), + ImmutableMap.of("1", 10000L, "2", 15000L, "3", 20000L) + ); + + + supervisor.start(); + + Assert.assertTrue(supervisor.stateManager.isHealthy()); + Assert.assertEquals(BasicState.PENDING, supervisor.stateManager.getSupervisorState()); + Assert.assertEquals(BasicState.PENDING, supervisor.stateManager.getSupervisorState().getBasicState()); + Assert.assertTrue(supervisor.stateManager.getExceptionEvents().isEmpty()); + Assert.assertFalse(supervisor.stateManager.isAtLeastOneSuccessfulRun()); + + + latch.await(); + Assert.assertEquals(6, emitter.getEvents().size()); + Assert.assertEquals("ingest/test/lag", emitter.getEvents().get(0).toMap().get("metric")); + Assert.assertEquals(850L, emitter.getEvents().get(0).toMap().get("value")); + Assert.assertEquals("ingest/test/maxLag", emitter.getEvents().get(1).toMap().get("metric")); + Assert.assertEquals(500L, emitter.getEvents().get(1).toMap().get("value")); + Assert.assertEquals("ingest/test/avgLag", emitter.getEvents().get(2).toMap().get("metric")); + Assert.assertEquals(283L, emitter.getEvents().get(2).toMap().get("value")); + Assert.assertEquals("ingest/test/lag/time", emitter.getEvents().get(3).toMap().get("metric")); + Assert.assertEquals(45000L, emitter.getEvents().get(3).toMap().get("value")); + Assert.assertEquals("ingest/test/maxLag/time", emitter.getEvents().get(4).toMap().get("metric")); + Assert.assertEquals(20000L, emitter.getEvents().get(4).toMap().get("value")); + Assert.assertEquals("ingest/test/avgLag/time", emitter.getEvents().get(5).toMap().get("metric")); + Assert.assertEquals(15000L, emitter.getEvents().get(5).toMap().get("value")); + verifyAll(); + } + + @Test + public void testEmitRecordLag() throws Exception + { + expectEmitterSupervisor(false); + + CountDownLatch latch = new CountDownLatch(1); + TestEmittingTestSeekableStreamSupervisor supervisor = new TestEmittingTestSeekableStreamSupervisor( + latch, + ImmutableMap.of("1", 100L, "2", 250L, "3", 500L), + null + ); + + + supervisor.start(); + + Assert.assertTrue(supervisor.stateManager.isHealthy()); + Assert.assertEquals(BasicState.PENDING, supervisor.stateManager.getSupervisorState()); + Assert.assertEquals(BasicState.PENDING, supervisor.stateManager.getSupervisorState().getBasicState()); + Assert.assertTrue(supervisor.stateManager.getExceptionEvents().isEmpty()); + Assert.assertFalse(supervisor.stateManager.isAtLeastOneSuccessfulRun()); + + + latch.await(); + Assert.assertEquals(3, emitter.getEvents().size()); + Assert.assertEquals("ingest/test/lag", emitter.getEvents().get(0).toMap().get("metric")); + Assert.assertEquals(850L, emitter.getEvents().get(0).toMap().get("value")); + Assert.assertEquals("ingest/test/maxLag", emitter.getEvents().get(1).toMap().get("metric")); + Assert.assertEquals(500L, emitter.getEvents().get(1).toMap().get("value")); + Assert.assertEquals("ingest/test/avgLag", emitter.getEvents().get(2).toMap().get("metric")); + 
Assert.assertEquals(283L, emitter.getEvents().get(2).toMap().get("value")); + verifyAll(); + } + + @Test + public void testEmitTimeLag() throws Exception + { + expectEmitterSupervisor(false); + + CountDownLatch latch = new CountDownLatch(1); + TestEmittingTestSeekableStreamSupervisor supervisor = new TestEmittingTestSeekableStreamSupervisor( + latch, + null, + ImmutableMap.of("1", 10000L, "2", 15000L, "3", 20000L) + ); + + + supervisor.start(); + + Assert.assertTrue(supervisor.stateManager.isHealthy()); + Assert.assertEquals(BasicState.PENDING, supervisor.stateManager.getSupervisorState()); + Assert.assertEquals(BasicState.PENDING, supervisor.stateManager.getSupervisorState().getBasicState()); + Assert.assertTrue(supervisor.stateManager.getExceptionEvents().isEmpty()); + Assert.assertFalse(supervisor.stateManager.isAtLeastOneSuccessfulRun()); + + + latch.await(); + Assert.assertEquals(3, emitter.getEvents().size()); + Assert.assertEquals("ingest/test/lag/time", emitter.getEvents().get(0).toMap().get("metric")); + Assert.assertEquals(45000L, emitter.getEvents().get(0).toMap().get("value")); + Assert.assertEquals("ingest/test/maxLag/time", emitter.getEvents().get(1).toMap().get("metric")); + Assert.assertEquals(20000L, emitter.getEvents().get(1).toMap().get("value")); + Assert.assertEquals("ingest/test/avgLag/time", emitter.getEvents().get(2).toMap().get("metric")); + Assert.assertEquals(15000L, emitter.getEvents().get(2).toMap().get("value")); + verifyAll(); + } + + @Test + public void testEmitNoLagWhenSuspended() throws Exception + { + expectEmitterSupervisor(true); + + CountDownLatch latch = new CountDownLatch(1); + TestEmittingTestSeekableStreamSupervisor supervisor = new TestEmittingTestSeekableStreamSupervisor( + latch, + ImmutableMap.of("1", 100L, "2", 250L, "3", 500L), + ImmutableMap.of("1", 10000L, "2", 15000L, "3", 20000L) + ); + + + supervisor.start(); + supervisor.runInternal(); + + Assert.assertTrue(supervisor.stateManager.isHealthy()); + Assert.assertEquals(BasicState.SUSPENDED, supervisor.stateManager.getSupervisorState()); + Assert.assertEquals(BasicState.SUSPENDED, supervisor.stateManager.getSupervisorState().getBasicState()); + Assert.assertTrue(supervisor.stateManager.getExceptionEvents().isEmpty()); + + + latch.await(); + Assert.assertEquals(0, emitter.getEvents().size()); + verifyAll(); + } + + private void expectEmitterSupervisor(boolean suspended) throws EntryExistsException + { + spec = createMock(SeekableStreamSupervisorSpec.class); + EasyMock.expect(spec.getSupervisorStateManagerConfig()).andReturn(supervisorConfig).anyTimes(); + + EasyMock.expect(spec.getDataSchema()).andReturn(getDataSchema()).anyTimes(); + EasyMock.expect(spec.getIoConfig()).andReturn(new SeekableStreamSupervisorIOConfig( + "stream", + new JsonInputFormat(new JSONPathSpec(true, ImmutableList.of()), ImmutableMap.of()), + 1, + 1, + new Period("PT1H"), + new Period("PT1S"), + new Period("PT30S"), + false, + new Period("PT30M"), + null, + null, null + ) + { + }).anyTimes(); + EasyMock.expect(spec.getTuningConfig()).andReturn(getTuningConfig()).anyTimes(); + EasyMock.expect(spec.getEmitter()).andReturn(emitter).anyTimes(); + EasyMock.expect(spec.getMonitorSchedulerConfig()).andReturn(new DruidMonitorSchedulerConfig()).anyTimes(); + EasyMock.expect(spec.isSuspended()).andReturn(suspended).anyTimes(); + EasyMock.expect(spec.getType()).andReturn("test").anyTimes(); + + EasyMock.expect(recordSupplier.getPartitionIds(STREAM)).andReturn(ImmutableSet.of(SHARD_ID)).anyTimes(); + 
EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of()).anyTimes(); + EasyMock.expect(taskQueue.add(EasyMock.anyObject())).andReturn(true).anyTimes(); + + replayAll(); + } + private static DataSchema getDataSchema() { List dimensions = new ArrayList<>(); @@ -635,6 +813,12 @@ public class SeekableStreamSupervisorStateTest extends EasyMockSupport return new Period("PT2M").toStandardDuration(); } + @Override + public Duration getOffsetFetchPeriod() + { + return new Period("PT5M").toStandardDuration(); + } + @Override public SeekableStreamIndexTaskTuningConfig convertToTaskTuningConfig() { @@ -725,9 +909,9 @@ public class SeekableStreamSupervisorStateTest extends EasyMockSupport } } - private class TestSeekableStreamSupervisor extends SeekableStreamSupervisor + private abstract class BaseTestSeekableStreamSupervisor extends SeekableStreamSupervisor { - private TestSeekableStreamSupervisor() + private BaseTestSeekableStreamSupervisor() { super( "testSupervisorId", @@ -756,6 +940,20 @@ public class SeekableStreamSupervisorStateTest extends EasyMockSupport // do nothing } + @Nullable + @Override + protected Map getPartitionRecordLag() + { + return null; + } + + @Nullable + @Override + protected Map getPartitionTimeLag() + { + return null; + } + @Override protected SeekableStreamIndexTaskIOConfig createTaskIoConfig( int groupId, @@ -850,13 +1048,13 @@ public class SeekableStreamSupervisorStateTest extends EasyMockSupport } @Override - protected void scheduleReporting(ScheduledExecutorService reportingExec) + protected Map getRecordLagPerPartition(Map currentOffsets) { - // do nothing + return null; } @Override - protected Map getLagPerPartition(Map currentOffsets) + protected Map getTimeLagPerPartition(Map currentOffsets) { return null; } @@ -864,7 +1062,7 @@ public class SeekableStreamSupervisorStateTest extends EasyMockSupport @Override protected RecordSupplier setupRecordSupplier() { - return recordSupplier; + return SeekableStreamSupervisorStateTest.this.recordSupplier; } @Override @@ -883,6 +1081,8 @@ public class SeekableStreamSupervisorStateTest extends EasyMockSupport null, null, null, + null, + null, false, true, null, @@ -923,4 +1123,80 @@ public class SeekableStreamSupervisorStateTest extends EasyMockSupport return false; } } + + private class TestSeekableStreamSupervisor extends BaseTestSeekableStreamSupervisor + { + @Override + protected void scheduleReporting(ScheduledExecutorService reportingExec) + { + // do nothing + } + } + + private class TestEmittingTestSeekableStreamSupervisor extends BaseTestSeekableStreamSupervisor + { + private final CountDownLatch latch; + private final Map partitionsRecordLag; + private final Map partitionsTimeLag; + + TestEmittingTestSeekableStreamSupervisor( + CountDownLatch latch, + Map partitionsRecordLag, + Map partitionsTimeLag + ) + { + this.latch = latch; + this.partitionsRecordLag = partitionsRecordLag; + this.partitionsTimeLag = partitionsTimeLag; + } + + @Nullable + @Override + protected Map getPartitionRecordLag() + { + return partitionsRecordLag; + } + + @Nullable + @Override + protected Map getPartitionTimeLag() + { + return partitionsTimeLag; + } + + @Override + protected void emitLag() + { + super.emitLag(); + latch.countDown(); + } + + @Override + protected void scheduleReporting(ScheduledExecutorService reportingExec) + { + SeekableStreamSupervisorIOConfig ioConfig = spec.getIoConfig(); + reportingExec.scheduleAtFixedRate( + this::emitLag, + ioConfig.getStartDelay().getMillis(), + 
spec.getMonitorSchedulerConfig().getEmitterPeriod().getMillis(),
+          TimeUnit.MILLISECONDS
+      );
+    }
+  }
+
+  private static class TestEmitter extends NoopServiceEmitter
+  {
+    private final List<Event> events = new ArrayList<>();
+
+    @Override
+    public void emit(Event event)
+    {
+      events.add(event);
+    }
+
+    public List<Event> getEvents()
+    {
+      return events;
+    }
+  }
 }
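
For readers tracing the new metrics, the aggregation that emitLag() applies to each per-partition lag map can be sketched as a small standalone helper. LagSummary and summarize are illustrative names only, not part of this patch; the arithmetic mirrors the emitFn in the patch: total lag is the sum over partitions, max lag is the largest single-partition value, and average lag is the integer division of total by partition count (an empty map yields 0).

    // Illustrative sketch only; class and method names are hypothetical.
    // Mirrors the per-suffix aggregation done by emitLag() for both the record
    // lag map ("" suffix) and the time lag map ("/time" suffix).
    import java.util.Map;

    public final class LagSummary
    {
      public final long total;
      public final long max;
      public final long avg;

      private LagSummary(long total, long max, long avg)
      {
        this.total = total;
        this.max = max;
        this.avg = avg;
      }

      public static LagSummary summarize(Map<String, Long> partitionLags)
      {
        long total = 0;
        long max = 0;
        for (long lag : partitionLags.values()) {
          total += lag;
          if (lag > max) {
            max = lag;
          }
        }
        // integer division, as in emitLag(); guard against an empty partition map
        long avg = partitionLags.isEmpty() ? 0 : total / partitionLags.size();
        return new LagSummary(total, max, avg);
      }
    }

Feeding it the record lags used by testEmitBothLag (100, 250, 500) gives total 850, max 500, avg 283, and the time lags (10000, 15000, 20000) give 45000, 20000, 15000, which is exactly what the test asserts against ingest/test/lag, ingest/test/maxLag, ingest/test/avgLag and their /time counterparts.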
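The metric names themselves are built from the supervisor spec's type plus an optional suffix, as in the patch's StringUtils.format("ingest/%s/lag%s", type, suffix) calls. A minimal sketch of the resulting names, assuming the Kinesis spec type resolves to "kinesis" (the tests above use the type "test", producing ingest/test/lag and friends):

    // Prints the six metric names emitLag() can emit for a "kinesis" spec type:
    // the record-lag family (empty suffix) and the time-lag family ("/time").
    import java.util.Arrays;
    import java.util.List;

    public class LagMetricNames
    {
      public static void main(String[] args)
      {
        String type = "kinesis";
        List<String> suffixes = Arrays.asList("", "/time");
        for (String suffix : suffixes) {
          System.out.println(String.format("ingest/%s/lag%s", type, suffix));
          System.out.println(String.format("ingest/%s/maxLag%s", type, suffix));
          System.out.println(String.format("ingest/%s/avgLag%s", type, suffix));
        }
      }
    }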