From 1d5c0ed5ebded9c4d69a4f05967fa4b960665c6a Mon Sep 17 00:00:00 2001 From: Roman Bogorodskiy Date: Thu, 29 Mar 2012 10:22:46 +0400 Subject: [PATCH 1/8] Stubs for multipart upload support in swift. --- .../blobstore/CloudFilesBlobStore.java | 3 +- .../swift/blobstore/SwiftBlobStore.java | 12 ++++- .../blobstore/strategy/MultipartUpload.java | 21 +++++++++ .../internal/MultipartUploadStrategy.java | 12 +++++ .../SequentialMultipartUploadStrategy.java | 46 +++++++++++++++++++ .../HPCloudObjectStorageBlobStore.java | 2 +- 6 files changed, 92 insertions(+), 4 deletions(-) create mode 100644 apis/swift/src/main/java/org/jclouds/openstack/swift/blobstore/strategy/MultipartUpload.java create mode 100644 apis/swift/src/main/java/org/jclouds/openstack/swift/blobstore/strategy/internal/MultipartUploadStrategy.java create mode 100644 apis/swift/src/main/java/org/jclouds/openstack/swift/blobstore/strategy/internal/SequentialMultipartUploadStrategy.java diff --git a/apis/cloudfiles/src/main/java/org/jclouds/cloudfiles/blobstore/CloudFilesBlobStore.java b/apis/cloudfiles/src/main/java/org/jclouds/cloudfiles/blobstore/CloudFilesBlobStore.java index 42be6d884c..40ba250abd 100644 --- a/apis/cloudfiles/src/main/java/org/jclouds/cloudfiles/blobstore/CloudFilesBlobStore.java +++ b/apis/cloudfiles/src/main/java/org/jclouds/cloudfiles/blobstore/CloudFilesBlobStore.java @@ -42,6 +42,7 @@ import org.jclouds.openstack.swift.blobstore.functions.ObjectToBlob; import org.jclouds.openstack.swift.blobstore.functions.ObjectToBlobMetadata; import com.google.common.base.Supplier; +import org.jclouds.openstack.swift.blobstore.strategy.internal.MultipartUploadStrategy; /** * @@ -62,7 +63,7 @@ public class CloudFilesBlobStore extends SwiftBlobStore { Provider fetchBlobMetadataProvider, EnableCDNAndCache enableCDNAndCache) { super(context, blobUtils, defaultLocation, locations, sync, container2ResourceMd, container2ContainerListOptions, container2ResourceList, object2Blob, blob2Object, object2BlobMd, blob2ObjectGetOptions, - fetchBlobMetadataProvider); + fetchBlobMetadataProvider, null); this.enableCDNAndCache = enableCDNAndCache; } diff --git a/apis/swift/src/main/java/org/jclouds/openstack/swift/blobstore/SwiftBlobStore.java b/apis/swift/src/main/java/org/jclouds/openstack/swift/blobstore/SwiftBlobStore.java index e96909ba18..6cc3329861 100644 --- a/apis/swift/src/main/java/org/jclouds/openstack/swift/blobstore/SwiftBlobStore.java +++ b/apis/swift/src/main/java/org/jclouds/openstack/swift/blobstore/SwiftBlobStore.java @@ -50,6 +50,7 @@ import org.jclouds.openstack.swift.blobstore.functions.ContainerToResourceList; import org.jclouds.openstack.swift.blobstore.functions.ContainerToResourceMetadata; import org.jclouds.openstack.swift.blobstore.functions.ObjectToBlob; import org.jclouds.openstack.swift.blobstore.functions.ObjectToBlobMetadata; +import org.jclouds.openstack.swift.blobstore.strategy.internal.MultipartUploadStrategy; import org.jclouds.openstack.swift.domain.ContainerMetadata; import com.google.common.base.Function; @@ -71,6 +72,7 @@ public class SwiftBlobStore extends BaseBlobStore { private final ObjectToBlobMetadata object2BlobMd; private final BlobToHttpGetOptions blob2ObjectGetOptions; private final Provider fetchBlobMetadataProvider; + private final Provider multipartUploadStrategy; @Inject protected SwiftBlobStore(BlobStoreContext context, BlobUtils blobUtils, Supplier defaultLocation, @@ -79,7 +81,8 @@ public class SwiftBlobStore extends BaseBlobStore { BlobStoreListContainerOptionsToListContainerOptions container2ContainerListOptions, ContainerToResourceList container2ResourceList, ObjectToBlob object2Blob, BlobToObject blob2Object, ObjectToBlobMetadata object2BlobMd, BlobToHttpGetOptions blob2ObjectGetOptions, - Provider fetchBlobMetadataProvider) { + Provider fetchBlobMetadataProvider, + Provider multipartUploadStrategy) { super(context, blobUtils, defaultLocation, locations); this.sync = sync; this.container2ResourceMd = container2ResourceMd; @@ -90,6 +93,7 @@ public class SwiftBlobStore extends BaseBlobStore { this.object2BlobMd = object2BlobMd; this.blob2ObjectGetOptions = blob2ObjectGetOptions; this.fetchBlobMetadataProvider = checkNotNull(fetchBlobMetadataProvider, "fetchBlobMetadataProvider"); + this.multipartUploadStrategy = multipartUploadStrategy; } /** @@ -207,7 +211,11 @@ public class SwiftBlobStore extends BaseBlobStore { @Override public String putBlob(String container, Blob blob, PutOptions options) { // TODO implement options - return putBlob(container, blob); + if (options.isMultipart()) { + return multipartUploadStrategy.get().execute(container, blob, options); + } else { + return putBlob(container, blob); + } } /** diff --git a/apis/swift/src/main/java/org/jclouds/openstack/swift/blobstore/strategy/MultipartUpload.java b/apis/swift/src/main/java/org/jclouds/openstack/swift/blobstore/strategy/MultipartUpload.java new file mode 100644 index 0000000000..839f4774e2 --- /dev/null +++ b/apis/swift/src/main/java/org/jclouds/openstack/swift/blobstore/strategy/MultipartUpload.java @@ -0,0 +1,21 @@ +package org.jclouds.openstack.swift.blobstore.strategy; + +/* +@author Roman Bogorodskiy + */ + +public interface MultipartUpload { + + /* Maximum number of parts per upload */ + public static final int MAX_NUMBER_OF_PARTS = 10000; + /* Maximum number of parts returned for a list parts request */ + public static final int MAX_LIST_PARTS_RETURNED = 1000; + /* Maximum number of multipart uploads returned in a list multipart uploads request */ + public static final int MAX_LIST_MPU_RETURNED = 1000; + + /* + * part size 5 MB to 5 GB, last part can be < 5 MB + */ + public static final long MIN_PART_SIZE = 5242880L; + public static final long MAX_PART_SIZE = 5368709120L; +} diff --git a/apis/swift/src/main/java/org/jclouds/openstack/swift/blobstore/strategy/internal/MultipartUploadStrategy.java b/apis/swift/src/main/java/org/jclouds/openstack/swift/blobstore/strategy/internal/MultipartUploadStrategy.java new file mode 100644 index 0000000000..40a79ad45a --- /dev/null +++ b/apis/swift/src/main/java/org/jclouds/openstack/swift/blobstore/strategy/internal/MultipartUploadStrategy.java @@ -0,0 +1,12 @@ +package org.jclouds.openstack.swift.blobstore.strategy.internal; + +import com.google.inject.ImplementedBy; +import org.jclouds.blobstore.domain.Blob; +import org.jclouds.blobstore.options.PutOptions; +import org.jclouds.openstack.swift.blobstore.strategy.MultipartUpload; + +@ImplementedBy(SequentialMultipartUploadStrategy.class) +public interface MultipartUploadStrategy extends MultipartUpload { + + String execute(String container, Blob blob, PutOptions options); +} diff --git a/apis/swift/src/main/java/org/jclouds/openstack/swift/blobstore/strategy/internal/SequentialMultipartUploadStrategy.java b/apis/swift/src/main/java/org/jclouds/openstack/swift/blobstore/strategy/internal/SequentialMultipartUploadStrategy.java new file mode 100644 index 0000000000..693cee51be --- /dev/null +++ b/apis/swift/src/main/java/org/jclouds/openstack/swift/blobstore/strategy/internal/SequentialMultipartUploadStrategy.java @@ -0,0 +1,46 @@ +package org.jclouds.openstack.swift.blobstore.strategy.internal; + +import javax.annotation.Resource; +import javax.inject.Named; + +import org.jclouds.blobstore.domain.Blob; +import org.jclouds.blobstore.options.PutOptions; +import org.jclouds.blobstore.reference.BlobStoreConstants; +import org.jclouds.io.Payload; +import org.jclouds.io.PayloadSlicer; +import org.jclouds.logging.Logger; +import org.jclouds.openstack.swift.blobstore.SwiftBlobStore; + +import static com.google.common.base.Preconditions.checkNotNull; + + +public class SequentialMultipartUploadStrategy implements MultipartUploadStrategy { + @Resource + @Named(BlobStoreConstants.BLOBSTORE_LOGGER) + protected Logger logger = Logger.NULL; + + protected final SwiftBlobStore ablobstore; + protected final PayloadSlicer slicer; + + public SequentialMultipartUploadStrategy(SwiftBlobStore ablobstore, PayloadSlicer slicer) { + this.ablobstore = checkNotNull(ablobstore, "ablobstore"); + this.slicer = checkNotNull(slicer, "slicer"); + } + + @Override + public String execute(String container, Blob blob, PutOptions options) { + String key = blob.getMetadata().getName(); + Payload payload = blob.getPayload(); + /*MultipartUploadSlicingAlgorithm algorithm = new MultipartUploadSlicingAlgorithm(); + algorithm + .calculateChunkSize(checkNotNull( + payload.getContentMetadata().getContentLength(), + "contentLength required on all uploads to amazon s3; please invoke payload.getContentMetadata().setContentLength(length) first")); + int parts = algorithm.getParts(); + long chunkSize = algorithm.getChunkSize(); + if (parts > 0) { + + } */ + return "NOT IMPLEMENTED"; + } +} diff --git a/providers/hpcloud-objectstorage/src/main/java/org/jclouds/hpcloud/objectstorage/blobstore/HPCloudObjectStorageBlobStore.java b/providers/hpcloud-objectstorage/src/main/java/org/jclouds/hpcloud/objectstorage/blobstore/HPCloudObjectStorageBlobStore.java index e05119ed8c..cd11feb720 100644 --- a/providers/hpcloud-objectstorage/src/main/java/org/jclouds/hpcloud/objectstorage/blobstore/HPCloudObjectStorageBlobStore.java +++ b/providers/hpcloud-objectstorage/src/main/java/org/jclouds/hpcloud/objectstorage/blobstore/HPCloudObjectStorageBlobStore.java @@ -62,7 +62,7 @@ public class HPCloudObjectStorageBlobStore extends SwiftBlobStore { Provider fetchBlobMetadataProvider, EnableCDNAndCache enableCDNAndCache) { super(context, blobUtils, defaultLocation, locations, sync, container2ResourceMd, container2ContainerListOptions, container2ResourceList, object2Blob, blob2Object, object2BlobMd, blob2ObjectGetOptions, - fetchBlobMetadataProvider); + fetchBlobMetadataProvider, null); this.enableCDNAndCache = enableCDNAndCache; } From 5f83e6af3d331821455c6e9b5d05ee84819ebdc3 Mon Sep 17 00:00:00 2001 From: Roman Bogorodskiy Date: Fri, 6 Apr 2012 17:26:19 +0400 Subject: [PATCH 2/8] First working implementation of swift multipart upload. Async client TDB. --- .../swift/CommonSwiftAsyncClient.java | 16 +- .../openstack/swift/CommonSwiftClient.java | 2 + .../swift/blobstore/SwiftAsyncBlobStore.java | 9 +- .../swift/blobstore/SwiftBlobStore.java | 2 +- .../MultipartUploadSlicingAlgorithm.java | 152 ++++++++++++++++++ .../internal/MultipartUploadStrategy.java | 3 +- .../SequentialMultipartUploadStrategy.java | 65 ++++++-- .../swift/internal/StubSwiftAsyncClient.java | 13 +- .../java/org/jclouds/aws/s3/AWSS3Client.java | 1 + 9 files changed, 242 insertions(+), 21 deletions(-) create mode 100644 apis/swift/src/main/java/org/jclouds/openstack/swift/blobstore/strategy/internal/MultipartUploadSlicingAlgorithm.java diff --git a/apis/swift/src/main/java/org/jclouds/openstack/swift/CommonSwiftAsyncClient.java b/apis/swift/src/main/java/org/jclouds/openstack/swift/CommonSwiftAsyncClient.java index 2fff5ae24e..71aae2d781 100644 --- a/apis/swift/src/main/java/org/jclouds/openstack/swift/CommonSwiftAsyncClient.java +++ b/apis/swift/src/main/java/org/jclouds/openstack/swift/CommonSwiftAsyncClient.java @@ -52,14 +52,7 @@ import org.jclouds.openstack.swift.functions.ParseObjectInfoFromHeaders; import org.jclouds.openstack.swift.functions.ParseObjectInfoListFromJsonResponse; import org.jclouds.openstack.swift.functions.ReturnTrueOn404FalseOn409; import org.jclouds.openstack.swift.options.ListContainerOptions; -import org.jclouds.rest.annotations.BinderParam; -import org.jclouds.rest.annotations.Endpoint; -import org.jclouds.rest.annotations.ExceptionParser; -import org.jclouds.rest.annotations.ParamParser; -import org.jclouds.rest.annotations.QueryParams; -import org.jclouds.rest.annotations.RequestFilters; -import org.jclouds.rest.annotations.ResponseParser; -import org.jclouds.rest.annotations.SkipEncoding; +import org.jclouds.rest.annotations.*; import org.jclouds.rest.functions.ReturnVoidOnNotFoundOr404; import com.google.common.util.concurrent.ListenableFuture; @@ -183,4 +176,11 @@ public interface CommonSwiftAsyncClient { @Path("/{container}/{name}") ListenableFuture removeObject(@PathParam("container") String container, @PathParam("name") String name); + @PUT + @Path("/{container}/{name}") + @ResponseParser(ParseETagHeader.class) + @Headers(keys = "X-Object-Manifest", values="{container}/{name}") + ListenableFuture putObjectManifest(@PathParam("container") String container, + @PathParam("name") String name); + } diff --git a/apis/swift/src/main/java/org/jclouds/openstack/swift/CommonSwiftClient.java b/apis/swift/src/main/java/org/jclouds/openstack/swift/CommonSwiftClient.java index a8b5210a32..5ad663bacf 100644 --- a/apis/swift/src/main/java/org/jclouds/openstack/swift/CommonSwiftClient.java +++ b/apis/swift/src/main/java/org/jclouds/openstack/swift/CommonSwiftClient.java @@ -26,6 +26,7 @@ import org.jclouds.blobstore.ContainerNotFoundException; import org.jclouds.blobstore.domain.PageSet; import org.jclouds.concurrent.Timeout; import org.jclouds.http.options.GetOptions; +import org.jclouds.io.Payload; import org.jclouds.openstack.swift.domain.AccountMetadata; import org.jclouds.openstack.swift.domain.ContainerMetadata; import org.jclouds.openstack.swift.domain.MutableObjectInfoWithMetadata; @@ -113,4 +114,5 @@ public interface CommonSwiftClient { */ boolean objectExists(String container, String name); + String putObjectManifest(String container, String name); } diff --git a/apis/swift/src/main/java/org/jclouds/openstack/swift/blobstore/SwiftAsyncBlobStore.java b/apis/swift/src/main/java/org/jclouds/openstack/swift/blobstore/SwiftAsyncBlobStore.java index b0873c9af6..e59b86f95e 100644 --- a/apis/swift/src/main/java/org/jclouds/openstack/swift/blobstore/SwiftAsyncBlobStore.java +++ b/apis/swift/src/main/java/org/jclouds/openstack/swift/blobstore/SwiftAsyncBlobStore.java @@ -55,6 +55,7 @@ import org.jclouds.openstack.swift.blobstore.functions.ContainerToResourceList; import org.jclouds.openstack.swift.blobstore.functions.ContainerToResourceMetadata; import org.jclouds.openstack.swift.blobstore.functions.ObjectToBlob; import org.jclouds.openstack.swift.blobstore.functions.ObjectToBlobMetadata; +import org.jclouds.openstack.swift.blobstore.strategy.internal.MultipartUploadStrategy; import org.jclouds.openstack.swift.domain.ContainerMetadata; import org.jclouds.openstack.swift.domain.MutableObjectInfoWithMetadata; import org.jclouds.openstack.swift.domain.ObjectInfo; @@ -81,6 +82,7 @@ public class SwiftAsyncBlobStore extends BaseAsyncBlobStore { private final ObjectToBlobMetadata object2BlobMd; private final BlobToHttpGetOptions blob2ObjectGetOptions; private final Provider fetchBlobMetadataProvider; + //private final Provider multipartUploadStrategy; @Inject protected SwiftAsyncBlobStore(BlobStoreContext context, BlobUtils blobUtils, @@ -102,6 +104,7 @@ public class SwiftAsyncBlobStore extends BaseAsyncBlobStore { this.object2BlobMd = object2BlobMd; this.blob2ObjectGetOptions = blob2ObjectGetOptions; this.fetchBlobMetadataProvider = checkNotNull(fetchBlobMetadataProvider, "fetchBlobMetadataProvider"); + //this.multipartUploadStrategy = multipartUploadStrategy; } /** @@ -239,7 +242,11 @@ public class SwiftAsyncBlobStore extends BaseAsyncBlobStore { @Override public ListenableFuture putBlob(String container, Blob blob, PutOptions options) { // TODO implement options - return putBlob(container, blob); + //if (options.isMultipart()) { + // return null; //Lis multipartUploadStrategy.get().execute(container, blob, options); + //} else { + return putBlob(container, blob); + //} } @Override diff --git a/apis/swift/src/main/java/org/jclouds/openstack/swift/blobstore/SwiftBlobStore.java b/apis/swift/src/main/java/org/jclouds/openstack/swift/blobstore/SwiftBlobStore.java index 6cc3329861..d19ab2ae44 100644 --- a/apis/swift/src/main/java/org/jclouds/openstack/swift/blobstore/SwiftBlobStore.java +++ b/apis/swift/src/main/java/org/jclouds/openstack/swift/blobstore/SwiftBlobStore.java @@ -212,7 +212,7 @@ public class SwiftBlobStore extends BaseBlobStore { public String putBlob(String container, Blob blob, PutOptions options) { // TODO implement options if (options.isMultipart()) { - return multipartUploadStrategy.get().execute(container, blob, options); + return multipartUploadStrategy.get().execute(container, blob, options, blob2Object); } else { return putBlob(container, blob); } diff --git a/apis/swift/src/main/java/org/jclouds/openstack/swift/blobstore/strategy/internal/MultipartUploadSlicingAlgorithm.java b/apis/swift/src/main/java/org/jclouds/openstack/swift/blobstore/strategy/internal/MultipartUploadSlicingAlgorithm.java new file mode 100644 index 0000000000..26c5979dea --- /dev/null +++ b/apis/swift/src/main/java/org/jclouds/openstack/swift/blobstore/strategy/internal/MultipartUploadSlicingAlgorithm.java @@ -0,0 +1,152 @@ +/** + * Licensed to jclouds, Inc. (jclouds) under one or more + * contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. jclouds licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +/* + * MultipartUploadSlicingAlgorithm.java + * + * + * Created by: tibor + * + * History + * + */ + +package org.jclouds.openstack.swift.blobstore.strategy.internal; + +import com.google.common.annotations.VisibleForTesting; +import com.google.inject.Inject; +import org.jclouds.blobstore.reference.BlobStoreConstants; +import org.jclouds.logging.Logger; +import org.jclouds.openstack.swift.blobstore.strategy.MultipartUpload; + +import javax.annotation.Resource; +import javax.inject.Named; + +public class MultipartUploadSlicingAlgorithm { + + @Resource + @Named(BlobStoreConstants.BLOBSTORE_LOGGER) + protected Logger logger = Logger.NULL; + + @VisibleForTesting + static final long DEFAULT_PART_SIZE = 33554432; // 32MB + + @VisibleForTesting + static final int DEFAULT_MAGNITUDE_BASE = 100; + + @Inject(optional = true) + @Named("jclouds.mpu.parts.size") + @VisibleForTesting + long defaultPartSize = DEFAULT_PART_SIZE; + + @Inject(optional = true) + @Named("jclouds.mpu.parts.magnitude") + @VisibleForTesting + int magnitudeBase = DEFAULT_MAGNITUDE_BASE; + + // calculated only once, but not from the constructor + private volatile int parts; // required number of parts with chunkSize + private volatile long chunkSize; + private volatile long remaining; // number of bytes remained for the last part + + // sequentially updated values + private volatile int part; + private volatile long chunkOffset; + private volatile long copied; + + @VisibleForTesting + protected long calculateChunkSize(long length) { + long unitPartSize = defaultPartSize; // first try with default part size + int parts = (int)(length / unitPartSize); + long partSize = unitPartSize; + int magnitude = (int) (parts / magnitudeBase); + if (magnitude > 0) { + partSize = magnitude * unitPartSize; + if (partSize > MultipartUpload.MAX_PART_SIZE) { + partSize = MultipartUpload.MAX_PART_SIZE; + unitPartSize = MultipartUpload.MAX_PART_SIZE; + } + parts = (int)(length / partSize); + if (parts * partSize < length) { + partSize = (magnitude + 1) * unitPartSize; + if (partSize > MultipartUpload.MAX_PART_SIZE) { + partSize = MultipartUpload.MAX_PART_SIZE; + unitPartSize = MultipartUpload.MAX_PART_SIZE; + } + parts = (int)(length / partSize); + } + } + if (parts > MultipartUpload.MAX_NUMBER_OF_PARTS) { // if splits in too many parts or + // cannot be split + unitPartSize = MultipartUpload.MIN_PART_SIZE; // take the minimum part size + parts = (int)(length / unitPartSize); + } + if (parts > MultipartUpload.MAX_NUMBER_OF_PARTS) { // if still splits in too many parts + parts = MultipartUpload.MAX_NUMBER_OF_PARTS - 1; // limit them. do not care about not + // covering + } + long remainder = length % unitPartSize; + if (remainder == 0 && parts > 0) { + parts -= 1; + } + this.chunkSize = partSize; + this.parts = parts; + this.remaining = length - partSize * parts; + logger.debug(" %d bytes partitioned in %d parts of part size: %d, remaining: %d%s", length, parts, chunkSize, + remaining, (remaining > MultipartUpload.MAX_PART_SIZE ? " overflow!" : "")); + return this.chunkSize; + } + + public long getCopied() { + return copied; + } + + public void setCopied(long copied) { + this.copied = copied; + } + + @VisibleForTesting + protected int getParts() { + return parts; + } + + protected int getNextPart() { + return ++part; + } + + protected void addCopied(long copied) { + this.copied += copied; + } + + protected long getNextChunkOffset() { + long next = chunkOffset; + chunkOffset += getChunkSize(); + return next; + } + + @VisibleForTesting + protected long getChunkSize() { + return chunkSize; + } + + @VisibleForTesting + protected long getRemaining() { + return remaining; + } + +} diff --git a/apis/swift/src/main/java/org/jclouds/openstack/swift/blobstore/strategy/internal/MultipartUploadStrategy.java b/apis/swift/src/main/java/org/jclouds/openstack/swift/blobstore/strategy/internal/MultipartUploadStrategy.java index 40a79ad45a..c536a8e8c2 100644 --- a/apis/swift/src/main/java/org/jclouds/openstack/swift/blobstore/strategy/internal/MultipartUploadStrategy.java +++ b/apis/swift/src/main/java/org/jclouds/openstack/swift/blobstore/strategy/internal/MultipartUploadStrategy.java @@ -3,10 +3,11 @@ package org.jclouds.openstack.swift.blobstore.strategy.internal; import com.google.inject.ImplementedBy; import org.jclouds.blobstore.domain.Blob; import org.jclouds.blobstore.options.PutOptions; +import org.jclouds.openstack.swift.blobstore.functions.BlobToObject; import org.jclouds.openstack.swift.blobstore.strategy.MultipartUpload; @ImplementedBy(SequentialMultipartUploadStrategy.class) public interface MultipartUploadStrategy extends MultipartUpload { - String execute(String container, Blob blob, PutOptions options); + String execute(String container, Blob blob, PutOptions options, BlobToObject blob2Object); } diff --git a/apis/swift/src/main/java/org/jclouds/openstack/swift/blobstore/strategy/internal/SequentialMultipartUploadStrategy.java b/apis/swift/src/main/java/org/jclouds/openstack/swift/blobstore/strategy/internal/SequentialMultipartUploadStrategy.java index 693cee51be..e11bd41ce3 100644 --- a/apis/swift/src/main/java/org/jclouds/openstack/swift/blobstore/strategy/internal/SequentialMultipartUploadStrategy.java +++ b/apis/swift/src/main/java/org/jclouds/openstack/swift/blobstore/strategy/internal/SequentialMultipartUploadStrategy.java @@ -3,44 +3,91 @@ package org.jclouds.openstack.swift.blobstore.strategy.internal; import javax.annotation.Resource; import javax.inject.Named; +import com.google.common.collect.Maps; +import com.google.inject.Inject; +import org.jclouds.blobstore.KeyNotFoundException; import org.jclouds.blobstore.domain.Blob; import org.jclouds.blobstore.options.PutOptions; import org.jclouds.blobstore.reference.BlobStoreConstants; import org.jclouds.io.Payload; import org.jclouds.io.PayloadSlicer; import org.jclouds.logging.Logger; +import org.jclouds.openstack.swift.SwiftClient; import org.jclouds.openstack.swift.blobstore.SwiftBlobStore; +import org.jclouds.openstack.swift.blobstore.functions.BlobToObject; +import org.jclouds.openstack.swift.domain.SwiftObject; +import org.jclouds.util.Throwables2; + +import java.util.SortedMap; import static com.google.common.base.Preconditions.checkNotNull; public class SequentialMultipartUploadStrategy implements MultipartUploadStrategy { - @Resource - @Named(BlobStoreConstants.BLOBSTORE_LOGGER) - protected Logger logger = Logger.NULL; + public static final String PART_SEPARATOR = "/"; - protected final SwiftBlobStore ablobstore; - protected final PayloadSlicer slicer; + @Resource + @Named(BlobStoreConstants.BLOBSTORE_LOGGER) + protected Logger logger = Logger.NULL; + protected final SwiftBlobStore ablobstore; + protected final PayloadSlicer slicer; + + @Inject public SequentialMultipartUploadStrategy(SwiftBlobStore ablobstore, PayloadSlicer slicer) { this.ablobstore = checkNotNull(ablobstore, "ablobstore"); this.slicer = checkNotNull(slicer, "slicer"); } @Override - public String execute(String container, Blob blob, PutOptions options) { + public String execute(String container, Blob blob, PutOptions options, BlobToObject blob2Object) { + System.out.println("here we go"); String key = blob.getMetadata().getName(); Payload payload = blob.getPayload(); - /*MultipartUploadSlicingAlgorithm algorithm = new MultipartUploadSlicingAlgorithm(); + MultipartUploadSlicingAlgorithm algorithm = new MultipartUploadSlicingAlgorithm(); algorithm .calculateChunkSize(checkNotNull( payload.getContentMetadata().getContentLength(), - "contentLength required on all uploads to amazon s3; please invoke payload.getContentMetadata().setContentLength(length) first")); + "contentLength required on all uploads to swift; please invoke payload.getContentMetadata().setContentLength(length) first")); int parts = algorithm.getParts(); long chunkSize = algorithm.getChunkSize(); if (parts > 0) { + SwiftClient client = (SwiftClient) ablobstore.getContext() + .getProviderSpecificContext().getApi(); - } */ + try { + SortedMap etags = Maps.newTreeMap(); + int part; + while ((part = algorithm.getNextPart()) <= parts) { + System.out.println("Uploading part " + part); + Payload chunkedPart = slicer.slice(payload, + algorithm.getNextChunkOffset(), chunkSize); + Blob blobPart = ablobstore.blobBuilder(blob.getMetadata().getName() + PART_SEPARATOR + + String.valueOf(part)).payload(chunkedPart).contentDisposition( + blob.getMetadata().getName() + PART_SEPARATOR + String.valueOf(part)).build(); + client.putObject(container, blob2Object.apply(blobPart)); + } + long remaining = algorithm.getRemaining(); + if (remaining > 0) { + System.out.println("Uploading tail."); + Payload chunkedPart = slicer.slice(payload, + algorithm.getNextChunkOffset(), remaining); + Blob blobPart = ablobstore.blobBuilder(blob.getMetadata().getName() + PART_SEPARATOR + + String.valueOf(part)).payload(chunkedPart).contentDisposition( + blob.getMetadata().getName() + PART_SEPARATOR + String.valueOf(part)).build(); + client.putObject(container, blob2Object.apply(blobPart)); + } + return client.putObjectManifest(container, key); + } catch (Exception ex) { + RuntimeException rtex = Throwables2.getFirstThrowableOfType(ex, RuntimeException.class); + if (rtex == null) { + rtex = new RuntimeException(ex); + } + //client.abortMultipartUpload(container, key, uploadId); + throw rtex; + } + + } return "NOT IMPLEMENTED"; } } diff --git a/apis/swift/src/test/java/org/jclouds/openstack/swift/internal/StubSwiftAsyncClient.java b/apis/swift/src/test/java/org/jclouds/openstack/swift/internal/StubSwiftAsyncClient.java index 01ccfe458b..9a3b189990 100644 --- a/apis/swift/src/test/java/org/jclouds/openstack/swift/internal/StubSwiftAsyncClient.java +++ b/apis/swift/src/test/java/org/jclouds/openstack/swift/internal/StubSwiftAsyncClient.java @@ -31,6 +31,7 @@ import java.util.concurrent.ExecutorService; import javax.inject.Inject; import javax.inject.Named; import javax.inject.Singleton; +import javax.ws.rs.PathParam; import org.jclouds.Constants; import org.jclouds.blobstore.TransientAsyncBlobStore; @@ -167,7 +168,12 @@ public class StubSwiftAsyncClient implements CommonSwiftAsyncClient { return blobStore.removeBlob(container, key); } - public ListenableFuture setObjectInfo(String container, String key, Map userMetadata) { + @Override + public ListenableFuture putObjectManifest(String container, String name) { + return null; + } + + public ListenableFuture setObjectInfo(String container, String key, Map userMetadata) { throw new UnsupportedOperationException(); } @@ -179,6 +185,11 @@ public class StubSwiftAsyncClient implements CommonSwiftAsyncClient { return objectProvider.create(null); } + + /*public String putObjectManifest(String container, String name) { + return "stub"; + } */ + @Override public ListenableFuture objectExists(String bucketName, String key) { return immediateFuture(containerToBlobs.get(bucketName).containsKey(key)); diff --git a/providers/aws-s3/src/main/java/org/jclouds/aws/s3/AWSS3Client.java b/providers/aws-s3/src/main/java/org/jclouds/aws/s3/AWSS3Client.java index ea1ae6548c..5426847372 100644 --- a/providers/aws-s3/src/main/java/org/jclouds/aws/s3/AWSS3Client.java +++ b/providers/aws-s3/src/main/java/org/jclouds/aws/s3/AWSS3Client.java @@ -56,6 +56,7 @@ public interface AWSS3Client extends S3Client { */ String initiateMultipartUpload(String bucketName, ObjectMetadata objectMetadata, PutObjectOptions... options); + /** * This operation aborts a multipart upload. After a multipart upload is aborted, no additional * parts can be uploaded using that upload ID. The storage consumed by any previously uploaded From 13f6d98060db1c72c82159263400d823ad9bf15e Mon Sep 17 00:00:00 2001 From: Roman Bogorodskiy Date: Tue, 10 Apr 2012 14:07:26 +0400 Subject: [PATCH 3/8] Initial implementation of MPU for SwiftAsyncBlobStore. --- .../blobstore/CloudFilesAsyncBlobStore.java | 2 +- .../swift/blobstore/SwiftAsyncBlobStore.java | 17 +- .../swift/blobstore/SwiftBlobStore.java | 1 - .../AsyncMultipartUploadStrategy.java | 12 + .../ParallelMultipartUploadStrategy.java | 268 ++++++++++++++++++ .../HPCloudObjectStorageAsyncBlobStore.java | 2 +- 6 files changed, 291 insertions(+), 11 deletions(-) create mode 100644 apis/swift/src/main/java/org/jclouds/openstack/swift/blobstore/strategy/internal/AsyncMultipartUploadStrategy.java create mode 100644 apis/swift/src/main/java/org/jclouds/openstack/swift/blobstore/strategy/internal/ParallelMultipartUploadStrategy.java diff --git a/apis/cloudfiles/src/main/java/org/jclouds/cloudfiles/blobstore/CloudFilesAsyncBlobStore.java b/apis/cloudfiles/src/main/java/org/jclouds/cloudfiles/blobstore/CloudFilesAsyncBlobStore.java index 34b61b03de..68ad22f587 100644 --- a/apis/cloudfiles/src/main/java/org/jclouds/cloudfiles/blobstore/CloudFilesAsyncBlobStore.java +++ b/apis/cloudfiles/src/main/java/org/jclouds/cloudfiles/blobstore/CloudFilesAsyncBlobStore.java @@ -69,7 +69,7 @@ public class CloudFilesAsyncBlobStore extends SwiftAsyncBlobStore { Provider fetchBlobMetadataProvider, EnableCDNAndCache enableCDNAndCache) { super(context, blobUtils, service, defaultLocation, locations, sync, async, container2ResourceMd, container2ContainerListOptions, container2ResourceList, object2Blob, blob2Object, object2BlobMd, - blob2ObjectGetOptions, fetchBlobMetadataProvider); + blob2ObjectGetOptions, fetchBlobMetadataProvider, null); this.enableCDNAndCache = enableCDNAndCache; } diff --git a/apis/swift/src/main/java/org/jclouds/openstack/swift/blobstore/SwiftAsyncBlobStore.java b/apis/swift/src/main/java/org/jclouds/openstack/swift/blobstore/SwiftAsyncBlobStore.java index e59b86f95e..1ffd600bf0 100644 --- a/apis/swift/src/main/java/org/jclouds/openstack/swift/blobstore/SwiftAsyncBlobStore.java +++ b/apis/swift/src/main/java/org/jclouds/openstack/swift/blobstore/SwiftAsyncBlobStore.java @@ -55,6 +55,7 @@ import org.jclouds.openstack.swift.blobstore.functions.ContainerToResourceList; import org.jclouds.openstack.swift.blobstore.functions.ContainerToResourceMetadata; import org.jclouds.openstack.swift.blobstore.functions.ObjectToBlob; import org.jclouds.openstack.swift.blobstore.functions.ObjectToBlobMetadata; +import org.jclouds.openstack.swift.blobstore.strategy.internal.AsyncMultipartUploadStrategy; import org.jclouds.openstack.swift.blobstore.strategy.internal.MultipartUploadStrategy; import org.jclouds.openstack.swift.domain.ContainerMetadata; import org.jclouds.openstack.swift.domain.MutableObjectInfoWithMetadata; @@ -82,7 +83,7 @@ public class SwiftAsyncBlobStore extends BaseAsyncBlobStore { private final ObjectToBlobMetadata object2BlobMd; private final BlobToHttpGetOptions blob2ObjectGetOptions; private final Provider fetchBlobMetadataProvider; - //private final Provider multipartUploadStrategy; + private final Provider multipartUploadStrategy; @Inject protected SwiftAsyncBlobStore(BlobStoreContext context, BlobUtils blobUtils, @@ -92,7 +93,8 @@ public class SwiftAsyncBlobStore extends BaseAsyncBlobStore { BlobStoreListContainerOptionsToListContainerOptions container2ContainerListOptions, ContainerToResourceList container2ResourceList, ObjectToBlob object2Blob, BlobToObject blob2Object, ObjectToBlobMetadata object2BlobMd, BlobToHttpGetOptions blob2ObjectGetOptions, - Provider fetchBlobMetadataProvider) { + Provider fetchBlobMetadataProvider, + Provider multipartUploadStrategy) { super(context, blobUtils, service, defaultLocation, locations); this.sync = sync; this.async = async; @@ -104,7 +106,7 @@ public class SwiftAsyncBlobStore extends BaseAsyncBlobStore { this.object2BlobMd = object2BlobMd; this.blob2ObjectGetOptions = blob2ObjectGetOptions; this.fetchBlobMetadataProvider = checkNotNull(fetchBlobMetadataProvider, "fetchBlobMetadataProvider"); - //this.multipartUploadStrategy = multipartUploadStrategy; + this.multipartUploadStrategy = multipartUploadStrategy; } /** @@ -241,12 +243,11 @@ public class SwiftAsyncBlobStore extends BaseAsyncBlobStore { @Override public ListenableFuture putBlob(String container, Blob blob, PutOptions options) { - // TODO implement options - //if (options.isMultipart()) { - // return null; //Lis multipartUploadStrategy.get().execute(container, blob, options); - //} else { + if (options.isMultipart()) { + return multipartUploadStrategy.get().execute(container, blob, options, blob2Object); + } else { return putBlob(container, blob); - //} + } } @Override diff --git a/apis/swift/src/main/java/org/jclouds/openstack/swift/blobstore/SwiftBlobStore.java b/apis/swift/src/main/java/org/jclouds/openstack/swift/blobstore/SwiftBlobStore.java index d19ab2ae44..2afc789227 100644 --- a/apis/swift/src/main/java/org/jclouds/openstack/swift/blobstore/SwiftBlobStore.java +++ b/apis/swift/src/main/java/org/jclouds/openstack/swift/blobstore/SwiftBlobStore.java @@ -210,7 +210,6 @@ public class SwiftBlobStore extends BaseBlobStore { */ @Override public String putBlob(String container, Blob blob, PutOptions options) { - // TODO implement options if (options.isMultipart()) { return multipartUploadStrategy.get().execute(container, blob, options, blob2Object); } else { diff --git a/apis/swift/src/main/java/org/jclouds/openstack/swift/blobstore/strategy/internal/AsyncMultipartUploadStrategy.java b/apis/swift/src/main/java/org/jclouds/openstack/swift/blobstore/strategy/internal/AsyncMultipartUploadStrategy.java new file mode 100644 index 0000000000..017d1dac22 --- /dev/null +++ b/apis/swift/src/main/java/org/jclouds/openstack/swift/blobstore/strategy/internal/AsyncMultipartUploadStrategy.java @@ -0,0 +1,12 @@ +package org.jclouds.openstack.swift.blobstore.strategy.internal; + +import com.google.common.util.concurrent.ListenableFuture; +import com.google.inject.ImplementedBy; +import org.jclouds.blobstore.domain.Blob; +import org.jclouds.blobstore.options.PutOptions; +import org.jclouds.openstack.swift.blobstore.functions.BlobToObject; + +@ImplementedBy(ParallelMultipartUploadStrategy.class) +public interface AsyncMultipartUploadStrategy { + ListenableFuture execute(String container, Blob blob, PutOptions options, BlobToObject blob2Object); +} diff --git a/apis/swift/src/main/java/org/jclouds/openstack/swift/blobstore/strategy/internal/ParallelMultipartUploadStrategy.java b/apis/swift/src/main/java/org/jclouds/openstack/swift/blobstore/strategy/internal/ParallelMultipartUploadStrategy.java new file mode 100644 index 0000000000..bdc86344c2 --- /dev/null +++ b/apis/swift/src/main/java/org/jclouds/openstack/swift/blobstore/strategy/internal/ParallelMultipartUploadStrategy.java @@ -0,0 +1,268 @@ +package org.jclouds.openstack.swift.blobstore.strategy.internal; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.Maps; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.inject.Inject; +import org.jclouds.Constants; +import org.jclouds.blobstore.domain.Blob; +import org.jclouds.blobstore.internal.BlobRuntimeException; +import org.jclouds.blobstore.options.PutOptions; +import org.jclouds.blobstore.reference.BlobStoreConstants; +import org.jclouds.concurrent.Futures; +import org.jclouds.io.Payload; +import org.jclouds.io.PayloadSlicer; +import org.jclouds.logging.Logger; +import org.jclouds.openstack.swift.SwiftAsyncClient; +import org.jclouds.openstack.swift.SwiftClient; +import org.jclouds.openstack.swift.blobstore.SwiftAsyncBlobStore; +import org.jclouds.openstack.swift.blobstore.functions.BlobToObject; +import org.jclouds.util.Throwables2; + +import javax.annotation.Resource; +import javax.inject.Named; +import java.util.Map; +import java.util.Queue; +import java.util.SortedMap; +import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicInteger; + +import static com.google.common.base.Preconditions.checkNotNull; + +public class ParallelMultipartUploadStrategy implements AsyncMultipartUploadStrategy { + @Resource + @Named(BlobStoreConstants.BLOBSTORE_LOGGER) + protected Logger logger = Logger.NULL; + + public static final String PART_SEPARATOR = "/"; + @VisibleForTesting + static final int DEFAULT_PARALLEL_DEGREE = 4; + @VisibleForTesting + static final int DEFAULT_MIN_RETRIES = 5; + @VisibleForTesting + static final int DEFAULT_MAX_PERCENT_RETRIES = 10; + + @Inject(optional = true) + @Named("jclouds.mpu.parallel.degree") + @VisibleForTesting + int parallelDegree = DEFAULT_PARALLEL_DEGREE; + + @Inject(optional = true) + @Named("jclouds.mpu.parallel.retries.min") + @VisibleForTesting + int minRetries = DEFAULT_MIN_RETRIES; + + @Inject(optional = true) + @Named("jclouds.mpu.parallel.retries.maxpercent") + @VisibleForTesting + int maxPercentRetries = DEFAULT_MAX_PERCENT_RETRIES; + + /** + * maximum duration of an blob Request + */ + @Inject(optional = true) + @Named(Constants.PROPERTY_REQUEST_TIMEOUT) + protected Long maxTime; + + private final ExecutorService ioWorkerExecutor; + + protected final SwiftAsyncBlobStore ablobstore; + protected final PayloadSlicer slicer; + + @Inject + public ParallelMultipartUploadStrategy(SwiftAsyncBlobStore ablobstore, PayloadSlicer slicer, + @Named(Constants.PROPERTY_IO_WORKER_THREADS) ExecutorService ioWorkerExecutor) { + this.ablobstore = checkNotNull(ablobstore, "ablobstore"); + this.slicer = checkNotNull(slicer, "slicer"); + this.ioWorkerExecutor = checkNotNull(ioWorkerExecutor, "ioWorkerExecutor"); + } + + + protected void prepareUploadPart(final String container, final Blob blob, final String key, + final Integer part, final Payload payload, + final long offset, final long size, final SortedMap etags, + final BlockingQueue activeParts, + final Map> futureParts, + final AtomicInteger errors, final int maxRetries, final Map errorMap, + final Queue toRetry, final CountDownLatch latch, + BlobToObject blob2Object) { + if (errors.get() > maxRetries) { + activeParts.remove(part); // remove part from the bounded-queue without blocking + latch.countDown(); + return; + } + final SwiftAsyncClient client = (SwiftAsyncClient) ablobstore.getContext() + .getProviderSpecificContext().getAsyncApi(); + Payload chunkedPart = slicer.slice(payload, offset, size); + logger.debug(String.format("async uploading part %s of %s to container %s", part, key, container)); + final long start = System.currentTimeMillis(); + String blobPartName = blob.getMetadata().getName() + PART_SEPARATOR + + String.valueOf(part); + + Blob blobPart = ablobstore.blobBuilder(blobPartName).payload(chunkedPart). + contentDisposition(blobPartName).build(); + final ListenableFuture futureETag = client.putObject(container, blob2Object.apply(blobPart)); + futureETag.addListener(new Runnable() { + @Override + public void run() { + try { + etags.put(part, futureETag.get()); + logger.debug(String.format("async uploaded part %s of %s to container %s in %sms", + part, key, container, (System.currentTimeMillis() - start))); + } catch (CancellationException e) { + errorMap.put(part, e); + String message = String.format("%s while uploading part %s - [%s,%s] to container %s with running since %dms", + e.getMessage(), part, offset, size, container, (System.currentTimeMillis() - start)); + logger.debug(message); + } catch (Exception e) { + errorMap.put(part, e); + String message = String.format("%s while uploading part %s - [%s,%s] to container %s running since %dms", + e.getMessage(), part, offset, size, container, (System.currentTimeMillis() - start)); + logger.error(message, e); + if (errors.incrementAndGet() <= maxRetries) + toRetry.add(new Part(part, offset, size)); + } finally { + activeParts.remove(part); // remove part from the bounded-queue without blocking + futureParts.remove(part); + latch.countDown(); + } + } + }, ioWorkerExecutor); + futureParts.put(part, futureETag); + } + + @Override + public ListenableFuture execute(final String container, final Blob blob, final PutOptions options, final BlobToObject blob2Object) { + return Futures.makeListenable( + ioWorkerExecutor.submit(new Callable() { + @Override + public String call() throws Exception { + String key = blob.getMetadata().getName(); + Payload payload = blob.getPayload(); + MultipartUploadSlicingAlgorithm algorithm = new MultipartUploadSlicingAlgorithm(); + algorithm.calculateChunkSize(payload.getContentMetadata() + .getContentLength()); + int parts = algorithm.getParts(); + long chunkSize = algorithm.getChunkSize(); + long remaining = algorithm.getRemaining(); + if (parts > 0) { + SwiftClient client = (SwiftClient) ablobstore + .getContext().getProviderSpecificContext().getApi(); + final Map> futureParts = + new ConcurrentHashMap>(); + final Map errorMap = Maps.newHashMap(); + AtomicInteger errors = new AtomicInteger(0); + int maxRetries = Math.max(minRetries, parts * maxPercentRetries / 100); + int effectiveParts = remaining > 0 ? parts + 1 : parts; + try { + logger.debug(String.format("initiated multipart upload of %s to container %s" + + " consisting from %s part (possible max. retries: %d)", + key, container, effectiveParts, maxRetries)); + // we need a bounded-blocking queue to control the amount of parallel jobs + ArrayBlockingQueue activeParts = new ArrayBlockingQueue(parallelDegree); + Queue toRetry = new ConcurrentLinkedQueue(); + SortedMap etags = new ConcurrentSkipListMap(); + CountDownLatch latch = new CountDownLatch(effectiveParts); + int part; + while ((part = algorithm.getNextPart()) <= parts) { + Integer partKey = new Integer(part); + activeParts.put(partKey); + + prepareUploadPart(container, blob, key, partKey, payload, + algorithm.getNextChunkOffset(), chunkSize, etags, + activeParts, futureParts, errors, maxRetries, errorMap, toRetry, latch, + blob2Object); + } + if (remaining > 0) { + Integer partKey = new Integer(part); + activeParts.put(partKey); + prepareUploadPart(container, blob, key, partKey, payload, + algorithm.getNextChunkOffset(), remaining, etags, + activeParts, futureParts, errors, maxRetries, errorMap, toRetry, latch, + blob2Object); + } + latch.await(); + // handling retries + while (errors.get() <= maxRetries && toRetry.size() > 0) { + int atOnce = Math.min(Math.min(toRetry.size(), errors.get()), parallelDegree); + CountDownLatch retryLatch = new CountDownLatch(atOnce); + for (int i = 0; i < atOnce; i++) { + Part failedPart = toRetry.poll(); + Integer partKey = new Integer(failedPart.getPart()); + activeParts.put(partKey); + prepareUploadPart(container, blob, key, partKey, payload, + failedPart.getOffset(), failedPart.getSize(), etags, + activeParts, futureParts, errors, maxRetries, errorMap, toRetry, retryLatch, + blob2Object); + } + retryLatch.await(); + } + if (errors.get() > maxRetries) { + throw new BlobRuntimeException(String.format( + "Too many failed parts: %s while multipart upload of %s to container %s", + errors.get(), key, container)); + } + + String eTag = client.putObjectManifest(container, key); + logger.debug(String.format("multipart upload of %s to container %s" + + " succeffully finished with %s retries", key, container, errors.get())); + return eTag; + } catch (Exception ex) { + RuntimeException rtex = Throwables2.getFirstThrowableOfType(ex, RuntimeException.class); + if (rtex == null) { + rtex = new RuntimeException(ex); + } + for (Map.Entry> entry : futureParts.entrySet()) { + entry.getValue().cancel(false); + } + /* + if (uploadId != null) { + client.abortMultipartUpload(container, key, uploadId); + } */ + throw rtex; + } + } else { + ListenableFuture futureETag = ablobstore.putBlob(container, blob, options); + return maxTime != null ? + futureETag.get(maxTime, TimeUnit.SECONDS) : futureETag.get(); + } + } + }), ioWorkerExecutor); + } + + class Part { + private int part; + private long offset; + private long size; + + Part(int part, long offset, long size) { + this.part = part; + this.offset = offset; + this.size = size; + } + + public int getPart() { + return part; + } + + public void setPart(int part) { + this.part = part; + } + + public long getOffset() { + return offset; + } + + public void setOffset(long offset) { + this.offset = offset; + } + + public long getSize() { + return size; + } + + public void setSize(long size) { + this.size = size; + } + } +} diff --git a/providers/hpcloud-objectstorage/src/main/java/org/jclouds/hpcloud/objectstorage/blobstore/HPCloudObjectStorageAsyncBlobStore.java b/providers/hpcloud-objectstorage/src/main/java/org/jclouds/hpcloud/objectstorage/blobstore/HPCloudObjectStorageAsyncBlobStore.java index ed305371de..fdaaa3ec4d 100644 --- a/providers/hpcloud-objectstorage/src/main/java/org/jclouds/hpcloud/objectstorage/blobstore/HPCloudObjectStorageAsyncBlobStore.java +++ b/providers/hpcloud-objectstorage/src/main/java/org/jclouds/hpcloud/objectstorage/blobstore/HPCloudObjectStorageAsyncBlobStore.java @@ -69,7 +69,7 @@ public class HPCloudObjectStorageAsyncBlobStore extends SwiftAsyncBlobStore { Provider fetchBlobMetadataProvider, EnableCDNAndCache enableCDNAndCache) { super(context, blobUtils, service, defaultLocation, locations, sync, async, container2ResourceMd, container2ContainerListOptions, container2ResourceList, object2Blob, blob2Object, object2BlobMd, - blob2ObjectGetOptions, fetchBlobMetadataProvider); + blob2ObjectGetOptions, fetchBlobMetadataProvider, null); this.enableCDNAndCache = enableCDNAndCache; } From 9d5f242e6c8f452e21afab929071da476578f626 Mon Sep 17 00:00:00 2001 From: Roman Bogorodskiy Date: Tue, 10 Apr 2012 14:28:24 +0400 Subject: [PATCH 4/8] Clean up commented out useless function. --- .../openstack/swift/internal/StubSwiftAsyncClient.java | 5 ----- 1 file changed, 5 deletions(-) diff --git a/apis/swift/src/test/java/org/jclouds/openstack/swift/internal/StubSwiftAsyncClient.java b/apis/swift/src/test/java/org/jclouds/openstack/swift/internal/StubSwiftAsyncClient.java index 9a3b189990..7178760509 100644 --- a/apis/swift/src/test/java/org/jclouds/openstack/swift/internal/StubSwiftAsyncClient.java +++ b/apis/swift/src/test/java/org/jclouds/openstack/swift/internal/StubSwiftAsyncClient.java @@ -185,11 +185,6 @@ public class StubSwiftAsyncClient implements CommonSwiftAsyncClient { return objectProvider.create(null); } - - /*public String putObjectManifest(String container, String name) { - return "stub"; - } */ - @Override public ListenableFuture objectExists(String bucketName, String key) { return immediateFuture(containerToBlobs.get(bucketName).containsKey(key)); From 9e8bc44285b32650d07f94bfe7e83cabffc5861d Mon Sep 17 00:00:00 2001 From: Roman Bogorodskiy Date: Tue, 17 Apr 2012 15:21:26 +0400 Subject: [PATCH 5/8] Use CommonSwiftClient instead of SwiftClient in multipart code. --- .../cloudfiles/blobstore/CloudFilesAsyncBlobStore.java | 6 ++++-- .../strategy/internal/ParallelMultipartUploadStrategy.java | 3 ++- .../internal/SequentialMultipartUploadStrategy.java | 3 ++- 3 files changed, 8 insertions(+), 4 deletions(-) diff --git a/apis/cloudfiles/src/main/java/org/jclouds/cloudfiles/blobstore/CloudFilesAsyncBlobStore.java b/apis/cloudfiles/src/main/java/org/jclouds/cloudfiles/blobstore/CloudFilesAsyncBlobStore.java index 68ad22f587..4d1cbb63e0 100644 --- a/apis/cloudfiles/src/main/java/org/jclouds/cloudfiles/blobstore/CloudFilesAsyncBlobStore.java +++ b/apis/cloudfiles/src/main/java/org/jclouds/cloudfiles/blobstore/CloudFilesAsyncBlobStore.java @@ -49,6 +49,7 @@ import org.jclouds.openstack.swift.blobstore.functions.ObjectToBlobMetadata; import com.google.common.base.Function; import com.google.common.base.Supplier; import com.google.common.util.concurrent.ListenableFuture; +import org.jclouds.openstack.swift.blobstore.strategy.internal.AsyncMultipartUploadStrategy; /** * @@ -66,10 +67,11 @@ public class CloudFilesAsyncBlobStore extends SwiftAsyncBlobStore { BlobStoreListContainerOptionsToListContainerOptions container2ContainerListOptions, ContainerToResourceList container2ResourceList, ObjectToBlob object2Blob, BlobToObject blob2Object, ObjectToBlobMetadata object2BlobMd, BlobToHttpGetOptions blob2ObjectGetOptions, - Provider fetchBlobMetadataProvider, EnableCDNAndCache enableCDNAndCache) { + Provider fetchBlobMetadataProvider, EnableCDNAndCache enableCDNAndCache, + Provider multipartUploadStrategy) { super(context, blobUtils, service, defaultLocation, locations, sync, async, container2ResourceMd, container2ContainerListOptions, container2ResourceList, object2Blob, blob2Object, object2BlobMd, - blob2ObjectGetOptions, fetchBlobMetadataProvider, null); + blob2ObjectGetOptions, fetchBlobMetadataProvider, multipartUploadStrategy); this.enableCDNAndCache = enableCDNAndCache; } diff --git a/apis/swift/src/main/java/org/jclouds/openstack/swift/blobstore/strategy/internal/ParallelMultipartUploadStrategy.java b/apis/swift/src/main/java/org/jclouds/openstack/swift/blobstore/strategy/internal/ParallelMultipartUploadStrategy.java index bdc86344c2..95e4fc8983 100644 --- a/apis/swift/src/main/java/org/jclouds/openstack/swift/blobstore/strategy/internal/ParallelMultipartUploadStrategy.java +++ b/apis/swift/src/main/java/org/jclouds/openstack/swift/blobstore/strategy/internal/ParallelMultipartUploadStrategy.java @@ -13,6 +13,7 @@ import org.jclouds.concurrent.Futures; import org.jclouds.io.Payload; import org.jclouds.io.PayloadSlicer; import org.jclouds.logging.Logger; +import org.jclouds.openstack.swift.CommonSwiftAsyncClient; import org.jclouds.openstack.swift.SwiftAsyncClient; import org.jclouds.openstack.swift.SwiftClient; import org.jclouds.openstack.swift.blobstore.SwiftAsyncBlobStore; @@ -91,7 +92,7 @@ public class ParallelMultipartUploadStrategy implements AsyncMultipartUploadStra latch.countDown(); return; } - final SwiftAsyncClient client = (SwiftAsyncClient) ablobstore.getContext() + final CommonSwiftAsyncClient client = (CommonSwiftAsyncClient) ablobstore.getContext() .getProviderSpecificContext().getAsyncApi(); Payload chunkedPart = slicer.slice(payload, offset, size); logger.debug(String.format("async uploading part %s of %s to container %s", part, key, container)); diff --git a/apis/swift/src/main/java/org/jclouds/openstack/swift/blobstore/strategy/internal/SequentialMultipartUploadStrategy.java b/apis/swift/src/main/java/org/jclouds/openstack/swift/blobstore/strategy/internal/SequentialMultipartUploadStrategy.java index e11bd41ce3..d77012661b 100644 --- a/apis/swift/src/main/java/org/jclouds/openstack/swift/blobstore/strategy/internal/SequentialMultipartUploadStrategy.java +++ b/apis/swift/src/main/java/org/jclouds/openstack/swift/blobstore/strategy/internal/SequentialMultipartUploadStrategy.java @@ -12,6 +12,7 @@ import org.jclouds.blobstore.reference.BlobStoreConstants; import org.jclouds.io.Payload; import org.jclouds.io.PayloadSlicer; import org.jclouds.logging.Logger; +import org.jclouds.openstack.swift.CommonSwiftClient; import org.jclouds.openstack.swift.SwiftClient; import org.jclouds.openstack.swift.blobstore.SwiftBlobStore; import org.jclouds.openstack.swift.blobstore.functions.BlobToObject; @@ -52,7 +53,7 @@ public class SequentialMultipartUploadStrategy implements MultipartUploadStrateg int parts = algorithm.getParts(); long chunkSize = algorithm.getChunkSize(); if (parts > 0) { - SwiftClient client = (SwiftClient) ablobstore.getContext() + CommonSwiftClient client = (CommonSwiftClient) ablobstore.getContext() .getProviderSpecificContext().getApi(); try { From 9d76fd1aa4c5103a7b95021bd9cabaf18c9dfcc2 Mon Sep 17 00:00:00 2001 From: Roman Bogorodskiy Date: Tue, 17 Apr 2012 18:22:34 +0400 Subject: [PATCH 6/8] Enable multipart for HPCloud. --- .../jclouds/cloudfiles/blobstore/CloudFilesBlobStore.java | 5 +++-- .../strategy/internal/ParallelMultipartUploadStrategy.java | 5 ++--- .../blobstore/HPCloudObjectStorageAsyncBlobStore.java | 6 ++++-- .../blobstore/HPCloudObjectStorageBlobStore.java | 6 ++++-- 4 files changed, 13 insertions(+), 9 deletions(-) diff --git a/apis/cloudfiles/src/main/java/org/jclouds/cloudfiles/blobstore/CloudFilesBlobStore.java b/apis/cloudfiles/src/main/java/org/jclouds/cloudfiles/blobstore/CloudFilesBlobStore.java index 40ba250abd..c0d8f2d099 100644 --- a/apis/cloudfiles/src/main/java/org/jclouds/cloudfiles/blobstore/CloudFilesBlobStore.java +++ b/apis/cloudfiles/src/main/java/org/jclouds/cloudfiles/blobstore/CloudFilesBlobStore.java @@ -60,10 +60,11 @@ public class CloudFilesBlobStore extends SwiftBlobStore { BlobStoreListContainerOptionsToListContainerOptions container2ContainerListOptions, ContainerToResourceList container2ResourceList, ObjectToBlob object2Blob, BlobToObject blob2Object, ObjectToBlobMetadata object2BlobMd, BlobToHttpGetOptions blob2ObjectGetOptions, - Provider fetchBlobMetadataProvider, EnableCDNAndCache enableCDNAndCache) { + Provider fetchBlobMetadataProvider, EnableCDNAndCache enableCDNAndCache, + Provider multipartUploadStrategy) { super(context, blobUtils, defaultLocation, locations, sync, container2ResourceMd, container2ContainerListOptions, container2ResourceList, object2Blob, blob2Object, object2BlobMd, blob2ObjectGetOptions, - fetchBlobMetadataProvider, null); + fetchBlobMetadataProvider, multipartUploadStrategy); this.enableCDNAndCache = enableCDNAndCache; } diff --git a/apis/swift/src/main/java/org/jclouds/openstack/swift/blobstore/strategy/internal/ParallelMultipartUploadStrategy.java b/apis/swift/src/main/java/org/jclouds/openstack/swift/blobstore/strategy/internal/ParallelMultipartUploadStrategy.java index 95e4fc8983..d4331caaa5 100644 --- a/apis/swift/src/main/java/org/jclouds/openstack/swift/blobstore/strategy/internal/ParallelMultipartUploadStrategy.java +++ b/apis/swift/src/main/java/org/jclouds/openstack/swift/blobstore/strategy/internal/ParallelMultipartUploadStrategy.java @@ -14,8 +14,7 @@ import org.jclouds.io.Payload; import org.jclouds.io.PayloadSlicer; import org.jclouds.logging.Logger; import org.jclouds.openstack.swift.CommonSwiftAsyncClient; -import org.jclouds.openstack.swift.SwiftAsyncClient; -import org.jclouds.openstack.swift.SwiftClient; +import org.jclouds.openstack.swift.CommonSwiftClient; import org.jclouds.openstack.swift.blobstore.SwiftAsyncBlobStore; import org.jclouds.openstack.swift.blobstore.functions.BlobToObject; import org.jclouds.util.Throwables2; @@ -147,7 +146,7 @@ public class ParallelMultipartUploadStrategy implements AsyncMultipartUploadStra long chunkSize = algorithm.getChunkSize(); long remaining = algorithm.getRemaining(); if (parts > 0) { - SwiftClient client = (SwiftClient) ablobstore + CommonSwiftClient client = (CommonSwiftClient) ablobstore .getContext().getProviderSpecificContext().getApi(); final Map> futureParts = new ConcurrentHashMap>(); diff --git a/providers/hpcloud-objectstorage/src/main/java/org/jclouds/hpcloud/objectstorage/blobstore/HPCloudObjectStorageAsyncBlobStore.java b/providers/hpcloud-objectstorage/src/main/java/org/jclouds/hpcloud/objectstorage/blobstore/HPCloudObjectStorageAsyncBlobStore.java index fdaaa3ec4d..847eb686ea 100644 --- a/providers/hpcloud-objectstorage/src/main/java/org/jclouds/hpcloud/objectstorage/blobstore/HPCloudObjectStorageAsyncBlobStore.java +++ b/providers/hpcloud-objectstorage/src/main/java/org/jclouds/hpcloud/objectstorage/blobstore/HPCloudObjectStorageAsyncBlobStore.java @@ -45,6 +45,7 @@ import org.jclouds.openstack.swift.blobstore.functions.ContainerToResourceList; import org.jclouds.openstack.swift.blobstore.functions.ContainerToResourceMetadata; import org.jclouds.openstack.swift.blobstore.functions.ObjectToBlob; import org.jclouds.openstack.swift.blobstore.functions.ObjectToBlobMetadata; +import org.jclouds.openstack.swift.blobstore.strategy.internal.AsyncMultipartUploadStrategy; import com.google.common.base.Function; import com.google.common.base.Supplier; @@ -66,10 +67,11 @@ public class HPCloudObjectStorageAsyncBlobStore extends SwiftAsyncBlobStore { BlobStoreListContainerOptionsToListContainerOptions container2ContainerListOptions, ContainerToResourceList container2ResourceList, ObjectToBlob object2Blob, BlobToObject blob2Object, ObjectToBlobMetadata object2BlobMd, BlobToHttpGetOptions blob2ObjectGetOptions, - Provider fetchBlobMetadataProvider, EnableCDNAndCache enableCDNAndCache) { + Provider fetchBlobMetadataProvider, EnableCDNAndCache enableCDNAndCache, + Provider multipartUploadStrategy) { super(context, blobUtils, service, defaultLocation, locations, sync, async, container2ResourceMd, container2ContainerListOptions, container2ResourceList, object2Blob, blob2Object, object2BlobMd, - blob2ObjectGetOptions, fetchBlobMetadataProvider, null); + blob2ObjectGetOptions, fetchBlobMetadataProvider, multipartUploadStrategy); this.enableCDNAndCache = enableCDNAndCache; } diff --git a/providers/hpcloud-objectstorage/src/main/java/org/jclouds/hpcloud/objectstorage/blobstore/HPCloudObjectStorageBlobStore.java b/providers/hpcloud-objectstorage/src/main/java/org/jclouds/hpcloud/objectstorage/blobstore/HPCloudObjectStorageBlobStore.java index cd11feb720..365cb294d0 100644 --- a/providers/hpcloud-objectstorage/src/main/java/org/jclouds/hpcloud/objectstorage/blobstore/HPCloudObjectStorageBlobStore.java +++ b/providers/hpcloud-objectstorage/src/main/java/org/jclouds/hpcloud/objectstorage/blobstore/HPCloudObjectStorageBlobStore.java @@ -40,6 +40,7 @@ import org.jclouds.openstack.swift.blobstore.functions.ContainerToResourceList; import org.jclouds.openstack.swift.blobstore.functions.ContainerToResourceMetadata; import org.jclouds.openstack.swift.blobstore.functions.ObjectToBlob; import org.jclouds.openstack.swift.blobstore.functions.ObjectToBlobMetadata; +import org.jclouds.openstack.swift.blobstore.strategy.internal.MultipartUploadStrategy; import com.google.common.base.Supplier; @@ -59,10 +60,11 @@ public class HPCloudObjectStorageBlobStore extends SwiftBlobStore { BlobStoreListContainerOptionsToListContainerOptions container2ContainerListOptions, ContainerToResourceList container2ResourceList, ObjectToBlob object2Blob, BlobToObject blob2Object, ObjectToBlobMetadata object2BlobMd, BlobToHttpGetOptions blob2ObjectGetOptions, - Provider fetchBlobMetadataProvider, EnableCDNAndCache enableCDNAndCache) { + Provider fetchBlobMetadataProvider, EnableCDNAndCache enableCDNAndCache, + Provider multipartUploadStrategy) { super(context, blobUtils, defaultLocation, locations, sync, container2ResourceMd, container2ContainerListOptions, container2ResourceList, object2Blob, blob2Object, object2BlobMd, blob2ObjectGetOptions, - fetchBlobMetadataProvider, null); + fetchBlobMetadataProvider, multipartUploadStrategy); this.enableCDNAndCache = enableCDNAndCache; } From 1534e4ff37fca523af3cbf1b06b3153000857c8a Mon Sep 17 00:00:00 2001 From: Roman Bogorodskiy Date: Wed, 18 Apr 2012 12:41:09 +0400 Subject: [PATCH 7/8] Remove deub prints and fall back to traditional upload if file is not large enough. --- .../internal/SequentialMultipartUploadStrategy.java | 9 ++------- 1 file changed, 2 insertions(+), 7 deletions(-) diff --git a/apis/swift/src/main/java/org/jclouds/openstack/swift/blobstore/strategy/internal/SequentialMultipartUploadStrategy.java b/apis/swift/src/main/java/org/jclouds/openstack/swift/blobstore/strategy/internal/SequentialMultipartUploadStrategy.java index d77012661b..c723a51742 100644 --- a/apis/swift/src/main/java/org/jclouds/openstack/swift/blobstore/strategy/internal/SequentialMultipartUploadStrategy.java +++ b/apis/swift/src/main/java/org/jclouds/openstack/swift/blobstore/strategy/internal/SequentialMultipartUploadStrategy.java @@ -42,7 +42,6 @@ public class SequentialMultipartUploadStrategy implements MultipartUploadStrateg @Override public String execute(String container, Blob blob, PutOptions options, BlobToObject blob2Object) { - System.out.println("here we go"); String key = blob.getMetadata().getName(); Payload payload = blob.getPayload(); MultipartUploadSlicingAlgorithm algorithm = new MultipartUploadSlicingAlgorithm(); @@ -57,10 +56,8 @@ public class SequentialMultipartUploadStrategy implements MultipartUploadStrateg .getProviderSpecificContext().getApi(); try { - SortedMap etags = Maps.newTreeMap(); int part; while ((part = algorithm.getNextPart()) <= parts) { - System.out.println("Uploading part " + part); Payload chunkedPart = slicer.slice(payload, algorithm.getNextChunkOffset(), chunkSize); Blob blobPart = ablobstore.blobBuilder(blob.getMetadata().getName() + PART_SEPARATOR + @@ -70,7 +67,6 @@ public class SequentialMultipartUploadStrategy implements MultipartUploadStrateg } long remaining = algorithm.getRemaining(); if (remaining > 0) { - System.out.println("Uploading tail."); Payload chunkedPart = slicer.slice(payload, algorithm.getNextChunkOffset(), remaining); Blob blobPart = ablobstore.blobBuilder(blob.getMetadata().getName() + PART_SEPARATOR + @@ -84,11 +80,10 @@ public class SequentialMultipartUploadStrategy implements MultipartUploadStrateg if (rtex == null) { rtex = new RuntimeException(ex); } - //client.abortMultipartUpload(container, key, uploadId); throw rtex; } - + } else { + return ablobstore.putBlob(container, blob, PutOptions.NONE); } - return "NOT IMPLEMENTED"; } } From 7d39b056ba1364d644d2d2d24bbd2ddd86b47ed2 Mon Sep 17 00:00:00 2001 From: Roman Bogorodskiy Date: Wed, 18 Apr 2012 14:28:47 +0400 Subject: [PATCH 8/8] Remove useless comment. --- .../openstack/swift/blobstore/strategy/MultipartUpload.java | 4 ---- 1 file changed, 4 deletions(-) diff --git a/apis/swift/src/main/java/org/jclouds/openstack/swift/blobstore/strategy/MultipartUpload.java b/apis/swift/src/main/java/org/jclouds/openstack/swift/blobstore/strategy/MultipartUpload.java index 839f4774e2..9c2ead8253 100644 --- a/apis/swift/src/main/java/org/jclouds/openstack/swift/blobstore/strategy/MultipartUpload.java +++ b/apis/swift/src/main/java/org/jclouds/openstack/swift/blobstore/strategy/MultipartUpload.java @@ -1,9 +1,5 @@ package org.jclouds.openstack.swift.blobstore.strategy; -/* -@author Roman Bogorodskiy - */ - public interface MultipartUpload { /* Maximum number of parts per upload */