[ML] Change close job api to no longer extend from TransportJobTaskAction.

The execution has diverged too much from post data, flush and update process apis, since the close all jobs have been added.
The logic is now easier to understand as it exist in a single source file instead of in both CloseJobAction and TransportJobTaskAction.

Original commit: elastic/x-pack-elasticsearch@daf5fabad5
This commit is contained in:
Martijn van Groningen 2017-04-10 11:48:37 +02:00
parent cb07e93fde
commit 63c04ef19d
2 changed files with 80 additions and 66 deletions

View File

@ -14,7 +14,9 @@ import org.elasticsearch.action.ActionRequestBuilder;
import org.elasticsearch.action.FailedNodeException;
import org.elasticsearch.action.TaskOperationFailure;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.tasks.BaseTasksRequest;
import org.elasticsearch.action.support.tasks.BaseTasksResponse;
import org.elasticsearch.action.support.tasks.TransportTasksAction;
import org.elasticsearch.client.ElasticsearchClient;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateObserver;
@ -41,11 +43,12 @@ import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.ml.MlMetadata;
import org.elasticsearch.xpack.ml.datafeed.DatafeedConfig;
import org.elasticsearch.xpack.ml.datafeed.DatafeedState;
import org.elasticsearch.xpack.ml.job.JobManager;
import org.elasticsearch.xpack.ml.job.config.Job;
import org.elasticsearch.xpack.ml.job.config.JobState;
import org.elasticsearch.xpack.ml.job.messages.Messages;
import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcessManager;
import org.elasticsearch.xpack.ml.notifications.Auditor;
import org.elasticsearch.xpack.ml.utils.ExceptionsHelper;
import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData;
import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData.PersistentTask;
import org.elasticsearch.xpack.persistent.PersistentTasksService;
@ -55,6 +58,7 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
@ -83,7 +87,7 @@ public class CloseJobAction extends Action<CloseJobAction.Request, CloseJobActio
return new Response();
}
public static class Request extends TransportJobTaskAction.JobTaskRequest<Request> implements ToXContent {
public static class Request extends BaseTasksRequest<Request> implements ToXContent {
public static final ParseField TIMEOUT = new ParseField("timeout");
public static final ParseField FORCE = new ParseField("force");
@ -99,22 +103,32 @@ public class CloseJobAction extends Action<CloseJobAction.Request, CloseJobActio
public static Request parseRequest(String jobId, XContentParser parser) {
Request request = PARSER.apply(parser, null);
if (jobId != null) {
request.jobId = jobId;
request.setJobId(jobId);
}
return request;
}
private TimeValue timeout = TimeValue.timeValueMinutes(20);
private String jobId;
private boolean force = false;
private TimeValue timeout = TimeValue.timeValueMinutes(20);
String[] resolvedJobIds;
Request() {}
public Request(String jobId) {
super(jobId);
this.jobId = jobId;
// the default implementation just returns 1 jobId
this.resolvedJobIds = new String[] { jobId };
}
public String getJobId() {
return jobId;
}
public void setJobId(String jobId) {
this.jobId = jobId;
this.resolvedJobIds = new String[] { jobId };
}
public TimeValue getCloseTimeout() {
@ -136,15 +150,29 @@ public class CloseJobAction extends Action<CloseJobAction.Request, CloseJobActio
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
jobId = in.readString();
timeout = new TimeValue(in);
force = in.readBoolean();
resolvedJobIds = in.readStringArray();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeString(jobId);
timeout.writeTo(out);
out.writeBoolean(force);
out.writeStringArray(resolvedJobIds);
}
@Override
public boolean match(Task task) {
for (String id : resolvedJobIds) {
if (OpenJobAction.JobTask.match(task, id)) {
return true;
}
}
return false;
}
@Override
@ -238,7 +266,7 @@ public class CloseJobAction extends Action<CloseJobAction.Request, CloseJobActio
}
}
public static class TransportAction extends TransportJobTaskAction<OpenJobAction.JobTask, Request, Response> {
public static class TransportAction extends TransportTasksAction<OpenJobAction.JobTask, Request, Response, Response> {
private final InternalClient client;
private final ClusterService clusterService;
@ -248,10 +276,10 @@ public class CloseJobAction extends Action<CloseJobAction.Request, CloseJobActio
@Inject
public TransportAction(Settings settings, TransportService transportService, ThreadPool threadPool,
ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver,
ClusterService clusterService, AutodetectProcessManager manager, InternalClient client,
ClusterService clusterService, InternalClient client,
Auditor auditor, PersistentTasksService persistentTasksService) {
super(settings, CloseJobAction.NAME, threadPool, clusterService, transportService, actionFilters,
indexNameExpressionResolver, Request::new, Response::new, ThreadPool.Names.MANAGEMENT, manager);
indexNameExpressionResolver, Request::new, Response::new, ThreadPool.Names.MANAGEMENT);
this.client = client;
this.clusterService = clusterService;
this.auditor = auditor;
@ -274,30 +302,46 @@ public class CloseJobAction extends Action<CloseJobAction.Request, CloseJobActio
* result/failure
*/
ClusterState currentState = clusterService.state();
List<String> resolvedJobs = resolveAndValidateJobId(request.getJobId(), currentState);
ClusterState state = clusterService.state();
request.resolvedJobIds = resolveAndValidateJobId(request.getJobId(), state).toArray(new String[0]);
Set<String> executorNodes = new HashSet<>();
for (String resolvedJobId : request.resolvedJobIds) {
JobManager.getJobOrThrowIfUnknown(state, resolvedJobId);
PersistentTasksCustomMetaData tasks = state.metaData().custom(PersistentTasksCustomMetaData.TYPE);
PersistentTasksCustomMetaData.PersistentTask<?> jobTask = MlMetadata.getJobTask(resolvedJobId, tasks);
if (jobTask == null || jobTask.isAssigned() == false) {
String message = "Cannot perform requested action because job [" + resolvedJobId
+ "] is not open";
listener.onFailure(ExceptionsHelper.conflictStatusException(message));
return;
} else {
executorNodes.add(jobTask.getExecutorNode());
}
}
if (resolvedJobs.isEmpty()) {
request.setNodes(executorNodes.toArray(new String[executorNodes.size()]));
if (request.resolvedJobIds.length == 0) {
listener.onResponse(new Response(true));
return;
}
request.setResolvedJobIds(resolvedJobs.toArray(new String[0]));
if (request.isForce()) {
forceCloseJob(currentState, request, listener);
forceCloseJob(state, request, listener);
} else {
normalCloseJob(currentState, task, request, listener);
normalCloseJob(state, task, request, listener);
}
}
@Override
protected void innerTaskOperation(Request request, OpenJobAction.JobTask task, ActionListener<Response> listener,
ClusterState state) {
protected void taskOperation(Request request, OpenJobAction.JobTask task, ActionListener<Response> listener) {
task.closeJob("close job (api)");
listener.onResponse(new Response(true));
}
@Override
protected boolean accumulateExceptions() {
return true;
}
@Override
protected Response newResponse(Request request, List<Response> tasks,
List<TaskOperationFailure> taskOperationFailures,
@ -305,7 +349,7 @@ public class CloseJobAction extends Action<CloseJobAction.Request, CloseJobActio
// number of resolved jobs should be equal to the number of tasks,
// otherwise something went wrong
if (request.getResolvedJobIds().length != tasks.size()) {
if (request.resolvedJobIds.length != tasks.size()) {
if (taskOperationFailures.isEmpty() == false) {
throw org.elasticsearch.ExceptionsHelper
.convertToElastic(taskOperationFailures.get(0).getCause());
@ -314,7 +358,7 @@ public class CloseJobAction extends Action<CloseJobAction.Request, CloseJobActio
.convertToElastic(failedNodeExceptions.get(0));
} else {
throw new IllegalStateException(
"Expected [" + request.getResolvedJobIds().length
"Expected [" + request.resolvedJobIds.length
+ "] number of tasks but " + "got [" + tasks.size() + "]");
}
}
@ -330,12 +374,10 @@ public class CloseJobAction extends Action<CloseJobAction.Request, CloseJobActio
private void forceCloseJob(ClusterState currentState, Request request,
ActionListener<Response> listener) {
String[] jobIds = request.getResolvedJobIds();
final int numberOfJobs = jobIds.length;
final int numberOfJobs = request.resolvedJobIds.length;
final AtomicInteger counter = new AtomicInteger();
final AtomicArray<Exception> failures = new AtomicArray<>(jobIds.length);
for (String jobId : jobIds) {
final AtomicArray<Exception> failures = new AtomicArray<>(numberOfJobs);
for (String jobId : request.resolvedJobIds) {
auditor.info(jobId, Messages.JOB_AUDIT_FORCE_CLOSING);
PersistentTask<?> jobTask = validateAndReturnJobTask(jobId, currentState);
persistentTasksService.cancelPersistentTask(jobTask.getId(),
@ -384,7 +426,7 @@ public class CloseJobAction extends Action<CloseJobAction.Request, CloseJobActio
ActionListener<Response> listener) {
Map<String, Long> jobIdToPersistentTaskId = new HashMap<>();
for (String jobId : request.getResolvedJobIds()) {
for (String jobId : request.resolvedJobIds) {
auditor.info(jobId, Messages.JOB_AUDIT_CLOSING);
PersistentTask<?> jobTask = validateAndReturnJobTask(jobId, currentState);
jobIdToPersistentTaskId.put(jobId, jobTask.getId());

View File

@ -31,9 +31,7 @@ import org.elasticsearch.xpack.ml.utils.ExceptionsHelper;
import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData;
import java.io.IOException;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Supplier;
/**
@ -57,27 +55,21 @@ public abstract class TransportJobTaskAction<OperationTask extends OpenJobAction
@Override
protected void doExecute(Task task, Request request, ActionListener<Response> listener) {
ClusterState state = clusterService.state();
String jobId = request.getJobId();
// We need to check whether there is at least an assigned task here, otherwise we cannot redirect to the
// node running the job task.
Set<String> executorNodes = new HashSet<>();
for (String resolvedJobId : request.getResolvedJobIds()) {
JobManager.getJobOrThrowIfUnknown(state, resolvedJobId);
PersistentTasksCustomMetaData tasks = state.metaData().custom(PersistentTasksCustomMetaData.TYPE);
PersistentTasksCustomMetaData.PersistentTask<?> jobTask = MlMetadata.getJobTask(resolvedJobId, tasks);
ClusterState state = clusterService.state();
JobManager.getJobOrThrowIfUnknown(state, jobId);
PersistentTasksCustomMetaData tasks = clusterService.state().getMetaData().custom(PersistentTasksCustomMetaData.TYPE);
PersistentTasksCustomMetaData.PersistentTask<?> jobTask = MlMetadata.getJobTask(jobId, tasks);
if (jobTask == null || jobTask.isAssigned() == false) {
String message = "Cannot perform requested action because job [" + resolvedJobId
+ "] is not open";
String message = "Cannot perform requested action because job [" + jobId + "] is not open";
listener.onFailure(ExceptionsHelper.conflictStatusException(message));
return;
} else {
executorNodes.add(jobTask.getExecutorNode());
}
}
request.setNodes(executorNodes.toArray(new String[executorNodes.size()]));
request.setNodes(jobTask.getExecutorNode());
super.doExecute(task, request, listener);
}
}
@Override
protected final void taskOperation(Request request, OperationTask task, ActionListener<Response> listener) {
@ -133,53 +125,33 @@ public abstract class TransportJobTaskAction<OperationTask extends OpenJobAction
public static class JobTaskRequest<R extends JobTaskRequest<R>> extends BaseTasksRequest<R> {
String jobId;
String[] resolvedJobIds;
JobTaskRequest() {
}
JobTaskRequest(String jobId) {
this.jobId = ExceptionsHelper.requireNonNull(jobId, Job.ID.getPreferredName());
// the default implementation just returns 1 jobId
this.resolvedJobIds = new String[] { jobId };
}
public String getJobId() {
return jobId;
}
protected String[] getResolvedJobIds() {
return resolvedJobIds;
}
protected void setResolvedJobIds(String[] resolvedJobIds) {
this.resolvedJobIds = resolvedJobIds;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
jobId = in.readString();
resolvedJobIds = in.readStringArray();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeString(jobId);
out.writeStringArray(resolvedJobIds);
}
@Override
public boolean match(Task task) {
for (String id : resolvedJobIds) {
if (OpenJobAction.JobTask.match(task, id)) {
return true;
}
}
return false;
return OpenJobAction.JobTask.match(task, jobId);
}
}
}