Fix hanging cancelling task with no children

Cancelling tasks with no cancellable children can cause the cancellation operation to hang. This commit fixes this issue.
This commit is contained in:
Igor Motov 2017-01-25 14:56:48 -05:00
parent ea7077fb1b
commit b068814d10
2 changed files with 64 additions and 8 deletions

View File

@ -114,14 +114,15 @@ public class TransportCancelTasksAction extends TransportTasksAction<Cancellable
@Override
protected synchronized void taskOperation(CancelTasksRequest request, CancellableTask cancellableTask,
ActionListener<TaskInfo> listener) {
DiscoveryNodes childNodes = clusterService.state().nodes();
final BanLock banLock = new BanLock(childNodes.getSize(), () -> removeBanOnNodes(cancellableTask, childNodes));
boolean canceled = taskManager.cancel(cancellableTask, request.getReason(), banLock::onTaskFinished);
if (canceled) {
if (cancellableTask.shouldCancelChildrenOnCancellation()) {
String nodeId = clusterService.localNode().getId();
final boolean canceled;
if (cancellableTask.shouldCancelChildrenOnCancellation()) {
DiscoveryNodes childNodes = clusterService.state().nodes();
final BanLock banLock = new BanLock(childNodes.getSize(), () -> removeBanOnNodes(cancellableTask, childNodes));
canceled = taskManager.cancel(cancellableTask, request.getReason(), banLock::onTaskFinished);
if (canceled) {
// /In case the task has some child tasks, we need to wait for until ban is set on all nodes
logger.trace("cancelling task {} on child nodes", cancellableTask.getId());
String nodeId = clusterService.localNode().getId();
AtomicInteger responses = new AtomicInteger(childNodes.getSize());
List<Exception> failures = new ArrayList<>();
setBanOnNodes(request.getReason(), cancellableTask, childNodes, new ActionListener<Void>() {
@ -152,15 +153,21 @@ public class TransportCancelTasksAction extends TransportTasksAction<Cancellable
}
}
});
} else {
}
} else {
canceled = taskManager.cancel(cancellableTask, request.getReason(),
() -> listener.onResponse(cancellableTask.taskInfo(nodeId, false)));
if (canceled) {
logger.trace("task {} doesn't have any children that should be cancelled", cancellableTask.getId());
}
} else {
}
if (canceled == false) {
logger.trace("task {} is already cancelled", cancellableTask.getId());
throw new IllegalStateException("task with id " + cancellableTask.getId() + " is already cancelled");
}
}
@Override
protected boolean accumulateExceptions() {
return true;

View File

@ -51,6 +51,7 @@ import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicReference;
import static org.elasticsearch.test.ClusterServiceUtils.setState;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.lessThanOrEqualTo;
@ -299,6 +300,54 @@ public class CancellableTasksTests extends TaskManagerTestCase {
});
}
public void testChildTasksCancellation() throws Exception {
setupTestNodes(Settings.EMPTY);
connectNodes(testNodes);
CountDownLatch responseLatch = new CountDownLatch(1);
final AtomicReference<NodesResponse> responseReference = new AtomicReference<>();
final AtomicReference<Throwable> throwableReference = new AtomicReference<>();
Task mainTask = startCancellableTestNodesAction(true, nodesCount, new ActionListener<NodesResponse>() {
@Override
public void onResponse(NodesResponse listTasksResponse) {
responseReference.set(listTasksResponse);
responseLatch.countDown();
}
@Override
public void onFailure(Exception e) {
throwableReference.set(e);
responseLatch.countDown();
}
});
// Cancel all child tasks without cancelling the main task, which should quit on its own
CancelTasksRequest request = new CancelTasksRequest();
request.setReason("Testing Cancellation");
request.setParentTaskId(new TaskId(testNodes[0].discoveryNode.getId(), mainTask.getId()));
// And send the cancellation request to a random node
CancelTasksResponse response = testNodes[randomIntBetween(1, testNodes.length - 1)].transportCancelTasksAction.execute(request)
.get();
// Awaiting for the main task to finish
responseLatch.await();
// Should have cancelled tasks on all nodes
assertThat(response.getTasks().size(), equalTo(testNodes.length));
assertBusy(() -> {
try {
// Make sure that main task is no longer running
ListTasksResponse listTasksResponse = testNodes[randomIntBetween(0, testNodes.length - 1)]
.transportListTasksAction.execute(new ListTasksRequest().setTaskId(
new TaskId(testNodes[0].discoveryNode.getId(), mainTask.getId()))).get();
assertEquals(0, listTasksResponse.getTasks().size());
} catch (ExecutionException | InterruptedException ex) {
throw new RuntimeException(ex);
}
});
}
public void testTaskCancellationOnCoordinatingNodeLeavingTheCluster() throws Exception {
setupTestNodes(Settings.EMPTY);
connectNodes(testNodes);