MockTransportService should physically disconnect when simulating it (#22977)
This is in order to trigger listeners for disconnect events, most importantly the NodeFaultDetection. MockTransportService now does slightly a better job at mimicking real life failures: connecting to already connected node will be a noop (we don't detect any errors here in production either) and failing to send will cause the target node to be disconnected. This is the cause of failure in https://elasticsearch-ci.elastic.co/job/elastic+elasticsearch+5.2+multijob-unix-compatibility/os=debian/72
This commit is contained in:
parent
7513c6e4eb
commit
03ef756539
|
@ -199,13 +199,16 @@ public final class MockTransportService extends TransportService {
|
|||
|
||||
@Override
|
||||
public void connectToNode(DiscoveryNode node, ConnectionProfile connectionProfile) throws ConnectTransportException {
|
||||
throw new ConnectTransportException(node, "DISCONNECT: simulated");
|
||||
if (original.nodeConnected(node) == false) {
|
||||
// connecting to an already connected node is a no-op
|
||||
throw new ConnectTransportException(node, "DISCONNECT: simulated");
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void sendRequest(Connection connection, long requestId, String action, TransportRequest request,
|
||||
TransportRequestOptions options) throws IOException {
|
||||
throw new ConnectTransportException(connection.getNode(), "DISCONNECT: simulated");
|
||||
simulateDisconnect(connection, original, "DISCONNECT: simulated");
|
||||
}
|
||||
});
|
||||
}
|
||||
|
@ -250,7 +253,7 @@ public final class MockTransportService extends TransportService {
|
|||
TransportRequestOptions options) throws IOException {
|
||||
if (blockedActions.contains(action)) {
|
||||
logger.info("--> preventing {} request", action);
|
||||
throw new ConnectTransportException(connection.getNode(), "DISCONNECT: prevented " + action + " request");
|
||||
simulateDisconnect(connection, original, "DISCONNECT: prevented " + action + " request");
|
||||
}
|
||||
connection.sendRequest(requestId, action, request, options);
|
||||
}
|
||||
|
@ -276,7 +279,10 @@ public final class MockTransportService extends TransportService {
|
|||
|
||||
@Override
|
||||
public void connectToNode(DiscoveryNode node, ConnectionProfile connectionProfile) throws ConnectTransportException {
|
||||
throw new ConnectTransportException(node, "UNRESPONSIVE: simulated");
|
||||
if (original.nodeConnected(node) == false) {
|
||||
// connecting to an already connected node is a no-op
|
||||
throw new ConnectTransportException(node, "UNRESPONSIVE: simulated");
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -318,6 +324,10 @@ public final class MockTransportService extends TransportService {
|
|||
|
||||
@Override
|
||||
public void connectToNode(DiscoveryNode node, ConnectionProfile connectionProfile) throws ConnectTransportException {
|
||||
if (original.nodeConnected(node)) {
|
||||
// connecting to an already connected node is a no-op
|
||||
return;
|
||||
}
|
||||
TimeValue delay = getDelay();
|
||||
if (delay.millis() <= 0) {
|
||||
original.connectToNode(node, connectionProfile);
|
||||
|
@ -335,7 +345,7 @@ public final class MockTransportService extends TransportService {
|
|||
throw new ConnectTransportException(node, "UNRESPONSIVE: simulated");
|
||||
}
|
||||
} catch (InterruptedException e) {
|
||||
throw new ConnectTransportException(node, "UNRESPONSIVE: interrupted while sleeping", e);
|
||||
throw new ConnectTransportException(node, "UNRESPONSIVE: simulated");
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -420,6 +430,37 @@ public final class MockTransportService extends TransportService {
|
|||
return (LookupTestTransport) transport;
|
||||
}
|
||||
|
||||
/**
|
||||
* simulates a disconnect by disconnecting from the underlying transport and throwing a
|
||||
* {@link ConnectTransportException}
|
||||
*/
|
||||
private void simulateDisconnect(DiscoveryNode node, Transport transport, String reason) {
|
||||
simulateDisconnect(node, transport, reason, null);
|
||||
}
|
||||
|
||||
/**
|
||||
* simulates a disconnect by disconnecting from the underlying transport and throwing a
|
||||
* {@link ConnectTransportException}, due to a specific cause exception
|
||||
*/
|
||||
private void simulateDisconnect(DiscoveryNode node, Transport transport, String reason, @Nullable Throwable e) {
|
||||
if (transport.nodeConnected(node)) {
|
||||
// this a connected node, disconnecting from it will be up the exception
|
||||
transport.disconnectFromNode(node);
|
||||
} else {
|
||||
throw new ConnectTransportException(node, reason, e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* simulates a disconnect by closing the connection and throwing a
|
||||
* {@link ConnectTransportException}
|
||||
*/
|
||||
private void simulateDisconnect(Transport.Connection connection, Transport transport, String reason) throws IOException {
|
||||
connection.close();
|
||||
simulateDisconnect(connection.getNode(), transport, reason);
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* A lookup transport that has a list of potential Transport implementations to delegate to for node operations,
|
||||
* if none is registered, then the default one is used.
|
||||
|
|
|
@ -77,7 +77,6 @@ import java.util.concurrent.atomic.AtomicReference;
|
|||
import static java.util.Collections.emptyMap;
|
||||
import static java.util.Collections.emptySet;
|
||||
import static org.hamcrest.Matchers.empty;
|
||||
import static org.hamcrest.Matchers.endsWith;
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
import static org.hamcrest.Matchers.instanceOf;
|
||||
import static org.hamcrest.Matchers.notNullValue;
|
||||
|
@ -1299,12 +1298,9 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
|
|||
|
||||
public void testMockFailToSendNoConnectRule() throws IOException {
|
||||
serviceA.registerRequestHandler("sayHello", StringMessageRequest::new, ThreadPool.Names.GENERIC,
|
||||
new TransportRequestHandler<StringMessageRequest>() {
|
||||
@Override
|
||||
public void messageReceived(StringMessageRequest request, TransportChannel channel) throws Exception {
|
||||
assertThat("moshe", equalTo(request.message));
|
||||
throw new RuntimeException("bad message !!!");
|
||||
}
|
||||
(request, channel) -> {
|
||||
assertThat("moshe", equalTo(request.message));
|
||||
throw new RuntimeException("bad message !!!");
|
||||
});
|
||||
|
||||
serviceB.addFailToSendNoConnectRule(serviceA);
|
||||
|
@ -1328,7 +1324,9 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
|
|||
|
||||
@Override
|
||||
public void handleException(TransportException exp) {
|
||||
assertThat(exp.getCause().getMessage(), endsWith("DISCONNECT: simulated"));
|
||||
Throwable cause = ExceptionsHelper.unwrapCause(exp);
|
||||
assertThat(cause, instanceOf(ConnectTransportException.class));
|
||||
assertThat(((ConnectTransportException)cause).node(), equalTo(nodeA));
|
||||
}
|
||||
});
|
||||
|
||||
|
@ -1336,7 +1334,9 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
|
|||
res.txGet();
|
||||
fail("exception should be thrown");
|
||||
} catch (Exception e) {
|
||||
assertThat(e.getCause().getMessage(), endsWith("DISCONNECT: simulated"));
|
||||
Throwable cause = ExceptionsHelper.unwrapCause(e);
|
||||
assertThat(cause, instanceOf(ConnectTransportException.class));
|
||||
assertThat(((ConnectTransportException)cause).node(), equalTo(nodeA));
|
||||
}
|
||||
|
||||
try {
|
||||
|
@ -1398,6 +1398,7 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
|
|||
}
|
||||
|
||||
try {
|
||||
serviceB.disconnectFromNode(nodeA);
|
||||
serviceB.connectToNode(nodeA);
|
||||
fail("exception should be thrown");
|
||||
} catch (ConnectTransportException e) {
|
||||
|
|
Loading…
Reference in New Issue