diff --git a/src/main/java/org/elasticsearch/cloud/aws/blobstore/S3BlobContainer.java b/src/main/java/org/elasticsearch/cloud/aws/blobstore/S3BlobContainer.java
index 299633d2a17..d560ceca348 100644
--- a/src/main/java/org/elasticsearch/cloud/aws/blobstore/S3BlobContainer.java
+++ b/src/main/java/org/elasticsearch/cloud/aws/blobstore/S3BlobContainer.java
@@ -90,7 +90,7 @@ public class S3BlobContainer extends AbstractBlobContainer {
             } else {
                 if (e instanceof AmazonS3Exception) {
                     if (404 == ((AmazonS3Exception) e).getStatusCode()) {
-                        throw new FileNotFoundException(e.getMessage());
+                        throw new FileNotFoundException("Blob object [" + blobName + "] not found: " + e.getMessage());
                     }
                 }
                 throw e;
diff --git a/src/test/java/org/elasticsearch/cloud/aws/AbstractAwsTest.java b/src/test/java/org/elasticsearch/cloud/aws/AbstractAwsTest.java
index 3ea6470cb58..537fa058e80 100644
--- a/src/test/java/org/elasticsearch/cloud/aws/AbstractAwsTest.java
+++ b/src/test/java/org/elasticsearch/cloud/aws/AbstractAwsTest.java
@@ -60,7 +60,8 @@ public abstract class AbstractAwsTest extends ElasticsearchIntegrationTest {
                 .put("plugins." + PluginsService.LOAD_PLUGIN_FROM_CLASSPATH, true)
                 .put(AwsModule.S3_SERVICE_TYPE_KEY, TestAwsS3Service.class)
                 .put("cloud.aws.test.random", randomInt())
-                .put("cloud.aws.test.write_failures", 0.1);
+                .put("cloud.aws.test.write_failures", 0.1)
+                .put("cloud.aws.test.read_failures", 0.1);
 
         Environment environment = new Environment();
 
diff --git a/src/test/java/org/elasticsearch/cloud/aws/TestAmazonS3.java b/src/test/java/org/elasticsearch/cloud/aws/TestAmazonS3.java
index a3c3e02d28c..cdf167be794 100644
--- a/src/test/java/org/elasticsearch/cloud/aws/TestAmazonS3.java
+++ b/src/test/java/org/elasticsearch/cloud/aws/TestAmazonS3.java
@@ -22,10 +22,10 @@ package org.elasticsearch.cloud.aws;
 import com.amazonaws.AmazonClientException;
 import com.amazonaws.AmazonServiceException;
 import com.amazonaws.services.s3.AmazonS3;
-import com.amazonaws.services.s3.model.AmazonS3Exception;
-import com.amazonaws.services.s3.model.ObjectMetadata;
-import com.amazonaws.services.s3.model.PutObjectResult;
+import com.amazonaws.services.s3.model.*;
 import org.elasticsearch.ElasticsearchException;
+import org.elasticsearch.common.logging.ESLogger;
+import org.elasticsearch.common.logging.Loggers;
 import org.elasticsearch.common.settings.Settings;
 
 import java.io.IOException;
@@ -44,7 +44,10 @@ import static com.carrotsearch.randomizedtesting.RandomizedTest.randomDouble;
  */
 public class TestAmazonS3 extends AmazonS3Wrapper {
 
+    protected final ESLogger logger = Loggers.getLogger(getClass());
+
     private double writeFailureRate = 0.0;
+    private double readFailureRate = 0.0;
 
     private String randomPrefix;
 
@@ -65,6 +68,7 @@ public class TestAmazonS3 extends AmazonS3Wrapper {
         super(delegate);
         randomPrefix = componentSettings.get("test.random");
         writeFailureRate = componentSettings.getAsDouble("test.write_failures", 0.0);
+        readFailureRate = componentSettings.getAsDouble("test.read_failures", 0.0);
     }
 
     @Override
@@ -80,6 +84,7 @@ public class TestAmazonS3 extends AmazonS3Wrapper {
                     throw new ElasticsearchException("cannot read input stream", ex);
                 }
             }
+            logger.info("--> random write failure on putObject method: throwing an exception for [bucket={}, key={}]", bucketName, key);
             AmazonS3Exception ex = new AmazonS3Exception("Random S3 exception");
             ex.setStatusCode(400);
             ex.setErrorCode("RequestTimeout");
@@ -89,6 +94,41 @@ public class TestAmazonS3 extends AmazonS3Wrapper {
         }
     }
 
+    @Override
+    public UploadPartResult uploadPart(UploadPartRequest request) throws AmazonClientException, AmazonServiceException {
+        if (shouldFail(request.getBucketName(), request.getKey(), writeFailureRate)) {
+            long length = request.getPartSize();
+            long partToRead = (long) (length * randomDouble());
+            byte[] buffer = new byte[1024];
+            for (long cur = 0; cur < partToRead; cur += buffer.length) {
+                try (InputStream input = request.getInputStream()) {
+                    input.read(buffer, 0, (int) (partToRead - cur > buffer.length ? buffer.length : partToRead - cur));
+                } catch (IOException ex) {
+                    throw new ElasticsearchException("cannot read input stream", ex);
+                }
+            }
+            logger.info("--> random write failure on uploadPart method: throwing an exception for [bucket={}, key={}]", request.getBucketName(), request.getKey());
+            AmazonS3Exception ex = new AmazonS3Exception("Random S3 write exception");
+            ex.setStatusCode(400);
+            ex.setErrorCode("RequestTimeout");
+            throw ex;
+        } else {
+            return super.uploadPart(request);
+        }
+    }
+
+    @Override
+    public S3Object getObject(String bucketName, String key) throws AmazonClientException, AmazonServiceException {
+        if (shouldFail(bucketName, key, readFailureRate)) {
+            logger.info("--> random read failure on getObject method: throwing an exception for [bucket={}, key={}]", bucketName, key);
+            AmazonS3Exception ex = new AmazonS3Exception("Random S3 read exception");
+            ex.setStatusCode(404);
+            throw ex;
+        } else {
+            return super.getObject(bucketName, key);
+        }
+    }
+
     private boolean shouldFail(String bucketName, String key, double probability) {
         if (probability > 0.0) {
             String path = randomPrefix + "-" + bucketName + "+" + key;
diff --git a/src/test/java/org/elasticsearch/cloud/aws/TestAwsS3Service.java b/src/test/java/org/elasticsearch/cloud/aws/TestAwsS3Service.java
index 6372657d95d..8c4707b806c 100644
--- a/src/test/java/org/elasticsearch/cloud/aws/TestAwsS3Service.java
+++ b/src/test/java/org/elasticsearch/cloud/aws/TestAwsS3Service.java
@@ -49,6 +49,11 @@ public class TestAwsS3Service extends InternalAwsS3Service {
         return cachedWrapper(super.client(region, account, key));
     }
 
+    @Override
+    public synchronized AmazonS3 client(String region, String account, String key, Integer maxRetries) {
+        return cachedWrapper(super.client(region, account, key, maxRetries));
+    }
+
     private AmazonS3 cachedWrapper(AmazonS3 client) {
         TestAmazonS3 wrapper = clients.get(client);
         if (wrapper == null) {
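For context, the test changes above use a decorator-style fault-injection pattern: TestAmazonS3 wraps the real AmazonS3 client and, with the probability configured via `cloud.aws.test.read_failures`, throws a 404 AmazonS3Exception from `getObject` instead of delegating, while the production change in S3BlobContainer translates that 404 into a FileNotFoundException naming the missing blob. Below is a minimal, self-contained sketch of that pattern; the class and interface names (FlakyS3Client, SimpleS3, NotFoundException) are illustrative only and not part of the plugin, which uses AmazonS3Wrapper and AmazonS3Exception.

```java
import java.io.FileNotFoundException;
import java.util.Random;

// Illustrative sketch of probabilistic read-failure injection around an S3-like client.
// Names here are hypothetical; the real plugin wraps AmazonS3 via AmazonS3Wrapper.
public class FlakyS3Client {

    interface SimpleS3 {
        byte[] getObject(String bucket, String key);
    }

    static class NotFoundException extends RuntimeException {
        NotFoundException(String message) { super(message); }
    }

    private final SimpleS3 delegate;
    private final double readFailureRate; // e.g. 0.1, analogous to cloud.aws.test.read_failures
    private final Random random = new Random();

    FlakyS3Client(SimpleS3 delegate, double readFailureRate) {
        this.delegate = delegate;
        this.readFailureRate = readFailureRate;
    }

    // With probability readFailureRate, simulate an "object not found" error
    // instead of calling the real client; otherwise delegate the read.
    byte[] getObject(String bucket, String key) {
        if (random.nextDouble() < readFailureRate) {
            throw new NotFoundException("Random simulated read failure for [" + bucket + "/" + key + "]");
        }
        return delegate.getObject(bucket, key);
    }

    // Mirrors the production change above: translate the low-level "not found"
    // error into a FileNotFoundException that names the blob being read.
    byte[] readBlob(String bucket, String blobName) throws FileNotFoundException {
        try {
            return getObject(bucket, blobName);
        } catch (NotFoundException e) {
            throw new FileNotFoundException("Blob object [" + blobName + "] not found: " + e.getMessage());
        }
    }
}
```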