mirror of
https://github.com/apache/druid.git
synced 2025-02-20 00:47:40 +00:00
Update S3 retry logic to account for the underlying cause in case of IOException
(#15238)
* Update S3 retry logic based on the underlying cause in case of IOException. 4xx and other errors wrapped in IOException for instance aren't retriable. * Fix CI
This commit is contained in:
parent
65b69cded4
commit
63e3e9531d
@ -73,6 +73,10 @@ public class S3Utils
|
|||||||
if (e == null) {
|
if (e == null) {
|
||||||
return false;
|
return false;
|
||||||
} else if (e instanceof IOException) {
|
} else if (e instanceof IOException) {
|
||||||
|
if (e.getCause() != null) {
|
||||||
|
// Recurse with the underlying cause to see if it's retriable.
|
||||||
|
return apply(e.getCause());
|
||||||
|
}
|
||||||
return true;
|
return true;
|
||||||
} else if (e instanceof SdkClientException
|
} else if (e instanceof SdkClientException
|
||||||
&& e.getMessage().contains("Data read has a different length than the expected")) {
|
&& e.getMessage().contains("Data read has a different length than the expected")) {
|
||||||
|
@ -131,7 +131,7 @@ public class S3DataSegmentPullerTest
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testGZUncompressRetries() throws IOException, SegmentLoadingException
|
public void testGZUncompressOn4xxError() throws IOException
|
||||||
{
|
{
|
||||||
final String bucket = "bucket";
|
final String bucket = "bucket";
|
||||||
final String keyPrefix = "prefix/dir/0";
|
final String keyPrefix = "prefix/dir/0";
|
||||||
@ -165,6 +165,65 @@ public class S3DataSegmentPullerTest
|
|||||||
AmazonS3Exception exception = new AmazonS3Exception("S3DataSegmentPullerTest");
|
AmazonS3Exception exception = new AmazonS3Exception("S3DataSegmentPullerTest");
|
||||||
exception.setErrorCode("NoSuchKey");
|
exception.setErrorCode("NoSuchKey");
|
||||||
exception.setStatusCode(404);
|
exception.setStatusCode(404);
|
||||||
|
EasyMock.expect(s3Client.doesObjectExist(EasyMock.eq(object0.getBucketName()), EasyMock.eq(object0.getKey())))
|
||||||
|
.andReturn(true)
|
||||||
|
.once();
|
||||||
|
EasyMock.expect(s3Client.getObject(EasyMock.eq(bucket), EasyMock.eq(object0.getKey())))
|
||||||
|
.andThrow(exception)
|
||||||
|
.once();
|
||||||
|
S3DataSegmentPuller puller = new S3DataSegmentPuller(s3Client);
|
||||||
|
|
||||||
|
EasyMock.replay(s3Client);
|
||||||
|
Assert.assertThrows(
|
||||||
|
SegmentLoadingException.class,
|
||||||
|
() -> puller.getSegmentFiles(
|
||||||
|
new CloudObjectLocation(
|
||||||
|
bucket,
|
||||||
|
object0.getKey()
|
||||||
|
), tmpDir
|
||||||
|
)
|
||||||
|
);
|
||||||
|
EasyMock.verify(s3Client);
|
||||||
|
|
||||||
|
File expected = new File(tmpDir, "renames-0");
|
||||||
|
Assert.assertFalse(expected.exists());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testGZUncompressOn5xxError() throws IOException, SegmentLoadingException
|
||||||
|
{
|
||||||
|
final String bucket = "bucket";
|
||||||
|
final String keyPrefix = "prefix/dir/0";
|
||||||
|
final ServerSideEncryptingAmazonS3 s3Client = EasyMock.createStrictMock(ServerSideEncryptingAmazonS3.class);
|
||||||
|
final byte[] value = bucket.getBytes(StandardCharsets.UTF_8);
|
||||||
|
|
||||||
|
final File tmpFile = temporaryFolder.newFile("gzTest.gz");
|
||||||
|
|
||||||
|
try (OutputStream outputStream = new GZIPOutputStream(new FileOutputStream(tmpFile))) {
|
||||||
|
outputStream.write(value);
|
||||||
|
}
|
||||||
|
|
||||||
|
S3Object object0 = new S3Object();
|
||||||
|
|
||||||
|
object0.setBucketName(bucket);
|
||||||
|
object0.setKey(keyPrefix + "/renames-0.gz");
|
||||||
|
object0.getObjectMetadata().setLastModified(new Date(0));
|
||||||
|
object0.setObjectContent(new FileInputStream(tmpFile));
|
||||||
|
|
||||||
|
final S3ObjectSummary objectSummary = new S3ObjectSummary();
|
||||||
|
objectSummary.setBucketName(bucket);
|
||||||
|
objectSummary.setKey(keyPrefix + "/renames-0.gz");
|
||||||
|
objectSummary.setLastModified(new Date(0));
|
||||||
|
|
||||||
|
final ListObjectsV2Result listObjectsResult = new ListObjectsV2Result();
|
||||||
|
listObjectsResult.setKeyCount(1);
|
||||||
|
listObjectsResult.getObjectSummaries().add(objectSummary);
|
||||||
|
|
||||||
|
File tmpDir = temporaryFolder.newFolder("gzTestDir");
|
||||||
|
|
||||||
|
AmazonS3Exception exception = new AmazonS3Exception("S3DataSegmentPullerTest");
|
||||||
|
exception.setErrorCode("Slow Down");
|
||||||
|
exception.setStatusCode(503);
|
||||||
EasyMock.expect(s3Client.doesObjectExist(EasyMock.eq(object0.getBucketName()), EasyMock.eq(object0.getKey())))
|
EasyMock.expect(s3Client.doesObjectExist(EasyMock.eq(object0.getBucketName()), EasyMock.eq(object0.getKey())))
|
||||||
.andReturn(true)
|
.andReturn(true)
|
||||||
.once();
|
.once();
|
||||||
|
@ -0,0 +1,111 @@
|
|||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing,
|
||||||
|
* software distributed under the License is distributed on an
|
||||||
|
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||||
|
* KIND, either express or implied. See the License for the
|
||||||
|
* specific language governing permissions and limitations
|
||||||
|
* under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.druid.storage.s3;
|
||||||
|
|
||||||
|
import com.amazonaws.services.s3.model.AmazonS3Exception;
|
||||||
|
import org.junit.Assert;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
|
|
||||||
|
public class S3UtilsTest
|
||||||
|
{
|
||||||
|
@Test
|
||||||
|
public void testRetryWithIOExceptions()
|
||||||
|
{
|
||||||
|
final int maxRetries = 3;
|
||||||
|
final AtomicInteger count = new AtomicInteger();
|
||||||
|
Assert.assertThrows(
|
||||||
|
IOException.class,
|
||||||
|
() -> S3Utils.retryS3Operation(
|
||||||
|
() -> {
|
||||||
|
count.incrementAndGet();
|
||||||
|
throw new IOException("hmm");
|
||||||
|
},
|
||||||
|
maxRetries
|
||||||
|
));
|
||||||
|
Assert.assertEquals(maxRetries, count.get());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testRetryWith4XXErrors()
|
||||||
|
{
|
||||||
|
final AtomicInteger count = new AtomicInteger();
|
||||||
|
Assert.assertThrows(
|
||||||
|
IOException.class,
|
||||||
|
() -> S3Utils.retryS3Operation(
|
||||||
|
() -> {
|
||||||
|
if (count.incrementAndGet() >= 2) {
|
||||||
|
return "hey";
|
||||||
|
} else {
|
||||||
|
AmazonS3Exception s3Exception = new AmazonS3Exception("a 403 s3 exception");
|
||||||
|
s3Exception.setStatusCode(403);
|
||||||
|
throw new IOException(s3Exception);
|
||||||
|
}
|
||||||
|
},
|
||||||
|
3
|
||||||
|
));
|
||||||
|
Assert.assertEquals(1, count.get());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testRetryWith5XXErrorsNotExceedingMaxRetries() throws Exception
|
||||||
|
{
|
||||||
|
final int maxRetries = 3;
|
||||||
|
final AtomicInteger count = new AtomicInteger();
|
||||||
|
S3Utils.retryS3Operation(
|
||||||
|
() -> {
|
||||||
|
if (count.incrementAndGet() >= maxRetries) {
|
||||||
|
return "hey";
|
||||||
|
} else {
|
||||||
|
AmazonS3Exception s3Exception = new AmazonS3Exception("a 5xx s3 exception");
|
||||||
|
s3Exception.setStatusCode(500);
|
||||||
|
throw new IOException(s3Exception);
|
||||||
|
}
|
||||||
|
},
|
||||||
|
maxRetries
|
||||||
|
);
|
||||||
|
Assert.assertEquals(maxRetries, count.get());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testRetryWith5XXErrorsExceedingMaxRetries()
|
||||||
|
{
|
||||||
|
final int maxRetries = 3;
|
||||||
|
final AtomicInteger count = new AtomicInteger();
|
||||||
|
Assert.assertThrows(
|
||||||
|
IOException.class,
|
||||||
|
() -> S3Utils.retryS3Operation(
|
||||||
|
() -> {
|
||||||
|
if (count.incrementAndGet() > maxRetries) {
|
||||||
|
return "hey";
|
||||||
|
} else {
|
||||||
|
AmazonS3Exception s3Exception = new AmazonS3Exception("a 5xx s3 exception");
|
||||||
|
s3Exception.setStatusCode(500);
|
||||||
|
throw new IOException(s3Exception);
|
||||||
|
}
|
||||||
|
},
|
||||||
|
maxRetries
|
||||||
|
)
|
||||||
|
);
|
||||||
|
Assert.assertEquals(maxRetries, count.get());
|
||||||
|
}
|
||||||
|
}
|
Loading…
x
Reference in New Issue
Block a user