Retry s3 operations on non-wrapped IOExceptions

Can happen if we get a socket related mishap while fetching an s3 object.
This commit is contained in:
Gian Merlino 2013-07-16 08:25:19 -07:00
parent 4201d9ff24
commit 6258d77398
2 changed files with 22 additions and 16 deletions

View File

@ -21,15 +21,10 @@ package com.metamx.druid.common.s3;
import com.google.common.base.Throwables;
import com.metamx.common.logger.Logger;
import org.jets3t.service.S3ServiceException;
import org.jets3t.service.ServiceException;
import org.jets3t.service.impl.rest.httpclient.RestS3Service;
import org.jets3t.service.model.S3Bucket;
import org.jets3t.service.model.S3Object;
import java.io.File;
import java.io.IOException;
import java.security.NoSuchAlgorithmException;
import java.util.Random;
import java.util.concurrent.Callable;
@ -62,25 +57,23 @@ public class S3Utils
{
int nTry = 0;
final int maxTries = 3;
final long baseSleepMillis = 1000;
final double fuzziness = 0.2;
while (true) {
try {
nTry++;
return f.call();
}
catch (IOException e) {
if (nTry <= maxTries) {
awaitNextRetry(e, nTry);
} else {
throw Throwables.propagate(e);
}
}
catch (ServiceException e) {
if (nTry <= maxTries &&
(e.getCause() instanceof IOException ||
(e.getErrorCode() != null && e.getErrorCode().equals("RequestTimeout")))) {
// Retryable
final long sleepMillis = Math.max(
baseSleepMillis,
(long) (baseSleepMillis * Math.pow(2, nTry) *
(1 + new Random().nextGaussian() * fuzziness))
);
log.info(e, "S3 fail on try %d/%d, retrying in %,dms.", nTry, maxTries, sleepMillis);
Thread.sleep(sleepMillis);
awaitNextRetry(e, nTry);
} else {
throw e;
}
@ -90,4 +83,17 @@ public class S3Utils
}
}
}
private static void awaitNextRetry(Exception e, int nTry) throws InterruptedException
{
final long baseSleepMillis = 1000;
final double fuzziness = 0.2;
final long sleepMillis = Math.max(
baseSleepMillis,
(long) (baseSleepMillis * Math.pow(2, nTry) *
(1 + new Random().nextGaussian() * fuzziness))
);
log.info(e, "S3 fail on try %d, retrying in %,dms.", nTry, sleepMillis);
Thread.sleep(sleepMillis);
}
}

View File

@ -112,7 +112,7 @@ public class S3DataSegmentPuller implements DataSegmentPuller
}
catch (IOException e) {
FileUtils.deleteDirectory(outDir);
throw new SegmentLoadingException(e, "Problem decompressing object[%s]", s3Obj);
throw new IOException(String.format("Problem decompressing object[%s]", s3Obj), e);
}
finally {
Closeables.closeQuietly(in);