Merge pull request #215 from metamx/s3-retries

Retries for S3TaskLogs, S3DataSegmentPusher
cheddar 2013-08-12 14:36:12 -07:00
commit d739b15380
4 changed files with 81 additions and 48 deletions

S3Utils.java

@@ -53,7 +53,7 @@ public class S3Utils
    * Retries S3 operations that fail due to io-related exceptions. Service-level exceptions (access denied, file not
    * found, etc) are not retried.
    */
-  public static <T> T retryS3Operation(Callable<T> f) throws ServiceException, InterruptedException
+  public static <T> T retryS3Operation(Callable<T> f) throws IOException, ServiceException, InterruptedException
   {
     int nTry = 0;
     final int maxTries = 3;
@@ -66,7 +66,7 @@ public class S3Utils
         if (nTry <= maxTries) {
           awaitNextRetry(e, nTry);
         } else {
-          throw Throwables.propagate(e);
+          throw e;
         }
       }
       catch (ServiceException e) {
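
For context, the hunks above show the retry helper only in fragments. Below is a minimal sketch of what the complete retryS3Operation plausibly looks like after this change; the loop shape is inferred from the visible fragments (nTry, maxTries, the per-exception branches), and the awaitNextRetry backoff body is an assumption, not the committed code.

import java.io.IOException;
import java.util.concurrent.Callable;
import org.jets3t.service.ServiceException;

public class S3RetrySketch
{
  // Sketch: io-related failures are retried up to maxTries, then rethrown
  // unchanged, which is why the signature gains "throws IOException".
  public static <T> T retryS3Operation(Callable<T> f) throws IOException, ServiceException, InterruptedException
  {
    int nTry = 0;
    final int maxTries = 3;
    while (true) {
      try {
        nTry++;
        return f.call();
      }
      catch (IOException e) {
        if (nTry <= maxTries) {
          awaitNextRetry(e, nTry);
        } else {
          throw e; // pre-commit behavior was "throw Throwables.propagate(e);"
        }
      }
      catch (ServiceException e) {
        // Assumption: only io-caused service exceptions are retried;
        // access denied, file not found, etc. are rethrown immediately.
        if (nTry <= maxTries && e.getCause() instanceof IOException) {
          awaitNextRetry(e, nTry);
        } else {
          throw e;
        }
      }
      catch (InterruptedException e) {
        throw e;
      }
      catch (Exception e) {
        throw new RuntimeException(e);
      }
    }
  }

  // Hypothetical exponential backoff; the real awaitNextRetry is not shown in this diff.
  private static void awaitNextRetry(Exception e, int nTry) throws InterruptedException
  {
    Thread.sleep((long) (Math.pow(2, nTry) * 1000));
  }
}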

S3TaskLogs.java

@@ -5,6 +5,7 @@ import com.google.common.base.Preconditions;
 import com.google.common.base.Throwables;
 import com.google.common.io.InputSupplier;
 import com.metamx.common.logger.Logger;
+import com.metamx.druid.common.s3.S3Utils;
 import org.jets3t.service.ServiceException;
 import org.jets3t.service.StorageService;
 import org.jets3t.service.impl.rest.httpclient.RestS3Service;
@@ -13,6 +14,7 @@ import org.jets3t.service.model.StorageObject;
 import java.io.File;
 import java.io.IOException;
 import java.io.InputStream;
+import java.util.concurrent.Callable;

 /**
  * Provides task logs archived on S3.
@@ -86,16 +88,25 @@ public class S3TaskLogs implements TaskLogs
     }
   }

-  public void pushTaskLog(String taskid, File logFile) throws IOException
+  public void pushTaskLog(final String taskid, final File logFile) throws IOException
   {
     final String taskKey = getTaskLogKey(taskid);
+    log.info("Pushing task log %s to: %s", logFile, taskKey);

     try {
-      log.info("Pushing task log %s to: %s", logFile, taskKey);
-      final StorageObject object = new StorageObject(logFile);
-      object.setKey(taskKey);
-      service.putObject(bucket, object);
+      S3Utils.retryS3Operation(
+          new Callable<Void>()
+          {
+            @Override
+            public Void call() throws Exception
+            {
+              final StorageObject object = new StorageObject(logFile);
+              object.setKey(taskKey);
+              service.putObject(bucket, object);
+              return null;
+            }
+          }
+      );
     }
     catch (Exception e) {
       Throwables.propagateIfInstanceOf(e, IOException.class);
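
Worth noting for this hunk: because the upload now runs inside a Callable, its checked exceptions reach the surrounding catch as a plain Exception, and Guava's Throwables.propagateIfInstanceOf (visible at the end of the hunk) narrows them back to the IOException that pushTaskLog declares. A small hypothetical demonstration of that round trip, assuming the retryS3Operation helper from the first file; the flaky callable and all names here are illustrative only.

import java.io.IOException;
import java.util.concurrent.Callable;
import com.google.common.base.Throwables;
import com.metamx.druid.common.s3.S3Utils;

public class RetryRoundTripSketch
{
  public static void main(String[] args) throws IOException
  {
    try {
      // Fails twice with a transient IOException, succeeds on the third
      // attempt; retryS3Operation absorbs the first two failures.
      final int[] attempts = {0};
      S3Utils.retryS3Operation(
          new Callable<Void>()
          {
            @Override
            public Void call() throws Exception
            {
              if (attempts[0]++ < 2) {
                throw new IOException("transient S3 hiccup");
              }
              return null; // the "upload" succeeded
            }
          }
      );
    }
    catch (Exception e) {
      // Rethrow unchanged if it is the declared IOException...
      Throwables.propagateIfInstanceOf(e, IOException.class);
      // ...otherwise wrap unchecked, mirroring the pattern in this file.
      throw Throwables.propagate(e);
    }
  }
}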

S3DataSegmentPuller.java

@@ -40,7 +40,6 @@ import java.io.File;
 import java.io.IOException;
 import java.io.InputStream;
 import java.util.Map;
-import java.util.Random;
 import java.util.concurrent.Callable;
 import java.util.zip.GZIPInputStream;
@@ -154,6 +153,9 @@ public class S3DataSegmentPuller implements DataSegmentPuller
       catch (InterruptedException e) {
         throw Throwables.propagate(e);
       }
+      catch (IOException e) {
+        throw new SegmentLoadingException(e, "S3 fail! Key[%s]", coords);
+      }
       catch (ServiceException e) {
         throw new SegmentLoadingException(e, "S3 fail! Key[%s]", coords);
       }
@@ -179,6 +181,9 @@ public class S3DataSegmentPuller implements DataSegmentPuller
       catch (InterruptedException e) {
         throw Throwables.propagate(e);
       }
+      catch (IOException e) {
+        throw new SegmentLoadingException(e, e.getMessage());
+      }
       catch (ServiceException e) {
         throw new SegmentLoadingException(e, e.getMessage());
       }

S3DataSegmentPusher.java

@@ -21,21 +21,23 @@ package com.metamx.druid.loading;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.base.Joiner;
+import com.google.common.base.Throwables;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.io.ByteStreams;
 import com.google.common.io.Files;
 import com.metamx.druid.client.DataSegment;
+import com.metamx.druid.common.s3.S3Utils;
 import com.metamx.druid.index.v1.IndexIO;
 import com.metamx.druid.utils.CompressionUtils;
 import com.metamx.emitter.EmittingLogger;
-import org.jets3t.service.S3ServiceException;
+import org.jets3t.service.ServiceException;
 import org.jets3t.service.acl.gs.GSAccessControlList;
 import org.jets3t.service.impl.rest.httpclient.RestS3Service;
 import org.jets3t.service.model.S3Object;

 import java.io.File;
 import java.io.IOException;
-import java.security.NoSuchAlgorithmException;
+import java.util.concurrent.Callable;

 public class S3DataSegmentPusher implements DataSegmentPusher
 {
@@ -63,61 +65,76 @@ public class S3DataSegmentPusher implements DataSegmentPusher
   }

   @Override
-  public DataSegment push(final File indexFilesDir, DataSegment segment) throws IOException
+  public DataSegment push(final File indexFilesDir, final DataSegment inSegment) throws IOException
   {
     log.info("Uploading [%s] to S3", indexFilesDir);
-    String outputKey = JOINER.join(
+    final String outputKey = JOINER.join(
         config.getBaseKey().isEmpty() ? null : config.getBaseKey(),
-        DataSegmentPusherUtil.getStorageDir(segment)
+        DataSegmentPusherUtil.getStorageDir(inSegment)
     );

     final File zipOutFile = File.createTempFile("druid", "index.zip");
-    long indexSize = CompressionUtils.zip(indexFilesDir, zipOutFile);
+    final long indexSize = CompressionUtils.zip(indexFilesDir, zipOutFile);

     try {
-      S3Object toPush = new S3Object(zipOutFile);
+      return S3Utils.retryS3Operation(
+          new Callable<DataSegment>()
+          {
+            @Override
+            public DataSegment call() throws Exception
+            {
+              S3Object toPush = new S3Object(zipOutFile);

-      final String outputBucket = config.getBucket();
-      toPush.setBucketName(outputBucket);
-      toPush.setKey(outputKey + "/index.zip");
-      if (!config.getDisableAcl()) {
-        toPush.setAcl(GSAccessControlList.REST_CANNED_BUCKET_OWNER_FULL_CONTROL);
-      }
+              final String outputBucket = config.getBucket();
+              toPush.setBucketName(outputBucket);
+              toPush.setKey(outputKey + "/index.zip");
+              if (!config.getDisableAcl()) {
+                toPush.setAcl(GSAccessControlList.REST_CANNED_BUCKET_OWNER_FULL_CONTROL);
+              }

-      log.info("Pushing %s.", toPush);
-      s3Client.putObject(outputBucket, toPush);
+              log.info("Pushing %s.", toPush);
+              s3Client.putObject(outputBucket, toPush);

-      segment = segment.withSize(indexSize)
-                       .withLoadSpec(
-                           ImmutableMap.<String, Object>of("type", "s3_zip", "bucket", outputBucket, "key", toPush.getKey())
-                       )
-                       .withBinaryVersion(IndexIO.getVersionFromDir(indexFilesDir));
+              final DataSegment outSegment = inSegment.withSize(indexSize)
+                                                      .withLoadSpec(
+                                                          ImmutableMap.<String, Object>of(
+                                                              "type",
+                                                              "s3_zip",
+                                                              "bucket",
+                                                              outputBucket,
+                                                              "key",
+                                                              toPush.getKey()
+                                                          )
+                                                      )
+                                                      .withBinaryVersion(IndexIO.getVersionFromDir(indexFilesDir));

-      File descriptorFile = File.createTempFile("druid", "descriptor.json");
-      Files.copy(ByteStreams.newInputStreamSupplier(jsonMapper.writeValueAsBytes(segment)), descriptorFile);
-      S3Object descriptorObject = new S3Object(descriptorFile);
-      descriptorObject.setBucketName(outputBucket);
-      descriptorObject.setKey(outputKey + "/descriptor.json");
-      if (!config.getDisableAcl()) {
-        descriptorObject.setAcl(GSAccessControlList.REST_CANNED_BUCKET_OWNER_FULL_CONTROL);
-      }
+              File descriptorFile = File.createTempFile("druid", "descriptor.json");
+              Files.copy(ByteStreams.newInputStreamSupplier(jsonMapper.writeValueAsBytes(inSegment)), descriptorFile);
+              S3Object descriptorObject = new S3Object(descriptorFile);
+              descriptorObject.setBucketName(outputBucket);
+              descriptorObject.setKey(outputKey + "/descriptor.json");
+              if (!config.getDisableAcl()) {
+                descriptorObject.setAcl(GSAccessControlList.REST_CANNED_BUCKET_OWNER_FULL_CONTROL);
+              }

-      log.info("Pushing %s", descriptorObject);
-      s3Client.putObject(outputBucket, descriptorObject);
+              log.info("Pushing %s", descriptorObject);
+              s3Client.putObject(outputBucket, descriptorObject);

-      log.info("Deleting zipped index File[%s]", zipOutFile);
-      zipOutFile.delete();
+              log.info("Deleting zipped index File[%s]", zipOutFile);
+              zipOutFile.delete();

-      log.info("Deleting descriptor file[%s]", descriptorFile);
-      descriptorFile.delete();
+              log.info("Deleting descriptor file[%s]", descriptorFile);
+              descriptorFile.delete();

-      return segment;
+              return outSegment;
+            }
+          }
+      );
     }
-    catch (NoSuchAlgorithmException e) {
+    catch (ServiceException e) {
       throw new IOException(e);
     }
-    catch (S3ServiceException e) {
-      throw new IOException(e);
+    catch (InterruptedException e) {
+      throw Throwables.propagate(e);
     }
   }
 }
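
A design note on this last file: the entire body of push() now executes inside one retried Callable, so a transient failure while uploading descriptor.json re-runs the whole body and re-uploads index.zip as well. Since an S3 PUT to a fixed key is idempotent, retrying the whole unit is safe, just potentially wasteful for large segments. A hypothetical alternative, not what this commit does, would be to retry each PUT independently; the sketch below uses illustrative names throughout.

import java.util.concurrent.Callable;
import com.metamx.druid.common.s3.S3Utils;
import org.jets3t.service.impl.rest.httpclient.RestS3Service;
import org.jets3t.service.model.S3Object;

public class PerObjectRetrySketch
{
  private final RestS3Service s3Client;

  public PerObjectRetrySketch(RestS3Service s3Client)
  {
    this.s3Client = s3Client;
  }

  // Each PUT is retried on its own, so a transient failure on the second
  // object does not force a re-upload of the first.
  public void pushBoth(final String bucket, final S3Object index, final S3Object descriptor) throws Exception
  {
    for (final S3Object object : new S3Object[]{index, descriptor}) {
      S3Utils.retryS3Operation(
          new Callable<Void>()
          {
            @Override
            public Void call() throws Exception
            {
              s3Client.putObject(bucket, object);
              return null;
            }
          }
      );
    }
  }
}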