From 413538c45326e0fd323e896b1f93c7693eeec44f Mon Sep 17 00:00:00 2001 From: adriancole Date: Mon, 8 Apr 2013 18:03:37 -0700 Subject: [PATCH] refactored aws-s3 MPU test --- providers/aws-s3/pom.xml | 5 + .../java/org/jclouds/aws/s3/AWSS3Client.java | 1 - .../aws/s3/blobstore/AWSS3BlobStore.java | 2 +- .../strategy/MultipartUploadStrategy.java | 3 +- .../SequentialMultipartUploadStrategy.java | 124 ++++++----- ...entialMultipartUploadStrategyMockTest.java | 146 +++++++++++++ ...SequentialMultipartUploadStrategyTest.java | 193 ------------------ 7 files changed, 212 insertions(+), 262 deletions(-) create mode 100644 providers/aws-s3/src/test/java/org/jclouds/aws/s3/blobstore/strategy/internal/SequentialMultipartUploadStrategyMockTest.java delete mode 100644 providers/aws-s3/src/test/java/org/jclouds/aws/s3/blobstore/strategy/internal/SequentialMultipartUploadStrategyTest.java diff --git a/providers/aws-s3/pom.xml b/providers/aws-s3/pom.xml index 004a0725e6..e75bab6cc2 100644 --- a/providers/aws-s3/pom.xml +++ b/providers/aws-s3/pom.xml @@ -111,6 +111,11 @@ ${project.version} test + + com.google.mockwebserver + mockwebserver + test + diff --git a/providers/aws-s3/src/main/java/org/jclouds/aws/s3/AWSS3Client.java b/providers/aws-s3/src/main/java/org/jclouds/aws/s3/AWSS3Client.java index 4072a2ed50..b9d8974661 100644 --- a/providers/aws-s3/src/main/java/org/jclouds/aws/s3/AWSS3Client.java +++ b/providers/aws-s3/src/main/java/org/jclouds/aws/s3/AWSS3Client.java @@ -19,7 +19,6 @@ package org.jclouds.aws.s3; import java.util.Map; -import java.util.Set; import org.jclouds.aws.s3.domain.DeleteResult; import org.jclouds.io.Payload; import org.jclouds.s3.S3Client; diff --git a/providers/aws-s3/src/main/java/org/jclouds/aws/s3/blobstore/AWSS3BlobStore.java b/providers/aws-s3/src/main/java/org/jclouds/aws/s3/blobstore/AWSS3BlobStore.java index 3c610d5ebe..077d1c4fcf 100644 --- a/providers/aws-s3/src/main/java/org/jclouds/aws/s3/blobstore/AWSS3BlobStore.java +++ 
b/providers/aws-s3/src/main/java/org/jclouds/aws/s3/blobstore/AWSS3BlobStore.java @@ -88,7 +88,7 @@ public class AWSS3BlobStore extends S3BlobStore { public String putBlob(String container, Blob blob, PutOptions options) { if (options.isMultipart()) { // need to use a provider if the strategy object is stateful - return multipartUploadStrategy.get().execute(container, blob, options); + return multipartUploadStrategy.get().execute(container, blob); } else if ((options instanceof AWSS3PutOptions) && (((AWSS3PutOptions) options).getStorageClass() == REDUCED_REDUNDANCY)) { diff --git a/providers/aws-s3/src/main/java/org/jclouds/aws/s3/blobstore/strategy/MultipartUploadStrategy.java b/providers/aws-s3/src/main/java/org/jclouds/aws/s3/blobstore/strategy/MultipartUploadStrategy.java index a511ffb7ae..40c4ec5ebb 100644 --- a/providers/aws-s3/src/main/java/org/jclouds/aws/s3/blobstore/strategy/MultipartUploadStrategy.java +++ b/providers/aws-s3/src/main/java/org/jclouds/aws/s3/blobstore/strategy/MultipartUploadStrategy.java @@ -20,7 +20,6 @@ package org.jclouds.aws.s3.blobstore.strategy; import org.jclouds.aws.s3.blobstore.strategy.internal.SequentialMultipartUploadStrategy; import org.jclouds.blobstore.domain.Blob; -import org.jclouds.blobstore.options.PutOptions; import com.google.inject.ImplementedBy; @@ -32,5 +31,5 @@ import com.google.inject.ImplementedBy; @ImplementedBy(SequentialMultipartUploadStrategy.class) public interface MultipartUploadStrategy extends MultipartUpload { - String execute(String container, Blob blob, PutOptions options); + String execute(String container, Blob blob); } diff --git a/providers/aws-s3/src/main/java/org/jclouds/aws/s3/blobstore/strategy/internal/SequentialMultipartUploadStrategy.java b/providers/aws-s3/src/main/java/org/jclouds/aws/s3/blobstore/strategy/internal/SequentialMultipartUploadStrategy.java index 9c8fc72edf..834dcc4b36 100644 --- 
a/providers/aws-s3/src/main/java/org/jclouds/aws/s3/blobstore/strategy/internal/SequentialMultipartUploadStrategy.java +++ b/providers/aws-s3/src/main/java/org/jclouds/aws/s3/blobstore/strategy/internal/SequentialMultipartUploadStrategy.java @@ -25,19 +25,16 @@ import java.util.SortedMap; import javax.annotation.Resource; import javax.inject.Named; -import org.jclouds.aws.s3.AWSS3ApiMetadata; import org.jclouds.aws.s3.AWSS3Client; -import org.jclouds.aws.s3.blobstore.AWSS3BlobStore; import org.jclouds.aws.s3.blobstore.strategy.MultipartUploadStrategy; import org.jclouds.blobstore.KeyNotFoundException; import org.jclouds.blobstore.domain.Blob; -import org.jclouds.blobstore.options.PutOptions; import org.jclouds.blobstore.reference.BlobStoreConstants; import org.jclouds.io.Payload; import org.jclouds.io.PayloadSlicer; import org.jclouds.logging.Logger; +import org.jclouds.s3.blobstore.functions.BlobToObject; import org.jclouds.s3.domain.ObjectMetadataBuilder; -import org.jclouds.util.Throwables2; import com.google.common.collect.Maps; import com.google.inject.Inject; @@ -47,86 +44,83 @@ import com.google.inject.Inject; * * The file partitioning algorithm: * - * The default partition size we choose is 32mb. A multiple of this default partition size is used. - * The number of parts first grows to a chosen magnitude (for example 100 parts), then it grows the - * partition size instead of number of partitions. When we reached the maximum part size, then again - * it starts to grow the number of partitions. + * The default partition size we choose is 32mb. A multiple of this default + * partition size is used. The number of parts first grows to a chosen magnitude + * (for example 100 parts), then it grows the partition size instead of number + * of partitions. When we reached the maximum part size, then again it starts to + * grow the number of partitions. 
* - * @author Tibor Kiss + * @author Tibor Kiss, Adrian Cole */ public class SequentialMultipartUploadStrategy implements MultipartUploadStrategy { @Resource @Named(BlobStoreConstants.BLOBSTORE_LOGGER) - protected Logger logger = Logger.NULL; + private Logger logger = Logger.NULL; - protected final AWSS3BlobStore ablobstore; - protected final PayloadSlicer slicer; + private final AWSS3Client client; + private final BlobToObject blobToObject; + private final MultipartUploadSlicingAlgorithm algorithm; + private final PayloadSlicer slicer; @Inject - public SequentialMultipartUploadStrategy(AWSS3BlobStore ablobstore, PayloadSlicer slicer) { - this.ablobstore = checkNotNull(ablobstore, "ablobstore"); + public SequentialMultipartUploadStrategy(AWSS3Client client, BlobToObject blobToObject, + MultipartUploadSlicingAlgorithm algorithm, PayloadSlicer slicer) { + this.client = checkNotNull(client, "client"); + this.blobToObject = checkNotNull(blobToObject, "blobToObject"); + this.algorithm = checkNotNull(algorithm, "algorithm"); this.slicer = checkNotNull(slicer, "slicer"); } - - protected void prepareUploadPart(String container, String key, String uploadId, int part, - Payload payload, long offset, long size, SortedMap etags) { - AWSS3Client client = ablobstore.getContext().unwrap(AWSS3ApiMetadata.CONTEXT_TOKEN).getApi(); + + @Override + public String execute(String container, Blob blob) { + String key = blob.getMetadata().getName(); + Payload payload = blob.getPayload(); + Long length = payload.getContentMetadata().getContentLength(); + checkNotNull(length, + "please invoke payload.getContentMetadata().setContentLength(length) prior to multipart upload"); + long chunkSize = algorithm.calculateChunkSize(length); + int partCount = algorithm.getParts(); + if (partCount > 0) { + String uploadId = client.initiateMultipartUpload(container, ObjectMetadataBuilder.create().key(key).build()); + try { + SortedMap etags = Maps.newTreeMap(); + int part; + while ((part = 
algorithm.getNextPart()) <= partCount) { + prepareUploadPart(container, key, uploadId, part, payload, algorithm.getNextChunkOffset(), chunkSize, + etags); + } + long remaining = algorithm.getRemaining(); + if (remaining > 0) { + prepareUploadPart(container, key, uploadId, part, payload, algorithm.getNextChunkOffset(), remaining, + etags); + } + return client.completeMultipartUpload(container, key, uploadId, etags); + } catch (RuntimeException ex) { + client.abortMultipartUpload(container, key, uploadId); + throw ex; + } + } else { + // TODO: find a way to disable multipart. if we pass the original + // options, it goes into a stack overflow + return client.putObject(container, blobToObject.apply(blob)); + } + } + + private void prepareUploadPart(String container, String key, String uploadId, int part, Payload payload, + long offset, long size, SortedMap etags) { Payload chunkedPart = slicer.slice(payload, offset, size); String eTag = null; try { eTag = client.uploadPart(container, key, part, uploadId, chunkedPart); etags.put(Integer.valueOf(part), eTag); } catch (KeyNotFoundException e) { - // note that because of eventual consistency, the upload id may not be present yet - // we may wish to add this condition to the retry handler + // note that because of eventual consistency, the upload id may not be + // present yet we may wish to add this condition to the retry handler - // we may also choose to implement ListParts and wait for the uploadId to become - // available there. + // we may also choose to implement ListParts and wait for the uploadId + // to become available there. 
eTag = client.uploadPart(container, key, part, uploadId, chunkedPart); etags.put(Integer.valueOf(part), eTag); } } - - @Override - public String execute(String container, Blob blob, PutOptions options) { - String key = blob.getMetadata().getName(); - Payload payload = blob.getPayload(); - MultipartUploadSlicingAlgorithm algorithm = new MultipartUploadSlicingAlgorithm(); - algorithm - .calculateChunkSize(checkNotNull( - payload.getContentMetadata().getContentLength(), - "contentLength required on all uploads to amazon s3; please invoke payload.getContentMetadata().setContentLength(length) first")); - int parts = algorithm.getParts(); - long chunkSize = algorithm.getChunkSize(); - if (parts > 0) { - AWSS3Client client = ablobstore.getContext() - .unwrap(AWSS3ApiMetadata.CONTEXT_TOKEN).getApi(); - String uploadId = client.initiateMultipartUpload(container, - ObjectMetadataBuilder.create().key(key).build()); // TODO md5 - try { - SortedMap etags = Maps.newTreeMap(); - int part; - while ((part = algorithm.getNextPart()) <= parts) { - prepareUploadPart(container, key, uploadId, part, payload, - algorithm.getNextChunkOffset(), chunkSize, etags); - } - long remaining = algorithm.getRemaining(); - if (remaining > 0) { - prepareUploadPart(container, key, uploadId, part, payload, - algorithm.getNextChunkOffset(), remaining, etags); - } - return client.completeMultipartUpload(container, key, uploadId, etags); - } catch (Exception ex) { - RuntimeException rtex = Throwables2.getFirstThrowableOfType(ex, RuntimeException.class); - if (rtex == null) { - rtex = new RuntimeException(ex); - } - client.abortMultipartUpload(container, key, uploadId); - throw rtex; - } - } else { - // TODO: find a way to disable multipart. 
if we pass the original options, it goes into a stack overflow - return ablobstore.putBlob(container, blob, PutOptions.NONE); - } - } } diff --git a/providers/aws-s3/src/test/java/org/jclouds/aws/s3/blobstore/strategy/internal/SequentialMultipartUploadStrategyMockTest.java b/providers/aws-s3/src/test/java/org/jclouds/aws/s3/blobstore/strategy/internal/SequentialMultipartUploadStrategyMockTest.java new file mode 100644 index 0000000000..249d9e97da --- /dev/null +++ b/providers/aws-s3/src/test/java/org/jclouds/aws/s3/blobstore/strategy/internal/SequentialMultipartUploadStrategyMockTest.java @@ -0,0 +1,146 @@ +/** + * Licensed to jclouds, Inc. (jclouds) under one or more + * contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. jclouds licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.jclouds.aws.s3.blobstore.strategy.internal; + +import static com.google.common.util.concurrent.MoreExecutors.sameThreadExecutor; +import static org.jclouds.Constants.PROPERTY_MAX_RETRIES; +import static org.jclouds.Constants.PROPERTY_SO_TIMEOUT; +import static org.jclouds.s3.reference.S3Constants.PROPERTY_S3_VIRTUAL_HOST_BUCKETS; +import static org.testng.Assert.assertEquals; + +import java.io.IOException; +import java.util.Properties; +import java.util.Set; + +import org.jclouds.ContextBuilder; +import org.jclouds.blobstore.domain.internal.BlobBuilderImpl; +import org.jclouds.concurrent.config.ExecutorServiceModule; +import org.jclouds.http.HttpResponseException; +import org.testng.annotations.Test; + +import com.google.common.base.Charsets; +import com.google.common.collect.ImmutableSet; +import com.google.inject.Module; +import com.google.mockwebserver.MockResponse; +import com.google.mockwebserver.MockWebServer; +import com.google.mockwebserver.RecordedRequest; + +/** + * + * @author Adrian Cole + */ +@Test(singleThreaded = true) +public class SequentialMultipartUploadStrategyMockTest { + + @Test + public void testMPUDoesMultipart() throws IOException, InterruptedException { + MockWebServer server = new MockWebServer(); + server.enqueue(new MockResponse().setResponseCode(200).setBody("upload-id")); + server.enqueue(new MockResponse().setResponseCode(200).addHeader("ETag", "a00")); + server.enqueue(new MockResponse().setResponseCode(200).addHeader("ETag", "b00")); + server.enqueue(new MockResponse().setResponseCode(200).setBody("fff")); + server.play(); + + byte[] bytes = "0123456789abcdef".getBytes(Charsets.US_ASCII); + int partSize = bytes.length / 2; + + SequentialMultipartUploadStrategy api = mockSequentialMultipartUploadStrategy(server.getUrl("/").toString(), + partSize); + + try { + assertEquals(api.execute("container", new BlobBuilderImpl().name("foo").payload(bytes).build()), "fff"); + } finally { + + RecordedRequest initiate = 
server.takeRequest(); + assertEquals(initiate.getRequestLine(), "POST /container/foo?uploads HTTP/1.1"); + assertEquals(initiate.getHeader("Content-Length"), "0"); + + RecordedRequest part1 = server.takeRequest(); + assertEquals(part1.getRequestLine(), "PUT /container/foo?partNumber=1&uploadId=upload-id HTTP/1.1"); + assertEquals(part1.getHeader("Content-Length"), String.valueOf(partSize)); + assertEquals(new String(part1.getBody()), "01234567"); + + RecordedRequest part2 = server.takeRequest(); + assertEquals(part2.getRequestLine(), "PUT /container/foo?partNumber=2&uploadId=upload-id HTTP/1.1"); + assertEquals(part2.getHeader("Content-Length"), String.valueOf(partSize)); + assertEquals(new String(part2.getBody()), "89abcdef"); + + RecordedRequest manifest = server.takeRequest(); + assertEquals(manifest.getRequestLine(), "POST /container/foo?uploadId=upload-id HTTP/1.1"); + assertEquals(manifest.getHeader("Content-Length"), "161"); + assertEquals( + new String(manifest.getBody()), + "1a002b00"); + + server.shutdown(); + } + } + + @Test(expectedExceptions = HttpResponseException.class) + public void testMPUAbortsOnProblem() throws IOException, InterruptedException { + MockWebServer server = new MockWebServer(); + server.enqueue(new MockResponse().setResponseCode(200).setBody("upload-id")); + server.enqueue(new MockResponse().setResponseCode(400)); + server.enqueue(new MockResponse().setResponseCode(200)); + server.play(); + + byte[] bytes = "0123456789abcdef".getBytes(Charsets.US_ASCII); + int partSize = bytes.length / 2; + + SequentialMultipartUploadStrategy api = mockSequentialMultipartUploadStrategy(server.getUrl("/").toString(), + partSize); + + try { + assertEquals(api.execute("container", new BlobBuilderImpl().name("foo").payload(bytes).build()), "fff"); + } finally { + + RecordedRequest initiate = server.takeRequest(); + assertEquals(initiate.getRequestLine(), "POST /container/foo?uploads HTTP/1.1"); + assertEquals(initiate.getHeader("Content-Length"), "0"); 
+ + RecordedRequest part1 = server.takeRequest(); + assertEquals(part1.getRequestLine(), "PUT /container/foo?partNumber=1&uploadId=upload-id HTTP/1.1"); + assertEquals(part1.getHeader("Content-Length"), String.valueOf(partSize)); + assertEquals(new String(part1.getBody()), "01234567"); + + RecordedRequest abort = server.takeRequest(); + assertEquals(abort.getRequestLine(), "DELETE /container/foo?uploadId=upload-id HTTP/1.1"); + + server.shutdown(); + } + } + + private static final Set modules = ImmutableSet. of(new ExecutorServiceModule(sameThreadExecutor(), + sameThreadExecutor())); + + static SequentialMultipartUploadStrategy mockSequentialMultipartUploadStrategy(String uri, int partSize) { + Properties overrides = new Properties(); + overrides.setProperty(PROPERTY_S3_VIRTUAL_HOST_BUCKETS, "false"); + // prevent expect-100 bug http://code.google.com/p/mockwebserver/issues/detail?id=6 + overrides.setProperty(PROPERTY_SO_TIMEOUT, "0"); + overrides.setProperty(PROPERTY_MAX_RETRIES, "1"); + overrides.setProperty("jclouds.mpu.parts.size", String.valueOf(partSize)); + return ContextBuilder.newBuilder("aws-s3") + .credentials("accessKey", "secretKey") + .endpoint(uri) + .overrides(overrides) + .modules(modules) + .buildInjector().getInstance(SequentialMultipartUploadStrategy.class); + } +} diff --git a/providers/aws-s3/src/test/java/org/jclouds/aws/s3/blobstore/strategy/internal/SequentialMultipartUploadStrategyTest.java b/providers/aws-s3/src/test/java/org/jclouds/aws/s3/blobstore/strategy/internal/SequentialMultipartUploadStrategyTest.java deleted file mode 100644 index 5303de59ea..0000000000 --- a/providers/aws-s3/src/test/java/org/jclouds/aws/s3/blobstore/strategy/internal/SequentialMultipartUploadStrategyTest.java +++ /dev/null @@ -1,193 +0,0 @@ -/** - * Licensed to jclouds, Inc. (jclouds) under one or more - * contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
jclouds licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.jclouds.aws.s3.blobstore.strategy.internal; - -import static org.easymock.EasyMock.createMock; -import static org.easymock.EasyMock.expect; -import static org.easymock.EasyMock.expectLastCall; -import static org.easymock.EasyMock.replay; -import static org.easymock.EasyMock.verify; -import static org.testng.Assert.fail; - -import java.util.SortedMap; -import java.util.concurrent.TimeoutException; - -import org.jclouds.aws.s3.AWSS3ApiMetadata; -import org.jclouds.aws.s3.AWSS3AsyncClient; -import org.jclouds.aws.s3.AWSS3Client; -import org.jclouds.aws.s3.blobstore.AWSS3BlobStore; -import org.jclouds.blobstore.BlobStoreContext; -import org.jclouds.blobstore.domain.Blob; -import org.jclouds.blobstore.domain.MutableBlobMetadata; -import org.jclouds.blobstore.options.PutOptions; -import org.jclouds.io.MutableContentMetadata; -import org.jclouds.io.Payload; -import org.jclouds.io.PayloadSlicer; -import org.jclouds.rest.RestContext; -import org.jclouds.rest.internal.RestContextImpl; -import org.jclouds.s3.domain.ObjectMetadata; -import org.jclouds.s3.domain.ObjectMetadataBuilder; -import org.jclouds.util.Throwables2; -import org.testng.annotations.Test; - -import com.google.common.collect.Maps; - -/** - * Tests behavior of {@code SequentialMultipartUploadStrategy} - * - * @author Tibor Kiss - */ -@Test(groups = "unit") -public class SequentialMultipartUploadStrategyTest { - - 
@SuppressWarnings("unchecked") - @Test - public void testWithTwoParts() { - AWSS3BlobStore ablobStore = createMock(AWSS3BlobStore.class); - PayloadSlicer slicer = createMock(PayloadSlicer.class); - String container = "container"; - String key = "mpu-test"; - Blob blob = createMock(Blob.class); - MutableBlobMetadata blobMeta = createMock(MutableBlobMetadata.class); - Payload payload = createMock(Payload.class); - MutableContentMetadata contentMeta = createMock(MutableContentMetadata.class); - BlobStoreContext context = createMock(BlobStoreContext.class); - RestContext psc = createMock(RestContextImpl.class); - AWSS3Client client = createMock(AWSS3Client.class); - ObjectMetadata ometa = createMock(ObjectMetadata.class); - String uploadId = "uploadId"; - long chunkSize = MultipartUploadSlicingAlgorithm.DEFAULT_PART_SIZE; - long remaining = 100L; - SortedMap etags = Maps.newTreeMap(); - etags.put(Integer.valueOf(1), "eTag1"); - etags.put(Integer.valueOf(2), "eTag2"); - - expect(blob.getMetadata()).andReturn(blobMeta).atLeastOnce(); - expect(blobMeta.getName()).andReturn(key).atLeastOnce(); - expect(blob.getPayload()).andReturn(payload).atLeastOnce(); - expect(payload.getContentMetadata()).andReturn(contentMeta).atLeastOnce(); - expect(contentMeta.getContentLength()).andReturn(Long.valueOf(chunkSize + remaining)); - expect(ablobStore.getContext()).andReturn(context).atLeastOnce(); - expect(context.unwrap(AWSS3ApiMetadata.CONTEXT_TOKEN)).andReturn(psc).atLeastOnce(); - expect(psc.getApi()).andReturn(client).atLeastOnce(); - expect(client.initiateMultipartUpload(container, new ObjectMetadataBuilder().key(key).build())).andReturn("uploadId").atLeastOnce(); - expect(slicer.slice(payload, 0, chunkSize)).andReturn(payload).atLeastOnce(); - expect(client.uploadPart(container, key, 1, uploadId, payload)).andReturn("eTag1").atLeastOnce(); - expect(slicer.slice(payload, chunkSize, remaining)).andReturn(payload).atLeastOnce(); - expect(client.uploadPart(container, key, 2, 
uploadId, payload)).andReturn("eTag2").atLeastOnce(); - expect(client.completeMultipartUpload(container, key, uploadId, etags)).andReturn("eTag").atLeastOnce(); - - replay(ablobStore); - replay(slicer); - replay(blob); - replay(blobMeta); - replay(payload); - replay(contentMeta); - replay(context); - replay(psc); - replay(client); - replay(ometa); - - SequentialMultipartUploadStrategy strategy = new SequentialMultipartUploadStrategy(ablobStore, slicer); - strategy.execute(container, blob, PutOptions.NONE); - - verify(ablobStore); - verify(slicer); - verify(blob); - verify(blobMeta); - verify(payload); - verify(contentMeta); - verify(context); - verify(psc); - verify(client); - verify(ometa); - } - - @SuppressWarnings("unchecked") - @Test - public void testWithTimeout() { - AWSS3BlobStore ablobStore = createMock(AWSS3BlobStore.class); - PayloadSlicer slicer = createMock(PayloadSlicer.class); - String container = "container"; - String key = "mpu-test"; - Blob blob = createMock(Blob.class); - MutableBlobMetadata blobMeta = createMock(MutableBlobMetadata.class); - Payload payload = createMock(Payload.class); - MutableContentMetadata contentMeta = createMock(MutableContentMetadata.class); - BlobStoreContext context = createMock(BlobStoreContext.class); - RestContext psc = createMock(RestContextImpl.class); - AWSS3Client client = createMock(AWSS3Client.class); - ObjectMetadata ometa = createMock(ObjectMetadata.class); - String uploadId = "uploadId"; - long chunkSize = MultipartUploadSlicingAlgorithm.DEFAULT_PART_SIZE; - long remaining = 100L; - SortedMap etags = Maps.newTreeMap(); - etags.put(Integer.valueOf(1), "eTag1"); - etags.put(Integer.valueOf(2), "eTag2"); - - expect(blob.getMetadata()).andReturn(blobMeta).atLeastOnce(); - expect(blobMeta.getName()).andReturn(key).atLeastOnce(); - expect(blob.getPayload()).andReturn(payload).atLeastOnce(); - expect(payload.getContentMetadata()).andReturn(contentMeta).atLeastOnce(); - 
expect(contentMeta.getContentLength()).andReturn(Long.valueOf(chunkSize + remaining)); - expect(ablobStore.getContext()).andReturn(context).atLeastOnce(); - expect(context.unwrap(AWSS3ApiMetadata.CONTEXT_TOKEN)).andReturn(psc).atLeastOnce(); - expect(psc.getApi()).andReturn(client).atLeastOnce(); - expect(client.initiateMultipartUpload(container, new ObjectMetadataBuilder().key(key).build())).andReturn("uploadId").atLeastOnce(); - expect(slicer.slice(payload, 0, chunkSize)).andReturn(payload).atLeastOnce(); - expect(client.uploadPart(container, key, 1, uploadId, payload)).andReturn("eTag1").atLeastOnce(); - expect(slicer.slice(payload, chunkSize, remaining)).andReturn(payload).atLeastOnce(); - expect(client.uploadPart(container, key, 2, uploadId, payload)).andThrow(new RuntimeException(new TimeoutException())); - client.abortMultipartUpload(container, key, uploadId); - expectLastCall().atLeastOnce(); - - replay(ablobStore); - replay(slicer); - replay(blob); - replay(blobMeta); - replay(payload); - replay(contentMeta); - replay(context); - replay(psc); - replay(client); - replay(ometa); - - SequentialMultipartUploadStrategy strategy = new SequentialMultipartUploadStrategy(ablobStore, slicer); - try { - strategy.execute(container, blob, PutOptions.NONE); - fail("Should throw RuntimeException with TimeoutException cause!"); - } catch (RuntimeException rtex) { - TimeoutException timeout = Throwables2.getFirstThrowableOfType(rtex, TimeoutException.class); - if (timeout == null) { - throw rtex; - } - } - - verify(ablobStore); - verify(slicer); - verify(blob); - verify(blobMeta); - verify(payload); - verify(contentMeta); - verify(context); - verify(psc); - verify(client); - verify(ometa); - } -}