diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/JoinHelper.java b/server/src/main/java/org/elasticsearch/cluster/coordination/JoinHelper.java index 92b92ac1cef..dc8bb29ad70 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/JoinHelper.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/JoinHelper.java @@ -88,7 +88,7 @@ public class JoinHelper { private final MasterService masterService; private final TransportService transportService; - private final JoinTaskExecutor joinTaskExecutor; + private volatile JoinTaskExecutor joinTaskExecutor; private final TimeValue joinTimeout; // only used for Zen1 joining private final NodeHealthService nodeHealthService; @@ -97,6 +97,8 @@ public class JoinHelper { private final AtomicReference lastFailedJoinAttempt = new AtomicReference<>(); + private final Supplier joinTaskExecutorGenerator; + JoinHelper(Settings settings, AllocationService allocationService, MasterService masterService, TransportService transportService, LongSupplier currentTermSupplier, Supplier currentStateSupplier, BiConsumer joinHandler, Function joinLeaderInTerm, @@ -106,23 +108,28 @@ public class JoinHelper { this.transportService = transportService; this.nodeHealthService = nodeHealthService; this.joinTimeout = JOIN_TIMEOUT_SETTING.get(settings); - this.joinTaskExecutor = new JoinTaskExecutor(settings, allocationService, logger, rerouteService) { + this.joinTaskExecutorGenerator = () -> new JoinTaskExecutor(settings, allocationService, logger, rerouteService) { + + private final long term = currentTermSupplier.getAsLong(); @Override public ClusterTasksResult execute(ClusterState currentState, List joiningTasks) throws Exception { - // This is called when preparing the next cluster state for publication. There is no guarantee that the term we see here is - // the term under which this state will eventually be published: the current term may be increased after this check due to - // some other activity. That the term is correct is, however, checked properly during publication, so it is sufficient to - // check it here on a best-effort basis. This is fine because a concurrent change indicates the existence of another leader - // in a higher term which will cause this node to stand down. - - final long currentTerm = currentTermSupplier.getAsLong(); - if (currentState.term() != currentTerm) { + // The current state that MasterService uses might have been updated by a (different) master in a higher term already + // Stop processing the current cluster state update, as there's no point in continuing to compute it as + // it will later be rejected by Coordinator.publish(...) anyhow + if (currentState.term() > term) { + logger.trace("encountered higher term {} than current {}, there is a newer master", currentState.term(), term); + throw new NotMasterException("Higher term encountered (current: " + currentState.term() + " > used: " + + term + "), there is a newer master"); + } else if (currentState.nodes().getMasterNodeId() == null && joiningTasks.stream().anyMatch(Task::isBecomeMasterTask)) { + assert currentState.term() < term : "there should be at most one become master task per election (= by term)"; final CoordinationMetadata coordinationMetadata = - CoordinationMetadata.builder(currentState.coordinationMetadata()).term(currentTerm).build(); + CoordinationMetadata.builder(currentState.coordinationMetadata()).term(term).build(); final Metadata metadata = Metadata.builder(currentState.metadata()).coordinationMetadata(coordinationMetadata).build(); currentState = ClusterState.builder(currentState).metadata(metadata).build(); + } else if (currentState.nodes().isLocalNodeElectedMaster()) { + assert currentState.term() == term : "term should be stable for the same master"; } return super.execute(currentState, joiningTasks); } @@ -408,6 +415,7 @@ public class JoinHelper { @Override public void handleJoinRequest(DiscoveryNode sender, JoinCallback joinCallback) { final JoinTaskExecutor.Task task = new JoinTaskExecutor.Task(sender, "join existing leader"); + assert joinTaskExecutor != null; masterService.submitStateUpdateTask("node-join", task, ClusterStateTaskConfig.build(Priority.URGENT), joinTaskExecutor, new JoinTaskListener(task, joinCallback)); } @@ -474,10 +482,12 @@ public class JoinHelper { }); pendingAsTasks.put(JoinTaskExecutor.newFinishElectionTask(), (source, e) -> { }); + joinTaskExecutor = joinTaskExecutorGenerator.get(); masterService.submitStateUpdateTasks(stateUpdateSource, pendingAsTasks, ClusterStateTaskConfig.build(Priority.URGENT), joinTaskExecutor); } else { assert newMode == Mode.FOLLOWER : newMode; + joinTaskExecutor = null; joinRequestAccumulator.values().forEach(joinCallback -> joinCallback.onFailure( new CoordinationStateRejectedException("became follower"))); }