diff --git a/server/src/main/java/org/elasticsearch/cluster/InternalClusterInfoService.java b/server/src/main/java/org/elasticsearch/cluster/InternalClusterInfoService.java index e8261ca9f09..8d78f9c838e 100644 --- a/server/src/main/java/org/elasticsearch/cluster/InternalClusterInfoService.java +++ b/server/src/main/java/org/elasticsearch/cluster/InternalClusterInfoService.java @@ -130,17 +130,17 @@ public class InternalClusterInfoService implements ClusterInfoService, LocalNode if (logger.isTraceEnabled()) { logger.trace("I have been elected master, scheduling a ClusterInfoUpdateJob"); } + + // Submit a job that will start after DEFAULT_STARTING_INTERVAL, and reschedule itself after running + threadPool.scheduleUnlessShuttingDown(updateFrequency, executorName(), new SubmitReschedulingClusterInfoUpdatedJob()); + try { - // Submit a job that will start after DEFAULT_STARTING_INTERVAL, and reschedule itself after running - threadPool.schedule(updateFrequency, executorName(), new SubmitReschedulingClusterInfoUpdatedJob()); if (clusterService.state().getNodes().getDataNodes().size() > 1) { // Submit an info update job to be run immediately threadPool.executor(executorName()).execute(() -> maybeRefresh()); } } catch (EsRejectedExecutionException ex) { - if (logger.isDebugEnabled()) { - logger.debug("Couldn't schedule cluster info update task - node might be shutting down", ex); - } + logger.debug("Couldn't schedule cluster info update task - node might be shutting down", ex); } } @@ -223,11 +223,7 @@ public class InternalClusterInfoService implements ClusterInfoService, LocalNode if (logger.isTraceEnabled()) { logger.trace("Scheduling next run for updating cluster info in: {}", updateFrequency.toString()); } - try { - threadPool.schedule(updateFrequency, executorName(), this); - } catch (EsRejectedExecutionException ex) { - logger.debug("Reschedule cluster info service was rejected", ex); - } + threadPool.scheduleUnlessShuttingDown(updateFrequency, executorName(), this); } } }); diff --git a/server/src/main/java/org/elasticsearch/transport/TransportKeepAlive.java b/server/src/main/java/org/elasticsearch/transport/TransportKeepAlive.java index b8d06e7e117..8f17377c2a2 100644 --- a/server/src/main/java/org/elasticsearch/transport/TransportKeepAlive.java +++ b/server/src/main/java/org/elasticsearch/transport/TransportKeepAlive.java @@ -30,7 +30,6 @@ import org.elasticsearch.common.metrics.CounterMetric; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.AbstractLifecycleRunnable; import org.elasticsearch.common.util.concurrent.ConcurrentCollections; -import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; import org.elasticsearch.threadpool.ThreadPool; import java.io.Closeable; @@ -185,15 +184,7 @@ final class TransportKeepAlive implements Closeable { @Override protected void onAfterInLifecycle() { - try { - threadPool.schedule(pingInterval, ThreadPool.Names.GENERIC, this); - } catch (EsRejectedExecutionException ex) { - if (ex.isExecutorShutdown()) { - logger.debug("couldn't schedule new ping execution, executor is shutting down", ex); - } else { - throw ex; - } - } + threadPool.scheduleUnlessShuttingDown(pingInterval, ThreadPool.Names.GENERIC, this); } @Override diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutor.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutor.java index 956171ba9b7..40aa90dcab5 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutor.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutor.java @@ -32,7 +32,6 @@ import org.elasticsearch.common.settings.IndexScopedSettings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.SettingsModule; import org.elasticsearch.common.unit.TimeValue; -import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; import org.elasticsearch.index.Index; import org.elasticsearch.index.IndexNotFoundException; import org.elasticsearch.index.engine.CommitStats; @@ -102,17 +101,8 @@ public class ShardFollowTasksExecutor extends PersistentTasksExecutor headers) { ShardFollowTask params = taskInProgress.getParams(); Client followerClient = wrapClient(client, params.getHeaders()); - BiConsumer scheduler = (delay, command) -> { - try { - threadPool.schedule(delay, Ccr.CCR_THREAD_POOL_NAME, command); - } catch (EsRejectedExecutionException e) { - if (e.isExecutorShutdown()) { - logger.debug("couldn't schedule command, executor is shutting down", e); - } else { - throw e; - } - } - }; + BiConsumer scheduler = (delay, command) -> + threadPool.scheduleUnlessShuttingDown(delay, Ccr.CCR_THREAD_POOL_NAME, command); final String recordedLeaderShardHistoryUUID = getLeaderShardHistoryUUID(params); return new ShardFollowNodeTask(id, type, action, getDescription(taskInProgress), parentTaskId, headers, params,