Remove taskManager.registerChildTask
Instead of forcing each task to register all nodes where its children are running, this commit runs cancellation on all nodes. The task cancellation operation doesn't run too frequently, so this optimization doesn't seem to be worth additional complexity of the interface.
This commit is contained in:
parent
099d229138
commit
500548fcda
|
@ -19,21 +19,21 @@
|
|||
|
||||
package org.elasticsearch.action.admin.cluster.node.tasks.cancel;
|
||||
|
||||
import com.carrotsearch.hppc.cursors.ObjectObjectCursor;
|
||||
import org.elasticsearch.ResourceNotFoundException;
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.action.FailedNodeException;
|
||||
import org.elasticsearch.action.TaskOperationFailure;
|
||||
import org.elasticsearch.action.support.ActionFilters;
|
||||
import org.elasticsearch.action.support.tasks.TransportTasksAction;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNodes;
|
||||
import org.elasticsearch.cluster.service.ClusterService;
|
||||
import org.elasticsearch.common.inject.Inject;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.util.concurrent.AtomicArray;
|
||||
import org.elasticsearch.tasks.CancellableTask;
|
||||
import org.elasticsearch.tasks.TaskId;
|
||||
import org.elasticsearch.tasks.TaskInfo;
|
||||
|
@ -49,9 +49,7 @@ import org.elasticsearch.transport.TransportService;
|
|||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
import java.util.function.Consumer;
|
||||
|
||||
/**
|
||||
|
@ -116,18 +114,15 @@ public class TransportCancelTasksAction extends TransportTasksAction<Cancellable
|
|||
@Override
|
||||
protected synchronized void taskOperation(CancelTasksRequest request, CancellableTask cancellableTask,
|
||||
ActionListener<TaskInfo> listener) {
|
||||
final BanLock banLock = new BanLock(nodes -> removeBanOnNodes(cancellableTask, nodes));
|
||||
Set<String> childNodes = taskManager.cancel(cancellableTask, request.getReason(), banLock::onTaskFinished);
|
||||
if (childNodes != null) {
|
||||
if (childNodes.isEmpty()) {
|
||||
// The task has no child tasks, so we can return immediately
|
||||
logger.trace("cancelling task {} with no children", cancellableTask.getId());
|
||||
listener.onResponse(cancellableTask.taskInfo(clusterService.localNode().getId(), false));
|
||||
} else {
|
||||
// The task has some child tasks, we need to wait for until ban is set on all nodes
|
||||
logger.trace("cancelling task {} with children on nodes [{}]", cancellableTask.getId(), childNodes);
|
||||
DiscoveryNodes childNodes = clusterService.state().nodes();
|
||||
final BanLock banLock = new BanLock(childNodes.getSize(), () -> removeBanOnNodes(cancellableTask, childNodes));
|
||||
boolean canceled = taskManager.cancel(cancellableTask, request.getReason(), banLock::onTaskFinished);
|
||||
if (canceled) {
|
||||
if (cancellableTask.shouldCancelChildrenOnCancellation()) {
|
||||
// /In case the task has some child tasks, we need to wait for until ban is set on all nodes
|
||||
logger.trace("cancelling task {} on child nodes", cancellableTask.getId());
|
||||
String nodeId = clusterService.localNode().getId();
|
||||
AtomicInteger responses = new AtomicInteger(childNodes.size());
|
||||
AtomicInteger responses = new AtomicInteger(childNodes.getSize());
|
||||
List<Exception> failures = new ArrayList<>();
|
||||
setBanOnNodes(request.getReason(), cancellableTask, childNodes, new ActionListener<Void>() {
|
||||
@Override
|
||||
|
@ -157,7 +152,8 @@ public class TransportCancelTasksAction extends TransportTasksAction<Cancellable
|
|||
}
|
||||
}
|
||||
});
|
||||
|
||||
} else {
|
||||
logger.trace("task {} doesn't have any children that should be cancelled", cancellableTask.getId());
|
||||
}
|
||||
} else {
|
||||
logger.trace("task {} is already cancelled", cancellableTask.getId());
|
||||
|
@ -170,70 +166,54 @@ public class TransportCancelTasksAction extends TransportTasksAction<Cancellable
|
|||
return true;
|
||||
}
|
||||
|
||||
private void setBanOnNodes(String reason, CancellableTask task, Set<String> nodes, ActionListener<Void> listener) {
|
||||
private void setBanOnNodes(String reason, CancellableTask task, DiscoveryNodes nodes, ActionListener<Void> listener) {
|
||||
sendSetBanRequest(nodes,
|
||||
BanParentTaskRequest.createSetBanParentTaskRequest(new TaskId(clusterService.localNode().getId(), task.getId()), reason),
|
||||
listener);
|
||||
}
|
||||
|
||||
private void removeBanOnNodes(CancellableTask task, Set<String> nodes) {
|
||||
private void removeBanOnNodes(CancellableTask task, DiscoveryNodes nodes) {
|
||||
sendRemoveBanRequest(nodes,
|
||||
BanParentTaskRequest.createRemoveBanParentTaskRequest(new TaskId(clusterService.localNode().getId(), task.getId())));
|
||||
}
|
||||
|
||||
private void sendSetBanRequest(Set<String> nodes, BanParentTaskRequest request, ActionListener<Void> listener) {
|
||||
ClusterState clusterState = clusterService.state();
|
||||
for (String node : nodes) {
|
||||
DiscoveryNode discoveryNode = clusterState.getNodes().get(node);
|
||||
if (discoveryNode != null) {
|
||||
// Check if node still in the cluster
|
||||
logger.trace("Sending ban for tasks with the parent [{}] to the node [{}], ban [{}]", request.parentTaskId, node,
|
||||
request.ban);
|
||||
transportService.sendRequest(discoveryNode, BAN_PARENT_ACTION_NAME, request,
|
||||
new EmptyTransportResponseHandler(ThreadPool.Names.SAME) {
|
||||
@Override
|
||||
public void handleResponse(TransportResponse.Empty response) {
|
||||
listener.onResponse(null);
|
||||
}
|
||||
private void sendSetBanRequest(DiscoveryNodes nodes, BanParentTaskRequest request, ActionListener<Void> listener) {
|
||||
for (ObjectObjectCursor<String, DiscoveryNode> node : nodes.getNodes()) {
|
||||
logger.trace("Sending ban for tasks with the parent [{}] to the node [{}], ban [{}]", request.parentTaskId, node.key,
|
||||
request.ban);
|
||||
transportService.sendRequest(node.value, BAN_PARENT_ACTION_NAME, request,
|
||||
new EmptyTransportResponseHandler(ThreadPool.Names.SAME) {
|
||||
@Override
|
||||
public void handleResponse(TransportResponse.Empty response) {
|
||||
listener.onResponse(null);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void handleException(TransportException exp) {
|
||||
logger.warn("Cannot send ban for tasks with the parent [{}] to the node [{}]", request.parentTaskId, node);
|
||||
listener.onFailure(exp);
|
||||
}
|
||||
});
|
||||
} else {
|
||||
listener.onResponse(null);
|
||||
logger.debug("Cannot send ban for tasks with the parent [{}] to the node [{}] - the node no longer in the cluster",
|
||||
request.parentTaskId, node);
|
||||
}
|
||||
@Override
|
||||
public void handleException(TransportException exp) {
|
||||
logger.warn("Cannot send ban for tasks with the parent [{}] to the node [{}]", request.parentTaskId, node.key);
|
||||
listener.onFailure(exp);
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
private void sendRemoveBanRequest(Set<String> nodes, BanParentTaskRequest request) {
|
||||
ClusterState clusterState = clusterService.state();
|
||||
for (String node : nodes) {
|
||||
DiscoveryNode discoveryNode = clusterState.getNodes().get(node);
|
||||
if (discoveryNode != null) {
|
||||
// Check if node still in the cluster
|
||||
logger.debug("Sending remove ban for tasks with the parent [{}] to the node [{}]", request.parentTaskId, node);
|
||||
transportService.sendRequest(discoveryNode, BAN_PARENT_ACTION_NAME, request, EmptyTransportResponseHandler
|
||||
.INSTANCE_SAME);
|
||||
} else {
|
||||
logger.debug("Cannot send remove ban request for tasks with the parent [{}] to the node [{}] - the node no longer in " +
|
||||
"the cluster", request.parentTaskId, node);
|
||||
}
|
||||
private void sendRemoveBanRequest(DiscoveryNodes nodes, BanParentTaskRequest request) {
|
||||
for (ObjectObjectCursor<String, DiscoveryNode> node : nodes.getNodes()) {
|
||||
logger.debug("Sending remove ban for tasks with the parent [{}] to the node [{}]", request.parentTaskId, node.key);
|
||||
transportService.sendRequest(node.value, BAN_PARENT_ACTION_NAME, request, EmptyTransportResponseHandler
|
||||
.INSTANCE_SAME);
|
||||
}
|
||||
}
|
||||
|
||||
private static class BanLock {
|
||||
private final Consumer<Set<String>> finish;
|
||||
private final Runnable finish;
|
||||
private final AtomicInteger counter;
|
||||
private final AtomicReference<Set<String>> nodes = new AtomicReference<>();
|
||||
private final int nodesSize;
|
||||
|
||||
public BanLock(Consumer<Set<String>> finish) {
|
||||
public BanLock(int nodesSize, Runnable finish) {
|
||||
counter = new AtomicInteger(0);
|
||||
this.finish = finish;
|
||||
this.nodesSize = nodesSize;
|
||||
}
|
||||
|
||||
public void onBanSet() {
|
||||
|
@ -242,15 +222,14 @@ public class TransportCancelTasksAction extends TransportTasksAction<Cancellable
|
|||
}
|
||||
}
|
||||
|
||||
public void onTaskFinished(Set<String> nodes) {
|
||||
this.nodes.set(nodes);
|
||||
if (counter.addAndGet(nodes.size()) == 0) {
|
||||
public void onTaskFinished() {
|
||||
if (counter.addAndGet(nodesSize) == 0) {
|
||||
finish();
|
||||
}
|
||||
}
|
||||
|
||||
public void finish() {
|
||||
finish.accept(nodes.get());
|
||||
finish.run();
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -322,5 +301,4 @@ public class TransportCancelTasksAction extends TransportTasksAction<Cancellable
|
|||
}
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
|
|
|
@ -120,7 +120,6 @@ public class TransportGetTaskAction extends HandledTransportAction<GetTaskReques
|
|||
return;
|
||||
}
|
||||
GetTaskRequest nodeRequest = request.nodeRequest(clusterService.localNode().getId(), thisTask.getId());
|
||||
taskManager.registerChildTask(thisTask, node.getId());
|
||||
transportService.sendRequest(node, GetTaskAction.NAME, nodeRequest, builder.build(),
|
||||
new TransportResponseHandler<GetTaskResponse>() {
|
||||
@Override
|
||||
|
|
|
@ -31,4 +31,9 @@ public class SearchTask extends CancellableTask {
|
|||
super(id, type, action, description, parentTaskId);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean shouldCancelChildrenOnCancellation() {
|
||||
return true;
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -175,7 +175,6 @@ public abstract class TransportBroadcastAction<Request extends BroadcastRequest<
|
|||
// no node connected, act as failure
|
||||
onOperation(shard, shardIt, shardIndex, new NoShardAvailableActionException(shardIt.shardId()));
|
||||
} else {
|
||||
taskManager.registerChildTask(task, node.getId());
|
||||
transportService.sendRequest(node, transportShardAction, shardRequest, new TransportResponseHandler<ShardResponse>() {
|
||||
@Override
|
||||
public ShardResponse newInstance() {
|
||||
|
|
|
@ -318,7 +318,6 @@ public abstract class TransportBroadcastByNodeAction<Request extends BroadcastRe
|
|||
NodeRequest nodeRequest = new NodeRequest(node.getId(), request, shards);
|
||||
if (task != null) {
|
||||
nodeRequest.setParentTask(clusterService.localNode().getId(), task.getId());
|
||||
taskManager.registerChildTask(task, node.getId());
|
||||
}
|
||||
transportService.sendRequest(node, transportNodeBroadcastAction, nodeRequest, new TransportResponseHandler<NodeResponse>() {
|
||||
@Override
|
||||
|
|
|
@ -160,7 +160,6 @@ public abstract class TransportMasterNodeAction<Request extends MasterNodeReques
|
|||
}
|
||||
}
|
||||
};
|
||||
taskManager.registerChildTask(task, nodes.getLocalNodeId());
|
||||
threadPool.executor(executor).execute(new ActionRunnable(delegate) {
|
||||
@Override
|
||||
protected void doRun() throws Exception {
|
||||
|
@ -173,7 +172,6 @@ public abstract class TransportMasterNodeAction<Request extends MasterNodeReques
|
|||
logger.debug("no known master node, scheduling a retry");
|
||||
retry(null, masterChangePredicate);
|
||||
} else {
|
||||
taskManager.registerChildTask(task, nodes.getMasterNode().getId());
|
||||
transportService.sendRequest(nodes.getMasterNode(), actionName, request, new ActionListenerResponseHandler<Response>(listener, TransportMasterNodeAction.this::newResponse) {
|
||||
@Override
|
||||
public void handleException(final TransportException exp) {
|
||||
|
|
|
@ -199,7 +199,6 @@ public abstract class TransportNodesAction<NodesRequest extends BaseNodesRequest
|
|||
TransportRequest nodeRequest = newNodeRequest(nodeId, request);
|
||||
if (task != null) {
|
||||
nodeRequest.setParentTask(clusterService.localNode().getId(), task.getId());
|
||||
taskManager.registerChildTask(task, node.getId());
|
||||
}
|
||||
|
||||
transportService.sendRequest(node, transportNodeAction, nodeRequest, builder.build(),
|
||||
|
|
|
@ -119,7 +119,6 @@ public abstract class TransportBroadcastReplicationAction<Request extends Broadc
|
|||
protected void shardExecute(Task task, Request request, ShardId shardId, ActionListener<ShardResponse> shardActionListener) {
|
||||
ShardRequest shardRequest = newShardRequest(request, shardId);
|
||||
shardRequest.setParentTask(clusterService.localNode().getId(), task.getId());
|
||||
taskManager.registerChildTask(task, clusterService.localNode().getId());
|
||||
replicatedBroadcastShardAction.execute(shardRequest, shardActionListener);
|
||||
}
|
||||
|
||||
|
|
|
@ -664,7 +664,6 @@ public abstract class TransportReplicationAction<
|
|||
return;
|
||||
}
|
||||
final DiscoveryNode node = state.nodes().get(primary.currentNodeId());
|
||||
taskManager.registerChildTask(task, node.getId());
|
||||
if (primary.currentNodeId().equals(state.nodes().getLocalNodeId())) {
|
||||
performLocalAction(state, primary, node);
|
||||
} else {
|
||||
|
|
|
@ -278,7 +278,6 @@ public abstract class TransportTasksAction<
|
|||
} else {
|
||||
NodeTaskRequest nodeRequest = new NodeTaskRequest(request);
|
||||
nodeRequest.setParentTask(clusterService.localNode().getId(), task.getId());
|
||||
taskManager.registerChildTask(task, node.getId());
|
||||
transportService.sendRequest(node, transportNodeAction, nodeRequest, builder.build(),
|
||||
new TransportResponseHandler<NodeTasksResponse>() {
|
||||
@Override
|
||||
|
|
|
@ -26,7 +26,7 @@ import java.util.concurrent.atomic.AtomicReference;
|
|||
/**
|
||||
* A task that can be canceled
|
||||
*/
|
||||
public class CancellableTask extends Task {
|
||||
public abstract class CancellableTask extends Task {
|
||||
|
||||
private final AtomicReference<String> reason = new AtomicReference<>();
|
||||
|
||||
|
@ -51,6 +51,11 @@ public class CancellableTask extends Task {
|
|||
return true;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns true if this task should can potentially have children that needs to be cancelled when the parent is cancelled.
|
||||
*/
|
||||
public abstract boolean shouldCancelChildrenOnCancellation();
|
||||
|
||||
public boolean isCancelled() {
|
||||
return reason.get() != null;
|
||||
}
|
||||
|
|
|
@ -125,15 +125,18 @@ public class TaskManager extends AbstractComponent implements ClusterStateApplie
|
|||
/**
|
||||
* Cancels a task
|
||||
* <p>
|
||||
* Returns a set of nodes with child tasks where this task should be cancelled if cancellation was successful, null otherwise.
|
||||
* Returns true if cancellation was started successful, null otherwise.
|
||||
*
|
||||
* After starting cancellation on the parent task, the task manager tries to cancel all children tasks
|
||||
* of the current task. Once cancellation of the children tasks is done, the listener is triggered.
|
||||
*/
|
||||
public Set<String> cancel(CancellableTask task, String reason, Consumer<Set<String>> listener) {
|
||||
public boolean cancel(CancellableTask task, String reason, Runnable listener) {
|
||||
CancellableTaskHolder holder = cancellableTasks.get(task.getId());
|
||||
if (holder != null) {
|
||||
logger.trace("cancelling task with id {}", task.getId());
|
||||
return holder.cancel(reason, listener);
|
||||
}
|
||||
return null;
|
||||
return false;
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -344,17 +347,6 @@ public class TaskManager extends AbstractComponent implements ClusterStateApplie
|
|||
}
|
||||
}
|
||||
|
||||
public void registerChildTask(Task task, String node) {
|
||||
if (task == null || task instanceof CancellableTask == false) {
|
||||
// We don't have a cancellable task - not much we can do here
|
||||
return;
|
||||
}
|
||||
CancellableTaskHolder holder = cancellableTasks.get(task.getId());
|
||||
if (holder != null) {
|
||||
holder.registerChildTaskNode(node);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Blocks the calling thread, waiting for the task to vanish from the TaskManager.
|
||||
*/
|
||||
|
@ -378,11 +370,9 @@ public class TaskManager extends AbstractComponent implements ClusterStateApplie
|
|||
|
||||
private final CancellableTask task;
|
||||
|
||||
private final Set<String> nodesWithChildTasks = new HashSet<>();
|
||||
|
||||
private volatile String cancellationReason = null;
|
||||
|
||||
private volatile Consumer<Set<String>> cancellationListener = null;
|
||||
private volatile Runnable cancellationListener = null;
|
||||
|
||||
public CancellableTaskHolder(CancellableTask task) {
|
||||
this.task = task;
|
||||
|
@ -391,33 +381,33 @@ public class TaskManager extends AbstractComponent implements ClusterStateApplie
|
|||
/**
|
||||
* Marks task as cancelled.
|
||||
* <p>
|
||||
* Returns a set of nodes with child tasks where this task should be cancelled if cancellation was successful, null otherwise.
|
||||
* Returns true if cancellation was successful, false otherwise.
|
||||
*/
|
||||
public Set<String> cancel(String reason, Consumer<Set<String>> listener) {
|
||||
Set<String> nodes;
|
||||
public boolean cancel(String reason, Runnable listener) {
|
||||
final boolean cancelled;
|
||||
synchronized (this) {
|
||||
assert reason != null;
|
||||
if (cancellationReason == null) {
|
||||
cancellationReason = reason;
|
||||
cancellationListener = listener;
|
||||
nodes = Collections.unmodifiableSet(nodesWithChildTasks);
|
||||
cancelled = true;
|
||||
} else {
|
||||
// Already cancelled by somebody else
|
||||
nodes = null;
|
||||
cancelled = false;
|
||||
}
|
||||
}
|
||||
if (nodes != null) {
|
||||
if (cancelled) {
|
||||
task.cancel(reason);
|
||||
}
|
||||
return nodes;
|
||||
return cancelled;
|
||||
}
|
||||
|
||||
/**
|
||||
* Marks task as cancelled.
|
||||
* <p>
|
||||
* Returns a set of nodes with child tasks where this task should be cancelled if cancellation was successful, null otherwise.
|
||||
* Returns true if cancellation was successful, false otherwise.
|
||||
*/
|
||||
public Set<String> cancel(String reason) {
|
||||
public boolean cancel(String reason) {
|
||||
return cancel(reason, null);
|
||||
}
|
||||
|
||||
|
@ -425,14 +415,12 @@ public class TaskManager extends AbstractComponent implements ClusterStateApplie
|
|||
* Marks task as finished.
|
||||
*/
|
||||
public void finish() {
|
||||
Consumer<Set<String>> listener = null;
|
||||
Set<String> nodes = null;
|
||||
Runnable listener = null;
|
||||
synchronized (this) {
|
||||
if (cancellationReason != null) {
|
||||
// The task was cancelled, we need to notify the listener
|
||||
if (cancellationListener != null) {
|
||||
listener = cancellationListener;
|
||||
nodes = Collections.unmodifiableSet(nodesWithChildTasks);
|
||||
cancellationListener = null;
|
||||
}
|
||||
} else {
|
||||
|
@ -442,7 +430,7 @@ public class TaskManager extends AbstractComponent implements ClusterStateApplie
|
|||
// We need to call the listener outside of the synchronised section to avoid potential bottle necks
|
||||
// in the listener synchronization
|
||||
if (listener != null) {
|
||||
listener.accept(nodes);
|
||||
listener.run();
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -454,14 +442,6 @@ public class TaskManager extends AbstractComponent implements ClusterStateApplie
|
|||
public CancellableTask getTask() {
|
||||
return task;
|
||||
}
|
||||
|
||||
public synchronized void registerChildTaskNode(String nodeId) {
|
||||
if (cancellationReason == null) {
|
||||
nodesWithChildTasks.add(nodeId);
|
||||
} else {
|
||||
throw new TaskCancelledException("cannot register child task request, the task is already cancelled");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -498,7 +498,6 @@ public class TransportService extends AbstractLifecycleComponent {
|
|||
final TransportResponseHandler<T> handler) {
|
||||
request.setParentTask(localNode.getId(), parentTask.getId());
|
||||
try {
|
||||
taskManager.registerChildTask(parentTask, node.getId());
|
||||
final Transport.Connection connection = getConnection(node);
|
||||
sendRequest(connection, action, request, options, handler);
|
||||
} catch (TaskCancelledException ex) {
|
||||
|
|
|
@ -91,7 +91,12 @@ public class CancellableTasksTests extends TaskManagerTestCase {
|
|||
|
||||
@Override
|
||||
public Task createTask(long id, String type, String action, TaskId parentTaskId) {
|
||||
return new CancellableTask(id, type, action, getDescription(), parentTaskId);
|
||||
return new CancellableTask(id, type, action, getDescription(), parentTaskId) {
|
||||
@Override
|
||||
public boolean shouldCancelChildrenOnCancellation() {
|
||||
return false;
|
||||
}
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -126,7 +131,12 @@ public class CancellableTasksTests extends TaskManagerTestCase {
|
|||
|
||||
@Override
|
||||
public Task createTask(long id, String type, String action, TaskId parentTaskId) {
|
||||
return new CancellableTask(id, type, action, getDescription(), parentTaskId);
|
||||
return new CancellableTask(id, type, action, getDescription(), parentTaskId) {
|
||||
@Override
|
||||
public boolean shouldCancelChildrenOnCancellation() {
|
||||
return true;
|
||||
}
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -83,6 +83,11 @@ public class TestTaskPlugin extends Plugin implements ActionPlugin {
|
|||
super(id, type, action, description, parentTaskId);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean shouldCancelChildrenOnCancellation() {
|
||||
return false;
|
||||
}
|
||||
|
||||
public boolean isBlocked() {
|
||||
return blocked;
|
||||
}
|
||||
|
@ -242,7 +247,12 @@ public class TestTaskPlugin extends Plugin implements ActionPlugin {
|
|||
|
||||
@Override
|
||||
public Task createTask(long id, String type, String action, TaskId parentTaskId) {
|
||||
return new CancellableTask(id, type, action, getDescription(), parentTaskId);
|
||||
return new CancellableTask(id, type, action, getDescription(), parentTaskId) {
|
||||
@Override
|
||||
public boolean shouldCancelChildrenOnCancellation() {
|
||||
return true;
|
||||
}
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -71,6 +71,11 @@ public abstract class BulkByScrollTask extends CancellableTask {
|
|||
*/
|
||||
public abstract TaskInfo getInfoGivenSliceInfo(String localNodeId, List<TaskInfo> sliceInfo);
|
||||
|
||||
@Override
|
||||
public boolean shouldCancelChildrenOnCancellation() {
|
||||
return true;
|
||||
}
|
||||
|
||||
public static class Status implements Task.Status, SuccessfullyProcessed {
|
||||
public static final String NAME = "bulk-by-scroll";
|
||||
|
||||
|
|
|
@ -47,9 +47,6 @@ public class ReindexParallelizationHelper {
|
|||
r -> task.onSliceResponse(listener, slice.source().slice().getId(), r),
|
||||
e -> task.onSliceFailure(listener, slice.source().slice().getId(), e));
|
||||
client.execute(action, requestForSlice, sliceListener);
|
||||
/* Explicitly tell the task manager that we're running child tasks on the local node so it will cancel them when the parent is
|
||||
* cancelled. */
|
||||
taskManager.registerChildTask(task, localNodeId);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -604,7 +604,7 @@ public class AsyncBulkByScrollActionTests extends ESTestCase {
|
|||
* that good stuff.
|
||||
*/
|
||||
if (delay.nanos() > 0) {
|
||||
generic().execute(() -> taskManager.cancel(testTask, reason, (Set<String> s) -> {}));
|
||||
generic().execute(() -> taskManager.cancel(testTask, reason, () -> {}));
|
||||
}
|
||||
return super.schedule(delay, name, command);
|
||||
}
|
||||
|
@ -637,7 +637,7 @@ public class AsyncBulkByScrollActionTests extends ESTestCase {
|
|||
action.setScroll(scrollId());
|
||||
}
|
||||
String reason = randomSimpleString(random());
|
||||
taskManager.cancel(testTask, reason, (Set<String> s) -> {});
|
||||
taskManager.cancel(testTask, reason, () -> {});
|
||||
testMe.accept(action);
|
||||
assertEquals(reason, listener.get().getReasonCancelled());
|
||||
if (previousScrollSet) {
|
||||
|
|
Loading…
Reference in New Issue