From 8767a8100beabb0c7ff4713e2c66dd098827fff4 Mon Sep 17 00:00:00 2001 From: Tibor Kiss Date: Fri, 25 Mar 2011 17:52:53 +0100 Subject: [PATCH] Issue 430: ParallelMultipartUploadStrategy added. --- .../aws/s3/blobstore/AWSS3AsyncBlobStore.java | 8 +- .../config/AWSS3BlobStoreContextModule.java | 3 + .../AsyncMultipartUploadStrategy.java | 38 +++ .../blobstore/strategy/MultipartUpload.java | 41 +++ .../strategy/MultipartUploadStrategy.java | 18 +- .../MultipartUploadSlicingAlgorithm.java | 135 ++++++++ .../ParallelMultipartUploadStrategy.java | 287 ++++++++++++++++++ .../SequentialMultipartUploadStrategy.java | 150 ++------- .../strategy/internal/MpuGraphData.java | 43 +-- .../MpuPartitioningAlgorithmTest.java | 85 ++---- ...SequentialMultipartUploadStrategyTest.java | 8 +- 11 files changed, 570 insertions(+), 246 deletions(-) create mode 100644 providers/aws-s3/src/main/java/org/jclouds/aws/s3/blobstore/strategy/AsyncMultipartUploadStrategy.java create mode 100644 providers/aws-s3/src/main/java/org/jclouds/aws/s3/blobstore/strategy/MultipartUpload.java create mode 100644 providers/aws-s3/src/main/java/org/jclouds/aws/s3/blobstore/strategy/internal/MultipartUploadSlicingAlgorithm.java create mode 100644 providers/aws-s3/src/main/java/org/jclouds/aws/s3/blobstore/strategy/internal/ParallelMultipartUploadStrategy.java diff --git a/providers/aws-s3/src/main/java/org/jclouds/aws/s3/blobstore/AWSS3AsyncBlobStore.java b/providers/aws-s3/src/main/java/org/jclouds/aws/s3/blobstore/AWSS3AsyncBlobStore.java index 7c83451d57..87b9e7704c 100644 --- a/providers/aws-s3/src/main/java/org/jclouds/aws/s3/blobstore/AWSS3AsyncBlobStore.java +++ b/providers/aws-s3/src/main/java/org/jclouds/aws/s3/blobstore/AWSS3AsyncBlobStore.java @@ -29,6 +29,7 @@ import javax.inject.Provider; import org.jclouds.Constants; import org.jclouds.aws.s3.AWSS3AsyncClient; import org.jclouds.aws.s3.AWSS3Client; +import org.jclouds.aws.s3.blobstore.strategy.AsyncMultipartUploadStrategy; import org.jclouds.aws.s3.blobstore.strategy.MultipartUploadStrategy; import org.jclouds.blobstore.BlobStoreContext; import org.jclouds.blobstore.domain.Blob; @@ -55,7 +56,7 @@ import com.google.common.util.concurrent.ListenableFuture; */ public class AWSS3AsyncBlobStore extends S3AsyncBlobStore { - private final Provider multipartUploadStrategy; + private final Provider multipartUploadStrategy; @Inject public AWSS3AsyncBlobStore(BlobStoreContext context, BlobUtils blobUtils, @@ -65,7 +66,7 @@ public class AWSS3AsyncBlobStore extends S3AsyncBlobStore { BucketToResourceList bucket2ResourceList, ObjectToBlob object2Blob, BlobToHttpGetOptions blob2ObjectGetOptions, BlobToObject blob2Object, ObjectToBlobMetadata object2BlobMd, Provider fetchBlobMetadataProvider, - Provider multipartUploadStrategy) { + Provider multipartUploadStrategy) { super(context, blobUtils, service, defaultLocation, locations, async, sync, bucket2ResourceMd, container2BucketListOptions, bucket2ResourceList, object2Blob, blob2ObjectGetOptions, blob2Object, object2BlobMd, fetchBlobMetadataProvider); @@ -74,9 +75,8 @@ public class AWSS3AsyncBlobStore extends S3AsyncBlobStore { @Override public ListenableFuture putBlobMultipart(String container, Blob blob) { - // TODO: make this better // need to use a provider if the strategy object is stateful - return Futures.immediateFuture(multipartUploadStrategy.get().execute(container, blob)); + return multipartUploadStrategy.get().execute(container, blob); } } diff --git a/providers/aws-s3/src/main/java/org/jclouds/aws/s3/blobstore/config/AWSS3BlobStoreContextModule.java b/providers/aws-s3/src/main/java/org/jclouds/aws/s3/blobstore/config/AWSS3BlobStoreContextModule.java index 75cea06094..eec01d566d 100644 --- a/providers/aws-s3/src/main/java/org/jclouds/aws/s3/blobstore/config/AWSS3BlobStoreContextModule.java +++ b/providers/aws-s3/src/main/java/org/jclouds/aws/s3/blobstore/config/AWSS3BlobStoreContextModule.java @@ -23,7 +23,9 @@ import org.jclouds.aws.s3.AWSS3AsyncClient; import org.jclouds.aws.s3.AWSS3Client; import org.jclouds.aws.s3.blobstore.AWSS3AsyncBlobStore; import org.jclouds.aws.s3.blobstore.AWSS3BlobStore; +import org.jclouds.aws.s3.blobstore.strategy.AsyncMultipartUploadStrategy; import org.jclouds.aws.s3.blobstore.strategy.MultipartUploadStrategy; +import org.jclouds.aws.s3.blobstore.strategy.internal.ParallelMultipartUploadStrategy; import org.jclouds.aws.s3.blobstore.strategy.internal.SequentialMultipartUploadStrategy; import org.jclouds.blobstore.BlobStoreContext; import org.jclouds.blobstore.internal.BlobStoreContextImpl; @@ -47,6 +49,7 @@ public class AWSS3BlobStoreContextModule extends S3BlobStoreContextModule { bind(S3AsyncBlobStore.class).to(AWSS3AsyncBlobStore.class).in(Scopes.SINGLETON); bind(S3BlobStore.class).to(AWSS3BlobStore.class).in(Scopes.SINGLETON); bind(MultipartUploadStrategy.class).to(SequentialMultipartUploadStrategy.class); + bind(AsyncMultipartUploadStrategy.class).to(ParallelMultipartUploadStrategy.class); } @Override diff --git a/providers/aws-s3/src/main/java/org/jclouds/aws/s3/blobstore/strategy/AsyncMultipartUploadStrategy.java b/providers/aws-s3/src/main/java/org/jclouds/aws/s3/blobstore/strategy/AsyncMultipartUploadStrategy.java new file mode 100644 index 0000000000..71f8d81291 --- /dev/null +++ b/providers/aws-s3/src/main/java/org/jclouds/aws/s3/blobstore/strategy/AsyncMultipartUploadStrategy.java @@ -0,0 +1,38 @@ +/** + * + * Copyright (C) 2010 Cloud Conscious, LLC. + * + * ==================================================================== + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ==================================================================== + */ + +package org.jclouds.aws.s3.blobstore.strategy; + +import org.jclouds.aws.s3.blobstore.strategy.internal.ParallelMultipartUploadStrategy; +import org.jclouds.blobstore.domain.Blob; + +import com.google.common.util.concurrent.ListenableFuture; +import com.google.inject.ImplementedBy; + +/** + * @see execute(String container, Blob blob); + +} diff --git a/providers/aws-s3/src/main/java/org/jclouds/aws/s3/blobstore/strategy/MultipartUpload.java b/providers/aws-s3/src/main/java/org/jclouds/aws/s3/blobstore/strategy/MultipartUpload.java new file mode 100644 index 0000000000..820e44ccec --- /dev/null +++ b/providers/aws-s3/src/main/java/org/jclouds/aws/s3/blobstore/strategy/MultipartUpload.java @@ -0,0 +1,41 @@ +/** + * + * Copyright (C) 2010 Cloud Conscious, LLC. + * + * ==================================================================== + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ==================================================================== + */ + +package org.jclouds.aws.s3.blobstore.strategy; + +/** + * @see 0) { + partSize = magnitude * unitPartSize; + if (partSize > MultipartUpload.MAX_PART_SIZE) { + partSize = MultipartUpload.MAX_PART_SIZE; + unitPartSize = MultipartUpload.MAX_PART_SIZE; + } + parts = (int)(length / partSize); + if (parts * partSize < length) { + partSize = (magnitude + 1) * unitPartSize; + if (partSize > MultipartUpload.MAX_PART_SIZE) { + partSize = MultipartUpload.MAX_PART_SIZE; + unitPartSize = MultipartUpload.MAX_PART_SIZE; + } + parts = (int)(length / partSize); + } + } + if (parts > MultipartUpload.MAX_NUMBER_OF_PARTS) { // if splits in too many parts or + // cannot be split + unitPartSize = MultipartUpload.MIN_PART_SIZE; // take the minimum part size + parts = (int)(length / unitPartSize); + } + if (parts > MultipartUpload.MAX_NUMBER_OF_PARTS) { // if still splits in too many parts + parts = MultipartUpload.MAX_NUMBER_OF_PARTS - 1; // limit them. do not care about not + // covering + } + long remainder = length % unitPartSize; + if (remainder == 0 && parts > 0) { + parts -= 1; + } + this.chunkSize = partSize; + this.parts = parts; + this.remaining = length - partSize * parts; + logger.debug(" %d bytes partitioned in %d parts of part size: %d, remaining: %d%s", length, parts, chunkSize, + remaining, (remaining > MultipartUpload.MAX_PART_SIZE ? " overflow!" : "")); + return this.chunkSize; + } + + public long getCopied() { + return copied; + } + + public void setCopied(long copied) { + this.copied = copied; + } + + @VisibleForTesting + protected int getParts() { + return parts; + } + + protected int getNextPart() { + return ++part; + } + + protected void addCopied(long copied) { + this.copied += copied; + } + + protected long getNextChunkOffset() { + long next = chunkOffset; + chunkOffset += getChunkSize(); + return next; + } + + @VisibleForTesting + protected long getChunkSize() { + return chunkSize; + } + + @VisibleForTesting + protected long getRemaining() { + return remaining; + } + +} diff --git a/providers/aws-s3/src/main/java/org/jclouds/aws/s3/blobstore/strategy/internal/ParallelMultipartUploadStrategy.java b/providers/aws-s3/src/main/java/org/jclouds/aws/s3/blobstore/strategy/internal/ParallelMultipartUploadStrategy.java new file mode 100644 index 0000000000..2676537981 --- /dev/null +++ b/providers/aws-s3/src/main/java/org/jclouds/aws/s3/blobstore/strategy/internal/ParallelMultipartUploadStrategy.java @@ -0,0 +1,287 @@ +/** + * + * Copyright (C) 2010 Cloud Conscious, LLC. + * + * ==================================================================== + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ==================================================================== + */ + +package org.jclouds.aws.s3.blobstore.strategy.internal; + +import static com.google.common.base.Preconditions.checkNotNull; + +import java.util.Map; +import java.util.Queue; +import java.util.SortedMap; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.Callable; +import java.util.concurrent.CancellationException; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.ConcurrentSkipListMap; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import javax.annotation.Resource; +import javax.inject.Named; + +import org.jclouds.Constants; +import org.jclouds.aws.s3.AWSS3AsyncClient; +import org.jclouds.aws.s3.AWSS3Client; +import org.jclouds.aws.s3.blobstore.AWSS3AsyncBlobStore; +import org.jclouds.aws.s3.blobstore.strategy.AsyncMultipartUploadStrategy; +import org.jclouds.blobstore.domain.Blob; +import org.jclouds.blobstore.internal.BlobRuntimeException; +import org.jclouds.blobstore.reference.BlobStoreConstants; +import org.jclouds.concurrent.Futures; +import org.jclouds.io.Payload; +import org.jclouds.io.PayloadSlicer; +import org.jclouds.logging.Logger; +import org.jclouds.s3.domain.ObjectMetadataBuilder; +import org.jclouds.util.Throwables2; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.Maps; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.inject.Inject; + +public class ParallelMultipartUploadStrategy implements AsyncMultipartUploadStrategy { + @Resource + @Named(BlobStoreConstants.BLOBSTORE_LOGGER) + protected Logger logger = Logger.NULL; + + @VisibleForTesting + static final int DEFAULT_PARALLEL_DEGREE = 4; + @VisibleForTesting + static final int DEFAULT_MIN_RETRIES = 5; + @VisibleForTesting + static final int DEFAULT_MAX_PERCENT_RETRIES = 10; + + private final ExecutorService ioWorkerExecutor; + + @Inject(optional = true) + @Named("jclouds.mpu.parallel.degree") + @VisibleForTesting + int parallelDegree = DEFAULT_PARALLEL_DEGREE; + + @Inject(optional = true) + @Named("jclouds.mpu.parallel.retries.min") + @VisibleForTesting + int minRetries = DEFAULT_MIN_RETRIES; + + @Inject(optional = true) + @Named("jclouds.mpu.parallel.retries.maxpercent") + @VisibleForTesting + int maxPercentRetries = DEFAULT_MAX_PERCENT_RETRIES; + + /** + * maximum duration of an blob Request + */ + @Inject(optional = true) + @Named(Constants.PROPERTY_REQUEST_TIMEOUT) + protected Long maxTime; + + protected final AWSS3AsyncBlobStore ablobstore; + protected final PayloadSlicer slicer; + + @Inject + public ParallelMultipartUploadStrategy(AWSS3AsyncBlobStore ablobstore, PayloadSlicer slicer, + @Named(Constants.PROPERTY_IO_WORKER_THREADS) ExecutorService ioWorkerExecutor) { + this.ablobstore = checkNotNull(ablobstore, "ablobstore"); + this.slicer = checkNotNull(slicer, "slicer"); + this.ioWorkerExecutor = checkNotNull(ioWorkerExecutor, "ioWorkerExecutor"); + } + + protected void prepareUploadPart(final String container, final String key, + final String uploadId, final Integer part, final Payload payload, + final long offset, final long size, final SortedMap etags, + final BlockingQueue activeParts, + final Map> futureParts, + final AtomicInteger errors, final int maxRetries, final Map errorMap, + final Queue toRetry, final CountDownLatch latch) { + if (errors.get() > maxRetries) { + activeParts.remove(part); // remove part from the bounded-queue without blocking + latch.countDown(); + return; + } + final AWSS3AsyncClient client = (AWSS3AsyncClient) ablobstore.getContext() + .getProviderSpecificContext().getAsyncApi(); + Payload chunkedPart = slicer.slice(payload, offset, size); + logger.debug(String.format("async uploading part %s of %s to container %s with uploadId %s", part, key, container, uploadId)); + final long start = System.currentTimeMillis(); + final ListenableFuture futureETag = client.uploadPart(container, key, part, uploadId, chunkedPart); + futureETag.addListener(new Runnable() { + @Override + public void run() { + try { + etags.put(part, futureETag.get()); + logger.debug(String.format("async uploaded part %s of %s to container %s in %sms with uploadId %s", + part, key, container, (System.currentTimeMillis()-start), uploadId)); + } catch (CancellationException e) { + errorMap.put(part, e); + String message = String.format("%s while uploading part %s - [%s,%s] to container %s with uploadId: %s running since %dms", + e.getMessage(), part, offset, size, container, uploadId, (System.currentTimeMillis()-start)); + logger.debug(message); + } catch (Exception e) { + errorMap.put(part, e); + String message = String.format("%s while uploading part %s - [%s,%s] to container %s with uploadId: %s running since %dms", + e.getMessage(), part, offset, size, container, uploadId, (System.currentTimeMillis()-start)); + logger.error(message, e); + if (errors.incrementAndGet() <= maxRetries) + toRetry.add(new Part(part, offset, size)); + } finally { + activeParts.remove(part); // remove part from the bounded-queue without blocking + futureParts.remove(part); + latch.countDown(); + } + } + }, ioWorkerExecutor); + futureParts.put(part, futureETag); + } + + @Override + public ListenableFuture execute(final String container, final Blob blob) { + return Futures.makeListenable( + ioWorkerExecutor.submit(new Callable() { + @Override + public String call() throws Exception { + String key = blob.getMetadata().getName(); + Payload payload = blob.getPayload(); + MultipartUploadSlicingAlgorithm algorithm = new MultipartUploadSlicingAlgorithm(); + algorithm.calculateChunkSize(payload.getContentMetadata() + .getContentLength()); + int parts = algorithm.getParts(); + long chunkSize = algorithm.getChunkSize(); + long remaining = algorithm.getRemaining(); + if (parts > 0) { + AWSS3Client client = (AWSS3Client) ablobstore + .getContext().getProviderSpecificContext().getApi(); + String uploadId = null; + final Map> futureParts = + new ConcurrentHashMap>(); + final Map errorMap = Maps.newHashMap(); + AtomicInteger errors = new AtomicInteger(0); + int maxRetries = Math.max(minRetries, parts * maxPercentRetries / 100); + int effectiveParts = remaining > 0 ? parts + 1 : parts; + try { + uploadId = client.initiateMultipartUpload(container, + ObjectMetadataBuilder.create().key(key).build()); // TODO md5 + logger.debug(String.format("initiated multipart upload of %s to container %s" + + " with uploadId %s consisting from %s part (possible max. retries: %d)", + key, container, uploadId, effectiveParts, maxRetries)); + // we need a bounded-blocking queue to control the amount of parallel jobs + ArrayBlockingQueue activeParts = new ArrayBlockingQueue(parallelDegree); + Queue toRetry = new ConcurrentLinkedQueue(); + SortedMap etags = new ConcurrentSkipListMap(); + CountDownLatch latch = new CountDownLatch(effectiveParts); + int part; + while ((part = algorithm.getNextPart()) <= parts) { + Integer partKey = new Integer(part); + activeParts.put(partKey); + prepareUploadPart(container, key, uploadId, partKey, payload, + algorithm.getNextChunkOffset(), chunkSize, etags, + activeParts, futureParts, errors, maxRetries, errorMap, toRetry, latch); + } + if (remaining > 0) { + Integer partKey = new Integer(part); + activeParts.put(partKey); + prepareUploadPart(container, key, uploadId, partKey, payload, + algorithm.getNextChunkOffset(), remaining, etags, + activeParts, futureParts, errors, maxRetries, errorMap, toRetry, latch); + } + latch.await(1, TimeUnit.MINUTES); + // handling retries + while (errors.get() <= maxRetries && toRetry.size() > 0) { + int atOnce = Math.min(Math.min(toRetry.size(), errors.get()), parallelDegree); + CountDownLatch retryLatch = new CountDownLatch(atOnce); + for (int i = 0; i < atOnce; i++) { + Part failedPart = toRetry.poll(); + Integer partKey = new Integer(failedPart.getPart()); + activeParts.put(partKey); + prepareUploadPart(container, key, uploadId, partKey, payload, + failedPart.getOffset(), failedPart.getSize(), etags, + activeParts, futureParts, errors, maxRetries, errorMap, toRetry, retryLatch); + } + retryLatch.await(1, TimeUnit.MINUTES); + } + if (errors.get() > maxRetries) { + throw new BlobRuntimeException(String.format( + "Too many failed parts: %s while multipart upload of %s to container %s with uploadId %s", + errors.get(), key, container, uploadId)); + } + String eTag = client.completeMultipartUpload(container, key, uploadId, etags); + logger.debug(String.format("multipart upload of %s to container %s with uploadId %s" + + " succeffully finished with %s retries", key, container, uploadId, errors.get())); + return eTag; + } catch (Exception ex) { + RuntimeException rtex = Throwables2.getFirstThrowableOfType(ex, RuntimeException.class); + if (rtex == null) { + rtex = new RuntimeException(ex); + } + for (Map.Entry> entry : futureParts.entrySet()) { + entry.getValue().cancel(false); + } + if (uploadId != null) { + client.abortMultipartUpload(container, key, uploadId); + } + throw rtex; + } + } else { + ListenableFuture futureETag = ablobstore.putBlob(container, blob); + return maxTime != null ? + futureETag.get(maxTime,TimeUnit.SECONDS) : futureETag.get(); + } + } + }), ioWorkerExecutor); + } + + class Part { + private int part; + private long offset; + private long size; + + Part(int part, long offset, long size) { + this.part = part; + this.offset = offset; + this.size = size; + } + + public int getPart() { + return part; + } + + public void setPart(int part) { + this.part = part; + } + + public long getOffset() { + return offset; + } + + public void setOffset(long offset) { + this.offset = offset; + } + + public long getSize() { + return size; + } + + public void setSize(long size) { + this.size = size; + } + } +} \ No newline at end of file diff --git a/providers/aws-s3/src/main/java/org/jclouds/aws/s3/blobstore/strategy/internal/SequentialMultipartUploadStrategy.java b/providers/aws-s3/src/main/java/org/jclouds/aws/s3/blobstore/strategy/internal/SequentialMultipartUploadStrategy.java index 4cedf3fa86..5f0c276c38 100644 --- a/providers/aws-s3/src/main/java/org/jclouds/aws/s3/blobstore/strategy/internal/SequentialMultipartUploadStrategy.java +++ b/providers/aws-s3/src/main/java/org/jclouds/aws/s3/blobstore/strategy/internal/SequentialMultipartUploadStrategy.java @@ -22,9 +22,7 @@ package org.jclouds.aws.s3.blobstore.strategy.internal; import static com.google.common.base.Preconditions.checkNotNull; -import java.util.Map; import java.util.SortedMap; -import java.util.concurrent.TimeoutException; import javax.annotation.Resource; import javax.inject.Named; @@ -41,7 +39,6 @@ import org.jclouds.logging.Logger; import org.jclouds.s3.domain.ObjectMetadataBuilder; import org.jclouds.util.Throwables2; -import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Maps; import com.google.inject.Inject; @@ -62,126 +59,24 @@ public class SequentialMultipartUploadStrategy implements MultipartUploadStrateg @Named(BlobStoreConstants.BLOBSTORE_LOGGER) protected Logger logger = Logger.NULL; - @VisibleForTesting - static final long DEFAULT_PART_SIZE = 33554432; // 32MB - - @VisibleForTesting - static final int DEFAULT_MAGNITUDE_BASE = 100; - - @Inject(optional = true) - @Named("jclouds.mpu.parts.size") - @VisibleForTesting - long defaultPartSize = DEFAULT_PART_SIZE; - - @Inject(optional = true) - @Named("jclouds.mpu.parts.magnitude") - @VisibleForTesting - int magnitudeBase = DEFAULT_MAGNITUDE_BASE; - - private final AWSS3BlobStore ablobstore; - private final PayloadSlicer slicer; - - // calculated only once, but not from the constructor - private volatile long parts; // required number of parts with chunkSize - private volatile long chunkSize; - private volatile long remaining; // number of bytes remained for the last part - - // sequentially updated values - private volatile int part; - private volatile long chunkOffset; - private volatile long copied; + protected final AWSS3BlobStore ablobstore; + protected final PayloadSlicer slicer; @Inject public SequentialMultipartUploadStrategy(AWSS3BlobStore ablobstore, PayloadSlicer slicer) { this.ablobstore = checkNotNull(ablobstore, "ablobstore"); this.slicer = checkNotNull(slicer, "slicer"); } - - @VisibleForTesting - protected long calculateChunkSize(long length) { - long unitPartSize = defaultPartSize; // first try with default part size - long parts = length / unitPartSize; - long partSize = unitPartSize; - int magnitude = (int) (parts / magnitudeBase); - if (magnitude > 0) { - partSize = magnitude * unitPartSize; - if (partSize > MAX_PART_SIZE) { - partSize = MAX_PART_SIZE; - unitPartSize = MAX_PART_SIZE; - } - parts = length / partSize; - if (parts * partSize < length) { - partSize = (magnitude + 1) * unitPartSize; - if (partSize > MAX_PART_SIZE) { - partSize = MAX_PART_SIZE; - unitPartSize = MAX_PART_SIZE; - } - parts = length / partSize; - } - } - if (parts > MAX_NUMBER_OF_PARTS) { // if splits in too many parts or - // cannot be split - unitPartSize = MIN_PART_SIZE; // take the minimum part size - parts = length / unitPartSize; - } - if (parts > MAX_NUMBER_OF_PARTS) { // if still splits in too many parts - parts = MAX_NUMBER_OF_PARTS - 1; // limit them. do not care about not - // covering - } - long remainder = length % unitPartSize; - if (remainder == 0 && parts > 0) { - parts -= 1; - } - this.chunkSize = partSize; - this.parts = parts; - this.remaining = length - partSize * parts; - logger.debug(" %d bytes partitioned in %d parts of part size: %d, remaining: %d%s", length, parts, chunkSize, - remaining, (remaining > MAX_PART_SIZE ? " overflow!" : "")); - return this.chunkSize; - } - - public long getCopied() { - return copied; - } - - public void setCopied(long copied) { - this.copied = copied; - } - - @VisibleForTesting - protected long getParts() { - return parts; - } - - protected int getNextPart() { - return ++part; - } - - protected void addCopied(long copied) { - this.copied += copied; - } - - protected long getNextChunkOffset() { - long next = chunkOffset; - chunkOffset += getChunkSize(); - return next; - } - - @VisibleForTesting - protected long getChunkSize() { - return chunkSize; - } - - @VisibleForTesting - protected long getRemaining() { - return remaining; - } - - private String prepareUploadPart(AWSS3Client client, String container, String key, String uploadId, int part, - Payload chunkedPart) { + + protected void prepareUploadPart(String container, String key, String uploadId, int part, + Payload payload, long offset, long size, SortedMap etags) { + AWSS3Client client = (AWSS3Client) ablobstore.getContext() + .getProviderSpecificContext().getApi(); + Payload chunkedPart = slicer.slice(payload, offset, size); String eTag = null; try { - eTag = client.uploadPart(container, key, part, uploadId, chunkedPart); + eTag = client.uploadPart(container, key, part, uploadId, chunkedPart); + etags.put(new Integer(part), eTag); } catch (KeyNotFoundException e) { // note that because of eventual consistency, the upload id may not be present yet // we may wish to add this condition to the retry handler @@ -189,15 +84,18 @@ public class SequentialMultipartUploadStrategy implements MultipartUploadStrateg // we may also choose to implement ListParts and wait for the uploadId to become // available there. eTag = client.uploadPart(container, key, part, uploadId, chunkedPart); + etags.put(new Integer(part), eTag); } - return eTag; } @Override public String execute(String container, Blob blob) { String key = blob.getMetadata().getName(); - calculateChunkSize(blob.getPayload().getContentMetadata().getContentLength()); - long parts = getParts(); + Payload payload = blob.getPayload(); + MultipartUploadSlicingAlgorithm algorithm = new MultipartUploadSlicingAlgorithm(); + algorithm.calculateChunkSize(payload.getContentMetadata().getContentLength()); + int parts = algorithm.getParts(); + long chunkSize = algorithm.getChunkSize(); if (parts > 0) { AWSS3Client client = (AWSS3Client) ablobstore.getContext() .getProviderSpecificContext().getApi(); @@ -206,18 +104,14 @@ public class SequentialMultipartUploadStrategy implements MultipartUploadStrateg try { SortedMap etags = Maps.newTreeMap(); int part; - while ((part = getNextPart()) <= getParts()) { - String eTag = prepareUploadPart(client, container, key, - uploadId, part, slicer.slice(blob.getPayload(), - getNextChunkOffset(), chunkSize)); - etags.put(new Integer(part), eTag); + while ((part = algorithm.getNextPart()) <= parts) { + prepareUploadPart(container, key, uploadId, part, payload, + algorithm.getNextChunkOffset(), chunkSize, etags); } - long remaining = getRemaining(); + long remaining = algorithm.getRemaining(); if (remaining > 0) { - String eTag = prepareUploadPart(client, container, key, - uploadId, part, slicer.slice(blob.getPayload(), - getNextChunkOffset(), remaining)); - etags.put(new Integer(part), eTag); + prepareUploadPart(container, key, uploadId, part, payload, + algorithm.getNextChunkOffset(), remaining, etags); } return client.completeMultipartUpload(container, key, uploadId, etags); } catch (Exception ex) { diff --git a/providers/aws-s3/src/test/java/org/jclouds/aws/s3/blobstore/strategy/internal/MpuGraphData.java b/providers/aws-s3/src/test/java/org/jclouds/aws/s3/blobstore/strategy/internal/MpuGraphData.java index a51dda2f41..0b3cc9287f 100644 --- a/providers/aws-s3/src/test/java/org/jclouds/aws/s3/blobstore/strategy/internal/MpuGraphData.java +++ b/providers/aws-s3/src/test/java/org/jclouds/aws/s3/blobstore/strategy/internal/MpuGraphData.java @@ -19,13 +19,7 @@ package org.jclouds.aws.s3.blobstore.strategy.internal; -import static org.easymock.classextension.EasyMock.createMock; -import static org.easymock.classextension.EasyMock.replay; -import static org.easymock.classextension.EasyMock.verify; - -import org.jclouds.aws.s3.blobstore.AWSS3BlobStore; import org.jclouds.aws.s3.blobstore.strategy.MultipartUploadStrategy; -import org.jclouds.io.PayloadSlicer; /** * Print out on the console some graph data regarding the partitioning algorithm. @@ -34,46 +28,37 @@ import org.jclouds.io.PayloadSlicer; */ public class MpuGraphData { - private static void calculate(long length, SequentialMultipartUploadStrategy strategy) { - System.out.println("" + length + " " + strategy.getParts() + " " - + strategy.calculateChunkSize(length) + " " + + strategy.getRemaining()); + private static void calculate(long length, MultipartUploadSlicingAlgorithm algorithm) { + System.out.println("" + length + " " + algorithm.getParts() + " " + + algorithm.calculateChunkSize(length) + " " + + algorithm.getRemaining()); } - private static void foreach(long from, long to1, long to2, long to3, SequentialMultipartUploadStrategy strategy) { + private static void foreach(long from, long to1, long to2, long to3, MultipartUploadSlicingAlgorithm algorithm) { long i = 0L, step = 1L; System.out.println("=== {" + from + "," + to1 + "} ==="); for (; i < to1 - from; step += i, i += step) { - calculate(i + from, strategy); + calculate(i + from, algorithm); } - calculate(to1, strategy); + calculate(to1, algorithm); System.out.println("=== {" + (to1 + 1) + "," + to2 + "} ==="); for (; i < to2 - to1; step += i / 20, i += step) { - calculate(i + from, strategy); + calculate(i + from, algorithm); } - calculate(to2, strategy); + calculate(to2, algorithm); System.out.println("=== {" + (to2 + 1) + "," + to3 + "} ==="); for (; i < to3 - to2; step += i / 40, i += step) { - calculate(i + from, strategy); + calculate(i + from, algorithm); } - calculate(to3, strategy); + calculate(to3, algorithm); } public static void main(String[] args) { - AWSS3BlobStore ablobStore = createMock(AWSS3BlobStore.class); - PayloadSlicer slicer = createMock(PayloadSlicer.class); - - replay(ablobStore); - replay(slicer); - - SequentialMultipartUploadStrategy strategy = new SequentialMultipartUploadStrategy(ablobStore, slicer); + MultipartUploadSlicingAlgorithm algorithm = new MultipartUploadSlicingAlgorithm(); foreach(1L, - strategy.defaultPartSize * strategy.magnitudeBase, - MultipartUploadStrategy.MAX_PART_SIZE * strategy.magnitudeBase, + algorithm.defaultPartSize * algorithm.magnitudeBase, + MultipartUploadStrategy.MAX_PART_SIZE * algorithm.magnitudeBase, MultipartUploadStrategy.MAX_PART_SIZE * MultipartUploadStrategy.MAX_NUMBER_OF_PARTS, - strategy); - - verify(ablobStore); - verify(slicer); + algorithm); } } diff --git a/providers/aws-s3/src/test/java/org/jclouds/aws/s3/blobstore/strategy/internal/MpuPartitioningAlgorithmTest.java b/providers/aws-s3/src/test/java/org/jclouds/aws/s3/blobstore/strategy/internal/MpuPartitioningAlgorithmTest.java index 0c6927f337..bf65f0c310 100644 --- a/providers/aws-s3/src/test/java/org/jclouds/aws/s3/blobstore/strategy/internal/MpuPartitioningAlgorithmTest.java +++ b/providers/aws-s3/src/test/java/org/jclouds/aws/s3/blobstore/strategy/internal/MpuPartitioningAlgorithmTest.java @@ -19,18 +19,13 @@ package org.jclouds.aws.s3.blobstore.strategy.internal; -import static org.easymock.classextension.EasyMock.createMock; -import static org.easymock.classextension.EasyMock.replay; -import static org.easymock.classextension.EasyMock.verify; import static org.testng.Assert.assertEquals; -import org.jclouds.aws.s3.blobstore.AWSS3BlobStore; import org.jclouds.aws.s3.blobstore.strategy.MultipartUploadStrategy; -import org.jclouds.io.PayloadSlicer; import org.testng.annotations.Test; /** - * Tests behavior of {@code SequentialMultipartUploadStrategy} from the perspective of + * Tests behavior of {@code MultipartUploadSlicingAlgorithm} from the perspective of * partitioning algorithm * * @author Tibor Kiss @@ -40,100 +35,76 @@ public class MpuPartitioningAlgorithmTest { /** * Below 1 parts the MPU is not used. - * When we have more than {@code SequentialMultipartUploadStrategy.DEFAULT_PART_SIZE} bytes data, + * When we have more than {@code MultipartUploadSlicingAlgorithm.DEFAULT_PART_SIZE} bytes data, * the MPU starts to become active. */ @Test public void testLowerLimitFromWhereMultipartBecomeActive() { - AWSS3BlobStore ablobStore = createMock(AWSS3BlobStore.class); - PayloadSlicer slicer = createMock(PayloadSlicer.class); - - replay(ablobStore); - replay(slicer); - - SequentialMultipartUploadStrategy strategy = new SequentialMultipartUploadStrategy(ablobStore, slicer); + MultipartUploadSlicingAlgorithm strategy = new MultipartUploadSlicingAlgorithm(); // exactly the MIN_PART_SIZE long length = MultipartUploadStrategy.MIN_PART_SIZE; long chunkSize = strategy.calculateChunkSize(length); - assertEquals(chunkSize, SequentialMultipartUploadStrategy.DEFAULT_PART_SIZE); + assertEquals(chunkSize, MultipartUploadSlicingAlgorithm.DEFAULT_PART_SIZE); assertEquals(strategy.getParts(), 0); assertEquals(strategy.getRemaining(), length); assertEquals(chunkSize * strategy.getParts() + strategy.getRemaining(), length); // below DEFAULT_PART_SIZE - length = SequentialMultipartUploadStrategy.DEFAULT_PART_SIZE; + length = MultipartUploadSlicingAlgorithm.DEFAULT_PART_SIZE; chunkSize = strategy.calculateChunkSize(length); - assertEquals(chunkSize, SequentialMultipartUploadStrategy.DEFAULT_PART_SIZE); + assertEquals(chunkSize, MultipartUploadSlicingAlgorithm.DEFAULT_PART_SIZE); assertEquals(strategy.getParts(), 0); assertEquals(strategy.getRemaining(), length); assertEquals(chunkSize * strategy.getParts() + strategy.getRemaining(), length); // exactly the DEFAULT_PART_SIZE - length = SequentialMultipartUploadStrategy.DEFAULT_PART_SIZE + 1; + length = MultipartUploadSlicingAlgorithm.DEFAULT_PART_SIZE + 1; chunkSize = strategy.calculateChunkSize(length); - assertEquals(chunkSize, SequentialMultipartUploadStrategy.DEFAULT_PART_SIZE); + assertEquals(chunkSize, MultipartUploadSlicingAlgorithm.DEFAULT_PART_SIZE); assertEquals(strategy.getParts(), 1); assertEquals(strategy.getRemaining(), 1); assertEquals(chunkSize * strategy.getParts() + strategy.getRemaining(), length); - - verify(ablobStore); - verify(slicer); } /** * Phase 1 of the algorithm. * ChunkSize does not grow from a {@code MultipartUploadStrategy.DEFAULT_PART_SIZE} - * until we reach {@code SequentialMultipartUploadStrategy.MAGNITUDE_BASE} number of parts. + * until we reach {@code MultipartUploadSlicingAlgorithm.MAGNITUDE_BASE} number of parts. */ @Test public void testWhenChunkSizeHasToStartGrowing() { - AWSS3BlobStore ablobStore = createMock(AWSS3BlobStore.class); - PayloadSlicer slicer = createMock(PayloadSlicer.class); - - replay(ablobStore); - replay(slicer); - - SequentialMultipartUploadStrategy strategy = new SequentialMultipartUploadStrategy(ablobStore, slicer); + MultipartUploadSlicingAlgorithm strategy = new MultipartUploadSlicingAlgorithm(); // upper limit while we still have exactly defaultPartSize chunkSize - long length = SequentialMultipartUploadStrategy.DEFAULT_PART_SIZE * SequentialMultipartUploadStrategy.DEFAULT_MAGNITUDE_BASE; + long length = MultipartUploadSlicingAlgorithm.DEFAULT_PART_SIZE * MultipartUploadSlicingAlgorithm.DEFAULT_MAGNITUDE_BASE; long chunkSize = strategy.calculateChunkSize(length); - assertEquals(chunkSize, SequentialMultipartUploadStrategy.DEFAULT_PART_SIZE); - assertEquals(strategy.getParts(), SequentialMultipartUploadStrategy.DEFAULT_MAGNITUDE_BASE - 1); - assertEquals(strategy.getRemaining(), SequentialMultipartUploadStrategy.DEFAULT_PART_SIZE); + assertEquals(chunkSize, MultipartUploadSlicingAlgorithm.DEFAULT_PART_SIZE); + assertEquals(strategy.getParts(), MultipartUploadSlicingAlgorithm.DEFAULT_MAGNITUDE_BASE - 1); + assertEquals(strategy.getRemaining(), MultipartUploadSlicingAlgorithm.DEFAULT_PART_SIZE); assertEquals(chunkSize * strategy.getParts() + strategy.getRemaining(), length); // then chunkSize is increasing length += 1; chunkSize = strategy.calculateChunkSize(length); - assertEquals(chunkSize, SequentialMultipartUploadStrategy.DEFAULT_PART_SIZE * 2); - assertEquals(strategy.getParts(), SequentialMultipartUploadStrategy.DEFAULT_MAGNITUDE_BASE / 2); + assertEquals(chunkSize, MultipartUploadSlicingAlgorithm.DEFAULT_PART_SIZE * 2); + assertEquals(strategy.getParts(), MultipartUploadSlicingAlgorithm.DEFAULT_MAGNITUDE_BASE / 2); assertEquals(strategy.getRemaining(), 1); assertEquals(chunkSize * strategy.getParts() + strategy.getRemaining(), length); - - verify(ablobStore); - verify(slicer); } /** * Phase 2 of the algorithm. - * The number of parts does not grow from {@code SequentialMultipartUploadStrategy.MAGNITUDE_BASE} + * The number of parts does not grow from {@code MultipartUploadSlicingAlgorithm.MAGNITUDE_BASE} * until we reach the {@code MultipartUploadStrategy.MAX_PART_SIZE}. */ @Test public void testWhenPartsHasToStartGrowingFromMagnitudeBase() { - AWSS3BlobStore ablobStore = createMock(AWSS3BlobStore.class); - PayloadSlicer slicer = createMock(PayloadSlicer.class); - - replay(ablobStore); - replay(slicer); - - SequentialMultipartUploadStrategy strategy = new SequentialMultipartUploadStrategy(ablobStore, slicer); + MultipartUploadSlicingAlgorithm strategy = new MultipartUploadSlicingAlgorithm(); // upper limit while we still have exactly MAGNITUDE_BASE parts (together with the remaining) - long length = MultipartUploadStrategy.MAX_PART_SIZE * SequentialMultipartUploadStrategy.DEFAULT_MAGNITUDE_BASE; + long length = MultipartUploadStrategy.MAX_PART_SIZE * MultipartUploadSlicingAlgorithm.DEFAULT_MAGNITUDE_BASE; long chunkSize = strategy.calculateChunkSize(length); assertEquals(chunkSize, MultipartUploadStrategy.MAX_PART_SIZE); - assertEquals(strategy.getParts(), SequentialMultipartUploadStrategy.DEFAULT_MAGNITUDE_BASE - 1); + assertEquals(strategy.getParts(), MultipartUploadSlicingAlgorithm.DEFAULT_MAGNITUDE_BASE - 1); assertEquals(strategy.getRemaining(), MultipartUploadStrategy.MAX_PART_SIZE); assertEquals(chunkSize * strategy.getParts() + strategy.getRemaining(), length); @@ -141,12 +112,9 @@ public class MpuPartitioningAlgorithmTest { length += 1; chunkSize = strategy.calculateChunkSize(length); assertEquals(chunkSize, MultipartUploadStrategy.MAX_PART_SIZE); - assertEquals(strategy.getParts(), SequentialMultipartUploadStrategy.DEFAULT_MAGNITUDE_BASE); + assertEquals(strategy.getParts(), MultipartUploadSlicingAlgorithm.DEFAULT_MAGNITUDE_BASE); assertEquals(strategy.getRemaining(), 1); assertEquals(chunkSize * strategy.getParts() + strategy.getRemaining(), length); - - verify(ablobStore); - verify(slicer); } /** @@ -156,13 +124,7 @@ public class MpuPartitioningAlgorithmTest { */ @Test public void testWhenPartsExceedsMaxNumberOfParts() { - AWSS3BlobStore ablobStore = createMock(AWSS3BlobStore.class); - PayloadSlicer slicer = createMock(PayloadSlicer.class); - - replay(ablobStore); - replay(slicer); - - SequentialMultipartUploadStrategy strategy = new SequentialMultipartUploadStrategy(ablobStore, slicer); + MultipartUploadSlicingAlgorithm strategy = new MultipartUploadSlicingAlgorithm(); // upper limit while we still have exactly MAX_NUMBER_OF_PARTS parts (together with the remaining) long length = MultipartUploadStrategy.MAX_PART_SIZE * MultipartUploadStrategy.MAX_NUMBER_OF_PARTS; long chunkSize = strategy.calculateChunkSize(length); @@ -178,8 +140,5 @@ public class MpuPartitioningAlgorithmTest { assertEquals(strategy.getParts(), MultipartUploadStrategy.MAX_NUMBER_OF_PARTS); assertEquals(strategy.getRemaining(), 1); assertEquals(chunkSize * strategy.getParts() + strategy.getRemaining(), length); - - verify(ablobStore); - verify(slicer); } } diff --git a/providers/aws-s3/src/test/java/org/jclouds/aws/s3/blobstore/strategy/internal/SequentialMultipartUploadStrategyTest.java b/providers/aws-s3/src/test/java/org/jclouds/aws/s3/blobstore/strategy/internal/SequentialMultipartUploadStrategyTest.java index 6f67f42272..957e7bc59d 100644 --- a/providers/aws-s3/src/test/java/org/jclouds/aws/s3/blobstore/strategy/internal/SequentialMultipartUploadStrategyTest.java +++ b/providers/aws-s3/src/test/java/org/jclouds/aws/s3/blobstore/strategy/internal/SequentialMultipartUploadStrategyTest.java @@ -27,17 +27,13 @@ import static org.easymock.classextension.EasyMock.verify; import static org.testng.Assert.fail; import java.util.SortedMap; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; -import org.easymock.EasyMock; import org.jclouds.aws.s3.AWSS3Client; import org.jclouds.aws.s3.blobstore.AWSS3BlobStore; import org.jclouds.blobstore.BlobStoreContext; import org.jclouds.blobstore.domain.Blob; import org.jclouds.blobstore.domain.MutableBlobMetadata; -import org.jclouds.concurrent.Timeout; import org.jclouds.io.MutableContentMetadata; import org.jclouds.io.Payload; import org.jclouds.io.PayloadSlicer; @@ -74,7 +70,7 @@ public class SequentialMultipartUploadStrategyTest { AWSS3Client client = createMock(AWSS3Client.class); ObjectMetadata ometa = createMock(ObjectMetadata.class); String uploadId = "uploadId"; - long chunkSize = SequentialMultipartUploadStrategy.DEFAULT_PART_SIZE; + long chunkSize = MultipartUploadSlicingAlgorithm.DEFAULT_PART_SIZE; long remaining = 100L; SortedMap etags = Maps.newTreeMap(); etags.put(new Integer(1), "eTag1"); @@ -137,7 +133,7 @@ public class SequentialMultipartUploadStrategyTest { AWSS3Client client = createMock(AWSS3Client.class); ObjectMetadata ometa = createMock(ObjectMetadata.class); String uploadId = "uploadId"; - long chunkSize = SequentialMultipartUploadStrategy.DEFAULT_PART_SIZE; + long chunkSize = MultipartUploadSlicingAlgorithm.DEFAULT_PART_SIZE; long remaining = 100L; SortedMap etags = Maps.newTreeMap(); etags.put(new Integer(1), "eTag1");