Merge branch 'master' into feature/per-repo-endpoint

Conflicts:
	src/main/java/org/elasticsearch/cloud/aws/AwsS3Service.java
	src/main/java/org/elasticsearch/cloud/aws/InternalAwsS3Service.java
	src/main/java/org/elasticsearch/repositories/s3/S3Repository.java
Author: Bruno Renié, 2014-11-21 11:00:01 +01:00
Commit: 2203f439e2
12 changed files with 159 additions and 56 deletions

README.md

@@ -7,7 +7,7 @@ for the unicast discovery mechanism and add S3 repositories.
In order to install the plugin, run:
```sh
bin/plugin -install elasticsearch/elasticsearch-cloud-aws/2.4.0
bin/plugin install elasticsearch/elasticsearch-cloud-aws/2.4.0
```
You need to install a version matching your Elasticsearch version:

pom.xml

@@ -33,7 +33,8 @@
<properties>
<elasticsearch.version>2.0.0-SNAPSHOT</elasticsearch.version>
<lucene.version>4.10.1</lucene.version>
<lucene.version>5.0.0</lucene.version>
<lucene.maven.version>5.0.0-snapshot-1637347</lucene.maven.version>
<amazonaws.version>1.7.13</amazonaws.version>
<tests.output>onerror</tests.output>
<tests.shuffle>true</tests.shuffle>
@@ -44,6 +45,10 @@
</properties>
<repositories>
<repository>
<id>Lucene snapshots</id>
<url>https://download.elasticsearch.org/lucenesnapshots/1637347/</url>
</repository>
<repository>
<id>sonatype</id>
<url>http://oss.sonatype.org/content/repositories/releases/</url>
@@ -63,10 +68,16 @@
<version>1.3.RC2</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.carrotsearch.randomizedtesting</groupId>
<artifactId>randomizedtesting-runner</artifactId>
<version>2.1.10</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.lucene</groupId>
<artifactId>lucene-test-framework</artifactId>
<version>${lucene.version}</version>
<version>${lucene.maven.version}</version>
<scope>test</scope>
</dependency>
@@ -79,7 +90,7 @@
<dependency>
<groupId>org.apache.lucene</groupId>
<artifactId>lucene-core</artifactId>
<version>${lucene.version}</version>
<version>${lucene.maven.version}</version>
<scope>provided</scope>
</dependency>

src/main/java/org/elasticsearch/cloud/aws/AwsS3Service.java

@@ -29,4 +29,6 @@ public interface AwsS3Service extends LifecycleComponent<AwsS3Service> {
AmazonS3 client();
AmazonS3 client(String endpoint, String region, String account, String key);
AmazonS3 client(String endpoint, String region, String account, String key, Integer maxRetries);
}

src/main/java/org/elasticsearch/cloud/aws/InternalAwsS3Service.java

@@ -60,14 +60,20 @@ public class InternalAwsS3Service extends AbstractLifecycleComponent<AwsS3Servic
String account = componentSettings.get("access_key", settings.get("cloud.account"));
String key = componentSettings.get("secret_key", settings.get("cloud.key"));
return getClient(endpoint, account, key);
return getClient(endpoint, account, key, null);
}
@Override
public synchronized AmazonS3 client(String endpoint, String region, String account, String key) {
public AmazonS3 client(String endpoint, String region, String account, String key) {
return client(endpoint, region, account, key, null);
}
@Override
public synchronized AmazonS3 client(String endpoint, String region, String account, String key, Integer maxRetries) {
if (endpoint == null) {
endpoint = getDefaultEndpoint();
}
if (region != null) {
endpoint = getEndpoint(region);
logger.debug("using s3 region [{}], with endpoint [{}]", region, endpoint);
@@ -77,11 +83,11 @@ public class InternalAwsS3Service extends AbstractLifecycleComponent<AwsS3Servic
key = componentSettings.get("secret_key", settings.get("cloud.key"));
}
return getClient(endpoint, account, key);
return getClient(endpoint, account, key, maxRetries);
}
private synchronized AmazonS3 getClient(String endpoint, String account, String key) {
private synchronized AmazonS3 getClient(String endpoint, String account, String key, Integer maxRetries) {
Tuple<String, String> clientDescriptor = new Tuple<String, String>(endpoint, account);
AmazonS3Client client = clients.get(clientDescriptor);
if (client != null) {
@@ -111,6 +117,11 @@ public class InternalAwsS3Service extends AbstractLifecycleComponent<AwsS3Servic
clientConfiguration.withProxyHost(proxyHost).setProxyPort(proxyPort);
}
if (maxRetries != null) {
// When not explicitly set, the AWS SDK default applies: 3 retries with exponential backoff
clientConfiguration.setMaxErrorRetry(maxRetries);
}
AWSCredentialsProvider credentials;
if (account == null && key == null) {
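Note: the maxRetries value plumbed through here lands on the AWS SDK's ClientConfiguration. A minimal standalone sketch of the resulting client construction; setMaxErrorRetry, setEndpoint, BasicAWSCredentials, and the AmazonS3Client constructor are real SDK 1.7.x calls, while the wrapper class and parameter values are illustrative:

```java
import com.amazonaws.ClientConfiguration;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3Client;

// Hypothetical sketch of what the service builds per repository.
public class S3ClientSketch {
    public static AmazonS3 build(String endpoint, String accessKey, String secretKey, Integer maxRetries) {
        ClientConfiguration configuration = new ClientConfiguration();
        if (maxRetries != null) {
            // When unset, the SDK default applies: 3 retries with exponential backoff.
            configuration.setMaxErrorRetry(maxRetries);
        }
        AmazonS3Client client = new AmazonS3Client(new BasicAWSCredentials(accessKey, secretKey), configuration);
        if (endpoint != null) {
            // The per-repository endpoint this feature branch introduces.
            client.setEndpoint(endpoint);
        }
        return client;
    }
}
```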

src/main/java/org/elasticsearch/cloud/aws/blobstore/DefaultS3OutputStream.java

@@ -19,6 +19,7 @@
package org.elasticsearch.cloud.aws.blobstore;
import com.amazonaws.AmazonClientException;
import com.amazonaws.services.s3.model.*;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
@@ -96,12 +97,12 @@ public class DefaultS3OutputStream extends S3OutputStream {
private void upload(byte[] bytes, int off, int len) throws IOException {
try (ByteArrayInputStream is = new ByteArrayInputStream(bytes, off, len)) {
int retry = 0;
while (retry < getNumberOfRetries()) {
while (retry <= getNumberOfRetries()) {
try {
doUpload(getBlobStore(), getBucketName(), getBlobName(), is, len, isServerSideEncryption());
break;
} catch (AmazonS3Exception e) {
if (shouldRetry(e)) {
} catch (AmazonClientException e) {
if (getBlobStore().shouldRetry(e) && retry < getNumberOfRetries()) {
is.reset();
retry++;
} else {
@@ -123,11 +124,20 @@ public class DefaultS3OutputStream extends S3OutputStream {
}
private void initializeMultipart() {
if (multipartId == null) {
multipartId = doInitialize(getBlobStore(), getBucketName(), getBlobName(), isServerSideEncryption());
if (multipartId != null) {
multipartChunks = 1;
multiparts = new ArrayList<>();
int retry = 0;
while ((retry <= getNumberOfRetries()) && (multipartId == null)) {
try {
multipartId = doInitialize(getBlobStore(), getBucketName(), getBlobName(), isServerSideEncryption());
if (multipartId != null) {
multipartChunks = 1;
multiparts = new ArrayList<>();
}
} catch (AmazonClientException e) {
if (getBlobStore().shouldRetry(e) && retry < getNumberOfRetries()) {
retry++;
} else {
throw e;
}
}
}
}
@@ -145,14 +155,14 @@
private void uploadMultipart(byte[] bytes, int off, int len, boolean lastPart) throws IOException {
try (ByteArrayInputStream is = new ByteArrayInputStream(bytes, off, len)) {
int retry = 0;
while (retry < getNumberOfRetries()) {
while (retry <= getNumberOfRetries()) {
try {
PartETag partETag = doUploadMultipart(getBlobStore(), getBucketName(), getBlobName(), multipartId, is, len, lastPart);
multiparts.add(partETag);
multipartChunks++;
return;
} catch (AmazonS3Exception e) {
if (shouldRetry(e) && retry < getNumberOfRetries()) {
} catch (AmazonClientException e) {
if (getBlobStore().shouldRetry(e) && retry < getNumberOfRetries()) {
is.reset();
retry++;
} else {
@@ -182,13 +192,13 @@
private void completeMultipart() {
int retry = 0;
while (retry < getNumberOfRetries()) {
while (retry <= getNumberOfRetries()) {
try {
doCompleteMultipart(getBlobStore(), getBucketName(), getBlobName(), multipartId, multiparts);
multipartId = null;
return;
} catch (AmazonS3Exception e) {
if (shouldRetry(e) && retry < getNumberOfRetries()) {
} catch (AmazonClientException e) {
if (getBlobStore().shouldRetry(e) && retry < getNumberOfRetries()) {
retry++;
} else {
abortMultipart();
@@ -218,8 +228,4 @@
throws AmazonS3Exception {
blobStore.client().abortMultipartUpload(new AbortMultipartUploadRequest(bucketName, blobName, uploadId));
}
protected boolean shouldRetry(AmazonS3Exception e) {
return e.getStatusCode() == 400 && "RequestTimeout".equals(e.getErrorCode());
}
}
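Note: upload(), initializeMultipart(), uploadMultipart(), and completeMultipart() above all share one bounded-retry shape: `retry <= getNumberOfRetries()` admits the initial attempt plus up to N retries, the input stream is rewound before each retry, and the exception is rethrown once the budget is spent. A self-contained sketch of that pattern, with hypothetical names:

```java
import com.amazonaws.AmazonClientException;
import java.io.ByteArrayInputStream;
import java.util.function.Predicate;

// Hypothetical distillation of the retry loops above; names are illustrative.
public class RetryLoopSketch {
    interface Attempt {
        void run(ByteArrayInputStream is);
    }

    static void runWithRetries(byte[] bytes, int numberOfRetries,
                               Predicate<AmazonClientException> shouldRetry, Attempt attempt) {
        ByteArrayInputStream is = new ByteArrayInputStream(bytes);
        int retry = 0;
        // "<=" admits the first attempt plus up to numberOfRetries retries.
        while (retry <= numberOfRetries) {
            try {
                attempt.run(is);
                return; // success
            } catch (AmazonClientException e) {
                if (shouldRetry.test(e) && retry < numberOfRetries) {
                    is.reset(); // rewind so the retry re-sends the same bytes
                    retry++;
                } else {
                    throw e; // retries exhausted or non-retryable failure
                }
            }
        }
    }
}
```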

src/main/java/org/elasticsearch/cloud/aws/blobstore/S3BlobContainer.java

@@ -19,6 +19,7 @@
package org.elasticsearch.cloud.aws.blobstore;
import com.amazonaws.AmazonClientException;
import com.amazonaws.services.s3.model.AmazonS3Exception;
import com.amazonaws.services.s3.model.ObjectListing;
import com.amazonaws.services.s3.model.S3Object;
@@ -68,27 +69,40 @@ public class S3BlobContainer extends AbstractBlobContainer {
}
@Override
public boolean deleteBlob(String blobName) throws IOException {
blobStore.client().deleteObject(blobStore.bucket(), buildKey(blobName));
return true;
}
@Override
public InputStream openInput(String blobName) throws IOException {
public void deleteBlob(String blobName) throws IOException {
try {
S3Object s3Object = blobStore.client().getObject(blobStore.bucket(), buildKey(blobName));
return s3Object.getObjectContent();
} catch (AmazonS3Exception e) {
if (e.getStatusCode() == 404) {
throw new FileNotFoundException(e.getMessage());
}
throw e;
blobStore.client().deleteObject(blobStore.bucket(), buildKey(blobName));
} catch (AmazonClientException e) {
throw new IOException("Exception when deleting blob [" + blobName + "]", e);
}
}
@Override
public InputStream openInput(String blobName) throws IOException {
int retry = 0;
while (retry <= blobStore.numberOfRetries()) {
try {
S3Object s3Object = blobStore.client().getObject(blobStore.bucket(), buildKey(blobName));
return s3Object.getObjectContent();
} catch (AmazonClientException e) {
if (blobStore.shouldRetry(e) && (retry < blobStore.numberOfRetries())) {
retry++;
} else {
if (e instanceof AmazonS3Exception) {
if (404 == ((AmazonS3Exception) e).getStatusCode()) {
throw new FileNotFoundException("Blob object [" + blobName + "] not found: " + e.getMessage());
}
}
throw e;
}
}
}
throw new BlobStoreException("retries exhausted while attempting to access blob object [name:" + blobName + ", bucket:" + blobStore.bucket() +"]");
}
@Override
public OutputStream createOutput(final String blobName) throws IOException {
// UploadS3OutputStream does buffering internally
// DefaultS3OutputStream does buffering & retry logic internally
return new DefaultS3OutputStream(blobStore, blobStore.bucket(), buildKey(blobName), blobStore.bufferSizeInBytes(), blobStore.numberOfRetries(), blobStore.serverSideEncryption());
}
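Note: the read path uses the same bounded loop, with one extra mapping: once retries are exhausted, an S3 404 surfaces as FileNotFoundException so callers can tell a missing blob from a transient failure. A hedged sketch of just that terminal check (the helper name is hypothetical):

```java
import com.amazonaws.AmazonClientException;
import com.amazonaws.services.s3.model.AmazonS3Exception;
import java.io.FileNotFoundException;

public class NotFoundMappingSketch {
    // Translate a terminal S3 failure: a 404 becomes the IO-level "not found",
    // anything else propagates as the original (unchecked) client exception.
    static void rethrowTerminal(String blobName, AmazonClientException e) throws FileNotFoundException {
        if (e instanceof AmazonS3Exception && ((AmazonS3Exception) e).getStatusCode() == 404) {
            throw new FileNotFoundException("Blob object [" + blobName + "] not found: " + e.getMessage());
        }
        throw e;
    }
}
```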

src/main/java/org/elasticsearch/cloud/aws/blobstore/S3BlobStore.java

@@ -19,7 +19,9 @@
package org.elasticsearch.cloud.aws.blobstore;
import com.amazonaws.AmazonClientException;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.model.AmazonS3Exception;
import com.amazonaws.services.s3.model.DeleteObjectsRequest;
import com.amazonaws.services.s3.model.DeleteObjectsRequest.KeyVersion;
import com.amazonaws.services.s3.model.ObjectListing;
@@ -55,12 +57,8 @@ public class S3BlobStore extends AbstractComponent implements BlobStore {
private final int numberOfRetries;
public S3BlobStore(Settings settings, AmazonS3 client, String bucket, String region, boolean serverSideEncryption) {
this(settings, client, bucket, region, serverSideEncryption, null);
}
public S3BlobStore(Settings settings, AmazonS3 client, String bucket, @Nullable String region, boolean serverSideEncryption, ByteSizeValue bufferSize) {
public S3BlobStore(Settings settings, AmazonS3 client, String bucket, @Nullable String region, boolean serverSideEncryption,
ByteSizeValue bufferSize, int maxRetries) {
super(settings);
this.client = client;
this.bucket = bucket;
@@ -72,7 +70,7 @@ public class S3BlobStore extends AbstractComponent implements BlobStore {
throw new BlobStoreException("Detected a buffer_size for the S3 storage lower than [" + MIN_BUFFER_SIZE + "]");
}
this.numberOfRetries = settings.getAsInt("max_retries", 3);
this.numberOfRetries = maxRetries;
if (!client.doesBucketExist(bucket)) {
if (region != null) {
client.createBucket(bucket, region);
@@ -152,6 +150,16 @@ public class S3BlobStore extends AbstractComponent implements BlobStore {
}
}
protected boolean shouldRetry(AmazonClientException e) {
if (e instanceof AmazonS3Exception) {
AmazonS3Exception s3e = (AmazonS3Exception)e;
if (s3e.getStatusCode() == 400 && "RequestTimeout".equals(s3e.getErrorCode())) {
return true;
}
}
return e.isRetryable();
}
@Override
public void close() {
}

src/main/java/org/elasticsearch/discovery/ec2/Ec2Discovery.java

@@ -23,6 +23,7 @@ import org.elasticsearch.cloud.aws.AwsEc2Service;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.node.DiscoveryNodeService;
import org.elasticsearch.cluster.settings.DynamicSettings;
import org.elasticsearch.common.collect.ImmutableList;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
@@ -45,9 +46,9 @@ public class Ec2Discovery extends ZenDiscovery {
public Ec2Discovery(Settings settings, ClusterName clusterName, ThreadPool threadPool, TransportService transportService,
ClusterService clusterService, NodeSettingsService nodeSettingsService, ZenPingService pingService,
DiscoveryNodeService discoveryNodeService, AwsEc2Service ec2Service, DiscoverySettings discoverySettings,
ElectMasterService electMasterService) {
ElectMasterService electMasterService, DynamicSettings dynamicSettings) {
super(settings, clusterName, threadPool, transportService, clusterService, nodeSettingsService,
discoveryNodeService, pingService, electMasterService, discoverySettings);
discoveryNodeService, pingService, electMasterService, discoverySettings, dynamicSettings);
if (settings.getAsBoolean("cloud.enabled", true)) {
ImmutableList<? extends ZenPing> zenPings = pingService.zenPings();
UnicastZenPing unicastZenPing = null;

src/main/java/org/elasticsearch/repositories/s3/S3Repository.java

@@ -122,10 +122,14 @@ public class S3Repository extends BlobStoreRepository {
boolean serverSideEncryption = repositorySettings.settings().getAsBoolean("server_side_encryption", componentSettings.getAsBoolean("server_side_encryption", false));
ByteSizeValue bufferSize = repositorySettings.settings().getAsBytesSize("buffer_size", componentSettings.getAsBytesSize("buffer_size", null));
logger.debug("using bucket [{}], region [{}], endpoint [{}], chunk_size [{}], server_side_encryption [{}], buffer_size [{}]", bucket, region, endpoint, chunkSize, serverSideEncryption, bufferSize);
blobStore = new S3BlobStore(settings, s3Service.client(endpoint, region, repositorySettings.settings().get("access_key"), repositorySettings.settings().get("secret_key")), bucket, region, serverSideEncryption, bufferSize);
Integer maxRetries = repositorySettings.settings().getAsInt("max_retries", componentSettings.getAsInt("max_retries", 3));
this.chunkSize = repositorySettings.settings().getAsBytesSize("chunk_size", componentSettings.getAsBytesSize("chunk_size", new ByteSizeValue(100, ByteSizeUnit.MB)));
this.compress = repositorySettings.settings().getAsBoolean("compress", componentSettings.getAsBoolean("compress", false));
logger.debug("using bucket [{}], region [{}], endpoint [{}], chunk_size [{}], server_side_encryption [{}], buffer_size [{}], max_retries [{}]",
bucket, region, endpoint, chunkSize, serverSideEncryption, bufferSize, maxRetries);
blobStore = new S3BlobStore(settings, s3Service.client(endpoint, region, repositorySettings.settings().get("access_key"), repositorySettings.settings().get("secret_key"), maxRetries), bucket, region, serverSideEncryption, bufferSize, maxRetries);
String basePath = repositorySettings.settings().get("base_path", null);
if (Strings.hasLength(basePath)) {
BlobPath path = new BlobPath();
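Note: max_retries is now resolved once per repository (repository setting, then component setting, then 3) and consumed at two layers: the SDK client's transport-level retries and the blob store's stream-level retry loops. A condensed, hypothetical sketch of that wiring; the five-argument client() matches the AwsS3Service interface in this commit, while the settings lookups are flattened to plain parameters:

```java
import com.amazonaws.services.s3.AmazonS3;

public class RepositoryWiringSketch {
    // Mirrors org.elasticsearch.cloud.aws.AwsS3Service as changed in this commit.
    interface AwsS3Service {
        AmazonS3 client(String endpoint, String region, String account, String key, Integer maxRetries);
    }

    static AmazonS3 wire(AwsS3Service s3Service, String endpoint, String region,
                         String accessKey, String secretKey, Integer maxRetries) {
        // The same maxRetries then goes to the S3BlobStore constructor, so the
        // SDK-level and stream-level retry budgets stay in sync:
        //   new S3BlobStore(settings, client, bucket, region, serverSideEncryption, bufferSize, maxRetries)
        return s3Service.client(endpoint, region, accessKey, secretKey, maxRetries);
    }
}
```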

src/test/java/org/elasticsearch/cloud/aws/AbstractAwsTest.java

@@ -60,7 +60,8 @@ public abstract class AbstractAwsTest extends ElasticsearchIntegrationTest {
.put("plugins." + PluginsService.LOAD_PLUGIN_FROM_CLASSPATH, true)
.put(AwsModule.S3_SERVICE_TYPE_KEY, TestAwsS3Service.class)
.put("cloud.aws.test.random", randomInt())
.put("cloud.aws.test.write_failures", 0.1);
.put("cloud.aws.test.write_failures", 0.1)
.put("cloud.aws.test.read_failures", 0.1);
Environment environment = new Environment();

src/test/java/org/elasticsearch/cloud/aws/TestAmazonS3.java

@@ -22,10 +22,10 @@ package org.elasticsearch.cloud.aws;
import com.amazonaws.AmazonClientException;
import com.amazonaws.AmazonServiceException;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.model.AmazonS3Exception;
import com.amazonaws.services.s3.model.ObjectMetadata;
import com.amazonaws.services.s3.model.PutObjectResult;
import com.amazonaws.services.s3.model.*;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.settings.Settings;
import java.io.IOException;
@@ -44,7 +44,10 @@ import static com.carrotsearch.randomizedtesting.RandomizedTest.randomDouble;
*/
public class TestAmazonS3 extends AmazonS3Wrapper {
protected final ESLogger logger = Loggers.getLogger(getClass());
private double writeFailureRate = 0.0;
private double readFailureRate = 0.0;
private String randomPrefix;
@@ -65,6 +68,7 @@ public class TestAmazonS3 extends AmazonS3Wrapper {
super(delegate);
randomPrefix = componentSettings.get("test.random");
writeFailureRate = componentSettings.getAsDouble("test.write_failures", 0.0);
readFailureRate = componentSettings.getAsDouble("test.read_failures", 0.0);
}
@Override
@@ -80,6 +84,7 @@ public class TestAmazonS3 extends AmazonS3Wrapper {
throw new ElasticsearchException("cannot read input stream", ex);
}
}
logger.info("--> random write failure on putObject method: throwing an exception for [bucket={}, key={}]", bucketName, key);
AmazonS3Exception ex = new AmazonS3Exception("Random S3 exception");
ex.setStatusCode(400);
ex.setErrorCode("RequestTimeout");
@@ -89,6 +94,41 @@
}
}
@Override
public UploadPartResult uploadPart(UploadPartRequest request) throws AmazonClientException, AmazonServiceException {
if (shouldFail(request.getBucketName(), request.getKey(), writeFailureRate)) {
long length = request.getPartSize();
long partToRead = (long) (length * randomDouble());
byte[] buffer = new byte[1024];
for (long cur = 0; cur < partToRead; cur += buffer.length) {
try (InputStream input = request.getInputStream()){
input.read(buffer, 0, (int) (partToRead - cur > buffer.length ? buffer.length : partToRead - cur));
} catch (IOException ex) {
throw new ElasticsearchException("cannot read input stream", ex);
}
}
logger.info("--> random write failure on uploadPart method: throwing an exception for [bucket={}, key={}]", request.getBucketName(), request.getKey());
AmazonS3Exception ex = new AmazonS3Exception("Random S3 write exception");
ex.setStatusCode(400);
ex.setErrorCode("RequestTimeout");
throw ex;
} else {
return super.uploadPart(request);
}
}
@Override
public S3Object getObject(String bucketName, String key) throws AmazonClientException, AmazonServiceException {
if (shouldFail(bucketName, key, readFailureRate)) {
logger.info("--> random read failure on getObject method: throwing an exception for [bucket={}, key={}]", bucketName, key);
AmazonS3Exception ex = new AmazonS3Exception("Random S3 read exception");
ex.setStatusCode(404);
throw ex;
} else {
return super.getObject(bucketName, key);
}
}
private boolean shouldFail(String bucketName, String key, double probability) {
if (probability > 0.0) {
String path = randomPrefix + "-" + bucketName + "+" + key;
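Note: shouldFail is cut off above. The visible part hashes a per-run random prefix together with the object path; a sketch of one way to complete the idea, so that a retried operation can eventually succeed, assuming the counter and hashing details (they are illustrative, not the commit's exact code):

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical completion of the truncated shouldFail helper above.
public class FailureInjectionSketch {
    private final ConcurrentHashMap<String, AtomicLong> accessCounts = new ConcurrentHashMap<>();
    private final String randomPrefix;

    FailureInjectionSketch(String randomPrefix) {
        this.randomPrefix = randomPrefix;
    }

    // Hash the test prefix, the object path, and an attempt counter: the result
    // is reproducible for a given random seed, yet a retried operation hashes
    // to a new value and can succeed on a later attempt.
    boolean shouldFail(String bucketName, String key, double probability) {
        if (probability <= 0.0) {
            return false;
        }
        String path = randomPrefix + "-" + bucketName + "+" + key;
        long attempt = accessCounts.computeIfAbsent(path, p -> new AtomicLong()).incrementAndGet();
        double hash = ((path + "/" + attempt).hashCode() & 0x7fffffff) / (double) Integer.MAX_VALUE;
        return hash < probability;
    }
}
```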

src/test/java/org/elasticsearch/cloud/aws/TestAwsS3Service.java

@@ -49,6 +49,11 @@ public class TestAwsS3Service extends InternalAwsS3Service {
return cachedWrapper(super.client(endpoint, region, account, key));
}
@Override
public synchronized AmazonS3 client(String endpoint, String region, String account, String key, Integer maxRetries) {
return cachedWrapper(super.client(endpoint, region, account, key, maxRetries));
}
private AmazonS3 cachedWrapper(AmazonS3 client) {
TestAmazonS3 wrapper = clients.get(client);
if (wrapper == null) {