Issue 471: set toString on futures

This commit is contained in:
Adrian Cole 2011-04-01 13:58:32 -07:00
parent ddbd6fc1d8
commit eb4a8393fb
2 changed files with 174 additions and 14 deletions

View File

@ -62,8 +62,10 @@ public class Futures {
// This thread was interrupted. This should never happen, so we // This thread was interrupted. This should never happen, so we
// throw an IllegalStateException. // throw an IllegalStateException.
Thread.currentThread().interrupt(); Thread.currentThread().interrupt();
// TODO we cannot inspect the executionList at the moment to make a reasonable
// toString()
throw new IllegalStateException(String.format( throw new IllegalStateException(String.format(
"interrupted calling get() on [%s], so could not run listeners", delegate), e); "interrupted calling get() on [%s], so could not run listeners", delegate), e);
} catch (Throwable e) { } catch (Throwable e) {
// ExecutionException / CancellationException / RuntimeException // ExecutionException / CancellationException / RuntimeException
// The task is done, run the listeners. // The task is done, run the listeners.
@ -155,27 +157,27 @@ public class Futures {
} }
public static class LazyListenableFutureFunctionAdapter<I, O> extends ForwardingObject implements public static class LazyListenableFutureFunctionAdapter<I, O> extends ForwardingObject implements
ListenableFuture<O> { ListenableFuture<O> {
private final FutureListener<I> futureListener; private final FutureListener<I> futureListener;
private final Function<? super I, ? extends O> function; private final Function<? super I, ? extends O> function;
static <I, O> LazyListenableFutureFunctionAdapter<I, O> create(Future<I> future, static <I, O> LazyListenableFutureFunctionAdapter<I, O> create(Future<I> future,
Function<? super I, ? extends O> function, ExecutorService executor) { Function<? super I, ? extends O> function, ExecutorService executor) {
return new LazyListenableFutureFunctionAdapter<I, O>(future, function, executor); return new LazyListenableFutureFunctionAdapter<I, O>(future, function, executor);
} }
static <I, O> LazyListenableFutureFunctionAdapter<I, O> create(FutureListener<I> futureListener, static <I, O> LazyListenableFutureFunctionAdapter<I, O> create(FutureListener<I> futureListener,
Function<? super I, ? extends O> function) { Function<? super I, ? extends O> function) {
return new LazyListenableFutureFunctionAdapter<I, O>(futureListener, function); return new LazyListenableFutureFunctionAdapter<I, O>(futureListener, function);
} }
private LazyListenableFutureFunctionAdapter(Future<I> future, Function<? super I, ? extends O> function, private LazyListenableFutureFunctionAdapter(Future<I> future, Function<? super I, ? extends O> function,
ExecutorService executor) { ExecutorService executor) {
this(FutureListener.create(future, executor), function); this(FutureListener.create(future, executor), function);
} }
private LazyListenableFutureFunctionAdapter(FutureListener<I> futureListener, private LazyListenableFutureFunctionAdapter(FutureListener<I> futureListener,
Function<? super I, ? extends O> function) { Function<? super I, ? extends O> function) {
this.futureListener = checkNotNull(futureListener, "futureListener"); this.futureListener = checkNotNull(futureListener, "futureListener");
this.function = checkNotNull(function, "function"); this.function = checkNotNull(function, "function");
} }
@ -255,19 +257,19 @@ public class Futures {
* chaining, so that we don't invoke get() early. * chaining, so that we don't invoke get() early.
*/ */
public static <I, O> ListenableFuture<O> compose(Future<I> future, final Function<? super I, ? extends O> function, public static <I, O> ListenableFuture<O> compose(Future<I> future, final Function<? super I, ? extends O> function,
ExecutorService executorService) { ExecutorService executorService) {
if (future instanceof Futures.ListenableFutureAdapter<?>) { if (future instanceof Futures.ListenableFutureAdapter<?>) {
Futures.ListenableFutureAdapter<I> lf = (ListenableFutureAdapter<I>) future; Futures.ListenableFutureAdapter<I> lf = (ListenableFutureAdapter<I>) future;
if (lf.futureListener.adapterExecutor.getClass().isAnnotationPresent(SingleThreaded.class)) if (lf.futureListener.adapterExecutor.getClass().isAnnotationPresent(SingleThreaded.class))
return Futures.LazyListenableFutureFunctionAdapter.create( return Futures.LazyListenableFutureFunctionAdapter.create(
((org.jclouds.concurrent.Futures.ListenableFutureAdapter<I>) future).futureListener, function); ((org.jclouds.concurrent.Futures.ListenableFutureAdapter<I>) future).futureListener, function);
else else
return com.google.common.util.concurrent.Futures.compose(lf, function, executorService); return com.google.common.util.concurrent.Futures.compose(lf, function, executorService);
} else if (executorService.getClass().isAnnotationPresent(SingleThreaded.class)) { } else if (executorService.getClass().isAnnotationPresent(SingleThreaded.class)) {
return Futures.LazyListenableFutureFunctionAdapter.create(future, function, executorService); return Futures.LazyListenableFutureFunctionAdapter.create(future, function, executorService);
} else { } else {
return com.google.common.util.concurrent.Futures.compose(Futures.makeListenable(future, executorService), return com.google.common.util.concurrent.Futures.compose(Futures.makeListenable(future, executorService),
function, executorService); function, executorService);
} }
} }

View File

@ -19,13 +19,20 @@
package org.jclouds.concurrent.config; package org.jclouds.concurrent.config;
import static com.google.common.base.Preconditions.checkNotNull;
import static org.jclouds.concurrent.DynamicExecutors.newScalingThreadPool; import static org.jclouds.concurrent.DynamicExecutors.newScalingThreadPool;
import java.io.Closeable; import java.io.Closeable;
import java.io.IOException; import java.io.IOException;
import java.util.Collection;
import java.util.List; import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import javax.annotation.Resource; import javax.annotation.Resource;
import javax.inject.Inject; import javax.inject.Inject;
@ -80,11 +87,18 @@ public class ExecutorServiceModule extends AbstractModule {
@Inject @Inject
public ExecutorServiceModule(@Named(Constants.PROPERTY_USER_THREADS) ExecutorService userThreads, public ExecutorServiceModule(@Named(Constants.PROPERTY_USER_THREADS) ExecutorService userThreads,
@Named(Constants.PROPERTY_IO_WORKER_THREADS) ExecutorService ioThreads) { @Named(Constants.PROPERTY_IO_WORKER_THREADS) ExecutorService ioThreads) {
this.userExecutorFromConstructor = checkNotGuavaSameThreadExecutor(userThreads); this.userExecutorFromConstructor = addToStringOnSubmit(checkNotGuavaSameThreadExecutor(userThreads));
this.ioExecutorFromConstructor = checkNotGuavaSameThreadExecutor(ioThreads); this.ioExecutorFromConstructor = addToStringOnSubmit(checkNotGuavaSameThreadExecutor(ioThreads));
} }
private ExecutorService checkNotGuavaSameThreadExecutor(ExecutorService executor) { static ExecutorService addToStringOnSubmit(ExecutorService executor) {
if (executor != null) {
return new AddToStringOnSubmitExecutorService(executor);
}
return executor;
}
static ExecutorService checkNotGuavaSameThreadExecutor(ExecutorService executor) {
// we detect behavior based on the class // we detect behavior based on the class
if (executor != null && !(executor.getClass().isAnnotationPresent(SingleThreaded.class)) if (executor != null && !(executor.getClass().isAnnotationPresent(SingleThreaded.class))
&& executor.getClass().getSimpleName().indexOf("SameThread") != -1) { && executor.getClass().getSimpleName().indexOf("SameThread") != -1) {
@ -104,13 +118,157 @@ public class ExecutorServiceModule extends AbstractModule {
protected void configure() { protected void configure() {
} }
static class AddToStringOnSubmitExecutorService implements ExecutorService {
private final ExecutorService delegate;
public AddToStringOnSubmitExecutorService(ExecutorService delegate) {
this.delegate = checkNotNull(delegate, "delegate");
}
@Override
public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
return delegate.awaitTermination(timeout, unit);
}
@Override
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException {
return delegate.invokeAll(tasks);
}
@Override
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
throws InterruptedException {
return delegate.invokeAll(tasks, timeout, unit);
}
@Override
public <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException {
return delegate.invokeAny(tasks);
}
@Override
public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
return delegate.invokeAny(tasks, timeout, unit);
}
@Override
public boolean isShutdown() {
return delegate.isShutdown();
}
@Override
public boolean isTerminated() {
return delegate.isTerminated();
}
@Override
public void shutdown() {
delegate.shutdown();
}
@Override
public List<Runnable> shutdownNow() {
return delegate.shutdownNow();
}
@Override
public <T> Future<T> submit(Callable<T> task) {
return new AddToStringFuture<T>(delegate.submit(task), task.toString());
}
@SuppressWarnings("unchecked")
@Override
public Future<?> submit(Runnable task) {
return new AddToStringFuture(delegate.submit(task), task.toString());
}
@Override
public <T> Future<T> submit(Runnable task, T result) {
return new AddToStringFuture<T>(delegate.submit(task, result), task.toString());
}
@Override
public void execute(Runnable arg0) {
delegate.execute(arg0);
}
@Override
public boolean equals(Object obj) {
return delegate.equals(obj);
}
@Override
public int hashCode() {
return delegate.hashCode();
}
@Override
public String toString() {
return delegate.toString();
}
}
static class AddToStringFuture<T> implements Future<T> {
private final Future<T> delegate;
private final String toString;
public AddToStringFuture(Future<T> delegate, String toString) {
this.delegate = delegate;
this.toString = toString;
}
@Override
public boolean cancel(boolean arg0) {
return delegate.cancel(arg0);
}
@Override
public T get() throws InterruptedException, ExecutionException {
return delegate.get();
}
@Override
public T get(long arg0, TimeUnit arg1) throws InterruptedException, ExecutionException, TimeoutException {
return delegate.get(arg0, arg1);
}
@Override
public boolean isCancelled() {
return delegate.isCancelled();
}
@Override
public boolean isDone() {
return delegate.isDone();
}
@Override
public boolean equals(Object obj) {
return delegate.equals(obj);
}
@Override
public int hashCode() {
return delegate.hashCode();
}
@Override
public String toString() {
return toString;
}
}
@Provides @Provides
@Singleton @Singleton
@Named(Constants.PROPERTY_USER_THREADS) @Named(Constants.PROPERTY_USER_THREADS)
ExecutorService provideExecutorService(@Named(Constants.PROPERTY_USER_THREADS) int count, Closer closer) { ExecutorService provideExecutorService(@Named(Constants.PROPERTY_USER_THREADS) int count, Closer closer) {
if (userExecutorFromConstructor != null) if (userExecutorFromConstructor != null)
return userExecutorFromConstructor; return userExecutorFromConstructor;
return shutdownOnClose(newThreadPoolNamed("user thread %d", count), closer); return shutdownOnClose(addToStringOnSubmit(newThreadPoolNamed("user thread %d", count)), closer);
} }
@Provides @Provides
@ -119,7 +277,7 @@ public class ExecutorServiceModule extends AbstractModule {
ExecutorService provideIOExecutor(@Named(Constants.PROPERTY_IO_WORKER_THREADS) int count, Closer closer) { ExecutorService provideIOExecutor(@Named(Constants.PROPERTY_IO_WORKER_THREADS) int count, Closer closer) {
if (ioExecutorFromConstructor != null) if (ioExecutorFromConstructor != null)
return ioExecutorFromConstructor; return ioExecutorFromConstructor;
return shutdownOnClose(newThreadPoolNamed("i/o thread %d", count), closer); return shutdownOnClose(addToStringOnSubmit(newThreadPoolNamed("i/o thread %d", count)), closer);
} }
@VisibleForTesting @VisibleForTesting