Introduce FollowersChecker (#33917)

It is important that the leader periodically checks that its followers are
still healthy and can remain part of its cluster. If these checks fail
repeatedly then the leader should remove the faulty node from the cluster. The
FollowerChecker, introduced in this commit, performs these periodic checks and
deals with retries.
This commit is contained in:
David Turner 2018-09-22 11:34:16 +01:00 committed by GitHub
parent a612dd1272
commit 1761b6c85c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 868 additions and 2 deletions

View File

@ -0,0 +1,387 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.cluster.coordination;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.cluster.coordination.Coordinator.Mode;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.threadpool.ThreadPool.Names;
import org.elasticsearch.transport.ConnectTransportException;
import org.elasticsearch.transport.TransportChannel;
import org.elasticsearch.transport.TransportException;
import org.elasticsearch.transport.TransportRequest;
import org.elasticsearch.transport.TransportRequestOptions;
import org.elasticsearch.transport.TransportRequestOptions.Type;
import org.elasticsearch.transport.TransportResponse.Empty;
import org.elasticsearch.transport.TransportResponseHandler;
import org.elasticsearch.transport.TransportService;
import java.io.IOException;
import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.function.Consumer;
import java.util.function.Predicate;
import static org.elasticsearch.common.util.concurrent.ConcurrentCollections.newConcurrentMap;
/**
* The FollowersChecker is responsible for allowing a leader to check that its followers are still connected and healthy. On deciding that a
* follower has failed the leader will remove it from the cluster. We are fairly lenient, possibly allowing multiple checks to fail before
* considering a follower to be faulty, to allow for a brief network partition or a long GC cycle to occur without triggering the removal of
* a node and the consequent shard reallocation.
*/
public class FollowersChecker extends AbstractComponent {
public static final String FOLLOWER_CHECK_ACTION_NAME = "internal:coordination/fault_detection/follower_check";
// the time between checks sent to each node
public static final Setting<TimeValue> FOLLOWER_CHECK_INTERVAL_SETTING =
Setting.timeSetting("cluster.fault_detection.follower_check.interval",
TimeValue.timeValueMillis(1000), TimeValue.timeValueMillis(100), Setting.Property.NodeScope);
// the timeout for each check sent to each node
public static final Setting<TimeValue> FOLLOWER_CHECK_TIMEOUT_SETTING =
Setting.timeSetting("cluster.fault_detection.follower_check.timeout",
TimeValue.timeValueMillis(30000), TimeValue.timeValueMillis(1), Setting.Property.NodeScope);
// the number of failed checks that must happen before the follower is considered to have failed.
public static final Setting<Integer> FOLLOWER_CHECK_RETRY_COUNT_SETTING =
Setting.intSetting("cluster.fault_detection.follower_check.retry_count", 3, 1, Setting.Property.NodeScope);
private final TimeValue followerCheckInterval;
private final TimeValue followerCheckTimeout;
private final int followerCheckRetryCount;
private final Consumer<DiscoveryNode> onNodeFailure;
private final Consumer<FollowerCheckRequest> handleRequestAndUpdateState;
private final Object mutex = new Object(); // protects writes to this state; read access does not need sync
private final Map<DiscoveryNode, FollowerChecker> followerCheckers = newConcurrentMap();
private final Set<DiscoveryNode> faultyNodes = new HashSet<>();
private final TransportService transportService;
private volatile FastResponseState fastResponseState;
public FollowersChecker(Settings settings, TransportService transportService,
Consumer<FollowerCheckRequest> handleRequestAndUpdateState,
Consumer<DiscoveryNode> onNodeFailure) {
super(settings);
this.transportService = transportService;
this.handleRequestAndUpdateState = handleRequestAndUpdateState;
this.onNodeFailure = onNodeFailure;
followerCheckInterval = FOLLOWER_CHECK_INTERVAL_SETTING.get(settings);
followerCheckTimeout = FOLLOWER_CHECK_TIMEOUT_SETTING.get(settings);
followerCheckRetryCount = FOLLOWER_CHECK_RETRY_COUNT_SETTING.get(settings);
updateFastResponseState(0, Mode.CANDIDATE);
transportService.registerRequestHandler(FOLLOWER_CHECK_ACTION_NAME, Names.SAME, FollowerCheckRequest::new,
(request, transportChannel, task) -> handleFollowerCheck(request, transportChannel));
}
/**
* Update the set of known nodes, starting to check any new ones and stopping checking any previously-known-but-now-unknown ones.
*/
public void setCurrentNodes(DiscoveryNodes discoveryNodes) {
synchronized (mutex) {
final Predicate<DiscoveryNode> isUnknownNode = n -> discoveryNodes.nodeExists(n) == false;
followerCheckers.keySet().removeIf(isUnknownNode);
faultyNodes.removeIf(isUnknownNode);
for (final DiscoveryNode discoveryNode : discoveryNodes) {
if (discoveryNode.equals(discoveryNodes.getLocalNode()) == false
&& followerCheckers.containsKey(discoveryNode) == false
&& faultyNodes.contains(discoveryNode) == false) {
final FollowerChecker followerChecker = new FollowerChecker(discoveryNode);
followerCheckers.put(discoveryNode, followerChecker);
followerChecker.start();
}
}
}
}
/**
* The system is normally in a state in which every follower remains a follower of a stable leader in a single term for an extended
* period of time, and therefore our response to every follower check is the same. We handle this case with a single volatile read
* entirely on the network thread, and only if the fast path fails do we perform some work in the background, by notifying the
* FollowersChecker whenever our term or mode changes here.
*/
public void updateFastResponseState(final long term, final Mode mode) {
fastResponseState = new FastResponseState(term, mode);
}
private void handleFollowerCheck(FollowerCheckRequest request, TransportChannel transportChannel) throws IOException {
FastResponseState responder = this.fastResponseState;
if (responder.mode == Mode.FOLLOWER && responder.term == request.term) {
// TODO trigger a term bump if we voted for a different leader in this term
logger.trace("responding to {} on fast path", request);
transportChannel.sendResponse(Empty.INSTANCE);
return;
}
if (request.term < responder.term) {
throw new CoordinationStateRejectedException("rejecting " + request + " since local state is " + this);
}
transportService.getThreadPool().generic().execute(new AbstractRunnable() {
@Override
protected void doRun() throws IOException {
logger.trace("responding to {} on slow path", request);
try {
handleRequestAndUpdateState.accept(request);
} catch (Exception e) {
transportChannel.sendResponse(e);
return;
}
transportChannel.sendResponse(Empty.INSTANCE);
}
@Override
public void onFailure(Exception e) {
logger.debug(new ParameterizedMessage("exception while responding to {}", request), e);
}
@Override
public String toString() {
return "slow path response to " + request;
}
});
}
// TODO in the PoC a faulty node was considered non-faulty again if it sent us a PeersRequest:
// - node disconnects, detected faulty, removal is enqueued
// - node reconnects, pings us, finds we are master, requests to join, all before removal is applied
// - join is processed before removal, but we do not publish to known-faulty nodes so the joining node does not receive this publication
// - it doesn't start its leader checker since it receives nothing to cause it to become a follower
// Apparently this meant that it remained a candidate for too long, leading to a test failure. At the time this logic was added, we did
// not have gossip-based discovery which would (I think) have retried this joining process a short time later. It's therefore possible
// that this is no longer required, so it's omitted here until we can be sure if it's necessary or not.
/**
* @return nodes in the current cluster state which have failed their follower checks.
*/
public Set<DiscoveryNode> getFaultyNodes() {
synchronized (mutex) {
return new HashSet<>(this.faultyNodes);
}
}
@Override
public String toString() {
return "FollowersChecker{" +
"followerCheckInterval=" + followerCheckInterval +
", followerCheckTimeout=" + followerCheckTimeout +
", followerCheckRetryCount=" + followerCheckRetryCount +
", followerCheckers=" + followerCheckers +
", faultyNodes=" + faultyNodes +
", fastResponseState=" + fastResponseState +
'}';
}
static class FastResponseState {
final long term;
final Mode mode;
FastResponseState(final long term, final Mode mode) {
this.term = term;
this.mode = mode;
}
@Override
public String toString() {
return "FastResponseState{" +
"term=" + term +
", mode=" + mode +
'}';
}
}
/**
* A checker for an individual follower.
*/
private class FollowerChecker {
private final DiscoveryNode discoveryNode;
private int failureCountSinceLastSuccess;
FollowerChecker(DiscoveryNode discoveryNode) {
this.discoveryNode = discoveryNode;
}
private boolean running() {
return this == followerCheckers.get(discoveryNode);
}
void start() {
assert running();
handleWakeUp();
}
private void handleWakeUp() {
if (running() == false) {
logger.trace("handleWakeUp: not running");
return;
}
final FollowerCheckRequest request = new FollowerCheckRequest(fastResponseState.term);
logger.trace("handleWakeUp: checking {} with {}", discoveryNode, request);
transportService.sendRequest(discoveryNode, FOLLOWER_CHECK_ACTION_NAME, request,
TransportRequestOptions.builder().withTimeout(followerCheckTimeout).withType(Type.PING).build(),
new TransportResponseHandler<Empty>() {
@Override
public void handleResponse(Empty response) {
if (running() == false) {
logger.trace("{} no longer running", FollowerChecker.this);
return;
}
failureCountSinceLastSuccess = 0;
logger.trace("{} check successful", FollowerChecker.this);
scheduleNextWakeUp();
}
@Override
public void handleException(TransportException exp) {
if (running() == false) {
logger.debug(new ParameterizedMessage("{} no longer running", FollowerChecker.this), exp);
return;
}
failureCountSinceLastSuccess++;
if (failureCountSinceLastSuccess >= followerCheckRetryCount) {
logger.debug(() -> new ParameterizedMessage("{} failed too many times", FollowerChecker.this), exp);
} else if (exp instanceof ConnectTransportException
|| exp.getCause() instanceof ConnectTransportException) {
logger.debug(() -> new ParameterizedMessage("{} disconnected", FollowerChecker.this), exp);
} else {
logger.debug(() -> new ParameterizedMessage("{} failed, retrying", FollowerChecker.this), exp);
scheduleNextWakeUp();
return;
}
transportService.getThreadPool().generic().execute(new Runnable() {
@Override
public void run() {
synchronized (mutex) {
if (running() == false) {
logger.debug("{} no longer running, not marking faulty", FollowerChecker.this);
return;
}
faultyNodes.add(discoveryNode);
followerCheckers.remove(discoveryNode);
}
onNodeFailure.accept(discoveryNode);
}
@Override
public String toString() {
return "detected failure of " + discoveryNode;
}
});
}
@Override
public String executor() {
return Names.SAME;
}
});
}
private void scheduleNextWakeUp() {
transportService.getThreadPool().schedule(followerCheckInterval, Names.SAME, new Runnable() {
@Override
public void run() {
handleWakeUp();
}
@Override
public String toString() {
return FollowerChecker.this + "::handleWakeUp";
}
});
}
@Override
public String toString() {
return "FollowerChecker{" +
"discoveryNode=" + discoveryNode +
", failureCountSinceLastSuccess=" + failureCountSinceLastSuccess +
", [" + FOLLOWER_CHECK_RETRY_COUNT_SETTING.getKey() + "]=" + followerCheckRetryCount +
'}';
}
}
public static class FollowerCheckRequest extends TransportRequest {
private final long term;
public long getTerm() {
return term;
}
public FollowerCheckRequest(final long term) {
this.term = term;
}
public FollowerCheckRequest(final StreamInput in) throws IOException {
super(in);
term = in.readLong();
}
@Override
public void writeTo(final StreamOutput out) throws IOException {
super.writeTo(out);
out.writeLong(term);
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
FollowerCheckRequest that = (FollowerCheckRequest) o;
return term == that.term;
}
@Override
public String toString() {
return "FollowerCheckRequest{" +
"term=" + term +
'}';
}
@Override
public int hashCode() {
return Objects.hash(term);
}
}
}

View File

@ -60,12 +60,12 @@ public class LeaderChecker extends AbstractComponent {
// the time between checks sent to the leader
public static final Setting<TimeValue> LEADER_CHECK_INTERVAL_SETTING =
Setting.timeSetting("cluster.fault_detection.leader_check.interval",
TimeValue.timeValueMillis(10000), TimeValue.timeValueMillis(100), Setting.Property.NodeScope);
TimeValue.timeValueMillis(1000), TimeValue.timeValueMillis(100), Setting.Property.NodeScope);
// the timeout for each check sent to the leader
public static final Setting<TimeValue> LEADER_CHECK_TIMEOUT_SETTING =
Setting.timeSetting("cluster.fault_detection.leader_check.timeout",
TimeValue.timeValueMillis(10000), TimeValue.timeValueMillis(1), Setting.Property.NodeScope);
TimeValue.timeValueMillis(30000), TimeValue.timeValueMillis(1), Setting.Property.NodeScope);
// the number of failed checks that must happen before the leader is considered to have failed.
public static final Setting<Integer> LEADER_CHECK_RETRY_COUNT_SETTING =

