From 24cfca53fa71dc63700d6e83336095b28a0e9c72 Mon Sep 17 00:00:00 2001 From: Tanguy Leroux Date: Thu, 20 Jun 2019 10:06:14 +0200 Subject: [PATCH] Reconnect remote cluster when seeds are changed (#43379) The RemoteClusterService should close the current RemoteClusterConnection and should build it again if the seeds are changed, similarly to what is done when the ping interval or the compression settings are changed. Closes #37799 --- .../transport/RemoteClusterService.java | 13 +++- .../transport/RemoteClusterServiceTests.java | 76 +++++++++++++++++++ 2 files changed, 88 insertions(+), 1 deletion(-) diff --git a/server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java b/server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java index dbd9cca5271..ccf8876318f 100644 --- a/server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java +++ b/server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java @@ -233,7 +233,8 @@ public final class RemoteClusterService extends RemoteClusterAware implements Cl remote = new RemoteClusterConnection(settings, clusterAlias, seedList, transportService, numRemoteConnections, getNodePredicate(settings), proxyAddress, connectionProfile); remoteClusters.put(clusterAlias, remote); - } else if (connectionProfileChanged(remote.getConnectionManager().getConnectionProfile(), connectionProfile)) { + } else if (connectionProfileChanged(remote.getConnectionManager().getConnectionProfile(), connectionProfile) + || seedsChanged(remote.getSeedNodes(), seedList)) { // New ConnectionProfile. Must tear down existing connection try { IOUtils.close(remote); @@ -472,6 +473,16 @@ public final class RemoteClusterService extends RemoteClusterAware implements Cl || Objects.equals(oldProfile.getPingInterval(), newProfile.getPingInterval()) == false; } + private boolean seedsChanged(final List>> oldSeedNodes, + final List>> newSeedNodes) { + if (oldSeedNodes.size() != newSeedNodes.size()) { + return true; + } + Set oldSeeds = oldSeedNodes.stream().map(Tuple::v1).collect(Collectors.toSet()); + Set newSeeds = newSeedNodes.stream().map(Tuple::v1).collect(Collectors.toSet()); + return oldSeeds.equals(newSeeds) == false; + } + /** * Collects all nodes of the given clusters and returns / passes a (clusterAlias, nodeId) to {@link DiscoveryNode} * function on success. diff --git a/server/src/test/java/org/elasticsearch/transport/RemoteClusterServiceTests.java b/server/src/test/java/org/elasticsearch/transport/RemoteClusterServiceTests.java index 200df75cc6c..41695872f03 100644 --- a/server/src/test/java/org/elasticsearch/transport/RemoteClusterServiceTests.java +++ b/server/src/test/java/org/elasticsearch/transport/RemoteClusterServiceTests.java @@ -41,6 +41,7 @@ import org.elasticsearch.threadpool.ThreadPool; import java.io.IOException; import java.net.InetAddress; import java.net.InetSocketAddress; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.HashMap; @@ -882,6 +883,81 @@ public class RemoteClusterServiceTests extends ESTestCase { } } + public void testReconnectWhenSeedsNodesAreUpdated() throws Exception { + List knownNodes = new CopyOnWriteArrayList<>(); + try (MockTransportService cluster_node_0 = startTransport("cluster_node_0", knownNodes, Version.CURRENT); + MockTransportService cluster_node_1 = startTransport("cluster_node_1", knownNodes, Version.CURRENT)) { + + final DiscoveryNode node0 = cluster_node_0.getLocalDiscoNode(); + final DiscoveryNode node1 = cluster_node_1.getLocalDiscoNode(); + knownNodes.add(node0); + knownNodes.add(node1); + Collections.shuffle(knownNodes, random()); + + try (MockTransportService transportService = + MockTransportService.createNewService(Settings.EMPTY, Version.CURRENT, threadPool, null)) { + transportService.start(); + transportService.acceptIncomingRequests(); + + final Settings.Builder builder = Settings.builder(); + builder.putList("cluster.remote.cluster_test.seeds", Collections.singletonList(node0.getAddress().toString())); + try (RemoteClusterService service = new RemoteClusterService(builder.build(), transportService)) { + assertFalse(service.isCrossClusterSearchEnabled()); + service.initializeRemoteClusters(); + assertTrue(service.isCrossClusterSearchEnabled()); + + final RemoteClusterConnection firstRemoteClusterConnection = service.getRemoteClusterConnection("cluster_test"); + assertTrue(firstRemoteClusterConnection.isNodeConnected(node0)); + assertTrue(firstRemoteClusterConnection.isNodeConnected(node1)); + assertEquals(2, firstRemoteClusterConnection.getNumNodesConnected()); + assertFalse(firstRemoteClusterConnection.isClosed()); + + final CountDownLatch firstLatch = new CountDownLatch(1); + service.updateRemoteCluster( + "cluster_test", + Collections.singletonList(node0.getAddress().toString()), null, + genericProfile("cluster_test"), connectionListener(firstLatch)); + firstLatch.await(); + + assertTrue(service.isCrossClusterSearchEnabled()); + assertTrue(firstRemoteClusterConnection.isNodeConnected(node0)); + assertTrue(firstRemoteClusterConnection.isNodeConnected(node1)); + assertEquals(2, firstRemoteClusterConnection.getNumNodesConnected()); + assertFalse(firstRemoteClusterConnection.isClosed()); + assertSame(firstRemoteClusterConnection, service.getRemoteClusterConnection("cluster_test")); + + final List newSeeds = new ArrayList<>(); + newSeeds.add(node1.getAddress().toString()); + if (randomBoolean()) { + newSeeds.add(node0.getAddress().toString()); + Collections.shuffle(newSeeds, random()); + } + + final CountDownLatch secondLatch = new CountDownLatch(1); + service.updateRemoteCluster( + "cluster_test", + newSeeds, null, + genericProfile("cluster_test"), connectionListener(secondLatch)); + secondLatch.await(); + + assertTrue(service.isCrossClusterSearchEnabled()); + assertBusy(() -> { + assertFalse(firstRemoteClusterConnection.isNodeConnected(node0)); + assertFalse(firstRemoteClusterConnection.isNodeConnected(node1)); + assertEquals(0, firstRemoteClusterConnection.getNumNodesConnected()); + assertTrue(firstRemoteClusterConnection.isClosed()); + }); + + final RemoteClusterConnection secondRemoteClusterConnection = service.getRemoteClusterConnection("cluster_test"); + assertTrue(secondRemoteClusterConnection.isNodeConnected(node0)); + assertTrue(secondRemoteClusterConnection.isNodeConnected(node1)); + assertEquals(2, secondRemoteClusterConnection.getNumNodesConnected()); + assertFalse(secondRemoteClusterConnection.isClosed()); + } + } + } + } + public void testRemoteClusterWithProxy() throws Exception { List knownNodes = new CopyOnWriteArrayList<>(); try (MockTransportService cluster_1_node0 = startTransport("cluster_1_node0", knownNodes, Version.CURRENT);