Issue 144: replaced FutureFunctionCallable, FutureFunctionWrapper, RunnableFutureTask with Futures.compose and Futures.chain

git-svn-id: http://jclouds.googlecode.com/svn/trunk@2617 3d8758e0-26b5-11de-8745-db77d3ebf521
This commit is contained in:
adrian.f.cole 2010-01-07 01:46:43 +00:00
parent 6a5a4df606
commit 1284fd67fa
34 changed files with 498 additions and 963 deletions

View File

@ -18,6 +18,8 @@
*/
package org.jclouds.atmosonline.saas.blobstore;
import static com.google.common.util.concurrent.Futures.compose;
import static com.google.common.util.concurrent.Futures.makeListenable;
import static org.jclouds.blobstore.options.ListContainerOptions.Builder.recursive;
import java.net.URI;
@ -46,7 +48,6 @@ import org.jclouds.blobstore.domain.ResourceMetadata;
import org.jclouds.blobstore.domain.Blob.Factory;
import org.jclouds.blobstore.functions.BlobToHttpGetOptions;
import org.jclouds.blobstore.strategy.ClearListStrategy;
import org.jclouds.concurrent.FutureFunctionCallable;
import org.jclouds.encryption.EncryptionService;
import org.jclouds.http.options.GetOptions;
import org.jclouds.logging.Logger.LoggerFactory;
@ -77,15 +78,13 @@ public class AtmosAsyncBlobStore extends BaseAtmosBlobStore implements AsyncBlob
* This implementation uses the AtmosStorage HEAD Object command to return the result
*/
public Future<BlobMetadata> blobMetadata(String container, String key) {
return wrapFuture(async.headFile(container + "/" + key),
return compose(makeListenable(async.headFile(container + "/" + key)),
new Function<AtmosObject, BlobMetadata>() {
@Override
public BlobMetadata apply(AtmosObject from) {
return object2BlobMd.apply(from);
}
});
}, service);
}
public Future<Void> clearContainer(final String container) {
@ -100,7 +99,8 @@ public class AtmosAsyncBlobStore extends BaseAtmosBlobStore implements AsyncBlob
}
public Future<Boolean> createContainer(String container) {
return wrapFuture(async.createDirectory(container), new Function<URI, Boolean>() {
return compose(makeListenable(async.createDirectory(container)),
new Function<URI, Boolean>() {
public Boolean apply(URI from) {
return true;// no etag
@ -110,7 +110,7 @@ public class AtmosAsyncBlobStore extends BaseAtmosBlobStore implements AsyncBlob
}
public Future<Void> createDirectory(String container, String directory) {
return wrapFuture(async.createDirectory(container + "/" + directory),
return compose(makeListenable(async.createDirectory(container + "/" + directory)),
new Function<URI, Void>() {
public Void apply(URI from) {
@ -151,11 +151,11 @@ public class AtmosAsyncBlobStore extends BaseAtmosBlobStore implements AsyncBlob
org.jclouds.blobstore.options.GetOptions... optionsList) {
GetOptions httpOptions = blob2ObjectGetOptions.apply(optionsList);
Future<AtmosObject> returnVal = async.readFile(container + "/" + key, httpOptions);
return wrapFuture(returnVal, object2Blob);
return compose(makeListenable(returnVal), object2Blob, service);
}
public Future<? extends ListResponse<? extends ResourceMetadata>> list() {
return wrapFuture(async.listDirectories(), container2ResourceList);
return compose(makeListenable(async.listDirectories()), container2ResourceList, service);
}
public Future<? extends ListContainerResponse<? extends ResourceMetadata>> list(
@ -169,7 +169,8 @@ public class AtmosAsyncBlobStore extends BaseAtmosBlobStore implements AsyncBlob
}
}
ListOptions nativeOptions = container2ContainerListOptions.apply(optionsList);
return wrapFuture(async.listDirectory(container, nativeOptions), container2ResourceList);
return compose(makeListenable(async.listDirectory(container, nativeOptions)),
container2ResourceList, service);
}
/**
@ -177,9 +178,7 @@ public class AtmosAsyncBlobStore extends BaseAtmosBlobStore implements AsyncBlob
*/
public Future<String> putBlob(final String container, final Blob blob) {
final String path = container + "/" + blob.getMetadata().getName();
Callable<String> valueCallable = new FutureFunctionCallable<Void, String>(async
.deletePath(path), new Function<Void, String>() {
return compose(makeListenable(async.deletePath(path)), new Function<Void, String>() {
public String apply(Void from) {
try {
@ -200,8 +199,7 @@ public class AtmosAsyncBlobStore extends BaseAtmosBlobStore implements AsyncBlob
}
}
});
return service.submit(valueCallable);
}, service);
}

View File

@ -21,7 +21,6 @@ package org.jclouds.atmosonline.saas.blobstore.internal;
import static com.google.common.base.Preconditions.checkNotNull;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import javax.inject.Named;
@ -36,10 +35,8 @@ import org.jclouds.blobstore.domain.Blob;
import org.jclouds.blobstore.functions.BlobToHttpGetOptions;
import org.jclouds.blobstore.reference.BlobStoreConstants;
import org.jclouds.blobstore.strategy.ClearListStrategy;
import org.jclouds.concurrent.FutureFunctionWrapper;
import org.jclouds.logging.Logger.LoggerFactory;
import com.google.common.base.Function;
import com.google.inject.Inject;
public class BaseAtmosBlobStore {
@ -83,11 +80,6 @@ public class BaseAtmosBlobStore {
this.service = checkNotNull(service, "service");
}
protected <F, T> Future<T> wrapFuture(Future<? extends F> future, Function<F, T> function) {
return new FutureFunctionWrapper<F, T>(future, function, logFactory.getLogger(function
.getClass().getName()));
}
public Blob newBlob(String name) {
Blob blob = blobFactory.create(null);
blob.getMetadata().setName(name);

View File

@ -37,13 +37,13 @@ import org.jclouds.blobstore.options.ListContainerOptions;
import org.jclouds.blobstore.reference.BlobStoreConstants;
import org.jclouds.blobstore.strategy.ClearContainerStrategy;
import org.jclouds.blobstore.strategy.ClearListStrategy;
import org.jclouds.concurrent.FutureFunctionWrapper;
import org.jclouds.logging.Logger;
import org.jclouds.util.Utils;
import com.google.common.base.Function;
import com.google.common.base.Supplier;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.Futures;
import com.google.inject.Inject;
/**
@ -88,7 +88,7 @@ public class RecursiveRemove implements ClearListStrategy, ClearContainerStrateg
for (Future<Void> isdeleted : deletes) {
isdeleted.get(requestTimeoutMilliseconds, TimeUnit.MILLISECONDS);
}
return new FutureFunctionWrapper<Void, Void>(async.deletePath(fullPath),
return Futures.compose(async.deletePath(fullPath),
new Function<Void, Void>() {
public Void apply(Void from) {

View File

@ -19,6 +19,8 @@
package org.jclouds.atmosonline.saas.internal;
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.util.concurrent.Futures.immediateFailedFuture;
import static com.google.common.util.concurrent.Futures.immediateFuture;
import java.net.URI;
import java.util.concurrent.ExecutionException;
@ -45,13 +47,13 @@ import org.jclouds.blobstore.domain.Blob;
import org.jclouds.blobstore.domain.BlobMetadata;
import org.jclouds.blobstore.functions.HttpGetOptionsListToGetOptions;
import org.jclouds.blobstore.integration.internal.StubAsyncBlobStore;
import org.jclouds.blobstore.integration.internal.StubAsyncBlobStore.FutureBase;
import org.jclouds.concurrent.FutureFunctionWrapper;
import org.jclouds.http.options.GetOptions;
import org.jclouds.logging.Logger.LoggerFactory;
import org.jclouds.util.Utils;
import org.jclouds.rest.ResourceNotFoundException;
import com.google.common.base.Function;
import com.google.common.base.Throwables;
import com.google.common.collect.Iterables;
import com.google.common.util.concurrent.Futures;
/**
* Implementation of {@link AtmosStorageAsyncClient} which keeps all data in a local Map object.
@ -62,7 +64,6 @@ import com.google.common.base.Function;
public class StubAtmosStorageAsyncClient implements AtmosStorageAsyncClient {
private final HttpGetOptionsListToGetOptions httpGetOptionsConverter;
private final StubAsyncBlobStore blobStore;
private final LoggerFactory logFactory;
private final AtmosObject.Factory objectProvider;
private final ObjectToBlob object2Blob;
private final BlobToObject blob2Object;
@ -71,13 +72,12 @@ public class StubAtmosStorageAsyncClient implements AtmosStorageAsyncClient {
private final ResourceMetadataListToDirectoryEntryList resource2ObjectList;
@Inject
private StubAtmosStorageAsyncClient(StubAsyncBlobStore blobStore, LoggerFactory logFactory,
private StubAtmosStorageAsyncClient(StubAsyncBlobStore blobStore,
AtmosObject.Factory objectProvider,
HttpGetOptionsListToGetOptions httpGetOptionsConverter, ObjectToBlob object2Blob,
BlobToObject blob2Object, BlobMetadataToObject blob2ObjectInfo,
ListOptionsToBlobStoreListOptions container2ContainerListOptions,
ResourceMetadataListToDirectoryEntryList resource2ContainerList) {
this.logFactory = logFactory;
this.blobStore = blobStore;
this.objectProvider = objectProvider;
this.httpGetOptionsConverter = httpGetOptionsConverter;
@ -89,18 +89,14 @@ public class StubAtmosStorageAsyncClient implements AtmosStorageAsyncClient {
this.resource2ObjectList = checkNotNull(resource2ContainerList, "resource2ContainerList");
}
protected <F, T> Future<T> wrapFuture(Future<? extends F> future, Function<F, T> function) {
return new FutureFunctionWrapper<F, T>(future, function, logFactory.getLogger(function
.getClass().getName()));
}
public Future<URI> createDirectory(String directoryName) {
final String container;
if (directoryName.indexOf('/') != -1)
container = directoryName.substring(0, directoryName.indexOf('/'));
else
container = directoryName;
return wrapFuture(blobStore.createContainer(container), new Function<Boolean, URI>() {
return Futures.compose(Futures.makeListenable(blobStore.createContainer(container)),
new Function<Boolean, URI>() {
public URI apply(Boolean from) {
return URI.create("http://stub/containers/" + container);
@ -121,7 +117,8 @@ public class StubAtmosStorageAsyncClient implements AtmosStorageAsyncClient {
object.getContentMetadata().setName(path + "/" + file);
}
Blob blob = object2Blob.apply(object);
return wrapFuture(blobStore.putBlob(container, blob), new Function<String, URI>() {
return Futures.compose(Futures.makeListenable(blobStore.putBlob(container, blob)),
new Function<String, URI>() {
public URI apply(String from) {
return URI.create(uri);
@ -132,7 +129,8 @@ public class StubAtmosStorageAsyncClient implements AtmosStorageAsyncClient {
public Future<Void> deletePath(String path) {
if (path.indexOf('/') == -1)
return wrapFuture(blobStore.deleteContainerImpl(path), new Function<Boolean, Void>() {
return Futures.compose(Futures.makeListenable(blobStore.deleteContainerImpl(path)),
new Function<Boolean, Void>() {
public Void apply(Boolean from) {
return null;
@ -156,7 +154,7 @@ public class StubAtmosStorageAsyncClient implements AtmosStorageAsyncClient {
else {
String container = path.substring(0, path.indexOf('/'));
path = path.substring(path.indexOf('/') + 1);
return wrapFuture(blobStore.blobMetadata(container, path),
return Futures.compose(Futures.makeListenable(blobStore.blobMetadata(container, path)),
new Function<BlobMetadata, UserMetadata>() {
public UserMetadata apply(BlobMetadata from) {
return blob2ObjectInfo.apply(from).getUserMetadata();
@ -169,10 +167,10 @@ public class StubAtmosStorageAsyncClient implements AtmosStorageAsyncClient {
String container = path.substring(0, path.indexOf('/'));
path = path.substring(path.indexOf('/') + 1);
try {
return wrapFuture(blobStore.getBlob(container, path), blob2Object);
return Futures.compose(Futures.makeListenable(blobStore.getBlob(container, path)),
blob2Object);
} catch (Exception e) {
Utils.<KeyNotFoundException> rethrowIfRuntimeOrSameType(e);
throw new RuntimeException(e);
return immediateFailedFuture(Throwables.getRootCause(e));
}
}
@ -180,7 +178,7 @@ public class StubAtmosStorageAsyncClient implements AtmosStorageAsyncClient {
ListOptions... optionsList) {
// org.jclouds.blobstore.options.ListOptions options = container2ContainerListOptions
// .apply(optionsList);
return wrapFuture(blobStore.list(), resource2ObjectList);
return Futures.compose(Futures.makeListenable(blobStore.list()), resource2ObjectList);
}
public Future<? extends BoundedSortedSet<? extends DirectoryEntry>> listDirectory(
@ -194,7 +192,8 @@ public class StubAtmosStorageAsyncClient implements AtmosStorageAsyncClient {
if (!path.equals(""))
options.inDirectory(path);
}
return wrapFuture(blobStore.list(container, options), resource2ObjectList);
return Futures.compose(Futures.makeListenable(blobStore.list(container, options)),
resource2ObjectList);
}
public AtmosObject newObject() {
@ -205,27 +204,30 @@ public class StubAtmosStorageAsyncClient implements AtmosStorageAsyncClient {
if (path.indexOf('/') == -1 || (path.endsWith("/")))
return blobStore.containerExists(path);
else {
return new FutureBase<Boolean>() {
public Boolean get() throws InterruptedException, ExecutionException {
String container = path.substring(0, path.indexOf('/'));
String blobName = path.substring(path.indexOf('/') + 1);
try {
blobStore.blobMetadata(container, blobName).get();
return true;
return immediateFuture(true);
} catch (KeyNotFoundException e) {
return false;
return immediateFuture(false);
} catch (InterruptedException e) {
return immediateFuture(false);
} catch (ExecutionException e) {
if (Iterables.size(Iterables.filter(Throwables.getCausalChain(e),
ResourceNotFoundException.class)) >= 1)
return immediateFuture(false);
return immediateFailedFuture(e);
}
}
};
}
}
public Future<AtmosObject> readFile(String path, GetOptions... options) {
String container = path.substring(0, path.indexOf('/'));
String blobName = path.substring(path.indexOf('/') + 1);
org.jclouds.blobstore.options.GetOptions getOptions = httpGetOptionsConverter.apply(options);
return wrapFuture(blobStore.getBlob(container, blobName, getOptions), blob2Object);
return Futures.compose(Futures.makeListenable(blobStore.getBlob(container, blobName,
getOptions)), blob2Object);
}
public Future<Void> updateFile(String parent, AtmosObject object) {

View File

@ -66,7 +66,7 @@ public class ParseAWSErrorFromXmlContent implements HttpErrorHandler {
command.setException(new HttpResponseException(command, response, content));
Utils.rethrowIfRuntime(he);
}
} else {
} else if (response.getStatusCode() == 404){
command.setException(new HttpResponseException(command, response));
}
} catch (Exception e) {

View File

@ -40,7 +40,6 @@ import org.jclouds.aws.s3.blobstore.internal.BaseS3BlobStore;
import org.jclouds.aws.s3.domain.BucketMetadata;
import org.jclouds.aws.s3.domain.ListBucketResponse;
import org.jclouds.aws.s3.domain.ObjectMetadata;
import org.jclouds.aws.s3.domain.S3Object;
import org.jclouds.aws.s3.options.ListBucketOptions;
import org.jclouds.blobstore.AsyncBlobStore;
import org.jclouds.blobstore.KeyNotFoundException;
@ -60,6 +59,7 @@ import org.jclouds.logging.Logger.LoggerFactory;
import com.google.common.base.Function;
import com.google.common.collect.Iterables;
import static com.google.common.util.concurrent.Futures.*;
public class S3AsyncBlobStore extends BaseS3BlobStore implements AsyncBlobStore {
@ -74,15 +74,14 @@ public class S3AsyncBlobStore extends BaseS3BlobStore implements AsyncBlobStore
ExecutorService service) {
super(async, sync, blobFactory, logFactory, clearContainerStrategy, object2BlobMd,
object2Blob, blob2Object, container2BucketListOptions, blob2ObjectGetOptions,
getDirectoryStrategy, mkdirStrategy, bucket2ResourceMd, bucket2ResourceList,
service);
getDirectoryStrategy, mkdirStrategy, bucket2ResourceMd, bucket2ResourceList, service);
}
/**
* This implementation uses the S3 HEAD Object command to return the result
*/
public Future<BlobMetadata> blobMetadata(String container, String key) {
return wrapFuture(async.headObject(container, key),
return compose(makeListenable(async.headObject(container, key)),
new Function<ObjectMetadata, BlobMetadata>() {
@Override
@ -90,7 +89,7 @@ public class S3AsyncBlobStore extends BaseS3BlobStore implements AsyncBlobStore
return object2BlobMd.apply(from);
}
});
}, service);
}
public Future<Void> clearContainer(final String container) {
@ -153,27 +152,27 @@ public class S3AsyncBlobStore extends BaseS3BlobStore implements AsyncBlobStore
public Future<Blob> getBlob(String container, String key,
org.jclouds.blobstore.options.GetOptions... optionsList) {
GetOptions httpOptions = blob2ObjectGetOptions.apply(optionsList);
Future<S3Object> returnVal = async.getObject(container, key, httpOptions);
return wrapFuture(returnVal, object2Blob);
return compose(makeListenable(async.getObject(container, key, httpOptions)), object2Blob,
service);
}
public Future<? extends ListResponse<? extends ResourceMetadata>> list() {
return wrapFuture(
async.listOwnedBuckets(),
return compose(
makeListenable(async.listOwnedBuckets()),
new Function<SortedSet<BucketMetadata>, org.jclouds.blobstore.domain.ListResponse<? extends ResourceMetadata>>() {
public org.jclouds.blobstore.domain.ListResponse<? extends ResourceMetadata> apply(
SortedSet<BucketMetadata> from) {
return new ListResponseImpl<ResourceMetadata>(Iterables.transform(from,
bucket2ResourceMd), null, null, false);
}
});
}, service);
}
public Future<? extends ListContainerResponse<? extends ResourceMetadata>> list(
String container, ListContainerOptions... optionsList) {
ListBucketOptions httpOptions = container2BucketListOptions.apply(optionsList);
Future<ListBucketResponse> returnVal = async.listBucket(container, httpOptions);
return wrapFuture(returnVal, bucket2ResourceList);
return compose(makeListenable(returnVal), bucket2ResourceList, service);
}
public Future<String> putBlob(String container, Blob blob) {

View File

@ -21,7 +21,6 @@ package org.jclouds.aws.s3.blobstore.internal;
import static com.google.common.base.Preconditions.checkNotNull;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import javax.inject.Inject;
@ -38,11 +37,8 @@ import org.jclouds.blobstore.domain.Blob;
import org.jclouds.blobstore.strategy.ClearListStrategy;
import org.jclouds.blobstore.strategy.GetDirectoryStrategy;
import org.jclouds.blobstore.strategy.MkdirStrategy;
import org.jclouds.concurrent.FutureFunctionWrapper;
import org.jclouds.logging.Logger.LoggerFactory;
import com.google.common.base.Function;
public class BaseS3BlobStore {
protected final S3AsyncClient async;
protected final S3Client sync;
@ -80,19 +76,13 @@ public class BaseS3BlobStore {
this.container2BucketListOptions = checkNotNull(container2BucketListOptions,
"container2BucketListOptions");
this.blob2ObjectGetOptions = checkNotNull(blob2ObjectGetOptions, "blob2ObjectGetOptions");
this.getDirectoryStrategy = checkNotNull(getDirectoryStrategy,
"getDirectoryStrategy");
this.getDirectoryStrategy = checkNotNull(getDirectoryStrategy, "getDirectoryStrategy");
this.mkdirStrategy = checkNotNull(mkdirStrategy, "mkdirStrategy");
this.bucket2ResourceMd = checkNotNull(bucket2ResourceMd, "bucket2ResourceMd");
this.bucket2ResourceList = checkNotNull(bucket2ResourceList, "bucket2ResourceList");
this.service = checkNotNull(service, "service");
}
protected <F, T> Future<T> wrapFuture(Future<? extends F> future, Function<F, T> function) {
return new FutureFunctionWrapper<F, T>(future, function, logFactory.getLogger(function
.getClass().getName()));
}
public Blob newBlob(String name) {
Blob blob = blobFactory.create(null);
blob.getMetadata().setName(name);

View File

@ -19,13 +19,14 @@
package org.jclouds.aws.s3.internal;
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.util.concurrent.Futures.immediateFailedFuture;
import static com.google.common.util.concurrent.Futures.immediateFuture;
import java.util.Date;
import java.util.Map;
import java.util.SortedSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import javax.inject.Inject;
@ -61,16 +62,14 @@ import org.jclouds.blobstore.domain.BlobMetadata;
import org.jclouds.blobstore.domain.MutableBlobMetadata;
import org.jclouds.blobstore.functions.HttpGetOptionsListToGetOptions;
import org.jclouds.blobstore.integration.internal.StubAsyncBlobStore;
import org.jclouds.blobstore.integration.internal.StubAsyncBlobStore.FutureBase;
import org.jclouds.blobstore.options.ListContainerOptions;
import org.jclouds.concurrent.FutureFunctionWrapper;
import org.jclouds.date.DateService;
import org.jclouds.http.options.GetOptions;
import org.jclouds.logging.Logger.LoggerFactory;
import com.google.common.base.Function;
import com.google.common.collect.Iterables;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.Futures;
/**
* Implementation of {@link S3AsyncBlobStore} which keeps all data in a local Map object.
@ -83,7 +82,6 @@ public class StubS3AsyncClient implements S3AsyncClient {
private final DateService dateService;
private final HttpGetOptionsListToGetOptions httpGetOptionsConverter;
private final StubAsyncBlobStore blobStore;
private final LoggerFactory logFactory;
private final S3Object.Factory objectProvider;
private final Blob.Factory blobProvider;
private final ObjectToBlob object2Blob;
@ -93,7 +91,7 @@ public class StubS3AsyncClient implements S3AsyncClient {
private final ResourceToBucketList resource2BucketList;
@Inject
private StubS3AsyncClient(StubAsyncBlobStore blobStore, LoggerFactory logFactory,
private StubS3AsyncClient(StubAsyncBlobStore blobStore,
ConcurrentMap<String, ConcurrentMap<String, Blob>> containerToBlobs,
DateService dateService, S3Object.Factory objectProvider, Blob.Factory blobProvider,
HttpGetOptionsListToGetOptions httpGetOptionsConverter, ObjectToBlob object2Blob,
@ -101,7 +99,6 @@ public class StubS3AsyncClient implements S3AsyncClient {
BucketToContainerListOptions bucket2ContainerListOptions,
ResourceToBucketList resource2BucketList) {
this.blobStore = blobStore;
this.logFactory = logFactory;
this.objectProvider = objectProvider;
this.blobProvider = blobProvider;
this.dateService = dateService;
@ -125,11 +122,6 @@ public class StubS3AsyncClient implements S3AsyncClient {
public static final String DEFAULT_OWNER_ID = "abc123";
protected <F, T> Future<T> wrapFuture(Future<? extends F> future, Function<F, T> function) {
return new FutureFunctionWrapper<F, T>(future, function, logFactory.getLogger(function
.getClass().getName()));
}
public Future<Boolean> putBucketIfNotExists(String name, PutBucketOptions... optionsList) {
final PutBucketOptions options = (optionsList.length == 0) ? new PutBucketOptions()
: optionsList[0];
@ -141,7 +133,8 @@ public class StubS3AsyncClient implements S3AsyncClient {
public Future<ListBucketResponse> listBucket(final String name, ListBucketOptions... optionsList) {
ListContainerOptions options = bucket2ContainerListOptions.apply(optionsList);
return wrapFuture(blobStore.list(name, options), resource2BucketList);
return Futures.compose(Futures.makeListenable(blobStore.list(name, options)),
resource2BucketList);
}
public Future<ObjectMetadata> copyObject(final String sourceBucket, final String sourceObject,
@ -149,37 +142,31 @@ public class StubS3AsyncClient implements S3AsyncClient {
CopyObjectOptions... nullableOptions) {
final CopyObjectOptions options = (nullableOptions.length == 0) ? new CopyObjectOptions()
: nullableOptions[0];
return new FutureBase<ObjectMetadata>() {
public ObjectMetadata get() throws InterruptedException, ExecutionException {
ConcurrentMap<String, Blob> source = blobStore.getContainerToBlobs().get(sourceBucket);
ConcurrentMap<String, Blob> dest = blobStore.getContainerToBlobs().get(
destinationBucket);
ConcurrentMap<String, Blob> dest = blobStore.getContainerToBlobs().get(destinationBucket);
if (source.containsKey(sourceObject)) {
Blob object = source.get(sourceObject);
if (options.getIfMatch() != null) {
if (!object.getMetadata().getETag().equals(options.getIfMatch()))
blobStore.throwResponseException(412);
return immediateFailedFuture(blobStore.returnResponseException(412));
}
if (options.getIfNoneMatch() != null) {
if (object.getMetadata().getETag().equals(options.getIfNoneMatch()))
blobStore.throwResponseException(412);
return immediateFailedFuture(blobStore.returnResponseException(412));
}
if (options.getIfModifiedSince() != null) {
Date modifiedSince = dateService.rfc822DateParse(options.getIfModifiedSince());
if (modifiedSince.after(object.getMetadata().getLastModified()))
blobStore.throwResponseException(412);
return immediateFailedFuture(blobStore.returnResponseException(412));
}
if (options.getIfUnmodifiedSince() != null) {
Date unmodifiedSince = dateService
.rfc822DateParse(options.getIfUnmodifiedSince());
Date unmodifiedSince = dateService.rfc822DateParse(options.getIfUnmodifiedSince());
if (unmodifiedSince.before(object.getMetadata().getLastModified()))
blobStore.throwResponseException(412);
return immediateFailedFuture(blobStore.returnResponseException(412));
}
Blob sourceS3 = source.get(sourceObject);
MutableBlobMetadata newMd = blobStore
.copy(sourceS3.getMetadata(), destinationObject);
MutableBlobMetadata newMd = blobStore.copy(sourceS3.getMetadata(), destinationObject);
if (options.getAcl() != null)
keyToAcl.put(destinationBucket + "/" + destinationObject, options.getAcl());
@ -187,11 +174,9 @@ public class StubS3AsyncClient implements S3AsyncClient {
Blob newBlob = blobProvider.create(newMd);
newBlob.setPayload(sourceS3.getContent());
dest.put(destinationObject, newBlob);
return blob2ObjectMetadata.apply(blobStore.copy(newMd));
return immediateFuture((ObjectMetadata) blob2ObjectMetadata.apply(blobStore.copy(newMd)));
}
throw new KeyNotFoundException(sourceBucket, sourceObject);
}
};
return immediateFailedFuture(new KeyNotFoundException(sourceBucket, sourceObject));
}
public Future<String> putObject(final String bucketName, final S3Object object,
@ -220,19 +205,11 @@ public class StubS3AsyncClient implements S3AsyncClient {
}
public Future<AccessControlList> getBucketACL(final String bucket) {
return new FutureBase<AccessControlList>() {
public AccessControlList get() throws InterruptedException, ExecutionException {
return getACLforS3Item(bucket);
}
};
return immediateFuture(getACLforS3Item(bucket));
}
public Future<AccessControlList> getObjectACL(final String bucket, final String objectKey) {
return new FutureBase<AccessControlList>() {
public AccessControlList get() throws InterruptedException, ExecutionException {
return getACLforS3Item(bucket + "/" + objectKey);
}
};
return immediateFuture(getACLforS3Item(bucket + "/" + objectKey));
}
/**
@ -259,30 +236,18 @@ public class StubS3AsyncClient implements S3AsyncClient {
}
public Future<Boolean> putBucketACL(final String bucket, final AccessControlList acl) {
return new FutureBase<Boolean>() {
public Boolean get() throws InterruptedException, ExecutionException {
keyToAcl.put(bucket, sanitizeUploadedACL(acl));
return true;
}
};
return immediateFuture(true);
}
public Future<Boolean> putObjectACL(final String bucket, final String objectKey,
final AccessControlList acl) {
return new FutureBase<Boolean>() {
public Boolean get() throws InterruptedException, ExecutionException {
keyToAcl.put(bucket + "/" + objectKey, sanitizeUploadedACL(acl));
return true;
}
};
return immediateFuture(true);
}
public Future<Boolean> bucketExists(final String bucketName) {
return new FutureBase<Boolean>() {
public Boolean get() throws InterruptedException, ExecutionException {
return blobStore.getContainerToBlobs().containsKey(bucketName);
}
};
return immediateFuture(blobStore.getContainerToBlobs().containsKey(bucketName));
}
public Future<Boolean> deleteBucketIfEmpty(String bucketName) {
@ -296,36 +261,28 @@ public class StubS3AsyncClient implements S3AsyncClient {
public Future<S3Object> getObject(final String bucketName, final String key,
final GetOptions... options) {
org.jclouds.blobstore.options.GetOptions getOptions = httpGetOptionsConverter.apply(options);
return wrapFuture(blobStore.getBlob(bucketName, key, getOptions), blob2Object);
return Futures.compose(
Futures.makeListenable(blobStore.getBlob(bucketName, key, getOptions)), blob2Object);
}
public Future<ObjectMetadata> headObject(String bucketName, String key) {
return wrapFuture(blobStore.blobMetadata(bucketName, key),
return Futures.compose(Futures.makeListenable(blobStore.blobMetadata(bucketName, key)),
new Function<BlobMetadata, ObjectMetadata>() {
@Override
public ObjectMetadata apply(BlobMetadata from) {
return blob2ObjectMetadata.apply(from);
}
});
}
public Future<? extends SortedSet<BucketMetadata>> listOwnedBuckets() {
return new FutureBase<SortedSet<BucketMetadata>>() {
public SortedSet<BucketMetadata> get() throws InterruptedException, ExecutionException {
return Sets.newTreeSet(Iterables.transform(blobStore.getContainerToBlobs().keySet(),
new Function<String, BucketMetadata>() {
return immediateFuture(Sets.newTreeSet(Iterables.transform(blobStore.getContainerToBlobs()
.keySet(), new Function<String, BucketMetadata>() {
public BucketMetadata apply(String name) {
return new BucketMetadata(name, null, null);
}
}));
}
};
})));
}
public S3Object newS3Object() {
@ -334,56 +291,33 @@ public class StubS3AsyncClient implements S3AsyncClient {
@Override
public Future<LocationConstraint> getBucketLocation(String bucketName) {
return new FutureBase<LocationConstraint>() {
public LocationConstraint get() throws InterruptedException, ExecutionException {
return LocationConstraint.US_STANDARD;
}
};
return immediateFuture(LocationConstraint.US_STANDARD);
}
@Override
public Future<Payer> getBucketPayer(String bucketName) {
return new FutureBase<Payer>() {
public Payer get() throws InterruptedException, ExecutionException {
return Payer.BUCKET_OWNER;
}
};
return immediateFuture(Payer.BUCKET_OWNER);
}
@Override
public Future<Void> setBucketPayer(String bucketName, Payer payer) {
return new FutureBase<Void>() {
public Void get() throws InterruptedException, ExecutionException {
return null;
}
};
return immediateFuture(null);
}
@Override
public Future<Void> disableBucketLogging(String bucketName) {
return new FutureBase<Void>() {
public Void get() throws InterruptedException, ExecutionException {
return null;
}
};
return immediateFuture(null);
}
@Override
public Future<Void> enableBucketLogging(String bucketName, BucketLogging logging) {
return new FutureBase<Void>() {
public Void get() throws InterruptedException, ExecutionException {
return null;
}
};
return immediateFuture(null);
}
@Override
public Future<BucketLogging> getBucketLogging(String bucketName) {
return new FutureBase<BucketLogging>() {
public BucketLogging get() throws InterruptedException, ExecutionException {
return null;
}
};
return immediateFuture(null);
}
}

View File

@ -18,6 +18,8 @@
*/
package org.jclouds.azure.storage.blob.blobstore;
import static com.google.common.util.concurrent.Futures.compose;
import static com.google.common.util.concurrent.Futures.makeListenable;
import static org.jclouds.blobstore.options.ListContainerOptions.Builder.recursive;
import java.util.SortedSet;
@ -81,7 +83,7 @@ public class AzureAsyncBlobStore extends BaseAzureBlobStore implements AsyncBlob
* This implementation uses the AzureBlob HEAD Object command to return the result
*/
public Future<BlobMetadata> blobMetadata(String container, String key) {
return wrapFuture(async.getBlobProperties(container, key),
return compose(makeListenable(async.getBlobProperties(container, key)),
new Function<BlobProperties, BlobMetadata>() {
@Override
@ -89,7 +91,7 @@ public class AzureAsyncBlobStore extends BaseAzureBlobStore implements AsyncBlob
return object2BlobMd.apply(from);
}
});
}, service);
}
public Future<Void> clearContainer(final String container) {
@ -120,26 +122,26 @@ public class AzureAsyncBlobStore extends BaseAzureBlobStore implements AsyncBlob
org.jclouds.blobstore.options.GetOptions... optionsList) {
GetOptions httpOptions = blob2ObjectGetOptions.apply(optionsList);
Future<AzureBlob> returnVal = async.getBlob(container, key, httpOptions);
return wrapFuture(returnVal, object2Blob);
return compose(makeListenable(returnVal), object2Blob, service);
}
public Future<? extends org.jclouds.blobstore.domain.ListResponse<? extends ResourceMetadata>> list() {
return wrapFuture(
async.listContainers(),
return compose(
makeListenable(async.listContainers()),
new Function<SortedSet<ListableContainerProperties>, org.jclouds.blobstore.domain.ListResponse<? extends ResourceMetadata>>() {
public org.jclouds.blobstore.domain.ListResponse<? extends ResourceMetadata> apply(
SortedSet<ListableContainerProperties> from) {
return new ListResponseImpl<ResourceMetadata>(Iterables.transform(from,
container2ResourceMd), null, null, false);
}
});
}, service);
}
public Future<? extends ListContainerResponse<? extends ResourceMetadata>> list(
String container, ListContainerOptions... optionsList) {
ListBlobsOptions httpOptions = container2ContainerListOptions.apply(optionsList);
Future<ListBlobsResponse> returnVal = async.listBlobs(container, httpOptions);
return wrapFuture(returnVal, container2ResourceList);
return compose(makeListenable(returnVal), container2ResourceList, service);
}
public Future<String> putBlob(String container, Blob blob) {

View File

@ -21,7 +21,6 @@ package org.jclouds.azure.storage.blob.blobstore.internal;
import static com.google.common.base.Preconditions.checkNotNull;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import javax.inject.Inject;
@ -38,11 +37,8 @@ import org.jclouds.blobstore.functions.BlobToHttpGetOptions;
import org.jclouds.blobstore.strategy.ClearListStrategy;
import org.jclouds.blobstore.strategy.GetDirectoryStrategy;
import org.jclouds.blobstore.strategy.MkdirStrategy;
import org.jclouds.concurrent.FutureFunctionWrapper;
import org.jclouds.logging.Logger.LoggerFactory;
import com.google.common.base.Function;
public class BaseAzureBlobStore {
protected final AzureBlobAsyncClient async;
protected final AzureBlobClient sync;
@ -87,11 +83,6 @@ public class BaseAzureBlobStore {
this.service = checkNotNull(service, "service");
}
protected <F, T> Future<T> wrapFuture(Future<? extends F> future, Function<F, T> function) {
return new FutureFunctionWrapper<F, T>(future, function, logFactory.getLogger(function
.getClass().getName()));
}
public Blob newBlob(String name) {
Blob blob = blobFactory.create(null);
blob.getMetadata().setName(name);

View File

@ -19,12 +19,12 @@
package org.jclouds.azure.storage.blob.internal;
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.util.concurrent.Futures.immediateFuture;
import java.net.URI;
import java.util.Date;
import java.util.Map;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import javax.inject.Inject;
@ -51,13 +51,11 @@ import org.jclouds.blobstore.domain.Blob;
import org.jclouds.blobstore.domain.BlobMetadata;
import org.jclouds.blobstore.functions.HttpGetOptionsListToGetOptions;
import org.jclouds.blobstore.integration.internal.StubAsyncBlobStore;
import org.jclouds.blobstore.integration.internal.StubAsyncBlobStore.FutureBase;
import org.jclouds.concurrent.FutureFunctionWrapper;
import org.jclouds.http.options.GetOptions;
import org.jclouds.logging.Logger.LoggerFactory;
import com.google.common.base.Function;
import com.google.common.collect.Iterables;
import com.google.common.util.concurrent.Futures;
/**
* Implementation of {@link AzureBlobAsyncClient} which keeps all data in a local Map object.
@ -68,7 +66,6 @@ import com.google.common.collect.Iterables;
public class StubAzureBlobAsyncClient implements AzureBlobAsyncClient {
private final HttpGetOptionsListToGetOptions httpGetOptionsConverter;
private final StubAsyncBlobStore blobStore;
private final LoggerFactory logFactory;
private final AzureBlob.Factory objectProvider;
private final AzureBlobToBlob object2Blob;
private final BlobToAzureBlob blob2Object;
@ -78,14 +75,13 @@ public class StubAzureBlobAsyncClient implements AzureBlobAsyncClient {
private final ConcurrentMap<String, ConcurrentMap<String, Blob>> containerToBlobs;
@Inject
private StubAzureBlobAsyncClient(StubAsyncBlobStore blobStore, LoggerFactory logFactory,
private StubAzureBlobAsyncClient(StubAsyncBlobStore blobStore,
ConcurrentMap<String, ConcurrentMap<String, Blob>> containerToBlobs,
AzureBlob.Factory objectProvider,
HttpGetOptionsListToGetOptions httpGetOptionsConverter, AzureBlobToBlob object2Blob,
BlobToAzureBlob blob2Object, BlobMetadataToBlobProperties blob2ObjectInfo,
ListBlobsOptionsToListOptions container2ContainerListOptions,
ResourceToListBlobsResponse resource2ContainerList) {
this.logFactory = logFactory;
this.containerToBlobs = containerToBlobs;
this.blobStore = blobStore;
this.objectProvider = objectProvider;
@ -98,11 +94,6 @@ public class StubAzureBlobAsyncClient implements AzureBlobAsyncClient {
this.resource2ObjectList = checkNotNull(resource2ContainerList, "resource2ContainerList");
}
protected <F, T> Future<T> wrapFuture(Future<? extends F> future, Function<F, T> function) {
return new FutureFunctionWrapper<F, T>(future, function, logFactory.getLogger(function
.getClass().getName()));
}
public Future<Boolean> createContainer(String container, CreateContainerOptions... options) {
return blobStore.createContainer(container);
}
@ -116,12 +107,8 @@ public class StubAzureBlobAsyncClient implements AzureBlobAsyncClient {
}
public Future<Void> deleteContainer(final String container) {
return new FutureBase<Void>() {
public Void get() throws InterruptedException, ExecutionException {
StubAzureBlobAsyncClient.this.containerToBlobs.remove(container);
return null;
}
};
return immediateFuture(null);
}
public Future<Boolean> deleteRootContainer() {
@ -130,11 +117,12 @@ public class StubAzureBlobAsyncClient implements AzureBlobAsyncClient {
public Future<AzureBlob> getBlob(String container, String key, GetOptions... options) {
org.jclouds.blobstore.options.GetOptions getOptions = httpGetOptionsConverter.apply(options);
return wrapFuture(blobStore.getBlob(container, key, getOptions), blob2Object);
return Futures.compose(Futures.makeListenable(blobStore.getBlob(container, key, getOptions)),
blob2Object);
}
public Future<BlobProperties> getBlobProperties(String container, String key) {
return wrapFuture(blobStore.blobMetadata(container, key),
return Futures.compose(Futures.makeListenable(blobStore.blobMetadata(container, key)),
new Function<BlobMetadata, BlobProperties>() {
@Override
@ -153,7 +141,8 @@ public class StubAzureBlobAsyncClient implements AzureBlobAsyncClient {
public Future<ListBlobsResponse> listBlobs(String container, ListBlobsOptions... optionsList) {
org.jclouds.blobstore.options.ListContainerOptions options = container2ContainerListOptions
.apply(optionsList);
return wrapFuture(blobStore.list(container, options), resource2ObjectList);
return Futures.compose(Futures.makeListenable(blobStore.list(container, options)),
resource2ObjectList);
}
public Future<ListBlobsResponse> listBlobs(ListBlobsOptions... options) {
@ -162,21 +151,15 @@ public class StubAzureBlobAsyncClient implements AzureBlobAsyncClient {
public Future<? extends BoundedSortedSet<ListableContainerProperties>> listContainers(
ListOptions... listOptions) {
return new FutureBase<BoundedSortedSet<ListableContainerProperties>>() {
public BoundedSortedSet<ListableContainerProperties> get() throws InterruptedException,
ExecutionException {
return new BoundedTreeSet<ListableContainerProperties>(Iterables.transform(blobStore
.getContainerToBlobs().keySet(),
return immediateFuture(new BoundedTreeSet<ListableContainerProperties>(Iterables.transform(
blobStore.getContainerToBlobs().keySet(),
new Function<String, ListableContainerProperties>() {
public ListableContainerProperties apply(String name) {
return new ListableContainerPropertiesImpl(URI.create("http://stub/"
+ name), new Date(), "");
return new ListableContainerPropertiesImpl(URI.create("http://stub/" + name),
new Date(), "");
}
}), null, null, null, null, null);
}
};
}), null, null, null, null, null));
}
public AzureBlob newBlob() {
@ -196,11 +179,7 @@ public class StubAzureBlobAsyncClient implements AzureBlobAsyncClient {
}
public Future<Boolean> containerExists(final String container) {
return new FutureBase<Boolean>() {
public Boolean get() throws InterruptedException, ExecutionException {
return blobStore.getContainerToBlobs().containsKey(container);
}
};
return immediateFuture(blobStore.getContainerToBlobs().containsKey(container));
}
}

View File

@ -28,7 +28,6 @@ import java.util.concurrent.TimeUnit;
import javax.inject.Named;
import org.jclouds.blobstore.AsyncBlobStore;
import org.jclouds.blobstore.KeyNotFoundException;
import org.jclouds.blobstore.domain.Blob;
import org.jclouds.blobstore.domain.BlobMetadata;
import org.jclouds.blobstore.domain.ListResponse;
@ -43,9 +42,11 @@ import org.jclouds.blobstore.strategy.ContainsValueInListStrategy;
import org.jclouds.blobstore.strategy.CountListStrategy;
import org.jclouds.blobstore.strategy.GetBlobsInListStrategy;
import org.jclouds.blobstore.strategy.ListBlobMetadataStrategy;
import org.jclouds.rest.ResourceNotFoundException;
import org.jclouds.util.Utils;
import com.google.common.base.Function;
import com.google.common.base.Throwables;
import com.google.common.collect.Iterables;
import com.google.common.collect.Sets;
import com.google.inject.Inject;
@ -204,9 +205,10 @@ public abstract class BaseBlobMap<V> {
try {
return connection.blobMetadata(containerName, realKey).get(requestTimeoutMilliseconds,
TimeUnit.MILLISECONDS) != null;
} catch (KeyNotFoundException e) {
return false;
} catch (Exception e) {
if (Iterables.size(Iterables.filter(Throwables.getCausalChain(e),
ResourceNotFoundException.class)) >= 1)
return false;
Utils.<BlobRuntimeException> rethrowIfRuntimeOrSameType(e);
throw new BlobRuntimeException(String.format("Error searching for %1$s:%2$s",
containerName, realKey), e);

View File

@ -27,8 +27,8 @@ import java.util.concurrent.TimeUnit;
import javax.inject.Inject;
import org.jclouds.blobstore.BlobMap;
import org.jclouds.blobstore.AsyncBlobStore;
import org.jclouds.blobstore.BlobMap;
import org.jclouds.blobstore.KeyNotFoundException;
import org.jclouds.blobstore.domain.Blob;
import org.jclouds.blobstore.options.ListContainerOptions;
@ -40,6 +40,7 @@ import org.jclouds.blobstore.strategy.ListBlobMetadataStrategy;
import org.jclouds.util.Utils;
import com.google.common.base.Function;
import com.google.common.base.Throwables;
import com.google.common.collect.Collections2;
import com.google.common.collect.Sets;
@ -118,15 +119,13 @@ public class BlobMapImpl extends BaseBlobMap<Blob> implements BlobMap {
return stripPrefix(connection.getBlob(containerName, realKey).get(
requestTimeoutMilliseconds, TimeUnit.MILLISECONDS));
} catch (Exception e) {
if (e instanceof KeyNotFoundException)
return null;
// the following will unwrap any exceptions, so we should double-check that it
// wasn't unwrapped to a KNFE
e = Utils.<BlobRuntimeException> rethrowIfRuntimeOrSameType(e);
if (e instanceof KeyNotFoundException)
Throwable cause = Throwables.getRootCause(e);
if (cause instanceof KeyNotFoundException) {
return null;
} else {
throw new BlobRuntimeException(String.format("Error geting object %1$s:%2$s",
containerName, realKey), e);
containerName, realKey), cause);
}
}
}

View File

@ -49,6 +49,7 @@ import org.jclouds.util.Utils;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Function;
import com.google.common.base.Throwables;
import com.google.common.collect.Collections2;
import com.google.common.collect.Sets;
@ -86,7 +87,10 @@ public class InputStreamMapImpl extends BaseBlobMap<InputStream> implements Inpu
} catch (KeyNotFoundException e) {
return null;
} catch (Exception e) {
Utils.<BlobRuntimeException> rethrowIfRuntimeOrSameType(e);
Throwable cause = Throwables.getRootCause(e);
if (cause instanceof KeyNotFoundException)
return null;
Throwables.propagateIfInstanceOf(e, BlobRuntimeException.class);
throw new BlobRuntimeException(String.format("Error geting object %1$s:%2$s",
containerName, realKey), e);
}

View File

@ -42,6 +42,7 @@ import org.jclouds.blobstore.strategy.ListBlobMetadataStrategy;
import org.jclouds.util.Utils;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Throwables;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.google.inject.Inject;
@ -107,11 +108,15 @@ public class GetAllBlobsInListAndRetryOnFailure implements GetBlobsInListStrateg
object.getMetadata().setName(key);
objects.add(object);
return;
} catch (KeyNotFoundException e) {
} catch (Exception e) {
Throwable cause = Throwables.getRootCause(e);
if (cause instanceof KeyNotFoundException) {
Thread.sleep(requestRetryMilliseconds);
value = connection.getBlob(container, key);
} else {
Throwables.propagate(e);
}
}
}
}
}

View File

@ -19,6 +19,8 @@
package org.jclouds.blobstore.integration.internal;
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.util.concurrent.Futures.immediateFailedFuture;
import static com.google.common.util.concurrent.Futures.immediateFuture;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
@ -41,11 +43,8 @@ import java.util.Map.Entry;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import javax.inject.Inject;
import javax.ws.rs.core.HttpHeaders;
@ -154,33 +153,26 @@ public class StubAsyncBlobStore implements AsyncBlobStore {
}
public Future<Blob> getBlob(final String bucketName, final String key) {
return new FutureBase<Blob>() {
public Blob get() throws InterruptedException, ExecutionException {
if (!getContainerToBlobs().containsKey(bucketName))
throw new ContainerNotFoundException(bucketName);
return immediateFailedFuture(new ContainerNotFoundException(bucketName));
Map<String, Blob> realContents = getContainerToBlobs().get(bucketName);
if (!realContents.containsKey(key))
throw new KeyNotFoundException(bucketName, key);
return immediateFailedFuture(new KeyNotFoundException(bucketName, key));
Blob object = realContents.get(key);
Blob returnVal = blobProvider.create(copy(object.getMetadata()));
returnVal.setPayload(object.getContent());
return returnVal;
}
};
return immediateFuture(returnVal);
}
public Future<? extends ListContainerResponse<? extends ResourceMetadata>> list(
final String name, ListContainerOptions... optionsList) {
final ListContainerOptions options = (optionsList.length == 0) ? new ListContainerOptions()
: optionsList[0];
return new FutureBase<ListContainerResponse<ResourceMetadata>>() {
public ListContainerResponse<ResourceMetadata> get() throws InterruptedException,
ExecutionException {
final Map<String, Blob> realContents = getContainerToBlobs().get(name);
if (realContents == null)
throw new ContainerNotFoundException(name);
return immediateFailedFuture(new ContainerNotFoundException(name));
SortedSet<ResourceMetadata> contents = Sets.newTreeSet(Iterables.transform(realContents
.keySet(), new Function<String, ResourceMetadata>() {
@ -206,8 +198,7 @@ public class StubAsyncBlobStore implements AsyncBlobStore {
final String prefix = options.getDir();
if (prefix != null) {
contents = Sets.newTreeSet(Iterables.filter(contents,
new Predicate<ResourceMetadata>() {
contents = Sets.newTreeSet(Iterables.filter(contents, new Predicate<ResourceMetadata>() {
public boolean apply(ResourceMetadata o) {
return (o != null && o.getName().startsWith(prefix));
}
@ -236,8 +227,7 @@ public class StubAsyncBlobStore implements AsyncBlobStore {
SortedSet<String> commonPrefixes = null;
Iterable<String> iterable = Iterables.transform(contents, new CommonPrefixes(
prefix != null ? prefix : null, delimiter));
commonPrefixes = iterable != null ? Sets.newTreeSet(iterable)
: new TreeSet<String>();
commonPrefixes = iterable != null ? Sets.newTreeSet(iterable) : new TreeSet<String>();
commonPrefixes.remove(CommonPrefixes.NO_PREFIX);
contents = Sets.newTreeSet(Iterables.filter(contents, new DelimiterFilter(
@ -253,10 +243,9 @@ public class StubAsyncBlobStore implements AsyncBlobStore {
}
}));
}
return new ListContainerResponseImpl<ResourceMetadata>(contents, prefix, marker,
maxResults, truncated);
}
};
return immediateFuture(new ListContainerResponseImpl<ResourceMetadata>(contents, prefix,
marker, maxResults, truncated));
}
public MutableBlobMetadata copy(MutableBlobMetadata in) {
@ -288,96 +277,54 @@ public class StubAsyncBlobStore implements AsyncBlobStore {
return newMd;
}
public BlobMetadata metadata(final String container, final String key) {
if (!getContainerToBlobs().containsKey(container))
throw new ContainerNotFoundException(container);
Map<String, Blob> realContents = getContainerToBlobs().get(container);
if (!realContents.containsKey(key))
throw new KeyNotFoundException(container, key);
return copy(realContents.get(key).getMetadata());
}
// public BlobMetadata metadata(final String container, final String key) {
// if (!getContainerToBlobs().containsKey(container))
// return immediateFailedFuture(new ContainerNotFoundException(container));
// Map<String, Blob> realContents = getContainerToBlobs().get(container);
// if (!realContents.containsKey(key))
// return immediateFailedFuture(new KeyNotFoundException(container, key));
// return copy(realContents.get(key).getMetadata());
// }
public Future<Void> removeBlob(final String container, final String key) {
return new FutureBase<Void>() {
public Void get() throws InterruptedException, ExecutionException {
if (getContainerToBlobs().containsKey(container)) {
getContainerToBlobs().get(container).remove(key);
}
return null;
}
};
return immediateFuture(null);
}
public Future<Void> deleteContainer(final String container) {
return new FutureBase<Void>() {
public Void get() throws InterruptedException, ExecutionException {
if (getContainerToBlobs().containsKey(container)) {
getContainerToBlobs().remove(container);
}
return null;
}
};
return immediateFuture(null);
}
public Future<Boolean> deleteContainerImpl(final String container) {
return new FutureBase<Boolean>() {
public Boolean get() throws InterruptedException, ExecutionException {
Boolean returnVal = true;
if (getContainerToBlobs().containsKey(container)) {
if (getContainerToBlobs().get(container).size() == 0)
getContainerToBlobs().remove(container);
else
return false;
returnVal = false;
}
return true;
}
};
return immediateFuture(returnVal);
}
public Future<Boolean> containerExists(final String container) {
return new FutureBase<Boolean>() {
public Boolean get() throws InterruptedException, ExecutionException {
return getContainerToBlobs().containsKey(container);
}
};
}
public static abstract class FutureBase<V> implements Future<V> {
public boolean cancel(boolean b) {
return false;
}
public boolean isCancelled() {
return false;
}
public boolean isDone() {
return true;
}
public V get(long l, TimeUnit timeUnit) throws InterruptedException, ExecutionException,
TimeoutException {
return get();
}
return immediateFuture(getContainerToBlobs().containsKey(container));
}
public Future<? extends ListResponse<? extends ResourceMetadata>> list() {
return new FutureBase<ListResponse<? extends ResourceMetadata>>() {
public ListResponse<ResourceMetadata> get() throws InterruptedException,
ExecutionException {
return new ListResponseImpl<ResourceMetadata>(Iterables.transform(getContainerToBlobs()
.keySet(), new Function<String, ResourceMetadata>() {
return immediateFuture(new ListResponseImpl<ResourceMetadata>(Iterables.transform(
getContainerToBlobs().keySet(), new Function<String, ResourceMetadata>() {
public ResourceMetadata apply(String name) {
MutableResourceMetadata cmd = create();
cmd.setName(name);
cmd.setType(ResourceType.CONTAINER);
return cmd;
}
}), null, null, false);
}
};
}), null, null, false));
}
@ -386,14 +333,10 @@ public class StubAsyncBlobStore implements AsyncBlobStore {
}
public Future<Boolean> createContainer(final String name) {
return new FutureBase<Boolean>() {
public Boolean get() throws InterruptedException, ExecutionException {
if (!getContainerToBlobs().containsKey(name)) {
getContainerToBlobs().put(name, new ConcurrentHashMap<String, Blob>());
}
return getContainerToBlobs().containsKey(name);
}
};
return immediateFuture(getContainerToBlobs().containsKey(name));
}
public String getFirstQueryOrNull(String string, @Nullable HttpRequestOptions options) {
@ -452,11 +395,11 @@ public class StubAsyncBlobStore implements AsyncBlobStore {
return Sets.newTreeSet(slices.get(0));
}
public void throwResponseException(int code) throws ExecutionException {
public HttpResponseException returnResponseException(int code) {
HttpResponse response = null;
response = new HttpResponse(); // TODO: Get real object URL?
response.setStatusCode(code);
throw new ExecutionException(new HttpResponseException(new HttpCommand() {
return new HttpResponseException(new HttpCommand() {
public int getRedirectCount() {
return 0;
@ -499,7 +442,7 @@ public class StubAsyncBlobStore implements AsyncBlobStore {
@Override
public void redirectPath(String newPath) {
}
}, response));
}, response);
}
public Future<String> putBlob(final String bucketName, final Blob object) {
@ -531,14 +474,9 @@ public class StubAsyncBlobStore implements AsyncBlobStore {
for (Entry<String, String> userMD : newMd.getUserMetadata().entrySet()) {
blob.getAllHeaders().put(userMD.getKey(), userMD.getValue());
}
return new FutureBase<String>() {
public String get() throws InterruptedException, ExecutionException {
return eTag;
}
};
return immediateFuture(eTag);
} catch (IOException e) {
throw new RuntimeException(e);
return immediateFailedFuture(new RuntimeException(e));
}
}
@ -546,32 +484,30 @@ public class StubAsyncBlobStore implements AsyncBlobStore {
public Future<? extends Blob> getBlob(final String bucketName, final String key,
GetOptions... optionsList) {
final GetOptions options = (optionsList.length == 0) ? new GetOptions() : optionsList[0];
return new FutureBase<Blob>() {
public Blob get() throws InterruptedException, ExecutionException {
if (!getContainerToBlobs().containsKey(bucketName))
throw new ContainerNotFoundException(bucketName);
return immediateFailedFuture(new ContainerNotFoundException(bucketName));
Map<String, Blob> realContents = getContainerToBlobs().get(bucketName);
if (!realContents.containsKey(key))
throw new KeyNotFoundException(bucketName, key);
return immediateFailedFuture(new KeyNotFoundException(bucketName, key));
Blob object = realContents.get(key);
if (options.getIfMatch() != null) {
if (!object.getMetadata().getETag().equals(options.getIfMatch()))
throwResponseException(412);
return immediateFailedFuture(returnResponseException(412));
}
if (options.getIfNoneMatch() != null) {
if (object.getMetadata().getETag().equals(options.getIfNoneMatch()))
throwResponseException(304);
return immediateFailedFuture(returnResponseException(304));
}
if (options.getIfModifiedSince() != null) {
Date modifiedSince = options.getIfModifiedSince();
if (object.getMetadata().getLastModified().before(modifiedSince)) {
HttpResponse response = new HttpResponse();
response.setStatusCode(304);
throw new ExecutionException(new HttpResponseException(String.format(
"%1$s is before %2$s", object.getMetadata().getLastModified(),
modifiedSince), null, response));
return immediateFailedFuture(new HttpResponseException(String.format(
"%1$s is before %2$s", object.getMetadata().getLastModified(), modifiedSince),
null, response));
}
}
@ -580,8 +516,8 @@ public class StubAsyncBlobStore implements AsyncBlobStore {
if (object.getMetadata().getLastModified().after(unmodifiedSince)) {
HttpResponse response = new HttpResponse();
response.setStatusCode(412);
throw new ExecutionException(new HttpResponseException(String.format(
"%1$s is after %2$s", object.getMetadata().getLastModified(),
return immediateFailedFuture(new HttpResponseException(
String.format("%1$s is after %2$s", object.getMetadata().getLastModified(),
unmodifiedSince), null, response));
}
}
@ -592,7 +528,7 @@ public class StubAsyncBlobStore implements AsyncBlobStore {
try {
data = ByteStreams.toByteArray(returnVal.getPayload().getContent());
} catch (IOException e) {
throw new RuntimeException(e);
return immediateFailedFuture(new RuntimeException(e));
}
ByteArrayOutputStream out = new ByteArrayOutputStream();
for (String s : options.getRanges()) {
@ -609,7 +545,8 @@ public class StubAsyncBlobStore implements AsyncBlobStore {
int length = (last < data.length) ? last + 1 : data.length - offset;
out.write(data, offset, length);
} else {
throw new IllegalArgumentException("first and last were null!");
return immediateFailedFuture(new IllegalArgumentException(
"first and last were null!"));
}
}
@ -618,24 +555,18 @@ public class StubAsyncBlobStore implements AsyncBlobStore {
returnVal.getMetadata().setSize(new Long(data.length));
}
returnVal.setPayload(returnVal.getPayload());
return returnVal;
}
};
return immediateFuture(returnVal);
}
public Future<BlobMetadata> blobMetadata(final String container, final String key) {
return new FutureBase<BlobMetadata>() {
public BlobMetadata get() throws InterruptedException, ExecutionException {
try {
return copy(getBlob(container, key).get().getMetadata());
return immediateFuture((BlobMetadata) copy(getBlob(container, key).get().getMetadata()));
} catch (Exception e) {
Utils.<ContainerNotFoundException> rethrowIfRuntimeOrSameType(e);
Utils.<KeyNotFoundException> rethrowIfRuntimeOrSameType(e);
throw new RuntimeException(e);// TODO
return immediateFailedFuture(e);
}
}
};
}
private Blob copyBlob(Blob object) {
Blob returnVal = blobProvider.create(copy(object.getMetadata()));
@ -648,12 +579,8 @@ public class StubAsyncBlobStore implements AsyncBlobStore {
}
public Future<Void> clearContainer(final String container) {
return new FutureBase<Void>() {
public Void get() throws InterruptedException, ExecutionException {
getContainerToBlobs().get(container).clear();
return null;
}
};
return immediateFuture(null);
}
public Future<Void> createDirectory(final String container, final String directory) {

View File

@ -57,7 +57,8 @@ public class RetryOnNotFoundGetAllBlobsStrategyTest {
@BeforeTest
void setUp() {
blobProvider = Guice.createInjector(new BlobStoreObjectModule()).getInstance(Blob.Factory.class);
blobProvider = Guice.createInjector(new BlobStoreObjectModule()).getInstance(
Blob.Factory.class);
}
@SuppressWarnings("unchecked")
@ -81,7 +82,8 @@ public class RetryOnNotFoundGetAllBlobsStrategyTest {
map.ifNotFoundRetryOtherwiseAddToSet("container", "key", futureObject, objects);
// should have retried once
assert System.currentTimeMillis() >= time + map.requestRetryMilliseconds;
assertEquals(Utils.toStringAndClose((InputStream) objects.iterator().next().getContent()), "goo");
assertEquals(Utils.toStringAndClose((InputStream) objects.iterator().next().getContent()),
"goo");
assert !objects.contains(null);
}

View File

@ -19,30 +19,35 @@
package org.jclouds.concurrent;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.jclouds.logging.Logger;
import com.google.common.base.Function;
import com.google.common.util.concurrent.ListenableFuture;
/**
* Transforms the result of a future as soon as it is available.
*
* Temporarily here until the following is resolved: <a
* href="http://code.google.com/p/guava-libraries/issues/detail?id=310"> guava issue 310</a>
*
* @author Adrian Cole
*/
public class FutureExceptionParser<T> implements Future<T> {
public class FutureExceptionParser<T> implements ListenableFuture<T> {
private final Future<T> delegate;
private final ListenableFuture<T> delegate;
private final Function<Exception, T> function;
private final Logger logger;
public FutureExceptionParser(Future<T> delegate, Function<Exception, T> function) {
public FutureExceptionParser(ListenableFuture<T> delegate, Function<Exception, T> function) {
this(delegate, function, Logger.NULL);
}
public FutureExceptionParser(Future<T> delegate, Function<Exception, T> function, Logger logger) {
public FutureExceptionParser(ListenableFuture<T> delegate, Function<Exception, T> function,
Logger logger) {
this.delegate = delegate;
this.function = function;
this.logger = logger;
@ -89,4 +94,9 @@ public class FutureExceptionParser<T> implements Future<T> {
return delegate.isDone();
}
@Override
public void addListener(Runnable listener, Executor exec) {
delegate.addListener(listener, exec);
}
}

View File

@ -1,64 +0,0 @@
/**
*
* Copyright (C) 2009 Cloud Conscious, LLC. <info@cloudconscious.com>
*
* ====================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* ====================================================================
*/
package org.jclouds.concurrent;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import org.jclouds.logging.Logger;
import com.google.common.base.Function;
/**
* Transforms the result of a future as soon as it is available.
*
* @author Adrian Cole
*/
public class FutureFunctionCallable<F, T> implements Callable<T> {
private final Future<F> future;
private final Function<F, T> function;
private final Logger logger;
public FutureFunctionCallable(Future<F> future, Function<F, T> function) {
this(future, function, Logger.NULL);
}
public FutureFunctionCallable(Future<F> future, Function<F, T> function, Logger logger) {
this.future = future;
this.function = function;
this.logger = logger;
}
public T call() throws Exception {
try {
F input = future.get();
logger.debug("Processing intermediate result for: %s", input);
T result = function.apply(input);
logger.debug("Processed intermediate result for: %s", input);
return result;
} catch (ExecutionException e) {
if (e.getCause() instanceof Error)
throw (Error) e.getCause();
throw (Exception) e.getCause();
}
}
}

View File

@ -1,79 +0,0 @@
/**
*
* Copyright (C) 2009 Cloud Conscious, LLC. <info@cloudconscious.com>
*
* ====================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* ====================================================================
*/
package org.jclouds.concurrent;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.jclouds.logging.Logger;
import com.google.common.base.Function;
/**
* Transforms the result of a future once requested via get().
*
* @author Adrian Cole
*/
public class FutureFunctionWrapper<F, T> implements Future<T>, Function<F, T> {
private final Future<? extends F> future;
private final Function<F, T> function;
private final Logger logger;
public FutureFunctionWrapper(Future<? extends F> future, Function<F, T> function) {
this(future, function, Logger.NULL);
}
public FutureFunctionWrapper(Future<? extends F> future, Function<F, T> function, Logger logger) {
this.future = future;
this.function = function;
this.logger = logger;
}
public T apply(F input) {
logger.debug("Processing intermediate result for: %s", input);
T result = function.apply(input);
logger.debug("Processed intermediate result for: %s", input);
return result;
}
public boolean cancel(boolean mayInterruptIfRunning) {
return future.cancel(mayInterruptIfRunning);
}
public T get() throws InterruptedException, ExecutionException {
return apply(future.get());
}
public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException,
TimeoutException {
return apply(future.get(timeout, unit));
}
public boolean isCancelled() {
return future.isCancelled();
}
public boolean isDone() {
return future.isDone();
}
}

View File

@ -1,107 +0,0 @@
/**
*
* Copyright (C) 2009 Cloud Conscious, LLC. <info@cloudconscious.com>
*
* ====================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* ====================================================================
*/
package org.jclouds.concurrent;
import static com.google.common.base.Preconditions.checkState;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
/**
* Future that is a result of a task ran within a single thread. As such, cancel is not valid, as
* the operation is already complete.
*
* This class is like {@link FutureTask} except that it does not attempt {@code Thread.interrupt() }
* which is sometimes prohibited.
*
* @author Adrian Cole
*/
public class RunnableFutureTask<V> implements Future<V> {
private ExecutionException executionException;
private InterruptedException interruptedException;
private CancellationException cancellationException;
private V value;
private boolean ran = false;
private final Callable<V> task;
public RunnableFutureTask(Callable<V> task) {
this.task = task;
}
/**
* {@inheritDoc}
*
* @param mayInterruptIfRunning
* - ignored as this cannot be called at the same time as the task is running
*/
public boolean cancel(boolean mayInterruptIfRunning) {
if (ran) {
return false;
} else {
cancellationException = new CancellationException();
}
return true;
}
public boolean isCancelled() {
return cancellationException != null;
}
public boolean isDone() {
return ran || cancellationException != null || interruptedException != null
|| executionException != null;
}
public V get(long l, TimeUnit timeUnit) throws InterruptedException, ExecutionException,
TimeoutException {
return get();
}
public void run() {
try {
value = task.call();
} catch (InterruptedException e) {
interruptedException = e;
} catch (Exception e) {
executionException = new ExecutionException(e);
} finally {
ran = true;
}
}
/**
* {@inheritDoc}
*/
public V get() throws InterruptedException, ExecutionException {
if (cancellationException != null)
throw cancellationException;
if (interruptedException != null)
throw interruptedException;
if (executionException != null)
throw executionException;
checkState(ran, "run() was never called");
return value;
}
}

View File

@ -18,9 +18,10 @@
*/
package org.jclouds.http;
import java.util.concurrent.Future;
import java.util.concurrent.SynchronousQueue;
import com.google.common.util.concurrent.ListenableFuture;
/**
* Used for passing objects for response processing
*
@ -31,10 +32,11 @@ public class HttpCommandRendezvous<T> {
private final HttpCommand command;
@SuppressWarnings("unchecked")
private final SynchronousQueue rendezvous;
private final Future<T> future;
private final ListenableFuture<T> future;
@SuppressWarnings("unchecked")
public HttpCommandRendezvous(HttpCommand command, SynchronousQueue rendezvous, Future<T> future) {
public HttpCommandRendezvous(HttpCommand command, SynchronousQueue rendezvous,
ListenableFuture<T> future) {
this.command = command;
this.rendezvous = rendezvous;
this.future = future;
@ -58,7 +60,7 @@ public class HttpCommandRendezvous<T> {
return command;
}
public Future<T> getFuture() {
public ListenableFuture<T> getFuture() {
return future;
}

View File

@ -19,7 +19,8 @@
package org.jclouds.http;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import com.google.common.util.concurrent.ListenableFuture;
/**
* Command that utilizes RESTFul apis and extracts <code>T</code> from the HttpResponse.
@ -36,5 +37,5 @@ public interface TransformingHttpCommand<T> extends HttpCommand {
* @throws ExecutionException
* if there is a fatal error preventing the command from invoking
*/
Future<T> execute() throws ExecutionException;
ListenableFuture<T> execute() throws ExecutionException;
}

View File

@ -21,7 +21,7 @@ package org.jclouds.http;
import java.util.concurrent.Future;
import com.google.common.base.Function;
import com.google.inject.internal.Nullable;
import com.google.common.util.concurrent.ListenableFuture;
/**
* Executor which will invoke and transform the response of an {@code EndpointCommand} into generic
@ -40,11 +40,8 @@ public interface TransformingHttpCommandExecutorService {
* what to execute
* @param responseTransformer
* how to transform the response from the above command
* @param exceptionTransformer
* maps any non-critical exceptions to the return type {@code <T>}
* @return value of the intended response.
*/
public <T> Future<T> submit(HttpCommand command, Function<HttpResponse, T> responseTransformer,
@Nullable Function<Exception, T> exceptionTransformer);
public <T> ListenableFuture<T> submit(HttpCommand command, Function<HttpResponse, T> responseTransformer);
}

View File

@ -18,15 +18,16 @@
*/
package org.jclouds.http;
import java.util.concurrent.Callable;
import static com.google.common.util.concurrent.Futures.compose;
import static com.google.common.util.concurrent.Futures.makeListenable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import org.jclouds.concurrent.FutureFunctionCallable;
import org.jclouds.logging.Logger.LoggerFactory;
import javax.inject.Inject;
import com.google.common.base.Function;
import javax.inject.Inject;
import com.google.common.util.concurrent.ListenableFuture;
/**
* Executor which will invoke and transform the response of an {@code EndpointCommand} into generic
@ -38,25 +39,20 @@ public class TransformingHttpCommandExecutorServiceImpl implements
TransformingHttpCommandExecutorService {
private final HttpCommandExecutorService client;
private final ExecutorService executorService;
private final LoggerFactory logFactory;
@Inject
public TransformingHttpCommandExecutorServiceImpl(HttpCommandExecutorService client,
ExecutorService executorService, LoggerFactory logFactory) {
ExecutorService executorService) {
this.client = client;
this.executorService = executorService;
this.logFactory = logFactory;
}
/**
* {@inheritDoc}
*/
public <T> Future<T> submit(HttpCommand command, Function<HttpResponse, T> responseTransformer,
Function<Exception, T> exceptionTransformer) {
public <T> ListenableFuture<T> submit(HttpCommand command, Function<HttpResponse, T> responseTransformer) {
Future<HttpResponse> responseFuture = client.submit(command);
Callable<T> valueCallable = new FutureFunctionCallable<HttpResponse, T>(responseFuture,
responseTransformer, logFactory.getLogger(responseTransformer.getClass().getName()));
return executorService.submit(valueCallable);
return compose(makeListenable(responseFuture), responseTransformer, executorService);
}
}

View File

@ -20,7 +20,6 @@ package org.jclouds.http;
import java.util.Collections;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import javax.annotation.Resource;
import javax.inject.Inject;
@ -32,7 +31,7 @@ import org.jclouds.logging.Logger;
import org.jclouds.rest.internal.GeneratedHttpRequest;
import com.google.common.base.Function;
import com.google.inject.internal.Nullable;
import com.google.common.util.concurrent.ListenableFuture;
/**
* Executor which will invoke and transform the response of an {@code EndpointCommand} into generic
@ -46,7 +45,6 @@ public class TransformingHttpCommandImpl<T> implements TransformingHttpCommand<T
private final TransformingHttpCommandExecutorService executorService;
private final Function<HttpResponse, T> transformer;
private final Function<Exception, T> exceptionTransformer;
private GeneratedHttpRequest<?> request;
private volatile int failureCount;
@ -59,19 +57,17 @@ public class TransformingHttpCommandImpl<T> implements TransformingHttpCommand<T
@Inject
public TransformingHttpCommandImpl(TransformingHttpCommandExecutorService executorService,
GeneratedHttpRequest<?> request, Function<HttpResponse, T> transformer,
@Nullable Function<Exception, T> exceptionTransformer) {
GeneratedHttpRequest<?> request, Function<HttpResponse, T> transformer) {
this.request = request;
this.executorService = executorService;
this.transformer = transformer;
this.exceptionTransformer = exceptionTransformer;
this.failureCount = 0;
}
public Future<T> execute() throws ExecutionException {
public ListenableFuture<T> execute() throws ExecutionException {
if (exception != null)
throw new ExecutionException(exception);
return executorService.submit(this, transformer, exceptionTransformer);
return executorService.submit(this, transformer);
}
public int getFailureCount() {

View File

@ -53,9 +53,8 @@ public class RestModule extends AbstractModule {
@SuppressWarnings("unchecked")
public TransformingHttpCommand<?> create(GeneratedHttpRequest<?> request,
Function<HttpResponse, ?> transformer, Function<Exception, ?> exceptionTransformer) {
return new TransformingHttpCommandImpl(executorService, request, transformer,
exceptionTransformer);
Function<HttpResponse, ?> transformer) {
return new TransformingHttpCommandImpl(executorService, request, transformer);
}
}

View File

@ -42,10 +42,10 @@ import org.jclouds.logging.Logger;
import org.jclouds.rest.InvocationContext;
import com.google.common.base.Function;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.inject.Inject;
import com.google.inject.Injector;
import com.google.inject.TypeLiteral;
import com.google.inject.internal.Nullable;
@Singleton
public class AsyncRestClientProxy<T> implements InvocationHandler {
@ -142,7 +142,7 @@ public class AsyncRestClientProxy<T> implements InvocationHandler {
.getName(), transformer.getClass().getSimpleName());
logger.debug("Invoking %s.%s", declaring.getSimpleName(), method.getName());
Future<?> result = commandFactory.create(request, transformer, exceptionParser).execute();
ListenableFuture<?> result = commandFactory.create(request, transformer).execute();
if (exceptionParser != null) {
logger.trace("Exceptions from %s.%s are parsed by %s", declaring.getSimpleName(),
@ -164,8 +164,7 @@ public class AsyncRestClientProxy<T> implements InvocationHandler {
public static interface Factory {
public TransformingHttpCommand<?> create(GeneratedHttpRequest<?> request,
Function<HttpResponse, ?> transformer,
@Nullable Function<Exception, ?> exceptionTransformer);
Function<HttpResponse, ?> transformer);
}
@Override

View File

@ -18,12 +18,12 @@
*/
package org.jclouds.concurrent;
import static com.google.common.util.concurrent.Futures.makeListenable;
import static org.testng.Assert.assertEquals;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
@ -31,51 +31,52 @@ import org.testng.annotations.Test;
import com.google.common.base.Function;
import com.google.common.util.concurrent.Executors;
import com.google.common.util.concurrent.ListenableFuture;
/**
* Tests behavior of FutureExceptionParser
* Tests behavior of ListenableFutureExceptionParser
*
* @author Adrian Cole
*/
@Test(groups = "unit", testName = "concurrent.FutureExceptionParserTest")
@Test(groups = "unit", testName = "concurrent.ListenableFutureExceptionParserTest")
public class FutureExceptionParserTest {
ExecutorService executorService = Executors.sameThreadExecutor();
@Test
public void testGet() throws InterruptedException, ExecutionException {
Future<?> future = createFuture(new RuntimeException("foo"));
ListenableFuture<?> future = createListenableFuture(new RuntimeException("foo"));
assertEquals(future.get(), "foo");
}
@Test(expectedExceptions = ExecutionException.class)
public void testGetUnmatched() throws InterruptedException, ExecutionException {
Future<?> future = createFuture(new Exception("foo"));
ListenableFuture<?> future = createListenableFuture(new Exception("foo"));
assertEquals(future.get(), "foo");
}
@Test
public void testGetLongTimeUnit() throws InterruptedException, ExecutionException,
TimeoutException {
Future<?> future = createFuture(new RuntimeException("foo"));
ListenableFuture<?> future = createListenableFuture(new RuntimeException("foo"));
assertEquals(future.get(1, TimeUnit.SECONDS), "foo");
}
@Test(expectedExceptions = ExecutionException.class)
public void testGetLongTimeUnitUnmatched() throws InterruptedException, ExecutionException,
TimeoutException {
Future<?> future = createFuture(new Exception("foo"));
ListenableFuture<?> future = createListenableFuture(new Exception("foo"));
assertEquals(future.get(1, TimeUnit.SECONDS), "foo");
}
@SuppressWarnings("unchecked")
private Future<?> createFuture(final Exception exception) {
Future<?> future = executorService.submit(new Callable<String>() {
private ListenableFuture<?> createListenableFuture(final Exception exception) {
ListenableFuture<?> future = makeListenable(executorService.submit(new Callable<String>() {
public String call() throws Exception {
throw exception;
}
});
}));
future = new FutureExceptionParser(future, new Function<Exception, String>() {

View File

@ -37,13 +37,10 @@ import org.jclouds.http.TransformingHttpCommandImpl;
import org.jclouds.http.functions.ReturnStringIf200;
import org.jclouds.http.internal.HttpWire;
import org.jclouds.http.internal.JavaUrlHttpCommandExecutorService;
import org.jclouds.logging.Logger;
import org.jclouds.logging.Logger.LoggerFactory;
import org.jclouds.rest.internal.RestAnnotationProcessor;
import org.testng.annotations.BeforeTest;
import org.testng.annotations.Test;
import com.google.common.base.Function;
import com.google.inject.AbstractModule;
import com.google.inject.Key;
import com.google.inject.TypeLiteral;
@ -96,14 +93,7 @@ public class BackoffLimitedRetryHandlerTest {
JavaUrlHttpCommandExecutorService httpService = new JavaUrlHttpCommandExecutorService(
execService, new DelegatingRetryHandler(), new DelegatingErrorHandler(),
new HttpWire(Executors.newCachedThreadPool()));
executorService = new TransformingHttpCommandExecutorServiceImpl(httpService, execService,
new LoggerFactory() {
public Logger getLogger(String category) {
return Logger.NULL;
}
});
executorService = new TransformingHttpCommandExecutorServiceImpl(httpService, execService);
}
@Test
@ -164,14 +154,8 @@ public class BackoffLimitedRetryHandlerTest {
private HttpCommand createCommand() throws SecurityException, NoSuchMethodException {
Method method = IntegrationTestAsyncClient.class.getMethod("download", String.class);
HttpCommand command = new TransformingHttpCommandImpl<String>(executorService, processor
.createRequest(method, "1"), new ReturnStringIf200(),
new Function<Exception, String>() {
public String apply(Exception from) {
return null;
}
});
return command;
return new TransformingHttpCommandImpl<String>(executorService, processor.createRequest(
method, "1"), new ReturnStringIf200());
}
@Test

View File

@ -19,6 +19,7 @@
package org.jclouds.http.pool;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.util.concurrent.Futures.makeListenable;
import java.net.URI;
import java.util.concurrent.BlockingQueue;
@ -33,7 +34,6 @@ import java.util.concurrent.TimeoutException;
import javax.inject.Inject;
import org.jclouds.concurrent.FutureExceptionParser;
import org.jclouds.http.HttpCommand;
import org.jclouds.http.HttpCommandRendezvous;
import org.jclouds.http.HttpResponse;
@ -45,6 +45,7 @@ import org.jclouds.util.Utils;
import com.google.common.base.Function;
import com.google.common.collect.MapMaker;
import com.google.common.util.concurrent.ListenableFuture;
/**
* // TODO: Adrian: Document this!
@ -130,9 +131,8 @@ public class ConnectionPoolTransformingHttpCommandExecutorService<C> extends Bas
* will be processed via the {@link #invoke(TransformingHttpCommandExecutorService) invoke}
* method.
*/
public <T> Future<T> submit(HttpCommand command,
final Function<HttpResponse, T> responseTransformer,
Function<Exception, T> exceptionTransformer) {
public <T> ListenableFuture<T> submit(HttpCommand command,
final Function<HttpResponse, T> responseTransformer) {
exceptionIfNotActive();
final SynchronousQueue<?> channel = new SynchronousQueue<Object>();
// should block and immediately parse the response on exit.
@ -151,11 +151,9 @@ public class ConnectionPoolTransformingHttpCommandExecutorService<C> extends Bas
}
});
HttpCommandRendezvous<T> rendezvous = new HttpCommandRendezvous<T>(command, channel, future);
HttpCommandRendezvous<T> rendezvous = new HttpCommandRendezvous<T>(command, channel,
makeListenable(future));
commandQueue.add(rendezvous);
if (exceptionTransformer != null) {
return new FutureExceptionParser<T>(rendezvous.getFuture(), exceptionTransformer);
}
return rendezvous.getFuture();
}
@ -227,4 +225,5 @@ public class ConnectionPoolTransformingHttpCommandExecutorService<C> extends Bas
endpoint.getPort()));
}
}
}

View File

@ -18,6 +18,8 @@
*/
package org.jclouds.rackspace.cloudfiles.blobstore;
import static com.google.common.util.concurrent.Futures.compose;
import static com.google.common.util.concurrent.Futures.makeListenable;
import static org.jclouds.blobstore.options.ListContainerOptions.Builder.recursive;
import java.util.SortedSet;
@ -83,7 +85,7 @@ public class CloudFilesAsyncBlobStore extends BaseCloudFilesBlobStore implements
*/
public Future<BlobMetadata> blobMetadata(String container, String key) {
return wrapFuture(async.getObjectInfo(container, key),
return compose(makeListenable(async.getObjectInfo(container, key)),
new Function<MutableObjectInfoWithMetadata, BlobMetadata>() {
@Override
@ -91,7 +93,7 @@ public class CloudFilesAsyncBlobStore extends BaseCloudFilesBlobStore implements
return object2BlobMd.apply(from);
}
});
}, service);
}
public Future<Void> clearContainer(final String container) {
@ -129,19 +131,19 @@ public class CloudFilesAsyncBlobStore extends BaseCloudFilesBlobStore implements
org.jclouds.blobstore.options.GetOptions... optionsList) {
GetOptions httpOptions = blob2ObjectGetOptions.apply(optionsList);
Future<CFObject> returnVal = async.getObject(container, key, httpOptions);
return wrapFuture(returnVal, object2Blob);
return compose(makeListenable(returnVal), object2Blob, service);
}
public Future<? extends ListResponse<? extends ResourceMetadata>> list() {
return wrapFuture(
async.listContainers(),
return compose(
makeListenable(async.listContainers()),
new Function<SortedSet<ContainerMetadata>, org.jclouds.blobstore.domain.ListResponse<? extends ResourceMetadata>>() {
public org.jclouds.blobstore.domain.ListResponse<? extends ResourceMetadata> apply(
SortedSet<ContainerMetadata> from) {
return new ListResponseImpl<ResourceMetadata>(Iterables.transform(from,
container2ResourceMd), null, null, false);
}
});
}, service);
}
public Future<? extends ListContainerResponse<? extends ResourceMetadata>> list(
@ -150,7 +152,7 @@ public class CloudFilesAsyncBlobStore extends BaseCloudFilesBlobStore implements
.apply(optionsList);
Future<ListContainerResponse<ObjectInfo>> returnVal = async.listObjects(container,
httpOptions);
return wrapFuture(returnVal, container2ResourceList);
return compose(makeListenable(returnVal), container2ResourceList, service);
}
public Future<String> putBlob(String container, Blob blob) {
@ -174,7 +176,6 @@ public class CloudFilesAsyncBlobStore extends BaseCloudFilesBlobStore implements
public Future<Boolean> directoryExists(final String container, final String directory) {
return service.submit(new Callable<Boolean>() {
public Boolean call() throws Exception {
try {
getDirectoryStrategy.execute(CloudFilesAsyncBlobStore.this, container, directory);

View File

@ -21,7 +21,6 @@ package org.jclouds.rackspace.cloudfiles.blobstore.internal;
import static com.google.common.base.Preconditions.checkNotNull;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import javax.inject.Inject;
@ -29,7 +28,6 @@ import org.jclouds.blobstore.domain.Blob;
import org.jclouds.blobstore.strategy.ClearListStrategy;
import org.jclouds.blobstore.strategy.GetDirectoryStrategy;
import org.jclouds.blobstore.strategy.MkdirStrategy;
import org.jclouds.concurrent.FutureFunctionWrapper;
import org.jclouds.logging.Logger.LoggerFactory;
import org.jclouds.rackspace.cloudfiles.CloudFilesAsyncClient;
import org.jclouds.rackspace.cloudfiles.CloudFilesClient;
@ -41,8 +39,6 @@ import org.jclouds.rackspace.cloudfiles.blobstore.functions.ContainerToResourceM
import org.jclouds.rackspace.cloudfiles.blobstore.functions.ObjectToBlob;
import org.jclouds.rackspace.cloudfiles.blobstore.functions.ObjectToBlobMetadata;
import com.google.common.base.Function;
public class BaseCloudFilesBlobStore {
protected final CloudFilesAsyncClient async;
protected final CloudFilesClient sync;
@ -88,11 +84,6 @@ public class BaseCloudFilesBlobStore {
this.service = checkNotNull(service, "service");
}
protected <F, T> Future<T> wrapFuture(Future<? extends F> future, Function<F, T> function) {
return new FutureFunctionWrapper<F, T>(future, function, logFactory.getLogger(function
.getClass().getName()));
}
public Blob newBlob(String name) {
Blob blob = blobFactory.create(null);
blob.getMetadata().setName(name);

View File

@ -19,12 +19,12 @@
package org.jclouds.rackspace.cloudfiles.internal;
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.util.concurrent.Futures.immediateFuture;
import java.net.URI;
import java.util.Map;
import java.util.SortedSet;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import javax.inject.Inject;
@ -34,11 +34,8 @@ import org.jclouds.blobstore.domain.BlobMetadata;
import org.jclouds.blobstore.domain.ListContainerResponse;
import org.jclouds.blobstore.functions.HttpGetOptionsListToGetOptions;
import org.jclouds.blobstore.integration.internal.StubAsyncBlobStore;
import org.jclouds.blobstore.integration.internal.StubAsyncBlobStore.FutureBase;
import org.jclouds.blobstore.options.ListContainerOptions;
import org.jclouds.concurrent.FutureFunctionWrapper;
import org.jclouds.http.options.GetOptions;
import org.jclouds.logging.Logger.LoggerFactory;
import org.jclouds.rackspace.cloudfiles.CloudFilesAsyncClient;
import org.jclouds.rackspace.cloudfiles.blobstore.functions.BlobToObject;
import org.jclouds.rackspace.cloudfiles.blobstore.functions.ListContainerOptionsToBlobStoreListContainerOptions;
@ -56,6 +53,7 @@ import org.jclouds.rackspace.cloudfiles.options.ListCdnContainerOptions;
import com.google.common.base.Function;
import com.google.common.collect.Iterables;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.Futures;
/**
* Implementation of {@link CloudFilesAsyncClient} which keeps all data in a local Map object.
@ -65,7 +63,6 @@ import com.google.common.collect.Sets;
public class StubCloudFilesAsyncClient implements CloudFilesAsyncClient {
private final HttpGetOptionsListToGetOptions httpGetOptionsConverter;
private final StubAsyncBlobStore blobStore;
private final LoggerFactory logFactory;
private final CFObject.Factory objectProvider;
private final ObjectToBlob object2Blob;
private final BlobToObject blob2Object;
@ -74,7 +71,7 @@ public class StubCloudFilesAsyncClient implements CloudFilesAsyncClient {
private final ResourceToObjectList resource2ObjectList;
@Inject
private StubCloudFilesAsyncClient(StubAsyncBlobStore blobStore, LoggerFactory logFactory,
private StubCloudFilesAsyncClient(StubAsyncBlobStore blobStore,
ConcurrentMap<String, ConcurrentMap<String, Blob>> containerToBlobs,
CFObject.Factory objectProvider,
HttpGetOptionsListToGetOptions httpGetOptionsConverter, ObjectToBlob object2Blob,
@ -82,7 +79,6 @@ public class StubCloudFilesAsyncClient implements CloudFilesAsyncClient {
ListContainerOptionsToBlobStoreListContainerOptions container2ContainerListOptions,
ResourceToObjectList resource2ContainerList) {
this.blobStore = blobStore;
this.logFactory = logFactory;
this.objectProvider = objectProvider;
this.httpGetOptionsConverter = httpGetOptionsConverter;
this.object2Blob = checkNotNull(object2Blob, "object2Blob");
@ -93,17 +89,8 @@ public class StubCloudFilesAsyncClient implements CloudFilesAsyncClient {
this.resource2ObjectList = checkNotNull(resource2ContainerList, "resource2ContainerList");
}
protected <F, T> Future<T> wrapFuture(Future<? extends F> future, Function<F, T> function) {
return new FutureFunctionWrapper<F, T>(future, function, logFactory.getLogger(function
.getClass().getName()));
}
public Future<Boolean> containerExists(final String container) {
return new FutureBase<Boolean>() {
public Boolean get() throws InterruptedException, ExecutionException {
return blobStore.getContainerToBlobs().containsKey(container);
}
};
return immediateFuture(blobStore.getContainerToBlobs().containsKey(container));
}
public Future<Boolean> createContainer(String container) {
@ -136,11 +123,12 @@ public class StubCloudFilesAsyncClient implements CloudFilesAsyncClient {
public Future<CFObject> getObject(String container, String key, GetOptions... options) {
org.jclouds.blobstore.options.GetOptions getOptions = httpGetOptionsConverter.apply(options);
return wrapFuture(blobStore.getBlob(container, key, getOptions), blob2Object);
return Futures.compose(Futures.makeListenable(blobStore.getBlob(container, key, getOptions)),
blob2Object);
}
public Future<MutableObjectInfoWithMetadata> getObjectInfo(String container, String key) {
return wrapFuture(blobStore.blobMetadata(container, key),
return Futures.compose(Futures.makeListenable(blobStore.blobMetadata(container, key)),
new Function<BlobMetadata, MutableObjectInfoWithMetadata>() {
@Override
@ -159,24 +147,19 @@ public class StubCloudFilesAsyncClient implements CloudFilesAsyncClient {
public Future<? extends SortedSet<ContainerMetadata>> listContainers(
org.jclouds.rackspace.cloudfiles.options.ListContainerOptions... options) {
return new FutureBase<SortedSet<ContainerMetadata>>() {
public SortedSet<ContainerMetadata> get() throws InterruptedException, ExecutionException {
return Sets.newTreeSet(Iterables.transform(blobStore.getContainerToBlobs().keySet(),
new Function<String, ContainerMetadata>() {
return immediateFuture(Sets.newTreeSet(Iterables.transform(blobStore.getContainerToBlobs()
.keySet(), new Function<String, ContainerMetadata>() {
public ContainerMetadata apply(String name) {
return new ContainerMetadata(name, -1, -1);
}
}));
}
};
})));
}
public Future<ListContainerResponse<ObjectInfo>> listObjects(String container,
org.jclouds.rackspace.cloudfiles.options.ListContainerOptions... optionsList) {
ListContainerOptions options = container2ContainerListOptions.apply(optionsList);
return wrapFuture(blobStore.list(container, options), resource2ObjectList);
return Futures.compose(Futures.makeListenable(blobStore.list(container, options)),
resource2ObjectList);
}
public Future<String> putObject(String container, CFObject object) {