mirror of https://github.com/apache/druid.git
Mitigate Kinesis stream LimitExceededException by using listShards API (#12161)
Makes Kinesis ingestion resilient to `LimitExceededException` caused by resharding by replacing `describeStream` with the recommended `listShards` API for fetching shard information. `describeStream` returns at most 100 shards per call and has a low default limit of 10 TPS, whereas `listShards` returns up to 1000 shards per call and has a higher limit of 100 TPS. Key changed/added classes in this PR: `KinesisRecordSupplier` and `KinesisAdminClient`.
This commit is contained in:
parent
376d7c069d
commit
1f63b447c4
|
@ -26,24 +26,22 @@ import com.amazonaws.auth.STSAssumeRoleSessionCredentialsProvider;
|
|||
import com.amazonaws.client.builder.AwsClientBuilder;
|
||||
import com.amazonaws.services.kinesis.AmazonKinesis;
|
||||
import com.amazonaws.services.kinesis.AmazonKinesisClientBuilder;
|
||||
import com.amazonaws.services.kinesis.model.DescribeStreamRequest;
|
||||
import com.amazonaws.services.kinesis.model.DescribeStreamResult;
|
||||
import com.amazonaws.services.kinesis.model.ExpiredIteratorException;
|
||||
import com.amazonaws.services.kinesis.model.GetRecordsRequest;
|
||||
import com.amazonaws.services.kinesis.model.GetRecordsResult;
|
||||
import com.amazonaws.services.kinesis.model.InvalidArgumentException;
|
||||
import com.amazonaws.services.kinesis.model.ListShardsRequest;
|
||||
import com.amazonaws.services.kinesis.model.ListShardsResult;
|
||||
import com.amazonaws.services.kinesis.model.ProvisionedThroughputExceededException;
|
||||
import com.amazonaws.services.kinesis.model.Record;
|
||||
import com.amazonaws.services.kinesis.model.ResourceNotFoundException;
|
||||
import com.amazonaws.services.kinesis.model.Shard;
|
||||
import com.amazonaws.services.kinesis.model.ShardIteratorType;
|
||||
import com.amazonaws.services.kinesis.model.StreamDescription;
|
||||
import com.amazonaws.services.securitytoken.AWSSecurityTokenServiceClientBuilder;
|
||||
import com.amazonaws.util.AwsHostNameUtils;
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import com.google.common.base.Preconditions;
|
||||
import com.google.common.collect.ImmutableSet;
|
||||
import com.google.common.collect.Iterables;
|
||||
import com.google.common.collect.Maps;
|
||||
import com.google.common.collect.Queues;
|
||||
import org.apache.druid.common.aws.AWSClientUtil;
|
||||
|
@ -70,11 +68,11 @@ import java.nio.ByteBuffer;
|
|||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.HashSet;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.TreeSet;
|
||||
import java.util.UUID;
|
||||
import java.util.concurrent.BlockingQueue;
|
||||
import java.util.concurrent.Callable;
|
||||
|
@ -663,34 +661,35 @@ public class KinesisRecordSupplier implements RecordSupplier<String, String, Byt
|
|||
return getSequenceNumber(partition, ShardIteratorType.TRIM_HORIZON);
|
||||
}
|
||||
|
||||
/**
|
||||
* Uses the listShards API, which is recommended over describeStream.
|
||||
* listShards can return up to 1000 shards per call and has a limit of 100 TPS.
|
||||
* This makes the method resilient to LimitExceededException (describeStream, by comparison, returns at most 100 shards per call and is limited to 10 TPS).
|
||||
*
|
||||
* @param stream name of stream
|
||||
*
|
||||
* @return Set of Shard ids
|
||||
*/
|
||||
@Override
|
||||
public Set<String> getPartitionIds(String stream)
|
||||
{
|
||||
return wrapExceptions(
|
||||
() -> {
|
||||
final Set<String> retVal = new HashSet<>();
|
||||
DescribeStreamRequest request = new DescribeStreamRequest();
|
||||
request.setStreamName(stream);
|
||||
|
||||
while (request != null) {
|
||||
final DescribeStreamResult result = kinesis.describeStream(request);
|
||||
final StreamDescription streamDescription = result.getStreamDescription();
|
||||
final List<Shard> shards = streamDescription.getShards();
|
||||
|
||||
for (Shard shard : shards) {
|
||||
retVal.add(shard.getShardId());
|
||||
}
|
||||
|
||||
if (streamDescription.isHasMoreShards()) {
|
||||
request.setExclusiveStartShardId(Iterables.getLast(shards).getShardId());
|
||||
} else {
|
||||
request = null;
|
||||
}
|
||||
}
|
||||
|
||||
return wrapExceptions(() -> {
|
||||
final Set<String> retVal = new TreeSet<>();
|
||||
ListShardsRequest request = new ListShardsRequest().withStreamName(stream);
|
||||
while (true) {
|
||||
ListShardsResult result = kinesis.listShards(request);
|
||||
retVal.addAll(result.getShards()
|
||||
.stream()
|
||||
.map(Shard::getShardId)
|
||||
.collect(Collectors.toList())
|
||||
);
|
||||
String nextToken = result.getNextToken();
|
||||
if (nextToken == null) {
|
||||
return retVal;
|
||||
}
|
||||
);
|
||||
request = new ListShardsRequest().withNextToken(nextToken);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -23,15 +23,14 @@ import com.amazonaws.AmazonClientException;
|
|||
import com.amazonaws.AmazonServiceException;
|
||||
import com.amazonaws.services.kinesis.AmazonKinesis;
|
||||
import com.amazonaws.services.kinesis.AmazonKinesisClient;
|
||||
import com.amazonaws.services.kinesis.model.DescribeStreamRequest;
|
||||
import com.amazonaws.services.kinesis.model.DescribeStreamResult;
|
||||
import com.amazonaws.services.kinesis.model.GetRecordsRequest;
|
||||
import com.amazonaws.services.kinesis.model.GetRecordsResult;
|
||||
import com.amazonaws.services.kinesis.model.GetShardIteratorResult;
|
||||
import com.amazonaws.services.kinesis.model.ListShardsRequest;
|
||||
import com.amazonaws.services.kinesis.model.ListShardsResult;
|
||||
import com.amazonaws.services.kinesis.model.Record;
|
||||
import com.amazonaws.services.kinesis.model.Shard;
|
||||
import com.amazonaws.services.kinesis.model.ShardIteratorType;
|
||||
import com.amazonaws.services.kinesis.model.StreamDescription;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.google.common.collect.ImmutableList;
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
|
@ -146,14 +145,12 @@ public class KinesisRecordSupplierTest extends EasyMockSupport
|
|||
|
||||
private static int recordsPerFetch;
|
||||
private static AmazonKinesis kinesis;
|
||||
private static DescribeStreamResult describeStreamResult0;
|
||||
private static DescribeStreamResult describeStreamResult1;
|
||||
private static ListShardsResult listShardsResult0;
|
||||
private static ListShardsResult listShardsResult1;
|
||||
private static GetShardIteratorResult getShardIteratorResult0;
|
||||
private static GetShardIteratorResult getShardIteratorResult1;
|
||||
private static GetRecordsResult getRecordsResult0;
|
||||
private static GetRecordsResult getRecordsResult1;
|
||||
private static StreamDescription streamDescription0;
|
||||
private static StreamDescription streamDescription1;
|
||||
private static Shard shard0;
|
||||
private static Shard shard1;
|
||||
private static KinesisRecordSupplier recordSupplier;
|
||||
|
@ -162,14 +159,12 @@ public class KinesisRecordSupplierTest extends EasyMockSupport
|
|||
public void setupTest()
|
||||
{
|
||||
kinesis = createMock(AmazonKinesisClient.class);
|
||||
describeStreamResult0 = createMock(DescribeStreamResult.class);
|
||||
describeStreamResult1 = createMock(DescribeStreamResult.class);
|
||||
listShardsResult0 = createMock(ListShardsResult.class);
|
||||
listShardsResult1 = createMock(ListShardsResult.class);
|
||||
getShardIteratorResult0 = createMock(GetShardIteratorResult.class);
|
||||
getShardIteratorResult1 = createMock(GetShardIteratorResult.class);
|
||||
getRecordsResult0 = createMock(GetRecordsResult.class);
|
||||
getRecordsResult1 = createMock(GetRecordsResult.class);
|
||||
streamDescription0 = createMock(StreamDescription.class);
|
||||
streamDescription1 = createMock(StreamDescription.class);
|
||||
shard0 = createMock(Shard.class);
|
||||
shard1 = createMock(Shard.class);
|
||||
recordsPerFetch = 1;
|
||||
|
@ -187,19 +182,17 @@ public class KinesisRecordSupplierTest extends EasyMockSupport
|
|||
@Test
|
||||
public void testSupplierSetup()
|
||||
{
|
||||
final Capture<DescribeStreamRequest> capturedRequest = Capture.newInstance();
|
||||
final Capture<ListShardsRequest> capturedRequest0 = Capture.newInstance();
|
||||
final Capture<ListShardsRequest> capturedRequest1 = Capture.newInstance();
|
||||
|
||||
EasyMock.expect(kinesis.describeStream(EasyMock.capture(capturedRequest))).andReturn(describeStreamResult0).once();
|
||||
EasyMock.expect(describeStreamResult0.getStreamDescription()).andReturn(streamDescription0).once();
|
||||
EasyMock.expect(streamDescription0.getShards()).andReturn(ImmutableList.of(shard0)).once();
|
||||
EasyMock.expect(streamDescription0.isHasMoreShards()).andReturn(true).once();
|
||||
EasyMock.expect(shard0.getShardId()).andReturn(SHARD_ID0).times(2);
|
||||
EasyMock.expect(kinesis.describeStream(EasyMock.anyObject(DescribeStreamRequest.class)))
|
||||
.andReturn(describeStreamResult1)
|
||||
.once();
|
||||
EasyMock.expect(describeStreamResult1.getStreamDescription()).andReturn(streamDescription1).once();
|
||||
EasyMock.expect(streamDescription1.getShards()).andReturn(ImmutableList.of(shard1)).once();
|
||||
EasyMock.expect(streamDescription1.isHasMoreShards()).andReturn(false).once();
|
||||
EasyMock.expect(kinesis.listShards(EasyMock.capture(capturedRequest0))).andReturn(listShardsResult0).once();
|
||||
EasyMock.expect(listShardsResult0.getShards()).andReturn(ImmutableList.of(shard0)).once();
|
||||
String nextToken = "nextToken";
|
||||
EasyMock.expect(listShardsResult0.getNextToken()).andReturn(nextToken).once();
|
||||
EasyMock.expect(shard0.getShardId()).andReturn(SHARD_ID0).once();
|
||||
EasyMock.expect(kinesis.listShards(EasyMock.capture(capturedRequest1))).andReturn(listShardsResult1).once();
|
||||
EasyMock.expect(listShardsResult1.getShards()).andReturn(ImmutableList.of(shard1)).once();
|
||||
EasyMock.expect(listShardsResult1.getNextToken()).andReturn(null).once();
|
||||
EasyMock.expect(shard1.getShardId()).andReturn(SHARD_ID1).once();
|
||||
|
||||
replayAll();
|
||||
|
@ -236,10 +229,13 @@ public class KinesisRecordSupplierTest extends EasyMockSupport
|
|||
|
||||
verifyAll();
|
||||
|
||||
final DescribeStreamRequest expectedRequest = new DescribeStreamRequest();
|
||||
expectedRequest.setStreamName(STREAM);
|
||||
expectedRequest.setExclusiveStartShardId("0");
|
||||
Assert.assertEquals(expectedRequest, capturedRequest.getValue());
|
||||
final ListShardsRequest expectedRequest0 = new ListShardsRequest();
|
||||
expectedRequest0.setStreamName(STREAM);
|
||||
Assert.assertEquals(expectedRequest0, capturedRequest0.getValue());
|
||||
|
||||
final ListShardsRequest expectedRequest1 = new ListShardsRequest();
|
||||
expectedRequest1.setNextToken(nextToken);
|
||||
Assert.assertEquals(expectedRequest1, capturedRequest1.getValue());
|
||||
}
|
||||
|
||||
private static GetRecordsRequest generateGetRecordsReq(String shardIterator, int limit)
|
||||
|
|
|
@ -30,21 +30,27 @@ import com.amazonaws.services.kinesis.model.AddTagsToStreamResult;
|
|||
import com.amazonaws.services.kinesis.model.CreateStreamResult;
|
||||
import com.amazonaws.services.kinesis.model.DeleteStreamResult;
|
||||
import com.amazonaws.services.kinesis.model.DescribeStreamResult;
|
||||
import com.amazonaws.services.kinesis.model.ListShardsRequest;
|
||||
import com.amazonaws.services.kinesis.model.ListShardsResult;
|
||||
import com.amazonaws.services.kinesis.model.ScalingType;
|
||||
import com.amazonaws.services.kinesis.model.Shard;
|
||||
import com.amazonaws.services.kinesis.model.StreamDescription;
|
||||
import com.amazonaws.services.kinesis.model.StreamStatus;
|
||||
import com.amazonaws.services.kinesis.model.UpdateShardCountRequest;
|
||||
import com.amazonaws.services.kinesis.model.UpdateShardCountResult;
|
||||
import com.amazonaws.util.AwsHostNameUtils;
|
||||
import com.google.common.collect.ImmutableSet;
|
||||
import org.apache.druid.java.util.common.ISE;
|
||||
|
||||
import java.io.FileInputStream;
|
||||
import java.util.Arrays;
|
||||
import java.util.Map;
|
||||
import java.util.Properties;
|
||||
import java.util.Set;
|
||||
|
||||
public class KinesisAdminClient implements StreamAdminClient
|
||||
{
|
||||
private AmazonKinesis amazonKinesis;
|
||||
private final AmazonKinesis amazonKinesis;
|
||||
|
||||
public KinesisAdminClient(String endpoint) throws Exception
|
||||
{
|
||||
|
@ -107,6 +113,9 @@ public class KinesisAdminClient implements StreamAdminClient
|
|||
public void updatePartitionCount(String streamName, int newShardCount, boolean blocksUntilStarted)
|
||||
{
|
||||
int originalShardCount = getStreamPartitionCount(streamName);
|
||||
if (originalShardCount == newShardCount) {
|
||||
return;
|
||||
}
|
||||
UpdateShardCountRequest updateShardCountRequest = new UpdateShardCountRequest();
|
||||
updateShardCountRequest.setStreamName(streamName);
|
||||
updateShardCountRequest.setTargetShardCount(newShardCount);
|
||||
|
@ -119,13 +128,13 @@ public class KinesisAdminClient implements StreamAdminClient
|
|||
// Wait until the resharding has started (or finished)
|
||||
ITRetryUtil.retryUntil(
|
||||
() -> {
|
||||
StreamDescription streamDescription = getStreamDescription(streamName);
|
||||
int updatedShardCount = getStreamShardCount(streamDescription);
|
||||
return verifyStreamStatus(streamDescription, StreamStatus.UPDATING) ||
|
||||
(verifyStreamStatus(streamDescription, StreamStatus.ACTIVE) && updatedShardCount > originalShardCount);
|
||||
},
|
||||
true,
|
||||
30,
|
||||
int updatedShardCount = getStreamPartitionCount(streamName);
|
||||
// Stream should be in active or updating state AND
|
||||
// the number of shards must have increased irrespective of the value of newShardCount
|
||||
return verifyStreamStatus(streamName, StreamStatus.ACTIVE, StreamStatus.UPDATING)
|
||||
&& updatedShardCount > originalShardCount;
|
||||
}, true,
|
||||
300, // higher value to avoid exceeding kinesis TPS limit
|
||||
30,
|
||||
"Kinesis stream resharding to start (or finished)"
|
||||
);
|
||||
|
@ -135,15 +144,13 @@ public class KinesisAdminClient implements StreamAdminClient
|
|||
@Override
|
||||
public boolean isStreamActive(String streamName)
|
||||
{
|
||||
StreamDescription streamDescription = getStreamDescription(streamName);
|
||||
return verifyStreamStatus(streamDescription, StreamStatus.ACTIVE);
|
||||
return verifyStreamStatus(streamName, StreamStatus.ACTIVE);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getStreamPartitionCount(String streamName)
|
||||
{
|
||||
StreamDescription streamDescription = getStreamDescription(streamName);
|
||||
return getStreamShardCount(streamDescription);
|
||||
return listShards(streamName).size();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -156,15 +163,31 @@ public class KinesisAdminClient implements StreamAdminClient
|
|||
return actualShardCount == oldShardCount + newShardCount;
|
||||
}
|
||||
|
||||
|
||||
private boolean verifyStreamStatus(StreamDescription streamDescription, StreamStatus streamStatusToCheck)
|
||||
private Set<Shard> listShards(String streamName)
|
||||
{
|
||||
return streamStatusToCheck.toString().equals(streamDescription.getStreamStatus());
|
||||
ListShardsRequest listShardsRequest = new ListShardsRequest().withStreamName(streamName);
|
||||
ImmutableSet.Builder<Shard> shards = ImmutableSet.builder();
|
||||
while (true) {
|
||||
ListShardsResult listShardsResult = amazonKinesis.listShards(listShardsRequest);
|
||||
shards.addAll(listShardsResult.getShards());
|
||||
String nextToken = listShardsResult.getNextToken();
|
||||
if (nextToken == null) {
|
||||
return shards.build();
|
||||
}
|
||||
listShardsRequest = new ListShardsRequest().withNextToken(listShardsResult.getNextToken());
|
||||
}
|
||||
}
|
||||
|
||||
private int getStreamShardCount(StreamDescription streamDescription)
|
||||
private boolean verifyStreamStatus(String streamName, StreamStatus... streamStatuses)
|
||||
{
|
||||
return streamDescription.getShards().size();
|
||||
return Arrays.stream(streamStatuses)
|
||||
.map(StreamStatus::toString)
|
||||
.anyMatch(getStreamStatus(streamName)::equals);
|
||||
}
|
||||
|
||||
private String getStreamStatus(String streamName)
|
||||
{
|
||||
return getStreamDescription(streamName).getStreamStatus();
|
||||
}
|
||||
|
||||
private StreamDescription getStreamDescription(String streamName)
|
||||
|
|
Loading…
Reference in New Issue