[ML] Restore model state as part of opening a job.

Before the model state was restored in the background when the job was already opened.

 relates elastic/x-pack-elasticsearch#751

Original commit: elastic/x-pack-elasticsearch@44a3d98365
This commit is contained in:
Martijn van Groningen 2017-03-22 13:52:39 +01:00
parent 85aedb6776
commit a59badf842
3 changed files with 24 additions and 16 deletions

View File

@ -427,9 +427,9 @@ public class MachineLearning implements ActionPlugin {
maxNumberOfJobs * 2, 1000, "xpack.ml.thread_pool"); maxNumberOfJobs * 2, 1000, "xpack.ml.thread_pool");
// fail quick to run autodetect process / datafeed, so no queues // fail quick to run autodetect process / datafeed, so no queues
// 4 threads: for c++ logging, result processing, state processing and restore state // 3 threads: for c++ logging, result processing, state processing
FixedExecutorBuilder autoDetect = new FixedExecutorBuilder(settings, AUTODETECT_PROCESS_THREAD_POOL_NAME, FixedExecutorBuilder autoDetect = new FixedExecutorBuilder(settings, AUTODETECT_PROCESS_THREAD_POOL_NAME,
maxNumberOfJobs * 4, 200, "xpack.ml.autodetect_process_thread_pool"); maxNumberOfJobs * 3, 200, "xpack.ml.autodetect_process_thread_pool");
// TODO: if datafeed and non datafeed jobs are considered more equal and the datafeed and // TODO: if datafeed and non datafeed jobs are considered more equal and the datafeed and
// autodetect process are created at the same time then these two different TPs can merge. // autodetect process are created at the same time then these two different TPs can merge.

View File

@ -334,6 +334,15 @@ public class OpenJobAction extends Action<OpenJobAction.Request, PersistentActio
return; return;
} }
// We need to fork, otherwise we open a job on a network thread:
threadPool.executor(ThreadPool.Names.MANAGEMENT).execute(new AbstractRunnable() {
@Override
public void onFailure(Exception e) {
listener.onFailure(e);
}
@Override
protected void doRun() throws Exception {
autodetectProcessManager.openJob(request.getJobId(), task.getPersistentTaskId(), request.isIgnoreDowntime(), e2 -> { autodetectProcessManager.openJob(request.getJobId(), task.getPersistentTaskId(), request.isIgnoreDowntime(), e2 -> {
if (e2 == null) { if (e2 == null) {
listener.onResponse(new TransportResponse.Empty()); listener.onResponse(new TransportResponse.Empty());
@ -341,6 +350,8 @@ public class OpenJobAction extends Action<OpenJobAction.Request, PersistentActio
listener.onFailure(e2); listener.onFailure(e2);
} }
}); });
}
});
}); });
} }

View File

@ -75,15 +75,12 @@ public class NativeAutodetectProcessFactory implements AutodetectProcessFactory
try { try {
autodetect.start(executorService, stateProcessor, processPipes.getPersistStream().get()); autodetect.start(executorService, stateProcessor, processPipes.getPersistStream().get());
if (modelSnapshot != null) { if (modelSnapshot != null) {
// TODO (norelease): I don't think we should do this in the background. If this happens then we should wait
// until restore it is done before we can accept data.
executorService.execute(() -> {
try (OutputStream r = processPipes.getRestoreStream().get()) { try (OutputStream r = processPipes.getRestoreStream().get()) {
jobProvider.restoreStateToStream(job.getId(), modelSnapshot, r); jobProvider.restoreStateToStream(job.getId(), modelSnapshot, r);
} catch (Exception e) { } catch (Exception e) {
// TODO: should we fail to start?
LOGGER.error("Error restoring model state for job " + job.getId(), e); LOGGER.error("Error restoring model state for job " + job.getId(), e);
} }
});
} }
return autodetect; return autodetect;
} catch (EsRejectedExecutionException e) { } catch (EsRejectedExecutionException e) {