diff --git a/core/src/main/java/org/elasticsearch/action/admin/cluster/node/tasks/cancel/TransportCancelTasksAction.java b/core/src/main/java/org/elasticsearch/action/admin/cluster/node/tasks/cancel/TransportCancelTasksAction.java index ce5d92753a8..06093e1ead0 100644 --- a/core/src/main/java/org/elasticsearch/action/admin/cluster/node/tasks/cancel/TransportCancelTasksAction.java +++ b/core/src/main/java/org/elasticsearch/action/admin/cluster/node/tasks/cancel/TransportCancelTasksAction.java @@ -19,21 +19,21 @@ package org.elasticsearch.action.admin.cluster.node.tasks.cancel; +import com.carrotsearch.hppc.cursors.ObjectObjectCursor; import org.elasticsearch.ResourceNotFoundException; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.FailedNodeException; import org.elasticsearch.action.TaskOperationFailure; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.tasks.TransportTasksAction; -import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.util.concurrent.AtomicArray; import org.elasticsearch.tasks.CancellableTask; import org.elasticsearch.tasks.TaskId; import org.elasticsearch.tasks.TaskInfo; @@ -49,9 +49,7 @@ import org.elasticsearch.transport.TransportService; import java.io.IOException; import java.util.ArrayList; import java.util.List; -import java.util.Set; import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; /** @@ -116,18 +114,15 @@ public class TransportCancelTasksAction extends TransportTasksAction listener) { - final BanLock banLock = new BanLock(nodes -> removeBanOnNodes(cancellableTask, nodes)); - Set childNodes = taskManager.cancel(cancellableTask, request.getReason(), banLock::onTaskFinished); - if (childNodes != null) { - if (childNodes.isEmpty()) { - // The task has no child tasks, so we can return immediately - logger.trace("cancelling task {} with no children", cancellableTask.getId()); - listener.onResponse(cancellableTask.taskInfo(clusterService.localNode().getId(), false)); - } else { - // The task has some child tasks, we need to wait for until ban is set on all nodes - logger.trace("cancelling task {} with children on nodes [{}]", cancellableTask.getId(), childNodes); + DiscoveryNodes childNodes = clusterService.state().nodes(); + final BanLock banLock = new BanLock(childNodes.getSize(), () -> removeBanOnNodes(cancellableTask, childNodes)); + boolean canceled = taskManager.cancel(cancellableTask, request.getReason(), banLock::onTaskFinished); + if (canceled) { + if (cancellableTask.shouldCancelChildrenOnCancellation()) { + // /In case the task has some child tasks, we need to wait for until ban is set on all nodes + logger.trace("cancelling task {} on child nodes", cancellableTask.getId()); String nodeId = clusterService.localNode().getId(); - AtomicInteger responses = new AtomicInteger(childNodes.size()); + AtomicInteger responses = new AtomicInteger(childNodes.getSize()); List failures = new ArrayList<>(); setBanOnNodes(request.getReason(), cancellableTask, childNodes, new ActionListener() { @Override @@ -157,7 +152,8 @@ public class TransportCancelTasksAction extends TransportTasksAction nodes, ActionListener listener) { + private void setBanOnNodes(String reason, CancellableTask task, DiscoveryNodes nodes, ActionListener listener) { sendSetBanRequest(nodes, BanParentTaskRequest.createSetBanParentTaskRequest(new TaskId(clusterService.localNode().getId(), task.getId()), reason), listener); } - private void removeBanOnNodes(CancellableTask task, Set nodes) { + private void removeBanOnNodes(CancellableTask task, DiscoveryNodes nodes) { sendRemoveBanRequest(nodes, BanParentTaskRequest.createRemoveBanParentTaskRequest(new TaskId(clusterService.localNode().getId(), task.getId()))); } - private void sendSetBanRequest(Set nodes, BanParentTaskRequest request, ActionListener listener) { - ClusterState clusterState = clusterService.state(); - for (String node : nodes) { - DiscoveryNode discoveryNode = clusterState.getNodes().get(node); - if (discoveryNode != null) { - // Check if node still in the cluster - logger.trace("Sending ban for tasks with the parent [{}] to the node [{}], ban [{}]", request.parentTaskId, node, - request.ban); - transportService.sendRequest(discoveryNode, BAN_PARENT_ACTION_NAME, request, - new EmptyTransportResponseHandler(ThreadPool.Names.SAME) { - @Override - public void handleResponse(TransportResponse.Empty response) { - listener.onResponse(null); - } + private void sendSetBanRequest(DiscoveryNodes nodes, BanParentTaskRequest request, ActionListener listener) { + for (ObjectObjectCursor node : nodes.getNodes()) { + logger.trace("Sending ban for tasks with the parent [{}] to the node [{}], ban [{}]", request.parentTaskId, node.key, + request.ban); + transportService.sendRequest(node.value, BAN_PARENT_ACTION_NAME, request, + new EmptyTransportResponseHandler(ThreadPool.Names.SAME) { + @Override + public void handleResponse(TransportResponse.Empty response) { + listener.onResponse(null); + } - @Override - public void handleException(TransportException exp) { - logger.warn("Cannot send ban for tasks with the parent [{}] to the node [{}]", request.parentTaskId, node); - listener.onFailure(exp); - } - }); - } else { - listener.onResponse(null); - logger.debug("Cannot send ban for tasks with the parent [{}] to the node [{}] - the node no longer in the cluster", - request.parentTaskId, node); - } + @Override + public void handleException(TransportException exp) { + logger.warn("Cannot send ban for tasks with the parent [{}] to the node [{}]", request.parentTaskId, node.key); + listener.onFailure(exp); + } + }); } } - private void sendRemoveBanRequest(Set nodes, BanParentTaskRequest request) { - ClusterState clusterState = clusterService.state(); - for (String node : nodes) { - DiscoveryNode discoveryNode = clusterState.getNodes().get(node); - if (discoveryNode != null) { - // Check if node still in the cluster - logger.debug("Sending remove ban for tasks with the parent [{}] to the node [{}]", request.parentTaskId, node); - transportService.sendRequest(discoveryNode, BAN_PARENT_ACTION_NAME, request, EmptyTransportResponseHandler - .INSTANCE_SAME); - } else { - logger.debug("Cannot send remove ban request for tasks with the parent [{}] to the node [{}] - the node no longer in " + - "the cluster", request.parentTaskId, node); - } + private void sendRemoveBanRequest(DiscoveryNodes nodes, BanParentTaskRequest request) { + for (ObjectObjectCursor node : nodes.getNodes()) { + logger.debug("Sending remove ban for tasks with the parent [{}] to the node [{}]", request.parentTaskId, node.key); + transportService.sendRequest(node.value, BAN_PARENT_ACTION_NAME, request, EmptyTransportResponseHandler + .INSTANCE_SAME); } } private static class BanLock { - private final Consumer> finish; + private final Runnable finish; private final AtomicInteger counter; - private final AtomicReference> nodes = new AtomicReference<>(); + private final int nodesSize; - public BanLock(Consumer> finish) { + public BanLock(int nodesSize, Runnable finish) { counter = new AtomicInteger(0); this.finish = finish; + this.nodesSize = nodesSize; } public void onBanSet() { @@ -242,15 +222,14 @@ public class TransportCancelTasksAction extends TransportTasksAction nodes) { - this.nodes.set(nodes); - if (counter.addAndGet(nodes.size()) == 0) { + public void onTaskFinished() { + if (counter.addAndGet(nodesSize) == 0) { finish(); } } public void finish() { - finish.accept(nodes.get()); + finish.run(); } } @@ -322,5 +301,4 @@ public class TransportCancelTasksAction extends TransportTasksAction() { @Override diff --git a/core/src/main/java/org/elasticsearch/action/search/SearchTask.java b/core/src/main/java/org/elasticsearch/action/search/SearchTask.java index 24f94a43319..d0a1cdd456f 100644 --- a/core/src/main/java/org/elasticsearch/action/search/SearchTask.java +++ b/core/src/main/java/org/elasticsearch/action/search/SearchTask.java @@ -31,4 +31,9 @@ public class SearchTask extends CancellableTask { super(id, type, action, description, parentTaskId); } + @Override + public boolean shouldCancelChildrenOnCancellation() { + return true; + } + } diff --git a/core/src/main/java/org/elasticsearch/action/support/broadcast/TransportBroadcastAction.java b/core/src/main/java/org/elasticsearch/action/support/broadcast/TransportBroadcastAction.java index c48fa1e8122..0408b04cc83 100644 --- a/core/src/main/java/org/elasticsearch/action/support/broadcast/TransportBroadcastAction.java +++ b/core/src/main/java/org/elasticsearch/action/support/broadcast/TransportBroadcastAction.java @@ -175,7 +175,6 @@ public abstract class TransportBroadcastAction() { @Override public ShardResponse newInstance() { diff --git a/core/src/main/java/org/elasticsearch/action/support/broadcast/node/TransportBroadcastByNodeAction.java b/core/src/main/java/org/elasticsearch/action/support/broadcast/node/TransportBroadcastByNodeAction.java index 9f11b9b5a70..ceca57e0520 100644 --- a/core/src/main/java/org/elasticsearch/action/support/broadcast/node/TransportBroadcastByNodeAction.java +++ b/core/src/main/java/org/elasticsearch/action/support/broadcast/node/TransportBroadcastByNodeAction.java @@ -318,7 +318,6 @@ public abstract class TransportBroadcastByNodeAction() { @Override diff --git a/core/src/main/java/org/elasticsearch/action/support/master/TransportMasterNodeAction.java b/core/src/main/java/org/elasticsearch/action/support/master/TransportMasterNodeAction.java index fbae9f7a12b..f2bc4da423d 100644 --- a/core/src/main/java/org/elasticsearch/action/support/master/TransportMasterNodeAction.java +++ b/core/src/main/java/org/elasticsearch/action/support/master/TransportMasterNodeAction.java @@ -160,7 +160,6 @@ public abstract class TransportMasterNodeAction(listener, TransportMasterNodeAction.this::newResponse) { @Override public void handleException(final TransportException exp) { diff --git a/core/src/main/java/org/elasticsearch/action/support/nodes/TransportNodesAction.java b/core/src/main/java/org/elasticsearch/action/support/nodes/TransportNodesAction.java index 6cc063d5af1..d8010f4381f 100644 --- a/core/src/main/java/org/elasticsearch/action/support/nodes/TransportNodesAction.java +++ b/core/src/main/java/org/elasticsearch/action/support/nodes/TransportNodesAction.java @@ -199,7 +199,6 @@ public abstract class TransportNodesAction shardActionListener) { ShardRequest shardRequest = newShardRequest(request, shardId); shardRequest.setParentTask(clusterService.localNode().getId(), task.getId()); - taskManager.registerChildTask(task, clusterService.localNode().getId()); replicatedBroadcastShardAction.execute(shardRequest, shardActionListener); } diff --git a/core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java b/core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java index f03385e3829..d3646ac98e7 100644 --- a/core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java +++ b/core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java @@ -664,7 +664,6 @@ public abstract class TransportReplicationAction< return; } final DiscoveryNode node = state.nodes().get(primary.currentNodeId()); - taskManager.registerChildTask(task, node.getId()); if (primary.currentNodeId().equals(state.nodes().getLocalNodeId())) { performLocalAction(state, primary, node); } else { diff --git a/core/src/main/java/org/elasticsearch/action/support/tasks/TransportTasksAction.java b/core/src/main/java/org/elasticsearch/action/support/tasks/TransportTasksAction.java index ee384b819b0..09026960580 100644 --- a/core/src/main/java/org/elasticsearch/action/support/tasks/TransportTasksAction.java +++ b/core/src/main/java/org/elasticsearch/action/support/tasks/TransportTasksAction.java @@ -278,7 +278,6 @@ public abstract class TransportTasksAction< } else { NodeTaskRequest nodeRequest = new NodeTaskRequest(request); nodeRequest.setParentTask(clusterService.localNode().getId(), task.getId()); - taskManager.registerChildTask(task, node.getId()); transportService.sendRequest(node, transportNodeAction, nodeRequest, builder.build(), new TransportResponseHandler() { @Override diff --git a/core/src/main/java/org/elasticsearch/tasks/CancellableTask.java b/core/src/main/java/org/elasticsearch/tasks/CancellableTask.java index eb084a57e10..b3c1a8929a6 100644 --- a/core/src/main/java/org/elasticsearch/tasks/CancellableTask.java +++ b/core/src/main/java/org/elasticsearch/tasks/CancellableTask.java @@ -26,7 +26,7 @@ import java.util.concurrent.atomic.AtomicReference; /** * A task that can be canceled */ -public class CancellableTask extends Task { +public abstract class CancellableTask extends Task { private final AtomicReference reason = new AtomicReference<>(); @@ -51,6 +51,11 @@ public class CancellableTask extends Task { return true; } + /** + * Returns true if this task should can potentially have children that needs to be cancelled when the parent is cancelled. + */ + public abstract boolean shouldCancelChildrenOnCancellation(); + public boolean isCancelled() { return reason.get() != null; } diff --git a/core/src/main/java/org/elasticsearch/tasks/TaskManager.java b/core/src/main/java/org/elasticsearch/tasks/TaskManager.java index 61c36f9015d..dce39cea7d7 100644 --- a/core/src/main/java/org/elasticsearch/tasks/TaskManager.java +++ b/core/src/main/java/org/elasticsearch/tasks/TaskManager.java @@ -125,15 +125,18 @@ public class TaskManager extends AbstractComponent implements ClusterStateApplie /** * Cancels a task *

- * Returns a set of nodes with child tasks where this task should be cancelled if cancellation was successful, null otherwise. + * Returns true if cancellation was started successful, null otherwise. + * + * After starting cancellation on the parent task, the task manager tries to cancel all children tasks + * of the current task. Once cancellation of the children tasks is done, the listener is triggered. */ - public Set cancel(CancellableTask task, String reason, Consumer> listener) { + public boolean cancel(CancellableTask task, String reason, Runnable listener) { CancellableTaskHolder holder = cancellableTasks.get(task.getId()); if (holder != null) { logger.trace("cancelling task with id {}", task.getId()); return holder.cancel(reason, listener); } - return null; + return false; } /** @@ -344,17 +347,6 @@ public class TaskManager extends AbstractComponent implements ClusterStateApplie } } - public void registerChildTask(Task task, String node) { - if (task == null || task instanceof CancellableTask == false) { - // We don't have a cancellable task - not much we can do here - return; - } - CancellableTaskHolder holder = cancellableTasks.get(task.getId()); - if (holder != null) { - holder.registerChildTaskNode(node); - } - } - /** * Blocks the calling thread, waiting for the task to vanish from the TaskManager. */ @@ -378,11 +370,9 @@ public class TaskManager extends AbstractComponent implements ClusterStateApplie private final CancellableTask task; - private final Set nodesWithChildTasks = new HashSet<>(); - private volatile String cancellationReason = null; - private volatile Consumer> cancellationListener = null; + private volatile Runnable cancellationListener = null; public CancellableTaskHolder(CancellableTask task) { this.task = task; @@ -391,33 +381,33 @@ public class TaskManager extends AbstractComponent implements ClusterStateApplie /** * Marks task as cancelled. *

- * Returns a set of nodes with child tasks where this task should be cancelled if cancellation was successful, null otherwise. + * Returns true if cancellation was successful, false otherwise. */ - public Set cancel(String reason, Consumer> listener) { - Set nodes; + public boolean cancel(String reason, Runnable listener) { + final boolean cancelled; synchronized (this) { assert reason != null; if (cancellationReason == null) { cancellationReason = reason; cancellationListener = listener; - nodes = Collections.unmodifiableSet(nodesWithChildTasks); + cancelled = true; } else { // Already cancelled by somebody else - nodes = null; + cancelled = false; } } - if (nodes != null) { + if (cancelled) { task.cancel(reason); } - return nodes; + return cancelled; } /** * Marks task as cancelled. *

- * Returns a set of nodes with child tasks where this task should be cancelled if cancellation was successful, null otherwise. + * Returns true if cancellation was successful, false otherwise. */ - public Set cancel(String reason) { + public boolean cancel(String reason) { return cancel(reason, null); } @@ -425,14 +415,12 @@ public class TaskManager extends AbstractComponent implements ClusterStateApplie * Marks task as finished. */ public void finish() { - Consumer> listener = null; - Set nodes = null; + Runnable listener = null; synchronized (this) { if (cancellationReason != null) { // The task was cancelled, we need to notify the listener if (cancellationListener != null) { listener = cancellationListener; - nodes = Collections.unmodifiableSet(nodesWithChildTasks); cancellationListener = null; } } else { @@ -442,7 +430,7 @@ public class TaskManager extends AbstractComponent implements ClusterStateApplie // We need to call the listener outside of the synchronised section to avoid potential bottle necks // in the listener synchronization if (listener != null) { - listener.accept(nodes); + listener.run(); } } @@ -454,14 +442,6 @@ public class TaskManager extends AbstractComponent implements ClusterStateApplie public CancellableTask getTask() { return task; } - - public synchronized void registerChildTaskNode(String nodeId) { - if (cancellationReason == null) { - nodesWithChildTasks.add(nodeId); - } else { - throw new TaskCancelledException("cannot register child task request, the task is already cancelled"); - } - } } } diff --git a/core/src/main/java/org/elasticsearch/transport/TransportService.java b/core/src/main/java/org/elasticsearch/transport/TransportService.java index e7dd454b4d2..1978fd0f48f 100644 --- a/core/src/main/java/org/elasticsearch/transport/TransportService.java +++ b/core/src/main/java/org/elasticsearch/transport/TransportService.java @@ -498,7 +498,6 @@ public class TransportService extends AbstractLifecycleComponent { final TransportResponseHandler handler) { request.setParentTask(localNode.getId(), parentTask.getId()); try { - taskManager.registerChildTask(parentTask, node.getId()); final Transport.Connection connection = getConnection(node); sendRequest(connection, action, request, options, handler); } catch (TaskCancelledException ex) { diff --git a/core/src/test/java/org/elasticsearch/action/admin/cluster/node/tasks/CancellableTasksTests.java b/core/src/test/java/org/elasticsearch/action/admin/cluster/node/tasks/CancellableTasksTests.java index 6d0a0824490..da60b564cec 100644 --- a/core/src/test/java/org/elasticsearch/action/admin/cluster/node/tasks/CancellableTasksTests.java +++ b/core/src/test/java/org/elasticsearch/action/admin/cluster/node/tasks/CancellableTasksTests.java @@ -91,7 +91,12 @@ public class CancellableTasksTests extends TaskManagerTestCase { @Override public Task createTask(long id, String type, String action, TaskId parentTaskId) { - return new CancellableTask(id, type, action, getDescription(), parentTaskId); + return new CancellableTask(id, type, action, getDescription(), parentTaskId) { + @Override + public boolean shouldCancelChildrenOnCancellation() { + return false; + } + }; } } @@ -126,7 +131,12 @@ public class CancellableTasksTests extends TaskManagerTestCase { @Override public Task createTask(long id, String type, String action, TaskId parentTaskId) { - return new CancellableTask(id, type, action, getDescription(), parentTaskId); + return new CancellableTask(id, type, action, getDescription(), parentTaskId) { + @Override + public boolean shouldCancelChildrenOnCancellation() { + return true; + } + }; } } diff --git a/core/src/test/java/org/elasticsearch/action/admin/cluster/node/tasks/TestTaskPlugin.java b/core/src/test/java/org/elasticsearch/action/admin/cluster/node/tasks/TestTaskPlugin.java index 28aff0d3bda..99d058886a5 100644 --- a/core/src/test/java/org/elasticsearch/action/admin/cluster/node/tasks/TestTaskPlugin.java +++ b/core/src/test/java/org/elasticsearch/action/admin/cluster/node/tasks/TestTaskPlugin.java @@ -83,6 +83,11 @@ public class TestTaskPlugin extends Plugin implements ActionPlugin { super(id, type, action, description, parentTaskId); } + @Override + public boolean shouldCancelChildrenOnCancellation() { + return false; + } + public boolean isBlocked() { return blocked; } @@ -242,7 +247,12 @@ public class TestTaskPlugin extends Plugin implements ActionPlugin { @Override public Task createTask(long id, String type, String action, TaskId parentTaskId) { - return new CancellableTask(id, type, action, getDescription(), parentTaskId); + return new CancellableTask(id, type, action, getDescription(), parentTaskId) { + @Override + public boolean shouldCancelChildrenOnCancellation() { + return true; + } + }; } } diff --git a/modules/reindex/src/main/java/org/elasticsearch/index/reindex/BulkByScrollTask.java b/modules/reindex/src/main/java/org/elasticsearch/index/reindex/BulkByScrollTask.java index 705b048f2af..6d2151a1405 100644 --- a/modules/reindex/src/main/java/org/elasticsearch/index/reindex/BulkByScrollTask.java +++ b/modules/reindex/src/main/java/org/elasticsearch/index/reindex/BulkByScrollTask.java @@ -71,6 +71,11 @@ public abstract class BulkByScrollTask extends CancellableTask { */ public abstract TaskInfo getInfoGivenSliceInfo(String localNodeId, List sliceInfo); + @Override + public boolean shouldCancelChildrenOnCancellation() { + return true; + } + public static class Status implements Task.Status, SuccessfullyProcessed { public static final String NAME = "bulk-by-scroll"; diff --git a/modules/reindex/src/main/java/org/elasticsearch/index/reindex/ReindexParallelizationHelper.java b/modules/reindex/src/main/java/org/elasticsearch/index/reindex/ReindexParallelizationHelper.java index b2dbd51f381..e14b33b5db5 100644 --- a/modules/reindex/src/main/java/org/elasticsearch/index/reindex/ReindexParallelizationHelper.java +++ b/modules/reindex/src/main/java/org/elasticsearch/index/reindex/ReindexParallelizationHelper.java @@ -47,9 +47,6 @@ public class ReindexParallelizationHelper { r -> task.onSliceResponse(listener, slice.source().slice().getId(), r), e -> task.onSliceFailure(listener, slice.source().slice().getId(), e)); client.execute(action, requestForSlice, sliceListener); - /* Explicitly tell the task manager that we're running child tasks on the local node so it will cancel them when the parent is - * cancelled. */ - taskManager.registerChildTask(task, localNodeId); } } diff --git a/modules/reindex/src/test/java/org/elasticsearch/index/reindex/AsyncBulkByScrollActionTests.java b/modules/reindex/src/test/java/org/elasticsearch/index/reindex/AsyncBulkByScrollActionTests.java index 932d4de9174..e54d7c72efe 100644 --- a/modules/reindex/src/test/java/org/elasticsearch/index/reindex/AsyncBulkByScrollActionTests.java +++ b/modules/reindex/src/test/java/org/elasticsearch/index/reindex/AsyncBulkByScrollActionTests.java @@ -604,7 +604,7 @@ public class AsyncBulkByScrollActionTests extends ESTestCase { * that good stuff. */ if (delay.nanos() > 0) { - generic().execute(() -> taskManager.cancel(testTask, reason, (Set s) -> {})); + generic().execute(() -> taskManager.cancel(testTask, reason, () -> {})); } return super.schedule(delay, name, command); } @@ -637,7 +637,7 @@ public class AsyncBulkByScrollActionTests extends ESTestCase { action.setScroll(scrollId()); } String reason = randomSimpleString(random()); - taskManager.cancel(testTask, reason, (Set s) -> {}); + taskManager.cancel(testTask, reason, () -> {}); testMe.accept(action); assertEquals(reason, listener.get().getReasonCancelled()); if (previousScrollSet) {