diff --git a/core/src/main/java/org/elasticsearch/action/admin/cluster/node/tasks/cancel/TransportCancelTasksAction.java b/core/src/main/java/org/elasticsearch/action/admin/cluster/node/tasks/cancel/TransportCancelTasksAction.java index 06093e1ead0..ef9f718411f 100644 --- a/core/src/main/java/org/elasticsearch/action/admin/cluster/node/tasks/cancel/TransportCancelTasksAction.java +++ b/core/src/main/java/org/elasticsearch/action/admin/cluster/node/tasks/cancel/TransportCancelTasksAction.java @@ -114,14 +114,15 @@ public class TransportCancelTasksAction extends TransportTasksAction listener) { - DiscoveryNodes childNodes = clusterService.state().nodes(); - final BanLock banLock = new BanLock(childNodes.getSize(), () -> removeBanOnNodes(cancellableTask, childNodes)); - boolean canceled = taskManager.cancel(cancellableTask, request.getReason(), banLock::onTaskFinished); - if (canceled) { - if (cancellableTask.shouldCancelChildrenOnCancellation()) { + String nodeId = clusterService.localNode().getId(); + final boolean canceled; + if (cancellableTask.shouldCancelChildrenOnCancellation()) { + DiscoveryNodes childNodes = clusterService.state().nodes(); + final BanLock banLock = new BanLock(childNodes.getSize(), () -> removeBanOnNodes(cancellableTask, childNodes)); + canceled = taskManager.cancel(cancellableTask, request.getReason(), banLock::onTaskFinished); + if (canceled) { // /In case the task has some child tasks, we need to wait for until ban is set on all nodes logger.trace("cancelling task {} on child nodes", cancellableTask.getId()); - String nodeId = clusterService.localNode().getId(); AtomicInteger responses = new AtomicInteger(childNodes.getSize()); List failures = new ArrayList<>(); setBanOnNodes(request.getReason(), cancellableTask, childNodes, new ActionListener() { @@ -152,15 +153,21 @@ public class TransportCancelTasksAction extends TransportTasksAction listener.onResponse(cancellableTask.taskInfo(nodeId, false))); + if (canceled) { logger.trace("task {} doesn't have any children that should be cancelled", cancellableTask.getId()); } - } else { + } + if (canceled == false) { logger.trace("task {} is already cancelled", cancellableTask.getId()); throw new IllegalStateException("task with id " + cancellableTask.getId() + " is already cancelled"); } } + @Override protected boolean accumulateExceptions() { return true; diff --git a/core/src/test/java/org/elasticsearch/action/admin/cluster/node/tasks/CancellableTasksTests.java b/core/src/test/java/org/elasticsearch/action/admin/cluster/node/tasks/CancellableTasksTests.java index da60b564cec..decff2ffc37 100644 --- a/core/src/test/java/org/elasticsearch/action/admin/cluster/node/tasks/CancellableTasksTests.java +++ b/core/src/test/java/org/elasticsearch/action/admin/cluster/node/tasks/CancellableTasksTests.java @@ -51,6 +51,7 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.atomic.AtomicReference; import static org.elasticsearch.test.ClusterServiceUtils.setState; +import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.hamcrest.Matchers.lessThanOrEqualTo; @@ -299,6 +300,54 @@ public class CancellableTasksTests extends TaskManagerTestCase { }); } + public void testChildTasksCancellation() throws Exception { + setupTestNodes(Settings.EMPTY); + connectNodes(testNodes); + CountDownLatch responseLatch = new CountDownLatch(1); + final AtomicReference responseReference = new AtomicReference<>(); + final AtomicReference throwableReference = new AtomicReference<>(); + Task mainTask = startCancellableTestNodesAction(true, nodesCount, new ActionListener() { + @Override + public void onResponse(NodesResponse listTasksResponse) { + responseReference.set(listTasksResponse); + responseLatch.countDown(); + } + + @Override + public void onFailure(Exception e) { + throwableReference.set(e); + responseLatch.countDown(); + } + }); + + // Cancel all child tasks without cancelling the main task, which should quit on its own + CancelTasksRequest request = new CancelTasksRequest(); + request.setReason("Testing Cancellation"); + request.setParentTaskId(new TaskId(testNodes[0].discoveryNode.getId(), mainTask.getId())); + // And send the cancellation request to a random node + CancelTasksResponse response = testNodes[randomIntBetween(1, testNodes.length - 1)].transportCancelTasksAction.execute(request) + .get(); + + // Awaiting for the main task to finish + responseLatch.await(); + + // Should have cancelled tasks on all nodes + assertThat(response.getTasks().size(), equalTo(testNodes.length)); + + assertBusy(() -> { + try { + // Make sure that main task is no longer running + ListTasksResponse listTasksResponse = testNodes[randomIntBetween(0, testNodes.length - 1)] + .transportListTasksAction.execute(new ListTasksRequest().setTaskId( + new TaskId(testNodes[0].discoveryNode.getId(), mainTask.getId()))).get(); + assertEquals(0, listTasksResponse.getTasks().size()); + + } catch (ExecutionException | InterruptedException ex) { + throw new RuntimeException(ex); + } + }); + } + public void testTaskCancellationOnCoordinatingNodeLeavingTheCluster() throws Exception { setupTestNodes(Settings.EMPTY); connectNodes(testNodes);