diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/CoordinationState.java b/server/src/main/java/org/elasticsearch/cluster/coordination/CoordinationState.java index 8a6e6d18332..d5b9cdf6adf 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/CoordinationState.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/CoordinationState.java @@ -158,13 +158,25 @@ public class CoordinationState extends AbstractComponent { */ public Join handleStartJoin(StartJoinRequest startJoinRequest) { if (startJoinRequest.getTerm() <= getCurrentTerm()) { - logger.debug("handleStartJoin: ignored as term provided [{}] not greater than current term [{}]", - startJoinRequest.getTerm(), getCurrentTerm()); + logger.debug("handleStartJoin: ignoring [{}] as term provided is not greater than current term [{}]", + startJoinRequest, getCurrentTerm()); throw new CoordinationStateRejectedException("incoming term " + startJoinRequest.getTerm() + " not greater than current term " + getCurrentTerm()); } - logger.debug("handleStartJoin: updating term from [{}] to [{}]", getCurrentTerm(), startJoinRequest.getTerm()); + logger.debug("handleStartJoin: leaving term [{}] due to {}", getCurrentTerm(), startJoinRequest); + + if (joinVotes.isEmpty() == false) { + final String reason; + if (electionWon == false) { + reason = "failed election"; + } else if (startJoinRequest.getSourceNode().equals(localNode)) { + reason = "bumping term"; + } else { + reason = "standing down as leader"; + } + logger.debug("handleStartJoin: discarding {}: {}", joinVotes, reason); + } persistedState.setCurrentTerm(startJoinRequest.getTerm()); assert getCurrentTerm() == startJoinRequest.getTerm(); @@ -232,6 +244,7 @@ public class CoordinationState extends AbstractComponent { join.getSourceNode(), electionWon, lastAcceptedTerm, getLastAcceptedVersion()); if (electionWon && prevElectionWon == false) { + logger.debug("handleJoin: election won in term [{}] with {}", getCurrentTerm(), joinVotes); lastPublishedVersion = getLastAcceptedVersion(); } return added; diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java b/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java index be766a86141..d04339cb410 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java @@ -24,6 +24,7 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.block.ClusterBlocks; +import org.elasticsearch.cluster.coordination.JoinHelper.InitialJoinAccumulator; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.routing.allocation.AllocationService; @@ -99,7 +100,7 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery this.persistedStateSupplier = persistedStateSupplier; this.lastKnownLeader = Optional.empty(); this.lastJoin = Optional.empty(); - this.joinAccumulator = joinHelper.new CandidateJoinAccumulator(); + this.joinAccumulator = new InitialJoinAccumulator(); this.publishTimeout = PUBLISH_TIMEOUT_SETTING.get(settings); this.electionSchedulerFactory = new ElectionSchedulerFactory(settings, random, transportService.getThreadPool()); this.preVoteCollector = new PreVoteCollector(settings, transportService, this::startElection, this::updateMaxTermSeen); diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/JoinHelper.java b/server/src/main/java/org/elasticsearch/cluster/coordination/JoinHelper.java index bc01612b899..500f1642329 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/JoinHelper.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/JoinHelper.java @@ -202,7 +202,6 @@ public class JoinHelper extends AbstractComponent { void handleJoinRequest(DiscoveryNode sender, JoinCallback joinCallback); default void close(Mode newMode) { - } } @@ -220,6 +219,19 @@ public class JoinHelper extends AbstractComponent { } } + static class InitialJoinAccumulator implements JoinAccumulator { + @Override + public void handleJoinRequest(DiscoveryNode sender, JoinCallback joinCallback) { + assert false : "unexpected join from " + sender + " during initialisation"; + joinCallback.onFailure(new CoordinationStateRejectedException("join target is not initialised yet")); + } + + @Override + public String toString() { + return "InitialJoinAccumulator"; + } + } + static class FollowerJoinAccumulator implements JoinAccumulator { @Override public void handleJoinRequest(DiscoveryNode sender, JoinCallback joinCallback) { @@ -265,13 +277,14 @@ public class JoinHelper extends AbstractComponent { }); masterService.submitStateUpdateTasks(stateUpdateSource, pendingAsTasks, ClusterStateTaskConfig.build(Priority.URGENT), joinTaskExecutor); - } else if (newMode == Mode.FOLLOWER) { + } else { + assert newMode == Mode.FOLLOWER : newMode; joinRequestAccumulator.values().forEach(joinCallback -> joinCallback.onFailure( new CoordinationStateRejectedException("became follower"))); - } else { - assert newMode == Mode.CANDIDATE; - assert joinRequestAccumulator.isEmpty() : joinRequestAccumulator.keySet(); } + + // CandidateJoinAccumulator is only closed when becoming leader or follower, otherwise it accumulates all joins received + // regardless of term. } @Override diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/JoinTaskExecutor.java b/server/src/main/java/org/elasticsearch/cluster/coordination/JoinTaskExecutor.java index 6d2486da695..0adf483d5aa 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/JoinTaskExecutor.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/JoinTaskExecutor.java @@ -170,8 +170,8 @@ public class JoinTaskExecutor implements ClusterStateTaskExecutor processException(handler, rtx)); + threadPool.executor(handler.executor()).execute(new Runnable() { + @Override + public void run() { + processException(handler, rtx); + } + + @Override + public String toString() { + return "delivery of exception response to [" + action + "][" + requestId + "]: " + exception; + } + }); } } } diff --git a/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java b/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java index c8588d115fd..0113e8de2c2 100644 --- a/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java @@ -76,6 +76,8 @@ public class CoordinatorTests extends ESTestCase { final ClusterNode leader = cluster.getAnyLeader(); long finalValue = randomLong(); + + logger.info("--> submitting value [{}] to [{}]", finalValue, leader); leader.submitValue(finalValue); cluster.stabilise(); // TODO this should only need a short stabilisation @@ -96,6 +98,7 @@ public class CoordinatorTests extends ESTestCase { final List clusterNodes; final DeterministicTaskQueue deterministicTaskQueue = new DeterministicTaskQueue( + // TODO does ThreadPool need a node name any more? Settings.builder().put(NODE_NAME_SETTING.getKey(), "deterministic-task-queue").build()); private final VotingConfiguration initialConfiguration; @@ -155,7 +158,7 @@ public class CoordinatorTests extends ESTestCase { final String nodeId = clusterNode.getId(); assertThat(nodeId + " has the same term as the leader", clusterNode.coordinator.getCurrentTerm(), is(leaderTerm)); - assertTrue("leader should have received a vote from " + nodeId, + assertTrue("leader " + leader.getId() + " should have received a vote from " + nodeId, leader.coordinator.hasJoinVoteFrom(clusterNode.getLocalNode())); assertThat(nodeId + " is a follower", clusterNode.coordinator.getMode(), is(FOLLOWER)); @@ -267,7 +270,7 @@ public class CoordinatorTests extends ESTestCase { } void submitValue(final long value) { - masterService.submitStateUpdateTask("new value [" + value + "]", new ClusterStateUpdateTask() { + onNode(localNode, () -> masterService.submitStateUpdateTask("new value [" + value + "]", new ClusterStateUpdateTask() { @Override public ClusterState execute(ClusterState currentState) { return setValue(currentState, value); @@ -277,7 +280,12 @@ public class CoordinatorTests extends ESTestCase { public void onFailure(String source, Exception e) { logger.debug(() -> new ParameterizedMessage("failed to publish: [{}]", source), e); } - }); + })).run(); + } + + @Override + public String toString() { + return localNode.toString(); } }