Ensure the child node is unregistered if registering the task fails (#56254)

We fail to unregister the child node in registerAndExecute if the parent
task is being canceled. This leads to a bug where a cancel request never
completes.

Closes #55875
Relates #54312
This commit is contained in:
Nhat Nguyen 2020-05-07 10:10:13 -04:00 committed by GitHub
parent 8e005db3e6
commit bd0e0f41a0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 39 additions and 11 deletions

View File

@ -52,7 +52,6 @@ import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.tasks.TaskInfo;
import org.elasticsearch.tasks.TaskManager;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.junit.annotations.TestIssueLogging;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportException;
import org.elasticsearch.transport.TransportResponseHandler;
@ -80,9 +79,6 @@ import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.instanceOf;
@TestIssueLogging(
value = "org.elasticsearch.action.admin.cluster.node.tasks.cancel:TRACE,org.elasticsearch.tasks:TRACE",
issueUrl = "https://github.com/elastic/elasticsearch/issues/55875")
public class CancellableTasksIT extends ESIntegTestCase {
static int idGenerator = 0;
@ -241,7 +237,6 @@ public class CancellableTasksIT extends ESIntegTestCase {
ensureAllBansRemoved();
}
@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/55875")
public void testDoNotWaitForCompletion() throws Exception {
Set<DiscoveryNode> nodes = StreamSupport.stream(clusterService().state().nodes().spliterator(), false).collect(Collectors.toSet());
TestRequest rootRequest = generateTestRequest(nodes, 0, between(1, 3));

View File

@ -27,6 +27,7 @@ import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.lease.Releasables;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskCancelledException;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.tasks.TaskListener;
import org.elasticsearch.tasks.TaskManager;
@ -72,13 +73,18 @@ public abstract class TransportAction<Request extends ActionRequest, Response ex
* this method.
*/
final Releasable unregisterChildNode = registerChildNode(request.getParentTask());
Task task = taskManager.register("transport", actionName, request);
final Task task;
try {
task = taskManager.register("transport", actionName, request);
} catch (TaskCancelledException e) {
unregisterChildNode.close();
throw e;
}
execute(task, request, new ActionListener<Response>() {
@Override
public void onResponse(Response response) {
try {
taskManager.unregister(task);
unregisterChildNode.close();
Releasables.close(unregisterChildNode, () -> taskManager.unregister(task));
} finally {
listener.onResponse(response);
}
@ -102,13 +108,18 @@ public abstract class TransportAction<Request extends ActionRequest, Response ex
*/
public final Task execute(Request request, TaskListener<Response> listener) {
final Releasable unregisterChildNode = registerChildNode(request.getParentTask());
Task task = taskManager.register("transport", actionName, request);
final Task task;
try {
task = taskManager.register("transport", actionName, request);
} catch (TaskCancelledException e) {
unregisterChildNode.close();
throw e;
}
execute(task, request, new ActionListener<Response>() {
@Override
public void onResponse(Response response) {
try {
taskManager.unregister(task);
unregisterChildNode.close();
Releasables.close(unregisterChildNode, () -> taskManager.unregister(task));
} finally {
listener.onResponse(task, response);
}

View File

@ -53,6 +53,7 @@ import java.util.Map;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Phaser;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicIntegerArray;
import java.util.concurrent.atomic.AtomicReference;
@ -61,6 +62,7 @@ import static org.elasticsearch.test.ClusterServiceUtils.setState;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.lessThanOrEqualTo;
import static org.hamcrest.Matchers.startsWith;
public class CancellableTasksTests extends TaskManagerTestCase {
@ -351,6 +353,26 @@ public class CancellableTasksTests extends TaskManagerTestCase {
});
}
/**
 * Checks that executing a child action whose parent task has a ban set fails with a
 * {@link TaskCancelledException}, and that a subsequent {@code startBanOnChildrenNodes} call for the
 * parent still invokes its completion callback.
 *
 * @throws Exception if node setup fails or the wait on the latch is interrupted
 */
public void testRegisterAndExecuteChildTaskWhileParentTaskIsBeingCanceled() throws Exception {
setupTestNodes(Settings.EMPTY);
connectNodes(testNodes);
final TaskManager taskManager = testNodes[0].transportService.getTaskManager();
// Register a parent task with node 0's task manager and capture its TaskId.
CancellableNodesRequest parentRequest = new CancellableNodesRequest("parent");
final Task parentTask = taskManager.register("test", "test", parentRequest);
final TaskId parentTaskId = parentTask.taskInfo(testNodes[0].getNodeId(), false).getTaskId();
// Set a ban for the parent before any child is started.
// NOTE(review): presumably this stands in for "parent is being cancelled" — confirm against TaskManager#setBan.
taskManager.setBan(new TaskId(testNodes[0].getNodeId(), parentTask.getId()), "test");
// Build a child request that points at the banned parent.
CancellableNodesRequest childRequest = new CancellableNodesRequest("child");
childRequest.setParentTask(parentTaskId);
// NOTE(review): the action is built from node 1's cluster service but node 0's transport service — confirm this is intended.
CancellableTestNodesAction testAction = new CancellableTestNodesAction("internal:testAction", threadPool, testNodes[1]
.clusterService, testNodes[0].transportService, false, new CountDownLatch(1));
// Executing the child must throw instead of running; the listener fails the test if it is ever invoked.
TaskCancelledException cancelledException = expectThrows(TaskCancelledException.class,
() -> testAction.execute(childRequest, ActionListener.wrap(() -> fail("must not execute"))));
assertThat(cancelledException.getMessage(), startsWith("Task cancelled before it started:"));
// The callback handed to startBanOnChildrenNodes must run within one second.
// NOTE(review): presumably a child node left registered after the failed execute would keep this callback
// from firing (the "cancel request never completes" symptom) — verify against TaskManager.
CountDownLatch latch = new CountDownLatch(1);
taskManager.startBanOnChildrenNodes(parentTaskId.getId(), latch::countDown);
assertTrue("onChildTasksCompleted() is not invoked", latch.await(1, TimeUnit.SECONDS));
}
public void testTaskCancellationOnCoordinatingNodeLeavingTheCluster() throws Exception {
setupTestNodes(Settings.EMPTY);
connectNodes(testNodes);