Add random read failures in unit tests to test the retry logic added to S3BlobContainer.openInput()
This commit is contained in:
parent
ea91adf6a1
commit
e757722291
|
@ -90,7 +90,7 @@ public class S3BlobContainer extends AbstractBlobContainer {
|
|||
} else {
|
||||
if (e instanceof AmazonS3Exception) {
|
||||
if (404 == ((AmazonS3Exception) e).getStatusCode()) {
|
||||
throw new FileNotFoundException(e.getMessage());
|
||||
throw new FileNotFoundException("Blob object [" + blobName + "] not found: " + e.getMessage());
|
||||
}
|
||||
}
|
||||
throw e;
|
||||
|
|
|
@ -60,7 +60,8 @@ public abstract class AbstractAwsTest extends ElasticsearchIntegrationTest {
|
|||
.put("plugins." + PluginsService.LOAD_PLUGIN_FROM_CLASSPATH, true)
|
||||
.put(AwsModule.S3_SERVICE_TYPE_KEY, TestAwsS3Service.class)
|
||||
.put("cloud.aws.test.random", randomInt())
|
||||
.put("cloud.aws.test.write_failures", 0.1);
|
||||
.put("cloud.aws.test.write_failures", 0.1)
|
||||
.put("cloud.aws.test.read_failures", 0.1);
|
||||
|
||||
Environment environment = new Environment();
|
||||
|
||||
|
|
|
@ -22,10 +22,10 @@ package org.elasticsearch.cloud.aws;
|
|||
import com.amazonaws.AmazonClientException;
|
||||
import com.amazonaws.AmazonServiceException;
|
||||
import com.amazonaws.services.s3.AmazonS3;
|
||||
import com.amazonaws.services.s3.model.AmazonS3Exception;
|
||||
import com.amazonaws.services.s3.model.ObjectMetadata;
|
||||
import com.amazonaws.services.s3.model.PutObjectResult;
|
||||
import com.amazonaws.services.s3.model.*;
|
||||
import org.elasticsearch.ElasticsearchException;
|
||||
import org.elasticsearch.common.logging.ESLogger;
|
||||
import org.elasticsearch.common.logging.Loggers;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
|
||||
import java.io.IOException;
|
||||
|
@ -44,7 +44,10 @@ import static com.carrotsearch.randomizedtesting.RandomizedTest.randomDouble;
|
|||
*/
|
||||
public class TestAmazonS3 extends AmazonS3Wrapper {
|
||||
|
||||
protected final ESLogger logger = Loggers.getLogger(getClass());
|
||||
|
||||
private double writeFailureRate = 0.0;
|
||||
private double readFailureRate = 0.0;
|
||||
|
||||
private String randomPrefix;
|
||||
|
||||
|
@ -65,6 +68,7 @@ public class TestAmazonS3 extends AmazonS3Wrapper {
|
|||
super(delegate);
|
||||
randomPrefix = componentSettings.get("test.random");
|
||||
writeFailureRate = componentSettings.getAsDouble("test.write_failures", 0.0);
|
||||
readFailureRate = componentSettings.getAsDouble("test.read_failures", 0.0);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -80,6 +84,7 @@ public class TestAmazonS3 extends AmazonS3Wrapper {
|
|||
throw new ElasticsearchException("cannot read input stream", ex);
|
||||
}
|
||||
}
|
||||
logger.info("--> random write failure on putObject method: throwing an exception for [bucket={}, key={}]", bucketName, key);
|
||||
AmazonS3Exception ex = new AmazonS3Exception("Random S3 exception");
|
||||
ex.setStatusCode(400);
|
||||
ex.setErrorCode("RequestTimeout");
|
||||
|
@ -89,6 +94,41 @@ public class TestAmazonS3 extends AmazonS3Wrapper {
|
|||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public UploadPartResult uploadPart(UploadPartRequest request) throws AmazonClientException, AmazonServiceException {
|
||||
if (shouldFail(request.getBucketName(), request.getKey(), writeFailureRate)) {
|
||||
long length = request.getPartSize();
|
||||
long partToRead = (long) (length * randomDouble());
|
||||
byte[] buffer = new byte[1024];
|
||||
for (long cur = 0; cur < partToRead; cur += buffer.length) {
|
||||
try (InputStream input = request.getInputStream()){
|
||||
input.read(buffer, 0, (int) (partToRead - cur > buffer.length ? buffer.length : partToRead - cur));
|
||||
} catch (IOException ex) {
|
||||
throw new ElasticsearchException("cannot read input stream", ex);
|
||||
}
|
||||
}
|
||||
logger.info("--> random write failure on uploadPart method: throwing an exception for [bucket={}, key={}]", request.getBucketName(), request.getKey());
|
||||
AmazonS3Exception ex = new AmazonS3Exception("Random S3 write exception");
|
||||
ex.setStatusCode(400);
|
||||
ex.setErrorCode("RequestTimeout");
|
||||
throw ex;
|
||||
} else {
|
||||
return super.uploadPart(request);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public S3Object getObject(String bucketName, String key) throws AmazonClientException, AmazonServiceException {
|
||||
if (shouldFail(bucketName, key, readFailureRate)) {
|
||||
logger.info("--> random read failure on getObject method: throwing an exception for [bucket={}, key={}]", bucketName, key);
|
||||
AmazonS3Exception ex = new AmazonS3Exception("Random S3 read exception");
|
||||
ex.setStatusCode(404);
|
||||
throw ex;
|
||||
} else {
|
||||
return super.getObject(bucketName, key);
|
||||
}
|
||||
}
|
||||
|
||||
private boolean shouldFail(String bucketName, String key, double probability) {
|
||||
if (probability > 0.0) {
|
||||
String path = randomPrefix + "-" + bucketName + "+" + key;
|
||||
|
|
|
@ -49,6 +49,11 @@ public class TestAwsS3Service extends InternalAwsS3Service {
|
|||
return cachedWrapper(super.client(region, account, key));
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized AmazonS3 client(String region, String account, String key, Integer maxRetries) {
|
||||
return cachedWrapper(super.client(region, account, key, maxRetries));
|
||||
}
|
||||
|
||||
private AmazonS3 cachedWrapper(AmazonS3 client) {
|
||||
TestAmazonS3 wrapper = clients.get(client);
|
||||
if (wrapper == null) {
|
||||
|
|
Loading…
Reference in New Issue