renamed the Future we use to be DescribedFuture (instead of just AddedToStringFuture),

and added submission stack trace to that Future, with that trace appended to exceptions returned by the Future.get
(so the stack trace shows the logical continuity of processes submitted to an executor)
This commit is contained in:
Alex Heneveld 2011-09-27 14:27:46 -07:00
parent ea148929ff
commit 48be98ee7d
2 changed files with 248 additions and 16 deletions

View File

@ -52,7 +52,12 @@ import com.google.inject.Provides;
/**
* Configures {@link ExecutorService}.
*
* Note that this uses threads
* Note that this uses threads.
*
* <p>
* This extends the underlying Future to expose a description (the task's toString) and the submission context (stack trace).
* The submission stack trace is appended to relevant stack traces on exceptions that are returned,
* so the user can see the logical chain of execution (in the executor, and where it was passed to the executor).
*
* @author Adrian Cole
*/
@ -92,7 +97,7 @@ public class ExecutorServiceModule extends AbstractModule {
static ExecutorService addToStringOnSubmit(ExecutorService executor) {
if (executor != null) {
return new AddToStringOnSubmitExecutorService(executor);
return new DescribingExecutorService(executor);
}
return executor;
}
@ -117,11 +122,11 @@ public class ExecutorServiceModule extends AbstractModule {
protected void configure() {
}
static class AddToStringOnSubmitExecutorService implements ExecutorService {
static class DescribingExecutorService implements ExecutorService {
private final ExecutorService delegate;
public AddToStringOnSubmitExecutorService(ExecutorService delegate) {
public DescribingExecutorService(ExecutorService delegate) {
this.delegate = checkNotNull(delegate, "delegate");
}
@ -174,18 +179,18 @@ public class ExecutorServiceModule extends AbstractModule {
@Override
public <T> Future<T> submit(Callable<T> task) {
return new AddToStringFuture<T>(delegate.submit(task), task.toString());
return new DescribedFuture<T>(delegate.submit(task), task.toString(), getStackTraceHere());
}
@SuppressWarnings({ "unchecked", "rawtypes" })
@Override
public Future<?> submit(Runnable task) {
return new AddToStringFuture(delegate.submit(task), task.toString());
return new DescribedFuture(delegate.submit(task), task.toString(), getStackTraceHere());
}
@Override
public <T> Future<T> submit(Runnable task, T result) {
return new AddToStringFuture<T>(delegate.submit(task, result), task.toString());
return new DescribedFuture<T>(delegate.submit(task, result), task.toString(), getStackTraceHere());
}
@Override
@ -210,13 +215,15 @@ public class ExecutorServiceModule extends AbstractModule {
}
static class AddToStringFuture<T> implements Future<T> {
static class DescribedFuture<T> implements Future<T> {
private final Future<T> delegate;
private final String toString;
private final String description;
private StackTraceElement[] submissionTrace;
public AddToStringFuture(Future<T> delegate, String toString) {
public DescribedFuture(Future<T> delegate, String description, StackTraceElement[] submissionTrace) {
this.delegate = delegate;
this.toString = toString;
this.description = description;
this.submissionTrace = submissionTrace;
}
@Override
@ -226,12 +233,65 @@ public class ExecutorServiceModule extends AbstractModule {
@Override
public T get() throws InterruptedException, ExecutionException {
return delegate.get();
try {
return delegate.get();
} catch (ExecutionException e) {
throw ensureCauseHasSubmissionTrace(e);
} catch (InterruptedException e) {
throw ensureCauseHasSubmissionTrace(e);
}
}
@Override
public T get(long arg0, TimeUnit arg1) throws InterruptedException, ExecutionException, TimeoutException {
return delegate.get(arg0, arg1);
try {
return delegate.get(arg0, arg1);
} catch (ExecutionException e) {
throw ensureCauseHasSubmissionTrace(e);
} catch (InterruptedException e) {
throw ensureCauseHasSubmissionTrace(e);
} catch (TimeoutException e) {
throw ensureCauseHasSubmissionTrace(e);
}
}
/** This method does the work to ensure _if_ a submission stack trace was provided,
* it is included in the exception. most errors are thrown from the frame of the
* Future.get call, with a cause that took place in the executor's thread.
* We extend the stack trace of that cause with the submission stack trace.
* (An alternative would be to put the stack trace as a root cause,
* at the bottom of the stack, or appended to all traces, or inserted
* after the second cause, etc ... but since we can't change the "Caused by:"
* method in Throwable the compromise made here seems best.)
*/
private <ET extends Exception> ET ensureCauseHasSubmissionTrace(ET e) {
if (submissionTrace==null) return e;
if (e.getCause()==null) {
ExecutionException ee = new ExecutionException("task submitted from the following trace", null);
e.initCause(ee);
return e;
}
Throwable cause = e.getCause();
StackTraceElement[] causeTrace = cause.getStackTrace();
boolean causeIncludesSubmissionTrace = submissionTrace.length >= causeTrace.length;
for (int i=0; causeIncludesSubmissionTrace && i<submissionTrace.length; i++) {
if (!causeTrace[causeTrace.length-1-i].equals(submissionTrace[submissionTrace.length-1-i])) {
causeIncludesSubmissionTrace = false;
}
}
if (!causeIncludesSubmissionTrace) {
cause.setStackTrace(merge(causeTrace, submissionTrace));
}
return e;
}
private StackTraceElement[] merge(StackTraceElement[] t1, StackTraceElement[] t2) {
StackTraceElement[] t12 = new StackTraceElement[t1.length + t2.length];
System.arraycopy(t1, 0, t12, 0, t1.length);
System.arraycopy(t2, 0, t12, t1.length, t2.length);
return t12;
}
@Override
@ -256,7 +316,7 @@ public class ExecutorServiceModule extends AbstractModule {
@Override
public String toString() {
return toString;
return description;
}
}
@ -302,4 +362,14 @@ public class ExecutorServiceModule extends AbstractModule {
.setThreadFactory(Executors.defaultThreadFactory()).build());
}
/** returns the stack trace at the caller */
static StackTraceElement[] getStackTraceHere() {
// remove the first two items in the stack trace (because the first one refers to the call to
// Thread.getStackTrace, and the second one is us)
StackTraceElement[] fullSubmissionTrace = Thread.currentThread().getStackTrace();
StackTraceElement[] cleanedSubmissionTrace = new StackTraceElement[fullSubmissionTrace.length-2];
System.arraycopy(fullSubmissionTrace, 2, cleanedSubmissionTrace, 0, cleanedSubmissionTrace.length);
return cleanedSubmissionTrace;
}
}

View File

@ -25,12 +25,16 @@ import static org.easymock.classextension.EasyMock.verify;
import static org.testng.Assert.assertEquals;
import java.io.IOException;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import org.jclouds.Constants;
import org.jclouds.lifecycle.Closer;
import org.testng.annotations.Test;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.inject.Guice;
import com.google.inject.Injector;
@ -64,15 +68,14 @@ public class ExecutorServiceModuleTest {
public void testShutdownOnCloseThroughModule() throws IOException {
ExecutorServiceModule module = new ExecutorServiceModule() {
@Override
protected void configure() {
bindConstant().annotatedWith(Names.named(Constants.PROPERTY_IO_WORKER_THREADS)).to(1);
bindConstant().annotatedWith(Names.named(Constants.PROPERTY_USER_THREADS)).to(1);
super.configure();
}
};
Injector i = Guice.createInjector(module);
assertEquals(module.userExecutorFromConstructor, null);
assertEquals(module.ioExecutorFromConstructor, null);
@ -93,4 +96,163 @@ public class ExecutorServiceModuleTest {
assert io.isShutdown();
}
@Test
public void testDescribedFutureToString() throws Exception {
ExecutorServiceModule module = new ExecutorServiceModule() {
@Override
protected void configure() {
bindConstant().annotatedWith(Names.named(Constants.PROPERTY_IO_WORKER_THREADS)).to(1);
bindConstant().annotatedWith(Names.named(Constants.PROPERTY_USER_THREADS)).to(1);
super.configure();
}
};
Injector i = Guice.createInjector(module);
Closer closer = i.getInstance(Closer.class);
ExecutorService user = i
.getInstance(Key.get(ExecutorService.class, Names.named(Constants.PROPERTY_USER_THREADS)));
ExecutorService io = i.getInstance(Key.get(ExecutorService.class, Names
.named(Constants.PROPERTY_IO_WORKER_THREADS)));
ConfigurableRunner t1 = new ConfigurableRunner();
t1.result = "okay";
Future<Object> euc = performSubmissionInSeparateMethod1(user, t1);
assert euc.toString().indexOf("ConfigurableRunner") >= 0;
assert euc.get().equals("okay");
Future<Object> eic = performSubmissionInSeparateMethod1(io, t1);
assert eic.toString().indexOf("ConfigurableRunner") >= 0;
assert eic.get().equals("okay");
closer.close();
}
/*
* The decoration makes sure that the stack trace looks like the following.
* Note the last three included trace elements: this details where the task was submitted _from_
* (technically it is a different stack frame, since it is across threads; but logically it is the same)
*
java.util.concurrent.ExecutionException: java.lang.IllegalStateException: foo
at java.util.concurrent.FutureTask$Sync.innerGet(FutureTask.java:222)
at java.util.concurrent.FutureTask.get(FutureTask.java:83)
at org.jclouds.concurrent.config.ExecutorServiceModule$DescribedFuture.get(ExecutorServiceModule.java:232)
at org.jclouds.concurrent.config.ExecutorServiceModuleTest.checkFutureGetFailsWith(ExecutorServiceModuleTest.java:186)
at org.jclouds.concurrent.config.ExecutorServiceModuleTest.testDescribedFutureExceptionIncludesSubmissionTrace(ExecutorServiceModuleTest.java:171)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
at java.lang.reflect.Method.invoke(Method.java:597)
at org.testng.internal.MethodInvocationHelper.invokeMethod(MethodInvocationHelper.java:80)
at org.testng.internal.Invoker.invokeMethod(Invoker.java:691)
at org.testng.internal.Invoker.invokeTestMethod(Invoker.java:883)
at org.testng.internal.Invoker.invokeTestMethods(Invoker.java:1208)
at org.testng.internal.TestMethodWorker.invokeTestMethods(TestMethodWorker.java:127)
at org.testng.internal.TestMethodWorker.run(TestMethodWorker.java:111)
at org.testng.TestRunner.privateRun(TestRunner.java:753)
at org.testng.TestRunner.run(TestRunner.java:613)
at org.testng.SuiteRunner.runTest(SuiteRunner.java:335)
at org.testng.SuiteRunner.runSequentially(SuiteRunner.java:330)
at org.testng.SuiteRunner.privateRun(SuiteRunner.java:292)
at org.testng.SuiteRunner.run(SuiteRunner.java:241)
at org.testng.SuiteRunnerWorker.runSuite(SuiteRunnerWorker.java:52)
at org.testng.SuiteRunnerWorker.run(SuiteRunnerWorker.java:86)
at org.testng.TestNG.runSuitesSequentially(TestNG.java:1169)
at org.testng.TestNG.runSuitesLocally(TestNG.java:1094)
at org.testng.TestNG.run(TestNG.java:1006)
at org.testng.remote.RemoteTestNG.run(RemoteTestNG.java:107)
at org.testng.remote.RemoteTestNG.initAndRun(RemoteTestNG.java:199)
at org.testng.remote.RemoteTestNG.main(RemoteTestNG.java:170)
Caused by: java.lang.IllegalStateException: foo
at org.jclouds.concurrent.config.ExecutorServiceModuleTest$ConfigurableRunner.call(ExecutorServiceModuleTest.java:206)
at org.jclouds.concurrent.config.ExecutorServiceModuleTest$ConfigurableRunner.run(ExecutorServiceModuleTest.java:203)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:441)
at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
at java.util.concurrent.FutureTask.run(FutureTask.java:138)
at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
at java.lang.Thread.run(Thread.java:637)
at org.jclouds.concurrent.config.ExecutorServiceModule$DescribingExecutorService.submit(ExecutorServiceModule.java:188)
at org.jclouds.concurrent.config.ExecutorServiceModuleTest.performSubmissionInSeparateMethod2(ExecutorServiceModuleTest.java:181)
at org.jclouds.concurrent.config.ExecutorServiceModuleTest.testDescribedFutureExceptionIncludesSubmissionTrace(ExecutorServiceModuleTest.java:170)
... 24 more
*
*/
@Test
public void testDescribedFutureExceptionIncludesSubmissionTrace() throws Exception {
ExecutorServiceModule module = new ExecutorServiceModule() {
@Override
protected void configure() {
bindConstant().annotatedWith(Names.named(Constants.PROPERTY_IO_WORKER_THREADS)).to(1);
bindConstant().annotatedWith(Names.named(Constants.PROPERTY_USER_THREADS)).to(1);
super.configure();
}
};
Injector i = Guice.createInjector(module);
Closer closer = i.getInstance(Closer.class);
ExecutorService user = i
.getInstance(Key.get(ExecutorService.class, Names.named(Constants.PROPERTY_USER_THREADS)));
ExecutorService io = i.getInstance(Key.get(ExecutorService.class, Names
.named(Constants.PROPERTY_IO_WORKER_THREADS)));
ConfigurableRunner t1 = new ConfigurableRunner();
t1.failMessage = "foo";
t1.result = "shouldn't happen";
Future<Object> euc = performSubmissionInSeparateMethod1(user, t1);
checkFutureGetFailsWith(euc, "foo", "testDescribedFutureExceptionIncludesSubmissionTrace", "performSubmissionInSeparateMethod1");
Future<Object> eur = performSubmissionInSeparateMethod2(user, t1);
checkFutureGetFailsWith(eur, "foo", "testDescribedFutureExceptionIncludesSubmissionTrace", "performSubmissionInSeparateMethod2");
Future<Object> eic = performSubmissionInSeparateMethod1(io, t1);
checkFutureGetFailsWith(eic, "foo", "testDescribedFutureExceptionIncludesSubmissionTrace", "performSubmissionInSeparateMethod1");
Future<Object> eir = performSubmissionInSeparateMethod2(io, t1);
checkFutureGetFailsWith(eir, "foo", "testDescribedFutureExceptionIncludesSubmissionTrace", "performSubmissionInSeparateMethod2");
closer.close();
}
static Future<Object> performSubmissionInSeparateMethod1(ExecutorService user, ConfigurableRunner t1) {
return user.submit((Callable<Object>)t1);
}
static Future<Object> performSubmissionInSeparateMethod2(ExecutorService io, ConfigurableRunner t1) {
return io.submit((Runnable)t1, (Object)"shouldn't happen");
}
static void checkFutureGetFailsWith(Future<Object> task, String ...requiredPhrases) throws Exception {
try {
task.get();
assert false : "task should have failed";
} catch (ExecutionException e) {
String trace = Throwables.getStackTraceAsString(e);
for (String requiredPhrase : requiredPhrases) {
assert trace.indexOf(requiredPhrase) >= 0 : "stack trace should have contained '"+requiredPhrase+"'";
}
}
}
private static class ConfigurableRunner implements Runnable, Callable<Object> {
private Object result;
private String failMessage;
@Override
public void run() {
call();
}
public Object call() {
if (failMessage!=null) throw new IllegalStateException(failMessage);
return result;
}
}
}