From 42079e1392fb5b2b792f518812689854c375445f Mon Sep 17 00:00:00 2001 From: Zack Shoylev Date: Thu, 7 Jul 2016 16:39:04 -0500 Subject: [PATCH] Parallel upload for BaseBlobStore --- .../blobstore/RegionScopedSwiftBlobStore.java | 1 - .../blobstore/internal/BaseBlobStore.java | 80 ++++++++++++++----- 2 files changed, 62 insertions(+), 19 deletions(-) diff --git a/apis/openstack-swift/src/main/java/org/jclouds/openstack/swift/v1/blobstore/RegionScopedSwiftBlobStore.java b/apis/openstack-swift/src/main/java/org/jclouds/openstack/swift/v1/blobstore/RegionScopedSwiftBlobStore.java index 63f4315b92..0b29f1f288 100644 --- a/apis/openstack-swift/src/main/java/org/jclouds/openstack/swift/v1/blobstore/RegionScopedSwiftBlobStore.java +++ b/apis/openstack-swift/src/main/java/org/jclouds/openstack/swift/v1/blobstore/RegionScopedSwiftBlobStore.java @@ -608,7 +608,6 @@ public class RegionScopedSwiftBlobStore implements BlobStore { } } - // copied from BaseBlobStore @Beta protected String putMultipartBlob(String container, Blob blob, PutOptions overrides, ListeningExecutorService executor) { ArrayList> parts = new ArrayList>(); diff --git a/blobstore/src/main/java/org/jclouds/blobstore/internal/BaseBlobStore.java b/blobstore/src/main/java/org/jclouds/blobstore/internal/BaseBlobStore.java index 818a154174..21ae2ff943 100644 --- a/blobstore/src/main/java/org/jclouds/blobstore/internal/BaseBlobStore.java +++ b/blobstore/src/main/java/org/jclouds/blobstore/internal/BaseBlobStore.java @@ -18,17 +18,20 @@ package org.jclouds.blobstore.internal; import static com.google.common.base.Preconditions.checkNotNull; import static com.google.common.base.Preconditions.checkState; +import static org.jclouds.Constants.PROPERTY_USER_THREADS; import static org.jclouds.blobstore.options.ListContainerOptions.Builder.recursive; import static org.jclouds.util.Predicates2.retry; import java.io.InputStream; import java.io.IOException; +import java.util.ArrayList; import java.util.Date; -import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.Callable; import javax.inject.Inject; +import javax.inject.Named; import org.jclouds.blobstore.BlobStore; import org.jclouds.blobstore.BlobStoreContext; @@ -57,10 +60,13 @@ import org.jclouds.io.PayloadSlicer; import org.jclouds.util.Closeables2; import com.google.common.annotations.Beta; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Predicate; import com.google.common.base.Supplier; import com.google.common.base.Throwables; -import com.google.common.collect.Lists; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.ListeningExecutorService; public abstract class BaseBlobStore implements BlobStore { @@ -96,7 +102,7 @@ public abstract class BaseBlobStore implements BlobStore { /** * This implementation invokes * {@link #list(String,org.jclouds.blobstore.options.ListContainerOptions)} - * + * * @param container * container name */ @@ -107,7 +113,7 @@ public abstract class BaseBlobStore implements BlobStore { /** * This implementation invokes {@link BlobUtilsImpl#directoryExists} - * + * * @param container * container name * @param directory @@ -120,7 +126,7 @@ public abstract class BaseBlobStore implements BlobStore { /** * This implementation invokes {@link BlobUtilsImpl#createDirectory} - * + * * @param container * container name * @param directory @@ -141,7 +147,7 @@ public abstract class BaseBlobStore implements BlobStore { /** * This implementation invokes {@link #countBlobs} with the * {@link ListContainerOptions#recursive} option. - * + * * @param container * container name */ @@ -152,7 +158,7 @@ public abstract class BaseBlobStore implements BlobStore { /** * This implementation invokes {@link BlobUtilsImpl#countBlobs} - * + * * @param container * container name */ @@ -164,7 +170,7 @@ public abstract class BaseBlobStore implements BlobStore { /** * This implementation invokes {@link #clearContainer} with the * {@link ListContainerOptions#recursive} option. - * + * * @param container * container name */ @@ -175,7 +181,7 @@ public abstract class BaseBlobStore implements BlobStore { /** * This implementation invokes {@link BlobUtilsImpl#clearContainer} - * + * * @param container * container name */ @@ -186,7 +192,7 @@ public abstract class BaseBlobStore implements BlobStore { /** * This implementation invokes {@link BlobUtilsImpl#deleteDirectory}. - * + * * @param container * container name */ @@ -198,7 +204,7 @@ public abstract class BaseBlobStore implements BlobStore { /** * This implementation invokes * {@link #getBlob(String,String,org.jclouds.blobstore.options.GetOptions)} - * + * * @param container * container name * @param key @@ -211,7 +217,7 @@ public abstract class BaseBlobStore implements BlobStore { /** * This implementation invokes {@link #deleteAndEnsurePathGone} - * + * * @param container * bucket name */ @@ -320,29 +326,67 @@ public abstract class BaseBlobStore implements BlobStore { } } - // TODO: parallel uploads + @com.google.inject.Inject + @Named(PROPERTY_USER_THREADS) + @VisibleForTesting + ListeningExecutorService userExecutor; + + /** + * Upload using a user-provided executor, or the jclouds userExecutor + * + * @param container + * @param blob + * @param overrides + * @return the multipart blob etag + */ @Beta protected String putMultipartBlob(String container, Blob blob, PutOptions overrides) { + if (overrides.getUseCustomExecutor()) { + return putMultipartBlob(container, blob, overrides, overrides.getCustomExecutor()); + } else { + return putMultipartBlob(container, blob, overrides, userExecutor); + } + } + + @Beta + protected String putMultipartBlob(String container, Blob blob, PutOptions overrides, ListeningExecutorService executor) { + ArrayList> parts = new ArrayList>(); MultipartUpload mpu = initiateMultipartUpload(container, blob.getMetadata(), overrides); try { - List parts = Lists.newArrayList(); long contentLength = blob.getMetadata().getContentMetadata().getContentLength(); MultipartUploadSlicingAlgorithm algorithm = new MultipartUploadSlicingAlgorithm( getMinimumMultipartPartSize(), getMaximumMultipartPartSize(), getMaximumNumberOfParts()); long partSize = algorithm.calculateChunkSize(contentLength); int partNumber = 1; for (Payload payload : slicer.slice(blob.getPayload(), partSize)) { - MultipartPart part = uploadMultipartPart(mpu, partNumber, payload); - parts.add(part); - ++partNumber; + BlobUploader b = + new BlobUploader(mpu, partNumber++, payload); + parts.add(executor.submit(b)); } - return completeMultipartUpload(mpu, parts); + return completeMultipartUpload(mpu, Futures.getUnchecked(Futures.allAsList(parts))); } catch (RuntimeException re) { abortMultipartUpload(mpu); throw re; } } + private final class BlobUploader implements Callable { + private final MultipartUpload mpu; + private final int partNumber; + private final Payload payload; + + BlobUploader(MultipartUpload mpu, int partNumber, Payload payload) { + this.mpu = mpu; + this.partNumber = partNumber; + this.payload = payload; + } + + @Override + public MultipartPart call() { + return uploadMultipartPart(mpu, partNumber, payload); + } + } + private static HttpResponseException returnResponseException(int code) { HttpResponse response = HttpResponse.builder().statusCode(code).build(); // TODO: bogus endpoint