diff --git a/apis/cloudfiles/src/main/java/org/jclouds/cloudfiles/blobstore/CloudFilesAsyncBlobStore.java b/apis/cloudfiles/src/main/java/org/jclouds/cloudfiles/blobstore/CloudFilesAsyncBlobStore.java index 34b61b03de..68ad22f587 100644 --- a/apis/cloudfiles/src/main/java/org/jclouds/cloudfiles/blobstore/CloudFilesAsyncBlobStore.java +++ b/apis/cloudfiles/src/main/java/org/jclouds/cloudfiles/blobstore/CloudFilesAsyncBlobStore.java @@ -69,7 +69,7 @@ public class CloudFilesAsyncBlobStore extends SwiftAsyncBlobStore { Provider fetchBlobMetadataProvider, EnableCDNAndCache enableCDNAndCache) { super(context, blobUtils, service, defaultLocation, locations, sync, async, container2ResourceMd, container2ContainerListOptions, container2ResourceList, object2Blob, blob2Object, object2BlobMd, - blob2ObjectGetOptions, fetchBlobMetadataProvider); + blob2ObjectGetOptions, fetchBlobMetadataProvider, null); this.enableCDNAndCache = enableCDNAndCache; } diff --git a/apis/swift/src/main/java/org/jclouds/openstack/swift/blobstore/SwiftAsyncBlobStore.java b/apis/swift/src/main/java/org/jclouds/openstack/swift/blobstore/SwiftAsyncBlobStore.java index e59b86f95e..1ffd600bf0 100644 --- a/apis/swift/src/main/java/org/jclouds/openstack/swift/blobstore/SwiftAsyncBlobStore.java +++ b/apis/swift/src/main/java/org/jclouds/openstack/swift/blobstore/SwiftAsyncBlobStore.java @@ -55,6 +55,7 @@ import org.jclouds.openstack.swift.blobstore.functions.ContainerToResourceList; import org.jclouds.openstack.swift.blobstore.functions.ContainerToResourceMetadata; import org.jclouds.openstack.swift.blobstore.functions.ObjectToBlob; import org.jclouds.openstack.swift.blobstore.functions.ObjectToBlobMetadata; +import org.jclouds.openstack.swift.blobstore.strategy.internal.AsyncMultipartUploadStrategy; import org.jclouds.openstack.swift.blobstore.strategy.internal.MultipartUploadStrategy; import org.jclouds.openstack.swift.domain.ContainerMetadata; import org.jclouds.openstack.swift.domain.MutableObjectInfoWithMetadata; @@ -82,7 +83,7 @@ public class SwiftAsyncBlobStore extends BaseAsyncBlobStore { private final ObjectToBlobMetadata object2BlobMd; private final BlobToHttpGetOptions blob2ObjectGetOptions; private final Provider fetchBlobMetadataProvider; - //private final Provider multipartUploadStrategy; + private final Provider multipartUploadStrategy; @Inject protected SwiftAsyncBlobStore(BlobStoreContext context, BlobUtils blobUtils, @@ -92,7 +93,8 @@ public class SwiftAsyncBlobStore extends BaseAsyncBlobStore { BlobStoreListContainerOptionsToListContainerOptions container2ContainerListOptions, ContainerToResourceList container2ResourceList, ObjectToBlob object2Blob, BlobToObject blob2Object, ObjectToBlobMetadata object2BlobMd, BlobToHttpGetOptions blob2ObjectGetOptions, - Provider fetchBlobMetadataProvider) { + Provider fetchBlobMetadataProvider, + Provider multipartUploadStrategy) { super(context, blobUtils, service, defaultLocation, locations); this.sync = sync; this.async = async; @@ -104,7 +106,7 @@ public class SwiftAsyncBlobStore extends BaseAsyncBlobStore { this.object2BlobMd = object2BlobMd; this.blob2ObjectGetOptions = blob2ObjectGetOptions; this.fetchBlobMetadataProvider = checkNotNull(fetchBlobMetadataProvider, "fetchBlobMetadataProvider"); - //this.multipartUploadStrategy = multipartUploadStrategy; + this.multipartUploadStrategy = multipartUploadStrategy; } /** @@ -241,12 +243,11 @@ public class SwiftAsyncBlobStore extends BaseAsyncBlobStore { @Override public ListenableFuture putBlob(String container, Blob blob, PutOptions options) { - // TODO implement options - //if (options.isMultipart()) { - // return null; //Lis multipartUploadStrategy.get().execute(container, blob, options); - //} else { + if (options.isMultipart()) { + return multipartUploadStrategy.get().execute(container, blob, options, blob2Object); + } else { return putBlob(container, blob); - //} + } } @Override diff --git a/apis/swift/src/main/java/org/jclouds/openstack/swift/blobstore/SwiftBlobStore.java b/apis/swift/src/main/java/org/jclouds/openstack/swift/blobstore/SwiftBlobStore.java index d19ab2ae44..2afc789227 100644 --- a/apis/swift/src/main/java/org/jclouds/openstack/swift/blobstore/SwiftBlobStore.java +++ b/apis/swift/src/main/java/org/jclouds/openstack/swift/blobstore/SwiftBlobStore.java @@ -210,7 +210,6 @@ public class SwiftBlobStore extends BaseBlobStore { */ @Override public String putBlob(String container, Blob blob, PutOptions options) { - // TODO implement options if (options.isMultipart()) { return multipartUploadStrategy.get().execute(container, blob, options, blob2Object); } else { diff --git a/apis/swift/src/main/java/org/jclouds/openstack/swift/blobstore/strategy/internal/AsyncMultipartUploadStrategy.java b/apis/swift/src/main/java/org/jclouds/openstack/swift/blobstore/strategy/internal/AsyncMultipartUploadStrategy.java new file mode 100644 index 0000000000..017d1dac22 --- /dev/null +++ b/apis/swift/src/main/java/org/jclouds/openstack/swift/blobstore/strategy/internal/AsyncMultipartUploadStrategy.java @@ -0,0 +1,12 @@ +package org.jclouds.openstack.swift.blobstore.strategy.internal; + +import com.google.common.util.concurrent.ListenableFuture; +import com.google.inject.ImplementedBy; +import org.jclouds.blobstore.domain.Blob; +import org.jclouds.blobstore.options.PutOptions; +import org.jclouds.openstack.swift.blobstore.functions.BlobToObject; + +@ImplementedBy(ParallelMultipartUploadStrategy.class) +public interface AsyncMultipartUploadStrategy { + ListenableFuture execute(String container, Blob blob, PutOptions options, BlobToObject blob2Object); +} diff --git a/apis/swift/src/main/java/org/jclouds/openstack/swift/blobstore/strategy/internal/ParallelMultipartUploadStrategy.java b/apis/swift/src/main/java/org/jclouds/openstack/swift/blobstore/strategy/internal/ParallelMultipartUploadStrategy.java new file mode 100644 index 0000000000..bdc86344c2 --- /dev/null +++ b/apis/swift/src/main/java/org/jclouds/openstack/swift/blobstore/strategy/internal/ParallelMultipartUploadStrategy.java @@ -0,0 +1,268 @@ +package org.jclouds.openstack.swift.blobstore.strategy.internal; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.Maps; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.inject.Inject; +import org.jclouds.Constants; +import org.jclouds.blobstore.domain.Blob; +import org.jclouds.blobstore.internal.BlobRuntimeException; +import org.jclouds.blobstore.options.PutOptions; +import org.jclouds.blobstore.reference.BlobStoreConstants; +import org.jclouds.concurrent.Futures; +import org.jclouds.io.Payload; +import org.jclouds.io.PayloadSlicer; +import org.jclouds.logging.Logger; +import org.jclouds.openstack.swift.SwiftAsyncClient; +import org.jclouds.openstack.swift.SwiftClient; +import org.jclouds.openstack.swift.blobstore.SwiftAsyncBlobStore; +import org.jclouds.openstack.swift.blobstore.functions.BlobToObject; +import org.jclouds.util.Throwables2; + +import javax.annotation.Resource; +import javax.inject.Named; +import java.util.Map; +import java.util.Queue; +import java.util.SortedMap; +import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicInteger; + +import static com.google.common.base.Preconditions.checkNotNull; + +public class ParallelMultipartUploadStrategy implements AsyncMultipartUploadStrategy { + @Resource + @Named(BlobStoreConstants.BLOBSTORE_LOGGER) + protected Logger logger = Logger.NULL; + + public static final String PART_SEPARATOR = "/"; + @VisibleForTesting + static final int DEFAULT_PARALLEL_DEGREE = 4; + @VisibleForTesting + static final int DEFAULT_MIN_RETRIES = 5; + @VisibleForTesting + static final int DEFAULT_MAX_PERCENT_RETRIES = 10; + + @Inject(optional = true) + @Named("jclouds.mpu.parallel.degree") + @VisibleForTesting + int parallelDegree = DEFAULT_PARALLEL_DEGREE; + + @Inject(optional = true) + @Named("jclouds.mpu.parallel.retries.min") + @VisibleForTesting + int minRetries = DEFAULT_MIN_RETRIES; + + @Inject(optional = true) + @Named("jclouds.mpu.parallel.retries.maxpercent") + @VisibleForTesting + int maxPercentRetries = DEFAULT_MAX_PERCENT_RETRIES; + + /** + * maximum duration of an blob Request + */ + @Inject(optional = true) + @Named(Constants.PROPERTY_REQUEST_TIMEOUT) + protected Long maxTime; + + private final ExecutorService ioWorkerExecutor; + + protected final SwiftAsyncBlobStore ablobstore; + protected final PayloadSlicer slicer; + + @Inject + public ParallelMultipartUploadStrategy(SwiftAsyncBlobStore ablobstore, PayloadSlicer slicer, + @Named(Constants.PROPERTY_IO_WORKER_THREADS) ExecutorService ioWorkerExecutor) { + this.ablobstore = checkNotNull(ablobstore, "ablobstore"); + this.slicer = checkNotNull(slicer, "slicer"); + this.ioWorkerExecutor = checkNotNull(ioWorkerExecutor, "ioWorkerExecutor"); + } + + + protected void prepareUploadPart(final String container, final Blob blob, final String key, + final Integer part, final Payload payload, + final long offset, final long size, final SortedMap etags, + final BlockingQueue activeParts, + final Map> futureParts, + final AtomicInteger errors, final int maxRetries, final Map errorMap, + final Queue toRetry, final CountDownLatch latch, + BlobToObject blob2Object) { + if (errors.get() > maxRetries) { + activeParts.remove(part); // remove part from the bounded-queue without blocking + latch.countDown(); + return; + } + final SwiftAsyncClient client = (SwiftAsyncClient) ablobstore.getContext() + .getProviderSpecificContext().getAsyncApi(); + Payload chunkedPart = slicer.slice(payload, offset, size); + logger.debug(String.format("async uploading part %s of %s to container %s", part, key, container)); + final long start = System.currentTimeMillis(); + String blobPartName = blob.getMetadata().getName() + PART_SEPARATOR + + String.valueOf(part); + + Blob blobPart = ablobstore.blobBuilder(blobPartName).payload(chunkedPart). + contentDisposition(blobPartName).build(); + final ListenableFuture futureETag = client.putObject(container, blob2Object.apply(blobPart)); + futureETag.addListener(new Runnable() { + @Override + public void run() { + try { + etags.put(part, futureETag.get()); + logger.debug(String.format("async uploaded part %s of %s to container %s in %sms", + part, key, container, (System.currentTimeMillis() - start))); + } catch (CancellationException e) { + errorMap.put(part, e); + String message = String.format("%s while uploading part %s - [%s,%s] to container %s with running since %dms", + e.getMessage(), part, offset, size, container, (System.currentTimeMillis() - start)); + logger.debug(message); + } catch (Exception e) { + errorMap.put(part, e); + String message = String.format("%s while uploading part %s - [%s,%s] to container %s running since %dms", + e.getMessage(), part, offset, size, container, (System.currentTimeMillis() - start)); + logger.error(message, e); + if (errors.incrementAndGet() <= maxRetries) + toRetry.add(new Part(part, offset, size)); + } finally { + activeParts.remove(part); // remove part from the bounded-queue without blocking + futureParts.remove(part); + latch.countDown(); + } + } + }, ioWorkerExecutor); + futureParts.put(part, futureETag); + } + + @Override + public ListenableFuture execute(final String container, final Blob blob, final PutOptions options, final BlobToObject blob2Object) { + return Futures.makeListenable( + ioWorkerExecutor.submit(new Callable() { + @Override + public String call() throws Exception { + String key = blob.getMetadata().getName(); + Payload payload = blob.getPayload(); + MultipartUploadSlicingAlgorithm algorithm = new MultipartUploadSlicingAlgorithm(); + algorithm.calculateChunkSize(payload.getContentMetadata() + .getContentLength()); + int parts = algorithm.getParts(); + long chunkSize = algorithm.getChunkSize(); + long remaining = algorithm.getRemaining(); + if (parts > 0) { + SwiftClient client = (SwiftClient) ablobstore + .getContext().getProviderSpecificContext().getApi(); + final Map> futureParts = + new ConcurrentHashMap>(); + final Map errorMap = Maps.newHashMap(); + AtomicInteger errors = new AtomicInteger(0); + int maxRetries = Math.max(minRetries, parts * maxPercentRetries / 100); + int effectiveParts = remaining > 0 ? parts + 1 : parts; + try { + logger.debug(String.format("initiated multipart upload of %s to container %s" + + " consisting from %s part (possible max. retries: %d)", + key, container, effectiveParts, maxRetries)); + // we need a bounded-blocking queue to control the amount of parallel jobs + ArrayBlockingQueue activeParts = new ArrayBlockingQueue(parallelDegree); + Queue toRetry = new ConcurrentLinkedQueue(); + SortedMap etags = new ConcurrentSkipListMap(); + CountDownLatch latch = new CountDownLatch(effectiveParts); + int part; + while ((part = algorithm.getNextPart()) <= parts) { + Integer partKey = new Integer(part); + activeParts.put(partKey); + + prepareUploadPart(container, blob, key, partKey, payload, + algorithm.getNextChunkOffset(), chunkSize, etags, + activeParts, futureParts, errors, maxRetries, errorMap, toRetry, latch, + blob2Object); + } + if (remaining > 0) { + Integer partKey = new Integer(part); + activeParts.put(partKey); + prepareUploadPart(container, blob, key, partKey, payload, + algorithm.getNextChunkOffset(), remaining, etags, + activeParts, futureParts, errors, maxRetries, errorMap, toRetry, latch, + blob2Object); + } + latch.await(); + // handling retries + while (errors.get() <= maxRetries && toRetry.size() > 0) { + int atOnce = Math.min(Math.min(toRetry.size(), errors.get()), parallelDegree); + CountDownLatch retryLatch = new CountDownLatch(atOnce); + for (int i = 0; i < atOnce; i++) { + Part failedPart = toRetry.poll(); + Integer partKey = new Integer(failedPart.getPart()); + activeParts.put(partKey); + prepareUploadPart(container, blob, key, partKey, payload, + failedPart.getOffset(), failedPart.getSize(), etags, + activeParts, futureParts, errors, maxRetries, errorMap, toRetry, retryLatch, + blob2Object); + } + retryLatch.await(); + } + if (errors.get() > maxRetries) { + throw new BlobRuntimeException(String.format( + "Too many failed parts: %s while multipart upload of %s to container %s", + errors.get(), key, container)); + } + + String eTag = client.putObjectManifest(container, key); + logger.debug(String.format("multipart upload of %s to container %s" + + " succeffully finished with %s retries", key, container, errors.get())); + return eTag; + } catch (Exception ex) { + RuntimeException rtex = Throwables2.getFirstThrowableOfType(ex, RuntimeException.class); + if (rtex == null) { + rtex = new RuntimeException(ex); + } + for (Map.Entry> entry : futureParts.entrySet()) { + entry.getValue().cancel(false); + } + /* + if (uploadId != null) { + client.abortMultipartUpload(container, key, uploadId); + } */ + throw rtex; + } + } else { + ListenableFuture futureETag = ablobstore.putBlob(container, blob, options); + return maxTime != null ? + futureETag.get(maxTime, TimeUnit.SECONDS) : futureETag.get(); + } + } + }), ioWorkerExecutor); + } + + class Part { + private int part; + private long offset; + private long size; + + Part(int part, long offset, long size) { + this.part = part; + this.offset = offset; + this.size = size; + } + + public int getPart() { + return part; + } + + public void setPart(int part) { + this.part = part; + } + + public long getOffset() { + return offset; + } + + public void setOffset(long offset) { + this.offset = offset; + } + + public long getSize() { + return size; + } + + public void setSize(long size) { + this.size = size; + } + } +} diff --git a/providers/hpcloud-objectstorage/src/main/java/org/jclouds/hpcloud/objectstorage/blobstore/HPCloudObjectStorageAsyncBlobStore.java b/providers/hpcloud-objectstorage/src/main/java/org/jclouds/hpcloud/objectstorage/blobstore/HPCloudObjectStorageAsyncBlobStore.java index ed305371de..fdaaa3ec4d 100644 --- a/providers/hpcloud-objectstorage/src/main/java/org/jclouds/hpcloud/objectstorage/blobstore/HPCloudObjectStorageAsyncBlobStore.java +++ b/providers/hpcloud-objectstorage/src/main/java/org/jclouds/hpcloud/objectstorage/blobstore/HPCloudObjectStorageAsyncBlobStore.java @@ -69,7 +69,7 @@ public class HPCloudObjectStorageAsyncBlobStore extends SwiftAsyncBlobStore { Provider fetchBlobMetadataProvider, EnableCDNAndCache enableCDNAndCache) { super(context, blobUtils, service, defaultLocation, locations, sync, async, container2ResourceMd, container2ContainerListOptions, container2ResourceList, object2Blob, blob2Object, object2BlobMd, - blob2ObjectGetOptions, fetchBlobMetadataProvider); + blob2ObjectGetOptions, fetchBlobMetadataProvider, null); this.enableCDNAndCache = enableCDNAndCache; }