From f3f9cb6d741eb5a3c0fab1eae4e5626c49c2d31f Mon Sep 17 00:00:00 2001 From: David Roberts Date: Wed, 26 Apr 2017 17:17:26 +0100 Subject: [PATCH] [ML] Stop using the management thread pool unnecessarily for ML actions (elastic/x-pack-elasticsearch#1213) The management thread pool only has 5 threads and clogging it up makes monitoring think the cluster is dead. relates elastic/x-pack-elasticsearch#1210 Original commit: elastic/x-pack-elasticsearch@f4ad7578d9cf260dec69f9b20f8aae583c4bf9b4 --- .../elasticsearch/xpack/ml/MachineLearning.java | 16 +++++++++------- .../xpack/ml/action/CloseJobAction.java | 2 +- .../xpack/ml/action/DeleteExpiredDataAction.java | 3 ++- .../xpack/ml/action/OpenJobAction.java | 2 +- .../xpack/ml/action/StartDatafeedAction.java | 3 ++- .../xpack/ml/action/StopDatafeedAction.java | 3 ++- .../autodetect/AutodetectProcessManager.java | 4 ++-- 7 files changed, 19 insertions(+), 14 deletions(-) diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java index 212b481ca11..964175df95f 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java @@ -149,7 +149,7 @@ public class MachineLearning implements ActionPlugin { public static final String BASE_PATH = "/_xpack/ml/"; public static final String DATAFEED_THREAD_POOL_NAME = NAME + "_datafeed"; public static final String AUTODETECT_THREAD_POOL_NAME = NAME + "_autodetect"; - public static final String NORMALIZER_THREAD_POOL_NAME = NAME + "_normalizer"; + public static final String UTILITY_THREAD_POOL_NAME = NAME + "_utility"; public static final Setting AUTODETECT_PROCESS = Setting.boolSetting("xpack.ml.autodetect_process", true, Property.NodeScope); @@ -296,7 +296,7 @@ public class MachineLearning implements ActionPlugin { executorService) -> new MultiplyingNormalizerProcess(settings, 1.0); } NormalizerFactory 
normalizerFactory = new NormalizerFactory(normalizerProcessFactory, - threadPool.executor(MachineLearning.NORMALIZER_THREAD_POOL_NAME)); + threadPool.executor(MachineLearning.UTILITY_THREAD_POOL_NAME)); AutodetectProcessManager autodetectProcessManager = new AutodetectProcessManager(settings, internalClient, threadPool, jobManager, jobProvider, jobResultsPersister, jobDataCountsPersister, autodetectProcessFactory, normalizerFactory, xContentRegistry, auditor); @@ -437,15 +437,17 @@ public class MachineLearning implements ActionPlugin { return emptyList(); } int maxNumberOfJobs = AutodetectProcessManager.MAX_RUNNING_JOBS_PER_NODE.get(settings); - // 4 threads: for cpp logging, result processing, state processing and + // 4 threads per job: for cpp logging, result processing, state processing and // AutodetectProcessManager worker thread: FixedExecutorBuilder autoDetect = new FixedExecutorBuilder(settings, AUTODETECT_THREAD_POOL_NAME, maxNumberOfJobs * 4, 4, "xpack.ml.autodetect_thread_pool"); - // 3 threads: normalization (cpp logging, result handling) and - // renormalization (ShortCircuitingRenormalizer): - FixedExecutorBuilder renormalizer = new FixedExecutorBuilder(settings, NORMALIZER_THREAD_POOL_NAME, - maxNumberOfJobs * 3, 200, "xpack.ml.normalizer_thread_pool"); + // 4 threads per job: processing logging, result and state of the renormalization process. + // Renormalization doesn't run for the entire lifetime of a job, so additionally autodetect process + // based operations (open, close, flush, post data), datafeed based operations (start and stop) + // and deleting expired data use this threadpool too and queue up if all threads are busy. 
+ FixedExecutorBuilder renormalizer = new FixedExecutorBuilder(settings, UTILITY_THREAD_POOL_NAME, + maxNumberOfJobs * 4, 500, "xpack.ml.utility_thread_pool"); // TODO: if datafeed and non datafeed jobs are considered more equal and the datafeed and // autodetect process are created at the same time then these two different TPs can merge. diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/CloseJobAction.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/CloseJobAction.java index e2dead6ebc6..f67a2bbb95a 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/CloseJobAction.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/CloseJobAction.java @@ -362,7 +362,7 @@ public class CloseJobAction extends Action { // we need to fork because we are now on a network threadpool and closeJob method may take a while to complete: - threadPool.executor(ThreadPool.Names.MANAGEMENT).execute(new AbstractRunnable() { + threadPool.executor(MachineLearning.UTILITY_THREAD_POOL_NAME).execute(new AbstractRunnable() { @Override public void onFailure(Exception e) { listener.onFailure(e); diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/DeleteExpiredDataAction.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/DeleteExpiredDataAction.java index b8dd8dbbeb1..2f38aed0768 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/DeleteExpiredDataAction.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/DeleteExpiredDataAction.java @@ -25,6 +25,7 @@ import org.elasticsearch.common.xcontent.ToXContentObject; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; +import org.elasticsearch.xpack.ml.MachineLearning; import org.elasticsearch.xpack.ml.job.retention.ExpiredModelSnapshotsRemover; import org.elasticsearch.xpack.ml.job.retention.ExpiredResultsRemover; import 
org.elasticsearch.xpack.ml.notifications.Auditor; @@ -133,7 +134,7 @@ public class DeleteExpiredDataAction extends Action listener) { logger.info("Deleting expired data"); - threadPool.executor(ThreadPool.Names.MANAGEMENT).execute(() -> deleteExpiredData(listener)); + threadPool.executor(MachineLearning.UTILITY_THREAD_POOL_NAME).execute(() -> deleteExpiredData(listener)); } private void deleteExpiredData(ActionListener listener) { diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/OpenJobAction.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/OpenJobAction.java index 59dc047c7ee..6ad6135fd8f 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/OpenJobAction.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/OpenJobAction.java @@ -482,7 +482,7 @@ public class OpenJobAction extends Action { // We need to fork, otherwise we restore model state from a network thread (several GET api calls): - threadPool.executor(ThreadPool.Names.MANAGEMENT).execute(new AbstractRunnable() { + threadPool.executor(MachineLearning.UTILITY_THREAD_POOL_NAME).execute(new AbstractRunnable() { @Override public void onFailure(Exception e) { handler.accept(e); @@ -272,7 +272,7 @@ public class AutodetectProcessManager extends AbstractComponent { jobDataCountsPersister); ScoresUpdater scoresUpdater = new ScoresUpdater(job, jobProvider, new JobRenormalizedResultsPersister(settings, client), normalizerFactory); - ExecutorService renormalizerExecutorService = threadPool.executor(MachineLearning.NORMALIZER_THREAD_POOL_NAME); + ExecutorService renormalizerExecutorService = threadPool.executor(MachineLearning.UTILITY_THREAD_POOL_NAME); Renormalizer renormalizer = new ShortCircuitingRenormalizer(jobId, scoresUpdater, renormalizerExecutorService, job.getAnalysisConfig().getUsePerPartitionNormalization());