From e9f6210dacaf3faf62c14860172a345b858d9041 Mon Sep 17 00:00:00 2001 From: Simon Willnauer Date: Thu, 6 Jul 2017 19:18:10 +0200 Subject: [PATCH] Add cluster name validation to RemoteClusterConnection (#25568) This change adds validation to the RemoteClusterConnection to ensure we always use seed nodes from the same cluster. While we still allow to use an arbitrary cluster alias we ensure that we, once we connected to a cluster the first time, we always check against that initial cluster name when we execute a seed node handshake. --- .../transport/RemoteClusterConnection.java | 17 +++++- .../RemoteClusterConnectionTests.java | 55 ++++++++++++++++++- 2 files changed, 68 insertions(+), 4 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java b/core/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java index af8ecdbf535..39fb515984f 100644 --- a/core/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java +++ b/core/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java @@ -22,6 +22,7 @@ import org.apache.logging.log4j.message.ParameterizedMessage; import org.apache.logging.log4j.util.Supplier; import org.apache.lucene.store.AlreadyClosedException; import org.apache.lucene.util.IOUtils; +import org.apache.lucene.util.SetOnce; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.admin.cluster.node.info.NodeInfo; import org.elasticsearch.action.admin.cluster.node.info.NodesInfoAction; @@ -33,6 +34,7 @@ import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsResponse import org.elasticsearch.action.admin.cluster.state.ClusterStateAction; import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest; import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse; +import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.common.component.AbstractComponent; @@ -86,6 +88,7 @@ final class RemoteClusterConnection extends AbstractComponent implements Transpo private final Predicate nodePredicate; private volatile List seedNodes; private final ConnectHandler connectHandler; + private SetOnce remoteClusterName = new SetOnce<>(); /** * Creates a new {@link RemoteClusterConnection} @@ -406,8 +409,14 @@ final class RemoteClusterConnection extends AbstractComponent implements Transpo ConnectionProfile.buildSingleChannelProfile(TransportRequestOptions.Type.REG, null, null)); boolean success = false; try { - handshakeNode = transportService.handshake(connection, remoteProfile.getHandshakeTimeout().millis(), - (c) -> true); + try { + handshakeNode = transportService.handshake(connection, remoteProfile.getHandshakeTimeout().millis(), + (c) -> remoteClusterName.get() == null ? true : c.equals(remoteClusterName.get())); + } catch (IllegalStateException ex) { + logger.warn((Supplier) () -> new ParameterizedMessage("seed node {} cluster name mismatch expected " + + "cluster name {}", connection.getNode(), remoteClusterName.get()), ex); + throw ex; + } if (nodePredicate.test(handshakeNode) && connectedNodes.size() < maxNumRemoteConnections) { transportService.connectToNode(handshakeNode, remoteProfile); connectedNodes.add(handshakeNode); @@ -501,6 +510,10 @@ final class RemoteClusterConnection extends AbstractComponent implements Transpo public void handleResponse(ClusterStateResponse response) { assert transportService.getThreadPool().getThreadContext().isSystemContext() == false : "context is a system context"; try { + if (remoteClusterName.get() == null) { + assert response.getClusterName().value() != null; + remoteClusterName.set(response.getClusterName()); + } try (Closeable theConnection = connection) { // the connection is unused - see comment in #collectRemoteNodes // we have to close this connection before we notify listeners - this is mainly needed for test correctness // since if we do it afterwards we might fail assertions that check if all high level connections are closed. diff --git a/core/src/test/java/org/elasticsearch/transport/RemoteClusterConnectionTests.java b/core/src/test/java/org/elasticsearch/transport/RemoteClusterConnectionTests.java index c872c4a39be..d70032ca065 100644 --- a/core/src/test/java/org/elasticsearch/transport/RemoteClusterConnectionTests.java +++ b/core/src/test/java/org/elasticsearch/transport/RemoteClusterConnectionTests.java @@ -56,6 +56,7 @@ import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.transport.MockTransportService; import org.elasticsearch.threadpool.TestThreadPool; import org.elasticsearch.threadpool.ThreadPool; +import org.hamcrest.Matchers; import java.io.IOException; import java.net.InetAddress; @@ -106,6 +107,7 @@ public class RemoteClusterConnectionTests extends ESTestCase { final Settings settings) { boolean success = false; final Settings s = Settings.builder().put(settings).put("node.name", id).build(); + ClusterName clusterName = ClusterName.CLUSTER_NAME_SETTING.get(s); MockTransportService newService = MockTransportService.createNewService(s, version, threadPool, null); try { newService.registerRequestHandler(ClusterSearchShardsAction.NAME, ClusterSearchShardsRequest::new, ThreadPool.Names.SAME, @@ -119,8 +121,8 @@ public class RemoteClusterConnectionTests extends ESTestCase { for (DiscoveryNode node : knownNodes) { builder.add(node); } - ClusterState build = ClusterState.builder(ClusterName.DEFAULT).nodes(builder.build()).build(); - channel.sendResponse(new ClusterStateResponse(ClusterName.DEFAULT, build, 0L)); + ClusterState build = ClusterState.builder(clusterName).nodes(builder.build()).build(); + channel.sendResponse(new ClusterStateResponse(clusterName, build, 0L)); }); newService.start(); newService.acceptIncomingRequests(); @@ -906,4 +908,53 @@ public class RemoteClusterConnectionTests extends ESTestCase { } } + public void testClusterNameIsChecked() throws Exception { + List knownNodes = new CopyOnWriteArrayList<>(); + List otherClusterKnownNodes = new CopyOnWriteArrayList<>(); + + Settings settings = Settings.builder().put("cluster.name", "testClusterNameIsChecked").build(); + try (MockTransportService seedTransport = startTransport("seed_node", knownNodes, Version.CURRENT, threadPool, settings); + MockTransportService discoverableTransport = startTransport("discoverable_node", knownNodes, Version.CURRENT, threadPool, + settings); + MockTransportService otherClusterTransport = startTransport("other_cluster_discoverable_node", otherClusterKnownNodes, + Version.CURRENT, threadPool, Settings.builder().put("cluster.name", "otherCluster").build()); + MockTransportService otherClusterDiscoverable= startTransport("other_cluster_discoverable_node", otherClusterKnownNodes, + Version.CURRENT, threadPool, Settings.builder().put("cluster.name", "otherCluster").build())) { + DiscoveryNode seedNode = seedTransport.getLocalDiscoNode(); + DiscoveryNode discoverableNode = discoverableTransport.getLocalDiscoNode(); + knownNodes.add(seedTransport.getLocalDiscoNode()); + knownNodes.add(discoverableTransport.getLocalDiscoNode()); + otherClusterKnownNodes.add(otherClusterDiscoverable.getLocalDiscoNode()); + otherClusterKnownNodes.add(otherClusterTransport.getLocalDiscoNode()); + Collections.shuffle(knownNodes, random()); + + try (MockTransportService service = MockTransportService.createNewService(Settings.EMPTY, Version.CURRENT, threadPool, null)) { + service.start(); + service.acceptIncomingRequests(); + try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster", + Arrays.asList(seedNode), service, Integer.MAX_VALUE, n -> true)) { + updateSeedNodes(connection, Arrays.asList(seedNode)); + assertTrue(service.nodeConnected(seedNode)); + assertTrue(service.nodeConnected(discoverableNode)); + assertTrue(connection.assertNoRunningConnections()); + List discoveryNodes = Arrays.asList(otherClusterTransport.getLocalDiscoNode(), seedNode); + Collections.shuffle(discoveryNodes, random()); + updateSeedNodes(connection, discoveryNodes); + assertTrue(service.nodeConnected(seedNode)); + for (DiscoveryNode otherClusterNode : otherClusterKnownNodes) { + assertFalse(service.nodeConnected(otherClusterNode)); + } + assertFalse(service.nodeConnected(otherClusterTransport.getLocalDiscoNode())); + assertTrue(service.nodeConnected(discoverableNode)); + assertTrue(connection.assertNoRunningConnections()); + IllegalStateException illegalStateException = expectThrows(IllegalStateException.class, () -> + updateSeedNodes(connection, Arrays.asList(otherClusterTransport.getLocalDiscoNode()))); + assertThat(illegalStateException.getMessage(), + Matchers.startsWith("handshake failed, mismatched cluster name [Cluster [otherCluster]]" + + " - {other_cluster_discoverable_node}")); + } + } + } + } + }