[Zen2] Randomized testing of CoordinationState (#32242)

Simulates a random run of a cluster with multiple CoordinationState instances (each representing
one node), passing messages back and forth, and asserting that the overall system satisfies a given
set of safety properties.

Follow-up to #32171
This commit is contained in:
Yannick Welsch 2018-08-07 16:37:55 +02:00 committed by GitHub
parent 785b6e824c
commit 22c367315a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 197 additions and 0 deletions

View File

@ -388,6 +388,18 @@ public class CoordinationState extends AbstractComponent {
assert getLastCommittedConfiguration().equals(getLastAcceptedConfiguration());
}
public void invariant() {
assert getLastAcceptedTerm() <= getCurrentTerm();
assert electionWon() == isElectionQuorum(joinVotes);
if (electionWon()) {
assert getLastPublishedVersion() >= getLastAcceptedVersion();
} else {
assert getLastPublishedVersion() == 0L;
}
assert electionWon() == false || startedJoinSinceLastReboot;
assert publishVotes.isEmpty() || electionWon();
}
/**
* Pluggable persistence layer for {@link CoordinationState}.
*

View File

@ -35,11 +35,18 @@ import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.EqualsHashCodeTestUtils;
import org.junit.Before;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import static java.util.stream.Collectors.toSet;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasSize;
public class CoordinationStateTests extends ESTestCase {
@ -752,6 +759,10 @@ public class CoordinationStateTests extends ESTestCase {
});
}
public void testSafety() {
new Cluster(randomIntBetween(1, 5)).runRandomly();
}
public static CoordinationState createCoordinationState(PersistedState storage, DiscoveryNode localNode) {
final Settings initialSettings = Settings.builder().put(Node.NODE_NAME_SETTING.getKey(), localNode.getId()).build();
return new CoordinationState(initialSettings, localNode, storage);
@ -827,4 +838,178 @@ public class CoordinationStateTests extends ESTestCase {
return acceptedState;
}
}
static class ClusterNode {
final DiscoveryNode localNode;
final PersistedState persistedState;
CoordinationState state;
ClusterNode(DiscoveryNode localNode) {
this.localNode = localNode;
persistedState = new InMemoryPersistedState(0L,
clusterState(0L, 0L, localNode, VotingConfiguration.EMPTY_CONFIG, VotingConfiguration.EMPTY_CONFIG, 0L));
state = new CoordinationState(Settings.EMPTY, localNode, persistedState);
}
void reboot() {
state = new CoordinationState(Settings.EMPTY, localNode, persistedState);
}
void setInitialState(VotingConfiguration initialConfig, long initialValue) {
final ClusterState.Builder builder = ClusterState.builder(state.getLastAcceptedState()).incrementVersion();
builder.lastAcceptedConfiguration(initialConfig);
builder.lastCommittedConfiguration(initialConfig);
state.setInitialState(setValue(builder.build(), initialValue));
}
}
static class Cluster {
final List<Message> messages;
final List<ClusterNode> clusterNodes;
final VotingConfiguration initialConfiguration;
final long initialValue;
Cluster(int numNodes) {
messages = new ArrayList<>();
clusterNodes = IntStream.range(0, numNodes)
.mapToObj(i -> new DiscoveryNode("node_" + i, buildNewFakeTransportAddress(), Version.CURRENT))
.map(ClusterNode::new)
.collect(Collectors.toList());
initialConfiguration = randomVotingConfig();
initialValue = randomLong();
}
static class Message {
final DiscoveryNode sourceNode;
final DiscoveryNode targetNode;
final Object payload;
Message(DiscoveryNode sourceNode, DiscoveryNode targetNode, Object payload) {
this.sourceNode = sourceNode;
this.targetNode = targetNode;
this.payload = payload;
}
}
void reply(Message m, Object payload) {
messages.add(new Message(m.targetNode, m.sourceNode, payload));
}
void broadcast(DiscoveryNode sourceNode, Object payload) {
messages.addAll(clusterNodes.stream().map(cn -> new Message(sourceNode, cn.localNode, payload)).collect(Collectors.toList()));
}
Optional<ClusterNode> getNode(DiscoveryNode node) {
return clusterNodes.stream().filter(cn -> cn.localNode.equals(node)).findFirst();
}
VotingConfiguration randomVotingConfig() {
return new VotingConfiguration(
randomSubsetOf(randomIntBetween(1, clusterNodes.size()), clusterNodes).stream()
.map(cn -> cn.localNode.getId()).collect(toSet()));
}
void applyMessage(Message message) {
final Optional<ClusterNode> maybeNode = getNode(message.targetNode);
if (maybeNode.isPresent() == false) {
throw new CoordinationStateRejectedException("node not available");
} else {
final Object payload = message.payload;
if (payload instanceof StartJoinRequest) {
reply(message, maybeNode.get().state.handleStartJoin((StartJoinRequest) payload));
} else if (payload instanceof Join) {
maybeNode.get().state.handleJoin((Join) payload);
} else if (payload instanceof PublishRequest) {
reply(message, maybeNode.get().state.handlePublishRequest((PublishRequest) payload));
} else if (payload instanceof PublishResponse) {
maybeNode.get().state.handlePublishResponse(message.sourceNode, (PublishResponse) payload)
.ifPresent(ac -> broadcast(message.targetNode, ac));
} else if (payload instanceof ApplyCommitRequest) {
maybeNode.get().state.handleCommit((ApplyCommitRequest) payload);
} else {
throw new AssertionError("unknown message type");
}
}
}
void runRandomly() {
final int iterations = 10000;
final long maxTerm = 4;
long nextTerm = 1;
for (int i = 0; i < iterations; i++) {
try {
if (rarely() && nextTerm < maxTerm) {
final long term = rarely() ? randomLongBetween(0, maxTerm + 1) : nextTerm++;
final StartJoinRequest startJoinRequest = new StartJoinRequest(randomFrom(clusterNodes).localNode, term);
broadcast(startJoinRequest.getSourceNode(), startJoinRequest);
} else if (rarely()) {
randomFrom(clusterNodes).setInitialState(initialConfiguration, initialValue);
} else if (rarely() && rarely()) {
randomFrom(clusterNodes).reboot();
} else if (rarely()) {
final List<ClusterNode> masterNodes = clusterNodes.stream().filter(cn -> cn.state.electionWon())
.collect(Collectors.toList());
if (masterNodes.isEmpty() == false) {
final ClusterNode clusterNode = randomFrom(masterNodes);
final long term = rarely() ? randomLongBetween(0, maxTerm + 1) : clusterNode.state.getCurrentTerm();
final long version = rarely() ? randomIntBetween(0, 5) : clusterNode.state.getLastPublishedVersion() + 1;
final VotingConfiguration acceptedConfig = rarely() ? randomVotingConfig() :
clusterNode.state.getLastAcceptedConfiguration();
final PublishRequest publishRequest = clusterNode.state.handleClientValue(
clusterState(term, version, clusterNode.localNode, clusterNode.state.getLastCommittedConfiguration(),
acceptedConfig, randomLong()));
broadcast(clusterNode.localNode, publishRequest);
}
} else if (messages.isEmpty() == false) {
applyMessage(randomFrom(messages));
}
// check node invariants after each iteration
clusterNodes.forEach(cn -> cn.state.invariant());
} catch (CoordinationStateRejectedException e) {
// ignore
}
}
// check system invariants. It's sufficient to do this at the end as these invariants are monotonic.
invariant();
}
void invariant() {
// one master per term
messages.stream().filter(m -> m.payload instanceof PublishRequest)
.collect(Collectors.groupingBy(m -> ((PublishRequest) m.payload).getAcceptedState().term()))
.forEach((term, publishMessages) -> {
Set<DiscoveryNode> mastersForTerm = publishMessages.stream().collect(Collectors.groupingBy(m -> m.sourceNode)).keySet();
assertThat("Multiple masters " + mastersForTerm + " for term " + term, mastersForTerm, hasSize(1));
});
// unique cluster state per (term, version) pair
messages.stream().filter(m -> m.payload instanceof PublishRequest)
.map(m -> ((PublishRequest) m.payload).getAcceptedState())
.collect(Collectors.groupingBy(ClusterState::term))
.forEach((term, clusterStates) -> {
clusterStates.stream().collect(Collectors.groupingBy(ClusterState::version))
.forEach((version, clusterStates1) -> {
Set<String> clusterStateUUIDsForTermAndVersion = clusterStates1.stream().collect(Collectors.groupingBy(
ClusterState::stateUUID
)).keySet();
assertThat("Multiple cluster states " + clusterStates1 + " for term " + term + " and version " + version,
clusterStateUUIDsForTermAndVersion, hasSize(1));
Set<Long> clusterStateValuesForTermAndVersion = clusterStates1.stream().collect(Collectors.groupingBy(
CoordinationStateTests::value
)).keySet();
assertThat("Multiple cluster states " + clusterStates1 + " for term " + term + " and version " + version,
clusterStateValuesForTermAndVersion, hasSize(1));
});
});
}
}
}