Add retry logic for S3 connection errors when restoring snapshots

This commit adds retry logic when reading blobs from S3. It also adds retry logic when initializing a multipart upload, and sets the internal "max retries" parameter of the Amazon S3 client to the same value as the "max_retries" parameter configured for the snapshot repository (so in the worst case, with the default value of 3, up to 3x3=9 retries can be performed). The internal S3 client applies an exponential backoff strategy between its retries of a failed connection (mainly IOException).
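
As an illustration of how the repository-level setting is meant to be used, the following sketch registers an S3 snapshot repository with an explicit max_retries through the 1.x Java client API; the repository name and bucket are hypothetical placeholders, and this snippet is not part of the commit.

import org.elasticsearch.client.Client;
import org.elasticsearch.common.settings.ImmutableSettings;

public class RegisterS3Repository {
    // Hypothetical usage sketch: "my_s3_repo" and "my-bucket" are
    // placeholders. With max_retries = 3 (also the default), both the
    // repository-level loops and the S3 client's internal policy will
    // retry up to 3 times, giving the 3x3 worst case described above.
    static void register(Client client) {
        client.admin().cluster().preparePutRepository("my_s3_repo")
                .setType("s3")
                .setSettings(ImmutableSettings.settingsBuilder()
                        .put("bucket", "my-bucket")
                        .put("max_retries", 3))
                .execute().actionGet();
    }
}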

Closes elasticsearch/elasticsearch#8280
tlrx 2014-11-04 18:33:42 +01:00
parent 65bda62388
commit ea91adf6a1
6 changed files with 79 additions and 39 deletions

AwsS3Service.java

@@ -29,4 +29,6 @@ public interface AwsS3Service extends LifecycleComponent<AwsS3Service> {
     AmazonS3 client();
     AmazonS3 client(String region, String account, String key);
+    AmazonS3 client(String region, String account, String key, Integer maxRetries);
 }

InternalAwsS3Service.java

@@ -60,11 +60,16 @@ public class InternalAwsS3Service extends AbstractLifecycleComponent<AwsS3Service>
         String account = componentSettings.get("access_key", settings.get("cloud.account"));
         String key = componentSettings.get("secret_key", settings.get("cloud.key"));
-        return getClient(endpoint, account, key);
+        return getClient(endpoint, account, key, null);
     }
 
     @Override
-    public synchronized AmazonS3 client(String region, String account, String key) {
+    public AmazonS3 client(String region, String account, String key) {
+        return client(region, account, key, null);
+    }
+
+    @Override
+    public synchronized AmazonS3 client(String region, String account, String key, Integer maxRetries) {
         String endpoint;
         if (region == null) {
             endpoint = getDefaultEndpoint();
@@ -77,11 +82,11 @@ public class InternalAwsS3Service extends AbstractLifecycleComponent<AwsS3Service>
             key = componentSettings.get("secret_key", settings.get("cloud.key"));
         }
-        return getClient(endpoint, account, key);
+        return getClient(endpoint, account, key, maxRetries);
     }
 
-    private synchronized AmazonS3 getClient(String endpoint, String account, String key) {
+    private synchronized AmazonS3 getClient(String endpoint, String account, String key, Integer maxRetries) {
         Tuple<String, String> clientDescriptor = new Tuple<String, String>(endpoint, account);
         AmazonS3Client client = clients.get(clientDescriptor);
         if (client != null) {
@@ -111,6 +116,11 @@ public class InternalAwsS3Service extends AbstractLifecycleComponent<AwsS3Service>
             clientConfiguration.withProxyHost(proxyHost).setProxyPort(proxyPort);
         }
 
+        if (maxRetries != null) {
+            // If not explicitly set, default to 3 with exponential backoff policy
+            clientConfiguration.setMaxErrorRetry(maxRetries);
+        }
+
         AWSCredentialsProvider credentials;
 
         if (account == null && key == null) {
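
For reference, the plumbing above ultimately drives the AWS SDK's ClientConfiguration. A minimal standalone sketch (assuming the AWS SDK for Java v1; not part of the commit) of what the maxRetries branch configures:

import com.amazonaws.ClientConfiguration;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.services.s3.AmazonS3Client;

public class S3ClientWithRetries {
    // Sketch only: builds an S3 client whose failed requests are retried
    // up to maxRetries times, with the SDK's exponential backoff between
    // attempts. When setMaxErrorRetry() is never called, the SDK falls
    // back to its default of 3 retries, which is why the code above only
    // overrides the value when one was explicitly given.
    static AmazonS3Client build(String accessKey, String secretKey, int maxRetries) {
        ClientConfiguration config = new ClientConfiguration();
        config.setMaxErrorRetry(maxRetries);
        return new AmazonS3Client(new BasicAWSCredentials(accessKey, secretKey), config);
    }
}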

DefaultS3OutputStream.java

@@ -19,6 +19,7 @@
 package org.elasticsearch.cloud.aws.blobstore;
 
+import com.amazonaws.AmazonClientException;
 import com.amazonaws.services.s3.model.*;
 import org.elasticsearch.common.unit.ByteSizeUnit;
 import org.elasticsearch.common.unit.ByteSizeValue;
@@ -96,12 +97,12 @@ public class DefaultS3OutputStream extends S3OutputStream {
     private void upload(byte[] bytes, int off, int len) throws IOException {
         try (ByteArrayInputStream is = new ByteArrayInputStream(bytes, off, len)) {
             int retry = 0;
-            while (retry < getNumberOfRetries()) {
+            while (retry <= getNumberOfRetries()) {
                 try {
                     doUpload(getBlobStore(), getBucketName(), getBlobName(), is, len, isServerSideEncryption());
                     break;
-                } catch (AmazonS3Exception e) {
-                    if (shouldRetry(e)) {
+                } catch (AmazonClientException e) {
+                    if (getBlobStore().shouldRetry(e) && retry < getNumberOfRetries()) {
                         is.reset();
                         retry++;
                     } else {
@@ -123,11 +124,20 @@ public class DefaultS3OutputStream extends S3OutputStream {
     }
 
     private void initializeMultipart() {
-        if (multipartId == null) {
-            multipartId = doInitialize(getBlobStore(), getBucketName(), getBlobName(), isServerSideEncryption());
-            if (multipartId != null) {
-                multipartChunks = 1;
-                multiparts = new ArrayList<>();
+        int retry = 0;
+        while ((retry <= getNumberOfRetries()) && (multipartId == null)) {
+            try {
+                multipartId = doInitialize(getBlobStore(), getBucketName(), getBlobName(), isServerSideEncryption());
+                if (multipartId != null) {
+                    multipartChunks = 1;
+                    multiparts = new ArrayList<>();
+                }
+            } catch (AmazonClientException e) {
+                if (getBlobStore().shouldRetry(e) && retry < getNumberOfRetries()) {
+                    retry++;
+                } else {
+                    throw e;
+                }
             }
         }
     }
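
The rewritten initializeMultipart() follows the same bounded-retry pattern as the other loops in this class: at most numberOfRetries + 1 attempts, rethrowing as soon as the failure is non-retryable or the budget is spent. A generic sketch of the pattern (illustrative helper, not part of the commit):

import com.amazonaws.AmazonClientException;
import java.util.concurrent.Callable;

// Illustrative helper: runs an action up to (maxRetries + 1) times,
// rethrowing once the error is non-retryable or the budget is spent.
// Backoff between attempts is left to the AWS SDK's internal
// exponential backoff, as in the loops above.
public final class BoundedRetry {
    public static <T> T run(Callable<T> action, int maxRetries) throws Exception {
        int retry = 0;
        while (true) {
            try {
                return action.call();
            } catch (AmazonClientException e) {
                // isRetryable() is the SDK's own hint; the commit extends it
                // with the 400/RequestTimeout case in S3BlobStore.shouldRetry().
                if (e.isRetryable() && retry < maxRetries) {
                    retry++;
                } else {
                    throw e;
                }
            }
        }
    }
}
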
@@ -145,14 +155,14 @@ public class DefaultS3OutputStream extends S3OutputStream {
     private void uploadMultipart(byte[] bytes, int off, int len, boolean lastPart) throws IOException {
         try (ByteArrayInputStream is = new ByteArrayInputStream(bytes, off, len)) {
             int retry = 0;
-            while (retry < getNumberOfRetries()) {
+            while (retry <= getNumberOfRetries()) {
                 try {
                     PartETag partETag = doUploadMultipart(getBlobStore(), getBucketName(), getBlobName(), multipartId, is, len, lastPart);
                     multiparts.add(partETag);
                     multipartChunks++;
                     return;
-                } catch (AmazonS3Exception e) {
-                    if (shouldRetry(e) && retry < getNumberOfRetries()) {
+                } catch (AmazonClientException e) {
+                    if (getBlobStore().shouldRetry(e) && retry < getNumberOfRetries()) {
                         is.reset();
                         retry++;
                     } else {
@@ -182,13 +192,13 @@ public class DefaultS3OutputStream extends S3OutputStream {
     private void completeMultipart() {
         int retry = 0;
-        while (retry < getNumberOfRetries()) {
+        while (retry <= getNumberOfRetries()) {
             try {
                 doCompleteMultipart(getBlobStore(), getBucketName(), getBlobName(), multipartId, multiparts);
                 multipartId = null;
                 return;
-            } catch (AmazonS3Exception e) {
-                if (shouldRetry(e) && retry < getNumberOfRetries()) {
+            } catch (AmazonClientException e) {
+                if (getBlobStore().shouldRetry(e) && retry < getNumberOfRetries()) {
                     retry++;
                 } else {
                     abortMultipart();
@@ -218,8 +228,4 @@ public class DefaultS3OutputStream extends S3OutputStream {
             throws AmazonS3Exception {
         blobStore.client().abortMultipartUpload(new AbortMultipartUploadRequest(bucketName, blobName, uploadId));
     }
-
-    protected boolean shouldRetry(AmazonS3Exception e) {
-        return e.getStatusCode() == 400 && "RequestTimeout".equals(e.getErrorCode());
-    }
 }
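
Note that the upload loops above rewind the buffer with is.reset() before retrying. This works because ByteArrayInputStream sets its mark to the initial offset at construction, so reset() returns to the first byte of the chunk without an explicit mark() call. A tiny demonstration (not part of the commit):

import java.io.ByteArrayInputStream;

public class ResetDemo {
    public static void main(String[] args) {
        // The three-arg constructor used in DefaultS3OutputStream marks
        // position 'off' implicitly, so reset() rewinds a half-consumed
        // stream back to the start of the chunk for the next attempt.
        ByteArrayInputStream is = new ByteArrayInputStream(new byte[]{1, 2, 3}, 0, 3);
        is.read();                      // a failed attempt consumed a byte
        is.reset();                     // rewind, as the retry loops do
        System.out.println(is.read());  // prints 1 again
    }
}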

S3BlobContainer.java

@@ -79,20 +79,30 @@ public class S3BlobContainer extends AbstractBlobContainer {
     @Override
     public InputStream openInput(String blobName) throws IOException {
-        try {
-            S3Object s3Object = blobStore.client().getObject(blobStore.bucket(), buildKey(blobName));
-            return s3Object.getObjectContent();
-        } catch (AmazonS3Exception e) {
-            if (e.getStatusCode() == 404) {
-                throw new FileNotFoundException(e.getMessage());
-            }
-            throw e;
-        }
+        int retry = 0;
+        while (retry <= blobStore.numberOfRetries()) {
+            try {
+                S3Object s3Object = blobStore.client().getObject(blobStore.bucket(), buildKey(blobName));
+                return s3Object.getObjectContent();
+            } catch (AmazonClientException e) {
+                if (blobStore.shouldRetry(e) && (retry < blobStore.numberOfRetries())) {
+                    retry++;
+                } else {
+                    if (e instanceof AmazonS3Exception) {
+                        if (404 == ((AmazonS3Exception) e).getStatusCode()) {
+                            throw new FileNotFoundException(e.getMessage());
+                        }
+                    }
+                    throw e;
+                }
+            }
+        }
+        throw new BlobStoreException("retries exhausted while attempting to access blob object [name:" + blobName + ", bucket:" + blobStore.bucket() + "]");
     }
 
     @Override
     public OutputStream createOutput(final String blobName) throws IOException {
-        // UploadS3OutputStream does buffering internally
+        // UploadS3OutputStream does buffering & retry logic internally
         return new DefaultS3OutputStream(blobStore, blobStore.bucket(), buildKey(blobName), blobStore.bufferSizeInBytes(), blobStore.numberOfRetries(), blobStore.serverSideEncryption());
     }
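
One caveat worth knowing about the openInput() retry above: it only protects the getObject() call itself. Once the object's content stream has been returned, IOExceptions raised while reading it propagate to the caller unretried. A hypothetical caller (not part of the commit):

import java.io.IOException;
import java.io.InputStream;
import org.elasticsearch.cloud.aws.blobstore.S3BlobContainer;

public class ReadBlobExample {
    // Hypothetical caller: the retry loop above guards opening the blob;
    // IOExceptions during subsequent reads propagate to the reader,
    // e.g. the snapshot restore code.
    static long countBytes(S3BlobContainer container, String blobName) throws IOException {
        InputStream in = container.openInput(blobName);
        try {
            byte[] buffer = new byte[4096];
            long total = 0;
            int n;
            while ((n = in.read(buffer)) != -1) {
                total += n;
            }
            return total;
        } finally {
            in.close();
        }
    }
}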

S3BlobStore.java

@@ -19,7 +19,9 @@
 package org.elasticsearch.cloud.aws.blobstore;
 
+import com.amazonaws.AmazonClientException;
 import com.amazonaws.services.s3.AmazonS3;
+import com.amazonaws.services.s3.model.AmazonS3Exception;
 import com.amazonaws.services.s3.model.DeleteObjectsRequest;
 import com.amazonaws.services.s3.model.DeleteObjectsRequest.KeyVersion;
 import com.amazonaws.services.s3.model.ObjectListing;
@@ -55,12 +57,8 @@ public class S3BlobStore extends AbstractComponent implements BlobStore {
     private final int numberOfRetries;
 
-    public S3BlobStore(Settings settings, AmazonS3 client, String bucket, String region, boolean serverSideEncryption) {
-        this(settings, client, bucket, region, serverSideEncryption, null);
-    }
-
-    public S3BlobStore(Settings settings, AmazonS3 client, String bucket, @Nullable String region, boolean serverSideEncryption, ByteSizeValue bufferSize) {
+    public S3BlobStore(Settings settings, AmazonS3 client, String bucket, @Nullable String region, boolean serverSideEncryption,
+                       ByteSizeValue bufferSize, int maxRetries) {
         super(settings);
         this.client = client;
         this.bucket = bucket;
@@ -72,7 +70,7 @@ public class S3BlobStore extends AbstractComponent implements BlobStore {
             throw new BlobStoreException("Detected a buffer_size for the S3 storage lower than [" + MIN_BUFFER_SIZE + "]");
         }
 
-        this.numberOfRetries = settings.getAsInt("max_retries", 3);
+        this.numberOfRetries = maxRetries;
         if (!client.doesBucketExist(bucket)) {
             if (region != null) {
                 client.createBucket(bucket, region);
@@ -152,6 +150,16 @@ public class S3BlobStore extends AbstractComponent implements BlobStore {
         }
     }
 
+    protected boolean shouldRetry(AmazonClientException e) {
+        if (e instanceof AmazonS3Exception) {
+            AmazonS3Exception s3e = (AmazonS3Exception) e;
+            if (s3e.getStatusCode() == 400 && "RequestTimeout".equals(s3e.getErrorCode())) {
+                return true;
+            }
+        }
+        return e.isRetryable();
+    }
+
     @Override
     public void close() {
     }
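
The new S3BlobStore.shouldRetry() widens what counts as retryable: it preserves the old special case of HTTP 400 with error code RequestTimeout (which S3 returns when it stops receiving data mid-upload) and otherwise defers to the SDK's isRetryable() flag. An illustrative check mirroring that logic (not part of the commit):

import com.amazonaws.AmazonClientException;
import com.amazonaws.services.s3.model.AmazonS3Exception;

public class ShouldRetryDemo {
    // Mirrors S3BlobStore.shouldRetry(): the S3-specific RequestTimeout
    // case is retried even though it is reported as a 400.
    static boolean shouldRetry(AmazonClientException e) {
        if (e instanceof AmazonS3Exception) {
            AmazonS3Exception s3e = (AmazonS3Exception) e;
            if (s3e.getStatusCode() == 400 && "RequestTimeout".equals(s3e.getErrorCode())) {
                return true;
            }
        }
        return e.isRetryable();
    }

    public static void main(String[] args) {
        AmazonS3Exception timeout = new AmazonS3Exception("socket idle too long during upload");
        timeout.setStatusCode(400);
        timeout.setErrorCode("RequestTimeout");
        System.out.println(shouldRetry(timeout));  // true: the special case applies
    }
}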

S3Repository.java

@@ -120,10 +120,14 @@ public class S3Repository extends BlobStoreRepository {
         boolean serverSideEncryption = repositorySettings.settings().getAsBoolean("server_side_encryption", componentSettings.getAsBoolean("server_side_encryption", false));
         ByteSizeValue bufferSize = repositorySettings.settings().getAsBytesSize("buffer_size", componentSettings.getAsBytesSize("buffer_size", null));
-        logger.debug("using bucket [{}], region [{}], chunk_size [{}], server_side_encryption [{}], buffer_size [{}]", bucket, region, chunkSize, serverSideEncryption, bufferSize);
-        blobStore = new S3BlobStore(settings, s3Service.client(region, repositorySettings.settings().get("access_key"), repositorySettings.settings().get("secret_key")), bucket, region, serverSideEncryption, bufferSize);
+        Integer maxRetries = repositorySettings.settings().getAsInt("max_retries", componentSettings.getAsInt("max_retries", 3));
+        this.chunkSize = repositorySettings.settings().getAsBytesSize("chunk_size", componentSettings.getAsBytesSize("chunk_size", new ByteSizeValue(100, ByteSizeUnit.MB)));
+        this.compress = repositorySettings.settings().getAsBoolean("compress", componentSettings.getAsBoolean("compress", false));
+        logger.debug("using bucket [{}], region [{}], chunk_size [{}], server_side_encryption [{}], buffer_size [{}], max_retries [{}]",
+                bucket, region, chunkSize, serverSideEncryption, bufferSize, maxRetries);
+        blobStore = new S3BlobStore(settings, s3Service.client(region, repositorySettings.settings().get("access_key"), repositorySettings.settings().get("secret_key"), maxRetries), bucket, region, serverSideEncryption, bufferSize, maxRetries);
 
         String basePath = repositorySettings.settings().get("base_path", null);
         if (Strings.hasLength(basePath)) {
             BlobPath path = new BlobPath();
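
Taken together, max_retries now resolves per repository first, then from node-level component settings, then defaults to 3, and that one value feeds both retry layers. Counting attempts rather than retries, the worst-case ceiling multiplies across the layers, as this illustrative arithmetic (not part of the commit) shows:

public class WorstCaseRetries {
    public static void main(String[] args) {
        int maxRetries = 3;                       // the "max_retries" default
        int repositoryAttempts = maxRetries + 1;  // loops run while retry <= maxRetries
        int sdkAttemptsPerCall = maxRetries + 1;  // 1 initial try + maxRetries SDK retries
        // Prints: worst case: 16 low-level attempts per blob operation
        System.out.println("worst case: " + (repositoryAttempts * sdkAttemptsPerCall)
                + " low-level attempts per blob operation");
    }
}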