Removes the retry mechanism from the S3 blob store (#23952)

Currently, the Amazon S3 client provides a retry mechanism, and the
S3 blob store additionally attempts its own retries for failed
read/write requests.
Both retry mechanisms are controlled by the
`repositories.s3.max_retries` setting.  However, the S3 blob store retry
mechanism is unnecessary because the Amazon S3 client provided by the
Amazon SDK already handles retries (with exponential backoff) based on
the provided max retry configuration setting (defaults to 3) as long as
the request is retryable.  Hence, this commit removes the unneeded retry
logic in the S3 blob store and the S3OutputStream.
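
For reference, the SDK-side configuration that makes the hand-rolled
retry loops redundant looks roughly like this (a minimal, hypothetical
sketch against the AWS SDK for Java 1.x; the class name, values, and
credentials are placeholders, not part of this change):

    import com.amazonaws.ClientConfiguration;
    import com.amazonaws.auth.BasicAWSCredentials;
    import com.amazonaws.services.s3.AmazonS3;
    import com.amazonaws.services.s3.AmazonS3Client;

    public class SdkRetrySketch {
        public static void main(String[] args) {
            // The SDK client retries retryable failures itself, with
            // exponential backoff, so callers need no retry loops.
            ClientConfiguration config = new ClientConfiguration();
            config.setMaxErrorRetry(3);          // repositories.s3.max_retries (SDK default: 3)
            config.setUseThrottleRetries(true);  // extra backoff on throttled responses
            AmazonS3 client = new AmazonS3Client(
                new BasicAWSCredentials("placeholder-key", "placeholder-secret"), config);
            System.out.println("SDK max_retries: " + config.getMaxErrorRetry());
        }
    }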

Closes #22845
Ali Beyad 2017-04-06 19:58:53 -04:00 committed by GitHub
parent 53e3ddf2f0
commit 4f121744bd
13 changed files with 85 additions and 125 deletions

View File

@@ -25,6 +25,7 @@ import java.util.function.Function;
 import com.amazonaws.ClientConfiguration;
 import com.amazonaws.Protocol;
 import com.amazonaws.services.s3.AmazonS3;
+import org.elasticsearch.cluster.metadata.RepositoryMetaData;
 import org.elasticsearch.common.component.LifecycleComponent;
 import org.elasticsearch.common.settings.SecureString;
 import org.elasticsearch.common.settings.Setting;
@@ -159,5 +160,8 @@ interface AwsS3Service extends LifecycleComponent {
         Setting.timeSetting("cloud.aws.s3.read_timeout", AwsS3Service.READ_TIMEOUT, Property.NodeScope, Property.Deprecated);
     }

-    AmazonS3 client(Settings repositorySettings, Integer maxRetries, boolean useThrottleRetries, Boolean pathStyleAccess);
+    /**
+     * Creates an {@code AmazonS3} client from the given repository metadata and node settings.
+     */
+    AmazonS3 client(RepositoryMetaData metadata, Settings repositorySettings);
 }

View File

@@ -72,8 +72,8 @@ class DefaultS3OutputStream extends S3OutputStream {
     private int multipartChunks;
     private List<PartETag> multiparts;

-    DefaultS3OutputStream(S3BlobStore blobStore, String bucketName, String blobName, int bufferSizeInBytes, int numberOfRetries, boolean serverSideEncryption) {
-        super(blobStore, bucketName, blobName, bufferSizeInBytes, numberOfRetries, serverSideEncryption);
+    DefaultS3OutputStream(S3BlobStore blobStore, String bucketName, String blobName, int bufferSizeInBytes, boolean serverSideEncryption) {
+        super(blobStore, bucketName, blobName, bufferSizeInBytes, serverSideEncryption);
     }

     @Override
@@ -106,19 +106,10 @@ class DefaultS3OutputStream extends S3OutputStream {
      */
     private void upload(byte[] bytes, int off, int len) throws IOException {
         try (ByteArrayInputStream is = new ByteArrayInputStream(bytes, off, len)) {
-            int retry = 0;
-            while (retry <= getNumberOfRetries()) {
-                try {
-                    doUpload(getBlobStore(), getBucketName(), getBlobName(), is, len, isServerSideEncryption());
-                    break;
-                } catch (AmazonClientException e) {
-                    if (getBlobStore().shouldRetry(e) && retry < getNumberOfRetries()) {
-                        is.reset();
-                        retry++;
-                    } else {
-                        throw new IOException("Unable to upload object " + getBlobName(), e);
-                    }
-                }
+            try {
+                doUpload(getBlobStore(), getBucketName(), getBlobName(), is, len, isServerSideEncryption());
+            } catch (AmazonClientException e) {
+                throw new IOException("Unable to upload object " + getBlobName(), e);
             }
         }
     }
@@ -131,10 +122,9 @@ class DefaultS3OutputStream extends S3OutputStream {
         }
         md.setContentLength(length);

-        InputStream inputStream = is;
-
         // We try to compute a MD5 while reading it
         MessageDigest messageDigest;
+        InputStream inputStream;
         try {
             messageDigest = MessageDigest.getInstance("MD5");
             inputStream = new DigestInputStream(is, messageDigest);
@@ -159,20 +149,11 @@
     }

     private void initializeMultipart() {
-        int retry = 0;
-        while ((retry <= getNumberOfRetries()) && (multipartId == null)) {
-            try {
-                multipartId = doInitialize(getBlobStore(), getBucketName(), getBlobName(), isServerSideEncryption());
-                if (multipartId != null) {
-                    multipartChunks = 1;
-                    multiparts = new ArrayList<>();
-                }
-            } catch (AmazonClientException e) {
-                if (getBlobStore().shouldRetry(e) && retry < getNumberOfRetries()) {
-                    retry++;
-                } else {
-                    throw e;
-                }
-            }
+        while (multipartId == null) {
+            multipartId = doInitialize(getBlobStore(), getBucketName(), getBlobName(), isServerSideEncryption());
+            if (multipartId != null) {
+                multipartChunks = 1;
+                multiparts = new ArrayList<>();
+            }
         }
     }
@@ -193,22 +174,13 @@
     private void uploadMultipart(byte[] bytes, int off, int len, boolean lastPart) throws IOException {
         try (ByteArrayInputStream is = new ByteArrayInputStream(bytes, off, len)) {
-            int retry = 0;
-            while (retry <= getNumberOfRetries()) {
-                try {
-                    PartETag partETag = doUploadMultipart(getBlobStore(), getBucketName(), getBlobName(), multipartId, is, len, lastPart);
-                    multiparts.add(partETag);
-                    multipartChunks++;
-                    return;
-                } catch (AmazonClientException e) {
-                    if (getBlobStore().shouldRetry(e) && retry < getNumberOfRetries()) {
-                        is.reset();
-                        retry++;
-                    } else {
-                        abortMultipart();
-                        throw e;
-                    }
-                }
+            try {
+                PartETag partETag = doUploadMultipart(getBlobStore(), getBucketName(), getBlobName(), multipartId, is, len, lastPart);
+                multiparts.add(partETag);
+                multipartChunks++;
+            } catch (AmazonClientException e) {
+                abortMultipart();
+                throw e;
             }
         }
     }
@@ -230,20 +202,13 @@
     }

     private void completeMultipart() {
-        int retry = 0;
-        while (retry <= getNumberOfRetries()) {
-            try {
-                doCompleteMultipart(getBlobStore(), getBucketName(), getBlobName(), multipartId, multiparts);
-                multipartId = null;
-                return;
-            } catch (AmazonClientException e) {
-                if (getBlobStore().shouldRetry(e) && retry < getNumberOfRetries()) {
-                    retry++;
-                } else {
-                    abortMultipart();
-                    throw e;
-                }
-            }
+        try {
+            doCompleteMultipart(getBlobStore(), getBucketName(), getBlobName(), multipartId, multiparts);
+            multipartId = null;
+            return;
+        } catch (AmazonClientException e) {
+            abortMultipart();
+            throw e;
         }
     }

View File

@@ -36,6 +36,7 @@ import com.amazonaws.services.s3.AmazonS3Client;
 import com.amazonaws.services.s3.S3ClientOptions;
 import org.apache.logging.log4j.Logger;
 import org.elasticsearch.ElasticsearchException;
+import org.elasticsearch.cluster.metadata.RepositoryMetaData;
 import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.collect.Tuple;
 import org.elasticsearch.common.component.AbstractLifecycleComponent;
@@ -45,6 +46,8 @@ import org.elasticsearch.common.settings.Setting;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.unit.TimeValue;

+import static org.elasticsearch.repositories.s3.S3Repository.getValue;
+
 class InternalAwsS3Service extends AbstractLifecycleComponent implements AwsS3Service {

     // pkg private for tests
@@ -60,8 +63,7 @@ class InternalAwsS3Service extends AbstractLifecycleComponent implements AwsS3Service {
     }

     @Override
-    public synchronized AmazonS3 client(Settings repositorySettings, Integer maxRetries,
-                                        boolean useThrottleRetries, Boolean pathStyleAccess) {
+    public synchronized AmazonS3 client(RepositoryMetaData metadata, Settings repositorySettings) {
         String clientName = CLIENT_NAME.get(repositorySettings);
         String foundEndpoint = findEndpoint(logger, repositorySettings, settings, clientName);
@@ -73,6 +75,26 @@ class InternalAwsS3Service extends AbstractLifecycleComponent implements AwsS3Service {
             return client;
         }

+        Integer maxRetries = getValue(metadata.settings(), settings,
+            S3Repository.Repository.MAX_RETRIES_SETTING,
+            S3Repository.Repositories.MAX_RETRIES_SETTING);
+        boolean useThrottleRetries = getValue(metadata.settings(), settings,
+            S3Repository.Repository.USE_THROTTLE_RETRIES_SETTING,
+            S3Repository.Repositories.USE_THROTTLE_RETRIES_SETTING);
+
+        // If the user defined a path style access setting, we rely on it,
+        // otherwise we use the default value set by the SDK
+        Boolean pathStyleAccess = null;
+        if (S3Repository.Repository.PATH_STYLE_ACCESS_SETTING.exists(metadata.settings()) ||
+            S3Repository.Repositories.PATH_STYLE_ACCESS_SETTING.exists(settings)) {
+            pathStyleAccess = getValue(metadata.settings(), settings,
+                S3Repository.Repository.PATH_STYLE_ACCESS_SETTING,
+                S3Repository.Repositories.PATH_STYLE_ACCESS_SETTING);
+        }
+
+        logger.debug("creating S3 client with client_name [{}], endpoint [{}], max_retries [{}], " +
+            "use_throttle_retries [{}], path_style_access [{}]",
+            clientName, foundEndpoint, maxRetries, useThrottleRetries, pathStyleAccess);
+
         client = new AmazonS3Client(
             credentials,
             buildConfiguration(logger, repositorySettings, settings, clientName, maxRetries, foundEndpoint, useThrottleRetries));
@@ -187,7 +209,7 @@ class InternalAwsS3Service extends AbstractLifecycleComponent implements AwsS3Service {
             // no repository setting, just use global setting
             return globalSetting.get(globalSettings);
         } else {
-            return S3Repository.getValue(repositorySettings, globalSettings, repositorySetting, globalSetting);
+            return getValue(repositorySettings, globalSettings, repositorySetting, globalSetting);
         }
     }

View File

@@ -73,25 +73,17 @@ class S3BlobContainer extends AbstractBlobContainer {
     @Override
     public InputStream readBlob(String blobName) throws IOException {
-        int retry = 0;
-        while (retry <= blobStore.numberOfRetries()) {
-            try {
-                S3Object s3Object = SocketAccess.doPrivileged(() -> blobStore.client().getObject(blobStore.bucket(), buildKey(blobName)));
-                return s3Object.getObjectContent();
-            } catch (AmazonClientException e) {
-                if (blobStore.shouldRetry(e) && (retry < blobStore.numberOfRetries())) {
-                    retry++;
-                } else {
-                    if (e instanceof AmazonS3Exception) {
-                        if (404 == ((AmazonS3Exception) e).getStatusCode()) {
-                            throw new NoSuchFileException("Blob object [" + blobName + "] not found: " + e.getMessage());
-                        }
-                    }
-                    throw e;
-                }
-            }
-        }
-        throw new BlobStoreException("retries exhausted while attempting to access blob object [name:" + blobName + ", bucket:" + blobStore.bucket() + "]");
+        try {
+            S3Object s3Object = SocketAccess.doPrivileged(() -> blobStore.client().getObject(blobStore.bucket(), buildKey(blobName)));
+            return s3Object.getObjectContent();
+        } catch (AmazonClientException e) {
+            if (e instanceof AmazonS3Exception) {
+                if (404 == ((AmazonS3Exception) e).getStatusCode()) {
+                    throw new NoSuchFileException("Blob object [" + blobName + "] not found: " + e.getMessage());
+                }
+            }
+            throw e;
+        }
     }

     @Override
@@ -120,7 +112,7 @@ class S3BlobContainer extends AbstractBlobContainer {
     private OutputStream createOutput(final String blobName) throws IOException {
         // UploadS3OutputStream does buffering & retry logic internally
         return new DefaultS3OutputStream(blobStore, blobStore.bucket(), buildKey(blobName),
-            blobStore.bufferSizeInBytes(), blobStore.numberOfRetries(), blobStore.serverSideEncryption());
+            blobStore.bufferSizeInBytes(), blobStore.serverSideEncryption());
     }

     @Override

View File

@@ -51,21 +51,18 @@ class S3BlobStore extends AbstractComponent implements BlobStore {

     private final boolean serverSideEncryption;

-    private final int numberOfRetries;
-
     private final CannedAccessControlList cannedACL;

     private final StorageClass storageClass;

     S3BlobStore(Settings settings, AmazonS3 client, String bucket, boolean serverSideEncryption,
-                ByteSizeValue bufferSize, int maxRetries, String cannedACL, String storageClass) {
+                ByteSizeValue bufferSize, String cannedACL, String storageClass) {
         super(settings);
         this.client = client;
         this.bucket = bucket;
         this.serverSideEncryption = serverSideEncryption;
         this.bufferSize = bufferSize;
         this.cannedACL = initCannedACL(cannedACL);
-        this.numberOfRetries = maxRetries;
         this.storageClass = initStorageClass(storageClass);

         // Note: the method client.doesBucketExist() may return 'true' is the bucket exists
@@ -102,10 +99,6 @@ class S3BlobStore extends AbstractComponent implements BlobStore {
         return bufferSize.bytesAsInt();
     }

-    public int numberOfRetries() {
-        return numberOfRetries;
-    }
-
     @Override
     public BlobContainer blobContainer(BlobPath path) {
         return new S3BlobContainer(path, this);

View File

@@ -39,7 +39,6 @@ abstract class S3OutputStream extends OutputStream {

     private S3BlobStore blobStore;
     private String bucketName;
     private String blobName;
-    private int numberOfRetries;
     private boolean serverSideEncryption;

     private byte[] buffer;
@@ -48,11 +47,10 @@ abstract class S3OutputStream extends OutputStream {
     private int flushCount = 0;

-    S3OutputStream(S3BlobStore blobStore, String bucketName, String blobName, int bufferSizeInBytes, int numberOfRetries, boolean serverSideEncryption) {
+    S3OutputStream(S3BlobStore blobStore, String bucketName, String blobName, int bufferSizeInBytes, boolean serverSideEncryption) {
         this.blobStore = blobStore;
         this.bucketName = bucketName;
         this.blobName = blobName;
-        this.numberOfRetries = numberOfRetries;
         this.serverSideEncryption = serverSideEncryption;

         if (bufferSizeInBytes < MULTIPART_MIN_SIZE.getBytes()) {
@@ -107,10 +105,6 @@ abstract class S3OutputStream extends OutputStream {
         return buffer.length;
     }

-    public int getNumberOfRetries() {
-        return numberOfRetries;
-    }
-
     public boolean isServerSideEncryption() {
         return serverSideEncryption;
     }

View File

@@ -311,8 +311,6 @@ class S3Repository extends BlobStoreRepository {
         boolean serverSideEncryption = getValue(metadata.settings(), settings, Repository.SERVER_SIDE_ENCRYPTION_SETTING, Repositories.SERVER_SIDE_ENCRYPTION_SETTING);
         ByteSizeValue bufferSize = getValue(metadata.settings(), settings, Repository.BUFFER_SIZE_SETTING, Repositories.BUFFER_SIZE_SETTING);
-        Integer maxRetries = getValue(metadata.settings(), settings, Repository.MAX_RETRIES_SETTING, Repositories.MAX_RETRIES_SETTING);
-        boolean useThrottleRetries = getValue(metadata.settings(), settings, Repository.USE_THROTTLE_RETRIES_SETTING, Repositories.USE_THROTTLE_RETRIES_SETTING);
         this.chunkSize = getValue(metadata.settings(), settings, Repository.CHUNK_SIZE_SETTING, Repositories.CHUNK_SIZE_SETTING);
         this.compress = getValue(metadata.settings(), settings, Repository.COMPRESS_SETTING, Repositories.COMPRESS_SETTING);
@@ -326,21 +324,12 @@ class S3Repository extends BlobStoreRepository {
         String storageClass = getValue(metadata.settings(), settings, Repository.STORAGE_CLASS_SETTING, Repositories.STORAGE_CLASS_SETTING);
         String cannedACL = getValue(metadata.settings(), settings, Repository.CANNED_ACL_SETTING, Repositories.CANNED_ACL_SETTING);

-        // If the user defined a path style access setting, we rely on it otherwise we use the default
-        // value set by the SDK
-        Boolean pathStyleAccess = null;
-        if (Repository.PATH_STYLE_ACCESS_SETTING.exists(metadata.settings()) ||
-            Repositories.PATH_STYLE_ACCESS_SETTING.exists(settings)) {
-            pathStyleAccess = getValue(metadata.settings(), settings, Repository.PATH_STYLE_ACCESS_SETTING, Repositories.PATH_STYLE_ACCESS_SETTING);
-        }
-
         logger.debug("using bucket [{}], chunk_size [{}], server_side_encryption [{}], " +
-            "buffer_size [{}], max_retries [{}], use_throttle_retries [{}], cannedACL [{}], storageClass [{}], path_style_access [{}]",
-            bucket, chunkSize, serverSideEncryption, bufferSize, maxRetries, useThrottleRetries, cannedACL,
-            storageClass, pathStyleAccess);
+            "buffer_size [{}], cannedACL [{}], storageClass [{}]",
+            bucket, chunkSize, serverSideEncryption, bufferSize, cannedACL, storageClass);

-        AmazonS3 client = s3Service.client(metadata.settings(), maxRetries, useThrottleRetries, pathStyleAccess);
-        blobStore = new S3BlobStore(settings, client, bucket, serverSideEncryption, bufferSize, maxRetries, cannedACL, storageClass);
+        AmazonS3 client = s3Service.client(metadata, metadata.settings());
+        blobStore = new S3BlobStore(settings, client, bucket, serverSideEncryption, bufferSize, cannedACL, storageClass);

         String basePath = getValue(metadata.settings(), settings, Repository.BASE_PATH_SETTING, Repositories.BASE_PATH_SETTING);
         if (Strings.hasLength(basePath)) {

View File

@@ -31,6 +31,7 @@ import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotR
 import org.elasticsearch.client.Client;
 import org.elasticsearch.client.ClusterAdminClient;
 import org.elasticsearch.cluster.ClusterState;
+import org.elasticsearch.cluster.metadata.RepositoryMetaData;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.repositories.RepositoryMissingException;
 import org.elasticsearch.repositories.RepositoryVerificationException;
@@ -165,6 +166,7 @@ public abstract class AbstractS3SnapshotRestoreTest extends AbstractAwsTestCase
                 .put(S3Repository.Repository.BASE_PATH_SETTING.getKey(), basePath)
                 .put(S3Repository.Repository.CHUNK_SIZE_SETTING.getKey(), randomIntBetween(1000, 10000))
                 .put(S3Repository.Repository.SERVER_SIDE_ENCRYPTION_SETTING.getKey(), true)
+                .put(S3Repository.Repository.USE_THROTTLE_RETRIES_SETTING.getKey(), randomBoolean())
                 .build();

         PutRepositoryResponse putRepositoryResponse = client.admin().cluster().preparePutRepository("test-repo")
@@ -194,7 +196,8 @@ public abstract class AbstractS3SnapshotRestoreTest extends AbstractAwsTestCase
         Settings settings = internalCluster().getInstance(Settings.class);
         Settings bucket = settings.getByPrefix("repositories.s3.");
-        AmazonS3 s3Client = internalCluster().getInstance(AwsS3Service.class).client(repositorySettings, null, randomBoolean(), null);
+        RepositoryMetaData metadata = new RepositoryMetaData("test-repo", "fs", Settings.EMPTY);
+        AmazonS3 s3Client = internalCluster().getInstance(AwsS3Service.class).client(metadata, repositorySettings);

         String bucketName = bucket.get("bucket");
         logger.info("--> verify encryption for bucket [{}], prefix [{}]", bucketName, basePath);
@@ -462,7 +465,9 @@ public abstract class AbstractS3SnapshotRestoreTest extends AbstractAwsTestCase
         // We check that settings has been set in elasticsearch.yml integration test file
         // as described in README
         assertThat("Your settings in elasticsearch.yml are incorrects. Check README file.", bucketName, notNullValue());
-        AmazonS3 client = internalCluster().getInstance(AwsS3Service.class).client(Settings.EMPTY, null, randomBoolean(), null);
+        RepositoryMetaData metadata = new RepositoryMetaData("test-repo", "fs", Settings.EMPTY);
+        AmazonS3 client = internalCluster().getInstance(AwsS3Service.class).client(metadata,
+            Settings.builder().put(S3Repository.Repository.USE_THROTTLE_RETRIES_SETTING.getKey(), randomBoolean()).build());
         try {
             ObjectListing prevListing = null;
             //From http://docs.amazonwebservices.com/AmazonS3/latest/dev/DeletingMultipleObjectsUsingJava.html

View File

@@ -26,9 +26,6 @@ import com.amazonaws.auth.AWSCredentialsProvider;
 import org.elasticsearch.common.settings.MockSecureSettings;
 import org.elasticsearch.common.settings.Setting;
 import org.elasticsearch.common.settings.Settings;
-import org.elasticsearch.repositories.s3.AwsS3Service;
-import org.elasticsearch.repositories.s3.InternalAwsS3Service;
-import org.elasticsearch.repositories.s3.S3Repository;
 import org.elasticsearch.test.ESTestCase;

 import static org.hamcrest.Matchers.instanceOf;

View File

@@ -42,7 +42,7 @@ public class MockDefaultS3OutputStream extends DefaultS3OutputStream {
     private int numberOfUploadRequests = 0;

     public MockDefaultS3OutputStream(int bufferSizeInBytes) {
-        super(null, "test-bucket", "test-blobname", bufferSizeInBytes, 3, false);
+        super(null, "test-bucket", "test-blobname", bufferSizeInBytes, false);
     }

     @Override

View File

@@ -34,6 +34,6 @@ public class S3BlobStoreContainerTests extends ESBlobStoreContainerTestCase {
         String bucket = randomAlphaOfLength(randomIntBetween(1, 10)).toLowerCase(Locale.ROOT);
         return new S3BlobStore(Settings.EMPTY, client, bucket, false,
-            new ByteSizeValue(10, ByteSizeUnit.MB), 5, "public-read-write", "standard");
+            new ByteSizeValue(10, ByteSizeUnit.MB), "public-read-write", "standard");
     }
 }

View File

@@ -60,8 +60,7 @@ public class S3RepositoryTests extends ESTestCase {
         @Override
         protected void doClose() {}

         @Override
-        public AmazonS3 client(Settings repositorySettings, Integer maxRetries,
-                               boolean useThrottleRetries, Boolean pathStyleAccess) {
+        public AmazonS3 client(RepositoryMetaData metadata, Settings settings) {
             return new DummyS3Client();
         }
     }

View File

@@ -23,6 +23,7 @@ import java.util.IdentityHashMap;

 import com.amazonaws.services.s3.AmazonS3;
 import org.elasticsearch.ElasticsearchException;
+import org.elasticsearch.cluster.metadata.RepositoryMetaData;
 import org.elasticsearch.common.settings.Settings;

 public class TestAwsS3Service extends InternalAwsS3Service {
@@ -33,7 +34,7 @@ public class TestAwsS3Service extends InternalAwsS3Service {
         }
     }

-    IdentityHashMap<AmazonS3, TestAmazonS3> clients = new IdentityHashMap<AmazonS3, TestAmazonS3>();
+    IdentityHashMap<AmazonS3, TestAmazonS3> clients = new IdentityHashMap<>();

     public TestAwsS3Service(Settings settings) {
         super(settings);
@@ -41,9 +42,8 @@ public class TestAwsS3Service extends InternalAwsS3Service {

     @Override
-    public synchronized AmazonS3 client(Settings repositorySettings, Integer maxRetries,
-                                        boolean useThrottleRetries, Boolean pathStyleAccess) {
-        return cachedWrapper(super.client(repositorySettings, maxRetries, useThrottleRetries, pathStyleAccess));
+    public synchronized AmazonS3 client(RepositoryMetaData metadata, Settings repositorySettings) {
+        return cachedWrapper(super.client(metadata, repositorySettings));
     }

     private AmazonS3 cachedWrapper(AmazonS3 client) {