Remove ban tasks with the current thread context (#55404)

If we start unbanning when the last child task completed and that child 
task executed with a specific user, then unban requests are denied
because internal requests can't run with a user. We need to remove bans
with the current thread context.
This commit is contained in:
Nhat Nguyen 2020-04-17 12:40:32 -04:00
parent eb30cf5c89
commit 5bc8a859c6
1 changed files with 15 additions and 2 deletions

View File

@ -19,6 +19,8 @@
package org.elasticsearch.action.admin.cluster.node.tasks.cancel;
import org.elasticsearch.ElasticsearchSecurityException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
@ -121,8 +123,12 @@ public class TransportCancelTasksAction extends TransportTasksAction<Cancellable
StepListener<Void> banOnNodesListener = new StepListener<>();
setBanOnNodes(reason, waitForCompletion, task, childrenNodes, banOnNodesListener);
banOnNodesListener.whenComplete(groupedListener::onResponse, groupedListener::onFailure);
// If we start unbanning when the last child task completed and that child task executed with a specific user, then unban
// requests are denied because internal requests can't run with a user. We need to remove bans with the current thread context.
final Runnable removeBansRunnable = transportService.getThreadPool().getThreadContext()
.preserveContext(() -> removeBanOnNodes(task, childrenNodes));
// We remove bans after all child tasks are completed although in theory we can do it on a per-node basis.
completedListener.whenComplete(r -> removeBanOnNodes(task, childrenNodes), e -> removeBanOnNodes(task, childrenNodes));
completedListener.whenComplete(r -> removeBansRunnable.run(), e -> removeBansRunnable.run());
// if wait_for_completion is true, then only return when (1) bans are placed on child nodes, (2) child tasks are
// completed or failed, (3) the main task is cancelled. Otherwise, return after bans are placed on child nodes.
if (waitForCompletion) {
@ -162,6 +168,7 @@ public class TransportCancelTasksAction extends TransportTasksAction<Cancellable
@Override
public void handleException(TransportException exp) {
assert ExceptionsHelper.unwrapCause(exp) instanceof ElasticsearchSecurityException == false;
logger.warn("Cannot send ban for tasks with the parent [{}] to the node [{}]", banRequest.parentTaskId, node);
groupedListener.onFailure(exp);
}
@ -174,7 +181,13 @@ public class TransportCancelTasksAction extends TransportTasksAction<Cancellable
BanParentTaskRequest.createRemoveBanParentTaskRequest(new TaskId(clusterService.localNode().getId(), task.getId()));
for (DiscoveryNode node : childNodes) {
logger.trace("Sending remove ban for tasks with the parent [{}] to the node [{}]", request.parentTaskId, node);
transportService.sendRequest(node, BAN_PARENT_ACTION_NAME, request, EmptyTransportResponseHandler.INSTANCE_SAME);
transportService.sendRequest(node, BAN_PARENT_ACTION_NAME, request, new EmptyTransportResponseHandler(ThreadPool.Names.SAME) {
@Override
public void handleException(TransportException exp) {
assert ExceptionsHelper.unwrapCause(exp) instanceof ElasticsearchSecurityException == false;
logger.info("failed to remove the parent ban for task {} on node {}", request.parentTaskId, node);
}
});
}
}