diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java index 212b481ca11..964175df95f 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java @@ -149,7 +149,7 @@ public class MachineLearning implements ActionPlugin { public static final String BASE_PATH = "/_xpack/ml/"; public static final String DATAFEED_THREAD_POOL_NAME = NAME + "_datafeed"; public static final String AUTODETECT_THREAD_POOL_NAME = NAME + "_autodetect"; - public static final String NORMALIZER_THREAD_POOL_NAME = NAME + "_normalizer"; + public static final String UTILITY_THREAD_POOL_NAME = NAME + "_utility"; public static final Setting AUTODETECT_PROCESS = Setting.boolSetting("xpack.ml.autodetect_process", true, Property.NodeScope); @@ -296,7 +296,7 @@ public class MachineLearning implements ActionPlugin { executorService) -> new MultiplyingNormalizerProcess(settings, 1.0); } NormalizerFactory normalizerFactory = new NormalizerFactory(normalizerProcessFactory, - threadPool.executor(MachineLearning.NORMALIZER_THREAD_POOL_NAME)); + threadPool.executor(MachineLearning.UTILITY_THREAD_POOL_NAME)); AutodetectProcessManager autodetectProcessManager = new AutodetectProcessManager(settings, internalClient, threadPool, jobManager, jobProvider, jobResultsPersister, jobDataCountsPersister, autodetectProcessFactory, normalizerFactory, xContentRegistry, auditor); @@ -437,15 +437,17 @@ public class MachineLearning implements ActionPlugin { return emptyList(); } int maxNumberOfJobs = AutodetectProcessManager.MAX_RUNNING_JOBS_PER_NODE.get(settings); - // 4 threads: for cpp logging, result processing, state processing and + // 4 threads per job: for cpp logging, result processing, state processing and // AutodetectProcessManager worker thread: FixedExecutorBuilder autoDetect = new FixedExecutorBuilder(settings, AUTODETECT_THREAD_POOL_NAME, maxNumberOfJobs * 4, 4, "xpack.ml.autodetect_thread_pool"); - // 3 threads: normalization (cpp logging, result handling) and - // renormalization (ShortCircuitingRenormalizer): - FixedExecutorBuilder renormalizer = new FixedExecutorBuilder(settings, NORMALIZER_THREAD_POOL_NAME, - maxNumberOfJobs * 3, 200, "xpack.ml.normalizer_thread_pool"); + // 4 threads per job: processing logging, result and state of the renormalization process. + // Renormalization does't run for the entire lifetime of a job, so additionally autodetect process + // based operation (open, close, flush, post data), datafeed based operations (start and stop) + // and deleting expired data use this threadpool too and queue up if all threads are busy. + FixedExecutorBuilder renormalizer = new FixedExecutorBuilder(settings, UTILITY_THREAD_POOL_NAME, + maxNumberOfJobs * 4, 500, "xpack.ml.utility_thread_pool"); // TODO: if datafeed and non datafeed jobs are considered more equal and the datafeed and // autodetect process are created at the same time then these two different TPs can merge. diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/CloseJobAction.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/CloseJobAction.java index e2dead6ebc6..f67a2bbb95a 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/CloseJobAction.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/CloseJobAction.java @@ -362,7 +362,7 @@ public class CloseJobAction extends Action { // we need to fork because we are now on a network threadpool and closeJob method may take a while to complete: - threadPool.executor(ThreadPool.Names.MANAGEMENT).execute(new AbstractRunnable() { + threadPool.executor(MachineLearning.UTILITY_THREAD_POOL_NAME).execute(new AbstractRunnable() { @Override public void onFailure(Exception e) { listener.onFailure(e); diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/DeleteExpiredDataAction.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/DeleteExpiredDataAction.java index b8dd8dbbeb1..2f38aed0768 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/DeleteExpiredDataAction.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/DeleteExpiredDataAction.java @@ -25,6 +25,7 @@ import org.elasticsearch.common.xcontent.ToXContentObject; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; +import org.elasticsearch.xpack.ml.MachineLearning; import org.elasticsearch.xpack.ml.job.retention.ExpiredModelSnapshotsRemover; import org.elasticsearch.xpack.ml.job.retention.ExpiredResultsRemover; import org.elasticsearch.xpack.ml.notifications.Auditor; @@ -133,7 +134,7 @@ public class DeleteExpiredDataAction extends Action listener) { logger.info("Deleting expired data"); - threadPool.executor(ThreadPool.Names.MANAGEMENT).execute(() -> deleteExpiredData(listener)); + threadPool.executor(MachineLearning.UTILITY_THREAD_POOL_NAME).execute(() -> deleteExpiredData(listener)); } private void deleteExpiredData(ActionListener listener) { diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/OpenJobAction.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/OpenJobAction.java index 59dc047c7ee..6ad6135fd8f 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/OpenJobAction.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/OpenJobAction.java @@ -482,7 +482,7 @@ public class OpenJobAction extends Action { // We need to fork, otherwise we restore model state from a network thread (several GET api calls): - threadPool.executor(ThreadPool.Names.MANAGEMENT).execute(new AbstractRunnable() { + threadPool.executor(MachineLearning.UTILITY_THREAD_POOL_NAME).execute(new AbstractRunnable() { @Override public void onFailure(Exception e) { handler.accept(e); @@ -272,7 +272,7 @@ public class AutodetectProcessManager extends AbstractComponent { jobDataCountsPersister); ScoresUpdater scoresUpdater = new ScoresUpdater(job, jobProvider, new JobRenormalizedResultsPersister(settings, client), normalizerFactory); - ExecutorService renormalizerExecutorService = threadPool.executor(MachineLearning.NORMALIZER_THREAD_POOL_NAME); + ExecutorService renormalizerExecutorService = threadPool.executor(MachineLearning.UTILITY_THREAD_POOL_NAME); Renormalizer renormalizer = new ShortCircuitingRenormalizer(jobId, scoresUpdater, renormalizerExecutorService, job.getAnalysisConfig().getUsePerPartitionNormalization());