Make `TcpTransport#openConnection` fully async (#36095)

This is a follow-up to #35144. That commit made the underlying
connection opening process in TcpTransport asynchronous. However the
method still blocked on the process being complete before returning.
This commit moves the blocking to the ConnectionManager level. This is
another step towards the top-level TransportService api being async.
This commit is contained in:
Tim Brooks 2018-11-30 11:30:42 -07:00 committed by GitHub
parent 465a65aa57
commit ea7ea51050
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
15 changed files with 219 additions and 139 deletions

View File

@ -22,6 +22,7 @@ import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.CheckedBiConsumer;
import org.elasticsearch.common.lease.Releasable;
@ -218,7 +219,18 @@ public class ConnectionManager implements Closeable {
}
private Transport.Connection internalOpenConnection(DiscoveryNode node, ConnectionProfile connectionProfile) {
Transport.Connection connection = transport.openConnection(node, connectionProfile);
PlainActionFuture<Transport.Connection> future = PlainActionFuture.newFuture();
Releasable pendingConnection = transport.openConnection(node, connectionProfile, future);
Transport.Connection connection;
try {
connection = future.actionGet();
} catch (IllegalStateException e) {
// If the future was interrupted we must cancel the pending connection to avoid channels leaking
if (e.getCause() instanceof InterruptedException) {
pendingConnection.close();
}
throw e;
}
try {
connectionListener.onConnectionOpened(connection);
} finally {

View File

@ -27,7 +27,6 @@ import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.NotifyOnceListener;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.Booleans;
import org.elasticsearch.common.Strings;
@ -46,6 +45,7 @@ import org.elasticsearch.common.io.stream.NamedWriteableAwareStreamInput;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.io.stream.ReleasableBytesStreamOutput;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.metrics.MeanMetric;
import org.elasticsearch.common.network.CloseableChannel;
import org.elasticsearch.common.network.NetworkAddress;
@ -349,34 +349,24 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements
}
@Override
public NodeChannels openConnection(DiscoveryNode node, ConnectionProfile connectionProfile) {
Objects.requireNonNull(connectionProfile, "connection profile cannot be null");
public Releasable openConnection(DiscoveryNode node, ConnectionProfile profile, ActionListener<Transport.Connection> listener) {
Objects.requireNonNull(profile, "connection profile cannot be null");
if (node == null) {
throw new ConnectTransportException(null, "can't open connection to a null node");
}
connectionProfile = maybeOverrideConnectionProfile(connectionProfile);
ConnectionProfile finalProfile = maybeOverrideConnectionProfile(profile);
closeLock.readLock().lock(); // ensure we don't open connections while we are closing
try {
ensureOpen();
PlainActionFuture<NodeChannels> connectionFuture = PlainActionFuture.newFuture();
List<TcpChannel> pendingChannels = initiateConnection(node, connectionProfile, connectionFuture);
try {
return connectionFuture.actionGet();
} catch (IllegalStateException e) {
// If the future was interrupted we can close the channels to improve the shutdown of the MockTcpTransport
if (e.getCause() instanceof InterruptedException) {
CloseableChannel.closeChannels(pendingChannels, false);
}
throw e;
}
List<TcpChannel> pendingChannels = initiateConnection(node, finalProfile, listener);
return () -> CloseableChannel.closeChannels(pendingChannels, false);
} finally {
closeLock.readLock().unlock();
}
}
private List<TcpChannel> initiateConnection(DiscoveryNode node, ConnectionProfile connectionProfile,
ActionListener<NodeChannels> listener) {
ActionListener<Transport.Connection> listener) {
int numConnections = connectionProfile.getNumConnections();
assert numConnections > 0 : "A connection profile must be configured with at least one connection";
@ -432,7 +422,7 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements
protected void bindServer(ProfileSettings profileSettings) {
// Bind and start to accept incoming connections.
InetAddress hostAddresses[];
InetAddress[] hostAddresses;
List<String> profileBindHosts = profileSettings.bindHosts;
try {
hostAddresses = networkService.resolveBindHostAddresses(profileBindHosts.toArray(Strings.EMPTY_ARRAY));
@ -1581,11 +1571,11 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements
private final DiscoveryNode node;
private final ConnectionProfile connectionProfile;
private final List<TcpChannel> channels;
private final ActionListener<NodeChannels> listener;
private final ActionListener<Transport.Connection> listener;
private final CountDown countDown;
private ChannelsConnectedListener(DiscoveryNode node, ConnectionProfile connectionProfile, List<TcpChannel> channels,
ActionListener<NodeChannels> listener) {
ActionListener<Transport.Connection> listener) {
this.node = node;
this.connectionProfile = connectionProfile;
this.channels = channels;

View File

@ -25,6 +25,7 @@ import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.breaker.CircuitBreaker;
import org.elasticsearch.common.breaker.NoopCircuitBreaker;
import org.elasticsearch.common.component.LifecycleComponent;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Setting.Property;
import org.elasticsearch.common.transport.BoundTransportAddress;
@ -86,10 +87,12 @@ public interface Transport extends LifecycleComponent {
}
/**
* Opens a new connection to the given node and returns it. The returned connection is not managed by
* the transport implementation. This connection must be closed once it's not needed anymore.
* Opens a new connection to the given node. When the connection is fully connected, the listener is
* called. A {@link Releasable} is returned representing the pending connection. If the caller of this
* method decides to move on before the listener is called with the completed connection, they should
* release the pending connection to prevent hanging connections.
*/
Connection openConnection(DiscoveryNode node, ConnectionProfile profile);
Releasable openConnection(DiscoveryNode node, ConnectionProfile profile, ActionListener<Transport.Connection> listener);
TransportStats getStats();

View File

@ -20,6 +20,7 @@
package org.elasticsearch.client.transport;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.cluster.node.liveness.LivenessResponse;
import org.elasticsearch.action.admin.cluster.node.liveness.TransportLivenessAction;
import org.elasticsearch.action.admin.cluster.state.ClusterStateAction;
@ -30,6 +31,7 @@ import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.collect.MapBuilder;
import org.elasticsearch.common.component.Lifecycle;
import org.elasticsearch.common.component.LifecycleListener;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.BoundTransportAddress;
import org.elasticsearch.common.transport.TransportAddress;
@ -79,8 +81,8 @@ abstract class FailAndRetryMockTransport<Response extends TransportResponse> imp
protected abstract ClusterState getMockClusterState(DiscoveryNode node);
@Override
public Connection openConnection(DiscoveryNode node, ConnectionProfile profile) {
return new CloseableConnection() {
public Releasable openConnection(DiscoveryNode node, ConnectionProfile profile, ActionListener<Connection> connectionListener) {
connectionListener.onResponse(new CloseableConnection() {
@Override
public DiscoveryNode getNode() {
@ -134,7 +136,9 @@ abstract class FailAndRetryMockTransport<Response extends TransportResponse> imp
}
}
}
};
});
return () -> {};
}
protected abstract Response newResponse();

View File

@ -26,6 +26,7 @@ import org.elasticsearch.action.admin.cluster.node.liveness.TransportLivenessAct
import org.elasticsearch.action.admin.cluster.state.ClusterStateAction;
import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest;
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.node.DiscoveryNode;
@ -167,7 +168,9 @@ public class TransportClientNodesServiceTests extends ESTestCase {
transportService.addNodeConnectedBehavior((connectionManager, discoveryNode) -> false);
transportService.addGetConnectionBehavior((connectionManager, discoveryNode) -> {
// The FailAndRetryTransport does not use the connection profile
return transport.openConnection(discoveryNode, null);
PlainActionFuture<Transport.Connection> future = PlainActionFuture.newFuture();
transport.openConnection(discoveryNode, null, future);
return future.actionGet();
});
transportService.start();
transportService.acceptIncomingRequests();
@ -358,11 +361,19 @@ public class TransportClientNodesServiceTests extends ESTestCase {
try (MockTransportService clientService = createNewService(clientSettings, Version.CURRENT, threadPool, null)) {
final List<Transport.Connection> establishedConnections = new CopyOnWriteArrayList<>();
clientService.addConnectBehavior(remoteService, (transport, discoveryNode, profile) -> {
Transport.Connection connection = transport.openConnection(discoveryNode, profile);
establishedConnections.add(connection);
return connection;
});
clientService.addConnectBehavior(remoteService, (transport, discoveryNode, profile, listener) ->
transport.openConnection(discoveryNode, profile, new ActionListener<Transport.Connection>() {
@Override
public void onResponse(Transport.Connection connection) {
establishedConnections.add(connection);
listener.onResponse(connection);
}
@Override
public void onFailure(Exception e) {
listener.onFailure(e);
}
}));
clientService.start();

View File

@ -26,6 +26,7 @@ import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.component.Lifecycle;
import org.elasticsearch.common.component.LifecycleListener;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.BoundTransportAddress;
@ -187,8 +188,6 @@ public class NodeConnectionsServiceTests extends ESTestCase {
private final class MockTransport implements Transport {
private ResponseHandlers responseHandlers = new ResponseHandlers();
private volatile boolean randomConnectionExceptions = false;
private TransportMessageListener listener = new TransportMessageListener() {
};
@Override
public <Request extends TransportRequest> void registerRequestHandler(RequestHandlerRegistry<Request> reg) {
@ -201,7 +200,6 @@ public class NodeConnectionsServiceTests extends ESTestCase {
@Override
public void addMessageListener(TransportMessageListener listener) {
this.listener = listener;
}
@Override
@ -225,13 +223,14 @@ public class NodeConnectionsServiceTests extends ESTestCase {
}
@Override
public Connection openConnection(DiscoveryNode node, ConnectionProfile connectionProfile) {
if (connectionProfile == null) {
public Releasable openConnection(DiscoveryNode node, ConnectionProfile profile, ActionListener<Connection> listener) {
if (profile == null) {
if (randomConnectionExceptions && randomBoolean()) {
throw new ConnectTransportException(node, "simulated");
listener.onFailure(new ConnectTransportException(node, "simulated"));
return () -> {};
}
}
Connection connection = new Connection() {
listener.onResponse(new Connection() {
@Override
public DiscoveryNode getNode() {
return node;
@ -257,8 +256,8 @@ public class NodeConnectionsServiceTests extends ESTestCase {
public boolean isClosed() {
return false;
}
};
return connection;
});
return () -> {};
}
@Override

View File

@ -20,6 +20,7 @@
package org.elasticsearch.transport;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.CheckedBiConsumer;
import org.elasticsearch.common.settings.Settings;
@ -35,8 +36,10 @@ import java.net.InetAddress;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
public class ConnectionManagerTests extends ESTestCase {
@ -82,7 +85,11 @@ public class ConnectionManagerTests extends ESTestCase {
DiscoveryNode node = new DiscoveryNode("", new TransportAddress(InetAddress.getLoopbackAddress(), 0), Version.CURRENT);
Transport.Connection connection = new TestConnect(node);
when(transport.openConnection(node, connectionProfile)).thenReturn(connection);
doAnswer(invocationOnMock -> {
ActionListener<Transport.Connection> listener = (ActionListener<Transport.Connection>) invocationOnMock.getArguments()[2];
listener.onResponse(connection);
return null;
}).when(transport).openConnection(eq(node), eq(connectionProfile), any(ActionListener.class));
assertFalse(connectionManager.nodeConnected(node));
@ -126,7 +133,11 @@ public class ConnectionManagerTests extends ESTestCase {
DiscoveryNode node = new DiscoveryNode("", new TransportAddress(InetAddress.getLoopbackAddress(), 0), Version.CURRENT);
Transport.Connection connection = new TestConnect(node);
when(transport.openConnection(node, connectionProfile)).thenReturn(connection);
doAnswer(invocationOnMock -> {
ActionListener<Transport.Connection> listener = (ActionListener<Transport.Connection>) invocationOnMock.getArguments()[2];
listener.onResponse(connection);
return null;
}).when(transport).openConnection(eq(node), eq(connectionProfile), any(ActionListener.class));
assertFalse(connectionManager.nodeConnected(node));

View File

@ -1441,7 +1441,7 @@ public class RemoteClusterConnectionTests extends ESTestCase {
StubbableTransport stubbableTransport = new StubbableTransport(MockTransportService.newMockTransport(Settings.EMPTY, Version
.CURRENT, threadPool));
stubbableTransport.setDefaultConnectBehavior((t, node, profile) -> {
stubbableTransport.setDefaultConnectBehavior((t, node, profile, listener) -> {
Map<String, DiscoveryNode> proxyMapping = nodeMap.get(node.getAddress().toString());
if (proxyMapping == null) {
throw new IllegalStateException("no proxy mapping for node: " + node);
@ -1455,34 +1455,44 @@ public class RemoteClusterConnectionTests extends ESTestCase {
// route by seed hostname
proxyNode = proxyMapping.get(node.getHostName());
}
Transport.Connection connection = t.openConnection(proxyNode, profile);
return new Transport.Connection() {
return t.openConnection(proxyNode, profile, new ActionListener<Transport.Connection>() {
@Override
public DiscoveryNode getNode() {
return node;
public void onResponse(Transport.Connection connection) {
Transport.Connection proxyConnection = new Transport.Connection() {
@Override
public DiscoveryNode getNode() {
return node;
}
@Override
public void sendRequest(long requestId, String action, TransportRequest request,
TransportRequestOptions options) throws IOException, TransportException {
connection.sendRequest(requestId, action, request, options);
}
@Override
public void addCloseListener(ActionListener<Void> listener) {
connection.addCloseListener(listener);
}
@Override
public boolean isClosed() {
return connection.isClosed();
}
@Override
public void close() {
connection.close();
}
};
listener.onResponse(proxyConnection);
}
@Override
public void sendRequest(long requestId, String action, TransportRequest request, TransportRequestOptions options)
throws IOException, TransportException {
connection.sendRequest(requestId, action, request, options);
public void onFailure(Exception e) {
listener.onFailure(e);
}
@Override
public void addCloseListener(ActionListener<Void> listener) {
connection.addCloseListener(listener);
}
@Override
public boolean isClosed() {
return connection.isClosed();
}
@Override
public void close() {
connection.close();
}
};
});
});
return stubbableTransport;
}

View File

@ -21,12 +21,15 @@ package org.elasticsearch.transport;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.compress.CompressorFactory;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.network.CloseableChannel;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.util.BigArrays;
@ -205,16 +208,17 @@ public class TcpTransportTests extends ESTestCase {
}
@Override
public NodeChannels openConnection(DiscoveryNode node, ConnectionProfile connectionProfile) {
public Releasable openConnection(DiscoveryNode node, ConnectionProfile profile, ActionListener<Connection> listener) {
if (compressed) {
assertTrue(connectionProfile.getCompressionEnabled());
assertTrue(profile.getCompressionEnabled());
}
int numConnections = connectionProfile.getNumConnections();
int numConnections = profile.getNumConnections();
ArrayList<TcpChannel> fakeChannels = new ArrayList<>(numConnections);
for (int i = 0; i < numConnections; ++i) {
fakeChannels.add(new FakeTcpChannel(false, messageCaptor));
}
return new NodeChannels(node, fakeChannels, connectionProfile, Version.CURRENT);
listener.onResponse(new NodeChannels(node, fakeChannels, profile, Version.CURRENT));
return () -> CloseableChannel.closeChannels(fakeChannels, false);
}
};
@ -225,7 +229,9 @@ public class TcpTransportTests extends ESTestCase {
} else {
profileBuilder.setCompressionEnabled(false);
}
Transport.Connection connection = transport.openConnection(node, profileBuilder.build());
PlainActionFuture<Transport.Connection> future = PlainActionFuture.newFuture();
transport.openConnection(node, profileBuilder.build(), future);
Transport.Connection connection = future.actionGet();
connection.sendRequest(42, "foobar", request, TransportRequestOptions.EMPTY);
BytesReference reference = messageCaptor.get();

View File

@ -29,6 +29,7 @@ import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.component.Lifecycle;
import org.elasticsearch.common.component.LifecycleListener;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.BoundTransportAddress;
@ -99,7 +100,7 @@ public class CapturingTransport implements Transport {
StubbableConnectionManager connectionManager = new StubbableConnectionManager(new ConnectionManager(settings, this, threadPool),
settings, this, threadPool);
connectionManager.setDefaultNodeConnectedBehavior((cm, discoveryNode) -> nodeConnected(discoveryNode));
connectionManager.setDefaultConnectBehavior((cm, discoveryNode) -> openConnection(discoveryNode, null));
connectionManager.setDefaultGetConnectionBehavior((cm, discoveryNode) -> createConnection(discoveryNode));
return new TransportService(settings, this, threadPool, interceptor, localNodeFactory, clusterSettings, taskHeaders,
connectionManager);
}
@ -223,32 +224,9 @@ public class CapturingTransport implements Transport {
}
@Override
public Connection openConnection(DiscoveryNode node, ConnectionProfile profile) {
return new Connection() {
@Override
public DiscoveryNode getNode() {
return node;
}
@Override
public void sendRequest(long requestId, String action, TransportRequest request, TransportRequestOptions options)
throws TransportException {
onSendRequest(requestId, action, request, node);
}
@Override
public void addCloseListener(ActionListener<Void> listener) {
}
@Override
public boolean isClosed() {
return false;
}
@Override
public void close() {
}
};
public Releasable openConnection(DiscoveryNode node, ConnectionProfile profile, ActionListener<Connection> listener) {
listener.onResponse(createConnection(node));
return () -> {};
}
protected void onSendRequest(long requestId, String action, TransportRequest request, DiscoveryNode node) {
@ -347,4 +325,31 @@ public class CapturingTransport implements Transport {
return false;
}
private Connection createConnection(DiscoveryNode node) {
return new Connection() {
@Override
public DiscoveryNode getNode() {
return node;
}
@Override
public void sendRequest(long requestId, String action, TransportRequest request, TransportRequestOptions options)
throws TransportException {
onSendRequest(requestId, action, request, node);
}
@Override
public void addCloseListener(ActionListener<Void> listener) {
}
@Override
public boolean isClosed() {
return false;
}
@Override
public void close() {
}
};
}
}

View File

@ -217,8 +217,9 @@ public final class MockTransportService extends TransportService {
* is added to fail as well.
*/
public void addFailToSendNoConnectRule(TransportAddress transportAddress) {
transport().addConnectBehavior(transportAddress, (transport, discoveryNode, profile) -> {
throw new ConnectTransportException(discoveryNode, "DISCONNECT: simulated");
transport().addConnectBehavior(transportAddress, (transport, discoveryNode, profile, listener) -> {
listener.onFailure(new ConnectTransportException(discoveryNode, "DISCONNECT: simulated"));
return () -> {};
});
transport().addSendBehavior(transportAddress, (connection, requestId, action, request, options) -> {
@ -279,8 +280,9 @@ public final class MockTransportService extends TransportService {
* and failing to connect once the rule was added.
*/
public void addUnresponsiveRule(TransportAddress transportAddress) {
transport().addConnectBehavior(transportAddress, (transport, discoveryNode, profile) -> {
throw new ConnectTransportException(discoveryNode, "UNRESPONSIVE: simulated");
transport().addConnectBehavior(transportAddress, (transport, discoveryNode, profile, listener) -> {
listener.onFailure(new ConnectTransportException(discoveryNode, "UNRESPONSIVE: simulated"));
return () -> {};
});
transport().addSendBehavior(transportAddress, (connection, requestId, action, request, options) -> {
@ -311,10 +313,10 @@ public final class MockTransportService extends TransportService {
Supplier<TimeValue> delaySupplier = () -> new TimeValue(duration.millis() - (System.currentTimeMillis() - startTime));
transport().addConnectBehavior(transportAddress, (transport, discoveryNode, profile) -> {
transport().addConnectBehavior(transportAddress, (transport, discoveryNode, profile, listener) -> {
TimeValue delay = delaySupplier.get();
if (delay.millis() <= 0) {
return original.openConnection(discoveryNode, profile);
return original.openConnection(discoveryNode, profile, listener);
}
// TODO: Replace with proper setting
@ -322,13 +324,15 @@ public final class MockTransportService extends TransportService {
try {
if (delay.millis() < connectingTimeout.millis()) {
Thread.sleep(delay.millis());
return original.openConnection(discoveryNode, profile);
return original.openConnection(discoveryNode, profile, listener);
} else {
Thread.sleep(connectingTimeout.millis());
throw new ConnectTransportException(discoveryNode, "UNRESPONSIVE: simulated");
listener.onFailure(new ConnectTransportException(discoveryNode, "UNRESPONSIVE: simulated"));
return () -> {};
}
} catch (InterruptedException e) {
throw new ConnectTransportException(discoveryNode, "UNRESPONSIVE: simulated");
listener.onFailure(new ConnectTransportException(discoveryNode, "UNRESPONSIVE: simulated"));
return () -> {};
}
});
@ -468,7 +472,7 @@ public final class MockTransportService extends TransportService {
* @return {@code true} if no default get connection behavior was registered.
*/
public boolean addGetConnectionBehavior(StubbableConnectionManager.GetConnectionBehavior behavior) {
return connectionManager().setDefaultConnectBehavior(behavior);
return connectionManager().setDefaultGetConnectionBehavior(behavior);
}
/**

View File

@ -52,7 +52,7 @@ public class StubbableConnectionManager extends ConnectionManager {
return getConnectionBehaviors.put(transportAddress, connectBehavior) == null;
}
public boolean setDefaultConnectBehavior(GetConnectionBehavior behavior) {
public boolean setDefaultGetConnectionBehavior(GetConnectionBehavior behavior) {
GetConnectionBehavior prior = defaultGetConnectionBehavior;
defaultGetConnectionBehavior = behavior;
return prior == null;

View File

@ -24,6 +24,7 @@ import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.component.Lifecycle;
import org.elasticsearch.common.component.LifecycleListener;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.transport.BoundTransportAddress;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.transport.ConnectionProfile;
@ -127,17 +128,28 @@ public final class StubbableTransport implements Transport {
}
@Override
public Connection openConnection(DiscoveryNode node, ConnectionProfile profile) {
public Releasable openConnection(DiscoveryNode node, ConnectionProfile profile, ActionListener<Connection> listener) {
TransportAddress address = node.getAddress();
OpenConnectionBehavior behavior = connectBehaviors.getOrDefault(address, defaultConnectBehavior);
Connection connection;
if (behavior == null) {
connection = delegate.openConnection(node, profile);
} else {
connection = behavior.openConnection(delegate, node, profile);
}
return new WrappedConnection(connection);
ActionListener<Connection> wrappedListener = new ActionListener<Connection>() {
@Override
public void onResponse(Connection connection) {
listener.onResponse(new WrappedConnection(connection));
}
@Override
public void onFailure(Exception e) {
listener.onFailure(e);
}
};
if (behavior == null) {
return delegate.openConnection(node, profile, wrappedListener);
} else {
return behavior.openConnection(delegate, node, profile, wrappedListener);
}
}
@Override
@ -243,7 +255,9 @@ public final class StubbableTransport implements Transport {
@FunctionalInterface
public interface OpenConnectionBehavior {
Connection openConnection(Transport transport, DiscoveryNode discoveryNode, ConnectionProfile profile);
Releasable openConnection(Transport transport, DiscoveryNode discoveryNode, ConnectionProfile profile,
ActionListener<Connection> listener);
}
@FunctionalInterface

View File

@ -2014,14 +2014,17 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
ConnectionProfile connectionProfile = new ConnectionProfile.Builder(defaultProfile)
.setPingInterval(TimeValue.timeValueMillis(50))
.build();
try (TransportService service = buildService("TS_TPC", Version.CURRENT, null);
TcpTransport.NodeChannels connection = originalTransport.openConnection(
new DiscoveryNode("TS_TPC", "TS_TPC", service.boundAddress().publishAddress(), emptyMap(), emptySet(), version0),
connectionProfile)) {
assertBusy(() -> {
assertTrue(originalTransport.getKeepAlive().successfulPingCount() > 30);
});
assertEquals(0, originalTransport.getKeepAlive().failedPingCount());
try (TransportService service = buildService("TS_TPC", Version.CURRENT, null)) {
PlainActionFuture<Transport.Connection> future = PlainActionFuture.newFuture();
DiscoveryNode node = new DiscoveryNode("TS_TPC", "TS_TPC", service.boundAddress().publishAddress(), emptyMap(), emptySet(),
version0);
originalTransport.openConnection(node, connectionProfile, future);
try (Transport.Connection connection = future.actionGet()) {
assertBusy(() -> {
assertTrue(originalTransport.getKeepAlive().successfulPingCount() > 30);
});
assertEquals(0, originalTransport.getKeepAlive().failedPingCount());
}
}
}
@ -2054,11 +2057,14 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
}
ConnectionProfile connectionProfile = ConnectionProfile.buildDefaultConnectionProfile(Settings.EMPTY);
try (TransportService service = buildService("TS_TPC", Version.CURRENT, null);
TcpTransport.NodeChannels connection = originalTransport.openConnection(
new DiscoveryNode("TS_TPC", "TS_TPC", service.boundAddress().publishAddress(), emptyMap(), emptySet(), version0),
connectionProfile)) {
assertEquals(connection.getVersion(), Version.CURRENT);
try (TransportService service = buildService("TS_TPC", Version.CURRENT, null)) {
DiscoveryNode node = new DiscoveryNode("TS_TPC", "TS_TPC", service.boundAddress().publishAddress(), emptyMap(), emptySet(),
version0);
PlainActionFuture<Transport.Connection> future = PlainActionFuture.newFuture();
originalTransport.openConnection(node, connectionProfile, future);
try (Transport.Connection connection = future.actionGet()) {
assertEquals(connection.getVersion(), Version.CURRENT);
}
}
}

View File

@ -6,6 +6,7 @@
package org.elasticsearch.xpack.security.transport;
import org.elasticsearch.Version;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.SuppressForbidden;
import org.elasticsearch.common.io.stream.OutputStreamStreamOutput;
@ -22,6 +23,7 @@ import org.elasticsearch.transport.BindTransportException;
import org.elasticsearch.transport.ConnectTransportException;
import org.elasticsearch.transport.ConnectionProfile;
import org.elasticsearch.transport.TcpTransport;
import org.elasticsearch.transport.Transport;
import org.elasticsearch.transport.TransportRequestOptions;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.core.common.socket.SocketAccess;
@ -136,11 +138,14 @@ public abstract class AbstractSimpleSecurityTransportTestCase extends AbstractSi
TcpTransport originalTransport = (TcpTransport) serviceA.getOriginalTransport();
ConnectionProfile connectionProfile = ConnectionProfile.buildDefaultConnectionProfile(Settings.EMPTY);
try (TransportService service = buildService("TS_TPC", Version.CURRENT, null);
TcpTransport.NodeChannels connection = originalTransport.openConnection(
new DiscoveryNode("TS_TPC", "TS_TPC", service.boundAddress().publishAddress(), emptyMap(), emptySet(), version0),
connectionProfile)) {
assertEquals(connection.getVersion(), Version.CURRENT);
try (TransportService service = buildService("TS_TPC", Version.CURRENT, null)) {
DiscoveryNode node = new DiscoveryNode("TS_TPC", "TS_TPC", service.boundAddress().publishAddress(), emptyMap(), emptySet(),
version0);
PlainActionFuture<Transport.Connection> future = PlainActionFuture.newFuture();
originalTransport.openConnection(node, connectionProfile, future);
try (TcpTransport.NodeChannels connection = (TcpTransport.NodeChannels) future.actionGet()) {
assertEquals(connection.getVersion(), Version.CURRENT);
}
}
}