mirror of https://github.com/apache/jclouds.git
Initial implementation of MPU for SwiftAsyncBlobStore.
This commit is contained in:
parent
807d078c6f
commit
c6b7d510b2
|
@ -69,7 +69,7 @@ public class CloudFilesAsyncBlobStore extends SwiftAsyncBlobStore {
|
|||
Provider<FetchBlobMetadata> fetchBlobMetadataProvider, EnableCDNAndCache enableCDNAndCache) {
|
||||
super(context, blobUtils, service, defaultLocation, locations, sync, async, container2ResourceMd,
|
||||
container2ContainerListOptions, container2ResourceList, object2Blob, blob2Object, object2BlobMd,
|
||||
blob2ObjectGetOptions, fetchBlobMetadataProvider);
|
||||
blob2ObjectGetOptions, fetchBlobMetadataProvider, null);
|
||||
this.enableCDNAndCache = enableCDNAndCache;
|
||||
}
|
||||
|
||||
|
|
|
@ -55,6 +55,7 @@ import org.jclouds.openstack.swift.blobstore.functions.ContainerToResourceList;
|
|||
import org.jclouds.openstack.swift.blobstore.functions.ContainerToResourceMetadata;
|
||||
import org.jclouds.openstack.swift.blobstore.functions.ObjectToBlob;
|
||||
import org.jclouds.openstack.swift.blobstore.functions.ObjectToBlobMetadata;
|
||||
import org.jclouds.openstack.swift.blobstore.strategy.internal.AsyncMultipartUploadStrategy;
|
||||
import org.jclouds.openstack.swift.blobstore.strategy.internal.MultipartUploadStrategy;
|
||||
import org.jclouds.openstack.swift.domain.ContainerMetadata;
|
||||
import org.jclouds.openstack.swift.domain.MutableObjectInfoWithMetadata;
|
||||
|
@ -82,7 +83,7 @@ public class SwiftAsyncBlobStore extends BaseAsyncBlobStore {
|
|||
private final ObjectToBlobMetadata object2BlobMd;
|
||||
private final BlobToHttpGetOptions blob2ObjectGetOptions;
|
||||
private final Provider<FetchBlobMetadata> fetchBlobMetadataProvider;
|
||||
//private final Provider<MultipartUploadStrategy> multipartUploadStrategy;
|
||||
private final Provider<AsyncMultipartUploadStrategy> multipartUploadStrategy;
|
||||
|
||||
@Inject
|
||||
protected SwiftAsyncBlobStore(BlobStoreContext context, BlobUtils blobUtils,
|
||||
|
@ -92,7 +93,8 @@ public class SwiftAsyncBlobStore extends BaseAsyncBlobStore {
|
|||
BlobStoreListContainerOptionsToListContainerOptions container2ContainerListOptions,
|
||||
ContainerToResourceList container2ResourceList, ObjectToBlob object2Blob, BlobToObject blob2Object,
|
||||
ObjectToBlobMetadata object2BlobMd, BlobToHttpGetOptions blob2ObjectGetOptions,
|
||||
Provider<FetchBlobMetadata> fetchBlobMetadataProvider) {
|
||||
Provider<FetchBlobMetadata> fetchBlobMetadataProvider,
|
||||
Provider<AsyncMultipartUploadStrategy> multipartUploadStrategy) {
|
||||
super(context, blobUtils, service, defaultLocation, locations);
|
||||
this.sync = sync;
|
||||
this.async = async;
|
||||
|
@ -104,7 +106,7 @@ public class SwiftAsyncBlobStore extends BaseAsyncBlobStore {
|
|||
this.object2BlobMd = object2BlobMd;
|
||||
this.blob2ObjectGetOptions = blob2ObjectGetOptions;
|
||||
this.fetchBlobMetadataProvider = checkNotNull(fetchBlobMetadataProvider, "fetchBlobMetadataProvider");
|
||||
//this.multipartUploadStrategy = multipartUploadStrategy;
|
||||
this.multipartUploadStrategy = multipartUploadStrategy;
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -241,12 +243,11 @@ public class SwiftAsyncBlobStore extends BaseAsyncBlobStore {
|
|||
|
||||
@Override
|
||||
public ListenableFuture<String> putBlob(String container, Blob blob, PutOptions options) {
|
||||
// TODO implement options
|
||||
//if (options.isMultipart()) {
|
||||
// return null; //Lis multipartUploadStrategy.get().execute(container, blob, options);
|
||||
//} else {
|
||||
if (options.isMultipart()) {
|
||||
return multipartUploadStrategy.get().execute(container, blob, options, blob2Object);
|
||||
} else {
|
||||
return putBlob(container, blob);
|
||||
//}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -210,7 +210,6 @@ public class SwiftBlobStore extends BaseBlobStore {
|
|||
*/
|
||||
@Override
|
||||
public String putBlob(String container, Blob blob, PutOptions options) {
|
||||
// TODO implement options
|
||||
if (options.isMultipart()) {
|
||||
return multipartUploadStrategy.get().execute(container, blob, options, blob2Object);
|
||||
} else {
|
||||
|
|
|
@ -0,0 +1,12 @@
|
|||
package org.jclouds.openstack.swift.blobstore.strategy.internal;
|
||||
|
||||
import com.google.common.util.concurrent.ListenableFuture;
|
||||
import com.google.inject.ImplementedBy;
|
||||
import org.jclouds.blobstore.domain.Blob;
|
||||
import org.jclouds.blobstore.options.PutOptions;
|
||||
import org.jclouds.openstack.swift.blobstore.functions.BlobToObject;
|
||||
|
||||
@ImplementedBy(ParallelMultipartUploadStrategy.class)
|
||||
public interface AsyncMultipartUploadStrategy {
|
||||
ListenableFuture<String> execute(String container, Blob blob, PutOptions options, BlobToObject blob2Object);
|
||||
}
|
|
@ -0,0 +1,268 @@
|
|||
package org.jclouds.openstack.swift.blobstore.strategy.internal;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import com.google.common.collect.Maps;
|
||||
import com.google.common.util.concurrent.ListenableFuture;
|
||||
import com.google.inject.Inject;
|
||||
import org.jclouds.Constants;
|
||||
import org.jclouds.blobstore.domain.Blob;
|
||||
import org.jclouds.blobstore.internal.BlobRuntimeException;
|
||||
import org.jclouds.blobstore.options.PutOptions;
|
||||
import org.jclouds.blobstore.reference.BlobStoreConstants;
|
||||
import org.jclouds.concurrent.Futures;
|
||||
import org.jclouds.io.Payload;
|
||||
import org.jclouds.io.PayloadSlicer;
|
||||
import org.jclouds.logging.Logger;
|
||||
import org.jclouds.openstack.swift.SwiftAsyncClient;
|
||||
import org.jclouds.openstack.swift.SwiftClient;
|
||||
import org.jclouds.openstack.swift.blobstore.SwiftAsyncBlobStore;
|
||||
import org.jclouds.openstack.swift.blobstore.functions.BlobToObject;
|
||||
import org.jclouds.util.Throwables2;
|
||||
|
||||
import javax.annotation.Resource;
|
||||
import javax.inject.Named;
|
||||
import java.util.Map;
|
||||
import java.util.Queue;
|
||||
import java.util.SortedMap;
|
||||
import java.util.concurrent.*;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
import static com.google.common.base.Preconditions.checkNotNull;
|
||||
|
||||
public class ParallelMultipartUploadStrategy implements AsyncMultipartUploadStrategy {
|
||||
@Resource
|
||||
@Named(BlobStoreConstants.BLOBSTORE_LOGGER)
|
||||
protected Logger logger = Logger.NULL;
|
||||
|
||||
public static final String PART_SEPARATOR = "/";
|
||||
@VisibleForTesting
|
||||
static final int DEFAULT_PARALLEL_DEGREE = 4;
|
||||
@VisibleForTesting
|
||||
static final int DEFAULT_MIN_RETRIES = 5;
|
||||
@VisibleForTesting
|
||||
static final int DEFAULT_MAX_PERCENT_RETRIES = 10;
|
||||
|
||||
@Inject(optional = true)
|
||||
@Named("jclouds.mpu.parallel.degree")
|
||||
@VisibleForTesting
|
||||
int parallelDegree = DEFAULT_PARALLEL_DEGREE;
|
||||
|
||||
@Inject(optional = true)
|
||||
@Named("jclouds.mpu.parallel.retries.min")
|
||||
@VisibleForTesting
|
||||
int minRetries = DEFAULT_MIN_RETRIES;
|
||||
|
||||
@Inject(optional = true)
|
||||
@Named("jclouds.mpu.parallel.retries.maxpercent")
|
||||
@VisibleForTesting
|
||||
int maxPercentRetries = DEFAULT_MAX_PERCENT_RETRIES;
|
||||
|
||||
/**
|
||||
* maximum duration of an blob Request
|
||||
*/
|
||||
@Inject(optional = true)
|
||||
@Named(Constants.PROPERTY_REQUEST_TIMEOUT)
|
||||
protected Long maxTime;
|
||||
|
||||
private final ExecutorService ioWorkerExecutor;
|
||||
|
||||
protected final SwiftAsyncBlobStore ablobstore;
|
||||
protected final PayloadSlicer slicer;
|
||||
|
||||
@Inject
|
||||
public ParallelMultipartUploadStrategy(SwiftAsyncBlobStore ablobstore, PayloadSlicer slicer,
|
||||
@Named(Constants.PROPERTY_IO_WORKER_THREADS) ExecutorService ioWorkerExecutor) {
|
||||
this.ablobstore = checkNotNull(ablobstore, "ablobstore");
|
||||
this.slicer = checkNotNull(slicer, "slicer");
|
||||
this.ioWorkerExecutor = checkNotNull(ioWorkerExecutor, "ioWorkerExecutor");
|
||||
}
|
||||
|
||||
|
||||
protected void prepareUploadPart(final String container, final Blob blob, final String key,
|
||||
final Integer part, final Payload payload,
|
||||
final long offset, final long size, final SortedMap<Integer, String> etags,
|
||||
final BlockingQueue<Integer> activeParts,
|
||||
final Map<Integer, ListenableFuture<String>> futureParts,
|
||||
final AtomicInteger errors, final int maxRetries, final Map<Integer, Exception> errorMap,
|
||||
final Queue<Part> toRetry, final CountDownLatch latch,
|
||||
BlobToObject blob2Object) {
|
||||
if (errors.get() > maxRetries) {
|
||||
activeParts.remove(part); // remove part from the bounded-queue without blocking
|
||||
latch.countDown();
|
||||
return;
|
||||
}
|
||||
final SwiftAsyncClient client = (SwiftAsyncClient) ablobstore.getContext()
|
||||
.getProviderSpecificContext().getAsyncApi();
|
||||
Payload chunkedPart = slicer.slice(payload, offset, size);
|
||||
logger.debug(String.format("async uploading part %s of %s to container %s", part, key, container));
|
||||
final long start = System.currentTimeMillis();
|
||||
String blobPartName = blob.getMetadata().getName() + PART_SEPARATOR +
|
||||
String.valueOf(part);
|
||||
|
||||
Blob blobPart = ablobstore.blobBuilder(blobPartName).payload(chunkedPart).
|
||||
contentDisposition(blobPartName).build();
|
||||
final ListenableFuture<String> futureETag = client.putObject(container, blob2Object.apply(blobPart));
|
||||
futureETag.addListener(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
try {
|
||||
etags.put(part, futureETag.get());
|
||||
logger.debug(String.format("async uploaded part %s of %s to container %s in %sms",
|
||||
part, key, container, (System.currentTimeMillis() - start)));
|
||||
} catch (CancellationException e) {
|
||||
errorMap.put(part, e);
|
||||
String message = String.format("%s while uploading part %s - [%s,%s] to container %s with running since %dms",
|
||||
e.getMessage(), part, offset, size, container, (System.currentTimeMillis() - start));
|
||||
logger.debug(message);
|
||||
} catch (Exception e) {
|
||||
errorMap.put(part, e);
|
||||
String message = String.format("%s while uploading part %s - [%s,%s] to container %s running since %dms",
|
||||
e.getMessage(), part, offset, size, container, (System.currentTimeMillis() - start));
|
||||
logger.error(message, e);
|
||||
if (errors.incrementAndGet() <= maxRetries)
|
||||
toRetry.add(new Part(part, offset, size));
|
||||
} finally {
|
||||
activeParts.remove(part); // remove part from the bounded-queue without blocking
|
||||
futureParts.remove(part);
|
||||
latch.countDown();
|
||||
}
|
||||
}
|
||||
}, ioWorkerExecutor);
|
||||
futureParts.put(part, futureETag);
|
||||
}
|
||||
|
||||
@Override
|
||||
public ListenableFuture<String> execute(final String container, final Blob blob, final PutOptions options, final BlobToObject blob2Object) {
|
||||
return Futures.makeListenable(
|
||||
ioWorkerExecutor.submit(new Callable<String>() {
|
||||
@Override
|
||||
public String call() throws Exception {
|
||||
String key = blob.getMetadata().getName();
|
||||
Payload payload = blob.getPayload();
|
||||
MultipartUploadSlicingAlgorithm algorithm = new MultipartUploadSlicingAlgorithm();
|
||||
algorithm.calculateChunkSize(payload.getContentMetadata()
|
||||
.getContentLength());
|
||||
int parts = algorithm.getParts();
|
||||
long chunkSize = algorithm.getChunkSize();
|
||||
long remaining = algorithm.getRemaining();
|
||||
if (parts > 0) {
|
||||
SwiftClient client = (SwiftClient) ablobstore
|
||||
.getContext().getProviderSpecificContext().getApi();
|
||||
final Map<Integer, ListenableFuture<String>> futureParts =
|
||||
new ConcurrentHashMap<Integer, ListenableFuture<String>>();
|
||||
final Map<Integer, Exception> errorMap = Maps.newHashMap();
|
||||
AtomicInteger errors = new AtomicInteger(0);
|
||||
int maxRetries = Math.max(minRetries, parts * maxPercentRetries / 100);
|
||||
int effectiveParts = remaining > 0 ? parts + 1 : parts;
|
||||
try {
|
||||
logger.debug(String.format("initiated multipart upload of %s to container %s" +
|
||||
" consisting from %s part (possible max. retries: %d)",
|
||||
key, container, effectiveParts, maxRetries));
|
||||
// we need a bounded-blocking queue to control the amount of parallel jobs
|
||||
ArrayBlockingQueue<Integer> activeParts = new ArrayBlockingQueue<Integer>(parallelDegree);
|
||||
Queue<Part> toRetry = new ConcurrentLinkedQueue<Part>();
|
||||
SortedMap<Integer, String> etags = new ConcurrentSkipListMap<Integer, String>();
|
||||
CountDownLatch latch = new CountDownLatch(effectiveParts);
|
||||
int part;
|
||||
while ((part = algorithm.getNextPart()) <= parts) {
|
||||
Integer partKey = new Integer(part);
|
||||
activeParts.put(partKey);
|
||||
|
||||
prepareUploadPart(container, blob, key, partKey, payload,
|
||||
algorithm.getNextChunkOffset(), chunkSize, etags,
|
||||
activeParts, futureParts, errors, maxRetries, errorMap, toRetry, latch,
|
||||
blob2Object);
|
||||
}
|
||||
if (remaining > 0) {
|
||||
Integer partKey = new Integer(part);
|
||||
activeParts.put(partKey);
|
||||
prepareUploadPart(container, blob, key, partKey, payload,
|
||||
algorithm.getNextChunkOffset(), remaining, etags,
|
||||
activeParts, futureParts, errors, maxRetries, errorMap, toRetry, latch,
|
||||
blob2Object);
|
||||
}
|
||||
latch.await();
|
||||
// handling retries
|
||||
while (errors.get() <= maxRetries && toRetry.size() > 0) {
|
||||
int atOnce = Math.min(Math.min(toRetry.size(), errors.get()), parallelDegree);
|
||||
CountDownLatch retryLatch = new CountDownLatch(atOnce);
|
||||
for (int i = 0; i < atOnce; i++) {
|
||||
Part failedPart = toRetry.poll();
|
||||
Integer partKey = new Integer(failedPart.getPart());
|
||||
activeParts.put(partKey);
|
||||
prepareUploadPart(container, blob, key, partKey, payload,
|
||||
failedPart.getOffset(), failedPart.getSize(), etags,
|
||||
activeParts, futureParts, errors, maxRetries, errorMap, toRetry, retryLatch,
|
||||
blob2Object);
|
||||
}
|
||||
retryLatch.await();
|
||||
}
|
||||
if (errors.get() > maxRetries) {
|
||||
throw new BlobRuntimeException(String.format(
|
||||
"Too many failed parts: %s while multipart upload of %s to container %s",
|
||||
errors.get(), key, container));
|
||||
}
|
||||
|
||||
String eTag = client.putObjectManifest(container, key);
|
||||
logger.debug(String.format("multipart upload of %s to container %s" +
|
||||
" succeffully finished with %s retries", key, container, errors.get()));
|
||||
return eTag;
|
||||
} catch (Exception ex) {
|
||||
RuntimeException rtex = Throwables2.getFirstThrowableOfType(ex, RuntimeException.class);
|
||||
if (rtex == null) {
|
||||
rtex = new RuntimeException(ex);
|
||||
}
|
||||
for (Map.Entry<Integer, ListenableFuture<String>> entry : futureParts.entrySet()) {
|
||||
entry.getValue().cancel(false);
|
||||
}
|
||||
/*
|
||||
if (uploadId != null) {
|
||||
client.abortMultipartUpload(container, key, uploadId);
|
||||
} */
|
||||
throw rtex;
|
||||
}
|
||||
} else {
|
||||
ListenableFuture<String> futureETag = ablobstore.putBlob(container, blob, options);
|
||||
return maxTime != null ?
|
||||
futureETag.get(maxTime, TimeUnit.SECONDS) : futureETag.get();
|
||||
}
|
||||
}
|
||||
}), ioWorkerExecutor);
|
||||
}
|
||||
|
||||
class Part {
|
||||
private int part;
|
||||
private long offset;
|
||||
private long size;
|
||||
|
||||
Part(int part, long offset, long size) {
|
||||
this.part = part;
|
||||
this.offset = offset;
|
||||
this.size = size;
|
||||
}
|
||||
|
||||
public int getPart() {
|
||||
return part;
|
||||
}
|
||||
|
||||
public void setPart(int part) {
|
||||
this.part = part;
|
||||
}
|
||||
|
||||
public long getOffset() {
|
||||
return offset;
|
||||
}
|
||||
|
||||
public void setOffset(long offset) {
|
||||
this.offset = offset;
|
||||
}
|
||||
|
||||
public long getSize() {
|
||||
return size;
|
||||
}
|
||||
|
||||
public void setSize(long size) {
|
||||
this.size = size;
|
||||
}
|
||||
}
|
||||
}
|
|
@ -69,7 +69,7 @@ public class HPCloudObjectStorageAsyncBlobStore extends SwiftAsyncBlobStore {
|
|||
Provider<FetchBlobMetadata> fetchBlobMetadataProvider, EnableCDNAndCache enableCDNAndCache) {
|
||||
super(context, blobUtils, service, defaultLocation, locations, sync, async, container2ResourceMd,
|
||||
container2ContainerListOptions, container2ResourceList, object2Blob, blob2Object, object2BlobMd,
|
||||
blob2ObjectGetOptions, fetchBlobMetadataProvider);
|
||||
blob2ObjectGetOptions, fetchBlobMetadataProvider, null);
|
||||
this.enableCDNAndCache = enableCDNAndCache;
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue