retry 500 and 503 errors against kinesis (#10059)

* retry 500 and 503 errors against kinesis

* add test that exercises retry logic

* more branch coverage

* retry 500 and 503 on getRecords request when fetching sequence number

Co-authored-by: Harshpreet Singh <hrshpr@twitch.tv>
Harshpreet Singh 2020-06-23 15:49:34 -07:00 committed by GitHub
parent 0470fcc9da
commit d96aa1586a
2 changed files with 95 additions and 1 deletion
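
As context for the diff below, here is a minimal standalone sketch of the recoverability rule the commit message describes, assuming only the AWS SDK's AmazonServiceException accessors. The class name and main method are illustrative stand-ins, not part of the Druid change; the real logic lives in KinesisRecordSupplier.isServiceExceptionRecoverable, shown in the first hunk.

import com.amazonaws.AmazonServiceException;

import java.io.IOException;

public class RecoverableKinesisErrorSketch
{
  // Besides IOException causes and "RequestTimeout" error codes, HTTP 500/503
  // status codes from Kinesis are now treated as recoverable.
  static boolean isRecoverable(AmazonServiceException ex)
  {
    final boolean isIOException = ex.getCause() instanceof IOException;
    final boolean isTimeout = "RequestTimeout".equals(ex.getErrorCode());
    final boolean isInternalError = ex.getStatusCode() == 500 || ex.getStatusCode() == 503;
    return isIOException || isTimeout || isInternalError;
  }

  public static void main(String[] args)
  {
    // A 503 (e.g. ServiceUnavailable) is retried after this change; before it was fatal.
    AmazonServiceException serviceUnavailable = new AmazonServiceException("ServiceUnavailable");
    serviceUnavailable.setStatusCode(503);
    System.out.println(isRecoverable(serviceUnavailable)); // prints "true"
  }
}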

KinesisRecordSupplier.java

@@ -110,7 +110,8 @@ public class KinesisRecordSupplier implements RecordSupplier<String, String>
{
final boolean isIOException = ex.getCause() instanceof IOException;
final boolean isTimeout = "RequestTimeout".equals(ex.getErrorCode());
return isIOException || isTimeout;
final boolean isInternalError = ex.getStatusCode() == 500 || ex.getStatusCode() == 503;
return isIOException || isTimeout || isInternalError;
}
/**
@@ -809,6 +810,10 @@ public class KinesisRecordSupplier implements RecordSupplier<String, String>
);
return true;
}
if (throwable instanceof AmazonServiceException) {
AmazonServiceException ase = (AmazonServiceException) throwable;
return isServiceExceptionRecoverable(ase);
}
return false;
},
GET_SEQUENCE_NUMBER_RETRY_COUNT
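
The hunk above ends mid-call: the lambda is the retry predicate for the sequence-number lookup, and GET_SEQUENCE_NUMBER_RETRY_COUNT is the maximum attempt count passed alongside it. A hedged sketch of that call shape, with fetchSequenceNumberOnce as a hypothetical stand-in for the getRecords-backed fetch and a hand-rolled loop in place of Druid's retry utility:

import com.amazonaws.AmazonServiceException;

import java.io.IOException;
import java.util.concurrent.Callable;

class SequenceNumberRetrySketch
{
  static final int GET_SEQUENCE_NUMBER_RETRY_COUNT = 10; // illustrative value only

  // Retries the fetch while the failure is recoverable (HTTP 500/503, RequestTimeout,
  // or an IOException cause), mirroring the predicate added in the hunk above.
  static String getSequenceNumberWithRetries(Callable<String> fetchSequenceNumberOnce) throws Exception
  {
    AmazonServiceException lastFailure = null;
    for (int attempt = 0; attempt < GET_SEQUENCE_NUMBER_RETRY_COUNT; attempt++) {
      try {
        return fetchSequenceNumberOnce.call();
      }
      catch (AmazonServiceException e) {
        final boolean recoverable = e.getStatusCode() == 500 || e.getStatusCode() == 503
                                    || "RequestTimeout".equals(e.getErrorCode())
                                    || e.getCause() instanceof IOException;
        if (!recoverable) {
          throw e;
        }
        lastFailure = e;
      }
    }
    throw lastFailure;
  }
}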

KinesisRecordSupplierTest.java

@@ -19,6 +19,7 @@
package org.apache.druid.indexing.kinesis;
import com.amazonaws.AmazonServiceException;
import com.amazonaws.services.kinesis.AmazonKinesis;
import com.amazonaws.services.kinesis.AmazonKinesisClient;
import com.amazonaws.services.kinesis.model.DescribeStreamRequest;
@@ -316,6 +317,94 @@ public class KinesisRecordSupplierTest extends EasyMockSupport
Assert.assertEquals(SHARDS_LAG_MILLIS, recordSupplier.getPartitionResourcesTimeLag());
}
@Test
public void testPollWithKinesisInternalFailure() throws InterruptedException
{
recordsPerFetch = 100;
EasyMock.expect(kinesis.getShardIterator(
EasyMock.anyObject(),
EasyMock.eq(SHARD_ID0),
EasyMock.anyString(),
EasyMock.anyString()
)).andReturn(
getShardIteratorResult0).anyTimes();
EasyMock.expect(kinesis.getShardIterator(
EasyMock.anyObject(),
EasyMock.eq(SHARD_ID1),
EasyMock.anyString(),
EasyMock.anyString()
)).andReturn(
getShardIteratorResult1).anyTimes();
EasyMock.expect(getShardIteratorResult0.getShardIterator()).andReturn(SHARD0_ITERATOR).anyTimes();
EasyMock.expect(getShardIteratorResult1.getShardIterator()).andReturn(SHARD1_ITERATOR).anyTimes();
EasyMock.expect(kinesis.getRecords(generateGetRecordsReq(SHARD0_ITERATOR, recordsPerFetch)))
.andReturn(getRecordsResult0)
.anyTimes();
EasyMock.expect(kinesis.getRecords(generateGetRecordsReq(SHARD1_ITERATOR, recordsPerFetch)))
.andReturn(getRecordsResult1)
.anyTimes();
AmazonServiceException getException = new AmazonServiceException("InternalFailure");
getException.setErrorCode("InternalFailure");
getException.setStatusCode(500);
getException.setServiceName("AmazonKinesis");
EasyMock.expect(getRecordsResult0.getRecords()).andThrow(getException).once();
EasyMock.expect(getRecordsResult0.getRecords()).andReturn(SHARD0_RECORDS).once();
AmazonServiceException getException2 = new AmazonServiceException("InternalFailure");
getException2.setErrorCode("InternalFailure");
getException2.setStatusCode(503);
getException2.setServiceName("AmazonKinesis");
EasyMock.expect(getRecordsResult1.getRecords()).andThrow(getException2).once();
EasyMock.expect(getRecordsResult1.getRecords()).andReturn(SHARD1_RECORDS).once();
EasyMock.expect(getRecordsResult0.getNextShardIterator()).andReturn(null).anyTimes();
EasyMock.expect(getRecordsResult1.getNextShardIterator()).andReturn(null).anyTimes();
EasyMock.expect(getRecordsResult0.getMillisBehindLatest()).andReturn(SHARD0_LAG_MILLIS).once();
EasyMock.expect(getRecordsResult0.getMillisBehindLatest()).andReturn(SHARD0_LAG_MILLIS).once();
EasyMock.expect(getRecordsResult1.getMillisBehindLatest()).andReturn(SHARD1_LAG_MILLIS).once();
EasyMock.expect(getRecordsResult1.getMillisBehindLatest()).andReturn(SHARD1_LAG_MILLIS).once();
replayAll();
Set<StreamPartition<String>> partitions = ImmutableSet.of(
StreamPartition.of(STREAM, SHARD_ID0),
StreamPartition.of(STREAM, SHARD_ID1)
);
recordSupplier = new KinesisRecordSupplier(
kinesis,
recordsPerFetch,
0,
2,
false,
100,
5000,
5000,
60000,
100,
true
);
recordSupplier.assign(partitions);
recordSupplier.seekToEarliest(partitions);
recordSupplier.start();
while (recordSupplier.bufferSize() < 14) {
Thread.sleep(100);
}
List<OrderedPartitionableRecord<String, String>> polledRecords = cleanRecords(recordSupplier.poll(
POLL_TIMEOUT_MILLIS));
verifyAll();
Assert.assertEquals(partitions, recordSupplier.getAssignment());
Assert.assertTrue(polledRecords.containsAll(ALL_RECORDS));
Assert.assertEquals(SHARDS_LAG_MILLIS, recordSupplier.getPartitionResourcesTimeLag());
}
@Test
public void testSeek()
throws InterruptedException