Integrate FollowerChecker with Coordinator (#34075)

This change ensures that the leader node periodically checks that its followers
are healthy, and that they are removed from the cluster if not.
This commit is contained in:
David Turner 2018-09-28 12:29:34 +01:00 committed by GitHub
parent d995fc85c6
commit 980cfc69d6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 234 additions and 47 deletions

View File

@ -18,18 +18,22 @@
*/
package org.elasticsearch.cluster.coordination;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.lucene.util.SetOnce;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateTaskConfig;
import org.elasticsearch.cluster.block.ClusterBlocks;
import org.elasticsearch.cluster.coordination.FollowersChecker.FollowerCheckRequest;
import org.elasticsearch.cluster.coordination.JoinHelper.InitialJoinAccumulator;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.allocation.AllocationService;
import org.elasticsearch.cluster.service.MasterService;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.lease.Releasable;
@ -43,16 +47,19 @@ import org.elasticsearch.discovery.DiscoveryStats;
import org.elasticsearch.discovery.HandshakingTransportAddressConnector;
import org.elasticsearch.discovery.PeerFinder;
import org.elasticsearch.discovery.UnicastConfiguredHostsResolver;
import org.elasticsearch.discovery.zen.ElectMasterService;
import org.elasticsearch.discovery.zen.UnicastHostsProvider;
import org.elasticsearch.discovery.zen.ZenDiscovery.NodeRemovalClusterStateTaskExecutor;
import org.elasticsearch.threadpool.ThreadPool.Names;
import org.elasticsearch.transport.TransportResponse.Empty;
import org.elasticsearch.transport.TransportService;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;
@ -64,7 +71,9 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
TimeValue.timeValueMillis(30000), TimeValue.timeValueMillis(1), Setting.Property.NodeScope);
private final TransportService transportService;
private final MasterService masterService;
private final JoinHelper joinHelper;
private final NodeRemovalClusterStateTaskExecutor nodeRemovalExecutor;
private final Supplier<CoordinationState.PersistedState> persistedStateSupplier;
// TODO: the following two fields are package-private as some tests require access to them
// These tests can be rewritten to use public methods once Coordinator is more feature-complete
@ -79,6 +88,7 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
private final TimeValue publishTimeout;
private final PublicationTransportHandler publicationHandler;
private final LeaderChecker leaderChecker;
private final FollowersChecker followersChecker;
@Nullable
private Releasable electionScheduler;
@Nullable
@ -98,6 +108,7 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
UnicastHostsProvider unicastHostsProvider, Random random) {
super(settings);
this.transportService = transportService;
this.masterService = masterService;
this.joinHelper = new JoinHelper(settings, allocationService, masterService, transportService,
this::getCurrentTerm, this::handleJoinRequest, this::joinLeaderInTerm);
this.persistedStateSupplier = persistedStateSupplier;
@ -112,10 +123,32 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
new HandshakingTransportAddressConnector(settings, transportService), configuredHostsResolver);
this.publicationHandler = new PublicationTransportHandler(transportService, this::handlePublishRequest, this::handleApplyCommit);
this.leaderChecker = new LeaderChecker(settings, transportService, getOnLeaderFailure());
this.followersChecker = new FollowersChecker(settings, transportService, this::onFollowerCheckRequest, this::onFollowerFailure);
this.nodeRemovalExecutor = getNodeRemovalExecutor(settings, allocationService, logger);
masterService.setClusterStateSupplier(this::getStateForMasterService);
}
private static NodeRemovalClusterStateTaskExecutor getNodeRemovalExecutor(Settings settings, AllocationService allocationService,
Logger logger) {
// TODO move NodeRemovalClusterStateTaskExecutor out of Zen since it's not Zen-specific
return new NodeRemovalClusterStateTaskExecutor(allocationService, new ElectMasterService(settings) {
@Override
public boolean hasEnoughMasterNodes(Iterable<DiscoveryNode> nodes) {
return true;
}
@Override
public void logMinimumMasterNodesWarningIfNecessary(ClusterState oldState, ClusterState newState) {
// ignore
}
},
s -> {
throw new AssertionError("not implemented");
}, logger);
}
private Runnable getOnLeaderFailure() {
return new Runnable() {
@Override
@ -132,6 +165,32 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
};
}
private void onFollowerFailure(DiscoveryNode discoveryNode) {
synchronized (mutex) {
if (mode == Mode.LEADER) {
masterService.submitStateUpdateTask("node-left",
new NodeRemovalClusterStateTaskExecutor.Task(discoveryNode, "node left"),
ClusterStateTaskConfig.build(Priority.IMMEDIATE),
nodeRemovalExecutor,
nodeRemovalExecutor);
}
}
}
private void onFollowerCheckRequest(FollowerCheckRequest followerCheckRequest) {
synchronized (mutex) {
ensureTermAtLeast(followerCheckRequest.getSender(), followerCheckRequest.getTerm());
if (getCurrentTerm() != followerCheckRequest.getTerm()) {
logger.trace("onFollowerCheckRequest: current term is [{}], rejecting {}", getCurrentTerm(), followerCheckRequest);
throw new CoordinationStateRejectedException("onFollowerCheckRequest: current term is ["
+ getCurrentTerm() + "], rejecting " + followerCheckRequest);
}
becomeFollower("onFollowerCheckRequest", followerCheckRequest.getSender());
}
}
private void handleApplyCommit(ApplyCommitRequest applyCommitRequest) {
synchronized (mutex) {
logger.trace("handleApplyCommit: applying commit {}", applyCommitRequest);
@ -217,7 +276,9 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
lastJoin = Optional.of(join);
peerFinder.setCurrentTerm(getCurrentTerm());
if (mode != Mode.CANDIDATE) {
becomeCandidate("joinLeaderInTerm");
becomeCandidate("joinLeaderInTerm"); // updates followersChecker
} else {
followersChecker.updateFastResponseState(getCurrentTerm(), mode);
}
return join;
}
@ -259,6 +320,9 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
leaderCheckScheduler.close();
leaderCheckScheduler = null;
}
followersChecker.clearCurrentNodes();
followersChecker.updateFastResponseState(getCurrentTerm(), mode);
}
preVoteCollector.update(getPreVoteResponse(), null);
@ -279,6 +343,7 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
preVoteCollector.update(getPreVoteResponse(), getLocalNode());
assert leaderCheckScheduler == null : leaderCheckScheduler;
followersChecker.updateFastResponseState(getCurrentTerm(), mode);
}
void becomeFollower(String method, DiscoveryNode leaderNode) {
@ -306,6 +371,9 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
}
leaderCheckScheduler = leaderChecker.startLeaderChecker(leaderNode);
}
followersChecker.clearCurrentNodes();
followersChecker.updateFastResponseState(getCurrentTerm(), mode);
}
private PreVoteResponse getPreVoteResponse() {
@ -332,6 +400,13 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
return transportService.getLocalNode();
}
// package-visible for testing
boolean publicationInProgress() {
synchronized (mutex) {
return currentPublication.isPresent();
}
}
@Override
protected void doStart() {
CoordinationState.PersistedState persistedState = persistedStateSupplier.get();
@ -360,24 +435,35 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
@Override
protected void doClose() {
}
public void invariant() {
synchronized (mutex) {
final Optional<DiscoveryNode> peerFinderLeader = peerFinder.getLeader();
assert peerFinder.getCurrentTerm() == getCurrentTerm();
assert followersChecker.getFastResponseState().term == getCurrentTerm();
assert followersChecker.getFastResponseState().mode == getMode();
if (mode == Mode.LEADER) {
final boolean becomingMaster = getStateForMasterService().term() != getCurrentTerm();
assert coordinationState.get().electionWon();
assert lastKnownLeader.isPresent() && lastKnownLeader.get().equals(getLocalNode());
assert joinAccumulator instanceof JoinHelper.LeaderJoinAccumulator;
assert peerFinderLeader.equals(lastKnownLeader) : peerFinderLeader;
assert electionScheduler == null : electionScheduler;
assert prevotingRound == null : prevotingRound;
assert getStateForMasterService().nodes().getMasterNodeId() != null
|| getStateForMasterService().term() != getCurrentTerm() :
getStateForMasterService();
assert becomingMaster || getStateForMasterService().nodes().getMasterNodeId() != null : getStateForMasterService();
assert leaderCheckScheduler == null : leaderCheckScheduler;
final Set<DiscoveryNode> knownFollowers = followersChecker.getKnownFollowers();
final Set<DiscoveryNode> lastPublishedNodes = new HashSet<>();
if (becomingMaster == false || publicationInProgress()) {
final ClusterState lastPublishedState
= currentPublication.map(Publication::publishedState).orElse(coordinationState.get().getLastAcceptedState());
lastPublishedState.nodes().forEach(lastPublishedNodes::add);
assert lastPublishedNodes.remove(getLocalNode());
}
assert lastPublishedNodes.equals(knownFollowers) : lastPublishedNodes + " != " + knownFollowers;
} else if (mode == Mode.FOLLOWER) {
assert coordinationState.get().electionWon() == false : getLocalNode() + " is FOLLOWER so electionWon() should be false";
assert lastKnownLeader.isPresent() && (lastKnownLeader.get().equals(getLocalNode()) == false);
@ -388,6 +474,7 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
assert getStateForMasterService().nodes().getMasterNodeId() == null : getStateForMasterService();
assert leaderChecker.currentNodeIsMaster() == false;
assert leaderCheckScheduler != null;
assert followersChecker.getKnownFollowers().isEmpty();
} else {
assert mode == Mode.CANDIDATE;
assert joinAccumulator instanceof JoinHelper.CandidateJoinAccumulator;
@ -396,6 +483,7 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
assert getStateForMasterService().nodes().getMasterNodeId() == null : getStateForMasterService();
assert leaderChecker.currentNodeIsMaster() == false;
assert leaderCheckScheduler == null : leaderCheckScheduler;
assert followersChecker.getKnownFollowers().isEmpty();
}
}
}
@ -622,7 +710,8 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
});
leaderChecker.setCurrentNodes(publishRequest.getAcceptedState().nodes());
publication.start(Collections.emptySet()); // TODO start failure detector and put faultyNodes here
followersChecker.setCurrentNodes(publishRequest.getAcceptedState().nodes());
publication.start(followersChecker.getFaultyNodes());
}
} catch (Exception e) {
logger.debug(() -> new ParameterizedMessage("[{}] publishing failed", clusterChangedEvent.source()), e);

View File

@ -128,6 +128,13 @@ public class FollowersChecker extends AbstractComponent {
}
}
/**
* Clear the set of known nodes, stopping all checks.
*/
public void clearCurrentNodes() {
setCurrentNodes(DiscoveryNodes.EMPTY_NODES);
}
/**
* The system is normally in a state in which every follower remains a follower of a stable leader in a single term for an extended
* period of time, and therefore our response to every follower check is the same. We handle this case with a single volatile read
@ -207,6 +214,20 @@ public class FollowersChecker extends AbstractComponent {
'}';
}
// For assertions
FastResponseState getFastResponseState() {
return fastResponseState;
}
// For assertions
Set<DiscoveryNode> getKnownFollowers() {
synchronized (mutex) {
final Set<DiscoveryNode> knownFollowers = new HashSet<>(faultyNodes);
knownFollowers.addAll(followerCheckers.keySet());
return knownFollowers;
}
}
static class FastResponseState {
final long term;
final Mode mode;
@ -251,7 +272,7 @@ public class FollowersChecker extends AbstractComponent {
return;
}
final FollowerCheckRequest request = new FollowerCheckRequest(fastResponseState.term);
final FollowerCheckRequest request = new FollowerCheckRequest(fastResponseState.term, transportService.getLocalNode());
logger.trace("handleWakeUp: checking {} with {}", discoveryNode, request);
transportService.sendRequest(discoveryNode, FOLLOWER_CHECK_ACTION_NAME, request,
TransportRequestOptions.builder().withTimeout(followerCheckTimeout).withType(Type.PING).build(),
@ -350,23 +371,32 @@ public class FollowersChecker extends AbstractComponent {
private final long term;
private final DiscoveryNode sender;
public long getTerm() {
return term;
}
public FollowerCheckRequest(final long term) {
public DiscoveryNode getSender() {
return sender;
}
public FollowerCheckRequest(final long term, final DiscoveryNode sender) {
this.term = term;
this.sender = sender;
}
public FollowerCheckRequest(final StreamInput in) throws IOException {
super(in);
term = in.readLong();
sender = new DiscoveryNode(in);
}
@Override
public void writeTo(final StreamOutput out) throws IOException {
super.writeTo(out);
out.writeLong(term);
sender.writeTo(out);
}
@Override
@ -374,19 +404,21 @@ public class FollowersChecker extends AbstractComponent {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
FollowerCheckRequest that = (FollowerCheckRequest) o;
return term == that.term;
return term == that.term &&
Objects.equals(sender, that.sender);
}
@Override
public String toString() {
return "FollowerCheckRequest{" +
"term=" + term +
", sender=" + sender +
'}';
}
@Override
public int hashCode() {
return Objects.hash(term);
return Objects.hash(term, sender);
}
}
}

View File

@ -22,6 +22,7 @@ package org.elasticsearch.cluster.coordination;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.coordination.ClusterStatePublisher.AckListener;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.component.AbstractComponent;
@ -130,6 +131,11 @@ public abstract class Publication extends AbstractComponent {
return isCompleted;
}
// For assertions
ClusterState publishedState() {
return publishRequest.getAcceptedState();
}
private void onPossibleCommitFailure() {
if (applyCommitRequest.isPresent()) {
onPossibleCompletion();

View File

@ -47,6 +47,7 @@ import org.elasticsearch.transport.TransportService;
import org.hamcrest.Matcher;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashSet;
@ -62,6 +63,9 @@ import static org.elasticsearch.cluster.coordination.CoordinationStateTests.setV
import static org.elasticsearch.cluster.coordination.CoordinationStateTests.value;
import static org.elasticsearch.cluster.coordination.Coordinator.Mode.CANDIDATE;
import static org.elasticsearch.cluster.coordination.Coordinator.Mode.FOLLOWER;
import static org.elasticsearch.cluster.coordination.FollowersChecker.FOLLOWER_CHECK_INTERVAL_SETTING;
import static org.elasticsearch.cluster.coordination.FollowersChecker.FOLLOWER_CHECK_RETRY_COUNT_SETTING;
import static org.elasticsearch.cluster.coordination.FollowersChecker.FOLLOWER_CHECK_TIMEOUT_SETTING;
import static org.elasticsearch.cluster.coordination.LeaderChecker.LEADER_CHECK_INTERVAL_SETTING;
import static org.elasticsearch.cluster.coordination.LeaderChecker.LEADER_CHECK_RETRY_COUNT_SETTING;
import static org.elasticsearch.cluster.coordination.LeaderChecker.LEADER_CHECK_TIMEOUT_SETTING;
@ -114,10 +118,6 @@ public class CoordinatorTests extends ESTestCase {
logger.info("--> disconnecting leader {}", originalLeader);
originalLeader.disconnect();
synchronized (originalLeader.coordinator.mutex) {
originalLeader.coordinator.becomeCandidate("simulated failure detection"); // TODO remove once follower checker is integrated
}
cluster.stabilise();
assertThat(cluster.getAnyLeader().getId(), not(equalTo(originalLeader.getId())));
}
@ -130,16 +130,45 @@ public class CoordinatorTests extends ESTestCase {
logger.info("--> partitioning leader {}", originalLeader);
originalLeader.partition();
synchronized (originalLeader.coordinator.mutex) {
originalLeader.coordinator.becomeCandidate("simulated failure detection"); // TODO remove once follower checker is integrated
}
cluster.stabilise(Cluster.DEFAULT_STABILISATION_TIME
+ (LEADER_CHECK_INTERVAL_SETTING.get(Settings.EMPTY).millis() + LEADER_CHECK_TIMEOUT_SETTING.get(Settings.EMPTY).millis())
* LEADER_CHECK_RETRY_COUNT_SETTING.get(Settings.EMPTY));
cluster.stabilise(
// first wait for all the followers to notice the leader has gone
(LEADER_CHECK_INTERVAL_SETTING.get(Settings.EMPTY).millis() + LEADER_CHECK_TIMEOUT_SETTING.get(Settings.EMPTY).millis())
* LEADER_CHECK_RETRY_COUNT_SETTING.get(Settings.EMPTY)
// then wait for the new leader to notice that the old leader is unresponsive
+ (FOLLOWER_CHECK_INTERVAL_SETTING.get(Settings.EMPTY).millis() + FOLLOWER_CHECK_TIMEOUT_SETTING.get(Settings.EMPTY).millis())
* FOLLOWER_CHECK_RETRY_COUNT_SETTING.get(Settings.EMPTY));
assertThat(cluster.getAnyLeader().getId(), not(equalTo(originalLeader.getId())));
}
public void testFollowerDisconnectionDetectedQuickly() {
final Cluster cluster = new Cluster(randomIntBetween(3, 5));
cluster.stabilise();
final ClusterNode leader = cluster.getAnyLeader();
final ClusterNode follower = cluster.getAnyNodeExcept(leader);
logger.info("--> disconnecting follower {}", follower);
follower.disconnect();
cluster.stabilise();
assertThat(cluster.getAnyLeader().getId(), equalTo(leader.getId()));
}
public void testUnresponsiveFollowerDetectedEventually() {
final Cluster cluster = new Cluster(randomIntBetween(3, 5));
cluster.stabilise();
final ClusterNode leader = cluster.getAnyLeader();
final ClusterNode follower = cluster.getAnyNodeExcept(leader);
logger.info("--> partitioning follower {}", follower);
follower.partition();
cluster.stabilise(
// wait for the leader to notice that the follower is unresponsive
(FOLLOWER_CHECK_INTERVAL_SETTING.get(Settings.EMPTY).millis() + FOLLOWER_CHECK_TIMEOUT_SETTING.get(Settings.EMPTY).millis())
* FOLLOWER_CHECK_RETRY_COUNT_SETTING.get(Settings.EMPTY));
assertThat(cluster.getAnyLeader().getId(), equalTo(leader.getId()));
}
private static String nodeIdFromIndex(int nodeIndex) {
return "node" + nodeIndex;
}
@ -203,15 +232,27 @@ public class CoordinatorTests extends ESTestCase {
}
if (deterministicTaskQueue.hasDeferredTasks() == false) {
break; // TODO when fault detection is enabled this should be removed, as there should _always_ be deferred tasks
// A 1-node cluster has no need for fault detection etc so will eventually run out of things to do.
assert clusterNodes.size() == 1 : clusterNodes.size();
break;
}
deterministicTaskQueue.advanceTime();
}
for (ClusterNode clusterNode : clusterNodes) {
assert clusterNode.coordinator.publicationInProgress() == false;
}
assertUniqueLeaderAndExpectedModes();
}
private boolean isConnectedPair(ClusterNode n1, ClusterNode n2) {
return n1 == n2 ||
(getConnectionStatus(n1.getLocalNode(), n2.getLocalNode()) == ConnectionStatus.CONNECTED
&& getConnectionStatus(n2.getLocalNode(), n1.getLocalNode()) == ConnectionStatus.CONNECTED);
}
private void assertUniqueLeaderAndExpectedModes() {
final ClusterNode leader = getAnyLeader();
final long leaderTerm = leader.coordinator.getCurrentTerm();
@ -245,8 +286,9 @@ public class CoordinatorTests extends ESTestCase {
}
}
int connectedNodeCount = Math.toIntExact(clusterNodes.stream().filter(n -> isConnectedPair(leader, n)).count());
assertThat(leader.coordinator.getLastCommittedState().map(ClusterState::getNodes).map(DiscoveryNodes::getSize),
equalTo(Optional.of(clusterNodes.size())));
equalTo(Optional.of(connectedNodeCount)));
}
ClusterNode getAnyLeader() {
@ -267,6 +309,14 @@ public class CoordinatorTests extends ESTestCase {
return connectionStatus;
}
ClusterNode getAnyNodeExcept(ClusterNode... clusterNodes) {
Set<String> forbiddenIds = Arrays.stream(clusterNodes).map(ClusterNode::getId).collect(Collectors.toSet());
List<ClusterNode> acceptableNodes
= this.clusterNodes.stream().filter(n -> forbiddenIds.contains(n.getId()) == false).collect(Collectors.toList());
assert acceptableNodes.isEmpty() == false;
return randomFrom(acceptableNodes);
}
class ClusterNode extends AbstractComponent {
private final int nodeIndex;
private Coordinator coordinator;

View File

@ -144,7 +144,7 @@ public class FollowersCheckerTests extends ESTestCase {
assertThat(followersChecker.getFaultyNodes(), empty());
checkedNodes.clear();
followersChecker.setCurrentNodes(discoveryNodesHolder[0] = DiscoveryNodes.EMPTY_NODES);
followersChecker.clearCurrentNodes();
deterministicTaskQueue.runAllTasks(random());
assertThat(checkedNodes, empty());
}
@ -316,12 +316,21 @@ public class FollowersCheckerTests extends ESTestCase {
}
public void testFollowerCheckRequestEqualsHashCodeSerialization() {
EqualsHashCodeTestUtils.checkEqualsAndHashCode(new FollowerCheckRequest(randomNonNegativeLong()),
EqualsHashCodeTestUtils.checkEqualsAndHashCode(new FollowerCheckRequest(randomNonNegativeLong(),
new DiscoveryNode(randomAlphaOfLength(10), buildNewFakeTransportAddress(), Version.CURRENT)),
rq -> copyWriteable(rq, writableRegistry(), FollowerCheckRequest::new),
rq -> new FollowerCheckRequest(randomNonNegativeLong()));
rq -> {
if (randomBoolean()) {
return new FollowerCheckRequest(rq.getTerm(),
new DiscoveryNode(randomAlphaOfLength(10), buildNewFakeTransportAddress(), Version.CURRENT));
} else {
return new FollowerCheckRequest(randomNonNegativeLong(), rq.getSender());
}
});
}
public void testResponder() {
final DiscoveryNode leader = new DiscoveryNode("leader", buildNewFakeTransportAddress(), Version.CURRENT);
final DiscoveryNode follower = new DiscoveryNode("follower", buildNewFakeTransportAddress(), Version.CURRENT);
final Settings settings = Settings.builder().put(NODE_NAME_SETTING.getKey(), follower.getName()).build();
final DeterministicTaskQueue deterministicTaskQueue = new DeterministicTaskQueue(settings);
@ -358,7 +367,7 @@ public class FollowersCheckerTests extends ESTestCase {
followersChecker.updateFastResponseState(term, Mode.FOLLOWER);
final ExpectsSuccess expectsSuccess = new ExpectsSuccess();
transportService.sendRequest(follower, FOLLOWER_CHECK_ACTION_NAME, new FollowerCheckRequest(term), expectsSuccess);
transportService.sendRequest(follower, FOLLOWER_CHECK_ACTION_NAME, new FollowerCheckRequest(term, leader), expectsSuccess);
deterministicTaskQueue.runAllTasks(random());
assertTrue(expectsSuccess.succeeded());
assertFalse(calledCoordinator.get());
@ -371,24 +380,24 @@ public class FollowersCheckerTests extends ESTestCase {
followersChecker.updateFastResponseState(followerTerm, Mode.FOLLOWER);
final AtomicReference<TransportException> receivedException = new AtomicReference<>();
transportService.sendRequest(follower, FOLLOWER_CHECK_ACTION_NAME, new FollowerCheckRequest(leaderTerm),
transportService.sendRequest(follower, FOLLOWER_CHECK_ACTION_NAME, new FollowerCheckRequest(leaderTerm, leader),
new TransportResponseHandler<TransportResponse.Empty>() {
@Override
public void handleResponse(TransportResponse.Empty response) {
fail("unexpected success");
}
@Override
public void handleResponse(TransportResponse.Empty response) {
fail("unexpected success");
}
@Override
public void handleException(TransportException exp) {
assertThat(exp, not(nullValue()));
assertTrue(receivedException.compareAndSet(null, exp));
}
@Override
public void handleException(TransportException exp) {
assertThat(exp, not(nullValue()));
assertTrue(receivedException.compareAndSet(null, exp));
}
@Override
public String executor() {
return Names.SAME;
}
});
@Override
public String executor() {
return Names.SAME;
}
});
deterministicTaskQueue.runAllTasks(random());
assertFalse(calledCoordinator.get());
assertThat(receivedException.get(), not(nullValue()));
@ -401,7 +410,8 @@ public class FollowersCheckerTests extends ESTestCase {
followersChecker.updateFastResponseState(followerTerm, Mode.FOLLOWER);
final ExpectsSuccess expectsSuccess = new ExpectsSuccess();
transportService.sendRequest(follower, FOLLOWER_CHECK_ACTION_NAME, new FollowerCheckRequest(leaderTerm), expectsSuccess);
transportService.sendRequest(follower, FOLLOWER_CHECK_ACTION_NAME,
new FollowerCheckRequest(leaderTerm, leader), expectsSuccess);
deterministicTaskQueue.runAllTasks(random());
assertTrue(expectsSuccess.succeeded());
assertTrue(calledCoordinator.get());
@ -414,7 +424,7 @@ public class FollowersCheckerTests extends ESTestCase {
followersChecker.updateFastResponseState(term, randomFrom(Mode.LEADER, Mode.CANDIDATE));
final ExpectsSuccess expectsSuccess = new ExpectsSuccess();
transportService.sendRequest(follower, FOLLOWER_CHECK_ACTION_NAME, new FollowerCheckRequest(term), expectsSuccess);
transportService.sendRequest(follower, FOLLOWER_CHECK_ACTION_NAME, new FollowerCheckRequest(term, leader), expectsSuccess);
deterministicTaskQueue.runAllTasks(random());
assertTrue(expectsSuccess.succeeded());
assertTrue(calledCoordinator.get());
@ -429,7 +439,7 @@ public class FollowersCheckerTests extends ESTestCase {
coordinatorException.set(new ElasticsearchException(exceptionMessage));
final AtomicReference<TransportException> receivedException = new AtomicReference<>();
transportService.sendRequest(follower, FOLLOWER_CHECK_ACTION_NAME, new FollowerCheckRequest(term),
transportService.sendRequest(follower, FOLLOWER_CHECK_ACTION_NAME, new FollowerCheckRequest(term, leader),
new TransportResponseHandler<TransportResponse.Empty>() {
@Override
public void handleResponse(TransportResponse.Empty response) {