diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/CloseJobAction.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/CloseJobAction.java index 49e58a6235f..6469fdc616e 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/CloseJobAction.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/CloseJobAction.java @@ -14,7 +14,9 @@ import org.elasticsearch.action.ActionRequestBuilder; import org.elasticsearch.action.FailedNodeException; import org.elasticsearch.action.TaskOperationFailure; import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.tasks.BaseTasksRequest; import org.elasticsearch.action.support.tasks.BaseTasksResponse; +import org.elasticsearch.action.support.tasks.TransportTasksAction; import org.elasticsearch.client.ElasticsearchClient; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterStateObserver; @@ -41,11 +43,12 @@ import org.elasticsearch.transport.TransportService; import org.elasticsearch.xpack.ml.MlMetadata; import org.elasticsearch.xpack.ml.datafeed.DatafeedConfig; import org.elasticsearch.xpack.ml.datafeed.DatafeedState; +import org.elasticsearch.xpack.ml.job.JobManager; import org.elasticsearch.xpack.ml.job.config.Job; import org.elasticsearch.xpack.ml.job.config.JobState; import org.elasticsearch.xpack.ml.job.messages.Messages; -import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcessManager; import org.elasticsearch.xpack.ml.notifications.Auditor; +import org.elasticsearch.xpack.ml.utils.ExceptionsHelper; import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData; import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData.PersistentTask; import org.elasticsearch.xpack.persistent.PersistentTasksService; @@ -55,6 +58,7 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Objects; @@ -83,7 +87,7 @@ public class CloseJobAction extends Action implements ToXContent { + public static class Request extends BaseTasksRequest implements ToXContent { public static final ParseField TIMEOUT = new ParseField("timeout"); public static final ParseField FORCE = new ParseField("force"); @@ -99,22 +103,32 @@ public class CloseJobAction extends Action { + public static class TransportAction extends TransportTasksAction { private final InternalClient client; private final ClusterService clusterService; @@ -248,10 +276,10 @@ public class CloseJobAction extends Action resolvedJobs = resolveAndValidateJobId(request.getJobId(), currentState); + ClusterState state = clusterService.state(); + request.resolvedJobIds = resolveAndValidateJobId(request.getJobId(), state).toArray(new String[0]); + Set executorNodes = new HashSet<>(); + for (String resolvedJobId : request.resolvedJobIds) { + JobManager.getJobOrThrowIfUnknown(state, resolvedJobId); + PersistentTasksCustomMetaData tasks = state.metaData().custom(PersistentTasksCustomMetaData.TYPE); + PersistentTasksCustomMetaData.PersistentTask jobTask = MlMetadata.getJobTask(resolvedJobId, tasks); + if (jobTask == null || jobTask.isAssigned() == false) { + String message = "Cannot perform requested action because job [" + resolvedJobId + + "] is not open"; + listener.onFailure(ExceptionsHelper.conflictStatusException(message)); + return; + } else { + executorNodes.add(jobTask.getExecutorNode()); + } + } - if (resolvedJobs.isEmpty()) { + request.setNodes(executorNodes.toArray(new String[executorNodes.size()])); + if (request.resolvedJobIds.length == 0) { listener.onResponse(new Response(true)); return; } - - request.setResolvedJobIds(resolvedJobs.toArray(new String[0])); - if (request.isForce()) { - forceCloseJob(currentState, request, listener); + forceCloseJob(state, request, listener); } else { - normalCloseJob(currentState, task, request, listener); + normalCloseJob(state, task, request, listener); } } @Override - protected void innerTaskOperation(Request request, OpenJobAction.JobTask task, ActionListener listener, - ClusterState state) { + protected void taskOperation(Request request, OpenJobAction.JobTask task, ActionListener listener) { task.closeJob("close job (api)"); listener.onResponse(new Response(true)); } + @Override + protected boolean accumulateExceptions() { + return true; + } + @Override protected Response newResponse(Request request, List tasks, List taskOperationFailures, @@ -305,7 +349,7 @@ public class CloseJobAction extends Action listener) { - String[] jobIds = request.getResolvedJobIds(); - final int numberOfJobs = jobIds.length; + final int numberOfJobs = request.resolvedJobIds.length; final AtomicInteger counter = new AtomicInteger(); - final AtomicArray failures = new AtomicArray<>(jobIds.length); - - for (String jobId : jobIds) { + final AtomicArray failures = new AtomicArray<>(numberOfJobs); + for (String jobId : request.resolvedJobIds) { auditor.info(jobId, Messages.JOB_AUDIT_FORCE_CLOSING); PersistentTask jobTask = validateAndReturnJobTask(jobId, currentState); persistentTasksService.cancelPersistentTask(jobTask.getId(), @@ -384,7 +426,7 @@ public class CloseJobAction extends Action listener) { Map jobIdToPersistentTaskId = new HashMap<>(); - for (String jobId : request.getResolvedJobIds()) { + for (String jobId : request.resolvedJobIds) { auditor.info(jobId, Messages.JOB_AUDIT_CLOSING); PersistentTask jobTask = validateAndReturnJobTask(jobId, currentState); jobIdToPersistentTaskId.put(jobId, jobTask.getId()); diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/TransportJobTaskAction.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/TransportJobTaskAction.java index 4d2051f4cd2..73397d24b0e 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/TransportJobTaskAction.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/TransportJobTaskAction.java @@ -31,9 +31,7 @@ import org.elasticsearch.xpack.ml.utils.ExceptionsHelper; import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData; import java.io.IOException; -import java.util.HashSet; import java.util.List; -import java.util.Set; import java.util.function.Supplier; /** @@ -57,26 +55,20 @@ public abstract class TransportJobTaskAction listener) { - ClusterState state = clusterService.state(); + String jobId = request.getJobId(); // We need to check whether there is at least an assigned task here, otherwise we cannot redirect to the // node running the job task. - Set executorNodes = new HashSet<>(); - for (String resolvedJobId : request.getResolvedJobIds()) { - JobManager.getJobOrThrowIfUnknown(state, resolvedJobId); - PersistentTasksCustomMetaData tasks = state.metaData().custom(PersistentTasksCustomMetaData.TYPE); - PersistentTasksCustomMetaData.PersistentTask jobTask = MlMetadata.getJobTask(resolvedJobId, tasks); - if (jobTask == null || jobTask.isAssigned() == false) { - String message = "Cannot perform requested action because job [" + resolvedJobId - + "] is not open"; - listener.onFailure(ExceptionsHelper.conflictStatusException(message)); - return; - } else { - executorNodes.add(jobTask.getExecutorNode()); - } + ClusterState state = clusterService.state(); + JobManager.getJobOrThrowIfUnknown(state, jobId); + PersistentTasksCustomMetaData tasks = clusterService.state().getMetaData().custom(PersistentTasksCustomMetaData.TYPE); + PersistentTasksCustomMetaData.PersistentTask jobTask = MlMetadata.getJobTask(jobId, tasks); + if (jobTask == null || jobTask.isAssigned() == false) { + String message = "Cannot perform requested action because job [" + jobId + "] is not open"; + listener.onFailure(ExceptionsHelper.conflictStatusException(message)); + } else { + request.setNodes(jobTask.getExecutorNode()); + super.doExecute(task, request, listener); } - - request.setNodes(executorNodes.toArray(new String[executorNodes.size()])); - super.doExecute(task, request, listener); } @Override @@ -133,53 +125,33 @@ public abstract class TransportJobTaskAction> extends BaseTasksRequest { String jobId; - String[] resolvedJobIds; JobTaskRequest() { } JobTaskRequest(String jobId) { this.jobId = ExceptionsHelper.requireNonNull(jobId, Job.ID.getPreferredName()); - - // the default implementation just returns 1 jobId - this.resolvedJobIds = new String[] { jobId }; } public String getJobId() { return jobId; } - protected String[] getResolvedJobIds() { - return resolvedJobIds; - } - - protected void setResolvedJobIds(String[] resolvedJobIds) { - this.resolvedJobIds = resolvedJobIds; - } - @Override public void readFrom(StreamInput in) throws IOException { super.readFrom(in); jobId = in.readString(); - resolvedJobIds = in.readStringArray(); } @Override public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); out.writeString(jobId); - out.writeStringArray(resolvedJobIds); } @Override public boolean match(Task task) { - for (String id : resolvedJobIds) { - if (OpenJobAction.JobTask.match(task, id)) { - return true; - } - } - - return false; + return OpenJobAction.JobTask.match(task, jobId); } } }