diff --git a/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java b/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java index 97849bfc40c..aaae6afc06c 100644 --- a/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java +++ b/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java @@ -862,13 +862,14 @@ public class SnapshotResiliencyTests extends ESTestCase { } public void clearNetworkDisruptions() { - disruptedLinks.disconnected.forEach(nodeName -> { + final Set disconnectedNodes = new HashSet<>(disruptedLinks.disconnected); + disruptedLinks.clear(); + disconnectedNodes.forEach(nodeName -> { if (testClusterNodes.nodes.containsKey(nodeName)) { final DiscoveryNode node = testClusterNodes.nodes.get(nodeName).node; testClusterNodes.nodes.values().forEach(n -> n.transportService.getConnectionManager().openConnection(node, null)); } }); - disruptedLinks.clear(); } private NetworkDisruption.DisruptedLinks getDisruption() { diff --git a/test/framework/src/main/java/org/elasticsearch/test/disruption/DisruptableMockTransport.java b/test/framework/src/main/java/org/elasticsearch/test/disruption/DisruptableMockTransport.java index c523aa15e58..eb39b1c16d0 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/disruption/DisruptableMockTransport.java +++ b/test/framework/src/main/java/org/elasticsearch/test/disruption/DisruptableMockTransport.java @@ -87,8 +87,14 @@ public abstract class DisruptableMockTransport extends MockTransport { @Override public Releasable openConnection(DiscoveryNode node, ConnectionProfile profile, ActionListener listener) { - final Optional matchingTransport = getDisruptableMockTransport(node.getAddress()); - if (matchingTransport.isPresent()) { + final Optional optionalMatchingTransport = getDisruptableMockTransport(node.getAddress()); + if (optionalMatchingTransport.isPresent()) { + final DisruptableMockTransport matchingTransport = optionalMatchingTransport.get(); + final ConnectionStatus connectionStatus = getConnectionStatus(matchingTransport.getLocalNode()); + if (connectionStatus != ConnectionStatus.CONNECTED) { + throw new ConnectTransportException(node, "node [" + node + "] is [" + connectionStatus + "] not [CONNECTED]"); + } + listener.onResponse(new CloseableConnection() { @Override public DiscoveryNode getNode() { @@ -98,12 +104,12 @@ public abstract class DisruptableMockTransport extends MockTransport { @Override public void sendRequest(long requestId, String action, TransportRequest request, TransportRequestOptions options) throws TransportException { - onSendRequest(requestId, action, request, matchingTransport.get()); + onSendRequest(requestId, action, request, matchingTransport); } }); return () -> {}; } else { - throw new ConnectTransportException(node, "node " + node + " does not exist"); + throw new ConnectTransportException(node, "node [" + node + "] does not exist"); } } diff --git a/test/framework/src/test/java/org/elasticsearch/test/disruption/DisruptableMockTransportTests.java b/test/framework/src/test/java/org/elasticsearch/test/disruption/DisruptableMockTransportTests.java index 4060b7f5cd8..90a47d09e6c 100644 --- a/test/framework/src/test/java/org/elasticsearch/test/disruption/DisruptableMockTransportTests.java +++ b/test/framework/src/test/java/org/elasticsearch/test/disruption/DisruptableMockTransportTests.java @@ -30,6 +30,7 @@ import org.elasticsearch.node.Node; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.disruption.DisruptableMockTransport.ConnectionStatus; import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.ConnectTransportException; import org.elasticsearch.transport.TransportChannel; import org.elasticsearch.transport.TransportException; import org.elasticsearch.transport.TransportRequest; @@ -53,6 +54,7 @@ import java.util.function.Consumer; import static org.elasticsearch.transport.TransportService.NOOP_TRANSPORT_INTERCEPTOR; import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.endsWith; public class DisruptableMockTransportTests extends ESTestCase { @@ -399,4 +401,27 @@ public class DisruptableMockTransportTests extends ESTestCase { deterministicTaskQueue.runAllRunnableTasks(); assertTrue(responseHandlerCalled.get()); } + + public void testBrokenLinkFailsToConnect() { + service1.disconnectFromNode(node2); + + disconnectedLinks.add(Tuple.tuple(node1, node2)); + assertThat(expectThrows(ConnectTransportException.class, () -> service1.connectToNode(node2)).getMessage(), + endsWith("is [DISCONNECTED] not [CONNECTED]")); + disconnectedLinks.clear(); + + blackholedLinks.add(Tuple.tuple(node1, node2)); + assertThat(expectThrows(ConnectTransportException.class, () -> service1.connectToNode(node2)).getMessage(), + endsWith("is [BLACK_HOLE] not [CONNECTED]")); + blackholedLinks.clear(); + + blackholedRequestLinks.add(Tuple.tuple(node1, node2)); + assertThat(expectThrows(ConnectTransportException.class, () -> service1.connectToNode(node2)).getMessage(), + endsWith("is [BLACK_HOLE_REQUESTS_ONLY] not [CONNECTED]")); + blackholedRequestLinks.clear(); + + final DiscoveryNode node3 = new DiscoveryNode("node3", buildNewFakeTransportAddress(), Version.CURRENT); + assertThat(expectThrows(ConnectTransportException.class, () -> service1.connectToNode(node3)).getMessage(), + endsWith("does not exist")); + } }