Drop node if asymmetrically partitioned from master (#39598)

When a node is joining the cluster we ensure that it can send requests to the
master _at that time_. If it joins the cluster and _then_ loses the ability to
send requests to the master then it should be removed from the cluster. Today
this is not the case: the master can still receive responses to its follower
checks, and receives acknowledgements to cluster state publications, so has no
reason to remove the node.

This commit changes the handling of follower checks so that they fail if they
come from a master that the other node was following but which it now believes
to have failed.
This commit is contained in:
David Turner 2019-03-06 09:23:41 +00:00
parent 77dd711847
commit 295e39a8c8
6 changed files with 191 additions and 38 deletions

View File

@ -229,6 +229,14 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
// where we would possibly have to remove the NO_MASTER_BLOCK from the applierState when turning a candidate back to follower. // where we would possibly have to remove the NO_MASTER_BLOCK from the applierState when turning a candidate back to follower.
if (getLastAcceptedState().term() < getCurrentTerm()) { if (getLastAcceptedState().term() < getCurrentTerm()) {
becomeFollower("onFollowerCheckRequest", followerCheckRequest.getSender()); becomeFollower("onFollowerCheckRequest", followerCheckRequest.getSender());
} else if (mode == Mode.FOLLOWER) {
logger.trace("onFollowerCheckRequest: responding successfully to {}", followerCheckRequest);
} else if (joinHelper.isJoinPending()) {
logger.trace("onFollowerCheckRequest: rejoining master, responding successfully to {}", followerCheckRequest);
} else {
logger.trace("onFollowerCheckRequest: received check from faulty master, rejecting {}", followerCheckRequest);
throw new CoordinationStateRejectedException(
"onFollowerCheckRequest: received check from faulty master, rejecting " + followerCheckRequest);
} }
} }
} }

View File

@ -189,6 +189,11 @@ public class JoinHelper {
}; };
} }
boolean isJoinPending() {
// cannot use pendingOutgoingJoins.isEmpty() because it's not properly synchronized.
return pendingOutgoingJoins.iterator().hasNext();
}
public void sendJoinRequest(DiscoveryNode destination, Optional<Join> optionalJoin) { public void sendJoinRequest(DiscoveryNode destination, Optional<Join> optionalJoin) {
sendJoinRequest(destination, optionalJoin, () -> { sendJoinRequest(destination, optionalJoin, () -> {
}); });

View File

@ -919,8 +919,9 @@ public class CoordinatorTests extends ESTestCase {
nonLeader.coordinator.becomeCandidate("forced"); nonLeader.coordinator.becomeCandidate("forced");
} }
logger.debug("simulate follower check coming through from {} to {}", leader.getId(), nonLeader.getId()); logger.debug("simulate follower check coming through from {} to {}", leader.getId(), nonLeader.getId());
nonLeader.coordinator.onFollowerCheckRequest(new FollowersChecker.FollowerCheckRequest(leader.coordinator.getCurrentTerm(), expectThrows(CoordinationStateRejectedException.class, () -> nonLeader.coordinator.onFollowerCheckRequest(
leader.getLocalNode())); new FollowersChecker.FollowerCheckRequest(leader.coordinator.getCurrentTerm(), leader.getLocalNode())));
assertThat(nonLeader.coordinator.getMode(), equalTo(CANDIDATE));
}).run(); }).run();
cluster.stabilise(); cluster.stabilise();
} }
@ -1081,6 +1082,38 @@ public class CoordinatorTests extends ESTestCase {
cluster.stabilise(); cluster.stabilise();
} }
public void testFollowerRemovedIfUnableToSendRequestsToMaster() {
final Cluster cluster = new Cluster(3);
cluster.runRandomly();
cluster.stabilise();
final ClusterNode leader = cluster.getAnyLeader();
final ClusterNode otherNode = cluster.getAnyNodeExcept(leader);
cluster.blackholeConnectionsFrom(otherNode, leader);
cluster.runFor(
(defaultMillis(FOLLOWER_CHECK_INTERVAL_SETTING) + defaultMillis(FOLLOWER_CHECK_TIMEOUT_SETTING))
* defaultInt(FOLLOWER_CHECK_RETRY_COUNT_SETTING)
+ (defaultMillis(LEADER_CHECK_INTERVAL_SETTING) + DEFAULT_DELAY_VARIABILITY)
* defaultInt(LEADER_CHECK_RETRY_COUNT_SETTING)
+ DEFAULT_CLUSTER_STATE_UPDATE_DELAY,
"awaiting removal of asymmetrically-partitioned node");
assertThat(leader.getLastAppliedClusterState().nodes().toString(),
leader.getLastAppliedClusterState().nodes().getSize(), equalTo(2));
cluster.clearBlackholedConnections();
cluster.stabilise(
// time for the disconnected node to find the master again
defaultMillis(DISCOVERY_FIND_PEERS_INTERVAL_SETTING) * 2
// time for joining
+ 4 * DEFAULT_DELAY_VARIABILITY
// Then a commit of the updated cluster state
+ DEFAULT_CLUSTER_STATE_UPDATE_DELAY);
}
private static long defaultMillis(Setting<TimeValue> setting) { private static long defaultMillis(Setting<TimeValue> setting) {
return setting.get(Settings.EMPTY).millis() + Cluster.DEFAULT_DELAY_VARIABILITY; return setting.get(Settings.EMPTY).millis() + Cluster.DEFAULT_DELAY_VARIABILITY;
} }
@ -1143,6 +1176,7 @@ public class CoordinatorTests extends ESTestCase {
private final Set<String> disconnectedNodes = new HashSet<>(); private final Set<String> disconnectedNodes = new HashSet<>();
private final Set<String> blackholedNodes = new HashSet<>(); private final Set<String> blackholedNodes = new HashSet<>();
private final Set<Tuple<String,String>> blackholedConnections = new HashSet<>();
private final Map<Long, ClusterState> committedStatesByVersion = new HashMap<>(); private final Map<Long, ClusterState> committedStatesByVersion = new HashMap<>();
private final LinearizabilityChecker linearizabilityChecker = new LinearizabilityChecker(); private final LinearizabilityChecker linearizabilityChecker = new LinearizabilityChecker();
private final History history = new History(); private final History history = new History();
@ -1510,6 +1544,8 @@ public class CoordinatorTests extends ESTestCase {
connectionStatus = ConnectionStatus.BLACK_HOLE; connectionStatus = ConnectionStatus.BLACK_HOLE;
} else if (disconnectedNodes.contains(sender.getId()) || disconnectedNodes.contains(destination.getId())) { } else if (disconnectedNodes.contains(sender.getId()) || disconnectedNodes.contains(destination.getId())) {
connectionStatus = ConnectionStatus.DISCONNECTED; connectionStatus = ConnectionStatus.DISCONNECTED;
} else if (blackholedConnections.contains(Tuple.tuple(sender.getId(), destination.getId()))) {
connectionStatus = ConnectionStatus.BLACK_HOLE_REQUESTS_ONLY;
} else if (nodeExists(sender) && nodeExists(destination)) { } else if (nodeExists(sender) && nodeExists(destination)) {
connectionStatus = ConnectionStatus.CONNECTED; connectionStatus = ConnectionStatus.CONNECTED;
} else { } else {
@ -1560,6 +1596,14 @@ public class CoordinatorTests extends ESTestCase {
seedHostsList = emptyList(); seedHostsList = emptyList();
} }
void blackholeConnectionsFrom(ClusterNode sender, ClusterNode destination) {
blackholedConnections.add(Tuple.tuple(sender.getId(), destination.getId()));
}
void clearBlackholedConnections() {
blackholedConnections.clear();
}
class MockPersistedState implements PersistedState { class MockPersistedState implements PersistedState {
private final PersistedState delegate; private final PersistedState delegate;
private final NodeEnvironment nodeEnvironment; private final NodeEnvironment nodeEnvironment;

View File

@ -51,6 +51,8 @@ public class JoinHelperTests extends ESTestCase {
DiscoveryNode node1 = new DiscoveryNode("node1", buildNewFakeTransportAddress(), Version.CURRENT); DiscoveryNode node1 = new DiscoveryNode("node1", buildNewFakeTransportAddress(), Version.CURRENT);
DiscoveryNode node2 = new DiscoveryNode("node2", buildNewFakeTransportAddress(), Version.CURRENT); DiscoveryNode node2 = new DiscoveryNode("node2", buildNewFakeTransportAddress(), Version.CURRENT);
assertFalse(joinHelper.isJoinPending());
// check that sending a join to node1 works // check that sending a join to node1 works
Optional<Join> optionalJoin1 = randomBoolean() ? Optional.empty() : Optional<Join> optionalJoin1 = randomBoolean() ? Optional.empty() :
Optional.of(new Join(localNode, node1, randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong())); Optional.of(new Join(localNode, node1, randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong()));
@ -60,6 +62,8 @@ public class JoinHelperTests extends ESTestCase {
CapturedRequest capturedRequest1 = capturedRequests1[0]; CapturedRequest capturedRequest1 = capturedRequests1[0];
assertEquals(node1, capturedRequest1.node); assertEquals(node1, capturedRequest1.node);
assertTrue(joinHelper.isJoinPending());
// check that sending a join to node2 works // check that sending a join to node2 works
Optional<Join> optionalJoin2 = randomBoolean() ? Optional.empty() : Optional<Join> optionalJoin2 = randomBoolean() ? Optional.empty() :
Optional.of(new Join(localNode, node2, randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong())); Optional.of(new Join(localNode, node2, randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong()));
@ -95,5 +99,12 @@ public class JoinHelperTests extends ESTestCase {
assertThat(capturedRequests2a.length, equalTo(1)); assertThat(capturedRequests2a.length, equalTo(1));
CapturedRequest capturedRequest2a = capturedRequests2a[0]; CapturedRequest capturedRequest2a = capturedRequests2a[0];
assertEquals(node2, capturedRequest2a.node); assertEquals(node2, capturedRequest2a.node);
// complete all the joins and check that isJoinPending is updated
assertTrue(joinHelper.isJoinPending());
capturingTransport.handleRemoteError(capturedRequest2.requestId, new CoordinationStateRejectedException("dummy"));
capturingTransport.handleRemoteError(capturedRequest1a.requestId, new CoordinationStateRejectedException("dummy"));
capturingTransport.handleRemoteError(capturedRequest2a.requestId, new CoordinationStateRejectedException("dummy"));
assertFalse(joinHelper.isJoinPending());
} }
} }

View File

@ -116,8 +116,10 @@ public abstract class DisruptableMockTransport extends MockTransport {
destinationTransport.execute(action, new Runnable() { destinationTransport.execute(action, new Runnable() {
@Override @Override
public void run() { public void run() {
switch (getConnectionStatus(destinationTransport.getLocalNode())) { final ConnectionStatus connectionStatus = getConnectionStatus(destinationTransport.getLocalNode());
switch (connectionStatus) {
case BLACK_HOLE: case BLACK_HOLE:
case BLACK_HOLE_REQUESTS_ONLY:
onBlackholedDuringSend(requestId, action, destinationTransport); onBlackholedDuringSend(requestId, action, destinationTransport);
break; break;
@ -128,6 +130,9 @@ public abstract class DisruptableMockTransport extends MockTransport {
case CONNECTED: case CONNECTED:
onConnectedDuringSend(requestId, action, request, destinationTransport); onConnectedDuringSend(requestId, action, request, destinationTransport);
break; break;
default:
throw new AssertionError("unexpected status: " + connectionStatus);
} }
} }
@ -197,11 +202,20 @@ public abstract class DisruptableMockTransport extends MockTransport {
execute(action, new Runnable() { execute(action, new Runnable() {
@Override @Override
public void run() { public void run() {
if (destinationTransport.getConnectionStatus(getLocalNode()) != ConnectionStatus.CONNECTED) { final ConnectionStatus connectionStatus = destinationTransport.getConnectionStatus(getLocalNode());
logger.trace("dropping response to {}: channel is not CONNECTED", switch (connectionStatus) {
requestDescription); case CONNECTED:
} else { case BLACK_HOLE_REQUESTS_ONLY:
handleResponse(requestId, response); handleResponse(requestId, response);
break;
case BLACK_HOLE:
case DISCONNECTED:
logger.trace("dropping response to {}: channel is {}", requestDescription, connectionStatus);
break;
default:
throw new AssertionError("unexpected status: " + connectionStatus);
} }
} }
@ -217,11 +231,20 @@ public abstract class DisruptableMockTransport extends MockTransport {
execute(action, new Runnable() { execute(action, new Runnable() {
@Override @Override
public void run() { public void run() {
if (destinationTransport.getConnectionStatus(getLocalNode()) != ConnectionStatus.CONNECTED) { final ConnectionStatus connectionStatus = destinationTransport.getConnectionStatus(getLocalNode());
logger.trace("dropping response to {}: channel is not CONNECTED", switch (connectionStatus) {
requestDescription); case CONNECTED:
} else { case BLACK_HOLE_REQUESTS_ONLY:
handleRemoteError(requestId, exception); handleRemoteError(requestId, exception);
break;
case BLACK_HOLE:
case DISCONNECTED:
logger.trace("dropping exception response to {}: channel is {}", requestDescription, connectionStatus);
break;
default:
throw new AssertionError("unexpected status: " + connectionStatus);
} }
} }
@ -251,9 +274,29 @@ public abstract class DisruptableMockTransport extends MockTransport {
} }
} }
/**
* Response type from {@link DisruptableMockTransport#getConnectionStatus(DiscoveryNode)} indicating whether, and how, messages should
* be disrupted on this transport.
*/
public enum ConnectionStatus { public enum ConnectionStatus {
/**
* No disruption: deliver messages normally.
*/
CONNECTED, CONNECTED,
DISCONNECTED, // network requests to or from this node throw a ConnectTransportException
BLACK_HOLE // network traffic to or from the corresponding node is silently discarded /**
* Simulate disconnection: inbound and outbound messages throw a {@link ConnectTransportException}.
*/
DISCONNECTED,
/**
* Simulate a blackhole partition: inbound and outbound messages are silently discarded.
*/
BLACK_HOLE,
/**
* Simulate an asymmetric partition: outbound messages are silently discarded, but inbound messages are delivered normally.
*/
BLACK_HOLE_REQUESTS_ONLY
} }
} }

View File

@ -56,29 +56,32 @@ import static org.hamcrest.Matchers.containsString;
public class DisruptableMockTransportTests extends ESTestCase { public class DisruptableMockTransportTests extends ESTestCase {
DiscoveryNode node1; private DiscoveryNode node1;
DiscoveryNode node2; private DiscoveryNode node2;
DisruptableMockTransport transport1; private TransportService service1;
DisruptableMockTransport transport2; private TransportService service2;
TransportService service1; private DeterministicTaskQueue deterministicTaskQueue;
TransportService service2;
DeterministicTaskQueue deterministicTaskQueue; private Set<Tuple<DiscoveryNode, DiscoveryNode>> disconnectedLinks;
private Set<Tuple<DiscoveryNode, DiscoveryNode>> blackholedLinks;
private Set<Tuple<DiscoveryNode, DiscoveryNode>> blackholedRequestLinks;
Set<Tuple<DiscoveryNode, DiscoveryNode>> disconnectedLinks; private ConnectionStatus getConnectionStatus(DiscoveryNode sender, DiscoveryNode destination) {
Set<Tuple<DiscoveryNode, DiscoveryNode>> blackholedLinks;
ConnectionStatus getConnectionStatus(DiscoveryNode sender, DiscoveryNode destination) {
Tuple<DiscoveryNode, DiscoveryNode> link = Tuple.tuple(sender, destination); Tuple<DiscoveryNode, DiscoveryNode> link = Tuple.tuple(sender, destination);
if (disconnectedLinks.contains(link)) { if (disconnectedLinks.contains(link)) {
assert blackholedLinks.contains(link) == false; assert blackholedLinks.contains(link) == false;
assert blackholedRequestLinks.contains(link) == false;
return ConnectionStatus.DISCONNECTED; return ConnectionStatus.DISCONNECTED;
} }
if (blackholedLinks.contains(link)) { if (blackholedLinks.contains(link)) {
assert blackholedRequestLinks.contains(link) == false;
return ConnectionStatus.BLACK_HOLE; return ConnectionStatus.BLACK_HOLE;
} }
if (blackholedRequestLinks.contains(link)) {
return ConnectionStatus.BLACK_HOLE_REQUESTS_ONLY;
}
return ConnectionStatus.CONNECTED; return ConnectionStatus.CONNECTED;
} }
@ -89,13 +92,14 @@ public class DisruptableMockTransportTests extends ESTestCase {
disconnectedLinks = new HashSet<>(); disconnectedLinks = new HashSet<>();
blackholedLinks = new HashSet<>(); blackholedLinks = new HashSet<>();
blackholedRequestLinks = new HashSet<>();
List<DisruptableMockTransport> transports = new ArrayList<>(); List<DisruptableMockTransport> transports = new ArrayList<>();
deterministicTaskQueue = new DeterministicTaskQueue( deterministicTaskQueue = new DeterministicTaskQueue(
Settings.builder().put(Node.NODE_NAME_SETTING.getKey(), "dummy").build(), random()); Settings.builder().put(Node.NODE_NAME_SETTING.getKey(), "dummy").build(), random());
transport1 = new DisruptableMockTransport(node1, logger) { final DisruptableMockTransport transport1 = new DisruptableMockTransport(node1, logger) {
@Override @Override
protected ConnectionStatus getConnectionStatus(DiscoveryNode destination) { protected ConnectionStatus getConnectionStatus(DiscoveryNode destination) {
return DisruptableMockTransportTests.this.getConnectionStatus(getLocalNode(), destination); return DisruptableMockTransportTests.this.getConnectionStatus(getLocalNode(), destination);
@ -112,7 +116,7 @@ public class DisruptableMockTransportTests extends ESTestCase {
} }
}; };
transport2 = new DisruptableMockTransport(node2, logger) { final DisruptableMockTransport transport2 = new DisruptableMockTransport(node2, logger) {
@Override @Override
protected ConnectionStatus getConnectionStatus(DiscoveryNode destination) { protected ConnectionStatus getConnectionStatus(DiscoveryNode destination) {
return DisruptableMockTransportTests.this.getConnectionStatus(getLocalNode(), destination); return DisruptableMockTransportTests.this.getConnectionStatus(getLocalNode(), destination);
@ -144,7 +148,6 @@ public class DisruptableMockTransportTests extends ESTestCase {
service2.connectToNode(node1); service2.connectToNode(node1);
} }
private TransportRequestHandler<TransportRequest.Empty> requestHandlerShouldNotBeCalled() { private TransportRequestHandler<TransportRequest.Empty> requestHandlerShouldNotBeCalled() {
return (request, channel, task) -> { return (request, channel, task) -> {
throw new AssertionError("should not be called"); throw new AssertionError("should not be called");
@ -293,15 +296,21 @@ public class DisruptableMockTransportTests extends ESTestCase {
deterministicTaskQueue.runAllRunnableTasks(); deterministicTaskQueue.runAllRunnableTasks();
} }
public void testUnavailableOnRequestOnly() {
registerRequestHandler(service1, requestHandlerShouldNotBeCalled());
registerRequestHandler(service2, requestHandlerShouldNotBeCalled());
blackholedRequestLinks.add(Tuple.tuple(node1, node2));
send(service1, node2, responseHandlerShouldNotBeCalled());
deterministicTaskQueue.runAllRunnableTasks();
}
public void testDisconnectedOnSuccessfulResponse() throws IOException { public void testDisconnectedOnSuccessfulResponse() throws IOException {
registerRequestHandler(service1, requestHandlerShouldNotBeCalled()); registerRequestHandler(service1, requestHandlerShouldNotBeCalled());
AtomicReference<TransportChannel> responseHandlerChannel = new AtomicReference<>(); AtomicReference<TransportChannel> responseHandlerChannel = new AtomicReference<>();
registerRequestHandler(service2, requestHandlerCaptures(responseHandlerChannel::set)); registerRequestHandler(service2, requestHandlerCaptures(responseHandlerChannel::set));
AtomicReference<TransportException> responseHandlerException = new AtomicReference<>();
send(service1, node2, responseHandlerShouldNotBeCalled()); send(service1, node2, responseHandlerShouldNotBeCalled());
deterministicTaskQueue.runAllRunnableTasks(); deterministicTaskQueue.runAllRunnableTasks();
assertNull(responseHandlerException.get());
assertNotNull(responseHandlerChannel.get()); assertNotNull(responseHandlerChannel.get());
disconnectedLinks.add(Tuple.tuple(node2, node1)); disconnectedLinks.add(Tuple.tuple(node2, node1));
@ -314,10 +323,8 @@ public class DisruptableMockTransportTests extends ESTestCase {
AtomicReference<TransportChannel> responseHandlerChannel = new AtomicReference<>(); AtomicReference<TransportChannel> responseHandlerChannel = new AtomicReference<>();
registerRequestHandler(service2, requestHandlerCaptures(responseHandlerChannel::set)); registerRequestHandler(service2, requestHandlerCaptures(responseHandlerChannel::set));
AtomicReference<TransportException> responseHandlerException = new AtomicReference<>();
send(service1, node2, responseHandlerShouldNotBeCalled()); send(service1, node2, responseHandlerShouldNotBeCalled());
deterministicTaskQueue.runAllRunnableTasks(); deterministicTaskQueue.runAllRunnableTasks();
assertNull(responseHandlerException.get());
assertNotNull(responseHandlerChannel.get()); assertNotNull(responseHandlerChannel.get());
disconnectedLinks.add(Tuple.tuple(node2, node1)); disconnectedLinks.add(Tuple.tuple(node2, node1));
@ -330,10 +337,8 @@ public class DisruptableMockTransportTests extends ESTestCase {
AtomicReference<TransportChannel> responseHandlerChannel = new AtomicReference<>(); AtomicReference<TransportChannel> responseHandlerChannel = new AtomicReference<>();
registerRequestHandler(service2, requestHandlerCaptures(responseHandlerChannel::set)); registerRequestHandler(service2, requestHandlerCaptures(responseHandlerChannel::set));
AtomicReference<TransportException> responseHandlerException = new AtomicReference<>();
send(service1, node2, responseHandlerShouldNotBeCalled()); send(service1, node2, responseHandlerShouldNotBeCalled());
deterministicTaskQueue.runAllRunnableTasks(); deterministicTaskQueue.runAllRunnableTasks();
assertNull(responseHandlerException.get());
assertNotNull(responseHandlerChannel.get()); assertNotNull(responseHandlerChannel.get());
blackholedLinks.add(Tuple.tuple(node2, node1)); blackholedLinks.add(Tuple.tuple(node2, node1));
@ -346,10 +351,8 @@ public class DisruptableMockTransportTests extends ESTestCase {
AtomicReference<TransportChannel> responseHandlerChannel = new AtomicReference<>(); AtomicReference<TransportChannel> responseHandlerChannel = new AtomicReference<>();
registerRequestHandler(service2, requestHandlerCaptures(responseHandlerChannel::set)); registerRequestHandler(service2, requestHandlerCaptures(responseHandlerChannel::set));
AtomicReference<TransportException> responseHandlerException = new AtomicReference<>();
send(service1, node2, responseHandlerShouldNotBeCalled()); send(service1, node2, responseHandlerShouldNotBeCalled());
deterministicTaskQueue.runAllRunnableTasks(); deterministicTaskQueue.runAllRunnableTasks();
assertNull(responseHandlerException.get());
assertNotNull(responseHandlerChannel.get()); assertNotNull(responseHandlerChannel.get());
blackholedLinks.add(Tuple.tuple(node2, node1)); blackholedLinks.add(Tuple.tuple(node2, node1));
@ -357,4 +360,43 @@ public class DisruptableMockTransportTests extends ESTestCase {
deterministicTaskQueue.runAllRunnableTasks(); deterministicTaskQueue.runAllRunnableTasks();
} }
public void testUnavailableOnRequestOnlyReceivesSuccessfulResponse() throws IOException {
registerRequestHandler(service1, requestHandlerShouldNotBeCalled());
AtomicReference<TransportChannel> responseHandlerChannel = new AtomicReference<>();
registerRequestHandler(service2, requestHandlerCaptures(responseHandlerChannel::set));
AtomicBoolean responseHandlerCalled = new AtomicBoolean();
send(service1, node2, responseHandlerShouldBeCalledNormally(() -> responseHandlerCalled.set(true)));
deterministicTaskQueue.runAllTasks();
assertNotNull(responseHandlerChannel.get());
assertFalse(responseHandlerCalled.get());
blackholedRequestLinks.add(Tuple.tuple(node1, node2));
blackholedRequestLinks.add(Tuple.tuple(node2, node1));
responseHandlerChannel.get().sendResponse(TransportResponse.Empty.INSTANCE);
deterministicTaskQueue.runAllRunnableTasks();
assertTrue(responseHandlerCalled.get());
}
public void testUnavailableOnRequestOnlyReceivesExceptionalResponse() throws IOException {
registerRequestHandler(service1, requestHandlerShouldNotBeCalled());
AtomicReference<TransportChannel> responseHandlerChannel = new AtomicReference<>();
registerRequestHandler(service2, requestHandlerCaptures(responseHandlerChannel::set));
AtomicBoolean responseHandlerCalled = new AtomicBoolean();
send(service1, node2, responseHandlerShouldBeCalledExceptionally(e -> responseHandlerCalled.set(true)));
deterministicTaskQueue.runAllTasks();
assertNotNull(responseHandlerChannel.get());
assertFalse(responseHandlerCalled.get());
blackholedRequestLinks.add(Tuple.tuple(node1, node2));
blackholedRequestLinks.add(Tuple.tuple(node2, node1));
responseHandlerChannel.get().sendResponse(new Exception());
deterministicTaskQueue.runAllRunnableTasks();
assertTrue(responseHandlerCalled.get());
}
} }