Handle situation where only voting-only nodes are bootstrapped (#43628)
Adds support for the situation where only voting-only nodes are bootstrapped. In that case, they will still try to become elected and bring full master nodes into the cluster.
This commit is contained in:
parent
f39619d182
commit
6744344ef2
|
@ -257,7 +257,8 @@ public class CoordinationState {
|
|||
boolean added = joinVotes.addJoinVote(join);
|
||||
boolean prevElectionWon = electionWon;
|
||||
electionWon = isElectionQuorum(joinVotes);
|
||||
assert !prevElectionWon || electionWon; // we cannot go from won to not won
|
||||
assert !prevElectionWon || electionWon : // we cannot go from won to not won
|
||||
"locaNode= " + localNode + ", join=" + join + ", joinVotes=" + joinVotes;
|
||||
logger.debug("handleJoin: added join {} from [{}] for election, electionWon={} lastAcceptedTerm={} lastAcceptedVersion={}", join,
|
||||
join.getSourceNode(), electionWon, lastAcceptedTerm, getLastAcceptedVersion());
|
||||
|
||||
|
|
|
@ -67,6 +67,8 @@ import org.elasticsearch.indices.cluster.FakeThreadPoolMasterService;
|
|||
import org.elasticsearch.test.ESTestCase;
|
||||
import org.elasticsearch.test.disruption.DisruptableMockTransport;
|
||||
import org.elasticsearch.test.disruption.DisruptableMockTransport.ConnectionStatus;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.transport.TransportInterceptor;
|
||||
import org.elasticsearch.transport.TransportService;
|
||||
import org.hamcrest.Matcher;
|
||||
import org.hamcrest.core.IsCollectionContaining;
|
||||
|
@ -822,7 +824,8 @@ public class AbstractCoordinatorTestCase extends ESTestCase {
|
|||
.putList(ClusterBootstrapService.INITIAL_MASTER_NODES_SETTING.getKey(),
|
||||
ClusterBootstrapService.INITIAL_MASTER_NODES_SETTING.get(Settings.EMPTY)).build(); // suppress auto-bootstrap
|
||||
transportService = mockTransport.createTransportService(
|
||||
settings, deterministicTaskQueue.getThreadPool(this::onNode), NOOP_TRANSPORT_INTERCEPTOR,
|
||||
settings, deterministicTaskQueue.getThreadPool(this::onNode),
|
||||
getTransportInterceptor(localNode, deterministicTaskQueue.getThreadPool(this::onNode)),
|
||||
a -> localNode, null, emptySet());
|
||||
masterService = new AckedFakeThreadPoolMasterService(localNode.getId(), "test",
|
||||
runnable -> deterministicTaskQueue.scheduleNow(onNode(runnable)));
|
||||
|
@ -839,7 +842,7 @@ public class AbstractCoordinatorTestCase extends ESTestCase {
|
|||
coordinator = new Coordinator("test_node", settings, clusterSettings, transportService, writableRegistry(),
|
||||
allocationService, masterService, this::getPersistedState,
|
||||
Cluster.this::provideSeedHosts, clusterApplierService, onJoinValidators, Randomness.get(), s -> {},
|
||||
ElectionStrategy.DEFAULT_INSTANCE);
|
||||
getElectionStrategy());
|
||||
masterService.setClusterStatePublisher(coordinator);
|
||||
final GatewayService gatewayService = new GatewayService(settings, allocationService, clusterService,
|
||||
deterministicTaskQueue.getThreadPool(this::onNode), null, coordinator);
|
||||
|
@ -1099,6 +1102,14 @@ public class AbstractCoordinatorTestCase extends ESTestCase {
|
|||
}
|
||||
}
|
||||
|
||||
protected TransportInterceptor getTransportInterceptor(DiscoveryNode localNode, ThreadPool threadPool) {
|
||||
return NOOP_TRANSPORT_INTERCEPTOR;
|
||||
}
|
||||
|
||||
protected ElectionStrategy getElectionStrategy() {
|
||||
return ElectionStrategy.DEFAULT_INSTANCE;
|
||||
}
|
||||
|
||||
public static final String NODE_ID_LOG_CONTEXT_KEY = "nodeId";
|
||||
|
||||
protected static String getNodeIdForLogContext(DiscoveryNode node) {
|
||||
|
|
|
@ -46,6 +46,7 @@ import java.util.Collections;
|
|||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.function.Predicate;
|
||||
import java.util.function.Supplier;
|
||||
|
||||
public class VotingOnlyNodePlugin extends Plugin implements DiscoveryPlugin, NetworkPlugin, ActionPlugin {
|
||||
|
@ -151,16 +152,35 @@ public class VotingOnlyNodePlugin extends Plugin implements DiscoveryPlugin, Net
|
|||
if (joinVotes.nodes().stream().filter(DiscoveryNode::isMasterNode).allMatch(VotingOnlyNodePlugin::isVotingOnlyNode)) {
|
||||
return false;
|
||||
}
|
||||
// if there's a vote from a full master node with same last accepted term and version, that node should become master
|
||||
// instead, so we should stand down
|
||||
if (joinVotes.getJoins().stream().anyMatch(join -> isFullMasterNode(join.getSourceNode()) &&
|
||||
join.getLastAcceptedTerm() == localAcceptedTerm &&
|
||||
join.getLastAcceptedVersion() == localAcceptedVersion)) {
|
||||
// if there's a vote from a full master node with same state (i.e. last accepted term and version match), then that node
|
||||
// should become master instead, so we should stand down. There are two exceptional cases, however:
|
||||
// 1) if we are in term 0. In that case, we allow electing the voting-only node to avoid poisonous situations where only
|
||||
// voting-only nodes are bootstrapped.
|
||||
// 2) if there is another full master node with an older state. In that case, we ensure that
|
||||
// satisfiesAdditionalQuorumConstraints cannot go from true to false when adding new joinVotes in the same election.
|
||||
// As voting-only nodes only broadcast the state to the full master nodes, eventually all of them will have caught up
|
||||
// and there should not be any remaining full master nodes with older state, effectively disabling election of
|
||||
// voting-only nodes.
|
||||
if (joinVotes.getJoins().stream().anyMatch(fullMasterWithSameState(localAcceptedTerm, localAcceptedVersion)) &&
|
||||
localAcceptedTerm > 0 &&
|
||||
joinVotes.getJoins().stream().noneMatch(fullMasterWithOlderState(localAcceptedTerm, localAcceptedVersion))) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
private static Predicate<Join> fullMasterWithSameState(long localAcceptedTerm, long localAcceptedVersion) {
|
||||
return join -> isFullMasterNode(join.getSourceNode()) &&
|
||||
join.getLastAcceptedTerm() == localAcceptedTerm &&
|
||||
join.getLastAcceptedVersion() == localAcceptedVersion;
|
||||
}
|
||||
|
||||
private static Predicate<Join> fullMasterWithOlderState(long localAcceptedTerm, long localAcceptedVersion) {
|
||||
return join -> isFullMasterNode(join.getSourceNode()) &&
|
||||
(join.getLastAcceptedTerm() < localAcceptedTerm ||
|
||||
(join.getLastAcceptedTerm() == localAcceptedTerm && join.getLastAcceptedVersion() < localAcceptedVersion));
|
||||
}
|
||||
}
|
||||
|
||||
static class VotingOnlyNodeAsyncSender implements TransportInterceptor.AsyncSender {
|
||||
|
|
|
@ -12,6 +12,9 @@ import org.elasticsearch.common.UUIDs;
|
|||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.transport.TransportAddress;
|
||||
import org.elasticsearch.common.util.set.Sets;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.transport.TransportInterceptor;
|
||||
import org.junit.BeforeClass;
|
||||
|
||||
import java.util.Collections;
|
||||
|
||||
|
@ -19,6 +22,31 @@ import static java.util.Collections.emptySet;
|
|||
|
||||
public class VotingOnlyNodeCoordinatorTests extends AbstractCoordinatorTestCase {
|
||||
|
||||
@BeforeClass
|
||||
public static void setPossibleRolesWithVotingOnly() {
|
||||
DiscoveryNode.setPossibleRoles(
|
||||
Sets.union(DiscoveryNodeRole.BUILT_IN_ROLES, Sets.newHashSet(VotingOnlyNodePlugin.VOTING_ONLY_NODE_ROLE)));
|
||||
}
|
||||
|
||||
@Override
|
||||
protected TransportInterceptor getTransportInterceptor(DiscoveryNode localNode, ThreadPool threadPool) {
|
||||
if (VotingOnlyNodePlugin.isVotingOnlyNode(localNode)) {
|
||||
return new TransportInterceptor() {
|
||||
@Override
|
||||
public AsyncSender interceptSender(AsyncSender sender) {
|
||||
return new VotingOnlyNodePlugin.VotingOnlyNodeAsyncSender(sender, () -> threadPool);
|
||||
}
|
||||
};
|
||||
} else {
|
||||
return super.getTransportInterceptor(localNode, threadPool);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected ElectionStrategy getElectionStrategy() {
|
||||
return new VotingOnlyNodePlugin.VotingOnlyNodeElectionStrategy();
|
||||
}
|
||||
|
||||
public void testDoesNotElectVotingOnlyMasterNode() {
|
||||
final Cluster cluster = new Cluster(randomIntBetween(1, 5), false, Settings.EMPTY);
|
||||
cluster.runRandomly();
|
||||
|
@ -26,7 +54,7 @@ public class VotingOnlyNodeCoordinatorTests extends AbstractCoordinatorTestCase
|
|||
|
||||
final Cluster.ClusterNode leader = cluster.getAnyLeader();
|
||||
assertTrue(leader.getLocalNode().isMasterNode());
|
||||
assertFalse(VotingOnlyNodePlugin.isVotingOnlyNode(leader.getLocalNode()));
|
||||
assertFalse(leader.getLocalNode().toString(), VotingOnlyNodePlugin.isVotingOnlyNode(leader.getLocalNode()));
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -68,6 +68,18 @@ public class VotingOnlyNodePluginTests extends ESIntegTestCase {
|
|||
equalTo(false));
|
||||
}
|
||||
|
||||
public void testBootstrapOnlyVotingOnlyNodes() throws Exception {
|
||||
internalCluster().setBootstrapMasterNodeIndex(0);
|
||||
internalCluster().startNodes(Settings.builder().put(VotingOnlyNodePlugin.VOTING_ONLY_NODE_SETTING.getKey(), true).build(),
|
||||
Settings.EMPTY, Settings.EMPTY);
|
||||
assertBusy(() -> assertThat(
|
||||
client().admin().cluster().prepareState().get().getState().getLastCommittedConfiguration().getNodeIds().size(),
|
||||
equalTo(3)));
|
||||
assertThat(
|
||||
VotingOnlyNodePlugin.isVotingOnlyNode(client().admin().cluster().prepareState().get().getState().nodes().getMasterNode()),
|
||||
equalTo(false));
|
||||
}
|
||||
|
||||
public void testVotingOnlyNodesCannotBeMasterWithoutFullMasterNodes() throws Exception {
|
||||
internalCluster().setBootstrapMasterNodeIndex(0);
|
||||
internalCluster().startNode();
|
||||
|
|
Loading…
Reference in New Issue