diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/LeaderChecker.java b/server/src/main/java/org/elasticsearch/cluster/coordination/LeaderChecker.java
new file mode 100644
index 00000000000..b8b61f76f0a
--- /dev/null
+++ b/server/src/main/java/org/elasticsearch/cluster/coordination/LeaderChecker.java
@@ -0,0 +1,299 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.cluster.coordination;
+
+import org.apache.logging.log4j.message.ParameterizedMessage;
+import org.elasticsearch.cluster.node.DiscoveryNode;
+import org.elasticsearch.cluster.node.DiscoveryNodes;
+import org.elasticsearch.common.component.AbstractComponent;
+import org.elasticsearch.common.io.stream.StreamInput;
+import org.elasticsearch.common.io.stream.StreamOutput;
+import org.elasticsearch.common.lease.Releasable;
+import org.elasticsearch.common.settings.Setting;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.unit.TimeValue;
+import org.elasticsearch.tasks.Task;
+import org.elasticsearch.threadpool.ThreadPool.Names;
+import org.elasticsearch.transport.ConnectTransportException;
+import org.elasticsearch.transport.TransportChannel;
+import org.elasticsearch.transport.TransportException;
+import org.elasticsearch.transport.TransportRequest;
+import org.elasticsearch.transport.TransportRequestOptions;
+import org.elasticsearch.transport.TransportRequestOptions.Type;
+import org.elasticsearch.transport.TransportResponse;
+import org.elasticsearch.transport.TransportResponse.Empty;
+import org.elasticsearch.transport.TransportResponseHandler;
+import org.elasticsearch.transport.TransportService;
+
+import java.io.IOException;
+import java.util.Objects;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * The LeaderChecker is responsible for allowing followers to check that the currently elected leader is still connected and healthy. We are
+ * fairly lenient, possibly allowing multiple checks to fail before considering the leader to be faulty, to allow for the leader to
+ * temporarily stand down on occasion, e.g. if it needs to move to a higher term. On deciding that the leader has failed a follower will
+ * become a candidate and attempt to become a leader itself.
+ */
+public class LeaderChecker extends AbstractComponent {
+
+    public static final String LEADER_CHECK_ACTION_NAME = "internal:coordination/fault_detection/leader_check";
+
+    // the time between checks sent to the leader
+    public static final Setting<TimeValue> LEADER_CHECK_INTERVAL_SETTING =
+        Setting.timeSetting("cluster.fault_detection.leader_check.interval",
+            TimeValue.timeValueMillis(10000), TimeValue.timeValueMillis(100), Setting.Property.NodeScope);
+
+    // the timeout for each check sent to the leader
+    public static final Setting<TimeValue> LEADER_CHECK_TIMEOUT_SETTING =
+        Setting.timeSetting("cluster.fault_detection.leader_check.timeout",
+            TimeValue.timeValueMillis(10000), TimeValue.timeValueMillis(1), Setting.Property.NodeScope);
+
+    // the number of failed checks that must happen before the leader is considered to have failed.
+    public static final Setting<Integer> LEADER_CHECK_RETRY_COUNT_SETTING =
+        Setting.intSetting("cluster.fault_detection.leader_check.retry_count", 3, 1, Setting.Property.NodeScope);
+
+    private final TimeValue leaderCheckInterval;
+    private final TimeValue leaderCheckTimeout;
+    private final int leaderCheckRetryCount;
+    private final TransportService transportService;
+    private final Runnable onLeaderFailure;
+
+    private volatile DiscoveryNodes lastPublishedDiscoveryNodes;
+
+    /**
+     * Creates the checker and registers its handler for incoming leader checks on the given transport service.
+     *
+     * @param settings         supplies the leader check interval, timeout and retry count
+     * @param transportService used to send leader checks, to receive them, and to reach the thread pool
+     * @param onLeaderFailure  executed on the generic thread pool when a checked leader is deemed to have failed
+     */
+    public LeaderChecker(final Settings settings, final TransportService transportService, final Runnable onLeaderFailure) {
+        super(settings);
+        leaderCheckInterval = LEADER_CHECK_INTERVAL_SETTING.get(settings);
+        leaderCheckTimeout = LEADER_CHECK_TIMEOUT_SETTING.get(settings);
+        leaderCheckRetryCount = LEADER_CHECK_RETRY_COUNT_SETTING.get(settings);
+        this.transportService = transportService;
+        this.onLeaderFailure = onLeaderFailure;
+
+        transportService.registerRequestHandler(LEADER_CHECK_ACTION_NAME, Names.SAME, LeaderCheckRequest::new, this::handleLeaderCheck);
+    }
+
+    /**
+     * Start a leader checker for the given leader. Should only be called after successfully joining this leader.
+     *
+     * @param leader the node to be checked as leader
+     * @return a `Releasable` that can be used to stop this checker.
+     */
+    public Releasable startLeaderChecker(final DiscoveryNode leader) {
+        assert transportService.getLocalNode().equals(leader) == false;
+        CheckScheduler checkScheduler = new CheckScheduler(leader);
+        checkScheduler.handleWakeUp();
+        return checkScheduler;
+    }
+
+    /**
+     * Update the "known" discovery nodes. Should be called on the leader before a new cluster state is published to reflect the new
+     * publication targets, and also called if a leader becomes a non-leader.
+     * TODO if heartbeats can make nodes become followers then this needs to be called before a heartbeat is sent to a new node too.
+     *
+     * isLocalNodeElectedMaster() should reflect whether this node is a leader, and nodeExists()
+     * should indicate whether nodes are known publication targets or not.
+     */
+    public void setLastPublishedDiscoveryNodes(DiscoveryNodes discoveryNodes) {
+        logger.trace("updating last-published nodes: {}", discoveryNodes);
+        lastPublishedDiscoveryNodes = discoveryNodes;
+    }
+
+    /**
+     * Handles a leader check from a follower. Responds successfully only if the last-published nodes indicate that this node is the
+     * elected master and that the sender is a known node, and otherwise responds with a {@link CoordinationStateRejectedException}.
+     */
+    private void handleLeaderCheck(LeaderCheckRequest request, TransportChannel transportChannel, Task task) throws IOException {
+        final DiscoveryNodes lastPublishedDiscoveryNodes = this.lastPublishedDiscoveryNodes;
+        assert lastPublishedDiscoveryNodes != null;
+
+        if (lastPublishedDiscoveryNodes.isLocalNodeElectedMaster() == false) {
+            logger.debug("non-master handling {}", request);
+            transportChannel.sendResponse(new CoordinationStateRejectedException("non-leader rejecting leader check"));
+        } else if (lastPublishedDiscoveryNodes.nodeExists(request.getSender()) == false) {
+            logger.debug("leader check from unknown node: {}", request);
+            transportChannel.sendResponse(new CoordinationStateRejectedException("leader check from unknown node"));
+        } else {
+            logger.trace("handling {}", request);
+            transportChannel.sendResponse(Empty.INSTANCE);
+        }
+    }
+
+    /**
+     * Repeatedly checks a single leader, at the configured interval, until it is closed or it decides that the leader has failed.
+     */
+    private class CheckScheduler implements Releasable {
+
+        private final AtomicBoolean isClosed = new AtomicBoolean();
+        private final AtomicLong failureCountSinceLastSuccess = new AtomicLong();
+        private final DiscoveryNode leader;
+
+        CheckScheduler(final DiscoveryNode leader) {
+            this.leader = leader;
+        }
+
+        @Override
+        public void close() {
+            if (isClosed.compareAndSet(false, true) == false) {
+                logger.debug("already closed");
+            } else {
+                logger.debug("closed");
+            }
+        }
+
+        void handleWakeUp() {
+            if (isClosed.get()) {
+                logger.debug("closed check scheduler woken up, doing nothing");
+                return;
+            }
+
+            logger.trace("checking {} with [{}] = {}", leader, LEADER_CHECK_TIMEOUT_SETTING.getKey(), leaderCheckTimeout);
+
+            // TODO lag detection:
+            // In the PoC, the leader sent its current version to the follower in the response to a LeaderCheck, so the follower
+            // could detect if it was lagging. We'd prefer this to be implemented on the leader, so the response is just
+            // TransportResponse.Empty here.
+            transportService.sendRequest(leader, LEADER_CHECK_ACTION_NAME, new LeaderCheckRequest(transportService.getLocalNode()),
+                TransportRequestOptions.builder().withTimeout(leaderCheckTimeout).withType(Type.PING).build(),
+
+                new TransportResponseHandler<TransportResponse.Empty>() {
+                    @Override
+                    public void handleResponse(Empty response) {
+                        if (isClosed.get()) {
+                            logger.debug("closed check scheduler received a response, doing nothing");
+                            return;
+                        }
+
+                        failureCountSinceLastSuccess.set(0);
+                        scheduleNextWakeUp(); // logs trace message indicating success
+                    }
+
+                    @Override
+                    public void handleException(TransportException exp) {
+                        if (isClosed.get()) {
+                            logger.debug("closed check scheduler received a response, doing nothing");
+                            return;
+                        }
+
+                        if (exp instanceof ConnectTransportException || exp.getCause() instanceof ConnectTransportException) {
+                            logger.debug(new ParameterizedMessage("leader [{}] disconnected, failing immediately", leader), exp);
+                            leaderFailed();
+                            return;
+                        }
+
+                        long failureCount = failureCountSinceLastSuccess.incrementAndGet();
+                        if (failureCount >= leaderCheckRetryCount) {
+                            logger.debug(new ParameterizedMessage("{} consecutive failures (limit [{}] is {}) so leader [{}] has failed",
+                                failureCount, LEADER_CHECK_RETRY_COUNT_SETTING.getKey(), leaderCheckRetryCount, leader), exp);
+                            leaderFailed();
+                            return;
+                        }
+
+                        logger.debug(new ParameterizedMessage("{} consecutive failures (limit [{}] is {}) with leader [{}]",
+                            failureCount, LEADER_CHECK_RETRY_COUNT_SETTING.getKey(), leaderCheckRetryCount, leader), exp);
+                        scheduleNextWakeUp();
+                    }
+
+                    @Override
+                    public String executor() {
+                        return Names.SAME;
+                    }
+                });
+        }
+
+        private void leaderFailed() {
+            if (isClosed.compareAndSet(false, true)) {
+                transportService.getThreadPool().generic().execute(onLeaderFailure);
+            } else {
+                logger.debug("already closed, not failing leader");
+            }
+        }
+
+        private void scheduleNextWakeUp() {
+            logger.trace("scheduling next check of {} for [{}] = {}", leader, LEADER_CHECK_INTERVAL_SETTING.getKey(), leaderCheckInterval);
+            transportService.getThreadPool().schedule(leaderCheckInterval, Names.SAME, new Runnable() {
+                @Override
+                public void run() {
+                    handleWakeUp();
+                }
+
+                @Override
+                public String toString() {
+                    return "scheduled check of leader " + leader;
+                }
+            });
+        }
+    }
+
+    /**
+     * The request that a follower sends to its leader to check it, carrying the node that sent it.
+     */
+    public static class LeaderCheckRequest extends TransportRequest {
+
+        private final DiscoveryNode sender;
+
+        public LeaderCheckRequest(final DiscoveryNode sender) {
+            this.sender = sender;
+        }
+
+        public LeaderCheckRequest(final StreamInput in) throws IOException {
+            super(in);
+            sender = new DiscoveryNode(in);
+        }
+
+        @Override
+        public void writeTo(final StreamOutput out) throws IOException {
+            super.writeTo(out);
+            sender.writeTo(out);
+        }
+
+        public DiscoveryNode getSender() {
+            return sender;
+        }
+
+        @Override
+        public boolean equals(final Object o) {
+            if (this == o) return true;
+            if (o == null || getClass() != o.getClass()) return false;
+            final LeaderCheckRequest that = (LeaderCheckRequest) o;
+            return Objects.equals(sender, that.sender);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(sender);
+        }
+
+        @Override
+        public String toString() {
+            return "LeaderCheckRequest{" +
+                "sender=" + sender +
+                '}';
+        }
+    }
+}
+
diff --git a/server/src/main/java/org/elasticsearch/transport/TransportService.java b/server/src/main/java/org/elasticsearch/transport/TransportService.java
index 39ae48e0f39..1a498f38389 100644
--- a/server/src/main/java/org/elasticsearch/transport/TransportService.java
+++ b/server/src/main/java/org/elasticsearch/transport/TransportService.java
@@ -951,7 +951,7 @@ public class TransportService extends AbstractLifecycleComponent implements Tran
         assert responseHandlers.contains(requestId) == false;
         TimeoutInfoHolder timeoutInfoHolder = timeoutInfoHandlers.remove(requestId);
         if (timeoutInfoHolder != null) {
-            long time = System.currentTimeMillis();
+            long time = threadPool.relativeTimeInMillis();
             logger.warn("Received response for a request that has timed out, sent [{}ms] ago, timed out [{}ms] ago, " +
                     "action [{}], node [{}], id [{}]", time - timeoutInfoHolder.sentTime(), time - timeoutInfoHolder.timeoutTime(),
                 timeoutInfoHolder.action(), timeoutInfoHolder.node(), requestId);
@@ -1014,7 +1014,7 @@ public class TransportService extends AbstractLifecycleComponent implements Tran
     final class TimeoutHandler implements Runnable {
 
         private final long requestId;
-        private final long sentTime = System.currentTimeMillis();
+        private final long sentTime = threadPool.relativeTimeInMillis();
         private final String action;
         private final DiscoveryNode node;
         volatile ScheduledFuture future;
@@ -1028,7 +1028,7 @@ public class TransportService extends AbstractLifecycleComponent implements Tran
         @Override
         public void run() {
             if (responseHandlers.contains(requestId)) {
-                long timeoutTime = System.currentTimeMillis();
+                long timeoutTime = threadPool.relativeTimeInMillis();
                 timeoutInfoHandlers.put(requestId, new TimeoutInfoHolder(node, action, sentTime, timeoutTime));
                 // now that we have the information visible via timeoutInfoHandlers, we try to remove the request id
                 final Transport.ResponseContext holder = responseHandlers.remove(requestId);
diff --git a/server/src/test/java/org/elasticsearch/cluster/coordination/LeaderCheckerTests.java b/server/src/test/java/org/elasticsearch/cluster/coordination/LeaderCheckerTests.java
new file mode 100644
index 00000000000..5f85a951eef
--- /dev/null
+++ b/server/src/test/java/org/elasticsearch/cluster/coordination/LeaderCheckerTests.java
@@ -0,0 +1,362 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.cluster.coordination;
+
+import org.elasticsearch.ElasticsearchException;
+import org.elasticsearch.Version;
+import org.elasticsearch.cluster.coordination.LeaderChecker.LeaderCheckRequest;
+import org.elasticsearch.cluster.node.DiscoveryNode;
+import org.elasticsearch.cluster.node.DiscoveryNodes;
+import org.elasticsearch.common.lease.Releasable;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.test.ESTestCase;
+import org.elasticsearch.test.EqualsHashCodeTestUtils;
+import org.elasticsearch.test.transport.CapturingTransport;
+import org.elasticsearch.test.transport.MockTransport;
+import org.elasticsearch.threadpool.ThreadPool.Names;
+import org.elasticsearch.transport.ConnectTransportException;
+import org.elasticsearch.transport.TransportException;
+import org.elasticsearch.transport.TransportRequest;
+import org.elasticsearch.transport.TransportResponse.Empty;
+import org.elasticsearch.transport.TransportResponseHandler;
+import org.elasticsearch.transport.TransportService;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static java.util.Collections.emptySet;
+import static org.elasticsearch.cluster.coordination.LeaderChecker.LEADER_CHECK_ACTION_NAME;
+import static org.elasticsearch.cluster.coordination.LeaderChecker.LEADER_CHECK_INTERVAL_SETTING;
+import static org.elasticsearch.cluster.coordination.LeaderChecker.LEADER_CHECK_RETRY_COUNT_SETTING;
+import static org.elasticsearch.cluster.coordination.LeaderChecker.LEADER_CHECK_TIMEOUT_SETTING;
+import static org.elasticsearch.node.Node.NODE_NAME_SETTING;
+import static org.elasticsearch.transport.TransportService.NOOP_TRANSPORT_INTERCEPTOR;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.instanceOf;
+import static org.hamcrest.Matchers.lessThanOrEqualTo;
+import static org.hamcrest.Matchers.nullValue;
+
+public class LeaderCheckerTests extends ESTestCase {
+
+    public void testFollowerBehaviour() {
+        final DiscoveryNode leader1 = new DiscoveryNode("leader-1", buildNewFakeTransportAddress(), Version.CURRENT);
+        final DiscoveryNode leader2
+            = randomBoolean() ? leader1 : new DiscoveryNode("leader-2", buildNewFakeTransportAddress(), Version.CURRENT);
+
+        final DiscoveryNode localNode = new DiscoveryNode("local-node", buildNewFakeTransportAddress(), Version.CURRENT);
+        Settings.Builder settingsBuilder = Settings.builder().put(NODE_NAME_SETTING.getKey(), localNode.getId());
+
+        final long leaderCheckIntervalMillis;
+        if (randomBoolean()) {
+            leaderCheckIntervalMillis = randomLongBetween(1000, 60000);
+            settingsBuilder.put(LEADER_CHECK_INTERVAL_SETTING.getKey(), leaderCheckIntervalMillis + "ms");
+        } else {
+            leaderCheckIntervalMillis = LEADER_CHECK_INTERVAL_SETTING.get(Settings.EMPTY).millis();
+        }
+
+        final long leaderCheckTimeoutMillis;
+        if (randomBoolean()) {
+            leaderCheckTimeoutMillis = randomLongBetween(1, 60000);
+            settingsBuilder.put(LEADER_CHECK_TIMEOUT_SETTING.getKey(), leaderCheckTimeoutMillis + "ms");
+        } else {
+            leaderCheckTimeoutMillis = LEADER_CHECK_TIMEOUT_SETTING.get(Settings.EMPTY).millis();
+        }
+
+        final int leaderCheckRetryCount;
+        if (randomBoolean()) {
+            leaderCheckRetryCount = randomIntBetween(1, 10);
+            settingsBuilder.put(LEADER_CHECK_RETRY_COUNT_SETTING.getKey(), leaderCheckRetryCount);
+        } else {
+            leaderCheckRetryCount = LEADER_CHECK_RETRY_COUNT_SETTING.get(Settings.EMPTY);
+        }
+
+        final AtomicLong checkCount = new AtomicLong();
+        final AtomicBoolean allResponsesFail = new AtomicBoolean();
+
+        final Settings settings = settingsBuilder.build();
+        logger.info("--> using {}", settings);
+
+        final DeterministicTaskQueue deterministicTaskQueue = new DeterministicTaskQueue(settings);
+        final MockTransport mockTransport = new MockTransport() {
+            // how many requests in a row have been answered with an error, or answered no sooner than the check timeout
+            int consecutiveFailedRequestsCount;
+
+            @Override
+            protected void onSendRequest(long requestId, String action, TransportRequest request, DiscoveryNode node) {
+                assertThat(action, equalTo(LEADER_CHECK_ACTION_NAME));
+                assertTrue(node.equals(leader1) || node.equals(leader2));
+                super.onSendRequest(requestId, action, request, node);
+                // after retry_count - 1 consecutive failures the next check must succeed within the timeout, unless allResponsesFail
+                final boolean mustSucceed = leaderCheckRetryCount - 1 <= consecutiveFailedRequestsCount;
+                final long responseDelay = randomLongBetween(0, leaderCheckTimeoutMillis + (mustSucceed ? -1 : 60000));
+                final boolean successResponse = allResponsesFail.get() == false && (mustSucceed || randomBoolean());
+
+                if (responseDelay >= leaderCheckTimeoutMillis || successResponse == false) {
+                    consecutiveFailedRequestsCount += 1;
+                } else {
+                    consecutiveFailedRequestsCount = 0;
+                }
+
+                checkCount.incrementAndGet();
+
+                deterministicTaskQueue.scheduleAt(deterministicTaskQueue.getCurrentTimeMillis() + responseDelay, new Runnable() {
+                    @Override
+                    public void run() {
+                        if (successResponse) {
+                            handleResponse(requestId, Empty.INSTANCE);
+                        } else {
+                            handleRemoteError(requestId, new ElasticsearchException("simulated error"));
+                        }
+                    }
+
+                    @Override
+                    public String toString() {
+                        return (successResponse ? "successful" : "unsuccessful") + " response to request " + requestId;
+                    }
+                });
+            }
+        };
+
+        final TransportService transportService = mockTransport.createTransportService(settings,
+            deterministicTaskQueue.getThreadPool(), NOOP_TRANSPORT_INTERCEPTOR, boundTransportAddress -> localNode, null, emptySet());
+        transportService.start();
+        transportService.acceptIncomingRequests();
+
+        final AtomicBoolean leaderFailed = new AtomicBoolean();
+
+        final LeaderChecker leaderChecker = new LeaderChecker(settings, transportService,
+            () -> assertTrue(leaderFailed.compareAndSet(false, true)));
+
+        logger.info("--> creating first checker");
+        try (Releasable ignored = leaderChecker.startLeaderChecker(leader1)) {
+            final long maxCheckCount = randomLongBetween(2, 1000);
+            logger.info("--> checking that no failure is detected in {} checks", maxCheckCount);
+            while (checkCount.get() < maxCheckCount) {
+                deterministicTaskQueue.runAllRunnableTasks(random());
+                deterministicTaskQueue.advanceTime();
+            }
+        }
+
+        logger.info("--> running remaining tasks");
+        deterministicTaskQueue.runAllTasks(random());
+        assertFalse(leaderFailed.get());
+
+        logger.info("--> creating second checker");
+        try (Releasable ignored = leaderChecker.startLeaderChecker(leader2)) {
+            checkCount.set(0);
+            final long maxCheckCount = randomLongBetween(2, 1000);
+            logger.info("--> checking again that no failure is detected in {} checks", maxCheckCount);
+            while (checkCount.get() < maxCheckCount) {
+                deterministicTaskQueue.runAllRunnableTasks(random());
+                deterministicTaskQueue.advanceTime();
+            }
+
+            deterministicTaskQueue.runAllRunnableTasks(random());
+
+            final long failureTime = deterministicTaskQueue.getCurrentTimeMillis();
+            allResponsesFail.set(true);
+            logger.info("--> failing at {}ms", failureTime);
+
+            while (leaderFailed.get() == false) {
+                deterministicTaskQueue.advanceTime();
+                deterministicTaskQueue.runAllRunnableTasks(random());
+            }
+
+            assertThat(deterministicTaskQueue.getCurrentTimeMillis() - failureTime,
+                lessThanOrEqualTo((leaderCheckIntervalMillis + leaderCheckTimeoutMillis) * leaderCheckRetryCount
+                    + leaderCheckTimeoutMillis // needed because a successful check response might be in flight at the time of failure
+                ));
+        }
+    }
+
+    enum Response {
+        SUCCESS, REMOTE_ERROR, DIRECT_ERROR
+    }
+
+    public void testFollowerFailsImmediatelyOnDisconnection() {
+        final DiscoveryNode localNode = new DiscoveryNode("local-node", buildNewFakeTransportAddress(), Version.CURRENT);
+        final DiscoveryNode leader = new DiscoveryNode("leader", buildNewFakeTransportAddress(), Version.CURRENT);
+
+        final Response[] responseHolder = new Response[]{Response.SUCCESS};
+
+        final Settings settings = Settings.builder().put(NODE_NAME_SETTING.getKey(), localNode.getId()).build();
+        final DeterministicTaskQueue deterministicTaskQueue = new DeterministicTaskQueue(settings);
+        final MockTransport mockTransport = new MockTransport() {
+            @Override
+            protected void onSendRequest(long requestId, String action, TransportRequest request, DiscoveryNode node) {
+                assertThat(action, equalTo(LEADER_CHECK_ACTION_NAME));
+                assertThat(node, equalTo(leader));
+                final Response response = responseHolder[0];
+
+                deterministicTaskQueue.scheduleNow(new Runnable() {
+                    @Override
+                    public void run() {
+                        switch (response) {
+                            case SUCCESS:
+                                handleResponse(requestId, Empty.INSTANCE);
+                                break;
+                            case REMOTE_ERROR:
+                                handleRemoteError(requestId, new ConnectTransportException(leader, "simulated error"));
+                                break;
+                            case DIRECT_ERROR:
+                                handleError(requestId, new ConnectTransportException(leader, "simulated error"));
+                        }
+                    }
+
+                    @Override
+                    public String toString() {
+                        return response + " response to request " + requestId;
+                    }
+                });
+            }
+        };
+
+        final TransportService transportService = mockTransport.createTransportService(settings,
+            deterministicTaskQueue.getThreadPool(), NOOP_TRANSPORT_INTERCEPTOR, boundTransportAddress -> localNode, null, emptySet());
+        transportService.start();
+        transportService.acceptIncomingRequests();
+
+        final AtomicBoolean leaderFailed = new AtomicBoolean();
+        final LeaderChecker leaderChecker = new LeaderChecker(settings, transportService,
+            () -> assertTrue(leaderFailed.compareAndSet(false, true)));
+
+        try (Releasable ignored = leaderChecker.startLeaderChecker(leader)) {
+            while (deterministicTaskQueue.getCurrentTimeMillis() < 10 * LEADER_CHECK_INTERVAL_SETTING.get(Settings.EMPTY).millis()) {
+                deterministicTaskQueue.runAllRunnableTasks(random());
+                deterministicTaskQueue.advanceTime();
+            }
+
+            deterministicTaskQueue.runAllRunnableTasks(random());
+            assertFalse(leaderFailed.get());
+
+            responseHolder[0] = Response.REMOTE_ERROR;
+
+            deterministicTaskQueue.advanceTime();
+            deterministicTaskQueue.runAllRunnableTasks(random());
+
+            assertTrue(leaderFailed.get());
+        }
+
+        deterministicTaskQueue.runAllTasks(random());
+        leaderFailed.set(false);
+        responseHolder[0] = Response.SUCCESS;
+
+        try (Releasable ignored = leaderChecker.startLeaderChecker(leader)) {
+            while (deterministicTaskQueue.getCurrentTimeMillis() < 10 * LEADER_CHECK_INTERVAL_SETTING.get(Settings.EMPTY).millis()) {
+                deterministicTaskQueue.runAllRunnableTasks(random());
+                deterministicTaskQueue.advanceTime();
+            }
+
+            deterministicTaskQueue.runAllRunnableTasks(random());
+            assertFalse(leaderFailed.get());
+
+            responseHolder[0] = Response.DIRECT_ERROR;
+
+            deterministicTaskQueue.advanceTime();
+            deterministicTaskQueue.runAllRunnableTasks(random());
+
+            assertTrue(leaderFailed.get());
+        }
+    }
+
+    public void testLeaderBehaviour() {
+        final DiscoveryNode localNode = new DiscoveryNode("local-node", buildNewFakeTransportAddress(), Version.CURRENT);
+        final DiscoveryNode otherNode = new DiscoveryNode("other-node", buildNewFakeTransportAddress(), Version.CURRENT);
+        final Settings settings = Settings.builder().put(NODE_NAME_SETTING.getKey(), localNode.getId()).build();
+        final DeterministicTaskQueue deterministicTaskQueue = new DeterministicTaskQueue(settings);
+        final CapturingTransport capturingTransport = new CapturingTransport();
+
+        final TransportService transportService = capturingTransport.createTransportService(settings,
+            deterministicTaskQueue.getThreadPool(), NOOP_TRANSPORT_INTERCEPTOR, boundTransportAddress -> localNode, null, emptySet());
+        transportService.start();
+        transportService.acceptIncomingRequests();
+
+        final LeaderChecker leaderChecker = new LeaderChecker(settings, transportService, () -> fail("shouldn't be checking anything"));
+
+        final DiscoveryNodes discoveryNodes
+            = DiscoveryNodes.builder().add(localNode).localNodeId(localNode.getId()).masterNodeId(localNode.getId()).build();
+
+        {
+            leaderChecker.setLastPublishedDiscoveryNodes(discoveryNodes);
+
+            final CapturingTransportResponseHandler handler = new CapturingTransportResponseHandler();
+            transportService.sendRequest(localNode, LEADER_CHECK_ACTION_NAME, new LeaderCheckRequest(otherNode), handler);
+            deterministicTaskQueue.runAllTasks(random());
+
+            assertFalse(handler.successfulResponseReceived);
+            assertThat(handler.transportException.getRootCause(), instanceOf(CoordinationStateRejectedException.class));
+            CoordinationStateRejectedException cause = (CoordinationStateRejectedException) handler.transportException.getRootCause();
+            assertThat(cause.getMessage(), equalTo("leader check from unknown node"));
+        }
+
+        {
+            leaderChecker.setLastPublishedDiscoveryNodes(DiscoveryNodes.builder(discoveryNodes).add(otherNode).build());
+
+            final CapturingTransportResponseHandler handler = new CapturingTransportResponseHandler();
+            transportService.sendRequest(localNode, LEADER_CHECK_ACTION_NAME, new LeaderCheckRequest(otherNode), handler);
+            deterministicTaskQueue.runAllTasks(random());
+
+            assertTrue(handler.successfulResponseReceived);
+            assertThat(handler.transportException, nullValue());
+        }
+
+        {
+            leaderChecker.setLastPublishedDiscoveryNodes(DiscoveryNodes.builder(discoveryNodes).add(otherNode).masterNodeId(null).build());
+
+            final CapturingTransportResponseHandler handler = new CapturingTransportResponseHandler();
+            transportService.sendRequest(localNode, LEADER_CHECK_ACTION_NAME, new LeaderCheckRequest(otherNode), handler);
+            deterministicTaskQueue.runAllTasks(random());
+
+            assertFalse(handler.successfulResponseReceived);
+            assertThat(handler.transportException.getRootCause(), instanceOf(CoordinationStateRejectedException.class));
+            CoordinationStateRejectedException cause = (CoordinationStateRejectedException) handler.transportException.getRootCause();
+            assertThat(cause.getMessage(), equalTo("non-leader rejecting leader check"));
+        }
+    }
+
+    private class CapturingTransportResponseHandler implements TransportResponseHandler<Empty> {
+        // records how the request completed so that the test can assert on it afterwards
+        TransportException transportException;
+        boolean successfulResponseReceived;
+
+        @Override
+        public void handleResponse(Empty response) {
+            successfulResponseReceived = true;
+        }
+
+        @Override
+        public void handleException(TransportException exp) {
+            transportException = exp;
+        }
+
+        @Override
+        public String executor() {
+            return Names.GENERIC;
+        }
+    }
+
+    public void testLeaderCheckRequestEqualsHashcodeSerialization() {
+        LeaderCheckRequest request = new LeaderCheckRequest(
+            new DiscoveryNode(randomAlphaOfLength(10), buildNewFakeTransportAddress(), Version.CURRENT));
+        EqualsHashCodeTestUtils.checkEqualsAndHashCode(request,
+            rq -> copyWriteable(rq, writableRegistry(), LeaderCheckRequest::new),
+            rq -> new LeaderCheckRequest(new DiscoveryNode(randomAlphaOfLength(10), buildNewFakeTransportAddress(), Version.CURRENT)));
+    }
+}
diff --git a/test/framework/src/main/java/org/elasticsearch/cluster/coordination/DeterministicTaskQueue.java b/test/framework/src/main/java/org/elasticsearch/cluster/coordination/DeterministicTaskQueue.java
index 41e0414570c..e3c69b03e37 100644
--- a/test/framework/src/main/java/org/elasticsearch/cluster/coordination/DeterministicTaskQueue.java
+++ b/test/framework/src/main/java/org/elasticsearch/cluster/coordination/DeterministicTaskQueue.java
@@ -39,6 +39,7 @@ import java.util.concurrent.Future;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 
 public class DeterministicTaskQueue extends AbstractComponent {
 
@@ -312,7 +313,25 @@ public class DeterministicTaskQueue extends AbstractComponent {
 
             @Override
             public ScheduledFuture schedule(TimeValue delay, String executor, Runnable command) {
-                scheduleAt(currentTimeMillis + delay.millis(), command);
+                final int NOT_STARTED = 0;
+                final int STARTED = 1;
+                final int CANCELLED = 2;
+                final AtomicInteger taskState = new AtomicInteger(NOT_STARTED);
+
+                scheduleAt(currentTimeMillis + delay.millis(), new Runnable() {
+                    @Override
+                    public void run() {
+                        if (taskState.compareAndSet(NOT_STARTED, STARTED)) {
+                            command.run();
+                        }
+                    }
+
+                    @Override
+                    public String toString() {
+                        return command.toString();
+                    }
+                });
+
                 return new ScheduledFuture() {
                     @Override
                     public long getDelay(TimeUnit unit) {
@@ -326,12 +345,13 @@ public class DeterministicTaskQueue extends AbstractComponent {
 
                     @Override
                     public boolean cancel(boolean mayInterruptIfRunning) {
-                        return false;
+                        assert mayInterruptIfRunning == false;
+                        return taskState.compareAndSet(NOT_STARTED, CANCELLED);
                     }
 
                     @Override
                     public boolean isCancelled() {
-                        throw new UnsupportedOperationException();
+                        return taskState.get() == CANCELLED;
                     }
 
                     @Override
diff --git a/test/framework/src/main/java/org/elasticsearch/test/transport/MockTransport.java b/test/framework/src/main/java/org/elasticsearch/test/transport/MockTransport.java
index caa24a31320..1fb942f857a 100644
--- a/test/framework/src/main/java/org/elasticsearch/test/transport/MockTransport.java
+++ b/test/framework/src/main/java/org/elasticsearch/test/transport/MockTransport.java
@@ -47,6 +47,7 @@ import org.elasticsearch.transport.TransportMessageListener;
 import org.elasticsearch.transport.TransportRequest;
 import org.elasticsearch.transport.TransportRequestOptions;
 import org.elasticsearch.transport.TransportResponse;
+import org.elasticsearch.transport.TransportResponseHandler;
 import org.elasticsearch.transport.TransportService;
 import org.elasticsearch.transport.TransportStats;
 
@@ -88,7 +89,10 @@ public class MockTransport implements Transport, LifecycleComponent {
      */
     @SuppressWarnings("unchecked")
     public void handleResponse(final long requestId, final TransportResponse response) {
-        responseHandlers.onResponseReceived(requestId, listener).handleResponse(response);
+        final TransportResponseHandler transportResponseHandler = responseHandlers.onResponseReceived(requestId, listener);
+        if (transportResponseHandler != null) {
+            transportResponseHandler.handleResponse(response);
+        }
     }
 
     /**
@@ -140,7 +144,10 @@ public class MockTransport implements Transport, LifecycleComponent {
      * @param e         the failure
      */
     public void handleError(final long requestId, final TransportException e) {
-        responseHandlers.onResponseReceived(requestId, listener).handleException(e);
+        final TransportResponseHandler transportResponseHandler = responseHandlers.onResponseReceived(requestId, listener);
+        if (transportResponseHandler != null) {
+            transportResponseHandler.handleException(e);
+        }
     }
 
     @Override
diff --git a/test/framework/src/test/java/org/elasticsearch/cluster/coordination/DeterministicTaskQueueTests.java b/test/framework/src/test/java/org/elasticsearch/cluster/coordination/DeterministicTaskQueueTests.java
index 5a1347a7aa9..26c554f9d7b 100644
--- a/test/framework/src/test/java/org/elasticsearch/cluster/coordination/DeterministicTaskQueueTests.java
+++ b/test/framework/src/test/java/org/elasticsearch/cluster/coordination/DeterministicTaskQueueTests.java
@@ -28,6 +28,7 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Random;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ScheduledFuture;
 
 import static org.elasticsearch.node.Node.NODE_NAME_SETTING;
 import static org.elasticsearch.threadpool.ThreadPool.Names.GENERIC;
@@ -299,6 +300,12 @@ public class DeterministicTaskQueueTests extends ESTestCase {
         taskQueue.runAllTasks();
         assertThat(taskQueue.getCurrentTimeMillis(), is(startTime + delayMillis + delayMillis1));
 
+        final TimeValue cancelledDelay = TimeValue.timeValueMillis(randomLongBetween(1, 100));
+        final ScheduledFuture<?> future = threadPool.schedule(cancelledDelay, "", () -> strings.add("cancelled before execution"));
+
+        future.cancel(false);
+        taskQueue.runAllTasks(random());
+
         assertThat(strings, contains("runnable", "also runnable", "deferred", "not quite so deferred", "further deferred"));
     }
 