Discovery: concurrent node failures can cause unneeded cluster state publishing

When a node fails (or shuts down), the master processes the network disconnect event and removes the node from the cluster state. If multiple nodes fail (or shut down) in rapid succession, the master processes the events and removes the nodes one by one. During this process, the intermediate cluster states may cause node fault detection to signal the failure of nodes that have not yet been removed from the cluster state. While this is harmless in itself, it currently triggers unneeded reroutes and cluster state publishing, which can be expensive in big clusters.

Closes #8804
Closes #8933
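
For context, the guard added in the diff below relies on how the master applies state update tasks: a task that hands back the very same ClusterState instance it was given leaves nothing to reroute or publish. A minimal sketch of that pattern, assuming a simplified update loop (the runTask method and the publish() helper are illustrative stand-ins, not the actual cluster service code):

    // Simplified master-side update loop (assumed shape, for illustration only):
    // tasks that return their input state unchanged are treated as no-ops, so no
    // reroute is computed and no new cluster state is published to the other nodes.
    ClusterState runTask(ClusterStateUpdateTask task, ClusterState previousState) throws Exception {
        ClusterState newState = task.execute(previousState);
        if (newState == previousState) {
            return previousState; // no change: skip reroute and publishing entirely
        }
        publish(newState); // hypothetical helper standing in for the real publish path
        return newState;
    }

With the fix, a failure notification for a node that is already gone falls into the first branch, so a burst of concurrent failures costs at most one removal (and one publish) per node.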
Boaz Leskes 2014-12-12 20:53:50 +01:00
parent 9b18c44b67
commit d62bf5f67f
2 changed files with 58 additions and 1 deletion


@@ -547,6 +547,10 @@ public class ZenDiscovery extends AbstractLifecycleComponent<Discovery> implemen
        clusterService.submitStateUpdateTask("zen-disco-node_failed(" + node + "), reason " + reason, Priority.IMMEDIATE, new ProcessedClusterStateUpdateTask() {
            @Override
            public ClusterState execute(ClusterState currentState) {
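                // a failure notification may refer to a node that an earlier event already
                // removed; treating it as a no-op avoids an extra reroute and publish round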
                if (currentState.nodes().get(node.id()) == null) {
                    logger.debug("node [{}] already removed from cluster state. ignoring.", node);
                    return currentState;
                }
                DiscoveryNodes.Builder builder = DiscoveryNodes.builder(currentState.nodes())
                        .remove(node.id());
                currentState = ClusterState.builder(currentState).nodes(builder).build();


@@ -21,20 +21,30 @@ package org.elasticsearch.discovery.zen;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.action.admin.indices.recovery.RecoveryResponse;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateListener;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.discovery.Discovery;
import org.elasticsearch.discovery.zen.fd.FaultDetection;
import org.elasticsearch.test.ElasticsearchIntegrationTest;
import org.hamcrest.Matchers;
import org.junit.Test;
import java.io.IOException;
import java.util.ArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import static org.hamcrest.Matchers.*;
/**
*/
@ElasticsearchIntegrationTest.ClusterScope(scope = ElasticsearchIntegrationTest.Scope.TEST, numDataNodes = 0, numClientNodes = 0)
public class ZenDiscoveryRejoinOnMaster extends ElasticsearchIntegrationTest {
public class ZenDiscoveryTests extends ElasticsearchIntegrationTest {
    @Test
    public void testChangeRejoinOnMasterOptionIsDynamic() throws Exception {
@@ -99,4 +109,47 @@ public class ZenDiscoveryRejoinOnMaster extends ElasticsearchIntegrationTest {
        assertThat(numRecoveriesAfterNewMaster, equalTo(numRecoveriesBeforeNewMaster));
    }

    @Test
    public void testNodeFailuresAreProcessedOnce() throws ExecutionException, InterruptedException, IOException {
        Settings defaultSettings = ImmutableSettings.builder()
                .put(FaultDetection.SETTING_PING_TIMEOUT, "1s")
                .put(FaultDetection.SETTING_PING_RETRIES, "1")
                .put("discovery.type", "zen")
                .build();

        Settings masterNodeSettings = ImmutableSettings.builder()
                .put("node.data", false)
                .put(defaultSettings)
                .build();
        String master = internalCluster().startNode(masterNodeSettings);

        Settings dataNodeSettings = ImmutableSettings.builder()
                .put("node.master", false)
                .put(defaultSettings)
                .build();
        internalCluster().startNodesAsync(2, dataNodeSettings).get();
        client().admin().cluster().prepareHealth().setWaitForNodes("3").get();

        ClusterService clusterService = internalCluster().getInstance(ClusterService.class, master);
        final ArrayList<ClusterState> statesFound = new ArrayList<>();
        final CountDownLatch nodesStopped = new CountDownLatch(1);
        clusterService.add(new ClusterStateListener() {
            @Override
            public void clusterChanged(ClusterChangedEvent event) {
                statesFound.add(event.state());
                try {
                    // block until both nodes have stopped to accumulate node failures
                    nodesStopped.await();
                } catch (InterruptedException e) {
                    //meh
                }
            }
        });
        internalCluster().stopRandomNonMasterNode();
        internalCluster().stopRandomNonMasterNode();
        nodesStopped.countDown();

        client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).get(); // wait for all to be processed
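        // each stopped node is expected to produce exactly one cluster state update on the
        // master; duplicate failure notifications for already-removed nodes must not add more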
        assertThat(statesFound, Matchers.hasSize(2));
    }
}