Add trace log for task cancellation (#55940)
Adding trace logs to the task cancellation and its tests to debug the test failure in #55875. Relates ##55875
This commit is contained in:
parent
144e8ce092
commit
edbaa19a5d
|
@ -113,13 +113,19 @@ public class TransportCancelTasksAction extends TransportTasksAction<Cancellable
|
|||
}
|
||||
|
||||
void cancelTaskAndDescendants(CancellableTask task, String reason, boolean waitForCompletion, ActionListener<Void> listener) {
|
||||
final TaskId taskId = task.taskInfo(clusterService.localNode().getId(), false).getTaskId();
|
||||
if (task.shouldCancelChildrenOnCancellation()) {
|
||||
logger.trace("cancelling task [{}] and its descendants", taskId);
|
||||
StepListener<Void> completedListener = new StepListener<>();
|
||||
GroupedActionListener<Void> groupedListener = new GroupedActionListener<>(ActionListener.map(completedListener, r -> null), 3);
|
||||
Collection<DiscoveryNode> childrenNodes =
|
||||
taskManager.startBanOnChildrenNodes(task.getId(), () -> groupedListener.onResponse(null));
|
||||
taskManager.cancel(task, reason, () -> groupedListener.onResponse(null));
|
||||
|
||||
Collection<DiscoveryNode> childrenNodes = taskManager.startBanOnChildrenNodes(task.getId(), () -> {
|
||||
logger.trace("child tasks of parent [{}] are completed", taskId);
|
||||
groupedListener.onResponse(null);
|
||||
});
|
||||
taskManager.cancel(task, reason, () -> {
|
||||
logger.trace("task [{}] is cancelled", taskId);
|
||||
groupedListener.onResponse(null);
|
||||
});
|
||||
StepListener<Void> banOnNodesListener = new StepListener<>();
|
||||
setBanOnNodes(reason, waitForCompletion, task, childrenNodes, banOnNodesListener);
|
||||
banOnNodesListener.whenComplete(groupedListener::onResponse, groupedListener::onFailure);
|
||||
|
@ -137,7 +143,7 @@ public class TransportCancelTasksAction extends TransportTasksAction<Cancellable
|
|||
banOnNodesListener.whenComplete(r -> listener.onResponse(null), listener::onFailure);
|
||||
}
|
||||
} else {
|
||||
logger.trace("task {} doesn't have any children that should be cancelled", task.getId());
|
||||
logger.trace("task [{}] doesn't have any children that should be cancelled", taskId);
|
||||
if (waitForCompletion) {
|
||||
taskManager.cancel(task, reason, () -> listener.onResponse(null));
|
||||
} else {
|
||||
|
@ -153,23 +159,24 @@ public class TransportCancelTasksAction extends TransportTasksAction<Cancellable
|
|||
listener.onResponse(null);
|
||||
return;
|
||||
}
|
||||
logger.trace("cancelling task {} on child nodes {}", task.getId(), childNodes);
|
||||
final TaskId taskId = new TaskId(clusterService.localNode().getId(), task.getId());
|
||||
logger.trace("cancelling child tasks of [{}] on child nodes {}", taskId, childNodes);
|
||||
GroupedActionListener<Void> groupedListener =
|
||||
new GroupedActionListener<>(ActionListener.map(listener, r -> null), childNodes.size());
|
||||
final BanParentTaskRequest banRequest = BanParentTaskRequest.createSetBanParentTaskRequest(
|
||||
new TaskId(clusterService.localNode().getId(), task.getId()), reason, waitForCompletion);
|
||||
final BanParentTaskRequest banRequest = BanParentTaskRequest.createSetBanParentTaskRequest(taskId, reason, waitForCompletion);
|
||||
for (DiscoveryNode node : childNodes) {
|
||||
transportService.sendRequest(node, BAN_PARENT_ACTION_NAME, banRequest,
|
||||
new EmptyTransportResponseHandler(ThreadPool.Names.SAME) {
|
||||
@Override
|
||||
public void handleResponse(TransportResponse.Empty response) {
|
||||
logger.trace("sent ban for tasks with the parent [{}] to the node [{}]", taskId, node);
|
||||
groupedListener.onResponse(null);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void handleException(TransportException exp) {
|
||||
assert ExceptionsHelper.unwrapCause(exp) instanceof ElasticsearchSecurityException == false;
|
||||
logger.warn("Cannot send ban for tasks with the parent [{}] to the node [{}]", banRequest.parentTaskId, node);
|
||||
logger.warn("Cannot send ban for tasks with the parent [{}] to the node [{}]", taskId, node);
|
||||
groupedListener.onFailure(exp);
|
||||
}
|
||||
});
|
||||
|
|
|
@ -197,8 +197,12 @@ public class TaskManager implements ClusterStateApplier {
|
|||
public Releasable registerChildNode(long taskId, DiscoveryNode node) {
|
||||
final CancellableTaskHolder holder = cancellableTasks.get(taskId);
|
||||
if (holder != null) {
|
||||
logger.trace("register child node [{}] task [{}]", node, taskId);
|
||||
holder.registerChildNode(node);
|
||||
return Releasables.releaseOnce(() -> holder.unregisterChildNode(node));
|
||||
return Releasables.releaseOnce(() -> {
|
||||
logger.trace("unregister child node [{}] task [{}]", node, taskId);
|
||||
holder.unregisterChildNode(node);
|
||||
});
|
||||
}
|
||||
return () -> {};
|
||||
}
|
||||
|
|
|
@ -52,6 +52,7 @@ import org.elasticsearch.tasks.TaskId;
|
|||
import org.elasticsearch.tasks.TaskInfo;
|
||||
import org.elasticsearch.tasks.TaskManager;
|
||||
import org.elasticsearch.test.ESIntegTestCase;
|
||||
import org.elasticsearch.test.junit.annotations.TestIssueLogging;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.transport.TransportException;
|
||||
import org.elasticsearch.transport.TransportResponseHandler;
|
||||
|
@ -79,6 +80,9 @@ import static org.hamcrest.Matchers.equalTo;
|
|||
import static org.hamcrest.Matchers.hasSize;
|
||||
import static org.hamcrest.Matchers.instanceOf;
|
||||
|
||||
@TestIssueLogging(
|
||||
value = "org.elasticsearch.action.admin.cluster.node.tasks.cancel:TRACE,org.elasticsearch.tasks:TRACE",
|
||||
issueUrl = "https://github.com/elastic/elasticsearch/issues/55875")
|
||||
public class CancellableTasksIT extends ESIntegTestCase {
|
||||
|
||||
static int idGenerator = 0;
|
||||
|
|
Loading…
Reference in New Issue