From 03d880de38c88f178b80ce96c45a2a689e6dbb84 Mon Sep 17 00:00:00 2001 From: Boaz Leskes Date: Mon, 29 Sep 2014 08:45:28 +0200 Subject: [PATCH] Discovery: master fault detection fall back to cluster state thread upon error With #7834, we simplified ZenDiscovery by making it use the current cluster state for all it's decision. This had the side effect a node may start it's Master FD before the master has fully processed that cluster state update that adds that node (or elects the master master). This is due to the fact that master FD is started when a node receives a cluster state from the master but the master it self may still be publishing to other node. This commit makes sure that a master FD ping is only failed once we know that there is no current cluster state update in progress. Closes #7908 --- .../discovery/zen/ZenDiscovery.java | 7 +- .../zen/fd/MasterFaultDetection.java | 77 ++++++++--- .../DiscoveryWithServiceDisruptions.java | 125 ++++++++++++++---- .../discovery/ZenFaultDetectionTests.java | 21 +-- .../test/cluster/NoopClusterService.java | 26 +++- .../SlowClusterStateProcessing.java | 21 ++- 6 files changed, 212 insertions(+), 65 deletions(-) diff --git a/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java b/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java index 7390f7b6029..a2d1fbdb3ff 100644 --- a/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java +++ b/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java @@ -22,7 +22,10 @@ package org.elasticsearch.discovery.zen; import com.google.common.base.Objects; import com.google.common.collect.Lists; import com.google.common.collect.Sets; -import org.elasticsearch.*; +import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.ElasticsearchIllegalArgumentException; +import org.elasticsearch.ElasticsearchIllegalStateException; +import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.cluster.*; import org.elasticsearch.cluster.block.ClusterBlocks; import org.elasticsearch.cluster.metadata.IndexMetaData; @@ -180,7 +183,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent implemen nodeSettingsService.addListener(new ApplySettings()); - this.masterFD = new MasterFaultDetection(settings, threadPool, transportService, this, clusterName); + this.masterFD = new MasterFaultDetection(settings, threadPool, transportService, clusterName, clusterService); this.masterFD.addListener(new MasterNodeFailureListener()); this.nodesFD = new NodesFaultDetection(settings, threadPool, transportService, clusterName); diff --git a/src/main/java/org/elasticsearch/discovery/zen/fd/MasterFaultDetection.java b/src/main/java/org/elasticsearch/discovery/zen/fd/MasterFaultDetection.java index 8bb8f452023..21e9b21022b 100644 --- a/src/main/java/org/elasticsearch/discovery/zen/fd/MasterFaultDetection.java +++ b/src/main/java/org/elasticsearch/discovery/zen/fd/MasterFaultDetection.java @@ -19,16 +19,20 @@ package org.elasticsearch.discovery.zen.fd; +import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ElasticsearchIllegalStateException; import org.elasticsearch.Version; import org.elasticsearch.cluster.ClusterName; +import org.elasticsearch.cluster.ClusterService; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.ProcessedClusterStateNonMasterUpdateTask; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; +import org.elasticsearch.common.Nullable; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; -import org.elasticsearch.discovery.zen.DiscoveryNodesProvider; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.*; @@ -54,8 +58,7 @@ public class MasterFaultDetection extends FaultDetection { void notListedOnMaster(); } - private final DiscoveryNodesProvider nodesProvider; - + private final ClusterService clusterService; private final CopyOnWriteArrayList listeners = new CopyOnWriteArrayList<>(); private volatile MasterPinger masterPinger; @@ -69,9 +72,9 @@ public class MasterFaultDetection extends FaultDetection { private final AtomicBoolean notifiedMasterFailure = new AtomicBoolean(); public MasterFaultDetection(Settings settings, ThreadPool threadPool, TransportService transportService, - DiscoveryNodesProvider nodesProvider, ClusterName clusterName) { + ClusterName clusterName, ClusterService clusterService) { super(settings, threadPool, transportService, clusterName); - this.nodesProvider = nodesProvider; + this.clusterService = clusterService; logger.debug("[master] uses ping_interval [{}], ping_timeout [{}], ping_retries [{}]", pingInterval, pingRetryTimeout, pingRetryCount); @@ -231,7 +234,7 @@ public class MasterFaultDetection extends FaultDetection { threadPool.schedule(pingInterval, ThreadPool.Names.SAME, MasterPinger.this); return; } - final MasterPingRequest request = new MasterPingRequest(nodesProvider.nodes().localNode().id(), masterToPing.id(), clusterName); + final MasterPingRequest request = new MasterPingRequest(clusterService.localNode().id(), masterToPing.id(), clusterName); final TransportRequestOptions options = options().withType(TransportRequestOptions.Type.PING).withTimeout(pingRetryTimeout); transportService.sendRequest(masterToPing, MASTER_PING_ACTION_NAME, request, options, new BaseTransportResponseHandler() { @@ -343,8 +346,8 @@ public class MasterFaultDetection extends FaultDetection { } @Override - public void messageReceived(MasterPingRequest request, TransportChannel channel) throws Exception { - DiscoveryNodes nodes = nodesProvider.nodes(); + public void messageReceived(final MasterPingRequest request, final TransportChannel channel) throws Exception { + final DiscoveryNodes nodes = clusterService.state().nodes(); // check if we are really the same master as the one we seemed to be think we are // this can happen if the master got "kill -9" and then another node started using the same port if (!request.masterNodeId.equals(nodes.localNodeId())) { @@ -357,15 +360,57 @@ public class MasterFaultDetection extends FaultDetection { throw new NotMasterException("master fault detection ping request is targeted for a different [" + request.clusterName + "] cluster then us [" + clusterName + "]"); } - // if we are no longer master, fail... - if (!nodes.localNodeMaster()) { - throw new NoLongerMasterException(); + // when we are elected as master or when a node joins, we use a cluster state update thread + // to incorporate that information in the cluster state. That cluster state is published + // before we make it available locally. This means that a master ping can come from a node + // that has already processed the new CS but it is not known locally. + // Therefore, if we fail we have to check again under a cluster state thread to make sure + // all processing is finished. + // + + + if (!nodes.localNodeMaster() || !nodes.nodeExists(request.nodeId)) { + logger.trace("checking ping from [{}] under a cluster state thread", request.nodeId); + clusterService.submitStateUpdateTask("master ping (from: [" + request.nodeId + "])", new ProcessedClusterStateNonMasterUpdateTask() { + + @Override + public ClusterState execute(ClusterState currentState) throws Exception { + // if we are no longer master, fail... + DiscoveryNodes nodes = currentState.nodes(); + if (!nodes.localNodeMaster()) { + throw new NoLongerMasterException(); + } + if (!nodes.nodeExists(request.nodeId)) { + throw new NodeDoesNotExistOnMasterException(); + } + return currentState; + } + + @Override + public void onFailure(String source, @Nullable Throwable t) { + if (t == null) { + t = new ElasticsearchException("unknown error while processing ping"); + } + try { + channel.sendResponse(t); + } catch (IOException e) { + logger.warn("error while sending ping response", e); + } + } + + @Override + public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { + try { + channel.sendResponse(new MasterPingResponseResponse(true)); + } catch (IOException e) { + logger.warn("error while sending ping response", e); + } + } + }); + } else { + // send a response, and note if we are connected to the master or not + channel.sendResponse(new MasterPingResponseResponse(true)); } - if (!nodes.nodeExists(request.nodeId)) { - throw new NodeDoesNotExistOnMasterException(); - } - // send a response, and note if we are connected to the master or not - channel.sendResponse(new MasterPingResponseResponse(nodes.nodeExists(request.nodeId))); } @Override diff --git a/src/test/java/org/elasticsearch/discovery/DiscoveryWithServiceDisruptions.java b/src/test/java/org/elasticsearch/discovery/DiscoveryWithServiceDisruptions.java index 6d0288506c8..80625c477e7 100644 --- a/src/test/java/org/elasticsearch/discovery/DiscoveryWithServiceDisruptions.java +++ b/src/test/java/org/elasticsearch/discovery/DiscoveryWithServiceDisruptions.java @@ -107,11 +107,36 @@ public class DiscoveryWithServiceDisruptions extends ElasticsearchIntegrationTes } private List startCluster(int numberOfNodes, int minimumMasterNode) throws ExecutionException, InterruptedException { - if (randomBoolean()) { - return startMulticastCluster(numberOfNodes, minimumMasterNode); - } else { - return startUnicastCluster(numberOfNodes, null, minimumMasterNode); + configureCluster(numberOfNodes, minimumMasterNode); + List nodes = internalCluster().startNodesAsync(numberOfNodes).get(); + ensureStableCluster(numberOfNodes); + + // TODO: this is a temporary solution so that nodes will not base their reaction to a partition based on previous successful results + for (ZenPingService pingService : internalCluster().getInstances(ZenPingService.class)) { + for (ZenPing zenPing : pingService.zenPings()) { + if (zenPing instanceof UnicastZenPing) { + ((UnicastZenPing) zenPing).clearTemporalResponses(); + } + } } + return nodes; + } + + + private List startUnicastCluster(int numberOfNodes, @Nullable int[] unicastHostsOrdinals, int minimumMasterNode) throws ExecutionException, InterruptedException { + configureUnicastCluster(numberOfNodes, unicastHostsOrdinals, minimumMasterNode); + List nodes = internalCluster().startNodesAsync(numberOfNodes).get(); + ensureStableCluster(numberOfNodes); + + // TODO: this is a temporary solution so that nodes will not base their reaction to a partition based on previous successful results + for (ZenPingService pingService : internalCluster().getInstances(ZenPingService.class)) { + for (ZenPing zenPing : pingService.zenPings()) { + if (zenPing instanceof UnicastZenPing) { + ((UnicastZenPing) zenPing).clearTemporalResponses(); + } + } + } + return nodes; } final static Settings DEFAULT_SETTINGS = ImmutableSettings.builder() @@ -124,7 +149,16 @@ public class DiscoveryWithServiceDisruptions extends ElasticsearchIntegrationTes .put(TransportModule.TRANSPORT_SERVICE_TYPE_KEY, MockTransportService.class.getName()) .build(); - private List startMulticastCluster(int numberOfNodes, int minimumMasterNode) throws ExecutionException, InterruptedException { + private void configureCluster(int numberOfNodes, int minimumMasterNode) throws ExecutionException, InterruptedException { + if (randomBoolean()) { + configureMulticastCluster(numberOfNodes, minimumMasterNode); + } else { + configureUnicastCluster(numberOfNodes, null, minimumMasterNode); + } + + } + + private void configureMulticastCluster(int numberOfNodes, int minimumMasterNode) throws ExecutionException, InterruptedException { if (minimumMasterNode < 0) { minimumMasterNode = numberOfNodes / 2 + 1; } @@ -137,13 +171,9 @@ public class DiscoveryWithServiceDisruptions extends ElasticsearchIntegrationTes if (discoveryConfig == null) { discoveryConfig = new ClusterDiscoveryConfiguration(numberOfNodes, settings); } - List nodes = internalCluster().startNodesAsync(numberOfNodes).get(); - ensureStableCluster(numberOfNodes); - - return nodes; } - private List startUnicastCluster(int numberOfNodes, @Nullable int[] unicastHostsOrdinals, int minimumMasterNode) throws ExecutionException, InterruptedException { + private void configureUnicastCluster(int numberOfNodes, @Nullable int[] unicastHostsOrdinals, int minimumMasterNode) throws ExecutionException, InterruptedException { if (minimumMasterNode < 0) { minimumMasterNode = numberOfNodes / 2 + 1; } @@ -160,19 +190,6 @@ public class DiscoveryWithServiceDisruptions extends ElasticsearchIntegrationTes discoveryConfig = new ClusterDiscoveryConfiguration.UnicastZen(numberOfNodes, nodeSettings, unicastHostsOrdinals); } } - List nodes = internalCluster().startNodesAsync(numberOfNodes).get(); - ensureStableCluster(numberOfNodes); - - // TODO: this is a temporary solution so that nodes will not base their reaction to a partition based on previous successful results - for (ZenPingService pingService : internalCluster().getInstances(ZenPingService.class)) { - for (ZenPing zenPing : pingService.zenPings()) { - if (zenPing instanceof UnicastZenPing) { - ((UnicastZenPing) zenPing).clearTemporalResponses(); - } - } - } - - return nodes; } @@ -763,6 +780,68 @@ public class DiscoveryWithServiceDisruptions extends ElasticsearchIntegrationTes } + @Test + @TestLogging("discovery.zen:TRACE,action:TRACE") + public void testClusterFormingWithASlowNode() throws Exception { + configureCluster(3, 2); + + SlowClusterStateProcessing disruption = new SlowClusterStateProcessing(getRandom(), 0, 0, 5000, 6000); + + // don't wait for initial state, wat want to add the disruption while the cluster is forming.. + internalCluster().startNodesAsync(3, + ImmutableSettings.builder() + .put(DiscoveryService.SETTING_INITIAL_STATE_TIMEOUT, "1ms") + .put(DiscoverySettings.PUBLISH_TIMEOUT, "3s") + .build()).get(); + + logger.info("applying disruption while cluster is forming ..."); + + internalCluster().setDisruptionScheme(disruption); + disruption.startDisrupting(); + + ensureStableCluster(3); + } + + /** + * Adds an asymetric break between a master and one of the nodes and makes + * sure that the node is removed form the cluster, that the node start pinging and that + * the cluster reforms when healed. + */ + @Test + @TestLogging("discovery.zen:TRACE,action:TRACE") + public void testNodeNotReachableFromMaster() throws Exception { + startCluster(3); + + String masterNode = internalCluster().getMasterName(); + String nonMasterNode = null; + while (nonMasterNode == null) { + nonMasterNode = randomFrom(internalCluster().getNodeNames()); + if (nonMasterNode.equals(masterNode)) { + masterNode = null; + } + } + + logger.info("blocking request from master [{}] to [{}]", masterNode, nonMasterNode); + MockTransportService masterTransportService = (MockTransportService) internalCluster().getInstance(TransportService.class, masterNode); + if (randomBoolean()) { + masterTransportService.addUnresponsiveRule(internalCluster().getInstance(ClusterService.class, nonMasterNode).localNode()); + } else { + masterTransportService.addFailToSendNoConnectRule(internalCluster().getInstance(ClusterService.class, nonMasterNode).localNode()); + } + + logger.info("waiting for [{}] to be removed from cluster", nonMasterNode); + ensureStableCluster(2, masterNode); + + logger.info("waiting for [{}] to have no master", nonMasterNode); + assertNoMaster(nonMasterNode); + + logger.info("healing partition and checking cluster reforms"); + masterTransportService.clearAllRules(); + + ensureStableCluster(3); + } + + protected NetworkPartition addRandomPartition() { NetworkPartition partition; if (randomBoolean()) { diff --git a/src/test/java/org/elasticsearch/discovery/ZenFaultDetectionTests.java b/src/test/java/org/elasticsearch/discovery/ZenFaultDetectionTests.java index fd73ce17b45..8f6b44b4339 100644 --- a/src/test/java/org/elasticsearch/discovery/ZenFaultDetectionTests.java +++ b/src/test/java/org/elasticsearch/discovery/ZenFaultDetectionTests.java @@ -27,12 +27,11 @@ import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.common.settings.ImmutableSettings; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.discovery.zen.DiscoveryNodesProvider; import org.elasticsearch.discovery.zen.fd.FaultDetection; import org.elasticsearch.discovery.zen.fd.MasterFaultDetection; import org.elasticsearch.discovery.zen.fd.NodesFaultDetection; -import org.elasticsearch.node.service.NodeService; import org.elasticsearch.test.ElasticsearchTestCase; +import org.elasticsearch.test.cluster.NoopClusterService; import org.elasticsearch.test.transport.MockTransportService; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportConnectionListener; @@ -172,21 +171,9 @@ public class ZenFaultDetectionTests extends ElasticsearchTestCase { settings.put(FaultDetection.SETTING_CONNECT_ON_NETWORK_DISCONNECT, shouldRetry) .put(FaultDetection.SETTING_PING_INTERVAL, "5m"); ClusterName clusterName = new ClusterName(randomAsciiOfLengthBetween(3, 20)); - final DiscoveryNodes nodes = buildNodesForA(false); - MasterFaultDetection masterFD = new MasterFaultDetection(settings.build(), threadPool, serviceA, - new DiscoveryNodesProvider() { - @Override - public DiscoveryNodes nodes() { - return nodes; - } - - @Override - public NodeService nodeService() { - return null; - } - }, - clusterName - ); + final ClusterState state = ClusterState.builder(clusterName).nodes(buildNodesForA(false)).build(); + MasterFaultDetection masterFD = new MasterFaultDetection(settings.build(), threadPool, serviceA, clusterName, + new NoopClusterService(state)); masterFD.start(nodeB, "test"); final String[] failureReason = new String[1]; diff --git a/src/test/java/org/elasticsearch/test/cluster/NoopClusterService.java b/src/test/java/org/elasticsearch/test/cluster/NoopClusterService.java index 4821ac1f9d7..54d29e06b50 100644 --- a/src/test/java/org/elasticsearch/test/cluster/NoopClusterService.java +++ b/src/test/java/org/elasticsearch/test/cluster/NoopClusterService.java @@ -20,28 +20,50 @@ package org.elasticsearch.test.cluster; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ElasticsearchIllegalStateException; +import org.elasticsearch.Version; import org.elasticsearch.cluster.*; import org.elasticsearch.cluster.block.ClusterBlock; import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.routing.operation.OperationRouting; import org.elasticsearch.cluster.service.PendingClusterTask; import org.elasticsearch.common.Priority; import org.elasticsearch.common.component.Lifecycle; import org.elasticsearch.common.component.LifecycleListener; +import org.elasticsearch.common.transport.DummyTransportAddress; import org.elasticsearch.common.unit.TimeValue; import java.util.List; public class NoopClusterService implements ClusterService { + final ClusterState state; + + public NoopClusterService() { + this(ClusterState.builder(new ClusterName("noop")).build()); + } + + public NoopClusterService(ClusterState state) { + if (state.getNodes().size() == 0) { + state = ClusterState.builder(state).nodes( + DiscoveryNodes.builder() + .put(new DiscoveryNode("noop_id", DummyTransportAddress.INSTANCE, Version.CURRENT)) + .localNodeId("noop_id")).build(); + } + + assert state.getNodes().localNode() != null; + this.state = state; + + } + @Override public DiscoveryNode localNode() { - return null; + return state.getNodes().localNode(); } @Override public ClusterState state() { - return null; + return state; } @Override diff --git a/src/test/java/org/elasticsearch/test/disruption/SlowClusterStateProcessing.java b/src/test/java/org/elasticsearch/test/disruption/SlowClusterStateProcessing.java index 46ae0afe54c..746d7f942ba 100644 --- a/src/test/java/org/elasticsearch/test/disruption/SlowClusterStateProcessing.java +++ b/src/test/java/org/elasticsearch/test/disruption/SlowClusterStateProcessing.java @@ -26,6 +26,7 @@ import org.elasticsearch.common.unit.TimeValue; import java.util.Random; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicBoolean; public class SlowClusterStateProcessing extends SingleNodeDisruption { @@ -99,11 +100,19 @@ public class SlowClusterStateProcessing extends SingleNodeDisruption { if (clusterService == null) { return false; } + final AtomicBoolean stopped = new AtomicBoolean(false); clusterService.submitStateUpdateTask("service_disruption_delay", Priority.IMMEDIATE, new ClusterStateNonMasterUpdateTask() { @Override public ClusterState execute(ClusterState currentState) throws Exception { - Thread.sleep(duration.millis()); + long count = duration.millis() / 200; + // wait while checking for a stopped + for (; count > 0 && !stopped.get(); count--) { + Thread.sleep(200); + } + if (!stopped.get()) { + Thread.sleep(duration.millis() % 200); + } countDownLatch.countDown(); return currentState; } @@ -116,6 +125,7 @@ public class SlowClusterStateProcessing extends SingleNodeDisruption { try { countDownLatch.await(); } catch (InterruptedException e) { + stopped.set(true); // try to wait again, we really want the cluster state thread to be freed up when stopping disruption countDownLatch.await(); } @@ -137,10 +147,11 @@ public class SlowClusterStateProcessing extends SingleNodeDisruption { if (!interruptClusterStateProcessing(duration)) { continue; } - - duration = new TimeValue(intervalBetweenDelaysMin + random.nextInt((int) (intervalBetweenDelaysMax - intervalBetweenDelaysMin))); - if (disrupting && disruptedNode != null) { - Thread.sleep(duration.millis()); + if (intervalBetweenDelaysMax > 0) { + duration = new TimeValue(intervalBetweenDelaysMin + random.nextInt((int) (intervalBetweenDelaysMax - intervalBetweenDelaysMin))); + if (disrupting && disruptedNode != null) { + Thread.sleep(duration.millis()); + } } } catch (InterruptedException e) { } catch (Exception e) {