Fail sniff process if no connections opened (#54934)
Currently the remote cluster sniff connection process can succeed even if no connections are opened. This commit fixes this by failing the connection process if no connections are successfully opened.
This commit is contained in:
parent
96a903b17f
commit
98fba92022
|
@ -373,14 +373,6 @@ public class SniffConnectionStrategy extends RemoteConnectionStrategy {
|
|||
}
|
||||
}
|
||||
|
||||
List<String> getSeedNodes() {
|
||||
return configuredSeedNodes;
|
||||
}
|
||||
|
||||
int getMaxConnections() {
|
||||
return maxNumRemoteConnections;
|
||||
}
|
||||
|
||||
/* This class handles the _state response from the remote cluster when sniffing nodes to connect to */
|
||||
private class SniffClusterStateResponseHandler implements TransportResponseHandler<ClusterStateResponse> {
|
||||
|
||||
|
@ -438,7 +430,12 @@ public class SniffConnectionStrategy extends RemoteConnectionStrategy {
|
|||
// since if we do it afterwards we might fail assertions that check if all high level connections are closed.
|
||||
// from a code correctness perspective we could also close it afterwards.
|
||||
IOUtils.closeWhileHandlingException(connection);
|
||||
listener.onResponse(null);
|
||||
int openConnections = connectionManager.size();
|
||||
if (openConnections == 0) {
|
||||
listener.onFailure(new IllegalStateException("Unable to open any connections to remote cluster [" + clusterAlias + "]"));
|
||||
} else {
|
||||
listener.onResponse(null);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -324,6 +324,34 @@ public class SniffConnectionStrategyTests extends ESTestCase {
|
|||
}
|
||||
}
|
||||
|
||||
public void testConnectFailsIfNoConnectionsOpened() {
|
||||
List<DiscoveryNode> knownNodes = new CopyOnWriteArrayList<>();
|
||||
try (MockTransportService seedTransport = startTransport("seed_node", knownNodes, Version.CURRENT);
|
||||
MockTransportService closedTransport = startTransport("discoverable_node", knownNodes, Version.CURRENT)) {
|
||||
DiscoveryNode seedNode = seedTransport.getLocalNode();
|
||||
DiscoveryNode discoverableNode = closedTransport.getLocalNode();
|
||||
knownNodes.add(discoverableNode);
|
||||
closedTransport.close();
|
||||
|
||||
try (MockTransportService localService = MockTransportService.createNewService(Settings.EMPTY, Version.CURRENT, threadPool)) {
|
||||
localService.start();
|
||||
localService.acceptIncomingRequests();
|
||||
|
||||
// Predicate excludes seed node as a possible connection
|
||||
ClusterConnectionManager connectionManager = new ClusterConnectionManager(profile, localService.transport);
|
||||
try (RemoteConnectionManager remoteConnectionManager = new RemoteConnectionManager(clusterAlias, connectionManager);
|
||||
SniffConnectionStrategy strategy = new SniffConnectionStrategy(clusterAlias, localService, remoteConnectionManager,
|
||||
null, 3, n -> n.equals(seedNode) == false, seedNodes(seedNode))) {
|
||||
PlainActionFuture<Void> connectFuture = PlainActionFuture.newFuture();
|
||||
strategy.connect(connectFuture);
|
||||
final IllegalStateException ise = expectThrows(IllegalStateException.class, connectFuture::actionGet);
|
||||
assertEquals("Unable to open any connections to remote cluster [cluster-alias]", ise.getMessage());
|
||||
assertTrue(strategy.assertNoRunningConnections());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public void testClusterNameValidationPreventConnectingToDifferentClusters() throws Exception {
|
||||
List<DiscoveryNode> knownNodes = new CopyOnWriteArrayList<>();
|
||||
List<DiscoveryNode> otherKnownNodes = new CopyOnWriteArrayList<>();
|
||||
|
|
Loading…
Reference in New Issue