mirror of
synced 2025-02-25 22:36:20 +00:00
BlobStore: BlobContainer interface changed in elasticsearch 1.4.0
Adding a S3OutputStream that upload blobs to the S3 Storage service with two modes (single/multipart). When the length of the chunk is lower than buffer_size (default to 5mb), the chunk is uploaded with a single request. Otherwise multiple requests are made, each of buffer_size (except the last one which can be lower than buffer_size). For example, when uploading a blob (say, 1Gb) with chunk_size set for accepting large chunks (chunk_size = 5Gb) and buffer_size set to 100Mb, the blob will be sent into 10 multiple parts, each of ~100Mb. Each part upload may failed independently and will be retried 3 times. Closes #117
This commit is contained in:
@ -158,6 +158,7 @@ The following settings are supported:
* `chunk_size`: Big files can be broken down into chunks during snapshotting if needed. The chunk size can be specified in bytes or by using size value notation, i.e. `1g`, `10m`, `5k`. Defaults to `100m`.
* `compress`: When set to `true` metadata files are stored in compressed format. This setting doesn't affect index files that are already compressed by default. Defaults to `false`.
* `server_side_encryption`: When set to `true` files are encrypted on server side using AES256 algorithm. Defaults to `false`.
* `buffer_size`: Minimum threshold below which the chunk is uploaded using a single request. Beyond this threshold, the S3 repository will use the [AWS Multipart Upload API](http://docs.aws.amazon.com/AmazonS3/latest/dev/uploadobjusingmpu.html) to split the chunk into several parts, each of `buffer_size` length, and to upload each part in its own request. Note that positionning a buffer size lower than `5mb` is not allowed since it will prevents the use of the Multipart API and may result in upload errors. Defaults to `5mb`.
* `max_retries`: Number of retries in case of S3 errors. Defaults to `3`.
The S3 repositories are using the same credentials as the rest of the AWS services provided by this plugin (`discovery`).
@ -34,6 +34,7 @@
@ -85,7 +86,7 @@
<!-- jackson is optional -->
@ -22,6 +22,7 @@ package org.elasticsearch.cloud.aws;
import com.amazonaws.ClientConfiguration;
import com.amazonaws.Protocol;
import com.amazonaws.auth.*;
import com.amazonaws.http.IdleConnectionReaper;
import com.amazonaws.internal.StaticCredentialsProvider;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3Client;
@ -192,5 +193,8 @@ public class InternalAwsS3Service extends AbstractLifecycleComponent<AwsS3Servic
for (AmazonS3Client client : clients.values()) {
// Ensure that IdleConnectionReaper is shutdown
@ -0,0 +1,225 @@
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
package org.elasticsearch.cloud.aws.blobstore;
import com.amazonaws.services.s3.model.*;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.List;
* DefaultS3OutputStream uploads data to the AWS S3 service using 2 modes: single and multi part.
* <p/>
* When the length of the chunk is lower than buffer_size, the chunk is uploaded with a single request.
* Otherwise multiple requests are made, each of buffer_size (except the last one which can be lower than buffer_size).
* <p/>
* Quick facts about S3:
* <p/>
* Maximum object size: 5 TB
* Maximum number of parts per upload: 10,000
* Part numbers: 1 to 10,000 (inclusive)
* Part size: 5 MB to 5 GB, last part can be < 5 MB
* <p/>
* See http://docs.aws.amazon.com/AmazonS3/latest/dev/qfacts.html
* See http://docs.aws.amazon.com/AmazonS3/latest/dev/uploadobjusingmpu.html
public class DefaultS3OutputStream extends S3OutputStream {
private static final ByteSizeValue MULTIPART_MAX_SIZE = new ByteSizeValue(5, ByteSizeUnit.GB);
* Multipart Upload API data
private String multipartId;
private int multipartChunks;
private List<PartETag> multiparts;
public DefaultS3OutputStream(S3BlobStore blobStore, String bucketName, String blobName, int bufferSizeInBytes, int numberOfRetries, boolean serverSideEncryption) {
super(blobStore, bucketName, blobName, bufferSizeInBytes, numberOfRetries, serverSideEncryption);
public void flush(byte[] bytes, int off, int len, boolean closing) throws IOException {
if (len > MULTIPART_MAX_SIZE.getBytes()) {
throw new IOException("Unable to upload files larger than " + MULTIPART_MAX_SIZE + " to Amazon S3");
if (!closing) {
if (len < getBufferSize()) {
upload(bytes, off, len);
} else {
if (getFlushCount() == 0) {
uploadMultipart(bytes, off, len, false);
} else {
if (multipartId != null) {
uploadMultipart(bytes, off, len, true);
} else {
upload(bytes, off, len);
* Upload data using a single request.
* @param bytes
* @param off
* @param len
* @throws IOException
private void upload(byte[] bytes, int off, int len) throws IOException {
try (ByteArrayInputStream is = new ByteArrayInputStream(bytes, off, len)) {
int retry = 0;
while (retry < getNumberOfRetries()) {
try {
doUpload(getBlobStore(), getBucketName(), getBlobName(), is, len, isServerSideEncryption());
} catch (AmazonS3Exception e) {
if (shouldRetry(e)) {
} else {
throw new IOException("Unable to upload object " + getBlobName(), e);
protected void doUpload(S3BlobStore blobStore, String bucketName, String blobName, InputStream is, int length,
boolean serverSideEncryption) throws AmazonS3Exception {
ObjectMetadata md = new ObjectMetadata();
if (serverSideEncryption) {
blobStore.client().putObject(bucketName, blobName, is, md);
private void initializeMultipart() {
if (multipartId == null) {
multipartId = doInitialize(getBlobStore(), getBucketName(), getBlobName(), isServerSideEncryption());
if (multipartId != null) {
multipartChunks = 1;
multiparts = new ArrayList<>();
protected String doInitialize(S3BlobStore blobStore, String bucketName, String blobName, boolean serverSideEncryption) {
InitiateMultipartUploadRequest request = new InitiateMultipartUploadRequest(bucketName, blobName);
if (serverSideEncryption) {
ObjectMetadata md = new ObjectMetadata();
return blobStore.client().initiateMultipartUpload(request).getUploadId();
private void uploadMultipart(byte[] bytes, int off, int len, boolean lastPart) throws IOException {
try (ByteArrayInputStream is = new ByteArrayInputStream(bytes, off, len)) {
int retry = 0;
while (retry < getNumberOfRetries()) {
try {
PartETag partETag = doUploadMultipart(getBlobStore(), getBucketName(), getBlobName(), multipartId, is, len, lastPart);
} catch (AmazonS3Exception e) {
if (shouldRetry(e) && retry < getNumberOfRetries()) {
} else {
throw e;
protected PartETag doUploadMultipart(S3BlobStore blobStore, String bucketName, String blobName, String uploadId, InputStream is,
int length, boolean lastPart) throws AmazonS3Exception {
UploadPartRequest request = new UploadPartRequest()
UploadPartResult response = blobStore.client().uploadPart(request);
return response.getPartETag();
private void completeMultipart() {
int retry = 0;
while (retry < getNumberOfRetries()) {
try {
doCompleteMultipart(getBlobStore(), getBucketName(), getBlobName(), multipartId, multiparts);
multipartId = null;
} catch (AmazonS3Exception e) {
if (shouldRetry(e) && retry < getNumberOfRetries()) {
} else {
throw e;
protected void doCompleteMultipart(S3BlobStore blobStore, String bucketName, String blobName, String uploadId, List<PartETag> parts)
throws AmazonS3Exception {
CompleteMultipartUploadRequest request = new CompleteMultipartUploadRequest(bucketName, blobName, uploadId, parts);
private void abortMultipart() {
if (multipartId != null) {
try {
doAbortMultipart(getBlobStore(), getBucketName(), getBlobName(), multipartId);
} finally {
multipartId = null;
protected void doAbortMultipart(S3BlobStore blobStore, String bucketName, String blobName, String uploadId)
throws AmazonS3Exception {
blobStore.client().abortMultipartUpload(new AbortMultipartUploadRequest(bucketName, blobName, uploadId));
protected boolean shouldRetry(AmazonS3Exception e) {
return e.getStatusCode() == 400 && "RequestTimeout".equals(e.getErrorCode());
@ -23,7 +23,6 @@ import com.amazonaws.services.s3.model.AmazonS3Exception;
import com.amazonaws.services.s3.model.ObjectListing;
import com.amazonaws.services.s3.model.S3Object;
import com.amazonaws.services.s3.model.S3ObjectSummary;
import org.apache.lucene.util.IOUtils;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.blobstore.BlobMetaData;
import org.elasticsearch.common.blobstore.BlobPath;
@ -35,17 +34,18 @@ import org.elasticsearch.common.collect.ImmutableMap;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
public class AbstractS3BlobContainer extends AbstractBlobContainer {
public class S3BlobContainer extends AbstractBlobContainer {
protected final S3BlobStore blobStore;
protected final String keyPath;
public AbstractS3BlobContainer(BlobPath path, S3BlobStore blobStore) {
public S3BlobContainer(BlobPath path, S3BlobStore blobStore) {
this.blobStore = blobStore;
String keyPath = path.buildAsString("/");
@ -74,38 +74,22 @@ public class AbstractS3BlobContainer extends AbstractBlobContainer {
public void readBlob(final String blobName, final ReadBlobListener listener) {
blobStore.executor().execute(new Runnable() {
public void run() {
InputStream is;
try {
S3Object object = blobStore.client().getObject(blobStore.bucket(), buildKey(blobName));
is = object.getObjectContent();
} catch (AmazonS3Exception e) {
if (e.getStatusCode() == 404) {
listener.onFailure(new FileNotFoundException(e.getMessage()));
} else {
} catch (Throwable e) {
byte[] buffer = new byte[blobStore.bufferSizeInBytes()];
try {
int bytesRead;
while ((bytesRead = is.read(buffer)) != -1) {
listener.onPartial(buffer, 0, bytesRead);
} catch (Throwable e) {
public InputStream openInput(String blobName) throws IOException {
try {
S3Object s3Object = blobStore.client().getObject(blobStore.bucket(), buildKey(blobName));
return s3Object.getObjectContent();
} catch (AmazonS3Exception e) {
if (e.getStatusCode() == 404) {
throw new FileNotFoundException(e.getMessage());
throw e;
public OutputStream createOutput(final String blobName) throws IOException {
// UploadS3OutputStream does buffering internally
return new DefaultS3OutputStream(blobStore, blobStore.bucket(), buildKey(blobName), blobStore.bufferSizeInBytes(), blobStore.numberOfRetries(), blobStore.serverSideEncryption());
@ -145,8 +129,4 @@ public class AbstractS3BlobContainer extends AbstractBlobContainer {
return keyPath + blobName;
protected boolean shouldRetry(AmazonS3Exception e) {
return e.getStatusCode() == 400 && "RequestTimeout".equals(e.getErrorCode());
@ -25,46 +25,53 @@ import com.amazonaws.services.s3.model.DeleteObjectsRequest.KeyVersion;
import com.amazonaws.services.s3.model.ObjectListing;
import com.amazonaws.services.s3.model.S3ObjectSummary;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.blobstore.BlobContainer;
import org.elasticsearch.common.blobstore.BlobPath;
import org.elasticsearch.common.blobstore.BlobStore;
import org.elasticsearch.common.blobstore.ImmutableBlobContainer;
import org.elasticsearch.common.blobstore.BlobStoreException;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.threadpool.ThreadPool;
import java.util.ArrayList;
import java.util.concurrent.Executor;
public class S3BlobStore extends AbstractComponent implements BlobStore {
public static final ByteSizeValue MIN_BUFFER_SIZE = new ByteSizeValue(5, ByteSizeUnit.MB);
private final AmazonS3 client;
private final String bucket;
private final String region;
private final ThreadPool threadPool;
private final int bufferSizeInBytes;
private final ByteSizeValue bufferSize;
private final boolean serverSideEncryption;
private final int numberOfRetries;
public S3BlobStore(Settings settings, AmazonS3 client, String bucket, @Nullable String region, ThreadPool threadPool, boolean serverSideEncryption) {
public S3BlobStore(Settings settings, AmazonS3 client, String bucket, String region, boolean serverSideEncryption) {
this(settings, client, bucket, region, serverSideEncryption, null);
public S3BlobStore(Settings settings, AmazonS3 client, String bucket, @Nullable String region, boolean serverSideEncryption, ByteSizeValue bufferSize) {
this.client = client;
this.bucket = bucket;
this.region = region;
this.threadPool = threadPool;
this.serverSideEncryption = serverSideEncryption;
this.bufferSizeInBytes = (int) settings.getAsBytesSize("buffer_size", new ByteSizeValue(100, ByteSizeUnit.KB)).bytes();
this.bufferSize = (bufferSize != null) ? bufferSize : MIN_BUFFER_SIZE;
if (this.bufferSize.getBytes() < MIN_BUFFER_SIZE.getBytes()) {
throw new BlobStoreException("\"Detected a buffer_size for the S3 storage lower than [" + MIN_BUFFER_SIZE + "]");
this.numberOfRetries = settings.getAsInt("max_retries", 3);
if (!client.doesBucketExist(bucket)) {
if (region != null) {
@ -88,14 +95,10 @@ public class S3BlobStore extends AbstractComponent implements BlobStore {
return bucket;
public Executor executor() {
return threadPool.executor(ThreadPool.Names.SNAPSHOT_DATA);
public boolean serverSideEncryption() { return serverSideEncryption; }
public int bufferSizeInBytes() {
return bufferSizeInBytes;
return bufferSize.bytesAsInt();
public int numberOfRetries() {
@ -103,8 +106,8 @@ public class S3BlobStore extends AbstractComponent implements BlobStore {
public ImmutableBlobContainer immutableBlobContainer(BlobPath path) {
return new S3ImmutableBlobContainer(path, this);
public BlobContainer blobContainer(BlobPath path) {
return new S3BlobContainer(path, this);
@ -1,85 +0,0 @@
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
package org.elasticsearch.cloud.aws.blobstore;
import com.amazonaws.services.s3.model.AmazonS3Exception;
import com.amazonaws.services.s3.model.ObjectMetadata;
import org.elasticsearch.common.blobstore.BlobPath;
import org.elasticsearch.common.blobstore.ImmutableBlobContainer;
import org.elasticsearch.common.blobstore.support.BlobStores;
import java.io.IOException;
import java.io.InputStream;
public class S3ImmutableBlobContainer extends AbstractS3BlobContainer implements ImmutableBlobContainer {
public S3ImmutableBlobContainer(BlobPath path, S3BlobStore blobStore) {
super(path, blobStore);
public void writeBlob(final String blobName, final InputStream is, final long sizeInBytes, final WriterListener listener) {
blobStore.executor().execute(new Runnable() {
public void run() {
int retry = 0;
// Read limit is ignored by InputStreamIndexInput, but we will set it anyway in case
// implementation will change
while (true) {
try {
ObjectMetadata md = new ObjectMetadata();
if (blobStore.serverSideEncryption()) {
blobStore.client().putObject(blobStore.bucket(), buildKey(blobName), is, md);
} catch (AmazonS3Exception e) {
if (shouldRetry(e) && retry < blobStore.numberOfRetries()) {
try {
} catch (IOException ex) {
} else {
} catch (Throwable e) {
public void writeBlob(String blobName, InputStream is, long sizeInBytes) throws IOException {
BlobStores.syncWriteBlob(this, blobName, is, sizeInBytes);
@ -0,0 +1,125 @@
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
package org.elasticsearch.cloud.aws.blobstore;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import java.io.IOException;
import java.io.OutputStream;
* S3OutputStream buffers data before flushing it to an underlying S3OutputStream.
public abstract class S3OutputStream extends OutputStream {
* Limit of upload allowed by AWS S3.
protected static final ByteSizeValue MULTIPART_MAX_SIZE = new ByteSizeValue(5, ByteSizeUnit.GB);
protected static final ByteSizeValue MULTIPART_MIN_SIZE = new ByteSizeValue(5, ByteSizeUnit.MB);
private S3BlobStore blobStore;
private String bucketName;
private String blobName;
private int numberOfRetries;
private boolean serverSideEncryption;
private byte[] buffer;
private int count;
private long length;
private int flushCount = 0;
public S3OutputStream(S3BlobStore blobStore, String bucketName, String blobName, int bufferSizeInBytes, int numberOfRetries, boolean serverSideEncryption) {
this.blobStore = blobStore;
this.bucketName = bucketName;
this.blobName = blobName;
this.numberOfRetries = numberOfRetries;
this.serverSideEncryption = serverSideEncryption;
if (bufferSizeInBytes < MULTIPART_MIN_SIZE.getBytes()) {
throw new IllegalArgumentException("Buffer size can't be smaller than " + MULTIPART_MIN_SIZE);
if (bufferSizeInBytes > MULTIPART_MAX_SIZE.getBytes()) {
throw new IllegalArgumentException("Buffer size can't be larger than " + MULTIPART_MAX_SIZE);
this.buffer = new byte[bufferSizeInBytes];
public abstract void flush(byte[] bytes, int off, int len, boolean closing) throws IOException;
private void flushBuffer(boolean closing) throws IOException {
flush(buffer, 0, count, closing);
count = 0;
public void write(int b) throws IOException {
if (count >= buffer.length) {
buffer[count++] = (byte) b;
public void close() throws IOException {
if (count > 0) {
count = 0;
public S3BlobStore getBlobStore() {
return blobStore;
public String getBucketName() {
return bucketName;
public String getBlobName() {
return blobName;
public int getBufferSize() {
return buffer.length;
public int getNumberOfRetries() {
return numberOfRetries;
public boolean isServerSideEncryption() {
return serverSideEncryption;
public long getLength() {
return length;
public int getFlushCount() {
return flushCount;
@ -19,7 +19,6 @@
package org.elasticsearch.discovery.ec2;
import org.elasticsearch.Version;
import org.elasticsearch.cloud.aws.AwsEc2Service;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterService;
@ -48,7 +47,7 @@ public class Ec2Discovery extends ZenDiscovery {
DiscoveryNodeService discoveryNodeService, AwsEc2Service ec2Service, DiscoverySettings discoverySettings,
ElectMasterService electMasterService) {
super(settings, clusterName, threadPool, transportService, clusterService, nodeSettingsService,
discoveryNodeService, pingService, electMasterService, Version.CURRENT, discoverySettings);
discoveryNodeService, pingService, electMasterService, discoverySettings);
if (settings.getAsBoolean("cloud.enabled", true)) {
ImmutableList<? extends ZenPing> zenPings = pingService.zenPings();
UnicastZenPing unicastZenPing = null;
@ -32,7 +32,6 @@ import org.elasticsearch.repositories.RepositoryException;
import org.elasticsearch.repositories.RepositoryName;
import org.elasticsearch.repositories.RepositorySettings;
import org.elasticsearch.repositories.blobstore.BlobStoreRepository;
import org.elasticsearch.threadpool.ThreadPool;
import java.io.IOException;
import java.util.Locale;
@ -72,7 +71,7 @@ public class S3Repository extends BlobStoreRepository {
* @throws IOException
public S3Repository(RepositoryName name, RepositorySettings repositorySettings, IndexShardRepository indexShardRepository, AwsS3Service s3Service, ThreadPool threadPool) throws IOException {
public S3Repository(RepositoryName name, RepositorySettings repositorySettings, IndexShardRepository indexShardRepository, AwsS3Service s3Service) throws IOException {
super(name.getName(), repositorySettings, indexShardRepository);
String bucket = repositorySettings.settings().get("bucket", componentSettings.get("bucket"));
@ -120,8 +119,9 @@ public class S3Repository extends BlobStoreRepository {
boolean serverSideEncryption = repositorySettings.settings().getAsBoolean("server_side_encryption", componentSettings.getAsBoolean("server_side_encryption", false));
logger.debug("using bucket [{}], region [{}], chunk_size [{}], server_side_encryption [{}]", bucket, region, chunkSize, serverSideEncryption);
blobStore = new S3BlobStore(settings, s3Service.client(region, repositorySettings.settings().get("access_key"), repositorySettings.settings().get("secret_key")), bucket, region, threadPool, serverSideEncryption);
ByteSizeValue bufferSize = repositorySettings.settings().getAsBytesSize("buffer_size", componentSettings.getAsBytesSize("buffer_size", null));
logger.debug("using bucket [{}], region [{}], chunk_size [{}], server_side_encryption [{}], buffer_size [{}]", bucket, region, chunkSize, serverSideEncryption, bufferSize);
blobStore = new S3BlobStore(settings, s3Service.client(region, repositorySettings.settings().get("access_key"), repositorySettings.settings().get("secret_key")), bucket, region, serverSideEncryption, bufferSize);
this.chunkSize = repositorySettings.settings().getAsBytesSize("chunk_size", componentSettings.getAsBytesSize("chunk_size", new ByteSizeValue(100, ByteSizeUnit.MB)));
this.compress = repositorySettings.settings().getAsBoolean("compress", componentSettings.getAsBoolean("compress", false));
String basePath = repositorySettings.settings().get("base_path", null);
@ -165,5 +165,4 @@ public class S3Repository extends BlobStoreRepository {
return chunkSize;
@ -0,0 +1,99 @@
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
package org.elasticsearch.cloud.aws.blobstore;
import com.amazonaws.services.s3.model.AmazonS3Exception;
import com.amazonaws.services.s3.model.PartETag;
import com.carrotsearch.randomizedtesting.RandomizedTest;
import org.elasticsearch.common.io.Streams;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.List;
public class MockDefaultS3OutputStream extends DefaultS3OutputStream {
private ByteArrayOutputStream out = new ByteArrayOutputStream();
private boolean initialized = false;
private boolean completed = false;
private boolean aborted = false;
private int numberOfUploadRequests = 0;
public MockDefaultS3OutputStream(int bufferSizeInBytes) {
super(null, "test-bucket", "test-blobname", bufferSizeInBytes, 3, false);
protected void doUpload(S3BlobStore blobStore, String bucketName, String blobName, InputStream is, int length, boolean serverSideEncryption) throws AmazonS3Exception {
try {
long copied = Streams.copy(is, out);
if (copied != length) {
throw new AmazonS3Exception("Not all the bytes were copied");
} catch (IOException e) {
throw new AmazonS3Exception(e.getMessage());
protected String doInitialize(S3BlobStore blobStore, String bucketName, String blobName, boolean serverSideEncryption) {
initialized = true;
return RandomizedTest.randomAsciiOfLength(50);
protected PartETag doUploadMultipart(S3BlobStore blobStore, String bucketName, String blobName, String uploadId, InputStream is, int length, boolean lastPart) throws AmazonS3Exception {
try {
long copied = Streams.copy(is, out);
if (copied != length) {
throw new AmazonS3Exception("Not all the bytes were copied");
return new PartETag(numberOfUploadRequests++, RandomizedTest.randomAsciiOfLength(50));
} catch (IOException e) {
throw new AmazonS3Exception(e.getMessage());
protected void doCompleteMultipart(S3BlobStore blobStore, String bucketName, String blobName, String uploadId, List<PartETag> parts) throws AmazonS3Exception {
completed = true;
protected void doAbortMultipart(S3BlobStore blobStore, String bucketName, String blobName, String uploadId) throws AmazonS3Exception {
aborted = true;
public int getNumberOfUploadRequests() {
return numberOfUploadRequests;
public boolean isMultipart() {
return (numberOfUploadRequests > 1) && initialized && completed && !aborted;
public byte[] toByteArray() {
return out.toByteArray();
@ -0,0 +1,144 @@
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
package org.elasticsearch.cloud.aws.blobstore;
import org.elasticsearch.common.base.Charsets;
import org.elasticsearch.test.ElasticsearchTestCase;
import org.junit.Test;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.Arrays;
import static org.elasticsearch.common.io.Streams.copy;
import static org.hamcrest.Matchers.equalTo;
* Unit test for {@link S3OutputStream}.
public class S3OutputStreamTest extends ElasticsearchTestCase {
private static final int BUFFER_SIZE = S3BlobStore.MIN_BUFFER_SIZE.bytesAsInt();
public void testWriteLessDataThanBufferSize() throws IOException {
MockDefaultS3OutputStream out = newS3OutputStream(BUFFER_SIZE);
byte[] content = randomUnicodeOfLengthBetween(1, 512).getBytes(Charsets.UTF_8);
copy(content, out);
// Checks length & content
assertThat(out.getLength(), equalTo((long) content.length));
assertThat(Arrays.equals(content, out.toByteArray()), equalTo(true));
// Checks single/multi part upload
assertThat(out.getBufferSize(), equalTo(BUFFER_SIZE));
assertThat(out.getFlushCount(), equalTo(1));
assertThat(out.getNumberOfUploadRequests(), equalTo(1));
public void testWriteSameDataThanBufferSize() throws IOException {
int size = randomIntBetween(BUFFER_SIZE, 10 * BUFFER_SIZE);
MockDefaultS3OutputStream out = newS3OutputStream(size);
ByteArrayOutputStream content = new ByteArrayOutputStream(size);
for (int i = 0; i < size; i++) {
copy(content.toByteArray(), out);
// Checks length & content
assertThat(out.getLength(), equalTo((long) size));
assertThat(Arrays.equals(content.toByteArray(), out.toByteArray()), equalTo(true));
// Checks single/multi part upload
assertThat(out.getBufferSize(), equalTo(size));
assertThat(out.getFlushCount(), equalTo(1));
assertThat(out.getNumberOfUploadRequests(), equalTo(1));
public void testWriteExactlyNTimesMoreDataThanBufferSize() throws IOException {
int n = randomIntBetween(2, 10);
int length = n * BUFFER_SIZE;
ByteArrayOutputStream content = new ByteArrayOutputStream(length);
for (int i = 0; i < length; i++) {
MockDefaultS3OutputStream out = newS3OutputStream(BUFFER_SIZE);
copy(content.toByteArray(), out);
// Checks length & content
assertThat(out.getLength(), equalTo((long) length));
assertThat(Arrays.equals(content.toByteArray(), out.toByteArray()), equalTo(true));
// Checks single/multi part upload
assertThat(out.getBufferSize(), equalTo(BUFFER_SIZE));
assertThat(out.getFlushCount(), equalTo(n));
assertThat(out.getNumberOfUploadRequests(), equalTo(n));
public void testWriteRandomNumberOfBytes() throws IOException {
Integer randomBufferSize = randomIntBetween(BUFFER_SIZE, 5 * BUFFER_SIZE);
MockDefaultS3OutputStream out = newS3OutputStream(randomBufferSize);
Integer randomLength = randomIntBetween(1, 10 * BUFFER_SIZE);
ByteArrayOutputStream content = new ByteArrayOutputStream(randomLength);
for (int i = 0; i < randomLength; i++) {
copy(content.toByteArray(), out);
// Checks length & content
assertThat(out.getLength(), equalTo((long) randomLength));
assertThat(Arrays.equals(content.toByteArray(), out.toByteArray()), equalTo(true));
assertThat(out.getBufferSize(), equalTo(randomBufferSize));
int times = (int) Math.ceil(randomLength.doubleValue() / randomBufferSize.doubleValue());
assertThat(out.getFlushCount(), equalTo(times));
if (times > 1) {
} else {
@Test(expected = IllegalArgumentException.class)
public void testWrongBufferSize() throws IOException {
Integer randomBufferSize = randomIntBetween(1, 4 * 1024 * 1024);
MockDefaultS3OutputStream out = newS3OutputStream(randomBufferSize);
fail("Buffer size can't be smaller than 5mb");
private MockDefaultS3OutputStream newS3OutputStream(int bufferSizeInBytes) {
return new MockDefaultS3OutputStream(bufferSizeInBytes);
@ -34,9 +34,9 @@ import org.elasticsearch.cloud.aws.AwsS3Service;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.UncategorizedExecutionException;
import org.elasticsearch.plugins.PluginsService;
import org.elasticsearch.repositories.RepositoryMissingException;
import org.elasticsearch.repositories.RepositoryVerificationException;
import org.elasticsearch.snapshots.SnapshotMissingException;
import org.elasticsearch.snapshots.SnapshotState;
import org.elasticsearch.test.ElasticsearchIntegrationTest.ClusterScope;
@ -247,19 +247,17 @@ abstract public class AbstractS3SnapshotRestoreTest extends AbstractAwsTest {
* This test verifies that the test configuration is set up in a manner that
* does not make the test {@link #testRepositoryWithCustomCredentials()} pointless.
@Test(expected = UncategorizedExecutionException.class)
@Test(expected = RepositoryVerificationException.class)
public void assertRepositoryWithCustomCredentialsIsNotAccessibleByDefaultCredentials() {
Client client = client();
Settings bucketSettings = internalCluster().getInstance(Settings.class).getByPrefix("repositories.s3.private-bucket.");
logger.info("--> creating s3 repository with bucket[{}] and path [{}]", bucketSettings.get("bucket"), basePath);
PutRepositoryResponse putRepositoryResponse = client.admin().cluster().preparePutRepository("test-repo")
.put("base_path", basePath)
.put("bucket", bucketSettings.get("bucket"))
assertThat(putRepositoryResponse.isAcknowledged(), equalTo(true));
assertRepositoryIsOperational(client, "test-repo");
fail("repository verification should have raise an exception!");
@ -284,21 +282,20 @@ abstract public class AbstractS3SnapshotRestoreTest extends AbstractAwsTest {
* This test verifies that the test configuration is set up in a manner that
* does not make the test {@link #testRepositoryInRemoteRegion()} pointless.
@Test(expected = UncategorizedExecutionException.class)
@Test(expected = RepositoryVerificationException.class)
public void assertRepositoryInRemoteRegionIsRemote() {
Client client = client();
Settings bucketSettings = internalCluster().getInstance(Settings.class).getByPrefix("repositories.s3.remote-bucket.");
logger.info("--> creating s3 repository with bucket[{}] and path [{}]", bucketSettings.get("bucket"), basePath);
PutRepositoryResponse putRepositoryResponse = client.admin().cluster().preparePutRepository("test-repo")
.put("base_path", basePath)
.put("bucket", bucketSettings.get("bucket"))
// Below setting intentionally omitted to assert bucket is not available in default region.
// .put("region", privateBucketSettings.get("region"))
assertThat(putRepositoryResponse.isAcknowledged(), equalTo(true));
assertRepositoryIsOperational(client, "test-repo");
fail("repository verification should have raise an exception!");
Reference in New Issue
Block a user