diff --git a/indexing-common/src/main/java/com/metamx/druid/common/s3/S3Utils.java b/indexing-common/src/main/java/com/metamx/druid/common/s3/S3Utils.java index 15fa7c88240..84ce35df947 100644 --- a/indexing-common/src/main/java/com/metamx/druid/common/s3/S3Utils.java +++ b/indexing-common/src/main/java/com/metamx/druid/common/s3/S3Utils.java @@ -53,7 +53,7 @@ public class S3Utils * Retries S3 operations that fail due to io-related exceptions. Service-level exceptions (access denied, file not * found, etc) are not retried. */ - public static T retryS3Operation(Callable f) throws ServiceException, InterruptedException + public static T retryS3Operation(Callable f) throws IOException, ServiceException, InterruptedException { int nTry = 0; final int maxTries = 3; @@ -66,7 +66,7 @@ public class S3Utils if (nTry <= maxTries) { awaitNextRetry(e, nTry); } else { - throw Throwables.propagate(e); + throw e; } } catch (ServiceException e) { diff --git a/indexing-service/src/main/java/com/metamx/druid/indexing/common/tasklogs/S3TaskLogs.java b/indexing-service/src/main/java/com/metamx/druid/indexing/common/tasklogs/S3TaskLogs.java index a09f2ef7e00..89c1b6d2b44 100644 --- a/indexing-service/src/main/java/com/metamx/druid/indexing/common/tasklogs/S3TaskLogs.java +++ b/indexing-service/src/main/java/com/metamx/druid/indexing/common/tasklogs/S3TaskLogs.java @@ -5,6 +5,7 @@ import com.google.common.base.Preconditions; import com.google.common.base.Throwables; import com.google.common.io.InputSupplier; import com.metamx.common.logger.Logger; +import com.metamx.druid.common.s3.S3Utils; import org.jets3t.service.ServiceException; import org.jets3t.service.StorageService; import org.jets3t.service.impl.rest.httpclient.RestS3Service; @@ -13,6 +14,7 @@ import org.jets3t.service.model.StorageObject; import java.io.File; import java.io.IOException; import java.io.InputStream; +import java.util.concurrent.Callable; /** * Provides task logs archived on S3. @@ -86,16 +88,25 @@ public class S3TaskLogs implements TaskLogs } } - public void pushTaskLog(String taskid, File logFile) throws IOException + public void pushTaskLog(final String taskid, final File logFile) throws IOException { final String taskKey = getTaskLogKey(taskid); + log.info("Pushing task log %s to: %s", logFile, taskKey); try { - log.info("Pushing task log %s to: %s", logFile, taskKey); - - final StorageObject object = new StorageObject(logFile); - object.setKey(taskKey); - service.putObject(bucket, object); + S3Utils.retryS3Operation( + new Callable() + { + @Override + public Void call() throws Exception + { + final StorageObject object = new StorageObject(logFile); + object.setKey(taskKey); + service.putObject(bucket, object); + return null; + } + } + ); } catch (Exception e) { Throwables.propagateIfInstanceOf(e, IOException.class); diff --git a/server/src/main/java/com/metamx/druid/loading/S3DataSegmentPuller.java b/server/src/main/java/com/metamx/druid/loading/S3DataSegmentPuller.java index 75d212886da..c6a82ffa3ff 100644 --- a/server/src/main/java/com/metamx/druid/loading/S3DataSegmentPuller.java +++ b/server/src/main/java/com/metamx/druid/loading/S3DataSegmentPuller.java @@ -40,7 +40,6 @@ import java.io.File; import java.io.IOException; import java.io.InputStream; import java.util.Map; -import java.util.Random; import java.util.concurrent.Callable; import java.util.zip.GZIPInputStream; @@ -154,6 +153,9 @@ public class S3DataSegmentPuller implements DataSegmentPuller catch (InterruptedException e) { throw Throwables.propagate(e); } + catch (IOException e) { + throw new SegmentLoadingException(e, "S3 fail! Key[%s]", coords); + } catch (ServiceException e) { throw new SegmentLoadingException(e, "S3 fail! Key[%s]", coords); } @@ -179,6 +181,9 @@ public class S3DataSegmentPuller implements DataSegmentPuller catch (InterruptedException e) { throw Throwables.propagate(e); } + catch (IOException e) { + throw new SegmentLoadingException(e, e.getMessage()); + } catch (ServiceException e) { throw new SegmentLoadingException(e, e.getMessage()); } diff --git a/server/src/main/java/com/metamx/druid/loading/S3DataSegmentPusher.java b/server/src/main/java/com/metamx/druid/loading/S3DataSegmentPusher.java index d9ac69e5c5b..a5462a83eb7 100644 --- a/server/src/main/java/com/metamx/druid/loading/S3DataSegmentPusher.java +++ b/server/src/main/java/com/metamx/druid/loading/S3DataSegmentPusher.java @@ -21,21 +21,23 @@ package com.metamx.druid.loading; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Joiner; +import com.google.common.base.Throwables; import com.google.common.collect.ImmutableMap; import com.google.common.io.ByteStreams; import com.google.common.io.Files; import com.metamx.druid.client.DataSegment; +import com.metamx.druid.common.s3.S3Utils; import com.metamx.druid.index.v1.IndexIO; import com.metamx.druid.utils.CompressionUtils; import com.metamx.emitter.EmittingLogger; -import org.jets3t.service.S3ServiceException; +import org.jets3t.service.ServiceException; import org.jets3t.service.acl.gs.GSAccessControlList; import org.jets3t.service.impl.rest.httpclient.RestS3Service; import org.jets3t.service.model.S3Object; import java.io.File; import java.io.IOException; -import java.security.NoSuchAlgorithmException; +import java.util.concurrent.Callable; public class S3DataSegmentPusher implements DataSegmentPusher { @@ -63,61 +65,76 @@ public class S3DataSegmentPusher implements DataSegmentPusher } @Override - public DataSegment push(final File indexFilesDir, DataSegment segment) throws IOException + public DataSegment push(final File indexFilesDir, final DataSegment inSegment) throws IOException { log.info("Uploading [%s] to S3", indexFilesDir); - String outputKey = JOINER.join( + final String outputKey = JOINER.join( config.getBaseKey().isEmpty() ? null : config.getBaseKey(), - DataSegmentPusherUtil.getStorageDir(segment) + DataSegmentPusherUtil.getStorageDir(inSegment) ); - final File zipOutFile = File.createTempFile("druid", "index.zip"); - long indexSize = CompressionUtils.zip(indexFilesDir, zipOutFile); + final long indexSize = CompressionUtils.zip(indexFilesDir, zipOutFile); try { - S3Object toPush = new S3Object(zipOutFile); + return S3Utils.retryS3Operation( + new Callable() + { + @Override + public DataSegment call() throws Exception + { + S3Object toPush = new S3Object(zipOutFile); - final String outputBucket = config.getBucket(); - toPush.setBucketName(outputBucket); - toPush.setKey(outputKey + "/index.zip"); - if (!config.getDisableAcl()) { - toPush.setAcl(GSAccessControlList.REST_CANNED_BUCKET_OWNER_FULL_CONTROL); - } + final String outputBucket = config.getBucket(); + toPush.setBucketName(outputBucket); + toPush.setKey(outputKey + "/index.zip"); + if (!config.getDisableAcl()) { + toPush.setAcl(GSAccessControlList.REST_CANNED_BUCKET_OWNER_FULL_CONTROL); + } - log.info("Pushing %s.", toPush); - s3Client.putObject(outputBucket, toPush); + log.info("Pushing %s.", toPush); + s3Client.putObject(outputBucket, toPush); - segment = segment.withSize(indexSize) - .withLoadSpec( - ImmutableMap.of("type", "s3_zip", "bucket", outputBucket, "key", toPush.getKey()) - ) - .withBinaryVersion(IndexIO.getVersionFromDir(indexFilesDir)); + final DataSegment outSegment = inSegment.withSize(indexSize) + .withLoadSpec( + ImmutableMap.of( + "type", + "s3_zip", + "bucket", + outputBucket, + "key", + toPush.getKey() + ) + ) + .withBinaryVersion(IndexIO.getVersionFromDir(indexFilesDir)); - File descriptorFile = File.createTempFile("druid", "descriptor.json"); - Files.copy(ByteStreams.newInputStreamSupplier(jsonMapper.writeValueAsBytes(segment)), descriptorFile); - S3Object descriptorObject = new S3Object(descriptorFile); - descriptorObject.setBucketName(outputBucket); - descriptorObject.setKey(outputKey + "/descriptor.json"); - if (!config.getDisableAcl()) { - descriptorObject.setAcl(GSAccessControlList.REST_CANNED_BUCKET_OWNER_FULL_CONTROL); - } + File descriptorFile = File.createTempFile("druid", "descriptor.json"); + Files.copy(ByteStreams.newInputStreamSupplier(jsonMapper.writeValueAsBytes(inSegment)), descriptorFile); + S3Object descriptorObject = new S3Object(descriptorFile); + descriptorObject.setBucketName(outputBucket); + descriptorObject.setKey(outputKey + "/descriptor.json"); + if (!config.getDisableAcl()) { + descriptorObject.setAcl(GSAccessControlList.REST_CANNED_BUCKET_OWNER_FULL_CONTROL); + } - log.info("Pushing %s", descriptorObject); - s3Client.putObject(outputBucket, descriptorObject); + log.info("Pushing %s", descriptorObject); + s3Client.putObject(outputBucket, descriptorObject); - log.info("Deleting zipped index File[%s]", zipOutFile); - zipOutFile.delete(); + log.info("Deleting zipped index File[%s]", zipOutFile); + zipOutFile.delete(); - log.info("Deleting descriptor file[%s]", descriptorFile); - descriptorFile.delete(); + log.info("Deleting descriptor file[%s]", descriptorFile); + descriptorFile.delete(); - return segment; + return outSegment; + } + } + ); } - catch (NoSuchAlgorithmException e) { + catch (ServiceException e) { throw new IOException(e); } - catch (S3ServiceException e) { - throw new IOException(e); + catch (InterruptedException e) { + throw Throwables.propagate(e); } } } \ No newline at end of file