mirror of https://github.com/apache/druid.git
Move retryS3Operation to S3Utils
This commit is contained in:
parent
945bc9a370
commit
320f1fe840
|
@ -19,8 +19,10 @@
|
||||||
|
|
||||||
package com.metamx.druid.common.s3;
|
package com.metamx.druid.common.s3;
|
||||||
|
|
||||||
|
import com.google.common.base.Throwables;
|
||||||
import com.metamx.common.logger.Logger;
|
import com.metamx.common.logger.Logger;
|
||||||
import org.jets3t.service.S3ServiceException;
|
import org.jets3t.service.S3ServiceException;
|
||||||
|
import org.jets3t.service.ServiceException;
|
||||||
import org.jets3t.service.impl.rest.httpclient.RestS3Service;
|
import org.jets3t.service.impl.rest.httpclient.RestS3Service;
|
||||||
import org.jets3t.service.model.S3Bucket;
|
import org.jets3t.service.model.S3Bucket;
|
||||||
import org.jets3t.service.model.S3Object;
|
import org.jets3t.service.model.S3Object;
|
||||||
|
@ -28,6 +30,8 @@ import org.jets3t.service.model.S3Object;
|
||||||
import java.io.File;
|
import java.io.File;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.security.NoSuchAlgorithmException;
|
import java.security.NoSuchAlgorithmException;
|
||||||
|
import java.util.Random;
|
||||||
|
import java.util.concurrent.Callable;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
*
|
*
|
||||||
|
@ -80,4 +84,41 @@ public class S3Utils
|
||||||
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Retries S3 operations that fail due to io-related exceptions. Service-level exceptions (access denied, file not
|
||||||
|
* found, etc) are not retried.
|
||||||
|
*/
|
||||||
|
public static <T> T retryS3Operation(Callable<T> f) throws ServiceException, InterruptedException
|
||||||
|
{
|
||||||
|
int nTry = 0;
|
||||||
|
final int maxTries = 3;
|
||||||
|
final long baseSleepMillis = 1000;
|
||||||
|
final double fuzziness = 0.2;
|
||||||
|
while (true) {
|
||||||
|
try {
|
||||||
|
nTry++;
|
||||||
|
return f.call();
|
||||||
|
}
|
||||||
|
catch (ServiceException e) {
|
||||||
|
if (nTry <= maxTries &&
|
||||||
|
(e.getCause() instanceof IOException ||
|
||||||
|
(e.getErrorCode() != null && e.getErrorCode().equals("RequestTimeout")))) {
|
||||||
|
// Retryable
|
||||||
|
final long sleepMillis = Math.max(
|
||||||
|
baseSleepMillis,
|
||||||
|
(long) (baseSleepMillis * Math.pow(2, nTry) *
|
||||||
|
(1 + new Random().nextGaussian() * fuzziness))
|
||||||
|
);
|
||||||
|
log.info(e, "S3 fail on try %d/%d, retrying in %,dms.", nTry, maxTries, sleepMillis);
|
||||||
|
Thread.sleep(sleepMillis);
|
||||||
|
} else {
|
||||||
|
throw e;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
catch (Exception e) {
|
||||||
|
throw Throwables.propagate(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -83,7 +83,7 @@ public class S3DataSegmentPuller implements DataSegmentPuller
|
||||||
}
|
}
|
||||||
|
|
||||||
try {
|
try {
|
||||||
retryS3Operation(
|
S3Utils.retryS3Operation(
|
||||||
new Callable<Void>()
|
new Callable<Void>()
|
||||||
{
|
{
|
||||||
@Override
|
@Override
|
||||||
|
@ -140,7 +140,7 @@ public class S3DataSegmentPuller implements DataSegmentPuller
|
||||||
private boolean isObjectInBucket(final S3Coords coords) throws SegmentLoadingException
|
private boolean isObjectInBucket(final S3Coords coords) throws SegmentLoadingException
|
||||||
{
|
{
|
||||||
try {
|
try {
|
||||||
return retryS3Operation(
|
return S3Utils.retryS3Operation(
|
||||||
new Callable<Boolean>()
|
new Callable<Boolean>()
|
||||||
{
|
{
|
||||||
@Override
|
@Override
|
||||||
|
@ -164,7 +164,7 @@ public class S3DataSegmentPuller implements DataSegmentPuller
|
||||||
{
|
{
|
||||||
final S3Coords coords = new S3Coords(segment);
|
final S3Coords coords = new S3Coords(segment);
|
||||||
try {
|
try {
|
||||||
final StorageObject objDetails = retryS3Operation(
|
final StorageObject objDetails = S3Utils.retryS3Operation(
|
||||||
new Callable<StorageObject>()
|
new Callable<StorageObject>()
|
||||||
{
|
{
|
||||||
@Override
|
@Override
|
||||||
|
@ -184,39 +184,6 @@ public class S3DataSegmentPuller implements DataSegmentPuller
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private <T> T retryS3Operation(Callable<T> f) throws ServiceException, InterruptedException
|
|
||||||
{
|
|
||||||
int nTry = 0;
|
|
||||||
final int maxTries = 3;
|
|
||||||
final long baseSleepMillis = 1000;
|
|
||||||
final double fuzziness = 0.2;
|
|
||||||
while (true) {
|
|
||||||
try {
|
|
||||||
nTry++;
|
|
||||||
return f.call();
|
|
||||||
}
|
|
||||||
catch (ServiceException e) {
|
|
||||||
if (nTry <= maxTries &&
|
|
||||||
(e.getCause() instanceof IOException ||
|
|
||||||
(e.getErrorCode() != null && e.getErrorCode().equals("RequestTimeout")))) {
|
|
||||||
// Retryable
|
|
||||||
final long sleepMillis = Math.max(
|
|
||||||
baseSleepMillis,
|
|
||||||
(long) (baseSleepMillis * Math.pow(2, nTry) *
|
|
||||||
(1 + new Random().nextGaussian() * fuzziness))
|
|
||||||
);
|
|
||||||
log.info(e, "S3 fail on try %d/%d, retrying in %,dms.", nTry, maxTries, sleepMillis);
|
|
||||||
Thread.sleep(sleepMillis);
|
|
||||||
} else {
|
|
||||||
throw e;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
catch (Exception e) {
|
|
||||||
throw Throwables.propagate(e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private static class S3Coords
|
private static class S3Coords
|
||||||
{
|
{
|
||||||
String bucket;
|
String bucket;
|
||||||
|
|
Loading…
Reference in New Issue