From 02b483c372683a0abfde2f7c398463e58d181581 Mon Sep 17 00:00:00 2001 From: David Turner Date: Mon, 24 Sep 2018 20:07:32 +0100 Subject: [PATCH] Logging improvements in CoordinatorTests (#33991) Today, we know that CoordinatorTests sometimes fail to stabilise due to an election collision. This change improves the logging that occurs when an election collision occurs so it will be easier to see if this is happening when analysing a test failure. We also wrap the call to masterService.submitStateUpdateTask() in a context that logs the node on which it runs. We also introduce the InitialJoinAccumulator instead of using a placeholder CandidateJoinAccumulator at startup, which reduces the cases to consider in CandidateJoinAccumulator.close() and tightens up the assertions we can make here. --- .../coordination/CoordinationState.java | 19 ++++++++++++--- .../cluster/coordination/Coordinator.java | 3 ++- .../cluster/coordination/JoinHelper.java | 23 +++++++++++++++---- .../coordination/JoinTaskExecutor.java | 4 ++-- .../transport/TransportService.java | 12 +++++++++- .../coordination/CoordinatorTests.java | 14 ++++++++--- 6 files changed, 60 insertions(+), 15 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/CoordinationState.java b/server/src/main/java/org/elasticsearch/cluster/coordination/CoordinationState.java index 8a6e6d18332..d5b9cdf6adf 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/CoordinationState.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/CoordinationState.java @@ -158,13 +158,25 @@ public class CoordinationState extends AbstractComponent { */ public Join handleStartJoin(StartJoinRequest startJoinRequest) { if (startJoinRequest.getTerm() <= getCurrentTerm()) { - logger.debug("handleStartJoin: ignored as term provided [{}] not greater than current term [{}]", - startJoinRequest.getTerm(), getCurrentTerm()); + logger.debug("handleStartJoin: ignoring [{}] as term provided is not greater than current term [{}]", + startJoinRequest, getCurrentTerm()); throw new CoordinationStateRejectedException("incoming term " + startJoinRequest.getTerm() + " not greater than current term " + getCurrentTerm()); } - logger.debug("handleStartJoin: updating term from [{}] to [{}]", getCurrentTerm(), startJoinRequest.getTerm()); + logger.debug("handleStartJoin: leaving term [{}] due to {}", getCurrentTerm(), startJoinRequest); + + if (joinVotes.isEmpty() == false) { + final String reason; + if (electionWon == false) { + reason = "failed election"; + } else if (startJoinRequest.getSourceNode().equals(localNode)) { + reason = "bumping term"; + } else { + reason = "standing down as leader"; + } + logger.debug("handleStartJoin: discarding {}: {}", joinVotes, reason); + } persistedState.setCurrentTerm(startJoinRequest.getTerm()); assert getCurrentTerm() == startJoinRequest.getTerm(); @@ -232,6 +244,7 @@ public class CoordinationState extends AbstractComponent { join.getSourceNode(), electionWon, lastAcceptedTerm, getLastAcceptedVersion()); if (electionWon && prevElectionWon == false) { + logger.debug("handleJoin: election won in term [{}] with {}", getCurrentTerm(), joinVotes); lastPublishedVersion = getLastAcceptedVersion(); } return added; diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java b/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java index be766a86141..d04339cb410 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java @@ -24,6 +24,7 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.block.ClusterBlocks; +import org.elasticsearch.cluster.coordination.JoinHelper.InitialJoinAccumulator; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.routing.allocation.AllocationService; @@ -99,7 +100,7 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery this.persistedStateSupplier = persistedStateSupplier; this.lastKnownLeader = Optional.empty(); this.lastJoin = Optional.empty(); - this.joinAccumulator = joinHelper.new CandidateJoinAccumulator(); + this.joinAccumulator = new InitialJoinAccumulator(); this.publishTimeout = PUBLISH_TIMEOUT_SETTING.get(settings); this.electionSchedulerFactory = new ElectionSchedulerFactory(settings, random, transportService.getThreadPool()); this.preVoteCollector = new PreVoteCollector(settings, transportService, this::startElection, this::updateMaxTermSeen); diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/JoinHelper.java b/server/src/main/java/org/elasticsearch/cluster/coordination/JoinHelper.java index bc01612b899..500f1642329 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/JoinHelper.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/JoinHelper.java @@ -202,7 +202,6 @@ public class JoinHelper extends AbstractComponent { void handleJoinRequest(DiscoveryNode sender, JoinCallback joinCallback); default void close(Mode newMode) { - } } @@ -220,6 +219,19 @@ public class JoinHelper extends AbstractComponent { } } + static class InitialJoinAccumulator implements JoinAccumulator { + @Override + public void handleJoinRequest(DiscoveryNode sender, JoinCallback joinCallback) { + assert false : "unexpected join from " + sender + " during initialisation"; + joinCallback.onFailure(new CoordinationStateRejectedException("join target is not initialised yet")); + } + + @Override + public String toString() { + return "InitialJoinAccumulator"; + } + } + static class FollowerJoinAccumulator implements JoinAccumulator { @Override public void handleJoinRequest(DiscoveryNode sender, JoinCallback joinCallback) { @@ -265,13 +277,14 @@ public class JoinHelper extends AbstractComponent { }); masterService.submitStateUpdateTasks(stateUpdateSource, pendingAsTasks, ClusterStateTaskConfig.build(Priority.URGENT), joinTaskExecutor); - } else if (newMode == Mode.FOLLOWER) { + } else { + assert newMode == Mode.FOLLOWER : newMode; joinRequestAccumulator.values().forEach(joinCallback -> joinCallback.onFailure( new CoordinationStateRejectedException("became follower"))); - } else { - assert newMode == Mode.CANDIDATE; - assert joinRequestAccumulator.isEmpty() : joinRequestAccumulator.keySet(); } + + // CandidateJoinAccumulator is only closed when becoming leader or follower, otherwise it accumulates all joins received + // regardless of term. } @Override diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/JoinTaskExecutor.java b/server/src/main/java/org/elasticsearch/cluster/coordination/JoinTaskExecutor.java index 6d2486da695..0adf483d5aa 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/JoinTaskExecutor.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/JoinTaskExecutor.java @@ -170,8 +170,8 @@ public class JoinTaskExecutor implements ClusterStateTaskExecutor processException(handler, rtx)); + threadPool.executor(handler.executor()).execute(new Runnable() { + @Override + public void run() { + processException(handler, rtx); + } + + @Override + public String toString() { + return "delivery of exception response to [" + action + "][" + requestId + "]: " + exception; + } + }); } } } diff --git a/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java b/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java index c8588d115fd..0113e8de2c2 100644 --- a/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java @@ -76,6 +76,8 @@ public class CoordinatorTests extends ESTestCase { final ClusterNode leader = cluster.getAnyLeader(); long finalValue = randomLong(); + + logger.info("--> submitting value [{}] to [{}]", finalValue, leader); leader.submitValue(finalValue); cluster.stabilise(); // TODO this should only need a short stabilisation @@ -96,6 +98,7 @@ public class CoordinatorTests extends ESTestCase { final List clusterNodes; final DeterministicTaskQueue deterministicTaskQueue = new DeterministicTaskQueue( + // TODO does ThreadPool need a node name any more? Settings.builder().put(NODE_NAME_SETTING.getKey(), "deterministic-task-queue").build()); private final VotingConfiguration initialConfiguration; @@ -155,7 +158,7 @@ public class CoordinatorTests extends ESTestCase { final String nodeId = clusterNode.getId(); assertThat(nodeId + " has the same term as the leader", clusterNode.coordinator.getCurrentTerm(), is(leaderTerm)); - assertTrue("leader should have received a vote from " + nodeId, + assertTrue("leader " + leader.getId() + " should have received a vote from " + nodeId, leader.coordinator.hasJoinVoteFrom(clusterNode.getLocalNode())); assertThat(nodeId + " is a follower", clusterNode.coordinator.getMode(), is(FOLLOWER)); @@ -267,7 +270,7 @@ public class CoordinatorTests extends ESTestCase { } void submitValue(final long value) { - masterService.submitStateUpdateTask("new value [" + value + "]", new ClusterStateUpdateTask() { + onNode(localNode, () -> masterService.submitStateUpdateTask("new value [" + value + "]", new ClusterStateUpdateTask() { @Override public ClusterState execute(ClusterState currentState) { return setValue(currentState, value); @@ -277,7 +280,12 @@ public class CoordinatorTests extends ESTestCase { public void onFailure(String source, Exception e) { logger.debug(() -> new ParameterizedMessage("failed to publish: [{}]", source), e); } - }); + })).run(); + } + + @Override + public String toString() { + return localNode.toString(); } }