Update MockTransportService to the age of Transport.Connection (#25320)
MockTransportServices allows us to simulate network disruptions in our testing infra. Sadly it wasn't updated to the state of the art in Transport land. This PR brings it up to speed. Specifically: 1) Opening a connection is now also blocked (before only node connections were blocked) 2) Simplifies things using the latest connection based notification between TcpTransport and TransportService for when a disconnect happens. 3) By 2, it fixes a race condition where we may fail to respond to a sent request when it is sent concurrently with the closing of a connection. The old code relied on a node based bridge between tcp transport and transport service. Sadly, the following doesn't work any more: ``` if (transport.nodeConnected(node)) { // this a connected node, disconnecting from it will be up the exception transport.disconnectFromNode(node); <-- this may now be a noop and it doesn't mean that the transport service was notified of the disconnect between the nodeConnected check and here. } else { throw new ConnectTransportException(node, reason, e); } ```
This commit is contained in:
parent
8274cd67ab
commit
7013cbd927
|
@ -218,10 +218,17 @@ public final class MockTransportService extends TransportService {
|
|||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public Connection openConnection(DiscoveryNode node, ConnectionProfile profile) throws IOException {
|
||||
throw new ConnectTransportException(node, "DISCONNECT: simulated");
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void sendRequest(Connection connection, long requestId, String action, TransportRequest request,
|
||||
TransportRequestOptions options) throws IOException {
|
||||
simulateDisconnect(connection, original, "DISCONNECT: simulated");
|
||||
connection.close();
|
||||
// send the request, which will blow up
|
||||
connection.sendRequest(requestId, action, request, options);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
@ -256,19 +263,12 @@ public final class MockTransportService extends TransportService {
|
|||
|
||||
addDelegate(transportAddress, new DelegateTransport(original) {
|
||||
|
||||
@Override
|
||||
public void connectToNode(DiscoveryNode node, ConnectionProfile connectionProfile,
|
||||
CheckedBiConsumer<Connection, ConnectionProfile, IOException> connectionValidator)
|
||||
throws ConnectTransportException {
|
||||
original.connectToNode(node, connectionProfile, connectionValidator);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void sendRequest(Connection connection, long requestId, String action, TransportRequest request,
|
||||
TransportRequestOptions options) throws IOException {
|
||||
if (blockedActions.contains(action)) {
|
||||
logger.info("--> preventing {} request", action);
|
||||
simulateDisconnect(connection, original, "DISCONNECT: prevented " + action + " request");
|
||||
connection.close();
|
||||
}
|
||||
connection.sendRequest(requestId, action, request, options);
|
||||
}
|
||||
|
@ -302,6 +302,11 @@ public final class MockTransportService extends TransportService {
|
|||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public Connection openConnection(DiscoveryNode node, ConnectionProfile profile) throws IOException {
|
||||
throw new ConnectTransportException(node, "UNRESPONSIVE: simulated");
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void sendRequest(Connection connection, long requestId, String action, TransportRequest request,
|
||||
TransportRequestOptions options) throws IOException {
|
||||
|
@ -368,6 +373,28 @@ public final class MockTransportService extends TransportService {
|
|||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public Connection openConnection(DiscoveryNode node, ConnectionProfile profile) throws IOException {
|
||||
TimeValue delay = getDelay();
|
||||
if (delay.millis() <= 0) {
|
||||
return original.openConnection(node, profile);
|
||||
}
|
||||
|
||||
// TODO: Replace with proper setting
|
||||
TimeValue connectingTimeout = NetworkService.TcpSettings.TCP_CONNECT_TIMEOUT.getDefault(Settings.EMPTY);
|
||||
try {
|
||||
if (delay.millis() < connectingTimeout.millis()) {
|
||||
Thread.sleep(delay.millis());
|
||||
return original.openConnection(node, profile);
|
||||
} else {
|
||||
Thread.sleep(connectingTimeout.millis());
|
||||
throw new ConnectTransportException(node, "UNRESPONSIVE: simulated");
|
||||
}
|
||||
} catch (InterruptedException e) {
|
||||
throw new ConnectTransportException(node, "UNRESPONSIVE: simulated");
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void sendRequest(Connection connection, long requestId, String action, TransportRequest request,
|
||||
TransportRequestOptions options) throws IOException {
|
||||
|
@ -449,37 +476,6 @@ public final class MockTransportService extends TransportService {
|
|||
return (LookupTestTransport) transport;
|
||||
}
|
||||
|
||||
/**
|
||||
* simulates a disconnect by disconnecting from the underlying transport and throwing a
|
||||
* {@link ConnectTransportException}
|
||||
*/
|
||||
private void simulateDisconnect(DiscoveryNode node, Transport transport, String reason) {
|
||||
simulateDisconnect(node, transport, reason, null);
|
||||
}
|
||||
|
||||
/**
|
||||
* simulates a disconnect by disconnecting from the underlying transport and throwing a
|
||||
* {@link ConnectTransportException}, due to a specific cause exception
|
||||
*/
|
||||
private void simulateDisconnect(DiscoveryNode node, Transport transport, String reason, @Nullable Throwable e) {
|
||||
if (transport.nodeConnected(node)) {
|
||||
// this a connected node, disconnecting from it will be up the exception
|
||||
transport.disconnectFromNode(node);
|
||||
} else {
|
||||
throw new ConnectTransportException(node, reason, e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* simulates a disconnect by closing the connection and throwing a
|
||||
* {@link ConnectTransportException}
|
||||
*/
|
||||
private void simulateDisconnect(Transport.Connection connection, Transport transport, String reason) throws IOException {
|
||||
connection.close();
|
||||
simulateDisconnect(connection.getNode(), transport, reason);
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* A lookup transport that has a list of potential Transport implementations to delegate to for node operations,
|
||||
* if none is registered, then the default one is used.
|
||||
|
|
|
@ -41,7 +41,6 @@ import org.elasticsearch.common.unit.TimeValue;
|
|||
import org.elasticsearch.common.util.BigArrays;
|
||||
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
|
||||
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
|
||||
import org.elasticsearch.common.util.concurrent.CountDown;
|
||||
import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
|
||||
import org.elasticsearch.mocksocket.MockServerSocket;
|
||||
import org.elasticsearch.node.Node;
|
||||
|
@ -61,7 +60,6 @@ import java.net.ServerSocket;
|
|||
import java.net.Socket;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
|
@ -1463,12 +1461,7 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
|
|||
// all is well
|
||||
}
|
||||
|
||||
try (Transport.Connection connection = serviceB.openConnection(nodeA, MockTcpTransport.LIGHT_PROFILE)) {
|
||||
serviceB.handshake(connection, 100);
|
||||
fail("exception should be thrown");
|
||||
} catch (IllegalStateException e) {
|
||||
// all is well
|
||||
}
|
||||
expectThrows(ConnectTransportException.class, () -> serviceB.openConnection(nodeA, MockTcpTransport.LIGHT_PROFILE));
|
||||
}
|
||||
|
||||
public void testMockUnresponsiveRule() throws IOException {
|
||||
|
@ -1519,12 +1512,7 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
|
|||
// all is well
|
||||
}
|
||||
|
||||
try (Transport.Connection connection = serviceB.openConnection(nodeA, MockTcpTransport.LIGHT_PROFILE)) {
|
||||
serviceB.handshake(connection, 100);
|
||||
fail("exception should be thrown");
|
||||
} catch (IllegalStateException e) {
|
||||
// all is well
|
||||
}
|
||||
expectThrows(ConnectTransportException.class, () -> serviceB.openConnection(nodeA, MockTcpTransport.LIGHT_PROFILE));
|
||||
}
|
||||
|
||||
|
||||
|
|
Loading…
Reference in New Issue