diff --git a/indexing-common/src/main/java/com/metamx/druid/common/s3/S3Utils.java b/indexing-common/src/main/java/com/metamx/druid/common/s3/S3Utils.java
index 079f46676ca..dee3bf320f5 100644
--- a/indexing-common/src/main/java/com/metamx/druid/common/s3/S3Utils.java
+++ b/indexing-common/src/main/java/com/metamx/druid/common/s3/S3Utils.java
@@ -19,8 +19,10 @@
 
 package com.metamx.druid.common.s3;
 
+import com.google.common.base.Throwables;
 import com.metamx.common.logger.Logger;
 import org.jets3t.service.S3ServiceException;
+import org.jets3t.service.ServiceException;
 import org.jets3t.service.impl.rest.httpclient.RestS3Service;
 import org.jets3t.service.model.S3Bucket;
 import org.jets3t.service.model.S3Object;
@@ -28,6 +30,8 @@ import org.jets3t.service.model.S3Object;
 import java.io.File;
 import java.io.IOException;
 import java.security.NoSuchAlgorithmException;
+import java.util.Random;
+import java.util.concurrent.Callable;
 
 /**
  *
@@ -80,4 +84,41 @@ public class S3Utils
     }
   }
+
+  /**
+   * Retries S3 operations that fail due to io-related exceptions. Service-level exceptions (access denied, file not
+   * found, etc) are not retried.
+   */
+  public static <T> T retryS3Operation(Callable<T> f) throws ServiceException, InterruptedException
+  {
+    int nTry = 0;
+    final int maxTries = 3;
+    final long baseSleepMillis = 1000;
+    final double fuzziness = 0.2;
+    while (true) {
+      try {
+        nTry++;
+        return f.call();
+      }
+      catch (ServiceException e) {
+        if (nTry <= maxTries &&
+            (e.getCause() instanceof IOException ||
+             (e.getErrorCode() != null && e.getErrorCode().equals("RequestTimeout")))) {
+          // Retryable
+          final long sleepMillis = Math.max(
+              baseSleepMillis,
+              (long) (baseSleepMillis * Math.pow(2, nTry) *
+                      (1 + new Random().nextGaussian() * fuzziness))
+          );
+          log.info(e, "S3 fail on try %d/%d, retrying in %,dms.", nTry, maxTries, sleepMillis);
+          Thread.sleep(sleepMillis);
+        } else {
+          throw e;
+        }
+      }
+      catch (Exception e) {
+        throw Throwables.propagate(e);
+      }
+    }
+  }
 
 }
diff --git a/server/src/main/java/com/metamx/druid/loading/S3DataSegmentPuller.java b/server/src/main/java/com/metamx/druid/loading/S3DataSegmentPuller.java
index 210decc1178..59100b5fdeb 100644
--- a/server/src/main/java/com/metamx/druid/loading/S3DataSegmentPuller.java
+++ b/server/src/main/java/com/metamx/druid/loading/S3DataSegmentPuller.java
@@ -83,7 +83,7 @@ public class S3DataSegmentPuller implements DataSegmentPuller
     }
 
     try {
-      retryS3Operation(
+      S3Utils.retryS3Operation(
          new Callable<Void>()
          {
            @Override
@@ -140,7 +140,7 @@ public class S3DataSegmentPuller implements DataSegmentPuller
   private boolean isObjectInBucket(final S3Coords coords) throws SegmentLoadingException
   {
     try {
-      return retryS3Operation(
+      return S3Utils.retryS3Operation(
          new Callable<Boolean>()
          {
            @Override
@@ -164,7 +164,7 @@ public class S3DataSegmentPuller implements DataSegmentPuller
   {
     final S3Coords coords = new S3Coords(segment);
     try {
-      final StorageObject objDetails = retryS3Operation(
+      final StorageObject objDetails = S3Utils.retryS3Operation(
          new Callable<StorageObject>()
          {
            @Override
@@ -184,39 +184,6 @@ public class S3DataSegmentPuller implements DataSegmentPuller
     }
   }
 
-  private <T> T retryS3Operation(Callable<T> f) throws ServiceException, InterruptedException
-  {
-    int nTry = 0;
-    final int maxTries = 3;
-    final long baseSleepMillis = 1000;
-    final double fuzziness = 0.2;
-    while (true) {
-      try {
-        nTry++;
-        return f.call();
-      }
-      catch (ServiceException e) {
-        if (nTry <= maxTries &&
-            (e.getCause() instanceof IOException ||
-             (e.getErrorCode() != null && e.getErrorCode().equals("RequestTimeout")))) {
-          // Retryable
-          final long sleepMillis = Math.max(
-              baseSleepMillis,
-              (long) (baseSleepMillis * Math.pow(2, nTry) *
-                      (1 + new Random().nextGaussian() * fuzziness))
-          );
-          log.info(e, "S3 fail on try %d/%d, retrying in %,dms.", nTry, maxTries, sleepMillis);
-          Thread.sleep(sleepMillis);
-        } else {
-          throw e;
-        }
-      }
-      catch (Exception e) {
-        throw Throwables.propagate(e);
-      }
-    }
-  }
-
   private static class S3Coords
   {
     String bucket;