From 9d417984bde50608f4c58b76dfbed64e3e2ff17e Mon Sep 17 00:00:00 2001 From: David Turner Date: Fri, 7 Dec 2018 17:23:20 +0000 Subject: [PATCH] [Zen2] Add warning if cluster fails to form fast enough (#35993) * Add warning if cluster fails to form fast enough Today if a leader is not discovered or elected then nodes are essentially silent at INFO and above, and log copiously at DEBUG and below. A short delay when electing a leader is not unusual, for instance if other nodes have not yet started, but a persistent failure to elect a leader is a problem worthy of log messages in the default configuration. With this change, while there is no leader each node outputs a WARN-level log message every 10 seconds (by default) indicating as such, describing the current discovery state and the current quorum(s). * Add note about whether the discovered nodes form a quorum or not * Introduce separate ClusterFormationFailureHelper ... and back out the unnecessary changes elsewhere * It can be volatile --- .../ClusterFormationFailureHelper.java | 209 +++++++++++++ .../cluster/coordination/Coordinator.java | 19 +- .../common/settings/ClusterSettings.java | 4 +- .../elasticsearch/discovery/PeerFinder.java | 11 +- .../ClusterFormationFailureHelperTests.java | 294 ++++++++++++++++++ 5 files changed, 532 insertions(+), 5 deletions(-) create mode 100644 server/src/main/java/org/elasticsearch/cluster/coordination/ClusterFormationFailureHelper.java create mode 100644 server/src/test/java/org/elasticsearch/cluster/coordination/ClusterFormationFailureHelperTests.java diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/ClusterFormationFailureHelper.java b/server/src/main/java/org/elasticsearch/cluster/coordination/ClusterFormationFailureHelper.java new file mode 100644 index 00000000000..79142ec07c1 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/ClusterFormationFailureHelper.java @@ -0,0 +1,209 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.cluster.coordination; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.elasticsearch.Version; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.coordination.CoordinationMetaData.VotingConfiguration; +import org.elasticsearch.cluster.coordination.CoordinationState.VoteCollection; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.common.Nullable; +import org.elasticsearch.common.settings.Setting; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.transport.TransportAddress; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.util.concurrent.AbstractRunnable; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.threadpool.ThreadPool.Names; + +import java.util.List; +import java.util.Locale; +import java.util.Set; +import java.util.function.Supplier; +import java.util.stream.Collectors; +import java.util.stream.StreamSupport; + +import static org.elasticsearch.cluster.coordination.ClusterBootstrapService.INITIAL_MASTER_NODES_SETTING; +import static org.elasticsearch.cluster.coordination.ClusterBootstrapService.INITIAL_MASTER_NODE_COUNT_SETTING; + +public class ClusterFormationFailureHelper { + private static final Logger logger = LogManager.getLogger(ClusterFormationFailureHelper.class); + + public static final Setting DISCOVERY_CLUSTER_FORMATION_WARNING_TIMEOUT_SETTING = + Setting.timeSetting("discovery.cluster_formation_warning_timeout", + TimeValue.timeValueMillis(10000), TimeValue.timeValueMillis(1), Setting.Property.NodeScope); + + private final Supplier clusterFormationStateSupplier; + private final ThreadPool threadPool; + private final TimeValue clusterFormationWarningTimeout; + @Nullable // if no warning is scheduled + private volatile WarningScheduler warningScheduler; + + public ClusterFormationFailureHelper(Settings settings, Supplier clusterFormationStateSupplier, + ThreadPool threadPool) { + this.clusterFormationStateSupplier = clusterFormationStateSupplier; + this.threadPool = threadPool; + this.clusterFormationWarningTimeout = DISCOVERY_CLUSTER_FORMATION_WARNING_TIMEOUT_SETTING.get(settings); + } + + public boolean isRunning() { + return warningScheduler != null; + } + + public void start() { + assert warningScheduler == null; + warningScheduler = new WarningScheduler(); + warningScheduler.scheduleNextWarning(); + } + + public void stop() { + warningScheduler = null; + } + + private class WarningScheduler { + + private boolean isActive() { + return warningScheduler == this; + } + + void scheduleNextWarning() { + threadPool.scheduleUnlessShuttingDown(clusterFormationWarningTimeout, Names.GENERIC, new AbstractRunnable() { + @Override + public void onFailure(Exception e) { + logger.debug("unexpected exception scheduling cluster formation warning", e); + } + + @Override + protected void doRun() { + if (isActive()) { + logger.warn(clusterFormationStateSupplier.get().getDescription()); + } + } + + @Override + public void onAfter() { + if (isActive()) { + scheduleNextWarning(); + } + } + + @Override + public String toString() { + return "emit warning if cluster not formed"; + } + }); + } + } + + static class ClusterFormationState { + private final Settings settings; + private final ClusterState clusterState; + private final List resolvedAddresses; + private final List foundPeers; + + ClusterFormationState(Settings settings, ClusterState clusterState, List resolvedAddresses, + List foundPeers) { + this.settings = settings; + this.clusterState = clusterState; + this.resolvedAddresses = resolvedAddresses; + this.foundPeers = foundPeers; + } + + String getDescription() { + final List clusterStateNodes + = StreamSupport.stream(clusterState.nodes().spliterator(), false).map(DiscoveryNode::toString).collect(Collectors.toList()); + + final String discoveryWillContinueDescription = String.format(Locale.ROOT, + "discovery will continue using %s from hosts providers and %s from last-known cluster state", + resolvedAddresses, clusterStateNodes); + + final String discoveryStateIgnoringQuorum = String.format(Locale.ROOT, "have discovered %s; %s", + foundPeers, discoveryWillContinueDescription); + + if (clusterState.nodes().getLocalNode().isMasterNode() == false) { + return String.format(Locale.ROOT, "master not discovered yet: %s", discoveryStateIgnoringQuorum); + } + + if (clusterState.getLastAcceptedConfiguration().isEmpty()) { + + // TODO handle the case that there is a 6.x node around here, when rolling upgrades are supported + + final String bootstrappingDescription; + + if (INITIAL_MASTER_NODE_COUNT_SETTING.get(Settings.EMPTY).equals(INITIAL_MASTER_NODE_COUNT_SETTING.get(settings)) + && INITIAL_MASTER_NODES_SETTING.get(Settings.EMPTY).equals(INITIAL_MASTER_NODES_SETTING.get(settings))) { + bootstrappingDescription = "cluster bootstrapping is disabled on this node"; + } else if (INITIAL_MASTER_NODES_SETTING.get(Settings.EMPTY).equals(INITIAL_MASTER_NODES_SETTING.get(settings))) { + bootstrappingDescription = String.format(Locale.ROOT, + "this node must discover at least [%d] master-eligible nodes to bootstrap a cluster", + INITIAL_MASTER_NODE_COUNT_SETTING.get(settings)); + } else if (INITIAL_MASTER_NODE_COUNT_SETTING.get(settings) <= INITIAL_MASTER_NODES_SETTING.get(settings).size()) { + // TODO update this when we can bootstrap on only a quorum of the initial nodes + bootstrappingDescription = String.format(Locale.ROOT, + "this node must discover master-eligible nodes %s to bootstrap a cluster", + INITIAL_MASTER_NODES_SETTING.get(settings)); + } else { + // TODO update this when we can bootstrap on only a quorum of the initial nodes + bootstrappingDescription = String.format(Locale.ROOT, + "this node must discover at least [%d] master-eligible nodes, including %s, to bootstrap a cluster", + INITIAL_MASTER_NODE_COUNT_SETTING.get(settings), INITIAL_MASTER_NODES_SETTING.get(settings)); + } + + return String.format(Locale.ROOT, + "master not discovered yet, this node has not previously joined a bootstrapped (v%d+) cluster, and %s: %s", + Version.V_6_6_0.major + 1, bootstrappingDescription, discoveryStateIgnoringQuorum); + } + + assert clusterState.getLastCommittedConfiguration().isEmpty() == false; + + final String quorumDescription; + if (clusterState.getLastAcceptedConfiguration().equals(clusterState.getLastCommittedConfiguration())) { + quorumDescription = describeQuorum(clusterState.getLastAcceptedConfiguration()); + } else { + quorumDescription = describeQuorum(clusterState.getLastAcceptedConfiguration()) + + " and " + + describeQuorum(clusterState.getLastCommittedConfiguration()); + } + + final VoteCollection voteCollection = new VoteCollection(); + foundPeers.forEach(voteCollection::addVote); + final String isQuorumOrNot + = CoordinationState.isElectionQuorum(voteCollection, clusterState) ? "is a quorum" : "is not a quorum"; + + return String.format(Locale.ROOT, + "master not discovered or elected yet, an election requires %s, have discovered %s which %s; %s", + quorumDescription, foundPeers, isQuorumOrNot, discoveryWillContinueDescription); + } + + private String describeQuorum(VotingConfiguration votingConfiguration) { + final Set nodeIds = votingConfiguration.getNodeIds(); + assert nodeIds.isEmpty() == false; + + if (nodeIds.size() == 1) { + return "a node with id " + nodeIds; + } else if (nodeIds.size() == 2) { + return "two nodes with ids " + nodeIds; + } else { + return "at least " + (nodeIds.size() / 2 + 1) + " nodes with ids from " + nodeIds; + } + } + } +} diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java b/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java index ec73ee25088..ef4fd454a11 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java @@ -32,8 +32,9 @@ import org.elasticsearch.cluster.ClusterState.Builder; import org.elasticsearch.cluster.ClusterStateTaskConfig; import org.elasticsearch.cluster.ClusterStateUpdateTask; import org.elasticsearch.cluster.block.ClusterBlocks; -import org.elasticsearch.cluster.coordination.CoordinationMetaData.VotingConfiguration; +import org.elasticsearch.cluster.coordination.ClusterFormationFailureHelper.ClusterFormationState; import org.elasticsearch.cluster.coordination.CoordinationMetaData.VotingConfigExclusion; +import org.elasticsearch.cluster.coordination.CoordinationMetaData.VotingConfiguration; import org.elasticsearch.cluster.coordination.FollowersChecker.FollowerCheckRequest; import org.elasticsearch.cluster.coordination.JoinHelper.InitialJoinAccumulator; import org.elasticsearch.cluster.metadata.MetaData; @@ -121,6 +122,7 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery private final Reconfigurator reconfigurator; private final ClusterBootstrapService clusterBootstrapService; private final LagDetector lagDetector; + private final ClusterFormationFailureHelper clusterFormationFailureHelper; private Mode mode; private Optional lastKnownLeader; @@ -161,6 +163,13 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery this.clusterBootstrapService = new ClusterBootstrapService(settings, transportService); this.lagDetector = new LagDetector(settings, transportService.getThreadPool(), n -> removeNode(n, "lagging"), transportService::getLocalNode); + this.clusterFormationFailureHelper = new ClusterFormationFailureHelper(settings, this::getClusterFormationState, + transportService.getThreadPool()); + } + + private ClusterFormationState getClusterFormationState() { + return new ClusterFormationState(settings, getStateForMasterService(), peerFinder.getLastResolvedAddresses(), + StreamSupport.stream(peerFinder.getFoundPeers().spliterator(), false).collect(Collectors.toList())); } private Runnable getOnLeaderFailure() { @@ -374,6 +383,7 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery joinAccumulator = joinHelper.new CandidateJoinAccumulator(); peerFinder.activate(coordinationState.get().getLastAcceptedState().nodes()); + clusterFormationFailureHelper.start(); leaderChecker.setCurrentNodes(DiscoveryNodes.EMPTY_NODES); leaderChecker.updateLeader(null); @@ -404,6 +414,7 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery lastKnownLeader = Optional.of(getLocalNode()); peerFinder.deactivate(getLocalNode()); + clusterFormationFailureHelper.stop(); closePrevotingAndElectionScheduler(); preVoteCollector.update(getPreVoteResponse(), getLocalNode()); @@ -428,6 +439,7 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery lastKnownLeader = Optional.of(leaderNode); peerFinder.deactivate(leaderNode); + clusterFormationFailureHelper.stop(); closePrevotingAndElectionScheduler(); cancelActivePublication(); preVoteCollector.update(getPreVoteResponse(), leaderNode); @@ -543,6 +555,7 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery assert leaderChecker.leader() == null : leaderChecker.leader(); assert applierState.nodes().getMasterNodeId() == null || getLocalNode().equals(applierState.nodes().getMasterNode()); assert preVoteCollector.getLeader() == getLocalNode() : preVoteCollector; + assert clusterFormationFailureHelper.isRunning() == false; final boolean activePublication = currentPublication.map(CoordinatorPublication::isActiveForCurrentLeader).orElse(false); if (becomingMaster && activePublication == false) { @@ -582,6 +595,7 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery assert followersChecker.getKnownFollowers().isEmpty(); assert currentPublication.map(Publication::isCommitted).orElse(true); assert preVoteCollector.getLeader().equals(lastKnownLeader.get()) : preVoteCollector; + assert clusterFormationFailureHelper.isRunning() == false; } else { assert mode == Mode.CANDIDATE; assert joinAccumulator instanceof JoinHelper.CandidateJoinAccumulator; @@ -594,6 +608,7 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery assert applierState.nodes().getMasterNodeId() == null; assert currentPublication.map(Publication::isCommitted).orElse(true); assert preVoteCollector.getLeader() == null : preVoteCollector; + assert clusterFormationFailureHelper.isRunning(); } } } @@ -823,7 +838,7 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery Strings.toString(clusterChangedEvent.previousState()).equals( Strings.toString(clusterStateWithNoMasterBlock(coordinationState.get().getLastAcceptedState()))) : Strings.toString(clusterChangedEvent.previousState()) + " vs " - + Strings.toString(clusterStateWithNoMasterBlock(coordinationState.get().getLastAcceptedState())); + + Strings.toString(clusterStateWithNoMasterBlock(coordinationState.get().getLastAcceptedState())); final ClusterState clusterState = clusterChangedEvent.state(); diff --git a/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java b/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java index 401cdd9059b..7775f1de1a2 100644 --- a/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java +++ b/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java @@ -33,11 +33,12 @@ import org.elasticsearch.cluster.InternalClusterInfoService; import org.elasticsearch.cluster.NodeConnectionsService; import org.elasticsearch.cluster.action.index.MappingUpdatedAction; import org.elasticsearch.cluster.coordination.ClusterBootstrapService; -import org.elasticsearch.cluster.coordination.LagDetector; +import org.elasticsearch.cluster.coordination.ClusterFormationFailureHelper; import org.elasticsearch.cluster.coordination.Coordinator; import org.elasticsearch.cluster.coordination.ElectionSchedulerFactory; import org.elasticsearch.cluster.coordination.FollowersChecker; import org.elasticsearch.cluster.coordination.JoinHelper; +import org.elasticsearch.cluster.coordination.LagDetector; import org.elasticsearch.cluster.coordination.LeaderChecker; import org.elasticsearch.cluster.coordination.Reconfigurator; import org.elasticsearch.cluster.metadata.IndexGraveyard; @@ -457,6 +458,7 @@ public final class ClusterSettings extends AbstractScopedSettings { EnableAssignmentDecider.CLUSTER_TASKS_ALLOCATION_ENABLE_SETTING, PeerFinder.DISCOVERY_FIND_PEERS_INTERVAL_SETTING, PeerFinder.DISCOVERY_REQUEST_PEERS_TIMEOUT_SETTING, + ClusterFormationFailureHelper.DISCOVERY_CLUSTER_FORMATION_WARNING_TIMEOUT_SETTING, ElectionSchedulerFactory.ELECTION_INITIAL_TIMEOUT_SETTING, ElectionSchedulerFactory.ELECTION_BACK_OFF_TIME_SETTING, ElectionSchedulerFactory.ELECTION_MAX_TIMEOUT_SETTING, diff --git a/server/src/main/java/org/elasticsearch/discovery/PeerFinder.java b/server/src/main/java/org/elasticsearch/discovery/PeerFinder.java index f1e930f8e2e..36090f645f6 100644 --- a/server/src/main/java/org/elasticsearch/discovery/PeerFinder.java +++ b/server/src/main/java/org/elasticsearch/discovery/PeerFinder.java @@ -62,6 +62,7 @@ import java.util.Optional; import java.util.function.Consumer; import java.util.stream.Collectors; +import static java.util.Collections.emptyList; import static org.elasticsearch.common.util.concurrent.ConcurrentCollections.newConcurrentMap; public abstract class PeerFinder { @@ -94,6 +95,7 @@ public abstract class PeerFinder { private DiscoveryNodes lastAcceptedNodes; private final Map peersByAddress = newConcurrentMap(); private Optional leader = Optional.empty(); + private volatile List lastResolvedAddresses = emptyList(); public PeerFinder(Settings settings, TransportService transportService, TransportAddressConnector transportAddressConnector, ConfiguredHostsResolver configuredHostsResolver) { @@ -164,7 +166,7 @@ public abstract class PeerFinder { knownPeers = getFoundPeersUnderLock(); } else { assert leader.isPresent() || lastAcceptedNodes == null; - knownPeers = Collections.emptyList(); + knownPeers = emptyList(); } return new PeersResponse(leader, knownPeers, currentTerm); } @@ -207,6 +209,10 @@ public abstract class PeerFinder { */ protected abstract void onFoundPeersUpdated(); + public List getLastResolvedAddresses() { + return lastResolvedAddresses; + } + public interface TransportAddressConnector { /** * Identify the node at the given address and, if it is a master node and not the local node then establish a full connection to it. @@ -266,6 +272,7 @@ public abstract class PeerFinder { configuredHostsResolver.resolveConfiguredHosts(providedAddresses -> { synchronized (mutex) { + lastResolvedAddresses = providedAddresses; logger.trace("probing resolved transport addresses {}", providedAddresses); providedAddresses.forEach(this::startProbe); } @@ -495,7 +502,7 @@ public abstract class PeerFinder { @Override public void messageReceived(UnicastZenPing.UnicastPingRequest request, TransportChannel channel, Task task) throws Exception { final PeersRequest peersRequest = new PeersRequest(request.pingResponse.node(), - Optional.ofNullable(request.pingResponse.master()).map(Collections::singletonList).orElse(Collections.emptyList())); + Optional.ofNullable(request.pingResponse.master()).map(Collections::singletonList).orElse(emptyList())); final PeersResponse peersResponse = handlePeersRequest(peersRequest); final List pingResponses = new ArrayList<>(); final ClusterName clusterName = ClusterName.CLUSTER_NAME_SETTING.get(settings); diff --git a/server/src/test/java/org/elasticsearch/cluster/coordination/ClusterFormationFailureHelperTests.java b/server/src/test/java/org/elasticsearch/cluster/coordination/ClusterFormationFailureHelperTests.java new file mode 100644 index 00000000000..40b15709fb7 --- /dev/null +++ b/server/src/test/java/org/elasticsearch/cluster/coordination/ClusterFormationFailureHelperTests.java @@ -0,0 +1,294 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.cluster.coordination; + +import org.elasticsearch.Version; +import org.elasticsearch.cluster.ClusterName; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.coordination.ClusterFormationFailureHelper.ClusterFormationState; +import org.elasticsearch.cluster.coordination.CoordinationMetaData.VotingConfiguration; +import org.elasticsearch.cluster.metadata.MetaData; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.node.DiscoveryNodes; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.transport.TransportAddress; +import org.elasticsearch.test.ESTestCase; + +import java.util.Arrays; +import java.util.concurrent.atomic.AtomicLong; +import java.util.stream.Collectors; + +import static java.util.Collections.emptyList; +import static java.util.Collections.emptyMap; +import static java.util.Collections.emptySet; +import static java.util.Collections.singletonList; +import static org.elasticsearch.cluster.coordination.ClusterBootstrapService.INITIAL_MASTER_NODES_SETTING; +import static org.elasticsearch.cluster.coordination.ClusterBootstrapService.INITIAL_MASTER_NODE_COUNT_SETTING; +import static org.elasticsearch.node.Node.NODE_NAME_SETTING; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.is; + +public class ClusterFormationFailureHelperTests extends ESTestCase { + public void testScheduling() { + final long expectedDelayMillis; + final Settings.Builder settingsBuilder = Settings.builder(); + if (randomBoolean()) { + expectedDelayMillis + = ClusterFormationFailureHelper.DISCOVERY_CLUSTER_FORMATION_WARNING_TIMEOUT_SETTING.get(Settings.EMPTY).millis(); + } else { + expectedDelayMillis = randomLongBetween(100, 100000); + settingsBuilder.put(ClusterFormationFailureHelper.DISCOVERY_CLUSTER_FORMATION_WARNING_TIMEOUT_SETTING.getKey(), + expectedDelayMillis + "ms"); + } + + final DiscoveryNode localNode = new DiscoveryNode("local", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT); + final ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT) + .nodes(DiscoveryNodes.builder().add(localNode).localNodeId(localNode.getId())).build(); + + final DeterministicTaskQueue deterministicTaskQueue + = new DeterministicTaskQueue(Settings.builder().put(NODE_NAME_SETTING.getKey(), "node").build(), random()); + + final AtomicLong warningCount = new AtomicLong(); + + final ClusterFormationFailureHelper clusterFormationFailureHelper = new ClusterFormationFailureHelper(settingsBuilder.build(), + () -> { + warningCount.incrementAndGet(); + return new ClusterFormationState(Settings.EMPTY, clusterState, emptyList(), emptyList()); + }, + deterministicTaskQueue.getThreadPool()); + + deterministicTaskQueue.runAllTasks(); + assertThat("should not schedule anything yet", warningCount.get(), is(0L)); + + final long startTimeMillis = deterministicTaskQueue.getCurrentTimeMillis(); + clusterFormationFailureHelper.start(); + + while (warningCount.get() == 0) { + assertTrue(clusterFormationFailureHelper.isRunning()); + if (deterministicTaskQueue.hasRunnableTasks()) { + deterministicTaskQueue.runRandomTask(); + } else { + deterministicTaskQueue.advanceTime(); + } + } + assertThat(warningCount.get(), is(1L)); + assertThat(deterministicTaskQueue.getCurrentTimeMillis() - startTimeMillis, is(expectedDelayMillis)); + + while (warningCount.get() < 5) { + assertTrue(clusterFormationFailureHelper.isRunning()); + if (deterministicTaskQueue.hasRunnableTasks()) { + deterministicTaskQueue.runRandomTask(); + } else { + deterministicTaskQueue.advanceTime(); + } + } + assertThat(deterministicTaskQueue.getCurrentTimeMillis() - startTimeMillis, equalTo(5 * expectedDelayMillis)); + + clusterFormationFailureHelper.stop(); + assertFalse(clusterFormationFailureHelper.isRunning()); + deterministicTaskQueue.runAllTasksInTimeOrder(); + + assertThat(warningCount.get(), is(5L)); + + warningCount.set(0); + clusterFormationFailureHelper.start(); + clusterFormationFailureHelper.stop(); + clusterFormationFailureHelper.start(); + final long secondStartTimeMillis = deterministicTaskQueue.getCurrentTimeMillis(); + + while (warningCount.get() < 5) { + assertTrue(clusterFormationFailureHelper.isRunning()); + if (deterministicTaskQueue.hasRunnableTasks()) { + deterministicTaskQueue.runRandomTask(); + } else { + deterministicTaskQueue.advanceTime(); + } + } + assertThat(deterministicTaskQueue.getCurrentTimeMillis() - secondStartTimeMillis, equalTo(5 * expectedDelayMillis)); + + clusterFormationFailureHelper.stop(); + assertFalse(clusterFormationFailureHelper.isRunning()); + deterministicTaskQueue.runAllTasksInTimeOrder(); + + assertThat(warningCount.get(), is(5L)); + } + + public void testDescriptionOnMasterIneligibleNodes() { + final DiscoveryNode localNode = new DiscoveryNode("local", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT); + final ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT) + .nodes(DiscoveryNodes.builder().add(localNode).localNodeId(localNode.getId())).build(); + + assertThat(new ClusterFormationState(Settings.EMPTY, clusterState, emptyList(), emptyList()).getDescription(), + is("master not discovered yet: have discovered []; discovery will continue using [] from hosts providers and [" + localNode + + "] from last-known cluster state")); + + final TransportAddress otherAddress = buildNewFakeTransportAddress(); + assertThat(new ClusterFormationState(Settings.EMPTY, clusterState, singletonList(otherAddress), emptyList()).getDescription(), + is("master not discovered yet: have discovered []; discovery will continue using [" + otherAddress + + "] from hosts providers and [" + localNode + "] from last-known cluster state")); + + final DiscoveryNode otherNode = new DiscoveryNode("other", buildNewFakeTransportAddress(), Version.CURRENT); + assertThat(new ClusterFormationState(Settings.EMPTY, clusterState, emptyList(), singletonList(otherNode)).getDescription(), + is("master not discovered yet: have discovered [" + otherNode + "]; discovery will continue using [] from hosts providers and [" + + localNode + "] from last-known cluster state")); + } + + public void testDescriptionBeforeBootstrapping() { + final DiscoveryNode localNode = new DiscoveryNode("local", buildNewFakeTransportAddress(), Version.CURRENT); + final ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT) + .nodes(DiscoveryNodes.builder().add(localNode).localNodeId(localNode.getId())).build(); + + assertThat(new ClusterFormationState(Settings.EMPTY, clusterState, emptyList(), emptyList()).getDescription(), + is("master not discovered yet, this node has not previously joined a bootstrapped (v7+) cluster, and " + + "cluster bootstrapping is disabled on this node: have discovered []; " + + "discovery will continue using [] from hosts providers and [" + localNode + "] from last-known cluster state")); + + final TransportAddress otherAddress = buildNewFakeTransportAddress(); + assertThat(new ClusterFormationState(Settings.EMPTY, clusterState, singletonList(otherAddress), emptyList()).getDescription(), + is("master not discovered yet, this node has not previously joined a bootstrapped (v7+) cluster, and " + + "cluster bootstrapping is disabled on this node: have discovered []; " + + "discovery will continue using [" + otherAddress + "] from hosts providers and [" + localNode + + "] from last-known cluster state")); + + final DiscoveryNode otherNode = new DiscoveryNode("other", buildNewFakeTransportAddress(), Version.CURRENT); + assertThat(new ClusterFormationState(Settings.EMPTY, clusterState, emptyList(), singletonList(otherNode)).getDescription(), + is("master not discovered yet, this node has not previously joined a bootstrapped (v7+) cluster, and " + + "cluster bootstrapping is disabled on this node: have discovered [" + otherNode + "]; " + + "discovery will continue using [] from hosts providers and [" + localNode + "] from last-known cluster state")); + + assertThat(new ClusterFormationState(Settings.builder().put(INITIAL_MASTER_NODE_COUNT_SETTING.getKey(), 2).build(), + clusterState, emptyList(), emptyList()).getDescription(), + is("master not discovered yet, this node has not previously joined a bootstrapped (v7+) cluster, and " + + "this node must discover at least [2] master-eligible nodes to bootstrap a cluster: have discovered []; " + + "discovery will continue using [] from hosts providers and [" + localNode + "] from last-known cluster state")); + + assertThat(new ClusterFormationState(Settings.builder().putList(INITIAL_MASTER_NODES_SETTING.getKey(), "other").build(), + clusterState, emptyList(), emptyList()).getDescription(), + is("master not discovered yet, this node has not previously joined a bootstrapped (v7+) cluster, and " + + "this node must discover master-eligible nodes [other] to bootstrap a cluster: have discovered []; " + + "discovery will continue using [] from hosts providers and [" + localNode + "] from last-known cluster state")); + + assertThat(new ClusterFormationState(Settings.builder().putList(INITIAL_MASTER_NODES_SETTING.getKey(), "other") + .put(INITIAL_MASTER_NODE_COUNT_SETTING.getKey(), 1).build(), + clusterState, emptyList(), emptyList()).getDescription(), + is("master not discovered yet, this node has not previously joined a bootstrapped (v7+) cluster, and " + + "this node must discover master-eligible nodes [other] to bootstrap a cluster: have discovered []; " + + "discovery will continue using [] from hosts providers and [" + localNode + "] from last-known cluster state")); + + assertThat(new ClusterFormationState(Settings.builder().putList(INITIAL_MASTER_NODES_SETTING.getKey(), "other") + .put(INITIAL_MASTER_NODE_COUNT_SETTING.getKey(), 2).build(), + clusterState, emptyList(), emptyList()).getDescription(), + is("master not discovered yet, this node has not previously joined a bootstrapped (v7+) cluster, and " + + "this node must discover at least [2] master-eligible nodes, including [other], to bootstrap a cluster: have discovered " + + "[]; discovery will continue using [] from hosts providers and [" + localNode + "] from last-known cluster state")); + } + + private static VotingConfiguration config(String[] nodeIds) { + return new VotingConfiguration(Arrays.stream(nodeIds).collect(Collectors.toSet())); + } + + private static ClusterState state(DiscoveryNode localNode, String... configNodeIds) { + return state(localNode, configNodeIds, configNodeIds); + } + + private static ClusterState state(DiscoveryNode localNode, String[] acceptedConfig, String[] committedConfig) { + return ClusterState.builder(ClusterName.DEFAULT) + .nodes(DiscoveryNodes.builder().add(localNode).localNodeId(localNode.getId())) + .metaData(MetaData.builder().coordinationMetaData(CoordinationMetaData.builder() + .lastAcceptedConfiguration(config(acceptedConfig)) + .lastCommittedConfiguration(config(committedConfig)).build())).build(); + } + + public void testDescriptionAfterBootstrapping() { + final DiscoveryNode localNode = new DiscoveryNode("local", buildNewFakeTransportAddress(), Version.CURRENT); + + final ClusterState clusterState = state(localNode, "otherNode"); + + assertThat(new ClusterFormationState(Settings.EMPTY, clusterState, emptyList(), emptyList()).getDescription(), + is("master not discovered or elected yet, an election requires a node with id [otherNode], " + + "have discovered [] which is not a quorum; " + + "discovery will continue using [] from hosts providers and [" + localNode + "] from last-known cluster state")); + + final TransportAddress otherAddress = buildNewFakeTransportAddress(); + assertThat(new ClusterFormationState(Settings.EMPTY, clusterState, singletonList(otherAddress), emptyList()).getDescription(), + is("master not discovered or elected yet, an election requires a node with id [otherNode], " + + "have discovered [] which is not a quorum; " + + "discovery will continue using [" + otherAddress + "] from hosts providers and [" + localNode + + "] from last-known cluster state")); + + final DiscoveryNode otherNode = new DiscoveryNode("otherNode", buildNewFakeTransportAddress(), Version.CURRENT); + assertThat(new ClusterFormationState(Settings.EMPTY, clusterState, emptyList(), singletonList(otherNode)).getDescription(), + is("master not discovered or elected yet, an election requires a node with id [otherNode], " + + "have discovered [" + otherNode + "] which is a quorum; " + + "discovery will continue using [] from hosts providers and [" + localNode + "] from last-known cluster state")); + + final DiscoveryNode yetAnotherNode = new DiscoveryNode("yetAnotherNode", buildNewFakeTransportAddress(), Version.CURRENT); + assertThat(new ClusterFormationState(Settings.EMPTY, clusterState, emptyList(), singletonList(yetAnotherNode)).getDescription(), + is("master not discovered or elected yet, an election requires a node with id [otherNode], " + + "have discovered [" + yetAnotherNode + "] which is not a quorum; " + + "discovery will continue using [] from hosts providers and [" + localNode + "] from last-known cluster state")); + + assertThat(new ClusterFormationState(Settings.EMPTY, state(localNode, "n1", "n2"), emptyList(), emptyList()).getDescription(), + is("master not discovered or elected yet, an election requires two nodes with ids [n1, n2], " + + "have discovered [] which is not a quorum; " + + "discovery will continue using [] from hosts providers and [" + localNode + "] from last-known cluster state")); + + assertThat(new ClusterFormationState(Settings.EMPTY, state(localNode, "n1", "n2", "n3"), emptyList(), emptyList()).getDescription(), + is("master not discovered or elected yet, an election requires at least 2 nodes with ids from [n1, n2, n3], " + + "have discovered [] which is not a quorum; " + + "discovery will continue using [] from hosts providers and [" + localNode + "] from last-known cluster state")); + + assertThat(new ClusterFormationState(Settings.EMPTY, state(localNode, "n1", "n2", "n3", "n4"), emptyList(), emptyList()) + .getDescription(), + is("master not discovered or elected yet, an election requires at least 3 nodes with ids from [n1, n2, n3, n4], " + + "have discovered [] which is not a quorum; " + + "discovery will continue using [] from hosts providers and [" + localNode + "] from last-known cluster state")); + + assertThat(new ClusterFormationState(Settings.EMPTY, state(localNode, "n1", "n2", "n3", "n4", "n5"), emptyList(), emptyList()) + .getDescription(), + is("master not discovered or elected yet, an election requires at least 3 nodes with ids from [n1, n2, n3, n4, n5], " + + "have discovered [] which is not a quorum; " + + "discovery will continue using [] from hosts providers and [" + localNode + "] from last-known cluster state")); + + assertThat(new ClusterFormationState(Settings.EMPTY, state(localNode, new String[]{"n1"}, new String[]{"n1"}), + emptyList(), emptyList()).getDescription(), + is("master not discovered or elected yet, an election requires a node with id [n1], " + + "have discovered [] which is not a quorum; " + + "discovery will continue using [] from hosts providers and [" + localNode + "] from last-known cluster state")); + + assertThat(new ClusterFormationState(Settings.EMPTY, state(localNode, new String[]{"n1"}, new String[]{"n2"}), + emptyList(), emptyList()).getDescription(), + is("master not discovered or elected yet, an election requires a node with id [n1] and a node with id [n2], " + + "have discovered [] which is not a quorum; " + + "discovery will continue using [] from hosts providers and [" + localNode + "] from last-known cluster state")); + + assertThat(new ClusterFormationState(Settings.EMPTY, state(localNode, new String[]{"n1"}, new String[]{"n2", "n3"}), + emptyList(), emptyList()).getDescription(), + is("master not discovered or elected yet, an election requires a node with id [n1] and two nodes with ids [n2, n3], " + + "have discovered [] which is not a quorum; " + + "discovery will continue using [] from hosts providers and [" + localNode + "] from last-known cluster state")); + + assertThat(new ClusterFormationState(Settings.EMPTY, state(localNode, new String[]{"n1"}, new String[]{"n2", "n3", "n4"}), + emptyList(), emptyList()).getDescription(), + is("master not discovered or elected yet, an election requires a node with id [n1] and " + + "at least 2 nodes with ids from [n2, n3, n4], " + + "have discovered [] which is not a quorum; " + + "discovery will continue using [] from hosts providers and [" + localNode + "] from last-known cluster state")); + } +}