diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java index 23159e221bd..94135372f7b 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java @@ -38,6 +38,7 @@ import org.elasticsearch.watcher.ResourceWatcherService; import org.elasticsearch.xpack.XPackPlugin; import org.elasticsearch.xpack.XPackSettings; import org.elasticsearch.xpack.ml.action.CloseJobAction; +import org.elasticsearch.xpack.ml.action.CloseJobService; import org.elasticsearch.xpack.ml.action.DeleteDatafeedAction; import org.elasticsearch.xpack.ml.action.DeleteFilterAction; import org.elasticsearch.xpack.ml.action.DeleteJobAction; @@ -309,7 +310,8 @@ public class MachineLearning extends Plugin implements ActionPlugin { persistentActionService, persistentActionRegistry, new PersistentTaskClusterService(Settings.EMPTY, persistentActionRegistry, clusterService), - auditor + auditor, + new CloseJobService(client, threadPool, clusterService) ); } diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/CloseJobAction.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/CloseJobAction.java index 964fdda198c..4c44766e62b 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/CloseJobAction.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/CloseJobAction.java @@ -6,16 +6,11 @@ package org.elasticsearch.xpack.ml.action; import org.elasticsearch.ElasticsearchStatusException; -import org.elasticsearch.ResourceNotFoundException; import org.elasticsearch.action.Action; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionRequestBuilder; import org.elasticsearch.action.ActionRequestValidationException; import org.elasticsearch.action.ActionResponse; -import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksRequest; -import org.elasticsearch.action.admin.cluster.node.tasks.cancel.TransportCancelTasksAction; -import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksRequest; -import org.elasticsearch.action.admin.cluster.node.tasks.list.TransportListTasksAction; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.master.MasterNodeRequest; import org.elasticsearch.action.support.master.TransportMasterNodeAction; @@ -40,14 +35,12 @@ import org.elasticsearch.common.xcontent.ToXContentObject; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.rest.RestStatus; -import org.elasticsearch.tasks.TaskInfo; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; +import org.elasticsearch.xpack.ml.MlMetadata; import org.elasticsearch.xpack.ml.job.config.Job; import org.elasticsearch.xpack.ml.job.config.JobState; -import org.elasticsearch.xpack.ml.MlMetadata; import org.elasticsearch.xpack.ml.utils.ExceptionsHelper; -import org.elasticsearch.xpack.ml.utils.JobStateObserver; import org.elasticsearch.xpack.persistent.PersistentTasksInProgress; import org.elasticsearch.xpack.persistent.PersistentTasksInProgress.PersistentTaskInProgress; @@ -226,22 +219,17 @@ public class CloseJobAction extends Action { - private final JobStateObserver observer; private final ClusterService clusterService; - private final TransportListTasksAction listTasksAction; - private final TransportCancelTasksAction cancelTasksAction; + private final CloseJobService closeJobService; @Inject public TransportAction(Settings settings, TransportService transportService, ThreadPool threadPool, ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver, - ClusterService clusterService, TransportListTasksAction listTasksAction, - TransportCancelTasksAction cancelTasksAction) { + ClusterService clusterService, CloseJobService closeJobService) { super(settings, CloseJobAction.NAME, transportService, clusterService, threadPool, actionFilters, indexNameExpressionResolver, Request::new); - this.observer = new JobStateObserver(threadPool, clusterService); this.clusterService = clusterService; - this.listTasksAction = listTasksAction; - this.cancelTasksAction = cancelTasksAction; + this.closeJobService = closeJobService; } @Override @@ -277,29 +265,7 @@ public class CloseJobAction extends Action { - String expectedDescription = "job-" + request.getJobId(); - for (TaskInfo taskInfo : listTasksResponse.getTasks()) { - if (expectedDescription.equals(taskInfo.getDescription())) { - CancelTasksRequest cancelTasksRequest = new CancelTasksRequest(); - cancelTasksRequest.setTaskId(taskInfo.getTaskId()); - cancelTasksAction.execute(cancelTasksRequest, ActionListener.wrap(cancelTaskResponse -> { - observer.waitForState(request.getJobId(), request.getTimeout(), JobState.CLOSED, e -> { - if (e == null) { - listener.onResponse(new CloseJobAction.Response(true)); - } else { - listener.onFailure(e); - } - }); - }, listener::onFailure)); - return; - } - } - listener.onFailure(new ResourceNotFoundException("task not found for job [" + request.getJobId() + "]")); - }, listener::onFailure)); + closeJobService.closeJob(request, listener); } }); } diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/CloseJobService.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/CloseJobService.java new file mode 100644 index 00000000000..1e2134cbae1 --- /dev/null +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/CloseJobService.java @@ -0,0 +1,60 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.ml.action; + +import org.elasticsearch.ResourceNotFoundException; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksRequest; +import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksRequest; +import org.elasticsearch.client.Client; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.tasks.TaskInfo; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.xpack.ml.job.config.JobState; +import org.elasticsearch.xpack.ml.utils.JobStateObserver; + +/** + * Service that interacts with a client to close jobs remotely. + */ +// Ideally this would sit in CloseJobAction.TransportAction, but we can't inject a client there as +// it would lead to cyclic dependency issue, so we isolate it here. +public class CloseJobService { + + private final Client client; + private final JobStateObserver observer; + + public CloseJobService(Client client, ThreadPool threadPool, ClusterService clusterService) { + this.client = client; + this.observer = new JobStateObserver(threadPool, clusterService); + } + + void closeJob(CloseJobAction.Request request, ActionListener listener) { + ListTasksRequest listTasksRequest = new ListTasksRequest(); + listTasksRequest.setDetailed(true); + listTasksRequest.setActions(OpenJobAction.NAME + "[c]"); + client.admin().cluster().listTasks(listTasksRequest, ActionListener.wrap(listTasksResponse -> { + String expectedDescription = "job-" + request.getJobId(); + for (TaskInfo taskInfo : listTasksResponse.getTasks()) { + if (expectedDescription.equals(taskInfo.getDescription())) { + CancelTasksRequest cancelTasksRequest = new CancelTasksRequest(); + cancelTasksRequest.setTaskId(taskInfo.getTaskId()); + client.admin().cluster().cancelTasks(cancelTasksRequest, ActionListener.wrap(cancelTaskResponse -> { + observer.waitForState(request.getJobId(), request.getTimeout(), JobState.CLOSED, e -> { + if (e == null) { + listener.onResponse(new CloseJobAction.Response(true)); + } else { + listener.onFailure(e); + } + }); + }, listener::onFailure)); + return; + } + } + listener.onFailure(new ResourceNotFoundException("task not found for job [" + request.getJobId() + "]")); + }, listener::onFailure)); + } + +}