Avoid counting votes from master-ineligible nodes (#43688)

Today if a master-eligible node is converted to a master-ineligible node it may
remain in the voting configuration, meaning that the master node may count its
publish responses as an indication that it has properly persisted the cluster
state. However master-ineligible nodes do not properly persist the cluster
state, so it is not safe to count these votes.

This change adjusts `CoordinationState` to take account of this from a safety
point of view, and also adjusts the `Coordinator` to prevent such nodes from
joining the cluster. Instead, it triggers a reconfiguration to remove from the
voting configuration a node that now appears to be master-ineligible before
processing its join.

Backport of #43688, see #44260.
This commit is contained in:
David Turner 2019-07-12 11:30:52 +01:00 committed by GitHub
parent 9e920f9612
commit 735c897ec6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 192 additions and 36 deletions

View File

@ -340,7 +340,7 @@ public class CoordinationMetaData implements Writeable, ToXContentFragment {
@Override @Override
public void writeTo(StreamOutput out) throws IOException { public void writeTo(StreamOutput out) throws IOException {
out.writeStringArray(nodeIds.toArray(new String[nodeIds.size()])); out.writeStringArray(nodeIds.toArray(new String[0]));
} }
public boolean hasQuorum(Collection<String> votes) { public boolean hasQuorum(Collection<String> votes) {

View File

@ -517,7 +517,7 @@ public class CoordinationState {
private final Set<Join> joins; private final Set<Join> joins;
public boolean addVote(DiscoveryNode sourceNode) { public boolean addVote(DiscoveryNode sourceNode) {
return nodes.put(sourceNode.getId(), sourceNode) == null; return sourceNode.isMasterNode() && nodes.put(sourceNode.getId(), sourceNode) == null;
} }
public boolean addJoinVote(Join join) { public boolean addJoinVote(Join join) {

View File

@ -881,11 +881,26 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
ClusterState improveConfiguration(ClusterState clusterState) { ClusterState improveConfiguration(ClusterState clusterState) {
assert Thread.holdsLock(mutex) : "Coordinator mutex not held"; assert Thread.holdsLock(mutex) : "Coordinator mutex not held";
// exclude any nodes whose ID is in the voting config exclusions list ...
final Stream<String> excludedNodeIds = clusterState.getVotingConfigExclusions().stream().map(VotingConfigExclusion::getNodeId);
// ... and also automatically exclude the node IDs of master-ineligible nodes that were previously master-eligible and are still in
// the voting config. We could exclude all the master-ineligible nodes here, but there could be quite a few of them and that makes
// the logging much harder to follow.
final Stream<String> masterIneligibleNodeIdsInVotingConfig = StreamSupport.stream(clusterState.nodes().spliterator(), false)
.filter(n -> n.isMasterNode() == false
&& (clusterState.getLastAcceptedConfiguration().getNodeIds().contains(n.getId())
|| clusterState.getLastCommittedConfiguration().getNodeIds().contains(n.getId())))
.map(DiscoveryNode::getId);
final Set<DiscoveryNode> liveNodes = StreamSupport.stream(clusterState.nodes().spliterator(), false) final Set<DiscoveryNode> liveNodes = StreamSupport.stream(clusterState.nodes().spliterator(), false)
.filter(this::hasJoinVoteFrom).filter(discoveryNode -> isZen1Node(discoveryNode) == false).collect(Collectors.toSet()); .filter(DiscoveryNode::isMasterNode)
.filter(coordinationState.get()::containsJoinVoteFor)
.filter(discoveryNode -> isZen1Node(discoveryNode) == false)
.collect(Collectors.toSet());
final VotingConfiguration newConfig = reconfigurator.reconfigure(liveNodes, final VotingConfiguration newConfig = reconfigurator.reconfigure(liveNodes,
clusterState.getVotingConfigExclusions().stream().map(VotingConfigExclusion::getNodeId).collect(Collectors.toSet()), Stream.concat(masterIneligibleNodeIdsInVotingConfig, excludedNodeIds).collect(Collectors.toSet()),
getLocalNode(), clusterState.getLastAcceptedConfiguration()); getLocalNode(), clusterState.getLastAcceptedConfiguration());
if (newConfig.equals(clusterState.getLastAcceptedConfiguration()) == false) { if (newConfig.equals(clusterState.getLastAcceptedConfiguration()) == false) {
assert coordinationState.get().joinVotesHaveQuorumFor(newConfig); assert coordinationState.get().joinVotesHaveQuorumFor(newConfig);
return ClusterState.builder(clusterState).metaData(MetaData.builder(clusterState.metaData()) return ClusterState.builder(clusterState).metaData(MetaData.builder(clusterState.metaData())
@ -923,9 +938,9 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
} }
} }
// for tests // exposed for tests
boolean hasJoinVoteFrom(DiscoveryNode node) { boolean missingJoinVoteFrom(DiscoveryNode node) {
return coordinationState.get().containsJoinVoteFor(node); return node.isMasterNode() && coordinationState.get().containsJoinVoteFor(node) == false;
} }
private void handleJoin(Join join) { private void handleJoin(Join join) {
@ -934,13 +949,13 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
if (coordinationState.get().electionWon()) { if (coordinationState.get().electionWon()) {
// If we have already won the election then the actual join does not matter for election purposes, so swallow any exception // If we have already won the election then the actual join does not matter for election purposes, so swallow any exception
final boolean isNewJoin = handleJoinIgnoringExceptions(join); final boolean isNewJoinFromMasterEligibleNode = handleJoinIgnoringExceptions(join);
// If we haven't completely finished becoming master then there's already a publication scheduled which will, in turn, // If we haven't completely finished becoming master then there's already a publication scheduled which will, in turn,
// schedule a reconfiguration if needed. It's benign to schedule a reconfiguration anyway, but it might fail if it wins the // schedule a reconfiguration if needed. It's benign to schedule a reconfiguration anyway, but it might fail if it wins the
// race against the election-winning publication and log a big error message, which we can prevent by checking this here: // race against the election-winning publication and log a big error message, which we can prevent by checking this here:
final boolean establishedAsMaster = mode == Mode.LEADER && getLastAcceptedState().term() == getCurrentTerm(); final boolean establishedAsMaster = mode == Mode.LEADER && getLastAcceptedState().term() == getCurrentTerm();
if (isNewJoin && establishedAsMaster && publicationInProgress() == false) { if (isNewJoinFromMasterEligibleNode && establishedAsMaster && publicationInProgress() == false) {
scheduleReconfigurationIfNeeded(); scheduleReconfigurationIfNeeded();
} }
} else { } else {
@ -1382,7 +1397,7 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
} }
private void handleAssociatedJoin(Join join) { private void handleAssociatedJoin(Join join) {
if (join.getTerm() == getCurrentTerm() && hasJoinVoteFrom(join.getSourceNode()) == false) { if (join.getTerm() == getCurrentTerm() && missingJoinVoteFrom(join.getSourceNode())) {
logger.trace("handling {}", join); logger.trace("handling {}", join);
handleJoin(join); handleJoin(join);
} }
@ -1420,7 +1435,7 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
// The remote node did not include a join vote in its publish response. We do not persist joins, so it could be that the remote // The remote node did not include a join vote in its publish response. We do not persist joins, so it could be that the remote
// node voted for us and then rebooted, or it could be that it voted for a different node in this term. If we don't have a copy // node voted for us and then rebooted, or it could be that it voted for a different node in this term. If we don't have a copy
// of a join from this node then we assume the latter and bump our term to obtain a vote from this node. // of a join from this node then we assume the latter and bump our term to obtain a vote from this node.
if (hasJoinVoteFrom(discoveryNode) == false) { if (missingJoinVoteFrom(discoveryNode)) {
final long term = publishRequest.getAcceptedState().term(); final long term = publishRequest.getAcceptedState().term();
logger.debug("onMissingJoin: no join vote from {}, bumping term to exceed {}", discoveryNode, term); logger.debug("onMissingJoin: no join vote from {}, bumping term to exceed {}", discoveryNode, term);
updateMaxTermSeen(term + 1); updateMaxTermSeen(term + 1);

View File

@ -239,12 +239,18 @@ public abstract class Publication {
if (applyCommitRequest.isPresent()) { if (applyCommitRequest.isPresent()) {
sendApplyCommit(); sendApplyCommit();
} else { } else {
try {
Publication.this.handlePublishResponse(discoveryNode, publishResponse).ifPresent(applyCommit -> { Publication.this.handlePublishResponse(discoveryNode, publishResponse).ifPresent(applyCommit -> {
assert applyCommitRequest.isPresent() == false; assert applyCommitRequest.isPresent() == false;
applyCommitRequest = Optional.of(applyCommit); applyCommitRequest = Optional.of(applyCommit);
ackListener.onCommit(TimeValue.timeValueMillis(currentTimeSupplier.getAsLong() - startTime)); ackListener.onCommit(TimeValue.timeValueMillis(currentTimeSupplier.getAsLong() - startTime));
publicationTargets.stream().filter(PublicationTarget::isWaitingForQuorum).forEach(PublicationTarget::sendApplyCommit); publicationTargets.stream().filter(PublicationTarget::isWaitingForQuorum)
.forEach(PublicationTarget::sendApplyCommit);
}); });
} catch (Exception e) {
setFailed(e);
onPossibleCommitFailure();
}
} }
} }

View File

@ -41,6 +41,8 @@ import java.util.Optional;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import java.util.stream.IntStream; import java.util.stream.IntStream;
import static java.util.Collections.emptyMap;
import static java.util.Collections.emptySet;
import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.equalTo;
@ -734,6 +736,11 @@ public class CoordinationStateTests extends ESTestCase {
public void testVoteCollection() { public void testVoteCollection() {
final CoordinationState.VoteCollection voteCollection = new CoordinationState.VoteCollection(); final CoordinationState.VoteCollection voteCollection = new CoordinationState.VoteCollection();
assertTrue(voteCollection.isEmpty()); assertTrue(voteCollection.isEmpty());
assertFalse(voteCollection.addVote(
new DiscoveryNode("master-ineligible", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT)));
assertTrue(voteCollection.isEmpty());
voteCollection.addVote(node1); voteCollection.addVote(node1);
assertFalse(voteCollection.isEmpty()); assertFalse(voteCollection.isEmpty());
assertTrue(voteCollection.containsVoteFor(node1)); assertTrue(voteCollection.containsVoteFor(node1));

View File

@ -42,6 +42,7 @@ import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.discovery.DiscoveryModule; import org.elasticsearch.discovery.DiscoveryModule;
import org.elasticsearch.discovery.zen.PublishClusterStateStats; import org.elasticsearch.discovery.zen.PublishClusterStateStats;
import org.elasticsearch.gateway.GatewayService; import org.elasticsearch.gateway.GatewayService;
import org.elasticsearch.node.Node;
import org.elasticsearch.test.MockLogAppender; import org.elasticsearch.test.MockLogAppender;
import java.io.IOException; import java.io.IOException;
@ -1207,6 +1208,61 @@ public class CoordinatorTests extends AbstractCoordinatorTestCase {
} }
} }
public void testReconfiguresToExcludeMasterIneligibleNodesInVotingConfig() {
final Cluster cluster = new Cluster(3);
cluster.runRandomly();
cluster.stabilise();
final ClusterNode chosenNode = cluster.getAnyNode();
assertThat(cluster.getAnyLeader().getLastAppliedClusterState().getLastCommittedConfiguration().getNodeIds(),
hasItem(chosenNode.getId()));
assertThat(cluster.getAnyLeader().getLastAppliedClusterState().getLastAcceptedConfiguration().getNodeIds(),
hasItem(chosenNode.getId()));
final boolean chosenNodeIsLeader = chosenNode == cluster.getAnyLeader();
final long termBeforeRestart = cluster.getAnyNode().coordinator.getCurrentTerm();
logger.info("--> restarting [{}] as a master-ineligible node", chosenNode);
chosenNode.close();
cluster.clusterNodes.replaceAll(cn -> cn == chosenNode ? cn.restartedNode(Function.identity(), Function.identity(),
Settings.builder().put(Node.NODE_MASTER_SETTING.getKey(), false).build()) : cn);
cluster.stabilise();
if (chosenNodeIsLeader == false) {
assertThat("term did not change", cluster.getAnyNode().coordinator.getCurrentTerm(), is(termBeforeRestart));
}
assertThat(cluster.getAnyLeader().getLastAppliedClusterState().getLastCommittedConfiguration().getNodeIds(),
not(hasItem(chosenNode.getId())));
assertThat(cluster.getAnyLeader().getLastAppliedClusterState().getLastAcceptedConfiguration().getNodeIds(),
not(hasItem(chosenNode.getId())));
}
public void testDoesNotPerformElectionWhenRestartingFollower() {
final Cluster cluster = new Cluster(randomIntBetween(2, 5), false, Settings.EMPTY);
cluster.runRandomly();
cluster.stabilise();
final ClusterNode leader = cluster.getAnyLeader();
final long expectedTerm = leader.coordinator.getCurrentTerm();
if (cluster.clusterNodes.stream().filter(n -> n.getLocalNode().isMasterNode()).count() == 2) {
// in the 2-node case, auto-shrinking the voting configuration is required to reduce the voting configuration down to just the
// leader, otherwise restarting the other master-eligible node triggers an election
leader.submitSetAutoShrinkVotingConfiguration(true);
cluster.stabilise(2 * DEFAULT_CLUSTER_STATE_UPDATE_DELAY); // first delay for the setting update, second for the reconfiguration
}
for (final ClusterNode clusterNode : cluster.getAllNodesExcept(leader)) {
logger.info("--> restarting {}", clusterNode);
clusterNode.close();
cluster.clusterNodes.replaceAll(cn ->
cn == clusterNode ? cn.restartedNode(Function.identity(), Function.identity(), Settings.EMPTY) : cn);
cluster.stabilise();
assertThat("term should not change", cluster.getAnyNode().coordinator.getCurrentTerm(), is(expectedTerm));
}
}
} }

View File

@ -472,12 +472,16 @@ public class NodeJoinTests extends ESTestCase {
} }
public void testConcurrentJoining() { public void testConcurrentJoining() {
List<DiscoveryNode> nodes = IntStream.rangeClosed(1, randomIntBetween(2, 5)) List<DiscoveryNode> masterNodes = IntStream.rangeClosed(1, randomIntBetween(2, 5))
.mapToObj(nodeId -> newNode(nodeId, true)).collect(Collectors.toList()); .mapToObj(nodeId -> newNode(nodeId, true)).collect(Collectors.toList());
List<DiscoveryNode> otherNodes = IntStream.rangeClosed(masterNodes.size() + 1, masterNodes.size() + 1 + randomIntBetween(0, 5))
.mapToObj(nodeId -> newNode(nodeId, false)).collect(Collectors.toList());
List<DiscoveryNode> allNodes = Stream.concat(masterNodes.stream(), otherNodes.stream()).collect(Collectors.toList());
DiscoveryNode localNode = nodes.get(0); DiscoveryNode localNode = masterNodes.get(0);
VotingConfiguration votingConfiguration = new VotingConfiguration(randomValueOtherThan(singletonList(localNode), VotingConfiguration votingConfiguration = new VotingConfiguration(randomValueOtherThan(singletonList(localNode),
() -> randomSubsetOf(randomIntBetween(1, nodes.size()), nodes)).stream().map(DiscoveryNode::getId).collect(Collectors.toSet())); () -> randomSubsetOf(randomIntBetween(1, masterNodes.size()), masterNodes)).stream()
.map(DiscoveryNode::getId).collect(Collectors.toSet()));
logger.info("Voting configuration: {}", votingConfiguration); logger.info("Voting configuration: {}", votingConfiguration);
@ -489,7 +493,7 @@ public class NodeJoinTests extends ESTestCase {
// we need at least a quorum of voting nodes with a correct term and worse state // we need at least a quorum of voting nodes with a correct term and worse state
List<DiscoveryNode> successfulNodes; List<DiscoveryNode> successfulNodes;
do { do {
successfulNodes = randomSubsetOf(nodes); successfulNodes = randomSubsetOf(allNodes);
} while (votingConfiguration.hasQuorum(successfulNodes.stream().map(DiscoveryNode::getId).collect(Collectors.toList())) } while (votingConfiguration.hasQuorum(successfulNodes.stream().map(DiscoveryNode::getId).collect(Collectors.toList()))
== false); == false);
@ -499,7 +503,7 @@ public class NodeJoinTests extends ESTestCase {
node -> new JoinRequest(node, Optional.of(new Join(node, localNode, newTerm, initialTerm, initialVersion)))) node -> new JoinRequest(node, Optional.of(new Join(node, localNode, newTerm, initialTerm, initialVersion))))
.collect(Collectors.toList()); .collect(Collectors.toList());
List<DiscoveryNode> possiblyUnsuccessfulNodes = new ArrayList<>(nodes); List<DiscoveryNode> possiblyUnsuccessfulNodes = new ArrayList<>(allNodes);
possiblyUnsuccessfulNodes.removeAll(successfulNodes); possiblyUnsuccessfulNodes.removeAll(successfulNodes);
logger.info("Possibly unsuccessful voting nodes: {}", possiblyUnsuccessfulNodes); logger.info("Possibly unsuccessful voting nodes: {}", possiblyUnsuccessfulNodes);
@ -572,8 +576,8 @@ public class NodeJoinTests extends ESTestCase {
assertTrue(MasterServiceTests.discoveryState(masterService).nodes().isLocalNodeElectedMaster()); assertTrue(MasterServiceTests.discoveryState(masterService).nodes().isLocalNodeElectedMaster());
for (DiscoveryNode successfulNode : successfulNodes) { for (DiscoveryNode successfulNode : successfulNodes) {
assertTrue(successfulNode.toString(), clusterStateHasNode(successfulNode)); assertTrue(successfulNode + " joined cluster", clusterStateHasNode(successfulNode));
assertTrue(successfulNode.toString(), coordinator.hasJoinVoteFrom(successfulNode)); assertFalse(successfulNode + " voted for master", coordinator.missingJoinVoteFrom(successfulNode));
} }
} }

View File

@ -64,6 +64,7 @@ import org.elasticsearch.gateway.GatewayService;
import org.elasticsearch.gateway.MetaStateService; import org.elasticsearch.gateway.MetaStateService;
import org.elasticsearch.gateway.MockGatewayMetaState; import org.elasticsearch.gateway.MockGatewayMetaState;
import org.elasticsearch.indices.cluster.FakeThreadPoolMasterService; import org.elasticsearch.indices.cluster.FakeThreadPoolMasterService;
import org.elasticsearch.node.Node;
import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.disruption.DisruptableMockTransport; import org.elasticsearch.test.disruption.DisruptableMockTransport;
import org.elasticsearch.test.disruption.DisruptableMockTransport.ConnectionStatus; import org.elasticsearch.test.disruption.DisruptableMockTransport.ConnectionStatus;
@ -99,6 +100,7 @@ import java.util.stream.Stream;
import static java.util.Collections.emptyList; import static java.util.Collections.emptyList;
import static java.util.Collections.emptySet; import static java.util.Collections.emptySet;
import static java.util.Collections.singleton;
import static org.elasticsearch.cluster.coordination.AbstractCoordinatorTestCase.Cluster.DEFAULT_DELAY_VARIABILITY; import static org.elasticsearch.cluster.coordination.AbstractCoordinatorTestCase.Cluster.DEFAULT_DELAY_VARIABILITY;
import static org.elasticsearch.cluster.coordination.ClusterBootstrapService.BOOTSTRAP_PLACEHOLDER_PREFIX; import static org.elasticsearch.cluster.coordination.ClusterBootstrapService.BOOTSTRAP_PLACEHOLDER_PREFIX;
import static org.elasticsearch.cluster.coordination.CoordinationStateTestCluster.clusterState; import static org.elasticsearch.cluster.coordination.CoordinationStateTestCluster.clusterState;
@ -503,7 +505,8 @@ public class AbstractCoordinatorTestCase extends ESTestCase {
if (isConnectedPair(leader, clusterNode)) { if (isConnectedPair(leader, clusterNode)) {
assertThat(nodeId + " is a follower of " + leaderId, clusterNode.coordinator.getMode(), is(FOLLOWER)); assertThat(nodeId + " is a follower of " + leaderId, clusterNode.coordinator.getMode(), is(FOLLOWER));
assertThat(nodeId + " has the same term as " + leaderId, clusterNode.coordinator.getCurrentTerm(), is(leaderTerm)); assertThat(nodeId + " has the same term as " + leaderId, clusterNode.coordinator.getCurrentTerm(), is(leaderTerm));
assertTrue(nodeId + " has voted for " + leaderId, leader.coordinator.hasJoinVoteFrom(clusterNode.getLocalNode())); assertFalse(nodeId + " is not a missing vote for " + leaderId,
leader.coordinator.missingJoinVoteFrom(clusterNode.getLocalNode()));
assertThat(nodeId + " has the same accepted state as " + leaderId, assertThat(nodeId + " has the same accepted state as " + leaderId,
clusterNode.coordinator.getLastAcceptedState().getVersion(), isEqualToLeaderVersion); clusterNode.coordinator.getLastAcceptedState().getVersion(), isEqualToLeaderVersion);
if (clusterNode.getClusterStateApplyResponse() == ClusterStateApplyResponse.SUCCEED) { if (clusterNode.getClusterStateApplyResponse() == ClusterStateApplyResponse.SUCCEED) {
@ -723,18 +726,59 @@ public class AbstractCoordinatorTestCase extends ESTestCase {
nodeEnvironment = null; nodeEnvironment = null;
BytesStreamOutput outStream = new BytesStreamOutput(); BytesStreamOutput outStream = new BytesStreamOutput();
outStream.setVersion(Version.CURRENT); outStream.setVersion(Version.CURRENT);
final MetaData updatedMetaData = adaptGlobalMetaData.apply(oldState.getLastAcceptedState().metaData());
final ClusterState clusterState; final long persistedCurrentTerm;
if (updatedMetaData != oldState.getLastAcceptedState().metaData()) {
clusterState = ClusterState.builder(oldState.getLastAcceptedState()).metaData(updatedMetaData).build(); if ( // node is master-ineligible either before or after the restart ...
(oldState.getLastAcceptedState().nodes().getLocalNode().isMasterNode() && newLocalNode.isMasterNode()) == false
// ... and it's accepted some non-initial state so we can roll back ...
&& (oldState.getLastAcceptedState().term() > 0L || oldState.getLastAcceptedState().version() > 0L)
// ... and we're feeling lucky ...
&& randomBoolean()) {
// ... then we might not have reliably persisted the cluster state, so emulate a rollback
persistedCurrentTerm = randomLongBetween(0L, oldState.getCurrentTerm());
final long lastAcceptedTerm = oldState.getLastAcceptedState().term();
final long lastAcceptedVersion = oldState.getLastAcceptedState().version();
final long newLastAcceptedTerm;
final long newLastAcceptedVersion;
if (lastAcceptedVersion == 0L) {
newLastAcceptedTerm = randomLongBetween(0L, Math.min(persistedCurrentTerm, lastAcceptedTerm - 1));
newLastAcceptedVersion = randomNonNegativeLong();
} else { } else {
clusterState = oldState.getLastAcceptedState(); newLastAcceptedTerm = randomLongBetween(0L, Math.min(persistedCurrentTerm, lastAcceptedTerm));
newLastAcceptedVersion = randomLongBetween(0L,
newLastAcceptedTerm == lastAcceptedTerm ? lastAcceptedVersion - 1 : Long.MAX_VALUE);
} }
clusterState.writeTo(outStream); final VotingConfiguration newVotingConfiguration
= new VotingConfiguration(randomBoolean() ? emptySet() : singleton(randomAlphaOfLength(10)));
final long newValue = randomLong();
logger.trace("rolling back persisted cluster state on master-ineligible node [{}]: " +
"previously currentTerm={}, lastAcceptedTerm={}, lastAcceptedVersion={} " +
"but now currentTerm={}, lastAcceptedTerm={}, lastAcceptedVersion={}", newLocalNode,
oldState.getCurrentTerm(), lastAcceptedTerm, lastAcceptedVersion,
persistedCurrentTerm, newLastAcceptedTerm, newLastAcceptedVersion);
clusterState(newLastAcceptedTerm, newLastAcceptedVersion, newLocalNode, newVotingConfiguration,
newVotingConfiguration, newValue).writeTo(outStream);
} else {
persistedCurrentTerm = oldState.getCurrentTerm();
final MetaData updatedMetaData = adaptGlobalMetaData.apply(oldState.getLastAcceptedState().metaData());
if (updatedMetaData != oldState.getLastAcceptedState().metaData()) {
ClusterState.builder(oldState.getLastAcceptedState()).metaData(updatedMetaData).build().writeTo(outStream);
} else {
oldState.getLastAcceptedState().writeTo(outStream);
}
}
StreamInput inStream = new NamedWriteableAwareStreamInput(outStream.bytes().streamInput(), StreamInput inStream = new NamedWriteableAwareStreamInput(outStream.bytes().streamInput(),
new NamedWriteableRegistry(ClusterModule.getNamedWriteables())); new NamedWriteableRegistry(ClusterModule.getNamedWriteables()));
// adapt cluster state to new localNode instance and add blocks // adapt cluster state to new localNode instance and add blocks
delegate = new InMemoryPersistedState(adaptCurrentTerm.apply(oldState.getCurrentTerm()), delegate = new InMemoryPersistedState(adaptCurrentTerm.apply(persistedCurrentTerm),
ClusterStateUpdaters.addStateNotRecoveredBlock(ClusterState.readFrom(inStream, newLocalNode))); ClusterStateUpdaters.addStateNotRecoveredBlock(ClusterState.readFrom(inStream, newLocalNode)));
} }
} catch (IOException e) { } catch (IOException e) {
@ -880,7 +924,8 @@ public class AbstractCoordinatorTestCase extends ESTestCase {
final DiscoveryNode newLocalNode = new DiscoveryNode(localNode.getName(), localNode.getId(), final DiscoveryNode newLocalNode = new DiscoveryNode(localNode.getName(), localNode.getId(),
UUIDs.randomBase64UUID(random()), // generated deterministically for repeatable tests UUIDs.randomBase64UUID(random()), // generated deterministically for repeatable tests
address.address().getHostString(), address.getAddress(), address, Collections.emptyMap(), address.address().getHostString(), address.getAddress(), address, Collections.emptyMap(),
localNode.isMasterNode() ? DiscoveryNodeRole.BUILT_IN_ROLES : emptySet(), Version.CURRENT); localNode.isMasterNode() && Node.NODE_MASTER_SETTING.get(nodeSettings)
? DiscoveryNodeRole.BUILT_IN_ROLES : emptySet(), Version.CURRENT);
return new ClusterNode(nodeIndex, newLocalNode, return new ClusterNode(nodeIndex, newLocalNode,
node -> new MockPersistedState(newLocalNode, persistedState, adaptGlobalMetaData, adaptCurrentTerm), nodeSettings); node -> new MockPersistedState(newLocalNode, persistedState, adaptGlobalMetaData, adaptCurrentTerm), nodeSettings);
} }

View File

@ -23,11 +23,13 @@ import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodeRole;
import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.common.UUIDs; import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Optional; import java.util.Optional;
import java.util.Set; import java.util.Set;
@ -36,6 +38,7 @@ import java.util.stream.Collectors;
import static com.carrotsearch.randomizedtesting.RandomizedTest.rarely; import static com.carrotsearch.randomizedtesting.RandomizedTest.rarely;
import static java.util.stream.Collectors.toSet; import static java.util.stream.Collectors.toSet;
import static org.apache.lucene.util.LuceneTestCase.random; import static org.apache.lucene.util.LuceneTestCase.random;
import static org.elasticsearch.test.ESTestCase.randomBoolean;
import static org.elasticsearch.test.ESTestCase.randomFrom; import static org.elasticsearch.test.ESTestCase.randomFrom;
import static org.elasticsearch.test.ESTestCase.randomIntBetween; import static org.elasticsearch.test.ESTestCase.randomIntBetween;
import static org.elasticsearch.test.ESTestCase.randomLong; import static org.elasticsearch.test.ESTestCase.randomLong;
@ -86,10 +89,10 @@ public class CoordinationStateTestCluster {
} }
static class ClusterNode { static class ClusterNode {
final DiscoveryNode localNode;
final CoordinationState.PersistedState persistedState;
private final ElectionStrategy electionStrategy; private final ElectionStrategy electionStrategy;
DiscoveryNode localNode;
CoordinationState.PersistedState persistedState;
CoordinationState state; CoordinationState state;
ClusterNode(DiscoveryNode localNode, ElectionStrategy electionStrategy) { ClusterNode(DiscoveryNode localNode, ElectionStrategy electionStrategy) {
@ -102,6 +105,26 @@ public class CoordinationStateTestCluster {
} }
void reboot() { void reboot() {
if (localNode.isMasterNode() == false && rarely()) {
// master-ineligible nodes can't be trusted to persist the cluster state properly
persistedState = new InMemoryPersistedState(0L,
clusterState(0L, 0L, localNode, CoordinationMetaData.VotingConfiguration.EMPTY_CONFIG,
CoordinationMetaData.VotingConfiguration.EMPTY_CONFIG, 0L));
}
final Set<DiscoveryNodeRole> roles = new HashSet<>(localNode.getRoles());
if (randomBoolean()) {
if (roles.contains(DiscoveryNodeRole.MASTER_ROLE)) {
roles.remove(DiscoveryNodeRole.MASTER_ROLE);
} else {
roles.add(DiscoveryNodeRole.MASTER_ROLE);
}
}
localNode = new DiscoveryNode(localNode.getName(), localNode.getId(), UUIDs.randomBase64UUID(random()),
localNode.getHostName(), localNode.getHostAddress(), localNode.getAddress(), localNode.getAttributes(),
roles, localNode.getVersion());
state = new CoordinationState(localNode, persistedState, electionStrategy); state = new CoordinationState(localNode, persistedState, electionStrategy);
} }