From ac46b0b0a5efa1afe853a7e88e8264adf74b99c4 Mon Sep 17 00:00:00 2001 From: David Roberts Date: Wed, 19 Jul 2017 11:25:02 +0100 Subject: [PATCH] [ML] Include closing jobs in node capacity check (elastic/x-pack-elasticsearch#2034) Closing jobs can still use some or all of the threads that communicate with the C++ process - the number of threads used will decrease as the close progresses, but at the beginning of the closure all are still in use. Therefore, to prevent the risk of EsRejectedExecution exceptions for the autodetect communications threadpool, closing jobs need to be considered when checking that enough threads exist to start a new process. An explicit check produces a much more understandable error message than an EsRejectedExecution exception. relates elastic/x-pack-elasticsearch#1364 Original commit: elastic/x-pack-elasticsearch@845bfe01884d2f55a7f522b48169e2d5138d250f --- .../process/autodetect/AutodetectProcessManager.java | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java index 8b1169cb733..74a6282885e 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java @@ -17,6 +17,7 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; import org.elasticsearch.common.util.concurrent.ThreadContext; +import org.elasticsearch.common.util.set.Sets; import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentType; @@ -299,7 +300,12 @@ public class AutodetectProcessManager extends AbstractComponent { } AutodetectCommunicator create(JobTask jobTask, AutodetectParams autodetectParams, Consumer handler) { - if (autoDetectCommunicatorByOpenJob.size() == maxAllowedRunningJobs) { + // Closing jobs can still be using some or all threads in MachineLearning.AUTODETECT_THREAD_POOL_NAME + // that an open job uses, so include them too when considering if enough threads are available. + // There's a slight possibility that the same key is in both sets, hence it's not sufficient to simply + // add the two map sizes. + int currentRunningJobs = Sets.union(autoDetectCommunicatorByOpenJob.keySet(), autoDetectCommunicatorByClosingJob.keySet()).size(); + if (currentRunningJobs >= maxAllowedRunningJobs) { throw new ElasticsearchStatusException("max running job capacity [" + maxAllowedRunningJobs + "] reached", RestStatus.TOO_MANY_REQUESTS); } @@ -379,9 +385,9 @@ public class AutodetectProcessManager extends AbstractComponent { } /** - * Stop the running job and mark it as finished.
+ * Stop the running job and mark it as finished. * - * @param jobTask The job to stop + * @param jobTask The job to stop * @param restart Whether the job should be restarted by persistent tasks * @param reason The reason for closing the job */