Discovery: master fault detection fall back to cluster state thread upon error

With #7834, we simplified ZenDiscovery by making it use the current cluster state for all it's decision. This had the side effect a node may start it's Master FD before the master  has fully processed that cluster state update that adds that node (or elects the master master). This is due to the fact that master FD is started when a node receives a cluster state from the master but the master it self may still be publishing to other node.

This commit makes sure that a master FD ping is only failed once we know that there is no current cluster state update in progress.

Closes #7908
This commit is contained in:
Boaz Leskes 2014-09-29 08:45:28 +02:00
parent 168b3752ef
commit 03d880de38
6 changed files with 212 additions and 65 deletions

View File

@ -22,7 +22,10 @@ package org.elasticsearch.discovery.zen;
import com.google.common.base.Objects;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import org.elasticsearch.*;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchIllegalArgumentException;
import org.elasticsearch.ElasticsearchIllegalStateException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.cluster.*;
import org.elasticsearch.cluster.block.ClusterBlocks;
import org.elasticsearch.cluster.metadata.IndexMetaData;
@ -180,7 +183,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent<Discovery> implemen
nodeSettingsService.addListener(new ApplySettings());
this.masterFD = new MasterFaultDetection(settings, threadPool, transportService, this, clusterName);
this.masterFD = new MasterFaultDetection(settings, threadPool, transportService, clusterName, clusterService);
this.masterFD.addListener(new MasterNodeFailureListener());
this.nodesFD = new NodesFaultDetection(settings, threadPool, transportService, clusterName);

View File

