mirror of https://github.com/apache/jclouds.git
removed custom version of guava Futures, awkward DescribedFuture, and migrated to native ListenableFuture
This commit is contained in:
parent
103d327354
commit
713a498c22
|
@ -27,16 +27,14 @@ import static org.jclouds.util.Throwables2.containsThrowable;
|
|||
import static org.jclouds.util.Throwables2.propagateAuthorizationOrOriginalException;
|
||||
|
||||
import java.util.Map;
|
||||
import java.util.Map.Entry;
|
||||
import java.util.concurrent.ConcurrentMap;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Future;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
import javax.annotation.Resource;
|
||||
import javax.inject.Named;
|
||||
|
||||
import org.jclouds.Constants;
|
||||
|
@ -47,6 +45,8 @@ import org.jclouds.rest.AuthorizationException;
|
|||
|
||||
import com.google.common.annotations.Beta;
|
||||
import com.google.common.base.Function;
|
||||
import com.google.common.util.concurrent.ListenableFuture;
|
||||
import com.google.common.util.concurrent.ListeningExecutorService;
|
||||
import com.google.inject.Inject;
|
||||
|
||||
/**
|
||||
|
@ -69,21 +69,21 @@ public class FutureIterables {
|
|||
private static BackoffLimitedRetryHandler retryHandler = BackoffLimitedRetryHandler.INSTANCE;
|
||||
|
||||
public static <F, T> Iterable<T> transformParallel(final Iterable<F> fromIterable,
|
||||
final Function<? super F, Future<? extends T>> function, ExecutorService exec, @Nullable Long maxTime, Logger logger,
|
||||
final Function<? super F, ListenableFuture<? extends T>> function, ListeningExecutorService exec, @Nullable Long maxTime, Logger logger,
|
||||
String logPrefix) {
|
||||
return transformParallel(fromIterable, function, exec, maxTime, logger, logPrefix, retryHandler, maxRetries);
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
public static <F, T> Iterable<T> transformParallel(Iterable<F> fromIterable,
|
||||
Function<? super F, Future<? extends T>> function, ExecutorService exec, @Nullable Long maxTime, Logger logger,
|
||||
Function<? super F, ListenableFuture<? extends T>> function, ListeningExecutorService exec, @Nullable Long maxTime, Logger logger,
|
||||
String logPrefix, BackoffLimitedRetryHandler retryHandler, int maxRetries) {
|
||||
Map<F, Exception> exceptions = newHashMap();
|
||||
Map<F, Future<? extends T>> responses = newHashMap();
|
||||
Map<F, ListenableFuture<? extends T>> responses = newHashMap();
|
||||
for (int i = 0; i < maxRetries; i++) {
|
||||
|
||||
for (F from : fromIterable) {
|
||||
Future<? extends T> to = function.apply(from);
|
||||
ListenableFuture<? extends T> to = function.apply(from);
|
||||
responses.put(from, to);
|
||||
}
|
||||
try {
|
||||
|
@ -107,9 +107,10 @@ public class FutureIterables {
|
|||
return unwrap(responses.values());
|
||||
}
|
||||
|
||||
public static <T> Map<T, Exception> awaitCompletion(Map<T, ? extends Future<?>> responses, ExecutorService exec,
|
||||
@Nullable Long maxTime, final Logger logger, final String logPrefix) throws TimeoutException {
|
||||
final ConcurrentMap<T, Exception> errorMap = newConcurrentMap();
|
||||
public static <F> Map<F, Exception> awaitCompletion(Map<F, ? extends ListenableFuture<?>> responses,
|
||||
ListeningExecutorService exec, @Nullable Long maxTime, final Logger logger, final String logPrefix)
|
||||
throws TimeoutException {
|
||||
final ConcurrentMap<F, Exception> errorMap = newConcurrentMap();
|
||||
if (responses.size() == 0)
|
||||
return errorMap;
|
||||
final int total = responses.size();
|
||||
|
@ -117,8 +118,8 @@ public class FutureIterables {
|
|||
final AtomicInteger complete = new AtomicInteger(0);
|
||||
final AtomicInteger errors = new AtomicInteger(0);
|
||||
final long start = System.currentTimeMillis();
|
||||
for (final java.util.Map.Entry<T, ? extends Future<?>> future : responses.entrySet()) {
|
||||
Futures.makeListenable(future.getValue(), exec).addListener(new Runnable() {
|
||||
for (final Entry<F, ? extends ListenableFuture<?>> future : responses.entrySet()) {
|
||||
future.getValue().addListener(new Runnable() {
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
|
@ -168,10 +169,10 @@ public class FutureIterables {
|
|||
return errorMap;
|
||||
}
|
||||
|
||||
private static <T> Iterable<T> unwrap(Iterable<Future<? extends T>> values) {
|
||||
return transform(values, new Function<Future<? extends T>, T>() {
|
||||
private static <T> Iterable<T> unwrap(Iterable<ListenableFuture<? extends T>> values) {
|
||||
return transform(values, new Function<ListenableFuture<? extends T>, T>() {
|
||||
@Override
|
||||
public T apply(Future<? extends T> from) {
|
||||
public T apply(ListenableFuture<? extends T> from) {
|
||||
try {
|
||||
return from.get();
|
||||
} catch (InterruptedException e) {
|
||||
|
|
|
@ -1,282 +0,0 @@
|
|||
/**
|
||||
* Licensed to jclouds, Inc. (jclouds) under one or more
|
||||
* contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. jclouds licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
package org.jclouds.concurrent;
|
||||
|
||||
import static com.google.common.base.Preconditions.checkNotNull;
|
||||
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.Executor;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Future;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
|
||||
import com.google.common.annotations.Beta;
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import com.google.common.base.Function;
|
||||
import com.google.common.collect.ForwardingObject;
|
||||
import com.google.common.util.concurrent.ExecutionList;
|
||||
import com.google.common.util.concurrent.ForwardingFuture;
|
||||
import com.google.common.util.concurrent.ListenableFuture;
|
||||
|
||||
/**
|
||||
* functions related to or replacing those in {@link com.google.common.util.concurrent.Futures}
|
||||
*
|
||||
* @author Adrian Cole
|
||||
*/
|
||||
@Beta
|
||||
public class Futures {
|
||||
@VisibleForTesting
|
||||
static class CallGetAndRunExecutionList<T> implements Runnable {
|
||||
private final Future<T> delegate;
|
||||
private final ExecutionList executionList;
|
||||
|
||||
CallGetAndRunExecutionList(Future<T> delegate, ExecutionList executionList) {
|
||||
this.delegate = checkNotNull(delegate, "delegate");
|
||||
this.executionList = checkNotNull(executionList, "executionList");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
try {
|
||||
delegate.get();
|
||||
} catch (InterruptedException e) {
|
||||
// This thread was interrupted. This should never happen, so we
|
||||
// throw an IllegalStateException.
|
||||
Thread.currentThread().interrupt();
|
||||
// TODO we cannot inspect the executionList at the moment to make a reasonable
|
||||
// toString()
|
||||
throw new IllegalStateException(String.format(
|
||||
"interrupted calling get() on [%s], so could not run listeners", delegate), e);
|
||||
} catch (Throwable e) {
|
||||
// ExecutionException / CancellationException / RuntimeException
|
||||
// The task is done, run the listeners.
|
||||
}
|
||||
executionList.execute();
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "[delegate=" + delegate + ", executionList=" + executionList + "]";
|
||||
}
|
||||
}
|
||||
|
||||
// Adapted from Guava
|
||||
//
|
||||
// * to allow us to enforce supply of an adapterExecutor
|
||||
// note that this is done so that we can operate in Google AppEngine which
|
||||
// restricts thread creation
|
||||
// * to allow us to print debug info about what the delegate was doing
|
||||
private static class FutureListener<T> {
|
||||
|
||||
private final ExecutorService adapterExecutor;
|
||||
|
||||
// The execution list to hold our listeners.
|
||||
private final ExecutionList executionList = new ExecutionList();
|
||||
|
||||
// This allows us to only start up a thread waiting on the delegate future
|
||||
// when the first listener is added.
|
||||
private final AtomicBoolean hasListeners = new AtomicBoolean(false);
|
||||
|
||||
// The delegate future.
|
||||
private final Future<T> delegate;
|
||||
|
||||
private static <T> FutureListener<T> create(Future<T> delegate, ExecutorService adapterExecutor) {
|
||||
return new FutureListener<T>(delegate, adapterExecutor);
|
||||
}
|
||||
|
||||
private FutureListener(Future<T> delegate, ExecutorService adapterExecutor) {
|
||||
this.delegate = checkNotNull(delegate, "delegate");
|
||||
this.adapterExecutor = checkNotNull(adapterExecutor, "adapterExecutor");
|
||||
}
|
||||
|
||||
private void addListener(Runnable listener, Executor exec) {
|
||||
executionList.add(listener, exec);
|
||||
|
||||
// When a listener is first added, we run a task that will wait for
|
||||
// the delegate to finish, and when it is done will run the listeners.
|
||||
if (hasListeners.compareAndSet(false, true)) {
|
||||
if (delegate.isDone()) {
|
||||
// If the delegate is already done, run the execution list
|
||||
// immediately on the current thread.
|
||||
executionList.execute();
|
||||
return;
|
||||
}
|
||||
adapterExecutor.execute(new CallGetAndRunExecutionList<T>(delegate, executionList));
|
||||
}
|
||||
}
|
||||
|
||||
private Future<T> getFuture() {
|
||||
return delegate;
|
||||
}
|
||||
}
|
||||
|
||||
private static class ListenableFutureAdapter<T> extends ForwardingFuture<T> implements ListenableFuture<T> {
|
||||
private final FutureListener<T> futureListener;
|
||||
|
||||
private static <T> ListenableFutureAdapter<T> create(Future<T> future, ExecutorService executor) {
|
||||
return new ListenableFutureAdapter<T>(future, executor);
|
||||
}
|
||||
|
||||
private ListenableFutureAdapter(Future<T> future, ExecutorService executor) {
|
||||
this.futureListener = FutureListener.create(future, executor);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Future<T> delegate() {
|
||||
return futureListener.getFuture();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void addListener(Runnable listener, Executor exec) {
|
||||
futureListener.addListener(listener, exec);
|
||||
}
|
||||
}
|
||||
|
||||
private static class LazyListenableFutureFunctionAdapter<I, O> extends ForwardingObject implements
|
||||
ListenableFuture<O> {
|
||||
private final FutureListener<I> futureListener;
|
||||
private final Function<? super I, ? extends O> function;
|
||||
|
||||
private static <I, O> LazyListenableFutureFunctionAdapter<I, O> create(Future<I> future,
|
||||
Function<? super I, ? extends O> function, ExecutorService executor) {
|
||||
return new LazyListenableFutureFunctionAdapter<I, O>(future, function, executor);
|
||||
}
|
||||
|
||||
private static <I, O> LazyListenableFutureFunctionAdapter<I, O> create(FutureListener<I> futureListener,
|
||||
Function<? super I, ? extends O> function) {
|
||||
return new LazyListenableFutureFunctionAdapter<I, O>(futureListener, function);
|
||||
}
|
||||
|
||||
private LazyListenableFutureFunctionAdapter(Future<I> future, Function<? super I, ? extends O> function,
|
||||
ExecutorService executor) {
|
||||
this(FutureListener.create(future, executor), function);
|
||||
}
|
||||
|
||||
private LazyListenableFutureFunctionAdapter(FutureListener<I> futureListener,
|
||||
Function<? super I, ? extends O> function) {
|
||||
this.futureListener = checkNotNull(futureListener, "futureListener");
|
||||
this.function = checkNotNull(function, "function");
|
||||
}
|
||||
|
||||
/*
|
||||
* Concurrency detail:
|
||||
*
|
||||
* <p>To preserve the idempotency of calls to this.get(*) calls to the function are only
|
||||
* applied once. A lock is required to prevent multiple applications of the function. The
|
||||
* calls to future.get(*) are performed outside the lock, as is required to prevent calls to
|
||||
* get(long, TimeUnit) to persist beyond their timeout.
|
||||
*
|
||||
* <p>Calls to future.get(*) on every call to this.get(*) also provide the cancellation
|
||||
* behavior for this.
|
||||
*
|
||||
* <p>(Consider: in thread A, call get(), in thread B call get(long, TimeUnit). Thread B may
|
||||
* have to wait for Thread A to finish, which would be unacceptable.)
|
||||
*
|
||||
* <p>Note that each call to Future<O>.get(*) results in a call to Future<I>.get(*), but the
|
||||
* function is only applied once, so Future<I>.get(*) is assumed to be idempotent.
|
||||
*/
|
||||
|
||||
private final Object lock = new Object();
|
||||
private boolean set = false;
|
||||
private O value = null;
|
||||
|
||||
@Override
|
||||
public O get() throws InterruptedException, ExecutionException {
|
||||
return apply(futureListener.getFuture().get());
|
||||
}
|
||||
|
||||
@Override
|
||||
public O get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
|
||||
return apply(futureListener.getFuture().get(timeout, unit));
|
||||
}
|
||||
|
||||
private O apply(I raw) {
|
||||
synchronized (lock) {
|
||||
if (!set) {
|
||||
value = function.apply(raw);
|
||||
set = true;
|
||||
}
|
||||
return value;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean cancel(boolean mayInterruptIfRunning) {
|
||||
return futureListener.getFuture().cancel(mayInterruptIfRunning);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isCancelled() {
|
||||
return futureListener.getFuture().isCancelled();
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isDone() {
|
||||
return futureListener.getFuture().isDone();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void addListener(Runnable listener, Executor exec) {
|
||||
futureListener.addListener(listener, exec);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Object delegate() {
|
||||
return futureListener.getFuture();
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* Just like {@code Futures#compose} except that we check the type of the executorService before
|
||||
* creating the Future. If we are single threaded, invoke the function lazy as opposed to
|
||||
* chaining, so that we don't invoke get() early.
|
||||
*/
|
||||
public static <I, O> ListenableFuture<O> compose(Future<I> future, final Function<? super I, ? extends O> function,
|
||||
ExecutorService executorService) {
|
||||
if (future instanceof Futures.ListenableFutureAdapter<?>) {
|
||||
Futures.ListenableFutureAdapter<I> lf = (ListenableFutureAdapter<I>) future;
|
||||
if (lf.futureListener.adapterExecutor.getClass().isAnnotationPresent(SingleThreaded.class))
|
||||
return Futures.LazyListenableFutureFunctionAdapter.create(
|
||||
((org.jclouds.concurrent.Futures.ListenableFutureAdapter<I>) future).futureListener, function);
|
||||
else
|
||||
return com.google.common.util.concurrent.Futures.transform(lf, function, executorService);
|
||||
} else if (executorService.getClass().isAnnotationPresent(SingleThreaded.class)) {
|
||||
return Futures.LazyListenableFutureFunctionAdapter.create(future, function, executorService);
|
||||
} else {
|
||||
return com.google.common.util.concurrent.Futures.transform(Futures.makeListenable(future, executorService),
|
||||
function, executorService);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Just like {@code Futures#makeListenable} except that we pass in an executorService.
|
||||
* <p/>
|
||||
* Temporary hack until http://code.google.com/p/guava-libraries/issues/detail?id=317 is fixed.
|
||||
*/
|
||||
public static <T> ListenableFuture<T> makeListenable(Future<T> future, ExecutorService executorService) {
|
||||
if (future instanceof ListenableFuture<?>) {
|
||||
return (ListenableFuture<T>) future;
|
||||
}
|
||||
return ListenableFutureAdapter.create(future, executorService);
|
||||
}
|
||||
|
||||
}
|
|
@ -1,130 +0,0 @@
|
|||
/**
|
||||
* Licensed to jclouds, Inc. (jclouds) under one or more
|
||||
* contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. jclouds licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
package org.jclouds.concurrent.config;
|
||||
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.Future;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
|
||||
class DescribedFuture<T> implements Future<T> {
|
||||
protected final Future<T> delegate;
|
||||
private final String description;
|
||||
private StackTraceElement[] submissionTrace;
|
||||
|
||||
DescribedFuture(Future<T> delegate, String description, StackTraceElement[] submissionTrace) {
|
||||
this.delegate = delegate;
|
||||
this.description = description;
|
||||
this.submissionTrace = submissionTrace;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean cancel(boolean arg0) {
|
||||
return delegate.cancel(arg0);
|
||||
}
|
||||
|
||||
@Override
|
||||
public T get() throws InterruptedException, ExecutionException {
|
||||
try {
|
||||
return delegate.get();
|
||||
} catch (ExecutionException e) {
|
||||
throw ensureCauseHasSubmissionTrace(e);
|
||||
} catch (InterruptedException e) {
|
||||
throw ensureCauseHasSubmissionTrace(e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public T get(long arg0, TimeUnit arg1) throws InterruptedException, ExecutionException, TimeoutException {
|
||||
try {
|
||||
return delegate.get(arg0, arg1);
|
||||
} catch (ExecutionException e) {
|
||||
throw ensureCauseHasSubmissionTrace(e);
|
||||
} catch (InterruptedException e) {
|
||||
throw ensureCauseHasSubmissionTrace(e);
|
||||
} catch (TimeoutException e) {
|
||||
throw ensureCauseHasSubmissionTrace(e);
|
||||
}
|
||||
}
|
||||
|
||||
/** This method does the work to ensure _if_ a submission stack trace was provided,
|
||||
* it is included in the exception. most errors are thrown from the frame of the
|
||||
* Future.get call, with a cause that took place in the executor's thread.
|
||||
* We extend the stack trace of that cause with the submission stack trace.
|
||||
* (An alternative would be to put the stack trace as a root cause,
|
||||
* at the bottom of the stack, or appended to all traces, or inserted
|
||||
* after the second cause, etc ... but since we can't change the "Caused by:"
|
||||
* method in Throwable the compromise made here seems best.)
|
||||
*/
|
||||
private <ET extends Exception> ET ensureCauseHasSubmissionTrace(ET e) {
|
||||
if (submissionTrace==null) return e;
|
||||
if (e.getCause()==null) {
|
||||
ExecutionException ee = new ExecutionException("task submitted from the following trace", null);
|
||||
e.initCause(ee);
|
||||
return e;
|
||||
}
|
||||
Throwable cause = e.getCause();
|
||||
StackTraceElement[] causeTrace = cause.getStackTrace();
|
||||
boolean causeIncludesSubmissionTrace = submissionTrace.length >= causeTrace.length;
|
||||
for (int i=0; causeIncludesSubmissionTrace && i<submissionTrace.length; i++) {
|
||||
if (!causeTrace[causeTrace.length-1-i].equals(submissionTrace[submissionTrace.length-1-i])) {
|
||||
causeIncludesSubmissionTrace = false;
|
||||
}
|
||||
}
|
||||
|
||||
if (!causeIncludesSubmissionTrace) {
|
||||
cause.setStackTrace(merge(causeTrace, submissionTrace));
|
||||
}
|
||||
|
||||
return e;
|
||||
}
|
||||
|
||||
private StackTraceElement[] merge(StackTraceElement[] t1, StackTraceElement[] t2) {
|
||||
StackTraceElement[] t12 = new StackTraceElement[t1.length + t2.length];
|
||||
System.arraycopy(t1, 0, t12, 0, t1.length);
|
||||
System.arraycopy(t2, 0, t12, t1.length, t2.length);
|
||||
return t12;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isCancelled() {
|
||||
return delegate.isCancelled();
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isDone() {
|
||||
return delegate.isDone();
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object obj) {
|
||||
return delegate.equals(obj);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return delegate.hashCode();
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return description;
|
||||
}
|
||||
|
||||
}
|
|
@ -1,124 +0,0 @@
|
|||
/**
|
||||
* Licensed to jclouds, Inc. (jclouds) under one or more
|
||||
* contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. jclouds licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
package org.jclouds.concurrent.config;
|
||||
|
||||
import static com.google.common.base.Preconditions.checkNotNull;
|
||||
import static org.jclouds.concurrent.config.ExecutorServiceModule.getStackTraceHere;
|
||||
|
||||
import java.util.Collection;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.Callable;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Future;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
|
||||
public class DescribingExecutorService implements ExecutorService {
|
||||
|
||||
protected final ExecutorService delegate;
|
||||
|
||||
public DescribingExecutorService(ExecutorService delegate) {
|
||||
this.delegate = checkNotNull(delegate, "delegate");
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
|
||||
return delegate.awaitTermination(timeout, unit);
|
||||
}
|
||||
|
||||
@Override
|
||||
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException {
|
||||
return delegate.invokeAll(tasks);
|
||||
}
|
||||
|
||||
@Override
|
||||
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
|
||||
throws InterruptedException {
|
||||
return delegate.invokeAll(tasks, timeout, unit);
|
||||
}
|
||||
|
||||
@Override
|
||||
public <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException {
|
||||
return delegate.invokeAny(tasks);
|
||||
}
|
||||
|
||||
@Override
|
||||
public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
|
||||
throws InterruptedException, ExecutionException, TimeoutException {
|
||||
return delegate.invokeAny(tasks, timeout, unit);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isShutdown() {
|
||||
return delegate.isShutdown();
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isTerminated() {
|
||||
return delegate.isTerminated();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void shutdown() {
|
||||
delegate.shutdown();
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<Runnable> shutdownNow() {
|
||||
return delegate.shutdownNow();
|
||||
}
|
||||
|
||||
@Override
|
||||
public <T> Future<T> submit(Callable<T> task) {
|
||||
return new DescribedFuture<T>(delegate.submit(task), task.toString(), getStackTraceHere());
|
||||
}
|
||||
|
||||
@SuppressWarnings({ "unchecked", "rawtypes" })
|
||||
@Override
|
||||
public Future<?> submit(Runnable task) {
|
||||
return new DescribedFuture(delegate.submit(task), task.toString(), getStackTraceHere());
|
||||
}
|
||||
|
||||
@Override
|
||||
public <T> Future<T> submit(Runnable task, T result) { // NO_UCD
|
||||
return new DescribedFuture<T>(delegate.submit(task, result), task.toString(), getStackTraceHere());
|
||||
}
|
||||
|
||||
@Override
|
||||
public void execute(Runnable arg0) {
|
||||
delegate.execute(arg0);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object obj) {
|
||||
return delegate.equals(obj);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return delegate.hashCode();
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return delegate.toString();
|
||||
}
|
||||
|
||||
}
|
|
@ -18,6 +18,7 @@
|
|||
*/
|
||||
package org.jclouds.concurrent.config;
|
||||
|
||||
import static com.google.common.util.concurrent.MoreExecutors.listeningDecorator;
|
||||
import static org.jclouds.Constants.PROPERTY_IO_WORKER_THREADS;
|
||||
import static org.jclouds.Constants.PROPERTY_USER_THREADS;
|
||||
import static org.jclouds.concurrent.DynamicExecutors.newScalingThreadPool;
|
||||
|
@ -30,19 +31,19 @@ import java.util.concurrent.Executors;
|
|||
import java.util.concurrent.ThreadFactory;
|
||||
|
||||
import javax.annotation.Resource;
|
||||
import javax.inject.Inject;
|
||||
import javax.inject.Named;
|
||||
import javax.inject.Singleton;
|
||||
|
||||
import org.jclouds.lifecycle.Closer;
|
||||
import org.jclouds.logging.Logger;
|
||||
|
||||
import com.google.common.util.concurrent.ListeningExecutorService;
|
||||
import com.google.common.util.concurrent.ThreadFactoryBuilder;
|
||||
import com.google.inject.AbstractModule;
|
||||
import com.google.inject.Provides;
|
||||
|
||||
/**
|
||||
* Configures {@link ExecutorService}.
|
||||
* Configures {@link ListeningExecutorService}.
|
||||
*
|
||||
* Note that this uses threads.
|
||||
*
|
||||
|
@ -60,9 +61,9 @@ public class ExecutorServiceModule extends AbstractModule {
|
|||
@Resource
|
||||
private Logger logger = Logger.NULL;
|
||||
|
||||
private final ExecutorService service;
|
||||
private final ListeningExecutorService service;
|
||||
|
||||
private ShutdownExecutorOnClose(ExecutorService service) {
|
||||
private ShutdownExecutorOnClose(ListeningExecutorService service) {
|
||||
this.service = service;
|
||||
}
|
||||
|
||||
|
@ -74,26 +75,23 @@ public class ExecutorServiceModule extends AbstractModule {
|
|||
}
|
||||
}
|
||||
|
||||
final ExecutorService userExecutorFromConstructor;
|
||||
|
||||
final ExecutorService ioExecutorFromConstructor;
|
||||
|
||||
@Inject
|
||||
public ExecutorServiceModule(@Named(PROPERTY_USER_THREADS) ExecutorService userThreads,
|
||||
@Named(PROPERTY_IO_WORKER_THREADS) ExecutorService ioThreads) {
|
||||
this.userExecutorFromConstructor = addToStringOnSubmit(userThreads);
|
||||
this.ioExecutorFromConstructor = addToStringOnSubmit(ioThreads);
|
||||
}
|
||||
|
||||
private ExecutorService addToStringOnSubmit(ExecutorService executor) {
|
||||
if (executor != null) {
|
||||
return new DescribingExecutorService(executor);
|
||||
}
|
||||
return executor;
|
||||
}
|
||||
final ListeningExecutorService userExecutorFromConstructor;
|
||||
final ListeningExecutorService ioExecutorFromConstructor;
|
||||
|
||||
public ExecutorServiceModule() {
|
||||
this(null, null);
|
||||
this.userExecutorFromConstructor = null;
|
||||
this.ioExecutorFromConstructor = null;
|
||||
}
|
||||
|
||||
public ExecutorServiceModule(@Named(PROPERTY_USER_THREADS) ExecutorService userExecutor,
|
||||
@Named(PROPERTY_IO_WORKER_THREADS) ExecutorService ioExecutor) {
|
||||
this(listeningDecorator(userExecutor), listeningDecorator(ioExecutor));
|
||||
}
|
||||
|
||||
public ExecutorServiceModule(@Named(PROPERTY_USER_THREADS) ListeningExecutorService userExecutor,
|
||||
@Named(PROPERTY_IO_WORKER_THREADS) ListeningExecutorService ioExecutor) {
|
||||
this.userExecutorFromConstructor = WithSubmissionTrace.wrap(userExecutor);
|
||||
this.ioExecutorFromConstructor = WithSubmissionTrace.wrap(ioExecutor);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -103,50 +101,55 @@ public class ExecutorServiceModule extends AbstractModule {
|
|||
@Provides
|
||||
@Singleton
|
||||
@Named(PROPERTY_USER_THREADS)
|
||||
ExecutorService provideExecutorService(@Named(PROPERTY_USER_THREADS) int count, Closer closer) { // NO_UCD
|
||||
ListeningExecutorService provideListeningUserExecutorService(@Named(PROPERTY_USER_THREADS) int count, Closer closer) { // NO_UCD
|
||||
if (userExecutorFromConstructor != null)
|
||||
return userExecutorFromConstructor;
|
||||
return shutdownOnClose(addToStringOnSubmit(newThreadPoolNamed("user thread %d", count)), closer);
|
||||
return shutdownOnClose(WithSubmissionTrace.wrap(newThreadPoolNamed("user thread %d", count)), closer);
|
||||
}
|
||||
|
||||
@Provides
|
||||
@Singleton
|
||||
@Named(PROPERTY_IO_WORKER_THREADS)
|
||||
ExecutorService provideIOExecutor(@Named(PROPERTY_IO_WORKER_THREADS) int count, Closer closer) { // NO_UCD
|
||||
ListeningExecutorService provideListeningIOExecutorService(@Named(PROPERTY_IO_WORKER_THREADS) int count,
|
||||
Closer closer) { // NO_UCD
|
||||
if (ioExecutorFromConstructor != null)
|
||||
return ioExecutorFromConstructor;
|
||||
return shutdownOnClose(addToStringOnSubmit(newThreadPoolNamed("i/o thread %d", count)), closer);
|
||||
return shutdownOnClose(WithSubmissionTrace.wrap(newThreadPoolNamed("i/o thread %d", count)), closer);
|
||||
}
|
||||
|
||||
static <T extends ExecutorService> T shutdownOnClose(final T service, Closer closer) {
|
||||
@Provides
|
||||
@Singleton
|
||||
@Named(PROPERTY_USER_THREADS)
|
||||
ExecutorService provideUserExecutorService(@Named(PROPERTY_USER_THREADS) ListeningExecutorService in) { // NO_UCD
|
||||
return in;
|
||||
}
|
||||
|
||||
@Provides
|
||||
@Singleton
|
||||
@Named(PROPERTY_IO_WORKER_THREADS)
|
||||
ExecutorService provideIOExecutorService(@Named(PROPERTY_IO_WORKER_THREADS) ListeningExecutorService in) { // NO_UCD
|
||||
return in;
|
||||
}
|
||||
|
||||
static <T extends ListeningExecutorService> T shutdownOnClose(final T service, Closer closer) {
|
||||
closer.addToClose(new ShutdownExecutorOnClose(service));
|
||||
return service;
|
||||
}
|
||||
|
||||
private ExecutorService newCachedThreadPoolNamed(String name) {
|
||||
return Executors.newCachedThreadPool(namedThreadFactory(name));
|
||||
private ListeningExecutorService newCachedThreadPoolNamed(String name) {
|
||||
return listeningDecorator(Executors.newCachedThreadPool(namedThreadFactory(name)));
|
||||
}
|
||||
|
||||
private ExecutorService newThreadPoolNamed(String name, int maxCount) {
|
||||
private ListeningExecutorService newThreadPoolNamed(String name, int maxCount) {
|
||||
return maxCount == 0 ? newCachedThreadPoolNamed(name) : newScalingThreadPoolNamed(name, maxCount);
|
||||
}
|
||||
|
||||
private ExecutorService newScalingThreadPoolNamed(String name, int maxCount) {
|
||||
return newScalingThreadPool(1, maxCount, 60L * 1000, namedThreadFactory(name));
|
||||
private ListeningExecutorService newScalingThreadPoolNamed(String name, int maxCount) {
|
||||
return listeningDecorator(newScalingThreadPool(1, maxCount, 60L * 1000, namedThreadFactory(name)));
|
||||
}
|
||||
|
||||
private ThreadFactory namedThreadFactory(String name) {
|
||||
return new ThreadFactoryBuilder().setNameFormat(name).setThreadFactory(Executors.defaultThreadFactory()).build();
|
||||
}
|
||||
|
||||
/** returns the stack trace at the caller */
|
||||
static StackTraceElement[] getStackTraceHere() {
|
||||
// remove the first two items in the stack trace (because the first one refers to the call to
|
||||
// Thread.getStackTrace, and the second one is us)
|
||||
StackTraceElement[] fullSubmissionTrace = Thread.currentThread().getStackTrace();
|
||||
StackTraceElement[] cleanedSubmissionTrace = new StackTraceElement[fullSubmissionTrace.length - 2];
|
||||
System.arraycopy(fullSubmissionTrace, 2, cleanedSubmissionTrace, 0, cleanedSubmissionTrace.length);
|
||||
return cleanedSubmissionTrace;
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -1,22 +1,21 @@
|
|||
package org.jclouds.concurrent.config;
|
||||
|
||||
import static com.google.common.util.concurrent.MoreExecutors.listeningDecorator;
|
||||
import static java.util.concurrent.Executors.defaultThreadFactory;
|
||||
import static java.util.concurrent.Executors.newScheduledThreadPool;
|
||||
import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
|
||||
import static org.jclouds.Constants.PROPERTY_SCHEDULER_THREADS;
|
||||
import static org.jclouds.concurrent.config.ExecutorServiceModule.getStackTraceHere;
|
||||
import static org.jclouds.concurrent.config.ExecutorServiceModule.shutdownOnClose;
|
||||
|
||||
import java.util.concurrent.Callable;
|
||||
import java.util.concurrent.Delayed;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.ScheduledExecutorService;
|
||||
import java.util.concurrent.ScheduledFuture;
|
||||
import java.util.concurrent.ThreadFactory;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import javax.inject.Named;
|
||||
import javax.inject.Singleton;
|
||||
|
||||
import org.jclouds.lifecycle.Closer;
|
||||
|
||||
import com.google.common.util.concurrent.ListeningScheduledExecutorService;
|
||||
import com.google.common.util.concurrent.ThreadFactoryBuilder;
|
||||
import com.google.inject.AbstractModule;
|
||||
import com.google.inject.Provides;
|
||||
|
@ -32,76 +31,28 @@ import com.google.inject.Provides;
|
|||
*
|
||||
*/
|
||||
public class ScheduledExecutorServiceModule extends AbstractModule {
|
||||
|
||||
private static class DescribingScheduledExecutorService extends DescribingExecutorService implements
|
||||
ScheduledExecutorService {
|
||||
|
||||
private DescribingScheduledExecutorService(ScheduledExecutorService delegate) {
|
||||
super(delegate);
|
||||
}
|
||||
|
||||
@SuppressWarnings({ "unchecked", "rawtypes" })
|
||||
@Override
|
||||
public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
|
||||
return new DescribedScheduledFuture(((ScheduledExecutorService) delegate).schedule(command, delay, unit),
|
||||
command.toString(), getStackTraceHere());
|
||||
}
|
||||
|
||||
@Override
|
||||
public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
|
||||
return new DescribedScheduledFuture<V>(((ScheduledExecutorService) delegate).schedule(callable, delay, unit),
|
||||
callable.toString(), getStackTraceHere());
|
||||
}
|
||||
|
||||
@SuppressWarnings({ "unchecked", "rawtypes" })
|
||||
@Override
|
||||
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) {
|
||||
return new DescribedScheduledFuture(((ScheduledExecutorService) delegate).scheduleAtFixedRate(command,
|
||||
initialDelay, period, unit), command.toString(), getStackTraceHere());
|
||||
}
|
||||
|
||||
@SuppressWarnings({ "unchecked", "rawtypes" })
|
||||
@Override
|
||||
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) {
|
||||
return new DescribedScheduledFuture(((ScheduledExecutorService) delegate).scheduleWithFixedDelay(command,
|
||||
initialDelay, delay, unit), command.toString(), getStackTraceHere());
|
||||
}
|
||||
}
|
||||
|
||||
private static class DescribedScheduledFuture<T> extends DescribedFuture<T> implements ScheduledFuture<T> {
|
||||
|
||||
private DescribedScheduledFuture(ScheduledFuture<T> delegate, String description,
|
||||
StackTraceElement[] submissionTrace) {
|
||||
super(delegate, description, submissionTrace);
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getDelay(TimeUnit unit) {
|
||||
return ((ScheduledFuture<T>) delegate).getDelay(unit);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int compareTo(Delayed o) {
|
||||
return ((ScheduledFuture<T>) delegate).compareTo(o);
|
||||
}
|
||||
}
|
||||
|
||||
private static ScheduledExecutorService addToStringOnSchedule(ScheduledExecutorService executor) {
|
||||
return (executor != null) ? new DescribingScheduledExecutorService(executor) : executor;
|
||||
@Provides
|
||||
@Singleton
|
||||
@Named(PROPERTY_SCHEDULER_THREADS)
|
||||
ListeningScheduledExecutorService provideListeningScheduledExecutorService(
|
||||
@Named(PROPERTY_SCHEDULER_THREADS) int count, Closer closer) {
|
||||
return shutdownOnClose(WithSubmissionTrace.wrap(newScheduledThreadPoolNamed("scheduler thread %d", count)),
|
||||
closer);
|
||||
}
|
||||
|
||||
@Provides
|
||||
@Singleton
|
||||
@Named(PROPERTY_SCHEDULER_THREADS)
|
||||
ScheduledExecutorService provideScheduledExecutor(@Named(PROPERTY_SCHEDULER_THREADS) int count, Closer closer) {
|
||||
return shutdownOnClose(addToStringOnSchedule(newScheduledThreadPoolNamed("scheduler thread %d", count)), closer);
|
||||
ScheduledExecutorService provideScheduledExecutor(
|
||||
@Named(PROPERTY_SCHEDULER_THREADS) ListeningScheduledExecutorService in) {
|
||||
return in;
|
||||
}
|
||||
|
||||
private static ScheduledExecutorService newScheduledThreadPoolNamed(String name, int maxCount) {
|
||||
ThreadFactory factory = new ThreadFactoryBuilder().setNameFormat(name)
|
||||
.setThreadFactory(Executors.defaultThreadFactory()).build();
|
||||
return maxCount == 0 ? Executors.newSingleThreadScheduledExecutor(factory) : Executors.newScheduledThreadPool(
|
||||
maxCount, factory);
|
||||
private static ListeningScheduledExecutorService newScheduledThreadPoolNamed(String name, int maxCount) {
|
||||
ThreadFactory factory = new ThreadFactoryBuilder().setNameFormat(name).setThreadFactory(defaultThreadFactory())
|
||||
.build();
|
||||
return listeningDecorator(maxCount == 0 ? newSingleThreadScheduledExecutor(factory) : newScheduledThreadPool(
|
||||
maxCount, factory));
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -0,0 +1,255 @@
|
|||
/**
|
||||
* Licensed to jclouds, Inc. (jclouds) under one or more
|
||||
* contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. jclouds licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
package org.jclouds.concurrent.config;
|
||||
|
||||
import static com.google.common.base.Preconditions.checkNotNull;
|
||||
import static com.google.common.collect.Iterables.filter;
|
||||
import static com.google.common.collect.Iterables.toArray;
|
||||
import static com.google.common.collect.ObjectArrays.concat;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.Callable;
|
||||
import java.util.concurrent.Delayed;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
|
||||
import com.google.common.base.Predicate;
|
||||
import com.google.common.collect.ImmutableSet;
|
||||
import com.google.common.util.concurrent.ForwardingFuture;
|
||||
import com.google.common.util.concurrent.ForwardingListenableFuture;
|
||||
import com.google.common.util.concurrent.ForwardingListeningExecutorService;
|
||||
|
||||
/**
|
||||
*
|
||||
* @author Adrian Cole
|
||||
*/
|
||||
public class WithSubmissionTrace {
|
||||
|
||||
private WithSubmissionTrace() {
|
||||
}
|
||||
|
||||
public static ListeningExecutorService wrap(com.google.common.util.concurrent.ListeningExecutorService delegate) {
|
||||
return new ListeningExecutorService(delegate);
|
||||
}
|
||||
|
||||
private static class ListeningExecutorService extends ForwardingListeningExecutorService {
|
||||
|
||||
private final com.google.common.util.concurrent.ListeningExecutorService delegate;
|
||||
|
||||
private ListeningExecutorService(com.google.common.util.concurrent.ListeningExecutorService delegate) {
|
||||
this.delegate = checkNotNull(delegate, "delegate");
|
||||
}
|
||||
|
||||
@Override
|
||||
protected com.google.common.util.concurrent.ListeningExecutorService delegate() {
|
||||
return delegate;
|
||||
}
|
||||
|
||||
@Override
|
||||
public <T> com.google.common.util.concurrent.ListenableFuture<T> submit(Callable<T> task) {
|
||||
return new ListenableFuture<T>(delegate().submit(task));
|
||||
}
|
||||
|
||||
@SuppressWarnings({ "unchecked", "rawtypes" })
|
||||
@Override
|
||||
public com.google.common.util.concurrent.ListenableFuture<?> submit(Runnable task) {
|
||||
return new ListenableFuture(delegate().submit(task));
|
||||
}
|
||||
|
||||
@Override
|
||||
public <T> com.google.common.util.concurrent.ListenableFuture<T> submit(Runnable task, T result) {
|
||||
return new ListenableFuture<T>(delegate().submit(task, result));
|
||||
}
|
||||
}
|
||||
|
||||
private static class ListenableFuture<T> extends ForwardingListenableFuture<T> {
|
||||
private final com.google.common.util.concurrent.ListenableFuture<T> delegate;
|
||||
private final StackTraceElement[] submissionTrace;
|
||||
|
||||
ListenableFuture(com.google.common.util.concurrent.ListenableFuture<T> delegate) {
|
||||
this.delegate = checkNotNull(delegate, "delegate");
|
||||
this.submissionTrace = getStackTraceHere();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected com.google.common.util.concurrent.ListenableFuture<T> delegate() {
|
||||
return delegate;
|
||||
}
|
||||
|
||||
@Override
|
||||
public T get() throws InterruptedException, ExecutionException {
|
||||
try {
|
||||
return delegate().get();
|
||||
} catch (ExecutionException e) {
|
||||
throw addSubmissionTrace(submissionTrace, e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public T get(long arg0, TimeUnit arg1) throws InterruptedException, ExecutionException, TimeoutException {
|
||||
try {
|
||||
return delegate().get(arg0, arg1);
|
||||
} catch (ExecutionException e) {
|
||||
throw addSubmissionTrace(submissionTrace, e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private final static Set<String> stackTracesToTrim = ImmutableSet.of(WithSubmissionTrace.class.getName(),
|
||||
ListeningExecutorService.class.getName(), ListenableFuture.class.getName(),
|
||||
ListeningScheduledExecutorService.class.getName(), ScheduledFuture.class.getName());
|
||||
|
||||
/** returns the stack trace at the caller */
|
||||
private static StackTraceElement[] getStackTraceHere() {
|
||||
StackTraceElement[] trace = Thread.currentThread().getStackTrace();
|
||||
return filterTrace(trace);
|
||||
}
|
||||
|
||||
private static StackTraceElement[] filterTrace(StackTraceElement[] trace) {
|
||||
return toArray(filter(Arrays.asList(trace), new Predicate<StackTraceElement>() {
|
||||
public boolean apply(StackTraceElement input) {
|
||||
String className = input.getClassName();
|
||||
return !stackTracesToTrim.contains(className);
|
||||
}
|
||||
}), StackTraceElement.class);
|
||||
}
|
||||
|
||||
private static ExecutionException addSubmissionTrace(StackTraceElement[] submissionTrace, ExecutionException e) {
|
||||
if (e.getCause() == null) {
|
||||
return filterTrace(e);
|
||||
}
|
||||
Throwable cause = e.getCause();
|
||||
StackTraceElement[] combined = filterTrace(concat(cause.getStackTrace(), submissionTrace, StackTraceElement.class));
|
||||
cause.setStackTrace(combined);
|
||||
return filterTrace(e);
|
||||
}
|
||||
|
||||
private static ExecutionException filterTrace(ExecutionException e) {
|
||||
StackTraceElement[] withoutHere = filterTrace(e.getStackTrace());
|
||||
e.setStackTrace(withoutHere);
|
||||
return e;
|
||||
}
|
||||
|
||||
public static ListeningScheduledExecutorService wrap(
|
||||
com.google.common.util.concurrent.ListeningScheduledExecutorService delegate) {
|
||||
return new ListeningScheduledExecutorService(delegate);
|
||||
}
|
||||
|
||||
private static class ListeningScheduledExecutorService extends ListeningExecutorService implements
|
||||
com.google.common.util.concurrent.ListeningScheduledExecutorService {
|
||||
|
||||
private ListeningScheduledExecutorService(
|
||||
com.google.common.util.concurrent.ListeningScheduledExecutorService delegate) {
|
||||
super(delegate);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected com.google.common.util.concurrent.ListeningScheduledExecutorService delegate() {
|
||||
return com.google.common.util.concurrent.ListeningScheduledExecutorService.class.cast(super.delegate());
|
||||
}
|
||||
|
||||
@Override
|
||||
public <T> ListenableFuture<T> submit(Callable<T> task) {
|
||||
return new ListenableFuture<T>(delegate().submit(task));
|
||||
}
|
||||
|
||||
@SuppressWarnings({ "unchecked", "rawtypes" })
|
||||
@Override
|
||||
public ListenableFuture<?> submit(Runnable task) {
|
||||
return new ListenableFuture(delegate().submit(task));
|
||||
}
|
||||
|
||||
@Override
|
||||
public <T> ListenableFuture<T> submit(Runnable task, T result) {
|
||||
return new ListenableFuture<T>(delegate().submit(task, result));
|
||||
}
|
||||
|
||||
@SuppressWarnings({ "rawtypes", "unchecked" })
|
||||
@Override
|
||||
public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
|
||||
return new ScheduledFuture(delegate().schedule(command, delay, unit));
|
||||
}
|
||||
|
||||
@Override
|
||||
public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
|
||||
return new ScheduledFuture<V>(delegate().schedule(callable, delay, unit));
|
||||
}
|
||||
|
||||
@SuppressWarnings({ "rawtypes", "unchecked" })
|
||||
@Override
|
||||
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) {
|
||||
return new ScheduledFuture(delegate().scheduleAtFixedRate(command, initialDelay, period, unit));
|
||||
}
|
||||
|
||||
@SuppressWarnings({ "rawtypes", "unchecked" })
|
||||
@Override
|
||||
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) {
|
||||
return new ScheduledFuture(delegate().scheduleWithFixedDelay(command, initialDelay, delay, unit));
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
private static class ScheduledFuture<T> extends ForwardingFuture<T> implements
|
||||
java.util.concurrent.ScheduledFuture<T> {
|
||||
|
||||
private final java.util.concurrent.ScheduledFuture<T> delegate;
|
||||
private final StackTraceElement[] submissionTrace;
|
||||
|
||||
private ScheduledFuture(java.util.concurrent.ScheduledFuture<T> delegate) {
|
||||
this.delegate = checkNotNull(delegate, "delegate");
|
||||
this.submissionTrace = getStackTraceHere();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected java.util.concurrent.ScheduledFuture<T> delegate() {
|
||||
return delegate;
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getDelay(TimeUnit arg0) {
|
||||
return delegate().getDelay(arg0);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int compareTo(Delayed arg0) {
|
||||
return delegate().compareTo(arg0);
|
||||
}
|
||||
|
||||
@Override
|
||||
public T get() throws InterruptedException, ExecutionException {
|
||||
try {
|
||||
return delegate().get();
|
||||
} catch (ExecutionException e) {
|
||||
throw addSubmissionTrace(submissionTrace, e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public T get(long arg0, TimeUnit arg1) throws InterruptedException, ExecutionException, TimeoutException {
|
||||
try {
|
||||
return delegate().get(arg0, arg1);
|
||||
} catch (ExecutionException e) {
|
||||
throw addSubmissionTrace(submissionTrace, e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
}
|
|
@ -22,8 +22,6 @@ package org.jclouds.events.config;
|
|||
import static com.google.inject.Scopes.SINGLETON;
|
||||
import static org.jclouds.Constants.PROPERTY_USER_THREADS;
|
||||
|
||||
import java.util.concurrent.ExecutorService;
|
||||
|
||||
import javax.inject.Named;
|
||||
import javax.inject.Singleton;
|
||||
|
||||
|
@ -33,6 +31,7 @@ import org.jclouds.events.handlers.DeadEventLoggingHandler;
|
|||
|
||||
import com.google.common.eventbus.AsyncEventBus;
|
||||
import com.google.common.eventbus.EventBus;
|
||||
import com.google.common.util.concurrent.ListeningExecutorService;
|
||||
import com.google.inject.AbstractModule;
|
||||
import com.google.inject.Provides;
|
||||
|
||||
|
@ -56,9 +55,9 @@ public class EventBusModule extends AbstractModule {
|
|||
*/
|
||||
@Provides
|
||||
@Singleton
|
||||
AsyncEventBus provideAsyncEventBus(@Named(PROPERTY_USER_THREADS) ExecutorService executor,
|
||||
AsyncEventBus provideAsyncEventBus(@Named(PROPERTY_USER_THREADS) ListeningExecutorService userExecutor,
|
||||
DeadEventLoggingHandler deadEventsHandler) {// NO_UCD
|
||||
AsyncEventBus asyncBus = new AsyncEventBus("jclouds-async-event-bus", executor);
|
||||
AsyncEventBus asyncBus = new AsyncEventBus("jclouds-async-event-bus", userExecutor);
|
||||
asyncBus.register(deadEventsHandler);
|
||||
return asyncBus;
|
||||
}
|
||||
|
|
|
@ -20,6 +20,8 @@ package org.jclouds.http;
|
|||
|
||||
import java.util.concurrent.Future;
|
||||
|
||||
import com.google.common.util.concurrent.ListenableFuture;
|
||||
|
||||
/**
|
||||
* Capable of invoking http commands.
|
||||
*
|
||||
|
@ -35,5 +37,5 @@ public interface HttpCommandExecutorService {
|
|||
* that generates requests
|
||||
* @return {@link Future} containing the response from the {@code endpoint}
|
||||
*/
|
||||
Future<HttpResponse> submit(HttpCommand command);
|
||||
ListenableFuture<HttpResponse> submit(HttpCommand command);
|
||||
}
|
||||
|
|
|
@ -27,8 +27,6 @@ import java.io.FilterInputStream;
|
|||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.util.concurrent.Callable;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Future;
|
||||
|
||||
import javax.annotation.Resource;
|
||||
import javax.inject.Inject;
|
||||
|
@ -50,6 +48,8 @@ import org.jclouds.logging.Logger;
|
|||
import org.jclouds.util.Throwables2;
|
||||
|
||||
import com.google.common.io.NullOutputStream;
|
||||
import com.google.common.util.concurrent.ListenableFuture;
|
||||
import com.google.common.util.concurrent.ListeningExecutorService;
|
||||
|
||||
/**
|
||||
*
|
||||
|
@ -62,7 +62,7 @@ public abstract class BaseHttpCommandExecutorService<Q> implements HttpCommandEx
|
|||
private final DelegatingRetryHandler retryHandler;
|
||||
private final IOExceptionRetryHandler ioRetryHandler;
|
||||
private final DelegatingErrorHandler errorHandler;
|
||||
private final ExecutorService ioWorkerExecutor;
|
||||
private final ListeningExecutorService ioExecutor;
|
||||
|
||||
@Resource
|
||||
protected Logger logger = Logger.NULL;
|
||||
|
@ -74,7 +74,7 @@ public abstract class BaseHttpCommandExecutorService<Q> implements HttpCommandEx
|
|||
|
||||
@Inject
|
||||
protected BaseHttpCommandExecutorService(HttpUtils utils, ContentMetadataCodec contentMetadataCodec,
|
||||
@Named(Constants.PROPERTY_IO_WORKER_THREADS) ExecutorService ioWorkerExecutor,
|
||||
@Named(Constants.PROPERTY_IO_WORKER_THREADS) ListeningExecutorService ioExecutor,
|
||||
DelegatingRetryHandler retryHandler, IOExceptionRetryHandler ioRetryHandler,
|
||||
DelegatingErrorHandler errorHandler, HttpWire wire) {
|
||||
this.utils = checkNotNull(utils, "utils");
|
||||
|
@ -82,7 +82,7 @@ public abstract class BaseHttpCommandExecutorService<Q> implements HttpCommandEx
|
|||
this.retryHandler = checkNotNull(retryHandler, "retryHandler");
|
||||
this.ioRetryHandler = checkNotNull(ioRetryHandler, "ioRetryHandler");
|
||||
this.errorHandler = checkNotNull(errorHandler, "errorHandler");
|
||||
this.ioWorkerExecutor = checkNotNull(ioWorkerExecutor, "ioWorkerExecutor");
|
||||
this.ioExecutor = checkNotNull(ioExecutor, "ioExecutor");
|
||||
this.wire = checkNotNull(wire, "wire");
|
||||
}
|
||||
|
||||
|
@ -124,12 +124,12 @@ public abstract class BaseHttpCommandExecutorService<Q> implements HttpCommandEx
|
|||
}
|
||||
|
||||
@Override
|
||||
public Future<HttpResponse> submit(HttpCommand command) {
|
||||
public ListenableFuture<HttpResponse> submit(HttpCommand command) {
|
||||
HttpRequest request = command.getCurrentRequest();
|
||||
checkRequestHasContentLengthOrChunkedEncoding(request,
|
||||
"if the request has a payload, it must be set to chunked encoding or specify a content length: "
|
||||
+ request);
|
||||
return ioWorkerExecutor.submit(new HttpResponseCallable(command));
|
||||
return ioExecutor.submit(new HttpResponseCallable(command));
|
||||
}
|
||||
|
||||
public class HttpResponseCallable implements Callable<HttpResponse> {
|
||||
|
|
|
@ -44,7 +44,6 @@ import java.net.SocketAddress;
|
|||
import java.net.URL;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
|
||||
import javax.inject.Named;
|
||||
import javax.inject.Singleton;
|
||||
|
@ -69,6 +68,7 @@ import com.google.common.base.Supplier;
|
|||
import com.google.common.collect.ImmutableMultimap;
|
||||
import com.google.common.collect.ImmutableMultimap.Builder;
|
||||
import com.google.common.io.CountingOutputStream;
|
||||
import com.google.common.util.concurrent.ListeningExecutorService;
|
||||
import com.google.inject.Inject;
|
||||
|
||||
/**
|
||||
|
@ -90,12 +90,12 @@ public class JavaUrlHttpCommandExecutorService extends BaseHttpCommandExecutorSe
|
|||
|
||||
@Inject
|
||||
public JavaUrlHttpCommandExecutorService(HttpUtils utils, ContentMetadataCodec contentMetadataCodec,
|
||||
@Named(Constants.PROPERTY_IO_WORKER_THREADS) ExecutorService ioWorkerExecutor,
|
||||
@Named(Constants.PROPERTY_IO_WORKER_THREADS) ListeningExecutorService ioExecutor,
|
||||
DelegatingRetryHandler retryHandler, IOExceptionRetryHandler ioRetryHandler,
|
||||
DelegatingErrorHandler errorHandler, HttpWire wire, @Named("untrusted") HostnameVerifier verifier,
|
||||
@Named("untrusted") Supplier<SSLContext> untrustedSSLContextProvider) throws SecurityException,
|
||||
NoSuchFieldException {
|
||||
super(utils, contentMetadataCodec, ioWorkerExecutor, retryHandler, ioRetryHandler, errorHandler, wire);
|
||||
super(utils, contentMetadataCodec, ioExecutor, retryHandler, ioRetryHandler, errorHandler, wire);
|
||||
if (utils.getMaxConnections() > 0)
|
||||
System.setProperty("http.maxConnections", String.valueOf(checkNotNull(utils, "utils").getMaxConnections()));
|
||||
this.untrustedSSLContextProvider = checkNotNull(untrustedSSLContextProvider, "untrustedSSLContextProvider");
|
||||
|
|
|
@ -20,17 +20,17 @@ package org.jclouds.lifecycle;
|
|||
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
|
||||
import javax.annotation.PostConstruct;
|
||||
import javax.annotation.PreDestroy;
|
||||
import javax.annotation.Resource;
|
||||
|
||||
import org.jclouds.logging.Logger;
|
||||
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.common.util.concurrent.Atomics;
|
||||
|
||||
import org.jclouds.logging.Logger;
|
||||
import com.google.common.util.concurrent.ListeningExecutorService;
|
||||
|
||||
/**
|
||||
* // TODO: Adrian: Document this!
|
||||
|
@ -41,14 +41,14 @@ public abstract class BaseLifeCycle implements Runnable, LifeCycle {
|
|||
@Resource
|
||||
protected Logger logger = Logger.NULL;
|
||||
|
||||
protected final ExecutorService executorService;
|
||||
protected final ListeningExecutorService userExecutor;
|
||||
protected final List<LifeCycle> dependencies;
|
||||
protected final Object statusLock;
|
||||
protected volatile Status status;
|
||||
protected AtomicReference<Exception> exception = Atomics.newReference();
|
||||
|
||||
public BaseLifeCycle(ExecutorService executor, LifeCycle... dependencies) {
|
||||
this.executorService = executor;
|
||||
public BaseLifeCycle(ListeningExecutorService userExecutor, LifeCycle... dependencies) {
|
||||
this.userExecutor = userExecutor;
|
||||
this.dependencies = Lists.newArrayList();
|
||||
this.dependencies.addAll(Arrays.asList(dependencies));
|
||||
this.statusLock = new Object();
|
||||
|
@ -118,7 +118,7 @@ public abstract class BaseLifeCycle implements Runnable, LifeCycle {
|
|||
|
||||
this.status = Status.ACTIVE;
|
||||
}
|
||||
executorService.execute(this);
|
||||
userExecutor.execute(this);
|
||||
}
|
||||
|
||||
protected void exceptionIfDependenciesNotActive() {
|
||||
|
|
|
@ -23,23 +23,25 @@ import static com.google.common.collect.Sets.newHashSet;
|
|||
import static com.google.common.util.concurrent.MoreExecutors.sameThreadExecutor;
|
||||
import static com.google.inject.matcher.Matchers.any;
|
||||
import static java.util.Arrays.asList;
|
||||
import static org.jclouds.Constants.PROPERTY_IO_WORKER_THREADS;
|
||||
import static org.jclouds.Constants.PROPERTY_SCHEDULER_THREADS;
|
||||
import static org.jclouds.Constants.PROPERTY_USER_THREADS;
|
||||
|
||||
import java.io.Closeable;
|
||||
import java.io.IOException;
|
||||
import java.lang.reflect.InvocationTargetException;
|
||||
import java.lang.reflect.Method;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.ScheduledExecutorService;
|
||||
|
||||
import javax.annotation.PostConstruct;
|
||||
import javax.annotation.PreDestroy;
|
||||
import javax.inject.Named;
|
||||
|
||||
import org.jclouds.Constants;
|
||||
import org.jclouds.lifecycle.Closer;
|
||||
|
||||
import com.google.common.util.concurrent.ExecutionList;
|
||||
import com.google.common.util.concurrent.ListeningExecutorService;
|
||||
import com.google.inject.AbstractModule;
|
||||
import com.google.inject.Inject;
|
||||
import com.google.inject.Stage;
|
||||
|
@ -70,14 +72,14 @@ public class LifeCycleModule extends AbstractModule {
|
|||
|
||||
Closeable executorCloser = new Closeable() {
|
||||
@Inject
|
||||
@Named(Constants.PROPERTY_USER_THREADS)
|
||||
ExecutorService userExecutor;
|
||||
@Named(PROPERTY_USER_THREADS)
|
||||
ListeningExecutorService userExecutor;
|
||||
@Inject
|
||||
@Named(Constants.PROPERTY_IO_WORKER_THREADS)
|
||||
ExecutorService ioExecutor;
|
||||
@Named(PROPERTY_IO_WORKER_THREADS)
|
||||
ListeningExecutorService ioExecutor;
|
||||
// ScheduledExecutor is defined in an optional module
|
||||
@Inject(optional = true)
|
||||
@Named(Constants.PROPERTY_SCHEDULER_THREADS)
|
||||
@Named(PROPERTY_SCHEDULER_THREADS)
|
||||
ScheduledExecutorService scheduledExecutor;
|
||||
|
||||
public void close() throws IOException {
|
||||
|
|
|
@ -19,7 +19,6 @@
|
|||
package org.jclouds.rest;
|
||||
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
|
||||
import org.jclouds.crypto.Crypto;
|
||||
import org.jclouds.date.DateService;
|
||||
|
@ -31,6 +30,7 @@ import org.jclouds.xml.XMLParser;
|
|||
|
||||
import com.google.common.annotations.Beta;
|
||||
import com.google.common.eventbus.EventBus;
|
||||
import com.google.common.util.concurrent.ListeningExecutorService;
|
||||
import com.google.inject.ImplementedBy;
|
||||
import com.google.inject.Injector;
|
||||
|
||||
|
@ -103,19 +103,19 @@ public interface Utils {
|
|||
*/
|
||||
DateService date();
|
||||
|
||||
ExecutorService getUserExecutor();
|
||||
ListeningExecutorService getUserExecutor();
|
||||
|
||||
/**
|
||||
* #see #getUserExecutor
|
||||
*/
|
||||
ExecutorService userExecutor();
|
||||
ListeningExecutorService userExecutor();
|
||||
|
||||
ExecutorService getIoExecutor();
|
||||
ListeningExecutorService getIoExecutor();
|
||||
|
||||
/**
|
||||
* #see #getIoExecutor
|
||||
*/
|
||||
ExecutorService ioExecutor();
|
||||
ListeningExecutorService ioExecutor();
|
||||
|
||||
@Beta
|
||||
EventBus getEventBus();
|
||||
|
|
|
@ -23,9 +23,6 @@ import static com.google.common.base.Preconditions.checkNotNull;
|
|||
import static com.google.common.base.Preconditions.checkState;
|
||||
import static com.google.common.util.concurrent.Futures.transform;
|
||||
import static com.google.common.util.concurrent.Futures.withFallback;
|
||||
import static org.jclouds.concurrent.Futures.makeListenable;
|
||||
|
||||
import java.util.concurrent.ExecutorService;
|
||||
|
||||
import javax.annotation.Resource;
|
||||
import javax.inject.Inject;
|
||||
|
@ -52,6 +49,7 @@ import com.google.common.reflect.Invokable;
|
|||
import com.google.common.reflect.TypeToken;
|
||||
import com.google.common.util.concurrent.FutureFallback;
|
||||
import com.google.common.util.concurrent.ListenableFuture;
|
||||
import com.google.common.util.concurrent.ListeningExecutorService;
|
||||
import com.google.inject.Injector;
|
||||
import com.google.inject.TypeLiteral;
|
||||
|
||||
|
@ -66,7 +64,7 @@ public class InvokeHttpMethod<S, A> implements Function<Invocation, Result> {
|
|||
private final RestAnnotationProcessor<A> annotationProcessor;
|
||||
private final HttpCommandExecutorService http;
|
||||
private final TransformerForRequest<A> transformerForRequest;
|
||||
private final ExecutorService userThreads;
|
||||
private final ListeningExecutorService userExecutor;
|
||||
private final BlockOnFuture.Factory blocker;
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
|
@ -74,13 +72,13 @@ public class InvokeHttpMethod<S, A> implements Function<Invocation, Result> {
|
|||
private InvokeHttpMethod(Injector injector, TypeLiteral<A> enclosingType,
|
||||
Cache<Invokable<?, ?>, Invokable<?, ?>> sync2AsyncInvokables, RestAnnotationProcessor<A> annotationProcessor,
|
||||
HttpCommandExecutorService http, TransformerForRequest<A> transformerForRequest,
|
||||
@Named(Constants.PROPERTY_USER_THREADS) ExecutorService userThreads, BlockOnFuture.Factory blocker) {
|
||||
@Named(Constants.PROPERTY_USER_THREADS) ListeningExecutorService userExecutor, BlockOnFuture.Factory blocker) {
|
||||
this.injector = injector;
|
||||
this.enclosingType = (TypeToken<A>) TypeToken.of(enclosingType.getType());
|
||||
this.sync2AsyncInvokables = sync2AsyncInvokables;
|
||||
this.annotationProcessor = annotationProcessor;
|
||||
this.http = http;
|
||||
this.userThreads = userThreads;
|
||||
this.userExecutor = userExecutor;
|
||||
this.blocker = blocker;
|
||||
this.transformerForRequest = transformerForRequest;
|
||||
}
|
||||
|
@ -125,8 +123,7 @@ public class InvokeHttpMethod<S, A> implements Function<Invocation, Result> {
|
|||
logger.trace("<< response from %s is parsed by %s", name, transformer.getClass().getSimpleName());
|
||||
|
||||
logger.debug(">> invoking %s", name);
|
||||
ListenableFuture<?> result = transform(makeListenable(http.submit(new HttpCommand(request)), userThreads),
|
||||
transformer);
|
||||
ListenableFuture<?> result = transform(http.submit(new HttpCommand(request)), transformer, userExecutor);
|
||||
|
||||
FutureFallback<?> fallback = fallbacks.getUnchecked(invocation.getInvokable());
|
||||
if (fallback instanceof InvocationContext) {
|
||||
|
|
|
@ -19,7 +19,6 @@
|
|||
package org.jclouds.rest.internal;
|
||||
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
|
||||
import javax.inject.Inject;
|
||||
import javax.inject.Named;
|
||||
|
@ -37,6 +36,7 @@ import org.jclouds.xml.XMLParser;
|
|||
|
||||
import com.google.common.annotations.Beta;
|
||||
import com.google.common.eventbus.EventBus;
|
||||
import com.google.common.util.concurrent.ListeningExecutorService;
|
||||
import com.google.inject.Injector;
|
||||
import com.google.inject.Singleton;
|
||||
|
||||
|
@ -51,8 +51,8 @@ public class UtilsImpl implements Utils {
|
|||
private final HttpAsyncClient simpleAsyncClient;
|
||||
private final Crypto encryption;
|
||||
private final DateService date;
|
||||
private final ExecutorService userExecutor;
|
||||
private final ExecutorService ioExecutor;
|
||||
private final ListeningExecutorService userExecutor;
|
||||
private final ListeningExecutorService ioExecutor;
|
||||
private final EventBus eventBus;
|
||||
private final Map<String, Credentials> credentialStore;
|
||||
private final LoggerFactory loggerFactory;
|
||||
|
@ -61,8 +61,8 @@ public class UtilsImpl implements Utils {
|
|||
|
||||
@Inject
|
||||
protected UtilsImpl(Injector injector, Json json, XMLParser xml, HttpClient simpleClient, HttpAsyncClient simpleAsyncClient,
|
||||
Crypto encryption, DateService date, @Named(Constants.PROPERTY_USER_THREADS) ExecutorService userThreads,
|
||||
@Named(Constants.PROPERTY_IO_WORKER_THREADS) ExecutorService ioThreads, EventBus eventBus,
|
||||
Crypto encryption, DateService date, @Named(Constants.PROPERTY_USER_THREADS) ListeningExecutorService userExecutor,
|
||||
@Named(Constants.PROPERTY_IO_WORKER_THREADS) ListeningExecutorService ioExecutor, EventBus eventBus,
|
||||
Map<String, Credentials> credentialStore, LoggerFactory loggerFactory) {
|
||||
this.injector = injector;
|
||||
this.json = json;
|
||||
|
@ -70,8 +70,8 @@ public class UtilsImpl implements Utils {
|
|||
this.simpleAsyncClient = simpleAsyncClient;
|
||||
this.encryption = encryption;
|
||||
this.date = date;
|
||||
this.userExecutor = userThreads;
|
||||
this.ioExecutor = ioThreads;
|
||||
this.userExecutor = userExecutor;
|
||||
this.ioExecutor = ioExecutor;
|
||||
this.eventBus = eventBus;
|
||||
this.credentialStore = credentialStore;
|
||||
this.loggerFactory = loggerFactory;
|
||||
|
@ -119,22 +119,22 @@ public class UtilsImpl implements Utils {
|
|||
}
|
||||
|
||||
@Override
|
||||
public ExecutorService getIoExecutor() {
|
||||
public ListeningExecutorService getIoExecutor() {
|
||||
return ioExecutor;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ExecutorService getUserExecutor() {
|
||||
public ListeningExecutorService getUserExecutor() {
|
||||
return userExecutor;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ExecutorService ioExecutor() {
|
||||
public ListeningExecutorService ioExecutor() {
|
||||
return ioExecutor;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ExecutorService userExecutor() {
|
||||
public ListeningExecutorService userExecutor() {
|
||||
return userExecutor;
|
||||
}
|
||||
|
||||
|
|
|
@ -1,257 +0,0 @@
|
|||
/**
|
||||
* Licensed to jclouds, Inc. (jclouds) under one or more
|
||||
* contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. jclouds licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
package com.google.common.util.concurrent;
|
||||
|
||||
import static com.google.common.base.Throwables.propagate;
|
||||
import static com.google.common.collect.Maps.newHashMap;
|
||||
import static java.util.concurrent.Executors.newCachedThreadPool;
|
||||
|
||||
import java.util.Collections;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.Callable;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Future;
|
||||
|
||||
import org.testng.annotations.Test;
|
||||
|
||||
import com.google.common.base.Function;
|
||||
import com.google.common.collect.Iterables;
|
||||
import com.google.common.collect.Sets;
|
||||
|
||||
/**
|
||||
* In google appengine, we can get a future without using an executorservice, using its async http
|
||||
* fetch command. However, we still may need to do some conversions, or add listeners. In
|
||||
* googleappengine, we cannot employ a *real* executorservice, but we can employ a same thread
|
||||
* executor. This test identifies efficiencies that can be made by strengthening guava's handling of
|
||||
* same thread execution.
|
||||
*
|
||||
* <p/>
|
||||
*
|
||||
* We simulate an i/o future by running a callable that simply sleeps. How this is created isn't
|
||||
* important.
|
||||
*
|
||||
* <ol>
|
||||
* <li>{@code IO_DURATION} is the time that the source future spends doing work</li>
|
||||
* <li>{@code LISTENER_DURATION} is the time that the attached listener or function</li>
|
||||
* </ol>
|
||||
*
|
||||
* The execution time of a transformd task within a composite should not be more than {@code
|
||||
* IO_DURATION} + {@code LISTENER_DURATION} + overhead when a threadpool is used. This is because
|
||||
* the listener should be invoked as soon as the result is available.
|
||||
* <p/>
|
||||
* The execution time of a transformd task within a composite should not be more than {@code
|
||||
* IO_DURATION} + {@code LISTENER_DURATION} * {@code COUNT} + overhead when caller thread is used
|
||||
* for handling the listeners.
|
||||
* <p/>
|
||||
* This test shows that Futures.transform eagerly issues a get() on the source future. code iterating
|
||||
* over futures and assigning listeners will take the same amount of time as calling get() on each
|
||||
* one, if using a within thread executor. This exposes an inefficiency which can make some use
|
||||
* cases in google appengine impossible to achieve within the cutoff limits.
|
||||
*
|
||||
* @author Adrian Cole
|
||||
*/
|
||||
@Test(groups = "performance", enabled = false, singleThreaded = true, testName = "FuturesTransformPerformanceTest")
|
||||
public class FuturesTransformPerformanceTest {
|
||||
private static final int FUDGE = 5;
|
||||
private static final int COUNT = 100;
|
||||
private static final int IO_DURATION = 50;
|
||||
private static final int LISTENER_DURATION = 100;
|
||||
|
||||
ExecutorService ioFunctionExecutor = newCachedThreadPool();
|
||||
|
||||
/**
|
||||
* When we use threadpools for both the chain and invoking listener, user experience is
|
||||
* consistent.
|
||||
*/
|
||||
@Test(enabled = false)
|
||||
public void whenCachedThreadPoolIsUsedForChainAndListenerMaxDurationIsSumOfCallableAndListener()
|
||||
throws InterruptedException, ExecutionException {
|
||||
long expectedMax = IO_DURATION + LISTENER_DURATION;
|
||||
long expectedMin = IO_DURATION + LISTENER_DURATION;
|
||||
long expectedOverhead = COUNT * 4 + FUDGE;
|
||||
|
||||
ExecutorService userthreads = newCachedThreadPool();
|
||||
try {
|
||||
ExecutorService chainExecutor = userthreads;
|
||||
ExecutorService listenerExecutor = userthreads;
|
||||
|
||||
checkThresholdsUsingFuturesTransform(expectedMin, expectedMax, expectedOverhead, chainExecutor, listenerExecutor);
|
||||
} finally {
|
||||
userthreads.shutdownNow();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* When we use threadpools for the chain, but same thread for invoking listener, user experience
|
||||
* is still consistent.
|
||||
*/
|
||||
@Test(enabled = false)
|
||||
public void whenCachedThreadPoolIsUsedForChainButSameThreadForListenerMaxDurationIsSumOfCallableAndListener()
|
||||
throws InterruptedException, ExecutionException {
|
||||
long expectedMax = IO_DURATION + LISTENER_DURATION;
|
||||
long expectedMin = IO_DURATION + LISTENER_DURATION;
|
||||
long expectedOverhead = COUNT + FUDGE;
|
||||
|
||||
ExecutorService userthreads = newCachedThreadPool();
|
||||
try {
|
||||
ExecutorService chainExecutor = userthreads;
|
||||
ExecutorService listenerExecutor = MoreExecutors.sameThreadExecutor();
|
||||
|
||||
checkThresholdsUsingFuturesTransform(expectedMin, expectedMax, expectedOverhead, chainExecutor, listenerExecutor);
|
||||
} finally {
|
||||
userthreads.shutdownNow();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* When using same thread for the chain, the futures are being called (get()) eagerly, resulting
|
||||
* in the max duration being the sum of all i/o plus the cost of executing the listeners. In this
|
||||
* case, listeners are executed in a different thread pool.
|
||||
*
|
||||
*/
|
||||
@Test(enabled = false)
|
||||
public void whenSameThreadIsUsedForChainButCachedThreadPoolForListenerMaxDurationIsSumOfAllIOAndOneListener()
|
||||
throws InterruptedException, ExecutionException {
|
||||
long expectedMax = (IO_DURATION * COUNT) + LISTENER_DURATION;
|
||||
long expectedMin = IO_DURATION + LISTENER_DURATION;
|
||||
long expectedOverhead = COUNT + FUDGE;
|
||||
|
||||
ExecutorService userthreads = newCachedThreadPool();
|
||||
try {
|
||||
ExecutorService chainExecutor = MoreExecutors.sameThreadExecutor();
|
||||
ExecutorService listenerExecutor = userthreads;
|
||||
|
||||
checkThresholdsUsingFuturesTransform(expectedMin, expectedMax, expectedOverhead, chainExecutor, listenerExecutor);
|
||||
} finally {
|
||||
userthreads.shutdownNow();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* This case can be optimized for sure. The side effect of the eager get() is that all i/o must
|
||||
* complete before *any* listeners are run. In this case, if you are inside google appengine and
|
||||
* using same thread executors, worst experience is sum of all io duration plus the sum of all
|
||||
* listener duration. An efficient implementation would call get() on the i/o future lazily. Such
|
||||
* an impl would have a max duration of I/O + Listener * count.
|
||||
*/
|
||||
@Test(enabled = false)
|
||||
public void whenSameThreadIsUsedForChainAndListenerMaxDurationIsSumOfAllIOAndAllListeners()
|
||||
throws InterruptedException, ExecutionException {
|
||||
|
||||
long expectedMax = (IO_DURATION * COUNT) + (LISTENER_DURATION * COUNT);
|
||||
long expectedMin = IO_DURATION + LISTENER_DURATION;
|
||||
long expectedOverhead = COUNT + FUDGE;
|
||||
|
||||
ExecutorService userthreads = newCachedThreadPool();
|
||||
try {
|
||||
ExecutorService chainExecutor = MoreExecutors.sameThreadExecutor();
|
||||
ExecutorService listenerExecutor = MoreExecutors.sameThreadExecutor();
|
||||
|
||||
checkThresholdsUsingFuturesTransform(expectedMin, expectedMax, expectedOverhead, chainExecutor, listenerExecutor);
|
||||
} finally {
|
||||
userthreads.shutdownNow();
|
||||
}
|
||||
}
|
||||
|
||||
private void checkThresholdsUsingFuturesTransform(long expectedMin, long expectedMax, long expectedOverhead,
|
||||
ExecutorService chainExecutor, final ExecutorService listenerExecutor) {
|
||||
long start = System.currentTimeMillis();
|
||||
Map<String, Future<Long>> responses = newHashMap();
|
||||
for (int i = 0; i < COUNT; i++)
|
||||
responses.put(i + "", Futures.transform(JdkFutureAdapters.listenInPoolThread(simultateIO(), chainExecutor),
|
||||
new Function<Long, Long>() {
|
||||
|
||||
@Override
|
||||
public Long apply(Long from) {
|
||||
try {
|
||||
Thread.sleep(LISTENER_DURATION);
|
||||
} catch (InterruptedException e) {
|
||||
propagate(e);
|
||||
}
|
||||
return System.currentTimeMillis();
|
||||
}
|
||||
|
||||
}, listenerExecutor));
|
||||
checkTimeThresholds(expectedMin, expectedMax, expectedOverhead, start, responses);
|
||||
}
|
||||
|
||||
private Future<Long> simultateIO() {
|
||||
return ioFunctionExecutor.submit(new Callable<Long>() {
|
||||
|
||||
@Override
|
||||
public Long call() throws Exception {
|
||||
Thread.sleep(IO_DURATION);
|
||||
return System.currentTimeMillis();
|
||||
}
|
||||
|
||||
});
|
||||
}
|
||||
|
||||
private static long getMaxIn(Map<String, Future<Long>> responses) {
|
||||
Iterable<Long> collection = Iterables.transform(responses.values(), new Function<Future<Long>, Long>() {
|
||||
|
||||
@Override
|
||||
public Long apply(Future<Long> from) {
|
||||
try {
|
||||
return from.get();
|
||||
} catch (InterruptedException e) {
|
||||
} catch (ExecutionException e) {
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
});
|
||||
long time = Collections.max(Sets.newHashSet(collection));
|
||||
return time;
|
||||
}
|
||||
|
||||
private static long getMinIn(Map<String, Future<Long>> responses) {
|
||||
Iterable<Long> collection = Iterables.transform(responses.values(), new Function<Future<Long>, Long>() {
|
||||
|
||||
@Override
|
||||
public Long apply(Future<Long> from) {
|
||||
try {
|
||||
return from.get();
|
||||
} catch (InterruptedException e) {
|
||||
} catch (ExecutionException e) {
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
});
|
||||
long time = Collections.min(Sets.newHashSet(collection));
|
||||
return time;
|
||||
}
|
||||
|
||||
private static void checkTimeThresholds(long expectedMin, long expectedMax, long expectedOverhead, long start,
|
||||
Map<String, Future<Long>> responses) {
|
||||
long time = getMaxIn(responses) - start;
|
||||
assert time >= expectedMax && time < expectedMax + expectedOverhead : String.format("expectedMax %d, max %d",
|
||||
expectedMax, time);
|
||||
|
||||
time = getMinIn(responses) - start;
|
||||
assert time >= expectedMin && time < expectedMin + expectedOverhead : String.format("expectedMin %d, min %d",
|
||||
expectedMin, time);
|
||||
|
||||
time = getMaxIn(responses) - start;
|
||||
assert time >= expectedMax && time < expectedMax + expectedOverhead : String.format("expectedMax %d, max %d",
|
||||
expectedMax, time);
|
||||
}
|
||||
}
|
|
@ -26,9 +26,7 @@ import static org.testng.Assert.assertEquals;
|
|||
import static org.testng.Assert.fail;
|
||||
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.Future;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
|
@ -38,6 +36,9 @@ import org.testng.annotations.Test;
|
|||
|
||||
import com.google.common.base.Function;
|
||||
import com.google.common.collect.ImmutableSet;
|
||||
import com.google.common.util.concurrent.ListenableFuture;
|
||||
import com.google.common.util.concurrent.ListeningExecutorService;
|
||||
import com.google.common.util.concurrent.MoreExecutors;
|
||||
|
||||
/**
|
||||
* Tests behavior of FutureIterables
|
||||
|
@ -51,15 +52,12 @@ public class FutureIterablesTest {
|
|||
final AtomicInteger counter = new AtomicInteger();
|
||||
|
||||
try {
|
||||
transformParallel(ImmutableSet.of("hello", "goodbye"), new Function<String, Future<? extends String>>() {
|
||||
|
||||
@Override
|
||||
public Future<String> apply(String input) {
|
||||
transformParallel(ImmutableSet.of("hello", "goodbye"), new Function<String, ListenableFuture<? extends String>>() {
|
||||
public ListenableFuture<String> apply(String input) {
|
||||
counter.incrementAndGet();
|
||||
return immediateFailedFuture(new AuthorizationException());
|
||||
}
|
||||
|
||||
}, sameThreadExecutor(), null, Logger.CONSOLE, "");
|
||||
}, sameThreadExecutor(), null, Logger.NULL, "");
|
||||
fail("Expected AuthorizationException");
|
||||
} catch (AuthorizationException e) {
|
||||
assertEquals(counter.get(), 2);
|
||||
|
@ -71,14 +69,11 @@ public class FutureIterablesTest {
|
|||
final AtomicInteger counter = new AtomicInteger();
|
||||
|
||||
try {
|
||||
transformParallel(ImmutableSet.of("hello", "goodbye"), new Function<String, Future<? extends String>>() {
|
||||
|
||||
@Override
|
||||
public Future<String> apply(String input) {
|
||||
transformParallel(ImmutableSet.of("hello", "goodbye"), new Function<String, ListenableFuture<? extends String>>() {
|
||||
public ListenableFuture<String> apply(String input) {
|
||||
counter.incrementAndGet();
|
||||
return immediateFailedFuture(new RuntimeException());
|
||||
}
|
||||
|
||||
}, sameThreadExecutor(), null, Logger.CONSOLE, "");
|
||||
fail("Expected TransformParallelException");
|
||||
} catch (TransformParallelException e) {
|
||||
|
@ -90,10 +85,10 @@ public class FutureIterablesTest {
|
|||
|
||||
public void testAwaitCompletionTimeout() throws Exception {
|
||||
final long timeoutMs = 1000;
|
||||
ExecutorService executorService = Executors.newSingleThreadExecutor();
|
||||
Map<Void, Future<?>> responses = newHashMap();
|
||||
ListeningExecutorService userExecutor = MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor());
|
||||
Map<Void, ListenableFuture<?>> responses = newHashMap();
|
||||
try {
|
||||
responses.put(null, executorService.submit(new Runnable() {
|
||||
responses.put(null, userExecutor.submit(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
try {
|
||||
|
@ -103,9 +98,8 @@ public class FutureIterablesTest {
|
|||
}
|
||||
}
|
||||
}));
|
||||
Map<Void, Exception> errors = FutureIterables.awaitCompletion(responses, executorService, timeoutMs,
|
||||
Logger.CONSOLE,
|
||||
/* prefix= */"");
|
||||
Map<Void, Exception> errors = FutureIterables.awaitCompletion(responses, userExecutor, timeoutMs, Logger.NULL,
|
||||
/* prefix= */"");
|
||||
if (!errors.isEmpty()) {
|
||||
throw errors.values().iterator().next();
|
||||
}
|
||||
|
@ -113,7 +107,7 @@ public class FutureIterablesTest {
|
|||
} catch (TimeoutException te) {
|
||||
// expected
|
||||
} finally {
|
||||
executorService.shutdownNow();
|
||||
userExecutor.shutdownNow();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,94 +0,0 @@
|
|||
/**
|
||||
* Licensed to jclouds, Inc. (jclouds) under one or more
|
||||
* contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. jclouds licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
package org.jclouds.concurrent;
|
||||
|
||||
import static com.google.common.util.concurrent.MoreExecutors.sameThreadExecutor;
|
||||
import static org.easymock.EasyMock.createMock;
|
||||
import static org.easymock.EasyMock.expect;
|
||||
import static org.easymock.EasyMock.replay;
|
||||
import static org.easymock.EasyMock.verify;
|
||||
import static org.testng.Assert.assertEquals;
|
||||
import static org.testng.Assert.fail;
|
||||
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Future;
|
||||
|
||||
import org.jclouds.concurrent.Futures.CallGetAndRunExecutionList;
|
||||
import org.testng.annotations.Test;
|
||||
|
||||
import com.google.common.util.concurrent.ExecutionList;
|
||||
|
||||
/**
|
||||
* Tests behavior of Futures
|
||||
*
|
||||
* @author Adrian Cole
|
||||
*/
|
||||
@Test(groups = "unit")
|
||||
public class FuturesTest {
|
||||
ExecutorService executorService = sameThreadExecutor();
|
||||
|
||||
@Test
|
||||
public void testCallGetAndRunRunnableRunsListOnRuntimeException() throws InterruptedException, ExecutionException {
|
||||
|
||||
Runnable runnable = createMock(Runnable.class);
|
||||
@SuppressWarnings("unchecked")
|
||||
Future<String> future = createMock(Future.class);
|
||||
runnable.run();
|
||||
expect(future.get()).andThrow(new RuntimeException());
|
||||
replay(runnable);
|
||||
replay(future);
|
||||
|
||||
ExecutionList executionList = new ExecutionList();
|
||||
executionList.add(runnable, executorService);
|
||||
|
||||
CallGetAndRunExecutionList<String> caller = new CallGetAndRunExecutionList<String>(future, executionList);
|
||||
caller.run();
|
||||
|
||||
verify(runnable);
|
||||
verify(future);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCallGetAndRunRunnableInterruptsAndThrowsIllegalStateExceptionOnInterruptedException()
|
||||
throws InterruptedException, ExecutionException {
|
||||
|
||||
Runnable runnable = createMock(Runnable.class);
|
||||
@SuppressWarnings("unchecked")
|
||||
Future<String> future = createMock(Future.class);
|
||||
expect(future.get()).andThrow(new InterruptedException());
|
||||
replay(runnable);
|
||||
replay(future);
|
||||
|
||||
ExecutionList executionList = new ExecutionList();
|
||||
executionList.add(runnable, executorService);
|
||||
|
||||
CallGetAndRunExecutionList<String> caller = new CallGetAndRunExecutionList<String>(future, executionList);
|
||||
try {
|
||||
caller.run();
|
||||
fail("Expected IllegalStateException");
|
||||
} catch (IllegalStateException e) {
|
||||
assertEquals(e.getMessage(), "interrupted calling get() on [EasyMock for interface java.util.concurrent.Future], so could not run listeners");
|
||||
}
|
||||
|
||||
verify(runnable);
|
||||
verify(future);
|
||||
}
|
||||
|
||||
}
|
|
@ -18,29 +18,31 @@
|
|||
*/
|
||||
package org.jclouds.concurrent.config;
|
||||
|
||||
import static com.google.common.base.Throwables.getStackTraceAsString;
|
||||
import static com.google.inject.name.Names.named;
|
||||
import static org.easymock.EasyMock.createMock;
|
||||
import static org.easymock.EasyMock.expect;
|
||||
import static org.easymock.EasyMock.replay;
|
||||
import static org.easymock.EasyMock.verify;
|
||||
import static org.testng.Assert.assertEquals;
|
||||
import static org.testng.Assert.fail;
|
||||
import static org.jclouds.Constants.PROPERTY_IO_WORKER_THREADS;
|
||||
import static org.jclouds.Constants.PROPERTY_USER_THREADS;
|
||||
import static org.testng.Assert.assertFalse;
|
||||
import static org.testng.Assert.assertNull;
|
||||
import static org.testng.Assert.assertTrue;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.concurrent.Callable;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Future;
|
||||
|
||||
import org.jclouds.Constants;
|
||||
import org.jclouds.lifecycle.Closer;
|
||||
import org.testng.annotations.AfterMethod;
|
||||
import org.testng.annotations.BeforeMethod;
|
||||
import org.testng.annotations.Test;
|
||||
|
||||
import com.google.common.base.Throwables;
|
||||
import com.google.common.collect.ImmutableList;
|
||||
import com.google.common.util.concurrent.ListeningExecutorService;
|
||||
import com.google.inject.Guice;
|
||||
import com.google.inject.Injector;
|
||||
import com.google.inject.Key;
|
||||
import com.google.inject.name.Names;
|
||||
|
||||
/**
|
||||
*
|
||||
|
@ -49,12 +51,35 @@ import com.google.inject.name.Names;
|
|||
@Test
|
||||
public class ExecutorServiceModuleTest {
|
||||
|
||||
private Injector injector;
|
||||
|
||||
@BeforeMethod
|
||||
private void setupExecutorModule() {
|
||||
ExecutorServiceModule module = new ExecutorServiceModule() {
|
||||
@Override
|
||||
protected void configure() {
|
||||
bindConstant().annotatedWith(named(PROPERTY_IO_WORKER_THREADS)).to(1);
|
||||
bindConstant().annotatedWith(named(PROPERTY_USER_THREADS)).to(1);
|
||||
super.configure();
|
||||
}
|
||||
};
|
||||
|
||||
injector = Guice.createInjector(module);
|
||||
assertNull(module.userExecutorFromConstructor);
|
||||
assertNull(module.ioExecutorFromConstructor);
|
||||
}
|
||||
|
||||
@AfterMethod
|
||||
private void close() throws IOException {
|
||||
injector.getInstance(Closer.class).close();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testShutdownOnClose() throws IOException {
|
||||
Injector i = Guice.createInjector();
|
||||
|
||||
Closer closer = i.getInstance(Closer.class);
|
||||
ExecutorService executor = createMock(ExecutorService.class);
|
||||
ListeningExecutorService executor = createMock(ListeningExecutorService.class);
|
||||
ExecutorServiceModule.shutdownOnClose(executor, closer);
|
||||
|
||||
expect(executor.shutdownNow()).andReturn(ImmutableList.<Runnable> of()).atLeastOnce();
|
||||
|
@ -68,192 +93,64 @@ public class ExecutorServiceModuleTest {
|
|||
@Test
|
||||
public void testShutdownOnCloseThroughModule() throws IOException {
|
||||
|
||||
ExecutorServiceModule module = new ExecutorServiceModule() {
|
||||
@Override
|
||||
protected void configure() {
|
||||
bindConstant().annotatedWith(Names.named(Constants.PROPERTY_IO_WORKER_THREADS)).to(1);
|
||||
bindConstant().annotatedWith(Names.named(Constants.PROPERTY_USER_THREADS)).to(1);
|
||||
super.configure();
|
||||
}
|
||||
};
|
||||
|
||||
Injector i = Guice.createInjector(module);
|
||||
assertEquals(module.userExecutorFromConstructor, null);
|
||||
assertEquals(module.ioExecutorFromConstructor, null);
|
||||
ListeningExecutorService user = injector.getInstance(Key.get(ListeningExecutorService.class,
|
||||
named(PROPERTY_USER_THREADS)));
|
||||
ListeningExecutorService io = injector.getInstance(Key.get(ListeningExecutorService.class,
|
||||
named(PROPERTY_IO_WORKER_THREADS)));
|
||||
|
||||
Closer closer = i.getInstance(Closer.class);
|
||||
assertFalse(user.isShutdown());
|
||||
assertFalse(io.isShutdown());
|
||||
|
||||
ExecutorService user = i
|
||||
.getInstance(Key.get(ExecutorService.class, Names.named(Constants.PROPERTY_USER_THREADS)));
|
||||
ExecutorService io = i.getInstance(Key.get(ExecutorService.class, Names
|
||||
.named(Constants.PROPERTY_IO_WORKER_THREADS)));
|
||||
|
||||
assert !user.isShutdown();
|
||||
assert !io.isShutdown();
|
||||
|
||||
closer.close();
|
||||
|
||||
assert user.isShutdown();
|
||||
assert io.isShutdown();
|
||||
injector.getInstance(Closer.class).close();
|
||||
|
||||
assertTrue(user.isShutdown());
|
||||
assertTrue(io.isShutdown());
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
public void testDescribedFutureToString() throws Exception {
|
||||
|
||||
ExecutorServiceModule module = new ExecutorServiceModule() {
|
||||
public void testExceptionInSubmitRunnableIncludesSubmissionTrace() throws Exception {
|
||||
ListeningExecutorService user = injector.getInstance(Key.get(ListeningExecutorService.class,
|
||||
named(PROPERTY_USER_THREADS)));
|
||||
ListeningExecutorService io = injector.getInstance(Key.get(ListeningExecutorService.class,
|
||||
named(PROPERTY_IO_WORKER_THREADS)));
|
||||
|
||||
for (ListeningExecutorService exec : ImmutableList.of(user, io)) {
|
||||
String submission = null;
|
||||
try {
|
||||
// this is sensitive to formatting as we are looking for the stack traces to match. if you wrap the below
|
||||
// line again, you'll need to change incrementInitialElement to 3 line numbers instead of 2.
|
||||
submission = getStackTraceAsString(incrementInitialElement(new RuntimeException(), 2)).replaceFirst(".*\n",
|
||||
"");
|
||||
exec.submit(runnableThrowsRTE()).get();
|
||||
} catch (ExecutionException e) {
|
||||
assertTraceHasSubmission(getStackTraceAsString(e), submission);
|
||||
assertTraceHasSubmission(getStackTraceAsString(e.getCause()), submission);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
static void assertTraceHasSubmission(String trace, String expected) {
|
||||
assertTrue(trace.indexOf(WithSubmissionTrace.class.getName()) == -1, trace);
|
||||
assertTrue(trace.indexOf(expected) != -1, trace + " " + expected);
|
||||
}
|
||||
|
||||
static <E extends Exception> E incrementInitialElement(E ex, int lines) {
|
||||
StackTraceElement[] trace = ex.getStackTrace();
|
||||
StackTraceElement initialElement = trace[0];
|
||||
trace[0] = new StackTraceElement(initialElement.getClassName(), initialElement.getMethodName(),
|
||||
initialElement.getFileName(), initialElement.getLineNumber() + lines);
|
||||
ex.setStackTrace(trace);
|
||||
return ex;
|
||||
}
|
||||
|
||||
static Runnable runnableThrowsRTE() {
|
||||
return new Runnable() {
|
||||
|
||||
@Override
|
||||
protected void configure() {
|
||||
bindConstant().annotatedWith(Names.named(Constants.PROPERTY_IO_WORKER_THREADS)).to(1);
|
||||
bindConstant().annotatedWith(Names.named(Constants.PROPERTY_USER_THREADS)).to(1);
|
||||
super.configure();
|
||||
public void run() {
|
||||
throw new RuntimeException();
|
||||
}
|
||||
|
||||
};
|
||||
|
||||
Injector i = Guice.createInjector(module);
|
||||
Closer closer = i.getInstance(Closer.class);
|
||||
|
||||
ExecutorService user = i
|
||||
.getInstance(Key.get(ExecutorService.class, Names.named(Constants.PROPERTY_USER_THREADS)));
|
||||
ExecutorService io = i.getInstance(Key.get(ExecutorService.class, Names
|
||||
.named(Constants.PROPERTY_IO_WORKER_THREADS)));
|
||||
|
||||
ConfigurableRunner t1 = new ConfigurableRunner();
|
||||
t1.result = "okay";
|
||||
|
||||
Future<Object> euc = performSubmissionInSeparateMethod1(user, t1);
|
||||
assert euc.toString().indexOf("ConfigurableRunner") >= 0;
|
||||
assert euc.get().equals("okay");
|
||||
|
||||
Future<Object> eic = performSubmissionInSeparateMethod1(io, t1);
|
||||
assert eic.toString().indexOf("ConfigurableRunner") >= 0;
|
||||
assert eic.get().equals("okay");
|
||||
|
||||
|
||||
closer.close();
|
||||
}
|
||||
|
||||
/*
|
||||
* The decoration makes sure that the stack trace looks like the following.
|
||||
* Note the last three included trace elements: this details where the task was submitted _from_
|
||||
* (technically it is a different stack frame, since it is across threads; but logically it is the same)
|
||||
*
|
||||
java.util.concurrent.ExecutionException: java.lang.IllegalStateException: foo
|
||||
at java.util.concurrent.FutureTask$Sync.innerGet(FutureTask.java:222)
|
||||
at java.util.concurrent.FutureTask.get(FutureTask.java:83)
|
||||
at org.jclouds.concurrent.config.ExecutorServiceModule$DescribedFuture.get(ExecutorServiceModule.java:232)
|
||||
at org.jclouds.concurrent.config.ExecutorServiceModuleTest.checkFutureGetFailsWith(ExecutorServiceModuleTest.java:186)
|
||||
at org.jclouds.concurrent.config.ExecutorServiceModuleTest.testDescribedFutureExceptionIncludesSubmissionTrace(ExecutorServiceModuleTest.java:171)
|
||||
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
|
||||
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
|
||||
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
|
||||
at java.lang.reflect.Method.invoke(Method.java:597)
|
||||
at org.testng.internal.MethodInvocationHelper.invokeMethod(MethodInvocationHelper.java:80)
|
||||
at org.testng.internal.Invoker.invokeMethod(Invoker.java:691)
|
||||
at org.testng.internal.Invoker.invokeTestMethod(Invoker.java:883)
|
||||
at org.testng.internal.Invoker.invokeTestMethods(Invoker.java:1208)
|
||||
at org.testng.internal.TestMethodWorker.invokeTestMethods(TestMethodWorker.java:127)
|
||||
at org.testng.internal.TestMethodWorker.run(TestMethodWorker.java:111)
|
||||
at org.testng.TestRunner.privateRun(TestRunner.java:753)
|
||||
at org.testng.TestRunner.run(TestRunner.java:613)
|
||||
at org.testng.SuiteRunner.runTest(SuiteRunner.java:335)
|
||||
at org.testng.SuiteRunner.runSequentially(SuiteRunner.java:330)
|
||||
at org.testng.SuiteRunner.privateRun(SuiteRunner.java:292)
|
||||
at org.testng.SuiteRunner.run(SuiteRunner.java:241)
|
||||
at org.testng.SuiteRunnerWorker.runSuite(SuiteRunnerWorker.java:52)
|
||||
at org.testng.SuiteRunnerWorker.run(SuiteRunnerWorker.java:86)
|
||||
at org.testng.TestNG.runSuitesSequentially(TestNG.java:1169)
|
||||
at org.testng.TestNG.runSuitesLocally(TestNG.java:1094)
|
||||
at org.testng.TestNG.run(TestNG.java:1006)
|
||||
at org.testng.remote.RemoteTestNG.run(RemoteTestNG.java:107)
|
||||
at org.testng.remote.RemoteTestNG.initAndRun(RemoteTestNG.java:199)
|
||||
at org.testng.remote.RemoteTestNG.main(RemoteTestNG.java:170)
|
||||
Caused by: java.lang.IllegalStateException: foo
|
||||
at org.jclouds.concurrent.config.ExecutorServiceModuleTest$ConfigurableRunner.call(ExecutorServiceModuleTest.java:206)
|
||||
at org.jclouds.concurrent.config.ExecutorServiceModuleTest$ConfigurableRunner.run(ExecutorServiceModuleTest.java:203)
|
||||
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:441)
|
||||
at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
|
||||
at java.util.concurrent.FutureTask.run(FutureTask.java:138)
|
||||
at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
|
||||
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
|
||||
at java.lang.Thread.run(Thread.java:637)
|
||||
at org.jclouds.concurrent.config.ExecutorServiceModule$DescribingExecutorService.submit(ExecutorServiceModule.java:188)
|
||||
at org.jclouds.concurrent.config.ExecutorServiceModuleTest.performSubmissionInSeparateMethod2(ExecutorServiceModuleTest.java:181)
|
||||
at org.jclouds.concurrent.config.ExecutorServiceModuleTest.testDescribedFutureExceptionIncludesSubmissionTrace(ExecutorServiceModuleTest.java:170)
|
||||
... 24 more
|
||||
|
||||
*
|
||||
*/
|
||||
@Test
|
||||
public void testDescribedFutureExceptionIncludesSubmissionTrace() throws Exception {
|
||||
|
||||
ExecutorServiceModule module = new ExecutorServiceModule() {
|
||||
@Override
|
||||
protected void configure() {
|
||||
bindConstant().annotatedWith(Names.named(Constants.PROPERTY_IO_WORKER_THREADS)).to(1);
|
||||
bindConstant().annotatedWith(Names.named(Constants.PROPERTY_USER_THREADS)).to(1);
|
||||
super.configure();
|
||||
}
|
||||
};
|
||||
|
||||
Injector i = Guice.createInjector(module);
|
||||
Closer closer = i.getInstance(Closer.class);
|
||||
|
||||
ExecutorService user = i
|
||||
.getInstance(Key.get(ExecutorService.class, Names.named(Constants.PROPERTY_USER_THREADS)));
|
||||
ExecutorService io = i.getInstance(Key.get(ExecutorService.class, Names
|
||||
.named(Constants.PROPERTY_IO_WORKER_THREADS)));
|
||||
|
||||
ConfigurableRunner t1 = new ConfigurableRunner();
|
||||
t1.failMessage = "foo";
|
||||
t1.result = "shouldn't happen";
|
||||
|
||||
Future<Object> euc = performSubmissionInSeparateMethod1(user, t1);
|
||||
checkFutureGetFailsWith(euc, "foo", "testDescribedFutureExceptionIncludesSubmissionTrace", "performSubmissionInSeparateMethod1");
|
||||
|
||||
Future<Object> eur = performSubmissionInSeparateMethod2(user, t1);
|
||||
checkFutureGetFailsWith(eur, "foo", "testDescribedFutureExceptionIncludesSubmissionTrace", "performSubmissionInSeparateMethod2");
|
||||
|
||||
Future<Object> eic = performSubmissionInSeparateMethod1(io, t1);
|
||||
checkFutureGetFailsWith(eic, "foo", "testDescribedFutureExceptionIncludesSubmissionTrace", "performSubmissionInSeparateMethod1");
|
||||
|
||||
Future<Object> eir = performSubmissionInSeparateMethod2(io, t1);
|
||||
checkFutureGetFailsWith(eir, "foo", "testDescribedFutureExceptionIncludesSubmissionTrace", "performSubmissionInSeparateMethod2");
|
||||
|
||||
closer.close();
|
||||
}
|
||||
|
||||
static Future<Object> performSubmissionInSeparateMethod1(ExecutorService user, ConfigurableRunner t1) {
|
||||
return user.submit((Callable<Object>)t1);
|
||||
}
|
||||
|
||||
static Future<Object> performSubmissionInSeparateMethod2(ExecutorService io, ConfigurableRunner t1) {
|
||||
return io.submit((Runnable)t1, (Object)"shouldn't happen");
|
||||
}
|
||||
|
||||
static void checkFutureGetFailsWith(Future<?> task, String ...requiredPhrases) throws Exception {
|
||||
try {
|
||||
task.get();
|
||||
fail("task should have failed");
|
||||
} catch (ExecutionException e) {
|
||||
String trace = Throwables.getStackTraceAsString(e);
|
||||
for (String requiredPhrase : requiredPhrases) {
|
||||
assert trace.indexOf(requiredPhrase) >= 0 : "stack trace should have contained '"+requiredPhrase+"'";
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
static class ConfigurableRunner implements Runnable, Callable<Object> {
|
||||
Object result;
|
||||
String failMessage;
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
call();
|
||||
}
|
||||
public Object call() {
|
||||
if (failMessage!=null) throw new IllegalStateException(failMessage);
|
||||
return result;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -17,46 +17,66 @@
|
|||
* under the License.
|
||||
*/
|
||||
package org.jclouds.concurrent.config;
|
||||
|
||||
import static com.google.common.base.Throwables.getStackTraceAsString;
|
||||
import static com.google.inject.name.Names.named;
|
||||
import static org.easymock.EasyMock.createMock;
|
||||
import static org.easymock.EasyMock.expect;
|
||||
import static org.easymock.EasyMock.replay;
|
||||
import static org.easymock.EasyMock.verify;
|
||||
import static org.jclouds.concurrent.config.ExecutorServiceModuleTest.checkFutureGetFailsWith;
|
||||
import static org.jclouds.Constants.PROPERTY_SCHEDULER_THREADS;
|
||||
import static org.jclouds.concurrent.config.ExecutorServiceModuleTest.assertTraceHasSubmission;
|
||||
import static org.jclouds.concurrent.config.ExecutorServiceModuleTest.incrementInitialElement;
|
||||
import static org.testng.Assert.assertFalse;
|
||||
import static org.testng.Assert.assertTrue;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.concurrent.Callable;
|
||||
import java.util.concurrent.ScheduledExecutorService;
|
||||
import java.util.concurrent.ScheduledFuture;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import org.easymock.EasyMock;
|
||||
import org.jclouds.Constants;
|
||||
import org.jclouds.concurrent.config.ExecutorServiceModuleTest.ConfigurableRunner;
|
||||
import org.jclouds.lifecycle.Closer;
|
||||
import org.testng.annotations.AfterMethod;
|
||||
import org.testng.annotations.BeforeMethod;
|
||||
import org.testng.annotations.Test;
|
||||
|
||||
import com.google.common.collect.ImmutableList;
|
||||
import com.google.common.util.concurrent.ListeningScheduledExecutorService;
|
||||
import com.google.inject.Guice;
|
||||
import com.google.inject.Injector;
|
||||
import com.google.inject.Key;
|
||||
import com.google.inject.name.Names;
|
||||
|
||||
/**
|
||||
* Unit tests for the {@link ScheduledExecutorServiceModule} class.
|
||||
*
|
||||
* @author Ignasi Barrera
|
||||
*
|
||||
* @see ExecutorServiceModuleTest
|
||||
* @author Adrian Cole
|
||||
*/
|
||||
@Test(groups = "unit")
|
||||
@Test
|
||||
public class ScheduledExecutorServiceModuleTest {
|
||||
|
||||
@Test(groups = "unit")
|
||||
private Injector injector;
|
||||
|
||||
@BeforeMethod
|
||||
private void setupExecutorModule() {
|
||||
ScheduledExecutorServiceModule module = new ScheduledExecutorServiceModule() {
|
||||
@Override
|
||||
protected void configure() {
|
||||
bindConstant().annotatedWith(named(PROPERTY_SCHEDULER_THREADS)).to(1);
|
||||
super.configure();
|
||||
}
|
||||
};
|
||||
|
||||
injector = Guice.createInjector(module);
|
||||
}
|
||||
|
||||
@AfterMethod
|
||||
private void close() throws IOException {
|
||||
injector.getInstance(Closer.class).close();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testShutdownOnClose() throws IOException {
|
||||
Injector i = Guice.createInjector();
|
||||
|
||||
Closer closer = i.getInstance(Closer.class);
|
||||
ScheduledExecutorService executor = EasyMock.createMock(ScheduledExecutorService.class);
|
||||
ListeningScheduledExecutorService executor = createMock(ListeningScheduledExecutorService.class);
|
||||
ExecutorServiceModule.shutdownOnClose(executor, closer);
|
||||
|
||||
expect(executor.shutdownNow()).andReturn(ImmutableList.<Runnable> of()).atLeastOnce();
|
||||
|
@ -67,106 +87,61 @@ public class ScheduledExecutorServiceModuleTest {
|
|||
verify(executor);
|
||||
}
|
||||
|
||||
@Test(groups = "unit")
|
||||
@Test
|
||||
public void testShutdownOnCloseThroughModule() throws IOException {
|
||||
|
||||
ScheduledExecutorServiceModule module = new ScheduledExecutorServiceModule() {
|
||||
ListeningScheduledExecutorService sched = injector.getInstance(Key.get(ListeningScheduledExecutorService.class,
|
||||
named(PROPERTY_SCHEDULER_THREADS)));
|
||||
|
||||
assertFalse(sched.isShutdown());
|
||||
|
||||
injector.getInstance(Closer.class).close();
|
||||
|
||||
assertTrue(sched.isShutdown());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testExceptionInSubmitRunnableIncludesSubmissionTrace() throws Exception {
|
||||
ListeningScheduledExecutorService sched = injector.getInstance(Key.get(ListeningScheduledExecutorService.class,
|
||||
named(PROPERTY_SCHEDULER_THREADS)));
|
||||
String submission = null;
|
||||
try {
|
||||
// this is sensitive to formatting as we are looking for the stack traces to match. if you wrap the below
|
||||
// line again, you'll need to change incrementInitialElement to 3 line numbers instead of 2.
|
||||
submission = getStackTraceAsString(incrementInitialElement(new RuntimeException(), 2))
|
||||
.replaceFirst(".*\n", "");
|
||||
sched.submit(runnableThrowsRTE()).get();
|
||||
} catch (ExecutionException e) {
|
||||
assertTraceHasSubmission(getStackTraceAsString(e), submission);
|
||||
assertTraceHasSubmission(getStackTraceAsString(e.getCause()), submission);
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testExceptionInScheduleWithFixedDelayRunnableIncludesSubmissionTrace() throws Exception {
|
||||
ListeningScheduledExecutorService sched = injector.getInstance(Key.get(ListeningScheduledExecutorService.class,
|
||||
named(PROPERTY_SCHEDULER_THREADS)));
|
||||
String submission = null;
|
||||
try {
|
||||
// this is sensitive to formatting as we are looking for the stack traces to match. if you wrap the below
|
||||
// line again, you'll need to change incrementInitialElement to 3 line numbers instead of 2.
|
||||
submission = getStackTraceAsString(incrementInitialElement(new RuntimeException(), 2))
|
||||
.replaceFirst(".*\n", "");
|
||||
sched.scheduleWithFixedDelay(runnableThrowsRTE(), 0, 1, TimeUnit.MICROSECONDS).get();
|
||||
} catch (ExecutionException e) {
|
||||
assertTraceHasSubmission(getStackTraceAsString(e), submission);
|
||||
assertTraceHasSubmission(getStackTraceAsString(e.getCause()), submission);
|
||||
}
|
||||
}
|
||||
|
||||
static Runnable runnableThrowsRTE() {
|
||||
return new Runnable() {
|
||||
|
||||
@Override
|
||||
protected void configure() {
|
||||
bindConstant().annotatedWith(Names.named(Constants.PROPERTY_SCHEDULER_THREADS)).to(1);
|
||||
super.configure();
|
||||
public void run() {
|
||||
throw new RuntimeException();
|
||||
}
|
||||
|
||||
};
|
||||
|
||||
Injector i = Guice.createInjector(module);
|
||||
Closer closer = i.getInstance(Closer.class);
|
||||
|
||||
ScheduledExecutorService sched = i.getInstance(Key.get(ScheduledExecutorService.class, Names
|
||||
.named(Constants.PROPERTY_SCHEDULER_THREADS)));
|
||||
|
||||
assert !sched.isShutdown();
|
||||
|
||||
closer.close();
|
||||
|
||||
assert sched.isShutdown();
|
||||
}
|
||||
|
||||
@Test(groups = "unit")
|
||||
public void testDescribedFutureToString() throws Exception {
|
||||
|
||||
ScheduledExecutorServiceModule module = new ScheduledExecutorServiceModule() {
|
||||
@Override
|
||||
protected void configure() {
|
||||
bindConstant().annotatedWith(Names.named(Constants.PROPERTY_SCHEDULER_THREADS)).to(1);
|
||||
super.configure();
|
||||
}
|
||||
};
|
||||
|
||||
Injector i = Guice.createInjector(module);
|
||||
Closer closer = i.getInstance(Closer.class);
|
||||
|
||||
ScheduledExecutorService sched = i.getInstance(Key.get(ScheduledExecutorService.class, Names
|
||||
.named(Constants.PROPERTY_SCHEDULER_THREADS)));
|
||||
|
||||
ConfigurableRunner t1 = new ConfigurableRunner();
|
||||
t1.result = "okay";
|
||||
|
||||
ScheduledFuture<Object> esc = performScheduleInSeparateMethod1(sched, t1);
|
||||
assert esc.toString().indexOf("ConfigurableRunner") >= 0;
|
||||
assert esc.get().equals("okay");
|
||||
|
||||
closer.close();
|
||||
}
|
||||
|
||||
@Test(groups = "unit")
|
||||
public void testDescribedFutureExceptionIncludesSubmissionTrace() throws Exception {
|
||||
|
||||
ScheduledExecutorServiceModule module = new ScheduledExecutorServiceModule() {
|
||||
@Override
|
||||
protected void configure() {
|
||||
bindConstant().annotatedWith(Names.named(Constants.PROPERTY_SCHEDULER_THREADS)).to(1);
|
||||
super.configure();
|
||||
}
|
||||
};
|
||||
|
||||
Injector i = Guice.createInjector(module);
|
||||
Closer closer = i.getInstance(Closer.class);
|
||||
|
||||
ScheduledExecutorService sched = i.getInstance(Key.get(ScheduledExecutorService.class, Names
|
||||
.named(Constants.PROPERTY_SCHEDULER_THREADS)));
|
||||
|
||||
ConfigurableRunner t1 = new ConfigurableRunner();
|
||||
t1.failMessage = "foo";
|
||||
t1.result = "shouldn't happen";
|
||||
|
||||
ScheduledFuture<Object> esc = performScheduleInSeparateMethod1(sched, t1);
|
||||
checkFutureGetFailsWith(esc, "foo", "testDescribedFutureExceptionIncludesSubmissionTrace", "performScheduleInSeparateMethod1");
|
||||
|
||||
ScheduledFuture<?> esr = performScheduleInSeparateMethod2(sched, t1);
|
||||
checkFutureGetFailsWith(esr, "foo", "testDescribedFutureExceptionIncludesSubmissionTrace", "performScheduleInSeparateMethod2");
|
||||
|
||||
ScheduledFuture<?> esfr = performScheduleInSeparateMethod3(sched, t1);
|
||||
checkFutureGetFailsWith(esfr, "foo", "testDescribedFutureExceptionIncludesSubmissionTrace", "performScheduleInSeparateMethod3");
|
||||
|
||||
ScheduledFuture<?> esfd = performScheduleInSeparateMethod4(sched, t1);
|
||||
checkFutureGetFailsWith(esfd, "foo", "testDescribedFutureExceptionIncludesSubmissionTrace", "performScheduleInSeparateMethod4");
|
||||
|
||||
closer.close();
|
||||
}
|
||||
|
||||
static ScheduledFuture<Object> performScheduleInSeparateMethod1(ScheduledExecutorService sched, ConfigurableRunner t1) {
|
||||
return sched.schedule((Callable<Object>)t1, 0, TimeUnit.SECONDS);
|
||||
}
|
||||
|
||||
static ScheduledFuture<?> performScheduleInSeparateMethod2(ScheduledExecutorService sched, ConfigurableRunner t1) {
|
||||
return sched.schedule((Runnable)t1, 0, TimeUnit.SECONDS);
|
||||
}
|
||||
|
||||
static ScheduledFuture<?> performScheduleInSeparateMethod3(ScheduledExecutorService sched, ConfigurableRunner t1) {
|
||||
return sched.scheduleAtFixedRate((Runnable)t1, 0, 1, TimeUnit.SECONDS);
|
||||
}
|
||||
|
||||
static ScheduledFuture<?> performScheduleInSeparateMethod4(ScheduledExecutorService sched, ConfigurableRunner t1) {
|
||||
return sched.scheduleWithFixedDelay((Runnable)t1, 0, 1, TimeUnit.SECONDS);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -46,7 +46,7 @@ public class EventBusModuleTest {
|
|||
|
||||
@BeforeClass
|
||||
public void setup() {
|
||||
ExecutorServiceModule executorServiceModule = new ExecutorServiceModule() {
|
||||
ExecutorServiceModule userExecutorModule = new ExecutorServiceModule() {
|
||||
@Override
|
||||
protected void configure() {
|
||||
bindConstant().annotatedWith(Names.named(Constants.PROPERTY_IO_WORKER_THREADS)).to(1);
|
||||
|
@ -55,7 +55,7 @@ public class EventBusModuleTest {
|
|||
}
|
||||
};
|
||||
EventBusModule eventBusModule = new EventBusModule();
|
||||
injector = Guice.createInjector(executorServiceModule, eventBusModule);
|
||||
injector = Guice.createInjector(userExecutorModule, eventBusModule);
|
||||
}
|
||||
|
||||
public void testAsyncExecutorIsProvided() {
|
||||
|
|
|
@ -24,33 +24,17 @@ import static org.testng.Assert.assertTrue;
|
|||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.util.Properties;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
|
||||
import javax.net.ssl.HostnameVerifier;
|
||||
import javax.net.ssl.SSLContext;
|
||||
import javax.net.ssl.SSLSession;
|
||||
|
||||
import org.jclouds.date.internal.DateServiceDateCodecFactory;
|
||||
import org.jclouds.date.internal.SimpleDateFormatDateService;
|
||||
import org.jclouds.http.BaseJettyTest;
|
||||
import org.jclouds.http.HttpCommand;
|
||||
import org.jclouds.http.HttpCommandExecutorService;
|
||||
import org.jclouds.http.HttpResponse;
|
||||
import org.jclouds.http.HttpUtils;
|
||||
import org.jclouds.http.IntegrationTestAsyncClient;
|
||||
import org.jclouds.http.internal.HttpWire;
|
||||
import org.jclouds.http.internal.JavaUrlHttpCommandExecutorService;
|
||||
import org.jclouds.io.ContentMetadataCodec;
|
||||
import org.jclouds.io.ContentMetadataCodec.DefaultContentMetadataCodec;
|
||||
import org.jclouds.io.Payloads;
|
||||
import com.google.common.reflect.Invokable;
|
||||
import org.jclouds.rest.internal.RestAnnotationProcessor;
|
||||
import org.testng.annotations.BeforeTest;
|
||||
import org.testng.annotations.Test;
|
||||
|
||||
import com.google.common.base.Supplier;
|
||||
import com.google.common.collect.ImmutableList;
|
||||
import com.google.common.reflect.Invokable;
|
||||
import com.google.inject.Key;
|
||||
import com.google.inject.TypeLiteral;
|
||||
|
||||
|
@ -96,35 +80,6 @@ public class BackoffLimitedRetryHandlerTest {
|
|||
|
||||
}
|
||||
|
||||
HttpCommandExecutorService http;
|
||||
|
||||
@BeforeTest
|
||||
void setupExecutorService() throws Exception {
|
||||
ExecutorService execService = Executors.newCachedThreadPool();
|
||||
BackoffLimitedRetryHandler backoff = new BackoffLimitedRetryHandler();
|
||||
HttpUtils utils = new HttpUtils(0, 500, 1, 1);
|
||||
ContentMetadataCodec contentMetadataCodec = new DefaultContentMetadataCodec(new DateServiceDateCodecFactory(
|
||||
new SimpleDateFormatDateService()));
|
||||
RedirectionRetryHandler retry = new RedirectionRetryHandler(backoff);
|
||||
http = new JavaUrlHttpCommandExecutorService(utils,
|
||||
contentMetadataCodec, execService,
|
||||
new DelegatingRetryHandler(backoff, retry), new BackoffLimitedRetryHandler(),
|
||||
new DelegatingErrorHandler(), new HttpWire(), new HostnameVerifier() {
|
||||
|
||||
@Override
|
||||
public boolean verify(String hostname, SSLSession session) {
|
||||
return false;
|
||||
}
|
||||
}, new Supplier<SSLContext>() {
|
||||
|
||||
@Override
|
||||
public SSLContext get() {
|
||||
return null;
|
||||
}
|
||||
|
||||
});
|
||||
}
|
||||
|
||||
@Test
|
||||
void testClosesInputStream() throws InterruptedException, IOException, SecurityException, NoSuchMethodException {
|
||||
HttpCommand command = createCommand();
|
||||
|
|
|
@ -20,8 +20,6 @@ package org.jclouds.http.internal;
|
|||
|
||||
import java.util.Collection;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Future;
|
||||
|
||||
import javax.inject.Inject;
|
||||
import javax.inject.Named;
|
||||
|
@ -42,6 +40,8 @@ import org.jclouds.rest.internal.GeneratedHttpRequest;
|
|||
import com.google.common.base.Supplier;
|
||||
import com.google.common.collect.Iterables;
|
||||
import com.google.common.reflect.Invokable;
|
||||
import com.google.common.util.concurrent.ListenableFuture;
|
||||
import com.google.common.util.concurrent.ListeningExecutorService;
|
||||
import com.google.inject.AbstractModule;
|
||||
import com.google.inject.Module;
|
||||
import com.google.inject.TypeLiteral;
|
||||
|
@ -88,18 +88,18 @@ public class TrackingJavaUrlHttpCommandExecutorService extends JavaUrlHttpComman
|
|||
|
||||
@Inject
|
||||
public TrackingJavaUrlHttpCommandExecutorService(HttpUtils utils, ContentMetadataCodec contentMetadataCodec,
|
||||
@Named(Constants.PROPERTY_IO_WORKER_THREADS) ExecutorService ioWorkerExecutor,
|
||||
@Named(Constants.PROPERTY_IO_WORKER_THREADS) ListeningExecutorService ioExecutor,
|
||||
DelegatingRetryHandler retryHandler, IOExceptionRetryHandler ioRetryHandler,
|
||||
DelegatingErrorHandler errorHandler, HttpWire wire, @Named("untrusted") HostnameVerifier verifier,
|
||||
@Named("untrusted") Supplier<SSLContext> untrustedSSLContextProvider, List<HttpCommand> commandsInvoked)
|
||||
throws SecurityException, NoSuchFieldException {
|
||||
super(utils, contentMetadataCodec, ioWorkerExecutor, retryHandler, ioRetryHandler, errorHandler, wire, verifier,
|
||||
super(utils, contentMetadataCodec, ioExecutor, retryHandler, ioRetryHandler, errorHandler, wire, verifier,
|
||||
untrustedSSLContextProvider);
|
||||
this.commandsInvoked = commandsInvoked;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Future<HttpResponse> submit(HttpCommand command) {
|
||||
public ListenableFuture<HttpResponse> submit(HttpCommand command) {
|
||||
commandsInvoked.add(command);
|
||||
return super.submit(command);
|
||||
}
|
||||
|
|
|
@ -18,24 +18,24 @@
|
|||
*/
|
||||
package org.jclouds.lifecycle.config;
|
||||
|
||||
import static com.google.inject.name.Names.named;
|
||||
import static org.jclouds.Constants.PROPERTY_IO_WORKER_THREADS;
|
||||
import static org.jclouds.Constants.PROPERTY_USER_THREADS;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
|
||||
import javax.annotation.PostConstruct;
|
||||
import javax.inject.Named;
|
||||
|
||||
import org.jclouds.Constants;
|
||||
import org.jclouds.concurrent.config.ExecutorServiceModule;
|
||||
import org.jclouds.lifecycle.Closer;
|
||||
import org.testng.annotations.Test;
|
||||
|
||||
import com.google.common.util.concurrent.ExecutionList;
|
||||
import com.google.common.util.concurrent.ListeningExecutorService;
|
||||
import com.google.inject.AbstractModule;
|
||||
import com.google.inject.Guice;
|
||||
import com.google.inject.Injector;
|
||||
import com.google.inject.Key;
|
||||
import com.google.inject.Provides;
|
||||
import com.google.inject.name.Names;
|
||||
|
||||
/**
|
||||
*
|
||||
|
@ -47,35 +47,17 @@ public class LifeCycleModuleTest {
|
|||
@Test
|
||||
void testBindsExecutor() {
|
||||
Injector i = createInjector();
|
||||
assert i.getInstance(Key.get(ExecutorService.class, Names
|
||||
.named(Constants.PROPERTY_USER_THREADS))) != null;
|
||||
assert i.getInstance(Key.get(ExecutorService.class, Names
|
||||
.named(Constants.PROPERTY_IO_WORKER_THREADS))) != null;
|
||||
assert i.getInstance(Key.get(ListeningExecutorService.class, named(PROPERTY_USER_THREADS))) != null;
|
||||
assert i.getInstance(Key.get(ListeningExecutorService.class, named(PROPERTY_IO_WORKER_THREADS))) != null;
|
||||
}
|
||||
|
||||
private Injector createInjector() {
|
||||
Injector i = Guice.createInjector(new LifeCycleModule() {
|
||||
@SuppressWarnings("unused")
|
||||
@Provides
|
||||
@Named(Constants.PROPERTY_USER_THREADS)
|
||||
int p() {
|
||||
return 1;
|
||||
Injector i = Guice.createInjector(new AbstractModule() {
|
||||
protected void configure() {
|
||||
bindConstant().annotatedWith(named(PROPERTY_IO_WORKER_THREADS)).to(1);
|
||||
bindConstant().annotatedWith(named(PROPERTY_USER_THREADS)).to(1);
|
||||
}
|
||||
|
||||
@SuppressWarnings("unused")
|
||||
@Provides
|
||||
@Named(Constants.PROPERTY_MAX_CONNECTIONS_PER_CONTEXT)
|
||||
int p2() {
|
||||
return 1;
|
||||
}
|
||||
|
||||
@SuppressWarnings("unused")
|
||||
@Provides
|
||||
@Named(Constants.PROPERTY_IO_WORKER_THREADS)
|
||||
int p3() {
|
||||
return 1;
|
||||
}
|
||||
}, new ExecutorServiceModule());
|
||||
}, new LifeCycleModule(), new ExecutorServiceModule());
|
||||
// TODO: currently have to manually invoke the execution list, as otherwise it may occur
|
||||
// before everything is wired up
|
||||
i.getInstance(ExecutionList.class).execute();
|
||||
|
@ -91,8 +73,8 @@ public class LifeCycleModuleTest {
|
|||
@Test
|
||||
void testCloserClosesExecutor() throws IOException {
|
||||
Injector i = createInjector();
|
||||
ExecutorService executor = i.getInstance(Key.get(ExecutorService.class, Names
|
||||
.named(Constants.PROPERTY_USER_THREADS)));
|
||||
ListeningExecutorService executor = i.getInstance(Key.get(ListeningExecutorService.class,
|
||||
named(PROPERTY_USER_THREADS)));
|
||||
assert !executor.isShutdown();
|
||||
Closer closer = i.getInstance(Closer.class);
|
||||
closer.close();
|
||||
|
@ -102,16 +84,16 @@ public class LifeCycleModuleTest {
|
|||
@Test
|
||||
void testCloserPreDestroyOrder() throws IOException {
|
||||
Injector i = createInjector();
|
||||
ExecutorService userThreads = i.getInstance(Key.get(ExecutorService.class, Names
|
||||
.named(Constants.PROPERTY_USER_THREADS)));
|
||||
assert !userThreads.isShutdown();
|
||||
ExecutorService ioThreads = i.getInstance(Key.get(ExecutorService.class, Names
|
||||
.named(Constants.PROPERTY_IO_WORKER_THREADS)));
|
||||
assert !ioThreads.isShutdown();
|
||||
ListeningExecutorService userExecutor = i.getInstance(Key.get(ListeningExecutorService.class,
|
||||
named(PROPERTY_USER_THREADS)));
|
||||
assert !userExecutor.isShutdown();
|
||||
ListeningExecutorService ioExecutor = i.getInstance(Key.get(ListeningExecutorService.class,
|
||||
named(PROPERTY_IO_WORKER_THREADS)));
|
||||
assert !ioExecutor.isShutdown();
|
||||
Closer closer = i.getInstance(Closer.class);
|
||||
closer.close();
|
||||
assert userThreads.isShutdown();
|
||||
assert ioThreads.isShutdown();
|
||||
assert userExecutor.isShutdown();
|
||||
assert ioExecutor.isShutdown();
|
||||
}
|
||||
|
||||
static class PostConstructable {
|
||||
|
|
|
@ -34,7 +34,6 @@ import java.util.Map.Entry;
|
|||
import java.util.NoSuchElementException;
|
||||
import java.util.Properties;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.logging.Logger;
|
||||
|
||||
|
@ -84,6 +83,7 @@ import com.google.common.collect.ImmutableList;
|
|||
import com.google.common.collect.ImmutableMap;
|
||||
import com.google.common.collect.ImmutableSet;
|
||||
import com.google.common.io.InputSupplier;
|
||||
import com.google.common.util.concurrent.ListeningExecutorService;
|
||||
import com.google.gson.JsonElement;
|
||||
import com.google.gson.JsonParser;
|
||||
import com.google.inject.AbstractModule;
|
||||
|
@ -195,7 +195,7 @@ public abstract class BaseRestApiExpectTest<S> {
|
|||
@Inject
|
||||
public ExpectHttpCommandExecutorService(Function<HttpRequest, HttpResponse> fn, HttpUtils utils,
|
||||
ContentMetadataCodec contentMetadataCodec,
|
||||
@Named(PROPERTY_IO_WORKER_THREADS) ExecutorService ioExecutor,
|
||||
@Named(PROPERTY_IO_WORKER_THREADS) ListeningExecutorService ioExecutor,
|
||||
IOExceptionRetryHandler ioRetryHandler, DelegatingRetryHandler retryHandler,
|
||||
DelegatingErrorHandler errorHandler, HttpWire wire) {
|
||||
super(utils, contentMetadataCodec, ioExecutor, retryHandler, ioRetryHandler, errorHandler, wire);
|
||||
|
@ -230,8 +230,8 @@ public abstract class BaseRestApiExpectTest<S> {
|
|||
|
||||
@Override
|
||||
public void configure() {
|
||||
bind(ExecutorService.class).annotatedWith(named(PROPERTY_USER_THREADS)).toInstance(sameThreadExecutor());
|
||||
bind(ExecutorService.class).annotatedWith(named(PROPERTY_IO_WORKER_THREADS)).toInstance(sameThreadExecutor());
|
||||
bind(ListeningExecutorService.class).annotatedWith(named(PROPERTY_USER_THREADS)).toInstance(sameThreadExecutor());
|
||||
bind(ListeningExecutorService.class).annotatedWith(named(PROPERTY_IO_WORKER_THREADS)).toInstance(sameThreadExecutor());
|
||||
bind(new TypeLiteral<Function<HttpRequest, HttpResponse>>() {
|
||||
}).toInstance(fn);
|
||||
bind(HttpCommandExecutorService.class).to(ExpectHttpCommandExecutorService.class);
|
||||
|
|
|
@ -32,7 +32,6 @@ import static org.testng.Assert.assertNull;
|
|||
import java.io.IOException;
|
||||
import java.util.Date;
|
||||
import java.util.Map.Entry;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
|
||||
import org.jclouds.concurrent.config.ConfiguresExecutorService;
|
||||
import org.jclouds.fallbacks.MapHttp4xxCodesToExceptions;
|
||||
|
@ -53,6 +52,7 @@ import com.google.common.collect.TreeMultimap;
|
|||
import com.google.common.reflect.Invokable;
|
||||
import com.google.common.reflect.TypeParameter;
|
||||
import com.google.common.reflect.TypeToken;
|
||||
import com.google.common.util.concurrent.ListeningExecutorService;
|
||||
import com.google.inject.AbstractModule;
|
||||
import com.google.inject.Injector;
|
||||
import com.google.inject.Key;
|
||||
|
@ -82,8 +82,8 @@ public abstract class BaseRestApiTest {
|
|||
|
||||
@Override
|
||||
protected void configure() {
|
||||
bind(ExecutorService.class).annotatedWith(named(PROPERTY_USER_THREADS)).toInstance(sameThreadExecutor());
|
||||
bind(ExecutorService.class).annotatedWith(named(PROPERTY_IO_WORKER_THREADS)).toInstance(sameThreadExecutor());
|
||||
bind(ListeningExecutorService.class).annotatedWith(named(PROPERTY_USER_THREADS)).toInstance(sameThreadExecutor());
|
||||
bind(ListeningExecutorService.class).annotatedWith(named(PROPERTY_IO_WORKER_THREADS)).toInstance(sameThreadExecutor());
|
||||
bind(HttpCommandExecutorService.class).toInstance(mock);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -45,7 +45,6 @@ import java.util.Map;
|
|||
import java.util.NoSuchElementException;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.Future;
|
||||
|
||||
import javax.inject.Named;
|
||||
import javax.inject.Qualifier;
|
||||
|
@ -94,7 +93,6 @@ import org.jclouds.logging.config.NullLoggingModule;
|
|||
import org.jclouds.providers.AnonymousProviderMetadata;
|
||||
import org.jclouds.reflect.Invocation;
|
||||
import org.jclouds.reflect.InvocationSuccess;
|
||||
import com.google.common.reflect.Invokable;
|
||||
import org.jclouds.rest.AuthorizationException;
|
||||
import org.jclouds.rest.ConfiguresRestClient;
|
||||
import org.jclouds.rest.InvocationContext;
|
||||
|
@ -144,6 +142,7 @@ import com.google.common.collect.ImmutableSortedSet;
|
|||
import com.google.common.collect.Lists;
|
||||
import com.google.common.collect.Multimap;
|
||||
import com.google.common.io.Files;
|
||||
import com.google.common.reflect.Invokable;
|
||||
import com.google.common.reflect.TypeToken;
|
||||
import com.google.common.util.concurrent.Futures;
|
||||
import com.google.common.util.concurrent.ListenableFuture;
|
||||
|
@ -255,7 +254,7 @@ public class RestAnnotationProcessorTest extends BaseRestApiTest {
|
|||
Injector child = injectorForCaller(new HttpCommandExecutorService() {
|
||||
|
||||
@Override
|
||||
public Future<HttpResponse> submit(HttpCommand command) {
|
||||
public ListenableFuture<HttpResponse> submit(HttpCommand command) {
|
||||
assertEquals(command.getCurrentRequest().getRequestLine(),
|
||||
"GET http://localhost:9999/client/1/foo HTTP/1.1");
|
||||
return Futures.immediateFuture(HttpResponse.builder().build());
|
||||
|
@ -280,7 +279,7 @@ public class RestAnnotationProcessorTest extends BaseRestApiTest {
|
|||
int callCounter = 0;
|
||||
|
||||
@Override
|
||||
public Future<HttpResponse> submit(HttpCommand command) {
|
||||
public ListenableFuture<HttpResponse> submit(HttpCommand command) {
|
||||
if (callCounter == 1)
|
||||
assertEquals(command.getCurrentRequest().getRequestLine(),
|
||||
"GET http://localhost:1111/client/1/bar/2 HTTP/1.1");
|
||||
|
@ -310,7 +309,7 @@ public class RestAnnotationProcessorTest extends BaseRestApiTest {
|
|||
Injector child = injectorForCaller(new HttpCommandExecutorService() {
|
||||
|
||||
@Override
|
||||
public Future<HttpResponse> submit(HttpCommand command) {
|
||||
public ListenableFuture<HttpResponse> submit(HttpCommand command) {
|
||||
assertEquals(command.getCurrentRequest().getRequestLine(), "GET http://howdyboys/client/1/foo HTTP/1.1");
|
||||
return Futures.immediateFuture(HttpResponse.builder().build());
|
||||
}
|
||||
|
@ -335,7 +334,7 @@ public class RestAnnotationProcessorTest extends BaseRestApiTest {
|
|||
Injector child = injectorForCaller(new HttpCommandExecutorService() {
|
||||
|
||||
@Override
|
||||
public Future<HttpResponse> submit(HttpCommand command) {
|
||||
public ListenableFuture<HttpResponse> submit(HttpCommand command) {
|
||||
assertEquals(command.getCurrentRequest().getRequestLine(),
|
||||
"GET http://howdyboys/testing/testing/thepathparam/client/1/foo HTTP/1.1");
|
||||
return Futures.immediateFuture(HttpResponse.builder().build());
|
||||
|
@ -361,7 +360,7 @@ public class RestAnnotationProcessorTest extends BaseRestApiTest {
|
|||
Injector child = injectorForCaller(new HttpCommandExecutorService() {
|
||||
|
||||
@Override
|
||||
public Future<HttpResponse> submit(HttpCommand command) {
|
||||
public ListenableFuture<HttpResponse> submit(HttpCommand command) {
|
||||
assertEquals(command.getCurrentRequest().getRequestLine(), "GET http://howdyboys/client/1/foo HTTP/1.1");
|
||||
return Futures.immediateFuture(HttpResponse.builder().build());
|
||||
}
|
||||
|
@ -386,7 +385,7 @@ public class RestAnnotationProcessorTest extends BaseRestApiTest {
|
|||
Injector child = injectorForCaller(new HttpCommandExecutorService() {
|
||||
|
||||
@Override
|
||||
public Future<HttpResponse> submit(HttpCommand command) {
|
||||
public ListenableFuture<HttpResponse> submit(HttpCommand command) {
|
||||
assertEquals(command.getCurrentRequest().getRequestLine(), "GET http://howdyboys/client/1/foo HTTP/1.1");
|
||||
return Futures.immediateFuture(HttpResponse.builder().build());
|
||||
}
|
||||
|
@ -425,7 +424,7 @@ public class RestAnnotationProcessorTest extends BaseRestApiTest {
|
|||
Injector child = injectorForCaller(new HttpCommandExecutorService() {
|
||||
|
||||
@Override
|
||||
public Future<HttpResponse> submit(HttpCommand command) {
|
||||
public ListenableFuture<HttpResponse> submit(HttpCommand command) {
|
||||
assertEquals(command.getCurrentRequest().getRequestLine(), "GET http://howdyboys/client/1/foo HTTP/1.1");
|
||||
return Futures.immediateFuture(HttpResponse.builder().build());
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue