From d33bce2ed2ce097c155c6c37bc281abd0beab81a Mon Sep 17 00:00:00 2001 From: Dimitris Athanasiou Date: Thu, 30 Mar 2017 11:53:00 +0100 Subject: [PATCH] [ML] Add job closing notification (elastic/x-pack-elasticsearch#893) relates elastic/x-pack-elasticsearch#878 Original commit: elastic/x-pack-elasticsearch@921c25537ac8dab56a92265edc5baa66d3d62cf5 --- .../xpack/ml/action/CloseJobAction.java | 19 +++++++++++++++---- .../xpack/ml/job/messages/Messages.java | 2 ++ 2 files changed, 17 insertions(+), 4 deletions(-) diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/CloseJobAction.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/CloseJobAction.java index f52544b477c..2251bb3daf5 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/CloseJobAction.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/CloseJobAction.java @@ -36,7 +36,9 @@ import org.elasticsearch.xpack.ml.datafeed.DatafeedConfig; import org.elasticsearch.xpack.ml.datafeed.DatafeedState; import org.elasticsearch.xpack.ml.job.config.Job; import org.elasticsearch.xpack.ml.job.config.JobState; +import org.elasticsearch.xpack.ml.job.messages.Messages; import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcessManager; +import org.elasticsearch.xpack.ml.notifications.Auditor; import org.elasticsearch.xpack.ml.utils.JobStateObserver; import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData; import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData.PersistentTask; @@ -225,15 +227,18 @@ public class CloseJobAction extends Action finalListener = - ActionListener.wrap(r -> waitForJobClosed(request, r, listener), listener::onFailure); - super.doExecute(task, request, finalListener); + normalCloseJob(task, request, listener); } } @@ -261,6 +264,7 @@ public class CloseJobAction extends Action listener) { + auditor.info(jobId, Messages.JOB_AUDIT_FORCE_CLOSING); ClusterState currentState = clusterService.state(); PersistentTask task = MlMetadata.getJobTask(jobId, currentState.getMetaData().custom(PersistentTasksCustomMetaData.TYPE)); @@ -278,6 +282,13 @@ public class CloseJobAction extends Action listener) { + auditor.info(request.getJobId(), Messages.JOB_AUDIT_CLOSING); + ActionListener finalListener = + ActionListener.wrap(r -> waitForJobClosed(request, r, listener), listener::onFailure); + super.doExecute(task, request, finalListener); + } + // Wait for job to be marked as closed in cluster state, which means the job persistent task has been removed // This api returns when job has been closed, but that doesn't mean the persistent task has been removed from cluster state, // so wait for that to happen here. diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/messages/Messages.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/messages/Messages.java index 07d43bff3a3..74f7f8f6d44 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/messages/Messages.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/messages/Messages.java @@ -30,6 +30,8 @@ public final class Messages { public static final String JOB_AUDIR_DATAFEED_DATA_SEEN_AGAIN = "Datafeed has started retrieving data again"; public static final String JOB_AUDIT_CREATED = "Job created"; + public static final String JOB_AUDIT_CLOSING = "Job is closing"; + public static final String JOB_AUDIT_FORCE_CLOSING = "Job is closing (forced)"; public static final String JOB_AUDIT_DATAFEED_CONTINUED_REALTIME = "Datafeed continued in real-time"; public static final String JOB_AUDIT_DATAFEED_DATA_ANALYSIS_ERROR = "Datafeed is encountering errors submitting data for analysis: {0}"; public static final String JOB_AUDIT_DATAFEED_DATA_EXTRACTION_ERROR = "Datafeed is encountering errors extracting data: {0}";