[ML] Close job after a lookback-only datafeed stops (elastic/x-pack-elasticsearch#580)

Original commit: elastic/x-pack-elasticsearch@febf026ba9
This commit is contained in:
Dimitris Athanasiou 2017-02-16 17:48:57 +00:00 committed by GitHub
parent 15987f49c0
commit 6edc051e8d
10 changed files with 109 additions and 25 deletions

View File

@ -150,7 +150,7 @@ public class StartDatafeedAction
@Override @Override
public Task createTask(long id, String type, String action, TaskId parentTaskId) { public Task createTask(long id, String type, String action, TaskId parentTaskId) {
return new DatafeedTask(id, type, action, parentTaskId, datafeedId); return new DatafeedTask(id, type, action, parentTaskId, this);
} }
@Override @Override
@ -219,10 +219,33 @@ public class StartDatafeedAction
public static class DatafeedTask extends PersistentTask { public static class DatafeedTask extends PersistentTask {
private final String datafeedId;
private final long startTime;
private final Long endTime;
private volatile DatafeedJobRunner.Holder holder; private volatile DatafeedJobRunner.Holder holder;
public DatafeedTask(long id, String type, String action, TaskId parentTaskId, String datafeedId) { public DatafeedTask(long id, String type, String action, TaskId parentTaskId, Request request) {
super(id, type, action, "datafeed-" + datafeedId, parentTaskId); super(id, type, action, "datafeed-" + request.getDatafeedId(), parentTaskId);
this.datafeedId = request.getDatafeedId();
this.startTime = request.startTime;
this.endTime = request.endTime;
}
public String getDatafeedId() {
return datafeedId;
}
public long getStartTime() {
return startTime;
}
@org.elasticsearch.common.Nullable
public Long getEndTime() {
return endTime;
}
public boolean isLookbackOnly() {
return endTime != null;
} }
public void setHolder(DatafeedJobRunner.Holder holder) { public void setHolder(DatafeedJobRunner.Holder holder) {
@ -300,8 +323,7 @@ public class StartDatafeedAction
@Override @Override
protected void nodeOperation(PersistentTask persistentTask, Request request, ActionListener<TransportResponse.Empty> listener) { protected void nodeOperation(PersistentTask persistentTask, Request request, ActionListener<TransportResponse.Empty> listener) {
DatafeedTask datafeedTask = (DatafeedTask) persistentTask; DatafeedTask datafeedTask = (DatafeedTask) persistentTask;
datafeedJobRunner.run(request.getDatafeedId(), request.getStartTime(), request.getEndTime(), datafeedJobRunner.run(datafeedTask,
datafeedTask,
(error) -> { (error) -> {
if (error != null) { if (error != null) {
listener.onFailure(error); listener.onFailure(error);

View File

@ -116,7 +116,6 @@ class DatafeedJob {
*/ */
public boolean stop() { public boolean stop() {
if (running.compareAndSet(true, false)) { if (running.compareAndSet(true, false)) {
auditor.info(Messages.getMessage(Messages.JOB_AUDIT_DATAFEED_STOPPED));
return true; return true;
} else { } else {
return false; return false;

View File

@ -17,6 +17,7 @@ import org.elasticsearch.common.util.concurrent.FutureUtils;
import org.elasticsearch.index.mapper.DateFieldMapper; import org.elasticsearch.index.mapper.DateFieldMapper;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.ml.MachineLearning; import org.elasticsearch.xpack.ml.MachineLearning;
import org.elasticsearch.xpack.ml.action.CloseJobAction;
import org.elasticsearch.xpack.ml.action.StartDatafeedAction; import org.elasticsearch.xpack.ml.action.StartDatafeedAction;
import org.elasticsearch.xpack.ml.action.util.QueryPage; import org.elasticsearch.xpack.ml.action.util.QueryPage;
import org.elasticsearch.xpack.ml.datafeed.extractor.DataExtractorFactory; import org.elasticsearch.xpack.ml.datafeed.extractor.DataExtractorFactory;
@ -26,6 +27,7 @@ import org.elasticsearch.xpack.ml.datafeed.extractor.scroll.ScrollDataExtractorF
import org.elasticsearch.xpack.ml.job.config.DataDescription; import org.elasticsearch.xpack.ml.job.config.DataDescription;
import org.elasticsearch.xpack.ml.job.config.DefaultFrequency; import org.elasticsearch.xpack.ml.job.config.DefaultFrequency;
import org.elasticsearch.xpack.ml.job.config.Job; import org.elasticsearch.xpack.ml.job.config.Job;
import org.elasticsearch.xpack.ml.job.messages.Messages;
import org.elasticsearch.xpack.ml.job.metadata.MlMetadata; import org.elasticsearch.xpack.ml.job.metadata.MlMetadata;
import org.elasticsearch.xpack.ml.job.persistence.BucketsQueryBuilder; import org.elasticsearch.xpack.ml.job.persistence.BucketsQueryBuilder;
import org.elasticsearch.xpack.ml.job.persistence.JobProvider; import org.elasticsearch.xpack.ml.job.persistence.JobProvider;
@ -62,10 +64,9 @@ public class DatafeedJobRunner extends AbstractComponent {
this.currentTimeSupplier = Objects.requireNonNull(currentTimeSupplier); this.currentTimeSupplier = Objects.requireNonNull(currentTimeSupplier);
} }
public void run(String datafeedId, long startTime, Long endTime, StartDatafeedAction.DatafeedTask task, public void run(StartDatafeedAction.DatafeedTask task, Consumer<Exception> handler) {
Consumer<Exception> handler) {
MlMetadata mlMetadata = clusterService.state().metaData().custom(MlMetadata.TYPE); MlMetadata mlMetadata = clusterService.state().metaData().custom(MlMetadata.TYPE);
DatafeedConfig datafeed = mlMetadata.getDatafeed(datafeedId); DatafeedConfig datafeed = mlMetadata.getDatafeed(task.getDatafeedId());
Job job = mlMetadata.getJobs().get(datafeed.getJobId()); Job job = mlMetadata.getJobs().get(datafeed.getJobId());
gatherInformation(job.getId(), (buckets, dataCounts) -> { gatherInformation(job.getId(), (buckets, dataCounts) -> {
long latestFinalBucketEndMs = -1L; long latestFinalBucketEndMs = -1L;
@ -81,7 +82,7 @@ public class DatafeedJobRunner extends AbstractComponent {
UpdatePersistentTaskStatusAction.Request updateDatafeedStatus = UpdatePersistentTaskStatusAction.Request updateDatafeedStatus =
new UpdatePersistentTaskStatusAction.Request(task.getPersistentTaskId(), DatafeedState.STARTED); new UpdatePersistentTaskStatusAction.Request(task.getPersistentTaskId(), DatafeedState.STARTED);
client.execute(UpdatePersistentTaskStatusAction.INSTANCE, updateDatafeedStatus, ActionListener.wrap(r -> { client.execute(UpdatePersistentTaskStatusAction.INSTANCE, updateDatafeedStatus, ActionListener.wrap(r -> {
innerRun(holder, startTime, endTime); innerRun(holder, task.getStartTime(), task.getEndTime());
}, handler)); }, handler));
}, handler); }, handler);
} }
@ -165,7 +166,7 @@ public class DatafeedJobRunner extends AbstractComponent {
DataExtractorFactory dataExtractorFactory = createDataExtractorFactory(datafeed, job); DataExtractorFactory dataExtractorFactory = createDataExtractorFactory(datafeed, job);
DatafeedJob datafeedJob = new DatafeedJob(job.getId(), buildDataDescription(job), frequency.toMillis(), queryDelay.toMillis(), DatafeedJob datafeedJob = new DatafeedJob(job.getId(), buildDataDescription(job), frequency.toMillis(), queryDelay.toMillis(),
dataExtractorFactory, client, auditor, currentTimeSupplier, finalBucketEndMs, latestRecordTimeMs); dataExtractorFactory, client, auditor, currentTimeSupplier, finalBucketEndMs, latestRecordTimeMs);
Holder holder = new Holder(datafeed, datafeedJob, new ProblemTracker(() -> auditor), handler); Holder holder = new Holder(datafeed, datafeedJob, task.isLookbackOnly(), new ProblemTracker(() -> auditor), handler);
task.setHolder(holder); task.setHolder(holder);
return holder; return holder;
} }
@ -225,13 +226,16 @@ public class DatafeedJobRunner extends AbstractComponent {
private final DatafeedConfig datafeed; private final DatafeedConfig datafeed;
private final DatafeedJob datafeedJob; private final DatafeedJob datafeedJob;
private final boolean autoCloseJob;
private final ProblemTracker problemTracker; private final ProblemTracker problemTracker;
private final Consumer<Exception> handler; private final Consumer<Exception> handler;
volatile Future<?> future; volatile Future<?> future;
private Holder(DatafeedConfig datafeed, DatafeedJob datafeedJob, ProblemTracker problemTracker, Consumer<Exception> handler) { private Holder(DatafeedConfig datafeed, DatafeedJob datafeedJob, boolean autoCloseJob, ProblemTracker problemTracker,
Consumer<Exception> handler) {
this.datafeed = datafeed; this.datafeed = datafeed;
this.datafeedJob = datafeedJob; this.datafeedJob = datafeedJob;
this.autoCloseJob = autoCloseJob;
this.problemTracker = problemTracker; this.problemTracker = problemTracker;
this.handler = handler; this.handler = handler;
} }
@ -259,7 +263,11 @@ public class DatafeedJobRunner extends AbstractComponent {
if (datafeedJob.stop()) { if (datafeedJob.stop()) {
FutureUtils.cancel(future); FutureUtils.cancel(future);
handler.accept(e); handler.accept(e);
jobProvider.audit(datafeed.getJobId()).info(Messages.getMessage(Messages.JOB_AUDIT_DATAFEED_STOPPED));
logger.info("[{}] datafeed [{}] for job [{}] has been stopped", source, datafeed.getId(), datafeed.getJobId()); logger.info("[{}] datafeed [{}] for job [{}] has been stopped", source, datafeed.getId(), datafeed.getJobId());
if (autoCloseJob) {
closeJob();
}
} else { } else {
logger.info("[{}] datafeed [{}] for job [{}] was already stopped", source, datafeed.getId(), datafeed.getJobId()); logger.info("[{}] datafeed [{}] for job [{}] was already stopped", source, datafeed.getId(), datafeed.getJobId());
} }
@ -267,5 +275,25 @@ public class DatafeedJobRunner extends AbstractComponent {
}); });
} }
private void closeJob() {
logger.info("[{}] closing job", datafeed.getJobId());
CloseJobAction.Request closeJobRequest = new CloseJobAction.Request(datafeed.getJobId());
client.execute(CloseJobAction.INSTANCE, closeJobRequest, new ActionListener<CloseJobAction.Response>() {
@Override
public void onResponse(CloseJobAction.Response response) {
if (response.isClosed()) {
logger.info("[{}] job closed", datafeed.getJobId());
} else {
logger.error("[{}] job close action was not acknowledged", datafeed.getJobId());
}
}
@Override
public void onFailure(Exception e) {
logger.error("[" + datafeed.getJobId() + "] failed to close job", e);
}
});
}
} }
} }

View File

@ -279,7 +279,12 @@ public class AutodetectProcessManager extends AbstractComponent {
return; return;
} }
logger.info("Closing job [{}], because [{}]", jobId, errorReason); if (errorReason == null) {
logger.info("Closing job [{}]", jobId);
} else {
logger.info("Closing job [{}], because [{}]", jobId, errorReason);
}
try { try {
communicator.close(errorReason); communicator.close(errorReason);
} catch (Exception e) { } catch (Exception e) {

View File

@ -54,8 +54,9 @@ public class RestStartDatafeedAction extends BaseRestHandler {
} }
jobDatafeedRequest = new StartDatafeedAction.Request(datafeedId, startTimeMillis); jobDatafeedRequest = new StartDatafeedAction.Request(datafeedId, startTimeMillis);
jobDatafeedRequest.setEndTime(endTimeMillis); jobDatafeedRequest.setEndTime(endTimeMillis);
if (restRequest.hasParam("timeout")) { if (restRequest.hasParam(StartDatafeedAction.TIMEOUT.getPreferredName())) {
TimeValue openTimeout = restRequest.paramAsTime("timeout", TimeValue.timeValueSeconds(20)); TimeValue openTimeout = restRequest.paramAsTime(
StartDatafeedAction.TIMEOUT.getPreferredName(), TimeValue.timeValueSeconds(20));
jobDatafeedRequest.setTimeout(openTimeout); jobDatafeedRequest.setTimeout(openTimeout);
} }
} }

View File

@ -80,8 +80,7 @@ public class DatafeedJobsIT extends BaseMlIntegTestCase {
StartDatafeedAction.Request startDatafeedRequest = new StartDatafeedAction.Request(datafeedConfig.getId(), 0L); StartDatafeedAction.Request startDatafeedRequest = new StartDatafeedAction.Request(datafeedConfig.getId(), 0L);
startDatafeedRequest.setEndTime(now); startDatafeedRequest.setEndTime(now);
PersistentActionResponse startDatafeedResponse = client().execute(StartDatafeedAction.INSTANCE, startDatafeedRequest).get();
client().execute(StartDatafeedAction.INSTANCE, startDatafeedRequest).get();
assertBusy(() -> { assertBusy(() -> {
DataCounts dataCounts = getDataCounts(job.getId()); DataCounts dataCounts = getDataCounts(job.getId());
assertThat(dataCounts.getProcessedRecordCount(), equalTo(numDocs + numDocs2)); assertThat(dataCounts.getProcessedRecordCount(), equalTo(numDocs + numDocs2));

View File

@ -144,8 +144,8 @@ public class DatafeedJobRunnerTests extends ESTestCase {
when(dataExtractor.next()).thenReturn(Optional.of(in)); when(dataExtractor.next()).thenReturn(Optional.of(in));
when(jobDataFuture.get()).thenReturn(new PostDataAction.Response(dataCounts)); when(jobDataFuture.get()).thenReturn(new PostDataAction.Response(dataCounts));
Consumer<Exception> handler = mockConsumer(); Consumer<Exception> handler = mockConsumer();
StartDatafeedAction.DatafeedTask task = mock(StartDatafeedAction.DatafeedTask.class); StartDatafeedAction.DatafeedTask task = createDatafeedTask("datafeed1", 0L, 60000L);
datafeedJobRunner.run("datafeed1", 0L, 60000L, task, handler); datafeedJobRunner.run(task, handler);
verify(threadPool, times(1)).executor(MachineLearning.DATAFEED_RUNNER_THREAD_POOL_NAME); verify(threadPool, times(1)).executor(MachineLearning.DATAFEED_RUNNER_THREAD_POOL_NAME);
verify(threadPool, never()).schedule(any(), any(), any()); verify(threadPool, never()).schedule(any(), any(), any());
@ -181,8 +181,8 @@ public class DatafeedJobRunnerTests extends ESTestCase {
when(dataExtractor.next()).thenThrow(new RuntimeException("dummy")); when(dataExtractor.next()).thenThrow(new RuntimeException("dummy"));
when(jobDataFuture.get()).thenReturn(new PostDataAction.Response(dataCounts)); when(jobDataFuture.get()).thenReturn(new PostDataAction.Response(dataCounts));
Consumer<Exception> handler = mockConsumer(); Consumer<Exception> handler = mockConsumer();
StartDatafeedAction.DatafeedTask task = mock(StartDatafeedAction.DatafeedTask.class); StartDatafeedAction.DatafeedTask task = createDatafeedTask("datafeed1", 0L, 60000L);
datafeedJobRunner.run("datafeed1", 0L, 60000L, task, handler); datafeedJobRunner.run(task, handler);
verify(threadPool, times(1)).executor(MachineLearning.DATAFEED_RUNNER_THREAD_POOL_NAME); verify(threadPool, times(1)).executor(MachineLearning.DATAFEED_RUNNER_THREAD_POOL_NAME);
verify(threadPool, never()).schedule(any(), any(), any()); verify(threadPool, never()).schedule(any(), any(), any());
@ -211,8 +211,9 @@ public class DatafeedJobRunnerTests extends ESTestCase {
when(jobDataFuture.get()).thenReturn(new PostDataAction.Response(dataCounts)); when(jobDataFuture.get()).thenReturn(new PostDataAction.Response(dataCounts));
Consumer<Exception> handler = mockConsumer(); Consumer<Exception> handler = mockConsumer();
boolean cancelled = randomBoolean(); boolean cancelled = randomBoolean();
StartDatafeedAction.DatafeedTask task = new StartDatafeedAction.DatafeedTask(1, "type", "action", null, "datafeed1"); StartDatafeedAction.Request startDatafeedRequest = new StartDatafeedAction.Request("datafeed1", 0L);
datafeedJobRunner.run("datafeed1", 0L, null, task, handler); StartDatafeedAction.DatafeedTask task = new StartDatafeedAction.DatafeedTask(1, "type", "action", null, startDatafeedRequest);
datafeedJobRunner.run(task, handler);
verify(threadPool, times(1)).executor(MachineLearning.DATAFEED_RUNNER_THREAD_POOL_NAME); verify(threadPool, times(1)).executor(MachineLearning.DATAFEED_RUNNER_THREAD_POOL_NAME);
if (cancelled) { if (cancelled) {
@ -312,6 +313,14 @@ public class DatafeedJobRunnerTests extends ESTestCase {
return builder; return builder;
} }
private static StartDatafeedAction.DatafeedTask createDatafeedTask(String datafeedId, long startTime, Long endTime) {
StartDatafeedAction.DatafeedTask task = mock(StartDatafeedAction.DatafeedTask.class);
when(task.getDatafeedId()).thenReturn(datafeedId);
when(task.getStartTime()).thenReturn(startTime);
when(task.getEndTime()).thenReturn(endTime);
return task;
}
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
private Consumer<Exception> mockConsumer() { private Consumer<Exception> mockConsumer() {
return mock(Consumer.class); return mock(Consumer.class);

View File

@ -220,6 +220,7 @@ public class DatafeedJobIT extends ESRestTestCase {
openJob(client(), jobId); openJob(client(), jobId);
startDatafeedAndWaitUntilStopped(datafeedId); startDatafeedAndWaitUntilStopped(datafeedId);
waitUntilJobIsClosed(jobId);
Response jobStatsResponse = client().performRequest("get", MachineLearning.BASE_PATH + "anomaly_detectors/" + jobId + "/_stats"); Response jobStatsResponse = client().performRequest("get", MachineLearning.BASE_PATH + "anomaly_detectors/" + jobId + "/_stats");
String jobStatsResponseAsString = responseEntityToString(jobStatsResponse); String jobStatsResponseAsString = responseEntityToString(jobStatsResponse);
assertThat(jobStatsResponseAsString, containsString("\"input_record_count\":4")); assertThat(jobStatsResponseAsString, containsString("\"input_record_count\":4"));
@ -244,6 +245,7 @@ public class DatafeedJobIT extends ESRestTestCase {
openJob(client(), jobId); openJob(client(), jobId);
startDatafeedAndWaitUntilStopped(datafeedId); startDatafeedAndWaitUntilStopped(datafeedId);
waitUntilJobIsClosed(jobId);
Response jobStatsResponse = client().performRequest("get", MachineLearning.BASE_PATH + "anomaly_detectors/" + jobId + "/_stats"); Response jobStatsResponse = client().performRequest("get", MachineLearning.BASE_PATH + "anomaly_detectors/" + jobId + "/_stats");
String jobStatsResponseAsString = responseEntityToString(jobStatsResponse); String jobStatsResponseAsString = responseEntityToString(jobStatsResponse);
assertThat(jobStatsResponseAsString, containsString("\"input_record_count\":4")); assertThat(jobStatsResponseAsString, containsString("\"input_record_count\":4"));
@ -268,6 +270,7 @@ public class DatafeedJobIT extends ESRestTestCase {
MachineLearning.BASE_PATH + "anomaly_detectors/" + jobId + "/_stats"); MachineLearning.BASE_PATH + "anomaly_detectors/" + jobId + "/_stats");
String responseAsString = responseEntityToString(getJobResponse); String responseAsString = responseEntityToString(getJobResponse);
assertThat(responseAsString, containsString("\"processed_record_count\":2")); assertThat(responseAsString, containsString("\"processed_record_count\":2"));
assertThat(responseAsString, containsString("\"state\":\"opened\""));
} catch (Exception e1) { } catch (Exception e1) {
throw new RuntimeException(e1); throw new RuntimeException(e1);
} }
@ -341,6 +344,8 @@ public class DatafeedJobIT extends ESRestTestCase {
openJob(client(), jobId); openJob(client(), jobId);
startDatafeedAndWaitUntilStopped(datafeedId); startDatafeedAndWaitUntilStopped(datafeedId);
waitUntilJobIsClosed(jobId);
Response jobStatsResponse = client().performRequest("get", Response jobStatsResponse = client().performRequest("get",
MachineLearning.BASE_PATH + "anomaly_detectors/" + jobId + "/_stats"); MachineLearning.BASE_PATH + "anomaly_detectors/" + jobId + "/_stats");
String jobStatsResponseAsString = responseEntityToString(jobStatsResponse); String jobStatsResponseAsString = responseEntityToString(jobStatsResponse);
@ -374,6 +379,18 @@ public class DatafeedJobIT extends ESRestTestCase {
}); });
} }
private void waitUntilJobIsClosed(String jobId) throws Exception {
assertBusy(() -> {
try {
Response jobStatsResponse = client().performRequest("get",
MachineLearning.BASE_PATH + "anomaly_detectors/" + jobId + "/_stats");
assertThat(responseEntityToString(jobStatsResponse), containsString("\"state\":\"closed\""));
} catch (Exception e) {
throw new RuntimeException(e);
}
});
}
private Response createJob(String id) throws Exception { private Response createJob(String id) throws Exception {
String job = "{\n" + " \"description\":\"Analysis of response time by airline\",\n" String job = "{\n" + " \"description\":\"Analysis of response time by airline\",\n"
+ " \"analysis_config\" : {\n" + " \"bucket_span\":3600,\n" + " \"analysis_config\" : {\n" + " \"bucket_span\":3600,\n"
@ -409,6 +426,7 @@ public class DatafeedJobIT extends ESRestTestCase {
openJob(client(), jobId); openJob(client(), jobId);
startDatafeedAndWaitUntilStopped(datafeedId); startDatafeedAndWaitUntilStopped(datafeedId);
waitUntilJobIsClosed(jobId);
Response jobStatsResponse = client().performRequest("get", MachineLearning.BASE_PATH + "anomaly_detectors/" + jobId + "/_stats"); Response jobStatsResponse = client().performRequest("get", MachineLearning.BASE_PATH + "anomaly_detectors/" + jobId + "/_stats");
String jobStatsResponseAsString = responseEntityToString(jobStatsResponse); String jobStatsResponseAsString = responseEntityToString(jobStatsResponse);
assertThat(jobStatsResponseAsString, containsString("\"input_record_count\":2")); assertThat(jobStatsResponseAsString, containsString("\"input_record_count\":2"));

View File

@ -72,8 +72,10 @@ public class MlRestTestStateCleaner {
try { try {
client.performRequest("POST", "/_xpack/ml/anomaly_detectors/" + jobId + "/_close"); client.performRequest("POST", "/_xpack/ml/anomaly_detectors/" + jobId + "/_close");
} catch (Exception e) { } catch (Exception e) {
if (e.getMessage().contains("cannot close job, expected job state [opened], but got [closed]")) { logger.info("Test clean up close");
logger.debug("failed to close job [" + jobId + "]", e); if (e.getMessage().contains("cannot close job, expected job state [opened], but got [closed]")
|| e.getMessage().contains("cannot close job, expected job state [opened], but got [closing]")) {
logger.debug("job [" + jobId + "] has already been closed", e);
} else { } else {
logger.warn("failed to close job [" + jobId + "]", e); logger.warn("failed to close job [" + jobId + "]", e);
} }

View File

@ -24,6 +24,7 @@
}, },
"timeout": { "timeout": {
"type": "time", "type": "time",
"required": false,
"description": "Controls the time to wait until a datafeed has started. Default to 20 seconds" "description": "Controls the time to wait until a datafeed has started. Default to 20 seconds"
} }
} }