Merge pull request #933 from andrewgaul/clear-container-timeout-exception-1.5.x

Throw TimeoutException from awaitCompletion
This commit is contained in:
Adrian Cole 2012-11-02 09:56:29 -07:00
commit e7354c6f4a
10 changed files with 112 additions and 27 deletions

View File

@ -19,6 +19,7 @@
package org.jclouds.atmos.blobstore.strategy;
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Throwables.propagate;
import static org.jclouds.concurrent.FutureIterables.awaitCompletion;
import java.util.Arrays;
@ -28,6 +29,7 @@ import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.TimeUnit;
import javax.annotation.Resource;
@ -109,8 +111,13 @@ public class FindMD5InUserMetadata implements ContainsValueInListStrategy {
}, userExecutor);
responses.put(md.getName(), future);
}
Map<String, Exception> exceptions = awaitCompletion(responses, userExecutor, maxTime, logger,
String.format("searching for md5 in container %s", containerName));
Map<String, Exception> exceptions;
try {
exceptions = awaitCompletion(responses, userExecutor, maxTime, logger,
String.format("searching for md5 in container %s", containerName));
} catch (TimeoutException te) {
throw propagate(te);
}
if (exceptions.size() > 0)
throw new BlobRuntimeException(String.format("searching for md5 in container %s: %s", containerName,
exceptions));

View File

@ -18,6 +18,7 @@
*/
package org.jclouds.blobstore.strategy.internal;
import static com.google.common.base.Throwables.propagate;
import static org.jclouds.blobstore.options.ListContainerOptions.Builder.recursive;
import static org.jclouds.concurrent.FutureIterables.awaitCompletion;
@ -25,6 +26,7 @@ import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeoutException;
import javax.annotation.Resource;
import javax.inject.Named;
@ -42,7 +44,6 @@ import org.jclouds.blobstore.strategy.ClearListStrategy;
import org.jclouds.http.handlers.BackoffLimitedRetryHandler;
import org.jclouds.logging.Logger;
import com.google.common.base.Throwables;
import com.google.common.collect.Maps;
import com.google.common.util.concurrent.Futures;
import com.google.inject.Inject;
@ -103,7 +104,7 @@ public class DeleteAllKeysInList implements ClearListStrategy, ClearContainerStr
} catch (ExecutionException ee) {
++numErrors;
if (numErrors == maxErrors) {
throw Throwables.propagate(ee.getCause());
throw propagate(ee.getCause());
}
retryHandler.imposeBackoffExponentialDelay(numErrors, message);
continue;
@ -153,7 +154,16 @@ public class DeleteAllKeysInList implements ClearListStrategy, ClearContainerStr
}
}
exceptions = awaitCompletion(responses, userExecutor, maxTime, logger, message);
try {
exceptions = awaitCompletion(responses, userExecutor, maxTime, logger, message);
} catch (TimeoutException te) {
++numErrors;
if (numErrors == maxErrors) {
throw propagate(te);
}
retryHandler.imposeBackoffExponentialDelay(numErrors, message);
continue;
}
if (!exceptions.isEmpty()) {
++numErrors;
retryHandler.imposeBackoffExponentialDelay(numErrors, message);

View File

@ -18,12 +18,14 @@
*/
package org.jclouds.blobstore.strategy.internal;
import static com.google.common.base.Throwables.propagate;
import static org.jclouds.concurrent.FutureIterables.awaitCompletion;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeoutException;
import javax.annotation.Resource;
import javax.inject.Named;
@ -98,11 +100,15 @@ public class MarkersDeleteDirectoryStrategy implements DeleteDirectoryStrategy {
}
String message = String.format("deleting directory %s in containerName: %s", directory,
containerName);
Map<String, Exception> exceptions = awaitCompletion(responses, userExecutor, maxTime, logger,
message);
Map<String, Exception> exceptions;
try {
exceptions = awaitCompletion(responses, userExecutor, maxTime, logger, message);
} catch (TimeoutException te) {
throw propagate(te);
}
if (exceptions.size() > 0)
throw new BlobRuntimeException(String.format("error %s: %s", message, exceptions));
assert !blobstore.directoryExists(containerName, directory) : String.format(
"still exists %s: %s", message, exceptions);
}
}
}

View File

@ -18,11 +18,13 @@
*/
package org.jclouds.blobstore.strategy.internal;
import static com.google.common.base.Throwables.propagate;
import static org.jclouds.concurrent.FutureIterables.awaitCompletion;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeoutException;
import javax.annotation.Resource;
import javax.inject.Named;
@ -71,11 +73,16 @@ public class PutBlobsStrategyImpl implements PutBlobsStrategy {
for (Blob blob : blobs) {
responses.put(blob, ablobstore.putBlob(containerName, blob));
}
Map<Blob, Exception> exceptions = awaitCompletion(responses, userExecutor, maxTime, logger,
String.format("putting into containerName: %s", containerName));
Map<Blob, Exception> exceptions;
try {
exceptions = awaitCompletion(responses, userExecutor, maxTime, logger,
String.format("putting into containerName: %s", containerName));
} catch (TimeoutException te) {
throw propagate(te);
}
if (exceptions.size() > 0)
throw new BlobRuntimeException(String.format("error putting into container %s: %s",
containerName, exceptions));
}
}
}

View File

@ -42,6 +42,7 @@ import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.zip.GZIPInputStream;
@ -119,7 +120,7 @@ public class BaseBlobIntegrationTest extends BaseBlobStoreIntegrationTest {
* http://groups.google.com/group/jclouds/browse_thread/thread/4a7c8d58530b287f
*/
@Test(groups = { "integration", "live" })
public void testPutFileParallel() throws InterruptedException, IOException {
public void testPutFileParallel() throws InterruptedException, IOException, TimeoutException {
File payloadFile = File.createTempFile("testPutFileParallel", "png");
Files.copy(InputSuppliers.of(getClass().getResource("/testimg.png").openStream()), payloadFile);
@ -166,7 +167,7 @@ public class BaseBlobIntegrationTest extends BaseBlobStoreIntegrationTest {
}
@Test(groups = { "integration", "live" })
public void testBigFileGets() throws InterruptedException, IOException {
public void testBigFileGets() throws InterruptedException, IOException, TimeoutException {
final String expectedContentDisposition = "attachment; filename=constit.txt";
final String container = getContainerName();
try {

View File

@ -21,6 +21,7 @@ import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Predicates.and;
import static com.google.common.base.Predicates.not;
import static com.google.common.base.Predicates.notNull;
import static com.google.common.base.Throwables.propagate;
import static com.google.common.collect.Iterables.filter;
import static com.google.common.collect.Maps.newLinkedHashMap;
import static com.google.common.collect.Sets.filter;
@ -41,6 +42,7 @@ import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
@ -212,8 +214,12 @@ public class BaseComputeService implements ComputeService {
Map<?, Future<Void>> responses = runNodesAndAddToSetStrategy.execute(group, count, template, goodNodes, badNodes,
customizationResponses);
Map<?, Exception> executionExceptions = awaitCompletion(responses, executor, null, logger, "createNodesInGroup("
+ group + ")");
Map<?, Exception> executionExceptions;
try {
executionExceptions = awaitCompletion(responses, executor, null, logger, "createNodesInGroup(" + group + ")");
} catch (TimeoutException te) {
throw propagate(te);
}
Function<NodeMetadata, NodeMetadata> fn = persistNodeCredentials.always(template.getOptions().getRunScript());
badNodes = Maps2.transformKeys(badNodes, fn);
goodNodes = ImmutableSet.copyOf(Iterables.transform(goodNodes, fn));
@ -548,7 +554,11 @@ public class BaseComputeService implements ComputeService {
responses.put(runner.getNode(), executor.submit(new RunScriptOnNodeAndAddToGoodMapOrPutExceptionIntoBadMap(
runner, goodNodes, badNodes)));
}
exceptions = awaitCompletion(responses, executor, null, logger, "runScriptOnNodesMatching(" + filter + ")");
try {
exceptions = awaitCompletion(responses, executor, null, logger, "runScriptOnNodesMatching(" + filter + ")");
} catch (TimeoutException te) {
throw propagate(te);
}
}
Function<NodeMetadata, NodeMetadata> fn = persistNodeCredentials.ifAdminAccess(runScript);

View File

@ -98,7 +98,11 @@ public class FutureIterables {
Future<? extends T> to = function.apply(from);
responses.put(from, to);
}
exceptions = awaitCompletion(responses, exec, maxTime, logger, logPrefix);
try {
exceptions = awaitCompletion(responses, exec, maxTime, logger, logPrefix);
} catch (TimeoutException te) {
throw propagate(te);
}
if (exceptions.size() > 0 && !any(exceptions.values(), containsThrowable(AuthorizationException.class))) {
fromIterable = exceptions.keySet();
retryHandler.imposeBackoffExponentialDelay(delayStart, 2, i + 1, maxRetries,
@ -116,7 +120,7 @@ public class FutureIterables {
}
public static <T> Map<T, Exception> awaitCompletion(Map<T, ? extends Future<?>> responses, ExecutorService exec,
@Nullable Long maxTime, final Logger logger, final String logPrefix) {
@Nullable Long maxTime, final Logger logger, final String logPrefix) throws TimeoutException {
final ConcurrentMap<T, Exception> errorMap = newConcurrentMap();
if (responses.size() == 0)
return errorMap;
@ -150,7 +154,12 @@ public class FutureIterables {
}
try {
if (maxTime != null) {
doneSignal.await(maxTime, TimeUnit.MILLISECONDS);
if (!doneSignal.await(maxTime, TimeUnit.MILLISECONDS)) {
String message = message(logPrefix, total, complete.get(), errors.get(), start);
TimeoutException te = new TimeoutException(message);
logger.error(te, message);
throw te;
}
} else {
doneSignal.await();
}
@ -163,11 +172,10 @@ public class FutureIterables {
String message = message(logPrefix, total, complete.get(), errors.get(), start);
logger.trace(message);
}
} catch (InterruptedException e) {
} catch (InterruptedException ie) {
String message = message(logPrefix, total, complete.get(), errors.get(), start);
TimeoutException exception = new TimeoutException(message);
logger.error(exception, message);
propagate(exception);
logger.error(ie, message);
throw propagate(ie);
}
return errorMap;
}

View File

@ -30,6 +30,7 @@ import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeoutException;
import org.jclouds.logging.Logger;
import org.testng.annotations.Test;
@ -61,8 +62,8 @@ public class FutureIterablesPerformanceTest {
}
@Test(enabled = false)
public void testAwaitCompletionUsingSameThreadExecutorDoesntSerializeFutures() throws InterruptedException,
ExecutionException {
public void testAwaitCompletionUsingSameThreadExecutorDoesntSerializeFutures()
throws InterruptedException, ExecutionException, TimeoutException {
long expectedMax = IO_DURATION;
long expectedMin = IO_DURATION;
long expectedOverhead = COUNT + FUDGE;

View File

@ -20,8 +20,13 @@ package org.jclouds.concurrent;
import static org.jclouds.concurrent.FutureIterables.transformParallel;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.fail;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import org.jclouds.logging.Logger;
@ -30,6 +35,7 @@ import org.testng.annotations.Test;
import com.google.common.base.Function;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Maps;
/**
* Tests behavior of FutureIterables
@ -80,4 +86,32 @@ public class FutureIterablesTest {
}
public void testAwaitCompletionTimeout() throws Exception {
final long timeoutMs = 1000;
ExecutorService executorService = Executors.newSingleThreadExecutor();
Map<Void, Future<?>> responses = Maps.newHashMap();
try {
responses.put(null, executorService.submit(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(2 * timeoutMs);
} catch (InterruptedException ie) {
// triggered during shutdown
}
}
}));
Map<Void, Exception> errors = FutureIterables.awaitCompletion(
responses, executorService, timeoutMs, Logger.CONSOLE,
/*prefix=*/ "");
if (!errors.isEmpty()) {
throw errors.values().iterator().next();
}
fail("Did not throw TimeoutException");
} catch (TimeoutException te) {
// expected
} finally {
executorService.shutdownNow();
}
}
}

View File

@ -76,7 +76,7 @@ public class AsyncGaeHttpCommandExecutorServiceIntegrationTest extends BaseHttpC
}
@Test(enabled = false)
public void testPerformanceVsNothing() {
public void testPerformanceVsNothing() throws TimeoutException {
setupApiProxy();
int count = 5;
final URI fetch = URI.create("http://www.google.com");
@ -159,7 +159,8 @@ public class AsyncGaeHttpCommandExecutorServiceIntegrationTest extends BaseHttpC
}
private Results getTest(int count, String who, Supplier<ListenableFuture<?>> getSupplier, Consumer consumer) {
private Results getTest(int count, String who, Supplier<ListenableFuture<?>> getSupplier, Consumer consumer)
throws TimeoutException {
Results results = new Results();
results.count = count;
results.who = who;