@ -19,16 +19,20 @@
package org.elasticsearch.discovery.zen.fd;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchIllegalStateException;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ProcessedClusterStateNonMasterUpdateTask;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.discovery.zen.DiscoveryNodesProvider;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.*;
@ -54,8 +58,7 @@ public class MasterFaultDetection extends FaultDetection {
void notListedOnMaster();
}
private final DiscoveryNodesProvider nodesProvider;
private final ClusterService clusterService;
private final CopyOnWriteArrayList<Listener> listeners = new CopyOnWriteArrayList<>();
private volatile MasterPinger masterPinger;
@ -69,9 +72,9 @@ public class MasterFaultDetection extends FaultDetection {
private final AtomicBoolean notifiedMasterFailure = new AtomicBoolean();
public MasterFaultDetection(Settings settings, ThreadPool threadPool, TransportService transportService,
DiscoveryNodesProvider nodesProvider, ClusterName clusterName) {
ClusterName clusterName, ClusterService clusterService) {
super(settings, threadPool, transportService, clusterName);
this.nodesProvider = nodesProvider;
this.clusterService = clusterService;
logger.debug("[master] uses ping_interval [{}], ping_timeout [{}], ping_retries [{}]", pingInterval, pingRetryTimeout, pingRetryCount);
@ -231,7 +234,7 @@ public class MasterFaultDetection extends FaultDetection {
threadPool.schedule(pingInterval, ThreadPool.Names.SAME, MasterPinger.this);
return;
}
final MasterPingRequest request = new MasterPingRequest(nodesProvider.nodes().localNode().id(), masterToPing.id(), clusterName);
final MasterPingRequest request = new MasterPingRequest(clusterService.localNode().id(), masterToPing.id(), clusterName);
final TransportRequestOptions options = options().withType(TransportRequestOptions.Type.PING).withTimeout(pingRetryTimeout);
transportService.sendRequest(masterToPing, MASTER_PING_ACTION_NAME, request, options, new BaseTransportResponseHandler<MasterPingResponseResponse>() {
@ -343,8 +346,8 @@ public class MasterFaultDetection extends FaultDetection {
}
@Override
public void messageReceived(MasterPingRequest request, TransportChannel channel) throws Exception {
DiscoveryNodes nodes = nodesProvider.nodes();
public void messageReceived(final MasterPingRequest request, final TransportChannel channel) throws Exception {
final DiscoveryNodes nodes = clusterService.state().nodes();
// check if we are really the same master as the one we seemed to be think we are
// this can happen if the master got "kill -9" and then another node started using the same port
if (!request.masterNodeId.equals(nodes.localNodeId())) {
@ -357,15 +360,57 @@ public class MasterFaultDetection extends FaultDetection {
throw new NotMasterException("master fault detection ping request is targeted for a different [" + request.clusterName + "] cluster then us [" + clusterName + "]");
}
// if we are no longer master, fail...
if (!nodes.localNodeMaster()) {
throw new NoLongerMasterException();
// when we are elected as master or when a node joins, we use a cluster state update thread
// to incorporate that information in the cluster state. That cluster state is published
// before we make it available locally. This means that a master ping can come from a node
// that has already processed the new CS but it is not known locally.
// Therefore, if we fail we have to check again under a cluster state thread to make sure
// all processing is finished.
//
if (!nodes.localNodeMaster() || !nodes.nodeExists(request.nodeId)) {
logger.trace("checking ping from [{}] under a cluster state thread", request.nodeId);
clusterService.submitStateUpdateTask("master ping (from: [" + request.nodeId + "])", new ProcessedClusterStateNonMasterUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) throws Exception {
// if we are no longer master, fail...
DiscoveryNodes nodes = currentState.nodes();
if (!nodes.localNodeMaster()) {
throw new NoLongerMasterException();
}
if (!nodes.nodeExists(request.nodeId)) {
throw new NodeDoesNotExistOnMasterException();
}
return currentState;
}
@Override
public void onFailure(String source, @Nullable Throwable t) {
if (t == null) {
t = new ElasticsearchException("unknown error while processing ping");
}
try {
channel.sendResponse(t);
} catch (IOException e) {
logger.warn("error while sending ping response", e);
}
}
@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
try {
channel.sendResponse(new MasterPingResponseResponse(true));
} catch (IOException e) {
logger.warn("error while sending ping response", e);
}
}
});
} else {
// send a response, and note if we are connected to the master or not
channel.sendResponse(new MasterPingResponseResponse(true));
}
if (!nodes.nodeExists(request.nodeId)) {
throw new NodeDoesNotExistOnMasterException();
}
// send a response, and note if we are connected to the master or not
channel.sendResponse(new MasterPingResponseResponse(nodes.nodeExists(request.nodeId)));
}
@Override

View File

@ -107,11 +107,36 @@ public class DiscoveryWithServiceDisruptions extends ElasticsearchIntegrationTes
}
private List<String> startCluster(int numberOfNodes, int minimumMasterNode) throws ExecutionException, InterruptedException {
if (randomBoolean()) {
return startMulticastCluster(numberOfNodes, minimumMasterNode);
} else {
return startUnicastCluster(numberOfNodes, null, minimumMasterNode);
configureCluster(numberOfNodes, minimumMasterNode);
List<String> nodes = internalCluster().startNodesAsync(numberOfNodes).get();
ensureStableCluster(numberOfNodes);
// TODO: this is a temporary solution so that nodes will not base their reaction to a partition based on previous successful results
for (ZenPingService pingService : internalCluster().getInstances(ZenPingService.class)) {
for (ZenPing zenPing : pingService.zenPings()) {
if (zenPing instanceof UnicastZenPing) {
((UnicastZenPing) zenPing).clearTemporalResponses();
}
}
}
return nodes;
}
private List<String> startUnicastCluster(int numberOfNodes, @Nullable int[] unicastHostsOrdinals, int minimumMasterNode) throws ExecutionException, InterruptedException {
configureUnicastCluster(numberOfNodes, unicastHostsOrdinals, minimumMasterNode);
List<String> nodes = internalCluster().startNodesAsync(numberOfNodes).get();
ensureStableCluster(numberOfNodes);
// TODO: this is a temporary solution so that nodes will not base their reaction to a partition based on previous successful results
for (ZenPingService pingService : internalCluster().getInstances(ZenPingService.class)) {
for (ZenPing zenPing : pingService.zenPings()) {
if (zenPing instanceof UnicastZenPing) {
((UnicastZenPing) zenPing).clearTemporalResponses();
}
}
}
return nodes;
}
final static Settings DEFAULT_SETTINGS = ImmutableSettings.builder()
@ -124,7 +149,16 @@ public class DiscoveryWithServiceDisruptions extends ElasticsearchIntegrationTes
.put(TransportModule.TRANSPORT_SERVICE_TYPE_KEY, MockTransportService.class.getName())
.build();
private List<String> startMulticastCluster(int numberOfNodes, int minimumMasterNode) throws ExecutionException, InterruptedException {
private void configureCluster(int numberOfNodes, int minimumMasterNode) throws ExecutionException, InterruptedException {
if (randomBoolean()) {
configureMulticastCluster(numberOfNodes, minimumMasterNode);
} else {
configureUnicastCluster(numberOfNodes, null, minimumMasterNode);
}
}
private void configureMulticastCluster(int numberOfNodes, int minimumMasterNode) throws ExecutionException, InterruptedException {
if (minimumMasterNode < 0) {
minimumMasterNode = numberOfNodes / 2 + 1;
}
@ -137,13 +171,9 @@ public class DiscoveryWithServiceDisruptions extends ElasticsearchIntegrationTes
if (discoveryConfig == null) {
discoveryConfig = new ClusterDiscoveryConfiguration(numberOfNodes, settings);
}
List<String> nodes = internalCluster().startNodesAsync(numberOfNodes).get();
ensureStableCluster(numberOfNodes);
return nodes;
}
private List<String> startUnicastCluster(int numberOfNodes, @Nullable int[] unicastHostsOrdinals, int minimumMasterNode) throws ExecutionException, InterruptedException {
private void configureUnicastCluster(int numberOfNodes, @Nullable int[] unicastHostsOrdinals, int minimumMasterNode) throws ExecutionException, InterruptedException {
if (minimumMasterNode < 0) {
minimumMasterNode = numberOfNodes / 2 + 1;
}
@ -160,19 +190,6 @@ public class DiscoveryWithServiceDisruptions extends ElasticsearchIntegrationTes
discoveryConfig = new ClusterDiscoveryConfiguration.UnicastZen(numberOfNodes, nodeSettings, unicastHostsOrdinals);
}
}
List<String> nodes = internalCluster().startNodesAsync(numberOfNodes).get();
ensureStableCluster(numberOfNodes);
// TODO: this is a temporary solution so that nodes will not base their reaction to a partition based on previous successful results
for (ZenPingService pingService : internalCluster().getInstances(ZenPingService.class)) {
for (ZenPing zenPing : pingService.zenPings()) {
if (zenPing instanceof UnicastZenPing) {
((UnicastZenPing) zenPing).clearTemporalResponses();
}
}
}
return nodes;
}
@ -763,6 +780,68 @@ public class DiscoveryWithServiceDisruptions extends ElasticsearchIntegrationTes
}
@Test
@TestLogging("discovery.zen:TRACE,action:TRACE")
public void testClusterFormingWithASlowNode() throws Exception {
configureCluster(3, 2);
SlowClusterStateProcessing disruption = new SlowClusterStateProcessing(getRandom(), 0, 0, 5000, 6000);
// don't wait for initial state, wat want to add the disruption while the cluster is forming..
internalCluster().startNodesAsync(3,
ImmutableSettings.builder()
.put(DiscoveryService.SETTING_INITIAL_STATE_TIMEOUT, "1ms")
.put(DiscoverySettings.PUBLISH_TIMEOUT, "3s")
.build()).get();
logger.info("applying disruption while cluster is forming ...");
internalCluster().setDisruptionScheme(disruption);
disruption.startDisrupting();
ensureStableCluster(3);
}
/**
* Adds an asymetric break between a master and one of the nodes and makes
* sure that the node is removed form the cluster, that the node start pinging and that
* the cluster reforms when healed.
*/
@Test
@TestLogging("discovery.zen:TRACE,action:TRACE")
public void testNodeNotReachableFromMaster() throws Exception {
startCluster(3);
String masterNode = internalCluster().getMasterName();
String nonMasterNode = null;
while (nonMasterNode == null) {
nonMasterNode = randomFrom(internalCluster().getNodeNames());
if (nonMasterNode.equals(masterNode)) {
masterNode = null;
}
}
logger.info("blocking request from master [{}] to [{}]", masterNode, nonMasterNode);
MockTransportService masterTransportService = (MockTransportService) internalCluster().getInstance(TransportService.class, masterNode);
if (randomBoolean()) {
masterTransportService.addUnresponsiveRule(internalCluster().getInstance(ClusterService.class, nonMasterNode).localNode());
} else {
masterTransportService.addFailToSendNoConnectRule(internalCluster().getInstance(ClusterService.class, nonMasterNode).localNode());
}
logger.info("waiting for [{}] to be removed from cluster", nonMasterNode);
ensureStableCluster(2, masterNode);
logger.info("waiting for [{}] to have no master", nonMasterNode);
assertNoMaster(nonMasterNode);
logger.info("healing partition and checking cluster reforms");
masterTransportService.clearAllRules();
ensureStableCluster(3);
}
protected NetworkPartition addRandomPartition() {
NetworkPartition partition;
if (randomBoolean()) {

View File

@ -27,12 +27,11 @@ import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.discovery.zen.DiscoveryNodesProvider;
import org.elasticsearch.discovery.zen.fd.FaultDetection;
import org.elasticsearch.discovery.zen.fd.MasterFaultDetection;
import org.elasticsearch.discovery.zen.fd.NodesFaultDetection;
import org.elasticsearch.node.service.NodeService;
import org.elasticsearch.test.ElasticsearchTestCase;
import org.elasticsearch.test.cluster.NoopClusterService;
import org.elasticsearch.test.transport.MockTransportService;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportConnectionListener;
@ -172,21 +171,9 @@ public class ZenFaultDetectionTests extends ElasticsearchTestCase {
settings.put(FaultDetection.SETTING_CONNECT_ON_NETWORK_DISCONNECT, shouldRetry)
.put(FaultDetection.SETTING_PING_INTERVAL, "5m");
ClusterName clusterName = new ClusterName(randomAsciiOfLengthBetween(3, 20));
final DiscoveryNodes nodes = buildNodesForA(false);
MasterFaultDetection masterFD = new MasterFaultDetection(settings.build(), threadPool, serviceA,
new DiscoveryNodesProvider() {
@Override
public DiscoveryNodes nodes() {
return nodes;
}
@Override
public NodeService nodeService() {
return null;
}
},
clusterName
);
final ClusterState state = ClusterState.builder(clusterName).nodes(buildNodesForA(false)).build();
MasterFaultDetection masterFD = new MasterFaultDetection(settings.build(), threadPool, serviceA, clusterName,
new NoopClusterService(state));
masterFD.start(nodeB, "test");
final String[] failureReason = new String[1];

View File

@ -20,28 +20,50 @@ package org.elasticsearch.test.cluster;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchIllegalStateException;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.*;
import org.elasticsearch.cluster.block.ClusterBlock;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.operation.OperationRouting;
import org.elasticsearch.cluster.service.PendingClusterTask;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.component.Lifecycle;
import org.elasticsearch.common.component.LifecycleListener;
import org.elasticsearch.common.transport.DummyTransportAddress;
import org.elasticsearch.common.unit.TimeValue;
import java.util.List;
public class NoopClusterService implements ClusterService {
final ClusterState state;
public NoopClusterService() {
this(ClusterState.builder(new ClusterName("noop")).build());
}
public NoopClusterService(ClusterState state) {
if (state.getNodes().size() == 0) {
state = ClusterState.builder(state).nodes(
DiscoveryNodes.builder()
.put(new DiscoveryNode("noop_id", DummyTransportAddress.INSTANCE, Version.CURRENT))
.localNodeId("noop_id")).build();
}
assert state.getNodes().localNode() != null;
this.state = state;
}
@Override
public DiscoveryNode localNode() {
return null;
return state.getNodes().localNode();
}
@Override
public ClusterState state() {
return null;
return state;
}
@Override

View File

@ -26,6 +26,7 @@ import org.elasticsearch.common.unit.TimeValue;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
public class SlowClusterStateProcessing extends SingleNodeDisruption {
@ -99,11 +100,19 @@ public class SlowClusterStateProcessing extends SingleNodeDisruption {
if (clusterService == null) {
return false;
}
final AtomicBoolean stopped = new AtomicBoolean(false);
clusterService.submitStateUpdateTask("service_disruption_delay", Priority.IMMEDIATE, new ClusterStateNonMasterUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) throws Exception {
Thread.sleep(duration.millis());
long count = duration.millis() / 200;
// wait while checking for a stopped
for (; count > 0 && !stopped.get(); count--) {
Thread.sleep(200);
}
if (!stopped.get()) {
Thread.sleep(duration.millis() % 200);
}
countDownLatch.countDown();
return currentState;
}
@ -116,6 +125,7 @@ public class SlowClusterStateProcessing extends SingleNodeDisruption {
try {
countDownLatch.await();
} catch (InterruptedException e) {
stopped.set(true);
// try to wait again, we really want the cluster state thread to be freed up when stopping disruption
countDownLatch.await();
}
@ -137,10 +147,11 @@ public class SlowClusterStateProcessing extends SingleNodeDisruption {
if (!interruptClusterStateProcessing(duration)) {
continue;
}
duration = new TimeValue(intervalBetweenDelaysMin + random.nextInt((int) (intervalBetweenDelaysMax - intervalBetweenDelaysMin)));
if (disrupting && disruptedNode != null) {
Thread.sleep(duration.millis());
if (intervalBetweenDelaysMax > 0) {
duration = new TimeValue(intervalBetweenDelaysMin + random.nextInt((int) (intervalBetweenDelaysMax - intervalBetweenDelaysMin)));
if (disrupting && disruptedNode != null) {
Thread.sleep(duration.millis());
}
}
} catch (InterruptedException e) {
} catch (Exception e) {