diff --git a/test/framework/src/main/java/org/elasticsearch/test/transport/CapturingTransport.java b/test/framework/src/main/java/org/elasticsearch/test/transport/CapturingTransport.java index 60133a16a10..132a07d5b7f 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/transport/CapturingTransport.java +++ b/test/framework/src/main/java/org/elasticsearch/test/transport/CapturingTransport.java @@ -51,7 +51,6 @@ import org.elasticsearch.transport.TransportService; import org.elasticsearch.transport.TransportStats; import java.io.IOException; -import java.net.UnknownHostException; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -66,11 +65,13 @@ import java.util.function.Function; import static org.apache.lucene.util.LuceneTestCase.rarely; -/** A transport class that doesn't send anything but rather captures all requests for inspection from tests */ +/** + * A transport class that doesn't send anything but rather captures all requests for inspection from tests + */ public class CapturingTransport implements Transport { private volatile Map requestHandlers = Collections.emptyMap(); - final Object requestHandlerMutex = new Object(); + private final Object requestHandlerMutex = new Object(); private final ResponseHandlers responseHandlers = new ResponseHandlers(); private TransportMessageListener listener; @@ -80,7 +81,7 @@ public class CapturingTransport implements Transport { public final String action; public final TransportRequest request; - public CapturedRequest(DiscoveryNode node, long requestId, String action, TransportRequest request) { + CapturedRequest(DiscoveryNode node, long requestId, String action, TransportRequest request) { this.node = node; this.requestId = requestId; this.action = action; @@ -96,41 +97,15 @@ public class CapturingTransport implements Transport { @Nullable ClusterSettings clusterSettings, Set taskHeaders) { StubbableConnectionManager connectionManager = new StubbableConnectionManager(new ConnectionManager(settings, this, threadPool), settings, this, threadPool); - connectionManager.setDefaultNodeConnectedBehavior((cm, discoveryNode) -> true); - connectionManager.setDefaultConnectBehavior((cm, discoveryNode) -> new Connection() { - @Override - public DiscoveryNode getNode() { - return discoveryNode; - } - - @Override - public void sendRequest(long requestId, String action, TransportRequest request, TransportRequestOptions options) - throws TransportException { - requests.put(requestId, Tuple.tuple(discoveryNode, action)); - capturedRequests.add(new CapturedRequest(discoveryNode, requestId, action, request)); - } - - @Override - public void addCloseListener(ActionListener listener) { - - } - - @Override - public boolean isClosed() { - return false; - } - - @Override - public void close() { - - } - }); + connectionManager.setDefaultNodeConnectedBehavior((cm, discoveryNode) -> nodeConnected(discoveryNode)); + connectionManager.setDefaultConnectBehavior((cm, discoveryNode) -> openConnection(discoveryNode, null)); return new TransportService(settings, this, threadPool, interceptor, localNodeFactory, clusterSettings, taskHeaders, connectionManager); - } - /** returns all requests captured so far. Doesn't clear the captured request list. See {@link #clear()} */ + /** + * returns all requests captured so far. Doesn't clear the captured request list. See {@link #clear()} + */ public CapturedRequest[] capturedRequests() { return capturedRequests.toArray(new CapturedRequest[0]); } @@ -178,12 +153,16 @@ public class CapturingTransport implements Transport { return groupRequestsByTargetNode(requests); } - /** clears captured requests */ + /** + * clears captured requests + */ public void clear() { capturedRequests.clear(); } - /** simulate a response for the given requestId */ + /** + * simulate a response for the given requestId + */ public void handleResponse(final long requestId, final TransportResponse response) { responseHandlers.onResponseReceived(requestId, listener).handleResponse(response); } @@ -194,7 +173,7 @@ public class CapturingTransport implements Transport { * * @param requestId the id corresponding to the captured send * request - * @param t the failure to wrap + * @param t the failure to wrap */ public void handleLocalError(final long requestId, final Throwable t) { Tuple request = requests.get(requestId); @@ -208,7 +187,7 @@ public class CapturingTransport implements Transport { * * @param requestId the id corresponding to the captured send * request - * @param t the failure to wrap + * @param t the failure to wrap */ public void handleRemoteError(final long requestId, final Throwable t) { final RemoteTransportException remoteException; @@ -234,7 +213,7 @@ public class CapturingTransport implements Transport { * * @param requestId the id corresponding to the captured send * request - * @param e the failure + * @param e the failure */ public void handleError(final long requestId, final TransportException e) { responseHandlers.onResponseReceived(requestId, listener).handleException(e); @@ -251,13 +230,11 @@ public class CapturingTransport implements Transport { @Override public void sendRequest(long requestId, String action, TransportRequest request, TransportRequestOptions options) throws TransportException { - requests.put(requestId, Tuple.tuple(node, action)); - capturedRequests.add(new CapturedRequest(node, requestId, action, request)); + onSendRequest(requestId, action, request, node); } @Override public void addCloseListener(ActionListener listener) { - } @Override @@ -267,11 +244,19 @@ public class CapturingTransport implements Transport { @Override public void close() { - } }; } + protected void onSendRequest(long requestId, String action, TransportRequest request, DiscoveryNode node) { + requests.put(requestId, Tuple.tuple(node, action)); + capturedRequests.add(new CapturedRequest(node, requestId, action, request)); + } + + protected boolean nodeConnected(DiscoveryNode discoveryNode) { + return true; + } + @Override public TransportStats getStats() { throw new UnsupportedOperationException(); @@ -288,7 +273,7 @@ public class CapturingTransport implements Transport { } @Override - public TransportAddress[] addressesFromString(String address, int perAddressLimit) throws UnknownHostException { + public TransportAddress[] addressesFromString(String address, int perAddressLimit) { return new TransportAddress[0]; } @@ -299,22 +284,23 @@ public class CapturingTransport implements Transport { @Override public void addLifecycleListener(LifecycleListener listener) { - } @Override public void removeLifecycleListener(LifecycleListener listener) { - } @Override - public void start() {} + public void start() { + } @Override - public void stop() {} + public void stop() { + } @Override - public void close() {} + public void close() { + } @Override public List getLocalAddresses() { @@ -330,6 +316,7 @@ public class CapturingTransport implements Transport { requestHandlers = MapBuilder.newMapBuilder(requestHandlers).put(reg.getAction(), reg).immutableMap(); } } + @Override public ResponseHandlers getResponseHandlers() { return responseHandlers;