Add low-level bootstrap implementation (#34345)

Today we inject the initial configuration of the cluster (i.e. the set of
voting nodes) at startup. In reality we must support injecting the initial
configuration after startup too. This commit adds low-level support for doing
so as safely as possible.
This commit is contained in:
David Turner 2018-10-08 15:56:48 +01:00 committed by GitHub
parent ac99d1d66d
commit 52a3a19551
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 164 additions and 36 deletions

View File

@ -132,17 +132,17 @@ public class CoordinationState extends AbstractComponent {
throw new CoordinationStateRejectedException("initial state already set: last-accepted version now " + lastAcceptedVersion);
}
assert getLastAcceptedTerm() == 0;
assert getLastAcceptedConfiguration().isEmpty();
assert getLastCommittedConfiguration().isEmpty();
assert lastPublishedVersion == 0;
assert lastPublishedConfiguration.isEmpty();
assert getLastAcceptedTerm() == 0 : getLastAcceptedTerm();
assert getLastAcceptedConfiguration().isEmpty() : getLastAcceptedConfiguration();
assert getLastCommittedConfiguration().isEmpty() : getLastCommittedConfiguration();
assert lastPublishedVersion == 0 : lastAcceptedVersion;
assert lastPublishedConfiguration.isEmpty() : lastPublishedConfiguration;
assert electionWon == false;
assert joinVotes.isEmpty();
assert publishVotes.isEmpty();
assert joinVotes.isEmpty() : joinVotes;
assert publishVotes.isEmpty() : publishVotes;
assert initialState.term() == 0;
assert initialState.version() == 1;
assert initialState.term() == 0 : initialState;
assert initialState.version() == 1 : initialState;
assert initialState.getLastAcceptedConfiguration().isEmpty() == false;
assert initialState.getLastCommittedConfiguration().isEmpty() == false;

View File

@ -24,6 +24,8 @@ import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterState.Builder;
import org.elasticsearch.cluster.ClusterState.VotingConfiguration;
import org.elasticsearch.cluster.ClusterStateTaskConfig;
import org.elasticsearch.cluster.block.ClusterBlocks;
import org.elasticsearch.cluster.coordination.FollowersChecker.FollowerCheckRequest;
@ -63,6 +65,7 @@ import java.util.Random;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import static org.elasticsearch.discovery.DiscoverySettings.NO_MASTER_BLOCK_WRITES;
import static org.elasticsearch.gateway.GatewayService.STATE_NOT_RECOVERED_BLOCK;
@ -480,6 +483,8 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
assert followersChecker.getFastResponseState().term == getCurrentTerm() : followersChecker.getFastResponseState();
assert followersChecker.getFastResponseState().mode == getMode() : followersChecker.getFastResponseState();
assert (applierState.nodes().getMasterNodeId() == null) == applierState.blocks().hasGlobalBlock(NO_MASTER_BLOCK_WRITES.id());
assert preVoteCollector.getPreVoteResponse().equals(getPreVoteResponse())
: preVoteCollector + " vs " + getPreVoteResponse();
if (mode == Mode.LEADER) {
final boolean becomingMaster = getStateForMasterService().term() != getCurrentTerm();
@ -493,7 +498,6 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
assert leaderCheckScheduler == null : leaderCheckScheduler;
assert applierState.nodes().getMasterNodeId() == null || getLocalNode().equals(applierState.nodes().getMasterNode());
assert preVoteCollector.getLeader() == getLocalNode() : preVoteCollector;
assert preVoteCollector.getPreVoteResponse().equals(getPreVoteResponse()) : preVoteCollector;
final boolean activePublication = currentPublication.map(CoordinatorPublication::isActiveForCurrentLeader).orElse(false);
if (becomingMaster && activePublication == false) {
@ -527,7 +531,6 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
assert followersChecker.getKnownFollowers().isEmpty();
assert currentPublication.map(Publication::isCommitted).orElse(true);
assert preVoteCollector.getLeader().equals(lastKnownLeader.get()) : preVoteCollector;
assert preVoteCollector.getPreVoteResponse().equals(getPreVoteResponse()) : preVoteCollector;
} else {
assert mode == Mode.CANDIDATE;
assert joinAccumulator instanceof JoinHelper.CandidateJoinAccumulator;
@ -540,11 +543,42 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
assert applierState.nodes().getMasterNodeId() == null;
assert currentPublication.map(Publication::isCommitted).orElse(true);
assert preVoteCollector.getLeader() == null : preVoteCollector;
assert preVoteCollector.getPreVoteResponse().equals(getPreVoteResponse()) : preVoteCollector;
}
}
}
/**
 * Injects the initial voting configuration into this node after it has started.
 *
 * The configuration is only accepted if no last-accepted configuration has been set yet, this node is
 * currently a {@code CANDIDATE}, and the local node together with the peers found so far form a quorum
 * of the proposed configuration. On success the configuration is installed as both the last-accepted and
 * last-committed configuration of a state whose version has been incremented, the pre-vote collector is
 * refreshed, and the election scheduler is started.
 *
 * @param votingConfiguration the voting configuration to install as the initial configuration
 * @throws CoordinationStateRejectedException if a configuration has already been set, if this node is not
 *                                            in {@code CANDIDATE} mode, or if the known nodes do not form
 *                                            a quorum of {@code votingConfiguration}
 */
