Step down as master when configured out of voting configuration (#37802)
Abdicates to another master-eligible node once the active master is reconfigured out of the voting configuration, for example through the use of voting configuration exclusions. Follow-up to #37712
This commit is contained in:
parent
827c4f6567
commit
504a89feaf
|
@ -72,7 +72,10 @@ The node that should be added to the exclusions list is specified using
|
||||||
<<cluster-nodes,node filters>> in place of `node_name` here. If a call to the
|
<<cluster-nodes,node filters>> in place of `node_name` here. If a call to the
|
||||||
voting configuration exclusions API fails, you can safely retry it. Only a
|
voting configuration exclusions API fails, you can safely retry it. Only a
|
||||||
successful response guarantees that the node has actually been removed from the
|
successful response guarantees that the node has actually been removed from the
|
||||||
voting configuration and will not be reinstated.
|
voting configuration and will not be reinstated. If it's the active master that
|
||||||
|
was removed from the voting configuration, then it will abdicate to another
|
||||||
|
master-eligible node that's still in the voting configuration, if such a node
|
||||||
|
is available.
|
||||||
|
|
||||||
Although the voting configuration exclusions API is most useful for down-scaling
|
Although the voting configuration exclusions API is most useful for down-scaling
|
||||||
a two-node to a one-node cluster, it is also possible to use it to remove
|
a two-node to a one-node cluster, it is also possible to use it to remove
|
||||||
|
|
|
@ -112,6 +112,7 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
|
||||||
|
|
||||||
private final PeerFinder peerFinder;
|
private final PeerFinder peerFinder;
|
||||||
private final PreVoteCollector preVoteCollector;
|
private final PreVoteCollector preVoteCollector;
|
||||||
|
private final Random random;
|
||||||
private final ElectionSchedulerFactory electionSchedulerFactory;
|
private final ElectionSchedulerFactory electionSchedulerFactory;
|
||||||
private final UnicastConfiguredHostsResolver configuredHostsResolver;
|
private final UnicastConfiguredHostsResolver configuredHostsResolver;
|
||||||
private final TimeValue publishTimeout;
|
private final TimeValue publishTimeout;
|
||||||
|
@ -153,6 +154,7 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
|
||||||
this.lastJoin = Optional.empty();
|
this.lastJoin = Optional.empty();
|
||||||
this.joinAccumulator = new InitialJoinAccumulator();
|
this.joinAccumulator = new InitialJoinAccumulator();
|
||||||
this.publishTimeout = PUBLISH_TIMEOUT_SETTING.get(settings);
|
this.publishTimeout = PUBLISH_TIMEOUT_SETTING.get(settings);
|
||||||
|
this.random = random;
|
||||||
this.electionSchedulerFactory = new ElectionSchedulerFactory(settings, random, transportService.getThreadPool());
|
this.electionSchedulerFactory = new ElectionSchedulerFactory(settings, random, transportService.getThreadPool());
|
||||||
this.preVoteCollector = new PreVoteCollector(transportService, this::startElection, this::updateMaxTermSeen);
|
this.preVoteCollector = new PreVoteCollector(transportService, this::startElection, this::updateMaxTermSeen);
|
||||||
configuredHostsResolver = new UnicastConfiguredHostsResolver(nodeName, settings, transportService, unicastHostsProvider);
|
configuredHostsResolver = new UnicastConfiguredHostsResolver(nodeName, settings, transportService, unicastHostsProvider);
|
||||||
|
@ -366,11 +368,33 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private void abdicateTo(DiscoveryNode newMaster) {
|
||||||
|
assert Thread.holdsLock(mutex);
|
||||||
|
assert mode == Mode.LEADER : "expected to be leader on abdication but was " + mode;
|
||||||
|
assert newMaster.isMasterNode() : "should only abdicate to master-eligible node but was " + newMaster;
|
||||||
|
final StartJoinRequest startJoinRequest = new StartJoinRequest(newMaster, Math.max(getCurrentTerm(), maxTermSeen) + 1);
|
||||||
|
logger.info("abdicating to {} with term {}", newMaster, startJoinRequest.getTerm());
|
||||||
|
getLastAcceptedState().nodes().mastersFirstStream().forEach(node -> {
|
||||||
|
if (isZen1Node(node) == false) {
|
||||||
|
joinHelper.sendStartJoinRequest(startJoinRequest, node);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
// handling of start join messages on the local node will be dispatched to the generic thread-pool
|
||||||
|
assert mode == Mode.LEADER : "should still be leader after sending abdication messages " + mode;
|
||||||
|
// explicitly move node to candidate state so that the next cluster state update task yields an onNoLongerMaster event
|
||||||
|
becomeCandidate("after abdicating to " + newMaster);
|
||||||
|
}
|
||||||
|
|
||||||
private static boolean electionQuorumContainsLocalNode(ClusterState lastAcceptedState) {
|
private static boolean electionQuorumContainsLocalNode(ClusterState lastAcceptedState) {
|
||||||
final String localNodeId = lastAcceptedState.nodes().getLocalNodeId();
|
final DiscoveryNode localNode = lastAcceptedState.nodes().getLocalNode();
|
||||||
assert localNodeId != null;
|
assert localNode != null;
|
||||||
return lastAcceptedState.getLastCommittedConfiguration().getNodeIds().contains(localNodeId)
|
return electionQuorumContains(lastAcceptedState, localNode);
|
||||||
|| lastAcceptedState.getLastAcceptedConfiguration().getNodeIds().contains(localNodeId);
|
}
|
||||||
|
|
||||||
|
private static boolean electionQuorumContains(ClusterState lastAcceptedState, DiscoveryNode node) {
|
||||||
|
final String nodeId = node.getId();
|
||||||
|
return lastAcceptedState.getLastCommittedConfiguration().getNodeIds().contains(nodeId)
|
||||||
|
|| lastAcceptedState.getLastAcceptedConfiguration().getNodeIds().contains(nodeId);
|
||||||
}
|
}
|
||||||
|
|
||||||
private Optional<Join> ensureTermAtLeast(DiscoveryNode sourceNode, long targetTerm) {
|
private Optional<Join> ensureTermAtLeast(DiscoveryNode sourceNode, long targetTerm) {
|
||||||
|
@ -780,7 +804,7 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
|
||||||
.filter(this::hasJoinVoteFrom).filter(discoveryNode -> isZen1Node(discoveryNode) == false).collect(Collectors.toSet());
|
.filter(this::hasJoinVoteFrom).filter(discoveryNode -> isZen1Node(discoveryNode) == false).collect(Collectors.toSet());
|
||||||
final VotingConfiguration newConfig = reconfigurator.reconfigure(liveNodes,
|
final VotingConfiguration newConfig = reconfigurator.reconfigure(liveNodes,
|
||||||
clusterState.getVotingConfigExclusions().stream().map(VotingConfigExclusion::getNodeId).collect(Collectors.toSet()),
|
clusterState.getVotingConfigExclusions().stream().map(VotingConfigExclusion::getNodeId).collect(Collectors.toSet()),
|
||||||
clusterState.getLastAcceptedConfiguration());
|
getLocalNode(), clusterState.getLastAcceptedConfiguration());
|
||||||
if (newConfig.equals(clusterState.getLastAcceptedConfiguration()) == false) {
|
if (newConfig.equals(clusterState.getLastAcceptedConfiguration()) == false) {
|
||||||
assert coordinationState.get().joinVotesHaveQuorumFor(newConfig);
|
assert coordinationState.get().joinVotesHaveQuorumFor(newConfig);
|
||||||
return ClusterState.builder(clusterState).metaData(MetaData.builder(clusterState.metaData())
|
return ClusterState.builder(clusterState).metaData(MetaData.builder(clusterState.metaData())
|
||||||
|
@ -1192,8 +1216,19 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
|
||||||
updateMaxTermSeen(getCurrentTerm());
|
updateMaxTermSeen(getCurrentTerm());
|
||||||
|
|
||||||
if (mode == Mode.LEADER) {
|
if (mode == Mode.LEADER) {
|
||||||
|
final ClusterState state = getLastAcceptedState(); // committed state
|
||||||
|
if (electionQuorumContainsLocalNode(state) == false) {
|
||||||
|
final List<DiscoveryNode> masterCandidates = completedNodes().stream()
|
||||||
|
.filter(DiscoveryNode::isMasterNode)
|
||||||
|
.filter(node -> electionQuorumContains(state, node))
|
||||||
|
.collect(Collectors.toList());
|
||||||
|
if (masterCandidates.isEmpty() == false) {
|
||||||
|
abdicateTo(masterCandidates.get(random.nextInt(masterCandidates.size())));
|
||||||
|
}
|
||||||
|
} else {
|
||||||
scheduleReconfigurationIfNeeded();
|
scheduleReconfigurationIfNeeded();
|
||||||
}
|
}
|
||||||
|
}
|
||||||
lagDetector.startLagDetector(publishRequest.getAcceptedState().version());
|
lagDetector.startLagDetector(publishRequest.getAcceptedState().version());
|
||||||
}
|
}
|
||||||
ackListener.onNodeAck(getLocalNode(), null);
|
ackListener.onNodeAck(getLocalNode(), null);
|
||||||
|
|
|
@ -36,6 +36,7 @@ import java.util.List;
|
||||||
import java.util.Optional;
|
import java.util.Optional;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
import java.util.function.LongSupplier;
|
import java.util.function.LongSupplier;
|
||||||
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
public abstract class Publication {
|
public abstract class Publication {
|
||||||
|
|
||||||
|
@ -92,6 +93,13 @@ public abstract class Publication {
|
||||||
onPossibleCompletion();
|
onPossibleCompletion();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public List<DiscoveryNode> completedNodes() {
|
||||||
|
return publicationTargets.stream()
|
||||||
|
.filter(PublicationTarget::isSuccessfullyCompleted)
|
||||||
|
.map(PublicationTarget::getDiscoveryNode)
|
||||||
|
.collect(Collectors.toList());
|
||||||
|
}
|
||||||
|
|
||||||
public boolean isCommitted() {
|
public boolean isCommitted() {
|
||||||
return applyCommitRequest.isPresent();
|
return applyCommitRequest.isPresent();
|
||||||
}
|
}
|
||||||
|
@ -268,6 +276,10 @@ public abstract class Publication {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
DiscoveryNode getDiscoveryNode() {
|
||||||
|
return discoveryNode;
|
||||||
|
}
|
||||||
|
|
||||||
private void ackOnce(Exception e) {
|
private void ackOnce(Exception e) {
|
||||||
if (ackIsPending) {
|
if (ackIsPending) {
|
||||||
ackIsPending = false;
|
ackIsPending = false;
|
||||||
|
@ -280,6 +292,10 @@ public abstract class Publication {
|
||||||
&& state != PublicationTargetState.APPLIED_COMMIT;
|
&& state != PublicationTargetState.APPLIED_COMMIT;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
boolean isSuccessfullyCompleted() {
|
||||||
|
return state == PublicationTargetState.APPLIED_COMMIT;
|
||||||
|
}
|
||||||
|
|
||||||
boolean isWaitingForQuorum() {
|
boolean isWaitingForQuorum() {
|
||||||
return state == PublicationTargetState.WAITING_FOR_QUORUM;
|
return state == PublicationTargetState.WAITING_FOR_QUORUM;
|
||||||
}
|
}
|
||||||
|
|
|
@ -30,6 +30,7 @@ import org.elasticsearch.common.settings.Settings;
|
||||||
import org.elasticsearch.common.util.set.Sets;
|
import org.elasticsearch.common.util.set.Sets;
|
||||||
|
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
|
import java.util.Collections;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
import java.util.TreeSet;
|
import java.util.TreeSet;
|
||||||
import java.util.stream.Collectors;
|
import java.util.stream.Collectors;
|
||||||
|
@ -90,18 +91,23 @@ public class Reconfigurator {
|
||||||
* @param retiredNodeIds Nodes that are leaving the cluster and which should not appear in the configuration if possible. Nodes that are
|
* @param retiredNodeIds Nodes that are leaving the cluster and which should not appear in the configuration if possible. Nodes that are
|
||||||
* retired and not in the current configuration will never appear in the resulting configuration; this is useful
|
* retired and not in the current configuration will never appear in the resulting configuration; this is useful
|
||||||
* for shifting the vote in a 2-node cluster so one of the nodes can be restarted without harming availability.
|
* for shifting the vote in a 2-node cluster so one of the nodes can be restarted without harming availability.
|
||||||
|
* @param currentMaster The current master. Unless retired, we prefer to keep the current master in the config.
|
||||||
* @param currentConfig The current configuration. As far as possible, we prefer to keep the current config as-is.
|
* @param currentConfig The current configuration. As far as possible, we prefer to keep the current config as-is.
|
||||||
* @return An optimal configuration, or leave the current configuration unchanged if the optimal configuration has no live quorum.
|
* @return An optimal configuration, or leave the current configuration unchanged if the optimal configuration has no live quorum.
|
||||||
*/
|
*/
|
||||||
public VotingConfiguration reconfigure(Set<DiscoveryNode> liveNodes, Set<String> retiredNodeIds, VotingConfiguration currentConfig) {
|
public VotingConfiguration reconfigure(Set<DiscoveryNode> liveNodes, Set<String> retiredNodeIds, DiscoveryNode currentMaster,
|
||||||
|
VotingConfiguration currentConfig) {
|
||||||
assert liveNodes.stream().noneMatch(Coordinator::isZen1Node) : liveNodes;
|
assert liveNodes.stream().noneMatch(Coordinator::isZen1Node) : liveNodes;
|
||||||
logger.trace("{} reconfiguring {} based on liveNodes={}, retiredNodeIds={}", this, currentConfig, liveNodes, retiredNodeIds);
|
assert liveNodes.contains(currentMaster) : "liveNodes = " + liveNodes + " master = " + currentMaster;
|
||||||
|
logger.trace("{} reconfiguring {} based on liveNodes={}, retiredNodeIds={}, currentMaster={}",
|
||||||
|
this, currentConfig, liveNodes, retiredNodeIds, currentMaster);
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* There are three true/false properties of each node in play: live/non-live, retired/non-retired and in-config/not-in-config.
|
* There are three true/false properties of each node in play: live/non-live, retired/non-retired and in-config/not-in-config.
|
||||||
* Firstly we divide the nodes into disjoint sets based on these properties:
|
* Firstly we divide the nodes into disjoint sets based on these properties:
|
||||||
*
|
*
|
||||||
* - nonRetiredInConfigNotLiveIds
|
* - nonRetiredMaster
|
||||||
|
* - nonRetiredNotMasterInConfigNotLiveIds
|
||||||
* - nonRetiredInConfigLiveIds
|
* - nonRetiredInConfigLiveIds
|
||||||
* - nonRetiredLiveNotInConfigIds
|
* - nonRetiredLiveNotInConfigIds
|
||||||
*
|
*
|
||||||
|
@ -125,6 +131,17 @@ public class Reconfigurator {
|
||||||
final Set<String> nonRetiredInConfigLiveIds = new TreeSet<>(liveInConfigIds);
|
final Set<String> nonRetiredInConfigLiveIds = new TreeSet<>(liveInConfigIds);
|
||||||
nonRetiredInConfigLiveIds.removeAll(retiredNodeIds);
|
nonRetiredInConfigLiveIds.removeAll(retiredNodeIds);
|
||||||
|
|
||||||
|
final Set<String> nonRetiredInConfigLiveMasterIds;
|
||||||
|
final Set<String> nonRetiredInConfigLiveNotMasterIds;
|
||||||
|
if (nonRetiredInConfigLiveIds.contains(currentMaster.getId())) {
|
||||||
|
nonRetiredInConfigLiveNotMasterIds = new TreeSet<>(nonRetiredInConfigLiveIds);
|
||||||
|
nonRetiredInConfigLiveNotMasterIds.remove(currentMaster.getId());
|
||||||
|
nonRetiredInConfigLiveMasterIds = Collections.singleton(currentMaster.getId());
|
||||||
|
} else {
|
||||||
|
nonRetiredInConfigLiveNotMasterIds = nonRetiredInConfigLiveIds;
|
||||||
|
nonRetiredInConfigLiveMasterIds = Collections.emptySet();
|
||||||
|
}
|
||||||
|
|
||||||
final Set<String> nonRetiredLiveNotInConfigIds = Sets.sortedDifference(liveNodeIds, currentConfig.getNodeIds());
|
final Set<String> nonRetiredLiveNotInConfigIds = Sets.sortedDifference(liveNodeIds, currentConfig.getNodeIds());
|
||||||
nonRetiredLiveNotInConfigIds.removeAll(retiredNodeIds);
|
nonRetiredLiveNotInConfigIds.removeAll(retiredNodeIds);
|
||||||
|
|
||||||
|
@ -151,9 +168,9 @@ public class Reconfigurator {
|
||||||
* The new configuration is formed by taking this many nodes in the following preference order:
|
* The new configuration is formed by taking this many nodes in the following preference order:
|
||||||
*/
|
*/
|
||||||
final VotingConfiguration newConfig = new VotingConfiguration(
|
final VotingConfiguration newConfig = new VotingConfiguration(
|
||||||
// live nodes first, preferring the current config, and if we need more then use non-live nodes
|
// live master first, then other live nodes, preferring the current config, and if we need more then use non-live nodes
|
||||||
Stream.of(nonRetiredInConfigLiveIds, nonRetiredLiveNotInConfigIds, nonRetiredInConfigNotLiveIds)
|
Stream.of(nonRetiredInConfigLiveMasterIds, nonRetiredInConfigLiveNotMasterIds, nonRetiredLiveNotInConfigIds,
|
||||||
.flatMap(Collection::stream).limit(targetSize).collect(Collectors.toSet()));
|
nonRetiredInConfigNotLiveIds).flatMap(Collection::stream).limit(targetSize).collect(Collectors.toSet()));
|
||||||
|
|
||||||
if (newConfig.hasQuorum(liveNodeIds)) {
|
if (newConfig.hasQuorum(liveNodeIds)) {
|
||||||
return newConfig;
|
return newConfig;
|
||||||
|
|
|
@ -31,12 +31,12 @@ import org.elasticsearch.node.Node;
|
||||||
import org.elasticsearch.test.ESIntegTestCase;
|
import org.elasticsearch.test.ESIntegTestCase;
|
||||||
import org.elasticsearch.test.ESIntegTestCase.ClusterScope;
|
import org.elasticsearch.test.ESIntegTestCase.ClusterScope;
|
||||||
import org.elasticsearch.test.ESIntegTestCase.Scope;
|
import org.elasticsearch.test.ESIntegTestCase.Scope;
|
||||||
|
import org.elasticsearch.test.InternalTestCluster;
|
||||||
import org.elasticsearch.test.junit.annotations.TestLogging;
|
import org.elasticsearch.test.junit.annotations.TestLogging;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.concurrent.ExecutionException;
|
|
||||||
|
|
||||||
import static org.elasticsearch.discovery.zen.ElectMasterService.DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING;
|
import static org.elasticsearch.discovery.zen.ElectMasterService.DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING;
|
||||||
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
|
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
|
||||||
|
@ -106,7 +106,7 @@ public class SpecificMasterNodesIT extends ESIntegTestCase {
|
||||||
.execute().actionGet().getState().nodes().getMasterNode().getName(), equalTo(nextMasterEligibleNodeName));
|
.execute().actionGet().getState().nodes().getMasterNode().getName(), equalTo(nextMasterEligibleNodeName));
|
||||||
}
|
}
|
||||||
|
|
||||||
public void testElectOnlyBetweenMasterNodes() throws IOException, ExecutionException, InterruptedException {
|
public void testElectOnlyBetweenMasterNodes() throws Exception {
|
||||||
logger.info("--> start data node / non master node");
|
logger.info("--> start data node / non master node");
|
||||||
internalCluster().startNode(Settings.builder().put(Node.NODE_DATA_SETTING.getKey(), true)
|
internalCluster().startNode(Settings.builder().put(Node.NODE_DATA_SETTING.getKey(), true)
|
||||||
.put(Node.NODE_MASTER_SETTING.getKey(), false).put("discovery.initial_state_timeout", "1s"));
|
.put(Node.NODE_MASTER_SETTING.getKey(), false).put("discovery.initial_state_timeout", "1s"));
|
||||||
|
@ -138,7 +138,14 @@ public class SpecificMasterNodesIT extends ESIntegTestCase {
|
||||||
logger.info("--> closing master node (1)");
|
logger.info("--> closing master node (1)");
|
||||||
client().execute(AddVotingConfigExclusionsAction.INSTANCE,
|
client().execute(AddVotingConfigExclusionsAction.INSTANCE,
|
||||||
new AddVotingConfigExclusionsRequest(new String[]{masterNodeName})).get();
|
new AddVotingConfigExclusionsRequest(new String[]{masterNodeName})).get();
|
||||||
internalCluster().stopCurrentMasterNode();
|
// removing the master from the voting configuration immediately triggers the master to step down
|
||||||
|
assertBusy(() -> {
|
||||||
|
assertThat(internalCluster().nonMasterClient().admin().cluster().prepareState()
|
||||||
|
.execute().actionGet().getState().nodes().getMasterNode().getName(), equalTo(nextMasterEligableNodeName));
|
||||||
|
assertThat(internalCluster().masterClient().admin().cluster().prepareState()
|
||||||
|
.execute().actionGet().getState().nodes().getMasterNode().getName(), equalTo(nextMasterEligableNodeName));
|
||||||
|
});
|
||||||
|
internalCluster().stopRandomNode(InternalTestCluster.nameFilter(masterNodeName));
|
||||||
assertThat(internalCluster().nonMasterClient().admin().cluster().prepareState()
|
assertThat(internalCluster().nonMasterClient().admin().cluster().prepareState()
|
||||||
.execute().actionGet().getState().nodes().getMasterNode().getName(), equalTo(nextMasterEligableNodeName));
|
.execute().actionGet().getState().nodes().getMasterNode().getName(), equalTo(nextMasterEligableNodeName));
|
||||||
assertThat(internalCluster().masterClient().admin().cluster().prepareState()
|
assertThat(internalCluster().masterClient().admin().cluster().prepareState()
|
||||||
|
|
|
@ -66,6 +66,7 @@ import org.elasticsearch.test.disruption.DisruptableMockTransport;
|
||||||
import org.elasticsearch.test.disruption.DisruptableMockTransport.ConnectionStatus;
|
import org.elasticsearch.test.disruption.DisruptableMockTransport.ConnectionStatus;
|
||||||
import org.elasticsearch.transport.TransportService;
|
import org.elasticsearch.transport.TransportService;
|
||||||
import org.hamcrest.Matcher;
|
import org.hamcrest.Matcher;
|
||||||
|
import org.hamcrest.core.IsCollectionContaining;
|
||||||
import org.junit.After;
|
import org.junit.After;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
|
|
||||||
|
@ -1331,6 +1332,8 @@ public class CoordinatorTests extends ESTestCase {
|
||||||
final VotingConfiguration lastCommittedConfiguration = lastAcceptedState.getLastCommittedConfiguration();
|
final VotingConfiguration lastCommittedConfiguration = lastAcceptedState.getLastCommittedConfiguration();
|
||||||
assertTrue(connectedNodeIds + " should be a quorum of " + lastCommittedConfiguration,
|
assertTrue(connectedNodeIds + " should be a quorum of " + lastCommittedConfiguration,
|
||||||
lastCommittedConfiguration.hasQuorum(connectedNodeIds));
|
lastCommittedConfiguration.hasQuorum(connectedNodeIds));
|
||||||
|
assertThat("leader " + leader.getLocalNode() + " should be part of voting configuration " + lastCommittedConfiguration,
|
||||||
|
lastCommittedConfiguration.getNodeIds(), IsCollectionContaining.hasItem(leader.getLocalNode().getId()));
|
||||||
|
|
||||||
assertThat("no reconfiguration is in progress",
|
assertThat("no reconfiguration is in progress",
|
||||||
lastAcceptedState.getLastCommittedConfiguration(), equalTo(lastAcceptedState.getLastAcceptedConfiguration()));
|
lastAcceptedState.getLastCommittedConfiguration(), equalTo(lastAcceptedState.getLastAcceptedConfiguration()));
|
||||||
|
|
|
@ -56,6 +56,7 @@ import java.util.stream.Stream;
|
||||||
|
|
||||||
import static org.hamcrest.Matchers.containsInAnyOrder;
|
import static org.hamcrest.Matchers.containsInAnyOrder;
|
||||||
import static org.hamcrest.Matchers.containsString;
|
import static org.hamcrest.Matchers.containsString;
|
||||||
|
import static org.hamcrest.Matchers.empty;
|
||||||
import static org.hamcrest.Matchers.equalTo;
|
import static org.hamcrest.Matchers.equalTo;
|
||||||
|
|
||||||
public class PublicationTests extends ESTestCase {
|
public class PublicationTests extends ESTestCase {
|
||||||
|
@ -178,6 +179,7 @@ public class PublicationTests extends ESTestCase {
|
||||||
discoveryNodes, singleNodeConfig, singleNodeConfig, 42L), ackListener, Collections.emptySet());
|
discoveryNodes, singleNodeConfig, singleNodeConfig, 42L), ackListener, Collections.emptySet());
|
||||||
|
|
||||||
assertThat(publication.pendingPublications.keySet(), equalTo(discoNodes));
|
assertThat(publication.pendingPublications.keySet(), equalTo(discoNodes));
|
||||||
|
assertThat(publication.completedNodes(), empty());
|
||||||
assertTrue(publication.pendingCommits.isEmpty());
|
assertTrue(publication.pendingCommits.isEmpty());
|
||||||
AtomicBoolean processedNode1PublishResponse = new AtomicBoolean();
|
AtomicBoolean processedNode1PublishResponse = new AtomicBoolean();
|
||||||
boolean delayProcessingNode2PublishResponse = randomBoolean();
|
boolean delayProcessingNode2PublishResponse = randomBoolean();
|
||||||
|
@ -232,10 +234,12 @@ public class PublicationTests extends ESTestCase {
|
||||||
|
|
||||||
assertFalse(publication.completed);
|
assertFalse(publication.completed);
|
||||||
assertFalse(publication.committed);
|
assertFalse(publication.committed);
|
||||||
|
assertThat(publication.completedNodes(), containsInAnyOrder(n1, n3));
|
||||||
publication.pendingCommits.get(n2).onResponse(TransportResponse.Empty.INSTANCE);
|
publication.pendingCommits.get(n2).onResponse(TransportResponse.Empty.INSTANCE);
|
||||||
}
|
}
|
||||||
|
|
||||||
assertTrue(publication.completed);
|
assertTrue(publication.completed);
|
||||||
|
assertThat(publication.completedNodes(), containsInAnyOrder(n1, n2, n3));
|
||||||
assertTrue(publication.committed);
|
assertTrue(publication.committed);
|
||||||
|
|
||||||
assertThat(ackListener.await(0L, TimeUnit.SECONDS), containsInAnyOrder(n1, n2, n3));
|
assertThat(ackListener.await(0L, TimeUnit.SECONDS), containsInAnyOrder(n1, n2, n3));
|
||||||
|
|
|
@ -31,6 +31,7 @@ import org.elasticsearch.test.ESTestCase;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
|
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
|
import java.util.Comparator;
|
||||||
import java.util.HashSet;
|
import java.util.HashSet;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
|
@ -52,6 +53,7 @@ public class ReconfiguratorTests extends ESTestCase {
|
||||||
|
|
||||||
check(nodes("a"), conf("a"), true, conf("a"));
|
check(nodes("a"), conf("a"), true, conf("a"));
|
||||||
check(nodes("a", "b"), conf("a"), true, conf("a"));
|
check(nodes("a", "b"), conf("a"), true, conf("a"));
|
||||||
|
check(nodes("a", "b"), conf("b"), true, conf("b"));
|
||||||
check(nodes("a", "b"), conf("a", "c"), true, conf("a"));
|
check(nodes("a", "b"), conf("a", "c"), true, conf("a"));
|
||||||
check(nodes("a", "b"), conf("a", "b"), true, conf("a"));
|
check(nodes("a", "b"), conf("a", "b"), true, conf("a"));
|
||||||
check(nodes("a", "b"), conf("a", "b", "e"), true, conf("a", "b", "e"));
|
check(nodes("a", "b"), conf("a", "b", "e"), true, conf("a", "b", "e"));
|
||||||
|
@ -64,6 +66,7 @@ public class ReconfiguratorTests extends ESTestCase {
|
||||||
check(nodes("a", "b", "c", "d"), conf("a", "b", "e"), true, conf("a", "b", "c"));
|
check(nodes("a", "b", "c", "d"), conf("a", "b", "e"), true, conf("a", "b", "c"));
|
||||||
check(nodes("a", "b", "c", "d", "e"), conf("a", "f", "g"), true, conf("a", "b", "c", "d", "e"));
|
check(nodes("a", "b", "c", "d", "e"), conf("a", "f", "g"), true, conf("a", "b", "c", "d", "e"));
|
||||||
check(nodes("a", "b", "c", "d"), conf("a", "b", "c", "d", "e"), true, conf("a", "b", "c"));
|
check(nodes("a", "b", "c", "d"), conf("a", "b", "c", "d", "e"), true, conf("a", "b", "c"));
|
||||||
|
check(nodes("e", "a", "b", "c"), retired(), "e", conf("a", "b", "c", "d", "e"), true, conf("a", "b", "e"));
|
||||||
check(nodes("a", "b", "c"), conf("a", "b", "c", "d", "e"), true, conf("a", "b", "c"));
|
check(nodes("a", "b", "c"), conf("a", "b", "c", "d", "e"), true, conf("a", "b", "c"));
|
||||||
|
|
||||||
check(nodes("a"), conf("a"), false, conf("a"));
|
check(nodes("a"), conf("a"), false, conf("a"));
|
||||||
|
@ -124,7 +127,8 @@ public class ReconfiguratorTests extends ESTestCase {
|
||||||
|
|
||||||
final int quorumSize = Math.max(liveNodes.length / 2 + 1, initialVotingNodes.length < 3 ? 1 : 2);
|
final int quorumSize = Math.max(liveNodes.length / 2 + 1, initialVotingNodes.length < 3 ? 1 : 2);
|
||||||
|
|
||||||
final VotingConfiguration finalConfig = reconfigurator.reconfigure(liveNodesSet, emptySet(), initialConfig);
|
final VotingConfiguration finalConfig = reconfigurator.reconfigure(liveNodesSet, emptySet(),
|
||||||
|
randomFrom(liveNodesSet), initialConfig);
|
||||||
|
|
||||||
final String description = "reconfigure " + liveNodesSet + " from " + initialConfig + " yielded " + finalConfig;
|
final String description = "reconfigure " + liveNodesSet + " from " + initialConfig + " yielded " + finalConfig;
|
||||||
|
|
||||||
|
@ -152,7 +156,8 @@ public class ReconfiguratorTests extends ESTestCase {
|
||||||
|
|
||||||
final int quorumSize = Math.max(liveNodes.length, initialVotingNodes.length) / 2 + 1;
|
final int quorumSize = Math.max(liveNodes.length, initialVotingNodes.length) / 2 + 1;
|
||||||
|
|
||||||
final VotingConfiguration finalConfig = reconfigurator.reconfigure(liveNodesSet, emptySet(), initialConfig);
|
final VotingConfiguration finalConfig = reconfigurator.reconfigure(liveNodesSet, emptySet(), randomFrom(liveNodesSet),
|
||||||
|
initialConfig);
|
||||||
|
|
||||||
final String description = "reconfigure " + liveNodesSet + " from " + initialConfig + " yielded " + finalConfig;
|
final String description = "reconfigure " + liveNodesSet + " from " + initialConfig + " yielded " + finalConfig;
|
||||||
|
|
||||||
|
@ -187,13 +192,20 @@ public class ReconfiguratorTests extends ESTestCase {
|
||||||
|
|
||||||
private void check(Set<DiscoveryNode> liveNodes, Set<String> retired, VotingConfiguration config,
|
private void check(Set<DiscoveryNode> liveNodes, Set<String> retired, VotingConfiguration config,
|
||||||
boolean autoShrinkVotingConfiguration, VotingConfiguration expectedConfig) {
|
boolean autoShrinkVotingConfiguration, VotingConfiguration expectedConfig) {
|
||||||
|
final DiscoveryNode master = liveNodes.stream().sorted(Comparator.comparing(DiscoveryNode::getId)).findFirst().get();
|
||||||
|
check(liveNodes, retired, master.getId(), config, autoShrinkVotingConfiguration, expectedConfig);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void check(Set<DiscoveryNode> liveNodes, Set<String> retired, String masterId, VotingConfiguration config,
|
||||||
|
boolean autoShrinkVotingConfiguration, VotingConfiguration expectedConfig) {
|
||||||
final Reconfigurator reconfigurator = makeReconfigurator(Settings.builder()
|
final Reconfigurator reconfigurator = makeReconfigurator(Settings.builder()
|
||||||
.put(CLUSTER_AUTO_SHRINK_VOTING_CONFIGURATION.getKey(), autoShrinkVotingConfiguration)
|
.put(CLUSTER_AUTO_SHRINK_VOTING_CONFIGURATION.getKey(), autoShrinkVotingConfiguration)
|
||||||
.build());
|
.build());
|
||||||
|
|
||||||
final VotingConfiguration adaptedConfig = reconfigurator.reconfigure(liveNodes, retired, config);
|
final DiscoveryNode master = liveNodes.stream().filter(n -> n.getId().equals(masterId)).findFirst().get();
|
||||||
assertEquals(new ParameterizedMessage("[liveNodes={}, retired={}, config={}, autoShrinkVotingConfiguration={}]",
|
final VotingConfiguration adaptedConfig = reconfigurator.reconfigure(liveNodes, retired, master, config);
|
||||||
liveNodes, retired, config, autoShrinkVotingConfiguration).getFormattedMessage(),
|
assertEquals(new ParameterizedMessage("[liveNodes={}, retired={}, master={}, config={}, autoShrinkVotingConfiguration={}]",
|
||||||
|
liveNodes, retired, master, config, autoShrinkVotingConfiguration).getFormattedMessage(),
|
||||||
expectedConfig, adaptedConfig);
|
expectedConfig, adaptedConfig);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -206,18 +218,24 @@ public class ReconfiguratorTests extends ESTestCase {
|
||||||
final Reconfigurator reconfigurator = new Reconfigurator(Settings.EMPTY, clusterSettings);
|
final Reconfigurator reconfigurator = new Reconfigurator(Settings.EMPTY, clusterSettings);
|
||||||
final VotingConfiguration initialConfig = conf("a", "b", "c", "d", "e");
|
final VotingConfiguration initialConfig = conf("a", "b", "c", "d", "e");
|
||||||
|
|
||||||
|
Set<DiscoveryNode> twoNodes = nodes("a", "b");
|
||||||
|
Set<DiscoveryNode> threeNodes = nodes("a", "b", "c");
|
||||||
|
|
||||||
// default is "true"
|
// default is "true"
|
||||||
assertThat(reconfigurator.reconfigure(nodes("a", "b"), retired(), initialConfig), equalTo(conf("a", "b", "c")));
|
assertThat(reconfigurator.reconfigure(twoNodes, retired(), randomFrom(twoNodes), initialConfig), equalTo(conf("a", "b", "c")));
|
||||||
|
|
||||||
// update to "false"
|
// update to "false"
|
||||||
clusterSettings.applySettings(Settings.builder().put(CLUSTER_AUTO_SHRINK_VOTING_CONFIGURATION.getKey(), "false").build());
|
clusterSettings.applySettings(Settings.builder().put(CLUSTER_AUTO_SHRINK_VOTING_CONFIGURATION.getKey(), "false").build());
|
||||||
assertThat(reconfigurator.reconfigure(nodes("a", "b"), retired(), initialConfig), sameInstance(initialConfig)); // no quorum
|
assertThat(reconfigurator.reconfigure(twoNodes, retired(), randomFrom(twoNodes), initialConfig),
|
||||||
assertThat(reconfigurator.reconfigure(nodes("a", "b", "c"), retired(), initialConfig), equalTo(conf("a", "b", "c", "d", "e")));
|
sameInstance(initialConfig)); // no quorum
|
||||||
assertThat(reconfigurator.reconfigure(nodes("a", "b", "c"), retired("d"), initialConfig), equalTo(conf("a", "b", "c", "e")));
|
assertThat(reconfigurator.reconfigure(threeNodes, retired(), randomFrom(threeNodes), initialConfig),
|
||||||
|
equalTo(conf("a", "b", "c", "d", "e")));
|
||||||
|
assertThat(reconfigurator.reconfigure(threeNodes, retired("d"), randomFrom(threeNodes), initialConfig),
|
||||||
|
equalTo(conf("a", "b", "c", "e")));
|
||||||
|
|
||||||
// explicitly set to "true"
|
// explicitly set to "true"
|
||||||
clusterSettings.applySettings(Settings.builder().put(CLUSTER_AUTO_SHRINK_VOTING_CONFIGURATION.getKey(), "true").build());
|
clusterSettings.applySettings(Settings.builder().put(CLUSTER_AUTO_SHRINK_VOTING_CONFIGURATION.getKey(), "true").build());
|
||||||
assertThat(reconfigurator.reconfigure(nodes("a", "b"), retired(), initialConfig), equalTo(conf("a", "b", "c")));
|
assertThat(reconfigurator.reconfigure(twoNodes, retired(), randomFrom(twoNodes), initialConfig), equalTo(conf("a", "b", "c")));
|
||||||
|
|
||||||
expectThrows(IllegalArgumentException.class, () ->
|
expectThrows(IllegalArgumentException.class, () ->
|
||||||
clusterSettings.applySettings(Settings.builder().put(CLUSTER_AUTO_SHRINK_VOTING_CONFIGURATION.getKey(), "blah").build()));
|
clusterSettings.applySettings(Settings.builder().put(CLUSTER_AUTO_SHRINK_VOTING_CONFIGURATION.getKey(), "blah").build()));
|
||||||
|
|
|
@ -0,0 +1,41 @@
|
||||||
|
/*
|
||||||
|
* Licensed to Elasticsearch under one or more contributor
|
||||||
|
* license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright
|
||||||
|
* ownership. Elasticsearch licenses this file to you under
|
||||||
|
* the Apache License, Version 2.0 (the "License"); you may
|
||||||
|
* not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing,
|
||||||
|
* software distributed under the License is distributed on an
|
||||||
|
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||||
|
* KIND, either express or implied. See the License for the
|
||||||
|
* specific language governing permissions and limitations
|
||||||
|
* under the License.
|
||||||
|
*/
|
||||||
|
package org.elasticsearch.cluster.coordination;
|
||||||
|
|
||||||
|
import org.elasticsearch.action.admin.cluster.configuration.AddVotingConfigExclusionsAction;
|
||||||
|
import org.elasticsearch.action.admin.cluster.configuration.AddVotingConfigExclusionsRequest;
|
||||||
|
import org.elasticsearch.common.Priority;
|
||||||
|
import org.elasticsearch.test.ESIntegTestCase;
|
||||||
|
|
||||||
|
import java.util.concurrent.ExecutionException;
|
||||||
|
|
||||||
|
@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0)
|
||||||
|
public class VotingConfigurationIT extends ESIntegTestCase {
|
||||||
|
|
||||||
|
public void testAbdicateAfterVotingConfigExclusionAdded() throws ExecutionException, InterruptedException {
|
||||||
|
internalCluster().startNodes(2);
|
||||||
|
final String originalMaster = internalCluster().getMasterName();
|
||||||
|
|
||||||
|
logger.info("--> excluding master node {}", originalMaster);
|
||||||
|
client().execute(AddVotingConfigExclusionsAction.INSTANCE,
|
||||||
|
new AddVotingConfigExclusionsRequest(new String[]{originalMaster})).get();
|
||||||
|
client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).get();
|
||||||
|
assertNotEquals(originalMaster, internalCluster().getMasterName());
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue