Handle internal kinesis sequence numbers when reporting lag (#10315)

* Handle internal kinesis sequence numbers when reporting lag

* add unit test
This commit is contained in:
Abhishek Agarwal 2020-08-26 23:57:37 +05:30 committed by GitHub
parent ab60661008
commit d4ac62f284
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 42 additions and 32 deletions

View File

@ -718,9 +718,11 @@ public class KinesisRecordSupplier implements RecordSupplier<String, String>
{
Map<String, Long> partitionLag = Maps.newHashMapWithExpectedSize(currentOffsets.size());
for (Map.Entry<String, String> partitionOffset : currentOffsets.entrySet()) {
StreamPartition<String> partition = new StreamPartition<>(stream, partitionOffset.getKey());
long currentLag = getPartitionTimeLag(partition, partitionOffset.getValue());
partitionLag.put(partitionOffset.getKey(), currentLag);
if (KinesisSequenceNumber.isValidAWSKinesisSequence(partitionOffset.getValue())) {
StreamPartition<String> partition = new StreamPartition<>(stream, partitionOffset.getKey());
long currentLag = getPartitionTimeLag(partition, partitionOffset.getValue());
partitionLag.put(partitionOffset.getKey(), currentLag);
}
}
return partitionLag;
}

View File

@ -52,8 +52,8 @@ public class KinesisSequenceNumber extends OrderedSequenceNumber<String>
public static final String EXPIRED_MARKER = "EXPIRED";
/**
* this flag is used to indicate either END_OF_SHARD_MARKER
* or NO_END_SEQUENCE_NUMBER so that they can be properly compared
* this flag is used to indicate either END_OF_SHARD_MARKER, NO_END_SEQUENCE_NUMBER
* or EXPIRED_MARKER so that they can be properly compared
* with other sequence numbers
*/
private final boolean isMaxSequenceNumber;
@ -62,7 +62,9 @@ public class KinesisSequenceNumber extends OrderedSequenceNumber<String>
private KinesisSequenceNumber(String sequenceNumber, boolean isExclusive)
{
super(sequenceNumber, isExclusive);
if (END_OF_SHARD_MARKER.equals(sequenceNumber) || NO_END_SEQUENCE_NUMBER.equals(sequenceNumber)) {
if (END_OF_SHARD_MARKER.equals(sequenceNumber)
|| NO_END_SEQUENCE_NUMBER.equals(sequenceNumber)
|| EXPIRED_MARKER.equals(sequenceNumber)) {
isMaxSequenceNumber = true;
this.intSequence = null;
} else {
@ -81,6 +83,19 @@ public class KinesisSequenceNumber extends OrderedSequenceNumber<String>
return new KinesisSequenceNumber(sequenceNumber, isExclusive);
}
/**
* Checks whether the sequence number is recognized by kinesis client library
* @param sequenceNumber
* @return
*/
public static boolean isValidAWSKinesisSequence(String sequenceNumber)
{
return !(END_OF_SHARD_MARKER.equals(sequenceNumber)
|| NO_END_SEQUENCE_NUMBER.equals(sequenceNumber)
|| EXPIRED_MARKER.equals(sequenceNumber)
);
}
@Override
public int compareTo(OrderedSequenceNumber<String> o)
{

View File

@ -417,19 +417,6 @@ public class KinesisSupervisor extends SeekableStreamSupervisor<String, String>
);
}
@Override
protected SeekableStreamDataSourceMetadata<String, String> createDataSourceMetadataWithClosedPartitions(
SeekableStreamDataSourceMetadata<String, String> currentMetadata, Set<String> closedPartitionIds
)
{
log.info("Marking closed shards in metadata: " + closedPartitionIds);
return createDataSourceMetadataWithClosedOrExpiredPartitions(
currentMetadata,
closedPartitionIds,
KinesisSequenceNumber.END_OF_SHARD_MARKER
);
}
private SeekableStreamDataSourceMetadata<String, String> createDataSourceMetadataWithClosedOrExpiredPartitions(
SeekableStreamDataSourceMetadata<String, String> currentMetadata,
Set<String> terminatedPartitionIds,

View File

@ -47,12 +47,17 @@ import org.junit.Before;
import org.junit.Test;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import static org.apache.druid.indexing.kinesis.KinesisSequenceNumber.END_OF_SHARD_MARKER;
import static org.apache.druid.indexing.kinesis.KinesisSequenceNumber.EXPIRED_MARKER;
import static org.apache.druid.indexing.kinesis.KinesisSequenceNumber.NO_END_SEQUENCE_NUMBER;
public class KinesisRecordSupplierTest extends EasyMockSupport
{
private static final String STREAM = "stream";
@ -237,7 +242,7 @@ public class KinesisRecordSupplierTest extends EasyMockSupport
{
return records.stream()
.filter(x -> !x.getSequenceNumber()
.equals(KinesisSequenceNumber.END_OF_SHARD_MARKER))
.equals(END_OF_SHARD_MARKER))
.collect(Collectors.toList());
}
@ -804,14 +809,14 @@ public class KinesisRecordSupplierTest extends EasyMockSupport
EasyMock.anyObject(),
EasyMock.eq(SHARD_ID0),
EasyMock.anyString(),
EasyMock.anyString()
EasyMock.or(EasyMock.matches("\\d+"), EasyMock.isNull())
)).andReturn(getShardIteratorResult0).anyTimes();
EasyMock.expect(kinesis.getShardIterator(
EasyMock.anyObject(),
EasyMock.eq(SHARD_ID1),
EasyMock.anyString(),
EasyMock.anyString()
EasyMock.or(EasyMock.matches("\\d+"), EasyMock.isNull())
)).andReturn(getShardIteratorResult1).anyTimes();
EasyMock.expect(getShardIteratorResult0.getShardIterator()).andReturn(SHARD0_ITERATOR).anyTimes();
@ -864,12 +869,21 @@ public class KinesisRecordSupplierTest extends EasyMockSupport
Assert.assertEquals(partitions, recordSupplier.getAssignment());
Assert.assertEquals(SHARDS_LAG_MILLIS, timeLag);
Map<String, String> offsts = ImmutableMap.of(
Map<String, String> offsets = ImmutableMap.of(
SHARD_ID1, SHARD1_RECORDS.get(0).getSequenceNumber(),
SHARD_ID0, SHARD0_RECORDS.get(0).getSequenceNumber()
);
Map<String, Long> independentTimeLag = recordSupplier.getPartitionsTimeLag(STREAM, offsts);
Map<String, Long> independentTimeLag = recordSupplier.getPartitionsTimeLag(STREAM, offsets);
Assert.assertEquals(SHARDS_LAG_MILLIS, independentTimeLag);
// Verify that kinesis apis are not called for custom sequence numbers
for (String sequenceNum : Arrays.asList(NO_END_SEQUENCE_NUMBER, END_OF_SHARD_MARKER, EXPIRED_MARKER)) {
offsets = ImmutableMap.of(
SHARD_ID1, sequenceNum,
SHARD_ID0, sequenceNum
);
Assert.assertEquals(Collections.emptyMap(), recordSupplier.getPartitionsTimeLag(STREAM, offsets));
}
verifyAll();
}

View File

@ -2202,14 +2202,6 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
throw new UnsupportedOperationException("This supervisor type does not support partition expiration.");
}
protected SeekableStreamDataSourceMetadata<PartitionIdType, SequenceOffsetType> createDataSourceMetadataWithClosedPartitions(
SeekableStreamDataSourceMetadata<PartitionIdType, SequenceOffsetType> currentMetadata,
Set<PartitionIdType> closedPartitionIds
)
{
throw new UnsupportedOperationException("This supervisor type does not support partition closing.");
}
/**
* Perform a sanity check on the datasource metadata returned by
* {@link #createDataSourceMetadataWithExpiredPartitions}.