diff --git a/README.md b/README.md
index 013a61fe800..299b42f0550 100644
--- a/README.md
+++ b/README.md
@@ -158,6 +158,7 @@ The following settings are supported:
 * `chunk_size`: Big files can be broken down into chunks during snapshotting if needed. The chunk size can be specified in bytes or by using size value notation, i.e. `1g`, `10m`, `5k`. Defaults to `100m`.
 * `compress`: When set to `true` metadata files are stored in compressed format. This setting doesn't affect index files that are already compressed by default. Defaults to `false`.
 * `server_side_encryption`: When set to `true` files are encrypted on server side using AES256 algorithm. Defaults to `false`.
+* `buffer_size`: Minimum threshold below which the chunk is uploaded using a single request. Beyond this threshold, the S3 repository will use the [AWS Multipart Upload API](http://docs.aws.amazon.com/AmazonS3/latest/dev/uploadobjusingmpu.html) to split the chunk into several parts, each of `buffer_size` length, and to upload each part in its own request. Note that setting a buffer size lower than `5mb` is not allowed since it would prevent the use of the Multipart API and may result in upload errors. Defaults to `5mb`.
 * `max_retries`: Number of retries in case of S3 errors. Defaults to `3`.
 
 The S3 repositories are using the same credentials as the rest of the AWS services provided by this plugin (`discovery`).
diff --git a/pom.xml b/pom.xml
index 53c5a8a3256..e38947701b6 100644
--- a/pom.xml
+++ b/pom.xml
@@ -34,6 +34,7 @@
     <elasticsearch.version>2.0.0-SNAPSHOT</elasticsearch.version>
     <lucene.version>4.10.1</lucene.version>
+    <amazonaws.version>1.7.13</amazonaws.version>
     onerror
     true
     onerror
@@ -85,7 +86,7 @@
         <dependency>
             <groupId>com.amazonaws</groupId>
             <artifactId>aws-java-sdk</artifactId>
-            <version>1.7.13</version>
+            <version>${amazonaws.version}</version>
             <scope>compile</scope>
diff --git a/src/main/java/org/elasticsearch/cloud/aws/InternalAwsS3Service.java b/src/main/java/org/elasticsearch/cloud/aws/InternalAwsS3Service.java
index 3759ba969e5..75efa1601f2 100644
--- a/src/main/java/org/elasticsearch/cloud/aws/InternalAwsS3Service.java
+++ b/src/main/java/org/elasticsearch/cloud/aws/InternalAwsS3Service.java
@@ -22,6 +22,7 @@ package org.elasticsearch.cloud.aws;
 import com.amazonaws.ClientConfiguration;
 import com.amazonaws.Protocol;
 import com.amazonaws.auth.*;
+import com.amazonaws.http.IdleConnectionReaper;
 import com.amazonaws.internal.StaticCredentialsProvider;
 import com.amazonaws.services.s3.AmazonS3;
 import com.amazonaws.services.s3.AmazonS3Client;
@@ -192,5 +193,8 @@ public class InternalAwsS3Service extends AbstractLifecycleComponent<AwsS3Service>
     protected void doClose() throws ElasticsearchException {
         for (AmazonS3Client client : clients.values()) {
             client.shutdown();
         }
+
+        // Ensure that the shared IdleConnectionReaper thread is shutdown
+        IdleConnectionReaper.shutdown();
     }
 }
diff --git a/src/main/java/org/elasticsearch/cloud/aws/blobstore/DefaultS3OutputStream.java b/src/main/java/org/elasticsearch/cloud/aws/blobstore/DefaultS3OutputStream.java
new file mode 100644
--- /dev/null
+++ b/src/main/java/org/elasticsearch/cloud/aws/blobstore/DefaultS3OutputStream.java
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.cloud.aws.blobstore;
+
+import com.amazonaws.services.s3.model.*;
+import org.elasticsearch.common.unit.ByteSizeUnit;
+import org.elasticsearch.common.unit.ByteSizeValue;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * DefaultS3OutputStream uploads data to the AWS S3 service using 2 modes: a single request mode and a multipart mode.
+ * <p/>
+ * When the length of the chunk is lower than buffer_size, the chunk is uploaded with a single request.
+ * Otherwise multiple requests are made, each of buffer_size (except the last one which can be lower than buffer_size).
+ * <p/>
+ * Quick facts about S3:
+ * <p/>
+ * Maximum object size:                 5 TB
+ * Maximum number of parts per upload:  10,000
+ * Part numbers:                        1 to 10,000 (inclusive)
+ * Part size:                           5 MB to 5 GB, last part can be < 5 MB
+ * <p/>
+ * See http://docs.aws.amazon.com/AmazonS3/latest/dev/qfacts.html
+ * See http://docs.aws.amazon.com/AmazonS3/latest/dev/uploadobjusingmpu.html
+ */
+public class DefaultS3OutputStream extends S3OutputStream {
+
+    private static final ByteSizeValue MULTIPART_MAX_SIZE = new ByteSizeValue(5, ByteSizeUnit.GB);
+
+    /**
+     * Multipart Upload API data
+     */
+    private String multipartId;
+    private int multipartChunks;
+    private List<PartETag> multiparts;
+
+    public DefaultS3OutputStream(S3BlobStore blobStore, String bucketName, String blobName, int bufferSizeInBytes, int numberOfRetries, boolean serverSideEncryption) {
+        super(blobStore, bucketName, blobName, bufferSizeInBytes, numberOfRetries, serverSideEncryption);
+    }
+
+    @Override
+    public void flush(byte[] bytes, int off, int len, boolean closing) throws IOException {
+        if (len > MULTIPART_MAX_SIZE.getBytes()) {
+            throw new IOException("Unable to upload files larger than " + MULTIPART_MAX_SIZE + " to Amazon S3");
+        }
+
+        if (!closing) {
+            if (len < getBufferSize()) {
+                upload(bytes, off, len);
+            } else {
+                if (getFlushCount() == 0) {
+                    initializeMultipart();
+                }
+                uploadMultipart(bytes, off, len, false);
+            }
+        } else {
+            if (multipartId != null) {
+                uploadMultipart(bytes, off, len, true);
+                completeMultipart();
+            } else {
+                upload(bytes, off, len);
+            }
+        }
+    }
+
+    /**
+     * Upload data using a single request.
+     *
+     * @param bytes array holding the data to upload
+     * @param off   offset of the first byte to upload
+     * @param len   number of bytes to upload
+     * @throws IOException if the upload fails after the configured number of retries
+     */
+    private void upload(byte[] bytes, int off, int len) throws IOException {
+        try (ByteArrayInputStream is = new ByteArrayInputStream(bytes, off, len)) {
+            int retry = 0;
+            while (retry < getNumberOfRetries()) {
+                try {
+                    doUpload(getBlobStore(), getBucketName(), getBlobName(), is, len, isServerSideEncryption());
+                    break;
+                } catch (AmazonS3Exception e) {
+                    if (shouldRetry(e)) {
+                        is.reset();
+                        retry++;
+                    } else {
+                        throw new IOException("Unable to upload object " + getBlobName(), e);
+                    }
+                }
+            }
+        }
+    }
+
+    protected void doUpload(S3BlobStore blobStore, String bucketName, String blobName, InputStream is, int length,
+                            boolean serverSideEncryption) throws AmazonS3Exception {
+        ObjectMetadata md = new ObjectMetadata();
+        if (serverSideEncryption) {
+            md.setSSEAlgorithm(ObjectMetadata.AES_256_SERVER_SIDE_ENCRYPTION);
+        }
+        md.setContentLength(length);
+        blobStore.client().putObject(bucketName, blobName, is, md);
+    }
+
+    private void initializeMultipart() {
+        if (multipartId == null) {
+            multipartId = doInitialize(getBlobStore(), getBucketName(), getBlobName(), isServerSideEncryption());
+            if (multipartId != null) {
+                multipartChunks = 1;
+                multiparts = new ArrayList<>();
+            }
+        }
+    }
+
+    protected String doInitialize(S3BlobStore blobStore, String bucketName, String blobName, boolean serverSideEncryption) {
+        InitiateMultipartUploadRequest request = new InitiateMultipartUploadRequest(bucketName, blobName);
+        if (serverSideEncryption) {
+            ObjectMetadata md = new ObjectMetadata();
+            md.setSSEAlgorithm(ObjectMetadata.AES_256_SERVER_SIDE_ENCRYPTION);
+            request.setObjectMetadata(md);
+        }
+        return blobStore.client().initiateMultipartUpload(request).getUploadId();
+    }
+
+    private void uploadMultipart(byte[] bytes, int off, int len, boolean lastPart) throws IOException {
+        try (ByteArrayInputStream is = new ByteArrayInputStream(bytes, off, len)) {
+            int retry = 0;
+            while (retry < getNumberOfRetries()) {
+                try {
+                    PartETag partETag = doUploadMultipart(getBlobStore(), getBucketName(), getBlobName(), multipartId, is, len, lastPart);
+                    multiparts.add(partETag);
+                    multipartChunks++;
+                    return;
+                } catch (AmazonS3Exception e) {
+                    if (shouldRetry(e) && retry < getNumberOfRetries()) {
+                        is.reset();
+                        retry++;
+                    } else {
+                        abortMultipart();
+                        throw e;
+                    }
+                }
+            }
+        }
+    }
+
+    protected PartETag doUploadMultipart(S3BlobStore blobStore, String bucketName, String blobName, String uploadId, InputStream is,
+                                         int length, boolean lastPart) throws AmazonS3Exception {
+        UploadPartRequest request = new UploadPartRequest()
+                .withBucketName(bucketName)
+                .withKey(blobName)
+                .withUploadId(uploadId)
+                .withPartNumber(multipartChunks)
+                .withInputStream(is)
+                .withPartSize(length)
+                .withLastPart(lastPart);
+
+        UploadPartResult response = blobStore.client().uploadPart(request);
+        return response.getPartETag();
+    }
+
+    private void completeMultipart() {
+        int retry = 0;
+        while (retry < getNumberOfRetries()) {
+            try {
+                doCompleteMultipart(getBlobStore(), getBucketName(), getBlobName(), multipartId, multiparts);
+                multipartId = null;
+                return;
+            } catch (AmazonS3Exception e) {
+                if (shouldRetry(e) && retry < getNumberOfRetries()) {
+                    retry++;
+                } else {
+                    abortMultipart();
+                    throw e;
+                }
+            }
+        }
+    }
+
+    protected void doCompleteMultipart(S3BlobStore blobStore, String bucketName, String blobName, String uploadId, List<PartETag> parts)
+            throws AmazonS3Exception {
+        CompleteMultipartUploadRequest request = new CompleteMultipartUploadRequest(bucketName, blobName, uploadId, parts);
+        blobStore.client().completeMultipartUpload(request);
+    }
+
+    private void abortMultipart() {
+        if (multipartId != null) {
+            try {
+                doAbortMultipart(getBlobStore(), getBucketName(), getBlobName(), multipartId);
+            } finally {
+                multipartId = null;
+            }
+        }
+    }
+
+    protected void doAbortMultipart(S3BlobStore blobStore, String bucketName, String blobName, String uploadId)
+            throws AmazonS3Exception {
+        blobStore.client().abortMultipartUpload(new AbortMultipartUploadRequest(bucketName, blobName, uploadId));
+    }
+
+    protected boolean shouldRetry(AmazonS3Exception e) {
+        return e.getStatusCode() == 400 && "RequestTimeout".equals(e.getErrorCode());
+    }
+}
diff --git a/src/main/java/org/elasticsearch/cloud/aws/blobstore/AbstractS3BlobContainer.java b/src/main/java/org/elasticsearch/cloud/aws/blobstore/S3BlobContainer.java
similarity index 70%
rename from src/main/java/org/elasticsearch/cloud/aws/blobstore/AbstractS3BlobContainer.java
rename to src/main/java/org/elasticsearch/cloud/aws/blobstore/S3BlobContainer.java
index b2dfcda9119..d5a10079f6d 100644
--- a/src/main/java/org/elasticsearch/cloud/aws/blobstore/AbstractS3BlobContainer.java
+++ b/src/main/java/org/elasticsearch/cloud/aws/blobstore/S3BlobContainer.java
@@ -23,7 +23,6 @@ import com.amazonaws.services.s3.model.AmazonS3Exception;
 import com.amazonaws.services.s3.model.ObjectListing;
 import com.amazonaws.services.s3.model.S3Object;
 import com.amazonaws.services.s3.model.S3ObjectSummary;
-import org.apache.lucene.util.IOUtils;
 import org.elasticsearch.common.Nullable;
 import org.elasticsearch.common.blobstore.BlobMetaData;
 import org.elasticsearch.common.blobstore.BlobPath;
@@ -35,17 +34,18 @@ import org.elasticsearch.common.collect.ImmutableMap;
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.InputStream;
+import java.io.OutputStream;
 
 /**
  *
  */
-public class AbstractS3BlobContainer extends AbstractBlobContainer {
+public class S3BlobContainer extends AbstractBlobContainer {
 
     protected final S3BlobStore blobStore;
 
     protected final String keyPath;
 
-    public AbstractS3BlobContainer(BlobPath path, S3BlobStore blobStore) {
+    public S3BlobContainer(BlobPath path, S3BlobStore blobStore) {
         super(path);
         this.blobStore = blobStore;
         String keyPath = path.buildAsString("/");
@@ -74,38 +74,22 @@ public class AbstractS3BlobContainer extends AbstractBlobContainer {
     }
 
     @Override
-    public void readBlob(final String blobName, final ReadBlobListener listener) {
-        blobStore.executor().execute(new Runnable() {
-            @Override
-            public void run() {
-                InputStream is;
-                try {
-                    S3Object object = blobStore.client().getObject(blobStore.bucket(), buildKey(blobName));
-                    is = object.getObjectContent();
-                } catch (AmazonS3Exception e) {
-                    if (e.getStatusCode() == 404) {
-                        listener.onFailure(new FileNotFoundException(e.getMessage()));
-                    } else {
-                        listener.onFailure(e);
-                    }
-                    return;
-                } catch (Throwable e) {
-                    listener.onFailure(e);
-                    return;
-                }
-                byte[] buffer = new byte[blobStore.bufferSizeInBytes()];
-                try {
-                    int bytesRead;
-                    while ((bytesRead = is.read(buffer)) != -1) {
-                        listener.onPartial(buffer, 0, bytesRead);
-                    }
-                    listener.onCompleted();
-                } catch (Throwable e) {
-                    IOUtils.closeWhileHandlingException(is);
-                    listener.onFailure(e);
-                }
+    public InputStream openInput(String blobName) throws IOException {
+        try {
+            S3Object s3Object = blobStore.client().getObject(blobStore.bucket(), buildKey(blobName));
+            return s3Object.getObjectContent();
+        } catch (AmazonS3Exception e) {
+            if (e.getStatusCode() == 404) {
+                throw new FileNotFoundException(e.getMessage());
             }
-        });
+            throw e;
+        }
+    }
+
+    @Override
+    public OutputStream createOutput(final String blobName) throws IOException {
+        // DefaultS3OutputStream does the buffering internally
+        return new DefaultS3OutputStream(blobStore, blobStore.bucket(), buildKey(blobName), blobStore.bufferSizeInBytes(), blobStore.numberOfRetries(), blobStore.serverSideEncryption());
     }
 
     @Override
@@ -145,8 +129,4 @@ public class AbstractS3BlobContainer extends AbstractBlobContainer {
         return keyPath + blobName;
     }
 
-    protected boolean shouldRetry(AmazonS3Exception e) {
-        return e.getStatusCode() == 400 && "RequestTimeout".equals(e.getErrorCode());
-    }
-
 }
diff --git a/src/main/java/org/elasticsearch/cloud/aws/blobstore/S3BlobStore.java b/src/main/java/org/elasticsearch/cloud/aws/blobstore/S3BlobStore.java
index e060fb55a05..10ce6c7373f 100644
--- a/src/main/java/org/elasticsearch/cloud/aws/blobstore/S3BlobStore.java
+++ b/src/main/java/org/elasticsearch/cloud/aws/blobstore/S3BlobStore.java
@@ -25,46 +25,53 @@ import com.amazonaws.services.s3.model.DeleteObjectsRequest.KeyVersion;
 import com.amazonaws.services.s3.model.ObjectListing;
 import com.amazonaws.services.s3.model.S3ObjectSummary;
 import org.elasticsearch.common.Nullable;
+import org.elasticsearch.common.blobstore.BlobContainer;
 import org.elasticsearch.common.blobstore.BlobPath;
 import org.elasticsearch.common.blobstore.BlobStore;
-import org.elasticsearch.common.blobstore.ImmutableBlobContainer;
+import org.elasticsearch.common.blobstore.BlobStoreException;
 import org.elasticsearch.common.component.AbstractComponent;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.unit.ByteSizeUnit;
 import org.elasticsearch.common.unit.ByteSizeValue;
-import org.elasticsearch.threadpool.ThreadPool;
 
 import java.util.ArrayList;
-import java.util.concurrent.Executor;
 
 /**
  *
 */
 public class S3BlobStore extends AbstractComponent implements BlobStore {
 
+    public static final ByteSizeValue MIN_BUFFER_SIZE = new ByteSizeValue(5, ByteSizeUnit.MB);
+
     private final AmazonS3 client;
 
     private final String bucket;
 
     private final String region;
 
-    private final ThreadPool threadPool;
-
-    private final int bufferSizeInBytes;
+    private final ByteSizeValue bufferSize;
 
     private final boolean serverSideEncryption;
 
     private final int numberOfRetries;
 
-    public S3BlobStore(Settings settings, AmazonS3 client, String bucket, @Nullable String region, ThreadPool threadPool, boolean serverSideEncryption) {
+
+    public S3BlobStore(Settings settings, AmazonS3 client, String bucket, String region, boolean serverSideEncryption) {
+        this(settings, client, bucket, region, serverSideEncryption, null);
+    }
+
+    public S3BlobStore(Settings settings, AmazonS3 client, String bucket, @Nullable String region, boolean serverSideEncryption, ByteSizeValue bufferSize) {
         super(settings);
         this.client = client;
         this.bucket = bucket;
         this.region = region;
-        this.threadPool = threadPool;
         this.serverSideEncryption = serverSideEncryption;
-        this.bufferSizeInBytes = (int) settings.getAsBytesSize("buffer_size", new ByteSizeValue(100, ByteSizeUnit.KB)).bytes();
+        this.bufferSize = (bufferSize != null) ? bufferSize : MIN_BUFFER_SIZE;
+        if (this.bufferSize.getBytes() < MIN_BUFFER_SIZE.getBytes()) {
+            throw new BlobStoreException("Detected a buffer_size for the S3 storage lower than [" + MIN_BUFFER_SIZE + "]");
+        }
+
        this.numberOfRetries = settings.getAsInt("max_retries", 3);
         if (!client.doesBucketExist(bucket)) {
             if (region != null) {
@@ -88,14 +95,10 @@ public class S3BlobStore extends AbstractComponent implements BlobStore {
         return bucket;
     }
 
-    public Executor executor() {
-        return threadPool.executor(ThreadPool.Names.SNAPSHOT_DATA);
-    }
-
     public boolean serverSideEncryption() {
         return serverSideEncryption;
     }
 
     public int bufferSizeInBytes() {
-        return bufferSizeInBytes;
+        return bufferSize.bytesAsInt();
     }
 
     public int numberOfRetries() {
@@ -103,8 +106,8 @@ public class S3BlobStore extends AbstractComponent implements BlobStore {
     }
 
     @Override
-    public ImmutableBlobContainer immutableBlobContainer(BlobPath path) {
-        return new S3ImmutableBlobContainer(path, this);
+    public BlobContainer blobContainer(BlobPath path) {
+        return new S3BlobContainer(path, this);
     }
 
     @Override
diff --git a/src/main/java/org/elasticsearch/cloud/aws/blobstore/S3ImmutableBlobContainer.java b/src/main/java/org/elasticsearch/cloud/aws/blobstore/S3ImmutableBlobContainer.java
deleted file mode 100644
index b011e72a98b..00000000000
--- a/src/main/java/org/elasticsearch/cloud/aws/blobstore/S3ImmutableBlobContainer.java
+++ /dev/null
@@ -1,85 +0,0 @@
-/*
- * Licensed to Elasticsearch under one or more contributor
- * license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright
- * ownership. Elasticsearch licenses this file to you under
- * the Apache License, Version 2.0 (the "License"); you may
- * not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.elasticsearch.cloud.aws.blobstore;
-
-import com.amazonaws.services.s3.model.AmazonS3Exception;
-import com.amazonaws.services.s3.model.ObjectMetadata;
-import org.elasticsearch.common.blobstore.BlobPath;
-import org.elasticsearch.common.blobstore.ImmutableBlobContainer;
-import org.elasticsearch.common.blobstore.support.BlobStores;
-
-import java.io.IOException;
-import java.io.InputStream;
-
-/**
- *
- */
-public class S3ImmutableBlobContainer extends AbstractS3BlobContainer implements ImmutableBlobContainer {
-
-    public S3ImmutableBlobContainer(BlobPath path, S3BlobStore blobStore) {
-        super(path, blobStore);
-    }
-
-    @Override
-    public void writeBlob(final String blobName, final InputStream is, final long sizeInBytes, final WriterListener listener) {
-        blobStore.executor().execute(new Runnable() {
-            @Override
-            public void run() {
-                int retry = 0;
-                // Read limit is ignored by InputStreamIndexInput, but we will set it anyway in case
-                // implementation will change
-                is.mark(Integer.MAX_VALUE);
-                while (true) {
-                    try {
-                        ObjectMetadata md = new ObjectMetadata();
-                        if (blobStore.serverSideEncryption()) {
-                            md.setSSEAlgorithm(ObjectMetadata.AES_256_SERVER_SIDE_ENCRYPTION);
-                        }
-                        md.setContentLength(sizeInBytes);
-                        blobStore.client().putObject(blobStore.bucket(), buildKey(blobName), is, md);
-                        listener.onCompleted();
-                        return;
-                    } catch (AmazonS3Exception e) {
-                        if (shouldRetry(e) && retry < blobStore.numberOfRetries()) {
-                            try {
-                                is.reset();
-                            } catch (IOException ex) {
-                                listener.onFailure(e);
-                                return;
-                            }
-                            retry++;
-                        } else {
-                            listener.onFailure(e);
-                            return;
-                        }
-                    } catch (Throwable e) {
-                        listener.onFailure(e);
-                        return;
-                    }
-                }
-            }
-        });
-    }
-
-    @Override
-    public void writeBlob(String blobName, InputStream is, long sizeInBytes) throws IOException {
-        BlobStores.syncWriteBlob(this, blobName, is, sizeInBytes);
-    }
-}
diff --git a/src/main/java/org/elasticsearch/cloud/aws/blobstore/S3OutputStream.java b/src/main/java/org/elasticsearch/cloud/aws/blobstore/S3OutputStream.java
new file mode 100644
index 00000000000..a1b66ad4e9b
--- /dev/null
+++ b/src/main/java/org/elasticsearch/cloud/aws/blobstore/S3OutputStream.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.cloud.aws.blobstore;
+
+import org.elasticsearch.common.unit.ByteSizeUnit;
+import org.elasticsearch.common.unit.ByteSizeValue;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+/**
+ * S3OutputStream buffers data before flushing it to the underlying S3 storage, using one or multiple requests.
+ */
+public abstract class S3OutputStream extends OutputStream {
+
+    /**
+     * Part size limits imposed by the AWS S3 multipart upload API.
+     */
+    protected static final ByteSizeValue MULTIPART_MAX_SIZE = new ByteSizeValue(5, ByteSizeUnit.GB);
+    protected static final ByteSizeValue MULTIPART_MIN_SIZE = new ByteSizeValue(5, ByteSizeUnit.MB);
+
+    private S3BlobStore blobStore;
+    private String bucketName;
+    private String blobName;
+    private int numberOfRetries;
+    private boolean serverSideEncryption;
+
+    private byte[] buffer;
+    private int count;
+    private long length;
+
+    private int flushCount = 0;
+
+    public S3OutputStream(S3BlobStore blobStore, String bucketName, String blobName, int bufferSizeInBytes, int numberOfRetries, boolean serverSideEncryption) {
+        this.blobStore = blobStore;
+        this.bucketName = bucketName;
+        this.blobName = blobName;
+        this.numberOfRetries = numberOfRetries;
+        this.serverSideEncryption = serverSideEncryption;
+
+        if (bufferSizeInBytes < MULTIPART_MIN_SIZE.getBytes()) {
+            throw new IllegalArgumentException("Buffer size can't be smaller than " + MULTIPART_MIN_SIZE);
+        }
+        if (bufferSizeInBytes > MULTIPART_MAX_SIZE.getBytes()) {
+            throw new IllegalArgumentException("Buffer size can't be larger than " + MULTIPART_MAX_SIZE);
+        }
+
+        this.buffer = new byte[bufferSizeInBytes];
+    }
+
+    public abstract void flush(byte[] bytes, int off, int len, boolean closing) throws IOException;
+
+    private void flushBuffer(boolean closing) throws IOException {
+        flush(buffer, 0, count, closing);
+        flushCount++;
+        count = 0;
+    }
+
+    @Override
+    public void write(int b) throws IOException {
+        if (count >= buffer.length) {
+            flushBuffer(false);
+        }
+
+        buffer[count++] = (byte) b;
+        length++;
+    }
+
+    @Override
+    public void close() throws IOException {
+        if (count > 0) {
+            flushBuffer(true);
+            count = 0;
+        }
+    }
+
+    public S3BlobStore getBlobStore() {
+        return blobStore;
+    }
+
+    public String getBucketName() {
+        return bucketName;
+    }
+
+    public String getBlobName() {
+        return blobName;
+    }
+
+    public int getBufferSize() {
+        return buffer.length;
+    }
+
+    public int getNumberOfRetries() {
+        return numberOfRetries;
+    }
+
+    public boolean isServerSideEncryption() {
+        return serverSideEncryption;
+    }
+
+    public long getLength() {
+        return length;
+    }
+
+    public int getFlushCount() {
+        return flushCount;
+    }
+}
diff --git a/src/main/java/org/elasticsearch/discovery/ec2/Ec2Discovery.java b/src/main/java/org/elasticsearch/discovery/ec2/Ec2Discovery.java
index 16287e3c752..64bb57ac9fe 100755
--- a/src/main/java/org/elasticsearch/discovery/ec2/Ec2Discovery.java
+++ b/src/main/java/org/elasticsearch/discovery/ec2/Ec2Discovery.java
@@ -19,7 +19,6 @@
 
 package org.elasticsearch.discovery.ec2;
 
-import org.elasticsearch.Version;
 import org.elasticsearch.cloud.aws.AwsEc2Service;
 import org.elasticsearch.cluster.ClusterName;
 import org.elasticsearch.cluster.ClusterService;
@@ -48,7 +47,7 @@ public class Ec2Discovery extends ZenDiscovery {
                         DiscoveryNodeService discoveryNodeService, AwsEc2Service ec2Service, DiscoverySettings discoverySettings,
                         ElectMasterService electMasterService) {
         super(settings, clusterName, threadPool, transportService, clusterService, nodeSettingsService,
-                discoveryNodeService, pingService, electMasterService, Version.CURRENT, discoverySettings);
+                discoveryNodeService, pingService, electMasterService, discoverySettings);
         if (settings.getAsBoolean("cloud.enabled", true)) {
             ImmutableList<? extends ZenPing> zenPings = pingService.zenPings();
             UnicastZenPing unicastZenPing = null;
diff --git a/src/main/java/org/elasticsearch/repositories/s3/S3Repository.java b/src/main/java/org/elasticsearch/repositories/s3/S3Repository.java
index 48096d32e3f..4948364e758 100644
--- a/src/main/java/org/elasticsearch/repositories/s3/S3Repository.java
+++ b/src/main/java/org/elasticsearch/repositories/s3/S3Repository.java
@@ -32,7 +32,6 @@ import org.elasticsearch.repositories.RepositoryException;
 import org.elasticsearch.repositories.RepositoryName;
 import org.elasticsearch.repositories.RepositorySettings;
 import org.elasticsearch.repositories.blobstore.BlobStoreRepository;
-import org.elasticsearch.threadpool.ThreadPool;
 
 import java.io.IOException;
 import java.util.Locale;
@@ -72,7 +71,7 @@ public class S3Repository extends BlobStoreRepository {
      * @throws IOException
      */
     @Inject
-    public S3Repository(RepositoryName name, RepositorySettings repositorySettings, IndexShardRepository indexShardRepository, AwsS3Service s3Service, ThreadPool threadPool) throws IOException {
+    public S3Repository(RepositoryName name, RepositorySettings repositorySettings, IndexShardRepository indexShardRepository, AwsS3Service s3Service) throws IOException {
         super(name.getName(), repositorySettings, indexShardRepository);
 
         String bucket = repositorySettings.settings().get("bucket", componentSettings.get("bucket"));
@@ -120,8 +119,9 @@ public class S3Repository extends BlobStoreRepository {
         }
 
         boolean serverSideEncryption = repositorySettings.settings().getAsBoolean("server_side_encryption", componentSettings.getAsBoolean("server_side_encryption", false));
-        logger.debug("using bucket [{}], region [{}], chunk_size [{}], server_side_encryption [{}]", bucket, region, chunkSize, serverSideEncryption);
-        blobStore = new S3BlobStore(settings, s3Service.client(region, repositorySettings.settings().get("access_key"), repositorySettings.settings().get("secret_key")), bucket, region, threadPool, serverSideEncryption);
+        ByteSizeValue bufferSize = repositorySettings.settings().getAsBytesSize("buffer_size", componentSettings.getAsBytesSize("buffer_size", null));
+        logger.debug("using bucket [{}], region [{}], chunk_size [{}], server_side_encryption [{}], buffer_size [{}]", bucket, region, chunkSize, serverSideEncryption, bufferSize);
+        blobStore = new S3BlobStore(settings, s3Service.client(region, repositorySettings.settings().get("access_key"), repositorySettings.settings().get("secret_key")), bucket, region, serverSideEncryption, bufferSize);
         this.chunkSize = repositorySettings.settings().getAsBytesSize("chunk_size", componentSettings.getAsBytesSize("chunk_size", new ByteSizeValue(100, ByteSizeUnit.MB)));
         this.compress = repositorySettings.settings().getAsBoolean("compress", componentSettings.getAsBoolean("compress", false));
         String basePath = repositorySettings.settings().get("base_path", null);
@@ -165,5 +165,4 @@ public class S3Repository extends BlobStoreRepository {
         return chunkSize;
     }
-
 }
diff --git a/src/test/java/org/elasticsearch/cloud/aws/blobstore/MockDefaultS3OutputStream.java b/src/test/java/org/elasticsearch/cloud/aws/blobstore/MockDefaultS3OutputStream.java
new file mode 100644
index 00000000000..cd2450f0e9d
--- /dev/null
+++ b/src/test/java/org/elasticsearch/cloud/aws/blobstore/MockDefaultS3OutputStream.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.cloud.aws.blobstore;
+
+import com.amazonaws.services.s3.model.AmazonS3Exception;
+import com.amazonaws.services.s3.model.PartETag;
+import com.carrotsearch.randomizedtesting.RandomizedTest;
+import org.elasticsearch.common.io.Streams;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.List;
+
+public class MockDefaultS3OutputStream extends DefaultS3OutputStream {
+
+    private ByteArrayOutputStream out = new ByteArrayOutputStream();
+
+    private boolean initialized = false;
+    private boolean completed = false;
+    private boolean aborted = false;
+
+    private int numberOfUploadRequests = 0;
+
+    public MockDefaultS3OutputStream(int bufferSizeInBytes) {
+        super(null, "test-bucket", "test-blobname", bufferSizeInBytes, 3, false);
+    }
+
+    @Override
+    protected void doUpload(S3BlobStore blobStore, String bucketName, String blobName, InputStream is, int length, boolean serverSideEncryption) throws AmazonS3Exception {
+        try {
+            long copied = Streams.copy(is, out);
+            if (copied != length) {
+                throw new AmazonS3Exception("Not all the bytes were copied");
+            }
+            numberOfUploadRequests++;
+        } catch (IOException e) {
+            throw new AmazonS3Exception(e.getMessage());
+        }
+    }
+
+    @Override
+    protected String doInitialize(S3BlobStore blobStore, String bucketName, String blobName, boolean serverSideEncryption) {
+        initialized = true;
+        return RandomizedTest.randomAsciiOfLength(50);
+    }
+
+    @Override
+    protected PartETag doUploadMultipart(S3BlobStore blobStore, String bucketName, String blobName, String uploadId, InputStream is, int length, boolean lastPart) throws AmazonS3Exception {
+        try {
+            long copied = Streams.copy(is, out);
+            if (copied != length) {
+                throw new AmazonS3Exception("Not all the bytes were copied");
+            }
+            return new PartETag(numberOfUploadRequests++, RandomizedTest.randomAsciiOfLength(50));
+        } catch (IOException e) {
+            throw new AmazonS3Exception(e.getMessage());
+        }
+    }
+
+    @Override
+    protected void doCompleteMultipart(S3BlobStore blobStore, String bucketName, String blobName, String uploadId, List<PartETag> parts) throws AmazonS3Exception {
+        completed = true;
+    }
+
+    @Override
+    protected void doAbortMultipart(S3BlobStore blobStore, String bucketName, String blobName, String uploadId) throws AmazonS3Exception {
+        aborted = true;
+    }
+
+    public int getNumberOfUploadRequests() {
+        return numberOfUploadRequests;
+    }
+
+    public boolean isMultipart() {
+        return (numberOfUploadRequests > 1) && initialized && completed && !aborted;
+    }
+
+    public byte[] toByteArray() {
+        return out.toByteArray();
+    }
+}
diff --git a/src/test/java/org/elasticsearch/cloud/aws/blobstore/S3OutputStreamTest.java b/src/test/java/org/elasticsearch/cloud/aws/blobstore/S3OutputStreamTest.java
new file mode 100644
index 00000000000..d7081a70e9a
--- /dev/null
+++ b/src/test/java/org/elasticsearch/cloud/aws/blobstore/S3OutputStreamTest.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.cloud.aws.blobstore;
+
+import org.elasticsearch.common.base.Charsets;
+import org.elasticsearch.test.ElasticsearchTestCase;
+import org.junit.Test;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.Arrays;
+
+import static org.elasticsearch.common.io.Streams.copy;
+import static org.hamcrest.Matchers.equalTo;
+
+/**
+ * Unit test for {@link S3OutputStream}.
+ */
+public class S3OutputStreamTest extends ElasticsearchTestCase {
+
+    private static final int BUFFER_SIZE = S3BlobStore.MIN_BUFFER_SIZE.bytesAsInt();
+
+    @Test
+    public void testWriteLessDataThanBufferSize() throws IOException {
+        MockDefaultS3OutputStream out = newS3OutputStream(BUFFER_SIZE);
+        byte[] content = randomUnicodeOfLengthBetween(1, 512).getBytes(Charsets.UTF_8);
+        copy(content, out);
+
+        // Checks length & content
+        assertThat(out.getLength(), equalTo((long) content.length));
+        assertThat(Arrays.equals(content, out.toByteArray()), equalTo(true));
+
+        // Checks single/multi part upload
+        assertThat(out.getBufferSize(), equalTo(BUFFER_SIZE));
+        assertThat(out.getFlushCount(), equalTo(1));
+        assertThat(out.getNumberOfUploadRequests(), equalTo(1));
+        assertFalse(out.isMultipart());
+
+    }
+
+    @Test
+    public void testWriteSameDataThanBufferSize() throws IOException {
+        int size = randomIntBetween(BUFFER_SIZE, 10 * BUFFER_SIZE);
+        MockDefaultS3OutputStream out = newS3OutputStream(size);
+
+        ByteArrayOutputStream content = new ByteArrayOutputStream(size);
+        for (int i = 0; i < size; i++) {
+            content.write(randomByte());
+        }
+        copy(content.toByteArray(), out);
+
+        // Checks length & content
+        assertThat(out.getLength(), equalTo((long) size));
+        assertThat(Arrays.equals(content.toByteArray(), out.toByteArray()), equalTo(true));
+
+        // Checks single/multi part upload
+        assertThat(out.getBufferSize(), equalTo(size));
+        assertThat(out.getFlushCount(), equalTo(1));
+        assertThat(out.getNumberOfUploadRequests(), equalTo(1));
+        assertFalse(out.isMultipart());
+
+    }
+
+    @Test
+    public void testWriteExactlyNTimesMoreDataThanBufferSize() throws IOException {
+        int n = randomIntBetween(2, 10);
+        int length = n * BUFFER_SIZE;
+        ByteArrayOutputStream content = new ByteArrayOutputStream(length);
+
+        for (int i = 0; i < length; i++) {
+            content.write(randomByte());
+        }
+
+        MockDefaultS3OutputStream out = newS3OutputStream(BUFFER_SIZE);
+        copy(content.toByteArray(), out);
+
+        // Checks length & content
+        assertThat(out.getLength(), equalTo((long) length));
+        assertThat(Arrays.equals(content.toByteArray(), out.toByteArray()), equalTo(true));
+
+        // Checks single/multi part upload
+        assertThat(out.getBufferSize(), equalTo(BUFFER_SIZE));
+        assertThat(out.getFlushCount(), equalTo(n));
+
+        assertThat(out.getNumberOfUploadRequests(), equalTo(n));
+        assertTrue(out.isMultipart());
+    }
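A quick aside on the arithmetic these tests assert: with a buffer of `buffer_size` bytes, writing `length` bytes produces `ceil(length / buffer_size)` flushes, and the multipart path is taken exactly when more than one flush occurs. The sketch below is illustrative only — the class and method names are invented here and are not part of this change:

```java
// Illustrative only: mirrors the flush/part arithmetic asserted by the
// surrounding tests. Nothing in this class belongs to the plugin's code.
public class FlushMathSketch {

    // Number of flushes S3OutputStream performs for `length` bytes of data.
    static int expectedFlushCount(long length, int bufferSize) {
        return (int) Math.ceil((double) length / bufferSize);
    }

    // DefaultS3OutputStream only switches to the Multipart Upload API when the
    // data does not fit in a single buffer, i.e. when more than one flush happens.
    static boolean expectMultipart(long length, int bufferSize) {
        return expectedFlushCount(length, bufferSize) > 1;
    }

    public static void main(String[] args) {
        final int fiveMb = 5 * 1024 * 1024;                           // MIN_BUFFER_SIZE
        System.out.println(expectedFlushCount(512, fiveMb));          // 1 -> one PutObject request
        System.out.println(expectMultipart(fiveMb, fiveMb));          // false -> still a single flush
        System.out.println(expectedFlushCount(3L * fiveMb, fiveMb));  // 3 -> multipart with 3 parts
    }
}
```

The same ceiling formula appears verbatim in `testWriteRandomNumberOfBytes` below.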
+
+    @Test
+    public void testWriteRandomNumberOfBytes() throws IOException {
+        Integer randomBufferSize = randomIntBetween(BUFFER_SIZE, 5 * BUFFER_SIZE);
+        MockDefaultS3OutputStream out = newS3OutputStream(randomBufferSize);
+
+        Integer randomLength = randomIntBetween(1, 10 * BUFFER_SIZE);
+        ByteArrayOutputStream content = new ByteArrayOutputStream(randomLength);
+        for (int i = 0; i < randomLength; i++) {
+            content.write(randomByte());
+        }
+
+        copy(content.toByteArray(), out);
+
+        // Checks length & content
+        assertThat(out.getLength(), equalTo((long) randomLength));
+        assertThat(Arrays.equals(content.toByteArray(), out.toByteArray()), equalTo(true));
+
+        assertThat(out.getBufferSize(), equalTo(randomBufferSize));
+        int times = (int) Math.ceil(randomLength.doubleValue() / randomBufferSize.doubleValue());
+        assertThat(out.getFlushCount(), equalTo(times));
+        if (times > 1) {
+            assertTrue(out.isMultipart());
+        } else {
+            assertFalse(out.isMultipart());
+        }
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testWrongBufferSize() throws IOException {
+        Integer randomBufferSize = randomIntBetween(1, 4 * 1024 * 1024);
+        MockDefaultS3OutputStream out = newS3OutputStream(randomBufferSize);
+        fail("Buffer size can't be smaller than 5mb");
+    }
+
+    private MockDefaultS3OutputStream newS3OutputStream(int bufferSizeInBytes) {
+        return new MockDefaultS3OutputStream(bufferSizeInBytes);
+    }
+
+}
diff --git a/src/test/java/org/elasticsearch/repositories/s3/AbstractS3SnapshotRestoreTest.java b/src/test/java/org/elasticsearch/repositories/s3/AbstractS3SnapshotRestoreTest.java
index 65d8ce8a5fa..b2e59c8b123 100644
--- a/src/test/java/org/elasticsearch/repositories/s3/AbstractS3SnapshotRestoreTest.java
+++ b/src/test/java/org/elasticsearch/repositories/s3/AbstractS3SnapshotRestoreTest.java
@@ -34,9 +34,9 @@ import org.elasticsearch.cloud.aws.AwsS3Service;
 import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.common.settings.ImmutableSettings;
 import org.elasticsearch.common.settings.Settings;
-import org.elasticsearch.common.util.concurrent.UncategorizedExecutionException;
 import org.elasticsearch.plugins.PluginsService;
 import org.elasticsearch.repositories.RepositoryMissingException;
+import org.elasticsearch.repositories.RepositoryVerificationException;
 import org.elasticsearch.snapshots.SnapshotMissingException;
 import org.elasticsearch.snapshots.SnapshotState;
 import org.elasticsearch.test.ElasticsearchIntegrationTest.ClusterScope;
@@ -247,19 +247,17 @@ abstract public class AbstractS3SnapshotRestoreTest extends AbstractAwsTest {
      * This test verifies that the test configuration is set up in a manner that
      * does not make the test {@link #testRepositoryWithCustomCredentials()} pointless.
      */
-    @Test(expected = UncategorizedExecutionException.class)
+    @Test(expected = RepositoryVerificationException.class)
     public void assertRepositoryWithCustomCredentialsIsNotAccessibleByDefaultCredentials() {
         Client client = client();
         Settings bucketSettings = internalCluster().getInstance(Settings.class).getByPrefix("repositories.s3.private-bucket.");
         logger.info("--> creating s3 repository with bucket[{}] and path [{}]", bucketSettings.get("bucket"), basePath);
-        PutRepositoryResponse putRepositoryResponse = client.admin().cluster().preparePutRepository("test-repo")
+        client.admin().cluster().preparePutRepository("test-repo")
                 .setType("s3").setSettings(ImmutableSettings.settingsBuilder()
                         .put("base_path", basePath)
                         .put("bucket", bucketSettings.get("bucket"))
                 ).get();
-        assertThat(putRepositoryResponse.isAcknowledged(), equalTo(true));
-
-        assertRepositoryIsOperational(client, "test-repo");
+        fail("repository verification should have raised an exception!");
     }
 
     @Test
@@ -284,21 +282,20 @@ abstract public class AbstractS3SnapshotRestoreTest extends AbstractAwsTest {
      * This test verifies that the test configuration is set up in a manner that
      * does not make the test {@link #testRepositoryInRemoteRegion()} pointless.
      */
-    @Test(expected = UncategorizedExecutionException.class)
+    @Test(expected = RepositoryVerificationException.class)
    public void assertRepositoryInRemoteRegionIsRemote() {
         Client client = client();
         Settings bucketSettings = internalCluster().getInstance(Settings.class).getByPrefix("repositories.s3.remote-bucket.");
         logger.info("--> creating s3 repository with bucket[{}] and path [{}]", bucketSettings.get("bucket"), basePath);
-        PutRepositoryResponse putRepositoryResponse = client.admin().cluster().preparePutRepository("test-repo")
+        client.admin().cluster().preparePutRepository("test-repo")
                 .setType("s3").setSettings(ImmutableSettings.settingsBuilder()
                         .put("base_path", basePath)
                         .put("bucket", bucketSettings.get("bucket"))
                         // Below setting intentionally omitted to assert bucket is not available in default region.
                         // .put("region", privateBucketSettings.get("region"))
                 ).get();
-        assertThat(putRepositoryResponse.isAcknowledged(), equalTo(true));
-        assertRepositoryIsOperational(client, "test-repo");
+        fail("repository verification should have raised an exception!");
     }
 
     @Test
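To round out the picture, a hedged sketch of how a caller would exercise the streaming API this change introduces. It assumes an already-initialized `S3BlobStore` named `blobStore`; the path, blob name, and class name are made up for illustration:

```java
import org.elasticsearch.cloud.aws.blobstore.S3BlobStore;
import org.elasticsearch.common.blobstore.BlobContainer;
import org.elasticsearch.common.blobstore.BlobPath;

import java.io.InputStream;
import java.io.OutputStream;

// Illustrative round trip through the new streaming blob container API.
public class BlobRoundTripSketch {

    static void roundTrip(S3BlobStore blobStore) throws Exception {
        BlobContainer container = blobStore.blobContainer(BlobPath.cleanPath().add("base-path"));

        // createOutput returns a DefaultS3OutputStream: bytes are buffered up to
        // buffer_size, then sent either with a single PutObject request (one flush)
        // or through the Multipart Upload API (several flushes).
        try (OutputStream out = container.createOutput("my-blob")) {
            out.write(new byte[]{1, 2, 3});
        }

        // openInput streams the object content back, translating a 404 from S3
        // into a FileNotFoundException.
        try (InputStream in = container.openInput("my-blob")) {
            byte[] buffer = new byte[1024];
            while (in.read(buffer) != -1) {
                // consume the blob
            }
        }
    }
}
```

This replaces the old listener-based `readBlob`/`writeBlob` pattern and removes the need for the snapshot thread pool inside the blob store.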