[ML] Use utility thread pool for memory estimation (#62314)

The job comms thread pool is intended for the long-running job
processes that do anomaly detection or data frame analytics and
count towards job count and memory limits.

This commit moves the short-lived memory estimation processes
to the ML utility thread pool.

Although this doesn't matter in most cases, at the limits of
scale it could mean that memory estimations would get in the way
of starting jobs, or would queue up for an excessive period of
time while waiting for jobs to finish.
David Roberts 2020-09-14 16:46:06 +01:00
parent bf9651c635
commit e4275f3749
1 changed file with 7 additions and 3 deletions

@@ -706,7 +706,7 @@ public class MachineLearning extends Plugin implements SystemIndexPlugin,
 EsExecutors.allocatedProcessors(settings));
 MemoryUsageEstimationProcessManager memoryEstimationProcessManager =
 new MemoryUsageEstimationProcessManager(
-threadPool.generic(), threadPool.executor(MachineLearning.JOB_COMMS_THREAD_POOL_NAME), memoryEstimationProcessFactory);
+threadPool.generic(), threadPool.executor(UTILITY_THREAD_POOL_NAME), memoryEstimationProcessFactory);
 DataFrameAnalyticsConfigProvider dataFrameAnalyticsConfigProvider = new DataFrameAnalyticsConfigProvider(client, xContentRegistry,
 dataFrameAnalyticsAuditor);
 assert client instanceof NodeClient;
@@ -963,11 +963,15 @@ public class MachineLearning extends Plugin implements SystemIndexPlugin,
 // number of jobs per node.
 // 4 threads per job process: for input, c++ logger output, result processing and state processing.
+// Only use this thread pool for the main long-running process associated with an anomaly detection
+// job or a data frame analytics job. (Using it for some other purpose could mean that an unrelated
+// job fails to start or that whatever needed the thread for another purpose has to queue for a very
+// long time.)
 ScalingExecutorBuilder jobComms = new ScalingExecutorBuilder(JOB_COMMS_THREAD_POOL_NAME,
 4, MAX_MAX_OPEN_JOBS_PER_NODE * 4, TimeValue.timeValueMinutes(1), "xpack.ml.job_comms_thread_pool");
-// This pool is used by renormalization, plus some other parts of ML that
-// need to kick off non-trivial activities that mustn't block other threads.
+// This pool is used by renormalization, data frame analytics memory estimation, plus some other parts
+// of ML that need to kick off non-trivial activities that mustn't block other threads.
 ScalingExecutorBuilder utility = new ScalingExecutorBuilder(UTILITY_THREAD_POOL_NAME,
 1, MAX_MAX_OPEN_JOBS_PER_NODE * 4, TimeValue.timeValueMinutes(10), "xpack.ml.utility_thread_pool");
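
The queueing problem described in the commit message can be reproduced in isolation. The following is a minimal standalone sketch, not part of this change and not Elasticsearch code: it uses plain java.util.concurrent fixed-size pools of one thread each as a stand-in for a job pool that is already at its limit, and the class and method names (PoolIsolationSketch, estimationCompletesWhileJobRuns) are hypothetical. It checks whether a short "estimation" task can finish while a long-running "job" still holds the only job pool thread.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class PoolIsolationSketch {

    /**
     * Returns true if a short task completes within one second while a
     * long-running task still occupies the single thread of the job pool.
     *
     * Assumption: single-thread fixed pools model a saturated pool; the real
     * ML pools are scaling executors with much larger limits.
     */
    public static boolean estimationCompletesWhileJobRuns(boolean shareJobPool) {
        ExecutorService jobPool = Executors.newFixedThreadPool(1);
        ExecutorService utilityPool = Executors.newFixedThreadPool(1);
        CountDownLatch jobStarted = new CountDownLatch(1);
        CountDownLatch releaseJob = new CountDownLatch(1);
        try {
            // The "job" holds the only job pool thread until it is released.
            jobPool.submit(() -> {
                jobStarted.countDown();
                releaseJob.await();
                return null;
            });
            jobStarted.await();

            // Submit the short-lived task either to the saturated job pool
            // or to the separate utility pool.
            ExecutorService estimationPool = shareJobPool ? jobPool : utilityPool;
            Future<String> estimation = estimationPool.submit(() -> "estimate done");
            try {
                estimation.get(1, TimeUnit.SECONDS);
                return true;
            } catch (TimeoutException e) {
                // The task is still queued behind the long-running job.
                return false;
            }
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            // Let the job finish so both pools can drain and shut down.
            releaseJob.countDown();
            jobPool.shutdown();
            utilityPool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("shared pool:   " + estimationCompletesWhileJobRuns(true));
        System.out.println("separate pool: " + estimationCompletesWhileJobRuns(false));
    }
}
```

With the shared pool the short task should time out because it sits in the queue behind the job; with the separate pool it should complete promptly. The one-second timeout is an arbitrary choice for the sketch.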