mirror of https://github.com/apache/jclouds.git
JCLOUDS-40 Remove internal usage of AsyncBlobStore.
This commit is contained in:
parent
7047874ad9
commit
1bdf8a1588
|
@ -16,9 +16,12 @@
|
||||||
*/
|
*/
|
||||||
package org.jclouds.cloudfiles.blobstore;
|
package org.jclouds.cloudfiles.blobstore;
|
||||||
|
|
||||||
|
import static org.jclouds.Constants.PROPERTY_USER_THREADS;
|
||||||
|
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
|
|
||||||
import javax.inject.Inject;
|
import javax.inject.Inject;
|
||||||
|
import javax.inject.Named;
|
||||||
import javax.inject.Provider;
|
import javax.inject.Provider;
|
||||||
import javax.inject.Singleton;
|
import javax.inject.Singleton;
|
||||||
|
|
||||||
|
@ -38,9 +41,10 @@ import org.jclouds.openstack.swift.blobstore.functions.ContainerToResourceList;
|
||||||
import org.jclouds.openstack.swift.blobstore.functions.ContainerToResourceMetadata;
|
import org.jclouds.openstack.swift.blobstore.functions.ContainerToResourceMetadata;
|
||||||
import org.jclouds.openstack.swift.blobstore.functions.ObjectToBlob;
|
import org.jclouds.openstack.swift.blobstore.functions.ObjectToBlob;
|
||||||
import org.jclouds.openstack.swift.blobstore.functions.ObjectToBlobMetadata;
|
import org.jclouds.openstack.swift.blobstore.functions.ObjectToBlobMetadata;
|
||||||
|
import org.jclouds.openstack.swift.blobstore.strategy.internal.MultipartUploadStrategy;
|
||||||
|
|
||||||
import com.google.common.base.Supplier;
|
import com.google.common.base.Supplier;
|
||||||
import org.jclouds.openstack.swift.blobstore.strategy.internal.MultipartUploadStrategy;
|
import com.google.common.util.concurrent.ListeningExecutorService;
|
||||||
|
|
||||||
@Singleton
|
@Singleton
|
||||||
public class CloudFilesBlobStore extends SwiftBlobStore {
|
public class CloudFilesBlobStore extends SwiftBlobStore {
|
||||||
|
@ -48,7 +52,8 @@ public class CloudFilesBlobStore extends SwiftBlobStore {
|
||||||
private EnableCDNAndCache enableCDNAndCache;
|
private EnableCDNAndCache enableCDNAndCache;
|
||||||
|
|
||||||
@Inject
|
@Inject
|
||||||
protected CloudFilesBlobStore(BlobStoreContext context, BlobUtils blobUtils, Supplier<Location> defaultLocation,
|
protected CloudFilesBlobStore(@Named(PROPERTY_USER_THREADS) ListeningExecutorService userExecutor,
|
||||||
|
BlobStoreContext context, BlobUtils blobUtils, Supplier<Location> defaultLocation,
|
||||||
@Memoized Supplier<Set<? extends Location>> locations, CommonSwiftClient sync,
|
@Memoized Supplier<Set<? extends Location>> locations, CommonSwiftClient sync,
|
||||||
ContainerToResourceMetadata container2ResourceMd,
|
ContainerToResourceMetadata container2ResourceMd,
|
||||||
BlobStoreListContainerOptionsToListContainerOptions container2ContainerListOptions,
|
BlobStoreListContainerOptionsToListContainerOptions container2ContainerListOptions,
|
||||||
|
@ -56,9 +61,9 @@ public class CloudFilesBlobStore extends SwiftBlobStore {
|
||||||
ObjectToBlobMetadata object2BlobMd, BlobToHttpGetOptions blob2ObjectGetOptions,
|
ObjectToBlobMetadata object2BlobMd, BlobToHttpGetOptions blob2ObjectGetOptions,
|
||||||
Provider<FetchBlobMetadata> fetchBlobMetadataProvider, EnableCDNAndCache enableCDNAndCache,
|
Provider<FetchBlobMetadata> fetchBlobMetadataProvider, EnableCDNAndCache enableCDNAndCache,
|
||||||
Provider<MultipartUploadStrategy> multipartUploadStrategy) {
|
Provider<MultipartUploadStrategy> multipartUploadStrategy) {
|
||||||
super(context, blobUtils, defaultLocation, locations, sync, container2ResourceMd, container2ContainerListOptions,
|
super(userExecutor, context, blobUtils, defaultLocation, locations, sync, container2ResourceMd,
|
||||||
container2ResourceList, object2Blob, blob2Object, object2BlobMd, blob2ObjectGetOptions,
|
container2ContainerListOptions, container2ResourceList, object2Blob, blob2Object, object2BlobMd,
|
||||||
fetchBlobMetadataProvider, multipartUploadStrategy);
|
blob2ObjectGetOptions, fetchBlobMetadataProvider, multipartUploadStrategy);
|
||||||
this.enableCDNAndCache = enableCDNAndCache;
|
this.enableCDNAndCache = enableCDNAndCache;
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -18,22 +18,23 @@ package org.jclouds.openstack.swift.blobstore;
|
||||||
|
|
||||||
import static com.google.common.base.Preconditions.checkArgument;
|
import static com.google.common.base.Preconditions.checkArgument;
|
||||||
import static com.google.common.base.Preconditions.checkNotNull;
|
import static com.google.common.base.Preconditions.checkNotNull;
|
||||||
import static org.jclouds.blobstore.util.BlobStoreUtils.createParentIfNeededAsync;
|
import static org.jclouds.Constants.PROPERTY_USER_THREADS;
|
||||||
import static org.jclouds.openstack.swift.options.ListContainerOptions.Builder.withPrefix;
|
import static org.jclouds.openstack.swift.options.ListContainerOptions.Builder.withPrefix;
|
||||||
|
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
|
|
||||||
import javax.inject.Inject;
|
import javax.inject.Inject;
|
||||||
|
import javax.inject.Named;
|
||||||
import javax.inject.Provider;
|
import javax.inject.Provider;
|
||||||
import javax.inject.Singleton;
|
import javax.inject.Singleton;
|
||||||
|
|
||||||
import com.google.common.annotations.VisibleForTesting;
|
|
||||||
import org.jclouds.blobstore.BlobStoreContext;
|
import org.jclouds.blobstore.BlobStoreContext;
|
||||||
import org.jclouds.blobstore.domain.Blob;
|
import org.jclouds.blobstore.domain.Blob;
|
||||||
import org.jclouds.blobstore.domain.BlobMetadata;
|
import org.jclouds.blobstore.domain.BlobMetadata;
|
||||||
import org.jclouds.blobstore.domain.PageSet;
|
import org.jclouds.blobstore.domain.PageSet;
|
||||||
import org.jclouds.blobstore.domain.StorageMetadata;
|
import org.jclouds.blobstore.domain.StorageMetadata;
|
||||||
import org.jclouds.blobstore.domain.internal.PageSetImpl;
|
import org.jclouds.blobstore.domain.internal.PageSetImpl;
|
||||||
|
import org.jclouds.blobstore.functions.BlobName;
|
||||||
import org.jclouds.blobstore.functions.BlobToHttpGetOptions;
|
import org.jclouds.blobstore.functions.BlobToHttpGetOptions;
|
||||||
import org.jclouds.blobstore.internal.BaseBlobStore;
|
import org.jclouds.blobstore.internal.BaseBlobStore;
|
||||||
import org.jclouds.blobstore.options.CreateContainerOptions;
|
import org.jclouds.blobstore.options.CreateContainerOptions;
|
||||||
|
@ -56,13 +57,16 @@ import org.jclouds.openstack.swift.domain.ContainerMetadata;
|
||||||
import org.jclouds.openstack.swift.domain.MutableObjectInfoWithMetadata;
|
import org.jclouds.openstack.swift.domain.MutableObjectInfoWithMetadata;
|
||||||
import org.jclouds.openstack.swift.domain.ObjectInfo;
|
import org.jclouds.openstack.swift.domain.ObjectInfo;
|
||||||
|
|
||||||
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
import com.google.common.base.Function;
|
import com.google.common.base.Function;
|
||||||
import com.google.common.base.Strings;
|
import com.google.common.base.Strings;
|
||||||
import com.google.common.base.Supplier;
|
import com.google.common.base.Supplier;
|
||||||
import com.google.common.collect.Iterables;
|
import com.google.common.collect.Iterables;
|
||||||
|
import com.google.common.util.concurrent.ListeningExecutorService;
|
||||||
|
|
||||||
@Singleton
|
@Singleton
|
||||||
public class SwiftBlobStore extends BaseBlobStore {
|
public class SwiftBlobStore extends BaseBlobStore {
|
||||||
|
private final ListeningExecutorService userExecutor;
|
||||||
private final CommonSwiftClient sync;
|
private final CommonSwiftClient sync;
|
||||||
private final ContainerToResourceMetadata container2ResourceMd;
|
private final ContainerToResourceMetadata container2ResourceMd;
|
||||||
private final BlobStoreListContainerOptionsToListContainerOptions container2ContainerListOptions;
|
private final BlobStoreListContainerOptionsToListContainerOptions container2ContainerListOptions;
|
||||||
|
@ -75,7 +79,8 @@ public class SwiftBlobStore extends BaseBlobStore {
|
||||||
private final Provider<MultipartUploadStrategy> multipartUploadStrategy;
|
private final Provider<MultipartUploadStrategy> multipartUploadStrategy;
|
||||||
|
|
||||||
@Inject
|
@Inject
|
||||||
protected SwiftBlobStore(BlobStoreContext context, BlobUtils blobUtils, Supplier<Location> defaultLocation,
|
protected SwiftBlobStore(@Named(PROPERTY_USER_THREADS) ListeningExecutorService userExecutor,
|
||||||
|
BlobStoreContext context, BlobUtils blobUtils, Supplier<Location> defaultLocation,
|
||||||
@Memoized Supplier<Set<? extends Location>> locations, CommonSwiftClient sync,
|
@Memoized Supplier<Set<? extends Location>> locations, CommonSwiftClient sync,
|
||||||
ContainerToResourceMetadata container2ResourceMd,
|
ContainerToResourceMetadata container2ResourceMd,
|
||||||
BlobStoreListContainerOptionsToListContainerOptions container2ContainerListOptions,
|
BlobStoreListContainerOptionsToListContainerOptions container2ContainerListOptions,
|
||||||
|
@ -84,6 +89,7 @@ public class SwiftBlobStore extends BaseBlobStore {
|
||||||
Provider<FetchBlobMetadata> fetchBlobMetadataProvider,
|
Provider<FetchBlobMetadata> fetchBlobMetadataProvider,
|
||||||
Provider<MultipartUploadStrategy> multipartUploadStrategy) {
|
Provider<MultipartUploadStrategy> multipartUploadStrategy) {
|
||||||
super(context, blobUtils, defaultLocation, locations);
|
super(context, blobUtils, defaultLocation, locations);
|
||||||
|
this.userExecutor = userExecutor;
|
||||||
this.sync = sync;
|
this.sync = sync;
|
||||||
this.container2ResourceMd = container2ResourceMd;
|
this.container2ResourceMd = container2ResourceMd;
|
||||||
this.container2ContainerListOptions = container2ContainerListOptions;
|
this.container2ContainerListOptions = container2ContainerListOptions;
|
||||||
|
@ -196,10 +202,30 @@ public class SwiftBlobStore extends BaseBlobStore {
|
||||||
*/
|
*/
|
||||||
@Override
|
@Override
|
||||||
public String putBlob(String container, Blob blob) {
|
public String putBlob(String container, Blob blob) {
|
||||||
createParentIfNeededAsync(context.getAsyncBlobStore(), container, blob);
|
createParentIfNeededAsync(container, blob);
|
||||||
return sync.putObject(container, blob2Object.apply(blob));
|
return sync.putObject(container, blob2Object.apply(blob));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private static final BlobName blobName = new BlobName();
|
||||||
|
|
||||||
|
/** Legacy behavior which will not be carried forward in new blobstores. */
|
||||||
|
private void createParentIfNeededAsync(final String containerName, Blob blob) {
|
||||||
|
checkNotNull(containerName, "container");
|
||||||
|
checkNotNull(blob, "blob");
|
||||||
|
final String name = blobName.apply(blob);
|
||||||
|
if (name.indexOf('/') > 0) {
|
||||||
|
userExecutor.submit(new Runnable() {
|
||||||
|
@Override public void run() {
|
||||||
|
createDirectory(containerName, parseDirectoryFromPath(name));
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private static String parseDirectoryFromPath(String path) {
|
||||||
|
return checkNotNull(path, "path").substring(0, path.lastIndexOf('/'));
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* This implementation invokes {@link CommonSwiftClient#putObject}
|
* This implementation invokes {@link CommonSwiftClient#putObject}
|
||||||
*
|
*
|
||||||
|
|
|
@ -19,11 +19,13 @@ package org.jclouds.blobstore.strategy.internal;
|
||||||
import static com.google.common.base.Preconditions.checkState;
|
import static com.google.common.base.Preconditions.checkState;
|
||||||
import static org.jclouds.concurrent.FutureIterables.transformParallel;
|
import static org.jclouds.concurrent.FutureIterables.transformParallel;
|
||||||
|
|
||||||
|
import java.util.concurrent.Callable;
|
||||||
|
|
||||||
import javax.annotation.Resource;
|
import javax.annotation.Resource;
|
||||||
import javax.inject.Named;
|
import javax.inject.Named;
|
||||||
|
|
||||||
import org.jclouds.Constants;
|
import org.jclouds.Constants;
|
||||||
import org.jclouds.blobstore.AsyncBlobStore;
|
import org.jclouds.blobstore.BlobStore;
|
||||||
import org.jclouds.blobstore.domain.BlobMetadata;
|
import org.jclouds.blobstore.domain.BlobMetadata;
|
||||||
import org.jclouds.blobstore.domain.PageSet;
|
import org.jclouds.blobstore.domain.PageSet;
|
||||||
import org.jclouds.blobstore.domain.StorageMetadata;
|
import org.jclouds.blobstore.domain.StorageMetadata;
|
||||||
|
@ -49,7 +51,7 @@ import com.google.inject.Inject;
|
||||||
public class FetchBlobMetadata implements Function<PageSet<? extends StorageMetadata>, PageSet<? extends StorageMetadata>> {
|
public class FetchBlobMetadata implements Function<PageSet<? extends StorageMetadata>, PageSet<? extends StorageMetadata>> {
|
||||||
|
|
||||||
protected final BackoffLimitedRetryHandler retryHandler;
|
protected final BackoffLimitedRetryHandler retryHandler;
|
||||||
protected final AsyncBlobStore ablobstore;
|
protected final BlobStore blobstore;
|
||||||
protected final ListeningExecutorService userExecutor;
|
protected final ListeningExecutorService userExecutor;
|
||||||
@Resource
|
@Resource
|
||||||
@Named(BlobStoreConstants.BLOBSTORE_LOGGER)
|
@Named(BlobStoreConstants.BLOBSTORE_LOGGER)
|
||||||
|
@ -64,10 +66,10 @@ public class FetchBlobMetadata implements Function<PageSet<? extends StorageMeta
|
||||||
protected Long maxTime;
|
protected Long maxTime;
|
||||||
|
|
||||||
@Inject
|
@Inject
|
||||||
FetchBlobMetadata(@Named(Constants.PROPERTY_USER_THREADS) ListeningExecutorService userExecutor, AsyncBlobStore ablobstore,
|
FetchBlobMetadata(@Named(Constants.PROPERTY_USER_THREADS) ListeningExecutorService userExecutor, BlobStore blobstore,
|
||||||
BackoffLimitedRetryHandler retryHandler) {
|
BackoffLimitedRetryHandler retryHandler) {
|
||||||
this.userExecutor = userExecutor;
|
this.userExecutor = userExecutor;
|
||||||
this.ablobstore = ablobstore;
|
this.blobstore = blobstore;
|
||||||
this.retryHandler = retryHandler;
|
this.retryHandler = retryHandler;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -89,8 +91,12 @@ public class FetchBlobMetadata implements Function<PageSet<? extends StorageMeta
|
||||||
}), new Function<StorageMetadata, ListenableFuture<? extends BlobMetadata>>() {
|
}), new Function<StorageMetadata, ListenableFuture<? extends BlobMetadata>>() {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public ListenableFuture<BlobMetadata> apply(StorageMetadata from) {
|
public ListenableFuture<BlobMetadata> apply(final StorageMetadata from) {
|
||||||
return ablobstore.blobMetadata(container, from.getName());
|
return userExecutor.submit(new Callable<BlobMetadata>() {
|
||||||
|
@Override public BlobMetadata call() throws Exception {
|
||||||
|
return blobstore.blobMetadata(container, from.getName());
|
||||||
|
}
|
||||||
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
}, userExecutor, maxTime, logger, String.format("getting metadata from containerName: %s", container)));
|
}, userExecutor, maxTime, logger, String.format("getting metadata from containerName: %s", container)));
|
||||||
|
|
|
@ -18,12 +18,14 @@ package org.jclouds.blobstore.strategy.internal;
|
||||||
|
|
||||||
import static org.jclouds.concurrent.FutureIterables.transformParallel;
|
import static org.jclouds.concurrent.FutureIterables.transformParallel;
|
||||||
|
|
||||||
|
import java.util.concurrent.Callable;
|
||||||
|
|
||||||
import javax.annotation.Resource;
|
import javax.annotation.Resource;
|
||||||
import javax.inject.Named;
|
import javax.inject.Named;
|
||||||
import javax.inject.Singleton;
|
import javax.inject.Singleton;
|
||||||
|
|
||||||
import org.jclouds.Constants;
|
import org.jclouds.Constants;
|
||||||
import org.jclouds.blobstore.AsyncBlobStore;
|
import org.jclouds.blobstore.BlobStore;
|
||||||
import org.jclouds.blobstore.domain.Blob;
|
import org.jclouds.blobstore.domain.Blob;
|
||||||
import org.jclouds.blobstore.domain.BlobMetadata;
|
import org.jclouds.blobstore.domain.BlobMetadata;
|
||||||
import org.jclouds.blobstore.options.ListContainerOptions;
|
import org.jclouds.blobstore.options.ListContainerOptions;
|
||||||
|
@ -47,7 +49,7 @@ public class GetAllBlobsInListAndRetryOnFailure implements GetBlobsInListStrateg
|
||||||
|
|
||||||
protected final ListBlobsInContainer getAllBlobMetadata;
|
protected final ListBlobsInContainer getAllBlobMetadata;
|
||||||
protected final BackoffLimitedRetryHandler retryHandler;
|
protected final BackoffLimitedRetryHandler retryHandler;
|
||||||
protected final AsyncBlobStore ablobstore;
|
protected final BlobStore blobstore;
|
||||||
protected final ListeningExecutorService userExecutor;
|
protected final ListeningExecutorService userExecutor;
|
||||||
@Resource
|
@Resource
|
||||||
@Named(BlobStoreConstants.BLOBSTORE_LOGGER)
|
@Named(BlobStoreConstants.BLOBSTORE_LOGGER)
|
||||||
|
@ -61,9 +63,9 @@ public class GetAllBlobsInListAndRetryOnFailure implements GetBlobsInListStrateg
|
||||||
|
|
||||||
@Inject
|
@Inject
|
||||||
GetAllBlobsInListAndRetryOnFailure(@Named(Constants.PROPERTY_USER_THREADS) ListeningExecutorService userExecutor,
|
GetAllBlobsInListAndRetryOnFailure(@Named(Constants.PROPERTY_USER_THREADS) ListeningExecutorService userExecutor,
|
||||||
ListBlobsInContainer getAllBlobMetadata, AsyncBlobStore ablobstore, BackoffLimitedRetryHandler retryHandler) {
|
ListBlobsInContainer getAllBlobMetadata, BlobStore blobstore, BackoffLimitedRetryHandler retryHandler) {
|
||||||
this.userExecutor = userExecutor;
|
this.userExecutor = userExecutor;
|
||||||
this.ablobstore = ablobstore;
|
this.blobstore = blobstore;
|
||||||
this.getAllBlobMetadata = getAllBlobMetadata;
|
this.getAllBlobMetadata = getAllBlobMetadata;
|
||||||
this.retryHandler = retryHandler;
|
this.retryHandler = retryHandler;
|
||||||
}
|
}
|
||||||
|
@ -73,8 +75,12 @@ public class GetAllBlobsInListAndRetryOnFailure implements GetBlobsInListStrateg
|
||||||
return transformParallel(list, new Function<BlobMetadata, ListenableFuture<? extends Blob>>() {
|
return transformParallel(list, new Function<BlobMetadata, ListenableFuture<? extends Blob>>() {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public ListenableFuture<Blob> apply(BlobMetadata from) {
|
public ListenableFuture<Blob> apply(final BlobMetadata from) {
|
||||||
return ablobstore.getBlob(container, from.getName());
|
return userExecutor.submit(new Callable<Blob>() {
|
||||||
|
@Override public Blob call() throws Exception {
|
||||||
|
return blobstore.getBlob(container, from.getName());
|
||||||
|
}
|
||||||
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
}, userExecutor, maxTime, logger, String.format("getting from containerName: %s", container), retryHandler, 3);
|
}, userExecutor, maxTime, logger, String.format("getting from containerName: %s", container), retryHandler, 3);
|
||||||
|
|
|
@ -21,6 +21,7 @@ import static org.jclouds.concurrent.FutureIterables.awaitCompletion;
|
||||||
|
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
|
import java.util.concurrent.Callable;
|
||||||
import java.util.concurrent.TimeoutException;
|
import java.util.concurrent.TimeoutException;
|
||||||
|
|
||||||
import javax.annotation.Resource;
|
import javax.annotation.Resource;
|
||||||
|
@ -28,7 +29,6 @@ import javax.inject.Named;
|
||||||
import javax.inject.Singleton;
|
import javax.inject.Singleton;
|
||||||
|
|
||||||
import org.jclouds.Constants;
|
import org.jclouds.Constants;
|
||||||
import org.jclouds.blobstore.AsyncBlobStore;
|
|
||||||
import org.jclouds.blobstore.BlobStore;
|
import org.jclouds.blobstore.BlobStore;
|
||||||
import org.jclouds.blobstore.internal.BlobRuntimeException;
|
import org.jclouds.blobstore.internal.BlobRuntimeException;
|
||||||
import org.jclouds.blobstore.reference.BlobStoreConstants;
|
import org.jclouds.blobstore.reference.BlobStoreConstants;
|
||||||
|
@ -63,7 +63,6 @@ import com.google.inject.Inject;
|
||||||
@Singleton
|
@Singleton
|
||||||
public class MarkersDeleteDirectoryStrategy implements DeleteDirectoryStrategy {
|
public class MarkersDeleteDirectoryStrategy implements DeleteDirectoryStrategy {
|
||||||
|
|
||||||
private final AsyncBlobStore ablobstore;
|
|
||||||
private final BlobStore blobstore;
|
private final BlobStore blobstore;
|
||||||
private final ListeningExecutorService userExecutor;
|
private final ListeningExecutorService userExecutor;
|
||||||
@Resource
|
@Resource
|
||||||
|
@ -77,23 +76,26 @@ public class MarkersDeleteDirectoryStrategy implements DeleteDirectoryStrategy {
|
||||||
protected Long maxTime;
|
protected Long maxTime;
|
||||||
|
|
||||||
@Inject
|
@Inject
|
||||||
MarkersDeleteDirectoryStrategy(
|
MarkersDeleteDirectoryStrategy(@Named(Constants.PROPERTY_USER_THREADS) ListeningExecutorService userExecutor,
|
||||||
@Named(Constants.PROPERTY_USER_THREADS) ListeningExecutorService userExecutor,
|
BlobStore blobstore) {
|
||||||
AsyncBlobStore ablobstore, BlobStore blobstore) {
|
|
||||||
this.userExecutor = userExecutor;
|
this.userExecutor = userExecutor;
|
||||||
this.ablobstore = ablobstore;
|
|
||||||
this.blobstore = blobstore;
|
this.blobstore = blobstore;
|
||||||
}
|
}
|
||||||
|
|
||||||
public void execute(String containerName, String directory) {
|
public void execute(final String containerName, String directory) {
|
||||||
Set<String> names = Sets.newHashSet();
|
Set<String> names = Sets.newHashSet();
|
||||||
names.add(directory);
|
names.add(directory);
|
||||||
for (String suffix : BlobStoreConstants.DIRECTORY_SUFFIXES) {
|
for (String suffix : BlobStoreConstants.DIRECTORY_SUFFIXES) {
|
||||||
names.add(directory + suffix);
|
names.add(directory + suffix);
|
||||||
}
|
}
|
||||||
Map<String, ListenableFuture<?>> responses = Maps.newHashMap();
|
Map<String, ListenableFuture<?>> responses = Maps.newHashMap();
|
||||||
for (String name : names) {
|
for (final String name : names) {
|
||||||
responses.put(name, ablobstore.removeBlob(containerName, name));
|
responses.put(name, userExecutor.submit(new Callable<Void>() {
|
||||||
|
@Override public Void call() throws Exception {
|
||||||
|
blobstore.removeBlob(containerName, name);
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
}));
|
||||||
}
|
}
|
||||||
String message = String.format("deleting directory %s in containerName: %s", directory,
|
String message = String.format("deleting directory %s in containerName: %s", directory,
|
||||||
containerName);
|
containerName);
|
||||||
|
|
|
@ -20,6 +20,7 @@ import static com.google.common.base.Throwables.propagate;
|
||||||
import static org.jclouds.concurrent.FutureIterables.awaitCompletion;
|
import static org.jclouds.concurrent.FutureIterables.awaitCompletion;
|
||||||
|
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
import java.util.concurrent.Callable;
|
||||||
import java.util.concurrent.TimeoutException;
|
import java.util.concurrent.TimeoutException;
|
||||||
|
|
||||||
import javax.annotation.Resource;
|
import javax.annotation.Resource;
|
||||||
|
@ -27,7 +28,7 @@ import javax.inject.Named;
|
||||||
import javax.inject.Singleton;
|
import javax.inject.Singleton;
|
||||||
|
|
||||||
import org.jclouds.Constants;
|
import org.jclouds.Constants;
|
||||||
import org.jclouds.blobstore.AsyncBlobStore;
|
import org.jclouds.blobstore.BlobStore;
|
||||||
import org.jclouds.blobstore.domain.Blob;
|
import org.jclouds.blobstore.domain.Blob;
|
||||||
import org.jclouds.blobstore.internal.BlobRuntimeException;
|
import org.jclouds.blobstore.internal.BlobRuntimeException;
|
||||||
import org.jclouds.blobstore.reference.BlobStoreConstants;
|
import org.jclouds.blobstore.reference.BlobStoreConstants;
|
||||||
|
@ -42,7 +43,7 @@ import com.google.inject.Inject;
|
||||||
@Singleton
|
@Singleton
|
||||||
public class PutBlobsStrategyImpl implements PutBlobsStrategy {
|
public class PutBlobsStrategyImpl implements PutBlobsStrategy {
|
||||||
|
|
||||||
private final AsyncBlobStore ablobstore;
|
private final BlobStore blobstore;
|
||||||
private final ListeningExecutorService userExecutor;
|
private final ListeningExecutorService userExecutor;
|
||||||
@Resource
|
@Resource
|
||||||
@Named(BlobStoreConstants.BLOBSTORE_LOGGER)
|
@Named(BlobStoreConstants.BLOBSTORE_LOGGER)
|
||||||
|
@ -56,16 +57,20 @@ public class PutBlobsStrategyImpl implements PutBlobsStrategy {
|
||||||
|
|
||||||
@Inject
|
@Inject
|
||||||
PutBlobsStrategyImpl(@Named(Constants.PROPERTY_USER_THREADS) ListeningExecutorService userExecutor,
|
PutBlobsStrategyImpl(@Named(Constants.PROPERTY_USER_THREADS) ListeningExecutorService userExecutor,
|
||||||
AsyncBlobStore ablobstore) {
|
BlobStore blobstore) {
|
||||||
this.userExecutor = userExecutor;
|
this.userExecutor = userExecutor;
|
||||||
this.ablobstore = ablobstore;
|
this.blobstore = blobstore;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void execute(String containerName, Iterable<? extends Blob> blobs) {
|
public void execute(final String containerName, Iterable<? extends Blob> blobs) {
|
||||||
Map<Blob, ListenableFuture<?>> responses = Maps.newLinkedHashMap();
|
Map<Blob, ListenableFuture<?>> responses = Maps.newLinkedHashMap();
|
||||||
for (Blob blob : blobs) {
|
for (final Blob blob : blobs) {
|
||||||
responses.put(blob, ablobstore.putBlob(containerName, blob));
|
responses.put(blob, userExecutor.submit(new Callable<Object>() {
|
||||||
|
@Override public Object call() throws Exception {
|
||||||
|
return blobstore.putBlob(containerName, blob);
|
||||||
|
}
|
||||||
|
}));
|
||||||
}
|
}
|
||||||
Map<Blob, Exception> exceptions;
|
Map<Blob, Exception> exceptions;
|
||||||
try {
|
try {
|
||||||
|
|
|
@ -23,18 +23,13 @@ import java.util.Map;
|
||||||
import java.util.regex.Matcher;
|
import java.util.regex.Matcher;
|
||||||
import java.util.regex.Pattern;
|
import java.util.regex.Pattern;
|
||||||
|
|
||||||
import org.jclouds.blobstore.AsyncBlobStore;
|
|
||||||
import org.jclouds.blobstore.domain.Blob;
|
|
||||||
import org.jclouds.blobstore.domain.MutableBlobMetadata;
|
import org.jclouds.blobstore.domain.MutableBlobMetadata;
|
||||||
import org.jclouds.blobstore.domain.internal.MutableBlobMetadataImpl;
|
import org.jclouds.blobstore.domain.internal.MutableBlobMetadataImpl;
|
||||||
import org.jclouds.blobstore.functions.BlobName;
|
|
||||||
import org.jclouds.http.HttpRequest;
|
import org.jclouds.http.HttpRequest;
|
||||||
import org.jclouds.http.HttpRequestFilter;
|
import org.jclouds.http.HttpRequestFilter;
|
||||||
import org.jclouds.rest.internal.GeneratedHttpRequest;
|
import org.jclouds.rest.internal.GeneratedHttpRequest;
|
||||||
|
|
||||||
import com.google.common.collect.Maps;
|
import com.google.common.collect.Maps;
|
||||||
import com.google.common.util.concurrent.Futures;
|
|
||||||
import com.google.common.util.concurrent.ListenableFuture;
|
|
||||||
|
|
||||||
public class BlobStoreUtils {
|
public class BlobStoreUtils {
|
||||||
public static <T> HttpRequest cleanRequest(HttpRequest returnVal) {
|
public static <T> HttpRequest cleanRequest(HttpRequest returnVal) {
|
||||||
|
@ -45,10 +40,6 @@ public class BlobStoreUtils {
|
||||||
.headers(returnVal.getHeaders()).payload(returnVal.getPayload()).build();
|
.headers(returnVal.getHeaders()).payload(returnVal.getPayload()).build();
|
||||||
}
|
}
|
||||||
|
|
||||||
public static String parseDirectoryFromPath(String path) {
|
|
||||||
return checkNotNull(path, "path").substring(0, path.lastIndexOf('/'));
|
|
||||||
}
|
|
||||||
|
|
||||||
private static Pattern keyFromContainer = Pattern.compile("/?[^/]+/(.*)");
|
private static Pattern keyFromContainer = Pattern.compile("/?[^/]+/(.*)");
|
||||||
|
|
||||||
public static String getNameFor(GeneratedHttpRequest request) {
|
public static String getNameFor(GeneratedHttpRequest request) {
|
||||||
|
@ -71,21 +62,6 @@ public class BlobStoreUtils {
|
||||||
return objectKey;
|
return objectKey;
|
||||||
}
|
}
|
||||||
|
|
||||||
private static final BlobName blobName = new BlobName();
|
|
||||||
|
|
||||||
public static ListenableFuture<Void> createParentIfNeededAsync(AsyncBlobStore asyncBlobStore, String container,
|
|
||||||
Blob blob) {
|
|
||||||
checkNotNull(asyncBlobStore, "asyncBlobStore");
|
|
||||||
checkNotNull(container, "container");
|
|
||||||
|
|
||||||
String name = blobName.apply(blob);
|
|
||||||
if (name.indexOf('/') > 0) {
|
|
||||||
return asyncBlobStore.createDirectory(container, parseDirectoryFromPath(name));
|
|
||||||
} else {
|
|
||||||
return Futures.immediateFuture(null);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
public static MutableBlobMetadata copy(MutableBlobMetadata in) {
|
public static MutableBlobMetadata copy(MutableBlobMetadata in) {
|
||||||
MutableBlobMetadata metadata = new MutableBlobMetadataImpl(in);
|
MutableBlobMetadata metadata = new MutableBlobMetadataImpl(in);
|
||||||
convertUserMetadataKeysToLowercase(metadata);
|
convertUserMetadataKeysToLowercase(metadata);
|
||||||
|
|
|
@ -16,11 +16,6 @@
|
||||||
*/
|
*/
|
||||||
package org.jclouds.blobstore.util;
|
package org.jclouds.blobstore.util;
|
||||||
|
|
||||||
import static org.easymock.EasyMock.createMock;
|
|
||||||
import static org.easymock.EasyMock.expect;
|
|
||||||
import static org.easymock.EasyMock.replay;
|
|
||||||
import static org.easymock.EasyMock.verify;
|
|
||||||
import static org.jclouds.blobstore.util.BlobStoreUtils.createParentIfNeededAsync;
|
|
||||||
import static org.jclouds.blobstore.util.BlobStoreUtils.getNameFor;
|
import static org.jclouds.blobstore.util.BlobStoreUtils.getNameFor;
|
||||||
import static org.jclouds.reflect.Reflection2.method;
|
import static org.jclouds.reflect.Reflection2.method;
|
||||||
import static org.testng.Assert.assertEquals;
|
import static org.testng.Assert.assertEquals;
|
||||||
|
@ -28,83 +23,16 @@ import static org.testng.Assert.assertEquals;
|
||||||
import java.net.URI;
|
import java.net.URI;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
|
||||||
import org.jclouds.blobstore.AsyncBlobStore;
|
|
||||||
import org.jclouds.blobstore.domain.Blob;
|
|
||||||
import org.jclouds.blobstore.domain.MutableBlobMetadata;
|
|
||||||
import org.jclouds.reflect.Invocation;
|
import org.jclouds.reflect.Invocation;
|
||||||
import org.jclouds.rest.internal.GeneratedHttpRequest;
|
import org.jclouds.rest.internal.GeneratedHttpRequest;
|
||||||
import org.testng.annotations.Test;
|
import org.testng.annotations.Test;
|
||||||
|
|
||||||
import com.google.common.base.Throwables;
|
import com.google.common.base.Throwables;
|
||||||
import com.google.common.collect.ImmutableList;
|
import com.google.common.collect.ImmutableList;
|
||||||
/**
|
|
||||||
* Tests behavior of {@code BlobStoreUtils}
|
|
||||||
*/
|
|
||||||
@Test(groups = "unit")
|
@Test(groups = "unit")
|
||||||
public class BlobStoreUtilsTest {
|
public class BlobStoreUtilsTest {
|
||||||
|
|
||||||
public void testCreateParentIfNeededAsyncNoPath() {
|
|
||||||
AsyncBlobStore asyncBlobStore = createMock(AsyncBlobStore.class);
|
|
||||||
String container = "container";
|
|
||||||
Blob blob = createMock(Blob.class);
|
|
||||||
MutableBlobMetadata md = createMock(MutableBlobMetadata.class);
|
|
||||||
|
|
||||||
expect(blob.getMetadata()).andReturn(md).atLeastOnce();
|
|
||||||
expect(md.getName()).andReturn("hello").atLeastOnce();
|
|
||||||
|
|
||||||
replay(asyncBlobStore);
|
|
||||||
replay(blob);
|
|
||||||
replay(md);
|
|
||||||
|
|
||||||
createParentIfNeededAsync(asyncBlobStore, container, blob);
|
|
||||||
|
|
||||||
verify(asyncBlobStore);
|
|
||||||
verify(blob);
|
|
||||||
verify(md);
|
|
||||||
}
|
|
||||||
|
|
||||||
public void testCreateParentIfNeededAsyncSinglePath() {
|
|
||||||
AsyncBlobStore asyncBlobStore = createMock(AsyncBlobStore.class);
|
|
||||||
String container = "container";
|
|
||||||
Blob blob = createMock(Blob.class);
|
|
||||||
MutableBlobMetadata md = createMock(MutableBlobMetadata.class);
|
|
||||||
|
|
||||||
expect(blob.getMetadata()).andReturn(md).atLeastOnce();
|
|
||||||
expect(md.getName()).andReturn("rootpath/hello").atLeastOnce();
|
|
||||||
expect(asyncBlobStore.createDirectory("container", "rootpath")).andReturn(null);
|
|
||||||
|
|
||||||
replay(asyncBlobStore);
|
|
||||||
replay(blob);
|
|
||||||
replay(md);
|
|
||||||
|
|
||||||
createParentIfNeededAsync(asyncBlobStore, container, blob);
|
|
||||||
|
|
||||||
verify(asyncBlobStore);
|
|
||||||
verify(blob);
|
|
||||||
verify(md);
|
|
||||||
}
|
|
||||||
|
|
||||||
public void testCreateParentIfNeededAsyncNestedPath() {
|
|
||||||
AsyncBlobStore asyncBlobStore = createMock(AsyncBlobStore.class);
|
|
||||||
String container = "container";
|
|
||||||
Blob blob = createMock(Blob.class);
|
|
||||||
MutableBlobMetadata md = createMock(MutableBlobMetadata.class);
|
|
||||||
|
|
||||||
expect(blob.getMetadata()).andReturn(md).atLeastOnce();
|
|
||||||
expect(md.getName()).andReturn("rootpath/subpath/hello").atLeastOnce();
|
|
||||||
expect(asyncBlobStore.createDirectory("container", "rootpath/subpath")).andReturn(null);
|
|
||||||
|
|
||||||
replay(asyncBlobStore);
|
|
||||||
replay(blob);
|
|
||||||
replay(md);
|
|
||||||
|
|
||||||
createParentIfNeededAsync(asyncBlobStore, container, blob);
|
|
||||||
|
|
||||||
verify(asyncBlobStore);
|
|
||||||
verify(blob);
|
|
||||||
verify(md);
|
|
||||||
}
|
|
||||||
|
|
||||||
public void testGetKeyForAzureS3AndRackspace() {
|
public void testGetKeyForAzureS3AndRackspace() {
|
||||||
GeneratedHttpRequest request = requestForEndpointAndArgs(
|
GeneratedHttpRequest request = requestForEndpointAndArgs(
|
||||||
"https://jclouds.blob.core.windows.net/adriancole-blobstore0/five",
|
"https://jclouds.blob.core.windows.net/adriancole-blobstore0/five",
|
||||||
|
|
|
@ -33,7 +33,6 @@ import java.util.UUID;
|
||||||
import org.jclouds.aws.AWSResponseException;
|
import org.jclouds.aws.AWSResponseException;
|
||||||
import org.jclouds.aws.domain.Region;
|
import org.jclouds.aws.domain.Region;
|
||||||
import org.jclouds.aws.s3.domain.DeleteResult;
|
import org.jclouds.aws.s3.domain.DeleteResult;
|
||||||
import org.jclouds.blobstore.AsyncBlobStore;
|
|
||||||
import org.jclouds.blobstore.BlobStore;
|
import org.jclouds.blobstore.BlobStore;
|
||||||
import org.jclouds.blobstore.KeyNotFoundException;
|
import org.jclouds.blobstore.KeyNotFoundException;
|
||||||
import org.jclouds.blobstore.domain.Blob;
|
import org.jclouds.blobstore.domain.Blob;
|
||||||
|
@ -146,20 +145,6 @@ public class AWSS3ClientLiveTest extends S3ClientLiveTest {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public void testMultipartAsynchronouslySmallBlob() throws IOException, InterruptedException, Exception {
|
|
||||||
String containerName = getContainerName();
|
|
||||||
|
|
||||||
try {
|
|
||||||
AsyncBlobStore asyncBlobStore = view.getAsyncBlobStore();
|
|
||||||
asyncBlobStore.createContainerInLocation(null, containerName).get();
|
|
||||||
Blob blob = asyncBlobStore.blobBuilder("small").payload("small").build();
|
|
||||||
asyncBlobStore.putBlob(containerName, blob, PutOptions.Builder.multipart()).get();
|
|
||||||
|
|
||||||
} finally {
|
|
||||||
returnContainer(containerName);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
public void testPutWithReducedRedundancyStorage() throws InterruptedException {
|
public void testPutWithReducedRedundancyStorage() throws InterruptedException {
|
||||||
String containerName = getContainerName();
|
String containerName = getContainerName();
|
||||||
try {
|
try {
|
||||||
|
|
|
@ -16,9 +16,12 @@
|
||||||
*/
|
*/
|
||||||
package org.jclouds.hpcloud.objectstorage.blobstore;
|
package org.jclouds.hpcloud.objectstorage.blobstore;
|
||||||
|
|
||||||
|
import static org.jclouds.Constants.PROPERTY_USER_THREADS;
|
||||||
|
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
|
|
||||||
import javax.inject.Inject;
|
import javax.inject.Inject;
|
||||||
|
import javax.inject.Named;
|
||||||
import javax.inject.Provider;
|
import javax.inject.Provider;
|
||||||
import javax.inject.Singleton;
|
import javax.inject.Singleton;
|
||||||
|
|
||||||
|
@ -41,6 +44,7 @@ import org.jclouds.openstack.swift.blobstore.functions.ObjectToBlobMetadata;
|
||||||
import org.jclouds.openstack.swift.blobstore.strategy.internal.MultipartUploadStrategy;
|
import org.jclouds.openstack.swift.blobstore.strategy.internal.MultipartUploadStrategy;
|
||||||
|
|
||||||
import com.google.common.base.Supplier;
|
import com.google.common.base.Supplier;
|
||||||
|
import com.google.common.util.concurrent.ListeningExecutorService;
|
||||||
|
|
||||||
@Singleton
|
@Singleton
|
||||||
public class HPCloudObjectStorageBlobStore extends SwiftBlobStore {
|
public class HPCloudObjectStorageBlobStore extends SwiftBlobStore {
|
||||||
|
@ -48,17 +52,18 @@ public class HPCloudObjectStorageBlobStore extends SwiftBlobStore {
|
||||||
private EnableCDNAndCache enableCDNAndCache;
|
private EnableCDNAndCache enableCDNAndCache;
|
||||||
|
|
||||||
@Inject
|
@Inject
|
||||||
protected HPCloudObjectStorageBlobStore(BlobStoreContext context, BlobUtils blobUtils,
|
protected HPCloudObjectStorageBlobStore(@Named(PROPERTY_USER_THREADS) ListeningExecutorService userExecutor,
|
||||||
Supplier<Location> defaultLocation, @Memoized Supplier<Set<? extends Location>> locations,
|
BlobStoreContext context, BlobUtils blobUtils, Supplier<Location> defaultLocation,
|
||||||
HPCloudObjectStorageApi sync, ContainerToResourceMetadata container2ResourceMd,
|
@Memoized Supplier<Set<? extends Location>> locations, HPCloudObjectStorageApi sync,
|
||||||
BlobStoreListContainerOptionsToListContainerOptions container2ContainerListOptions,
|
ContainerToResourceMetadata container2ResourceMd,
|
||||||
ContainerToResourceList container2ResourceList, ObjectToBlob object2Blob, BlobToObject blob2Object,
|
BlobStoreListContainerOptionsToListContainerOptions container2ContainerListOptions,
|
||||||
ObjectToBlobMetadata object2BlobMd, BlobToHttpGetOptions blob2ObjectGetOptions,
|
ContainerToResourceList container2ResourceList, ObjectToBlob object2Blob, BlobToObject blob2Object,
|
||||||
Provider<FetchBlobMetadata> fetchBlobMetadataProvider, EnableCDNAndCache enableCDNAndCache,
|
ObjectToBlobMetadata object2BlobMd, BlobToHttpGetOptions blob2ObjectGetOptions,
|
||||||
Provider<MultipartUploadStrategy> multipartUploadStrategy) {
|
Provider<FetchBlobMetadata> fetchBlobMetadataProvider, EnableCDNAndCache enableCDNAndCache,
|
||||||
super(context, blobUtils, defaultLocation, locations, sync, container2ResourceMd, container2ContainerListOptions,
|
Provider<MultipartUploadStrategy> multipartUploadStrategy) {
|
||||||
container2ResourceList, object2Blob, blob2Object, object2BlobMd, blob2ObjectGetOptions,
|
super(userExecutor, context, blobUtils, defaultLocation, locations, sync, container2ResourceMd,
|
||||||
fetchBlobMetadataProvider, multipartUploadStrategy);
|
container2ContainerListOptions, container2ResourceList, object2Blob, blob2Object, object2BlobMd,
|
||||||
|
blob2ObjectGetOptions, fetchBlobMetadataProvider, multipartUploadStrategy);
|
||||||
this.enableCDNAndCache = enableCDNAndCache;
|
this.enableCDNAndCache = enableCDNAndCache;
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue