From d98199df14c0986695e84de3df2daad464444d5b Mon Sep 17 00:00:00 2001 From: David Turner Date: Thu, 11 Oct 2018 23:24:08 +0100 Subject: [PATCH] Extend duration of fixLag() (#34364) Today, fixLag() waits for a new cluster state to be committed. However, it does not account for the fact that a term bump may occur, requiring a new election to take place after the cluster state is committed. This change fixes this. --- .../cluster/coordination/Coordinator.java | 19 ++++++++++++------- .../coordination/CoordinatorTests.java | 5 ++++- 2 files changed, 16 insertions(+), 8 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java b/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java index 3d5e8900738..438aaa7ace4 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java @@ -63,7 +63,6 @@ import java.util.List; import java.util.Optional; import java.util.Random; import java.util.Set; -import java.util.concurrent.atomic.AtomicLong; import java.util.function.Supplier; import java.util.stream.Collectors; @@ -103,7 +102,7 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery private Releasable prevotingRound; @Nullable private Releasable leaderCheckScheduler; - private AtomicLong maxTermSeen = new AtomicLong(); + private long maxTermSeen; private Mode mode; private Optional lastKnownLeader; @@ -259,13 +258,19 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery } private void updateMaxTermSeen(final long term) { - final long updatedMaxTermSeen = maxTermSeen.updateAndGet(oldMaxTerm -> Math.max(oldMaxTerm, term)); synchronized (mutex) { - if (mode == Mode.LEADER && publicationInProgress() == false && updatedMaxTermSeen > getCurrentTerm()) { + maxTermSeen = Math.max(maxTermSeen, term); + final long currentTerm = getCurrentTerm(); + if (mode == Mode.LEADER && maxTermSeen > currentTerm) { // Bump our term. However if there is a publication in flight then doing so would cancel the publication, so don't do that // since we check whether a term bump is needed at the end of the publication too. - ensureTermAtLeast(getLocalNode(), updatedMaxTermSeen); - startElection(); + if (publicationInProgress()) { + logger.debug("updateMaxTermSeen: maxTermSeen = {} > currentTerm = {}, enqueueing term bump", + maxTermSeen, currentTerm); + } else { + ensureTermAtLeast(getLocalNode(), maxTermSeen); + startElection(); + } } } } @@ -276,7 +281,7 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery // to check our mode again here. if (mode == Mode.CANDIDATE) { final StartJoinRequest startJoinRequest - = new StartJoinRequest(getLocalNode(), Math.max(getCurrentTerm(), maxTermSeen.get()) + 1); + = new StartJoinRequest(getLocalNode(), Math.max(getCurrentTerm(), maxTermSeen) + 1); logger.debug("starting election with {}", startJoinRequest); getDiscoveredNodes().forEach(node -> joinHelper.sendStartJoinRequest(startJoinRequest, node)); } diff --git a/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java b/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java index bdb2d772358..eec4bf41ecf 100644 --- a/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java @@ -705,7 +705,10 @@ public class CoordinatorTests extends ESTestCase { leader.submitValue(randomLong()); } }).run(); - runFor(DEFAULT_CLUSTER_STATE_UPDATE_DELAY, "re-stabilising after lag-fixing publication"); + runFor(DEFAULT_CLUSTER_STATE_UPDATE_DELAY + // may need to bump terms too + + DEFAULT_ELECTION_DELAY, + "re-stabilising after lag-fixing publication"); } else { logger.info("--> fixLag found no lag, leader={}, leaderVersion={}, minVersion={}", leader, leaderVersion, minVersion); }