Improve connection closing in `RemoteClusterConnection` (#22804)
Some tests verify that all connection have been closed but due to the async / concurrent nature of `RemoteClusterConnection` there are situations where we notify listeners that trigger tests to finish before we actually closed all connections. The race is very very small and has no impact on the code correctness. This commit documents and improves the way we close connections to ensure test won't fail with false positives. Closes #22803
This commit is contained in:
parent
f5a0d18c4c
commit
f128b7a7fe
|
@ -370,6 +370,9 @@ final class RemoteClusterConnection extends AbstractComponent implements Transpo
|
|||
ClusterStateRequest request = new ClusterStateRequest();
|
||||
request.clear();
|
||||
request.nodes(true);
|
||||
// here we pass on the connection since we can only close it once the sendRequest returns otherwise
|
||||
// due to the async nature (it will return before it's actually sent) this can cause the request to fail
|
||||
// due to an already closed connection.
|
||||
transportService.sendRequest(connection,
|
||||
ClusterStateAction.NAME, request, TransportRequestOptions.EMPTY,
|
||||
new SniffClusterStateResponseHandler(transportService, connection, listener, seedNodes,
|
||||
|
@ -443,24 +446,30 @@ final class RemoteClusterConnection extends AbstractComponent implements Transpo
|
|||
@Override
|
||||
public void handleResponse(ClusterStateResponse response) {
|
||||
try {
|
||||
cancellableThreads.executeIO(() -> {
|
||||
DiscoveryNodes nodes = response.getState().nodes();
|
||||
Iterable<DiscoveryNode> nodesIter = nodes.getNodes()::valuesIt;
|
||||
for (DiscoveryNode node : nodesIter) {
|
||||
if (nodePredicate.test(node) && connectedNodes.size() < maxNumRemoteConnections) {
|
||||
try {
|
||||
transportService.connectToNode(node, remoteProfile); // noop if node is connected
|
||||
connectedNodes.add(node);
|
||||
} catch (ConnectTransportException | IllegalStateException ex) {
|
||||
// ISE if we fail the handshake with an version incompatible node
|
||||
// fair enough we can't connect just move on
|
||||
logger.debug((Supplier<?>)
|
||||
() -> new ParameterizedMessage("failed to connect to node {}", node), ex);
|
||||
try (Closeable theConnection = connection) { // the connection is unused - see comment in #collectRemoteNodes
|
||||
// we have to close this connection before we notify listeners - this is mainly needed for test correctness
|
||||
// since if we do it afterwards we might fail assertions that check if all high level connections are closed.
|
||||
// from a code correctness perspective we could also close it afterwards. This try/with block will
|
||||
// maintain the possibly exceptions thrown from within the try block and suppress the ones that are possible thrown
|
||||
// by closing the connection
|
||||
cancellableThreads.executeIO(() -> {
|
||||
DiscoveryNodes nodes = response.getState().nodes();
|
||||
Iterable<DiscoveryNode> nodesIter = nodes.getNodes()::valuesIt;
|
||||
for (DiscoveryNode node : nodesIter) {
|
||||
if (nodePredicate.test(node) && connectedNodes.size() < maxNumRemoteConnections) {
|
||||
try {
|
||||
transportService.connectToNode(node, remoteProfile); // noop if node is connected
|
||||
connectedNodes.add(node);
|
||||
} catch (ConnectTransportException | IllegalStateException ex) {
|
||||
// ISE if we fail the handshake with an version incompatible node
|
||||
// fair enough we can't connect just move on
|
||||
logger.debug((Supplier<?>)
|
||||
() -> new ParameterizedMessage("failed to connect to node {}", node), ex);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
connection.close();
|
||||
});
|
||||
}
|
||||
listener.onResponse(null);
|
||||
} catch (CancellableThreads.ExecutionCancelledException ex) {
|
||||
listener.onFailure(ex); // we got canceled - fail the listener and step out
|
||||
|
@ -469,9 +478,6 @@ final class RemoteClusterConnection extends AbstractComponent implements Transpo
|
|||
() -> new ParameterizedMessage("fetching nodes from external cluster {} failed",
|
||||
clusterAlias), ex);
|
||||
collectRemoteNodes(seedNodes, transportService, listener);
|
||||
} finally {
|
||||
// just to make sure we don't leak anything we close the connection here again even if we managed to do so before
|
||||
IOUtils.closeWhileHandlingException(connection);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue