From d62bf5f67f102084fd66bf58744d5b30c26c4882 Mon Sep 17 00:00:00 2001 From: Boaz Leskes Date: Fri, 12 Dec 2014 20:53:50 +0100 Subject: [PATCH] Discovery: concurrent node failures can cause unneeded cluster state publishing When a node fails (or closes), the master processes the network disconnect event and removes the node from the cluster state. If multiple nodes fail (or shut down) in rapid succession, we process the events and remove the nodes one by one. During this process, the intermediate cluster states may cause the node fault detection to signal the failure of nodes that are not yet removed from the cluster state. While this is fine, it currently causes unneeded reroutes and cluster state publishing, which can be cumbersome in big clusters. Closes #8804 Closes #8933 --- .../discovery/zen/ZenDiscovery.java | 4 ++ ...inOnMaster.java => ZenDiscoveryTests.java} | 55 ++++++++++++++++++- 2 files changed, 58 insertions(+), 1 deletion(-) rename src/test/java/org/elasticsearch/discovery/zen/{ZenDiscoveryRejoinOnMaster.java => ZenDiscoveryTests.java} (64%) diff --git a/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java b/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java index bea067d9363..8c81086bf46 100644 --- a/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java +++ b/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java @@ -547,6 +547,10 @@ public class ZenDiscovery extends AbstractLifecycleComponent implemen clusterService.submitStateUpdateTask("zen-disco-node_failed(" + node + "), reason " + reason, Priority.IMMEDIATE, new ProcessedClusterStateUpdateTask() { @Override public ClusterState execute(ClusterState currentState) { + if (currentState.nodes().get(node.id()) == null) { + logger.debug("node [{}] already removed from cluster state. ignoring.", node); + return currentState; + } DiscoveryNodes.Builder builder = DiscoveryNodes.builder(currentState.nodes()) .remove(node.id()); currentState = ClusterState.builder(currentState).nodes(builder).build(); diff --git a/src/test/java/org/elasticsearch/discovery/zen/ZenDiscoveryRejoinOnMaster.java b/src/test/java/org/elasticsearch/discovery/zen/ZenDiscoveryTests.java similarity index 64% rename from src/test/java/org/elasticsearch/discovery/zen/ZenDiscoveryRejoinOnMaster.java rename to src/test/java/org/elasticsearch/discovery/zen/ZenDiscoveryTests.java index 1ee31505d5e..daf1747844a 100644 --- a/src/test/java/org/elasticsearch/discovery/zen/ZenDiscoveryRejoinOnMaster.java +++ b/src/test/java/org/elasticsearch/discovery/zen/ZenDiscoveryTests.java @@ -21,20 +21,30 @@ package org.elasticsearch.discovery.zen; import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse; import org.elasticsearch.action.admin.indices.recovery.RecoveryResponse; +import org.elasticsearch.cluster.ClusterChangedEvent; +import org.elasticsearch.cluster.ClusterService; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.ClusterStateListener; import org.elasticsearch.common.Priority; import org.elasticsearch.common.settings.ImmutableSettings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.discovery.Discovery; import org.elasticsearch.discovery.zen.fd.FaultDetection; import org.elasticsearch.test.ElasticsearchIntegrationTest; +import org.hamcrest.Matchers; import org.junit.Test; +import java.io.IOException; +import java.util.ArrayList; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; + import static org.hamcrest.Matchers.*; /** */ @ElasticsearchIntegrationTest.ClusterScope(scope = ElasticsearchIntegrationTest.Scope.TEST, numDataNodes = 0, numClientNodes = 0) -public class ZenDiscoveryRejoinOnMaster extends ElasticsearchIntegrationTest { +public class ZenDiscoveryTests extends ElasticsearchIntegrationTest { @Test public void testChangeRejoinOnMasterOptionIsDynamic() throws Exception { @@ -99,4 +109,47 @@ public class ZenDiscoveryRejoinOnMaster extends ElasticsearchIntegrationTest { assertThat(numRecoveriesAfterNewMaster, equalTo(numRecoveriesBeforeNewMaster)); } + @Test + public void testNodeFailuresAreProcessedOnce() throws ExecutionException, InterruptedException, IOException { + Settings defaultSettings = ImmutableSettings.builder() + .put(FaultDetection.SETTING_PING_TIMEOUT, "1s") + .put(FaultDetection.SETTING_PING_RETRIES, "1") + .put("discovery.type", "zen") + .build(); + + Settings masterNodeSettings = ImmutableSettings.builder() + .put("node.data", false) + .put(defaultSettings) + .build(); + String master = internalCluster().startNode(masterNodeSettings); + Settings dateNodeSettings = ImmutableSettings.builder() + .put("node.master", false) + .put(defaultSettings) + .build(); + internalCluster().startNodesAsync(2, dateNodeSettings).get(); + client().admin().cluster().prepareHealth().setWaitForNodes("3").get(); + + ClusterService clusterService = internalCluster().getInstance(ClusterService.class, master); + final ArrayList statesFound = new ArrayList<>(); + final CountDownLatch nodesStopped = new CountDownLatch(1); + clusterService.add(new ClusterStateListener() { + @Override + public void clusterChanged(ClusterChangedEvent event) { + statesFound.add(event.state()); + try { + // block until both nodes have stopped to accumulate node failures + nodesStopped.await(); + } catch (InterruptedException e) { + //meh + } + } + }); + + internalCluster().stopRandomNonMasterNode(); + internalCluster().stopRandomNonMasterNode(); + nodesStopped.countDown(); + + client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).get(); // wait for all to be processed + assertThat(statesFound, Matchers.hasSize(2)); + } }