diff --git a/providers/aws-s3/src/main/java/org/jclouds/aws/s3/blobstore/strategy/internal/SequentialMultipartUploadStrategy.java b/providers/aws-s3/src/main/java/org/jclouds/aws/s3/blobstore/strategy/internal/SequentialMultipartUploadStrategy.java index 98d0718005..f775295771 100644 --- a/providers/aws-s3/src/main/java/org/jclouds/aws/s3/blobstore/strategy/internal/SequentialMultipartUploadStrategy.java +++ b/providers/aws-s3/src/main/java/org/jclouds/aws/s3/blobstore/strategy/internal/SequentialMultipartUploadStrategy.java @@ -39,6 +39,7 @@ import org.jclouds.io.PayloadSlicer; import org.jclouds.logging.Logger; import org.jclouds.s3.domain.ObjectMetadataBuilder; +import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Maps; /** @@ -58,8 +59,8 @@ public class SequentialMultipartUploadStrategy implements MultipartUploadStrateg @Named(BlobStoreConstants.BLOBSTORE_LOGGER) protected Logger logger = Logger.NULL; - private final long DEFAULT_PART_SIZE = 33554432; // 32mb - private final int MAGNITUDE_BASE = 100; + static final long DEFAULT_PART_SIZE = 33554432; // 32mb + static final int MAGNITUDE_BASE = 100; private final AWSS3BlobStore ablobstore; private final PayloadSlicer slicer; @@ -80,6 +81,7 @@ public class SequentialMultipartUploadStrategy implements MultipartUploadStrateg this.slicer = checkNotNull(slicer, "slicer"); } + @VisibleForTesting protected long calculateChunkSize(long length) { long unitPartSize = DEFAULT_PART_SIZE; // first try with default part size long parts = length / unitPartSize; @@ -92,6 +94,14 @@ public class SequentialMultipartUploadStrategy implements MultipartUploadStrateg unitPartSize = MAX_PART_SIZE; } parts = length / partSize; + if (parts * partSize < length) { + partSize = (magnitude + 1) * unitPartSize; + if (partSize > MAX_PART_SIZE) { + partSize = MAX_PART_SIZE; + unitPartSize = MAX_PART_SIZE; + } + parts = length / partSize; + } } if (parts > MAX_NUMBER_OF_PARTS) { // if splits in too many parts or // cannot be split @@ -114,6 +124,7 @@ public class SequentialMultipartUploadStrategy implements MultipartUploadStrateg return this.chunkSize; } + @VisibleForTesting protected long getParts() { return parts; } @@ -132,10 +143,12 @@ public class SequentialMultipartUploadStrategy implements MultipartUploadStrateg return next; } + @VisibleForTesting protected long getChunkSize() { return chunkSize; } + @VisibleForTesting protected long getRemaining() { return remaining; } diff --git a/providers/aws-s3/src/test/java/org/jclouds/aws/s3/blobstore/strategy/internal/MpuGraphData.java b/providers/aws-s3/src/test/java/org/jclouds/aws/s3/blobstore/strategy/internal/MpuGraphData.java new file mode 100644 index 0000000000..6e8b9a506c --- /dev/null +++ b/providers/aws-s3/src/test/java/org/jclouds/aws/s3/blobstore/strategy/internal/MpuGraphData.java @@ -0,0 +1,79 @@ +/** + * + * Copyright (C) 2010 Cloud Conscious, LLC. + * + * ==================================================================== + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ==================================================================== + */ + +package org.jclouds.aws.s3.blobstore.strategy.internal; + +import static org.easymock.classextension.EasyMock.createMock; +import static org.easymock.classextension.EasyMock.replay; +import static org.easymock.classextension.EasyMock.verify; + +import org.jclouds.aws.s3.blobstore.AWSS3BlobStore; +import org.jclouds.aws.s3.blobstore.strategy.MultipartUploadStrategy; +import org.jclouds.io.PayloadSlicer; + +/** + * Print out on the console some graph data regarding the partitioning algorithm. + * + * @author Tibor Kiss + */ +public class MpuGraphData { + + private static void calculate(long length, SequentialMultipartUploadStrategy strategy) { + System.out.println("" + length + " " + strategy.getParts() + " " + + strategy.calculateChunkSize(length) + " " + + strategy.getRemaining()); + } + + private static void foreach(long from, long to1, long to2, long to3, SequentialMultipartUploadStrategy strategy) { + long i = 0L, step = 1L; + System.out.println("=== {" + from + "," + to1 + "} ==="); + for (; i < to1 - from; step += i, i += step) { + calculate(i + from, strategy); + } + calculate(to1, strategy); + System.out.println("=== {" + (to1 + 1) + "," + to2 + "} ==="); + for (; i < to2 - to1; step += i / 20, i += step) { + calculate(i + from, strategy); + } + calculate(to2, strategy); + System.out.println("=== {" + (to2 + 1) + "," + to3 + "} ==="); + for (; i < to3 - to2; step += i / 40, i += step) { + calculate(i + from, strategy); + } + calculate(to3, strategy); + } + + public static void main(String[] args) { + AWSS3BlobStore ablobStore = createMock(AWSS3BlobStore.class); + PayloadSlicer slicer = createMock(PayloadSlicer.class); + + replay(ablobStore); + replay(slicer); + + SequentialMultipartUploadStrategy strategy = new SequentialMultipartUploadStrategy(ablobStore, slicer); + foreach(1L, + SequentialMultipartUploadStrategy.DEFAULT_PART_SIZE * SequentialMultipartUploadStrategy.MAGNITUDE_BASE, + MultipartUploadStrategy.MAX_PART_SIZE * SequentialMultipartUploadStrategy.MAGNITUDE_BASE, + MultipartUploadStrategy.MAX_PART_SIZE * MultipartUploadStrategy.MAX_NUMBER_OF_PARTS, + strategy); + + verify(ablobStore); + verify(slicer); + } + +} diff --git a/providers/aws-s3/src/test/java/org/jclouds/aws/s3/blobstore/strategy/internal/MpuPartitioningAlgorithmTest.java b/providers/aws-s3/src/test/java/org/jclouds/aws/s3/blobstore/strategy/internal/MpuPartitioningAlgorithmTest.java new file mode 100644 index 0000000000..861ad4e72a --- /dev/null +++ b/providers/aws-s3/src/test/java/org/jclouds/aws/s3/blobstore/strategy/internal/MpuPartitioningAlgorithmTest.java @@ -0,0 +1,185 @@ +/** + * + * Copyright (C) 2010 Cloud Conscious, LLC. + * + * ==================================================================== + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ==================================================================== + */ + +package org.jclouds.aws.s3.blobstore.strategy.internal; + +import static org.easymock.classextension.EasyMock.createMock; +import static org.easymock.classextension.EasyMock.replay; +import static org.easymock.classextension.EasyMock.verify; +import static org.testng.Assert.assertEquals; + +import org.jclouds.aws.s3.blobstore.AWSS3BlobStore; +import org.jclouds.aws.s3.blobstore.strategy.MultipartUploadStrategy; +import org.jclouds.io.PayloadSlicer; +import org.testng.annotations.Test; + +/** + * Tests behavior of {@code SequentialMultipartUploadStrategy} from the perspective of + * partitioning algorithm + * + * @author Tibor Kiss + */ +@Test(groups = "unit") +public class MpuPartitioningAlgorithmTest { + + /** + * Below 1 parts the MPU is not used. + * When we have more than {@code SequentialMultipartUploadStrategy.DEFAULT_PART_SIZE} bytes data, + * the MPU starts to become active. + */ + @Test + public void testLowerLimitFromWhereMultipartBecomeActive() { + AWSS3BlobStore ablobStore = createMock(AWSS3BlobStore.class); + PayloadSlicer slicer = createMock(PayloadSlicer.class); + + replay(ablobStore); + replay(slicer); + + SequentialMultipartUploadStrategy strategy = new SequentialMultipartUploadStrategy(ablobStore, slicer); + + // exactly the MIN_PART_SIZE + long length = MultipartUploadStrategy.MIN_PART_SIZE; + long chunkSize = strategy.calculateChunkSize(length); + assertEquals(chunkSize, SequentialMultipartUploadStrategy.DEFAULT_PART_SIZE); + assertEquals(strategy.getParts(), 0); + assertEquals(strategy.getRemaining(), length); + assertEquals(chunkSize * strategy.getParts() + strategy.getRemaining(), length); + + // below DEFAULT_PART_SIZE + length = SequentialMultipartUploadStrategy.DEFAULT_PART_SIZE; + chunkSize = strategy.calculateChunkSize(length); + assertEquals(chunkSize, SequentialMultipartUploadStrategy.DEFAULT_PART_SIZE); + assertEquals(strategy.getParts(), 0); + assertEquals(strategy.getRemaining(), length); + assertEquals(chunkSize * strategy.getParts() + strategy.getRemaining(), length); + + // exactly the DEFAULT_PART_SIZE + length = SequentialMultipartUploadStrategy.DEFAULT_PART_SIZE + 1; + chunkSize = strategy.calculateChunkSize(length); + assertEquals(chunkSize, SequentialMultipartUploadStrategy.DEFAULT_PART_SIZE); + assertEquals(strategy.getParts(), 1); + assertEquals(strategy.getRemaining(), 1); + assertEquals(chunkSize * strategy.getParts() + strategy.getRemaining(), length); + + verify(ablobStore); + verify(slicer); + } + + /** + * Phase 1 of the algorithm. + * ChunkSize does not grow from a {@code MultipartUploadStrategy.DEFAULT_PART_SIZE} + * until we reach {@code SequentialMultipartUploadStrategy.MAGNITUDE_BASE} number of parts. + */ + @Test + public void testWhenChunkSizeHasToStartGrowing() { + AWSS3BlobStore ablobStore = createMock(AWSS3BlobStore.class); + PayloadSlicer slicer = createMock(PayloadSlicer.class); + + replay(ablobStore); + replay(slicer); + + SequentialMultipartUploadStrategy strategy = new SequentialMultipartUploadStrategy(ablobStore, slicer); + // upper limit while we still have exactly DEFAULT_PART_SIZE chunkSize + long length = SequentialMultipartUploadStrategy.DEFAULT_PART_SIZE * SequentialMultipartUploadStrategy.MAGNITUDE_BASE; + long chunkSize = strategy.calculateChunkSize(length); + assertEquals(chunkSize, SequentialMultipartUploadStrategy.DEFAULT_PART_SIZE); + assertEquals(strategy.getParts(), SequentialMultipartUploadStrategy.MAGNITUDE_BASE - 1); + assertEquals(strategy.getRemaining(), SequentialMultipartUploadStrategy.DEFAULT_PART_SIZE); + assertEquals(chunkSize * strategy.getParts() + strategy.getRemaining(), length); + + // then chunkSize is increasing + length += 1; + chunkSize = strategy.calculateChunkSize(length); + assertEquals(chunkSize, SequentialMultipartUploadStrategy.DEFAULT_PART_SIZE * 2); + assertEquals(strategy.getParts(), SequentialMultipartUploadStrategy.MAGNITUDE_BASE / 2); + assertEquals(strategy.getRemaining(), 1); + assertEquals(chunkSize * strategy.getParts() + strategy.getRemaining(), length); + + verify(ablobStore); + verify(slicer); + } + + /** + * Phase 2 of the algorithm. + * The number of parts does not grow from {@code SequentialMultipartUploadStrategy.MAGNITUDE_BASE} + * until we reach the {@code MultipartUploadStrategy.MAX_PART_SIZE}. + */ + @Test + public void testWhenPartsHasToStartGrowingFromMagnitudeBase() { + AWSS3BlobStore ablobStore = createMock(AWSS3BlobStore.class); + PayloadSlicer slicer = createMock(PayloadSlicer.class); + + replay(ablobStore); + replay(slicer); + + SequentialMultipartUploadStrategy strategy = new SequentialMultipartUploadStrategy(ablobStore, slicer); + // upper limit while we still have exactly MAGNITUDE_BASE parts (together with the remaining) + long length = MultipartUploadStrategy.MAX_PART_SIZE * SequentialMultipartUploadStrategy.MAGNITUDE_BASE; + long chunkSize = strategy.calculateChunkSize(length); + assertEquals(chunkSize, MultipartUploadStrategy.MAX_PART_SIZE); + assertEquals(strategy.getParts(), SequentialMultipartUploadStrategy.MAGNITUDE_BASE - 1); + assertEquals(strategy.getRemaining(), MultipartUploadStrategy.MAX_PART_SIZE); + assertEquals(chunkSize * strategy.getParts() + strategy.getRemaining(), length); + + // then the number of parts is increasing + length += 1; + chunkSize = strategy.calculateChunkSize(length); + assertEquals(chunkSize, MultipartUploadStrategy.MAX_PART_SIZE); + assertEquals(strategy.getParts(), SequentialMultipartUploadStrategy.MAGNITUDE_BASE); + assertEquals(strategy.getRemaining(), 1); + assertEquals(chunkSize * strategy.getParts() + strategy.getRemaining(), length); + + verify(ablobStore); + verify(slicer); + } + + /** + * Phase 3 of the algorithm. + * The number of parts are increasing until {@code MAX_NUMBER_OF_PARTS} + * while its size does not exceeds the {@code MultipartUploadStrategy.MAX_PART_SIZE}. + */ + @Test + public void testWhenPartsExceedsMaxNumberOfParts() { + AWSS3BlobStore ablobStore = createMock(AWSS3BlobStore.class); + PayloadSlicer slicer = createMock(PayloadSlicer.class); + + replay(ablobStore); + replay(slicer); + + SequentialMultipartUploadStrategy strategy = new SequentialMultipartUploadStrategy(ablobStore, slicer); + // upper limit while we still have exactly MAX_NUMBER_OF_PARTS parts (together with the remaining) + long length = MultipartUploadStrategy.MAX_PART_SIZE * MultipartUploadStrategy.MAX_NUMBER_OF_PARTS; + long chunkSize = strategy.calculateChunkSize(length); + assertEquals(chunkSize, MultipartUploadStrategy.MAX_PART_SIZE); + assertEquals(strategy.getParts(), MultipartUploadStrategy.MAX_NUMBER_OF_PARTS - 1); + assertEquals(strategy.getRemaining(), MultipartUploadStrategy.MAX_PART_SIZE); + assertEquals(chunkSize * strategy.getParts() + strategy.getRemaining(), length); + + // then the number of parts is increasing + length += 1; + chunkSize = strategy.calculateChunkSize(length); + assertEquals(chunkSize, MultipartUploadStrategy.MAX_PART_SIZE); + assertEquals(strategy.getParts(), MultipartUploadStrategy.MAX_NUMBER_OF_PARTS); + assertEquals(strategy.getRemaining(), 1); + assertEquals(chunkSize * strategy.getParts() + strategy.getRemaining(), length); + + verify(ablobStore); + verify(slicer); + } +} diff --git a/providers/aws-s3/src/test/java/org/jclouds/aws/s3/blobstore/strategy/internal/SequentialMultipartUploadStrategyTest.java b/providers/aws-s3/src/test/java/org/jclouds/aws/s3/blobstore/strategy/internal/SequentialMultipartUploadStrategyTest.java new file mode 100644 index 0000000000..d240e56ea5 --- /dev/null +++ b/providers/aws-s3/src/test/java/org/jclouds/aws/s3/blobstore/strategy/internal/SequentialMultipartUploadStrategyTest.java @@ -0,0 +1,115 @@ +/** + * + * Copyright (C) 2010 Cloud Conscious, LLC. + * + * ==================================================================== + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ==================================================================== + */ + +package org.jclouds.aws.s3.blobstore.strategy.internal; + +import static org.easymock.EasyMock.expect; +import static org.easymock.classextension.EasyMock.createMock; +import static org.easymock.classextension.EasyMock.replay; +import static org.easymock.classextension.EasyMock.verify; + +import java.util.SortedMap; + +import org.jclouds.aws.s3.AWSS3Client; +import org.jclouds.aws.s3.blobstore.AWSS3BlobStore; +import org.jclouds.blobstore.BlobStoreContext; +import org.jclouds.blobstore.domain.Blob; +import org.jclouds.blobstore.domain.MutableBlobMetadata; +import org.jclouds.io.MutableContentMetadata; +import org.jclouds.io.Payload; +import org.jclouds.io.PayloadSlicer; +import org.jclouds.rest.RestContext; +import org.jclouds.rest.internal.RestContextImpl; +import org.jclouds.s3.domain.ObjectMetadata; +import org.jclouds.s3.domain.ObjectMetadataBuilder; +import org.testng.annotations.Test; + +import com.google.common.collect.Maps; + +/** + * Tests behavior of {@code SequentialMultipartUploadStrategy} + * + * @author Tibor Kiss + */ +@Test(groups = "unit") +public class SequentialMultipartUploadStrategyTest { + + @Test + public void testWithTwoParts() { + AWSS3BlobStore ablobStore = createMock(AWSS3BlobStore.class); + PayloadSlicer slicer = createMock(PayloadSlicer.class); + String container = "container"; + String key = "mpu-test"; + Blob blob = createMock(Blob.class); + MutableBlobMetadata blobMeta = createMock(MutableBlobMetadata.class); + Payload payload = createMock(Payload.class); + MutableContentMetadata contentMeta = createMock(MutableContentMetadata.class); + BlobStoreContext context = createMock(BlobStoreContext.class); + @SuppressWarnings("unchecked") + RestContext psc = createMock(RestContextImpl.class); + AWSS3Client client = createMock(AWSS3Client.class); + ObjectMetadata ometa = createMock(ObjectMetadata.class); + String uploadId = "uploadId"; + long chunkSize = SequentialMultipartUploadStrategy.DEFAULT_PART_SIZE; + long remaining = 100L; + SortedMap etags = Maps.newTreeMap(); + etags.put(new Integer(1), "eTag1"); + etags.put(new Integer(2), "eTag2"); + + expect(blob.getMetadata()).andReturn(blobMeta).atLeastOnce(); + expect(blobMeta.getName()).andReturn(key).atLeastOnce(); + expect(blob.getPayload()).andReturn(payload).atLeastOnce(); + expect(payload.getContentMetadata()).andReturn(contentMeta).atLeastOnce(); + expect(contentMeta.getContentLength()).andReturn(new Long(chunkSize + remaining)); + expect(ablobStore.getContext()).andReturn(context).atLeastOnce(); + expect(context.getProviderSpecificContext()).andReturn(psc).atLeastOnce(); + expect(psc.getApi()).andReturn(client).atLeastOnce(); + expect(client.initiateMultipartUpload(container, new ObjectMetadataBuilder().key(key).build())).andReturn("uploadId").atLeastOnce(); + expect(slicer.slice(payload, 0, chunkSize)).andReturn(payload).atLeastOnce(); + expect(client.uploadPart(container, key, 1, uploadId, payload)).andReturn("eTag1").atLeastOnce(); + expect(slicer.slice(payload, chunkSize, remaining)).andReturn(payload).atLeastOnce(); + expect(client.uploadPart(container, key, 2, uploadId, payload)).andReturn("eTag2").atLeastOnce(); + expect(client.completeMultipartUpload(container, key, uploadId, etags)).andReturn("eTag").atLeastOnce(); + + replay(ablobStore); + replay(slicer); + replay(blob); + replay(blobMeta); + replay(payload); + replay(contentMeta); + replay(context); + replay(psc); + replay(client); + replay(ometa); + + SequentialMultipartUploadStrategy strategy = new SequentialMultipartUploadStrategy(ablobStore, slicer); + strategy.execute(container, blob); + + verify(ablobStore); + verify(slicer); + verify(blob); + verify(blobMeta); + verify(payload); + verify(contentMeta); + verify(context); + verify(psc); + verify(client); + verify(ometa); + } +}