mirror of
https://github.com/honeymoose/OpenSearch.git
synced 2025-02-26 06:46:10 +00:00
Cancel task and descendants on channel disconnects (#56620)
If a channel gets disconnected, then we should cancel the tasks associated with that channel as their results won't be retrieved. Closes #56327 Relates #56619 Backport of #56620
This commit is contained in:
parent
7c8860b7e6
commit
566b23c42c
@ -52,6 +52,7 @@ import org.elasticsearch.tasks.TaskId;
|
||||
import org.elasticsearch.tasks.TaskInfo;
|
||||
import org.elasticsearch.tasks.TaskManager;
|
||||
import org.elasticsearch.test.ESIntegTestCase;
|
||||
import org.elasticsearch.test.InternalTestCluster;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.transport.TransportException;
|
||||
import org.elasticsearch.transport.TransportResponseHandler;
|
||||
@ -279,6 +280,32 @@ public class CancellableTasksIT extends ESIntegTestCase {
|
||||
ensureAllBansRemoved();
|
||||
}
|
||||
|
||||
public void testCancelOrphanedTasks() throws Exception {
|
||||
final String nodeWithRootTask = internalCluster().startDataOnlyNode();
|
||||
Set<DiscoveryNode> nodes = StreamSupport.stream(clusterService().state().nodes().spliterator(), false).collect(Collectors.toSet());
|
||||
TestRequest rootRequest = generateTestRequest(nodes, 0, between(1, 3));
|
||||
client(nodeWithRootTask).execute(TransportTestAction.ACTION, rootRequest);
|
||||
allowPartialRequest(rootRequest);
|
||||
try {
|
||||
internalCluster().stopRandomNode(InternalTestCluster.nameFilter(nodeWithRootTask));
|
||||
assertBusy(() -> {
|
||||
for (TransportService transportService : internalCluster().getInstances(TransportService.class)) {
|
||||
for (CancellableTask task : transportService.getTaskManager().getCancellableTasks().values()) {
|
||||
if (task.getAction().equals(TransportTestAction.ACTION.name())) {
|
||||
final TaskInfo taskInfo = task.taskInfo(transportService.getLocalNode().getId(), false);
|
||||
assertTrue(taskInfo.toString(), task.isCancelled());
|
||||
assertNotNull(taskInfo.toString(), task.getReasonCancelled());
|
||||
assertThat(taskInfo.toString(), task.getReasonCancelled(), equalTo("channel was closed"));
|
||||
}
|
||||
}
|
||||
}
|
||||
}, 30, TimeUnit.SECONDS);
|
||||
} finally {
|
||||
allowEntireRequest(rootRequest);
|
||||
ensureAllBansRemoved();
|
||||
}
|
||||
}
|
||||
|
||||
static TaskId getRootTaskId(TestRequest request) throws Exception {
|
||||
SetOnce<TaskId> taskId = new SetOnce<>();
|
||||
assertBusy(() -> {
|
||||
|
@ -19,38 +19,20 @@
|
||||
|
||||
package org.elasticsearch.action.admin.cluster.node.tasks.cancel;
|
||||
|
||||
import org.elasticsearch.ElasticsearchSecurityException;
|
||||
import org.elasticsearch.ExceptionsHelper;
|
||||
import org.elasticsearch.ResourceNotFoundException;
|
||||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.action.FailedNodeException;
|
||||
import org.elasticsearch.action.StepListener;
|
||||
import org.elasticsearch.action.TaskOperationFailure;
|
||||
import org.elasticsearch.action.support.ActionFilters;
|
||||
import org.elasticsearch.action.support.ChannelActionListener;
|
||||
import org.elasticsearch.action.support.GroupedActionListener;
|
||||
import org.elasticsearch.action.support.tasks.TransportTasksAction;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||
import org.elasticsearch.cluster.service.ClusterService;
|
||||
import org.elasticsearch.common.inject.Inject;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||
import org.elasticsearch.tasks.CancellableTask;
|
||||
import org.elasticsearch.tasks.Task;
|
||||
import org.elasticsearch.tasks.TaskId;
|
||||
import org.elasticsearch.tasks.TaskInfo;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.transport.EmptyTransportResponseHandler;
|
||||
import org.elasticsearch.transport.TransportChannel;
|
||||
import org.elasticsearch.transport.TransportException;
|
||||
import org.elasticsearch.transport.TransportRequest;
|
||||
import org.elasticsearch.transport.TransportRequestHandler;
|
||||
import org.elasticsearch.transport.TransportResponse;
|
||||
import org.elasticsearch.transport.TransportService;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Collection;
|
||||
import java.util.List;
|
||||
import java.util.function.Consumer;
|
||||
|
||||
@ -62,14 +44,10 @@ import java.util.function.Consumer;
|
||||
*/
|
||||
public class TransportCancelTasksAction extends TransportTasksAction<CancellableTask, CancelTasksRequest, CancelTasksResponse, TaskInfo> {
|
||||
|
||||
public static final String BAN_PARENT_ACTION_NAME = "internal:admin/tasks/ban";
|
||||
|
||||
@Inject
|
||||
public TransportCancelTasksAction(ClusterService clusterService, TransportService transportService, ActionFilters actionFilters) {
|
||||
super(CancelTasksAction.NAME, clusterService, transportService, actionFilters,
|
||||
CancelTasksRequest::new, CancelTasksResponse::new, TaskInfo::new, ThreadPool.Names.MANAGEMENT);
|
||||
transportService.registerRequestHandler(BAN_PARENT_ACTION_NAME, ThreadPool.Names.SAME, BanParentTaskRequest::new,
|
||||
new BanParentRequestHandler());
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -108,172 +86,8 @@ public class TransportCancelTasksAction extends TransportTasksAction<Cancellable
|
||||
@Override
|
||||
protected void taskOperation(CancelTasksRequest request, CancellableTask cancellableTask, ActionListener<TaskInfo> listener) {
|
||||
String nodeId = clusterService.localNode().getId();
|
||||
cancelTaskAndDescendants(cancellableTask, request.getReason(), request.waitForCompletion(),
|
||||
taskManager.cancelTaskAndDescendants(cancellableTask, request.getReason(), request.waitForCompletion(),
|
||||
ActionListener.map(listener, r -> cancellableTask.taskInfo(nodeId, false)));
|
||||
}
|
||||
|
||||
void cancelTaskAndDescendants(CancellableTask task, String reason, boolean waitForCompletion, ActionListener<Void> listener) {
|
||||
final TaskId taskId = task.taskInfo(clusterService.localNode().getId(), false).getTaskId();
|
||||
if (task.shouldCancelChildrenOnCancellation()) {
|
||||
logger.trace("cancelling task [{}] and its descendants", taskId);
|
||||
StepListener<Void> completedListener = new StepListener<>();
|
||||
GroupedActionListener<Void> groupedListener = new GroupedActionListener<>(ActionListener.map(completedListener, r -> null), 3);
|
||||
Collection<DiscoveryNode> childrenNodes = taskManager.startBanOnChildrenNodes(task.getId(), () -> {
|
||||
logger.trace("child tasks of parent [{}] are completed", taskId);
|
||||
groupedListener.onResponse(null);
|
||||
});
|
||||
taskManager.cancel(task, reason, () -> {
|
||||
logger.trace("task [{}] is cancelled", taskId);
|
||||
groupedListener.onResponse(null);
|
||||
});
|
||||
StepListener<Void> banOnNodesListener = new StepListener<>();
|
||||
setBanOnNodes(reason, waitForCompletion, task, childrenNodes, banOnNodesListener);
|
||||
banOnNodesListener.whenComplete(groupedListener::onResponse, groupedListener::onFailure);
|
||||
// If we start unbanning when the last child task completed and that child task executed with a specific user, then unban
|
||||
// requests are denied because internal requests can't run with a user. We need to remove bans with the current thread context.
|
||||
final Runnable removeBansRunnable = transportService.getThreadPool().getThreadContext()
|
||||
.preserveContext(() -> removeBanOnNodes(task, childrenNodes));
|
||||
// We remove bans after all child tasks are completed although in theory we can do it on a per-node basis.
|
||||
completedListener.whenComplete(r -> removeBansRunnable.run(), e -> removeBansRunnable.run());
|
||||
// if wait_for_completion is true, then only return when (1) bans are placed on child nodes, (2) child tasks are
|
||||
// completed or failed, (3) the main task is cancelled. Otherwise, return after bans are placed on child nodes.
|
||||
if (waitForCompletion) {
|
||||
completedListener.whenComplete(r -> listener.onResponse(null), listener::onFailure);
|
||||
} else {
|
||||
banOnNodesListener.whenComplete(r -> listener.onResponse(null), listener::onFailure);
|
||||
}
|
||||
} else {
|
||||
logger.trace("task [{}] doesn't have any children that should be cancelled", taskId);
|
||||
if (waitForCompletion) {
|
||||
taskManager.cancel(task, reason, () -> listener.onResponse(null));
|
||||
} else {
|
||||
taskManager.cancel(task, reason, () -> {});
|
||||
listener.onResponse(null);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void setBanOnNodes(String reason, boolean waitForCompletion, CancellableTask task,
|
||||
Collection<DiscoveryNode> childNodes, ActionListener<Void> listener) {
|
||||
if (childNodes.isEmpty()) {
|
||||
listener.onResponse(null);
|
||||
return;
|
||||
}
|
||||
final TaskId taskId = new TaskId(clusterService.localNode().getId(), task.getId());
|
||||
logger.trace("cancelling child tasks of [{}] on child nodes {}", taskId, childNodes);
|
||||
GroupedActionListener<Void> groupedListener =
|
||||
new GroupedActionListener<>(ActionListener.map(listener, r -> null), childNodes.size());
|
||||
final BanParentTaskRequest banRequest = BanParentTaskRequest.createSetBanParentTaskRequest(taskId, reason, waitForCompletion);
|
||||
for (DiscoveryNode node : childNodes) {
|
||||
transportService.sendRequest(node, BAN_PARENT_ACTION_NAME, banRequest,
|
||||
new EmptyTransportResponseHandler(ThreadPool.Names.SAME) {
|
||||
@Override
|
||||
public void handleResponse(TransportResponse.Empty response) {
|
||||
logger.trace("sent ban for tasks with the parent [{}] to the node [{}]", taskId, node);
|
||||
groupedListener.onResponse(null);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void handleException(TransportException exp) {
|
||||
assert ExceptionsHelper.unwrapCause(exp) instanceof ElasticsearchSecurityException == false;
|
||||
logger.warn("Cannot send ban for tasks with the parent [{}] to the node [{}]", taskId, node);
|
||||
groupedListener.onFailure(exp);
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
private void removeBanOnNodes(CancellableTask task, Collection<DiscoveryNode> childNodes) {
|
||||
final BanParentTaskRequest request =
|
||||
BanParentTaskRequest.createRemoveBanParentTaskRequest(new TaskId(clusterService.localNode().getId(), task.getId()));
|
||||
for (DiscoveryNode node : childNodes) {
|
||||
logger.trace("Sending remove ban for tasks with the parent [{}] to the node [{}]", request.parentTaskId, node);
|
||||
transportService.sendRequest(node, BAN_PARENT_ACTION_NAME, request, new EmptyTransportResponseHandler(ThreadPool.Names.SAME) {
|
||||
@Override
|
||||
public void handleException(TransportException exp) {
|
||||
assert ExceptionsHelper.unwrapCause(exp) instanceof ElasticsearchSecurityException == false;
|
||||
logger.info("failed to remove the parent ban for task {} on node {}", request.parentTaskId, node);
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
private static class BanParentTaskRequest extends TransportRequest {
|
||||
|
||||
private final TaskId parentTaskId;
|
||||
private final boolean ban;
|
||||
private final boolean waitForCompletion;
|
||||
private final String reason;
|
||||
|
||||
static BanParentTaskRequest createSetBanParentTaskRequest(TaskId parentTaskId, String reason, boolean waitForCompletion) {
|
||||
return new BanParentTaskRequest(parentTaskId, reason, waitForCompletion);
|
||||
}
|
||||
|
||||
static BanParentTaskRequest createRemoveBanParentTaskRequest(TaskId parentTaskId) {
|
||||
return new BanParentTaskRequest(parentTaskId);
|
||||
}
|
||||
|
||||
private BanParentTaskRequest(TaskId parentTaskId, String reason, boolean waitForCompletion) {
|
||||
this.parentTaskId = parentTaskId;
|
||||
this.ban = true;
|
||||
this.reason = reason;
|
||||
this.waitForCompletion = waitForCompletion;
|
||||
}
|
||||
|
||||
private BanParentTaskRequest(TaskId parentTaskId) {
|
||||
this.parentTaskId = parentTaskId;
|
||||
this.ban = false;
|
||||
this.reason = null;
|
||||
this.waitForCompletion = false;
|
||||
}
|
||||
|
||||
private BanParentTaskRequest(StreamInput in) throws IOException {
|
||||
super(in);
|
||||
parentTaskId = TaskId.readFromStream(in);
|
||||
ban = in.readBoolean();
|
||||
reason = ban ? in.readString() : null;
|
||||
if (in.getVersion().onOrAfter(Version.V_7_8_0)) {
|
||||
waitForCompletion = in.readBoolean();
|
||||
} else {
|
||||
waitForCompletion = false;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void writeTo(StreamOutput out) throws IOException {
|
||||
super.writeTo(out);
|
||||
parentTaskId.writeTo(out);
|
||||
out.writeBoolean(ban);
|
||||
if (ban) {
|
||||
out.writeString(reason);
|
||||
}
|
||||
if (out.getVersion().onOrAfter(Version.V_7_8_0)) {
|
||||
out.writeBoolean(waitForCompletion);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
class BanParentRequestHandler implements TransportRequestHandler<BanParentTaskRequest> {
|
||||
@Override
|
||||
public void messageReceived(final BanParentTaskRequest request, final TransportChannel channel, Task task) throws Exception {
|
||||
if (request.ban) {
|
||||
logger.debug("Received ban for the parent [{}] on the node [{}], reason: [{}]", request.parentTaskId,
|
||||
clusterService.localNode().getId(), request.reason);
|
||||
final List<CancellableTask> childTasks = taskManager.setBan(request.parentTaskId, request.reason);
|
||||
final GroupedActionListener<Void> listener = new GroupedActionListener<>(ActionListener.map(
|
||||
new ChannelActionListener<>(channel, BAN_PARENT_ACTION_NAME, request), r -> TransportResponse.Empty.INSTANCE),
|
||||
childTasks.size() + 1);
|
||||
for (CancellableTask childTask : childTasks) {
|
||||
cancelTaskAndDescendants(childTask, request.reason, request.waitForCompletion, listener);
|
||||
}
|
||||
listener.onResponse(null);
|
||||
} else {
|
||||
logger.debug("Removing ban for the parent [{}] on the node [{}]", request.parentTaskId,
|
||||
clusterService.localNode().getId());
|
||||
taskManager.removeBan(request.parentTaskId);
|
||||
channel.sendResponse(TransportResponse.Empty.INSTANCE);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
|
@ -157,6 +157,7 @@ import org.elasticsearch.snapshots.RestoreService;
|
||||
import org.elasticsearch.snapshots.SnapshotShardsService;
|
||||
import org.elasticsearch.snapshots.SnapshotsService;
|
||||
import org.elasticsearch.tasks.Task;
|
||||
import org.elasticsearch.tasks.TaskCancellationService;
|
||||
import org.elasticsearch.tasks.TaskResultsService;
|
||||
import org.elasticsearch.threadpool.ExecutorBuilder;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
@ -735,6 +736,7 @@ public class Node implements Closeable {
|
||||
// Start the transport service now so the publish address will be added to the local disco node in ClusterService
|
||||
TransportService transportService = injector.getInstance(TransportService.class);
|
||||
transportService.getTaskManager().setTaskResultsService(injector.getInstance(TaskResultsService.class));
|
||||
transportService.getTaskManager().setTaskCancellationService(new TaskCancellationService(transportService));
|
||||
transportService.start();
|
||||
assert localNodeFactory.getNode() != null;
|
||||
assert transportService.getLocalNode().equals(localNodeFactory.getNode())
|
||||
|
@ -0,0 +1,226 @@
|
||||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.tasks;
|
||||
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
import org.elasticsearch.ElasticsearchSecurityException;
|
||||
import org.elasticsearch.ExceptionsHelper;
|
||||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.action.StepListener;
|
||||
import org.elasticsearch.action.support.ChannelActionListener;
|
||||
import org.elasticsearch.action.support.GroupedActionListener;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.transport.EmptyTransportResponseHandler;
|
||||
import org.elasticsearch.transport.TransportChannel;
|
||||
import org.elasticsearch.transport.TransportException;
|
||||
import org.elasticsearch.transport.TransportRequest;
|
||||
import org.elasticsearch.transport.TransportRequestHandler;
|
||||
import org.elasticsearch.transport.TransportResponse;
|
||||
import org.elasticsearch.transport.TransportService;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Collection;
|
||||
import java.util.List;
|
||||
|
||||
public class TaskCancellationService {
|
||||
public static final String BAN_PARENT_ACTION_NAME = "internal:admin/tasks/ban";
|
||||
private static final Logger logger = LogManager.getLogger(TaskCancellationService.class);
|
||||
private final TransportService transportService;
|
||||
private final TaskManager taskManager;
|
||||
|
||||
public TaskCancellationService(TransportService transportService) {
|
||||
this.transportService = transportService;
|
||||
this.taskManager = transportService.getTaskManager();
|
||||
transportService.registerRequestHandler(BAN_PARENT_ACTION_NAME, ThreadPool.Names.SAME, BanParentTaskRequest::new,
|
||||
new BanParentRequestHandler());
|
||||
}
|
||||
|
||||
private String localNodeId() {
|
||||
return transportService.getLocalNode().getId();
|
||||
}
|
||||
|
||||
void cancelTaskAndDescendants(CancellableTask task, String reason, boolean waitForCompletion, ActionListener<Void> listener) {
|
||||
final TaskId taskId = task.taskInfo(localNodeId(), false).getTaskId();
|
||||
if (task.shouldCancelChildrenOnCancellation()) {
|
||||
logger.trace("cancelling task [{}] and its descendants", taskId);
|
||||
StepListener<Void> completedListener = new StepListener<>();
|
||||
GroupedActionListener<Void> groupedListener = new GroupedActionListener<>(ActionListener.map(completedListener, r -> null), 3);
|
||||
Collection<DiscoveryNode> childrenNodes = taskManager.startBanOnChildrenNodes(task.getId(), () -> {
|
||||
logger.trace("child tasks of parent [{}] are completed", taskId);
|
||||
groupedListener.onResponse(null);
|
||||
});
|
||||
taskManager.cancel(task, reason, () -> {
|
||||
logger.trace("task [{}] is cancelled", taskId);
|
||||
groupedListener.onResponse(null);
|
||||
});
|
||||
StepListener<Void> banOnNodesListener = new StepListener<>();
|
||||
setBanOnNodes(reason, waitForCompletion, task, childrenNodes, banOnNodesListener);
|
||||
banOnNodesListener.whenComplete(groupedListener::onResponse, groupedListener::onFailure);
|
||||
// If we start unbanning when the last child task completed and that child task executed with a specific user, then unban
|
||||
// requests are denied because internal requests can't run with a user. We need to remove bans with the current thread context.
|
||||
final Runnable removeBansRunnable = transportService.getThreadPool().getThreadContext()
|
||||
.preserveContext(() -> removeBanOnNodes(task, childrenNodes));
|
||||
// We remove bans after all child tasks are completed although in theory we can do it on a per-node basis.
|
||||
completedListener.whenComplete(r -> removeBansRunnable.run(), e -> removeBansRunnable.run());
|
||||
// if wait_for_completion is true, then only return when (1) bans are placed on child nodes, (2) child tasks are
|
||||
// completed or failed, (3) the main task is cancelled. Otherwise, return after bans are placed on child nodes.
|
||||
if (waitForCompletion) {
|
||||
completedListener.whenComplete(r -> listener.onResponse(null), listener::onFailure);
|
||||
} else {
|
||||
banOnNodesListener.whenComplete(r -> listener.onResponse(null), listener::onFailure);
|
||||
}
|
||||
} else {
|
||||
logger.trace("task [{}] doesn't have any children that should be cancelled", taskId);
|
||||
if (waitForCompletion) {
|
||||
taskManager.cancel(task, reason, () -> listener.onResponse(null));
|
||||
} else {
|
||||
taskManager.cancel(task, reason, () -> {});
|
||||
listener.onResponse(null);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void setBanOnNodes(String reason, boolean waitForCompletion, CancellableTask task,
|
||||
Collection<DiscoveryNode> childNodes, ActionListener<Void> listener) {
|
||||
if (childNodes.isEmpty()) {
|
||||
listener.onResponse(null);
|
||||
return;
|
||||
}
|
||||
final TaskId taskId = new TaskId(localNodeId(), task.getId());
|
||||
logger.trace("cancelling child tasks of [{}] on child nodes {}", taskId, childNodes);
|
||||
GroupedActionListener<Void> groupedListener =
|
||||
new GroupedActionListener<>(ActionListener.map(listener, r -> null), childNodes.size());
|
||||
final BanParentTaskRequest banRequest = BanParentTaskRequest.createSetBanParentTaskRequest(taskId, reason, waitForCompletion);
|
||||
for (DiscoveryNode node : childNodes) {
|
||||
transportService.sendRequest(node, BAN_PARENT_ACTION_NAME, banRequest,
|
||||
new EmptyTransportResponseHandler(ThreadPool.Names.SAME) {
|
||||
@Override
|
||||
public void handleResponse(TransportResponse.Empty response) {
|
||||
logger.trace("sent ban for tasks with the parent [{}] to the node [{}]", taskId, node);
|
||||
groupedListener.onResponse(null);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void handleException(TransportException exp) {
|
||||
assert ExceptionsHelper.unwrapCause(exp) instanceof ElasticsearchSecurityException == false;
|
||||
logger.warn("Cannot send ban for tasks with the parent [{}] to the node [{}]", taskId, node);
|
||||
groupedListener.onFailure(exp);
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
private void removeBanOnNodes(CancellableTask task, Collection<DiscoveryNode> childNodes) {
|
||||
final BanParentTaskRequest request =
|
||||
BanParentTaskRequest.createRemoveBanParentTaskRequest(new TaskId(localNodeId(), task.getId()));
|
||||
for (DiscoveryNode node : childNodes) {
|
||||
logger.trace("Sending remove ban for tasks with the parent [{}] to the node [{}]", request.parentTaskId, node);
|
||||
transportService.sendRequest(node, BAN_PARENT_ACTION_NAME, request, new EmptyTransportResponseHandler(ThreadPool.Names.SAME) {
|
||||
@Override
|
||||
public void handleException(TransportException exp) {
|
||||
assert ExceptionsHelper.unwrapCause(exp) instanceof ElasticsearchSecurityException == false;
|
||||
logger.info("failed to remove the parent ban for task {} on node {}", request.parentTaskId, node);
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
private static class BanParentTaskRequest extends TransportRequest {
|
||||
|
||||
private final TaskId parentTaskId;
|
||||
private final boolean ban;
|
||||
private final boolean waitForCompletion;
|
||||
private final String reason;
|
||||
|
||||
static BanParentTaskRequest createSetBanParentTaskRequest(TaskId parentTaskId, String reason, boolean waitForCompletion) {
|
||||
return new BanParentTaskRequest(parentTaskId, reason, waitForCompletion);
|
||||
}
|
||||
|
||||
static BanParentTaskRequest createRemoveBanParentTaskRequest(TaskId parentTaskId) {
|
||||
return new BanParentTaskRequest(parentTaskId);
|
||||
}
|
||||
|
||||
private BanParentTaskRequest(TaskId parentTaskId, String reason, boolean waitForCompletion) {
|
||||
this.parentTaskId = parentTaskId;
|
||||
this.ban = true;
|
||||
this.reason = reason;
|
||||
this.waitForCompletion = waitForCompletion;
|
||||
}
|
||||
|
||||
private BanParentTaskRequest(TaskId parentTaskId) {
|
||||
this.parentTaskId = parentTaskId;
|
||||
this.ban = false;
|
||||
this.reason = null;
|
||||
this.waitForCompletion = false;
|
||||
}
|
||||
|
||||
private BanParentTaskRequest(StreamInput in) throws IOException {
|
||||
super(in);
|
||||
parentTaskId = TaskId.readFromStream(in);
|
||||
ban = in.readBoolean();
|
||||
reason = ban ? in.readString() : null;
|
||||
if (in.getVersion().onOrAfter(Version.V_7_8_0)) {
|
||||
waitForCompletion = in.readBoolean();
|
||||
} else {
|
||||
waitForCompletion = false;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void writeTo(StreamOutput out) throws IOException {
|
||||
super.writeTo(out);
|
||||
parentTaskId.writeTo(out);
|
||||
out.writeBoolean(ban);
|
||||
if (ban) {
|
||||
out.writeString(reason);
|
||||
}
|
||||
if (out.getVersion().onOrAfter(Version.V_7_8_0)) {
|
||||
out.writeBoolean(waitForCompletion);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private class BanParentRequestHandler implements TransportRequestHandler<BanParentTaskRequest> {
|
||||
@Override
|
||||
public void messageReceived(final BanParentTaskRequest request, final TransportChannel channel, Task task) throws Exception {
|
||||
if (request.ban) {
|
||||
logger.debug("Received ban for the parent [{}] on the node [{}], reason: [{}]", request.parentTaskId,
|
||||
localNodeId(), request.reason);
|
||||
final List<CancellableTask> childTasks = taskManager.setBan(request.parentTaskId, request.reason);
|
||||
final GroupedActionListener<Void> listener = new GroupedActionListener<>(ActionListener.map(
|
||||
new ChannelActionListener<>(channel, BAN_PARENT_ACTION_NAME, request), r -> TransportResponse.Empty.INSTANCE),
|
||||
childTasks.size() + 1);
|
||||
for (CancellableTask childTask : childTasks) {
|
||||
cancelTaskAndDescendants(childTask, request.reason, request.waitForCompletion, listener);
|
||||
}
|
||||
listener.onResponse(null);
|
||||
} else {
|
||||
logger.debug("Removing ban for the parent [{}] on the node [{}]", request.parentTaskId, localNodeId());
|
||||
taskManager.removeBan(request.parentTaskId);
|
||||
channel.sendResponse(TransportResponse.Empty.INSTANCE);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
@ -24,6 +24,8 @@ import com.carrotsearch.hppc.ObjectIntMap;
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
import org.apache.logging.log4j.message.ParameterizedMessage;
|
||||
import org.apache.lucene.util.SetOnce;
|
||||
import org.elasticsearch.Assertions;
|
||||
import org.elasticsearch.ElasticsearchException;
|
||||
import org.elasticsearch.ElasticsearchTimeoutException;
|
||||
import org.elasticsearch.ExceptionsHelper;
|
||||
@ -38,10 +40,12 @@ import org.elasticsearch.common.lease.Releasables;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.unit.ByteSizeValue;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
|
||||
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
|
||||
import org.elasticsearch.common.util.concurrent.ConcurrentMapLong;
|
||||
import org.elasticsearch.common.util.concurrent.ThreadContext;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.transport.TcpChannel;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
@ -54,6 +58,8 @@ import java.util.Map;
|
||||
import java.util.Objects;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.Semaphore;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
import java.util.stream.Collectors;
|
||||
import java.util.stream.StreamSupport;
|
||||
@ -88,6 +94,8 @@ public class TaskManager implements ClusterStateApplier {
|
||||
private volatile DiscoveryNodes lastDiscoveryNodes = DiscoveryNodes.EMPTY_NODES;
|
||||
|
||||
private final ByteSizeValue maxHeaderSize;
|
||||
private final Map<TcpChannel, ChannelPendingTaskTracker> channelPendingTaskTrackers = ConcurrentCollections.newConcurrentMap();
|
||||
private final SetOnce<TaskCancellationService> cancellationService = new SetOnce<>();
|
||||
|
||||
public TaskManager(Settings settings, ThreadPool threadPool, Set<String> taskHeaders) {
|
||||
this.threadPool = threadPool;
|
||||
@ -100,6 +108,10 @@ public class TaskManager implements ClusterStateApplier {
|
||||
this.taskResultsService = taskResultsService;
|
||||
}
|
||||
|
||||
public void setTaskCancellationService(TaskCancellationService taskCancellationService) {
|
||||
this.cancellationService.set(taskCancellationService);
|
||||
}
|
||||
|
||||
/**
|
||||
* Registers a task without parent task
|
||||
*/
|
||||
@ -404,17 +416,6 @@ public class TaskManager implements ClusterStateApplier {
|
||||
}
|
||||
}
|
||||
}
|
||||
// Cancel cancellable tasks for the nodes that are gone
|
||||
for (Map.Entry<Long, CancellableTaskHolder> taskEntry : cancellableTasks.entrySet()) {
|
||||
CancellableTaskHolder holder = taskEntry.getValue();
|
||||
CancellableTask task = holder.getTask();
|
||||
TaskId parentTaskId = task.getParentTaskId();
|
||||
if (parentTaskId.isSet() && lastDiscoveryNodes.nodeExists(parentTaskId.getNodeId()) == false) {
|
||||
if (task.cancelOnParentLeaving()) {
|
||||
holder.cancel("Coordinating node [" + parentTaskId.getNodeId() + "] left the cluster");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -569,4 +570,98 @@ public class TaskManager implements ClusterStateApplier {
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Start tracking a cancellable task with its tcp channel, so if the channel gets closed we can get a set of
|
||||
* pending tasks associated that channel and cancel them as these results won't be retrieved by the parent task.
|
||||
*
|
||||
* @return a releasable that should be called when this pending task is completed
|
||||
*/
|
||||
public Releasable startTrackingCancellableChannelTask(TcpChannel channel, CancellableTask task) {
|
||||
assert cancellableTasks.containsKey(task.getId()) : "task [" + task.getId() + "] is not registered yet";
|
||||
final ChannelPendingTaskTracker tracker = channelPendingTaskTrackers.compute(channel, (k, curr) -> {
|
||||
if (curr == null) {
|
||||
curr = new ChannelPendingTaskTracker();
|
||||
}
|
||||
curr.addTask(task);
|
||||
return curr;
|
||||
});
|
||||
if (tracker.registered.compareAndSet(false, true)) {
|
||||
channel.addCloseListener(ActionListener.wrap(
|
||||
r -> {
|
||||
final ChannelPendingTaskTracker removedTracker = channelPendingTaskTrackers.remove(channel);
|
||||
assert removedTracker == tracker;
|
||||
cancelTasksOnChannelClosed(tracker.drainTasks());
|
||||
},
|
||||
e -> {
|
||||
assert false : new AssertionError("must not be here", e);
|
||||
}));
|
||||
}
|
||||
return () -> tracker.removeTask(task);
|
||||
}
|
||||
|
||||
// for testing
|
||||
final int numberOfChannelPendingTaskTrackers() {
|
||||
return channelPendingTaskTrackers.size();
|
||||
}
|
||||
|
||||
private static class ChannelPendingTaskTracker {
|
||||
final AtomicBoolean registered = new AtomicBoolean();
|
||||
final Semaphore permits = Assertions.ENABLED ? new Semaphore(Integer.MAX_VALUE) : null;
|
||||
final Set<CancellableTask> pendingTasks = ConcurrentCollections.newConcurrentSet();
|
||||
|
||||
void addTask(CancellableTask task) {
|
||||
assert permits.tryAcquire() : "tracker was drained";
|
||||
final boolean added = pendingTasks.add(task);
|
||||
assert added : "task " + task.getId() + " is in the pending list already";
|
||||
assert releasePermit();
|
||||
}
|
||||
|
||||
boolean acquireAllPermits() {
|
||||
permits.acquireUninterruptibly(Integer.MAX_VALUE);
|
||||
return true;
|
||||
}
|
||||
|
||||
boolean releasePermit() {
|
||||
permits.release();
|
||||
return true;
|
||||
}
|
||||
|
||||
Set<CancellableTask> drainTasks() {
|
||||
assert acquireAllPermits(); // do not release permits so we can't add tasks to this tracker after draining
|
||||
return Collections.unmodifiableSet(pendingTasks);
|
||||
}
|
||||
|
||||
void removeTask(CancellableTask task) {
|
||||
final boolean removed = pendingTasks.remove(task);
|
||||
assert removed : "task " + task.getId() + " is not in the pending list";
|
||||
}
|
||||
}
|
||||
|
||||
private void cancelTasksOnChannelClosed(Set<CancellableTask> tasks) {
|
||||
if (tasks.isEmpty() == false) {
|
||||
threadPool.generic().execute(new AbstractRunnable() {
|
||||
@Override
|
||||
public void onFailure(Exception e) {
|
||||
logger.warn("failed to cancel tasks on channel closed", e);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void doRun() {
|
||||
for (CancellableTask task : tasks) {
|
||||
cancelTaskAndDescendants(task, "channel was closed", false, ActionListener.wrap(() -> {}));
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
public void cancelTaskAndDescendants(CancellableTask task, String reason, boolean waitForCompletion, ActionListener<Void> listener) {
|
||||
final TaskCancellationService service = cancellationService.get();
|
||||
if (service != null) {
|
||||
service.cancelTaskAndDescendants(task, reason, waitForCompletion, listener);
|
||||
} else {
|
||||
assert false : "TaskCancellationService is not initialized";
|
||||
throw new IllegalStateException("TaskCancellationService is not initialized");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -21,6 +21,9 @@ package org.elasticsearch.transport;
|
||||
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.common.io.stream.Writeable;
|
||||
import org.elasticsearch.common.lease.Releasable;
|
||||
import org.elasticsearch.common.lease.Releasables;
|
||||
import org.elasticsearch.tasks.CancellableTask;
|
||||
import org.elasticsearch.tasks.Task;
|
||||
import org.elasticsearch.tasks.TaskManager;
|
||||
|
||||
@ -58,14 +61,18 @@ public class RequestHandlerRegistry<Request extends TransportRequest> {
|
||||
|
||||
public void processMessageReceived(Request request, TransportChannel channel) throws Exception {
|
||||
final Task task = taskManager.register(channel.getChannelType(), action, request);
|
||||
boolean success = false;
|
||||
Releasable unregisterTask = () -> taskManager.unregister(task);
|
||||
try {
|
||||
handler.messageReceived(request, new TaskTransportChannel(taskManager, task, channel), task);
|
||||
success = true;
|
||||
} finally {
|
||||
if (success == false) {
|
||||
taskManager.unregister(task);
|
||||
if (channel instanceof TcpTransportChannel && task instanceof CancellableTask) {
|
||||
final TcpChannel tcpChannel = ((TcpTransportChannel) channel).getChannel();
|
||||
final Releasable stopTracking = taskManager.startTrackingCancellableChannelTask(tcpChannel, (CancellableTask) task);
|
||||
unregisterTask = Releasables.wrap(unregisterTask, stopTracking);
|
||||
}
|
||||
final TaskTransportChannel taskTransportChannel = new TaskTransportChannel(channel, unregisterTask);
|
||||
handler.messageReceived(request, taskTransportChannel, task);
|
||||
unregisterTask = null;
|
||||
} finally {
|
||||
Releasables.close(unregisterTask);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -20,22 +20,18 @@
|
||||
package org.elasticsearch.transport;
|
||||
|
||||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.tasks.Task;
|
||||
import org.elasticsearch.tasks.TaskManager;
|
||||
import org.elasticsearch.common.lease.Releasable;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
public class TaskTransportChannel implements TransportChannel {
|
||||
|
||||
private final Task task;
|
||||
|
||||
private final TaskManager taskManager;
|
||||
private final TransportChannel channel;
|
||||
private final Releasable onTaskFinished;
|
||||
|
||||
TaskTransportChannel(TaskManager taskManager, Task task, TransportChannel channel) {
|
||||
TaskTransportChannel(TransportChannel channel, Releasable onTaskFinished) {
|
||||
this.channel = channel;
|
||||
this.task = task;
|
||||
this.taskManager = taskManager;
|
||||
this.onTaskFinished = onTaskFinished;
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -50,14 +46,20 @@ public class TaskTransportChannel implements TransportChannel {
|
||||
|
||||
@Override
|
||||
public void sendResponse(TransportResponse response) throws IOException {
|
||||
endTask();
|
||||
channel.sendResponse(response);
|
||||
try {
|
||||
onTaskFinished.close();
|
||||
} finally {
|
||||
channel.sendResponse(response);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void sendResponse(Exception exception) throws IOException {
|
||||
endTask();
|
||||
channel.sendResponse(exception);
|
||||
try {
|
||||
onTaskFinished.close();
|
||||
} finally {
|
||||
channel.sendResponse(exception);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -68,8 +70,4 @@ public class TaskTransportChannel implements TransportChannel {
|
||||
public TransportChannel getChannel() {
|
||||
return channel;
|
||||
}
|
||||
|
||||
private void endTask() {
|
||||
taskManager.unregister(task);
|
||||
}
|
||||
}
|
||||
|
@ -377,7 +377,6 @@ public class CancellableTasksTests extends TaskManagerTestCase {
|
||||
setupTestNodes(Settings.EMPTY);
|
||||
connectNodes(testNodes);
|
||||
CountDownLatch responseLatch = new CountDownLatch(1);
|
||||
boolean simulateBanBeforeLeaving = randomBoolean();
|
||||
final AtomicReference<NodesResponse> responseReference = new AtomicReference<>();
|
||||
final AtomicReference<Throwable> throwableReference = new AtomicReference<>();
|
||||
int blockedNodesCount = randomIntBetween(0, nodesCount - 1);
|
||||
@ -410,40 +409,51 @@ public class CancellableTasksTests extends TaskManagerTestCase {
|
||||
assertThat(listTasksResponse.getTasks().size(), greaterThanOrEqualTo(blockOnNodes.size()));
|
||||
|
||||
// Simulate the coordinating node leaving the cluster
|
||||
DiscoveryNode[] discoveryNodes = new DiscoveryNode[testNodes.length - 1];
|
||||
for (int i = 1; i < testNodes.length; i++) {
|
||||
discoveryNodes[i - 1] = testNodes[i].discoveryNode();
|
||||
}
|
||||
DiscoveryNode master = discoveryNodes[0];
|
||||
for (int i = 1; i < testNodes.length; i++) {
|
||||
// Notify only nodes that should remain in the cluster
|
||||
setState(testNodes[i].clusterService, ClusterStateCreationUtils.state(testNodes[i].discoveryNode(), master, discoveryNodes));
|
||||
}
|
||||
|
||||
if (simulateBanBeforeLeaving) {
|
||||
logger.info("--> Simulate issuing cancel request on the node that is about to leave the cluster");
|
||||
// Simulate issuing cancel request on the node that is about to leave the cluster
|
||||
CancelTasksRequest request = new CancelTasksRequest();
|
||||
request.setReason("Testing Cancellation");
|
||||
request.setTaskId(new TaskId(testNodes[0].getNodeId(), mainTask.getId()));
|
||||
// And send the cancellation request to a random node
|
||||
CancelTasksResponse response = ActionTestUtils.executeBlocking(testNodes[0].transportCancelTasksAction, request);
|
||||
logger.info("--> Done simulating issuing cancel request on the node that is about to leave the cluster");
|
||||
// This node still thinks that's part of the cluster, so cancelling should look successful
|
||||
if (response.getTasks().size() == 0) {
|
||||
logger.error("!!!!");
|
||||
if (randomBoolean()) {
|
||||
DiscoveryNode[] discoveryNodes = new DiscoveryNode[testNodes.length - 1];
|
||||
for (int i = 1; i < testNodes.length; i++) {
|
||||
discoveryNodes[i - 1] = testNodes[i].discoveryNode();
|
||||
}
|
||||
DiscoveryNode master = discoveryNodes[0];
|
||||
for (int i = 1; i < testNodes.length; i++) {
|
||||
// Notify only nodes that should remain in the cluster
|
||||
setState(testNodes[i].clusterService,
|
||||
ClusterStateCreationUtils.state(testNodes[i].discoveryNode(), master, discoveryNodes));
|
||||
}
|
||||
if (randomBoolean()) {
|
||||
logger.info("--> Simulate issuing cancel request on the node that is about to leave the cluster");
|
||||
// Simulate issuing cancel request on the node that is about to leave the cluster
|
||||
CancelTasksRequest request = new CancelTasksRequest();
|
||||
request.setReason("Testing Cancellation");
|
||||
request.setTaskId(new TaskId(testNodes[0].getNodeId(), mainTask.getId()));
|
||||
// And send the cancellation request to a random node
|
||||
CancelTasksResponse response = ActionTestUtils.executeBlocking(testNodes[0].transportCancelTasksAction, request);
|
||||
logger.info("--> Done simulating issuing cancel request on the node that is about to leave the cluster");
|
||||
// This node still thinks that's part of the cluster, so cancelling should look successful
|
||||
if (response.getTasks().size() == 0) {
|
||||
logger.error("!!!!");
|
||||
}
|
||||
assertThat(response.getTasks().size(), lessThanOrEqualTo(1));
|
||||
assertThat(response.getTaskFailures().size(), lessThanOrEqualTo(1));
|
||||
assertThat(response.getTaskFailures().size() + response.getTasks().size(), lessThanOrEqualTo(1));
|
||||
}
|
||||
assertThat(response.getTasks().size(), lessThanOrEqualTo(1));
|
||||
assertThat(response.getTaskFailures().size(), lessThanOrEqualTo(1));
|
||||
assertThat(response.getTaskFailures().size() + response.getTasks().size(), lessThanOrEqualTo(1));
|
||||
}
|
||||
|
||||
for (int i = 1; i < testNodes.length; i++) {
|
||||
assertEquals("No bans on the node " + i, 0, testNodes[i].transportService.getTaskManager().getBanCount());
|
||||
}
|
||||
|
||||
// Close the first node
|
||||
testNodes[0].close();
|
||||
if (randomBoolean()) {
|
||||
testNodes[0].close();
|
||||
} else {
|
||||
for (TestNode blockOnNode : blockOnNodes) {
|
||||
if (randomBoolean()) {
|
||||
testNodes[0].transportService.disconnectFromNode(blockOnNode.discoveryNode());
|
||||
} else {
|
||||
testNodes[0].transportService.getConnection(blockOnNode.discoveryNode()).close();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
assertBusy(() -> {
|
||||
// Make sure that tasks are no longer running
|
||||
@ -455,7 +465,6 @@ public class CancellableTasksTests extends TaskManagerTestCase {
|
||||
|
||||
// Wait for clean up
|
||||
responseLatch.await();
|
||||
|
||||
}
|
||||
|
||||
public void testNonExistingTaskCancellation() throws Exception {
|
||||
|
@ -44,6 +44,7 @@ import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.transport.BoundTransportAddress;
|
||||
import org.elasticsearch.common.util.PageCacheRecycler;
|
||||
import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
|
||||
import org.elasticsearch.tasks.TaskCancellationService;
|
||||
import org.elasticsearch.tasks.TaskManager;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
import org.elasticsearch.test.tasks.MockTaskManager;
|
||||
@ -91,8 +92,11 @@ public abstract class TaskManagerTestCase extends ESTestCase {
|
||||
|
||||
@After
|
||||
public final void shutdownTestNodes() throws Exception {
|
||||
for (TestNode testNode : testNodes) {
|
||||
testNode.close();
|
||||
if (testNodes != null) {
|
||||
for (TestNode testNode : testNodes) {
|
||||
testNode.close();
|
||||
}
|
||||
testNodes = null;
|
||||
}
|
||||
ThreadPool.terminate(threadPool, 30, TimeUnit.SECONDS);
|
||||
threadPool = null;
|
||||
@ -182,6 +186,7 @@ public abstract class TaskManagerTestCase extends ESTestCase {
|
||||
}
|
||||
}
|
||||
};
|
||||
transportService.getTaskManager().setTaskCancellationService(new TaskCancellationService(transportService));
|
||||
transportService.start();
|
||||
clusterService = createClusterService(threadPool, discoveryNode.get());
|
||||
clusterService.addStateApplier(transportService.getTaskManager());
|
||||
|
@ -0,0 +1,203 @@
|
||||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.tasks;
|
||||
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.action.admin.cluster.node.tasks.TransportTasksActionTests;
|
||||
import org.elasticsearch.common.lease.Releasable;
|
||||
import org.elasticsearch.common.lease.Releasables;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
import org.elasticsearch.threadpool.TestThreadPool;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.transport.FakeTcpChannel;
|
||||
import org.elasticsearch.transport.TcpChannel;
|
||||
import org.elasticsearch.transport.TransportRequest;
|
||||
import org.elasticsearch.transport.TransportService;
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.Phaser;
|
||||
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
import static org.mockito.Mockito.mock;
|
||||
|
||||
public class TaskManagerTests extends ESTestCase {
|
||||
private ThreadPool threadPool;
|
||||
|
||||
@Before
|
||||
public void setupThreadPool() {
|
||||
threadPool = new TestThreadPool(TransportTasksActionTests.class.getSimpleName());
|
||||
}
|
||||
|
||||
@After
|
||||
public void terminateThreadPool() {
|
||||
terminate(threadPool);
|
||||
}
|
||||
|
||||
/**
|
||||
* Makes sure that tasks that attempt to store themselves on completion retry if
|
||||
* they don't succeed at first.
|
||||
*/
|
||||
public void testResultsServiceRetryTotalTime() {
|
||||
Iterator<TimeValue> times = TaskResultsService.STORE_BACKOFF_POLICY.iterator();
|
||||
long total = 0;
|
||||
while (times.hasNext()) {
|
||||
total += times.next().millis();
|
||||
}
|
||||
assertEquals(600000L, total);
|
||||
}
|
||||
|
||||
public void testTrackingChannelTask() throws Exception {
|
||||
final TaskManager taskManager = new TaskManager(Settings.EMPTY, threadPool, Collections.emptySet());
|
||||
Set<CancellableTask> cancelledTasks = new HashSet<>();
|
||||
taskManager.setTaskCancellationService(new TaskCancellationService(mock(TransportService.class)) {
|
||||
@Override
|
||||
void cancelTaskAndDescendants(CancellableTask task, String reason, boolean waitForCompletion, ActionListener<Void> listener) {
|
||||
assertThat(reason, equalTo("channel was closed"));
|
||||
assertFalse(waitForCompletion);
|
||||
assertTrue("task [" + task + "] was cancelled already", cancelledTasks.add(task));
|
||||
}
|
||||
});
|
||||
Map<TcpChannel, Set<Task>> pendingTasks = new HashMap<>();
|
||||
Set<Task> expectedCancelledTasks = new HashSet<>();
|
||||
FakeTcpChannel[] channels = new FakeTcpChannel[randomIntBetween(1, 10)];
|
||||
List<Releasable> stopTrackingTasks = new ArrayList<>();
|
||||
for (int i = 0; i < channels.length; i++) {
|
||||
channels[i] = new SingleThreadedTcpChannel();
|
||||
}
|
||||
int iterations = randomIntBetween(1, 200);
|
||||
for (int i = 0; i < iterations; i++) {
|
||||
final List<Releasable> subset = randomSubsetOf(stopTrackingTasks);
|
||||
stopTrackingTasks.removeAll(subset);
|
||||
Releasables.close(subset);
|
||||
final FakeTcpChannel channel = randomFrom(channels);
|
||||
final Task task = taskManager.register("transport", "test", new CancellableRequest(Integer.toString(i)));
|
||||
if (channel.isOpen() && randomBoolean()) {
|
||||
channel.close();
|
||||
expectedCancelledTasks.addAll(pendingTasks.getOrDefault(channel, Collections.emptySet()));
|
||||
}
|
||||
final Releasable stopTracking = taskManager.startTrackingCancellableChannelTask(channel, (CancellableTask) task);
|
||||
if (channel.isOpen()) {
|
||||
pendingTasks.computeIfAbsent(channel, k -> new HashSet<>()).add(task);
|
||||
stopTrackingTasks.add(() -> {
|
||||
stopTracking.close();
|
||||
pendingTasks.get(channel).remove(task);
|
||||
});
|
||||
} else {
|
||||
expectedCancelledTasks.add(task);
|
||||
}
|
||||
}
|
||||
assertBusy(() -> assertThat(cancelledTasks, equalTo(expectedCancelledTasks)));
|
||||
for (FakeTcpChannel channel : channels) {
|
||||
channel.close();
|
||||
}
|
||||
assertThat(taskManager.numberOfChannelPendingTaskTrackers(), equalTo(0));
|
||||
}
|
||||
|
||||
public void testTrackingTaskAndCloseChannelConcurrently() throws Exception {
|
||||
final TaskManager taskManager = new TaskManager(Settings.EMPTY, threadPool, Collections.emptySet());
|
||||
Set<CancellableTask> cancelledTasks = ConcurrentCollections.newConcurrentSet();
|
||||
taskManager.setTaskCancellationService(new TaskCancellationService(mock(TransportService.class)) {
|
||||
@Override
|
||||
void cancelTaskAndDescendants(CancellableTask task, String reason, boolean waitForCompletion, ActionListener<Void> listener) {
|
||||
assertTrue("task [" + task + "] was cancelled already", cancelledTasks.add(task));
|
||||
}
|
||||
});
|
||||
Set<Task> expectedCancelledTasks = ConcurrentCollections.newConcurrentSet();
|
||||
FakeTcpChannel[] channels = new FakeTcpChannel[randomIntBetween(2, 20)];
|
||||
for (int i = 0; i < channels.length; i++) {
|
||||
channels[i] = new FakeTcpChannel();
|
||||
}
|
||||
Thread[] threads = new Thread[randomIntBetween(2, 8)];
|
||||
Phaser phaser = new Phaser(threads.length);
|
||||
for (int t = 0; t < threads.length; t++) {
|
||||
String threadName = "thread-" + t;
|
||||
threads[t] = new Thread(() -> {
|
||||
phaser.arriveAndAwaitAdvance();
|
||||
int iterations = randomIntBetween(100, 1000);
|
||||
for (int i = 0; i < iterations; i++) {
|
||||
final FakeTcpChannel channel = randomFrom(channels);
|
||||
final Task task = taskManager.register("transport", "test", new CancellableRequest(threadName + ":" + i));
|
||||
expectedCancelledTasks.add(task);
|
||||
taskManager.startTrackingCancellableChannelTask(channel, (CancellableTask) task);
|
||||
if (randomInt(100) < 5) {
|
||||
randomFrom(channels).close();
|
||||
}
|
||||
}
|
||||
});
|
||||
threads[t].start();
|
||||
}
|
||||
for (FakeTcpChannel channel : channels) {
|
||||
channel.close();
|
||||
}
|
||||
for (Thread thread : threads) {
|
||||
thread.join();
|
||||
}
|
||||
assertBusy(() -> assertThat(cancelledTasks, equalTo(expectedCancelledTasks)));
|
||||
assertThat(taskManager.numberOfChannelPendingTaskTrackers(), equalTo(0));
|
||||
}
|
||||
|
||||
static class CancellableRequest extends TransportRequest {
|
||||
private final String requestId;
|
||||
|
||||
CancellableRequest(String requestId) {
|
||||
this.requestId = requestId;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Task createTask(long id, String type, String action, TaskId parentTaskId, Map<String, String> headers) {
|
||||
return new CancellableTask(id, type, action, "request-" + requestId, parentTaskId, headers) {
|
||||
@Override
|
||||
public boolean shouldCancelChildrenOnCancellation() {
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return getDescription();
|
||||
}
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
static class SingleThreadedTcpChannel extends FakeTcpChannel {
|
||||
private boolean registeredListener = false;
|
||||
|
||||
@Override
|
||||
public void addCloseListener(ActionListener<Void> listener) {
|
||||
if (isOpen()) {
|
||||
assertFalse("listener was registered already", registeredListener);
|
||||
registeredListener = true;
|
||||
}
|
||||
super.addCloseListener(listener);
|
||||
}
|
||||
}
|
||||
}
|
@ -1,40 +0,0 @@
|
||||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.tasks;
|
||||
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
|
||||
import java.util.Iterator;
|
||||
|
||||
/**
|
||||
* Makes sure that tasks that attempt to store themselves on completion retry if
|
||||
* they don't succeed at first.
|
||||
*/
|
||||
public class TaskResultsServiceTests extends ESTestCase {
|
||||
public void testRetryTotalTime() {
|
||||
Iterator<TimeValue> times = TaskResultsService.STORE_BACKOFF_POLICY.iterator();
|
||||
long total = 0;
|
||||
while (times.hasNext()) {
|
||||
total += times.next().millis();
|
||||
}
|
||||
assertEquals(600000L, total);
|
||||
}
|
||||
}
|
Loading…
x
Reference in New Issue
Block a user