Uniquely associate term with update task during election (#62212)

There is a small race when processing the cluster state that is used to
establish a newly elected leader as master of the cluster: it can pick the term
in its master state update task from a different (newer) election. This trips
an assertion in `Coordinator.publish(...)` where we claim that the term on the
state allows to uniquely define the pre-state but this isn't so. There are no
bad consequences of this race since such a publication fails later on anyway.

This PR fixes things so that the assertion holds true by improving the handling
of terms during cluster state processing by associating each master state
update task that is used to establish a newly elected leader with the correct
corresponding term from its election. It also explicitly handles the case where
the pre-state that is used as base state has already superseded the current
state. As a nice side-effect, join batching now only happens based on the same
term.

Closes #61437
This commit is contained in:
Yannick Welsch 2020-10-05 12:29:42 +02:00 committed by David Turner
parent 106695bec8
commit b4a1199e87
1 changed files with 21 additions and 11 deletions

View File

@ -88,7 +88,7 @@ public class JoinHelper {
private final MasterService masterService;
private final TransportService transportService;
private final JoinTaskExecutor joinTaskExecutor;
private volatile JoinTaskExecutor joinTaskExecutor;
private final TimeValue joinTimeout; // only used for Zen1 joining
private final NodeHealthService nodeHealthService;
@ -97,6 +97,8 @@ public class JoinHelper {
private final AtomicReference<FailedJoinAttempt> lastFailedJoinAttempt = new AtomicReference<>();
private final Supplier<JoinTaskExecutor> joinTaskExecutorGenerator;
JoinHelper(Settings settings, AllocationService allocationService, MasterService masterService,
TransportService transportService, LongSupplier currentTermSupplier, Supplier<ClusterState> currentStateSupplier,
BiConsumer<JoinRequest, JoinCallback> joinHandler, Function<StartJoinRequest, Join> joinLeaderInTerm,
@ -106,23 +108,28 @@ public class JoinHelper {
this.transportService = transportService;
this.nodeHealthService = nodeHealthService;
this.joinTimeout = JOIN_TIMEOUT_SETTING.get(settings);
this.joinTaskExecutor = new JoinTaskExecutor(settings, allocationService, logger, rerouteService) {
this.joinTaskExecutorGenerator = () -> new JoinTaskExecutor(settings, allocationService, logger, rerouteService) {
private final long term = currentTermSupplier.getAsLong();
@Override
public ClusterTasksResult<JoinTaskExecutor.Task> execute(ClusterState currentState, List<JoinTaskExecutor.Task> joiningTasks)
throws Exception {
// This is called when preparing the next cluster state for publication. There is no guarantee that the term we see here is
// the term under which this state will eventually be published: the current term may be increased after this check due to
// some other activity. That the term is correct is, however, checked properly during publication, so it is sufficient to
// check it here on a best-effort basis. This is fine because a concurrent change indicates the existence of another leader
// in a higher term which will cause this node to stand down.
final long currentTerm = currentTermSupplier.getAsLong();
if (currentState.term() != currentTerm) {
// The current state that MasterService uses might have been updated by a (different) master in a higher term already
// Stop processing the current cluster state update, as there's no point in continuing to compute it as
// it will later be rejected by Coordinator.publish(...) anyhow
if (currentState.term() > term) {
logger.trace("encountered higher term {} than current {}, there is a newer master", currentState.term(), term);
throw new NotMasterException("Higher term encountered (current: " + currentState.term() + " > used: " +
term + "), there is a newer master");
} else if (currentState.nodes().getMasterNodeId() == null && joiningTasks.stream().anyMatch(Task::isBecomeMasterTask)) {
assert currentState.term() < term : "there should be at most one become master task per election (= by term)";
final CoordinationMetadata coordinationMetadata =
CoordinationMetadata.builder(currentState.coordinationMetadata()).term(currentTerm).build();
CoordinationMetadata.builder(currentState.coordinationMetadata()).term(term).build();
final Metadata metadata = Metadata.builder(currentState.metadata()).coordinationMetadata(coordinationMetadata).build();
currentState = ClusterState.builder(currentState).metadata(metadata).build();
} else if (currentState.nodes().isLocalNodeElectedMaster()) {
assert currentState.term() == term : "term should be stable for the same master";
}
return super.execute(currentState, joiningTasks);
}
@ -408,6 +415,7 @@ public class JoinHelper {
@Override
public void handleJoinRequest(DiscoveryNode sender, JoinCallback joinCallback) {
final JoinTaskExecutor.Task task = new JoinTaskExecutor.Task(sender, "join existing leader");
assert joinTaskExecutor != null;
masterService.submitStateUpdateTask("node-join", task, ClusterStateTaskConfig.build(Priority.URGENT),
joinTaskExecutor, new JoinTaskListener(task, joinCallback));
}
@ -474,10 +482,12 @@ public class JoinHelper {
});
pendingAsTasks.put(JoinTaskExecutor.newFinishElectionTask(), (source, e) -> {
});
joinTaskExecutor = joinTaskExecutorGenerator.get();
masterService.submitStateUpdateTasks(stateUpdateSource, pendingAsTasks, ClusterStateTaskConfig.build(Priority.URGENT),
joinTaskExecutor);
} else {
assert newMode == Mode.FOLLOWER : newMode;
joinTaskExecutor = null;
joinRequestAccumulator.values().forEach(joinCallback -> joinCallback.onFailure(
new CoordinationStateRejectedException("became follower")));
}