diff --git a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplier.java b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplier.java index 65ae1f62ba0..23e5bec573f 100644 --- a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplier.java +++ b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplier.java @@ -110,7 +110,8 @@ public class KinesisRecordSupplier implements RecordSupplier { final boolean isIOException = ex.getCause() instanceof IOException; final boolean isTimeout = "RequestTimeout".equals(ex.getErrorCode()); - return isIOException || isTimeout; + final boolean isInternalError = ex.getStatusCode() == 500 || ex.getStatusCode() == 503; + return isIOException || isTimeout || isInternalError; } /** @@ -809,6 +810,10 @@ public class KinesisRecordSupplier implements RecordSupplier ); return true; } + if (throwable instanceof AmazonServiceException) { + AmazonServiceException ase = (AmazonServiceException) throwable; + return isServiceExceptionRecoverable(ase); + } return false; }, GET_SEQUENCE_NUMBER_RETRY_COUNT diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplierTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplierTest.java index 6b2f32f6cdd..881750de2a7 100644 --- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplierTest.java +++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplierTest.java @@ -19,6 +19,7 @@ package org.apache.druid.indexing.kinesis; +import com.amazonaws.AmazonServiceException; import com.amazonaws.services.kinesis.AmazonKinesis; import com.amazonaws.services.kinesis.AmazonKinesisClient; import com.amazonaws.services.kinesis.model.DescribeStreamRequest; @@ -316,6 +317,94 @@ public class KinesisRecordSupplierTest extends EasyMockSupport Assert.assertEquals(SHARDS_LAG_MILLIS, recordSupplier.getPartitionResourcesTimeLag()); } + @Test + public void testPollWithKinesisInternalFailure() throws InterruptedException + { + recordsPerFetch = 100; + + EasyMock.expect(kinesis.getShardIterator( + EasyMock.anyObject(), + EasyMock.eq(SHARD_ID0), + EasyMock.anyString(), + EasyMock.anyString() + )).andReturn( + getShardIteratorResult0).anyTimes(); + + EasyMock.expect(kinesis.getShardIterator( + EasyMock.anyObject(), + EasyMock.eq(SHARD_ID1), + EasyMock.anyString(), + EasyMock.anyString() + )).andReturn( + getShardIteratorResult1).anyTimes(); + + EasyMock.expect(getShardIteratorResult0.getShardIterator()).andReturn(SHARD0_ITERATOR).anyTimes(); + EasyMock.expect(getShardIteratorResult1.getShardIterator()).andReturn(SHARD1_ITERATOR).anyTimes(); + EasyMock.expect(kinesis.getRecords(generateGetRecordsReq(SHARD0_ITERATOR, recordsPerFetch))) + .andReturn(getRecordsResult0) + .anyTimes(); + EasyMock.expect(kinesis.getRecords(generateGetRecordsReq(SHARD1_ITERATOR, recordsPerFetch))) + .andReturn(getRecordsResult1) + .anyTimes(); + AmazonServiceException getException = new AmazonServiceException("InternalFailure"); + getException.setErrorCode("InternalFailure"); + getException.setStatusCode(500); + getException.setServiceName("AmazonKinesis"); + EasyMock.expect(getRecordsResult0.getRecords()).andThrow(getException).once(); + EasyMock.expect(getRecordsResult0.getRecords()).andReturn(SHARD0_RECORDS).once(); + AmazonServiceException getException2 = new AmazonServiceException("InternalFailure"); + getException2.setErrorCode("InternalFailure"); + getException2.setStatusCode(503); + getException2.setServiceName("AmazonKinesis"); + EasyMock.expect(getRecordsResult1.getRecords()).andThrow(getException2).once(); + EasyMock.expect(getRecordsResult1.getRecords()).andReturn(SHARD1_RECORDS).once(); + EasyMock.expect(getRecordsResult0.getNextShardIterator()).andReturn(null).anyTimes(); + EasyMock.expect(getRecordsResult1.getNextShardIterator()).andReturn(null).anyTimes(); + EasyMock.expect(getRecordsResult0.getMillisBehindLatest()).andReturn(SHARD0_LAG_MILLIS).once(); + EasyMock.expect(getRecordsResult0.getMillisBehindLatest()).andReturn(SHARD0_LAG_MILLIS).once(); + EasyMock.expect(getRecordsResult1.getMillisBehindLatest()).andReturn(SHARD1_LAG_MILLIS).once(); + EasyMock.expect(getRecordsResult1.getMillisBehindLatest()).andReturn(SHARD1_LAG_MILLIS).once(); + + replayAll(); + + Set> partitions = ImmutableSet.of( + StreamPartition.of(STREAM, SHARD_ID0), + StreamPartition.of(STREAM, SHARD_ID1) + ); + + + recordSupplier = new KinesisRecordSupplier( + kinesis, + recordsPerFetch, + 0, + 2, + false, + 100, + 5000, + 5000, + 60000, + 100, + true + ); + + recordSupplier.assign(partitions); + recordSupplier.seekToEarliest(partitions); + recordSupplier.start(); + + while (recordSupplier.bufferSize() < 14) { + Thread.sleep(100); + } + + List> polledRecords = cleanRecords(recordSupplier.poll( + POLL_TIMEOUT_MILLIS)); + + verifyAll(); + + Assert.assertEquals(partitions, recordSupplier.getAssignment()); + Assert.assertTrue(polledRecords.containsAll(ALL_RECORDS)); + Assert.assertEquals(SHARDS_LAG_MILLIS, recordSupplier.getPartitionResourcesTimeLag()); + } + @Test public void testSeek() throws InterruptedException