diff --git a/build.sh b/build.sh
index fe534e319f7..158a0cd6153 100755
--- a/build.sh
+++ b/build.sh
@@ -10,8 +10,6 @@ SCRIPT_DIR=`pwd`
popd
VERSION=`cat pom.xml | grep version | head -4 | tail -1 | sed 's_.*<version>\([^<]*\)</version>.*_\1_'`
-#TAR_FILE=${SCRIPT_DIR}/${PROJECT}-${VERSION}.tar.gz
-#rm -f ${TAR_FILE}
echo Using Version[${VERSION}]
diff --git a/indexing-common/src/main/java/com/metamx/druid/common/s3/S3Utils.java b/indexing-common/src/main/java/com/metamx/druid/common/s3/S3Utils.java
index 079f46676ca..15fa7c88240 100644
--- a/indexing-common/src/main/java/com/metamx/druid/common/s3/S3Utils.java
+++ b/indexing-common/src/main/java/com/metamx/druid/common/s3/S3Utils.java
@@ -19,15 +19,14 @@
package com.metamx.druid.common.s3;
+import com.google.common.base.Throwables;
import com.metamx.common.logger.Logger;
-import org.jets3t.service.S3ServiceException;
-import org.jets3t.service.impl.rest.httpclient.RestS3Service;
-import org.jets3t.service.model.S3Bucket;
+import org.jets3t.service.ServiceException;
import org.jets3t.service.model.S3Object;
-import java.io.File;
import java.io.IOException;
-import java.security.NoSuchAlgorithmException;
+import java.util.Random;
+import java.util.concurrent.Callable;
/**
*
@@ -36,37 +35,6 @@ public class S3Utils
{
  private static final Logger log = new Logger(S3Utils.class);
-  public static void putFileToS3(
-      File localFile, RestS3Service s3Client, String outputS3Bucket, String outputS3Path
-  )
-      throws S3ServiceException, IOException, NoSuchAlgorithmException
-  {
-    S3Object s3Obj = new S3Object(localFile);
-    s3Obj.setBucketName(outputS3Bucket);
-    s3Obj.setKey(outputS3Path);
-
-    log.info("Uploading file[%s] to [s3://%s/%s]", localFile, s3Obj.getBucketName(), s3Obj.getKey());
-    s3Client.putObject(new S3Bucket(outputS3Bucket), s3Obj);
-  }
-
-  public static void putFileToS3WrapExceptions(
-      File localFile, RestS3Service s3Client, String outputS3Bucket, String outputS3Path
-  )
-  {
-    try {
-      putFileToS3(localFile, s3Client, outputS3Bucket, outputS3Path);
-    }
-    catch (S3ServiceException e) {
-      throw new RuntimeException(e);
-    }
-    catch (IOException e) {
-      throw new RuntimeException(e);
-    }
-    catch (NoSuchAlgorithmException e) {
-      throw new RuntimeException(e);
-    }
-  }
-
  public static void closeStreamsQuietly(S3Object s3Obj)
  {
    if (s3Obj == null) {
@@ -80,4 +48,52 @@ public class S3Utils
    }
  }
+
+  /**
+   * Retries S3 operations that fail due to IO-related exceptions. Service-level exceptions (access denied, file not
+   * found, etc.) are not retried.
+   */
+  public static <T> T retryS3Operation(Callable<T> f) throws ServiceException, InterruptedException
+  {
+    int nTry = 0;
+    final int maxTries = 3;
+    while (true) {
+      try {
+        nTry++;
+        return f.call();
+      }
+      catch (IOException e) {
+        if (nTry <= maxTries) {
+          awaitNextRetry(e, nTry);
+        } else {
+          throw Throwables.propagate(e);
+        }
+      }
+      catch (ServiceException e) {
+        if (nTry <= maxTries &&
+            (e.getCause() instanceof IOException ||
+             (e.getErrorCode() != null && e.getErrorCode().equals("RequestTimeout")))) {
+          awaitNextRetry(e, nTry);
+        } else {
+          throw e;
+        }
+      }
+      catch (Exception e) {
+        throw Throwables.propagate(e);
+      }
+    }
+  }
+
+  private static void awaitNextRetry(Exception e, int nTry) throws InterruptedException
+  {
+    final long baseSleepMillis = 1000;
+    final double fuzziness = 0.2;
+    final long sleepMillis = Math.max(
+        baseSleepMillis,
+        (long) (baseSleepMillis * Math.pow(2, nTry) *
+                (1 + new Random().nextGaussian() * fuzziness))
+    );
+    log.info(e, "S3 fail on try %d, retrying in %,dms.", nTry, sleepMillis);
+    Thread.sleep(sleepMillis);
+  }
}
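
Editor's note, not part of the commit: a minimal usage sketch of the new helper. Imports are elided, s3Client is assumed to be an already-configured jets3t RestS3Service, and the bucket and key names are hypothetical placeholders.

    // Retries IOExceptions and RequestTimeout-flavored ServiceExceptions with
    // exponential backoff; other ServiceExceptions propagate immediately.
    final S3Object obj = S3Utils.retryS3Operation(
        new Callable<S3Object>()
        {
          @Override
          public S3Object call() throws Exception
          {
            return s3Client.getObject("example-bucket", "example/key"); // hypothetical bucket/key
          }
        }
    );
    S3Utils.closeStreamsQuietly(obj);
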
diff --git a/server/src/main/java/com/metamx/druid/loading/S3DataSegmentPuller.java b/server/src/main/java/com/metamx/druid/loading/S3DataSegmentPuller.java
index 011e1633ca1..75d212886da 100644
--- a/server/src/main/java/com/metamx/druid/loading/S3DataSegmentPuller.java
+++ b/server/src/main/java/com/metamx/druid/loading/S3DataSegmentPuller.java
@@ -19,6 +19,7 @@
package com.metamx.druid.loading;
+import com.google.common.base.Throwables;
import com.google.common.io.ByteStreams;
import com.google.common.io.Closeables;
import com.google.common.io.Files;
@@ -30,16 +31,17 @@ import com.metamx.druid.client.DataSegment;
import com.metamx.druid.common.s3.S3Utils;
import com.metamx.druid.utils.CompressionUtils;
import org.apache.commons.io.FileUtils;
-import org.jets3t.service.S3ServiceException;
import org.jets3t.service.ServiceException;
import org.jets3t.service.impl.rest.httpclient.RestS3Service;
-import org.jets3t.service.model.S3Bucket;
import org.jets3t.service.model.S3Object;
+import org.jets3t.service.model.StorageObject;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.Callable;
import java.util.zip.GZIPInputStream;
/**
@@ -62,9 +64,9 @@ public class S3DataSegmentPuller implements DataSegmentPuller
  }
  @Override
-  public void getSegmentFiles(DataSegment segment, File outDir) throws SegmentLoadingException
+  public void getSegmentFiles(final DataSegment segment, final File outDir) throws SegmentLoadingException
  {
-    S3Coords s3Coords = new S3Coords(segment);
+    final S3Coords s3Coords = new S3Coords(segment);
    log.info("Pulling index at path[%s] to outDir[%s]", s3Coords, outDir);
@@ -80,41 +82,52 @@ public class S3DataSegmentPuller implements DataSegmentPuller
      throw new ISE("outDir[%s] must be a directory.", outDir);
    }
-    long startTime = System.currentTimeMillis();
-    S3Object s3Obj = null;
-
    try {
-      s3Obj = s3Client.getObject(new S3Bucket(s3Coords.bucket), s3Coords.path);
+      S3Utils.retryS3Operation(
+          new Callable<Void>()
+          {
+            @Override
+            public Void call() throws Exception
+            {
+              long startTime = System.currentTimeMillis();
+              S3Object s3Obj = null;
-      InputStream in = null;
-      try {
-        in = s3Obj.getDataInputStream();
-        final String key = s3Obj.getKey();
-        if (key.endsWith(".zip")) {
-          CompressionUtils.unzip(in, outDir);
-        } else if (key.endsWith(".gz")) {
-          final File outFile = new File(outDir, toFilename(key, ".gz"));
-          ByteStreams.copy(new GZIPInputStream(in), Files.newOutputStreamSupplier(outFile));
-        } else {
-          ByteStreams.copy(in, Files.newOutputStreamSupplier(new File(outDir, toFilename(key, ""))));
-        }
-        log.info("Pull of file[%s] completed in %,d millis", s3Obj, System.currentTimeMillis() - startTime);
-      }
-      catch (IOException e) {
-        FileUtils.deleteDirectory(outDir);
-        throw new SegmentLoadingException(e, "Problem decompressing object[%s]", s3Obj);
-      }
-      finally {
-        Closeables.closeQuietly(in);
-      }
+              try {
+                s3Obj = s3Client.getObject(s3Coords.bucket, s3Coords.path);
+
+                InputStream in = null;
+                try {
+                  in = s3Obj.getDataInputStream();
+                  final String key = s3Obj.getKey();
+                  if (key.endsWith(".zip")) {
+                    CompressionUtils.unzip(in, outDir);
+                  } else if (key.endsWith(".gz")) {
+                    final File outFile = new File(outDir, toFilename(key, ".gz"));
+                    ByteStreams.copy(new GZIPInputStream(in), Files.newOutputStreamSupplier(outFile));
+                  } else {
+                    ByteStreams.copy(in, Files.newOutputStreamSupplier(new File(outDir, toFilename(key, ""))));
+                  }
+                  log.info("Pull of file[%s] completed in %,d millis", s3Obj, System.currentTimeMillis() - startTime);
+                  return null;
+                }
+                catch (IOException e) {
+                  FileUtils.deleteDirectory(outDir);
+                  throw new IOException(String.format("Problem decompressing object[%s]", s3Obj), e);
+                }
+                finally {
+                  Closeables.closeQuietly(in);
+                }
+              }
+              finally {
+                S3Utils.closeStreamsQuietly(s3Obj);
+              }
+            }
+          }
+      );
    }
    catch (Exception e) {
      throw new SegmentLoadingException(e, e.getMessage());
    }
-    finally {
-      S3Utils.closeStreamsQuietly(s3Obj);
-    }
-
  }
  private String toFilename(String key, final String suffix)
@@ -124,25 +137,49 @@ public class S3DataSegmentPuller implements DataSegmentPuller
    return filename;
  }
-  private boolean isObjectInBucket(S3Coords coords) throws SegmentLoadingException
+  private boolean isObjectInBucket(final S3Coords coords) throws SegmentLoadingException
  {
    try {
-      return s3Client.isObjectInBucket(coords.bucket, coords.path);
+      return S3Utils.retryS3Operation(
+          new Callable<Boolean>()
+          {
+            @Override
+            public Boolean call() throws Exception
+            {
+              return s3Client.isObjectInBucket(coords.bucket, coords.path);
+            }
+          }
+      );
+    }
+    catch (InterruptedException e) {
+      throw Throwables.propagate(e);
    }
    catch (ServiceException e) {
-      throw new SegmentLoadingException(e, "S3 fail! Key[%s]", coords);
+      throw new SegmentLoadingException(e, "S3 fail! Key[%s]", coords);
    }
  }
  @Override
  public long getLastModified(DataSegment segment) throws SegmentLoadingException
  {
-    S3Coords coords = new S3Coords(segment);
+    final S3Coords coords = new S3Coords(segment);
    try {
-      S3Object objDetails = s3Client.getObjectDetails(new S3Bucket(coords.bucket), coords.path);
+      final StorageObject objDetails = S3Utils.retryS3Operation(
+          new Callable<StorageObject>()
+          {
+            @Override
+            public StorageObject call() throws Exception
+            {
+              return s3Client.getObjectDetails(coords.bucket, coords.path);
+            }
+          }
+      );
      return objDetails.getLastModifiedDate().getTime();
    }
-    catch (S3ServiceException e) {
+    catch (InterruptedException e) {
+      throw Throwables.propagate(e);
+    }
+    catch (ServiceException e) {
      throw new SegmentLoadingException(e, e.getMessage());
    }
  }
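
Editor's note, not part of the commit: with baseSleepMillis = 1000, fuzziness = 0.2, and maxTries = 3, retryS3Operation makes up to four attempts, sleeping roughly 2, 4, and 8 seconds (each with about ±20% Gaussian jitter, floored at one second) between them, so a persistently failing call gives up after roughly 14 seconds of accumulated backoff. A small self-contained sketch of the expected schedule, ignoring the jitter term:

    public class BackoffScheduleDemo
    {
      public static void main(String[] args)
      {
        final long baseSleepMillis = 1000;
        // Mirrors the sleep computation in awaitNextRetry, minus the random fuzz.
        for (int nTry = 1; nTry <= 3; nTry++) {
          final long sleepMillis = Math.max(baseSleepMillis, (long) (baseSleepMillis * Math.pow(2, nTry)));
          System.out.printf("retry %d after ~%,d ms%n", nTry, sleepMillis);
        }
      }
    }
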
diff --git a/upload.sh b/upload.sh
index 5e0580d3055..d242599adac 100755
--- a/upload.sh
+++ b/upload.sh
@@ -3,4 +3,4 @@
#
# Script to upload tarball of assembly build to static.druid.io for serving
#
-s3cmd put services/target/druid-services-*-bin.tar.gz s3://static.druid.io/artifacts/
+s3cmd put services/target/druid-services-*-bin.tar.gz s3://static.druid.io/artifacts/releases