Integrate LeaderChecker with Coordinator (#34049)

This change ensures that follower nodes periodically check that their leader is
healthy, and that they elect a new leader if not.
This commit is contained in:
David Turner 2018-09-26 12:18:13 +01:00 committed by GitHub
parent f886eebd99
commit d995fc85c6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 308 additions and 147 deletions

View File

@ -78,10 +78,13 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
private final UnicastConfiguredHostsResolver configuredHostsResolver;
private final TimeValue publishTimeout;
private final PublicationTransportHandler publicationHandler;
private final LeaderChecker leaderChecker;
@Nullable
private Releasable electionScheduler;
@Nullable
private Releasable prevotingRound;
@Nullable
private Releasable leaderCheckScheduler;
private AtomicLong maxTermSeen = new AtomicLong();
private Mode mode;
@ -108,10 +111,27 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
this.peerFinder = new CoordinatorPeerFinder(settings, transportService,
new HandshakingTransportAddressConnector(settings, transportService), configuredHostsResolver);
this.publicationHandler = new PublicationTransportHandler(transportService, this::handlePublishRequest, this::handleApplyCommit);
this.leaderChecker = new LeaderChecker(settings, transportService, getOnLeaderFailure());
masterService.setClusterStateSupplier(this::getStateForMasterService);
}
private Runnable getOnLeaderFailure() {
return new Runnable() {
@Override
public void run() {
synchronized (mutex) {
becomeCandidate("onLeaderFailure");
}
}
@Override
public String toString() {
return "notification of leader failure";
}
};
}
private void handleApplyCommit(ApplyCommitRequest applyCommitRequest) {
synchronized (mutex) {
logger.trace("handleApplyCommit: applying commit {}", applyCommitRequest);
@ -233,6 +253,12 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
joinAccumulator = joinHelper.new CandidateJoinAccumulator();
peerFinder.activate(coordinationState.get().getLastAcceptedState().nodes());
leaderChecker.setCurrentNodes(DiscoveryNodes.EMPTY_NODES);
if (leaderCheckScheduler != null) {
leaderCheckScheduler.close();
leaderCheckScheduler = null;
}
}
preVoteCollector.update(getPreVoteResponse(), null);
@ -251,16 +277,21 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
peerFinder.deactivate(getLocalNode());
closePrevotingAndElectionScheduler();
preVoteCollector.update(getPreVoteResponse(), getLocalNode());
assert leaderCheckScheduler == null : leaderCheckScheduler;
}
void becomeFollower(String method, DiscoveryNode leaderNode) {
assert Thread.holdsLock(mutex) : "Coordinator mutex not held";
logger.debug("{}: becoming FOLLOWER of [{}] (was {}, lastKnownLeader was [{}])", method, leaderNode, mode, lastKnownLeader);
final boolean restartLeaderChecker = (mode == Mode.FOLLOWER && Optional.of(leaderNode).equals(lastKnownLeader)) == false;
if (mode != Mode.FOLLOWER) {
mode = Mode.FOLLOWER;
joinAccumulator.close(mode);
joinAccumulator = new JoinHelper.FollowerJoinAccumulator();
leaderChecker.setCurrentNodes(DiscoveryNodes.EMPTY_NODES);
}
lastKnownLeader = Optional.of(leaderNode);
@ -268,6 +299,13 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
closePrevotingAndElectionScheduler();
cancelActivePublication();
preVoteCollector.update(getPreVoteResponse(), leaderNode);
if (restartLeaderChecker) {
if (leaderCheckScheduler != null) {
leaderCheckScheduler.close();
}
leaderCheckScheduler = leaderChecker.startLeaderChecker(leaderNode);
}
}
private PreVoteResponse getPreVoteResponse() {
@ -339,6 +377,7 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
assert getStateForMasterService().nodes().getMasterNodeId() != null
|| getStateForMasterService().term() != getCurrentTerm() :
getStateForMasterService();
assert leaderCheckScheduler == null : leaderCheckScheduler;
} else if (mode == Mode.FOLLOWER) {
assert coordinationState.get().electionWon() == false : getLocalNode() + " is FOLLOWER so electionWon() should be false";
assert lastKnownLeader.isPresent() && (lastKnownLeader.get().equals(getLocalNode()) == false);
@ -347,12 +386,16 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
assert electionScheduler == null : electionScheduler;
assert prevotingRound == null : prevotingRound;
assert getStateForMasterService().nodes().getMasterNodeId() == null : getStateForMasterService();
assert leaderChecker.currentNodeIsMaster() == false;
assert leaderCheckScheduler != null;
} else {
assert mode == Mode.CANDIDATE;
assert joinAccumulator instanceof JoinHelper.CandidateJoinAccumulator;
assert peerFinderLeader.isPresent() == false : peerFinderLeader;
assert prevotingRound == null || electionScheduler != null;
assert getStateForMasterService().nodes().getMasterNodeId() == null : getStateForMasterService();
assert leaderChecker.currentNodeIsMaster() == false;
assert leaderCheckScheduler == null : leaderCheckScheduler;
}
}
}
@ -577,6 +620,8 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
return "scheduled timeout for " + publication;
}
});
leaderChecker.setCurrentNodes(publishRequest.getAcceptedState().nodes());
publication.start(Collections.emptySet()); // TODO start failure detector and put faultyNodes here
}
} catch (Exception e) {

View File

@ -77,7 +77,7 @@ public class LeaderChecker extends AbstractComponent {
private final TransportService transportService;
private final Runnable onLeaderFailure;
private volatile DiscoveryNodes lastPublishedDiscoveryNodes;
private volatile DiscoveryNodes discoveryNodes;
public LeaderChecker(final Settings settings, final TransportService transportService, final Runnable onLeaderFailure) {
super(settings);
@ -111,19 +111,24 @@ public class LeaderChecker extends AbstractComponent {
* isLocalNodeElectedMaster() should reflect whether this node is a leader, and nodeExists()
* should indicate whether nodes are known publication targets or not.
*/
public void setLastPublishedDiscoveryNodes(DiscoveryNodes discoveryNodes) {
logger.trace("updating last-published nodes: {}", discoveryNodes);
lastPublishedDiscoveryNodes = discoveryNodes;
public void setCurrentNodes(DiscoveryNodes discoveryNodes) {
logger.trace("setCurrentNodes: {}", discoveryNodes);
this.discoveryNodes = discoveryNodes;
}
// For assertions
boolean currentNodeIsMaster() {
return discoveryNodes.isLocalNodeElectedMaster();
}
private void handleLeaderCheck(LeaderCheckRequest request, TransportChannel transportChannel, Task task) throws IOException {
final DiscoveryNodes lastPublishedDiscoveryNodes = this.lastPublishedDiscoveryNodes;
assert lastPublishedDiscoveryNodes != null;
final DiscoveryNodes discoveryNodes = this.discoveryNodes;
assert discoveryNodes != null;
if (lastPublishedDiscoveryNodes.isLocalNodeElectedMaster() == false) {
if (discoveryNodes.isLocalNodeElectedMaster() == false) {
logger.debug("non-master handling {}", request);
transportChannel.sendResponse(new CoordinationStateRejectedException("non-leader rejecting leader check"));
} else if (lastPublishedDiscoveryNodes.nodeExists(request.getSender()) == false) {
} else if (discoveryNodes.nodeExists(request.getSender()) == false) {
logger.debug("leader check from unknown node: {}", request);
transportChannel.sendResponse(new CoordinationStateRejectedException("leader check from unknown node"));
} else {

View File

@ -1105,6 +1105,9 @@ public class TransportReplicationActionTests extends ESTestCase {
}
static class TestResponse extends ReplicationResponse {
TestResponse() {
setShardInfo(new ShardInfo());
}
}
private class TestAction extends TransportReplicationAction<Request, Request, TestResponse> {

View File

@ -41,6 +41,7 @@ import org.elasticsearch.discovery.zen.UnicastHostsProvider.HostsResolver;
import org.elasticsearch.indices.cluster.FakeThreadPoolMasterService;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.disruption.DisruptableMockTransport;
import org.elasticsearch.test.disruption.DisruptableMockTransport.ConnectionStatus;
import org.elasticsearch.test.junit.annotations.TestLogging;
import org.elasticsearch.transport.TransportService;
import org.hamcrest.Matcher;
@ -59,7 +60,11 @@ import static java.util.Collections.emptySet;
import static org.elasticsearch.cluster.coordination.CoordinationStateTests.clusterState;
import static org.elasticsearch.cluster.coordination.CoordinationStateTests.setValue;
import static org.elasticsearch.cluster.coordination.CoordinationStateTests.value;
import static org.elasticsearch.cluster.coordination.Coordinator.Mode.CANDIDATE;
import static org.elasticsearch.cluster.coordination.Coordinator.Mode.FOLLOWER;
import static org.elasticsearch.cluster.coordination.LeaderChecker.LEADER_CHECK_INTERVAL_SETTING;
import static org.elasticsearch.cluster.coordination.LeaderChecker.LEADER_CHECK_RETRY_COUNT_SETTING;
import static org.elasticsearch.cluster.coordination.LeaderChecker.LEADER_CHECK_TIMEOUT_SETTING;
import static org.elasticsearch.node.Node.NODE_NAME_SETTING;
import static org.elasticsearch.transport.TransportService.HANDSHAKE_ACTION_NAME;
import static org.elasticsearch.transport.TransportService.NOOP_TRANSPORT_INTERCEPTOR;
@ -68,7 +73,7 @@ import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.not;
@TestLogging("org.elasticsearch.cluster.coordination:TRACE,org.elasticsearch.cluster.discovery:TRACE")
@TestLogging("org.elasticsearch.cluster.coordination:TRACE,org.elasticsearch.discovery:TRACE")
public class CoordinatorTests extends ESTestCase {
public void testCanUpdateClusterStateAfterStabilisation() {
@ -101,6 +106,40 @@ public class CoordinatorTests extends ESTestCase {
assertEquals(currentTerm, newTerm);
}
public void testLeaderDisconnectionDetectedQuickly() {
final Cluster cluster = new Cluster(randomIntBetween(3, 5));
cluster.stabilise();
final ClusterNode originalLeader = cluster.getAnyLeader();
logger.info("--> disconnecting leader {}", originalLeader);
originalLeader.disconnect();
synchronized (originalLeader.coordinator.mutex) {
originalLeader.coordinator.becomeCandidate("simulated failure detection"); // TODO remove once follower checker is integrated
}
cluster.stabilise();
assertThat(cluster.getAnyLeader().getId(), not(equalTo(originalLeader.getId())));
}
public void testUnresponsiveLeaderDetectedEventually() {
final Cluster cluster = new Cluster(randomIntBetween(3, 5));
cluster.stabilise();
final ClusterNode originalLeader = cluster.getAnyLeader();
logger.info("--> partitioning leader {}", originalLeader);
originalLeader.partition();
synchronized (originalLeader.coordinator.mutex) {
originalLeader.coordinator.becomeCandidate("simulated failure detection"); // TODO remove once follower checker is integrated
}
cluster.stabilise(Cluster.DEFAULT_STABILISATION_TIME
+ (LEADER_CHECK_INTERVAL_SETTING.get(Settings.EMPTY).millis() + LEADER_CHECK_TIMEOUT_SETTING.get(Settings.EMPTY).millis())
* LEADER_CHECK_RETRY_COUNT_SETTING.get(Settings.EMPTY));
assertThat(cluster.getAnyLeader().getId(), not(equalTo(originalLeader.getId())));
}
private static String nodeIdFromIndex(int nodeIndex) {
return "node" + nodeIndex;
}
@ -115,6 +154,9 @@ public class CoordinatorTests extends ESTestCase {
Settings.builder().put(NODE_NAME_SETTING.getKey(), "deterministic-task-queue").build());
private final VotingConfiguration initialConfiguration;
private final Set<String> disconnectedNodes = new HashSet<>();
private final Set<String> blackholedNodes = new HashSet<>();
Cluster(int initialNodeCount) {
logger.info("--> creating cluster of {} nodes", initialNodeCount);
@ -142,8 +184,12 @@ public class CoordinatorTests extends ESTestCase {
}
void stabilise() {
stabilise(DEFAULT_STABILISATION_TIME);
}
void stabilise(long stabilisationTime) {
final long stabilisationStartTime = deterministicTaskQueue.getCurrentTimeMillis();
while (deterministicTaskQueue.getCurrentTimeMillis() < stabilisationStartTime + DEFAULT_STABILISATION_TIME) {
while (deterministicTaskQueue.getCurrentTimeMillis() < stabilisationStartTime + stabilisationTime) {
while (deterministicTaskQueue.hasRunnableTasks()) {
try {
@ -182,16 +228,21 @@ public class CoordinatorTests extends ESTestCase {
}
final String nodeId = clusterNode.getId();
assertThat(nodeId + " has the same term as the leader", clusterNode.coordinator.getCurrentTerm(), is(leaderTerm));
// TODO assert that all nodes have actually voted for the leader in this term
assertThat(nodeId + " is a follower", clusterNode.coordinator.getMode(), is(FOLLOWER));
assertThat(nodeId + " is at the same accepted version as the leader",
Optional.of(clusterNode.coordinator.getLastAcceptedState().getVersion()), isPresentAndEqualToLeaderVersion);
assertThat(nodeId + " is at the same committed version as the leader",
clusterNode.coordinator.getLastCommittedState().map(ClusterState::getVersion), isPresentAndEqualToLeaderVersion);
assertThat(clusterNode.coordinator.getLastCommittedState().map(ClusterState::getNodes).map(dn -> dn.nodeExists(nodeId)),
equalTo(Optional.of(true)));
if (disconnectedNodes.contains(nodeId) || blackholedNodes.contains(nodeId)) {
assertThat(nodeId + " is a candidate", clusterNode.coordinator.getMode(), is(CANDIDATE));
} else {
assertThat(nodeId + " has the same term as the leader", clusterNode.coordinator.getCurrentTerm(), is(leaderTerm));
// TODO assert that all nodes have actually voted for the leader in this term
assertThat(nodeId + " is a follower", clusterNode.coordinator.getMode(), is(FOLLOWER));
assertThat(nodeId + " is at the same accepted version as the leader",
Optional.of(clusterNode.coordinator.getLastAcceptedState().getVersion()), isPresentAndEqualToLeaderVersion);
assertThat(nodeId + " is at the same committed version as the leader",
clusterNode.coordinator.getLastCommittedState().map(ClusterState::getVersion), isPresentAndEqualToLeaderVersion);
assertThat(clusterNode.coordinator.getLastCommittedState().map(ClusterState::getNodes).map(dn -> dn.nodeExists(nodeId)),
equalTo(Optional.of(true)));
}
}
assertThat(leader.coordinator.getLastCommittedState().map(ClusterState::getNodes).map(DiscoveryNodes::getSize),
@ -204,6 +255,18 @@ public class CoordinatorTests extends ESTestCase {
return randomFrom(allLeaders);
}
private ConnectionStatus getConnectionStatus(DiscoveryNode sender, DiscoveryNode destination) {
ConnectionStatus connectionStatus;
if (blackholedNodes.contains(sender.getId()) || blackholedNodes.contains(destination.getId())) {
connectionStatus = ConnectionStatus.BLACK_HOLE;
} else if (disconnectedNodes.contains(sender.getId()) || disconnectedNodes.contains(destination.getId())) {
connectionStatus = ConnectionStatus.DISCONNECTED;
} else {
connectionStatus = ConnectionStatus.CONNECTED;
}
return connectionStatus;
}
class ClusterNode extends AbstractComponent {
private final int nodeIndex;
private Coordinator coordinator;
@ -241,7 +304,7 @@ public class CoordinatorTests extends ESTestCase {
@Override
protected ConnectionStatus getConnectionStatus(DiscoveryNode sender, DiscoveryNode destination) {
return ConnectionStatus.CONNECTED;
return Cluster.this.getConnectionStatus(sender, destination);
}
@Override
@ -264,6 +327,17 @@ public class CoordinatorTests extends ESTestCase {
deterministicTaskQueue.scheduleNow(onNode(destination, doDelivery));
}
}
@Override
protected void onBlackholedDuringSend(long requestId, String action, DiscoveryNode destination) {
if (action.equals(HANDSHAKE_ACTION_NAME)) {
logger.trace("ignoring blackhole and delivering {}", getRequestDescription(requestId, action, destination));
// handshakes always have a timeout, and are sent in a blocking fashion, so we must respond with an exception.
sendFromTo(destination, getLocalNode(), action, getDisconnectException(requestId, action, destination));
} else {
super.onBlackholedDuringSend(requestId, action, destination);
}
}
};
masterService = new FakeThreadPoolMasterService("test",
@ -290,7 +364,7 @@ public class CoordinatorTests extends ESTestCase {
return localNode.getId();
}
public DiscoveryNode getLocalNode() {
DiscoveryNode getLocalNode() {
return localNode;
}
@ -316,6 +390,14 @@ public class CoordinatorTests extends ESTestCase {
public String toString() {
return localNode.toString();
}
void disconnect() {
disconnectedNodes.add(localNode.getId());
}
void partition() {
blackholedNodes.add(localNode.getId());
}
}
private List<TransportAddress> provideUnicastHosts(HostsResolver ignored) {

View File

@ -294,7 +294,7 @@ public class LeaderCheckerTests extends ESTestCase {
= DiscoveryNodes.builder().add(localNode).localNodeId(localNode.getId()).masterNodeId(localNode.getId()).build();
{
leaderChecker.setLastPublishedDiscoveryNodes(discoveryNodes);
leaderChecker.setCurrentNodes(discoveryNodes);
final CapturingTransportResponseHandler handler = new CapturingTransportResponseHandler();
transportService.sendRequest(localNode, LEADER_CHECK_ACTION_NAME, new LeaderCheckRequest(otherNode), handler);
@ -307,7 +307,7 @@ public class LeaderCheckerTests extends ESTestCase {
}
{
leaderChecker.setLastPublishedDiscoveryNodes(DiscoveryNodes.builder(discoveryNodes).add(otherNode).build());
leaderChecker.setCurrentNodes(DiscoveryNodes.builder(discoveryNodes).add(otherNode).build());
final CapturingTransportResponseHandler handler = new CapturingTransportResponseHandler();
transportService.sendRequest(localNode, LEADER_CHECK_ACTION_NAME, new LeaderCheckRequest(otherNode), handler);
@ -318,7 +318,7 @@ public class LeaderCheckerTests extends ESTestCase {
}
{
leaderChecker.setLastPublishedDiscoveryNodes(DiscoveryNodes.builder(discoveryNodes).add(otherNode).masterNodeId(null).build());
leaderChecker.setCurrentNodes(DiscoveryNodes.builder(discoveryNodes).add(otherNode).masterNodeId(null).build());
final CapturingTransportResponseHandler handler = new CapturingTransportResponseHandler();
transportService.sendRequest(localNode, LEADER_CHECK_ACTION_NAME, new LeaderCheckRequest(otherNode), handler);

View File

@ -30,6 +30,7 @@ import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.service.MasterService;
import org.elasticsearch.cluster.service.MasterServiceTests;
import org.elasticsearch.common.Randomness;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.BaseFuture;
import org.elasticsearch.common.util.concurrent.FutureUtils;
@ -38,17 +39,18 @@ import org.elasticsearch.node.Node;
import org.elasticsearch.test.ClusterServiceUtils;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.junit.annotations.TestLogging;
import org.elasticsearch.test.transport.CapturingTransport;
import org.elasticsearch.threadpool.TestThreadPool;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.RequestHandlerRegistry;
import org.elasticsearch.transport.TransportChannel;
import org.elasticsearch.transport.TransportRequestHandler;
import org.elasticsearch.transport.TransportRequest;
import org.elasticsearch.transport.TransportResponse;
import org.elasticsearch.transport.TransportResponseOptions;
import org.elasticsearch.transport.TransportService;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.mockito.ArgumentCaptor;
import java.util.ArrayList;
import java.util.Collections;
@ -66,12 +68,8 @@ import java.util.stream.IntStream;
import static java.util.Collections.emptyList;
import static java.util.Collections.emptyMap;
import static org.elasticsearch.transport.TransportService.HANDSHAKE_ACTION_NAME;
import static org.hamcrest.Matchers.containsString;
import static org.mockito.Matchers.anyObject;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@TestLogging("org.elasticsearch.cluster.service:TRACE,org.elasticsearch.cluster.coordination:TRACE")
public class NodeJoinTests extends ESTestCase {
@ -81,7 +79,7 @@ public class NodeJoinTests extends ESTestCase {
private MasterService masterService;
private Coordinator coordinator;
private DeterministicTaskQueue deterministicTaskQueue;
private TransportRequestHandler<JoinRequest> transportRequestHandler;
private RequestHandlerRegistry<TransportRequest> transportRequestHandler;
@BeforeClass
public static void beforeClass() {
@ -144,20 +142,29 @@ public class NodeJoinTests extends ESTestCase {
throw new IllegalStateException("method setupMasterServiceAndCoordinator can only be called once");
}
this.masterService = masterService;
TransportService transportService = mock(TransportService.class);
when(transportService.getLocalNode()).thenReturn(initialState.nodes().getLocalNode());
when(transportService.getThreadPool()).thenReturn(threadPool);
@SuppressWarnings("unchecked")
ArgumentCaptor<TransportRequestHandler<JoinRequest>> joinRequestHandler = ArgumentCaptor.forClass(
(Class) TransportRequestHandler.class);
CapturingTransport capturingTransport = new CapturingTransport() {
@Override
protected void onSendRequest(long requestId, String action, TransportRequest request, DiscoveryNode destination) {
if (action.equals(HANDSHAKE_ACTION_NAME)) {
handleResponse(requestId, new TransportService.HandshakeResponse(destination, initialState.getClusterName(),
destination.getVersion()));
} else {
super.onSendRequest(requestId, action, request, destination);
}
}
};
TransportService transportService = capturingTransport.createTransportService(Settings.EMPTY, threadPool,
TransportService.NOOP_TRANSPORT_INTERCEPTOR,
x -> initialState.nodes().getLocalNode(),
new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), Collections.emptySet());
coordinator = new Coordinator(Settings.EMPTY,
transportService,
ESAllocationTestCase.createAllocationService(Settings.EMPTY),
masterService,
() -> new CoordinationStateTests.InMemoryPersistedState(term, initialState), r -> emptyList(), random);
verify(transportService).registerRequestHandler(eq(JoinHelper.JOIN_ACTION_NAME), eq(ThreadPool.Names.GENERIC), eq(false), eq(false),
anyObject(), joinRequestHandler.capture());
transportRequestHandler = joinRequestHandler.getValue();
transportService.start();
transportService.acceptIncomingRequests();
transportRequestHandler = capturingTransport.getRequestHandler(JoinHelper.JOIN_ACTION_NAME);
coordinator.start();
coordinator.startInitialJoin();
}
@ -202,7 +209,7 @@ public class NodeJoinTests extends ESTestCase {
// clone the node before submitting to simulate an incoming join, which is guaranteed to have a new
// disco node object serialized off the network
try {
transportRequestHandler.messageReceived(joinRequest, new TransportChannel() {
transportRequestHandler.processMessageReceived(joinRequest, new TransportChannel() {
@Override
public String getProfileName() {
return "dummy";
@ -229,7 +236,7 @@ public class NodeJoinTests extends ESTestCase {
logger.error(() -> new ParameterizedMessage("unexpected error for {}", future), e);
future.markAsFailed(e);
}
}, null);
});
} catch (Exception e) {
logger.error(() -> new ParameterizedMessage("unexpected error for {}", future), e);
future.markAsFailed(e);

View File

@ -51,7 +51,7 @@ public abstract class DisruptableMockTransport extends MockTransport {
protected abstract void handle(DiscoveryNode sender, DiscoveryNode destination, String action, Runnable doDelivery);
private void sendFromTo(DiscoveryNode sender, DiscoveryNode destination, String action, Runnable doDelivery) {
protected final void sendFromTo(DiscoveryNode sender, DiscoveryNode destination, String action, Runnable doDelivery) {
handle(sender, destination, action, new Runnable() {
@Override
public void run() {
@ -74,10 +74,33 @@ public abstract class DisruptableMockTransport extends MockTransport {
assert destination.equals(getLocalNode()) == false : "non-local message from " + getLocalNode() + " to itself";
final String requestDescription = new ParameterizedMessage("[{}][{}] from {} to {}",
action, requestId, getLocalNode(), destination).getFormattedMessage();
sendFromTo(getLocalNode(), destination, action, new Runnable() {
@Override
public void run() {
switch (getConnectionStatus(getLocalNode(), destination)) {
case BLACK_HOLE:
onBlackholedDuringSend(requestId, action, destination);
break;
final Runnable returnConnectException = new Runnable() {
case DISCONNECTED:
onDisconnectedDuringSend(requestId, action, destination);
break;
case CONNECTED:
onConnectedDuringSend(requestId, action, request, destination);
break;
}
}
@Override
public String toString() {
return getRequestDescription(requestId, action, destination);
}
});
}
protected Runnable getDisconnectException(long requestId, String action, DiscoveryNode destination) {
return new Runnable() {
@Override
public void run() {
handleError(requestId, new ConnectTransportException(destination, "disconnected"));
@ -85,111 +108,107 @@ public abstract class DisruptableMockTransport extends MockTransport {
@Override
public String toString() {
return "disconnection response to " + requestDescription;
return "disconnection response to " + getRequestDescription(requestId, action, destination);
}
};
}
protected String getRequestDescription(long requestId, String action, DiscoveryNode destination) {
return new ParameterizedMessage("[{}][{}] from {} to {}",
action, requestId, getLocalNode(), destination).getFormattedMessage();
}
protected void onBlackholedDuringSend(long requestId, String action, DiscoveryNode destination) {
logger.trace("dropping {}", getRequestDescription(requestId, action, destination));
}
protected void onDisconnectedDuringSend(long requestId, String action, DiscoveryNode destination) {
sendFromTo(destination, getLocalNode(), action, getDisconnectException(requestId, action, destination));
}
protected void onConnectedDuringSend(long requestId, String action, TransportRequest request, DiscoveryNode destination) {
Optional<DisruptableMockTransport> destinationTransport = getDisruptedCapturingTransport(destination, action);
assert destinationTransport.isPresent();
final RequestHandlerRegistry<TransportRequest> requestHandler =
destinationTransport.get().getRequestHandler(action);
final String requestDescription = getRequestDescription(requestId, action, destination);
final TransportChannel transportChannel = new TransportChannel() {
@Override
public String getProfileName() {
return "default";
}
@Override
public String getChannelType() {
return "disruptable-mock-transport-channel";
}
@Override
public void sendResponse(final TransportResponse response) {
sendFromTo(destination, getLocalNode(), action, new Runnable() {
@Override
public void run() {
if (getConnectionStatus(destination, getLocalNode()) != ConnectionStatus.CONNECTED) {
logger.trace("dropping response to {}: channel is not CONNECTED",
requestDescription);
} else {
handleResponse(requestId, response);
}
}
@Override
public String toString() {
return "response to " + requestDescription;
}
});
}
@Override
public void sendResponse(TransportResponse response,
TransportResponseOptions options) {
sendResponse(response);
}
@Override
public void sendResponse(Exception exception) {
sendFromTo(destination, getLocalNode(), action, new Runnable() {
@Override
public void run() {
if (getConnectionStatus(destination, getLocalNode()) != ConnectionStatus.CONNECTED) {
logger.trace("dropping response to {}: channel is not CONNECTED",
requestDescription);
} else {
handleRemoteError(requestId, exception);
}
}
@Override
public String toString() {
return "error response to " + requestDescription;
}
});
}
};
sendFromTo(getLocalNode(), destination, action, new Runnable() {
@Override
public void run() {
switch (getConnectionStatus(getLocalNode(), destination)) {
case BLACK_HOLE:
logger.trace("dropping {}", requestDescription);
break;
final TransportRequest copiedRequest;
try {
copiedRequest = copyWriteable(request, writeableRegistry(), requestHandler::newRequest);
} catch (IOException e) {
throw new AssertionError("exception de/serializing request", e);
}
case DISCONNECTED:
sendFromTo(destination, getLocalNode(), action, returnConnectException);
break;
case CONNECTED:
Optional<DisruptableMockTransport> destinationTransport = getDisruptedCapturingTransport(destination, action);
assert destinationTransport.isPresent();
final RequestHandlerRegistry<TransportRequest> requestHandler =
destinationTransport.get().getRequestHandler(action);
final TransportChannel transportChannel = new TransportChannel() {
@Override
public String getProfileName() {
return "default";
}
@Override
public String getChannelType() {
return "disruptable-mock-transport-channel";
}
@Override
public void sendResponse(final TransportResponse response) {
sendFromTo(destination, getLocalNode(), action, new Runnable() {
@Override
public void run() {
if (getConnectionStatus(destination, getLocalNode()) != ConnectionStatus.CONNECTED) {
logger.trace("dropping response to {}: channel is not CONNECTED",
requestDescription);
} else {
handleResponse(requestId, response);
}
}
@Override
public String toString() {
return "response to " + requestDescription;
}
});
}
@Override
public void sendResponse(TransportResponse response,
TransportResponseOptions options) {
sendResponse(response);
}
@Override
public void sendResponse(Exception exception) {
sendFromTo(destination, getLocalNode(), action, new Runnable() {
@Override
public void run() {
if (getConnectionStatus(destination, getLocalNode()) != ConnectionStatus.CONNECTED) {
logger.trace("dropping response to {}: channel is not CONNECTED",
requestDescription);
} else {
handleRemoteError(requestId, exception);
}
}
@Override
public String toString() {
return "error response to " + requestDescription;
}
});
}
};
final TransportRequest copiedRequest;
try {
copiedRequest = copyWriteable(request, writeableRegistry(), requestHandler::newRequest);
} catch (IOException e) {
throw new AssertionError("exception de/serializing request", e);
}
try {
requestHandler.processMessageReceived(copiedRequest, transportChannel);
} catch (Exception e) {
try {
transportChannel.sendResponse(e);
} catch (Exception ee) {
logger.warn("failed to send failure", e);
}
}
}
try {
requestHandler.processMessageReceived(copiedRequest, transportChannel);
} catch (Exception e) {
try {
transportChannel.sendResponse(e);
} catch (Exception ee) {
logger.warn("failed to send failure", e);
}
@Override
public String toString() {
return requestDescription;
}
});
}
}
private NamedWriteableRegistry writeableRegistry() {