Remove S3 output stream (#27280)

Now that the blob size is known before anything is written, the repository
implementation can determine upfront which API is the most suitable for
uploading the blob to S3.

This commit removes the DefaultS3OutputStream and S3OutputStream classes
and moves the upload logic directly into S3BlobContainer.

related #26993
closes #26969
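In essence, the new S3BlobContainer picks the upload API from the now-known blob size. A condensed sketch of the writeBlob dispatch (the full version appears in the S3BlobContainer diff below; the blob-exists check and error handling are omitted here):

@Override
public void writeBlob(String blobName, InputStream inputStream, long blobSize) throws IOException {
    SocketAccess.doPrivilegedIOException(() -> {
        if (blobSize <= blobStore.bufferSizeInBytes()) {
            // small enough for a single PutObject request
            executeSingleUpload(blobStore, buildKey(blobName), inputStream, blobSize);
        } else {
            // larger blobs are split into parts of buffer_size via the Multipart Upload API
            executeMultipartUpload(blobStore, buildKey(blobName), inputStream, blobSize);
        }
        return null;
    });
}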
Tanguy Leroux 2017-11-10 12:22:33 +01:00 committed by GitHub
parent 4f43fe70cb
commit 9c4d6c629a
9 changed files with 566 additions and 615 deletions


@ -175,9 +175,10 @@ The following settings are supported:
http://docs.aws.amazon.com/AmazonS3/latest/dev/uploadobjusingmpu.html[AWS Multipart Upload API]
to split the chunk into several parts, each of `buffer_size` length, and
to upload each part in its own request. Note that setting a buffer
size lower than `5mb` is not allowed since it will prevents the use of the
Multipart API and may result in upload errors. Defaults to the minimum
between `100mb` and `5%` of the heap size.
size lower than `5mb` is not allowed since it will prevent the use of the
Multipart API and may result in upload errors. It is also not possible to
set a buffer size greater than `5gb` as it is the maximum upload size
allowed by S3. Defaults to the minimum between `100mb` and `5%` of the heap size.
`canned_acl`::


@ -1,223 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.repositories.s3;
import com.amazonaws.AmazonClientException;
import com.amazonaws.services.s3.model.AbortMultipartUploadRequest;
import com.amazonaws.services.s3.model.AmazonS3Exception;
import com.amazonaws.services.s3.model.CompleteMultipartUploadRequest;
import com.amazonaws.services.s3.model.InitiateMultipartUploadRequest;
import com.amazonaws.services.s3.model.ObjectMetadata;
import com.amazonaws.services.s3.model.PartETag;
import com.amazonaws.services.s3.model.PutObjectRequest;
import com.amazonaws.services.s3.model.PutObjectResult;
import com.amazonaws.services.s3.model.UploadPartRequest;
import com.amazonaws.services.s3.model.UploadPartResult;
import com.amazonaws.util.Base64;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.security.DigestInputStream;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
import java.util.List;
/**
* DefaultS3OutputStream uploads data to the AWS S3 service using 2 modes: single and multi part.
* <p>
* When the length of the chunk is lower than buffer_size, the chunk is uploaded with a single request.
* Otherwise multiple requests are made, each of buffer_size (except the last one which can be lower than buffer_size).
* <p>
* Quick facts about S3:
* <p>
* Maximum object size: 5 TB
* Maximum number of parts per upload: 10,000
* Part numbers: 1 to 10,000 (inclusive)
* Part size: 5 MB to 5 GB, last part can be &lt; 5 MB
* <p>
* See http://docs.aws.amazon.com/AmazonS3/latest/dev/qfacts.html
* See http://docs.aws.amazon.com/AmazonS3/latest/dev/uploadobjusingmpu.html
*/
class DefaultS3OutputStream extends S3OutputStream {
private static final ByteSizeValue MULTIPART_MAX_SIZE = new ByteSizeValue(5, ByteSizeUnit.GB);
private static final Logger logger = Loggers.getLogger("cloud.aws");
/**
* Multipart Upload API data
*/
private String multipartId;
private int multipartChunks;
private List<PartETag> multiparts;
DefaultS3OutputStream(S3BlobStore blobStore, String bucketName, String blobName, int bufferSizeInBytes, boolean serverSideEncryption) {
super(blobStore, bucketName, blobName, bufferSizeInBytes, serverSideEncryption);
}
@Override
public void flush(byte[] bytes, int off, int len, boolean closing) throws IOException {
SocketAccess.doPrivilegedIOException(() -> {
flushPrivileged(bytes, off, len, closing);
return null;
});
}
private void flushPrivileged(byte[] bytes, int off, int len, boolean closing) throws IOException {
if (len > MULTIPART_MAX_SIZE.getBytes()) {
throw new IOException("Unable to upload files larger than " + MULTIPART_MAX_SIZE + " to Amazon S3");
}
if (!closing) {
if (len < getBufferSize()) {
upload(bytes, off, len);
} else {
if (getFlushCount() == 0) {
initializeMultipart();
}
uploadMultipart(bytes, off, len, false);
}
} else {
if (multipartId != null) {
uploadMultipart(bytes, off, len, true);
completeMultipart();
} else {
upload(bytes, off, len);
}
}
}
/**
* Upload data using a single request.
*/
private void upload(byte[] bytes, int off, int len) throws IOException {
try (ByteArrayInputStream is = new ByteArrayInputStream(bytes, off, len)) {
try {
doUpload(getBlobStore(), getBucketName(), getBlobName(), is, len, isServerSideEncryption());
} catch (AmazonClientException e) {
throw new IOException("Unable to upload object " + getBlobName(), e);
}
}
}
protected void doUpload(S3BlobStore blobStore, String bucketName, String blobName, InputStream is, int length,
boolean serverSideEncryption) throws AmazonS3Exception {
ObjectMetadata md = new ObjectMetadata();
if (serverSideEncryption) {
md.setSSEAlgorithm(ObjectMetadata.AES_256_SERVER_SIDE_ENCRYPTION);
}
md.setContentLength(length);
PutObjectRequest putRequest = new PutObjectRequest(bucketName, blobName, is, md)
.withStorageClass(blobStore.getStorageClass())
.withCannedAcl(blobStore.getCannedACL());
blobStore.client().putObject(putRequest);
}
private void initializeMultipart() {
while (multipartId == null) {
multipartId = doInitialize(getBlobStore(), getBucketName(), getBlobName(), isServerSideEncryption());
if (multipartId != null) {
multipartChunks = 1;
multiparts = new ArrayList<>();
}
}
}
protected String doInitialize(S3BlobStore blobStore, String bucketName, String blobName, boolean serverSideEncryption) {
InitiateMultipartUploadRequest request = new InitiateMultipartUploadRequest(bucketName, blobName)
.withCannedACL(blobStore.getCannedACL())
.withStorageClass(blobStore.getStorageClass());
if (serverSideEncryption) {
ObjectMetadata md = new ObjectMetadata();
md.setSSEAlgorithm(ObjectMetadata.AES_256_SERVER_SIDE_ENCRYPTION);
request.setObjectMetadata(md);
}
return blobStore.client().initiateMultipartUpload(request).getUploadId();
}
private void uploadMultipart(byte[] bytes, int off, int len, boolean lastPart) throws IOException {
try (ByteArrayInputStream is = new ByteArrayInputStream(bytes, off, len)) {
try {
PartETag partETag = doUploadMultipart(getBlobStore(), getBucketName(), getBlobName(), multipartId, is, len, lastPart);
multiparts.add(partETag);
multipartChunks++;
} catch (AmazonClientException e) {
abortMultipart();
throw e;
}
}
}
protected PartETag doUploadMultipart(S3BlobStore blobStore, String bucketName, String blobName, String uploadId, InputStream is,
int length, boolean lastPart) throws AmazonS3Exception {
UploadPartRequest request = new UploadPartRequest()
.withBucketName(bucketName)
.withKey(blobName)
.withUploadId(uploadId)
.withPartNumber(multipartChunks)
.withInputStream(is)
.withPartSize(length)
.withLastPart(lastPart);
UploadPartResult response = blobStore.client().uploadPart(request);
return response.getPartETag();
}
private void completeMultipart() {
try {
doCompleteMultipart(getBlobStore(), getBucketName(), getBlobName(), multipartId, multiparts);
multipartId = null;
return;
} catch (AmazonClientException e) {
abortMultipart();
throw e;
}
}
protected void doCompleteMultipart(S3BlobStore blobStore, String bucketName, String blobName, String uploadId, List<PartETag> parts)
throws AmazonS3Exception {
CompleteMultipartUploadRequest request = new CompleteMultipartUploadRequest(bucketName, blobName, uploadId, parts);
blobStore.client().completeMultipartUpload(request);
}
private void abortMultipart() {
if (multipartId != null) {
try {
doAbortMultipart(getBlobStore(), getBucketName(), getBlobName(), multipartId);
} finally {
multipartId = null;
}
}
}
protected void doAbortMultipart(S3BlobStore blobStore, String bucketName, String blobName, String uploadId)
throws AmazonS3Exception {
blobStore.client().abortMultipartUpload(new AbortMultipartUploadRequest(bucketName, blobName, uploadId));
}
}


@ -21,35 +21,48 @@ package org.elasticsearch.repositories.s3;
import com.amazonaws.AmazonClientException;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.model.AbortMultipartUploadRequest;
import com.amazonaws.services.s3.model.AmazonS3Exception;
import com.amazonaws.services.s3.model.CompleteMultipartUploadRequest;
import com.amazonaws.services.s3.model.CopyObjectRequest;
import com.amazonaws.services.s3.model.InitiateMultipartUploadRequest;
import com.amazonaws.services.s3.model.ObjectListing;
import com.amazonaws.services.s3.model.ObjectMetadata;
import com.amazonaws.services.s3.model.PartETag;
import com.amazonaws.services.s3.model.PutObjectRequest;
import com.amazonaws.services.s3.model.S3Object;
import com.amazonaws.services.s3.model.S3ObjectSummary;
import com.amazonaws.services.s3.model.UploadPartRequest;
import com.amazonaws.services.s3.model.UploadPartResult;
import org.apache.lucene.util.SetOnce;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.blobstore.BlobMetaData;
import org.elasticsearch.common.blobstore.BlobPath;
import org.elasticsearch.common.blobstore.BlobStoreException;
import org.elasticsearch.common.blobstore.support.AbstractBlobContainer;
import org.elasticsearch.common.blobstore.support.PlainBlobMetaData;
import org.elasticsearch.common.collect.MapBuilder;
import org.elasticsearch.common.io.Streams;
import org.elasticsearch.common.collect.Tuple;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.file.FileAlreadyExistsException;
import java.nio.file.NoSuchFileException;
import java.security.AccessController;
import java.security.PrivilegedAction;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import static org.elasticsearch.repositories.s3.S3Repository.MAX_FILE_SIZE;
import static org.elasticsearch.repositories.s3.S3Repository.MAX_FILE_SIZE_USING_MULTIPART;
import static org.elasticsearch.repositories.s3.S3Repository.MIN_PART_SIZE_USING_MULTIPART;
class S3BlobContainer extends AbstractBlobContainer {
protected final S3BlobStore blobStore;
protected final String keyPath;
private final S3BlobStore blobStore;
private final String keyPath;
S3BlobContainer(BlobPath path, S3BlobStore blobStore) {
super(path);
@ -91,9 +104,15 @@ class S3BlobContainer extends AbstractBlobContainer {
if (blobExists(blobName)) {
throw new FileAlreadyExistsException("blob [" + blobName + "] already exists, cannot overwrite");
}
try (OutputStream stream = createOutput(blobName)) {
Streams.copy(inputStream, stream);
}
SocketAccess.doPrivilegedIOException(() -> {
if (blobSize <= blobStore.bufferSizeInBytes()) {
executeSingleUpload(blobStore, buildKey(blobName), inputStream, blobSize);
} else {
executeMultipartUpload(blobStore, buildKey(blobName), inputStream, blobSize);
}
return null;
});
}
@Override
@ -109,12 +128,6 @@ class S3BlobContainer extends AbstractBlobContainer {
}
}
private OutputStream createOutput(final String blobName) throws IOException {
// UploadS3OutputStream does buffering & retry logic internally
return new DefaultS3OutputStream(blobStore, blobStore.bucket(), buildKey(blobName),
blobStore.bufferSizeInBytes(), blobStore.serverSideEncryption());
}
@Override
public Map<String, BlobMetaData> listBlobsByPrefix(@Nullable String blobNamePrefix) throws IOException {
return AccessController.doPrivileged((PrivilegedAction<Map<String, BlobMetaData>>) () -> {
@ -175,7 +188,158 @@ class S3BlobContainer extends AbstractBlobContainer {
return listBlobsByPrefix(null);
}
protected String buildKey(String blobName) {
private String buildKey(String blobName) {
return keyPath + blobName;
}
/**
* Uploads a blob using a single upload request
*/
void executeSingleUpload(final S3BlobStore blobStore,
final String blobName,
final InputStream input,
final long blobSize) throws IOException {
// Extra safety checks
if (blobSize > MAX_FILE_SIZE.getBytes()) {
throw new IllegalArgumentException("Upload request size [" + blobSize + "] can't be larger than " + MAX_FILE_SIZE);
}
if (blobSize > blobStore.bufferSizeInBytes()) {
throw new IllegalArgumentException("Upload request size [" + blobSize + "] can't be larger than buffer size");
}
try {
final ObjectMetadata md = new ObjectMetadata();
md.setContentLength(blobSize);
if (blobStore.serverSideEncryption()) {
md.setSSEAlgorithm(ObjectMetadata.AES_256_SERVER_SIDE_ENCRYPTION);
}
final PutObjectRequest putRequest = new PutObjectRequest(blobStore.bucket(), blobName, input, md);
putRequest.setStorageClass(blobStore.getStorageClass());
putRequest.setCannedAcl(blobStore.getCannedACL());
blobStore.client().putObject(putRequest);
} catch (AmazonClientException e) {
throw new IOException("Unable to upload object [" + blobName + "] using a single upload", e);
}
}
/**
* Uploads a blob using multipart upload requests.
*/
void executeMultipartUpload(final S3BlobStore blobStore,
final String blobName,
final InputStream input,
final long blobSize) throws IOException {
if (blobSize > MAX_FILE_SIZE_USING_MULTIPART.getBytes()) {
throw new IllegalArgumentException("Multipart upload request size [" + blobSize
+ "] can't be larger than " + MAX_FILE_SIZE_USING_MULTIPART);
}
if (blobSize < MIN_PART_SIZE_USING_MULTIPART.getBytes()) {
throw new IllegalArgumentException("Multipart upload request size [" + blobSize
+ "] can't be smaller than " + MIN_PART_SIZE_USING_MULTIPART);
}
final long partSize = blobStore.bufferSizeInBytes();
final Tuple<Long, Long> multiparts = numberOfMultiparts(blobSize, partSize);
if (multiparts.v1() > Integer.MAX_VALUE) {
throw new IllegalArgumentException("Too many multipart upload requests, maybe try a larger buffer size?");
}
final int nbParts = multiparts.v1().intValue();
final long lastPartSize = multiparts.v2();
assert blobSize == (nbParts - 1) * partSize + lastPartSize : "blobSize does not match multipart sizes";
final SetOnce<String> uploadId = new SetOnce<>();
final String bucketName = blobStore.bucket();
boolean success = false;
try {
final InitiateMultipartUploadRequest initRequest = new InitiateMultipartUploadRequest(bucketName, blobName);
initRequest.setStorageClass(blobStore.getStorageClass());
initRequest.setCannedACL(blobStore.getCannedACL());
if (blobStore.serverSideEncryption()) {
final ObjectMetadata md = new ObjectMetadata();
md.setSSEAlgorithm(ObjectMetadata.AES_256_SERVER_SIDE_ENCRYPTION);
initRequest.setObjectMetadata(md);
}
uploadId.set(blobStore.client().initiateMultipartUpload(initRequest).getUploadId());
if (Strings.isEmpty(uploadId.get())) {
throw new IOException("Failed to initialize multipart upload " + blobName);
}
final List<PartETag> parts = new ArrayList<>();
long bytesCount = 0;
for (int i = 1; i <= nbParts; i++) {
final UploadPartRequest uploadRequest = new UploadPartRequest();
uploadRequest.setBucketName(bucketName);
uploadRequest.setKey(blobName);
uploadRequest.setUploadId(uploadId.get());
uploadRequest.setPartNumber(i);
uploadRequest.setInputStream(input);
if (i < nbParts) {
uploadRequest.setPartSize(partSize);
uploadRequest.setLastPart(false);
} else {
uploadRequest.setPartSize(lastPartSize);
uploadRequest.setLastPart(true);
}
bytesCount += uploadRequest.getPartSize();
final UploadPartResult uploadResponse = blobStore.client().uploadPart(uploadRequest);
parts.add(uploadResponse.getPartETag());
}
if (bytesCount != blobSize) {
throw new IOException("Failed to execute multipart upload for [" + blobName + "], expected " + blobSize
+ "bytes sent but got " + bytesCount);
}
CompleteMultipartUploadRequest complRequest = new CompleteMultipartUploadRequest(bucketName, blobName, uploadId.get(), parts);
blobStore.client().completeMultipartUpload(complRequest);
success = true;
} catch (AmazonClientException e) {
throw new IOException("Unable to upload object [" + blobName + "] using multipart upload", e);
} finally {
if (success == false && Strings.hasLength(uploadId.get())) {
final AbortMultipartUploadRequest abortRequest = new AbortMultipartUploadRequest(bucketName, blobName, uploadId.get());
blobStore.client().abortMultipartUpload(abortRequest);
}
}
}
/**
* Returns the number of parts of size {@code partSize} needed to reach {@code totalSize},
* along with the size of the last (or unique) part.
*
* @param totalSize the total size
* @param partSize the part size
* @return a {@link Tuple} containing the number of parts to fill {@code totalSize} and
* the size of the last part
*/
static Tuple<Long, Long> numberOfMultiparts(final long totalSize, final long partSize) {
if (partSize <= 0) {
throw new IllegalArgumentException("Part size must be greater than zero");
}
if (totalSize == 0L || totalSize <= partSize) {
return Tuple.tuple(1L, totalSize);
}
final long parts = totalSize / partSize;
final long remaining = totalSize % partSize;
if (remaining == 0) {
return Tuple.tuple(parts, partSize);
} else {
return Tuple.tuple(parts + 1, remaining);
}
}
}
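As a quick illustration of the numberOfMultiparts contract above, a few spot checks with hypothetical sizes (illustrative values only, not part of the commit):

// 250 bytes in parts of 100: 2 full parts plus a last part of 50 bytes
assert S3BlobContainer.numberOfMultiparts(250L, 100L).equals(Tuple.tuple(3L, 50L));
// exact multiple: 3 parts of 100, the last part is full-sized
assert S3BlobContainer.numberOfMultiparts(300L, 100L).equals(Tuple.tuple(3L, 100L));
// blob smaller than the part size: a single part of the blob's own size
assert S3BlobContainer.numberOfMultiparts(42L, 100L).equals(Tuple.tuple(1L, 42L));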


@ -93,8 +93,8 @@ class S3BlobStore extends AbstractComponent implements BlobStore {
return serverSideEncryption;
}
public int bufferSizeInBytes() {
return bufferSize.bytesAsInt();
public long bufferSizeInBytes() {
return bufferSize.getBytes();
}
@Override


@ -1,119 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.repositories.s3;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import java.io.IOException;
import java.io.OutputStream;
/**
* S3OutputStream buffers data before flushing it to an underlying S3OutputStream.
*/
abstract class S3OutputStream extends OutputStream {
/**
* Limit of upload allowed by AWS S3.
*/
protected static final ByteSizeValue MULTIPART_MAX_SIZE = new ByteSizeValue(5, ByteSizeUnit.GB);
protected static final ByteSizeValue MULTIPART_MIN_SIZE = new ByteSizeValue(5, ByteSizeUnit.MB);
private S3BlobStore blobStore;
private String bucketName;
private String blobName;
private boolean serverSideEncryption;
private byte[] buffer;
private int count;
private long length;
private int flushCount = 0;
S3OutputStream(S3BlobStore blobStore, String bucketName, String blobName, int bufferSizeInBytes, boolean serverSideEncryption) {
this.blobStore = blobStore;
this.bucketName = bucketName;
this.blobName = blobName;
this.serverSideEncryption = serverSideEncryption;
if (bufferSizeInBytes < MULTIPART_MIN_SIZE.getBytes()) {
throw new IllegalArgumentException("Buffer size can't be smaller than " + MULTIPART_MIN_SIZE);
}
if (bufferSizeInBytes > MULTIPART_MAX_SIZE.getBytes()) {
throw new IllegalArgumentException("Buffer size can't be larger than " + MULTIPART_MAX_SIZE);
}
this.buffer = new byte[bufferSizeInBytes];
}
public abstract void flush(byte[] bytes, int off, int len, boolean closing) throws IOException;
private void flushBuffer(boolean closing) throws IOException {
flush(buffer, 0, count, closing);
flushCount++;
count = 0;
}
@Override
public void write(int b) throws IOException {
if (count >= buffer.length) {
flushBuffer(false);
}
buffer[count++] = (byte) b;
length++;
}
@Override
public void close() throws IOException {
if (count > 0) {
flushBuffer(true);
count = 0;
}
}
public S3BlobStore getBlobStore() {
return blobStore;
}
public String getBucketName() {
return bucketName;
}
public String getBlobName() {
return blobName;
}
public int getBufferSize() {
return buffer.length;
}
public boolean isServerSideEncryption() {
return serverSideEncryption;
}
public long getLength() {
return length;
}
public int getFlushCount() {
return flushCount;
}
}


@ -19,8 +19,6 @@
package org.elasticsearch.repositories.s3;
import java.io.IOException;
import com.amazonaws.services.s3.AmazonS3;
import org.elasticsearch.cluster.metadata.RepositoryMetaData;
import org.elasticsearch.common.Strings;
@ -37,6 +35,8 @@ import org.elasticsearch.monitor.jvm.JvmInfo;
import org.elasticsearch.repositories.RepositoryException;
import org.elasticsearch.repositories.blobstore.BlobStoreRepository;
import java.io.IOException;
/**
* Shared file system implementation of the BlobStoreRepository
* <p>
@ -80,14 +80,36 @@ class S3Repository extends BlobStoreRepository {
*/
static final Setting<Boolean> SERVER_SIDE_ENCRYPTION_SETTING = Setting.boolSetting("server_side_encryption", false);
/**
* Maximum size of files that can be uploaded using a single upload request.
*/
static final ByteSizeValue MAX_FILE_SIZE = new ByteSizeValue(5, ByteSizeUnit.GB);
/**
* Minimum size of parts that can be uploaded using the Multipart Upload API.
* (see http://docs.aws.amazon.com/AmazonS3/latest/dev/qfacts.html)
*/
static final ByteSizeValue MIN_PART_SIZE_USING_MULTIPART = new ByteSizeValue(5, ByteSizeUnit.MB);
/**
* Maximum size of parts that can be uploaded using the Multipart Upload API.
* (see http://docs.aws.amazon.com/AmazonS3/latest/dev/qfacts.html)
*/
static final ByteSizeValue MAX_PART_SIZE_USING_MULTIPART = MAX_FILE_SIZE;
/**
* Maximum size of files that can be uploaded using the Multipart Upload API.
*/
static final ByteSizeValue MAX_FILE_SIZE_USING_MULTIPART = new ByteSizeValue(5, ByteSizeUnit.TB);
/**
* Minimum threshold below which the chunk is uploaded using a single request. Beyond this threshold,
* the S3 repository will use the AWS Multipart Upload API to split the chunk into several parts, each of buffer_size length, and
* to upload each part in its own request. Note that setting a buffer size lower than 5mb is not allowed since it will prevent the
* use of the Multipart API and may result in upload errors. Defaults to the minimum between 100MB and 5% of the heap size.
*/
static final Setting<ByteSizeValue> BUFFER_SIZE_SETTING = Setting.byteSizeSetting("buffer_size", DEFAULT_BUFFER_SIZE,
new ByteSizeValue(5, ByteSizeUnit.MB), new ByteSizeValue(5, ByteSizeUnit.TB));
static final Setting<ByteSizeValue> BUFFER_SIZE_SETTING =
Setting.byteSizeSetting("buffer_size", DEFAULT_BUFFER_SIZE, MIN_PART_SIZE_USING_MULTIPART, MAX_PART_SIZE_USING_MULTIPART);
/**
* Big files can be broken down into chunks during snapshotting if needed. Defaults to 1g.


@ -1,101 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.repositories.s3;
import com.amazonaws.services.s3.model.AmazonS3Exception;
import com.amazonaws.services.s3.model.PartETag;
import com.carrotsearch.randomizedtesting.RandomizedTest;
import org.elasticsearch.common.io.Streams;
import org.elasticsearch.repositories.s3.DefaultS3OutputStream;
import org.elasticsearch.repositories.s3.S3BlobStore;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.List;
public class MockDefaultS3OutputStream extends DefaultS3OutputStream {
private ByteArrayOutputStream out = new ByteArrayOutputStream();
private boolean initialized = false;
private boolean completed = false;
private boolean aborted = false;
private int numberOfUploadRequests = 0;
public MockDefaultS3OutputStream(int bufferSizeInBytes) {
super(null, "test-bucket", "test-blobname", bufferSizeInBytes, false);
}
@Override
protected void doUpload(S3BlobStore blobStore, String bucketName, String blobName, InputStream is, int length, boolean serverSideEncryption) throws AmazonS3Exception {
try {
long copied = Streams.copy(is, out);
if (copied != length) {
throw new AmazonS3Exception("Not all the bytes were copied");
}
numberOfUploadRequests++;
} catch (IOException e) {
throw new AmazonS3Exception(e.getMessage());
}
}
@Override
protected String doInitialize(S3BlobStore blobStore, String bucketName, String blobName, boolean serverSideEncryption) {
initialized = true;
return RandomizedTest.randomAsciiOfLength(50);
}
@Override
protected PartETag doUploadMultipart(S3BlobStore blobStore, String bucketName, String blobName, String uploadId, InputStream is, int length, boolean lastPart) throws AmazonS3Exception {
try {
long copied = Streams.copy(is, out);
if (copied != length) {
throw new AmazonS3Exception("Not all the bytes were copied");
}
return new PartETag(numberOfUploadRequests++, RandomizedTest.randomAsciiOfLength(50));
} catch (IOException e) {
throw new AmazonS3Exception(e.getMessage());
}
}
@Override
protected void doCompleteMultipart(S3BlobStore blobStore, String bucketName, String blobName, String uploadId, List<PartETag> parts) throws AmazonS3Exception {
completed = true;
}
@Override
protected void doAbortMultipart(S3BlobStore blobStore, String bucketName, String blobName, String uploadId) throws AmazonS3Exception {
aborted = true;
}
public int getNumberOfUploadRequests() {
return numberOfUploadRequests;
}
public boolean isMultipart() {
return (numberOfUploadRequests > 1) && initialized && completed && !aborted;
}
public byte[] toByteArray() {
return out.toByteArray();
}
}


@ -19,10 +19,24 @@
package org.elasticsearch.repositories.s3;
import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.Logger;
import com.amazonaws.AmazonClientException;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.model.AbortMultipartUploadRequest;
import com.amazonaws.services.s3.model.CannedAccessControlList;
import com.amazonaws.services.s3.model.CompleteMultipartUploadRequest;
import com.amazonaws.services.s3.model.CompleteMultipartUploadResult;
import com.amazonaws.services.s3.model.InitiateMultipartUploadRequest;
import com.amazonaws.services.s3.model.InitiateMultipartUploadResult;
import com.amazonaws.services.s3.model.ObjectMetadata;
import com.amazonaws.services.s3.model.PartETag;
import com.amazonaws.services.s3.model.PutObjectRequest;
import com.amazonaws.services.s3.model.PutObjectResult;
import com.amazonaws.services.s3.model.StorageClass;
import com.amazonaws.services.s3.model.UploadPartRequest;
import com.amazonaws.services.s3.model.UploadPartResult;
import org.elasticsearch.common.blobstore.BlobPath;
import org.elasticsearch.common.blobstore.BlobStore;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
@ -30,16 +44,29 @@ import org.elasticsearch.mocksocket.MockServerSocket;
import org.elasticsearch.repositories.ESBlobStoreContainerTestCase;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.mockito.ArgumentCaptor;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Locale;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import static org.hamcrest.Matchers.instanceOf;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
public class S3BlobStoreContainerTests extends ESBlobStoreContainerTestCase {
private static final Logger logger = Loggers.getLogger(S3BlobStoreContainerTests.class);
private static ServerSocket mockS3ServerSocket;
private static Thread mockS3AcceptorThread;
@ -69,6 +96,329 @@ public class S3BlobStoreContainerTests extends ESBlobStoreContainerTestCase {
new ByteSizeValue(10, ByteSizeUnit.MB), "public-read-write", "standard");
}
public void testExecuteSingleUploadBlobSizeTooLarge() throws IOException {
final long blobSize = ByteSizeUnit.GB.toBytes(randomIntBetween(6, 10));
final S3BlobStore blobStore = mock(S3BlobStore.class);
final S3BlobContainer blobContainer = new S3BlobContainer(mock(BlobPath.class), blobStore);
IllegalArgumentException e = expectThrows(IllegalArgumentException.class, () ->
blobContainer.executeSingleUpload(blobStore, randomAlphaOfLengthBetween(1, 10), null, blobSize));
assertEquals("Upload request size [" + blobSize + "] can't be larger than 5gb", e.getMessage());
}
public void testExecuteSingleUploadBlobSizeLargerThanBufferSize() throws IOException {
final S3BlobStore blobStore = mock(S3BlobStore.class);
when(blobStore.bufferSizeInBytes()).thenReturn(ByteSizeUnit.MB.toBytes(1));
final S3BlobContainer blobContainer = new S3BlobContainer(mock(BlobPath.class), blobStore);
final String blobName = randomAlphaOfLengthBetween(1, 10);
IllegalArgumentException e = expectThrows(IllegalArgumentException.class, () ->
blobContainer.executeSingleUpload(blobStore, blobName, new ByteArrayInputStream(new byte[0]), ByteSizeUnit.MB.toBytes(2)));
assertEquals("Upload request size [2097152] can't be larger than buffer size", e.getMessage());
}
public void testExecuteSingleUpload() throws IOException {
final String bucketName = randomAlphaOfLengthBetween(1, 10);
final String blobName = randomAlphaOfLengthBetween(1, 10);
final BlobPath blobPath = new BlobPath();
if (randomBoolean()) {
IntStream.of(randomIntBetween(1, 5)).forEach(value -> blobPath.add("path_" + value));
}
final int bufferSize = randomIntBetween(1024, 2048);
final int blobSize = randomIntBetween(0, bufferSize);
final S3BlobStore blobStore = mock(S3BlobStore.class);
when(blobStore.bucket()).thenReturn(bucketName);
when(blobStore.bufferSizeInBytes()).thenReturn((long) bufferSize);
final S3BlobContainer blobContainer = new S3BlobContainer(blobPath, blobStore);
final boolean serverSideEncryption = randomBoolean();
when(blobStore.serverSideEncryption()).thenReturn(serverSideEncryption);
final StorageClass storageClass = randomFrom(StorageClass.values());
when(blobStore.getStorageClass()).thenReturn(storageClass);
final CannedAccessControlList cannedAccessControlList = randomBoolean() ? randomFrom(CannedAccessControlList.values()) : null;
if (cannedAccessControlList != null) {
when(blobStore.getCannedACL()).thenReturn(cannedAccessControlList);
}
final AmazonS3 client = mock(AmazonS3.class);
when(blobStore.client()).thenReturn(client);
final ArgumentCaptor<PutObjectRequest> argumentCaptor = ArgumentCaptor.forClass(PutObjectRequest.class);
when(client.putObject(argumentCaptor.capture())).thenReturn(new PutObjectResult());
final ByteArrayInputStream inputStream = new ByteArrayInputStream(new byte[blobSize]);
blobContainer.executeSingleUpload(blobStore, blobName, inputStream, blobSize);
final PutObjectRequest request = argumentCaptor.getValue();
assertEquals(bucketName, request.getBucketName());
assertEquals(blobPath.buildAsString() + blobName, request.getKey());
assertEquals(inputStream, request.getInputStream());
assertEquals(blobSize, request.getMetadata().getContentLength());
assertEquals(storageClass.toString(), request.getStorageClass());
assertEquals(cannedAccessControlList, request.getCannedAcl());
if (serverSideEncryption) {
assertEquals(ObjectMetadata.AES_256_SERVER_SIDE_ENCRYPTION, request.getMetadata().getSSEAlgorithm());
}
}
public void testExecuteMultipartUploadBlobSizeTooLarge() throws IOException {
final long blobSize = ByteSizeUnit.TB.toBytes(randomIntBetween(6, 10));
final S3BlobStore blobStore = mock(S3BlobStore.class);
final S3BlobContainer blobContainer = new S3BlobContainer(mock(BlobPath.class), blobStore);
IllegalArgumentException e = expectThrows(IllegalArgumentException.class, () ->
blobContainer.executeMultipartUpload(blobStore, randomAlphaOfLengthBetween(1, 10), null, blobSize)
);
assertEquals("Multipart upload request size [" + blobSize + "] can't be larger than 5tb", e.getMessage());
}
public void testExecuteMultipartUploadBlobSizeTooSmall() throws IOException {
final long blobSize = ByteSizeUnit.MB.toBytes(randomIntBetween(1, 4));
final S3BlobStore blobStore = mock(S3BlobStore.class);
final S3BlobContainer blobContainer = new S3BlobContainer(mock(BlobPath.class), blobStore);
IllegalArgumentException e = expectThrows(IllegalArgumentException.class, () ->
blobContainer.executeMultipartUpload(blobStore, randomAlphaOfLengthBetween(1, 10), null, blobSize)
);
assertEquals("Multipart upload request size [" + blobSize + "] can't be smaller than 5mb", e.getMessage());
}
public void testExecuteMultipartUpload() throws IOException {
final String bucketName = randomAlphaOfLengthBetween(1, 10);
final String blobName = randomAlphaOfLengthBetween(1, 10);
final BlobPath blobPath = new BlobPath();
if (randomBoolean()) {
IntStream.of(randomIntBetween(1, 5)).forEach(value -> blobPath.add("path_" + value));
}
final long blobSize = ByteSizeUnit.GB.toBytes(randomIntBetween(1, 1024));
final long bufferSize = ByteSizeUnit.MB.toBytes(randomIntBetween(5, 1024));
final S3BlobStore blobStore = mock(S3BlobStore.class);
when(blobStore.bucket()).thenReturn(bucketName);
when(blobStore.bufferSizeInBytes()).thenReturn(bufferSize);
final boolean serverSideEncryption = randomBoolean();
when(blobStore.serverSideEncryption()).thenReturn(serverSideEncryption);
final StorageClass storageClass = randomFrom(StorageClass.values());
when(blobStore.getStorageClass()).thenReturn(storageClass);
final CannedAccessControlList cannedAccessControlList = randomBoolean() ? randomFrom(CannedAccessControlList.values()) : null;
if (cannedAccessControlList != null) {
when(blobStore.getCannedACL()).thenReturn(cannedAccessControlList);
}
final AmazonS3 client = mock(AmazonS3.class);
when(blobStore.client()).thenReturn(client);
final ArgumentCaptor<InitiateMultipartUploadRequest> initArgCaptor = ArgumentCaptor.forClass(InitiateMultipartUploadRequest.class);
final InitiateMultipartUploadResult initResult = new InitiateMultipartUploadResult();
initResult.setUploadId(randomAlphaOfLength(10));
when(client.initiateMultipartUpload(initArgCaptor.capture())).thenReturn(initResult);
final ArgumentCaptor<UploadPartRequest> uploadArgCaptor = ArgumentCaptor.forClass(UploadPartRequest.class);
final List<String> expectedEtags = new ArrayList<>();
long partSize = Math.min(bufferSize, blobSize);
long totalBytes = 0;
do {
expectedEtags.add(randomAlphaOfLength(50));
totalBytes += partSize;
} while (totalBytes < blobSize);
when(client.uploadPart(uploadArgCaptor.capture())).thenAnswer(invocationOnMock -> {
final UploadPartRequest request = (UploadPartRequest) invocationOnMock.getArguments()[0];
final UploadPartResult response = new UploadPartResult();
response.setPartNumber(request.getPartNumber());
response.setETag(expectedEtags.get(request.getPartNumber() - 1));
return response;
});
final ArgumentCaptor<CompleteMultipartUploadRequest> compArgCaptor = ArgumentCaptor.forClass(CompleteMultipartUploadRequest.class);
when(client.completeMultipartUpload(compArgCaptor.capture())).thenReturn(new CompleteMultipartUploadResult());
final ByteArrayInputStream inputStream = new ByteArrayInputStream(new byte[0]);
final S3BlobContainer blobContainer = new S3BlobContainer(blobPath, blobStore);
blobContainer.executeMultipartUpload(blobStore, blobName, inputStream, blobSize);
final InitiateMultipartUploadRequest initRequest = initArgCaptor.getValue();
assertEquals(bucketName, initRequest.getBucketName());
assertEquals(blobPath.buildAsString() + blobName, initRequest.getKey());
assertEquals(storageClass, initRequest.getStorageClass());
assertEquals(cannedAccessControlList, initRequest.getCannedACL());
if (serverSideEncryption) {
assertEquals(ObjectMetadata.AES_256_SERVER_SIDE_ENCRYPTION, initRequest.getObjectMetadata().getSSEAlgorithm());
}
final Tuple<Long, Long> numberOfParts = S3BlobContainer.numberOfMultiparts(blobSize, bufferSize);
final List<UploadPartRequest> uploadRequests = uploadArgCaptor.getAllValues();
assertEquals(numberOfParts.v1().intValue(), uploadRequests.size());
for (int i = 0; i < uploadRequests.size(); i++) {
UploadPartRequest uploadRequest = uploadRequests.get(i);
assertEquals(bucketName, uploadRequest.getBucketName());
assertEquals(blobPath.buildAsString() + blobName, uploadRequest.getKey());
assertEquals(initResult.getUploadId(), uploadRequest.getUploadId());
assertEquals(i + 1, uploadRequest.getPartNumber());
assertEquals(inputStream, uploadRequest.getInputStream());
if (i == (uploadRequests.size() -1)) {
assertTrue(uploadRequest.isLastPart());
assertEquals(numberOfParts.v2().longValue(), uploadRequest.getPartSize());
} else {
assertFalse(uploadRequest.isLastPart());
assertEquals(bufferSize, uploadRequest.getPartSize());
}
}
final CompleteMultipartUploadRequest compRequest = compArgCaptor.getValue();
assertEquals(bucketName, compRequest.getBucketName());
assertEquals(blobPath.buildAsString() + blobName, compRequest.getKey());
assertEquals(initResult.getUploadId(), compRequest.getUploadId());
List<String> actualETags = compRequest.getPartETags().stream().map(PartETag::getETag).collect(Collectors.toList());
assertEquals(expectedEtags, actualETags);
}
public void testExecuteMultipartUploadAborted() throws IOException {
final String bucketName = randomAlphaOfLengthBetween(1, 10);
final String blobName = randomAlphaOfLengthBetween(1, 10);
final BlobPath blobPath = new BlobPath();
final long blobSize = ByteSizeUnit.MB.toBytes(765);
final long bufferSize = ByteSizeUnit.MB.toBytes(150);
final S3BlobStore blobStore = mock(S3BlobStore.class);
when(blobStore.bucket()).thenReturn(bucketName);
when(blobStore.bufferSizeInBytes()).thenReturn(bufferSize);
when(blobStore.getStorageClass()).thenReturn(randomFrom(StorageClass.values()));
final AmazonS3 client = mock(AmazonS3.class);
when(blobStore.client()).thenReturn(client);
final String uploadId = randomAlphaOfLength(25);
final int stage = randomInt(2);
final List<AmazonClientException> exceptions = Arrays.asList(
new AmazonClientException("Expected initialization request to fail"),
new AmazonClientException("Expected upload part request to fail"),
new AmazonClientException("Expected completion request to fail")
);
if (stage == 0) {
// Fail the initialization request
when(client.initiateMultipartUpload(any(InitiateMultipartUploadRequest.class)))
.thenThrow(exceptions.get(stage));
} else if (stage == 1) {
final InitiateMultipartUploadResult initResult = new InitiateMultipartUploadResult();
initResult.setUploadId(uploadId);
when(client.initiateMultipartUpload(any(InitiateMultipartUploadRequest.class))).thenReturn(initResult);
// Fail the upload part request
when(client.uploadPart(any(UploadPartRequest.class)))
.thenThrow(exceptions.get(stage));
} else {
final InitiateMultipartUploadResult initResult = new InitiateMultipartUploadResult();
initResult.setUploadId(uploadId);
when(client.initiateMultipartUpload(any(InitiateMultipartUploadRequest.class))).thenReturn(initResult);
when(client.uploadPart(any(UploadPartRequest.class))).thenAnswer(invocationOnMock -> {
final UploadPartRequest request = (UploadPartRequest) invocationOnMock.getArguments()[0];
final UploadPartResult response = new UploadPartResult();
response.setPartNumber(request.getPartNumber());
response.setETag(randomAlphaOfLength(20));
return response;
});
// Fail the completion request
when(client.completeMultipartUpload(any(CompleteMultipartUploadRequest.class)))
.thenThrow(exceptions.get(stage));
}
final ArgumentCaptor<AbortMultipartUploadRequest> argumentCaptor = ArgumentCaptor.forClass(AbortMultipartUploadRequest.class);
doNothing().when(client).abortMultipartUpload(argumentCaptor.capture());
final IOException e = expectThrows(IOException.class, () -> {
final S3BlobContainer blobContainer = new S3BlobContainer(blobPath, blobStore);
blobContainer.executeMultipartUpload(blobStore, blobName, new ByteArrayInputStream(new byte[0]), blobSize);
});
assertEquals("Unable to upload object [" + blobName + "] using multipart upload", e.getMessage());
assertThat(e.getCause(), instanceOf(AmazonClientException.class));
assertEquals(exceptions.get(stage).getMessage(), e.getCause().getMessage());
if (stage == 0) {
verify(client, times(1)).initiateMultipartUpload(any(InitiateMultipartUploadRequest.class));
verify(client, times(0)).uploadPart(any(UploadPartRequest.class));
verify(client, times(0)).completeMultipartUpload(any(CompleteMultipartUploadRequest.class));
verify(client, times(0)).abortMultipartUpload(any(AbortMultipartUploadRequest.class));
} else {
verify(client, times(1)).initiateMultipartUpload(any(InitiateMultipartUploadRequest.class));
if (stage == 1) {
verify(client, times(1)).uploadPart(any(UploadPartRequest.class));
verify(client, times(0)).completeMultipartUpload(any(CompleteMultipartUploadRequest.class));
} else {
verify(client, times(6)).uploadPart(any(UploadPartRequest.class));
verify(client, times(1)).completeMultipartUpload(any(CompleteMultipartUploadRequest.class));
}
verify(client, times(1)).abortMultipartUpload(any(AbortMultipartUploadRequest.class));
final AbortMultipartUploadRequest abortRequest = argumentCaptor.getValue();
assertEquals(bucketName, abortRequest.getBucketName());
assertEquals(blobName, abortRequest.getKey());
assertEquals(uploadId, abortRequest.getUploadId());
}
}
public void testNumberOfMultipartsWithZeroPartSize() {
IllegalArgumentException e =
expectThrows(IllegalArgumentException.class, () -> S3BlobContainer.numberOfMultiparts(randomNonNegativeLong(), 0L));
assertEquals("Part size must be greater than zero", e.getMessage());
}
public void testNumberOfMultiparts() {
final ByteSizeUnit unit = randomFrom(ByteSizeUnit.BYTES, ByteSizeUnit.KB, ByteSizeUnit.MB, ByteSizeUnit.GB);
final long size = unit.toBytes(randomIntBetween(1, 10));
final int factor = randomIntBetween(2, 10);
// Fits in 1 empty part
assertNumberOfMultiparts(1, 0L, 0L, size);
// Fits in 1 part exactly
assertNumberOfMultiparts(1, size, size, size);
assertNumberOfMultiparts(1, size, size, size * factor);
// Fits in N parts exactly
assertNumberOfMultiparts(factor, size, size * factor, size);
// Fits in N parts plus a bit more
final long remaining = randomIntBetween(1, (size > Integer.MAX_VALUE) ? Integer.MAX_VALUE : (int) size - 1);
assertNumberOfMultiparts(factor + 1, remaining, size * factor + remaining, size);
}
private static void assertNumberOfMultiparts(final int expectedParts, final long expectedRemaining, long totalSize, long partSize) {
final Tuple<Long, Long> result = S3BlobContainer.numberOfMultiparts(totalSize, partSize);
assertEquals("Expected number of parts [" + expectedParts + "] but got [" + result.v1() + "]", expectedParts, (long) result.v1());
assertEquals("Expected remaining [" + expectedRemaining + "] but got [" + result.v2() + "]", expectedRemaining, (long) result.v2());
}
@AfterClass
public static void closeMockSocket() throws IOException, InterruptedException {
mockS3ServerSocket.close();


@ -1,143 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.repositories.s3;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.test.ESTestCase;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.Arrays;
import static org.elasticsearch.common.io.Streams.copy;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
/**
* Unit test for {@link S3OutputStream}.
*/
public class S3OutputStreamTests extends ESTestCase {
private static final int BUFFER_SIZE = new ByteSizeValue(5, ByteSizeUnit.MB).bytesAsInt();
public void testWriteLessDataThanBufferSize() throws IOException {
MockDefaultS3OutputStream out = newS3OutputStream(BUFFER_SIZE);
byte[] content = randomUnicodeOfLengthBetween(1, 512).getBytes("UTF-8");
copy(content, out);
// Checks length & content
assertThat(out.getLength(), equalTo((long) content.length));
assertThat(Arrays.equals(content, out.toByteArray()), equalTo(true));
// Checks single/multi part upload
assertThat(out.getBufferSize(), equalTo(BUFFER_SIZE));
assertThat(out.getFlushCount(), equalTo(1));
assertThat(out.getNumberOfUploadRequests(), equalTo(1));
assertFalse(out.isMultipart());
}
public void testWriteSameDataThanBufferSize() throws IOException {
int size = randomIntBetween(BUFFER_SIZE, 2 * BUFFER_SIZE);
MockDefaultS3OutputStream out = newS3OutputStream(size);
ByteArrayOutputStream content = new ByteArrayOutputStream(size);
for (int i = 0; i < size; i++) {
content.write(randomByte());
}
copy(content.toByteArray(), out);
// Checks length & content
assertThat(out.getLength(), equalTo((long) size));
assertThat(Arrays.equals(content.toByteArray(), out.toByteArray()), equalTo(true));
// Checks single/multi part upload
assertThat(out.getBufferSize(), equalTo(size));
assertThat(out.getFlushCount(), equalTo(1));
assertThat(out.getNumberOfUploadRequests(), equalTo(1));
assertFalse(out.isMultipart());
}
public void testWriteExactlyNTimesMoreDataThanBufferSize() throws IOException {
int n = randomIntBetween(2, 3);
int length = n * BUFFER_SIZE;
ByteArrayOutputStream content = new ByteArrayOutputStream(length);
for (int i = 0; i < length; i++) {
content.write(randomByte());
}
MockDefaultS3OutputStream out = newS3OutputStream(BUFFER_SIZE);
copy(content.toByteArray(), out);
// Checks length & content
assertThat(out.getLength(), equalTo((long) length));
assertThat(Arrays.equals(content.toByteArray(), out.toByteArray()), equalTo(true));
// Checks single/multi part upload
assertThat(out.getBufferSize(), equalTo(BUFFER_SIZE));
assertThat(out.getFlushCount(), equalTo(n));
assertThat(out.getNumberOfUploadRequests(), equalTo(n));
assertTrue(out.isMultipart());
}
public void testWriteRandomNumberOfBytes() throws IOException {
Integer randomBufferSize = randomIntBetween(BUFFER_SIZE, 2 * BUFFER_SIZE);
MockDefaultS3OutputStream out = newS3OutputStream(randomBufferSize);
Integer randomLength = randomIntBetween(1, 2 * BUFFER_SIZE);
ByteArrayOutputStream content = new ByteArrayOutputStream(randomLength);
for (int i = 0; i < randomLength; i++) {
content.write(randomByte());
}
copy(content.toByteArray(), out);
// Checks length & content
assertThat(out.getLength(), equalTo((long) randomLength));
assertThat(Arrays.equals(content.toByteArray(), out.toByteArray()), equalTo(true));
assertThat(out.getBufferSize(), equalTo(randomBufferSize));
int times = (int) Math.ceil(randomLength.doubleValue() / randomBufferSize.doubleValue());
assertThat(out.getFlushCount(), equalTo(times));
if (times > 1) {
assertTrue(out.isMultipart());
} else {
assertFalse(out.isMultipart());
}
}
public void testWrongBufferSize() throws IOException {
Integer randomBufferSize = randomIntBetween(1, 4 * 1024 * 1024);
try {
newS3OutputStream(randomBufferSize);
fail("Buffer size can't be smaller than 5mb");
} catch (IllegalArgumentException e) {
assertThat(e.getMessage(), is("Buffer size can't be smaller than 5mb"));
}
}
private MockDefaultS3OutputStream newS3OutputStream(int bufferSizeInBytes) {
return new MockDefaultS3OutputStream(bufferSizeInBytes);
}
}