Allow extension of CapturingTransport by subclasses (#33012)

Today, CapturingTransport#createCapturingTransportService creates a transport
service with a connection manager with reasonable default behaviours, but
overriding this behaviour in a consumer is a litle tricky. Additionally, the
default behaviour for opening a connection duplicates the content of the
CapturingTransport#openConnection() method.

This change removes this duplication by delegating to openConnection() and
introduces overridable nodeConnected() and onSendRequest() methods so that
consumers can alter this behaviour more easily.

Relates #32246 in which we test the mechanisms for opening connections to
unknown (and possibly unreachable) nodes.
This commit is contained in:
David Turner 2018-08-22 09:09:08 +01:00 committed by GitHub
parent ffb1a5d5b7
commit ab000323fa
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 37 additions and 50 deletions

View File

@ -51,7 +51,6 @@ import org.elasticsearch.transport.TransportService;
import org.elasticsearch.transport.TransportStats;
import java.io.IOException;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@ -66,11 +65,13 @@ import java.util.function.Function;
import static org.apache.lucene.util.LuceneTestCase.rarely;
/** A transport class that doesn't send anything but rather captures all requests for inspection from tests */
/**
* A transport class that doesn't send anything but rather captures all requests for inspection from tests
*/
public class CapturingTransport implements Transport {
private volatile Map<String, RequestHandlerRegistry> requestHandlers = Collections.emptyMap();
final Object requestHandlerMutex = new Object();
private final Object requestHandlerMutex = new Object();
private final ResponseHandlers responseHandlers = new ResponseHandlers();
private TransportMessageListener listener;
@ -80,7 +81,7 @@ public class CapturingTransport implements Transport {
public final String action;
public final TransportRequest request;
public CapturedRequest(DiscoveryNode node, long requestId, String action, TransportRequest request) {
CapturedRequest(DiscoveryNode node, long requestId, String action, TransportRequest request) {
this.node = node;
this.requestId = requestId;
this.action = action;
@ -96,41 +97,15 @@ public class CapturingTransport implements Transport {
@Nullable ClusterSettings clusterSettings, Set<String> taskHeaders) {
StubbableConnectionManager connectionManager = new StubbableConnectionManager(new ConnectionManager(settings, this, threadPool),
settings, this, threadPool);
connectionManager.setDefaultNodeConnectedBehavior((cm, discoveryNode) -> true);
connectionManager.setDefaultConnectBehavior((cm, discoveryNode) -> new Connection() {
@Override
public DiscoveryNode getNode() {
return discoveryNode;
}
@Override
public void sendRequest(long requestId, String action, TransportRequest request, TransportRequestOptions options)
throws TransportException {
requests.put(requestId, Tuple.tuple(discoveryNode, action));
capturedRequests.add(new CapturedRequest(discoveryNode, requestId, action, request));
}
@Override
public void addCloseListener(ActionListener<Void> listener) {
}
@Override
public boolean isClosed() {
return false;
}
@Override
public void close() {
}
});
connectionManager.setDefaultNodeConnectedBehavior((cm, discoveryNode) -> nodeConnected(discoveryNode));
connectionManager.setDefaultConnectBehavior((cm, discoveryNode) -> openConnection(discoveryNode, null));
return new TransportService(settings, this, threadPool, interceptor, localNodeFactory, clusterSettings, taskHeaders,
connectionManager);
}
/** returns all requests captured so far. Doesn't clear the captured request list. See {@link #clear()} */
/**
* returns all requests captured so far. Doesn't clear the captured request list. See {@link #clear()}
*/
public CapturedRequest[] capturedRequests() {
return capturedRequests.toArray(new CapturedRequest[0]);
}
@ -178,12 +153,16 @@ public class CapturingTransport implements Transport {
return groupRequestsByTargetNode(requests);
}
/** clears captured requests */
/**
* clears captured requests
*/
public void clear() {
capturedRequests.clear();
}
/** simulate a response for the given requestId */
/**
* simulate a response for the given requestId
*/
public void handleResponse(final long requestId, final TransportResponse response) {
responseHandlers.onResponseReceived(requestId, listener).handleResponse(response);
}
@ -194,7 +173,7 @@ public class CapturingTransport implements Transport {
*
* @param requestId the id corresponding to the captured send
* request
* @param t the failure to wrap
* @param t the failure to wrap
*/
public void handleLocalError(final long requestId, final Throwable t) {
Tuple<DiscoveryNode, String> request = requests.get(requestId);
@ -208,7 +187,7 @@ public class CapturingTransport implements Transport {
*
* @param requestId the id corresponding to the captured send
* request
* @param t the failure to wrap
* @param t the failure to wrap
*/
public void handleRemoteError(final long requestId, final Throwable t) {
final RemoteTransportException remoteException;
@ -234,7 +213,7 @@ public class CapturingTransport implements Transport {
*
* @param requestId the id corresponding to the captured send
* request
* @param e the failure
* @param e the failure
*/
public void handleError(final long requestId, final TransportException e) {
responseHandlers.onResponseReceived(requestId, listener).handleException(e);
@ -251,13 +230,11 @@ public class CapturingTransport implements Transport {
@Override
public void sendRequest(long requestId, String action, TransportRequest request, TransportRequestOptions options)
throws TransportException {
requests.put(requestId, Tuple.tuple(node, action));
capturedRequests.add(new CapturedRequest(node, requestId, action, request));
onSendRequest(requestId, action, request, node);
}
@Override
public void addCloseListener(ActionListener<Void> listener) {
}
@Override
@ -267,11 +244,19 @@ public class CapturingTransport implements Transport {
@Override
public void close() {
}
};
}
protected void onSendRequest(long requestId, String action, TransportRequest request, DiscoveryNode node) {
requests.put(requestId, Tuple.tuple(node, action));
capturedRequests.add(new CapturedRequest(node, requestId, action, request));
}
protected boolean nodeConnected(DiscoveryNode discoveryNode) {
return true;
}
@Override
public TransportStats getStats() {
throw new UnsupportedOperationException();
@ -288,7 +273,7 @@ public class CapturingTransport implements Transport {
}
@Override
public TransportAddress[] addressesFromString(String address, int perAddressLimit) throws UnknownHostException {
public TransportAddress[] addressesFromString(String address, int perAddressLimit) {
return new TransportAddress[0];
}
@ -299,22 +284,23 @@ public class CapturingTransport implements Transport {
@Override
public void addLifecycleListener(LifecycleListener listener) {
}
@Override
public void removeLifecycleListener(LifecycleListener listener) {
}
@Override
public void start() {}
public void start() {
}
@Override
public void stop() {}
public void stop() {
}
@Override
public void close() {}
public void close() {
}
@Override
public List<String> getLocalAddresses() {
@ -330,6 +316,7 @@ public class CapturingTransport implements Transport {
requestHandlers = MapBuilder.newMapBuilder(requestHandlers).put(reg.getAction(), reg).immutableMap();
}
}
@Override
public ResponseHandlers getResponseHandlers() {
return responseHandlers;