diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java b/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java index c6e54455566..dd31a436f75 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java @@ -18,18 +18,22 @@ */ package org.elasticsearch.cluster.coordination; +import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; import org.apache.lucene.util.SetOnce; import org.elasticsearch.action.ActionListener; import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.ClusterStateTaskConfig; import org.elasticsearch.cluster.block.ClusterBlocks; +import org.elasticsearch.cluster.coordination.FollowersChecker.FollowerCheckRequest; import org.elasticsearch.cluster.coordination.JoinHelper.InitialJoinAccumulator; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.routing.allocation.AllocationService; import org.elasticsearch.cluster.service.MasterService; import org.elasticsearch.common.Nullable; +import org.elasticsearch.common.Priority; import org.elasticsearch.common.Strings; import org.elasticsearch.common.component.AbstractLifecycleComponent; import org.elasticsearch.common.lease.Releasable; @@ -43,16 +47,19 @@ import org.elasticsearch.discovery.DiscoveryStats; import org.elasticsearch.discovery.HandshakingTransportAddressConnector; import org.elasticsearch.discovery.PeerFinder; import org.elasticsearch.discovery.UnicastConfiguredHostsResolver; +import org.elasticsearch.discovery.zen.ElectMasterService; import org.elasticsearch.discovery.zen.UnicastHostsProvider; +import org.elasticsearch.discovery.zen.ZenDiscovery.NodeRemovalClusterStateTaskExecutor; import org.elasticsearch.threadpool.ThreadPool.Names; import org.elasticsearch.transport.TransportResponse.Empty; import org.elasticsearch.transport.TransportService; import java.util.ArrayList; -import java.util.Collections; +import java.util.HashSet; import java.util.List; import java.util.Optional; import java.util.Random; +import java.util.Set; import java.util.concurrent.atomic.AtomicLong; import java.util.function.Supplier; @@ -64,7 +71,9 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery TimeValue.timeValueMillis(30000), TimeValue.timeValueMillis(1), Setting.Property.NodeScope); private final TransportService transportService; + private final MasterService masterService; private final JoinHelper joinHelper; + private final NodeRemovalClusterStateTaskExecutor nodeRemovalExecutor; private final Supplier persistedStateSupplier; // TODO: the following two fields are package-private as some tests require access to them // These tests can be rewritten to use public methods once Coordinator is more feature-complete @@ -79,6 +88,7 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery private final TimeValue publishTimeout; private final PublicationTransportHandler publicationHandler; private final LeaderChecker leaderChecker; + private final FollowersChecker followersChecker; @Nullable private Releasable electionScheduler; @Nullable @@ -98,6 +108,7 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery UnicastHostsProvider unicastHostsProvider, Random random) { super(settings); this.transportService = transportService; + this.masterService = masterService; this.joinHelper = new JoinHelper(settings, allocationService, masterService, transportService, this::getCurrentTerm, this::handleJoinRequest, this::joinLeaderInTerm); this.persistedStateSupplier = persistedStateSupplier; @@ -112,10 +123,32 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery new HandshakingTransportAddressConnector(settings, transportService), configuredHostsResolver); this.publicationHandler = new PublicationTransportHandler(transportService, this::handlePublishRequest, this::handleApplyCommit); this.leaderChecker = new LeaderChecker(settings, transportService, getOnLeaderFailure()); - + this.followersChecker = new FollowersChecker(settings, transportService, this::onFollowerCheckRequest, this::onFollowerFailure); + this.nodeRemovalExecutor = getNodeRemovalExecutor(settings, allocationService, logger); masterService.setClusterStateSupplier(this::getStateForMasterService); } + private static NodeRemovalClusterStateTaskExecutor getNodeRemovalExecutor(Settings settings, AllocationService allocationService, + Logger logger) { + // TODO move NodeRemovalClusterStateTaskExecutor out of Zen since it's not Zen-specific + return new NodeRemovalClusterStateTaskExecutor(allocationService, new ElectMasterService(settings) { + + @Override + public boolean hasEnoughMasterNodes(Iterable nodes) { + return true; + } + + @Override + public void logMinimumMasterNodesWarningIfNecessary(ClusterState oldState, ClusterState newState) { + // ignore + } + + }, + s -> { + throw new AssertionError("not implemented"); + }, logger); + } + private Runnable getOnLeaderFailure() { return new Runnable() { @Override @@ -132,6 +165,32 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery }; } + private void onFollowerFailure(DiscoveryNode discoveryNode) { + synchronized (mutex) { + if (mode == Mode.LEADER) { + masterService.submitStateUpdateTask("node-left", + new NodeRemovalClusterStateTaskExecutor.Task(discoveryNode, "node left"), + ClusterStateTaskConfig.build(Priority.IMMEDIATE), + nodeRemovalExecutor, + nodeRemovalExecutor); + } + } + } + + private void onFollowerCheckRequest(FollowerCheckRequest followerCheckRequest) { + synchronized (mutex) { + ensureTermAtLeast(followerCheckRequest.getSender(), followerCheckRequest.getTerm()); + + if (getCurrentTerm() != followerCheckRequest.getTerm()) { + logger.trace("onFollowerCheckRequest: current term is [{}], rejecting {}", getCurrentTerm(), followerCheckRequest); + throw new CoordinationStateRejectedException("onFollowerCheckRequest: current term is [" + + getCurrentTerm() + "], rejecting " + followerCheckRequest); + } + + becomeFollower("onFollowerCheckRequest", followerCheckRequest.getSender()); + } + } + private void handleApplyCommit(ApplyCommitRequest applyCommitRequest) { synchronized (mutex) { logger.trace("handleApplyCommit: applying commit {}", applyCommitRequest); @@ -217,7 +276,9 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery lastJoin = Optional.of(join); peerFinder.setCurrentTerm(getCurrentTerm()); if (mode != Mode.CANDIDATE) { - becomeCandidate("joinLeaderInTerm"); + becomeCandidate("joinLeaderInTerm"); // updates followersChecker + } else { + followersChecker.updateFastResponseState(getCurrentTerm(), mode); } return join; } @@ -259,6 +320,9 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery leaderCheckScheduler.close(); leaderCheckScheduler = null; } + + followersChecker.clearCurrentNodes(); + followersChecker.updateFastResponseState(getCurrentTerm(), mode); } preVoteCollector.update(getPreVoteResponse(), null); @@ -279,6 +343,7 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery preVoteCollector.update(getPreVoteResponse(), getLocalNode()); assert leaderCheckScheduler == null : leaderCheckScheduler; + followersChecker.updateFastResponseState(getCurrentTerm(), mode); } void becomeFollower(String method, DiscoveryNode leaderNode) { @@ -306,6 +371,9 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery } leaderCheckScheduler = leaderChecker.startLeaderChecker(leaderNode); } + + followersChecker.clearCurrentNodes(); + followersChecker.updateFastResponseState(getCurrentTerm(), mode); } private PreVoteResponse getPreVoteResponse() { @@ -332,6 +400,13 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery return transportService.getLocalNode(); } + // package-visible for testing + boolean publicationInProgress() { + synchronized (mutex) { + return currentPublication.isPresent(); + } + } + @Override protected void doStart() { CoordinationState.PersistedState persistedState = persistedStateSupplier.get(); @@ -360,24 +435,35 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery @Override protected void doClose() { - } public void invariant() { synchronized (mutex) { final Optional peerFinderLeader = peerFinder.getLeader(); assert peerFinder.getCurrentTerm() == getCurrentTerm(); + assert followersChecker.getFastResponseState().term == getCurrentTerm(); + assert followersChecker.getFastResponseState().mode == getMode(); if (mode == Mode.LEADER) { + final boolean becomingMaster = getStateForMasterService().term() != getCurrentTerm(); + assert coordinationState.get().electionWon(); assert lastKnownLeader.isPresent() && lastKnownLeader.get().equals(getLocalNode()); assert joinAccumulator instanceof JoinHelper.LeaderJoinAccumulator; assert peerFinderLeader.equals(lastKnownLeader) : peerFinderLeader; assert electionScheduler == null : electionScheduler; assert prevotingRound == null : prevotingRound; - assert getStateForMasterService().nodes().getMasterNodeId() != null - || getStateForMasterService().term() != getCurrentTerm() : - getStateForMasterService(); + assert becomingMaster || getStateForMasterService().nodes().getMasterNodeId() != null : getStateForMasterService(); assert leaderCheckScheduler == null : leaderCheckScheduler; + + final Set knownFollowers = followersChecker.getKnownFollowers(); + final Set lastPublishedNodes = new HashSet<>(); + if (becomingMaster == false || publicationInProgress()) { + final ClusterState lastPublishedState + = currentPublication.map(Publication::publishedState).orElse(coordinationState.get().getLastAcceptedState()); + lastPublishedState.nodes().forEach(lastPublishedNodes::add); + assert lastPublishedNodes.remove(getLocalNode()); + } + assert lastPublishedNodes.equals(knownFollowers) : lastPublishedNodes + " != " + knownFollowers; } else if (mode == Mode.FOLLOWER) { assert coordinationState.get().electionWon() == false : getLocalNode() + " is FOLLOWER so electionWon() should be false"; assert lastKnownLeader.isPresent() && (lastKnownLeader.get().equals(getLocalNode()) == false); @@ -388,6 +474,7 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery assert getStateForMasterService().nodes().getMasterNodeId() == null : getStateForMasterService(); assert leaderChecker.currentNodeIsMaster() == false; assert leaderCheckScheduler != null; + assert followersChecker.getKnownFollowers().isEmpty(); } else { assert mode == Mode.CANDIDATE; assert joinAccumulator instanceof JoinHelper.CandidateJoinAccumulator; @@ -396,6 +483,7 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery assert getStateForMasterService().nodes().getMasterNodeId() == null : getStateForMasterService(); assert leaderChecker.currentNodeIsMaster() == false; assert leaderCheckScheduler == null : leaderCheckScheduler; + assert followersChecker.getKnownFollowers().isEmpty(); } } } @@ -622,7 +710,8 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery }); leaderChecker.setCurrentNodes(publishRequest.getAcceptedState().nodes()); - publication.start(Collections.emptySet()); // TODO start failure detector and put faultyNodes here + followersChecker.setCurrentNodes(publishRequest.getAcceptedState().nodes()); + publication.start(followersChecker.getFaultyNodes()); } } catch (Exception e) { logger.debug(() -> new ParameterizedMessage("[{}] publishing failed", clusterChangedEvent.source()), e); diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/FollowersChecker.java b/server/src/main/java/org/elasticsearch/cluster/coordination/FollowersChecker.java index 4300787d396..c2cd05bc34e 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/FollowersChecker.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/FollowersChecker.java @@ -128,6 +128,13 @@ public class FollowersChecker extends AbstractComponent { } } + /** + * Clear the set of known nodes, stopping all checks. + */ + public void clearCurrentNodes() { + setCurrentNodes(DiscoveryNodes.EMPTY_NODES); + } + /** * The system is normally in a state in which every follower remains a follower of a stable leader in a single term for an extended * period of time, and therefore our response to every follower check is the same. We handle this case with a single volatile read @@ -207,6 +214,20 @@ public class FollowersChecker extends AbstractComponent { '}'; } + // For assertions + FastResponseState getFastResponseState() { + return fastResponseState; + } + + // For assertions + Set getKnownFollowers() { + synchronized (mutex) { + final Set knownFollowers = new HashSet<>(faultyNodes); + knownFollowers.addAll(followerCheckers.keySet()); + return knownFollowers; + } + } + static class FastResponseState { final long term; final Mode mode; @@ -251,7 +272,7 @@ public class FollowersChecker extends AbstractComponent { return; } - final FollowerCheckRequest request = new FollowerCheckRequest(fastResponseState.term); + final FollowerCheckRequest request = new FollowerCheckRequest(fastResponseState.term, transportService.getLocalNode()); logger.trace("handleWakeUp: checking {} with {}", discoveryNode, request); transportService.sendRequest(discoveryNode, FOLLOWER_CHECK_ACTION_NAME, request, TransportRequestOptions.builder().withTimeout(followerCheckTimeout).withType(Type.PING).build(), @@ -350,23 +371,32 @@ public class FollowersChecker extends AbstractComponent { private final long term; + private final DiscoveryNode sender; + public long getTerm() { return term; } - public FollowerCheckRequest(final long term) { + public DiscoveryNode getSender() { + return sender; + } + + public FollowerCheckRequest(final long term, final DiscoveryNode sender) { this.term = term; + this.sender = sender; } public FollowerCheckRequest(final StreamInput in) throws IOException { super(in); term = in.readLong(); + sender = new DiscoveryNode(in); } @Override public void writeTo(final StreamOutput out) throws IOException { super.writeTo(out); out.writeLong(term); + sender.writeTo(out); } @Override @@ -374,19 +404,21 @@ public class FollowersChecker extends AbstractComponent { if (this == o) return true; if (o == null || getClass() != o.getClass()) return false; FollowerCheckRequest that = (FollowerCheckRequest) o; - return term == that.term; + return term == that.term && + Objects.equals(sender, that.sender); } @Override public String toString() { return "FollowerCheckRequest{" + "term=" + term + + ", sender=" + sender + '}'; } @Override public int hashCode() { - return Objects.hash(term); + return Objects.hash(term, sender); } } } diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/Publication.java b/server/src/main/java/org/elasticsearch/cluster/coordination/Publication.java index a0905c0c729..41a0a7c78df 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/Publication.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/Publication.java @@ -22,6 +22,7 @@ package org.elasticsearch.cluster.coordination; import org.apache.logging.log4j.message.ParameterizedMessage; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.coordination.ClusterStatePublisher.AckListener; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.component.AbstractComponent; @@ -130,6 +131,11 @@ public abstract class Publication extends AbstractComponent { return isCompleted; } + // For assertions + ClusterState publishedState() { + return publishRequest.getAcceptedState(); + } + private void onPossibleCommitFailure() { if (applyCommitRequest.isPresent()) { onPossibleCompletion(); diff --git a/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java b/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java index 428d03720cd..5667f68369a 100644 --- a/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java @@ -47,6 +47,7 @@ import org.elasticsearch.transport.TransportService; import org.hamcrest.Matcher; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; import java.util.EnumSet; import java.util.HashSet; @@ -62,6 +63,9 @@ import static org.elasticsearch.cluster.coordination.CoordinationStateTests.setV import static org.elasticsearch.cluster.coordination.CoordinationStateTests.value; import static org.elasticsearch.cluster.coordination.Coordinator.Mode.CANDIDATE; import static org.elasticsearch.cluster.coordination.Coordinator.Mode.FOLLOWER; +import static org.elasticsearch.cluster.coordination.FollowersChecker.FOLLOWER_CHECK_INTERVAL_SETTING; +import static org.elasticsearch.cluster.coordination.FollowersChecker.FOLLOWER_CHECK_RETRY_COUNT_SETTING; +import static org.elasticsearch.cluster.coordination.FollowersChecker.FOLLOWER_CHECK_TIMEOUT_SETTING; import static org.elasticsearch.cluster.coordination.LeaderChecker.LEADER_CHECK_INTERVAL_SETTING; import static org.elasticsearch.cluster.coordination.LeaderChecker.LEADER_CHECK_RETRY_COUNT_SETTING; import static org.elasticsearch.cluster.coordination.LeaderChecker.LEADER_CHECK_TIMEOUT_SETTING; @@ -114,10 +118,6 @@ public class CoordinatorTests extends ESTestCase { logger.info("--> disconnecting leader {}", originalLeader); originalLeader.disconnect(); - synchronized (originalLeader.coordinator.mutex) { - originalLeader.coordinator.becomeCandidate("simulated failure detection"); // TODO remove once follower checker is integrated - } - cluster.stabilise(); assertThat(cluster.getAnyLeader().getId(), not(equalTo(originalLeader.getId()))); } @@ -130,16 +130,45 @@ public class CoordinatorTests extends ESTestCase { logger.info("--> partitioning leader {}", originalLeader); originalLeader.partition(); - synchronized (originalLeader.coordinator.mutex) { - originalLeader.coordinator.becomeCandidate("simulated failure detection"); // TODO remove once follower checker is integrated - } - - cluster.stabilise(Cluster.DEFAULT_STABILISATION_TIME - + (LEADER_CHECK_INTERVAL_SETTING.get(Settings.EMPTY).millis() + LEADER_CHECK_TIMEOUT_SETTING.get(Settings.EMPTY).millis()) - * LEADER_CHECK_RETRY_COUNT_SETTING.get(Settings.EMPTY)); + cluster.stabilise( + // first wait for all the followers to notice the leader has gone + (LEADER_CHECK_INTERVAL_SETTING.get(Settings.EMPTY).millis() + LEADER_CHECK_TIMEOUT_SETTING.get(Settings.EMPTY).millis()) + * LEADER_CHECK_RETRY_COUNT_SETTING.get(Settings.EMPTY) + // then wait for the new leader to notice that the old leader is unresponsive + + (FOLLOWER_CHECK_INTERVAL_SETTING.get(Settings.EMPTY).millis() + FOLLOWER_CHECK_TIMEOUT_SETTING.get(Settings.EMPTY).millis()) + * FOLLOWER_CHECK_RETRY_COUNT_SETTING.get(Settings.EMPTY)); assertThat(cluster.getAnyLeader().getId(), not(equalTo(originalLeader.getId()))); } + public void testFollowerDisconnectionDetectedQuickly() { + final Cluster cluster = new Cluster(randomIntBetween(3, 5)); + cluster.stabilise(); + + final ClusterNode leader = cluster.getAnyLeader(); + final ClusterNode follower = cluster.getAnyNodeExcept(leader); + logger.info("--> disconnecting follower {}", follower); + follower.disconnect(); + + cluster.stabilise(); + assertThat(cluster.getAnyLeader().getId(), equalTo(leader.getId())); + } + + public void testUnresponsiveFollowerDetectedEventually() { + final Cluster cluster = new Cluster(randomIntBetween(3, 5)); + cluster.stabilise(); + + final ClusterNode leader = cluster.getAnyLeader(); + final ClusterNode follower = cluster.getAnyNodeExcept(leader); + logger.info("--> partitioning follower {}", follower); + follower.partition(); + + cluster.stabilise( + // wait for the leader to notice that the follower is unresponsive + (FOLLOWER_CHECK_INTERVAL_SETTING.get(Settings.EMPTY).millis() + FOLLOWER_CHECK_TIMEOUT_SETTING.get(Settings.EMPTY).millis()) + * FOLLOWER_CHECK_RETRY_COUNT_SETTING.get(Settings.EMPTY)); + assertThat(cluster.getAnyLeader().getId(), equalTo(leader.getId())); + } + private static String nodeIdFromIndex(int nodeIndex) { return "node" + nodeIndex; } @@ -203,15 +232,27 @@ public class CoordinatorTests extends ESTestCase { } if (deterministicTaskQueue.hasDeferredTasks() == false) { - break; // TODO when fault detection is enabled this should be removed, as there should _always_ be deferred tasks + // A 1-node cluster has no need for fault detection etc so will eventually run out of things to do. + assert clusterNodes.size() == 1 : clusterNodes.size(); + break; } deterministicTaskQueue.advanceTime(); } + for (ClusterNode clusterNode : clusterNodes) { + assert clusterNode.coordinator.publicationInProgress() == false; + } + assertUniqueLeaderAndExpectedModes(); } + private boolean isConnectedPair(ClusterNode n1, ClusterNode n2) { + return n1 == n2 || + (getConnectionStatus(n1.getLocalNode(), n2.getLocalNode()) == ConnectionStatus.CONNECTED + && getConnectionStatus(n2.getLocalNode(), n1.getLocalNode()) == ConnectionStatus.CONNECTED); + } + private void assertUniqueLeaderAndExpectedModes() { final ClusterNode leader = getAnyLeader(); final long leaderTerm = leader.coordinator.getCurrentTerm(); @@ -245,8 +286,9 @@ public class CoordinatorTests extends ESTestCase { } } + int connectedNodeCount = Math.toIntExact(clusterNodes.stream().filter(n -> isConnectedPair(leader, n)).count()); assertThat(leader.coordinator.getLastCommittedState().map(ClusterState::getNodes).map(DiscoveryNodes::getSize), - equalTo(Optional.of(clusterNodes.size()))); + equalTo(Optional.of(connectedNodeCount))); } ClusterNode getAnyLeader() { @@ -267,6 +309,14 @@ public class CoordinatorTests extends ESTestCase { return connectionStatus; } + ClusterNode getAnyNodeExcept(ClusterNode... clusterNodes) { + Set forbiddenIds = Arrays.stream(clusterNodes).map(ClusterNode::getId).collect(Collectors.toSet()); + List acceptableNodes + = this.clusterNodes.stream().filter(n -> forbiddenIds.contains(n.getId()) == false).collect(Collectors.toList()); + assert acceptableNodes.isEmpty() == false; + return randomFrom(acceptableNodes); + } + class ClusterNode extends AbstractComponent { private final int nodeIndex; private Coordinator coordinator; diff --git a/server/src/test/java/org/elasticsearch/cluster/coordination/FollowersCheckerTests.java b/server/src/test/java/org/elasticsearch/cluster/coordination/FollowersCheckerTests.java index 759097205a1..856626cc109 100644 --- a/server/src/test/java/org/elasticsearch/cluster/coordination/FollowersCheckerTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/coordination/FollowersCheckerTests.java @@ -144,7 +144,7 @@ public class FollowersCheckerTests extends ESTestCase { assertThat(followersChecker.getFaultyNodes(), empty()); checkedNodes.clear(); - followersChecker.setCurrentNodes(discoveryNodesHolder[0] = DiscoveryNodes.EMPTY_NODES); + followersChecker.clearCurrentNodes(); deterministicTaskQueue.runAllTasks(random()); assertThat(checkedNodes, empty()); } @@ -316,12 +316,21 @@ public class FollowersCheckerTests extends ESTestCase { } public void testFollowerCheckRequestEqualsHashCodeSerialization() { - EqualsHashCodeTestUtils.checkEqualsAndHashCode(new FollowerCheckRequest(randomNonNegativeLong()), + EqualsHashCodeTestUtils.checkEqualsAndHashCode(new FollowerCheckRequest(randomNonNegativeLong(), + new DiscoveryNode(randomAlphaOfLength(10), buildNewFakeTransportAddress(), Version.CURRENT)), rq -> copyWriteable(rq, writableRegistry(), FollowerCheckRequest::new), - rq -> new FollowerCheckRequest(randomNonNegativeLong())); + rq -> { + if (randomBoolean()) { + return new FollowerCheckRequest(rq.getTerm(), + new DiscoveryNode(randomAlphaOfLength(10), buildNewFakeTransportAddress(), Version.CURRENT)); + } else { + return new FollowerCheckRequest(randomNonNegativeLong(), rq.getSender()); + } + }); } public void testResponder() { + final DiscoveryNode leader = new DiscoveryNode("leader", buildNewFakeTransportAddress(), Version.CURRENT); final DiscoveryNode follower = new DiscoveryNode("follower", buildNewFakeTransportAddress(), Version.CURRENT); final Settings settings = Settings.builder().put(NODE_NAME_SETTING.getKey(), follower.getName()).build(); final DeterministicTaskQueue deterministicTaskQueue = new DeterministicTaskQueue(settings); @@ -358,7 +367,7 @@ public class FollowersCheckerTests extends ESTestCase { followersChecker.updateFastResponseState(term, Mode.FOLLOWER); final ExpectsSuccess expectsSuccess = new ExpectsSuccess(); - transportService.sendRequest(follower, FOLLOWER_CHECK_ACTION_NAME, new FollowerCheckRequest(term), expectsSuccess); + transportService.sendRequest(follower, FOLLOWER_CHECK_ACTION_NAME, new FollowerCheckRequest(term, leader), expectsSuccess); deterministicTaskQueue.runAllTasks(random()); assertTrue(expectsSuccess.succeeded()); assertFalse(calledCoordinator.get()); @@ -371,24 +380,24 @@ public class FollowersCheckerTests extends ESTestCase { followersChecker.updateFastResponseState(followerTerm, Mode.FOLLOWER); final AtomicReference receivedException = new AtomicReference<>(); - transportService.sendRequest(follower, FOLLOWER_CHECK_ACTION_NAME, new FollowerCheckRequest(leaderTerm), + transportService.sendRequest(follower, FOLLOWER_CHECK_ACTION_NAME, new FollowerCheckRequest(leaderTerm, leader), new TransportResponseHandler() { - @Override - public void handleResponse(TransportResponse.Empty response) { - fail("unexpected success"); - } + @Override + public void handleResponse(TransportResponse.Empty response) { + fail("unexpected success"); + } - @Override - public void handleException(TransportException exp) { - assertThat(exp, not(nullValue())); - assertTrue(receivedException.compareAndSet(null, exp)); - } + @Override + public void handleException(TransportException exp) { + assertThat(exp, not(nullValue())); + assertTrue(receivedException.compareAndSet(null, exp)); + } - @Override - public String executor() { - return Names.SAME; - } - }); + @Override + public String executor() { + return Names.SAME; + } + }); deterministicTaskQueue.runAllTasks(random()); assertFalse(calledCoordinator.get()); assertThat(receivedException.get(), not(nullValue())); @@ -401,7 +410,8 @@ public class FollowersCheckerTests extends ESTestCase { followersChecker.updateFastResponseState(followerTerm, Mode.FOLLOWER); final ExpectsSuccess expectsSuccess = new ExpectsSuccess(); - transportService.sendRequest(follower, FOLLOWER_CHECK_ACTION_NAME, new FollowerCheckRequest(leaderTerm), expectsSuccess); + transportService.sendRequest(follower, FOLLOWER_CHECK_ACTION_NAME, + new FollowerCheckRequest(leaderTerm, leader), expectsSuccess); deterministicTaskQueue.runAllTasks(random()); assertTrue(expectsSuccess.succeeded()); assertTrue(calledCoordinator.get()); @@ -414,7 +424,7 @@ public class FollowersCheckerTests extends ESTestCase { followersChecker.updateFastResponseState(term, randomFrom(Mode.LEADER, Mode.CANDIDATE)); final ExpectsSuccess expectsSuccess = new ExpectsSuccess(); - transportService.sendRequest(follower, FOLLOWER_CHECK_ACTION_NAME, new FollowerCheckRequest(term), expectsSuccess); + transportService.sendRequest(follower, FOLLOWER_CHECK_ACTION_NAME, new FollowerCheckRequest(term, leader), expectsSuccess); deterministicTaskQueue.runAllTasks(random()); assertTrue(expectsSuccess.succeeded()); assertTrue(calledCoordinator.get()); @@ -429,7 +439,7 @@ public class FollowersCheckerTests extends ESTestCase { coordinatorException.set(new ElasticsearchException(exceptionMessage)); final AtomicReference receivedException = new AtomicReference<>(); - transportService.sendRequest(follower, FOLLOWER_CHECK_ACTION_NAME, new FollowerCheckRequest(term), + transportService.sendRequest(follower, FOLLOWER_CHECK_ACTION_NAME, new FollowerCheckRequest(term, leader), new TransportResponseHandler() { @Override public void handleResponse(TransportResponse.Empty response) {