[ML] Fork on management thread when really opening the job

Original commit: elastic/x-pack-elasticsearch@e528912c23
This commit is contained in:
Martijn van Groningen 2017-03-23 19:47:59 +01:00
parent 99e3508267
commit 1bed557911
3 changed files with 32 additions and 32 deletions

View File

@ -389,15 +389,6 @@ public class OpenJobAction extends Action<OpenJobAction.Request, OpenJobAction.R
return; return;
} }
// We need to fork, otherwise we open a job on a network thread:
threadPool.executor(ThreadPool.Names.MANAGEMENT).execute(new AbstractRunnable() {
@Override
public void onFailure(Exception e) {
listener.onFailure(e);
}
@Override
protected void doRun() throws Exception {
autodetectProcessManager.openJob(request.getJobId(), task.getPersistentTaskId(), request.isIgnoreDowntime(), e2 -> { autodetectProcessManager.openJob(request.getJobId(), task.getPersistentTaskId(), request.isIgnoreDowntime(), e2 -> {
if (e2 == null) { if (e2 == null) {
listener.onResponse(new TransportResponse.Empty()); listener.onResponse(new TransportResponse.Empty());
@ -405,8 +396,6 @@ public class OpenJobAction extends Action<OpenJobAction.Request, OpenJobAction.R
listener.onFailure(e2); listener.onFailure(e2);
} }
}); });
}
});
}); });
} }

View File

@ -13,6 +13,7 @@ import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.rest.RestStatus; import org.elasticsearch.rest.RestStatus;
@ -205,6 +206,15 @@ public class AutodetectProcessManager extends AbstractComponent {
public void openJob(String jobId, long taskId, boolean ignoreDowntime, Consumer<Exception> handler) { public void openJob(String jobId, long taskId, boolean ignoreDowntime, Consumer<Exception> handler) {
gatherRequiredInformation(jobId, (dataCounts, modelSnapshot, quantiles, filters) -> { gatherRequiredInformation(jobId, (dataCounts, modelSnapshot, quantiles, filters) -> {
// We need to fork, otherwise we restore model state from a network thread (several GET api calls):
threadPool.executor(ThreadPool.Names.MANAGEMENT).execute(new AbstractRunnable() {
@Override
public void onFailure(Exception e) {
handler.accept(e);
}
@Override
protected void doRun() throws Exception {
try { try {
AutodetectCommunicator communicator = autoDetectCommunicatorByJob.computeIfAbsent(jobId, id -> AutodetectCommunicator communicator = autoDetectCommunicatorByJob.computeIfAbsent(jobId, id ->
create(id, taskId, dataCounts, modelSnapshot, quantiles, filters, ignoreDowntime, handler)); create(id, taskId, dataCounts, modelSnapshot, quantiles, filters, ignoreDowntime, handler));
@ -219,6 +229,8 @@ public class AutodetectProcessManager extends AbstractComponent {
} }
setJobState(taskId, JobState.FAILED, e2 -> handler.accept(e1)); setJobState(taskId, JobState.FAILED, e2 -> handler.accept(e1));
} }
}
});
}, e1 -> { }, e1 -> {
logger.warn("Failed to gather information required to open job [" + jobId + "]", e1); logger.warn("Failed to gather information required to open job [" + jobId + "]", e1);
setJobState(taskId, JobState.FAILED, e2 -> handler.accept(e1)); setJobState(taskId, JobState.FAILED, e2 -> handler.accept(e1));

View File

@ -14,7 +14,6 @@ import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.ml.MachineLearning;
import org.elasticsearch.xpack.ml.job.JobManager; import org.elasticsearch.xpack.ml.job.JobManager;
import org.elasticsearch.xpack.ml.job.config.AnalysisConfig; import org.elasticsearch.xpack.ml.job.config.AnalysisConfig;
import org.elasticsearch.xpack.ml.job.config.DataDescription; import org.elasticsearch.xpack.ml.job.config.DataDescription;
@ -153,8 +152,7 @@ public class AutodetectProcessManagerTests extends ESTestCase {
ThreadPool threadPool = mock(ThreadPool.class); ThreadPool threadPool = mock(ThreadPool.class);
ThreadPool.Cancellable cancellable = mock(ThreadPool.Cancellable.class); ThreadPool.Cancellable cancellable = mock(ThreadPool.Cancellable.class);
when(threadPool.scheduleWithFixedDelay(any(), any(), any())).thenReturn(cancellable); when(threadPool.scheduleWithFixedDelay(any(), any(), any())).thenReturn(cancellable);
when(threadPool.executor(MachineLearning.AUTODETECT_PROCESS_THREAD_POOL_NAME)) when(threadPool.executor(anyString())).thenReturn(EsExecutors.newDirectExecutorService());
.thenReturn(EsExecutors.newDirectExecutorService());
AutodetectProcess autodetectProcess = mock(AutodetectProcess.class); AutodetectProcess autodetectProcess = mock(AutodetectProcess.class);
when(autodetectProcess.isProcessAlive()).thenReturn(true); when(autodetectProcess.isProcessAlive()).thenReturn(true);
when(autodetectProcess.readAutodetectResults()).thenReturn(Collections.emptyIterator()); when(autodetectProcess.readAutodetectResults()).thenReturn(Collections.emptyIterator());
@ -344,6 +342,7 @@ public class AutodetectProcessManagerTests extends ESTestCase {
private AutodetectProcessManager createManager(AutodetectCommunicator communicator, Client client, private AutodetectProcessManager createManager(AutodetectCommunicator communicator, Client client,
PersistentTasksService persistentTasksService) { PersistentTasksService persistentTasksService) {
ThreadPool threadPool = mock(ThreadPool.class); ThreadPool threadPool = mock(ThreadPool.class);
when(threadPool.executor(anyString())).thenReturn(EsExecutors.newDirectExecutorService());
AutodetectProcessFactory autodetectProcessFactory = mock(AutodetectProcessFactory.class); AutodetectProcessFactory autodetectProcessFactory = mock(AutodetectProcessFactory.class);
AutodetectProcessManager manager = new AutodetectProcessManager(Settings.EMPTY, client, AutodetectProcessManager manager = new AutodetectProcessManager(Settings.EMPTY, client,
threadPool, jobManager, jobProvider, jobResultsPersister, jobDataCountsPersister, threadPool, jobManager, jobProvider, jobResultsPersister, jobDataCountsPersister,