Zen2: Turn to follower on follower check when no state accepted yet from new leader (#37003)

Improves on #36449 which did not cover the situation where a node had bumped its term during
the election, and not when receiving the first follower check. This was uncovered while refactoring
NodeJoinTests so that they don't need to access to an internal field of Coordinator anymore (which
can now be made private).
This commit is contained in:
Yannick Welsch 2018-12-28 08:37:04 +01:00 committed by GitHub
parent e16fd4ebd6
commit 935c2e98b0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 86 additions and 25 deletions

View File

@ -103,10 +103,10 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
private final NodeRemovalClusterStateTaskExecutor nodeRemovalExecutor;
private final Supplier<CoordinationState.PersistedState> persistedStateSupplier;
private final DiscoverySettings discoverySettings;
// TODO: the following two fields are package-private as some tests require access to them
// TODO: the following field is package-private as some tests require access to it
// These tests can be rewritten to use public methods once Coordinator is more feature-complete
final Object mutex = new Object();
final SetOnce<CoordinationState> coordinationState = new SetOnce<>(); // initialized on start-up (see doStart)
private final SetOnce<CoordinationState> coordinationState = new SetOnce<>(); // initialized on start-up (see doStart)
private volatile ClusterState applierState; // the state that should be exposed to the cluster state applier
private final PeerFinder peerFinder;
@ -210,7 +210,6 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
void onFollowerCheckRequest(FollowerCheckRequest followerCheckRequest) {
synchronized (mutex) {
final long previousTerm = getCurrentTerm();
ensureTermAtLeast(followerCheckRequest.getSender(), followerCheckRequest.getTerm());
if (getCurrentTerm() != followerCheckRequest.getTerm()) {
@ -219,7 +218,11 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
+ getCurrentTerm() + "], rejecting " + followerCheckRequest);
}
if (previousTerm != getCurrentTerm()) {
// check if node has accepted a state in this term already. If not, this node has never committed a cluster state in this
// term and therefore never removed the NO_MASTER_BLOCK for this term. This logic ensures that we quickly turn a node
// into follower, even before receiving the first cluster state update, but also don't have to deal with the situation
// where we would possibly have to remove the NO_MASTER_BLOCK from the applierState when turning a candidate back to follower.
if (getLastAcceptedState().term() < getCurrentTerm()) {
becomeFollower("onFollowerCheckRequest", followerCheckRequest.getSender());
}
}
@ -592,7 +595,8 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
assert prevotingRound == null : prevotingRound;
assert becomingMaster || getStateForMasterService().nodes().getMasterNodeId() != null : getStateForMasterService();
assert leaderChecker.leader() == null : leaderChecker.leader();
assert applierState.nodes().getMasterNodeId() == null || getLocalNode().equals(applierState.nodes().getMasterNode());
assert getLocalNode().equals(applierState.nodes().getMasterNode()) ||
(applierState.nodes().getMasterNodeId() == null && applierState.term() < getCurrentTerm());
assert preVoteCollector.getLeader() == getLocalNode() : preVoteCollector;
assert clusterFormationFailureHelper.isRunning() == false;
@ -620,7 +624,6 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
coordinationState.get().getLastAcceptedConfiguration().equals(coordinationState.get().getLastCommittedConfiguration())
: coordinationState.get().getLastAcceptedConfiguration() + " != "
+ coordinationState.get().getLastCommittedConfiguration();
} else if (mode == Mode.FOLLOWER) {
assert coordinationState.get().electionWon() == false : getLocalNode() + " is FOLLOWER so electionWon() should be false";
assert lastKnownLeader.isPresent() && (lastKnownLeader.get().equals(getLocalNode()) == false);
@ -632,6 +635,9 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
assert leaderChecker.currentNodeIsMaster() == false;
assert lastKnownLeader.equals(Optional.of(leaderChecker.leader()));
assert followersChecker.getKnownFollowers().isEmpty();
assert lastKnownLeader.get().equals(applierState.nodes().getMasterNode()) ||
(applierState.nodes().getMasterNodeId() == null &&
(applierState.term() < getCurrentTerm() || applierState.version() < getLastAcceptedState().version()));
assert currentPublication.map(Publication::isCommitted).orElse(true);
assert preVoteCollector.getLeader().equals(lastKnownLeader.get()) : preVoteCollector;
assert clusterFormationFailureHelper.isRunning() == false;

View File

@ -44,6 +44,7 @@ import org.elasticsearch.test.transport.CapturingTransport;
import org.elasticsearch.threadpool.TestThreadPool;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.RequestHandlerRegistry;
import org.elasticsearch.transport.Transport;
import org.elasticsearch.transport.TransportChannel;
import org.elasticsearch.transport.TransportRequest;
import org.elasticsearch.transport.TransportResponse;
@ -73,6 +74,7 @@ import static java.util.Collections.emptyMap;
import static java.util.Collections.singletonList;
import static org.elasticsearch.transport.TransportService.HANDSHAKE_ACTION_NAME;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
@TestLogging("org.elasticsearch.cluster.service:TRACE,org.elasticsearch.cluster.coordination:TRACE")
public class NodeJoinTests extends ESTestCase {
@ -82,7 +84,7 @@ public class NodeJoinTests extends ESTestCase {
private MasterService masterService;
private Coordinator coordinator;
private DeterministicTaskQueue deterministicTaskQueue;
private RequestHandlerRegistry<TransportRequest> transportRequestHandler;
private Transport transport;
@BeforeClass
public static void beforeClass() {
@ -174,7 +176,7 @@ public class NodeJoinTests extends ESTestCase {
random);
transportService.start();
transportService.acceptIncomingRequests();
transportRequestHandler = capturingTransport.getRequestHandler(JoinHelper.JOIN_ACTION_NAME);
transport = capturingTransport;
coordinator.start();
coordinator.startInitialJoin();
}
@ -219,7 +221,9 @@ public class NodeJoinTests extends ESTestCase {
// clone the node before submitting to simulate an incoming join, which is guaranteed to have a new
// disco node object serialized off the network
try {
transportRequestHandler.processMessageReceived(joinRequest, new TransportChannel() {
final RequestHandlerRegistry<JoinRequest> joinHandler = (RequestHandlerRegistry<JoinRequest>)
transport.getRequestHandler(JoinHelper.JOIN_ACTION_NAME);
joinHandler.processMessageReceived(joinRequest, new TransportChannel() {
@Override
public String getProfileName() {
return "dummy";
@ -353,7 +357,7 @@ public class NodeJoinTests extends ESTestCase {
FutureUtils.get(futNode1);
}
public void testJoinFollowerWithHigherTerm() {
public void testJoinFollowerWithHigherTerm() throws Exception {
DiscoveryNode node0 = newNode(0, true);
DiscoveryNode node1 = newNode(1, true);
long initialTerm = randomLongBetween(1, 10);
@ -361,18 +365,74 @@ public class NodeJoinTests extends ESTestCase {
setupFakeMasterServiceAndCoordinator(initialTerm, initialState(node0, initialTerm, initialVersion,
new VotingConfiguration(Collections.singleton(node0.getId()))));
long newTerm = initialTerm + randomLongBetween(1, 10);
coordinator.coordinationState.get().handleStartJoin(new StartJoinRequest(node1, newTerm));
synchronized (coordinator.mutex) {
coordinator.becomeFollower("test", node1);
}
assertFalse(isLocalNodeElectedMaster());
handleStartJoinFrom(node1, newTerm);
handleFollowerCheckFrom(node1, newTerm);
long newerTerm = newTerm + randomLongBetween(1, 10);
joinNodeAndRun(new JoinRequest(node1,
Optional.of(new Join(node1, node0, newerTerm, initialTerm, initialVersion))));
assertTrue(isLocalNodeElectedMaster());
}
public void testJoinFollowerFails() {
private void handleStartJoinFrom(DiscoveryNode node, long term) throws Exception {
final RequestHandlerRegistry<StartJoinRequest> startJoinHandler = (RequestHandlerRegistry<StartJoinRequest>)
transport.getRequestHandler(JoinHelper.START_JOIN_ACTION_NAME);
startJoinHandler.processMessageReceived(new StartJoinRequest(node, term), new TransportChannel() {
@Override
public String getProfileName() {
return "dummy";
}
@Override
public String getChannelType() {
return "dummy";
}
@Override
public void sendResponse(TransportResponse response) {
}
@Override
public void sendResponse(Exception exception) {
fail();
}
});
deterministicTaskQueue.runAllRunnableTasks();
assertFalse(isLocalNodeElectedMaster());
assertThat(coordinator.getMode(), equalTo(Coordinator.Mode.CANDIDATE));
}
private void handleFollowerCheckFrom(DiscoveryNode node, long term) throws Exception {
final RequestHandlerRegistry<FollowersChecker.FollowerCheckRequest> followerCheckHandler =
(RequestHandlerRegistry<FollowersChecker.FollowerCheckRequest>)
transport.getRequestHandler(FollowersChecker.FOLLOWER_CHECK_ACTION_NAME);
followerCheckHandler.processMessageReceived(new FollowersChecker.FollowerCheckRequest(term, node), new TransportChannel() {
@Override
public String getProfileName() {
return "dummy";
}
@Override
public String getChannelType() {
return "dummy";
}
@Override
public void sendResponse(TransportResponse response) {
}
@Override
public void sendResponse(Exception exception) {
fail();
}
});
deterministicTaskQueue.runAllRunnableTasks();
assertFalse(isLocalNodeElectedMaster());
assertThat(coordinator.getMode(), equalTo(Coordinator.Mode.FOLLOWER));
}
public void testJoinFollowerFails() throws Exception {
DiscoveryNode node0 = newNode(0, true);
DiscoveryNode node1 = newNode(1, true);
long initialTerm = randomLongBetween(1, 10);
@ -380,18 +440,15 @@ public class NodeJoinTests extends ESTestCase {
setupFakeMasterServiceAndCoordinator(initialTerm, initialState(node0, initialTerm, initialVersion,
new VotingConfiguration(Collections.singleton(node0.getId()))));
long newTerm = initialTerm + randomLongBetween(1, 10);
coordinator.coordinationState.get().handleStartJoin(new StartJoinRequest(node1, newTerm));
synchronized (coordinator.mutex) {
coordinator.becomeFollower("test", node1);
}
assertFalse(isLocalNodeElectedMaster());
handleStartJoinFrom(node1, newTerm);
handleFollowerCheckFrom(node1, newTerm);
assertThat(expectThrows(CoordinationStateRejectedException.class,
() -> joinNodeAndRun(new JoinRequest(node1, Optional.empty()))).getMessage(),
containsString("join target is a follower"));
assertFalse(isLocalNodeElectedMaster());
}
public void testBecomeFollowerFailsPendingJoin() {
public void testBecomeFollowerFailsPendingJoin() throws Exception {
DiscoveryNode node0 = newNode(0, true);
DiscoveryNode node1 = newNode(1, true);
long initialTerm = randomLongBetween(1, 10);
@ -403,9 +460,7 @@ public class NodeJoinTests extends ESTestCase {
deterministicTaskQueue.runAllRunnableTasks();
assertFalse(fut.isDone());
assertFalse(isLocalNodeElectedMaster());
synchronized (coordinator.mutex) {
coordinator.becomeFollower("test", node1);
}
handleFollowerCheckFrom(node1, newTerm);
assertFalse(isLocalNodeElectedMaster());
assertThat(expectThrows(CoordinationStateRejectedException.class,
() -> FutureUtils.get(fut)).getMessage(),