[ML] No need to remove the job task manually, when job task is cancelled the persistent task will be removed too.

Original commit: elastic/x-pack-elasticsearch@e0e3947a53
This commit is contained in:
Martijn van Groningen 2017-02-17 14:57:52 +01:00
parent 6739f84efb
commit 75470b4036
2 changed files with 40 additions and 29 deletions

View File

@ -33,6 +33,7 @@ import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.xcontent.ObjectParser; import org.elasticsearch.common.xcontent.ObjectParser;
import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.ToXContentObject; import org.elasticsearch.common.xcontent.ToXContentObject;
@ -46,7 +47,7 @@ import org.elasticsearch.xpack.ml.job.config.Job;
import org.elasticsearch.xpack.ml.job.config.JobState; import org.elasticsearch.xpack.ml.job.config.JobState;
import org.elasticsearch.xpack.ml.job.metadata.MlMetadata; import org.elasticsearch.xpack.ml.job.metadata.MlMetadata;
import org.elasticsearch.xpack.ml.utils.ExceptionsHelper; import org.elasticsearch.xpack.ml.utils.ExceptionsHelper;
import org.elasticsearch.xpack.persistent.PersistentTaskClusterService; import org.elasticsearch.xpack.ml.utils.JobStateObserver;
import org.elasticsearch.xpack.persistent.PersistentTasksInProgress; import org.elasticsearch.xpack.persistent.PersistentTasksInProgress;
import org.elasticsearch.xpack.persistent.PersistentTasksInProgress.PersistentTaskInProgress; import org.elasticsearch.xpack.persistent.PersistentTasksInProgress.PersistentTaskInProgress;
@ -225,22 +226,22 @@ public class CloseJobAction extends Action<CloseJobAction.Request, CloseJobActio
public static class TransportAction extends TransportMasterNodeAction<Request, Response> { public static class TransportAction extends TransportMasterNodeAction<Request, Response> {
private final JobStateObserver observer;
private final ClusterService clusterService; private final ClusterService clusterService;
private final TransportListTasksAction listTasksAction; private final TransportListTasksAction listTasksAction;
private final TransportCancelTasksAction cancelTasksAction; private final TransportCancelTasksAction cancelTasksAction;
private final PersistentTaskClusterService persistentTaskClusterService;
@Inject @Inject
public TransportAction(Settings settings, TransportService transportService, ThreadPool threadPool, public TransportAction(Settings settings, TransportService transportService, ThreadPool threadPool,
ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver, ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver,
ClusterService clusterService, TransportListTasksAction listTasksAction, ClusterService clusterService, TransportListTasksAction listTasksAction,
TransportCancelTasksAction cancelTasksAction, PersistentTaskClusterService persistentTaskClusterService) { TransportCancelTasksAction cancelTasksAction) {
super(settings, CloseJobAction.NAME, transportService, clusterService, threadPool, actionFilters, super(settings, CloseJobAction.NAME, transportService, clusterService, threadPool, actionFilters,
indexNameExpressionResolver, Request::new); indexNameExpressionResolver, Request::new);
this.observer = new JobStateObserver(threadPool, clusterService);
this.clusterService = clusterService; this.clusterService = clusterService;
this.listTasksAction = listTasksAction; this.listTasksAction = listTasksAction;
this.cancelTasksAction = cancelTasksAction; this.cancelTasksAction = cancelTasksAction;
this.persistentTaskClusterService = persistentTaskClusterService;
} }
@Override @Override
@ -255,7 +256,6 @@ public class CloseJobAction extends Action<CloseJobAction.Request, CloseJobActio
@Override @Override
protected void masterOperation(Request request, ClusterState state, ActionListener<Response> listener) throws Exception { protected void masterOperation(Request request, ClusterState state, ActionListener<Response> listener) throws Exception {
PersistentTaskInProgress<?> task = validateAndFindTask(request.getJobId(), state);
clusterService.submitStateUpdateTask("closing job [" + request.getJobId() + "]", new ClusterStateUpdateTask() { clusterService.submitStateUpdateTask("closing job [" + request.getJobId() + "]", new ClusterStateUpdateTask() {
@Override @Override
public ClusterState execute(ClusterState currentState) throws Exception { public ClusterState execute(ClusterState currentState) throws Exception {
@ -269,28 +269,39 @@ public class CloseJobAction extends Action<CloseJobAction.Request, CloseJobActio
@Override @Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
ListTasksRequest listTasksRequest = new ListTasksRequest(); threadPool.executor(ThreadPool.Names.GENERIC).execute(new AbstractRunnable() {
listTasksRequest.setDetailed(true); @Override
listTasksRequest.setActions(OpenJobAction.NAME + "[c]"); public void onFailure(Exception e) {
listTasksAction.execute(listTasksRequest, ActionListener.wrap(listTasksResponse -> { listener.onFailure(e);
String expectedDescription = "job-" + request.getJobId();
for (TaskInfo taskInfo : listTasksResponse.getTasks()) {
if (expectedDescription.equals(taskInfo.getDescription())) {
CancelTasksRequest cancelTasksRequest = new CancelTasksRequest();
cancelTasksRequest.setTaskId(taskInfo.getTaskId());
cancelTasksAction.execute(cancelTasksRequest, ActionListener.wrap(cancelTaskResponse -> {
persistentTaskClusterService.completeOrRestartPersistentTask(task.getId(), null,
ActionListener.wrap(
empty -> listener.onResponse(new CloseJobAction.Response(true)),
listener::onFailure
)
);
}, listener::onFailure));
return;
}
} }
listener.onFailure(new ResourceNotFoundException("task not found for job [" + request.getJobId() + "]"));
}, listener::onFailure)); @Override
protected void doRun() throws Exception {
ListTasksRequest listTasksRequest = new ListTasksRequest();
listTasksRequest.setDetailed(true);
listTasksRequest.setActions(OpenJobAction.NAME + "[c]");
listTasksAction.execute(listTasksRequest, ActionListener.wrap(listTasksResponse -> {
String expectedDescription = "job-" + request.getJobId();
for (TaskInfo taskInfo : listTasksResponse.getTasks()) {
if (expectedDescription.equals(taskInfo.getDescription())) {
CancelTasksRequest cancelTasksRequest = new CancelTasksRequest();
cancelTasksRequest.setTaskId(taskInfo.getTaskId());
cancelTasksAction.execute(cancelTasksRequest, ActionListener.wrap(cancelTaskResponse -> {
observer.waitForState(request.getJobId(), request.getTimeout(), JobState.CLOSED, e -> {
if (e == null) {
listener.onResponse(new CloseJobAction.Response(true));
} else {
listener.onFailure(e);
}
});
}, listener::onFailure));
return;
}
}
listener.onFailure(new ResourceNotFoundException("task not found for job [" + request.getJobId() + "]"));
}, listener::onFailure));
}
});
} }
}); });
} }

View File

@ -165,8 +165,7 @@ public class BasicDistributedJobsIT extends BaseMlIntegTestCase {
assertEquals(JobState.OPENED, task.getStatus()); assertEquals(JobState.OPENED, task.getStatus());
}); });
// stop the only running ml node logger.info("stop the only running ml node");
logger.info("!!!!");
internalCluster().stopRandomNode(settings -> settings.getAsBoolean(MachineLearning.ALLOCATION_ENABLED.getKey(), true)); internalCluster().stopRandomNode(settings -> settings.getAsBoolean(MachineLearning.ALLOCATION_ENABLED.getKey(), true));
ensureStableCluster(2); ensureStableCluster(2);
assertBusy(() -> { assertBusy(() -> {
@ -180,7 +179,7 @@ public class BasicDistributedJobsIT extends BaseMlIntegTestCase {
assertEquals(JobState.OPENED, task.getStatus()); assertEquals(JobState.OPENED, task.getStatus());
}); });
// start ml node logger.info("start ml node");
internalCluster().startNode(Settings.builder().put(MachineLearning.ALLOCATION_ENABLED.getKey(), true)); internalCluster().startNode(Settings.builder().put(MachineLearning.ALLOCATION_ENABLED.getKey(), true));
ensureStableCluster(3); ensureStableCluster(3);
assertBusy(() -> { assertBusy(() -> {
@ -197,6 +196,7 @@ public class BasicDistributedJobsIT extends BaseMlIntegTestCase {
assertEquals(expectedNodeAttr, node.getAttributes()); assertEquals(expectedNodeAttr, node.getAttributes());
assertEquals(JobState.OPENED, task.getStatus()); assertEquals(JobState.OPENED, task.getStatus());
}); });
cleanupWorkaround(3);
} }
} }