[ML] Close job api should use provided client instead of transport actions directly

Original commit: elastic/x-pack-elasticsearch@cb92f24ee1
This commit is contained in:
Martijn van Groningen 2017-02-28 13:44:21 +01:00
parent fc35b6022f
commit 759f0b1281
3 changed files with 68 additions and 40 deletions

View File

@ -38,6 +38,7 @@ import org.elasticsearch.watcher.ResourceWatcherService;
import org.elasticsearch.xpack.XPackPlugin; import org.elasticsearch.xpack.XPackPlugin;
import org.elasticsearch.xpack.XPackSettings; import org.elasticsearch.xpack.XPackSettings;
import org.elasticsearch.xpack.ml.action.CloseJobAction; import org.elasticsearch.xpack.ml.action.CloseJobAction;
import org.elasticsearch.xpack.ml.action.CloseJobService;
import org.elasticsearch.xpack.ml.action.DeleteDatafeedAction; import org.elasticsearch.xpack.ml.action.DeleteDatafeedAction;
import org.elasticsearch.xpack.ml.action.DeleteFilterAction; import org.elasticsearch.xpack.ml.action.DeleteFilterAction;
import org.elasticsearch.xpack.ml.action.DeleteJobAction; import org.elasticsearch.xpack.ml.action.DeleteJobAction;
@ -309,7 +310,8 @@ public class MachineLearning extends Plugin implements ActionPlugin {
persistentActionService, persistentActionService,
persistentActionRegistry, persistentActionRegistry,
new PersistentTaskClusterService(Settings.EMPTY, persistentActionRegistry, clusterService), new PersistentTaskClusterService(Settings.EMPTY, persistentActionRegistry, clusterService),
auditor auditor,
new CloseJobService(client, threadPool, clusterService)
); );
} }

View File

@ -6,16 +6,11 @@
package org.elasticsearch.xpack.ml.action; package org.elasticsearch.xpack.ml.action;
import org.elasticsearch.ElasticsearchStatusException; import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.action.Action; import org.elasticsearch.action.Action;
import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequestBuilder; import org.elasticsearch.action.ActionRequestBuilder;
import org.elasticsearch.action.ActionRequestValidationException; import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.ActionResponse; import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksRequest;
import org.elasticsearch.action.admin.cluster.node.tasks.cancel.TransportCancelTasksAction;
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksRequest;
import org.elasticsearch.action.admin.cluster.node.tasks.list.TransportListTasksAction;
import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.master.MasterNodeRequest; import org.elasticsearch.action.support.master.MasterNodeRequest;
import org.elasticsearch.action.support.master.TransportMasterNodeAction; import org.elasticsearch.action.support.master.TransportMasterNodeAction;
@ -40,14 +35,12 @@ import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.rest.RestStatus; import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.tasks.TaskInfo;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService; import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.ml.MlMetadata;
import org.elasticsearch.xpack.ml.job.config.Job; import org.elasticsearch.xpack.ml.job.config.Job;
import org.elasticsearch.xpack.ml.job.config.JobState; import org.elasticsearch.xpack.ml.job.config.JobState;
import org.elasticsearch.xpack.ml.MlMetadata;
import org.elasticsearch.xpack.ml.utils.ExceptionsHelper; import org.elasticsearch.xpack.ml.utils.ExceptionsHelper;
import org.elasticsearch.xpack.ml.utils.JobStateObserver;
import org.elasticsearch.xpack.persistent.PersistentTasksInProgress; import org.elasticsearch.xpack.persistent.PersistentTasksInProgress;
import org.elasticsearch.xpack.persistent.PersistentTasksInProgress.PersistentTaskInProgress; import org.elasticsearch.xpack.persistent.PersistentTasksInProgress.PersistentTaskInProgress;
@ -226,22 +219,17 @@ public class CloseJobAction extends Action<CloseJobAction.Request, CloseJobActio
public static class TransportAction extends TransportMasterNodeAction<Request, Response> { public static class TransportAction extends TransportMasterNodeAction<Request, Response> {
private final JobStateObserver observer;
private final ClusterService clusterService; private final ClusterService clusterService;
private final TransportListTasksAction listTasksAction; private final CloseJobService closeJobService;
private final TransportCancelTasksAction cancelTasksAction;
@Inject @Inject
public TransportAction(Settings settings, TransportService transportService, ThreadPool threadPool, public TransportAction(Settings settings, TransportService transportService, ThreadPool threadPool,
ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver, ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver,
ClusterService clusterService, TransportListTasksAction listTasksAction, ClusterService clusterService, CloseJobService closeJobService) {
TransportCancelTasksAction cancelTasksAction) {
super(settings, CloseJobAction.NAME, transportService, clusterService, threadPool, actionFilters, super(settings, CloseJobAction.NAME, transportService, clusterService, threadPool, actionFilters,
indexNameExpressionResolver, Request::new); indexNameExpressionResolver, Request::new);
this.observer = new JobStateObserver(threadPool, clusterService);
this.clusterService = clusterService; this.clusterService = clusterService;
this.listTasksAction = listTasksAction; this.closeJobService = closeJobService;
this.cancelTasksAction = cancelTasksAction;
} }
@Override @Override
@ -277,29 +265,7 @@ public class CloseJobAction extends Action<CloseJobAction.Request, CloseJobActio
@Override @Override
protected void doRun() throws Exception { protected void doRun() throws Exception {
ListTasksRequest listTasksRequest = new ListTasksRequest(); closeJobService.closeJob(request, listener);
listTasksRequest.setDetailed(true);
listTasksRequest.setActions(OpenJobAction.NAME + "[c]");
listTasksAction.execute(listTasksRequest, ActionListener.wrap(listTasksResponse -> {
String expectedDescription = "job-" + request.getJobId();
for (TaskInfo taskInfo : listTasksResponse.getTasks()) {
if (expectedDescription.equals(taskInfo.getDescription())) {
CancelTasksRequest cancelTasksRequest = new CancelTasksRequest();
cancelTasksRequest.setTaskId(taskInfo.getTaskId());
cancelTasksAction.execute(cancelTasksRequest, ActionListener.wrap(cancelTaskResponse -> {
observer.waitForState(request.getJobId(), request.getTimeout(), JobState.CLOSED, e -> {
if (e == null) {
listener.onResponse(new CloseJobAction.Response(true));
} else {
listener.onFailure(e);
}
});
}, listener::onFailure));
return;
}
}
listener.onFailure(new ResourceNotFoundException("task not found for job [" + request.getJobId() + "]"));
}, listener::onFailure));
} }
}); });
} }

