Kinesis ingestion with empty shards (#12792)

Kinesis ingestion requires all shards to have at least 1 record at the required position in druid.
Even if this is satisified initially, resharding the stream can lead to empty intermediate shards. A significant delay in writing to newly created shards was also problematic.

Kinesis shard sequence numbers are big integers. Introduce two more custom sequence tokens UNREAD_TRIM_HORIZON and UNREAD_LATEST to indicate that a shard has not been read from and that it needs to be read from the start or the end respectively.
These values can be used to avoid the need to read at least one record to obtain a sequence number for ingesting a newly discovered shard.

If a record cannot be obtained immediately, use a marker to obtain the relevant shardIterator and use this shardIterator to obtain a valid sequence number. As long as a valid sequence number is not obtained, continue storing the token as the offset.

These tokens (UNREAD_TRIM_HORIZON and UNREAD_LATEST) are logically ordered to be earlier than any valid sequence number.

However, the ordering requires a few subtle changes to the existing mechanism for record sequence validation:

The sequence availability check ensures that the current offset is before the earliest available sequence in the shard. However, current token being an UNREAD token indicates that any sequence number in the shard is valid (despite the ordering)

Kinesis sequence numbers are inclusive i.e if current sequence == end sequence, there are more records left to read.
However, the equality check is exclusive when dealing with UNREAD tokens.
This commit is contained in:
AmatyaAvadhanula 2022-08-05 22:38:58 +05:30 committed by GitHub
parent 24f8f9e1ab
commit d294404924
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
27 changed files with 553 additions and 166 deletions

View File

@ -300,7 +300,6 @@ The `tuningConfig` is optional. If no `tuningConfig` is specified, default param
|`recordBufferSize`|Integer|Size of the buffer (number of events) used between the Kinesis fetch threads and the main ingestion thread.|no (default == 10000)|
|`recordBufferOfferTimeout`|Integer|Length of time in milliseconds to wait for space to become available in the buffer before timing out.| no (default == 5000)|
|`recordBufferFullWait`|Integer|Length of time in milliseconds to wait for the buffer to drain before attempting to fetch records from Kinesis again.|no (default == 5000)|
|`fetchSequenceNumberTimeout`|Integer|Length of time in milliseconds to wait for Kinesis to return the earliest or latest sequence number for a shard. Kinesis will not return the latest sequence number if no data is actively being written to that shard. In this case, this fetch call will repeatedly timeout and retry until fresh data is written to the stream.|no (default == 60000)|
|`fetchThreads`|Integer|Size of the pool of threads fetching data from Kinesis. There is no benefit in having more threads than Kinesis shards.|no (default == procs * 2, where "procs" is the number of processors on the server that the task is running on) |
|`segmentWriteOutMediumFactory`|Object|Segment write-out medium to use when creating segments. See below for more information.|no (not specified by default, the value from `druid.peon.defaultSegmentWriteOutMediumFactory.type` is used)|
|`intermediateHandoffPeriod`|ISO8601 Period|How often the tasks should hand off segments. Handoff will happen either if `maxRowsPerSegment` or `maxTotalRows` is hit or every `intermediateHandoffPeriod`, whichever happens earlier.| no (default == P2147483647D)|

View File

@ -26,6 +26,7 @@ import org.apache.druid.common.utils.IdUtils;
import org.apache.druid.data.input.kafka.KafkaRecordEntity;
import org.apache.druid.indexing.kafka.supervisor.KafkaSupervisorIOConfig;
import org.apache.druid.indexing.seekablestream.common.OrderedPartitionableRecord;
import org.apache.druid.indexing.seekablestream.common.OrderedSequenceNumber;
import org.apache.druid.indexing.seekablestream.common.RecordSupplier;
import org.apache.druid.indexing.seekablestream.common.StreamException;
import org.apache.druid.indexing.seekablestream.common.StreamPartition;
@ -157,6 +158,14 @@ public class KafkaRecordSupplier implements RecordSupplier<Integer, Long, KafkaR
return nextPos;
}
@Override
public boolean isOffsetAvailable(StreamPartition<Integer> partition, OrderedSequenceNumber<Long> offset)
{
final Long earliestOffset = getEarliestSequenceNumber(partition);
return earliestOffset != null
&& offset.isAvailableWithEarliest(KafkaSequenceNumber.of(earliestOffset));
}
@Override
public Long getPosition(StreamPartition<Integer> partition)
{

View File

@ -106,7 +106,6 @@ public class KinesisIndexTask extends SeekableStreamIndexTask<String, String, By
tuningConfig.getRecordBufferSize(),
tuningConfig.getRecordBufferOfferTimeout(),
tuningConfig.getRecordBufferFullWait(),
tuningConfig.getFetchSequenceNumberTimeout(),
tuningConfig.getMaxRecordsPerPoll(),
false,
useListShards

View File

@ -127,9 +127,7 @@ public class KinesisIndexTaskRunner extends SeekableStreamIndexTaskRunner<String
final ConcurrentMap<String, String> currOffsets = getCurrentOffsets();
for (final StreamPartition<String> streamPartition : assignment) {
String sequence = currOffsets.get(streamPartition.getPartitionId());
String earliestSequenceNumber = recordSupplier.getEarliestSequenceNumber(streamPartition);
if (earliestSequenceNumber == null
|| createSequenceNumber(earliestSequenceNumber).compareTo(createSequenceNumber(sequence)) > 0) {
if (!recordSupplier.isOffsetAvailable(streamPartition, KinesisSequenceNumber.of(sequence))) {
if (task.getTuningConfig().isResetOffsetAutomatically()) {
log.info("Attempting to reset sequences automatically for all partitions");
try {
@ -144,10 +142,9 @@ public class KinesisIndexTaskRunner extends SeekableStreamIndexTaskRunner<String
}
} else {
throw new ISE(
"Starting sequenceNumber [%s] is no longer available for partition [%s] (earliest: [%s]) and resetOffsetAutomatically is not enabled",
"Starting sequenceNumber [%s] is no longer available for partition [%s] and resetOffsetAutomatically is not enabled",
sequence,
streamPartition.getPartitionId(),
earliestSequenceNumber
streamPartition.getPartitionId()
);
}
}

View File

@ -39,13 +39,11 @@ public class KinesisIndexTaskTuningConfig extends SeekableStreamIndexTaskTuningC
private static final int DEFAULT_RECORD_BUFFER_SIZE = 10000;
private static final int DEFAULT_RECORD_BUFFER_OFFER_TIMEOUT = 5000;
private static final int DEFAULT_RECORD_BUFFER_FULL_WAIT = 5000;
private static final int DEFAULT_FETCH_SEQUENCE_NUMBER_TIMEOUT = 20000;
private static final int DEFAULT_MAX_RECORDS_PER_POLL = 100;
private final int recordBufferSize;
private final int recordBufferOfferTimeout;
private final int recordBufferFullWait;
private final int fetchSequenceNumberTimeout;
private final Integer fetchThreads;
private final int maxRecordsPerPoll;
@ -69,7 +67,6 @@ public class KinesisIndexTaskTuningConfig extends SeekableStreamIndexTaskTuningC
@JsonProperty("recordBufferSize") Integer recordBufferSize,
@JsonProperty("recordBufferOfferTimeout") Integer recordBufferOfferTimeout,
@JsonProperty("recordBufferFullWait") Integer recordBufferFullWait,
@JsonProperty("fetchSequenceNumberTimeout") Integer fetchSequenceNumberTimeout,
@JsonProperty("fetchThreads") Integer fetchThreads,
@JsonProperty("segmentWriteOutMediumFactory") @Nullable SegmentWriteOutMediumFactory segmentWriteOutMediumFactory,
@JsonProperty("logParseExceptions") @Nullable Boolean logParseExceptions,
@ -106,8 +103,6 @@ public class KinesisIndexTaskTuningConfig extends SeekableStreamIndexTaskTuningC
? DEFAULT_RECORD_BUFFER_OFFER_TIMEOUT
: recordBufferOfferTimeout;
this.recordBufferFullWait = recordBufferFullWait == null ? DEFAULT_RECORD_BUFFER_FULL_WAIT : recordBufferFullWait;
this.fetchSequenceNumberTimeout = fetchSequenceNumberTimeout
== null ? DEFAULT_FETCH_SEQUENCE_NUMBER_TIMEOUT : fetchSequenceNumberTimeout;
this.fetchThreads = fetchThreads; // we handle this being null later
this.maxRecordsPerPoll = maxRecordsPerPoll == null ? DEFAULT_MAX_RECORDS_PER_POLL : maxRecordsPerPoll;
@ -135,12 +130,6 @@ public class KinesisIndexTaskTuningConfig extends SeekableStreamIndexTaskTuningC
return recordBufferFullWait;
}
@JsonProperty
public int getFetchSequenceNumberTimeout()
{
return fetchSequenceNumberTimeout;
}
@JsonProperty
public Integer getFetchThreads()
{
@ -175,7 +164,6 @@ public class KinesisIndexTaskTuningConfig extends SeekableStreamIndexTaskTuningC
getRecordBufferSize(),
getRecordBufferOfferTimeout(),
getRecordBufferFullWait(),
getFetchSequenceNumberTimeout(),
getFetchThreads(),
getSegmentWriteOutMediumFactory(),
isLogParseExceptions(),
@ -202,7 +190,6 @@ public class KinesisIndexTaskTuningConfig extends SeekableStreamIndexTaskTuningC
return recordBufferSize == that.recordBufferSize &&
recordBufferOfferTimeout == that.recordBufferOfferTimeout &&
recordBufferFullWait == that.recordBufferFullWait &&
fetchSequenceNumberTimeout == that.fetchSequenceNumberTimeout &&
maxRecordsPerPoll == that.maxRecordsPerPoll &&
Objects.equals(fetchThreads, that.fetchThreads);
}
@ -215,7 +202,6 @@ public class KinesisIndexTaskTuningConfig extends SeekableStreamIndexTaskTuningC
recordBufferSize,
recordBufferOfferTimeout,
recordBufferFullWait,
fetchSequenceNumberTimeout,
fetchThreads,
maxRecordsPerPoll
);
@ -241,7 +227,6 @@ public class KinesisIndexTaskTuningConfig extends SeekableStreamIndexTaskTuningC
", recordBufferSize=" + recordBufferSize +
", recordBufferOfferTimeout=" + recordBufferOfferTimeout +
", recordBufferFullWait=" + recordBufferFullWait +
", fetchSequenceNumberTimeout=" + fetchSequenceNumberTimeout +
", fetchThreads=" + fetchThreads +
", segmentWriteOutMediumFactory=" + getSegmentWriteOutMediumFactory() +
", logParseExceptions=" + isLogParseExceptions() +

View File

@ -53,6 +53,7 @@ import org.apache.druid.common.aws.AWSCredentialsUtils;
import org.apache.druid.data.input.impl.ByteEntity;
import org.apache.druid.indexing.kinesis.supervisor.KinesisSupervisor;
import org.apache.druid.indexing.seekablestream.common.OrderedPartitionableRecord;
import org.apache.druid.indexing.seekablestream.common.OrderedSequenceNumber;
import org.apache.druid.indexing.seekablestream.common.RecordSupplier;
import org.apache.druid.indexing.seekablestream.common.StreamException;
import org.apache.druid.indexing.seekablestream.common.StreamPartition;
@ -400,7 +401,6 @@ public class KinesisRecordSupplier implements RecordSupplier<String, String, Byt
private final boolean deaggregate;
private final int recordBufferOfferTimeout;
private final int recordBufferFullWait;
private final int fetchSequenceNumberTimeout;
private final int maxRecordsPerPoll;
private final int fetchThreads;
private final int recordBufferSize;
@ -426,7 +426,6 @@ public class KinesisRecordSupplier implements RecordSupplier<String, String, Byt
int recordBufferSize,
int recordBufferOfferTimeout,
int recordBufferFullWait,
int fetchSequenceNumberTimeout,
int maxRecordsPerPoll,
boolean useEarliestSequenceNumber,
boolean useListShards
@ -439,7 +438,6 @@ public class KinesisRecordSupplier implements RecordSupplier<String, String, Byt
this.deaggregate = deaggregate;
this.recordBufferOfferTimeout = recordBufferOfferTimeout;
this.recordBufferFullWait = recordBufferFullWait;
this.fetchSequenceNumberTimeout = fetchSequenceNumberTimeout;
this.maxRecordsPerPoll = maxRecordsPerPoll;
this.fetchThreads = fetchThreads;
this.recordBufferSize = recordBufferSize;
@ -596,7 +594,13 @@ public class KinesisRecordSupplier implements RecordSupplier<String, String, Byt
public void seek(StreamPartition<String> partition, String sequenceNumber) throws InterruptedException
{
filterBufferAndResetBackgroundFetch(ImmutableSet.of(partition));
partitionSeek(partition, sequenceNumber, ShardIteratorType.AT_SEQUENCE_NUMBER);
if (KinesisSequenceNumber.UNREAD_TRIM_HORIZON.equals(sequenceNumber)) {
partitionSeek(partition, null, ShardIteratorType.TRIM_HORIZON);
} else if (KinesisSequenceNumber.UNREAD_LATEST.equals(sequenceNumber)) {
partitionSeek(partition, null, ShardIteratorType.LATEST);
} else {
partitionSeek(partition, sequenceNumber, ShardIteratorType.AT_SEQUENCE_NUMBER);
}
}
@Override
@ -666,6 +670,75 @@ public class KinesisRecordSupplier implements RecordSupplier<String, String, Byt
return getSequenceNumber(partition, ShardIteratorType.TRIM_HORIZON);
}
@Override
public boolean isOffsetAvailable(StreamPartition<String> partition, OrderedSequenceNumber<String> offset)
{
return wrapExceptions(() -> {
KinesisSequenceNumber kinesisSequence = (KinesisSequenceNumber) offset;
// No records have been read from the stream and any record is valid
if (kinesisSequence.isUnread()) {
return true;
}
// Any other custom sequence number
if (!KinesisSequenceNumber.isValidAWSKinesisSequence(kinesisSequence.get())) {
return false;
}
// The first record using AT_SEQUENCE_NUMBER should match the offset
// Should not return empty records provided the record is present
// Reference: https://docs.aws.amazon.com/streams/latest/dev/troubleshooting-consumers.html
// Section: GetRecords Returns Empty Records Array Even When There is Data in the Stream
String shardIterator = RetryUtils.retry(
() -> kinesis.getShardIterator(partition.getStream(),
partition.getPartitionId(),
ShardIteratorType.AT_SEQUENCE_NUMBER.name(),
kinesisSequence.get())
.getShardIterator(),
(throwable) -> {
if (throwable instanceof ProvisionedThroughputExceededException) {
log.warn(
throwable,
"encountered ProvisionedThroughputExceededException while fetching records, this means "
+ "that the request rate for the stream is too high, or the requested data is too large for "
+ "the available throughput. Reduce the frequency or size of your requests. Consider increasing "
+ "the number of shards to increase throughput."
);
return true;
}
if (throwable instanceof AmazonClientException) {
AmazonClientException ase = (AmazonClientException) throwable;
return AWSClientUtil.isClientExceptionRecoverable(ase);
}
return false;
},
GET_SEQUENCE_NUMBER_RETRY_COUNT
);
GetRecordsRequest getRecordsRequest = new GetRecordsRequest().withShardIterator(shardIterator);
List<Record> records = RetryUtils.retry(
() -> kinesis.getRecords(getRecordsRequest)
.getRecords(),
(throwable) -> {
if (throwable instanceof ProvisionedThroughputExceededException) {
log.warn(
throwable,
"encountered ProvisionedThroughputExceededException while fetching records, this means "
+ "that the request rate for the stream is too high, or the requested data is too large for "
+ "the available throughput. Reduce the frequency or size of your requests. Consider increasing "
+ "the number of shards to increase throughput."
);
return true;
}
if (throwable instanceof AmazonClientException) {
AmazonClientException ase = (AmazonClientException) throwable;
return AWSClientUtil.isClientExceptionRecoverable(ase);
}
return false;
},
GET_SEQUENCE_NUMBER_RETRY_COUNT
);
return !records.isEmpty() && records.get(0).getSequenceNumber().equals(kinesisSequence.get());
});
}
public Set<Shard> getShards(String stream)
{
if (useListShards) {
@ -743,11 +816,12 @@ public class KinesisRecordSupplier implements RecordSupplier<String, String, Byt
{
Map<String, Long> partitionLag = Maps.newHashMapWithExpectedSize(currentOffsets.size());
for (Map.Entry<String, String> partitionOffset : currentOffsets.entrySet()) {
StreamPartition<String> partition = new StreamPartition<>(stream, partitionOffset.getKey());
long currentLag = 0L;
if (KinesisSequenceNumber.isValidAWSKinesisSequence(partitionOffset.getValue())) {
StreamPartition<String> partition = new StreamPartition<>(stream, partitionOffset.getKey());
long currentLag = getPartitionTimeLag(partition, partitionOffset.getValue());
partitionLag.put(partitionOffset.getKey(), currentLag);
currentLag = getPartitionTimeLag(partition, partitionOffset.getValue());
}
partitionLag.put(partitionOffset.getKey(), currentLag);
}
return partitionLag;
}
@ -832,6 +906,11 @@ public class KinesisRecordSupplier implements RecordSupplier<String, String, Byt
* {@link #GET_SEQUENCE_NUMBER_RECORD_COUNT} records and return the first sequence number from the result set.
* This method is thread safe as it does not depend on the internal state of the supplier (it doesn't use the
* {@link PartitionResource} which have been assigned to the supplier), and the Kinesis client is thread safe.
*
* When there are no records at the offset corresponding to the ShardIteratorType,
* If shard is closed, return custom EOS sequence marker
* While getting the earliest sequence number, return a custom marker corresponding to TRIM_HORIZON
* While getting the most recent sequence number, return a custom marker corresponding to LATEST
*/
@Nullable
private String getSequenceNumber(StreamPartition<String> partition, ShardIteratorType iteratorEnum)
@ -840,62 +919,60 @@ public class KinesisRecordSupplier implements RecordSupplier<String, String, Byt
String shardIterator =
kinesis.getShardIterator(partition.getStream(), partition.getPartitionId(), iteratorEnum.toString())
.getShardIterator();
long timeoutMillis = System.currentTimeMillis() + fetchSequenceNumberTimeout;
GetRecordsResult recordsResult = null;
while (shardIterator != null && System.currentTimeMillis() < timeoutMillis) {
if (closed) {
log.info("KinesisRecordSupplier closed while fetching sequenceNumber");
return null;
}
final GetRecordsRequest request = new GetRecordsRequest().withShardIterator(shardIterator)
.withLimit(GET_SEQUENCE_NUMBER_RECORD_COUNT);
GetRecordsResult recordsResult = RetryUtils.retry(
() -> kinesis.getRecords(request),
(throwable) -> {
if (throwable instanceof ProvisionedThroughputExceededException) {
log.warn(
throwable,
"encountered ProvisionedThroughputExceededException while fetching records, this means "
+ "that the request rate for the stream is too high, or the requested data is too large for "
+ "the available throughput. Reduce the frequency or size of your requests. Consider increasing "
+ "the number of shards to increase throughput."
);
return true;
}
if (throwable instanceof AmazonClientException) {
AmazonClientException ase = (AmazonClientException) throwable;
return AWSClientUtil.isClientExceptionRecoverable(ase);
}
return false;
},
GET_SEQUENCE_NUMBER_RETRY_COUNT
);
if (closed) {
log.info("KinesisRecordSupplier closed while fetching sequenceNumber");
return null;
}
final String currentShardIterator = shardIterator;
final GetRecordsRequest request = new GetRecordsRequest().withShardIterator(currentShardIterator)
.withLimit(GET_SEQUENCE_NUMBER_RECORD_COUNT);
recordsResult = RetryUtils.retry(
() -> kinesis.getRecords(request),
(throwable) -> {
if (throwable instanceof ProvisionedThroughputExceededException) {
log.warn(
throwable,
"encountered ProvisionedThroughputExceededException while fetching records, this means "
+ "that the request rate for the stream is too high, or the requested data is too large for "
+ "the available throughput. Reduce the frequency or size of your requests. Consider increasing "
+ "the number of shards to increase throughput."
);
return true;
}
if (throwable instanceof AmazonClientException) {
AmazonClientException ase = (AmazonClientException) throwable;
return AWSClientUtil.isClientExceptionRecoverable(ase);
}
return false;
},
GET_SEQUENCE_NUMBER_RETRY_COUNT
);
List<Record> records = recordsResult.getRecords();
List<Record> records = recordsResult.getRecords();
if (!records.isEmpty()) {
return records.get(0).getSequenceNumber();
}
shardIterator = recordsResult.getNextShardIterator();
if (!records.isEmpty()) {
return records.get(0).getSequenceNumber();
}
if (shardIterator == null) {
log.info("Partition[%s] returned a null shard iterator, is the shard closed?", partition.getPartitionId());
if (recordsResult.getNextShardIterator() == null) {
log.info("Partition[%s] is closed and empty", partition.getPartitionId());
return KinesisSequenceNumber.END_OF_SHARD_MARKER;
}
if (iteratorEnum.equals(ShardIteratorType.LATEST)) {
log.info("Partition[%s] has no records at LATEST offset", partition.getPartitionId());
return KinesisSequenceNumber.UNREAD_LATEST;
}
// if we reach here, it usually means either the shard has no more records, or records have not been
// added to this shard
log.warn(
"timed out while trying to fetch position for shard[%s], millisBehindLatest is [%s], likely no more records in shard",
partition.getPartitionId(),
recordsResult != null ? recordsResult.getMillisBehindLatest() : "UNKNOWN"
);
// Even if there are records in the shard, they may not be returned on the first call to getRecords with TRIM_HORIZON
// Reference: https://docs.aws.amazon.com/streams/latest/dev/troubleshooting-consumers.html
// Section: GetRecords Returns Empty Records Array Even When There is Data in the Stream
if (iteratorEnum.equals(ShardIteratorType.TRIM_HORIZON)) {
log.info("Partition[%s] has no records at TRIM_HORIZON offset", partition.getPartitionId());
return KinesisSequenceNumber.UNREAD_TRIM_HORIZON;
}
log.warn("Could not fetch sequence number for Partition[%s]", partition.getPartitionId());
return null;
});
}

View File

@ -70,7 +70,6 @@ public class KinesisSamplerSpec extends SeekableStreamSamplerSpec
tuningConfig.getRecordBufferSize(),
tuningConfig.getRecordBufferOfferTimeout(),
tuningConfig.getRecordBufferFullWait(),
tuningConfig.getFetchSequenceNumberTimeout(),
tuningConfig.getMaxRecordsPerPoll(),
ioConfig.isUseEarliestSequenceNumber(),
tuningConfig.isUseListShards()

View File

@ -51,6 +51,16 @@ public class KinesisSequenceNumber extends OrderedSequenceNumber<String>
*/
public static final String EXPIRED_MARKER = "EXPIRED";
/**
* Indicates that records have not been read from a shard which needs to be processed from sequence type: TRIM_HORIZON
*/
public static final String UNREAD_TRIM_HORIZON = "UNREAD_TRIM_HORIZON";
/**
* Indicates that records have not been read from a shard which needs to be processed from sequence type: LATEST
*/
public static final String UNREAD_LATEST = "UNREAD_LATEST";
/**
* this flag is used to indicate either END_OF_SHARD_MARKER, NO_END_SEQUENCE_NUMBER
* or EXPIRED_MARKER so that they can be properly compared
@ -62,14 +72,12 @@ public class KinesisSequenceNumber extends OrderedSequenceNumber<String>
private KinesisSequenceNumber(String sequenceNumber, boolean isExclusive)
{
super(sequenceNumber, isExclusive);
if (END_OF_SHARD_MARKER.equals(sequenceNumber)
|| NO_END_SEQUENCE_NUMBER.equals(sequenceNumber)
|| EXPIRED_MARKER.equals(sequenceNumber)) {
isMaxSequenceNumber = true;
if (!isValidAWSKinesisSequence(sequenceNumber)) {
isMaxSequenceNumber = !isUnreadSequence(sequenceNumber);
this.intSequence = null;
} else {
isMaxSequenceNumber = false;
this.intSequence = new BigInteger(sequenceNumber);
this.isMaxSequenceNumber = false;
}
}
@ -93,6 +101,8 @@ public class KinesisSequenceNumber extends OrderedSequenceNumber<String>
return !(END_OF_SHARD_MARKER.equals(sequenceNumber)
|| NO_END_SEQUENCE_NUMBER.equals(sequenceNumber)
|| EXPIRED_MARKER.equals(sequenceNumber)
|| UNREAD_TRIM_HORIZON.equals(sequenceNumber)
|| UNREAD_LATEST.equals(sequenceNumber)
);
}
@ -100,14 +110,50 @@ public class KinesisSequenceNumber extends OrderedSequenceNumber<String>
public int compareTo(OrderedSequenceNumber<String> o)
{
KinesisSequenceNumber num = (KinesisSequenceNumber) o;
if (isUnread() && num.isUnread()) {
return 0;
} else if (isUnread()) {
return -1;
} else if (num.isUnread()) {
return 1;
}
if (isMaxSequenceNumber && num.isMaxSequenceNumber) {
return 0;
} else if (isMaxSequenceNumber) {
return 1;
} else if (num.isMaxSequenceNumber) {
return -1;
} else {
return this.intSequence.compareTo(new BigInteger(o.get()));
}
return this.intSequence.compareTo(new BigInteger(o.get()));
}
@Override
public boolean isAvailableWithEarliest(OrderedSequenceNumber<String> earliest)
{
if (isUnread()) {
return true;
}
return super.isAvailableWithEarliest(earliest);
}
@Override
public boolean isMoreToReadBeforeReadingRecord(OrderedSequenceNumber<String> end, boolean isEndOffsetExclusive)
{
// Kinesis sequence number checks are exclusive for AWS numeric sequences
// However, If a record is UNREAD and the end offset is finalized to be UNREAD, we have caught up. (inclusive)
if (isUnreadSequence(end.get())) {
return false;
}
return super.isMoreToReadBeforeReadingRecord(end, isEndOffsetExclusive);
}
public boolean isUnread()
{
return isUnreadSequence(get());
}
private boolean isUnreadSequence(String sequence)
{
return UNREAD_TRIM_HORIZON.equals(sequence) || UNREAD_LATEST.equals(sequence);
}
}

View File

@ -212,7 +212,6 @@ public class KinesisSupervisor extends SeekableStreamSupervisor<String, String,
taskTuningConfig.getRecordBufferSize(),
taskTuningConfig.getRecordBufferOfferTimeout(),
taskTuningConfig.getRecordBufferFullWait(),
taskTuningConfig.getFetchSequenceNumberTimeout(),
taskTuningConfig.getMaxRecordsPerPoll(),
ioConfig.isUseEarliestSequenceNumber(),
spec.getSpec().getTuningConfig().isUseListShards()

View File

@ -80,7 +80,6 @@ public class KinesisSupervisorTuningConfig extends KinesisIndexTaskTuningConfig
null,
null,
null,
null,
null
);
}
@ -110,7 +109,6 @@ public class KinesisSupervisorTuningConfig extends KinesisIndexTaskTuningConfig
@JsonProperty("recordBufferSize") Integer recordBufferSize,
@JsonProperty("recordBufferOfferTimeout") Integer recordBufferOfferTimeout,
@JsonProperty("recordBufferFullWait") Integer recordBufferFullWait,
@JsonProperty("fetchSequenceNumberTimeout") Integer fetchSequenceNumberTimeout,
@JsonProperty("fetchThreads") Integer fetchThreads,
@JsonProperty("logParseExceptions") @Nullable Boolean logParseExceptions,
@JsonProperty("maxParseExceptions") @Nullable Integer maxParseExceptions,
@ -142,7 +140,6 @@ public class KinesisSupervisorTuningConfig extends KinesisIndexTaskTuningConfig
recordBufferSize,
recordBufferOfferTimeout,
recordBufferFullWait,
fetchSequenceNumberTimeout,
fetchThreads,
segmentWriteOutMediumFactory,
logParseExceptions,
@ -257,7 +254,6 @@ public class KinesisSupervisorTuningConfig extends KinesisIndexTaskTuningConfig
", recordBufferSize=" + getRecordBufferSize() +
", recordBufferOfferTimeout=" + getRecordBufferOfferTimeout() +
", recordBufferFullWait=" + getRecordBufferFullWait() +
", fetchSequenceNumberTimeout=" + getFetchSequenceNumberTimeout() +
", fetchThreads=" + getFetchThreads() +
", segmentWriteOutMediumFactory=" + getSegmentWriteOutMediumFactory() +
", logParseExceptions=" + isLogParseExceptions() +
@ -293,7 +289,6 @@ public class KinesisSupervisorTuningConfig extends KinesisIndexTaskTuningConfig
getRecordBufferSize(),
getRecordBufferOfferTimeout(),
getRecordBufferFullWait(),
getFetchSequenceNumberTimeout(),
getFetchThreads(),
getSegmentWriteOutMediumFactory(),
isLogParseExceptions(),

View File

@ -72,7 +72,6 @@ public class KinesisIndexTaskSerdeTest
null,
null,
null,
null,
null
);
private static final KinesisIndexTaskIOConfig IO_CONFIG = new KinesisIndexTaskIOConfig(

View File

@ -2941,7 +2941,6 @@ public class KinesisIndexTaskTest extends SeekableStreamIndexTaskTestBase
null,
null,
null,
null,
logParseExceptions,
maxParseExceptions,
maxSavedParseExceptions,

View File

@ -79,7 +79,6 @@ public class KinesisIndexTaskTuningConfigTest
Assert.assertEquals(10000, config.getRecordBufferSize());
Assert.assertEquals(5000, config.getRecordBufferOfferTimeout());
Assert.assertEquals(5000, config.getRecordBufferFullWait());
Assert.assertEquals(20000, config.getFetchSequenceNumberTimeout());
Assert.assertNull(config.getFetchThreads());
Assert.assertFalse(config.isSkipSequenceNumberAvailabilityCheck());
Assert.assertFalse(config.isResetOffsetAutomatically());
@ -100,7 +99,6 @@ public class KinesisIndexTaskTuningConfigTest
+ " \"recordBufferSize\": 1000,\n"
+ " \"recordBufferOfferTimeout\": 500,\n"
+ " \"recordBufferFullWait\": 500,\n"
+ " \"fetchSequenceNumberTimeout\": 6000,\n"
+ " \"resetOffsetAutomatically\": false,\n"
+ " \"skipSequenceNumberAvailabilityCheck\": true,\n"
+ " \"fetchThreads\": 2,\n"
@ -128,7 +126,6 @@ public class KinesisIndexTaskTuningConfigTest
Assert.assertEquals(1000, config.getRecordBufferSize());
Assert.assertEquals(500, config.getRecordBufferOfferTimeout());
Assert.assertEquals(500, config.getRecordBufferFullWait());
Assert.assertEquals(6000, config.getFetchSequenceNumberTimeout());
Assert.assertEquals(2, (int) config.getFetchThreads());
Assert.assertTrue(config.isSkipSequenceNumberAvailabilityCheck());
Assert.assertFalse(config.isResetOffsetAutomatically());
@ -156,7 +153,6 @@ public class KinesisIndexTaskTuningConfigTest
1000,
1000,
500,
null,
42,
null,
false,
@ -216,7 +212,6 @@ public class KinesisIndexTaskTuningConfigTest
1000,
1000,
500,
null,
42,
null,
false,
@ -268,7 +263,6 @@ public class KinesisIndexTaskTuningConfigTest
+ " \"recordBufferSize\": 1000,\n"
+ " \"recordBufferOfferTimeout\": 500,\n"
+ " \"recordBufferFullWait\": 500,\n"
+ " \"fetchSequenceNumberTimeout\": 6000,\n"
+ " \"resetOffsetAutomatically\": true,\n"
+ " \"skipSequenceNumberAvailabilityCheck\": true,\n"
+ " \"fetchThreads\": 2\n"
@ -309,7 +303,6 @@ public class KinesisIndexTaskTuningConfigTest
1000,
500,
500,
6000,
2,
null,
null,
@ -337,7 +330,6 @@ public class KinesisIndexTaskTuningConfigTest
Assert.assertEquals(1000, copy.getRecordBufferSize());
Assert.assertEquals(500, copy.getRecordBufferOfferTimeout());
Assert.assertEquals(500, copy.getRecordBufferFullWait());
Assert.assertEquals(6000, copy.getFetchSequenceNumberTimeout());
Assert.assertEquals(2, (int) copy.getFetchThreads());
Assert.assertFalse(copy.isSkipSequenceNumberAvailabilityCheck());
Assert.assertTrue(copy.isResetOffsetAutomatically());

View File

@ -64,6 +64,8 @@ import java.util.stream.Collectors;
import static org.apache.druid.indexing.kinesis.KinesisSequenceNumber.END_OF_SHARD_MARKER;
import static org.apache.druid.indexing.kinesis.KinesisSequenceNumber.EXPIRED_MARKER;
import static org.apache.druid.indexing.kinesis.KinesisSequenceNumber.NO_END_SEQUENCE_NUMBER;
import static org.apache.druid.indexing.kinesis.KinesisSequenceNumber.UNREAD_LATEST;
import static org.apache.druid.indexing.kinesis.KinesisSequenceNumber.UNREAD_TRIM_HORIZON;
public class KinesisRecordSupplierTest extends EasyMockSupport
{
@ -225,7 +227,6 @@ public class KinesisRecordSupplierTest extends EasyMockSupport
100,
5000,
5000,
60000,
5,
true,
false
@ -285,7 +286,6 @@ public class KinesisRecordSupplierTest extends EasyMockSupport
100,
5000,
5000,
60000,
5,
true,
true
@ -380,7 +380,6 @@ public class KinesisRecordSupplierTest extends EasyMockSupport
100,
5000,
5000,
60000,
100,
true,
false
@ -469,7 +468,6 @@ public class KinesisRecordSupplierTest extends EasyMockSupport
100,
5000,
5000,
60000,
100,
true,
false
@ -531,7 +529,6 @@ public class KinesisRecordSupplierTest extends EasyMockSupport
100,
5000,
5000,
60000,
100,
true,
false
@ -611,7 +608,6 @@ public class KinesisRecordSupplierTest extends EasyMockSupport
100,
5000,
5000,
60000,
100,
true,
false
@ -680,7 +676,6 @@ public class KinesisRecordSupplierTest extends EasyMockSupport
100,
5000,
5000,
60000,
100,
true,
false
@ -716,7 +711,6 @@ public class KinesisRecordSupplierTest extends EasyMockSupport
100,
5000,
5000,
60000,
5,
true,
false
@ -780,7 +774,6 @@ public class KinesisRecordSupplierTest extends EasyMockSupport
100,
5000,
5000,
60000,
1,
true,
false
@ -874,7 +867,6 @@ public class KinesisRecordSupplierTest extends EasyMockSupport
100,
5000,
5000,
60000,
100,
true,
false
@ -899,20 +891,22 @@ public class KinesisRecordSupplierTest extends EasyMockSupport
}
@Test
public void getLatestSequenceNumberWhenShardIsEmptyShouldReturnsNull()
public void getLatestSequenceNumberWhenShardIsEmptyShouldReturnUnreadToken()
{
KinesisRecordSupplier recordSupplier = getSequenceNumberWhenShardIsEmptyShouldReturnsNullHelper();
Assert.assertNull(recordSupplier.getLatestSequenceNumber(StreamPartition.of(STREAM, SHARD_ID0)));
KinesisRecordSupplier recordSupplier = getSequenceNumberWhenNoRecordsHelperForOpenShard();
Assert.assertEquals(KinesisSequenceNumber.UNREAD_LATEST,
recordSupplier.getLatestSequenceNumber(StreamPartition.of(STREAM, SHARD_ID0)));
verifyAll();
}
@Test
public void getEarliestSequenceNumberWhenShardIsEmptyShouldReturnsNull()
public void getEarliestSequenceNumberWhenShardIsEmptyShouldReturnUnreadToken()
{
KinesisRecordSupplier recordSupplier = getSequenceNumberWhenShardIsEmptyShouldReturnsNullHelper();
Assert.assertNull(recordSupplier.getEarliestSequenceNumber(StreamPartition.of(STREAM, SHARD_ID0)));
KinesisRecordSupplier recordSupplier = getSequenceNumberWhenNoRecordsHelperForOpenShard();
Assert.assertEquals(KinesisSequenceNumber.UNREAD_TRIM_HORIZON,
recordSupplier.getEarliestSequenceNumber(StreamPartition.of(STREAM, SHARD_ID0)));
verifyAll();
}
@ -949,7 +943,6 @@ public class KinesisRecordSupplierTest extends EasyMockSupport
100,
5000,
5000,
1000,
100,
true,
false
@ -958,24 +951,23 @@ public class KinesisRecordSupplierTest extends EasyMockSupport
Assert.assertEquals("0", recordSupplier.getLatestSequenceNumber(StreamPartition.of(STREAM, SHARD_ID0)));
}
private KinesisRecordSupplier getSequenceNumberWhenShardIsEmptyShouldReturnsNullHelper()
private KinesisRecordSupplier getSequenceNumberWhenNoRecordsHelperForOpenShard()
{
EasyMock.expect(kinesis.getShardIterator(
EasyMock.eq(STREAM),
EasyMock.eq(SHARD_ID0),
EasyMock.anyString()
)).andReturn(
getShardIteratorResult0).anyTimes();
getShardIteratorResult0).times(1);
EasyMock.expect(getShardIteratorResult0.getShardIterator()).andReturn(SHARD0_ITERATOR).anyTimes();
EasyMock.expect(getShardIteratorResult0.getShardIterator()).andReturn(SHARD0_ITERATOR).times(1);
EasyMock.expect(kinesis.getRecords(generateGetRecordsReq(SHARD0_ITERATOR, 1000)))
.andReturn(getRecordsResult0)
.times(1, Integer.MAX_VALUE);
.times(1);
EasyMock.expect(getRecordsResult0.getRecords()).andReturn(Collections.emptyList()).times(1, Integer.MAX_VALUE);
EasyMock.expect(getRecordsResult0.getNextShardIterator()).andReturn(SHARD0_ITERATOR).times(1, Integer.MAX_VALUE);
EasyMock.expect(getRecordsResult0.getMillisBehindLatest()).andReturn(0L).once();
EasyMock.expect(getRecordsResult0.getRecords()).andReturn(Collections.emptyList()).times(1);
EasyMock.expect(getRecordsResult0.getNextShardIterator()).andReturn(SHARD0_ITERATOR).times(1);
replayAll();
@ -988,7 +980,6 @@ public class KinesisRecordSupplierTest extends EasyMockSupport
100,
5000,
5000,
1000,
100,
true,
false
@ -1072,7 +1063,6 @@ public class KinesisRecordSupplierTest extends EasyMockSupport
100,
5000,
5000,
60000,
100,
true,
false
@ -1100,12 +1090,19 @@ public class KinesisRecordSupplierTest extends EasyMockSupport
Assert.assertEquals(SHARDS_LAG_MILLIS_EMPTY, independentTimeLag);
// Verify that kinesis apis are not called for custom sequence numbers
for (String sequenceNum : Arrays.asList(NO_END_SEQUENCE_NUMBER, END_OF_SHARD_MARKER, EXPIRED_MARKER)) {
for (String sequenceNum : Arrays.asList(NO_END_SEQUENCE_NUMBER, END_OF_SHARD_MARKER, EXPIRED_MARKER,
UNREAD_LATEST, UNREAD_TRIM_HORIZON)) {
offsets = ImmutableMap.of(
SHARD_ID1, sequenceNum,
SHARD_ID0, sequenceNum
);
Assert.assertEquals(Collections.emptyMap(), recordSupplier.getPartitionsTimeLag(STREAM, offsets));
Map<String, Long> zeroOffsets = ImmutableMap.of(
SHARD_ID1, 0L,
SHARD_ID0, 0L
);
Assert.assertEquals(zeroOffsets, recordSupplier.getPartitionsTimeLag(STREAM, offsets));
}
verifyAll();
}
@ -1122,7 +1119,6 @@ public class KinesisRecordSupplierTest extends EasyMockSupport
100,
5000,
5000,
60000,
5,
true,
false
@ -1150,16 +1146,80 @@ public class KinesisRecordSupplierTest extends EasyMockSupport
Assert.assertFalse(target.isClosedShardEmpty(STREAM, shardWithRecordsAndNonNullNextIterator));
}
private void setupMockKinesisForShardId(AmazonKinesis kinesis, String shardId,
List<Record> expectedRecords, String expectedNextIterator)
@Test
public void testIsOffsetAvailable()
{
String shardIteratorType = ShardIteratorType.TRIM_HORIZON.toString();
AmazonKinesis mockKinesis = EasyMock.mock(AmazonKinesis.class);
KinesisRecordSupplier target = new KinesisRecordSupplier(mockKinesis,
recordsPerFetch,
0,
2,
false,
100,
5000,
5000,
5,
true,
false
);
StreamPartition<String> partition = new StreamPartition<>(STREAM, SHARD_ID0);
setupMockKinesisForShardId(mockKinesis, SHARD_ID0, null,
ShardIteratorType.AT_SEQUENCE_NUMBER, "-1",
Collections.emptyList(), "whatever");
Record record0 = new Record().withSequenceNumber("5");
setupMockKinesisForShardId(mockKinesis, SHARD_ID0, null,
ShardIteratorType.AT_SEQUENCE_NUMBER, "0",
Collections.singletonList(record0), "whatever");
Record record10 = new Record().withSequenceNumber("10");
setupMockKinesisForShardId(mockKinesis, SHARD_ID0, null,
ShardIteratorType.AT_SEQUENCE_NUMBER, "10",
Collections.singletonList(record10), "whatever");
EasyMock.replay(mockKinesis);
Assert.assertTrue(target.isOffsetAvailable(partition, KinesisSequenceNumber.of(UNREAD_TRIM_HORIZON)));
Assert.assertFalse(target.isOffsetAvailable(partition, KinesisSequenceNumber.of(END_OF_SHARD_MARKER)));
Assert.assertFalse(target.isOffsetAvailable(partition, KinesisSequenceNumber.of("-1")));
Assert.assertFalse(target.isOffsetAvailable(partition, KinesisSequenceNumber.of("0")));
Assert.assertTrue(target.isOffsetAvailable(partition, KinesisSequenceNumber.of("10")));
}
private void setupMockKinesisForShardId(AmazonKinesis kinesis, String shardId,
List<Record> records, String nextIterator)
{
setupMockKinesisForShardId(kinesis, shardId, 1, ShardIteratorType.TRIM_HORIZON, null, records, nextIterator);
}
private void setupMockKinesisForShardId(AmazonKinesis kinesis, String shardId, Integer limit,
ShardIteratorType iteratorType, String sequenceNumber,
List<Record> records, String nextIterator)
{
String shardIteratorType = iteratorType.toString();
String shardIterator = "shardIterator" + shardId;
if (sequenceNumber != null) {
shardIterator += sequenceNumber;
}
GetShardIteratorResult shardIteratorResult = new GetShardIteratorResult().withShardIterator(shardIterator);
EasyMock.expect(kinesis.getShardIterator(STREAM, shardId, shardIteratorType)).andReturn(shardIteratorResult).once();
GetRecordsRequest request = new GetRecordsRequest().withShardIterator(shardIterator).withLimit(1);
GetRecordsResult result = new GetRecordsResult().withRecords(expectedRecords)
.withNextShardIterator(expectedNextIterator);
if (sequenceNumber == null) {
EasyMock.expect(kinesis.getShardIterator(STREAM, shardId, shardIteratorType))
.andReturn(shardIteratorResult)
.once();
} else {
EasyMock.expect(kinesis.getShardIterator(STREAM, shardId, shardIteratorType, sequenceNumber))
.andReturn(shardIteratorResult)
.once();
}
GetRecordsRequest request = new GetRecordsRequest().withShardIterator(shardIterator)
.withLimit(limit);
GetRecordsResult result = new GetRecordsResult().withRecords(records)
.withNextShardIterator(nextIterator);
EasyMock.expect(kinesis.getRecords(request)).andReturn(result);
}
}

View File

@ -199,7 +199,6 @@ public class KinesisSupervisorTest extends EasyMockSupport
null,
null,
null,
5000,
null,
null,
null,
@ -831,6 +830,9 @@ public class KinesisSupervisorTest extends EasyMockSupport
EasyMock.expect(supervisorRecordSupplier.getLatestSequenceNumber(EasyMock.anyObject())).andReturn("100").anyTimes();
supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString());
EasyMock.expectLastCall().anyTimes();
EasyMock.expect(supervisorRecordSupplier.isOffsetAvailable(EasyMock.anyObject(), EasyMock.anyObject()))
.andReturn(true)
.anyTimes();
Capture<KinesisIndexTask> captured = Capture.newInstance();
EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
@ -885,6 +887,15 @@ public class KinesisSupervisorTest extends EasyMockSupport
EasyMock.expect(supervisorRecordSupplier.getLatestSequenceNumber(EasyMock.anyObject())).andReturn("100").anyTimes();
supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString());
EasyMock.expectLastCall().anyTimes();
// Any sequence number greater than or equal to 0 must be available
EasyMock.expect(supervisorRecordSupplier.isOffsetAvailable(EasyMock.anyObject(),
EasyMock.eq(KinesisSequenceNumber.of("101"))))
.andReturn(true)
.anyTimes();
EasyMock.expect(supervisorRecordSupplier.isOffsetAvailable(EasyMock.anyObject(),
EasyMock.eq(KinesisSequenceNumber.of("-1"))))
.andReturn(false)
.anyTimes();
EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.absent()).anyTimes();
@ -2758,6 +2769,19 @@ public class KinesisSupervisorTest extends EasyMockSupport
EasyMock.expect(supervisorRecordSupplier.getLatestSequenceNumber(EasyMock.anyObject())).andReturn("400").anyTimes();
supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString());
EasyMock.expectLastCall().anyTimes();
// Only sequence numbers >= 300 are available
EasyMock.expect(supervisorRecordSupplier.isOffsetAvailable(EasyMock.anyObject(),
EasyMock.eq(KinesisSequenceNumber.of("400"))))
.andReturn(true)
.anyTimes();
EasyMock.expect(supervisorRecordSupplier.isOffsetAvailable(EasyMock.anyObject(),
EasyMock.eq(KinesisSequenceNumber.of("200"))))
.andReturn(false)
.anyTimes();
EasyMock.expect(supervisorRecordSupplier.isOffsetAvailable(EasyMock.anyObject(),
EasyMock.eq(KinesisSequenceNumber.of("100"))))
.andReturn(false)
.anyTimes();
EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of()).anyTimes();
EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
@ -3938,7 +3962,6 @@ public class KinesisSupervisorTest extends EasyMockSupport
null,
null,
null,
5000,
null,
null,
null,
@ -4073,6 +4096,9 @@ public class KinesisSupervisorTest extends EasyMockSupport
EasyMock.expect(supervisorRecordSupplier.getLatestSequenceNumber(EasyMock.anyObject())).andReturn("100").anyTimes();
supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString());
EasyMock.expectLastCall().anyTimes();
EasyMock.expect(supervisorRecordSupplier.isOffsetAvailable(EasyMock.anyObject(), EasyMock.anyObject()))
.andReturn(true)
.anyTimes();
Capture<Task> captured = Capture.newInstance(CaptureType.ALL);
EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
@ -4356,6 +4382,9 @@ public class KinesisSupervisorTest extends EasyMockSupport
supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString());
EasyMock.expectLastCall().anyTimes();
EasyMock.expect(supervisorRecordSupplier.isOffsetAvailable(EasyMock.anyObject(), EasyMock.anyObject()))
.andReturn(true)
.anyTimes();
Capture<Task> postSplitCaptured = Capture.newInstance(CaptureType.ALL);
@ -4789,6 +4818,9 @@ public class KinesisSupervisorTest extends EasyMockSupport
EasyMock.expect(supervisorRecordSupplier.getEarliestSequenceNumber(EasyMock.anyObject())).andReturn("0").anyTimes();
EasyMock.expect(supervisorRecordSupplier.getLatestSequenceNumber(new StreamPartition<>(STREAM, SHARD_ID2)))
.andReturn("200").anyTimes();
EasyMock.expect(supervisorRecordSupplier.isOffsetAvailable(EasyMock.anyObject(), EasyMock.anyObject()))
.andReturn(true)
.anyTimes();
supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString());
EasyMock.expectLastCall().anyTimes();
@ -5043,7 +5075,6 @@ public class KinesisSupervisorTest extends EasyMockSupport
null,
null,
null,
5000,
null,
null,
null,

View File

@ -56,7 +56,6 @@ public class TestModifiedKinesisIndexTaskTuningConfig extends KinesisIndexTaskTu
@JsonProperty("recordBufferSize") Integer recordBufferSize,
@JsonProperty("recordBufferOfferTimeout") Integer recordBufferOfferTimeout,
@JsonProperty("recordBufferFullWait") Integer recordBufferFullWait,
@JsonProperty("fetchSequenceNumberTimeout") Integer fetchSequenceNumberTimeout,
@JsonProperty("fetchThreads") Integer fetchThreads,
@JsonProperty("segmentWriteOutMediumFactory") @Nullable SegmentWriteOutMediumFactory segmentWriteOutMediumFactory,
@JsonProperty("logParseExceptions") @Nullable Boolean logParseExceptions,
@ -86,7 +85,6 @@ public class TestModifiedKinesisIndexTaskTuningConfig extends KinesisIndexTaskTu
recordBufferSize,
recordBufferOfferTimeout,
recordBufferFullWait,
fetchSequenceNumberTimeout,
fetchThreads,
segmentWriteOutMediumFactory,
logParseExceptions,
@ -119,7 +117,6 @@ public class TestModifiedKinesisIndexTaskTuningConfig extends KinesisIndexTaskTu
base.getRecordBufferSize(),
base.getRecordBufferOfferTimeout(),
base.getRecordBufferFullWait(),
base.getFetchSequenceNumberTimeout(),
base.getFetchThreads(),
base.getSegmentWriteOutMediumFactory(),
base.isLogParseExceptions(),

View File

@ -1253,10 +1253,10 @@ public abstract class SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
final SequenceOffsetType endSequenceNumber
)
{
final int compareToEnd = createSequenceNumber(recordSequenceNumber)
.compareTo(createSequenceNumber(endSequenceNumber));
return isEndOffsetExclusive() ? compareToEnd < 0 : compareToEnd <= 0;
return createSequenceNumber(recordSequenceNumber).isMoreToReadBeforeReadingRecord(
createSequenceNumber(endSequenceNumber),
isEndOffsetExclusive()
);
}
/**

View File

@ -81,4 +81,24 @@ public abstract class OrderedSequenceNumber<SequenceOffsetType>
", isExclusive=" + isExclusive +
'}';
}
public boolean isAvailableWithEarliest(OrderedSequenceNumber<SequenceOffsetType> earliest)
{
return earliest.compareTo(this) <= 0;
}
/**
* Returns true if, given that we want to start reading from this sequence number and stop at the sequence number end,
* there is more left to read. Used in pre-read checks to determine if there is anything left to read.
*
* @param end the end offset of the partition for a given task
* @param isEndOffsetExclusive indicates if the TaskRunner considers the end offsets to be exclusive
* @return true if more records need to be read given that this is the current sequence number
*/
public boolean isMoreToReadBeforeReadingRecord(OrderedSequenceNumber<SequenceOffsetType> end,
boolean isEndOffsetExclusive)
{
final int compareToEnd = this.compareTo(end);
return isEndOffsetExclusive ? compareToEnd < 0 : compareToEnd <= 0;
}
}

View File

@ -105,6 +105,14 @@ public interface RecordSupplier<PartitionIdType, SequenceOffsetType, RecordType
@Nullable
SequenceOffsetType getEarliestSequenceNumber(StreamPartition<PartitionIdType> partition);
/**
* Checks if a provided offset is still available for a given partition in the stream
* @param partition stream partition to check in
* @param offset offset to be checked
* @return availability of offset
*/
boolean isOffsetAvailable(StreamPartition<PartitionIdType> partition,
OrderedSequenceNumber<SequenceOffsetType> offset);
/**
* returns the sequence number of the next record

View File

@ -3916,9 +3916,19 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
@NotNull SequenceOffsetType offsetFromMetadata
)
{
final SequenceOffsetType earliestOffset = getOffsetFromStreamForPartition(partition, true);
return earliestOffset != null
&& makeSequenceNumber(earliestOffset).compareTo(makeSequenceNumber(offsetFromMetadata)) <= 0;
StreamPartition<PartitionIdType> streamPartition = StreamPartition.of(ioConfig.getStream(), partition);
OrderedSequenceNumber<SequenceOffsetType> sequenceNumber = makeSequenceNumber(offsetFromMetadata);
recordSupplierLock.lock();
if (!recordSupplier.getAssignment().contains(streamPartition)) {
// this shouldn't happen, but in case it does...
throw new IllegalStateException("Record supplier does not match current known partitions");
}
try {
return recordSupplier.isOffsetAvailable(streamPartition, sequenceNumber);
}
finally {
recordSupplierLock.unlock();
}
}
protected void emitNoticeProcessTime(String noticeType, long timeInMillis)

View File

@ -44,6 +44,7 @@ import org.apache.druid.data.input.impl.StringInputRowParser;
import org.apache.druid.data.input.impl.TimestampSpec;
import org.apache.druid.indexing.seekablestream.RecordSupplierInputSource;
import org.apache.druid.indexing.seekablestream.common.OrderedPartitionableRecord;
import org.apache.druid.indexing.seekablestream.common.OrderedSequenceNumber;
import org.apache.druid.indexing.seekablestream.common.RecordSupplier;
import org.apache.druid.indexing.seekablestream.common.StreamPartition;
import org.apache.druid.jackson.DefaultObjectMapper;
@ -1601,6 +1602,12 @@ public class InputSourceSamplerTest extends InitializedNullHandlingTest
return null;
}
@Override
public boolean isOffsetAvailable(StreamPartition<Integer> partition, OrderedSequenceNumber<Long> offset)
{
return true;
}
@Override
public Long getPosition(StreamPartition<Integer> partition)
{

View File

@ -32,6 +32,7 @@ import org.apache.druid.data.input.impl.CsvInputFormat;
import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.data.input.impl.TimestampSpec;
import org.apache.druid.indexing.seekablestream.common.OrderedPartitionableRecord;
import org.apache.druid.indexing.seekablestream.common.OrderedSequenceNumber;
import org.apache.druid.indexing.seekablestream.common.RecordSupplier;
import org.apache.druid.indexing.seekablestream.common.StreamPartition;
import org.apache.druid.java.util.common.DateTimes;
@ -214,6 +215,12 @@ public class RecordSupplierInputSourceTest extends InitializedNullHandlingTest
throw new UnsupportedOperationException();
}
@Override
public boolean isOffsetAvailable(StreamPartition<Integer> partition, OrderedSequenceNumber<Integer> offset)
{
return true;
}
@Override
public Integer getPosition(StreamPartition<Integer> partition)
{

View File

@ -111,4 +111,9 @@ public class KinesisEventWriter implements StreamEventWriter
"Waiting for all Kinesis writes to be flushed"
);
}
protected KinesisProducer getKinesisProducer()
{
return kinesisProducer;
}
}

View File

@ -0,0 +1,44 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.testing.utils;
import java.nio.ByteBuffer;
/**
* Writes only to a single shard
*/
public class KinesisSingleShardEventWriter extends KinesisEventWriter
{
public KinesisSingleShardEventWriter(String endpoint, boolean aggregate) throws Exception
{
super(endpoint, aggregate);
}
@Override
public void write(String streamName, byte[] event)
{
getKinesisProducer().addUserRecord(
streamName,
"0",
ByteBuffer.wrap(event)
);
}
}

View File

@ -0,0 +1,45 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.tests.indexer;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.testing.IntegrationTestingConfig;
import org.apache.druid.testing.utils.KinesisSingleShardEventWriter;
import org.apache.druid.testing.utils.StreamEventWriter;
import javax.annotation.Nullable;
public abstract class AbstractKinesisIndexingServiceEmptyShardsTest extends AbstractKinesisIndexingServiceTest
{
private static final Logger LOG = new Logger(AbstractKinesisIndexingServiceEmptyShardsTest.class);
@Override
StreamEventWriter createStreamEventWriter(IntegrationTestingConfig config, @Nullable Boolean transactionEnabled)
throws Exception
{
if (transactionEnabled != null) {
LOG.warn(
"Kinesis event writer doesn't support transaction. Ignoring the given parameter transactionEnabled[%s]",
transactionEnabled
);
}
return new KinesisSingleShardEventWriter(config.getStreamEndpoint(), false);
}
}

View File

@ -0,0 +1,74 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.tests.indexer;
import org.apache.druid.testing.guice.DruidTestModuleFactory;
import org.apache.druid.tests.TestNGGroup;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Guice;
import org.testng.annotations.Test;
/**
* Variant of ITKinesisIndexingSericeSerializedTest where event writer publishes to a single shard
* and there may be empty ones as a result
*/
@Test(groups = TestNGGroup.KINESIS_INDEX)
@Guice(moduleFactory = DruidTestModuleFactory.class)
public class ITKinesisIndexingServiceEmptyShardsSerializedTest extends AbstractKinesisIndexingServiceEmptyShardsTest
{
@Override
public String getTestNamePrefix()
{
return "kinesis_emptyShards_serialized";
}
@BeforeClass
public void beforeClass() throws Exception
{
doBeforeClass();
}
/**
* This test must be run individually due to their resource consumption requirement (task slot, memory, etc.)
*/
@Test
public void testKinesisIndexDataWithStartStopSupervisor() throws Exception
{
doTestIndexDataWithStartStopSupervisor(null);
}
/**
* This test must be run individually due to their resource consumption requirement (task slot, memory, etc.)
*/
@Test
public void testKinesisIndexDataWithKinesisReshardSplit() throws Exception
{
doTestIndexDataWithStreamReshardSplit(null);
}
/**
* This test must be run individually due to their resource consumption requirement (task slot, memory, etc.)
*/
@Test
public void testKinesisIndexDataWithKinesisReshardMerge() throws Exception
{
doTestIndexDataWithStreamReshardMerge();
}
}

View File

@ -1377,7 +1377,6 @@ export interface TuningConfig {
recordBufferSize?: number;
recordBufferOfferTimeout?: number;
recordBufferFullWait?: number;
fetchSequenceNumberTimeout?: number;
fetchThreads?: number;
}
@ -2023,21 +2022,6 @@ const TUNING_FORM_FIELDS: Field<IngestionSpec>[] = [
</>
),
},
{
name: 'spec.tuningConfig.fetchSequenceNumberTimeout',
type: 'number',
defaultValue: 60000,
defined: typeIs('kinesis'),
hideInMore: true,
info: (
<>
Length of time in milliseconds to wait for Kinesis to return the earliest or latest sequence
number for a shard. Kinesis will not return the latest sequence number if no data is
actively being written to that shard. In this case, this fetch call will repeatedly timeout
and retry until fresh data is written to the stream.
</>
),
},
{
name: 'spec.tuningConfig.fetchThreads',
type: 'number',