Only bootstrap and elect node in current voting configuration (#37712)

Adapts bootstrapping and leader election to only trigger on nodes that are actually part of the voting
configuration.
This commit is contained in:
Yannick Welsch 2019-01-23 13:10:11 +01:00 committed by GitHub
parent 4ec3a6d922
commit d5139e0590
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 87 additions and 14 deletions

View File

@ -97,7 +97,7 @@ public class ClusterBootstrapService {
void onFoundPeersUpdated() { void onFoundPeersUpdated() {
final Set<DiscoveryNode> nodes = getDiscoveredNodes(); final Set<DiscoveryNode> nodes = getDiscoveredNodes();
if (transportService.getLocalNode().isMasterNode() && bootstrapRequirements.isEmpty() == false if (bootstrappingPermitted.get() && transportService.getLocalNode().isMasterNode() && bootstrapRequirements.isEmpty() == false
&& isBootstrappedSupplier.getAsBoolean() == false && nodes.stream().noneMatch(Coordinator::isZen1Node)) { && isBootstrappedSupplier.getAsBoolean() == false && nodes.stream().noneMatch(Coordinator::isZen1Node)) {
final Tuple<Set<DiscoveryNode>,List<String>> requirementMatchingResult; final Tuple<Set<DiscoveryNode>,List<String>> requirementMatchingResult;
@ -114,6 +114,13 @@ public class ClusterBootstrapService {
logger.trace("nodesMatchingRequirements={}, unsatisfiedRequirements={}, bootstrapRequirements={}", logger.trace("nodesMatchingRequirements={}, unsatisfiedRequirements={}, bootstrapRequirements={}",
nodesMatchingRequirements, unsatisfiedRequirements, bootstrapRequirements); nodesMatchingRequirements, unsatisfiedRequirements, bootstrapRequirements);
if (nodesMatchingRequirements.contains(transportService.getLocalNode()) == false) {
logger.info("skipping cluster bootstrapping as local node does not match bootstrap requirements: {}",
bootstrapRequirements);
bootstrappingPermitted.set(false);
return;
}
if (nodesMatchingRequirements.size() * 2 > bootstrapRequirements.size()) { if (nodesMatchingRequirements.size() * 2 > bootstrapRequirements.size()) {
startBootstrap(nodesMatchingRequirements, unsatisfiedRequirements); startBootstrap(nodesMatchingRequirements, unsatisfiedRequirements);
} }

View File

@ -348,6 +348,12 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
// The preVoteCollector is only active while we are candidate, but it does not call this method with synchronisation, so we have // The preVoteCollector is only active while we are candidate, but it does not call this method with synchronisation, so we have
// to check our mode again here. // to check our mode again here.
if (mode == Mode.CANDIDATE) { if (mode == Mode.CANDIDATE) {
if (electionQuorumContainsLocalNode(getLastAcceptedState()) == false) {
logger.trace("skip election as local node is not part of election quorum: {}",
getLastAcceptedState().coordinationMetaData());
return;
}
final StartJoinRequest startJoinRequest final StartJoinRequest startJoinRequest
= new StartJoinRequest(getLocalNode(), Math.max(getCurrentTerm(), maxTermSeen) + 1); = new StartJoinRequest(getLocalNode(), Math.max(getCurrentTerm(), maxTermSeen) + 1);
logger.debug("starting election with {}", startJoinRequest); logger.debug("starting election with {}", startJoinRequest);
@ -360,6 +366,13 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
} }
} }
private static boolean electionQuorumContainsLocalNode(ClusterState lastAcceptedState) {
final String localNodeId = lastAcceptedState.nodes().getLocalNodeId();
assert localNodeId != null;
return lastAcceptedState.getLastCommittedConfiguration().getNodeIds().contains(localNodeId)
|| lastAcceptedState.getLastAcceptedConfiguration().getNodeIds().contains(localNodeId);
}
private Optional<Join> ensureTermAtLeast(DiscoveryNode sourceNode, long targetTerm) { private Optional<Join> ensureTermAtLeast(DiscoveryNode sourceNode, long targetTerm) {
assert Thread.holdsLock(mutex) : "Coordinator mutex not held"; assert Thread.holdsLock(mutex) : "Coordinator mutex not held";
if (getCurrentTerm() < targetTerm) { if (getCurrentTerm() < targetTerm) {
@ -709,10 +722,24 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
return false; return false;
} }
if (getLocalNode().isMasterNode() == false) {
logger.debug("skip setting initial configuration as local node is not a master-eligible node");
throw new CoordinationStateRejectedException(
"this node is not master-eligible, but cluster bootstrapping can only happen on a master-eligible node");
}
if (votingConfiguration.getNodeIds().contains(getLocalNode().getId()) == false) {
logger.debug("skip setting initial configuration as local node is not part of initial configuration");
throw new CoordinationStateRejectedException("local node is not part of initial configuration");
}
final List<DiscoveryNode> knownNodes = new ArrayList<>(); final List<DiscoveryNode> knownNodes = new ArrayList<>();
knownNodes.add(getLocalNode()); knownNodes.add(getLocalNode());
peerFinder.getFoundPeers().forEach(knownNodes::add); peerFinder.getFoundPeers().forEach(knownNodes::add);
if (votingConfiguration.hasQuorum(knownNodes.stream().map(DiscoveryNode::getId).collect(Collectors.toList())) == false) { if (votingConfiguration.hasQuorum(knownNodes.stream().map(DiscoveryNode::getId).collect(Collectors.toList())) == false) {
logger.debug("skip setting initial configuration as not enough nodes discovered to form a quorum in the " +
"initial configuration [knownNodes={}, {}]", knownNodes, votingConfiguration);
throw new CoordinationStateRejectedException("not enough nodes discovered to form a quorum in the initial configuration " + throw new CoordinationStateRejectedException("not enough nodes discovered to form a quorum in the initial configuration " +
"[knownNodes=" + knownNodes + ", " + votingConfiguration + "]"); "[knownNodes=" + knownNodes + ", " + votingConfiguration + "]");
} }
@ -729,6 +756,8 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
metaDataBuilder.coordinationMetaData(coordinationMetaData); metaDataBuilder.coordinationMetaData(coordinationMetaData);
coordinationState.get().setInitialState(ClusterState.builder(currentState).metaData(metaDataBuilder).build()); coordinationState.get().setInitialState(ClusterState.builder(currentState).metaData(metaDataBuilder).build());
assert electionQuorumContainsLocalNode(getLastAcceptedState()) :
"initial state does not have local node in its election quorum: " + getLastAcceptedState().coordinationMetaData();
preVoteCollector.update(getPreVoteResponse(), null); // pick up the change to last-accepted version preVoteCollector.update(getPreVoteResponse(), null); // pick up the change to last-accepted version
startElectionScheduler(); startElectionScheduler();
return true; return true;
@ -1022,12 +1051,20 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
public void run() { public void run() {
synchronized (mutex) { synchronized (mutex) {
if (mode == Mode.CANDIDATE) { if (mode == Mode.CANDIDATE) {
final ClusterState lastAcceptedState = coordinationState.get().getLastAcceptedState();
if (electionQuorumContainsLocalNode(lastAcceptedState) == false) {
logger.trace("skip prevoting as local node is not part of election quorum: {}",
lastAcceptedState.coordinationMetaData());
return;
}
if (prevotingRound != null) { if (prevotingRound != null) {
prevotingRound.close(); prevotingRound.close();
} }
final ClusterState lastAcceptedState = coordinationState.get().getLastAcceptedState();
final List<DiscoveryNode> discoveredNodes final List<DiscoveryNode> discoveredNodes
= getDiscoveredNodes().stream().filter(n -> isZen1Node(n) == false).collect(Collectors.toList()); = getDiscoveredNodes().stream().filter(n -> isZen1Node(n) == false).collect(Collectors.toList());
prevotingRound = preVoteCollector.start(lastAcceptedState, discoveredNodes); prevotingRound = preVoteCollector.start(lastAcceptedState, discoveredNodes);
} }
} }

View File

@ -328,6 +328,18 @@ public class ClusterBootstrapServiceTests extends ESTestCase {
deterministicTaskQueue.runAllTasks(); deterministicTaskQueue.runAllTasks();
} }
public void testDoesNotBootstrapsIfLocalNodeNotInInitialMasterNodes() {
ClusterBootstrapService clusterBootstrapService = new ClusterBootstrapService(Settings.builder().putList(
INITIAL_MASTER_NODES_SETTING.getKey(), otherNode1.getName(), otherNode2.getName()).build(),
transportService, () ->
Stream.of(localNode, otherNode1, otherNode2).collect(Collectors.toList()), () -> false, vc -> {
throw new AssertionError("should not be called");
});
transportService.start();
clusterBootstrapService.onFoundPeersUpdated();
deterministicTaskQueue.runAllTasks();
}
public void testDoesNotBootstrapsIfNotConfigured() { public void testDoesNotBootstrapsIfNotConfigured() {
ClusterBootstrapService clusterBootstrapService = new ClusterBootstrapService( ClusterBootstrapService clusterBootstrapService = new ClusterBootstrapService(
Settings.builder().putList(INITIAL_MASTER_NODES_SETTING.getKey()).build(), transportService, Settings.builder().putList(INITIAL_MASTER_NODES_SETTING.getKey()).build(), transportService,

View File

@ -19,7 +19,6 @@
package org.elasticsearch.cluster.coordination; package org.elasticsearch.cluster.coordination;
import com.carrotsearch.randomizedtesting.RandomizedContext; import com.carrotsearch.randomizedtesting.RandomizedContext;
import org.apache.logging.log4j.CloseableThreadContext; import org.apache.logging.log4j.CloseableThreadContext;
import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.Logger;
@ -53,6 +52,7 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.settings.Settings.Builder; import org.elasticsearch.common.settings.Settings.Builder;
import org.elasticsearch.common.transport.TransportAddress; import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.set.Sets;
import org.elasticsearch.discovery.zen.PublishClusterStateStats; import org.elasticsearch.discovery.zen.PublishClusterStateStats;
import org.elasticsearch.discovery.zen.UnicastHostsProvider.HostsResolver; import org.elasticsearch.discovery.zen.UnicastHostsProvider.HostsResolver;
import org.elasticsearch.env.NodeEnvironment; import org.elasticsearch.env.NodeEnvironment;
@ -93,10 +93,10 @@ import static org.elasticsearch.cluster.coordination.ClusterBootstrapService.BOO
import static org.elasticsearch.cluster.coordination.CoordinationStateTests.clusterState; import static org.elasticsearch.cluster.coordination.CoordinationStateTests.clusterState;
import static org.elasticsearch.cluster.coordination.CoordinationStateTests.setValue; import static org.elasticsearch.cluster.coordination.CoordinationStateTests.setValue;
import static org.elasticsearch.cluster.coordination.CoordinationStateTests.value; import static org.elasticsearch.cluster.coordination.CoordinationStateTests.value;
import static org.elasticsearch.cluster.coordination.Coordinator.PUBLISH_TIMEOUT_SETTING;
import static org.elasticsearch.cluster.coordination.Coordinator.Mode.CANDIDATE; import static org.elasticsearch.cluster.coordination.Coordinator.Mode.CANDIDATE;
import static org.elasticsearch.cluster.coordination.Coordinator.Mode.FOLLOWER; import static org.elasticsearch.cluster.coordination.Coordinator.Mode.FOLLOWER;
import static org.elasticsearch.cluster.coordination.Coordinator.Mode.LEADER; import static org.elasticsearch.cluster.coordination.Coordinator.Mode.LEADER;
import static org.elasticsearch.cluster.coordination.Coordinator.PUBLISH_TIMEOUT_SETTING;
import static org.elasticsearch.cluster.coordination.CoordinatorTests.Cluster.DEFAULT_DELAY_VARIABILITY; import static org.elasticsearch.cluster.coordination.CoordinatorTests.Cluster.DEFAULT_DELAY_VARIABILITY;
import static org.elasticsearch.cluster.coordination.ElectionSchedulerFactory.ELECTION_BACK_OFF_TIME_SETTING; import static org.elasticsearch.cluster.coordination.ElectionSchedulerFactory.ELECTION_BACK_OFF_TIME_SETTING;
import static org.elasticsearch.cluster.coordination.ElectionSchedulerFactory.ELECTION_DURATION_SETTING; import static org.elasticsearch.cluster.coordination.ElectionSchedulerFactory.ELECTION_DURATION_SETTING;
@ -117,7 +117,6 @@ import static org.elasticsearch.node.Node.NODE_NAME_SETTING;
import static org.elasticsearch.transport.TransportService.NOOP_TRANSPORT_INTERCEPTOR; import static org.elasticsearch.transport.TransportService.NOOP_TRANSPORT_INTERCEPTOR;
import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.endsWith;
import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan; import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.hamcrest.Matchers.greaterThanOrEqualTo;
@ -745,7 +744,7 @@ public class CoordinatorTests extends ESTestCase {
assertThat(nodeId + " should have found all peers", foundPeers, hasSize(cluster.size())); assertThat(nodeId + " should have found all peers", foundPeers, hasSize(cluster.size()));
} }
final ClusterNode bootstrapNode = cluster.getAnyNode(); final ClusterNode bootstrapNode = cluster.getAnyBootstrappableNode();
bootstrapNode.applyInitialConfiguration(); bootstrapNode.applyInitialConfiguration();
assertTrue(bootstrapNode.getId() + " has been bootstrapped", bootstrapNode.coordinator.isInitialConfigurationSet()); assertTrue(bootstrapNode.getId() + " has been bootstrapped", bootstrapNode.coordinator.isInitialConfigurationSet());
@ -775,13 +774,13 @@ public class CoordinatorTests extends ESTestCase {
public void testCannotSetInitialConfigurationWithoutQuorum() { public void testCannotSetInitialConfigurationWithoutQuorum() {
final Cluster cluster = new Cluster(randomIntBetween(1, 5)); final Cluster cluster = new Cluster(randomIntBetween(1, 5));
final Coordinator coordinator = cluster.getAnyNode().coordinator; final Coordinator coordinator = cluster.getAnyNode().coordinator;
final VotingConfiguration unknownNodeConfiguration = new VotingConfiguration(Collections.singleton("unknown-node")); final VotingConfiguration unknownNodeConfiguration = new VotingConfiguration(
Sets.newHashSet(coordinator.getLocalNode().getId(), "unknown-node"));
final String exceptionMessage = expectThrows(CoordinationStateRejectedException.class, final String exceptionMessage = expectThrows(CoordinationStateRejectedException.class,
() -> coordinator.setInitialConfiguration(unknownNodeConfiguration)).getMessage(); () -> coordinator.setInitialConfiguration(unknownNodeConfiguration)).getMessage();
assertThat(exceptionMessage, assertThat(exceptionMessage,
startsWith("not enough nodes discovered to form a quorum in the initial configuration [knownNodes=[")); startsWith("not enough nodes discovered to form a quorum in the initial configuration [knownNodes=["));
assertThat(exceptionMessage, assertThat(exceptionMessage, containsString("unknown-node"));
endsWith("], VotingConfiguration{unknown-node}]"));
assertThat(exceptionMessage, containsString(coordinator.getLocalNode().toString())); assertThat(exceptionMessage, containsString(coordinator.getLocalNode().toString()));
// This is VERY BAD: setting a _different_ initial configuration. Yet it works if the first attempt will never be a quorum. // This is VERY BAD: setting a _different_ initial configuration. Yet it works if the first attempt will never be a quorum.
@ -789,6 +788,16 @@ public class CoordinatorTests extends ESTestCase {
cluster.stabilise(); cluster.stabilise();
} }
public void testCannotSetInitialConfigurationWithoutLocalNode() {
final Cluster cluster = new Cluster(randomIntBetween(1, 5));
final Coordinator coordinator = cluster.getAnyNode().coordinator;
final VotingConfiguration unknownNodeConfiguration = new VotingConfiguration(Sets.newHashSet("unknown-node"));
final String exceptionMessage = expectThrows(CoordinationStateRejectedException.class,
() -> coordinator.setInitialConfiguration(unknownNodeConfiguration)).getMessage();
assertThat(exceptionMessage,
equalTo("local node is not part of initial configuration"));
}
public void testDiffBasedPublishing() { public void testDiffBasedPublishing() {
final Cluster cluster = new Cluster(randomIntBetween(1, 5)); final Cluster cluster = new Cluster(randomIntBetween(1, 5));
cluster.runRandomly(); cluster.runRandomly();
@ -1331,7 +1340,7 @@ public class CoordinatorTests extends ESTestCase {
assertThat("setting initial configuration may fail with disconnected nodes", disconnectedNodes, empty()); assertThat("setting initial configuration may fail with disconnected nodes", disconnectedNodes, empty());
assertThat("setting initial configuration may fail with blackholed nodes", blackholedNodes, empty()); assertThat("setting initial configuration may fail with blackholed nodes", blackholedNodes, empty());
runFor(defaultMillis(DISCOVERY_FIND_PEERS_INTERVAL_SETTING) * 2, "discovery prior to setting initial configuration"); runFor(defaultMillis(DISCOVERY_FIND_PEERS_INTERVAL_SETTING) * 2, "discovery prior to setting initial configuration");
final ClusterNode bootstrapNode = getAnyMasterEligibleNode(); final ClusterNode bootstrapNode = getAnyBootstrappableNode();
bootstrapNode.applyInitialConfiguration(); bootstrapNode.applyInitialConfiguration();
} else { } else {
logger.info("setting initial configuration not required"); logger.info("setting initial configuration not required");
@ -1402,8 +1411,10 @@ public class CoordinatorTests extends ESTestCase {
return clusterNodes.stream().anyMatch(cn -> cn.getLocalNode().equals(node)); return clusterNodes.stream().anyMatch(cn -> cn.getLocalNode().equals(node));
} }
ClusterNode getAnyMasterEligibleNode() { ClusterNode getAnyBootstrappableNode() {
return randomFrom(clusterNodes.stream().filter(n -> n.getLocalNode().isMasterNode()).collect(Collectors.toList())); return randomFrom(clusterNodes.stream().filter(n -> n.getLocalNode().isMasterNode())
.filter(n -> initialConfiguration.getNodeIds().contains(n.getLocalNode().getId()))
.collect(Collectors.toList()));
} }
ClusterNode getAnyNode() { ClusterNode getAnyNode() {
@ -1737,8 +1748,14 @@ public class CoordinatorTests extends ESTestCase {
Stream.generate(() -> BOOTSTRAP_PLACEHOLDER_PREFIX + UUIDs.randomBase64UUID(random())) Stream.generate(() -> BOOTSTRAP_PLACEHOLDER_PREFIX + UUIDs.randomBase64UUID(random()))
.limit((Math.max(initialConfiguration.getNodeIds().size(), 2) - 1) / 2) .limit((Math.max(initialConfiguration.getNodeIds().size(), 2) - 1) / 2)
.forEach(nodeIdsWithPlaceholders::add); .forEach(nodeIdsWithPlaceholders::add);
final VotingConfiguration configurationWithPlaceholders = new VotingConfiguration(new HashSet<>( final Set<String> nodeIds = new HashSet<>(
randomSubsetOf(initialConfiguration.getNodeIds().size(), nodeIdsWithPlaceholders))); randomSubsetOf(initialConfiguration.getNodeIds().size(), nodeIdsWithPlaceholders));
// initial configuration should not have a place holder for local node
if (initialConfiguration.getNodeIds().contains(localNode.getId()) && nodeIds.contains(localNode.getId()) == false) {
nodeIds.remove(nodeIds.iterator().next());
nodeIds.add(localNode.getId());
}
final VotingConfiguration configurationWithPlaceholders = new VotingConfiguration(nodeIds);
try { try {
coordinator.setInitialConfiguration(configurationWithPlaceholders); coordinator.setInitialConfiguration(configurationWithPlaceholders);
logger.info("successfully set initial configuration to {}", configurationWithPlaceholders); logger.info("successfully set initial configuration to {}", configurationWithPlaceholders);