retry when killing s3 based segments (#14776)

### Description

s3 deleteObjects request sent when killing s3 based segments now being retried, if failure is retry-able.
This commit is contained in:
zachjsh 2023-08-10 14:04:16 -04:00 committed by GitHub
parent 37ed0f4a17
commit 23306c4d80
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 136 additions and 3 deletions

View File

@ -22,11 +22,50 @@ package org.apache.druid.common.aws;
import com.amazonaws.AmazonClientException;
import com.amazonaws.AmazonServiceException;
import com.amazonaws.retry.RetryUtils;
import com.amazonaws.services.s3.model.DeleteObjectsRequest;
import com.amazonaws.services.s3.model.MultiObjectDeleteException;
import com.google.common.collect.ImmutableSet;
import java.io.IOException;
import java.util.Set;
public class AWSClientUtil
{
/**
* This list of error code come from {@link RetryUtils}, and
* <a href="https://docs.aws.amazon.com/AmazonS3/latest/API/ErrorResponses.html">...</a>. At the moment, aws sdk
* does not expose a good way of retrying
* {@link com.amazonaws.services.s3.AmazonS3#deleteObjects(DeleteObjectsRequest)} requests. This request is used in
* org.apache.druid.storage.s3.S3DataSegmentKiller to delete a batch of segments from deep storage.
*/
private static final Set<String> RECOVERABLE_ERROR_CODES = ImmutableSet.of(
"503 SlowDown",
"AuthFailure",
"BandwidthLimitExceeded",
"EC2ThrottledException",
"IDPCommunicationError",
"InternalError",
"InvalidSignatureException",
"PriorRequestNotComplete",
"ProvisionedThroughputExceededException",
"RequestExpired",
"RequestInTheFuture",
"RequestLimitExceeded",
"RequestThrottled",
"RequestThrottledException",
"RequestTimeTooSkewed",
"RequestTimeout",
"RequestTimeoutException",
"ServiceUnavailable",
"SignatureDoesNotMatch",
"SlowDown",
"ThrottledException",
"ThrottlingException",
"TooManyRequestsException",
"TransactionInProgressException",
"Throttling"
);
/**
* Checks whether an exception can be retried or not. Implementation is copied
* from {@link com.amazonaws.retry.PredefinedRetryPolicies.SDKDefaultRetryCondition} except deprecated methods
@ -54,6 +93,19 @@ public class AWSClientUtil
return true;
}
return RetryUtils.isClockSkewError(exception);
if (RetryUtils.isClockSkewError(exception)) {
return true;
}
if (exception instanceof MultiObjectDeleteException) {
MultiObjectDeleteException multiObjectDeleteException = (MultiObjectDeleteException) exception;
for (MultiObjectDeleteException.DeleteError error : multiObjectDeleteException.getErrors()) {
if (RECOVERABLE_ERROR_CODES.contains(error.getCode())) {
return true;
}
}
}
return false;
}
}

View File

@ -21,6 +21,8 @@ package org.apache.druid.common.aws;
import com.amazonaws.AmazonClientException;
import com.amazonaws.AmazonServiceException;
import com.amazonaws.services.s3.model.MultiObjectDeleteException;
import com.google.common.collect.ImmutableList;
import org.junit.Assert;
import org.junit.Test;
@ -82,6 +84,20 @@ public class AWSClientUtilTest
Assert.assertTrue(AWSClientUtil.isClientExceptionRecoverable(ex));
}
@Test
public void testRecoverableException_MultiObjectDeleteException()
{
MultiObjectDeleteException.DeleteError retryableError = new MultiObjectDeleteException.DeleteError();
retryableError.setCode("RequestLimitExceeded");
MultiObjectDeleteException.DeleteError nonRetryableError = new MultiObjectDeleteException.DeleteError();
nonRetryableError.setCode("nonRetryableError");
MultiObjectDeleteException ex = new MultiObjectDeleteException(
ImmutableList.of(retryableError, nonRetryableError),
ImmutableList.of()
);
Assert.assertTrue(AWSClientUtil.isClientExceptionRecoverable(ex));
}
@Test
public void testNonRecoverableException_RuntimeException()
{

View File

@ -28,6 +28,7 @@ import com.google.common.collect.Lists;
import com.google.inject.Inject;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.MapUtils;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.segment.loading.DataSegmentKiller;
import org.apache.druid.segment.loading.SegmentLoadingException;
@ -50,6 +51,8 @@ public class S3DataSegmentKiller implements DataSegmentKiller
// AWS has max limit of 1000 objects that can be requested to be deleted at a time.
private static final int MAX_MULTI_OBJECT_DELETE_SIZE = 1000;
private static final String MULTI_OBJECT_DELETE_EXEPTION_ERROR_FORMAT = "message: [%s], code: [%s]";
/**
* Any implementation of DataSegmentKiller is initialized when an ingestion job starts if the extension is loaded,
* even when the implementation of DataSegmentKiller is not used. As a result, if we have a s3 client instead
@ -150,13 +153,23 @@ public class S3DataSegmentKiller implements DataSegmentKiller
s3Bucket,
keysToDeleteStrings
);
s3Client.deleteObjects(deleteObjectsRequest);
S3Utils.retryS3Operation(
() -> {
s3Client.deleteObjects(deleteObjectsRequest);
return null;
},
3
);
}
catch (MultiObjectDeleteException e) {
hadException = true;
Map<String, List<String>> errorToKeys = new HashMap<>();
for (MultiObjectDeleteException.DeleteError error : e.getErrors()) {
errorToKeys.computeIfAbsent(error.getMessage(), k -> new ArrayList<>()).add(error.getKey());
errorToKeys.computeIfAbsent(StringUtils.format(
MULTI_OBJECT_DELETE_EXEPTION_ERROR_FORMAT,
error.getMessage(),
error.getCode()
), k -> new ArrayList<>()).add(error.getKey());
}
errorToKeys.forEach((key, value) -> log.error(
"Unable to delete from bucket [%s], the following keys [%s], because [%s]",
@ -173,6 +186,14 @@ public class S3DataSegmentKiller implements DataSegmentKiller
chunkOfKeys.stream().map(DeleteObjectsRequest.KeyVersion::getKey).collect(Collectors.joining(", "))
);
}
catch (Exception e) {
hadException = true;
log.noStackTrace().warn(e,
"Unexpected exception occurred when deleting from bucket [%s], the following keys [%s]",
s3Bucket,
chunkOfKeys.stream().map(DeleteObjectsRequest.KeyVersion::getKey).collect(Collectors.joining(", "))
);
}
}
return hadException;
}

View File

@ -19,6 +19,7 @@
package org.apache.druid.storage.s3;
import com.amazonaws.AbortedException;
import com.amazonaws.AmazonServiceException;
import com.amazonaws.SdkClientException;
import com.amazonaws.services.s3.model.DeleteObjectsRequest;
@ -405,4 +406,47 @@ public class S3DataSegmentKillerTest extends EasyMockSupport
);
Assert.assertEquals("Couldn't delete segments from S3. See the task logs for more details.", thrown.getMessage());
}
@Test
public void test_kill_listOfSegments_retryableExceptionThrown() throws SegmentLoadingException
{
DeleteObjectsRequest deleteObjectsRequest = new DeleteObjectsRequest(TEST_BUCKET);
deleteObjectsRequest.withKeys(KEY_1_PATH, KEY_1_PATH);
s3Client.deleteObjects(EasyMock.anyObject(DeleteObjectsRequest.class));
MultiObjectDeleteException.DeleteError retryableError = new MultiObjectDeleteException.DeleteError();
retryableError.setCode("RequestLimitExceeded");
MultiObjectDeleteException.DeleteError nonRetryableError = new MultiObjectDeleteException.DeleteError();
nonRetryableError.setCode("nonRetryableError");
EasyMock.expectLastCall()
.andThrow(new MultiObjectDeleteException(
ImmutableList.of(retryableError, nonRetryableError),
ImmutableList.of()
))
.once();
EasyMock.expectLastCall().andVoid().times(2);
EasyMock.replay(s3Client, segmentPusherConfig, inputDataConfig);
segmentKiller = new S3DataSegmentKiller(Suppliers.ofInstance(s3Client), segmentPusherConfig, inputDataConfig);
segmentKiller.kill(ImmutableList.of(DATA_SEGMENT_1, DATA_SEGMENT_1));
}
@Test
public void test_kill_listOfSegments_unexpectedExceptionIsThrown()
{
DeleteObjectsRequest deleteObjectsRequest = new DeleteObjectsRequest(TEST_BUCKET);
deleteObjectsRequest.withKeys(KEY_1_PATH, KEY_2_PATH);
// struggled with the idea of making it match on equaling this
s3Client.deleteObjects(EasyMock.anyObject(DeleteObjectsRequest.class));
EasyMock.expectLastCall().andThrow(new AbortedException("")).once();
EasyMock.replay(s3Client, segmentPusherConfig, inputDataConfig);
segmentKiller = new S3DataSegmentKiller(Suppliers.ofInstance(s3Client), segmentPusherConfig, inputDataConfig);
SegmentLoadingException thrown = Assert.assertThrows(
SegmentLoadingException.class,
() -> segmentKiller.kill(ImmutableList.of(DATA_SEGMENT_1, DATA_SEGMENT_2))
);
Assert.assertEquals("Couldn't delete segments from S3. See the task logs for more details.", thrown.getMessage());
}
}