[Zen2] Reconfigure cluster as its membership changes (#34592)
As master-eligible nodes join or leave the cluster we should give them votes or take them away, in order to maintain the optimal level of fault-tolerance in the system. #33924 introduced the `Reconfigurator` to calculate the optimal voting configuration of the cluster, and in this change we add the plumbing needed to actually perform these reconfigurations as the cluster grows or shrinks.
parent 3de266e3cf
commit bfd24fc030
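For orientation before the diff: the arithmetic behind this change is that a voting configuration of 2f + 1 master-eligible nodes tolerates the failure of f of them, since the remaining f + 1 still form a majority. The sketch below illustrates how an optimal configuration might be chosen from the live nodes. It is illustrative only, assuming a plain set-of-node-ids model; it is not the actual `Reconfigurator` implementation, and all names in it are hypothetical.

    import java.util.Set;
    import java.util.TreeSet;
    import java.util.stream.Collectors;

    // Illustrative sketch of the 2f+1 quorum arithmetic behind reconfiguration; not the
    // real Reconfigurator (which also retains unreachable configured nodes when too few
    // master-eligible nodes are live).
    class ReconfigurationSketch {

        // A voting configuration of size 2f+1 tolerates f failed master-eligible nodes.
        static int toleratedFailures(int configurationSize) {
            return (configurationSize - 1) / 2;
        }

        // The smallest configuration that still tolerates the given number of failures.
        static int minimumConfigurationSize(int failureTolerance) {
            return 2 * failureTolerance + 1;
        }

        // Aim for the largest odd configuration the live nodes allow, never targeting
        // fewer than 2f+1 members.
        static Set<String> chooseConfiguration(Set<String> liveNodeIds, int failureTolerance) {
            final int largestOdd = liveNodeIds.size() % 2 == 1 ? liveNodeIds.size() : liveNodeIds.size() - 1;
            final int targetSize = Math.max(minimumConfigurationSize(failureTolerance), largestOdd);
            return liveNodeIds.stream().sorted().limit(targetSize)
                .collect(Collectors.toCollection(TreeSet::new));
        }
    }

For example, growing a cluster from one to three master-eligible nodes lets the configuration expand from 1 to 3 voting nodes, after which the loss of any single node is tolerated; the tests added in this commit exercise exactly these transitions.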
@@ -27,6 +27,7 @@ import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterState.Builder;
import org.elasticsearch.cluster.ClusterState.VotingConfiguration;
import org.elasticsearch.cluster.ClusterStateTaskConfig;
import org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.elasticsearch.cluster.block.ClusterBlocks;
import org.elasticsearch.cluster.coordination.FollowersChecker.FollowerCheckRequest;
import org.elasticsearch.cluster.coordination.JoinHelper.InitialJoinAccumulator;
@@ -42,6 +43,7 @@ import org.elasticsearch.common.Priority;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
@@ -64,9 +66,13 @@ import java.util.List;
import java.util.Optional;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;

import static java.util.Collections.emptySet;
import static org.elasticsearch.cluster.coordination.Reconfigurator.CLUSTER_MASTER_NODES_FAILURE_TOLERANCE;
import static org.elasticsearch.discovery.DiscoverySettings.NO_MASTER_BLOCK_WRITES;
import static org.elasticsearch.gateway.GatewayService.STATE_NOT_RECOVERED_BLOCK;
@@ -104,6 +110,7 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
    @Nullable
    private Releasable leaderCheckScheduler;
    private long maxTermSeen;
    private final Reconfigurator reconfigurator;

    private Mode mode;
    private Optional<DiscoveryNode> lastKnownLeader;
@@ -111,9 +118,10 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
    private JoinHelper.JoinAccumulator joinAccumulator;
    private Optional<CoordinatorPublication> currentPublication = Optional.empty();

    public Coordinator(Settings settings, TransportService transportService, AllocationService allocationService,
                       MasterService masterService, Supplier<CoordinationState.PersistedState> persistedStateSupplier,
                       UnicastHostsProvider unicastHostsProvider, ClusterApplier clusterApplier, Random random) {
    public Coordinator(Settings settings, ClusterSettings clusterSettings, TransportService transportService,
                       AllocationService allocationService, MasterService masterService,
                       Supplier<CoordinationState.PersistedState> persistedStateSupplier, UnicastHostsProvider unicastHostsProvider,
                       ClusterApplier clusterApplier, Random random) {
        super(settings);
        this.transportService = transportService;
        this.masterService = masterService;
@@ -136,6 +144,7 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
        this.nodeRemovalExecutor = new NodeRemovalClusterStateTaskExecutor(allocationService, logger);
        this.clusterApplier = clusterApplier;
        masterService.setClusterStateSupplier(this::getStateForMasterService);
        this.reconfigurator = new Reconfigurator(settings, clusterSettings);
    }

    private Runnable getOnLeaderFailure() {
@@ -269,8 +278,13 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
            logger.debug("updateMaxTermSeen: maxTermSeen = {} > currentTerm = {}, enqueueing term bump",
                maxTermSeen, currentTerm);
        } else {
            ensureTermAtLeast(getLocalNode(), maxTermSeen);
            startElection();
            try {
                ensureTermAtLeast(getLocalNode(), maxTermSeen);
                startElection();
            } catch (Exception e) {
                logger.warn(new ParameterizedMessage("failed to bump term to {}", maxTermSeen), e);
                becomeCandidate("updateMaxTermSeen");
            }
        }
    }
}
@@ -524,6 +538,12 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
                assert lastPublishedNodes.equals(followersChecker.getKnownFollowers()) :
                    lastPublishedNodes + " != " + followersChecker.getKnownFollowers();
            }

            assert becomingMaster || activePublication ||
                coordinationState.get().getLastAcceptedConfiguration().equals(coordinationState.get().getLastCommittedConfiguration())
                : coordinationState.get().getLastAcceptedConfiguration() + " != "
                + coordinationState.get().getLastCommittedConfiguration();

        } else if (mode == Mode.FOLLOWER) {
            assert coordinationState.get().electionWon() == false : getLocalNode() + " is FOLLOWER so electionWon() should be false";
            assert lastKnownLeader.isPresent() && (lastKnownLeader.get().equals(getLocalNode()) == false);
@@ -582,6 +602,8 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
            MetaData.Builder metaDataBuilder = MetaData.builder();
            // automatically generate a UID for the metadata if we need to
            metaDataBuilder.generateClusterUuidIfNeeded(); // TODO generate UUID in bootstrapping tool?
            metaDataBuilder.persistentSettings(Settings.builder().put(CLUSTER_MASTER_NODES_FAILURE_TOLERANCE.getKey(),
                (votingConfiguration.getNodeIds().size() - 1) / 2).build()); // TODO set this in bootstrapping tool?
            builder.metaData(metaDataBuilder);
            coordinationState.get().setInitialState(builder.build());
            preVoteCollector.update(getPreVoteResponse(), null); // pick up the change to last-accepted version
@@ -589,6 +611,50 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
        }
    }

    // Package-private for testing
    ClusterState improveConfiguration(ClusterState clusterState) {
        assert Thread.holdsLock(mutex) : "Coordinator mutex not held";

        final Set<DiscoveryNode> liveNodes = StreamSupport.stream(clusterState.nodes().spliterator(), false)
            .filter(this::hasJoinVoteFrom).collect(Collectors.toSet());
        final ClusterState.VotingConfiguration newConfig = reconfigurator.reconfigure(
            liveNodes, emptySet(), clusterState.getLastAcceptedConfiguration());
        if (newConfig.equals(clusterState.getLastAcceptedConfiguration()) == false) {
            assert coordinationState.get().joinVotesHaveQuorumFor(newConfig);
            return ClusterState.builder(clusterState).lastAcceptedConfiguration(newConfig).build();
        }

        return clusterState;
    }

    private AtomicBoolean reconfigurationTaskScheduled = new AtomicBoolean();

    private void scheduleReconfigurationIfNeeded() {
        assert Thread.holdsLock(mutex) : "Coordinator mutex not held";
        assert mode == Mode.LEADER : mode;
        assert currentPublication.isPresent() == false : "Expected no publication in progress";

        final ClusterState state = getLastAcceptedState();
        if (improveConfiguration(state) != state && reconfigurationTaskScheduled.compareAndSet(false, true)) {
            logger.trace("scheduling reconfiguration");
            masterService.submitStateUpdateTask("reconfigure", new ClusterStateUpdateTask(Priority.URGENT) {
                @Override
                public ClusterState execute(ClusterState currentState) {
                    reconfigurationTaskScheduled.set(false);
                    synchronized (mutex) {
                        return improveConfiguration(currentState);
                    }
                }

                @Override
                public void onFailure(String source, Exception e) {
                    reconfigurationTaskScheduled.set(false);
                    logger.debug("reconfiguration failed", e);
                }
            });
        }
    }

    // for tests
    boolean hasJoinVoteFrom(DiscoveryNode localNode) {
        return coordinationState.get().containsJoinVoteFor(localNode);
@@ -599,12 +665,15 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
        ensureTermAtLeast(getLocalNode(), join.getTerm()).ifPresent(this::handleJoin);

        if (coordinationState.get().electionWon()) {
            // if we have already won the election then the actual join does not matter for election purposes,
            // so swallow any exception
            try {
                coordinationState.get().handleJoin(join);
            } catch (CoordinationStateRejectedException e) {
                logger.debug(new ParameterizedMessage("failed to add {} - ignoring", join), e);
            // If we have already won the election then the actual join does not matter for election purposes, so swallow any exception
            final boolean isNewJoin = handleJoinIgnoringExceptions(join);

            // If we haven't completely finished becoming master then there's already a publication scheduled which will, in turn,
            // schedule a reconfiguration if needed. It's benign to schedule a reconfiguration anyway, but it might fail if it wins the
            // race against the election-winning publication and log a big error message, which we can prevent by checking this here:
            final boolean establishedAsMaster = mode == Mode.LEADER && getLastAcceptedState().term() == getCurrentTerm();
            if (isNewJoin && establishedAsMaster && publicationInProgress() == false) {
                scheduleReconfigurationIfNeeded();
            }
        } else {
            coordinationState.get().handleJoin(join); // this might fail and bubble up the exception
@@ -612,6 +681,18 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
        }
    }

    /**
     * @return true iff the join was from a new node and was successfully added
     */
    private boolean handleJoinIgnoringExceptions(Join join) {
        try {
            return coordinationState.get().handleJoin(join);
        } catch (CoordinationStateRejectedException e) {
            logger.debug(new ParameterizedMessage("failed to add {} - ignoring", join), e);
            return false;
        }
    }

    public ClusterState getLastAcceptedState() {
        synchronized (mutex) {
            return coordinationState.get().getLastAcceptedState();
@@ -904,6 +985,10 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
                logger.debug("publication ended successfully: {}", CoordinatorPublication.this);
                // trigger term bump if new term was found during publication
                updateMaxTermSeen(getCurrentTerm());

                if (mode == Mode.LEADER) {
                    scheduleReconfigurationIfNeeded();
                }
            }
            ackListener.onNodeAck(getLocalNode(), null);
            publishListener.onResponse(null);
@@ -916,8 +1001,7 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
            assert Thread.holdsLock(mutex) : "Coordinator mutex not held";
            removePublicationAndPossiblyBecomeCandidate("Publication.onCompletion(false)");

            FailedToCommitClusterStateException exception = new FailedToCommitClusterStateException(
                "publication failed", e);
            final FailedToCommitClusterStateException exception = new FailedToCommitClusterStateException("publication failed", e);
            ackListener.onNodeAck(getLocalNode(), exception); // other nodes have acked, but not the master.
            publishListener.onFailure(exception);
        }
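A note on the safety assertion in improveConfiguration above: a new configuration is only adopted if the join votes already held form a quorum for it (joinVotesHaveQuorumFor). A majority quorum over a configuration reduces to a simple count. Here is a minimal sketch of such a check, assuming a plain set-based vote tally; this is not the source of ClusterState.VotingConfiguration#hasQuorum, which this diff does not show.

    import java.util.Set;

    // Minimal sketch of a majority-quorum test over a voting configuration.
    // Illustrative only; the real check lives in ClusterState.VotingConfiguration#hasQuorum.
    final class QuorumSketch {
        static boolean hasQuorum(Set<String> configuredNodeIds, Set<String> votingNodeIds) {
            // Only votes from nodes inside the configuration count towards its quorum.
            final long votesFromConfiguration =
                votingNodeIds.stream().filter(configuredNodeIds::contains).count();
            return votesFromConfiguration * 2 > configuredNodeIds.size(); // strict majority
        }
    }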
@@ -52,6 +52,7 @@ public class Reconfigurator extends AbstractComponent {
        Setting.intSetting("cluster.master_nodes_failure_tolerance", 0, 0, Property.NodeScope, Property.Dynamic);
    // the default is not supposed to be important since we expect to set this setting explicitly at bootstrapping time
    // TODO contemplate setting the default to something larger than 0 (1? 1<<30?)
    // TODO prevent this being set as a transient or a per-node setting?

    private volatile int masterNodesFailureTolerance;
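The tests further down drive this setting by writing it into the persistent cluster metadata, and reject values the current configuration cannot support: a tolerance of f requires a voting configuration of at least 2 * f + 1 nodes. A hedged example of that wiring, modelled on the submitSetMasterNodesFailureTolerance helper in the test changes below (the class and method names here are illustrative):

    import org.elasticsearch.cluster.coordination.Reconfigurator;
    import org.elasticsearch.common.settings.Settings;

    // Example only: persist a failure tolerance of f, which pins the minimum
    // voting configuration size at 2 * f + 1.
    class FailureToleranceSettingExample {
        static Settings persistentSettingsFor(int masterNodesFailureTolerance) {
            return Settings.builder()
                .put(Reconfigurator.CLUSTER_MASTER_NODES_FAILURE_TOLERANCE.getKey(), masterNodesFailureTolerance)
                .build();
        }
    }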
@@ -28,7 +28,9 @@ import org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.elasticsearch.cluster.ESAllocationTestCase;
import org.elasticsearch.cluster.coordination.ClusterStatePublisher.AckListener;
import org.elasticsearch.cluster.coordination.CoordinationState.PersistedState;
import org.elasticsearch.cluster.coordination.Coordinator.Mode;
import org.elasticsearch.cluster.coordination.CoordinatorTests.Cluster.ClusterNode;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.service.ClusterApplier;
import org.elasticsearch.common.Randomness;
@@ -83,6 +85,7 @@ import static org.elasticsearch.cluster.coordination.FollowersChecker.FOLLOWER_C
import static org.elasticsearch.cluster.coordination.LeaderChecker.LEADER_CHECK_INTERVAL_SETTING;
import static org.elasticsearch.cluster.coordination.LeaderChecker.LEADER_CHECK_RETRY_COUNT_SETTING;
import static org.elasticsearch.cluster.coordination.LeaderChecker.LEADER_CHECK_TIMEOUT_SETTING;
import static org.elasticsearch.cluster.coordination.Reconfigurator.CLUSTER_MASTER_NODES_FAILURE_TOLERANCE;
import static org.elasticsearch.discovery.PeerFinder.DISCOVERY_FIND_PEERS_INTERVAL_SETTING;
import static org.elasticsearch.node.Node.NODE_NAME_SETTING;
import static org.elasticsearch.transport.TransportService.HANDSHAKE_ACTION_NAME;
@@ -91,10 +94,12 @@ import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.endsWith;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.lessThanOrEqualTo;
import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.sameInstance;
import static org.hamcrest.Matchers.startsWith;

@TestLogging("org.elasticsearch.cluster.coordination:TRACE,org.elasticsearch.discovery:TRACE")
@@ -130,20 +135,258 @@ public class CoordinatorTests extends ESTestCase {
        cluster.stabilise();

        final long currentTerm = cluster.getAnyLeader().coordinator.getCurrentTerm();
        final int newNodesCount = randomIntBetween(1, 2);
        cluster.addNodes(newNodesCount);
        cluster.stabilise(
            // The first pinging discovers the master
            defaultMillis(DISCOVERY_FIND_PEERS_INTERVAL_SETTING)
                // One message delay to send a join
                + DEFAULT_DELAY_VARIABILITY
                // Commit a new cluster state with the new node(s). Might be split into multiple commits
                + newNodesCount * DEFAULT_CLUSTER_STATE_UPDATE_DELAY);
        cluster.addNodesAndStabilise(randomIntBetween(1, 2));

        final long newTerm = cluster.getAnyLeader().coordinator.getCurrentTerm();
        assertEquals(currentTerm, newTerm);
    }

    public void testExpandsConfigurationWhenGrowingFromOneToThreeNodesAndShrinksOnFailure() {
        final Cluster cluster = new Cluster(1);
        cluster.runRandomly();
        cluster.stabilise();

        final ClusterNode leader = cluster.getAnyLeader();

        assertThat(CLUSTER_MASTER_NODES_FAILURE_TOLERANCE.get(leader.getLastAppliedClusterState().metaData().settings()), equalTo(0));

        cluster.addNodesAndStabilise(2);

        {
            assertThat(leader.coordinator.getMode(), is(Mode.LEADER));
            final VotingConfiguration lastCommittedConfiguration = leader.getLastAppliedClusterState().getLastCommittedConfiguration();
            assertThat(lastCommittedConfiguration + " should be all nodes", lastCommittedConfiguration.getNodeIds(),
                equalTo(cluster.clusterNodes.stream().map(ClusterNode::getId).collect(Collectors.toSet())));
        }

        final ClusterNode disconnect1 = cluster.getAnyNode();
        logger.info("--> disconnecting {}", disconnect1);
        disconnect1.disconnect();
        cluster.stabilise();

        {
            final ClusterNode newLeader = cluster.getAnyLeader();
            final VotingConfiguration lastCommittedConfiguration = newLeader.getLastAppliedClusterState().getLastCommittedConfiguration();
            assertThat(lastCommittedConfiguration + " should be 1 node", lastCommittedConfiguration.getNodeIds().size(), equalTo(1));
            assertFalse(lastCommittedConfiguration.getNodeIds().contains(disconnect1.getId()));
        }
    }

    public void testExpandsConfigurationWhenGrowingFromThreeToFiveNodesAndShrinksOnFailure() {
        final Cluster cluster = new Cluster(3);
        cluster.runRandomly();
        cluster.stabilise();

        final ClusterNode leader = cluster.getAnyLeader();

        logger.info("setting fault tolerance to 1");
        leader.submitSetMasterNodesFailureTolerance(1);
        cluster.stabilise(DEFAULT_CLUSTER_STATE_UPDATE_DELAY);
        assertThat(CLUSTER_MASTER_NODES_FAILURE_TOLERANCE.get(leader.getLastAppliedClusterState().metaData().settings()), equalTo(1));

        cluster.addNodesAndStabilise(2);

        {
            assertThat(leader.coordinator.getMode(), is(Mode.LEADER));
            final VotingConfiguration lastCommittedConfiguration = leader.getLastAppliedClusterState().getLastCommittedConfiguration();
            assertThat(lastCommittedConfiguration + " should be all nodes", lastCommittedConfiguration.getNodeIds(),
                equalTo(cluster.clusterNodes.stream().map(ClusterNode::getId).collect(Collectors.toSet())));
        }

        final ClusterNode disconnect1 = cluster.getAnyNode();
        final ClusterNode disconnect2 = cluster.getAnyNodeExcept(disconnect1);
        logger.info("--> disconnecting {} and {}", disconnect1, disconnect2);
        disconnect1.disconnect();
        disconnect2.disconnect();
        cluster.stabilise();

        {
            final ClusterNode newLeader = cluster.getAnyLeader();
            final VotingConfiguration lastCommittedConfiguration = newLeader.getLastAppliedClusterState().getLastCommittedConfiguration();
            assertThat(lastCommittedConfiguration + " should be 3 nodes", lastCommittedConfiguration.getNodeIds().size(), equalTo(3));
            assertFalse(lastCommittedConfiguration.getNodeIds().contains(disconnect1.getId()));
            assertFalse(lastCommittedConfiguration.getNodeIds().contains(disconnect2.getId()));
        }

        // we still tolerate the loss of one more node here

        final ClusterNode disconnect3 = cluster.getAnyNodeExcept(disconnect1, disconnect2);
        logger.info("--> disconnecting {}", disconnect3);
        disconnect3.disconnect();
        cluster.stabilise();

        {
            final ClusterNode newLeader = cluster.getAnyLeader();
            final VotingConfiguration lastCommittedConfiguration = newLeader.getLastAppliedClusterState().getLastCommittedConfiguration();
            assertThat(lastCommittedConfiguration + " should be 3 nodes", lastCommittedConfiguration.getNodeIds().size(), equalTo(3));
            assertFalse(lastCommittedConfiguration.getNodeIds().contains(disconnect1.getId()));
            assertFalse(lastCommittedConfiguration.getNodeIds().contains(disconnect2.getId()));
            assertTrue(lastCommittedConfiguration.getNodeIds().contains(disconnect3.getId()));
        }

        // however we do not tolerate the loss of yet another one

        final ClusterNode disconnect4 = cluster.getAnyNodeExcept(disconnect1, disconnect2, disconnect3);
        logger.info("--> disconnecting {}", disconnect4);
        disconnect4.disconnect();
        cluster.runFor(DEFAULT_STABILISATION_TIME, "allowing time for fault detection");

        for (final ClusterNode clusterNode : cluster.clusterNodes) {
            assertThat(clusterNode.getId() + " should be a candidate", clusterNode.coordinator.getMode(), equalTo(Mode.CANDIDATE));
        }

        // moreover we are still stuck even if two other nodes heal
        logger.info("--> healing {} and {}", disconnect1, disconnect2);
        disconnect1.heal();
        disconnect2.heal();
        cluster.runFor(DEFAULT_STABILISATION_TIME, "allowing time for fault detection");

        for (final ClusterNode clusterNode : cluster.clusterNodes) {
            assertThat(clusterNode.getId() + " should be a candidate", clusterNode.coordinator.getMode(), equalTo(Mode.CANDIDATE));
        }

        // we require another node to heal to recover
        final ClusterNode toHeal = randomBoolean() ? disconnect3 : disconnect4;
        logger.info("--> healing {}", toHeal);
        toHeal.heal();
        cluster.stabilise();
    }

    public void testCanShrinkFromFiveNodesToThree() {
        final Cluster cluster = new Cluster(5);
        cluster.runRandomly();
        cluster.stabilise();

        {
            final ClusterNode leader = cluster.getAnyLeader();
            logger.info("setting fault tolerance to 2");
            leader.submitSetMasterNodesFailureTolerance(2);
            cluster.stabilise(DEFAULT_CLUSTER_STATE_UPDATE_DELAY);
            assertThat(CLUSTER_MASTER_NODES_FAILURE_TOLERANCE.get(leader.getLastAppliedClusterState().metaData().settings()), equalTo(2));
        }

        final ClusterNode disconnect1 = cluster.getAnyNode();
        final ClusterNode disconnect2 = cluster.getAnyNodeExcept(disconnect1);

        logger.info("--> disconnecting {} and {}", disconnect1, disconnect2);
        disconnect1.disconnect();
        disconnect2.disconnect();
        cluster.stabilise();

        final ClusterNode leader = cluster.getAnyLeader();

        {
            final VotingConfiguration lastCommittedConfiguration = leader.getLastAppliedClusterState().getLastCommittedConfiguration();
            assertThat(lastCommittedConfiguration + " should be all nodes", lastCommittedConfiguration.getNodeIds(),
                equalTo(cluster.clusterNodes.stream().map(ClusterNode::getId).collect(Collectors.toSet())));
        }

        leader.submitSetMasterNodesFailureTolerance(1);
        cluster.stabilise(DEFAULT_CLUSTER_STATE_UPDATE_DELAY * 2); // allow for a reconfiguration
        assertThat(CLUSTER_MASTER_NODES_FAILURE_TOLERANCE.get(leader.getLastAppliedClusterState().metaData().settings()), equalTo(1));

        {
            final VotingConfiguration lastCommittedConfiguration = leader.getLastAppliedClusterState().getLastCommittedConfiguration();
            assertThat(lastCommittedConfiguration + " should be 3 nodes", lastCommittedConfiguration.getNodeIds().size(), equalTo(3));
            assertFalse(lastCommittedConfiguration.getNodeIds().contains(disconnect1.getId()));
            assertFalse(lastCommittedConfiguration.getNodeIds().contains(disconnect2.getId()));
        }
    }

    public void testCanShrinkFromThreeNodesToTwo() {
        final Cluster cluster = new Cluster(3);
        cluster.runRandomly();
        cluster.stabilise();

        {
            final ClusterNode leader = cluster.getAnyLeader();
            logger.info("setting fault tolerance to 1");
            leader.submitSetMasterNodesFailureTolerance(1);
            cluster.stabilise(DEFAULT_CLUSTER_STATE_UPDATE_DELAY);
            assertThat(CLUSTER_MASTER_NODES_FAILURE_TOLERANCE.get(leader.getLastAppliedClusterState().metaData().settings()), equalTo(1));
        }

        final ClusterNode disconnect1 = cluster.getAnyNode();

        logger.info("--> disconnecting {}", disconnect1);
        disconnect1.disconnect();
        cluster.stabilise();

        final ClusterNode leader = cluster.getAnyLeader();

        {
            final VotingConfiguration lastCommittedConfiguration = leader.getLastAppliedClusterState().getLastCommittedConfiguration();
            assertThat(lastCommittedConfiguration + " should be all nodes", lastCommittedConfiguration.getNodeIds(),
                equalTo(cluster.clusterNodes.stream().map(ClusterNode::getId).collect(Collectors.toSet())));
        }

        leader.submitSetMasterNodesFailureTolerance(0);
        cluster.stabilise(DEFAULT_CLUSTER_STATE_UPDATE_DELAY * 2); // allow for a reconfiguration
        assertThat(CLUSTER_MASTER_NODES_FAILURE_TOLERANCE.get(leader.getLastAppliedClusterState().metaData().settings()), equalTo(0));

        {
            final VotingConfiguration lastCommittedConfiguration = leader.getLastAppliedClusterState().getLastCommittedConfiguration();
            assertThat(lastCommittedConfiguration + " should be 1 node", lastCommittedConfiguration.getNodeIds().size(), equalTo(1));
            assertFalse(lastCommittedConfiguration.getNodeIds().contains(disconnect1.getId()));
        }
    }

    public void testDoesNotShrinkConfigurationDueToLossToleranceConfigurationWithThreeNodes() {
        final Cluster cluster = new Cluster(3);
        cluster.runRandomly();
        cluster.stabilise();

        cluster.getAnyLeader().submitSetMasterNodesFailureTolerance(1);
        cluster.stabilise(DEFAULT_ELECTION_DELAY);

        final ClusterNode disconnect1 = cluster.getAnyNode();

        logger.info("--> disconnecting {}", disconnect1);
        disconnect1.disconnect();
        cluster.stabilise();

        final ClusterNode disconnect2 = cluster.getAnyNodeExcept(disconnect1);
        logger.info("--> disconnecting {}", disconnect2);
        disconnect2.disconnect();
        cluster.runFor(DEFAULT_STABILISATION_TIME, "allowing time for fault detection");

        for (final ClusterNode clusterNode : cluster.clusterNodes) {
            assertThat(clusterNode.getId() + " should be a candidate", clusterNode.coordinator.getMode(), equalTo(Mode.CANDIDATE));
        }

        disconnect1.heal();
        cluster.stabilise(); // would not work if disconnect1 were removed from the configuration
    }

    public void testDoesNotShrinkConfigurationDueToLossToleranceConfigurationWithFiveNodes() {
        final Cluster cluster = new Cluster(5);
        cluster.runRandomly();
        cluster.stabilise();

        cluster.getAnyLeader().submitSetMasterNodesFailureTolerance(2);
        cluster.stabilise(DEFAULT_ELECTION_DELAY);

        final ClusterNode disconnect1 = cluster.getAnyNode();
        final ClusterNode disconnect2 = cluster.getAnyNodeExcept(disconnect1);

        logger.info("--> disconnecting {} and {}", disconnect1, disconnect2);
        disconnect1.disconnect();
        disconnect2.disconnect();
        cluster.stabilise();

        final ClusterNode disconnect3 = cluster.getAnyNodeExcept(disconnect1, disconnect2);
        logger.info("--> disconnecting {}", disconnect3);
        disconnect3.disconnect();
        cluster.runFor(DEFAULT_STABILISATION_TIME, "allowing time for fault detection");

        for (final ClusterNode clusterNode : cluster.clusterNodes) {
            assertThat(clusterNode.getId() + " should be a candidate", clusterNode.coordinator.getMode(), equalTo(Mode.CANDIDATE));
        }

        disconnect1.heal();
        cluster.stabilise(); // would not work if disconnect1 were removed from the configuration
    }

    public void testLeaderDisconnectionDetectedQuickly() {
        final Cluster cluster = new Cluster(randomIntBetween(3, 5));
        cluster.runRandomly();
@@ -162,9 +405,7 @@ public class CoordinatorTests extends ESTestCase {
            // then wait for the exception response
            + DEFAULT_DELAY_VARIABILITY
            // then wait for a new election
            + DEFAULT_ELECTION_DELAY
            // then wait for the old leader's removal to be committed
            + DEFAULT_CLUSTER_STATE_UPDATE_DELAY,
            + DEFAULT_ELECTION_DELAY,

            // ALSO the leader may have just sent a follower check, which receives no response
            // TODO unnecessary if notified of disconnection
@@ -172,10 +413,14 @@ public class CoordinatorTests extends ESTestCase {
            // wait for the leader to check its followers
            + defaultMillis(FOLLOWER_CHECK_INTERVAL_SETTING)
            // then wait for the exception response
            + DEFAULT_DELAY_VARIABILITY
            // then wait for the removal to be committed
            + DEFAULT_CLUSTER_STATE_UPDATE_DELAY
            ));
            + DEFAULT_DELAY_VARIABILITY)

            // FINALLY:

            // wait for the removal to be committed
            + DEFAULT_CLUSTER_STATE_UPDATE_DELAY
            // then wait for the followup reconfiguration
            + DEFAULT_CLUSTER_STATE_UPDATE_DELAY);

        assertThat(cluster.getAnyLeader().getId(), not(equalTo(originalLeader.getId())));
    }
@@ -210,6 +455,8 @@ public class CoordinatorTests extends ESTestCase {
                * defaultInt(FOLLOWER_CHECK_RETRY_COUNT_SETTING))

            // then wait for the new leader to commit a state without the old leader
            + DEFAULT_CLUSTER_STATE_UPDATE_DELAY
            // then wait for the followup reconfiguration
            + DEFAULT_CLUSTER_STATE_UPDATE_DELAY,

            // ALSO wait for the leader to notice that its followers are unresponsive
@@ -241,6 +488,8 @@ public class CoordinatorTests extends ESTestCase {
            // then wait for the exception response
            + DEFAULT_DELAY_VARIABILITY
            // then wait for the removal to be committed
            + DEFAULT_CLUSTER_STATE_UPDATE_DELAY
            // then wait for the followup reconfiguration
            + DEFAULT_CLUSTER_STATE_UPDATE_DELAY,

            // ALSO the follower may have just sent a leader check, which receives no response
@@ -269,6 +518,8 @@ public class CoordinatorTests extends ESTestCase {
            (defaultMillis(FOLLOWER_CHECK_INTERVAL_SETTING) + defaultMillis(FOLLOWER_CHECK_TIMEOUT_SETTING))
                * defaultInt(FOLLOWER_CHECK_RETRY_COUNT_SETTING)
            // then wait for the leader to commit a state without the follower
            + DEFAULT_CLUSTER_STATE_UPDATE_DELAY
            // then wait for the followup reconfiguration
            + DEFAULT_CLUSTER_STATE_UPDATE_DELAY,

            // ALSO wait for the follower to notice the leader is unresponsive
@@ -338,9 +589,13 @@ public class CoordinatorTests extends ESTestCase {
        final ClusterNode follower0 = cluster.getAnyNodeExcept(leader);
        final ClusterNode follower1 = cluster.getAnyNodeExcept(leader, follower0);

        logger.info("--> blocking cluster state application on {}", follower0);
        follower0.setClusterStateApplyResponse(ClusterStateApplyResponse.HANG);

        logger.info("--> publishing another value");
        AckCollector ackCollector = leader.submitValue(randomLong());
        cluster.runFor(DEFAULT_CLUSTER_STATE_UPDATE_DELAY, "committing value");

        assertTrue("expected immediate ack from " + follower1, ackCollector.hasAckedSuccessfully(follower1));
        assertFalse("expected no ack from " + leader, ackCollector.hasAckedSuccessfully(leader));
        cluster.stabilise();
@@ -426,14 +681,18 @@ public class CoordinatorTests extends ESTestCase {
        }

        cluster.getAnyNode().applyInitialConfiguration();
        cluster.stabilise(defaultMillis(
        cluster.stabilise(
            // the first election should succeed, because only one node knows of the initial configuration and therefore can win a
            // pre-voting round and proceed to an election, so there cannot be any collisions
            ELECTION_INITIAL_TIMEOUT_SETTING) // TODO this wait is unnecessary, we could trigger the election immediately
            // Allow two round-trip for pre-voting and voting
            + 4 * DEFAULT_DELAY_VARIABILITY
            // Then a commit of the new leader's first cluster state
            + DEFAULT_CLUSTER_STATE_UPDATE_DELAY);
            defaultMillis(ELECTION_INITIAL_TIMEOUT_SETTING) // TODO this wait is unnecessary, we could trigger the election immediately
            // Allow two round-trip for pre-voting and voting
            + 4 * DEFAULT_DELAY_VARIABILITY
            // Then a commit of the new leader's first cluster state
            + DEFAULT_CLUSTER_STATE_UPDATE_DELAY
            // Then allow time for all the other nodes to join, each of which might cause a reconfiguration
            + (cluster.size() - 1) * 2 * DEFAULT_CLUSTER_STATE_UPDATE_DELAY
            // TODO Investigate whether 4 publications is sufficient due to batching? A bound linear in the number of nodes isn't great.
        );
    }

    public void testCannotSetInitialConfigurationTwice() {
@@ -533,13 +792,18 @@ public class CoordinatorTests extends ESTestCase {
        Cluster(int initialNodeCount) {
            deterministicTaskQueue.setExecutionDelayVariabilityMillis(DEFAULT_DELAY_VARIABILITY);

            logger.info("--> creating cluster of {} nodes", initialNodeCount);
            assertThat(initialNodeCount, greaterThan(0));

            Set<String> initialNodeIds = new HashSet<>(initialNodeCount);
            for (int i = 0; i < initialNodeCount; i++) {
                initialNodeIds.add(nodeIdFromIndex(i));
            final Set<String> initialConfigurationNodeIds = new HashSet<>(initialNodeCount);
            while (initialConfigurationNodeIds.isEmpty()) {
                for (int i = 0; i < initialNodeCount; i++) {
                    if (randomBoolean()) {
                        initialConfigurationNodeIds.add(nodeIdFromIndex(i));
                    }
                }
            }
            initialConfiguration = new VotingConfiguration(initialNodeIds);
            initialConfiguration = new VotingConfiguration(initialConfigurationNodeIds);
            logger.info("--> creating cluster of {} nodes with initial configuration {}", initialNodeCount, initialConfiguration);

            clusterNodes = new ArrayList<>(initialNodeCount);
            for (int i = 0; i < initialNodeCount; i++) {
@@ -548,6 +812,19 @@ public class CoordinatorTests extends ESTestCase {
            }
        }

        void addNodesAndStabilise(int newNodesCount) {
            addNodes(newNodesCount);
            stabilise(
                // The first pinging discovers the master
                defaultMillis(DISCOVERY_FIND_PEERS_INTERVAL_SETTING)
                    // One message delay to send a join
                    + DEFAULT_DELAY_VARIABILITY
                    // Commit a new cluster state with the new node(s). Might be split into multiple commits, and each might need a
                    // followup reconfiguration
                    + newNodesCount * 2 * DEFAULT_CLUSTER_STATE_UPDATE_DELAY);
            // TODO Investigate whether 4 publications is sufficient due to batching? A bound linear in the number of nodes isn't great.
        }

        void addNodes(int newNodesCount) {
            logger.info("--> adding {} nodes", newNodesCount);
@@ -558,6 +835,10 @@ public class CoordinatorTests extends ESTestCase {
            }
        }

        int size() {
            return clusterNodes.size();
        }

        void runRandomly() {

            // TODO supporting (preserving?) existing disruptions needs implementing if needed, for now we just forbid it
@@ -579,7 +860,7 @@ public class CoordinatorTests extends ESTestCase {
                if (randomSteps <= step && finishTime == -1) {
                    finishTime = deterministicTaskQueue.getLatestDeferredExecutionTime();
                    deterministicTaskQueue.setExecutionDelayVariabilityMillis(DEFAULT_DELAY_VARIABILITY);
                    logger.debug("----> [runRandomly {}] reducing delay variability and running until [{}]", step, finishTime);
                    logger.debug("----> [runRandomly {}] reducing delay variability and running until [{}ms]", step, finishTime);
                }

                try {
@@ -591,6 +872,15 @@ public class CoordinatorTests extends ESTestCase {
                            thisStep, newValue, clusterNode.getId());
                        clusterNode.submitValue(newValue);
                    }).run();
                } else if (rarely()) {
                    final ClusterNode clusterNode = getAnyNodePreferringLeaders();
                    final int masterNodeFailureTolerance = randomIntBetween(0, 2);
                    onNode(clusterNode.getLocalNode(),
                        () -> {
                            logger.debug("----> [runRandomly {}] setting master-node fault tolerance to {} on {}",
                                thisStep, masterNodeFailureTolerance, clusterNode.getId());
                            clusterNode.submitSetMasterNodesFailureTolerance(masterNodeFailureTolerance);
                        }).run();
                } else if (rarely()) {
                    final ClusterNode clusterNode = getAnyNode();
                    onNode(clusterNode.getLocalNode(), () -> {
@@ -692,16 +982,67 @@ public class CoordinatorTests extends ESTestCase {

            runFor(stabilisationDurationMillis, "stabilising");
            fixLag();
            assertUniqueLeaderAndExpectedModes();

            final ClusterNode leader = getAnyLeader();
            final long leaderTerm = leader.coordinator.getCurrentTerm();
            final Matcher<Long> isEqualToLeaderVersion = equalTo(leader.coordinator.getLastAcceptedState().getVersion());
            final String leaderId = leader.getId();

            assertTrue(leaderId + " exists in its last-applied state", leader.getLastAppliedClusterState().getNodes().nodeExists(leaderId));
            assertThat(leaderId + " has applied its state ", leader.getLastAppliedClusterState().getVersion(), isEqualToLeaderVersion);

            for (final ClusterNode clusterNode : clusterNodes) {
                final String nodeId = clusterNode.getId();
                assertFalse(nodeId + " should not have an active publication", clusterNode.coordinator.publicationInProgress());

                if (clusterNode == leader) {
                    continue;
                }

                if (isConnectedPair(leader, clusterNode)) {
                    assertThat(nodeId + " is a follower of " + leaderId, clusterNode.coordinator.getMode(), is(FOLLOWER));
                    assertThat(nodeId + " has the same term as " + leaderId, clusterNode.coordinator.getCurrentTerm(), is(leaderTerm));
                    assertTrue(nodeId + " has voted for " + leaderId, leader.coordinator.hasJoinVoteFrom(clusterNode.getLocalNode()));
                    assertThat(nodeId + " has the same accepted state as " + leaderId,
                        clusterNode.coordinator.getLastAcceptedState().getVersion(), isEqualToLeaderVersion);
                    if (clusterNode.getClusterStateApplyResponse() == ClusterStateApplyResponse.SUCCEED) {
                        assertThat(nodeId + " has the same applied state as " + leaderId,
                            clusterNode.getLastAppliedClusterState().getVersion(), isEqualToLeaderVersion);
                        assertTrue(nodeId + " is in its own latest applied state",
                            clusterNode.getLastAppliedClusterState().getNodes().nodeExists(nodeId));
                    }
                    assertTrue(nodeId + " is in the latest applied state on " + leaderId,
                        leader.getLastAppliedClusterState().getNodes().nodeExists(nodeId));
                } else {
                    assertThat(nodeId + " is not following " + leaderId, clusterNode.coordinator.getMode(), is(CANDIDATE));
                    assertFalse(nodeId + " is not in the applied state on " + leaderId,
                        leader.getLastAppliedClusterState().getNodes().nodeExists(nodeId));
                }
            }

            final Set<String> connectedNodeIds
                = clusterNodes.stream().filter(n -> isConnectedPair(leader, n)).map(ClusterNode::getId).collect(Collectors.toSet());

            assertThat(leader.getLastAppliedClusterState().getNodes().getSize(), equalTo(connectedNodeIds.size()));

            final ClusterState lastAcceptedState = leader.coordinator.getLastAcceptedState();
            final VotingConfiguration lastCommittedConfiguration = lastAcceptedState.getLastCommittedConfiguration();
            assertTrue(connectedNodeIds + " should be a quorum of " + lastCommittedConfiguration,
                lastCommittedConfiguration.hasQuorum(connectedNodeIds));

            assertThat("no reconfiguration is in progress",
                lastAcceptedState.getLastCommittedConfiguration(), equalTo(lastAcceptedState.getLastAcceptedConfiguration()));
            assertThat("current configuration is already optimal",
                leader.improveConfiguration(lastAcceptedState), sameInstance(lastAcceptedState));
        }

        // TODO remove this when lag detection is implemented
        void fixLag() {
            final ClusterNode leader = getAnyLeader();
            final long leaderVersion = leader.coordinator.getApplierState().version();
            final long leaderVersion = leader.getLastAppliedClusterState().version();
            final long minVersion = clusterNodes.stream()
                .filter(n -> isConnectedPair(n, leader))
                .map(n -> n.coordinator.getApplierState().version()).min(Long::compare).orElse(Long.MIN_VALUE);
                .map(n -> n.getLastAppliedClusterState().version()).min(Long::compare).orElse(Long.MIN_VALUE);
            assert minVersion >= 0;
            if (minVersion < leaderVersion) {
                logger.info("--> fixLag publishing a value to fix lag, leaderVersion={}, minVersion={}", leaderVersion, minVersion);
@@ -710,10 +1051,15 @@ public class CoordinatorTests extends ESTestCase {
                        leader.submitValue(randomLong());
                    }
                }).run();

                runFor(DEFAULT_CLUSTER_STATE_UPDATE_DELAY
                    // may need to bump terms too
                    + DEFAULT_ELECTION_DELAY,
                    // may need to bump terms too
                    + DEFAULT_ELECTION_DELAY,
                    "re-stabilising after lag-fixing publication");

                if (clusterNodes.stream().anyMatch(n -> n.getClusterStateApplyResponse().equals(ClusterStateApplyResponse.HANG))) {
                    runFor(defaultMillis(PUBLISH_TIMEOUT_SETTING), "allowing lag-fixing publication to time out");
                }
            } else {
                logger.info("--> fixLag found no lag, leader={}, leaderVersion={}, minVersion={}", leader, leaderVersion, minVersion);
            }
@@ -755,42 +1101,6 @@ public class CoordinatorTests extends ESTestCase {
                && getConnectionStatus(n2.getLocalNode(), n1.getLocalNode()) == ConnectionStatus.CONNECTED);
        }

        private void assertUniqueLeaderAndExpectedModes() {
            final ClusterNode leader = getAnyLeader();
            final long leaderTerm = leader.coordinator.getCurrentTerm();
            Matcher<Long> isPresentAndEqualToLeaderVersion
                = equalTo(leader.coordinator.getLastAcceptedState().getVersion());

            assertTrue(leader.getLastAppliedClusterState().getNodes().nodeExists(leader.getId()));
            assertThat(leader.getLastAppliedClusterState().getVersion(), isPresentAndEqualToLeaderVersion);

            for (final ClusterNode clusterNode : clusterNodes) {
                final String nodeId = clusterNode.getId();
                assertFalse(nodeId + " should not have an active publication", clusterNode.coordinator.publicationInProgress());

                if (clusterNode == leader) {
                    continue;
                }

                if (isConnectedPair(leader, clusterNode)) {
                    assertThat(nodeId + " is a follower", clusterNode.coordinator.getMode(), is(FOLLOWER));
                    assertThat(nodeId + " has the same term as the leader", clusterNode.coordinator.getCurrentTerm(), is(leaderTerm));
                    assertTrue(nodeId + " has voted for the leader", leader.coordinator.hasJoinVoteFrom(clusterNode.getLocalNode()));
                    // TODO assert that this node's accepted and committed states are the same as the leader's

                    assertTrue(nodeId + " is in the leader's applied state",
                        leader.getLastAppliedClusterState().getNodes().nodeExists(nodeId));
                } else {
                    assertThat(nodeId + " is a candidate", clusterNode.coordinator.getMode(), is(CANDIDATE));
                    assertFalse(nodeId + " is not in the leader's applied state",
                        leader.getLastAppliedClusterState().getNodes().nodeExists(nodeId));
                }
            }

            int connectedNodeCount = Math.toIntExact(clusterNodes.stream().filter(n -> isConnectedPair(leader, n)).count());
            assertThat(leader.getLastAppliedClusterState().getNodes().getSize(), equalTo(connectedNodeCount));
        }

        ClusterNode getAnyLeader() {
            List<ClusterNode> allLeaders = clusterNodes.stream().filter(ClusterNode::isLeader).collect(Collectors.toList());
            assertThat("leaders", allLeaders, not(empty()));
@@ -839,6 +1149,7 @@ public class CoordinatorTests extends ESTestCase {
            private void possiblyFail(String description) {
                if (disruptStorage && rarely()) {
                    // TODO revisit this when we've decided how PersistedState should throw exceptions
                    logger.trace("simulating IO exception [{}]", description);
                    if (randomBoolean()) {
                        throw new UncheckedIOException(new IOException("simulated IO exception [" + description + ']'));
                    } else {
@@ -938,8 +1249,9 @@ public class CoordinatorTests extends ESTestCase {
                transportService = mockTransport.createTransportService(
                    settings, deterministicTaskQueue.getThreadPool(runnable -> onNode(localNode, runnable)), NOOP_TRANSPORT_INTERCEPTOR,
                    a -> localNode, null, emptySet());
                coordinator = new Coordinator(settings, transportService, ESAllocationTestCase.createAllocationService(Settings.EMPTY),
                    masterService, this::getPersistedState, Cluster.this::provideUnicastHosts, clusterApplier, Randomness.get());
                coordinator = new Coordinator(settings, clusterSettings, transportService,
                    ESAllocationTestCase.createAllocationService(Settings.EMPTY), masterService, this::getPersistedState,
                    Cluster.this::provideUnicastHosts, clusterApplier, Randomness.get());
                masterService.setClusterStatePublisher(coordinator);

                transportService.start();
@@ -965,10 +1277,34 @@ public class CoordinatorTests extends ESTestCase {
                return coordinator.getMode() == LEADER;
            }

            ClusterState improveConfiguration(ClusterState currentState) {
                synchronized (coordinator.mutex) {
                    return coordinator.improveConfiguration(currentState);
                }
            }

            void setClusterStateApplyResponse(ClusterStateApplyResponse clusterStateApplyResponse) {
                this.clusterStateApplyResponse = clusterStateApplyResponse;
            }

            ClusterStateApplyResponse getClusterStateApplyResponse() {
                return clusterStateApplyResponse;
            }

            void submitSetMasterNodesFailureTolerance(final int masterNodesFaultTolerance) {
                submitUpdateTask("set master nodes failure tolerance [" + masterNodesFaultTolerance + "]", cs ->
                    cs.getLastAcceptedConfiguration().getNodeIds().size() < 2 * masterNodesFaultTolerance + 1 ? cs :
                        // TODO this rejects invalid updates, but in fact this should be validated elsewhere. Where?
                        ClusterState.builder(cs).metaData(
                            MetaData.builder(cs.metaData())
                                .persistentSettings(Settings.builder()
                                    .put(cs.metaData().persistentSettings())
                                    .put(CLUSTER_MASTER_NODES_FAILURE_TOLERANCE.getKey(), masterNodesFaultTolerance)
                                    .build())
                                .build())
                            .build());
            }

            AckCollector submitValue(final long value) {
                return submitUpdateTask("new value [" + value + "]", cs -> setValue(cs, value));
            }
@@ -156,11 +156,12 @@ public class NodeJoinTests extends ESTestCase {
            }
        }
    };
    final ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
    TransportService transportService = capturingTransport.createTransportService(Settings.EMPTY, threadPool,
        TransportService.NOOP_TRANSPORT_INTERCEPTOR,
        x -> initialState.nodes().getLocalNode(),
        new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), Collections.emptySet());
    coordinator = new Coordinator(Settings.EMPTY,
        clusterSettings, Collections.emptySet());
    coordinator = new Coordinator(Settings.EMPTY, clusterSettings,
        transportService,
        ESAllocationTestCase.createAllocationService(Settings.EMPTY),
        masterService,
@@ -67,6 +67,7 @@ import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.RestoreInProgress;
import org.elasticsearch.cluster.SnapshotDeletionsInProgress;
import org.elasticsearch.cluster.SnapshotsInProgress;
import org.elasticsearch.cluster.coordination.Reconfigurator;
import org.elasticsearch.cluster.health.ClusterHealthStatus;
import org.elasticsearch.cluster.metadata.IndexGraveyard;
import org.elasticsearch.cluster.metadata.IndexMetaData;
@@ -551,15 +552,17 @@ public abstract class ESIntegTestCase extends ESTestCase {
        if (cluster() != null) {
            if (currentClusterScope != Scope.TEST) {
                MetaData metaData = client().admin().cluster().prepareState().execute().actionGet().getState().getMetaData();
                final Set<String> persistent = metaData.persistentSettings().keySet();
                assertThat("test leaves persistent cluster metadata behind: " + persistent, persistent.size(), equalTo(0));
                final Set<String> transientSettings = new HashSet<>(metaData.transientSettings().keySet());

                final Set<String> persistentKeys = new HashSet<>(metaData.persistentSettings().keySet());
                persistentKeys.remove(Reconfigurator.CLUSTER_MASTER_NODES_FAILURE_TOLERANCE.getKey());
                assertThat("test leaves persistent cluster metadata behind", persistentKeys, empty());

                final Set<String> transientKeys = new HashSet<>(metaData.transientSettings().keySet());
                if (isInternalCluster() && internalCluster().getAutoManageMinMasterNode()) {
                    // this is set by the test infra
                    transientSettings.remove(ElectMasterService.DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING.getKey());
                    transientKeys.remove(ElectMasterService.DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING.getKey());
                }
                assertThat("test leaves transient cluster metadata behind: " + transientSettings,
                    transientSettings, empty());
                assertThat("test leaves transient cluster metadata behind", transientKeys, empty());
            }
            ensureClusterSizeConsistency();
            ensureClusterStateConsistency();
@@ -25,6 +25,7 @@ import org.elasticsearch.action.admin.indices.get.GetIndexResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.Requests;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.coordination.Reconfigurator;
import org.elasticsearch.cluster.health.ClusterHealthStatus;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
@@ -123,8 +124,11 @@ public abstract class ESSingleNodeTestCase extends ESTestCase {
        super.tearDown();
        assertAcked(client().admin().indices().prepareDelete("*").get());
        MetaData metaData = client().admin().cluster().prepareState().get().getState().getMetaData();
        assertThat("test leaves persistent cluster metadata behind: " + metaData.persistentSettings().keySet(),
            metaData.persistentSettings().size(), equalTo(0));
        Settings.Builder unexpectedPersistentSettingsBuilder = Settings.builder().put(metaData.persistentSettings());
        unexpectedPersistentSettingsBuilder.remove(Reconfigurator.CLUSTER_MASTER_NODES_FAILURE_TOLERANCE.getKey());
        Settings unexpectedPersistentSettings = unexpectedPersistentSettingsBuilder.build();
        assertThat("test leaves persistent cluster metadata behind: " + unexpectedPersistentSettings.keySet(),
            unexpectedPersistentSettings.size(), equalTo(0));
        assertThat("test leaves transient cluster metadata behind: " + metaData.transientSettings().keySet(),
            metaData.transientSettings().size(), equalTo(0));
        if (resetNodeAfterTest()) {
@@ -86,8 +86,8 @@ public class TestZenDiscovery extends ZenDiscovery {
                () -> new InMemoryPersistedState(0L, ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING.get(settings))
                    .nodes(DiscoveryNodes.builder().add(transportService.getLocalNode())
                        .localNodeId(transportService.getLocalNode().getId()).build()).build());
            return new Coordinator(fixedSettings, transportService, allocationService, masterService, persistedStateSupplier,
                hostsProvider, clusterApplier, new Random(Randomness.get().nextLong()));
            return new Coordinator(fixedSettings, clusterSettings, transportService, allocationService, masterService,
                persistedStateSupplier, hostsProvider, clusterApplier, new Random(Randomness.get().nextLong()));
        } else {
            return new TestZenDiscovery(fixedSettings, threadPool, transportService, namedWriteableRegistry, masterService,
                clusterApplier, clusterSettings, hostsProvider, allocationService);