From 63c04ef19d849a9c76b5a39e31e1003ec2fc254e Mon Sep 17 00:00:00 2001 From: Martijn van Groningen Date: Mon, 10 Apr 2017 11:48:37 +0200 Subject: [PATCH] [ML] Change close job api to no longer extend from TransportJobTaskAction. The execution has diverged too much from post data, flush and update process apis, since the close all jobs have been added. The logic is now easier to understand as it exist in a single source file instead of in both CloseJobAction and TransportJobTaskAction. Original commit: elastic/x-pack-elasticsearch@daf5fabad53265757b9ddaad2d2b32e923d618fd --- .../xpack/ml/action/CloseJobAction.java | 94 ++++++++++++++----- .../ml/action/TransportJobTaskAction.java | 52 +++------- 2 files changed, 80 insertions(+), 66 deletions(-) diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/CloseJobAction.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/CloseJobAction.java index 49e58a6235f..6469fdc616e 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/CloseJobAction.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/CloseJobAction.java @@ -14,7 +14,9 @@ import org.elasticsearch.action.ActionRequestBuilder; import org.elasticsearch.action.FailedNodeException; import org.elasticsearch.action.TaskOperationFailure; import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.tasks.BaseTasksRequest; import org.elasticsearch.action.support.tasks.BaseTasksResponse; +import org.elasticsearch.action.support.tasks.TransportTasksAction; import org.elasticsearch.client.ElasticsearchClient; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterStateObserver; @@ -41,11 +43,12 @@ import org.elasticsearch.transport.TransportService; import org.elasticsearch.xpack.ml.MlMetadata; import org.elasticsearch.xpack.ml.datafeed.DatafeedConfig; import org.elasticsearch.xpack.ml.datafeed.DatafeedState; +import org.elasticsearch.xpack.ml.job.JobManager; import org.elasticsearch.xpack.ml.job.config.Job; import org.elasticsearch.xpack.ml.job.config.JobState; import org.elasticsearch.xpack.ml.job.messages.Messages; -import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcessManager; import org.elasticsearch.xpack.ml.notifications.Auditor; +import org.elasticsearch.xpack.ml.utils.ExceptionsHelper; import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData; import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData.PersistentTask; import org.elasticsearch.xpack.persistent.PersistentTasksService; @@ -55,6 +58,7 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Objects; @@ -83,7 +87,7 @@ public class CloseJobAction extends Action implements ToXContent { + public static class Request extends BaseTasksRequest implements ToXContent { public static final ParseField TIMEOUT = new ParseField("timeout"); public static final ParseField FORCE = new ParseField("force"); @@ -99,22 +103,32 @@ public class CloseJobAction extends Action { + public static class TransportAction extends TransportTasksAction { private final InternalClient client; private final ClusterService clusterService; @@ -248,10 +276,10 @@ public class CloseJobAction extends Action resolvedJobs = resolveAndValidateJobId(request.getJobId(), currentState); + ClusterState state = clusterService.state(); + request.resolvedJobIds = resolveAndValidateJobId(request.getJobId(), state).toArray(new String[0]); + Set executorNodes = new HashSet<>(); + for (String resolvedJobId : request.resolvedJobIds) { + JobManager.getJobOrThrowIfUnknown(state, resolvedJobId); + PersistentTasksCustomMetaData tasks = state.metaData().custom(PersistentTasksCustomMetaData.TYPE); + PersistentTasksCustomMetaData.PersistentTask jobTask = MlMetadata.getJobTask(resolvedJobId, tasks); + if (jobTask == null || jobTask.isAssigned() == false) { + String message = "Cannot perform requested action because job [" + resolvedJobId + + "] is not open"; + listener.onFailure(ExceptionsHelper.conflictStatusException(message)); + return; + } else { + executorNodes.add(jobTask.getExecutorNode()); + } + } - if (resolvedJobs.isEmpty()) { + request.setNodes(executorNodes.toArray(new String[executorNodes.size()])); + if (request.resolvedJobIds.length == 0) { listener.onResponse(new Response(true)); return; } - - request.setResolvedJobIds(resolvedJobs.toArray(new String[0])); - if (request.isForce()) { - forceCloseJob(currentState, request, listener); + forceCloseJob(state, request, listener); } else { - normalCloseJob(currentState, task, request, listener); + normalCloseJob(state, task, request, listener); } } @Override - protected void innerTaskOperation(Request request, OpenJobAction.JobTask task, ActionListener listener, - ClusterState state) { + protected void taskOperation(Request request, OpenJobAction.JobTask task, ActionListener listener) { task.closeJob("close job (api)"); listener.onResponse(new Response(true)); } + @Override + protected boolean accumulateExceptions() { + return true; + } + @Override protected Response newResponse(Request request, List tasks, List taskOperationFailures, @@ -305,7 +349,7 @@ public class CloseJobAction extends Action listener) { - String[] jobIds = request.getResolvedJobIds(); - final int numberOfJobs = jobIds.length; + final int numberOfJobs = request.resolvedJobIds.length; final AtomicInteger counter = new AtomicInteger(); - final AtomicArray failures = new AtomicArray<>(jobIds.length); - - for (String jobId : jobIds) { + final AtomicArray failures = new AtomicArray<>(numberOfJobs); + for (String jobId : request.resolvedJobIds) { auditor.info(jobId, Messages.JOB_AUDIT_FORCE_CLOSING); PersistentTask jobTask = validateAndReturnJobTask(jobId, currentState); persistentTasksService.cancelPersistentTask(jobTask.getId(), @@ -384,7 +426,7 @@ public class CloseJobAction extends Action listener) { Map jobIdToPersistentTaskId = new HashMap<>(); - for (String jobId : request.getResolvedJobIds()) { + for (String jobId : request.resolvedJobIds) { auditor.info(jobId, Messages.JOB_AUDIT_CLOSING); PersistentTask jobTask = validateAndReturnJobTask(jobId, currentState); jobIdToPersistentTaskId.put(jobId, jobTask.getId()); diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/TransportJobTaskAction.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/TransportJobTaskAction.java index 4d2051f4cd2..73397d24b0e 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/TransportJobTaskAction.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/TransportJobTaskAction.java @@ -31,9 +31,7 @@ import org.elasticsearch.xpack.ml.utils.ExceptionsHelper; import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData; import java.io.IOException; -import java.util.HashSet; import java.util.List; -import java.util.Set; import java.util.function.Supplier; /** @@ -57,26 +55,20 @@ public abstract class TransportJobTaskAction listener) { - ClusterState state = clusterService.state(); + String jobId = request.getJobId(); // We need to check whether there is at least an assigned task here, otherwise we cannot redirect to the // node running the job task. - Set executorNodes = new HashSet<>(); - for (String resolvedJobId : request.getResolvedJobIds()) { - JobManager.getJobOrThrowIfUnknown(state, resolvedJobId); - PersistentTasksCustomMetaData tasks = state.metaData().custom(PersistentTasksCustomMetaData.TYPE); - PersistentTasksCustomMetaData.PersistentTask jobTask = MlMetadata.getJobTask(resolvedJobId, tasks); - if (jobTask == null || jobTask.isAssigned() == false) { - String message = "Cannot perform requested action because job [" + resolvedJobId - + "] is not open"; - listener.onFailure(ExceptionsHelper.conflictStatusException(message)); - return; - } else { - executorNodes.add(jobTask.getExecutorNode()); - } + ClusterState state = clusterService.state(); + JobManager.getJobOrThrowIfUnknown(state, jobId); + PersistentTasksCustomMetaData tasks = clusterService.state().getMetaData().custom(PersistentTasksCustomMetaData.TYPE); + PersistentTasksCustomMetaData.PersistentTask jobTask = MlMetadata.getJobTask(jobId, tasks); + if (jobTask == null || jobTask.isAssigned() == false) { + String message = "Cannot perform requested action because job [" + jobId + "] is not open"; + listener.onFailure(ExceptionsHelper.conflictStatusException(message)); + } else { + request.setNodes(jobTask.getExecutorNode()); + super.doExecute(task, request, listener); } - - request.setNodes(executorNodes.toArray(new String[executorNodes.size()])); - super.doExecute(task, request, listener); } @Override @@ -133,53 +125,33 @@ public abstract class TransportJobTaskAction> extends BaseTasksRequest { String jobId; - String[] resolvedJobIds; JobTaskRequest() { } JobTaskRequest(String jobId) { this.jobId = ExceptionsHelper.requireNonNull(jobId, Job.ID.getPreferredName()); - - // the default implementation just returns 1 jobId - this.resolvedJobIds = new String[] { jobId }; } public String getJobId() { return jobId; } - protected String[] getResolvedJobIds() { - return resolvedJobIds; - } - - protected void setResolvedJobIds(String[] resolvedJobIds) { - this.resolvedJobIds = resolvedJobIds; - } - @Override public void readFrom(StreamInput in) throws IOException { super.readFrom(in); jobId = in.readString(); - resolvedJobIds = in.readStringArray(); } @Override public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); out.writeString(jobId); - out.writeStringArray(resolvedJobIds); } @Override public boolean match(Task task) { - for (String id : resolvedJobIds) { - if (OpenJobAction.JobTask.match(task, id)) { - return true; - } - } - - return false; + return OpenJobAction.JobTask.match(task, jobId); } } }