From 945bc9a3707d4d9036c83d4d3d4932f7a361c6f9 Mon Sep 17 00:00:00 2001 From: Gian Merlino Date: Tue, 16 Jul 2013 08:02:49 -0700 Subject: [PATCH] Limited retries on s3 segment puller exceptions Useful for reducing spurious alerts due to general s3 flakiness. --- .../druid/loading/S3DataSegmentPuller.java | 148 +++++++++++++----- 1 file changed, 109 insertions(+), 39 deletions(-) diff --git a/server/src/main/java/com/metamx/druid/loading/S3DataSegmentPuller.java b/server/src/main/java/com/metamx/druid/loading/S3DataSegmentPuller.java index 011e1633ca1..210decc1178 100644 --- a/server/src/main/java/com/metamx/druid/loading/S3DataSegmentPuller.java +++ b/server/src/main/java/com/metamx/druid/loading/S3DataSegmentPuller.java @@ -19,6 +19,7 @@ package com.metamx.druid.loading; +import com.google.common.base.Throwables; import com.google.common.io.ByteStreams; import com.google.common.io.Closeables; import com.google.common.io.Files; @@ -30,16 +31,17 @@ import com.metamx.druid.client.DataSegment; import com.metamx.druid.common.s3.S3Utils; import com.metamx.druid.utils.CompressionUtils; import org.apache.commons.io.FileUtils; -import org.jets3t.service.S3ServiceException; import org.jets3t.service.ServiceException; import org.jets3t.service.impl.rest.httpclient.RestS3Service; -import org.jets3t.service.model.S3Bucket; import org.jets3t.service.model.S3Object; +import org.jets3t.service.model.StorageObject; import java.io.File; import java.io.IOException; import java.io.InputStream; import java.util.Map; +import java.util.Random; +import java.util.concurrent.Callable; import java.util.zip.GZIPInputStream; /** @@ -62,9 +64,9 @@ public class S3DataSegmentPuller implements DataSegmentPuller } @Override - public void getSegmentFiles(DataSegment segment, File outDir) throws SegmentLoadingException + public void getSegmentFiles(final DataSegment segment, final File outDir) throws SegmentLoadingException { - S3Coords s3Coords = new S3Coords(segment); + final S3Coords s3Coords = 
new S3Coords(segment); log.info("Pulling index at path[%s] to outDir[%s]", s3Coords, outDir); @@ -80,41 +82,52 @@ public class S3DataSegmentPuller implements DataSegmentPuller throw new ISE("outDir[%s] must be a directory.", outDir); } - long startTime = System.currentTimeMillis(); - S3Object s3Obj = null; - try { - s3Obj = s3Client.getObject(new S3Bucket(s3Coords.bucket), s3Coords.path); + retryS3Operation( + new Callable<Void>() + { + @Override + public Void call() throws Exception + { + long startTime = System.currentTimeMillis(); + S3Object s3Obj = null; - InputStream in = null; - try { - in = s3Obj.getDataInputStream(); - final String key = s3Obj.getKey(); - if (key.endsWith(".zip")) { - CompressionUtils.unzip(in, outDir); - } else if (key.endsWith(".gz")) { - final File outFile = new File(outDir, toFilename(key, ".gz")); - ByteStreams.copy(new GZIPInputStream(in), Files.newOutputStreamSupplier(outFile)); - } else { - ByteStreams.copy(in, Files.newOutputStreamSupplier(new File(outDir, toFilename(key, "")))); - } - log.info("Pull of file[%s] completed in %,d millis", s3Obj, System.currentTimeMillis() - startTime); - } - catch (IOException e) { - FileUtils.deleteDirectory(outDir); - throw new SegmentLoadingException(e, "Problem decompressing object[%s]", s3Obj); - } - finally { - Closeables.closeQuietly(in); - } + try { + s3Obj = s3Client.getObject(s3Coords.bucket, s3Coords.path); + + InputStream in = null; + try { + in = s3Obj.getDataInputStream(); + final String key = s3Obj.getKey(); + if (key.endsWith(".zip")) { + CompressionUtils.unzip(in, outDir); + } else if (key.endsWith(".gz")) { + final File outFile = new File(outDir, toFilename(key, ".gz")); + ByteStreams.copy(new GZIPInputStream(in), Files.newOutputStreamSupplier(outFile)); + } else { + ByteStreams.copy(in, Files.newOutputStreamSupplier(new File(outDir, toFilename(key, "")))); + } + log.info("Pull of file[%s] completed in %,d millis", s3Obj, System.currentTimeMillis() - startTime); + return null; + } + 
catch (IOException e) { + FileUtils.deleteDirectory(outDir); + throw new SegmentLoadingException(e, "Problem decompressing object[%s]", s3Obj); + } + finally { + Closeables.closeQuietly(in); + } + } + finally { + S3Utils.closeStreamsQuietly(s3Obj); + } + } + } + ); } catch (Exception e) { throw new SegmentLoadingException(e, e.getMessage()); } - finally { - S3Utils.closeStreamsQuietly(s3Obj); - } - } private String toFilename(String key, final String suffix) @@ -124,29 +137,86 @@ public class S3DataSegmentPuller implements DataSegmentPuller return filename; } - private boolean isObjectInBucket(S3Coords coords) throws SegmentLoadingException + private boolean isObjectInBucket(final S3Coords coords) throws SegmentLoadingException { try { - return s3Client.isObjectInBucket(coords.bucket, coords.path); + return retryS3Operation( + new Callable<Boolean>() + { + @Override + public Boolean call() throws Exception + { + return s3Client.isObjectInBucket(coords.bucket, coords.path); + } + } + ); + } + catch (InterruptedException e) { + throw Throwables.propagate(e); } catch (ServiceException e) { - throw new SegmentLoadingException(e, "S3 fail! Key[%s]", coords); + throw new SegmentLoadingException(e, "S3 fail! 
Key[%s]", coords); } } @Override public long getLastModified(DataSegment segment) throws SegmentLoadingException { - S3Coords coords = new S3Coords(segment); + final S3Coords coords = new S3Coords(segment); try { - S3Object objDetails = s3Client.getObjectDetails(new S3Bucket(coords.bucket), coords.path); + final StorageObject objDetails = retryS3Operation( + new Callable<StorageObject>() + { + @Override + public StorageObject call() throws Exception + { + return s3Client.getObjectDetails(coords.bucket, coords.path); + } + } + ); return objDetails.getLastModifiedDate().getTime(); } - catch (S3ServiceException e) { + catch (InterruptedException e) { + throw Throwables.propagate(e); + } + catch (ServiceException e) { throw new SegmentLoadingException(e, e.getMessage()); } } + private <T> T retryS3Operation(Callable<T> f) throws ServiceException, InterruptedException + { + int nTry = 0; + final int maxTries = 3; + final long baseSleepMillis = 1000; + final double fuzziness = 0.2; + while (true) { + try { + nTry++; + return f.call(); + } + catch (ServiceException e) { + if (nTry <= maxTries && + (e.getCause() instanceof IOException || + (e.getErrorCode() != null && e.getErrorCode().equals("RequestTimeout")))) { + // Retryable + final long sleepMillis = Math.max( + baseSleepMillis, + (long) (baseSleepMillis * Math.pow(2, nTry) * + (1 + new Random().nextGaussian() * fuzziness)) + ); + log.info(e, "S3 fail on try %d/%d, retrying in %,dms.", nTry, maxTries, sleepMillis); + Thread.sleep(sleepMillis); + } else { + throw e; + } + } + catch (Exception e) { + throw Throwables.propagate(e); + } + } + } + private static class S3Coords { String bucket;