refactor SeekableStreamSupervisor usage of RecordSupplier (#9819)

* refactor SeekableStreamSupervisor's usage of RecordSupplier to reduce contention between the background threads and the main thread; refactor KinesisRecordSupplier; refactor Kinesis lag metric collection and emission

* fix style and test

* cleanup, refactor, javadocs, test

* fixes

* keep collecting current offsets and lag in the background reporting thread even if unhealthy

* address review comments

* add comment
Clint Wylie 2020-05-16 14:09:39 -07:00 committed by GitHub
parent 522df300c2
commit 2e9548d93d
13 changed files with 712 additions and 580 deletions
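The central change, visible in the supervisor diffs below, is that the shared RecordSupplier is no longer used concurrently by the main supervisor thread and the background reporting threads: callers now take a lock (getRecordSupplierLock() in the diffs) around supplier calls, and the reporting thread caches what it fetched so other threads can read lag without touching the supplier. A minimal sketch of that pattern, with a hypothetical LagSource interface standing in for the real RecordSupplier:

import java.util.concurrent.locks.ReentrantLock;

// Hypothetical stand-in for the shared, non-thread-safe RecordSupplier.
interface LagSource
{
  long fetchLatestOffset(String partition);
}

class SupervisorSketch
{
  private final ReentrantLock recordSupplierLock = new ReentrantLock();
  private final LagSource supplier;

  // cached by the background reporting thread, read cheaply by everyone else
  private volatile Long latestOffset;

  SupervisorSketch(LagSource supplier)
  {
    this.supplier = supplier;
  }

  // Called periodically from the background reporting thread: the lock is held
  // only for the duration of the supplier call, and the result is cached.
  void updateLatestOffsetFromStream(String partition)
  {
    recordSupplierLock.lock();
    try {
      latestOffset = supplier.fetchLatestOffset(partition);
    }
    finally {
      recordSupplierLock.unlock();
    }
  }

  Long getCachedLatestOffset()
  {
    return latestOffset;
  }
}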

View File: TaskIdUtils.java

@ -19,6 +19,7 @@
package org.apache.druid.indexer;
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import org.apache.druid.java.util.common.StringUtils;
@ -31,6 +32,8 @@ public class TaskIdUtils
{
private static final Pattern INVALIDCHARS = Pattern.compile("(?s).*[^\\S ].*");
private static final Joiner UNDERSCORE_JOINER = Joiner.on("_");
public static void validateId(String thingToValidate, String stringToValidate)
{
Preconditions.checkArgument(
@ -60,4 +63,9 @@ public class TaskIdUtils
}
return suffix.toString();
}
public static String getRandomIdWithPrefix(String prefix)
{
return UNDERSCORE_JOINER.join(prefix, TaskIdUtils.getRandomId());
}
}
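The new helper centralizes the underscore-joining that KafkaSupervisor and KinesisSupervisor previously did inline, as their diffs later in this commit show:

// Before (duplicated in KafkaSupervisor and KinesisSupervisor):
//   String taskId = Joiner.on("_").join(baseSequenceName, TaskIdUtils.getRandomId());
// After:
String taskId = TaskIdUtils.getRandomIdWithPrefix(baseSequenceName);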

View File: KafkaSupervisor.java

@ -23,7 +23,6 @@ import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Joiner;
import org.apache.druid.indexer.TaskIdUtils;
import org.apache.druid.indexing.common.stats.RowIngestionMetersFactory;
import org.apache.druid.indexing.common.task.Task;
@ -46,6 +45,7 @@ import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskTuningCon
import org.apache.druid.indexing.seekablestream.SeekableStreamStartSequenceNumbers;
import org.apache.druid.indexing.seekablestream.common.OrderedSequenceNumber;
import org.apache.druid.indexing.seekablestream.common.RecordSupplier;
import org.apache.druid.indexing.seekablestream.common.StreamException;
import org.apache.druid.indexing.seekablestream.common.StreamPartition;
import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor;
import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorIOConfig;
@ -219,7 +219,7 @@ public class KafkaSupervisor extends SeekableStreamSupervisor<Integer, Long>
List<SeekableStreamIndexTask<Integer, Long>> taskList = new ArrayList<>();
for (int i = 0; i < replicas; i++) {
String taskId = Joiner.on("_").join(baseSequenceName, TaskIdUtils.getRandomId());
String taskId = TaskIdUtils.getRandomIdWithPrefix(baseSequenceName);
taskList.add(new KafkaIndexTask(
taskId,
new TaskResource(baseSequenceName, 1),
@ -334,16 +334,38 @@ public class KafkaSupervisor extends SeekableStreamSupervisor<Integer, Long>
}
@Override
protected void updateLatestSequenceFromStream(
RecordSupplier<Integer, Long> recordSupplier,
Set<StreamPartition<Integer>> partitions
)
protected void updatePartitionLagFromStream()
{
latestSequenceFromStream = partitions.stream()
.collect(Collectors.toMap(
StreamPartition::getPartitionId,
recordSupplier::getPosition
));
getRecordSupplierLock().lock();
try {
Set<Integer> partitionIds;
try {
partitionIds = recordSupplier.getPartitionIds(getIoConfig().getStream());
}
catch (Exception e) {
log.warn("Could not fetch partitions for topic/stream [%s]", getIoConfig().getStream());
throw new StreamException(e);
}
Set<StreamPartition<Integer>> partitions = partitionIds
.stream()
.map(e -> new StreamPartition<>(getIoConfig().getStream(), e))
.collect(Collectors.toSet());
recordSupplier.seekToLatest(partitions);
// this method isn't actually computing the lag, just fetching the latest offsets from the stream. This is
// because we currently only have record lag for Kafka, which can be lazily computed by subtracting the highest
// task offsets from the latest offsets from the stream when it is needed
latestSequenceFromStream =
partitions.stream().collect(Collectors.toMap(StreamPartition::getPartitionId, recordSupplier::getPosition));
}
catch (InterruptedException e) {
throw new StreamException(e);
}
finally {
getRecordSupplierLock().unlock();
}
}
@Override

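To make the comment in updatePartitionLagFromStream() concrete: Kafka record lag can be derived on demand from the cached latest offsets, roughly as below. This is a sketch with illustrative names, not the exact KafkaSupervisor code.

import java.util.HashMap;
import java.util.Map;

final class KafkaLagSketch
{
  // lag per partition = latest offset seen on the stream - highest offset
  // reported by the tasks, floored at zero
  static Map<Integer, Long> recordLag(
      Map<Integer, Long> latestSequenceFromStream,
      Map<Integer, Long> highestCurrentOffsets
  )
  {
    final Map<Integer, Long> lag = new HashMap<>();
    for (Map.Entry<Integer, Long> entry : highestCurrentOffsets.entrySet()) {
      final Long latest = latestSequenceFromStream.get(entry.getKey());
      if (latest != null) {
        lag.put(entry.getKey(), Math.max(0, latest - entry.getValue()));
      }
    }
    return lag;
  }
}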
View File: KinesisIndexTask.java

@ -23,6 +23,7 @@ import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.inject.name.Named;
import org.apache.druid.common.aws.AWSCredentialsConfig;
import org.apache.druid.indexing.common.stats.RowIngestionMetersFactory;
@ -97,8 +98,12 @@ public class KinesisIndexTask extends SeekableStreamIndexTask<String, String>
KinesisIndexTaskTuningConfig tuningConfig = ((KinesisIndexTaskTuningConfig) super.tuningConfig);
int fetchThreads = tuningConfig.getFetchThreads() != null
? tuningConfig.getFetchThreads()
: Math.max(1, ioConfig.getStartSequenceNumbers().getPartitionSequenceNumberMap().size());
: Runtime.getRuntime().availableProcessors() * 2;
Preconditions.checkArgument(
fetchThreads > 0,
"Must have at least one background fetch thread for the record supplier"
);
return new KinesisRecordSupplier(
KinesisRecordSupplier.getAmazonKinesisClient(
ioConfig.getEndpoint(),
@ -114,7 +119,8 @@ public class KinesisIndexTask extends SeekableStreamIndexTask<String, String>
tuningConfig.getRecordBufferOfferTimeout(),
tuningConfig.getRecordBufferFullWait(),
tuningConfig.getFetchSequenceNumberTimeout(),
tuningConfig.getMaxRecordsPerPoll()
tuningConfig.getMaxRecordsPerPoll(),
false
);
}

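The default changes from one fetch thread per assigned partition to a fixed multiple of the processor count, and a fetchThreads value of 0 becomes meaningful elsewhere in this commit: the supervisor passes 0 to construct a supplier with background fetch disabled (see the KinesisRecordSupplier constructor below, which sets backgroundFetchEnabled = fetchThreads > 0). A sketch of the sizing rule, with an illustrative method name:

static int resolveFetchThreads(Integer configuredFetchThreads)
{
  // configuredFetchThreads may be null when the tuning config leaves it unset
  return configuredFetchThreads != null
         ? configuredFetchThreads
         : Runtime.getRuntime().availableProcessors() * 2;
}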
View File: KinesisRecordSupplier.java

@ -54,6 +54,7 @@ import org.apache.druid.indexing.seekablestream.common.RecordSupplier;
import org.apache.druid.indexing.seekablestream.common.StreamException;
import org.apache.druid.indexing.seekablestream.common.StreamPartition;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.RetryUtils;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.emitter.EmittingLogger;
@ -82,7 +83,9 @@ import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
/**
@ -95,6 +98,14 @@ public class KinesisRecordSupplier implements RecordSupplier<String, String>
private static final long PROVISIONED_THROUGHPUT_EXCEEDED_BACKOFF_MS = 3000;
private static final long EXCEPTION_RETRY_DELAY_MS = 10000;
/**
* We call getRecords with limit 1000 to make sure that we can find the first (earliest) record in the shard.
* In the case where the shard is constantly removing records that are past their retention period, it is possible
* that we never find the first record in the shard if we use a limit of 1.
*/
private static final int GET_SEQUENCE_NUMBER_RECORD_COUNT = 1000;
private static final int GET_SEQUENCE_NUMBER_RETRY_COUNT = 10;
private static boolean isServiceExceptionRecoverable(AmazonServiceException ex)
{
final boolean isIOException = ex.getCause() instanceof IOException;
@ -102,6 +113,37 @@ public class KinesisRecordSupplier implements RecordSupplier<String, String>
return isIOException || isTimeout;
}
/**
* Returns an array with the content between the position and limit of "buffer". This may be the buffer's backing
* array itself. Does not modify position or limit of the buffer.
*/
private static byte[] toByteArray(final ByteBuffer buffer)
{
if (buffer.hasArray()
&& buffer.arrayOffset() == 0
&& buffer.position() == 0
&& buffer.array().length == buffer.limit()) {
return buffer.array();
} else {
final byte[] retVal = new byte[buffer.remaining()];
buffer.duplicate().get(retVal);
return retVal;
}
}
/**
* Catch any exception and wrap it in a {@link StreamException}
*/
private static <T> T wrapExceptions(Callable<T> callable)
{
try {
return callable.call();
}
catch (Exception e) {
throw new StreamException(e);
}
}
private class PartitionResource
{
private final StreamPartition<String> streamPartition;
@ -112,101 +154,80 @@ public class KinesisRecordSupplier implements RecordSupplier<String, String>
// to indicate that this shard has no more records to read
@Nullable
private volatile String shardIterator;
private volatile boolean started;
private volatile boolean stopRequested;
private volatile long currentLagMillis;
PartitionResource(StreamPartition<String> streamPartition)
private final AtomicBoolean fetchStarted = new AtomicBoolean();
private ScheduledFuture<?> currentFetch;
private PartitionResource(StreamPartition<String> streamPartition)
{
this.streamPartition = streamPartition;
}
void startBackgroundFetch()
private void startBackgroundFetch()
{
if (started) {
if (!backgroundFetchEnabled) {
return;
}
// a null shardIterator means seek has not been called for this partition yet, so there is nothing to fetch
if (shardIterator == null) {
log.warn(
"Skipping background fetch for stream[%s] partition[%s] since seek has not been called for this partition",
streamPartition.getStream(),
streamPartition.getPartitionId()
);
return;
}
if (fetchStarted.compareAndSet(false, true)) {
log.debug(
"Starting scheduled fetch for stream[%s] partition[%s]",
streamPartition.getStream(),
streamPartition.getPartitionId()
);
log.info(
"Starting scheduled fetch runnable for stream[%s] partition[%s]",
streamPartition.getStream(),
streamPartition.getPartitionId()
);
stopRequested = false;
started = true;
rescheduleRunnable(fetchDelayMillis);
scheduleBackgroundFetch(fetchDelayMillis);
}
}
void stopBackgroundFetch()
private void stopBackgroundFetch()
{
log.info(
"Stopping scheduled fetch runnable for stream[%s] partition[%s]",
streamPartition.getStream(),
streamPartition.getPartitionId()
);
stopRequested = true;
}
long getPartitionTimeLag()
{
return currentLagMillis;
}
long getPartitionTimeLag(String offset)
{
// if not started (fetching records in background), fetch lag ourselves with a throw-away iterator
if (!started) {
try {
final String iteratorType;
final String offsetToUse;
if (offset == null || KinesisSupervisor.NOT_SET.equals(offset)) {
// this should probably check whether we will start processing from earliest or latest, rather than assuming earliest.
// if starting from latest we could skip this, because latest cannot be behind itself, so lag is 0.
iteratorType = ShardIteratorType.TRIM_HORIZON.toString();
offsetToUse = null;
} else {
iteratorType = ShardIteratorType.AT_SEQUENCE_NUMBER.toString();
offsetToUse = offset;
}
String shardIterator = kinesis.getShardIterator(
streamPartition.getStream(),
streamPartition.getPartitionId(),
iteratorType,
offsetToUse
).getShardIterator();
GetRecordsResult recordsResult = kinesis.getRecords(
new GetRecordsRequest().withShardIterator(shardIterator).withLimit(recordsPerFetch)
);
currentLagMillis = recordsResult.getMillisBehindLatest();
return currentLagMillis;
}
catch (Exception ex) {
// eat it
log.warn(
ex,
"Failed to determine partition lag for partition %s of stream %s",
streamPartition.getPartitionId(),
streamPartition.getStream()
);
if (fetchStarted.compareAndSet(true, false)) {
log.debug(
"Stopping scheduled fetch for stream[%s] partition[%s]",
streamPartition.getStream(),
streamPartition.getPartitionId()
);
if (currentFetch != null && !currentFetch.isDone()) {
currentFetch.cancel(true);
}
}
return currentLagMillis;
}
private Runnable getRecordRunnable()
private void scheduleBackgroundFetch(long delayMillis)
{
if (fetchStarted.get()) {
try {
currentFetch = scheduledExec.schedule(fetchRecords(), delayMillis, TimeUnit.MILLISECONDS);
}
catch (RejectedExecutionException e) {
log.warn(
e,
"Caught RejectedExecutionException, KinesisRecordSupplier for partition[%s] has likely temporarily shutdown the ExecutorService. "
+ "This is expected behavior after calling seek(), seekToEarliest() and seekToLatest()",
streamPartition.getPartitionId()
);
}
} else {
log.debug("Worker for partition[%s] is already stopped", streamPartition.getPartitionId());
}
}
private Runnable fetchRecords()
{
return () -> {
if (stopRequested) {
started = false;
stopRequested = false;
log.info("Worker for partition[%s] has been stopped", streamPartition.getPartitionId());
if (!fetchStarted.get()) {
log.debug("Worker for partition[%s] has been stopped", streamPartition.getPartitionId());
return;
}
@ -231,7 +252,7 @@ public class KinesisRecordSupplier implements RecordSupplier<String, String>
if (!records.offer(currRecord, recordBufferOfferTimeout, TimeUnit.MILLISECONDS)) {
log.warn("OrderedPartitionableRecord buffer full, retrying in [%,dms]", recordBufferFullWait);
rescheduleRunnable(recordBufferFullWait);
scheduleBackgroundFetch(recordBufferFullWait);
}
return;
@ -298,14 +319,14 @@ public class KinesisRecordSupplier implements RecordSupplier<String, String>
currRecord.getSequenceNumber()
).getShardIterator();
rescheduleRunnable(recordBufferFullWait);
scheduleBackgroundFetch(recordBufferFullWait);
return;
}
}
shardIterator = recordsResult.getNextShardIterator(); // will be null if the shard has been closed
rescheduleRunnable(fetchDelayMillis);
scheduleBackgroundFetch(fetchDelayMillis);
}
catch (ProvisionedThroughputExceededException e) {
log.warn(
@ -315,7 +336,7 @@ public class KinesisRecordSupplier implements RecordSupplier<String, String>
+ "the available throughput. Reduce the frequency or size of your requests."
);
long retryMs = Math.max(PROVISIONED_THROUGHPUT_EXCEEDED_BACKOFF_MS, fetchDelayMillis);
rescheduleRunnable(retryMs);
scheduleBackgroundFetch(retryMs);
}
catch (InterruptedException e) {
// may happen if interrupted while BlockingQueue.offer() is waiting
@ -324,7 +345,7 @@ public class KinesisRecordSupplier implements RecordSupplier<String, String>
"Interrupted while waiting to add record to buffer, retrying in [%,dms]",
EXCEPTION_RETRY_DELAY_MS
);
rescheduleRunnable(EXCEPTION_RETRY_DELAY_MS);
scheduleBackgroundFetch(EXCEPTION_RETRY_DELAY_MS);
}
catch (ExpiredIteratorException e) {
log.warn(
@ -334,7 +355,7 @@ public class KinesisRecordSupplier implements RecordSupplier<String, String>
);
if (recordsResult != null) {
shardIterator = recordsResult.getNextShardIterator(); // will be null if the shard has been closed
rescheduleRunnable(fetchDelayMillis);
scheduleBackgroundFetch(fetchDelayMillis);
} else {
throw new ISE("can't reschedule fetch records runnable, recordsResult is null??");
}
@ -347,7 +368,7 @@ public class KinesisRecordSupplier implements RecordSupplier<String, String>
catch (AmazonServiceException e) {
if (isServiceExceptionRecoverable(e)) {
log.warn(e, "encounted unknown recoverable AWS exception, retrying in [%,dms]", EXCEPTION_RETRY_DELAY_MS);
rescheduleRunnable(EXCEPTION_RETRY_DELAY_MS);
scheduleBackgroundFetch(EXCEPTION_RETRY_DELAY_MS);
} else {
log.warn(e, "encounted unknown unrecoverable AWS exception, will not retry");
throw new RuntimeException(e);
@ -355,31 +376,32 @@ public class KinesisRecordSupplier implements RecordSupplier<String, String>
}
catch (Throwable e) {
// non transient errors
log.error(e, "unknown getRecordRunnable exception, will not retry");
log.error(e, "unknown fetchRecords exception, will not retry");
throw new RuntimeException(e);
}
};
}
private void rescheduleRunnable(long delayMillis)
private void seek(ShardIteratorType iteratorEnum, String sequenceNumber)
{
if (started && !stopRequested) {
try {
scheduledExec.schedule(getRecordRunnable(), delayMillis, TimeUnit.MILLISECONDS);
}
catch (RejectedExecutionException e) {
log.warn(
e,
"Caught RejectedExecutionException, KinesisRecordSupplier for partition[%s] has likely temporarily shutdown the ExecutorService. "
+ "This is expected behavior after calling seek(), seekToEarliest() and seekToLatest()",
streamPartition.getPartitionId()
);
log.debug(
"Seeking partition [%s] to [%s]",
streamPartition.getPartitionId(),
sequenceNumber != null ? sequenceNumber : iteratorEnum.toString()
);
}
} else {
log.info("Worker for partition[%s] has been stopped", streamPartition.getPartitionId());
}
shardIterator = wrapExceptions(() -> kinesis.getShardIterator(
streamPartition.getStream(),
streamPartition.getPartitionId(),
iteratorEnum.toString(),
sequenceNumber
).getShardIterator());
}
private long getPartitionTimeLag()
{
return currentLagMillis;
}
}
@ -398,6 +420,7 @@ public class KinesisRecordSupplier implements RecordSupplier<String, String>
private final int maxRecordsPerPoll;
private final int fetchThreads;
private final int recordBufferSize;
private final boolean useEarliestSequenceNumber;
private ScheduledExecutorService scheduledExec;
@ -405,8 +428,9 @@ public class KinesisRecordSupplier implements RecordSupplier<String, String>
new ConcurrentHashMap<>();
private BlockingQueue<OrderedPartitionableRecord<String, String>> records;
private volatile boolean checkPartitionsStarted = false;
private final boolean backgroundFetchEnabled;
private volatile boolean closed = false;
private AtomicBoolean partitionsFetchStarted = new AtomicBoolean();
public KinesisRecordSupplier(
AmazonKinesis amazonKinesis,
@ -418,7 +442,8 @@ public class KinesisRecordSupplier implements RecordSupplier<String, String>
int recordBufferOfferTimeout,
int recordBufferFullWait,
int fetchSequenceNumberTimeout,
int maxRecordsPerPoll
int maxRecordsPerPoll,
boolean useEarliestSequenceNumber
)
{
Preconditions.checkNotNull(amazonKinesis);
@ -432,6 +457,8 @@ public class KinesisRecordSupplier implements RecordSupplier<String, String>
this.maxRecordsPerPoll = maxRecordsPerPoll;
this.fetchThreads = fetchThreads;
this.recordBufferSize = recordBufferSize;
this.useEarliestSequenceNumber = useEarliestSequenceNumber;
this.backgroundFetchEnabled = fetchThreads > 0;
// the deaggregate function is implemented by the amazon-kinesis-client, whose license is not compatible with Apache.
// The work around here is to use reflection to find the deaggregate function in the classpath. See details on the
@ -459,16 +486,18 @@ public class KinesisRecordSupplier implements RecordSupplier<String, String>
getDataHandle = null;
}
log.info(
"Creating fetch thread pool of size [%d] (Runtime.availableProcessors=%d)",
fetchThreads,
Runtime.getRuntime().availableProcessors()
);
if (backgroundFetchEnabled) {
log.info(
"Creating fetch thread pool of size [%d] (Runtime.availableProcessors=%d)",
fetchThreads,
Runtime.getRuntime().availableProcessors()
);
scheduledExec = Executors.newScheduledThreadPool(
fetchThreads,
Execs.makeThreadFactory("KinesisRecordSupplier-Worker-%d")
);
scheduledExec = Executors.newScheduledThreadPool(
fetchThreads,
Execs.makeThreadFactory("KinesisRecordSupplier-Worker-%d")
);
}
records = new LinkedBlockingQueue<>(recordBufferSize);
}
@ -517,12 +546,35 @@ public class KinesisRecordSupplier implements RecordSupplier<String, String>
public void start()
{
checkIfClosed();
if (checkPartitionsStarted) {
if (backgroundFetchEnabled && partitionsFetchStarted.compareAndSet(false, true)) {
partitionResources.values().forEach(PartitionResource::startBackgroundFetch);
checkPartitionsStarted = false;
}
}
@Override
public void close()
{
if (this.closed) {
return;
}
assign(ImmutableSet.of());
scheduledExec.shutdown();
try {
if (!scheduledExec.awaitTermination(EXCEPTION_RETRY_DELAY_MS, TimeUnit.MILLISECONDS)) {
scheduledExec.shutdownNow();
}
}
catch (InterruptedException e) {
log.warn(e, "InterruptedException while shutting down");
throw new RuntimeException(e);
}
this.closed = true;
}
@Override
public void assign(Set<StreamPartition<String>> collection)
{
@ -535,39 +587,14 @@ public class KinesisRecordSupplier implements RecordSupplier<String, String>
)
);
for (Iterator<Map.Entry<StreamPartition<String>, PartitionResource>> i = partitionResources.entrySet()
.iterator(); i.hasNext(); ) {
Iterator<Map.Entry<StreamPartition<String>, PartitionResource>> i = partitionResources.entrySet().iterator();
while (i.hasNext()) {
Map.Entry<StreamPartition<String>, PartitionResource> entry = i.next();
if (!collection.contains(entry.getKey())) {
i.remove();
entry.getValue().stopBackgroundFetch();
}
}
}
@Override
public void seek(StreamPartition<String> partition, String sequenceNumber) throws InterruptedException
{
checkIfClosed();
filterBufferAndResetFetchRunnable(ImmutableSet.of(partition));
seekInternal(partition, sequenceNumber, ShardIteratorType.AT_SEQUENCE_NUMBER);
}
@Override
public void seekToEarliest(Set<StreamPartition<String>> partitions) throws InterruptedException
{
checkIfClosed();
filterBufferAndResetFetchRunnable(partitions);
partitions.forEach(partition -> seekInternal(partition, null, ShardIteratorType.TRIM_HORIZON));
}
@Override
public void seekToLatest(Set<StreamPartition<String>> partitions) throws InterruptedException
{
checkIfClosed();
filterBufferAndResetFetchRunnable(partitions);
partitions.forEach(partition -> seekInternal(partition, null, ShardIteratorType.LATEST));
}
@Override
@ -576,15 +603,39 @@ public class KinesisRecordSupplier implements RecordSupplier<String, String>
return partitionResources.keySet();
}
@Override
public void seek(StreamPartition<String> partition, String sequenceNumber) throws InterruptedException
{
filterBufferAndResetBackgroundFetch(ImmutableSet.of(partition));
partitionSeek(partition, sequenceNumber, ShardIteratorType.AT_SEQUENCE_NUMBER);
}
@Override
public void seekToEarliest(Set<StreamPartition<String>> partitions) throws InterruptedException
{
filterBufferAndResetBackgroundFetch(partitions);
partitions.forEach(partition -> partitionSeek(partition, null, ShardIteratorType.TRIM_HORIZON));
}
@Override
public void seekToLatest(Set<StreamPartition<String>> partitions) throws InterruptedException
{
filterBufferAndResetBackgroundFetch(partitions);
partitions.forEach(partition -> partitionSeek(partition, null, ShardIteratorType.LATEST));
}
@Nullable
@Override
public String getPosition(StreamPartition<String> partition)
{
throw new UnsupportedOperationException("getPosition() is not supported in Kinesis");
}
@Nonnull
@Override
public List<OrderedPartitionableRecord<String, String>> poll(long timeout)
{
checkIfClosed();
if (checkPartitionsStarted) {
partitionResources.values().forEach(PartitionResource::startBackgroundFetch);
checkPartitionsStarted = false;
}
start();
try {
int expectedSize = Math.min(Math.max(records.size(), 1), maxRecordsPerPoll);
@ -616,23 +667,14 @@ public class KinesisRecordSupplier implements RecordSupplier<String, String>
@Override
public String getLatestSequenceNumber(StreamPartition<String> partition)
{
checkIfClosed();
return getSequenceNumberInternal(partition, ShardIteratorType.LATEST);
return getSequenceNumber(partition, ShardIteratorType.LATEST);
}
@Nullable
@Override
public String getEarliestSequenceNumber(StreamPartition<String> partition)
{
checkIfClosed();
return getSequenceNumberInternal(partition, ShardIteratorType.TRIM_HORIZON);
}
@Nullable
@Override
public String getPosition(StreamPartition<String> partition)
{
throw new UnsupportedOperationException("getPosition() is not supported in Kinesis");
return getSequenceNumber(partition, ShardIteratorType.TRIM_HORIZON);
}
@Override
@ -665,92 +707,216 @@ public class KinesisRecordSupplier implements RecordSupplier<String, String>
);
}
@Override
public void close()
{
if (this.closed) {
return;
}
assign(ImmutableSet.of());
scheduledExec.shutdown();
try {
if (!scheduledExec.awaitTermination(EXCEPTION_RETRY_DELAY_MS, TimeUnit.MILLISECONDS)) {
scheduledExec.shutdownNow();
}
}
catch (InterruptedException e) {
log.warn(e, "InterruptedException while shutting down");
throw new RuntimeException(e);
}
this.closed = true;
}
// this is only used for tests
@VisibleForTesting
Map<String, Long> getPartitionTimeLag()
{
return partitionResources.entrySet()
.stream()
.collect(
Collectors.toMap(k -> k.getKey().getPartitionId(), k -> k.getValue().getPartitionTimeLag())
);
}
public Map<String, Long> getPartitionTimeLag(Map<String, String> currentOffsets)
/**
* Fetch the partition lag, given a stream and a set of current partition offsets. This operates independently of
* the {@link PartitionResource}s that have been assigned to this record supplier.
*/
public Map<String, Long> getPartitionsTimeLag(String stream, Map<String, String> currentOffsets)
{
Map<String, Long> partitionLag = Maps.newHashMapWithExpectedSize(currentOffsets.size());
for (Map.Entry<StreamPartition<String>, PartitionResource> partition : partitionResources.entrySet()) {
final String partitionId = partition.getKey().getPartitionId();
partitionLag.put(partitionId, partition.getValue().getPartitionTimeLag(currentOffsets.get(partitionId)));
for (Map.Entry<String, String> partitionOffset : currentOffsets.entrySet()) {
StreamPartition<String> partition = new StreamPartition<>(stream, partitionOffset.getKey());
long currentLag = getPartitionTimeLag(partition, partitionOffset.getValue());
partitionLag.put(partitionOffset.getKey(), currentLag);
}
return partitionLag;
}
private void seekInternal(StreamPartition<String> partition, String sequenceNumber, ShardIteratorType iteratorEnum)
/**
* This method is only used in tests to verify that {@link PartitionResource} in fact tracks its current lag
* as it is polled for records. This isn't currently used in production at all, but could be one day if we were
* to prefer to get the lag from the running tasks in the same API call which fetches the current task offsets,
* instead of directly calling the AWS Kinesis API with the offsets returned from those tasks
* (see {@link #getPartitionsTimeLag}, which accepts a map of current partition offsets).
*/
@VisibleForTesting
Map<String, Long> getPartitionResourcesTimeLag()
{
return partitionResources.entrySet()
.stream()
.collect(
Collectors.toMap(
k -> k.getKey().getPartitionId(),
k -> k.getValue().getPartitionTimeLag()
)
);
}
@VisibleForTesting
public int bufferSize()
{
return records.size();
}
@VisibleForTesting
public boolean isBackgroundFetchRunning()
{
return partitionsFetchStarted.get();
}
/**
* Check that a {@link PartitionResource} has been assigned to this record supplier, and if so call
* {@link PartitionResource#seek} to move it to the latest offsets. Note that this method does not restart background
* fetch, which should have been stopped prior to calling this method by a call to
* {@link #filterBufferAndResetBackgroundFetch}.
*/
private void partitionSeek(StreamPartition<String> partition, String sequenceNumber, ShardIteratorType iteratorEnum)
{
PartitionResource resource = partitionResources.get(partition);
if (resource == null) {
throw new ISE("Partition [%s] has not been assigned", partition);
}
log.debug(
"Seeking partition [%s] to [%s]",
partition.getPartitionId(),
sequenceNumber != null ? sequenceNumber : iteratorEnum.toString()
);
resource.shardIterator = wrapExceptions(() -> kinesis.getShardIterator(
partition.getStream(),
partition.getPartitionId(),
iteratorEnum.toString(),
sequenceNumber
).getShardIterator());
checkPartitionsStarted = true;
resource.seek(iteratorEnum, sequenceNumber);
}
private void filterBufferAndResetFetchRunnable(Set<StreamPartition<String>> partitions) throws InterruptedException
/**
* Given a partition and a {@link ShardIteratorType}, create a shard iterator and fetch
* {@link #GET_SEQUENCE_NUMBER_RECORD_COUNT} records and return the first sequence number from the result set.
* This method is thread safe as it does not depend on the internal state of the supplier (it doesn't use the
* {@link PartitionResource}s that have been assigned to the supplier), and the Kinesis client is thread safe.
*/
@Nullable
private String getSequenceNumber(StreamPartition<String> partition, ShardIteratorType iteratorEnum)
{
scheduledExec.shutdown();
return wrapExceptions(() -> {
String shardIterator =
kinesis.getShardIterator(partition.getStream(), partition.getPartitionId(), iteratorEnum.toString())
.getShardIterator();
long timeoutMillis = System.currentTimeMillis() + fetchSequenceNumberTimeout;
GetRecordsResult recordsResult = null;
try {
if (!scheduledExec.awaitTermination(EXCEPTION_RETRY_DELAY_MS, TimeUnit.MILLISECONDS)) {
scheduledExec.shutdownNow();
while (shardIterator != null && System.currentTimeMillis() < timeoutMillis) {
if (closed) {
log.info("KinesisRecordSupplier closed while fetching sequenceNumber");
return null;
}
final String currentShardIterator = shardIterator;
final GetRecordsRequest request = new GetRecordsRequest().withShardIterator(currentShardIterator)
.withLimit(GET_SEQUENCE_NUMBER_RECORD_COUNT);
recordsResult = RetryUtils.retry(
() -> kinesis.getRecords(request),
(throwable) -> {
if (throwable instanceof ProvisionedThroughputExceededException) {
log.warn(
throwable,
"encountered ProvisionedThroughputExceededException while fetching records, this means "
+ "that the request rate for the stream is too high, or the requested data is too large for "
+ "the available throughput. Reduce the frequency or size of your requests. Consider increasing "
+ "the number of shards to increase throughput."
);
return true;
}
return false;
},
GET_SEQUENCE_NUMBER_RETRY_COUNT
);
List<Record> records = recordsResult.getRecords();
if (!records.isEmpty()) {
return records.get(0).getSequenceNumber();
}
shardIterator = recordsResult.getNextShardIterator();
}
}
catch (InterruptedException e) {
log.warn(e, "InterruptedException while shutting down");
throw e;
}
scheduledExec = Executors.newScheduledThreadPool(
fetchThreads,
Execs.makeThreadFactory("KinesisRecordSupplier-Worker-%d")
);
if (shardIterator == null) {
log.info("Partition[%s] returned a null shard iterator, is the shard closed?", partition.getPartitionId());
return KinesisSequenceNumber.END_OF_SHARD_MARKER;
}
// if we reach here, it usually means either the shard has no more records, or records have not been
// added to this shard
log.warn(
"timed out while trying to fetch position for shard[%s], millisBehindLatest is [%s], likely no more records in shard",
partition.getPartitionId(),
recordsResult != null ? recordsResult.getMillisBehindLatest() : "UNKNOWN"
);
return null;
});
}
/**
* Given a {@link StreamPartition} and an offset, create a 'shard iterator' for the offset and fetch a single record
* in order to get the lag: {@link GetRecordsResult#getMillisBehindLatest()}. This method is thread safe as it does
* not depend on the internal state of the supplier (it doesn't use the {@link PartitionResource}s that have been
* assigned to the supplier), and the Kinesis client is thread safe.
*/
private Long getPartitionTimeLag(StreamPartition<String> partition, String offset)
{
return wrapExceptions(() -> {
final String iteratorType;
final String offsetToUse;
if (offset == null || KinesisSupervisor.OFFSET_NOT_SET.equals(offset)) {
if (useEarliestSequenceNumber) {
iteratorType = ShardIteratorType.TRIM_HORIZON.toString();
offsetToUse = null;
} else {
// if offset is not set and not using earliest, it means we will start reading from latest,
// so lag will be 0 and we have nothing to do here
return 0L;
}
} else {
iteratorType = ShardIteratorType.AT_SEQUENCE_NUMBER.toString();
offsetToUse = offset;
}
String shardIterator = kinesis.getShardIterator(
partition.getStream(),
partition.getPartitionId(),
iteratorType,
offsetToUse
).getShardIterator();
GetRecordsResult recordsResult = kinesis.getRecords(
new GetRecordsRequest().withShardIterator(shardIterator).withLimit(1)
);
return recordsResult.getMillisBehindLatest();
});
}
/**
* Explode if {@link #close()} has been called on the supplier.
*/
private void checkIfClosed()
{
if (closed) {
throw new ISE("Invalid operation - KinesisRecordSupplier has already been closed");
}
}
/**
* This method must be called before a seek operation ({@link #seek}, {@link #seekToLatest}, or
* {@link #seekToEarliest}).
*
* When called, it will nuke the {@link #scheduledExec} that is shared by all {@link PartitionResource}, filter
* records from the buffer for partitions which will have a seek operation performed, and stop background fetch for
* each {@link PartitionResource} to prepare for the seek. If background fetch is not currently running, the
* {@link #scheduledExec} will not be re-created.
*/
private void filterBufferAndResetBackgroundFetch(Set<StreamPartition<String>> partitions) throws InterruptedException
{
checkIfClosed();
if (backgroundFetchEnabled && partitionsFetchStarted.compareAndSet(true, false)) {
scheduledExec.shutdown();
try {
if (!scheduledExec.awaitTermination(EXCEPTION_RETRY_DELAY_MS, TimeUnit.MILLISECONDS)) {
scheduledExec.shutdownNow();
}
}
catch (InterruptedException e) {
log.warn(e, "InterruptedException while shutting down");
throw e;
}
scheduledExec = Executors.newScheduledThreadPool(
fetchThreads,
Execs.makeThreadFactory("KinesisRecordSupplier-Worker-%d")
);
}
// filter records in buffer and only retain ones whose partition was not seeked
BlockingQueue<OrderedPartitionableRecord<String, String>> newQ = new LinkedBlockingQueue<>(recordBufferSize);
@ -762,121 +928,6 @@ public class KinesisRecordSupplier implements RecordSupplier<String, String>
records = newQ;
// restart fetching threads
partitionResources.values().forEach(x -> x.started = false);
checkPartitionsStarted = true;
}
@Nullable
private String getSequenceNumberInternal(StreamPartition<String> partition, ShardIteratorType iteratorEnum)
{
return wrapExceptions(() -> getSequenceNumberInternal(
partition,
kinesis.getShardIterator(partition.getStream(), partition.getPartitionId(), iteratorEnum.toString())
.getShardIterator()
));
}
@Nullable
private String getSequenceNumberInternal(StreamPartition<String> partition, String shardIterator)
{
long timeoutMillis = System.currentTimeMillis() + fetchSequenceNumberTimeout;
GetRecordsResult recordsResult = null;
while (shardIterator != null && System.currentTimeMillis() < timeoutMillis) {
if (closed) {
log.info("KinesisRecordSupplier closed while fetching sequenceNumber");
return null;
}
try {
// we call getRecords with limit 1000 to make sure that we can find the first (earliest) record in the shard.
// In the case where the shard is constantly removing records that are past their retention period, it is possible
// that we never find the first record in the shard if we use a limit of 1.
recordsResult = kinesis.getRecords(new GetRecordsRequest().withShardIterator(shardIterator).withLimit(1000));
}
catch (ProvisionedThroughputExceededException e) {
log.warn(
e,
"encountered ProvisionedThroughputExceededException while fetching records, this means "
+ "that the request rate for the stream is too high, or the requested data is too large for "
+ "the available throughput. Reduce the frequency or size of your requests. Consider increasing "
+ "the number of shards to increase throughput."
);
try {
Thread.sleep(PROVISIONED_THROUGHPUT_EXCEEDED_BACKOFF_MS);
continue;
}
catch (InterruptedException e1) {
log.warn(e1, "Thread interrupted!");
Thread.currentThread().interrupt();
break;
}
}
List<Record> records = recordsResult.getRecords();
if (!records.isEmpty()) {
return records.get(0).getSequenceNumber();
}
shardIterator = recordsResult.getNextShardIterator();
}
if (shardIterator == null) {
log.info("Partition[%s] returned a null shard iterator, is the shard closed?", partition.getPartitionId());
return KinesisSequenceNumber.END_OF_SHARD_MARKER;
}
// if we reach here, it usually means either the shard has no more records, or records have not been
// added to this shard
log.warn(
"timed out while trying to fetch position for shard[%s], millisBehindLatest is [%s], likely no more records in shard",
partition.getPartitionId(),
recordsResult != null ? recordsResult.getMillisBehindLatest() : "UNKNOWN"
);
return null;
}
private void checkIfClosed()
{
if (closed) {
throw new ISE("Invalid operation - KinesisRecordSupplier has already been closed");
}
}
/**
* Returns an array with the content between the position and limit of "buffer". This may be the buffer's backing
* array itself. Does not modify position or limit of the buffer.
*/
private static byte[] toByteArray(final ByteBuffer buffer)
{
if (buffer.hasArray()
&& buffer.arrayOffset() == 0
&& buffer.position() == 0
&& buffer.array().length == buffer.limit()) {
return buffer.array();
} else {
final byte[] retVal = new byte[buffer.remaining()];
buffer.duplicate().get(retVal);
return retVal;
}
}
private static <T> T wrapExceptions(Callable<T> callable)
{
try {
return callable.call();
}
catch (Exception e) {
throw new StreamException(e);
}
}
@VisibleForTesting
public int bufferSize()
{
return records.size();
partitionResources.values().forEach(x -> x.stopBackgroundFetch());
}
}
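Putting the refactored supplier together, the intended call sequence for a consumer with background fetch enabled looks roughly like the sketch below; the wrapper class, helper method, and literal stream/shard names are illustrative.

import com.google.common.collect.ImmutableSet;
import org.apache.druid.indexing.kinesis.KinesisRecordSupplier;
import org.apache.druid.indexing.seekablestream.common.OrderedPartitionableRecord;
import org.apache.druid.indexing.seekablestream.common.StreamPartition;

import java.util.List;
import java.util.Set;

class KinesisSupplierUsageSketch
{
  static List<OrderedPartitionableRecord<String, String>> pollOnce(KinesisRecordSupplier supplier)
      throws InterruptedException
  {
    Set<StreamPartition<String>> partitions =
        ImmutableSet.of(StreamPartition.of("my-stream", "shardId-000000000000"));

    supplier.assign(partitions);          // registers a PartitionResource per partition
    supplier.seekToEarliest(partitions);  // stops any running fetch, filters the buffer, creates shard iterators
    supplier.start();                     // starts background fetch (no-op when fetchThreads == 0)

    return supplier.poll(1000);           // poll also calls start(), restarting fetch after a seek
  }
}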

View File: KinesisSamplerSpec.java

@ -71,7 +71,8 @@ public class KinesisSamplerSpec extends SeekableStreamSamplerSpec
tuningConfig.getRecordBufferOfferTimeout(),
tuningConfig.getRecordBufferFullWait(),
tuningConfig.getFetchSequenceNumberTimeout(),
tuningConfig.getMaxRecordsPerPoll()
tuningConfig.getMaxRecordsPerPoll(),
ioConfig.isUseEarliestSequenceNumber()
);
}
}

View File: KinesisSequenceNumber.java

@ -36,27 +36,33 @@ public class KinesisSequenceNumber extends OrderedSequenceNumber<String>
*/
public static final String END_OF_SHARD_MARKER = "EOS";
// this special marker is used by the KinesisSupervisor to set the endOffsets
// of newly created indexing tasks. This is necessary because streaming tasks do not
// have endPartitionOffsets. This marker signals to the task that it should continue
// to ingest data until taskDuration has elapsed or the task was stopped or paused or killed
/**
* This special marker is used by the KinesisSupervisor to set the endOffsets of newly created indexing tasks. This
* is necessary because streaming tasks do not have endPartitionOffsets. This marker signals to the task that it
* should continue to ingest data until taskDuration has elapsed or the task was stopped or paused or killed.
*/
public static final String NO_END_SEQUENCE_NUMBER = "NO_END_SEQUENCE_NUMBER";
// this special marker is used by the KinesisSupervisor to mark that a shard has been expired
// (i.e., closed and then the retention period has passed)
/**
* This special marker is used by the KinesisSupervisor to mark that a shard has been expired
* (i.e., closed and then the retention period has passed)
*/
public static final String EXPIRED_MARKER = "EXPIRED";
// this flag is used to indicate either END_OF_SHARD_MARKER
// or NO_END_SEQUENCE_NUMBER so that they can be properly compared
// with other sequence numbers
/**
* This flag is used to indicate either END_OF_SHARD_MARKER
* or NO_END_SEQUENCE_NUMBER so that they can be properly compared
* with other sequence numbers
*/
private final boolean isMaxSequenceNumber;
private final BigInteger intSequence;
private KinesisSequenceNumber(String sequenceNumber, boolean isExclusive)
{
super(sequenceNumber, isExclusive);
if (END_OF_SHARD_MARKER.equals(sequenceNumber)
|| NO_END_SEQUENCE_NUMBER.equals(sequenceNumber)) {
if (END_OF_SHARD_MARKER.equals(sequenceNumber) || NO_END_SEQUENCE_NUMBER.equals(sequenceNumber)) {
isMaxSequenceNumber = true;
this.intSequence = null;
} else {

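The isMaxSequenceNumber flag exists so that the string markers sort correctly against real (numeric) sequence numbers. The comparison presumably behaves like the sketch below; this is illustrative, not necessarily the exact compareTo body.

import java.math.BigInteger;

final class SequenceOrderSketch
{
  // Markers (END_OF_SHARD_MARKER, NO_END_SEQUENCE_NUMBER) compare as a shared
  // maximum; all other sequence numbers compare numerically as BigIntegers.
  static int compare(boolean thisIsMax, BigInteger thisSeq, boolean otherIsMax, BigInteger otherSeq)
  {
    if (thisIsMax && otherIsMax) {
      return 0;
    } else if (thisIsMax) {
      return 1;
    } else if (otherIsMax) {
      return -1;
    } else {
      return thisSeq.compareTo(otherSeq);
    }
  }
}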
View File: KinesisSupervisor.java

@ -22,7 +22,6 @@ package org.apache.druid.indexing.kinesis.supervisor;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Joiner;
import com.google.common.collect.ImmutableMap;
import org.apache.druid.common.aws.AWSCredentialsConfig;
import org.apache.druid.indexer.TaskIdUtils;
@ -49,7 +48,6 @@ import org.apache.druid.indexing.seekablestream.SeekableStreamSequenceNumbers;
import org.apache.druid.indexing.seekablestream.SeekableStreamStartSequenceNumbers;
import org.apache.druid.indexing.seekablestream.common.OrderedSequenceNumber;
import org.apache.druid.indexing.seekablestream.common.RecordSupplier;
import org.apache.druid.indexing.seekablestream.common.StreamPartition;
import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor;
import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorIOConfig;
import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorReportPayload;
@ -64,6 +62,7 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.stream.Collectors;
/**
* Supervisor responsible for managing the KinesisIndexTask for a single dataSource. At a high level, the class accepts a
@ -82,7 +81,7 @@ public class KinesisSupervisor extends SeekableStreamSupervisor<String, String>
{
};
public static final String NOT_SET = "-1";
public static final String OFFSET_NOT_SET = "-1";
private final KinesisSupervisorSpec spec;
private final AWSCredentialsConfig awsCredentialsConfig;
private volatile Map<String, Long> currentPartitionTimeLag;
@ -167,7 +166,7 @@ public class KinesisSupervisor extends SeekableStreamSupervisor<String, String>
List<SeekableStreamIndexTask<String, String>> taskList = new ArrayList<>();
for (int i = 0; i < replicas; i++) {
String taskId = Joiner.on("_").join(baseSequenceName, TaskIdUtils.getRandomId());
String taskId = TaskIdUtils.getRandomIdWithPrefix(baseSequenceName);
taskList.add(new KinesisIndexTask(
taskId,
new TaskResource(baseSequenceName, 1),
@ -187,8 +186,7 @@ public class KinesisSupervisor extends SeekableStreamSupervisor<String, String>
@Override
protected RecordSupplier<String, String> setupRecordSupplier()
throws RuntimeException
protected RecordSupplier<String, String> setupRecordSupplier() throws RuntimeException
{
KinesisSupervisorIOConfig ioConfig = spec.getIoConfig();
KinesisIndexTaskTuningConfig taskTuningConfig = spec.getTuningConfig();
@ -202,13 +200,14 @@ public class KinesisSupervisor extends SeekableStreamSupervisor<String, String>
),
ioConfig.getRecordsPerFetch(),
ioConfig.getFetchDelayMillis(),
1,
0, // skip starting background fetch, it is not used
ioConfig.isDeaggregate(),
taskTuningConfig.getRecordBufferSize(),
taskTuningConfig.getRecordBufferOfferTimeout(),
taskTuningConfig.getRecordBufferFullWait(),
taskTuningConfig.getFetchSequenceNumberTimeout(),
taskTuningConfig.getMaxRecordsPerPoll()
taskTuningConfig.getMaxRecordsPerPoll(),
ioConfig.isUseEarliestSequenceNumber()
);
}
@ -296,7 +295,19 @@ public class KinesisSupervisor extends SeekableStreamSupervisor<String, String>
@Override
protected Map<String, Long> getTimeLagPerPartition(Map<String, String> currentOffsets)
{
return ((KinesisRecordSupplier) recordSupplier).getPartitionTimeLag(currentOffsets);
return currentOffsets
.entrySet()
.stream()
.filter(e -> e.getValue() != null &&
currentPartitionTimeLag != null &&
currentPartitionTimeLag.get(e.getKey()) != null
)
.collect(
Collectors.toMap(
Map.Entry::getKey,
e -> currentPartitionTimeLag.get(e.getKey())
)
);
}
@Override
@ -315,13 +326,11 @@ public class KinesisSupervisor extends SeekableStreamSupervisor<String, String>
}
@Override
protected void updateLatestSequenceFromStream(
RecordSupplier<String, String> recordSupplier,
Set<StreamPartition<String>> streamPartitions
)
protected void updatePartitionLagFromStream()
{
KinesisRecordSupplier supplier = (KinesisRecordSupplier) recordSupplier;
currentPartitionTimeLag = supplier.getPartitionTimeLag(getHighestCurrentOffsets());
// this recordSupplier method is thread safe, so it does not need to acquire the recordSupplierLock
currentPartitionTimeLag = supplier.getPartitionsTimeLag(getIoConfig().getStream(), getHighestCurrentOffsets());
}
@Override
@ -345,7 +354,7 @@ public class KinesisSupervisor extends SeekableStreamSupervisor<String, String>
@Override
protected String getNotSetMarker()
{
return NOT_SET;
return OFFSET_NOT_SET;
}
@Override
@ -454,7 +463,7 @@ public class KinesisSupervisor extends SeekableStreamSupervisor<String, String>
}
}
newSequences = new SeekableStreamStartSequenceNumbers<String, String>(
newSequences = new SeekableStreamStartSequenceNumbers<>(
old.getStream(),
null,
newPartitionSequenceNumberMap,
@ -462,7 +471,7 @@ public class KinesisSupervisor extends SeekableStreamSupervisor<String, String>
newExclusiveStartPartitions
);
} else {
newSequences = new SeekableStreamEndSequenceNumbers<String, String>(
newSequences = new SeekableStreamEndSequenceNumbers<>(
old.getStream(),
null,
newPartitionSequenceNumberMap,

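The net effect of the KinesisSupervisor changes: lag collection moves entirely into updatePartitionLagFromStream(), which runs on the background reporting thread and hits the AWS API through the thread safe supplier, while getTimeLagPerPartition() only filters the cached map. A condensed sketch of that division of labor; the class scaffolding is illustrative.

import java.util.Collections;
import java.util.Map;
import java.util.stream.Collectors;

class KinesisLagCacheSketch
{
  // written by the background reporting thread, read by the reporting/metric paths
  private volatile Map<String, Long> currentPartitionTimeLag;

  void updatePartitionLagFromStream(Map<String, Long> freshLag)
  {
    // freshLag stands in for supplier.getPartitionsTimeLag(stream, getHighestCurrentOffsets())
    currentPartitionTimeLag = freshLag;
  }

  Map<String, Long> getTimeLagPerPartition(Map<String, String> currentOffsets)
  {
    final Map<String, Long> cached = currentPartitionTimeLag;
    if (cached == null) {
      return Collections.emptyMap();
    }
    return currentOffsets.keySet()
                         .stream()
                         .filter(k -> cached.get(k) != null)
                         .collect(Collectors.toMap(k -> k, cached::get));
  }
}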
View File: KinesisIndexTaskTest.java

@ -2034,7 +2034,7 @@ public class KinesisIndexTaskTest extends SeekableStreamIndexTaskTestBase
.andReturn(Collections.emptyList())
.anyTimes();
EasyMock.expect(recordSupplier.getPartitionTimeLag(EasyMock.anyObject()))
EasyMock.expect(recordSupplier.getPartitionsTimeLag(EasyMock.anyString(), EasyMock.anyObject()))
.andReturn(null)
.anyTimes();

View File: KinesisRecordSupplierTest.java

@ -203,7 +203,8 @@ public class KinesisRecordSupplierTest extends EasyMockSupport
5000,
5000,
60000,
5
5,
true
);
Assert.assertTrue(recordSupplier.getAssignment().isEmpty());
@ -212,6 +213,9 @@ public class KinesisRecordSupplierTest extends EasyMockSupport
Assert.assertEquals(partitions, recordSupplier.getAssignment());
Assert.assertEquals(ImmutableSet.of(SHARD_ID1, SHARD_ID0), recordSupplier.getPartitionIds(STREAM));
// calling poll would start background fetch if seek had been called, but since it was not, the fetch is skipped
// and the results are empty
Assert.assertEquals(Collections.emptyList(), recordSupplier.poll(100));
verifyAll();
@ -290,7 +294,8 @@ public class KinesisRecordSupplierTest extends EasyMockSupport
5000,
5000,
60000,
100
100,
true
);
recordSupplier.assign(partitions);
@ -308,7 +313,7 @@ public class KinesisRecordSupplierTest extends EasyMockSupport
Assert.assertEquals(partitions, recordSupplier.getAssignment());
Assert.assertTrue(polledRecords.containsAll(ALL_RECORDS));
Assert.assertEquals(SHARDS_LAG_MILLIS, recordSupplier.getPartitionTimeLag());
Assert.assertEquals(SHARDS_LAG_MILLIS, recordSupplier.getPartitionResourcesTimeLag());
}
@Test
@ -367,7 +372,8 @@ public class KinesisRecordSupplierTest extends EasyMockSupport
5000,
5000,
60000,
100
100,
true
);
recordSupplier.assign(partitions);
@ -386,7 +392,7 @@ public class KinesisRecordSupplierTest extends EasyMockSupport
Assert.assertEquals(9, polledRecords.size());
Assert.assertTrue(polledRecords.containsAll(ALL_RECORDS.subList(4, 12)));
Assert.assertTrue(polledRecords.containsAll(ALL_RECORDS.subList(1, 2)));
Assert.assertEquals(SHARDS_LAG_MILLIS, recordSupplier.getPartitionTimeLag());
Assert.assertEquals(SHARDS_LAG_MILLIS, recordSupplier.getPartitionResourcesTimeLag());
}
@ -434,7 +440,8 @@ public class KinesisRecordSupplierTest extends EasyMockSupport
5000,
5000,
60000,
100
100,
true
);
recordSupplier.assign(partitions);
@ -468,7 +475,8 @@ public class KinesisRecordSupplierTest extends EasyMockSupport
5000,
5000,
60000,
5
5,
true
);
recordSupplier.assign(partitions);
@ -530,7 +538,8 @@ public class KinesisRecordSupplierTest extends EasyMockSupport
5000,
5000,
60000,
1
1,
true
);
recordSupplier.assign(partitions);
@ -549,7 +558,7 @@ public class KinesisRecordSupplierTest extends EasyMockSupport
);
// only one partition in this test. first results come from getRecordsResult1, which has SHARD1_LAG_MILLIS
Assert.assertEquals(ImmutableMap.of(SHARD_ID1, SHARD1_LAG_MILLIS), recordSupplier.getPartitionTimeLag());
Assert.assertEquals(ImmutableMap.of(SHARD_ID1, SHARD1_LAG_MILLIS), recordSupplier.getPartitionResourcesTimeLag());
recordSupplier.seek(StreamPartition.of(STREAM, SHARD_ID1), "7");
recordSupplier.start();
@ -563,7 +572,7 @@ public class KinesisRecordSupplierTest extends EasyMockSupport
Assert.assertEquals(ALL_RECORDS.get(9), record2);
// only one partition in this test. second results come from getRecordsResult0, which has SHARD0_LAG_MILLIS
Assert.assertEquals(ImmutableMap.of(SHARD_ID1, SHARD0_LAG_MILLIS), recordSupplier.getPartitionTimeLag());
Assert.assertEquals(ImmutableMap.of(SHARD_ID1, SHARD0_LAG_MILLIS), recordSupplier.getPartitionResourcesTimeLag());
verifyAll();
}
@ -622,7 +631,8 @@ public class KinesisRecordSupplierTest extends EasyMockSupport
5000,
5000,
60000,
100
100,
true
);
recordSupplier.assign(partitions);
@ -640,7 +650,7 @@ public class KinesisRecordSupplierTest extends EasyMockSupport
Assert.assertEquals(partitions, recordSupplier.getAssignment());
Assert.assertTrue(polledRecords.containsAll(ALL_RECORDS));
Assert.assertEquals(SHARDS_LAG_MILLIS, recordSupplier.getPartitionTimeLag());
Assert.assertEquals(SHARDS_LAG_MILLIS, recordSupplier.getPartitionResourcesTimeLag());
}
@Test
@ -692,7 +702,8 @@ public class KinesisRecordSupplierTest extends EasyMockSupport
5000,
5000,
1000,
100
100,
true
);
return recordSupplier;
}
@ -705,16 +716,14 @@ public class KinesisRecordSupplierTest extends EasyMockSupport
EasyMock.eq(SHARD_ID0),
EasyMock.anyString(),
EasyMock.anyString()
)).andReturn(
getShardIteratorResult0).anyTimes();
)).andReturn(getShardIteratorResult0).anyTimes();
EasyMock.expect(kinesis.getShardIterator(
EasyMock.anyObject(),
EasyMock.eq(SHARD_ID1),
EasyMock.anyString(),
EasyMock.anyString()
)).andReturn(
getShardIteratorResult1).anyTimes();
)).andReturn(getShardIteratorResult1).anyTimes();
EasyMock.expect(getShardIteratorResult0.getShardIterator()).andReturn(SHARD0_ITERATOR).anyTimes();
EasyMock.expect(getShardIteratorResult1.getShardIterator()).andReturn(SHARD1_ITERATOR).anyTimes();
@ -728,8 +737,8 @@ public class KinesisRecordSupplierTest extends EasyMockSupport
EasyMock.expect(getRecordsResult1.getRecords()).andReturn(SHARD1_RECORDS).once();
EasyMock.expect(getRecordsResult0.getNextShardIterator()).andReturn(null).anyTimes();
EasyMock.expect(getRecordsResult1.getNextShardIterator()).andReturn(null).anyTimes();
EasyMock.expect(getRecordsResult0.getMillisBehindLatest()).andReturn(SHARD0_LAG_MILLIS).once();
EasyMock.expect(getRecordsResult1.getMillisBehindLatest()).andReturn(SHARD1_LAG_MILLIS).once();
EasyMock.expect(getRecordsResult0.getMillisBehindLatest()).andReturn(SHARD0_LAG_MILLIS).times(2);
EasyMock.expect(getRecordsResult1.getMillisBehindLatest()).andReturn(SHARD1_LAG_MILLIS).times(2);
replayAll();
@ -738,7 +747,6 @@ public class KinesisRecordSupplierTest extends EasyMockSupport
StreamPartition.of(STREAM, SHARD_ID1)
);
recordSupplier = new KinesisRecordSupplier(
kinesis,
recordsPerFetch,
@ -749,7 +757,8 @@ public class KinesisRecordSupplierTest extends EasyMockSupport
5000,
5000,
60000,
100
100,
true
);
recordSupplier.assign(partitions);
@ -760,16 +769,19 @@ public class KinesisRecordSupplierTest extends EasyMockSupport
Thread.sleep(100);
}
Map<String, Long> timeLag = recordSupplier.getPartitionResourcesTimeLag();
Assert.assertEquals(partitions, recordSupplier.getAssignment());
Assert.assertEquals(SHARDS_LAG_MILLIS, timeLag);
Map<String, String> offsts = ImmutableMap.of(
SHARD_ID1, SHARD1_RECORDS.get(0).getSequenceNumber(),
SHARD_ID0, SHARD0_RECORDS.get(0).getSequenceNumber()
);
Map<String, Long> timeLag = recordSupplier.getPartitionTimeLag(offsts);
Map<String, Long> independentTimeLag = recordSupplier.getPartitionsTimeLag(STREAM, offsts);
Assert.assertEquals(SHARDS_LAG_MILLIS, independentTimeLag);
verifyAll();
Assert.assertEquals(partitions, recordSupplier.getAssignment());
Assert.assertEquals(SHARDS_LAG_MILLIS, timeLag);
}
/**

View File: KinesisSupervisorTest.java

@ -129,7 +129,6 @@ public class KinesisSupervisorTest extends EasyMockSupport
private static final StreamPartition<String> SHARD0_PARTITION = StreamPartition.of(STREAM, SHARD_ID0);
private static final StreamPartition<String> SHARD1_PARTITION = StreamPartition.of(STREAM, SHARD_ID1);
private static final StreamPartition<String> SHARD2_PARTITION = StreamPartition.of(STREAM, SHARD_ID2);
private static final Map<String, Long> TIME_LAG = ImmutableMap.of(SHARD_ID1, 9000L, SHARD_ID0, 1234L);
private static DataSchema dataSchema;
private KinesisRecordSupplier supervisorRecordSupplier;
@ -232,9 +231,6 @@ public class KinesisSupervisorTest extends EasyMockSupport
EasyMock.expect(supervisorRecordSupplier.getEarliestSequenceNumber(EasyMock.anyObject())).andReturn("0").anyTimes();
supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString());
EasyMock.expectLastCall().anyTimes();
EasyMock.expect(supervisorRecordSupplier.getPartitionTimeLag(EasyMock.anyObject()))
.andReturn(TIME_LAG)
.atLeastOnce();
Capture<KinesisIndexTask> captured = Capture.newInstance();
EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
@ -284,6 +280,68 @@ public class KinesisSupervisorTest extends EasyMockSupport
);
}
@Test
public void testRecordSupplier()
{
KinesisSupervisorIOConfig kinesisSupervisorIOConfig = new KinesisSupervisorIOConfig(
STREAM,
INPUT_FORMAT,
"awsEndpoint",
null,
1,
1,
new Period("PT30M"),
new Period("P1D"),
new Period("PT30S"),
false,
new Period("PT30M"),
null,
null,
null,
100,
1000,
null,
null,
false
);
KinesisIndexTaskClientFactory clientFactory = new KinesisIndexTaskClientFactory(null, OBJECT_MAPPER);
KinesisSupervisor supervisor = new KinesisSupervisor(
taskStorage,
taskMaster,
indexerMetadataStorageCoordinator,
clientFactory,
OBJECT_MAPPER,
new KinesisSupervisorSpec(
null,
dataSchema,
tuningConfig,
kinesisSupervisorIOConfig,
null,
false,
taskStorage,
taskMaster,
indexerMetadataStorageCoordinator,
clientFactory,
OBJECT_MAPPER,
new NoopServiceEmitter(),
new DruidMonitorSchedulerConfig(),
rowIngestionMetersFactory,
null,
new SupervisorStateManagerConfig()
),
rowIngestionMetersFactory,
null
);
KinesisRecordSupplier supplier = (KinesisRecordSupplier) supervisor.setupRecordSupplier();
Assert.assertNotNull(supplier);
Assert.assertEquals(0, supplier.bufferSize());
Assert.assertEquals(Collections.emptySet(), supplier.getAssignment());
// background fetch should not be enabled for supervisor supplier
supplier.start();
Assert.assertFalse(supplier.isBackgroundFetchRunning());
}
@Test
public void testMultiTask() throws Exception
{
@ -302,9 +360,6 @@ public class KinesisSupervisorTest extends EasyMockSupport
EasyMock.expect(supervisorRecordSupplier.getEarliestSequenceNumber(EasyMock.anyObject())).andReturn("0").anyTimes();
supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString());
EasyMock.expectLastCall().anyTimes();
EasyMock.expect(supervisorRecordSupplier.getPartitionTimeLag(EasyMock.anyObject()))
.andReturn(TIME_LAG)
.atLeastOnce();
Capture<KinesisIndexTask> captured = Capture.newInstance(CaptureType.ALL);
EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
@ -364,9 +419,6 @@ public class KinesisSupervisorTest extends EasyMockSupport
EasyMock.expect(supervisorRecordSupplier.getEarliestSequenceNumber(EasyMock.anyObject())).andReturn("0").anyTimes();
supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString());
EasyMock.expectLastCall().anyTimes();
EasyMock.expect(supervisorRecordSupplier.getPartitionTimeLag(EasyMock.anyObject()))
.andReturn(TIME_LAG)
.atLeastOnce();
Capture<KinesisIndexTask> captured = Capture.newInstance(CaptureType.ALL);
EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
@ -444,9 +496,6 @@ public class KinesisSupervisorTest extends EasyMockSupport
EasyMock.expect(supervisorRecordSupplier.getEarliestSequenceNumber(EasyMock.anyObject())).andReturn("0").anyTimes();
supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString());
EasyMock.expectLastCall().anyTimes();
EasyMock.expect(supervisorRecordSupplier.getPartitionTimeLag(EasyMock.anyObject()))
.andReturn(TIME_LAG)
.atLeastOnce();
Capture<KinesisIndexTask> captured = Capture.newInstance(CaptureType.ALL);
EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
@ -499,9 +548,6 @@ public class KinesisSupervisorTest extends EasyMockSupport
EasyMock.expect(supervisorRecordSupplier.getEarliestSequenceNumber(EasyMock.anyObject())).andReturn("0").anyTimes();
supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString());
EasyMock.expectLastCall().anyTimes();
EasyMock.expect(supervisorRecordSupplier.getPartitionTimeLag(EasyMock.anyObject()))
.andReturn(TIME_LAG)
.atLeastOnce();
Capture<KinesisIndexTask> captured = Capture.newInstance(CaptureType.ALL);
EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
@ -561,9 +607,6 @@ public class KinesisSupervisorTest extends EasyMockSupport
EasyMock.expect(supervisorRecordSupplier.getLatestSequenceNumber(EasyMock.anyObject())).andReturn("100").anyTimes();
supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString());
EasyMock.expectLastCall().anyTimes();
EasyMock.expect(supervisorRecordSupplier.getPartitionTimeLag(EasyMock.anyObject()))
.andReturn(TIME_LAG)
.atLeastOnce();
Capture<KinesisIndexTask> captured = Capture.newInstance();
EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
@ -661,9 +704,6 @@ public class KinesisSupervisorTest extends EasyMockSupport
EasyMock.expect(supervisorRecordSupplier.getLatestSequenceNumber(EasyMock.anyObject())).andReturn("100").anyTimes();
supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString());
EasyMock.expectLastCall().anyTimes();
EasyMock.expect(supervisorRecordSupplier.getPartitionTimeLag(EasyMock.anyObject()))
.andReturn(TIME_LAG)
.atLeastOnce();
// non KinesisIndexTask (don't kill)
Task id2 = new RealtimeIndexTask(
@ -730,9 +770,6 @@ public class KinesisSupervisorTest extends EasyMockSupport
EasyMock.expect(supervisorRecordSupplier.getLatestSequenceNumber(EasyMock.anyObject())).andReturn("100").anyTimes();
supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString());
EasyMock.expectLastCall().anyTimes();
EasyMock.expect(supervisorRecordSupplier.getPartitionTimeLag(EasyMock.anyObject()))
.andReturn(TIME_LAG)
.atLeastOnce();
Task id1 = createKinesisIndexTask(
"id1",
@ -845,9 +882,6 @@ public class KinesisSupervisorTest extends EasyMockSupport
EasyMock.expect(supervisorRecordSupplier.getLatestSequenceNumber(EasyMock.anyObject())).andReturn("100").anyTimes();
supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString());
EasyMock.expectLastCall().anyTimes();
EasyMock.expect(supervisorRecordSupplier.getPartitionTimeLag(EasyMock.anyObject()))
.andReturn(TIME_LAG)
.atLeastOnce();
Capture<Task> captured = Capture.newInstance(CaptureType.ALL);
EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
@ -959,9 +993,6 @@ public class KinesisSupervisorTest extends EasyMockSupport
EasyMock.expect(supervisorRecordSupplier.getLatestSequenceNumber(EasyMock.anyObject())).andReturn("100").anyTimes();
supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString());
EasyMock.expectLastCall().anyTimes();
EasyMock.expect(supervisorRecordSupplier.getPartitionTimeLag(EasyMock.anyObject()))
.andReturn(TIME_LAG)
.atLeastOnce();
DateTime now = DateTimes.nowUtc();
DateTime maxi = now.plusMinutes(60);
@ -1099,9 +1130,6 @@ public class KinesisSupervisorTest extends EasyMockSupport
EasyMock.expect(supervisorRecordSupplier.getLatestSequenceNumber(EasyMock.anyObject())).andReturn("100").anyTimes();
supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString());
EasyMock.expectLastCall().anyTimes();
EasyMock.expect(supervisorRecordSupplier.getPartitionTimeLag(EasyMock.anyObject()))
.andReturn(TIME_LAG)
.atLeastOnce();
Capture<Task> captured = Capture.newInstance(CaptureType.ALL);
EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
@ -1228,9 +1256,6 @@ public class KinesisSupervisorTest extends EasyMockSupport
EasyMock.expect(supervisorRecordSupplier.getLatestSequenceNumber(SHARD0_PARTITION)).andReturn("1").anyTimes();
supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString());
EasyMock.expectLastCall().anyTimes();
EasyMock.expect(supervisorRecordSupplier.getPartitionTimeLag(EasyMock.anyObject()))
.andReturn(TIME_LAG)
.atLeastOnce();
final Capture<Task> firstTasks = Capture.newInstance(CaptureType.ALL);
EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
@ -1361,7 +1386,7 @@ public class KinesisSupervisorTest extends EasyMockSupport
EasyMock.expect(supervisorRecordSupplier.getLatestSequenceNumber(SHARD0_PARTITION)).andReturn("1").anyTimes();
supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString());
EasyMock.expectLastCall().anyTimes();
EasyMock.expect(supervisorRecordSupplier.getPartitionTimeLag(EasyMock.anyObject()))
EasyMock.expect(supervisorRecordSupplier.getPartitionsTimeLag(EasyMock.anyString(), EasyMock.anyObject()))
.andReturn(timeLag)
.atLeastOnce();
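A note on the new mock above: judging from the arguments, the refactored lag API appears to take the stream name plus the current offset map and return per-partition time lag in a single call (replacing the per-partition getPartitionTimeLag); the exact parameter and return types are not visible in this hunk, so treat that reading as an inference.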
@ -1453,9 +1478,6 @@ public class KinesisSupervisorTest extends EasyMockSupport
Assert.assertEquals(SupervisorStateManager.BasicState.RUNNING, payload.getDetailedState());
Assert.assertEquals(0, payload.getRecentErrors().size());
Assert.assertEquals(timeLag, payload.getMinimumLagMillis());
Assert.assertEquals(20000000L, (long) payload.getAggregateLagMillis());
TaskReportData publishingReport = payload.getPublishingTasks().get(0);
Assert.assertEquals("id1", publishingReport.getId());
@ -1525,7 +1547,7 @@ public class KinesisSupervisorTest extends EasyMockSupport
EasyMock.expect(supervisorRecordSupplier.getLatestSequenceNumber(SHARD0_PARTITION)).andReturn("1").anyTimes();
supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString());
EasyMock.expectLastCall().anyTimes();
EasyMock.expect(supervisorRecordSupplier.getPartitionTimeLag(EasyMock.anyObject()))
EasyMock.expect(supervisorRecordSupplier.getPartitionsTimeLag(EasyMock.anyString(), EasyMock.anyObject()))
.andReturn(timeLag)
.atLeastOnce();
@ -1605,9 +1627,6 @@ public class KinesisSupervisorTest extends EasyMockSupport
Assert.assertEquals(1, payload.getPublishingTasks().size());
Assert.assertEquals(SupervisorStateManager.BasicState.RUNNING, payload.getDetailedState());
Assert.assertEquals(0, payload.getRecentErrors().size());
Assert.assertEquals(timeLag, payload.getMinimumLagMillis());
Assert.assertEquals(9000L + 1234L, (long) payload.getAggregateLagMillis());
TaskReportData publishingReport = payload.getPublishingTasks().get(0);
@ -1681,7 +1700,7 @@ public class KinesisSupervisorTest extends EasyMockSupport
EasyMock.expect(supervisorRecordSupplier.getLatestSequenceNumber(SHARD0_PARTITION)).andReturn("1").anyTimes();
supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString());
EasyMock.expectLastCall().anyTimes();
EasyMock.expect(supervisorRecordSupplier.getPartitionTimeLag(EasyMock.anyObject()))
EasyMock.expect(supervisorRecordSupplier.getPartitionsTimeLag(EasyMock.anyString(), EasyMock.anyObject()))
.andReturn(timeLag)
.atLeastOnce();
Task id1 = createKinesisIndexTask(
@ -1859,9 +1878,6 @@ public class KinesisSupervisorTest extends EasyMockSupport
EasyMock.expect(supervisorRecordSupplier.getLatestSequenceNumber(SHARD0_PARTITION)).andReturn("1").anyTimes();
supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString());
EasyMock.expectLastCall().anyTimes();
EasyMock.expect(supervisorRecordSupplier.getPartitionTimeLag(EasyMock.anyObject()))
.andReturn(TIME_LAG)
.atLeastOnce();
Capture<Task> captured = Capture.newInstance(CaptureType.ALL);
EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
@ -1944,9 +1960,6 @@ public class KinesisSupervisorTest extends EasyMockSupport
EasyMock.expect(supervisorRecordSupplier.getLatestSequenceNumber(SHARD0_PARTITION)).andReturn("1").anyTimes();
supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString());
EasyMock.expectLastCall().anyTimes();
EasyMock.expect(supervisorRecordSupplier.getPartitionTimeLag(EasyMock.anyObject()))
.andReturn(TIME_LAG)
.atLeastOnce();
Capture<Task> captured = Capture.newInstance(CaptureType.ALL);
EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
@ -2056,10 +2069,6 @@ public class KinesisSupervisorTest extends EasyMockSupport
supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString());
EasyMock.expectLastCall().anyTimes();
EasyMock.expect(supervisorRecordSupplier.getPartitionTimeLag(EasyMock.anyObject()))
.andReturn(TIME_LAG)
.atLeastOnce();
Capture<Task> captured = Capture.newInstance(CaptureType.ALL);
EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes();
@ -2207,9 +2216,6 @@ public class KinesisSupervisorTest extends EasyMockSupport
EasyMock.expect(supervisorRecordSupplier.getLatestSequenceNumber(SHARD0_PARTITION)).andReturn("1").anyTimes();
supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString());
EasyMock.expectLastCall().anyTimes();
EasyMock.expect(supervisorRecordSupplier.getPartitionTimeLag(EasyMock.anyObject()))
.andReturn(TIME_LAG)
.atLeastOnce();
Task id1 = createKinesisIndexTask(
"id1",
@ -2618,9 +2624,6 @@ public class KinesisSupervisorTest extends EasyMockSupport
EasyMock.expect(supervisorRecordSupplier.getLatestSequenceNumber(SHARD0_PARTITION)).andReturn("1").anyTimes();
supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString());
EasyMock.expectLastCall().anyTimes();
EasyMock.expect(supervisorRecordSupplier.getPartitionTimeLag(EasyMock.anyObject()))
.andReturn(TIME_LAG)
.atLeastOnce();
Task id1 = createKinesisIndexTask(
"id1",
@ -2847,9 +2850,6 @@ public class KinesisSupervisorTest extends EasyMockSupport
EasyMock.expect(supervisorRecordSupplier.getLatestSequenceNumber(SHARD0_PARTITION)).andReturn("1").anyTimes();
supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString());
EasyMock.expectLastCall().anyTimes();
EasyMock.expect(supervisorRecordSupplier.getPartitionTimeLag(EasyMock.anyObject()))
.andReturn(TIME_LAG)
.atLeastOnce();
EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes();
@ -3005,9 +3005,6 @@ public class KinesisSupervisorTest extends EasyMockSupport
EasyMock.expect(supervisorRecordSupplier.getLatestSequenceNumber(SHARD0_PARTITION)).andReturn("1").anyTimes();
supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString());
EasyMock.expectLastCall().anyTimes();
EasyMock.expect(supervisorRecordSupplier.getPartitionTimeLag(EasyMock.anyObject()))
.andReturn(TIME_LAG)
.atLeastOnce();
EasyMock.expect(taskRunner.getRunningTasks()).andReturn(workItems).anyTimes();
EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
@ -3261,9 +3258,6 @@ public class KinesisSupervisorTest extends EasyMockSupport
EasyMock.expect(supervisorRecordSupplier.getLatestSequenceNumber(SHARD0_PARTITION)).andReturn("1").anyTimes();
supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString());
EasyMock.expectLastCall().anyTimes();
EasyMock.expect(supervisorRecordSupplier.getPartitionTimeLag(EasyMock.anyObject()))
.andReturn(TIME_LAG)
.atLeastOnce();
Task id1 = createKinesisIndexTask(
"id1",
@ -3482,7 +3476,8 @@ public class KinesisSupervisorTest extends EasyMockSupport
}
@Test
public void testDoNotKillCompatibleTasks() throws InterruptedException, EntryExistsException
public void testDoNotKillCompatibleTasks()
throws InterruptedException, EntryExistsException
{
// This supervisor always returns true for isTaskCurrent -> it should not kill its tasks
int numReplicas = 2;
@ -3512,9 +3507,6 @@ public class KinesisSupervisorTest extends EasyMockSupport
EasyMock.expect(supervisorRecordSupplier.getLatestSequenceNumber(EasyMock.anyObject())).andReturn("100").anyTimes();
supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString());
EasyMock.expectLastCall().anyTimes();
EasyMock.expect(supervisorRecordSupplier.getPartitionTimeLag(EasyMock.anyObject()))
.andReturn(TIME_LAG)
.atLeastOnce();
Task task = createKinesisIndexTask(
"id2",
@ -3582,7 +3574,8 @@ public class KinesisSupervisorTest extends EasyMockSupport
}
@Test
public void testKillIncompatibleTasks() throws InterruptedException, EntryExistsException
public void testKillIncompatibleTasks()
throws InterruptedException, EntryExistsException
{
// This supervisor always returns false for isTaskCurrent -> it should kill its tasks
int numReplicas = 2;
@ -3611,9 +3604,6 @@ public class KinesisSupervisorTest extends EasyMockSupport
EasyMock.expect(supervisorRecordSupplier.getLatestSequenceNumber(EasyMock.anyObject())).andReturn("100").anyTimes();
supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString());
EasyMock.expectLastCall().anyTimes();
EasyMock.expect(supervisorRecordSupplier.getPartitionTimeLag(EasyMock.anyObject()))
.andReturn(TIME_LAG)
.atLeastOnce();
Task task = createKinesisIndexTask(
"id1",
@ -3856,9 +3846,6 @@ public class KinesisSupervisorTest extends EasyMockSupport
EasyMock.expect(supervisorRecordSupplier.getLatestSequenceNumber(EasyMock.anyObject())).andReturn("100").anyTimes();
supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString());
EasyMock.expectLastCall().anyTimes();
EasyMock.expect(supervisorRecordSupplier.getPartitionTimeLag(EasyMock.anyObject()))
.andReturn(TIME_LAG)
.atLeastOnce();
Capture<Task> captured = Capture.newInstance(CaptureType.ALL);
EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
@ -3967,9 +3954,6 @@ public class KinesisSupervisorTest extends EasyMockSupport
supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString());
EasyMock.expectLastCall().anyTimes();
EasyMock.expect(supervisorRecordSupplier.getPartitionTimeLag(EasyMock.anyObject()))
.andReturn(TIME_LAG)
.atLeastOnce();
Capture<Task> postSplitCaptured = Capture.newInstance(CaptureType.ALL);
@ -4145,9 +4129,6 @@ public class KinesisSupervisorTest extends EasyMockSupport
supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString());
EasyMock.expectLastCall().anyTimes();
EasyMock.expect(supervisorRecordSupplier.getPartitionTimeLag(EasyMock.anyObject()))
.andReturn(TIME_LAG)
.atLeastOnce();
Capture<Task> postSplitCaptured = Capture.newInstance(CaptureType.ALL);
@ -4311,9 +4292,6 @@ public class KinesisSupervisorTest extends EasyMockSupport
EasyMock.expect(supervisorRecordSupplier.getLatestSequenceNumber(EasyMock.anyObject())).andReturn("100").anyTimes();
supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString());
EasyMock.expectLastCall().anyTimes();
EasyMock.expect(supervisorRecordSupplier.getPartitionTimeLag(EasyMock.anyObject()))
.andReturn(TIME_LAG)
.atLeastOnce();
Capture<Task> captured = Capture.newInstance(CaptureType.ALL);
EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
@ -4433,9 +4411,6 @@ public class KinesisSupervisorTest extends EasyMockSupport
supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString());
EasyMock.expectLastCall().anyTimes();
EasyMock.expect(supervisorRecordSupplier.getPartitionTimeLag(EasyMock.anyObject()))
.andReturn(TIME_LAG)
.atLeastOnce();
Capture<Task> postMergeCaptured = Capture.newInstance(CaptureType.ALL);
@ -4590,9 +4565,6 @@ public class KinesisSupervisorTest extends EasyMockSupport
supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString());
EasyMock.expectLastCall().anyTimes();
EasyMock.expect(supervisorRecordSupplier.getPartitionTimeLag(EasyMock.anyObject()))
.andReturn(TIME_LAG)
.atLeastOnce();
Capture<Task> postSplitCaptured = Capture.newInstance(CaptureType.ALL);
View File
@ -37,6 +37,8 @@ import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import it.unimi.dsi.fastutil.ints.Int2ObjectLinkedOpenHashMap;
import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
import org.apache.commons.codec.digest.DigestUtils;
import org.apache.druid.indexer.TaskLocation;
import org.apache.druid.indexer.TaskState;
@ -108,6 +110,7 @@ import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.BiConsumer;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@ -479,7 +482,7 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
private final BlockingQueue<Notice> notices = new LinkedBlockingDeque<>();
private final Object stopLock = new Object();
private final Object stateChangeLock = new Object();
private final Object recordSupplierLock = new Object();
private final ReentrantLock recordSupplierLock = new ReentrantLock();
private final boolean useExclusiveStartingSequence;
private boolean listenerRegistered = false;
@ -706,6 +709,11 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
notices.add(new ResetNotice(dataSourceMetadata));
}
public ReentrantLock getRecordSupplierLock()
{
return recordSupplierLock;
}
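For context, a standalone sketch of the guard pattern this lock enables: every recordSupplier access in this commit is wrapped in lock()/try/finally instead of a synchronized block, so subclasses can share the same lock through getRecordSupplierLock(). The class and method names below are illustrative only, not part of the commit:

import java.util.concurrent.locks.ReentrantLock;

public class LockPatternDemo
{
  private final ReentrantLock lock = new ReentrantLock();

  public int guardedCall()
  {
    lock.lock();
    try {
      // stand-in for a recordSupplier call such as getPartitionIds(...)
      return 42;
    }
    finally {
      // always released, even if the supplier call throws
      lock.unlock();
    }
  }

  public static void main(String[] args)
  {
    System.out.println(new LockPatternDemo().guardedCall());
  }
}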
@VisibleForTesting
public void tryInit()
@ -1889,10 +1897,9 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
{
List<PartitionIdType> previousPartitionIds = new ArrayList<>(partitionIds);
Set<PartitionIdType> partitionIdsFromSupplier;
recordSupplierLock.lock();
try {
synchronized (recordSupplierLock) {
partitionIdsFromSupplier = recordSupplier.getPartitionIds(ioConfig.getStream());
}
partitionIdsFromSupplier = recordSupplier.getPartitionIds(ioConfig.getStream());
}
catch (Exception e) {
stateManager.recordThrowableEvent(e);
@ -1900,6 +1907,9 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
log.debug(e, "full stack trace");
return false;
}
finally {
recordSupplierLock.unlock();
}
if (partitionIdsFromSupplier == null || partitionIdsFromSupplier.size() == 0) {
String errMsg = StringUtils.format("No partitions found for stream [%s]", ioConfig.getStream());
@ -1989,6 +1999,7 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
);
}
Int2ObjectMap<List<PartitionIdType>> newlyDiscovered = new Int2ObjectLinkedOpenHashMap<>();
for (PartitionIdType partitionId : activePartitionsIdsFromSupplier) {
int taskGroupId = getTaskGroupIdForPartition(partitionId);
Set<PartitionIdType> partitionGroup = partitionGroups.computeIfAbsent(
@ -1998,16 +2009,30 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
partitionGroup.add(partitionId);
if (partitionOffsets.putIfAbsent(partitionId, getNotSetMarker()) == null) {
log.info(
log.debug(
"New partition [%s] discovered for stream [%s], added to task group [%d]",
partitionId,
ioConfig.getStream(),
taskGroupId
);
newlyDiscovered.computeIfAbsent(taskGroupId, ArrayList::new).add(partitionId);
}
}
if (newlyDiscovered.size() > 0) {
for (Int2ObjectMap.Entry<List<PartitionIdType>> taskGroupPartitions : newlyDiscovered.int2ObjectEntrySet()) {
log.info(
"New partitions %s discovered for stream [%s], added to task group [%s]",
taskGroupPartitions.getValue(),
ioConfig.getStream(),
taskGroupPartitions.getIntKey()
);
}
}
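A minimal sketch of the batching idea above: collect newly discovered partitions per task group in an insertion-ordered fastutil map, then emit one log line per group instead of one per partition. The demo names and data are hypothetical:

import it.unimi.dsi.fastutil.ints.Int2ObjectLinkedOpenHashMap;
import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
import java.util.ArrayList;
import java.util.List;

public class BatchedDiscoveryLogDemo
{
  public static void main(String[] args)
  {
    Int2ObjectMap<List<String>> newlyDiscovered = new Int2ObjectLinkedOpenHashMap<>();
    String[] partitions = {"shardId-000", "shardId-001", "shardId-002"};
    for (String partitionId : partitions) {
      // stand-in for getTaskGroupIdForPartition(partitionId)
      int taskGroupId = Math.abs(partitionId.hashCode()) % 2;
      List<String> group = newlyDiscovered.get(taskGroupId);
      if (group == null) {
        group = new ArrayList<>();
        newlyDiscovered.put(taskGroupId, group);
      }
      group.add(partitionId);
    }
    // one line per task group instead of one line per partition
    for (Int2ObjectMap.Entry<List<String>> e : newlyDiscovered.int2ObjectEntrySet()) {
      System.out.printf("New partitions %s added to task group [%d]%n", e.getValue(), e.getIntKey());
    }
  }
}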
if (!partitionIds.equals(previousPartitionIds)) {
assignRecordSupplierToPartitionIds();
// the set of partition IDs has changed, have any running tasks stop early so that we can adjust to the
// repartitioning quickly by creating new tasks
for (TaskGroup taskGroup : activelyReadingTaskGroups.values()) {
@ -2034,6 +2059,28 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
return true;
}
private void assignRecordSupplierToPartitionIds()
{
recordSupplierLock.lock();
try {
final Set<StreamPartition<PartitionIdType>> partitions = partitionIds.stream()
.map(partitionId -> new StreamPartition<>(ioConfig.getStream(), partitionId))
.collect(Collectors.toSet());
if (!recordSupplier.getAssignment().containsAll(partitions)) {
recordSupplier.assign(partitions);
try {
recordSupplier.seekToEarliest(partitions);
}
catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}
finally {
recordSupplierLock.unlock();
}
}
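Design note: the containsAll check makes reassignment idempotent. The supplier is only re-assigned (and re-seeked to earliest) when its current assignment does not already cover the known partitions, so repeated calls from the discovery and expiry paths are cheap.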
/**
* This method determines the set of expired partitions from the set of partitions currently returned by
* the record supplier and the set of partitions previously tracked in the metadata.
@ -2106,6 +2153,7 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
partitionIds.clear();
partitionIds.addAll(activePartitionsIdsFromSupplier);
assignRecordSupplierToPartitionIds();
for (Integer groupId : partitionGroups.keySet()) {
if (newPartitionGroups.containsKey(groupId)) {
@ -2773,8 +2821,7 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
return startingOffsets;
}
private void createNewTasks()
throws JsonProcessingException
private void createNewTasks() throws JsonProcessingException
{
// update the checkpoints in the taskGroup to latest ones so that new tasks do not read what is already published
verifyAndMergeCheckpoints(
@ -2993,7 +3040,7 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
if (sequence == null) {
throw new ISE("unable to fetch sequence number for partition[%s] from stream", partition);
}
log.info("Getting sequence number [%s] for partition [%s]", sequence, partition);
log.debug("Getting sequence number [%s] for partition [%s]", sequence, partition);
return makeSequenceNumber(sequence, false);
}
}
@ -3023,26 +3070,27 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
return Collections.emptyMap();
}
/**
* Fetches the earliest or latest offset from the stream via the {@link RecordSupplier}
*/
@Nullable
private SequenceOffsetType getOffsetFromStreamForPartition(PartitionIdType partition, boolean useEarliestOffset)
{
synchronized (recordSupplierLock) {
recordSupplierLock.lock();
try {
StreamPartition<PartitionIdType> topicPartition = new StreamPartition<>(ioConfig.getStream(), partition);
if (!recordSupplier.getAssignment().contains(topicPartition)) {
final Set<StreamPartition<PartitionIdType>> partitions = Collections.singleton(topicPartition);
recordSupplier.assign(partitions);
try {
recordSupplier.seekToEarliest(partitions);
}
catch (InterruptedException e) {
throw new RuntimeException(e);
}
// this shouldn't happen, but in case it does...
throw new IllegalStateException("Record supplier does not match current known partitions");
}
return useEarliestOffset
? recordSupplier.getEarliestSequenceNumber(topicPartition)
: recordSupplier.getLatestSequenceNumber(topicPartition);
}
finally {
recordSupplierLock.unlock();
}
}
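Design note: the old code lazily assigned and seeked the supplier here; after this refactor, assignment is managed centrally by assignRecordSupplierToPartitionIds(), so an unassigned partition at this point indicates a bookkeeping bug and now fails fast with an IllegalStateException instead of being silently repaired.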
private void createTasksForGroup(int groupId, int replicas)
@ -3098,16 +3146,24 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
}
}
/**
* Monitoring method; fetches current partition offsets and lag in a background reporting thread.
*/
@VisibleForTesting
public void updateCurrentAndLatestOffsets()
{
try {
updateCurrentOffsets();
updateLatestOffsetsFromStream();
sequenceLastUpdated = DateTimes.nowUtc();
}
catch (Exception e) {
log.warn(e, "Exception while getting current/latest sequences");
// if we aren't in a steady state, chill out for a bit; don't worry, we'll get called again later. but if we
// aren't healthy, go ahead and try anyway to provide cluster operators insight into how much time is left to
// fix the issue, since this feeds the lag metrics
if (stateManager.isSteadyState() || !stateManager.isHealthy()) {
try {
updateCurrentOffsets();
updatePartitionLagFromStream();
sequenceLastUpdated = DateTimes.nowUtc();
}
catch (Exception e) {
log.warn(e, "Exception while getting current/latest sequences");
}
}
}
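The gating condition above reduces to a small predicate. A standalone restatement (the method names mirror the SupervisorStateManager accessors in this commit; the boolean inputs are hypothetical):

public class LagCollectionGateDemo
{
  // collect offsets/lag when running normally, and also when unhealthy so that
  // operators keep seeing lag metrics while they fix the problem; skip the
  // transitional healthy-but-not-steady states (e.g. still starting up)
  static boolean shouldCollect(boolean isSteadyState, boolean isHealthy)
  {
    return isSteadyState || !isHealthy;
  }

  public static void main(String[] args)
  {
    System.out.println(shouldCollect(true, true));   // true: steady state
    System.out.println(shouldCollect(false, false)); // true: unhealthy, keep reporting
    System.out.println(shouldCollect(false, true));  // false: healthy but still starting up
  }
}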
@ -3136,34 +3192,7 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
Futures.successfulAsList(futures).get(futureTimeoutInSeconds, TimeUnit.SECONDS);
}
private void updateLatestOffsetsFromStream() throws InterruptedException
{
synchronized (recordSupplierLock) {
Set<PartitionIdType> partitionIds;
try {
partitionIds = recordSupplier.getPartitionIds(ioConfig.getStream());
}
catch (Exception e) {
log.warn("Could not fetch partitions for topic/stream [%s]", ioConfig.getStream());
throw new StreamException(e);
}
Set<StreamPartition<PartitionIdType>> partitions = partitionIds
.stream()
.map(e -> new StreamPartition<>(ioConfig.getStream(), e))
.collect(Collectors.toSet());
recordSupplier.assign(partitions);
recordSupplier.seekToLatest(partitions);
updateLatestSequenceFromStream(recordSupplier, partitions);
}
}
protected abstract void updateLatestSequenceFromStream(
RecordSupplier<PartitionIdType, SequenceOffsetType> recordSupplier,
Set<StreamPartition<PartitionIdType>> partitions
);
protected abstract void updatePartitionLagFromStream();
/**
* Gets 'lag' of currently processed offset behind latest offset as a measure of difference between offsets.
@ -3179,17 +3208,21 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
protected Map<PartitionIdType, SequenceOffsetType> getHighestCurrentOffsets()
{
if (!spec.isSuspended() || activelyReadingTaskGroups.size() > 0 || pendingCompletionTaskGroups.size() > 0) {
return activelyReadingTaskGroups
.values()
.stream()
.flatMap(taskGroup -> taskGroup.tasks.entrySet().stream())
.flatMap(taskData -> taskData.getValue().currentSequences.entrySet().stream())
.collect(Collectors.toMap(
Entry::getKey,
Entry::getValue,
(v1, v2) -> makeSequenceNumber(v1).compareTo(makeSequenceNumber(v2)) > 0 ? v1 : v2
));
if (!spec.isSuspended()) {
if (activelyReadingTaskGroups.size() > 0 || pendingCompletionTaskGroups.size() > 0) {
return activelyReadingTaskGroups
.values()
.stream()
.flatMap(taskGroup -> taskGroup.tasks.entrySet().stream())
.flatMap(taskData -> taskData.getValue().currentSequences.entrySet().stream())
.collect(Collectors.toMap(
Entry::getKey,
Entry::getValue,
(v1, v2) -> makeSequenceNumber(v1).compareTo(makeSequenceNumber(v2)) > 0 ? v1 : v2
));
}
// nothing is running but we are not suspended, so let's just hang out in case we get called while things start up
return ImmutableMap.of();
} else {
// if supervisor is suspended, no tasks are likely running so use offsets in metadata, if exist
return getOffsetsFromMetadataStorage();
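The merge function in the collector above keeps the highest offset reported across replica tasks. A self-contained sketch of that max-merge (the shard names and offsets are made up):

import java.util.HashMap;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class MaxMergeDemo
{
  public static void main(String[] args)
  {
    // two replicas report current offsets for the same partitions
    Map<String, Long> replica1 = new HashMap<>();
    replica1.put("shard-0", 10L);
    replica1.put("shard-1", 7L);
    Map<String, Long> replica2 = new HashMap<>();
    replica2.put("shard-0", 12L);
    replica2.put("shard-1", 5L);

    // on key collision, keep whichever offset compares higher
    Map<String, Long> highest = Stream.of(replica1, replica2)
        .flatMap(m -> m.entrySet().stream())
        .collect(Collectors.toMap(
            Map.Entry::getKey,
            Map.Entry::getValue,
            (v1, v2) -> v1.compareTo(v2) > 0 ? v1 : v2
        ));

    System.out.println(highest); // {shard-0=12, shard-1=7} (iteration order may vary)
  }
}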
@ -3447,8 +3480,9 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
protected void emitLag()
{
if (spec.isSuspended()) {
// don't emit metrics if supervisor is suspended (lag should still be available in status report)
if (spec.isSuspended() || !stateManager.isSteadyState()) {
// don't emit metrics if supervisor is suspended or not in a healthy running state
// (lag should still be available in status report)
return;
}
try {
View File
@ -753,7 +753,6 @@ public class SeekableStreamSupervisorStateTest extends EasyMockSupport
{
spec = createMock(SeekableStreamSupervisorSpec.class);
EasyMock.expect(spec.getSupervisorStateManagerConfig()).andReturn(supervisorConfig).anyTimes();
EasyMock.expect(spec.getDataSchema()).andReturn(getDataSchema()).anyTimes();
EasyMock.expect(spec.getIoConfig()).andReturn(new SeekableStreamSupervisorIOConfig(
"stream",
@ -766,13 +765,20 @@ public class SeekableStreamSupervisorStateTest extends EasyMockSupport
false,
new Period("PT30M"),
null,
null, null
null,
null
)
{
}).anyTimes();
EasyMock.expect(spec.getTuningConfig()).andReturn(getTuningConfig()).anyTimes();
EasyMock.expect(spec.getEmitter()).andReturn(emitter).anyTimes();
EasyMock.expect(spec.getMonitorSchedulerConfig()).andReturn(new DruidMonitorSchedulerConfig()).anyTimes();
EasyMock.expect(spec.getMonitorSchedulerConfig()).andReturn(new DruidMonitorSchedulerConfig() {
@Override
public Duration getEmitterPeriod()
{
return new Period("PT1S").toStandardDuration();
}
}).anyTimes();
EasyMock.expect(spec.isSuspended()).andReturn(suspended).anyTimes();
EasyMock.expect(spec.getType()).andReturn("test").anyTimes();
@ -986,9 +992,7 @@ public class SeekableStreamSupervisorStateTest extends EasyMockSupport
}
@Override
protected void updateLatestSequenceFromStream(
RecordSupplier<String, String> recordSupplier, Set<StreamPartition<String>> streamPartitions
)
protected void updatePartitionLagFromStream()
{
// do nothing
}
@ -1219,7 +1223,9 @@ public class SeekableStreamSupervisorStateTest extends EasyMockSupport
protected void emitLag()
{
super.emitLag();
latch.countDown();
if (stateManager.isSteadyState()) {
latch.countDown();
}
}
@Override
View File
@ -92,7 +92,7 @@ public class SupervisorStateManager
private final Deque<ExceptionEvent> recentEventsQueue = new ConcurrentLinkedDeque<>();
private State supervisorState = BasicState.PENDING;
private volatile State supervisorState = BasicState.PENDING;
private boolean atLeastOneSuccessfulRun = false;
private boolean currentRunSuccessful = true;
@ -214,6 +214,11 @@ public class SupervisorStateManager
return supervisorState != null && supervisorState.isHealthy();
}
public boolean isSteadyState()
{
return healthySteadyState.equals(supervisorState);
}
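A minimal sketch of why supervisorState becomes volatile in this commit: the background reporting thread now reads the state through isSteadyState()/isHealthy() while the main supervisor thread writes it, and volatile guarantees the reader observes the latest write. The names below are illustrative only:

public class VolatileStateDemo
{
  private volatile String state = "PENDING";

  void markRunning()           // written by the main supervisor thread
  {
    state = "RUNNING";
  }

  boolean isSteady()           // read by the background reporting thread
  {
    return "RUNNING".equals(state);
  }

  public static void main(String[] args) throws InterruptedException
  {
    VolatileStateDemo demo = new VolatileStateDemo();
    Thread reporter = new Thread(() -> {
      while (!demo.isSteady()) {
        // without volatile this loop could spin forever on a stale read
      }
      System.out.println("observed steady state");
    });
    reporter.start();
    demo.markRunning();
    reporter.join();
  }
}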
public boolean isAtLeastOneSuccessfulRun()
{
return atLeastOneSuccessfulRun;