add kinesis lag metric (#9509)

* add kinesis lag metric

* fixes

* heh

* do it right this time

* more test

* split out supervisor report lags into lagMillis, remove latest offsets from kinesis supervisor report since always null, review stuffs
This commit is contained in:
Clint Wylie 2020-03-16 21:39:53 -07:00 committed by GitHub
parent 7626be26ca
commit 142742f291
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
17 changed files with 854 additions and 159 deletions

View File

@ -50,14 +50,13 @@ import org.apache.druid.indexing.seekablestream.common.StreamPartition;
import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor;
import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorIOConfig;
import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorReportPayload;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
import org.apache.druid.server.metrics.DruidMonitorSchedulerConfig;
import org.joda.time.DateTime;
import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
@ -65,8 +64,6 @@ import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
/**
@ -85,9 +82,6 @@ public class KafkaSupervisor extends SeekableStreamSupervisor<Integer, Long>
};
private static final EmittingLogger log = new EmittingLogger(KafkaSupervisor.class);
private static final long MINIMUM_GET_OFFSET_PERIOD_MILLIS = 5000;
private static final long INITIAL_GET_OFFSET_DELAY_MILLIS = 15000;
private static final long INITIAL_EMIT_LAG_METRIC_DELAY_MILLIS = 25000;
private static final Long NOT_SET = -1L;
private static final Long END_OF_PARTITION = Long.MAX_VALUE;
@ -132,28 +126,6 @@ public class KafkaSupervisor extends SeekableStreamSupervisor<Integer, Long>
return new KafkaRecordSupplier(spec.getIoConfig().getConsumerProperties(), sortingMapper);
}
@Override
protected void scheduleReporting(ScheduledExecutorService reportingExec)
{
KafkaSupervisorIOConfig ioConfig = spec.getIoConfig();
KafkaSupervisorTuningConfig tuningConfig = spec.getTuningConfig();
reportingExec.scheduleAtFixedRate(
updateCurrentAndLatestOffsets(),
ioConfig.getStartDelay().getMillis() + INITIAL_GET_OFFSET_DELAY_MILLIS, // wait for tasks to start up
Math.max(
tuningConfig.getOffsetFetchPeriod().getMillis(), MINIMUM_GET_OFFSET_PERIOD_MILLIS
),
TimeUnit.MILLISECONDS
);
reportingExec.scheduleAtFixedRate(
emitLag(),
ioConfig.getStartDelay().getMillis() + INITIAL_EMIT_LAG_METRIC_DELAY_MILLIS, // wait for tasks to start up
monitorSchedulerConfig.getEmitterPeriod().getMillis(),
TimeUnit.MILLISECONDS
);
}
@Override
protected int getTaskGroupIdForPartition(Integer partitionId)
{
@ -179,7 +151,7 @@ public class KafkaSupervisor extends SeekableStreamSupervisor<Integer, Long>
)
{
KafkaSupervisorIOConfig ioConfig = spec.getIoConfig();
Map<Integer, Long> partitionLag = getLagPerPartition(getHighestCurrentOffsets());
Map<Integer, Long> partitionLag = getRecordLagPerPartition(getHighestCurrentOffsets());
return new KafkaSupervisorReportPayload(
spec.getDataSchema().getDataSource(),
ioConfig.getTopic(),
@ -267,11 +239,38 @@ public class KafkaSupervisor extends SeekableStreamSupervisor<Integer, Long>
return taskList;
}
@Override
protected Map<Integer, Long> getPartitionRecordLag()
{
Map<Integer, Long> highestCurrentOffsets = getHighestCurrentOffsets();
if (latestSequenceFromStream == null) {
return null;
}
if (!latestSequenceFromStream.keySet().equals(highestCurrentOffsets.keySet())) {
log.warn(
"Lag metric: Kafka partitions %s do not match task partitions %s",
latestSequenceFromStream.keySet(),
highestCurrentOffsets.keySet()
);
}
return getRecordLagPerPartition(highestCurrentOffsets);
}
@Nullable
@Override
protected Map<Integer, Long> getPartitionTimeLag()
{
// time lag not currently support with kafka
return null;
}
@Override
// suppress use of CollectionUtils.mapValues() since the valueMapper function is dependent on map key here
@SuppressWarnings("SSBasedInspection")
protected Map<Integer, Long> getLagPerPartition(Map<Integer, Long> currentOffsets)
protected Map<Integer, Long> getRecordLagPerPartition(Map<Integer, Long> currentOffsets)
{
return currentOffsets
.entrySet()
@ -288,6 +287,12 @@ public class KafkaSupervisor extends SeekableStreamSupervisor<Integer, Long>
);
}
@Override
protected Map<Integer, Long> getTimeLagPerPartition(Map<Integer, Long> currentOffsets)
{
return null;
}
@Override
protected KafkaDataSourceMetadata createDataSourceMetaDataForReset(String topic, Map<Integer, Long> map)
{
@ -300,51 +305,6 @@ public class KafkaSupervisor extends SeekableStreamSupervisor<Integer, Long>
return KafkaSequenceNumber.of(seq);
}
private Runnable emitLag()
{
return () -> {
try {
Map<Integer, Long> highestCurrentOffsets = getHighestCurrentOffsets();
String dataSource = spec.getDataSchema().getDataSource();
if (latestSequenceFromStream == null) {
throw new ISE("Latest offsets from Kafka have not been fetched");
}
if (!latestSequenceFromStream.keySet().equals(highestCurrentOffsets.keySet())) {
log.warn(
"Lag metric: Kafka partitions %s do not match task partitions %s",
latestSequenceFromStream.keySet(),
highestCurrentOffsets.keySet()
);
}
Map<Integer, Long> partitionLags = getLagPerPartition(highestCurrentOffsets);
long maxLag = 0, totalLag = 0, avgLag;
for (long lag : partitionLags.values()) {
if (lag > maxLag) {
maxLag = lag;
}
totalLag += lag;
}
avgLag = partitionLags.size() == 0 ? 0 : totalLag / partitionLags.size();
emitter.emit(
ServiceMetricEvent.builder().setDimension("dataSource", dataSource).build("ingest/kafka/lag", totalLag)
);
emitter.emit(
ServiceMetricEvent.builder().setDimension("dataSource", dataSource).build("ingest/kafka/maxLag", maxLag)
);
emitter.emit(
ServiceMetricEvent.builder().setDimension("dataSource", dataSource).build("ingest/kafka/avgLag", avgLag)
);
}
catch (Exception e) {
log.warn(e, "Unable to compute Kafka lag");
}
};
}
@Override
protected Long getNotSetMarker()
{

View File

@ -55,6 +55,8 @@ public class KafkaSupervisorReportPayload extends SeekableStreamSupervisorReport
latestOffsets,
minimumLag,
aggregateLag,
null,
null,
offsetsLastUpdated,
suspended,
healthy,

View File

@ -34,8 +34,6 @@ import java.io.File;
public class KafkaSupervisorTuningConfig extends KafkaIndexTaskTuningConfig
implements SeekableStreamSupervisorTuningConfig
{
private static final String DEFAULT_OFFSET_FETCH_PERIOD = "PT30S";
private final Integer workerThreads;
private final Integer chatThreads;
private final Long chatRetries;
@ -181,6 +179,7 @@ public class KafkaSupervisorTuningConfig extends KafkaIndexTaskTuningConfig
);
}
@Override
@JsonProperty
public Duration getOffsetFetchPeriod()
{

View File

@ -1371,7 +1371,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
supervisor.start();
supervisor.runInternal();
supervisor.updateCurrentAndLatestOffsets().run();
supervisor.updateCurrentAndLatestOffsets();
SupervisorReport<KafkaSupervisorReportPayload> report = supervisor.getStatus();
verifyAll();
@ -1481,7 +1481,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
supervisor.start();
supervisor.runInternal();
supervisor.updateCurrentAndLatestOffsets().run();
supervisor.updateCurrentAndLatestOffsets();
SupervisorReport<KafkaSupervisorReportPayload> report = supervisor.getStatus();
verifyAll();
@ -1625,7 +1625,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
supervisor.start();
supervisor.runInternal();
supervisor.updateCurrentAndLatestOffsets().run();
supervisor.updateCurrentAndLatestOffsets();
SupervisorReport<KafkaSupervisorReportPayload> report = supervisor.getStatus();
verifyAll();

View File

@ -44,9 +44,11 @@ import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.collect.Maps;
import com.google.common.collect.Queues;
import org.apache.druid.common.aws.AWSCredentialsConfig;
import org.apache.druid.common.aws.AWSCredentialsUtils;
import org.apache.druid.indexing.kinesis.supervisor.KinesisSupervisor;
import org.apache.druid.indexing.seekablestream.common.OrderedPartitionableRecord;
import org.apache.druid.indexing.seekablestream.common.RecordSupplier;
import org.apache.druid.indexing.seekablestream.common.StreamException;
@ -113,9 +115,9 @@ public class KinesisRecordSupplier implements RecordSupplier<String, String>
private volatile boolean started;
private volatile boolean stopRequested;
PartitionResource(
StreamPartition<String> streamPartition
)
private volatile long currentLagMillis;
PartitionResource(StreamPartition<String> streamPartition)
{
this.streamPartition = streamPartition;
}
@ -148,6 +150,53 @@ public class KinesisRecordSupplier implements RecordSupplier<String, String>
stopRequested = true;
}
long getPartitionTimeLag()
{
return currentLagMillis;
}
long getPartitionTimeLag(String offset)
{
// if not started (fetching records in background), fetch lag ourself with a throw-away iterator
if (!started) {
try {
final String iteratorType;
final String offsetToUse;
if (offset == null || KinesisSupervisor.NOT_SET.equals(offset)) {
// this should probably check if will start processing earliest or latest rather than assuming earliest
// if latest we could skip this because latest will not be behind latest so lag is 0.
iteratorType = ShardIteratorType.TRIM_HORIZON.toString();
offsetToUse = null;
} else {
iteratorType = ShardIteratorType.AT_SEQUENCE_NUMBER.toString();
offsetToUse = offset;
}
String shardIterator = kinesis.getShardIterator(
streamPartition.getStream(),
streamPartition.getPartitionId(),
iteratorType,
offsetToUse
).getShardIterator();
GetRecordsResult recordsResult = kinesis.getRecords(
new GetRecordsRequest().withShardIterator(shardIterator).withLimit(recordsPerFetch)
);
currentLagMillis = recordsResult.getMillisBehindLatest();
return currentLagMillis;
}
catch (Exception ex) {
// eat it
log.warn(
ex,
"Failed to determine partition lag for partition %s of stream %s",
streamPartition.getPartitionId(),
streamPartition.getStream()
);
}
}
return currentLagMillis;
}
private Runnable getRecordRunnable()
{
@ -191,11 +240,14 @@ public class KinesisRecordSupplier implements RecordSupplier<String, String>
recordsResult = kinesis.getRecords(new GetRecordsRequest().withShardIterator(
shardIterator).withLimit(recordsPerFetch));
currentLagMillis = recordsResult.getMillisBehindLatest();
// list will come back empty if there are no records
for (Record kinesisRecord : recordsResult.getRecords()) {
final List<byte[]> data;
if (deaggregate) {
if (deaggregateHandle == null || getDataHandle == null) {
throw new ISE("deaggregateHandle or getDataHandle is null!");
@ -637,6 +689,27 @@ public class KinesisRecordSupplier implements RecordSupplier<String, String>
this.closed = true;
}
// this is only used for tests
@VisibleForTesting
Map<String, Long> getPartitionTimeLag()
{
return partitionResources.entrySet()
.stream()
.collect(
Collectors.toMap(k -> k.getKey().getPartitionId(), k -> k.getValue().getPartitionTimeLag())
);
}
public Map<String, Long> getPartitionTimeLag(Map<String, String> currentOffsets)
{
Map<String, Long> partitionLag = Maps.newHashMapWithExpectedSize(currentOffsets.size());
for (Map.Entry<StreamPartition<String>, PartitionResource> partition : partitionResources.entrySet()) {
final String partitionId = partition.getKey().getPartitionId();
partitionLag.put(partitionId, partition.getValue().getPartitionTimeLag(currentOffsets.get(partitionId)));
}
return partitionLag;
}
private void seekInternal(StreamPartition<String> partition, String sequenceNumber, ShardIteratorType iteratorEnum)
{
PartitionResource resource = partitionResources.get(partition);

View File

@ -64,7 +64,6 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.ScheduledExecutorService;
/**
* Supervisor responsible for managing the KinesisIndexTask for a single dataSource. At a high level, the class accepts a
@ -73,8 +72,6 @@ import java.util.concurrent.ScheduledExecutorService;
* and the list of running indexing tasks and ensures that all partitions are being read from and that there are enough
* tasks to satisfy the desired number of replicas. As tasks complete, new tasks are queued to process the next range of
* Kinesis sequences.
* <p>
* the Kinesis supervisor does not yet support lag calculations
*/
public class KinesisSupervisor extends SeekableStreamSupervisor<String, String>
{
@ -85,9 +82,10 @@ public class KinesisSupervisor extends SeekableStreamSupervisor<String, String>
{
};
private static final String NOT_SET = "-1";
public static final String NOT_SET = "-1";
private final KinesisSupervisorSpec spec;
private final AWSCredentialsConfig awsCredentialsConfig;
private volatile Map<String, Long> currentPartitionTimeLag;
public KinesisSupervisor(
final TaskStorage taskStorage,
@ -114,6 +112,7 @@ public class KinesisSupervisor extends SeekableStreamSupervisor<String, String>
this.spec = spec;
this.awsCredentialsConfig = awsCredentialsConfig;
this.currentPartitionTimeLag = null;
}
@Override
@ -215,12 +214,6 @@ public class KinesisSupervisor extends SeekableStreamSupervisor<String, String>
);
}
@Override
protected void scheduleReporting(ScheduledExecutorService reportingExec)
{
// not yet implemented, see issue #6739
}
/**
* We hash the shard ID string, and then use the first four bytes of the hash as an int % task count
*/
@ -277,6 +270,7 @@ public class KinesisSupervisor extends SeekableStreamSupervisor<String, String>
)
{
KinesisSupervisorIOConfig ioConfig = spec.getIoConfig();
Map<String, Long> partitionLag = getTimeLagPerPartition(getHighestCurrentOffsets());
return new KinesisSupervisorReportPayload(
spec.getDataSchema().getDataSource(),
ioConfig.getStream(),
@ -287,17 +281,26 @@ public class KinesisSupervisor extends SeekableStreamSupervisor<String, String>
stateManager.isHealthy(),
stateManager.getSupervisorState().getBasicState(),
stateManager.getSupervisorState(),
stateManager.getExceptionEvents()
stateManager.getExceptionEvents(),
includeOffsets ? partitionLag : null,
includeOffsets ? partitionLag.values().stream().mapToLong(x -> Math.max(x, 0)).sum() : null
);
}
// not yet supported, will be implemented in the future
// not yet supported, will be implemented in the future maybe? need a way to get record count between current
// sequence and latest sequence
@Override
protected Map<String, String> getLagPerPartition(Map<String, String> currentOffsets)
protected Map<String, Long> getRecordLagPerPartition(Map<String, String> currentOffsets)
{
return ImmutableMap.of();
}
@Override
protected Map<String, Long> getTimeLagPerPartition(Map<String, String> currentOffsets)
{
return ((KinesisRecordSupplier) recordSupplier).getPartitionTimeLag(currentOffsets);
}
@Override
protected SeekableStreamDataSourceMetadata<String, String> createDataSourceMetaDataForReset(
String stream,
@ -315,10 +318,24 @@ public class KinesisSupervisor extends SeekableStreamSupervisor<String, String>
@Override
protected void updateLatestSequenceFromStream(
RecordSupplier<String, String> recordSupplier, Set<StreamPartition<String>> streamPartitions
RecordSupplier<String, String> recordSupplier,
Set<StreamPartition<String>> streamPartitions
)
{
// do nothing
KinesisRecordSupplier supplier = (KinesisRecordSupplier) recordSupplier;
currentPartitionTimeLag = supplier.getPartitionTimeLag(getHighestCurrentOffsets());
}
@Override
protected Map<String, Long> getPartitionRecordLag()
{
return null;
}
@Override
protected Map<String, Long> getPartitionTimeLag()
{
return currentPartitionTimeLag;
}
@Override
@ -457,5 +474,4 @@ public class KinesisSupervisor extends SeekableStreamSupervisor<String, String>
return new KinesisDataSourceMetadata(newSequences);
}
}

