Add feature flag for Kinesis listShards API usage (#12383)

listShards API was used to get all the shards for kinesis ingestion to improve its resiliency as part of #12161.

However, this may require additional permissions in the IAM policy where the stream is present. (Please refer to: https://docs.aws.amazon.com/kinesis/latest/APIReference/API_ListShards.html).

A dynamic configuration useListShards has been added to KinesisSupervisorTuningConfig to control the usage of this API and prevent issues upon upgrade. It can be safely turned on (and is recommended when using kinesis ingestion) by setting this configuration to true.
This commit is contained in:
AmatyaAvadhanula 2022-04-04 14:58:10 +05:30 committed by GitHub
parent a1ea658115
commit c5531be553
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 192 additions and 37 deletions

View File

@ -311,6 +311,7 @@ The tuningConfig is optional and default parameters will be used if no tuningCon
| `maxRecordsPerPoll` | Integer | The maximum number of records/events to be fetched from buffer per poll. The actual maximum will be `Max(maxRecordsPerPoll, Max(bufferSize, 1))` | no, default == 100 |
| `repartitionTransitionDuration` | ISO8601 Period | When shards are split or merged, the supervisor will recompute shard -> task group mappings, and signal any running tasks created under the old mappings to stop early at (current time + `repartitionTransitionDuration`). Stopping the tasks early allows Druid to begin reading from the new shards more quickly. The repartition transition wait time controlled by this property gives the stream additional time to write records to the new shards after the split/merge, which helps avoid the issues with empty shard handling described at https://github.com/apache/druid/issues/7600. | no, (default == PT2M) |
| `offsetFetchPeriod` | ISO8601 Period | How often the supervisor queries Kinesis and the indexing tasks to fetch current offsets and calculate lag. If the user-specified value is below the minimum value (`PT5S`), the supervisor ignores the value and uses the minimum value instead. | no (default == PT30S, min == PT5S) |
| `useListShards` | Boolean | Indicates if `listShards` API of AWS Kinesis SDK can be used to prevent `LimitExceededException` during ingestion. Please note that the necessary `IAM` permissions must be set for this to work.| no (default == false) |
#### IndexSpec

View File

@ -38,6 +38,7 @@ public class KinesisIndexTask extends SeekableStreamIndexTask<String, String, By
{
private static final String TYPE = "index_kinesis";
private final boolean useListShards;
private final AWSCredentialsConfig awsCredentialsConfig;
@JsonCreator
@ -48,6 +49,7 @@ public class KinesisIndexTask extends SeekableStreamIndexTask<String, String, By
@JsonProperty("tuningConfig") KinesisIndexTaskTuningConfig tuningConfig,
@JsonProperty("ioConfig") KinesisIndexTaskIOConfig ioConfig,
@JsonProperty("context") Map<String, Object> context,
@JsonProperty("useListShards") boolean useListShards,
@JacksonInject @Named(KinesisIndexingServiceModule.AWS_SCOPE) AWSCredentialsConfig awsCredentialsConfig
)
{
@ -60,6 +62,7 @@ public class KinesisIndexTask extends SeekableStreamIndexTask<String, String, By
context,
getFormattedGroupId(dataSchema.getDataSource(), TYPE)
);
this.useListShards = useListShards;
this.awsCredentialsConfig = awsCredentialsConfig;
}
@ -105,7 +108,8 @@ public class KinesisIndexTask extends SeekableStreamIndexTask<String, String, By
tuningConfig.getRecordBufferFullWait(),
tuningConfig.getFetchSequenceNumberTimeout(),
tuningConfig.getMaxRecordsPerPoll(),
false
false,
useListShards
);
}

View File

