mirror of https://github.com/apache/jclouds.git
Issue 430: ParallelMultipartUploadStrategy added.
This commit is contained in:
parent
f0b141f77d
commit
8767a8100b
|
@ -29,6 +29,7 @@ import javax.inject.Provider;
|
||||||
import org.jclouds.Constants;
|
import org.jclouds.Constants;
|
||||||
import org.jclouds.aws.s3.AWSS3AsyncClient;
|
import org.jclouds.aws.s3.AWSS3AsyncClient;
|
||||||
import org.jclouds.aws.s3.AWSS3Client;
|
import org.jclouds.aws.s3.AWSS3Client;
|
||||||
|
import org.jclouds.aws.s3.blobstore.strategy.AsyncMultipartUploadStrategy;
|
||||||
import org.jclouds.aws.s3.blobstore.strategy.MultipartUploadStrategy;
|
import org.jclouds.aws.s3.blobstore.strategy.MultipartUploadStrategy;
|
||||||
import org.jclouds.blobstore.BlobStoreContext;
|
import org.jclouds.blobstore.BlobStoreContext;
|
||||||
import org.jclouds.blobstore.domain.Blob;
|
import org.jclouds.blobstore.domain.Blob;
|
||||||
|
@ -55,7 +56,7 @@ import com.google.common.util.concurrent.ListenableFuture;
|
||||||
*/
|
*/
|
||||||
public class AWSS3AsyncBlobStore extends S3AsyncBlobStore {
|
public class AWSS3AsyncBlobStore extends S3AsyncBlobStore {
|
||||||
|
|
||||||
private final Provider<MultipartUploadStrategy> multipartUploadStrategy;
|
private final Provider<AsyncMultipartUploadStrategy> multipartUploadStrategy;
|
||||||
|
|
||||||
@Inject
|
@Inject
|
||||||
public AWSS3AsyncBlobStore(BlobStoreContext context, BlobUtils blobUtils,
|
public AWSS3AsyncBlobStore(BlobStoreContext context, BlobUtils blobUtils,
|
||||||
|
@ -65,7 +66,7 @@ public class AWSS3AsyncBlobStore extends S3AsyncBlobStore {
|
||||||
BucketToResourceList bucket2ResourceList, ObjectToBlob object2Blob,
|
BucketToResourceList bucket2ResourceList, ObjectToBlob object2Blob,
|
||||||
BlobToHttpGetOptions blob2ObjectGetOptions, BlobToObject blob2Object, ObjectToBlobMetadata object2BlobMd,
|
BlobToHttpGetOptions blob2ObjectGetOptions, BlobToObject blob2Object, ObjectToBlobMetadata object2BlobMd,
|
||||||
Provider<FetchBlobMetadata> fetchBlobMetadataProvider,
|
Provider<FetchBlobMetadata> fetchBlobMetadataProvider,
|
||||||
Provider<MultipartUploadStrategy> multipartUploadStrategy) {
|
Provider<AsyncMultipartUploadStrategy> multipartUploadStrategy) {
|
||||||
super(context, blobUtils, service, defaultLocation, locations, async, sync, bucket2ResourceMd,
|
super(context, blobUtils, service, defaultLocation, locations, async, sync, bucket2ResourceMd,
|
||||||
container2BucketListOptions, bucket2ResourceList, object2Blob, blob2ObjectGetOptions, blob2Object,
|
container2BucketListOptions, bucket2ResourceList, object2Blob, blob2ObjectGetOptions, blob2Object,
|
||||||
object2BlobMd, fetchBlobMetadataProvider);
|
object2BlobMd, fetchBlobMetadataProvider);
|
||||||
|
@ -74,9 +75,8 @@ public class AWSS3AsyncBlobStore extends S3AsyncBlobStore {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public ListenableFuture<String> putBlobMultipart(String container, Blob blob) {
|
public ListenableFuture<String> putBlobMultipart(String container, Blob blob) {
|
||||||
// TODO: make this better
|
|
||||||
// need to use a provider if the strategy object is stateful
|
// need to use a provider if the strategy object is stateful
|
||||||
return Futures.immediateFuture(multipartUploadStrategy.get().execute(container, blob));
|
return multipartUploadStrategy.get().execute(container, blob);
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -23,7 +23,9 @@ import org.jclouds.aws.s3.AWSS3AsyncClient;
|
||||||
import org.jclouds.aws.s3.AWSS3Client;
|
import org.jclouds.aws.s3.AWSS3Client;
|
||||||
import org.jclouds.aws.s3.blobstore.AWSS3AsyncBlobStore;
|
import org.jclouds.aws.s3.blobstore.AWSS3AsyncBlobStore;
|
||||||
import org.jclouds.aws.s3.blobstore.AWSS3BlobStore;
|
import org.jclouds.aws.s3.blobstore.AWSS3BlobStore;
|
||||||
|
import org.jclouds.aws.s3.blobstore.strategy.AsyncMultipartUploadStrategy;
|
||||||
import org.jclouds.aws.s3.blobstore.strategy.MultipartUploadStrategy;
|
import org.jclouds.aws.s3.blobstore.strategy.MultipartUploadStrategy;
|
||||||
|
import org.jclouds.aws.s3.blobstore.strategy.internal.ParallelMultipartUploadStrategy;
|
||||||
import org.jclouds.aws.s3.blobstore.strategy.internal.SequentialMultipartUploadStrategy;
|
import org.jclouds.aws.s3.blobstore.strategy.internal.SequentialMultipartUploadStrategy;
|
||||||
import org.jclouds.blobstore.BlobStoreContext;
|
import org.jclouds.blobstore.BlobStoreContext;
|
||||||
import org.jclouds.blobstore.internal.BlobStoreContextImpl;
|
import org.jclouds.blobstore.internal.BlobStoreContextImpl;
|
||||||
|
@ -47,6 +49,7 @@ public class AWSS3BlobStoreContextModule extends S3BlobStoreContextModule {
|
||||||
bind(S3AsyncBlobStore.class).to(AWSS3AsyncBlobStore.class).in(Scopes.SINGLETON);
|
bind(S3AsyncBlobStore.class).to(AWSS3AsyncBlobStore.class).in(Scopes.SINGLETON);
|
||||||
bind(S3BlobStore.class).to(AWSS3BlobStore.class).in(Scopes.SINGLETON);
|
bind(S3BlobStore.class).to(AWSS3BlobStore.class).in(Scopes.SINGLETON);
|
||||||
bind(MultipartUploadStrategy.class).to(SequentialMultipartUploadStrategy.class);
|
bind(MultipartUploadStrategy.class).to(SequentialMultipartUploadStrategy.class);
|
||||||
|
bind(AsyncMultipartUploadStrategy.class).to(ParallelMultipartUploadStrategy.class);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -0,0 +1,38 @@
|
||||||
|
/**
|
||||||
|
*
|
||||||
|
* Copyright (C) 2010 Cloud Conscious, LLC. <info@cloudconscious.com>
|
||||||
|
*
|
||||||
|
* ====================================================================
|
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
* you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
* ====================================================================
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.jclouds.aws.s3.blobstore.strategy;
|
||||||
|
|
||||||
|
import org.jclouds.aws.s3.blobstore.strategy.internal.ParallelMultipartUploadStrategy;
|
||||||
|
import org.jclouds.blobstore.domain.Blob;
|
||||||
|
|
||||||
|
import com.google.common.util.concurrent.ListenableFuture;
|
||||||
|
import com.google.inject.ImplementedBy;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @see <a href="http://docs.amazonwebservices.com/AmazonS3/latest/dev/index.html?qfacts.html"
|
||||||
|
*
|
||||||
|
* @author Tibor Kiss
|
||||||
|
*/
|
||||||
|
@ImplementedBy(ParallelMultipartUploadStrategy.class)
|
||||||
|
public interface AsyncMultipartUploadStrategy extends MultipartUpload {
|
||||||
|
|
||||||
|
ListenableFuture<String> execute(String container, Blob blob);
|
||||||
|
|
||||||
|
}
|
|
@ -0,0 +1,41 @@
|
||||||
|
/**
|
||||||
|
*
|
||||||
|
* Copyright (C) 2010 Cloud Conscious, LLC. <info@cloudconscious.com>
|
||||||
|
*
|
||||||
|
* ====================================================================
|
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
* you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
* ====================================================================
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.jclouds.aws.s3.blobstore.strategy;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @see <a href="http://docs.amazonwebservices.com/AmazonS3/latest/dev/index.html?qfacts.html"
|
||||||
|
*
|
||||||
|
* @author Tibor Kiss
|
||||||
|
*/
|
||||||
|
public interface MultipartUpload {
|
||||||
|
|
||||||
|
/* Maximum number of parts per upload */
|
||||||
|
public static final int MAX_NUMBER_OF_PARTS = 10000;
|
||||||
|
/* Maximum number of parts returned for a list parts request */
|
||||||
|
public static final int MAX_LIST_PARTS_RETURNED = 1000;
|
||||||
|
/* Maximum number of multipart uploads returned in a list multipart uploads request */
|
||||||
|
public static final int MAX_LIST_MPU_RETURNED = 1000;
|
||||||
|
|
||||||
|
/*
|
||||||
|
* part size 5 MB to 5 GB, last part can be < 5 MB
|
||||||
|
*/
|
||||||
|
public static final long MIN_PART_SIZE = 5242880L;
|
||||||
|
public static final long MAX_PART_SIZE = 5368709120L;
|
||||||
|
}
|
|
@ -30,21 +30,7 @@ import com.google.inject.ImplementedBy;
|
||||||
* @author Tibor Kiss
|
* @author Tibor Kiss
|
||||||
*/
|
*/
|
||||||
@ImplementedBy(SequentialMultipartUploadStrategy.class)
|
@ImplementedBy(SequentialMultipartUploadStrategy.class)
|
||||||
public interface MultipartUploadStrategy {
|
public interface MultipartUploadStrategy extends MultipartUpload {
|
||||||
|
|
||||||
/* Maximum number of parts per upload */
|
|
||||||
public static final int MAX_NUMBER_OF_PARTS = 10000;
|
|
||||||
/* Maximum number of parts returned for a list parts request */
|
|
||||||
public static final int MAX_LIST_PARTS_RETURNED = 1000;
|
|
||||||
/* Maximum number of multipart uploads returned in a list multipart uploads request */
|
|
||||||
public static final int MAX_LIST_MPU_RETURNED = 1000;
|
|
||||||
|
|
||||||
/*
|
|
||||||
* part size 5 MB to 5 GB, last part can be < 5 MB
|
|
||||||
*/
|
|
||||||
public static final long MIN_PART_SIZE = 5242880L;
|
|
||||||
public static final long MAX_PART_SIZE = 5368709120L;
|
|
||||||
|
|
||||||
String execute(String container, Blob blob);
|
|
||||||
|
|
||||||
|
String execute(String container, Blob blob);
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,135 @@
|
||||||
|
/*
|
||||||
|
* MultipartUploadSlicingAlgorithm.java
|
||||||
|
*
|
||||||
|
*
|
||||||
|
* Created by: tibor
|
||||||
|
*
|
||||||
|
* History
|
||||||
|
*
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.jclouds.aws.s3.blobstore.strategy.internal;
|
||||||
|
|
||||||
|
import javax.annotation.Resource;
|
||||||
|
import javax.inject.Named;
|
||||||
|
|
||||||
|
import org.jclouds.aws.s3.blobstore.strategy.MultipartUpload;
|
||||||
|
import org.jclouds.blobstore.reference.BlobStoreConstants;
|
||||||
|
import org.jclouds.logging.Logger;
|
||||||
|
|
||||||
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
|
import com.google.inject.Inject;
|
||||||
|
|
||||||
|
public class MultipartUploadSlicingAlgorithm {
|
||||||
|
|
||||||
|
@Resource
|
||||||
|
@Named(BlobStoreConstants.BLOBSTORE_LOGGER)
|
||||||
|
protected Logger logger = Logger.NULL;
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
static final long DEFAULT_PART_SIZE = 33554432; // 32MB
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
static final int DEFAULT_MAGNITUDE_BASE = 100;
|
||||||
|
|
||||||
|
@Inject(optional = true)
|
||||||
|
@Named("jclouds.mpu.parts.size")
|
||||||
|
@VisibleForTesting
|
||||||
|
long defaultPartSize = DEFAULT_PART_SIZE;
|
||||||
|
|
||||||
|
@Inject(optional = true)
|
||||||
|
@Named("jclouds.mpu.parts.magnitude")
|
||||||
|
@VisibleForTesting
|
||||||
|
int magnitudeBase = DEFAULT_MAGNITUDE_BASE;
|
||||||
|
|
||||||
|
// calculated only once, but not from the constructor
|
||||||
|
private volatile int parts; // required number of parts with chunkSize
|
||||||
|
private volatile long chunkSize;
|
||||||
|
private volatile long remaining; // number of bytes remained for the last part
|
||||||
|
|
||||||
|
// sequentially updated values
|
||||||
|
private volatile int part;
|
||||||
|
private volatile long chunkOffset;
|
||||||
|
private volatile long copied;
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
protected long calculateChunkSize(long length) {
|
||||||
|
long unitPartSize = defaultPartSize; // first try with default part size
|
||||||
|
int parts = (int)(length / unitPartSize);
|
||||||
|
long partSize = unitPartSize;
|
||||||
|
int magnitude = (int) (parts / magnitudeBase);
|
||||||
|
if (magnitude > 0) {
|
||||||
|
partSize = magnitude * unitPartSize;
|
||||||
|
if (partSize > MultipartUpload.MAX_PART_SIZE) {
|
||||||
|
partSize = MultipartUpload.MAX_PART_SIZE;
|
||||||
|
unitPartSize = MultipartUpload.MAX_PART_SIZE;
|
||||||
|
}
|
||||||
|
parts = (int)(length / partSize);
|
||||||
|
if (parts * partSize < length) {
|
||||||
|
partSize = (magnitude + 1) * unitPartSize;
|
||||||
|
if (partSize > MultipartUpload.MAX_PART_SIZE) {
|
||||||
|
partSize = MultipartUpload.MAX_PART_SIZE;
|
||||||
|
unitPartSize = MultipartUpload.MAX_PART_SIZE;
|
||||||
|
}
|
||||||
|
parts = (int)(length / partSize);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (parts > MultipartUpload.MAX_NUMBER_OF_PARTS) { // if splits in too many parts or
|
||||||
|
// cannot be split
|
||||||
|
unitPartSize = MultipartUpload.MIN_PART_SIZE; // take the minimum part size
|
||||||
|
parts = (int)(length / unitPartSize);
|
||||||
|
}
|
||||||
|
if (parts > MultipartUpload.MAX_NUMBER_OF_PARTS) { // if still splits in too many parts
|
||||||
|
parts = MultipartUpload.MAX_NUMBER_OF_PARTS - 1; // limit them. do not care about not
|
||||||
|
// covering
|
||||||
|
}
|
||||||
|
long remainder = length % unitPartSize;
|
||||||
|
if (remainder == 0 && parts > 0) {
|
||||||
|
parts -= 1;
|
||||||
|
}
|
||||||
|
this.chunkSize = partSize;
|
||||||
|
this.parts = parts;
|
||||||
|
this.remaining = length - partSize * parts;
|
||||||
|
logger.debug(" %d bytes partitioned in %d parts of part size: %d, remaining: %d%s", length, parts, chunkSize,
|
||||||
|
remaining, (remaining > MultipartUpload.MAX_PART_SIZE ? " overflow!" : ""));
|
||||||
|
return this.chunkSize;
|
||||||
|
}
|
||||||
|
|
||||||
|
public long getCopied() {
|
||||||
|
return copied;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setCopied(long copied) {
|
||||||
|
this.copied = copied;
|
||||||
|
}
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
protected int getParts() {
|
||||||
|
return parts;
|
||||||
|
}
|
||||||
|
|
||||||
|
protected int getNextPart() {
|
||||||
|
return ++part;
|
||||||
|
}
|
||||||
|
|
||||||
|
protected void addCopied(long copied) {
|
||||||
|
this.copied += copied;
|
||||||
|
}
|
||||||
|
|
||||||
|
protected long getNextChunkOffset() {
|
||||||
|
long next = chunkOffset;
|
||||||
|
chunkOffset += getChunkSize();
|
||||||
|
return next;
|
||||||
|
}
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
protected long getChunkSize() {
|
||||||
|
return chunkSize;
|
||||||
|
}
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
protected long getRemaining() {
|
||||||
|
return remaining;
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -0,0 +1,287 @@
|
||||||
|
/**
|
||||||
|
*
|
||||||
|
* Copyright (C) 2010 Cloud Conscious, LLC. <info@cloudconscious.com>
|
||||||
|
*
|
||||||
|
* ====================================================================
|
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
* you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
* ====================================================================
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.jclouds.aws.s3.blobstore.strategy.internal;
|
||||||
|
|
||||||
|
import static com.google.common.base.Preconditions.checkNotNull;
|
||||||
|
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.Queue;
|
||||||
|
import java.util.SortedMap;
|
||||||
|
import java.util.concurrent.ArrayBlockingQueue;
|
||||||
|
import java.util.concurrent.BlockingQueue;
|
||||||
|
import java.util.concurrent.Callable;
|
||||||
|
import java.util.concurrent.CancellationException;
|
||||||
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
|
import java.util.concurrent.ConcurrentLinkedQueue;
|
||||||
|
import java.util.concurrent.ConcurrentSkipListMap;
|
||||||
|
import java.util.concurrent.CountDownLatch;
|
||||||
|
import java.util.concurrent.ExecutorService;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
|
|
||||||
|
import javax.annotation.Resource;
|
||||||
|
import javax.inject.Named;
|
||||||
|
|
||||||
|
import org.jclouds.Constants;
|
||||||
|
import org.jclouds.aws.s3.AWSS3AsyncClient;
|
||||||
|
import org.jclouds.aws.s3.AWSS3Client;
|
||||||
|
import org.jclouds.aws.s3.blobstore.AWSS3AsyncBlobStore;
|
||||||
|
import org.jclouds.aws.s3.blobstore.strategy.AsyncMultipartUploadStrategy;
|
||||||
|
import org.jclouds.blobstore.domain.Blob;
|
||||||
|
import org.jclouds.blobstore.internal.BlobRuntimeException;
|
||||||
|
import org.jclouds.blobstore.reference.BlobStoreConstants;
|
||||||
|
import org.jclouds.concurrent.Futures;
|
||||||
|
import org.jclouds.io.Payload;
|
||||||
|
import org.jclouds.io.PayloadSlicer;
|
||||||
|
import org.jclouds.logging.Logger;
|
||||||
|
import org.jclouds.s3.domain.ObjectMetadataBuilder;
|
||||||
|
import org.jclouds.util.Throwables2;
|
||||||
|
|
||||||
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
|
import com.google.common.collect.Maps;
|
||||||
|
import com.google.common.util.concurrent.ListenableFuture;
|
||||||
|
import com.google.inject.Inject;
|
||||||
|
|
||||||
|
public class ParallelMultipartUploadStrategy implements AsyncMultipartUploadStrategy {
|
||||||
|
@Resource
|
||||||
|
@Named(BlobStoreConstants.BLOBSTORE_LOGGER)
|
||||||
|
protected Logger logger = Logger.NULL;
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
static final int DEFAULT_PARALLEL_DEGREE = 4;
|
||||||
|
@VisibleForTesting
|
||||||
|
static final int DEFAULT_MIN_RETRIES = 5;
|
||||||
|
@VisibleForTesting
|
||||||
|
static final int DEFAULT_MAX_PERCENT_RETRIES = 10;
|
||||||
|
|
||||||
|
private final ExecutorService ioWorkerExecutor;
|
||||||
|
|
||||||
|
@Inject(optional = true)
|
||||||
|
@Named("jclouds.mpu.parallel.degree")
|
||||||
|
@VisibleForTesting
|
||||||
|
int parallelDegree = DEFAULT_PARALLEL_DEGREE;
|
||||||
|
|
||||||
|
@Inject(optional = true)
|
||||||
|
@Named("jclouds.mpu.parallel.retries.min")
|
||||||
|
@VisibleForTesting
|
||||||
|
int minRetries = DEFAULT_MIN_RETRIES;
|
||||||
|
|
||||||
|
@Inject(optional = true)
|
||||||
|
@Named("jclouds.mpu.parallel.retries.maxpercent")
|
||||||
|
@VisibleForTesting
|
||||||
|
int maxPercentRetries = DEFAULT_MAX_PERCENT_RETRIES;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* maximum duration of an blob Request
|
||||||
|
*/
|
||||||
|
@Inject(optional = true)
|
||||||
|
@Named(Constants.PROPERTY_REQUEST_TIMEOUT)
|
||||||
|
protected Long maxTime;
|
||||||
|
|
||||||
|
protected final AWSS3AsyncBlobStore ablobstore;
|
||||||
|
protected final PayloadSlicer slicer;
|
||||||
|
|
||||||
|
@Inject
|
||||||
|
public ParallelMultipartUploadStrategy(AWSS3AsyncBlobStore ablobstore, PayloadSlicer slicer,
|
||||||
|
@Named(Constants.PROPERTY_IO_WORKER_THREADS) ExecutorService ioWorkerExecutor) {
|
||||||
|
this.ablobstore = checkNotNull(ablobstore, "ablobstore");
|
||||||
|
this.slicer = checkNotNull(slicer, "slicer");
|
||||||
|
this.ioWorkerExecutor = checkNotNull(ioWorkerExecutor, "ioWorkerExecutor");
|
||||||
|
}
|
||||||
|
|
||||||
|
protected void prepareUploadPart(final String container, final String key,
|
||||||
|
final String uploadId, final Integer part, final Payload payload,
|
||||||
|
final long offset, final long size, final SortedMap<Integer, String> etags,
|
||||||
|
final BlockingQueue<Integer> activeParts,
|
||||||
|
final Map<Integer, ListenableFuture<String>> futureParts,
|
||||||
|
final AtomicInteger errors, final int maxRetries, final Map<Integer, Exception> errorMap,
|
||||||
|
final Queue<Part> toRetry, final CountDownLatch latch) {
|
||||||
|
if (errors.get() > maxRetries) {
|
||||||
|
activeParts.remove(part); // remove part from the bounded-queue without blocking
|
||||||
|
latch.countDown();
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
final AWSS3AsyncClient client = (AWSS3AsyncClient) ablobstore.getContext()
|
||||||
|
.getProviderSpecificContext().getAsyncApi();
|
||||||
|
Payload chunkedPart = slicer.slice(payload, offset, size);
|
||||||
|
logger.debug(String.format("async uploading part %s of %s to container %s with uploadId %s", part, key, container, uploadId));
|
||||||
|
final long start = System.currentTimeMillis();
|
||||||
|
final ListenableFuture<String> futureETag = client.uploadPart(container, key, part, uploadId, chunkedPart);
|
||||||
|
futureETag.addListener(new Runnable() {
|
||||||
|
@Override
|
||||||
|
public void run() {
|
||||||
|
try {
|
||||||
|
etags.put(part, futureETag.get());
|
||||||
|
logger.debug(String.format("async uploaded part %s of %s to container %s in %sms with uploadId %s",
|
||||||
|
part, key, container, (System.currentTimeMillis()-start), uploadId));
|
||||||
|
} catch (CancellationException e) {
|
||||||
|
errorMap.put(part, e);
|
||||||
|
String message = String.format("%s while uploading part %s - [%s,%s] to container %s with uploadId: %s running since %dms",
|
||||||
|
e.getMessage(), part, offset, size, container, uploadId, (System.currentTimeMillis()-start));
|
||||||
|
logger.debug(message);
|
||||||
|
} catch (Exception e) {
|
||||||
|
errorMap.put(part, e);
|
||||||
|
String message = String.format("%s while uploading part %s - [%s,%s] to container %s with uploadId: %s running since %dms",
|
||||||
|
e.getMessage(), part, offset, size, container, uploadId, (System.currentTimeMillis()-start));
|
||||||
|
logger.error(message, e);
|
||||||
|
if (errors.incrementAndGet() <= maxRetries)
|
||||||
|
toRetry.add(new Part(part, offset, size));
|
||||||
|
} finally {
|
||||||
|
activeParts.remove(part); // remove part from the bounded-queue without blocking
|
||||||
|
futureParts.remove(part);
|
||||||
|
latch.countDown();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}, ioWorkerExecutor);
|
||||||
|
futureParts.put(part, futureETag);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public ListenableFuture<String> execute(final String container, final Blob blob) {
|
||||||
|
return Futures.makeListenable(
|
||||||
|
ioWorkerExecutor.submit(new Callable<String>() {
|
||||||
|
@Override
|
||||||
|
public String call() throws Exception {
|
||||||
|
String key = blob.getMetadata().getName();
|
||||||
|
Payload payload = blob.getPayload();
|
||||||
|
MultipartUploadSlicingAlgorithm algorithm = new MultipartUploadSlicingAlgorithm();
|
||||||
|
algorithm.calculateChunkSize(payload.getContentMetadata()
|
||||||
|
.getContentLength());
|
||||||
|
int parts = algorithm.getParts();
|
||||||
|
long chunkSize = algorithm.getChunkSize();
|
||||||
|
long remaining = algorithm.getRemaining();
|
||||||
|
if (parts > 0) {
|
||||||
|
AWSS3Client client = (AWSS3Client) ablobstore
|
||||||
|
.getContext().getProviderSpecificContext().getApi();
|
||||||
|
String uploadId = null;
|
||||||
|
final Map<Integer, ListenableFuture<String>> futureParts =
|
||||||
|
new ConcurrentHashMap<Integer, ListenableFuture<String>>();
|
||||||
|
final Map<Integer, Exception> errorMap = Maps.newHashMap();
|
||||||
|
AtomicInteger errors = new AtomicInteger(0);
|
||||||
|
int maxRetries = Math.max(minRetries, parts * maxPercentRetries / 100);
|
||||||
|
int effectiveParts = remaining > 0 ? parts + 1 : parts;
|
||||||
|
try {
|
||||||
|
uploadId = client.initiateMultipartUpload(container,
|
||||||
|
ObjectMetadataBuilder.create().key(key).build()); // TODO md5
|
||||||
|
logger.debug(String.format("initiated multipart upload of %s to container %s" +
|
||||||
|
" with uploadId %s consisting from %s part (possible max. retries: %d)",
|
||||||
|
key, container, uploadId, effectiveParts, maxRetries));
|
||||||
|
// we need a bounded-blocking queue to control the amount of parallel jobs
|
||||||
|
ArrayBlockingQueue<Integer> activeParts = new ArrayBlockingQueue<Integer>(parallelDegree);
|
||||||
|
Queue<Part> toRetry = new ConcurrentLinkedQueue<Part>();
|
||||||
|
SortedMap<Integer, String> etags = new ConcurrentSkipListMap<Integer, String>();
|
||||||
|
CountDownLatch latch = new CountDownLatch(effectiveParts);
|
||||||
|
int part;
|
||||||
|
while ((part = algorithm.getNextPart()) <= parts) {
|
||||||
|
Integer partKey = new Integer(part);
|
||||||
|
activeParts.put(partKey);
|
||||||
|
prepareUploadPart(container, key, uploadId, partKey, payload,
|
||||||
|
algorithm.getNextChunkOffset(), chunkSize, etags,
|
||||||
|
activeParts, futureParts, errors, maxRetries, errorMap, toRetry, latch);
|
||||||
|
}
|
||||||
|
if (remaining > 0) {
|
||||||
|
Integer partKey = new Integer(part);
|
||||||
|
activeParts.put(partKey);
|
||||||
|
prepareUploadPart(container, key, uploadId, partKey, payload,
|
||||||
|
algorithm.getNextChunkOffset(), remaining, etags,
|
||||||
|
activeParts, futureParts, errors, maxRetries, errorMap, toRetry, latch);
|
||||||
|
}
|
||||||
|
latch.await(1, TimeUnit.MINUTES);
|
||||||
|
// handling retries
|
||||||
|
while (errors.get() <= maxRetries && toRetry.size() > 0) {
|
||||||
|
int atOnce = Math.min(Math.min(toRetry.size(), errors.get()), parallelDegree);
|
||||||
|
CountDownLatch retryLatch = new CountDownLatch(atOnce);
|
||||||
|
for (int i = 0; i < atOnce; i++) {
|
||||||
|
Part failedPart = toRetry.poll();
|
||||||
|
Integer partKey = new Integer(failedPart.getPart());
|
||||||
|
activeParts.put(partKey);
|
||||||
|
prepareUploadPart(container, key, uploadId, partKey, payload,
|
||||||
|
failedPart.getOffset(), failedPart.getSize(), etags,
|
||||||
|
activeParts, futureParts, errors, maxRetries, errorMap, toRetry, retryLatch);
|
||||||
|
}
|
||||||
|
retryLatch.await(1, TimeUnit.MINUTES);
|
||||||
|
}
|
||||||
|
if (errors.get() > maxRetries) {
|
||||||
|
throw new BlobRuntimeException(String.format(
|
||||||
|
"Too many failed parts: %s while multipart upload of %s to container %s with uploadId %s",
|
||||||
|
errors.get(), key, container, uploadId));
|
||||||
|
}
|
||||||
|
String eTag = client.completeMultipartUpload(container, key, uploadId, etags);
|
||||||
|
logger.debug(String.format("multipart upload of %s to container %s with uploadId %s" +
|
||||||
|
" succeffully finished with %s retries", key, container, uploadId, errors.get()));
|
||||||
|
return eTag;
|
||||||
|
} catch (Exception ex) {
|
||||||
|
RuntimeException rtex = Throwables2.getFirstThrowableOfType(ex, RuntimeException.class);
|
||||||
|
if (rtex == null) {
|
||||||
|
rtex = new RuntimeException(ex);
|
||||||
|
}
|
||||||
|
for (Map.Entry<Integer, ListenableFuture<String>> entry : futureParts.entrySet()) {
|
||||||
|
entry.getValue().cancel(false);
|
||||||
|
}
|
||||||
|
if (uploadId != null) {
|
||||||
|
client.abortMultipartUpload(container, key, uploadId);
|
||||||
|
}
|
||||||
|
throw rtex;
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
ListenableFuture<String> futureETag = ablobstore.putBlob(container, blob);
|
||||||
|
return maxTime != null ?
|
||||||
|
futureETag.get(maxTime,TimeUnit.SECONDS) : futureETag.get();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}), ioWorkerExecutor);
|
||||||
|
}
|
||||||
|
|
||||||
|
class Part {
|
||||||
|
private int part;
|
||||||
|
private long offset;
|
||||||
|
private long size;
|
||||||
|
|
||||||
|
Part(int part, long offset, long size) {
|
||||||
|
this.part = part;
|
||||||
|
this.offset = offset;
|
||||||
|
this.size = size;
|
||||||
|
}
|
||||||
|
|
||||||
|
public int getPart() {
|
||||||
|
return part;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setPart(int part) {
|
||||||
|
this.part = part;
|
||||||
|
}
|
||||||
|
|
||||||
|
public long getOffset() {
|
||||||
|
return offset;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setOffset(long offset) {
|
||||||
|
this.offset = offset;
|
||||||
|
}
|
||||||
|
|
||||||
|
public long getSize() {
|
||||||
|
return size;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setSize(long size) {
|
||||||
|
this.size = size;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -22,9 +22,7 @@ package org.jclouds.aws.s3.blobstore.strategy.internal;
|
||||||
import static com.google.common.base.Preconditions.checkNotNull;
|
import static com.google.common.base.Preconditions.checkNotNull;
|
||||||
|
|
||||||
|
|
||||||
import java.util.Map;
|
|
||||||
import java.util.SortedMap;
|
import java.util.SortedMap;
|
||||||
import java.util.concurrent.TimeoutException;
|
|
||||||
|
|
||||||
import javax.annotation.Resource;
|
import javax.annotation.Resource;
|
||||||
import javax.inject.Named;
|
import javax.inject.Named;
|
||||||
|
@ -41,7 +39,6 @@ import org.jclouds.logging.Logger;
|
||||||
import org.jclouds.s3.domain.ObjectMetadataBuilder;
|
import org.jclouds.s3.domain.ObjectMetadataBuilder;
|
||||||
import org.jclouds.util.Throwables2;
|
import org.jclouds.util.Throwables2;
|
||||||
|
|
||||||
import com.google.common.annotations.VisibleForTesting;
|
|
||||||
import com.google.common.collect.Maps;
|
import com.google.common.collect.Maps;
|
||||||
import com.google.inject.Inject;
|
import com.google.inject.Inject;
|
||||||
|
|
||||||
|
@ -62,126 +59,24 @@ public class SequentialMultipartUploadStrategy implements MultipartUploadStrateg
|
||||||
@Named(BlobStoreConstants.BLOBSTORE_LOGGER)
|
@Named(BlobStoreConstants.BLOBSTORE_LOGGER)
|
||||||
protected Logger logger = Logger.NULL;
|
protected Logger logger = Logger.NULL;
|
||||||
|
|
||||||
@VisibleForTesting
|
protected final AWSS3BlobStore ablobstore;
|
||||||
static final long DEFAULT_PART_SIZE = 33554432; // 32MB
|
protected final PayloadSlicer slicer;
|
||||||
|
|
||||||
@VisibleForTesting
|
|
||||||
static final int DEFAULT_MAGNITUDE_BASE = 100;
|
|
||||||
|
|
||||||
@Inject(optional = true)
|
|
||||||
@Named("jclouds.mpu.parts.size")
|
|
||||||
@VisibleForTesting
|
|
||||||
long defaultPartSize = DEFAULT_PART_SIZE;
|
|
||||||
|
|
||||||
@Inject(optional = true)
|
|
||||||
@Named("jclouds.mpu.parts.magnitude")
|
|
||||||
@VisibleForTesting
|
|
||||||
int magnitudeBase = DEFAULT_MAGNITUDE_BASE;
|
|
||||||
|
|
||||||
private final AWSS3BlobStore ablobstore;
|
|
||||||
private final PayloadSlicer slicer;
|
|
||||||
|
|
||||||
// calculated only once, but not from the constructor
|
|
||||||
private volatile long parts; // required number of parts with chunkSize
|
|
||||||
private volatile long chunkSize;
|
|
||||||
private volatile long remaining; // number of bytes remained for the last part
|
|
||||||
|
|
||||||
// sequentially updated values
|
|
||||||
private volatile int part;
|
|
||||||
private volatile long chunkOffset;
|
|
||||||
private volatile long copied;
|
|
||||||
|
|
||||||
@Inject
|
@Inject
|
||||||
public SequentialMultipartUploadStrategy(AWSS3BlobStore ablobstore, PayloadSlicer slicer) {
|
public SequentialMultipartUploadStrategy(AWSS3BlobStore ablobstore, PayloadSlicer slicer) {
|
||||||
this.ablobstore = checkNotNull(ablobstore, "ablobstore");
|
this.ablobstore = checkNotNull(ablobstore, "ablobstore");
|
||||||
this.slicer = checkNotNull(slicer, "slicer");
|
this.slicer = checkNotNull(slicer, "slicer");
|
||||||
}
|
}
|
||||||
|
|
||||||
@VisibleForTesting
|
protected void prepareUploadPart(String container, String key, String uploadId, int part,
|
||||||
protected long calculateChunkSize(long length) {
|
Payload payload, long offset, long size, SortedMap<Integer, String> etags) {
|
||||||
long unitPartSize = defaultPartSize; // first try with default part size
|
AWSS3Client client = (AWSS3Client) ablobstore.getContext()
|
||||||
long parts = length / unitPartSize;
|
.getProviderSpecificContext().getApi();
|
||||||
long partSize = unitPartSize;
|
Payload chunkedPart = slicer.slice(payload, offset, size);
|
||||||
int magnitude = (int) (parts / magnitudeBase);
|
|
||||||
if (magnitude > 0) {
|
|
||||||
partSize = magnitude * unitPartSize;
|
|
||||||
if (partSize > MAX_PART_SIZE) {
|
|
||||||
partSize = MAX_PART_SIZE;
|
|
||||||
unitPartSize = MAX_PART_SIZE;
|
|
||||||
}
|
|
||||||
parts = length / partSize;
|
|
||||||
if (parts * partSize < length) {
|
|
||||||
partSize = (magnitude + 1) * unitPartSize;
|
|
||||||
if (partSize > MAX_PART_SIZE) {
|
|
||||||
partSize = MAX_PART_SIZE;
|
|
||||||
unitPartSize = MAX_PART_SIZE;
|
|
||||||
}
|
|
||||||
parts = length / partSize;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if (parts > MAX_NUMBER_OF_PARTS) { // if splits in too many parts or
|
|
||||||
// cannot be split
|
|
||||||
unitPartSize = MIN_PART_SIZE; // take the minimum part size
|
|
||||||
parts = length / unitPartSize;
|
|
||||||
}
|
|
||||||
if (parts > MAX_NUMBER_OF_PARTS) { // if still splits in too many parts
|
|
||||||
parts = MAX_NUMBER_OF_PARTS - 1; // limit them. do not care about not
|
|
||||||
// covering
|
|
||||||
}
|
|
||||||
long remainder = length % unitPartSize;
|
|
||||||
if (remainder == 0 && parts > 0) {
|
|
||||||
parts -= 1;
|
|
||||||
}
|
|
||||||
this.chunkSize = partSize;
|
|
||||||
this.parts = parts;
|
|
||||||
this.remaining = length - partSize * parts;
|
|
||||||
logger.debug(" %d bytes partitioned in %d parts of part size: %d, remaining: %d%s", length, parts, chunkSize,
|
|
||||||
remaining, (remaining > MAX_PART_SIZE ? " overflow!" : ""));
|
|
||||||
return this.chunkSize;
|
|
||||||
}
|
|
||||||
|
|
||||||
public long getCopied() {
|
|
||||||
return copied;
|
|
||||||
}
|
|
||||||
|
|
||||||
public void setCopied(long copied) {
|
|
||||||
this.copied = copied;
|
|
||||||
}
|
|
||||||
|
|
||||||
@VisibleForTesting
|
|
||||||
protected long getParts() {
|
|
||||||
return parts;
|
|
||||||
}
|
|
||||||
|
|
||||||
protected int getNextPart() {
|
|
||||||
return ++part;
|
|
||||||
}
|
|
||||||
|
|
||||||
protected void addCopied(long copied) {
|
|
||||||
this.copied += copied;
|
|
||||||
}
|
|
||||||
|
|
||||||
protected long getNextChunkOffset() {
|
|
||||||
long next = chunkOffset;
|
|
||||||
chunkOffset += getChunkSize();
|
|
||||||
return next;
|
|
||||||
}
|
|
||||||
|
|
||||||
@VisibleForTesting
|
|
||||||
protected long getChunkSize() {
|
|
||||||
return chunkSize;
|
|
||||||
}
|
|
||||||
|
|
||||||
@VisibleForTesting
|
|
||||||
protected long getRemaining() {
|
|
||||||
return remaining;
|
|
||||||
}
|
|
||||||
|
|
||||||
private String prepareUploadPart(AWSS3Client client, String container, String key, String uploadId, int part,
|
|
||||||
Payload chunkedPart) {
|
|
||||||
String eTag = null;
|
String eTag = null;
|
||||||
try {
|
try {
|
||||||
eTag = client.uploadPart(container, key, part, uploadId, chunkedPart);
|
eTag = client.uploadPart(container, key, part, uploadId, chunkedPart);
|
||||||
|
etags.put(new Integer(part), eTag);
|
||||||
} catch (KeyNotFoundException e) {
|
} catch (KeyNotFoundException e) {
|
||||||
// note that because of eventual consistency, the upload id may not be present yet
|
// note that because of eventual consistency, the upload id may not be present yet
|
||||||
// we may wish to add this condition to the retry handler
|
// we may wish to add this condition to the retry handler
|
||||||
|
@ -189,15 +84,18 @@ public class SequentialMultipartUploadStrategy implements MultipartUploadStrateg
|
||||||
// we may also choose to implement ListParts and wait for the uploadId to become
|
// we may also choose to implement ListParts and wait for the uploadId to become
|
||||||
// available there.
|
// available there.
|
||||||
eTag = client.uploadPart(container, key, part, uploadId, chunkedPart);
|
eTag = client.uploadPart(container, key, part, uploadId, chunkedPart);
|
||||||
|
etags.put(new Integer(part), eTag);
|
||||||
}
|
}
|
||||||
return eTag;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public String execute(String container, Blob blob) {
|
public String execute(String container, Blob blob) {
|
||||||
String key = blob.getMetadata().getName();
|
String key = blob.getMetadata().getName();
|
||||||
calculateChunkSize(blob.getPayload().getContentMetadata().getContentLength());
|
Payload payload = blob.getPayload();
|
||||||
long parts = getParts();
|
MultipartUploadSlicingAlgorithm algorithm = new MultipartUploadSlicingAlgorithm();
|
||||||
|
algorithm.calculateChunkSize(payload.getContentMetadata().getContentLength());
|
||||||
|
int parts = algorithm.getParts();
|
||||||
|
long chunkSize = algorithm.getChunkSize();
|
||||||
if (parts > 0) {
|
if (parts > 0) {
|
||||||
AWSS3Client client = (AWSS3Client) ablobstore.getContext()
|
AWSS3Client client = (AWSS3Client) ablobstore.getContext()
|
||||||
.getProviderSpecificContext().getApi();
|
.getProviderSpecificContext().getApi();
|
||||||
|
@ -206,18 +104,14 @@ public class SequentialMultipartUploadStrategy implements MultipartUploadStrateg
|
||||||
try {
|
try {
|
||||||
SortedMap<Integer, String> etags = Maps.newTreeMap();
|
SortedMap<Integer, String> etags = Maps.newTreeMap();
|
||||||
int part;
|
int part;
|
||||||
while ((part = getNextPart()) <= getParts()) {
|
while ((part = algorithm.getNextPart()) <= parts) {
|
||||||
String eTag = prepareUploadPart(client, container, key,
|
prepareUploadPart(container, key, uploadId, part, payload,
|
||||||
uploadId, part, slicer.slice(blob.getPayload(),
|
algorithm.getNextChunkOffset(), chunkSize, etags);
|
||||||
getNextChunkOffset(), chunkSize));
|
|
||||||
etags.put(new Integer(part), eTag);
|
|
||||||
}
|
}
|
||||||
long remaining = getRemaining();
|
long remaining = algorithm.getRemaining();
|
||||||
if (remaining > 0) {
|
if (remaining > 0) {
|
||||||
String eTag = prepareUploadPart(client, container, key,
|
prepareUploadPart(container, key, uploadId, part, payload,
|
||||||
uploadId, part, slicer.slice(blob.getPayload(),
|
algorithm.getNextChunkOffset(), remaining, etags);
|
||||||
getNextChunkOffset(), remaining));
|
|
||||||
etags.put(new Integer(part), eTag);
|
|
||||||
}
|
}
|
||||||
return client.completeMultipartUpload(container, key, uploadId, etags);
|
return client.completeMultipartUpload(container, key, uploadId, etags);
|
||||||
} catch (Exception ex) {
|
} catch (Exception ex) {
|
||||||
|
|
|
@ -19,13 +19,7 @@
|
||||||
|
|
||||||
package org.jclouds.aws.s3.blobstore.strategy.internal;
|
package org.jclouds.aws.s3.blobstore.strategy.internal;
|
||||||
|
|
||||||
import static org.easymock.classextension.EasyMock.createMock;
|
|
||||||
import static org.easymock.classextension.EasyMock.replay;
|
|
||||||
import static org.easymock.classextension.EasyMock.verify;
|
|
||||||
|
|
||||||
import org.jclouds.aws.s3.blobstore.AWSS3BlobStore;
|
|
||||||
import org.jclouds.aws.s3.blobstore.strategy.MultipartUploadStrategy;
|
import org.jclouds.aws.s3.blobstore.strategy.MultipartUploadStrategy;
|
||||||
import org.jclouds.io.PayloadSlicer;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Print out on the console some graph data regarding the partitioning algorithm.
|
* Print out on the console some graph data regarding the partitioning algorithm.
|
||||||
|
@ -34,46 +28,37 @@ import org.jclouds.io.PayloadSlicer;
|
||||||
*/
|
*/
|
||||||
public class MpuGraphData {
|
public class MpuGraphData {
|
||||||
|
|
||||||
private static void calculate(long length, SequentialMultipartUploadStrategy strategy) {
|
private static void calculate(long length, MultipartUploadSlicingAlgorithm algorithm) {
|
||||||
System.out.println("" + length + " " + strategy.getParts() + " "
|
System.out.println("" + length + " " + algorithm.getParts() + " "
|
||||||
+ strategy.calculateChunkSize(length) + " " + + strategy.getRemaining());
|
+ algorithm.calculateChunkSize(length) + " " + + algorithm.getRemaining());
|
||||||
}
|
}
|
||||||
|
|
||||||
private static void foreach(long from, long to1, long to2, long to3, SequentialMultipartUploadStrategy strategy) {
|
private static void foreach(long from, long to1, long to2, long to3, MultipartUploadSlicingAlgorithm algorithm) {
|
||||||
long i = 0L, step = 1L;
|
long i = 0L, step = 1L;
|
||||||
System.out.println("=== {" + from + "," + to1 + "} ===");
|
System.out.println("=== {" + from + "," + to1 + "} ===");
|
||||||
for (; i < to1 - from; step += i, i += step) {
|
for (; i < to1 - from; step += i, i += step) {
|
||||||
calculate(i + from, strategy);
|
calculate(i + from, algorithm);
|
||||||
}
|
}
|
||||||
calculate(to1, strategy);
|
calculate(to1, algorithm);
|
||||||
System.out.println("=== {" + (to1 + 1) + "," + to2 + "} ===");
|
System.out.println("=== {" + (to1 + 1) + "," + to2 + "} ===");
|
||||||
for (; i < to2 - to1; step += i / 20, i += step) {
|
for (; i < to2 - to1; step += i / 20, i += step) {
|
||||||
calculate(i + from, strategy);
|
calculate(i + from, algorithm);
|
||||||
}
|
}
|
||||||
calculate(to2, strategy);
|
calculate(to2, algorithm);
|
||||||
System.out.println("=== {" + (to2 + 1) + "," + to3 + "} ===");
|
System.out.println("=== {" + (to2 + 1) + "," + to3 + "} ===");
|
||||||
for (; i < to3 - to2; step += i / 40, i += step) {
|
for (; i < to3 - to2; step += i / 40, i += step) {
|
||||||
calculate(i + from, strategy);
|
calculate(i + from, algorithm);
|
||||||
}
|
}
|
||||||
calculate(to3, strategy);
|
calculate(to3, algorithm);
|
||||||
}
|
}
|
||||||
|
|
||||||
public static void main(String[] args) {
|
public static void main(String[] args) {
|
||||||
AWSS3BlobStore ablobStore = createMock(AWSS3BlobStore.class);
|
MultipartUploadSlicingAlgorithm algorithm = new MultipartUploadSlicingAlgorithm();
|
||||||
PayloadSlicer slicer = createMock(PayloadSlicer.class);
|
|
||||||
|
|
||||||
replay(ablobStore);
|
|
||||||
replay(slicer);
|
|
||||||
|
|
||||||
SequentialMultipartUploadStrategy strategy = new SequentialMultipartUploadStrategy(ablobStore, slicer);
|
|
||||||
foreach(1L,
|
foreach(1L,
|
||||||
strategy.defaultPartSize * strategy.magnitudeBase,
|
algorithm.defaultPartSize * algorithm.magnitudeBase,
|
||||||
MultipartUploadStrategy.MAX_PART_SIZE * strategy.magnitudeBase,
|
MultipartUploadStrategy.MAX_PART_SIZE * algorithm.magnitudeBase,
|
||||||
MultipartUploadStrategy.MAX_PART_SIZE * MultipartUploadStrategy.MAX_NUMBER_OF_PARTS,
|
MultipartUploadStrategy.MAX_PART_SIZE * MultipartUploadStrategy.MAX_NUMBER_OF_PARTS,
|
||||||
strategy);
|
algorithm);
|
||||||
|
|
||||||
verify(ablobStore);
|
|
||||||
verify(slicer);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -19,18 +19,13 @@
|
||||||
|
|
||||||
package org.jclouds.aws.s3.blobstore.strategy.internal;
|
package org.jclouds.aws.s3.blobstore.strategy.internal;
|
||||||
|
|
||||||
import static org.easymock.classextension.EasyMock.createMock;
|
|
||||||
import static org.easymock.classextension.EasyMock.replay;
|
|
||||||
import static org.easymock.classextension.EasyMock.verify;
|
|
||||||
import static org.testng.Assert.assertEquals;
|
import static org.testng.Assert.assertEquals;
|
||||||
|
|
||||||
import org.jclouds.aws.s3.blobstore.AWSS3BlobStore;
|
|
||||||
import org.jclouds.aws.s3.blobstore.strategy.MultipartUploadStrategy;
|
import org.jclouds.aws.s3.blobstore.strategy.MultipartUploadStrategy;
|
||||||
import org.jclouds.io.PayloadSlicer;
|
|
||||||
import org.testng.annotations.Test;
|
import org.testng.annotations.Test;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Tests behavior of {@code SequentialMultipartUploadStrategy} from the perspective of
|
* Tests behavior of {@code MultipartUploadSlicingAlgorithm} from the perspective of
|
||||||
* partitioning algorithm
|
* partitioning algorithm
|
||||||
*
|
*
|
||||||
* @author Tibor Kiss
|
* @author Tibor Kiss
|
||||||
|
@ -40,100 +35,76 @@ public class MpuPartitioningAlgorithmTest {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Below 1 parts the MPU is not used.
|
* Below 1 parts the MPU is not used.
|
||||||
* When we have more than {@code SequentialMultipartUploadStrategy.DEFAULT_PART_SIZE} bytes data,
|
* When we have more than {@code MultipartUploadSlicingAlgorithm.DEFAULT_PART_SIZE} bytes data,
|
||||||
* the MPU starts to become active.
|
* the MPU starts to become active.
|
||||||
*/
|
*/
|
||||||
@Test
|
@Test
|
||||||
public void testLowerLimitFromWhereMultipartBecomeActive() {
|
public void testLowerLimitFromWhereMultipartBecomeActive() {
|
||||||
AWSS3BlobStore ablobStore = createMock(AWSS3BlobStore.class);
|
MultipartUploadSlicingAlgorithm strategy = new MultipartUploadSlicingAlgorithm();
|
||||||
PayloadSlicer slicer = createMock(PayloadSlicer.class);
|
|
||||||
|
|
||||||
replay(ablobStore);
|
|
||||||
replay(slicer);
|
|
||||||
|
|
||||||
SequentialMultipartUploadStrategy strategy = new SequentialMultipartUploadStrategy(ablobStore, slicer);
|
|
||||||
|
|
||||||
// exactly the MIN_PART_SIZE
|
// exactly the MIN_PART_SIZE
|
||||||
long length = MultipartUploadStrategy.MIN_PART_SIZE;
|
long length = MultipartUploadStrategy.MIN_PART_SIZE;
|
||||||
long chunkSize = strategy.calculateChunkSize(length);
|
long chunkSize = strategy.calculateChunkSize(length);
|
||||||
assertEquals(chunkSize, SequentialMultipartUploadStrategy.DEFAULT_PART_SIZE);
|
assertEquals(chunkSize, MultipartUploadSlicingAlgorithm.DEFAULT_PART_SIZE);
|
||||||
assertEquals(strategy.getParts(), 0);
|
assertEquals(strategy.getParts(), 0);
|
||||||
assertEquals(strategy.getRemaining(), length);
|
assertEquals(strategy.getRemaining(), length);
|
||||||
assertEquals(chunkSize * strategy.getParts() + strategy.getRemaining(), length);
|
assertEquals(chunkSize * strategy.getParts() + strategy.getRemaining(), length);
|
||||||
|
|
||||||
// below DEFAULT_PART_SIZE
|
// below DEFAULT_PART_SIZE
|
||||||
length = SequentialMultipartUploadStrategy.DEFAULT_PART_SIZE;
|
length = MultipartUploadSlicingAlgorithm.DEFAULT_PART_SIZE;
|
||||||
chunkSize = strategy.calculateChunkSize(length);
|
chunkSize = strategy.calculateChunkSize(length);
|
||||||
assertEquals(chunkSize, SequentialMultipartUploadStrategy.DEFAULT_PART_SIZE);
|
assertEquals(chunkSize, MultipartUploadSlicingAlgorithm.DEFAULT_PART_SIZE);
|
||||||
assertEquals(strategy.getParts(), 0);
|
assertEquals(strategy.getParts(), 0);
|
||||||
assertEquals(strategy.getRemaining(), length);
|
assertEquals(strategy.getRemaining(), length);
|
||||||
assertEquals(chunkSize * strategy.getParts() + strategy.getRemaining(), length);
|
assertEquals(chunkSize * strategy.getParts() + strategy.getRemaining(), length);
|
||||||
|
|
||||||
// exactly the DEFAULT_PART_SIZE
|
// exactly the DEFAULT_PART_SIZE
|
||||||
length = SequentialMultipartUploadStrategy.DEFAULT_PART_SIZE + 1;
|
length = MultipartUploadSlicingAlgorithm.DEFAULT_PART_SIZE + 1;
|
||||||
chunkSize = strategy.calculateChunkSize(length);
|
chunkSize = strategy.calculateChunkSize(length);
|
||||||
assertEquals(chunkSize, SequentialMultipartUploadStrategy.DEFAULT_PART_SIZE);
|
assertEquals(chunkSize, MultipartUploadSlicingAlgorithm.DEFAULT_PART_SIZE);
|
||||||
assertEquals(strategy.getParts(), 1);
|
assertEquals(strategy.getParts(), 1);
|
||||||
assertEquals(strategy.getRemaining(), 1);
|
assertEquals(strategy.getRemaining(), 1);
|
||||||
assertEquals(chunkSize * strategy.getParts() + strategy.getRemaining(), length);
|
assertEquals(chunkSize * strategy.getParts() + strategy.getRemaining(), length);
|
||||||
|
|
||||||
verify(ablobStore);
|
|
||||||
verify(slicer);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Phase 1 of the algorithm.
|
* Phase 1 of the algorithm.
|
||||||
* ChunkSize does not grow from a {@code MultipartUploadStrategy.DEFAULT_PART_SIZE}
|
* ChunkSize does not grow from a {@code MultipartUploadStrategy.DEFAULT_PART_SIZE}
|
||||||
* until we reach {@code SequentialMultipartUploadStrategy.MAGNITUDE_BASE} number of parts.
|
* until we reach {@code MultipartUploadSlicingAlgorithm.MAGNITUDE_BASE} number of parts.
|
||||||
*/
|
*/
|
||||||
@Test
|
@Test
|
||||||
public void testWhenChunkSizeHasToStartGrowing() {
|
public void testWhenChunkSizeHasToStartGrowing() {
|
||||||
AWSS3BlobStore ablobStore = createMock(AWSS3BlobStore.class);
|
MultipartUploadSlicingAlgorithm strategy = new MultipartUploadSlicingAlgorithm();
|
||||||
PayloadSlicer slicer = createMock(PayloadSlicer.class);
|
|
||||||
|
|
||||||
replay(ablobStore);
|
|
||||||
replay(slicer);
|
|
||||||
|
|
||||||
SequentialMultipartUploadStrategy strategy = new SequentialMultipartUploadStrategy(ablobStore, slicer);
|
|
||||||
// upper limit while we still have exactly defaultPartSize chunkSize
|
// upper limit while we still have exactly defaultPartSize chunkSize
|
||||||
long length = SequentialMultipartUploadStrategy.DEFAULT_PART_SIZE * SequentialMultipartUploadStrategy.DEFAULT_MAGNITUDE_BASE;
|
long length = MultipartUploadSlicingAlgorithm.DEFAULT_PART_SIZE * MultipartUploadSlicingAlgorithm.DEFAULT_MAGNITUDE_BASE;
|
||||||
long chunkSize = strategy.calculateChunkSize(length);
|
long chunkSize = strategy.calculateChunkSize(length);
|
||||||
assertEquals(chunkSize, SequentialMultipartUploadStrategy.DEFAULT_PART_SIZE);
|
assertEquals(chunkSize, MultipartUploadSlicingAlgorithm.DEFAULT_PART_SIZE);
|
||||||
assertEquals(strategy.getParts(), SequentialMultipartUploadStrategy.DEFAULT_MAGNITUDE_BASE - 1);
|
assertEquals(strategy.getParts(), MultipartUploadSlicingAlgorithm.DEFAULT_MAGNITUDE_BASE - 1);
|
||||||
assertEquals(strategy.getRemaining(), SequentialMultipartUploadStrategy.DEFAULT_PART_SIZE);
|
assertEquals(strategy.getRemaining(), MultipartUploadSlicingAlgorithm.DEFAULT_PART_SIZE);
|
||||||
assertEquals(chunkSize * strategy.getParts() + strategy.getRemaining(), length);
|
assertEquals(chunkSize * strategy.getParts() + strategy.getRemaining(), length);
|
||||||
|
|
||||||
// then chunkSize is increasing
|
// then chunkSize is increasing
|
||||||
length += 1;
|
length += 1;
|
||||||
chunkSize = strategy.calculateChunkSize(length);
|
chunkSize = strategy.calculateChunkSize(length);
|
||||||
assertEquals(chunkSize, SequentialMultipartUploadStrategy.DEFAULT_PART_SIZE * 2);
|
assertEquals(chunkSize, MultipartUploadSlicingAlgorithm.DEFAULT_PART_SIZE * 2);
|
||||||
assertEquals(strategy.getParts(), SequentialMultipartUploadStrategy.DEFAULT_MAGNITUDE_BASE / 2);
|
assertEquals(strategy.getParts(), MultipartUploadSlicingAlgorithm.DEFAULT_MAGNITUDE_BASE / 2);
|
||||||
assertEquals(strategy.getRemaining(), 1);
|
assertEquals(strategy.getRemaining(), 1);
|
||||||
assertEquals(chunkSize * strategy.getParts() + strategy.getRemaining(), length);
|
assertEquals(chunkSize * strategy.getParts() + strategy.getRemaining(), length);
|
||||||
|
|
||||||
verify(ablobStore);
|
|
||||||
verify(slicer);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Phase 2 of the algorithm.
|
* Phase 2 of the algorithm.
|
||||||
* The number of parts does not grow from {@code SequentialMultipartUploadStrategy.MAGNITUDE_BASE}
|
* The number of parts does not grow from {@code MultipartUploadSlicingAlgorithm.MAGNITUDE_BASE}
|
||||||
* until we reach the {@code MultipartUploadStrategy.MAX_PART_SIZE}.
|
* until we reach the {@code MultipartUploadStrategy.MAX_PART_SIZE}.
|
||||||
*/
|
*/
|
||||||
@Test
|
@Test
|
||||||
public void testWhenPartsHasToStartGrowingFromMagnitudeBase() {
|
public void testWhenPartsHasToStartGrowingFromMagnitudeBase() {
|
||||||
AWSS3BlobStore ablobStore = createMock(AWSS3BlobStore.class);
|
MultipartUploadSlicingAlgorithm strategy = new MultipartUploadSlicingAlgorithm();
|
||||||
PayloadSlicer slicer = createMock(PayloadSlicer.class);
|
|
||||||
|
|
||||||
replay(ablobStore);
|
|
||||||
replay(slicer);
|
|
||||||
|
|
||||||
SequentialMultipartUploadStrategy strategy = new SequentialMultipartUploadStrategy(ablobStore, slicer);
|
|
||||||
// upper limit while we still have exactly MAGNITUDE_BASE parts (together with the remaining)
|
// upper limit while we still have exactly MAGNITUDE_BASE parts (together with the remaining)
|
||||||
long length = MultipartUploadStrategy.MAX_PART_SIZE * SequentialMultipartUploadStrategy.DEFAULT_MAGNITUDE_BASE;
|
long length = MultipartUploadStrategy.MAX_PART_SIZE * MultipartUploadSlicingAlgorithm.DEFAULT_MAGNITUDE_BASE;
|
||||||
long chunkSize = strategy.calculateChunkSize(length);
|
long chunkSize = strategy.calculateChunkSize(length);
|
||||||
assertEquals(chunkSize, MultipartUploadStrategy.MAX_PART_SIZE);
|
assertEquals(chunkSize, MultipartUploadStrategy.MAX_PART_SIZE);
|
||||||
assertEquals(strategy.getParts(), SequentialMultipartUploadStrategy.DEFAULT_MAGNITUDE_BASE - 1);
|
assertEquals(strategy.getParts(), MultipartUploadSlicingAlgorithm.DEFAULT_MAGNITUDE_BASE - 1);
|
||||||
assertEquals(strategy.getRemaining(), MultipartUploadStrategy.MAX_PART_SIZE);
|
assertEquals(strategy.getRemaining(), MultipartUploadStrategy.MAX_PART_SIZE);
|
||||||
assertEquals(chunkSize * strategy.getParts() + strategy.getRemaining(), length);
|
assertEquals(chunkSize * strategy.getParts() + strategy.getRemaining(), length);
|
||||||
|
|
||||||
|
@ -141,12 +112,9 @@ public class MpuPartitioningAlgorithmTest {
|
||||||
length += 1;
|
length += 1;
|
||||||
chunkSize = strategy.calculateChunkSize(length);
|
chunkSize = strategy.calculateChunkSize(length);
|
||||||
assertEquals(chunkSize, MultipartUploadStrategy.MAX_PART_SIZE);
|
assertEquals(chunkSize, MultipartUploadStrategy.MAX_PART_SIZE);
|
||||||
assertEquals(strategy.getParts(), SequentialMultipartUploadStrategy.DEFAULT_MAGNITUDE_BASE);
|
assertEquals(strategy.getParts(), MultipartUploadSlicingAlgorithm.DEFAULT_MAGNITUDE_BASE);
|
||||||
assertEquals(strategy.getRemaining(), 1);
|
assertEquals(strategy.getRemaining(), 1);
|
||||||
assertEquals(chunkSize * strategy.getParts() + strategy.getRemaining(), length);
|
assertEquals(chunkSize * strategy.getParts() + strategy.getRemaining(), length);
|
||||||
|
|
||||||
verify(ablobStore);
|
|
||||||
verify(slicer);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -156,13 +124,7 @@ public class MpuPartitioningAlgorithmTest {
|
||||||
*/
|
*/
|
||||||
@Test
|
@Test
|
||||||
public void testWhenPartsExceedsMaxNumberOfParts() {
|
public void testWhenPartsExceedsMaxNumberOfParts() {
|
||||||
AWSS3BlobStore ablobStore = createMock(AWSS3BlobStore.class);
|
MultipartUploadSlicingAlgorithm strategy = new MultipartUploadSlicingAlgorithm();
|
||||||
PayloadSlicer slicer = createMock(PayloadSlicer.class);
|
|
||||||
|
|
||||||
replay(ablobStore);
|
|
||||||
replay(slicer);
|
|
||||||
|
|
||||||
SequentialMultipartUploadStrategy strategy = new SequentialMultipartUploadStrategy(ablobStore, slicer);
|
|
||||||
// upper limit while we still have exactly MAX_NUMBER_OF_PARTS parts (together with the remaining)
|
// upper limit while we still have exactly MAX_NUMBER_OF_PARTS parts (together with the remaining)
|
||||||
long length = MultipartUploadStrategy.MAX_PART_SIZE * MultipartUploadStrategy.MAX_NUMBER_OF_PARTS;
|
long length = MultipartUploadStrategy.MAX_PART_SIZE * MultipartUploadStrategy.MAX_NUMBER_OF_PARTS;
|
||||||
long chunkSize = strategy.calculateChunkSize(length);
|
long chunkSize = strategy.calculateChunkSize(length);
|
||||||
|
@ -178,8 +140,5 @@ public class MpuPartitioningAlgorithmTest {
|
||||||
assertEquals(strategy.getParts(), MultipartUploadStrategy.MAX_NUMBER_OF_PARTS);
|
assertEquals(strategy.getParts(), MultipartUploadStrategy.MAX_NUMBER_OF_PARTS);
|
||||||
assertEquals(strategy.getRemaining(), 1);
|
assertEquals(strategy.getRemaining(), 1);
|
||||||
assertEquals(chunkSize * strategy.getParts() + strategy.getRemaining(), length);
|
assertEquals(chunkSize * strategy.getParts() + strategy.getRemaining(), length);
|
||||||
|
|
||||||
verify(ablobStore);
|
|
||||||
verify(slicer);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -27,17 +27,13 @@ import static org.easymock.classextension.EasyMock.verify;
|
||||||
import static org.testng.Assert.fail;
|
import static org.testng.Assert.fail;
|
||||||
|
|
||||||
import java.util.SortedMap;
|
import java.util.SortedMap;
|
||||||
import java.util.concurrent.ExecutionException;
|
|
||||||
import java.util.concurrent.TimeUnit;
|
|
||||||
import java.util.concurrent.TimeoutException;
|
import java.util.concurrent.TimeoutException;
|
||||||
|
|
||||||
import org.easymock.EasyMock;
|
|
||||||
import org.jclouds.aws.s3.AWSS3Client;
|
import org.jclouds.aws.s3.AWSS3Client;
|
||||||
import org.jclouds.aws.s3.blobstore.AWSS3BlobStore;
|
import org.jclouds.aws.s3.blobstore.AWSS3BlobStore;
|
||||||
import org.jclouds.blobstore.BlobStoreContext;
|
import org.jclouds.blobstore.BlobStoreContext;
|
||||||
import org.jclouds.blobstore.domain.Blob;
|
import org.jclouds.blobstore.domain.Blob;
|
||||||
import org.jclouds.blobstore.domain.MutableBlobMetadata;
|
import org.jclouds.blobstore.domain.MutableBlobMetadata;
|
||||||
import org.jclouds.concurrent.Timeout;
|
|
||||||
import org.jclouds.io.MutableContentMetadata;
|
import org.jclouds.io.MutableContentMetadata;
|
||||||
import org.jclouds.io.Payload;
|
import org.jclouds.io.Payload;
|
||||||
import org.jclouds.io.PayloadSlicer;
|
import org.jclouds.io.PayloadSlicer;
|
||||||
|
@ -74,7 +70,7 @@ public class SequentialMultipartUploadStrategyTest {
|
||||||
AWSS3Client client = createMock(AWSS3Client.class);
|
AWSS3Client client = createMock(AWSS3Client.class);
|
||||||
ObjectMetadata ometa = createMock(ObjectMetadata.class);
|
ObjectMetadata ometa = createMock(ObjectMetadata.class);
|
||||||
String uploadId = "uploadId";
|
String uploadId = "uploadId";
|
||||||
long chunkSize = SequentialMultipartUploadStrategy.DEFAULT_PART_SIZE;
|
long chunkSize = MultipartUploadSlicingAlgorithm.DEFAULT_PART_SIZE;
|
||||||
long remaining = 100L;
|
long remaining = 100L;
|
||||||
SortedMap<Integer, String> etags = Maps.newTreeMap();
|
SortedMap<Integer, String> etags = Maps.newTreeMap();
|
||||||
etags.put(new Integer(1), "eTag1");
|
etags.put(new Integer(1), "eTag1");
|
||||||
|
@ -137,7 +133,7 @@ public class SequentialMultipartUploadStrategyTest {
|
||||||
AWSS3Client client = createMock(AWSS3Client.class);
|
AWSS3Client client = createMock(AWSS3Client.class);
|
||||||
ObjectMetadata ometa = createMock(ObjectMetadata.class);
|
ObjectMetadata ometa = createMock(ObjectMetadata.class);
|
||||||
String uploadId = "uploadId";
|
String uploadId = "uploadId";
|
||||||
long chunkSize = SequentialMultipartUploadStrategy.DEFAULT_PART_SIZE;
|
long chunkSize = MultipartUploadSlicingAlgorithm.DEFAULT_PART_SIZE;
|
||||||
long remaining = 100L;
|
long remaining = 100L;
|
||||||
SortedMap<Integer, String> etags = Maps.newTreeMap();
|
SortedMap<Integer, String> etags = Maps.newTreeMap();
|
||||||
etags.put(new Integer(1), "eTag1");
|
etags.put(new Integer(1), "eTag1");
|
||||||
|
|
Loading…
Reference in New Issue