From 1eab85197d8e705d3589bfb70c341917869307cd Mon Sep 17 00:00:00 2001 From: adriancole Date: Mon, 8 Apr 2013 18:03:37 -0700 Subject: [PATCH 1/2] refactored aws-s3 MPU test --- providers/aws-s3/pom.xml | 5 + .../java/org/jclouds/aws/s3/AWSS3Client.java | 1 - .../aws/s3/blobstore/AWSS3BlobStore.java | 2 +- .../strategy/MultipartUploadStrategy.java | 3 +- .../SequentialMultipartUploadStrategy.java | 124 ++++++----- ...entialMultipartUploadStrategyMockTest.java | 146 +++++++++++++ ...SequentialMultipartUploadStrategyTest.java | 193 ------------------ 7 files changed, 212 insertions(+), 262 deletions(-) create mode 100644 providers/aws-s3/src/test/java/org/jclouds/aws/s3/blobstore/strategy/internal/SequentialMultipartUploadStrategyMockTest.java delete mode 100644 providers/aws-s3/src/test/java/org/jclouds/aws/s3/blobstore/strategy/internal/SequentialMultipartUploadStrategyTest.java diff --git a/providers/aws-s3/pom.xml b/providers/aws-s3/pom.xml index e3d7a6ead9..5387384e70 100644 --- a/providers/aws-s3/pom.xml +++ b/providers/aws-s3/pom.xml @@ -111,6 +111,11 @@ ${project.version} test + + com.google.mockwebserver + mockwebserver + test + diff --git a/providers/aws-s3/src/main/java/org/jclouds/aws/s3/AWSS3Client.java b/providers/aws-s3/src/main/java/org/jclouds/aws/s3/AWSS3Client.java index 4072a2ed50..b9d8974661 100644 --- a/providers/aws-s3/src/main/java/org/jclouds/aws/s3/AWSS3Client.java +++ b/providers/aws-s3/src/main/java/org/jclouds/aws/s3/AWSS3Client.java @@ -19,7 +19,6 @@ package org.jclouds.aws.s3; import java.util.Map; -import java.util.Set; import org.jclouds.aws.s3.domain.DeleteResult; import org.jclouds.io.Payload; import org.jclouds.s3.S3Client; diff --git a/providers/aws-s3/src/main/java/org/jclouds/aws/s3/blobstore/AWSS3BlobStore.java b/providers/aws-s3/src/main/java/org/jclouds/aws/s3/blobstore/AWSS3BlobStore.java index 3c610d5ebe..077d1c4fcf 100644 --- a/providers/aws-s3/src/main/java/org/jclouds/aws/s3/blobstore/AWSS3BlobStore.java +++ b/providers/aws-s3/src/main/java/org/jclouds/aws/s3/blobstore/AWSS3BlobStore.java @@ -88,7 +88,7 @@ public class AWSS3BlobStore extends S3BlobStore { public String putBlob(String container, Blob blob, PutOptions options) { if (options.isMultipart()) { // need to use a provider if the strategy object is stateful - return multipartUploadStrategy.get().execute(container, blob, options); + return multipartUploadStrategy.get().execute(container, blob); } else if ((options instanceof AWSS3PutOptions) && (((AWSS3PutOptions) options).getStorageClass() == REDUCED_REDUNDANCY)) { diff --git a/providers/aws-s3/src/main/java/org/jclouds/aws/s3/blobstore/strategy/MultipartUploadStrategy.java b/providers/aws-s3/src/main/java/org/jclouds/aws/s3/blobstore/strategy/MultipartUploadStrategy.java index a511ffb7ae..40c4ec5ebb 100644 --- a/providers/aws-s3/src/main/java/org/jclouds/aws/s3/blobstore/strategy/MultipartUploadStrategy.java +++ b/providers/aws-s3/src/main/java/org/jclouds/aws/s3/blobstore/strategy/MultipartUploadStrategy.java @@ -20,7 +20,6 @@ package org.jclouds.aws.s3.blobstore.strategy; import org.jclouds.aws.s3.blobstore.strategy.internal.SequentialMultipartUploadStrategy; import org.jclouds.blobstore.domain.Blob; -import org.jclouds.blobstore.options.PutOptions; import com.google.inject.ImplementedBy; @@ -32,5 +31,5 @@ import com.google.inject.ImplementedBy; @ImplementedBy(SequentialMultipartUploadStrategy.class) public interface MultipartUploadStrategy extends MultipartUpload { - String execute(String container, Blob blob, PutOptions options); + String execute(String container, Blob blob); } diff --git a/providers/aws-s3/src/main/java/org/jclouds/aws/s3/blobstore/strategy/internal/SequentialMultipartUploadStrategy.java b/providers/aws-s3/src/main/java/org/jclouds/aws/s3/blobstore/strategy/internal/SequentialMultipartUploadStrategy.java index 22529b7e23..834dcc4b36 100644 --- a/providers/aws-s3/src/main/java/org/jclouds/aws/s3/blobstore/strategy/internal/SequentialMultipartUploadStrategy.java +++ b/providers/aws-s3/src/main/java/org/jclouds/aws/s3/blobstore/strategy/internal/SequentialMultipartUploadStrategy.java @@ -25,19 +25,16 @@ import java.util.SortedMap; import javax.annotation.Resource; import javax.inject.Named; -import org.jclouds.aws.s3.AWSS3ApiMetadata; import org.jclouds.aws.s3.AWSS3Client; -import org.jclouds.aws.s3.blobstore.AWSS3BlobStore; import org.jclouds.aws.s3.blobstore.strategy.MultipartUploadStrategy; import org.jclouds.blobstore.KeyNotFoundException; import org.jclouds.blobstore.domain.Blob; -import org.jclouds.blobstore.options.PutOptions; import org.jclouds.blobstore.reference.BlobStoreConstants; import org.jclouds.io.Payload; import org.jclouds.io.PayloadSlicer; import org.jclouds.logging.Logger; +import org.jclouds.s3.blobstore.functions.BlobToObject; import org.jclouds.s3.domain.ObjectMetadataBuilder; -import org.jclouds.util.Throwables2; import com.google.common.collect.Maps; import com.google.inject.Inject; @@ -47,86 +44,83 @@ import com.google.inject.Inject; * * The file partitioning algorithm: * - * The default partition size we choose is 32mb. A multiple of this default partition size is used. - * The number of parts first grows to a chosen magnitude (for example 100 parts), then it grows the - * partition size instead of number of partitions. When we reached the maximum part size, then again - * it starts to grow the number of partitions. + * The default partition size we choose is 32mb. A multiple of this default + * partition size is used. The number of partCount first grows to a chosen magnitude + * (for example 100 partCount), then it grows the partition size instead of number + * of partitions. When we reached the maximum part size, then again it starts to + * grow the number of partitions. * - * @author Tibor Kiss + * @author Tibor Kiss, Adrian Cole */ public class SequentialMultipartUploadStrategy implements MultipartUploadStrategy { @Resource @Named(BlobStoreConstants.BLOBSTORE_LOGGER) - protected Logger logger = Logger.NULL; + private Logger logger = Logger.NULL; - protected final AWSS3BlobStore ablobstore; - protected final PayloadSlicer slicer; + private final AWSS3Client client; + private final BlobToObject blobToObject; + private final MultipartUploadSlicingAlgorithm algorithm; + private final PayloadSlicer slicer; @Inject - public SequentialMultipartUploadStrategy(AWSS3BlobStore ablobstore, PayloadSlicer slicer) { - this.ablobstore = checkNotNull(ablobstore, "ablobstore"); + public SequentialMultipartUploadStrategy(AWSS3Client client, BlobToObject blobToObject, + MultipartUploadSlicingAlgorithm algorithm, PayloadSlicer slicer) { + this.client = checkNotNull(client, "client"); + this.blobToObject = checkNotNull(blobToObject, "blobToObject"); + this.algorithm = checkNotNull(algorithm, "algorithm"); this.slicer = checkNotNull(slicer, "slicer"); } - - protected void prepareUploadPart(String container, String key, String uploadId, int part, - Payload payload, long offset, long size, SortedMap etags) { - AWSS3Client client = ablobstore.getContext().unwrap(AWSS3ApiMetadata.CONTEXT_TOKEN).getApi(); + + @Override + public String execute(String container, Blob blob) { + String key = blob.getMetadata().getName(); + Payload payload = blob.getPayload(); + Long length = payload.getContentMetadata().getContentLength(); + checkNotNull(length, + "please invoke payload.getContentMetadata().setContentLength(length) prior to multipart upload"); + long chunkSize = algorithm.calculateChunkSize(length); + int partCount = algorithm.getParts(); + if (partCount > 0) { + String uploadId = client.initiateMultipartUpload(container, ObjectMetadataBuilder.create().key(key).build()); + try { + SortedMap etags = Maps.newTreeMap(); + int part; + while ((part = algorithm.getNextPart()) <= partCount) { + prepareUploadPart(container, key, uploadId, part, payload, algorithm.getNextChunkOffset(), chunkSize, + etags); + } + long remaining = algorithm.getRemaining(); + if (remaining > 0) { + prepareUploadPart(container, key, uploadId, part, payload, algorithm.getNextChunkOffset(), remaining, + etags); + } + return client.completeMultipartUpload(container, key, uploadId, etags); + } catch (RuntimeException ex) { + client.abortMultipartUpload(container, key, uploadId); + throw ex; + } + } else { + // TODO: find a way to disable multipart. if we pass the original + // options, it goes into a stack overflow + return client.putObject(container, blobToObject.apply(blob)); + } + } + + private void prepareUploadPart(String container, String key, String uploadId, int part, Payload payload, + long offset, long size, SortedMap etags) { Payload chunkedPart = slicer.slice(payload, offset, size); String eTag = null; try { eTag = client.uploadPart(container, key, part, uploadId, chunkedPart); etags.put(Integer.valueOf(part), eTag); } catch (KeyNotFoundException e) { - // note that because of eventual consistency, the upload id may not be present yet - // we may wish to add this condition to the retry handler + // note that because of eventual consistency, the upload id may not be + // present yet we may wish to add this condition to the retry handler - // we may also choose to implement ListParts and wait for the uploadId to become - // available there. + // we may also choose to implement ListParts and wait for the uploadId + // to become available there. eTag = client.uploadPart(container, key, part, uploadId, chunkedPart); etags.put(Integer.valueOf(part), eTag); } } - - @Override - public String execute(String container, Blob blob, PutOptions options) { - String key = blob.getMetadata().getName(); - Payload payload = blob.getPayload(); - MultipartUploadSlicingAlgorithm algorithm = new MultipartUploadSlicingAlgorithm(); - algorithm - .calculateChunkSize(checkNotNull( - payload.getContentMetadata().getContentLength(), - "contentLength required on all uploads to amazon s3; please invoke payload.getContentMetadata().setContentLength(length) first")); - int parts = algorithm.getParts(); - long chunkSize = algorithm.getChunkSize(); - if (parts > 0) { - AWSS3Client client = (AWSS3Client) ablobstore.getContext() - .unwrap(AWSS3ApiMetadata.CONTEXT_TOKEN).getApi(); - String uploadId = client.initiateMultipartUpload(container, - ObjectMetadataBuilder.create().key(key).build()); // TODO md5 - try { - SortedMap etags = Maps.newTreeMap(); - int part; - while ((part = algorithm.getNextPart()) <= parts) { - prepareUploadPart(container, key, uploadId, part, payload, - algorithm.getNextChunkOffset(), chunkSize, etags); - } - long remaining = algorithm.getRemaining(); - if (remaining > 0) { - prepareUploadPart(container, key, uploadId, part, payload, - algorithm.getNextChunkOffset(), remaining, etags); - } - return client.completeMultipartUpload(container, key, uploadId, etags); - } catch (Exception ex) { - RuntimeException rtex = Throwables2.getFirstThrowableOfType(ex, RuntimeException.class); - if (rtex == null) { - rtex = new RuntimeException(ex); - } - client.abortMultipartUpload(container, key, uploadId); - throw rtex; - } - } else { - // TODO: find a way to disable multipart. if we pass the original options, it goes into a stack overflow - return ablobstore.putBlob(container, blob, PutOptions.NONE); - } - } } diff --git a/providers/aws-s3/src/test/java/org/jclouds/aws/s3/blobstore/strategy/internal/SequentialMultipartUploadStrategyMockTest.java b/providers/aws-s3/src/test/java/org/jclouds/aws/s3/blobstore/strategy/internal/SequentialMultipartUploadStrategyMockTest.java new file mode 100644 index 0000000000..249d9e97da --- /dev/null +++ b/providers/aws-s3/src/test/java/org/jclouds/aws/s3/blobstore/strategy/internal/SequentialMultipartUploadStrategyMockTest.java @@ -0,0 +1,146 @@ +/** + * Licensed to jclouds, Inc. (jclouds) under one or more + * contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. jclouds licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.jclouds.aws.s3.blobstore.strategy.internal; + +import static com.google.common.util.concurrent.MoreExecutors.sameThreadExecutor; +import static org.jclouds.Constants.PROPERTY_MAX_RETRIES; +import static org.jclouds.Constants.PROPERTY_SO_TIMEOUT; +import static org.jclouds.s3.reference.S3Constants.PROPERTY_S3_VIRTUAL_HOST_BUCKETS; +import static org.testng.Assert.assertEquals; + +import java.io.IOException; +import java.util.Properties; +import java.util.Set; + +import org.jclouds.ContextBuilder; +import org.jclouds.blobstore.domain.internal.BlobBuilderImpl; +import org.jclouds.concurrent.config.ExecutorServiceModule; +import org.jclouds.http.HttpResponseException; +import org.testng.annotations.Test; + +import com.google.common.base.Charsets; +import com.google.common.collect.ImmutableSet; +import com.google.inject.Module; +import com.google.mockwebserver.MockResponse; +import com.google.mockwebserver.MockWebServer; +import com.google.mockwebserver.RecordedRequest; + +/** + * + * @author Adrian Cole + */ +@Test(singleThreaded = true) +public class SequentialMultipartUploadStrategyMockTest { + + @Test + public void testMPUDoesMultipart() throws IOException, InterruptedException { + MockWebServer server = new MockWebServer(); + server.enqueue(new MockResponse().setResponseCode(200).setBody("upload-id")); + server.enqueue(new MockResponse().setResponseCode(200).addHeader("ETag", "a00")); + server.enqueue(new MockResponse().setResponseCode(200).addHeader("ETag", "b00")); + server.enqueue(new MockResponse().setResponseCode(200).setBody("fff")); + server.play(); + + byte[] bytes = "0123456789abcdef".getBytes(Charsets.US_ASCII); + int partSize = bytes.length / 2; + + SequentialMultipartUploadStrategy api = mockSequentialMultipartUploadStrategy(server.getUrl("/").toString(), + partSize); + + try { + assertEquals(api.execute("container", new BlobBuilderImpl().name("foo").payload(bytes).build()), "fff"); + } finally { + + RecordedRequest initiate = server.takeRequest(); + assertEquals(initiate.getRequestLine(), "POST /container/foo?uploads HTTP/1.1"); + assertEquals(initiate.getHeader("Content-Length"), "0"); + + RecordedRequest part1 = server.takeRequest(); + assertEquals(part1.getRequestLine(), "PUT /container/foo?partNumber=1&uploadId=upload-id HTTP/1.1"); + assertEquals(part1.getHeader("Content-Length"), String.valueOf(partSize)); + assertEquals(new String(part1.getBody()), "01234567"); + + RecordedRequest part2 = server.takeRequest(); + assertEquals(part2.getRequestLine(), "PUT /container/foo?partNumber=2&uploadId=upload-id HTTP/1.1"); + assertEquals(part2.getHeader("Content-Length"), String.valueOf(partSize)); + assertEquals(new String(part2.getBody()), "89abcdef"); + + RecordedRequest manifest = server.takeRequest(); + assertEquals(manifest.getRequestLine(), "POST /container/foo?uploadId=upload-id HTTP/1.1"); + assertEquals(manifest.getHeader("Content-Length"), "161"); + assertEquals( + new String(manifest.getBody()), + "1a002b00"); + + server.shutdown(); + } + } + + @Test(expectedExceptions = HttpResponseException.class) + public void testMPUAbortsOnProblem() throws IOException, InterruptedException { + MockWebServer server = new MockWebServer(); + server.enqueue(new MockResponse().setResponseCode(200).setBody("upload-id")); + server.enqueue(new MockResponse().setResponseCode(400)); + server.enqueue(new MockResponse().setResponseCode(200)); + server.play(); + + byte[] bytes = "0123456789abcdef".getBytes(Charsets.US_ASCII); + int partSize = bytes.length / 2; + + SequentialMultipartUploadStrategy api = mockSequentialMultipartUploadStrategy(server.getUrl("/").toString(), + partSize); + + try { + assertEquals(api.execute("container", new BlobBuilderImpl().name("foo").payload(bytes).build()), "fff"); + } finally { + + RecordedRequest initiate = server.takeRequest(); + assertEquals(initiate.getRequestLine(), "POST /container/foo?uploads HTTP/1.1"); + assertEquals(initiate.getHeader("Content-Length"), "0"); + + RecordedRequest part1 = server.takeRequest(); + assertEquals(part1.getRequestLine(), "PUT /container/foo?partNumber=1&uploadId=upload-id HTTP/1.1"); + assertEquals(part1.getHeader("Content-Length"), String.valueOf(partSize)); + assertEquals(new String(part1.getBody()), "01234567"); + + RecordedRequest abort = server.takeRequest(); + assertEquals(abort.getRequestLine(), "DELETE /container/foo?uploadId=upload-id HTTP/1.1"); + + server.shutdown(); + } + } + + private static final Set modules = ImmutableSet. of(new ExecutorServiceModule(sameThreadExecutor(), + sameThreadExecutor())); + + static SequentialMultipartUploadStrategy mockSequentialMultipartUploadStrategy(String uri, int partSize) { + Properties overrides = new Properties(); + overrides.setProperty(PROPERTY_S3_VIRTUAL_HOST_BUCKETS, "false"); + // prevent expect-100 bug http://code.google.com/p/mockwebserver/issues/detail?id=6 + overrides.setProperty(PROPERTY_SO_TIMEOUT, "0"); + overrides.setProperty(PROPERTY_MAX_RETRIES, "1"); + overrides.setProperty("jclouds.mpu.parts.size", String.valueOf(partSize)); + return ContextBuilder.newBuilder("aws-s3") + .credentials("accessKey", "secretKey") + .endpoint(uri) + .overrides(overrides) + .modules(modules) + .buildInjector().getInstance(SequentialMultipartUploadStrategy.class); + } +} diff --git a/providers/aws-s3/src/test/java/org/jclouds/aws/s3/blobstore/strategy/internal/SequentialMultipartUploadStrategyTest.java b/providers/aws-s3/src/test/java/org/jclouds/aws/s3/blobstore/strategy/internal/SequentialMultipartUploadStrategyTest.java deleted file mode 100644 index 5303de59ea..0000000000 --- a/providers/aws-s3/src/test/java/org/jclouds/aws/s3/blobstore/strategy/internal/SequentialMultipartUploadStrategyTest.java +++ /dev/null @@ -1,193 +0,0 @@ -/** - * Licensed to jclouds, Inc. (jclouds) under one or more - * contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. jclouds licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.jclouds.aws.s3.blobstore.strategy.internal; - -import static org.easymock.EasyMock.createMock; -import static org.easymock.EasyMock.expect; -import static org.easymock.EasyMock.expectLastCall; -import static org.easymock.EasyMock.replay; -import static org.easymock.EasyMock.verify; -import static org.testng.Assert.fail; - -import java.util.SortedMap; -import java.util.concurrent.TimeoutException; - -import org.jclouds.aws.s3.AWSS3ApiMetadata; -import org.jclouds.aws.s3.AWSS3AsyncClient; -import org.jclouds.aws.s3.AWSS3Client; -import org.jclouds.aws.s3.blobstore.AWSS3BlobStore; -import org.jclouds.blobstore.BlobStoreContext; -import org.jclouds.blobstore.domain.Blob; -import org.jclouds.blobstore.domain.MutableBlobMetadata; -import org.jclouds.blobstore.options.PutOptions; -import org.jclouds.io.MutableContentMetadata; -import org.jclouds.io.Payload; -import org.jclouds.io.PayloadSlicer; -import org.jclouds.rest.RestContext; -import org.jclouds.rest.internal.RestContextImpl; -import org.jclouds.s3.domain.ObjectMetadata; -import org.jclouds.s3.domain.ObjectMetadataBuilder; -import org.jclouds.util.Throwables2; -import org.testng.annotations.Test; - -import com.google.common.collect.Maps; - -/** - * Tests behavior of {@code SequentialMultipartUploadStrategy} - * - * @author Tibor Kiss - */ -@Test(groups = "unit") -public class SequentialMultipartUploadStrategyTest { - - @SuppressWarnings("unchecked") - @Test - public void testWithTwoParts() { - AWSS3BlobStore ablobStore = createMock(AWSS3BlobStore.class); - PayloadSlicer slicer = createMock(PayloadSlicer.class); - String container = "container"; - String key = "mpu-test"; - Blob blob = createMock(Blob.class); - MutableBlobMetadata blobMeta = createMock(MutableBlobMetadata.class); - Payload payload = createMock(Payload.class); - MutableContentMetadata contentMeta = createMock(MutableContentMetadata.class); - BlobStoreContext context = createMock(BlobStoreContext.class); - RestContext psc = createMock(RestContextImpl.class); - AWSS3Client client = createMock(AWSS3Client.class); - ObjectMetadata ometa = createMock(ObjectMetadata.class); - String uploadId = "uploadId"; - long chunkSize = MultipartUploadSlicingAlgorithm.DEFAULT_PART_SIZE; - long remaining = 100L; - SortedMap etags = Maps.newTreeMap(); - etags.put(Integer.valueOf(1), "eTag1"); - etags.put(Integer.valueOf(2), "eTag2"); - - expect(blob.getMetadata()).andReturn(blobMeta).atLeastOnce(); - expect(blobMeta.getName()).andReturn(key).atLeastOnce(); - expect(blob.getPayload()).andReturn(payload).atLeastOnce(); - expect(payload.getContentMetadata()).andReturn(contentMeta).atLeastOnce(); - expect(contentMeta.getContentLength()).andReturn(Long.valueOf(chunkSize + remaining)); - expect(ablobStore.getContext()).andReturn(context).atLeastOnce(); - expect(context.unwrap(AWSS3ApiMetadata.CONTEXT_TOKEN)).andReturn(psc).atLeastOnce(); - expect(psc.getApi()).andReturn(client).atLeastOnce(); - expect(client.initiateMultipartUpload(container, new ObjectMetadataBuilder().key(key).build())).andReturn("uploadId").atLeastOnce(); - expect(slicer.slice(payload, 0, chunkSize)).andReturn(payload).atLeastOnce(); - expect(client.uploadPart(container, key, 1, uploadId, payload)).andReturn("eTag1").atLeastOnce(); - expect(slicer.slice(payload, chunkSize, remaining)).andReturn(payload).atLeastOnce(); - expect(client.uploadPart(container, key, 2, uploadId, payload)).andReturn("eTag2").atLeastOnce(); - expect(client.completeMultipartUpload(container, key, uploadId, etags)).andReturn("eTag").atLeastOnce(); - - replay(ablobStore); - replay(slicer); - replay(blob); - replay(blobMeta); - replay(payload); - replay(contentMeta); - replay(context); - replay(psc); - replay(client); - replay(ometa); - - SequentialMultipartUploadStrategy strategy = new SequentialMultipartUploadStrategy(ablobStore, slicer); - strategy.execute(container, blob, PutOptions.NONE); - - verify(ablobStore); - verify(slicer); - verify(blob); - verify(blobMeta); - verify(payload); - verify(contentMeta); - verify(context); - verify(psc); - verify(client); - verify(ometa); - } - - @SuppressWarnings("unchecked") - @Test - public void testWithTimeout() { - AWSS3BlobStore ablobStore = createMock(AWSS3BlobStore.class); - PayloadSlicer slicer = createMock(PayloadSlicer.class); - String container = "container"; - String key = "mpu-test"; - Blob blob = createMock(Blob.class); - MutableBlobMetadata blobMeta = createMock(MutableBlobMetadata.class); - Payload payload = createMock(Payload.class); - MutableContentMetadata contentMeta = createMock(MutableContentMetadata.class); - BlobStoreContext context = createMock(BlobStoreContext.class); - RestContext psc = createMock(RestContextImpl.class); - AWSS3Client client = createMock(AWSS3Client.class); - ObjectMetadata ometa = createMock(ObjectMetadata.class); - String uploadId = "uploadId"; - long chunkSize = MultipartUploadSlicingAlgorithm.DEFAULT_PART_SIZE; - long remaining = 100L; - SortedMap etags = Maps.newTreeMap(); - etags.put(Integer.valueOf(1), "eTag1"); - etags.put(Integer.valueOf(2), "eTag2"); - - expect(blob.getMetadata()).andReturn(blobMeta).atLeastOnce(); - expect(blobMeta.getName()).andReturn(key).atLeastOnce(); - expect(blob.getPayload()).andReturn(payload).atLeastOnce(); - expect(payload.getContentMetadata()).andReturn(contentMeta).atLeastOnce(); - expect(contentMeta.getContentLength()).andReturn(Long.valueOf(chunkSize + remaining)); - expect(ablobStore.getContext()).andReturn(context).atLeastOnce(); - expect(context.unwrap(AWSS3ApiMetadata.CONTEXT_TOKEN)).andReturn(psc).atLeastOnce(); - expect(psc.getApi()).andReturn(client).atLeastOnce(); - expect(client.initiateMultipartUpload(container, new ObjectMetadataBuilder().key(key).build())).andReturn("uploadId").atLeastOnce(); - expect(slicer.slice(payload, 0, chunkSize)).andReturn(payload).atLeastOnce(); - expect(client.uploadPart(container, key, 1, uploadId, payload)).andReturn("eTag1").atLeastOnce(); - expect(slicer.slice(payload, chunkSize, remaining)).andReturn(payload).atLeastOnce(); - expect(client.uploadPart(container, key, 2, uploadId, payload)).andThrow(new RuntimeException(new TimeoutException())); - client.abortMultipartUpload(container, key, uploadId); - expectLastCall().atLeastOnce(); - - replay(ablobStore); - replay(slicer); - replay(blob); - replay(blobMeta); - replay(payload); - replay(contentMeta); - replay(context); - replay(psc); - replay(client); - replay(ometa); - - SequentialMultipartUploadStrategy strategy = new SequentialMultipartUploadStrategy(ablobStore, slicer); - try { - strategy.execute(container, blob, PutOptions.NONE); - fail("Should throw RuntimeException with TimeoutException cause!"); - } catch (RuntimeException rtex) { - TimeoutException timeout = Throwables2.getFirstThrowableOfType(rtex, TimeoutException.class); - if (timeout == null) { - throw rtex; - } - } - - verify(ablobStore); - verify(slicer); - verify(blob); - verify(blobMeta); - verify(payload); - verify(contentMeta); - verify(context); - verify(psc); - verify(client); - verify(ometa); - } -} From dc12122afd44f185ad7e8e6af6c7bf59823bf704 Mon Sep 17 00:00:00 2001 From: adriancole Date: Mon, 8 Apr 2013 19:12:38 -0700 Subject: [PATCH 2/2] refactored swift MPU and added test --- apis/swift/pom.xml | 5 + .../swift/blobstore/SwiftBlobStore.java | 2 +- .../blobstore/strategy/MultipartUpload.java | 18 +++ .../internal/MultipartUploadStrategy.java | 25 ++- .../SequentialMultipartUploadStrategy.java | 133 +++++++++------- ...entialMultipartUploadStrategyMockTest.java | 149 ++++++++++++++++++ 6 files changed, 266 insertions(+), 66 deletions(-) create mode 100644 apis/swift/src/test/java/org/jclouds/openstack/swift/blobstore/strategy/internal/SequentialMultipartUploadStrategyMockTest.java diff --git a/apis/swift/pom.xml b/apis/swift/pom.xml index a2f06c6a51..17a35d170b 100644 --- a/apis/swift/pom.xml +++ b/apis/swift/pom.xml @@ -110,6 +110,11 @@ log4j test + + com.google.mockwebserver + mockwebserver + test + diff --git a/apis/swift/src/main/java/org/jclouds/openstack/swift/blobstore/SwiftBlobStore.java b/apis/swift/src/main/java/org/jclouds/openstack/swift/blobstore/SwiftBlobStore.java index 2afc789227..0630c584ad 100644 --- a/apis/swift/src/main/java/org/jclouds/openstack/swift/blobstore/SwiftBlobStore.java +++ b/apis/swift/src/main/java/org/jclouds/openstack/swift/blobstore/SwiftBlobStore.java @@ -211,7 +211,7 @@ public class SwiftBlobStore extends BaseBlobStore { @Override public String putBlob(String container, Blob blob, PutOptions options) { if (options.isMultipart()) { - return multipartUploadStrategy.get().execute(container, blob, options, blob2Object); + return multipartUploadStrategy.get().execute(container, blob); } else { return putBlob(container, blob); } diff --git a/apis/swift/src/main/java/org/jclouds/openstack/swift/blobstore/strategy/MultipartUpload.java b/apis/swift/src/main/java/org/jclouds/openstack/swift/blobstore/strategy/MultipartUpload.java index 9c2ead8253..d5aacdf1ca 100644 --- a/apis/swift/src/main/java/org/jclouds/openstack/swift/blobstore/strategy/MultipartUpload.java +++ b/apis/swift/src/main/java/org/jclouds/openstack/swift/blobstore/strategy/MultipartUpload.java @@ -1,3 +1,21 @@ +/** + * Licensed to jclouds, Inc. (jclouds) under one or more + * contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. jclouds licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ package org.jclouds.openstack.swift.blobstore.strategy; public interface MultipartUpload { diff --git a/apis/swift/src/main/java/org/jclouds/openstack/swift/blobstore/strategy/internal/MultipartUploadStrategy.java b/apis/swift/src/main/java/org/jclouds/openstack/swift/blobstore/strategy/internal/MultipartUploadStrategy.java index c536a8e8c2..470308acb7 100644 --- a/apis/swift/src/main/java/org/jclouds/openstack/swift/blobstore/strategy/internal/MultipartUploadStrategy.java +++ b/apis/swift/src/main/java/org/jclouds/openstack/swift/blobstore/strategy/internal/MultipartUploadStrategy.java @@ -1,13 +1,30 @@ +/** + * Licensed to jclouds, Inc. (jclouds) under one or more + * contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. jclouds licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ package org.jclouds.openstack.swift.blobstore.strategy.internal; -import com.google.inject.ImplementedBy; import org.jclouds.blobstore.domain.Blob; -import org.jclouds.blobstore.options.PutOptions; -import org.jclouds.openstack.swift.blobstore.functions.BlobToObject; import org.jclouds.openstack.swift.blobstore.strategy.MultipartUpload; +import com.google.inject.ImplementedBy; + @ImplementedBy(SequentialMultipartUploadStrategy.class) public interface MultipartUploadStrategy extends MultipartUpload { - String execute(String container, Blob blob, PutOptions options, BlobToObject blob2Object); + String execute(String container, Blob blob); } diff --git a/apis/swift/src/main/java/org/jclouds/openstack/swift/blobstore/strategy/internal/SequentialMultipartUploadStrategy.java b/apis/swift/src/main/java/org/jclouds/openstack/swift/blobstore/strategy/internal/SequentialMultipartUploadStrategy.java index 9618e25aa3..19dd2f69b5 100644 --- a/apis/swift/src/main/java/org/jclouds/openstack/swift/blobstore/strategy/internal/SequentialMultipartUploadStrategy.java +++ b/apis/swift/src/main/java/org/jclouds/openstack/swift/blobstore/strategy/internal/SequentialMultipartUploadStrategy.java @@ -1,83 +1,94 @@ +/** + * Licensed to jclouds, Inc. (jclouds) under one or more + * contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. jclouds licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ package org.jclouds.openstack.swift.blobstore.strategy.internal; import static com.google.common.base.Preconditions.checkNotNull; import javax.annotation.Resource; import javax.inject.Named; +import javax.inject.Provider; import org.jclouds.blobstore.domain.Blob; -import org.jclouds.blobstore.options.PutOptions; +import org.jclouds.blobstore.domain.BlobBuilder; import org.jclouds.blobstore.reference.BlobStoreConstants; import org.jclouds.io.Payload; import org.jclouds.io.PayloadSlicer; import org.jclouds.logging.Logger; import org.jclouds.openstack.swift.CommonSwiftClient; -import org.jclouds.openstack.swift.SwiftApiMetadata; -import org.jclouds.openstack.swift.blobstore.SwiftBlobStore; import org.jclouds.openstack.swift.blobstore.functions.BlobToObject; -import org.jclouds.util.Throwables2; import com.google.inject.Inject; - public class SequentialMultipartUploadStrategy implements MultipartUploadStrategy { - public static final String PART_SEPARATOR = "/"; + private static final String PART_SEPARATOR = "/"; - @Resource - @Named(BlobStoreConstants.BLOBSTORE_LOGGER) - protected Logger logger = Logger.NULL; + @Resource + @Named(BlobStoreConstants.BLOBSTORE_LOGGER) + private Logger logger = Logger.NULL; - protected final SwiftBlobStore ablobstore; - protected final PayloadSlicer slicer; - - @Inject - public SequentialMultipartUploadStrategy(SwiftBlobStore ablobstore, PayloadSlicer slicer) { - this.ablobstore = checkNotNull(ablobstore, "ablobstore"); - this.slicer = checkNotNull(slicer, "slicer"); - } + private final CommonSwiftClient client; + private final Provider blobBuilders; + private final BlobToObject blob2Object; + private final MultipartUploadSlicingAlgorithm algorithm; + private final PayloadSlicer slicer; - @Override - public String execute(String container, Blob blob, PutOptions options, BlobToObject blob2Object) { - String key = blob.getMetadata().getName(); - Payload payload = blob.getPayload(); - MultipartUploadSlicingAlgorithm algorithm = new MultipartUploadSlicingAlgorithm(); - algorithm - .calculateChunkSize(checkNotNull( - payload.getContentMetadata().getContentLength(), - "contentLength required on all uploads to swift; please invoke payload.getContentMetadata().setContentLength(length) first")); - int parts = algorithm.getParts(); - long chunkSize = algorithm.getChunkSize(); - if (parts > 0) { - CommonSwiftClient client = ablobstore.getContext().unwrap(SwiftApiMetadata.CONTEXT_TOKEN).getApi(); - try { - int part; - while ((part = algorithm.getNextPart()) <= parts) { - Payload chunkedPart = slicer.slice(payload, - algorithm.getNextChunkOffset(), chunkSize); - Blob blobPart = ablobstore.blobBuilder(blob.getMetadata().getName() + PART_SEPARATOR + - String.valueOf(part)).payload(chunkedPart).contentDisposition( - blob.getMetadata().getName() + PART_SEPARATOR + String.valueOf(part)).build(); - client.putObject(container, blob2Object.apply(blobPart)); - } - long remaining = algorithm.getRemaining(); - if (remaining > 0) { - Payload chunkedPart = slicer.slice(payload, - algorithm.getNextChunkOffset(), remaining); - Blob blobPart = ablobstore.blobBuilder(blob.getMetadata().getName() + PART_SEPARATOR + - String.valueOf(part)).payload(chunkedPart).contentDisposition( - blob.getMetadata().getName() + PART_SEPARATOR + String.valueOf(part)).build(); - client.putObject(container, blob2Object.apply(blobPart)); - } - return client.putObjectManifest(container, key); - } catch (Exception ex) { - RuntimeException rtex = Throwables2.getFirstThrowableOfType(ex, RuntimeException.class); - if (rtex == null) { - rtex = new RuntimeException(ex); - } - throw rtex; - } - } else { - return ablobstore.putBlob(container, blob, PutOptions.NONE); - } - } + @Inject + public SequentialMultipartUploadStrategy(CommonSwiftClient client, Provider blobBuilders, + BlobToObject blob2Object, MultipartUploadSlicingAlgorithm algorithm, PayloadSlicer slicer) { + this.client = checkNotNull(client, "client"); + this.blobBuilders = checkNotNull(blobBuilders, "blobBuilders"); + this.blob2Object = checkNotNull(blob2Object, "blob2Object"); + this.algorithm = checkNotNull(algorithm, "algorithm"); + this.slicer = checkNotNull(slicer, "slicer"); + } + + @Override + public String execute(String container, Blob blob) { + String key = blob.getMetadata().getName(); + Payload payload = blob.getPayload(); + Long length = payload.getContentMetadata().getContentLength(); + checkNotNull(length, + "please invoke payload.getContentMetadata().setContentLength(length) prior to multipart upload"); + long chunkSize = algorithm.calculateChunkSize(length); + int partCount = algorithm.getParts(); + if (partCount > 0) { + int part; + while ((part = algorithm.getNextPart()) <= partCount) { + Payload chunkedPart = slicer.slice(payload, algorithm.getNextChunkOffset(), chunkSize); + Blob blobPart = blobBuilders.get() + .name(key + PART_SEPARATOR + part) + .payload(chunkedPart) + .contentDisposition(key + PART_SEPARATOR + part).build(); + client.putObject(container, blob2Object.apply(blobPart)); + } + long remaining = algorithm.getRemaining(); + if (remaining > 0) { + Payload chunkedPart = slicer.slice(payload, algorithm.getNextChunkOffset(), remaining); + Blob blobPart = blobBuilders.get() + .name(key + PART_SEPARATOR + part) + .payload(chunkedPart) + .contentDisposition(key + PART_SEPARATOR + part).build(); + client.putObject(container, blob2Object.apply(blobPart)); + } + return client.putObjectManifest(container, key); + } else { + return client.putObject(container, blob2Object.apply(blob)); + } + } } diff --git a/apis/swift/src/test/java/org/jclouds/openstack/swift/blobstore/strategy/internal/SequentialMultipartUploadStrategyMockTest.java b/apis/swift/src/test/java/org/jclouds/openstack/swift/blobstore/strategy/internal/SequentialMultipartUploadStrategyMockTest.java new file mode 100644 index 0000000000..dfa06ce804 --- /dev/null +++ b/apis/swift/src/test/java/org/jclouds/openstack/swift/blobstore/strategy/internal/SequentialMultipartUploadStrategyMockTest.java @@ -0,0 +1,149 @@ +/** + * Licensed to jclouds, Inc. (jclouds) under one or more + * contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. jclouds licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.jclouds.openstack.swift.blobstore.strategy.internal; + +import static com.google.common.util.concurrent.MoreExecutors.sameThreadExecutor; +import static org.jclouds.Constants.PROPERTY_MAX_RETRIES; +import static org.jclouds.Constants.PROPERTY_SO_TIMEOUT; +import static org.testng.Assert.assertEquals; + +import java.io.IOException; +import java.net.URL; +import java.util.Properties; +import java.util.Set; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.atomic.AtomicReference; + +import org.jclouds.ContextBuilder; +import org.jclouds.blobstore.domain.internal.BlobBuilderImpl; +import org.jclouds.concurrent.config.ExecutorServiceModule; +import org.jclouds.openstack.keystone.v2_0.internal.KeystoneFixture; +import org.testng.annotations.Test; + +import com.google.common.base.Charsets; +import com.google.common.collect.ImmutableSet; +import com.google.inject.Module; +import com.google.mockwebserver.MockResponse; +import com.google.mockwebserver.MockWebServer; +import com.google.mockwebserver.QueueDispatcher; +import com.google.mockwebserver.RecordedRequest; + +/** + * + * @author Adrian Cole + */ +@Test(singleThreaded = true) +public class SequentialMultipartUploadStrategyMockTest { + + String authRequestBody = KeystoneFixture.INSTANCE.initialAuthWithUsernameAndPassword("user", "password") + .getPayload().getRawContent().toString(); + String authResponse = KeystoneFixture.INSTANCE.responseWithAccess().getPayload().getRawContent().toString() + .replace("https://objects.jclouds.org/v1.0/40806637803162", "URL"); + String token = "Auth_4f173437e4b013bee56d1007"; + + @Test + public void testMPUDoesMultipart() throws IOException, InterruptedException { + MockWebServer server = new MockWebServer(); + AtomicReference url = setURLReplacingDispatcher(server); + server.enqueue(new MockResponse().setResponseCode(200).setBody(authResponse)); + server.enqueue(new MockResponse().setResponseCode(200).addHeader("ETag", "a00")); + server.enqueue(new MockResponse().setResponseCode(200).addHeader("ETag", "b00")); + server.enqueue(new MockResponse().setResponseCode(200).addHeader("ETag", "fff")); + server.play(); + url.set(server.getUrl("/")); + + byte[] bytes = "0123456789abcdef".getBytes(Charsets.US_ASCII); + int partSize = bytes.length / 2; + SequentialMultipartUploadStrategy api = mockSequentialMultipartUploadStrategy(url.get().toString(), partSize); + + try { + assertEquals(api.execute("container", new BlobBuilderImpl().name("foo").payload(bytes).build()), "fff"); + } finally { + RecordedRequest authRequest = server.takeRequest(); + assertEquals(authRequest.getRequestLine(), "POST /tokens HTTP/1.1"); + assertEquals(new String(authRequest.getBody()), authRequestBody); + assertEquals(authRequest.getHeader("Content-Length"), String.valueOf(authRequestBody.length())); + + RecordedRequest part1 = server.takeRequest(); + assertEquals(part1.getRequestLine(), "PUT /container/foo/1 HTTP/1.1"); + assertEquals(part1.getHeader("X-Auth-Token"), token); + assertEquals(part1.getHeader("Content-Length"), String.valueOf(partSize)); + assertEquals(new String(part1.getBody()), "01234567"); + + RecordedRequest part2 = server.takeRequest(); + assertEquals(part2.getRequestLine(), "PUT /container/foo/2 HTTP/1.1"); + assertEquals(part2.getHeader("X-Auth-Token"), token); + assertEquals(part2.getHeader("Content-Length"), String.valueOf(partSize)); + assertEquals(new String(part2.getBody()), "89abcdef"); + + RecordedRequest manifest = server.takeRequest(); + assertEquals(manifest.getRequestLine(), "PUT /container/foo HTTP/1.1"); + assertEquals(manifest.getHeader("X-Auth-Token"), token); + assertEquals(manifest.getHeader("Content-Length"), "0"); + + server.shutdown(); + } + } + + private static final Set modules = ImmutableSet. of(new ExecutorServiceModule(sameThreadExecutor(), + sameThreadExecutor())); + + static SequentialMultipartUploadStrategy mockSequentialMultipartUploadStrategy(String uri, int partSize) { + Properties overrides = new Properties(); + // prevent expect-100 bug http://code.google.com/p/mockwebserver/issues/detail?id=6 + overrides.setProperty(PROPERTY_SO_TIMEOUT, "0"); + overrides.setProperty(PROPERTY_MAX_RETRIES, "1"); + overrides.setProperty("jclouds.mpu.parts.size", String.valueOf(partSize)); + return ContextBuilder.newBuilder("swift-keystone") + .credentials("user", "password").endpoint(uri) + .overrides(overrides) + .modules(modules) + .buildInjector().getInstance(SequentialMultipartUploadStrategy.class); + } + + /** + * there's no built-in way to defer evaluation of a response header, hence + * this method, which allows us to send back links to the mock server. + */ + private AtomicReference setURLReplacingDispatcher(MockWebServer server) { + final AtomicReference url = new AtomicReference(); + + final QueueDispatcher dispatcher = new QueueDispatcher() { + protected final BlockingQueue responseQueue = new LinkedBlockingQueue(); + + @Override + public MockResponse dispatch(RecordedRequest request) throws InterruptedException { + MockResponse response = responseQueue.take(); + if (response.getBody() != null) { + String newBody = new String(response.getBody()).replace("URL", url.get().toString()); + response = response.setBody(newBody); + } + return response; + } + + @Override + public void enqueueResponse(MockResponse response) { + responseQueue.add(response); + } + }; + server.setDispatcher(dispatcher); + return url; + } +}