Added retry mechanism for S3 connection errors

Closes #95
This commit is contained in:
Igor Motov 2014-06-19 16:01:14 -04:00 committed by David Pilato
parent ba185e026b
commit 11f4e9c063
12 changed files with 1012 additions and 191 deletions

View File

@ -115,6 +115,7 @@ The following settings are supported:
* `chunk_size`: Big files can be broken down into chunks during snapshotting if needed. The chunk size can be specified in bytes or by using size value notation, i.e. `1g`, `10m`, `5k`. Defaults to `100m`.
* `compress`: When set to `true` metadata files are stored in compressed format. This setting doesn't affect index files that are already compressed by default. Defaults to `false`.
* `server_side_encryption`: When set to `true` files are encrypted on server side using AES256 algorithm. Defaults to `false`.
* `max_retries`: Number of retries in case of S3 errors. Defaults to `3`.
The S3 repositories are using the same credentials as the rest of the AWS services provided by this plugin (`discovery`).
See [Generic Configuration](#generic-configuration) for details.

View File

@ -20,15 +20,29 @@
package org.elasticsearch.cloud.aws;
import org.elasticsearch.common.inject.AbstractModule;
import org.elasticsearch.common.settings.Settings;
/**
*
*/
public class AwsModule extends AbstractModule {
private final Settings settings;
public static final String S3_SERVICE_TYPE_KEY = "cloud.aws.s3service.type";
public AwsModule(Settings settings) {
this.settings = settings;
}
@Override
protected void configure() {
bind(AwsS3Service.class).asEagerSingleton();
bind(AwsS3Service.class).to(getS3ServiceClass(settings)).asEagerSingleton();
bind(AwsEc2Service.class).asEagerSingleton();
}
}
public static Class<? extends AwsS3Service> getS3ServiceClass(Settings settings) {
return settings.getAsClass(S3_SERVICE_TYPE_KEY, InternalAwsS3Service.class);
}
}

View File

