Logging improvements in CoordinatorTests (#33991)

Today, we know that CoordinatorTests sometimes fail to stabilise due to an
election collision. This change improves the logging that occurs when an
election collision occurs so it will be easier to see if this is happening when
analysing a test failure. We also wrap the call to
masterService.submitStateUpdateTask() in a context that logs the node on which
it runs.

We also introduce the InitialJoinAccumulator instead of using a placeholder
CandidateJoinAccumulator at startup, which reduces the cases to consider in
CandidateJoinAccumulator.close() and tightens up the assertions we can make
here.
This commit is contained in:
David Turner 2018-09-24 20:07:32 +01:00 committed by GitHub
parent 2e774e146d
commit 02b483c372
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 60 additions and 15 deletions

View File

@ -158,13 +158,25 @@ public class CoordinationState extends AbstractComponent {
*/
public Join handleStartJoin(StartJoinRequest startJoinRequest) {
if (startJoinRequest.getTerm() <= getCurrentTerm()) {
logger.debug("handleStartJoin: ignored as term provided [{}] not greater than current term [{}]",
startJoinRequest.getTerm(), getCurrentTerm());
logger.debug("handleStartJoin: ignoring [{}] as term provided is not greater than current term [{}]",
startJoinRequest, getCurrentTerm());
throw new CoordinationStateRejectedException("incoming term " + startJoinRequest.getTerm() +
" not greater than current term " + getCurrentTerm());
}
logger.debug("handleStartJoin: updating term from [{}] to [{}]", getCurrentTerm(), startJoinRequest.getTerm());
logger.debug("handleStartJoin: leaving term [{}] due to {}", getCurrentTerm(), startJoinRequest);
if (joinVotes.isEmpty() == false) {
final String reason;
if (electionWon == false) {
reason = "failed election";
} else if (startJoinRequest.getSourceNode().equals(localNode)) {
reason = "bumping term";
} else {
reason = "standing down as leader";
}
logger.debug("handleStartJoin: discarding {}: {}", joinVotes, reason);
}
persistedState.setCurrentTerm(startJoinRequest.getTerm());
assert getCurrentTerm() == startJoinRequest.getTerm();
@ -232,6 +244,7 @@ public class CoordinationState extends AbstractComponent {
join.getSourceNode(), electionWon, lastAcceptedTerm, getLastAcceptedVersion());
if (electionWon && prevElectionWon == false) {
logger.debug("handleJoin: election won in term [{}] with {}", getCurrentTerm(), joinVotes);
lastPublishedVersion = getLastAcceptedVersion();
}
return added;

View File

@ -24,6 +24,7 @@ import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlocks;
import org.elasticsearch.cluster.coordination.JoinHelper.InitialJoinAccumulator;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.allocation.AllocationService;
@ -99,7 +100,7 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
this.persistedStateSupplier = persistedStateSupplier;
this.lastKnownLeader = Optional.empty();
this.lastJoin = Optional.empty();
this.joinAccumulator = joinHelper.new CandidateJoinAccumulator();
this.joinAccumulator = new InitialJoinAccumulator();
this.publishTimeout = PUBLISH_TIMEOUT_SETTING.get(settings);
this.electionSchedulerFactory = new ElectionSchedulerFactory(settings, random, transportService.getThreadPool());
this.preVoteCollector = new PreVoteCollector(settings, transportService, this::startElection, this::updateMaxTermSeen);

View File

@ -202,7 +202,6 @@ public class JoinHelper extends AbstractComponent {
void handleJoinRequest(DiscoveryNode sender, JoinCallback joinCallback);
default void close(Mode newMode) {
}
}
@ -220,6 +219,19 @@ public class JoinHelper extends AbstractComponent {
}
}
static class InitialJoinAccumulator implements JoinAccumulator {
@Override
public void handleJoinRequest(DiscoveryNode sender, JoinCallback joinCallback) {
assert false : "unexpected join from " + sender + " during initialisation";
joinCallback.onFailure(new CoordinationStateRejectedException("join target is not initialised yet"));
}
@Override
public String toString() {
return "InitialJoinAccumulator";
}
}
static class FollowerJoinAccumulator implements JoinAccumulator {
@Override
public void handleJoinRequest(DiscoveryNode sender, JoinCallback joinCallback) {
@ -265,13 +277,14 @@ public class JoinHelper extends AbstractComponent {
});
masterService.submitStateUpdateTasks(stateUpdateSource, pendingAsTasks, ClusterStateTaskConfig.build(Priority.URGENT),
joinTaskExecutor);
} else if (newMode == Mode.FOLLOWER) {
} else {
assert newMode == Mode.FOLLOWER : newMode;
joinRequestAccumulator.values().forEach(joinCallback -> joinCallback.onFailure(
new CoordinationStateRejectedException("became follower")));
} else {
assert newMode == Mode.CANDIDATE;
assert joinRequestAccumulator.isEmpty() : joinRequestAccumulator.keySet();
}
// CandidateJoinAccumulator is only closed when becoming leader or follower, otherwise it accumulates all joins received
// regardless of term.
}
@Override

View File

@ -170,8 +170,8 @@ public class JoinTaskExecutor implements ClusterStateTaskExecutor<JoinTaskExecut
ClusterState tmpState = ClusterState.builder(currentState).nodes(nodesBuilder).blocks(ClusterBlocks.builder()
.blocks(currentState.blocks())
.removeGlobalBlock(DiscoverySettings.NO_MASTER_BLOCK_ID)).build();
return ClusterState.builder(allocationService.deassociateDeadNodes(tmpState, false,
"removed dead nodes on election"));
logger.trace("becomeMasterAndTrimConflictingNodes: {}", tmpState.nodes());
return ClusterState.builder(allocationService.deassociateDeadNodes(tmpState, false, "removed dead nodes on election"));
}
@Override

