diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java index 60ee07cabc4..69ce26b53d0 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java @@ -427,9 +427,9 @@ public class MachineLearning implements ActionPlugin { maxNumberOfJobs * 2, 1000, "xpack.ml.thread_pool"); // fail quick to run autodetect process / datafeed, so no queues - // 4 threads: for c++ logging, result processing, state processing and restore state + // 3 threads: for c++ logging, result processing, state processing FixedExecutorBuilder autoDetect = new FixedExecutorBuilder(settings, AUTODETECT_PROCESS_THREAD_POOL_NAME, - maxNumberOfJobs * 4, 200, "xpack.ml.autodetect_process_thread_pool"); + maxNumberOfJobs * 3, 200, "xpack.ml.autodetect_process_thread_pool"); // TODO: if datafeed and non datafeed jobs are considered more equal and the datafeed and // autodetect process are created at the same time then these two different TPs can merge. diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/OpenJobAction.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/OpenJobAction.java index 268f23792c4..f437f43395d 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/OpenJobAction.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/OpenJobAction.java @@ -334,11 +334,22 @@ public class OpenJobAction extends Action { - if (e2 == null) { - listener.onResponse(new TransportResponse.Empty()); - } else { - listener.onFailure(e2); + // We need to fork, otherwise we open a job on a network thread: + threadPool.executor(ThreadPool.Names.MANAGEMENT).execute(new AbstractRunnable() { + @Override + public void onFailure(Exception e) { + listener.onFailure(e); + } + + @Override + protected void doRun() throws Exception { + autodetectProcessManager.openJob(request.getJobId(), task.getPersistentTaskId(), request.isIgnoreDowntime(), e2 -> { + if (e2 == null) { + listener.onResponse(new TransportResponse.Empty()); + } else { + listener.onFailure(e2); + } + }); } }); }); diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/NativeAutodetectProcessFactory.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/NativeAutodetectProcessFactory.java index 22cca1d7eb5..c91b628e14c 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/NativeAutodetectProcessFactory.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/NativeAutodetectProcessFactory.java @@ -75,15 +75,12 @@ public class NativeAutodetectProcessFactory implements AutodetectProcessFactory try { autodetect.start(executorService, stateProcessor, processPipes.getPersistStream().get()); if (modelSnapshot != null) { - // TODO (norelease): I don't think we should do this in the background. If this happens then we should wait - // until restore it is done before we can accept data. - executorService.execute(() -> { - try (OutputStream r = processPipes.getRestoreStream().get()) { - jobProvider.restoreStateToStream(job.getId(), modelSnapshot, r); - } catch (Exception e) { - LOGGER.error("Error restoring model state for job " + job.getId(), e); - } - }); + try (OutputStream r = processPipes.getRestoreStream().get()) { + jobProvider.restoreStateToStream(job.getId(), modelSnapshot, r); + } catch (Exception e) { + // TODO: should we fail to start? + LOGGER.error("Error restoring model state for job " + job.getId(), e); + } } return autodetect; } catch (EsRejectedExecutionException e) {