diff --git a/core/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java b/core/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java index af8ecdbf535..39fb515984f 100644 --- a/core/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java +++ b/core/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java @@ -22,6 +22,7 @@ import org.apache.logging.log4j.message.ParameterizedMessage; import org.apache.logging.log4j.util.Supplier; import org.apache.lucene.store.AlreadyClosedException; import org.apache.lucene.util.IOUtils; +import org.apache.lucene.util.SetOnce; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.admin.cluster.node.info.NodeInfo; import org.elasticsearch.action.admin.cluster.node.info.NodesInfoAction; @@ -33,6 +34,7 @@ import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsResponse import org.elasticsearch.action.admin.cluster.state.ClusterStateAction; import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest; import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse; +import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.common.component.AbstractComponent; @@ -86,6 +88,7 @@ final class RemoteClusterConnection extends AbstractComponent implements Transpo private final Predicate nodePredicate; private volatile List seedNodes; private final ConnectHandler connectHandler; + private SetOnce remoteClusterName = new SetOnce<>(); /** * Creates a new {@link RemoteClusterConnection} @@ -406,8 +409,14 @@ final class RemoteClusterConnection extends AbstractComponent implements Transpo ConnectionProfile.buildSingleChannelProfile(TransportRequestOptions.Type.REG, null, null)); boolean success = false; try { - handshakeNode = transportService.handshake(connection, remoteProfile.getHandshakeTimeout().millis(), - (c) -> true); + try { + handshakeNode = transportService.handshake(connection, remoteProfile.getHandshakeTimeout().millis(), + (c) -> remoteClusterName.get() == null ? true : c.equals(remoteClusterName.get())); + } catch (IllegalStateException ex) { + logger.warn((Supplier) () -> new ParameterizedMessage("seed node {} cluster name mismatch expected " + + "cluster name {}", connection.getNode(), remoteClusterName.get()), ex); + throw ex; + } if (nodePredicate.test(handshakeNode) && connectedNodes.size() < maxNumRemoteConnections) { transportService.connectToNode(handshakeNode, remoteProfile); connectedNodes.add(handshakeNode); @@ -501,6 +510,10 @@ final class RemoteClusterConnection extends AbstractComponent implements Transpo public void handleResponse(ClusterStateResponse response) { assert transportService.getThreadPool().getThreadContext().isSystemContext() == false : "context is a system context"; try { + if (remoteClusterName.get() == null) { + assert response.getClusterName().value() != null; + remoteClusterName.set(response.getClusterName()); + } try (Closeable theConnection = connection) { // the connection is unused - see comment in #collectRemoteNodes // we have to close this connection before we notify listeners - this is mainly needed for test correctness // since if we do it afterwards we might fail assertions that check if all high level connections are closed. diff --git a/core/src/test/java/org/elasticsearch/transport/RemoteClusterConnectionTests.java b/core/src/test/java/org/elasticsearch/transport/RemoteClusterConnectionTests.java index c872c4a39be..d70032ca065 100644 --- a/core/src/test/java/org/elasticsearch/transport/RemoteClusterConnectionTests.java +++ b/core/src/test/java/org/elasticsearch/transport/RemoteClusterConnectionTests.java @@ -56,6 +56,7 @@ import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.transport.MockTransportService; import org.elasticsearch.threadpool.TestThreadPool; import org.elasticsearch.threadpool.ThreadPool; +import org.hamcrest.Matchers; import java.io.IOException; import java.net.InetAddress; @@ -106,6 +107,7 @@ public class RemoteClusterConnectionTests extends ESTestCase { final Settings settings) { boolean success = false; final Settings s = Settings.builder().put(settings).put("node.name", id).build(); + ClusterName clusterName = ClusterName.CLUSTER_NAME_SETTING.get(s); MockTransportService newService = MockTransportService.createNewService(s, version, threadPool, null); try { newService.registerRequestHandler(ClusterSearchShardsAction.NAME, ClusterSearchShardsRequest::new, ThreadPool.Names.SAME, @@ -119,8 +121,8 @@ public class RemoteClusterConnectionTests extends ESTestCase { for (DiscoveryNode node : knownNodes) { builder.add(node); } - ClusterState build = ClusterState.builder(ClusterName.DEFAULT).nodes(builder.build()).build(); - channel.sendResponse(new ClusterStateResponse(ClusterName.DEFAULT, build, 0L)); + ClusterState build = ClusterState.builder(clusterName).nodes(builder.build()).build(); + channel.sendResponse(new ClusterStateResponse(clusterName, build, 0L)); }); newService.start(); newService.acceptIncomingRequests(); @@ -906,4 +908,53 @@ public class RemoteClusterConnectionTests extends ESTestCase { } } + public void testClusterNameIsChecked() throws Exception { + List knownNodes = new CopyOnWriteArrayList<>(); + List otherClusterKnownNodes = new CopyOnWriteArrayList<>(); + + Settings settings = Settings.builder().put("cluster.name", "testClusterNameIsChecked").build(); + try (MockTransportService seedTransport = startTransport("seed_node", knownNodes, Version.CURRENT, threadPool, settings); + MockTransportService discoverableTransport = startTransport("discoverable_node", knownNodes, Version.CURRENT, threadPool, + settings); + MockTransportService otherClusterTransport = startTransport("other_cluster_discoverable_node", otherClusterKnownNodes, + Version.CURRENT, threadPool, Settings.builder().put("cluster.name", "otherCluster").build()); + MockTransportService otherClusterDiscoverable= startTransport("other_cluster_discoverable_node", otherClusterKnownNodes, + Version.CURRENT, threadPool, Settings.builder().put("cluster.name", "otherCluster").build())) { + DiscoveryNode seedNode = seedTransport.getLocalDiscoNode(); + DiscoveryNode discoverableNode = discoverableTransport.getLocalDiscoNode(); + knownNodes.add(seedTransport.getLocalDiscoNode()); + knownNodes.add(discoverableTransport.getLocalDiscoNode()); + otherClusterKnownNodes.add(otherClusterDiscoverable.getLocalDiscoNode()); + otherClusterKnownNodes.add(otherClusterTransport.getLocalDiscoNode()); + Collections.shuffle(knownNodes, random()); + + try (MockTransportService service = MockTransportService.createNewService(Settings.EMPTY, Version.CURRENT, threadPool, null)) { + service.start(); + service.acceptIncomingRequests(); + try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster", + Arrays.asList(seedNode), service, Integer.MAX_VALUE, n -> true)) { + updateSeedNodes(connection, Arrays.asList(seedNode)); + assertTrue(service.nodeConnected(seedNode)); + assertTrue(service.nodeConnected(discoverableNode)); + assertTrue(connection.assertNoRunningConnections()); + List discoveryNodes = Arrays.asList(otherClusterTransport.getLocalDiscoNode(), seedNode); + Collections.shuffle(discoveryNodes, random()); + updateSeedNodes(connection, discoveryNodes); + assertTrue(service.nodeConnected(seedNode)); + for (DiscoveryNode otherClusterNode : otherClusterKnownNodes) { + assertFalse(service.nodeConnected(otherClusterNode)); + } + assertFalse(service.nodeConnected(otherClusterTransport.getLocalDiscoNode())); + assertTrue(service.nodeConnected(discoverableNode)); + assertTrue(connection.assertNoRunningConnections()); + IllegalStateException illegalStateException = expectThrows(IllegalStateException.class, () -> + updateSeedNodes(connection, Arrays.asList(otherClusterTransport.getLocalDiscoNode()))); + assertThat(illegalStateException.getMessage(), + Matchers.startsWith("handshake failed, mismatched cluster name [Cluster [otherCluster]]" + + " - {other_cluster_discoverable_node}")); + } + } + } + } + }