[ML] Stop using the management thread pool unnecessarily for ML actions (elastic/x-pack-elasticsearch#1213)

The management thread pool only has 5 threads and clogging it up makes
monitoring think the cluster is dead.

relates elastic/x-pack-elasticsearch#1210

Original commit: elastic/x-pack-elasticsearch@f4ad7578d9
This commit is contained in:
David Roberts 2017-04-26 17:17:26 +01:00 committed by GitHub
parent 6b4db0fc36
commit f3f9cb6d74
7 changed files with 19 additions and 14 deletions

View File

@ -149,7 +149,7 @@ public class MachineLearning implements ActionPlugin {
public static final String BASE_PATH = "/_xpack/ml/"; public static final String BASE_PATH = "/_xpack/ml/";
public static final String DATAFEED_THREAD_POOL_NAME = NAME + "_datafeed"; public static final String DATAFEED_THREAD_POOL_NAME = NAME + "_datafeed";
public static final String AUTODETECT_THREAD_POOL_NAME = NAME + "_autodetect"; public static final String AUTODETECT_THREAD_POOL_NAME = NAME + "_autodetect";
public static final String NORMALIZER_THREAD_POOL_NAME = NAME + "_normalizer"; public static final String UTILITY_THREAD_POOL_NAME = NAME + "_utility";
public static final Setting<Boolean> AUTODETECT_PROCESS = public static final Setting<Boolean> AUTODETECT_PROCESS =
Setting.boolSetting("xpack.ml.autodetect_process", true, Property.NodeScope); Setting.boolSetting("xpack.ml.autodetect_process", true, Property.NodeScope);
@ -296,7 +296,7 @@ public class MachineLearning implements ActionPlugin {
executorService) -> new MultiplyingNormalizerProcess(settings, 1.0); executorService) -> new MultiplyingNormalizerProcess(settings, 1.0);
} }
NormalizerFactory normalizerFactory = new NormalizerFactory(normalizerProcessFactory, NormalizerFactory normalizerFactory = new NormalizerFactory(normalizerProcessFactory,
threadPool.executor(MachineLearning.NORMALIZER_THREAD_POOL_NAME)); threadPool.executor(MachineLearning.UTILITY_THREAD_POOL_NAME));
AutodetectProcessManager autodetectProcessManager = new AutodetectProcessManager(settings, internalClient, threadPool, AutodetectProcessManager autodetectProcessManager = new AutodetectProcessManager(settings, internalClient, threadPool,
jobManager, jobProvider, jobResultsPersister, jobDataCountsPersister, autodetectProcessFactory, jobManager, jobProvider, jobResultsPersister, jobDataCountsPersister, autodetectProcessFactory,
normalizerFactory, xContentRegistry, auditor); normalizerFactory, xContentRegistry, auditor);
@ -437,15 +437,17 @@ public class MachineLearning implements ActionPlugin {
return emptyList(); return emptyList();
} }
int maxNumberOfJobs = AutodetectProcessManager.MAX_RUNNING_JOBS_PER_NODE.get(settings); int maxNumberOfJobs = AutodetectProcessManager.MAX_RUNNING_JOBS_PER_NODE.get(settings);
// 4 threads: for cpp logging, result processing, state processing and // 4 threads per job: for cpp logging, result processing, state processing and
// AutodetectProcessManager worker thread: // AutodetectProcessManager worker thread:
FixedExecutorBuilder autoDetect = new FixedExecutorBuilder(settings, AUTODETECT_THREAD_POOL_NAME, FixedExecutorBuilder autoDetect = new FixedExecutorBuilder(settings, AUTODETECT_THREAD_POOL_NAME,
maxNumberOfJobs * 4, 4, "xpack.ml.autodetect_thread_pool"); maxNumberOfJobs * 4, 4, "xpack.ml.autodetect_thread_pool");
// 3 threads: normalization (cpp logging, result handling) and // 4 threads per job: processing logging, result and state of the renormalization process.
// renormalization (ShortCircuitingRenormalizer): // Renormalization does't run for the entire lifetime of a job, so additionally autodetect process
FixedExecutorBuilder renormalizer = new FixedExecutorBuilder(settings, NORMALIZER_THREAD_POOL_NAME, // based operation (open, close, flush, post data), datafeed based operations (start and stop)
maxNumberOfJobs * 3, 200, "xpack.ml.normalizer_thread_pool"); // and deleting expired data use this threadpool too and queue up if all threads are busy.
FixedExecutorBuilder renormalizer = new FixedExecutorBuilder(settings, UTILITY_THREAD_POOL_NAME,
maxNumberOfJobs * 4, 500, "xpack.ml.utility_thread_pool");
// TODO: if datafeed and non datafeed jobs are considered more equal and the datafeed and // TODO: if datafeed and non datafeed jobs are considered more equal and the datafeed and
// autodetect process are created at the same time then these two different TPs can merge. // autodetect process are created at the same time then these two different TPs can merge.

