From cccd060a0c1e052b261a15b4edf671b10e13ee1d Mon Sep 17 00:00:00 2001 From: Boaz Leskes Date: Wed, 23 Jul 2014 01:03:18 +0300 Subject: [PATCH] [Discovery] verify we have a master after a successful join request After master election, nodes send join requests to the elected master. The master is then responsible for publishing a new cluster state, which sets the master on the local node's cluster state. If something goes wrong with the cluster state publishing, this process will not complete successfully. We should check that a master is set after the join request returns and, if it is not, retry pinging. Closes #6969 --- .../discovery/zen/ZenDiscovery.java | 6 ++ .../DiscoveryWithNetworkFailuresTests.java | 64 ++++++++++++++++++- .../test/transport/MockTransportService.java | 11 +++- 3 files changed, 77 insertions(+), 4 deletions(-) diff --git a/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java b/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java index f1621484df9..fa40467da81 100644 --- a/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java +++ b/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java @@ -364,6 +364,12 @@ public class ZenDiscovery extends AbstractLifecycleComponent implemen continue; } + if (latestDiscoNodes.masterNode() == null) { + logger.debug("no master node is set, despite of join request completing. 
retrying pings"); + retry = true; + continue; + } + masterFD.start(masterNode, "initial_join"); // no need to submit the received cluster state, we will get it from the master when it publishes // the fact that we joined diff --git a/src/test/java/org/elasticsearch/discovery/DiscoveryWithNetworkFailuresTests.java b/src/test/java/org/elasticsearch/discovery/DiscoveryWithNetworkFailuresTests.java index b03c6ea6b66..a08ba12f4fb 100644 --- a/src/test/java/org/elasticsearch/discovery/DiscoveryWithNetworkFailuresTests.java +++ b/src/test/java/org/elasticsearch/discovery/DiscoveryWithNetworkFailuresTests.java @@ -25,10 +25,13 @@ import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse; import org.elasticsearch.action.get.GetResponse; import org.elasticsearch.action.index.IndexResponse; import org.elasticsearch.client.Client; +import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.block.ClusterBlock; import org.elasticsearch.cluster.block.ClusterBlockLevel; import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.routing.operation.hash.djb.DjbHashFunction; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.Priority; @@ -45,10 +48,11 @@ import org.elasticsearch.test.InternalTestCluster; import org.elasticsearch.test.disruption.*; import org.elasticsearch.test.junit.annotations.TestLogging; import org.elasticsearch.test.transport.MockTransportService; -import org.elasticsearch.transport.TransportModule; +import org.elasticsearch.transport.*; import org.junit.Before; import org.junit.Test; +import java.io.IOException; import java.util.*; import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicBoolean; @@ -93,10 +97,14 @@ public class DiscoveryWithNetworkFailuresTests extends ElasticsearchIntegrationT } 
private List startCluster(int numberOfNodes) throws ExecutionException, InterruptedException { + return startCluster(numberOfNodes, -1); + } + + private List startCluster(int numberOfNodes, int minimumMasterNode) throws ExecutionException, InterruptedException { if (randomBoolean()) { - return startMulticastCluster(numberOfNodes, -1); + return startMulticastCluster(numberOfNodes, minimumMasterNode); } else { - return startUnicastCluster(numberOfNodes, null, -1); + return startUnicastCluster(numberOfNodes, null, minimumMasterNode); } } @@ -585,6 +593,56 @@ public class DiscoveryWithNetworkFailuresTests extends ElasticsearchIntegrationT assertMaster(masterNode, nodes); } + + /** Test cluster join with issues in cluster state publishing * */ + @Test + @TestLogging("discovery.zen:TRACE,action:TRACE") + public void testClusterJoinDespiteOfPublishingIssues() throws Exception { + List nodes = startCluster(2, 1); + + String masterNode = internalCluster().getMasterName(); + String nonMasterNode; + if (masterNode.equals(nodes.get(0))) { + nonMasterNode = nodes.get(1); + } else { + nonMasterNode = nodes.get(0); + } + + DiscoveryNodes discoveryNodes = internalCluster().getInstance(ClusterService.class, nonMasterNode).state().nodes(); + + logger.info("blocking requests from non master [{}] to master [{}]", nonMasterNode, masterNode); + MockTransportService nonMasterTransportService = (MockTransportService) internalCluster().getInstance(TransportService.class, nonMasterNode); + nonMasterTransportService.addFailToSendNoConnectRule(discoveryNodes.masterNode()); + + assertNoMaster(nonMasterNode); + + logger.info("blocking cluster state publishing from master [{}] to non master [{}]", masterNode, nonMasterNode); + MockTransportService masterTransportService = (MockTransportService) internalCluster().getInstance(TransportService.class, masterNode); + masterTransportService.addFailToSendNoConnectRule(discoveryNodes.localNode(), "discovery/zen/publish"); + + logger.info("allowing 
requests from non master [{}] to master [{}], waiting for two join request", nonMasterNode, masterNode); + final CountDownLatch countDownLatch = new CountDownLatch(2); + nonMasterTransportService.addDelegate(discoveryNodes.masterNode(), new MockTransportService.DelegateTransport(nonMasterTransportService.original()) { + @Override + public void sendRequest(DiscoveryNode node, long requestId, String action, TransportRequest request, TransportRequestOptions options) throws IOException, TransportException { + if (action.equals("discovery/zen/join")) { + countDownLatch.countDown(); + } + super.sendRequest(node, requestId, action, request, options); + } + }); + + countDownLatch.await(); + + logger.info("waiting for cluster to reform"); + masterTransportService.clearRule(discoveryNodes.localNode()); + nonMasterTransportService.clearRule(discoveryNodes.masterNode()); + + ensureStableCluster(2); + + } + + protected NetworkPartition addRandomPartition() { NetworkPartition partition; if (randomBoolean()) { diff --git a/src/test/java/org/elasticsearch/test/transport/MockTransportService.java b/src/test/java/org/elasticsearch/test/transport/MockTransportService.java index 3f180f9c5e5..cf088bab476 100644 --- a/src/test/java/org/elasticsearch/test/transport/MockTransportService.java +++ b/src/test/java/org/elasticsearch/test/transport/MockTransportService.java @@ -37,6 +37,8 @@ import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.*; import java.io.IOException; +import java.util.Arrays; +import java.util.HashSet; import java.util.Set; import java.util.concurrent.ConcurrentMap; @@ -98,6 +100,13 @@ public class MockTransportService extends TransportService { }); } + /** + * Adds a rule that will cause matching operations to throw ConnectTransportExceptions + */ + public void addFailToSendNoConnectRule(DiscoveryNode node, final String... 
blockedActions) { + addFailToSendNoConnectRule(node, new HashSet<>(Arrays.asList(blockedActions))); + } + /** * Adds a rule that will cause matching operations to throw ConnectTransportExceptions */ @@ -307,11 +316,11 @@ public class MockTransportService extends TransportService { protected final Transport transport; + public DelegateTransport(Transport transport) { this.transport = transport; } - @Override public void transportServiceAdapter(TransportServiceAdapter service) { transport.transportServiceAdapter(service);