View File

@ -22,8 +22,9 @@ package org.apache.druid.indexing.kinesis.supervisor;
import org.apache.druid.indexing.overlord.supervisor.SupervisorStateManager;
import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorReportPayload;
import java.util.Collections;
import javax.annotation.Nullable;
import java.util.List;
import java.util.Map;
public class KinesisSupervisorReportPayload extends SeekableStreamSupervisorReportPayload<String, String>
{
@ -37,7 +38,9 @@ public class KinesisSupervisorReportPayload extends SeekableStreamSupervisorRepo
boolean healthy,
SupervisorStateManager.State state,
SupervisorStateManager.State detailedState,
List<SupervisorStateManager.ExceptionEvent> recentErrors
List<SupervisorStateManager.ExceptionEvent> recentErrors,
@Nullable Map<String, Long> minimumLagMillis,
@Nullable Long aggregateLagMillis
)
{
super(
@ -46,10 +49,12 @@ public class KinesisSupervisorReportPayload extends SeekableStreamSupervisorRepo
partitions,
replicas,
durationSeconds,
Collections.emptyMap(),
Collections.emptyMap(),
null,
null,
null,
minimumLagMillis,
aggregateLagMillis,
null,
suspended,
healthy,
state,
@ -74,7 +79,8 @@ public class KinesisSupervisorReportPayload extends SeekableStreamSupervisorRepo
", state=" + getState() +
", detailedState=" + getDetailedState() +
", recentErrors=" + getRecentErrors() +
(getMinimumLagMillis() != null ? ", minimumLagMillis=" + getMinimumLagMillis() : "") +
(getAggregateLagMillis() != null ? ", aggregateLagMillis=" + getAggregateLagMillis() : "") +
'}';
}
}

View File

@ -39,6 +39,7 @@ public class KinesisSupervisorTuningConfig extends KinesisIndexTaskTuningConfig
private final Duration httpTimeout;
private final Duration shutdownTimeout;
private final Duration repartitionTransitionDuration;
private final Duration offsetFetchPeriod;
public static KinesisSupervisorTuningConfig defaultConfig()
{
@ -73,6 +74,7 @@ public class KinesisSupervisorTuningConfig extends KinesisIndexTaskTuningConfig
null,
null,
null,
null,
null
);
}
@ -108,7 +110,8 @@ public class KinesisSupervisorTuningConfig extends KinesisIndexTaskTuningConfig
@JsonProperty("maxSavedParseExceptions") @Nullable Integer maxSavedParseExceptions,
@JsonProperty("maxRecordsPerPoll") @Nullable Integer maxRecordsPerPoll,
@JsonProperty("intermediateHandoffPeriod") Period intermediateHandoffPeriod,
@JsonProperty("repartitionTransitionDuration") Period repartitionTransitionDuration
@JsonProperty("repartitionTransitionDuration") Period repartitionTransitionDuration,
@JsonProperty("offsetFetchPeriod") Period offsetFetchPeriod
)
{
super(
@ -151,6 +154,10 @@ public class KinesisSupervisorTuningConfig extends KinesisIndexTaskTuningConfig
repartitionTransitionDuration,
DEFAULT_REPARTITION_TRANSITION_DURATION
);
this.offsetFetchPeriod = SeekableStreamSupervisorTuningConfig.defaultDuration(
offsetFetchPeriod,
DEFAULT_OFFSET_FETCH_PERIOD
);
}
@Override
@ -194,6 +201,13 @@ public class KinesisSupervisorTuningConfig extends KinesisIndexTaskTuningConfig
return repartitionTransitionDuration;
}
@Override
@JsonProperty
public Duration getOffsetFetchPeriod()
{
return offsetFetchPeriod;
}
@Override
public String toString()
{
@ -261,5 +275,4 @@ public class KinesisSupervisorTuningConfig extends KinesisIndexTaskTuningConfig
getIntermediateHandoffPeriod()
);
}
}

View File