@ -26,6 +26,7 @@ import com.amazonaws.auth.STSAssumeRoleSessionCredentialsProvider;
import com.amazonaws.client.builder.AwsClientBuilder;
import com.amazonaws.services.kinesis.AmazonKinesis;
import com.amazonaws.services.kinesis.AmazonKinesisClientBuilder;
import com.amazonaws.services.kinesis.model.DescribeStreamRequest;
import com.amazonaws.services.kinesis.model.ExpiredIteratorException;
import com.amazonaws.services.kinesis.model.GetRecordsRequest;
import com.amazonaws.services.kinesis.model.GetRecordsResult;
@ -37,11 +38,13 @@ import com.amazonaws.services.kinesis.model.Record;
import com.amazonaws.services.kinesis.model.ResourceNotFoundException;
import com.amazonaws.services.kinesis.model.Shard;
import com.amazonaws.services.kinesis.model.ShardIteratorType;
import com.amazonaws.services.kinesis.model.StreamDescription;
import com.amazonaws.services.securitytoken.AWSSecurityTokenServiceClientBuilder;
import com.amazonaws.util.AwsHostNameUtils;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.collect.Maps;
import com.google.common.collect.Queues;
import org.apache.druid.common.aws.AWSClientUtil;
@ -402,6 +405,7 @@ public class KinesisRecordSupplier implements RecordSupplier<String, String, Byt
private final int fetchThreads;
private final int recordBufferSize;
private final boolean useEarliestSequenceNumber;
private final boolean useListShards;
private ScheduledExecutorService scheduledExec;
@ -424,7 +428,8 @@ public class KinesisRecordSupplier implements RecordSupplier<String, String, Byt
int recordBufferFullWait,
int fetchSequenceNumberTimeout,
int maxRecordsPerPoll,
boolean useEarliestSequenceNumber
boolean useEarliestSequenceNumber,
boolean useListShards
)
{
Preconditions.checkNotNull(amazonKinesis);
@ -439,6 +444,7 @@ public class KinesisRecordSupplier implements RecordSupplier<String, String, Byt
this.fetchThreads = fetchThreads;
this.recordBufferSize = recordBufferSize;
this.useEarliestSequenceNumber = useEarliestSequenceNumber;
this.useListShards = useListShards;
this.backgroundFetchEnabled = fetchThreads > 0;
// the deaggregate function is implemented by the amazon-kinesis-client, whose license is not compatible with Apache.
@ -660,7 +666,41 @@ public class KinesisRecordSupplier implements RecordSupplier<String, String, Byt
return getSequenceNumber(partition, ShardIteratorType.TRIM_HORIZON);
}
public Set<Shard> getShards(String stream)
{
if (useListShards) {
return getShardsUsingListShards(stream);
}
return getShardsUsingDescribeStream(stream);
}
/**
* Default method to avoid incompatibility when user doesn't have sufficient IAM permissions on AWS
* Not advised. getShardsUsingListShards is recommended instead if sufficient permissions are present.
*
* @param stream name of stream
* @return Immutable set of shards
*/
private Set<Shard> getShardsUsingDescribeStream(String stream)
{
ImmutableSet.Builder<Shard> shards = ImmutableSet.builder();
DescribeStreamRequest describeRequest = new DescribeStreamRequest();
describeRequest.setStreamName(stream);
while (describeRequest != null) {
StreamDescription description = kinesis.describeStream(describeRequest).getStreamDescription();
List<Shard> shardResult = description.getShards();
shards.addAll(shardResult);
if (description.isHasMoreShards()) {
describeRequest.setExclusiveStartShardId(Iterables.getLast(shardResult).getShardId());
} else {
describeRequest = null;
}
}
return shards.build();
}
/**
* If the user has the IAM policy for listShards, and useListShards is true:
* Use the API listShards which is the recommended way instead of describeStream
* listShards can return 1000 shards per call and has a limit of 100TPS
* This makes the method resilient to LimitExceeded exceptions (compared to 100 shards, 10 TPS of describeStream)
@ -668,7 +708,7 @@ public class KinesisRecordSupplier implements RecordSupplier<String, String, Byt
* @param stream name of stream
* @return Immutable set of shards
*/
public Set<Shard> getShards(String stream)
private Set<Shard> getShardsUsingListShards(String stream)
{
ImmutableSet.Builder<Shard> shards = ImmutableSet.builder();
ListShardsRequest request = new ListShardsRequest().withStreamName(stream);

View File

@ -72,7 +72,8 @@ public class KinesisSamplerSpec extends SeekableStreamSamplerSpec
tuningConfig.getRecordBufferFullWait(),
tuningConfig.getFetchSequenceNumberTimeout(),
tuningConfig.getMaxRecordsPerPoll(),
ioConfig.isUseEarliestSequenceNumber()
ioConfig.isUseEarliestSequenceNumber(),
tuningConfig.isUseListShards()
);
}
}

