From ea91adf6a1ed28c1a93a6c5d38a6e02ed295ed07 Mon Sep 17 00:00:00 2001 From: tlrx Date: Tue, 4 Nov 2014 18:33:42 +0100 Subject: [PATCH] Add retry logic for S3 connection errors when restoring snapshots This commit adds retry logic when reading blobs from S3. It also adds retry logic when initializing a multipart upload and sets the internal "max retries" parameter of the Amazon S3 client to the same value as the "max_retry" parameter set for the snapshot repository (so in the worst case with the default value set to 3, 3x3=9 attempts will be made). The internal S3 client uses an exponential back-off strategy between each connection exception (mainly IOException). Closes elasticsearch/elasticsearch#8280 --- .../elasticsearch/cloud/aws/AwsS3Service.java | 2 + .../cloud/aws/InternalAwsS3Service.java | 18 ++++++-- .../aws/blobstore/DefaultS3OutputStream.java | 42 +++++++++++-------- .../cloud/aws/blobstore/S3BlobContainer.java | 26 ++++++++---- .../cloud/aws/blobstore/S3BlobStore.java | 22 ++++++---- .../repositories/s3/S3Repository.java | 8 +++- 6 files changed, 79 insertions(+), 39 deletions(-) diff --git a/src/main/java/org/elasticsearch/cloud/aws/AwsS3Service.java b/src/main/java/org/elasticsearch/cloud/aws/AwsS3Service.java index af0147670a1..fb01a0b9705 100644 --- a/src/main/java/org/elasticsearch/cloud/aws/AwsS3Service.java +++ b/src/main/java/org/elasticsearch/cloud/aws/AwsS3Service.java @@ -29,4 +29,6 @@ public interface AwsS3Service extends LifecycleComponent { AmazonS3 client(); AmazonS3 client(String region, String account, String key); + + AmazonS3 client(String region, String account, String key, Integer maxRetries); } diff --git a/src/main/java/org/elasticsearch/cloud/aws/InternalAwsS3Service.java b/src/main/java/org/elasticsearch/cloud/aws/InternalAwsS3Service.java index 75efa1601f2..a5828e40e0a 100644 --- a/src/main/java/org/elasticsearch/cloud/aws/InternalAwsS3Service.java +++ b/src/main/java/org/elasticsearch/cloud/aws/InternalAwsS3Service.java @@ 
-60,11 +60,16 @@ public class InternalAwsS3Service extends AbstractLifecycleComponent clientDescriptor = new Tuple(endpoint, account); AmazonS3Client client = clients.get(clientDescriptor); if (client != null) { @@ -111,6 +116,11 @@ public class InternalAwsS3Service extends AbstractLifecycleComponent(); + int retry = 0; + while ((retry <= getNumberOfRetries()) && (multipartId == null)) { + try { + multipartId = doInitialize(getBlobStore(), getBucketName(), getBlobName(), isServerSideEncryption()); + if (multipartId != null) { + multipartChunks = 1; + multiparts = new ArrayList<>(); + } + } catch (AmazonClientException e) { + if (getBlobStore().shouldRetry(e) && retry < getNumberOfRetries()) { + retry++; + } else { + throw e; + } } } } @@ -145,14 +155,14 @@ public class DefaultS3OutputStream extends S3OutputStream { private void uploadMultipart(byte[] bytes, int off, int len, boolean lastPart) throws IOException { try (ByteArrayInputStream is = new ByteArrayInputStream(bytes, off, len)) { int retry = 0; - while (retry < getNumberOfRetries()) { + while (retry <= getNumberOfRetries()) { try { PartETag partETag = doUploadMultipart(getBlobStore(), getBucketName(), getBlobName(), multipartId, is, len, lastPart); multiparts.add(partETag); multipartChunks++; return; - } catch (AmazonS3Exception e) { - if (shouldRetry(e) && retry < getNumberOfRetries()) { + } catch (AmazonClientException e) { + if (getBlobStore().shouldRetry(e) && retry < getNumberOfRetries()) { is.reset(); retry++; } else { @@ -182,13 +192,13 @@ public class DefaultS3OutputStream extends S3OutputStream { private void completeMultipart() { int retry = 0; - while (retry < getNumberOfRetries()) { + while (retry <= getNumberOfRetries()) { try { doCompleteMultipart(getBlobStore(), getBucketName(), getBlobName(), multipartId, multiparts); multipartId = null; return; - } catch (AmazonS3Exception e) { - if (shouldRetry(e) && retry < getNumberOfRetries()) { + } catch (AmazonClientException e) { + if 
(getBlobStore().shouldRetry(e) && retry < getNumberOfRetries()) { retry++; } else { abortMultipart(); @@ -218,8 +228,4 @@ public class DefaultS3OutputStream extends S3OutputStream { throws AmazonS3Exception { blobStore.client().abortMultipartUpload(new AbortMultipartUploadRequest(bucketName, blobName, uploadId)); } - - protected boolean shouldRetry(AmazonS3Exception e) { - return e.getStatusCode() == 400 && "RequestTimeout".equals(e.getErrorCode()); - } } diff --git a/src/main/java/org/elasticsearch/cloud/aws/blobstore/S3BlobContainer.java b/src/main/java/org/elasticsearch/cloud/aws/blobstore/S3BlobContainer.java index d5e231b70ba..299633d2a17 100644 --- a/src/main/java/org/elasticsearch/cloud/aws/blobstore/S3BlobContainer.java +++ b/src/main/java/org/elasticsearch/cloud/aws/blobstore/S3BlobContainer.java @@ -79,20 +79,30 @@ public class S3BlobContainer extends AbstractBlobContainer { @Override public InputStream openInput(String blobName) throws IOException { - try { - S3Object s3Object = blobStore.client().getObject(blobStore.bucket(), buildKey(blobName)); - return s3Object.getObjectContent(); - } catch (AmazonS3Exception e) { - if (e.getStatusCode() == 404) { - throw new FileNotFoundException(e.getMessage()); + int retry = 0; + while (retry <= blobStore.numberOfRetries()) { + try { + S3Object s3Object = blobStore.client().getObject(blobStore.bucket(), buildKey(blobName)); + return s3Object.getObjectContent(); + } catch (AmazonClientException e) { + if (blobStore.shouldRetry(e) && (retry < blobStore.numberOfRetries())) { + retry++; + } else { + if (e instanceof AmazonS3Exception) { + if (404 == ((AmazonS3Exception) e).getStatusCode()) { + throw new FileNotFoundException(e.getMessage()); + } + } + throw e; + } } - throw e; } + throw new BlobStoreException("retries exhausted while attempting to access blob object [name:" + blobName + ", bucket:" + blobStore.bucket() +"]"); } @Override public OutputStream createOutput(final String blobName) throws IOException { - // 
UploadS3OutputStream does buffering internally + // UploadS3OutputStream does buffering & retry logic internally return new DefaultS3OutputStream(blobStore, blobStore.bucket(), buildKey(blobName), blobStore.bufferSizeInBytes(), blobStore.numberOfRetries(), blobStore.serverSideEncryption()); } diff --git a/src/main/java/org/elasticsearch/cloud/aws/blobstore/S3BlobStore.java b/src/main/java/org/elasticsearch/cloud/aws/blobstore/S3BlobStore.java index 10ce6c7373f..fcb9e67fbce 100644 --- a/src/main/java/org/elasticsearch/cloud/aws/blobstore/S3BlobStore.java +++ b/src/main/java/org/elasticsearch/cloud/aws/blobstore/S3BlobStore.java @@ -19,7 +19,9 @@ package org.elasticsearch.cloud.aws.blobstore; +import com.amazonaws.AmazonClientException; import com.amazonaws.services.s3.AmazonS3; +import com.amazonaws.services.s3.model.AmazonS3Exception; import com.amazonaws.services.s3.model.DeleteObjectsRequest; import com.amazonaws.services.s3.model.DeleteObjectsRequest.KeyVersion; import com.amazonaws.services.s3.model.ObjectListing; @@ -55,12 +57,8 @@ public class S3BlobStore extends AbstractComponent implements BlobStore { private final int numberOfRetries; - - public S3BlobStore(Settings settings, AmazonS3 client, String bucket, String region, boolean serverSideEncryption) { - this(settings, client, bucket, region, serverSideEncryption, null); - } - - public S3BlobStore(Settings settings, AmazonS3 client, String bucket, @Nullable String region, boolean serverSideEncryption, ByteSizeValue bufferSize) { + public S3BlobStore(Settings settings, AmazonS3 client, String bucket, @Nullable String region, boolean serverSideEncryption, + ByteSizeValue bufferSize, int maxRetries) { super(settings); this.client = client; this.bucket = bucket; @@ -72,7 +70,7 @@ public class S3BlobStore extends AbstractComponent implements BlobStore { throw new BlobStoreException("\"Detected a buffer_size for the S3 storage lower than [" + MIN_BUFFER_SIZE + "]"); } - this.numberOfRetries = 
settings.getAsInt("max_retries", 3); + this.numberOfRetries = maxRetries; if (!client.doesBucketExist(bucket)) { if (region != null) { client.createBucket(bucket, region); @@ -152,6 +150,16 @@ public class S3BlobStore extends AbstractComponent implements BlobStore { } } + protected boolean shouldRetry(AmazonClientException e) { + if (e instanceof AmazonS3Exception) { + AmazonS3Exception s3e = (AmazonS3Exception)e; + if (s3e.getStatusCode() == 400 && "RequestTimeout".equals(s3e.getErrorCode())) { + return true; + } + } + return e.isRetryable(); + } + @Override public void close() { } diff --git a/src/main/java/org/elasticsearch/repositories/s3/S3Repository.java b/src/main/java/org/elasticsearch/repositories/s3/S3Repository.java index 4948364e758..f4bcd5bcec5 100644 --- a/src/main/java/org/elasticsearch/repositories/s3/S3Repository.java +++ b/src/main/java/org/elasticsearch/repositories/s3/S3Repository.java @@ -120,10 +120,14 @@ public class S3Repository extends BlobStoreRepository { boolean serverSideEncryption = repositorySettings.settings().getAsBoolean("server_side_encryption", componentSettings.getAsBoolean("server_side_encryption", false)); ByteSizeValue bufferSize = repositorySettings.settings().getAsBytesSize("buffer_size", componentSettings.getAsBytesSize("buffer_size", null)); - logger.debug("using bucket [{}], region [{}], chunk_size [{}], server_side_encryption [{}], buffer_size [{}]", bucket, region, chunkSize, serverSideEncryption, bufferSize); - blobStore = new S3BlobStore(settings, s3Service.client(region, repositorySettings.settings().get("access_key"), repositorySettings.settings().get("secret_key")), bucket, region, serverSideEncryption, bufferSize); + Integer maxRetries = repositorySettings.settings().getAsInt("max_retries", componentSettings.getAsInt("max_retries", 3)); this.chunkSize = repositorySettings.settings().getAsBytesSize("chunk_size", componentSettings.getAsBytesSize("chunk_size", new ByteSizeValue(100, ByteSizeUnit.MB))); this.compress 
= repositorySettings.settings().getAsBoolean("compress", componentSettings.getAsBoolean("compress", false)); + + logger.debug("using bucket [{}], region [{}], chunk_size [{}], server_side_encryption [{}], buffer_size [{}], max_retries [{}]", + bucket, region, chunkSize, serverSideEncryption, bufferSize, maxRetries); + + blobStore = new S3BlobStore(settings, s3Service.client(region, repositorySettings.settings().get("access_key"), repositorySettings.settings().get("secret_key"), maxRetries), bucket, region, serverSideEncryption, bufferSize, maxRetries); String basePath = repositorySettings.settings().get("base_path", null); if (Strings.hasLength(basePath)) { BlobPath path = new BlobPath();