[ML] Include closing jobs in node capacity check (elastic/x-pack-elasticsearch#2034)

Closing jobs can still use some or all of the threads that communicate
with the C++ process - the number of threads used will decrease as the
close progresses, but at the beginning of the closure all are still in
use.  Therefore, to prevent the risk of EsRejectedExecution exceptions
for the autodetect communications threadpool, closing jobs need to be
considered when checking that enough threads exist to start a new
process.  An explicit check produces a much more understandable error
message than an EsRejectedExecution exception.

relates elastic/x-pack-elasticsearch#1364

Original commit: elastic/x-pack-elasticsearch@845bfe0188
This commit is contained in:
David Roberts 2017-07-19 11:25:02 +01:00 committed by GitHub
parent a41c33dd95
commit ac46b0b0a5
1 changed files with 9 additions and 3 deletions

View File

@ -17,6 +17,7 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.common.util.set.Sets;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentType;
@ -299,7 +300,12 @@ public class AutodetectProcessManager extends AbstractComponent {
}
AutodetectCommunicator create(JobTask jobTask, AutodetectParams autodetectParams, Consumer<Exception> handler) {
if (autoDetectCommunicatorByOpenJob.size() == maxAllowedRunningJobs) {
// Closing jobs can still be using some or all threads in MachineLearning.AUTODETECT_THREAD_POOL_NAME
// that an open job uses, so include them too when considering if enough threads are available.
// There's a slight possibility that the same key is in both sets, hence it's not sufficient to simply
// add the two map sizes.
int currentRunningJobs = Sets.union(autoDetectCommunicatorByOpenJob.keySet(), autoDetectCommunicatorByClosingJob.keySet()).size();
if (currentRunningJobs >= maxAllowedRunningJobs) {
throw new ElasticsearchStatusException("max running job capacity [" + maxAllowedRunningJobs + "] reached",
RestStatus.TOO_MANY_REQUESTS);
}
@ -379,7 +385,7 @@ public class AutodetectProcessManager extends AbstractComponent {
}
/**
* Stop the running job and mark it as finished.<br>
* Stop the running job and mark it as finished.
*
* @param jobTask The job to stop
* @param restart Whether the job should be restarted by persistent tasks