diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java b/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java index 01b2fae9977..36871799edd 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java @@ -103,10 +103,10 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery private final NodeRemovalClusterStateTaskExecutor nodeRemovalExecutor; private final Supplier persistedStateSupplier; private final DiscoverySettings discoverySettings; - // TODO: the following two fields are package-private as some tests require access to them + // TODO: the following field is package-private as some tests require access to it // These tests can be rewritten to use public methods once Coordinator is more feature-complete final Object mutex = new Object(); - final SetOnce coordinationState = new SetOnce<>(); // initialized on start-up (see doStart) + private final SetOnce coordinationState = new SetOnce<>(); // initialized on start-up (see doStart) private volatile ClusterState applierState; // the state that should be exposed to the cluster state applier private final PeerFinder peerFinder; @@ -210,7 +210,6 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery void onFollowerCheckRequest(FollowerCheckRequest followerCheckRequest) { synchronized (mutex) { - final long previousTerm = getCurrentTerm(); ensureTermAtLeast(followerCheckRequest.getSender(), followerCheckRequest.getTerm()); if (getCurrentTerm() != followerCheckRequest.getTerm()) { @@ -219,7 +218,11 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery + getCurrentTerm() + "], rejecting " + followerCheckRequest); } - if (previousTerm != getCurrentTerm()) { + // check if node has accepted a state in this term already. If not, this node has never committed a cluster state in this + // term and therefore never removed the NO_MASTER_BLOCK for this term. This logic ensures that we quickly turn a node + // into follower, even before receiving the first cluster state update, but also don't have to deal with the situation + // where we would possibly have to remove the NO_MASTER_BLOCK from the applierState when turning a candidate back to follower. + if (getLastAcceptedState().term() < getCurrentTerm()) { becomeFollower("onFollowerCheckRequest", followerCheckRequest.getSender()); } } @@ -592,7 +595,8 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery assert prevotingRound == null : prevotingRound; assert becomingMaster || getStateForMasterService().nodes().getMasterNodeId() != null : getStateForMasterService(); assert leaderChecker.leader() == null : leaderChecker.leader(); - assert applierState.nodes().getMasterNodeId() == null || getLocalNode().equals(applierState.nodes().getMasterNode()); + assert getLocalNode().equals(applierState.nodes().getMasterNode()) || + (applierState.nodes().getMasterNodeId() == null && applierState.term() < getCurrentTerm()); assert preVoteCollector.getLeader() == getLocalNode() : preVoteCollector; assert clusterFormationFailureHelper.isRunning() == false; @@ -620,7 +624,6 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery coordinationState.get().getLastAcceptedConfiguration().equals(coordinationState.get().getLastCommittedConfiguration()) : coordinationState.get().getLastAcceptedConfiguration() + " != " + coordinationState.get().getLastCommittedConfiguration(); - } else if (mode == Mode.FOLLOWER) { assert coordinationState.get().electionWon() == false : getLocalNode() + " is FOLLOWER so electionWon() should be false"; assert lastKnownLeader.isPresent() && (lastKnownLeader.get().equals(getLocalNode()) == false); @@ -632,6 +635,9 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery assert leaderChecker.currentNodeIsMaster() == false; assert lastKnownLeader.equals(Optional.of(leaderChecker.leader())); assert followersChecker.getKnownFollowers().isEmpty(); + assert lastKnownLeader.get().equals(applierState.nodes().getMasterNode()) || + (applierState.nodes().getMasterNodeId() == null && + (applierState.term() < getCurrentTerm() || applierState.version() < getLastAcceptedState().version())); assert currentPublication.map(Publication::isCommitted).orElse(true); assert preVoteCollector.getLeader().equals(lastKnownLeader.get()) : preVoteCollector; assert clusterFormationFailureHelper.isRunning() == false; diff --git a/server/src/test/java/org/elasticsearch/cluster/coordination/NodeJoinTests.java b/server/src/test/java/org/elasticsearch/cluster/coordination/NodeJoinTests.java index ea45eb42d89..214174d2a52 100644 --- a/server/src/test/java/org/elasticsearch/cluster/coordination/NodeJoinTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/coordination/NodeJoinTests.java @@ -44,6 +44,7 @@ import org.elasticsearch.test.transport.CapturingTransport; import org.elasticsearch.threadpool.TestThreadPool; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.RequestHandlerRegistry; +import org.elasticsearch.transport.Transport; import org.elasticsearch.transport.TransportChannel; import org.elasticsearch.transport.TransportRequest; import org.elasticsearch.transport.TransportResponse; @@ -73,6 +74,7 @@ import static java.util.Collections.emptyMap; import static java.util.Collections.singletonList; import static org.elasticsearch.transport.TransportService.HANDSHAKE_ACTION_NAME; import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.equalTo; @TestLogging("org.elasticsearch.cluster.service:TRACE,org.elasticsearch.cluster.coordination:TRACE") public class NodeJoinTests extends ESTestCase { @@ -82,7 +84,7 @@ public class NodeJoinTests extends ESTestCase { private MasterService masterService; private Coordinator coordinator; private DeterministicTaskQueue deterministicTaskQueue; - private RequestHandlerRegistry transportRequestHandler; + private Transport transport; @BeforeClass public static void beforeClass() { @@ -174,7 +176,7 @@ public class NodeJoinTests extends ESTestCase { random); transportService.start(); transportService.acceptIncomingRequests(); - transportRequestHandler = capturingTransport.getRequestHandler(JoinHelper.JOIN_ACTION_NAME); + transport = capturingTransport; coordinator.start(); coordinator.startInitialJoin(); } @@ -219,7 +221,9 @@ public class NodeJoinTests extends ESTestCase { // clone the node before submitting to simulate an incoming join, which is guaranteed to have a new // disco node object serialized off the network try { - transportRequestHandler.processMessageReceived(joinRequest, new TransportChannel() { + final RequestHandlerRegistry joinHandler = (RequestHandlerRegistry) + transport.getRequestHandler(JoinHelper.JOIN_ACTION_NAME); + joinHandler.processMessageReceived(joinRequest, new TransportChannel() { @Override public String getProfileName() { return "dummy"; @@ -353,7 +357,7 @@ public class NodeJoinTests extends ESTestCase { FutureUtils.get(futNode1); } - public void testJoinFollowerWithHigherTerm() { + public void testJoinFollowerWithHigherTerm() throws Exception { DiscoveryNode node0 = newNode(0, true); DiscoveryNode node1 = newNode(1, true); long initialTerm = randomLongBetween(1, 10); @@ -361,18 +365,74 @@ public class NodeJoinTests extends ESTestCase { setupFakeMasterServiceAndCoordinator(initialTerm, initialState(node0, initialTerm, initialVersion, new VotingConfiguration(Collections.singleton(node0.getId())))); long newTerm = initialTerm + randomLongBetween(1, 10); - coordinator.coordinationState.get().handleStartJoin(new StartJoinRequest(node1, newTerm)); - synchronized (coordinator.mutex) { - coordinator.becomeFollower("test", node1); - } - assertFalse(isLocalNodeElectedMaster()); + handleStartJoinFrom(node1, newTerm); + handleFollowerCheckFrom(node1, newTerm); long newerTerm = newTerm + randomLongBetween(1, 10); joinNodeAndRun(new JoinRequest(node1, Optional.of(new Join(node1, node0, newerTerm, initialTerm, initialVersion)))); assertTrue(isLocalNodeElectedMaster()); } - public void testJoinFollowerFails() { + private void handleStartJoinFrom(DiscoveryNode node, long term) throws Exception { + final RequestHandlerRegistry startJoinHandler = (RequestHandlerRegistry) + transport.getRequestHandler(JoinHelper.START_JOIN_ACTION_NAME); + startJoinHandler.processMessageReceived(new StartJoinRequest(node, term), new TransportChannel() { + @Override + public String getProfileName() { + return "dummy"; + } + + @Override + public String getChannelType() { + return "dummy"; + } + + @Override + public void sendResponse(TransportResponse response) { + + } + + @Override + public void sendResponse(Exception exception) { + fail(); + } + }); + deterministicTaskQueue.runAllRunnableTasks(); + assertFalse(isLocalNodeElectedMaster()); + assertThat(coordinator.getMode(), equalTo(Coordinator.Mode.CANDIDATE)); + } + + private void handleFollowerCheckFrom(DiscoveryNode node, long term) throws Exception { + final RequestHandlerRegistry followerCheckHandler = + (RequestHandlerRegistry) + transport.getRequestHandler(FollowersChecker.FOLLOWER_CHECK_ACTION_NAME); + followerCheckHandler.processMessageReceived(new FollowersChecker.FollowerCheckRequest(term, node), new TransportChannel() { + @Override + public String getProfileName() { + return "dummy"; + } + + @Override + public String getChannelType() { + return "dummy"; + } + + @Override + public void sendResponse(TransportResponse response) { + + } + + @Override + public void sendResponse(Exception exception) { + fail(); + } + }); + deterministicTaskQueue.runAllRunnableTasks(); + assertFalse(isLocalNodeElectedMaster()); + assertThat(coordinator.getMode(), equalTo(Coordinator.Mode.FOLLOWER)); + } + + public void testJoinFollowerFails() throws Exception { DiscoveryNode node0 = newNode(0, true); DiscoveryNode node1 = newNode(1, true); long initialTerm = randomLongBetween(1, 10); @@ -380,18 +440,15 @@ public class NodeJoinTests extends ESTestCase { setupFakeMasterServiceAndCoordinator(initialTerm, initialState(node0, initialTerm, initialVersion, new VotingConfiguration(Collections.singleton(node0.getId())))); long newTerm = initialTerm + randomLongBetween(1, 10); - coordinator.coordinationState.get().handleStartJoin(new StartJoinRequest(node1, newTerm)); - synchronized (coordinator.mutex) { - coordinator.becomeFollower("test", node1); - } - assertFalse(isLocalNodeElectedMaster()); + handleStartJoinFrom(node1, newTerm); + handleFollowerCheckFrom(node1, newTerm); assertThat(expectThrows(CoordinationStateRejectedException.class, () -> joinNodeAndRun(new JoinRequest(node1, Optional.empty()))).getMessage(), containsString("join target is a follower")); assertFalse(isLocalNodeElectedMaster()); } - public void testBecomeFollowerFailsPendingJoin() { + public void testBecomeFollowerFailsPendingJoin() throws Exception { DiscoveryNode node0 = newNode(0, true); DiscoveryNode node1 = newNode(1, true); long initialTerm = randomLongBetween(1, 10); @@ -403,9 +460,7 @@ public class NodeJoinTests extends ESTestCase { deterministicTaskQueue.runAllRunnableTasks(); assertFalse(fut.isDone()); assertFalse(isLocalNodeElectedMaster()); - synchronized (coordinator.mutex) { - coordinator.becomeFollower("test", node1); - } + handleFollowerCheckFrom(node1, newTerm); assertFalse(isLocalNodeElectedMaster()); assertThat(expectThrows(CoordinationStateRejectedException.class, () -> FutureUtils.get(fut)).getMessage(),