From d995fc85c60a639d827b45263cf4e29fba4f2d08 Mon Sep 17 00:00:00 2001 From: David Turner Date: Wed, 26 Sep 2018 12:18:13 +0100 Subject: [PATCH] Integrate LeaderChecker with Coordinator (#34049) This change ensures that follower nodes periodically check that their leader is healthy, and that they elect a new leader if not. --- .../cluster/coordination/Coordinator.java | 45 ++++ .../cluster/coordination/LeaderChecker.java | 21 +- .../TransportReplicationActionTests.java | 3 + .../coordination/CoordinatorTests.java | 108 ++++++++- .../coordination/LeaderCheckerTests.java | 6 +- .../cluster/coordination/NodeJoinTests.java | 45 ++-- .../disruption/DisruptableMockTransport.java | 227 ++++++++++-------- 7 files changed, 308 insertions(+), 147 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java b/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java index 5a04e5a5444..c6e54455566 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java @@ -78,10 +78,13 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery private final UnicastConfiguredHostsResolver configuredHostsResolver; private final TimeValue publishTimeout; private final PublicationTransportHandler publicationHandler; + private final LeaderChecker leaderChecker; @Nullable private Releasable electionScheduler; @Nullable private Releasable prevotingRound; + @Nullable + private Releasable leaderCheckScheduler; private AtomicLong maxTermSeen = new AtomicLong(); private Mode mode; @@ -108,10 +111,27 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery this.peerFinder = new CoordinatorPeerFinder(settings, transportService, new HandshakingTransportAddressConnector(settings, transportService), configuredHostsResolver); this.publicationHandler = new PublicationTransportHandler(transportService, this::handlePublishRequest, this::handleApplyCommit); + this.leaderChecker = new LeaderChecker(settings, transportService, getOnLeaderFailure()); masterService.setClusterStateSupplier(this::getStateForMasterService); } + private Runnable getOnLeaderFailure() { + return new Runnable() { + @Override + public void run() { + synchronized (mutex) { + becomeCandidate("onLeaderFailure"); + } + } + + @Override + public String toString() { + return "notification of leader failure"; + } + }; + } + private void handleApplyCommit(ApplyCommitRequest applyCommitRequest) { synchronized (mutex) { logger.trace("handleApplyCommit: applying commit {}", applyCommitRequest); @@ -233,6 +253,12 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery joinAccumulator = joinHelper.new CandidateJoinAccumulator(); peerFinder.activate(coordinationState.get().getLastAcceptedState().nodes()); + leaderChecker.setCurrentNodes(DiscoveryNodes.EMPTY_NODES); + + if (leaderCheckScheduler != null) { + leaderCheckScheduler.close(); + leaderCheckScheduler = null; + } } preVoteCollector.update(getPreVoteResponse(), null); @@ -251,16 +277,21 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery peerFinder.deactivate(getLocalNode()); closePrevotingAndElectionScheduler(); preVoteCollector.update(getPreVoteResponse(), getLocalNode()); + + assert leaderCheckScheduler == null : leaderCheckScheduler; } void becomeFollower(String method, DiscoveryNode leaderNode) { assert Thread.holdsLock(mutex) : "Coordinator mutex not held"; logger.debug("{}: becoming FOLLOWER of [{}] (was {}, lastKnownLeader was [{}])", method, leaderNode, mode, lastKnownLeader); + final boolean restartLeaderChecker = (mode == Mode.FOLLOWER && Optional.of(leaderNode).equals(lastKnownLeader)) == false; + if (mode != Mode.FOLLOWER) { mode = Mode.FOLLOWER; joinAccumulator.close(mode); joinAccumulator = new JoinHelper.FollowerJoinAccumulator(); + leaderChecker.setCurrentNodes(DiscoveryNodes.EMPTY_NODES); } lastKnownLeader = Optional.of(leaderNode); @@ -268,6 +299,13 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery closePrevotingAndElectionScheduler(); cancelActivePublication(); preVoteCollector.update(getPreVoteResponse(), leaderNode); + + if (restartLeaderChecker) { + if (leaderCheckScheduler != null) { + leaderCheckScheduler.close(); + } + leaderCheckScheduler = leaderChecker.startLeaderChecker(leaderNode); + } } private PreVoteResponse getPreVoteResponse() { @@ -339,6 +377,7 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery assert getStateForMasterService().nodes().getMasterNodeId() != null || getStateForMasterService().term() != getCurrentTerm() : getStateForMasterService(); + assert leaderCheckScheduler == null : leaderCheckScheduler; } else if (mode == Mode.FOLLOWER) { assert coordinationState.get().electionWon() == false : getLocalNode() + " is FOLLOWER so electionWon() should be false"; assert lastKnownLeader.isPresent() && (lastKnownLeader.get().equals(getLocalNode()) == false); @@ -347,12 +386,16 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery assert electionScheduler == null : electionScheduler; assert prevotingRound == null : prevotingRound; assert getStateForMasterService().nodes().getMasterNodeId() == null : getStateForMasterService(); + assert leaderChecker.currentNodeIsMaster() == false; + assert leaderCheckScheduler != null; } else { assert mode == Mode.CANDIDATE; assert joinAccumulator instanceof JoinHelper.CandidateJoinAccumulator; assert peerFinderLeader.isPresent() == false : peerFinderLeader; assert prevotingRound == null || electionScheduler != null; assert getStateForMasterService().nodes().getMasterNodeId() == null : getStateForMasterService(); + assert leaderChecker.currentNodeIsMaster() == false; + assert leaderCheckScheduler == null : leaderCheckScheduler; } } } @@ -577,6 +620,8 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery return "scheduled timeout for " + publication; } }); + + leaderChecker.setCurrentNodes(publishRequest.getAcceptedState().nodes()); publication.start(Collections.emptySet()); // TODO start failure detector and put faultyNodes here } } catch (Exception e) { diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/LeaderChecker.java b/server/src/main/java/org/elasticsearch/cluster/coordination/LeaderChecker.java index 2dd98643b2d..fca23018a4f 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/LeaderChecker.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/LeaderChecker.java @@ -77,7 +77,7 @@ public class LeaderChecker extends AbstractComponent { private final TransportService transportService; private final Runnable onLeaderFailure; - private volatile DiscoveryNodes lastPublishedDiscoveryNodes; + private volatile DiscoveryNodes discoveryNodes; public LeaderChecker(final Settings settings, final TransportService transportService, final Runnable onLeaderFailure) { super(settings); @@ -111,19 +111,24 @@ public class LeaderChecker extends AbstractComponent { * isLocalNodeElectedMaster() should reflect whether this node is a leader, and nodeExists() * should indicate whether nodes are known publication targets or not. */ - public void setLastPublishedDiscoveryNodes(DiscoveryNodes discoveryNodes) { - logger.trace("updating last-published nodes: {}", discoveryNodes); - lastPublishedDiscoveryNodes = discoveryNodes; + public void setCurrentNodes(DiscoveryNodes discoveryNodes) { + logger.trace("setCurrentNodes: {}", discoveryNodes); + this.discoveryNodes = discoveryNodes; + } + + // For assertions + boolean currentNodeIsMaster() { + return discoveryNodes.isLocalNodeElectedMaster(); } private void handleLeaderCheck(LeaderCheckRequest request, TransportChannel transportChannel, Task task) throws IOException { - final DiscoveryNodes lastPublishedDiscoveryNodes = this.lastPublishedDiscoveryNodes; - assert lastPublishedDiscoveryNodes != null; + final DiscoveryNodes discoveryNodes = this.discoveryNodes; + assert discoveryNodes != null; - if (lastPublishedDiscoveryNodes.isLocalNodeElectedMaster() == false) { + if (discoveryNodes.isLocalNodeElectedMaster() == false) { logger.debug("non-master handling {}", request); transportChannel.sendResponse(new CoordinationStateRejectedException("non-leader rejecting leader check")); - } else if (lastPublishedDiscoveryNodes.nodeExists(request.getSender()) == false) { + } else if (discoveryNodes.nodeExists(request.getSender()) == false) { logger.debug("leader check from unknown node: {}", request); transportChannel.sendResponse(new CoordinationStateRejectedException("leader check from unknown node")); } else { diff --git a/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java b/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java index 05a59c11937..96861457dd3 100644 --- a/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java @@ -1105,6 +1105,9 @@ public class TransportReplicationActionTests extends ESTestCase { } static class TestResponse extends ReplicationResponse { + TestResponse() { + setShardInfo(new ShardInfo()); + } } private class TestAction extends TransportReplicationAction { diff --git a/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java b/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java index 70d8022ac43..428d03720cd 100644 --- a/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java @@ -41,6 +41,7 @@ import org.elasticsearch.discovery.zen.UnicastHostsProvider.HostsResolver; import org.elasticsearch.indices.cluster.FakeThreadPoolMasterService; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.disruption.DisruptableMockTransport; +import org.elasticsearch.test.disruption.DisruptableMockTransport.ConnectionStatus; import org.elasticsearch.test.junit.annotations.TestLogging; import org.elasticsearch.transport.TransportService; import org.hamcrest.Matcher; @@ -59,7 +60,11 @@ import static java.util.Collections.emptySet; import static org.elasticsearch.cluster.coordination.CoordinationStateTests.clusterState; import static org.elasticsearch.cluster.coordination.CoordinationStateTests.setValue; import static org.elasticsearch.cluster.coordination.CoordinationStateTests.value; +import static org.elasticsearch.cluster.coordination.Coordinator.Mode.CANDIDATE; import static org.elasticsearch.cluster.coordination.Coordinator.Mode.FOLLOWER; +import static org.elasticsearch.cluster.coordination.LeaderChecker.LEADER_CHECK_INTERVAL_SETTING; +import static org.elasticsearch.cluster.coordination.LeaderChecker.LEADER_CHECK_RETRY_COUNT_SETTING; +import static org.elasticsearch.cluster.coordination.LeaderChecker.LEADER_CHECK_TIMEOUT_SETTING; import static org.elasticsearch.node.Node.NODE_NAME_SETTING; import static org.elasticsearch.transport.TransportService.HANDSHAKE_ACTION_NAME; import static org.elasticsearch.transport.TransportService.NOOP_TRANSPORT_INTERCEPTOR; @@ -68,7 +73,7 @@ import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.not; -@TestLogging("org.elasticsearch.cluster.coordination:TRACE,org.elasticsearch.cluster.discovery:TRACE") +@TestLogging("org.elasticsearch.cluster.coordination:TRACE,org.elasticsearch.discovery:TRACE") public class CoordinatorTests extends ESTestCase { public void testCanUpdateClusterStateAfterStabilisation() { @@ -101,6 +106,40 @@ public class CoordinatorTests extends ESTestCase { assertEquals(currentTerm, newTerm); } + public void testLeaderDisconnectionDetectedQuickly() { + final Cluster cluster = new Cluster(randomIntBetween(3, 5)); + cluster.stabilise(); + + final ClusterNode originalLeader = cluster.getAnyLeader(); + logger.info("--> disconnecting leader {}", originalLeader); + originalLeader.disconnect(); + + synchronized (originalLeader.coordinator.mutex) { + originalLeader.coordinator.becomeCandidate("simulated failure detection"); // TODO remove once follower checker is integrated + } + + cluster.stabilise(); + assertThat(cluster.getAnyLeader().getId(), not(equalTo(originalLeader.getId()))); + } + + public void testUnresponsiveLeaderDetectedEventually() { + final Cluster cluster = new Cluster(randomIntBetween(3, 5)); + cluster.stabilise(); + + final ClusterNode originalLeader = cluster.getAnyLeader(); + logger.info("--> partitioning leader {}", originalLeader); + originalLeader.partition(); + + synchronized (originalLeader.coordinator.mutex) { + originalLeader.coordinator.becomeCandidate("simulated failure detection"); // TODO remove once follower checker is integrated + } + + cluster.stabilise(Cluster.DEFAULT_STABILISATION_TIME + + (LEADER_CHECK_INTERVAL_SETTING.get(Settings.EMPTY).millis() + LEADER_CHECK_TIMEOUT_SETTING.get(Settings.EMPTY).millis()) + * LEADER_CHECK_RETRY_COUNT_SETTING.get(Settings.EMPTY)); + assertThat(cluster.getAnyLeader().getId(), not(equalTo(originalLeader.getId()))); + } + private static String nodeIdFromIndex(int nodeIndex) { return "node" + nodeIndex; } @@ -115,6 +154,9 @@ public class CoordinatorTests extends ESTestCase { Settings.builder().put(NODE_NAME_SETTING.getKey(), "deterministic-task-queue").build()); private final VotingConfiguration initialConfiguration; + private final Set disconnectedNodes = new HashSet<>(); + private final Set blackholedNodes = new HashSet<>(); + Cluster(int initialNodeCount) { logger.info("--> creating cluster of {} nodes", initialNodeCount); @@ -142,8 +184,12 @@ public class CoordinatorTests extends ESTestCase { } void stabilise() { + stabilise(DEFAULT_STABILISATION_TIME); + } + + void stabilise(long stabilisationTime) { final long stabilisationStartTime = deterministicTaskQueue.getCurrentTimeMillis(); - while (deterministicTaskQueue.getCurrentTimeMillis() < stabilisationStartTime + DEFAULT_STABILISATION_TIME) { + while (deterministicTaskQueue.getCurrentTimeMillis() < stabilisationStartTime + stabilisationTime) { while (deterministicTaskQueue.hasRunnableTasks()) { try { @@ -182,16 +228,21 @@ public class CoordinatorTests extends ESTestCase { } final String nodeId = clusterNode.getId(); - assertThat(nodeId + " has the same term as the leader", clusterNode.coordinator.getCurrentTerm(), is(leaderTerm)); - // TODO assert that all nodes have actually voted for the leader in this term - assertThat(nodeId + " is a follower", clusterNode.coordinator.getMode(), is(FOLLOWER)); - assertThat(nodeId + " is at the same accepted version as the leader", - Optional.of(clusterNode.coordinator.getLastAcceptedState().getVersion()), isPresentAndEqualToLeaderVersion); - assertThat(nodeId + " is at the same committed version as the leader", - clusterNode.coordinator.getLastCommittedState().map(ClusterState::getVersion), isPresentAndEqualToLeaderVersion); - assertThat(clusterNode.coordinator.getLastCommittedState().map(ClusterState::getNodes).map(dn -> dn.nodeExists(nodeId)), - equalTo(Optional.of(true))); + if (disconnectedNodes.contains(nodeId) || blackholedNodes.contains(nodeId)) { + assertThat(nodeId + " is a candidate", clusterNode.coordinator.getMode(), is(CANDIDATE)); + } else { + assertThat(nodeId + " has the same term as the leader", clusterNode.coordinator.getCurrentTerm(), is(leaderTerm)); + // TODO assert that all nodes have actually voted for the leader in this term + + assertThat(nodeId + " is a follower", clusterNode.coordinator.getMode(), is(FOLLOWER)); + assertThat(nodeId + " is at the same accepted version as the leader", + Optional.of(clusterNode.coordinator.getLastAcceptedState().getVersion()), isPresentAndEqualToLeaderVersion); + assertThat(nodeId + " is at the same committed version as the leader", + clusterNode.coordinator.getLastCommittedState().map(ClusterState::getVersion), isPresentAndEqualToLeaderVersion); + assertThat(clusterNode.coordinator.getLastCommittedState().map(ClusterState::getNodes).map(dn -> dn.nodeExists(nodeId)), + equalTo(Optional.of(true))); + } } assertThat(leader.coordinator.getLastCommittedState().map(ClusterState::getNodes).map(DiscoveryNodes::getSize), @@ -204,6 +255,18 @@ public class CoordinatorTests extends ESTestCase { return randomFrom(allLeaders); } + private ConnectionStatus getConnectionStatus(DiscoveryNode sender, DiscoveryNode destination) { + ConnectionStatus connectionStatus; + if (blackholedNodes.contains(sender.getId()) || blackholedNodes.contains(destination.getId())) { + connectionStatus = ConnectionStatus.BLACK_HOLE; + } else if (disconnectedNodes.contains(sender.getId()) || disconnectedNodes.contains(destination.getId())) { + connectionStatus = ConnectionStatus.DISCONNECTED; + } else { + connectionStatus = ConnectionStatus.CONNECTED; + } + return connectionStatus; + } + class ClusterNode extends AbstractComponent { private final int nodeIndex; private Coordinator coordinator; @@ -241,7 +304,7 @@ public class CoordinatorTests extends ESTestCase { @Override protected ConnectionStatus getConnectionStatus(DiscoveryNode sender, DiscoveryNode destination) { - return ConnectionStatus.CONNECTED; + return Cluster.this.getConnectionStatus(sender, destination); } @Override @@ -264,6 +327,17 @@ public class CoordinatorTests extends ESTestCase { deterministicTaskQueue.scheduleNow(onNode(destination, doDelivery)); } } + + @Override + protected void onBlackholedDuringSend(long requestId, String action, DiscoveryNode destination) { + if (action.equals(HANDSHAKE_ACTION_NAME)) { + logger.trace("ignoring blackhole and delivering {}", getRequestDescription(requestId, action, destination)); + // handshakes always have a timeout, and are sent in a blocking fashion, so we must respond with an exception. + sendFromTo(destination, getLocalNode(), action, getDisconnectException(requestId, action, destination)); + } else { + super.onBlackholedDuringSend(requestId, action, destination); + } + } }; masterService = new FakeThreadPoolMasterService("test", @@ -290,7 +364,7 @@ public class CoordinatorTests extends ESTestCase { return localNode.getId(); } - public DiscoveryNode getLocalNode() { + DiscoveryNode getLocalNode() { return localNode; } @@ -316,6 +390,14 @@ public class CoordinatorTests extends ESTestCase { public String toString() { return localNode.toString(); } + + void disconnect() { + disconnectedNodes.add(localNode.getId()); + } + + void partition() { + blackholedNodes.add(localNode.getId()); + } } private List provideUnicastHosts(HostsResolver ignored) { diff --git a/server/src/test/java/org/elasticsearch/cluster/coordination/LeaderCheckerTests.java b/server/src/test/java/org/elasticsearch/cluster/coordination/LeaderCheckerTests.java index 5f85a951eef..582751be230 100644 --- a/server/src/test/java/org/elasticsearch/cluster/coordination/LeaderCheckerTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/coordination/LeaderCheckerTests.java @@ -294,7 +294,7 @@ public class LeaderCheckerTests extends ESTestCase { = DiscoveryNodes.builder().add(localNode).localNodeId(localNode.getId()).masterNodeId(localNode.getId()).build(); { - leaderChecker.setLastPublishedDiscoveryNodes(discoveryNodes); + leaderChecker.setCurrentNodes(discoveryNodes); final CapturingTransportResponseHandler handler = new CapturingTransportResponseHandler(); transportService.sendRequest(localNode, LEADER_CHECK_ACTION_NAME, new LeaderCheckRequest(otherNode), handler); @@ -307,7 +307,7 @@ public class LeaderCheckerTests extends ESTestCase { } { - leaderChecker.setLastPublishedDiscoveryNodes(DiscoveryNodes.builder(discoveryNodes).add(otherNode).build()); + leaderChecker.setCurrentNodes(DiscoveryNodes.builder(discoveryNodes).add(otherNode).build()); final CapturingTransportResponseHandler handler = new CapturingTransportResponseHandler(); transportService.sendRequest(localNode, LEADER_CHECK_ACTION_NAME, new LeaderCheckRequest(otherNode), handler); @@ -318,7 +318,7 @@ public class LeaderCheckerTests extends ESTestCase { } { - leaderChecker.setLastPublishedDiscoveryNodes(DiscoveryNodes.builder(discoveryNodes).add(otherNode).masterNodeId(null).build()); + leaderChecker.setCurrentNodes(DiscoveryNodes.builder(discoveryNodes).add(otherNode).masterNodeId(null).build()); final CapturingTransportResponseHandler handler = new CapturingTransportResponseHandler(); transportService.sendRequest(localNode, LEADER_CHECK_ACTION_NAME, new LeaderCheckRequest(otherNode), handler); diff --git a/server/src/test/java/org/elasticsearch/cluster/coordination/NodeJoinTests.java b/server/src/test/java/org/elasticsearch/cluster/coordination/NodeJoinTests.java index 426ad108c7b..b1ba4bfd96a 100644 --- a/server/src/test/java/org/elasticsearch/cluster/coordination/NodeJoinTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/coordination/NodeJoinTests.java @@ -30,6 +30,7 @@ import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.service.MasterService; import org.elasticsearch.cluster.service.MasterServiceTests; import org.elasticsearch.common.Randomness; +import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.BaseFuture; import org.elasticsearch.common.util.concurrent.FutureUtils; @@ -38,17 +39,18 @@ import org.elasticsearch.node.Node; import org.elasticsearch.test.ClusterServiceUtils; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.junit.annotations.TestLogging; +import org.elasticsearch.test.transport.CapturingTransport; import org.elasticsearch.threadpool.TestThreadPool; import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.RequestHandlerRegistry; import org.elasticsearch.transport.TransportChannel; -import org.elasticsearch.transport.TransportRequestHandler; +import org.elasticsearch.transport.TransportRequest; import org.elasticsearch.transport.TransportResponse; import org.elasticsearch.transport.TransportResponseOptions; import org.elasticsearch.transport.TransportService; import org.junit.After; import org.junit.AfterClass; import org.junit.BeforeClass; -import org.mockito.ArgumentCaptor; import java.util.ArrayList; import java.util.Collections; @@ -66,12 +68,8 @@ import java.util.stream.IntStream; import static java.util.Collections.emptyList; import static java.util.Collections.emptyMap; +import static org.elasticsearch.transport.TransportService.HANDSHAKE_ACTION_NAME; import static org.hamcrest.Matchers.containsString; -import static org.mockito.Matchers.anyObject; -import static org.mockito.Matchers.eq; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; @TestLogging("org.elasticsearch.cluster.service:TRACE,org.elasticsearch.cluster.coordination:TRACE") public class NodeJoinTests extends ESTestCase { @@ -81,7 +79,7 @@ public class NodeJoinTests extends ESTestCase { private MasterService masterService; private Coordinator coordinator; private DeterministicTaskQueue deterministicTaskQueue; - private TransportRequestHandler transportRequestHandler; + private RequestHandlerRegistry transportRequestHandler; @BeforeClass public static void beforeClass() { @@ -144,20 +142,29 @@ public class NodeJoinTests extends ESTestCase { throw new IllegalStateException("method setupMasterServiceAndCoordinator can only be called once"); } this.masterService = masterService; - TransportService transportService = mock(TransportService.class); - when(transportService.getLocalNode()).thenReturn(initialState.nodes().getLocalNode()); - when(transportService.getThreadPool()).thenReturn(threadPool); - @SuppressWarnings("unchecked") - ArgumentCaptor> joinRequestHandler = ArgumentCaptor.forClass( - (Class) TransportRequestHandler.class); + CapturingTransport capturingTransport = new CapturingTransport() { + @Override + protected void onSendRequest(long requestId, String action, TransportRequest request, DiscoveryNode destination) { + if (action.equals(HANDSHAKE_ACTION_NAME)) { + handleResponse(requestId, new TransportService.HandshakeResponse(destination, initialState.getClusterName(), + destination.getVersion())); + } else { + super.onSendRequest(requestId, action, request, destination); + } + } + }; + TransportService transportService = capturingTransport.createTransportService(Settings.EMPTY, threadPool, + TransportService.NOOP_TRANSPORT_INTERCEPTOR, + x -> initialState.nodes().getLocalNode(), + new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), Collections.emptySet()); coordinator = new Coordinator(Settings.EMPTY, transportService, ESAllocationTestCase.createAllocationService(Settings.EMPTY), masterService, () -> new CoordinationStateTests.InMemoryPersistedState(term, initialState), r -> emptyList(), random); - verify(transportService).registerRequestHandler(eq(JoinHelper.JOIN_ACTION_NAME), eq(ThreadPool.Names.GENERIC), eq(false), eq(false), - anyObject(), joinRequestHandler.capture()); - transportRequestHandler = joinRequestHandler.getValue(); + transportService.start(); + transportService.acceptIncomingRequests(); + transportRequestHandler = capturingTransport.getRequestHandler(JoinHelper.JOIN_ACTION_NAME); coordinator.start(); coordinator.startInitialJoin(); } @@ -202,7 +209,7 @@ public class NodeJoinTests extends ESTestCase { // clone the node before submitting to simulate an incoming join, which is guaranteed to have a new // disco node object serialized off the network try { - transportRequestHandler.messageReceived(joinRequest, new TransportChannel() { + transportRequestHandler.processMessageReceived(joinRequest, new TransportChannel() { @Override public String getProfileName() { return "dummy"; @@ -229,7 +236,7 @@ public class NodeJoinTests extends ESTestCase { logger.error(() -> new ParameterizedMessage("unexpected error for {}", future), e); future.markAsFailed(e); } - }, null); + }); } catch (Exception e) { logger.error(() -> new ParameterizedMessage("unexpected error for {}", future), e); future.markAsFailed(e); diff --git a/test/framework/src/main/java/org/elasticsearch/test/disruption/DisruptableMockTransport.java b/test/framework/src/main/java/org/elasticsearch/test/disruption/DisruptableMockTransport.java index c6139f4e733..b12596c8b38 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/disruption/DisruptableMockTransport.java +++ b/test/framework/src/main/java/org/elasticsearch/test/disruption/DisruptableMockTransport.java @@ -51,7 +51,7 @@ public abstract class DisruptableMockTransport extends MockTransport { protected abstract void handle(DiscoveryNode sender, DiscoveryNode destination, String action, Runnable doDelivery); - private void sendFromTo(DiscoveryNode sender, DiscoveryNode destination, String action, Runnable doDelivery) { + protected final void sendFromTo(DiscoveryNode sender, DiscoveryNode destination, String action, Runnable doDelivery) { handle(sender, destination, action, new Runnable() { @Override public void run() { @@ -74,10 +74,33 @@ public abstract class DisruptableMockTransport extends MockTransport { assert destination.equals(getLocalNode()) == false : "non-local message from " + getLocalNode() + " to itself"; - final String requestDescription = new ParameterizedMessage("[{}][{}] from {} to {}", - action, requestId, getLocalNode(), destination).getFormattedMessage(); + sendFromTo(getLocalNode(), destination, action, new Runnable() { + @Override + public void run() { + switch (getConnectionStatus(getLocalNode(), destination)) { + case BLACK_HOLE: + onBlackholedDuringSend(requestId, action, destination); + break; - final Runnable returnConnectException = new Runnable() { + case DISCONNECTED: + onDisconnectedDuringSend(requestId, action, destination); + break; + + case CONNECTED: + onConnectedDuringSend(requestId, action, request, destination); + break; + } + } + + @Override + public String toString() { + return getRequestDescription(requestId, action, destination); + } + }); + } + + protected Runnable getDisconnectException(long requestId, String action, DiscoveryNode destination) { + return new Runnable() { @Override public void run() { handleError(requestId, new ConnectTransportException(destination, "disconnected")); @@ -85,111 +108,107 @@ public abstract class DisruptableMockTransport extends MockTransport { @Override public String toString() { - return "disconnection response to " + requestDescription; + return "disconnection response to " + getRequestDescription(requestId, action, destination); + } + }; + } + + protected String getRequestDescription(long requestId, String action, DiscoveryNode destination) { + return new ParameterizedMessage("[{}][{}] from {} to {}", + action, requestId, getLocalNode(), destination).getFormattedMessage(); + } + + protected void onBlackholedDuringSend(long requestId, String action, DiscoveryNode destination) { + logger.trace("dropping {}", getRequestDescription(requestId, action, destination)); + } + + protected void onDisconnectedDuringSend(long requestId, String action, DiscoveryNode destination) { + sendFromTo(destination, getLocalNode(), action, getDisconnectException(requestId, action, destination)); + } + + protected void onConnectedDuringSend(long requestId, String action, TransportRequest request, DiscoveryNode destination) { + Optional destinationTransport = getDisruptedCapturingTransport(destination, action); + assert destinationTransport.isPresent(); + + final RequestHandlerRegistry requestHandler = + destinationTransport.get().getRequestHandler(action); + + final String requestDescription = getRequestDescription(requestId, action, destination); + + final TransportChannel transportChannel = new TransportChannel() { + @Override + public String getProfileName() { + return "default"; + } + + @Override + public String getChannelType() { + return "disruptable-mock-transport-channel"; + } + + @Override + public void sendResponse(final TransportResponse response) { + sendFromTo(destination, getLocalNode(), action, new Runnable() { + @Override + public void run() { + if (getConnectionStatus(destination, getLocalNode()) != ConnectionStatus.CONNECTED) { + logger.trace("dropping response to {}: channel is not CONNECTED", + requestDescription); + } else { + handleResponse(requestId, response); + } + } + + @Override + public String toString() { + return "response to " + requestDescription; + } + }); + } + + @Override + public void sendResponse(TransportResponse response, + TransportResponseOptions options) { + sendResponse(response); + } + + @Override + public void sendResponse(Exception exception) { + sendFromTo(destination, getLocalNode(), action, new Runnable() { + @Override + public void run() { + if (getConnectionStatus(destination, getLocalNode()) != ConnectionStatus.CONNECTED) { + logger.trace("dropping response to {}: channel is not CONNECTED", + requestDescription); + } else { + handleRemoteError(requestId, exception); + } + } + + @Override + public String toString() { + return "error response to " + requestDescription; + } + }); } }; - sendFromTo(getLocalNode(), destination, action, new Runnable() { - @Override - public void run() { - switch (getConnectionStatus(getLocalNode(), destination)) { - case BLACK_HOLE: - logger.trace("dropping {}", requestDescription); - break; + final TransportRequest copiedRequest; + try { + copiedRequest = copyWriteable(request, writeableRegistry(), requestHandler::newRequest); + } catch (IOException e) { + throw new AssertionError("exception de/serializing request", e); + } - case DISCONNECTED: - sendFromTo(destination, getLocalNode(), action, returnConnectException); - break; - - case CONNECTED: - Optional destinationTransport = getDisruptedCapturingTransport(destination, action); - assert destinationTransport.isPresent(); - - final RequestHandlerRegistry requestHandler = - destinationTransport.get().getRequestHandler(action); - - final TransportChannel transportChannel = new TransportChannel() { - @Override - public String getProfileName() { - return "default"; - } - - @Override - public String getChannelType() { - return "disruptable-mock-transport-channel"; - } - - @Override - public void sendResponse(final TransportResponse response) { - sendFromTo(destination, getLocalNode(), action, new Runnable() { - @Override - public void run() { - if (getConnectionStatus(destination, getLocalNode()) != ConnectionStatus.CONNECTED) { - logger.trace("dropping response to {}: channel is not CONNECTED", - requestDescription); - } else { - handleResponse(requestId, response); - } - } - - @Override - public String toString() { - return "response to " + requestDescription; - } - }); - } - - @Override - public void sendResponse(TransportResponse response, - TransportResponseOptions options) { - sendResponse(response); - } - - @Override - public void sendResponse(Exception exception) { - sendFromTo(destination, getLocalNode(), action, new Runnable() { - @Override - public void run() { - if (getConnectionStatus(destination, getLocalNode()) != ConnectionStatus.CONNECTED) { - logger.trace("dropping response to {}: channel is not CONNECTED", - requestDescription); - } else { - handleRemoteError(requestId, exception); - } - } - - @Override - public String toString() { - return "error response to " + requestDescription; - } - }); - } - }; - - final TransportRequest copiedRequest; - try { - copiedRequest = copyWriteable(request, writeableRegistry(), requestHandler::newRequest); - } catch (IOException e) { - throw new AssertionError("exception de/serializing request", e); - } - - try { - requestHandler.processMessageReceived(copiedRequest, transportChannel); - } catch (Exception e) { - try { - transportChannel.sendResponse(e); - } catch (Exception ee) { - logger.warn("failed to send failure", e); - } - } - } + try { + requestHandler.processMessageReceived(copiedRequest, transportChannel); + } catch (Exception e) { + try { + transportChannel.sendResponse(e); + } catch (Exception ee) { + logger.warn("failed to send failure", e); } - - @Override - public String toString() { - return requestDescription; - } - }); + } } private NamedWriteableRegistry writeableRegistry() {