Merge pull request #475 from andrewgaul/awaitcompletion-concurrentmap

Collect errors in a thread-safe Map
This commit is contained in:
Adrian Cole 2012-03-19 15:45:45 -07:00
commit 8f4307843d
1 changed files with 4 additions and 3 deletions

View File

@ -21,11 +21,13 @@ package org.jclouds.concurrent;
import static com.google.common.base.Throwables.propagate; import static com.google.common.base.Throwables.propagate;
import static com.google.common.collect.Iterables.any; import static com.google.common.collect.Iterables.any;
import static com.google.common.collect.Iterables.transform; import static com.google.common.collect.Iterables.transform;
import static com.google.common.collect.Maps.newConcurrentMap;
import static com.google.common.collect.Maps.newHashMap; import static com.google.common.collect.Maps.newHashMap;
import static org.jclouds.util.Throwables2.containsThrowable; import static org.jclouds.util.Throwables2.containsThrowable;
import static org.jclouds.util.Throwables2.propagateAuthorizationOrOriginalException; import static org.jclouds.util.Throwables2.propagateAuthorizationOrOriginalException;
import java.util.Map; import java.util.Map;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
@ -45,7 +47,6 @@ import org.jclouds.rest.AuthorizationException;
import com.google.common.annotations.Beta; import com.google.common.annotations.Beta;
import com.google.common.base.Function; import com.google.common.base.Function;
import com.google.common.collect.ImmutableMap;
import com.google.inject.Inject; import com.google.inject.Inject;
/** /**
@ -115,14 +116,14 @@ public class FutureIterables {
public static <T> Map<T, Exception> awaitCompletion(Map<T, ? extends Future<?>> responses, ExecutorService exec, public static <T> Map<T, Exception> awaitCompletion(Map<T, ? extends Future<?>> responses, ExecutorService exec,
@Nullable Long maxTime, final Logger logger, final String logPrefix) { @Nullable Long maxTime, final Logger logger, final String logPrefix) {
final ConcurrentMap<T, Exception> errorMap = newConcurrentMap();
if (responses.size() == 0) if (responses.size() == 0)
return ImmutableMap.of(); return errorMap;
final int total = responses.size(); final int total = responses.size();
final CountDownLatch doneSignal = new CountDownLatch(total); final CountDownLatch doneSignal = new CountDownLatch(total);
final AtomicInteger complete = new AtomicInteger(0); final AtomicInteger complete = new AtomicInteger(0);
final AtomicInteger errors = new AtomicInteger(0); final AtomicInteger errors = new AtomicInteger(0);
final long start = System.currentTimeMillis(); final long start = System.currentTimeMillis();
final Map<T, Exception> errorMap = newHashMap();
for (final java.util.Map.Entry<T, ? extends Future<?>> future : responses.entrySet()) { for (final java.util.Map.Entry<T, ? extends Future<?>> future : responses.entrySet()) {
Futures.makeListenable(future.getValue(), exec).addListener(new Runnable() { Futures.makeListenable(future.getValue(), exec).addListener(new Runnable() {