diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java index 8b1169cb733..74a6282885e 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java @@ -17,6 +17,7 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; import org.elasticsearch.common.util.concurrent.ThreadContext; +import org.elasticsearch.common.util.set.Sets; import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentType; @@ -299,7 +300,12 @@ public class AutodetectProcessManager extends AbstractComponent { } AutodetectCommunicator create(JobTask jobTask, AutodetectParams autodetectParams, Consumer handler) { - if (autoDetectCommunicatorByOpenJob.size() == maxAllowedRunningJobs) { + // Closing jobs can still be using some or all threads in MachineLearning.AUTODETECT_THREAD_POOL_NAME + // that an open job uses, so include them too when considering if enough threads are available. + // There's a slight possibility that the same key is in both sets, hence it's not sufficient to simply + // add the two map sizes. + int currentRunningJobs = Sets.union(autoDetectCommunicatorByOpenJob.keySet(), autoDetectCommunicatorByClosingJob.keySet()).size(); + if (currentRunningJobs >= maxAllowedRunningJobs) { throw new ElasticsearchStatusException("max running job capacity [" + maxAllowedRunningJobs + "] reached", RestStatus.TOO_MANY_REQUESTS); } @@ -379,9 +385,9 @@ public class AutodetectProcessManager extends AbstractComponent { } /** - * Stop the running job and mark it as finished.
+ * Stop the running job and mark it as finished. * - * @param jobTask The job to stop + * @param jobTask The job to stop * @param restart Whether the job should be restarted by persistent tasks * @param reason The reason for closing the job */