Limited retries on s3 segment puller exceptions

Useful for reducing spurious alerts due to general s3 flakiness.
This commit is contained in:
Gian Merlino 2013-07-16 08:02:49 -07:00
parent dcc9942d8a
commit 945bc9a370
1 changed files with 109 additions and 39 deletions

View File

@ -19,6 +19,7 @@
package com.metamx.druid.loading; package com.metamx.druid.loading;
import com.google.common.base.Throwables;
import com.google.common.io.ByteStreams; import com.google.common.io.ByteStreams;
import com.google.common.io.Closeables; import com.google.common.io.Closeables;
import com.google.common.io.Files; import com.google.common.io.Files;
@ -30,16 +31,17 @@ import com.metamx.druid.client.DataSegment;
import com.metamx.druid.common.s3.S3Utils; import com.metamx.druid.common.s3.S3Utils;
import com.metamx.druid.utils.CompressionUtils; import com.metamx.druid.utils.CompressionUtils;
import org.apache.commons.io.FileUtils; import org.apache.commons.io.FileUtils;
import org.jets3t.service.S3ServiceException;
import org.jets3t.service.ServiceException; import org.jets3t.service.ServiceException;
import org.jets3t.service.impl.rest.httpclient.RestS3Service; import org.jets3t.service.impl.rest.httpclient.RestS3Service;
import org.jets3t.service.model.S3Bucket;
import org.jets3t.service.model.S3Object; import org.jets3t.service.model.S3Object;
import org.jets3t.service.model.StorageObject;
import java.io.File; import java.io.File;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.util.Map; import java.util.Map;
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.zip.GZIPInputStream; import java.util.zip.GZIPInputStream;
/** /**
@ -62,9 +64,9 @@ public class S3DataSegmentPuller implements DataSegmentPuller
} }
@Override @Override
public void getSegmentFiles(DataSegment segment, File outDir) throws SegmentLoadingException public void getSegmentFiles(final DataSegment segment, final File outDir) throws SegmentLoadingException
{ {
S3Coords s3Coords = new S3Coords(segment); final S3Coords s3Coords = new S3Coords(segment);
log.info("Pulling index at path[%s] to outDir[%s]", s3Coords, outDir); log.info("Pulling index at path[%s] to outDir[%s]", s3Coords, outDir);
@ -80,41 +82,52 @@ public class S3DataSegmentPuller implements DataSegmentPuller
throw new ISE("outDir[%s] must be a directory.", outDir); throw new ISE("outDir[%s] must be a directory.", outDir);
} }
long startTime = System.currentTimeMillis();
S3Object s3Obj = null;
try { try {
s3Obj = s3Client.getObject(new S3Bucket(s3Coords.bucket), s3Coords.path); retryS3Operation(
new Callable<Void>()
{
@Override
public Void call() throws Exception
{
long startTime = System.currentTimeMillis();
S3Object s3Obj = null;
InputStream in = null; try {
try { s3Obj = s3Client.getObject(s3Coords.bucket, s3Coords.path);
in = s3Obj.getDataInputStream();
final String key = s3Obj.getKey(); InputStream in = null;
if (key.endsWith(".zip")) { try {
CompressionUtils.unzip(in, outDir); in = s3Obj.getDataInputStream();
} else if (key.endsWith(".gz")) { final String key = s3Obj.getKey();
final File outFile = new File(outDir, toFilename(key, ".gz")); if (key.endsWith(".zip")) {
ByteStreams.copy(new GZIPInputStream(in), Files.newOutputStreamSupplier(outFile)); CompressionUtils.unzip(in, outDir);
} else { } else if (key.endsWith(".gz")) {
ByteStreams.copy(in, Files.newOutputStreamSupplier(new File(outDir, toFilename(key, "")))); final File outFile = new File(outDir, toFilename(key, ".gz"));
} ByteStreams.copy(new GZIPInputStream(in), Files.newOutputStreamSupplier(outFile));
log.info("Pull of file[%s] completed in %,d millis", s3Obj, System.currentTimeMillis() - startTime); } else {
} ByteStreams.copy(in, Files.newOutputStreamSupplier(new File(outDir, toFilename(key, ""))));
catch (IOException e) { }
FileUtils.deleteDirectory(outDir); log.info("Pull of file[%s] completed in %,d millis", s3Obj, System.currentTimeMillis() - startTime);
throw new SegmentLoadingException(e, "Problem decompressing object[%s]", s3Obj); return null;
} }
finally { catch (IOException e) {
Closeables.closeQuietly(in); FileUtils.deleteDirectory(outDir);
} throw new SegmentLoadingException(e, "Problem decompressing object[%s]", s3Obj);
}
finally {
Closeables.closeQuietly(in);
}
}
finally {
S3Utils.closeStreamsQuietly(s3Obj);
}
}
}
);
} }
catch (Exception e) { catch (Exception e) {
throw new SegmentLoadingException(e, e.getMessage()); throw new SegmentLoadingException(e, e.getMessage());
} }
finally {
S3Utils.closeStreamsQuietly(s3Obj);
}
} }
private String toFilename(String key, final String suffix) private String toFilename(String key, final String suffix)
@ -124,29 +137,86 @@ public class S3DataSegmentPuller implements DataSegmentPuller
return filename; return filename;
} }
private boolean isObjectInBucket(S3Coords coords) throws SegmentLoadingException private boolean isObjectInBucket(final S3Coords coords) throws SegmentLoadingException
{ {
try { try {
return s3Client.isObjectInBucket(coords.bucket, coords.path); return retryS3Operation(
new Callable<Boolean>()
{
@Override
public Boolean call() throws Exception
{
return s3Client.isObjectInBucket(coords.bucket, coords.path);
}
}
);
}
catch (InterruptedException e) {
throw Throwables.propagate(e);
} }
catch (ServiceException e) { catch (ServiceException e) {
throw new SegmentLoadingException(e, "S3 fail! Key[%s]", coords); throw new SegmentLoadingException(e, "S3 fail! Key[%s]", coords);
} }
} }
@Override @Override
public long getLastModified(DataSegment segment) throws SegmentLoadingException public long getLastModified(DataSegment segment) throws SegmentLoadingException
{ {
S3Coords coords = new S3Coords(segment); final S3Coords coords = new S3Coords(segment);
try { try {
S3Object objDetails = s3Client.getObjectDetails(new S3Bucket(coords.bucket), coords.path); final StorageObject objDetails = retryS3Operation(
new Callable<StorageObject>()
{
@Override
public StorageObject call() throws Exception
{
return s3Client.getObjectDetails(coords.bucket, coords.path);
}
}
);
return objDetails.getLastModifiedDate().getTime(); return objDetails.getLastModifiedDate().getTime();
} }
catch (S3ServiceException e) { catch (InterruptedException e) {
throw Throwables.propagate(e);
}
catch (ServiceException e) {
throw new SegmentLoadingException(e, e.getMessage()); throw new SegmentLoadingException(e, e.getMessage());
} }
} }
private <T> T retryS3Operation(Callable<T> f) throws ServiceException, InterruptedException
{
int nTry = 0;
final int maxTries = 3;
final long baseSleepMillis = 1000;
final double fuzziness = 0.2;
while (true) {
try {
nTry++;
return f.call();
}
catch (ServiceException e) {
if (nTry <= maxTries &&
(e.getCause() instanceof IOException ||
(e.getErrorCode() != null && e.getErrorCode().equals("RequestTimeout")))) {
// Retryable
final long sleepMillis = Math.max(
baseSleepMillis,
(long) (baseSleepMillis * Math.pow(2, nTry) *
(1 + new Random().nextGaussian() * fuzziness))
);
log.info(e, "S3 fail on try %d/%d, retrying in %,dms.", nTry, maxTries, sleepMillis);
Thread.sleep(sleepMillis);
} else {
throw e;
}
}
catch (Exception e) {
throw Throwables.propagate(e);
}
}
}
private static class S3Coords private static class S3Coords
{ {
String bucket; String bucket;