@ -19,176 +19,14 @@
package org.elasticsearch.cloud.aws;
import java.util.HashMap;
import java.util.Map;
import com.amazonaws.ClientConfiguration;
import com.amazonaws.Protocol;
import com.amazonaws.auth.*;
import com.amazonaws.internal.StaticCredentialsProvider;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3Client;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchIllegalArgumentException;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.settings.SettingsFilter;
import org.elasticsearch.common.component.LifecycleComponent;
/**
*
*/
public class AwsS3Service extends AbstractLifecycleComponent<AwsS3Service> {
/**
* (acceskey, endpoint) -> client
*/
private Map<Tuple<String, String>, AmazonS3Client> clients = new HashMap<Tuple<String,String>, AmazonS3Client>();
@Inject
public AwsS3Service(Settings settings, SettingsFilter settingsFilter) {
super(settings);
settingsFilter.addFilter(new AwsSettingsFilter());
}
public synchronized AmazonS3 client() {
String endpoint = getDefaultEndpoint();
String account = componentSettings.get("access_key", settings.get("cloud.account"));
String key = componentSettings.get("secret_key", settings.get("cloud.key"));
return getClient(endpoint, account, key);
}
public synchronized AmazonS3 client(String region, String account, String key) {
String endpoint;
if (region == null) {
endpoint = getDefaultEndpoint();
} else {
endpoint = getEndpoint(region);
logger.debug("using s3 region [{}], with endpoint [{}]", region, endpoint);
}
if (account == null || key == null) {
account = componentSettings.get("access_key", settings.get("cloud.account"));
key = componentSettings.get("secret_key", settings.get("cloud.key"));
}
return getClient(endpoint, account, key);
}
private synchronized AmazonS3 getClient(String endpoint, String account, String key) {
Tuple<String, String> clientDescriptor = new Tuple<String, String>(endpoint, account);
AmazonS3Client client = clients.get(clientDescriptor);
if (client != null) {
return client;
}
ClientConfiguration clientConfiguration = new ClientConfiguration();
String protocol = componentSettings.get("protocol", "http").toLowerCase();
if ("http".equals(protocol)) {
clientConfiguration.setProtocol(Protocol.HTTP);
} else if ("https".equals(protocol)) {
clientConfiguration.setProtocol(Protocol.HTTPS);
} else {
throw new ElasticsearchIllegalArgumentException("No protocol supported [" + protocol + "], can either be [http] or [https]");
}
String proxyHost = componentSettings.get("proxy_host");
if (proxyHost != null) {
String portString = componentSettings.get("proxy_port", "80");
Integer proxyPort;
try {
proxyPort = Integer.parseInt(portString, 10);
} catch (NumberFormatException ex) {
throw new ElasticsearchIllegalArgumentException("The configured proxy port value [" + portString + "] is invalid", ex);
}
clientConfiguration.withProxyHost(proxyHost).setProxyPort(proxyPort);
}
AWSCredentialsProvider credentials;
if (account == null && key == null) {
credentials = new AWSCredentialsProviderChain(
new EnvironmentVariableCredentialsProvider(),
new SystemPropertiesCredentialsProvider(),
new InstanceProfileCredentialsProvider()
);
} else {
credentials = new AWSCredentialsProviderChain(
new StaticCredentialsProvider(new BasicAWSCredentials(account, key))
);
}
client = new AmazonS3Client(credentials, clientConfiguration);
if (endpoint != null) {
client.setEndpoint(endpoint);
}
clients.put(clientDescriptor, client);
return client;
}
private String getDefaultEndpoint() {
String endpoint = null;
if (componentSettings.get("s3.endpoint") != null) {
endpoint = componentSettings.get("s3.endpoint");
logger.debug("using explicit s3 endpoint [{}]", endpoint);
} else if (componentSettings.get("region") != null) {
String region = componentSettings.get("region").toLowerCase();
endpoint = getEndpoint(region);
logger.debug("using s3 region [{}], with endpoint [{}]", region, endpoint);
}
return endpoint;
}
private static String getEndpoint(String region) {
if ("us-east".equals(region)) {
return "s3.amazonaws.com";
} else if ("us-east-1".equals(region)) {
return "s3.amazonaws.com";
} else if ("us-west".equals(region)) {
return "s3-us-west-1.amazonaws.com";
} else if ("us-west-1".equals(region)) {
return "s3-us-west-1.amazonaws.com";
} else if ("us-west-2".equals(region)) {
return "s3-us-west-2.amazonaws.com";
} else if ("ap-southeast".equals(region)) {
return "s3-ap-southeast-1.amazonaws.com";
} else if ("ap-southeast-1".equals(region)) {
return "s3-ap-southeast-1.amazonaws.com";
} else if ("ap-southeast-2".equals(region)) {
return "s3-ap-southeast-2.amazonaws.com";
} else if ("ap-northeast".equals(region)) {
return "s3-ap-northeast-1.amazonaws.com";
} else if ("ap-northeast-1".equals(region)) {
return "s3-ap-northeast-1.amazonaws.com";
} else if ("eu-west".equals(region)) {
return "s3-eu-west-1.amazonaws.com";
} else if ("eu-west-1".equals(region)) {
return "s3-eu-west-1.amazonaws.com";
} else if ("sa-east".equals(region)) {
return "s3-sa-east-1.amazonaws.com";
} else if ("sa-east-1".equals(region)) {
return "s3-sa-east-1.amazonaws.com";
} else {
throw new ElasticsearchIllegalArgumentException("No automatic endpoint could be derived from region [" + region + "]");
}
}
@Override
protected void doStart() throws ElasticsearchException {
}
@Override
protected void doStop() throws ElasticsearchException {
}
@Override
protected void doClose() throws ElasticsearchException {
for (AmazonS3Client client : clients.values()) {
client.shutdown();
}
}
public interface AwsS3Service extends LifecycleComponent<AwsS3Service> {
AmazonS3 client();
AmazonS3 client(String region, String account, String key);
}

View File

@ -0,0 +1,195 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.cloud.aws;
import java.util.HashMap;
import java.util.Map;
import com.amazonaws.ClientConfiguration;
import com.amazonaws.Protocol;
import com.amazonaws.auth.*;
import com.amazonaws.internal.StaticCredentialsProvider;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3Client;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchIllegalArgumentException;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.settings.SettingsFilter;
/**
*
*/
public class InternalAwsS3Service extends AbstractLifecycleComponent<AwsS3Service> implements AwsS3Service {
/**
* (acceskey, endpoint) -> client
*/
private Map<Tuple<String, String>, AmazonS3Client> clients = new HashMap<Tuple<String,String>, AmazonS3Client>();
@Inject
public InternalAwsS3Service(Settings settings, SettingsFilter settingsFilter) {
super(settings);
settingsFilter.addFilter(new AwsSettingsFilter());
}
@Override
public synchronized AmazonS3 client() {
String endpoint = getDefaultEndpoint();
String account = componentSettings.get("access_key", settings.get("cloud.account"));
String key = componentSettings.get("secret_key", settings.get("cloud.key"));
return getClient(endpoint, account, key);
}
@Override
public synchronized AmazonS3 client(String region, String account, String key) {
String endpoint;
if (region == null) {
endpoint = getDefaultEndpoint();
} else {
endpoint = getEndpoint(region);
logger.debug("using s3 region [{}], with endpoint [{}]", region, endpoint);
}
if (account == null || key == null) {
account = componentSettings.get("access_key", settings.get("cloud.account"));
key = componentSettings.get("secret_key", settings.get("cloud.key"));
}
return getClient(endpoint, account, key);
}
private synchronized AmazonS3 getClient(String endpoint, String account, String key) {
Tuple<String, String> clientDescriptor = new Tuple<String, String>(endpoint, account);
AmazonS3Client client = clients.get(clientDescriptor);
if (client != null) {
return client;
}
ClientConfiguration clientConfiguration = new ClientConfiguration();
String protocol = componentSettings.get("protocol", "http").toLowerCase();
if ("http".equals(protocol)) {
clientConfiguration.setProtocol(Protocol.HTTP);
} else if ("https".equals(protocol)) {
clientConfiguration.setProtocol(Protocol.HTTPS);
} else {
throw new ElasticsearchIllegalArgumentException("No protocol supported [" + protocol + "], can either be [http] or [https]");
}
String proxyHost = componentSettings.get("proxy_host");
if (proxyHost != null) {
String portString = componentSettings.get("proxy_port", "80");
Integer proxyPort;
try {
proxyPort = Integer.parseInt(portString, 10);
} catch (NumberFormatException ex) {
throw new ElasticsearchIllegalArgumentException("The configured proxy port value [" + portString + "] is invalid", ex);
}
clientConfiguration.withProxyHost(proxyHost).setProxyPort(proxyPort);
}
AWSCredentialsProvider credentials;
if (account == null && key == null) {
credentials = new AWSCredentialsProviderChain(
new EnvironmentVariableCredentialsProvider(),
new SystemPropertiesCredentialsProvider(),
new InstanceProfileCredentialsProvider()
);
} else {
credentials = new AWSCredentialsProviderChain(
new StaticCredentialsProvider(new BasicAWSCredentials(account, key))
);
}
client = new AmazonS3Client(credentials, clientConfiguration);
if (endpoint != null) {
client.setEndpoint(endpoint);
}
clients.put(clientDescriptor, client);
return client;
}
private String getDefaultEndpoint() {
String endpoint = null;
if (componentSettings.get("s3.endpoint") != null) {
endpoint = componentSettings.get("s3.endpoint");
logger.debug("using explicit s3 endpoint [{}]", endpoint);
} else if (componentSettings.get("region") != null) {
String region = componentSettings.get("region").toLowerCase();
endpoint = getEndpoint(region);
logger.debug("using s3 region [{}], with endpoint [{}]", region, endpoint);
}
return endpoint;
}
private static String getEndpoint(String region) {
if ("us-east".equals(region)) {
return "s3.amazonaws.com";
} else if ("us-east-1".equals(region)) {
return "s3.amazonaws.com";
} else if ("us-west".equals(region)) {
return "s3-us-west-1.amazonaws.com";
} else if ("us-west-1".equals(region)) {
return "s3-us-west-1.amazonaws.com";
} else if ("us-west-2".equals(region)) {
return "s3-us-west-2.amazonaws.com";
} else if ("ap-southeast".equals(region)) {
return "s3-ap-southeast-1.amazonaws.com";
} else if ("ap-southeast-1".equals(region)) {
return "s3-ap-southeast-1.amazonaws.com";
} else if ("ap-southeast-2".equals(region)) {
return "s3-ap-southeast-2.amazonaws.com";
} else if ("ap-northeast".equals(region)) {
return "s3-ap-northeast-1.amazonaws.com";
} else if ("ap-northeast-1".equals(region)) {
return "s3-ap-northeast-1.amazonaws.com";
} else if ("eu-west".equals(region)) {
return "s3-eu-west-1.amazonaws.com";
} else if ("eu-west-1".equals(region)) {
return "s3-eu-west-1.amazonaws.com";
} else if ("sa-east".equals(region)) {
return "s3-sa-east-1.amazonaws.com";
} else if ("sa-east-1".equals(region)) {
return "s3-sa-east-1.amazonaws.com";
} else {
throw new ElasticsearchIllegalArgumentException("No automatic endpoint could be derived from region [" + region + "]");
}
}
@Override
protected void doStart() throws ElasticsearchException {
}
@Override
protected void doStop() throws ElasticsearchException {
}
@Override
protected void doClose() throws ElasticsearchException {
for (AmazonS3Client client : clients.values()) {
client.shutdown();
}
}
}

View File

@ -19,12 +19,15 @@
package org.elasticsearch.cloud.aws.blobstore;
import com.amazonaws.services.s3.model.AmazonS3Exception;
import com.amazonaws.services.s3.model.ObjectListing;
import com.amazonaws.services.s3.model.S3Object;
import com.amazonaws.services.s3.model.S3ObjectSummary;
import org.apache.lucene.util.IOUtils;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.blobstore.BlobMetaData;
import org.elasticsearch.common.blobstore.BlobPath;
import org.elasticsearch.common.blobstore.BlobStoreException;
import org.elasticsearch.common.blobstore.support.AbstractBlobContainer;
import org.elasticsearch.common.blobstore.support.PlainBlobMetaData;
import org.elasticsearch.common.collect.ImmutableMap;
@ -56,8 +59,10 @@ public class AbstractS3BlobContainer extends AbstractBlobContainer {
try {
blobStore.client().getObjectMetadata(blobStore.bucket(), buildKey(blobName));
return true;
} catch (Exception e) {
} catch (AmazonS3Exception e) {
return false;
} catch (Throwable e) {
throw new BlobStoreException("failed to check if blob exists", e);
}
}
@ -76,7 +81,7 @@ public class AbstractS3BlobContainer extends AbstractBlobContainer {
try {
S3Object object = blobStore.client().getObject(blobStore.bucket(), buildKey(blobName));
is = object.getObjectContent();
} catch (Exception e) {
} catch (Throwable e) {
listener.onFailure(e);
return;
}
@ -87,12 +92,8 @@ public class AbstractS3BlobContainer extends AbstractBlobContainer {
listener.onPartial(buffer, 0, bytesRead);
}
listener.onCompleted();
} catch (Exception e) {
try {
is.close();
} catch (IOException e1) {
// ignore
}
} catch (Throwable e) {
IOUtils.closeWhileHandlingException(is);
listener.onFailure(e);
}
}
@ -135,4 +136,9 @@ public class AbstractS3BlobContainer extends AbstractBlobContainer {
protected String buildKey(String blobName) {
return keyPath + blobName;
}
protected boolean shouldRetry(AmazonS3Exception e) {
return e.getStatusCode() == 400 && "RequestTimeout".equals(e.getErrorCode());
}
}

