Add voting-only master node (#43410)

A voting-only master-eligible node is a node that can participate in master elections but will not act
as a master in the cluster. In particular, a voting-only node can help elect another master-eligible
node as master, and can serve as a tiebreaker in elections. High availability (HA) clusters require at
least three master-eligible nodes, so that if one of the three nodes is down, then the remaining two
can still elect a master amongst themselves. This only requires one of the two remaining nodes to
have the capability to act as master, but both need to have voting powers. This means that one of
the three master-eligible nodes can be made voting-only. If this voting-only node is a dedicated
master, a less powerful machine or a smaller heap-size can be chosen for this node. Alternatively, a
voting-only non-dedicated master node can play the role of the third master-eligible node, which
allows running an HA cluster with only two dedicated master nodes.

Closes #14340

Co-authored-by: David Turner <david.turner@elastic.co>
Yannick Welsch 2019-06-25 17:29:30 +02:00
parent 11f41c4e7d
commit 2049f715b3
41 changed files with 2533 additions and 1576 deletions


@ -22,12 +22,14 @@ one of the following:
* an IP address or hostname, to add all matching nodes to the subset.
* a pattern, using `*` wildcards, which adds all nodes to the subset
whose name, address or hostname matches the pattern.
* `master:true`, `data:true`, `ingest:true` or `coordinating_only:true`, which
respectively add to the subset all master-eligible nodes, all data nodes,
all ingest nodes, and all coordinating-only nodes.
* `master:false`, `data:false`, `ingest:false` or `coordinating_only:false`,
which respectively remove from the subset all master-eligible nodes, all data
nodes, all ingest nodes, and all coordinating-only nodes.
* `master:true`, `data:true`, `ingest:true`, `voting_only:true` or
`coordinating_only:true`, which respectively add to the subset all
master-eligible nodes, all data nodes, all ingest nodes, all voting-only
nodes, and all coordinating-only nodes.
* `master:false`, `data:false`, `ingest:false`, `voting_only:false`, or
`coordinating_only:false`, which respectively remove from the subset all
master-eligible nodes, all data nodes, all ingest nodes, all voting-only
nodes and all coordinating-only nodes.
* a pair of patterns, using `*` wildcards, of the form `attrname:attrvalue`,
which adds to the subset all nodes with a custom node attribute whose name
and value match the respective patterns. Custom node attributes are
@ -46,6 +48,9 @@ means that filters such as `master:false` which remove nodes from the chosen
subset are only useful if they come after some other filters. When used on its
own, `master:false` selects no nodes.
NOTE: The `voting_only` role requires the {default-dist} of Elasticsearch and
is not supported in the {oss-dist}.
Here are some examples of the use of node filters with the
<<cluster-nodes-info,Nodes Info>> APIs.
@ -69,6 +74,7 @@ GET /_nodes/10.0.0.*
GET /_nodes/_all,master:false
GET /_nodes/data:true,ingest:true
GET /_nodes/coordinating_only:true
GET /_nodes/master:true,voting_only:false
# Select nodes by custom attribute (e.g. with something like `node.attr.rack: 2` in the configuration file)
GET /_nodes/rack:2
GET /_nodes/ra*:2
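
The ordering rule for role filters described above (filters apply left to right, and a `role:false` filter only removes nodes that are already selected) can be illustrated with a minimal sketch. This is not the `DiscoveryNodes` implementation: `NodeFilterSketch`, its `Node` type, and the `resolve` helper are hypothetical names, and only `_all` and `role:true|false` filters are modelled.

```java
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

final class NodeFilterSketch {
    /** Hypothetical, simplified node: a name plus role names such as "master" or "voting_only". */
    static final class Node {
        final String name;
        final Set<String> roles;

        Node(String name, String... roles) {
            this.name = name;
            this.roles = new LinkedHashSet<>(Arrays.asList(roles));
        }
    }

    /** Applies comma-separated filters left to right; assumption: only "_all" and "role:true|false" exist. */
    static Set<String> resolve(List<Node> nodes, String filters) {
        Set<String> selected = new LinkedHashSet<>();
        for (String filter : filters.split(",")) {
            if (filter.equals("_all")) {
                for (Node node : nodes) {
                    selected.add(node.name);
                }
                continue;
            }
            String[] parts = filter.split(":", 2);
            if (parts.length != 2) {
                throw new IllegalArgumentException("unsupported filter in this sketch: " + filter);
            }
            boolean add = Boolean.parseBoolean(parts[1]);
            for (Node node : nodes) {
                if (node.roles.contains(parts[0])) {
                    // "role:true" adds matching nodes, "role:false" removes them from the current subset
                    if (add) {
                        selected.add(node.name);
                    } else {
                        selected.remove(node.name);
                    }
                }
            }
        }
        return selected;
    }
}
```

Under this model, `master:true,voting_only:false` first selects every master-eligible node and then drops the voting-only ones, while `master:false` on its own starts from an empty subset and therefore selects nothing.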


@ -109,7 +109,8 @@ Will return, for example:
"data": 1,
"coordinating_only": 0,
"master": 1,
"ingest": 1
"ingest": 1,
"voting_only": 0
},
"versions": [
"{version}"
@ -207,6 +208,7 @@ Will return, for example:
// TESTRESPONSE[s/"plugins": \[[^\]]*\]/"plugins": $body.$_path/]
// TESTRESPONSE[s/"network_types": \{[^\}]*\}/"network_types": $body.$_path/]
// TESTRESPONSE[s/"discovery_types": \{[^\}]*\}/"discovery_types": $body.$_path/]
// TESTRESPONSE[s/"count": \{[^\}]*\}/"count": $body.$_path/]
// TESTRESPONSE[s/"packaging_types": \[[^\]]*\]/"packaging_types": $body.$_path/]
// TESTRESPONSE[s/: true|false/: $body.$_path/]
// TESTRESPONSE[s/: (\-)?[0-9]+/: $body.$_path/]
@ -217,7 +219,10 @@ Will return, for example:
// see an exhaustive list anyway.
// 2. Similarly, ignore the contents of `network_types`, `discovery_types`, and
// `packaging_types`.
// 3. All of the numbers and strings on the right hand side of *every* field in
// 3. Ignore the contents of the (nodes) count object, as what's shown here
// depends on the license. Voting-only nodes are e.g. only shown when this
// test runs with a basic license.
// 4. All of the numbers and strings on the right hand side of *every* field in
// the response are ignored. So we're really only asserting things about the
// shape of this response, not the values in it.


@ -85,8 +85,9 @@ creating or deleting an index, tracking which nodes are part of the cluster,
and deciding which shards to allocate to which nodes. It is important for
cluster health to have a stable master node.
Any master-eligible node (all nodes by default) may be elected to become the
master node by the <<modules-discovery,master election process>>.
Any master-eligible node that is not a <<voting-only-node,voting-only node>> may
be elected to become the master node by the <<modules-discovery,master election
process>>.
IMPORTANT: Master nodes must have access to the `data/` directory (just like
`data` nodes) as this is where the cluster state is persisted between node restarts.
@ -135,6 +136,47 @@ cluster.remote.connect: false <4>
<3> Disable the `node.ingest` role (enabled by default).
<4> Disable {ccs} (enabled by default).
[float]
[[voting-only-node]]
==== Voting-only master-eligible node
A voting-only master-eligible node is a node that participates in
<<modules-discovery,master elections>> but which will not act as the cluster's
elected master node. In particular, a voting-only node can serve as a tiebreaker
in elections.
It may seem confusing to use the term "master-eligible" to describe a
voting-only node since such a node is not actually eligible to become the master
at all. This terminology is an unfortunate consequence of history:
master-eligible nodes are those nodes that participate in elections and perform
certain tasks during cluster state publications, and voting-only nodes have the
same responsibilities even if they can never become the elected master.
To configure a master-eligible node as a voting-only node, set the following
setting:
[source,yaml]
-------------------
node.voting_only: true <1>
-------------------
<1> The default for `node.voting_only` is `false`.
IMPORTANT: The `voting_only` role requires the {default-dist} of Elasticsearch
and is not supported in the {oss-dist}. If you use the {oss-dist} and set
`node.voting_only` then the node will fail to start. Also note that only
master-eligible nodes can be marked as voting-only.
High availability (HA) clusters require at least three master-eligible nodes, at
least two of which are not voting-only nodes. Such a cluster will be able to
elect a master node even if one of the nodes fails.
Since voting-only nodes never act as the cluster's elected master, they may
require less heap and a less powerful CPU than the true master nodes.
However, all master-eligible nodes, including voting-only nodes, require
reasonably fast persistent storage and a reliable and low-latency network
connection to the rest of the cluster, since they are on the critical path for
<<cluster-state-publishing,publishing cluster state updates>>.
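
The high-availability rule above (at least three master-eligible nodes, at least two of which are not voting-only) can be checked with a small model. The sketch below is a simplification rather than Elasticsearch code: it assumes an election succeeds when a strict majority of the master-eligible nodes is alive and at least one live node is not voting-only. `HaQuorumSketch` and `MasterEligibleNode` are hypothetical names.

```java
import java.util.List;

final class HaQuorumSketch {
    /** Hypothetical, simplified master-eligible node. */
    static final class MasterEligibleNode {
        final String name;
        final boolean votingOnly;
        final boolean alive;

        MasterEligibleNode(String name, boolean votingOnly, boolean alive) {
            this.name = name;
            this.votingOnly = votingOnly;
            this.alive = alive;
        }
    }

    /**
     * Assumption of this sketch: a master can be elected when a strict majority of the
     * master-eligible nodes is alive and at least one live node can actually act as master.
     */
    static boolean canElectMaster(List<MasterEligibleNode> nodes) {
        long alive = nodes.stream().filter(n -> n.alive).count();
        boolean majority = 2 * alive > nodes.size();
        boolean hasCandidate = nodes.stream().anyMatch(n -> n.alive && !n.votingOnly);
        return majority && hasCandidate;
    }
}
```

With two full master nodes and one voting-only node, losing any single node still leaves a majority that contains a full master. With only one full master and two voting-only nodes, losing the full master leaves a majority that has nobody to elect, which is why two non-voting-only nodes are required.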
[float]
[[data-node]]
=== Data Node


@ -111,6 +111,10 @@ Example response:
"available" : true,
"enabled" : true
},
"voting_only" : {
"available" : true,
"enabled" : true
},
"watcher" : {
"available" : true,
"enabled" : true


@ -122,14 +122,16 @@ public class ClusterFormationFailureHelper {
private final List<TransportAddress> resolvedAddresses;
private final List<DiscoveryNode> foundPeers;
private final long currentTerm;
private final ElectionStrategy electionStrategy;
ClusterFormationState(Settings settings, ClusterState clusterState, List<TransportAddress> resolvedAddresses,
List<DiscoveryNode> foundPeers, long currentTerm) {
List<DiscoveryNode> foundPeers, long currentTerm, ElectionStrategy electionStrategy) {
this.settings = settings;
this.clusterState = clusterState;
this.resolvedAddresses = resolvedAddresses;
this.foundPeers = foundPeers;
this.currentTerm = currentTerm;
this.electionStrategy = electionStrategy;
}
String getDescription() {
@ -188,7 +190,9 @@ public class ClusterFormationFailureHelper {
final VoteCollection voteCollection = new VoteCollection();
foundPeers.forEach(voteCollection::addVote);
final String isQuorumOrNot
= CoordinationState.isElectionQuorum(voteCollection, clusterState) ? "is a quorum" : "is not a quorum";
= electionStrategy.isElectionQuorum(clusterState.nodes().getLocalNode(), currentTerm, clusterState.term(),
clusterState.version(), clusterState.getLastCommittedConfiguration(), clusterState.getLastAcceptedConfiguration(),
voteCollection) ? "is a quorum" : "is not a quorum";
return String.format(Locale.ROOT,
"master not discovered or elected yet, an election requires %s, have discovered %s which %s; %s",


@ -24,13 +24,14 @@ import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.coordination.CoordinationMetaData.VotingConfiguration;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.settings.Settings;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import static org.elasticsearch.cluster.coordination.Coordinator.ZEN1_BWC_TERM;
@ -44,6 +45,8 @@ public class CoordinationState {
private final DiscoveryNode localNode;
private final ElectionStrategy electionStrategy;
// persisted state
private final PersistedState persistedState;
@ -55,11 +58,12 @@ public class CoordinationState {
private VotingConfiguration lastPublishedConfiguration;
private VoteCollection publishVotes;
public CoordinationState(Settings settings, DiscoveryNode localNode, PersistedState persistedState) {
public CoordinationState(DiscoveryNode localNode, PersistedState persistedState, ElectionStrategy electionStrategy) {
this.localNode = localNode;
// persisted state
this.persistedState = persistedState;
this.electionStrategy = electionStrategy;
// transient state
this.joinVotes = new VoteCollection();
@ -106,13 +110,9 @@ public class CoordinationState {
return electionWon;
}
public boolean isElectionQuorum(VoteCollection votes) {
return isElectionQuorum(votes, getLastAcceptedState());
}
static boolean isElectionQuorum(VoteCollection votes, ClusterState lastAcceptedState) {
return votes.isQuorum(lastAcceptedState.getLastCommittedConfiguration())
&& votes.isQuorum(lastAcceptedState.getLastAcceptedConfiguration());
public boolean isElectionQuorum(VoteCollection joinVotes) {
return electionStrategy.isElectionQuorum(localNode, getCurrentTerm(), getLastAcceptedTerm(), getLastAcceptedVersion(),
getLastCommittedConfiguration(), getLastAcceptedConfiguration(), joinVotes);
}
public boolean isPublishQuorum(VoteCollection votes) {
@ -123,6 +123,11 @@ public class CoordinationState {
return joinVotes.containsVoteFor(node);
}
// used for tests
boolean containsJoin(Join join) {
return joinVotes.getJoins().contains(join);
}
public boolean joinVotesHaveQuorumFor(VotingConfiguration votingConfiguration) {
return joinVotes.isQuorum(votingConfiguration);
}
@ -249,7 +254,7 @@ public class CoordinationState {
throw new CoordinationStateRejectedException("rejecting join since this node has not received its initial configuration yet");
}
boolean added = joinVotes.addVote(join.getSourceNode());
boolean added = joinVotes.addJoinVote(join);
boolean prevElectionWon = electionWon;
electionWon = isElectionQuorum(joinVotes);
assert !prevElectionWon || electionWon; // we cannot go from won to not won
@ -503,18 +508,28 @@ public class CoordinationState {
}
/**
* A collection of votes, used to calculate quorums.
* A collection of votes, used to calculate quorums. Optionally records the Joins as well.
*/
public static class VoteCollection {
private final Map<String, DiscoveryNode> nodes;
private final Set<Join> joins;
public boolean addVote(DiscoveryNode sourceNode) {
return nodes.put(sourceNode.getId(), sourceNode) == null;
}
public boolean addJoinVote(Join join) {
final boolean added = addVote(join.getSourceNode());
if (added) {
joins.add(join);
}
return added;
}
public VoteCollection() {
nodes = new HashMap<>();
joins = new HashSet<>();
}
public boolean isQuorum(VotingConfiguration configuration) {
@ -533,24 +548,31 @@ public class CoordinationState {
return Collections.unmodifiableCollection(nodes.values());
}
public Set<Join> getJoins() {
return Collections.unmodifiableSet(joins);
}
@Override
public String toString() {
return "VoteCollection{" + String.join(",", nodes.keySet()) + "}";
return "VoteCollection{votes=" + nodes.keySet() + ", joins=" + joins + "}";
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
if (!(o instanceof VoteCollection)) return false;
VoteCollection that = (VoteCollection) o;
return nodes.equals(that.nodes);
if (!nodes.equals(that.nodes)) return false;
return joins.equals(that.joins);
}
@Override
public int hashCode() {
return nodes.hashCode();
int result = nodes.hashCode();
result = 31 * result + joins.hashCode();
return result;
}
}
}
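
The `addJoinVote` behaviour introduced above (a `Join` is recorded only when its source node has not voted yet) can be seen in isolation in the following sketch. It is a stand-in, not the real `VoteCollection`: nodes are plain `String` IDs, the `Join` type carries only a source ID and a term, and the majority rule in `isQuorum` is an assumption about how a voting configuration is evaluated.

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Objects;
import java.util.Set;

final class VoteCollectionSketch {
    /** Hypothetical, reduced join: just the voter's ID and the term it votes in. */
    static final class Join {
        final String sourceNodeId;
        final long term;

        Join(String sourceNodeId, long term) {
            this.sourceNodeId = sourceNodeId;
            this.term = term;
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) return true;
            if (!(o instanceof Join)) return false;
            Join that = (Join) o;
            return term == that.term && sourceNodeId.equals(that.sourceNodeId);
        }

        @Override
        public int hashCode() {
            return Objects.hash(sourceNodeId, term);
        }
    }

    private final Set<String> nodeIds = new HashSet<>();
    private final Set<Join> joins = new HashSet<>();

    /** Returns true only the first time a given node votes. */
    boolean addVote(String nodeId) {
        return nodeIds.add(nodeId);
    }

    /** Records the join only if the vote itself was new, mirroring the diff above. */
    boolean addJoinVote(Join join) {
        final boolean added = addVote(join.sourceNodeId);
        if (added) {
            joins.add(join);
        }
        return added;
    }

    /** Assumption: a quorum is a strict majority of the IDs in the voting configuration. */
    boolean isQuorum(Set<String> votingConfiguration) {
        long matching = votingConfiguration.stream().filter(nodeIds::contains).count();
        return 2 * matching > votingConfiguration.size();
    }

    Set<Join> getJoins() {
        return Collections.unmodifiableSet(joins);
    }
}
```

Keeping the joins next to the plain votes is what lets a strategy inspect who voted and with which term, not just how many votes arrived.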


@ -34,6 +34,7 @@ import org.elasticsearch.cluster.block.ClusterBlocks;
import org.elasticsearch.cluster.coordination.ClusterFormationFailureHelper.ClusterFormationState;
import org.elasticsearch.cluster.coordination.CoordinationMetaData.VotingConfigExclusion;
import org.elasticsearch.cluster.coordination.CoordinationMetaData.VotingConfiguration;
import org.elasticsearch.cluster.coordination.CoordinationState.VoteCollection;
import org.elasticsearch.cluster.coordination.FollowersChecker.FollowerCheckRequest;
import org.elasticsearch.cluster.coordination.JoinHelper.InitialJoinAccumulator;
import org.elasticsearch.cluster.metadata.MetaData;
@ -105,6 +106,7 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
private final Settings settings;
private final boolean singleNodeDiscovery;
private final ElectionStrategy electionStrategy;
private final TransportService transportService;
private final MasterService masterService;
private final AllocationService allocationService;
@ -156,13 +158,14 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
NamedWriteableRegistry namedWriteableRegistry, AllocationService allocationService, MasterService masterService,
Supplier<CoordinationState.PersistedState> persistedStateSupplier, SeedHostsProvider seedHostsProvider,
ClusterApplier clusterApplier, Collection<BiConsumer<DiscoveryNode, ClusterState>> onJoinValidators, Random random,
Consumer<String> reroute) {
Consumer<String> reroute, ElectionStrategy electionStrategy) {
this.settings = settings;
this.transportService = transportService;
this.masterService = masterService;
this.allocationService = allocationService;
this.onJoinValidators = JoinTaskExecutor.addBuiltInJoinValidators(onJoinValidators);
this.singleNodeDiscovery = DiscoveryModule.SINGLE_NODE_DISCOVERY_TYPE.equals(DiscoveryModule.DISCOVERY_TYPE_SETTING.get(settings));
this.electionStrategy = electionStrategy;
this.joinHelper = new JoinHelper(settings, allocationService, masterService, transportService,
this::getCurrentTerm, this::getStateForMasterService, this::handleJoinRequest, this::joinLeaderInTerm, this.onJoinValidators,
reroute);
@ -174,7 +177,7 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
this.publishTimeout = PUBLISH_TIMEOUT_SETTING.get(settings);
this.random = random;
this.electionSchedulerFactory = new ElectionSchedulerFactory(settings, random, transportService.getThreadPool());
this.preVoteCollector = new PreVoteCollector(transportService, this::startElection, this::updateMaxTermSeen);
this.preVoteCollector = new PreVoteCollector(transportService, this::startElection, this::updateMaxTermSeen, electionStrategy);
configuredHostsResolver = new SeedHostsResolver(nodeName, settings, transportService, seedHostsProvider);
this.peerFinder = new CoordinatorPeerFinder(settings, transportService,
new HandshakingTransportAddressConnector(settings, transportService), configuredHostsResolver);
@ -199,7 +202,7 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
private ClusterFormationState getClusterFormationState() {
return new ClusterFormationState(settings, getStateForMasterService(), peerFinder.getLastResolvedAddresses(),
Stream.concat(Stream.of(getLocalNode()), StreamSupport.stream(peerFinder.getFoundPeers().spliterator(), false))
.collect(Collectors.toList()), getCurrentTerm());
.collect(Collectors.toList()), getCurrentTerm(), electionStrategy);
}
private void onLeaderFailure(Exception e) {
@ -679,7 +682,7 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
protected void doStart() {
synchronized (mutex) {
CoordinationState.PersistedState persistedState = persistedStateSupplier.get();
coordinationState.set(new CoordinationState(settings, getLocalNode(), persistedState));
coordinationState.set(new CoordinationState(getLocalNode(), persistedState, electionStrategy));
peerFinder.setCurrentTerm(getCurrentTerm());
configuredHostsResolver.start();
final ClusterState lastAcceptedState = coordinationState.get().getLastAcceptedState();
@ -1131,11 +1134,10 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
synchronized (mutex) {
final Iterable<DiscoveryNode> foundPeers = getFoundPeers();
if (mode == Mode.CANDIDATE) {
final CoordinationState.VoteCollection expectedVotes = new CoordinationState.VoteCollection();
final VoteCollection expectedVotes = new VoteCollection();
foundPeers.forEach(expectedVotes::addVote);
expectedVotes.addVote(Coordinator.this.getLocalNode());
final ClusterState lastAcceptedState = coordinationState.get().getLastAcceptedState();
final boolean foundQuorum = CoordinationState.isElectionQuorum(expectedVotes, lastAcceptedState);
final boolean foundQuorum = coordinationState.get().isElectionQuorum(expectedVotes);
if (foundQuorum) {
if (electionScheduler == null) {
@ -1338,6 +1340,18 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
final List<DiscoveryNode> masterCandidates = completedNodes().stream()
.filter(DiscoveryNode::isMasterNode)
.filter(node -> nodeMayWinElection(state, node))
.filter(node -> {
// check if master candidate would be able to get an election quorum if we were to
// abdicate to it. Assume that every node that completed the publication can provide
// a vote in that next election and has the latest state.
final long futureElectionTerm = state.term() + 1;
final VoteCollection futureVoteCollection = new VoteCollection();
completedNodes().forEach(completedNode -> futureVoteCollection.addJoinVote(
new Join(completedNode, node, futureElectionTerm, state.term(), state.version())));
return electionStrategy.isElectionQuorum(node, futureElectionTerm,
state.term(), state.version(), state.getLastCommittedConfiguration(),
state.getLastAcceptedConfiguration(), futureVoteCollection);
})
.collect(Collectors.toList());
if (masterCandidates.isEmpty() == false) {
abdicateTo(masterCandidates.get(random.nextInt(masterCandidates.size())));
@ -1378,7 +1392,7 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
}
@Override
protected boolean isPublishQuorum(CoordinationState.VoteCollection votes) {
protected boolean isPublishQuorum(VoteCollection votes) {
assert Thread.holdsLock(mutex) : "Coordinator mutex not held";
return coordinationState.get().isPublishQuorum(votes);
}


@ -0,0 +1,74 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.cluster.coordination;
import org.elasticsearch.cluster.coordination.CoordinationMetaData.VotingConfiguration;
import org.elasticsearch.cluster.coordination.CoordinationState.VoteCollection;
import org.elasticsearch.cluster.node.DiscoveryNode;
/**
* Allows plugging in a custom election strategy, restricting the notion of an election quorum.
* Custom additional quorum restrictions can be defined by implementing the {@link #satisfiesAdditionalQuorumConstraints} method.
*/
public abstract class ElectionStrategy {
public static final ElectionStrategy DEFAULT_INSTANCE = new ElectionStrategy() {
@Override
protected boolean satisfiesAdditionalQuorumConstraints(DiscoveryNode localNode, long localCurrentTerm, long localAcceptedTerm,
long localAcceptedVersion, VotingConfiguration lastCommittedConfiguration,
VotingConfiguration lastAcceptedConfiguration, VoteCollection joinVotes) {
return true;
}
};
protected ElectionStrategy() {
}
/**
* Whether there is an election quorum from the point of view of the given local node under the provided voting configurations
*/
public final boolean isElectionQuorum(DiscoveryNode localNode, long localCurrentTerm, long localAcceptedTerm, long localAcceptedVersion,
VotingConfiguration lastCommittedConfiguration, VotingConfiguration lastAcceptedConfiguration,
VoteCollection joinVotes) {
return joinVotes.isQuorum(lastCommittedConfiguration) &&
joinVotes.isQuorum(lastAcceptedConfiguration) &&
satisfiesAdditionalQuorumConstraints(localNode, localCurrentTerm, localAcceptedTerm, localAcceptedVersion,
lastCommittedConfiguration, lastAcceptedConfiguration, joinVotes);
}
/**
* The extension point to be overridden by plugins. Defines additional constraints on the election quorum.
* @param localNode the local node for the election quorum
* @param localCurrentTerm the current term of the local node
* @param localAcceptedTerm the last accepted term of the local node
* @param localAcceptedVersion the last accepted version of the local node
* @param lastCommittedConfiguration the last committed configuration for the election quorum
* @param lastAcceptedConfiguration the last accepted configuration for the election quorum
* @param joinVotes the votes that were provided so far
* @return true iff the additional quorum constraints are satisfied
*/
protected abstract boolean satisfiesAdditionalQuorumConstraints(DiscoveryNode localNode,
long localCurrentTerm,
long localAcceptedTerm,
long localAcceptedVersion,
VotingConfiguration lastCommittedConfiguration,
VotingConfiguration lastAcceptedConfiguration,
VoteCollection joinVotes);
}
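
The shape of `ElectionStrategy` is a template method: the `final` quorum check always requires a majority in both voting configurations, and subclasses can only add restrictions. A self-contained sketch of that shape follows. It is an illustration under simplifying assumptions: nodes and configurations are sets of `String` IDs, the term and version parameters are omitted, and `ElectionStrategySketch` and `BarredNodesStrategy` are hypothetical names that are not part of this commit.

```java
import java.util.Set;

abstract class ElectionStrategySketch {
    /** Default strategy: no additional constraints beyond the two majorities. */
    static final ElectionStrategySketch DEFAULT = new ElectionStrategySketch() {
        @Override
        protected boolean satisfiesAdditionalQuorumConstraints(String localNodeId, Set<String> votes) {
            return true;
        }
    };

    /** Final on purpose: subclasses may tighten the quorum definition but never loosen it. */
    final boolean isElectionQuorum(String localNodeId, Set<String> lastCommittedConfiguration,
                                   Set<String> lastAcceptedConfiguration, Set<String> votes) {
        return hasMajority(lastCommittedConfiguration, votes)
            && hasMajority(lastAcceptedConfiguration, votes)
            && satisfiesAdditionalQuorumConstraints(localNodeId, votes);
    }

    /** Assumption: a quorum is a strict majority of the configuration's node IDs. */
    private static boolean hasMajority(Set<String> configuration, Set<String> votes) {
        long matching = configuration.stream().filter(votes::contains).count();
        return 2 * matching > configuration.size();
    }

    protected abstract boolean satisfiesAdditionalQuorumConstraints(String localNodeId, Set<String> votes);
}

/** Hypothetical strategy: the listed nodes may vote but can never win an election themselves. */
final class BarredNodesStrategy extends ElectionStrategySketch {
    private final Set<String> barredNodeIds;

    BarredNodesStrategy(Set<String> barredNodeIds) {
        this.barredNodeIds = barredNodeIds;
    }

    @Override
    protected boolean satisfiesAdditionalQuorumConstraints(String localNodeId, Set<String> votes) {
        return !barredNodeIds.contains(localNodeId);
    }
}
```

Because the majority checks live in the `final` method, a misbehaving subclass can at worst prevent elections; it cannot declare a quorum without the two majorities.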


@ -35,13 +35,12 @@ import org.elasticsearch.transport.TransportResponseHandler;
import org.elasticsearch.transport.TransportService;
import java.io.IOException;
import java.util.Set;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.LongConsumer;
import java.util.stream.StreamSupport;
import static org.elasticsearch.cluster.coordination.CoordinationState.isElectionQuorum;
import static org.elasticsearch.common.util.concurrent.ConcurrentCollections.newConcurrentSet;
import static org.elasticsearch.common.util.concurrent.ConcurrentCollections.newConcurrentMap;
public class PreVoteCollector {
@ -52,14 +51,17 @@ public class PreVoteCollector {
private final TransportService transportService;
private final Runnable startElection;
private final LongConsumer updateMaxTermSeen;
private final ElectionStrategy electionStrategy;
// Tuple for simple atomic updates. null until the first call to `update()`.
private volatile Tuple<DiscoveryNode, PreVoteResponse> state; // DiscoveryNode component is null if there is currently no known leader.
PreVoteCollector(final TransportService transportService, final Runnable startElection, final LongConsumer updateMaxTermSeen) {
PreVoteCollector(final TransportService transportService, final Runnable startElection, final LongConsumer updateMaxTermSeen,
final ElectionStrategy electionStrategy) {
this.transportService = transportService;
this.startElection = startElection;
this.updateMaxTermSeen = updateMaxTermSeen;
this.electionStrategy = electionStrategy;
// TODO does this need to be on the generic threadpool or can it use SAME?
transportService.registerRequestHandler(REQUEST_PRE_VOTE_ACTION_NAME, Names.GENERIC, false, false,
@ -129,7 +131,7 @@ public class PreVoteCollector {
}
private class PreVotingRound implements Releasable {
private final Set<DiscoveryNode> preVotesReceived = newConcurrentSet();
private final Map<DiscoveryNode, PreVoteResponse> preVotesReceived = newConcurrentMap();
private final AtomicBoolean electionStarted = new AtomicBoolean();
private final PreVoteRequest preVoteRequest;
private final ClusterState clusterState;
@ -187,11 +189,20 @@ public class PreVoteCollector {
return;
}
preVotesReceived.add(sender);
final VoteCollection voteCollection = new VoteCollection();
preVotesReceived.forEach(voteCollection::addVote);
preVotesReceived.put(sender, response);
if (isElectionQuorum(voteCollection, clusterState) == false) {
// create a fake VoteCollection based on the pre-votes and check if there is an election quorum
final VoteCollection voteCollection = new VoteCollection();
final DiscoveryNode localNode = clusterState.nodes().getLocalNode();
final PreVoteResponse localPreVoteResponse = getPreVoteResponse();
preVotesReceived.forEach((node, preVoteResponse) -> voteCollection.addJoinVote(
new Join(node, localNode, preVoteResponse.getCurrentTerm(),
preVoteResponse.getLastAcceptedTerm(), preVoteResponse.getLastAcceptedVersion())));
if (electionStrategy.isElectionQuorum(clusterState.nodes().getLocalNode(), localPreVoteResponse.getCurrentTerm(),
localPreVoteResponse.getLastAcceptedTerm(), localPreVoteResponse.getLastAcceptedVersion(),
clusterState.getLastCommittedConfiguration(), clusterState.getLastAcceptedConfiguration(), voteCollection) == false) {
logger.debug("{} added {} from {}, no quorum yet", this, response, sender);
return;
}


@ -409,6 +409,11 @@ public class DiscoveryNode implements Writeable, ToXContentFragment {
sb.append('{').append(ephemeralId).append('}');
sb.append('{').append(hostName).append('}');
sb.append('{').append(address).append('}');
if (roles.isEmpty() == false) {
sb.append('{');
roles.stream().map(DiscoveryNodeRole::roleNameAbbreviation).sorted().forEach(sb::append);
sb.append('}');
}
if (!attributes.isEmpty()) {
sb.append(attributes);
}


@ -32,6 +32,7 @@ import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.regex.Regex;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.util.set.Sets;
import java.io.IOException;
import java.util.ArrayList;
@ -378,6 +379,17 @@ public class DiscoveryNodes extends AbstractDiffable<DiscoveryNodes> implements
resolvedNodesIds.removeAll(getCoordinatingOnlyNodes().keys());
}
} else {
for (DiscoveryNode node : this) {
for (DiscoveryNodeRole role : Sets.difference(node.getRoles(), DiscoveryNodeRole.BUILT_IN_ROLES)) {
if (role.roleName().equals(matchAttrName)) {
if (Booleans.parseBoolean(matchAttrValue, true)) {
resolvedNodesIds.add(node.getId());
} else {
resolvedNodesIds.remove(node.getId());
}
}
}
}
for (DiscoveryNode node : this) {
for (Map.Entry<String, String> entry : node.getAttributes().entrySet()) {
String attrName = entry.getKey();


@ -405,6 +405,7 @@ public final class ClusterSettings extends AbstractScopedSettings {
ZenDiscovery.MASTER_ELECTION_WAIT_FOR_JOINS_TIMEOUT_SETTING,
ZenDiscovery.MASTER_ELECTION_IGNORE_NON_MASTER_PINGS_SETTING,
ZenDiscovery.MAX_PENDING_CLUSTER_STATES_SETTING,
DiscoveryModule.ELECTION_STRATEGY_SETTING,
SettingsBasedSeedHostsProvider.DISCOVERY_SEED_HOSTS_SETTING,
SettingsBasedSeedHostsProvider.LEGACY_DISCOVERY_ZEN_PING_UNICAST_HOSTS_SETTING,
SeedHostsResolver.DISCOVERY_SEED_RESOLVER_MAX_CONCURRENT_RESOLVERS_SETTING,


@ -23,6 +23,7 @@ import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.LogManager;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.coordination.Coordinator;
import org.elasticsearch.cluster.coordination.ElectionStrategy;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.RoutingService;
import org.elasticsearch.cluster.routing.allocation.AllocationService;
@ -80,6 +81,11 @@ public class DiscoveryModule {
Setting.listSetting("discovery.seed_providers", Collections.emptyList(), Function.identity(),
Property.NodeScope);
public static final String DEFAULT_ELECTION_STRATEGY = "default";
public static final Setting<String> ELECTION_STRATEGY_SETTING =
new Setting<>("cluster.election.strategy", DEFAULT_ELECTION_STRATEGY, Function.identity(), Property.NodeScope);
private final Discovery discovery;
public DiscoveryModule(Settings settings, ThreadPool threadPool, TransportService transportService,
@ -91,6 +97,8 @@ public class DiscoveryModule {
final Map<String, Supplier<SeedHostsProvider>> hostProviders = new HashMap<>();
hostProviders.put("settings", () -> new SettingsBasedSeedHostsProvider(settings, transportService));
hostProviders.put("file", () -> new FileBasedSeedHostsProvider(configFile));
final Map<String, ElectionStrategy> electionStrategies = new HashMap<>();
electionStrategies.put(DEFAULT_ELECTION_STRATEGY, ElectionStrategy.DEFAULT_INSTANCE);
for (DiscoveryPlugin plugin : plugins) {
plugin.getSeedHostProviders(transportService, networkService).forEach((key, value) -> {
if (hostProviders.put(key, value) != null) {
@ -101,6 +109,11 @@ public class DiscoveryModule {
if (joinValidator != null) {
joinValidators.add(joinValidator);
}
plugin.getElectionStrategies().forEach((key, value) -> {
if (electionStrategies.put(key, value) != null) {
throw new IllegalArgumentException("Cannot register election strategy [" + key + "] twice");
}
});
}
List<String> seedProviderNames = getSeedProviderNames(settings);
@ -131,12 +144,17 @@ public class DiscoveryModule {
return Collections.unmodifiableList(addresses);
};
final ElectionStrategy electionStrategy = electionStrategies.get(ELECTION_STRATEGY_SETTING.get(settings));
if (electionStrategy == null) {
throw new IllegalArgumentException("Unknown election strategy " + ELECTION_STRATEGY_SETTING.get(settings));
}
if (ZEN2_DISCOVERY_TYPE.equals(discoveryType) || SINGLE_NODE_DISCOVERY_TYPE.equals(discoveryType)) {
discovery = new Coordinator(NODE_NAME_SETTING.get(settings),
settings, clusterSettings,
transportService, namedWriteableRegistry, allocationService, masterService,
() -> gatewayMetaState.getPersistedState(settings, (ClusterApplierService) clusterApplier), seedHostsProvider,
clusterApplier, joinValidators, new Random(Randomness.get().nextLong()), routingService::reroute);
clusterApplier, joinValidators, new Random(Randomness.get().nextLong()), routingService::reroute, electionStrategy);
} else if (ZEN_DISCOVERY_TYPE.equals(discoveryType)) {
discovery = new ZenDiscovery(settings, threadPool, transportService, namedWriteableRegistry, masterService, clusterApplier,
clusterSettings, seedHostsProvider, allocationService, joinValidators, gatewayMetaState, routingService::reroute);

@ -20,6 +20,7 @@
package org.elasticsearch.plugins;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.coordination.ElectionStrategy;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.network.NetworkService;
import org.elasticsearch.common.settings.Settings;
@ -85,4 +86,11 @@ public interface DiscoveryPlugin {
* {@link IllegalStateException} if the node and the cluster-state are incompatible.
*/
default BiConsumer<DiscoveryNode,ClusterState> getJoinValidator() { return null; }
/**
* Allows plugging in election strategies (see {@link ElectionStrategy}) that define a customized notion of an election quorum.
*/
default Map<String, ElectionStrategy> getElectionStrategies() {
return Collections.emptyMap();
}
}
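
Note on `getElectionStrategies()`: a "customized notion of an election quorum" can be pictured as an extra constraint layered on top of a plain majority. The sketch below is illustrative only. The class name, the use of plain node-id strings, and the specific constraint (at least one vote from a node that is not voting-only) are assumptions made for this example and are not taken from this commit's strategy implementation.

```java
import java.util.Set;

final class VotingOnlyQuorumSketch {

    // Plain majority: more than half of the voting configuration has voted.
    static boolean isMajority(Set<String> votingConfig, Set<String> votes) {
        final long matching = votes.stream().filter(votingConfig::contains).count();
        return matching * 2 > votingConfig.size();
    }

    // Majority plus a hypothetical extra constraint: at least one of the votes
    // comes from a configured node that is not voting-only.
    static boolean isQuorum(Set<String> votingConfig, Set<String> votes, Set<String> votingOnlyIds) {
        final boolean hasFullMasterVote = votes.stream()
            .anyMatch(id -> votingConfig.contains(id) && votingOnlyIds.contains(id) == false);
        return isMajority(votingConfig, votes) && hasFullMasterVote;
    }
}
```

With two full master nodes and one voting-only node, every majority already contains a full master, so the extra constraint only matters in configurations where voting-only nodes could form a majority on their own.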

@ -25,6 +25,7 @@ import org.elasticsearch.cluster.coordination.ClusterFormationFailureHelper.Clus
import org.elasticsearch.cluster.coordination.CoordinationMetaData.VotingConfiguration;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodeRole;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
@ -44,9 +45,12 @@ import static org.elasticsearch.cluster.coordination.ClusterBootstrapService.INI
import static org.elasticsearch.node.Node.NODE_NAME_SETTING;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.isOneOf;
import static org.hamcrest.Matchers.oneOf;
public class ClusterFormationFailureHelperTests extends ESTestCase {
private static final ElectionStrategy electionStrategy = ElectionStrategy.DEFAULT_INSTANCE;
public void testScheduling() {
final long expectedDelayMillis;
final Settings.Builder settingsBuilder = Settings.builder();
@ -72,7 +76,7 @@ public class ClusterFormationFailureHelperTests extends ESTestCase {
final ClusterFormationFailureHelper clusterFormationFailureHelper = new ClusterFormationFailureHelper(settingsBuilder.build(),
() -> {
warningCount.incrementAndGet();
return new ClusterFormationState(Settings.EMPTY, clusterState, emptyList(), emptyList(), 0L);
return new ClusterFormationState(Settings.EMPTY, clusterState, emptyList(), emptyList(), 0L, electionStrategy);
},
deterministicTaskQueue.getThreadPool(), logLastFailedJoinAttemptWarningCount::incrementAndGet);
@ -140,17 +144,20 @@ public class ClusterFormationFailureHelperTests extends ESTestCase {
final ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT)
.version(12L).nodes(DiscoveryNodes.builder().add(localNode).localNodeId(localNode.getId())).build();
assertThat(new ClusterFormationState(Settings.EMPTY, clusterState, emptyList(), emptyList(), 15L).getDescription(),
assertThat(new ClusterFormationState(Settings.EMPTY, clusterState, emptyList(), emptyList(), 15L, electionStrategy)
.getDescription(),
is("master not discovered yet: have discovered []; discovery will continue using [] from hosts providers " +
"and [] from last-known cluster state; node term 15, last-accepted version 12 in term 0"));
final TransportAddress otherAddress = buildNewFakeTransportAddress();
assertThat(new ClusterFormationState(Settings.EMPTY, clusterState, singletonList(otherAddress), emptyList(), 16L).getDescription(),
assertThat(new ClusterFormationState(Settings.EMPTY, clusterState, singletonList(otherAddress), emptyList(), 16L, electionStrategy)
.getDescription(),
is("master not discovered yet: have discovered []; discovery will continue using [" + otherAddress +
"] from hosts providers and [] from last-known cluster state; node term 16, last-accepted version 12 in term 0"));
final DiscoveryNode otherNode = new DiscoveryNode("other", buildNewFakeTransportAddress(), Version.CURRENT);
assertThat(new ClusterFormationState(Settings.EMPTY, clusterState, emptyList(), singletonList(otherNode), 17L).getDescription(),
assertThat(new ClusterFormationState(Settings.EMPTY, clusterState, emptyList(), singletonList(otherNode), 17L, electionStrategy)
.getDescription(),
is("master not discovered yet: have discovered [" + otherNode + "]; discovery will continue using [] from hosts providers " +
"and [] from last-known cluster state; node term 17, last-accepted version 12 in term 0"));
}
@ -162,28 +169,30 @@ public class ClusterFormationFailureHelperTests extends ESTestCase {
.metaData(MetaData.builder().coordinationMetaData(CoordinationMetaData.builder().term(4L).build()))
.nodes(DiscoveryNodes.builder().add(localNode).localNodeId(localNode.getId())).build();
assertThat(new ClusterFormationState(Settings.EMPTY, clusterState, emptyList(), emptyList(), 1L).getDescription(),
assertThat(new ClusterFormationState(Settings.EMPTY, clusterState, emptyList(), emptyList(), 1L, electionStrategy).getDescription(),
is("master not discovered yet, this node has not previously joined a bootstrapped (v7+) cluster, and " +
"[cluster.initial_master_nodes] is empty on this node: have discovered []; " +
"discovery will continue using [] from hosts providers and [" + localNode +
"] from last-known cluster state; node term 1, last-accepted version 7 in term 4"));
final TransportAddress otherAddress = buildNewFakeTransportAddress();
assertThat(new ClusterFormationState(Settings.EMPTY, clusterState, singletonList(otherAddress), emptyList(), 2L).getDescription(),
assertThat(new ClusterFormationState(Settings.EMPTY, clusterState, singletonList(otherAddress), emptyList(), 2L, electionStrategy)
.getDescription(),
is("master not discovered yet, this node has not previously joined a bootstrapped (v7+) cluster, and " +
"[cluster.initial_master_nodes] is empty on this node: have discovered []; " +
"discovery will continue using [" + otherAddress + "] from hosts providers and [" + localNode +
"] from last-known cluster state; node term 2, last-accepted version 7 in term 4"));
final DiscoveryNode otherNode = new DiscoveryNode("other", buildNewFakeTransportAddress(), Version.CURRENT);
assertThat(new ClusterFormationState(Settings.EMPTY, clusterState, emptyList(), singletonList(otherNode), 3L).getDescription(),
assertThat(new ClusterFormationState(Settings.EMPTY, clusterState, emptyList(), singletonList(otherNode), 3L, electionStrategy)
.getDescription(),
is("master not discovered yet, this node has not previously joined a bootstrapped (v7+) cluster, and " +
"[cluster.initial_master_nodes] is empty on this node: have discovered [" + otherNode + "]; " +
"discovery will continue using [] from hosts providers and [" + localNode +
"] from last-known cluster state; node term 3, last-accepted version 7 in term 4"));
assertThat(new ClusterFormationState(Settings.builder().putList(INITIAL_MASTER_NODES_SETTING.getKey(), "other").build(),
clusterState, emptyList(), emptyList(), 4L).getDescription(),
clusterState, emptyList(), emptyList(), 4L, electionStrategy).getDescription(),
is("master not discovered yet, this node has not previously joined a bootstrapped (v7+) cluster, and " +
"this node must discover master-eligible nodes [other] to bootstrap a cluster: have discovered []; " +
"discovery will continue using [] from hosts providers and [" + localNode +
@ -213,28 +222,31 @@ public class ClusterFormationFailureHelperTests extends ESTestCase {
final ClusterState clusterState = state(localNode,
VotingConfiguration.MUST_JOIN_ELECTED_MASTER.getNodeIds().toArray(new String[0]));
assertThat(new ClusterFormationState(Settings.EMPTY, clusterState, emptyList(), emptyList(), 0L).getDescription(),
assertThat(new ClusterFormationState(Settings.EMPTY, clusterState, emptyList(), emptyList(), 0L, electionStrategy).getDescription(),
is("master not discovered yet and this node was detached from its previous cluster, " +
"have discovered []; " +
"discovery will continue using [] from hosts providers and [" + localNode +
"] from last-known cluster state; node term 0, last-accepted version 0 in term 0"));
final TransportAddress otherAddress = buildNewFakeTransportAddress();
assertThat(new ClusterFormationState(Settings.EMPTY, clusterState, singletonList(otherAddress), emptyList(), 0L).getDescription(),
assertThat(new ClusterFormationState(Settings.EMPTY, clusterState, singletonList(otherAddress), emptyList(), 0L, electionStrategy)
.getDescription(),
is("master not discovered yet and this node was detached from its previous cluster, " +
"have discovered []; " +
"discovery will continue using [" + otherAddress + "] from hosts providers and [" + localNode +
"] from last-known cluster state; node term 0, last-accepted version 0 in term 0"));
final DiscoveryNode otherNode = new DiscoveryNode("otherNode", buildNewFakeTransportAddress(), Version.CURRENT);
assertThat(new ClusterFormationState(Settings.EMPTY, clusterState, emptyList(), singletonList(otherNode), 0L).getDescription(),
assertThat(new ClusterFormationState(Settings.EMPTY, clusterState, emptyList(), singletonList(otherNode), 0L, electionStrategy)
.getDescription(),
is("master not discovered yet and this node was detached from its previous cluster, " +
"have discovered [" + otherNode + "]; " +
"discovery will continue using [] from hosts providers and [" + localNode +
"] from last-known cluster state; node term 0, last-accepted version 0 in term 0"));
final DiscoveryNode yetAnotherNode = new DiscoveryNode("yetAnotherNode", buildNewFakeTransportAddress(), Version.CURRENT);
assertThat(new ClusterFormationState(Settings.EMPTY, clusterState, emptyList(), singletonList(yetAnotherNode), 0L).getDescription(),
assertThat(new ClusterFormationState(Settings.EMPTY, clusterState, emptyList(), singletonList(yetAnotherNode), 0L, electionStrategy)
.getDescription(),
is("master not discovered yet and this node was detached from its previous cluster, " +
"have discovered [" + yetAnotherNode + "]; " +
"discovery will continue using [] from hosts providers and [" + localNode +
@ -247,104 +259,109 @@ public class ClusterFormationFailureHelperTests extends ESTestCase {
final ClusterState clusterState = state(localNode, "otherNode");
assertThat(new ClusterFormationState(Settings.EMPTY, clusterState, emptyList(), emptyList(), 0L).getDescription(),
assertThat(new ClusterFormationState(Settings.EMPTY, clusterState, emptyList(), emptyList(), 0L, electionStrategy).getDescription(),
is("master not discovered or elected yet, an election requires a node with id [otherNode], " +
"have discovered [] which is not a quorum; " +
"discovery will continue using [] from hosts providers and [" + localNode +
"] from last-known cluster state; node term 0, last-accepted version 0 in term 0"));
final TransportAddress otherAddress = buildNewFakeTransportAddress();
assertThat(new ClusterFormationState(Settings.EMPTY, clusterState, singletonList(otherAddress), emptyList(), 0L).getDescription(),
assertThat(new ClusterFormationState(Settings.EMPTY, clusterState, singletonList(otherAddress), emptyList(), 0L, electionStrategy)
.getDescription(),
is("master not discovered or elected yet, an election requires a node with id [otherNode], " +
"have discovered [] which is not a quorum; " +
"discovery will continue using [" + otherAddress + "] from hosts providers and [" + localNode +
"] from last-known cluster state; node term 0, last-accepted version 0 in term 0"));
final DiscoveryNode otherNode = new DiscoveryNode("otherNode", buildNewFakeTransportAddress(), Version.CURRENT);
assertThat(new ClusterFormationState(Settings.EMPTY, clusterState, emptyList(), singletonList(otherNode), 0L).getDescription(),
assertThat(new ClusterFormationState(Settings.EMPTY, clusterState, emptyList(), singletonList(otherNode), 0L, electionStrategy)
.getDescription(),
is("master not discovered or elected yet, an election requires a node with id [otherNode], " +
"have discovered [" + otherNode + "] which is a quorum; " +
"discovery will continue using [] from hosts providers and [" + localNode +
"] from last-known cluster state; node term 0, last-accepted version 0 in term 0"));
final DiscoveryNode yetAnotherNode = new DiscoveryNode("yetAnotherNode", buildNewFakeTransportAddress(), Version.CURRENT);
assertThat(new ClusterFormationState(Settings.EMPTY, clusterState, emptyList(), singletonList(yetAnotherNode), 0L).getDescription(),
assertThat(new ClusterFormationState(Settings.EMPTY, clusterState, emptyList(), singletonList(yetAnotherNode), 0L, electionStrategy)
.getDescription(),
is("master not discovered or elected yet, an election requires a node with id [otherNode], " +
"have discovered [" + yetAnotherNode + "] which is not a quorum; " +
"discovery will continue using [] from hosts providers and [" + localNode +
"] from last-known cluster state; node term 0, last-accepted version 0 in term 0"));
assertThat(new ClusterFormationState(Settings.EMPTY, state(localNode, "n1", "n2"), emptyList(), emptyList(), 0L).getDescription(),
assertThat(new ClusterFormationState(Settings.EMPTY, state(localNode, "n1", "n2"), emptyList(), emptyList(), 0L, electionStrategy)
.getDescription(),
is("master not discovered or elected yet, an election requires two nodes with ids [n1, n2], " +
"have discovered [] which is not a quorum; " +
"discovery will continue using [] from hosts providers and [" + localNode +
"] from last-known cluster state; node term 0, last-accepted version 0 in term 0"));
assertThat(new ClusterFormationState(Settings.EMPTY, state(localNode, "n1", "n2", "n3"), emptyList(), emptyList(), 0L)
.getDescription(),
assertThat(new ClusterFormationState(Settings.EMPTY, state(localNode, "n1", "n2", "n3"), emptyList(), emptyList(), 0L,
electionStrategy).getDescription(),
is("master not discovered or elected yet, an election requires at least 2 nodes with ids from [n1, n2, n3], " +
"have discovered [] which is not a quorum; " +
"discovery will continue using [] from hosts providers and [" + localNode +
"] from last-known cluster state; node term 0, last-accepted version 0 in term 0"));
assertThat(new ClusterFormationState(Settings.EMPTY, state(localNode, "n1", "n2", BOOTSTRAP_PLACEHOLDER_PREFIX + "n3"),
emptyList(), emptyList(), 0L).getDescription(),
emptyList(), emptyList(), 0L, electionStrategy).getDescription(),
is("master not discovered or elected yet, an election requires 2 nodes with ids [n1, n2], " +
"have discovered [] which is not a quorum; " +
"discovery will continue using [] from hosts providers and [" + localNode +
"] from last-known cluster state; node term 0, last-accepted version 0 in term 0"));
assertThat(new ClusterFormationState(Settings.EMPTY, state(localNode, "n1", "n2", "n3", "n4"), emptyList(), emptyList(), 0L)
.getDescription(),
assertThat(new ClusterFormationState(Settings.EMPTY, state(localNode, "n1", "n2", "n3", "n4"), emptyList(), emptyList(), 0L,
electionStrategy).getDescription(),
is("master not discovered or elected yet, an election requires at least 3 nodes with ids from [n1, n2, n3, n4], " +
"have discovered [] which is not a quorum; " +
"discovery will continue using [] from hosts providers and [" + localNode +
"] from last-known cluster state; node term 0, last-accepted version 0 in term 0"));
assertThat(new ClusterFormationState(Settings.EMPTY, state(localNode, "n1", "n2", "n3", "n4", "n5"), emptyList(), emptyList(), 0L)
.getDescription(),
assertThat(new ClusterFormationState(Settings.EMPTY, state(localNode, "n1", "n2", "n3", "n4", "n5"), emptyList(), emptyList(), 0L,
electionStrategy).getDescription(),
is("master not discovered or elected yet, an election requires at least 3 nodes with ids from [n1, n2, n3, n4, n5], " +
"have discovered [] which is not a quorum; " +
"discovery will continue using [] from hosts providers and [" + localNode +
"] from last-known cluster state; node term 0, last-accepted version 0 in term 0"));
assertThat(new ClusterFormationState(Settings.EMPTY, state(localNode, "n1", "n2", "n3", "n4", BOOTSTRAP_PLACEHOLDER_PREFIX + "n5"),
emptyList(), emptyList(), 0L).getDescription(),
emptyList(), emptyList(), 0L, electionStrategy).getDescription(),
is("master not discovered or elected yet, an election requires at least 3 nodes with ids from [n1, n2, n3, n4], " +
"have discovered [] which is not a quorum; " +
"discovery will continue using [] from hosts providers and [" + localNode +
"] from last-known cluster state; node term 0, last-accepted version 0 in term 0"));
assertThat(new ClusterFormationState(Settings.EMPTY, state(localNode, "n1", "n2", "n3",
BOOTSTRAP_PLACEHOLDER_PREFIX + "n4", BOOTSTRAP_PLACEHOLDER_PREFIX + "n5"), emptyList(), emptyList(), 0L).getDescription(),
BOOTSTRAP_PLACEHOLDER_PREFIX + "n4", BOOTSTRAP_PLACEHOLDER_PREFIX + "n5"), emptyList(), emptyList(), 0L, electionStrategy)
.getDescription(),
is("master not discovered or elected yet, an election requires 3 nodes with ids [n1, n2, n3], " +
"have discovered [] which is not a quorum; " +
"discovery will continue using [] from hosts providers and [" + localNode +
"] from last-known cluster state; node term 0, last-accepted version 0 in term 0"));
assertThat(new ClusterFormationState(Settings.EMPTY, state(localNode, new String[]{"n1"}, new String[]{"n1"}), emptyList(),
emptyList(), 0L).getDescription(),
emptyList(), 0L, electionStrategy).getDescription(),
is("master not discovered or elected yet, an election requires a node with id [n1], " +
"have discovered [] which is not a quorum; " +
"discovery will continue using [] from hosts providers and [" + localNode +
"] from last-known cluster state; node term 0, last-accepted version 0 in term 0"));
assertThat(new ClusterFormationState(Settings.EMPTY, state(localNode, new String[]{"n1"}, new String[]{"n2"}), emptyList(),
emptyList(), 0L).getDescription(),
emptyList(), 0L, electionStrategy).getDescription(),
is("master not discovered or elected yet, an election requires a node with id [n1] and a node with id [n2], " +
"have discovered [] which is not a quorum; " +
"discovery will continue using [] from hosts providers and [" + localNode +
"] from last-known cluster state; node term 0, last-accepted version 0 in term 0"));
assertThat(new ClusterFormationState(Settings.EMPTY, state(localNode, new String[]{"n1"}, new String[]{"n2", "n3"}), emptyList(),
emptyList(), 0L).getDescription(),
emptyList(), 0L, electionStrategy).getDescription(),
is("master not discovered or elected yet, an election requires a node with id [n1] and two nodes with ids [n2, n3], " +
"have discovered [] which is not a quorum; " +
"discovery will continue using [] from hosts providers and [" + localNode +
"] from last-known cluster state; node term 0, last-accepted version 0 in term 0"));
assertThat(new ClusterFormationState(Settings.EMPTY, state(localNode, new String[]{"n1"}, new String[]{"n2", "n3", "n4"}),
emptyList(), emptyList(), 0L).getDescription(),
emptyList(), emptyList(), 0L, electionStrategy).getDescription(),
is("master not discovered or elected yet, an election requires a node with id [n1] and " +
"at least 2 nodes with ids from [n2, n3, n4], " +
"have discovered [] which is not a quorum; " +
@ -353,8 +370,8 @@ public class ClusterFormationFailureHelperTests extends ESTestCase {
final DiscoveryNode otherMasterNode = new DiscoveryNode("other-master", buildNewFakeTransportAddress(), Version.CURRENT);
final DiscoveryNode otherNonMasterNode = new DiscoveryNode("other-non-master", buildNewFakeTransportAddress(), emptyMap(),
new HashSet<>(randomSubsetOf(Arrays.stream(DiscoveryNode.Role.values())
.filter(r -> r != DiscoveryNode.Role.MASTER).collect(Collectors.toList()))),
new HashSet<>(randomSubsetOf(DiscoveryNodeRole.BUILT_IN_ROLES).stream()
.filter(r -> r != DiscoveryNodeRole.MASTER_ROLE).collect(Collectors.toList())),
Version.CURRENT);
String[] configNodeIds = new String[]{"n1", "n2"};
@ -364,18 +381,19 @@ public class ClusterFormationFailureHelperTests extends ESTestCase {
.lastAcceptedConfiguration(config(configNodeIds))
.lastCommittedConfiguration(config(configNodeIds)).build())).build();
assertThat(new ClusterFormationState(Settings.EMPTY, stateWithOtherNodes, emptyList(), emptyList(), 0L).getDescription(), isOneOf(
assertThat(
new ClusterFormationState(Settings.EMPTY, stateWithOtherNodes, emptyList(), emptyList(), 0L, electionStrategy).getDescription(),
// nodes from last-known cluster state could be in either order
is(oneOf(
"master not discovered or elected yet, an election requires two nodes with ids [n1, n2], " +
"have discovered [] which is not a quorum; " +
"discovery will continue using [] from hosts providers and [" + localNode + ", " + otherMasterNode +
"] from last-known cluster state; node term 0, last-accepted version 0 in term 0",
"master not discovered or elected yet, an election requires two nodes with ids [n1, n2], " +
"have discovered [] which is not a quorum; " +
"discovery will continue using [] from hosts providers and [" + localNode + ", " + otherMasterNode +
"] from last-known cluster state; node term 0, last-accepted version 0 in term 0",
"master not discovered or elected yet, an election requires two nodes with ids [n1, n2], " +
"have discovered [] which is not a quorum; " +
"discovery will continue using [] from hosts providers and [" + otherMasterNode + ", " + localNode +
"] from last-known cluster state; node term 0, last-accepted version 0 in term 0"));
"master not discovered or elected yet, an election requires two nodes with ids [n1, n2], " +
"have discovered [] which is not a quorum; " +
"discovery will continue using [] from hosts providers and [" + otherMasterNode + ", " + localNode +
"] from last-known cluster state; node term 0, last-accepted version 0 in term 0")));
}
}
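
Note on the expected messages in this test: the node counts they mention follow majority arithmetic, where a configuration of n voting nodes needs floor(n/2) + 1 votes. The sketch below reproduces the wording of the single-configuration cases asserted above. It is a simplified reconstruction from the expected strings, not the helper's actual code. `QuorumDescriptionSketch` and its method names are hypothetical, and the case where there are fewer real ids than the majority size is not modelled.

```java
import java.util.List;

final class QuorumDescriptionSketch {

    // Strict majority of n voting nodes.
    static int majority(int n) {
        return n / 2 + 1;
    }

    // realIds: known node ids in the voting configuration.
    // placeholders: bootstrap placeholder entries, which count towards the
    // configuration size but can never vote.
    static String describe(List<String> realIds, int placeholders) {
        final int total = realIds.size() + placeholders;
        final int quorum = majority(total);
        if (placeholders == 0 && total == 1) {
            return "a node with id " + realIds;
        }
        if (placeholders == 0 && total == 2) {
            return "two nodes with ids " + realIds;
        }
        if (realIds.size() > quorum) {
            // More real nodes than needed: any sufficiently large subset will do.
            return "at least " + quorum + " nodes with ids from " + realIds;
        }
        // Every real node is needed.
        return quorum + " nodes with ids " + realIds;
    }
}
```

For example, `describe(List.of("n1", "n2"), 1)` models the configuration `n1, n2` plus one placeholder: the majority of three is two, and both real nodes are required.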

@ -32,23 +32,17 @@ import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.util.set.Sets;
import org.elasticsearch.node.Node;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.EqualsHashCodeTestUtils;
import org.junit.Before;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import static java.util.stream.Collectors.toSet;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasSize;
public class CoordinationStateTests extends ESTestCase {
@ -240,6 +234,7 @@ public class CoordinationStateTests extends ESTestCase {
assertTrue(cs1.handleJoin(join));
assertTrue(cs1.electionWon());
assertTrue(cs1.containsJoinVoteFor(node1));
assertTrue(cs1.containsJoin(join));
assertFalse(cs1.containsJoinVoteFor(node2));
assertEquals(cs1.getLastPublishedVersion(), cs1.getLastAcceptedVersion());
assertFalse(cs1.handleJoin(join));
@ -322,7 +317,10 @@ public class CoordinationStateTests extends ESTestCase {
Join v2 = cs2.handleStartJoin(startJoinRequest1);
assertTrue(cs1.handleJoin(v1));
assertTrue(cs1.electionWon());
assertTrue(cs1.containsJoin(v1));
assertFalse(cs1.containsJoin(v2));
assertTrue(cs1.handleJoin(v2));
assertTrue(cs1.containsJoin(v2));
VotingConfiguration newConfig = new VotingConfiguration(Collections.singleton(node2.getId()));
@ -766,12 +764,14 @@ public class CoordinationStateTests extends ESTestCase {
}
public void testSafety() {
new Cluster(randomIntBetween(1, 5)).runRandomly();
new CoordinationStateTestCluster(IntStream.range(0, randomIntBetween(1, 5))
.mapToObj(i -> new DiscoveryNode("node_" + i, buildNewFakeTransportAddress(), Version.CURRENT))
.collect(Collectors.toList()), ElectionStrategy.DEFAULT_INSTANCE)
.runRandomly();
}
public static CoordinationState createCoordinationState(PersistedState storage, DiscoveryNode localNode) {
final Settings initialSettings = Settings.builder().put(Node.NODE_NAME_SETTING.getKey(), localNode.getId()).build();
return new CoordinationState(initialSettings, localNode, storage);
return new CoordinationState(localNode, storage, ElectionStrategy.DEFAULT_INSTANCE);
}
public static ClusterState clusterState(long term, long version, DiscoveryNode localNode, VotingConfiguration lastCommittedConfig,
@ -810,181 +810,4 @@ public class CoordinationStateTests extends ESTestCase {
public static long value(ClusterState clusterState) {
return clusterState.metaData().persistentSettings().getAsLong("value", 0L);
}
static class ClusterNode {
final DiscoveryNode localNode;
final PersistedState persistedState;
CoordinationState state;
ClusterNode(DiscoveryNode localNode) {
this.localNode = localNode;
persistedState = new InMemoryPersistedState(0L,
clusterState(0L, 0L, localNode, VotingConfiguration.EMPTY_CONFIG, VotingConfiguration.EMPTY_CONFIG, 0L));
state = new CoordinationState(Settings.EMPTY, localNode, persistedState);
}
void reboot() {
state = new CoordinationState(Settings.EMPTY, localNode, persistedState);
}
void setInitialState(VotingConfiguration initialConfig, long initialValue) {
final ClusterState.Builder builder = ClusterState.builder(state.getLastAcceptedState());
builder.metaData(MetaData.builder()
.coordinationMetaData(CoordinationMetaData.builder()
.lastAcceptedConfiguration(initialConfig)
.lastCommittedConfiguration(initialConfig)
.build()));
state.setInitialState(setValue(builder.build(), initialValue));
}
}
static class Cluster {
final List<Message> messages;
final List<ClusterNode> clusterNodes;
final VotingConfiguration initialConfiguration;
final long initialValue;
Cluster(int numNodes) {
messages = new ArrayList<>();
clusterNodes = IntStream.range(0, numNodes)
.mapToObj(i -> new DiscoveryNode("node_" + i, buildNewFakeTransportAddress(), Version.CURRENT))
.map(ClusterNode::new)
.collect(Collectors.toList());
initialConfiguration = randomVotingConfig();
initialValue = randomLong();
}
static class Message {
final DiscoveryNode sourceNode;
final DiscoveryNode targetNode;
final Object payload;
Message(DiscoveryNode sourceNode, DiscoveryNode targetNode, Object payload) {
this.sourceNode = sourceNode;
this.targetNode = targetNode;
this.payload = payload;
}
}
void reply(Message m, Object payload) {
messages.add(new Message(m.targetNode, m.sourceNode, payload));
}
void broadcast(DiscoveryNode sourceNode, Object payload) {
messages.addAll(clusterNodes.stream().map(cn -> new Message(sourceNode, cn.localNode, payload)).collect(Collectors.toList()));
}
Optional<ClusterNode> getNode(DiscoveryNode node) {
return clusterNodes.stream().filter(cn -> cn.localNode.equals(node)).findFirst();
}
VotingConfiguration randomVotingConfig() {
return new VotingConfiguration(
randomSubsetOf(randomIntBetween(1, clusterNodes.size()), clusterNodes).stream()
.map(cn -> cn.localNode.getId()).collect(toSet()));
}
void applyMessage(Message message) {
final Optional<ClusterNode> maybeNode = getNode(message.targetNode);
if (maybeNode.isPresent() == false) {
throw new CoordinationStateRejectedException("node not available");
} else {
final Object payload = message.payload;
if (payload instanceof StartJoinRequest) {
reply(message, maybeNode.get().state.handleStartJoin((StartJoinRequest) payload));
} else if (payload instanceof Join) {
maybeNode.get().state.handleJoin((Join) payload);
} else if (payload instanceof PublishRequest) {
reply(message, maybeNode.get().state.handlePublishRequest((PublishRequest) payload));
} else if (payload instanceof PublishResponse) {
maybeNode.get().state.handlePublishResponse(message.sourceNode, (PublishResponse) payload)
.ifPresent(ac -> broadcast(message.targetNode, ac));
} else if (payload instanceof ApplyCommitRequest) {
maybeNode.get().state.handleCommit((ApplyCommitRequest) payload);
} else {
throw new AssertionError("unknown message type");
}
}
}
void runRandomly() {
final int iterations = 10000;
final long maxTerm = 4;
long nextTerm = 1;
for (int i = 0; i < iterations; i++) {
try {
if (rarely() && nextTerm < maxTerm) {
final long term = rarely() ? randomLongBetween(0, maxTerm + 1) : nextTerm++;
final StartJoinRequest startJoinRequest = new StartJoinRequest(randomFrom(clusterNodes).localNode, term);
broadcast(startJoinRequest.getSourceNode(), startJoinRequest);
} else if (rarely()) {
randomFrom(clusterNodes).setInitialState(initialConfiguration, initialValue);
} else if (rarely() && rarely()) {
randomFrom(clusterNodes).reboot();
} else if (rarely()) {
final List<ClusterNode> masterNodes = clusterNodes.stream().filter(cn -> cn.state.electionWon())
.collect(Collectors.toList());
if (masterNodes.isEmpty() == false) {
final ClusterNode clusterNode = randomFrom(masterNodes);
final long term = rarely() ? randomLongBetween(0, maxTerm + 1) : clusterNode.state.getCurrentTerm();
final long version = rarely() ? randomIntBetween(0, 5) : clusterNode.state.getLastPublishedVersion() + 1;
final VotingConfiguration acceptedConfig = rarely() ? randomVotingConfig() :
clusterNode.state.getLastAcceptedConfiguration();
final PublishRequest publishRequest = clusterNode.state.handleClientValue(
clusterState(term, version, clusterNode.localNode, clusterNode.state.getLastCommittedConfiguration(),
acceptedConfig, randomLong()));
broadcast(clusterNode.localNode, publishRequest);
}
} else if (messages.isEmpty() == false) {
applyMessage(randomFrom(messages));
}
// check node invariants after each iteration
clusterNodes.forEach(cn -> cn.state.invariant());
} catch (CoordinationStateRejectedException e) {
// ignore
}
}
// check system invariants. It's sufficient to do this at the end as these invariants are monotonic.
invariant();
}
void invariant() {
// one master per term
messages.stream().filter(m -> m.payload instanceof PublishRequest)
.collect(Collectors.groupingBy(m -> ((PublishRequest) m.payload).getAcceptedState().term()))
.forEach((term, publishMessages) -> {
Set<DiscoveryNode> mastersForTerm = publishMessages.stream().collect(Collectors.groupingBy(m -> m.sourceNode)).keySet();
assertThat("Multiple masters " + mastersForTerm + " for term " + term, mastersForTerm, hasSize(1));
});
// unique cluster state per (term, version) pair
messages.stream().filter(m -> m.payload instanceof PublishRequest)
.map(m -> ((PublishRequest) m.payload).getAcceptedState())
.collect(Collectors.groupingBy(ClusterState::term))
.forEach((term, clusterStates) -> {
clusterStates.stream().collect(Collectors.groupingBy(ClusterState::version))
.forEach((version, clusterStates1) -> {
Set<String> clusterStateUUIDsForTermAndVersion = clusterStates1.stream().collect(Collectors.groupingBy(
ClusterState::stateUUID
)).keySet();
assertThat("Multiple cluster states " + clusterStates1 + " for term " + term + " and version " + version,
clusterStateUUIDsForTermAndVersion, hasSize(1));
Set<Long> clusterStateValuesForTermAndVersion = clusterStates1.stream().collect(Collectors.groupingBy(
CoordinationStateTests::value
)).keySet();
assertThat("Multiple cluster states " + clusterStates1 + " for term " + term + " and version " + version,
clusterStateValuesForTermAndVersion, hasSize(1));
});
});
}
}
}
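
Note on the `invariant()` method shown in the hunk above: its "one master per term" check groups publish messages by term and requires a single source node per group. A minimal standalone sketch of that check follows. `OneMasterPerTermCheck` and `Publish` are hypothetical names, and node identities are reduced to strings for illustration.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

final class OneMasterPerTermCheck {

    // Hypothetical minimal view of a publish message: who sent it, and in which term.
    static final class Publish {
        final String sourceNode;
        final long term;

        Publish(String sourceNode, long term) {
            this.sourceNode = sourceNode;
            this.term = term;
        }
    }

    // Returns the terms in which more than one distinct node published.
    // An empty result means the invariant holds.
    static Set<Long> termsWithMultipleMasters(List<Publish> publishes) {
        final Map<Long, Set<String>> mastersByTerm = new HashMap<>();
        for (Publish p : publishes) {
            mastersByTerm.computeIfAbsent(p.term, t -> new HashSet<>()).add(p.sourceNode);
        }
        final Set<Long> violations = new HashSet<>();
        mastersByTerm.forEach((term, masters) -> {
            if (masters.size() > 1) {
                violations.add(term);
            }
        });
        return violations;
    }
}
```

Because the set of publishers for a term can only grow as messages accumulate, a violation never disappears once it exists, so the check can be run once after all messages are collected.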

@ -174,7 +174,7 @@ public class NodeJoinTests extends ESTestCase {
() -> new InMemoryPersistedState(term, initialState), r -> emptyList(),
new NoOpClusterApplier(),
Collections.emptyList(),
random, s -> {});
random, s -> {}, ElectionStrategy.DEFAULT_INSTANCE);
transportService.start();
transportService.acceptIncomingRequests();
transport = capturingTransport;

@ -114,7 +114,7 @@ public class PreVoteCollectorTests extends ESTestCase {
assert electionOccurred == false;
electionOccurred = true;
}, l -> {
}); // TODO need tests that check that the max term seen is updated
}, ElectionStrategy.DEFAULT_INSTANCE); // TODO need tests that check that the max term seen is updated
preVoteCollector.update(getLocalPreVoteResponse(), null);
}
@ -233,8 +233,8 @@ public class PreVoteCollectorTests extends ESTestCase {
DiscoveryNode[] votingNodes = votingNodesSet.toArray(new DiscoveryNode[0]);
startAndRunCollector(votingNodes);
final CoordinationState coordinationState = new CoordinationState(Settings.EMPTY, localNode,
new InMemoryPersistedState(currentTerm, makeClusterState(votingNodes)));
final CoordinationState coordinationState = new CoordinationState(localNode,
new InMemoryPersistedState(currentTerm, makeClusterState(votingNodes)), ElectionStrategy.DEFAULT_INSTANCE);
final long newTerm = randomLongBetween(currentTerm + 1, Long.MAX_VALUE);


@ -72,7 +72,8 @@ public class PublicationTests extends ESTestCase {
this.localNode = localNode;
ClusterState initialState = CoordinationStateTests.clusterState(0L, 0L, localNode,
CoordinationMetaData.VotingConfiguration.EMPTY_CONFIG, CoordinationMetaData.VotingConfiguration.EMPTY_CONFIG, 0L);
coordinationState = new CoordinationState(settings, localNode, new InMemoryPersistedState(0L, initialState));
coordinationState = new CoordinationState(localNode, new InMemoryPersistedState(0L, initialState),
ElectionStrategy.DEFAULT_INSTANCE);
}
final DiscoveryNode localNode;


@ -21,6 +21,7 @@ package org.elasticsearch.cluster.node;
import com.carrotsearch.randomizedtesting.generators.RandomPicks;
import org.elasticsearch.Version;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.test.ESTestCase;
@ -240,8 +241,16 @@ public class DiscoveryNodesTests extends ESTestCase {
if (frequently()) {
attributes.put("custom", randomBoolean() ? "match" : randomAlphaOfLengthBetween(3, 5));
}
final DiscoveryNode node = newNode(idGenerator.getAndIncrement(), attributes,
new HashSet<>(randomSubsetOf(DiscoveryNodeRole.BUILT_IN_ROLES)));
final Set<DiscoveryNodeRole> roles = new HashSet<>(randomSubsetOf(DiscoveryNodeRole.BUILT_IN_ROLES));
if (frequently()) {
roles.add(new DiscoveryNodeRole("custom_role", "cr") {
@Override
protected Setting<Boolean> roleSetting() {
return null;
}
});
}
final DiscoveryNode node = newNode(idGenerator.getAndIncrement(), attributes, roles);
nodesList.add(node);
}
return nodesList;
@ -314,6 +323,17 @@ public class DiscoveryNodesTests extends ESTestCase {
});
return ids;
}
}, CUSTOM_ROLE("custom_role:true") {
@Override
Set<String> matchingNodeIds(DiscoveryNodes nodes) {
Set<String> ids = new HashSet<>();
nodes.getNodes().valuesIt().forEachRemaining(node -> {
if (node.getRoles().stream().anyMatch(role -> role.roleName().equals("custom_role"))) {
ids.add(node.getId());
}
});
return ids;
}
};
private final String selector;


@ -87,6 +87,7 @@ import org.elasticsearch.cluster.coordination.CoordinationState;
import org.elasticsearch.cluster.coordination.Coordinator;
import org.elasticsearch.cluster.coordination.CoordinatorTests;
import org.elasticsearch.cluster.coordination.DeterministicTaskQueue;
import org.elasticsearch.cluster.coordination.ElectionStrategy;
import org.elasticsearch.cluster.coordination.InMemoryPersistedState;
import org.elasticsearch.cluster.coordination.MockSinglePrioritizingExecutor;
import org.elasticsearch.cluster.metadata.AliasValidator;
@ -1247,7 +1248,7 @@ public class SnapshotResiliencyTests extends ESTestCase {
hostsResolver -> testClusterNodes.nodes.values().stream().filter(n -> n.node.isMasterNode())
.map(n -> n.node.getAddress()).collect(Collectors.toList()),
clusterService.getClusterApplierService(), Collections.emptyList(), random(),
new RoutingService(clusterService, allocationService)::reroute);
new RoutingService(clusterService, allocationService)::reroute, ElectionStrategy.DEFAULT_INSTANCE);
masterService.setClusterStatePublisher(coordinator);
coordinator.start();
masterService.start();


@ -0,0 +1,264 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.cluster.coordination;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.settings.Settings;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import static com.carrotsearch.randomizedtesting.RandomizedTest.rarely;
import static java.util.stream.Collectors.toSet;
import static org.apache.lucene.util.LuceneTestCase.random;
import static org.elasticsearch.test.ESTestCase.randomFrom;
import static org.elasticsearch.test.ESTestCase.randomIntBetween;
import static org.elasticsearch.test.ESTestCase.randomLong;
import static org.elasticsearch.test.ESTestCase.randomLongBetween;
import static org.elasticsearch.test.ESTestCase.randomSubsetOf;
import static org.hamcrest.Matchers.hasSize;
import static org.junit.Assert.assertThat;
public class CoordinationStateTestCluster {
public static ClusterState clusterState(long term, long version, DiscoveryNode localNode,
CoordinationMetaData.VotingConfiguration lastCommittedConfig,
CoordinationMetaData.VotingConfiguration lastAcceptedConfig, long value) {
return clusterState(term, version, DiscoveryNodes.builder().add(localNode).localNodeId(localNode.getId()).build(),
lastCommittedConfig, lastAcceptedConfig, value);
}
public static ClusterState clusterState(long term, long version, DiscoveryNodes discoveryNodes,
CoordinationMetaData.VotingConfiguration lastCommittedConfig,
CoordinationMetaData.VotingConfiguration lastAcceptedConfig, long value) {
return setValue(ClusterState.builder(ClusterName.DEFAULT)
.version(version)
.nodes(discoveryNodes)
.metaData(MetaData.builder()
.clusterUUID(UUIDs.randomBase64UUID(random())) // generate cluster UUID deterministically for repeatable tests
.coordinationMetaData(CoordinationMetaData.builder()
.term(term)
.lastCommittedConfiguration(lastCommittedConfig)
.lastAcceptedConfiguration(lastAcceptedConfig)
.build()))
.stateUUID(UUIDs.randomBase64UUID(random())) // generate cluster state UUID deterministically for repeatable tests
.build(), value);
}
public static ClusterState setValue(ClusterState clusterState, long value) {
return ClusterState.builder(clusterState).metaData(
MetaData.builder(clusterState.metaData())
.persistentSettings(Settings.builder()
.put(clusterState.metaData().persistentSettings())
.put("value", value)
.build())
.build())
.build();
}
public static long value(ClusterState clusterState) {
return clusterState.metaData().persistentSettings().getAsLong("value", 0L);
}
static class ClusterNode {
final DiscoveryNode localNode;
final CoordinationState.PersistedState persistedState;
private final ElectionStrategy electionStrategy;
CoordinationState state;
ClusterNode(DiscoveryNode localNode, ElectionStrategy electionStrategy) {
this.localNode = localNode;
persistedState = new InMemoryPersistedState(0L,
clusterState(0L, 0L, localNode, CoordinationMetaData.VotingConfiguration.EMPTY_CONFIG,
CoordinationMetaData.VotingConfiguration.EMPTY_CONFIG, 0L));
this.electionStrategy = electionStrategy;
state = new CoordinationState(localNode, persistedState, electionStrategy);
}
void reboot() {
state = new CoordinationState(localNode, persistedState, electionStrategy);
}
void setInitialState(CoordinationMetaData.VotingConfiguration initialConfig, long initialValue) {
final ClusterState.Builder builder = ClusterState.builder(state.getLastAcceptedState());
builder.metaData(MetaData.builder()
.coordinationMetaData(CoordinationMetaData.builder()
.lastAcceptedConfiguration(initialConfig)
.lastCommittedConfiguration(initialConfig)
.build()));
state.setInitialState(setValue(builder.build(), initialValue));
}
}
final ElectionStrategy electionStrategy;
final List<Message> messages;
final List<ClusterNode> clusterNodes;
final CoordinationMetaData.VotingConfiguration initialConfiguration;
final long initialValue;
CoordinationStateTestCluster(List<DiscoveryNode> nodes, ElectionStrategy electionStrategy) {
this.electionStrategy = electionStrategy;
messages = new ArrayList<>();
clusterNodes = nodes.stream()
.map(node -> new ClusterNode(node, electionStrategy))
.collect(Collectors.toList());
initialConfiguration = randomVotingConfig();
initialValue = randomLong();
}
static class Message {
final DiscoveryNode sourceNode;
final DiscoveryNode targetNode;
final Object payload;
Message(DiscoveryNode sourceNode, DiscoveryNode targetNode, Object payload) {
this.sourceNode = sourceNode;
this.targetNode = targetNode;
this.payload = payload;
}
}
void reply(Message m, Object payload) {
messages.add(new Message(m.targetNode, m.sourceNode, payload));
}
void broadcast(DiscoveryNode sourceNode, Object payload) {
messages.addAll(clusterNodes.stream().map(cn -> new Message(sourceNode, cn.localNode, payload)).collect(Collectors.toList()));
}
Optional<ClusterNode> getNode(DiscoveryNode node) {
return clusterNodes.stream().filter(cn -> cn.localNode.equals(node)).findFirst();
}
CoordinationMetaData.VotingConfiguration randomVotingConfig() {
return new CoordinationMetaData.VotingConfiguration(
randomSubsetOf(randomIntBetween(1, clusterNodes.size()), clusterNodes).stream()
.map(cn -> cn.localNode.getId()).collect(toSet()));
}
void applyMessage(Message message) {
final Optional<ClusterNode> maybeNode = getNode(message.targetNode);
if (maybeNode.isPresent() == false) {
throw new CoordinationStateRejectedException("node not available");
} else {
final Object payload = message.payload;
if (payload instanceof StartJoinRequest) {
reply(message, maybeNode.get().state.handleStartJoin((StartJoinRequest) payload));
} else if (payload instanceof Join) {
maybeNode.get().state.handleJoin((Join) payload);
} else if (payload instanceof PublishRequest) {
reply(message, maybeNode.get().state.handlePublishRequest((PublishRequest) payload));
} else if (payload instanceof PublishResponse) {
maybeNode.get().state.handlePublishResponse(message.sourceNode, (PublishResponse) payload)
.ifPresent(ac -> broadcast(message.targetNode, ac));
} else if (payload instanceof ApplyCommitRequest) {
maybeNode.get().state.handleCommit((ApplyCommitRequest) payload);
} else {
throw new AssertionError("unknown message type");
}
}
}
void runRandomly() {
final int iterations = 10000;
final long maxTerm = 4;
long nextTerm = 1;
for (int i = 0; i < iterations; i++) {
try {
if (rarely() && nextTerm < maxTerm) {
final long term = rarely() ? randomLongBetween(0, maxTerm + 1) : nextTerm++;
final StartJoinRequest startJoinRequest = new StartJoinRequest(randomFrom(clusterNodes).localNode, term);
broadcast(startJoinRequest.getSourceNode(), startJoinRequest);
} else if (rarely()) {
randomFrom(clusterNodes).setInitialState(initialConfiguration, initialValue);
} else if (rarely() && rarely()) {
randomFrom(clusterNodes).reboot();
} else if (rarely()) {
final List<ClusterNode> masterNodes = clusterNodes.stream().filter(cn -> cn.state.electionWon())
.collect(Collectors.toList());
if (masterNodes.isEmpty() == false) {
final ClusterNode clusterNode = randomFrom(masterNodes);
final long term = rarely() ? randomLongBetween(0, maxTerm + 1) : clusterNode.state.getCurrentTerm();
final long version = rarely() ? randomIntBetween(0, 5) : clusterNode.state.getLastPublishedVersion() + 1;
final CoordinationMetaData.VotingConfiguration acceptedConfig = rarely() ? randomVotingConfig() :
clusterNode.state.getLastAcceptedConfiguration();
final PublishRequest publishRequest = clusterNode.state.handleClientValue(
clusterState(term, version, clusterNode.localNode, clusterNode.state.getLastCommittedConfiguration(),
acceptedConfig, randomLong()));
broadcast(clusterNode.localNode, publishRequest);
}
} else if (messages.isEmpty() == false) {
applyMessage(randomFrom(messages));
}
// check node invariants after each iteration
clusterNodes.forEach(cn -> cn.state.invariant());
} catch (CoordinationStateRejectedException e) {
// ignore
}
}
// check system invariants. It's sufficient to do this at the end as these invariants are monotonic.
invariant();
}
void invariant() {
// one master per term
messages.stream().filter(m -> m.payload instanceof PublishRequest)
.collect(Collectors.groupingBy(m -> ((PublishRequest) m.payload).getAcceptedState().term()))
.forEach((term, publishMessages) -> {
Set<DiscoveryNode> mastersForTerm = publishMessages.stream().collect(Collectors.groupingBy(m -> m.sourceNode)).keySet();
assertThat("Multiple masters " + mastersForTerm + " for term " + term, mastersForTerm, hasSize(1));
});
// unique cluster state per (term, version) pair
messages.stream().filter(m -> m.payload instanceof PublishRequest)
.map(m -> ((PublishRequest) m.payload).getAcceptedState())
.collect(Collectors.groupingBy(ClusterState::term))
.forEach((term, clusterStates) -> {
clusterStates.stream().collect(Collectors.groupingBy(ClusterState::version))
.forEach((version, clusterStates1) -> {
Set<String> clusterStateUUIDsForTermAndVersion = clusterStates1.stream().collect(Collectors.groupingBy(
ClusterState::stateUUID
)).keySet();
assertThat("Multiple cluster states " + clusterStates1 + " for term " + term + " and version " + version,
clusterStateUUIDsForTermAndVersion, hasSize(1));
Set<Long> clusterStateValuesForTermAndVersion = clusterStates1.stream().collect(Collectors.groupingBy(
CoordinationStateTestCluster::value
)).keySet();
assertThat("Multiple cluster states " + clusterStates1 + " for term " + term + " and version " + version,
clusterStateValuesForTermAndVersion, hasSize(1));
});
});
}
}


@ -37,8 +37,6 @@ import static org.mockito.Mockito.mock;
* It's not always easy / convenient to construct these dependencies.
* This class constructor takes far fewer dependencies and constructs usable {@link GatewayMetaState} with 2 restrictions:
* no metadata upgrade will be performed and no cluster state updaters will be run. This is sufficient for most of the tests.
* Metadata upgrade is tested in {@link GatewayMetaStateTests} and different {@link ClusterStateUpdaters} in
* {@link ClusterStateUpdatersTests}.
*/
public class MockGatewayMetaState extends GatewayMetaState {
private final DiscoveryNode localNode;


@ -608,6 +608,15 @@ public class XPackLicenseState {
return status.active;
}
/**
* Voting-only node functionality is always available as long as there is an active license
*
* @return true if the license is active
*/
public synchronized boolean isVotingOnlyAllowed() {
return status.active;
}
/**
* Logstash is allowed as long as there is an active license of type TRIAL, STANDARD, GOLD or PLATINUM
* @return {@code true} as long as there is a valid license


@ -206,6 +206,7 @@ import org.elasticsearch.xpack.core.ssl.action.GetCertificateInfoAction;
import org.elasticsearch.xpack.core.upgrade.actions.IndexUpgradeAction;
import org.elasticsearch.xpack.core.upgrade.actions.IndexUpgradeInfoAction;
import org.elasticsearch.xpack.core.vectors.VectorsFeatureSetUsage;
import org.elasticsearch.xpack.core.votingonly.VotingOnlyNodeFeatureSetUsage;
import org.elasticsearch.xpack.core.watcher.WatcherFeatureSetUsage;
import org.elasticsearch.xpack.core.watcher.WatcherMetaData;
import org.elasticsearch.xpack.core.watcher.transport.actions.ack.AckWatchAction;
@ -516,7 +517,9 @@ public class XPackClientPlugin extends Plugin implements ActionPlugin, NetworkPl
new NamedWriteableRegistry.Entry(PersistentTaskState.class, DataFrameField.TASK_NAME, DataFrameTransformState::new),
new NamedWriteableRegistry.Entry(SyncConfig.class, DataFrameField.TIME_BASED_SYNC.getPreferredName(), TimeSyncConfig::new),
// Vectors
new NamedWriteableRegistry.Entry(XPackFeatureSet.Usage.class, XPackField.VECTORS, VectorsFeatureSetUsage::new)
new NamedWriteableRegistry.Entry(XPackFeatureSet.Usage.class, XPackField.VECTORS, VectorsFeatureSetUsage::new),
// Voting Only Node
new NamedWriteableRegistry.Entry(XPackFeatureSet.Usage.class, XPackField.VOTING_ONLY, VotingOnlyNodeFeatureSetUsage::new)
);
}


@ -39,6 +39,8 @@ public final class XPackField {
public static final String DATA_FRAME = "data_frame";
/** Name constant for the vectors feature. */
public static final String VECTORS = "vectors";
/** Name constant for the voting-only-node feature. */
public static final String VOTING_ONLY = "voting_only";
private XPackField() {}


@ -0,0 +1,22 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.core.votingonly;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.xpack.core.XPackFeatureSet;
import org.elasticsearch.xpack.core.XPackField;
import java.io.IOException;
public class VotingOnlyNodeFeatureSetUsage extends XPackFeatureSet.Usage {
public VotingOnlyNodeFeatureSetUsage(StreamInput input) throws IOException {
super(input);
}
public VotingOnlyNodeFeatureSetUsage(boolean available) {
super(XPackField.VOTING_ONLY, available, true);
}
}


@ -14,9 +14,11 @@ import org.elasticsearch.action.support.ActionFilter;
import org.elasticsearch.bootstrap.BootstrapCheck;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.coordination.ElectionStrategy;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.metadata.IndexTemplateMetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodeRole;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Module;
@ -76,10 +78,12 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.function.BiConsumer;
import java.util.function.Function;
import java.util.function.Predicate;
@ -333,6 +337,7 @@ public class LocalStateCompositeXPackPlugin extends XPackPlugin implements Scrip
filterPlugins(Plugin.class).stream().forEach(p -> builders.addAll(p.getExecutorBuilders(settings)));
return builders;
}
@Override
public UnaryOperator<Map<String, IndexTemplateMetaData>> getIndexTemplateMetaDataUpgrader() {
return templates -> {
@ -343,6 +348,20 @@ public class LocalStateCompositeXPackPlugin extends XPackPlugin implements Scrip
};
}
@Override
public Map<String, ElectionStrategy> getElectionStrategies() {
Map<String, ElectionStrategy> electionStrategies = new HashMap<>();
filterPlugins(DiscoveryPlugin.class).stream().forEach(p -> electionStrategies.putAll(p.getElectionStrategies()));
return electionStrategies;
}
@Override
public Set<DiscoveryNodeRole> getRoles() {
Set<DiscoveryNodeRole> roles = new HashSet<>();
filterPlugins(Plugin.class).stream().forEach(p -> roles.addAll(p.getRoles()));
return roles;
}
@Override
public Map<String, AnalysisModule.AnalysisProvider<TokenizerFactory>> getTokenizers() {
Map<String, AnalysisModule.AnalysisProvider<TokenizerFactory>> tokenizers = new HashMap<>();


@ -0,0 +1,16 @@
---
"cluster stats with voting only node stats":
- skip:
version: " - 7.99.99"
reason: "voting only nodes are added in v8.0.0"
- do:
cluster.stats: {}
- gte: { nodes.count.total: 1}
- gte: { nodes.count.master: 1}
- gte: { nodes.count.data: 1}
- gte: { nodes.count.ingest: 0}
- gte: { nodes.count.coordinating_only: 0}
- gte: { nodes.count.voting_only: 0}


@ -0,0 +1,22 @@
evaluationDependsOn(xpackModule('core'))
apply plugin: 'elasticsearch.esplugin'
esplugin {
name 'x-pack-voting-only-node'
description 'Elasticsearch Expanded Pack Plugin - Voting-only node'
classname 'org.elasticsearch.cluster.coordination.VotingOnlyNodePlugin'
extendedPlugins = ['x-pack-core']
}
dependencies {
compileOnly project(path: xpackModule('core'), configuration: 'default')
testCompile project(path: xpackModule('core'), configuration: 'testArtifacts')
if (isEclipse) {
testCompile project(path: xpackModule('core-tests'), configuration: 'testArtifacts')
}
}
// xpack modules are installed in real clusters as the meta plugin, so
// installing them as individual plugins for integ tests doesn't make sense,
// so we disable integ tests
integTest.enabled = false


@ -0,0 +1,51 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.cluster.coordination;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.license.XPackLicenseState;
import org.elasticsearch.xpack.core.XPackFeatureSet;
import org.elasticsearch.xpack.core.XPackField;
import org.elasticsearch.xpack.core.votingonly.VotingOnlyNodeFeatureSetUsage;
import java.util.Map;
public class VotingOnlyNodeFeatureSet implements XPackFeatureSet {
private final XPackLicenseState licenseState;
@Inject
public VotingOnlyNodeFeatureSet(@Nullable XPackLicenseState licenseState) {
this.licenseState = licenseState;
}
@Override
public String name() {
return XPackField.VOTING_ONLY;
}
@Override
public boolean available() {
return licenseState != null && licenseState.isVotingOnlyAllowed();
}
@Override
public boolean enabled() {
return true;
}
@Override
public Map<String, Object> nativeCodeInfo() {
return null;
}
@Override
public void usage(ActionListener<XPackFeatureSet.Usage> listener) {
// licenseState is @Nullable, so reuse the null-safe available() check
listener.onResponse(new VotingOnlyNodeFeatureSetUsage(available()));
}
}


@ -0,0 +1,212 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.cluster.coordination;
import org.apache.lucene.util.SetOnce;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.coordination.CoordinationMetaData.VotingConfiguration;
import org.elasticsearch.cluster.coordination.CoordinationState.VoteCollection;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodeRole;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Module;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.discovery.DiscoveryModule;
import org.elasticsearch.env.Environment;
import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.node.Node;
import org.elasticsearch.plugins.ActionPlugin;
import org.elasticsearch.plugins.DiscoveryPlugin;
import org.elasticsearch.plugins.NetworkPlugin;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.Transport;
import org.elasticsearch.transport.TransportException;
import org.elasticsearch.transport.TransportInterceptor;
import org.elasticsearch.transport.TransportRequest;
import org.elasticsearch.transport.TransportRequestOptions;
import org.elasticsearch.transport.TransportResponse;
import org.elasticsearch.transport.TransportResponseHandler;
import org.elasticsearch.watcher.ResourceWatcherService;
import org.elasticsearch.xpack.core.XPackPlugin;
import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Supplier;
public class VotingOnlyNodePlugin extends Plugin implements DiscoveryPlugin, NetworkPlugin, ActionPlugin {
public static final Setting<Boolean> VOTING_ONLY_NODE_SETTING
= Setting.boolSetting("node.voting_only", false, Setting.Property.NodeScope);
private static final String VOTING_ONLY_ELECTION_STRATEGY = "supports_voting_only";
static final DiscoveryNodeRole VOTING_ONLY_NODE_ROLE = new DiscoveryNodeRole("voting_only", "v") {
@Override
protected Setting<Boolean> roleSetting() {
return VOTING_ONLY_NODE_SETTING;
}
};
private final Settings settings;
private final SetOnce<ThreadPool> threadPool;
private final boolean isVotingOnlyNode;
private final boolean transportClientMode;
public VotingOnlyNodePlugin(Settings settings) {
this.settings = settings;
threadPool = new SetOnce<>();
isVotingOnlyNode = VOTING_ONLY_NODE_SETTING.get(settings);
this.transportClientMode = XPackPlugin.transportClientMode(settings);
}
public static boolean isVotingOnlyNode(DiscoveryNode discoveryNode) {
return discoveryNode.getRoles().contains(VOTING_ONLY_NODE_ROLE);
}
public static boolean isFullMasterNode(DiscoveryNode discoveryNode) {
return discoveryNode.isMasterNode() && discoveryNode.getRoles().contains(VOTING_ONLY_NODE_ROLE) == false;
}
@Override
public List<Setting<?>> getSettings() {
return Collections.singletonList(VOTING_ONLY_NODE_SETTING);
}
@Override
public Set<DiscoveryNodeRole> getRoles() {
if (isVotingOnlyNode && Node.NODE_MASTER_SETTING.get(settings) == false) {
throw new IllegalStateException("voting-only node must be master-eligible");
}
return Collections.singleton(VOTING_ONLY_NODE_ROLE);
}
@Override
public Collection<Object> createComponents(Client client, ClusterService clusterService, ThreadPool threadPool,
ResourceWatcherService resourceWatcherService, ScriptService scriptService,
NamedXContentRegistry xContentRegistry, Environment environment,
NodeEnvironment nodeEnvironment, NamedWriteableRegistry namedWriteableRegistry) {
this.threadPool.set(threadPool);
return Collections.emptyList();
}
@Override
public Collection<Module> createGuiceModules() {
if (transportClientMode) {
return Collections.emptyList();
}
return Collections.singleton(b -> XPackPlugin.bindFeatureSet(b, VotingOnlyNodeFeatureSet.class));
}
@Override
public Map<String, ElectionStrategy> getElectionStrategies() {
return Collections.singletonMap(VOTING_ONLY_ELECTION_STRATEGY, new VotingOnlyNodeElectionStrategy());
}
@Override
public List<TransportInterceptor> getTransportInterceptors(NamedWriteableRegistry namedWriteableRegistry, ThreadContext threadContext) {
if (isVotingOnlyNode) {
return Collections.singletonList(new TransportInterceptor() {
@Override
public AsyncSender interceptSender(AsyncSender sender) {
return new VotingOnlyNodeAsyncSender(sender, threadPool::get);
}
});
} else {
return Collections.emptyList();
}
}
@Override
public Settings additionalSettings() {
return Settings.builder().put(DiscoveryModule.ELECTION_STRATEGY_SETTING.getKey(), VOTING_ONLY_ELECTION_STRATEGY).build();
}
static class VotingOnlyNodeElectionStrategy extends ElectionStrategy {
@Override
public boolean satisfiesAdditionalQuorumConstraints(DiscoveryNode localNode, long localCurrentTerm, long localAcceptedTerm,
long localAcceptedVersion, VotingConfiguration lastCommittedConfiguration,
VotingConfiguration lastAcceptedConfiguration, VoteCollection joinVotes) {
// if local node is voting only, have additional checks on election quorum definition
if (isVotingOnlyNode(localNode)) {
// if all votes are from voting only nodes, do not elect as master (no need to transfer state)
if (joinVotes.nodes().stream().filter(DiscoveryNode::isMasterNode).allMatch(VotingOnlyNodePlugin::isVotingOnlyNode)) {
return false;
}
// if there's a vote from a full master node with same last accepted term and version, that node should become master
// instead, so we should stand down
if (joinVotes.getJoins().stream().anyMatch(join -> isFullMasterNode(join.getSourceNode()) &&
join.getLastAcceptedTerm() == localAcceptedTerm &&
join.getLastAcceptedVersion() == localAcceptedVersion)) {
return false;
}
}
return true;
}
}
static class VotingOnlyNodeAsyncSender implements TransportInterceptor.AsyncSender {
private final TransportInterceptor.AsyncSender sender;
private final Supplier<ThreadPool> threadPoolSupplier;
VotingOnlyNodeAsyncSender(TransportInterceptor.AsyncSender sender, Supplier<ThreadPool> threadPoolSupplier) {
this.sender = sender;
this.threadPoolSupplier = threadPoolSupplier;
}
@Override
public <T extends TransportResponse> void sendRequest(Transport.Connection connection, String action, TransportRequest request,
TransportRequestOptions options, TransportResponseHandler<T> handler) {
if (action.equals(PublicationTransportHandler.PUBLISH_STATE_ACTION_NAME)) {
final DiscoveryNode destinationNode = connection.getNode();
// a voting-only node publishes only to full master nodes, and only to transfer its state to them
if (isFullMasterNode(destinationNode)) {
sender.sendRequest(connection, action, request, options, new TransportResponseHandler<T>() {
@Override
public void handleResponse(TransportResponse response) {
// report success as a failure so that this response never counts towards committing the publication
handler.handleException(new TransportException(new ElasticsearchException(
"ignoring successful publish response used purely for state transfer: " + response)));
}
@Override
public void handleException(TransportException exp) {
handler.handleException(exp);
}
@Override
public String executor() {
return handler.executor();
}
@Override
public T read(StreamInput in) throws IOException {
return handler.read(in);
}
});
} else {
threadPoolSupplier.get().generic().execute(() -> handler.handleException(new TransportException(
new ElasticsearchException("voting-only node skipping publication to " + destinationNode))));
}
} else {
sender.sendRequest(connection, action, request, options, handler);
}
}
}
}
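
The two extra quorum checks in `VotingOnlyNodeElectionStrategy` above can be read in isolation. The following is a minimal, self-contained sketch of that rule, not the plugin's code: `VotingOnlyQuorumSketch` and `Vote` are hypothetical stand-ins for the real `VoteCollection` and `Join` types, and the sketch assumes every vote comes from a master-eligible node.

```java
import java.util.List;

// Hypothetical, simplified model of the voting-only election rule; not the Elasticsearch API.
public class VotingOnlyQuorumSketch {

    // Stand-in for a join vote: whether the sender is voting-only, and the last state it accepted.
    static final class Vote {
        final boolean votingOnly;
        final long lastAcceptedTerm;
        final long lastAcceptedVersion;

        Vote(boolean votingOnly, long lastAcceptedTerm, long lastAcceptedVersion) {
            this.votingOnly = votingOnly;
            this.lastAcceptedTerm = lastAcceptedTerm;
            this.lastAcceptedVersion = lastAcceptedVersion;
        }
    }

    // Returns true if the local node may win the election with the votes collected so far.
    static boolean satisfiesAdditionalConstraints(boolean localIsVotingOnly, long localAcceptedTerm,
                                                  long localAcceptedVersion, List<Vote> votes) {
        if (localIsVotingOnly == false) {
            return true; // full master nodes face no extra constraints
        }
        // Rule 1: if every vote is from a voting-only node, there is nobody to transfer state to.
        if (votes.stream().allMatch(v -> v.votingOnly)) {
            return false;
        }
        // Rule 2: stand down if a full master node is exactly as up to date as the local node.
        return votes.stream().noneMatch(v -> v.votingOnly == false
            && v.lastAcceptedTerm == localAcceptedTerm
            && v.lastAcceptedVersion == localAcceptedVersion);
    }

    public static void main(String[] args) {
        List<Vote> onlyVotingOnly = List.of(new Vote(true, 1, 5));
        List<Vote> staleFullMaster = List.of(new Vote(true, 1, 5), new Vote(false, 1, 4));
        List<Vote> equalFullMaster = List.of(new Vote(true, 1, 5), new Vote(false, 1, 5));

        System.out.println(satisfiesAdditionalConstraints(true, 1, 5, onlyVotingOnly));  // false
        System.out.println(satisfiesAdditionalConstraints(true, 1, 5, staleFullMaster)); // true
        System.out.println(satisfiesAdditionalConstraints(true, 1, 5, equalFullMaster)); // false
        System.out.println(satisfiesAdditionalConstraints(false, 1, 5, onlyVotingOnly)); // true
    }
}
```

In the second case the voting-only node is allowed to win only because the full master node is behind; judging by the messages in `VotingOnlyNodeAsyncSender`, that win appears to be used purely to transfer the newer state to the full master node.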


@ -0,0 +1,20 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.cluster.coordination;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.xpack.core.LocalStateCompositeXPackPlugin;
import java.nio.file.Path;
public class LocalStateVotingOnlyNodePlugin extends LocalStateCompositeXPackPlugin {
public LocalStateVotingOnlyNodePlugin(final Settings settings, final Path configPath) throws Exception {
super(settings, configPath);
plugins.add(new VotingOnlyNodePlugin(settings));
}
}


@ -0,0 +1,30 @@
/*
 * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
 * or more contributor license agreements. Licensed under the Elastic License;
 * you may not use this file except in compliance with the Elastic License.
 */
package org.elasticsearch.cluster.coordination;

import org.elasticsearch.Version;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodeRole;
import org.elasticsearch.common.util.set.Sets;
import org.elasticsearch.test.ESTestCase;

import java.util.Collections;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class VotingOnlyNodeCoordinationStateTests extends ESTestCase {

    public void testSafety() {
        new CoordinationStateTestCluster(IntStream.range(0, randomIntBetween(1, 5))
            .mapToObj(i -> new DiscoveryNode("node_" + i, buildNewFakeTransportAddress(), Collections.emptyMap(),
                randomBoolean() ? DiscoveryNodeRole.BUILT_IN_ROLES :
                    Sets.newHashSet(DiscoveryNodeRole.DATA_ROLE, DiscoveryNodeRole.INGEST_ROLE, DiscoveryNodeRole.MASTER_ROLE,
                        VotingOnlyNodePlugin.VOTING_ONLY_NODE_ROLE), Version.CURRENT))
            .collect(Collectors.toList()), new VotingOnlyNodePlugin.VotingOnlyNodeElectionStrategy())
            .runRandomly();
    }

}


@ -0,0 +1,43 @@
/*
 * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
 * or more contributor license agreements. Licensed under the Elastic License;
 * you may not use this file except in compliance with the Elastic License.
 */
package org.elasticsearch.cluster.coordination;

import org.elasticsearch.Version;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodeRole;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.util.set.Sets;

import java.util.Collections;

import static java.util.Collections.emptySet;

public class VotingOnlyNodeCoordinatorTests extends AbstractCoordinatorTestCase {

    public void testDoesNotElectVotingOnlyMasterNode() {
        final Cluster cluster = new Cluster(randomIntBetween(1, 5), false, Settings.EMPTY);
        cluster.runRandomly();
        cluster.stabilise();

        final Cluster.ClusterNode leader = cluster.getAnyLeader();
        assertTrue(leader.getLocalNode().isMasterNode());
        assertFalse(VotingOnlyNodePlugin.isVotingOnlyNode(leader.getLocalNode()));
    }

    @Override
    protected DiscoveryNode createDiscoveryNode(int nodeIndex, boolean masterEligible) {
        final TransportAddress address = buildNewFakeTransportAddress();
        return new DiscoveryNode("", "node" + nodeIndex,
            UUIDs.randomBase64UUID(random()), // generated deterministically for repeatable tests
            address.address().getHostString(), address.getAddress(), address, Collections.emptyMap(),
            masterEligible ? DiscoveryNodeRole.BUILT_IN_ROLES :
                randomBoolean() ? emptySet() : Sets.newHashSet(DiscoveryNodeRole.DATA_ROLE, DiscoveryNodeRole.INGEST_ROLE,
                    DiscoveryNodeRole.MASTER_ROLE, VotingOnlyNodePlugin.VOTING_ONLY_NODE_ROLE), Version.CURRENT);
    }

}


@ -0,0 +1,93 @@
/*
 * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
 * or more contributor license agreements. Licensed under the Elastic License;
 * you may not use this file except in compliance with the Elastic License.
 */
package org.elasticsearch.cluster.coordination;

import org.elasticsearch.common.Priority;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.discovery.MasterNotDiscoveredException;
import org.elasticsearch.node.Node;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.ESIntegTestCase.Scope;

import java.util.Collection;
import java.util.Collections;

import static org.hamcrest.CoreMatchers.containsString;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.nullValue;

@ESIntegTestCase.ClusterScope(scope = Scope.TEST, numDataNodes = 0, transportClientRatio = 0, autoManageMasterNodes = false)
public class VotingOnlyNodePluginTests extends ESIntegTestCase {

    @Override
    protected Collection<Class<? extends Plugin>> nodePlugins() {
        return Collections.singleton(LocalStateVotingOnlyNodePlugin.class);
    }

    public void testRequireVotingOnlyNodeToBeMasterEligible() {
        internalCluster().setBootstrapMasterNodeIndex(0);
        IllegalStateException ise = expectThrows(IllegalStateException.class, () -> internalCluster().startNode(Settings.builder()
            .put(Node.NODE_MASTER_SETTING.getKey(), false)
            .put(VotingOnlyNodePlugin.VOTING_ONLY_NODE_SETTING.getKey(), true)
            .build()));
        assertThat(ise.getMessage(), containsString("voting-only node must be master-eligible"));
    }

    public void testVotingOnlyNodeStats() throws Exception {
        internalCluster().setBootstrapMasterNodeIndex(0);
        internalCluster().startNodes(2);
        internalCluster().startNode(Settings.builder().put(VotingOnlyNodePlugin.VOTING_ONLY_NODE_SETTING.getKey(), true));
        assertBusy(() -> assertThat(client().admin().cluster().prepareState().get().getState().getLastCommittedConfiguration().getNodeIds(),
            hasSize(3)));
        assertThat(client().admin().cluster().prepareClusterStats().get().getNodesStats().getCounts().getRoles().get(
            VotingOnlyNodePlugin.VOTING_ONLY_NODE_ROLE.roleName()).intValue(), equalTo(1));
        assertThat(client().admin().cluster().prepareNodesStats("voting_only:true").get().getNodes(), hasSize(1));
        assertThat(client().admin().cluster().prepareNodesStats("master:true", "voting_only:false").get().getNodes(), hasSize(2));
    }

    public void testPreferFullMasterOverVotingOnlyNodes() throws Exception {
        internalCluster().setBootstrapMasterNodeIndex(0);
        internalCluster().startNodes(2);
        internalCluster().startNode(Settings.builder().put(VotingOnlyNodePlugin.VOTING_ONLY_NODE_SETTING.getKey(), true).build());
        internalCluster().startDataOnlyNodes(randomInt(2));
        assertBusy(() -> assertThat(
            client().admin().cluster().prepareState().get().getState().getLastCommittedConfiguration().getNodeIds().size(),
            equalTo(3)));
        final String originalMaster = internalCluster().getMasterName();

        internalCluster().stopCurrentMasterNode();
        client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).get();
        assertNotEquals(originalMaster, internalCluster().getMasterName());
        assertThat(
            VotingOnlyNodePlugin.isVotingOnlyNode(client().admin().cluster().prepareState().get().getState().nodes().getMasterNode()),
            equalTo(false));
    }

    public void testVotingOnlyNodesCannotBeMasterWithoutFullMasterNodes() throws Exception {
        internalCluster().setBootstrapMasterNodeIndex(0);
        internalCluster().startNode();
        internalCluster().startNodes(2, Settings.builder().put(VotingOnlyNodePlugin.VOTING_ONLY_NODE_SETTING.getKey(), true).build());
        internalCluster().startDataOnlyNodes(randomInt(2));
        assertBusy(() -> assertThat(
            client().admin().cluster().prepareState().get().getState().getLastCommittedConfiguration().getNodeIds().size(),
            equalTo(3)));
        final String oldMasterId = client().admin().cluster().prepareState().get().getState().nodes().getMasterNodeId();

        internalCluster().stopCurrentMasterNode();

        expectThrows(MasterNotDiscoveredException.class, () ->
            assertThat(client().admin().cluster().prepareState().setMasterNodeTimeout("100ms").execute().actionGet()
                .getState().nodes().getMasterNodeId(), nullValue()));

        // start a fresh full master node, which will be brought into the cluster as master by the voting-only nodes
        final String newMaster = internalCluster().startNode();
        assertEquals(newMaster, internalCluster().getMasterName());
        final String newMasterId = client().admin().cluster().prepareState().get().getState().nodes().getMasterNodeId();
        assertNotEquals(oldMasterId, newMasterId);
    }
}