Throw TimeoutException from awaitCompletion

This is a common error that callers should interpret correctly.  For
DeleteAllKeysInList, we integrate into its retry and backoff logic,
and for other callers, we continue to propagate RuntimeException.
This commit is contained in:
Andrew Gaul 2012-10-23 13:19:23 -07:00
parent 2beb5cf7f7
commit d58acbacae
10 changed files with 112 additions and 27 deletions

View File

@ -19,6 +19,7 @@
package org.jclouds.atmos.blobstore.strategy;
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Throwables.propagate;
import static org.jclouds.concurrent.FutureIterables.awaitCompletion;
import java.util.Arrays;
@ -28,6 +29,7 @@ import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.TimeUnit;
import javax.annotation.Resource;
@ -109,8 +111,13 @@ public class FindMD5InUserMetadata implements ContainsValueInListStrategy {
}, userExecutor);
responses.put(md.getName(), future);
}
Map<String, Exception> exceptions = awaitCompletion(responses, userExecutor, maxTime, logger,
Map<String, Exception> exceptions;
try {
exceptions = awaitCompletion(responses, userExecutor, maxTime, logger,
String.format("searching for md5 in container %s", containerName));
} catch (TimeoutException te) {
throw propagate(te);
}
if (exceptions.size() > 0)
throw new BlobRuntimeException(String.format("searching for md5 in container %s: %s", containerName,
exceptions));

View File

@ -18,6 +18,7 @@
*/
package org.jclouds.blobstore.strategy.internal;
import static com.google.common.base.Throwables.propagate;
import static org.jclouds.blobstore.options.ListContainerOptions.Builder.recursive;
import static org.jclouds.concurrent.FutureIterables.awaitCompletion;
@ -25,6 +26,7 @@ import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeoutException;
import javax.annotation.Resource;
import javax.inject.Named;
@ -42,7 +44,6 @@ import org.jclouds.blobstore.strategy.ClearListStrategy;
import org.jclouds.http.handlers.BackoffLimitedRetryHandler;
import org.jclouds.logging.Logger;
import com.google.common.base.Throwables;
import com.google.common.collect.Maps;
import com.google.common.util.concurrent.Futures;
import com.google.inject.Inject;
@ -103,7 +104,7 @@ public class DeleteAllKeysInList implements ClearListStrategy, ClearContainerStr
} catch (ExecutionException ee) {
++numErrors;
if (numErrors == maxErrors) {
throw Throwables.propagate(ee.getCause());
throw propagate(ee.getCause());
}
retryHandler.imposeBackoffExponentialDelay(numErrors, message);
continue;
@ -153,7 +154,16 @@ public class DeleteAllKeysInList implements ClearListStrategy, ClearContainerStr
}
}
try {
exceptions = awaitCompletion(responses, userExecutor, maxTime, logger, message);
} catch (TimeoutException te) {
++numErrors;
if (numErrors == maxErrors) {
throw propagate(te);
}
retryHandler.imposeBackoffExponentialDelay(numErrors, message);
continue;
}
if (!exceptions.isEmpty()) {
++numErrors;
retryHandler.imposeBackoffExponentialDelay(numErrors, message);

View File

@ -18,12 +18,14 @@
*/
package org.jclouds.blobstore.strategy.internal;
import static com.google.common.base.Throwables.propagate;
import static org.jclouds.concurrent.FutureIterables.awaitCompletion;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeoutException;
import javax.annotation.Resource;
import javax.inject.Named;
@ -98,8 +100,12 @@ public class MarkersDeleteDirectoryStrategy implements DeleteDirectoryStrategy {
}
String message = String.format("deleting directory %s in containerName: %s", directory,
containerName);
Map<String, Exception> exceptions = awaitCompletion(responses, userExecutor, maxTime, logger,
message);
Map<String, Exception> exceptions;
try {
exceptions = awaitCompletion(responses, userExecutor, maxTime, logger, message);
} catch (TimeoutException te) {
throw propagate(te);
}
if (exceptions.size() > 0)
throw new BlobRuntimeException(String.format("error %s: %s", message, exceptions));
assert !blobstore.directoryExists(containerName, directory) : String.format(

View File

@ -18,11 +18,13 @@
*/
package org.jclouds.blobstore.strategy.internal;
import static com.google.common.base.Throwables.propagate;
import static org.jclouds.concurrent.FutureIterables.awaitCompletion;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeoutException;
import javax.annotation.Resource;
import javax.inject.Named;
@ -71,8 +73,13 @@ public class PutBlobsStrategyImpl implements PutBlobsStrategy {
for (Blob blob : blobs) {
responses.put(blob, ablobstore.putBlob(containerName, blob));
}
Map<Blob, Exception> exceptions = awaitCompletion(responses, userExecutor, maxTime, logger,
Map<Blob, Exception> exceptions;
try {
exceptions = awaitCompletion(responses, userExecutor, maxTime, logger,
String.format("putting into containerName: %s", containerName));
} catch (TimeoutException te) {
throw propagate(te);
}
if (exceptions.size() > 0)
throw new BlobRuntimeException(String.format("error putting into container %s: %s",
containerName, exceptions));

View File

@ -42,6 +42,7 @@ import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.zip.GZIPInputStream;
@ -119,7 +120,7 @@ public class BaseBlobIntegrationTest extends BaseBlobStoreIntegrationTest {
* http://groups.google.com/group/jclouds/browse_thread/thread/4a7c8d58530b287f
*/
@Test(groups = { "integration", "live" })
public void testPutFileParallel() throws InterruptedException, IOException {
public void testPutFileParallel() throws InterruptedException, IOException, TimeoutException {
File payloadFile = File.createTempFile("testPutFileParallel", "png");
Files.copy(InputSuppliers.of(getClass().getResource("/testimg.png").openStream()), payloadFile);
@ -166,7 +167,7 @@ public class BaseBlobIntegrationTest extends BaseBlobStoreIntegrationTest {
}
@Test(groups = { "integration", "live" })
public void testBigFileGets() throws InterruptedException, IOException {
public void testBigFileGets() throws InterruptedException, IOException, TimeoutException {
final String expectedContentDisposition = "attachment; filename=constit.txt";
final String container = getContainerName();
try {

View File

@ -21,6 +21,7 @@ import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Predicates.and;
import static com.google.common.base.Predicates.not;
import static com.google.common.base.Predicates.notNull;
import static com.google.common.base.Throwables.propagate;
import static com.google.common.collect.Iterables.filter;
import static com.google.common.collect.Maps.newLinkedHashMap;
import static com.google.common.collect.Sets.filter;
@ -41,6 +42,7 @@ import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
@ -212,8 +214,12 @@ public class BaseComputeService implements ComputeService {
Map<?, Future<Void>> responses = runNodesAndAddToSetStrategy.execute(group, count, template, goodNodes, badNodes,
customizationResponses);
Map<?, Exception> executionExceptions = awaitCompletion(responses, executor, null, logger, "createNodesInGroup("
+ group + ")");
Map<?, Exception> executionExceptions;
try {
executionExceptions = awaitCompletion(responses, executor, null, logger, "createNodesInGroup(" + group + ")");
} catch (TimeoutException te) {
throw propagate(te);
}
Function<NodeMetadata, NodeMetadata> fn = persistNodeCredentials.always(template.getOptions().getRunScript());
badNodes = Maps2.transformKeys(badNodes, fn);
goodNodes = ImmutableSet.copyOf(Iterables.transform(goodNodes, fn));
@ -548,7 +554,11 @@ public class BaseComputeService implements ComputeService {
responses.put(runner.getNode(), executor.submit(new RunScriptOnNodeAndAddToGoodMapOrPutExceptionIntoBadMap(
runner, goodNodes, badNodes)));
}
try {
exceptions = awaitCompletion(responses, executor, null, logger, "runScriptOnNodesMatching(" + filter + ")");
} catch (TimeoutException te) {
throw propagate(te);
}
}
Function<NodeMetadata, NodeMetadata> fn = persistNodeCredentials.ifAdminAccess(runScript);

View File

@ -98,7 +98,11 @@ public class FutureIterables {
Future<? extends T> to = function.apply(from);
responses.put(from, to);
}
try {
exceptions = awaitCompletion(responses, exec, maxTime, logger, logPrefix);
} catch (TimeoutException te) {
throw propagate(te);
}
if (exceptions.size() > 0 && !any(exceptions.values(), containsThrowable(AuthorizationException.class))) {
fromIterable = exceptions.keySet();
retryHandler.imposeBackoffExponentialDelay(delayStart, 2, i + 1, maxRetries,
@ -116,7 +120,7 @@ public class FutureIterables {
}
public static <T> Map<T, Exception> awaitCompletion(Map<T, ? extends Future<?>> responses, ExecutorService exec,
@Nullable Long maxTime, final Logger logger, final String logPrefix) {
@Nullable Long maxTime, final Logger logger, final String logPrefix) throws TimeoutException {
final ConcurrentMap<T, Exception> errorMap = newConcurrentMap();
if (responses.size() == 0)
return errorMap;
@ -150,7 +154,12 @@ public class FutureIterables {
}
try {
if (maxTime != null) {
doneSignal.await(maxTime, TimeUnit.MILLISECONDS);
if (!doneSignal.await(maxTime, TimeUnit.MILLISECONDS)) {
String message = message(logPrefix, total, complete.get(), errors.get(), start);
TimeoutException te = new TimeoutException(message);
logger.error(te, message);
throw te;
}
} else {
doneSignal.await();
}
@ -163,11 +172,10 @@ public class FutureIterables {
String message = message(logPrefix, total, complete.get(), errors.get(), start);
logger.trace(message);
}
} catch (InterruptedException e) {
} catch (InterruptedException ie) {
String message = message(logPrefix, total, complete.get(), errors.get(), start);
TimeoutException exception = new TimeoutException(message);
logger.error(exception, message);
propagate(exception);
logger.error(ie, message);
throw propagate(ie);
}
return errorMap;
}

View File

@ -30,6 +30,7 @@ import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeoutException;
import org.jclouds.logging.Logger;
import org.testng.annotations.Test;
@ -61,8 +62,8 @@ public class FutureIterablesPerformanceTest {
}
@Test(enabled = false)
public void testAwaitCompletionUsingSameThreadExecutorDoesntSerializeFutures() throws InterruptedException,
ExecutionException {
public void testAwaitCompletionUsingSameThreadExecutorDoesntSerializeFutures()
throws InterruptedException, ExecutionException, TimeoutException {
long expectedMax = IO_DURATION;
long expectedMin = IO_DURATION;
long expectedOverhead = COUNT + FUDGE;

View File

@ -20,8 +20,13 @@ package org.jclouds.concurrent;
import static org.jclouds.concurrent.FutureIterables.transformParallel;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.fail;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import org.jclouds.logging.Logger;
@ -30,6 +35,7 @@ import org.testng.annotations.Test;
import com.google.common.base.Function;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Maps;
/**
* Tests behavior of FutureIterables
@ -80,4 +86,32 @@ public class FutureIterablesTest {
}
public void testAwaitCompletionTimeout() throws Exception {
final long timeoutMs = 1000;
ExecutorService executorService = Executors.newSingleThreadExecutor();
Map<Void, Future<?>> responses = Maps.newHashMap();
try {
responses.put(null, executorService.submit(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(2 * timeoutMs);
} catch (InterruptedException ie) {
// triggered during shutdown
}
}
}));
Map<Void, Exception> errors = FutureIterables.awaitCompletion(
responses, executorService, timeoutMs, Logger.CONSOLE,
/*prefix=*/ "");
if (!errors.isEmpty()) {
throw errors.values().iterator().next();
}
fail("Did not throw TimeoutException");
} catch (TimeoutException te) {
// expected
} finally {
executorService.shutdownNow();
}
}
}

View File

@ -76,7 +76,7 @@ public class AsyncGaeHttpCommandExecutorServiceIntegrationTest extends BaseHttpC
}
@Test(enabled = false)
public void testPerformanceVsNothing() {
public void testPerformanceVsNothing() throws TimeoutException {
setupApiProxy();
int count = 5;
final URI fetch = URI.create("http://www.google.com");
@ -159,7 +159,8 @@ public class AsyncGaeHttpCommandExecutorServiceIntegrationTest extends BaseHttpC
}
private Results getTest(int count, String who, Supplier<ListenableFuture<?>> getSupplier, Consumer consumer) {
private Results getTest(int count, String who, Supplier<ListenableFuture<?>> getSupplier, Consumer consumer)
throws TimeoutException {
Results results = new Results();
results.count = count;
results.who = who;