Extend duration of fixLag() ()

Today, fixLag() waits for a new cluster state to be committed. However, it does
not account for the fact that a term bump may occur, requiring a new election
to take place after the cluster state is committed. This change fixes this.
This commit is contained in:
David Turner 2018-10-11 23:24:08 +01:00 committed by GitHub
parent a32e303b0c
commit d98199df14
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 16 additions and 8 deletions
server/src
main/java/org/elasticsearch/cluster/coordination
test/java/org/elasticsearch/cluster/coordination

View File

@ -63,7 +63,6 @@ import java.util.List;
import java.util.Optional; import java.util.Optional;
import java.util.Random; import java.util.Random;
import java.util.Set; import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier; import java.util.function.Supplier;
import java.util.stream.Collectors; import java.util.stream.Collectors;
@ -103,7 +102,7 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
private Releasable prevotingRound; private Releasable prevotingRound;
@Nullable @Nullable
private Releasable leaderCheckScheduler; private Releasable leaderCheckScheduler;
private AtomicLong maxTermSeen = new AtomicLong(); private long maxTermSeen;
private Mode mode; private Mode mode;
private Optional<DiscoveryNode> lastKnownLeader; private Optional<DiscoveryNode> lastKnownLeader;
@ -259,16 +258,22 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
} }
private void updateMaxTermSeen(final long term) { private void updateMaxTermSeen(final long term) {
final long updatedMaxTermSeen = maxTermSeen.updateAndGet(oldMaxTerm -> Math.max(oldMaxTerm, term));
synchronized (mutex) { synchronized (mutex) {
if (mode == Mode.LEADER && publicationInProgress() == false && updatedMaxTermSeen > getCurrentTerm()) { maxTermSeen = Math.max(maxTermSeen, term);
final long currentTerm = getCurrentTerm();
if (mode == Mode.LEADER && maxTermSeen > currentTerm) {
// Bump our term. However if there is a publication in flight then doing so would cancel the publication, so don't do that // Bump our term. However if there is a publication in flight then doing so would cancel the publication, so don't do that
// since we check whether a term bump is needed at the end of the publication too. // since we check whether a term bump is needed at the end of the publication too.
ensureTermAtLeast(getLocalNode(), updatedMaxTermSeen); if (publicationInProgress()) {
logger.debug("updateMaxTermSeen: maxTermSeen = {} > currentTerm = {}, enqueueing term bump",
maxTermSeen, currentTerm);
} else {
ensureTermAtLeast(getLocalNode(), maxTermSeen);
startElection(); startElection();
} }
} }
} }
}
private void startElection() { private void startElection() {
synchronized (mutex) { synchronized (mutex) {
@ -276,7 +281,7 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
// to check our mode again here. // to check our mode again here.
if (mode == Mode.CANDIDATE) { if (mode == Mode.CANDIDATE) {
final StartJoinRequest startJoinRequest final StartJoinRequest startJoinRequest
= new StartJoinRequest(getLocalNode(), Math.max(getCurrentTerm(), maxTermSeen.get()) + 1); = new StartJoinRequest(getLocalNode(), Math.max(getCurrentTerm(), maxTermSeen) + 1);
logger.debug("starting election with {}", startJoinRequest); logger.debug("starting election with {}", startJoinRequest);
getDiscoveredNodes().forEach(node -> joinHelper.sendStartJoinRequest(startJoinRequest, node)); getDiscoveredNodes().forEach(node -> joinHelper.sendStartJoinRequest(startJoinRequest, node));
} }

View File

@ -705,7 +705,10 @@ public class CoordinatorTests extends ESTestCase {
leader.submitValue(randomLong()); leader.submitValue(randomLong());
} }
}).run(); }).run();
runFor(DEFAULT_CLUSTER_STATE_UPDATE_DELAY, "re-stabilising after lag-fixing publication"); runFor(DEFAULT_CLUSTER_STATE_UPDATE_DELAY
// may need to bump terms too
+ DEFAULT_ELECTION_DELAY,
"re-stabilising after lag-fixing publication");
} else { } else {
logger.info("--> fixLag found no lag, leader={}, leaderVersion={}, minVersion={}", leader, leaderVersion, minVersion); logger.info("--> fixLag found no lag, leader={}, leaderVersion={}, minVersion={}", leader, leaderVersion, minVersion);
} }