diff --git a/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3Utils.java b/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3Utils.java index 087d6684c7f..0948f46096b 100644 --- a/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3Utils.java +++ b/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3Utils.java @@ -73,6 +73,10 @@ public class S3Utils if (e == null) { return false; } else if (e instanceof IOException) { + if (e.getCause() != null) { + // Recurse with the underlying cause to see if it's retriable. + return apply(e.getCause()); + } return true; } else if (e instanceof SdkClientException && e.getMessage().contains("Data read has a different length than the expected")) { diff --git a/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/S3DataSegmentPullerTest.java b/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/S3DataSegmentPullerTest.java index 4577d5ba980..55773f0f2a1 100644 --- a/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/S3DataSegmentPullerTest.java +++ b/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/S3DataSegmentPullerTest.java @@ -131,7 +131,7 @@ public class S3DataSegmentPullerTest } @Test - public void testGZUncompressRetries() throws IOException, SegmentLoadingException + public void testGZUncompressOn4xxError() throws IOException { final String bucket = "bucket"; final String keyPrefix = "prefix/dir/0"; @@ -165,6 +165,65 @@ public class S3DataSegmentPullerTest AmazonS3Exception exception = new AmazonS3Exception("S3DataSegmentPullerTest"); exception.setErrorCode("NoSuchKey"); exception.setStatusCode(404); + EasyMock.expect(s3Client.doesObjectExist(EasyMock.eq(object0.getBucketName()), EasyMock.eq(object0.getKey()))) + .andReturn(true) + .once(); + EasyMock.expect(s3Client.getObject(EasyMock.eq(bucket), EasyMock.eq(object0.getKey()))) + .andThrow(exception) + .once(); + S3DataSegmentPuller puller = new S3DataSegmentPuller(s3Client); + + EasyMock.replay(s3Client); + Assert.assertThrows( + SegmentLoadingException.class, + () -> puller.getSegmentFiles( + new CloudObjectLocation( + bucket, + object0.getKey() + ), tmpDir + ) + ); + EasyMock.verify(s3Client); + + File expected = new File(tmpDir, "renames-0"); + Assert.assertFalse(expected.exists()); + } + + @Test + public void testGZUncompressOn5xxError() throws IOException, SegmentLoadingException + { + final String bucket = "bucket"; + final String keyPrefix = "prefix/dir/0"; + final ServerSideEncryptingAmazonS3 s3Client = EasyMock.createStrictMock(ServerSideEncryptingAmazonS3.class); + final byte[] value = bucket.getBytes(StandardCharsets.UTF_8); + + final File tmpFile = temporaryFolder.newFile("gzTest.gz"); + + try (OutputStream outputStream = new GZIPOutputStream(new FileOutputStream(tmpFile))) { + outputStream.write(value); + } + + S3Object object0 = new S3Object(); + + object0.setBucketName(bucket); + object0.setKey(keyPrefix + "/renames-0.gz"); + object0.getObjectMetadata().setLastModified(new Date(0)); + object0.setObjectContent(new FileInputStream(tmpFile)); + + final S3ObjectSummary objectSummary = new S3ObjectSummary(); + objectSummary.setBucketName(bucket); + objectSummary.setKey(keyPrefix + "/renames-0.gz"); + objectSummary.setLastModified(new Date(0)); + + final ListObjectsV2Result listObjectsResult = new ListObjectsV2Result(); + listObjectsResult.setKeyCount(1); + listObjectsResult.getObjectSummaries().add(objectSummary); + + File tmpDir = temporaryFolder.newFolder("gzTestDir"); + + AmazonS3Exception exception = new AmazonS3Exception("S3DataSegmentPullerTest"); + exception.setErrorCode("Slow Down"); + exception.setStatusCode(503); EasyMock.expect(s3Client.doesObjectExist(EasyMock.eq(object0.getBucketName()), EasyMock.eq(object0.getKey()))) .andReturn(true) .once(); diff --git a/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/S3UtilsTest.java b/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/S3UtilsTest.java new file mode 100644 index 00000000000..6667d78a8c6 --- /dev/null +++ b/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/S3UtilsTest.java @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.storage.s3; + +import com.amazonaws.services.s3.model.AmazonS3Exception; +import org.junit.Assert; +import org.junit.Test; + +import java.io.IOException; +import java.util.concurrent.atomic.AtomicInteger; + +public class S3UtilsTest +{ + @Test + public void testRetryWithIOExceptions() + { + final int maxRetries = 3; + final AtomicInteger count = new AtomicInteger(); + Assert.assertThrows( + IOException.class, + () -> S3Utils.retryS3Operation( + () -> { + count.incrementAndGet(); + throw new IOException("hmm"); + }, + maxRetries + )); + Assert.assertEquals(maxRetries, count.get()); + } + + @Test + public void testRetryWith4XXErrors() + { + final AtomicInteger count = new AtomicInteger(); + Assert.assertThrows( + IOException.class, + () -> S3Utils.retryS3Operation( + () -> { + if (count.incrementAndGet() >= 2) { + return "hey"; + } else { + AmazonS3Exception s3Exception = new AmazonS3Exception("a 403 s3 exception"); + s3Exception.setStatusCode(403); + throw new IOException(s3Exception); + } + }, + 3 + )); + Assert.assertEquals(1, count.get()); + } + + @Test + public void testRetryWith5XXErrorsNotExceedingMaxRetries() throws Exception + { + final int maxRetries = 3; + final AtomicInteger count = new AtomicInteger(); + S3Utils.retryS3Operation( + () -> { + if (count.incrementAndGet() >= maxRetries) { + return "hey"; + } else { + AmazonS3Exception s3Exception = new AmazonS3Exception("a 5xx s3 exception"); + s3Exception.setStatusCode(500); + throw new IOException(s3Exception); + } + }, + maxRetries + ); + Assert.assertEquals(maxRetries, count.get()); + } + + @Test + public void testRetryWith5XXErrorsExceedingMaxRetries() + { + final int maxRetries = 3; + final AtomicInteger count = new AtomicInteger(); + Assert.assertThrows( + IOException.class, + () -> S3Utils.retryS3Operation( + () -> { + if (count.incrementAndGet() > maxRetries) { + return "hey"; + } else { + AmazonS3Exception s3Exception = new AmazonS3Exception("a 5xx s3 exception"); + s3Exception.setStatusCode(500); + throw new IOException(s3Exception); + } + }, + maxRetries + ) + ); + Assert.assertEquals(maxRetries, count.get()); + } +}