From c9b0816b2983915413faa068ea95eb08c7990854 Mon Sep 17 00:00:00 2001 From: Boaz Leskes Date: Tue, 8 Jul 2014 14:22:39 +0200 Subject: [PATCH] [Discovery] verify connect when sending a rejoin cluster request When a master receives a cluster state from another node, it compares the local cluster state with the one it got. If the local one has a higher version, it sends a JoinClusterRequest to the other master to tell it step down. Because our network layer is asymmetric, we need to make sure we're connected before sending. Closes #6779 --- .../discovery/zen/ZenDiscovery.java | 24 +++++++++++++------ 1 file changed, 17 insertions(+), 7 deletions(-) diff --git a/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java b/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java index deb34f3523d..a16d6212863 100644 --- a/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java +++ b/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java @@ -554,7 +554,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent implemen return; } if (master) { - logger.debug("received cluster state from [{}] which is also master but with cluster name [{}]", newClusterState.nodes().masterNode(), incomingClusterName); + logger.debug("received cluster state from [{}] which is also master with cluster name [{}]", newClusterState.nodes().masterNode(), incomingClusterName); final ClusterState newState = newClusterState; clusterService.submitStateUpdateTask("zen-disco-master_receive_cluster_state_from_another_master [" + newState.nodes().masterNode() + "]", Priority.URGENT, new ProcessedClusterStateUpdateTask() { @Override @@ -564,12 +564,22 @@ public class ZenDiscovery extends AbstractLifecycleComponent implemen return rejoin(currentState, "zen-disco-master_receive_cluster_state_from_another_master [" + newState.nodes().masterNode() + "]"); } else { logger.warn("received cluster state from [{}] which is also master but with an older cluster_state, telling [{}] to rejoin the cluster", newState.nodes().masterNode(), newState.nodes().masterNode()); - transportService.sendRequest(newState.nodes().masterNode(), RejoinClusterRequestHandler.ACTION, new RejoinClusterRequest(currentState.nodes().localNodeId()), new EmptyTransportResponseHandler(ThreadPool.Names.SAME) { - @Override - public void handleException(TransportException exp) { - logger.warn("failed to send rejoin request to [{}]", exp, newState.nodes().masterNode()); - } - }); + + try { + // make sure we're connected to this node (connect to node does nothing if we're already connected) + // since the network connections are asymmetric, it may be that we received a state but have disconnected from the node + // in the past (after a master failure, for example) + transportService.connectToNode(newState.nodes().masterNode()); + transportService.sendRequest(newState.nodes().masterNode(), RejoinClusterRequestHandler.ACTION, new RejoinClusterRequest(currentState.nodes().localNodeId()), new EmptyTransportResponseHandler(ThreadPool.Names.SAME) { + @Override + public void handleException(TransportException exp) { + logger.warn("failed to send rejoin request to [{}]", exp, newState.nodes().masterNode()); + } + }); + } catch (Exception e) { + logger.warn("failed to send rejoin request to [{}]", e, newState.nodes().masterNode()); + } + return currentState; } }