Nodes that are not currently master do not update the ElectMasterService when min_master_nodes is set dynamically

When updating the min_master_nodes setting via the Cluster Settings API, the change is propagated to all nodes. The current master node also updates the ElectMasterService and validates that it still sees enough master-eligible nodes and that its election is still valid. Other master-eligible nodes do not go through this validation (good), but they also did not update the ElectMasterService with the new setting. The result is that if the current master goes away, the next election will not be done with the latest setting.

Note - min_master_nodes values set in the elasticsearch.yml file are processed correctly

Closes #5494
This commit is contained in:
Boaz Leskes 2014-03-22 16:03:04 +01:00
parent 5babf59813
commit c74f1de3a6
2 changed files with 57 additions and 16 deletions

View File

@ -437,15 +437,15 @@ public class ZenDiscovery extends AbstractLifecycleComponent<Discovery> implemen
// not started, ignore a node failure
return;
}
final int prevMinimumMasterNode = ZenDiscovery.this.electMaster.minimumMasterNodes();
ZenDiscovery.this.electMaster.minimumMasterNodes(minimumMasterNodes);
if (!master) {
// nothing to do here...
// We only set the new value. If the master doesn't see enough nodes it will revoke its mastership.
return;
}
clusterService.submitStateUpdateTask("zen-disco-minimum_master_nodes_changed", Priority.IMMEDIATE, new ProcessedClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) {
final int prevMinimumMasterNode = ZenDiscovery.this.electMaster.minimumMasterNodes();
ZenDiscovery.this.electMaster.minimumMasterNodes(minimumMasterNodes);
// check if we have enough master nodes, if not, we need to move into joining the cluster again
if (!electMaster.hasEnoughMasterNodes(currentState.nodes())) {
return rejoin(currentState, "not enough master nodes on change of minimum_master_nodes from [" + prevMinimumMasterNode + "] to [" + minimumMasterNodes + "]");

View File

@ -225,19 +225,7 @@ public class MinimumMasterNodesTests extends ElasticsearchIntegrationTest {
logger.info("--> verify that there is no master anymore on remaining nodes");
// spin here to wait till the state is set
assertThat(awaitBusy(new Predicate<Object>() {
public boolean apply(Object obj) {
boolean success = true;
for(Client client : cluster()) {
ClusterState state = client.admin().cluster().prepareState().setLocal(true).execute().actionGet().getState();
success &= state.blocks().hasGlobalBlock(Discovery.NO_MASTER_BLOCK);
if (logger.isDebugEnabled()) {
logger.debug("Checking for NO_MASTER_BLOCK on client: {} NO_MASTER_BLOCK: [{}]", client, state.blocks().hasGlobalBlock(Discovery.NO_MASTER_BLOCK));
}
}
return success;
}
}, 20, TimeUnit.SECONDS), equalTo(true));
assertNoMasterBlockOnAllNodes();
logger.info("--> start back the 2 nodes ");
cluster().startNode(settings);
@ -260,4 +248,57 @@ public class MinimumMasterNodesTests extends ElasticsearchIntegrationTest {
assertThat(client().prepareCount().setQuery(QueryBuilders.matchAllQuery()).execute().actionGet().getCount(), equalTo(100l));
}
}
/**
 * Exercises a dynamic change of the minimum master nodes setting on a two-node cluster.
 * <p>
 * Sequence: start two nodes, raise the minimum to 2 through {@code setMinimumMasterNodes(2)},
 * wait for every node to drain its pending events, stop one random node, and then expect
 * {@code Discovery.NO_MASTER_BLOCK} on all remaining nodes. Finally a replacement node is
 * started and the cluster is expected to report two nodes again.
 *
 * @throws InterruptedException declared because {@code assertNoMasterBlockOnAllNodes()} declares it
 */
@Test
public void dynamicUpdateMinimumMasterNodes() throws InterruptedException {
    final Settings nodeSettings = settingsBuilder()
            .put("discovery.type", "zen")
            .put("discovery.zen.ping_timeout", "200ms")
            .put("discovery.initial_state_timeout", "500ms")
            .put("gateway.type", "local")
            .build();

    logger.info("--> start 2 nodes");
    for (int started = 0; started < 2; started++) {
        cluster().startNode(nodeSettings);
    }

    // block until the second node has joined the cluster
    assertThat(
            client().admin().cluster().prepareHealth()
                    .setWaitForEvents(Priority.LANGUID)
                    .setWaitForNodes("2")
                    .get()
                    .isTimedOut(),
            equalTo(false));

    logger.info("--> setting minimum master node to 2");
    setMinimumMasterNodes(2);

    // every node must have processed the change before we continue
    // (the master node spawns a secondary cluster state update task)
    for (Client nodeClient : cluster()) {
        final boolean timedOut = nodeClient.admin().cluster().prepareHealth()
                .setWaitForEvents(Priority.LANGUID)
                .setLocal(true)
                .get()
                .isTimedOut();
        assertThat(timedOut, equalTo(false));
    }

    logger.info("--> stopping a node");
    cluster().stopRandomNode();

    logger.info("--> verifying min master node has effect");
    assertNoMasterBlockOnAllNodes();

    logger.info("--> bringing another node up");
    cluster().startNode(nodeSettings);

    // the cluster should get back to two nodes
    assertThat(
            client().admin().cluster().prepareHealth()
                    .setWaitForEvents(Priority.LANGUID)
                    .setWaitForNodes("2")
                    .get()
                    .isTimedOut(),
            equalTo(false));
}
/**
 * Asserts that {@code Discovery.NO_MASTER_BLOCK} is present as a global block in the local
 * cluster state of every client returned by {@code cluster()}.
 * <p>
 * The check is handed to {@code awaitBusy} with a 20 second limit; the assertion fails when
 * {@code awaitBusy} does not return {@code true}. Every client is inspected on each pass
 * (no early exit) so that the debug log covers all of them.
 * NOTE(review): despite the name, this asserts that the "no master" block IS set.
 *
 * @throws InterruptedException declared for the {@code awaitBusy} call (presumably thrown
 *                              when the waiting thread is interrupted)
 */
private void assertNoMasterBlockOnAllNodes() throws InterruptedException {
    final Predicate<Object> noMasterBlockEverywhere = new Predicate<Object>() {
        @Override
        public boolean apply(Object ignored) {
            boolean allBlocked = true;
            for (Client nodeClient : cluster()) {
                // setLocal(true): read the state held by this node
                final ClusterState localState = nodeClient.admin().cluster()
                        .prepareState()
                        .setLocal(true)
                        .execute()
                        .actionGet()
                        .getState();
                final boolean blocked = localState.blocks().hasGlobalBlock(Discovery.NO_MASTER_BLOCK);
                if (!blocked) {
                    allBlocked = false;
                }
                if (logger.isDebugEnabled()) {
                    logger.debug("Checking for NO_MASTER_BLOCK on client: {} NO_MASTER_BLOCK: [{}]", nodeClient, blocked);
                }
            }
            return allBlocked;
        }
    };
    assertThat(awaitBusy(noMasterBlockEverywhere, 20, TimeUnit.SECONDS), equalTo(true));
}
}