Use system context for looking up connected nodes (#43991)
When finding nodes in a connected cluster for cross cluster search the requests to get cluster state on the connected cluster should be made in the system context because logically they are equivalent to checking a single detail in the local cluster state and should not require that the user who made the request that is using this method in its implementation is authorized to view the entire cluster state. Fixes #43974
This commit is contained in:
parent
899c62ad02
commit
5e3010a606
|
@ -192,39 +192,52 @@ final class RemoteClusterConnection implements TransportConnectionListener, Clos
|
||||||
/**
|
/**
|
||||||
* Collects all nodes on the connected cluster and returns / passes a nodeID to {@link DiscoveryNode} lookup function
|
* Collects all nodes on the connected cluster and returns / passes a nodeID to {@link DiscoveryNode} lookup function
|
||||||
* that returns <code>null</code> if the node ID is not found.
|
* that returns <code>null</code> if the node ID is not found.
|
||||||
|
*
|
||||||
|
* The requests to get cluster state on the connected cluster are made in the system context because logically
|
||||||
|
* they are equivalent to checking a single detail in the local cluster state and should not require that the
|
||||||
|
* user who made the request that is using this method in its implementation is authorized to view the entire
|
||||||
|
* cluster state.
|
||||||
*/
|
*/
|
||||||
void collectNodes(ActionListener<Function<String, DiscoveryNode>> listener) {
|
void collectNodes(ActionListener<Function<String, DiscoveryNode>> listener) {
|
||||||
Runnable runnable = () -> {
|
Runnable runnable = () -> {
|
||||||
final ClusterStateRequest request = new ClusterStateRequest();
|
final ThreadContext threadContext = threadPool.getThreadContext();
|
||||||
request.clear();
|
final ContextPreservingActionListener<Function<String, DiscoveryNode>> contextPreservingActionListener =
|
||||||
request.nodes(true);
|
new ContextPreservingActionListener<>(threadContext.newRestorableContext(false), listener);
|
||||||
request.local(true); // run this on the node that gets the request it's as good as any other
|
try (ThreadContext.StoredContext ignore = threadContext.stashContext()) {
|
||||||
final DiscoveryNode node = getAnyConnectedNode();
|
// we stash any context here since this is an internal execution and should not leak any existing context information
|
||||||
Transport.Connection connection = connectionManager.getConnection(node);
|
threadContext.markAsSystemContext();
|
||||||
transportService.sendRequest(connection, ClusterStateAction.NAME, request, TransportRequestOptions.EMPTY,
|
|
||||||
new TransportResponseHandler<ClusterStateResponse>() {
|
|
||||||
|
|
||||||
@Override
|
final ClusterStateRequest request = new ClusterStateRequest();
|
||||||
public ClusterStateResponse read(StreamInput in) throws IOException {
|
request.clear();
|
||||||
return new ClusterStateResponse(in);
|
request.nodes(true);
|
||||||
}
|
request.local(true); // run this on the node that gets the request it's as good as any other
|
||||||
|
final DiscoveryNode node = getAnyConnectedNode();
|
||||||
|
Transport.Connection connection = connectionManager.getConnection(node);
|
||||||
|
transportService.sendRequest(connection, ClusterStateAction.NAME, request, TransportRequestOptions.EMPTY,
|
||||||
|
new TransportResponseHandler<ClusterStateResponse>() {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void handleResponse(ClusterStateResponse response) {
|
public ClusterStateResponse read(StreamInput in) throws IOException {
|
||||||
DiscoveryNodes nodes = response.getState().nodes();
|
return new ClusterStateResponse(in);
|
||||||
listener.onResponse(nodes::get);
|
}
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void handleException(TransportException exp) {
|
public void handleResponse(ClusterStateResponse response) {
|
||||||
listener.onFailure(exp);
|
DiscoveryNodes nodes = response.getState().nodes();
|
||||||
}
|
contextPreservingActionListener.onResponse(nodes::get);
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public String executor() {
|
public void handleException(TransportException exp) {
|
||||||
return ThreadPool.Names.SAME;
|
contextPreservingActionListener.onFailure(exp);
|
||||||
}
|
}
|
||||||
});
|
|
||||||
|
@Override
|
||||||
|
public String executor() {
|
||||||
|
return ThreadPool.Names.SAME;
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
};
|
};
|
||||||
try {
|
try {
|
||||||
// just in case if we are not connected for some reason we try to connect and if we fail we have to notify the listener
|
// just in case if we are not connected for some reason we try to connect and if we fail we have to notify the listener
|
||||||
|
|
Loading…
Reference in New Issue