From 807d078c6f96cf98d8adaddc777ad6a872de603e Mon Sep 17 00:00:00 2001 From: Roman Bogorodskiy Date: Fri, 6 Apr 2012 17:26:19 +0400 Subject: [PATCH] First working implementation of swift multipart upload. Async client TDB. --- .../swift/CommonSwiftAsyncClient.java | 16 +- .../openstack/swift/CommonSwiftClient.java | 2 + .../swift/blobstore/SwiftAsyncBlobStore.java | 9 +- .../swift/blobstore/SwiftBlobStore.java | 2 +- .../MultipartUploadSlicingAlgorithm.java | 152 ++++++++++++++++++ .../internal/MultipartUploadStrategy.java | 3 +- .../SequentialMultipartUploadStrategy.java | 65 ++++++-- .../swift/internal/StubSwiftAsyncClient.java | 13 +- .../java/org/jclouds/aws/s3/AWSS3Client.java | 1 + 9 files changed, 242 insertions(+), 21 deletions(-) create mode 100644 apis/swift/src/main/java/org/jclouds/openstack/swift/blobstore/strategy/internal/MultipartUploadSlicingAlgorithm.java diff --git a/apis/swift/src/main/java/org/jclouds/openstack/swift/CommonSwiftAsyncClient.java b/apis/swift/src/main/java/org/jclouds/openstack/swift/CommonSwiftAsyncClient.java index 2fff5ae24e..71aae2d781 100644 --- a/apis/swift/src/main/java/org/jclouds/openstack/swift/CommonSwiftAsyncClient.java +++ b/apis/swift/src/main/java/org/jclouds/openstack/swift/CommonSwiftAsyncClient.java @@ -52,14 +52,7 @@ import org.jclouds.openstack.swift.functions.ParseObjectInfoFromHeaders; import org.jclouds.openstack.swift.functions.ParseObjectInfoListFromJsonResponse; import org.jclouds.openstack.swift.functions.ReturnTrueOn404FalseOn409; import org.jclouds.openstack.swift.options.ListContainerOptions; -import org.jclouds.rest.annotations.BinderParam; -import org.jclouds.rest.annotations.Endpoint; -import org.jclouds.rest.annotations.ExceptionParser; -import org.jclouds.rest.annotations.ParamParser; -import org.jclouds.rest.annotations.QueryParams; -import org.jclouds.rest.annotations.RequestFilters; -import org.jclouds.rest.annotations.ResponseParser; -import org.jclouds.rest.annotations.SkipEncoding; +import org.jclouds.rest.annotations.*; import org.jclouds.rest.functions.ReturnVoidOnNotFoundOr404; import com.google.common.util.concurrent.ListenableFuture; @@ -183,4 +176,11 @@ public interface CommonSwiftAsyncClient { @Path("/{container}/{name}") ListenableFuture removeObject(@PathParam("container") String container, @PathParam("name") String name); + @PUT + @Path("/{container}/{name}") + @ResponseParser(ParseETagHeader.class) + @Headers(keys = "X-Object-Manifest", values="{container}/{name}") + ListenableFuture putObjectManifest(@PathParam("container") String container, + @PathParam("name") String name); + } diff --git a/apis/swift/src/main/java/org/jclouds/openstack/swift/CommonSwiftClient.java b/apis/swift/src/main/java/org/jclouds/openstack/swift/CommonSwiftClient.java index a8b5210a32..5ad663bacf 100644 --- a/apis/swift/src/main/java/org/jclouds/openstack/swift/CommonSwiftClient.java +++ b/apis/swift/src/main/java/org/jclouds/openstack/swift/CommonSwiftClient.java @@ -26,6 +26,7 @@ import org.jclouds.blobstore.ContainerNotFoundException; import org.jclouds.blobstore.domain.PageSet; import org.jclouds.concurrent.Timeout; import org.jclouds.http.options.GetOptions; +import org.jclouds.io.Payload; import org.jclouds.openstack.swift.domain.AccountMetadata; import org.jclouds.openstack.swift.domain.ContainerMetadata; import org.jclouds.openstack.swift.domain.MutableObjectInfoWithMetadata; @@ -113,4 +114,5 @@ public interface CommonSwiftClient { */ boolean objectExists(String container, String name); + String putObjectManifest(String container, String name); } diff --git a/apis/swift/src/main/java/org/jclouds/openstack/swift/blobstore/SwiftAsyncBlobStore.java b/apis/swift/src/main/java/org/jclouds/openstack/swift/blobstore/SwiftAsyncBlobStore.java index b0873c9af6..e59b86f95e 100644 --- a/apis/swift/src/main/java/org/jclouds/openstack/swift/blobstore/SwiftAsyncBlobStore.java +++ b/apis/swift/src/main/java/org/jclouds/openstack/swift/blobstore/SwiftAsyncBlobStore.java @@ -55,6 +55,7 @@ import org.jclouds.openstack.swift.blobstore.functions.ContainerToResourceList; import org.jclouds.openstack.swift.blobstore.functions.ContainerToResourceMetadata; import org.jclouds.openstack.swift.blobstore.functions.ObjectToBlob; import org.jclouds.openstack.swift.blobstore.functions.ObjectToBlobMetadata; +import org.jclouds.openstack.swift.blobstore.strategy.internal.MultipartUploadStrategy; import org.jclouds.openstack.swift.domain.ContainerMetadata; import org.jclouds.openstack.swift.domain.MutableObjectInfoWithMetadata; import org.jclouds.openstack.swift.domain.ObjectInfo; @@ -81,6 +82,7 @@ public class SwiftAsyncBlobStore extends BaseAsyncBlobStore { private final ObjectToBlobMetadata object2BlobMd; private final BlobToHttpGetOptions blob2ObjectGetOptions; private final Provider fetchBlobMetadataProvider; + //private final Provider multipartUploadStrategy; @Inject protected SwiftAsyncBlobStore(BlobStoreContext context, BlobUtils blobUtils, @@ -102,6 +104,7 @@ public class SwiftAsyncBlobStore extends BaseAsyncBlobStore { this.object2BlobMd = object2BlobMd; this.blob2ObjectGetOptions = blob2ObjectGetOptions; this.fetchBlobMetadataProvider = checkNotNull(fetchBlobMetadataProvider, "fetchBlobMetadataProvider"); + //this.multipartUploadStrategy = multipartUploadStrategy; } /** @@ -239,7 +242,11 @@ public class SwiftAsyncBlobStore extends BaseAsyncBlobStore { @Override public ListenableFuture putBlob(String container, Blob blob, PutOptions options) { // TODO implement options - return putBlob(container, blob); + //if (options.isMultipart()) { + // return null; //Lis multipartUploadStrategy.get().execute(container, blob, options); + //} else { + return putBlob(container, blob); + //} } @Override diff --git a/apis/swift/src/main/java/org/jclouds/openstack/swift/blobstore/SwiftBlobStore.java b/apis/swift/src/main/java/org/jclouds/openstack/swift/blobstore/SwiftBlobStore.java index 6cc3329861..d19ab2ae44 100644 --- a/apis/swift/src/main/java/org/jclouds/openstack/swift/blobstore/SwiftBlobStore.java +++ b/apis/swift/src/main/java/org/jclouds/openstack/swift/blobstore/SwiftBlobStore.java @@ -212,7 +212,7 @@ public class SwiftBlobStore extends BaseBlobStore { public String putBlob(String container, Blob blob, PutOptions options) { // TODO implement options if (options.isMultipart()) { - return multipartUploadStrategy.get().execute(container, blob, options); + return multipartUploadStrategy.get().execute(container, blob, options, blob2Object); } else { return putBlob(container, blob); } diff --git a/apis/swift/src/main/java/org/jclouds/openstack/swift/blobstore/strategy/internal/MultipartUploadSlicingAlgorithm.java b/apis/swift/src/main/java/org/jclouds/openstack/swift/blobstore/strategy/internal/MultipartUploadSlicingAlgorithm.java new file mode 100644 index 0000000000..26c5979dea --- /dev/null +++ b/apis/swift/src/main/java/org/jclouds/openstack/swift/blobstore/strategy/internal/MultipartUploadSlicingAlgorithm.java @@ -0,0 +1,152 @@ +/** + * Licensed to jclouds, Inc. (jclouds) under one or more + * contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. jclouds licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +/* + * MultipartUploadSlicingAlgorithm.java + * + * + * Created by: tibor + * + * History + * + */ + +package org.jclouds.openstack.swift.blobstore.strategy.internal; + +import com.google.common.annotations.VisibleForTesting; +import com.google.inject.Inject; +import org.jclouds.blobstore.reference.BlobStoreConstants; +import org.jclouds.logging.Logger; +import org.jclouds.openstack.swift.blobstore.strategy.MultipartUpload; + +import javax.annotation.Resource; +import javax.inject.Named; + +public class MultipartUploadSlicingAlgorithm { + + @Resource + @Named(BlobStoreConstants.BLOBSTORE_LOGGER) + protected Logger logger = Logger.NULL; + + @VisibleForTesting + static final long DEFAULT_PART_SIZE = 33554432; // 32MB + + @VisibleForTesting + static final int DEFAULT_MAGNITUDE_BASE = 100; + + @Inject(optional = true) + @Named("jclouds.mpu.parts.size") + @VisibleForTesting + long defaultPartSize = DEFAULT_PART_SIZE; + + @Inject(optional = true) + @Named("jclouds.mpu.parts.magnitude") + @VisibleForTesting + int magnitudeBase = DEFAULT_MAGNITUDE_BASE; + + // calculated only once, but not from the constructor + private volatile int parts; // required number of parts with chunkSize + private volatile long chunkSize; + private volatile long remaining; // number of bytes remained for the last part + + // sequentially updated values + private volatile int part; + private volatile long chunkOffset; + private volatile long copied; + + @VisibleForTesting + protected long calculateChunkSize(long length) { + long unitPartSize = defaultPartSize; // first try with default part size + int parts = (int)(length / unitPartSize); + long partSize = unitPartSize; + int magnitude = (int) (parts / magnitudeBase); + if (magnitude > 0) { + partSize = magnitude * unitPartSize; + if (partSize > MultipartUpload.MAX_PART_SIZE) { + partSize = MultipartUpload.MAX_PART_SIZE; + unitPartSize = MultipartUpload.MAX_PART_SIZE; + } + parts = (int)(length / partSize); + if (parts * partSize < length) { + partSize = (magnitude + 1) * unitPartSize; + if (partSize > MultipartUpload.MAX_PART_SIZE) { + partSize = MultipartUpload.MAX_PART_SIZE; + unitPartSize = MultipartUpload.MAX_PART_SIZE; + } + parts = (int)(length / partSize); + } + } + if (parts > MultipartUpload.MAX_NUMBER_OF_PARTS) { // if splits in too many parts or + // cannot be split + unitPartSize = MultipartUpload.MIN_PART_SIZE; // take the minimum part size + parts = (int)(length / unitPartSize); + } + if (parts > MultipartUpload.MAX_NUMBER_OF_PARTS) { // if still splits in too many parts + parts = MultipartUpload.MAX_NUMBER_OF_PARTS - 1; // limit them. do not care about not + // covering + } + long remainder = length % unitPartSize; + if (remainder == 0 && parts > 0) { + parts -= 1; + } + this.chunkSize = partSize; + this.parts = parts; + this.remaining = length - partSize * parts; + logger.debug(" %d bytes partitioned in %d parts of part size: %d, remaining: %d%s", length, parts, chunkSize, + remaining, (remaining > MultipartUpload.MAX_PART_SIZE ? " overflow!" : "")); + return this.chunkSize; + } + + public long getCopied() { + return copied; + } + + public void setCopied(long copied) { + this.copied = copied; + } + + @VisibleForTesting + protected int getParts() { + return parts; + } + + protected int getNextPart() { + return ++part; + } + + protected void addCopied(long copied) { + this.copied += copied; + } + + protected long getNextChunkOffset() { + long next = chunkOffset; + chunkOffset += getChunkSize(); + return next; + } + + @VisibleForTesting + protected long getChunkSize() { + return chunkSize; + } + + @VisibleForTesting + protected long getRemaining() { + return remaining; + } + +} diff --git a/apis/swift/src/main/java/org/jclouds/openstack/swift/blobstore/strategy/internal/MultipartUploadStrategy.java b/apis/swift/src/main/java/org/jclouds/openstack/swift/blobstore/strategy/internal/MultipartUploadStrategy.java index 40a79ad45a..c536a8e8c2 100644 --- a/apis/swift/src/main/java/org/jclouds/openstack/swift/blobstore/strategy/internal/MultipartUploadStrategy.java +++ b/apis/swift/src/main/java/org/jclouds/openstack/swift/blobstore/strategy/internal/MultipartUploadStrategy.java @@ -3,10 +3,11 @@ package org.jclouds.openstack.swift.blobstore.strategy.internal; import com.google.inject.ImplementedBy; import org.jclouds.blobstore.domain.Blob; import org.jclouds.blobstore.options.PutOptions; +import org.jclouds.openstack.swift.blobstore.functions.BlobToObject; import org.jclouds.openstack.swift.blobstore.strategy.MultipartUpload; @ImplementedBy(SequentialMultipartUploadStrategy.class) public interface MultipartUploadStrategy extends MultipartUpload { - String execute(String container, Blob blob, PutOptions options); + String execute(String container, Blob blob, PutOptions options, BlobToObject blob2Object); } diff --git a/apis/swift/src/main/java/org/jclouds/openstack/swift/blobstore/strategy/internal/SequentialMultipartUploadStrategy.java b/apis/swift/src/main/java/org/jclouds/openstack/swift/blobstore/strategy/internal/SequentialMultipartUploadStrategy.java index 693cee51be..e11bd41ce3 100644 --- a/apis/swift/src/main/java/org/jclouds/openstack/swift/blobstore/strategy/internal/SequentialMultipartUploadStrategy.java +++ b/apis/swift/src/main/java/org/jclouds/openstack/swift/blobstore/strategy/internal/SequentialMultipartUploadStrategy.java @@ -3,44 +3,91 @@ package org.jclouds.openstack.swift.blobstore.strategy.internal; import javax.annotation.Resource; import javax.inject.Named; +import com.google.common.collect.Maps; +import com.google.inject.Inject; +import org.jclouds.blobstore.KeyNotFoundException; import org.jclouds.blobstore.domain.Blob; import org.jclouds.blobstore.options.PutOptions; import org.jclouds.blobstore.reference.BlobStoreConstants; import org.jclouds.io.Payload; import org.jclouds.io.PayloadSlicer; import org.jclouds.logging.Logger; +import org.jclouds.openstack.swift.SwiftClient; import org.jclouds.openstack.swift.blobstore.SwiftBlobStore; +import org.jclouds.openstack.swift.blobstore.functions.BlobToObject; +import org.jclouds.openstack.swift.domain.SwiftObject; +import org.jclouds.util.Throwables2; + +import java.util.SortedMap; import static com.google.common.base.Preconditions.checkNotNull; public class SequentialMultipartUploadStrategy implements MultipartUploadStrategy { - @Resource - @Named(BlobStoreConstants.BLOBSTORE_LOGGER) - protected Logger logger = Logger.NULL; + public static final String PART_SEPARATOR = "/"; - protected final SwiftBlobStore ablobstore; - protected final PayloadSlicer slicer; + @Resource + @Named(BlobStoreConstants.BLOBSTORE_LOGGER) + protected Logger logger = Logger.NULL; + protected final SwiftBlobStore ablobstore; + protected final PayloadSlicer slicer; + + @Inject public SequentialMultipartUploadStrategy(SwiftBlobStore ablobstore, PayloadSlicer slicer) { this.ablobstore = checkNotNull(ablobstore, "ablobstore"); this.slicer = checkNotNull(slicer, "slicer"); } @Override - public String execute(String container, Blob blob, PutOptions options) { + public String execute(String container, Blob blob, PutOptions options, BlobToObject blob2Object) { + System.out.println("here we go"); String key = blob.getMetadata().getName(); Payload payload = blob.getPayload(); - /*MultipartUploadSlicingAlgorithm algorithm = new MultipartUploadSlicingAlgorithm(); + MultipartUploadSlicingAlgorithm algorithm = new MultipartUploadSlicingAlgorithm(); algorithm .calculateChunkSize(checkNotNull( payload.getContentMetadata().getContentLength(), - "contentLength required on all uploads to amazon s3; please invoke payload.getContentMetadata().setContentLength(length) first")); + "contentLength required on all uploads to swift; please invoke payload.getContentMetadata().setContentLength(length) first")); int parts = algorithm.getParts(); long chunkSize = algorithm.getChunkSize(); if (parts > 0) { + SwiftClient client = (SwiftClient) ablobstore.getContext() + .getProviderSpecificContext().getApi(); - } */ + try { + SortedMap etags = Maps.newTreeMap(); + int part; + while ((part = algorithm.getNextPart()) <= parts) { + System.out.println("Uploading part " + part); + Payload chunkedPart = slicer.slice(payload, + algorithm.getNextChunkOffset(), chunkSize); + Blob blobPart = ablobstore.blobBuilder(blob.getMetadata().getName() + PART_SEPARATOR + + String.valueOf(part)).payload(chunkedPart).contentDisposition( + blob.getMetadata().getName() + PART_SEPARATOR + String.valueOf(part)).build(); + client.putObject(container, blob2Object.apply(blobPart)); + } + long remaining = algorithm.getRemaining(); + if (remaining > 0) { + System.out.println("Uploading tail."); + Payload chunkedPart = slicer.slice(payload, + algorithm.getNextChunkOffset(), remaining); + Blob blobPart = ablobstore.blobBuilder(blob.getMetadata().getName() + PART_SEPARATOR + + String.valueOf(part)).payload(chunkedPart).contentDisposition( + blob.getMetadata().getName() + PART_SEPARATOR + String.valueOf(part)).build(); + client.putObject(container, blob2Object.apply(blobPart)); + } + return client.putObjectManifest(container, key); + } catch (Exception ex) { + RuntimeException rtex = Throwables2.getFirstThrowableOfType(ex, RuntimeException.class); + if (rtex == null) { + rtex = new RuntimeException(ex); + } + //client.abortMultipartUpload(container, key, uploadId); + throw rtex; + } + + } return "NOT IMPLEMENTED"; } } diff --git a/apis/swift/src/test/java/org/jclouds/openstack/swift/internal/StubSwiftAsyncClient.java b/apis/swift/src/test/java/org/jclouds/openstack/swift/internal/StubSwiftAsyncClient.java index 01ccfe458b..9a3b189990 100644 --- a/apis/swift/src/test/java/org/jclouds/openstack/swift/internal/StubSwiftAsyncClient.java +++ b/apis/swift/src/test/java/org/jclouds/openstack/swift/internal/StubSwiftAsyncClient.java @@ -31,6 +31,7 @@ import java.util.concurrent.ExecutorService; import javax.inject.Inject; import javax.inject.Named; import javax.inject.Singleton; +import javax.ws.rs.PathParam; import org.jclouds.Constants; import org.jclouds.blobstore.TransientAsyncBlobStore; @@ -167,7 +168,12 @@ public class StubSwiftAsyncClient implements CommonSwiftAsyncClient { return blobStore.removeBlob(container, key); } - public ListenableFuture setObjectInfo(String container, String key, Map userMetadata) { + @Override + public ListenableFuture putObjectManifest(String container, String name) { + return null; + } + + public ListenableFuture setObjectInfo(String container, String key, Map userMetadata) { throw new UnsupportedOperationException(); } @@ -179,6 +185,11 @@ public class StubSwiftAsyncClient implements CommonSwiftAsyncClient { return objectProvider.create(null); } + + /*public String putObjectManifest(String container, String name) { + return "stub"; + } */ + @Override public ListenableFuture objectExists(String bucketName, String key) { return immediateFuture(containerToBlobs.get(bucketName).containsKey(key)); diff --git a/providers/aws-s3/src/main/java/org/jclouds/aws/s3/AWSS3Client.java b/providers/aws-s3/src/main/java/org/jclouds/aws/s3/AWSS3Client.java index ea1ae6548c..5426847372 100644 --- a/providers/aws-s3/src/main/java/org/jclouds/aws/s3/AWSS3Client.java +++ b/providers/aws-s3/src/main/java/org/jclouds/aws/s3/AWSS3Client.java @@ -56,6 +56,7 @@ public interface AWSS3Client extends S3Client { */ String initiateMultipartUpload(String bucketName, ObjectMetadata objectMetadata, PutObjectOptions... options); + /** * This operation aborts a multipart upload. After a multipart upload is aborted, no additional * parts can be uploaded using that upload ID. The storage consumed by any previously uploaded