From 6d6ac74a08fbe0535679bbd3d349bf20310ccf96 Mon Sep 17 00:00:00 2001 From: Yannick Welsch Date: Mon, 22 Oct 2018 17:20:12 +0200 Subject: [PATCH] Zen2: Fail fast on disconnects (#34503) Integrates the failure detectors with the Connection lifecycle, to fail nodes as soon as: - a leader detects one of his followers disconnecting. - a follower detects its leader disconnecting. --- .../cluster/coordination/Coordinator.java | 27 ++--- .../coordination/FollowersChecker.java | 66 +++++++++---- .../cluster/coordination/LeaderChecker.java | 56 +++++++++-- .../coordination/CoordinatorTests.java | 98 ++++++++++++++----- .../coordination/FollowersCheckerTests.java | 71 ++++++++++++-- .../coordination/LeaderCheckerTests.java | 35 ++++++- .../test/transport/MockTransport.java | 17 +--- 7 files changed, 271 insertions(+), 99 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java b/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java index c1dd4a1d375..692641422b1 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java @@ -107,8 +107,6 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery private Releasable electionScheduler; @Nullable private Releasable prevotingRound; - @Nullable - private Releasable leaderCheckScheduler; private long maxTermSeen; private final Reconfigurator reconfigurator; @@ -140,7 +138,7 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery this.publicationHandler = new PublicationTransportHandler(settings, transportService, this::handlePublishRequest, this::handleApplyCommit); this.leaderChecker = new LeaderChecker(settings, transportService, getOnLeaderFailure()); - this.followersChecker = new FollowersChecker(settings, transportService, this::onFollowerCheckRequest, this::onFollowerFailure); + this.followersChecker = new FollowersChecker(settings, transportService, this::onFollowerCheckRequest, this::removeNode); this.nodeRemovalExecutor = new NodeRemovalClusterStateTaskExecutor(allocationService, logger); this.clusterApplier = clusterApplier; masterService.setClusterStateSupplier(this::getStateForMasterService); @@ -163,11 +161,11 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery }; } - private void onFollowerFailure(DiscoveryNode discoveryNode) { + private void removeNode(DiscoveryNode discoveryNode, String reason) { synchronized (mutex) { if (mode == Mode.LEADER) { masterService.submitStateUpdateTask("node-left", - new NodeRemovalClusterStateTaskExecutor.Task(discoveryNode, "node left"), + new NodeRemovalClusterStateTaskExecutor.Task(discoveryNode, reason), ClusterStateTaskConfig.build(Priority.IMMEDIATE), nodeRemovalExecutor, nodeRemovalExecutor); @@ -358,11 +356,7 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery peerFinder.activate(coordinationState.get().getLastAcceptedState().nodes()); leaderChecker.setCurrentNodes(DiscoveryNodes.EMPTY_NODES); - - if (leaderCheckScheduler != null) { - leaderCheckScheduler.close(); - leaderCheckScheduler = null; - } + leaderChecker.updateLeader(null); followersChecker.clearCurrentNodes(); followersChecker.updateFastResponseState(getCurrentTerm(), mode); @@ -391,7 +385,7 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery closePrevotingAndElectionScheduler(); preVoteCollector.update(getPreVoteResponse(), getLocalNode()); - assert leaderCheckScheduler == null : leaderCheckScheduler; + assert leaderChecker.leader() == null : leaderChecker.leader(); followersChecker.updateFastResponseState(getCurrentTerm(), mode); } @@ -415,10 +409,7 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery preVoteCollector.update(getPreVoteResponse(), leaderNode); if (restartLeaderChecker) { - if (leaderCheckScheduler != null) { - leaderCheckScheduler.close(); - } - leaderCheckScheduler = leaderChecker.startLeaderChecker(leaderNode); + leaderChecker.updateLeader(leaderNode); } followersChecker.clearCurrentNodes(); @@ -515,7 +506,7 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery assert electionScheduler == null : electionScheduler; assert prevotingRound == null : prevotingRound; assert becomingMaster || getStateForMasterService().nodes().getMasterNodeId() != null : getStateForMasterService(); - assert leaderCheckScheduler == null : leaderCheckScheduler; + assert leaderChecker.leader() == null : leaderChecker.leader(); assert applierState.nodes().getMasterNodeId() == null || getLocalNode().equals(applierState.nodes().getMasterNode()); assert preVoteCollector.getLeader() == getLocalNode() : preVoteCollector; @@ -553,7 +544,7 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery assert prevotingRound == null : prevotingRound; assert getStateForMasterService().nodes().getMasterNodeId() == null : getStateForMasterService(); assert leaderChecker.currentNodeIsMaster() == false; - assert leaderCheckScheduler != null; + assert lastKnownLeader.equals(Optional.of(leaderChecker.leader())); assert followersChecker.getKnownFollowers().isEmpty(); assert currentPublication.map(Publication::isCommitted).orElse(true); assert preVoteCollector.getLeader().equals(lastKnownLeader.get()) : preVoteCollector; @@ -564,7 +555,7 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery assert prevotingRound == null || electionScheduler != null; assert getStateForMasterService().nodes().getMasterNodeId() == null : getStateForMasterService(); assert leaderChecker.currentNodeIsMaster() == false; - assert leaderCheckScheduler == null : leaderCheckScheduler; + assert leaderChecker.leader() == null : leaderChecker.leader(); assert followersChecker.getKnownFollowers().isEmpty(); assert applierState.nodes().getMasterNodeId() == null; assert currentPublication.map(Publication::isCommitted).orElse(true); diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/FollowersChecker.java b/server/src/main/java/org/elasticsearch/cluster/coordination/FollowersChecker.java index c2cd05bc34e..68dcecede7e 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/FollowersChecker.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/FollowersChecker.java @@ -33,6 +33,7 @@ import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.threadpool.ThreadPool.Names; import org.elasticsearch.transport.ConnectTransportException; import org.elasticsearch.transport.TransportChannel; +import org.elasticsearch.transport.TransportConnectionListener; import org.elasticsearch.transport.TransportException; import org.elasticsearch.transport.TransportRequest; import org.elasticsearch.transport.TransportRequestOptions; @@ -46,6 +47,7 @@ import java.util.HashSet; import java.util.Map; import java.util.Objects; import java.util.Set; +import java.util.function.BiConsumer; import java.util.function.Consumer; import java.util.function.Predicate; @@ -78,7 +80,7 @@ public class FollowersChecker extends AbstractComponent { private final TimeValue followerCheckInterval; private final TimeValue followerCheckTimeout; private final int followerCheckRetryCount; - private final Consumer onNodeFailure; + private final BiConsumer onNodeFailure; private final Consumer handleRequestAndUpdateState; private final Object mutex = new Object(); // protects writes to this state; read access does not need sync @@ -91,7 +93,7 @@ public class FollowersChecker extends AbstractComponent { public FollowersChecker(Settings settings, TransportService transportService, Consumer handleRequestAndUpdateState, - Consumer onNodeFailure) { + BiConsumer onNodeFailure) { super(settings); this.transportService = transportService; this.handleRequestAndUpdateState = handleRequestAndUpdateState; @@ -104,6 +106,12 @@ public class FollowersChecker extends AbstractComponent { updateFastResponseState(0, Mode.CANDIDATE); transportService.registerRequestHandler(FOLLOWER_CHECK_ACTION_NAME, Names.SAME, FollowerCheckRequest::new, (request, transportChannel, task) -> handleFollowerCheck(request, transportChannel)); + transportService.addConnectionListener(new TransportConnectionListener() { + @Override + public void onNodeDisconnected(DiscoveryNode node) { + handleDisconnectedNode(node); + } + }); } /** @@ -228,6 +236,15 @@ public class FollowersChecker extends AbstractComponent { } } + private void handleDisconnectedNode(DiscoveryNode discoveryNode) { + synchronized (mutex) { + FollowerChecker followerChecker = followerCheckers.get(discoveryNode); + if (followerChecker != null && followerChecker.running()) { + followerChecker.failNode("disconnected"); + } + } + } + static class FastResponseState { final long term; final Mode mode; @@ -303,36 +320,21 @@ public class FollowersChecker extends AbstractComponent { failureCountSinceLastSuccess++; + final String reason; if (failureCountSinceLastSuccess >= followerCheckRetryCount) { logger.debug(() -> new ParameterizedMessage("{} failed too many times", FollowerChecker.this), exp); + reason = "followers check retry count exceeded"; } else if (exp instanceof ConnectTransportException || exp.getCause() instanceof ConnectTransportException) { logger.debug(() -> new ParameterizedMessage("{} disconnected", FollowerChecker.this), exp); + reason = "disconnected"; } else { logger.debug(() -> new ParameterizedMessage("{} failed, retrying", FollowerChecker.this), exp); scheduleNextWakeUp(); return; } - transportService.getThreadPool().generic().execute(new Runnable() { - @Override - public void run() { - synchronized (mutex) { - if (running() == false) { - logger.debug("{} no longer running, not marking faulty", FollowerChecker.this); - return; - } - faultyNodes.add(discoveryNode); - followerCheckers.remove(discoveryNode); - } - onNodeFailure.accept(discoveryNode); - } - - @Override - public String toString() { - return "detected failure of " + discoveryNode; - } - }); + failNode(reason); } @@ -343,6 +345,28 @@ public class FollowersChecker extends AbstractComponent { }); } + void failNode(String reason) { + transportService.getThreadPool().generic().execute(new Runnable() { + @Override + public void run() { + synchronized (mutex) { + if (running() == false) { + logger.debug("{} condition no longer applies, not marking faulty", discoveryNode); + return; + } + faultyNodes.add(discoveryNode); + followerCheckers.remove(discoveryNode); + } + onNodeFailure.accept(discoveryNode, reason); + } + + @Override + public String toString() { + return "detected failure of " + discoveryNode; + } + }); + } + private void scheduleNextWakeUp() { transportService.getThreadPool().schedule(followerCheckInterval, Names.SAME, new Runnable() { @Override diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/LeaderChecker.java b/server/src/main/java/org/elasticsearch/cluster/coordination/LeaderChecker.java index fca23018a4f..2ed9e5de918 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/LeaderChecker.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/LeaderChecker.java @@ -22,6 +22,7 @@ package org.elasticsearch.cluster.coordination; import org.apache.logging.log4j.message.ParameterizedMessage; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; +import org.elasticsearch.common.Nullable; import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; @@ -33,6 +34,7 @@ import org.elasticsearch.tasks.Task; import org.elasticsearch.threadpool.ThreadPool.Names; import org.elasticsearch.transport.ConnectTransportException; import org.elasticsearch.transport.TransportChannel; +import org.elasticsearch.transport.TransportConnectionListener; import org.elasticsearch.transport.TransportException; import org.elasticsearch.transport.TransportRequest; import org.elasticsearch.transport.TransportRequestOptions; @@ -46,6 +48,7 @@ import java.io.IOException; import java.util.Objects; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; /** * The LeaderChecker is responsible for allowing followers to check that the currently elected leader is still connected and healthy. We are @@ -77,6 +80,8 @@ public class LeaderChecker extends AbstractComponent { private final TransportService transportService; private final Runnable onLeaderFailure; + private AtomicReference currentChecker = new AtomicReference<>(); + private volatile DiscoveryNodes discoveryNodes; public LeaderChecker(final Settings settings, final TransportService transportService, final Runnable onLeaderFailure) { @@ -88,19 +93,39 @@ public class LeaderChecker extends AbstractComponent { this.onLeaderFailure = onLeaderFailure; transportService.registerRequestHandler(LEADER_CHECK_ACTION_NAME, Names.SAME, LeaderCheckRequest::new, this::handleLeaderCheck); + transportService.addConnectionListener(new TransportConnectionListener() { + @Override + public void onNodeDisconnected(DiscoveryNode node) { + handleDisconnectedNode(node); + } + }); + } + + public DiscoveryNode leader() { + CheckScheduler checkScheduler = currentChecker.get(); + return checkScheduler == null ? null : checkScheduler.leader; } /** - * Start a leader checker for the given leader. Should only be called after successfully joining this leader. + * Starts and / or stops a leader checker for the given leader. Should only be called after successfully joining this leader. * - * @param leader the node to be checked as leader - * @return a `Releasable` that can be used to stop this checker. + * @param leader the node to be checked as leader, or null if checks should be disabled */ - public Releasable startLeaderChecker(final DiscoveryNode leader) { + public void updateLeader(@Nullable final DiscoveryNode leader) { assert transportService.getLocalNode().equals(leader) == false; - CheckScheduler checkScheduler = new CheckScheduler(leader); - checkScheduler.handleWakeUp(); - return checkScheduler; + final CheckScheduler checkScheduler; + if (leader != null) { + checkScheduler = new CheckScheduler(leader); + } else { + checkScheduler = null; + } + CheckScheduler previousChecker = currentChecker.getAndSet(checkScheduler); + if (previousChecker != null) { + previousChecker.close(); + } + if (checkScheduler != null) { + checkScheduler.handleWakeUp(); + } } /** @@ -137,6 +162,15 @@ public class LeaderChecker extends AbstractComponent { } } + private void handleDisconnectedNode(DiscoveryNode discoveryNode) { + CheckScheduler checkScheduler = currentChecker.get(); + if (checkScheduler != null) { + checkScheduler.handleDisconnectedNode(discoveryNode); + } else { + logger.trace("disconnect event ignored for {}, no check scheduler", discoveryNode); + } + } + private class CheckScheduler implements Releasable { private final AtomicBoolean isClosed = new AtomicBoolean(); @@ -222,7 +256,7 @@ public class LeaderChecker extends AbstractComponent { }); } - private void leaderFailed() { + void leaderFailed() { if (isClosed.compareAndSet(false, true)) { transportService.getThreadPool().generic().execute(onLeaderFailure); } else { @@ -230,6 +264,12 @@ public class LeaderChecker extends AbstractComponent { } } + void handleDisconnectedNode(DiscoveryNode discoveryNode) { + if (discoveryNode.equals(leader)) { + leaderFailed(); + } + } + private void scheduleNextWakeUp() { logger.trace("scheduling next check of {} for [{}] = {}", leader, LEADER_CHECK_INTERVAL_SETTING.getKey(), leaderCheckInterval); transportService.getThreadPool().schedule(leaderCheckInterval, Names.SAME, new Runnable() { diff --git a/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java b/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java index 4cf258dbc71..84ba9d06e36 100644 --- a/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java @@ -387,7 +387,29 @@ public class CoordinatorTests extends ESTestCase { cluster.stabilise(); // would not work if disconnect1 were removed from the configuration } - public void testLeaderDisconnectionDetectedQuickly() { + public void testLeaderDisconnectionWithDisconnectEventDetectedQuickly() { + final Cluster cluster = new Cluster(randomIntBetween(3, 5)); + cluster.runRandomly(); + cluster.stabilise(); + + final ClusterNode originalLeader = cluster.getAnyLeader(); + logger.info("--> disconnecting leader {}", originalLeader); + originalLeader.disconnect(); + logger.info("--> followers get disconnect event for leader {} ", originalLeader); + cluster.getAllNodesExcept(originalLeader).forEach(cn -> cn.onDisconnectEventFrom(originalLeader)); + // turn leader into candidate, which stabilisation asserts at the end + cluster.getAllNodesExcept(originalLeader).forEach(cn -> originalLeader.onDisconnectEventFrom(cn)); + cluster.stabilise(DEFAULT_DELAY_VARIABILITY // disconnect is scheduled + // then wait for a new election + + DEFAULT_ELECTION_DELAY + // wait for the removal to be committed + + DEFAULT_CLUSTER_STATE_UPDATE_DELAY + // then wait for the followup reconfiguration + + DEFAULT_CLUSTER_STATE_UPDATE_DELAY); + assertThat(cluster.getAnyLeader().getId(), not(equalTo(originalLeader.getId()))); + } + + public void testLeaderDisconnectionWithoutDisconnectEventDetectedQuickly() { final Cluster cluster = new Cluster(randomIntBetween(3, 5)); cluster.runRandomly(); cluster.stabilise(); @@ -398,7 +420,6 @@ public class CoordinatorTests extends ESTestCase { cluster.stabilise(Math.max( // Each follower may have just sent a leader check, which receives no response - // TODO not necessary if notified of disconnection defaultMillis(LEADER_CHECK_TIMEOUT_SETTING) // then wait for the follower to check the leader + defaultMillis(LEADER_CHECK_INTERVAL_SETTING) @@ -408,7 +429,6 @@ public class CoordinatorTests extends ESTestCase { + DEFAULT_ELECTION_DELAY, // ALSO the leader may have just sent a follower check, which receives no response - // TODO unnecessary if notified of disconnection defaultMillis(FOLLOWER_CHECK_TIMEOUT_SETTING) // wait for the leader to check its followers + defaultMillis(FOLLOWER_CHECK_INTERVAL_SETTING) @@ -478,10 +498,27 @@ public class CoordinatorTests extends ESTestCase { final ClusterNode follower = cluster.getAnyNodeExcept(leader); logger.info("--> disconnecting follower {}", follower); follower.disconnect(); + logger.info("--> leader {} and follower {} get disconnect event", leader, follower); + leader.onDisconnectEventFrom(follower); + follower.onDisconnectEventFrom(leader); // to turn follower into candidate, which stabilisation asserts at the end + cluster.stabilise(DEFAULT_DELAY_VARIABILITY // disconnect is scheduled + + DEFAULT_CLUSTER_STATE_UPDATE_DELAY + // then wait for the followup reconfiguration + + DEFAULT_CLUSTER_STATE_UPDATE_DELAY); + assertThat(cluster.getAnyLeader().getId(), equalTo(leader.getId())); + } + public void testFollowerDisconnectionWithoutDisconnectEventDetectedQuickly() { + final Cluster cluster = new Cluster(randomIntBetween(3, 5)); + cluster.runRandomly(); + cluster.stabilise(); + + final ClusterNode leader = cluster.getAnyLeader(); + final ClusterNode follower = cluster.getAnyNodeExcept(leader); + logger.info("--> disconnecting follower {}", follower); + follower.disconnect(); cluster.stabilise(Math.max( // the leader may have just sent a follower check, which receives no response - // TODO unnecessary if notified of disconnection defaultMillis(FOLLOWER_CHECK_TIMEOUT_SETTING) // wait for the leader to check the follower + defaultMillis(FOLLOWER_CHECK_INTERVAL_SETTING) @@ -493,7 +530,6 @@ public class CoordinatorTests extends ESTestCase { + DEFAULT_CLUSTER_STATE_UPDATE_DELAY, // ALSO the follower may have just sent a leader check, which receives no response - // TODO not necessary if notified of disconnection defaultMillis(LEADER_CHECK_TIMEOUT_SETTING) // then wait for the follower to check the leader + defaultMillis(LEADER_CHECK_INTERVAL_SETTING) @@ -627,25 +663,26 @@ public class CoordinatorTests extends ESTestCase { } public void testAckListenerReceivesNacksIfLeaderStandsDown() { - // TODO: needs support for handling disconnects -// final Cluster cluster = new Cluster(3); -// cluster.runRandomly(); -// cluster.stabilise(); -// final ClusterNode leader = cluster.getAnyLeader(); -// final ClusterNode follower0 = cluster.getAnyNodeExcept(leader); -// final ClusterNode follower1 = cluster.getAnyNodeExcept(leader, follower0); -// -// leader.partition(); -// follower0.coordinator.handleDisconnectedNode(leader.localNode); -// follower1.coordinator.handleDisconnectedNode(leader.localNode); -// cluster.runUntil(cluster.getCurrentTimeMillis() + cluster.DEFAULT_ELECTION_TIME); -// AckCollector ackCollector = leader.submitRandomValue(); -// cluster.runUntil(cluster.currentTimeMillis + Cluster.DEFAULT_DELAY_VARIABILITY); -// leader.connectionStatus = ConnectionStatus.CONNECTED; -// cluster.stabilise(cluster.DEFAULT_STABILISATION_TIME, 0L); -// assertTrue("expected nack from " + leader, ackCollector.hasAckedUnsuccessfully(leader)); -// assertTrue("expected nack from " + follower0, ackCollector.hasAckedUnsuccessfully(follower0)); -// assertTrue("expected nack from " + follower1, ackCollector.hasAckedUnsuccessfully(follower1)); + final Cluster cluster = new Cluster(3); + cluster.runRandomly(); + cluster.stabilise(); + final ClusterNode leader = cluster.getAnyLeader(); + final ClusterNode follower0 = cluster.getAnyNodeExcept(leader); + final ClusterNode follower1 = cluster.getAnyNodeExcept(leader, follower0); + + leader.blackhole(); + follower0.onDisconnectEventFrom(leader); + follower1.onDisconnectEventFrom(leader); + // let followers elect a leader among themselves before healing the leader and running the publication + cluster.runFor(DEFAULT_DELAY_VARIABILITY // disconnect is scheduled + + DEFAULT_ELECTION_DELAY, "elect new leader"); + // cluster has two nodes in mode LEADER, in different terms ofc, and the one in the lower term won’t be able to publish anything + leader.heal(); + AckCollector ackCollector = leader.submitValue(randomLong()); + cluster.stabilise(); // TODO: check if can find a better bound here + assertTrue("expected nack from " + leader, ackCollector.hasAckedUnsuccessfully(leader)); + assertTrue("expected nack from " + follower0, ackCollector.hasAckedUnsuccessfully(follower0)); + assertTrue("expected nack from " + follower1, ackCollector.hasAckedUnsuccessfully(follower1)); } public void testAckListenerReceivesNacksFromFollowerInHigherTerm() { @@ -1124,11 +1161,16 @@ public class CoordinatorTests extends ESTestCase { } ClusterNode getAnyNodeExcept(ClusterNode... clusterNodes) { + List filteredNodes = getAllNodesExcept(clusterNodes); + assert filteredNodes.isEmpty() == false; + return randomFrom(filteredNodes); + } + + List getAllNodesExcept(ClusterNode... clusterNodes) { Set forbiddenIds = Arrays.stream(clusterNodes).map(ClusterNode::getId).collect(Collectors.toSet()); List acceptableNodes = this.clusterNodes.stream().filter(n -> forbiddenIds.contains(n.getId()) == false).collect(Collectors.toList()); - assert acceptableNodes.isEmpty() == false; - return randomFrom(acceptableNodes); + return acceptableNodes; } ClusterNode getAnyNodePreferringLeaders() { @@ -1367,6 +1409,10 @@ public class CoordinatorTests extends ESTestCase { return blackholed; } + void onDisconnectEventFrom(ClusterNode clusterNode) { + transportService.disconnectFromNode(clusterNode.localNode); + } + ClusterState getLastAppliedClusterState() { return clusterApplier.lastAppliedClusterState; } diff --git a/server/src/test/java/org/elasticsearch/cluster/coordination/FollowersCheckerTests.java b/server/src/test/java/org/elasticsearch/cluster/coordination/FollowersCheckerTests.java index 452ff3cdf8e..08bd6be3816 100644 --- a/server/src/test/java/org/elasticsearch/cluster/coordination/FollowersCheckerTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/coordination/FollowersCheckerTests.java @@ -20,6 +20,7 @@ package org.elasticsearch.cluster.coordination; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.Version; +import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.coordination.Coordinator.Mode; import org.elasticsearch.cluster.coordination.FollowersChecker.FollowerCheckRequest; import org.elasticsearch.cluster.node.DiscoveryNode; @@ -51,6 +52,7 @@ import static org.elasticsearch.cluster.coordination.FollowersChecker.FOLLOWER_C import static org.elasticsearch.cluster.coordination.FollowersChecker.FOLLOWER_CHECK_RETRY_COUNT_SETTING; import static org.elasticsearch.cluster.coordination.FollowersChecker.FOLLOWER_CHECK_TIMEOUT_SETTING; import static org.elasticsearch.node.Node.NODE_NAME_SETTING; +import static org.elasticsearch.transport.TransportService.HANDSHAKE_ACTION_NAME; import static org.hamcrest.Matchers.contains; import static org.hamcrest.Matchers.containsInAnyOrder; import static org.hamcrest.Matchers.empty; @@ -93,7 +95,7 @@ public class FollowersCheckerTests extends ESTestCase { final FollowersChecker followersChecker = new FollowersChecker(settings, transportService, fcr -> { assert false : fcr; - }, node -> { + }, (node, reason) -> { assert false : node; }); @@ -163,6 +165,7 @@ public class FollowersCheckerTests extends ESTestCase { final Settings settings = settingsBuilder.build(); testBehaviourOfFailingNode(settings, () -> null, + "followers check retry count exceeded", (FOLLOWER_CHECK_RETRY_COUNT_SETTING.get(settings) - 1) * FOLLOWER_CHECK_INTERVAL_SETTING.get(settings).millis() + FOLLOWER_CHECK_RETRY_COUNT_SETTING.get(settings) * FOLLOWER_CHECK_TIMEOUT_SETTING.get(settings).millis()); } @@ -180,6 +183,7 @@ public class FollowersCheckerTests extends ESTestCase { testBehaviourOfFailingNode(settings, () -> { throw new ElasticsearchException("simulated exception"); }, + "followers check retry count exceeded", (FOLLOWER_CHECK_RETRY_COUNT_SETTING.get(settings) - 1) * FOLLOWER_CHECK_INTERVAL_SETTING.get(settings).millis()); } @@ -211,17 +215,71 @@ public class FollowersCheckerTests extends ESTestCase { throw new ElasticsearchException("simulated exception"); } }, + "followers check retry count exceeded", (FOLLOWER_CHECK_RETRY_COUNT_SETTING.get(settings) * (maxRecoveries + 1) - 1) * FOLLOWER_CHECK_INTERVAL_SETTING.get(settings).millis()); } - public void testFailsNodeThatDisconnects() { + public void testFailsNodeThatIsDisconnected() { testBehaviourOfFailingNode(Settings.EMPTY, () -> { throw new ConnectTransportException(null, "simulated exception"); - }, 0); + }, "disconnected", 0); } - private void testBehaviourOfFailingNode(Settings testSettings, Supplier responder, long expectedFailureTime) { + public void testFailsNodeThatDisconnects() { + final DiscoveryNode localNode = new DiscoveryNode("local-node", buildNewFakeTransportAddress(), Version.CURRENT); + final DiscoveryNode otherNode = new DiscoveryNode("other-node", buildNewFakeTransportAddress(), Version.CURRENT); + final Settings settings = Settings.builder().put(NODE_NAME_SETTING.getKey(), localNode.getName()).build(); + final DeterministicTaskQueue deterministicTaskQueue = new DeterministicTaskQueue(settings, random()); + + final MockTransport mockTransport = new MockTransport() { + @Override + protected void onSendRequest(long requestId, String action, TransportRequest request, DiscoveryNode node) { + assertFalse(node.equals(localNode)); + if (action.equals(HANDSHAKE_ACTION_NAME)) { + handleResponse(requestId, new TransportService.HandshakeResponse(node, ClusterName.DEFAULT, Version.CURRENT)); + return; + } + deterministicTaskQueue.scheduleNow(new Runnable() { + @Override + public void run() { + handleResponse(requestId, Empty.INSTANCE); + } + + @Override + public String toString() { + return "sending response to [" + action + "][" + requestId + "] from " + node; + } + }); + } + }; + + final TransportService transportService = mockTransport.createTransportService(settings, deterministicTaskQueue.getThreadPool(), + TransportService.NOOP_TRANSPORT_INTERCEPTOR, boundTransportAddress -> localNode, null, emptySet()); + transportService.start(); + transportService.acceptIncomingRequests(); + + final AtomicBoolean nodeFailed = new AtomicBoolean(); + + final FollowersChecker followersChecker = new FollowersChecker(settings, transportService, fcr -> { + assert false : fcr; + }, (node, reason) -> { + assertTrue(nodeFailed.compareAndSet(false, true)); + assertThat(reason, equalTo("disconnected")); + }); + + DiscoveryNodes discoveryNodes = DiscoveryNodes.builder().add(localNode).add(otherNode).localNodeId(localNode.getId()).build(); + followersChecker.setCurrentNodes(discoveryNodes); + + transportService.connectToNode(otherNode); + transportService.disconnectFromNode(otherNode); + deterministicTaskQueue.runAllRunnableTasks(); + assertTrue(nodeFailed.get()); + assertThat(followersChecker.getFaultyNodes(), contains(otherNode)); + } + + private void testBehaviourOfFailingNode(Settings testSettings, Supplier responder, String failureReason, + long expectedFailureTime) { final DiscoveryNode localNode = new DiscoveryNode("local-node", buildNewFakeTransportAddress(), Version.CURRENT); final DiscoveryNode otherNode = new DiscoveryNode("other-node", buildNewFakeTransportAddress(), Version.CURRENT); final Settings settings = Settings.builder().put(NODE_NAME_SETTING.getKey(), localNode.getName()).put(testSettings).build(); @@ -266,8 +324,9 @@ public class FollowersCheckerTests extends ESTestCase { final FollowersChecker followersChecker = new FollowersChecker(settings, transportService, fcr -> { assert false : fcr; - }, node -> { + }, (node, reason) -> { assertTrue(nodeFailed.compareAndSet(false, true)); + assertThat(reason, equalTo(failureReason)); }); DiscoveryNodes discoveryNodes = DiscoveryNodes.builder().add(localNode).add(otherNode).localNodeId(localNode.getId()).build(); @@ -357,7 +416,7 @@ public class FollowersCheckerTests extends ESTestCase { if (exception != null) { throw exception; } - }, node -> { + }, (node, reason) -> { assert false : node; }); diff --git a/server/src/test/java/org/elasticsearch/cluster/coordination/LeaderCheckerTests.java b/server/src/test/java/org/elasticsearch/cluster/coordination/LeaderCheckerTests.java index 60702c9eab1..afe34e5c161 100644 --- a/server/src/test/java/org/elasticsearch/cluster/coordination/LeaderCheckerTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/coordination/LeaderCheckerTests.java @@ -21,10 +21,10 @@ package org.elasticsearch.cluster.coordination; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.Version; +import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.coordination.LeaderChecker.LeaderCheckRequest; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; -import org.elasticsearch.common.lease.Releasable; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.EqualsHashCodeTestUtils; @@ -47,6 +47,7 @@ import static org.elasticsearch.cluster.coordination.LeaderChecker.LEADER_CHECK_ import static org.elasticsearch.cluster.coordination.LeaderChecker.LEADER_CHECK_RETRY_COUNT_SETTING; import static org.elasticsearch.cluster.coordination.LeaderChecker.LEADER_CHECK_TIMEOUT_SETTING; import static org.elasticsearch.node.Node.NODE_NAME_SETTING; +import static org.elasticsearch.transport.TransportService.HANDSHAKE_ACTION_NAME; import static org.elasticsearch.transport.TransportService.NOOP_TRANSPORT_INTERCEPTOR; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.instanceOf; @@ -145,7 +146,8 @@ public class LeaderCheckerTests extends ESTestCase { () -> assertTrue(leaderFailed.compareAndSet(false, true))); logger.info("--> creating first checker"); - try (Releasable ignored = leaderChecker.startLeaderChecker(leader1)) { + leaderChecker.updateLeader(leader1); + { final long maxCheckCount = randomLongBetween(2, 1000); logger.info("--> checking that no failure is detected in {} checks", maxCheckCount); while (checkCount.get() < maxCheckCount) { @@ -153,13 +155,15 @@ public class LeaderCheckerTests extends ESTestCase { deterministicTaskQueue.advanceTime(); } } + leaderChecker.updateLeader(null); logger.info("--> running remaining tasks"); deterministicTaskQueue.runAllTasks(); assertFalse(leaderFailed.get()); logger.info("--> creating second checker"); - try (Releasable ignored = leaderChecker.startLeaderChecker(leader2)) { + leaderChecker.updateLeader(leader2); + { checkCount.set(0); final long maxCheckCount = randomLongBetween(2, 1000); logger.info("--> checking again that no failure is detected in {} checks", maxCheckCount); @@ -184,6 +188,7 @@ public class LeaderCheckerTests extends ESTestCase { + leaderCheckTimeoutMillis // needed because a successful check response might be in flight at the time of failure )); } + leaderChecker.updateLeader(null); } enum Response { @@ -201,6 +206,10 @@ public class LeaderCheckerTests extends ESTestCase { final MockTransport mockTransport = new MockTransport() { @Override protected void onSendRequest(long requestId, String action, TransportRequest request, DiscoveryNode node) { + if (action.equals(HANDSHAKE_ACTION_NAME)) { + handleResponse(requestId, new TransportService.HandshakeResponse(node, ClusterName.DEFAULT, Version.CURRENT)); + return; + } assertThat(action, equalTo(LEADER_CHECK_ACTION_NAME)); assertTrue(node.equals(leader)); final Response response = responseHolder[0]; @@ -237,7 +246,8 @@ public class LeaderCheckerTests extends ESTestCase { final LeaderChecker leaderChecker = new LeaderChecker(settings, transportService, () -> assertTrue(leaderFailed.compareAndSet(false, true))); - try (Releasable ignored = leaderChecker.startLeaderChecker(leader)) { + leaderChecker.updateLeader(leader); + { while (deterministicTaskQueue.getCurrentTimeMillis() < 10 * LEADER_CHECK_INTERVAL_SETTING.get(Settings.EMPTY).millis()) { deterministicTaskQueue.runAllRunnableTasks(); deterministicTaskQueue.advanceTime(); @@ -253,12 +263,14 @@ public class LeaderCheckerTests extends ESTestCase { assertTrue(leaderFailed.get()); } + leaderChecker.updateLeader(null); deterministicTaskQueue.runAllTasks(); leaderFailed.set(false); responseHolder[0] = Response.SUCCESS; - try (Releasable ignored = leaderChecker.startLeaderChecker(leader)) { + leaderChecker.updateLeader(leader); + { while (deterministicTaskQueue.getCurrentTimeMillis() < 10 * LEADER_CHECK_INTERVAL_SETTING.get(Settings.EMPTY).millis()) { deterministicTaskQueue.runAllRunnableTasks(); deterministicTaskQueue.advanceTime(); @@ -274,6 +286,19 @@ public class LeaderCheckerTests extends ESTestCase { assertTrue(leaderFailed.get()); } + + deterministicTaskQueue.runAllTasks(); + leaderFailed.set(false); + responseHolder[0] = Response.SUCCESS; + + leaderChecker.updateLeader(leader); + { + transportService.connectToNode(leader); // need to connect first for disconnect to have any effect + + transportService.disconnectFromNode(leader); + deterministicTaskQueue.runAllRunnableTasks(); + assertTrue(leaderFailed.get()); + } } public void testLeaderBehaviour() { diff --git a/test/framework/src/main/java/org/elasticsearch/test/transport/MockTransport.java b/test/framework/src/main/java/org/elasticsearch/test/transport/MockTransport.java index d642798d688..184c8294d58 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/transport/MockTransport.java +++ b/test/framework/src/main/java/org/elasticsearch/test/transport/MockTransport.java @@ -19,7 +19,6 @@ package org.elasticsearch.test.transport; -import org.elasticsearch.action.ActionListener; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.Randomness; @@ -34,6 +33,7 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.BoundTransportAddress; import org.elasticsearch.common.transport.TransportAddress; import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.CloseableConnection; import org.elasticsearch.transport.ConnectionManager; import org.elasticsearch.transport.ConnectionProfile; import org.elasticsearch.transport.RemoteTransportException; @@ -158,7 +158,7 @@ public class MockTransport implements Transport, LifecycleComponent { @Override public Connection openConnection(DiscoveryNode node, ConnectionProfile profile) { - return new Connection() { + return new CloseableConnection() { @Override public DiscoveryNode getNode() { return node; @@ -170,19 +170,6 @@ public class MockTransport implements Transport, LifecycleComponent { requests.put(requestId, Tuple.tuple(node, action)); onSendRequest(requestId, action, request, node); } - - @Override - public void addCloseListener(ActionListener listener) { - } - - @Override - public boolean isClosed() { - return false; - } - - @Override - public void close() { - } }; }