Catch ESRejectedExecutionException on node close

When a node shuts down thread pools might throw
ESRejectedExecutionException and our test framework fails tests if
exceptions are not caught hitting uncaught exception handler.
This commit is contained in:
Simon Willnauer 2013-09-09 21:10:53 +02:00
parent 764aa54f2d
commit 777d7f47a5

View File

@ -36,6 +36,7 @@ import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.monitor.fs.FsStats;
import org.elasticsearch.node.settings.NodeSettingsService;
import org.elasticsearch.threadpool.ThreadPool;
@ -121,11 +122,17 @@ public final class InternalClusterInfoService extends AbstractComponent implemen
if (logger.isTraceEnabled()) {
logger.trace("I have been elected master, scheduling a ClusterInfoUpdateJob");
}
// Submit a job that will start after DEFAULT_STARTING_INTERVAL, and reschedule itself after running
threadPool.schedule(updateFrequency, executorName(), new SubmitReschedulingClusterInfoUpdatedJob());
if (clusterService.state().getNodes().getDataNodes().size() > 1) {
// Submit an info update job to be run immediately
threadPool.executor(executorName()).execute(new ClusterInfoUpdateJob(false));
try {
// Submit a job that will start after DEFAULT_STARTING_INTERVAL, and reschedule itself after running
threadPool.schedule(updateFrequency, executorName(), new SubmitReschedulingClusterInfoUpdatedJob());
if (clusterService.state().getNodes().getDataNodes().size() > 1) {
// Submit an info update job to be run immediately
threadPool.executor(executorName()).execute(new ClusterInfoUpdateJob(false));
}
} catch (EsRejectedExecutionException ex) {
if (logger.isDebugEnabled()) {
logger.debug("Couldn't schedule cluster info update task - node might be shutting down", ex);
}
}
}
@ -192,7 +199,13 @@ public final class InternalClusterInfoService extends AbstractComponent implemen
if (logger.isTraceEnabled()) {
logger.trace("Submitting new rescheduling cluster info update job");
}
threadPool.executor(executorName()).execute(new ClusterInfoUpdateJob(true));
try {
threadPool.executor(executorName()).execute(new ClusterInfoUpdateJob(true));
} catch (EsRejectedExecutionException ex) {
if (logger.isDebugEnabled()) {
logger.debug("Couldn't re-schedule cluster info update task - node might be shutting down", ex);
}
}
}
}