public void setInitialConfiguration(final VotingConfiguration votingConfiguration) {
    synchronized (mutex) {
        final ClusterState stateBeforeBootstrap = getStateForMasterService();

        final boolean configurationAlreadySet = stateBeforeBootstrap.getLastAcceptedConfiguration().isEmpty() == false;
        if (configurationAlreadySet) {
            throw new CoordinationStateRejectedException("Cannot set initial configuration: configuration has already been set");
        }

        // With no configuration accepted yet, the state is expected to be untouched.
        assert stateBeforeBootstrap.term() == 0 : stateBeforeBootstrap;
        assert stateBeforeBootstrap.version() == 0 : stateBeforeBootstrap;

        if (mode != Mode.CANDIDATE) {
            throw new CoordinationStateRejectedException("Cannot set initial configuration in mode " + mode);
        }

        // The nodes we know about are ourselves plus whatever discovery has found so far.
        final List<DiscoveryNode> knownNodes = new ArrayList<>();
        knownNodes.add(getLocalNode());
        peerFinder.getFoundPeers().forEach(knownNodes::add);

        final List<String> knownNodeIds = knownNodes.stream().map(DiscoveryNode::getId).collect(Collectors.toList());
        if (votingConfiguration.hasQuorum(knownNodeIds) == false) {
            throw new CoordinationStateRejectedException("not enough nodes discovered to form a quorum in the initial configuration " +
                "[knownNodes=" + knownNodes + ", " + votingConfiguration + "]");
        }

        logger.info("setting initial configuration to {}", votingConfiguration);

        final Builder initialStateBuilder = masterService.incrementVersion(stateBeforeBootstrap);
        initialStateBuilder.lastAcceptedConfiguration(votingConfiguration);
        initialStateBuilder.lastCommittedConfiguration(votingConfiguration);
        coordinationState.get().setInitialState(initialStateBuilder.build());

        // pick up the change to last-accepted version
        preVoteCollector.update(getPreVoteResponse(), null);
        startElectionScheduler();
    }
}
// for tests
boolean hasJoinVoteFrom(DiscoveryNode localNode) {
return coordinationState.get().containsJoinVoteFor(localNode);
@ -731,25 +765,7 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
if (foundQuorum) {
if (electionScheduler == null) {
final TimeValue gracePeriod = TimeValue.ZERO; // TODO variable grace period
electionScheduler = electionSchedulerFactory.startElectionScheduler(gracePeriod, new Runnable() {
@Override
public void run() {
synchronized (mutex) {
if (mode == Mode.CANDIDATE) {
if (prevotingRound != null) {
prevotingRound.close();
}
prevotingRound = preVoteCollector.start(lastAcceptedState, getDiscoveredNodes());
}
}
}
@Override
public String toString() {
return "scheduling of new prevoting round";
}
});
startElectionScheduler();
}
} else {
closePrevotingAndElectionScheduler();
@ -759,6 +775,30 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
}
}
/**
 * Starts the election scheduler, which must not already be running. Each time the scheduled task runs while
 * this node is a {@code CANDIDATE}, any existing pre-voting round is closed and a new one is started from
 * the current last-accepted state and the currently discovered nodes.
 */
