From a988b476274e576597d80fcaa0c1481dc35b3310 Mon Sep 17 00:00:00 2001 From: Adrian Cole Date: Sat, 5 Mar 2011 20:57:19 -0500 Subject: [PATCH] Issue 471:add better error message when there's an interrupted exception --- .../java/org/jclouds/concurrent/Futures.java | 123 ++++++++++++------ .../org/jclouds/concurrent/FuturesTest.java | 93 +++++++++++++ 2 files changed, 173 insertions(+), 43 deletions(-) create mode 100644 core/src/test/java/org/jclouds/concurrent/FuturesTest.java diff --git a/core/src/main/java/org/jclouds/concurrent/Futures.java b/core/src/main/java/org/jclouds/concurrent/Futures.java index f74ffbb9c1..139d72a5f2 100644 --- a/core/src/main/java/org/jclouds/concurrent/Futures.java +++ b/core/src/main/java/org/jclouds/concurrent/Futures.java @@ -21,7 +21,6 @@ package org.jclouds.concurrent; import static com.google.common.base.Preconditions.checkNotNull; -import java.util.concurrent.CancellationException; import java.util.concurrent.ExecutionException; import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; @@ -31,6 +30,7 @@ import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; import com.google.common.annotations.Beta; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Function; import com.google.common.collect.ForwardingObject; import com.google.common.util.concurrent.ExecutionList; @@ -44,54 +44,90 @@ import com.google.common.util.concurrent.ListenableFuture; */ @Beta public class Futures { + @VisibleForTesting + static class CallGetAndRunExecutionList implements Runnable { + private final Future delegate; + private final ExecutionList executionList; - public static class FutureListener { - private final Future future; - final ExecutorService executor; - private final ExecutionList executionList = new ExecutionList(); - private final AtomicBoolean hasListeners = new AtomicBoolean(false); - - static FutureListener create(Future future, ExecutorService executor) { - return new FutureListener(future, executor); + public CallGetAndRunExecutionList(Future delegate, ExecutionList executionList) { + this.delegate = checkNotNull(delegate, "delegate"); + this.executionList = checkNotNull(executionList, "executionList"); } - private FutureListener(Future future, ExecutorService executor) { - this.future = checkNotNull(future, "future"); - this.executor = checkNotNull(executor, "executor"); + @Override + public void run() { + try { + delegate.get(); + } catch (InterruptedException e) { + // This thread was interrupted. This should never happen, so we + // throw an IllegalStateException. + Thread.currentThread().interrupt(); + throw new IllegalStateException(String.format( + "interrupted calling get() on [%s], so could not run listeners", delegate), e); + } catch (Throwable e) { + // ExecutionException / CancellationException / RuntimeException + // The task is done, run the listeners. + } + executionList.run(); + } + + @Override + public String toString() { + return "[delegate=" + delegate + ", executionList=" + executionList + "]"; + } + } + + // Adapted from Guava + // + // * to allow us to enforce supply of an adapterExecutor + // note that this is done so that we can operate in Google AppEngine which + // restricts thread creation + // * to allow us to print debug info about what the delegate was doing + public static class FutureListener { + + final ExecutorService adapterExecutor; + + // The execution list to hold our listeners. + private final ExecutionList executionList = new ExecutionList(); + + // This allows us to only start up a thread waiting on the delegate future + // when the first listener is added. + private final AtomicBoolean hasListeners = new AtomicBoolean(false); + + // The delegate future. + private final Future delegate; + + static FutureListener create(Future delegate, ExecutorService adapterExecutor) { + return new FutureListener(delegate, adapterExecutor); + } + + private FutureListener(Future delegate, ExecutorService adapterExecutor) { + this.delegate = checkNotNull(delegate, "delegate"); + this.adapterExecutor = checkNotNull(adapterExecutor, "adapterExecutor"); } public void addListener(Runnable listener, Executor exec) { + executionList.add(listener, exec); // When a listener is first added, we run a task that will wait for - // the future to finish, and when it is done will run the listeners. - if (!hasListeners.get() && hasListeners.compareAndSet(false, true)) { - executor.execute(new Runnable() { - /* @Override */ - public void run() { - try { - future.get(); - } catch (CancellationException e) { - // The task was cancelled, so it is done, run the listeners. - } catch (InterruptedException e) { - // This thread was interrupted. This should never happen, so we - // throw an IllegalStateException. - throw new IllegalStateException("Adapter thread interrupted!", e); - } catch (ExecutionException e) { - // The task caused an exception, so it is done, run the listeners. - } - executionList.run(); - } - }); + // the delegate to finish, and when it is done will run the listeners. + if (hasListeners.compareAndSet(false, true)) { + if (delegate.isDone()) { + // If the delegate is already done, run the execution list + // immediately on the current thread. + executionList.run(); + return; + } + adapterExecutor.execute(new CallGetAndRunExecutionList(delegate, executionList)); } - executionList.add(listener, exec); } Future getFuture() { - return future; + return delegate; } ExecutorService getExecutor() { - return executor; + return adapterExecutor; } } @@ -119,27 +155,27 @@ public class Futures { } public static class LazyListenableFutureFunctionAdapter extends ForwardingObject implements - ListenableFuture { + ListenableFuture { private final FutureListener futureListener; private final Function function; static LazyListenableFutureFunctionAdapter create(Future future, - Function function, ExecutorService executor) { + Function function, ExecutorService executor) { return new LazyListenableFutureFunctionAdapter(future, function, executor); } static LazyListenableFutureFunctionAdapter create(FutureListener futureListener, - Function function) { + Function function) { return new LazyListenableFutureFunctionAdapter(futureListener, function); } private LazyListenableFutureFunctionAdapter(Future future, Function function, - ExecutorService executor) { + ExecutorService executor) { this(FutureListener.create(future, executor), function); } private LazyListenableFutureFunctionAdapter(FutureListener futureListener, - Function function) { + Function function) { this.futureListener = checkNotNull(futureListener, "futureListener"); this.function = checkNotNull(function, "function"); } @@ -219,18 +255,19 @@ public class Futures { * chaining, so that we don't invoke get() early. */ public static ListenableFuture compose(Future future, final Function function, - ExecutorService executorService) { + ExecutorService executorService) { if (future instanceof Futures.ListenableFutureAdapter) { Futures.ListenableFutureAdapter lf = (ListenableFutureAdapter) future; - if (lf.futureListener.executor.getClass().isAnnotationPresent(SingleThreaded.class)) + if (lf.futureListener.adapterExecutor.getClass().isAnnotationPresent(SingleThreaded.class)) return Futures.LazyListenableFutureFunctionAdapter.create( - ((org.jclouds.concurrent.Futures.ListenableFutureAdapter) future).futureListener, function); + ((org.jclouds.concurrent.Futures.ListenableFutureAdapter) future).futureListener, function); else return com.google.common.util.concurrent.Futures.compose(lf, function, executorService); } else if (executorService.getClass().isAnnotationPresent(SingleThreaded.class)) { return Futures.LazyListenableFutureFunctionAdapter.create(future, function, executorService); } else { - return com.google.common.util.concurrent.Futures.compose(Futures.makeListenable(future, executorService), function, executorService); + return com.google.common.util.concurrent.Futures.compose(Futures.makeListenable(future, executorService), + function, executorService); } } diff --git a/core/src/test/java/org/jclouds/concurrent/FuturesTest.java b/core/src/test/java/org/jclouds/concurrent/FuturesTest.java new file mode 100644 index 0000000000..36706c3a79 --- /dev/null +++ b/core/src/test/java/org/jclouds/concurrent/FuturesTest.java @@ -0,0 +1,93 @@ +/** + * + * Copyright (C) 2010 Cloud Conscious, LLC. + * + * ==================================================================== + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ==================================================================== + */ + +package org.jclouds.concurrent; + +import static org.easymock.EasyMock.expect; +import static org.easymock.classextension.EasyMock.createMock; +import static org.easymock.classextension.EasyMock.replay; +import static org.easymock.classextension.EasyMock.verify; +import static org.testng.Assert.assertEquals; + +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; + +import org.jclouds.concurrent.Futures.CallGetAndRunExecutionList; +import org.testng.annotations.Test; + +import com.google.common.util.concurrent.ExecutionList; + +/** + * Tests behavior of Futures + * + * @author Adrian Cole + */ +@Test(groups = "unit") +public class FuturesTest { + ExecutorService executorService = MoreExecutors.sameThreadExecutor(); + + @Test + public void testCallGetAndRunRunnableRunsListOnRuntimeException() throws InterruptedException, ExecutionException { + + Runnable runnable = createMock(Runnable.class); + @SuppressWarnings("unchecked") + Future future = createMock(Future.class); + runnable.run(); + expect(future.get()).andThrow(new RuntimeException()); + replay(runnable); + replay(future); + + ExecutionList executionList = new ExecutionList(); + executionList.add(runnable, executorService); + + CallGetAndRunExecutionList caller = new CallGetAndRunExecutionList(future, executionList); + caller.run(); + + verify(runnable); + verify(future); + } + + @Test + public void testCallGetAndRunRunnableInterruptsAndThrowsIllegalStateExceptionOnInterruptedException() + throws InterruptedException, ExecutionException { + + Runnable runnable = createMock(Runnable.class); + @SuppressWarnings("unchecked") + Future future = createMock(Future.class); + expect(future.get()).andThrow(new InterruptedException()); + replay(runnable); + replay(future); + + ExecutionList executionList = new ExecutionList(); + executionList.add(runnable, executorService); + + CallGetAndRunExecutionList caller = new CallGetAndRunExecutionList(future, executionList); + try { + caller.run(); + assert false; + } catch (IllegalStateException e) { + assertEquals(e.getMessage(), "interrupted calling get() on [EasyMock for interface java.util.concurrent.Future], so could not run listeners"); + } + + verify(runnable); + verify(future); + } + +}