diff --git a/server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java b/server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java index dbd9cca5271..ccf8876318f 100644 --- a/server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java +++ b/server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java @@ -233,7 +233,8 @@ public final class RemoteClusterService extends RemoteClusterAware implements Cl remote = new RemoteClusterConnection(settings, clusterAlias, seedList, transportService, numRemoteConnections, getNodePredicate(settings), proxyAddress, connectionProfile); remoteClusters.put(clusterAlias, remote); - } else if (connectionProfileChanged(remote.getConnectionManager().getConnectionProfile(), connectionProfile)) { + } else if (connectionProfileChanged(remote.getConnectionManager().getConnectionProfile(), connectionProfile) + || seedsChanged(remote.getSeedNodes(), seedList)) { // New ConnectionProfile. Must tear down existing connection try { IOUtils.close(remote); @@ -472,6 +473,16 @@ public final class RemoteClusterService extends RemoteClusterAware implements Cl || Objects.equals(oldProfile.getPingInterval(), newProfile.getPingInterval()) == false; } + private boolean seedsChanged(final List>> oldSeedNodes, + final List>> newSeedNodes) { + if (oldSeedNodes.size() != newSeedNodes.size()) { + return true; + } + Set oldSeeds = oldSeedNodes.stream().map(Tuple::v1).collect(Collectors.toSet()); + Set newSeeds = newSeedNodes.stream().map(Tuple::v1).collect(Collectors.toSet()); + return oldSeeds.equals(newSeeds) == false; + } + /** * Collects all nodes of the given clusters and returns / passes a (clusterAlias, nodeId) to {@link DiscoveryNode} * function on success. diff --git a/server/src/test/java/org/elasticsearch/transport/RemoteClusterServiceTests.java b/server/src/test/java/org/elasticsearch/transport/RemoteClusterServiceTests.java index 200df75cc6c..41695872f03 100644 --- a/server/src/test/java/org/elasticsearch/transport/RemoteClusterServiceTests.java +++ b/server/src/test/java/org/elasticsearch/transport/RemoteClusterServiceTests.java @@ -41,6 +41,7 @@ import org.elasticsearch.threadpool.ThreadPool; import java.io.IOException; import java.net.InetAddress; import java.net.InetSocketAddress; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.HashMap; @@ -882,6 +883,81 @@ public class RemoteClusterServiceTests extends ESTestCase { } } + public void testReconnectWhenSeedsNodesAreUpdated() throws Exception { + List knownNodes = new CopyOnWriteArrayList<>(); + try (MockTransportService cluster_node_0 = startTransport("cluster_node_0", knownNodes, Version.CURRENT); + MockTransportService cluster_node_1 = startTransport("cluster_node_1", knownNodes, Version.CURRENT)) { + + final DiscoveryNode node0 = cluster_node_0.getLocalDiscoNode(); + final DiscoveryNode node1 = cluster_node_1.getLocalDiscoNode(); + knownNodes.add(node0); + knownNodes.add(node1); + Collections.shuffle(knownNodes, random()); + + try (MockTransportService transportService = + MockTransportService.createNewService(Settings.EMPTY, Version.CURRENT, threadPool, null)) { + transportService.start(); + transportService.acceptIncomingRequests(); + + final Settings.Builder builder = Settings.builder(); + builder.putList("cluster.remote.cluster_test.seeds", Collections.singletonList(node0.getAddress().toString())); + try (RemoteClusterService service = new RemoteClusterService(builder.build(), transportService)) { + assertFalse(service.isCrossClusterSearchEnabled()); + service.initializeRemoteClusters(); + assertTrue(service.isCrossClusterSearchEnabled()); + + final RemoteClusterConnection firstRemoteClusterConnection = service.getRemoteClusterConnection("cluster_test"); + assertTrue(firstRemoteClusterConnection.isNodeConnected(node0)); + assertTrue(firstRemoteClusterConnection.isNodeConnected(node1)); + assertEquals(2, firstRemoteClusterConnection.getNumNodesConnected()); + assertFalse(firstRemoteClusterConnection.isClosed()); + + final CountDownLatch firstLatch = new CountDownLatch(1); + service.updateRemoteCluster( + "cluster_test", + Collections.singletonList(node0.getAddress().toString()), null, + genericProfile("cluster_test"), connectionListener(firstLatch)); + firstLatch.await(); + + assertTrue(service.isCrossClusterSearchEnabled()); + assertTrue(firstRemoteClusterConnection.isNodeConnected(node0)); + assertTrue(firstRemoteClusterConnection.isNodeConnected(node1)); + assertEquals(2, firstRemoteClusterConnection.getNumNodesConnected()); + assertFalse(firstRemoteClusterConnection.isClosed()); + assertSame(firstRemoteClusterConnection, service.getRemoteClusterConnection("cluster_test")); + + final List newSeeds = new ArrayList<>(); + newSeeds.add(node1.getAddress().toString()); + if (randomBoolean()) { + newSeeds.add(node0.getAddress().toString()); + Collections.shuffle(newSeeds, random()); + } + + final CountDownLatch secondLatch = new CountDownLatch(1); + service.updateRemoteCluster( + "cluster_test", + newSeeds, null, + genericProfile("cluster_test"), connectionListener(secondLatch)); + secondLatch.await(); + + assertTrue(service.isCrossClusterSearchEnabled()); + assertBusy(() -> { + assertFalse(firstRemoteClusterConnection.isNodeConnected(node0)); + assertFalse(firstRemoteClusterConnection.isNodeConnected(node1)); + assertEquals(0, firstRemoteClusterConnection.getNumNodesConnected()); + assertTrue(firstRemoteClusterConnection.isClosed()); + }); + + final RemoteClusterConnection secondRemoteClusterConnection = service.getRemoteClusterConnection("cluster_test"); + assertTrue(secondRemoteClusterConnection.isNodeConnected(node0)); + assertTrue(secondRemoteClusterConnection.isNodeConnected(node1)); + assertEquals(2, secondRemoteClusterConnection.getNumNodesConnected()); + assertFalse(secondRemoteClusterConnection.isClosed()); + } + } + } + } + public void testRemoteClusterWithProxy() throws Exception { List knownNodes = new CopyOnWriteArrayList<>(); try (MockTransportService cluster_1_node0 = startTransport("cluster_1_node0", knownNodes, Version.CURRENT);