[Discovery] verify we have a master after a successful join request

After master election, nodes send join requests to the elected master. Master is then responsible for publishing a new cluster state which sets the master on the local node's cluster state. If something goes wrong with the cluster state publishing, this process will not successfully complete. We should check it after the join request returns and if it failed, retry pinging.

Closes #6969
This commit is contained in:
Boaz Leskes 2014-07-23 01:03:18 +03:00
parent ffcf1077d8
commit cccd060a0c
3 changed files with 77 additions and 4 deletions

View File

@ -364,6 +364,12 @@ public class ZenDiscovery extends AbstractLifecycleComponent<Discovery> implemen
continue; continue;
} }
if (latestDiscoNodes.masterNode() == null) {
logger.debug("no master node is set, despite of join request completing. retrying pings");
retry = true;
continue;
}
masterFD.start(masterNode, "initial_join"); masterFD.start(masterNode, "initial_join");
// no need to submit the received cluster state, we will get it from the master when it publishes // no need to submit the received cluster state, we will get it from the master when it publishes
// the fact that we joined // the fact that we joined

View File

@ -25,10 +25,13 @@ import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.action.get.GetResponse; import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexResponse; import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.client.Client; import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlock; import org.elasticsearch.cluster.block.ClusterBlock;
import org.elasticsearch.cluster.block.ClusterBlockLevel; import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.operation.hash.djb.DjbHashFunction; import org.elasticsearch.cluster.routing.operation.hash.djb.DjbHashFunction;
import org.elasticsearch.common.Nullable; import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Priority; import org.elasticsearch.common.Priority;
@ -45,10 +48,11 @@ import org.elasticsearch.test.InternalTestCluster;
import org.elasticsearch.test.disruption.*; import org.elasticsearch.test.disruption.*;
import org.elasticsearch.test.junit.annotations.TestLogging; import org.elasticsearch.test.junit.annotations.TestLogging;
import org.elasticsearch.test.transport.MockTransportService; import org.elasticsearch.test.transport.MockTransportService;
import org.elasticsearch.transport.TransportModule; import org.elasticsearch.transport.*;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import java.io.IOException;
import java.util.*; import java.util.*;
import java.util.concurrent.*; import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
@ -93,10 +97,14 @@ public class DiscoveryWithNetworkFailuresTests extends ElasticsearchIntegrationT
} }
private List<String> startCluster(int numberOfNodes) throws ExecutionException, InterruptedException { private List<String> startCluster(int numberOfNodes) throws ExecutionException, InterruptedException {
return startCluster(numberOfNodes, -1);
}
private List<String> startCluster(int numberOfNodes, int minimumMasterNode) throws ExecutionException, InterruptedException {
if (randomBoolean()) { if (randomBoolean()) {
return startMulticastCluster(numberOfNodes, -1); return startMulticastCluster(numberOfNodes, minimumMasterNode);
} else { } else {
return startUnicastCluster(numberOfNodes, null, -1); return startUnicastCluster(numberOfNodes, null, minimumMasterNode);
} }
} }
@ -585,6 +593,56 @@ public class DiscoveryWithNetworkFailuresTests extends ElasticsearchIntegrationT
assertMaster(masterNode, nodes); assertMaster(masterNode, nodes);
} }
/** Test cluster join with issues in cluster state publishing * */
@Test
@TestLogging("discovery.zen:TRACE,action:TRACE")
public void testClusterJoinDespiteOfPublishingIssues() throws Exception {
List<String> nodes = startCluster(2, 1);
String masterNode = internalCluster().getMasterName();
String nonMasterNode;
if (masterNode.equals(nodes.get(0))) {
nonMasterNode = nodes.get(1);
} else {
nonMasterNode = nodes.get(0);
}
DiscoveryNodes discoveryNodes = internalCluster().getInstance(ClusterService.class, nonMasterNode).state().nodes();
logger.info("blocking requests from non master [{}] to master [{}]", nonMasterNode, masterNode);
MockTransportService nonMasterTransportService = (MockTransportService) internalCluster().getInstance(TransportService.class, nonMasterNode);
nonMasterTransportService.addFailToSendNoConnectRule(discoveryNodes.masterNode());
assertNoMaster(nonMasterNode);
logger.info("blocking cluster state publishing from master [{}] to non master [{}]", masterNode, nonMasterNode);
MockTransportService masterTransportService = (MockTransportService) internalCluster().getInstance(TransportService.class, masterNode);
masterTransportService.addFailToSendNoConnectRule(discoveryNodes.localNode(), "discovery/zen/publish");
logger.info("allowing requests from non master [{}] to master [{}], waiting for two join request", nonMasterNode, masterNode);
final CountDownLatch countDownLatch = new CountDownLatch(2);
nonMasterTransportService.addDelegate(discoveryNodes.masterNode(), new MockTransportService.DelegateTransport(nonMasterTransportService.original()) {
@Override
public void sendRequest(DiscoveryNode node, long requestId, String action, TransportRequest request, TransportRequestOptions options) throws IOException, TransportException {
if (action.equals("discovery/zen/join")) {
countDownLatch.countDown();
}
super.sendRequest(node, requestId, action, request, options);
}
});
countDownLatch.await();
logger.info("waiting for cluster to reform");
masterTransportService.clearRule(discoveryNodes.localNode());
nonMasterTransportService.clearRule(discoveryNodes.masterNode());
ensureStableCluster(2);
}
protected NetworkPartition addRandomPartition() { protected NetworkPartition addRandomPartition() {
NetworkPartition partition; NetworkPartition partition;
if (randomBoolean()) { if (randomBoolean()) {

View File

@ -37,6 +37,8 @@ import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.*; import org.elasticsearch.transport.*;
import java.io.IOException; import java.io.IOException;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set; import java.util.Set;
import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentMap;
@ -98,6 +100,13 @@ public class MockTransportService extends TransportService {
}); });
} }
/**
* Adds a rule that will cause matching operations to throw ConnectTransportExceptions
*/
public void addFailToSendNoConnectRule(DiscoveryNode node, final String... blockedActions) {
addFailToSendNoConnectRule(node, new HashSet<>(Arrays.asList(blockedActions)));
}
/** /**
* Adds a rule that will cause matching operations to throw ConnectTransportExceptions * Adds a rule that will cause matching operations to throw ConnectTransportExceptions
*/ */
@ -307,11 +316,11 @@ public class MockTransportService extends TransportService {
protected final Transport transport; protected final Transport transport;
public DelegateTransport(Transport transport) { public DelegateTransport(Transport transport) {
this.transport = transport; this.transport = transport;
} }
@Override @Override
public void transportServiceAdapter(TransportServiceAdapter service) { public void transportServiceAdapter(TransportServiceAdapter service) {
transport.transportServiceAdapter(service); transport.transportServiceAdapter(service);