diff --git a/core/src/main/java/org/jclouds/concurrent/config/ExecutorServiceModule.java b/core/src/main/java/org/jclouds/concurrent/config/ExecutorServiceModule.java index b47151995b..5097696e94 100644 --- a/core/src/main/java/org/jclouds/concurrent/config/ExecutorServiceModule.java +++ b/core/src/main/java/org/jclouds/concurrent/config/ExecutorServiceModule.java @@ -52,7 +52,12 @@ import com.google.inject.Provides; /** * Configures {@link ExecutorService}. * - * Note that this uses threads + * Note that this uses threads. + * + *

+ * This extends the underlying Future to expose a description (the task's toString) and the submission context (stack trace). + * The submission stack trace is appended to relevant stack traces on exceptions that are returned, + * so the user can see the logical chain of execution (in the executor, and where it was passed to the executor). * * @author Adrian Cole */ @@ -92,7 +97,7 @@ public class ExecutorServiceModule extends AbstractModule { static ExecutorService addToStringOnSubmit(ExecutorService executor) { if (executor != null) { - return new AddToStringOnSubmitExecutorService(executor); + return new DescribingExecutorService(executor); } return executor; } @@ -117,11 +122,11 @@ public class ExecutorServiceModule extends AbstractModule { protected void configure() { } - static class AddToStringOnSubmitExecutorService implements ExecutorService { + static class DescribingExecutorService implements ExecutorService { private final ExecutorService delegate; - public AddToStringOnSubmitExecutorService(ExecutorService delegate) { + public DescribingExecutorService(ExecutorService delegate) { this.delegate = checkNotNull(delegate, "delegate"); } @@ -174,18 +179,18 @@ public class ExecutorServiceModule extends AbstractModule { @Override public Future submit(Callable task) { - return new AddToStringFuture(delegate.submit(task), task.toString()); + return new DescribedFuture(delegate.submit(task), task.toString(), getStackTraceHere()); } @SuppressWarnings({ "unchecked", "rawtypes" }) @Override public Future submit(Runnable task) { - return new AddToStringFuture(delegate.submit(task), task.toString()); + return new DescribedFuture(delegate.submit(task), task.toString(), getStackTraceHere()); } @Override public Future submit(Runnable task, T result) { - return new AddToStringFuture(delegate.submit(task, result), task.toString()); + return new DescribedFuture(delegate.submit(task, result), task.toString(), getStackTraceHere()); } @Override @@ -210,13 +215,15 @@ public class ExecutorServiceModule extends AbstractModule { } - static class AddToStringFuture implements Future { + static class DescribedFuture implements Future { private final Future delegate; - private final String toString; + private final String description; + private StackTraceElement[] submissionTrace; - public AddToStringFuture(Future delegate, String toString) { + public DescribedFuture(Future delegate, String description, StackTraceElement[] submissionTrace) { this.delegate = delegate; - this.toString = toString; + this.description = description; + this.submissionTrace = submissionTrace; } @Override @@ -226,12 +233,65 @@ public class ExecutorServiceModule extends AbstractModule { @Override public T get() throws InterruptedException, ExecutionException { - return delegate.get(); + try { + return delegate.get(); + } catch (ExecutionException e) { + throw ensureCauseHasSubmissionTrace(e); + } catch (InterruptedException e) { + throw ensureCauseHasSubmissionTrace(e); + } } @Override public T get(long arg0, TimeUnit arg1) throws InterruptedException, ExecutionException, TimeoutException { - return delegate.get(arg0, arg1); + try { + return delegate.get(arg0, arg1); + } catch (ExecutionException e) { + throw ensureCauseHasSubmissionTrace(e); + } catch (InterruptedException e) { + throw ensureCauseHasSubmissionTrace(e); + } catch (TimeoutException e) { + throw ensureCauseHasSubmissionTrace(e); + } + } + + /** This method does the work to ensure _if_ a submission stack trace was provided, + * it is included in the exception. most errors are thrown from the frame of the + * Future.get call, with a cause that took place in the executor's thread. + * We extend the stack trace of that cause with the submission stack trace. + * (An alternative would be to put the stack trace as a root cause, + * at the bottom of the stack, or appended to all traces, or inserted + * after the second cause, etc ... but since we can't change the "Caused by:" + * method in Throwable the compromise made here seems best.) + */ + private ET ensureCauseHasSubmissionTrace(ET e) { + if (submissionTrace==null) return e; + if (e.getCause()==null) { + ExecutionException ee = new ExecutionException("task submitted from the following trace", null); + e.initCause(ee); + return e; + } + Throwable cause = e.getCause(); + StackTraceElement[] causeTrace = cause.getStackTrace(); + boolean causeIncludesSubmissionTrace = submissionTrace.length >= causeTrace.length; + for (int i=0; causeIncludesSubmissionTrace && i euc = performSubmissionInSeparateMethod1(user, t1); + assert euc.toString().indexOf("ConfigurableRunner") >= 0; + assert euc.get().equals("okay"); + + Future eic = performSubmissionInSeparateMethod1(io, t1); + assert eic.toString().indexOf("ConfigurableRunner") >= 0; + assert eic.get().equals("okay"); + + + closer.close(); + } + + /* + * The decoration makes sure that the stack trace looks like the following. + * Note the last three included trace elements: this details where the task was submitted _from_ + * (technically it is a different stack frame, since it is across threads; but logically it is the same) + * +java.util.concurrent.ExecutionException: java.lang.IllegalStateException: foo + at java.util.concurrent.FutureTask$Sync.innerGet(FutureTask.java:222) + at java.util.concurrent.FutureTask.get(FutureTask.java:83) + at org.jclouds.concurrent.config.ExecutorServiceModule$DescribedFuture.get(ExecutorServiceModule.java:232) + at org.jclouds.concurrent.config.ExecutorServiceModuleTest.checkFutureGetFailsWith(ExecutorServiceModuleTest.java:186) + at org.jclouds.concurrent.config.ExecutorServiceModuleTest.testDescribedFutureExceptionIncludesSubmissionTrace(ExecutorServiceModuleTest.java:171) + at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) + at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39) + at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25) + at java.lang.reflect.Method.invoke(Method.java:597) + at org.testng.internal.MethodInvocationHelper.invokeMethod(MethodInvocationHelper.java:80) + at org.testng.internal.Invoker.invokeMethod(Invoker.java:691) + at org.testng.internal.Invoker.invokeTestMethod(Invoker.java:883) + at org.testng.internal.Invoker.invokeTestMethods(Invoker.java:1208) + at org.testng.internal.TestMethodWorker.invokeTestMethods(TestMethodWorker.java:127) + at org.testng.internal.TestMethodWorker.run(TestMethodWorker.java:111) + at org.testng.TestRunner.privateRun(TestRunner.java:753) + at org.testng.TestRunner.run(TestRunner.java:613) + at org.testng.SuiteRunner.runTest(SuiteRunner.java:335) + at org.testng.SuiteRunner.runSequentially(SuiteRunner.java:330) + at org.testng.SuiteRunner.privateRun(SuiteRunner.java:292) + at org.testng.SuiteRunner.run(SuiteRunner.java:241) + at org.testng.SuiteRunnerWorker.runSuite(SuiteRunnerWorker.java:52) + at org.testng.SuiteRunnerWorker.run(SuiteRunnerWorker.java:86) + at org.testng.TestNG.runSuitesSequentially(TestNG.java:1169) + at org.testng.TestNG.runSuitesLocally(TestNG.java:1094) + at org.testng.TestNG.run(TestNG.java:1006) + at org.testng.remote.RemoteTestNG.run(RemoteTestNG.java:107) + at org.testng.remote.RemoteTestNG.initAndRun(RemoteTestNG.java:199) + at org.testng.remote.RemoteTestNG.main(RemoteTestNG.java:170) +Caused by: java.lang.IllegalStateException: foo + at org.jclouds.concurrent.config.ExecutorServiceModuleTest$ConfigurableRunner.call(ExecutorServiceModuleTest.java:206) + at org.jclouds.concurrent.config.ExecutorServiceModuleTest$ConfigurableRunner.run(ExecutorServiceModuleTest.java:203) + at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:441) + at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303) + at java.util.concurrent.FutureTask.run(FutureTask.java:138) + at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886) + at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908) + at java.lang.Thread.run(Thread.java:637) + at org.jclouds.concurrent.config.ExecutorServiceModule$DescribingExecutorService.submit(ExecutorServiceModule.java:188) + at org.jclouds.concurrent.config.ExecutorServiceModuleTest.performSubmissionInSeparateMethod2(ExecutorServiceModuleTest.java:181) + at org.jclouds.concurrent.config.ExecutorServiceModuleTest.testDescribedFutureExceptionIncludesSubmissionTrace(ExecutorServiceModuleTest.java:170) + ... 24 more + + * + */ + @Test + public void testDescribedFutureExceptionIncludesSubmissionTrace() throws Exception { + + ExecutorServiceModule module = new ExecutorServiceModule() { + @Override + protected void configure() { + bindConstant().annotatedWith(Names.named(Constants.PROPERTY_IO_WORKER_THREADS)).to(1); + bindConstant().annotatedWith(Names.named(Constants.PROPERTY_USER_THREADS)).to(1); + super.configure(); + } + }; + + Injector i = Guice.createInjector(module); + Closer closer = i.getInstance(Closer.class); + + ExecutorService user = i + .getInstance(Key.get(ExecutorService.class, Names.named(Constants.PROPERTY_USER_THREADS))); + ExecutorService io = i.getInstance(Key.get(ExecutorService.class, Names + .named(Constants.PROPERTY_IO_WORKER_THREADS))); + + ConfigurableRunner t1 = new ConfigurableRunner(); + t1.failMessage = "foo"; + t1.result = "shouldn't happen"; + + Future euc = performSubmissionInSeparateMethod1(user, t1); + checkFutureGetFailsWith(euc, "foo", "testDescribedFutureExceptionIncludesSubmissionTrace", "performSubmissionInSeparateMethod1"); + + Future eur = performSubmissionInSeparateMethod2(user, t1); + checkFutureGetFailsWith(eur, "foo", "testDescribedFutureExceptionIncludesSubmissionTrace", "performSubmissionInSeparateMethod2"); + + Future eic = performSubmissionInSeparateMethod1(io, t1); + checkFutureGetFailsWith(eic, "foo", "testDescribedFutureExceptionIncludesSubmissionTrace", "performSubmissionInSeparateMethod1"); + + Future eir = performSubmissionInSeparateMethod2(io, t1); + checkFutureGetFailsWith(eir, "foo", "testDescribedFutureExceptionIncludesSubmissionTrace", "performSubmissionInSeparateMethod2"); + + closer.close(); + } + + static Future performSubmissionInSeparateMethod1(ExecutorService user, ConfigurableRunner t1) { + return user.submit((Callable)t1); + } + + static Future performSubmissionInSeparateMethod2(ExecutorService io, ConfigurableRunner t1) { + return io.submit((Runnable)t1, (Object)"shouldn't happen"); + } + + static void checkFutureGetFailsWith(Future task, String ...requiredPhrases) throws Exception { + try { + task.get(); + assert false : "task should have failed"; + } catch (ExecutionException e) { + String trace = Throwables.getStackTraceAsString(e); + for (String requiredPhrase : requiredPhrases) { + assert trace.indexOf(requiredPhrase) >= 0 : "stack trace should have contained '"+requiredPhrase+"'"; + } + } + } + + private static class ConfigurableRunner implements Runnable, Callable { + private Object result; + private String failMessage; + + @Override + public void run() { + call(); + } + public Object call() { + if (failMessage!=null) throw new IllegalStateException(failMessage); + return result; + } + } }