View File

@ -0,0 +1,479 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.cluster.coordination;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.coordination.Coordinator.Mode;
import org.elasticsearch.cluster.coordination.FollowersChecker.FollowerCheckRequest;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.settings.Settings.Builder;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.EqualsHashCodeTestUtils;
import org.elasticsearch.test.transport.MockTransport;
import org.elasticsearch.threadpool.ThreadPool.Names;
import org.elasticsearch.transport.ConnectTransportException;
import org.elasticsearch.transport.TransportException;
import org.elasticsearch.transport.TransportRequest;
import org.elasticsearch.transport.TransportResponse;
import org.elasticsearch.transport.TransportResponse.Empty;
import org.elasticsearch.transport.TransportResponseHandler;
import org.elasticsearch.transport.TransportService;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import static java.util.Collections.emptySet;
import static org.elasticsearch.cluster.coordination.FollowersChecker.FOLLOWER_CHECK_ACTION_NAME;
import static org.elasticsearch.cluster.coordination.FollowersChecker.FOLLOWER_CHECK_INTERVAL_SETTING;
import static org.elasticsearch.cluster.coordination.FollowersChecker.FOLLOWER_CHECK_RETRY_COUNT_SETTING;
import static org.elasticsearch.cluster.coordination.FollowersChecker.FOLLOWER_CHECK_TIMEOUT_SETTING;
import static org.elasticsearch.node.Node.NODE_NAME_SETTING;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.nullValue;
import static org.hamcrest.core.IsInstanceOf.instanceOf;
public class FollowersCheckerTests extends ESTestCase {
public void testChecksExpectedNodes() {
final DiscoveryNode localNode = new DiscoveryNode("local-node", buildNewFakeTransportAddress(), Version.CURRENT);
final Settings settings = Settings.builder().put(NODE_NAME_SETTING.getKey(), localNode.getName()).build();
final DiscoveryNodes[] discoveryNodesHolder
= new DiscoveryNodes[]{DiscoveryNodes.builder().add(localNode).localNodeId(localNode.getId()).build()};
final DeterministicTaskQueue deterministicTaskQueue = new DeterministicTaskQueue(settings);
final Set<DiscoveryNode> checkedNodes = new HashSet<>();
final AtomicInteger checkCount = new AtomicInteger();
final MockTransport mockTransport = new MockTransport() {
@Override
protected void onSendRequest(long requestId, String action, TransportRequest request, DiscoveryNode node) {
assertThat(action, equalTo(FOLLOWER_CHECK_ACTION_NAME));
assertThat(request, instanceOf(FollowerCheckRequest.class));
assertTrue(discoveryNodesHolder[0].nodeExists(node));
assertThat(node, not(equalTo(localNode)));
checkedNodes.add(node);
checkCount.incrementAndGet();
handleResponse(requestId, Empty.INSTANCE);
}
};
final TransportService transportService = mockTransport.createTransportService(settings, deterministicTaskQueue.getThreadPool(),
TransportService.NOOP_TRANSPORT_INTERCEPTOR, boundTransportAddress -> localNode, null, emptySet());
transportService.start();
transportService.acceptIncomingRequests();
final FollowersChecker followersChecker = new FollowersChecker(settings, transportService, fcr -> {
assert false : fcr;
}, node -> {
assert false : node;
});
followersChecker.setCurrentNodes(discoveryNodesHolder[0]);
deterministicTaskQueue.runAllTasks(random());
assertThat(checkedNodes, empty());
assertThat(followersChecker.getFaultyNodes(), empty());
final DiscoveryNode otherNode1 = new DiscoveryNode("other-node-1", buildNewFakeTransportAddress(), Version.CURRENT);
followersChecker.setCurrentNodes(discoveryNodesHolder[0] = DiscoveryNodes.builder(discoveryNodesHolder[0]).add(otherNode1).build());
while (checkCount.get() < 10) {
if (deterministicTaskQueue.hasRunnableTasks()) {
deterministicTaskQueue.runRandomTask(random());
} else {
deterministicTaskQueue.advanceTime();
}
}
assertThat(checkedNodes, contains(otherNode1));
assertThat(followersChecker.getFaultyNodes(), empty());
checkedNodes.clear();
checkCount.set(0);
final DiscoveryNode otherNode2 = new DiscoveryNode("other-node-2", buildNewFakeTransportAddress(), Version.CURRENT);
followersChecker.setCurrentNodes(discoveryNodesHolder[0] = DiscoveryNodes.builder(discoveryNodesHolder[0]).add(otherNode2).build());
while (checkCount.get() < 10) {
if (deterministicTaskQueue.hasRunnableTasks()) {
deterministicTaskQueue.runRandomTask(random());
} else {
deterministicTaskQueue.advanceTime();
}
}
assertThat(checkedNodes, containsInAnyOrder(otherNode1, otherNode2));
assertThat(followersChecker.getFaultyNodes(), empty());
checkedNodes.clear();
checkCount.set(0);
followersChecker.setCurrentNodes(discoveryNodesHolder[0]
= DiscoveryNodes.builder(discoveryNodesHolder[0]).remove(otherNode1).build());
while (checkCount.get() < 10) {
if (deterministicTaskQueue.hasRunnableTasks()) {
deterministicTaskQueue.runRandomTask(random());
} else {
deterministicTaskQueue.advanceTime();
}
}
assertThat(checkedNodes, contains(otherNode2));
assertThat(followersChecker.getFaultyNodes(), empty());
checkedNodes.clear();
followersChecker.setCurrentNodes(discoveryNodesHolder[0] = DiscoveryNodes.EMPTY_NODES);
deterministicTaskQueue.runAllTasks(random());
assertThat(checkedNodes, empty());
}
public void testFailsNodeThatDoesNotRespond() {
final Builder settingsBuilder = Settings.builder();
if (randomBoolean()) {
settingsBuilder.put(FOLLOWER_CHECK_RETRY_COUNT_SETTING.getKey(), randomIntBetween(1, 10));
}
if (randomBoolean()) {
settingsBuilder.put(FOLLOWER_CHECK_INTERVAL_SETTING.getKey(), randomIntBetween(100, 100000) + "ms");
}
if (randomBoolean()) {
settingsBuilder.put(FOLLOWER_CHECK_TIMEOUT_SETTING.getKey(), randomIntBetween(1, 100000) + "ms");
}
final Settings settings = settingsBuilder.build();
testBehaviourOfFailingNode(settings, () -> null,
(FOLLOWER_CHECK_RETRY_COUNT_SETTING.get(settings) - 1) * FOLLOWER_CHECK_INTERVAL_SETTING.get(settings).millis()
+ FOLLOWER_CHECK_RETRY_COUNT_SETTING.get(settings) * FOLLOWER_CHECK_TIMEOUT_SETTING.get(settings).millis());
}
public void testFailsNodeThatRejectsCheck() {
final Builder settingsBuilder = Settings.builder();
if (randomBoolean()) {
settingsBuilder.put(FOLLOWER_CHECK_RETRY_COUNT_SETTING.getKey(), randomIntBetween(1, 10));
}
if (randomBoolean()) {
settingsBuilder.put(FOLLOWER_CHECK_INTERVAL_SETTING.getKey(), randomIntBetween(100, 100000) + "ms");
}
final Settings settings = settingsBuilder.build();
testBehaviourOfFailingNode(settings, () -> {
throw new ElasticsearchException("simulated exception");
},
(FOLLOWER_CHECK_RETRY_COUNT_SETTING.get(settings) - 1) * FOLLOWER_CHECK_INTERVAL_SETTING.get(settings).millis());
}
public void testFailureCounterResetsOnSuccess() {
final Builder settingsBuilder = Settings.builder();
if (randomBoolean()) {
settingsBuilder.put(FOLLOWER_CHECK_RETRY_COUNT_SETTING.getKey(), randomIntBetween(2, 10));
}
if (randomBoolean()) {
settingsBuilder.put(FOLLOWER_CHECK_INTERVAL_SETTING.getKey(), randomIntBetween(100, 100000) + "ms");
}
final Settings settings = settingsBuilder.build();
final int retryCount = FOLLOWER_CHECK_RETRY_COUNT_SETTING.get(settings);
final int maxRecoveries = randomIntBetween(3, 10);
// passes just enough checks to keep it alive, up to maxRecoveries, and then fails completely
testBehaviourOfFailingNode(settings, new Supplier<Empty>() {
private int checkIndex;
private int recoveries;
@Override
public Empty get() {
checkIndex++;
if (checkIndex % retryCount == 0 && recoveries < maxRecoveries) {
recoveries++;
return Empty.INSTANCE;
}
throw new ElasticsearchException("simulated exception");
}
},
(FOLLOWER_CHECK_RETRY_COUNT_SETTING.get(settings) * (maxRecoveries + 1) - 1)
* FOLLOWER_CHECK_INTERVAL_SETTING.get(settings).millis());
}
public void testFailsNodeThatDisconnects() {
testBehaviourOfFailingNode(Settings.EMPTY, () -> {
throw new ConnectTransportException(null, "simulated exception");
}, 0);
}
private void testBehaviourOfFailingNode(Settings testSettings, Supplier<TransportResponse.Empty> responder, long expectedFailureTime) {
final DiscoveryNode localNode = new DiscoveryNode("local-node", buildNewFakeTransportAddress(), Version.CURRENT);
final DiscoveryNode otherNode = new DiscoveryNode("other-node", buildNewFakeTransportAddress(), Version.CURRENT);
final Settings settings = Settings.builder().put(NODE_NAME_SETTING.getKey(), localNode.getName()).put(testSettings).build();
final DeterministicTaskQueue deterministicTaskQueue = new DeterministicTaskQueue(settings);
final MockTransport mockTransport = new MockTransport() {
@Override
protected void onSendRequest(long requestId, String action, TransportRequest request, DiscoveryNode node) {
assertFalse(node.equals(localNode));
deterministicTaskQueue.scheduleNow(new Runnable() {
@Override
public void run() {
if (node.equals(otherNode) == false) {
// other nodes are ok
handleResponse(requestId, Empty.INSTANCE);
return;
}
try {
final Empty response = responder.get();
if (response != null) {
handleResponse(requestId, response);
}
} catch (Exception e) {
handleRemoteError(requestId, e);
}
}
@Override
public String toString() {
return "sending response to [" + action + "][" + requestId + "] from " + node;
}
});
}
};
final TransportService transportService = mockTransport.createTransportService(settings, deterministicTaskQueue.getThreadPool(),
TransportService.NOOP_TRANSPORT_INTERCEPTOR, boundTransportAddress -> localNode, null, emptySet());
transportService.start();
transportService.acceptIncomingRequests();
final AtomicBoolean nodeFailed = new AtomicBoolean();
final FollowersChecker followersChecker = new FollowersChecker(settings, transportService, fcr -> {
assert false : fcr;
}, node -> {
assertTrue(nodeFailed.compareAndSet(false, true));
});
DiscoveryNodes discoveryNodes = DiscoveryNodes.builder().add(localNode).add(otherNode).localNodeId(localNode.getId()).build();
followersChecker.setCurrentNodes(discoveryNodes);
while (nodeFailed.get() == false) {
if (deterministicTaskQueue.hasRunnableTasks() == false) {
deterministicTaskQueue.advanceTime();
}
deterministicTaskQueue.runAllRunnableTasks(random());
}
assertThat(deterministicTaskQueue.getCurrentTimeMillis(), equalTo(expectedFailureTime));
assertThat(followersChecker.getFaultyNodes(), contains(otherNode));
deterministicTaskQueue.runAllTasks(random()); // checks it does not continue checking a failed node
// add another node and see that it schedules checks for this new node but keeps on considering the old one faulty
final DiscoveryNode otherNode2 = new DiscoveryNode("other-node-2", buildNewFakeTransportAddress(), Version.CURRENT);
discoveryNodes = DiscoveryNodes.builder(discoveryNodes).add(otherNode2).build();
followersChecker.setCurrentNodes(discoveryNodes);
deterministicTaskQueue.runAllRunnableTasks(random());
deterministicTaskQueue.advanceTime();
deterministicTaskQueue.runAllRunnableTasks(random());
assertThat(followersChecker.getFaultyNodes(), contains(otherNode));
// remove the faulty node and see that it is removed
discoveryNodes = DiscoveryNodes.builder(discoveryNodes).remove(otherNode).build();
followersChecker.setCurrentNodes(discoveryNodes);
assertThat(followersChecker.getFaultyNodes(), empty());
deterministicTaskQueue.runAllRunnableTasks(random());
deterministicTaskQueue.advanceTime();
deterministicTaskQueue.runAllRunnableTasks(random());
// remove the working node and see that everything eventually stops
discoveryNodes = DiscoveryNodes.builder(discoveryNodes).remove(otherNode2).build();
followersChecker.setCurrentNodes(discoveryNodes);
deterministicTaskQueue.runAllTasks(random());
// add back the faulty node afresh and see that it fails again
discoveryNodes = DiscoveryNodes.builder(discoveryNodes).add(otherNode).build();
followersChecker.setCurrentNodes(discoveryNodes);
nodeFailed.set(false);
assertThat(followersChecker.getFaultyNodes(), empty());
deterministicTaskQueue.runAllTasks(random());
assertTrue(nodeFailed.get());
assertThat(followersChecker.getFaultyNodes(), contains(otherNode));
}
public void testFollowerCheckRequestEqualsHashCodeSerialization() {
EqualsHashCodeTestUtils.checkEqualsAndHashCode(new FollowerCheckRequest(randomNonNegativeLong()),
rq -> copyWriteable(rq, writableRegistry(), FollowerCheckRequest::new),
rq -> new FollowerCheckRequest(randomNonNegativeLong()));
}
public void testResponder() {
final DiscoveryNode follower = new DiscoveryNode("follower", buildNewFakeTransportAddress(), Version.CURRENT);
final Settings settings = Settings.builder().put(NODE_NAME_SETTING.getKey(), follower.getName()).build();
final DeterministicTaskQueue deterministicTaskQueue = new DeterministicTaskQueue(settings);
final MockTransport mockTransport = new MockTransport() {
@Override
protected void onSendRequest(long requestId, String action, TransportRequest request, DiscoveryNode node) {
throw new AssertionError("no requests expected");
}
};
final TransportService transportService = mockTransport.createTransportService(settings, deterministicTaskQueue.getThreadPool(),
TransportService.NOOP_TRANSPORT_INTERCEPTOR, boundTransportAddress -> follower, null, emptySet());
transportService.start();
transportService.acceptIncomingRequests();
final AtomicBoolean calledCoordinator = new AtomicBoolean();
final AtomicReference<RuntimeException> coordinatorException = new AtomicReference<>();
final FollowersChecker followersChecker = new FollowersChecker(settings, transportService,
fcr -> {
assertTrue(calledCoordinator.compareAndSet(false, true));
final RuntimeException exception = coordinatorException.get();
if (exception != null) {
throw exception;
}
}, node -> {
assert false : node;
});
{
// Does not call into the coordinator in the normal case
final long term = randomNonNegativeLong();
followersChecker.updateFastResponseState(term, Mode.FOLLOWER);
final ExpectsSuccess expectsSuccess = new ExpectsSuccess();
transportService.sendRequest(follower, FOLLOWER_CHECK_ACTION_NAME, new FollowerCheckRequest(term), expectsSuccess);
deterministicTaskQueue.runAllTasks(random());
assertTrue(expectsSuccess.succeeded());
assertFalse(calledCoordinator.get());
}
{
// Does not call into the coordinator for a term that's too low, just rejects immediately
final long leaderTerm = randomLongBetween(1, Long.MAX_VALUE - 1);
final long followerTerm = randomLongBetween(leaderTerm + 1, Long.MAX_VALUE);
followersChecker.updateFastResponseState(followerTerm, Mode.FOLLOWER);
final AtomicReference<TransportException> receivedException = new AtomicReference<>();
transportService.sendRequest(follower, FOLLOWER_CHECK_ACTION_NAME, new FollowerCheckRequest(leaderTerm),
new TransportResponseHandler<TransportResponse.Empty>() {
@Override
public void handleResponse(TransportResponse.Empty response) {
fail("unexpected success");
}
@Override
public void handleException(TransportException exp) {
assertThat(exp, not(nullValue()));
assertTrue(receivedException.compareAndSet(null, exp));
}
@Override
public String executor() {
return Names.SAME;
}
});
deterministicTaskQueue.runAllTasks(random());
assertFalse(calledCoordinator.get());
assertThat(receivedException.get(), not(nullValue()));
}
{
// Calls into the coordinator if the term needs bumping
final long leaderTerm = randomLongBetween(2, Long.MAX_VALUE);
final long followerTerm = randomLongBetween(1, leaderTerm - 1);
followersChecker.updateFastResponseState(followerTerm, Mode.FOLLOWER);
final ExpectsSuccess expectsSuccess = new ExpectsSuccess();
transportService.sendRequest(follower, FOLLOWER_CHECK_ACTION_NAME, new FollowerCheckRequest(leaderTerm), expectsSuccess);
deterministicTaskQueue.runAllTasks(random());
assertTrue(expectsSuccess.succeeded());
assertTrue(calledCoordinator.get());
calledCoordinator.set(false);
}
{
// Calls into the coordinator if not a follower
final long term = randomNonNegativeLong();
followersChecker.updateFastResponseState(term, randomFrom(Mode.LEADER, Mode.CANDIDATE));
final ExpectsSuccess expectsSuccess = new ExpectsSuccess();
transportService.sendRequest(follower, FOLLOWER_CHECK_ACTION_NAME, new FollowerCheckRequest(term), expectsSuccess);
deterministicTaskQueue.runAllTasks(random());
assertTrue(expectsSuccess.succeeded());
assertTrue(calledCoordinator.get());
calledCoordinator.set(false);
}
{
// If it calls into the coordinator and the coordinator throws an exception then it's passed back to the caller
final long term = randomNonNegativeLong();
followersChecker.updateFastResponseState(term, randomFrom(Mode.LEADER, Mode.CANDIDATE));
final String exceptionMessage = "test simulated exception " + randomNonNegativeLong();
coordinatorException.set(new ElasticsearchException(exceptionMessage));
final AtomicReference<TransportException> receivedException = new AtomicReference<>();
transportService.sendRequest(follower, FOLLOWER_CHECK_ACTION_NAME, new FollowerCheckRequest(term),
new TransportResponseHandler<TransportResponse.Empty>() {
@Override
public void handleResponse(TransportResponse.Empty response) {
fail("unexpected success");
}
@Override
public void handleException(TransportException exp) {
assertThat(exp, not(nullValue()));
assertTrue(receivedException.compareAndSet(null, exp));
}
@Override
public String executor() {
return Names.SAME;
}
});
deterministicTaskQueue.runAllTasks(random());
assertTrue(calledCoordinator.get());
assertThat(receivedException.get(), not(nullValue()));
assertThat(receivedException.get().getRootCause().getMessage(), equalTo(exceptionMessage));
}
}
private static class ExpectsSuccess implements TransportResponseHandler<Empty> {
private final AtomicBoolean responseReceived = new AtomicBoolean();
@Override
public void handleResponse(Empty response) {
assertTrue(responseReceived.compareAndSet(false, true));
}
@Override
public void handleException(TransportException exp) {
throw new AssertionError("unexpected", exp);
}
@Override
public String executor() {
return Names.SAME;
}
public boolean succeeded() {
return responseReceived.get();
}
}
}