diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/CloseJobAction.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/CloseJobAction.java index c36dc601a1b..cd1f09ebc53 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/CloseJobAction.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/CloseJobAction.java @@ -33,6 +33,7 @@ import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.common.xcontent.ObjectParser; import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.ToXContentObject; @@ -46,7 +47,7 @@ import org.elasticsearch.xpack.ml.job.config.Job; import org.elasticsearch.xpack.ml.job.config.JobState; import org.elasticsearch.xpack.ml.job.metadata.MlMetadata; import org.elasticsearch.xpack.ml.utils.ExceptionsHelper; -import org.elasticsearch.xpack.persistent.PersistentTaskClusterService; +import org.elasticsearch.xpack.ml.utils.JobStateObserver; import org.elasticsearch.xpack.persistent.PersistentTasksInProgress; import org.elasticsearch.xpack.persistent.PersistentTasksInProgress.PersistentTaskInProgress; @@ -225,22 +226,22 @@ public class CloseJobAction extends Action { + private final JobStateObserver observer; private final ClusterService clusterService; private final TransportListTasksAction listTasksAction; private final TransportCancelTasksAction cancelTasksAction; - private final PersistentTaskClusterService persistentTaskClusterService; @Inject public TransportAction(Settings settings, TransportService transportService, ThreadPool threadPool, ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver, ClusterService clusterService, TransportListTasksAction listTasksAction, - TransportCancelTasksAction cancelTasksAction, PersistentTaskClusterService persistentTaskClusterService) { + TransportCancelTasksAction cancelTasksAction) { super(settings, CloseJobAction.NAME, transportService, clusterService, threadPool, actionFilters, indexNameExpressionResolver, Request::new); + this.observer = new JobStateObserver(threadPool, clusterService); this.clusterService = clusterService; this.listTasksAction = listTasksAction; this.cancelTasksAction = cancelTasksAction; - this.persistentTaskClusterService = persistentTaskClusterService; } @Override @@ -255,7 +256,6 @@ public class CloseJobAction extends Action listener) throws Exception { - PersistentTaskInProgress task = validateAndFindTask(request.getJobId(), state); clusterService.submitStateUpdateTask("closing job [" + request.getJobId() + "]", new ClusterStateUpdateTask() { @Override public ClusterState execute(ClusterState currentState) throws Exception { @@ -269,28 +269,39 @@ public class CloseJobAction extends Action { - String expectedDescription = "job-" + request.getJobId(); - for (TaskInfo taskInfo : listTasksResponse.getTasks()) { - if (expectedDescription.equals(taskInfo.getDescription())) { - CancelTasksRequest cancelTasksRequest = new CancelTasksRequest(); - cancelTasksRequest.setTaskId(taskInfo.getTaskId()); - cancelTasksAction.execute(cancelTasksRequest, ActionListener.wrap(cancelTaskResponse -> { - persistentTaskClusterService.completeOrRestartPersistentTask(task.getId(), null, - ActionListener.wrap( - empty -> listener.onResponse(new CloseJobAction.Response(true)), - listener::onFailure - ) - ); - }, listener::onFailure)); - return; - } + threadPool.executor(ThreadPool.Names.GENERIC).execute(new AbstractRunnable() { + @Override + public void onFailure(Exception e) { + listener.onFailure(e); } - listener.onFailure(new ResourceNotFoundException("task not found for job [" + request.getJobId() + "]")); - }, listener::onFailure)); + + @Override + protected void doRun() throws Exception { + ListTasksRequest listTasksRequest = new ListTasksRequest(); + listTasksRequest.setDetailed(true); + listTasksRequest.setActions(OpenJobAction.NAME + "[c]"); + listTasksAction.execute(listTasksRequest, ActionListener.wrap(listTasksResponse -> { + String expectedDescription = "job-" + request.getJobId(); + for (TaskInfo taskInfo : listTasksResponse.getTasks()) { + if (expectedDescription.equals(taskInfo.getDescription())) { + CancelTasksRequest cancelTasksRequest = new CancelTasksRequest(); + cancelTasksRequest.setTaskId(taskInfo.getTaskId()); + cancelTasksAction.execute(cancelTasksRequest, ActionListener.wrap(cancelTaskResponse -> { + observer.waitForState(request.getJobId(), request.getTimeout(), JobState.CLOSED, e -> { + if (e == null) { + listener.onResponse(new CloseJobAction.Response(true)); + } else { + listener.onFailure(e); + } + }); + }, listener::onFailure)); + return; + } + } + listener.onFailure(new ResourceNotFoundException("task not found for job [" + request.getJobId() + "]")); + }, listener::onFailure)); + } + }); } }); } diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/BasicDistributedJobsIT.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/BasicDistributedJobsIT.java index d2cf6c3cfdc..7cc5d1d4a3f 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/BasicDistributedJobsIT.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/BasicDistributedJobsIT.java @@ -165,8 +165,7 @@ public class BasicDistributedJobsIT extends BaseMlIntegTestCase { assertEquals(JobState.OPENED, task.getStatus()); }); - // stop the only running ml node - logger.info("!!!!"); + logger.info("stop the only running ml node"); internalCluster().stopRandomNode(settings -> settings.getAsBoolean(MachineLearning.ALLOCATION_ENABLED.getKey(), true)); ensureStableCluster(2); assertBusy(() -> { @@ -180,7 +179,7 @@ public class BasicDistributedJobsIT extends BaseMlIntegTestCase { assertEquals(JobState.OPENED, task.getStatus()); }); - // start ml node + logger.info("start ml node"); internalCluster().startNode(Settings.builder().put(MachineLearning.ALLOCATION_ENABLED.getKey(), true)); ensureStableCluster(3); assertBusy(() -> { @@ -197,6 +196,7 @@ public class BasicDistributedJobsIT extends BaseMlIntegTestCase { assertEquals(expectedNodeAttr, node.getAttributes()); assertEquals(JobState.OPENED, task.getStatus()); }); + cleanupWorkaround(3); } }