Refactor into appropriate uses of scheduleUnlessShuttingDown (#37709)

Replace `threadPool().schedule()` / catch
`EsRejectedExecutionException` pattern with direct calls to
`ThreadPool#scheduleUnlessShuttingDown()`.

Closes #36318
This commit is contained in:
Dimitrios Liappis 2019-01-28 10:01:26 +02:00 committed by GitHub
parent b1735aa93b
commit 290c6637c2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 9 additions and 32 deletions

View File

@ -130,17 +130,17 @@ public class InternalClusterInfoService implements ClusterInfoService, LocalNode
if (logger.isTraceEnabled()) { if (logger.isTraceEnabled()) {
logger.trace("I have been elected master, scheduling a ClusterInfoUpdateJob"); logger.trace("I have been elected master, scheduling a ClusterInfoUpdateJob");
} }
// Submit a job that will start after DEFAULT_STARTING_INTERVAL, and reschedule itself after running
threadPool.scheduleUnlessShuttingDown(updateFrequency, executorName(), new SubmitReschedulingClusterInfoUpdatedJob());
try { try {
// Submit a job that will start after DEFAULT_STARTING_INTERVAL, and reschedule itself after running
threadPool.schedule(updateFrequency, executorName(), new SubmitReschedulingClusterInfoUpdatedJob());
if (clusterService.state().getNodes().getDataNodes().size() > 1) { if (clusterService.state().getNodes().getDataNodes().size() > 1) {
// Submit an info update job to be run immediately // Submit an info update job to be run immediately
threadPool.executor(executorName()).execute(() -> maybeRefresh()); threadPool.executor(executorName()).execute(() -> maybeRefresh());
} }
} catch (EsRejectedExecutionException ex) { } catch (EsRejectedExecutionException ex) {
if (logger.isDebugEnabled()) { logger.debug("Couldn't schedule cluster info update task - node might be shutting down", ex);
logger.debug("Couldn't schedule cluster info update task - node might be shutting down", ex);
}
} }
} }
@ -223,11 +223,7 @@ public class InternalClusterInfoService implements ClusterInfoService, LocalNode
if (logger.isTraceEnabled()) { if (logger.isTraceEnabled()) {
logger.trace("Scheduling next run for updating cluster info in: {}", updateFrequency.toString()); logger.trace("Scheduling next run for updating cluster info in: {}", updateFrequency.toString());
} }
try { threadPool.scheduleUnlessShuttingDown(updateFrequency, executorName(), this);
threadPool.schedule(updateFrequency, executorName(), this);
} catch (EsRejectedExecutionException ex) {
logger.debug("Reschedule cluster info service was rejected", ex);
}
} }
} }
}); });

View File

@ -30,7 +30,6 @@ import org.elasticsearch.common.metrics.CounterMetric;
import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.AbstractLifecycleRunnable; import org.elasticsearch.common.util.concurrent.AbstractLifecycleRunnable;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
import java.io.Closeable; import java.io.Closeable;
@ -185,15 +184,7 @@ final class TransportKeepAlive implements Closeable {
@Override @Override
protected void onAfterInLifecycle() { protected void onAfterInLifecycle() {
try { threadPool.scheduleUnlessShuttingDown(pingInterval, ThreadPool.Names.GENERIC, this);
threadPool.schedule(pingInterval, ThreadPool.Names.GENERIC, this);
} catch (EsRejectedExecutionException ex) {
if (ex.isExecutorShutdown()) {
logger.debug("couldn't schedule new ping execution, executor is shutting down", ex);
} else {
throw ex;
}
}
} }
@Override @Override

View File

@ -32,7 +32,6 @@ import org.elasticsearch.common.settings.IndexScopedSettings;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.settings.SettingsModule; import org.elasticsearch.common.settings.SettingsModule;
import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.index.Index; import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexNotFoundException; import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.index.engine.CommitStats; import org.elasticsearch.index.engine.CommitStats;
@ -102,17 +101,8 @@ public class ShardFollowTasksExecutor extends PersistentTasksExecutor<ShardFollo
Map<String, String> headers) { Map<String, String> headers) {
ShardFollowTask params = taskInProgress.getParams(); ShardFollowTask params = taskInProgress.getParams();
Client followerClient = wrapClient(client, params.getHeaders()); Client followerClient = wrapClient(client, params.getHeaders());
BiConsumer<TimeValue, Runnable> scheduler = (delay, command) -> { BiConsumer<TimeValue, Runnable> scheduler = (delay, command) ->
try { threadPool.scheduleUnlessShuttingDown(delay, Ccr.CCR_THREAD_POOL_NAME, command);
threadPool.schedule(delay, Ccr.CCR_THREAD_POOL_NAME, command);
} catch (EsRejectedExecutionException e) {
if (e.isExecutorShutdown()) {
logger.debug("couldn't schedule command, executor is shutting down", e);
} else {
throw e;
}
}
};
final String recordedLeaderShardHistoryUUID = getLeaderShardHistoryUUID(params); final String recordedLeaderShardHistoryUUID = getLeaderShardHistoryUUID(params);
return new ShardFollowNodeTask(id, type, action, getDescription(taskInProgress), parentTaskId, headers, params, return new ShardFollowNodeTask(id, type, action, getDescription(taskInProgress), parentTaskId, headers, params,