mirror of https://github.com/apache/jclouds.git
Merge pull request #573 from novel/swiftmultipart
Multipart Upload Support for Swift
This commit is contained in:
commit
e542433474
|
@ -49,6 +49,7 @@ import org.jclouds.openstack.swift.blobstore.functions.ObjectToBlobMetadata;
|
||||||
import com.google.common.base.Function;
|
import com.google.common.base.Function;
|
||||||
import com.google.common.base.Supplier;
|
import com.google.common.base.Supplier;
|
||||||
import com.google.common.util.concurrent.ListenableFuture;
|
import com.google.common.util.concurrent.ListenableFuture;
|
||||||
|
import org.jclouds.openstack.swift.blobstore.strategy.internal.AsyncMultipartUploadStrategy;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
*
|
*
|
||||||
|
@ -66,10 +67,11 @@ public class CloudFilesAsyncBlobStore extends SwiftAsyncBlobStore {
|
||||||
BlobStoreListContainerOptionsToListContainerOptions container2ContainerListOptions,
|
BlobStoreListContainerOptionsToListContainerOptions container2ContainerListOptions,
|
||||||
ContainerToResourceList container2ResourceList, ObjectToBlob object2Blob, BlobToObject blob2Object,
|
ContainerToResourceList container2ResourceList, ObjectToBlob object2Blob, BlobToObject blob2Object,
|
||||||
ObjectToBlobMetadata object2BlobMd, BlobToHttpGetOptions blob2ObjectGetOptions,
|
ObjectToBlobMetadata object2BlobMd, BlobToHttpGetOptions blob2ObjectGetOptions,
|
||||||
Provider<FetchBlobMetadata> fetchBlobMetadataProvider, EnableCDNAndCache enableCDNAndCache) {
|
Provider<FetchBlobMetadata> fetchBlobMetadataProvider, EnableCDNAndCache enableCDNAndCache,
|
||||||
|
Provider<AsyncMultipartUploadStrategy> multipartUploadStrategy) {
|
||||||
super(context, blobUtils, service, defaultLocation, locations, sync, async, container2ResourceMd,
|
super(context, blobUtils, service, defaultLocation, locations, sync, async, container2ResourceMd,
|
||||||
container2ContainerListOptions, container2ResourceList, object2Blob, blob2Object, object2BlobMd,
|
container2ContainerListOptions, container2ResourceList, object2Blob, blob2Object, object2BlobMd,
|
||||||
blob2ObjectGetOptions, fetchBlobMetadataProvider);
|
blob2ObjectGetOptions, fetchBlobMetadataProvider, multipartUploadStrategy);
|
||||||
this.enableCDNAndCache = enableCDNAndCache;
|
this.enableCDNAndCache = enableCDNAndCache;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -42,6 +42,7 @@ import org.jclouds.openstack.swift.blobstore.functions.ObjectToBlob;
|
||||||
import org.jclouds.openstack.swift.blobstore.functions.ObjectToBlobMetadata;
|
import org.jclouds.openstack.swift.blobstore.functions.ObjectToBlobMetadata;
|
||||||
|
|
||||||
import com.google.common.base.Supplier;
|
import com.google.common.base.Supplier;
|
||||||
|
import org.jclouds.openstack.swift.blobstore.strategy.internal.MultipartUploadStrategy;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
*
|
*
|
||||||
|
@ -59,10 +60,11 @@ public class CloudFilesBlobStore extends SwiftBlobStore {
|
||||||
BlobStoreListContainerOptionsToListContainerOptions container2ContainerListOptions,
|
BlobStoreListContainerOptionsToListContainerOptions container2ContainerListOptions,
|
||||||
ContainerToResourceList container2ResourceList, ObjectToBlob object2Blob, BlobToObject blob2Object,
|
ContainerToResourceList container2ResourceList, ObjectToBlob object2Blob, BlobToObject blob2Object,
|
||||||
ObjectToBlobMetadata object2BlobMd, BlobToHttpGetOptions blob2ObjectGetOptions,
|
ObjectToBlobMetadata object2BlobMd, BlobToHttpGetOptions blob2ObjectGetOptions,
|
||||||
Provider<FetchBlobMetadata> fetchBlobMetadataProvider, EnableCDNAndCache enableCDNAndCache) {
|
Provider<FetchBlobMetadata> fetchBlobMetadataProvider, EnableCDNAndCache enableCDNAndCache,
|
||||||
|
Provider<MultipartUploadStrategy> multipartUploadStrategy) {
|
||||||
super(context, blobUtils, defaultLocation, locations, sync, container2ResourceMd, container2ContainerListOptions,
|
super(context, blobUtils, defaultLocation, locations, sync, container2ResourceMd, container2ContainerListOptions,
|
||||||
container2ResourceList, object2Blob, blob2Object, object2BlobMd, blob2ObjectGetOptions,
|
container2ResourceList, object2Blob, blob2Object, object2BlobMd, blob2ObjectGetOptions,
|
||||||
fetchBlobMetadataProvider);
|
fetchBlobMetadataProvider, multipartUploadStrategy);
|
||||||
this.enableCDNAndCache = enableCDNAndCache;
|
this.enableCDNAndCache = enableCDNAndCache;
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -52,14 +52,7 @@ import org.jclouds.openstack.swift.functions.ParseObjectInfoFromHeaders;
|
||||||
import org.jclouds.openstack.swift.functions.ParseObjectInfoListFromJsonResponse;
|
import org.jclouds.openstack.swift.functions.ParseObjectInfoListFromJsonResponse;
|
||||||
import org.jclouds.openstack.swift.functions.ReturnTrueOn404FalseOn409;
|
import org.jclouds.openstack.swift.functions.ReturnTrueOn404FalseOn409;
|
||||||
import org.jclouds.openstack.swift.options.ListContainerOptions;
|
import org.jclouds.openstack.swift.options.ListContainerOptions;
|
||||||
import org.jclouds.rest.annotations.BinderParam;
|
import org.jclouds.rest.annotations.*;
|
||||||
import org.jclouds.rest.annotations.Endpoint;
|
|
||||||
import org.jclouds.rest.annotations.ExceptionParser;
|
|
||||||
import org.jclouds.rest.annotations.ParamParser;
|
|
||||||
import org.jclouds.rest.annotations.QueryParams;
|
|
||||||
import org.jclouds.rest.annotations.RequestFilters;
|
|
||||||
import org.jclouds.rest.annotations.ResponseParser;
|
|
||||||
import org.jclouds.rest.annotations.SkipEncoding;
|
|
||||||
import org.jclouds.rest.functions.ReturnVoidOnNotFoundOr404;
|
import org.jclouds.rest.functions.ReturnVoidOnNotFoundOr404;
|
||||||
|
|
||||||
import com.google.common.util.concurrent.ListenableFuture;
|
import com.google.common.util.concurrent.ListenableFuture;
|
||||||
|
@ -183,4 +176,11 @@ public interface CommonSwiftAsyncClient {
|
||||||
@Path("/{container}/{name}")
|
@Path("/{container}/{name}")
|
||||||
ListenableFuture<Void> removeObject(@PathParam("container") String container, @PathParam("name") String name);
|
ListenableFuture<Void> removeObject(@PathParam("container") String container, @PathParam("name") String name);
|
||||||
|
|
||||||
|
@PUT
|
||||||
|
@Path("/{container}/{name}")
|
||||||
|
@ResponseParser(ParseETagHeader.class)
|
||||||
|
@Headers(keys = "X-Object-Manifest", values="{container}/{name}")
|
||||||
|
ListenableFuture<String> putObjectManifest(@PathParam("container") String container,
|
||||||
|
@PathParam("name") String name);
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -26,6 +26,7 @@ import org.jclouds.blobstore.ContainerNotFoundException;
|
||||||
import org.jclouds.blobstore.domain.PageSet;
|
import org.jclouds.blobstore.domain.PageSet;
|
||||||
import org.jclouds.concurrent.Timeout;
|
import org.jclouds.concurrent.Timeout;
|
||||||
import org.jclouds.http.options.GetOptions;
|
import org.jclouds.http.options.GetOptions;
|
||||||
|
import org.jclouds.io.Payload;
|
||||||
import org.jclouds.openstack.swift.domain.AccountMetadata;
|
import org.jclouds.openstack.swift.domain.AccountMetadata;
|
||||||
import org.jclouds.openstack.swift.domain.ContainerMetadata;
|
import org.jclouds.openstack.swift.domain.ContainerMetadata;
|
||||||
import org.jclouds.openstack.swift.domain.MutableObjectInfoWithMetadata;
|
import org.jclouds.openstack.swift.domain.MutableObjectInfoWithMetadata;
|
||||||
|
@ -113,4 +114,5 @@ public interface CommonSwiftClient {
|
||||||
*/
|
*/
|
||||||
boolean objectExists(String container, String name);
|
boolean objectExists(String container, String name);
|
||||||
|
|
||||||
|
String putObjectManifest(String container, String name);
|
||||||
}
|
}
|
||||||
|
|
|
@ -55,6 +55,8 @@ import org.jclouds.openstack.swift.blobstore.functions.ContainerToResourceList;
|
||||||
import org.jclouds.openstack.swift.blobstore.functions.ContainerToResourceMetadata;
|
import org.jclouds.openstack.swift.blobstore.functions.ContainerToResourceMetadata;
|
||||||
import org.jclouds.openstack.swift.blobstore.functions.ObjectToBlob;
|
import org.jclouds.openstack.swift.blobstore.functions.ObjectToBlob;
|
||||||
import org.jclouds.openstack.swift.blobstore.functions.ObjectToBlobMetadata;
|
import org.jclouds.openstack.swift.blobstore.functions.ObjectToBlobMetadata;
|
||||||
|
import org.jclouds.openstack.swift.blobstore.strategy.internal.AsyncMultipartUploadStrategy;
|
||||||
|
import org.jclouds.openstack.swift.blobstore.strategy.internal.MultipartUploadStrategy;
|
||||||
import org.jclouds.openstack.swift.domain.ContainerMetadata;
|
import org.jclouds.openstack.swift.domain.ContainerMetadata;
|
||||||
import org.jclouds.openstack.swift.domain.MutableObjectInfoWithMetadata;
|
import org.jclouds.openstack.swift.domain.MutableObjectInfoWithMetadata;
|
||||||
import org.jclouds.openstack.swift.domain.ObjectInfo;
|
import org.jclouds.openstack.swift.domain.ObjectInfo;
|
||||||
|
@ -81,6 +83,7 @@ public class SwiftAsyncBlobStore extends BaseAsyncBlobStore {
|
||||||
private final ObjectToBlobMetadata object2BlobMd;
|
private final ObjectToBlobMetadata object2BlobMd;
|
||||||
private final BlobToHttpGetOptions blob2ObjectGetOptions;
|
private final BlobToHttpGetOptions blob2ObjectGetOptions;
|
||||||
private final Provider<FetchBlobMetadata> fetchBlobMetadataProvider;
|
private final Provider<FetchBlobMetadata> fetchBlobMetadataProvider;
|
||||||
|
private final Provider<AsyncMultipartUploadStrategy> multipartUploadStrategy;
|
||||||
|
|
||||||
@Inject
|
@Inject
|
||||||
protected SwiftAsyncBlobStore(BlobStoreContext context, BlobUtils blobUtils,
|
protected SwiftAsyncBlobStore(BlobStoreContext context, BlobUtils blobUtils,
|
||||||
|
@ -90,7 +93,8 @@ public class SwiftAsyncBlobStore extends BaseAsyncBlobStore {
|
||||||
BlobStoreListContainerOptionsToListContainerOptions container2ContainerListOptions,
|
BlobStoreListContainerOptionsToListContainerOptions container2ContainerListOptions,
|
||||||
ContainerToResourceList container2ResourceList, ObjectToBlob object2Blob, BlobToObject blob2Object,
|
ContainerToResourceList container2ResourceList, ObjectToBlob object2Blob, BlobToObject blob2Object,
|
||||||
ObjectToBlobMetadata object2BlobMd, BlobToHttpGetOptions blob2ObjectGetOptions,
|
ObjectToBlobMetadata object2BlobMd, BlobToHttpGetOptions blob2ObjectGetOptions,
|
||||||
Provider<FetchBlobMetadata> fetchBlobMetadataProvider) {
|
Provider<FetchBlobMetadata> fetchBlobMetadataProvider,
|
||||||
|
Provider<AsyncMultipartUploadStrategy> multipartUploadStrategy) {
|
||||||
super(context, blobUtils, service, defaultLocation, locations);
|
super(context, blobUtils, service, defaultLocation, locations);
|
||||||
this.sync = sync;
|
this.sync = sync;
|
||||||
this.async = async;
|
this.async = async;
|
||||||
|
@ -102,6 +106,7 @@ public class SwiftAsyncBlobStore extends BaseAsyncBlobStore {
|
||||||
this.object2BlobMd = object2BlobMd;
|
this.object2BlobMd = object2BlobMd;
|
||||||
this.blob2ObjectGetOptions = blob2ObjectGetOptions;
|
this.blob2ObjectGetOptions = blob2ObjectGetOptions;
|
||||||
this.fetchBlobMetadataProvider = checkNotNull(fetchBlobMetadataProvider, "fetchBlobMetadataProvider");
|
this.fetchBlobMetadataProvider = checkNotNull(fetchBlobMetadataProvider, "fetchBlobMetadataProvider");
|
||||||
|
this.multipartUploadStrategy = multipartUploadStrategy;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -238,8 +243,11 @@ public class SwiftAsyncBlobStore extends BaseAsyncBlobStore {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public ListenableFuture<String> putBlob(String container, Blob blob, PutOptions options) {
|
public ListenableFuture<String> putBlob(String container, Blob blob, PutOptions options) {
|
||||||
// TODO implement options
|
if (options.isMultipart()) {
|
||||||
return putBlob(container, blob);
|
return multipartUploadStrategy.get().execute(container, blob, options, blob2Object);
|
||||||
|
} else {
|
||||||
|
return putBlob(container, blob);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -50,6 +50,7 @@ import org.jclouds.openstack.swift.blobstore.functions.ContainerToResourceList;
|
||||||
import org.jclouds.openstack.swift.blobstore.functions.ContainerToResourceMetadata;
|
import org.jclouds.openstack.swift.blobstore.functions.ContainerToResourceMetadata;
|
||||||
import org.jclouds.openstack.swift.blobstore.functions.ObjectToBlob;
|
import org.jclouds.openstack.swift.blobstore.functions.ObjectToBlob;
|
||||||
import org.jclouds.openstack.swift.blobstore.functions.ObjectToBlobMetadata;
|
import org.jclouds.openstack.swift.blobstore.functions.ObjectToBlobMetadata;
|
||||||
|
import org.jclouds.openstack.swift.blobstore.strategy.internal.MultipartUploadStrategy;
|
||||||
import org.jclouds.openstack.swift.domain.ContainerMetadata;
|
import org.jclouds.openstack.swift.domain.ContainerMetadata;
|
||||||
|
|
||||||
import com.google.common.base.Function;
|
import com.google.common.base.Function;
|
||||||
|
@ -71,6 +72,7 @@ public class SwiftBlobStore extends BaseBlobStore {
|
||||||
private final ObjectToBlobMetadata object2BlobMd;
|
private final ObjectToBlobMetadata object2BlobMd;
|
||||||
private final BlobToHttpGetOptions blob2ObjectGetOptions;
|
private final BlobToHttpGetOptions blob2ObjectGetOptions;
|
||||||
private final Provider<FetchBlobMetadata> fetchBlobMetadataProvider;
|
private final Provider<FetchBlobMetadata> fetchBlobMetadataProvider;
|
||||||
|
private final Provider<MultipartUploadStrategy> multipartUploadStrategy;
|
||||||
|
|
||||||
@Inject
|
@Inject
|
||||||
protected SwiftBlobStore(BlobStoreContext context, BlobUtils blobUtils, Supplier<Location> defaultLocation,
|
protected SwiftBlobStore(BlobStoreContext context, BlobUtils blobUtils, Supplier<Location> defaultLocation,
|
||||||
|
@ -79,7 +81,8 @@ public class SwiftBlobStore extends BaseBlobStore {
|
||||||
BlobStoreListContainerOptionsToListContainerOptions container2ContainerListOptions,
|
BlobStoreListContainerOptionsToListContainerOptions container2ContainerListOptions,
|
||||||
ContainerToResourceList container2ResourceList, ObjectToBlob object2Blob, BlobToObject blob2Object,
|
ContainerToResourceList container2ResourceList, ObjectToBlob object2Blob, BlobToObject blob2Object,
|
||||||
ObjectToBlobMetadata object2BlobMd, BlobToHttpGetOptions blob2ObjectGetOptions,
|
ObjectToBlobMetadata object2BlobMd, BlobToHttpGetOptions blob2ObjectGetOptions,
|
||||||
Provider<FetchBlobMetadata> fetchBlobMetadataProvider) {
|
Provider<FetchBlobMetadata> fetchBlobMetadataProvider,
|
||||||
|
Provider<MultipartUploadStrategy> multipartUploadStrategy) {
|
||||||
super(context, blobUtils, defaultLocation, locations);
|
super(context, blobUtils, defaultLocation, locations);
|
||||||
this.sync = sync;
|
this.sync = sync;
|
||||||
this.container2ResourceMd = container2ResourceMd;
|
this.container2ResourceMd = container2ResourceMd;
|
||||||
|
@ -90,6 +93,7 @@ public class SwiftBlobStore extends BaseBlobStore {
|
||||||
this.object2BlobMd = object2BlobMd;
|
this.object2BlobMd = object2BlobMd;
|
||||||
this.blob2ObjectGetOptions = blob2ObjectGetOptions;
|
this.blob2ObjectGetOptions = blob2ObjectGetOptions;
|
||||||
this.fetchBlobMetadataProvider = checkNotNull(fetchBlobMetadataProvider, "fetchBlobMetadataProvider");
|
this.fetchBlobMetadataProvider = checkNotNull(fetchBlobMetadataProvider, "fetchBlobMetadataProvider");
|
||||||
|
this.multipartUploadStrategy = multipartUploadStrategy;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -206,8 +210,11 @@ public class SwiftBlobStore extends BaseBlobStore {
|
||||||
*/
|
*/
|
||||||
@Override
|
@Override
|
||||||
public String putBlob(String container, Blob blob, PutOptions options) {
|
public String putBlob(String container, Blob blob, PutOptions options) {
|
||||||
// TODO implement options
|
if (options.isMultipart()) {
|
||||||
return putBlob(container, blob);
|
return multipartUploadStrategy.get().execute(container, blob, options, blob2Object);
|
||||||
|
} else {
|
||||||
|
return putBlob(container, blob);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -0,0 +1,17 @@
|
||||||
|
package org.jclouds.openstack.swift.blobstore.strategy;
|
||||||
|
|
||||||
|
public interface MultipartUpload {
|
||||||
|
|
||||||
|
/* Maximum number of parts per upload */
|
||||||
|
public static final int MAX_NUMBER_OF_PARTS = 10000;
|
||||||
|
/* Maximum number of parts returned for a list parts request */
|
||||||
|
public static final int MAX_LIST_PARTS_RETURNED = 1000;
|
||||||
|
/* Maximum number of multipart uploads returned in a list multipart uploads request */
|
||||||
|
public static final int MAX_LIST_MPU_RETURNED = 1000;
|
||||||
|
|
||||||
|
/*
|
||||||
|
* part size 5 MB to 5 GB, last part can be < 5 MB
|
||||||
|
*/
|
||||||
|
public static final long MIN_PART_SIZE = 5242880L;
|
||||||
|
public static final long MAX_PART_SIZE = 5368709120L;
|
||||||
|
}
|
|
@ -0,0 +1,12 @@
|
||||||
|
package org.jclouds.openstack.swift.blobstore.strategy.internal;
|
||||||
|
|
||||||
|
import com.google.common.util.concurrent.ListenableFuture;
|
||||||
|
import com.google.inject.ImplementedBy;
|
||||||
|
import org.jclouds.blobstore.domain.Blob;
|
||||||
|
import org.jclouds.blobstore.options.PutOptions;
|
||||||
|
import org.jclouds.openstack.swift.blobstore.functions.BlobToObject;
|
||||||
|
|
||||||
|
@ImplementedBy(ParallelMultipartUploadStrategy.class)
|
||||||
|
public interface AsyncMultipartUploadStrategy {
|
||||||
|
ListenableFuture<String> execute(String container, Blob blob, PutOptions options, BlobToObject blob2Object);
|
||||||
|
}
|
|
@ -0,0 +1,152 @@
|
||||||
|
/**
|
||||||
|
* Licensed to jclouds, Inc. (jclouds) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. jclouds licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing,
|
||||||
|
* software distributed under the License is distributed on an
|
||||||
|
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||||
|
* KIND, either express or implied. See the License for the
|
||||||
|
* specific language governing permissions and limitations
|
||||||
|
* under the License.
|
||||||
|
*/
|
||||||
|
/*
|
||||||
|
* MultipartUploadSlicingAlgorithm.java
|
||||||
|
*
|
||||||
|
*
|
||||||
|
* Created by: tibor
|
||||||
|
*
|
||||||
|
* History
|
||||||
|
*
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.jclouds.openstack.swift.blobstore.strategy.internal;
|
||||||
|
|
||||||
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
|
import com.google.inject.Inject;
|
||||||
|
import org.jclouds.blobstore.reference.BlobStoreConstants;
|
||||||
|
import org.jclouds.logging.Logger;
|
||||||
|
import org.jclouds.openstack.swift.blobstore.strategy.MultipartUpload;
|
||||||
|
|
||||||
|
import javax.annotation.Resource;
|
||||||
|
import javax.inject.Named;
|
||||||
|
|
||||||
|
public class MultipartUploadSlicingAlgorithm {
|
||||||
|
|
||||||
|
@Resource
|
||||||
|
@Named(BlobStoreConstants.BLOBSTORE_LOGGER)
|
||||||
|
protected Logger logger = Logger.NULL;
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
static final long DEFAULT_PART_SIZE = 33554432; // 32MB
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
static final int DEFAULT_MAGNITUDE_BASE = 100;
|
||||||
|
|
||||||
|
@Inject(optional = true)
|
||||||
|
@Named("jclouds.mpu.parts.size")
|
||||||
|
@VisibleForTesting
|
||||||
|
long defaultPartSize = DEFAULT_PART_SIZE;
|
||||||
|
|
||||||
|
@Inject(optional = true)
|
||||||
|
@Named("jclouds.mpu.parts.magnitude")
|
||||||
|
@VisibleForTesting
|
||||||
|
int magnitudeBase = DEFAULT_MAGNITUDE_BASE;
|
||||||
|
|
||||||
|
// calculated only once, but not from the constructor
|
||||||
|
private volatile int parts; // required number of parts with chunkSize
|
||||||
|
private volatile long chunkSize;
|
||||||
|
private volatile long remaining; // number of bytes remained for the last part
|
||||||
|
|
||||||
|
// sequentially updated values
|
||||||
|
private volatile int part;
|
||||||
|
private volatile long chunkOffset;
|
||||||
|
private volatile long copied;
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
protected long calculateChunkSize(long length) {
|
||||||
|
long unitPartSize = defaultPartSize; // first try with default part size
|
||||||
|
int parts = (int)(length / unitPartSize);
|
||||||
|
long partSize = unitPartSize;
|
||||||
|
int magnitude = (int) (parts / magnitudeBase);
|
||||||
|
if (magnitude > 0) {
|
||||||
|
partSize = magnitude * unitPartSize;
|
||||||
|
if (partSize > MultipartUpload.MAX_PART_SIZE) {
|
||||||
|
partSize = MultipartUpload.MAX_PART_SIZE;
|
||||||
|
unitPartSize = MultipartUpload.MAX_PART_SIZE;
|
||||||
|
}
|
||||||
|
parts = (int)(length / partSize);
|
||||||
|
if (parts * partSize < length) {
|
||||||
|
partSize = (magnitude + 1) * unitPartSize;
|
||||||
|
if (partSize > MultipartUpload.MAX_PART_SIZE) {
|
||||||
|
partSize = MultipartUpload.MAX_PART_SIZE;
|
||||||
|
unitPartSize = MultipartUpload.MAX_PART_SIZE;
|
||||||
|
}
|
||||||
|
parts = (int)(length / partSize);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (parts > MultipartUpload.MAX_NUMBER_OF_PARTS) { // if splits in too many parts or
|
||||||
|
// cannot be split
|
||||||
|
unitPartSize = MultipartUpload.MIN_PART_SIZE; // take the minimum part size
|
||||||
|
parts = (int)(length / unitPartSize);
|
||||||
|
}
|
||||||
|
if (parts > MultipartUpload.MAX_NUMBER_OF_PARTS) { // if still splits in too many parts
|
||||||
|
parts = MultipartUpload.MAX_NUMBER_OF_PARTS - 1; // limit them. do not care about not
|
||||||
|
// covering
|
||||||
|
}
|
||||||
|
long remainder = length % unitPartSize;
|
||||||
|
if (remainder == 0 && parts > 0) {
|
||||||
|
parts -= 1;
|
||||||
|
}
|
||||||
|
this.chunkSize = partSize;
|
||||||
|
this.parts = parts;
|
||||||
|
this.remaining = length - partSize * parts;
|
||||||
|
logger.debug(" %d bytes partitioned in %d parts of part size: %d, remaining: %d%s", length, parts, chunkSize,
|
||||||
|
remaining, (remaining > MultipartUpload.MAX_PART_SIZE ? " overflow!" : ""));
|
||||||
|
return this.chunkSize;
|
||||||
|
}
|
||||||
|
|
||||||
|
public long getCopied() {
|
||||||
|
return copied;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setCopied(long copied) {
|
||||||
|
this.copied = copied;
|
||||||
|
}
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
protected int getParts() {
|
||||||
|
return parts;
|
||||||
|
}
|
||||||
|
|
||||||
|
protected int getNextPart() {
|
||||||
|
return ++part;
|
||||||
|
}
|
||||||
|
|
||||||
|
protected void addCopied(long copied) {
|
||||||
|
this.copied += copied;
|
||||||
|
}
|
||||||
|
|
||||||
|
protected long getNextChunkOffset() {
|
||||||
|
long next = chunkOffset;
|
||||||
|
chunkOffset += getChunkSize();
|
||||||
|
return next;
|
||||||
|
}
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
protected long getChunkSize() {
|
||||||
|
return chunkSize;
|
||||||
|
}
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
protected long getRemaining() {
|
||||||
|
return remaining;
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -0,0 +1,13 @@
|
||||||
|
package org.jclouds.openstack.swift.blobstore.strategy.internal;
|
||||||
|
|
||||||
|
import com.google.inject.ImplementedBy;
|
||||||
|
import org.jclouds.blobstore.domain.Blob;
|
||||||
|
import org.jclouds.blobstore.options.PutOptions;
|
||||||
|
import org.jclouds.openstack.swift.blobstore.functions.BlobToObject;
|
||||||
|
import org.jclouds.openstack.swift.blobstore.strategy.MultipartUpload;
|
||||||
|
|
||||||
|
@ImplementedBy(SequentialMultipartUploadStrategy.class)
|
||||||
|
public interface MultipartUploadStrategy extends MultipartUpload {
|
||||||
|
|
||||||
|
String execute(String container, Blob blob, PutOptions options, BlobToObject blob2Object);
|
||||||
|
}
|
|
@ -0,0 +1,268 @@
|
||||||
|
package org.jclouds.openstack.swift.blobstore.strategy.internal;
|
||||||
|
|
||||||
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
|
import com.google.common.collect.Maps;
|
||||||
|
import com.google.common.util.concurrent.ListenableFuture;
|
||||||
|
import com.google.inject.Inject;
|
||||||
|
import org.jclouds.Constants;
|
||||||
|
import org.jclouds.blobstore.domain.Blob;
|
||||||
|
import org.jclouds.blobstore.internal.BlobRuntimeException;
|
||||||
|
import org.jclouds.blobstore.options.PutOptions;
|
||||||
|
import org.jclouds.blobstore.reference.BlobStoreConstants;
|
||||||
|
import org.jclouds.concurrent.Futures;
|
||||||
|
import org.jclouds.io.Payload;
|
||||||
|
import org.jclouds.io.PayloadSlicer;
|
||||||
|
import org.jclouds.logging.Logger;
|
||||||
|
import org.jclouds.openstack.swift.CommonSwiftAsyncClient;
|
||||||
|
import org.jclouds.openstack.swift.CommonSwiftClient;
|
||||||
|
import org.jclouds.openstack.swift.blobstore.SwiftAsyncBlobStore;
|
||||||
|
import org.jclouds.openstack.swift.blobstore.functions.BlobToObject;
|
||||||
|
import org.jclouds.util.Throwables2;
|
||||||
|
|
||||||
|
import javax.annotation.Resource;
|
||||||
|
import javax.inject.Named;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.Queue;
|
||||||
|
import java.util.SortedMap;
|
||||||
|
import java.util.concurrent.*;
|
||||||
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
|
|
||||||
|
import static com.google.common.base.Preconditions.checkNotNull;
|
||||||
|
|
||||||
|
public class ParallelMultipartUploadStrategy implements AsyncMultipartUploadStrategy {
|
||||||
|
@Resource
|
||||||
|
@Named(BlobStoreConstants.BLOBSTORE_LOGGER)
|
||||||
|
protected Logger logger = Logger.NULL;
|
||||||
|
|
||||||
|
public static final String PART_SEPARATOR = "/";
|
||||||
|
@VisibleForTesting
|
||||||
|
static final int DEFAULT_PARALLEL_DEGREE = 4;
|
||||||
|
@VisibleForTesting
|
||||||
|
static final int DEFAULT_MIN_RETRIES = 5;
|
||||||
|
@VisibleForTesting
|
||||||
|
static final int DEFAULT_MAX_PERCENT_RETRIES = 10;
|
||||||
|
|
||||||
|
@Inject(optional = true)
|
||||||
|
@Named("jclouds.mpu.parallel.degree")
|
||||||
|
@VisibleForTesting
|
||||||
|
int parallelDegree = DEFAULT_PARALLEL_DEGREE;
|
||||||
|
|
||||||
|
@Inject(optional = true)
|
||||||
|
@Named("jclouds.mpu.parallel.retries.min")
|
||||||
|
@VisibleForTesting
|
||||||
|
int minRetries = DEFAULT_MIN_RETRIES;
|
||||||
|
|
||||||
|
@Inject(optional = true)
|
||||||
|
@Named("jclouds.mpu.parallel.retries.maxpercent")
|
||||||
|
@VisibleForTesting
|
||||||
|
int maxPercentRetries = DEFAULT_MAX_PERCENT_RETRIES;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* maximum duration of an blob Request
|
||||||
|
*/
|
||||||
|
@Inject(optional = true)
|
||||||
|
@Named(Constants.PROPERTY_REQUEST_TIMEOUT)
|
||||||
|
protected Long maxTime;
|
||||||
|
|
||||||
|
private final ExecutorService ioWorkerExecutor;
|
||||||
|
|
||||||
|
protected final SwiftAsyncBlobStore ablobstore;
|
||||||
|
protected final PayloadSlicer slicer;
|
||||||
|
|
||||||
|
@Inject
|
||||||
|
public ParallelMultipartUploadStrategy(SwiftAsyncBlobStore ablobstore, PayloadSlicer slicer,
|
||||||
|
@Named(Constants.PROPERTY_IO_WORKER_THREADS) ExecutorService ioWorkerExecutor) {
|
||||||
|
this.ablobstore = checkNotNull(ablobstore, "ablobstore");
|
||||||
|
this.slicer = checkNotNull(slicer, "slicer");
|
||||||
|
this.ioWorkerExecutor = checkNotNull(ioWorkerExecutor, "ioWorkerExecutor");
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
protected void prepareUploadPart(final String container, final Blob blob, final String key,
|
||||||
|
final Integer part, final Payload payload,
|
||||||
|
final long offset, final long size, final SortedMap<Integer, String> etags,
|
||||||
|
final BlockingQueue<Integer> activeParts,
|
||||||
|
final Map<Integer, ListenableFuture<String>> futureParts,
|
||||||
|
final AtomicInteger errors, final int maxRetries, final Map<Integer, Exception> errorMap,
|
||||||
|
final Queue<Part> toRetry, final CountDownLatch latch,
|
||||||
|
BlobToObject blob2Object) {
|
||||||
|
if (errors.get() > maxRetries) {
|
||||||
|
activeParts.remove(part); // remove part from the bounded-queue without blocking
|
||||||
|
latch.countDown();
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
final CommonSwiftAsyncClient client = (CommonSwiftAsyncClient) ablobstore.getContext()
|
||||||
|
.getProviderSpecificContext().getAsyncApi();
|
||||||
|
Payload chunkedPart = slicer.slice(payload, offset, size);
|
||||||
|
logger.debug(String.format("async uploading part %s of %s to container %s", part, key, container));
|
||||||
|
final long start = System.currentTimeMillis();
|
||||||
|
String blobPartName = blob.getMetadata().getName() + PART_SEPARATOR +
|
||||||
|
String.valueOf(part);
|
||||||
|
|
||||||
|
Blob blobPart = ablobstore.blobBuilder(blobPartName).payload(chunkedPart).
|
||||||
|
contentDisposition(blobPartName).build();
|
||||||
|
final ListenableFuture<String> futureETag = client.putObject(container, blob2Object.apply(blobPart));
|
||||||
|
futureETag.addListener(new Runnable() {
|
||||||
|
@Override
|
||||||
|
public void run() {
|
||||||
|
try {
|
||||||
|
etags.put(part, futureETag.get());
|
||||||
|
logger.debug(String.format("async uploaded part %s of %s to container %s in %sms",
|
||||||
|
part, key, container, (System.currentTimeMillis() - start)));
|
||||||
|
} catch (CancellationException e) {
|
||||||
|
errorMap.put(part, e);
|
||||||
|
String message = String.format("%s while uploading part %s - [%s,%s] to container %s with running since %dms",
|
||||||
|
e.getMessage(), part, offset, size, container, (System.currentTimeMillis() - start));
|
||||||
|
logger.debug(message);
|
||||||
|
} catch (Exception e) {
|
||||||
|
errorMap.put(part, e);
|
||||||
|
String message = String.format("%s while uploading part %s - [%s,%s] to container %s running since %dms",
|
||||||
|
e.getMessage(), part, offset, size, container, (System.currentTimeMillis() - start));
|
||||||
|
logger.error(message, e);
|
||||||
|
if (errors.incrementAndGet() <= maxRetries)
|
||||||
|
toRetry.add(new Part(part, offset, size));
|
||||||
|
} finally {
|
||||||
|
activeParts.remove(part); // remove part from the bounded-queue without blocking
|
||||||
|
futureParts.remove(part);
|
||||||
|
latch.countDown();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}, ioWorkerExecutor);
|
||||||
|
futureParts.put(part, futureETag);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public ListenableFuture<String> execute(final String container, final Blob blob, final PutOptions options, final BlobToObject blob2Object) {
|
||||||
|
return Futures.makeListenable(
|
||||||
|
ioWorkerExecutor.submit(new Callable<String>() {
|
||||||
|
@Override
|
||||||
|
public String call() throws Exception {
|
||||||
|
String key = blob.getMetadata().getName();
|
||||||
|
Payload payload = blob.getPayload();
|
||||||
|
MultipartUploadSlicingAlgorithm algorithm = new MultipartUploadSlicingAlgorithm();
|
||||||
|
algorithm.calculateChunkSize(payload.getContentMetadata()
|
||||||
|
.getContentLength());
|
||||||
|
int parts = algorithm.getParts();
|
||||||
|
long chunkSize = algorithm.getChunkSize();
|
||||||
|
long remaining = algorithm.getRemaining();
|
||||||
|
if (parts > 0) {
|
||||||
|
CommonSwiftClient client = (CommonSwiftClient) ablobstore
|
||||||
|
.getContext().getProviderSpecificContext().getApi();
|
||||||
|
final Map<Integer, ListenableFuture<String>> futureParts =
|
||||||
|
new ConcurrentHashMap<Integer, ListenableFuture<String>>();
|
||||||
|
final Map<Integer, Exception> errorMap = Maps.newHashMap();
|
||||||
|
AtomicInteger errors = new AtomicInteger(0);
|
||||||
|
int maxRetries = Math.max(minRetries, parts * maxPercentRetries / 100);
|
||||||
|
int effectiveParts = remaining > 0 ? parts + 1 : parts;
|
||||||
|
try {
|
||||||
|
logger.debug(String.format("initiated multipart upload of %s to container %s" +
|
||||||
|
" consisting from %s part (possible max. retries: %d)",
|
||||||
|
key, container, effectiveParts, maxRetries));
|
||||||
|
// we need a bounded-blocking queue to control the amount of parallel jobs
|
||||||
|
ArrayBlockingQueue<Integer> activeParts = new ArrayBlockingQueue<Integer>(parallelDegree);
|
||||||
|
Queue<Part> toRetry = new ConcurrentLinkedQueue<Part>();
|
||||||
|
SortedMap<Integer, String> etags = new ConcurrentSkipListMap<Integer, String>();
|
||||||
|
CountDownLatch latch = new CountDownLatch(effectiveParts);
|
||||||
|
int part;
|
||||||
|
while ((part = algorithm.getNextPart()) <= parts) {
|
||||||
|
Integer partKey = new Integer(part);
|
||||||
|
activeParts.put(partKey);
|
||||||
|
|
||||||
|
prepareUploadPart(container, blob, key, partKey, payload,
|
||||||
|
algorithm.getNextChunkOffset(), chunkSize, etags,
|
||||||
|
activeParts, futureParts, errors, maxRetries, errorMap, toRetry, latch,
|
||||||
|
blob2Object);
|
||||||
|
}
|
||||||
|
if (remaining > 0) {
|
||||||
|
Integer partKey = new Integer(part);
|
||||||
|
activeParts.put(partKey);
|
||||||
|
prepareUploadPart(container, blob, key, partKey, payload,
|
||||||
|
algorithm.getNextChunkOffset(), remaining, etags,
|
||||||
|
activeParts, futureParts, errors, maxRetries, errorMap, toRetry, latch,
|
||||||
|
blob2Object);
|
||||||
|
}
|
||||||
|
latch.await();
|
||||||
|
// handling retries
|
||||||
|
while (errors.get() <= maxRetries && toRetry.size() > 0) {
|
||||||
|
int atOnce = Math.min(Math.min(toRetry.size(), errors.get()), parallelDegree);
|
||||||
|
CountDownLatch retryLatch = new CountDownLatch(atOnce);
|
||||||
|
for (int i = 0; i < atOnce; i++) {
|
||||||
|
Part failedPart = toRetry.poll();
|
||||||
|
Integer partKey = new Integer(failedPart.getPart());
|
||||||
|
activeParts.put(partKey);
|
||||||
|
prepareUploadPart(container, blob, key, partKey, payload,
|
||||||
|
failedPart.getOffset(), failedPart.getSize(), etags,
|
||||||
|
activeParts, futureParts, errors, maxRetries, errorMap, toRetry, retryLatch,
|
||||||
|
blob2Object);
|
||||||
|
}
|
||||||
|
retryLatch.await();
|
||||||
|
}
|
||||||
|
if (errors.get() > maxRetries) {
|
||||||
|
throw new BlobRuntimeException(String.format(
|
||||||
|
"Too many failed parts: %s while multipart upload of %s to container %s",
|
||||||
|
errors.get(), key, container));
|
||||||
|
}
|
||||||
|
|
||||||
|
String eTag = client.putObjectManifest(container, key);
|
||||||
|
logger.debug(String.format("multipart upload of %s to container %s" +
|
||||||
|
" succeffully finished with %s retries", key, container, errors.get()));
|
||||||
|
return eTag;
|
||||||
|
} catch (Exception ex) {
|
||||||
|
RuntimeException rtex = Throwables2.getFirstThrowableOfType(ex, RuntimeException.class);
|
||||||
|
if (rtex == null) {
|
||||||
|
rtex = new RuntimeException(ex);
|
||||||
|
}
|
||||||
|
for (Map.Entry<Integer, ListenableFuture<String>> entry : futureParts.entrySet()) {
|
||||||
|
entry.getValue().cancel(false);
|
||||||
|
}
|
||||||
|
/*
|
||||||
|
if (uploadId != null) {
|
||||||
|
client.abortMultipartUpload(container, key, uploadId);
|
||||||
|
} */
|
||||||
|
throw rtex;
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
ListenableFuture<String> futureETag = ablobstore.putBlob(container, blob, options);
|
||||||
|
return maxTime != null ?
|
||||||
|
futureETag.get(maxTime, TimeUnit.SECONDS) : futureETag.get();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}), ioWorkerExecutor);
|
||||||
|
}
|
||||||
|
|
||||||
|
class Part {
|
||||||
|
private int part;
|
||||||
|
private long offset;
|
||||||
|
private long size;
|
||||||
|
|
||||||
|
Part(int part, long offset, long size) {
|
||||||
|
this.part = part;
|
||||||
|
this.offset = offset;
|
||||||
|
this.size = size;
|
||||||
|
}
|
||||||
|
|
||||||
|
public int getPart() {
|
||||||
|
return part;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setPart(int part) {
|
||||||
|
this.part = part;
|
||||||
|
}
|
||||||
|
|
||||||
|
public long getOffset() {
|
||||||
|
return offset;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setOffset(long offset) {
|
||||||
|
this.offset = offset;
|
||||||
|
}
|
||||||
|
|
||||||
|
public long getSize() {
|
||||||
|
return size;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setSize(long size) {
|
||||||
|
this.size = size;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,89 @@
|
||||||
|
package org.jclouds.openstack.swift.blobstore.strategy.internal;
|
||||||
|
|
||||||
|
import javax.annotation.Resource;
|
||||||
|
import javax.inject.Named;
|
||||||
|
|
||||||
|
import com.google.common.collect.Maps;
|
||||||
|
import com.google.inject.Inject;
|
||||||
|
import org.jclouds.blobstore.KeyNotFoundException;
|
||||||
|
import org.jclouds.blobstore.domain.Blob;
|
||||||
|
import org.jclouds.blobstore.options.PutOptions;
|
||||||
|
import org.jclouds.blobstore.reference.BlobStoreConstants;
|
||||||
|
import org.jclouds.io.Payload;
|
||||||
|
import org.jclouds.io.PayloadSlicer;
|
||||||
|
import org.jclouds.logging.Logger;
|
||||||
|
import org.jclouds.openstack.swift.CommonSwiftClient;
|
||||||
|
import org.jclouds.openstack.swift.SwiftClient;
|
||||||
|
import org.jclouds.openstack.swift.blobstore.SwiftBlobStore;
|
||||||
|
import org.jclouds.openstack.swift.blobstore.functions.BlobToObject;
|
||||||
|
import org.jclouds.openstack.swift.domain.SwiftObject;
|
||||||
|
import org.jclouds.util.Throwables2;
|
||||||
|
|
||||||
|
import java.util.SortedMap;
|
||||||
|
|
||||||
|
import static com.google.common.base.Preconditions.checkNotNull;
|
||||||
|
|
||||||
|
|
||||||
|
public class SequentialMultipartUploadStrategy implements MultipartUploadStrategy {
|
||||||
|
public static final String PART_SEPARATOR = "/";
|
||||||
|
|
||||||
|
@Resource
|
||||||
|
@Named(BlobStoreConstants.BLOBSTORE_LOGGER)
|
||||||
|
protected Logger logger = Logger.NULL;
|
||||||
|
|
||||||
|
protected final SwiftBlobStore ablobstore;
|
||||||
|
protected final PayloadSlicer slicer;
|
||||||
|
|
||||||
|
@Inject
|
||||||
|
public SequentialMultipartUploadStrategy(SwiftBlobStore ablobstore, PayloadSlicer slicer) {
|
||||||
|
this.ablobstore = checkNotNull(ablobstore, "ablobstore");
|
||||||
|
this.slicer = checkNotNull(slicer, "slicer");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String execute(String container, Blob blob, PutOptions options, BlobToObject blob2Object) {
|
||||||
|
String key = blob.getMetadata().getName();
|
||||||
|
Payload payload = blob.getPayload();
|
||||||
|
MultipartUploadSlicingAlgorithm algorithm = new MultipartUploadSlicingAlgorithm();
|
||||||
|
algorithm
|
||||||
|
.calculateChunkSize(checkNotNull(
|
||||||
|
payload.getContentMetadata().getContentLength(),
|
||||||
|
"contentLength required on all uploads to swift; please invoke payload.getContentMetadata().setContentLength(length) first"));
|
||||||
|
int parts = algorithm.getParts();
|
||||||
|
long chunkSize = algorithm.getChunkSize();
|
||||||
|
if (parts > 0) {
|
||||||
|
CommonSwiftClient client = (CommonSwiftClient) ablobstore.getContext()
|
||||||
|
.getProviderSpecificContext().getApi();
|
||||||
|
|
||||||
|
try {
|
||||||
|
int part;
|
||||||
|
while ((part = algorithm.getNextPart()) <= parts) {
|
||||||
|
Payload chunkedPart = slicer.slice(payload,
|
||||||
|
algorithm.getNextChunkOffset(), chunkSize);
|
||||||
|
Blob blobPart = ablobstore.blobBuilder(blob.getMetadata().getName() + PART_SEPARATOR +
|
||||||
|
String.valueOf(part)).payload(chunkedPart).contentDisposition(
|
||||||
|
blob.getMetadata().getName() + PART_SEPARATOR + String.valueOf(part)).build();
|
||||||
|
client.putObject(container, blob2Object.apply(blobPart));
|
||||||
|
}
|
||||||
|
long remaining = algorithm.getRemaining();
|
||||||
|
if (remaining > 0) {
|
||||||
|
Payload chunkedPart = slicer.slice(payload,
|
||||||
|
algorithm.getNextChunkOffset(), remaining);
|
||||||
|
Blob blobPart = ablobstore.blobBuilder(blob.getMetadata().getName() + PART_SEPARATOR +
|
||||||
|
String.valueOf(part)).payload(chunkedPart).contentDisposition(
|
||||||
|
blob.getMetadata().getName() + PART_SEPARATOR + String.valueOf(part)).build();
|
||||||
|
client.putObject(container, blob2Object.apply(blobPart));
|
||||||
|
}
|
||||||
|
return client.putObjectManifest(container, key);
|
||||||
|
} catch (Exception ex) {
|
||||||
|
RuntimeException rtex = Throwables2.getFirstThrowableOfType(ex, RuntimeException.class);
|
||||||
|
if (rtex == null) {
|
||||||
|
rtex = new RuntimeException(ex);
|
||||||
|
}
|
||||||
|
throw rtex;
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
return ablobstore.putBlob(container, blob, PutOptions.NONE);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -18,11 +18,20 @@
|
||||||
*/
|
*/
|
||||||
package org.jclouds.openstack.swift.blobstore.integration;
|
package org.jclouds.openstack.swift.blobstore.integration;
|
||||||
|
|
||||||
|
import com.google.common.io.ByteStreams;
|
||||||
|
import com.google.common.io.InputSupplier;
|
||||||
|
import org.jclouds.blobstore.BlobStore;
|
||||||
import org.jclouds.blobstore.domain.Blob;
|
import org.jclouds.blobstore.domain.Blob;
|
||||||
import org.jclouds.blobstore.integration.internal.BaseBlobIntegrationTest;
|
import org.jclouds.blobstore.integration.internal.BaseBlobIntegrationTest;
|
||||||
|
import org.jclouds.blobstore.options.PutOptions;
|
||||||
|
import org.jclouds.crypto.CryptoStreams;
|
||||||
|
import org.testng.ITestContext;
|
||||||
|
import org.testng.annotations.BeforeClass;
|
||||||
import org.testng.annotations.DataProvider;
|
import org.testng.annotations.DataProvider;
|
||||||
import org.testng.annotations.Test;
|
import org.testng.annotations.Test;
|
||||||
|
|
||||||
|
import java.io.*;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
*
|
*
|
||||||
* @author James Murty
|
* @author James Murty
|
||||||
|
@ -30,6 +39,9 @@ import org.testng.annotations.Test;
|
||||||
*/
|
*/
|
||||||
@Test(groups = "live")
|
@Test(groups = "live")
|
||||||
public class SwiftBlobIntegrationLiveTest extends BaseBlobIntegrationTest {
|
public class SwiftBlobIntegrationLiveTest extends BaseBlobIntegrationTest {
|
||||||
|
private InputSupplier<InputStream> oneHundredOneConstitutions;
|
||||||
|
private byte[] oneHundredOneConstitutionsMD5;
|
||||||
|
|
||||||
public SwiftBlobIntegrationLiveTest() {
|
public SwiftBlobIntegrationLiveTest() {
|
||||||
provider = "swift";
|
provider = "swift";
|
||||||
}
|
}
|
||||||
|
@ -39,6 +51,13 @@ public class SwiftBlobIntegrationLiveTest extends BaseBlobIntegrationTest {
|
||||||
// not supported in swift
|
// not supported in swift
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@BeforeClass(groups = { "integration", "live" }, dependsOnMethods = "setupContext")
|
||||||
|
@Override
|
||||||
|
public void setUpResourcesOnThisThread(ITestContext testContext) throws Exception {
|
||||||
|
super.setUpResourcesOnThisThread(testContext);
|
||||||
|
oneHundredOneConstitutions = getTestDataSupplier();
|
||||||
|
oneHundredOneConstitutionsMD5 = CryptoStreams.md5(oneHundredOneConstitutions);
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected void checkContentDisposition(Blob blob, String contentDisposition) {
|
protected void checkContentDisposition(Blob blob, String contentDisposition) {
|
||||||
|
@ -61,4 +80,22 @@ public class SwiftBlobIntegrationLiveTest extends BaseBlobIntegrationTest {
|
||||||
return new Object[][] { { "normal" }, { "sp ace" }, { "qu?stion" }, { "unic₪de" }, { "path/foo" }, { "colon:" },
|
return new Object[][] { { "normal" }, { "sp ace" }, { "qu?stion" }, { "unic₪de" }, { "path/foo" }, { "colon:" },
|
||||||
{ "asteri*k" }, { "{great<r}" }, { "lesst>en" }, { "p|pe" } };
|
{ "asteri*k" }, { "{great<r}" }, { "lesst>en" }, { "p|pe" } };
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void testMultipartChunkedFileStream() throws IOException, InterruptedException {
|
||||||
|
FileOutputStream fous = new FileOutputStream(new File("target/const.txt"));
|
||||||
|
ByteStreams.copy(oneHundredOneConstitutions.getInput(), fous);
|
||||||
|
fous.flush();
|
||||||
|
fous.close();
|
||||||
|
String containerName = getContainerName();
|
||||||
|
|
||||||
|
try {
|
||||||
|
BlobStore blobStore = context.getBlobStore();
|
||||||
|
blobStore.createContainerInLocation(null, containerName);
|
||||||
|
Blob blob = blobStore.blobBuilder("const.txt")
|
||||||
|
.payload(new File("target/const.txt")).build();
|
||||||
|
blobStore.putBlob(containerName, blob, PutOptions.Builder.multipart());
|
||||||
|
} finally {
|
||||||
|
returnContainer(containerName);
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -31,6 +31,7 @@ import java.util.concurrent.ExecutorService;
|
||||||
import javax.inject.Inject;
|
import javax.inject.Inject;
|
||||||
import javax.inject.Named;
|
import javax.inject.Named;
|
||||||
import javax.inject.Singleton;
|
import javax.inject.Singleton;
|
||||||
|
import javax.ws.rs.PathParam;
|
||||||
|
|
||||||
import org.jclouds.Constants;
|
import org.jclouds.Constants;
|
||||||
import org.jclouds.blobstore.TransientAsyncBlobStore;
|
import org.jclouds.blobstore.TransientAsyncBlobStore;
|
||||||
|
@ -167,7 +168,12 @@ public class StubSwiftAsyncClient implements CommonSwiftAsyncClient {
|
||||||
return blobStore.removeBlob(container, key);
|
return blobStore.removeBlob(container, key);
|
||||||
}
|
}
|
||||||
|
|
||||||
public ListenableFuture<Boolean> setObjectInfo(String container, String key, Map<String, String> userMetadata) {
|
@Override
|
||||||
|
public ListenableFuture<String> putObjectManifest(String container, String name) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
public ListenableFuture<Boolean> setObjectInfo(String container, String key, Map<String, String> userMetadata) {
|
||||||
throw new UnsupportedOperationException();
|
throw new UnsupportedOperationException();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -56,6 +56,7 @@ public interface AWSS3Client extends S3Client {
|
||||||
*/
|
*/
|
||||||
String initiateMultipartUpload(String bucketName, ObjectMetadata objectMetadata, PutObjectOptions... options);
|
String initiateMultipartUpload(String bucketName, ObjectMetadata objectMetadata, PutObjectOptions... options);
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* This operation aborts a multipart upload. After a multipart upload is aborted, no additional
|
* This operation aborts a multipart upload. After a multipart upload is aborted, no additional
|
||||||
* parts can be uploaded using that upload ID. The storage consumed by any previously uploaded
|
* parts can be uploaded using that upload ID. The storage consumed by any previously uploaded
|
||||||
|
|
|
@ -45,6 +45,7 @@ import org.jclouds.openstack.swift.blobstore.functions.ContainerToResourceList;
|
||||||
import org.jclouds.openstack.swift.blobstore.functions.ContainerToResourceMetadata;
|
import org.jclouds.openstack.swift.blobstore.functions.ContainerToResourceMetadata;
|
||||||
import org.jclouds.openstack.swift.blobstore.functions.ObjectToBlob;
|
import org.jclouds.openstack.swift.blobstore.functions.ObjectToBlob;
|
||||||
import org.jclouds.openstack.swift.blobstore.functions.ObjectToBlobMetadata;
|
import org.jclouds.openstack.swift.blobstore.functions.ObjectToBlobMetadata;
|
||||||
|
import org.jclouds.openstack.swift.blobstore.strategy.internal.AsyncMultipartUploadStrategy;
|
||||||
|
|
||||||
import com.google.common.base.Function;
|
import com.google.common.base.Function;
|
||||||
import com.google.common.base.Supplier;
|
import com.google.common.base.Supplier;
|
||||||
|
@ -66,10 +67,11 @@ public class HPCloudObjectStorageAsyncBlobStore extends SwiftAsyncBlobStore {
|
||||||
BlobStoreListContainerOptionsToListContainerOptions container2ContainerListOptions,
|
BlobStoreListContainerOptionsToListContainerOptions container2ContainerListOptions,
|
||||||
ContainerToResourceList container2ResourceList, ObjectToBlob object2Blob, BlobToObject blob2Object,
|
ContainerToResourceList container2ResourceList, ObjectToBlob object2Blob, BlobToObject blob2Object,
|
||||||
ObjectToBlobMetadata object2BlobMd, BlobToHttpGetOptions blob2ObjectGetOptions,
|
ObjectToBlobMetadata object2BlobMd, BlobToHttpGetOptions blob2ObjectGetOptions,
|
||||||
Provider<FetchBlobMetadata> fetchBlobMetadataProvider, EnableCDNAndCache enableCDNAndCache) {
|
Provider<FetchBlobMetadata> fetchBlobMetadataProvider, EnableCDNAndCache enableCDNAndCache,
|
||||||
|
Provider<AsyncMultipartUploadStrategy> multipartUploadStrategy) {
|
||||||
super(context, blobUtils, service, defaultLocation, locations, sync, async, container2ResourceMd,
|
super(context, blobUtils, service, defaultLocation, locations, sync, async, container2ResourceMd,
|
||||||
container2ContainerListOptions, container2ResourceList, object2Blob, blob2Object, object2BlobMd,
|
container2ContainerListOptions, container2ResourceList, object2Blob, blob2Object, object2BlobMd,
|
||||||
blob2ObjectGetOptions, fetchBlobMetadataProvider);
|
blob2ObjectGetOptions, fetchBlobMetadataProvider, multipartUploadStrategy);
|
||||||
this.enableCDNAndCache = enableCDNAndCache;
|
this.enableCDNAndCache = enableCDNAndCache;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -40,6 +40,7 @@ import org.jclouds.openstack.swift.blobstore.functions.ContainerToResourceList;
|
||||||
import org.jclouds.openstack.swift.blobstore.functions.ContainerToResourceMetadata;
|
import org.jclouds.openstack.swift.blobstore.functions.ContainerToResourceMetadata;
|
||||||
import org.jclouds.openstack.swift.blobstore.functions.ObjectToBlob;
|
import org.jclouds.openstack.swift.blobstore.functions.ObjectToBlob;
|
||||||
import org.jclouds.openstack.swift.blobstore.functions.ObjectToBlobMetadata;
|
import org.jclouds.openstack.swift.blobstore.functions.ObjectToBlobMetadata;
|
||||||
|
import org.jclouds.openstack.swift.blobstore.strategy.internal.MultipartUploadStrategy;
|
||||||
|
|
||||||
import com.google.common.base.Supplier;
|
import com.google.common.base.Supplier;
|
||||||
|
|
||||||
|
@ -59,10 +60,11 @@ public class HPCloudObjectStorageBlobStore extends SwiftBlobStore {
|
||||||
BlobStoreListContainerOptionsToListContainerOptions container2ContainerListOptions,
|
BlobStoreListContainerOptionsToListContainerOptions container2ContainerListOptions,
|
||||||
ContainerToResourceList container2ResourceList, ObjectToBlob object2Blob, BlobToObject blob2Object,
|
ContainerToResourceList container2ResourceList, ObjectToBlob object2Blob, BlobToObject blob2Object,
|
||||||
ObjectToBlobMetadata object2BlobMd, BlobToHttpGetOptions blob2ObjectGetOptions,
|
ObjectToBlobMetadata object2BlobMd, BlobToHttpGetOptions blob2ObjectGetOptions,
|
||||||
Provider<FetchBlobMetadata> fetchBlobMetadataProvider, EnableCDNAndCache enableCDNAndCache) {
|
Provider<FetchBlobMetadata> fetchBlobMetadataProvider, EnableCDNAndCache enableCDNAndCache,
|
||||||
|
Provider<MultipartUploadStrategy> multipartUploadStrategy) {
|
||||||
super(context, blobUtils, defaultLocation, locations, sync, container2ResourceMd, container2ContainerListOptions,
|
super(context, blobUtils, defaultLocation, locations, sync, container2ResourceMd, container2ContainerListOptions,
|
||||||
container2ResourceList, object2Blob, blob2Object, object2BlobMd, blob2ObjectGetOptions,
|
container2ResourceList, object2Blob, blob2Object, object2BlobMd, blob2ObjectGetOptions,
|
||||||
fetchBlobMetadataProvider);
|
fetchBlobMetadataProvider, multipartUploadStrategy);
|
||||||
this.enableCDNAndCache = enableCDNAndCache;
|
this.enableCDNAndCache = enableCDNAndCache;
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue