mirror of https://github.com/apache/jclouds.git
Throw TimeoutException from awaitCompletion
This is a common error that callers should interpret correctly. For DeleteAllKeysInList, we integrate into its retry and backoff logic, and for other callers, we continue to propagate RuntimeException.
This commit is contained in:
parent
97004c7f32
commit
336ccfa2e6
|
@ -19,6 +19,7 @@
|
||||||
package org.jclouds.atmos.blobstore.strategy;
|
package org.jclouds.atmos.blobstore.strategy;
|
||||||
|
|
||||||
import static com.google.common.base.Preconditions.checkNotNull;
|
import static com.google.common.base.Preconditions.checkNotNull;
|
||||||
|
import static com.google.common.base.Throwables.propagate;
|
||||||
import static org.jclouds.concurrent.FutureIterables.awaitCompletion;
|
import static org.jclouds.concurrent.FutureIterables.awaitCompletion;
|
||||||
|
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
|
@ -28,6 +29,7 @@ import java.util.concurrent.ExecutionException;
|
||||||
import java.util.concurrent.ExecutorService;
|
import java.util.concurrent.ExecutorService;
|
||||||
import java.util.concurrent.Future;
|
import java.util.concurrent.Future;
|
||||||
import java.util.concurrent.SynchronousQueue;
|
import java.util.concurrent.SynchronousQueue;
|
||||||
|
import java.util.concurrent.TimeoutException;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
import javax.annotation.Resource;
|
import javax.annotation.Resource;
|
||||||
|
@ -109,8 +111,13 @@ public class FindMD5InUserMetadata implements ContainsValueInListStrategy {
|
||||||
}, userExecutor);
|
}, userExecutor);
|
||||||
responses.put(md.getName(), future);
|
responses.put(md.getName(), future);
|
||||||
}
|
}
|
||||||
Map<String, Exception> exceptions = awaitCompletion(responses, userExecutor, maxTime, logger,
|
Map<String, Exception> exceptions;
|
||||||
String.format("searching for md5 in container %s", containerName));
|
try {
|
||||||
|
exceptions = awaitCompletion(responses, userExecutor, maxTime, logger,
|
||||||
|
String.format("searching for md5 in container %s", containerName));
|
||||||
|
} catch (TimeoutException te) {
|
||||||
|
throw propagate(te);
|
||||||
|
}
|
||||||
if (exceptions.size() > 0)
|
if (exceptions.size() > 0)
|
||||||
throw new BlobRuntimeException(String.format("searching for md5 in container %s: %s", containerName,
|
throw new BlobRuntimeException(String.format("searching for md5 in container %s: %s", containerName,
|
||||||
exceptions));
|
exceptions));
|
||||||
|
|
|
@ -18,6 +18,7 @@
|
||||||
*/
|
*/
|
||||||
package org.jclouds.blobstore.strategy.internal;
|
package org.jclouds.blobstore.strategy.internal;
|
||||||
|
|
||||||
|
import static com.google.common.base.Throwables.propagate;
|
||||||
import static org.jclouds.blobstore.options.ListContainerOptions.Builder.recursive;
|
import static org.jclouds.blobstore.options.ListContainerOptions.Builder.recursive;
|
||||||
import static org.jclouds.concurrent.FutureIterables.awaitCompletion;
|
import static org.jclouds.concurrent.FutureIterables.awaitCompletion;
|
||||||
|
|
||||||
|
@ -25,6 +26,7 @@ import java.util.Map;
|
||||||
import java.util.concurrent.ExecutionException;
|
import java.util.concurrent.ExecutionException;
|
||||||
import java.util.concurrent.ExecutorService;
|
import java.util.concurrent.ExecutorService;
|
||||||
import java.util.concurrent.Future;
|
import java.util.concurrent.Future;
|
||||||
|
import java.util.concurrent.TimeoutException;
|
||||||
|
|
||||||
import javax.annotation.Resource;
|
import javax.annotation.Resource;
|
||||||
import javax.inject.Named;
|
import javax.inject.Named;
|
||||||
|
@ -42,7 +44,6 @@ import org.jclouds.blobstore.strategy.ClearListStrategy;
|
||||||
import org.jclouds.http.handlers.BackoffLimitedRetryHandler;
|
import org.jclouds.http.handlers.BackoffLimitedRetryHandler;
|
||||||
import org.jclouds.logging.Logger;
|
import org.jclouds.logging.Logger;
|
||||||
|
|
||||||
import com.google.common.base.Throwables;
|
|
||||||
import com.google.common.collect.Maps;
|
import com.google.common.collect.Maps;
|
||||||
import com.google.common.util.concurrent.Futures;
|
import com.google.common.util.concurrent.Futures;
|
||||||
import com.google.inject.Inject;
|
import com.google.inject.Inject;
|
||||||
|
@ -103,7 +104,7 @@ public class DeleteAllKeysInList implements ClearListStrategy, ClearContainerStr
|
||||||
} catch (ExecutionException ee) {
|
} catch (ExecutionException ee) {
|
||||||
++numErrors;
|
++numErrors;
|
||||||
if (numErrors == maxErrors) {
|
if (numErrors == maxErrors) {
|
||||||
throw Throwables.propagate(ee.getCause());
|
throw propagate(ee.getCause());
|
||||||
}
|
}
|
||||||
retryHandler.imposeBackoffExponentialDelay(numErrors, message);
|
retryHandler.imposeBackoffExponentialDelay(numErrors, message);
|
||||||
continue;
|
continue;
|
||||||
|
@ -153,7 +154,16 @@ public class DeleteAllKeysInList implements ClearListStrategy, ClearContainerStr
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
exceptions = awaitCompletion(responses, userExecutor, maxTime, logger, message);
|
try {
|
||||||
|
exceptions = awaitCompletion(responses, userExecutor, maxTime, logger, message);
|
||||||
|
} catch (TimeoutException te) {
|
||||||
|
++numErrors;
|
||||||
|
if (numErrors == maxErrors) {
|
||||||
|
throw propagate(te);
|
||||||
|
}
|
||||||
|
retryHandler.imposeBackoffExponentialDelay(numErrors, message);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
if (!exceptions.isEmpty()) {
|
if (!exceptions.isEmpty()) {
|
||||||
++numErrors;
|
++numErrors;
|
||||||
retryHandler.imposeBackoffExponentialDelay(numErrors, message);
|
retryHandler.imposeBackoffExponentialDelay(numErrors, message);
|
||||||
|
|
|
@ -18,12 +18,14 @@
|
||||||
*/
|
*/
|
||||||
package org.jclouds.blobstore.strategy.internal;
|
package org.jclouds.blobstore.strategy.internal;
|
||||||
|
|
||||||
|
import static com.google.common.base.Throwables.propagate;
|
||||||
import static org.jclouds.concurrent.FutureIterables.awaitCompletion;
|
import static org.jclouds.concurrent.FutureIterables.awaitCompletion;
|
||||||
|
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
import java.util.concurrent.ExecutorService;
|
import java.util.concurrent.ExecutorService;
|
||||||
import java.util.concurrent.Future;
|
import java.util.concurrent.Future;
|
||||||
|
import java.util.concurrent.TimeoutException;
|
||||||
|
|
||||||
import javax.annotation.Resource;
|
import javax.annotation.Resource;
|
||||||
import javax.inject.Named;
|
import javax.inject.Named;
|
||||||
|
@ -98,8 +100,12 @@ public class MarkersDeleteDirectoryStrategy implements DeleteDirectoryStrategy {
|
||||||
}
|
}
|
||||||
String message = String.format("deleting directory %s in containerName: %s", directory,
|
String message = String.format("deleting directory %s in containerName: %s", directory,
|
||||||
containerName);
|
containerName);
|
||||||
Map<String, Exception> exceptions = awaitCompletion(responses, userExecutor, maxTime, logger,
|
Map<String, Exception> exceptions;
|
||||||
message);
|
try {
|
||||||
|
exceptions = awaitCompletion(responses, userExecutor, maxTime, logger, message);
|
||||||
|
} catch (TimeoutException te) {
|
||||||
|
throw propagate(te);
|
||||||
|
}
|
||||||
if (exceptions.size() > 0)
|
if (exceptions.size() > 0)
|
||||||
throw new BlobRuntimeException(String.format("error %s: %s", message, exceptions));
|
throw new BlobRuntimeException(String.format("error %s: %s", message, exceptions));
|
||||||
assert !blobstore.directoryExists(containerName, directory) : String.format(
|
assert !blobstore.directoryExists(containerName, directory) : String.format(
|
||||||
|
|
|
@ -18,11 +18,13 @@
|
||||||
*/
|
*/
|
||||||
package org.jclouds.blobstore.strategy.internal;
|
package org.jclouds.blobstore.strategy.internal;
|
||||||
|
|
||||||
|
import static com.google.common.base.Throwables.propagate;
|
||||||
import static org.jclouds.concurrent.FutureIterables.awaitCompletion;
|
import static org.jclouds.concurrent.FutureIterables.awaitCompletion;
|
||||||
|
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.concurrent.ExecutorService;
|
import java.util.concurrent.ExecutorService;
|
||||||
import java.util.concurrent.Future;
|
import java.util.concurrent.Future;
|
||||||
|
import java.util.concurrent.TimeoutException;
|
||||||
|
|
||||||
import javax.annotation.Resource;
|
import javax.annotation.Resource;
|
||||||
import javax.inject.Named;
|
import javax.inject.Named;
|
||||||
|
@ -71,8 +73,13 @@ public class PutBlobsStrategyImpl implements PutBlobsStrategy {
|
||||||
for (Blob blob : blobs) {
|
for (Blob blob : blobs) {
|
||||||
responses.put(blob, ablobstore.putBlob(containerName, blob));
|
responses.put(blob, ablobstore.putBlob(containerName, blob));
|
||||||
}
|
}
|
||||||
Map<Blob, Exception> exceptions = awaitCompletion(responses, userExecutor, maxTime, logger,
|
Map<Blob, Exception> exceptions;
|
||||||
String.format("putting into containerName: %s", containerName));
|
try {
|
||||||
|
exceptions = awaitCompletion(responses, userExecutor, maxTime, logger,
|
||||||
|
String.format("putting into containerName: %s", containerName));
|
||||||
|
} catch (TimeoutException te) {
|
||||||
|
throw propagate(te);
|
||||||
|
}
|
||||||
if (exceptions.size() > 0)
|
if (exceptions.size() > 0)
|
||||||
throw new BlobRuntimeException(String.format("error putting into container %s: %s",
|
throw new BlobRuntimeException(String.format("error putting into container %s: %s",
|
||||||
containerName, exceptions));
|
containerName, exceptions));
|
||||||
|
|
|
@ -42,6 +42,7 @@ import java.util.Map;
|
||||||
import java.util.concurrent.Callable;
|
import java.util.concurrent.Callable;
|
||||||
import java.util.concurrent.ExecutionException;
|
import java.util.concurrent.ExecutionException;
|
||||||
import java.util.concurrent.Future;
|
import java.util.concurrent.Future;
|
||||||
|
import java.util.concurrent.TimeoutException;
|
||||||
import java.util.concurrent.atomic.AtomicInteger;
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
import java.util.zip.GZIPInputStream;
|
import java.util.zip.GZIPInputStream;
|
||||||
|
|
||||||
|
@ -119,7 +120,7 @@ public class BaseBlobIntegrationTest extends BaseBlobStoreIntegrationTest {
|
||||||
* http://groups.google.com/group/jclouds/browse_thread/thread/4a7c8d58530b287f
|
* http://groups.google.com/group/jclouds/browse_thread/thread/4a7c8d58530b287f
|
||||||
*/
|
*/
|
||||||
@Test(groups = { "integration", "live" })
|
@Test(groups = { "integration", "live" })
|
||||||
public void testPutFileParallel() throws InterruptedException, IOException {
|
public void testPutFileParallel() throws InterruptedException, IOException, TimeoutException {
|
||||||
|
|
||||||
File payloadFile = File.createTempFile("testPutFileParallel", "png");
|
File payloadFile = File.createTempFile("testPutFileParallel", "png");
|
||||||
Files.copy(InputSuppliers.of(getClass().getResource("/testimg.png").openStream()), payloadFile);
|
Files.copy(InputSuppliers.of(getClass().getResource("/testimg.png").openStream()), payloadFile);
|
||||||
|
@ -166,7 +167,7 @@ public class BaseBlobIntegrationTest extends BaseBlobStoreIntegrationTest {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test(groups = { "integration", "live" })
|
@Test(groups = { "integration", "live" })
|
||||||
public void testBigFileGets() throws InterruptedException, IOException {
|
public void testBigFileGets() throws InterruptedException, IOException, TimeoutException {
|
||||||
final String expectedContentDisposition = "attachment; filename=constit.txt";
|
final String expectedContentDisposition = "attachment; filename=constit.txt";
|
||||||
final String container = getContainerName();
|
final String container = getContainerName();
|
||||||
try {
|
try {
|
||||||
|
|
|
@ -21,6 +21,7 @@ import static com.google.common.base.Preconditions.checkNotNull;
|
||||||
import static com.google.common.base.Predicates.and;
|
import static com.google.common.base.Predicates.and;
|
||||||
import static com.google.common.base.Predicates.not;
|
import static com.google.common.base.Predicates.not;
|
||||||
import static com.google.common.base.Predicates.notNull;
|
import static com.google.common.base.Predicates.notNull;
|
||||||
|
import static com.google.common.base.Throwables.propagate;
|
||||||
import static com.google.common.collect.Iterables.filter;
|
import static com.google.common.collect.Iterables.filter;
|
||||||
import static com.google.common.collect.Maps.newLinkedHashMap;
|
import static com.google.common.collect.Maps.newLinkedHashMap;
|
||||||
import static com.google.common.collect.Sets.filter;
|
import static com.google.common.collect.Sets.filter;
|
||||||
|
@ -41,6 +42,7 @@ import java.util.Set;
|
||||||
import java.util.concurrent.Callable;
|
import java.util.concurrent.Callable;
|
||||||
import java.util.concurrent.ExecutorService;
|
import java.util.concurrent.ExecutorService;
|
||||||
import java.util.concurrent.Future;
|
import java.util.concurrent.Future;
|
||||||
|
import java.util.concurrent.TimeoutException;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
import java.util.concurrent.atomic.AtomicReference;
|
import java.util.concurrent.atomic.AtomicReference;
|
||||||
|
|
||||||
|
@ -212,8 +214,12 @@ public class BaseComputeService implements ComputeService {
|
||||||
|
|
||||||
Map<?, Future<Void>> responses = runNodesAndAddToSetStrategy.execute(group, count, template, goodNodes, badNodes,
|
Map<?, Future<Void>> responses = runNodesAndAddToSetStrategy.execute(group, count, template, goodNodes, badNodes,
|
||||||
customizationResponses);
|
customizationResponses);
|
||||||
Map<?, Exception> executionExceptions = awaitCompletion(responses, executor, null, logger, "createNodesInGroup("
|
Map<?, Exception> executionExceptions;
|
||||||
+ group + ")");
|
try {
|
||||||
|
executionExceptions = awaitCompletion(responses, executor, null, logger, "createNodesInGroup(" + group + ")");
|
||||||
|
} catch (TimeoutException te) {
|
||||||
|
throw propagate(te);
|
||||||
|
}
|
||||||
Function<NodeMetadata, NodeMetadata> fn = persistNodeCredentials.always(template.getOptions().getRunScript());
|
Function<NodeMetadata, NodeMetadata> fn = persistNodeCredentials.always(template.getOptions().getRunScript());
|
||||||
badNodes = Maps2.transformKeys(badNodes, fn);
|
badNodes = Maps2.transformKeys(badNodes, fn);
|
||||||
goodNodes = ImmutableSet.copyOf(Iterables.transform(goodNodes, fn));
|
goodNodes = ImmutableSet.copyOf(Iterables.transform(goodNodes, fn));
|
||||||
|
@ -548,7 +554,11 @@ public class BaseComputeService implements ComputeService {
|
||||||
responses.put(runner.getNode(), executor.submit(new RunScriptOnNodeAndAddToGoodMapOrPutExceptionIntoBadMap(
|
responses.put(runner.getNode(), executor.submit(new RunScriptOnNodeAndAddToGoodMapOrPutExceptionIntoBadMap(
|
||||||
runner, goodNodes, badNodes)));
|
runner, goodNodes, badNodes)));
|
||||||
}
|
}
|
||||||
exceptions = awaitCompletion(responses, executor, null, logger, "runScriptOnNodesMatching(" + filter + ")");
|
try {
|
||||||
|
exceptions = awaitCompletion(responses, executor, null, logger, "runScriptOnNodesMatching(" + filter + ")");
|
||||||
|
} catch (TimeoutException te) {
|
||||||
|
throw propagate(te);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
Function<NodeMetadata, NodeMetadata> fn = persistNodeCredentials.ifAdminAccess(runScript);
|
Function<NodeMetadata, NodeMetadata> fn = persistNodeCredentials.ifAdminAccess(runScript);
|
||||||
|
|
|
@ -98,7 +98,11 @@ public class FutureIterables {
|
||||||
Future<? extends T> to = function.apply(from);
|
Future<? extends T> to = function.apply(from);
|
||||||
responses.put(from, to);
|
responses.put(from, to);
|
||||||
}
|
}
|
||||||
exceptions = awaitCompletion(responses, exec, maxTime, logger, logPrefix);
|
try {
|
||||||
|
exceptions = awaitCompletion(responses, exec, maxTime, logger, logPrefix);
|
||||||
|
} catch (TimeoutException te) {
|
||||||
|
throw propagate(te);
|
||||||
|
}
|
||||||
if (exceptions.size() > 0 && !any(exceptions.values(), containsThrowable(AuthorizationException.class))) {
|
if (exceptions.size() > 0 && !any(exceptions.values(), containsThrowable(AuthorizationException.class))) {
|
||||||
fromIterable = exceptions.keySet();
|
fromIterable = exceptions.keySet();
|
||||||
retryHandler.imposeBackoffExponentialDelay(delayStart, 2, i + 1, maxRetries,
|
retryHandler.imposeBackoffExponentialDelay(delayStart, 2, i + 1, maxRetries,
|
||||||
|
@ -116,7 +120,7 @@ public class FutureIterables {
|
||||||
}
|
}
|
||||||
|
|
||||||
public static <T> Map<T, Exception> awaitCompletion(Map<T, ? extends Future<?>> responses, ExecutorService exec,
|
public static <T> Map<T, Exception> awaitCompletion(Map<T, ? extends Future<?>> responses, ExecutorService exec,
|
||||||
@Nullable Long maxTime, final Logger logger, final String logPrefix) {
|
@Nullable Long maxTime, final Logger logger, final String logPrefix) throws TimeoutException {
|
||||||
final ConcurrentMap<T, Exception> errorMap = newConcurrentMap();
|
final ConcurrentMap<T, Exception> errorMap = newConcurrentMap();
|
||||||
if (responses.size() == 0)
|
if (responses.size() == 0)
|
||||||
return errorMap;
|
return errorMap;
|
||||||
|
@ -150,7 +154,12 @@ public class FutureIterables {
|
||||||
}
|
}
|
||||||
try {
|
try {
|
||||||
if (maxTime != null) {
|
if (maxTime != null) {
|
||||||
doneSignal.await(maxTime, TimeUnit.MILLISECONDS);
|
if (!doneSignal.await(maxTime, TimeUnit.MILLISECONDS)) {
|
||||||
|
String message = message(logPrefix, total, complete.get(), errors.get(), start);
|
||||||
|
TimeoutException te = new TimeoutException(message);
|
||||||
|
logger.error(te, message);
|
||||||
|
throw te;
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
doneSignal.await();
|
doneSignal.await();
|
||||||
}
|
}
|
||||||
|
@ -163,11 +172,10 @@ public class FutureIterables {
|
||||||
String message = message(logPrefix, total, complete.get(), errors.get(), start);
|
String message = message(logPrefix, total, complete.get(), errors.get(), start);
|
||||||
logger.trace(message);
|
logger.trace(message);
|
||||||
}
|
}
|
||||||
} catch (InterruptedException e) {
|
} catch (InterruptedException ie) {
|
||||||
String message = message(logPrefix, total, complete.get(), errors.get(), start);
|
String message = message(logPrefix, total, complete.get(), errors.get(), start);
|
||||||
TimeoutException exception = new TimeoutException(message);
|
logger.error(ie, message);
|
||||||
logger.error(exception, message);
|
throw propagate(ie);
|
||||||
propagate(exception);
|
|
||||||
}
|
}
|
||||||
return errorMap;
|
return errorMap;
|
||||||
}
|
}
|
||||||
|
|
|
@ -30,6 +30,7 @@ import java.util.concurrent.Callable;
|
||||||
import java.util.concurrent.ExecutionException;
|
import java.util.concurrent.ExecutionException;
|
||||||
import java.util.concurrent.ExecutorService;
|
import java.util.concurrent.ExecutorService;
|
||||||
import java.util.concurrent.Future;
|
import java.util.concurrent.Future;
|
||||||
|
import java.util.concurrent.TimeoutException;
|
||||||
|
|
||||||
import org.jclouds.logging.Logger;
|
import org.jclouds.logging.Logger;
|
||||||
import org.testng.annotations.Test;
|
import org.testng.annotations.Test;
|
||||||
|
@ -61,8 +62,8 @@ public class FutureIterablesPerformanceTest {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test(enabled = false)
|
@Test(enabled = false)
|
||||||
public void testAwaitCompletionUsingSameThreadExecutorDoesntSerializeFutures() throws InterruptedException,
|
public void testAwaitCompletionUsingSameThreadExecutorDoesntSerializeFutures()
|
||||||
ExecutionException {
|
throws InterruptedException, ExecutionException, TimeoutException {
|
||||||
long expectedMax = IO_DURATION;
|
long expectedMax = IO_DURATION;
|
||||||
long expectedMin = IO_DURATION;
|
long expectedMin = IO_DURATION;
|
||||||
long expectedOverhead = COUNT + FUDGE;
|
long expectedOverhead = COUNT + FUDGE;
|
||||||
|
|
|
@ -20,8 +20,13 @@ package org.jclouds.concurrent;
|
||||||
|
|
||||||
import static org.jclouds.concurrent.FutureIterables.transformParallel;
|
import static org.jclouds.concurrent.FutureIterables.transformParallel;
|
||||||
import static org.testng.Assert.assertEquals;
|
import static org.testng.Assert.assertEquals;
|
||||||
|
import static org.testng.Assert.fail;
|
||||||
|
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.concurrent.Executors;
|
||||||
|
import java.util.concurrent.ExecutorService;
|
||||||
import java.util.concurrent.Future;
|
import java.util.concurrent.Future;
|
||||||
|
import java.util.concurrent.TimeoutException;
|
||||||
import java.util.concurrent.atomic.AtomicInteger;
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
|
|
||||||
import org.jclouds.logging.Logger;
|
import org.jclouds.logging.Logger;
|
||||||
|
@ -30,6 +35,7 @@ import org.testng.annotations.Test;
|
||||||
|
|
||||||
import com.google.common.base.Function;
|
import com.google.common.base.Function;
|
||||||
import com.google.common.collect.ImmutableSet;
|
import com.google.common.collect.ImmutableSet;
|
||||||
|
import com.google.common.collect.Maps;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Tests behavior of FutureIterables
|
* Tests behavior of FutureIterables
|
||||||
|
@ -80,4 +86,32 @@ public class FutureIterablesTest {
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void testAwaitCompletionTimeout() throws Exception {
|
||||||
|
final long timeoutMs = 1000;
|
||||||
|
ExecutorService executorService = Executors.newSingleThreadExecutor();
|
||||||
|
Map<Void, Future<?>> responses = Maps.newHashMap();
|
||||||
|
try {
|
||||||
|
responses.put(null, executorService.submit(new Runnable() {
|
||||||
|
@Override
|
||||||
|
public void run() {
|
||||||
|
try {
|
||||||
|
Thread.sleep(2 * timeoutMs);
|
||||||
|
} catch (InterruptedException ie) {
|
||||||
|
// triggered during shutdown
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}));
|
||||||
|
Map<Void, Exception> errors = FutureIterables.awaitCompletion(
|
||||||
|
responses, executorService, timeoutMs, Logger.CONSOLE,
|
||||||
|
/*prefix=*/ "");
|
||||||
|
if (!errors.isEmpty()) {
|
||||||
|
throw errors.values().iterator().next();
|
||||||
|
}
|
||||||
|
fail("Did not throw TimeoutException");
|
||||||
|
} catch (TimeoutException te) {
|
||||||
|
// expected
|
||||||
|
} finally {
|
||||||
|
executorService.shutdownNow();
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -76,7 +76,7 @@ public class AsyncGaeHttpCommandExecutorServiceIntegrationTest extends BaseHttpC
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test(enabled = false)
|
@Test(enabled = false)
|
||||||
public void testPerformanceVsNothing() {
|
public void testPerformanceVsNothing() throws TimeoutException {
|
||||||
setupApiProxy();
|
setupApiProxy();
|
||||||
int count = 5;
|
int count = 5;
|
||||||
final URI fetch = URI.create("http://www.google.com");
|
final URI fetch = URI.create("http://www.google.com");
|
||||||
|
@ -159,7 +159,8 @@ public class AsyncGaeHttpCommandExecutorServiceIntegrationTest extends BaseHttpC
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private Results getTest(int count, String who, Supplier<ListenableFuture<?>> getSupplier, Consumer consumer) {
|
private Results getTest(int count, String who, Supplier<ListenableFuture<?>> getSupplier, Consumer consumer)
|
||||||
|
throws TimeoutException {
|
||||||
Results results = new Results();
|
Results results = new Results();
|
||||||
results.count = count;
|
results.count = count;
|
||||||
results.who = who;
|
results.who = who;
|
||||||
|
|
Loading…
Reference in New Issue