diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java b/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java index 972cf28ecc1..17601aeedf0 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java @@ -119,6 +119,7 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery private long maxTermSeen; private final Reconfigurator reconfigurator; private final ClusterBootstrapService clusterBootstrapService; + private final LagDetector lagDetector; private Mode mode; private Optional lastKnownLeader; @@ -157,6 +158,8 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery masterService.setClusterStateSupplier(this::getStateForMasterService); this.reconfigurator = new Reconfigurator(settings, clusterSettings); this.clusterBootstrapService = new ClusterBootstrapService(settings, transportService); + this.lagDetector = new LagDetector(settings, transportService.getThreadPool(), n -> removeNode(n, "lagging"), + transportService::getLocalNode); } private Runnable getOnLeaderFailure() { @@ -374,6 +377,7 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery followersChecker.clearCurrentNodes(); followersChecker.updateFastResponseState(getCurrentTerm(), mode); + lagDetector.clearTrackedNodes(); if (applierState.nodes().getMasterNodeId() != null) { applierState = clusterStateWithNoMasterBlock(applierState); @@ -428,6 +432,7 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery followersChecker.clearCurrentNodes(); followersChecker.updateFastResponseState(getCurrentTerm(), mode); + lagDetector.clearTrackedNodes(); } private PreVoteResponse getPreVoteResponse() { @@ -512,6 +517,11 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery assert (applierState.nodes().getMasterNodeId() == null) == applierState.blocks().hasGlobalBlock(NO_MASTER_BLOCK_WRITES.id()); assert preVoteCollector.getPreVoteResponse().equals(getPreVoteResponse()) : preVoteCollector + " vs " + getPreVoteResponse(); + + assert lagDetector.getTrackedNodes().contains(getLocalNode()) == false : lagDetector.getTrackedNodes(); + assert followersChecker.getKnownFollowers().equals(lagDetector.getTrackedNodes()) + : followersChecker.getKnownFollowers() + " vs " + lagDetector.getTrackedNodes(); + if (mode == Mode.LEADER) { final boolean becomingMaster = getStateForMasterService().term() != getCurrentTerm(); @@ -831,8 +841,10 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery } }); - leaderChecker.setCurrentNodes(publishRequest.getAcceptedState().nodes()); - followersChecker.setCurrentNodes(publishRequest.getAcceptedState().nodes()); + final DiscoveryNodes publishNodes = publishRequest.getAcceptedState().nodes(); + leaderChecker.setCurrentNodes(publishNodes); + followersChecker.setCurrentNodes(publishNodes); + lagDetector.setTrackedNodes(publishNodes); publication.start(followersChecker.getFaultyNodes()); } } catch (Exception e) { @@ -985,6 +997,9 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery } } else { ackListener.onNodeAck(node, e); + if (e == null) { + lagDetector.setAppliedVersion(node, publishRequest.getAcceptedState().version()); + } } } }, @@ -1051,6 +1066,7 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery if (mode == Mode.LEADER) { scheduleReconfigurationIfNeeded(); } + lagDetector.startLagDetector(publishRequest.getAcceptedState().version()); } ackListener.onNodeAck(getLocalNode(), null); publishListener.onResponse(null); diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/LagDetector.java b/server/src/main/java/org/elasticsearch/cluster/coordination/LagDetector.java new file mode 100644 index 00000000000..48ffb96aa74 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/LagDetector.java @@ -0,0 +1,169 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.cluster.coordination; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.common.settings.Setting; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.threadpool.ThreadPool.Names; + +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Consumer; +import java.util.function.Supplier; +import java.util.stream.Collectors; + +import static org.elasticsearch.common.util.concurrent.ConcurrentCollections.newConcurrentMap; + +/** + * A publication can succeed and complete before all nodes have applied the published state and acknowledged it; however we need every node + * eventually either to apply the published state (or a later state) or be removed from the cluster. This component achieves this by + * removing any lagging nodes from the cluster after a timeout. + */ +public class LagDetector { + + private static final Logger logger = LogManager.getLogger(LagDetector.class); + + // the timeout for each node to apply a cluster state update after the leader has applied it, before being removed from the cluster + public static final Setting CLUSTER_FOLLOWER_LAG_TIMEOUT_SETTING = + Setting.timeSetting("cluster.follower_lag.timeout", + TimeValue.timeValueMillis(90000), TimeValue.timeValueMillis(1), Setting.Property.NodeScope); + + private final TimeValue clusterStateApplicationTimeout; + private final Consumer onLagDetected; + private final Supplier localNodeSupplier; + private final ThreadPool threadPool; + private final Map appliedStateTrackersByNode = newConcurrentMap(); + + public LagDetector(final Settings settings, final ThreadPool threadPool, final Consumer onLagDetected, + final Supplier localNodeSupplier) { + this.threadPool = threadPool; + this.clusterStateApplicationTimeout = CLUSTER_FOLLOWER_LAG_TIMEOUT_SETTING.get(settings); + this.onLagDetected = onLagDetected; + this.localNodeSupplier = localNodeSupplier; + } + + public void setTrackedNodes(final Iterable discoveryNodes) { + final Set discoveryNodeSet = new HashSet<>(); + discoveryNodes.forEach(discoveryNodeSet::add); + discoveryNodeSet.remove(localNodeSupplier.get()); + appliedStateTrackersByNode.keySet().retainAll(discoveryNodeSet); + discoveryNodeSet.forEach(node -> appliedStateTrackersByNode.putIfAbsent(node, new NodeAppliedStateTracker(node))); + } + + public void clearTrackedNodes() { + appliedStateTrackersByNode.clear(); + } + + public void setAppliedVersion(final DiscoveryNode discoveryNode, final long appliedVersion) { + final NodeAppliedStateTracker nodeAppliedStateTracker = appliedStateTrackersByNode.get(discoveryNode); + if (nodeAppliedStateTracker == null) { + // Received an ack from a node that a later publication has removed (or we are no longer master). No big deal. + logger.trace("node {} applied version {} but this node's version is not being tracked", discoveryNode, appliedVersion); + } else { + nodeAppliedStateTracker.increaseAppliedVersion(appliedVersion); + } + } + + public void startLagDetector(final long version) { + final List laggingTrackers + = appliedStateTrackersByNode.values().stream().filter(t -> t.appliedVersionLessThan(version)).collect(Collectors.toList()); + + if (laggingTrackers.isEmpty()) { + logger.trace("lag detection for version {} is unnecessary: {}", version, appliedStateTrackersByNode.values()); + } else { + logger.debug("starting lag detector for version {}: {}", version, laggingTrackers); + + threadPool.scheduleUnlessShuttingDown(clusterStateApplicationTimeout, Names.GENERIC, new Runnable() { + @Override + public void run() { + laggingTrackers.forEach(t -> t.checkForLag(version)); + } + + @Override + public String toString() { + return "lag detector for version " + version + " on " + laggingTrackers; + } + }); + } + } + + @Override + public String toString() { + return "LagDetector{" + + "clusterStateApplicationTimeout=" + clusterStateApplicationTimeout + + ", appliedStateTrackersByNode=" + appliedStateTrackersByNode.values() + + '}'; + } + + // for assertions + Set getTrackedNodes() { + return Collections.unmodifiableSet(appliedStateTrackersByNode.keySet()); + } + + private class NodeAppliedStateTracker { + private final DiscoveryNode discoveryNode; + private final AtomicLong appliedVersion = new AtomicLong(); + + NodeAppliedStateTracker(final DiscoveryNode discoveryNode) { + this.discoveryNode = discoveryNode; + } + + void increaseAppliedVersion(long appliedVersion) { + long maxAppliedVersion = this.appliedVersion.updateAndGet(v -> Math.max(v, appliedVersion)); + logger.trace("{} applied version {}, max now {}", this, appliedVersion, maxAppliedVersion); + } + + boolean appliedVersionLessThan(final long version) { + return appliedVersion.get() < version; + } + + @Override + public String toString() { + return "NodeAppliedStateTracker{" + + "discoveryNode=" + discoveryNode + + ", appliedVersion=" + appliedVersion + + '}'; + } + + void checkForLag(final long version) { + if (appliedStateTrackersByNode.get(discoveryNode) != this) { + logger.trace("{} no longer active when checking version {}", this, version); + return; + } + + long appliedVersion = this.appliedVersion.get(); + if (version <= appliedVersion) { + logger.trace("{} satisfied when checking version {}, node applied version {}", this, version, appliedVersion); + return; + } + + logger.debug("{}, detected lag at version {}, node has only applied version {}", this, version, appliedVersion); + onLagDetected.accept(discoveryNode); + } + } +} diff --git a/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java b/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java index aa8d3aaa4db..69c63682b9d 100644 --- a/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java +++ b/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java @@ -33,6 +33,7 @@ import org.elasticsearch.cluster.InternalClusterInfoService; import org.elasticsearch.cluster.NodeConnectionsService; import org.elasticsearch.cluster.action.index.MappingUpdatedAction; import org.elasticsearch.cluster.coordination.ClusterBootstrapService; +import org.elasticsearch.cluster.coordination.LagDetector; import org.elasticsearch.cluster.coordination.Coordinator; import org.elasticsearch.cluster.coordination.ElectionSchedulerFactory; import org.elasticsearch.cluster.coordination.FollowersChecker; @@ -469,7 +470,8 @@ public final class ClusterSettings extends AbstractScopedSettings { LeaderChecker.LEADER_CHECK_RETRY_COUNT_SETTING, Reconfigurator.CLUSTER_AUTO_SHRINK_VOTING_CONFIGURATION, TransportAddVotingTombstonesAction.MAXIMUM_VOTING_TOMBSTONES_SETTING, - ClusterBootstrapService.INITIAL_MASTER_NODE_COUNT_SETTING + ClusterBootstrapService.INITIAL_MASTER_NODE_COUNT_SETTING, + LagDetector.CLUSTER_FOLLOWER_LAG_TIMEOUT_SETTING ))); public static List> BUILT_IN_SETTING_UPGRADERS = Collections.unmodifiableList(Arrays.asList( diff --git a/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java b/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java index f8405b639d7..9bf45ec1547 100644 --- a/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java @@ -599,7 +599,7 @@ public class CoordinatorTests extends ESTestCase { assertTrue("expected immediate ack from " + follower1, ackCollector.hasAckedSuccessfully(follower1)); assertFalse("expected no ack from " + leader, ackCollector.hasAckedSuccessfully(leader)); - cluster.stabilise(); + cluster.stabilise(defaultMillis(PUBLISH_TIMEOUT_SETTING)); assertTrue("expected eventual ack from " + leader, ackCollector.hasAckedSuccessfully(leader)); assertFalse("expected no ack from " + follower0, ackCollector.hasAcked(follower0)); } @@ -1101,7 +1101,6 @@ public class CoordinatorTests extends ESTestCase { } runFor(stabilisationDurationMillis, "stabilising"); - fixLag(); final ClusterNode leader = getAnyLeader(); final long leaderTerm = leader.coordinator.getCurrentTerm(); @@ -1158,35 +1157,6 @@ public class CoordinatorTests extends ESTestCase { leader.improveConfiguration(lastAcceptedState), sameInstance(lastAcceptedState)); } - // TODO remove this when lag detection is implemented - void fixLag() { - final ClusterNode leader = getAnyLeader(); - final long leaderVersion = leader.getLastAppliedClusterState().version(); - final long minVersion = clusterNodes.stream() - .filter(n -> isConnectedPair(n, leader)) - .map(n -> n.getLastAppliedClusterState().version()).min(Long::compare).orElse(Long.MIN_VALUE); - assert minVersion >= 0; - if (minVersion < leaderVersion) { - logger.info("--> fixLag publishing a value to fix lag, leaderVersion={}, minVersion={}", leaderVersion, minVersion); - onNode(leader.getLocalNode(), () -> { - synchronized (leader.coordinator.mutex) { - leader.submitValue(randomLong()); - } - }).run(); - - runFor(DEFAULT_CLUSTER_STATE_UPDATE_DELAY - // may need to bump terms too - + DEFAULT_ELECTION_DELAY, - "re-stabilising after lag-fixing publication"); - - if (clusterNodes.stream().anyMatch(n -> n.getClusterStateApplyResponse().equals(ClusterStateApplyResponse.HANG))) { - runFor(defaultMillis(PUBLISH_TIMEOUT_SETTING), "allowing lag-fixing publication to time out"); - } - } else { - logger.info("--> fixLag found no lag, leader={}, leaderVersion={}, minVersion={}", leader, leaderVersion, minVersion); - } - } - void runFor(long runDurationMillis, String description) { final long endTime = deterministicTaskQueue.getCurrentTimeMillis() + runDurationMillis; logger.info("--> runFor({}ms) running until [{}ms]: {}", runDurationMillis, endTime, description); diff --git a/server/src/test/java/org/elasticsearch/cluster/coordination/LagDetectorTests.java b/server/src/test/java/org/elasticsearch/cluster/coordination/LagDetectorTests.java new file mode 100644 index 00000000000..4318c97adbd --- /dev/null +++ b/server/src/test/java/org/elasticsearch/cluster/coordination/LagDetectorTests.java @@ -0,0 +1,244 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.cluster.coordination; + +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.test.ESTestCase; +import org.junit.Before; + +import java.util.Arrays; +import java.util.Collections; +import java.util.HashSet; +import java.util.Set; + +import static org.elasticsearch.cluster.coordination.LagDetector.CLUSTER_FOLLOWER_LAG_TIMEOUT_SETTING; +import static org.elasticsearch.node.Node.NODE_NAME_SETTING; +import static org.hamcrest.Matchers.contains; +import static org.hamcrest.Matchers.empty; +import static org.hamcrest.Matchers.greaterThan; +import static org.hamcrest.Matchers.is; + +public class LagDetectorTests extends ESTestCase { + + private DeterministicTaskQueue deterministicTaskQueue; + private Set failedNodes; + private LagDetector lagDetector; + private DiscoveryNode node1, node2, localNode; + private TimeValue followerLagTimeout; + + @Before + public void setupFixture() { + deterministicTaskQueue = new DeterministicTaskQueue(Settings.builder().put(NODE_NAME_SETTING.getKey(), "node").build(), random()); + + failedNodes = new HashSet<>(); + + Settings.Builder settingsBuilder = Settings.builder(); + if (randomBoolean()) { + followerLagTimeout = TimeValue.timeValueMillis(randomLongBetween(2, 100000)); + settingsBuilder.put(CLUSTER_FOLLOWER_LAG_TIMEOUT_SETTING.getKey(), followerLagTimeout.millis() + "ms"); + } else { + followerLagTimeout = CLUSTER_FOLLOWER_LAG_TIMEOUT_SETTING.get(Settings.EMPTY); + } + + lagDetector = new LagDetector(settingsBuilder.build(), deterministicTaskQueue.getThreadPool(), failedNodes::add, () -> localNode); + + localNode = CoordinationStateTests.createNode("local"); + node1 = CoordinationStateTests.createNode("node1"); + node2 = CoordinationStateTests.createNode("node2"); + } + + public void testLagDetectorNotStartedIfNodeHasAlreadyAppliedVersion() { + lagDetector.setTrackedNodes(Collections.singletonList(node1)); + lagDetector.setAppliedVersion(node1, 1); + lagDetector.startLagDetector(1); + final long startTime = deterministicTaskQueue.getCurrentTimeMillis(); + deterministicTaskQueue.runAllTasks(); + assertThat("no lag detector started", deterministicTaskQueue.getCurrentTimeMillis(), is(startTime)); + assertThat(failedNodes, empty()); + } + + public void testNoLagDetectedIfNodeAppliesVersionAfterLagDetectorStarted() { + lagDetector.setTrackedNodes(Collections.singletonList(node1)); + lagDetector.startLagDetector(1); + lagDetector.setAppliedVersion(node1, 1); + final long startTime = deterministicTaskQueue.getCurrentTimeMillis(); + deterministicTaskQueue.runAllTasks(); + assertThat("lag detector started", deterministicTaskQueue.getCurrentTimeMillis(), greaterThan(startTime)); + assertThat(failedNodes, empty()); + } + + public void testNoLagDetectedIfNodeAppliesVersionJustBeforeTimeout() { + lagDetector.setTrackedNodes(Collections.singletonList(node1)); + lagDetector.startLagDetector(1); + deterministicTaskQueue.scheduleAt(deterministicTaskQueue.getCurrentTimeMillis() + followerLagTimeout.millis() - 1, + () -> lagDetector.setAppliedVersion(node1, 1)); + deterministicTaskQueue.runAllTasksInTimeOrder(); + assertThat(failedNodes, empty()); + } + + public void testLagDetectedIfNodeAppliesVersionJustAfterTimeout() { + lagDetector.setTrackedNodes(Collections.singletonList(node1)); + lagDetector.startLagDetector(1); + deterministicTaskQueue.scheduleAt(deterministicTaskQueue.getCurrentTimeMillis() + followerLagTimeout.millis() + 1, + () -> lagDetector.setAppliedVersion(node1, 1)); + deterministicTaskQueue.runAllTasksInTimeOrder(); + assertThat(failedNodes, contains(node1)); + } + + public void testNoLagDetectedOnLocalNode() { + lagDetector.setTrackedNodes(Collections.singletonList(localNode)); + lagDetector.startLagDetector(1); + deterministicTaskQueue.runAllTasksInTimeOrder(); + assertThat(failedNodes, empty()); + } + + public void testNoLagDetectedIfNodeAppliesLaterVersionAfterLagDetectorStarted() { + lagDetector.setTrackedNodes(Collections.singletonList(node1)); + lagDetector.startLagDetector(1); + lagDetector.setAppliedVersion(node1, 2); + deterministicTaskQueue.runAllTasks(); + assertThat(failedNodes, empty()); + } + + public void testLagDetectedIfNodeAppliesEarlierVersionAfterLagDetectorStarted() { + lagDetector.setTrackedNodes(Collections.singletonList(node1)); + lagDetector.startLagDetector(2); + lagDetector.setAppliedVersion(node1, 1); + deterministicTaskQueue.runAllTasks(); + assertThat(failedNodes, contains(node1)); + } + + public void testNoLagDetectedIfNodeIsRemovedAfterLagDetectorStarted() { + lagDetector.setTrackedNodes(Collections.singletonList(node1)); + lagDetector.startLagDetector(1); + lagDetector.setTrackedNodes(Collections.singletonList(node2)); + deterministicTaskQueue.runAllTasks(); + assertThat(failedNodes, empty()); + + lagDetector.startLagDetector(2); + deterministicTaskQueue.runAllTasks(); + assertThat(failedNodes, contains(node2)); + } + + public void testNoLagDetectedIfDetectorIsClearedAfterLagDetectorStarted() { + lagDetector.setTrackedNodes(Collections.singletonList(node1)); + lagDetector.startLagDetector(1); + lagDetector.clearTrackedNodes(); + deterministicTaskQueue.runAllTasks(); + assertThat(failedNodes, empty()); + + lagDetector.setTrackedNodes(Collections.singletonList(node1)); + lagDetector.startLagDetector(2); + deterministicTaskQueue.runAllTasks(); + assertThat(failedNodes, contains(node1)); + } + + public void testDetectorIgnoresNodesAddedAfterStarted() { + lagDetector.setTrackedNodes(Collections.singletonList(node1)); + lagDetector.startLagDetector(1); + lagDetector.setTrackedNodes(Arrays.asList(node1, node2)); + lagDetector.setAppliedVersion(node1, 1); + deterministicTaskQueue.runAllTasks(); + assertThat(failedNodes, empty()); + } + + public void testDetectorKeepsTrackingExistingNodesEvenIfNewOnesAdded() { + lagDetector.setTrackedNodes(Collections.singletonList(node1)); + lagDetector.startLagDetector(1); + lagDetector.setTrackedNodes(Arrays.asList(node1, node2)); + deterministicTaskQueue.runAllTasks(); + assertThat(failedNodes, contains(node1)); + } + + public void testDetectorIgnoresApplicationsFromUnknownNodes() { + lagDetector.setTrackedNodes(Collections.singletonList(node1)); + lagDetector.startLagDetector(1); + lagDetector.setAppliedVersion(node2, 1); + deterministicTaskQueue.runAllTasks(); + assertThat(failedNodes, contains(node1)); + + failedNodes.clear(); + lagDetector.startLagDetector(2); + deterministicTaskQueue.runAllTasks(); + assertThat(failedNodes, contains(node1)); + } + + public void testLagDetection() { + lagDetector.setTrackedNodes(Collections.singletonList(node1)); + lagDetector.setAppliedVersion(node1, 1); + lagDetector.startLagDetector(1); + { + final long startTime = deterministicTaskQueue.getCurrentTimeMillis(); + deterministicTaskQueue.runAllTasks(); + assertThat("no lag detector started", deterministicTaskQueue.getCurrentTimeMillis(), is(startTime)); + } + assertThat(failedNodes, empty()); + + lagDetector.setTrackedNodes(Collections.singletonList(node1)); + lagDetector.startLagDetector(2); + lagDetector.setAppliedVersion(node1, 2); + { + final long startTime = deterministicTaskQueue.getCurrentTimeMillis(); + deterministicTaskQueue.runAllTasks(); + assertThat("lag detector started", deterministicTaskQueue.getCurrentTimeMillis(), greaterThan(startTime)); + } + assertThat(failedNodes, empty()); + + lagDetector.startLagDetector(3); + deterministicTaskQueue.scheduleAt(deterministicTaskQueue.getCurrentTimeMillis() + followerLagTimeout.millis() - 1, + () -> lagDetector.setAppliedVersion(node1, 3)); + assertThat(failedNodes, empty()); + + lagDetector.startLagDetector(4); + deterministicTaskQueue.scheduleAt(deterministicTaskQueue.getCurrentTimeMillis() + followerLagTimeout.millis() + 1, + () -> lagDetector.setAppliedVersion(node1, 4)); + deterministicTaskQueue.runAllTasksInTimeOrder(); + assertThat(failedNodes, contains(node1)); + failedNodes.clear(); + + lagDetector.startLagDetector(5); + lagDetector.clearTrackedNodes(); + lagDetector.setTrackedNodes(Collections.singletonList(node1)); + deterministicTaskQueue.runAllTasksInTimeOrder(); + assertThat(failedNodes, empty()); // clearing tracked nodes cancels earlier lag detector ... + + lagDetector.startLagDetector(6); + deterministicTaskQueue.runAllTasksInTimeOrder(); + assertThat(failedNodes, contains(node1)); + failedNodes.clear(); // ... but later lag detectors still work + + lagDetector.setTrackedNodes(Collections.singletonList(node2)); + lagDetector.setAppliedVersion(node2, 7); + lagDetector.startLagDetector(7); + deterministicTaskQueue.runAllTasksInTimeOrder(); + assertThat(failedNodes, empty()); // removing a node from the tracked set means it is not tracked + + lagDetector.startLagDetector(8); + deterministicTaskQueue.runAllTasksInTimeOrder(); + assertThat(failedNodes, contains(node2)); + failedNodes.clear(); + + lagDetector.startLagDetector(9); + lagDetector.setTrackedNodes(Collections.singletonList(node1)); + deterministicTaskQueue.runAllTasksInTimeOrder(); + assertThat(failedNodes, empty()); // nodes added after a lag detector was started are also ignored + } +} diff --git a/server/src/test/java/org/elasticsearch/cluster/coordination/PublicationTests.java b/server/src/test/java/org/elasticsearch/cluster/coordination/PublicationTests.java index 3042e794cef..914ee1e95f7 100644 --- a/server/src/test/java/org/elasticsearch/cluster/coordination/PublicationTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/coordination/PublicationTests.java @@ -62,8 +62,7 @@ public class PublicationTests extends ESTestCase { this.localNode = localNode; ClusterState initialState = CoordinationStateTests.clusterState(0L, 0L, localNode, CoordinationMetaData.VotingConfiguration.EMPTY_CONFIG, CoordinationMetaData.VotingConfiguration.EMPTY_CONFIG, 0L); - coordinationState = new CoordinationState(settings, localNode, new InMemoryPersistedState(0L, - initialState)); + coordinationState = new CoordinationState(settings, localNode, new InMemoryPersistedState(0L, initialState)); } final DiscoveryNode localNode; @@ -436,6 +435,4 @@ public class PublicationTests extends ESTestCase { return ts.stream(); }); } - - }