View File

@ -54,6 +54,8 @@ public class S3BlobStore extends AbstractComponent implements BlobStore {
private final boolean serverSideEncryption;
private final int numberOfRetries;
public S3BlobStore(Settings settings, AmazonS3 client, String bucket, @Nullable String region, ThreadPool threadPool, boolean serverSideEncryption) {
super(settings);
this.client = client;
@ -63,7 +65,7 @@ public class S3BlobStore extends AbstractComponent implements BlobStore {
this.serverSideEncryption = serverSideEncryption;
this.bufferSizeInBytes = (int) settings.getAsBytesSize("buffer_size", new ByteSizeValue(100, ByteSizeUnit.KB)).bytes();
this.numberOfRetries = settings.getAsInt("max_retries", 3);
if (!client.doesBucketExist(bucket)) {
if (region != null) {
client.createBucket(bucket, region);
@ -96,6 +98,10 @@ public class S3BlobStore extends AbstractComponent implements BlobStore {
return bufferSizeInBytes;
}
public int numberOfRetries() {
return numberOfRetries;
}
@Override
public ImmutableBlobContainer immutableBlobContainer(BlobPath path) {
return new S3ImmutableBlobContainer(path, this);

View File

@ -19,8 +19,8 @@
package org.elasticsearch.cloud.aws.blobstore;
import com.amazonaws.services.s3.model.AmazonS3Exception;
import com.amazonaws.services.s3.model.ObjectMetadata;
import com.amazonaws.services.s3.model.PutObjectResult;
import org.elasticsearch.common.blobstore.BlobPath;
import org.elasticsearch.common.blobstore.ImmutableBlobContainer;
import org.elasticsearch.common.blobstore.support.BlobStores;
@ -42,16 +42,37 @@ public class S3ImmutableBlobContainer extends AbstractS3BlobContainer implements
blobStore.executor().execute(new Runnable() {
@Override
public void run() {
try {
ObjectMetadata md = new ObjectMetadata();
if (blobStore.serverSideEncryption()) {
md.setServerSideEncryption(ObjectMetadata.AES_256_SERVER_SIDE_ENCRYPTION);
int retry = 0;
// Read limit is ignored by InputStreamIndexInput, but we will set it anyway in case
// implementation will change
is.mark(Integer.MAX_VALUE);
while (true) {
try {
ObjectMetadata md = new ObjectMetadata();
if (blobStore.serverSideEncryption()) {
md.setServerSideEncryption(ObjectMetadata.AES_256_SERVER_SIDE_ENCRYPTION);
}
md.setContentLength(sizeInBytes);
blobStore.client().putObject(blobStore.bucket(), buildKey(blobName), is, md);
listener.onCompleted();
return;
} catch (AmazonS3Exception e) {
if (shouldRetry(e) && retry < blobStore.numberOfRetries()) {
try {
is.reset();
} catch (IOException ex) {
listener.onFailure(e);
return;
}
retry++;
} else {
listener.onFailure(e);
return;
}
} catch (Throwable e) {
listener.onFailure(e);
return;
}
md.setContentLength(sizeInBytes);
PutObjectResult objectResult = blobStore.client().putObject(blobStore.bucket(), buildKey(blobName), is, md);
listener.onCompleted();
} catch (Exception e) {
listener.onFailure(e);
}
}
});

View File

@ -55,10 +55,10 @@ public class CloudAwsPlugin extends AbstractPlugin {
}
@Override
public Collection<Class<? extends Module>> modules() {
Collection<Class<? extends Module>> modules = Lists.newArrayList();
public Collection<Module> modules(Settings settings) {
Collection<Module> modules = Lists.newArrayList();
if (settings.getAsBoolean("cloud.enabled", true)) {
modules.add(AwsModule.class);
modules.add(new AwsModule(settings));
}
return modules;
}
@ -67,7 +67,7 @@ public class CloudAwsPlugin extends AbstractPlugin {
public Collection<Class<? extends LifecycleComponent>> services() {
Collection<Class<? extends LifecycleComponent>> services = Lists.newArrayList();
if (settings.getAsBoolean("cloud.enabled", true)) {
services.add(AwsS3Service.class);
services.add(AwsModule.getS3ServiceClass(settings));
services.add(AwsEc2Service.class);
}
return services;

View File

@ -70,6 +70,9 @@ public abstract class AbstractAwsTest extends ElasticsearchIntegrationTest {
return ImmutableSettings.builder()
.put(super.nodeSettings(nodeOrdinal))
.put("plugins." + PluginsService.LOAD_PLUGIN_FROM_CLASSPATH, true)
.put( AwsModule.S3_SERVICE_TYPE_KEY, TestAwsS3Service.class)
.put("cloud.aws.test.random", randomInt())
.put("cloud.aws.test.write_failures", 0.1)
.build();
}
}

View File

@ -0,0 +1,554 @@
/*
* Licensed to Elasticsearch (the "Author") under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Author licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.cloud.aws;
import com.amazonaws.AmazonClientException;
import com.amazonaws.AmazonServiceException;
import com.amazonaws.AmazonWebServiceRequest;
import com.amazonaws.HttpMethod;
import com.amazonaws.regions.Region;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.S3ClientOptions;
import com.amazonaws.services.s3.S3ResponseMetadata;
import com.amazonaws.services.s3.model.*;
import java.io.File;
import java.io.InputStream;
import java.net.URL;
import java.util.Date;
import java.util.List;
/**
*
*/
public class AmazonS3Wrapper implements AmazonS3 {
protected AmazonS3 delegate;
public AmazonS3Wrapper(AmazonS3 delegate) {
this.delegate = delegate;
}
@Override
public void setEndpoint(String endpoint) {
delegate.setEndpoint(endpoint);
}
@Override
public void setRegion(Region region) throws IllegalArgumentException {
delegate.setRegion(region);
}
@Override
public void setS3ClientOptions(S3ClientOptions clientOptions) {
delegate.setS3ClientOptions(clientOptions);
}
@Override
public void changeObjectStorageClass(String bucketName, String key, StorageClass newStorageClass) throws AmazonClientException, AmazonServiceException {
delegate.changeObjectStorageClass(bucketName, key, newStorageClass);
}
@Override
public void setObjectRedirectLocation(String bucketName, String key, String newRedirectLocation) throws AmazonClientException, AmazonServiceException {
delegate.setObjectRedirectLocation(bucketName, key, newRedirectLocation);
}
@Override
public ObjectListing listObjects(String bucketName) throws AmazonClientException, AmazonServiceException {
return delegate.listObjects(bucketName);
}
@Override
public ObjectListing listObjects(String bucketName, String prefix) throws AmazonClientException, AmazonServiceException {
return delegate.listObjects(bucketName, prefix);
}
@Override
public ObjectListing listObjects(ListObjectsRequest listObjectsRequest) throws AmazonClientException, AmazonServiceException {
return delegate.listObjects(listObjectsRequest);
}
@Override
public ObjectListing listNextBatchOfObjects(ObjectListing previousObjectListing) throws AmazonClientException, AmazonServiceException {
return delegate.listNextBatchOfObjects(previousObjectListing);
}
@Override
public VersionListing listVersions(String bucketName, String prefix) throws AmazonClientException, AmazonServiceException {
return delegate.listVersions(bucketName, prefix);
}
@Override
public VersionListing listNextBatchOfVersions(VersionListing previousVersionListing) throws AmazonClientException, AmazonServiceException {
return delegate.listNextBatchOfVersions(previousVersionListing);
}
@Override
public VersionListing listVersions(String bucketName, String prefix, String keyMarker, String versionIdMarker, String delimiter, Integer maxResults) throws AmazonClientException, AmazonServiceException {
return delegate.listVersions(bucketName, prefix, keyMarker, versionIdMarker, delimiter, maxResults);
}
@Override
public VersionListing listVersions(ListVersionsRequest listVersionsRequest) throws AmazonClientException, AmazonServiceException {
return delegate.listVersions(listVersionsRequest);
}
@Override
public Owner getS3AccountOwner() throws AmazonClientException, AmazonServiceException {
return delegate.getS3AccountOwner();
}
@Override
public boolean doesBucketExist(String bucketName) throws AmazonClientException, AmazonServiceException {
return delegate.doesBucketExist(bucketName);
}
@Override
public List<Bucket> listBuckets() throws AmazonClientException, AmazonServiceException {
return delegate.listBuckets();
}
@Override
public List<Bucket> listBuckets(ListBucketsRequest listBucketsRequest) throws AmazonClientException, AmazonServiceException {
return delegate.listBuckets(listBucketsRequest);
}
@Override
public String getBucketLocation(String bucketName) throws AmazonClientException, AmazonServiceException {
return delegate.getBucketLocation(bucketName);
}
@Override
public String getBucketLocation(GetBucketLocationRequest getBucketLocationRequest) throws AmazonClientException, AmazonServiceException {
return delegate.getBucketLocation(getBucketLocationRequest);
}
@Override
public Bucket createBucket(CreateBucketRequest createBucketRequest) throws AmazonClientException, AmazonServiceException {
return delegate.createBucket(createBucketRequest);
}
@Override
public Bucket createBucket(String bucketName) throws AmazonClientException, AmazonServiceException {
return delegate.createBucket(bucketName);
}
@Override
public Bucket createBucket(String bucketName, com.amazonaws.services.s3.model.Region region) throws AmazonClientException, AmazonServiceException {
return delegate.createBucket(bucketName, region);
}
@Override
public Bucket createBucket(String bucketName, String region) throws AmazonClientException, AmazonServiceException {
return delegate.createBucket(bucketName, region);
}
@Override
public AccessControlList getObjectAcl(String bucketName, String key) throws AmazonClientException, AmazonServiceException {
return delegate.getObjectAcl(bucketName, key);
}
@Override
public AccessControlList getObjectAcl(String bucketName, String key, String versionId) throws AmazonClientException, AmazonServiceException {
return delegate.getObjectAcl(bucketName, key, versionId);
}
@Override
public void setObjectAcl(String bucketName, String key, AccessControlList acl) throws AmazonClientException, AmazonServiceException {
delegate.setObjectAcl(bucketName, key, acl);
}
@Override
public void setObjectAcl(String bucketName, String key, CannedAccessControlList acl) throws AmazonClientException, AmazonServiceException {
delegate.setObjectAcl(bucketName, key, acl);
}
@Override
public void setObjectAcl(String bucketName, String key, String versionId, AccessControlList acl) throws AmazonClientException, AmazonServiceException {
delegate.setObjectAcl(bucketName, key, versionId, acl);
}
@Override
public void setObjectAcl(String bucketName, String key, String versionId, CannedAccessControlList acl) throws AmazonClientException, AmazonServiceException {
delegate.setObjectAcl(bucketName, key, versionId, acl);
}
@Override
public AccessControlList getBucketAcl(String bucketName) throws AmazonClientException, AmazonServiceException {
return delegate.getBucketAcl(bucketName);
}
@Override
public void setBucketAcl(SetBucketAclRequest setBucketAclRequest) throws AmazonClientException, AmazonServiceException {
delegate.setBucketAcl(setBucketAclRequest);
}
@Override
public AccessControlList getBucketAcl(GetBucketAclRequest getBucketAclRequest) throws AmazonClientException, AmazonServiceException {
return delegate.getBucketAcl(getBucketAclRequest);
}
@Override
public void setBucketAcl(String bucketName, AccessControlList acl) throws AmazonClientException, AmazonServiceException {
delegate.setBucketAcl(bucketName, acl);
}
@Override
public void setBucketAcl(String bucketName, CannedAccessControlList acl) throws AmazonClientException, AmazonServiceException {
delegate.setBucketAcl(bucketName, acl);
}
@Override
public ObjectMetadata getObjectMetadata(String bucketName, String key) throws AmazonClientException, AmazonServiceException {
return delegate.getObjectMetadata(bucketName, key);
}
@Override
public ObjectMetadata getObjectMetadata(GetObjectMetadataRequest getObjectMetadataRequest) throws AmazonClientException, AmazonServiceException {
return delegate.getObjectMetadata(getObjectMetadataRequest);
}
@Override
public S3Object getObject(String bucketName, String key) throws AmazonClientException, AmazonServiceException {
return delegate.getObject(bucketName, key);
}
@Override
public S3Object getObject(GetObjectRequest getObjectRequest) throws AmazonClientException, AmazonServiceException {
return delegate.getObject(getObjectRequest);
}
@Override
public ObjectMetadata getObject(GetObjectRequest getObjectRequest, File destinationFile) throws AmazonClientException, AmazonServiceException {
return delegate.getObject(getObjectRequest, destinationFile);
}
@Override
public void deleteBucket(DeleteBucketRequest deleteBucketRequest) throws AmazonClientException, AmazonServiceException {
delegate.deleteBucket(deleteBucketRequest);
}
@Override
public void deleteBucket(String bucketName) throws AmazonClientException, AmazonServiceException {
delegate.deleteBucket(bucketName);
}
@Override
public PutObjectResult putObject(PutObjectRequest putObjectRequest) throws AmazonClientException, AmazonServiceException {
return delegate.putObject(putObjectRequest);
}
@Override
public PutObjectResult putObject(String bucketName, String key, File file) throws AmazonClientException, AmazonServiceException {
return delegate.putObject(bucketName, key, file);
}
@Override
public PutObjectResult putObject(String bucketName, String key, InputStream input, ObjectMetadata metadata) throws AmazonClientException, AmazonServiceException {
return delegate.putObject(bucketName, key, input, metadata);
}
@Override
public CopyObjectResult copyObject(String sourceBucketName, String sourceKey, String destinationBucketName, String destinationKey) throws AmazonClientException, AmazonServiceException {
return delegate.copyObject(sourceBucketName, sourceKey, destinationBucketName, destinationKey);
}
@Override
public CopyObjectResult copyObject(CopyObjectRequest copyObjectRequest) throws AmazonClientException, AmazonServiceException {
return delegate.copyObject(copyObjectRequest);
}
@Override
public CopyPartResult copyPart(CopyPartRequest copyPartRequest) throws AmazonClientException, AmazonServiceException {
return delegate.copyPart(copyPartRequest);
}
@Override
public void deleteObject(String bucketName, String key) throws AmazonClientException, AmazonServiceException {
delegate.deleteObject(bucketName, key);
}
@Override
public void deleteObject(DeleteObjectRequest deleteObjectRequest) throws AmazonClientException, AmazonServiceException {
delegate.deleteObject(deleteObjectRequest);
}
@Override
public DeleteObjectsResult deleteObjects(DeleteObjectsRequest deleteObjectsRequest) throws AmazonClientException, AmazonServiceException {
return delegate.deleteObjects(deleteObjectsRequest);
}
@Override
public void deleteVersion(String bucketName, String key, String versionId) throws AmazonClientException, AmazonServiceException {
delegate.deleteVersion(bucketName, key, versionId);
}
@Override
public void deleteVersion(DeleteVersionRequest deleteVersionRequest) throws AmazonClientException, AmazonServiceException {
delegate.deleteVersion(deleteVersionRequest);
}
@Override
public BucketLoggingConfiguration getBucketLoggingConfiguration(String bucketName) throws AmazonClientException, AmazonServiceException {
return delegate.getBucketLoggingConfiguration(bucketName);
}
@Override
public void setBucketLoggingConfiguration(SetBucketLoggingConfigurationRequest setBucketLoggingConfigurationRequest) throws AmazonClientException, AmazonServiceException {
delegate.setBucketLoggingConfiguration(setBucketLoggingConfigurationRequest);
}
@Override
public BucketVersioningConfiguration getBucketVersioningConfiguration(String bucketName) throws AmazonClientException, AmazonServiceException {
return delegate.getBucketVersioningConfiguration(bucketName);
}
@Override
public void setBucketVersioningConfiguration(SetBucketVersioningConfigurationRequest setBucketVersioningConfigurationRequest) throws AmazonClientException, AmazonServiceException {
delegate.setBucketVersioningConfiguration(setBucketVersioningConfigurationRequest);
}
@Override
public BucketLifecycleConfiguration getBucketLifecycleConfiguration(String bucketName) {
return delegate.getBucketLifecycleConfiguration(bucketName);
}
@Override
public void setBucketLifecycleConfiguration(String bucketName, BucketLifecycleConfiguration bucketLifecycleConfiguration) {
delegate.setBucketLifecycleConfiguration(bucketName, bucketLifecycleConfiguration);
}
@Override
public void setBucketLifecycleConfiguration(SetBucketLifecycleConfigurationRequest setBucketLifecycleConfigurationRequest) {
delegate.setBucketLifecycleConfiguration(setBucketLifecycleConfigurationRequest);
}
@Override
public void deleteBucketLifecycleConfiguration(String bucketName) {
delegate.deleteBucketLifecycleConfiguration(bucketName);
}
@Override
public void deleteBucketLifecycleConfiguration(DeleteBucketLifecycleConfigurationRequest deleteBucketLifecycleConfigurationRequest) {
delegate.deleteBucketLifecycleConfiguration(deleteBucketLifecycleConfigurationRequest);
}
@Override
public BucketCrossOriginConfiguration getBucketCrossOriginConfiguration(String bucketName) {
return delegate.getBucketCrossOriginConfiguration(bucketName);
}
@Override
public void setBucketCrossOriginConfiguration(String bucketName, BucketCrossOriginConfiguration bucketCrossOriginConfiguration) {
delegate.setBucketCrossOriginConfiguration(bucketName, bucketCrossOriginConfiguration);
}
@Override
public void setBucketCrossOriginConfiguration(SetBucketCrossOriginConfigurationRequest setBucketCrossOriginConfigurationRequest) {
delegate.setBucketCrossOriginConfiguration(setBucketCrossOriginConfigurationRequest);
}
@Override
public void deleteBucketCrossOriginConfiguration(String bucketName) {
delegate.deleteBucketCrossOriginConfiguration(bucketName);
}
@Override
public void deleteBucketCrossOriginConfiguration(DeleteBucketCrossOriginConfigurationRequest deleteBucketCrossOriginConfigurationRequest) {
delegate.deleteBucketCrossOriginConfiguration(deleteBucketCrossOriginConfigurationRequest);
}
@Override
public BucketTaggingConfiguration getBucketTaggingConfiguration(String bucketName) {
return delegate.getBucketTaggingConfiguration(bucketName);
}
@Override
public void setBucketTaggingConfiguration(String bucketName, BucketTaggingConfiguration bucketTaggingConfiguration) {
delegate.setBucketTaggingConfiguration(bucketName, bucketTaggingConfiguration);
}
@Override
public void setBucketTaggingConfiguration(SetBucketTaggingConfigurationRequest setBucketTaggingConfigurationRequest) {
delegate.setBucketTaggingConfiguration(setBucketTaggingConfigurationRequest);
}
@Override
public void deleteBucketTaggingConfiguration(String bucketName) {
delegate.deleteBucketTaggingConfiguration(bucketName);
}
@Override
public void deleteBucketTaggingConfiguration(DeleteBucketTaggingConfigurationRequest deleteBucketTaggingConfigurationRequest) {
delegate.deleteBucketTaggingConfiguration(deleteBucketTaggingConfigurationRequest);
}
@Override
public BucketNotificationConfiguration getBucketNotificationConfiguration(String bucketName) throws AmazonClientException, AmazonServiceException {
return delegate.getBucketNotificationConfiguration(bucketName);
}
@Override
public void setBucketNotificationConfiguration(SetBucketNotificationConfigurationRequest setBucketNotificationConfigurationRequest) throws AmazonClientException, AmazonServiceException {
delegate.setBucketNotificationConfiguration(setBucketNotificationConfigurationRequest);
}
@Override
public void setBucketNotificationConfiguration(String bucketName, BucketNotificationConfiguration bucketNotificationConfiguration) throws AmazonClientException, AmazonServiceException {
delegate.setBucketNotificationConfiguration(bucketName, bucketNotificationConfiguration);
}
@Override
public BucketWebsiteConfiguration getBucketWebsiteConfiguration(String bucketName) throws AmazonClientException, AmazonServiceException {
return delegate.getBucketWebsiteConfiguration(bucketName);
}
@Override
public BucketWebsiteConfiguration getBucketWebsiteConfiguration(GetBucketWebsiteConfigurationRequest getBucketWebsiteConfigurationRequest) throws AmazonClientException, AmazonServiceException {
return delegate.getBucketWebsiteConfiguration(getBucketWebsiteConfigurationRequest);
}
@Override
public void setBucketWebsiteConfiguration(String bucketName, BucketWebsiteConfiguration configuration) throws AmazonClientException, AmazonServiceException {
delegate.setBucketWebsiteConfiguration(bucketName, configuration);
}
@Override
public void setBucketWebsiteConfiguration(SetBucketWebsiteConfigurationRequest setBucketWebsiteConfigurationRequest) throws AmazonClientException, AmazonServiceException {
delegate.setBucketWebsiteConfiguration(setBucketWebsiteConfigurationRequest);
}
@Override
public void deleteBucketWebsiteConfiguration(String bucketName) throws AmazonClientException, AmazonServiceException {
delegate.deleteBucketWebsiteConfiguration(bucketName);
}
@Override
public void deleteBucketWebsiteConfiguration(DeleteBucketWebsiteConfigurationRequest deleteBucketWebsiteConfigurationRequest) throws AmazonClientException, AmazonServiceException {
delegate.deleteBucketWebsiteConfiguration(deleteBucketWebsiteConfigurationRequest);
}
@Override
public BucketPolicy getBucketPolicy(String bucketName) throws AmazonClientException, AmazonServiceException {
return delegate.getBucketPolicy(bucketName);
}
@Override
public BucketPolicy getBucketPolicy(GetBucketPolicyRequest getBucketPolicyRequest) throws AmazonClientException, AmazonServiceException {
return delegate.getBucketPolicy(getBucketPolicyRequest);
}
@Override
public void setBucketPolicy(String bucketName, String policyText) throws AmazonClientException, AmazonServiceException {
delegate.setBucketPolicy(bucketName, policyText);
}
@Override
public void setBucketPolicy(SetBucketPolicyRequest setBucketPolicyRequest) throws AmazonClientException, AmazonServiceException {
delegate.setBucketPolicy(setBucketPolicyRequest);
}
@Override
public void deleteBucketPolicy(String bucketName) throws AmazonClientException, AmazonServiceException {
delegate.deleteBucketPolicy(bucketName);
}
@Override
public void deleteBucketPolicy(DeleteBucketPolicyRequest deleteBucketPolicyRequest) throws AmazonClientException, AmazonServiceException {
delegate.deleteBucketPolicy(deleteBucketPolicyRequest);
}
@Override
public URL generatePresignedUrl(String bucketName, String key, Date expiration) throws AmazonClientException {
return delegate.generatePresignedUrl(bucketName, key, expiration);
}
@Override
public URL generatePresignedUrl(String bucketName, String key, Date expiration, HttpMethod method) throws AmazonClientException {
return delegate.generatePresignedUrl(bucketName, key, expiration, method);
}
@Override
public URL generatePresignedUrl(GeneratePresignedUrlRequest generatePresignedUrlRequest) throws AmazonClientException {
return delegate.generatePresignedUrl(generatePresignedUrlRequest);
}
@Override
public InitiateMultipartUploadResult initiateMultipartUpload(InitiateMultipartUploadRequest request) throws AmazonClientException, AmazonServiceException {
return delegate.initiateMultipartUpload(request);
}
@Override
public UploadPartResult uploadPart(UploadPartRequest request) throws AmazonClientException, AmazonServiceException {
return delegate.uploadPart(request);
}
@Override
public PartListing listParts(ListPartsRequest request) throws AmazonClientException, AmazonServiceException {
return delegate.listParts(request);
}
@Override
public void abortMultipartUpload(AbortMultipartUploadRequest request) throws AmazonClientException, AmazonServiceException {
delegate.abortMultipartUpload(request);
}
@Override
public CompleteMultipartUploadResult completeMultipartUpload(CompleteMultipartUploadRequest request) throws AmazonClientException, AmazonServiceException {
return delegate.completeMultipartUpload(request);
}
@Override
public MultipartUploadListing listMultipartUploads(ListMultipartUploadsRequest request) throws AmazonClientException, AmazonServiceException {
return delegate.listMultipartUploads(request);
}
@Override
public S3ResponseMetadata getCachedResponseMetadata(AmazonWebServiceRequest request) {
return delegate.getCachedResponseMetadata(request);
}
@Override
public void restoreObject(RestoreObjectRequest copyGlacierObjectRequest) throws AmazonServiceException {
delegate.restoreObject(copyGlacierObjectRequest);
}
@Override
public void restoreObject(String bucketName, String key, int expirationInDays) throws AmazonServiceException {
delegate.restoreObject(bucketName, key, expirationInDays);
}
@Override
public void enableRequesterPays(String bucketName) throws AmazonServiceException, AmazonClientException {
delegate.enableRequesterPays(bucketName);
}
@Override
public void disableRequesterPays(String bucketName) throws AmazonServiceException, AmazonClientException {
delegate.disableRequesterPays(bucketName);
}
@Override
public boolean isRequesterPaysEnabled(String bucketName) throws AmazonServiceException, AmazonClientException {
return delegate.isRequesterPaysEnabled(bucketName);
}
}

View File

@ -0,0 +1,115 @@
/*
* Licensed to Elasticsearch (the "Author") under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Author licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.cloud.aws;
import com.amazonaws.AmazonClientException;
import com.amazonaws.AmazonServiceException;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.model.AmazonS3Exception;
import com.amazonaws.services.s3.model.ObjectMetadata;
import com.amazonaws.services.s3.model.PutObjectResult;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.common.settings.Settings;
import java.io.IOException;
import java.io.InputStream;
import java.io.UnsupportedEncodingException;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicLong;
import static com.carrotsearch.randomizedtesting.RandomizedTest.randomDouble;
/**
*
*/
public class TestAmazonS3 extends AmazonS3Wrapper {
private double writeFailureRate = 0.0;
private String randomPrefix;
ConcurrentMap<String, AtomicLong> accessCounts = new ConcurrentHashMap<String, AtomicLong>();
private long incrementAndGet(String path) {
AtomicLong value = accessCounts.get(path);
if (value == null) {
value = accessCounts.putIfAbsent(path, new AtomicLong(1));
}
if (value != null) {
return value.incrementAndGet();
}
return 1;
}
public TestAmazonS3(AmazonS3 delegate, Settings componentSettings) {
super(delegate);
randomPrefix = componentSettings.get("test.random");
writeFailureRate = componentSettings.getAsDouble("test.write_failures", 0.0);
}
@Override
public PutObjectResult putObject(String bucketName, String key, InputStream input, ObjectMetadata metadata) throws AmazonClientException, AmazonServiceException {
if (shouldFail(bucketName, key, writeFailureRate)) {
long length = metadata.getContentLength();
long partToRead = (long) (length * randomDouble());
byte[] buffer = new byte[1024];
for (long cur = 0; cur < partToRead; cur += buffer.length) {
try {
input.read(buffer, 0, (int) (partToRead - cur > buffer.length ? buffer.length : partToRead - cur));
} catch (IOException ex) {
throw new ElasticsearchException("cannot read input stream", ex);
}
}
AmazonS3Exception ex = new AmazonS3Exception("Random S3 exception");
ex.setStatusCode(400);
ex.setErrorCode("RequestTimeout");
throw ex;
} else {
return super.putObject(bucketName, key, input, metadata);
}
}
private boolean shouldFail(String bucketName, String key, double probability) {
if (probability > 0.0) {
String path = randomPrefix + "-" + bucketName + "+" + key;
path += "/" + incrementAndGet(path);
return Math.abs(hashCode(path)) < Integer.MAX_VALUE * probability;
} else {
return false;
}
}
private int hashCode(String path) {
try {
MessageDigest digest = MessageDigest.getInstance("MD5");
byte[] bytes = digest.digest(path.getBytes("UTF-8"));
int i = 0;
return ((bytes[i++] & 0xFF) << 24) | ((bytes[i++] & 0xFF) << 16)
| ((bytes[i++] & 0xFF) << 8) | (bytes[i++] & 0xFF);
} catch (UnsupportedEncodingException ex) {
throw new ElasticsearchException("cannot calculate hashcode", ex);
} catch (NoSuchAlgorithmException ex) {
throw new ElasticsearchException("cannot calculate hashcode", ex);
}
}
}

View File

@ -0,0 +1,68 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.cloud.aws;
import com.amazonaws.services.s3.AmazonS3;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.settings.SettingsFilter;
import java.util.IdentityHashMap;
/**
*
*/
public class TestAwsS3Service extends InternalAwsS3Service {
IdentityHashMap<AmazonS3, TestAmazonS3> clients = new IdentityHashMap<AmazonS3, TestAmazonS3>();
@Inject
public TestAwsS3Service(Settings settings, SettingsFilter settingsFilter) {
super(settings, settingsFilter);
}
@Override
public synchronized AmazonS3 client() {
return cachedWrapper(super.client());
}
@Override
public synchronized AmazonS3 client(String region, String account, String key) {
return cachedWrapper(super.client(region, account, key));
}
private AmazonS3 cachedWrapper(AmazonS3 client) {
TestAmazonS3 wrapper = clients.get(client);
if (wrapper == null) {
wrapper = new TestAmazonS3(client, componentSettings);
clients.put(client, wrapper);
}
return wrapper;
}
@Override
protected synchronized void doClose() throws ElasticsearchException {
super.doClose();
clients.clear();
}
}