Parallel upload for BaseBlobStore

This commit is contained in:
Zack Shoylev 2016-07-07 16:39:04 -05:00
parent a515ce2f22
commit 42079e1392
2 changed files with 62 additions and 19 deletions

View File

@ -608,7 +608,6 @@ public class RegionScopedSwiftBlobStore implements BlobStore {
}
}
// copied from BaseBlobStore
@Beta
protected String putMultipartBlob(String container, Blob blob, PutOptions overrides, ListeningExecutorService executor) {
ArrayList<ListenableFuture<MultipartPart>> parts = new ArrayList<ListenableFuture<MultipartPart>>();

View File

@ -18,17 +18,20 @@ package org.jclouds.blobstore.internal;
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;
import static org.jclouds.Constants.PROPERTY_USER_THREADS;
import static org.jclouds.blobstore.options.ListContainerOptions.Builder.recursive;
import static org.jclouds.util.Predicates2.retry;
import java.io.InputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
import javax.inject.Inject;
import javax.inject.Named;
import org.jclouds.blobstore.BlobStore;
import org.jclouds.blobstore.BlobStoreContext;
@ -57,10 +60,13 @@ import org.jclouds.io.PayloadSlicer;
import org.jclouds.util.Closeables2;
import com.google.common.annotations.Beta;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Predicate;
import com.google.common.base.Supplier;
import com.google.common.base.Throwables;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
public abstract class BaseBlobStore implements BlobStore {
@ -96,7 +102,7 @@ public abstract class BaseBlobStore implements BlobStore {
/**
* This implementation invokes
* {@link #list(String,org.jclouds.blobstore.options.ListContainerOptions)}
*
*
* @param container
* container name
*/
@ -107,7 +113,7 @@ public abstract class BaseBlobStore implements BlobStore {
/**
* This implementation invokes {@link BlobUtilsImpl#directoryExists}
*
*
* @param container
* container name
* @param directory
@ -120,7 +126,7 @@ public abstract class BaseBlobStore implements BlobStore {
/**
* This implementation invokes {@link BlobUtilsImpl#createDirectory}
*
*
* @param container
* container name
* @param directory
@ -141,7 +147,7 @@ public abstract class BaseBlobStore implements BlobStore {
/**
* This implementation invokes {@link #countBlobs} with the
* {@link ListContainerOptions#recursive} option.
*
*
* @param container
* container name
*/
@ -152,7 +158,7 @@ public abstract class BaseBlobStore implements BlobStore {
/**
* This implementation invokes {@link BlobUtilsImpl#countBlobs}
*
*
* @param container
* container name
*/
@ -164,7 +170,7 @@ public abstract class BaseBlobStore implements BlobStore {
/**
* This implementation invokes {@link #clearContainer} with the
* {@link ListContainerOptions#recursive} option.
*
*
* @param container
* container name
*/
@ -175,7 +181,7 @@ public abstract class BaseBlobStore implements BlobStore {
/**
* This implementation invokes {@link BlobUtilsImpl#clearContainer}
*
*
* @param container
* container name
*/
@ -186,7 +192,7 @@ public abstract class BaseBlobStore implements BlobStore {
/**
* This implementation invokes {@link BlobUtilsImpl#deleteDirectory}.
*
*
* @param container
* container name
*/
@ -198,7 +204,7 @@ public abstract class BaseBlobStore implements BlobStore {
/**
* This implementation invokes
* {@link #getBlob(String,String,org.jclouds.blobstore.options.GetOptions)}
*
*
* @param container
* container name
* @param key
@ -211,7 +217,7 @@ public abstract class BaseBlobStore implements BlobStore {
/**
* This implementation invokes {@link #deleteAndEnsurePathGone}
*
*
* @param container
* bucket name
*/
@ -320,29 +326,67 @@ public abstract class BaseBlobStore implements BlobStore {
}
}
// TODO: parallel uploads
@com.google.inject.Inject
@Named(PROPERTY_USER_THREADS)
@VisibleForTesting
ListeningExecutorService userExecutor;
/**
* Upload using a user-provided executor, or the jclouds userExecutor
*
* @param container
* @param blob
* @param overrides
* @return the multipart blob etag
*/
@Beta
protected String putMultipartBlob(String container, Blob blob, PutOptions overrides) {
if (overrides.getUseCustomExecutor()) {
return putMultipartBlob(container, blob, overrides, overrides.getCustomExecutor());
} else {
return putMultipartBlob(container, blob, overrides, userExecutor);
}
}
@Beta
protected String putMultipartBlob(String container, Blob blob, PutOptions overrides, ListeningExecutorService executor) {
ArrayList<ListenableFuture<MultipartPart>> parts = new ArrayList<ListenableFuture<MultipartPart>>();
MultipartUpload mpu = initiateMultipartUpload(container, blob.getMetadata(), overrides);
try {
List<MultipartPart> parts = Lists.newArrayList();
long contentLength = blob.getMetadata().getContentMetadata().getContentLength();
MultipartUploadSlicingAlgorithm algorithm = new MultipartUploadSlicingAlgorithm(
getMinimumMultipartPartSize(), getMaximumMultipartPartSize(), getMaximumNumberOfParts());
long partSize = algorithm.calculateChunkSize(contentLength);
int partNumber = 1;
for (Payload payload : slicer.slice(blob.getPayload(), partSize)) {
MultipartPart part = uploadMultipartPart(mpu, partNumber, payload);
parts.add(part);
++partNumber;
BlobUploader b =
new BlobUploader(mpu, partNumber++, payload);
parts.add(executor.submit(b));
}
return completeMultipartUpload(mpu, parts);
return completeMultipartUpload(mpu, Futures.getUnchecked(Futures.allAsList(parts)));
} catch (RuntimeException re) {
abortMultipartUpload(mpu);
throw re;
}
}
private final class BlobUploader implements Callable<MultipartPart> {
private final MultipartUpload mpu;
private final int partNumber;
private final Payload payload;
BlobUploader(MultipartUpload mpu, int partNumber, Payload payload) {
this.mpu = mpu;
this.partNumber = partNumber;
this.payload = payload;
}
@Override
public MultipartPart call() {
return uploadMultipartPart(mpu, partNumber, payload);
}
}
private static HttpResponseException returnResponseException(int code) {
HttpResponse response = HttpResponse.builder().statusCode(code).build();
// TODO: bogus endpoint