[Zen2] Elect freshest master in upgrade (#37122)

Today when electing a master in Zen2 we use the cluster state version to
determine whether a node has a fresh-enough cluster state to become master.
However the cluster state version is not a reliable measure of freshness in the
Zen1 world; furthermore in 6.x the cluster state version is not persisted. This
means that when upgrading from 6.x via a full cluster restart a cluster state
update may be lost if a stale master wins the initial election.

This change fixes this by using the metadata version as a measure of freshness
when in term 0, since this is persisted in 6.x and does more reliably indicate
the freshness of nodes.

It also makes changes parallel to elastic/elasticsearch-formal-models#40 to
support situations in which nodes accept cluster state versions in term 0: this
does not happen in a pure Zen2 cluster, but can happen in mixed clusters and
during upgrades.
This commit is contained in:
David Turner 2019-01-04 09:09:16 +00:00 committed by GitHub
parent 586453fef1
commit 3f7d6a989a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 151 additions and 105 deletions

View File

@ -69,6 +69,8 @@ import java.util.Map;
import java.util.Optional;
import java.util.Set;
import static org.elasticsearch.cluster.coordination.Coordinator.ZEN1_BWC_TERM;
/**
* Represents the current state of the cluster.
* <p>
@ -210,6 +212,12 @@ public class ClusterState implements ToXContentFragment, Diffable<ClusterState>
return version();
}
/**
 * Returns the number that should be used to judge how fresh this cluster state is.
 * <p>
 * When the term is {@code ZEN1_BWC_TERM} (i.e. when following a Zen1 master) the cluster state version is not guaranteed to
 * increase, so the metadata version is the preferable measure. When following a Zen2 master the cluster state version is used.
 *
 * @return the metadata version if this state's term equals {@code ZEN1_BWC_TERM}, otherwise the cluster state version
 */
public long getVersionOrMetaDataVersion() {
    if (term() == ZEN1_BWC_TERM) {
        return metaData().version();
    }
    return version();
}
/**
* This stateUUID is automatically generated for each version of cluster state. It is used to make sure that
* we are applying diffs to the right previous state.

View File

@ -86,6 +86,10 @@ public class CoordinationState {
return getLastAcceptedState().version();
}
/**
 * Returns the freshness measure of the last-accepted cluster state, as reported by
 * {@link ClusterState#getVersionOrMetaDataVersion()}.
 */
private long getLastAcceptedVersionOrMetaDataVersion() {
    final ClusterState lastAcceptedState = getLastAcceptedState();
    return lastAcceptedState.getVersionOrMetaDataVersion();
}
public VotingConfiguration getLastCommittedConfiguration() {
return getLastAcceptedState().getLastCommittedConfiguration();
}
@ -126,27 +130,29 @@ public class CoordinationState {
/**
* Used to bootstrap a cluster by injecting the initial state and configuration.
*
* @param initialState The initial state to use. Must have term 0, version 1, and non-empty configurations.
* @param initialState The initial state to use. Must have term 0, version equal to the last-accepted version, and non-empty
* configurations.
* @throws CoordinationStateRejectedException if the arguments were incompatible with the current state of this object.
*/
public void setInitialState(ClusterState initialState) {
final long lastAcceptedVersion = getLastAcceptedVersion();
if (lastAcceptedVersion != 0) {
logger.debug("setInitialState: rejecting since last-accepted version {} > 0", lastAcceptedVersion);
throw new CoordinationStateRejectedException("initial state already set: last-accepted version now " + lastAcceptedVersion);
final VotingConfiguration lastAcceptedConfiguration = getLastAcceptedConfiguration();
if (lastAcceptedConfiguration.isEmpty() == false) {
logger.debug("setInitialState: rejecting since last-accepted configuration is nonempty: {}", lastAcceptedConfiguration);
throw new CoordinationStateRejectedException(
"initial state already set: last-accepted configuration now " + lastAcceptedConfiguration);
}
assert getLastAcceptedTerm() == 0 : getLastAcceptedTerm();
assert getLastAcceptedConfiguration().isEmpty() : getLastAcceptedConfiguration();
assert getLastCommittedConfiguration().isEmpty() : getLastCommittedConfiguration();
assert lastPublishedVersion == 0 : lastAcceptedVersion;
assert lastPublishedVersion == 0 : lastPublishedVersion;
assert lastPublishedConfiguration.isEmpty() : lastPublishedConfiguration;
assert electionWon == false;
assert joinVotes.isEmpty() : joinVotes;
assert publishVotes.isEmpty() : publishVotes;
assert initialState.term() == 0 : initialState;
assert initialState.version() == 1 : initialState;
assert initialState.term() == 0 : initialState + " should have term 0";
assert initialState.version() == getLastAcceptedVersion() : initialState + " should have version " + getLastAcceptedVersion();
assert initialState.getLastAcceptedConfiguration().isEmpty() == false;
assert initialState.getLastCommittedConfiguration().isEmpty() == false;
@ -191,7 +197,8 @@ public class CoordinationState {
joinVotes = new VoteCollection();
publishVotes = new VoteCollection();
return new Join(localNode, startJoinRequest.getSourceNode(), getCurrentTerm(), getLastAcceptedTerm(), getLastAcceptedVersion());
return new Join(localNode, startJoinRequest.getSourceNode(), getCurrentTerm(), getLastAcceptedTerm(),
getLastAcceptedVersionOrMetaDataVersion());
}
/**
@ -224,20 +231,22 @@ public class CoordinationState {
" of join higher than current last accepted term " + lastAcceptedTerm);
}
if (join.getLastAcceptedTerm() == lastAcceptedTerm && join.getLastAcceptedVersion() > getLastAcceptedVersion()) {
logger.debug("handleJoin: ignored join as joiner has a better last accepted version (expected: <=[{}], actual: [{}])",
getLastAcceptedVersion(), join.getLastAcceptedVersion());
if (join.getLastAcceptedTerm() == lastAcceptedTerm && join.getLastAcceptedVersion() > getLastAcceptedVersionOrMetaDataVersion()) {
logger.debug(
"handleJoin: ignored join as joiner has a better last accepted version (expected: <=[{}], actual: [{}]) in term {}",
getLastAcceptedVersionOrMetaDataVersion(), join.getLastAcceptedVersion(), lastAcceptedTerm);
throw new CoordinationStateRejectedException("incoming last accepted version " + join.getLastAcceptedVersion() +
" of join higher than current last accepted version " + getLastAcceptedVersion());
" of join higher than current last accepted version " + getLastAcceptedVersionOrMetaDataVersion()
+ " in term " + lastAcceptedTerm);
}
if (getLastAcceptedVersion() == 0) {
if (getLastAcceptedConfiguration().isEmpty()) {
// We do not check for an election won on setting the initial configuration, so it would be possible to end up in a state where
// we have enough join votes to have won the election immediately on setting the initial configuration. It'd be quite
// complicated to restore all the appropriate invariants when setting the initial configuration (it's not just electionWon)
// so instead we just reject join votes received prior to receiving the initial configuration.
logger.debug("handleJoin: ignored join because initial configuration not set");
throw new CoordinationStateRejectedException("initial configuration not set");
logger.debug("handleJoin: rejecting join since this node has not received its initial configuration yet");
throw new CoordinationStateRejectedException("rejecting join since this node has not received its initial configuration yet");
}
boolean added = joinVotes.addVote(join.getSourceNode());

View File

@ -28,7 +28,6 @@ import org.elasticsearch.action.admin.cluster.bootstrap.BootstrapConfiguration;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterState.Builder;
import org.elasticsearch.cluster.ClusterStateTaskConfig;
import org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.elasticsearch.cluster.block.ClusterBlocks;
@ -168,7 +167,7 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
this.reconfigurator = new Reconfigurator(settings, clusterSettings);
this.clusterBootstrapService = new ClusterBootstrapService(settings, transportService);
this.discoveryUpgradeService = new DiscoveryUpgradeService(settings, clusterSettings, transportService,
this::isInitialConfigurationSet, joinHelper, peerFinder::getFoundPeers, this::unsafelySetConfigurationForUpgrade);
this::isInitialConfigurationSet, joinHelper, peerFinder::getFoundPeers, this::setInitialConfiguration);
this.lagDetector = new LagDetector(settings, transportService.getThreadPool(), n -> removeNode(n, "lagging"),
transportService::getLocalNode);
this.clusterFormationFailureHelper = new ClusterFormationFailureHelper(settings, this::getClusterFormationState,
@ -497,7 +496,7 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
private PreVoteResponse getPreVoteResponse() {
return new PreVoteResponse(getCurrentTerm(), coordinationState.get().getLastAcceptedTerm(),
coordinationState.get().getLastAcceptedVersion());
coordinationState.get().getLastAcceptedState().getVersionOrMetaDataVersion());
}
// package-visible for testing
@ -705,7 +704,6 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
}
logger.info("setting initial configuration to {}", votingConfiguration);
final Builder builder = masterService.incrementVersion(currentState);
final CoordinationMetaData coordinationMetaData = CoordinationMetaData.builder(currentState.coordinationMetaData())
.lastAcceptedConfiguration(votingConfiguration)
.lastCommittedConfiguration(votingConfiguration)
@ -715,57 +713,14 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
// automatically generate a UID for the metadata if we need to
metaDataBuilder.generateClusterUuidIfNeeded(); // TODO generate UUID in bootstrapping tool?
metaDataBuilder.coordinationMetaData(coordinationMetaData);
builder.metaData(metaDataBuilder);
coordinationState.get().setInitialState(builder.build());
coordinationState.get().setInitialState(ClusterState.builder(currentState).metaData(metaDataBuilder).build());
preVoteCollector.update(getPreVoteResponse(), null); // pick up the change to last-accepted version
startElectionScheduler();
return true;
}
}
/**
 * Installs the given voting configuration directly into the coordination state so that this node can bootstrap itself
 * during a rolling upgrade. All work is done while holding {@code mutex}.
 *
 * @param votingConfiguration the configuration to install as both the last-accepted and the last-committed configuration
 * @throws IllegalStateException if this node is not in {@code Mode.CANDIDATE}, if an initial configuration is already set,
 *                               if the last-known leader is absent or is not a Zen1 node, or if the current term is not
 *                               {@code ZEN1_BWC_TERM}
 */
private void unsafelySetConfigurationForUpgrade(VotingConfiguration votingConfiguration) {
// Trips (when assertions are enabled) once the current major version is no longer exactly one above 6.x, as a reminder to delete
// this method.
assert Version.CURRENT.major == Version.V_6_6_0.major + 1 : "remove this method once unsafe upgrades are no longer needed";
synchronized (mutex) {
// Preconditions: each of the following checks refuses to overwrite the configuration by throwing.
if (mode != Mode.CANDIDATE) {
throw new IllegalStateException("Cannot overwrite configuration in mode " + mode);
}
if (isInitialConfigurationSet()) {
throw new IllegalStateException("Cannot overwrite configuration: configuration is already set to "
+ getLastAcceptedState().getLastAcceptedConfiguration());
}
// An absent last-known leader is treated the same as a non-Zen1 leader.
if (lastKnownLeader.map(Coordinator::isZen1Node).orElse(false) == false) {
throw new IllegalStateException("Cannot upgrade from last-known leader: " + lastKnownLeader);
}
if (getCurrentTerm() != ZEN1_BWC_TERM) {
throw new IllegalStateException("Cannot upgrade, term is " + getCurrentTerm());
}
logger.info("automatically bootstrapping during rolling upgrade, using initial configuration {}", votingConfiguration);
// Derive the new state from the current one: the builder comes from masterService.incrementVersion (presumably with the
// version bumped - confirm against MasterService), and the coordination metadata is replaced with term 1 and the supplied
// configuration as both last-accepted and last-committed.
final ClusterState currentState = getStateForMasterService();
final Builder builder = masterService.incrementVersion(currentState);
builder.metaData(MetaData.builder(currentState.metaData()).coordinationMetaData(
CoordinationMetaData.builder(currentState.metaData().coordinationMetaData())
.term(1)
.lastAcceptedConfiguration(votingConfiguration)
.lastCommittedConfiguration(votingConfiguration)
.build()));
final ClusterState newClusterState = builder.build();
// Feed the new state through the coordination state as a start-join for the new state's term from the local node, followed
// by a publish request carrying the new state.
coordinationState.get().handleStartJoin(new StartJoinRequest(getLocalNode(), newClusterState.term()));
coordinationState.get().handlePublishRequest(new PublishRequest(newClusterState));
// Reset follower checking and restart peer discovery using the nodes of the new state.
followersChecker.clearCurrentNodes();
followersChecker.updateFastResponseState(getCurrentTerm(), mode);
peerFinder.deactivate(getLocalNode());
peerFinder.activate(newClusterState.nodes());
}
}
// Package-private for testing
ClusterState improveConfiguration(ClusterState clusterState) {
assert Thread.holdsLock(mutex) : "Coordinator mutex not held";

View File

@ -182,7 +182,7 @@ public class PreVoteCollector {
if (response.getLastAcceptedTerm() > clusterState.term()
|| (response.getLastAcceptedTerm() == clusterState.term()
&& response.getLastAcceptedVersion() > clusterState.version())) {
&& response.getLastAcceptedVersion() > clusterState.getVersionOrMetaDataVersion())) {
logger.debug("{} ignoring {} from {} as it is fresher", this, response, sender);
return;
}

View File

@ -94,7 +94,7 @@ public class CoordinationStateTests extends ESTestCase {
public void testSetInitialState() {
VotingConfiguration initialConfig = new VotingConfiguration(Collections.singleton(node1.getId()));
ClusterState state1 = clusterState(0L, 1L, node1, initialConfig, initialConfig, 42L);
ClusterState state1 = clusterState(0L, 0L, node1, initialConfig, initialConfig, 42L);
assertTrue(state1.getLastAcceptedConfiguration().hasQuorum(Collections.singleton(node1.getId())));
assertTrue(state1.getLastCommittedConfiguration().hasQuorum(Collections.singleton(node1.getId())));
cs1.setInitialState(state1);
@ -103,7 +103,7 @@ public class CoordinationStateTests extends ESTestCase {
public void testSetInitialStateWhenAlreadySet() {
VotingConfiguration initialConfig = new VotingConfiguration(Collections.singleton(node1.getId()));
ClusterState state1 = clusterState(0L, 1L, node1, initialConfig, initialConfig, 42L);
ClusterState state1 = clusterState(0L, 0L, node1, initialConfig, initialConfig, 42L);
assertTrue(state1.getLastAcceptedConfiguration().hasQuorum(Collections.singleton(node1.getId())));
assertTrue(state1.getLastCommittedConfiguration().hasQuorum(Collections.singleton(node1.getId())));
cs1.setInitialState(state1);
@ -129,7 +129,7 @@ public class CoordinationStateTests extends ESTestCase {
public void testStartJoinAfterBootstrap() {
VotingConfiguration initialConfig = new VotingConfiguration(Collections.singleton(node1.getId()));
ClusterState state1 = clusterState(0L, 1L, node1, initialConfig, initialConfig, 42L);
ClusterState state1 = clusterState(0L, 0L, node1, initialConfig, initialConfig, 42L);
assertTrue(state1.getLastAcceptedConfiguration().hasQuorum(Collections.singleton(node1.getId())));
assertTrue(state1.getLastCommittedConfiguration().hasQuorum(Collections.singleton(node1.getId())));
cs1.setInitialState(state1);
@ -157,7 +157,7 @@ public class CoordinationStateTests extends ESTestCase {
StartJoinRequest startJoinRequest1 = new StartJoinRequest(node1, randomLongBetween(1, 5));
Join v1 = cs1.handleStartJoin(startJoinRequest1);
assertThat(expectThrows(CoordinationStateRejectedException.class, () -> cs1.handleJoin(v1)).getMessage(),
containsString("initial configuration not set"));
containsString("this node has not received its initial configuration yet"));
}
public void testJoinWithNoStartJoinAfterReboot() {
@ -178,7 +178,7 @@ public class CoordinationStateTests extends ESTestCase {
public void testJoinWithBadCurrentTerm() {
VotingConfiguration initialConfig = new VotingConfiguration(Collections.singleton(node1.getId()));
ClusterState state1 = clusterState(0L, 1L, node1, initialConfig, initialConfig, 42L);
ClusterState state1 = clusterState(0L, 0L, node1, initialConfig, initialConfig, 42L);
cs1.setInitialState(state1);
StartJoinRequest startJoinRequest1 = new StartJoinRequest(node2, randomLongBetween(1, 5));
@ -191,7 +191,7 @@ public class CoordinationStateTests extends ESTestCase {
public void testJoinWithHigherAcceptedTerm() {
VotingConfiguration initialConfig = new VotingConfiguration(Collections.singleton(node1.getId()));
ClusterState state1 = clusterState(0L, 1L, node1, initialConfig, initialConfig, 42L);
ClusterState state1 = clusterState(0L, 0L, node1, initialConfig, initialConfig, 42L);
cs1.setInitialState(state1);
StartJoinRequest startJoinRequest1 = new StartJoinRequest(node2, randomLongBetween(1, 5));
@ -209,7 +209,7 @@ public class CoordinationStateTests extends ESTestCase {
public void testJoinWithSameAcceptedTermButHigherVersion() {
VotingConfiguration initialConfig = new VotingConfiguration(Collections.singleton(node1.getId()));
ClusterState state1 = clusterState(0L, 1L, node1, initialConfig, initialConfig, 42L);
ClusterState state1 = clusterState(0L, 0L, node1, initialConfig, initialConfig, 42L);
cs1.setInitialState(state1);
StartJoinRequest startJoinRequest1 = new StartJoinRequest(node2, randomLongBetween(1, 5));
@ -227,7 +227,7 @@ public class CoordinationStateTests extends ESTestCase {
public void testJoinWithLowerLastAcceptedTermWinsElection() {
VotingConfiguration initialConfig = new VotingConfiguration(Collections.singleton(node1.getId()));
ClusterState state1 = clusterState(0L, 1L, node1, initialConfig, initialConfig, 42L);
ClusterState state1 = clusterState(0L, 0L, node1, initialConfig, initialConfig, 42L);
cs1.setInitialState(state1);
StartJoinRequest startJoinRequest1 = new StartJoinRequest(node2, randomLongBetween(1, 5));
@ -248,7 +248,7 @@ public class CoordinationStateTests extends ESTestCase {
public void testJoinWithSameLastAcceptedTermButLowerOrSameVersionWinsElection() {
VotingConfiguration initialConfig = new VotingConfiguration(Collections.singleton(node1.getId()));
ClusterState state1 = clusterState(0L, 1L, node1, initialConfig, initialConfig, 42L);
ClusterState state1 = clusterState(0L, 0L, node1, initialConfig, initialConfig, 42L);
cs1.setInitialState(state1);
StartJoinRequest startJoinRequest1 = new StartJoinRequest(node2, randomLongBetween(1, 5));
@ -269,7 +269,7 @@ public class CoordinationStateTests extends ESTestCase {
public void testJoinDoesNotWinElection() {
VotingConfiguration initialConfig = new VotingConfiguration(Collections.singleton(node1.getId()));
ClusterState state1 = clusterState(0L, 1L, node1, initialConfig, initialConfig, 42L);
ClusterState state1 = clusterState(0L, 0L, node1, initialConfig, initialConfig, 42L);
cs1.setInitialState(state1);
StartJoinRequest startJoinRequest1 = new StartJoinRequest(node2, randomLongBetween(1, 5));
@ -289,7 +289,7 @@ public class CoordinationStateTests extends ESTestCase {
public void testJoinDoesNotWinElectionWhenOnlyCommittedConfigQuorum() {
VotingConfiguration configNode1 = new VotingConfiguration(Collections.singleton(node1.getId()));
VotingConfiguration configNode2 = new VotingConfiguration(Collections.singleton(node2.getId()));
ClusterState state1 = clusterState(0L, 1L, node1, configNode1, configNode2, 42L);
ClusterState state1 = clusterState(0L, 0L, node1, configNode1, configNode2, 42L);
cs1.setInitialState(state1);
StartJoinRequest startJoinRequest = new StartJoinRequest(node1, randomLongBetween(1, 5));
@ -303,7 +303,7 @@ public class CoordinationStateTests extends ESTestCase {
public void testJoinDoesNotWinElectionWhenOnlyLastAcceptedConfigQuorum() {
VotingConfiguration configNode1 = new VotingConfiguration(Collections.singleton(node1.getId()));
VotingConfiguration configNode2 = new VotingConfiguration(Collections.singleton(node2.getId()));
ClusterState state1 = clusterState(0L, 1L, node1, configNode2, configNode1, 42L);
ClusterState state1 = clusterState(0L, 0L, node1, configNode2, configNode1, 42L);
cs1.setInitialState(state1);
StartJoinRequest startJoinRequest = new StartJoinRequest(node1, randomLongBetween(1, 5));
@ -316,7 +316,7 @@ public class CoordinationStateTests extends ESTestCase {
public void testHandleClientValue() {
VotingConfiguration initialConfig = new VotingConfiguration(Collections.singleton(node1.getId()));
ClusterState state1 = clusterState(0L, 1L, node1, initialConfig, initialConfig, 42L);
ClusterState state1 = clusterState(0L, 0L, node1, initialConfig, initialConfig, 42L);
cs1.setInitialState(state1);
StartJoinRequest startJoinRequest1 = new StartJoinRequest(node1, randomLongBetween(1, 5));
Join v1 = cs1.handleStartJoin(startJoinRequest1);
@ -339,7 +339,7 @@ public class CoordinationStateTests extends ESTestCase {
public void testHandleClientValueWhenElectionNotWon() {
VotingConfiguration initialConfig = new VotingConfiguration(Collections.singleton(node1.getId()));
ClusterState state1 = clusterState(0L, 1L, node1, initialConfig, initialConfig, 42L);
ClusterState state1 = clusterState(0L, 0L, node1, initialConfig, initialConfig, 42L);
if (randomBoolean()) {
cs1.setInitialState(state1);
}
@ -349,7 +349,7 @@ public class CoordinationStateTests extends ESTestCase {
public void testHandleClientValueDuringOngoingPublication() {
VotingConfiguration initialConfig = new VotingConfiguration(Collections.singleton(node1.getId()));
ClusterState state1 = clusterState(0L, 1L, node1, initialConfig, initialConfig, 42L);
ClusterState state1 = clusterState(0L, 0L, node1, initialConfig, initialConfig, 42L);
cs1.setInitialState(state1);
StartJoinRequest startJoinRequest1 = new StartJoinRequest(node1, randomLongBetween(1, 5));
Join v1 = cs1.handleStartJoin(startJoinRequest1);
@ -366,7 +366,7 @@ public class CoordinationStateTests extends ESTestCase {
public void testHandleClientValueWithBadTerm() {
VotingConfiguration initialConfig = new VotingConfiguration(Collections.singleton(node1.getId()));
ClusterState state1 = clusterState(0L, 1L, node1, initialConfig, initialConfig, 42L);
ClusterState state1 = clusterState(0L, 0L, node1, initialConfig, initialConfig, 42L);
cs1.setInitialState(state1);
StartJoinRequest startJoinRequest1 = new StartJoinRequest(node1, randomLongBetween(3, 5));
Join v1 = cs1.handleStartJoin(startJoinRequest1);
@ -383,21 +383,21 @@ public class CoordinationStateTests extends ESTestCase {
public void testHandleClientValueWithOldVersion() {
VotingConfiguration initialConfig = new VotingConfiguration(Collections.singleton(node1.getId()));
ClusterState state1 = clusterState(0L, 1L, node1, initialConfig, initialConfig, 42L);
ClusterState state1 = clusterState(0L, 0L, node1, initialConfig, initialConfig, 42L);
cs1.setInitialState(state1);
StartJoinRequest startJoinRequest1 = new StartJoinRequest(node1, randomLongBetween(1, 5));
Join v1 = cs1.handleStartJoin(startJoinRequest1);
assertTrue(cs1.handleJoin(v1));
assertTrue(cs1.electionWon());
ClusterState state2 = clusterState(startJoinRequest1.getTerm(), 1L, node1, initialConfig, initialConfig, 42L);
ClusterState state2 = clusterState(startJoinRequest1.getTerm(), 0L, node1, initialConfig, initialConfig, 42L);
assertThat(expectThrows(CoordinationStateRejectedException.class, () -> cs1.handleClientValue(state2)).getMessage(),
containsString("lower or equal to last published version"));
}
public void testHandleClientValueWithDifferentReconfigurationWhileAlreadyReconfiguring() {
VotingConfiguration initialConfig = new VotingConfiguration(Collections.singleton(node1.getId()));
ClusterState state1 = clusterState(0L, 1L, node1, initialConfig, initialConfig, 42L);
ClusterState state1 = clusterState(0L, 0L, node1, initialConfig, initialConfig, 42L);
cs1.setInitialState(state1);
StartJoinRequest startJoinRequest1 = new StartJoinRequest(node1, randomLongBetween(1, 5));
Join v1 = cs1.handleStartJoin(startJoinRequest1);
@ -418,7 +418,7 @@ public class CoordinationStateTests extends ESTestCase {
public void testHandleClientValueWithSameReconfigurationWhileAlreadyReconfiguring() {
VotingConfiguration initialConfig = new VotingConfiguration(Collections.singleton(node1.getId()));
ClusterState state1 = clusterState(0L, 1L, node1, initialConfig, initialConfig, 42L);
ClusterState state1 = clusterState(0L, 0L, node1, initialConfig, initialConfig, 42L);
cs1.setInitialState(state1);
StartJoinRequest startJoinRequest1 = new StartJoinRequest(node1, randomLongBetween(1, 5));
Join v1 = cs1.handleStartJoin(startJoinRequest1);
@ -438,7 +438,7 @@ public class CoordinationStateTests extends ESTestCase {
public void testHandleClientValueWithIllegalCommittedConfigurationChange() {
assumeTrue("test only works with assertions enabled", Assertions.ENABLED);
VotingConfiguration initialConfig = new VotingConfiguration(Collections.singleton(node1.getId()));
ClusterState state1 = clusterState(0L, 1L, node1, initialConfig, initialConfig, 42L);
ClusterState state1 = clusterState(0L, 0L, node1, initialConfig, initialConfig, 42L);
cs1.setInitialState(state1);
StartJoinRequest startJoinRequest1 = new StartJoinRequest(node1, randomLongBetween(1, 5));
Join v1 = cs1.handleStartJoin(startJoinRequest1);
@ -455,7 +455,7 @@ public class CoordinationStateTests extends ESTestCase {
public void testHandleClientValueWithConfigurationChangeButNoJoinQuorum() {
VotingConfiguration initialConfig = new VotingConfiguration(Collections.singleton(node1.getId()));
ClusterState state1 = clusterState(0L, 1L, node1, initialConfig, initialConfig, 42L);
ClusterState state1 = clusterState(0L, 0L, node1, initialConfig, initialConfig, 42L);
cs1.setInitialState(state1);
StartJoinRequest startJoinRequest1 = new StartJoinRequest(node1, randomLongBetween(1, 5));
Join v1 = cs1.handleStartJoin(startJoinRequest1);
@ -470,7 +470,7 @@ public class CoordinationStateTests extends ESTestCase {
public void testHandlePublishRequest() {
VotingConfiguration initialConfig = new VotingConfiguration(Collections.singleton(node1.getId()));
ClusterState state1 = clusterState(0L, 1L, node1, initialConfig, initialConfig, 42L);
ClusterState state1 = clusterState(0L, 0L, node1, initialConfig, initialConfig, 42L);
cs1.setInitialState(state1);
StartJoinRequest startJoinRequest1 = new StartJoinRequest(node1, randomLongBetween(1, 5));
Join v1 = cs1.handleStartJoin(startJoinRequest1);
@ -491,7 +491,7 @@ public class CoordinationStateTests extends ESTestCase {
public void testHandlePublishRequestWithBadTerm() {
VotingConfiguration initialConfig = new VotingConfiguration(Collections.singleton(node1.getId()));
ClusterState state1 = clusterState(0L, 1L, node1, initialConfig, initialConfig, 42L);
ClusterState state1 = clusterState(0L, 0L, node1, initialConfig, initialConfig, 42L);
cs1.setInitialState(state1);
StartJoinRequest startJoinRequest1 = new StartJoinRequest(node1, randomLongBetween(1, 5));
Join v1 = cs1.handleStartJoin(startJoinRequest1);
@ -511,7 +511,7 @@ public class CoordinationStateTests extends ESTestCase {
// scenario when handling a publish request from a master that we already received a newer state from
public void testHandlePublishRequestWithSameTermButOlderOrSameVersion() {
VotingConfiguration initialConfig = new VotingConfiguration(Collections.singleton(node1.getId()));
ClusterState state1 = clusterState(0L, 1L, node1, initialConfig, initialConfig, 42L);
ClusterState state1 = clusterState(0L, 0L, node1, initialConfig, initialConfig, 42L);
cs1.setInitialState(state1);
StartJoinRequest startJoinRequest1 = new StartJoinRequest(node1, randomLongBetween(1, 5));
Join v1 = cs1.handleStartJoin(startJoinRequest1);
@ -544,7 +544,7 @@ public class CoordinationStateTests extends ESTestCase {
public void testHandlePublishResponseWithCommit() {
VotingConfiguration initialConfig = new VotingConfiguration(Collections.singleton(node1.getId()));
ClusterState state1 = clusterState(0L, 1L, node1, initialConfig, initialConfig, 42L);
ClusterState state1 = clusterState(0L, 0L, node1, initialConfig, initialConfig, 42L);
cs1.setInitialState(state1);
StartJoinRequest startJoinRequest1 = new StartJoinRequest(node1, randomLongBetween(1, 5));
Join v1 = cs1.handleStartJoin(startJoinRequest1);
@ -562,7 +562,7 @@ public class CoordinationStateTests extends ESTestCase {
public void testHandlePublishResponseWhenSteppedDownAsLeader() {
VotingConfiguration initialConfig = new VotingConfiguration(Collections.singleton(node1.getId()));
ClusterState state1 = clusterState(0L, 1L, node1, initialConfig, initialConfig, 42L);
ClusterState state1 = clusterState(0L, 0L, node1, initialConfig, initialConfig, 42L);
cs1.setInitialState(state1);
StartJoinRequest startJoinRequest1 = new StartJoinRequest(node1, randomLongBetween(1, 5));
Join v1 = cs1.handleStartJoin(startJoinRequest1);
@ -581,7 +581,7 @@ public class CoordinationStateTests extends ESTestCase {
public void testHandlePublishResponseWithoutPublishConfigQuorum() {
VotingConfiguration configNode1 = new VotingConfiguration(Collections.singleton(node1.getId()));
VotingConfiguration configNode2 = new VotingConfiguration(Collections.singleton(node2.getId()));
ClusterState state1 = clusterState(0L, 1L, node1, configNode1, configNode1, 42L);
ClusterState state1 = clusterState(0L, 0L, node1, configNode1, configNode1, 42L);
cs1.setInitialState(state1);
StartJoinRequest startJoinRequest1 = new StartJoinRequest(node1, randomLongBetween(1, 5));
Join v1 = cs1.handleStartJoin(startJoinRequest1);
@ -599,7 +599,7 @@ public class CoordinationStateTests extends ESTestCase {
public void testHandlePublishResponseWithoutCommitedConfigQuorum() {
VotingConfiguration configNode1 = new VotingConfiguration(Collections.singleton(node1.getId()));
VotingConfiguration configNode2 = new VotingConfiguration(Collections.singleton(node2.getId()));
ClusterState state1 = clusterState(0L, 1L, node1, configNode1, configNode1, 42L);
ClusterState state1 = clusterState(0L, 0L, node1, configNode1, configNode1, 42L);
cs1.setInitialState(state1);
StartJoinRequest startJoinRequest1 = new StartJoinRequest(node1, randomLongBetween(1, 5));
Join v1 = cs1.handleStartJoin(startJoinRequest1);
@ -616,7 +616,7 @@ public class CoordinationStateTests extends ESTestCase {
public void testHandlePublishResponseWithoutCommit() {
VotingConfiguration initialConfig = new VotingConfiguration(Collections.singleton(node1.getId()));
ClusterState state1 = clusterState(0L, 1L, node1, initialConfig, initialConfig, 42L);
ClusterState state1 = clusterState(0L, 0L, node1, initialConfig, initialConfig, 42L);
cs1.setInitialState(state1);
StartJoinRequest startJoinRequest1 = new StartJoinRequest(node1, randomLongBetween(1, 5));
Join v1 = cs1.handleStartJoin(startJoinRequest1);
@ -631,7 +631,7 @@ public class CoordinationStateTests extends ESTestCase {
public void testHandlePublishResponseWithBadTerm() {
VotingConfiguration initialConfig = new VotingConfiguration(Collections.singleton(node1.getId()));
ClusterState state1 = clusterState(0L, 1L, node1, initialConfig, initialConfig, 42L);
ClusterState state1 = clusterState(0L, 0L, node1, initialConfig, initialConfig, 42L);
cs1.setInitialState(state1);
StartJoinRequest startJoinRequest1 = new StartJoinRequest(node1, randomLongBetween(1, 5));
Join v1 = cs1.handleStartJoin(startJoinRequest1);
@ -650,7 +650,7 @@ public class CoordinationStateTests extends ESTestCase {
public void testHandlePublishResponseWithVersionMismatch() {
VotingConfiguration initialConfig = new VotingConfiguration(Collections.singleton(node1.getId()));
ClusterState state1 = clusterState(0L, 1L, node1, initialConfig, initialConfig, 42L);
ClusterState state1 = clusterState(0L, 0L, node1, initialConfig, initialConfig, 42L);
cs1.setInitialState(state1);
StartJoinRequest startJoinRequest1 = new StartJoinRequest(node1, randomLongBetween(1, 5));
Join v1 = cs1.handleStartJoin(startJoinRequest1);
@ -665,7 +665,7 @@ public class CoordinationStateTests extends ESTestCase {
public void testHandleCommit() {
VotingConfiguration initialConfig = new VotingConfiguration(Collections.singleton(node1.getId()));
ClusterState state1 = clusterState(0L, 1L, node1, initialConfig, initialConfig, 42L);
ClusterState state1 = clusterState(0L, 0L, node1, initialConfig, initialConfig, 42L);
cs1.setInitialState(state1);
StartJoinRequest startJoinRequest1 = new StartJoinRequest(node1, randomLongBetween(1, 5));
Join v1 = cs1.handleStartJoin(startJoinRequest1);
@ -687,7 +687,7 @@ public class CoordinationStateTests extends ESTestCase {
public void testHandleCommitWithBadCurrentTerm() {
VotingConfiguration initialConfig = new VotingConfiguration(Collections.singleton(node1.getId()));
ClusterState state1 = clusterState(0L, 1L, node1, initialConfig, initialConfig, 42L);
ClusterState state1 = clusterState(0L, 0L, node1, initialConfig, initialConfig, 42L);
cs1.setInitialState(state1);
StartJoinRequest startJoinRequest1 = new StartJoinRequest(node1, randomLongBetween(1, 5));
Join v1 = cs1.handleStartJoin(startJoinRequest1);
@ -707,7 +707,7 @@ public class CoordinationStateTests extends ESTestCase {
public void testHandleCommitWithBadLastAcceptedTerm() {
VotingConfiguration initialConfig = new VotingConfiguration(Collections.singleton(node1.getId()));
ClusterState state1 = clusterState(0L, 1L, node1, initialConfig, initialConfig, 42L);
ClusterState state1 = clusterState(0L, 0L, node1, initialConfig, initialConfig, 42L);
cs1.setInitialState(state1);
StartJoinRequest startJoinRequest1 = new StartJoinRequest(node1, randomLongBetween(1, 5));
Join v1 = cs1.handleStartJoin(startJoinRequest1);
@ -720,7 +720,7 @@ public class CoordinationStateTests extends ESTestCase {
public void testHandleCommitWithBadVersion() {
VotingConfiguration initialConfig = new VotingConfiguration(Collections.singleton(node1.getId()));
ClusterState state1 = clusterState(0L, 1L, node1, initialConfig, initialConfig, 42L);
ClusterState state1 = clusterState(0L, 0L, node1, initialConfig, initialConfig, 42L);
cs1.setInitialState(state1);
StartJoinRequest startJoinRequest1 = new StartJoinRequest(node1, randomLongBetween(1, 5));
Join v1 = cs1.handleStartJoin(startJoinRequest1);
@ -830,7 +830,7 @@ public class CoordinationStateTests extends ESTestCase {
}
void setInitialState(VotingConfiguration initialConfig, long initialValue) {
final ClusterState.Builder builder = ClusterState.builder(state.getLastAcceptedState()).incrementVersion();
final ClusterState.Builder builder = ClusterState.builder(state.getLastAcceptedState());
builder.metaData(MetaData.builder()
.coordinationMetaData(CoordinationMetaData.builder()
.lastAcceptedConfiguration(initialConfig)

View File

@ -156,7 +156,7 @@ public class PublicationTests extends ESTestCase {
Function<DiscoveryNode, MockNode> nodeResolver = dn -> nodes.stream().filter(mn -> mn.localNode.equals(dn)).findFirst().get();
private void initializeCluster(VotingConfiguration initialConfig) {
node1.coordinationState.setInitialState(CoordinationStateTests.clusterState(0L, 1L, n1, initialConfig, initialConfig, 0L));
node1.coordinationState.setInitialState(CoordinationStateTests.clusterState(0L, 0L, n1, initialConfig, initialConfig, 0L));
StartJoinRequest startJoinRequest = new StartJoinRequest(n1, 1L);
node1.coordinationState.handleJoin(node1.coordinationState.handleStartJoin(startJoinRequest));
node1.coordinationState.handleJoin(node2.coordinationState.handleStartJoin(startJoinRequest));

View File

@ -24,22 +24,34 @@ import org.elasticsearch.action.admin.cluster.configuration.ClearVotingConfigExc
import org.elasticsearch.action.admin.cluster.configuration.ClearVotingConfigExclusionsRequest;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequestBuilder;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.Requests;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.Manifest;
import org.elasticsearch.cluster.routing.UnassignedInfo;
import org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider.Allocation;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.gateway.MetaStateService;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.InternalTestCluster.RestartCallback;
import org.elasticsearch.test.discovery.TestZenDiscovery;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.StreamSupport;
import static org.elasticsearch.cluster.coordination.ClusterBootstrapService.INITIAL_MASTER_NODES_SETTING;
import static org.elasticsearch.cluster.coordination.Coordinator.ZEN1_BWC_TERM;
import static org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ENABLE_SETTING;
import static org.elasticsearch.cluster.routing.allocation.decider.FilterAllocationDecider.CLUSTER_ROUTING_EXCLUDE_GROUP_SETTING;
import static org.elasticsearch.node.Node.NODE_NAME_SETTING;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0)
public class Zen1IT extends ESIntegTestCase {
@ -228,4 +240,66 @@ public class Zen1IT extends ESIntegTestCase {
/**
 * Runs {@code testMultipleNodeMigrationFromZen1ToZen2} with an argument of 3
 * (presumably the number of nodes, going by this method's name).
 */
public void testMultipleNodeMigrationFromZen1ToZen2WithThreeNodes() throws Exception {
testMultipleNodeMigrationFromZen1ToZen2(3);
}
/**
 * Checks that a persistent-settings update applied after one node has already stopped is still visible once the whole cluster
 * has been restarted with {@code ZEN2_SETTINGS}.
 * <p>
 * Three nodes are started with {@code ZEN1_SETTINGS} and shard allocation is persistently set to {@code ALL}. During the full
 * restart, as soon as the first node is down, one of the other nodes is used to switch the setting to {@code NONE}. Once every
 * node is down the version stored in each node's manifest is overwritten (with zero, or rarely with a random value). After the
 * restart the three-node cluster must report {@code NONE}.
 */
public void testFreshestMasterElectedAfterFullClusterRestart() throws Exception {
    final List<String> nodeNames = internalCluster().startNodes(3, ZEN1_SETTINGS);
    final String allocationKey = CLUSTER_ROUTING_ALLOCATION_ENABLE_SETTING.getKey();

    // Begin with allocation explicitly enabled in the persistent settings.
    final Settings.Builder allocationEnabled = Settings.builder().put(allocationKey, Allocation.ALL);
    assertTrue(client().admin().cluster().prepareUpdateSettings().setPersistentSettings(allocationEnabled).get().isAcknowledged());

    // Capture the node environments up front; they are needed to rewrite the manifests once the nodes are down.
    final Iterable<NodeEnvironment> environmentInstances = internalCluster().getDataOrMasterNodeInstances(NodeEnvironment.class);
    final List<NodeEnvironment> nodeEnvironments
        = StreamSupport.stream(environmentInstances.spliterator(), false).collect(Collectors.toList());

    final boolean randomiseVersions = rarely();

    internalCluster().fullRestart(new RestartCallback() {
        int stoppedCount;

        @Override
        public Settings onNodeStopped(String nodeName) throws Exception {
            stoppedCount++;

            if (stoppedCount == 1) {
                // Exactly one node is down: talk to one of the others and change the setting without the stopped node seeing it.
                final String survivor = randomValueOtherThan(nodeName, () -> randomFrom(nodeNames));
                final Client client = internalCluster().client(survivor);

                final ClusterHealthResponse twoNodeHealth = client.admin().cluster().health(Requests.clusterHealthRequest()
                    .waitForEvents(Priority.LANGUID)
                    .waitForNoRelocatingShards(true)
                    .waitForNodes("2")).actionGet();
                assertFalse(twoNodeHealth.isTimedOut());

                final Settings.Builder allocationDisabled = Settings.builder().put(allocationKey, Allocation.NONE);
                assertTrue(client.admin().cluster().prepareUpdateSettings()
                    .setPersistentSettings(allocationDisabled).get().isAcknowledged());
            }

            if (stoppedCount == nodeNames.size()) {
                // The versions written by nodes following a Zen1 master cannot be trusted. Overwrite them (occasionally with
                // random values) to demonstrate they are not important.
                for (final NodeEnvironment nodeEnvironment : nodeEnvironments) {
                    final MetaStateService metaStateService = new MetaStateService(nodeEnvironment, xContentRegistry());
                    final Manifest manifest = metaStateService.loadManifestOrEmpty();
                    assertThat(manifest.getCurrentTerm(), is(ZEN1_BWC_TERM));

                    final long newVersion;
                    if (randomiseVersions) {
                        newVersion = randomNonNegativeLong();
                    } else {
                        newVersion = 0L;
                    }

                    final String reason = "altering version to " + newVersion;
                    final Manifest rewritten = new Manifest(
                        manifest.getCurrentTerm(), newVersion, manifest.getGlobalGeneration(), manifest.getIndexGenerations());
                    metaStateService.writeManifestAndCleanup(reason, rewritten);
                }
            }

            // Settings each node is restarted with.
            final Settings.Builder restartSettings = Coordinator.addZen1Attribute(false, Settings.builder());
            restartSettings.put(ZEN2_SETTINGS);
            restartSettings.putList(INITIAL_MASTER_NODES_SETTING.getKey(), nodeNames);
            return restartSettings.build();
        }
    });

    final ClusterHealthResponse threeNodeHealth = client().admin().cluster().health(Requests.clusterHealthRequest()
        .waitForEvents(Priority.LANGUID)
        .waitForNoRelocatingShards(true)
        .waitForNodes("3")).actionGet();
    assertFalse(threeNodeHealth.isTimedOut());

    // The update made while only two nodes were running must have survived the restart.
    final Settings settingsAfterRestart
        = client().admin().cluster().state(new ClusterStateRequest()).get().getState().metaData().settings();
    assertThat(CLUSTER_ROUTING_ALLOCATION_ENABLE_SETTING.get(settingsAfterRestart), equalTo(Allocation.NONE));
}
}