View File

@ -184,6 +184,7 @@ public class KinesisSupervisor extends SeekableStreamSupervisor<String, String,
(KinesisIndexTaskTuningConfig) taskTuningConfig,
(KinesisIndexTaskIOConfig) taskIoConfig,
context,
spec.getSpec().getTuningConfig().isUseListShards(),
awsCredentialsConfig
));
}
@ -213,7 +214,8 @@ public class KinesisSupervisor extends SeekableStreamSupervisor<String, String,
taskTuningConfig.getRecordBufferFullWait(),
taskTuningConfig.getFetchSequenceNumberTimeout(),
taskTuningConfig.getMaxRecordsPerPoll(),
ioConfig.isUseEarliestSequenceNumber()
ioConfig.isUseEarliestSequenceNumber(),
spec.getSpec().getTuningConfig().isUseListShards()
);
}

View File

@ -41,6 +41,7 @@ public class KinesisSupervisorTuningConfig extends KinesisIndexTaskTuningConfig
private final Duration shutdownTimeout;
private final Duration repartitionTransitionDuration;
private final Duration offsetFetchPeriod;
private final boolean useListShards;
private final boolean skipIgnorableShards;
public static KinesisSupervisorTuningConfig defaultConfig()
@ -79,6 +80,7 @@ public class KinesisSupervisorTuningConfig extends KinesisIndexTaskTuningConfig
null,
null,
null,
null,
null
);
}
@ -117,6 +119,7 @@ public class KinesisSupervisorTuningConfig extends KinesisIndexTaskTuningConfig
@JsonProperty("intermediateHandoffPeriod") Period intermediateHandoffPeriod,
@JsonProperty("repartitionTransitionDuration") Period repartitionTransitionDuration,
@JsonProperty("offsetFetchPeriod") Period offsetFetchPeriod,
@JsonProperty("useListShards") Boolean useListShards,
@JsonProperty("skipIgnorableShards") Boolean skipIgnorableShards
)
{
@ -165,6 +168,7 @@ public class KinesisSupervisorTuningConfig extends KinesisIndexTaskTuningConfig
offsetFetchPeriod,
DEFAULT_OFFSET_FETCH_PERIOD
);
this.useListShards = (useListShards != null ? useListShards : false);
this.skipIgnorableShards = (skipIgnorableShards != null ? skipIgnorableShards : false);
}
@ -216,6 +220,12 @@ public class KinesisSupervisorTuningConfig extends KinesisIndexTaskTuningConfig
return offsetFetchPeriod;
}
@JsonProperty
public boolean isUseListShards()
{
return useListShards;
}
@JsonProperty
public boolean isSkipIgnorableShards()
{
@ -256,6 +266,7 @@ public class KinesisSupervisorTuningConfig extends KinesisIndexTaskTuningConfig
", maxRecordsPerPoll=" + getMaxRecordsPerPoll() +
", intermediateHandoffPeriod=" + getIntermediateHandoffPeriod() +
", repartitionTransitionDuration=" + getRepartitionTransitionDuration() +
", useListShards=" + isUseListShards() +
", skipIgnorableShards=" + isSkipIgnorableShards() +
'}';
}

View File

@ -117,6 +117,7 @@ public class KinesisIndexTaskSerdeTest
TUNING_CONFIG,
IO_CONFIG,
null,
false,
null
);
ObjectMapper objectMapper = createObjectMapper();

View File

@ -3188,6 +3188,7 @@ public class KinesisIndexTaskTest extends SeekableStreamIndexTaskTestBase
tuningConfig,
ioConfig,
context,
false,
awsCredentialsConfig
);
}

View File

@ -318,6 +318,7 @@ public class KinesisIndexTaskTuningConfigTest
null,
null,
null,
null,
null
);
KinesisIndexTaskTuningConfig copy = (KinesisIndexTaskTuningConfig) original.convertToTaskTuningConfig();

View File

@ -23,6 +23,8 @@ import com.amazonaws.AmazonClientException;
import com.amazonaws.AmazonServiceException;
import com.amazonaws.services.kinesis.AmazonKinesis;
import com.amazonaws.services.kinesis.AmazonKinesisClient;
import com.amazonaws.services.kinesis.model.DescribeStreamRequest;
import com.amazonaws.services.kinesis.model.DescribeStreamResult;
import com.amazonaws.services.kinesis.model.GetRecordsRequest;
import com.amazonaws.services.kinesis.model.GetRecordsResult;
import com.amazonaws.services.kinesis.model.GetShardIteratorResult;
@ -31,6 +33,7 @@ import com.amazonaws.services.kinesis.model.ListShardsResult;
import com.amazonaws.services.kinesis.model.Record;
import com.amazonaws.services.kinesis.model.Shard;
import com.amazonaws.services.kinesis.model.ShardIteratorType;
import com.amazonaws.services.kinesis.model.StreamDescription;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
@ -150,6 +153,10 @@ public class KinesisRecordSupplierTest extends EasyMockSupport
private static ListShardsResult listShardsResult1;
private static GetShardIteratorResult getShardIteratorResult0;
private static GetShardIteratorResult getShardIteratorResult1;
private static DescribeStreamResult describeStreamResult0;
private static DescribeStreamResult describeStreamResult1;
private static StreamDescription streamDescription0;
private static StreamDescription streamDescription1;
private static GetRecordsResult getRecordsResult0;
private static GetRecordsResult getRecordsResult1;
private static Shard shard0;
@ -162,6 +169,10 @@ public class KinesisRecordSupplierTest extends EasyMockSupport
kinesis = createMock(AmazonKinesisClient.class);
listShardsResult0 = createMock(ListShardsResult.class);
listShardsResult1 = createMock(ListShardsResult.class);
describeStreamResult0 = createMock(DescribeStreamResult.class);
describeStreamResult1 = createMock(DescribeStreamResult.class);
streamDescription0 = createMock(StreamDescription.class);
streamDescription1 = createMock(StreamDescription.class);
getShardIteratorResult0 = createMock(GetShardIteratorResult.class);
getShardIteratorResult1 = createMock(GetShardIteratorResult.class);
getRecordsResult0 = createMock(GetRecordsResult.class);
@ -181,7 +192,69 @@ public class KinesisRecordSupplierTest extends EasyMockSupport
}
@Test
public void testSupplierSetup()
public void testSupplierSetup_withoutListShards()
{
final Capture<DescribeStreamRequest> capturedRequest0 = Capture.newInstance();
final Capture<DescribeStreamRequest> capturedRequest1 = Capture.newInstance();
EasyMock.expect(kinesis.describeStream(EasyMock.capture(capturedRequest0))).andReturn(describeStreamResult0).once();
EasyMock.expect(describeStreamResult0.getStreamDescription()).andReturn(streamDescription0).once();
EasyMock.expect(streamDescription0.getShards()).andReturn(ImmutableList.of(shard0, shard1)).once();
EasyMock.expect(shard0.getShardId()).andReturn(SHARD_ID0).once();
EasyMock.expect(shard1.getShardId()).andReturn(SHARD_ID1).times(2);
EasyMock.expect(streamDescription0.isHasMoreShards()).andReturn(true).once();
EasyMock.expect(kinesis.describeStream(EasyMock.capture(capturedRequest1))).andReturn(describeStreamResult1).once();
EasyMock.expect(describeStreamResult1.getStreamDescription()).andReturn(streamDescription1).once();
EasyMock.expect(streamDescription1.getShards()).andReturn(ImmutableList.of()).once();
EasyMock.expect(streamDescription1.isHasMoreShards()).andReturn(false).once();
replayAll();
Set<StreamPartition<String>> partitions = ImmutableSet.of(
StreamPartition.of(STREAM, SHARD_ID0),
StreamPartition.of(STREAM, SHARD_ID1)
);
recordSupplier = new KinesisRecordSupplier(
kinesis,
recordsPerFetch,
0,
2,
false,
100,
5000,
5000,
60000,
5,
true,
false
);
Assert.assertTrue(recordSupplier.getAssignment().isEmpty());
recordSupplier.assign(partitions);
Assert.assertEquals(partitions, recordSupplier.getAssignment());
Assert.assertEquals(ImmutableSet.of(SHARD_ID0, SHARD_ID1), recordSupplier.getPartitionIds(STREAM));
// calling poll would start background fetch if seek was called, but will instead be skipped and the results
// empty
Assert.assertEquals(Collections.emptyList(), recordSupplier.poll(100));
verifyAll();
// Since the same request is modified, every captured argument will be the same at the end
Assert.assertEquals(capturedRequest0.getValues(), capturedRequest1.getValues());
final DescribeStreamRequest expectedRequest = new DescribeStreamRequest();
expectedRequest.setStreamName(STREAM);
expectedRequest.setExclusiveStartShardId(SHARD_ID1);
Assert.assertEquals(expectedRequest, capturedRequest1.getValue());
}
@Test
public void testSupplierSetup_withListShards()
{
final Capture<ListShardsRequest> capturedRequest0 = Capture.newInstance();
final Capture<ListShardsRequest> capturedRequest1 = Capture.newInstance();
@ -214,6 +287,7 @@ public class KinesisRecordSupplierTest extends EasyMockSupport
5000,
60000,
5,
true,
true
);
@ -308,7 +382,8 @@ public class KinesisRecordSupplierTest extends EasyMockSupport
5000,
60000,
100,
true
true,
false
);
recordSupplier.assign(partitions);
@ -396,7 +471,8 @@ public class KinesisRecordSupplierTest extends EasyMockSupport
5000,
60000,
100,
true
true,
false
);
recordSupplier.assign(partitions);
@ -457,7 +533,8 @@ public class KinesisRecordSupplierTest extends EasyMockSupport
5000,
60000,
100,
true
true,
false
);
recordSupplier.assign(partitions);
@ -536,7 +613,8 @@ public class KinesisRecordSupplierTest extends EasyMockSupport
5000,
60000,
100,
true
true,
false
);
recordSupplier.assign(partitions);
@ -604,7 +682,8 @@ public class KinesisRecordSupplierTest extends EasyMockSupport
5000,
60000,
100,
true
true,
false
);
recordSupplier.assign(partitions);
@ -639,7 +718,8 @@ public class KinesisRecordSupplierTest extends EasyMockSupport
5000,
60000,
5,
true
true,
false
);
recordSupplier.assign(partitions);
@ -702,7 +782,8 @@ public class KinesisRecordSupplierTest extends EasyMockSupport
5000,
60000,
1,
true
true,
false
);
recordSupplier.assign(partitions);
@ -795,7 +876,8 @@ public class KinesisRecordSupplierTest extends EasyMockSupport
5000,
60000,
100,
true
true,
false
);
recordSupplier.assign(partitions);
@ -869,7 +951,8 @@ public class KinesisRecordSupplierTest extends EasyMockSupport
5000,
1000,
100,
true
true,
false
);
Assert.assertEquals("0", recordSupplier.getLatestSequenceNumber(StreamPartition.of(STREAM, SHARD_ID0)));
@ -907,7 +990,8 @@ public class KinesisRecordSupplierTest extends EasyMockSupport
5000,
1000,
100,
true
true,
false
);
return recordSupplier;
}
@ -990,7 +1074,8 @@ public class KinesisRecordSupplierTest extends EasyMockSupport
5000,
60000,
100,
true
true,
false
);
recordSupplier.assign(partitions);
@ -1039,7 +1124,8 @@ public class KinesisRecordSupplierTest extends EasyMockSupport
5000,
60000,
5,
true
true,
false
);
Record record = new Record();

