ensure we don't fail if the executor is shut down

This commit is contained in:
Simon Willnauer 2016-01-27 17:04:26 +01:00
parent 7ff99eb89d
commit ccd819229d
1 changed files with 20 additions and 2 deletions

View File

@ -151,7 +151,7 @@ public class EsThreadPoolExecutor extends ThreadPoolExecutor {
return runnable; return runnable;
} }
private static class FilterAbstractRunnable extends AbstractRunnable { private class FilterAbstractRunnable extends AbstractRunnable {
private final ThreadContext contextHolder; private final ThreadContext contextHolder;
private final AbstractRunnable in; private final AbstractRunnable in;
private final ThreadContext.StoredContext ctx; private final ThreadContext.StoredContext ctx;
@ -184,9 +184,18 @@ public class EsThreadPoolExecutor extends ThreadPoolExecutor {
@Override @Override
protected void doRun() throws Exception { protected void doRun() throws Exception {
boolean started = false;
try (ThreadContext.StoredContext ingore = contextHolder.stashContext()){ try (ThreadContext.StoredContext ingore = contextHolder.stashContext()){
ctx.restore(); ctx.restore();
started = true;
in.doRun(); in.doRun();
} catch (IllegalStateException ex) {
if (started || isShutdown() == false) {
throw ex;
}
// if we hit an ISE here we have been shutting down
// this comes from the threadcontext and barfs if
// our threadpool has been shutting down
} }
} }
@ -197,7 +206,7 @@ public class EsThreadPoolExecutor extends ThreadPoolExecutor {
} }
private static class FilterRunnable implements Runnable { private class FilterRunnable implements Runnable {
private final ThreadContext contextHolder; private final ThreadContext contextHolder;
private final Runnable in; private final Runnable in;
private final ThreadContext.StoredContext ctx; private final ThreadContext.StoredContext ctx;
@ -210,9 +219,18 @@ public class EsThreadPoolExecutor extends ThreadPoolExecutor {
@Override @Override
public void run() { public void run() {
boolean started = false;
try (ThreadContext.StoredContext ingore = contextHolder.stashContext()){ try (ThreadContext.StoredContext ingore = contextHolder.stashContext()){
ctx.restore(); ctx.restore();
started = true;
in.run(); in.run();
} catch (IllegalStateException ex) {
if (started || isShutdown() == false) {
throw ex;
}
// if we hit an ISE here we have been shutting down
// this comes from the threadcontext and barfs if
// our threadpool has been shutting down
} }
} }
@Override @Override