diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java index 985525f3e72..8f2ebfe9b24 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java @@ -378,8 +378,7 @@ public class AutodetectProcessManager extends AbstractComponent { renormalizerExecutorService, job.getAnalysisConfig().getUsePerPartitionNormalization()); AutodetectProcess process = autodetectProcessFactory.createAutodetectProcess(job, autodetectParams.modelSnapshot(), - autodetectParams.quantiles(), autodetectParams.filters(), autoDetectExecutorService, - () -> setJobState(jobTask, JobState.FAILED)); + autodetectParams.quantiles(), autodetectParams.filters(), autoDetectExecutorService, onProcessCrash(jobTask)); AutoDetectResultProcessor processor = new AutoDetectResultProcessor( client, jobId, renormalizer, jobResultsPersister, jobProvider, autodetectParams.modelSizeStats(), autodetectParams.modelSnapshot() != null); @@ -425,6 +424,13 @@ public class AutodetectProcessManager extends AbstractComponent { auditor.info(jobId, msg); } + private Runnable onProcessCrash(JobTask jobTask) { + return () -> { + processByAllocation.remove(jobTask.getAllocationId()); + setJobState(jobTask, JobState.FAILED); + }; + } + /** * Stop the running job and mark it as finished. * @@ -494,7 +500,7 @@ public class AutodetectProcessManager extends AbstractComponent { private AutodetectCommunicator getOpenAutodetectCommunicator(JobTask jobTask) { ProcessContext processContext = processByAllocation.get(jobTask.getAllocationId()); - if (processContext.getState() == ProcessContext.ProcessStateName.RUNNING) { + if (processContext != null && processContext.getState() == ProcessContext.ProcessStateName.RUNNING) { return processContext.getAutodetectCommunicator(); } return null; diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/DatafeedJobsIT.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/DatafeedJobsIT.java index ea2dbb4df2a..d8460764eac 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/DatafeedJobsIT.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/DatafeedJobsIT.java @@ -14,6 +14,7 @@ import org.elasticsearch.common.util.concurrent.ConcurrentMapLong; import org.elasticsearch.xpack.ml.action.DeleteDatafeedAction; import org.elasticsearch.xpack.ml.action.GetDatafeedsStatsAction; import org.elasticsearch.xpack.ml.action.GetJobsStatsAction; +import org.elasticsearch.xpack.ml.action.KillProcessAction; import org.elasticsearch.xpack.ml.action.PutJobAction; import org.elasticsearch.xpack.ml.action.StopDatafeedAction; import org.elasticsearch.xpack.ml.datafeed.DatafeedConfig; @@ -207,6 +208,21 @@ public class DatafeedJobsIT extends MlNativeAutodetectIntegTestCase { } } + public void testRealtime_GivenProcessIsKilled() throws Exception { + String jobId = "realtime-job-given-process-is-killed"; + String datafeedId = jobId + "-datafeed"; + startRealtime(jobId); + + KillProcessAction.Request killRequest = new KillProcessAction.Request(jobId); + client().execute(KillProcessAction.INSTANCE, killRequest).actionGet(); + + assertBusy(() -> { + GetDatafeedsStatsAction.Request request = new GetDatafeedsStatsAction.Request(datafeedId); + GetDatafeedsStatsAction.Response response = client().execute(GetDatafeedsStatsAction.INSTANCE, request).actionGet(); + assertThat(response.getResponse().results().get(0).getDatafeedState(), equalTo(DatafeedState.STOPPED)); + }); + } + private void startRealtime(String jobId) throws Exception { client().admin().indices().prepareCreate("data") .addMapping("type", "time", "type=date") diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/support/BaseMlIntegTestCase.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/support/BaseMlIntegTestCase.java index f454a6e6da0..bff8b4e6836 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/support/BaseMlIntegTestCase.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/support/BaseMlIntegTestCase.java @@ -190,7 +190,7 @@ public abstract class BaseMlIntegTestCase extends ESIntegTestCase { public static DatafeedConfig.Builder createDatafeedBuilder(String datafeedId, String jobId, List indices) { DatafeedConfig.Builder builder = new DatafeedConfig.Builder(datafeedId, jobId); builder.setQueryDelay(TimeValue.timeValueSeconds(1)); - builder.setFrequency(TimeValue.timeValueSeconds(2)); + builder.setFrequency(TimeValue.timeValueSeconds(1)); builder.setIndices(indices); builder.setTypes(Collections.singletonList("type")); return builder;