diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/StartDatafeedAction.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/StartDatafeedAction.java index cf4c4dfd5f7..684fc9ce8e1 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/StartDatafeedAction.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/StartDatafeedAction.java @@ -150,7 +150,7 @@ public class StartDatafeedAction @Override public Task createTask(long id, String type, String action, TaskId parentTaskId) { - return new DatafeedTask(id, type, action, parentTaskId, datafeedId); + return new DatafeedTask(id, type, action, parentTaskId, this); } @Override @@ -219,10 +219,33 @@ public class StartDatafeedAction public static class DatafeedTask extends PersistentTask { + private final String datafeedId; + private final long startTime; + private final Long endTime; private volatile DatafeedJobRunner.Holder holder; - public DatafeedTask(long id, String type, String action, TaskId parentTaskId, String datafeedId) { - super(id, type, action, "datafeed-" + datafeedId, parentTaskId); + public DatafeedTask(long id, String type, String action, TaskId parentTaskId, Request request) { + super(id, type, action, "datafeed-" + request.getDatafeedId(), parentTaskId); + this.datafeedId = request.getDatafeedId(); + this.startTime = request.startTime; + this.endTime = request.endTime; + } + + public String getDatafeedId() { + return datafeedId; + } + + public long getStartTime() { + return startTime; + } + + @org.elasticsearch.common.Nullable + public Long getEndTime() { + return endTime; + } + + public boolean isLookbackOnly() { + return endTime != null; } public void setHolder(DatafeedJobRunner.Holder holder) { @@ -300,8 +323,7 @@ public class StartDatafeedAction @Override protected void nodeOperation(PersistentTask persistentTask, Request request, ActionListener listener) { DatafeedTask datafeedTask = (DatafeedTask) persistentTask; - datafeedJobRunner.run(request.getDatafeedId(), request.getStartTime(), request.getEndTime(), - datafeedTask, + datafeedJobRunner.run(datafeedTask, (error) -> { if (error != null) { listener.onFailure(error); diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJob.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJob.java index 42fad786a23..f5919be8701 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJob.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJob.java @@ -116,7 +116,6 @@ class DatafeedJob { */ public boolean stop() { if (running.compareAndSet(true, false)) { - auditor.info(Messages.getMessage(Messages.JOB_AUDIT_DATAFEED_STOPPED)); return true; } else { return false; diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobRunner.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobRunner.java index dece393441c..6e8cb661770 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobRunner.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobRunner.java @@ -17,6 +17,7 @@ import org.elasticsearch.common.util.concurrent.FutureUtils; import org.elasticsearch.index.mapper.DateFieldMapper; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.ml.MachineLearning; +import org.elasticsearch.xpack.ml.action.CloseJobAction; import org.elasticsearch.xpack.ml.action.StartDatafeedAction; import org.elasticsearch.xpack.ml.action.util.QueryPage; import org.elasticsearch.xpack.ml.datafeed.extractor.DataExtractorFactory; @@ -26,6 +27,7 @@ import org.elasticsearch.xpack.ml.datafeed.extractor.scroll.ScrollDataExtractorF import org.elasticsearch.xpack.ml.job.config.DataDescription; import org.elasticsearch.xpack.ml.job.config.DefaultFrequency; import org.elasticsearch.xpack.ml.job.config.Job; +import org.elasticsearch.xpack.ml.job.messages.Messages; import org.elasticsearch.xpack.ml.job.metadata.MlMetadata; import org.elasticsearch.xpack.ml.job.persistence.BucketsQueryBuilder; import org.elasticsearch.xpack.ml.job.persistence.JobProvider; @@ -62,10 +64,9 @@ public class DatafeedJobRunner extends AbstractComponent { this.currentTimeSupplier = Objects.requireNonNull(currentTimeSupplier); } - public void run(String datafeedId, long startTime, Long endTime, StartDatafeedAction.DatafeedTask task, - Consumer handler) { + public void run(StartDatafeedAction.DatafeedTask task, Consumer handler) { MlMetadata mlMetadata = clusterService.state().metaData().custom(MlMetadata.TYPE); - DatafeedConfig datafeed = mlMetadata.getDatafeed(datafeedId); + DatafeedConfig datafeed = mlMetadata.getDatafeed(task.getDatafeedId()); Job job = mlMetadata.getJobs().get(datafeed.getJobId()); gatherInformation(job.getId(), (buckets, dataCounts) -> { long latestFinalBucketEndMs = -1L; @@ -81,7 +82,7 @@ public class DatafeedJobRunner extends AbstractComponent { UpdatePersistentTaskStatusAction.Request updateDatafeedStatus = new UpdatePersistentTaskStatusAction.Request(task.getPersistentTaskId(), DatafeedState.STARTED); client.execute(UpdatePersistentTaskStatusAction.INSTANCE, updateDatafeedStatus, ActionListener.wrap(r -> { - innerRun(holder, startTime, endTime); + innerRun(holder, task.getStartTime(), task.getEndTime()); }, handler)); }, handler); } @@ -165,7 +166,7 @@ public class DatafeedJobRunner extends AbstractComponent { DataExtractorFactory dataExtractorFactory = createDataExtractorFactory(datafeed, job); DatafeedJob datafeedJob = new DatafeedJob(job.getId(), buildDataDescription(job), frequency.toMillis(), queryDelay.toMillis(), dataExtractorFactory, client, auditor, currentTimeSupplier, finalBucketEndMs, latestRecordTimeMs); - Holder holder = new Holder(datafeed, datafeedJob, new ProblemTracker(() -> auditor), handler); + Holder holder = new Holder(datafeed, datafeedJob, task.isLookbackOnly(), new ProblemTracker(() -> auditor), handler); task.setHolder(holder); return holder; } @@ -225,13 +226,16 @@ public class DatafeedJobRunner extends AbstractComponent { private final DatafeedConfig datafeed; private final DatafeedJob datafeedJob; + private final boolean autoCloseJob; private final ProblemTracker problemTracker; private final Consumer handler; volatile Future future; - private Holder(DatafeedConfig datafeed, DatafeedJob datafeedJob, ProblemTracker problemTracker, Consumer handler) { + private Holder(DatafeedConfig datafeed, DatafeedJob datafeedJob, boolean autoCloseJob, ProblemTracker problemTracker, + Consumer handler) { this.datafeed = datafeed; this.datafeedJob = datafeedJob; + this.autoCloseJob = autoCloseJob; this.problemTracker = problemTracker; this.handler = handler; } @@ -259,7 +263,11 @@ public class DatafeedJobRunner extends AbstractComponent { if (datafeedJob.stop()) { FutureUtils.cancel(future); handler.accept(e); + jobProvider.audit(datafeed.getJobId()).info(Messages.getMessage(Messages.JOB_AUDIT_DATAFEED_STOPPED)); logger.info("[{}] datafeed [{}] for job [{}] has been stopped", source, datafeed.getId(), datafeed.getJobId()); + if (autoCloseJob) { + closeJob(); + } } else { logger.info("[{}] datafeed [{}] for job [{}] was already stopped", source, datafeed.getId(), datafeed.getJobId()); } @@ -267,5 +275,25 @@ public class DatafeedJobRunner extends AbstractComponent { }); } + private void closeJob() { + logger.info("[{}] closing job", datafeed.getJobId()); + CloseJobAction.Request closeJobRequest = new CloseJobAction.Request(datafeed.getJobId()); + client.execute(CloseJobAction.INSTANCE, closeJobRequest, new ActionListener() { + + @Override + public void onResponse(CloseJobAction.Response response) { + if (response.isClosed()) { + logger.info("[{}] job closed", datafeed.getJobId()); + } else { + logger.error("[{}] job close action was not acknowledged", datafeed.getJobId()); + } + } + + @Override + public void onFailure(Exception e) { + logger.error("[" + datafeed.getJobId() + "] failed to close job", e); + } + }); + } } } diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java index 2711e5db697..9b3475517b4 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java @@ -279,7 +279,12 @@ public class AutodetectProcessManager extends AbstractComponent { return; } - logger.info("Closing job [{}], because [{}]", jobId, errorReason); + if (errorReason == null) { + logger.info("Closing job [{}]", jobId); + } else { + logger.info("Closing job [{}], because [{}]", jobId, errorReason); + } + try { communicator.close(errorReason); } catch (Exception e) { diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/rest/datafeeds/RestStartDatafeedAction.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/rest/datafeeds/RestStartDatafeedAction.java index b88b807a3cf..cd0fd5c151b 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/rest/datafeeds/RestStartDatafeedAction.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/rest/datafeeds/RestStartDatafeedAction.java @@ -54,8 +54,9 @@ public class RestStartDatafeedAction extends BaseRestHandler { } jobDatafeedRequest = new StartDatafeedAction.Request(datafeedId, startTimeMillis); jobDatafeedRequest.setEndTime(endTimeMillis); - if (restRequest.hasParam("timeout")) { - TimeValue openTimeout = restRequest.paramAsTime("timeout", TimeValue.timeValueSeconds(20)); + if (restRequest.hasParam(StartDatafeedAction.TIMEOUT.getPreferredName())) { + TimeValue openTimeout = restRequest.paramAsTime( + StartDatafeedAction.TIMEOUT.getPreferredName(), TimeValue.timeValueSeconds(20)); jobDatafeedRequest.setTimeout(openTimeout); } } diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/action/DatafeedJobsIT.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/action/DatafeedJobsIT.java index 8a8697b820e..9a73d371b04 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/action/DatafeedJobsIT.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/action/DatafeedJobsIT.java @@ -80,8 +80,7 @@ public class DatafeedJobsIT extends BaseMlIntegTestCase { StartDatafeedAction.Request startDatafeedRequest = new StartDatafeedAction.Request(datafeedConfig.getId(), 0L); startDatafeedRequest.setEndTime(now); - PersistentActionResponse startDatafeedResponse = - client().execute(StartDatafeedAction.INSTANCE, startDatafeedRequest).get(); + client().execute(StartDatafeedAction.INSTANCE, startDatafeedRequest).get(); assertBusy(() -> { DataCounts dataCounts = getDataCounts(job.getId()); assertThat(dataCounts.getProcessedRecordCount(), equalTo(numDocs + numDocs2)); diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobRunnerTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobRunnerTests.java index 6a86e16ed34..0386de7247c 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobRunnerTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobRunnerTests.java @@ -144,8 +144,8 @@ public class DatafeedJobRunnerTests extends ESTestCase { when(dataExtractor.next()).thenReturn(Optional.of(in)); when(jobDataFuture.get()).thenReturn(new PostDataAction.Response(dataCounts)); Consumer handler = mockConsumer(); - StartDatafeedAction.DatafeedTask task = mock(StartDatafeedAction.DatafeedTask.class); - datafeedJobRunner.run("datafeed1", 0L, 60000L, task, handler); + StartDatafeedAction.DatafeedTask task = createDatafeedTask("datafeed1", 0L, 60000L); + datafeedJobRunner.run(task, handler); verify(threadPool, times(1)).executor(MachineLearning.DATAFEED_RUNNER_THREAD_POOL_NAME); verify(threadPool, never()).schedule(any(), any(), any()); @@ -181,8 +181,8 @@ public class DatafeedJobRunnerTests extends ESTestCase { when(dataExtractor.next()).thenThrow(new RuntimeException("dummy")); when(jobDataFuture.get()).thenReturn(new PostDataAction.Response(dataCounts)); Consumer handler = mockConsumer(); - StartDatafeedAction.DatafeedTask task = mock(StartDatafeedAction.DatafeedTask.class); - datafeedJobRunner.run("datafeed1", 0L, 60000L, task, handler); + StartDatafeedAction.DatafeedTask task = createDatafeedTask("datafeed1", 0L, 60000L); + datafeedJobRunner.run(task, handler); verify(threadPool, times(1)).executor(MachineLearning.DATAFEED_RUNNER_THREAD_POOL_NAME); verify(threadPool, never()).schedule(any(), any(), any()); @@ -211,8 +211,9 @@ public class DatafeedJobRunnerTests extends ESTestCase { when(jobDataFuture.get()).thenReturn(new PostDataAction.Response(dataCounts)); Consumer handler = mockConsumer(); boolean cancelled = randomBoolean(); - StartDatafeedAction.DatafeedTask task = new StartDatafeedAction.DatafeedTask(1, "type", "action", null, "datafeed1"); - datafeedJobRunner.run("datafeed1", 0L, null, task, handler); + StartDatafeedAction.Request startDatafeedRequest = new StartDatafeedAction.Request("datafeed1", 0L); + StartDatafeedAction.DatafeedTask task = new StartDatafeedAction.DatafeedTask(1, "type", "action", null, startDatafeedRequest); + datafeedJobRunner.run(task, handler); verify(threadPool, times(1)).executor(MachineLearning.DATAFEED_RUNNER_THREAD_POOL_NAME); if (cancelled) { @@ -312,6 +313,14 @@ public class DatafeedJobRunnerTests extends ESTestCase { return builder; } + private static StartDatafeedAction.DatafeedTask createDatafeedTask(String datafeedId, long startTime, Long endTime) { + StartDatafeedAction.DatafeedTask task = mock(StartDatafeedAction.DatafeedTask.class); + when(task.getDatafeedId()).thenReturn(datafeedId); + when(task.getStartTime()).thenReturn(startTime); + when(task.getEndTime()).thenReturn(endTime); + return task; + } + @SuppressWarnings("unchecked") private Consumer mockConsumer() { return mock(Consumer.class); diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/DatafeedJobIT.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/DatafeedJobIT.java index 6ef657bd622..dcb820efe27 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/DatafeedJobIT.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/DatafeedJobIT.java @@ -220,6 +220,7 @@ public class DatafeedJobIT extends ESRestTestCase { openJob(client(), jobId); startDatafeedAndWaitUntilStopped(datafeedId); + waitUntilJobIsClosed(jobId); Response jobStatsResponse = client().performRequest("get", MachineLearning.BASE_PATH + "anomaly_detectors/" + jobId + "/_stats"); String jobStatsResponseAsString = responseEntityToString(jobStatsResponse); assertThat(jobStatsResponseAsString, containsString("\"input_record_count\":4")); @@ -244,6 +245,7 @@ public class DatafeedJobIT extends ESRestTestCase { openJob(client(), jobId); startDatafeedAndWaitUntilStopped(datafeedId); + waitUntilJobIsClosed(jobId); Response jobStatsResponse = client().performRequest("get", MachineLearning.BASE_PATH + "anomaly_detectors/" + jobId + "/_stats"); String jobStatsResponseAsString = responseEntityToString(jobStatsResponse); assertThat(jobStatsResponseAsString, containsString("\"input_record_count\":4")); @@ -268,6 +270,7 @@ public class DatafeedJobIT extends ESRestTestCase { MachineLearning.BASE_PATH + "anomaly_detectors/" + jobId + "/_stats"); String responseAsString = responseEntityToString(getJobResponse); assertThat(responseAsString, containsString("\"processed_record_count\":2")); + assertThat(responseAsString, containsString("\"state\":\"opened\"")); } catch (Exception e1) { throw new RuntimeException(e1); } @@ -341,6 +344,8 @@ public class DatafeedJobIT extends ESRestTestCase { openJob(client(), jobId); startDatafeedAndWaitUntilStopped(datafeedId); + waitUntilJobIsClosed(jobId); + Response jobStatsResponse = client().performRequest("get", MachineLearning.BASE_PATH + "anomaly_detectors/" + jobId + "/_stats"); String jobStatsResponseAsString = responseEntityToString(jobStatsResponse); @@ -374,6 +379,18 @@ public class DatafeedJobIT extends ESRestTestCase { }); } + private void waitUntilJobIsClosed(String jobId) throws Exception { + assertBusy(() -> { + try { + Response jobStatsResponse = client().performRequest("get", + MachineLearning.BASE_PATH + "anomaly_detectors/" + jobId + "/_stats"); + assertThat(responseEntityToString(jobStatsResponse), containsString("\"state\":\"closed\"")); + } catch (Exception e) { + throw new RuntimeException(e); + } + }); + } + private Response createJob(String id) throws Exception { String job = "{\n" + " \"description\":\"Analysis of response time by airline\",\n" + " \"analysis_config\" : {\n" + " \"bucket_span\":3600,\n" @@ -409,6 +426,7 @@ public class DatafeedJobIT extends ESRestTestCase { openJob(client(), jobId); startDatafeedAndWaitUntilStopped(datafeedId); + waitUntilJobIsClosed(jobId); Response jobStatsResponse = client().performRequest("get", MachineLearning.BASE_PATH + "anomaly_detectors/" + jobId + "/_stats"); String jobStatsResponseAsString = responseEntityToString(jobStatsResponse); assertThat(jobStatsResponseAsString, containsString("\"input_record_count\":2")); diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/MlRestTestStateCleaner.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/MlRestTestStateCleaner.java index 98bd4d08bf1..31702ef18f0 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/MlRestTestStateCleaner.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/MlRestTestStateCleaner.java @@ -72,8 +72,10 @@ public class MlRestTestStateCleaner { try { client.performRequest("POST", "/_xpack/ml/anomaly_detectors/" + jobId + "/_close"); } catch (Exception e) { - if (e.getMessage().contains("cannot close job, expected job state [opened], but got [closed]")) { - logger.debug("failed to close job [" + jobId + "]", e); + logger.info("Test clean up close"); + if (e.getMessage().contains("cannot close job, expected job state [opened], but got [closed]") + || e.getMessage().contains("cannot close job, expected job state [opened], but got [closing]")) { + logger.debug("job [" + jobId + "] has already been closed", e); } else { logger.warn("failed to close job [" + jobId + "]", e); } diff --git a/plugin/src/test/resources/rest-api-spec/api/xpack.ml.start_datafeed.json b/plugin/src/test/resources/rest-api-spec/api/xpack.ml.start_datafeed.json index c2f497a2a2a..e6d055fcc76 100644 --- a/plugin/src/test/resources/rest-api-spec/api/xpack.ml.start_datafeed.json +++ b/plugin/src/test/resources/rest-api-spec/api/xpack.ml.start_datafeed.json @@ -24,6 +24,7 @@ }, "timeout": { "type": "time", + "required": false, "description": "Controls the time to wait until a datafeed has started. Default to 20 seconds" } }