From 52a3a1955157c4179c743b8d8dd9cd8571982fdd Mon Sep 17 00:00:00 2001 From: David Turner Date: Mon, 8 Oct 2018 15:56:48 +0100 Subject: [PATCH] Add low-level bootstrap implementation (#34345) Today we inject the initial configuration of the cluster (i.e. the set of voting nodes) at startup. In reality we must support injecting the initial configuration after startup too. This commit adds low-level support for doing so as safely as possible. --- .../coordination/CoordinationState.java | 18 ++-- .../cluster/coordination/Coordinator.java | 84 +++++++++++----- .../cluster/service/MasterService.java | 2 +- .../coordination/CoordinatorTests.java | 96 ++++++++++++++++++- 4 files changed, 164 insertions(+), 36 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/CoordinationState.java b/server/src/main/java/org/elasticsearch/cluster/coordination/CoordinationState.java index d5b9cdf6adf..fb7071e7080 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/CoordinationState.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/CoordinationState.java @@ -132,17 +132,17 @@ public class CoordinationState extends AbstractComponent { throw new CoordinationStateRejectedException("initial state already set: last-accepted version now " + lastAcceptedVersion); } - assert getLastAcceptedTerm() == 0; - assert getLastAcceptedConfiguration().isEmpty(); - assert getLastCommittedConfiguration().isEmpty(); - assert lastPublishedVersion == 0; - assert lastPublishedConfiguration.isEmpty(); + assert getLastAcceptedTerm() == 0 : getLastAcceptedTerm(); + assert getLastAcceptedConfiguration().isEmpty() : getLastAcceptedConfiguration(); + assert getLastCommittedConfiguration().isEmpty() : getLastCommittedConfiguration(); + assert lastPublishedVersion == 0 : lastAcceptedVersion; + assert lastPublishedConfiguration.isEmpty() : lastPublishedConfiguration; assert electionWon == false; - assert joinVotes.isEmpty(); - assert 
publishVotes.isEmpty(); + assert joinVotes.isEmpty() : joinVotes; + assert publishVotes.isEmpty() : publishVotes; - assert initialState.term() == 0; - assert initialState.version() == 1; + assert initialState.term() == 0 : initialState; + assert initialState.version() == 1 : initialState; assert initialState.getLastAcceptedConfiguration().isEmpty() == false; assert initialState.getLastCommittedConfiguration().isEmpty() == false; diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java b/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java index a8e21da705c..3d5e8900738 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java @@ -24,6 +24,8 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.ClusterState.Builder; +import org.elasticsearch.cluster.ClusterState.VotingConfiguration; import org.elasticsearch.cluster.ClusterStateTaskConfig; import org.elasticsearch.cluster.block.ClusterBlocks; import org.elasticsearch.cluster.coordination.FollowersChecker.FollowerCheckRequest; @@ -63,6 +65,7 @@ import java.util.Random; import java.util.Set; import java.util.concurrent.atomic.AtomicLong; import java.util.function.Supplier; +import java.util.stream.Collectors; import static org.elasticsearch.discovery.DiscoverySettings.NO_MASTER_BLOCK_WRITES; import static org.elasticsearch.gateway.GatewayService.STATE_NOT_RECOVERED_BLOCK; @@ -480,6 +483,8 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery assert followersChecker.getFastResponseState().term == getCurrentTerm() : followersChecker.getFastResponseState(); assert followersChecker.getFastResponseState().mode == getMode() : 
followersChecker.getFastResponseState(); assert (applierState.nodes().getMasterNodeId() == null) == applierState.blocks().hasGlobalBlock(NO_MASTER_BLOCK_WRITES.id()); + assert preVoteCollector.getPreVoteResponse().equals(getPreVoteResponse()) + : preVoteCollector + " vs " + getPreVoteResponse(); if (mode == Mode.LEADER) { final boolean becomingMaster = getStateForMasterService().term() != getCurrentTerm(); @@ -493,7 +498,6 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery assert leaderCheckScheduler == null : leaderCheckScheduler; assert applierState.nodes().getMasterNodeId() == null || getLocalNode().equals(applierState.nodes().getMasterNode()); assert preVoteCollector.getLeader() == getLocalNode() : preVoteCollector; - assert preVoteCollector.getPreVoteResponse().equals(getPreVoteResponse()) : preVoteCollector; final boolean activePublication = currentPublication.map(CoordinatorPublication::isActiveForCurrentLeader).orElse(false); if (becomingMaster && activePublication == false) { @@ -527,7 +531,6 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery assert followersChecker.getKnownFollowers().isEmpty(); assert currentPublication.map(Publication::isCommitted).orElse(true); assert preVoteCollector.getLeader().equals(lastKnownLeader.get()) : preVoteCollector; - assert preVoteCollector.getPreVoteResponse().equals(getPreVoteResponse()) : preVoteCollector; } else { assert mode == Mode.CANDIDATE; assert joinAccumulator instanceof JoinHelper.CandidateJoinAccumulator; @@ -540,11 +543,42 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery assert applierState.nodes().getMasterNodeId() == null; assert currentPublication.map(Publication::isCommitted).orElse(true); assert preVoteCollector.getLeader() == null : preVoteCollector; - assert preVoteCollector.getPreVoteResponse().equals(getPreVoteResponse()) : preVoteCollector; } } } + public void setInitialConfiguration(final 
VotingConfiguration votingConfiguration) { + synchronized (mutex) { + final ClusterState currentState = getStateForMasterService(); + + if (currentState.getLastAcceptedConfiguration().isEmpty() == false) { + throw new CoordinationStateRejectedException("Cannot set initial configuration: configuration has already been set"); + } + assert currentState.term() == 0 : currentState; + assert currentState.version() == 0 : currentState; + + if (mode != Mode.CANDIDATE) { + throw new CoordinationStateRejectedException("Cannot set initial configuration in mode " + mode); + } + + final List<DiscoveryNode> knownNodes = new ArrayList<>(); + knownNodes.add(getLocalNode()); + peerFinder.getFoundPeers().forEach(knownNodes::add); + if (votingConfiguration.hasQuorum(knownNodes.stream().map(DiscoveryNode::getId).collect(Collectors.toList())) == false) { + throw new CoordinationStateRejectedException("not enough nodes discovered to form a quorum in the initial configuration " + + "[knownNodes=" + knownNodes + ", " + votingConfiguration + "]"); + } + + logger.info("setting initial configuration to {}", votingConfiguration); + final Builder builder = masterService.incrementVersion(currentState); + builder.lastAcceptedConfiguration(votingConfiguration); + builder.lastCommittedConfiguration(votingConfiguration); + coordinationState.get().setInitialState(builder.build()); + preVoteCollector.update(getPreVoteResponse(), null); // pick up the change to last-accepted version + startElectionScheduler(); + } + } + // for tests boolean hasJoinVoteFrom(DiscoveryNode localNode) { return coordinationState.get().containsJoinVoteFor(localNode); @@ -731,25 +765,7 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery if (foundQuorum) { if (electionScheduler == null) { - final TimeValue gracePeriod = TimeValue.ZERO; // TODO variable grace period - electionScheduler = electionSchedulerFactory.startElectionScheduler(gracePeriod, new Runnable() { - @Override - public void run() { - 
synchronized (mutex) { - if (mode == Mode.CANDIDATE) { - if (prevotingRound != null) { - prevotingRound.close(); - } - prevotingRound = preVoteCollector.start(lastAcceptedState, getDiscoveredNodes()); - } - } - } - - @Override - public String toString() { - return "scheduling of new prevoting round"; - } - }); + startElectionScheduler(); } } else { closePrevotingAndElectionScheduler(); @@ -759,6 +775,30 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery } } + private void startElectionScheduler() { + assert electionScheduler == null : electionScheduler; + final TimeValue gracePeriod = TimeValue.ZERO; // TODO variable grace period + electionScheduler = electionSchedulerFactory.startElectionScheduler(gracePeriod, new Runnable() { + @Override + public void run() { + synchronized (mutex) { + if (mode == Mode.CANDIDATE) { + if (prevotingRound != null) { + prevotingRound.close(); + } + final ClusterState lastAcceptedState = coordinationState.get().getLastAcceptedState(); + prevotingRound = preVoteCollector.start(lastAcceptedState, getDiscoveredNodes()); + } + } + } + + @Override + public String toString() { + return "scheduling of new prevoting round"; + } + }); + } + class CoordinatorPublication extends Publication { private final PublishRequest publishRequest; diff --git a/server/src/main/java/org/elasticsearch/cluster/service/MasterService.java b/server/src/main/java/org/elasticsearch/cluster/service/MasterService.java index 59e4fc38522..8719baeff9d 100644 --- a/server/src/main/java/org/elasticsearch/cluster/service/MasterService.java +++ b/server/src/main/java/org/elasticsearch/cluster/service/MasterService.java @@ -329,7 +329,7 @@ public class MasterService extends AbstractLifecycleComponent { return newClusterState; } - protected Builder incrementVersion(ClusterState clusterState) { + public Builder incrementVersion(ClusterState clusterState) { return ClusterState.builder(clusterState).incrementVersion(); } diff --git 
a/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java b/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java index 21bca351f9c..3221bc4d985 100644 --- a/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java @@ -51,6 +51,7 @@ import org.junit.Before; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -84,12 +85,15 @@ import static org.elasticsearch.discovery.PeerFinder.DISCOVERY_FIND_PEERS_INTERV import static org.elasticsearch.node.Node.NODE_NAME_SETTING; import static org.elasticsearch.transport.TransportService.HANDSHAKE_ACTION_NAME; import static org.elasticsearch.transport.TransportService.NOOP_TRANSPORT_INTERCEPTOR; +import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.empty; +import static org.hamcrest.Matchers.endsWith; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.lessThanOrEqualTo; import static org.hamcrest.Matchers.not; +import static org.hamcrest.Matchers.startsWith; @TestLogging("org.elasticsearch.cluster.coordination:TRACE,org.elasticsearch.discovery:TRACE") public class CoordinatorTests extends ESTestCase { @@ -404,6 +408,61 @@ public class CoordinatorTests extends ESTestCase { // assertTrue("expected ack from " + follower1, ackCollector.hasAckedSuccessfully(follower1)); } + public void testSettingInitialConfigurationTriggersElection() { + final Cluster cluster = new Cluster(randomIntBetween(1, 5)); + cluster.runFor(defaultMillis(DISCOVERY_FIND_PEERS_INTERVAL_SETTING) * 2 + randomLongBetween(0, 60000), "initial discovery phase"); + for (final ClusterNode clusterNode : cluster.clusterNodes) { + final 
String nodeId = clusterNode.getId(); + assertThat(nodeId + " is CANDIDATE", clusterNode.coordinator.getMode(), is(CANDIDATE)); + assertThat(nodeId + " is in term 0", clusterNode.coordinator.getCurrentTerm(), is(0L)); + assertThat(nodeId + " last accepted in term 0", clusterNode.coordinator.getLastAcceptedState().term(), is(0L)); + assertThat(nodeId + " last accepted version 0", clusterNode.coordinator.getLastAcceptedState().version(), is(0L)); + assertTrue(nodeId + " has an empty last-accepted configuration", + clusterNode.coordinator.getLastAcceptedState().getLastAcceptedConfiguration().isEmpty()); + assertTrue(nodeId + " has an empty last-committed configuration", + clusterNode.coordinator.getLastAcceptedState().getLastCommittedConfiguration().isEmpty()); + } + + cluster.getAnyNode().applyInitialConfiguration(); + cluster.stabilise(defaultMillis( + // the first election should succeed, because only one node knows of the initial configuration and therefore can win a + // pre-voting round and proceed to an election, so there cannot be any collisions + ELECTION_INITIAL_TIMEOUT_SETTING) // TODO this wait is unnecessary, we could trigger the election immediately + // Allow two round-trip for pre-voting and voting + + 4 * DEFAULT_DELAY_VARIABILITY + // Then a commit of the new leader's first cluster state + + DEFAULT_CLUSTER_STATE_UPDATE_DELAY); + } + + public void testCannotSetInitialConfigurationTwice() { + final Cluster cluster = new Cluster(randomIntBetween(1, 5)); + cluster.runRandomly(); + cluster.stabilise(); + + final Coordinator coordinator = cluster.getAnyNode().coordinator; + final CoordinationStateRejectedException exception = expectThrows(CoordinationStateRejectedException.class, + () -> coordinator.setInitialConfiguration(coordinator.getLastAcceptedState().getLastCommittedConfiguration())); + + assertThat(exception.getMessage(), is("Cannot set initial configuration: configuration has already been set")); + } + + public void 
testCannotSetInitialConfigurationWithoutQuorum() { + final Cluster cluster = new Cluster(randomIntBetween(1, 5)); + final Coordinator coordinator = cluster.getAnyNode().coordinator; + final VotingConfiguration unknownNodeConfiguration = new VotingConfiguration(Collections.singleton("unknown-node")); + final String exceptionMessage = expectThrows(CoordinationStateRejectedException.class, + () -> coordinator.setInitialConfiguration(unknownNodeConfiguration)).getMessage(); + assertThat(exceptionMessage, + startsWith("not enough nodes discovered to form a quorum in the initial configuration [knownNodes=[")); + assertThat(exceptionMessage, + endsWith("], VotingConfiguration{unknown-node}]")); + assertThat(exceptionMessage, containsString(coordinator.getLocalNode().toString())); + + // This is VERY BAD: setting a _different_ initial configuration. Yet it works if the first attempt will never be a quorum. + coordinator.setInitialConfiguration(new VotingConfiguration(Collections.singleton(coordinator.getLocalNode().getId()))); + cluster.stabilise(); + } + private static long defaultMillis(Setting<TimeValue> setting) { return setting.get(Settings.EMPTY).millis() + Cluster.DEFAULT_DELAY_VARIABILITY; } @@ -555,6 +614,14 @@ public class CoordinatorTests extends ESTestCase { } break; } + } else if (rarely()) { + final ClusterNode clusterNode = getAnyNode(); + onNode(clusterNode.getLocalNode(), + () -> { + logger.debug("----> [runRandomly {}] applying initial configuration {} to {}", + thisStep, initialConfiguration, clusterNode.getId()); + clusterNode.coordinator.setInitialConfiguration(initialConfiguration); + }).run(); } else { if (deterministicTaskQueue.hasDeferredTasks() && randomBoolean()) { deterministicTaskQueue.advanceTime(); @@ -566,7 +633,6 @@ public class CoordinatorTests extends ESTestCase { // TODO other random steps: // - reboot a node // - abdicate leadership - // - bootstrap } catch (CoordinationStateRejectedException ignored) { // This is ok: it just means a message 
couldn't currently be handled. @@ -606,6 +672,17 @@ public class CoordinatorTests extends ESTestCase { void stabilise(long stabilisationDurationMillis) { assertThat("stabilisation requires default delay variability (and proper cleanup of raised variability)", deterministicTaskQueue.getExecutionDelayVariabilityMillis(), lessThanOrEqualTo(DEFAULT_DELAY_VARIABILITY)); + + if (clusterNodes.stream().allMatch(n -> n.coordinator.getLastAcceptedState().getLastAcceptedConfiguration().isEmpty())) { + assertThat("setting initial configuration may fail with disconnected nodes", disconnectedNodes, empty()); + assertThat("setting initial configuration may fail with blackholed nodes", blackholedNodes, empty()); + runFor(defaultMillis(DISCOVERY_FIND_PEERS_INTERVAL_SETTING) * 2, "discovery prior to setting initial configuration"); + final ClusterNode bootstrapNode = getAnyNode(); + bootstrapNode.applyInitialConfiguration(); + } else { + logger.info("setting initial configuration not required"); + } + runFor(stabilisationDurationMillis, "stabilising"); fixLag(); assertUniqueLeaderAndExpectedModes(); @@ -706,7 +783,7 @@ public class CoordinatorTests extends ESTestCase { ClusterNode getAnyLeader() { List<ClusterNode> allLeaders = clusterNodes.stream().filter(ClusterNode::isLeader).collect(Collectors.toList()); - assertThat(allLeaders, not(empty())); + assertThat("leaders", allLeaders, not(empty())); return randomFrom(allLeaders); } @@ -759,8 +836,8 @@ public class CoordinatorTests extends ESTestCase { super(Settings.builder().put(NODE_NAME_SETTING.getKey(), nodeIdFromIndex(nodeIndex)).build()); this.nodeIndex = nodeIndex; localNode = createDiscoveryNode(); - persistedState = new InMemoryPersistedState(1L, - clusterState(1L, 1L, localNode, initialConfiguration, initialConfiguration, 0L)); + persistedState = new InMemoryPersistedState(0L, + clusterState(0L, 0L, localNode, VotingConfiguration.EMPTY_CONFIG, VotingConfiguration.EMPTY_CONFIG, 0L)); onNode(localNode, this::setUp).run(); } @@ -917,6 
+994,17 @@ public class CoordinatorTests extends ESTestCase { return clusterApplier.lastAppliedClusterState; } + void applyInitialConfiguration() { + onNode(localNode, () -> { + try { + coordinator.setInitialConfiguration(initialConfiguration); + logger.info("successfully set initial configuration to {}", initialConfiguration); + } catch (CoordinationStateRejectedException e) { + logger.info(new ParameterizedMessage("failed to set initial configuration to {}", initialConfiguration), e); + } + }).run(); + } + private class FakeClusterApplier implements ClusterApplier { final ClusterName clusterName;