Add cluster name validation to RemoteClusterConnection (#25568)

This change adds validation to the RemoteClusterConnection to ensure
we always use seed nodes from the same cluster. While we still allow to use
an arbitrary cluster alias we ensure that we, once we connected to a cluster the first time,
we always check against that initial cluster name when we execute a seed node handshake.
This commit is contained in:
Simon Willnauer 2017-07-06 19:18:10 +02:00 committed by GitHub
parent dda68643b6
commit e9f6210dac
2 changed files with 68 additions and 4 deletions

View File

@ -22,6 +22,7 @@ import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.logging.log4j.util.Supplier;
import org.apache.lucene.store.AlreadyClosedException;
import org.apache.lucene.util.IOUtils;
import org.apache.lucene.util.SetOnce;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.cluster.node.info.NodeInfo;
import org.elasticsearch.action.admin.cluster.node.info.NodesInfoAction;
@ -33,6 +34,7 @@ import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsResponse
import org.elasticsearch.action.admin.cluster.state.ClusterStateAction;
import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest;
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.common.component.AbstractComponent;
@ -86,6 +88,7 @@ final class RemoteClusterConnection extends AbstractComponent implements Transpo
private final Predicate<DiscoveryNode> nodePredicate;
private volatile List<DiscoveryNode> seedNodes;
private final ConnectHandler connectHandler;
private SetOnce<ClusterName> remoteClusterName = new SetOnce<>();
/**
* Creates a new {@link RemoteClusterConnection}
@ -405,9 +408,15 @@ final class RemoteClusterConnection extends AbstractComponent implements Transpo
Transport.Connection connection = transportService.openConnection(seedNode,
ConnectionProfile.buildSingleChannelProfile(TransportRequestOptions.Type.REG, null, null));
boolean success = false;
try {
try {
handshakeNode = transportService.handshake(connection, remoteProfile.getHandshakeTimeout().millis(),
(c) -> true);
(c) -> remoteClusterName.get() == null ? true : c.equals(remoteClusterName.get()));
} catch (IllegalStateException ex) {
logger.warn((Supplier<?>) () -> new ParameterizedMessage("seed node {} cluster name mismatch expected " +
"cluster name {}", connection.getNode(), remoteClusterName.get()), ex);
throw ex;
}
if (nodePredicate.test(handshakeNode) && connectedNodes.size() < maxNumRemoteConnections) {
transportService.connectToNode(handshakeNode, remoteProfile);
connectedNodes.add(handshakeNode);
@ -501,6 +510,10 @@ final class RemoteClusterConnection extends AbstractComponent implements Transpo
public void handleResponse(ClusterStateResponse response) {
assert transportService.getThreadPool().getThreadContext().isSystemContext() == false : "context is a system context";
try {
if (remoteClusterName.get() == null) {
assert response.getClusterName().value() != null;
remoteClusterName.set(response.getClusterName());
}
try (Closeable theConnection = connection) { // the connection is unused - see comment in #collectRemoteNodes
// we have to close this connection before we notify listeners - this is mainly needed for test correctness
// since if we do it afterwards we might fail assertions that check if all high level connections are closed.

View File

@ -56,6 +56,7 @@ import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.transport.MockTransportService;
import org.elasticsearch.threadpool.TestThreadPool;
import org.elasticsearch.threadpool.ThreadPool;
import org.hamcrest.Matchers;
import java.io.IOException;
import java.net.InetAddress;
@ -106,6 +107,7 @@ public class RemoteClusterConnectionTests extends ESTestCase {
final Settings settings) {
boolean success = false;
final Settings s = Settings.builder().put(settings).put("node.name", id).build();
ClusterName clusterName = ClusterName.CLUSTER_NAME_SETTING.get(s);
MockTransportService newService = MockTransportService.createNewService(s, version, threadPool, null);
try {
newService.registerRequestHandler(ClusterSearchShardsAction.NAME, ClusterSearchShardsRequest::new, ThreadPool.Names.SAME,
@ -119,8 +121,8 @@ public class RemoteClusterConnectionTests extends ESTestCase {
for (DiscoveryNode node : knownNodes) {
builder.add(node);
}
ClusterState build = ClusterState.builder(ClusterName.DEFAULT).nodes(builder.build()).build();
channel.sendResponse(new ClusterStateResponse(ClusterName.DEFAULT, build, 0L));
ClusterState build = ClusterState.builder(clusterName).nodes(builder.build()).build();
channel.sendResponse(new ClusterStateResponse(clusterName, build, 0L));
});
newService.start();
newService.acceptIncomingRequests();
@ -906,4 +908,53 @@ public class RemoteClusterConnectionTests extends ESTestCase {
}
}
public void testClusterNameIsChecked() throws Exception {
List<DiscoveryNode> knownNodes = new CopyOnWriteArrayList<>();
List<DiscoveryNode> otherClusterKnownNodes = new CopyOnWriteArrayList<>();
Settings settings = Settings.builder().put("cluster.name", "testClusterNameIsChecked").build();
try (MockTransportService seedTransport = startTransport("seed_node", knownNodes, Version.CURRENT, threadPool, settings);
MockTransportService discoverableTransport = startTransport("discoverable_node", knownNodes, Version.CURRENT, threadPool,
settings);
MockTransportService otherClusterTransport = startTransport("other_cluster_discoverable_node", otherClusterKnownNodes,
Version.CURRENT, threadPool, Settings.builder().put("cluster.name", "otherCluster").build());
MockTransportService otherClusterDiscoverable= startTransport("other_cluster_discoverable_node", otherClusterKnownNodes,
Version.CURRENT, threadPool, Settings.builder().put("cluster.name", "otherCluster").build())) {
DiscoveryNode seedNode = seedTransport.getLocalDiscoNode();
DiscoveryNode discoverableNode = discoverableTransport.getLocalDiscoNode();
knownNodes.add(seedTransport.getLocalDiscoNode());
knownNodes.add(discoverableTransport.getLocalDiscoNode());
otherClusterKnownNodes.add(otherClusterDiscoverable.getLocalDiscoNode());
otherClusterKnownNodes.add(otherClusterTransport.getLocalDiscoNode());
Collections.shuffle(knownNodes, random());
try (MockTransportService service = MockTransportService.createNewService(Settings.EMPTY, Version.CURRENT, threadPool, null)) {
service.start();
service.acceptIncomingRequests();
try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster",
Arrays.asList(seedNode), service, Integer.MAX_VALUE, n -> true)) {
updateSeedNodes(connection, Arrays.asList(seedNode));
assertTrue(service.nodeConnected(seedNode));
assertTrue(service.nodeConnected(discoverableNode));
assertTrue(connection.assertNoRunningConnections());
List<DiscoveryNode> discoveryNodes = Arrays.asList(otherClusterTransport.getLocalDiscoNode(), seedNode);
Collections.shuffle(discoveryNodes, random());
updateSeedNodes(connection, discoveryNodes);
assertTrue(service.nodeConnected(seedNode));
for (DiscoveryNode otherClusterNode : otherClusterKnownNodes) {
assertFalse(service.nodeConnected(otherClusterNode));
}
assertFalse(service.nodeConnected(otherClusterTransport.getLocalDiscoNode()));
assertTrue(service.nodeConnected(discoverableNode));
assertTrue(connection.assertNoRunningConnections());
IllegalStateException illegalStateException = expectThrows(IllegalStateException.class, () ->
updateSeedNodes(connection, Arrays.asList(otherClusterTransport.getLocalDiscoNode())));
assertThat(illegalStateException.getMessage(),
Matchers.startsWith("handshake failed, mismatched cluster name [Cluster [otherCluster]]" +
" - {other_cluster_discoverable_node}"));
}
}
}
}
}