@ -2034,8 +2034,13 @@ public class KinesisIndexTaskTest extends SeekableStreamIndexTaskTestBase
.andReturn(Collections.emptyList())
.anyTimes();
EasyMock.expect(recordSupplier.getPartitionTimeLag(EasyMock.anyObject()))
.andReturn(null)
.anyTimes();
replayAll();
final KinesisIndexTask task1 = createTask(
"task1",
new KinesisIndexTaskIOConfig(

View File

@ -312,6 +312,7 @@ public class KinesisIndexTaskTuningConfigTest
null,
null,
null,
null,
null
);
KinesisIndexTaskTuningConfig copy = (KinesisIndexTaskTuningConfig) original.convertToTaskTuningConfig();

View File

@ -48,6 +48,7 @@ import org.junit.Test;
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
@ -59,6 +60,15 @@ public class KinesisRecordSupplierTest extends EasyMockSupport
private static final String SHARD_ID0 = "0";
private static final String SHARD1_ITERATOR = "1";
private static final String SHARD0_ITERATOR = "0";
private static final Long SHARD0_LAG_MILLIS = 100L;
private static final Long SHARD1_LAG_MILLIS = 200L;
private static Map<String, Long> SHARDS_LAG_MILLIS =
ImmutableMap.of(SHARD_ID0, SHARD0_LAG_MILLIS, SHARD_ID1, SHARD1_LAG_MILLIS);
private static final List<Record> SHARD0_RECORDS = ImmutableList.of(
new Record().withData(jb("2008", "a", "y", "10", "20.0", "1.0")).withSequenceNumber("0"),
new Record().withData(jb("2009", "b", "y", "10", "20.0", "1.0")).withSequenceNumber("1")
);
private static final List<Record> SHARD1_RECORDS = ImmutableList.of(
new Record().withData(jb("2011", "d", "y", "10", "20.0", "1.0")).withSequenceNumber("0"),
new Record().withData(jb("2011", "e", "y", "10", "20.0", "1.0")).withSequenceNumber("1"),
@ -71,10 +81,6 @@ public class KinesisRecordSupplierTest extends EasyMockSupport
new Record().withData(jb("2012", "g", "y", "10", "20.0", "1.0")).withSequenceNumber("8"),
new Record().withData(jb("2011", "h", "y", "10", "20.0", "1.0")).withSequenceNumber("9")
);
private static final List<Record> SHARD0_RECORDS = ImmutableList.of(
new Record().withData(jb("2008", "a", "y", "10", "20.0", "1.0")).withSequenceNumber("0"),
new Record().withData(jb("2009", "b", "y", "10", "20.0", "1.0")).withSequenceNumber("1")
);
private static final List<Object> ALL_RECORDS = ImmutableList.builder()
.addAll(SHARD0_RECORDS.stream()
.map(x -> new OrderedPartitionableRecord<>(
@ -104,6 +110,7 @@ public class KinesisRecordSupplierTest extends EasyMockSupport
.toList()))
.build();
private static ByteBuffer jb(String timestamp, String dim1, String dim2, String dimLong, String dimFloat, String met1)
{
try {
@ -262,6 +269,8 @@ public class KinesisRecordSupplierTest extends EasyMockSupport
EasyMock.expect(getRecordsResult1.getRecords()).andReturn(SHARD1_RECORDS).once();
EasyMock.expect(getRecordsResult0.getNextShardIterator()).andReturn(null).anyTimes();
EasyMock.expect(getRecordsResult1.getNextShardIterator()).andReturn(null).anyTimes();
EasyMock.expect(getRecordsResult0.getMillisBehindLatest()).andReturn(SHARD0_LAG_MILLIS).once();
EasyMock.expect(getRecordsResult1.getMillisBehindLatest()).andReturn(SHARD1_LAG_MILLIS).once();
replayAll();
@ -299,6 +308,7 @@ public class KinesisRecordSupplierTest extends EasyMockSupport
Assert.assertEquals(partitions, recordSupplier.getAssignment());
Assert.assertTrue(polledRecords.containsAll(ALL_RECORDS));
Assert.assertEquals(SHARDS_LAG_MILLIS, recordSupplier.getPartitionTimeLag());
}
@Test
@ -335,6 +345,8 @@ public class KinesisRecordSupplierTest extends EasyMockSupport
EasyMock.expect(getRecordsResult1.getRecords()).andReturn(SHARD1_RECORDS.subList(2, SHARD1_RECORDS.size())).once();
EasyMock.expect(getRecordsResult0.getNextShardIterator()).andReturn(null).anyTimes();
EasyMock.expect(getRecordsResult1.getNextShardIterator()).andReturn(null).anyTimes();
EasyMock.expect(getRecordsResult0.getMillisBehindLatest()).andReturn(SHARD0_LAG_MILLIS).once();
EasyMock.expect(getRecordsResult1.getMillisBehindLatest()).andReturn(SHARD1_LAG_MILLIS).once();
replayAll();
@ -374,7 +386,7 @@ public class KinesisRecordSupplierTest extends EasyMockSupport
Assert.assertEquals(9, polledRecords.size());
Assert.assertTrue(polledRecords.containsAll(ALL_RECORDS.subList(4, 12)));
Assert.assertTrue(polledRecords.containsAll(ALL_RECORDS.subList(1, 2)));
Assert.assertEquals(SHARDS_LAG_MILLIS, recordSupplier.getPartitionTimeLag());
}
@ -499,6 +511,8 @@ public class KinesisRecordSupplierTest extends EasyMockSupport
EasyMock.expect(getRecordsResult0.getRecords()).andReturn(SHARD1_RECORDS.subList(7, SHARD1_RECORDS.size())).once();
EasyMock.expect(getRecordsResult1.getNextShardIterator()).andReturn(null).anyTimes();
EasyMock.expect(getRecordsResult0.getNextShardIterator()).andReturn(null).anyTimes();
EasyMock.expect(getRecordsResult0.getMillisBehindLatest()).andReturn(SHARD0_LAG_MILLIS).once();
EasyMock.expect(getRecordsResult1.getMillisBehindLatest()).andReturn(SHARD1_LAG_MILLIS).once();
replayAll();
@ -534,6 +548,9 @@ public class KinesisRecordSupplierTest extends EasyMockSupport
firstRecord
);
// only one partition in this test. first results come from getRecordsResult1, which has SHARD1_LAG_MILLIS
Assert.assertEquals(ImmutableMap.of(SHARD_ID1, SHARD1_LAG_MILLIS), recordSupplier.getPartitionTimeLag());
recordSupplier.seek(StreamPartition.of(STREAM, SHARD_ID1), "7");
recordSupplier.start();
@ -541,9 +558,12 @@ public class KinesisRecordSupplierTest extends EasyMockSupport
Thread.sleep(100);
}
OrderedPartitionableRecord<String, String> record2 = recordSupplier.poll(POLL_TIMEOUT_MILLIS).get(0);
Assert.assertEquals(ALL_RECORDS.get(9), record2);
// only one partition in this test. second results come from getRecordsResult0, which has SHARD0_LAG_MILLIS
Assert.assertEquals(ImmutableMap.of(SHARD_ID1, SHARD0_LAG_MILLIS), recordSupplier.getPartitionTimeLag());
verifyAll();
}
@ -581,6 +601,8 @@ public class KinesisRecordSupplierTest extends EasyMockSupport
EasyMock.expect(getRecordsResult1.getRecords()).andReturn(SHARD1_RECORDS).once();
EasyMock.expect(getRecordsResult0.getNextShardIterator()).andReturn(null).anyTimes();
EasyMock.expect(getRecordsResult1.getNextShardIterator()).andReturn(null).anyTimes();
EasyMock.expect(getRecordsResult0.getMillisBehindLatest()).andReturn(SHARD0_LAG_MILLIS).once();
EasyMock.expect(getRecordsResult1.getMillisBehindLatest()).andReturn(SHARD1_LAG_MILLIS).once();
replayAll();
@ -618,6 +640,83 @@ public class KinesisRecordSupplierTest extends EasyMockSupport
Assert.assertEquals(partitions, recordSupplier.getAssignment());
Assert.assertTrue(polledRecords.containsAll(ALL_RECORDS));
Assert.assertEquals(SHARDS_LAG_MILLIS, recordSupplier.getPartitionTimeLag());
}
@Test
public void getPartitionTimeLag() throws InterruptedException
{
EasyMock.expect(kinesis.getShardIterator(
EasyMock.anyObject(),
EasyMock.eq(SHARD_ID0),
EasyMock.anyString(),
EasyMock.anyString()
)).andReturn(
getShardIteratorResult0).anyTimes();
EasyMock.expect(kinesis.getShardIterator(
EasyMock.anyObject(),
EasyMock.eq(SHARD_ID1),
EasyMock.anyString(),
EasyMock.anyString()
)).andReturn(
getShardIteratorResult1).anyTimes();
EasyMock.expect(getShardIteratorResult0.getShardIterator()).andReturn(SHARD0_ITERATOR).anyTimes();
EasyMock.expect(getShardIteratorResult1.getShardIterator()).andReturn(SHARD1_ITERATOR).anyTimes();
EasyMock.expect(kinesis.getRecords(generateGetRecordsReq(SHARD0_ITERATOR, recordsPerFetch)))
.andReturn(getRecordsResult0)
.anyTimes();
EasyMock.expect(kinesis.getRecords(generateGetRecordsReq(SHARD1_ITERATOR, recordsPerFetch)))
.andReturn(getRecordsResult1)
.anyTimes();
EasyMock.expect(getRecordsResult0.getRecords()).andReturn(SHARD0_RECORDS).once();
EasyMock.expect(getRecordsResult1.getRecords()).andReturn(SHARD1_RECORDS).once();
EasyMock.expect(getRecordsResult0.getNextShardIterator()).andReturn(null).anyTimes();
EasyMock.expect(getRecordsResult1.getNextShardIterator()).andReturn(null).anyTimes();
EasyMock.expect(getRecordsResult0.getMillisBehindLatest()).andReturn(SHARD0_LAG_MILLIS).once();
EasyMock.expect(getRecordsResult1.getMillisBehindLatest()).andReturn(SHARD1_LAG_MILLIS).once();
replayAll();
Set<StreamPartition<String>> partitions = ImmutableSet.of(
StreamPartition.of(STREAM, SHARD_ID0),
StreamPartition.of(STREAM, SHARD_ID1)
);
recordSupplier = new KinesisRecordSupplier(
kinesis,
recordsPerFetch,
0,
2,
true,
100,
5000,
5000,
60000,
100
);
recordSupplier.assign(partitions);
recordSupplier.seekToEarliest(partitions);
recordSupplier.start();
for (int i = 0; i < 10 && recordSupplier.bufferSize() < 12; i++) {
Thread.sleep(100);
}
Map<String, String> offsts = ImmutableMap.of(
SHARD_ID1, SHARD1_RECORDS.get(0).getSequenceNumber(),
SHARD_ID0, SHARD0_RECORDS.get(0).getSequenceNumber()
);
Map<String, Long> timeLag = recordSupplier.getPartitionTimeLag(offsts);
verifyAll();
Assert.assertEquals(partitions, recordSupplier.getAssignment());
Assert.assertEquals(SHARDS_LAG_MILLIS, timeLag);
}
/**
@ -637,4 +736,5 @@ public class KinesisRecordSupplierTest extends EasyMockSupport
return retVal;
}
}
}

View File

@ -128,7 +128,7 @@ public class KinesisSupervisorTest extends EasyMockSupport
private static final StreamPartition<String> SHARD0_PARTITION = StreamPartition.of(STREAM, SHARD_ID0);
private static final StreamPartition<String> SHARD1_PARTITION = StreamPartition.of(STREAM, SHARD_ID1);
private static final StreamPartition<String> SHARD2_PARTITION = StreamPartition.of(STREAM, SHARD_ID2);
private static final Map<String, Long> TIME_LAG = ImmutableMap.of(SHARD_ID1, 9000L, SHARD_ID0, 1234L);
private static DataSchema dataSchema;
private KinesisRecordSupplier supervisorRecordSupplier;
@ -198,6 +198,7 @@ public class KinesisSupervisorTest extends EasyMockSupport
null,
null,
null,
null,
null
);
rowIngestionMetersFactory = new TestUtils().getRowIngestionMetersFactory();
@ -230,6 +231,9 @@ public class KinesisSupervisorTest extends EasyMockSupport
EasyMock.expect(supervisorRecordSupplier.getEarliestSequenceNumber(EasyMock.anyObject())).andReturn("0").anyTimes();
supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString());
EasyMock.expectLastCall().anyTimes();
EasyMock.expect(supervisorRecordSupplier.getPartitionTimeLag(EasyMock.anyObject()))
.andReturn(TIME_LAG)
.atLeastOnce();
Capture<KinesisIndexTask> captured = Capture.newInstance();
EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
@ -297,6 +301,9 @@ public class KinesisSupervisorTest extends EasyMockSupport
EasyMock.expect(supervisorRecordSupplier.getEarliestSequenceNumber(EasyMock.anyObject())).andReturn("0").anyTimes();
supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString());
EasyMock.expectLastCall().anyTimes();
EasyMock.expect(supervisorRecordSupplier.getPartitionTimeLag(EasyMock.anyObject()))
.andReturn(TIME_LAG)
.atLeastOnce();
Capture<KinesisIndexTask> captured = Capture.newInstance(CaptureType.ALL);
EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
@ -356,6 +363,9 @@ public class KinesisSupervisorTest extends EasyMockSupport
EasyMock.expect(supervisorRecordSupplier.getEarliestSequenceNumber(EasyMock.anyObject())).andReturn("0").anyTimes();
supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString());
EasyMock.expectLastCall().anyTimes();
EasyMock.expect(supervisorRecordSupplier.getPartitionTimeLag(EasyMock.anyObject()))
.andReturn(TIME_LAG)
.atLeastOnce();
Capture<KinesisIndexTask> captured = Capture.newInstance(CaptureType.ALL);
EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
@ -433,6 +443,9 @@ public class KinesisSupervisorTest extends EasyMockSupport
EasyMock.expect(supervisorRecordSupplier.getEarliestSequenceNumber(EasyMock.anyObject())).andReturn("0").anyTimes();
supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString());
EasyMock.expectLastCall().anyTimes();
EasyMock.expect(supervisorRecordSupplier.getPartitionTimeLag(EasyMock.anyObject()))
.andReturn(TIME_LAG)
.atLeastOnce();
Capture<KinesisIndexTask> captured = Capture.newInstance(CaptureType.ALL);
EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
@ -485,6 +498,9 @@ public class KinesisSupervisorTest extends EasyMockSupport
EasyMock.expect(supervisorRecordSupplier.getEarliestSequenceNumber(EasyMock.anyObject())).andReturn("0").anyTimes();
supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString());
EasyMock.expectLastCall().anyTimes();
EasyMock.expect(supervisorRecordSupplier.getPartitionTimeLag(EasyMock.anyObject()))
.andReturn(TIME_LAG)
.atLeastOnce();
Capture<KinesisIndexTask> captured = Capture.newInstance(CaptureType.ALL);
EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
@ -544,6 +560,9 @@ public class KinesisSupervisorTest extends EasyMockSupport
EasyMock.expect(supervisorRecordSupplier.getLatestSequenceNumber(EasyMock.anyObject())).andReturn("100").anyTimes();
supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString());
EasyMock.expectLastCall().anyTimes();
EasyMock.expect(supervisorRecordSupplier.getPartitionTimeLag(EasyMock.anyObject()))
.andReturn(TIME_LAG)
.atLeastOnce();
Capture<KinesisIndexTask> captured = Capture.newInstance();
EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
@ -641,6 +660,9 @@ public class KinesisSupervisorTest extends EasyMockSupport
EasyMock.expect(supervisorRecordSupplier.getLatestSequenceNumber(EasyMock.anyObject())).andReturn("100").anyTimes();
supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString());
EasyMock.expectLastCall().anyTimes();
EasyMock.expect(supervisorRecordSupplier.getPartitionTimeLag(EasyMock.anyObject()))
.andReturn(TIME_LAG)
.atLeastOnce();
// non KinesisIndexTask (don't kill)
Task id2 = new RealtimeIndexTask(
@ -707,6 +729,9 @@ public class KinesisSupervisorTest extends EasyMockSupport
EasyMock.expect(supervisorRecordSupplier.getLatestSequenceNumber(EasyMock.anyObject())).andReturn("100").anyTimes();
supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString());
EasyMock.expectLastCall().anyTimes();
EasyMock.expect(supervisorRecordSupplier.getPartitionTimeLag(EasyMock.anyObject()))
.andReturn(TIME_LAG)
.atLeastOnce();
Task id1 = createKinesisIndexTask(
"id1",
@ -819,7 +844,9 @@ public class KinesisSupervisorTest extends EasyMockSupport
EasyMock.expect(supervisorRecordSupplier.getLatestSequenceNumber(EasyMock.anyObject())).andReturn("100").anyTimes();
supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString());
EasyMock.expectLastCall().anyTimes();
EasyMock.expect(supervisorRecordSupplier.getPartitionTimeLag(EasyMock.anyObject()))
.andReturn(TIME_LAG)
.atLeastOnce();
Capture<Task> captured = Capture.newInstance(CaptureType.ALL);
EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
@ -931,6 +958,9 @@ public class KinesisSupervisorTest extends EasyMockSupport
EasyMock.expect(supervisorRecordSupplier.getLatestSequenceNumber(EasyMock.anyObject())).andReturn("100").anyTimes();
supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString());
EasyMock.expectLastCall().anyTimes();
EasyMock.expect(supervisorRecordSupplier.getPartitionTimeLag(EasyMock.anyObject()))
.andReturn(TIME_LAG)
.atLeastOnce();
DateTime now = DateTimes.nowUtc();
DateTime maxi = now.plusMinutes(60);
@ -1068,6 +1098,9 @@ public class KinesisSupervisorTest extends EasyMockSupport
EasyMock.expect(supervisorRecordSupplier.getLatestSequenceNumber(EasyMock.anyObject())).andReturn("100").anyTimes();
supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString());
EasyMock.expectLastCall().anyTimes();
EasyMock.expect(supervisorRecordSupplier.getPartitionTimeLag(EasyMock.anyObject()))
.andReturn(TIME_LAG)
.atLeastOnce();
Capture<Task> captured = Capture.newInstance(CaptureType.ALL);
EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
@ -1194,6 +1227,9 @@ public class KinesisSupervisorTest extends EasyMockSupport
EasyMock.expect(supervisorRecordSupplier.getLatestSequenceNumber(SHARD0_PARTITION)).andReturn("1").anyTimes();
supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString());
EasyMock.expectLastCall().anyTimes();
EasyMock.expect(supervisorRecordSupplier.getPartitionTimeLag(EasyMock.anyObject()))
.andReturn(TIME_LAG)
.atLeastOnce();
final Capture<Task> firstTasks = Capture.newInstance(CaptureType.ALL);
EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
@ -1305,6 +1341,7 @@ public class KinesisSupervisorTest extends EasyMockSupport
public void testDiscoverExistingPublishingTask() throws Exception
{
final TaskLocation location = new TaskLocation("testHost", 1234, -1);
final Map<String, Long> timeLag = ImmutableMap.of(SHARD_ID1, 0L, SHARD_ID0, 20000000L);
supervisor = getTestableSupervisor(1, 1, true, "PT1H", null, null);
@ -1323,6 +1360,9 @@ public class KinesisSupervisorTest extends EasyMockSupport
EasyMock.expect(supervisorRecordSupplier.getLatestSequenceNumber(SHARD0_PARTITION)).andReturn("1").anyTimes();
supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString());
EasyMock.expectLastCall().anyTimes();
EasyMock.expect(supervisorRecordSupplier.getPartitionTimeLag(EasyMock.anyObject()))
.andReturn(timeLag)
.atLeastOnce();
Task task = createKinesisIndexTask(
"id1",
@ -1394,7 +1434,7 @@ public class KinesisSupervisorTest extends EasyMockSupport
supervisor.start();
supervisor.runInternal();
supervisor.updateCurrentAndLatestOffsets().run();
supervisor.updateCurrentAndLatestOffsets();
SupervisorReport<KinesisSupervisorReportPayload> report = supervisor.getStatus();
verifyAll();
@ -1412,6 +1452,9 @@ public class KinesisSupervisorTest extends EasyMockSupport
Assert.assertEquals(SupervisorStateManager.BasicState.RUNNING, payload.getDetailedState());
Assert.assertEquals(0, payload.getRecentErrors().size());
Assert.assertEquals(timeLag, payload.getMinimumLagMillis());
Assert.assertEquals(20000000L, (long) payload.getAggregateLagMillis());
TaskReportData publishingReport = payload.getPublishingTasks().get(0);
Assert.assertEquals("id1", publishingReport.getId());
@ -1463,6 +1506,7 @@ public class KinesisSupervisorTest extends EasyMockSupport
public void testDiscoverExistingPublishingTaskWithDifferentPartitionAllocation() throws Exception
{
final TaskLocation location = new TaskLocation("testHost", 1234, -1);
final Map<String, Long> timeLag = ImmutableMap.of(SHARD_ID1, 9000L, SHARD_ID0, 1234L);
supervisor = getTestableSupervisor(1, 1, true, "PT1H", null, null);
supervisorRecordSupplier.assign(EasyMock.anyObject());
@ -1480,6 +1524,9 @@ public class KinesisSupervisorTest extends EasyMockSupport
EasyMock.expect(supervisorRecordSupplier.getLatestSequenceNumber(SHARD0_PARTITION)).andReturn("1").anyTimes();
supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString());
EasyMock.expectLastCall().anyTimes();
EasyMock.expect(supervisorRecordSupplier.getPartitionTimeLag(EasyMock.anyObject()))
.andReturn(timeLag)
.atLeastOnce();
Task task = createKinesisIndexTask(
"id1",
@ -1540,7 +1587,7 @@ public class KinesisSupervisorTest extends EasyMockSupport
supervisor.start();
supervisor.runInternal();
supervisor.updateCurrentAndLatestOffsets().run();
supervisor.updateCurrentAndLatestOffsets();
SupervisorReport<KinesisSupervisorReportPayload> report = supervisor.getStatus();
verifyAll();
@ -1557,6 +1604,9 @@ public class KinesisSupervisorTest extends EasyMockSupport
Assert.assertEquals(1, payload.getPublishingTasks().size());
Assert.assertEquals(SupervisorStateManager.BasicState.RUNNING, payload.getDetailedState());
Assert.assertEquals(0, payload.getRecentErrors().size());
Assert.assertEquals(timeLag, payload.getMinimumLagMillis());
Assert.assertEquals(9000L + 1234L, (long) payload.getAggregateLagMillis());
TaskReportData publishingReport = payload.getPublishingTasks().get(0);
@ -1611,6 +1661,7 @@ public class KinesisSupervisorTest extends EasyMockSupport
final TaskLocation location1 = new TaskLocation("testHost", 1234, -1);
final TaskLocation location2 = new TaskLocation("testHost2", 145, -1);
final DateTime startTime = DateTimes.nowUtc();
final Map<String, Long> timeLag = ImmutableMap.of(SHARD_ID0, 100L, SHARD_ID1, 200L);
supervisor = getTestableSupervisor(1, 1, true, "PT1H", null, null);
@ -1629,6 +1680,9 @@ public class KinesisSupervisorTest extends EasyMockSupport
EasyMock.expect(supervisorRecordSupplier.getLatestSequenceNumber(SHARD0_PARTITION)).andReturn("1").anyTimes();
supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString());
EasyMock.expectLastCall().anyTimes();
EasyMock.expect(supervisorRecordSupplier.getPartitionTimeLag(EasyMock.anyObject()))
.andReturn(timeLag)
.atLeastOnce();
Task id1 = createKinesisIndexTask(
"id1",
DATASOURCE,
@ -1733,7 +1787,7 @@ public class KinesisSupervisorTest extends EasyMockSupport
supervisor.start();
supervisor.runInternal();
supervisor.updateCurrentAndLatestOffsets().run();
supervisor.updateCurrentAndLatestOffsets();
SupervisorReport<KinesisSupervisorReportPayload> report = supervisor.getStatus();
verifyAll();
@ -1768,6 +1822,7 @@ public class KinesisSupervisorTest extends EasyMockSupport
SHARD_ID0,
"1"
), activeReport.getCurrentOffsets());
Assert.assertEquals(timeLag, activeReport.getLagMillis());
Assert.assertEquals("id1", publishingReport.getId());
Assert.assertEquals(ImmutableMap.of(
@ -1803,6 +1858,9 @@ public class KinesisSupervisorTest extends EasyMockSupport
EasyMock.expect(supervisorRecordSupplier.getLatestSequenceNumber(SHARD0_PARTITION)).andReturn("1").anyTimes();
supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString());
EasyMock.expectLastCall().anyTimes();
EasyMock.expect(supervisorRecordSupplier.getPartitionTimeLag(EasyMock.anyObject()))
.andReturn(TIME_LAG)
.atLeastOnce();
Capture<Task> captured = Capture.newInstance(CaptureType.ALL);
EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
@ -1885,6 +1943,9 @@ public class KinesisSupervisorTest extends EasyMockSupport
EasyMock.expect(supervisorRecordSupplier.getLatestSequenceNumber(SHARD0_PARTITION)).andReturn("1").anyTimes();
supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString());
EasyMock.expectLastCall().anyTimes();
EasyMock.expect(supervisorRecordSupplier.getPartitionTimeLag(EasyMock.anyObject()))
.andReturn(TIME_LAG)
.atLeastOnce();
Capture<Task> captured = Capture.newInstance(CaptureType.ALL);
EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
@ -1994,6 +2055,10 @@ public class KinesisSupervisorTest extends EasyMockSupport
supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString());
EasyMock.expectLastCall().anyTimes();
EasyMock.expect(supervisorRecordSupplier.getPartitionTimeLag(EasyMock.anyObject()))
.andReturn(TIME_LAG)
.atLeastOnce();
Capture<Task> captured = Capture.newInstance(CaptureType.ALL);
EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes();
@ -2141,6 +2206,9 @@ public class KinesisSupervisorTest extends EasyMockSupport
EasyMock.expect(supervisorRecordSupplier.getLatestSequenceNumber(SHARD0_PARTITION)).andReturn("1").anyTimes();
supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString());
EasyMock.expectLastCall().anyTimes();
EasyMock.expect(supervisorRecordSupplier.getPartitionTimeLag(EasyMock.anyObject()))
.andReturn(TIME_LAG)
.atLeastOnce();
Task id1 = createKinesisIndexTask(
"id1",
@ -2549,6 +2617,9 @@ public class KinesisSupervisorTest extends EasyMockSupport
EasyMock.expect(supervisorRecordSupplier.getLatestSequenceNumber(SHARD0_PARTITION)).andReturn("1").anyTimes();
supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString());
EasyMock.expectLastCall().anyTimes();
EasyMock.expect(supervisorRecordSupplier.getPartitionTimeLag(EasyMock.anyObject()))
.andReturn(TIME_LAG)
.atLeastOnce();
Task id1 = createKinesisIndexTask(
"id1",
@ -2775,6 +2846,10 @@ public class KinesisSupervisorTest extends EasyMockSupport
EasyMock.expect(supervisorRecordSupplier.getLatestSequenceNumber(SHARD0_PARTITION)).andReturn("1").anyTimes();
supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString());
EasyMock.expectLastCall().anyTimes();
EasyMock.expect(supervisorRecordSupplier.getPartitionTimeLag(EasyMock.anyObject()))
.andReturn(TIME_LAG)
.atLeastOnce();
EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes();
EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of(id1, id2, id3)).anyTimes();
@ -2929,6 +3004,9 @@ public class KinesisSupervisorTest extends EasyMockSupport
EasyMock.expect(supervisorRecordSupplier.getLatestSequenceNumber(SHARD0_PARTITION)).andReturn("1").anyTimes();
supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString());
EasyMock.expectLastCall().anyTimes();
EasyMock.expect(supervisorRecordSupplier.getPartitionTimeLag(EasyMock.anyObject()))
.andReturn(TIME_LAG)
.atLeastOnce();
EasyMock.expect(taskRunner.getRunningTasks()).andReturn(workItems).anyTimes();
EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
@ -3182,7 +3260,9 @@ public class KinesisSupervisorTest extends EasyMockSupport
EasyMock.expect(supervisorRecordSupplier.getLatestSequenceNumber(SHARD0_PARTITION)).andReturn("1").anyTimes();
supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString());
EasyMock.expectLastCall().anyTimes();
EasyMock.expect(supervisorRecordSupplier.getPartitionTimeLag(EasyMock.anyObject()))
.andReturn(TIME_LAG)
.atLeastOnce();
Task id1 = createKinesisIndexTask(
"id1",
@ -3431,6 +3511,9 @@ public class KinesisSupervisorTest extends EasyMockSupport
EasyMock.expect(supervisorRecordSupplier.getLatestSequenceNumber(EasyMock.anyObject())).andReturn("100").anyTimes();
supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString());
EasyMock.expectLastCall().anyTimes();
EasyMock.expect(supervisorRecordSupplier.getPartitionTimeLag(EasyMock.anyObject()))
.andReturn(TIME_LAG)
.atLeastOnce();
Task task = createKinesisIndexTask(
"id2",
@ -3527,6 +3610,9 @@ public class KinesisSupervisorTest extends EasyMockSupport
EasyMock.expect(supervisorRecordSupplier.getLatestSequenceNumber(EasyMock.anyObject())).andReturn("100").anyTimes();
supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString());
EasyMock.expectLastCall().anyTimes();
EasyMock.expect(supervisorRecordSupplier.getPartitionTimeLag(EasyMock.anyObject()))
.andReturn(TIME_LAG)
.atLeastOnce();
Task task = createKinesisIndexTask(
"id1",
@ -3643,6 +3729,7 @@ public class KinesisSupervisorTest extends EasyMockSupport
null,
42, // This property is different from tuningConfig
null,
null,
null
);
@ -3768,6 +3855,9 @@ public class KinesisSupervisorTest extends EasyMockSupport
EasyMock.expect(supervisorRecordSupplier.getLatestSequenceNumber(EasyMock.anyObject())).andReturn("100").anyTimes();
supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString());
EasyMock.expectLastCall().anyTimes();
EasyMock.expect(supervisorRecordSupplier.getPartitionTimeLag(EasyMock.anyObject()))
.andReturn(TIME_LAG)
.atLeastOnce();
Capture<Task> captured = Capture.newInstance(CaptureType.ALL);
EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
@ -3876,6 +3966,9 @@ public class KinesisSupervisorTest extends EasyMockSupport
supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString());
EasyMock.expectLastCall().anyTimes();
EasyMock.expect(supervisorRecordSupplier.getPartitionTimeLag(EasyMock.anyObject()))
.andReturn(TIME_LAG)
.atLeastOnce();
Capture<Task> postSplitCaptured = Capture.newInstance(CaptureType.ALL);
@ -4051,6 +4144,9 @@ public class KinesisSupervisorTest extends EasyMockSupport
supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString());
EasyMock.expectLastCall().anyTimes();
EasyMock.expect(supervisorRecordSupplier.getPartitionTimeLag(EasyMock.anyObject()))
.andReturn(TIME_LAG)
.atLeastOnce();
Capture<Task> postSplitCaptured = Capture.newInstance(CaptureType.ALL);
@ -4214,6 +4310,9 @@ public class KinesisSupervisorTest extends EasyMockSupport
EasyMock.expect(supervisorRecordSupplier.getLatestSequenceNumber(EasyMock.anyObject())).andReturn("100").anyTimes();
supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString());
EasyMock.expectLastCall().anyTimes();
EasyMock.expect(supervisorRecordSupplier.getPartitionTimeLag(EasyMock.anyObject()))
.andReturn(TIME_LAG)
.atLeastOnce();
Capture<Task> captured = Capture.newInstance(CaptureType.ALL);
EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
@ -4333,6 +4432,9 @@ public class KinesisSupervisorTest extends EasyMockSupport
supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString());
EasyMock.expectLastCall().anyTimes();
EasyMock.expect(supervisorRecordSupplier.getPartitionTimeLag(EasyMock.anyObject()))
.andReturn(TIME_LAG)
.atLeastOnce();
Capture<Task> postMergeCaptured = Capture.newInstance(CaptureType.ALL);
@ -4487,6 +4589,9 @@ public class KinesisSupervisorTest extends EasyMockSupport
supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString());
EasyMock.expectLastCall().anyTimes();
EasyMock.expect(supervisorRecordSupplier.getPartitionTimeLag(EasyMock.anyObject()))
.andReturn(TIME_LAG)
.atLeastOnce();
Capture<Task> postSplitCaptured = Capture.newInstance(CaptureType.ALL);
@ -4694,6 +4799,7 @@ public class KinesisSupervisorTest extends EasyMockSupport
null,
null,
null,
null,
null
);

View File

@ -77,6 +77,8 @@ import org.apache.druid.java.util.common.RetryUtils;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
import org.apache.druid.metadata.EntryExistsException;
import org.apache.druid.segment.indexing.DataSchema;
import org.joda.time.DateTime;
@ -106,6 +108,7 @@ import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@ -128,6 +131,10 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
{
public static final String CHECKPOINTS_CTX_KEY = "checkpoints";
private static final long MINIMUM_GET_OFFSET_PERIOD_MILLIS = 5000;
private static final long INITIAL_GET_OFFSET_DELAY_MILLIS = 15000;
private static final long INITIAL_EMIT_LAG_METRIC_DELAY_MILLIS = 25000;
private static final long MAX_RUN_FREQUENCY_MILLIS = 1000;
private static final long MINIMUM_FUTURE_TIMEOUT_IN_SECONDS = 120;
private static final int MAX_INITIALIZATION_RETRIES = 20;
@ -480,10 +487,11 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
private int initRetryCounter = 0;
private volatile DateTime firstRunTime;
private volatile DateTime earlyStopTime = null;
private volatile RecordSupplier<PartitionIdType, SequenceOffsetType> recordSupplier;
protected volatile RecordSupplier<PartitionIdType, SequenceOffsetType> recordSupplier;
private volatile boolean started = false;
private volatile boolean stopped = false;
private volatile boolean lifecycleStarted = false;
private final ServiceEmitter emitter;
public SeekableStreamSupervisor(
final String supervisorId,
@ -502,6 +510,7 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
this.indexerMetadataStorageCoordinator = indexerMetadataStorageCoordinator;
this.sortingMapper = mapper.copy().configure(MapperFeature.SORT_PROPERTIES_ALPHABETICALLY, true);
this.spec = spec;
this.emitter = spec.getEmitter();
this.rowIngestionMetersFactory = rowIngestionMetersFactory;
this.useExclusiveStartingSequence = useExclusiveStartingSequence;
this.dataSource = spec.getDataSchema().getDataSource();
@ -839,7 +848,8 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
startTime,
remainingSeconds,
TaskReportData.TaskType.ACTIVE,
includeOffsets ? getLagPerPartition(currentOffsets) : null
includeOffsets ? getRecordLagPerPartition(currentOffsets) : null,
includeOffsets ? getTimeLagPerPartition(currentOffsets) : null
)
);
}
@ -866,6 +876,7 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
startTime,
remainingSeconds,
TaskReportData.TaskType.PUBLISHING,
null,
null
)
);
@ -3090,18 +3101,16 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
}
@VisibleForTesting
public Runnable updateCurrentAndLatestOffsets()
public void updateCurrentAndLatestOffsets()
{
return () -> {
try {
updateCurrentOffsets();
updateLatestOffsetsFromStream();
sequenceLastUpdated = DateTimes.nowUtc();
}
catch (Exception e) {
log.warn(e, "Exception while getting current/latest sequences");
}
};
try {
updateCurrentOffsets();
updateLatestOffsetsFromStream();
sequenceLastUpdated = DateTimes.nowUtc();
}
catch (Exception e) {
log.warn(e, "Exception while getting current/latest sequences");
}
}
private void updateCurrentOffsets() throws InterruptedException, ExecutionException, TimeoutException
@ -3158,18 +3167,35 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
Set<StreamPartition<PartitionIdType>> partitions
);
/**
* Gets 'lag' of currently processed offset behind latest offset as a measure of difference between offsets.
*/
@Nullable
protected abstract Map<PartitionIdType, Long> getPartitionRecordLag();
/**
* Gets 'lag' of currently processed offset behind latest offset as a measure of the difference in time inserted.
*/
@Nullable
protected abstract Map<PartitionIdType, Long> getPartitionTimeLag();
protected Map<PartitionIdType, SequenceOffsetType> getHighestCurrentOffsets()
{
return activelyReadingTaskGroups
.values()
.stream()
.flatMap(taskGroup -> taskGroup.tasks.entrySet().stream())
.flatMap(taskData -> taskData.getValue().currentSequences.entrySet().stream())
.collect(Collectors.toMap(
Entry::getKey,
Entry::getValue,
(v1, v2) -> makeSequenceNumber(v1).compareTo(makeSequenceNumber(v2)) > 0 ? v1 : v2
));
if (!spec.isSuspended() || activelyReadingTaskGroups.size() > 0 || pendingCompletionTaskGroups.size() > 0) {
return activelyReadingTaskGroups
.values()
.stream()
.flatMap(taskGroup -> taskGroup.tasks.entrySet().stream())
.flatMap(taskData -> taskData.getValue().currentSequences.entrySet().stream())
.collect(Collectors.toMap(
Entry::getKey,
Entry::getValue,
(v1, v2) -> makeSequenceNumber(v1).compareTo(makeSequenceNumber(v2)) > 0 ? v1 : v2
));
} else {
// if supervisor is suspended, no tasks are likely running so use offsets in metadata, if exist
return getOffsetsFromMetadataStorage();
}
}
private OrderedSequenceNumber<SequenceOffsetType> makeSequenceNumber(SequenceOffsetType seq)
@ -3352,18 +3378,42 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
);
/**
* schedules periodic emitLag() reporting for Kafka, not yet implemented in Kinesis,
* but will be in the future
* default implementation, schedules periodic fetch of latest offsets and {@link #emitLag} reporting for Kafka and Kinesis
*/
protected abstract void scheduleReporting(ScheduledExecutorService reportingExec);
protected void scheduleReporting(ScheduledExecutorService reportingExec)
{
SeekableStreamSupervisorIOConfig ioConfig = spec.getIoConfig();
SeekableStreamSupervisorTuningConfig tuningConfig = spec.getTuningConfig();
reportingExec.scheduleAtFixedRate(
this::updateCurrentAndLatestOffsets,
ioConfig.getStartDelay().getMillis() + INITIAL_GET_OFFSET_DELAY_MILLIS, // wait for tasks to start up
Math.max(
tuningConfig.getOffsetFetchPeriod().getMillis(), MINIMUM_GET_OFFSET_PERIOD_MILLIS
),
TimeUnit.MILLISECONDS
);
reportingExec.scheduleAtFixedRate(
this::emitLag,
ioConfig.getStartDelay().getMillis() + INITIAL_EMIT_LAG_METRIC_DELAY_MILLIS, // wait for tasks to start up
spec.getMonitorSchedulerConfig().getEmitterPeriod().getMillis(),
TimeUnit.MILLISECONDS
);
}
/**
* calculate lag per partition for kafka, kinesis implementation returns an empty
* calculate lag per partition for kafka as a measure of message count, kinesis implementation returns an empty
* map
*
* @return map of partition id -> lag
*/
protected abstract Map<PartitionIdType, SequenceOffsetType> getLagPerPartition(Map<PartitionIdType, SequenceOffsetType> currentOffsets);
protected abstract Map<PartitionIdType, Long> getRecordLagPerPartition(
Map<PartitionIdType, SequenceOffsetType> currentOffsets
);
protected abstract Map<PartitionIdType, Long> getTimeLagPerPartition(
Map<PartitionIdType, SequenceOffsetType> currentOffsets
);
/**
* returns an instance of a specific Kinesis/Kafka recordSupplier
@ -3397,6 +3447,61 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
&& makeSequenceNumber(earliestOffset).compareTo(makeSequenceNumber(offsetFromMetadata)) <= 0;
}
protected void emitLag()
{
if (spec.isSuspended()) {
// don't emit metrics if supervisor is suspended (lag should still available in status report)
return;
}
try {
Map<PartitionIdType, Long> partitionRecordLags = getPartitionRecordLag();
Map<PartitionIdType, Long> partitionTimeLags = getPartitionTimeLag();
if (partitionRecordLags == null && partitionTimeLags == null) {
throw new ISE("Latest offsets have not been fetched");
}
final String type = spec.getType();
BiConsumer<Map<PartitionIdType, Long>, String> emitFn = (partitionLags, suffix) -> {
if (partitionLags == null) {
return;
}
long maxLag = 0, totalLag = 0, avgLag;
for (long lag : partitionLags.values()) {
if (lag > maxLag) {
maxLag = lag;
}
totalLag += lag;
}
avgLag = partitionLags.size() == 0 ? 0 : totalLag / partitionLags.size();
emitter.emit(
ServiceMetricEvent.builder()
.setDimension("dataSource", dataSource)
.build(StringUtils.format("ingest/%s/lag%s", type, suffix), totalLag)
);
emitter.emit(
ServiceMetricEvent.builder()
.setDimension("dataSource", dataSource)
.build(StringUtils.format("ingest/%s/maxLag%s", type, suffix), maxLag)
);
emitter.emit(
ServiceMetricEvent.builder()
.setDimension("dataSource", dataSource)
.build(StringUtils.format("ingest/%s/avgLag%s", type, suffix), avgLag)
);
};
// this should probably really be /count or /records or something.. but keeping like this for backwards compat
emitFn.accept(partitionRecordLags, "");
emitFn.accept(partitionTimeLags, "/time");
}
catch (Exception e) {
log.warn(e, "Unable to compute lag");
}
}
/**
* a special sequence number that is used to indicate that the sequence offset
* for a particular partition has not yet been calculated by the supervisor. When

View File

@ -41,8 +41,10 @@ public abstract class SeekableStreamSupervisorReportPayload<PartitionIdType, Seq
private final List<TaskReportData> activeTasks;
private final List<TaskReportData> publishingTasks;
private final Map<PartitionIdType, SequenceOffsetType> latestOffsets;
private final Map<PartitionIdType, SequenceOffsetType> minimumLag;
private final Map<PartitionIdType, Long> minimumLag;
private final Long aggregateLag;
private final Map<PartitionIdType, Long> minimumLagMillis;
private final Long aggregateLagMillis;
private final DateTime offsetsLastUpdated;
private final boolean suspended;
private final boolean healthy;
@ -57,8 +59,10 @@ public abstract class SeekableStreamSupervisorReportPayload<PartitionIdType, Seq
int replicas,
long durationSeconds,
@Nullable Map<PartitionIdType, SequenceOffsetType> latestOffsets,
@Nullable Map<PartitionIdType, SequenceOffsetType> minimumLag,
@Nullable Map<PartitionIdType, Long> minimumLag,
@Nullable Long aggregateLag,
@Nullable Map<PartitionIdType, Long> minimumLagMillis,
@Nullable Long aggregateLagMillis,
@Nullable DateTime offsetsLastUpdated,
boolean suspended,
boolean healthy,
@ -77,6 +81,8 @@ public abstract class SeekableStreamSupervisorReportPayload<PartitionIdType, Seq
this.latestOffsets = latestOffsets;
this.minimumLag = minimumLag;
this.aggregateLag = aggregateLag;
this.minimumLagMillis = minimumLagMillis;
this.aggregateLagMillis = aggregateLagMillis;
this.offsetsLastUpdated = offsetsLastUpdated;
this.suspended = suspended;
this.healthy = healthy;
@ -157,7 +163,7 @@ public abstract class SeekableStreamSupervisorReportPayload<PartitionIdType, Seq
}
@JsonProperty
public Map<PartitionIdType, SequenceOffsetType> getMinimumLag()
public Map<PartitionIdType, Long> getMinimumLag()
{
return minimumLag;
}
@ -168,6 +174,19 @@ public abstract class SeekableStreamSupervisorReportPayload<PartitionIdType, Seq
return aggregateLag;
}
@JsonProperty
public Long getAggregateLagMillis()
{
return aggregateLagMillis;
}
@JsonProperty
public Map<PartitionIdType, Long> getMinimumLagMillis()
{
return minimumLagMillis;
}
@JsonProperty
public DateTime getOffsetsLastUpdated()
{

View File

@ -26,7 +26,7 @@ import org.joda.time.Period;
public interface SeekableStreamSupervisorTuningConfig
{
String DEFAULT_OFFSET_FETCH_PERIOD = "PT30S";
int DEFAULT_CHAT_RETRIES = 8;
String DEFAULT_HTTP_TIMEOUT = "PT10S";
String DEFAULT_SHUTDOWN_TIMEOUT = "PT80S";
@ -55,5 +55,8 @@ public interface SeekableStreamSupervisorTuningConfig
@JsonProperty
Duration getRepartitionTransitionDuration();
@JsonProperty
Duration getOffsetFetchPeriod();
SeekableStreamIndexTaskTuningConfig convertToTaskTuningConfig();
}

View File

@ -34,7 +34,8 @@ public class TaskReportData<PartitionIdType, SequenceOffsetType>
private final Long remainingSeconds;
private final TaskType type;
private final Map<PartitionIdType, SequenceOffsetType> currentOffsets;
private final Map<PartitionIdType, SequenceOffsetType> lag;
private final Map<PartitionIdType, Long> lag;
private final Map<PartitionIdType, Long> lagMillis;
public TaskReportData(
String id,
@ -43,7 +44,8 @@ public class TaskReportData<PartitionIdType, SequenceOffsetType>
@Nullable DateTime startTime,
Long remainingSeconds,
TaskType type,
@Nullable Map<PartitionIdType, SequenceOffsetType> lag
@Nullable Map<PartitionIdType, Long> lag,
@Nullable Map<PartitionIdType, Long> lagMillis
)
{
this.id = id;
@ -53,6 +55,7 @@ public class TaskReportData<PartitionIdType, SequenceOffsetType>
this.remainingSeconds = remainingSeconds;
this.type = type;
this.lag = lag;
this.lagMillis = lagMillis;
}
@JsonProperty
@ -95,11 +98,18 @@ public class TaskReportData<PartitionIdType, SequenceOffsetType>
@JsonProperty
@JsonInclude(JsonInclude.Include.NON_NULL)
public Map<PartitionIdType, SequenceOffsetType> getLag()
public Map<PartitionIdType, Long> getLag()
{
return lag;
}
@JsonProperty
@JsonInclude(JsonInclude.Include.NON_NULL)
public Map<PartitionIdType, Long> getLagMillis()
{
return lagMillis;
}
@Override
public String toString()
{
@ -110,6 +120,7 @@ public class TaskReportData<PartitionIdType, SequenceOffsetType>
", startTime=" + startTime +
", remainingSeconds=" + remainingSeconds +
(lag != null ? ", lag=" + lag : "") +
(lagMillis != null ? ", lagMillis=" + lagMillis : "") +
'}';
}

View File

@ -61,12 +61,16 @@ import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervi
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.parsers.JSONPathSpec;
import org.apache.druid.java.util.emitter.core.Event;
import org.apache.druid.metadata.EntryExistsException;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.CountAggregatorFactory;
import org.apache.druid.segment.TestHelper;
import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
import org.apache.druid.segment.realtime.firehose.ChatHandlerProvider;
import org.apache.druid.server.metrics.DruidMonitorSchedulerConfig;
import org.apache.druid.server.metrics.NoopServiceEmitter;
import org.apache.druid.server.security.AuthorizerMapper;
import org.easymock.EasyMock;
import org.easymock.EasyMockSupport;
@ -85,8 +89,10 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
public class SeekableStreamSupervisorStateTest extends EasyMockSupport
{
@ -110,6 +116,8 @@ public class SeekableStreamSupervisorStateTest extends EasyMockSupport
private RowIngestionMetersFactory rowIngestionMetersFactory;
private SupervisorStateManagerConfig supervisorConfig;
private TestEmitter emitter;
@Before
public void setupTest()
{
@ -127,11 +135,14 @@ public class SeekableStreamSupervisorStateTest extends EasyMockSupport
supervisorConfig = new SupervisorStateManagerConfig();
emitter = new TestEmitter();
EasyMock.expect(spec.getSupervisorStateManagerConfig()).andReturn(supervisorConfig).anyTimes();
EasyMock.expect(spec.getDataSchema()).andReturn(getDataSchema()).anyTimes();
EasyMock.expect(spec.getIoConfig()).andReturn(getIOConfig()).anyTimes();
EasyMock.expect(spec.getTuningConfig()).andReturn(getTuningConfig()).anyTimes();
EasyMock.expect(spec.getEmitter()).andReturn(emitter).anyTimes();
EasyMock.expect(taskClientFactory.build(
EasyMock.anyObject(),
@ -552,6 +563,173 @@ public class SeekableStreamSupervisorStateTest extends EasyMockSupport
verifyAll();
}
@Test
public void testEmitBothLag() throws Exception
{
expectEmitterSupervisor(false);
CountDownLatch latch = new CountDownLatch(1);
TestEmittingTestSeekableStreamSupervisor supervisor = new TestEmittingTestSeekableStreamSupervisor(
latch,
ImmutableMap.of("1", 100L, "2", 250L, "3", 500L),
ImmutableMap.of("1", 10000L, "2", 15000L, "3", 20000L)
);
supervisor.start();
Assert.assertTrue(supervisor.stateManager.isHealthy());
Assert.assertEquals(BasicState.PENDING, supervisor.stateManager.getSupervisorState());
Assert.assertEquals(BasicState.PENDING, supervisor.stateManager.getSupervisorState().getBasicState());
Assert.assertTrue(supervisor.stateManager.getExceptionEvents().isEmpty());
Assert.assertFalse(supervisor.stateManager.isAtLeastOneSuccessfulRun());
latch.await();
Assert.assertEquals(6, emitter.getEvents().size());
Assert.assertEquals("ingest/test/lag", emitter.getEvents().get(0).toMap().get("metric"));
Assert.assertEquals(850L, emitter.getEvents().get(0).toMap().get("value"));
Assert.assertEquals("ingest/test/maxLag", emitter.getEvents().get(1).toMap().get("metric"));
Assert.assertEquals(500L, emitter.getEvents().get(1).toMap().get("value"));
Assert.assertEquals("ingest/test/avgLag", emitter.getEvents().get(2).toMap().get("metric"));
Assert.assertEquals(283L, emitter.getEvents().get(2).toMap().get("value"));
Assert.assertEquals("ingest/test/lag/time", emitter.getEvents().get(3).toMap().get("metric"));
Assert.assertEquals(45000L, emitter.getEvents().get(3).toMap().get("value"));
Assert.assertEquals("ingest/test/maxLag/time", emitter.getEvents().get(4).toMap().get("metric"));
Assert.assertEquals(20000L, emitter.getEvents().get(4).toMap().get("value"));
Assert.assertEquals("ingest/test/avgLag/time", emitter.getEvents().get(5).toMap().get("metric"));
Assert.assertEquals(15000L, emitter.getEvents().get(5).toMap().get("value"));
verifyAll();
}
@Test
public void testEmitRecordLag() throws Exception
{
expectEmitterSupervisor(false);
CountDownLatch latch = new CountDownLatch(1);
TestEmittingTestSeekableStreamSupervisor supervisor = new TestEmittingTestSeekableStreamSupervisor(
latch,
ImmutableMap.of("1", 100L, "2", 250L, "3", 500L),
null
);
supervisor.start();
Assert.assertTrue(supervisor.stateManager.isHealthy());
Assert.assertEquals(BasicState.PENDING, supervisor.stateManager.getSupervisorState());
Assert.assertEquals(BasicState.PENDING, supervisor.stateManager.getSupervisorState().getBasicState());
Assert.assertTrue(supervisor.stateManager.getExceptionEvents().isEmpty());
Assert.assertFalse(supervisor.stateManager.isAtLeastOneSuccessfulRun());
latch.await();
Assert.assertEquals(3, emitter.getEvents().size());
Assert.assertEquals("ingest/test/lag", emitter.getEvents().get(0).toMap().get("metric"));
Assert.assertEquals(850L, emitter.getEvents().get(0).toMap().get("value"));
Assert.assertEquals("ingest/test/maxLag", emitter.getEvents().get(1).toMap().get("metric"));
Assert.assertEquals(500L, emitter.getEvents().get(1).toMap().get("value"));
Assert.assertEquals("ingest/test/avgLag", emitter.getEvents().get(2).toMap().get("metric"));
Assert.assertEquals(283L, emitter.getEvents().get(2).toMap().get("value"));
verifyAll();
}
@Test
public void testEmitTimeLag() throws Exception
{
expectEmitterSupervisor(false);
CountDownLatch latch = new CountDownLatch(1);
TestEmittingTestSeekableStreamSupervisor supervisor = new TestEmittingTestSeekableStreamSupervisor(
latch,
null,
ImmutableMap.of("1", 10000L, "2", 15000L, "3", 20000L)
);
supervisor.start();
Assert.assertTrue(supervisor.stateManager.isHealthy());
Assert.assertEquals(BasicState.PENDING, supervisor.stateManager.getSupervisorState());
Assert.assertEquals(BasicState.PENDING, supervisor.stateManager.getSupervisorState().getBasicState());
Assert.assertTrue(supervisor.stateManager.getExceptionEvents().isEmpty());
Assert.assertFalse(supervisor.stateManager.isAtLeastOneSuccessfulRun());
latch.await();
Assert.assertEquals(3, emitter.getEvents().size());
Assert.assertEquals("ingest/test/lag/time", emitter.getEvents().get(0).toMap().get("metric"));
Assert.assertEquals(45000L, emitter.getEvents().get(0).toMap().get("value"));
Assert.assertEquals("ingest/test/maxLag/time", emitter.getEvents().get(1).toMap().get("metric"));
Assert.assertEquals(20000L, emitter.getEvents().get(1).toMap().get("value"));
Assert.assertEquals("ingest/test/avgLag/time", emitter.getEvents().get(2).toMap().get("metric"));
Assert.assertEquals(15000L, emitter.getEvents().get(2).toMap().get("value"));
verifyAll();
}
@Test
public void testEmitNoLagWhenSuspended() throws Exception
{
expectEmitterSupervisor(true);
CountDownLatch latch = new CountDownLatch(1);
TestEmittingTestSeekableStreamSupervisor supervisor = new TestEmittingTestSeekableStreamSupervisor(
latch,
ImmutableMap.of("1", 100L, "2", 250L, "3", 500L),
ImmutableMap.of("1", 10000L, "2", 15000L, "3", 20000L)
);
supervisor.start();
supervisor.runInternal();
Assert.assertTrue(supervisor.stateManager.isHealthy());
Assert.assertEquals(BasicState.SUSPENDED, supervisor.stateManager.getSupervisorState());
Assert.assertEquals(BasicState.SUSPENDED, supervisor.stateManager.getSupervisorState().getBasicState());
Assert.assertTrue(supervisor.stateManager.getExceptionEvents().isEmpty());
latch.await();
Assert.assertEquals(0, emitter.getEvents().size());
verifyAll();
}
private void expectEmitterSupervisor(boolean suspended) throws EntryExistsException
{
spec = createMock(SeekableStreamSupervisorSpec.class);
EasyMock.expect(spec.getSupervisorStateManagerConfig()).andReturn(supervisorConfig).anyTimes();
EasyMock.expect(spec.getDataSchema()).andReturn(getDataSchema()).anyTimes();
EasyMock.expect(spec.getIoConfig()).andReturn(new SeekableStreamSupervisorIOConfig(
"stream",
new JsonInputFormat(new JSONPathSpec(true, ImmutableList.of()), ImmutableMap.of()),
1,
1,
new Period("PT1H"),
new Period("PT1S"),
new Period("PT30S"),
false,
new Period("PT30M"),
null,
null, null
)
{
}).anyTimes();
EasyMock.expect(spec.getTuningConfig()).andReturn(getTuningConfig()).anyTimes();
EasyMock.expect(spec.getEmitter()).andReturn(emitter).anyTimes();
EasyMock.expect(spec.getMonitorSchedulerConfig()).andReturn(new DruidMonitorSchedulerConfig()).anyTimes();
EasyMock.expect(spec.isSuspended()).andReturn(suspended).anyTimes();
EasyMock.expect(spec.getType()).andReturn("test").anyTimes();
EasyMock.expect(recordSupplier.getPartitionIds(STREAM)).andReturn(ImmutableSet.of(SHARD_ID)).anyTimes();
EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of()).anyTimes();
EasyMock.expect(taskQueue.add(EasyMock.anyObject())).andReturn(true).anyTimes();
replayAll();
}
private static DataSchema getDataSchema()
{
List<DimensionSchema> dimensions = new ArrayList<>();
@ -635,6 +813,12 @@ public class SeekableStreamSupervisorStateTest extends EasyMockSupport
return new Period("PT2M").toStandardDuration();
}
@Override
public Duration getOffsetFetchPeriod()
{
return new Period("PT5M").toStandardDuration();
}
@Override
public SeekableStreamIndexTaskTuningConfig convertToTaskTuningConfig()
{
@ -725,9 +909,9 @@ public class SeekableStreamSupervisorStateTest extends EasyMockSupport
}
}
private class TestSeekableStreamSupervisor extends SeekableStreamSupervisor<String, String>
private abstract class BaseTestSeekableStreamSupervisor extends SeekableStreamSupervisor<String, String>
{
private TestSeekableStreamSupervisor()
private BaseTestSeekableStreamSupervisor()
{
super(
"testSupervisorId",
@ -756,6 +940,20 @@ public class SeekableStreamSupervisorStateTest extends EasyMockSupport
// do nothing
}
@Nullable
@Override
protected Map<String, Long> getPartitionRecordLag()
{
return null;
}
@Nullable
@Override
protected Map<String, Long> getPartitionTimeLag()
{
return null;
}
@Override
protected SeekableStreamIndexTaskIOConfig createTaskIoConfig(
int groupId,
@ -850,13 +1048,13 @@ public class SeekableStreamSupervisorStateTest extends EasyMockSupport
}
@Override
protected void scheduleReporting(ScheduledExecutorService reportingExec)
protected Map<String, Long> getRecordLagPerPartition(Map<String, String> currentOffsets)
{
// do nothing
return null;
}
@Override
protected Map<String, String> getLagPerPartition(Map<String, String> currentOffsets)
protected Map<String, Long> getTimeLagPerPartition(Map<String, String> currentOffsets)
{
return null;
}
@ -864,7 +1062,7 @@ public class SeekableStreamSupervisorStateTest extends EasyMockSupport
@Override
protected RecordSupplier<String, String> setupRecordSupplier()
{
return recordSupplier;
return SeekableStreamSupervisorStateTest.this.recordSupplier;
}
@Override
@ -883,6 +1081,8 @@ public class SeekableStreamSupervisorStateTest extends EasyMockSupport
null,
null,
null,
null,
null,
false,
true,
null,
@ -923,4 +1123,80 @@ public class SeekableStreamSupervisorStateTest extends EasyMockSupport
return false;
}
}
private class TestSeekableStreamSupervisor extends BaseTestSeekableStreamSupervisor
{
@Override
protected void scheduleReporting(ScheduledExecutorService reportingExec)
{
// do nothing
}
}
private class TestEmittingTestSeekableStreamSupervisor extends BaseTestSeekableStreamSupervisor
{
private final CountDownLatch latch;
private final Map<String, Long> partitionsRecordLag;
private final Map<String, Long> partitionsTimeLag;
TestEmittingTestSeekableStreamSupervisor(
CountDownLatch latch,
Map<String, Long> partitionsRecordLag,
Map<String, Long> partitionsTimeLag
)
{
this.latch = latch;
this.partitionsRecordLag = partitionsRecordLag;
this.partitionsTimeLag = partitionsTimeLag;
}
@Nullable
@Override
protected Map<String, Long> getPartitionRecordLag()
{
return partitionsRecordLag;
}
@Nullable
@Override
protected Map<String, Long> getPartitionTimeLag()
{
return partitionsTimeLag;
}
@Override
protected void emitLag()
{
super.emitLag();
latch.countDown();
}
@Override
protected void scheduleReporting(ScheduledExecutorService reportingExec)
{
SeekableStreamSupervisorIOConfig ioConfig = spec.getIoConfig();
reportingExec.scheduleAtFixedRate(
this::emitLag,
ioConfig.getStartDelay().getMillis(),
spec.getMonitorSchedulerConfig().getEmitterPeriod().getMillis(),
TimeUnit.MILLISECONDS
);
}
}
private static class TestEmitter extends NoopServiceEmitter
{
private final List<Event> events = new ArrayList<>();
@Override
public void emit(Event event)
{
events.add(event);
}
public List<Event> getEvents()
{
return events;
}
}
}