From 1f63b447c45cb852924d5d3101a1005b670a054b Mon Sep 17 00:00:00 2001 From: AmatyaAvadhanula <95239065+AmatyaAvadhanula@users.noreply.github.com> Date: Fri, 21 Jan 2022 10:15:51 +0530 Subject: [PATCH] Mitigate Kinesis stream LimitExceededException by using listShards API (#12161) Makes kinesis ingestion resilient to `LimitExceededException` caused by resharding. Replace `describeStream` with `listShards` (recommended) to get shard related info. `describeStream` has a limit (100) to the number of shards returned per call and a low default TPS limit of 10. `listShards` returns the info for at most 1000 shards and has a higher TPS limit of 100 as well. Key changed/added classes in this PR * `KinesisRecordSupplier` * `KinesisAdminClient` --- .../kinesis/KinesisRecordSupplier.java | 55 +++++++++--------- .../kinesis/KinesisRecordSupplierTest.java | 50 ++++++++-------- .../testing/utils/KinesisAdminClient.java | 57 +++++++++++++------ 3 files changed, 90 insertions(+), 72 deletions(-) diff --git a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplier.java b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplier.java index 64e3bad0486..c6b136298c2 100644 --- a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplier.java +++ b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplier.java @@ -26,24 +26,22 @@ import com.amazonaws.auth.STSAssumeRoleSessionCredentialsProvider; import com.amazonaws.client.builder.AwsClientBuilder; import com.amazonaws.services.kinesis.AmazonKinesis; import com.amazonaws.services.kinesis.AmazonKinesisClientBuilder; -import com.amazonaws.services.kinesis.model.DescribeStreamRequest; -import com.amazonaws.services.kinesis.model.DescribeStreamResult; import com.amazonaws.services.kinesis.model.ExpiredIteratorException; import com.amazonaws.services.kinesis.model.GetRecordsRequest; import com.amazonaws.services.kinesis.model.GetRecordsResult; import com.amazonaws.services.kinesis.model.InvalidArgumentException; +import com.amazonaws.services.kinesis.model.ListShardsRequest; +import com.amazonaws.services.kinesis.model.ListShardsResult; import com.amazonaws.services.kinesis.model.ProvisionedThroughputExceededException; import com.amazonaws.services.kinesis.model.Record; import com.amazonaws.services.kinesis.model.ResourceNotFoundException; import com.amazonaws.services.kinesis.model.Shard; import com.amazonaws.services.kinesis.model.ShardIteratorType; -import com.amazonaws.services.kinesis.model.StreamDescription; import com.amazonaws.services.securitytoken.AWSSecurityTokenServiceClientBuilder; import com.amazonaws.util.AwsHostNameUtils; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableSet; -import com.google.common.collect.Iterables; import com.google.common.collect.Maps; import com.google.common.collect.Queues; import org.apache.druid.common.aws.AWSClientUtil; @@ -70,11 +68,11 @@ import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; -import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.TreeSet; import java.util.UUID; import java.util.concurrent.BlockingQueue; import java.util.concurrent.Callable; @@ -663,34 +661,35 @@ public class KinesisRecordSupplier implements RecordSupplier getPartitionIds(String stream) { - return wrapExceptions( - () -> { - final Set retVal = new HashSet<>(); - DescribeStreamRequest request = new DescribeStreamRequest(); - request.setStreamName(stream); - - while (request != null) { - final DescribeStreamResult result = kinesis.describeStream(request); - final StreamDescription streamDescription = result.getStreamDescription(); - final List shards = streamDescription.getShards(); - - for (Shard shard : shards) { - retVal.add(shard.getShardId()); - } - - if (streamDescription.isHasMoreShards()) { - request.setExclusiveStartShardId(Iterables.getLast(shards).getShardId()); - } else { - request = null; - } - } - + return wrapExceptions(() -> { + final Set retVal = new TreeSet<>(); + ListShardsRequest request = new ListShardsRequest().withStreamName(stream); + while (true) { + ListShardsResult result = kinesis.listShards(request); + retVal.addAll(result.getShards() + .stream() + .map(Shard::getShardId) + .collect(Collectors.toList()) + ); + String nextToken = result.getNextToken(); + if (nextToken == null) { return retVal; } - ); + request = new ListShardsRequest().withNextToken(nextToken); + } + }); } /** diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplierTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplierTest.java index ec8ab623ac7..dda0e2079b3 100644 --- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplierTest.java +++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplierTest.java @@ -23,15 +23,14 @@ import com.amazonaws.AmazonClientException; import com.amazonaws.AmazonServiceException; import com.amazonaws.services.kinesis.AmazonKinesis; import com.amazonaws.services.kinesis.AmazonKinesisClient; -import com.amazonaws.services.kinesis.model.DescribeStreamRequest; -import com.amazonaws.services.kinesis.model.DescribeStreamResult; import com.amazonaws.services.kinesis.model.GetRecordsRequest; import com.amazonaws.services.kinesis.model.GetRecordsResult; import com.amazonaws.services.kinesis.model.GetShardIteratorResult; +import com.amazonaws.services.kinesis.model.ListShardsRequest; +import com.amazonaws.services.kinesis.model.ListShardsResult; import com.amazonaws.services.kinesis.model.Record; import com.amazonaws.services.kinesis.model.Shard; import com.amazonaws.services.kinesis.model.ShardIteratorType; -import com.amazonaws.services.kinesis.model.StreamDescription; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; @@ -146,14 +145,12 @@ public class KinesisRecordSupplierTest extends EasyMockSupport private static int recordsPerFetch; private static AmazonKinesis kinesis; - private static DescribeStreamResult describeStreamResult0; - private static DescribeStreamResult describeStreamResult1; + private static ListShardsResult listShardsResult0; + private static ListShardsResult listShardsResult1; private static GetShardIteratorResult getShardIteratorResult0; private static GetShardIteratorResult getShardIteratorResult1; private static GetRecordsResult getRecordsResult0; private static GetRecordsResult getRecordsResult1; - private static StreamDescription streamDescription0; - private static StreamDescription streamDescription1; private static Shard shard0; private static Shard shard1; private static KinesisRecordSupplier recordSupplier; @@ -162,14 +159,12 @@ public class KinesisRecordSupplierTest extends EasyMockSupport public void setupTest() { kinesis = createMock(AmazonKinesisClient.class); - describeStreamResult0 = createMock(DescribeStreamResult.class); - describeStreamResult1 = createMock(DescribeStreamResult.class); + listShardsResult0 = createMock(ListShardsResult.class); + listShardsResult1 = createMock(ListShardsResult.class); getShardIteratorResult0 = createMock(GetShardIteratorResult.class); getShardIteratorResult1 = createMock(GetShardIteratorResult.class); getRecordsResult0 = createMock(GetRecordsResult.class); getRecordsResult1 = createMock(GetRecordsResult.class); - streamDescription0 = createMock(StreamDescription.class); - streamDescription1 = createMock(StreamDescription.class); shard0 = createMock(Shard.class); shard1 = createMock(Shard.class); recordsPerFetch = 1; @@ -187,19 +182,17 @@ public class KinesisRecordSupplierTest extends EasyMockSupport @Test public void testSupplierSetup() { - final Capture capturedRequest = Capture.newInstance(); + final Capture capturedRequest0 = Capture.newInstance(); + final Capture capturedRequest1 = Capture.newInstance(); - EasyMock.expect(kinesis.describeStream(EasyMock.capture(capturedRequest))).andReturn(describeStreamResult0).once(); - EasyMock.expect(describeStreamResult0.getStreamDescription()).andReturn(streamDescription0).once(); - EasyMock.expect(streamDescription0.getShards()).andReturn(ImmutableList.of(shard0)).once(); - EasyMock.expect(streamDescription0.isHasMoreShards()).andReturn(true).once(); - EasyMock.expect(shard0.getShardId()).andReturn(SHARD_ID0).times(2); - EasyMock.expect(kinesis.describeStream(EasyMock.anyObject(DescribeStreamRequest.class))) - .andReturn(describeStreamResult1) - .once(); - EasyMock.expect(describeStreamResult1.getStreamDescription()).andReturn(streamDescription1).once(); - EasyMock.expect(streamDescription1.getShards()).andReturn(ImmutableList.of(shard1)).once(); - EasyMock.expect(streamDescription1.isHasMoreShards()).andReturn(false).once(); + EasyMock.expect(kinesis.listShards(EasyMock.capture(capturedRequest0))).andReturn(listShardsResult0).once(); + EasyMock.expect(listShardsResult0.getShards()).andReturn(ImmutableList.of(shard0)).once(); + String nextToken = "nextToken"; + EasyMock.expect(listShardsResult0.getNextToken()).andReturn(nextToken).once(); + EasyMock.expect(shard0.getShardId()).andReturn(SHARD_ID0).once(); + EasyMock.expect(kinesis.listShards(EasyMock.capture(capturedRequest1))).andReturn(listShardsResult1).once(); + EasyMock.expect(listShardsResult1.getShards()).andReturn(ImmutableList.of(shard1)).once(); + EasyMock.expect(listShardsResult1.getNextToken()).andReturn(null).once(); EasyMock.expect(shard1.getShardId()).andReturn(SHARD_ID1).once(); replayAll(); @@ -236,10 +229,13 @@ public class KinesisRecordSupplierTest extends EasyMockSupport verifyAll(); - final DescribeStreamRequest expectedRequest = new DescribeStreamRequest(); - expectedRequest.setStreamName(STREAM); - expectedRequest.setExclusiveStartShardId("0"); - Assert.assertEquals(expectedRequest, capturedRequest.getValue()); + final ListShardsRequest expectedRequest0 = new ListShardsRequest(); + expectedRequest0.setStreamName(STREAM); + Assert.assertEquals(expectedRequest0, capturedRequest0.getValue()); + + final ListShardsRequest expectedRequest1 = new ListShardsRequest(); + expectedRequest1.setNextToken(nextToken); + Assert.assertEquals(expectedRequest1, capturedRequest1.getValue()); } private static GetRecordsRequest generateGetRecordsReq(String shardIterator, int limit) diff --git a/integration-tests/src/main/java/org/apache/druid/testing/utils/KinesisAdminClient.java b/integration-tests/src/main/java/org/apache/druid/testing/utils/KinesisAdminClient.java index 7c8759ae0e3..53e328477a0 100644 --- a/integration-tests/src/main/java/org/apache/druid/testing/utils/KinesisAdminClient.java +++ b/integration-tests/src/main/java/org/apache/druid/testing/utils/KinesisAdminClient.java @@ -30,21 +30,27 @@ import com.amazonaws.services.kinesis.model.AddTagsToStreamResult; import com.amazonaws.services.kinesis.model.CreateStreamResult; import com.amazonaws.services.kinesis.model.DeleteStreamResult; import com.amazonaws.services.kinesis.model.DescribeStreamResult; +import com.amazonaws.services.kinesis.model.ListShardsRequest; +import com.amazonaws.services.kinesis.model.ListShardsResult; import com.amazonaws.services.kinesis.model.ScalingType; +import com.amazonaws.services.kinesis.model.Shard; import com.amazonaws.services.kinesis.model.StreamDescription; import com.amazonaws.services.kinesis.model.StreamStatus; import com.amazonaws.services.kinesis.model.UpdateShardCountRequest; import com.amazonaws.services.kinesis.model.UpdateShardCountResult; import com.amazonaws.util.AwsHostNameUtils; +import com.google.common.collect.ImmutableSet; import org.apache.druid.java.util.common.ISE; import java.io.FileInputStream; +import java.util.Arrays; import java.util.Map; import java.util.Properties; +import java.util.Set; public class KinesisAdminClient implements StreamAdminClient { - private AmazonKinesis amazonKinesis; + private final AmazonKinesis amazonKinesis; public KinesisAdminClient(String endpoint) throws Exception { @@ -107,6 +113,9 @@ public class KinesisAdminClient implements StreamAdminClient public void updatePartitionCount(String streamName, int newShardCount, boolean blocksUntilStarted) { int originalShardCount = getStreamPartitionCount(streamName); + if (originalShardCount == newShardCount) { + return; + } UpdateShardCountRequest updateShardCountRequest = new UpdateShardCountRequest(); updateShardCountRequest.setStreamName(streamName); updateShardCountRequest.setTargetShardCount(newShardCount); @@ -119,13 +128,13 @@ public class KinesisAdminClient implements StreamAdminClient // Wait until the resharding started (or finished) ITRetryUtil.retryUntil( () -> { - StreamDescription streamDescription = getStreamDescription(streamName); - int updatedShardCount = getStreamShardCount(streamDescription); - return verifyStreamStatus(streamDescription, StreamStatus.UPDATING) || - (verifyStreamStatus(streamDescription, StreamStatus.ACTIVE) && updatedShardCount > originalShardCount); - }, - true, - 30, + int updatedShardCount = getStreamPartitionCount(streamName); + // Stream should be in active or updating state AND + // the number of shards must have increased irrespective of the value of newShardCount + return verifyStreamStatus(streamName, StreamStatus.ACTIVE, StreamStatus.UPDATING) + && updatedShardCount > originalShardCount; + }, true, + 300, // higher value to avoid exceeding kinesis TPS limit 30, "Kinesis stream resharding to start (or finished)" ); @@ -135,15 +144,13 @@ public class KinesisAdminClient implements StreamAdminClient @Override public boolean isStreamActive(String streamName) { - StreamDescription streamDescription = getStreamDescription(streamName); - return verifyStreamStatus(streamDescription, StreamStatus.ACTIVE); + return verifyStreamStatus(streamName, StreamStatus.ACTIVE); } @Override public int getStreamPartitionCount(String streamName) { - StreamDescription streamDescription = getStreamDescription(streamName); - return getStreamShardCount(streamDescription); + return listShards(streamName).size(); } @Override @@ -156,15 +163,31 @@ public class KinesisAdminClient implements StreamAdminClient return actualShardCount == oldShardCount + newShardCount; } - - private boolean verifyStreamStatus(StreamDescription streamDescription, StreamStatus streamStatusToCheck) + private Set listShards(String streamName) { - return streamStatusToCheck.toString().equals(streamDescription.getStreamStatus()); + ListShardsRequest listShardsRequest = new ListShardsRequest().withStreamName(streamName); + ImmutableSet.Builder shards = ImmutableSet.builder(); + while (true) { + ListShardsResult listShardsResult = amazonKinesis.listShards(listShardsRequest); + shards.addAll(listShardsResult.getShards()); + String nextToken = listShardsResult.getNextToken(); + if (nextToken == null) { + return shards.build(); + } + listShardsRequest = new ListShardsRequest().withNextToken(listShardsResult.getNextToken()); + } } - private int getStreamShardCount(StreamDescription streamDescription) + private boolean verifyStreamStatus(String streamName, StreamStatus... streamStatuses) { - return streamDescription.getShards().size(); + return Arrays.stream(streamStatuses) + .map(StreamStatus::toString) + .anyMatch(getStreamStatus(streamName)::equals); + } + + private String getStreamStatus(String streamName) + { + return getStreamDescription(streamName).getStreamStatus(); } private StreamDescription getStreamDescription(String streamName)