From 9c4d6c629a5d6c00e9f617c8673dd44281bcc9c5 Mon Sep 17 00:00:00 2001 From: Tanguy Leroux Date: Fri, 10 Nov 2017 12:22:33 +0100 Subject: [PATCH] Remove S3 output stream (#27280) Now the blob size information is available before writing anything, the repository implementation can know upfront what will be the more suitable API to upload the blob to S3. This commit removes the DefaultS3OutputStream and S3OutputStream classes and moves the implementation of the upload logic directly in the S3BlobContainer. related #26993 closes #26969 --- docs/plugins/repository-s3.asciidoc | 7 +- .../s3/DefaultS3OutputStream.java | 223 ----------- .../repositories/s3/S3BlobContainer.java | 194 +++++++++- .../repositories/s3/S3BlobStore.java | 4 +- .../repositories/s3/S3OutputStream.java | 119 ------ .../repositories/s3/S3Repository.java | 30 +- .../s3/MockDefaultS3OutputStream.java | 101 ----- .../s3/S3BlobStoreContainerTests.java | 360 +++++++++++++++++- .../repositories/s3/S3OutputStreamTests.java | 143 ------- 9 files changed, 566 insertions(+), 615 deletions(-) delete mode 100644 plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/DefaultS3OutputStream.java delete mode 100644 plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3OutputStream.java delete mode 100644 plugins/repository-s3/src/test/java/org/elasticsearch/repositories/s3/MockDefaultS3OutputStream.java delete mode 100644 plugins/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3OutputStreamTests.java diff --git a/docs/plugins/repository-s3.asciidoc b/docs/plugins/repository-s3.asciidoc index cb7cc67ddbc..565c94f5a7d 100644 --- a/docs/plugins/repository-s3.asciidoc +++ b/docs/plugins/repository-s3.asciidoc @@ -175,9 +175,10 @@ The following settings are supported: http://docs.aws.amazon.com/AmazonS3/latest/dev/uploadobjusingmpu.html[AWS Multipart Upload API] to split the chunk into several parts, each of `buffer_size` length, and to upload each part in its own request. Note that setting a buffer - size lower than `5mb` is not allowed since it will prevents the use of the - Multipart API and may result in upload errors. Defaults to the minimum - between `100mb` and `5%` of the heap size. + size lower than `5mb` is not allowed since it will prevent the use of the + Multipart API and may result in upload errors. It is also not possible to + set a buffer size greater than `5gb` as it is the maximum upload size + allowed by S3. Defaults to the minimum between `100mb` and `5%` of the heap size. `canned_acl`:: diff --git a/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/DefaultS3OutputStream.java b/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/DefaultS3OutputStream.java deleted file mode 100644 index 811f6e72141..00000000000 --- a/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/DefaultS3OutputStream.java +++ /dev/null @@ -1,223 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. 
- * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.repositories.s3; - -import com.amazonaws.AmazonClientException; -import com.amazonaws.services.s3.model.AbortMultipartUploadRequest; -import com.amazonaws.services.s3.model.AmazonS3Exception; -import com.amazonaws.services.s3.model.CompleteMultipartUploadRequest; -import com.amazonaws.services.s3.model.InitiateMultipartUploadRequest; -import com.amazonaws.services.s3.model.ObjectMetadata; -import com.amazonaws.services.s3.model.PartETag; -import com.amazonaws.services.s3.model.PutObjectRequest; -import com.amazonaws.services.s3.model.PutObjectResult; -import com.amazonaws.services.s3.model.UploadPartRequest; -import com.amazonaws.services.s3.model.UploadPartResult; -import com.amazonaws.util.Base64; -import org.apache.logging.log4j.Logger; -import org.elasticsearch.common.logging.Loggers; -import org.elasticsearch.common.unit.ByteSizeUnit; -import org.elasticsearch.common.unit.ByteSizeValue; - -import java.io.ByteArrayInputStream; -import java.io.IOException; -import java.io.InputStream; -import java.security.DigestInputStream; -import java.security.MessageDigest; -import java.security.NoSuchAlgorithmException; -import java.util.ArrayList; -import java.util.List; - -/** - * DefaultS3OutputStream uploads data to the AWS S3 service using 2 modes: single and multi part. - *

- * When the length of the chunk is lower than buffer_size, the chunk is uploaded with a single request. - * Otherwise multiple requests are made, each of buffer_size (except the last one which can be lower than buffer_size). - *

- * Quick facts about S3: - *

- * Maximum object size: 5 TB - * Maximum number of parts per upload: 10,000 - * Part numbers: 1 to 10,000 (inclusive) - * Part size: 5 MB to 5 GB, last part can be < 5 MB - *

- * See http://docs.aws.amazon.com/AmazonS3/latest/dev/qfacts.html - * See http://docs.aws.amazon.com/AmazonS3/latest/dev/uploadobjusingmpu.html - */ -class DefaultS3OutputStream extends S3OutputStream { - - private static final ByteSizeValue MULTIPART_MAX_SIZE = new ByteSizeValue(5, ByteSizeUnit.GB); - private static final Logger logger = Loggers.getLogger("cloud.aws"); - /** - * Multipart Upload API data - */ - private String multipartId; - private int multipartChunks; - private List multiparts; - - DefaultS3OutputStream(S3BlobStore blobStore, String bucketName, String blobName, int bufferSizeInBytes, boolean serverSideEncryption) { - super(blobStore, bucketName, blobName, bufferSizeInBytes, serverSideEncryption); - } - - @Override - public void flush(byte[] bytes, int off, int len, boolean closing) throws IOException { - SocketAccess.doPrivilegedIOException(() -> { - flushPrivileged(bytes, off, len, closing); - return null; - }); - } - - private void flushPrivileged(byte[] bytes, int off, int len, boolean closing) throws IOException { - if (len > MULTIPART_MAX_SIZE.getBytes()) { - throw new IOException("Unable to upload files larger than " + MULTIPART_MAX_SIZE + " to Amazon S3"); - } - - if (!closing) { - if (len < getBufferSize()) { - upload(bytes, off, len); - } else { - if (getFlushCount() == 0) { - initializeMultipart(); - } - uploadMultipart(bytes, off, len, false); - } - } else { - if (multipartId != null) { - uploadMultipart(bytes, off, len, true); - completeMultipart(); - } else { - upload(bytes, off, len); - } - } - } - - /** - * Upload data using a single request. - */ - private void upload(byte[] bytes, int off, int len) throws IOException { - try (ByteArrayInputStream is = new ByteArrayInputStream(bytes, off, len)) { - try { - doUpload(getBlobStore(), getBucketName(), getBlobName(), is, len, isServerSideEncryption()); - } catch (AmazonClientException e) { - throw new IOException("Unable to upload object " + getBlobName(), e); - } - } - } - - protected void doUpload(S3BlobStore blobStore, String bucketName, String blobName, InputStream is, int length, - boolean serverSideEncryption) throws AmazonS3Exception { - ObjectMetadata md = new ObjectMetadata(); - if (serverSideEncryption) { - md.setSSEAlgorithm(ObjectMetadata.AES_256_SERVER_SIDE_ENCRYPTION); - } - md.setContentLength(length); - - PutObjectRequest putRequest = new PutObjectRequest(bucketName, blobName, is, md) - .withStorageClass(blobStore.getStorageClass()) - .withCannedAcl(blobStore.getCannedACL()); - blobStore.client().putObject(putRequest); - - } - - private void initializeMultipart() { - while (multipartId == null) { - multipartId = doInitialize(getBlobStore(), getBucketName(), getBlobName(), isServerSideEncryption()); - if (multipartId != null) { - multipartChunks = 1; - multiparts = new ArrayList<>(); - } - } - } - - protected String doInitialize(S3BlobStore blobStore, String bucketName, String blobName, boolean serverSideEncryption) { - InitiateMultipartUploadRequest request = new InitiateMultipartUploadRequest(bucketName, blobName) - .withCannedACL(blobStore.getCannedACL()) - .withStorageClass(blobStore.getStorageClass()); - - if (serverSideEncryption) { - ObjectMetadata md = new ObjectMetadata(); - md.setSSEAlgorithm(ObjectMetadata.AES_256_SERVER_SIDE_ENCRYPTION); - request.setObjectMetadata(md); - } - - return blobStore.client().initiateMultipartUpload(request).getUploadId(); - } - - private void uploadMultipart(byte[] bytes, int off, int len, boolean lastPart) throws IOException { - try 
(ByteArrayInputStream is = new ByteArrayInputStream(bytes, off, len)) { - try { - PartETag partETag = doUploadMultipart(getBlobStore(), getBucketName(), getBlobName(), multipartId, is, len, lastPart); - multiparts.add(partETag); - multipartChunks++; - } catch (AmazonClientException e) { - abortMultipart(); - throw e; - } - } - } - - protected PartETag doUploadMultipart(S3BlobStore blobStore, String bucketName, String blobName, String uploadId, InputStream is, - int length, boolean lastPart) throws AmazonS3Exception { - UploadPartRequest request = new UploadPartRequest() - .withBucketName(bucketName) - .withKey(blobName) - .withUploadId(uploadId) - .withPartNumber(multipartChunks) - .withInputStream(is) - .withPartSize(length) - .withLastPart(lastPart); - - UploadPartResult response = blobStore.client().uploadPart(request); - return response.getPartETag(); - - } - - private void completeMultipart() { - try { - doCompleteMultipart(getBlobStore(), getBucketName(), getBlobName(), multipartId, multiparts); - multipartId = null; - return; - } catch (AmazonClientException e) { - abortMultipart(); - throw e; - } - } - - protected void doCompleteMultipart(S3BlobStore blobStore, String bucketName, String blobName, String uploadId, List parts) - throws AmazonS3Exception { - CompleteMultipartUploadRequest request = new CompleteMultipartUploadRequest(bucketName, blobName, uploadId, parts); - blobStore.client().completeMultipartUpload(request); - } - - private void abortMultipart() { - if (multipartId != null) { - try { - doAbortMultipart(getBlobStore(), getBucketName(), getBlobName(), multipartId); - } finally { - multipartId = null; - } - } - } - - protected void doAbortMultipart(S3BlobStore blobStore, String bucketName, String blobName, String uploadId) - throws AmazonS3Exception { - blobStore.client().abortMultipartUpload(new AbortMultipartUploadRequest(bucketName, blobName, uploadId)); - } -} diff --git a/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobContainer.java b/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobContainer.java index f49f4b348f0..bb1130db42d 100644 --- a/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobContainer.java +++ b/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobContainer.java @@ -21,35 +21,48 @@ package org.elasticsearch.repositories.s3; import com.amazonaws.AmazonClientException; import com.amazonaws.services.s3.AmazonS3; +import com.amazonaws.services.s3.model.AbortMultipartUploadRequest; import com.amazonaws.services.s3.model.AmazonS3Exception; +import com.amazonaws.services.s3.model.CompleteMultipartUploadRequest; import com.amazonaws.services.s3.model.CopyObjectRequest; +import com.amazonaws.services.s3.model.InitiateMultipartUploadRequest; import com.amazonaws.services.s3.model.ObjectListing; import com.amazonaws.services.s3.model.ObjectMetadata; +import com.amazonaws.services.s3.model.PartETag; +import com.amazonaws.services.s3.model.PutObjectRequest; import com.amazonaws.services.s3.model.S3Object; import com.amazonaws.services.s3.model.S3ObjectSummary; +import com.amazonaws.services.s3.model.UploadPartRequest; +import com.amazonaws.services.s3.model.UploadPartResult; +import org.apache.lucene.util.SetOnce; import org.elasticsearch.common.Nullable; +import org.elasticsearch.common.Strings; import org.elasticsearch.common.blobstore.BlobMetaData; import org.elasticsearch.common.blobstore.BlobPath; import 
org.elasticsearch.common.blobstore.BlobStoreException; import org.elasticsearch.common.blobstore.support.AbstractBlobContainer; import org.elasticsearch.common.blobstore.support.PlainBlobMetaData; import org.elasticsearch.common.collect.MapBuilder; -import org.elasticsearch.common.io.Streams; +import org.elasticsearch.common.collect.Tuple; import java.io.IOException; import java.io.InputStream; -import java.io.OutputStream; import java.nio.file.FileAlreadyExistsException; import java.nio.file.NoSuchFileException; import java.security.AccessController; import java.security.PrivilegedAction; +import java.util.ArrayList; +import java.util.List; import java.util.Map; +import static org.elasticsearch.repositories.s3.S3Repository.MAX_FILE_SIZE; +import static org.elasticsearch.repositories.s3.S3Repository.MAX_FILE_SIZE_USING_MULTIPART; +import static org.elasticsearch.repositories.s3.S3Repository.MIN_PART_SIZE_USING_MULTIPART; + class S3BlobContainer extends AbstractBlobContainer { - protected final S3BlobStore blobStore; - - protected final String keyPath; + private final S3BlobStore blobStore; + private final String keyPath; S3BlobContainer(BlobPath path, S3BlobStore blobStore) { super(path); @@ -91,9 +104,15 @@ class S3BlobContainer extends AbstractBlobContainer { if (blobExists(blobName)) { throw new FileAlreadyExistsException("blob [" + blobName + "] already exists, cannot overwrite"); } - try (OutputStream stream = createOutput(blobName)) { - Streams.copy(inputStream, stream); - } + + SocketAccess.doPrivilegedIOException(() -> { + if (blobSize <= blobStore.bufferSizeInBytes()) { + executeSingleUpload(blobStore, buildKey(blobName), inputStream, blobSize); + } else { + executeMultipartUpload(blobStore, buildKey(blobName), inputStream, blobSize); + } + return null; + }); } @Override @@ -109,12 +128,6 @@ class S3BlobContainer extends AbstractBlobContainer { } } - private OutputStream createOutput(final String blobName) throws IOException { - // UploadS3OutputStream does buffering & retry logic internally - return new DefaultS3OutputStream(blobStore, blobStore.bucket(), buildKey(blobName), - blobStore.bufferSizeInBytes(), blobStore.serverSideEncryption()); - } - @Override public Map listBlobsByPrefix(@Nullable String blobNamePrefix) throws IOException { return AccessController.doPrivileged((PrivilegedAction>) () -> { @@ -175,7 +188,158 @@ class S3BlobContainer extends AbstractBlobContainer { return listBlobsByPrefix(null); } - protected String buildKey(String blobName) { + private String buildKey(String blobName) { return keyPath + blobName; } + + /** + * Uploads a blob using a single upload request + */ + void executeSingleUpload(final S3BlobStore blobStore, + final String blobName, + final InputStream input, + final long blobSize) throws IOException { + + // Extra safety checks + if (blobSize > MAX_FILE_SIZE.getBytes()) { + throw new IllegalArgumentException("Upload request size [" + blobSize + "] can't be larger than " + MAX_FILE_SIZE); + } + if (blobSize > blobStore.bufferSizeInBytes()) { + throw new IllegalArgumentException("Upload request size [" + blobSize + "] can't be larger than buffer size"); + } + + try { + final ObjectMetadata md = new ObjectMetadata(); + md.setContentLength(blobSize); + if (blobStore.serverSideEncryption()) { + md.setSSEAlgorithm(ObjectMetadata.AES_256_SERVER_SIDE_ENCRYPTION); + } + + final PutObjectRequest putRequest = new PutObjectRequest(blobStore.bucket(), blobName, input, md); + putRequest.setStorageClass(blobStore.getStorageClass()); + 
putRequest.setCannedAcl(blobStore.getCannedACL()); + + blobStore.client().putObject(putRequest); + } catch (AmazonClientException e) { + throw new IOException("Unable to upload object [" + blobName + "] using a single upload", e); + } + } + + /** + * Uploads a blob using multipart upload requests. + */ + void executeMultipartUpload(final S3BlobStore blobStore, + final String blobName, + final InputStream input, + final long blobSize) throws IOException { + + if (blobSize > MAX_FILE_SIZE_USING_MULTIPART.getBytes()) { + throw new IllegalArgumentException("Multipart upload request size [" + blobSize + + "] can't be larger than " + MAX_FILE_SIZE_USING_MULTIPART); + } + if (blobSize < MIN_PART_SIZE_USING_MULTIPART.getBytes()) { + throw new IllegalArgumentException("Multipart upload request size [" + blobSize + + "] can't be smaller than " + MIN_PART_SIZE_USING_MULTIPART); + } + + final long partSize = blobStore.bufferSizeInBytes(); + final Tuple multiparts = numberOfMultiparts(blobSize, partSize); + + if (multiparts.v1() > Integer.MAX_VALUE) { + throw new IllegalArgumentException("Too many multipart upload requests, maybe try a larger buffer size?"); + } + + final int nbParts = multiparts.v1().intValue(); + final long lastPartSize = multiparts.v2(); + assert blobSize == (nbParts - 1) * partSize + lastPartSize : "blobSize does not match multipart sizes"; + + final SetOnce uploadId = new SetOnce<>(); + final String bucketName = blobStore.bucket(); + boolean success = false; + + try { + final InitiateMultipartUploadRequest initRequest = new InitiateMultipartUploadRequest(bucketName, blobName); + initRequest.setStorageClass(blobStore.getStorageClass()); + initRequest.setCannedACL(blobStore.getCannedACL()); + if (blobStore.serverSideEncryption()) { + final ObjectMetadata md = new ObjectMetadata(); + md.setSSEAlgorithm(ObjectMetadata.AES_256_SERVER_SIDE_ENCRYPTION); + initRequest.setObjectMetadata(md); + } + + uploadId.set(blobStore.client().initiateMultipartUpload(initRequest).getUploadId()); + if (Strings.isEmpty(uploadId.get())) { + throw new IOException("Failed to initialize multipart upload " + blobName); + } + + final List parts = new ArrayList<>(); + + long bytesCount = 0; + for (int i = 1; i <= nbParts; i++) { + final UploadPartRequest uploadRequest = new UploadPartRequest(); + uploadRequest.setBucketName(bucketName); + uploadRequest.setKey(blobName); + uploadRequest.setUploadId(uploadId.get()); + uploadRequest.setPartNumber(i); + uploadRequest.setInputStream(input); + + if (i < nbParts) { + uploadRequest.setPartSize(partSize); + uploadRequest.setLastPart(false); + } else { + uploadRequest.setPartSize(lastPartSize); + uploadRequest.setLastPart(true); + } + bytesCount += uploadRequest.getPartSize(); + + final UploadPartResult uploadResponse = blobStore.client().uploadPart(uploadRequest); + parts.add(uploadResponse.getPartETag()); + } + + if (bytesCount != blobSize) { + throw new IOException("Failed to execute multipart upload for [" + blobName + "], expected " + blobSize + + "bytes sent but got " + bytesCount); + } + + CompleteMultipartUploadRequest complRequest = new CompleteMultipartUploadRequest(bucketName, blobName, uploadId.get(), parts); + blobStore.client().completeMultipartUpload(complRequest); + success = true; + + } catch (AmazonClientException e) { + throw new IOException("Unable to upload object [" + blobName + "] using multipart upload", e); + } finally { + if (success == false && Strings.hasLength(uploadId.get())) { + final AbortMultipartUploadRequest abortRequest = new 
AbortMultipartUploadRequest(bucketName, blobName, uploadId.get()); + blobStore.client().abortMultipartUpload(abortRequest); + } + } + } + + /** + * Returns the number parts of size of {@code partSize} needed to reach {@code totalSize}, + * along with the size of the last (or unique) part. + * + * @param totalSize the total size + * @param partSize the part size + * @return a {@link Tuple} containing the number of parts to fill {@code totalSize} and + * the size of the last part + */ + static Tuple numberOfMultiparts(final long totalSize, final long partSize) { + if (partSize <= 0) { + throw new IllegalArgumentException("Part size must be greater than zero"); + } + + if (totalSize == 0L || totalSize <= partSize) { + return Tuple.tuple(1L, totalSize); + } + + final long parts = totalSize / partSize; + final long remaining = totalSize % partSize; + + if (remaining == 0) { + return Tuple.tuple(parts, partSize); + } else { + return Tuple.tuple(parts + 1, remaining); + } + } } diff --git a/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobStore.java b/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobStore.java index d951b31c07d..27349f12135 100644 --- a/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobStore.java +++ b/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobStore.java @@ -93,8 +93,8 @@ class S3BlobStore extends AbstractComponent implements BlobStore { return serverSideEncryption; } - public int bufferSizeInBytes() { - return bufferSize.bytesAsInt(); + public long bufferSizeInBytes() { + return bufferSize.getBytes(); } @Override diff --git a/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3OutputStream.java b/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3OutputStream.java deleted file mode 100644 index 46c9108f1b5..00000000000 --- a/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3OutputStream.java +++ /dev/null @@ -1,119 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.repositories.s3; - -import org.elasticsearch.common.unit.ByteSizeUnit; -import org.elasticsearch.common.unit.ByteSizeValue; - -import java.io.IOException; -import java.io.OutputStream; - -/** - * S3OutputStream buffers data before flushing it to an underlying S3OutputStream. - */ -abstract class S3OutputStream extends OutputStream { - - /** - * Limit of upload allowed by AWS S3. 
- */ - protected static final ByteSizeValue MULTIPART_MAX_SIZE = new ByteSizeValue(5, ByteSizeUnit.GB); - protected static final ByteSizeValue MULTIPART_MIN_SIZE = new ByteSizeValue(5, ByteSizeUnit.MB); - - private S3BlobStore blobStore; - private String bucketName; - private String blobName; - private boolean serverSideEncryption; - - private byte[] buffer; - private int count; - private long length; - - private int flushCount = 0; - - S3OutputStream(S3BlobStore blobStore, String bucketName, String blobName, int bufferSizeInBytes, boolean serverSideEncryption) { - this.blobStore = blobStore; - this.bucketName = bucketName; - this.blobName = blobName; - this.serverSideEncryption = serverSideEncryption; - - if (bufferSizeInBytes < MULTIPART_MIN_SIZE.getBytes()) { - throw new IllegalArgumentException("Buffer size can't be smaller than " + MULTIPART_MIN_SIZE); - } - if (bufferSizeInBytes > MULTIPART_MAX_SIZE.getBytes()) { - throw new IllegalArgumentException("Buffer size can't be larger than " + MULTIPART_MAX_SIZE); - } - - this.buffer = new byte[bufferSizeInBytes]; - } - - public abstract void flush(byte[] bytes, int off, int len, boolean closing) throws IOException; - - private void flushBuffer(boolean closing) throws IOException { - flush(buffer, 0, count, closing); - flushCount++; - count = 0; - } - - @Override - public void write(int b) throws IOException { - if (count >= buffer.length) { - flushBuffer(false); - } - - buffer[count++] = (byte) b; - length++; - } - - @Override - public void close() throws IOException { - if (count > 0) { - flushBuffer(true); - count = 0; - } - } - - public S3BlobStore getBlobStore() { - return blobStore; - } - - public String getBucketName() { - return bucketName; - } - - public String getBlobName() { - return blobName; - } - - public int getBufferSize() { - return buffer.length; - } - - public boolean isServerSideEncryption() { - return serverSideEncryption; - } - - public long getLength() { - return length; - } - - public int getFlushCount() { - return flushCount; - } -} diff --git a/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3Repository.java b/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3Repository.java index eeca906ff49..51bb6f2024c 100644 --- a/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3Repository.java +++ b/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3Repository.java @@ -19,8 +19,6 @@ package org.elasticsearch.repositories.s3; -import java.io.IOException; - import com.amazonaws.services.s3.AmazonS3; import org.elasticsearch.cluster.metadata.RepositoryMetaData; import org.elasticsearch.common.Strings; @@ -37,6 +35,8 @@ import org.elasticsearch.monitor.jvm.JvmInfo; import org.elasticsearch.repositories.RepositoryException; import org.elasticsearch.repositories.blobstore.BlobStoreRepository; +import java.io.IOException; + /** * Shared file system implementation of the BlobStoreRepository *

@@ -80,14 +80,36 @@ class S3Repository extends BlobStoreRepository { */ static final Setting SERVER_SIDE_ENCRYPTION_SETTING = Setting.boolSetting("server_side_encryption", false); + /** + * Maximum size of files that can be uploaded using a single upload request. + */ + static final ByteSizeValue MAX_FILE_SIZE = new ByteSizeValue(5, ByteSizeUnit.GB); + + /** + * Minimum size of parts that can be uploaded using the Multipart Upload API. + * (see http://docs.aws.amazon.com/AmazonS3/latest/dev/qfacts.html) + */ + static final ByteSizeValue MIN_PART_SIZE_USING_MULTIPART = new ByteSizeValue(5, ByteSizeUnit.MB); + + /** + * Maximum size of parts that can be uploaded using the Multipart Upload API. + * (see http://docs.aws.amazon.com/AmazonS3/latest/dev/qfacts.html) + */ + static final ByteSizeValue MAX_PART_SIZE_USING_MULTIPART = MAX_FILE_SIZE; + + /** + * Maximum size of files that can be uploaded using the Multipart Upload API. + */ + static final ByteSizeValue MAX_FILE_SIZE_USING_MULTIPART = new ByteSizeValue(5, ByteSizeUnit.TB); + /** * Minimum threshold below which the chunk is uploaded using a single request. Beyond this threshold, * the S3 repository will use the AWS Multipart Upload API to split the chunk into several parts, each of buffer_size length, and * to upload each part in its own request. Note that setting a buffer size lower than 5mb is not allowed since it will prevents the * use of the Multipart API and may result in upload errors. Defaults to the minimum between 100MB and 5% of the heap size. */ - static final Setting BUFFER_SIZE_SETTING = Setting.byteSizeSetting("buffer_size", DEFAULT_BUFFER_SIZE, - new ByteSizeValue(5, ByteSizeUnit.MB), new ByteSizeValue(5, ByteSizeUnit.TB)); + static final Setting BUFFER_SIZE_SETTING = + Setting.byteSizeSetting("buffer_size", DEFAULT_BUFFER_SIZE, MIN_PART_SIZE_USING_MULTIPART, MAX_PART_SIZE_USING_MULTIPART); /** * Big files can be broken down into chunks during snapshotting if needed. Defaults to 1g. diff --git a/plugins/repository-s3/src/test/java/org/elasticsearch/repositories/s3/MockDefaultS3OutputStream.java b/plugins/repository-s3/src/test/java/org/elasticsearch/repositories/s3/MockDefaultS3OutputStream.java deleted file mode 100644 index 3a48b70e307..00000000000 --- a/plugins/repository-s3/src/test/java/org/elasticsearch/repositories/s3/MockDefaultS3OutputStream.java +++ /dev/null @@ -1,101 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - -package org.elasticsearch.repositories.s3; - -import com.amazonaws.services.s3.model.AmazonS3Exception; -import com.amazonaws.services.s3.model.PartETag; -import com.carrotsearch.randomizedtesting.RandomizedTest; -import org.elasticsearch.common.io.Streams; -import org.elasticsearch.repositories.s3.DefaultS3OutputStream; -import org.elasticsearch.repositories.s3.S3BlobStore; - -import java.io.ByteArrayOutputStream; -import java.io.IOException; -import java.io.InputStream; -import java.util.List; - -public class MockDefaultS3OutputStream extends DefaultS3OutputStream { - - private ByteArrayOutputStream out = new ByteArrayOutputStream(); - - private boolean initialized = false; - private boolean completed = false; - private boolean aborted = false; - - private int numberOfUploadRequests = 0; - - public MockDefaultS3OutputStream(int bufferSizeInBytes) { - super(null, "test-bucket", "test-blobname", bufferSizeInBytes, false); - } - - @Override - protected void doUpload(S3BlobStore blobStore, String bucketName, String blobName, InputStream is, int length, boolean serverSideEncryption) throws AmazonS3Exception { - try { - long copied = Streams.copy(is, out); - if (copied != length) { - throw new AmazonS3Exception("Not all the bytes were copied"); - } - numberOfUploadRequests++; - } catch (IOException e) { - throw new AmazonS3Exception(e.getMessage()); - } - } - - @Override - protected String doInitialize(S3BlobStore blobStore, String bucketName, String blobName, boolean serverSideEncryption) { - initialized = true; - return RandomizedTest.randomAsciiOfLength(50); - } - - @Override - protected PartETag doUploadMultipart(S3BlobStore blobStore, String bucketName, String blobName, String uploadId, InputStream is, int length, boolean lastPart) throws AmazonS3Exception { - try { - long copied = Streams.copy(is, out); - if (copied != length) { - throw new AmazonS3Exception("Not all the bytes were copied"); - } - return new PartETag(numberOfUploadRequests++, RandomizedTest.randomAsciiOfLength(50)); - } catch (IOException e) { - throw new AmazonS3Exception(e.getMessage()); - } - } - - @Override - protected void doCompleteMultipart(S3BlobStore blobStore, String bucketName, String blobName, String uploadId, List parts) throws AmazonS3Exception { - completed = true; - } - - @Override - protected void doAbortMultipart(S3BlobStore blobStore, String bucketName, String blobName, String uploadId) throws AmazonS3Exception { - aborted = true; - } - - public int getNumberOfUploadRequests() { - return numberOfUploadRequests; - } - - public boolean isMultipart() { - return (numberOfUploadRequests > 1) && initialized && completed && !aborted; - } - - public byte[] toByteArray() { - return out.toByteArray(); - } -} diff --git a/plugins/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3BlobStoreContainerTests.java b/plugins/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3BlobStoreContainerTests.java index 45ffac30aa7..c8d546b0974 100644 --- a/plugins/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3BlobStoreContainerTests.java +++ b/plugins/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3BlobStoreContainerTests.java @@ -19,10 +19,24 @@ package org.elasticsearch.repositories.s3; -import org.apache.logging.log4j.Level; -import org.apache.logging.log4j.Logger; +import com.amazonaws.AmazonClientException; +import com.amazonaws.services.s3.AmazonS3; +import com.amazonaws.services.s3.model.AbortMultipartUploadRequest; +import 
com.amazonaws.services.s3.model.CannedAccessControlList; +import com.amazonaws.services.s3.model.CompleteMultipartUploadRequest; +import com.amazonaws.services.s3.model.CompleteMultipartUploadResult; +import com.amazonaws.services.s3.model.InitiateMultipartUploadRequest; +import com.amazonaws.services.s3.model.InitiateMultipartUploadResult; +import com.amazonaws.services.s3.model.ObjectMetadata; +import com.amazonaws.services.s3.model.PartETag; +import com.amazonaws.services.s3.model.PutObjectRequest; +import com.amazonaws.services.s3.model.PutObjectResult; +import com.amazonaws.services.s3.model.StorageClass; +import com.amazonaws.services.s3.model.UploadPartRequest; +import com.amazonaws.services.s3.model.UploadPartResult; +import org.elasticsearch.common.blobstore.BlobPath; import org.elasticsearch.common.blobstore.BlobStore; -import org.elasticsearch.common.logging.Loggers; +import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeValue; @@ -30,16 +44,29 @@ import org.elasticsearch.mocksocket.MockServerSocket; import org.elasticsearch.repositories.ESBlobStoreContainerTestCase; import org.junit.AfterClass; import org.junit.BeforeClass; +import org.mockito.ArgumentCaptor; +import java.io.ByteArrayInputStream; import java.io.IOException; import java.net.InetAddress; import java.net.ServerSocket; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; import java.util.Locale; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +import static org.hamcrest.Matchers.instanceOf; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; public class S3BlobStoreContainerTests extends ESBlobStoreContainerTestCase { - private static final Logger logger = Loggers.getLogger(S3BlobStoreContainerTests.class); - private static ServerSocket mockS3ServerSocket; private static Thread mockS3AcceptorThread; @@ -69,6 +96,329 @@ public class S3BlobStoreContainerTests extends ESBlobStoreContainerTestCase { new ByteSizeValue(10, ByteSizeUnit.MB), "public-read-write", "standard"); } + public void testExecuteSingleUploadBlobSizeTooLarge() throws IOException { + final long blobSize = ByteSizeUnit.GB.toBytes(randomIntBetween(6, 10)); + final S3BlobStore blobStore = mock(S3BlobStore.class); + final S3BlobContainer blobContainer = new S3BlobContainer(mock(BlobPath.class), blobStore); + + IllegalArgumentException e = expectThrows(IllegalArgumentException.class, () -> + blobContainer.executeSingleUpload(blobStore, randomAlphaOfLengthBetween(1, 10), null, blobSize)); + assertEquals("Upload request size [" + blobSize + "] can't be larger than 5gb", e.getMessage()); + } + + public void testExecuteSingleUploadBlobSizeLargerThanBufferSize() throws IOException { + final S3BlobStore blobStore = mock(S3BlobStore.class); + when(blobStore.bufferSizeInBytes()).thenReturn(ByteSizeUnit.MB.toBytes(1)); + + final S3BlobContainer blobContainer = new S3BlobContainer(mock(BlobPath.class), blobStore); + final String blobName = randomAlphaOfLengthBetween(1, 10); + + IllegalArgumentException e = expectThrows(IllegalArgumentException.class, () -> + blobContainer.executeSingleUpload(blobStore, blobName, new ByteArrayInputStream(new byte[0]), 
ByteSizeUnit.MB.toBytes(2))); + assertEquals("Upload request size [2097152] can't be larger than buffer size", e.getMessage()); + } + + public void testExecuteSingleUpload() throws IOException { + final String bucketName = randomAlphaOfLengthBetween(1, 10); + final String blobName = randomAlphaOfLengthBetween(1, 10); + + final BlobPath blobPath = new BlobPath(); + if (randomBoolean()) { + IntStream.of(randomIntBetween(1, 5)).forEach(value -> blobPath.add("path_" + value)); + } + + final int bufferSize = randomIntBetween(1024, 2048); + final int blobSize = randomIntBetween(0, bufferSize); + + final S3BlobStore blobStore = mock(S3BlobStore.class); + when(blobStore.bucket()).thenReturn(bucketName); + when(blobStore.bufferSizeInBytes()).thenReturn((long) bufferSize); + + final S3BlobContainer blobContainer = new S3BlobContainer(blobPath, blobStore); + + final boolean serverSideEncryption = randomBoolean(); + when(blobStore.serverSideEncryption()).thenReturn(serverSideEncryption); + + final StorageClass storageClass = randomFrom(StorageClass.values()); + when(blobStore.getStorageClass()).thenReturn(storageClass); + + final CannedAccessControlList cannedAccessControlList = randomBoolean() ? randomFrom(CannedAccessControlList.values()) : null; + if (cannedAccessControlList != null) { + when(blobStore.getCannedACL()).thenReturn(cannedAccessControlList); + } + + final AmazonS3 client = mock(AmazonS3.class); + when(blobStore.client()).thenReturn(client); + + final ArgumentCaptor argumentCaptor = ArgumentCaptor.forClass(PutObjectRequest.class); + when(client.putObject(argumentCaptor.capture())).thenReturn(new PutObjectResult()); + + final ByteArrayInputStream inputStream = new ByteArrayInputStream(new byte[blobSize]); + blobContainer.executeSingleUpload(blobStore, blobName, inputStream, blobSize); + + final PutObjectRequest request = argumentCaptor.getValue(); + assertEquals(bucketName, request.getBucketName()); + assertEquals(blobPath.buildAsString() + blobName, request.getKey()); + assertEquals(inputStream, request.getInputStream()); + assertEquals(blobSize, request.getMetadata().getContentLength()); + assertEquals(storageClass.toString(), request.getStorageClass()); + assertEquals(cannedAccessControlList, request.getCannedAcl()); + if (serverSideEncryption) { + assertEquals(ObjectMetadata.AES_256_SERVER_SIDE_ENCRYPTION, request.getMetadata().getSSEAlgorithm()); + } + } + + public void testExecuteMultipartUploadBlobSizeTooLarge() throws IOException { + final long blobSize = ByteSizeUnit.TB.toBytes(randomIntBetween(6, 10)); + final S3BlobStore blobStore = mock(S3BlobStore.class); + final S3BlobContainer blobContainer = new S3BlobContainer(mock(BlobPath.class), blobStore); + + IllegalArgumentException e = expectThrows(IllegalArgumentException.class, () -> + blobContainer.executeMultipartUpload(blobStore, randomAlphaOfLengthBetween(1, 10), null, blobSize) + ); + assertEquals("Multipart upload request size [" + blobSize + "] can't be larger than 5tb", e.getMessage()); + } + + public void testExecuteMultipartUploadBlobSizeTooSmall() throws IOException { + final long blobSize = ByteSizeUnit.MB.toBytes(randomIntBetween(1, 4)); + final S3BlobStore blobStore = mock(S3BlobStore.class); + final S3BlobContainer blobContainer = new S3BlobContainer(mock(BlobPath.class), blobStore); + + IllegalArgumentException e = expectThrows(IllegalArgumentException.class, () -> + blobContainer.executeMultipartUpload(blobStore, randomAlphaOfLengthBetween(1, 10), null, blobSize) + ); + assertEquals("Multipart upload request 
size [" + blobSize + "] can't be smaller than 5mb", e.getMessage()); + } + + public void testExecuteMultipartUpload() throws IOException { + final String bucketName = randomAlphaOfLengthBetween(1, 10); + final String blobName = randomAlphaOfLengthBetween(1, 10); + + final BlobPath blobPath = new BlobPath(); + if (randomBoolean()) { + IntStream.of(randomIntBetween(1, 5)).forEach(value -> blobPath.add("path_" + value)); + } + + final long blobSize = ByteSizeUnit.GB.toBytes(randomIntBetween(1, 1024)); + final long bufferSize = ByteSizeUnit.MB.toBytes(randomIntBetween(5, 1024)); + + final S3BlobStore blobStore = mock(S3BlobStore.class); + when(blobStore.bucket()).thenReturn(bucketName); + when(blobStore.bufferSizeInBytes()).thenReturn(bufferSize); + + final boolean serverSideEncryption = randomBoolean(); + when(blobStore.serverSideEncryption()).thenReturn(serverSideEncryption); + + final StorageClass storageClass = randomFrom(StorageClass.values()); + when(blobStore.getStorageClass()).thenReturn(storageClass); + + final CannedAccessControlList cannedAccessControlList = randomBoolean() ? randomFrom(CannedAccessControlList.values()) : null; + if (cannedAccessControlList != null) { + when(blobStore.getCannedACL()).thenReturn(cannedAccessControlList); + } + + final AmazonS3 client = mock(AmazonS3.class); + when(blobStore.client()).thenReturn(client); + + final ArgumentCaptor initArgCaptor = ArgumentCaptor.forClass(InitiateMultipartUploadRequest.class); + final InitiateMultipartUploadResult initResult = new InitiateMultipartUploadResult(); + initResult.setUploadId(randomAlphaOfLength(10)); + when(client.initiateMultipartUpload(initArgCaptor.capture())).thenReturn(initResult); + + final ArgumentCaptor uploadArgCaptor = ArgumentCaptor.forClass(UploadPartRequest.class); + + final List expectedEtags = new ArrayList<>(); + long partSize = Math.min(bufferSize, blobSize); + long totalBytes = 0; + do { + expectedEtags.add(randomAlphaOfLength(50)); + totalBytes += partSize; + } while (totalBytes < blobSize); + + when(client.uploadPart(uploadArgCaptor.capture())).thenAnswer(invocationOnMock -> { + final UploadPartRequest request = (UploadPartRequest) invocationOnMock.getArguments()[0]; + final UploadPartResult response = new UploadPartResult(); + response.setPartNumber(request.getPartNumber()); + response.setETag(expectedEtags.get(request.getPartNumber() - 1)); + return response; + }); + + final ArgumentCaptor compArgCaptor = ArgumentCaptor.forClass(CompleteMultipartUploadRequest.class); + when(client.completeMultipartUpload(compArgCaptor.capture())).thenReturn(new CompleteMultipartUploadResult()); + + final ByteArrayInputStream inputStream = new ByteArrayInputStream(new byte[0]); + final S3BlobContainer blobContainer = new S3BlobContainer(blobPath, blobStore); + blobContainer.executeMultipartUpload(blobStore, blobName, inputStream, blobSize); + + final InitiateMultipartUploadRequest initRequest = initArgCaptor.getValue(); + assertEquals(bucketName, initRequest.getBucketName()); + assertEquals(blobPath.buildAsString() + blobName, initRequest.getKey()); + assertEquals(storageClass, initRequest.getStorageClass()); + assertEquals(cannedAccessControlList, initRequest.getCannedACL()); + if (serverSideEncryption) { + assertEquals(ObjectMetadata.AES_256_SERVER_SIDE_ENCRYPTION, initRequest.getObjectMetadata().getSSEAlgorithm()); + } + + final Tuple numberOfParts = S3BlobContainer.numberOfMultiparts(blobSize, bufferSize); + + final List uploadRequests = uploadArgCaptor.getAllValues(); + 
assertEquals(numberOfParts.v1().intValue(), uploadRequests.size()); + + for (int i = 0; i < uploadRequests.size(); i++) { + UploadPartRequest uploadRequest = uploadRequests.get(i); + + assertEquals(bucketName, uploadRequest.getBucketName()); + assertEquals(blobPath.buildAsString() + blobName, uploadRequest.getKey()); + assertEquals(initResult.getUploadId(), uploadRequest.getUploadId()); + assertEquals(i + 1, uploadRequest.getPartNumber()); + assertEquals(inputStream, uploadRequest.getInputStream()); + + if (i == (uploadRequests.size() -1)) { + assertTrue(uploadRequest.isLastPart()); + assertEquals(numberOfParts.v2().longValue(), uploadRequest.getPartSize()); + } else { + assertFalse(uploadRequest.isLastPart()); + assertEquals(bufferSize, uploadRequest.getPartSize()); + } + } + + final CompleteMultipartUploadRequest compRequest = compArgCaptor.getValue(); + assertEquals(bucketName, compRequest.getBucketName()); + assertEquals(blobPath.buildAsString() + blobName, compRequest.getKey()); + assertEquals(initResult.getUploadId(), compRequest.getUploadId()); + + List actualETags = compRequest.getPartETags().stream().map(PartETag::getETag).collect(Collectors.toList()); + assertEquals(expectedEtags, actualETags); + } + + public void testExecuteMultipartUploadAborted() throws IOException { + final String bucketName = randomAlphaOfLengthBetween(1, 10); + final String blobName = randomAlphaOfLengthBetween(1, 10); + final BlobPath blobPath = new BlobPath(); + + final long blobSize = ByteSizeUnit.MB.toBytes(765); + final long bufferSize = ByteSizeUnit.MB.toBytes(150); + + final S3BlobStore blobStore = mock(S3BlobStore.class); + when(blobStore.bucket()).thenReturn(bucketName); + when(blobStore.bufferSizeInBytes()).thenReturn(bufferSize); + when(blobStore.getStorageClass()).thenReturn(randomFrom(StorageClass.values())); + + final AmazonS3 client = mock(AmazonS3.class); + when(blobStore.client()).thenReturn(client); + + final String uploadId = randomAlphaOfLength(25); + + final int stage = randomInt(2); + final List exceptions = Arrays.asList( + new AmazonClientException("Expected initialization request to fail"), + new AmazonClientException("Expected upload part request to fail"), + new AmazonClientException("Expected completion request to fail") + ); + + if (stage == 0) { + // Fail the initialization request + when(client.initiateMultipartUpload(any(InitiateMultipartUploadRequest.class))) + .thenThrow(exceptions.get(stage)); + + } else if (stage == 1) { + final InitiateMultipartUploadResult initResult = new InitiateMultipartUploadResult(); + initResult.setUploadId(uploadId); + when(client.initiateMultipartUpload(any(InitiateMultipartUploadRequest.class))).thenReturn(initResult); + + // Fail the upload part request + when(client.uploadPart(any(UploadPartRequest.class))) + .thenThrow(exceptions.get(stage)); + + } else { + final InitiateMultipartUploadResult initResult = new InitiateMultipartUploadResult(); + initResult.setUploadId(uploadId); + when(client.initiateMultipartUpload(any(InitiateMultipartUploadRequest.class))).thenReturn(initResult); + + when(client.uploadPart(any(UploadPartRequest.class))).thenAnswer(invocationOnMock -> { + final UploadPartRequest request = (UploadPartRequest) invocationOnMock.getArguments()[0]; + final UploadPartResult response = new UploadPartResult(); + response.setPartNumber(request.getPartNumber()); + response.setETag(randomAlphaOfLength(20)); + return response; + }); + + // Fail the completion request + 
when(client.completeMultipartUpload(any(CompleteMultipartUploadRequest.class))) + .thenThrow(exceptions.get(stage)); + } + + final ArgumentCaptor argumentCaptor = ArgumentCaptor.forClass(AbortMultipartUploadRequest.class); + doNothing().when(client).abortMultipartUpload(argumentCaptor.capture()); + + final IOException e = expectThrows(IOException.class, () -> { + final S3BlobContainer blobContainer = new S3BlobContainer(blobPath, blobStore); + blobContainer.executeMultipartUpload(blobStore, blobName, new ByteArrayInputStream(new byte[0]), blobSize); + }); + + assertEquals("Unable to upload object [" + blobName + "] using multipart upload", e.getMessage()); + assertThat(e.getCause(), instanceOf(AmazonClientException.class)); + assertEquals(exceptions.get(stage).getMessage(), e.getCause().getMessage()); + + if (stage == 0) { + verify(client, times(1)).initiateMultipartUpload(any(InitiateMultipartUploadRequest.class)); + verify(client, times(0)).uploadPart(any(UploadPartRequest.class)); + verify(client, times(0)).completeMultipartUpload(any(CompleteMultipartUploadRequest.class)); + verify(client, times(0)).abortMultipartUpload(any(AbortMultipartUploadRequest.class)); + + } else { + verify(client, times(1)).initiateMultipartUpload(any(InitiateMultipartUploadRequest.class)); + + if (stage == 1) { + verify(client, times(1)).uploadPart(any(UploadPartRequest.class)); + verify(client, times(0)).completeMultipartUpload(any(CompleteMultipartUploadRequest.class)); + } else { + verify(client, times(6)).uploadPart(any(UploadPartRequest.class)); + verify(client, times(1)).completeMultipartUpload(any(CompleteMultipartUploadRequest.class)); + } + + verify(client, times(1)).abortMultipartUpload(any(AbortMultipartUploadRequest.class)); + + final AbortMultipartUploadRequest abortRequest = argumentCaptor.getValue(); + assertEquals(bucketName, abortRequest.getBucketName()); + assertEquals(blobName, abortRequest.getKey()); + assertEquals(uploadId, abortRequest.getUploadId()); + } + } + + public void testNumberOfMultipartsWithZeroPartSize() { + IllegalArgumentException e = + expectThrows(IllegalArgumentException.class, () -> S3BlobContainer.numberOfMultiparts(randomNonNegativeLong(), 0L)); + assertEquals("Part size must be greater than zero", e.getMessage()); + } + + public void testNumberOfMultiparts() { + final ByteSizeUnit unit = randomFrom(ByteSizeUnit.BYTES, ByteSizeUnit.KB, ByteSizeUnit.MB, ByteSizeUnit.GB); + final long size = unit.toBytes(randomIntBetween(1, 10)); + final int factor = randomIntBetween(2, 10); + + // Fits in 1 empty part + assertNumberOfMultiparts(1, 0L, 0L, size); + + // Fits in 1 part exactly + assertNumberOfMultiparts(1, size, size, size); + assertNumberOfMultiparts(1, size, size, size * factor); + + // Fits in N parts exactly + assertNumberOfMultiparts(factor, size, size * factor, size); + + // Fits in N parts plus a bit more + final long remaining = randomIntBetween(1, (size > Integer.MAX_VALUE) ? 
Integer.MAX_VALUE : (int) size - 1); + assertNumberOfMultiparts(factor + 1, remaining, size * factor + remaining, size); + } + + private static void assertNumberOfMultiparts(final int expectedParts, final long expectedRemaining, long totalSize, long partSize) { + final Tuple result = S3BlobContainer.numberOfMultiparts(totalSize, partSize); + + assertEquals("Expected number of parts [" + expectedParts + "] but got [" + result.v1() + "]", expectedParts, (long) result.v1()); + assertEquals("Expected remaining [" + expectedRemaining + "] but got [" + result.v2() + "]", expectedRemaining, (long) result.v2()); + } + @AfterClass public static void closeMockSocket() throws IOException, InterruptedException { mockS3ServerSocket.close(); diff --git a/plugins/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3OutputStreamTests.java b/plugins/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3OutputStreamTests.java deleted file mode 100644 index 8f4c7daea7e..00000000000 --- a/plugins/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3OutputStreamTests.java +++ /dev/null @@ -1,143 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.repositories.s3; - -import org.elasticsearch.common.unit.ByteSizeUnit; -import org.elasticsearch.common.unit.ByteSizeValue; -import org.elasticsearch.test.ESTestCase; - -import java.io.ByteArrayOutputStream; -import java.io.IOException; -import java.util.Arrays; - -import static org.elasticsearch.common.io.Streams.copy; -import static org.hamcrest.Matchers.equalTo; -import static org.hamcrest.Matchers.is; - -/** - * Unit test for {@link S3OutputStream}. 
- */ -public class S3OutputStreamTests extends ESTestCase { - private static final int BUFFER_SIZE = new ByteSizeValue(5, ByteSizeUnit.MB).bytesAsInt(); - - public void testWriteLessDataThanBufferSize() throws IOException { - MockDefaultS3OutputStream out = newS3OutputStream(BUFFER_SIZE); - byte[] content = randomUnicodeOfLengthBetween(1, 512).getBytes("UTF-8"); - copy(content, out); - - // Checks length & content - assertThat(out.getLength(), equalTo((long) content.length)); - assertThat(Arrays.equals(content, out.toByteArray()), equalTo(true)); - - // Checks single/multi part upload - assertThat(out.getBufferSize(), equalTo(BUFFER_SIZE)); - assertThat(out.getFlushCount(), equalTo(1)); - assertThat(out.getNumberOfUploadRequests(), equalTo(1)); - assertFalse(out.isMultipart()); - - } - - public void testWriteSameDataThanBufferSize() throws IOException { - int size = randomIntBetween(BUFFER_SIZE, 2 * BUFFER_SIZE); - MockDefaultS3OutputStream out = newS3OutputStream(size); - - ByteArrayOutputStream content = new ByteArrayOutputStream(size); - for (int i = 0; i < size; i++) { - content.write(randomByte()); - } - copy(content.toByteArray(), out); - - // Checks length & content - assertThat(out.getLength(), equalTo((long) size)); - assertThat(Arrays.equals(content.toByteArray(), out.toByteArray()), equalTo(true)); - - // Checks single/multi part upload - assertThat(out.getBufferSize(), equalTo(size)); - assertThat(out.getFlushCount(), equalTo(1)); - assertThat(out.getNumberOfUploadRequests(), equalTo(1)); - assertFalse(out.isMultipart()); - - } - - public void testWriteExactlyNTimesMoreDataThanBufferSize() throws IOException { - int n = randomIntBetween(2, 3); - int length = n * BUFFER_SIZE; - ByteArrayOutputStream content = new ByteArrayOutputStream(length); - - for (int i = 0; i < length; i++) { - content.write(randomByte()); - } - - MockDefaultS3OutputStream out = newS3OutputStream(BUFFER_SIZE); - copy(content.toByteArray(), out); - - // Checks length & content - assertThat(out.getLength(), equalTo((long) length)); - assertThat(Arrays.equals(content.toByteArray(), out.toByteArray()), equalTo(true)); - - // Checks single/multi part upload - assertThat(out.getBufferSize(), equalTo(BUFFER_SIZE)); - assertThat(out.getFlushCount(), equalTo(n)); - - assertThat(out.getNumberOfUploadRequests(), equalTo(n)); - assertTrue(out.isMultipart()); - } - - public void testWriteRandomNumberOfBytes() throws IOException { - Integer randomBufferSize = randomIntBetween(BUFFER_SIZE, 2 * BUFFER_SIZE); - MockDefaultS3OutputStream out = newS3OutputStream(randomBufferSize); - - Integer randomLength = randomIntBetween(1, 2 * BUFFER_SIZE); - ByteArrayOutputStream content = new ByteArrayOutputStream(randomLength); - for (int i = 0; i < randomLength; i++) { - content.write(randomByte()); - } - - copy(content.toByteArray(), out); - - // Checks length & content - assertThat(out.getLength(), equalTo((long) randomLength)); - assertThat(Arrays.equals(content.toByteArray(), out.toByteArray()), equalTo(true)); - - assertThat(out.getBufferSize(), equalTo(randomBufferSize)); - int times = (int) Math.ceil(randomLength.doubleValue() / randomBufferSize.doubleValue()); - assertThat(out.getFlushCount(), equalTo(times)); - if (times > 1) { - assertTrue(out.isMultipart()); - } else { - assertFalse(out.isMultipart()); - } - } - - public void testWrongBufferSize() throws IOException { - Integer randomBufferSize = randomIntBetween(1, 4 * 1024 * 1024); - try { - newS3OutputStream(randomBufferSize); - fail("Buffer size can't be smaller than 
5mb"); - } catch (IllegalArgumentException e) { - assertThat(e.getMessage(), is("Buffer size can't be smaller than 5mb")); - } - } - - private MockDefaultS3OutputStream newS3OutputStream(int bufferSizeInBytes) { - return new MockDefaultS3OutputStream(bufferSizeInBytes); - } - -}