Issue 471:add better error message when there's an interrupted exception

This commit is contained in:
Adrian Cole 2011-03-05 20:57:19 -05:00
parent 7bcdc74a35
commit a988b47627
2 changed files with 173 additions and 43 deletions

View File

@ -21,7 +21,6 @@ package org.jclouds.concurrent;
import static com.google.common.base.Preconditions.checkNotNull;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
@ -31,6 +30,7 @@ import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import com.google.common.annotations.Beta;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Function;
import com.google.common.collect.ForwardingObject;
import com.google.common.util.concurrent.ExecutionList;
@ -44,54 +44,90 @@ import com.google.common.util.concurrent.ListenableFuture;
*/
@Beta
public class Futures {
@VisibleForTesting
static class CallGetAndRunExecutionList<T> implements Runnable {
private final Future<T> delegate;
private final ExecutionList executionList;
public static class FutureListener<T> {
private final Future<T> future;
final ExecutorService executor;
private final ExecutionList executionList = new ExecutionList();
private final AtomicBoolean hasListeners = new AtomicBoolean(false);
static <T> FutureListener<T> create(Future<T> future, ExecutorService executor) {
return new FutureListener<T>(future, executor);
public CallGetAndRunExecutionList(Future<T> delegate, ExecutionList executionList) {
this.delegate = checkNotNull(delegate, "delegate");
this.executionList = checkNotNull(executionList, "executionList");
}
private FutureListener(Future<T> future, ExecutorService executor) {
this.future = checkNotNull(future, "future");
this.executor = checkNotNull(executor, "executor");
@Override
public void run() {
try {
delegate.get();
} catch (InterruptedException e) {
// This thread was interrupted. This should never happen, so we
// throw an IllegalStateException.
Thread.currentThread().interrupt();
throw new IllegalStateException(String.format(
"interrupted calling get() on [%s], so could not run listeners", delegate), e);
} catch (Throwable e) {
// ExecutionException / CancellationException / RuntimeException
// The task is done, run the listeners.
}
executionList.run();
}
@Override
public String toString() {
return "[delegate=" + delegate + ", executionList=" + executionList + "]";
}
}
// Adapted from Guava
//
// * to allow us to enforce supply of an adapterExecutor
// note that this is done so that we can operate in Google AppEngine which
// restricts thread creation
// * to allow us to print debug info about what the delegate was doing
public static class FutureListener<T> {
final ExecutorService adapterExecutor;
// The execution list to hold our listeners.
private final ExecutionList executionList = new ExecutionList();
// This allows us to only start up a thread waiting on the delegate future
// when the first listener is added.
private final AtomicBoolean hasListeners = new AtomicBoolean(false);
// The delegate future.
private final Future<T> delegate;
static <T> FutureListener<T> create(Future<T> delegate, ExecutorService adapterExecutor) {
return new FutureListener<T>(delegate, adapterExecutor);
}
private FutureListener(Future<T> delegate, ExecutorService adapterExecutor) {
this.delegate = checkNotNull(delegate, "delegate");
this.adapterExecutor = checkNotNull(adapterExecutor, "adapterExecutor");
}
public void addListener(Runnable listener, Executor exec) {
executionList.add(listener, exec);
// When a listener is first added, we run a task that will wait for
// the future to finish, and when it is done will run the listeners.
if (!hasListeners.get() && hasListeners.compareAndSet(false, true)) {
executor.execute(new Runnable() {
/* @Override */
public void run() {
try {
future.get();
} catch (CancellationException e) {
// The task was cancelled, so it is done, run the listeners.
} catch (InterruptedException e) {
// This thread was interrupted. This should never happen, so we
// throw an IllegalStateException.
throw new IllegalStateException("Adapter thread interrupted!", e);
} catch (ExecutionException e) {
// The task caused an exception, so it is done, run the listeners.
}
executionList.run();
}
});
// the delegate to finish, and when it is done will run the listeners.
if (hasListeners.compareAndSet(false, true)) {
if (delegate.isDone()) {
// If the delegate is already done, run the execution list
// immediately on the current thread.
executionList.run();
return;
}
adapterExecutor.execute(new CallGetAndRunExecutionList<T>(delegate, executionList));
}
executionList.add(listener, exec);
}
Future<T> getFuture() {
return future;
return delegate;
}
ExecutorService getExecutor() {
return executor;
return adapterExecutor;
}
}
@ -119,27 +155,27 @@ public class Futures {
}
public static class LazyListenableFutureFunctionAdapter<I, O> extends ForwardingObject implements
ListenableFuture<O> {
ListenableFuture<O> {
private final FutureListener<I> futureListener;
private final Function<? super I, ? extends O> function;
static <I, O> LazyListenableFutureFunctionAdapter<I, O> create(Future<I> future,
Function<? super I, ? extends O> function, ExecutorService executor) {
Function<? super I, ? extends O> function, ExecutorService executor) {
return new LazyListenableFutureFunctionAdapter<I, O>(future, function, executor);
}
static <I, O> LazyListenableFutureFunctionAdapter<I, O> create(FutureListener<I> futureListener,
Function<? super I, ? extends O> function) {
Function<? super I, ? extends O> function) {
return new LazyListenableFutureFunctionAdapter<I, O>(futureListener, function);
}
private LazyListenableFutureFunctionAdapter(Future<I> future, Function<? super I, ? extends O> function,
ExecutorService executor) {
ExecutorService executor) {
this(FutureListener.create(future, executor), function);
}
private LazyListenableFutureFunctionAdapter(FutureListener<I> futureListener,
Function<? super I, ? extends O> function) {
Function<? super I, ? extends O> function) {
this.futureListener = checkNotNull(futureListener, "futureListener");
this.function = checkNotNull(function, "function");
}
@ -219,18 +255,19 @@ public class Futures {
* chaining, so that we don't invoke get() early.
*/
public static <I, O> ListenableFuture<O> compose(Future<I> future, final Function<? super I, ? extends O> function,
ExecutorService executorService) {
ExecutorService executorService) {
if (future instanceof Futures.ListenableFutureAdapter<?>) {
Futures.ListenableFutureAdapter<I> lf = (ListenableFutureAdapter<I>) future;
if (lf.futureListener.executor.getClass().isAnnotationPresent(SingleThreaded.class))
if (lf.futureListener.adapterExecutor.getClass().isAnnotationPresent(SingleThreaded.class))
return Futures.LazyListenableFutureFunctionAdapter.create(
((org.jclouds.concurrent.Futures.ListenableFutureAdapter<I>) future).futureListener, function);
((org.jclouds.concurrent.Futures.ListenableFutureAdapter<I>) future).futureListener, function);
else
return com.google.common.util.concurrent.Futures.compose(lf, function, executorService);
} else if (executorService.getClass().isAnnotationPresent(SingleThreaded.class)) {
return Futures.LazyListenableFutureFunctionAdapter.create(future, function, executorService);
} else {
return com.google.common.util.concurrent.Futures.compose(Futures.makeListenable(future, executorService), function, executorService);
return com.google.common.util.concurrent.Futures.compose(Futures.makeListenable(future, executorService),
function, executorService);
}
}

View File

@ -0,0 +1,93 @@
/**
*
* Copyright (C) 2010 Cloud Conscious, LLC. <info@cloudconscious.com>
*
* ====================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* ====================================================================
*/
package org.jclouds.concurrent;
import static org.easymock.EasyMock.expect;
import static org.easymock.classextension.EasyMock.createMock;
import static org.easymock.classextension.EasyMock.replay;
import static org.easymock.classextension.EasyMock.verify;
import static org.testng.Assert.assertEquals;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import org.jclouds.concurrent.Futures.CallGetAndRunExecutionList;
import org.testng.annotations.Test;
import com.google.common.util.concurrent.ExecutionList;
/**
* Tests behavior of Futures
*
* @author Adrian Cole
*/
@Test(groups = "unit")
public class FuturesTest {
ExecutorService executorService = MoreExecutors.sameThreadExecutor();
@Test
public void testCallGetAndRunRunnableRunsListOnRuntimeException() throws InterruptedException, ExecutionException {
Runnable runnable = createMock(Runnable.class);
@SuppressWarnings("unchecked")
Future<String> future = createMock(Future.class);
runnable.run();
expect(future.get()).andThrow(new RuntimeException());
replay(runnable);
replay(future);
ExecutionList executionList = new ExecutionList();
executionList.add(runnable, executorService);
CallGetAndRunExecutionList<String> caller = new CallGetAndRunExecutionList<String>(future, executionList);
caller.run();
verify(runnable);
verify(future);
}
@Test
public void testCallGetAndRunRunnableInterruptsAndThrowsIllegalStateExceptionOnInterruptedException()
throws InterruptedException, ExecutionException {
Runnable runnable = createMock(Runnable.class);
@SuppressWarnings("unchecked")
Future<String> future = createMock(Future.class);
expect(future.get()).andThrow(new InterruptedException());
replay(runnable);
replay(future);
ExecutionList executionList = new ExecutionList();
executionList.add(runnable, executorService);
CallGetAndRunExecutionList<String> caller = new CallGetAndRunExecutionList<String>(future, executionList);
try {
caller.run();
assert false;
} catch (IllegalStateException e) {
assertEquals(e.getMessage(), "interrupted calling get() on [EasyMock for interface java.util.concurrent.Future], so could not run listeners");
}
verify(runnable);
verify(future);
}
}