From e4275f3749eec979194f7fe537ea7843c1bafed0 Mon Sep 17 00:00:00 2001 From: David Roberts Date: Mon, 14 Sep 2020 16:46:06 +0100 Subject: [PATCH] [ML] Use utility thread pool for memory estimation (#62314) The job comms thread pool is intended for the long-running job processes that do anomaly detection or data frame analytics and count towards job count and memory limits. This commit moves the short-lived memory estimation processes to the ML utility thread pool. Although this doesn't matter in most cases, at the limits of scale it could mean that memory estimations would get in the way of starting jobs, or would queue up for an excessive period of time while waiting for jobs to finish. --- .../org/elasticsearch/xpack/ml/MachineLearning.java | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java index 430457a125e..2371b666b5c 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java @@ -706,7 +706,7 @@ public class MachineLearning extends Plugin implements SystemIndexPlugin, EsExecutors.allocatedProcessors(settings)); MemoryUsageEstimationProcessManager memoryEstimationProcessManager = new MemoryUsageEstimationProcessManager( - threadPool.generic(), threadPool.executor(MachineLearning.JOB_COMMS_THREAD_POOL_NAME), memoryEstimationProcessFactory); + threadPool.generic(), threadPool.executor(UTILITY_THREAD_POOL_NAME), memoryEstimationProcessFactory); DataFrameAnalyticsConfigProvider dataFrameAnalyticsConfigProvider = new DataFrameAnalyticsConfigProvider(client, xContentRegistry, dataFrameAnalyticsAuditor); assert client instanceof NodeClient; @@ -963,11 +963,15 @@ public class MachineLearning extends Plugin implements SystemIndexPlugin, // number of jobs per node. // 4 threads per job process: for input, c++ logger output, result processing and state processing. + // Only use this thread pool for the main long-running process associated with an anomaly detection + // job or a data frame analytics job. (Using it for some other purpose could mean that an unrelated + // job fails to start or that whatever needed the thread for another purpose has to queue for a very + // long time.) ScalingExecutorBuilder jobComms = new ScalingExecutorBuilder(JOB_COMMS_THREAD_POOL_NAME, 4, MAX_MAX_OPEN_JOBS_PER_NODE * 4, TimeValue.timeValueMinutes(1), "xpack.ml.job_comms_thread_pool"); - // This pool is used by renormalization, plus some other parts of ML that - // need to kick off non-trivial activities that mustn't block other threads. + // This pool is used by renormalization, data frame analytics memory estimation, plus some other parts + // of ML that need to kick off non-trivial activities that mustn't block other threads. ScalingExecutorBuilder utility = new ScalingExecutorBuilder(UTILITY_THREAD_POOL_NAME, 1, MAX_MAX_OPEN_JOBS_PER_NODE * 4, TimeValue.timeValueMinutes(10), "xpack.ml.utility_thread_pool");