updated to use standard guava Futures and ListenableFuture

This commit is contained in:
Adrian Cole 2013-01-13 09:34:19 -08:00
parent 6a728f8cb3
commit 5664eeb127
7 changed files with 49 additions and 69 deletions

View File

@ -23,8 +23,6 @@ import static com.google.common.collect.Maps.newHashMap;
import static org.jclouds.concurrent.FutureIterables.awaitCompletion;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeoutException;
import javax.annotation.Resource;
@ -38,6 +36,8 @@ import org.jclouds.chef.config.ChefProperties;
import org.jclouds.chef.strategy.DeleteAllClientsInList;
import org.jclouds.logging.Logger;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.inject.Inject;
/**
@ -50,7 +50,7 @@ public class DeleteAllClientsInListImpl implements DeleteAllClientsInList {
protected final ChefApi chefApi;
protected final ChefAsyncApi chefAsyncApi;
protected final ExecutorService userExecutor;
protected final ListeningExecutorService userExecutor;
@Resource
@Named(ChefProperties.CHEF_LOGGER)
protected Logger logger = Logger.NULL;
@ -60,7 +60,7 @@ public class DeleteAllClientsInListImpl implements DeleteAllClientsInList {
protected Long maxTime;
@Inject
DeleteAllClientsInListImpl(@Named(Constants.PROPERTY_USER_THREADS) ExecutorService userExecutor, ChefApi getAllApi,
DeleteAllClientsInListImpl(@Named(Constants.PROPERTY_USER_THREADS) ListeningExecutorService userExecutor, ChefApi getAllApi,
ChefAsyncApi ablobstore) {
this.userExecutor = userExecutor;
this.chefAsyncApi = ablobstore;
@ -70,13 +70,12 @@ public class DeleteAllClientsInListImpl implements DeleteAllClientsInList {
@Override
public void execute(Iterable<String> names) {
Map<String, Exception> exceptions = newHashMap();
Map<String, Future<?>> responses = newHashMap();
Map<String, ListenableFuture<?>> responses = newHashMap();
for (String name : names) {
responses.put(name, chefAsyncApi.deleteClient(name));
}
try {
exceptions = awaitCompletion(responses, userExecutor, maxTime, logger,
String.format("deleting apis: %s", names));
exceptions = awaitCompletion(responses, userExecutor, maxTime, logger, String.format("deleting apis: %s", names));
} catch (TimeoutException e) {
propagate(e);
}

View File

@ -23,8 +23,6 @@ import static com.google.common.collect.Maps.newHashMap;
import static org.jclouds.concurrent.FutureIterables.awaitCompletion;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeoutException;
import javax.annotation.Resource;
@ -38,6 +36,8 @@ import org.jclouds.chef.config.ChefProperties;
import org.jclouds.chef.strategy.DeleteAllNodesInList;
import org.jclouds.logging.Logger;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.inject.Inject;
/**
@ -50,7 +50,7 @@ public class DeleteAllNodesInListImpl implements DeleteAllNodesInList {
protected final ChefApi chefApi;
protected final ChefAsyncApi chefAsyncApi;
protected final ExecutorService userExecutor;
protected final ListeningExecutorService userExecutor;
@Resource
@Named(ChefProperties.CHEF_LOGGER)
protected Logger logger = Logger.NULL;
@ -60,7 +60,7 @@ public class DeleteAllNodesInListImpl implements DeleteAllNodesInList {
protected Long maxTime;
@Inject
DeleteAllNodesInListImpl(@Named(Constants.PROPERTY_USER_THREADS) ExecutorService userExecutor, ChefApi getAllNode,
DeleteAllNodesInListImpl(@Named(Constants.PROPERTY_USER_THREADS) ListeningExecutorService userExecutor, ChefApi getAllNode,
ChefAsyncApi ablobstore) {
this.userExecutor = userExecutor;
this.chefAsyncApi = ablobstore;
@ -70,13 +70,12 @@ public class DeleteAllNodesInListImpl implements DeleteAllNodesInList {
@Override
public void execute(Iterable<String> names) {
Map<String, Exception> exceptions = newHashMap();
Map<String, Future<?>> responses = newHashMap();
Map<String, ListenableFuture<?>> responses = newHashMap();
for (String name : names) {
responses.put(name, chefAsyncApi.deleteNode(name));
}
try {
exceptions = awaitCompletion(responses, userExecutor, maxTime, logger,
String.format("deleting nodes: %s", names));
exceptions = awaitCompletion(responses, userExecutor, maxTime, logger, String.format("deleting nodes: %s", names));
} catch (TimeoutException e) {
propagate(e);
}

View File

@ -21,16 +21,13 @@ package org.jclouds.chef.strategy.internal;
import static com.google.common.collect.Iterables.filter;
import static org.jclouds.concurrent.FutureIterables.transformParallel;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import javax.annotation.Resource;
import javax.inject.Named;
import javax.inject.Singleton;
import org.jclouds.Constants;
import org.jclouds.chef.ChefAsyncApi;
import org.jclouds.chef.ChefApi;
import org.jclouds.chef.ChefAsyncApi;
import org.jclouds.chef.config.ChefProperties;
import org.jclouds.chef.domain.Client;
import org.jclouds.chef.strategy.ListClients;
@ -38,6 +35,8 @@ import org.jclouds.logging.Logger;
import com.google.common.base.Function;
import com.google.common.base.Predicate;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.inject.Inject;
/**
@ -50,7 +49,7 @@ public class ListClientsImpl implements ListClients {
protected final ChefApi chefApi;
protected final ChefAsyncApi chefAsyncApi;
protected final ExecutorService userExecutor;
protected final ListeningExecutorService userExecutor;
@Resource
@Named(ChefProperties.CHEF_LOGGER)
protected Logger logger = Logger.NULL;
@ -60,7 +59,7 @@ public class ListClientsImpl implements ListClients {
protected Long maxTime;
@Inject
ListClientsImpl(@Named(Constants.PROPERTY_USER_THREADS) ExecutorService userExecutor, ChefApi getAllApi,
ListClientsImpl(@Named(Constants.PROPERTY_USER_THREADS) ListeningExecutorService userExecutor, ChefApi getAllApi,
ChefAsyncApi ablobstore) {
this.userExecutor = userExecutor;
this.chefAsyncApi = ablobstore;
@ -79,13 +78,10 @@ public class ListClientsImpl implements ListClients {
@Override
public Iterable<? extends Client> execute(Iterable<String> toGet) {
return transformParallel(toGet, new Function<String, Future<? extends Client>>() {
@Override
public Future<Client> apply(String from) {
return transformParallel(toGet, new Function<String, ListenableFuture<? extends Client>>() {
public ListenableFuture<Client> apply(String from) {
return chefAsyncApi.getClient(from);
}
}, userExecutor, maxTime, logger, "getting apis");
}

View File

@ -23,16 +23,13 @@ import static com.google.common.collect.Iterables.filter;
import static com.google.common.collect.Iterables.transform;
import static org.jclouds.concurrent.FutureIterables.transformParallel;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import javax.annotation.Resource;
import javax.inject.Named;
import javax.inject.Singleton;
import org.jclouds.Constants;
import org.jclouds.chef.ChefAsyncApi;
import org.jclouds.chef.ChefApi;
import org.jclouds.chef.ChefAsyncApi;
import org.jclouds.chef.config.ChefProperties;
import org.jclouds.chef.domain.CookbookVersion;
import org.jclouds.chef.strategy.ListCookbookVersions;
@ -40,6 +37,8 @@ import org.jclouds.logging.Logger;
import com.google.common.base.Function;
import com.google.common.base.Predicate;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.inject.Inject;
/**
@ -52,7 +51,7 @@ public class ListCookbookVersionsImpl implements ListCookbookVersions {
protected final ChefApi chefApi;
protected final ChefAsyncApi chefAsyncApi;
protected final ExecutorService userExecutor;
protected final ListeningExecutorService userExecutor;
@Resource
@Named(ChefProperties.CHEF_LOGGER)
protected Logger logger = Logger.NULL;
@ -62,7 +61,7 @@ public class ListCookbookVersionsImpl implements ListCookbookVersions {
protected Long maxTime;
@Inject
ListCookbookVersionsImpl(@Named(Constants.PROPERTY_USER_THREADS) ExecutorService userExecutor,
ListCookbookVersionsImpl(@Named(Constants.PROPERTY_USER_THREADS) ListeningExecutorService userExecutor,
ChefApi getAllCookbookVersion, ChefAsyncApi ablobstore) {
this.userExecutor = userExecutor;
this.chefAsyncApi = ablobstore;
@ -87,13 +86,10 @@ public class ListCookbookVersionsImpl implements ListCookbookVersions {
public Iterable<? extends CookbookVersion> apply(final String cookbook) {
// TODO getting each version could also go parallel
return transformParallel(chefApi.getVersionsOfCookbook(cookbook),
new Function<String, Future<? extends CookbookVersion>>() {
@Override
public Future<CookbookVersion> apply(String version) {
new Function<String, ListenableFuture<? extends CookbookVersion>>() {
public ListenableFuture<CookbookVersion> apply(String version) {
return chefAsyncApi.getCookbook(cookbook, version);
}
}, userExecutor, maxTime, logger, "getting versions of cookbook " + cookbook);
}

View File

@ -21,16 +21,13 @@ package org.jclouds.chef.strategy.internal;
import static com.google.common.collect.Iterables.filter;
import static org.jclouds.concurrent.FutureIterables.transformParallel;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import javax.annotation.Resource;
import javax.inject.Named;
import javax.inject.Singleton;
import org.jclouds.Constants;
import org.jclouds.chef.ChefAsyncApi;
import org.jclouds.chef.ChefApi;
import org.jclouds.chef.ChefAsyncApi;
import org.jclouds.chef.config.ChefProperties;
import org.jclouds.chef.domain.Node;
import org.jclouds.chef.strategy.ListNodes;
@ -38,6 +35,8 @@ import org.jclouds.logging.Logger;
import com.google.common.base.Function;
import com.google.common.base.Predicate;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.inject.Inject;
/**
@ -50,7 +49,7 @@ public class ListNodesImpl implements ListNodes {
protected final ChefApi chefApi;
protected final ChefAsyncApi chefAsyncApi;
protected final ExecutorService userExecutor;
protected final ListeningExecutorService userExecutor;
@Resource
@Named(ChefProperties.CHEF_LOGGER)
protected Logger logger = Logger.NULL;
@ -60,7 +59,7 @@ public class ListNodesImpl implements ListNodes {
protected Long maxTime;
@Inject
ListNodesImpl(@Named(Constants.PROPERTY_USER_THREADS) ExecutorService userExecutor, ChefApi getAllNode,
ListNodesImpl(@Named(Constants.PROPERTY_USER_THREADS) ListeningExecutorService userExecutor, ChefApi getAllNode,
ChefAsyncApi ablobstore) {
this.userExecutor = userExecutor;
this.chefAsyncApi = ablobstore;
@ -79,15 +78,11 @@ public class ListNodesImpl implements ListNodes {
@Override
public Iterable<? extends Node> execute(Iterable<String> toGet) {
return transformParallel(toGet, new Function<String, Future<? extends Node>>() {
@Override
public Future<Node> apply(String from) {
return transformParallel(toGet, new Function<String, ListenableFuture<? extends Node>>() {
public ListenableFuture<Node> apply(String from) {
return chefAsyncApi.getNode(from);
}
}, userExecutor, maxTime, logger, "getting nodes");
}
}

View File

@ -22,14 +22,14 @@ import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Throwables.propagate;
import static com.google.common.collect.Iterables.transform;
import static com.google.common.collect.Sets.newLinkedHashSet;
import static org.jclouds.concurrent.Futures.compose;
import static com.google.common.util.concurrent.Futures.immediateFuture;
import static com.google.common.util.concurrent.Futures.transform;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import javax.inject.Inject;
import javax.inject.Named;
@ -56,8 +56,8 @@ import org.jclouds.io.Payload;
import org.jclouds.util.Strings2;
import com.google.common.base.Function;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
/**
* In-memory chef simulator.
@ -95,18 +95,18 @@ public class TransientChefAsyncApi implements ChefAsyncApi {
}
private final LocalAsyncBlobStore databags;
private final ExecutorService executor;
private final ListeningExecutorService userExecutor;
private final BlobToDatabagItem blobToDatabagItem;
private final StorageMetadataToName storageMetadataToName;
@Inject
TransientChefAsyncApi(@Named("databags") LocalAsyncBlobStore databags,
StorageMetadataToName storageMetadataToName, BlobToDatabagItem blobToDatabagItem,
@Named(Constants.PROPERTY_USER_THREADS) ExecutorService executor) {
@Named(Constants.PROPERTY_USER_THREADS) ListeningExecutorService userExecutor) {
this.databags = checkNotNull(databags, "databags");
this.storageMetadataToName = checkNotNull(storageMetadataToName, "storageMetadataToName");
this.blobToDatabagItem = checkNotNull(blobToDatabagItem, "blobToDatabagItem");
this.executor = checkNotNull(executor, "executor");
this.userExecutor = checkNotNull(userExecutor, "userExecutor");
}
@Override
@ -131,13 +131,10 @@ public class TransientChefAsyncApi implements ChefAsyncApi {
@Override
public ListenableFuture<Void> createDatabag(String databagName) {
return Futures.transform(databags.createContainerInLocation(null, databagName), new Function<Boolean, Void>(){
@Override
return transform(databags.createContainerInLocation(null, databagName), new Function<Boolean, Void>(){
public Void apply(Boolean input) {
return null;
}
});
}
@ -145,7 +142,7 @@ public class TransientChefAsyncApi implements ChefAsyncApi {
public ListenableFuture<DatabagItem> createDatabagItem(String databagName, DatabagItem databagItem) {
Blob blob = databags.blobBuilder(databagItem.getId()).payload(databagItem.toString()).build();
databags.putBlob(databagName, blob);
return Futures.immediateFuture(databagItem);
return immediateFuture(databagItem);
}
@Override
@ -185,7 +182,7 @@ public class TransientChefAsyncApi implements ChefAsyncApi {
@Override
public ListenableFuture<DatabagItem> deleteDatabagItem(String databagName, String databagItemId) {
return Futures.immediateFuture(blobToDatabagItem.apply(databags.getContext().createBlobMap(databagName).remove(databagItemId)));
return immediateFuture(blobToDatabagItem.apply(databags.getContext().createBlobMap(databagName).remove(databagItemId)));
}
@Override
@ -215,7 +212,7 @@ public class TransientChefAsyncApi implements ChefAsyncApi {
@Override
public ListenableFuture<DatabagItem> getDatabagItem(String databagName, String databagItemId) {
return compose(databags.getBlob(databagName, databagItemId), blobToDatabagItem, executor);
return transform(databags.getBlob(databagName, databagItemId), blobToDatabagItem, userExecutor);
}
@Override
@ -250,12 +247,12 @@ public class TransientChefAsyncApi implements ChefAsyncApi {
@Override
public ListenableFuture<Set<String>> listDatabagItems(String databagName) {
return compose(databags.list(databagName), storageMetadataToName, executor);
return transform(databags.list(databagName), storageMetadataToName, userExecutor);
}
@Override
public ListenableFuture<Set<String>> listDatabags() {
return compose(databags.list(), storageMetadataToName, executor);
return transform(databags.list(), storageMetadataToName, userExecutor);
}
@Override

View File

@ -18,6 +18,7 @@
*/
package org.jclouds.chef.test.config;
import static com.google.common.util.concurrent.MoreExecutors.sameThreadExecutor;
import static org.jclouds.rest.config.BinderUtils.bindBlockingApi;
import java.io.IOException;
@ -38,7 +39,6 @@ import org.jclouds.chef.functions.ClientForGroup;
import org.jclouds.chef.functions.RunListForGroup;
import org.jclouds.chef.test.TransientChefApi;
import org.jclouds.chef.test.TransientChefAsyncApi;
import org.jclouds.concurrent.MoreExecutors;
import org.jclouds.concurrent.config.ExecutorServiceModule;
import org.jclouds.crypto.Crypto;
import org.jclouds.crypto.Pems;
@ -77,11 +77,9 @@ public class TransientChefApiModule extends AbstractModule {
bind(LocalAsyncBlobStore.class).annotatedWith(Names.named("databags")).toInstance(
ContextBuilder
.newBuilder(new TransientApiMetadata())
.modules(
ImmutableSet.<Module> of(new ExecutorServiceModule(MoreExecutors.sameThreadExecutor(),
MoreExecutors.sameThreadExecutor()))).buildInjector()
.getInstance(LocalAsyncBlobStore.class));
.modules(ImmutableSet.<Module> of(
new ExecutorServiceModule(sameThreadExecutor(), sameThreadExecutor())))
.buildInjector().getInstance(LocalAsyncBlobStore.class));
bind(Statement.class).annotatedWith(Names.named("installChefGems")).to(InstallChefGems.class);
}