View File

@ -362,7 +362,7 @@ public class CloseJobAction extends Action<CloseJobAction.Request, CloseJobActio
JobTaskStatus taskStatus = new JobTaskStatus(JobState.CLOSING, jobTask.getAllocationId()); JobTaskStatus taskStatus = new JobTaskStatus(JobState.CLOSING, jobTask.getAllocationId());
jobTask.updatePersistentStatus(taskStatus, ActionListener.wrap(task -> { jobTask.updatePersistentStatus(taskStatus, ActionListener.wrap(task -> {
// we need to fork because we are now on a network threadpool and closeJob method may take a while to complete: // we need to fork because we are now on a network threadpool and closeJob method may take a while to complete:
threadPool.executor(ThreadPool.Names.MANAGEMENT).execute(new AbstractRunnable() { threadPool.executor(MachineLearning.UTILITY_THREAD_POOL_NAME).execute(new AbstractRunnable() {
@Override @Override
public void onFailure(Exception e) { public void onFailure(Exception e) {
listener.onFailure(e); listener.onFailure(e);

View File

@ -25,6 +25,7 @@ import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService; import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.ml.MachineLearning;
import org.elasticsearch.xpack.ml.job.retention.ExpiredModelSnapshotsRemover; import org.elasticsearch.xpack.ml.job.retention.ExpiredModelSnapshotsRemover;
import org.elasticsearch.xpack.ml.job.retention.ExpiredResultsRemover; import org.elasticsearch.xpack.ml.job.retention.ExpiredResultsRemover;
import org.elasticsearch.xpack.ml.notifications.Auditor; import org.elasticsearch.xpack.ml.notifications.Auditor;
@ -133,7 +134,7 @@ public class DeleteExpiredDataAction extends Action<DeleteExpiredDataAction.Requ
@Override @Override
protected void doExecute(Request request, ActionListener<Response> listener) { protected void doExecute(Request request, ActionListener<Response> listener) {
logger.info("Deleting expired data"); logger.info("Deleting expired data");
threadPool.executor(ThreadPool.Names.MANAGEMENT).execute(() -> deleteExpiredData(listener)); threadPool.executor(MachineLearning.UTILITY_THREAD_POOL_NAME).execute(() -> deleteExpiredData(listener));
} }
private void deleteExpiredData(ActionListener<Response> listener) { private void deleteExpiredData(ActionListener<Response> listener) {

View File

@ -482,7 +482,7 @@ public class OpenJobAction extends Action<OpenJobAction.Request, OpenJobAction.R
public OpenJobPersistentTasksExecutor(Settings settings, ClusterService clusterService, public OpenJobPersistentTasksExecutor(Settings settings, ClusterService clusterService,
AutodetectProcessManager autodetectProcessManager) { AutodetectProcessManager autodetectProcessManager) {
super(settings, TASK_NAME, ThreadPool.Names.MANAGEMENT); super(settings, TASK_NAME, MachineLearning.UTILITY_THREAD_POOL_NAME);
this.autodetectProcessManager = autodetectProcessManager; this.autodetectProcessManager = autodetectProcessManager;
this.maxNumberOfOpenJobs = AutodetectProcessManager.MAX_RUNNING_JOBS_PER_NODE.get(settings); this.maxNumberOfOpenJobs = AutodetectProcessManager.MAX_RUNNING_JOBS_PER_NODE.get(settings);
this.maxConcurrentJobAllocations = MachineLearning.CONCURRENT_JOB_ALLOCATIONS.get(settings); this.maxConcurrentJobAllocations = MachineLearning.CONCURRENT_JOB_ALLOCATIONS.get(settings);

View File

@ -47,6 +47,7 @@ import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService; import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.XPackPlugin; import org.elasticsearch.xpack.XPackPlugin;
import org.elasticsearch.xpack.ml.MachineLearning;
import org.elasticsearch.xpack.ml.MlMetadata; import org.elasticsearch.xpack.ml.MlMetadata;
import org.elasticsearch.xpack.ml.datafeed.DatafeedConfig; import org.elasticsearch.xpack.ml.datafeed.DatafeedConfig;
import org.elasticsearch.xpack.ml.datafeed.DatafeedJobValidator; import org.elasticsearch.xpack.ml.datafeed.DatafeedJobValidator;
@ -505,7 +506,7 @@ public class StartDatafeedAction
private final IndexNameExpressionResolver resolver; private final IndexNameExpressionResolver resolver;
public StartDatafeedPersistentTasksExecutor(Settings settings, DatafeedManager datafeedManager) { public StartDatafeedPersistentTasksExecutor(Settings settings, DatafeedManager datafeedManager) {
super(settings, TASK_NAME, ThreadPool.Names.MANAGEMENT); super(settings, TASK_NAME, MachineLearning.UTILITY_THREAD_POOL_NAME);
this.datafeedManager = datafeedManager; this.datafeedManager = datafeedManager;
this.resolver = new IndexNameExpressionResolver(settings); this.resolver = new IndexNameExpressionResolver(settings);
} }

View File

@ -39,6 +39,7 @@ import org.elasticsearch.discovery.MasterNotDiscoveredException;
import org.elasticsearch.tasks.Task; import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService; import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.ml.MachineLearning;
import org.elasticsearch.xpack.ml.MlMetadata; import org.elasticsearch.xpack.ml.MlMetadata;
import org.elasticsearch.xpack.ml.datafeed.DatafeedConfig; import org.elasticsearch.xpack.ml.datafeed.DatafeedConfig;
import org.elasticsearch.xpack.ml.datafeed.DatafeedState; import org.elasticsearch.xpack.ml.datafeed.DatafeedState;
@ -261,7 +262,7 @@ public class StopDatafeedAction
ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver, ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver,
ClusterService clusterService, PersistentTasksService persistentTasksService) { ClusterService clusterService, PersistentTasksService persistentTasksService) {
super(settings, StopDatafeedAction.NAME, threadPool, clusterService, transportService, actionFilters, super(settings, StopDatafeedAction.NAME, threadPool, clusterService, transportService, actionFilters,
indexNameExpressionResolver, Request::new, Response::new, ThreadPool.Names.MANAGEMENT); indexNameExpressionResolver, Request::new, Response::new, MachineLearning.UTILITY_THREAD_POOL_NAME);
this.persistentTasksService = persistentTasksService; this.persistentTasksService = persistentTasksService;
} }

View File

@ -213,7 +213,7 @@ public class AutodetectProcessManager extends AbstractComponent {
Job job = jobManager.getJobOrThrowIfUnknown(jobId); Job job = jobManager.getJobOrThrowIfUnknown(jobId);
jobProvider.getAutodetectParams(job, params -> { jobProvider.getAutodetectParams(job, params -> {
// We need to fork, otherwise we restore model state from a network thread (several GET api calls): // We need to fork, otherwise we restore model state from a network thread (several GET api calls):
threadPool.executor(ThreadPool.Names.MANAGEMENT).execute(new AbstractRunnable() { threadPool.executor(MachineLearning.UTILITY_THREAD_POOL_NAME).execute(new AbstractRunnable() {
@Override @Override
public void onFailure(Exception e) { public void onFailure(Exception e) {
handler.accept(e); handler.accept(e);
@ -272,7 +272,7 @@ public class AutodetectProcessManager extends AbstractComponent {
jobDataCountsPersister); jobDataCountsPersister);
ScoresUpdater scoresUpdater = new ScoresUpdater(job, jobProvider, new JobRenormalizedResultsPersister(settings, client), ScoresUpdater scoresUpdater = new ScoresUpdater(job, jobProvider, new JobRenormalizedResultsPersister(settings, client),
normalizerFactory); normalizerFactory);
ExecutorService renormalizerExecutorService = threadPool.executor(MachineLearning.NORMALIZER_THREAD_POOL_NAME); ExecutorService renormalizerExecutorService = threadPool.executor(MachineLearning.UTILITY_THREAD_POOL_NAME);
Renormalizer renormalizer = new ShortCircuitingRenormalizer(jobId, scoresUpdater, Renormalizer renormalizer = new ShortCircuitingRenormalizer(jobId, scoresUpdater,
renormalizerExecutorService, job.getAnalysisConfig().getUsePerPartitionNormalization()); renormalizerExecutorService, job.getAnalysisConfig().getUsePerPartitionNormalization());