mirror of https://github.com/apache/druid.git
AWS "Data read has a different length than the expected" error should reset stream and try again (#11941)
* Add support for custom reset condition & support for other args to have defaults to make the method API consistent
* Add support for custom reset condition to InputEntity
* Fix test names
* Add clarifying comments on why we need to read the message's content to identify S3's resettable exception
* Add unit test to verify custom resettable condition for S3Entity
* Provide a way to customize retries since they are expensive to test
This commit is contained in:
parent
9bc18a93a2
commit
8eff6334f7
|
@ -21,6 +21,7 @@ package org.apache.druid.data.input;
|
||||||
|
|
||||||
import com.google.common.base.Predicate;
|
import com.google.common.base.Predicate;
|
||||||
import com.google.common.base.Predicates;
|
import com.google.common.base.Predicates;
|
||||||
|
import org.apache.druid.data.input.impl.RetryingInputStream;
|
||||||
import org.apache.druid.guice.annotations.UnstableApi;
|
import org.apache.druid.guice.annotations.UnstableApi;
|
||||||
import org.apache.druid.java.util.common.FileUtils;
|
import org.apache.druid.java.util.common.FileUtils;
|
||||||
import org.apache.druid.java.util.common.StringUtils;
|
import org.apache.druid.java.util.common.StringUtils;
|
||||||
|
@ -128,4 +129,15 @@ public interface InputEntity
|
||||||
{
|
{
|
||||||
return Predicates.alwaysFalse();
|
return Predicates.alwaysFalse();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns a reset condition that the caller should retry on.
|
||||||
|
* The returned condition should be used when reading data from this InputEntity such as in {@link #fetch}
|
||||||
|
* or {@link RetryingInputEntity}.
|
||||||
|
*/
|
||||||
|
default Predicate<Throwable> getResetCondition()
|
||||||
|
{
|
||||||
|
return RetryingInputStream.DEFAULT_RESET_CONDITION;
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -40,11 +40,18 @@ public abstract class RetryingInputEntity implements InputEntity
|
||||||
this,
|
this,
|
||||||
new RetryingInputEntityOpenFunction(),
|
new RetryingInputEntityOpenFunction(),
|
||||||
getRetryCondition(),
|
getRetryCondition(),
|
||||||
RetryUtils.DEFAULT_MAX_TRIES
|
getResetCondition(),
|
||||||
|
getMaxRetries()
|
||||||
);
|
);
|
||||||
return CompressionUtils.decompress(retryingInputStream, getPath());
|
return CompressionUtils.decompress(retryingInputStream, getPath());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// override this in sub-classes to customize retries
|
||||||
|
protected int getMaxRetries()
|
||||||
|
{
|
||||||
|
return RetryUtils.DEFAULT_MAX_TRIES;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Directly opens an {@link InputStream} on the input entity. Decompression should be handled externally, and is
|
* Directly opens an {@link InputStream} on the input entity. Decompression should be handled externally, and is
|
||||||
* handled by the default implementation of {@link #open}, so this should return the raw stream for the object.
|
* handled by the default implementation of {@link #open}, so this should return the raw stream for the object.
|
||||||
|
|
|
@ -20,6 +20,7 @@
|
||||||
package org.apache.druid.data.input.impl;
|
package org.apache.druid.data.input.impl;
|
||||||
|
|
||||||
import com.google.common.base.Predicate;
|
import com.google.common.base.Predicate;
|
||||||
|
import com.google.common.base.Predicates;
|
||||||
import com.google.common.base.Throwables;
|
import com.google.common.base.Throwables;
|
||||||
import com.google.common.io.CountingInputStream;
|
import com.google.common.io.CountingInputStream;
|
||||||
import org.apache.druid.data.input.impl.prefetch.Fetcher;
|
import org.apache.druid.data.input.impl.prefetch.Fetcher;
|
||||||
|
@ -27,6 +28,7 @@ import org.apache.druid.data.input.impl.prefetch.ObjectOpenFunction;
|
||||||
import org.apache.druid.java.util.common.RetryUtils;
|
import org.apache.druid.java.util.common.RetryUtils;
|
||||||
import org.apache.druid.java.util.common.logger.Logger;
|
import org.apache.druid.java.util.common.logger.Logger;
|
||||||
|
|
||||||
|
import javax.annotation.Nullable;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.io.InputStream;
|
import java.io.InputStream;
|
||||||
import java.net.SocketException;
|
import java.net.SocketException;
|
||||||
|
@ -39,31 +41,49 @@ import java.net.SocketException;
|
||||||
*/
|
*/
|
||||||
public class RetryingInputStream<T> extends InputStream
|
public class RetryingInputStream<T> extends InputStream
|
||||||
{
|
{
|
||||||
|
|
||||||
|
public static final Predicate<Throwable> DEFAULT_RETRY_CONDITION = Predicates.alwaysFalse();
|
||||||
|
public static final Predicate<Throwable> DEFAULT_RESET_CONDITION = RetryingInputStream::isConnectionReset;
|
||||||
|
|
||||||
private static final Logger log = new Logger(RetryingInputStream.class);
|
private static final Logger log = new Logger(RetryingInputStream.class);
|
||||||
|
|
||||||
private final T object;
|
private final T object;
|
||||||
private final ObjectOpenFunction<T> objectOpenFunction;
|
private final ObjectOpenFunction<T> objectOpenFunction;
|
||||||
private final Predicate<Throwable> retryCondition;
|
private final Predicate<Throwable> retryCondition;
|
||||||
|
private final Predicate<Throwable> resetCondition;
|
||||||
private final int maxRetry;
|
private final int maxRetry;
|
||||||
|
|
||||||
private CountingInputStream delegate;
|
private CountingInputStream delegate;
|
||||||
private long startOffset;
|
private long startOffset;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Creates the stream and immediately opens the underlying object using {@code objectOpenFunction}.
|
||||||
|
* @param object The object entity to open
|
||||||
|
* @param objectOpenFunction How to open the object
|
||||||
|
* @param retryCondition A predicate on a throwable to indicate if stream should retry. This defaults to
|
||||||
|
* never retrying (see {@link #DEFAULT_RETRY_CONDITION}) when null is passed
|
||||||
|
* @param resetCondition A predicate on a throwable to indicate if stream should reset. This defaults to
|
||||||
|
* a generic reset test, see {@link #isConnectionReset(Throwable)} when null is passed
|
||||||
|
* @param maxRetry The maximum times to retry. Defaults to {@link RetryUtils#DEFAULT_MAX_TRIES} when null
|
||||||
|
* @throws IOException if the initial open of the object fails
|
||||||
|
*/
|
||||||
public RetryingInputStream(
|
public RetryingInputStream(
|
||||||
T object,
|
T object,
|
||||||
ObjectOpenFunction<T> objectOpenFunction,
|
ObjectOpenFunction<T> objectOpenFunction,
|
||||||
Predicate<Throwable> retryCondition,
|
@Nullable Predicate<Throwable> retryCondition,
|
||||||
int maxRetry
|
@Nullable Predicate<Throwable> resetCondition,
|
||||||
|
@Nullable Integer maxRetry
|
||||||
) throws IOException
|
) throws IOException
|
||||||
{
|
{
|
||||||
this.object = object;
|
this.object = object;
|
||||||
this.objectOpenFunction = objectOpenFunction;
|
this.objectOpenFunction = objectOpenFunction;
|
||||||
this.retryCondition = retryCondition;
|
this.retryCondition = retryCondition == null ? DEFAULT_RETRY_CONDITION : retryCondition;
|
||||||
this.maxRetry = maxRetry;
|
this.resetCondition = resetCondition == null ? DEFAULT_RESET_CONDITION : resetCondition;
|
||||||
|
this.maxRetry = maxRetry == null ? RetryUtils.DEFAULT_MAX_TRIES : maxRetry;
|
||||||
this.delegate = new CountingInputStream(objectOpenFunction.open(object));
|
this.delegate = new CountingInputStream(objectOpenFunction.open(object));
|
||||||
}
|
}
|
||||||
|
|
||||||
private boolean isConnectionReset(Throwable t)
|
private static boolean isConnectionReset(Throwable t)
|
||||||
{
|
{
|
||||||
return (t instanceof SocketException && (t.getMessage() != null && t.getMessage().contains("Connection reset"))) ||
|
return (t instanceof SocketException && (t.getMessage() != null && t.getMessage().contains("Connection reset"))) ||
|
||||||
(t.getCause() != null && isConnectionReset(t.getCause()));
|
(t.getCause() != null && isConnectionReset(t.getCause()));
|
||||||
|
@ -71,7 +91,7 @@ public class RetryingInputStream<T> extends InputStream
|
||||||
|
|
||||||
private void waitOrThrow(Throwable t, int nTry) throws IOException
|
private void waitOrThrow(Throwable t, int nTry) throws IOException
|
||||||
{
|
{
|
||||||
final boolean isConnectionReset = isConnectionReset(t);
|
final boolean isConnectionReset = resetCondition.apply(t);
|
||||||
if (isConnectionReset || retryCondition.apply(t)) {
|
if (isConnectionReset || retryCondition.apply(t)) {
|
||||||
if (isConnectionReset) {
|
if (isConnectionReset) {
|
||||||
// Re-open the input stream on connection reset
|
// Re-open the input stream on connection reset
|
||||||
|
|
|
@ -98,7 +98,7 @@ public class FileFetcher<T> extends Fetcher<T>
|
||||||
{
|
{
|
||||||
return new OpenObject<>(
|
return new OpenObject<>(
|
||||||
object,
|
object,
|
||||||
new RetryingInputStream<>(object, openObjectFunction, retryCondition, getFetchConfig().getMaxFetchRetry()),
|
new RetryingInputStream<>(object, openObjectFunction, retryCondition, null, getFetchConfig().getMaxFetchRetry()),
|
||||||
getNoopCloser()
|
getNoopCloser()
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
|
@ -29,6 +29,7 @@ import org.junit.Rule;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
import org.junit.rules.TemporaryFolder;
|
import org.junit.rules.TemporaryFolder;
|
||||||
|
|
||||||
|
import javax.annotation.Nonnull;
|
||||||
import java.io.DataInputStream;
|
import java.io.DataInputStream;
|
||||||
import java.io.DataOutputStream;
|
import java.io.DataOutputStream;
|
||||||
import java.io.File;
|
import java.io.File;
|
||||||
|
@ -43,32 +44,18 @@ import java.util.zip.GZIPOutputStream;
|
||||||
public class RetryingInputStreamTest
|
public class RetryingInputStreamTest
|
||||||
{
|
{
|
||||||
private static final int MAX_RETRY = 5;
|
private static final int MAX_RETRY = 5;
|
||||||
private static final int MAX_ERROR = 4;
|
|
||||||
|
|
||||||
@Rule
|
@Rule
|
||||||
public TemporaryFolder temporaryFolder = new TemporaryFolder();
|
public TemporaryFolder temporaryFolder = new TemporaryFolder();
|
||||||
|
|
||||||
private File testFile;
|
private File testFile;
|
||||||
private DataInputStream inputStream;
|
|
||||||
|
|
||||||
@Before
|
private boolean throwSocketException = false;
|
||||||
public void setup() throws IOException
|
private boolean throwCustomException = false;
|
||||||
{
|
private boolean throwIOException = false;
|
||||||
testFile = temporaryFolder.newFile();
|
|
||||||
|
|
||||||
try (FileOutputStream fis = new FileOutputStream(testFile);
|
|
||||||
GZIPOutputStream gis = new GZIPOutputStream(fis);
|
|
||||||
DataOutputStream dis = new DataOutputStream(gis)) {
|
|
||||||
for (int i = 0; i < 10000; i++) {
|
|
||||||
dis.writeInt(i);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
throwError = false;
|
private final ObjectOpenFunction<File> objectOpenFunction = new ObjectOpenFunction<File>()
|
||||||
|
|
||||||
final InputStream retryingInputStream = new RetryingInputStream<>(
|
|
||||||
testFile,
|
|
||||||
new ObjectOpenFunction<File>()
|
|
||||||
{
|
{
|
||||||
@Override
|
@Override
|
||||||
public InputStream open(File object) throws IOException
|
public InputStream open(File object) throws IOException
|
||||||
|
@ -83,33 +70,97 @@ public class RetryingInputStreamTest
|
||||||
Preconditions.checkState(fis.skip(start) == start);
|
Preconditions.checkState(fis.skip(start) == start);
|
||||||
return new TestInputStream(fis);
|
return new TestInputStream(fis);
|
||||||
}
|
}
|
||||||
},
|
};
|
||||||
e -> e instanceof IOException,
|
|
||||||
MAX_RETRY
|
|
||||||
);
|
|
||||||
|
|
||||||
inputStream = new DataInputStream(new GZIPInputStream(retryingInputStream));
|
@Before
|
||||||
|
public void setup() throws IOException
|
||||||
|
{
|
||||||
|
testFile = temporaryFolder.newFile();
|
||||||
|
|
||||||
throwError = true;
|
try (FileOutputStream fis = new FileOutputStream(testFile);
|
||||||
|
GZIPOutputStream gis = new GZIPOutputStream(fis);
|
||||||
|
DataOutputStream dis = new DataOutputStream(gis)) {
|
||||||
|
for (int i = 0; i < 10000; i++) {
|
||||||
|
dis.writeInt(i);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
throwSocketException = false;
|
||||||
|
throwCustomException = false;
|
||||||
|
throwIOException = false;
|
||||||
}
|
}
|
||||||
|
|
||||||
@After
|
@After
|
||||||
public void teardown() throws IOException
|
public void teardown() throws IOException
|
||||||
{
|
{
|
||||||
inputStream.close();
|
|
||||||
FileUtils.forceDelete(testFile);
|
FileUtils.forceDelete(testFile);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
|
||||||
public void testReadRetry() throws IOException
|
@Test(expected = IOException.class)
|
||||||
|
public void testDefaultsReadThrows() throws IOException
|
||||||
{
|
{
|
||||||
|
throwIOException = true;
|
||||||
|
final InputStream retryingInputStream = new RetryingInputStream<>(
|
||||||
|
testFile,
|
||||||
|
objectOpenFunction,
|
||||||
|
null, // will not retry
|
||||||
|
null, // will enable reset using default logic
|
||||||
|
MAX_RETRY
|
||||||
|
);
|
||||||
|
retryHelper(retryingInputStream);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testCustomResetRead() throws IOException
|
||||||
|
{
|
||||||
|
throwCustomException = true;
|
||||||
|
final InputStream retryingInputStream = new RetryingInputStream<>(
|
||||||
|
testFile,
|
||||||
|
objectOpenFunction,
|
||||||
|
null, // retry will fail
|
||||||
|
t -> t instanceof CustomException, // but reset won't
|
||||||
|
MAX_RETRY
|
||||||
|
);
|
||||||
|
retryHelper(retryingInputStream);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test(expected = IOException.class)
|
||||||
|
public void testCustomResetReadThrows() throws IOException
|
||||||
|
{
|
||||||
|
throwCustomException = true;
|
||||||
|
final InputStream retryingInputStream = new RetryingInputStream<>(
|
||||||
|
testFile,
|
||||||
|
objectOpenFunction,
|
||||||
|
null, // will not retry
|
||||||
|
null, // since there is no custom reset lambda it will fail when the custom exception is thrown
|
||||||
|
MAX_RETRY
|
||||||
|
);
|
||||||
|
retryHelper(retryingInputStream);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testIOExceptionNotRetriableRead() throws IOException
|
||||||
|
{
|
||||||
|
throwCustomException = true;
|
||||||
|
throwIOException = true;
|
||||||
|
final InputStream retryingInputStream = new RetryingInputStream<>(
|
||||||
|
testFile,
|
||||||
|
objectOpenFunction,
|
||||||
|
t -> t instanceof IOException, // retry will succeed
|
||||||
|
t -> t instanceof CustomException, // reset will also succeed
|
||||||
|
MAX_RETRY
|
||||||
|
);
|
||||||
|
retryHelper(retryingInputStream);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void retryHelper(InputStream retryingInputStream) throws IOException
|
||||||
|
{
|
||||||
|
try (DataInputStream inputStream = new DataInputStream(new GZIPInputStream(retryingInputStream))) {
|
||||||
for (int i = 0; i < 10000; i++) {
|
for (int i = 0; i < 10000; i++) {
|
||||||
Assert.assertEquals(i, inputStream.readInt());
|
Assert.assertEquals(i, inputStream.readInt());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
private boolean throwError = true;
|
|
||||||
private int errorCount = 0;
|
|
||||||
|
|
||||||
private class TestInputStream extends InputStream
|
private class TestInputStream extends InputStream
|
||||||
{
|
{
|
||||||
|
@ -127,21 +178,30 @@ public class RetryingInputStreamTest
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public int read(byte[] b, int off, int len) throws IOException
|
public int read(@Nonnull byte[] b, int off, int len) throws IOException
|
||||||
{
|
{
|
||||||
if (throwError) {
|
if (throwIOException) {
|
||||||
throwError = false;
|
throwIOException = false;
|
||||||
errorCount++;
|
|
||||||
if (errorCount % 2 == 0) {
|
|
||||||
throw new IOException("test retry");
|
throw new IOException("test retry");
|
||||||
} else {
|
} else if (throwCustomException) {
|
||||||
|
throwCustomException = false;
|
||||||
|
RuntimeException e = new RuntimeException();
|
||||||
|
throw new CustomException("I am a custom ResettableException", e);
|
||||||
|
} else if (throwSocketException) {
|
||||||
|
throwSocketException = false;
|
||||||
delegate.close();
|
delegate.close();
|
||||||
throw new SocketException("Test Connection reset");
|
throw new SocketException("Test Connection reset");
|
||||||
}
|
|
||||||
} else {
|
} else {
|
||||||
throwError = errorCount < MAX_ERROR;
|
|
||||||
return delegate.read(b, off, len);
|
return delegate.read(b, off, len);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private static class CustomException extends RuntimeException
|
||||||
|
{
|
||||||
|
public CustomException(String err, Throwable t)
|
||||||
|
{
|
||||||
|
super(err, t);
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -19,13 +19,16 @@
|
||||||
|
|
||||||
package org.apache.druid.data.input.s3;
|
package org.apache.druid.data.input.s3;
|
||||||
|
|
||||||
|
import com.amazonaws.SdkClientException;
|
||||||
import com.amazonaws.services.s3.model.AmazonS3Exception;
|
import com.amazonaws.services.s3.model.AmazonS3Exception;
|
||||||
import com.amazonaws.services.s3.model.GetObjectRequest;
|
import com.amazonaws.services.s3.model.GetObjectRequest;
|
||||||
import com.amazonaws.services.s3.model.S3Object;
|
import com.amazonaws.services.s3.model.S3Object;
|
||||||
|
import com.google.common.base.Preconditions;
|
||||||
import com.google.common.base.Predicate;
|
import com.google.common.base.Predicate;
|
||||||
import org.apache.druid.data.input.RetryingInputEntity;
|
import org.apache.druid.data.input.RetryingInputEntity;
|
||||||
import org.apache.druid.data.input.impl.CloudObjectLocation;
|
import org.apache.druid.data.input.impl.CloudObjectLocation;
|
||||||
import org.apache.druid.java.util.common.ISE;
|
import org.apache.druid.java.util.common.ISE;
|
||||||
|
import org.apache.druid.java.util.common.RetryUtils;
|
||||||
import org.apache.druid.storage.s3.S3StorageDruidModule;
|
import org.apache.druid.storage.s3.S3StorageDruidModule;
|
||||||
import org.apache.druid.storage.s3.S3Utils;
|
import org.apache.druid.storage.s3.S3Utils;
|
||||||
import org.apache.druid.storage.s3.ServerSideEncryptingAmazonS3;
|
import org.apache.druid.storage.s3.ServerSideEncryptingAmazonS3;
|
||||||
|
@ -38,11 +41,29 @@ public class S3Entity extends RetryingInputEntity
|
||||||
{
|
{
|
||||||
private final ServerSideEncryptingAmazonS3 s3Client;
|
private final ServerSideEncryptingAmazonS3 s3Client;
|
||||||
private final CloudObjectLocation object;
|
private final CloudObjectLocation object;
|
||||||
|
private final int maxRetries;
|
||||||
|
|
||||||
S3Entity(ServerSideEncryptingAmazonS3 s3Client, CloudObjectLocation coords)
|
S3Entity(ServerSideEncryptingAmazonS3 s3Client, CloudObjectLocation coords)
|
||||||
{
|
{
|
||||||
this.s3Client = s3Client;
|
this.s3Client = s3Client;
|
||||||
this.object = coords;
|
this.object = coords;
|
||||||
|
this.maxRetries = RetryUtils.DEFAULT_MAX_TRIES;
|
||||||
|
}
|
||||||
|
|
||||||
|
// this was added for testing but it might be useful in other cases (you can
|
||||||
|
// configure maxRetries)
|
||||||
|
S3Entity(ServerSideEncryptingAmazonS3 s3Client, CloudObjectLocation coords, int maxRetries)
|
||||||
|
{
|
||||||
|
Preconditions.checkArgument(maxRetries >= 0);
|
||||||
|
this.s3Client = s3Client;
|
||||||
|
this.object = coords;
|
||||||
|
this.maxRetries = maxRetries;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected int getMaxRetries()
|
||||||
|
{
|
||||||
|
return maxRetries;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -84,4 +105,16 @@ public class S3Entity extends RetryingInputEntity
|
||||||
{
|
{
|
||||||
return S3Utils.S3RETRY;
|
return S3Utils.S3RETRY;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Predicate<Throwable> getResetCondition()
|
||||||
|
{
|
||||||
|
// SdkClientException can be thrown for many reasons and the only way to
|
||||||
|
// distinguish it is to look at the message. This is not ideal since the
|
||||||
|
// message may change, so this check may need to be adjusted in the future.
|
||||||
|
return t -> super.getResetCondition().apply(t) ||
|
||||||
|
(t instanceof SdkClientException &&
|
||||||
|
t.getMessage().contains("Data read has a different length than the expected"));
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -40,6 +40,7 @@ import org.apache.druid.data.input.SplitHintSpec;
|
||||||
import org.apache.druid.data.input.impl.CloudObjectInputSource;
|
import org.apache.druid.data.input.impl.CloudObjectInputSource;
|
||||||
import org.apache.druid.data.input.impl.CloudObjectLocation;
|
import org.apache.druid.data.input.impl.CloudObjectLocation;
|
||||||
import org.apache.druid.data.input.impl.SplittableInputSource;
|
import org.apache.druid.data.input.impl.SplittableInputSource;
|
||||||
|
import org.apache.druid.java.util.common.RetryUtils;
|
||||||
import org.apache.druid.java.util.common.StringUtils;
|
import org.apache.druid.java.util.common.StringUtils;
|
||||||
import org.apache.druid.storage.s3.S3InputDataConfig;
|
import org.apache.druid.storage.s3.S3InputDataConfig;
|
||||||
import org.apache.druid.storage.s3.S3StorageDruidModule;
|
import org.apache.druid.storage.s3.S3StorageDruidModule;
|
||||||
|
@ -67,6 +68,7 @@ public class S3InputSource extends CloudObjectInputSource
|
||||||
private final S3InputSourceConfig s3InputSourceConfig;
|
private final S3InputSourceConfig s3InputSourceConfig;
|
||||||
private final S3InputDataConfig inputDataConfig;
|
private final S3InputDataConfig inputDataConfig;
|
||||||
private final AWSCredentialsProvider awsCredentialsProvider;
|
private final AWSCredentialsProvider awsCredentialsProvider;
|
||||||
|
private int maxRetries;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Constructor for S3InputSource
|
* Constructor for S3InputSource
|
||||||
|
@ -124,6 +126,7 @@ public class S3InputSource extends CloudObjectInputSource
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
);
|
);
|
||||||
|
this.maxRetries = RetryUtils.DEFAULT_MAX_TRIES;
|
||||||
this.awsCredentialsProvider = awsCredentialsProvider;
|
this.awsCredentialsProvider = awsCredentialsProvider;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -141,6 +144,22 @@ public class S3InputSource extends CloudObjectInputSource
|
||||||
this(s3Client, s3ClientBuilder, inputDataConfig, uris, prefixes, objects, s3InputSourceConfig, null);
|
this(s3Client, s3ClientBuilder, inputDataConfig, uris, prefixes, objects, s3InputSourceConfig, null);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
public S3InputSource(
|
||||||
|
ServerSideEncryptingAmazonS3 s3Client,
|
||||||
|
ServerSideEncryptingAmazonS3.Builder s3ClientBuilder,
|
||||||
|
S3InputDataConfig inputDataConfig,
|
||||||
|
List<URI> uris,
|
||||||
|
List<URI> prefixes,
|
||||||
|
List<CloudObjectLocation> objects,
|
||||||
|
S3InputSourceConfig s3InputSourceConfig,
|
||||||
|
int maxRetries
|
||||||
|
)
|
||||||
|
{
|
||||||
|
this(s3Client, s3ClientBuilder, inputDataConfig, uris, prefixes, objects, s3InputSourceConfig, null);
|
||||||
|
this.maxRetries = maxRetries;
|
||||||
|
}
|
||||||
|
|
||||||
private void applyAssumeRole(
|
private void applyAssumeRole(
|
||||||
ServerSideEncryptingAmazonS3.Builder s3ClientBuilder,
|
ServerSideEncryptingAmazonS3.Builder s3ClientBuilder,
|
||||||
S3InputSourceConfig s3InputSourceConfig,
|
S3InputSourceConfig s3InputSourceConfig,
|
||||||
|
@ -186,7 +205,7 @@ public class S3InputSource extends CloudObjectInputSource
|
||||||
@Override
|
@Override
|
||||||
protected InputEntity createEntity(CloudObjectLocation location)
|
protected InputEntity createEntity(CloudObjectLocation location)
|
||||||
{
|
{
|
||||||
return new S3Entity(s3ClientSupplier.get(), location);
|
return new S3Entity(s3ClientSupplier.get(), location, maxRetries);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -254,6 +273,10 @@ public class S3InputSource extends CloudObjectInputSource
|
||||||
|
|
||||||
private Iterable<S3ObjectSummary> getIterableObjectsFromPrefixes()
|
private Iterable<S3ObjectSummary> getIterableObjectsFromPrefixes()
|
||||||
{
|
{
|
||||||
return () -> S3Utils.objectSummaryIterator(s3ClientSupplier.get(), getPrefixes(), inputDataConfig.getMaxListingLength());
|
return () -> S3Utils.objectSummaryIterator(s3ClientSupplier.get(),
|
||||||
|
getPrefixes(),
|
||||||
|
inputDataConfig.getMaxListingLength(),
|
||||||
|
maxRetries
|
||||||
|
);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -23,7 +23,9 @@ import com.amazonaws.services.s3.model.AmazonS3Exception;
|
||||||
import com.amazonaws.services.s3.model.ListObjectsV2Request;
|
import com.amazonaws.services.s3.model.ListObjectsV2Request;
|
||||||
import com.amazonaws.services.s3.model.ListObjectsV2Result;
|
import com.amazonaws.services.s3.model.ListObjectsV2Result;
|
||||||
import com.amazonaws.services.s3.model.S3ObjectSummary;
|
import com.amazonaws.services.s3.model.S3ObjectSummary;
|
||||||
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
import org.apache.druid.java.util.common.RE;
|
import org.apache.druid.java.util.common.RE;
|
||||||
|
import org.apache.druid.java.util.common.RetryUtils;
|
||||||
|
|
||||||
import java.net.URI;
|
import java.net.URI;
|
||||||
import java.util.Iterator;
|
import java.util.Iterator;
|
||||||
|
@ -45,6 +47,8 @@ public class ObjectSummaryIterator implements Iterator<S3ObjectSummary>
|
||||||
private ListObjectsV2Result result;
|
private ListObjectsV2Result result;
|
||||||
private Iterator<S3ObjectSummary> objectSummaryIterator;
|
private Iterator<S3ObjectSummary> objectSummaryIterator;
|
||||||
private S3ObjectSummary currentObjectSummary;
|
private S3ObjectSummary currentObjectSummary;
|
||||||
|
private int maxRetries; // this is made available for testing mostly
|
||||||
|
|
||||||
|
|
||||||
ObjectSummaryIterator(
|
ObjectSummaryIterator(
|
||||||
final ServerSideEncryptingAmazonS3 s3Client,
|
final ServerSideEncryptingAmazonS3 s3Client,
|
||||||
|
@ -55,7 +59,32 @@ public class ObjectSummaryIterator implements Iterator<S3ObjectSummary>
|
||||||
this.s3Client = s3Client;
|
this.s3Client = s3Client;
|
||||||
this.prefixesIterator = prefixes.iterator();
|
this.prefixesIterator = prefixes.iterator();
|
||||||
this.maxListingLength = maxListingLength;
|
this.maxListingLength = maxListingLength;
|
||||||
|
maxRetries = RetryUtils.DEFAULT_MAX_TRIES;
|
||||||
|
|
||||||
|
constructorPostProcessing();
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
ObjectSummaryIterator(
|
||||||
|
final ServerSideEncryptingAmazonS3 s3Client,
|
||||||
|
final Iterable<URI> prefixes,
|
||||||
|
final int maxListingLength,
|
||||||
|
final int maxRetries
|
||||||
|
)
|
||||||
|
{
|
||||||
|
this.s3Client = s3Client;
|
||||||
|
this.prefixesIterator = prefixes.iterator();
|
||||||
|
this.maxListingLength = maxListingLength;
|
||||||
|
this.maxRetries = maxRetries;
|
||||||
|
|
||||||
|
constructorPostProcessing();
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
// helper to factor out stuff that happens in constructor after members are set
|
||||||
|
private void constructorPostProcessing()
|
||||||
|
{
|
||||||
prepareNextRequest();
|
prepareNextRequest();
|
||||||
fetchNextBatch();
|
fetchNextBatch();
|
||||||
advanceObjectSummary();
|
advanceObjectSummary();
|
||||||
|
@ -94,7 +123,7 @@ public class ObjectSummaryIterator implements Iterator<S3ObjectSummary>
|
||||||
private void fetchNextBatch()
|
private void fetchNextBatch()
|
||||||
{
|
{
|
||||||
try {
|
try {
|
||||||
result = S3Utils.retryS3Operation(() -> s3Client.listObjectsV2(request));
|
result = S3Utils.retryS3Operation(() -> s3Client.listObjectsV2(request), maxRetries);
|
||||||
request.setContinuationToken(result.getNextContinuationToken());
|
request.setContinuationToken(result.getNextContinuationToken());
|
||||||
objectSummaryIterator = result.getObjectSummaries().iterator();
|
objectSummaryIterator = result.getObjectSummaries().iterator();
|
||||||
}
|
}
|
||||||
|
|
|
@ -83,6 +83,15 @@ public class S3Utils
|
||||||
return RetryUtils.retry(f, S3RETRY, RetryUtils.DEFAULT_MAX_TRIES);
|
return RetryUtils.retry(f, S3RETRY, RetryUtils.DEFAULT_MAX_TRIES);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Retries S3 operations that fail due to io-related exceptions. Service-level exceptions (access denied, file not
|
||||||
|
* found, etc.) are not retried. Also provides a way to set maxRetries, which can be useful, e.g. for testing.
|
||||||
|
*/
|
||||||
|
static <T> T retryS3Operation(Task<T> f, int maxRetries) throws Exception
|
||||||
|
{
|
||||||
|
return RetryUtils.retry(f, S3RETRY, maxRetries);
|
||||||
|
}
|
||||||
|
|
||||||
static boolean isObjectInBucketIgnoringPermission(
|
static boolean isObjectInBucketIgnoringPermission(
|
||||||
ServerSideEncryptingAmazonS3 s3Client,
|
ServerSideEncryptingAmazonS3 s3Client,
|
||||||
String bucketName,
|
String bucketName,
|
||||||
|
@ -120,6 +129,25 @@ public class S3Utils
|
||||||
return new ObjectSummaryIterator(s3Client, prefixes, maxListingLength);
|
return new ObjectSummaryIterator(s3Client, prefixes, maxListingLength);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Create an iterator over a set of S3 objects specified by a set of prefixes.
|
||||||
|
*
|
||||||
|
* For each provided prefix URI, the iterator will walk through all objects that are in the same bucket as the
|
||||||
|
* provided URI and whose keys start with that URI's path, except for directory placeholders (which will be
|
||||||
|
* ignored). The iterator is computed incrementally by calling {@link ServerSideEncryptingAmazonS3#listObjectsV2} for
|
||||||
|
* each prefix in batches of {@code maxListingLength}. The first call is made at the same time the iterator is
|
||||||
|
* constructed.
|
||||||
|
*/
|
||||||
|
public static Iterator<S3ObjectSummary> objectSummaryIterator(
|
||||||
|
final ServerSideEncryptingAmazonS3 s3Client,
|
||||||
|
final Iterable<URI> prefixes,
|
||||||
|
final int maxListingLength,
|
||||||
|
final int maxRetries
|
||||||
|
)
|
||||||
|
{
|
||||||
|
return new ObjectSummaryIterator(s3Client, prefixes, maxListingLength, maxRetries);
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Create an {@link URI} from the given {@link S3ObjectSummary}. The result URI is composed as below.
|
* Create an {@link URI} from the given {@link S3ObjectSummary}. The result URI is composed as below.
|
||||||
*
|
*
|
||||||
|
|
|
@ -19,6 +19,7 @@
|
||||||
|
|
||||||
package org.apache.druid.data.input.s3;
|
package org.apache.druid.data.input.s3;
|
||||||
|
|
||||||
|
import com.amazonaws.SdkClientException;
|
||||||
import com.amazonaws.auth.AWSCredentialsProvider;
|
import com.amazonaws.auth.AWSCredentialsProvider;
|
||||||
import com.amazonaws.services.s3.AmazonS3;
|
import com.amazonaws.services.s3.AmazonS3;
|
||||||
import com.amazonaws.services.s3.AmazonS3Client;
|
import com.amazonaws.services.s3.AmazonS3Client;
|
||||||
|
@ -28,6 +29,7 @@ import com.amazonaws.services.s3.model.GetObjectRequest;
|
||||||
import com.amazonaws.services.s3.model.ListObjectsV2Request;
|
import com.amazonaws.services.s3.model.ListObjectsV2Request;
|
||||||
import com.amazonaws.services.s3.model.ListObjectsV2Result;
|
import com.amazonaws.services.s3.model.ListObjectsV2Result;
|
||||||
import com.amazonaws.services.s3.model.S3Object;
|
import com.amazonaws.services.s3.model.S3Object;
|
||||||
|
import com.amazonaws.services.s3.model.S3ObjectInputStream;
|
||||||
import com.amazonaws.services.s3.model.S3ObjectSummary;
|
import com.amazonaws.services.s3.model.S3ObjectSummary;
|
||||||
import com.fasterxml.jackson.core.JsonParser;
|
import com.fasterxml.jackson.core.JsonParser;
|
||||||
import com.fasterxml.jackson.databind.DeserializationContext;
|
import com.fasterxml.jackson.databind.DeserializationContext;
|
||||||
|
@ -87,6 +89,8 @@ import java.util.List;
|
||||||
import java.util.stream.Collectors;
|
import java.util.stream.Collectors;
|
||||||
import java.util.stream.Stream;
|
import java.util.stream.Stream;
|
||||||
|
|
||||||
|
import static org.easymock.EasyMock.expectLastCall;
|
||||||
|
|
||||||
public class S3InputSourceTest extends InitializedNullHandlingTest
|
public class S3InputSourceTest extends InitializedNullHandlingTest
|
||||||
{
|
{
|
||||||
private static final ObjectMapper MAPPER = createS3ObjectMapper();
|
private static final ObjectMapper MAPPER = createS3ObjectMapper();
|
||||||
|
@ -535,6 +539,49 @@ public class S3InputSourceTest extends InitializedNullHandlingTest
|
||||||
EasyMock.verify(S3_CLIENT);
|
EasyMock.verify(S3_CLIENT);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test(expected = SdkClientException.class)
|
||||||
|
public void testReaderRetriesOnSdkClientExceptionButNeverSucceedsThenThrows() throws Exception
|
||||||
|
{
|
||||||
|
EasyMock.reset(S3_CLIENT);
|
||||||
|
expectListObjects(PREFIXES.get(0), ImmutableList.of(EXPECTED_URIS.get(0)), CONTENT);
|
||||||
|
expectSdkClientException(EXPECTED_URIS.get(0));
|
||||||
|
EasyMock.replay(S3_CLIENT);
|
||||||
|
|
||||||
|
S3InputSource inputSource = new S3InputSource(
|
||||||
|
SERVICE,
|
||||||
|
SERVER_SIDE_ENCRYPTING_AMAZON_S3_BUILDER,
|
||||||
|
INPUT_DATA_CONFIG,
|
||||||
|
null,
|
||||||
|
ImmutableList.of(PREFIXES.get(0)),
|
||||||
|
null,
|
||||||
|
null,
|
||||||
|
3 // only have three retries since they are slow
|
||||||
|
);
|
||||||
|
|
||||||
|
InputRowSchema someSchema = new InputRowSchema(
|
||||||
|
new TimestampSpec("time", "auto", null),
|
||||||
|
new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("dim1", "dim2"))),
|
||||||
|
ColumnsFilter.all()
|
||||||
|
);
|
||||||
|
|
||||||
|
InputSourceReader reader = inputSource.reader(
|
||||||
|
someSchema,
|
||||||
|
new CsvInputFormat(ImmutableList.of("time", "dim1", "dim2"), "|", false, null, 0),
|
||||||
|
temporaryFolder.newFolder()
|
||||||
|
);
|
||||||
|
|
||||||
|
CloseableIterator<InputRow> iterator = reader.read();
|
||||||
|
|
||||||
|
while (iterator.hasNext()) {
|
||||||
|
InputRow nextRow = iterator.next();
|
||||||
|
Assert.assertEquals(NOW, nextRow.getTimestamp());
|
||||||
|
Assert.assertEquals("hello", nextRow.getDimension("dim1").get(0));
|
||||||
|
Assert.assertEquals("world", nextRow.getDimension("dim2").get(0));
|
||||||
|
}
|
||||||
|
|
||||||
|
EasyMock.verify(S3_CLIENT);
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testCompressedReader() throws IOException
|
public void testCompressedReader() throws IOException
|
||||||
{
|
{
|
||||||
|
@ -620,6 +667,31 @@ public class S3InputSourceTest extends InitializedNullHandlingTest
|
||||||
EasyMock.expect(S3_CLIENT.getObject(EasyMock.anyObject(GetObjectRequest.class))).andReturn(someObject).once();
|
EasyMock.expect(S3_CLIENT.getObject(EasyMock.anyObject(GetObjectRequest.class))).andReturn(someObject).once();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
// Set up mocks for invoking the resettable condition for the S3Entity:
|
||||||
|
private static void expectSdkClientException(URI uri) throws IOException
|
||||||
|
{
|
||||||
|
final String s3Bucket = uri.getAuthority();
|
||||||
|
final String key = S3Utils.extractS3Key(uri);
|
||||||
|
|
||||||
|
S3ObjectInputStream someInputStream = EasyMock.createMock(S3ObjectInputStream.class);
|
||||||
|
EasyMock.expect(someInputStream.read(EasyMock.anyObject(), EasyMock.anyInt(), EasyMock.anyInt()))
|
||||||
|
.andThrow(new SdkClientException("Data read has a different length than the expected")).anyTimes();
|
||||||
|
someInputStream.close();
|
||||||
|
expectLastCall().andVoid().anyTimes();
|
||||||
|
|
||||||
|
S3Object someObject = EasyMock.createMock(S3Object.class);
|
||||||
|
EasyMock.expect(someObject.getBucketName()).andReturn(s3Bucket).anyTimes();
|
||||||
|
EasyMock.expect(someObject.getKey()).andReturn(key).anyTimes();
|
||||||
|
EasyMock.expect(someObject.getObjectContent()).andReturn(someInputStream).anyTimes();
|
||||||
|
|
||||||
|
EasyMock.expect(S3_CLIENT.getObject(EasyMock.anyObject(GetObjectRequest.class))).andReturn(someObject).anyTimes();
|
||||||
|
|
||||||
|
EasyMock.replay(someObject);
|
||||||
|
EasyMock.replay(someInputStream);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
private static void expectGetObjectCompressed(URI uri) throws IOException
|
private static void expectGetObjectCompressed(URI uri) throws IOException
|
||||||
{
|
{
|
||||||
final String s3Bucket = uri.getAuthority();
|
final String s3Bucket = uri.getAuthority();
|
||||||
|
|
Loading…
Reference in New Issue