mirror of https://github.com/apache/jclouds.git
Changes the upload behavior to parallel, a TODO
This commit is contained in:
parent
984b6ae8fb
commit
6bff97b6d3
|
@ -20,15 +20,19 @@ import static com.google.common.base.Preconditions.checkArgument;
|
|||
import static com.google.common.base.Preconditions.checkNotNull;
|
||||
import static com.google.common.collect.Iterables.tryFind;
|
||||
import static com.google.common.collect.Lists.transform;
|
||||
import static org.jclouds.Constants.PROPERTY_USER_THREADS;
|
||||
import static org.jclouds.blobstore.options.ListContainerOptions.Builder.recursive;
|
||||
import static org.jclouds.location.predicates.LocationPredicates.idEquals;
|
||||
import static org.jclouds.openstack.swift.v1.options.PutOptions.Builder.metadata;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.Callable;
|
||||
|
||||
import javax.inject.Inject;
|
||||
import javax.inject.Named;
|
||||
|
||||
import org.jclouds.blobstore.BlobStore;
|
||||
import org.jclouds.blobstore.BlobStoreContext;
|
||||
|
@ -76,6 +80,7 @@ import org.jclouds.openstack.swift.v1.options.UpdateContainerOptions;
|
|||
import org.jclouds.openstack.swift.v1.reference.SwiftHeaders;
|
||||
|
||||
import com.google.common.annotations.Beta;
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import com.google.common.base.Function;
|
||||
import com.google.common.base.Optional;
|
||||
import com.google.common.base.Supplier;
|
||||
|
@ -89,10 +94,12 @@ import com.google.common.collect.ImmutableMap.Builder;
|
|||
import com.google.common.collect.ImmutableMultimap;
|
||||
import com.google.common.collect.ImmutableSet;
|
||||
import com.google.common.collect.Iterables;
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.common.collect.Maps;
|
||||
import com.google.common.io.ByteSource;
|
||||
import com.google.common.net.HttpHeaders;
|
||||
import com.google.common.util.concurrent.Futures;
|
||||
import com.google.common.util.concurrent.ListenableFuture;
|
||||
import com.google.common.util.concurrent.ListeningExecutorService;
|
||||
import com.google.inject.AbstractModule;
|
||||
import com.google.inject.Injector;
|
||||
import com.google.inject.assistedinject.Assisted;
|
||||
|
@ -579,10 +586,33 @@ public class RegionScopedSwiftBlobStore implements BlobStore {
|
|||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
||||
// copied from BaseBlobStore
|
||||
@com.google.inject.Inject
|
||||
@Named(PROPERTY_USER_THREADS)
|
||||
@VisibleForTesting
|
||||
ListeningExecutorService userExecutor;
|
||||
|
||||
/**
|
||||
* Upload using a user-provided executor, or the jclouds userExecutor
|
||||
*
|
||||
* @param container
|
||||
* @param blob
|
||||
* @param overrides
|
||||
* @return the multipart blob etag
|
||||
*/
|
||||
@Beta
|
||||
protected String putMultipartBlob(String container, Blob blob, PutOptions overrides) {
|
||||
List<MultipartPart> parts = Lists.newArrayList();
|
||||
if (overrides.getUseCustomExecutor()) {
|
||||
return putMultipartBlob(container, blob, overrides, overrides.getCustomExecutor());
|
||||
} else {
|
||||
return putMultipartBlob(container, blob, overrides, userExecutor);
|
||||
}
|
||||
}
|
||||
|
||||
// copied from BaseBlobStore
|
||||
@Beta
|
||||
protected String putMultipartBlob(String container, Blob blob, PutOptions overrides, ListeningExecutorService executor) {
|
||||
ArrayList<ListenableFuture<MultipartPart>> parts = new ArrayList<ListenableFuture<MultipartPart>>();
|
||||
|
||||
long contentLength = checkNotNull(blob.getMetadata().getContentMetadata().getContentLength(),
|
||||
"must provide content-length to use multi-part upload");
|
||||
MultipartUploadSlicingAlgorithm algorithm = new MultipartUploadSlicingAlgorithm(
|
||||
|
@ -590,11 +620,30 @@ public class RegionScopedSwiftBlobStore implements BlobStore {
|
|||
long partSize = algorithm.calculateChunkSize(contentLength);
|
||||
MultipartUpload mpu = initiateMultipartUpload(container, blob.getMetadata(), partSize, overrides);
|
||||
int partNumber = 1;
|
||||
|
||||
for (Payload payload : slicer.slice(blob.getPayload(), partSize)) {
|
||||
MultipartPart part = uploadMultipartPart(mpu, partNumber, payload);
|
||||
parts.add(part);
|
||||
++partNumber;
|
||||
BlobUploader b =
|
||||
new BlobUploader(mpu, partNumber++, payload);
|
||||
parts.add(executor.submit(b));
|
||||
}
|
||||
|
||||
return completeMultipartUpload(mpu, Futures.getUnchecked(Futures.allAsList(parts)));
|
||||
}
|
||||
|
||||
private final class BlobUploader implements Callable<MultipartPart> {
|
||||
private final MultipartUpload mpu;
|
||||
private final int partNumber;
|
||||
private final Payload payload;
|
||||
|
||||
BlobUploader(MultipartUpload mpu, int partNumber, Payload payload) {
|
||||
this.mpu = mpu;
|
||||
this.partNumber = partNumber;
|
||||
this.payload = payload;
|
||||
}
|
||||
|
||||
@Override
|
||||
public MultipartPart call() {
|
||||
return uploadMultipartPart(mpu, partNumber, payload);
|
||||
}
|
||||
return completeMultipartUpload(mpu, parts);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -20,6 +20,10 @@ import static com.google.common.base.Preconditions.checkNotNull;
|
|||
|
||||
import org.jclouds.blobstore.domain.BlobAccess;
|
||||
|
||||
import com.google.common.base.Preconditions;
|
||||
import com.google.common.util.concurrent.ListeningExecutorService;
|
||||
import com.google.common.util.concurrent.MoreExecutors;
|
||||
|
||||
/**
|
||||
* Contains options supported in the put blob operation. <h2>
|
||||
* Usage</h2> The recommended way to instantiate a PutOptions object is to statically import
|
||||
|
@ -36,6 +40,10 @@ public class PutOptions implements Cloneable {
|
|||
|
||||
private BlobAccess blobAccess = BlobAccess.PRIVATE;
|
||||
private boolean multipart = false;
|
||||
private boolean useCustomExecutor = false;
|
||||
|
||||
// TODO: This exposes ListeningExecutorService to the user, instead of a regular ExecutorService
|
||||
private ListeningExecutorService customExecutor = MoreExecutors.sameThreadExecutor();
|
||||
|
||||
public PutOptions() {
|
||||
}
|
||||
|
@ -44,6 +52,25 @@ public class PutOptions implements Cloneable {
|
|||
this.multipart = multipart;
|
||||
}
|
||||
|
||||
/**
|
||||
* Used for clone
|
||||
* @param multipart
|
||||
* @param customExecutor
|
||||
*/
|
||||
protected PutOptions(boolean multipart, boolean useCustomExecutor, ListeningExecutorService customExecutor) {
|
||||
Preconditions.checkNotNull(customExecutor);
|
||||
this.multipart = multipart;
|
||||
this.useCustomExecutor = useCustomExecutor;
|
||||
this.customExecutor = customExecutor;
|
||||
}
|
||||
|
||||
public PutOptions(ListeningExecutorService customExecutor) {
|
||||
Preconditions.checkNotNull(customExecutor);
|
||||
this.multipart = true;
|
||||
this.useCustomExecutor = true;
|
||||
this.customExecutor = customExecutor;
|
||||
}
|
||||
|
||||
public static class ImmutablePutOptions extends PutOptions {
|
||||
private final PutOptions delegate;
|
||||
|
||||
|
@ -51,6 +78,16 @@ public class PutOptions implements Cloneable {
|
|||
this.delegate = delegate;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ListeningExecutorService getCustomExecutor() {
|
||||
return delegate.getCustomExecutor();
|
||||
}
|
||||
|
||||
@Override
|
||||
public PutOptions setCustomExecutor(ListeningExecutorService customExecutor) {
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
||||
@Override
|
||||
public BlobAccess getBlobAccess() {
|
||||
return delegate.getBlobAccess();
|
||||
|
@ -87,6 +124,22 @@ public class PutOptions implements Cloneable {
|
|||
return blobAccess;
|
||||
}
|
||||
|
||||
public boolean getUseCustomExecutor() {
|
||||
return useCustomExecutor;
|
||||
}
|
||||
|
||||
public ListeningExecutorService getCustomExecutor() {
|
||||
return customExecutor;
|
||||
}
|
||||
|
||||
public PutOptions setCustomExecutor(ListeningExecutorService customExecutor) {
|
||||
Preconditions.checkNotNull(customExecutor);
|
||||
this.multipart = true;
|
||||
this.useCustomExecutor = true;
|
||||
this.customExecutor = customExecutor;
|
||||
return this;
|
||||
}
|
||||
|
||||
public PutOptions setBlobAccess(BlobAccess blobAccess) {
|
||||
this.blobAccess = checkNotNull(blobAccess);
|
||||
return this;
|
||||
|
@ -98,7 +151,7 @@ public class PutOptions implements Cloneable {
|
|||
|
||||
/**
|
||||
* split large blobs into pieces, if supported by the provider.
|
||||
*
|
||||
*
|
||||
* Equivalent to <code>multipart(true)</code>
|
||||
*/
|
||||
public PutOptions multipart() {
|
||||
|
@ -113,12 +166,25 @@ public class PutOptions implements Cloneable {
|
|||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* Whether to split large blobs into pieces, if supported by the provider, using a custom executor
|
||||
*
|
||||
* @param customExecutor User-provided ListeningExecutorService
|
||||
*/
|
||||
public PutOptions multipart(ListeningExecutorService customExecutor) {
|
||||
Preconditions.checkNotNull(customExecutor);
|
||||
this.multipart = true;
|
||||
this.useCustomExecutor = true;
|
||||
this.customExecutor = customExecutor;
|
||||
return this;
|
||||
}
|
||||
|
||||
public static class Builder {
|
||||
|
||||
public static PutOptions fromPutOptions(PutOptions putOptions) {
|
||||
return multipart(putOptions.multipart);
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* @see PutOptions#multipart()
|
||||
*/
|
||||
|
@ -130,15 +196,23 @@ public class PutOptions implements Cloneable {
|
|||
PutOptions options = new PutOptions();
|
||||
return options.multipart(val);
|
||||
}
|
||||
|
||||
public static PutOptions multipart(ListeningExecutorService customExecutor) {
|
||||
PutOptions options = new PutOptions();
|
||||
return options.multipart(customExecutor);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public PutOptions clone() {
|
||||
return new PutOptions(multipart);
|
||||
return new PutOptions(multipart, useCustomExecutor, customExecutor);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "[multipart=" + multipart + ", blobAccess=" + blobAccess + "]";
|
||||
return "[multipart=" + multipart +
|
||||
", blobAccess=" + blobAccess +
|
||||
", useCustomExecutor=" + useCustomExecutor +
|
||||
", customExecutor=" + customExecutor + "]";
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue