Reconnect remote cluster when seeds are changed (#43379)

The RemoteClusterService should close the current 
RemoteClusterConnection and should build it again if 
the seeds are changed, similarly to what is done when 
the ping interval or the compression settings are changed.

Closes #37799
This commit is contained in:
Tanguy Leroux 2019-06-20 10:06:14 +02:00
parent d684119618
commit 24cfca53fa
2 changed files with 88 additions and 1 deletions

View File

@ -233,7 +233,8 @@ public final class RemoteClusterService extends RemoteClusterAware implements Cl
remote = new RemoteClusterConnection(settings, clusterAlias, seedList, transportService, numRemoteConnections, remote = new RemoteClusterConnection(settings, clusterAlias, seedList, transportService, numRemoteConnections,
getNodePredicate(settings), proxyAddress, connectionProfile); getNodePredicate(settings), proxyAddress, connectionProfile);
remoteClusters.put(clusterAlias, remote); remoteClusters.put(clusterAlias, remote);
} else if (connectionProfileChanged(remote.getConnectionManager().getConnectionProfile(), connectionProfile)) { } else if (connectionProfileChanged(remote.getConnectionManager().getConnectionProfile(), connectionProfile)
|| seedsChanged(remote.getSeedNodes(), seedList)) {
// New ConnectionProfile. Must tear down existing connection // New ConnectionProfile. Must tear down existing connection
try { try {
IOUtils.close(remote); IOUtils.close(remote);
@ -472,6 +473,16 @@ public final class RemoteClusterService extends RemoteClusterAware implements Cl
|| Objects.equals(oldProfile.getPingInterval(), newProfile.getPingInterval()) == false; || Objects.equals(oldProfile.getPingInterval(), newProfile.getPingInterval()) == false;
} }
private boolean seedsChanged(final List<Tuple<String, Supplier<DiscoveryNode>>> oldSeedNodes,
final List<Tuple<String, Supplier<DiscoveryNode>>> newSeedNodes) {
if (oldSeedNodes.size() != newSeedNodes.size()) {
return true;
}
Set<String> oldSeeds = oldSeedNodes.stream().map(Tuple::v1).collect(Collectors.toSet());
Set<String> newSeeds = newSeedNodes.stream().map(Tuple::v1).collect(Collectors.toSet());
return oldSeeds.equals(newSeeds) == false;
}
/** /**
* Collects all nodes of the given clusters and returns / passes a (clusterAlias, nodeId) to {@link DiscoveryNode} * Collects all nodes of the given clusters and returns / passes a (clusterAlias, nodeId) to {@link DiscoveryNode}
* function on success. * function on success.

View File

@ -41,6 +41,7 @@ import org.elasticsearch.threadpool.ThreadPool;
import java.io.IOException; import java.io.IOException;
import java.net.InetAddress; import java.net.InetAddress;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collections; import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
@ -882,6 +883,81 @@ public class RemoteClusterServiceTests extends ESTestCase {
} }
} }
public void testReconnectWhenSeedsNodesAreUpdated() throws Exception {
List<DiscoveryNode> knownNodes = new CopyOnWriteArrayList<>();
try (MockTransportService cluster_node_0 = startTransport("cluster_node_0", knownNodes, Version.CURRENT);
MockTransportService cluster_node_1 = startTransport("cluster_node_1", knownNodes, Version.CURRENT)) {
final DiscoveryNode node0 = cluster_node_0.getLocalDiscoNode();
final DiscoveryNode node1 = cluster_node_1.getLocalDiscoNode();
knownNodes.add(node0);
knownNodes.add(node1);
Collections.shuffle(knownNodes, random());
try (MockTransportService transportService =
MockTransportService.createNewService(Settings.EMPTY, Version.CURRENT, threadPool, null)) {
transportService.start();
transportService.acceptIncomingRequests();
final Settings.Builder builder = Settings.builder();
builder.putList("cluster.remote.cluster_test.seeds", Collections.singletonList(node0.getAddress().toString()));
try (RemoteClusterService service = new RemoteClusterService(builder.build(), transportService)) {
assertFalse(service.isCrossClusterSearchEnabled());
service.initializeRemoteClusters();
assertTrue(service.isCrossClusterSearchEnabled());
final RemoteClusterConnection firstRemoteClusterConnection = service.getRemoteClusterConnection("cluster_test");
assertTrue(firstRemoteClusterConnection.isNodeConnected(node0));
assertTrue(firstRemoteClusterConnection.isNodeConnected(node1));
assertEquals(2, firstRemoteClusterConnection.getNumNodesConnected());
assertFalse(firstRemoteClusterConnection.isClosed());
final CountDownLatch firstLatch = new CountDownLatch(1);
service.updateRemoteCluster(
"cluster_test",
Collections.singletonList(node0.getAddress().toString()), null,
genericProfile("cluster_test"), connectionListener(firstLatch));
firstLatch.await();
assertTrue(service.isCrossClusterSearchEnabled());
assertTrue(firstRemoteClusterConnection.isNodeConnected(node0));
assertTrue(firstRemoteClusterConnection.isNodeConnected(node1));
assertEquals(2, firstRemoteClusterConnection.getNumNodesConnected());
assertFalse(firstRemoteClusterConnection.isClosed());
assertSame(firstRemoteClusterConnection, service.getRemoteClusterConnection("cluster_test"));
final List<String> newSeeds = new ArrayList<>();
newSeeds.add(node1.getAddress().toString());
if (randomBoolean()) {
newSeeds.add(node0.getAddress().toString());
Collections.shuffle(newSeeds, random());
}
final CountDownLatch secondLatch = new CountDownLatch(1);
service.updateRemoteCluster(
"cluster_test",
newSeeds, null,
genericProfile("cluster_test"), connectionListener(secondLatch));
secondLatch.await();
assertTrue(service.isCrossClusterSearchEnabled());
assertBusy(() -> {
assertFalse(firstRemoteClusterConnection.isNodeConnected(node0));
assertFalse(firstRemoteClusterConnection.isNodeConnected(node1));
assertEquals(0, firstRemoteClusterConnection.getNumNodesConnected());
assertTrue(firstRemoteClusterConnection.isClosed());
});
final RemoteClusterConnection secondRemoteClusterConnection = service.getRemoteClusterConnection("cluster_test");
assertTrue(secondRemoteClusterConnection.isNodeConnected(node0));
assertTrue(secondRemoteClusterConnection.isNodeConnected(node1));
assertEquals(2, secondRemoteClusterConnection.getNumNodesConnected());
assertFalse(secondRemoteClusterConnection.isClosed());
}
}
}
}
public void testRemoteClusterWithProxy() throws Exception { public void testRemoteClusterWithProxy() throws Exception {
List<DiscoveryNode> knownNodes = new CopyOnWriteArrayList<>(); List<DiscoveryNode> knownNodes = new CopyOnWriteArrayList<>();
try (MockTransportService cluster_1_node0 = startTransport("cluster_1_node0", knownNodes, Version.CURRENT); try (MockTransportService cluster_1_node0 = startTransport("cluster_1_node0", knownNodes, Version.CURRENT);