diff --git a/apis/atmos/src/main/java/org/jclouds/atmos/blobstore/strategy/FindMD5InUserMetadata.java b/apis/atmos/src/main/java/org/jclouds/atmos/blobstore/strategy/FindMD5InUserMetadata.java index 8ae8ed25f5..7b7a1766c5 100644 --- a/apis/atmos/src/main/java/org/jclouds/atmos/blobstore/strategy/FindMD5InUserMetadata.java +++ b/apis/atmos/src/main/java/org/jclouds/atmos/blobstore/strategy/FindMD5InUserMetadata.java @@ -19,6 +19,7 @@ package org.jclouds.atmos.blobstore.strategy; import static com.google.common.base.Preconditions.checkNotNull; +import static com.google.common.base.Throwables.propagate; import static org.jclouds.concurrent.FutureIterables.awaitCompletion; import java.util.Arrays; @@ -28,6 +29,7 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; import java.util.concurrent.SynchronousQueue; +import java.util.concurrent.TimeoutException; import java.util.concurrent.TimeUnit; import javax.annotation.Resource; @@ -109,8 +111,13 @@ public class FindMD5InUserMetadata implements ContainsValueInListStrategy { }, userExecutor); responses.put(md.getName(), future); } - Map exceptions = awaitCompletion(responses, userExecutor, maxTime, logger, - String.format("searching for md5 in container %s", containerName)); + Map exceptions; + try { + exceptions = awaitCompletion(responses, userExecutor, maxTime, logger, + String.format("searching for md5 in container %s", containerName)); + } catch (TimeoutException te) { + throw propagate(te); + } if (exceptions.size() > 0) throw new BlobRuntimeException(String.format("searching for md5 in container %s: %s", containerName, exceptions)); diff --git a/blobstore/src/main/java/org/jclouds/blobstore/strategy/internal/DeleteAllKeysInList.java b/blobstore/src/main/java/org/jclouds/blobstore/strategy/internal/DeleteAllKeysInList.java index 71a94fb559..468f61cb1e 100644 --- a/blobstore/src/main/java/org/jclouds/blobstore/strategy/internal/DeleteAllKeysInList.java +++ b/blobstore/src/main/java/org/jclouds/blobstore/strategy/internal/DeleteAllKeysInList.java @@ -18,6 +18,7 @@ */ package org.jclouds.blobstore.strategy.internal; +import static com.google.common.base.Throwables.propagate; import static org.jclouds.blobstore.options.ListContainerOptions.Builder.recursive; import static org.jclouds.concurrent.FutureIterables.awaitCompletion; @@ -25,6 +26,7 @@ import java.util.Map; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; +import java.util.concurrent.TimeoutException; import javax.annotation.Resource; import javax.inject.Named; @@ -42,7 +44,6 @@ import org.jclouds.blobstore.strategy.ClearListStrategy; import org.jclouds.http.handlers.BackoffLimitedRetryHandler; import org.jclouds.logging.Logger; -import com.google.common.base.Throwables; import com.google.common.collect.Maps; import com.google.common.util.concurrent.Futures; import com.google.inject.Inject; @@ -103,7 +104,7 @@ public class DeleteAllKeysInList implements ClearListStrategy, ClearContainerStr } catch (ExecutionException ee) { ++numErrors; if (numErrors == maxErrors) { - throw Throwables.propagate(ee.getCause()); + throw propagate(ee.getCause()); } retryHandler.imposeBackoffExponentialDelay(numErrors, message); continue; @@ -153,7 +154,16 @@ public class DeleteAllKeysInList implements ClearListStrategy, ClearContainerStr } } - exceptions = awaitCompletion(responses, userExecutor, maxTime, logger, message); + try { + exceptions = awaitCompletion(responses, userExecutor, maxTime, logger, message); + } catch (TimeoutException te) { + ++numErrors; + if (numErrors == maxErrors) { + throw propagate(te); + } + retryHandler.imposeBackoffExponentialDelay(numErrors, message); + continue; + } if (!exceptions.isEmpty()) { ++numErrors; retryHandler.imposeBackoffExponentialDelay(numErrors, message); diff --git a/blobstore/src/main/java/org/jclouds/blobstore/strategy/internal/MarkersDeleteDirectoryStrategy.java b/blobstore/src/main/java/org/jclouds/blobstore/strategy/internal/MarkersDeleteDirectoryStrategy.java index 92449c5906..f6006f7ef9 100644 --- a/blobstore/src/main/java/org/jclouds/blobstore/strategy/internal/MarkersDeleteDirectoryStrategy.java +++ b/blobstore/src/main/java/org/jclouds/blobstore/strategy/internal/MarkersDeleteDirectoryStrategy.java @@ -18,12 +18,14 @@ */ package org.jclouds.blobstore.strategy.internal; +import static com.google.common.base.Throwables.propagate; import static org.jclouds.concurrent.FutureIterables.awaitCompletion; import java.util.Map; import java.util.Set; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; +import java.util.concurrent.TimeoutException; import javax.annotation.Resource; import javax.inject.Named; @@ -98,11 +100,15 @@ public class MarkersDeleteDirectoryStrategy implements DeleteDirectoryStrategy { } String message = String.format("deleting directory %s in containerName: %s", directory, containerName); - Map exceptions = awaitCompletion(responses, userExecutor, maxTime, logger, - message); + Map exceptions; + try { + exceptions = awaitCompletion(responses, userExecutor, maxTime, logger, message); + } catch (TimeoutException te) { + throw propagate(te); + } if (exceptions.size() > 0) throw new BlobRuntimeException(String.format("error %s: %s", message, exceptions)); assert !blobstore.directoryExists(containerName, directory) : String.format( "still exists %s: %s", message, exceptions); } -} \ No newline at end of file +} diff --git a/blobstore/src/main/java/org/jclouds/blobstore/strategy/internal/PutBlobsStrategyImpl.java b/blobstore/src/main/java/org/jclouds/blobstore/strategy/internal/PutBlobsStrategyImpl.java index d9cec6f7fc..839bd4f710 100644 --- a/blobstore/src/main/java/org/jclouds/blobstore/strategy/internal/PutBlobsStrategyImpl.java +++ b/blobstore/src/main/java/org/jclouds/blobstore/strategy/internal/PutBlobsStrategyImpl.java @@ -18,11 +18,13 @@ */ package org.jclouds.blobstore.strategy.internal; +import static com.google.common.base.Throwables.propagate; import static org.jclouds.concurrent.FutureIterables.awaitCompletion; import java.util.Map; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; +import java.util.concurrent.TimeoutException; import javax.annotation.Resource; import javax.inject.Named; @@ -71,11 +73,16 @@ public class PutBlobsStrategyImpl implements PutBlobsStrategy { for (Blob blob : blobs) { responses.put(blob, ablobstore.putBlob(containerName, blob)); } - Map exceptions = awaitCompletion(responses, userExecutor, maxTime, logger, - String.format("putting into containerName: %s", containerName)); + Map exceptions; + try { + exceptions = awaitCompletion(responses, userExecutor, maxTime, logger, + String.format("putting into containerName: %s", containerName)); + } catch (TimeoutException te) { + throw propagate(te); + } if (exceptions.size() > 0) throw new BlobRuntimeException(String.format("error putting into container %s: %s", containerName, exceptions)); } -} \ No newline at end of file +} diff --git a/blobstore/src/test/java/org/jclouds/blobstore/integration/internal/BaseBlobIntegrationTest.java b/blobstore/src/test/java/org/jclouds/blobstore/integration/internal/BaseBlobIntegrationTest.java index d194001c7f..2026bf8d0d 100644 --- a/blobstore/src/test/java/org/jclouds/blobstore/integration/internal/BaseBlobIntegrationTest.java +++ b/blobstore/src/test/java/org/jclouds/blobstore/integration/internal/BaseBlobIntegrationTest.java @@ -42,6 +42,7 @@ import java.util.Map; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; import java.util.zip.GZIPInputStream; @@ -119,7 +120,7 @@ public class BaseBlobIntegrationTest extends BaseBlobStoreIntegrationTest { * http://groups.google.com/group/jclouds/browse_thread/thread/4a7c8d58530b287f */ @Test(groups = { "integration", "live" }) - public void testPutFileParallel() throws InterruptedException, IOException { + public void testPutFileParallel() throws InterruptedException, IOException, TimeoutException { File payloadFile = File.createTempFile("testPutFileParallel", "png"); Files.copy(InputSuppliers.of(getClass().getResource("/testimg.png").openStream()), payloadFile); @@ -166,7 +167,7 @@ public class BaseBlobIntegrationTest extends BaseBlobStoreIntegrationTest { } @Test(groups = { "integration", "live" }) - public void testBigFileGets() throws InterruptedException, IOException { + public void testBigFileGets() throws InterruptedException, IOException, TimeoutException { final String expectedContentDisposition = "attachment; filename=constit.txt"; final String container = getContainerName(); try { diff --git a/compute/src/main/java/org/jclouds/compute/internal/BaseComputeService.java b/compute/src/main/java/org/jclouds/compute/internal/BaseComputeService.java index 65b422682c..33168d2d9c 100644 --- a/compute/src/main/java/org/jclouds/compute/internal/BaseComputeService.java +++ b/compute/src/main/java/org/jclouds/compute/internal/BaseComputeService.java @@ -21,6 +21,7 @@ import static com.google.common.base.Preconditions.checkNotNull; import static com.google.common.base.Predicates.and; import static com.google.common.base.Predicates.not; import static com.google.common.base.Predicates.notNull; +import static com.google.common.base.Throwables.propagate; import static com.google.common.collect.Iterables.filter; import static com.google.common.collect.Maps.newLinkedHashMap; import static com.google.common.collect.Sets.filter; @@ -41,6 +42,7 @@ import java.util.Set; import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; +import java.util.concurrent.TimeoutException; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; @@ -212,8 +214,12 @@ public class BaseComputeService implements ComputeService { Map> responses = runNodesAndAddToSetStrategy.execute(group, count, template, goodNodes, badNodes, customizationResponses); - Map executionExceptions = awaitCompletion(responses, executor, null, logger, "createNodesInGroup(" - + group + ")"); + Map executionExceptions; + try { + executionExceptions = awaitCompletion(responses, executor, null, logger, "createNodesInGroup(" + group + ")"); + } catch (TimeoutException te) { + throw propagate(te); + } Function fn = persistNodeCredentials.always(template.getOptions().getRunScript()); badNodes = Maps2.transformKeys(badNodes, fn); goodNodes = ImmutableSet.copyOf(Iterables.transform(goodNodes, fn)); @@ -548,7 +554,11 @@ public class BaseComputeService implements ComputeService { responses.put(runner.getNode(), executor.submit(new RunScriptOnNodeAndAddToGoodMapOrPutExceptionIntoBadMap( runner, goodNodes, badNodes))); } - exceptions = awaitCompletion(responses, executor, null, logger, "runScriptOnNodesMatching(" + filter + ")"); + try { + exceptions = awaitCompletion(responses, executor, null, logger, "runScriptOnNodesMatching(" + filter + ")"); + } catch (TimeoutException te) { + throw propagate(te); + } } Function fn = persistNodeCredentials.ifAdminAccess(runScript); diff --git a/core/src/main/java/org/jclouds/concurrent/FutureIterables.java b/core/src/main/java/org/jclouds/concurrent/FutureIterables.java index 119f5807a5..6cbdcb17e0 100644 --- a/core/src/main/java/org/jclouds/concurrent/FutureIterables.java +++ b/core/src/main/java/org/jclouds/concurrent/FutureIterables.java @@ -98,7 +98,11 @@ public class FutureIterables { Future to = function.apply(from); responses.put(from, to); } - exceptions = awaitCompletion(responses, exec, maxTime, logger, logPrefix); + try { + exceptions = awaitCompletion(responses, exec, maxTime, logger, logPrefix); + } catch (TimeoutException te) { + throw propagate(te); + } if (exceptions.size() > 0 && !any(exceptions.values(), containsThrowable(AuthorizationException.class))) { fromIterable = exceptions.keySet(); retryHandler.imposeBackoffExponentialDelay(delayStart, 2, i + 1, maxRetries, @@ -116,7 +120,7 @@ public class FutureIterables { } public static Map awaitCompletion(Map> responses, ExecutorService exec, - @Nullable Long maxTime, final Logger logger, final String logPrefix) { + @Nullable Long maxTime, final Logger logger, final String logPrefix) throws TimeoutException { final ConcurrentMap errorMap = newConcurrentMap(); if (responses.size() == 0) return errorMap; @@ -150,7 +154,12 @@ public class FutureIterables { } try { if (maxTime != null) { - doneSignal.await(maxTime, TimeUnit.MILLISECONDS); + if (!doneSignal.await(maxTime, TimeUnit.MILLISECONDS)) { + String message = message(logPrefix, total, complete.get(), errors.get(), start); + TimeoutException te = new TimeoutException(message); + logger.error(te, message); + throw te; + } } else { doneSignal.await(); } @@ -163,11 +172,10 @@ public class FutureIterables { String message = message(logPrefix, total, complete.get(), errors.get(), start); logger.trace(message); } - } catch (InterruptedException e) { + } catch (InterruptedException ie) { String message = message(logPrefix, total, complete.get(), errors.get(), start); - TimeoutException exception = new TimeoutException(message); - logger.error(exception, message); - propagate(exception); + logger.error(ie, message); + throw propagate(ie); } return errorMap; } diff --git a/core/src/test/java/org/jclouds/concurrent/FutureIterablesPerformanceTest.java b/core/src/test/java/org/jclouds/concurrent/FutureIterablesPerformanceTest.java index 27b24218cc..057bdb78bf 100644 --- a/core/src/test/java/org/jclouds/concurrent/FutureIterablesPerformanceTest.java +++ b/core/src/test/java/org/jclouds/concurrent/FutureIterablesPerformanceTest.java @@ -30,6 +30,7 @@ import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; +import java.util.concurrent.TimeoutException; import org.jclouds.logging.Logger; import org.testng.annotations.Test; @@ -61,8 +62,8 @@ public class FutureIterablesPerformanceTest { } @Test(enabled = false) - public void testAwaitCompletionUsingSameThreadExecutorDoesntSerializeFutures() throws InterruptedException, - ExecutionException { + public void testAwaitCompletionUsingSameThreadExecutorDoesntSerializeFutures() + throws InterruptedException, ExecutionException, TimeoutException { long expectedMax = IO_DURATION; long expectedMin = IO_DURATION; long expectedOverhead = COUNT + FUDGE; diff --git a/core/src/test/java/org/jclouds/concurrent/FutureIterablesTest.java b/core/src/test/java/org/jclouds/concurrent/FutureIterablesTest.java index 2071454233..7a40f15391 100644 --- a/core/src/test/java/org/jclouds/concurrent/FutureIterablesTest.java +++ b/core/src/test/java/org/jclouds/concurrent/FutureIterablesTest.java @@ -20,8 +20,13 @@ package org.jclouds.concurrent; import static org.jclouds.concurrent.FutureIterables.transformParallel; import static org.testng.Assert.assertEquals; +import static org.testng.Assert.fail; +import java.util.Map; +import java.util.concurrent.Executors; +import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; import org.jclouds.logging.Logger; @@ -30,6 +35,7 @@ import org.testng.annotations.Test; import com.google.common.base.Function; import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Maps; /** * Tests behavior of FutureIterables @@ -80,4 +86,32 @@ public class FutureIterablesTest { } + public void testAwaitCompletionTimeout() throws Exception { + final long timeoutMs = 1000; + ExecutorService executorService = Executors.newSingleThreadExecutor(); + Map> responses = Maps.newHashMap(); + try { + responses.put(null, executorService.submit(new Runnable() { + @Override + public void run() { + try { + Thread.sleep(2 * timeoutMs); + } catch (InterruptedException ie) { + // triggered during shutdown + } + } + })); + Map errors = FutureIterables.awaitCompletion( + responses, executorService, timeoutMs, Logger.CONSOLE, + /*prefix=*/ ""); + if (!errors.isEmpty()) { + throw errors.values().iterator().next(); + } + fail("Did not throw TimeoutException"); + } catch (TimeoutException te) { + // expected + } finally { + executorService.shutdownNow(); + } + } } diff --git a/drivers/gae/src/test/java/org/jclouds/gae/AsyncGaeHttpCommandExecutorServiceIntegrationTest.java b/drivers/gae/src/test/java/org/jclouds/gae/AsyncGaeHttpCommandExecutorServiceIntegrationTest.java index c40ce4bd21..8906ae6e8f 100644 --- a/drivers/gae/src/test/java/org/jclouds/gae/AsyncGaeHttpCommandExecutorServiceIntegrationTest.java +++ b/drivers/gae/src/test/java/org/jclouds/gae/AsyncGaeHttpCommandExecutorServiceIntegrationTest.java @@ -76,7 +76,7 @@ public class AsyncGaeHttpCommandExecutorServiceIntegrationTest extends BaseHttpC } @Test(enabled = false) - public void testPerformanceVsNothing() { + public void testPerformanceVsNothing() throws TimeoutException { setupApiProxy(); int count = 5; final URI fetch = URI.create("http://www.google.com"); @@ -159,7 +159,8 @@ public class AsyncGaeHttpCommandExecutorServiceIntegrationTest extends BaseHttpC } - private Results getTest(int count, String who, Supplier> getSupplier, Consumer consumer) { + private Results getTest(int count, String who, Supplier> getSupplier, Consumer consumer) + throws TimeoutException { Results results = new Results(); results.count = count; results.who = who;