diff --git a/atmos/src/main/java/org/jclouds/atmosonline/saas/blobstore/AtmosAsyncBlobStore.java b/atmos/src/main/java/org/jclouds/atmosonline/saas/blobstore/AtmosAsyncBlobStore.java index a4272206a1..5521ddb8b3 100644 --- a/atmos/src/main/java/org/jclouds/atmosonline/saas/blobstore/AtmosAsyncBlobStore.java +++ b/atmos/src/main/java/org/jclouds/atmosonline/saas/blobstore/AtmosAsyncBlobStore.java @@ -18,6 +18,8 @@ */ package org.jclouds.atmosonline.saas.blobstore; +import static com.google.common.util.concurrent.Futures.compose; +import static com.google.common.util.concurrent.Futures.makeListenable; import static org.jclouds.blobstore.options.ListContainerOptions.Builder.recursive; import java.net.URI; @@ -46,7 +48,6 @@ import org.jclouds.blobstore.domain.ResourceMetadata; import org.jclouds.blobstore.domain.Blob.Factory; import org.jclouds.blobstore.functions.BlobToHttpGetOptions; import org.jclouds.blobstore.strategy.ClearListStrategy; -import org.jclouds.concurrent.FutureFunctionCallable; import org.jclouds.encryption.EncryptionService; import org.jclouds.http.options.GetOptions; import org.jclouds.logging.Logger.LoggerFactory; @@ -77,15 +78,13 @@ public class AtmosAsyncBlobStore extends BaseAtmosBlobStore implements AsyncBlob * This implementation uses the AtmosStorage HEAD Object command to return the result */ public Future blobMetadata(String container, String key) { - return wrapFuture(async.headFile(container + "/" + key), + return compose(makeListenable(async.headFile(container + "/" + key)), new Function() { - @Override public BlobMetadata apply(AtmosObject from) { return object2BlobMd.apply(from); } - - }); + }, service); } public Future clearContainer(final String container) { @@ -100,17 +99,18 @@ public class AtmosAsyncBlobStore extends BaseAtmosBlobStore implements AsyncBlob } public Future createContainer(String container) { - return wrapFuture(async.createDirectory(container), new Function() { + return compose(makeListenable(async.createDirectory(container)), + new Function() { - public Boolean apply(URI from) { - return true;// no etag - } + public Boolean apply(URI from) { + return true;// no etag + } - }); + }); } public Future createDirectory(String container, String directory) { - return wrapFuture(async.createDirectory(container + "/" + directory), + return compose(makeListenable(async.createDirectory(container + "/" + directory)), new Function() { public Void apply(URI from) { @@ -151,11 +151,11 @@ public class AtmosAsyncBlobStore extends BaseAtmosBlobStore implements AsyncBlob org.jclouds.blobstore.options.GetOptions... optionsList) { GetOptions httpOptions = blob2ObjectGetOptions.apply(optionsList); Future returnVal = async.readFile(container + "/" + key, httpOptions); - return wrapFuture(returnVal, object2Blob); + return compose(makeListenable(returnVal), object2Blob, service); } public Future> list() { - return wrapFuture(async.listDirectories(), container2ResourceList); + return compose(makeListenable(async.listDirectories()), container2ResourceList, service); } public Future> list( @@ -169,7 +169,8 @@ public class AtmosAsyncBlobStore extends BaseAtmosBlobStore implements AsyncBlob } } ListOptions nativeOptions = container2ContainerListOptions.apply(optionsList); - return wrapFuture(async.listDirectory(container, nativeOptions), container2ResourceList); + return compose(makeListenable(async.listDirectory(container, nativeOptions)), + container2ResourceList, service); } /** @@ -177,9 +178,7 @@ public class AtmosAsyncBlobStore extends BaseAtmosBlobStore implements AsyncBlob */ public Future putBlob(final String container, final Blob blob) { final String path = container + "/" + blob.getMetadata().getName(); - - Callable valueCallable = new FutureFunctionCallable(async - .deletePath(path), new Function() { + return compose(makeListenable(async.deletePath(path)), new Function() { public String apply(Void from) { try { @@ -200,8 +199,7 @@ public class AtmosAsyncBlobStore extends BaseAtmosBlobStore implements AsyncBlob } } - }); - return service.submit(valueCallable); + }, service); } diff --git a/atmos/src/main/java/org/jclouds/atmosonline/saas/blobstore/internal/BaseAtmosBlobStore.java b/atmos/src/main/java/org/jclouds/atmosonline/saas/blobstore/internal/BaseAtmosBlobStore.java index 94e8043524..d03ac32853 100644 --- a/atmos/src/main/java/org/jclouds/atmosonline/saas/blobstore/internal/BaseAtmosBlobStore.java +++ b/atmos/src/main/java/org/jclouds/atmosonline/saas/blobstore/internal/BaseAtmosBlobStore.java @@ -21,7 +21,6 @@ package org.jclouds.atmosonline.saas.blobstore.internal; import static com.google.common.base.Preconditions.checkNotNull; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Future; import javax.inject.Named; @@ -36,10 +35,8 @@ import org.jclouds.blobstore.domain.Blob; import org.jclouds.blobstore.functions.BlobToHttpGetOptions; import org.jclouds.blobstore.reference.BlobStoreConstants; import org.jclouds.blobstore.strategy.ClearListStrategy; -import org.jclouds.concurrent.FutureFunctionWrapper; import org.jclouds.logging.Logger.LoggerFactory; -import com.google.common.base.Function; import com.google.inject.Inject; public class BaseAtmosBlobStore { @@ -83,11 +80,6 @@ public class BaseAtmosBlobStore { this.service = checkNotNull(service, "service"); } - protected Future wrapFuture(Future future, Function function) { - return new FutureFunctionWrapper(future, function, logFactory.getLogger(function - .getClass().getName())); - } - public Blob newBlob(String name) { Blob blob = blobFactory.create(null); blob.getMetadata().setName(name); diff --git a/atmos/src/main/java/org/jclouds/atmosonline/saas/blobstore/strategy/RecursiveRemove.java b/atmos/src/main/java/org/jclouds/atmosonline/saas/blobstore/strategy/RecursiveRemove.java index b7000f47dc..3a779de948 100644 --- a/atmos/src/main/java/org/jclouds/atmosonline/saas/blobstore/strategy/RecursiveRemove.java +++ b/atmos/src/main/java/org/jclouds/atmosonline/saas/blobstore/strategy/RecursiveRemove.java @@ -37,13 +37,13 @@ import org.jclouds.blobstore.options.ListContainerOptions; import org.jclouds.blobstore.reference.BlobStoreConstants; import org.jclouds.blobstore.strategy.ClearContainerStrategy; import org.jclouds.blobstore.strategy.ClearListStrategy; -import org.jclouds.concurrent.FutureFunctionWrapper; import org.jclouds.logging.Logger; import org.jclouds.util.Utils; import com.google.common.base.Function; import com.google.common.base.Supplier; import com.google.common.collect.Sets; +import com.google.common.util.concurrent.Futures; import com.google.inject.Inject; /** @@ -88,7 +88,7 @@ public class RecursiveRemove implements ClearListStrategy, ClearContainerStrateg for (Future isdeleted : deletes) { isdeleted.get(requestTimeoutMilliseconds, TimeUnit.MILLISECONDS); } - return new FutureFunctionWrapper(async.deletePath(fullPath), + return Futures.compose(async.deletePath(fullPath), new Function() { public Void apply(Void from) { diff --git a/atmos/src/test/java/org/jclouds/atmosonline/saas/internal/StubAtmosStorageAsyncClient.java b/atmos/src/test/java/org/jclouds/atmosonline/saas/internal/StubAtmosStorageAsyncClient.java index 7292417191..f06909f2c0 100644 --- a/atmos/src/test/java/org/jclouds/atmosonline/saas/internal/StubAtmosStorageAsyncClient.java +++ b/atmos/src/test/java/org/jclouds/atmosonline/saas/internal/StubAtmosStorageAsyncClient.java @@ -19,6 +19,8 @@ package org.jclouds.atmosonline.saas.internal; import static com.google.common.base.Preconditions.checkNotNull; +import static com.google.common.util.concurrent.Futures.immediateFailedFuture; +import static com.google.common.util.concurrent.Futures.immediateFuture; import java.net.URI; import java.util.concurrent.ExecutionException; @@ -45,13 +47,13 @@ import org.jclouds.blobstore.domain.Blob; import org.jclouds.blobstore.domain.BlobMetadata; import org.jclouds.blobstore.functions.HttpGetOptionsListToGetOptions; import org.jclouds.blobstore.integration.internal.StubAsyncBlobStore; -import org.jclouds.blobstore.integration.internal.StubAsyncBlobStore.FutureBase; -import org.jclouds.concurrent.FutureFunctionWrapper; import org.jclouds.http.options.GetOptions; -import org.jclouds.logging.Logger.LoggerFactory; -import org.jclouds.util.Utils; +import org.jclouds.rest.ResourceNotFoundException; import com.google.common.base.Function; +import com.google.common.base.Throwables; +import com.google.common.collect.Iterables; +import com.google.common.util.concurrent.Futures; /** * Implementation of {@link AtmosStorageAsyncClient} which keeps all data in a local Map object. @@ -62,7 +64,6 @@ import com.google.common.base.Function; public class StubAtmosStorageAsyncClient implements AtmosStorageAsyncClient { private final HttpGetOptionsListToGetOptions httpGetOptionsConverter; private final StubAsyncBlobStore blobStore; - private final LoggerFactory logFactory; private final AtmosObject.Factory objectProvider; private final ObjectToBlob object2Blob; private final BlobToObject blob2Object; @@ -71,13 +72,12 @@ public class StubAtmosStorageAsyncClient implements AtmosStorageAsyncClient { private final ResourceMetadataListToDirectoryEntryList resource2ObjectList; @Inject - private StubAtmosStorageAsyncClient(StubAsyncBlobStore blobStore, LoggerFactory logFactory, + private StubAtmosStorageAsyncClient(StubAsyncBlobStore blobStore, AtmosObject.Factory objectProvider, HttpGetOptionsListToGetOptions httpGetOptionsConverter, ObjectToBlob object2Blob, BlobToObject blob2Object, BlobMetadataToObject blob2ObjectInfo, ListOptionsToBlobStoreListOptions container2ContainerListOptions, ResourceMetadataListToDirectoryEntryList resource2ContainerList) { - this.logFactory = logFactory; this.blobStore = blobStore; this.objectProvider = objectProvider; this.httpGetOptionsConverter = httpGetOptionsConverter; @@ -89,24 +89,20 @@ public class StubAtmosStorageAsyncClient implements AtmosStorageAsyncClient { this.resource2ObjectList = checkNotNull(resource2ContainerList, "resource2ContainerList"); } - protected Future wrapFuture(Future future, Function function) { - return new FutureFunctionWrapper(future, function, logFactory.getLogger(function - .getClass().getName())); - } - public Future createDirectory(String directoryName) { final String container; if (directoryName.indexOf('/') != -1) container = directoryName.substring(0, directoryName.indexOf('/')); else container = directoryName; - return wrapFuture(blobStore.createContainer(container), new Function() { + return Futures.compose(Futures.makeListenable(blobStore.createContainer(container)), + new Function() { - public URI apply(Boolean from) { - return URI.create("http://stub/containers/" + container); - } + public URI apply(Boolean from) { + return URI.create("http://stub/containers/" + container); + } - }); + }); } public Future createFile(String parent, AtmosObject object) { @@ -121,24 +117,26 @@ public class StubAtmosStorageAsyncClient implements AtmosStorageAsyncClient { object.getContentMetadata().setName(path + "/" + file); } Blob blob = object2Blob.apply(object); - return wrapFuture(blobStore.putBlob(container, blob), new Function() { + return Futures.compose(Futures.makeListenable(blobStore.putBlob(container, blob)), + new Function() { - public URI apply(String from) { - return URI.create(uri); - } + public URI apply(String from) { + return URI.create(uri); + } - }); + }); } public Future deletePath(String path) { if (path.indexOf('/') == -1) - return wrapFuture(blobStore.deleteContainerImpl(path), new Function() { + return Futures.compose(Futures.makeListenable(blobStore.deleteContainerImpl(path)), + new Function() { - public Void apply(Boolean from) { - return null; - } + public Void apply(Boolean from) { + return null; + } - }); + }); else { String container = path.substring(0, path.indexOf('/')); path = path.substring(path.indexOf('/') + 1); @@ -156,7 +154,7 @@ public class StubAtmosStorageAsyncClient implements AtmosStorageAsyncClient { else { String container = path.substring(0, path.indexOf('/')); path = path.substring(path.indexOf('/') + 1); - return wrapFuture(blobStore.blobMetadata(container, path), + return Futures.compose(Futures.makeListenable(blobStore.blobMetadata(container, path)), new Function() { public UserMetadata apply(BlobMetadata from) { return blob2ObjectInfo.apply(from).getUserMetadata(); @@ -169,10 +167,10 @@ public class StubAtmosStorageAsyncClient implements AtmosStorageAsyncClient { String container = path.substring(0, path.indexOf('/')); path = path.substring(path.indexOf('/') + 1); try { - return wrapFuture(blobStore.getBlob(container, path), blob2Object); + return Futures.compose(Futures.makeListenable(blobStore.getBlob(container, path)), + blob2Object); } catch (Exception e) { - Utils. rethrowIfRuntimeOrSameType(e); - throw new RuntimeException(e); + return immediateFailedFuture(Throwables.getRootCause(e)); } } @@ -180,7 +178,7 @@ public class StubAtmosStorageAsyncClient implements AtmosStorageAsyncClient { ListOptions... optionsList) { // org.jclouds.blobstore.options.ListOptions options = container2ContainerListOptions // .apply(optionsList); - return wrapFuture(blobStore.list(), resource2ObjectList); + return Futures.compose(Futures.makeListenable(blobStore.list()), resource2ObjectList); } public Future> listDirectory( @@ -194,7 +192,8 @@ public class StubAtmosStorageAsyncClient implements AtmosStorageAsyncClient { if (!path.equals("")) options.inDirectory(path); } - return wrapFuture(blobStore.list(container, options), resource2ObjectList); + return Futures.compose(Futures.makeListenable(blobStore.list(container, options)), + resource2ObjectList); } public AtmosObject newObject() { @@ -205,19 +204,21 @@ public class StubAtmosStorageAsyncClient implements AtmosStorageAsyncClient { if (path.indexOf('/') == -1 || (path.endsWith("/"))) return blobStore.containerExists(path); else { - return new FutureBase() { - public Boolean get() throws InterruptedException, ExecutionException { - String container = path.substring(0, path.indexOf('/')); - String blobName = path.substring(path.indexOf('/') + 1); - try { - blobStore.blobMetadata(container, blobName).get(); - return true; - } catch (KeyNotFoundException e) { - return false; - } - } - }; - + String container = path.substring(0, path.indexOf('/')); + String blobName = path.substring(path.indexOf('/') + 1); + try { + blobStore.blobMetadata(container, blobName).get(); + return immediateFuture(true); + } catch (KeyNotFoundException e) { + return immediateFuture(false); + } catch (InterruptedException e) { + return immediateFuture(false); + } catch (ExecutionException e) { + if (Iterables.size(Iterables.filter(Throwables.getCausalChain(e), + ResourceNotFoundException.class)) >= 1) + return immediateFuture(false); + return immediateFailedFuture(e); + } } } @@ -225,7 +226,8 @@ public class StubAtmosStorageAsyncClient implements AtmosStorageAsyncClient { String container = path.substring(0, path.indexOf('/')); String blobName = path.substring(path.indexOf('/') + 1); org.jclouds.blobstore.options.GetOptions getOptions = httpGetOptionsConverter.apply(options); - return wrapFuture(blobStore.getBlob(container, blobName, getOptions), blob2Object); + return Futures.compose(Futures.makeListenable(blobStore.getBlob(container, blobName, + getOptions)), blob2Object); } public Future updateFile(String parent, AtmosObject object) { diff --git a/aws/core/src/main/java/org/jclouds/aws/handlers/ParseAWSErrorFromXmlContent.java b/aws/core/src/main/java/org/jclouds/aws/handlers/ParseAWSErrorFromXmlContent.java index 9a731d39bc..21dd305981 100755 --- a/aws/core/src/main/java/org/jclouds/aws/handlers/ParseAWSErrorFromXmlContent.java +++ b/aws/core/src/main/java/org/jclouds/aws/handlers/ParseAWSErrorFromXmlContent.java @@ -66,7 +66,7 @@ public class ParseAWSErrorFromXmlContent implements HttpErrorHandler { command.setException(new HttpResponseException(command, response, content)); Utils.rethrowIfRuntime(he); } - } else { + } else if (response.getStatusCode() == 404){ command.setException(new HttpResponseException(command, response)); } } catch (Exception e) { diff --git a/aws/core/src/main/java/org/jclouds/aws/s3/blobstore/S3AsyncBlobStore.java b/aws/core/src/main/java/org/jclouds/aws/s3/blobstore/S3AsyncBlobStore.java index a81604b8fd..2b246a1e94 100644 --- a/aws/core/src/main/java/org/jclouds/aws/s3/blobstore/S3AsyncBlobStore.java +++ b/aws/core/src/main/java/org/jclouds/aws/s3/blobstore/S3AsyncBlobStore.java @@ -40,7 +40,6 @@ import org.jclouds.aws.s3.blobstore.internal.BaseS3BlobStore; import org.jclouds.aws.s3.domain.BucketMetadata; import org.jclouds.aws.s3.domain.ListBucketResponse; import org.jclouds.aws.s3.domain.ObjectMetadata; -import org.jclouds.aws.s3.domain.S3Object; import org.jclouds.aws.s3.options.ListBucketOptions; import org.jclouds.blobstore.AsyncBlobStore; import org.jclouds.blobstore.KeyNotFoundException; @@ -60,6 +59,7 @@ import org.jclouds.logging.Logger.LoggerFactory; import com.google.common.base.Function; import com.google.common.collect.Iterables; +import static com.google.common.util.concurrent.Futures.*; public class S3AsyncBlobStore extends BaseS3BlobStore implements AsyncBlobStore { @@ -74,15 +74,14 @@ public class S3AsyncBlobStore extends BaseS3BlobStore implements AsyncBlobStore ExecutorService service) { super(async, sync, blobFactory, logFactory, clearContainerStrategy, object2BlobMd, object2Blob, blob2Object, container2BucketListOptions, blob2ObjectGetOptions, - getDirectoryStrategy, mkdirStrategy, bucket2ResourceMd, bucket2ResourceList, - service); + getDirectoryStrategy, mkdirStrategy, bucket2ResourceMd, bucket2ResourceList, service); } /** * This implementation uses the S3 HEAD Object command to return the result */ public Future blobMetadata(String container, String key) { - return wrapFuture(async.headObject(container, key), + return compose(makeListenable(async.headObject(container, key)), new Function() { @Override @@ -90,7 +89,7 @@ public class S3AsyncBlobStore extends BaseS3BlobStore implements AsyncBlobStore return object2BlobMd.apply(from); } - }); + }, service); } public Future clearContainer(final String container) { @@ -153,27 +152,27 @@ public class S3AsyncBlobStore extends BaseS3BlobStore implements AsyncBlobStore public Future getBlob(String container, String key, org.jclouds.blobstore.options.GetOptions... optionsList) { GetOptions httpOptions = blob2ObjectGetOptions.apply(optionsList); - Future returnVal = async.getObject(container, key, httpOptions); - return wrapFuture(returnVal, object2Blob); + return compose(makeListenable(async.getObject(container, key, httpOptions)), object2Blob, + service); } public Future> list() { - return wrapFuture( - async.listOwnedBuckets(), + return compose( + makeListenable(async.listOwnedBuckets()), new Function, org.jclouds.blobstore.domain.ListResponse>() { public org.jclouds.blobstore.domain.ListResponse apply( SortedSet from) { return new ListResponseImpl(Iterables.transform(from, bucket2ResourceMd), null, null, false); } - }); + }, service); } public Future> list( String container, ListContainerOptions... optionsList) { ListBucketOptions httpOptions = container2BucketListOptions.apply(optionsList); Future returnVal = async.listBucket(container, httpOptions); - return wrapFuture(returnVal, bucket2ResourceList); + return compose(makeListenable(returnVal), bucket2ResourceList, service); } public Future putBlob(String container, Blob blob) { diff --git a/aws/core/src/main/java/org/jclouds/aws/s3/blobstore/internal/BaseS3BlobStore.java b/aws/core/src/main/java/org/jclouds/aws/s3/blobstore/internal/BaseS3BlobStore.java index 83d57cfe00..2266ca0713 100644 --- a/aws/core/src/main/java/org/jclouds/aws/s3/blobstore/internal/BaseS3BlobStore.java +++ b/aws/core/src/main/java/org/jclouds/aws/s3/blobstore/internal/BaseS3BlobStore.java @@ -21,7 +21,6 @@ package org.jclouds.aws.s3.blobstore.internal; import static com.google.common.base.Preconditions.checkNotNull; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Future; import javax.inject.Inject; @@ -38,11 +37,8 @@ import org.jclouds.blobstore.domain.Blob; import org.jclouds.blobstore.strategy.ClearListStrategy; import org.jclouds.blobstore.strategy.GetDirectoryStrategy; import org.jclouds.blobstore.strategy.MkdirStrategy; -import org.jclouds.concurrent.FutureFunctionWrapper; import org.jclouds.logging.Logger.LoggerFactory; -import com.google.common.base.Function; - public class BaseS3BlobStore { protected final S3AsyncClient async; protected final S3Client sync; @@ -80,19 +76,13 @@ public class BaseS3BlobStore { this.container2BucketListOptions = checkNotNull(container2BucketListOptions, "container2BucketListOptions"); this.blob2ObjectGetOptions = checkNotNull(blob2ObjectGetOptions, "blob2ObjectGetOptions"); - this.getDirectoryStrategy = checkNotNull(getDirectoryStrategy, - "getDirectoryStrategy"); + this.getDirectoryStrategy = checkNotNull(getDirectoryStrategy, "getDirectoryStrategy"); this.mkdirStrategy = checkNotNull(mkdirStrategy, "mkdirStrategy"); this.bucket2ResourceMd = checkNotNull(bucket2ResourceMd, "bucket2ResourceMd"); this.bucket2ResourceList = checkNotNull(bucket2ResourceList, "bucket2ResourceList"); this.service = checkNotNull(service, "service"); } - protected Future wrapFuture(Future future, Function function) { - return new FutureFunctionWrapper(future, function, logFactory.getLogger(function - .getClass().getName())); - } - public Blob newBlob(String name) { Blob blob = blobFactory.create(null); blob.getMetadata().setName(name); diff --git a/aws/core/src/test/java/org/jclouds/aws/s3/internal/StubS3AsyncClient.java b/aws/core/src/test/java/org/jclouds/aws/s3/internal/StubS3AsyncClient.java index 10ce528bd7..5022907f3f 100755 --- a/aws/core/src/test/java/org/jclouds/aws/s3/internal/StubS3AsyncClient.java +++ b/aws/core/src/test/java/org/jclouds/aws/s3/internal/StubS3AsyncClient.java @@ -19,13 +19,14 @@ package org.jclouds.aws.s3.internal; import static com.google.common.base.Preconditions.checkNotNull; +import static com.google.common.util.concurrent.Futures.immediateFailedFuture; +import static com.google.common.util.concurrent.Futures.immediateFuture; import java.util.Date; import java.util.Map; import java.util.SortedSet; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import javax.inject.Inject; @@ -61,16 +62,14 @@ import org.jclouds.blobstore.domain.BlobMetadata; import org.jclouds.blobstore.domain.MutableBlobMetadata; import org.jclouds.blobstore.functions.HttpGetOptionsListToGetOptions; import org.jclouds.blobstore.integration.internal.StubAsyncBlobStore; -import org.jclouds.blobstore.integration.internal.StubAsyncBlobStore.FutureBase; import org.jclouds.blobstore.options.ListContainerOptions; -import org.jclouds.concurrent.FutureFunctionWrapper; import org.jclouds.date.DateService; import org.jclouds.http.options.GetOptions; -import org.jclouds.logging.Logger.LoggerFactory; import com.google.common.base.Function; import com.google.common.collect.Iterables; import com.google.common.collect.Sets; +import com.google.common.util.concurrent.Futures; /** * Implementation of {@link S3AsyncBlobStore} which keeps all data in a local Map object. @@ -83,7 +82,6 @@ public class StubS3AsyncClient implements S3AsyncClient { private final DateService dateService; private final HttpGetOptionsListToGetOptions httpGetOptionsConverter; private final StubAsyncBlobStore blobStore; - private final LoggerFactory logFactory; private final S3Object.Factory objectProvider; private final Blob.Factory blobProvider; private final ObjectToBlob object2Blob; @@ -93,7 +91,7 @@ public class StubS3AsyncClient implements S3AsyncClient { private final ResourceToBucketList resource2BucketList; @Inject - private StubS3AsyncClient(StubAsyncBlobStore blobStore, LoggerFactory logFactory, + private StubS3AsyncClient(StubAsyncBlobStore blobStore, ConcurrentMap> containerToBlobs, DateService dateService, S3Object.Factory objectProvider, Blob.Factory blobProvider, HttpGetOptionsListToGetOptions httpGetOptionsConverter, ObjectToBlob object2Blob, @@ -101,7 +99,6 @@ public class StubS3AsyncClient implements S3AsyncClient { BucketToContainerListOptions bucket2ContainerListOptions, ResourceToBucketList resource2BucketList) { this.blobStore = blobStore; - this.logFactory = logFactory; this.objectProvider = objectProvider; this.blobProvider = blobProvider; this.dateService = dateService; @@ -125,11 +122,6 @@ public class StubS3AsyncClient implements S3AsyncClient { public static final String DEFAULT_OWNER_ID = "abc123"; - protected Future wrapFuture(Future future, Function function) { - return new FutureFunctionWrapper(future, function, logFactory.getLogger(function - .getClass().getName())); - } - public Future putBucketIfNotExists(String name, PutBucketOptions... optionsList) { final PutBucketOptions options = (optionsList.length == 0) ? new PutBucketOptions() : optionsList[0]; @@ -141,7 +133,8 @@ public class StubS3AsyncClient implements S3AsyncClient { public Future listBucket(final String name, ListBucketOptions... optionsList) { ListContainerOptions options = bucket2ContainerListOptions.apply(optionsList); - return wrapFuture(blobStore.list(name, options), resource2BucketList); + return Futures.compose(Futures.makeListenable(blobStore.list(name, options)), + resource2BucketList); } public Future copyObject(final String sourceBucket, final String sourceObject, @@ -149,49 +142,41 @@ public class StubS3AsyncClient implements S3AsyncClient { CopyObjectOptions... nullableOptions) { final CopyObjectOptions options = (nullableOptions.length == 0) ? new CopyObjectOptions() : nullableOptions[0]; - return new FutureBase() { - public ObjectMetadata get() throws InterruptedException, ExecutionException { - ConcurrentMap source = blobStore.getContainerToBlobs().get(sourceBucket); - ConcurrentMap dest = blobStore.getContainerToBlobs().get( - destinationBucket); - if (source.containsKey(sourceObject)) { - Blob object = source.get(sourceObject); - if (options.getIfMatch() != null) { - if (!object.getMetadata().getETag().equals(options.getIfMatch())) - blobStore.throwResponseException(412); - - } - if (options.getIfNoneMatch() != null) { - if (object.getMetadata().getETag().equals(options.getIfNoneMatch())) - blobStore.throwResponseException(412); - } - if (options.getIfModifiedSince() != null) { - Date modifiedSince = dateService.rfc822DateParse(options.getIfModifiedSince()); - if (modifiedSince.after(object.getMetadata().getLastModified())) - blobStore.throwResponseException(412); - - } - if (options.getIfUnmodifiedSince() != null) { - Date unmodifiedSince = dateService - .rfc822DateParse(options.getIfUnmodifiedSince()); - if (unmodifiedSince.before(object.getMetadata().getLastModified())) - blobStore.throwResponseException(412); - } - Blob sourceS3 = source.get(sourceObject); - MutableBlobMetadata newMd = blobStore - .copy(sourceS3.getMetadata(), destinationObject); - if (options.getAcl() != null) - keyToAcl.put(destinationBucket + "/" + destinationObject, options.getAcl()); - - newMd.setLastModified(new Date()); - Blob newBlob = blobProvider.create(newMd); - newBlob.setPayload(sourceS3.getContent()); - dest.put(destinationObject, newBlob); - return blob2ObjectMetadata.apply(blobStore.copy(newMd)); - } - throw new KeyNotFoundException(sourceBucket, sourceObject); + ConcurrentMap source = blobStore.getContainerToBlobs().get(sourceBucket); + ConcurrentMap dest = blobStore.getContainerToBlobs().get(destinationBucket); + if (source.containsKey(sourceObject)) { + Blob object = source.get(sourceObject); + if (options.getIfMatch() != null) { + if (!object.getMetadata().getETag().equals(options.getIfMatch())) + return immediateFailedFuture(blobStore.returnResponseException(412)); } - }; + if (options.getIfNoneMatch() != null) { + if (object.getMetadata().getETag().equals(options.getIfNoneMatch())) + return immediateFailedFuture(blobStore.returnResponseException(412)); + } + if (options.getIfModifiedSince() != null) { + Date modifiedSince = dateService.rfc822DateParse(options.getIfModifiedSince()); + if (modifiedSince.after(object.getMetadata().getLastModified())) + return immediateFailedFuture(blobStore.returnResponseException(412)); + + } + if (options.getIfUnmodifiedSince() != null) { + Date unmodifiedSince = dateService.rfc822DateParse(options.getIfUnmodifiedSince()); + if (unmodifiedSince.before(object.getMetadata().getLastModified())) + return immediateFailedFuture(blobStore.returnResponseException(412)); + } + Blob sourceS3 = source.get(sourceObject); + MutableBlobMetadata newMd = blobStore.copy(sourceS3.getMetadata(), destinationObject); + if (options.getAcl() != null) + keyToAcl.put(destinationBucket + "/" + destinationObject, options.getAcl()); + + newMd.setLastModified(new Date()); + Blob newBlob = blobProvider.create(newMd); + newBlob.setPayload(sourceS3.getContent()); + dest.put(destinationObject, newBlob); + return immediateFuture((ObjectMetadata) blob2ObjectMetadata.apply(blobStore.copy(newMd))); + } + return immediateFailedFuture(new KeyNotFoundException(sourceBucket, sourceObject)); } public Future putObject(final String bucketName, final S3Object object, @@ -220,19 +205,11 @@ public class StubS3AsyncClient implements S3AsyncClient { } public Future getBucketACL(final String bucket) { - return new FutureBase() { - public AccessControlList get() throws InterruptedException, ExecutionException { - return getACLforS3Item(bucket); - } - }; + return immediateFuture(getACLforS3Item(bucket)); } public Future getObjectACL(final String bucket, final String objectKey) { - return new FutureBase() { - public AccessControlList get() throws InterruptedException, ExecutionException { - return getACLforS3Item(bucket + "/" + objectKey); - } - }; + return immediateFuture(getACLforS3Item(bucket + "/" + objectKey)); } /** @@ -259,30 +236,18 @@ public class StubS3AsyncClient implements S3AsyncClient { } public Future putBucketACL(final String bucket, final AccessControlList acl) { - return new FutureBase() { - public Boolean get() throws InterruptedException, ExecutionException { - keyToAcl.put(bucket, sanitizeUploadedACL(acl)); - return true; - } - }; + keyToAcl.put(bucket, sanitizeUploadedACL(acl)); + return immediateFuture(true); } public Future putObjectACL(final String bucket, final String objectKey, final AccessControlList acl) { - return new FutureBase() { - public Boolean get() throws InterruptedException, ExecutionException { - keyToAcl.put(bucket + "/" + objectKey, sanitizeUploadedACL(acl)); - return true; - } - }; + keyToAcl.put(bucket + "/" + objectKey, sanitizeUploadedACL(acl)); + return immediateFuture(true); } public Future bucketExists(final String bucketName) { - return new FutureBase() { - public Boolean get() throws InterruptedException, ExecutionException { - return blobStore.getContainerToBlobs().containsKey(bucketName); - } - }; + return immediateFuture(blobStore.getContainerToBlobs().containsKey(bucketName)); } public Future deleteBucketIfEmpty(String bucketName) { @@ -296,36 +261,28 @@ public class StubS3AsyncClient implements S3AsyncClient { public Future getObject(final String bucketName, final String key, final GetOptions... options) { org.jclouds.blobstore.options.GetOptions getOptions = httpGetOptionsConverter.apply(options); - return wrapFuture(blobStore.getBlob(bucketName, key, getOptions), blob2Object); + return Futures.compose( + Futures.makeListenable(blobStore.getBlob(bucketName, key, getOptions)), blob2Object); } public Future headObject(String bucketName, String key) { - return wrapFuture(blobStore.blobMetadata(bucketName, key), + return Futures.compose(Futures.makeListenable(blobStore.blobMetadata(bucketName, key)), new Function() { - @Override public ObjectMetadata apply(BlobMetadata from) { - return blob2ObjectMetadata.apply(from); } - }); } public Future> listOwnedBuckets() { - return new FutureBase>() { - - public SortedSet get() throws InterruptedException, ExecutionException { - return Sets.newTreeSet(Iterables.transform(blobStore.getContainerToBlobs().keySet(), - new Function() { - public BucketMetadata apply(String name) { - return new BucketMetadata(name, null, null); - } - - })); + return immediateFuture(Sets.newTreeSet(Iterables.transform(blobStore.getContainerToBlobs() + .keySet(), new Function() { + public BucketMetadata apply(String name) { + return new BucketMetadata(name, null, null); } - }; + }))); } public S3Object newS3Object() { @@ -334,56 +291,33 @@ public class StubS3AsyncClient implements S3AsyncClient { @Override public Future getBucketLocation(String bucketName) { - return new FutureBase() { - public LocationConstraint get() throws InterruptedException, ExecutionException { - return LocationConstraint.US_STANDARD; - } - }; + return immediateFuture(LocationConstraint.US_STANDARD); } @Override public Future getBucketPayer(String bucketName) { - return new FutureBase() { - public Payer get() throws InterruptedException, ExecutionException { - return Payer.BUCKET_OWNER; - } - }; + return immediateFuture(Payer.BUCKET_OWNER); } @Override public Future setBucketPayer(String bucketName, Payer payer) { - return new FutureBase() { - public Void get() throws InterruptedException, ExecutionException { - return null; - } - }; + return immediateFuture(null); + } @Override public Future disableBucketLogging(String bucketName) { - return new FutureBase() { - public Void get() throws InterruptedException, ExecutionException { - return null; - } - }; + return immediateFuture(null); } @Override public Future enableBucketLogging(String bucketName, BucketLogging logging) { - return new FutureBase() { - public Void get() throws InterruptedException, ExecutionException { - return null; - } - }; + return immediateFuture(null); } @Override public Future getBucketLogging(String bucketName) { - return new FutureBase() { - public BucketLogging get() throws InterruptedException, ExecutionException { - return null; - } - }; + return immediateFuture(null); } } diff --git a/azure/src/main/java/org/jclouds/azure/storage/blob/blobstore/AzureAsyncBlobStore.java b/azure/src/main/java/org/jclouds/azure/storage/blob/blobstore/AzureAsyncBlobStore.java index 9c7cc17bf5..0fea5943d1 100644 --- a/azure/src/main/java/org/jclouds/azure/storage/blob/blobstore/AzureAsyncBlobStore.java +++ b/azure/src/main/java/org/jclouds/azure/storage/blob/blobstore/AzureAsyncBlobStore.java @@ -18,6 +18,8 @@ */ package org.jclouds.azure.storage.blob.blobstore; +import static com.google.common.util.concurrent.Futures.compose; +import static com.google.common.util.concurrent.Futures.makeListenable; import static org.jclouds.blobstore.options.ListContainerOptions.Builder.recursive; import java.util.SortedSet; @@ -81,7 +83,7 @@ public class AzureAsyncBlobStore extends BaseAzureBlobStore implements AsyncBlob * This implementation uses the AzureBlob HEAD Object command to return the result */ public Future blobMetadata(String container, String key) { - return wrapFuture(async.getBlobProperties(container, key), + return compose(makeListenable(async.getBlobProperties(container, key)), new Function() { @Override @@ -89,7 +91,7 @@ public class AzureAsyncBlobStore extends BaseAzureBlobStore implements AsyncBlob return object2BlobMd.apply(from); } - }); + }, service); } public Future clearContainer(final String container) { @@ -120,26 +122,26 @@ public class AzureAsyncBlobStore extends BaseAzureBlobStore implements AsyncBlob org.jclouds.blobstore.options.GetOptions... optionsList) { GetOptions httpOptions = blob2ObjectGetOptions.apply(optionsList); Future returnVal = async.getBlob(container, key, httpOptions); - return wrapFuture(returnVal, object2Blob); + return compose(makeListenable(returnVal), object2Blob, service); } public Future> list() { - return wrapFuture( - async.listContainers(), + return compose( + makeListenable(async.listContainers()), new Function, org.jclouds.blobstore.domain.ListResponse>() { public org.jclouds.blobstore.domain.ListResponse apply( SortedSet from) { return new ListResponseImpl(Iterables.transform(from, container2ResourceMd), null, null, false); } - }); + }, service); } public Future> list( String container, ListContainerOptions... optionsList) { ListBlobsOptions httpOptions = container2ContainerListOptions.apply(optionsList); Future returnVal = async.listBlobs(container, httpOptions); - return wrapFuture(returnVal, container2ResourceList); + return compose(makeListenable(returnVal), container2ResourceList, service); } public Future putBlob(String container, Blob blob) { diff --git a/azure/src/main/java/org/jclouds/azure/storage/blob/blobstore/internal/BaseAzureBlobStore.java b/azure/src/main/java/org/jclouds/azure/storage/blob/blobstore/internal/BaseAzureBlobStore.java index 7b98cfdace..e6c8a7de02 100644 --- a/azure/src/main/java/org/jclouds/azure/storage/blob/blobstore/internal/BaseAzureBlobStore.java +++ b/azure/src/main/java/org/jclouds/azure/storage/blob/blobstore/internal/BaseAzureBlobStore.java @@ -21,7 +21,6 @@ package org.jclouds.azure.storage.blob.blobstore.internal; import static com.google.common.base.Preconditions.checkNotNull; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Future; import javax.inject.Inject; @@ -38,11 +37,8 @@ import org.jclouds.blobstore.functions.BlobToHttpGetOptions; import org.jclouds.blobstore.strategy.ClearListStrategy; import org.jclouds.blobstore.strategy.GetDirectoryStrategy; import org.jclouds.blobstore.strategy.MkdirStrategy; -import org.jclouds.concurrent.FutureFunctionWrapper; import org.jclouds.logging.Logger.LoggerFactory; -import com.google.common.base.Function; - public class BaseAzureBlobStore { protected final AzureBlobAsyncClient async; protected final AzureBlobClient sync; @@ -87,11 +83,6 @@ public class BaseAzureBlobStore { this.service = checkNotNull(service, "service"); } - protected Future wrapFuture(Future future, Function function) { - return new FutureFunctionWrapper(future, function, logFactory.getLogger(function - .getClass().getName())); - } - public Blob newBlob(String name) { Blob blob = blobFactory.create(null); blob.getMetadata().setName(name); diff --git a/azure/src/test/java/org/jclouds/azure/storage/blob/internal/StubAzureBlobAsyncClient.java b/azure/src/test/java/org/jclouds/azure/storage/blob/internal/StubAzureBlobAsyncClient.java index 96b41c9015..9dc115a5de 100644 --- a/azure/src/test/java/org/jclouds/azure/storage/blob/internal/StubAzureBlobAsyncClient.java +++ b/azure/src/test/java/org/jclouds/azure/storage/blob/internal/StubAzureBlobAsyncClient.java @@ -19,12 +19,12 @@ package org.jclouds.azure.storage.blob.internal; import static com.google.common.base.Preconditions.checkNotNull; +import static com.google.common.util.concurrent.Futures.immediateFuture; import java.net.URI; import java.util.Date; import java.util.Map; import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import javax.inject.Inject; @@ -51,13 +51,11 @@ import org.jclouds.blobstore.domain.Blob; import org.jclouds.blobstore.domain.BlobMetadata; import org.jclouds.blobstore.functions.HttpGetOptionsListToGetOptions; import org.jclouds.blobstore.integration.internal.StubAsyncBlobStore; -import org.jclouds.blobstore.integration.internal.StubAsyncBlobStore.FutureBase; -import org.jclouds.concurrent.FutureFunctionWrapper; import org.jclouds.http.options.GetOptions; -import org.jclouds.logging.Logger.LoggerFactory; import com.google.common.base.Function; import com.google.common.collect.Iterables; +import com.google.common.util.concurrent.Futures; /** * Implementation of {@link AzureBlobAsyncClient} which keeps all data in a local Map object. @@ -68,7 +66,6 @@ import com.google.common.collect.Iterables; public class StubAzureBlobAsyncClient implements AzureBlobAsyncClient { private final HttpGetOptionsListToGetOptions httpGetOptionsConverter; private final StubAsyncBlobStore blobStore; - private final LoggerFactory logFactory; private final AzureBlob.Factory objectProvider; private final AzureBlobToBlob object2Blob; private final BlobToAzureBlob blob2Object; @@ -78,14 +75,13 @@ public class StubAzureBlobAsyncClient implements AzureBlobAsyncClient { private final ConcurrentMap> containerToBlobs; @Inject - private StubAzureBlobAsyncClient(StubAsyncBlobStore blobStore, LoggerFactory logFactory, + private StubAzureBlobAsyncClient(StubAsyncBlobStore blobStore, ConcurrentMap> containerToBlobs, AzureBlob.Factory objectProvider, HttpGetOptionsListToGetOptions httpGetOptionsConverter, AzureBlobToBlob object2Blob, BlobToAzureBlob blob2Object, BlobMetadataToBlobProperties blob2ObjectInfo, ListBlobsOptionsToListOptions container2ContainerListOptions, ResourceToListBlobsResponse resource2ContainerList) { - this.logFactory = logFactory; this.containerToBlobs = containerToBlobs; this.blobStore = blobStore; this.objectProvider = objectProvider; @@ -98,11 +94,6 @@ public class StubAzureBlobAsyncClient implements AzureBlobAsyncClient { this.resource2ObjectList = checkNotNull(resource2ContainerList, "resource2ContainerList"); } - protected Future wrapFuture(Future future, Function function) { - return new FutureFunctionWrapper(future, function, logFactory.getLogger(function - .getClass().getName())); - } - public Future createContainer(String container, CreateContainerOptions... options) { return blobStore.createContainer(container); } @@ -116,12 +107,8 @@ public class StubAzureBlobAsyncClient implements AzureBlobAsyncClient { } public Future deleteContainer(final String container) { - return new FutureBase() { - public Void get() throws InterruptedException, ExecutionException { - StubAzureBlobAsyncClient.this.containerToBlobs.remove(container); - return null; - } - }; + StubAzureBlobAsyncClient.this.containerToBlobs.remove(container); + return immediateFuture(null); } public Future deleteRootContainer() { @@ -130,11 +117,12 @@ public class StubAzureBlobAsyncClient implements AzureBlobAsyncClient { public Future getBlob(String container, String key, GetOptions... options) { org.jclouds.blobstore.options.GetOptions getOptions = httpGetOptionsConverter.apply(options); - return wrapFuture(blobStore.getBlob(container, key, getOptions), blob2Object); + return Futures.compose(Futures.makeListenable(blobStore.getBlob(container, key, getOptions)), + blob2Object); } public Future getBlobProperties(String container, String key) { - return wrapFuture(blobStore.blobMetadata(container, key), + return Futures.compose(Futures.makeListenable(blobStore.blobMetadata(container, key)), new Function() { @Override @@ -153,7 +141,8 @@ public class StubAzureBlobAsyncClient implements AzureBlobAsyncClient { public Future listBlobs(String container, ListBlobsOptions... optionsList) { org.jclouds.blobstore.options.ListContainerOptions options = container2ContainerListOptions .apply(optionsList); - return wrapFuture(blobStore.list(container, options), resource2ObjectList); + return Futures.compose(Futures.makeListenable(blobStore.list(container, options)), + resource2ObjectList); } public Future listBlobs(ListBlobsOptions... options) { @@ -162,21 +151,15 @@ public class StubAzureBlobAsyncClient implements AzureBlobAsyncClient { public Future> listContainers( ListOptions... listOptions) { - return new FutureBase>() { + return immediateFuture(new BoundedTreeSet(Iterables.transform( + blobStore.getContainerToBlobs().keySet(), + new Function() { + public ListableContainerProperties apply(String name) { + return new ListableContainerPropertiesImpl(URI.create("http://stub/" + name), + new Date(), ""); + } - public BoundedSortedSet get() throws InterruptedException, - ExecutionException { - return new BoundedTreeSet(Iterables.transform(blobStore - .getContainerToBlobs().keySet(), - new Function() { - public ListableContainerProperties apply(String name) { - return new ListableContainerPropertiesImpl(URI.create("http://stub/" - + name), new Date(), ""); - } - - }), null, null, null, null, null); - } - }; + }), null, null, null, null, null)); } public AzureBlob newBlob() { @@ -196,11 +179,7 @@ public class StubAzureBlobAsyncClient implements AzureBlobAsyncClient { } public Future containerExists(final String container) { - return new FutureBase() { - public Boolean get() throws InterruptedException, ExecutionException { - return blobStore.getContainerToBlobs().containsKey(container); - } - }; + return immediateFuture(blobStore.getContainerToBlobs().containsKey(container)); } } diff --git a/blobstore/src/main/java/org/jclouds/blobstore/internal/BaseBlobMap.java b/blobstore/src/main/java/org/jclouds/blobstore/internal/BaseBlobMap.java index 215401d665..d56c082f81 100755 --- a/blobstore/src/main/java/org/jclouds/blobstore/internal/BaseBlobMap.java +++ b/blobstore/src/main/java/org/jclouds/blobstore/internal/BaseBlobMap.java @@ -28,7 +28,6 @@ import java.util.concurrent.TimeUnit; import javax.inject.Named; import org.jclouds.blobstore.AsyncBlobStore; -import org.jclouds.blobstore.KeyNotFoundException; import org.jclouds.blobstore.domain.Blob; import org.jclouds.blobstore.domain.BlobMetadata; import org.jclouds.blobstore.domain.ListResponse; @@ -43,9 +42,11 @@ import org.jclouds.blobstore.strategy.ContainsValueInListStrategy; import org.jclouds.blobstore.strategy.CountListStrategy; import org.jclouds.blobstore.strategy.GetBlobsInListStrategy; import org.jclouds.blobstore.strategy.ListBlobMetadataStrategy; +import org.jclouds.rest.ResourceNotFoundException; import org.jclouds.util.Utils; import com.google.common.base.Function; +import com.google.common.base.Throwables; import com.google.common.collect.Iterables; import com.google.common.collect.Sets; import com.google.inject.Inject; @@ -204,9 +205,10 @@ public abstract class BaseBlobMap { try { return connection.blobMetadata(containerName, realKey).get(requestTimeoutMilliseconds, TimeUnit.MILLISECONDS) != null; - } catch (KeyNotFoundException e) { - return false; } catch (Exception e) { + if (Iterables.size(Iterables.filter(Throwables.getCausalChain(e), + ResourceNotFoundException.class)) >= 1) + return false; Utils. rethrowIfRuntimeOrSameType(e); throw new BlobRuntimeException(String.format("Error searching for %1$s:%2$s", containerName, realKey), e); diff --git a/blobstore/src/main/java/org/jclouds/blobstore/internal/BlobMapImpl.java b/blobstore/src/main/java/org/jclouds/blobstore/internal/BlobMapImpl.java index c534ca93c8..ee8703d0dc 100755 --- a/blobstore/src/main/java/org/jclouds/blobstore/internal/BlobMapImpl.java +++ b/blobstore/src/main/java/org/jclouds/blobstore/internal/BlobMapImpl.java @@ -27,8 +27,8 @@ import java.util.concurrent.TimeUnit; import javax.inject.Inject; -import org.jclouds.blobstore.BlobMap; import org.jclouds.blobstore.AsyncBlobStore; +import org.jclouds.blobstore.BlobMap; import org.jclouds.blobstore.KeyNotFoundException; import org.jclouds.blobstore.domain.Blob; import org.jclouds.blobstore.options.ListContainerOptions; @@ -40,6 +40,7 @@ import org.jclouds.blobstore.strategy.ListBlobMetadataStrategy; import org.jclouds.util.Utils; import com.google.common.base.Function; +import com.google.common.base.Throwables; import com.google.common.collect.Collections2; import com.google.common.collect.Sets; @@ -118,15 +119,13 @@ public class BlobMapImpl extends BaseBlobMap implements BlobMap { return stripPrefix(connection.getBlob(containerName, realKey).get( requestTimeoutMilliseconds, TimeUnit.MILLISECONDS)); } catch (Exception e) { - if (e instanceof KeyNotFoundException) + Throwable cause = Throwables.getRootCause(e); + if (cause instanceof KeyNotFoundException) { return null; - // the following will unwrap any exceptions, so we should double-check that it - // wasn't unwrapped to a KNFE - e = Utils. rethrowIfRuntimeOrSameType(e); - if (e instanceof KeyNotFoundException) - return null; - throw new BlobRuntimeException(String.format("Error geting object %1$s:%2$s", - containerName, realKey), e); + } else { + throw new BlobRuntimeException(String.format("Error geting object %1$s:%2$s", + containerName, realKey), cause); + } } } diff --git a/blobstore/src/main/java/org/jclouds/blobstore/internal/InputStreamMapImpl.java b/blobstore/src/main/java/org/jclouds/blobstore/internal/InputStreamMapImpl.java index 6336ba976d..22bea9ef9e 100755 --- a/blobstore/src/main/java/org/jclouds/blobstore/internal/InputStreamMapImpl.java +++ b/blobstore/src/main/java/org/jclouds/blobstore/internal/InputStreamMapImpl.java @@ -49,6 +49,7 @@ import org.jclouds.util.Utils; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Function; +import com.google.common.base.Throwables; import com.google.common.collect.Collections2; import com.google.common.collect.Sets; @@ -86,7 +87,10 @@ public class InputStreamMapImpl extends BaseBlobMap implements Inpu } catch (KeyNotFoundException e) { return null; } catch (Exception e) { - Utils. rethrowIfRuntimeOrSameType(e); + Throwable cause = Throwables.getRootCause(e); + if (cause instanceof KeyNotFoundException) + return null; + Throwables.propagateIfInstanceOf(e, BlobRuntimeException.class); throw new BlobRuntimeException(String.format("Error geting object %1$s:%2$s", containerName, realKey), e); } diff --git a/blobstore/src/main/java/org/jclouds/blobstore/strategy/internal/GetAllBlobsInListAndRetryOnFailure.java b/blobstore/src/main/java/org/jclouds/blobstore/strategy/internal/GetAllBlobsInListAndRetryOnFailure.java index 69dd3b0b42..d348b9c054 100644 --- a/blobstore/src/main/java/org/jclouds/blobstore/strategy/internal/GetAllBlobsInListAndRetryOnFailure.java +++ b/blobstore/src/main/java/org/jclouds/blobstore/strategy/internal/GetAllBlobsInListAndRetryOnFailure.java @@ -42,6 +42,7 @@ import org.jclouds.blobstore.strategy.ListBlobMetadataStrategy; import org.jclouds.util.Utils; import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Throwables; import com.google.common.collect.Maps; import com.google.common.collect.Sets; import com.google.inject.Inject; @@ -107,11 +108,15 @@ public class GetAllBlobsInListAndRetryOnFailure implements GetBlobsInListStrateg object.getMetadata().setName(key); objects.add(object); return; - } catch (KeyNotFoundException e) { - Thread.sleep(requestRetryMilliseconds); - value = connection.getBlob(container, key); + } catch (Exception e) { + Throwable cause = Throwables.getRootCause(e); + if (cause instanceof KeyNotFoundException) { + Thread.sleep(requestRetryMilliseconds); + value = connection.getBlob(container, key); + } else { + Throwables.propagate(e); + } } } } - } \ No newline at end of file diff --git a/blobstore/src/test/java/org/jclouds/blobstore/integration/internal/StubAsyncBlobStore.java b/blobstore/src/test/java/org/jclouds/blobstore/integration/internal/StubAsyncBlobStore.java index 21707d1688..d83532f14e 100755 --- a/blobstore/src/test/java/org/jclouds/blobstore/integration/internal/StubAsyncBlobStore.java +++ b/blobstore/src/test/java/org/jclouds/blobstore/integration/internal/StubAsyncBlobStore.java @@ -19,6 +19,8 @@ package org.jclouds.blobstore.integration.internal; import static com.google.common.base.Preconditions.checkNotNull; +import static com.google.common.util.concurrent.Futures.immediateFailedFuture; +import static com.google.common.util.concurrent.Futures.immediateFuture; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; @@ -41,11 +43,8 @@ import java.util.Map.Entry; import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; import javax.inject.Inject; import javax.ws.rs.core.HttpHeaders; @@ -154,109 +153,99 @@ public class StubAsyncBlobStore implements AsyncBlobStore { } public Future getBlob(final String bucketName, final String key) { - return new FutureBase() { - public Blob get() throws InterruptedException, ExecutionException { - if (!getContainerToBlobs().containsKey(bucketName)) - throw new ContainerNotFoundException(bucketName); - Map realContents = getContainerToBlobs().get(bucketName); - if (!realContents.containsKey(key)) - throw new KeyNotFoundException(bucketName, key); - - Blob object = realContents.get(key); - Blob returnVal = blobProvider.create(copy(object.getMetadata())); - returnVal.setPayload(object.getContent()); - return returnVal; - } - }; + if (!getContainerToBlobs().containsKey(bucketName)) + return immediateFailedFuture(new ContainerNotFoundException(bucketName)); + Map realContents = getContainerToBlobs().get(bucketName); + if (!realContents.containsKey(key)) + return immediateFailedFuture(new KeyNotFoundException(bucketName, key)); + Blob object = realContents.get(key); + Blob returnVal = blobProvider.create(copy(object.getMetadata())); + returnVal.setPayload(object.getContent()); + return immediateFuture(returnVal); } public Future> list( final String name, ListContainerOptions... optionsList) { final ListContainerOptions options = (optionsList.length == 0) ? new ListContainerOptions() : optionsList[0]; - return new FutureBase>() { - public ListContainerResponse get() throws InterruptedException, - ExecutionException { - final Map realContents = getContainerToBlobs().get(name); - if (realContents == null) - throw new ContainerNotFoundException(name); + final Map realContents = getContainerToBlobs().get(name); - SortedSet contents = Sets.newTreeSet(Iterables.transform(realContents - .keySet(), new Function() { - public ResourceMetadata apply(String key) { - MutableBlobMetadata md = copy(realContents.get(key).getMetadata()); - if (isDirectoryStrategy.execute(md)) - md.setType(ResourceType.RELATIVE_PATH); - return md; - } - })); + if (realContents == null) + return immediateFailedFuture(new ContainerNotFoundException(name)); - if (options.getMarker() != null) { - final String finalMarker = options.getMarker(); - ResourceMetadata lastMarkerMetadata = Iterables.find(contents, - new Predicate() { - public boolean apply(ResourceMetadata metadata) { - return metadata.getName().equals(finalMarker); - } - }); - contents = contents.tailSet(lastMarkerMetadata); - contents.remove(lastMarkerMetadata); - } - - final String prefix = options.getDir(); - if (prefix != null) { - contents = Sets.newTreeSet(Iterables.filter(contents, - new Predicate() { - public boolean apply(ResourceMetadata o) { - return (o != null && o.getName().startsWith(prefix)); - } - })); - } - - int maxResults = contents.size(); - boolean truncated = false; - String marker = null; - if (options.getMaxResults() != null && contents.size() > 0) { - SortedSet contentsSlice = firstSliceOfSize(contents, options - .getMaxResults().intValue()); - maxResults = options.getMaxResults(); - if (!contentsSlice.contains(contents.last())) { - // Partial listing - truncated = true; - marker = contentsSlice.last().getName(); - } else { - marker = null; - } - contents = contentsSlice; - } - - final String delimiter = options.isRecursive() ? null : "/"; - if (delimiter != null) { - SortedSet commonPrefixes = null; - Iterable iterable = Iterables.transform(contents, new CommonPrefixes( - prefix != null ? prefix : null, delimiter)); - commonPrefixes = iterable != null ? Sets.newTreeSet(iterable) - : new TreeSet(); - commonPrefixes.remove(CommonPrefixes.NO_PREFIX); - - contents = Sets.newTreeSet(Iterables.filter(contents, new DelimiterFilter( - prefix != null ? prefix : null, delimiter))); - - Iterables. addAll(contents, Iterables.transform(commonPrefixes, - new Function() { - public ResourceMetadata apply(String o) { - MutableResourceMetadata md = new MutableResourceMetadataImpl(); - md.setType(ResourceType.RELATIVE_PATH); - md.setName(o); - return md; - } - })); - } - return new ListContainerResponseImpl(contents, prefix, marker, - maxResults, truncated); + SortedSet contents = Sets.newTreeSet(Iterables.transform(realContents + .keySet(), new Function() { + public ResourceMetadata apply(String key) { + MutableBlobMetadata md = copy(realContents.get(key).getMetadata()); + if (isDirectoryStrategy.execute(md)) + md.setType(ResourceType.RELATIVE_PATH); + return md; } - }; + })); + + if (options.getMarker() != null) { + final String finalMarker = options.getMarker(); + ResourceMetadata lastMarkerMetadata = Iterables.find(contents, + new Predicate() { + public boolean apply(ResourceMetadata metadata) { + return metadata.getName().equals(finalMarker); + } + }); + contents = contents.tailSet(lastMarkerMetadata); + contents.remove(lastMarkerMetadata); + } + + final String prefix = options.getDir(); + if (prefix != null) { + contents = Sets.newTreeSet(Iterables.filter(contents, new Predicate() { + public boolean apply(ResourceMetadata o) { + return (o != null && o.getName().startsWith(prefix)); + } + })); + } + + int maxResults = contents.size(); + boolean truncated = false; + String marker = null; + if (options.getMaxResults() != null && contents.size() > 0) { + SortedSet contentsSlice = firstSliceOfSize(contents, options + .getMaxResults().intValue()); + maxResults = options.getMaxResults(); + if (!contentsSlice.contains(contents.last())) { + // Partial listing + truncated = true; + marker = contentsSlice.last().getName(); + } else { + marker = null; + } + contents = contentsSlice; + } + + final String delimiter = options.isRecursive() ? null : "/"; + if (delimiter != null) { + SortedSet commonPrefixes = null; + Iterable iterable = Iterables.transform(contents, new CommonPrefixes( + prefix != null ? prefix : null, delimiter)); + commonPrefixes = iterable != null ? Sets.newTreeSet(iterable) : new TreeSet(); + commonPrefixes.remove(CommonPrefixes.NO_PREFIX); + + contents = Sets.newTreeSet(Iterables.filter(contents, new DelimiterFilter( + prefix != null ? prefix : null, delimiter))); + + Iterables. addAll(contents, Iterables.transform(commonPrefixes, + new Function() { + public ResourceMetadata apply(String o) { + MutableResourceMetadata md = new MutableResourceMetadataImpl(); + md.setType(ResourceType.RELATIVE_PATH); + md.setName(o); + return md; + } + })); + } + return immediateFuture(new ListContainerResponseImpl(contents, prefix, + marker, maxResults, truncated)); + } public MutableBlobMetadata copy(MutableBlobMetadata in) { @@ -288,96 +277,54 @@ public class StubAsyncBlobStore implements AsyncBlobStore { return newMd; } - public BlobMetadata metadata(final String container, final String key) { - if (!getContainerToBlobs().containsKey(container)) - throw new ContainerNotFoundException(container); - Map realContents = getContainerToBlobs().get(container); - if (!realContents.containsKey(key)) - throw new KeyNotFoundException(container, key); - return copy(realContents.get(key).getMetadata()); - } + // public BlobMetadata metadata(final String container, final String key) { + // if (!getContainerToBlobs().containsKey(container)) + // return immediateFailedFuture(new ContainerNotFoundException(container)); + // Map realContents = getContainerToBlobs().get(container); + // if (!realContents.containsKey(key)) + // return immediateFailedFuture(new KeyNotFoundException(container, key)); + // return copy(realContents.get(key).getMetadata()); + // } public Future removeBlob(final String container, final String key) { - return new FutureBase() { - public Void get() throws InterruptedException, ExecutionException { - if (getContainerToBlobs().containsKey(container)) { - getContainerToBlobs().get(container).remove(key); - } - return null; - } - }; + if (getContainerToBlobs().containsKey(container)) { + getContainerToBlobs().get(container).remove(key); + } + return immediateFuture(null); } public Future deleteContainer(final String container) { - return new FutureBase() { - public Void get() throws InterruptedException, ExecutionException { - if (getContainerToBlobs().containsKey(container)) { - getContainerToBlobs().remove(container); - } - return null; - } - }; + if (getContainerToBlobs().containsKey(container)) { + getContainerToBlobs().remove(container); + } + return immediateFuture(null); } public Future deleteContainerImpl(final String container) { - return new FutureBase() { - public Boolean get() throws InterruptedException, ExecutionException { - if (getContainerToBlobs().containsKey(container)) { - if (getContainerToBlobs().get(container).size() == 0) - getContainerToBlobs().remove(container); - else - return false; - } - return true; - } - }; + Boolean returnVal = true; + if (getContainerToBlobs().containsKey(container)) { + if (getContainerToBlobs().get(container).size() == 0) + getContainerToBlobs().remove(container); + else + returnVal = false; + } + return immediateFuture(returnVal); } public Future containerExists(final String container) { - return new FutureBase() { - public Boolean get() throws InterruptedException, ExecutionException { - return getContainerToBlobs().containsKey(container); - } - }; - } - - public static abstract class FutureBase implements Future { - public boolean cancel(boolean b) { - return false; - } - - public boolean isCancelled() { - return false; - } - - public boolean isDone() { - return true; - } - - public V get(long l, TimeUnit timeUnit) throws InterruptedException, ExecutionException, - TimeoutException { - return get(); - } + return immediateFuture(getContainerToBlobs().containsKey(container)); } public Future> list() { - return new FutureBase>() { - - public ListResponse get() throws InterruptedException, - ExecutionException { - return new ListResponseImpl(Iterables.transform(getContainerToBlobs() - .keySet(), new Function() { - public ResourceMetadata apply(String name) { - MutableResourceMetadata cmd = create(); - cmd.setName(name); - cmd.setType(ResourceType.CONTAINER); - return cmd; - } - - }), null, null, false); - } - - }; + return immediateFuture(new ListResponseImpl(Iterables.transform( + getContainerToBlobs().keySet(), new Function() { + public ResourceMetadata apply(String name) { + MutableResourceMetadata cmd = create(); + cmd.setName(name); + cmd.setType(ResourceType.CONTAINER); + return cmd; + } + }), null, null, false)); } @@ -386,14 +333,10 @@ public class StubAsyncBlobStore implements AsyncBlobStore { } public Future createContainer(final String name) { - return new FutureBase() { - public Boolean get() throws InterruptedException, ExecutionException { - if (!getContainerToBlobs().containsKey(name)) { - getContainerToBlobs().put(name, new ConcurrentHashMap()); - } - return getContainerToBlobs().containsKey(name); - } - }; + if (!getContainerToBlobs().containsKey(name)) { + getContainerToBlobs().put(name, new ConcurrentHashMap()); + } + return immediateFuture(getContainerToBlobs().containsKey(name)); } public String getFirstQueryOrNull(String string, @Nullable HttpRequestOptions options) { @@ -452,11 +395,11 @@ public class StubAsyncBlobStore implements AsyncBlobStore { return Sets.newTreeSet(slices.get(0)); } - public void throwResponseException(int code) throws ExecutionException { + public HttpResponseException returnResponseException(int code) { HttpResponse response = null; response = new HttpResponse(); // TODO: Get real object URL? response.setStatusCode(code); - throw new ExecutionException(new HttpResponseException(new HttpCommand() { + return new HttpResponseException(new HttpCommand() { public int getRedirectCount() { return 0; @@ -499,7 +442,7 @@ public class StubAsyncBlobStore implements AsyncBlobStore { @Override public void redirectPath(String newPath) { } - }, response)); + }, response); } public Future putBlob(final String bucketName, final Blob object) { @@ -531,14 +474,9 @@ public class StubAsyncBlobStore implements AsyncBlobStore { for (Entry userMD : newMd.getUserMetadata().entrySet()) { blob.getAllHeaders().put(userMD.getKey(), userMD.getValue()); } - - return new FutureBase() { - public String get() throws InterruptedException, ExecutionException { - return eTag; - } - }; + return immediateFuture(eTag); } catch (IOException e) { - throw new RuntimeException(e); + return immediateFailedFuture(new RuntimeException(e)); } } @@ -546,95 +484,88 @@ public class StubAsyncBlobStore implements AsyncBlobStore { public Future getBlob(final String bucketName, final String key, GetOptions... optionsList) { final GetOptions options = (optionsList.length == 0) ? new GetOptions() : optionsList[0]; - return new FutureBase() { - public Blob get() throws InterruptedException, ExecutionException { - if (!getContainerToBlobs().containsKey(bucketName)) - throw new ContainerNotFoundException(bucketName); - Map realContents = getContainerToBlobs().get(bucketName); - if (!realContents.containsKey(key)) - throw new KeyNotFoundException(bucketName, key); + if (!getContainerToBlobs().containsKey(bucketName)) + return immediateFailedFuture(new ContainerNotFoundException(bucketName)); + Map realContents = getContainerToBlobs().get(bucketName); + if (!realContents.containsKey(key)) + return immediateFailedFuture(new KeyNotFoundException(bucketName, key)); - Blob object = realContents.get(key); + Blob object = realContents.get(key); - if (options.getIfMatch() != null) { - if (!object.getMetadata().getETag().equals(options.getIfMatch())) - throwResponseException(412); - } - if (options.getIfNoneMatch() != null) { - if (object.getMetadata().getETag().equals(options.getIfNoneMatch())) - throwResponseException(304); - } - if (options.getIfModifiedSince() != null) { - Date modifiedSince = options.getIfModifiedSince(); - if (object.getMetadata().getLastModified().before(modifiedSince)) { - HttpResponse response = new HttpResponse(); - response.setStatusCode(304); - throw new ExecutionException(new HttpResponseException(String.format( - "%1$s is before %2$s", object.getMetadata().getLastModified(), - modifiedSince), null, response)); - } - - } - if (options.getIfUnmodifiedSince() != null) { - Date unmodifiedSince = options.getIfUnmodifiedSince(); - if (object.getMetadata().getLastModified().after(unmodifiedSince)) { - HttpResponse response = new HttpResponse(); - response.setStatusCode(412); - throw new ExecutionException(new HttpResponseException(String.format( - "%1$s is after %2$s", object.getMetadata().getLastModified(), - unmodifiedSince), null, response)); - } - } - Blob returnVal = copyBlob(object); - - if (options.getRanges() != null && options.getRanges().size() > 0) { - byte[] data; - try { - data = ByteStreams.toByteArray(returnVal.getPayload().getContent()); - } catch (IOException e) { - throw new RuntimeException(e); - } - ByteArrayOutputStream out = new ByteArrayOutputStream(); - for (String s : options.getRanges()) { - if (s.startsWith("-")) { - int length = Integer.parseInt(s.substring(1)); - out.write(data, data.length - length, length); - } else if (s.endsWith("-")) { - int offset = Integer.parseInt(s.substring(0, s.length() - 1)); - out.write(data, offset, data.length - offset); - } else if (s.contains("-")) { - String[] firstLast = s.split("\\-"); - int offset = Integer.parseInt(firstLast[0]); - int last = Integer.parseInt(firstLast[1]); - int length = (last < data.length) ? last + 1 : data.length - offset; - out.write(data, offset, length); - } else { - throw new IllegalArgumentException("first and last were null!"); - } - - } - returnVal.setPayload(out.toByteArray()); - returnVal.setContentLength(out.size()); - returnVal.getMetadata().setSize(new Long(data.length)); - } - returnVal.setPayload(returnVal.getPayload()); - return returnVal; + if (options.getIfMatch() != null) { + if (!object.getMetadata().getETag().equals(options.getIfMatch())) + return immediateFailedFuture(returnResponseException(412)); + } + if (options.getIfNoneMatch() != null) { + if (object.getMetadata().getETag().equals(options.getIfNoneMatch())) + return immediateFailedFuture(returnResponseException(304)); + } + if (options.getIfModifiedSince() != null) { + Date modifiedSince = options.getIfModifiedSince(); + if (object.getMetadata().getLastModified().before(modifiedSince)) { + HttpResponse response = new HttpResponse(); + response.setStatusCode(304); + return immediateFailedFuture(new HttpResponseException(String.format( + "%1$s is before %2$s", object.getMetadata().getLastModified(), modifiedSince), + null, response)); } - }; + + } + if (options.getIfUnmodifiedSince() != null) { + Date unmodifiedSince = options.getIfUnmodifiedSince(); + if (object.getMetadata().getLastModified().after(unmodifiedSince)) { + HttpResponse response = new HttpResponse(); + response.setStatusCode(412); + return immediateFailedFuture(new HttpResponseException( + String.format("%1$s is after %2$s", object.getMetadata().getLastModified(), + unmodifiedSince), null, response)); + } + } + Blob returnVal = copyBlob(object); + + if (options.getRanges() != null && options.getRanges().size() > 0) { + byte[] data; + try { + data = ByteStreams.toByteArray(returnVal.getPayload().getContent()); + } catch (IOException e) { + return immediateFailedFuture(new RuntimeException(e)); + } + ByteArrayOutputStream out = new ByteArrayOutputStream(); + for (String s : options.getRanges()) { + if (s.startsWith("-")) { + int length = Integer.parseInt(s.substring(1)); + out.write(data, data.length - length, length); + } else if (s.endsWith("-")) { + int offset = Integer.parseInt(s.substring(0, s.length() - 1)); + out.write(data, offset, data.length - offset); + } else if (s.contains("-")) { + String[] firstLast = s.split("\\-"); + int offset = Integer.parseInt(firstLast[0]); + int last = Integer.parseInt(firstLast[1]); + int length = (last < data.length) ? last + 1 : data.length - offset; + out.write(data, offset, length); + } else { + return immediateFailedFuture(new IllegalArgumentException( + "first and last were null!")); + } + + } + returnVal.setPayload(out.toByteArray()); + returnVal.setContentLength(out.size()); + returnVal.getMetadata().setSize(new Long(data.length)); + } + returnVal.setPayload(returnVal.getPayload()); + return immediateFuture(returnVal); } public Future blobMetadata(final String container, final String key) { - return new FutureBase() { - public BlobMetadata get() throws InterruptedException, ExecutionException { - try { - return copy(getBlob(container, key).get().getMetadata()); - } catch (Exception e) { - Utils. rethrowIfRuntimeOrSameType(e); - Utils. rethrowIfRuntimeOrSameType(e); - throw new RuntimeException(e);// TODO - } - } - }; + try { + return immediateFuture((BlobMetadata) copy(getBlob(container, key).get().getMetadata())); + } catch (Exception e) { + Utils. rethrowIfRuntimeOrSameType(e); + Utils. rethrowIfRuntimeOrSameType(e); + return immediateFailedFuture(e); + } } private Blob copyBlob(Blob object) { @@ -648,12 +579,8 @@ public class StubAsyncBlobStore implements AsyncBlobStore { } public Future clearContainer(final String container) { - return new FutureBase() { - public Void get() throws InterruptedException, ExecutionException { - getContainerToBlobs().get(container).clear(); - return null; - } - }; + getContainerToBlobs().get(container).clear(); + return immediateFuture(null); } public Future createDirectory(final String container, final String directory) { diff --git a/blobstore/src/test/java/org/jclouds/blobstore/strategy/internal/RetryOnNotFoundGetAllBlobsStrategyTest.java b/blobstore/src/test/java/org/jclouds/blobstore/strategy/internal/RetryOnNotFoundGetAllBlobsStrategyTest.java index f0bf3ae546..12d56b1b37 100644 --- a/blobstore/src/test/java/org/jclouds/blobstore/strategy/internal/RetryOnNotFoundGetAllBlobsStrategyTest.java +++ b/blobstore/src/test/java/org/jclouds/blobstore/strategy/internal/RetryOnNotFoundGetAllBlobsStrategyTest.java @@ -57,7 +57,8 @@ public class RetryOnNotFoundGetAllBlobsStrategyTest { @BeforeTest void setUp() { - blobProvider = Guice.createInjector(new BlobStoreObjectModule()).getInstance(Blob.Factory.class); + blobProvider = Guice.createInjector(new BlobStoreObjectModule()).getInstance( + Blob.Factory.class); } @SuppressWarnings("unchecked") @@ -81,7 +82,8 @@ public class RetryOnNotFoundGetAllBlobsStrategyTest { map.ifNotFoundRetryOtherwiseAddToSet("container", "key", futureObject, objects); // should have retried once assert System.currentTimeMillis() >= time + map.requestRetryMilliseconds; - assertEquals(Utils.toStringAndClose((InputStream) objects.iterator().next().getContent()), "goo"); + assertEquals(Utils.toStringAndClose((InputStream) objects.iterator().next().getContent()), + "goo"); assert !objects.contains(null); } diff --git a/core/src/main/java/org/jclouds/concurrent/FutureExceptionParser.java b/core/src/main/java/org/jclouds/concurrent/FutureExceptionParser.java index 688ffed196..4254d3d300 100755 --- a/core/src/main/java/org/jclouds/concurrent/FutureExceptionParser.java +++ b/core/src/main/java/org/jclouds/concurrent/FutureExceptionParser.java @@ -19,30 +19,35 @@ package org.jclouds.concurrent; import java.util.concurrent.ExecutionException; -import java.util.concurrent.Future; +import java.util.concurrent.Executor; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import org.jclouds.logging.Logger; import com.google.common.base.Function; +import com.google.common.util.concurrent.ListenableFuture; /** * Transforms the result of a future as soon as it is available. * + * Temporarily here until the following is resolved: guava issue 310 + * * @author Adrian Cole */ -public class FutureExceptionParser implements Future { +public class FutureExceptionParser implements ListenableFuture { - private final Future delegate; + private final ListenableFuture delegate; private final Function function; private final Logger logger; - public FutureExceptionParser(Future delegate, Function function) { + public FutureExceptionParser(ListenableFuture delegate, Function function) { this(delegate, function, Logger.NULL); } - public FutureExceptionParser(Future delegate, Function function, Logger logger) { + public FutureExceptionParser(ListenableFuture delegate, Function function, + Logger logger) { this.delegate = delegate; this.function = function; this.logger = logger; @@ -89,4 +94,9 @@ public class FutureExceptionParser implements Future { return delegate.isDone(); } + @Override + public void addListener(Runnable listener, Executor exec) { + delegate.addListener(listener, exec); + } + } diff --git a/core/src/main/java/org/jclouds/concurrent/FutureFunctionCallable.java b/core/src/main/java/org/jclouds/concurrent/FutureFunctionCallable.java deleted file mode 100755 index 96394e8ec8..0000000000 --- a/core/src/main/java/org/jclouds/concurrent/FutureFunctionCallable.java +++ /dev/null @@ -1,64 +0,0 @@ -/** - * - * Copyright (C) 2009 Cloud Conscious, LLC. - * - * ==================================================================== - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ==================================================================== - */ -package org.jclouds.concurrent; - -import java.util.concurrent.Callable; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.Future; - -import org.jclouds.logging.Logger; - -import com.google.common.base.Function; - -/** - * Transforms the result of a future as soon as it is available. - * - * @author Adrian Cole - */ -public class FutureFunctionCallable implements Callable { - - private final Future future; - private final Function function; - private final Logger logger; - - public FutureFunctionCallable(Future future, Function function) { - this(future, function, Logger.NULL); - } - - public FutureFunctionCallable(Future future, Function function, Logger logger) { - this.future = future; - this.function = function; - this.logger = logger; - } - - public T call() throws Exception { - try { - F input = future.get(); - logger.debug("Processing intermediate result for: %s", input); - T result = function.apply(input); - logger.debug("Processed intermediate result for: %s", input); - return result; - } catch (ExecutionException e) { - if (e.getCause() instanceof Error) - throw (Error) e.getCause(); - throw (Exception) e.getCause(); - } - } - -} diff --git a/core/src/main/java/org/jclouds/concurrent/FutureFunctionWrapper.java b/core/src/main/java/org/jclouds/concurrent/FutureFunctionWrapper.java deleted file mode 100644 index 753b55a174..0000000000 --- a/core/src/main/java/org/jclouds/concurrent/FutureFunctionWrapper.java +++ /dev/null @@ -1,79 +0,0 @@ -/** - * - * Copyright (C) 2009 Cloud Conscious, LLC. - * - * ==================================================================== - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ==================================================================== - */ -package org.jclouds.concurrent; - -import java.util.concurrent.ExecutionException; -import java.util.concurrent.Future; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; - -import org.jclouds.logging.Logger; - -import com.google.common.base.Function; - -/** - * Transforms the result of a future once requested via get(). - * - * @author Adrian Cole - */ -public class FutureFunctionWrapper implements Future, Function { - - private final Future future; - private final Function function; - private final Logger logger; - - public FutureFunctionWrapper(Future future, Function function) { - this(future, function, Logger.NULL); - } - - public FutureFunctionWrapper(Future future, Function function, Logger logger) { - this.future = future; - this.function = function; - this.logger = logger; - } - - public T apply(F input) { - logger.debug("Processing intermediate result for: %s", input); - T result = function.apply(input); - logger.debug("Processed intermediate result for: %s", input); - return result; - } - - public boolean cancel(boolean mayInterruptIfRunning) { - return future.cancel(mayInterruptIfRunning); - } - - public T get() throws InterruptedException, ExecutionException { - return apply(future.get()); - } - - public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, - TimeoutException { - return apply(future.get(timeout, unit)); - } - - public boolean isCancelled() { - return future.isCancelled(); - } - - public boolean isDone() { - return future.isDone(); - } - -} diff --git a/core/src/main/java/org/jclouds/concurrent/RunnableFutureTask.java b/core/src/main/java/org/jclouds/concurrent/RunnableFutureTask.java deleted file mode 100755 index 0a25c663aa..0000000000 --- a/core/src/main/java/org/jclouds/concurrent/RunnableFutureTask.java +++ /dev/null @@ -1,107 +0,0 @@ -/** - * - * Copyright (C) 2009 Cloud Conscious, LLC. - * - * ==================================================================== - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ==================================================================== - */ -package org.jclouds.concurrent; - -import static com.google.common.base.Preconditions.checkState; - -import java.util.concurrent.Callable; -import java.util.concurrent.CancellationException; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.Future; -import java.util.concurrent.FutureTask; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; - -/** - * Future that is a result of a task ran within a single thread. As such, cancel is not valid, as - * the operation is already complete. - * - * This class is like {@link FutureTask} except that it does not attempt {@code Thread.interrupt() } - * which is sometimes prohibited. - * - * @author Adrian Cole - */ -public class RunnableFutureTask implements Future { - private ExecutionException executionException; - private InterruptedException interruptedException; - private CancellationException cancellationException; - private V value; - private boolean ran = false; - - private final Callable task; - - public RunnableFutureTask(Callable task) { - this.task = task; - } - - /** - * {@inheritDoc} - * - * @param mayInterruptIfRunning - * - ignored as this cannot be called at the same time as the task is running - */ - public boolean cancel(boolean mayInterruptIfRunning) { - if (ran) { - return false; - } else { - cancellationException = new CancellationException(); - } - return true; - } - - public boolean isCancelled() { - return cancellationException != null; - } - - public boolean isDone() { - return ran || cancellationException != null || interruptedException != null - || executionException != null; - } - - public V get(long l, TimeUnit timeUnit) throws InterruptedException, ExecutionException, - TimeoutException { - return get(); - } - - public void run() { - try { - value = task.call(); - } catch (InterruptedException e) { - interruptedException = e; - } catch (Exception e) { - executionException = new ExecutionException(e); - } finally { - ran = true; - } - } - - /** - * {@inheritDoc} - */ - public V get() throws InterruptedException, ExecutionException { - if (cancellationException != null) - throw cancellationException; - if (interruptedException != null) - throw interruptedException; - if (executionException != null) - throw executionException; - checkState(ran, "run() was never called"); - return value; - } -} diff --git a/core/src/main/java/org/jclouds/http/HttpCommandRendezvous.java b/core/src/main/java/org/jclouds/http/HttpCommandRendezvous.java index 276bfc0c87..c891b1ef29 100644 --- a/core/src/main/java/org/jclouds/http/HttpCommandRendezvous.java +++ b/core/src/main/java/org/jclouds/http/HttpCommandRendezvous.java @@ -18,9 +18,10 @@ */ package org.jclouds.http; -import java.util.concurrent.Future; import java.util.concurrent.SynchronousQueue; +import com.google.common.util.concurrent.ListenableFuture; + /** * Used for passing objects for response processing * @@ -31,10 +32,11 @@ public class HttpCommandRendezvous { private final HttpCommand command; @SuppressWarnings("unchecked") private final SynchronousQueue rendezvous; - private final Future future; + private final ListenableFuture future; @SuppressWarnings("unchecked") - public HttpCommandRendezvous(HttpCommand command, SynchronousQueue rendezvous, Future future) { + public HttpCommandRendezvous(HttpCommand command, SynchronousQueue rendezvous, + ListenableFuture future) { this.command = command; this.rendezvous = rendezvous; this.future = future; @@ -58,7 +60,7 @@ public class HttpCommandRendezvous { return command; } - public Future getFuture() { + public ListenableFuture getFuture() { return future; } diff --git a/core/src/main/java/org/jclouds/http/TransformingHttpCommand.java b/core/src/main/java/org/jclouds/http/TransformingHttpCommand.java index 9712bb22a0..5d40de3e15 100644 --- a/core/src/main/java/org/jclouds/http/TransformingHttpCommand.java +++ b/core/src/main/java/org/jclouds/http/TransformingHttpCommand.java @@ -19,7 +19,8 @@ package org.jclouds.http; import java.util.concurrent.ExecutionException; -import java.util.concurrent.Future; + +import com.google.common.util.concurrent.ListenableFuture; /** * Command that utilizes RESTFul apis and extracts T from the HttpResponse. @@ -36,5 +37,5 @@ public interface TransformingHttpCommand extends HttpCommand { * @throws ExecutionException * if there is a fatal error preventing the command from invoking */ - Future execute() throws ExecutionException; + ListenableFuture execute() throws ExecutionException; } diff --git a/core/src/main/java/org/jclouds/http/TransformingHttpCommandExecutorService.java b/core/src/main/java/org/jclouds/http/TransformingHttpCommandExecutorService.java index 5bb26d82e3..1b2c6a0c11 100644 --- a/core/src/main/java/org/jclouds/http/TransformingHttpCommandExecutorService.java +++ b/core/src/main/java/org/jclouds/http/TransformingHttpCommandExecutorService.java @@ -21,7 +21,7 @@ package org.jclouds.http; import java.util.concurrent.Future; import com.google.common.base.Function; -import com.google.inject.internal.Nullable; +import com.google.common.util.concurrent.ListenableFuture; /** * Executor which will invoke and transform the response of an {@code EndpointCommand} into generic @@ -40,11 +40,8 @@ public interface TransformingHttpCommandExecutorService { * what to execute * @param responseTransformer * how to transform the response from the above command - * @param exceptionTransformer - * maps any non-critical exceptions to the return type {@code } * @return value of the intended response. */ - public Future submit(HttpCommand command, Function responseTransformer, - @Nullable Function exceptionTransformer); + public ListenableFuture submit(HttpCommand command, Function responseTransformer); } diff --git a/core/src/main/java/org/jclouds/http/TransformingHttpCommandExecutorServiceImpl.java b/core/src/main/java/org/jclouds/http/TransformingHttpCommandExecutorServiceImpl.java index 9f39afe2cf..5e691ded54 100644 --- a/core/src/main/java/org/jclouds/http/TransformingHttpCommandExecutorServiceImpl.java +++ b/core/src/main/java/org/jclouds/http/TransformingHttpCommandExecutorServiceImpl.java @@ -18,15 +18,16 @@ */ package org.jclouds.http; -import java.util.concurrent.Callable; +import static com.google.common.util.concurrent.Futures.compose; +import static com.google.common.util.concurrent.Futures.makeListenable; + import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; -import org.jclouds.concurrent.FutureFunctionCallable; -import org.jclouds.logging.Logger.LoggerFactory; +import javax.inject.Inject; import com.google.common.base.Function; -import javax.inject.Inject; +import com.google.common.util.concurrent.ListenableFuture; /** * Executor which will invoke and transform the response of an {@code EndpointCommand} into generic @@ -38,25 +39,20 @@ public class TransformingHttpCommandExecutorServiceImpl implements TransformingHttpCommandExecutorService { private final HttpCommandExecutorService client; private final ExecutorService executorService; - private final LoggerFactory logFactory; @Inject public TransformingHttpCommandExecutorServiceImpl(HttpCommandExecutorService client, - ExecutorService executorService, LoggerFactory logFactory) { + ExecutorService executorService) { this.client = client; this.executorService = executorService; - this.logFactory = logFactory; } /** * {@inheritDoc} */ - public Future submit(HttpCommand command, Function responseTransformer, - Function exceptionTransformer) { + public ListenableFuture submit(HttpCommand command, Function responseTransformer) { Future responseFuture = client.submit(command); - Callable valueCallable = new FutureFunctionCallable(responseFuture, - responseTransformer, logFactory.getLogger(responseTransformer.getClass().getName())); - return executorService.submit(valueCallable); + return compose(makeListenable(responseFuture), responseTransformer, executorService); } } diff --git a/core/src/main/java/org/jclouds/http/TransformingHttpCommandImpl.java b/core/src/main/java/org/jclouds/http/TransformingHttpCommandImpl.java index b2175a6f3d..1c7d657b90 100644 --- a/core/src/main/java/org/jclouds/http/TransformingHttpCommandImpl.java +++ b/core/src/main/java/org/jclouds/http/TransformingHttpCommandImpl.java @@ -20,7 +20,6 @@ package org.jclouds.http; import java.util.Collections; import java.util.concurrent.ExecutionException; -import java.util.concurrent.Future; import javax.annotation.Resource; import javax.inject.Inject; @@ -32,7 +31,7 @@ import org.jclouds.logging.Logger; import org.jclouds.rest.internal.GeneratedHttpRequest; import com.google.common.base.Function; -import com.google.inject.internal.Nullable; +import com.google.common.util.concurrent.ListenableFuture; /** * Executor which will invoke and transform the response of an {@code EndpointCommand} into generic @@ -46,7 +45,6 @@ public class TransformingHttpCommandImpl implements TransformingHttpCommand transformer; - private final Function exceptionTransformer; private GeneratedHttpRequest request; private volatile int failureCount; @@ -59,19 +57,17 @@ public class TransformingHttpCommandImpl implements TransformingHttpCommand request, Function transformer, - @Nullable Function exceptionTransformer) { + GeneratedHttpRequest request, Function transformer) { this.request = request; this.executorService = executorService; this.transformer = transformer; - this.exceptionTransformer = exceptionTransformer; this.failureCount = 0; } - public Future execute() throws ExecutionException { + public ListenableFuture execute() throws ExecutionException { if (exception != null) throw new ExecutionException(exception); - return executorService.submit(this, transformer, exceptionTransformer); + return executorService.submit(this, transformer); } public int getFailureCount() { diff --git a/core/src/main/java/org/jclouds/rest/config/RestModule.java b/core/src/main/java/org/jclouds/rest/config/RestModule.java index 1aeb7df5e8..98d77b0332 100755 --- a/core/src/main/java/org/jclouds/rest/config/RestModule.java +++ b/core/src/main/java/org/jclouds/rest/config/RestModule.java @@ -53,9 +53,8 @@ public class RestModule extends AbstractModule { @SuppressWarnings("unchecked") public TransformingHttpCommand create(GeneratedHttpRequest request, - Function transformer, Function exceptionTransformer) { - return new TransformingHttpCommandImpl(executorService, request, transformer, - exceptionTransformer); + Function transformer) { + return new TransformingHttpCommandImpl(executorService, request, transformer); } } diff --git a/core/src/main/java/org/jclouds/rest/internal/AsyncRestClientProxy.java b/core/src/main/java/org/jclouds/rest/internal/AsyncRestClientProxy.java index dc7b09996b..5bdebba8e1 100755 --- a/core/src/main/java/org/jclouds/rest/internal/AsyncRestClientProxy.java +++ b/core/src/main/java/org/jclouds/rest/internal/AsyncRestClientProxy.java @@ -42,10 +42,10 @@ import org.jclouds.logging.Logger; import org.jclouds.rest.InvocationContext; import com.google.common.base.Function; +import com.google.common.util.concurrent.ListenableFuture; import com.google.inject.Inject; import com.google.inject.Injector; import com.google.inject.TypeLiteral; -import com.google.inject.internal.Nullable; @Singleton public class AsyncRestClientProxy implements InvocationHandler { @@ -142,7 +142,7 @@ public class AsyncRestClientProxy implements InvocationHandler { .getName(), transformer.getClass().getSimpleName()); logger.debug("Invoking %s.%s", declaring.getSimpleName(), method.getName()); - Future result = commandFactory.create(request, transformer, exceptionParser).execute(); + ListenableFuture result = commandFactory.create(request, transformer).execute(); if (exceptionParser != null) { logger.trace("Exceptions from %s.%s are parsed by %s", declaring.getSimpleName(), @@ -164,8 +164,7 @@ public class AsyncRestClientProxy implements InvocationHandler { public static interface Factory { public TransformingHttpCommand create(GeneratedHttpRequest request, - Function transformer, - @Nullable Function exceptionTransformer); + Function transformer); } @Override diff --git a/core/src/test/java/org/jclouds/concurrent/FutureExceptionParserTest.java b/core/src/test/java/org/jclouds/concurrent/FutureExceptionParserTest.java index 1f9d28b5de..f63454363d 100755 --- a/core/src/test/java/org/jclouds/concurrent/FutureExceptionParserTest.java +++ b/core/src/test/java/org/jclouds/concurrent/FutureExceptionParserTest.java @@ -18,12 +18,12 @@ */ package org.jclouds.concurrent; +import static com.google.common.util.concurrent.Futures.makeListenable; import static org.testng.Assert.assertEquals; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; @@ -31,51 +31,52 @@ import org.testng.annotations.Test; import com.google.common.base.Function; import com.google.common.util.concurrent.Executors; +import com.google.common.util.concurrent.ListenableFuture; /** - * Tests behavior of FutureExceptionParser + * Tests behavior of ListenableFutureExceptionParser * * @author Adrian Cole */ -@Test(groups = "unit", testName = "concurrent.FutureExceptionParserTest") +@Test(groups = "unit", testName = "concurrent.ListenableFutureExceptionParserTest") public class FutureExceptionParserTest { ExecutorService executorService = Executors.sameThreadExecutor(); @Test public void testGet() throws InterruptedException, ExecutionException { - Future future = createFuture(new RuntimeException("foo")); + ListenableFuture future = createListenableFuture(new RuntimeException("foo")); assertEquals(future.get(), "foo"); } @Test(expectedExceptions = ExecutionException.class) public void testGetUnmatched() throws InterruptedException, ExecutionException { - Future future = createFuture(new Exception("foo")); + ListenableFuture future = createListenableFuture(new Exception("foo")); assertEquals(future.get(), "foo"); } @Test public void testGetLongTimeUnit() throws InterruptedException, ExecutionException, TimeoutException { - Future future = createFuture(new RuntimeException("foo")); + ListenableFuture future = createListenableFuture(new RuntimeException("foo")); assertEquals(future.get(1, TimeUnit.SECONDS), "foo"); } @Test(expectedExceptions = ExecutionException.class) public void testGetLongTimeUnitUnmatched() throws InterruptedException, ExecutionException, TimeoutException { - Future future = createFuture(new Exception("foo")); + ListenableFuture future = createListenableFuture(new Exception("foo")); assertEquals(future.get(1, TimeUnit.SECONDS), "foo"); } @SuppressWarnings("unchecked") - private Future createFuture(final Exception exception) { - Future future = executorService.submit(new Callable() { + private ListenableFuture createListenableFuture(final Exception exception) { + ListenableFuture future = makeListenable(executorService.submit(new Callable() { public String call() throws Exception { throw exception; } - }); + })); future = new FutureExceptionParser(future, new Function() { diff --git a/core/src/test/java/org/jclouds/http/handlers/BackoffLimitedRetryHandlerTest.java b/core/src/test/java/org/jclouds/http/handlers/BackoffLimitedRetryHandlerTest.java index a1049eafae..bcda844e88 100644 --- a/core/src/test/java/org/jclouds/http/handlers/BackoffLimitedRetryHandlerTest.java +++ b/core/src/test/java/org/jclouds/http/handlers/BackoffLimitedRetryHandlerTest.java @@ -37,13 +37,10 @@ import org.jclouds.http.TransformingHttpCommandImpl; import org.jclouds.http.functions.ReturnStringIf200; import org.jclouds.http.internal.HttpWire; import org.jclouds.http.internal.JavaUrlHttpCommandExecutorService; -import org.jclouds.logging.Logger; -import org.jclouds.logging.Logger.LoggerFactory; import org.jclouds.rest.internal.RestAnnotationProcessor; import org.testng.annotations.BeforeTest; import org.testng.annotations.Test; -import com.google.common.base.Function; import com.google.inject.AbstractModule; import com.google.inject.Key; import com.google.inject.TypeLiteral; @@ -60,25 +57,25 @@ public class BackoffLimitedRetryHandlerTest { long startTime = System.nanoTime(); handler.imposeBackoffExponentialDelay(1, "TEST FAILURE: 1"); long elapsedTime = (System.nanoTime() - startTime) / 1000000; - assert(elapsedTime >= 49) : elapsedTime; + assert (elapsedTime >= 49) : elapsedTime; assertTrue(elapsedTime < 50 + acceptableDelay); startTime = System.nanoTime(); handler.imposeBackoffExponentialDelay(2, "TEST FAILURE: 2"); elapsedTime = (System.nanoTime() - startTime) / 1000000; - assert(elapsedTime >= 199) : elapsedTime; + assert (elapsedTime >= 199) : elapsedTime; assertTrue(elapsedTime < 200 + acceptableDelay); startTime = System.nanoTime(); handler.imposeBackoffExponentialDelay(3, "TEST FAILURE: 3"); elapsedTime = (System.nanoTime() - startTime) / 1000000; - assert(elapsedTime >= 449) : elapsedTime; + assert (elapsedTime >= 449) : elapsedTime; assertTrue(elapsedTime < 450 + acceptableDelay); startTime = System.nanoTime(); handler.imposeBackoffExponentialDelay(4, "TEST FAILURE: 4"); elapsedTime = (System.nanoTime() - startTime) / 1000000; - assert(elapsedTime >= 799) : elapsedTime; + assert (elapsedTime >= 799) : elapsedTime; assertTrue(elapsedTime < 800 + acceptableDelay); startTime = System.nanoTime(); @@ -96,14 +93,7 @@ public class BackoffLimitedRetryHandlerTest { JavaUrlHttpCommandExecutorService httpService = new JavaUrlHttpCommandExecutorService( execService, new DelegatingRetryHandler(), new DelegatingErrorHandler(), new HttpWire(Executors.newCachedThreadPool())); - executorService = new TransformingHttpCommandExecutorServiceImpl(httpService, execService, - new LoggerFactory() { - - public Logger getLogger(String category) { - return Logger.NULL; - } - - }); + executorService = new TransformingHttpCommandExecutorServiceImpl(httpService, execService); } @Test @@ -164,14 +154,8 @@ public class BackoffLimitedRetryHandlerTest { private HttpCommand createCommand() throws SecurityException, NoSuchMethodException { Method method = IntegrationTestAsyncClient.class.getMethod("download", String.class); - HttpCommand command = new TransformingHttpCommandImpl(executorService, processor - .createRequest(method, "1"), new ReturnStringIf200(), - new Function() { - public String apply(Exception from) { - return null; - } - }); - return command; + return new TransformingHttpCommandImpl(executorService, processor.createRequest( + method, "1"), new ReturnStringIf200()); } @Test diff --git a/extensions/httpnio/src/main/java/org/jclouds/http/pool/ConnectionPoolTransformingHttpCommandExecutorService.java b/extensions/httpnio/src/main/java/org/jclouds/http/pool/ConnectionPoolTransformingHttpCommandExecutorService.java index f8695df10c..1777a4f1ab 100755 --- a/extensions/httpnio/src/main/java/org/jclouds/http/pool/ConnectionPoolTransformingHttpCommandExecutorService.java +++ b/extensions/httpnio/src/main/java/org/jclouds/http/pool/ConnectionPoolTransformingHttpCommandExecutorService.java @@ -19,6 +19,7 @@ package org.jclouds.http.pool; import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.util.concurrent.Futures.makeListenable; import java.net.URI; import java.util.concurrent.BlockingQueue; @@ -33,7 +34,6 @@ import java.util.concurrent.TimeoutException; import javax.inject.Inject; -import org.jclouds.concurrent.FutureExceptionParser; import org.jclouds.http.HttpCommand; import org.jclouds.http.HttpCommandRendezvous; import org.jclouds.http.HttpResponse; @@ -45,6 +45,7 @@ import org.jclouds.util.Utils; import com.google.common.base.Function; import com.google.common.collect.MapMaker; +import com.google.common.util.concurrent.ListenableFuture; /** * // TODO: Adrian: Document this! @@ -130,9 +131,8 @@ public class ConnectionPoolTransformingHttpCommandExecutorService extends Bas * will be processed via the {@link #invoke(TransformingHttpCommandExecutorService) invoke} * method. */ - public Future submit(HttpCommand command, - final Function responseTransformer, - Function exceptionTransformer) { + public ListenableFuture submit(HttpCommand command, + final Function responseTransformer) { exceptionIfNotActive(); final SynchronousQueue channel = new SynchronousQueue(); // should block and immediately parse the response on exit. @@ -151,11 +151,9 @@ public class ConnectionPoolTransformingHttpCommandExecutorService extends Bas } }); - HttpCommandRendezvous rendezvous = new HttpCommandRendezvous(command, channel, future); + HttpCommandRendezvous rendezvous = new HttpCommandRendezvous(command, channel, + makeListenable(future)); commandQueue.add(rendezvous); - if (exceptionTransformer != null) { - return new FutureExceptionParser(rendezvous.getFuture(), exceptionTransformer); - } return rendezvous.getFuture(); } @@ -227,4 +225,5 @@ public class ConnectionPoolTransformingHttpCommandExecutorService extends Bas endpoint.getPort())); } } + } diff --git a/rackspace/src/main/java/org/jclouds/rackspace/cloudfiles/blobstore/CloudFilesAsyncBlobStore.java b/rackspace/src/main/java/org/jclouds/rackspace/cloudfiles/blobstore/CloudFilesAsyncBlobStore.java index 42c66b056f..33e9dcaed4 100644 --- a/rackspace/src/main/java/org/jclouds/rackspace/cloudfiles/blobstore/CloudFilesAsyncBlobStore.java +++ b/rackspace/src/main/java/org/jclouds/rackspace/cloudfiles/blobstore/CloudFilesAsyncBlobStore.java @@ -18,6 +18,8 @@ */ package org.jclouds.rackspace.cloudfiles.blobstore; +import static com.google.common.util.concurrent.Futures.compose; +import static com.google.common.util.concurrent.Futures.makeListenable; import static org.jclouds.blobstore.options.ListContainerOptions.Builder.recursive; import java.util.SortedSet; @@ -83,7 +85,7 @@ public class CloudFilesAsyncBlobStore extends BaseCloudFilesBlobStore implements */ public Future blobMetadata(String container, String key) { - return wrapFuture(async.getObjectInfo(container, key), + return compose(makeListenable(async.getObjectInfo(container, key)), new Function() { @Override @@ -91,7 +93,7 @@ public class CloudFilesAsyncBlobStore extends BaseCloudFilesBlobStore implements return object2BlobMd.apply(from); } - }); + }, service); } public Future clearContainer(final String container) { @@ -129,19 +131,19 @@ public class CloudFilesAsyncBlobStore extends BaseCloudFilesBlobStore implements org.jclouds.blobstore.options.GetOptions... optionsList) { GetOptions httpOptions = blob2ObjectGetOptions.apply(optionsList); Future returnVal = async.getObject(container, key, httpOptions); - return wrapFuture(returnVal, object2Blob); + return compose(makeListenable(returnVal), object2Blob, service); } public Future> list() { - return wrapFuture( - async.listContainers(), + return compose( + makeListenable(async.listContainers()), new Function, org.jclouds.blobstore.domain.ListResponse>() { public org.jclouds.blobstore.domain.ListResponse apply( SortedSet from) { return new ListResponseImpl(Iterables.transform(from, container2ResourceMd), null, null, false); } - }); + }, service); } public Future> list( @@ -150,7 +152,7 @@ public class CloudFilesAsyncBlobStore extends BaseCloudFilesBlobStore implements .apply(optionsList); Future> returnVal = async.listObjects(container, httpOptions); - return wrapFuture(returnVal, container2ResourceList); + return compose(makeListenable(returnVal), container2ResourceList, service); } public Future putBlob(String container, Blob blob) { @@ -174,7 +176,6 @@ public class CloudFilesAsyncBlobStore extends BaseCloudFilesBlobStore implements public Future directoryExists(final String container, final String directory) { return service.submit(new Callable() { - public Boolean call() throws Exception { try { getDirectoryStrategy.execute(CloudFilesAsyncBlobStore.this, container, directory); diff --git a/rackspace/src/main/java/org/jclouds/rackspace/cloudfiles/blobstore/internal/BaseCloudFilesBlobStore.java b/rackspace/src/main/java/org/jclouds/rackspace/cloudfiles/blobstore/internal/BaseCloudFilesBlobStore.java index a79f4115b7..bf3808e121 100644 --- a/rackspace/src/main/java/org/jclouds/rackspace/cloudfiles/blobstore/internal/BaseCloudFilesBlobStore.java +++ b/rackspace/src/main/java/org/jclouds/rackspace/cloudfiles/blobstore/internal/BaseCloudFilesBlobStore.java @@ -21,7 +21,6 @@ package org.jclouds.rackspace.cloudfiles.blobstore.internal; import static com.google.common.base.Preconditions.checkNotNull; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Future; import javax.inject.Inject; @@ -29,7 +28,6 @@ import org.jclouds.blobstore.domain.Blob; import org.jclouds.blobstore.strategy.ClearListStrategy; import org.jclouds.blobstore.strategy.GetDirectoryStrategy; import org.jclouds.blobstore.strategy.MkdirStrategy; -import org.jclouds.concurrent.FutureFunctionWrapper; import org.jclouds.logging.Logger.LoggerFactory; import org.jclouds.rackspace.cloudfiles.CloudFilesAsyncClient; import org.jclouds.rackspace.cloudfiles.CloudFilesClient; @@ -41,8 +39,6 @@ import org.jclouds.rackspace.cloudfiles.blobstore.functions.ContainerToResourceM import org.jclouds.rackspace.cloudfiles.blobstore.functions.ObjectToBlob; import org.jclouds.rackspace.cloudfiles.blobstore.functions.ObjectToBlobMetadata; -import com.google.common.base.Function; - public class BaseCloudFilesBlobStore { protected final CloudFilesAsyncClient async; protected final CloudFilesClient sync; @@ -88,11 +84,6 @@ public class BaseCloudFilesBlobStore { this.service = checkNotNull(service, "service"); } - protected Future wrapFuture(Future future, Function function) { - return new FutureFunctionWrapper(future, function, logFactory.getLogger(function - .getClass().getName())); - } - public Blob newBlob(String name) { Blob blob = blobFactory.create(null); blob.getMetadata().setName(name); diff --git a/rackspace/src/test/java/org/jclouds/rackspace/cloudfiles/internal/StubCloudFilesAsyncClient.java b/rackspace/src/test/java/org/jclouds/rackspace/cloudfiles/internal/StubCloudFilesAsyncClient.java index 9e6525abfb..afd5dbe02a 100644 --- a/rackspace/src/test/java/org/jclouds/rackspace/cloudfiles/internal/StubCloudFilesAsyncClient.java +++ b/rackspace/src/test/java/org/jclouds/rackspace/cloudfiles/internal/StubCloudFilesAsyncClient.java @@ -19,12 +19,12 @@ package org.jclouds.rackspace.cloudfiles.internal; import static com.google.common.base.Preconditions.checkNotNull; +import static com.google.common.util.concurrent.Futures.immediateFuture; import java.net.URI; import java.util.Map; import java.util.SortedSet; import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import javax.inject.Inject; @@ -34,11 +34,8 @@ import org.jclouds.blobstore.domain.BlobMetadata; import org.jclouds.blobstore.domain.ListContainerResponse; import org.jclouds.blobstore.functions.HttpGetOptionsListToGetOptions; import org.jclouds.blobstore.integration.internal.StubAsyncBlobStore; -import org.jclouds.blobstore.integration.internal.StubAsyncBlobStore.FutureBase; import org.jclouds.blobstore.options.ListContainerOptions; -import org.jclouds.concurrent.FutureFunctionWrapper; import org.jclouds.http.options.GetOptions; -import org.jclouds.logging.Logger.LoggerFactory; import org.jclouds.rackspace.cloudfiles.CloudFilesAsyncClient; import org.jclouds.rackspace.cloudfiles.blobstore.functions.BlobToObject; import org.jclouds.rackspace.cloudfiles.blobstore.functions.ListContainerOptionsToBlobStoreListContainerOptions; @@ -56,6 +53,7 @@ import org.jclouds.rackspace.cloudfiles.options.ListCdnContainerOptions; import com.google.common.base.Function; import com.google.common.collect.Iterables; import com.google.common.collect.Sets; +import com.google.common.util.concurrent.Futures; /** * Implementation of {@link CloudFilesAsyncClient} which keeps all data in a local Map object. @@ -65,7 +63,6 @@ import com.google.common.collect.Sets; public class StubCloudFilesAsyncClient implements CloudFilesAsyncClient { private final HttpGetOptionsListToGetOptions httpGetOptionsConverter; private final StubAsyncBlobStore blobStore; - private final LoggerFactory logFactory; private final CFObject.Factory objectProvider; private final ObjectToBlob object2Blob; private final BlobToObject blob2Object; @@ -74,7 +71,7 @@ public class StubCloudFilesAsyncClient implements CloudFilesAsyncClient { private final ResourceToObjectList resource2ObjectList; @Inject - private StubCloudFilesAsyncClient(StubAsyncBlobStore blobStore, LoggerFactory logFactory, + private StubCloudFilesAsyncClient(StubAsyncBlobStore blobStore, ConcurrentMap> containerToBlobs, CFObject.Factory objectProvider, HttpGetOptionsListToGetOptions httpGetOptionsConverter, ObjectToBlob object2Blob, @@ -82,7 +79,6 @@ public class StubCloudFilesAsyncClient implements CloudFilesAsyncClient { ListContainerOptionsToBlobStoreListContainerOptions container2ContainerListOptions, ResourceToObjectList resource2ContainerList) { this.blobStore = blobStore; - this.logFactory = logFactory; this.objectProvider = objectProvider; this.httpGetOptionsConverter = httpGetOptionsConverter; this.object2Blob = checkNotNull(object2Blob, "object2Blob"); @@ -93,17 +89,8 @@ public class StubCloudFilesAsyncClient implements CloudFilesAsyncClient { this.resource2ObjectList = checkNotNull(resource2ContainerList, "resource2ContainerList"); } - protected Future wrapFuture(Future future, Function function) { - return new FutureFunctionWrapper(future, function, logFactory.getLogger(function - .getClass().getName())); - } - public Future containerExists(final String container) { - return new FutureBase() { - public Boolean get() throws InterruptedException, ExecutionException { - return blobStore.getContainerToBlobs().containsKey(container); - } - }; + return immediateFuture(blobStore.getContainerToBlobs().containsKey(container)); } public Future createContainer(String container) { @@ -136,11 +123,12 @@ public class StubCloudFilesAsyncClient implements CloudFilesAsyncClient { public Future getObject(String container, String key, GetOptions... options) { org.jclouds.blobstore.options.GetOptions getOptions = httpGetOptionsConverter.apply(options); - return wrapFuture(blobStore.getBlob(container, key, getOptions), blob2Object); + return Futures.compose(Futures.makeListenable(blobStore.getBlob(container, key, getOptions)), + blob2Object); } public Future getObjectInfo(String container, String key) { - return wrapFuture(blobStore.blobMetadata(container, key), + return Futures.compose(Futures.makeListenable(blobStore.blobMetadata(container, key)), new Function() { @Override @@ -159,24 +147,19 @@ public class StubCloudFilesAsyncClient implements CloudFilesAsyncClient { public Future> listContainers( org.jclouds.rackspace.cloudfiles.options.ListContainerOptions... options) { - return new FutureBase>() { - - public SortedSet get() throws InterruptedException, ExecutionException { - return Sets.newTreeSet(Iterables.transform(blobStore.getContainerToBlobs().keySet(), - new Function() { - public ContainerMetadata apply(String name) { - return new ContainerMetadata(name, -1, -1); - } - - })); + return immediateFuture(Sets.newTreeSet(Iterables.transform(blobStore.getContainerToBlobs() + .keySet(), new Function() { + public ContainerMetadata apply(String name) { + return new ContainerMetadata(name, -1, -1); } - }; + }))); } public Future> listObjects(String container, org.jclouds.rackspace.cloudfiles.options.ListContainerOptions... optionsList) { ListContainerOptions options = container2ContainerListOptions.apply(optionsList); - return wrapFuture(blobStore.list(container, options), resource2ObjectList); + return Futures.compose(Futures.makeListenable(blobStore.list(container, options)), + resource2ObjectList); } public Future putObject(String container, CFObject object) {