[Zen2] Only elect master-eligible nodes (#35996)

Today any node can win an election. However, the whole point of
master-eligibility is that master-ineligible nodes should not be elected as the
leader; furthermore master-ineligible nodes do not have any outgoing STATE
channels so cannot publish cluster states, so their leadership is ineffective
and disruptive.

This change ensures that the elected leader is master-eligible by preventing
master-ineligible nodes from scheduling an election.
This commit is contained in:
David Turner 2018-11-29 12:10:43 +00:00 committed by GitHub
parent 0b45fb98b9
commit 87408b04d4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 64 additions and 27 deletions

View File

@ -344,6 +344,7 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
private void handleJoinRequest(JoinRequest joinRequest, JoinHelper.JoinCallback joinCallback) { private void handleJoinRequest(JoinRequest joinRequest, JoinHelper.JoinCallback joinCallback) {
assert Thread.holdsLock(mutex) == false; assert Thread.holdsLock(mutex) == false;
assert getLocalNode().isMasterNode() : getLocalNode() + " received a join but is not master-eligible";
logger.trace("handleJoinRequest: as {}, handling {}", mode, joinRequest); logger.trace("handleJoinRequest: as {}, handling {}", mode, joinRequest);
transportService.connectToNode(joinRequest.getSourceNode()); transportService.connectToNode(joinRequest.getSourceNode());
@ -392,6 +393,8 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
void becomeLeader(String method) { void becomeLeader(String method) {
assert Thread.holdsLock(mutex) : "Coordinator mutex not held"; assert Thread.holdsLock(mutex) : "Coordinator mutex not held";
assert mode == Mode.CANDIDATE : "expected candidate but was " + mode; assert mode == Mode.CANDIDATE : "expected candidate but was " + mode;
assert getLocalNode().isMasterNode() : getLocalNode() + " became a leader but is not master-eligible";
logger.debug("{}: becoming LEADER (was {}, lastKnownLeader was [{}])", method, mode, lastKnownLeader); logger.debug("{}: becoming LEADER (was {}, lastKnownLeader was [{}])", method, mode, lastKnownLeader);
mode = Mode.LEADER; mode = Mode.LEADER;
@ -409,6 +412,8 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
void becomeFollower(String method, DiscoveryNode leaderNode) { void becomeFollower(String method, DiscoveryNode leaderNode) {
assert Thread.holdsLock(mutex) : "Coordinator mutex not held"; assert Thread.holdsLock(mutex) : "Coordinator mutex not held";
assert leaderNode.isMasterNode() : leaderNode + " became a leader but is not master-eligible";
logger.debug("{}: becoming FOLLOWER of [{}] (was {}, lastKnownLeader was [{}])", method, leaderNode, mode, lastKnownLeader); logger.debug("{}: becoming FOLLOWER of [{}] (was {}, lastKnownLeader was [{}])", method, leaderNode, mode, lastKnownLeader);
final boolean restartLeaderChecker = (mode == Mode.FOLLOWER && Optional.of(leaderNode).equals(lastKnownLeader)) == false; final boolean restartLeaderChecker = (mode == Mode.FOLLOWER && Optional.of(leaderNode).equals(lastKnownLeader)) == false;
@ -927,6 +932,11 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
private void startElectionScheduler() { private void startElectionScheduler() {
assert electionScheduler == null : electionScheduler; assert electionScheduler == null : electionScheduler;
if (getLocalNode().isMasterNode() == false) {
return;
}
final TimeValue gracePeriod = TimeValue.ZERO; // TODO variable grace period final TimeValue gracePeriod = TimeValue.ZERO; // TODO variable grace period
electionScheduler = electionSchedulerFactory.startElectionScheduler(gracePeriod, new Runnable() { electionScheduler = electionSchedulerFactory.startElectionScheduler(gracePeriod, new Runnable() {
@Override @Override

View File

@ -166,6 +166,7 @@ public class JoinHelper {
} }
public void sendJoinRequest(DiscoveryNode destination, Optional<Join> optionalJoin) { public void sendJoinRequest(DiscoveryNode destination, Optional<Join> optionalJoin) {
assert destination.isMasterNode() : "trying to join master-ineligible " + destination;
final JoinRequest joinRequest = new JoinRequest(transportService.getLocalNode(), optionalJoin); final JoinRequest joinRequest = new JoinRequest(transportService.getLocalNode(), optionalJoin);
final Tuple<DiscoveryNode, JoinRequest> dedupKey = Tuple.tuple(destination, joinRequest); final Tuple<DiscoveryNode, JoinRequest> dedupKey = Tuple.tuple(destination, joinRequest);
if (pendingOutgoingJoins.add(dedupKey)) { if (pendingOutgoingJoins.add(dedupKey)) {
@ -210,6 +211,8 @@ public class JoinHelper {
} }
public void sendStartJoinRequest(final StartJoinRequest startJoinRequest, final DiscoveryNode destination) { public void sendStartJoinRequest(final StartJoinRequest startJoinRequest, final DiscoveryNode destination) {
assert startJoinRequest.getSourceNode().isMasterNode()
: "sending start-join request for master-ineligible " + startJoinRequest.getSourceNode();
transportService.sendRequest(destination, START_JOIN_ACTION_NAME, transportService.sendRequest(destination, START_JOIN_ACTION_NAME,
startJoinRequest, new TransportResponseHandler<Empty>() { startJoinRequest, new TransportResponseHandler<Empty>() {
@Override @Override

View File

@ -23,19 +23,22 @@ import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage; import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.coordination.CoordinationMetaData.VotingConfiguration;
import org.elasticsearch.cluster.ClusterStateUpdateTask; import org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.elasticsearch.cluster.ESAllocationTestCase; import org.elasticsearch.cluster.ESAllocationTestCase;
import org.elasticsearch.cluster.coordination.ClusterStatePublisher.AckListener; import org.elasticsearch.cluster.coordination.ClusterStatePublisher.AckListener;
import org.elasticsearch.cluster.coordination.CoordinationMetaData.VotingConfiguration;
import org.elasticsearch.cluster.coordination.CoordinationState.PersistedState; import org.elasticsearch.cluster.coordination.CoordinationState.PersistedState;
import org.elasticsearch.cluster.coordination.Coordinator.Mode; import org.elasticsearch.cluster.coordination.Coordinator.Mode;
import org.elasticsearch.cluster.coordination.CoordinatorTests.Cluster.ClusterNode; import org.elasticsearch.cluster.coordination.CoordinatorTests.Cluster.ClusterNode;
import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNode.Role;
import org.elasticsearch.cluster.service.ClusterApplier; import org.elasticsearch.cluster.service.ClusterApplier;
import org.elasticsearch.common.Randomness; import org.elasticsearch.common.Randomness;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.lease.Releasable; import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Setting;
@ -58,6 +61,7 @@ import java.io.UncheckedIOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collections; import java.util.Collections;
import java.util.EnumSet;
import java.util.HashMap; import java.util.HashMap;
import java.util.HashSet; import java.util.HashSet;
import java.util.List; import java.util.List;
@ -136,6 +140,15 @@ public class CoordinatorTests extends ESTestCase {
} }
} }
public void testDoesNotElectNonMasterNode() {
final Cluster cluster = new Cluster(randomIntBetween(1, 5), false);
cluster.runRandomly();
cluster.stabilise();
final ClusterNode leader = cluster.getAnyLeader();
assertTrue(leader.localNode.isMasterNode());
}
public void testNodesJoinAfterStableCluster() { public void testNodesJoinAfterStableCluster() {
final Cluster cluster = new Cluster(randomIntBetween(1, 5)); final Cluster cluster = new Cluster(randomIntBetween(1, 5));
cluster.runRandomly(); cluster.runRandomly();
@ -889,10 +902,6 @@ public class CoordinatorTests extends ESTestCase {
// then wait for the new leader to commit a state without the old leader // then wait for the new leader to commit a state without the old leader
+ DEFAULT_CLUSTER_STATE_UPDATE_DELAY; + DEFAULT_CLUSTER_STATE_UPDATE_DELAY;
private static String nodeIdFromIndex(int nodeIndex) {
return "node" + nodeIndex;
}
class Cluster { class Cluster {
static final long EXTREME_DELAY_VARIABILITY = 10000L; static final long EXTREME_DELAY_VARIABILITY = 10000L;
@ -910,28 +919,31 @@ public class CoordinatorTests extends ESTestCase {
private final Map<Long, ClusterState> committedStatesByVersion = new HashMap<>(); private final Map<Long, ClusterState> committedStatesByVersion = new HashMap<>();
Cluster(int initialNodeCount) { Cluster(int initialNodeCount) {
this(initialNodeCount, true);
}
Cluster(int initialNodeCount, boolean allNodesMasterEligible) {
deterministicTaskQueue.setExecutionDelayVariabilityMillis(DEFAULT_DELAY_VARIABILITY); deterministicTaskQueue.setExecutionDelayVariabilityMillis(DEFAULT_DELAY_VARIABILITY);
assertThat(initialNodeCount, greaterThan(0)); assertThat(initialNodeCount, greaterThan(0));
final Set<String> initialConfigurationNodeIds = new HashSet<>(initialNodeCount); final Set<String> masterEligibleNodeIds = new HashSet<>(initialNodeCount);
while (initialConfigurationNodeIds.isEmpty()) {
for (int i = 0; i < initialNodeCount; i++) {
if (randomBoolean()) {
initialConfigurationNodeIds.add(nodeIdFromIndex(i));
}
}
}
initialConfiguration = new VotingConfiguration(initialConfigurationNodeIds);
logger.info("--> creating cluster of {} nodes with initial configuration {}", initialNodeCount, initialConfiguration);
clusterNodes = new ArrayList<>(initialNodeCount); clusterNodes = new ArrayList<>(initialNodeCount);
for (int i = 0; i < initialNodeCount; i++) { for (int i = 0; i < initialNodeCount; i++) {
final ClusterNode clusterNode = new ClusterNode(i); final ClusterNode clusterNode = new ClusterNode(i, allNodesMasterEligible || i == 0 || randomBoolean());
clusterNodes.add(clusterNode); clusterNodes.add(clusterNode);
if (clusterNode.getLocalNode().isMasterNode()) {
masterEligibleNodeIds.add(clusterNode.getId());
} }
} }
initialConfiguration = new VotingConfiguration(new HashSet<>(
randomSubsetOf(randomIntBetween(1, masterEligibleNodeIds.size()), masterEligibleNodeIds)));
logger.info("--> creating cluster of {} nodes (master-eligible nodes: {}) with initial configuration {}",
initialNodeCount, masterEligibleNodeIds, initialConfiguration);
}
void addNodesAndStabilise(int newNodesCount) { void addNodesAndStabilise(int newNodesCount) {
addNodes(newNodesCount); addNodes(newNodesCount);
stabilise( stabilise(
@ -950,7 +962,7 @@ public class CoordinatorTests extends ESTestCase {
final int nodeSizeAtStart = clusterNodes.size(); final int nodeSizeAtStart = clusterNodes.size();
for (int i = 0; i < newNodesCount; i++) { for (int i = 0; i < newNodesCount; i++) {
final ClusterNode clusterNode = new ClusterNode(nodeSizeAtStart + i); final ClusterNode clusterNode = new ClusterNode(nodeSizeAtStart + i, true);
clusterNodes.add(clusterNode); clusterNodes.add(clusterNode);
} }
} }
@ -1090,11 +1102,11 @@ public class CoordinatorTests extends ESTestCase {
deterministicTaskQueue.getExecutionDelayVariabilityMillis(), lessThanOrEqualTo(DEFAULT_DELAY_VARIABILITY)); deterministicTaskQueue.getExecutionDelayVariabilityMillis(), lessThanOrEqualTo(DEFAULT_DELAY_VARIABILITY));
assertFalse("stabilisation requires stable storage", disruptStorage); assertFalse("stabilisation requires stable storage", disruptStorage);
if (clusterNodes.stream().allMatch(n -> n.coordinator.isInitialConfigurationSet() == false)) { if (clusterNodes.stream().allMatch(ClusterNode::isNotUsefullyBootstrapped)) {
assertThat("setting initial configuration may fail with disconnected nodes", disconnectedNodes, empty()); assertThat("setting initial configuration may fail with disconnected nodes", disconnectedNodes, empty());
assertThat("setting initial configuration may fail with blackholed nodes", blackholedNodes, empty()); assertThat("setting initial configuration may fail with blackholed nodes", blackholedNodes, empty());
runFor(defaultMillis(DISCOVERY_FIND_PEERS_INTERVAL_SETTING) * 2, "discovery prior to setting initial configuration"); runFor(defaultMillis(DISCOVERY_FIND_PEERS_INTERVAL_SETTING) * 2, "discovery prior to setting initial configuration");
final ClusterNode bootstrapNode = getAnyNode(); final ClusterNode bootstrapNode = getAnyMasterEligibleNode();
bootstrapNode.applyInitialConfiguration(); bootstrapNode.applyInitialConfiguration();
} else { } else {
logger.info("setting initial configuration not required"); logger.info("setting initial configuration not required");
@ -1211,6 +1223,10 @@ public class CoordinatorTests extends ESTestCase {
return connectionStatus; return connectionStatus;
} }
ClusterNode getAnyMasterEligibleNode() {
return randomFrom(clusterNodes.stream().filter(n -> n.getLocalNode().isMasterNode()).collect(Collectors.toList()));
}
ClusterNode getAnyNode() { ClusterNode getAnyNode() {
return getAnyNodeExcept(); return getAnyNodeExcept();
} }
@ -1283,16 +1299,20 @@ public class CoordinatorTests extends ESTestCase {
private DisruptableMockTransport mockTransport; private DisruptableMockTransport mockTransport;
private ClusterStateApplyResponse clusterStateApplyResponse = ClusterStateApplyResponse.SUCCEED; private ClusterStateApplyResponse clusterStateApplyResponse = ClusterStateApplyResponse.SUCCEED;
ClusterNode(int nodeIndex) { ClusterNode(int nodeIndex, boolean masterEligible) {
this.nodeIndex = nodeIndex; this.nodeIndex = nodeIndex;
localNode = createDiscoveryNode(); localNode = createDiscoveryNode(masterEligible);
persistedState = new MockPersistedState(0L, persistedState = new MockPersistedState(0L,
clusterState(0L, 0L, localNode, VotingConfiguration.EMPTY_CONFIG, VotingConfiguration.EMPTY_CONFIG, 0L)); clusterState(0L, 0L, localNode, VotingConfiguration.EMPTY_CONFIG, VotingConfiguration.EMPTY_CONFIG, 0L));
onNode(localNode, this::setUp).run(); onNode(localNode, this::setUp).run();
} }
private DiscoveryNode createDiscoveryNode() { private DiscoveryNode createDiscoveryNode(boolean masterEligible) {
return CoordinationStateTests.createNode(nodeIdFromIndex(nodeIndex)); final TransportAddress address = buildNewFakeTransportAddress();
return new DiscoveryNode("", "node" + nodeIndex,
UUIDs.randomBase64UUID(random()), // generated deterministically for repeatable tests
address.address().getHostString(), address.getAddress(), address, Collections.emptyMap(),
masterEligible ? EnumSet.allOf(Role.class) : emptySet(), Version.CURRENT);
} }
private void setUp() { private void setUp() {
@ -1483,6 +1503,10 @@ public class CoordinatorTests extends ESTestCase {
}).run(); }).run();
} }
private boolean isNotUsefullyBootstrapped() {
return getLocalNode().isMasterNode() == false || coordinator.isInitialConfigurationSet() == false;
}
private class FakeClusterApplier implements ClusterApplier { private class FakeClusterApplier implements ClusterApplier {
final ClusterName clusterName; final ClusterName clusterName;