From eb4186dd5cd54c5a61f6562d6902b2acf06ae85c Mon Sep 17 00:00:00 2001 From: Dimitris Athanasiou Date: Fri, 24 Nov 2017 15:04:29 +0000 Subject: [PATCH] [ML] Stop datafeed when job fails (elastic/x-pack-elasticsearch#3107) The problem here was that when the autodetect process crashes we set the job state to FAILED but we did not remove the communicator from the map in AutodetectProcessManager. relates elastic/x-pack-elasticsearch#2773 Original commit: elastic/x-pack-elasticsearch@9b8eafb4d0b7a944bd01e55785f34726d8504666 --- .../autodetect/AutodetectProcessManager.java | 12 +++++++++--- .../xpack/ml/integration/DatafeedJobsIT.java | 16 ++++++++++++++++ .../xpack/ml/support/BaseMlIntegTestCase.java | 2 +- 3 files changed, 26 insertions(+), 4 deletions(-) diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java index 985525f3e72..8f2ebfe9b24 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java @@ -378,8 +378,7 @@ public class AutodetectProcessManager extends AbstractComponent { renormalizerExecutorService, job.getAnalysisConfig().getUsePerPartitionNormalization()); AutodetectProcess process = autodetectProcessFactory.createAutodetectProcess(job, autodetectParams.modelSnapshot(), - autodetectParams.quantiles(), autodetectParams.filters(), autoDetectExecutorService, - () -> setJobState(jobTask, JobState.FAILED)); + autodetectParams.quantiles(), autodetectParams.filters(), autoDetectExecutorService, onProcessCrash(jobTask)); AutoDetectResultProcessor processor = new AutoDetectResultProcessor( client, jobId, renormalizer, jobResultsPersister, jobProvider, autodetectParams.modelSizeStats(), autodetectParams.modelSnapshot() != null); @@ -425,6 +424,13 @@ public class AutodetectProcessManager extends AbstractComponent { auditor.info(jobId, msg); } + private Runnable onProcessCrash(JobTask jobTask) { + return () -> { + processByAllocation.remove(jobTask.getAllocationId()); + setJobState(jobTask, JobState.FAILED); + }; + } + /** * Stop the running job and mark it as finished. * @@ -494,7 +500,7 @@ public class AutodetectProcessManager extends AbstractComponent { private AutodetectCommunicator getOpenAutodetectCommunicator(JobTask jobTask) { ProcessContext processContext = processByAllocation.get(jobTask.getAllocationId()); - if (processContext.getState() == ProcessContext.ProcessStateName.RUNNING) { + if (processContext != null && processContext.getState() == ProcessContext.ProcessStateName.RUNNING) { return processContext.getAutodetectCommunicator(); } return null; diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/DatafeedJobsIT.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/DatafeedJobsIT.java index ea2dbb4df2a..d8460764eac 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/DatafeedJobsIT.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/DatafeedJobsIT.java @@ -14,6 +14,7 @@ import org.elasticsearch.common.util.concurrent.ConcurrentMapLong; import org.elasticsearch.xpack.ml.action.DeleteDatafeedAction; import org.elasticsearch.xpack.ml.action.GetDatafeedsStatsAction; import org.elasticsearch.xpack.ml.action.GetJobsStatsAction; +import org.elasticsearch.xpack.ml.action.KillProcessAction; import org.elasticsearch.xpack.ml.action.PutJobAction; import org.elasticsearch.xpack.ml.action.StopDatafeedAction; import org.elasticsearch.xpack.ml.datafeed.DatafeedConfig; @@ -207,6 +208,21 @@ public class DatafeedJobsIT extends MlNativeAutodetectIntegTestCase { } } + public void testRealtime_GivenProcessIsKilled() throws Exception { + String jobId = "realtime-job-given-process-is-killed"; + String datafeedId = jobId + "-datafeed"; + startRealtime(jobId); + + KillProcessAction.Request killRequest = new KillProcessAction.Request(jobId); + client().execute(KillProcessAction.INSTANCE, killRequest).actionGet(); + + assertBusy(() -> { + GetDatafeedsStatsAction.Request request = new GetDatafeedsStatsAction.Request(datafeedId); + GetDatafeedsStatsAction.Response response = client().execute(GetDatafeedsStatsAction.INSTANCE, request).actionGet(); + assertThat(response.getResponse().results().get(0).getDatafeedState(), equalTo(DatafeedState.STOPPED)); + }); + } + private void startRealtime(String jobId) throws Exception { client().admin().indices().prepareCreate("data") .addMapping("type", "time", "type=date") diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/support/BaseMlIntegTestCase.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/support/BaseMlIntegTestCase.java index f454a6e6da0..bff8b4e6836 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/support/BaseMlIntegTestCase.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/support/BaseMlIntegTestCase.java @@ -190,7 +190,7 @@ public abstract class BaseMlIntegTestCase extends ESIntegTestCase { public static DatafeedConfig.Builder createDatafeedBuilder(String datafeedId, String jobId, List indices) { DatafeedConfig.Builder builder = new DatafeedConfig.Builder(datafeedId, jobId); builder.setQueryDelay(TimeValue.timeValueSeconds(1)); - builder.setFrequency(TimeValue.timeValueSeconds(2)); + builder.setFrequency(TimeValue.timeValueSeconds(1)); builder.setIndices(indices); builder.setTypes(Collections.singletonList("type")); return builder;