[ML] Stop datafeed when job fails (elastic/x-pack-elasticsearch#3107)

The problem here was that when the autodetect process crashed,
we set the job state to FAILED but did not remove the
communicator from the map in AutodetectProcessManager.

relates elastic/x-pack-elasticsearch#2773

Original commit: elastic/x-pack-elasticsearch@9b8eafb4d0
This commit is contained in:
Dimitris Athanasiou 2017-11-24 15:04:29 +00:00 committed by GitHub
parent d89d8abec9
commit eb4186dd5c
3 changed files with 26 additions and 4 deletions

View File

@ -378,8 +378,7 @@ public class AutodetectProcessManager extends AbstractComponent {
renormalizerExecutorService, job.getAnalysisConfig().getUsePerPartitionNormalization());
AutodetectProcess process = autodetectProcessFactory.createAutodetectProcess(job, autodetectParams.modelSnapshot(),
autodetectParams.quantiles(), autodetectParams.filters(), autoDetectExecutorService,
() -> setJobState(jobTask, JobState.FAILED));
autodetectParams.quantiles(), autodetectParams.filters(), autoDetectExecutorService, onProcessCrash(jobTask));
AutoDetectResultProcessor processor = new AutoDetectResultProcessor(
client, jobId, renormalizer, jobResultsPersister, jobProvider, autodetectParams.modelSizeStats(),
autodetectParams.modelSnapshot() != null);
@ -425,6 +424,13 @@ public class AutodetectProcessManager extends AbstractComponent {
auditor.info(jobId, msg);
}
/**
 * Builds the callback that is handed to the autodetect process factory as the
 * crash handler for the given job task.
 *
 * @param jobTask the task whose process is being created
 * @return a runnable that drops the task's entry from {@code processByAllocation}
 *         and then marks the job as {@link JobState#FAILED}
 */
private Runnable onProcessCrash(JobTask jobTask) {
    Runnable crashHandler = () -> {
        // Forget the process context first so that later lookups by allocation id
        // no longer find anything for the crashed process.
        processByAllocation.remove(jobTask.getAllocationId());
        // Then record the failure on the job itself.
        setJobState(jobTask, JobState.FAILED);
    };
    return crashHandler;
}
/**
* Stop the running job and mark it as finished.
*
@ -494,7 +500,7 @@ public class AutodetectProcessManager extends AbstractComponent {
private AutodetectCommunicator getOpenAutodetectCommunicator(JobTask jobTask) {
ProcessContext processContext = processByAllocation.get(jobTask.getAllocationId());
if (processContext.getState() == ProcessContext.ProcessStateName.RUNNING) {
if (processContext != null && processContext.getState() == ProcessContext.ProcessStateName.RUNNING) {
return processContext.getAutodetectCommunicator();
}
return null;

View File

@ -14,6 +14,7 @@ import org.elasticsearch.common.util.concurrent.ConcurrentMapLong;
import org.elasticsearch.xpack.ml.action.DeleteDatafeedAction;
import org.elasticsearch.xpack.ml.action.GetDatafeedsStatsAction;
import org.elasticsearch.xpack.ml.action.GetJobsStatsAction;
import org.elasticsearch.xpack.ml.action.KillProcessAction;
import org.elasticsearch.xpack.ml.action.PutJobAction;
import org.elasticsearch.xpack.ml.action.StopDatafeedAction;
import org.elasticsearch.xpack.ml.datafeed.DatafeedConfig;
@ -207,6 +208,21 @@ public class DatafeedJobsIT extends MlNativeAutodetectIntegTestCase {
}
}
/**
 * Starts a real-time job, kills its process through {@link KillProcessAction} and
 * then waits until the datafeed stats report {@link DatafeedState#STOPPED}.
 */
public void testRealtime_GivenProcessIsKilled() throws Exception {
    final String jobId = "realtime-job-given-process-is-killed";
    // NOTE(review): presumably startRealtime names the datafeed "<jobId>-datafeed" - confirm against the helper.
    final String datafeedId = jobId + "-datafeed";
    startRealtime(jobId);

    // Kill the process behind the job and wait for the kill request to complete.
    client().execute(KillProcessAction.INSTANCE, new KillProcessAction.Request(jobId)).actionGet();

    // Poll the datafeed stats until the state has become STOPPED.
    assertBusy(() -> {
        GetDatafeedsStatsAction.Request statsRequest = new GetDatafeedsStatsAction.Request(datafeedId);
        GetDatafeedsStatsAction.Response statsResponse = client().execute(GetDatafeedsStatsAction.INSTANCE, statsRequest).actionGet();
        DatafeedState observedState = statsResponse.getResponse().results().get(0).getDatafeedState();
        assertThat(observedState, equalTo(DatafeedState.STOPPED));
    });
}
private void startRealtime(String jobId) throws Exception {
client().admin().indices().prepareCreate("data")
.addMapping("type", "time", "type=date")

View File

@ -190,7 +190,7 @@ public abstract class BaseMlIntegTestCase extends ESIntegTestCase {
public static DatafeedConfig.Builder createDatafeedBuilder(String datafeedId, String jobId, List<String> indices) {
DatafeedConfig.Builder builder = new DatafeedConfig.Builder(datafeedId, jobId);
builder.setQueryDelay(TimeValue.timeValueSeconds(1));
builder.setFrequency(TimeValue.timeValueSeconds(2));
builder.setFrequency(TimeValue.timeValueSeconds(1));
builder.setIndices(indices);
builder.setTypes(Collections.singletonList("type"));
return builder;