Zen2: Fail fast on disconnects (#34503)

Integrates the failure detectors with the Connection lifecycle, to fail nodes as soon as:
- a leader detects one of his followers disconnecting.
- a follower detects its leader disconnecting.
This commit is contained in:
Yannick Welsch 2018-10-22 17:20:12 +02:00 committed by GitHub
parent bfd24fc030
commit 6d6ac74a08
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 271 additions and 99 deletions

View File

@ -107,8 +107,6 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
private Releasable electionScheduler;
@Nullable
private Releasable prevotingRound;
@Nullable
private Releasable leaderCheckScheduler;
private long maxTermSeen;
private final Reconfigurator reconfigurator;
@ -140,7 +138,7 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
this.publicationHandler = new PublicationTransportHandler(settings, transportService, this::handlePublishRequest,
this::handleApplyCommit);
this.leaderChecker = new LeaderChecker(settings, transportService, getOnLeaderFailure());
this.followersChecker = new FollowersChecker(settings, transportService, this::onFollowerCheckRequest, this::onFollowerFailure);
this.followersChecker = new FollowersChecker(settings, transportService, this::onFollowerCheckRequest, this::removeNode);
this.nodeRemovalExecutor = new NodeRemovalClusterStateTaskExecutor(allocationService, logger);
this.clusterApplier = clusterApplier;
masterService.setClusterStateSupplier(this::getStateForMasterService);
@ -163,11 +161,11 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
};
}
private void onFollowerFailure(DiscoveryNode discoveryNode) {
private void removeNode(DiscoveryNode discoveryNode, String reason) {
synchronized (mutex) {
if (mode == Mode.LEADER) {
masterService.submitStateUpdateTask("node-left",
new NodeRemovalClusterStateTaskExecutor.Task(discoveryNode, "node left"),
new NodeRemovalClusterStateTaskExecutor.Task(discoveryNode, reason),
ClusterStateTaskConfig.build(Priority.IMMEDIATE),
nodeRemovalExecutor,
nodeRemovalExecutor);
@ -358,11 +356,7 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
peerFinder.activate(coordinationState.get().getLastAcceptedState().nodes());
leaderChecker.setCurrentNodes(DiscoveryNodes.EMPTY_NODES);
if (leaderCheckScheduler != null) {
leaderCheckScheduler.close();
leaderCheckScheduler = null;
}
leaderChecker.updateLeader(null);
followersChecker.clearCurrentNodes();
followersChecker.updateFastResponseState(getCurrentTerm(), mode);
@ -391,7 +385,7 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
closePrevotingAndElectionScheduler();
preVoteCollector.update(getPreVoteResponse(), getLocalNode());
assert leaderCheckScheduler == null : leaderCheckScheduler;
assert leaderChecker.leader() == null : leaderChecker.leader();
followersChecker.updateFastResponseState(getCurrentTerm(), mode);
}
@ -415,10 +409,7 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
preVoteCollector.update(getPreVoteResponse(), leaderNode);
if (restartLeaderChecker) {
if (leaderCheckScheduler != null) {
leaderCheckScheduler.close();
}
leaderCheckScheduler = leaderChecker.startLeaderChecker(leaderNode);
leaderChecker.updateLeader(leaderNode);
}
followersChecker.clearCurrentNodes();
@ -515,7 +506,7 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
assert electionScheduler == null : electionScheduler;
assert prevotingRound == null : prevotingRound;
assert becomingMaster || getStateForMasterService().nodes().getMasterNodeId() != null : getStateForMasterService();
assert leaderCheckScheduler == null : leaderCheckScheduler;
assert leaderChecker.leader() == null : leaderChecker.leader();
assert applierState.nodes().getMasterNodeId() == null || getLocalNode().equals(applierState.nodes().getMasterNode());
assert preVoteCollector.getLeader() == getLocalNode() : preVoteCollector;
@ -553,7 +544,7 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
assert prevotingRound == null : prevotingRound;
assert getStateForMasterService().nodes().getMasterNodeId() == null : getStateForMasterService();
assert leaderChecker.currentNodeIsMaster() == false;
assert leaderCheckScheduler != null;
assert lastKnownLeader.equals(Optional.of(leaderChecker.leader()));
assert followersChecker.getKnownFollowers().isEmpty();
assert currentPublication.map(Publication::isCommitted).orElse(true);
assert preVoteCollector.getLeader().equals(lastKnownLeader.get()) : preVoteCollector;
@ -564,7 +555,7 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
assert prevotingRound == null || electionScheduler != null;
assert getStateForMasterService().nodes().getMasterNodeId() == null : getStateForMasterService();
assert leaderChecker.currentNodeIsMaster() == false;
assert leaderCheckScheduler == null : leaderCheckScheduler;
assert leaderChecker.leader() == null : leaderChecker.leader();
assert followersChecker.getKnownFollowers().isEmpty();
assert applierState.nodes().getMasterNodeId() == null;
assert currentPublication.map(Publication::isCommitted).orElse(true);

View File

@ -33,6 +33,7 @@ import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.threadpool.ThreadPool.Names;
import org.elasticsearch.transport.ConnectTransportException;
import org.elasticsearch.transport.TransportChannel;
import org.elasticsearch.transport.TransportConnectionListener;
import org.elasticsearch.transport.TransportException;
import org.elasticsearch.transport.TransportRequest;
import org.elasticsearch.transport.TransportRequestOptions;
@ -46,6 +47,7 @@ import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Predicate;
@ -78,7 +80,7 @@ public class FollowersChecker extends AbstractComponent {
private final TimeValue followerCheckInterval;
private final TimeValue followerCheckTimeout;
private final int followerCheckRetryCount;
private final Consumer<DiscoveryNode> onNodeFailure;
private final BiConsumer<DiscoveryNode, String> onNodeFailure;
private final Consumer<FollowerCheckRequest> handleRequestAndUpdateState;
private final Object mutex = new Object(); // protects writes to this state; read access does not need sync
@ -91,7 +93,7 @@ public class FollowersChecker extends AbstractComponent {
public FollowersChecker(Settings settings, TransportService transportService,
Consumer<FollowerCheckRequest> handleRequestAndUpdateState,
Consumer<DiscoveryNode> onNodeFailure) {
BiConsumer<DiscoveryNode, String> onNodeFailure) {
super(settings);
this.transportService = transportService;
this.handleRequestAndUpdateState = handleRequestAndUpdateState;
@ -104,6 +106,12 @@ public class FollowersChecker extends AbstractComponent {
updateFastResponseState(0, Mode.CANDIDATE);
transportService.registerRequestHandler(FOLLOWER_CHECK_ACTION_NAME, Names.SAME, FollowerCheckRequest::new,
(request, transportChannel, task) -> handleFollowerCheck(request, transportChannel));
transportService.addConnectionListener(new TransportConnectionListener() {
@Override
public void onNodeDisconnected(DiscoveryNode node) {
handleDisconnectedNode(node);
}
});
}
/**
@ -228,6 +236,15 @@ public class FollowersChecker extends AbstractComponent {
}
}
private void handleDisconnectedNode(DiscoveryNode discoveryNode) {
synchronized (mutex) {
FollowerChecker followerChecker = followerCheckers.get(discoveryNode);
if (followerChecker != null && followerChecker.running()) {
followerChecker.failNode("disconnected");
}
}
}
static class FastResponseState {
final long term;
final Mode mode;
@ -303,36 +320,21 @@ public class FollowersChecker extends AbstractComponent {
failureCountSinceLastSuccess++;
final String reason;
if (failureCountSinceLastSuccess >= followerCheckRetryCount) {
logger.debug(() -> new ParameterizedMessage("{} failed too many times", FollowerChecker.this), exp);
reason = "followers check retry count exceeded";
} else if (exp instanceof ConnectTransportException
|| exp.getCause() instanceof ConnectTransportException) {
logger.debug(() -> new ParameterizedMessage("{} disconnected", FollowerChecker.this), exp);
reason = "disconnected";
} else {
logger.debug(() -> new ParameterizedMessage("{} failed, retrying", FollowerChecker.this), exp);
scheduleNextWakeUp();
return;
}
transportService.getThreadPool().generic().execute(new Runnable() {
@Override
public void run() {
synchronized (mutex) {
if (running() == false) {
logger.debug("{} no longer running, not marking faulty", FollowerChecker.this);
return;
}
faultyNodes.add(discoveryNode);
followerCheckers.remove(discoveryNode);
}
onNodeFailure.accept(discoveryNode);
}
@Override
public String toString() {
return "detected failure of " + discoveryNode;
}
});
failNode(reason);
}
@ -343,6 +345,28 @@ public class FollowersChecker extends AbstractComponent {
});
}
void failNode(String reason) {
transportService.getThreadPool().generic().execute(new Runnable() {
@Override
public void run() {
synchronized (mutex) {
if (running() == false) {
logger.debug("{} condition no longer applies, not marking faulty", discoveryNode);
return;
}
faultyNodes.add(discoveryNode);
followerCheckers.remove(discoveryNode);
}
onNodeFailure.accept(discoveryNode, reason);
}
@Override
public String toString() {
return "detected failure of " + discoveryNode;
}
});
}
private void scheduleNextWakeUp() {
transportService.getThreadPool().schedule(followerCheckInterval, Names.SAME, new Runnable() {
@Override

View File

@ -22,6 +22,7 @@ package org.elasticsearch.cluster.coordination;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
@ -33,6 +34,7 @@ import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool.Names;
import org.elasticsearch.transport.ConnectTransportException;
import org.elasticsearch.transport.TransportChannel;
import org.elasticsearch.transport.TransportConnectionListener;
import org.elasticsearch.transport.TransportException;
import org.elasticsearch.transport.TransportRequest;
import org.elasticsearch.transport.TransportRequestOptions;
@ -46,6 +48,7 @@ import java.io.IOException;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
/**
* The LeaderChecker is responsible for allowing followers to check that the currently elected leader is still connected and healthy. We are
@ -77,6 +80,8 @@ public class LeaderChecker extends AbstractComponent {
private final TransportService transportService;
private final Runnable onLeaderFailure;
private AtomicReference<CheckScheduler> currentChecker = new AtomicReference<>();
private volatile DiscoveryNodes discoveryNodes;
public LeaderChecker(final Settings settings, final TransportService transportService, final Runnable onLeaderFailure) {
@ -88,19 +93,39 @@ public class LeaderChecker extends AbstractComponent {
this.onLeaderFailure = onLeaderFailure;
transportService.registerRequestHandler(LEADER_CHECK_ACTION_NAME, Names.SAME, LeaderCheckRequest::new, this::handleLeaderCheck);
transportService.addConnectionListener(new TransportConnectionListener() {
@Override
public void onNodeDisconnected(DiscoveryNode node) {
handleDisconnectedNode(node);
}
});
}
public DiscoveryNode leader() {
CheckScheduler checkScheduler = currentChecker.get();
return checkScheduler == null ? null : checkScheduler.leader;
}
/**
* Start a leader checker for the given leader. Should only be called after successfully joining this leader.
* Starts and / or stops a leader checker for the given leader. Should only be called after successfully joining this leader.
*
* @param leader the node to be checked as leader
* @return a `Releasable` that can be used to stop this checker.
* @param leader the node to be checked as leader, or null if checks should be disabled
*/
public Releasable startLeaderChecker(final DiscoveryNode leader) {
public void updateLeader(@Nullable final DiscoveryNode leader) {
assert transportService.getLocalNode().equals(leader) == false;
CheckScheduler checkScheduler = new CheckScheduler(leader);
checkScheduler.handleWakeUp();
return checkScheduler;
final CheckScheduler checkScheduler;
if (leader != null) {
checkScheduler = new CheckScheduler(leader);
} else {
checkScheduler = null;
}
CheckScheduler previousChecker = currentChecker.getAndSet(checkScheduler);
if (previousChecker != null) {
previousChecker.close();
}
if (checkScheduler != null) {
checkScheduler.handleWakeUp();
}
}
/**
@ -137,6 +162,15 @@ public class LeaderChecker extends AbstractComponent {
}
}
private void handleDisconnectedNode(DiscoveryNode discoveryNode) {
CheckScheduler checkScheduler = currentChecker.get();
if (checkScheduler != null) {
checkScheduler.handleDisconnectedNode(discoveryNode);
} else {
logger.trace("disconnect event ignored for {}, no check scheduler", discoveryNode);
}
}
private class CheckScheduler implements Releasable {
private final AtomicBoolean isClosed = new AtomicBoolean();
@ -222,7 +256,7 @@ public class LeaderChecker extends AbstractComponent {
});
}
private void leaderFailed() {
void leaderFailed() {
if (isClosed.compareAndSet(false, true)) {
transportService.getThreadPool().generic().execute(onLeaderFailure);
} else {
@ -230,6 +264,12 @@ public class LeaderChecker extends AbstractComponent {
}
}
void handleDisconnectedNode(DiscoveryNode discoveryNode) {
if (discoveryNode.equals(leader)) {
leaderFailed();
}
}
private void scheduleNextWakeUp() {
logger.trace("scheduling next check of {} for [{}] = {}", leader, LEADER_CHECK_INTERVAL_SETTING.getKey(), leaderCheckInterval);
transportService.getThreadPool().schedule(leaderCheckInterval, Names.SAME, new Runnable() {

View File

@ -387,7 +387,29 @@ public class CoordinatorTests extends ESTestCase {
cluster.stabilise(); // would not work if disconnect1 were removed from the configuration
}
public void testLeaderDisconnectionDetectedQuickly() {
public void testLeaderDisconnectionWithDisconnectEventDetectedQuickly() {
final Cluster cluster = new Cluster(randomIntBetween(3, 5));
cluster.runRandomly();
cluster.stabilise();
final ClusterNode originalLeader = cluster.getAnyLeader();
logger.info("--> disconnecting leader {}", originalLeader);
originalLeader.disconnect();
logger.info("--> followers get disconnect event for leader {} ", originalLeader);
cluster.getAllNodesExcept(originalLeader).forEach(cn -> cn.onDisconnectEventFrom(originalLeader));
// turn leader into candidate, which stabilisation asserts at the end
cluster.getAllNodesExcept(originalLeader).forEach(cn -> originalLeader.onDisconnectEventFrom(cn));
cluster.stabilise(DEFAULT_DELAY_VARIABILITY // disconnect is scheduled
// then wait for a new election
+ DEFAULT_ELECTION_DELAY
// wait for the removal to be committed
+ DEFAULT_CLUSTER_STATE_UPDATE_DELAY
// then wait for the followup reconfiguration
+ DEFAULT_CLUSTER_STATE_UPDATE_DELAY);
assertThat(cluster.getAnyLeader().getId(), not(equalTo(originalLeader.getId())));
}
public void testLeaderDisconnectionWithoutDisconnectEventDetectedQuickly() {
final Cluster cluster = new Cluster(randomIntBetween(3, 5));
cluster.runRandomly();
cluster.stabilise();
@ -398,7 +420,6 @@ public class CoordinatorTests extends ESTestCase {
cluster.stabilise(Math.max(
// Each follower may have just sent a leader check, which receives no response
// TODO not necessary if notified of disconnection
defaultMillis(LEADER_CHECK_TIMEOUT_SETTING)
// then wait for the follower to check the leader
+ defaultMillis(LEADER_CHECK_INTERVAL_SETTING)
@ -408,7 +429,6 @@ public class CoordinatorTests extends ESTestCase {
+ DEFAULT_ELECTION_DELAY,
// ALSO the leader may have just sent a follower check, which receives no response
// TODO unnecessary if notified of disconnection
defaultMillis(FOLLOWER_CHECK_TIMEOUT_SETTING)
// wait for the leader to check its followers
+ defaultMillis(FOLLOWER_CHECK_INTERVAL_SETTING)
@ -478,10 +498,27 @@ public class CoordinatorTests extends ESTestCase {
final ClusterNode follower = cluster.getAnyNodeExcept(leader);
logger.info("--> disconnecting follower {}", follower);
follower.disconnect();
logger.info("--> leader {} and follower {} get disconnect event", leader, follower);
leader.onDisconnectEventFrom(follower);
follower.onDisconnectEventFrom(leader); // to turn follower into candidate, which stabilisation asserts at the end
cluster.stabilise(DEFAULT_DELAY_VARIABILITY // disconnect is scheduled
+ DEFAULT_CLUSTER_STATE_UPDATE_DELAY
// then wait for the followup reconfiguration
+ DEFAULT_CLUSTER_STATE_UPDATE_DELAY);
assertThat(cluster.getAnyLeader().getId(), equalTo(leader.getId()));
}
public void testFollowerDisconnectionWithoutDisconnectEventDetectedQuickly() {
final Cluster cluster = new Cluster(randomIntBetween(3, 5));
cluster.runRandomly();
cluster.stabilise();
final ClusterNode leader = cluster.getAnyLeader();
final ClusterNode follower = cluster.getAnyNodeExcept(leader);
logger.info("--> disconnecting follower {}", follower);
follower.disconnect();
cluster.stabilise(Math.max(
// the leader may have just sent a follower check, which receives no response
// TODO unnecessary if notified of disconnection
defaultMillis(FOLLOWER_CHECK_TIMEOUT_SETTING)
// wait for the leader to check the follower
+ defaultMillis(FOLLOWER_CHECK_INTERVAL_SETTING)
@ -493,7 +530,6 @@ public class CoordinatorTests extends ESTestCase {
+ DEFAULT_CLUSTER_STATE_UPDATE_DELAY,
// ALSO the follower may have just sent a leader check, which receives no response
// TODO not necessary if notified of disconnection
defaultMillis(LEADER_CHECK_TIMEOUT_SETTING)
// then wait for the follower to check the leader
+ defaultMillis(LEADER_CHECK_INTERVAL_SETTING)
@ -627,25 +663,26 @@ public class CoordinatorTests extends ESTestCase {
}
public void testAckListenerReceivesNacksIfLeaderStandsDown() {
// TODO: needs support for handling disconnects
// final Cluster cluster = new Cluster(3);
// cluster.runRandomly();
// cluster.stabilise();
// final ClusterNode leader = cluster.getAnyLeader();
// final ClusterNode follower0 = cluster.getAnyNodeExcept(leader);
// final ClusterNode follower1 = cluster.getAnyNodeExcept(leader, follower0);
//
// leader.partition();
// follower0.coordinator.handleDisconnectedNode(leader.localNode);
// follower1.coordinator.handleDisconnectedNode(leader.localNode);
// cluster.runUntil(cluster.getCurrentTimeMillis() + cluster.DEFAULT_ELECTION_TIME);
// AckCollector ackCollector = leader.submitRandomValue();
// cluster.runUntil(cluster.currentTimeMillis + Cluster.DEFAULT_DELAY_VARIABILITY);
// leader.connectionStatus = ConnectionStatus.CONNECTED;
// cluster.stabilise(cluster.DEFAULT_STABILISATION_TIME, 0L);
// assertTrue("expected nack from " + leader, ackCollector.hasAckedUnsuccessfully(leader));
// assertTrue("expected nack from " + follower0, ackCollector.hasAckedUnsuccessfully(follower0));
// assertTrue("expected nack from " + follower1, ackCollector.hasAckedUnsuccessfully(follower1));
final Cluster cluster = new Cluster(3);
cluster.runRandomly();
cluster.stabilise();
final ClusterNode leader = cluster.getAnyLeader();
final ClusterNode follower0 = cluster.getAnyNodeExcept(leader);
final ClusterNode follower1 = cluster.getAnyNodeExcept(leader, follower0);
leader.blackhole();
follower0.onDisconnectEventFrom(leader);
follower1.onDisconnectEventFrom(leader);
// let followers elect a leader among themselves before healing the leader and running the publication
cluster.runFor(DEFAULT_DELAY_VARIABILITY // disconnect is scheduled
+ DEFAULT_ELECTION_DELAY, "elect new leader");
// cluster has two nodes in mode LEADER, in different terms ofc, and the one in the lower term wont be able to publish anything
leader.heal();
AckCollector ackCollector = leader.submitValue(randomLong());
cluster.stabilise(); // TODO: check if can find a better bound here
assertTrue("expected nack from " + leader, ackCollector.hasAckedUnsuccessfully(leader));
assertTrue("expected nack from " + follower0, ackCollector.hasAckedUnsuccessfully(follower0));
assertTrue("expected nack from " + follower1, ackCollector.hasAckedUnsuccessfully(follower1));
}
public void testAckListenerReceivesNacksFromFollowerInHigherTerm() {
@ -1124,11 +1161,16 @@ public class CoordinatorTests extends ESTestCase {
}
ClusterNode getAnyNodeExcept(ClusterNode... clusterNodes) {
List<ClusterNode> filteredNodes = getAllNodesExcept(clusterNodes);
assert filteredNodes.isEmpty() == false;
return randomFrom(filteredNodes);
}
List<ClusterNode> getAllNodesExcept(ClusterNode... clusterNodes) {
Set<String> forbiddenIds = Arrays.stream(clusterNodes).map(ClusterNode::getId).collect(Collectors.toSet());
List<ClusterNode> acceptableNodes
= this.clusterNodes.stream().filter(n -> forbiddenIds.contains(n.getId()) == false).collect(Collectors.toList());
assert acceptableNodes.isEmpty() == false;
return randomFrom(acceptableNodes);
return acceptableNodes;
}
ClusterNode getAnyNodePreferringLeaders() {
@ -1367,6 +1409,10 @@ public class CoordinatorTests extends ESTestCase {
return blackholed;
}
void onDisconnectEventFrom(ClusterNode clusterNode) {
transportService.disconnectFromNode(clusterNode.localNode);
}
ClusterState getLastAppliedClusterState() {
return clusterApplier.lastAppliedClusterState;
}

View File

@ -20,6 +20,7 @@ package org.elasticsearch.cluster.coordination;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.coordination.Coordinator.Mode;
import org.elasticsearch.cluster.coordination.FollowersChecker.FollowerCheckRequest;
import org.elasticsearch.cluster.node.DiscoveryNode;
@ -51,6 +52,7 @@ import static org.elasticsearch.cluster.coordination.FollowersChecker.FOLLOWER_C
import static org.elasticsearch.cluster.coordination.FollowersChecker.FOLLOWER_CHECK_RETRY_COUNT_SETTING;
import static org.elasticsearch.cluster.coordination.FollowersChecker.FOLLOWER_CHECK_TIMEOUT_SETTING;
import static org.elasticsearch.node.Node.NODE_NAME_SETTING;
import static org.elasticsearch.transport.TransportService.HANDSHAKE_ACTION_NAME;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.empty;
@ -93,7 +95,7 @@ public class FollowersCheckerTests extends ESTestCase {
final FollowersChecker followersChecker = new FollowersChecker(settings, transportService, fcr -> {
assert false : fcr;
}, node -> {
}, (node, reason) -> {
assert false : node;
});
@ -163,6 +165,7 @@ public class FollowersCheckerTests extends ESTestCase {
final Settings settings = settingsBuilder.build();
testBehaviourOfFailingNode(settings, () -> null,
"followers check retry count exceeded",
(FOLLOWER_CHECK_RETRY_COUNT_SETTING.get(settings) - 1) * FOLLOWER_CHECK_INTERVAL_SETTING.get(settings).millis()
+ FOLLOWER_CHECK_RETRY_COUNT_SETTING.get(settings) * FOLLOWER_CHECK_TIMEOUT_SETTING.get(settings).millis());
}
@ -180,6 +183,7 @@ public class FollowersCheckerTests extends ESTestCase {
testBehaviourOfFailingNode(settings, () -> {
throw new ElasticsearchException("simulated exception");
},
"followers check retry count exceeded",
(FOLLOWER_CHECK_RETRY_COUNT_SETTING.get(settings) - 1) * FOLLOWER_CHECK_INTERVAL_SETTING.get(settings).millis());
}
@ -211,17 +215,71 @@ public class FollowersCheckerTests extends ESTestCase {
throw new ElasticsearchException("simulated exception");
}
},
"followers check retry count exceeded",
(FOLLOWER_CHECK_RETRY_COUNT_SETTING.get(settings) * (maxRecoveries + 1) - 1)
* FOLLOWER_CHECK_INTERVAL_SETTING.get(settings).millis());
}
public void testFailsNodeThatDisconnects() {
public void testFailsNodeThatIsDisconnected() {
testBehaviourOfFailingNode(Settings.EMPTY, () -> {
throw new ConnectTransportException(null, "simulated exception");
}, 0);
}, "disconnected", 0);
}
private void testBehaviourOfFailingNode(Settings testSettings, Supplier<TransportResponse.Empty> responder, long expectedFailureTime) {
public void testFailsNodeThatDisconnects() {
final DiscoveryNode localNode = new DiscoveryNode("local-node", buildNewFakeTransportAddress(), Version.CURRENT);
final DiscoveryNode otherNode = new DiscoveryNode("other-node", buildNewFakeTransportAddress(), Version.CURRENT);
final Settings settings = Settings.builder().put(NODE_NAME_SETTING.getKey(), localNode.getName()).build();
final DeterministicTaskQueue deterministicTaskQueue = new DeterministicTaskQueue(settings, random());
final MockTransport mockTransport = new MockTransport() {
@Override
protected void onSendRequest(long requestId, String action, TransportRequest request, DiscoveryNode node) {
assertFalse(node.equals(localNode));
if (action.equals(HANDSHAKE_ACTION_NAME)) {
handleResponse(requestId, new TransportService.HandshakeResponse(node, ClusterName.DEFAULT, Version.CURRENT));
return;
}
deterministicTaskQueue.scheduleNow(new Runnable() {
@Override
public void run() {
handleResponse(requestId, Empty.INSTANCE);
}
@Override
public String toString() {
return "sending response to [" + action + "][" + requestId + "] from " + node;
}
});
}
};
final TransportService transportService = mockTransport.createTransportService(settings, deterministicTaskQueue.getThreadPool(),
TransportService.NOOP_TRANSPORT_INTERCEPTOR, boundTransportAddress -> localNode, null, emptySet());
transportService.start();
transportService.acceptIncomingRequests();
final AtomicBoolean nodeFailed = new AtomicBoolean();
final FollowersChecker followersChecker = new FollowersChecker(settings, transportService, fcr -> {
assert false : fcr;
}, (node, reason) -> {
assertTrue(nodeFailed.compareAndSet(false, true));
assertThat(reason, equalTo("disconnected"));
});
DiscoveryNodes discoveryNodes = DiscoveryNodes.builder().add(localNode).add(otherNode).localNodeId(localNode.getId()).build();
followersChecker.setCurrentNodes(discoveryNodes);
transportService.connectToNode(otherNode);
transportService.disconnectFromNode(otherNode);
deterministicTaskQueue.runAllRunnableTasks();
assertTrue(nodeFailed.get());
assertThat(followersChecker.getFaultyNodes(), contains(otherNode));
}
private void testBehaviourOfFailingNode(Settings testSettings, Supplier<TransportResponse.Empty> responder, String failureReason,
long expectedFailureTime) {
final DiscoveryNode localNode = new DiscoveryNode("local-node", buildNewFakeTransportAddress(), Version.CURRENT);
final DiscoveryNode otherNode = new DiscoveryNode("other-node", buildNewFakeTransportAddress(), Version.CURRENT);
final Settings settings = Settings.builder().put(NODE_NAME_SETTING.getKey(), localNode.getName()).put(testSettings).build();
@ -266,8 +324,9 @@ public class FollowersCheckerTests extends ESTestCase {
final FollowersChecker followersChecker = new FollowersChecker(settings, transportService, fcr -> {
assert false : fcr;
}, node -> {
}, (node, reason) -> {
assertTrue(nodeFailed.compareAndSet(false, true));
assertThat(reason, equalTo(failureReason));
});
DiscoveryNodes discoveryNodes = DiscoveryNodes.builder().add(localNode).add(otherNode).localNodeId(localNode.getId()).build();
@ -357,7 +416,7 @@ public class FollowersCheckerTests extends ESTestCase {
if (exception != null) {
throw exception;
}
}, node -> {
}, (node, reason) -> {
assert false : node;
});

View File

@ -21,10 +21,10 @@ package org.elasticsearch.cluster.coordination;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.coordination.LeaderChecker.LeaderCheckRequest;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.EqualsHashCodeTestUtils;
@ -47,6 +47,7 @@ import static org.elasticsearch.cluster.coordination.LeaderChecker.LEADER_CHECK_
import static org.elasticsearch.cluster.coordination.LeaderChecker.LEADER_CHECK_RETRY_COUNT_SETTING;
import static org.elasticsearch.cluster.coordination.LeaderChecker.LEADER_CHECK_TIMEOUT_SETTING;
import static org.elasticsearch.node.Node.NODE_NAME_SETTING;
import static org.elasticsearch.transport.TransportService.HANDSHAKE_ACTION_NAME;
import static org.elasticsearch.transport.TransportService.NOOP_TRANSPORT_INTERCEPTOR;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.instanceOf;
@ -145,7 +146,8 @@ public class LeaderCheckerTests extends ESTestCase {
() -> assertTrue(leaderFailed.compareAndSet(false, true)));
logger.info("--> creating first checker");
try (Releasable ignored = leaderChecker.startLeaderChecker(leader1)) {
leaderChecker.updateLeader(leader1);
{
final long maxCheckCount = randomLongBetween(2, 1000);
logger.info("--> checking that no failure is detected in {} checks", maxCheckCount);
while (checkCount.get() < maxCheckCount) {
@ -153,13 +155,15 @@ public class LeaderCheckerTests extends ESTestCase {
deterministicTaskQueue.advanceTime();
}
}
leaderChecker.updateLeader(null);
logger.info("--> running remaining tasks");
deterministicTaskQueue.runAllTasks();
assertFalse(leaderFailed.get());
logger.info("--> creating second checker");
try (Releasable ignored = leaderChecker.startLeaderChecker(leader2)) {
leaderChecker.updateLeader(leader2);
{
checkCount.set(0);
final long maxCheckCount = randomLongBetween(2, 1000);
logger.info("--> checking again that no failure is detected in {} checks", maxCheckCount);
@ -184,6 +188,7 @@ public class LeaderCheckerTests extends ESTestCase {
+ leaderCheckTimeoutMillis // needed because a successful check response might be in flight at the time of failure
));
}
leaderChecker.updateLeader(null);
}
enum Response {
@ -201,6 +206,10 @@ public class LeaderCheckerTests extends ESTestCase {
final MockTransport mockTransport = new MockTransport() {
@Override
protected void onSendRequest(long requestId, String action, TransportRequest request, DiscoveryNode node) {
if (action.equals(HANDSHAKE_ACTION_NAME)) {
handleResponse(requestId, new TransportService.HandshakeResponse(node, ClusterName.DEFAULT, Version.CURRENT));
return;
}
assertThat(action, equalTo(LEADER_CHECK_ACTION_NAME));
assertTrue(node.equals(leader));
final Response response = responseHolder[0];
@ -237,7 +246,8 @@ public class LeaderCheckerTests extends ESTestCase {
final LeaderChecker leaderChecker = new LeaderChecker(settings, transportService,
() -> assertTrue(leaderFailed.compareAndSet(false, true)));
try (Releasable ignored = leaderChecker.startLeaderChecker(leader)) {
leaderChecker.updateLeader(leader);
{
while (deterministicTaskQueue.getCurrentTimeMillis() < 10 * LEADER_CHECK_INTERVAL_SETTING.get(Settings.EMPTY).millis()) {
deterministicTaskQueue.runAllRunnableTasks();
deterministicTaskQueue.advanceTime();
@ -253,12 +263,14 @@ public class LeaderCheckerTests extends ESTestCase {
assertTrue(leaderFailed.get());
}
leaderChecker.updateLeader(null);
deterministicTaskQueue.runAllTasks();
leaderFailed.set(false);
responseHolder[0] = Response.SUCCESS;
try (Releasable ignored = leaderChecker.startLeaderChecker(leader)) {
leaderChecker.updateLeader(leader);
{
while (deterministicTaskQueue.getCurrentTimeMillis() < 10 * LEADER_CHECK_INTERVAL_SETTING.get(Settings.EMPTY).millis()) {
deterministicTaskQueue.runAllRunnableTasks();
deterministicTaskQueue.advanceTime();
@ -274,6 +286,19 @@ public class LeaderCheckerTests extends ESTestCase {
assertTrue(leaderFailed.get());
}
deterministicTaskQueue.runAllTasks();
leaderFailed.set(false);
responseHolder[0] = Response.SUCCESS;
leaderChecker.updateLeader(leader);
{
transportService.connectToNode(leader); // need to connect first for disconnect to have any effect
transportService.disconnectFromNode(leader);
deterministicTaskQueue.runAllRunnableTasks();
assertTrue(leaderFailed.get());
}
}
public void testLeaderBehaviour() {

View File

@ -19,7 +19,6 @@
package org.elasticsearch.test.transport;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Randomness;
@ -34,6 +33,7 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.BoundTransportAddress;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.CloseableConnection;
import org.elasticsearch.transport.ConnectionManager;
import org.elasticsearch.transport.ConnectionProfile;
import org.elasticsearch.transport.RemoteTransportException;
@ -158,7 +158,7 @@ public class MockTransport implements Transport, LifecycleComponent {
@Override
public Connection openConnection(DiscoveryNode node, ConnectionProfile profile) {
return new Connection() {
return new CloseableConnection() {
@Override
public DiscoveryNode getNode() {
return node;
@ -170,19 +170,6 @@ public class MockTransport implements Transport, LifecycleComponent {
requests.put(requestId, Tuple.tuple(node, action));
onSendRequest(requestId, action, request, node);
}
@Override
public void addCloseListener(ActionListener<Void> listener) {
}
@Override
public boolean isClosed() {
return false;
}
@Override
public void close() {
}
};
}