diff --git a/apis/swift/pom.xml b/apis/swift/pom.xml index 1c627544db..e40702f3c1 100644 --- a/apis/swift/pom.xml +++ b/apis/swift/pom.xml @@ -110,6 +110,11 @@ log4j test + + com.google.mockwebserver + mockwebserver + test + diff --git a/apis/swift/src/main/java/org/jclouds/openstack/swift/blobstore/SwiftBlobStore.java b/apis/swift/src/main/java/org/jclouds/openstack/swift/blobstore/SwiftBlobStore.java index 2afc789227..0630c584ad 100644 --- a/apis/swift/src/main/java/org/jclouds/openstack/swift/blobstore/SwiftBlobStore.java +++ b/apis/swift/src/main/java/org/jclouds/openstack/swift/blobstore/SwiftBlobStore.java @@ -211,7 +211,7 @@ public class SwiftBlobStore extends BaseBlobStore { @Override public String putBlob(String container, Blob blob, PutOptions options) { if (options.isMultipart()) { - return multipartUploadStrategy.get().execute(container, blob, options, blob2Object); + return multipartUploadStrategy.get().execute(container, blob); } else { return putBlob(container, blob); } diff --git a/apis/swift/src/main/java/org/jclouds/openstack/swift/blobstore/strategy/MultipartUpload.java b/apis/swift/src/main/java/org/jclouds/openstack/swift/blobstore/strategy/MultipartUpload.java index 9c2ead8253..d5aacdf1ca 100644 --- a/apis/swift/src/main/java/org/jclouds/openstack/swift/blobstore/strategy/MultipartUpload.java +++ b/apis/swift/src/main/java/org/jclouds/openstack/swift/blobstore/strategy/MultipartUpload.java @@ -1,3 +1,21 @@ +/** + * Licensed to jclouds, Inc. (jclouds) under one or more + * contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. jclouds licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ package org.jclouds.openstack.swift.blobstore.strategy; public interface MultipartUpload { diff --git a/apis/swift/src/main/java/org/jclouds/openstack/swift/blobstore/strategy/internal/MultipartUploadStrategy.java b/apis/swift/src/main/java/org/jclouds/openstack/swift/blobstore/strategy/internal/MultipartUploadStrategy.java index c536a8e8c2..470308acb7 100644 --- a/apis/swift/src/main/java/org/jclouds/openstack/swift/blobstore/strategy/internal/MultipartUploadStrategy.java +++ b/apis/swift/src/main/java/org/jclouds/openstack/swift/blobstore/strategy/internal/MultipartUploadStrategy.java @@ -1,13 +1,30 @@ +/** + * Licensed to jclouds, Inc. (jclouds) under one or more + * contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. jclouds licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ package org.jclouds.openstack.swift.blobstore.strategy.internal; -import com.google.inject.ImplementedBy; import org.jclouds.blobstore.domain.Blob; -import org.jclouds.blobstore.options.PutOptions; -import org.jclouds.openstack.swift.blobstore.functions.BlobToObject; import org.jclouds.openstack.swift.blobstore.strategy.MultipartUpload; +import com.google.inject.ImplementedBy; + @ImplementedBy(SequentialMultipartUploadStrategy.class) public interface MultipartUploadStrategy extends MultipartUpload { - String execute(String container, Blob blob, PutOptions options, BlobToObject blob2Object); + String execute(String container, Blob blob); } diff --git a/apis/swift/src/main/java/org/jclouds/openstack/swift/blobstore/strategy/internal/SequentialMultipartUploadStrategy.java b/apis/swift/src/main/java/org/jclouds/openstack/swift/blobstore/strategy/internal/SequentialMultipartUploadStrategy.java index 9618e25aa3..19dd2f69b5 100644 --- a/apis/swift/src/main/java/org/jclouds/openstack/swift/blobstore/strategy/internal/SequentialMultipartUploadStrategy.java +++ b/apis/swift/src/main/java/org/jclouds/openstack/swift/blobstore/strategy/internal/SequentialMultipartUploadStrategy.java @@ -1,83 +1,94 @@ +/** + * Licensed to jclouds, Inc. (jclouds) under one or more + * contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. jclouds licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ package org.jclouds.openstack.swift.blobstore.strategy.internal; import static com.google.common.base.Preconditions.checkNotNull; import javax.annotation.Resource; import javax.inject.Named; +import javax.inject.Provider; import org.jclouds.blobstore.domain.Blob; -import org.jclouds.blobstore.options.PutOptions; +import org.jclouds.blobstore.domain.BlobBuilder; import org.jclouds.blobstore.reference.BlobStoreConstants; import org.jclouds.io.Payload; import org.jclouds.io.PayloadSlicer; import org.jclouds.logging.Logger; import org.jclouds.openstack.swift.CommonSwiftClient; -import org.jclouds.openstack.swift.SwiftApiMetadata; -import org.jclouds.openstack.swift.blobstore.SwiftBlobStore; import org.jclouds.openstack.swift.blobstore.functions.BlobToObject; -import org.jclouds.util.Throwables2; import com.google.inject.Inject; - public class SequentialMultipartUploadStrategy implements MultipartUploadStrategy { - public static final String PART_SEPARATOR = "/"; + private static final String PART_SEPARATOR = "/"; - @Resource - @Named(BlobStoreConstants.BLOBSTORE_LOGGER) - protected Logger logger = Logger.NULL; + @Resource + @Named(BlobStoreConstants.BLOBSTORE_LOGGER) + private Logger logger = Logger.NULL; - protected final SwiftBlobStore ablobstore; - protected final PayloadSlicer slicer; - - @Inject - public SequentialMultipartUploadStrategy(SwiftBlobStore ablobstore, PayloadSlicer slicer) { - this.ablobstore = checkNotNull(ablobstore, "ablobstore"); - this.slicer = checkNotNull(slicer, "slicer"); - } + private final CommonSwiftClient client; + private final Provider blobBuilders; + private final BlobToObject blob2Object; + private final MultipartUploadSlicingAlgorithm algorithm; + private final PayloadSlicer slicer; - @Override - public String execute(String container, Blob blob, PutOptions options, BlobToObject blob2Object) { - String key = blob.getMetadata().getName(); - Payload payload = blob.getPayload(); - MultipartUploadSlicingAlgorithm algorithm = new MultipartUploadSlicingAlgorithm(); - algorithm - .calculateChunkSize(checkNotNull( - payload.getContentMetadata().getContentLength(), - "contentLength required on all uploads to swift; please invoke payload.getContentMetadata().setContentLength(length) first")); - int parts = algorithm.getParts(); - long chunkSize = algorithm.getChunkSize(); - if (parts > 0) { - CommonSwiftClient client = ablobstore.getContext().unwrap(SwiftApiMetadata.CONTEXT_TOKEN).getApi(); - try { - int part; - while ((part = algorithm.getNextPart()) <= parts) { - Payload chunkedPart = slicer.slice(payload, - algorithm.getNextChunkOffset(), chunkSize); - Blob blobPart = ablobstore.blobBuilder(blob.getMetadata().getName() + PART_SEPARATOR + - String.valueOf(part)).payload(chunkedPart).contentDisposition( - blob.getMetadata().getName() + PART_SEPARATOR + String.valueOf(part)).build(); - client.putObject(container, blob2Object.apply(blobPart)); - } - long remaining = algorithm.getRemaining(); - if (remaining > 0) { - Payload chunkedPart = slicer.slice(payload, - algorithm.getNextChunkOffset(), remaining); - Blob blobPart = ablobstore.blobBuilder(blob.getMetadata().getName() + PART_SEPARATOR + - String.valueOf(part)).payload(chunkedPart).contentDisposition( - blob.getMetadata().getName() + PART_SEPARATOR + String.valueOf(part)).build(); - client.putObject(container, blob2Object.apply(blobPart)); - } - return client.putObjectManifest(container, key); - } catch (Exception ex) { - RuntimeException rtex = Throwables2.getFirstThrowableOfType(ex, RuntimeException.class); - if (rtex == null) { - rtex = new RuntimeException(ex); - } - throw rtex; - } - } else { - return ablobstore.putBlob(container, blob, PutOptions.NONE); - } - } + @Inject + public SequentialMultipartUploadStrategy(CommonSwiftClient client, Provider blobBuilders, + BlobToObject blob2Object, MultipartUploadSlicingAlgorithm algorithm, PayloadSlicer slicer) { + this.client = checkNotNull(client, "client"); + this.blobBuilders = checkNotNull(blobBuilders, "blobBuilders"); + this.blob2Object = checkNotNull(blob2Object, "blob2Object"); + this.algorithm = checkNotNull(algorithm, "algorithm"); + this.slicer = checkNotNull(slicer, "slicer"); + } + + @Override + public String execute(String container, Blob blob) { + String key = blob.getMetadata().getName(); + Payload payload = blob.getPayload(); + Long length = payload.getContentMetadata().getContentLength(); + checkNotNull(length, + "please invoke payload.getContentMetadata().setContentLength(length) prior to multipart upload"); + long chunkSize = algorithm.calculateChunkSize(length); + int partCount = algorithm.getParts(); + if (partCount > 0) { + int part; + while ((part = algorithm.getNextPart()) <= partCount) { + Payload chunkedPart = slicer.slice(payload, algorithm.getNextChunkOffset(), chunkSize); + Blob blobPart = blobBuilders.get() + .name(key + PART_SEPARATOR + part) + .payload(chunkedPart) + .contentDisposition(key + PART_SEPARATOR + part).build(); + client.putObject(container, blob2Object.apply(blobPart)); + } + long remaining = algorithm.getRemaining(); + if (remaining > 0) { + Payload chunkedPart = slicer.slice(payload, algorithm.getNextChunkOffset(), remaining); + Blob blobPart = blobBuilders.get() + .name(key + PART_SEPARATOR + part) + .payload(chunkedPart) + .contentDisposition(key + PART_SEPARATOR + part).build(); + client.putObject(container, blob2Object.apply(blobPart)); + } + return client.putObjectManifest(container, key); + } else { + return client.putObject(container, blob2Object.apply(blob)); + } + } } diff --git a/apis/swift/src/test/java/org/jclouds/openstack/swift/blobstore/strategy/internal/SequentialMultipartUploadStrategyMockTest.java b/apis/swift/src/test/java/org/jclouds/openstack/swift/blobstore/strategy/internal/SequentialMultipartUploadStrategyMockTest.java new file mode 100644 index 0000000000..dfa06ce804 --- /dev/null +++ b/apis/swift/src/test/java/org/jclouds/openstack/swift/blobstore/strategy/internal/SequentialMultipartUploadStrategyMockTest.java @@ -0,0 +1,149 @@ +/** + * Licensed to jclouds, Inc. (jclouds) under one or more + * contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. jclouds licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.jclouds.openstack.swift.blobstore.strategy.internal; + +import static com.google.common.util.concurrent.MoreExecutors.sameThreadExecutor; +import static org.jclouds.Constants.PROPERTY_MAX_RETRIES; +import static org.jclouds.Constants.PROPERTY_SO_TIMEOUT; +import static org.testng.Assert.assertEquals; + +import java.io.IOException; +import java.net.URL; +import java.util.Properties; +import java.util.Set; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.atomic.AtomicReference; + +import org.jclouds.ContextBuilder; +import org.jclouds.blobstore.domain.internal.BlobBuilderImpl; +import org.jclouds.concurrent.config.ExecutorServiceModule; +import org.jclouds.openstack.keystone.v2_0.internal.KeystoneFixture; +import org.testng.annotations.Test; + +import com.google.common.base.Charsets; +import com.google.common.collect.ImmutableSet; +import com.google.inject.Module; +import com.google.mockwebserver.MockResponse; +import com.google.mockwebserver.MockWebServer; +import com.google.mockwebserver.QueueDispatcher; +import com.google.mockwebserver.RecordedRequest; + +/** + * + * @author Adrian Cole + */ +@Test(singleThreaded = true) +public class SequentialMultipartUploadStrategyMockTest { + + String authRequestBody = KeystoneFixture.INSTANCE.initialAuthWithUsernameAndPassword("user", "password") + .getPayload().getRawContent().toString(); + String authResponse = KeystoneFixture.INSTANCE.responseWithAccess().getPayload().getRawContent().toString() + .replace("https://objects.jclouds.org/v1.0/40806637803162", "URL"); + String token = "Auth_4f173437e4b013bee56d1007"; + + @Test + public void testMPUDoesMultipart() throws IOException, InterruptedException { + MockWebServer server = new MockWebServer(); + AtomicReference url = setURLReplacingDispatcher(server); + server.enqueue(new MockResponse().setResponseCode(200).setBody(authResponse)); + server.enqueue(new MockResponse().setResponseCode(200).addHeader("ETag", "a00")); + server.enqueue(new MockResponse().setResponseCode(200).addHeader("ETag", "b00")); + server.enqueue(new MockResponse().setResponseCode(200).addHeader("ETag", "fff")); + server.play(); + url.set(server.getUrl("/")); + + byte[] bytes = "0123456789abcdef".getBytes(Charsets.US_ASCII); + int partSize = bytes.length / 2; + SequentialMultipartUploadStrategy api = mockSequentialMultipartUploadStrategy(url.get().toString(), partSize); + + try { + assertEquals(api.execute("container", new BlobBuilderImpl().name("foo").payload(bytes).build()), "fff"); + } finally { + RecordedRequest authRequest = server.takeRequest(); + assertEquals(authRequest.getRequestLine(), "POST /tokens HTTP/1.1"); + assertEquals(new String(authRequest.getBody()), authRequestBody); + assertEquals(authRequest.getHeader("Content-Length"), String.valueOf(authRequestBody.length())); + + RecordedRequest part1 = server.takeRequest(); + assertEquals(part1.getRequestLine(), "PUT /container/foo/1 HTTP/1.1"); + assertEquals(part1.getHeader("X-Auth-Token"), token); + assertEquals(part1.getHeader("Content-Length"), String.valueOf(partSize)); + assertEquals(new String(part1.getBody()), "01234567"); + + RecordedRequest part2 = server.takeRequest(); + assertEquals(part2.getRequestLine(), "PUT /container/foo/2 HTTP/1.1"); + assertEquals(part2.getHeader("X-Auth-Token"), token); + assertEquals(part2.getHeader("Content-Length"), String.valueOf(partSize)); + assertEquals(new String(part2.getBody()), "89abcdef"); + + RecordedRequest manifest = server.takeRequest(); + assertEquals(manifest.getRequestLine(), "PUT /container/foo HTTP/1.1"); + assertEquals(manifest.getHeader("X-Auth-Token"), token); + assertEquals(manifest.getHeader("Content-Length"), "0"); + + server.shutdown(); + } + } + + private static final Set modules = ImmutableSet. of(new ExecutorServiceModule(sameThreadExecutor(), + sameThreadExecutor())); + + static SequentialMultipartUploadStrategy mockSequentialMultipartUploadStrategy(String uri, int partSize) { + Properties overrides = new Properties(); + // prevent expect-100 bug http://code.google.com/p/mockwebserver/issues/detail?id=6 + overrides.setProperty(PROPERTY_SO_TIMEOUT, "0"); + overrides.setProperty(PROPERTY_MAX_RETRIES, "1"); + overrides.setProperty("jclouds.mpu.parts.size", String.valueOf(partSize)); + return ContextBuilder.newBuilder("swift-keystone") + .credentials("user", "password").endpoint(uri) + .overrides(overrides) + .modules(modules) + .buildInjector().getInstance(SequentialMultipartUploadStrategy.class); + } + + /** + * there's no built-in way to defer evaluation of a response header, hence + * this method, which allows us to send back links to the mock server. + */ + private AtomicReference setURLReplacingDispatcher(MockWebServer server) { + final AtomicReference url = new AtomicReference(); + + final QueueDispatcher dispatcher = new QueueDispatcher() { + protected final BlockingQueue responseQueue = new LinkedBlockingQueue(); + + @Override + public MockResponse dispatch(RecordedRequest request) throws InterruptedException { + MockResponse response = responseQueue.take(); + if (response.getBody() != null) { + String newBody = new String(response.getBody()).replace("URL", url.get().toString()); + response = response.setBody(newBody); + } + return response; + } + + @Override + public void enqueueResponse(MockResponse response) { + responseQueue.add(response); + } + }; + server.setDispatcher(dispatcher); + return url; + } +}