Only rethrow exceptions if they are coming from the actual runnable but not if we are shutting down the pool

This commit is contained in:
Simon Willnauer 2016-01-27 21:06:24 +01:00
parent 5b836dbb11
commit 7ef762c8f0
2 changed files with 9 additions and 7 deletions

View File

@ -184,13 +184,14 @@ public class EsThreadPoolExecutor extends ThreadPoolExecutor {
@Override @Override
protected void doRun() throws Exception { protected void doRun() throws Exception {
boolean started = false; boolean whileRunning = false;
try (ThreadContext.StoredContext ingore = contextHolder.stashContext()){ try (ThreadContext.StoredContext ingore = contextHolder.stashContext()){
ctx.restore(); ctx.restore();
started = true; whileRunning = true;
in.doRun(); in.doRun();
whileRunning = false;
} catch (IllegalStateException ex) { } catch (IllegalStateException ex) {
if (started || isShutdown() == false) { if (whileRunning || isShutdown() == false) {
throw ex; throw ex;
} }
// if we hit an ISE here we have been shutting down // if we hit an ISE here we have been shutting down
@ -219,13 +220,14 @@ public class EsThreadPoolExecutor extends ThreadPoolExecutor {
@Override @Override
public void run() { public void run() {
boolean started = false; boolean whileRunning = false;
try (ThreadContext.StoredContext ingore = contextHolder.stashContext()){ try (ThreadContext.StoredContext ingore = contextHolder.stashContext()){
ctx.restore(); ctx.restore();
started = true; whileRunning = true;
in.run(); in.run();
whileRunning = false;
} catch (IllegalStateException ex) { } catch (IllegalStateException ex) {
if (started || isShutdown() == false) { if (whileRunning || isShutdown() == false) {
throw ex; throw ex;
} }
// if we hit an ISE here we have been shutting down // if we hit an ISE here we have been shutting down

View File

@ -325,7 +325,7 @@ public class UpdateThreadPoolSettingsTests extends ESTestCase {
try { try {
Settings nodeSettings = Settings.settingsBuilder() Settings nodeSettings = Settings.settingsBuilder()
.put("threadpool." + threadPoolName + ".queue_size", 1000) .put("threadpool." + threadPoolName + ".queue_size", 1000)
.put("name", "testCachedExecutorType").build(); .put("name", "testShutdownNowInterrupts").build();
threadPool = new ThreadPool(nodeSettings); threadPool = new ThreadPool(nodeSettings);
ClusterSettings clusterSettings = new ClusterSettings(nodeSettings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); ClusterSettings clusterSettings = new ClusterSettings(nodeSettings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
threadPool.setClusterSettings(clusterSettings); threadPool.setClusterSettings(clusterSettings);