private void startElectionScheduler() {
    assert electionScheduler == null : electionScheduler;

    final TimeValue gracePeriod = TimeValue.ZERO; // TODO variable grace period
    final Runnable prevotingRoundStarter = new Runnable() {
        @Override
        public void run() {
            synchronized (mutex) {
                if (mode != Mode.CANDIDATE) {
                    // Only candidates run pre-voting rounds.
                    return;
                }

                if (prevotingRound != null) {
                    prevotingRound.close();
                }

                // Read the last-accepted state at run time rather than capturing it when scheduling.
                final ClusterState lastAcceptedState = coordinationState.get().getLastAcceptedState();
                prevotingRound = preVoteCollector.start(lastAcceptedState, getDiscoveredNodes());
            }
        }

        @Override
        public String toString() {
            return "scheduling of new prevoting round";
        }
    };

    electionScheduler = electionSchedulerFactory.startElectionScheduler(gracePeriod, prevotingRoundStarter);
}
class CoordinatorPublication extends Publication {
private final PublishRequest publishRequest;

View File

@ -329,7 +329,7 @@ public class MasterService extends AbstractLifecycleComponent {
return newClusterState;
}
/**
 * Creates a builder seeded from the given cluster state and increments its version.
 *
 * @param clusterState the cluster state to copy into the builder
 * @return a builder based on {@code clusterState} on which {@code incrementVersion()} has been called
 */
protected Builder incrementVersion(ClusterState clusterState) {
public Builder incrementVersion(ClusterState clusterState) {
return ClusterState.builder(clusterState).incrementVersion();
}

View File

@ -51,6 +51,7 @@ import org.junit.Before;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@ -84,12 +85,15 @@ import static org.elasticsearch.discovery.PeerFinder.DISCOVERY_FIND_PEERS_INTERV
import static org.elasticsearch.node.Node.NODE_NAME_SETTING;
import static org.elasticsearch.transport.TransportService.HANDSHAKE_ACTION_NAME;
import static org.elasticsearch.transport.TransportService.NOOP_TRANSPORT_INTERCEPTOR;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.endsWith;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.lessThanOrEqualTo;
import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.startsWith;
@TestLogging("org.elasticsearch.cluster.coordination:TRACE,org.elasticsearch.discovery:TRACE")
public class CoordinatorTests extends ESTestCase {
@ -404,6 +408,61 @@ public class CoordinatorTests extends ESTestCase {
// assertTrue("expected ack from " + follower1, ackCollector.hasAckedSuccessfully(follower1));
}
/**
 * Checks that every node starts out as an unbootstrapped candidate (term 0, version 0, empty configurations)
 * and that applying the initial configuration to any one node lets the cluster stabilise.
 */
public void testSettingInitialConfigurationTriggersElection() {
    final Cluster cluster = new Cluster(randomIntBetween(1, 5));

    final long discoveryPhaseMillis = defaultMillis(DISCOVERY_FIND_PEERS_INTERVAL_SETTING) * 2 + randomLongBetween(0, 60000);
    cluster.runFor(discoveryPhaseMillis, "initial discovery phase");

    // Before bootstrapping, no node may have progressed beyond its pristine state.
    for (final ClusterNode clusterNode : cluster.clusterNodes) {
        final String nodeId = clusterNode.getId();
        final Coordinator nodeCoordinator = clusterNode.coordinator;

        assertThat(nodeId + " is CANDIDATE", nodeCoordinator.getMode(), is(CANDIDATE));
        assertThat(nodeId + " is in term 0", nodeCoordinator.getCurrentTerm(), is(0L));
        assertThat(nodeId + " last accepted in term 0", nodeCoordinator.getLastAcceptedState().term(), is(0L));
        assertThat(nodeId + " last accepted version 0", nodeCoordinator.getLastAcceptedState().version(), is(0L));
        assertTrue(nodeId + " has an empty last-accepted configuration",
            nodeCoordinator.getLastAcceptedState().getLastAcceptedConfiguration().isEmpty());
        assertTrue(nodeId + " has an empty last-committed configuration",
            nodeCoordinator.getLastAcceptedState().getLastCommittedConfiguration().isEmpty());
    }

    cluster.getAnyNode().applyInitialConfiguration();

    // The first election should succeed: only one node knows of the initial configuration, so only that node
    // can win a pre-voting round and proceed to an election, and there cannot be any collisions.
    // TODO this wait is unnecessary, we could trigger the election immediately
    final long electionMillis = defaultMillis(ELECTION_INITIAL_TIMEOUT_SETTING);
    // Allow two round-trips, one for pre-voting and one for voting.
    final long votingRoundTripsMillis = 4 * DEFAULT_DELAY_VARIABILITY;
    // Then a commit of the new leader's first cluster state.
    final long firstCommitMillis = DEFAULT_CLUSTER_STATE_UPDATE_DELAY;

    cluster.stabilise(electionMillis + votingRoundTripsMillis + firstCommitMillis);
}
/**
 * Checks that a node which already has a configuration rejects a further attempt to set an initial
 * configuration, even when the proposed configuration is its own last-committed one.
 */
public void testCannotSetInitialConfigurationTwice() {
    final Cluster cluster = new Cluster(randomIntBetween(1, 5));
    cluster.runRandomly();
    cluster.stabilise();

    final Coordinator coordinator = cluster.getAnyNode().coordinator;
    final String rejectionMessage = expectThrows(
        CoordinationStateRejectedException.class,
        () -> coordinator.setInitialConfiguration(coordinator.getLastAcceptedState().getLastCommittedConfiguration())
    ).getMessage();

    assertThat(rejectionMessage, is("Cannot set initial configuration: configuration has already been set"));
}
/**
 * Checks that an initial configuration is rejected when the nodes known to the coordinator do not form a
 * quorum of it, and that a subsequent attempt with a configuration consisting of the local node succeeds.
 */
public void testCannotSetInitialConfigurationWithoutQuorum() {
    final Cluster cluster = new Cluster(randomIntBetween(1, 5));
    final Coordinator coordinator = cluster.getAnyNode().coordinator;

    final VotingConfiguration unknownNodeConfiguration = new VotingConfiguration(Collections.singleton("unknown-node"));
    final CoordinationStateRejectedException rejection = expectThrows(CoordinationStateRejectedException.class,
        () -> coordinator.setInitialConfiguration(unknownNodeConfiguration));
    final String rejectionMessage = rejection.getMessage();

    assertThat(rejectionMessage,
        startsWith("not enough nodes discovered to form a quorum in the initial configuration [knownNodes=["));
    assertThat(rejectionMessage,
        endsWith("], VotingConfiguration{unknown-node}]"));
    assertThat(rejectionMessage, containsString(coordinator.getLocalNode().toString()));

    // This is VERY BAD: we are now setting a _different_ initial configuration. Yet it works as long as the
    // first attempt can never form a quorum.
    final VotingConfiguration localNodeConfiguration
        = new VotingConfiguration(Collections.singleton(coordinator.getLocalNode().getId()));
    coordinator.setInitialConfiguration(localNodeConfiguration);
    cluster.stabilise();
}
/**
 * Returns the value of the given time setting under empty settings, in milliseconds, plus
 * {@code Cluster.DEFAULT_DELAY_VARIABILITY}.
 *
 * @param setting the time-valued setting to read
 * @return the setting's value for {@code Settings.EMPTY} in milliseconds, padded by the default delay variability
 */
private static long defaultMillis(Setting<TimeValue> setting) {
    final TimeValue valueForEmptySettings = setting.get(Settings.EMPTY);
    return valueForEmptySettings.millis() + Cluster.DEFAULT_DELAY_VARIABILITY;
}
@ -555,6 +614,14 @@ public class CoordinatorTests extends ESTestCase {
}
break;
}
} else if (rarely()) {
final ClusterNode clusterNode = getAnyNode();
onNode(clusterNode.getLocalNode(),
() -> {
logger.debug("----> [runRandomly {}] applying initial configuration {} to {}",
thisStep, initialConfiguration, clusterNode.getId());
clusterNode.coordinator.setInitialConfiguration(initialConfiguration);
}).run();
} else {
if (deterministicTaskQueue.hasDeferredTasks() && randomBoolean()) {
deterministicTaskQueue.advanceTime();
@ -566,7 +633,6 @@ public class CoordinatorTests extends ESTestCase {
// TODO other random steps:
// - reboot a node
// - abdicate leadership
// - bootstrap
} catch (CoordinationStateRejectedException ignored) {
// This is ok: it just means a message couldn't currently be handled.
@ -606,6 +672,17 @@ public class CoordinatorTests extends ESTestCase {
void stabilise(long stabilisationDurationMillis) {
assertThat("stabilisation requires default delay variability (and proper cleanup of raised variability)",
deterministicTaskQueue.getExecutionDelayVariabilityMillis(), lessThanOrEqualTo(DEFAULT_DELAY_VARIABILITY));
if (clusterNodes.stream().allMatch(n -> n.coordinator.getLastAcceptedState().getLastAcceptedConfiguration().isEmpty())) {
assertThat("setting initial configuration may fail with disconnected nodes", disconnectedNodes, empty());
assertThat("setting initial configuration may fail with blackholed nodes", blackholedNodes, empty());
runFor(defaultMillis(DISCOVERY_FIND_PEERS_INTERVAL_SETTING) * 2, "discovery prior to setting initial configuration");
final ClusterNode bootstrapNode = getAnyNode();
bootstrapNode.applyInitialConfiguration();
} else {
logger.info("setting initial configuration not required");
}
runFor(stabilisationDurationMillis, "stabilising");
fixLag();
assertUniqueLeaderAndExpectedModes();
@ -706,7 +783,7 @@ public class CoordinatorTests extends ESTestCase {
/**
 * Collects the cluster nodes for which {@code isLeader} holds, asserts that there is at least one, and
 * returns one of them chosen via {@code randomFrom}.
 *
 * @return one of the nodes that currently reports itself as leader
 */
ClusterNode getAnyLeader() {
List<ClusterNode> allLeaders = clusterNodes.stream().filter(ClusterNode::isLeader).collect(Collectors.toList());
assertThat(allLeaders, not(empty()));
assertThat("leaders", allLeaders, not(empty()));
return randomFrom(allLeaders);
}
@ -759,8 +836,8 @@ public class CoordinatorTests extends ESTestCase {
super(Settings.builder().put(NODE_NAME_SETTING.getKey(), nodeIdFromIndex(nodeIndex)).build());
this.nodeIndex = nodeIndex;
localNode = createDiscoveryNode();
persistedState = new InMemoryPersistedState(1L,
clusterState(1L, 1L, localNode, initialConfiguration, initialConfiguration, 0L));
persistedState = new InMemoryPersistedState(0L,
clusterState(0L, 0L, localNode, VotingConfiguration.EMPTY_CONFIG, VotingConfiguration.EMPTY_CONFIG, 0L));
onNode(localNode, this::setUp).run();
}
@ -917,6 +994,17 @@ public class CoordinatorTests extends ESTestCase {
return clusterApplier.lastAppliedClusterState;
}
/**
 * Attempts to set {@code initialConfiguration} on this node's coordinator, running the attempt via
 * {@code onNode} for the local node. A {@code CoordinationStateRejectedException} is not propagated: both
 * success and rejection are only logged.
 */
void applyInitialConfiguration() {
    final Runnable bootstrapAttempt = () -> {
        try {
            coordinator.setInitialConfiguration(initialConfiguration);
            logger.info("successfully set initial configuration to {}", initialConfiguration);
        } catch (CoordinationStateRejectedException e) {
            // Rejection is an expected outcome here, so record it and carry on.
            logger.info(new ParameterizedMessage("failed to set initial configuration to {}", initialConfiguration), e);
        }
    };
    onNode(localNode, bootstrapAttempt).run();
}
private class FakeClusterApplier implements ClusterApplier {
final ClusterName clusterName;