[ML] Add job closing notification (elastic/x-pack-elasticsearch#893)

relates elastic/x-pack-elasticsearch#878

Original commit: elastic/x-pack-elasticsearch@921c25537a
This commit is contained in:
Dimitris Athanasiou 2017-03-30 11:53:00 +01:00 committed by GitHub
parent 12fd8e04e5
commit d33bce2ed2
2 changed files with 17 additions and 4 deletions

View File

@ -36,7 +36,9 @@ import org.elasticsearch.xpack.ml.datafeed.DatafeedConfig;
import org.elasticsearch.xpack.ml.datafeed.DatafeedState;
import org.elasticsearch.xpack.ml.job.config.Job;
import org.elasticsearch.xpack.ml.job.config.JobState;
import org.elasticsearch.xpack.ml.job.messages.Messages;
import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcessManager;
import org.elasticsearch.xpack.ml.notifications.Auditor;
import org.elasticsearch.xpack.ml.utils.JobStateObserver;
import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData;
import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData.PersistentTask;
@ -225,15 +227,18 @@ public class CloseJobAction extends Action<CloseJobAction.Request, CloseJobActio
private final InternalClient client;
private final ClusterService clusterService;
private final Auditor auditor;
@Inject
public TransportAction(Settings settings, TransportService transportService, ThreadPool threadPool,
ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver,
ClusterService clusterService, AutodetectProcessManager manager, InternalClient client) {
ClusterService clusterService, AutodetectProcessManager manager, InternalClient client,
Auditor auditor) {
super(settings, CloseJobAction.NAME, threadPool, clusterService, transportService, actionFilters,
indexNameExpressionResolver, Request::new, Response::new, ThreadPool.Names.MANAGEMENT, manager);
this.client = client;
this.clusterService = clusterService;
this.auditor = auditor;
}
@Override
@ -241,9 +246,7 @@ public class CloseJobAction extends Action<CloseJobAction.Request, CloseJobActio
if (request.isForce()) {
forceCloseJob(request.getJobId(), listener);
} else {
ActionListener<Response> finalListener =
ActionListener.wrap(r -> waitForJobClosed(request, r, listener), listener::onFailure);
super.doExecute(task, request, finalListener);
normalCloseJob(task, request, listener);
}
}
@ -261,6 +264,7 @@ public class CloseJobAction extends Action<CloseJobAction.Request, CloseJobActio
}
private void forceCloseJob(String jobId, ActionListener<Response> listener) {
auditor.info(jobId, Messages.JOB_AUDIT_FORCE_CLOSING);
ClusterState currentState = clusterService.state();
PersistentTask<?> task = MlMetadata.getJobTask(jobId,
currentState.getMetaData().custom(PersistentTasksCustomMetaData.TYPE));
@ -278,6 +282,13 @@ public class CloseJobAction extends Action<CloseJobAction.Request, CloseJobActio
}
}
private void normalCloseJob(Task task, Request request, ActionListener<Response> listener) {
auditor.info(request.getJobId(), Messages.JOB_AUDIT_CLOSING);
ActionListener<Response> finalListener =
ActionListener.wrap(r -> waitForJobClosed(request, r, listener), listener::onFailure);
super.doExecute(task, request, finalListener);
}
// Wait for job to be marked as closed in cluster state, which means the job persistent task has been removed
// This api returns when job has been closed, but that doesn't mean the persistent task has been removed from cluster state,
// so wait for that to happen here.

View File

@ -30,6 +30,8 @@ public final class Messages {
public static final String JOB_AUDIR_DATAFEED_DATA_SEEN_AGAIN = "Datafeed has started retrieving data again";
public static final String JOB_AUDIT_CREATED = "Job created";
public static final String JOB_AUDIT_CLOSING = "Job is closing";
public static final String JOB_AUDIT_FORCE_CLOSING = "Job is closing (forced)";
public static final String JOB_AUDIT_DATAFEED_CONTINUED_REALTIME = "Datafeed continued in real-time";
public static final String JOB_AUDIT_DATAFEED_DATA_ANALYSIS_ERROR = "Datafeed is encountering errors submitting data for analysis: {0}";
public static final String JOB_AUDIT_DATAFEED_DATA_EXTRACTION_ERROR = "Datafeed is encountering errors extracting data: {0}";