[Zen2] Introduce LeaderChecker (#33024)

It is important that follower nodes periodically check that their leader is
still healthy and that they remain part of its cluster. If these checks fail
repeatedly then followers should attempt to find and join a new leader,
possibly electing one in the process. The LeaderChecker, introduced in this
commit, performs these periodic checks and deals with retries.
This commit is contained in:
David Turner 2018-09-20 20:05:55 +01:00 committed by GitHub
parent 0b4a6ae97c
commit 187f787f52
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 686 additions and 8 deletions

View File

@ -0,0 +1,282 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.cluster.coordination;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool.Names;
import org.elasticsearch.transport.ConnectTransportException;
import org.elasticsearch.transport.TransportChannel;
import org.elasticsearch.transport.TransportException;
import org.elasticsearch.transport.TransportRequest;
import org.elasticsearch.transport.TransportRequestOptions;
import org.elasticsearch.transport.TransportRequestOptions.Type;
import org.elasticsearch.transport.TransportResponse;
import org.elasticsearch.transport.TransportResponse.Empty;
import org.elasticsearch.transport.TransportResponseHandler;
import org.elasticsearch.transport.TransportService;
import java.io.IOException;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
/**
* The LeaderChecker is responsible for allowing followers to check that the currently elected leader is still connected and healthy. We are
* fairly lenient, possibly allowing multiple checks to fail before considering the leader to be faulty, to allow for the leader to
* temporarily stand down on occasion, e.g. if it needs to move to a higher term. On deciding that the leader has failed a follower will
* become a candidate and attempt to become a leader itself.
*/
public class LeaderChecker extends AbstractComponent {
public static final String LEADER_CHECK_ACTION_NAME = "internal:coordination/fault_detection/leader_check";
// the time between checks sent to the leader
public static final Setting<TimeValue> LEADER_CHECK_INTERVAL_SETTING =
Setting.timeSetting("cluster.fault_detection.leader_check.interval",
TimeValue.timeValueMillis(10000), TimeValue.timeValueMillis(100), Setting.Property.NodeScope);
// the timeout for each check sent to the leader
public static final Setting<TimeValue> LEADER_CHECK_TIMEOUT_SETTING =
Setting.timeSetting("cluster.fault_detection.leader_check.timeout",
TimeValue.timeValueMillis(10000), TimeValue.timeValueMillis(1), Setting.Property.NodeScope);
// the number of failed checks that must happen before the leader is considered to have failed.
public static final Setting<Integer> LEADER_CHECK_RETRY_COUNT_SETTING =
Setting.intSetting("cluster.fault_detection.leader_check.retry_count", 3, 1, Setting.Property.NodeScope);
private final TimeValue leaderCheckInterval;
private final TimeValue leaderCheckTimeout;
private final int leaderCheckRetryCount;
private final TransportService transportService;
private final Runnable onLeaderFailure;
private volatile DiscoveryNodes lastPublishedDiscoveryNodes;
public LeaderChecker(final Settings settings, final TransportService transportService, final Runnable onLeaderFailure) {
super(settings);
leaderCheckInterval = LEADER_CHECK_INTERVAL_SETTING.get(settings);
leaderCheckTimeout = LEADER_CHECK_TIMEOUT_SETTING.get(settings);
leaderCheckRetryCount = LEADER_CHECK_RETRY_COUNT_SETTING.get(settings);
this.transportService = transportService;
this.onLeaderFailure = onLeaderFailure;
transportService.registerRequestHandler(LEADER_CHECK_ACTION_NAME, Names.SAME, LeaderCheckRequest::new, this::handleLeaderCheck);
}
/**
* Start a leader checker for the given leader. Should only be called after successfully joining this leader.
*
* @param leader the node to be checked as leader
* @return a `Releasable` that can be used to stop this checker.
*/
public Releasable startLeaderChecker(final DiscoveryNode leader) {
assert transportService.getLocalNode().equals(leader) == false;
CheckScheduler checkScheduler = new CheckScheduler(leader);
checkScheduler.handleWakeUp();
return checkScheduler;
}
/**
* Update the "known" discovery nodes. Should be called on the leader before a new cluster state is published to reflect the new
* publication targets, and also called if a leader becomes a non-leader.
* TODO if heartbeats can make nodes become followers then this needs to be called before a heartbeat is sent to a new node too.
*
* isLocalNodeElectedMaster() should reflect whether this node is a leader, and nodeExists()
* should indicate whether nodes are known publication targets or not.
*/
public void setLastPublishedDiscoveryNodes(DiscoveryNodes discoveryNodes) {
logger.trace("updating last-published nodes: {}", discoveryNodes);
lastPublishedDiscoveryNodes = discoveryNodes;
}
private void handleLeaderCheck(LeaderCheckRequest request, TransportChannel transportChannel, Task task) throws IOException {
final DiscoveryNodes lastPublishedDiscoveryNodes = this.lastPublishedDiscoveryNodes;
assert lastPublishedDiscoveryNodes != null;
if (lastPublishedDiscoveryNodes.isLocalNodeElectedMaster() == false) {
logger.debug("non-master handling {}", request);
transportChannel.sendResponse(new CoordinationStateRejectedException("non-leader rejecting leader check"));
} else if (lastPublishedDiscoveryNodes.nodeExists(request.getSender()) == false) {
logger.debug("leader check from unknown node: {}", request);
transportChannel.sendResponse(new CoordinationStateRejectedException("leader check from unknown node"));
} else {
logger.trace("handling {}", request);
transportChannel.sendResponse(Empty.INSTANCE);
}
}
private class CheckScheduler implements Releasable {
private final AtomicBoolean isClosed = new AtomicBoolean();
private final AtomicLong failureCountSinceLastSuccess = new AtomicLong();
private final DiscoveryNode leader;
CheckScheduler(final DiscoveryNode leader) {
this.leader = leader;
}
@Override
public void close() {
if (isClosed.compareAndSet(false, true) == false) {
logger.debug("already closed");
} else {
logger.debug("closed");
}
}
void handleWakeUp() {
if (isClosed.get()) {
logger.debug("closed check scheduler woken up, doing nothing");
return;
}
logger.trace("checking {} with [{}] = {}", leader, LEADER_CHECK_TIMEOUT_SETTING.getKey(), leaderCheckTimeout);
// TODO lag detection:
// In the PoC, the leader sent its current version to the follower in the response to a LeaderCheck, so the follower
// could detect if it was lagging. We'd prefer this to be implemented on the leader, so the response is just
// TransportResponse.Empty here.
transportService.sendRequest(leader, LEADER_CHECK_ACTION_NAME, new LeaderCheckRequest(transportService.getLocalNode()),
TransportRequestOptions.builder().withTimeout(leaderCheckTimeout).withType(Type.PING).build(),
new TransportResponseHandler<TransportResponse.Empty>() {
@Override
public void handleResponse(Empty response) {
if (isClosed.get()) {
logger.debug("closed check scheduler received a response, doing nothing");
return;
}
failureCountSinceLastSuccess.set(0);
scheduleNextWakeUp(); // logs trace message indicating success
}
@Override
public void handleException(TransportException exp) {
if (isClosed.get()) {
logger.debug("closed check scheduler received a response, doing nothing");
return;
}
if (exp instanceof ConnectTransportException || exp.getCause() instanceof ConnectTransportException) {
logger.debug(new ParameterizedMessage("leader [{}] disconnected, failing immediately", leader), exp);
leaderFailed();
return;
}
long failureCount = failureCountSinceLastSuccess.incrementAndGet();
if (failureCount >= leaderCheckRetryCount) {
logger.debug(new ParameterizedMessage("{} consecutive failures (limit [{}] is {}) so leader [{}] has failed",
failureCount, LEADER_CHECK_RETRY_COUNT_SETTING.getKey(), leaderCheckRetryCount, leader), exp);
leaderFailed();
return;
}
logger.debug(new ParameterizedMessage("{} consecutive failures (limit [{}] is {}) with leader [{}]",
failureCount, LEADER_CHECK_RETRY_COUNT_SETTING.getKey(), leaderCheckRetryCount, leader), exp);
scheduleNextWakeUp();
}
@Override
public String executor() {
return Names.SAME;
}
});
}
private void leaderFailed() {
if (isClosed.compareAndSet(false, true)) {
transportService.getThreadPool().generic().execute(onLeaderFailure);
} else {
logger.debug("already closed, not failing leader");
}
}
private void scheduleNextWakeUp() {
logger.trace("scheduling next check of {} for [{}] = {}", leader, LEADER_CHECK_INTERVAL_SETTING.getKey(), leaderCheckInterval);
transportService.getThreadPool().schedule(leaderCheckInterval, Names.SAME, new Runnable() {
@Override
public void run() {
handleWakeUp();
}
@Override
public String toString() {
return "scheduled check of leader " + leader;
}
});
}
}
public static class LeaderCheckRequest extends TransportRequest {
private final DiscoveryNode sender;
public LeaderCheckRequest(final DiscoveryNode sender) {
this.sender = sender;
}
public LeaderCheckRequest(final StreamInput in) throws IOException {
super(in);
sender = new DiscoveryNode(in);
}
@Override
public void writeTo(final StreamOutput out) throws IOException {
super.writeTo(out);
sender.writeTo(out);
}
public DiscoveryNode getSender() {
return sender;
}
@Override
public boolean equals(final Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
final LeaderCheckRequest that = (LeaderCheckRequest) o;
return Objects.equals(sender, that.sender);
}
@Override
public int hashCode() {
return Objects.hash(sender);
}
@Override
public String toString() {
return "LeaderCheckRequest{" +
"sender=" + sender +
'}';
}
}
}

View File

@ -951,7 +951,7 @@ public class TransportService extends AbstractLifecycleComponent implements Tran
assert responseHandlers.contains(requestId) == false;
TimeoutInfoHolder timeoutInfoHolder = timeoutInfoHandlers.remove(requestId);
if (timeoutInfoHolder != null) {
long time = System.currentTimeMillis();
long time = threadPool.relativeTimeInMillis();
logger.warn("Received response for a request that has timed out, sent [{}ms] ago, timed out [{}ms] ago, " +
"action [{}], node [{}], id [{}]", time - timeoutInfoHolder.sentTime(), time - timeoutInfoHolder.timeoutTime(),
timeoutInfoHolder.action(), timeoutInfoHolder.node(), requestId);
@ -1014,7 +1014,7 @@ public class TransportService extends AbstractLifecycleComponent implements Tran
final class TimeoutHandler implements Runnable {
private final long requestId;
private final long sentTime = System.currentTimeMillis();
private final long sentTime = threadPool.relativeTimeInMillis();
private final String action;
private final DiscoveryNode node;
volatile ScheduledFuture future;
@ -1028,7 +1028,7 @@ public class TransportService extends AbstractLifecycleComponent implements Tran
@Override
public void run() {
if (responseHandlers.contains(requestId)) {
long timeoutTime = System.currentTimeMillis();
long timeoutTime = threadPool.relativeTimeInMillis();
timeoutInfoHandlers.put(requestId, new TimeoutInfoHolder(node, action, sentTime, timeoutTime));
// now that we have the information visible via timeoutInfoHandlers, we try to remove the request id
final Transport.ResponseContext holder = responseHandlers.remove(requestId);

View File

@ -0,0 +1,362 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.cluster.coordination;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.coordination.LeaderChecker.LeaderCheckRequest;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.EqualsHashCodeTestUtils;
import org.elasticsearch.test.transport.CapturingTransport;
import org.elasticsearch.test.transport.MockTransport;
import org.elasticsearch.threadpool.ThreadPool.Names;
import org.elasticsearch.transport.ConnectTransportException;
import org.elasticsearch.transport.TransportException;
import org.elasticsearch.transport.TransportRequest;
import org.elasticsearch.transport.TransportResponse.Empty;
import org.elasticsearch.transport.TransportResponseHandler;
import org.elasticsearch.transport.TransportService;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import static java.util.Collections.emptySet;
import static org.elasticsearch.cluster.coordination.LeaderChecker.LEADER_CHECK_ACTION_NAME;
import static org.elasticsearch.cluster.coordination.LeaderChecker.LEADER_CHECK_INTERVAL_SETTING;
import static org.elasticsearch.cluster.coordination.LeaderChecker.LEADER_CHECK_RETRY_COUNT_SETTING;
import static org.elasticsearch.cluster.coordination.LeaderChecker.LEADER_CHECK_TIMEOUT_SETTING;
import static org.elasticsearch.node.Node.NODE_NAME_SETTING;
import static org.elasticsearch.transport.TransportService.NOOP_TRANSPORT_INTERCEPTOR;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.lessThanOrEqualTo;
import static org.hamcrest.Matchers.nullValue;
public class LeaderCheckerTests extends ESTestCase {
public void testFollowerBehaviour() {
final DiscoveryNode leader1 = new DiscoveryNode("leader-1", buildNewFakeTransportAddress(), Version.CURRENT);
final DiscoveryNode leader2
= randomBoolean() ? leader1 : new DiscoveryNode("leader-2", buildNewFakeTransportAddress(), Version.CURRENT);
final DiscoveryNode localNode = new DiscoveryNode("local-node", buildNewFakeTransportAddress(), Version.CURRENT);
Settings.Builder settingsBuilder = Settings.builder().put(NODE_NAME_SETTING.getKey(), localNode.getId());
final long leaderCheckIntervalMillis;
if (randomBoolean()) {
leaderCheckIntervalMillis = randomLongBetween(1000, 60000);
settingsBuilder.put(LEADER_CHECK_INTERVAL_SETTING.getKey(), leaderCheckIntervalMillis + "ms");
} else {
leaderCheckIntervalMillis = LEADER_CHECK_INTERVAL_SETTING.get(Settings.EMPTY).millis();
}
final long leaderCheckTimeoutMillis;
if (randomBoolean()) {
leaderCheckTimeoutMillis = randomLongBetween(1, 60000);
settingsBuilder.put(LEADER_CHECK_TIMEOUT_SETTING.getKey(), leaderCheckTimeoutMillis + "ms");
} else {
leaderCheckTimeoutMillis = LEADER_CHECK_TIMEOUT_SETTING.get(Settings.EMPTY).millis();
}
final int leaderCheckRetryCount;
if (randomBoolean()) {
leaderCheckRetryCount = randomIntBetween(1, 10);
settingsBuilder.put(LEADER_CHECK_RETRY_COUNT_SETTING.getKey(), leaderCheckRetryCount);
} else {
leaderCheckRetryCount = LEADER_CHECK_RETRY_COUNT_SETTING.get(Settings.EMPTY);
}
final AtomicLong checkCount = new AtomicLong();
final AtomicBoolean allResponsesFail = new AtomicBoolean();
final Settings settings = settingsBuilder.build();
logger.info("--> using {}", settings);
final DeterministicTaskQueue deterministicTaskQueue = new DeterministicTaskQueue(settings);
final MockTransport mockTransport = new MockTransport() {
int consecutiveFailedRequestsCount;
@Override
protected void onSendRequest(long requestId, String action, TransportRequest request, DiscoveryNode node) {
assertThat(action, equalTo(LEADER_CHECK_ACTION_NAME));
assertTrue(node.equals(leader1) || node.equals(leader2));
super.onSendRequest(requestId, action, request, node);
final boolean mustSucceed = leaderCheckRetryCount - 1 <= consecutiveFailedRequestsCount;
final long responseDelay = randomLongBetween(0, leaderCheckTimeoutMillis + (mustSucceed ? -1 : 60000));
final boolean successResponse = allResponsesFail.get() == false && (mustSucceed || randomBoolean());
if (responseDelay >= leaderCheckTimeoutMillis || successResponse == false) {
consecutiveFailedRequestsCount += 1;
} else {
consecutiveFailedRequestsCount = 0;
}
checkCount.incrementAndGet();
deterministicTaskQueue.scheduleAt(deterministicTaskQueue.getCurrentTimeMillis() + responseDelay, new Runnable() {
@Override
public void run() {
if (successResponse) {
handleResponse(requestId, Empty.INSTANCE);
} else {
handleRemoteError(requestId, new ElasticsearchException("simulated error"));
}
}
@Override
public String toString() {
return (successResponse ? "successful" : "unsuccessful") + " response to request " + requestId;
}
});
}
};
final TransportService transportService = mockTransport.createTransportService(settings,
deterministicTaskQueue.getThreadPool(), NOOP_TRANSPORT_INTERCEPTOR, boundTransportAddress -> localNode, null, emptySet());
transportService.start();
transportService.acceptIncomingRequests();
final AtomicBoolean leaderFailed = new AtomicBoolean();
final LeaderChecker leaderChecker = new LeaderChecker(settings, transportService,
() -> assertTrue(leaderFailed.compareAndSet(false, true)));
logger.info("--> creating first checker");
try (Releasable ignored = leaderChecker.startLeaderChecker(leader1)) {
final long maxCheckCount = randomLongBetween(2, 1000);
logger.info("--> checking that no failure is detected in {} checks", maxCheckCount);
while (checkCount.get() < maxCheckCount) {
deterministicTaskQueue.runAllRunnableTasks(random());
deterministicTaskQueue.advanceTime();
}
}
logger.info("--> running remaining tasks");
deterministicTaskQueue.runAllTasks(random());
assertFalse(leaderFailed.get());
logger.info("--> creating second checker");
try (Releasable ignored = leaderChecker.startLeaderChecker(leader2)) {
checkCount.set(0);
final long maxCheckCount = randomLongBetween(2, 1000);
logger.info("--> checking again that no failure is detected in {} checks", maxCheckCount);
while (checkCount.get() < maxCheckCount) {
deterministicTaskQueue.runAllRunnableTasks(random());
deterministicTaskQueue.advanceTime();
}
deterministicTaskQueue.runAllRunnableTasks(random());
final long failureTime = deterministicTaskQueue.getCurrentTimeMillis();
allResponsesFail.set(true);
logger.info("--> failing at {}ms", failureTime);
while (leaderFailed.get() == false) {
deterministicTaskQueue.advanceTime();
deterministicTaskQueue.runAllRunnableTasks(random());
}
assertThat(deterministicTaskQueue.getCurrentTimeMillis() - failureTime,
lessThanOrEqualTo((leaderCheckIntervalMillis + leaderCheckTimeoutMillis) * leaderCheckRetryCount
+ leaderCheckTimeoutMillis // needed because a successful check response might be in flight at the time of failure
));
}
}
enum Response {
SUCCESS, REMOTE_ERROR, DIRECT_ERROR
}
public void testFollowerFailsImmediatelyOnDisconnection() {
final DiscoveryNode localNode = new DiscoveryNode("local-node", buildNewFakeTransportAddress(), Version.CURRENT);
final DiscoveryNode leader = new DiscoveryNode("leader", buildNewFakeTransportAddress(), Version.CURRENT);
final Response[] responseHolder = new Response[]{Response.SUCCESS};
final Settings settings = Settings.builder().put(NODE_NAME_SETTING.getKey(), localNode.getId()).build();
final DeterministicTaskQueue deterministicTaskQueue = new DeterministicTaskQueue(settings);
final MockTransport mockTransport = new MockTransport() {
@Override
protected void onSendRequest(long requestId, String action, TransportRequest request, DiscoveryNode node) {
assertThat(action, equalTo(LEADER_CHECK_ACTION_NAME));
assertTrue(node.equals(leader));
final Response response = responseHolder[0];
deterministicTaskQueue.scheduleNow(new Runnable() {
@Override
public void run() {
switch (response) {
case SUCCESS:
handleResponse(requestId, Empty.INSTANCE);
break;
case REMOTE_ERROR:
handleRemoteError(requestId, new ConnectTransportException(leader, "simulated error"));
break;
case DIRECT_ERROR:
handleError(requestId, new ConnectTransportException(leader, "simulated error"));
}
}
@Override
public String toString() {
return response + " response to request " + requestId;
}
});
}
};
final TransportService transportService = mockTransport.createTransportService(settings,
deterministicTaskQueue.getThreadPool(), NOOP_TRANSPORT_INTERCEPTOR, boundTransportAddress -> localNode, null, emptySet());
transportService.start();
transportService.acceptIncomingRequests();
final AtomicBoolean leaderFailed = new AtomicBoolean();
final LeaderChecker leaderChecker = new LeaderChecker(settings, transportService,
() -> assertTrue(leaderFailed.compareAndSet(false, true)));
try (Releasable ignored = leaderChecker.startLeaderChecker(leader)) {
while (deterministicTaskQueue.getCurrentTimeMillis() < 10 * LEADER_CHECK_INTERVAL_SETTING.get(Settings.EMPTY).millis()) {
deterministicTaskQueue.runAllRunnableTasks(random());
deterministicTaskQueue.advanceTime();
}
deterministicTaskQueue.runAllRunnableTasks(random());
assertFalse(leaderFailed.get());
responseHolder[0] = Response.REMOTE_ERROR;
deterministicTaskQueue.advanceTime();
deterministicTaskQueue.runAllRunnableTasks(random());
assertTrue(leaderFailed.get());
}
deterministicTaskQueue.runAllTasks(random());
leaderFailed.set(false);
responseHolder[0] = Response.SUCCESS;
try (Releasable ignored = leaderChecker.startLeaderChecker(leader)) {
while (deterministicTaskQueue.getCurrentTimeMillis() < 10 * LEADER_CHECK_INTERVAL_SETTING.get(Settings.EMPTY).millis()) {
deterministicTaskQueue.runAllRunnableTasks(random());
deterministicTaskQueue.advanceTime();
}
deterministicTaskQueue.runAllRunnableTasks(random());
assertFalse(leaderFailed.get());
responseHolder[0] = Response.DIRECT_ERROR;
deterministicTaskQueue.advanceTime();
deterministicTaskQueue.runAllRunnableTasks(random());
assertTrue(leaderFailed.get());
}
}
public void testLeaderBehaviour() {
final DiscoveryNode localNode = new DiscoveryNode("local-node", buildNewFakeTransportAddress(), Version.CURRENT);
final DiscoveryNode otherNode = new DiscoveryNode("other-node", buildNewFakeTransportAddress(), Version.CURRENT);
final Settings settings = Settings.builder().put(NODE_NAME_SETTING.getKey(), localNode.getId()).build();
final DeterministicTaskQueue deterministicTaskQueue = new DeterministicTaskQueue(settings);
final CapturingTransport capturingTransport = new CapturingTransport();
final TransportService transportService = capturingTransport.createTransportService(settings,
deterministicTaskQueue.getThreadPool(), NOOP_TRANSPORT_INTERCEPTOR, boundTransportAddress -> localNode, null, emptySet());
transportService.start();
transportService.acceptIncomingRequests();
final LeaderChecker leaderChecker = new LeaderChecker(settings, transportService, () -> fail("shouldn't be checking anything"));
final DiscoveryNodes discoveryNodes
= DiscoveryNodes.builder().add(localNode).localNodeId(localNode.getId()).masterNodeId(localNode.getId()).build();
{
leaderChecker.setLastPublishedDiscoveryNodes(discoveryNodes);
final CapturingTransportResponseHandler handler = new CapturingTransportResponseHandler();
transportService.sendRequest(localNode, LEADER_CHECK_ACTION_NAME, new LeaderCheckRequest(otherNode), handler);
deterministicTaskQueue.runAllTasks(random());
assertFalse(handler.successfulResponseReceived);
assertThat(handler.transportException.getRootCause(), instanceOf(CoordinationStateRejectedException.class));
CoordinationStateRejectedException cause = (CoordinationStateRejectedException) handler.transportException.getRootCause();
assertThat(cause.getMessage(), equalTo("leader check from unknown node"));
}
{
leaderChecker.setLastPublishedDiscoveryNodes(DiscoveryNodes.builder(discoveryNodes).add(otherNode).build());
final CapturingTransportResponseHandler handler = new CapturingTransportResponseHandler();
transportService.sendRequest(localNode, LEADER_CHECK_ACTION_NAME, new LeaderCheckRequest(otherNode), handler);
deterministicTaskQueue.runAllTasks(random());
assertTrue(handler.successfulResponseReceived);
assertThat(handler.transportException, nullValue());
}
{
leaderChecker.setLastPublishedDiscoveryNodes(DiscoveryNodes.builder(discoveryNodes).add(otherNode).masterNodeId(null).build());
final CapturingTransportResponseHandler handler = new CapturingTransportResponseHandler();
transportService.sendRequest(localNode, LEADER_CHECK_ACTION_NAME, new LeaderCheckRequest(otherNode), handler);
deterministicTaskQueue.runAllTasks(random());
assertFalse(handler.successfulResponseReceived);
assertThat(handler.transportException.getRootCause(), instanceOf(CoordinationStateRejectedException.class));
CoordinationStateRejectedException cause = (CoordinationStateRejectedException) handler.transportException.getRootCause();
assertThat(cause.getMessage(), equalTo("non-leader rejecting leader check"));
}
}
private class CapturingTransportResponseHandler implements TransportResponseHandler<Empty> {
TransportException transportException;
boolean successfulResponseReceived;
@Override
public void handleResponse(Empty response) {
successfulResponseReceived = true;
}
@Override
public void handleException(TransportException exp) {
transportException = exp;
}
@Override
public String executor() {
return Names.GENERIC;
}
}
public void testLeaderCheckRequestEqualsHashcodeSerialization() {
LeaderCheckRequest request = new LeaderCheckRequest(
new DiscoveryNode(randomAlphaOfLength(10), buildNewFakeTransportAddress(), Version.CURRENT));
EqualsHashCodeTestUtils.checkEqualsAndHashCode(request,
rq -> copyWriteable(rq, writableRegistry(), LeaderCheckRequest::new),
rq -> new LeaderCheckRequest(new DiscoveryNode(randomAlphaOfLength(10), buildNewFakeTransportAddress(), Version.CURRENT)));
}
}

View File

@ -39,6 +39,7 @@ import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
public class DeterministicTaskQueue extends AbstractComponent {
@ -312,7 +313,25 @@ public class DeterministicTaskQueue extends AbstractComponent {
@Override
public ScheduledFuture<?> schedule(TimeValue delay, String executor, Runnable command) {
scheduleAt(currentTimeMillis + delay.millis(), command);
final int NOT_STARTED = 0;
final int STARTED = 1;
final int CANCELLED = 2;
final AtomicInteger taskState = new AtomicInteger(NOT_STARTED);
scheduleAt(currentTimeMillis + delay.millis(), new Runnable() {
@Override
public void run() {
if (taskState.compareAndSet(NOT_STARTED, STARTED)) {
command.run();
}
}
@Override
public String toString() {
return command.toString();
}
});
return new ScheduledFuture<Object>() {
@Override
public long getDelay(TimeUnit unit) {
@ -326,12 +345,13 @@ public class DeterministicTaskQueue extends AbstractComponent {
@Override
public boolean cancel(boolean mayInterruptIfRunning) {
return false;
assert mayInterruptIfRunning == false;
return taskState.compareAndSet(NOT_STARTED, CANCELLED);
}
@Override
public boolean isCancelled() {
throw new UnsupportedOperationException();
return taskState.get() == CANCELLED;
}
@Override

View File

@ -47,6 +47,7 @@ import org.elasticsearch.transport.TransportMessageListener;
import org.elasticsearch.transport.TransportRequest;
import org.elasticsearch.transport.TransportRequestOptions;
import org.elasticsearch.transport.TransportResponse;
import org.elasticsearch.transport.TransportResponseHandler;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.transport.TransportStats;
@ -88,7 +89,10 @@ public class MockTransport implements Transport, LifecycleComponent {
*/
@SuppressWarnings("unchecked")
public void handleResponse(final long requestId, final TransportResponse response) {
responseHandlers.onResponseReceived(requestId, listener).handleResponse(response);
final TransportResponseHandler transportResponseHandler = responseHandlers.onResponseReceived(requestId, listener);
if (transportResponseHandler != null) {
transportResponseHandler.handleResponse(response);
}
}
/**
@ -140,7 +144,10 @@ public class MockTransport implements Transport, LifecycleComponent {
* @param e the failure
*/
public void handleError(final long requestId, final TransportException e) {
responseHandlers.onResponseReceived(requestId, listener).handleException(e);
final TransportResponseHandler transportResponseHandler = responseHandlers.onResponseReceived(requestId, listener);
if (transportResponseHandler != null) {
transportResponseHandler.handleException(e);
}
}
@Override

View File

@ -28,6 +28,7 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledFuture;
import static org.elasticsearch.node.Node.NODE_NAME_SETTING;
import static org.elasticsearch.threadpool.ThreadPool.Names.GENERIC;
@ -299,6 +300,12 @@ public class DeterministicTaskQueueTests extends ESTestCase {
taskQueue.runAllTasks();
assertThat(taskQueue.getCurrentTimeMillis(), is(startTime + delayMillis + delayMillis1));
final TimeValue cancelledDelay = TimeValue.timeValueMillis(randomLongBetween(1, 100));
final ScheduledFuture<?> future = threadPool.schedule(cancelledDelay, "", () -> strings.add("cancelled before execution"));
future.cancel(false);
taskQueue.runAllTasks(random());
assertThat(strings, contains("runnable", "also runnable", "deferred", "not quite so deferred", "further deferred"));
}