View File

@ -1221,7 +1221,17 @@ public class TransportService extends AbstractLifecycleComponent implements Tran
if (ThreadPool.Names.SAME.equals(executor)) {
processException(handler, rtx);
} else {
threadPool.executor(handler.executor()).execute(() -> processException(handler, rtx));
threadPool.executor(handler.executor()).execute(new Runnable() {
@Override
public void run() {
processException(handler, rtx);
}
@Override
public String toString() {
return "delivery of exception response to [" + action + "][" + requestId + "]: " + exception;
}
});
}
}
}

View File

@ -76,6 +76,8 @@ public class CoordinatorTests extends ESTestCase {
final ClusterNode leader = cluster.getAnyLeader();
long finalValue = randomLong();
logger.info("--> submitting value [{}] to [{}]", finalValue, leader);
leader.submitValue(finalValue);
cluster.stabilise(); // TODO this should only need a short stabilisation
@ -96,6 +98,7 @@ public class CoordinatorTests extends ESTestCase {
final List<ClusterNode> clusterNodes;
final DeterministicTaskQueue deterministicTaskQueue = new DeterministicTaskQueue(
// TODO does ThreadPool need a node name any more?
Settings.builder().put(NODE_NAME_SETTING.getKey(), "deterministic-task-queue").build());
private final VotingConfiguration initialConfiguration;
@ -155,7 +158,7 @@ public class CoordinatorTests extends ESTestCase {
final String nodeId = clusterNode.getId();
assertThat(nodeId + " has the same term as the leader", clusterNode.coordinator.getCurrentTerm(), is(leaderTerm));
assertTrue("leader should have received a vote from " + nodeId,
assertTrue("leader " + leader.getId() + " should have received a vote from " + nodeId,
leader.coordinator.hasJoinVoteFrom(clusterNode.getLocalNode()));
assertThat(nodeId + " is a follower", clusterNode.coordinator.getMode(), is(FOLLOWER));
@ -267,7 +270,7 @@ public class CoordinatorTests extends ESTestCase {
}
void submitValue(final long value) {
masterService.submitStateUpdateTask("new value [" + value + "]", new ClusterStateUpdateTask() {
onNode(localNode, () -> masterService.submitStateUpdateTask("new value [" + value + "]", new ClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) {
return setValue(currentState, value);
@ -277,7 +280,12 @@ public class CoordinatorTests extends ESTestCase {
public void onFailure(String source, Exception e) {
logger.debug(() -> new ParameterizedMessage("failed to publish: [{}]", source), e);
}
});
})).run();
}
@Override
public String toString() {
return localNode.toString();
}
}