View File

@ -208,6 +208,7 @@ public class KinesisSupervisorTest extends EasyMockSupport
null,
null,
null,
null,
null
);
rowIngestionMetersFactory = new TestUtils().getRowIngestionMetersFactory();
@ -3946,6 +3947,7 @@ public class KinesisSupervisorTest extends EasyMockSupport
null,
null,
null,
null,
null
);
@ -5050,6 +5052,7 @@ public class KinesisSupervisorTest extends EasyMockSupport
null,
null,
null,
null,
null
);
@ -5465,6 +5468,7 @@ public class KinesisSupervisorTest extends EasyMockSupport
false
),
Collections.emptyMap(),
false,
null
);
}

View File

@ -29,9 +29,8 @@ import com.amazonaws.services.kinesis.model.AddTagsToStreamRequest;
import com.amazonaws.services.kinesis.model.AddTagsToStreamResult;
import com.amazonaws.services.kinesis.model.CreateStreamResult;
import com.amazonaws.services.kinesis.model.DeleteStreamResult;
import com.amazonaws.services.kinesis.model.DescribeStreamRequest;
import com.amazonaws.services.kinesis.model.DescribeStreamResult;
import com.amazonaws.services.kinesis.model.ListShardsRequest;
import com.amazonaws.services.kinesis.model.ListShardsResult;
import com.amazonaws.services.kinesis.model.ScalingType;
import com.amazonaws.services.kinesis.model.Shard;
import com.amazonaws.services.kinesis.model.StreamDescription;
@ -39,14 +38,17 @@ import com.amazonaws.services.kinesis.model.StreamStatus;
import com.amazonaws.services.kinesis.model.UpdateShardCountRequest;
import com.amazonaws.services.kinesis.model.UpdateShardCountResult;
import com.amazonaws.util.AwsHostNameUtils;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import org.apache.druid.java.util.common.ISE;
import java.io.FileInputStream;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.stream.Collectors;
public class KinesisAdminClient implements StreamAdminClient
{
@ -152,7 +154,23 @@ public class KinesisAdminClient implements StreamAdminClient
@Override
public int getStreamPartitionCount(String streamName)
{
return listShards(streamName).size();
Set<String> shardIds = new HashSet<>();
DescribeStreamRequest request = new DescribeStreamRequest();
request.setStreamName(streamName);
while (request != null) {
StreamDescription description = amazonKinesis.describeStream(request).getStreamDescription();
List<String> shardIdResult = description.getShards()
.stream()
.map(Shard::getShardId)
.collect(Collectors.toList());
shardIds.addAll(shardIdResult);
if (description.isHasMoreShards()) {
request.setExclusiveStartShardId(Iterables.getLast(shardIdResult));
} else {
request = null;
}
}
return shardIds.size();
}
@Override
@ -165,21 +183,6 @@ public class KinesisAdminClient implements StreamAdminClient
return actualShardCount == oldShardCount + newShardCount;
}
private Set<Shard> listShards(String streamName)
{
ListShardsRequest listShardsRequest = new ListShardsRequest().withStreamName(streamName);
ImmutableSet.Builder<Shard> shards = ImmutableSet.builder();
while (true) {
ListShardsResult listShardsResult = amazonKinesis.listShards(listShardsRequest);
shards.addAll(listShardsResult.getShards());
String nextToken = listShardsResult.getNextToken();
if (nextToken == null) {
return shards.build();
}
listShardsRequest = new ListShardsRequest().withNextToken(nextToken);
}
}
private boolean verifyStreamStatus(String streamName, StreamStatus... streamStatuses)
{
return Arrays.stream(streamStatuses)