Fix kinesis ingestion bugs (#10761)

* add offsetFetchPeriod to kinesis ingestion doc

* Remove jackson dependencies from extensions

* Use fixed delay for lag collection

* Metrics reset after finishing processing

* comments

* Broaden the list of exceptions to retry for

* Unit tests

* Add more tests

* Refactoring

* re-order metrics

* Doc suggestions

Co-authored-by: Charles Smith <38529548+techdocsmith@users.noreply.github.com>

* Add tests

Co-authored-by: Charles Smith <38529548+techdocsmith@users.noreply.github.com>
Abhishek Agarwal 2021-02-05 16:19:58 +05:30 committed by GitHub
parent a3af2ad2bd
commit 96d26e5338
14 changed files with 410 additions and 87 deletions

View File

@@ -0,0 +1,59 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.common.aws;
import com.amazonaws.AmazonClientException;
import com.amazonaws.AmazonServiceException;
import com.amazonaws.retry.RetryUtils;
import java.io.IOException;
public class AWSClientUtil
{
/**
 * Checks whether an exception can be retried. The implementation is copied
 * from {@link com.amazonaws.retry.PredefinedRetryPolicies.SDKDefaultRetryCondition}, except that deprecated
 * methods have been replaced with their newer equivalents.
*/
public static boolean isClientExceptionRecoverable(AmazonClientException exception)
{
// Always retry on client exceptions caused by IOException
if (exception.getCause() instanceof IOException) {
return true;
}
// A special check carried forward from the previous implementation.
if (exception instanceof AmazonServiceException
&& "RequestTimeout".equals(((AmazonServiceException) exception).getErrorCode())) {
return true;
}
// This will retry for 5xx errors.
if (RetryUtils.isRetryableServiceException(exception)) {
return true;
}
if (RetryUtils.isThrottlingException(exception)) {
return true;
}
return RetryUtils.isClockSkewError(exception);
}
}
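
For context on how this predicate is meant to be consumed, here is a minimal retry-loop sketch built around `AWSClientUtil.isClientExceptionRecoverable`. The wrapper class, `MAX_TRIES`, and the backoff value are illustrative assumptions, not part of this change:

```java
// Hypothetical usage sketch (not Druid code): retry an AWS call only while
// AWSClientUtil reports the failure as recoverable.
import com.amazonaws.AmazonClientException;
import org.apache.druid.common.aws.AWSClientUtil;
import java.util.concurrent.Callable;

public class AwsRetrySketch
{
  private static final int MAX_TRIES = 3;        // assumed limit
  private static final long BACKOFF_MS = 1_000L; // assumed linear backoff

  static <T> T callWithRetries(Callable<T> call) throws Exception
  {
    AmazonClientException last = null;
    for (int attempt = 1; attempt <= MAX_TRIES; attempt++) {
      try {
        return call.call();
      }
      catch (AmazonClientException e) {
        if (!AWSClientUtil.isClientExceptionRecoverable(e)) {
          throw e; // non-recoverable (e.g. a 4xx error): fail fast
        }
        last = e;
        Thread.sleep(BACKOFF_MS * attempt);
      }
    }
    throw last;
  }
}
```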

View File

@@ -0,0 +1,91 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.common.aws;
import com.amazonaws.AmazonClientException;
import com.amazonaws.AmazonServiceException;
import org.junit.Assert;
import org.junit.Test;
import java.io.IOException;
public class AWSClientUtilTest
{
@Test
public void testRecoverableException_IOException()
{
Assert.assertTrue(AWSClientUtil.isClientExceptionRecoverable(new AmazonClientException(new IOException())));
}
@Test
public void testRecoverableException_RequestTimeout()
{
AmazonServiceException ex = new AmazonServiceException(null);
ex.setErrorCode("RequestTimeout");
Assert.assertTrue(AWSClientUtil.isClientExceptionRecoverable(ex));
}
@Test
public void testRecoverableException_500()
{
AmazonServiceException ex = new AmazonServiceException(null);
ex.setStatusCode(500);
Assert.assertTrue(AWSClientUtil.isClientExceptionRecoverable(ex));
}
@Test
public void testRecoverableException_502()
{
AmazonServiceException ex = new AmazonServiceException(null);
ex.setStatusCode(502);
Assert.assertTrue(AWSClientUtil.isClientExceptionRecoverable(ex));
}
@Test
public void testRecoverableException_503()
{
AmazonServiceException ex = new AmazonServiceException(null);
ex.setStatusCode(503);
Assert.assertTrue(AWSClientUtil.isClientExceptionRecoverable(ex));
}
@Test
public void testRecoverableException_ProvisionedThroughputExceededException()
{
AmazonServiceException ex = new AmazonServiceException(null);
ex.setErrorCode("ProvisionedThroughputExceededException");
Assert.assertTrue(AWSClientUtil.isClientExceptionRecoverable(ex));
}
@Test
public void testRecoverableException_ClockSkewedError()
{
AmazonServiceException ex = new AmazonServiceException(null);
ex.setErrorCode("RequestExpired");
Assert.assertTrue(AWSClientUtil.isClientExceptionRecoverable(ex));
}
@Test
public void testNonRecoverableException_RuntimeException()
{
AmazonClientException ex = new AmazonClientException(new RuntimeException());
Assert.assertFalse(AWSClientUtil.isClientExceptionRecoverable(ex));
}
}

View File

@@ -197,7 +197,7 @@ The tuningConfig is optional and default parameters will be used if no tuningCon
| `chatRetries` | Integer | The number of times HTTP requests to indexing tasks will be retried before considering tasks unresponsive. | no (default == 8) |
| `httpTimeout` | ISO8601 Period | How long to wait for a HTTP response from an indexing task. | no (default == PT10S) |
| `shutdownTimeout` | ISO8601 Period | How long to wait for the supervisor to attempt a graceful shutdown of tasks before exiting. | no (default == PT80S) |
| `offsetFetchPeriod` | ISO8601 Period | How often the supervisor queries Kafka and the indexing tasks to fetch current offsets and calculate lag. If the user-specified value is below the minimum value (`PT5S`), the supervisor ignores the value and uses the minimum value instead. | no (default == PT30S, min == PT5S) |
| `segmentWriteOutMediumFactory` | Object | Segment write-out medium to use when creating segments. See below for more information. | no (not specified by default, the value from `druid.peon.defaultSegmentWriteOutMediumFactory.type` is used) |
| `intermediateHandoffPeriod` | ISO8601 Period | How often the tasks should hand off segments. Handoff will happen either if `maxRowsPerSegment` or `maxTotalRows` is hit or every `intermediateHandoffPeriod`, whichever happens earlier. | no (default == P2147483647D) |
| `logParseExceptions` | Boolean | If true, log an error message when a parsing exception occurs, containing information about the row where the error occurred. | no, default == false |

View File

@@ -193,7 +193,7 @@ The tuningConfig is optional and default parameters will be used if no tuningCon
| `maxSavedParseExceptions` | Integer | When a parse exception occurs, Druid can keep track of the most recent parse exceptions. "maxSavedParseExceptions" limits how many exception instances will be saved. These saved exceptions will be made available after the task finishes in the [task completion report](../../ingestion/tasks.md#reports). Overridden if `reportParseExceptions` is set. | no, default == 0 |
| `maxRecordsPerPoll` | Integer | The maximum number of records/events to be fetched from buffer per poll. The actual maximum will be `Max(maxRecordsPerPoll, Max(bufferSize, 1))` | no, default == 100 |
| `repartitionTransitionDuration` | ISO8601 Period | When shards are split or merged, the supervisor will recompute shard -> task group mappings, and signal any running tasks created under the old mappings to stop early at (current time + `repartitionTransitionDuration`). Stopping the tasks early allows Druid to begin reading from the new shards more quickly. The repartition transition wait time controlled by this property gives the stream additional time to write records to the new shards after the split/merge, which helps avoid the issues with empty shard handling described at https://github.com/apache/druid/issues/7600. | no, (default == PT2M) |
| `offsetFetchPeriod` | ISO8601 Period | How often the supervisor queries Kinesis and the indexing tasks to fetch current offsets and calculate lag. If the user-specified value is below the minimum value (`PT5S`), the supervisor ignores the value and uses the minimum value instead. | no (default == PT30S, min == PT5S) |
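
The minimum-value behavior called out for `offsetFetchPeriod` amounts to a clamp against `PT5S`. The sketch below is an illustration of that rule only, not the supervisor's actual implementation; the class and method names are assumptions:

```java
// Illustrative clamp: values below the PT5S minimum fall back to the minimum.
import org.joda.time.Duration;
import org.joda.time.Period;

public class OffsetFetchPeriodSketch
{
  private static final Duration MINIMUM = Period.parse("PT5S").toStandardDuration();

  static Duration effectiveOffsetFetchPeriod(Period configured)
  {
    Duration requested = configured.toStandardDuration();
    return requested.isShorterThan(MINIMUM) ? MINIMUM : requested;
  }

  public static void main(String[] args)
  {
    // PT1S is below the minimum, so PT5S (5000 ms) is used instead.
    System.out.println(effectiveOffsetFetchPeriod(Period.parse("PT1S")).getMillis());
  }
}
```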
#### IndexSpec
@@ -471,4 +471,3 @@ with an assignment of closed shards that have been fully read and to ensure a ba
This window with early task shutdowns and possible task failures will conclude when:
- All closed shards have been fully read and the Kinesis ingestion tasks have published the data from those shards, committing the "closed" state to metadata storage
- Any remaining tasks that had inactive shards in the assignment have been shutdown (these tasks would have been created before the closed shards were completely drained)

View File

@@ -406,6 +406,7 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase
Assert.assertEquals(3, task.getRunner().getRowIngestionMeters().getProcessed());
Assert.assertEquals(0, task.getRunner().getRowIngestionMeters().getUnparseable());
Assert.assertEquals(0, task.getRunner().getRowIngestionMeters().getThrownAway());
Assert.assertNotEquals(-1, task.getRunner().getFireDepartmentMetrics().processingCompletionTime());
// Check published metadata and segments in deep storage
assertEqualsExceptVersion(
@@ -453,6 +454,7 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase
Assert.assertEquals(3, task.getRunner().getRowIngestionMeters().getProcessed());
Assert.assertEquals(0, task.getRunner().getRowIngestionMeters().getUnparseable());
Assert.assertEquals(0, task.getRunner().getRowIngestionMeters().getThrownAway());
Assert.assertNotEquals(-1, task.getRunner().getFireDepartmentMetrics().processingCompletionTime());
// Check published metadata and segments in deep storage
assertEqualsExceptVersion(
@@ -504,6 +506,7 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase
Assert.assertEquals(3, task.getRunner().getRowIngestionMeters().getProcessed());
Assert.assertEquals(0, task.getRunner().getRowIngestionMeters().getUnparseable());
Assert.assertEquals(0, task.getRunner().getRowIngestionMeters().getThrownAway());
Assert.assertNotEquals(-1, task.getRunner().getFireDepartmentMetrics().processingCompletionTime());
// Check published metadata and segments in deep storage
assertEqualsExceptVersion(

View File

@@ -19,7 +19,7 @@
package org.apache.druid.indexing.kinesis;
import com.amazonaws.AmazonClientException;
import com.amazonaws.ClientConfiguration;
import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.auth.STSAssumeRoleSessionCredentialsProvider;
@@ -46,6 +46,7 @@ import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.collect.Maps;
import com.google.common.collect.Queues;
import org.apache.druid.common.aws.AWSClientUtil;
import org.apache.druid.common.aws.AWSCredentialsConfig;
import org.apache.druid.common.aws.AWSCredentialsUtils;
import org.apache.druid.data.input.impl.ByteEntity;
@@ -62,7 +63,6 @@ import org.apache.druid.java.util.emitter.EmittingLogger;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.io.IOException;
import java.lang.invoke.MethodHandle;
import java.lang.invoke.MethodHandles;
import java.lang.reflect.Method;
@@ -107,14 +107,6 @@ public class KinesisRecordSupplier implements RecordSupplier<String, String, Byt
private static final int GET_SEQUENCE_NUMBER_RECORD_COUNT = 1000;
private static final int GET_SEQUENCE_NUMBER_RETRY_COUNT = 10;
private static boolean isServiceExceptionRecoverable(AmazonServiceException ex)
{
final boolean isIOException = ex.getCause() instanceof IOException;
final boolean isTimeout = "RequestTimeout".equals(ex.getErrorCode());
final boolean isInternalError = ex.getStatusCode() == 500 || ex.getStatusCode() == 503;
return isIOException || isTimeout || isInternalError;
}
/**
* Catch any exception and wrap it in a {@link StreamException}
*/
@@ -357,8 +349,8 @@ public class KinesisRecordSupplier implements RecordSupplier<String, String, Byt
log.error(e, "encounted AWS error while attempting to fetch records, will not retry"); log.error(e, "encounted AWS error while attempting to fetch records, will not retry");
throw e; throw e;
} }
catch (AmazonServiceException e) { catch (AmazonClientException e) {
if (isServiceExceptionRecoverable(e)) { if (AWSClientUtil.isClientExceptionRecoverable(e)) {
log.warn(e, "encounted unknown recoverable AWS exception, retrying in [%,dms]", EXCEPTION_RETRY_DELAY_MS); log.warn(e, "encounted unknown recoverable AWS exception, retrying in [%,dms]", EXCEPTION_RETRY_DELAY_MS);
scheduleBackgroundFetch(EXCEPTION_RETRY_DELAY_MS); scheduleBackgroundFetch(EXCEPTION_RETRY_DELAY_MS);
} else { } else {
@@ -750,6 +742,15 @@ public class KinesisRecordSupplier implements RecordSupplier<String, String, Byt
return partitionsFetchStarted.get();
}
@VisibleForTesting
public boolean isAnyFetchActive()
{
return partitionResources.values()
.stream()
.map(pr -> pr.currentFetch)
.anyMatch(fetch -> (fetch != null && !fetch.isDone()));
}
/**
* Check that a {@link PartitionResource} has been assigned to this record supplier, and if so call
* {@link PartitionResource#seek} to move it to the latest offsets. Note that this method does not restart background
@@ -803,9 +804,9 @@ public class KinesisRecordSupplier implements RecordSupplier<String, String, Byt
);
return true;
}
if (throwable instanceof AmazonClientException) {
AmazonClientException ase = (AmazonClientException) throwable;
return AWSClientUtil.isClientExceptionRecoverable(ase);
}
return false;
},

View File

@@ -19,6 +19,7 @@
package org.apache.druid.indexing.kinesis;
import com.amazonaws.AmazonClientException;
import com.amazonaws.AmazonServiceException;
import com.amazonaws.services.kinesis.AmazonKinesis;
import com.amazonaws.services.kinesis.AmazonKinesisClient;
@@ -29,6 +30,7 @@ import com.amazonaws.services.kinesis.model.GetRecordsResult;
import com.amazonaws.services.kinesis.model.GetShardIteratorResult;
import com.amazonaws.services.kinesis.model.Record;
import com.amazonaws.services.kinesis.model.Shard;
import com.amazonaws.services.kinesis.model.ShardIteratorType;
import com.amazonaws.services.kinesis.model.StreamDescription;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
@@ -47,6 +49,7 @@ import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Collections;
@@ -88,7 +91,7 @@ public class KinesisRecordSupplierTest extends EasyMockSupport
new Record().withData(jb("2012", "g", "y", "10", "20.0", "1.0")).withSequenceNumber("8"),
new Record().withData(jb("2011", "h", "y", "10", "20.0", "1.0")).withSequenceNumber("9")
);
private static final List<OrderedPartitionableRecord<String, String, ByteEntity>> ALL_RECORDS = ImmutableList.<OrderedPartitionableRecord<String, String, ByteEntity>>builder()
.addAll(SHARD0_RECORDS.stream()
.map(x -> new OrderedPartitionableRecord<>(
STREAM,
@@ -171,7 +174,9 @@ public class KinesisRecordSupplierTest extends EasyMockSupport
@After
public void tearDownTest()
{
if (null != recordSupplier) {
recordSupplier.close();
}
recordSupplier = null;
}
@@ -411,6 +416,68 @@ public class KinesisRecordSupplierTest extends EasyMockSupport
Assert.assertEquals(SHARDS_LAG_MILLIS, recordSupplier.getPartitionResourcesTimeLag());
}
@Test
public void testPollWithKinesisNonRetryableFailure() throws InterruptedException
{
recordsPerFetch = 100;
EasyMock.expect(kinesis.getShardIterator(
EasyMock.anyObject(),
EasyMock.eq(SHARD_ID0),
EasyMock.anyString(),
EasyMock.anyString()
)).andReturn(
getShardIteratorResult0).anyTimes();
AmazonServiceException getException = new AmazonServiceException("BadRequest");
getException.setErrorCode("BadRequest");
getException.setStatusCode(400);
getException.setServiceName("AmazonKinesis");
EasyMock.expect(getShardIteratorResult0.getShardIterator()).andReturn(SHARD0_ITERATOR).anyTimes();
EasyMock.expect(kinesis.getRecords(generateGetRecordsReq(SHARD0_ITERATOR, recordsPerFetch)))
.andThrow(getException)
.once();
replayAll();
Set<StreamPartition<String>> partitions = ImmutableSet.of(
StreamPartition.of(STREAM, SHARD_ID0)
);
recordSupplier = new KinesisRecordSupplier(
kinesis,
recordsPerFetch,
0,
1,
false,
100,
5000,
5000,
60000,
100,
true
);
recordSupplier.assign(partitions);
recordSupplier.seekToEarliest(partitions);
recordSupplier.start();
int count = 0;
while (recordSupplier.isAnyFetchActive() && count++ < 10) {
Thread.sleep(100);
}
Assert.assertFalse(recordSupplier.isAnyFetchActive());
List<OrderedPartitionableRecord<String, String, ByteEntity>> polledRecords = cleanRecords(recordSupplier.poll(
POLL_TIMEOUT_MILLIS));
verifyAll();
Assert.assertEquals(partitions, recordSupplier.getAssignment());
Assert.assertEquals(0, polledRecords.size());
}
@Test
public void testSeek()
throws InterruptedException
@@ -766,6 +833,47 @@ public class KinesisRecordSupplierTest extends EasyMockSupport
verifyAll();
}
@Test
public void getLatestSequenceNumberWhenKinesisRetryableException()
{
EasyMock.expect(kinesis.getShardIterator(
EasyMock.eq(STREAM),
EasyMock.eq(SHARD_ID0),
EasyMock.eq(ShardIteratorType.LATEST.toString())
)).andReturn(
getShardIteratorResult0).once();
EasyMock.expect(getShardIteratorResult0.getShardIterator()).andReturn(SHARD0_ITERATOR).once();
AmazonClientException ex = new AmazonClientException(new IOException());
EasyMock.expect(kinesis.getRecords(generateGetRecordsReq(SHARD0_ITERATOR, 1000)))
.andThrow(ex)
.andReturn(getRecordsResult0)
.once();
EasyMock.expect(getRecordsResult0.getRecords()).andReturn(SHARD0_RECORDS).once();
EasyMock.expect(getRecordsResult0.getNextShardIterator()).andReturn(null).once();
EasyMock.expect(getRecordsResult0.getMillisBehindLatest()).andReturn(0L).once();
replayAll();
recordSupplier = new KinesisRecordSupplier(
kinesis,
recordsPerFetch,
0,
2,
true,
100,
5000,
5000,
1000,
100,
true
);
Assert.assertEquals("0", recordSupplier.getLatestSequenceNumber(StreamPartition.of(STREAM, SHARD_ID0)));
}
private KinesisRecordSupplier getSequenceNumberWhenShardIsEmptyShouldReturnsNullHelper()
{
EasyMock.expect(kinesis.getShardIterator(

View File

@@ -19,6 +19,7 @@
package org.apache.druid.storage.s3;
import com.amazonaws.AmazonClientException;
import com.amazonaws.AmazonServiceException;
import com.amazonaws.services.s3.model.AmazonS3Exception;
import com.amazonaws.services.s3.model.S3Object;
@@ -28,6 +29,7 @@ import com.google.common.base.Strings;
import com.google.common.io.ByteSource;
import com.google.common.io.Files;
import com.google.inject.Inject;
import org.apache.druid.common.aws.AWSClientUtil;
import org.apache.druid.data.input.impl.CloudObjectLocation;
import org.apache.druid.java.util.common.FileUtils;
import org.apache.druid.java.util.common.IAE;
@@ -243,25 +245,7 @@ public class S3DataSegmentPuller implements URIDataPuller
@Override
public Predicate<Throwable> shouldRetryPredicate()
{
return S3Utils.S3RETRY;
return new Predicate<Throwable>()
{
@Override
public boolean apply(Throwable e)
{
if (e == null) {
return false;
}
if (e instanceof AmazonServiceException) {
return S3Utils.isServiceExceptionRecoverable((AmazonServiceException) e);
}
if (S3Utils.S3RETRY.apply(e)) {
return true;
}
// Look all the way down the cause chain, just in case something wraps it deep.
return apply(e.getCause());
}
};
}
/**
@@ -282,8 +266,8 @@ public class S3DataSegmentPuller implements URIDataPuller
S3Utils.getSingleObjectSummary(s3Client, coords.getBucket(), coords.getPath());
return StringUtils.format("%d", objectSummary.getLastModified().getTime());
}
catch (AmazonClientException e) {
if (AWSClientUtil.isClientExceptionRecoverable(e)) {
// The recoverable logic is always true for IOException, so we want to only pass IOException if it is recoverable
throw new IOE(e, "Could not fetch last modified timestamp from URI [%s]", uri);
} else {

View File

@@ -19,7 +19,7 @@
package org.apache.druid.storage.s3;
import com.amazonaws.AmazonClientException;
import com.amazonaws.services.s3.model.AccessControlList;
import com.amazonaws.services.s3.model.AmazonS3Exception;
import com.amazonaws.services.s3.model.CanonicalGrantee;
@@ -33,6 +33,7 @@ import com.amazonaws.services.s3.model.S3ObjectSummary;
import com.google.common.base.Joiner;
import com.google.common.base.Predicate;
import com.google.common.collect.ImmutableList;
import org.apache.druid.common.aws.AWSClientUtil;
import org.apache.druid.data.input.impl.CloudObjectLocation;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.RetryUtils;
@@ -56,15 +57,6 @@ public class S3Utils
private static final Joiner JOINER = Joiner.on("/").skipNulls();
private static final Logger log = new Logger(S3Utils.class);
static boolean isServiceExceptionRecoverable(AmazonServiceException ex)
{
final boolean isIOException = ex.getCause() instanceof IOException;
final boolean isTimeout = "RequestTimeout".equals(ex.getErrorCode());
final boolean badStatusCode = ex.getStatusCode() == 400 || ex.getStatusCode() == 403 || ex.getStatusCode() == 404;
return !badStatusCode && (isIOException || isTimeout);
}
public static final Predicate<Throwable> S3RETRY = new Predicate<Throwable>()
{
@Override
@@ -74,8 +66,8 @@ public class S3Utils
return false;
} else if (e instanceof IOException) {
return true;
} else if (e instanceof AmazonClientException) {
return AWSClientUtil.isClientExceptionRecoverable((AmazonClientException) e);
} else {
return apply(e.getCause());
}

View File

@@ -214,6 +214,8 @@ public abstract class SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
private RowIngestionMeters rowIngestionMeters;
@MonotonicNonNull
private ParseExceptionHandler parseExceptionHandler;
@MonotonicNonNull
private FireDepartmentMetrics fireDepartmentMetrics;
@MonotonicNonNull
private AuthorizerMapper authorizerMapper;
@@ -395,7 +397,7 @@ public abstract class SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
new RealtimeIOConfig(null, null),
null
);
this.fireDepartmentMetrics = fireDepartmentForMetrics.getMetrics();
toolbox.addMonitor(TaskRealtimeMetricsMonitorBuilder.build(task, fireDepartmentForMetrics, rowIngestionMeters));
final String lookupTier = task.getContextValue(RealtimeIndexTask.CTX_KEY_LOOKUP_TIER);
@@ -712,6 +714,12 @@ public abstract class SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
}
}
if (!stillReading) {
// We let the fireDepartmentMetrics know that all messages have been read. This way, some metrics such as
// high message gap need not be reported
fireDepartmentMetrics.markProcessingDone();
}
if (System.currentTimeMillis() > nextCheckpointTime) {
sequenceToCheckpoint = getLastSequenceMetadata();
}
@@ -1356,6 +1364,12 @@ public abstract class SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
return rowIngestionMeters;
}
@VisibleForTesting
public FireDepartmentMetrics getFireDepartmentMetrics()
{
return fireDepartmentMetrics;
}
public void stopForcefully()
{
log.info("Stopping forcefully (status: [%s])", status);

View File

@@ -3424,7 +3424,9 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
{
SeekableStreamSupervisorIOConfig ioConfig = spec.getIoConfig();
SeekableStreamSupervisorTuningConfig tuningConfig = spec.getTuningConfig();
// Lag is collected with a fixed delay rather than a fixed rate because lag collection can involve calling
// external services; a fixed delay guarantees a cool-down buffer between successive calls
reportingExec.scheduleWithFixedDelay(
this::updateCurrentAndLatestOffsets,
ioConfig.getStartDelay().getMillis() + INITIAL_GET_OFFSET_DELAY_MILLIS, // wait for tasks to start up
Math.max(
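
The comment above hinges on the difference between the two `ScheduledExecutorService` modes. A standalone sketch (not Druid code) of why a fixed delay guarantees an idle buffer between lag-collection calls, whereas a fixed rate does not:

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class SchedulingSketch
{
  public static void main(String[] args) throws InterruptedException
  {
    ScheduledExecutorService exec = Executors.newScheduledThreadPool(2);

    Runnable slowLagCollection = () -> {
      try {
        Thread.sleep(800); // pretend lag collection takes 800 ms
      }
      catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    };

    // Fixed rate: the next run is due 1s after the previous run STARTED,
    // so a slow run leaves only ~200 ms of idle time before the next call.
    exec.scheduleAtFixedRate(slowLagCollection, 0, 1, TimeUnit.SECONDS);

    // Fixed delay: the next run is due 1s after the previous run FINISHED,
    // so external services always get a full 1s breather between calls.
    exec.scheduleWithFixedDelay(slowLagCollection, 0, 1, TimeUnit.SECONDS);

    Thread.sleep(5_000); // let a few iterations run, then stop the sketch
    exec.shutdownNow();
  }
}
```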

View File

@@ -19,7 +19,7 @@
package org.apache.druid.segment.realtime;
import com.google.common.annotations.VisibleForTesting;
import java.util.concurrent.atomic.AtomicLong;
@@ -27,6 +27,8 @@ import java.util.concurrent.atomic.AtomicLong;
*/
public class FireDepartmentMetrics
{
private static final long DEFAULT_PROCESSING_COMPLETION_TIME = -1L;
private final AtomicLong processedCount = new AtomicLong(0);
private final AtomicLong processedWithErrorsCount = new AtomicLong(0);
private final AtomicLong thrownAwayCount = new AtomicLong(0);
@@ -45,6 +47,7 @@ public class FireDepartmentMetrics
private final AtomicLong sinkCount = new AtomicLong(0);
private final AtomicLong messageMaxTimestamp = new AtomicLong(0);
private final AtomicLong messageGap = new AtomicLong(0);
private final AtomicLong messageProcessingCompletionTime = new AtomicLong(DEFAULT_PROCESSING_COMPLETION_TIME);
public void incrementProcessed()
{
@@ -131,6 +134,23 @@ public class FireDepartmentMetrics
this.messageMaxTimestamp.set(Math.max(messageMaxTimestamp, this.messageMaxTimestamp.get()));
}
public void markProcessingDone()
{
markProcessingDone(System.currentTimeMillis());
}
@VisibleForTesting
void markProcessingDone(long timestamp)
{
this.messageProcessingCompletionTime.compareAndSet(DEFAULT_PROCESSING_COMPLETION_TIME, timestamp);
}
@VisibleForTesting
public long processingCompletionTime()
{
return messageProcessingCompletionTime.get();
}
public long processed()
{
return processedCount.get();
@@ -241,37 +261,9 @@ public class FireDepartmentMetrics
retVal.handOffCount.set(handOffCount.get());
retVal.sinkCount.set(sinkCount.get());
retVal.messageMaxTimestamp.set(messageMaxTimestamp.get());
retVal.messageProcessingCompletionTime.set(messageProcessingCompletionTime.get());
retVal.messageProcessingCompletionTime.compareAndSet(DEFAULT_PROCESSING_COMPLETION_TIME, System.currentTimeMillis());
retVal.messageGap.set(retVal.messageProcessingCompletionTime.get() - messageMaxTimestamp.get());
return retVal;
}
/**
* merge other FireDepartmentMetrics, will modify this object's data
*
* @return this object
*/
public FireDepartmentMetrics merge(FireDepartmentMetrics other)
{
Preconditions.checkNotNull(other, "Cannot merge a null FireDepartmentMetrics");
FireDepartmentMetrics otherSnapshot = other.snapshot();
processedCount.addAndGet(otherSnapshot.processed());
processedWithErrorsCount.addAndGet(otherSnapshot.processedWithErrors());
thrownAwayCount.addAndGet(otherSnapshot.thrownAway());
rowOutputCount.addAndGet(otherSnapshot.rowOutput());
unparseableCount.addAndGet(otherSnapshot.unparseable());
dedupCount.addAndGet(otherSnapshot.dedup());
numPersists.addAndGet(otherSnapshot.numPersists());
persistTimeMillis.addAndGet(otherSnapshot.persistTimeMillis());
persistBackPressureMillis.addAndGet(otherSnapshot.persistBackPressureMillis());
failedPersists.addAndGet(otherSnapshot.failedPersists());
failedHandoffs.addAndGet(otherSnapshot.failedHandoffs());
mergeTimeMillis.addAndGet(otherSnapshot.mergeTimeMillis());
mergeCpuTime.addAndGet(otherSnapshot.mergeCpuTime());
persistCpuTime.addAndGet(otherSnapshot.persistCpuTime());
handOffCount.addAndGet(otherSnapshot.handOffCount());
sinkCount.addAndGet(otherSnapshot.sinkCount());
messageMaxTimestamp.set(Math.max(messageMaxTimestamp(), otherSnapshot.messageMaxTimestamp()));
messageGap.set(Math.max(messageGap(), otherSnapshot.messageGap()));
return this;
}
}

View File

@@ -0,0 +1,75 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.segment.realtime;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
public class FireDepartmentMetricsTest
{
private FireDepartmentMetrics metrics;
@Before
public void setup()
{
metrics = new FireDepartmentMetrics();
}
@Test
public void testSnapshotAfterProcessingOver()
{
metrics.reportMessageMaxTimestamp(10);
metrics.markProcessingDone(30L);
Assert.assertEquals(20, metrics.snapshot().messageGap());
}
@Test
public void testSnapshotBeforeProcessingOver()
{
metrics.reportMessageMaxTimestamp(10);
long current = System.currentTimeMillis();
long messageGap = metrics.snapshot().messageGap();
Assert.assertTrue("Message gap: " + messageGap, messageGap >= (current - 10));
}
@Test
public void testProcessingOverAfterSnapshot()
{
metrics.reportMessageMaxTimestamp(10);
metrics.snapshot();
metrics.markProcessingDone(20);
Assert.assertEquals(10, metrics.snapshot().messageGap());
}
@Test
public void testProcessingOverWithSystemTime()
{
metrics.reportMessageMaxTimestamp(10);
long current = System.currentTimeMillis();
metrics.markProcessingDone();
long completionTime = metrics.processingCompletionTime();
Assert.assertTrue(
"Completion time: " + completionTime,
completionTime >= current && completionTime < (current + 10_000)
);
}
}

View File

@@ -83,6 +83,9 @@ public class PullDependencies implements Runnable
.put("commons-beanutils", "commons-beanutils")
.put("org.apache.commons", "commons-compress")
.put("org.apache.zookeeper", "zookeeper")
.put("com.fasterxml.jackson.core", "jackson-databind")
.put("com.fasterxml.jackson.core", "jackson-core")
.put("com.fasterxml.jackson.core", "jackson-annotations")
.build();
/*