View File

@ -0,0 +1,60 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ml.action;
import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksRequest;
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksRequest;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.tasks.TaskInfo;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.ml.job.config.JobState;
import org.elasticsearch.xpack.ml.utils.JobStateObserver;
/**
* Service that interacts with a client to close jobs remotely.
*/
// Ideally this would sit in CloseJobAction.TransportAction, but we can't inject a client there as
// it would lead to cyclic dependency issue, so we isolate it here.
public class CloseJobService {
private final Client client;
private final JobStateObserver observer;
public CloseJobService(Client client, ThreadPool threadPool, ClusterService clusterService) {
this.client = client;
this.observer = new JobStateObserver(threadPool, clusterService);
}
void closeJob(CloseJobAction.Request request, ActionListener<CloseJobAction.Response> listener) {
ListTasksRequest listTasksRequest = new ListTasksRequest();
listTasksRequest.setDetailed(true);
listTasksRequest.setActions(OpenJobAction.NAME + "[c]");
client.admin().cluster().listTasks(listTasksRequest, ActionListener.wrap(listTasksResponse -> {
String expectedDescription = "job-" + request.getJobId();
for (TaskInfo taskInfo : listTasksResponse.getTasks()) {
if (expectedDescription.equals(taskInfo.getDescription())) {
CancelTasksRequest cancelTasksRequest = new CancelTasksRequest();
cancelTasksRequest.setTaskId(taskInfo.getTaskId());
client.admin().cluster().cancelTasks(cancelTasksRequest, ActionListener.wrap(cancelTaskResponse -> {
observer.waitForState(request.getJobId(), request.getTimeout(), JobState.CLOSED, e -> {
if (e == null) {
listener.onResponse(new CloseJobAction.Response(true));
} else {
listener.onFailure(e);
}
});
}, listener::onFailure));
return;
}
}
listener.onFailure(new ResourceNotFoundException("task not found for job [" + request.getJobId() + "]"));
}, listener::onFailure));
}
}