From 96d26e53380aea78337ff098f239df29b7076382 Mon Sep 17 00:00:00 2001 From: Abhishek Agarwal <1477457+abhishekagarwal87@users.noreply.github.com> Date: Fri, 5 Feb 2021 16:19:58 +0530 Subject: [PATCH] Fix kinesis ingestion bugs (#10761) * add offsetFetchPeriod to kinesis ingestion doc * Remove jackson dependencies from extensions * Use fixed delay for lag collection * Metrics reset after finishing processing * comments * Broaden the list of exceptions to retry for * Unit tests * Add more tests * Refactoring * re-order metrics * Doc suggestions Co-authored-by: Charles Smith <38529548+techdocsmith@users.noreply.github.com> * Add tests Co-authored-by: Charles Smith <38529548+techdocsmith@users.noreply.github.com> --- .../druid/common/aws/AWSClientUtil.java | 59 +++++++++ .../druid/common/aws/AWSClientUtilTest.java | 91 ++++++++++++++ .../extensions-core/kafka-ingestion.md | 2 +- .../extensions-core/kinesis-ingestion.md | 3 +- .../indexing/kafka/KafkaIndexTaskTest.java | 3 + .../kinesis/KinesisRecordSupplier.java | 31 ++--- .../kinesis/KinesisRecordSupplierTest.java | 112 +++++++++++++++++- .../druid/storage/s3/S3DataSegmentPuller.java | 26 +--- .../org/apache/druid/storage/s3/S3Utils.java | 16 +-- .../SeekableStreamIndexTaskRunner.java | 16 ++- .../supervisor/SeekableStreamSupervisor.java | 4 +- .../realtime/FireDepartmentMetrics.java | 56 ++++----- .../realtime/FireDepartmentMetricsTest.java | 75 ++++++++++++ .../apache/druid/cli/PullDependencies.java | 3 + 14 files changed, 410 insertions(+), 87 deletions(-) create mode 100644 cloud/aws-common/src/main/java/org/apache/druid/common/aws/AWSClientUtil.java create mode 100644 cloud/aws-common/src/test/java/org/apache/druid/common/aws/AWSClientUtilTest.java create mode 100644 server/src/test/java/org/apache/druid/segment/realtime/FireDepartmentMetricsTest.java diff --git a/cloud/aws-common/src/main/java/org/apache/druid/common/aws/AWSClientUtil.java b/cloud/aws-common/src/main/java/org/apache/druid/common/aws/AWSClientUtil.java new file mode 100644 index 00000000000..c587ac6580a --- /dev/null +++ b/cloud/aws-common/src/main/java/org/apache/druid/common/aws/AWSClientUtil.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.common.aws; + +import com.amazonaws.AmazonClientException; +import com.amazonaws.AmazonServiceException; +import com.amazonaws.retry.RetryUtils; + +import java.io.IOException; + +public class AWSClientUtil +{ + /** + * Checks whether an exception can be retried or not. Implementation is copied + * from {@link com.amazonaws.retry.PredefinedRetryPolicies.SDKDefaultRetryCondition} except deprecated methods + * have been replaced with their recent versions. + */ + public static boolean isClientExceptionRecoverable(AmazonClientException exception) + { + // Always retry on client exceptions caused by IOException + if (exception.getCause() instanceof IOException) { + return true; + } + + // A special check carried forwarded from previous implementation. + if (exception instanceof AmazonServiceException + && "RequestTimeout".equals(((AmazonServiceException) exception).getErrorCode())) { + return true; + } + + // This will retry for 5xx errors. + if (RetryUtils.isRetryableServiceException(exception)) { + return true; + } + + if (RetryUtils.isThrottlingException(exception)) { + return true; + } + + return RetryUtils.isClockSkewError(exception); + } +} diff --git a/cloud/aws-common/src/test/java/org/apache/druid/common/aws/AWSClientUtilTest.java b/cloud/aws-common/src/test/java/org/apache/druid/common/aws/AWSClientUtilTest.java new file mode 100644 index 00000000000..bb1c4cdca79 --- /dev/null +++ b/cloud/aws-common/src/test/java/org/apache/druid/common/aws/AWSClientUtilTest.java @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.common.aws; + +import com.amazonaws.AmazonClientException; +import com.amazonaws.AmazonServiceException; +import org.junit.Assert; +import org.junit.Test; + +import java.io.IOException; + +public class AWSClientUtilTest +{ + @Test + public void testRecoverableException_IOException() + { + Assert.assertTrue(AWSClientUtil.isClientExceptionRecoverable(new AmazonClientException(new IOException()))); + } + + @Test + public void testRecoverableException_RequestTimeout() + { + AmazonServiceException ex = new AmazonServiceException(null); + ex.setErrorCode("RequestTimeout"); + Assert.assertTrue(AWSClientUtil.isClientExceptionRecoverable(ex)); + } + + @Test + public void testRecoverableException_500() + { + AmazonServiceException ex = new AmazonServiceException(null); + ex.setStatusCode(500); + Assert.assertTrue(AWSClientUtil.isClientExceptionRecoverable(ex)); + } + + @Test + public void testRecoverableException_502() + { + AmazonServiceException ex = new AmazonServiceException(null); + ex.setStatusCode(502); + Assert.assertTrue(AWSClientUtil.isClientExceptionRecoverable(ex)); + } + + @Test + public void testRecoverableException_503() + { + AmazonServiceException ex = new AmazonServiceException(null); + ex.setStatusCode(503); + Assert.assertTrue(AWSClientUtil.isClientExceptionRecoverable(ex)); + } + + @Test + public void testRecoverableException_ProvisionedThroughputExceededException() + { + AmazonServiceException ex = new AmazonServiceException(null); + ex.setErrorCode("ProvisionedThroughputExceededException"); + Assert.assertTrue(AWSClientUtil.isClientExceptionRecoverable(ex)); + } + + @Test + public void testRecoverableException_ClockSkewedError() + { + AmazonServiceException ex = new AmazonServiceException(null); + ex.setErrorCode("RequestExpired"); + Assert.assertTrue(AWSClientUtil.isClientExceptionRecoverable(ex)); + } + + @Test + public void testNonRecoverableException_RuntimeException() + { + AmazonClientException ex = new AmazonClientException(new RuntimeException()); + Assert.assertFalse(AWSClientUtil.isClientExceptionRecoverable(ex)); + } +} diff --git a/docs/development/extensions-core/kafka-ingestion.md b/docs/development/extensions-core/kafka-ingestion.md index 0917d0be492..7b94a99c253 100644 --- a/docs/development/extensions-core/kafka-ingestion.md +++ b/docs/development/extensions-core/kafka-ingestion.md @@ -197,7 +197,7 @@ The tuningConfig is optional and default parameters will be used if no tuningCon | `chatRetries` | Integer | The number of times HTTP requests to indexing tasks will be retried before considering tasks unresponsive. | no (default == 8) | | `httpTimeout` | ISO8601 Period | How long to wait for a HTTP response from an indexing task. | no (default == PT10S) | | `shutdownTimeout` | ISO8601 Period | How long to wait for the supervisor to attempt a graceful shutdown of tasks before exiting. | no (default == PT80S) | -| `offsetFetchPeriod` | ISO8601 Period | How often the supervisor queries Kafka and the indexing tasks to fetch current offsets and calculate lag. | no (default == PT30S, min == PT5S) | +| `offsetFetchPeriod` | ISO8601 Period | How often the supervisor queries Kafka and the indexing tasks to fetch current offsets and calculate lag. If the user-specified value is below the minimum value (`PT5S`), the supervisor ignores the value and uses the minimum value instead. | no (default == PT30S, min == PT5S) | | `segmentWriteOutMediumFactory` | Object | Segment write-out medium to use when creating segments. See below for more information. | no (not specified by default, the value from `druid.peon.defaultSegmentWriteOutMediumFactory.type` is used) | | `intermediateHandoffPeriod` | ISO8601 Period | How often the tasks should hand off segments. Handoff will happen either if `maxRowsPerSegment` or `maxTotalRows` is hit or every `intermediateHandoffPeriod`, whichever happens earlier. | no (default == P2147483647D) | | `logParseExceptions` | Boolean | If true, log an error message when a parsing exception occurs, containing information about the row where the error occurred. | no, default == false | diff --git a/docs/development/extensions-core/kinesis-ingestion.md b/docs/development/extensions-core/kinesis-ingestion.md index 620f1f9a5d1..03b5416ba1e 100644 --- a/docs/development/extensions-core/kinesis-ingestion.md +++ b/docs/development/extensions-core/kinesis-ingestion.md @@ -193,7 +193,7 @@ The tuningConfig is optional and default parameters will be used if no tuningCon | `maxSavedParseExceptions` | Integer | When a parse exception occurs, Druid can keep track of the most recent parse exceptions. "maxSavedParseExceptions" limits how many exception instances will be saved. These saved exceptions will be made available after the task finishes in the [task completion report](../../ingestion/tasks.md#reports). Overridden if `reportParseExceptions` is set. | no, default == 0 | | `maxRecordsPerPoll` | Integer | The maximum number of records/events to be fetched from buffer per poll. The actual maximum will be `Max(maxRecordsPerPoll, Max(bufferSize, 1))` | no, default == 100 | | `repartitionTransitionDuration` | ISO8601 Period | When shards are split or merged, the supervisor will recompute shard -> task group mappings, and signal any running tasks created under the old mappings to stop early at (current time + `repartitionTransitionDuration`). Stopping the tasks early allows Druid to begin reading from the new shards more quickly. The repartition transition wait time controlled by this property gives the stream additional time to write records to the new shards after the split/merge, which helps avoid the issues with empty shard handling described at https://github.com/apache/druid/issues/7600. | no, (default == PT2M) | -| `offsetFetchPeriod` | ISO8601 Period | How often the supervisor queries Kinesis and the indexing tasks to fetch current offsets and calculate lag. | no (default == PT30S, min == PT5S) | +| `offsetFetchPeriod` | ISO8601 Period | How often the supervisor queries Kinesis and the indexing tasks to fetch current offsets and calculate lag. If the user-specified value is below the minimum value (`PT5S`), the supervisor ignores the value and uses the minimum value instead. | no (default == PT30S, min == PT5S) | #### IndexSpec @@ -471,4 +471,3 @@ with an assignment of closed shards that have been fully read and to ensure a ba This window with early task shutdowns and possible task failures will conclude when: - All closed shards have been fully read and the Kinesis ingestion tasks have published the data from those shards, committing the "closed" state to metadata storage - Any remaining tasks that had inactive shards in the assignment have been shutdown (these tasks would have been created before the closed shards were completely drained) - diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java index 18faf2e1305..caf6ea8fd6f 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java @@ -406,6 +406,7 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase Assert.assertEquals(3, task.getRunner().getRowIngestionMeters().getProcessed()); Assert.assertEquals(0, task.getRunner().getRowIngestionMeters().getUnparseable()); Assert.assertEquals(0, task.getRunner().getRowIngestionMeters().getThrownAway()); + Assert.assertNotEquals(-1, task.getRunner().getFireDepartmentMetrics().processingCompletionTime()); // Check published metadata and segments in deep storage assertEqualsExceptVersion( @@ -453,6 +454,7 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase Assert.assertEquals(3, task.getRunner().getRowIngestionMeters().getProcessed()); Assert.assertEquals(0, task.getRunner().getRowIngestionMeters().getUnparseable()); Assert.assertEquals(0, task.getRunner().getRowIngestionMeters().getThrownAway()); + Assert.assertNotEquals(-1, task.getRunner().getFireDepartmentMetrics().processingCompletionTime()); // Check published metadata and segments in deep storage assertEqualsExceptVersion( @@ -504,6 +506,7 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase Assert.assertEquals(3, task.getRunner().getRowIngestionMeters().getProcessed()); Assert.assertEquals(0, task.getRunner().getRowIngestionMeters().getUnparseable()); Assert.assertEquals(0, task.getRunner().getRowIngestionMeters().getThrownAway()); + Assert.assertNotEquals(-1, task.getRunner().getFireDepartmentMetrics().processingCompletionTime()); // Check published metadata and segments in deep storage assertEqualsExceptVersion( diff --git a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplier.java b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplier.java index e90b464b45b..ff3b75170ea 100644 --- a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplier.java +++ b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplier.java @@ -19,7 +19,7 @@ package org.apache.druid.indexing.kinesis; -import com.amazonaws.AmazonServiceException; +import com.amazonaws.AmazonClientException; import com.amazonaws.ClientConfiguration; import com.amazonaws.auth.AWSCredentialsProvider; import com.amazonaws.auth.STSAssumeRoleSessionCredentialsProvider; @@ -46,6 +46,7 @@ import com.google.common.collect.ImmutableSet; import com.google.common.collect.Iterables; import com.google.common.collect.Maps; import com.google.common.collect.Queues; +import org.apache.druid.common.aws.AWSClientUtil; import org.apache.druid.common.aws.AWSCredentialsConfig; import org.apache.druid.common.aws.AWSCredentialsUtils; import org.apache.druid.data.input.impl.ByteEntity; @@ -62,7 +63,6 @@ import org.apache.druid.java.util.emitter.EmittingLogger; import javax.annotation.Nonnull; import javax.annotation.Nullable; -import java.io.IOException; import java.lang.invoke.MethodHandle; import java.lang.invoke.MethodHandles; import java.lang.reflect.Method; @@ -107,14 +107,6 @@ public class KinesisRecordSupplier implements RecordSupplier pr.currentFetch) + .anyMatch(fetch -> (fetch != null && !fetch.isDone())); + } + /** * Check that a {@link PartitionResource} has been assigned to this record supplier, and if so call * {@link PartitionResource#seek} to move it to the latest offsets. Note that this method does not restart background @@ -803,9 +804,9 @@ public class KinesisRecordSupplier implements RecordSupplier ALL_RECORDS = ImmutableList.builder() + private static final List> ALL_RECORDS = ImmutableList.>builder() .addAll(SHARD0_RECORDS.stream() .map(x -> new OrderedPartitionableRecord<>( STREAM, @@ -171,7 +174,9 @@ public class KinesisRecordSupplierTest extends EasyMockSupport @After public void tearDownTest() { - recordSupplier.close(); + if (null != recordSupplier) { + recordSupplier.close(); + } recordSupplier = null; } @@ -411,6 +416,68 @@ public class KinesisRecordSupplierTest extends EasyMockSupport Assert.assertEquals(SHARDS_LAG_MILLIS, recordSupplier.getPartitionResourcesTimeLag()); } + @Test + public void testPollWithKinesisNonRetryableFailure() throws InterruptedException + { + recordsPerFetch = 100; + + EasyMock.expect(kinesis.getShardIterator( + EasyMock.anyObject(), + EasyMock.eq(SHARD_ID0), + EasyMock.anyString(), + EasyMock.anyString() + )).andReturn( + getShardIteratorResult0).anyTimes(); + + AmazonServiceException getException = new AmazonServiceException("BadRequest"); + getException.setErrorCode("BadRequest"); + getException.setStatusCode(400); + getException.setServiceName("AmazonKinesis"); + EasyMock.expect(getShardIteratorResult0.getShardIterator()).andReturn(SHARD0_ITERATOR).anyTimes(); + EasyMock.expect(kinesis.getRecords(generateGetRecordsReq(SHARD0_ITERATOR, recordsPerFetch))) + .andThrow(getException) + .once(); + + replayAll(); + + Set> partitions = ImmutableSet.of( + StreamPartition.of(STREAM, SHARD_ID0) + ); + + + recordSupplier = new KinesisRecordSupplier( + kinesis, + recordsPerFetch, + 0, + 1, + false, + 100, + 5000, + 5000, + 60000, + 100, + true + ); + + recordSupplier.assign(partitions); + recordSupplier.seekToEarliest(partitions); + recordSupplier.start(); + + int count = 0; + while (recordSupplier.isAnyFetchActive() && count++ < 10) { + Thread.sleep(100); + } + Assert.assertFalse(recordSupplier.isAnyFetchActive()); + + List> polledRecords = cleanRecords(recordSupplier.poll( + POLL_TIMEOUT_MILLIS)); + + verifyAll(); + + Assert.assertEquals(partitions, recordSupplier.getAssignment()); + Assert.assertEquals(0, polledRecords.size()); + } + @Test public void testSeek() throws InterruptedException @@ -766,6 +833,47 @@ public class KinesisRecordSupplierTest extends EasyMockSupport verifyAll(); } + @Test + public void getLatestSequenceNumberWhenKinesisRetryableException() + { + EasyMock.expect(kinesis.getShardIterator( + EasyMock.eq(STREAM), + EasyMock.eq(SHARD_ID0), + EasyMock.eq(ShardIteratorType.LATEST.toString()) + )).andReturn( + getShardIteratorResult0).once(); + + EasyMock.expect(getShardIteratorResult0.getShardIterator()).andReturn(SHARD0_ITERATOR).once(); + + AmazonClientException ex = new AmazonClientException(new IOException()); + EasyMock.expect(kinesis.getRecords(generateGetRecordsReq(SHARD0_ITERATOR, 1000))) + .andThrow(ex) + .andReturn(getRecordsResult0) + .once(); + + EasyMock.expect(getRecordsResult0.getRecords()).andReturn(SHARD0_RECORDS).once(); + EasyMock.expect(getRecordsResult0.getNextShardIterator()).andReturn(null).once(); + EasyMock.expect(getRecordsResult0.getMillisBehindLatest()).andReturn(0L).once(); + + replayAll(); + + recordSupplier = new KinesisRecordSupplier( + kinesis, + recordsPerFetch, + 0, + 2, + true, + 100, + 5000, + 5000, + 1000, + 100, + true + ); + + Assert.assertEquals("0", recordSupplier.getLatestSequenceNumber(StreamPartition.of(STREAM, SHARD_ID0))); + } + private KinesisRecordSupplier getSequenceNumberWhenShardIsEmptyShouldReturnsNullHelper() { EasyMock.expect(kinesis.getShardIterator( diff --git a/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3DataSegmentPuller.java b/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3DataSegmentPuller.java index 9c413e7ec4d..73df48a17d7 100644 --- a/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3DataSegmentPuller.java +++ b/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3DataSegmentPuller.java @@ -19,6 +19,7 @@ package org.apache.druid.storage.s3; +import com.amazonaws.AmazonClientException; import com.amazonaws.AmazonServiceException; import com.amazonaws.services.s3.model.AmazonS3Exception; import com.amazonaws.services.s3.model.S3Object; @@ -28,6 +29,7 @@ import com.google.common.base.Strings; import com.google.common.io.ByteSource; import com.google.common.io.Files; import com.google.inject.Inject; +import org.apache.druid.common.aws.AWSClientUtil; import org.apache.druid.data.input.impl.CloudObjectLocation; import org.apache.druid.java.util.common.FileUtils; import org.apache.druid.java.util.common.IAE; @@ -243,25 +245,7 @@ public class S3DataSegmentPuller implements URIDataPuller @Override public Predicate shouldRetryPredicate() { - // Yay! smart retries! - return new Predicate() - { - @Override - public boolean apply(Throwable e) - { - if (e == null) { - return false; - } - if (e instanceof AmazonServiceException) { - return S3Utils.isServiceExceptionRecoverable((AmazonServiceException) e); - } - if (S3Utils.S3RETRY.apply(e)) { - return true; - } - // Look all the way down the cause chain, just in case something wraps it deep. - return apply(e.getCause()); - } - }; + return S3Utils.S3RETRY; } /** @@ -282,8 +266,8 @@ public class S3DataSegmentPuller implements URIDataPuller S3Utils.getSingleObjectSummary(s3Client, coords.getBucket(), coords.getPath()); return StringUtils.format("%d", objectSummary.getLastModified().getTime()); } - catch (AmazonServiceException e) { - if (S3Utils.isServiceExceptionRecoverable(e)) { + catch (AmazonClientException e) { + if (AWSClientUtil.isClientExceptionRecoverable(e)) { // The recoverable logic is always true for IOException, so we want to only pass IOException if it is recoverable throw new IOE(e, "Could not fetch last modified timestamp from URI [%s]", uri); } else { diff --git a/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3Utils.java b/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3Utils.java index 97fde22b03f..75c4e126c10 100644 --- a/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3Utils.java +++ b/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3Utils.java @@ -19,7 +19,7 @@ package org.apache.druid.storage.s3; -import com.amazonaws.AmazonServiceException; +import com.amazonaws.AmazonClientException; import com.amazonaws.services.s3.model.AccessControlList; import com.amazonaws.services.s3.model.AmazonS3Exception; import com.amazonaws.services.s3.model.CanonicalGrantee; @@ -33,6 +33,7 @@ import com.amazonaws.services.s3.model.S3ObjectSummary; import com.google.common.base.Joiner; import com.google.common.base.Predicate; import com.google.common.collect.ImmutableList; +import org.apache.druid.common.aws.AWSClientUtil; import org.apache.druid.data.input.impl.CloudObjectLocation; import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.RetryUtils; @@ -56,15 +57,6 @@ public class S3Utils private static final Joiner JOINER = Joiner.on("/").skipNulls(); private static final Logger log = new Logger(S3Utils.class); - - static boolean isServiceExceptionRecoverable(AmazonServiceException ex) - { - final boolean isIOException = ex.getCause() instanceof IOException; - final boolean isTimeout = "RequestTimeout".equals(ex.getErrorCode()); - final boolean badStatusCode = ex.getStatusCode() == 400 || ex.getStatusCode() == 403 || ex.getStatusCode() == 404; - return !badStatusCode && (isIOException || isTimeout); - } - public static final Predicate S3RETRY = new Predicate() { @Override @@ -74,8 +66,8 @@ public class S3Utils return false; } else if (e instanceof IOException) { return true; - } else if (e instanceof AmazonServiceException) { - return isServiceExceptionRecoverable((AmazonServiceException) e); + } else if (e instanceof AmazonClientException) { + return AWSClientUtil.isClientExceptionRecoverable((AmazonClientException) e); } else { return apply(e.getCause()); } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java index 5ca48523648..24fa360df9d 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java @@ -214,6 +214,8 @@ public abstract class SeekableStreamIndexTaskRunner nextCheckpointTime) { sequenceToCheckpoint = getLastSequenceMetadata(); } @@ -1356,6 +1364,12 @@ public abstract class SeekableStreamIndexTaskRunner= (current - 10)); + } + + @Test + public void testProcessingOverAfterSnapshot() + { + metrics.reportMessageMaxTimestamp(10); + metrics.snapshot(); + metrics.markProcessingDone(20); + Assert.assertEquals(10, metrics.snapshot().messageGap()); + } + + @Test + public void testProcessingOverWithSystemTime() + { + metrics.reportMessageMaxTimestamp(10); + long current = System.currentTimeMillis(); + metrics.markProcessingDone(); + long completionTime = metrics.processingCompletionTime(); + Assert.assertTrue( + "Completion time: " + completionTime, + completionTime >= current && completionTime < (current + 10_000) + ); + } + +} diff --git a/services/src/main/java/org/apache/druid/cli/PullDependencies.java b/services/src/main/java/org/apache/druid/cli/PullDependencies.java index 37a9f4b7598..f2ea7f1b684 100644 --- a/services/src/main/java/org/apache/druid/cli/PullDependencies.java +++ b/services/src/main/java/org/apache/druid/cli/PullDependencies.java @@ -83,6 +83,9 @@ public class PullDependencies implements Runnable .put("commons-beanutils", "commons-beanutils") .put("org.apache.commons", "commons-compress") .put("org.apache.zookeeper", "zookeeper") + .put("com.fasterxml.jackson.core", "jackson-databind") + .put("com.fasterxml.jackson.core", "jackson-core") + .put("com.fasterxml.jackson.core", "jackson-annotations") .build(); /*