diff --git a/docs/development/extensions-core/kinesis-ingestion.md b/docs/development/extensions-core/kinesis-ingestion.md index 63d2e9a6a03..a22c33bcfda 100644 --- a/docs/development/extensions-core/kinesis-ingestion.md +++ b/docs/development/extensions-core/kinesis-ingestion.md @@ -300,7 +300,6 @@ The `tuningConfig` is optional. If no `tuningConfig` is specified, default param |`recordBufferSize`|Integer|Size of the buffer (number of events) used between the Kinesis fetch threads and the main ingestion thread.|no (default == 10000)| |`recordBufferOfferTimeout`|Integer|Length of time in milliseconds to wait for space to become available in the buffer before timing out.| no (default == 5000)| |`recordBufferFullWait`|Integer|Length of time in milliseconds to wait for the buffer to drain before attempting to fetch records from Kinesis again.|no (default == 5000)| -|`fetchSequenceNumberTimeout`|Integer|Length of time in milliseconds to wait for Kinesis to return the earliest or latest sequence number for a shard. Kinesis will not return the latest sequence number if no data is actively being written to that shard. In this case, this fetch call will repeatedly timeout and retry until fresh data is written to the stream.|no (default == 60000)| |`fetchThreads`|Integer|Size of the pool of threads fetching data from Kinesis. There is no benefit in having more threads than Kinesis shards.|no (default == procs * 2, where "procs" is the number of processors on the server that the task is running on) | |`segmentWriteOutMediumFactory`|Object|Segment write-out medium to use when creating segments. See below for more information.|no (not specified by default, the value from `druid.peon.defaultSegmentWriteOutMediumFactory.type` is used)| |`intermediateHandoffPeriod`|ISO8601 Period|How often the tasks should hand off segments. Handoff will happen either if `maxRowsPerSegment` or `maxTotalRows` is hit or every `intermediateHandoffPeriod`, whichever happens earlier.| no (default == P2147483647D)| diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaRecordSupplier.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaRecordSupplier.java index fe32ffe5ffb..d64c0a64c5a 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaRecordSupplier.java +++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaRecordSupplier.java @@ -26,6 +26,7 @@ import org.apache.druid.common.utils.IdUtils; import org.apache.druid.data.input.kafka.KafkaRecordEntity; import org.apache.druid.indexing.kafka.supervisor.KafkaSupervisorIOConfig; import org.apache.druid.indexing.seekablestream.common.OrderedPartitionableRecord; +import org.apache.druid.indexing.seekablestream.common.OrderedSequenceNumber; import org.apache.druid.indexing.seekablestream.common.RecordSupplier; import org.apache.druid.indexing.seekablestream.common.StreamException; import org.apache.druid.indexing.seekablestream.common.StreamPartition; @@ -157,6 +158,14 @@ public class KafkaRecordSupplier implements RecordSupplier partition, OrderedSequenceNumber offset) + { + final Long earliestOffset = getEarliestSequenceNumber(partition); + return earliestOffset != null + && offset.isAvailableWithEarliest(KafkaSequenceNumber.of(earliestOffset)); + } + @Override public Long getPosition(StreamPartition partition) { diff --git a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTask.java b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTask.java index 48f5d26e243..a0b31f3116f 100644 --- a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTask.java +++ b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTask.java @@ -106,7 +106,6 @@ public class KinesisIndexTask extends SeekableStreamIndexTask currOffsets = getCurrentOffsets(); for (final StreamPartition streamPartition : assignment) { String sequence = currOffsets.get(streamPartition.getPartitionId()); - String earliestSequenceNumber = recordSupplier.getEarliestSequenceNumber(streamPartition); - if (earliestSequenceNumber == null - || createSequenceNumber(earliestSequenceNumber).compareTo(createSequenceNumber(sequence)) > 0) { + if (!recordSupplier.isOffsetAvailable(streamPartition, KinesisSequenceNumber.of(sequence))) { if (task.getTuningConfig().isResetOffsetAutomatically()) { log.info("Attempting to reset sequences automatically for all partitions"); try { @@ -144,10 +142,9 @@ public class KinesisIndexTaskRunner extends SeekableStreamIndexTaskRunner records = RetryUtils.retry( + () -> kinesis.getRecords(getRecordsRequest) + .getRecords(), + (throwable) -> { + if (throwable instanceof ProvisionedThroughputExceededException) { + log.warn( + throwable, + "encountered ProvisionedThroughputExceededException while fetching records, this means " + + "that the request rate for the stream is too high, or the requested data is too large for " + + "the available throughput. Reduce the frequency or size of your requests. Consider increasing " + + "the number of shards to increase throughput." + ); + return true; + } + if (throwable instanceof AmazonClientException) { + AmazonClientException ase = (AmazonClientException) throwable; + return AWSClientUtil.isClientExceptionRecoverable(ase); + } + return false; + }, + GET_SEQUENCE_NUMBER_RETRY_COUNT + ); + return !records.isEmpty() && records.get(0).getSequenceNumber().equals(kinesisSequence.get()); + }); + } + public Set getShards(String stream) { if (useListShards) { @@ -743,11 +816,12 @@ public class KinesisRecordSupplier implements RecordSupplier partitionLag = Maps.newHashMapWithExpectedSize(currentOffsets.size()); for (Map.Entry partitionOffset : currentOffsets.entrySet()) { + StreamPartition partition = new StreamPartition<>(stream, partitionOffset.getKey()); + long currentLag = 0L; if (KinesisSequenceNumber.isValidAWSKinesisSequence(partitionOffset.getValue())) { - StreamPartition partition = new StreamPartition<>(stream, partitionOffset.getKey()); - long currentLag = getPartitionTimeLag(partition, partitionOffset.getValue()); - partitionLag.put(partitionOffset.getKey(), currentLag); + currentLag = getPartitionTimeLag(partition, partitionOffset.getValue()); } + partitionLag.put(partitionOffset.getKey(), currentLag); } return partitionLag; } @@ -832,6 +906,11 @@ public class KinesisRecordSupplier implements RecordSupplier partition, ShardIteratorType iteratorEnum) @@ -840,62 +919,60 @@ public class KinesisRecordSupplier implements RecordSupplier kinesis.getRecords(request), + (throwable) -> { + if (throwable instanceof ProvisionedThroughputExceededException) { + log.warn( + throwable, + "encountered ProvisionedThroughputExceededException while fetching records, this means " + + "that the request rate for the stream is too high, or the requested data is too large for " + + "the available throughput. Reduce the frequency or size of your requests. Consider increasing " + + "the number of shards to increase throughput." + ); + return true; + } + if (throwable instanceof AmazonClientException) { + AmazonClientException ase = (AmazonClientException) throwable; + return AWSClientUtil.isClientExceptionRecoverable(ase); + } + return false; + }, + GET_SEQUENCE_NUMBER_RETRY_COUNT + ); - if (closed) { - log.info("KinesisRecordSupplier closed while fetching sequenceNumber"); - return null; - } - final String currentShardIterator = shardIterator; - final GetRecordsRequest request = new GetRecordsRequest().withShardIterator(currentShardIterator) - .withLimit(GET_SEQUENCE_NUMBER_RECORD_COUNT); - recordsResult = RetryUtils.retry( - () -> kinesis.getRecords(request), - (throwable) -> { - if (throwable instanceof ProvisionedThroughputExceededException) { - log.warn( - throwable, - "encountered ProvisionedThroughputExceededException while fetching records, this means " - + "that the request rate for the stream is too high, or the requested data is too large for " - + "the available throughput. Reduce the frequency or size of your requests. Consider increasing " - + "the number of shards to increase throughput." - ); - return true; - } - if (throwable instanceof AmazonClientException) { - AmazonClientException ase = (AmazonClientException) throwable; - return AWSClientUtil.isClientExceptionRecoverable(ase); - } - return false; - }, - GET_SEQUENCE_NUMBER_RETRY_COUNT - ); + List records = recordsResult.getRecords(); - List records = recordsResult.getRecords(); - - if (!records.isEmpty()) { - return records.get(0).getSequenceNumber(); - } - - shardIterator = recordsResult.getNextShardIterator(); + if (!records.isEmpty()) { + return records.get(0).getSequenceNumber(); } - if (shardIterator == null) { - log.info("Partition[%s] returned a null shard iterator, is the shard closed?", partition.getPartitionId()); + if (recordsResult.getNextShardIterator() == null) { + log.info("Partition[%s] is closed and empty", partition.getPartitionId()); return KinesisSequenceNumber.END_OF_SHARD_MARKER; } + if (iteratorEnum.equals(ShardIteratorType.LATEST)) { + log.info("Partition[%s] has no records at LATEST offset", partition.getPartitionId()); + return KinesisSequenceNumber.UNREAD_LATEST; + } - // if we reach here, it usually means either the shard has no more records, or records have not been - // added to this shard - log.warn( - "timed out while trying to fetch position for shard[%s], millisBehindLatest is [%s], likely no more records in shard", - partition.getPartitionId(), - recordsResult != null ? recordsResult.getMillisBehindLatest() : "UNKNOWN" - ); + // Even if there are records in the shard, they may not be returned on the first call to getRecords with TRIM_HORIZON + // Reference: https://docs.aws.amazon.com/streams/latest/dev/troubleshooting-consumers.html + // Section: GetRecords Returns Empty Records Array Even When There is Data in the Stream + if (iteratorEnum.equals(ShardIteratorType.TRIM_HORIZON)) { + log.info("Partition[%s] has no records at TRIM_HORIZON offset", partition.getPartitionId()); + return KinesisSequenceNumber.UNREAD_TRIM_HORIZON; + } + + log.warn("Could not fetch sequence number for Partition[%s]", partition.getPartitionId()); return null; }); } diff --git a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisSamplerSpec.java b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisSamplerSpec.java index 278256553c0..b4a84ef1056 100644 --- a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisSamplerSpec.java +++ b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisSamplerSpec.java @@ -70,7 +70,6 @@ public class KinesisSamplerSpec extends SeekableStreamSamplerSpec tuningConfig.getRecordBufferSize(), tuningConfig.getRecordBufferOfferTimeout(), tuningConfig.getRecordBufferFullWait(), - tuningConfig.getFetchSequenceNumberTimeout(), tuningConfig.getMaxRecordsPerPoll(), ioConfig.isUseEarliestSequenceNumber(), tuningConfig.isUseListShards() diff --git a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisSequenceNumber.java b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisSequenceNumber.java index 46b6b7385b3..ce5025238e6 100644 --- a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisSequenceNumber.java +++ b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisSequenceNumber.java @@ -51,6 +51,16 @@ public class KinesisSequenceNumber extends OrderedSequenceNumber */ public static final String EXPIRED_MARKER = "EXPIRED"; + /** + * Indicates that records have not been read from a shard which needs to be processed from sequence type: TRIM_HORIZON + */ + public static final String UNREAD_TRIM_HORIZON = "UNREAD_TRIM_HORIZON"; + + /** + * Indicates that records have not been read from a shard which needs to be processed from sequence type: LATEST + */ + public static final String UNREAD_LATEST = "UNREAD_LATEST"; + /** * this flag is used to indicate either END_OF_SHARD_MARKER, NO_END_SEQUENCE_NUMBER * or EXPIRED_MARKER so that they can be properly compared @@ -62,14 +72,12 @@ public class KinesisSequenceNumber extends OrderedSequenceNumber private KinesisSequenceNumber(String sequenceNumber, boolean isExclusive) { super(sequenceNumber, isExclusive); - if (END_OF_SHARD_MARKER.equals(sequenceNumber) - || NO_END_SEQUENCE_NUMBER.equals(sequenceNumber) - || EXPIRED_MARKER.equals(sequenceNumber)) { - isMaxSequenceNumber = true; + if (!isValidAWSKinesisSequence(sequenceNumber)) { + isMaxSequenceNumber = !isUnreadSequence(sequenceNumber); this.intSequence = null; } else { - isMaxSequenceNumber = false; this.intSequence = new BigInteger(sequenceNumber); + this.isMaxSequenceNumber = false; } } @@ -93,6 +101,8 @@ public class KinesisSequenceNumber extends OrderedSequenceNumber return !(END_OF_SHARD_MARKER.equals(sequenceNumber) || NO_END_SEQUENCE_NUMBER.equals(sequenceNumber) || EXPIRED_MARKER.equals(sequenceNumber) + || UNREAD_TRIM_HORIZON.equals(sequenceNumber) + || UNREAD_LATEST.equals(sequenceNumber) ); } @@ -100,14 +110,50 @@ public class KinesisSequenceNumber extends OrderedSequenceNumber public int compareTo(OrderedSequenceNumber o) { KinesisSequenceNumber num = (KinesisSequenceNumber) o; + if (isUnread() && num.isUnread()) { + return 0; + } else if (isUnread()) { + return -1; + } else if (num.isUnread()) { + return 1; + } if (isMaxSequenceNumber && num.isMaxSequenceNumber) { return 0; } else if (isMaxSequenceNumber) { return 1; } else if (num.isMaxSequenceNumber) { return -1; - } else { - return this.intSequence.compareTo(new BigInteger(o.get())); } + return this.intSequence.compareTo(new BigInteger(o.get())); + } + + @Override + public boolean isAvailableWithEarliest(OrderedSequenceNumber earliest) + { + if (isUnread()) { + return true; + } + return super.isAvailableWithEarliest(earliest); + } + + @Override + public boolean isMoreToReadBeforeReadingRecord(OrderedSequenceNumber end, boolean isEndOffsetExclusive) + { + // Kinesis sequence number checks are exclusive for AWS numeric sequences + // However, If a record is UNREAD and the end offset is finalized to be UNREAD, we have caught up. (inclusive) + if (isUnreadSequence(end.get())) { + return false; + } + return super.isMoreToReadBeforeReadingRecord(end, isEndOffsetExclusive); + } + + public boolean isUnread() + { + return isUnreadSequence(get()); + } + + private boolean isUnreadSequence(String sequence) + { + return UNREAD_TRIM_HORIZON.equals(sequence) || UNREAD_LATEST.equals(sequence); } } diff --git a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java index e8ee5a46c2d..e506893034b 100644 --- a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java +++ b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java @@ -212,7 +212,6 @@ public class KinesisSupervisor extends SeekableStreamSupervisor zeroOffsets = ImmutableMap.of( + SHARD_ID1, 0L, + SHARD_ID0, 0L + ); + + Assert.assertEquals(zeroOffsets, recordSupplier.getPartitionsTimeLag(STREAM, offsets)); } verifyAll(); } @@ -1122,7 +1119,6 @@ public class KinesisRecordSupplierTest extends EasyMockSupport 100, 5000, 5000, - 60000, 5, true, false @@ -1150,16 +1146,80 @@ public class KinesisRecordSupplierTest extends EasyMockSupport Assert.assertFalse(target.isClosedShardEmpty(STREAM, shardWithRecordsAndNonNullNextIterator)); } - private void setupMockKinesisForShardId(AmazonKinesis kinesis, String shardId, - List expectedRecords, String expectedNextIterator) + @Test + public void testIsOffsetAvailable() { - String shardIteratorType = ShardIteratorType.TRIM_HORIZON.toString(); + AmazonKinesis mockKinesis = EasyMock.mock(AmazonKinesis.class); + KinesisRecordSupplier target = new KinesisRecordSupplier(mockKinesis, + recordsPerFetch, + 0, + 2, + false, + 100, + 5000, + 5000, + 5, + true, + false + ); + StreamPartition partition = new StreamPartition<>(STREAM, SHARD_ID0); + + setupMockKinesisForShardId(mockKinesis, SHARD_ID0, null, + ShardIteratorType.AT_SEQUENCE_NUMBER, "-1", + Collections.emptyList(), "whatever"); + + Record record0 = new Record().withSequenceNumber("5"); + setupMockKinesisForShardId(mockKinesis, SHARD_ID0, null, + ShardIteratorType.AT_SEQUENCE_NUMBER, "0", + Collections.singletonList(record0), "whatever"); + + Record record10 = new Record().withSequenceNumber("10"); + setupMockKinesisForShardId(mockKinesis, SHARD_ID0, null, + ShardIteratorType.AT_SEQUENCE_NUMBER, "10", + Collections.singletonList(record10), "whatever"); + + EasyMock.replay(mockKinesis); + + Assert.assertTrue(target.isOffsetAvailable(partition, KinesisSequenceNumber.of(UNREAD_TRIM_HORIZON))); + + Assert.assertFalse(target.isOffsetAvailable(partition, KinesisSequenceNumber.of(END_OF_SHARD_MARKER))); + + Assert.assertFalse(target.isOffsetAvailable(partition, KinesisSequenceNumber.of("-1"))); + + Assert.assertFalse(target.isOffsetAvailable(partition, KinesisSequenceNumber.of("0"))); + + Assert.assertTrue(target.isOffsetAvailable(partition, KinesisSequenceNumber.of("10"))); + } + + private void setupMockKinesisForShardId(AmazonKinesis kinesis, String shardId, + List records, String nextIterator) + { + setupMockKinesisForShardId(kinesis, shardId, 1, ShardIteratorType.TRIM_HORIZON, null, records, nextIterator); + } + + private void setupMockKinesisForShardId(AmazonKinesis kinesis, String shardId, Integer limit, + ShardIteratorType iteratorType, String sequenceNumber, + List records, String nextIterator) + { + String shardIteratorType = iteratorType.toString(); String shardIterator = "shardIterator" + shardId; + if (sequenceNumber != null) { + shardIterator += sequenceNumber; + } GetShardIteratorResult shardIteratorResult = new GetShardIteratorResult().withShardIterator(shardIterator); - EasyMock.expect(kinesis.getShardIterator(STREAM, shardId, shardIteratorType)).andReturn(shardIteratorResult).once(); - GetRecordsRequest request = new GetRecordsRequest().withShardIterator(shardIterator).withLimit(1); - GetRecordsResult result = new GetRecordsResult().withRecords(expectedRecords) - .withNextShardIterator(expectedNextIterator); + if (sequenceNumber == null) { + EasyMock.expect(kinesis.getShardIterator(STREAM, shardId, shardIteratorType)) + .andReturn(shardIteratorResult) + .once(); + } else { + EasyMock.expect(kinesis.getShardIterator(STREAM, shardId, shardIteratorType, sequenceNumber)) + .andReturn(shardIteratorResult) + .once(); + } + GetRecordsRequest request = new GetRecordsRequest().withShardIterator(shardIterator) + .withLimit(limit); + GetRecordsResult result = new GetRecordsResult().withRecords(records) + .withNextShardIterator(nextIterator); EasyMock.expect(kinesis.getRecords(request)).andReturn(result); } } diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java index 8112de3c901..cf3e9d361e9 100644 --- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java +++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java @@ -199,7 +199,6 @@ public class KinesisSupervisorTest extends EasyMockSupport null, null, null, - 5000, null, null, null, @@ -831,6 +830,9 @@ public class KinesisSupervisorTest extends EasyMockSupport EasyMock.expect(supervisorRecordSupplier.getLatestSequenceNumber(EasyMock.anyObject())).andReturn("100").anyTimes(); supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString()); EasyMock.expectLastCall().anyTimes(); + EasyMock.expect(supervisorRecordSupplier.isOffsetAvailable(EasyMock.anyObject(), EasyMock.anyObject())) + .andReturn(true) + .anyTimes(); Capture captured = Capture.newInstance(); EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes(); @@ -885,6 +887,15 @@ public class KinesisSupervisorTest extends EasyMockSupport EasyMock.expect(supervisorRecordSupplier.getLatestSequenceNumber(EasyMock.anyObject())).andReturn("100").anyTimes(); supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString()); EasyMock.expectLastCall().anyTimes(); + // Any sequence number greater than or equal to 0 must be available + EasyMock.expect(supervisorRecordSupplier.isOffsetAvailable(EasyMock.anyObject(), + EasyMock.eq(KinesisSequenceNumber.of("101")))) + .andReturn(true) + .anyTimes(); + EasyMock.expect(supervisorRecordSupplier.isOffsetAvailable(EasyMock.anyObject(), + EasyMock.eq(KinesisSequenceNumber.of("-1")))) + .andReturn(false) + .anyTimes(); EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.absent()).anyTimes(); @@ -2758,6 +2769,19 @@ public class KinesisSupervisorTest extends EasyMockSupport EasyMock.expect(supervisorRecordSupplier.getLatestSequenceNumber(EasyMock.anyObject())).andReturn("400").anyTimes(); supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString()); EasyMock.expectLastCall().anyTimes(); + // Only sequence numbers >= 300 are available + EasyMock.expect(supervisorRecordSupplier.isOffsetAvailable(EasyMock.anyObject(), + EasyMock.eq(KinesisSequenceNumber.of("400")))) + .andReturn(true) + .anyTimes(); + EasyMock.expect(supervisorRecordSupplier.isOffsetAvailable(EasyMock.anyObject(), + EasyMock.eq(KinesisSequenceNumber.of("200")))) + .andReturn(false) + .anyTimes(); + EasyMock.expect(supervisorRecordSupplier.isOffsetAvailable(EasyMock.anyObject(), + EasyMock.eq(KinesisSequenceNumber.of("100")))) + .andReturn(false) + .anyTimes(); EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of()).anyTimes(); EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes(); @@ -3938,7 +3962,6 @@ public class KinesisSupervisorTest extends EasyMockSupport null, null, null, - 5000, null, null, null, @@ -4073,6 +4096,9 @@ public class KinesisSupervisorTest extends EasyMockSupport EasyMock.expect(supervisorRecordSupplier.getLatestSequenceNumber(EasyMock.anyObject())).andReturn("100").anyTimes(); supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString()); EasyMock.expectLastCall().anyTimes(); + EasyMock.expect(supervisorRecordSupplier.isOffsetAvailable(EasyMock.anyObject(), EasyMock.anyObject())) + .andReturn(true) + .anyTimes(); Capture captured = Capture.newInstance(CaptureType.ALL); EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes(); @@ -4356,6 +4382,9 @@ public class KinesisSupervisorTest extends EasyMockSupport supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString()); EasyMock.expectLastCall().anyTimes(); + EasyMock.expect(supervisorRecordSupplier.isOffsetAvailable(EasyMock.anyObject(), EasyMock.anyObject())) + .andReturn(true) + .anyTimes(); Capture postSplitCaptured = Capture.newInstance(CaptureType.ALL); @@ -4789,6 +4818,9 @@ public class KinesisSupervisorTest extends EasyMockSupport EasyMock.expect(supervisorRecordSupplier.getEarliestSequenceNumber(EasyMock.anyObject())).andReturn("0").anyTimes(); EasyMock.expect(supervisorRecordSupplier.getLatestSequenceNumber(new StreamPartition<>(STREAM, SHARD_ID2))) .andReturn("200").anyTimes(); + EasyMock.expect(supervisorRecordSupplier.isOffsetAvailable(EasyMock.anyObject(), EasyMock.anyObject())) + .andReturn(true) + .anyTimes(); supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString()); EasyMock.expectLastCall().anyTimes(); @@ -5043,7 +5075,6 @@ public class KinesisSupervisorTest extends EasyMockSupport null, null, null, - 5000, null, null, null, diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/test/TestModifiedKinesisIndexTaskTuningConfig.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/test/TestModifiedKinesisIndexTaskTuningConfig.java index ed3a20cf20f..c6ce08ab2ac 100644 --- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/test/TestModifiedKinesisIndexTaskTuningConfig.java +++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/test/TestModifiedKinesisIndexTaskTuningConfig.java @@ -56,7 +56,6 @@ public class TestModifiedKinesisIndexTaskTuningConfig extends KinesisIndexTaskTu @JsonProperty("recordBufferSize") Integer recordBufferSize, @JsonProperty("recordBufferOfferTimeout") Integer recordBufferOfferTimeout, @JsonProperty("recordBufferFullWait") Integer recordBufferFullWait, - @JsonProperty("fetchSequenceNumberTimeout") Integer fetchSequenceNumberTimeout, @JsonProperty("fetchThreads") Integer fetchThreads, @JsonProperty("segmentWriteOutMediumFactory") @Nullable SegmentWriteOutMediumFactory segmentWriteOutMediumFactory, @JsonProperty("logParseExceptions") @Nullable Boolean logParseExceptions, @@ -86,7 +85,6 @@ public class TestModifiedKinesisIndexTaskTuningConfig extends KinesisIndexTaskTu recordBufferSize, recordBufferOfferTimeout, recordBufferFullWait, - fetchSequenceNumberTimeout, fetchThreads, segmentWriteOutMediumFactory, logParseExceptions, @@ -119,7 +117,6 @@ public class TestModifiedKinesisIndexTaskTuningConfig extends KinesisIndexTaskTu base.getRecordBufferSize(), base.getRecordBufferOfferTimeout(), base.getRecordBufferFullWait(), - base.getFetchSequenceNumberTimeout(), base.getFetchThreads(), base.getSegmentWriteOutMediumFactory(), base.isLogParseExceptions(), diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java index dbc28573367..cc6d8e61e6a 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java @@ -1253,10 +1253,10 @@ public abstract class SeekableStreamIndexTaskRunner ", isExclusive=" + isExclusive + '}'; } + + public boolean isAvailableWithEarliest(OrderedSequenceNumber earliest) + { + return earliest.compareTo(this) <= 0; + } + + /** + * Returns true if, given that we want to start reading from this sequence number and stop at the sequence number end, + * there is more left to read. Used in pre-read checks to determine if there is anything left to read. + * + * @param end the end offset of the partition for a given task + * @param isEndOffsetExclusive indicates if the TaskRunner considers the end offsets to be exclusive + * @return true if more records need to be read given that this is the current sequence number + */ + public boolean isMoreToReadBeforeReadingRecord(OrderedSequenceNumber end, + boolean isEndOffsetExclusive) + { + final int compareToEnd = this.compareTo(end); + return isEndOffsetExclusive ? compareToEnd < 0 : compareToEnd <= 0; + } } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/common/RecordSupplier.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/common/RecordSupplier.java index 7487892b493..df2b5c943b6 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/common/RecordSupplier.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/common/RecordSupplier.java @@ -105,6 +105,14 @@ public interface RecordSupplier partition); + /** + * Checks if a provided offset is still available for a given partition in the stream + * @param partition stream partition to check in + * @param offset offset to be checked + * @return availability of offset + */ + boolean isOffsetAvailable(StreamPartition partition, + OrderedSequenceNumber offset); /** * returns the sequence number of the next record diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java index e4445f20a2e..59526028a80 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java @@ -3916,9 +3916,19 @@ public abstract class SeekableStreamSupervisor streamPartition = StreamPartition.of(ioConfig.getStream(), partition); + OrderedSequenceNumber sequenceNumber = makeSequenceNumber(offsetFromMetadata); + recordSupplierLock.lock(); + if (!recordSupplier.getAssignment().contains(streamPartition)) { + // this shouldn't happen, but in case it does... + throw new IllegalStateException("Record supplier does not match current known partitions"); + } + try { + return recordSupplier.isOffsetAvailable(streamPartition, sequenceNumber); + } + finally { + recordSupplierLock.unlock(); + } } protected void emitNoticeProcessTime(String noticeType, long timeInMillis) diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/sampler/InputSourceSamplerTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/sampler/InputSourceSamplerTest.java index 89a4642d099..8244f3dcc45 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/sampler/InputSourceSamplerTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/sampler/InputSourceSamplerTest.java @@ -44,6 +44,7 @@ import org.apache.druid.data.input.impl.StringInputRowParser; import org.apache.druid.data.input.impl.TimestampSpec; import org.apache.druid.indexing.seekablestream.RecordSupplierInputSource; import org.apache.druid.indexing.seekablestream.common.OrderedPartitionableRecord; +import org.apache.druid.indexing.seekablestream.common.OrderedSequenceNumber; import org.apache.druid.indexing.seekablestream.common.RecordSupplier; import org.apache.druid.indexing.seekablestream.common.StreamPartition; import org.apache.druid.jackson.DefaultObjectMapper; @@ -1601,6 +1602,12 @@ public class InputSourceSamplerTest extends InitializedNullHandlingTest return null; } + @Override + public boolean isOffsetAvailable(StreamPartition partition, OrderedSequenceNumber offset) + { + return true; + } + @Override public Long getPosition(StreamPartition partition) { diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/RecordSupplierInputSourceTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/RecordSupplierInputSourceTest.java index 036f230532c..eb3a0835274 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/RecordSupplierInputSourceTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/RecordSupplierInputSourceTest.java @@ -32,6 +32,7 @@ import org.apache.druid.data.input.impl.CsvInputFormat; import org.apache.druid.data.input.impl.DimensionsSpec; import org.apache.druid.data.input.impl.TimestampSpec; import org.apache.druid.indexing.seekablestream.common.OrderedPartitionableRecord; +import org.apache.druid.indexing.seekablestream.common.OrderedSequenceNumber; import org.apache.druid.indexing.seekablestream.common.RecordSupplier; import org.apache.druid.indexing.seekablestream.common.StreamPartition; import org.apache.druid.java.util.common.DateTimes; @@ -214,6 +215,12 @@ public class RecordSupplierInputSourceTest extends InitializedNullHandlingTest throw new UnsupportedOperationException(); } + @Override + public boolean isOffsetAvailable(StreamPartition partition, OrderedSequenceNumber offset) + { + return true; + } + @Override public Integer getPosition(StreamPartition partition) { diff --git a/integration-tests/src/main/java/org/apache/druid/testing/utils/KinesisEventWriter.java b/integration-tests/src/main/java/org/apache/druid/testing/utils/KinesisEventWriter.java index f0c66fcb8b9..f27e016a3c5 100644 --- a/integration-tests/src/main/java/org/apache/druid/testing/utils/KinesisEventWriter.java +++ b/integration-tests/src/main/java/org/apache/druid/testing/utils/KinesisEventWriter.java @@ -111,4 +111,9 @@ public class KinesisEventWriter implements StreamEventWriter "Waiting for all Kinesis writes to be flushed" ); } + + protected KinesisProducer getKinesisProducer() + { + return kinesisProducer; + } } diff --git a/integration-tests/src/main/java/org/apache/druid/testing/utils/KinesisSingleShardEventWriter.java b/integration-tests/src/main/java/org/apache/druid/testing/utils/KinesisSingleShardEventWriter.java new file mode 100644 index 00000000000..31f437cc4fd --- /dev/null +++ b/integration-tests/src/main/java/org/apache/druid/testing/utils/KinesisSingleShardEventWriter.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.testing.utils; + +import java.nio.ByteBuffer; + +/** + * Writes only to a single shard + */ +public class KinesisSingleShardEventWriter extends KinesisEventWriter +{ + + public KinesisSingleShardEventWriter(String endpoint, boolean aggregate) throws Exception + { + super(endpoint, aggregate); + } + + @Override + public void write(String streamName, byte[] event) + { + getKinesisProducer().addUserRecord( + streamName, + "0", + ByteBuffer.wrap(event) + ); + } +} diff --git a/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractKinesisIndexingServiceEmptyShardsTest.java b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractKinesisIndexingServiceEmptyShardsTest.java new file mode 100644 index 00000000000..3228842ec13 --- /dev/null +++ b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractKinesisIndexingServiceEmptyShardsTest.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.tests.indexer; + +import org.apache.druid.java.util.common.logger.Logger; +import org.apache.druid.testing.IntegrationTestingConfig; +import org.apache.druid.testing.utils.KinesisSingleShardEventWriter; +import org.apache.druid.testing.utils.StreamEventWriter; + +import javax.annotation.Nullable; + +public abstract class AbstractKinesisIndexingServiceEmptyShardsTest extends AbstractKinesisIndexingServiceTest +{ + private static final Logger LOG = new Logger(AbstractKinesisIndexingServiceEmptyShardsTest.class); + + @Override + StreamEventWriter createStreamEventWriter(IntegrationTestingConfig config, @Nullable Boolean transactionEnabled) + throws Exception + { + if (transactionEnabled != null) { + LOG.warn( + "Kinesis event writer doesn't support transaction. Ignoring the given parameter transactionEnabled[%s]", + transactionEnabled + ); + } + return new KinesisSingleShardEventWriter(config.getStreamEndpoint(), false); + } +} diff --git a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITKinesisIndexingServiceEmptyShardsSerializedTest.java b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITKinesisIndexingServiceEmptyShardsSerializedTest.java new file mode 100644 index 00000000000..ef005b2d5ca --- /dev/null +++ b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITKinesisIndexingServiceEmptyShardsSerializedTest.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.tests.indexer; + +import org.apache.druid.testing.guice.DruidTestModuleFactory; +import org.apache.druid.tests.TestNGGroup; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Guice; +import org.testng.annotations.Test; + +/** + * Variant of ITKinesisIndexingSericeSerializedTest where event writer publishes to a single shard + * and there may be empty ones as a result + */ +@Test(groups = TestNGGroup.KINESIS_INDEX) +@Guice(moduleFactory = DruidTestModuleFactory.class) +public class ITKinesisIndexingServiceEmptyShardsSerializedTest extends AbstractKinesisIndexingServiceEmptyShardsTest +{ + @Override + public String getTestNamePrefix() + { + return "kinesis_emptyShards_serialized"; + } + + @BeforeClass + public void beforeClass() throws Exception + { + doBeforeClass(); + } + + /** + * This test must be run individually due to their resource consumption requirement (task slot, memory, etc.) + */ + @Test + public void testKinesisIndexDataWithStartStopSupervisor() throws Exception + { + doTestIndexDataWithStartStopSupervisor(null); + } + + /** + * This test must be run individually due to their resource consumption requirement (task slot, memory, etc.) + */ + @Test + public void testKinesisIndexDataWithKinesisReshardSplit() throws Exception + { + doTestIndexDataWithStreamReshardSplit(null); + } + + /** + * This test must be run individually due to their resource consumption requirement (task slot, memory, etc.) + */ + @Test + public void testKinesisIndexDataWithKinesisReshardMerge() throws Exception + { + doTestIndexDataWithStreamReshardMerge(); + } +} diff --git a/web-console/src/druid-models/ingestion-spec.tsx b/web-console/src/druid-models/ingestion-spec.tsx index fd7a92a1b4b..29881dfe0a2 100644 --- a/web-console/src/druid-models/ingestion-spec.tsx +++ b/web-console/src/druid-models/ingestion-spec.tsx @@ -1377,7 +1377,6 @@ export interface TuningConfig { recordBufferSize?: number; recordBufferOfferTimeout?: number; recordBufferFullWait?: number; - fetchSequenceNumberTimeout?: number; fetchThreads?: number; } @@ -2023,21 +2022,6 @@ const TUNING_FORM_FIELDS: Field[] = [ ), }, - { - name: 'spec.tuningConfig.fetchSequenceNumberTimeout', - type: 'number', - defaultValue: 60000, - defined: typeIs('kinesis'), - hideInMore: true, - info: ( - <> - Length of time in milliseconds to wait for Kinesis to return the earliest or latest sequence - number for a shard. Kinesis will not return the latest sequence number if no data is - actively being written to that shard. In this case, this fetch call will repeatedly timeout - and retry until fresh data is written to the stream. - - ), - }, { name: 'spec.tuningConfig.fetchThreads', type: 'number',