From 945bc9a3707d4d9036c83d4d3d4932f7a361c6f9 Mon Sep 17 00:00:00 2001 From: Gian Merlino Date: Tue, 16 Jul 2013 08:02:49 -0700 Subject: [PATCH 1/5] Limited retries on s3 segment puller exceptions Useful for reducing spurious alerts due to general s3 flakiness. --- .../druid/loading/S3DataSegmentPuller.java | 148 +++++++++++++----- 1 file changed, 109 insertions(+), 39 deletions(-) diff --git a/server/src/main/java/com/metamx/druid/loading/S3DataSegmentPuller.java b/server/src/main/java/com/metamx/druid/loading/S3DataSegmentPuller.java index 011e1633ca1..210decc1178 100644 --- a/server/src/main/java/com/metamx/druid/loading/S3DataSegmentPuller.java +++ b/server/src/main/java/com/metamx/druid/loading/S3DataSegmentPuller.java @@ -19,6 +19,7 @@ package com.metamx.druid.loading; +import com.google.common.base.Throwables; import com.google.common.io.ByteStreams; import com.google.common.io.Closeables; import com.google.common.io.Files; @@ -30,16 +31,17 @@ import com.metamx.druid.client.DataSegment; import com.metamx.druid.common.s3.S3Utils; import com.metamx.druid.utils.CompressionUtils; import org.apache.commons.io.FileUtils; -import org.jets3t.service.S3ServiceException; import org.jets3t.service.ServiceException; import org.jets3t.service.impl.rest.httpclient.RestS3Service; -import org.jets3t.service.model.S3Bucket; import org.jets3t.service.model.S3Object; +import org.jets3t.service.model.StorageObject; import java.io.File; import java.io.IOException; import java.io.InputStream; import java.util.Map; +import java.util.Random; +import java.util.concurrent.Callable; import java.util.zip.GZIPInputStream; /** @@ -62,9 +64,9 @@ public class S3DataSegmentPuller implements DataSegmentPuller } @Override - public void getSegmentFiles(DataSegment segment, File outDir) throws SegmentLoadingException + public void getSegmentFiles(final DataSegment segment, final File outDir) throws SegmentLoadingException { - S3Coords s3Coords = new S3Coords(segment); + final S3Coords s3Coords = new S3Coords(segment); log.info("Pulling index at path[%s] to outDir[%s]", s3Coords, outDir); @@ -80,41 +82,52 @@ public class S3DataSegmentPuller implements DataSegmentPuller throw new ISE("outDir[%s] must be a directory.", outDir); } - long startTime = System.currentTimeMillis(); - S3Object s3Obj = null; - try { - s3Obj = s3Client.getObject(new S3Bucket(s3Coords.bucket), s3Coords.path); + retryS3Operation( + new Callable() + { + @Override + public Void call() throws Exception + { + long startTime = System.currentTimeMillis(); + S3Object s3Obj = null; - InputStream in = null; - try { - in = s3Obj.getDataInputStream(); - final String key = s3Obj.getKey(); - if (key.endsWith(".zip")) { - CompressionUtils.unzip(in, outDir); - } else if (key.endsWith(".gz")) { - final File outFile = new File(outDir, toFilename(key, ".gz")); - ByteStreams.copy(new GZIPInputStream(in), Files.newOutputStreamSupplier(outFile)); - } else { - ByteStreams.copy(in, Files.newOutputStreamSupplier(new File(outDir, toFilename(key, "")))); - } - log.info("Pull of file[%s] completed in %,d millis", s3Obj, System.currentTimeMillis() - startTime); - } - catch (IOException e) { - FileUtils.deleteDirectory(outDir); - throw new SegmentLoadingException(e, "Problem decompressing object[%s]", s3Obj); - } - finally { - Closeables.closeQuietly(in); - } + try { + s3Obj = s3Client.getObject(s3Coords.bucket, s3Coords.path); + + InputStream in = null; + try { + in = s3Obj.getDataInputStream(); + final String key = s3Obj.getKey(); + if (key.endsWith(".zip")) { + CompressionUtils.unzip(in, outDir); + } else if (key.endsWith(".gz")) { + final File outFile = new File(outDir, toFilename(key, ".gz")); + ByteStreams.copy(new GZIPInputStream(in), Files.newOutputStreamSupplier(outFile)); + } else { + ByteStreams.copy(in, Files.newOutputStreamSupplier(new File(outDir, toFilename(key, "")))); + } + log.info("Pull of file[%s] completed in %,d millis", s3Obj, System.currentTimeMillis() - startTime); + return null; + } + catch (IOException e) { + FileUtils.deleteDirectory(outDir); + throw new SegmentLoadingException(e, "Problem decompressing object[%s]", s3Obj); + } + finally { + Closeables.closeQuietly(in); + } + } + finally { + S3Utils.closeStreamsQuietly(s3Obj); + } + } + } + ); } catch (Exception e) { throw new SegmentLoadingException(e, e.getMessage()); } - finally { - S3Utils.closeStreamsQuietly(s3Obj); - } - } private String toFilename(String key, final String suffix) @@ -124,29 +137,86 @@ public class S3DataSegmentPuller implements DataSegmentPuller return filename; } - private boolean isObjectInBucket(S3Coords coords) throws SegmentLoadingException + private boolean isObjectInBucket(final S3Coords coords) throws SegmentLoadingException { try { - return s3Client.isObjectInBucket(coords.bucket, coords.path); + return retryS3Operation( + new Callable() + { + @Override + public Boolean call() throws Exception + { + return s3Client.isObjectInBucket(coords.bucket, coords.path); + } + } + ); + } + catch (InterruptedException e) { + throw Throwables.propagate(e); } catch (ServiceException e) { - throw new SegmentLoadingException(e, "S3 fail! Key[%s]", coords); + throw new SegmentLoadingException(e, "S3 fail! Key[%s]", coords); } } @Override public long getLastModified(DataSegment segment) throws SegmentLoadingException { - S3Coords coords = new S3Coords(segment); + final S3Coords coords = new S3Coords(segment); try { - S3Object objDetails = s3Client.getObjectDetails(new S3Bucket(coords.bucket), coords.path); + final StorageObject objDetails = retryS3Operation( + new Callable() + { + @Override + public StorageObject call() throws Exception + { + return s3Client.getObjectDetails(coords.bucket, coords.path); + } + } + ); return objDetails.getLastModifiedDate().getTime(); } - catch (S3ServiceException e) { + catch (InterruptedException e) { + throw Throwables.propagate(e); + } + catch (ServiceException e) { throw new SegmentLoadingException(e, e.getMessage()); } } + private T retryS3Operation(Callable f) throws ServiceException, InterruptedException + { + int nTry = 0; + final int maxTries = 3; + final long baseSleepMillis = 1000; + final double fuzziness = 0.2; + while (true) { + try { + nTry++; + return f.call(); + } + catch (ServiceException e) { + if (nTry <= maxTries && + (e.getCause() instanceof IOException || + (e.getErrorCode() != null && e.getErrorCode().equals("RequestTimeout")))) { + // Retryable + final long sleepMillis = Math.max( + baseSleepMillis, + (long) (baseSleepMillis * Math.pow(2, nTry) * + (1 + new Random().nextGaussian() * fuzziness)) + ); + log.info(e, "S3 fail on try %d/%d, retrying in %,dms.", nTry, maxTries, sleepMillis); + Thread.sleep(sleepMillis); + } else { + throw e; + } + } + catch (Exception e) { + throw Throwables.propagate(e); + } + } + } + private static class S3Coords { String bucket; From 320f1fe84051d0abb6cd200a46d820a7810a59db Mon Sep 17 00:00:00 2001 From: Gian Merlino Date: Tue, 16 Jul 2013 08:07:14 -0700 Subject: [PATCH 2/5] Move retryS3Operation to S3Utils --- .../com/metamx/druid/common/s3/S3Utils.java | 41 +++++++++++++++++++ .../druid/loading/S3DataSegmentPuller.java | 39 ++---------------- 2 files changed, 44 insertions(+), 36 deletions(-) diff --git a/indexing-common/src/main/java/com/metamx/druid/common/s3/S3Utils.java b/indexing-common/src/main/java/com/metamx/druid/common/s3/S3Utils.java index 079f46676ca..dee3bf320f5 100644 --- a/indexing-common/src/main/java/com/metamx/druid/common/s3/S3Utils.java +++ b/indexing-common/src/main/java/com/metamx/druid/common/s3/S3Utils.java @@ -19,8 +19,10 @@ package com.metamx.druid.common.s3; +import com.google.common.base.Throwables; import com.metamx.common.logger.Logger; import org.jets3t.service.S3ServiceException; +import org.jets3t.service.ServiceException; import org.jets3t.service.impl.rest.httpclient.RestS3Service; import org.jets3t.service.model.S3Bucket; import org.jets3t.service.model.S3Object; @@ -28,6 +30,8 @@ import org.jets3t.service.model.S3Object; import java.io.File; import java.io.IOException; import java.security.NoSuchAlgorithmException; +import java.util.Random; +import java.util.concurrent.Callable; /** * @@ -80,4 +84,41 @@ public class S3Utils } } + + /** + * Retries S3 operations that fail due to io-related exceptions. Service-level exceptions (access denied, file not + * found, etc) are not retried. + */ + public static T retryS3Operation(Callable f) throws ServiceException, InterruptedException + { + int nTry = 0; + final int maxTries = 3; + final long baseSleepMillis = 1000; + final double fuzziness = 0.2; + while (true) { + try { + nTry++; + return f.call(); + } + catch (ServiceException e) { + if (nTry <= maxTries && + (e.getCause() instanceof IOException || + (e.getErrorCode() != null && e.getErrorCode().equals("RequestTimeout")))) { + // Retryable + final long sleepMillis = Math.max( + baseSleepMillis, + (long) (baseSleepMillis * Math.pow(2, nTry) * + (1 + new Random().nextGaussian() * fuzziness)) + ); + log.info(e, "S3 fail on try %d/%d, retrying in %,dms.", nTry, maxTries, sleepMillis); + Thread.sleep(sleepMillis); + } else { + throw e; + } + } + catch (Exception e) { + throw Throwables.propagate(e); + } + } + } } diff --git a/server/src/main/java/com/metamx/druid/loading/S3DataSegmentPuller.java b/server/src/main/java/com/metamx/druid/loading/S3DataSegmentPuller.java index 210decc1178..59100b5fdeb 100644 --- a/server/src/main/java/com/metamx/druid/loading/S3DataSegmentPuller.java +++ b/server/src/main/java/com/metamx/druid/loading/S3DataSegmentPuller.java @@ -83,7 +83,7 @@ public class S3DataSegmentPuller implements DataSegmentPuller } try { - retryS3Operation( + S3Utils.retryS3Operation( new Callable() { @Override @@ -140,7 +140,7 @@ public class S3DataSegmentPuller implements DataSegmentPuller private boolean isObjectInBucket(final S3Coords coords) throws SegmentLoadingException { try { - return retryS3Operation( + return S3Utils.retryS3Operation( new Callable() { @Override @@ -164,7 +164,7 @@ public class S3DataSegmentPuller implements DataSegmentPuller { final S3Coords coords = new S3Coords(segment); try { - final StorageObject objDetails = retryS3Operation( + final StorageObject objDetails = S3Utils.retryS3Operation( new Callable() { @Override @@ -184,39 +184,6 @@ public class S3DataSegmentPuller implements DataSegmentPuller } } - private T retryS3Operation(Callable f) throws ServiceException, InterruptedException - { - int nTry = 0; - final int maxTries = 3; - final long baseSleepMillis = 1000; - final double fuzziness = 0.2; - while (true) { - try { - nTry++; - return f.call(); - } - catch (ServiceException e) { - if (nTry <= maxTries && - (e.getCause() instanceof IOException || - (e.getErrorCode() != null && e.getErrorCode().equals("RequestTimeout")))) { - // Retryable - final long sleepMillis = Math.max( - baseSleepMillis, - (long) (baseSleepMillis * Math.pow(2, nTry) * - (1 + new Random().nextGaussian() * fuzziness)) - ); - log.info(e, "S3 fail on try %d/%d, retrying in %,dms.", nTry, maxTries, sleepMillis); - Thread.sleep(sleepMillis); - } else { - throw e; - } - } - catch (Exception e) { - throw Throwables.propagate(e); - } - } - } - private static class S3Coords { String bucket; From 4201d9ff246ce35a89832f7ee0a66bb91857175f Mon Sep 17 00:00:00 2001 From: Gian Merlino Date: Tue, 16 Jul 2013 08:08:25 -0700 Subject: [PATCH 3/5] Remove unused methods from S3Utils --- .../com/metamx/druid/common/s3/S3Utils.java | 31 ------------------- 1 file changed, 31 deletions(-) diff --git a/indexing-common/src/main/java/com/metamx/druid/common/s3/S3Utils.java b/indexing-common/src/main/java/com/metamx/druid/common/s3/S3Utils.java index dee3bf320f5..f0ba2d133d3 100644 --- a/indexing-common/src/main/java/com/metamx/druid/common/s3/S3Utils.java +++ b/indexing-common/src/main/java/com/metamx/druid/common/s3/S3Utils.java @@ -40,37 +40,6 @@ public class S3Utils { private static final Logger log = new Logger(S3Utils.class); - public static void putFileToS3( - File localFile, RestS3Service s3Client, String outputS3Bucket, String outputS3Path - ) - throws S3ServiceException, IOException, NoSuchAlgorithmException - { - S3Object s3Obj = new S3Object(localFile); - s3Obj.setBucketName(outputS3Bucket); - s3Obj.setKey(outputS3Path); - - log.info("Uploading file[%s] to [s3://%s/%s]", localFile, s3Obj.getBucketName(), s3Obj.getKey()); - s3Client.putObject(new S3Bucket(outputS3Bucket), s3Obj); - } - - public static void putFileToS3WrapExceptions( - File localFile, RestS3Service s3Client, String outputS3Bucket, String outputS3Path - ) - { - try { - putFileToS3(localFile, s3Client, outputS3Bucket, outputS3Path); - } - catch (S3ServiceException e) { - throw new RuntimeException(e); - } - catch (IOException e) { - throw new RuntimeException(e); - } - catch (NoSuchAlgorithmException e) { - throw new RuntimeException(e); - } - } - public static void closeStreamsQuietly(S3Object s3Obj) { if (s3Obj == null) { From 6258d77398f1b603cf47d59d6d314f4724d6555f Mon Sep 17 00:00:00 2001 From: Gian Merlino Date: Tue, 16 Jul 2013 08:25:19 -0700 Subject: [PATCH 4/5] Retry s3 operations on non-wrapped IOExceptions Can happen if we get a socket related mishap while fetching an s3 object. --- .../com/metamx/druid/common/s3/S3Utils.java | 36 +++++++++++-------- .../druid/loading/S3DataSegmentPuller.java | 2 +- 2 files changed, 22 insertions(+), 16 deletions(-) diff --git a/indexing-common/src/main/java/com/metamx/druid/common/s3/S3Utils.java b/indexing-common/src/main/java/com/metamx/druid/common/s3/S3Utils.java index f0ba2d133d3..15fa7c88240 100644 --- a/indexing-common/src/main/java/com/metamx/druid/common/s3/S3Utils.java +++ b/indexing-common/src/main/java/com/metamx/druid/common/s3/S3Utils.java @@ -21,15 +21,10 @@ package com.metamx.druid.common.s3; import com.google.common.base.Throwables; import com.metamx.common.logger.Logger; -import org.jets3t.service.S3ServiceException; import org.jets3t.service.ServiceException; -import org.jets3t.service.impl.rest.httpclient.RestS3Service; -import org.jets3t.service.model.S3Bucket; import org.jets3t.service.model.S3Object; -import java.io.File; import java.io.IOException; -import java.security.NoSuchAlgorithmException; import java.util.Random; import java.util.concurrent.Callable; @@ -62,25 +57,23 @@ public class S3Utils { int nTry = 0; final int maxTries = 3; - final long baseSleepMillis = 1000; - final double fuzziness = 0.2; while (true) { try { nTry++; return f.call(); } + catch (IOException e) { + if (nTry <= maxTries) { + awaitNextRetry(e, nTry); + } else { + throw Throwables.propagate(e); + } + } catch (ServiceException e) { if (nTry <= maxTries && (e.getCause() instanceof IOException || (e.getErrorCode() != null && e.getErrorCode().equals("RequestTimeout")))) { - // Retryable - final long sleepMillis = Math.max( - baseSleepMillis, - (long) (baseSleepMillis * Math.pow(2, nTry) * - (1 + new Random().nextGaussian() * fuzziness)) - ); - log.info(e, "S3 fail on try %d/%d, retrying in %,dms.", nTry, maxTries, sleepMillis); - Thread.sleep(sleepMillis); + awaitNextRetry(e, nTry); } else { throw e; } @@ -90,4 +83,17 @@ public class S3Utils } } } + + private static void awaitNextRetry(Exception e, int nTry) throws InterruptedException + { + final long baseSleepMillis = 1000; + final double fuzziness = 0.2; + final long sleepMillis = Math.max( + baseSleepMillis, + (long) (baseSleepMillis * Math.pow(2, nTry) * + (1 + new Random().nextGaussian() * fuzziness)) + ); + log.info(e, "S3 fail on try %d, retrying in %,dms.", nTry, sleepMillis); + Thread.sleep(sleepMillis); + } } diff --git a/server/src/main/java/com/metamx/druid/loading/S3DataSegmentPuller.java b/server/src/main/java/com/metamx/druid/loading/S3DataSegmentPuller.java index 59100b5fdeb..75d212886da 100644 --- a/server/src/main/java/com/metamx/druid/loading/S3DataSegmentPuller.java +++ b/server/src/main/java/com/metamx/druid/loading/S3DataSegmentPuller.java @@ -112,7 +112,7 @@ public class S3DataSegmentPuller implements DataSegmentPuller } catch (IOException e) { FileUtils.deleteDirectory(outDir); - throw new SegmentLoadingException(e, "Problem decompressing object[%s]", s3Obj); + throw new IOException(String.format("Problem decompressing object[%s]", s3Obj), e); } finally { Closeables.closeQuietly(in); From d4afeb5ebe3fff3d6c22350b4b46435e1256af98 Mon Sep 17 00:00:00 2001 From: cheddar Date: Thu, 18 Jul 2013 11:18:28 -0700 Subject: [PATCH 5/5] 1) upload.sh pushes up to artifacts/releases 2) Delete some commented stuff in build.sh --- build.sh | 2 -- upload.sh | 2 +- 2 files changed, 1 insertion(+), 3 deletions(-) diff --git a/build.sh b/build.sh index fe534e319f7..158a0cd6153 100755 --- a/build.sh +++ b/build.sh @@ -10,8 +10,6 @@ SCRIPT_DIR=`pwd` popd VERSION=`cat pom.xml | grep version | head -4 | tail -1 | sed 's_.*\([^<]*\).*_\1_'` -#TAR_FILE=${SCRIPT_DIR}/${PROJECT}-${VERSION}.tar.gz -#rm -f ${TAR_FILE} echo Using Version[${VERSION}] diff --git a/upload.sh b/upload.sh index 5e0580d3055..d242599adac 100755 --- a/upload.sh +++ b/upload.sh @@ -3,4 +3,4 @@ # # Script to upload tarball of assembly build to static.druid.io for serving # -s3cmd put services/target/druid-services-*-bin.tar.gz s3://static.druid.io/artifacts/ +s3cmd put services/target/druid-services-*-bin.tar.gz s3://static.druid.io/artifacts/releases