Move connection listener to ConnectionManager (#32956)

This is a followup to #31886. After that commit the
TransportConnectionListener had to be propogated to both the
Transport and the ConnectionManager. This commit moves that listener
to completely live in the ConnectionManager. The request and response
related methods are moved to a TransportMessageListener. That listener
continues to live in the Transport class.
This commit is contained in:
Tim Brooks 2018-08-18 10:09:24 -06:00 committed by GitHub
parent f82bb64feb
commit de92d2ef1f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
16 changed files with 167 additions and 189 deletions

View File

@ -22,7 +22,6 @@ package org.elasticsearch.transport.netty4;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.network.CloseableChannel;
import org.elasticsearch.common.network.NetworkService;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings;
@ -87,13 +86,6 @@ public class SimpleNetty4TransportTests extends AbstractSimpleTransportTestCase
return transportService;
}
@Override
protected void closeConnectionChannel(Transport transport, Transport.Connection connection) throws IOException {
final Netty4Transport t = (Netty4Transport) transport;
final TcpTransport.NodeChannels channels = (TcpTransport.NodeChannels) connection;
CloseableChannel.closeChannels(channels.getChannels().subList(0, randomIntBetween(1, channels.getChannels().size())), true);
}
public void testConnectException() throws UnknownHostException {
try {
serviceA.connectToNode(new DiscoveryNode("C", new TransportAddress(InetAddress.getByName("localhost"), 9876),

View File

@ -22,7 +22,6 @@ package org.elasticsearch.transport.nio;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.network.CloseableChannel;
import org.elasticsearch.common.network.NetworkService;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings;
@ -93,12 +92,6 @@ public class SimpleNioTransportTests extends AbstractSimpleTransportTestCase {
return transportService;
}
@Override
protected void closeConnectionChannel(Transport transport, Transport.Connection connection) throws IOException {
TcpTransport.NodeChannels channels = (TcpTransport.NodeChannels) connection;
CloseableChannel.closeChannels(channels.getChannels().subList(0, randomIntBetween(1, channels.getChannels().size())), true);
}
public void testConnectException() throws UnknownHostException {
try {
serviceA.connectToNode(new DiscoveryNode("C", new TransportAddress(InetAddress.getByName("localhost"), 9876),

View File

@ -91,7 +91,8 @@ public class ConnectionManager implements Closeable {
}
public Transport.Connection openConnection(DiscoveryNode node, ConnectionProfile connectionProfile) {
return transport.openConnection(node, ConnectionProfile.resolveConnectionProfile(connectionProfile, defaultProfile));
ConnectionProfile resolvedProfile = ConnectionProfile.resolveConnectionProfile(connectionProfile, defaultProfile);
return internalOpenConnection(node, resolvedProfile);
}
/**
@ -115,7 +116,7 @@ public class ConnectionManager implements Closeable {
}
boolean success = false;
try {
connection = transport.openConnection(node, resolvedProfile);
connection = internalOpenConnection(node, resolvedProfile);
connectionValidator.accept(connection, resolvedProfile);
// we acquire a connection lock, so no way there is an existing connection
connectedNodes.put(node, connection);
@ -227,6 +228,19 @@ public class ConnectionManager implements Closeable {
}
}
private Transport.Connection internalOpenConnection(DiscoveryNode node, ConnectionProfile connectionProfile) {
Transport.Connection connection = transport.openConnection(node, connectionProfile);
try {
connectionListener.onConnectionOpened(connection);
} finally {
connection.addCloseListener(ActionListener.wrap(() -> connectionListener.onConnectionClosed(connection)));
}
if (connection.isClosed()) {
throw new ConnectTransportException(node, "a channel closed while connecting");
}
return connection;
}
private void ensureOpen() {
if (lifecycle.started() == false) {
throw new IllegalStateException("connection manager is closed");
@ -289,6 +303,20 @@ public class ConnectionManager implements Closeable {
listener.onNodeConnected(node);
}
}
@Override
public void onConnectionOpened(Transport.Connection connection) {
for (TransportConnectionListener listener : listeners) {
listener.onConnectionOpened(connection);
}
}
@Override
public void onConnectionClosed(Transport.Connection connection) {
for (TransportConnectionListener listener : listeners) {
listener.onConnectionClosed(connection);
}
}
}
static ConnectionProfile buildDefaultConnectionProfile(Settings settings) {

View File

@ -184,7 +184,7 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements
protected final NetworkService networkService;
protected final Set<ProfileSettings> profileSettings;
private final DelegatingTransportConnectionListener transportListener = new DelegatingTransportConnectionListener();
private final DelegatingTransportMessageListener messageListener = new DelegatingTransportMessageListener();
private final ConcurrentMap<String, BoundTransportAddress> profileBoundAddresses = newConcurrentMap();
private final Map<String, List<TcpServerChannel>> serverChannels = newConcurrentMap();
@ -248,14 +248,12 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements
protected void doStart() {
}
@Override
public void addConnectionListener(TransportConnectionListener listener) {
transportListener.listeners.add(listener);
public void addMessageListener(TransportMessageListener listener) {
messageListener.listeners.add(listener);
}
@Override
public boolean removeConnectionListener(TransportConnectionListener listener) {
return transportListener.listeners.remove(listener);
public boolean removeMessageListener(TransportMessageListener listener) {
return messageListener.listeners.remove(listener);
}
@Override
@ -344,10 +342,6 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements
return connectionTypeHandle.getChannel(channels);
}
boolean allChannelsOpen() {
return channels.stream().allMatch(TcpChannel::isOpen);
}
@Override
public boolean sendPing() {
for (TcpChannel channel : channels) {
@ -481,11 +475,6 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements
// underlying channels.
nodeChannels = new NodeChannels(node, channels, connectionProfile, version);
final NodeChannels finalNodeChannels = nodeChannels;
try {
transportListener.onConnectionOpened(nodeChannels);
} finally {
nodeChannels.addCloseListener(ActionListener.wrap(() -> transportListener.onConnectionClosed(finalNodeChannels)));
}
Consumer<TcpChannel> onClose = c -> {
assert c.isOpen() == false : "channel is still open when onClose is called";
@ -493,10 +482,6 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements
};
nodeChannels.channels.forEach(ch -> ch.addCloseListener(ActionListener.wrap(() -> onClose.accept(ch))));
if (nodeChannels.allChannelsOpen() == false) {
throw new ConnectTransportException(node, "a channel closed while connecting");
}
success = true;
return nodeChannels;
} catch (ConnectTransportException e) {
@ -907,7 +892,7 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements
final TransportRequestOptions finalOptions = options;
// this might be called in a different thread
SendListener onRequestSent = new SendListener(channel, stream,
() -> transportListener.onRequestSent(node, requestId, action, request, finalOptions), message.length());
() -> messageListener.onRequestSent(node, requestId, action, request, finalOptions), message.length());
internalSendMessage(channel, message, onRequestSent);
addedReleaseListener = true;
} finally {
@ -961,7 +946,7 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements
final BytesReference header = buildHeader(requestId, status, nodeVersion, bytes.length());
CompositeBytesReference message = new CompositeBytesReference(header, bytes);
SendListener onResponseSent = new SendListener(channel, null,
() -> transportListener.onResponseSent(requestId, action, error), message.length());
() -> messageListener.onResponseSent(requestId, action, error), message.length());
internalSendMessage(channel, message, onResponseSent);
}
}
@ -1010,7 +995,7 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements
final TransportResponseOptions finalOptions = options;
// this might be called in a different thread
SendListener listener = new SendListener(channel, stream,
() -> transportListener.onResponseSent(requestId, action, response, finalOptions), message.length());
() -> messageListener.onResponseSent(requestId, action, response, finalOptions), message.length());
internalSendMessage(channel, message, listener);
addedReleaseListener = true;
} finally {
@ -1266,7 +1251,7 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements
if (isHandshake) {
handler = pendingHandshakes.remove(requestId);
} else {
TransportResponseHandler theHandler = responseHandlers.onResponseReceived(requestId, transportListener);
TransportResponseHandler theHandler = responseHandlers.onResponseReceived(requestId, messageListener);
if (theHandler == null && TransportStatus.isError(status)) {
handler = pendingHandshakes.remove(requestId);
} else {
@ -1373,7 +1358,7 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements
features = Collections.emptySet();
}
final String action = stream.readString();
transportListener.onRequestReceived(requestId, action);
messageListener.onRequestReceived(requestId, action);
TransportChannel transportChannel = null;
try {
if (TransportStatus.isHandshake(status)) {
@ -1682,26 +1667,27 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements
}
}
private static final class DelegatingTransportConnectionListener implements TransportConnectionListener {
private final List<TransportConnectionListener> listeners = new CopyOnWriteArrayList<>();
private static final class DelegatingTransportMessageListener implements TransportMessageListener {
private final List<TransportMessageListener> listeners = new CopyOnWriteArrayList<>();
@Override
public void onRequestReceived(long requestId, String action) {
for (TransportConnectionListener listener : listeners) {
for (TransportMessageListener listener : listeners) {
listener.onRequestReceived(requestId, action);
}
}
@Override
public void onResponseSent(long requestId, String action, TransportResponse response, TransportResponseOptions finalOptions) {
for (TransportConnectionListener listener : listeners) {
for (TransportMessageListener listener : listeners) {
listener.onResponseSent(requestId, action, response, finalOptions);
}
}
@Override
public void onResponseSent(long requestId, String action, Exception error) {
for (TransportConnectionListener listener : listeners) {
for (TransportMessageListener listener : listeners) {
listener.onResponseSent(requestId, action, error);
}
}
@ -1709,42 +1695,14 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements
@Override
public void onRequestSent(DiscoveryNode node, long requestId, String action, TransportRequest request,
TransportRequestOptions finalOptions) {
for (TransportConnectionListener listener : listeners) {
for (TransportMessageListener listener : listeners) {
listener.onRequestSent(node, requestId, action, request, finalOptions);
}
}
@Override
public void onNodeDisconnected(DiscoveryNode key) {
for (TransportConnectionListener listener : listeners) {
listener.onNodeDisconnected(key);
}
}
@Override
public void onConnectionOpened(Connection nodeChannels) {
for (TransportConnectionListener listener : listeners) {
listener.onConnectionOpened(nodeChannels);
}
}
@Override
public void onNodeConnected(DiscoveryNode node) {
for (TransportConnectionListener listener : listeners) {
listener.onNodeConnected(node);
}
}
@Override
public void onConnectionClosed(Connection nodeChannels) {
for (TransportConnectionListener listener : listeners) {
listener.onConnectionClosed(nodeChannels);
}
}
@Override
public void onResponseReceived(long requestId, ResponseContext holder) {
for (TransportConnectionListener listener : listeners) {
for (TransportMessageListener listener : listeners) {
listener.onResponseReceived(requestId, holder);
}
}

View File

@ -56,18 +56,9 @@ public interface Transport extends LifecycleComponent {
*/
RequestHandlerRegistry getRequestHandler(String action);
/**
* Adds a new event listener
* @param listener the listener to add
*/
void addConnectionListener(TransportConnectionListener listener);
void addMessageListener(TransportMessageListener listener);
/**
* Removes an event listener
* @param listener the listener to remove
* @return <code>true</code> iff the listener was removed otherwise <code>false</code>
*/
boolean removeConnectionListener(TransportConnectionListener listener);
boolean removeMessageListener(TransportMessageListener listener);
/**
* The address the transport is bound on.
@ -254,7 +245,7 @@ public interface Transport extends LifecycleComponent {
* sent request (before any processing or deserialization was done). Returns the appropriate response handler or null if not
* found.
*/
public TransportResponseHandler onResponseReceived(final long requestId, TransportConnectionListener listener) {
public TransportResponseHandler onResponseReceived(final long requestId, TransportMessageListener listener) {
ResponseContext context = handlers.remove(requestId);
listener.onResponseReceived(requestId, context);
if (context == null) {

View File

@ -28,42 +28,6 @@ import org.elasticsearch.cluster.node.DiscoveryNode;
*/
public interface TransportConnectionListener {
/**
* Called once a request is received
* @param requestId the internal request ID
* @param action the request action
*
*/
default void onRequestReceived(long requestId, String action) {}
/**
* Called for every action response sent after the response has been passed to the underlying network implementation.
* @param requestId the request ID (unique per client)
* @param action the request action
* @param response the response send
* @param finalOptions the response options
*/
default void onResponseSent(long requestId, String action, TransportResponse response, TransportResponseOptions finalOptions) {}
/***
* Called for every failed action response after the response has been passed to the underlying network implementation.
* @param requestId the request ID (unique per client)
* @param action the request action
* @param error the error sent back to the caller
*/
default void onResponseSent(long requestId, String action, Exception error) {}
/**
* Called for every request sent to a server after the request has been passed to the underlying network implementation
* @param node the node the request was sent to
* @param requestId the internal request id
* @param action the action name
* @param request the actual request
* @param finalOptions the request options
*/
default void onRequestSent(DiscoveryNode node, long requestId, String action, TransportRequest request,
TransportRequestOptions finalOptions) {}
/**
* Called once a connection was opened
* @param connection the connection
@ -76,13 +40,6 @@ public interface TransportConnectionListener {
*/
default void onConnectionClosed(Transport.Connection connection) {}
/**
* Called for every response received
* @param requestId the request id for this reponse
* @param context the response context or null if the context was already processed ie. due to a timeout.
*/
default void onResponseReceived(long requestId, Transport.ResponseContext context) {}
/**
* Called once a node connection is opened and registered.
*/

View File

@ -0,0 +1,67 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.transport;
import org.elasticsearch.cluster.node.DiscoveryNode;
public interface TransportMessageListener {
/**
* Called once a request is received
* @param requestId the internal request ID
* @param action the request action
*
*/
default void onRequestReceived(long requestId, String action) {}
/**
* Called for every action response sent after the response has been passed to the underlying network implementation.
* @param requestId the request ID (unique per client)
* @param action the request action
* @param response the response send
* @param finalOptions the response options
*/
default void onResponseSent(long requestId, String action, TransportResponse response, TransportResponseOptions finalOptions) {}
/***
* Called for every failed action response after the response has been passed to the underlying network implementation.
* @param requestId the request ID (unique per client)
* @param action the request action
* @param error the error sent back to the caller
*/
default void onResponseSent(long requestId, String action, Exception error) {}
/**
* Called for every request sent to a server after the request has been passed to the underlying network implementation
* @param node the node the request was sent to
* @param requestId the internal request id
* @param action the action name
* @param request the actual request
* @param finalOptions the request options
*/
default void onRequestSent(DiscoveryNode node, long requestId, String action, TransportRequest request,
TransportRequestOptions finalOptions) {}
/**
* Called for every response received
* @param requestId the request id for this reponse
* @param context the response context or null if the context was already processed ie. due to a timeout.
*/
default void onResponseReceived(long requestId, Transport.ResponseContext context) {}
}

View File

@ -77,7 +77,7 @@ import static org.elasticsearch.common.settings.Setting.intSetting;
import static org.elasticsearch.common.settings.Setting.listSetting;
import static org.elasticsearch.common.settings.Setting.timeSetting;
public class TransportService extends AbstractLifecycleComponent implements TransportConnectionListener {
public class TransportService extends AbstractLifecycleComponent implements TransportMessageListener, TransportConnectionListener {
public static final Setting<Integer> CONNECTIONS_PER_NODE_RECOVERY =
intSetting("transport.connections_per_node.recovery", 2, 1, Setting.Property.NodeScope);
@ -248,7 +248,8 @@ public class TransportService extends AbstractLifecycleComponent implements Tran
@Override
protected void doStart() {
transport.addConnectionListener(this);
transport.addMessageListener(this);
connectionManager.addListener(this);
transport.start();
if (transport.boundAddress() != null && logger.isInfoEnabled()) {
logger.info("{}", transport.boundAddress());
@ -506,12 +507,10 @@ public class TransportService extends AbstractLifecycleComponent implements Tran
}
public void addConnectionListener(TransportConnectionListener listener) {
transport.addConnectionListener(listener);
connectionManager.addListener(listener);
}
public void removeConnectionListener(TransportConnectionListener listener) {
transport.removeConnectionListener(listener);
connectionManager.removeListener(listener);
}

View File

@ -38,8 +38,8 @@ import org.elasticsearch.transport.ConnectTransportException;
import org.elasticsearch.transport.ConnectionProfile;
import org.elasticsearch.transport.RequestHandlerRegistry;
import org.elasticsearch.transport.Transport;
import org.elasticsearch.transport.TransportConnectionListener;
import org.elasticsearch.transport.TransportException;
import org.elasticsearch.transport.TransportMessageListener;
import org.elasticsearch.transport.TransportRequest;
import org.elasticsearch.transport.TransportRequestOptions;
import org.elasticsearch.transport.TransportResponse;
@ -62,7 +62,7 @@ abstract class FailAndRetryMockTransport<Response extends TransportResponse> imp
private volatile Map<String, RequestHandlerRegistry> requestHandlers = Collections.emptyMap();
private final Object requestHandlerMutex = new Object();
private final ResponseHandlers responseHandlers = new ResponseHandlers();
private TransportConnectionListener listener;
private TransportMessageListener listener;
private boolean connectMode = true;
@ -223,13 +223,15 @@ abstract class FailAndRetryMockTransport<Response extends TransportResponse> imp
return requestHandlers.get(action);
}
@Override
public void addConnectionListener(TransportConnectionListener listener) {
public void addMessageListener(TransportMessageListener listener) {
this.listener = listener;
}
@Override
public boolean removeConnectionListener(TransportConnectionListener listener) {
public boolean removeMessageListener(TransportMessageListener listener) {
throw new UnsupportedOperationException();
}
}

View File

@ -37,9 +37,9 @@ import org.elasticsearch.transport.ConnectTransportException;
import org.elasticsearch.transport.ConnectionProfile;
import org.elasticsearch.transport.RequestHandlerRegistry;
import org.elasticsearch.transport.Transport;
import org.elasticsearch.transport.TransportConnectionListener;
import org.elasticsearch.transport.TransportException;
import org.elasticsearch.transport.TransportInterceptor;
import org.elasticsearch.transport.TransportMessageListener;
import org.elasticsearch.transport.TransportRequest;
import org.elasticsearch.transport.TransportRequestOptions;
import org.elasticsearch.transport.TransportService;
@ -107,7 +107,6 @@ public class NodeConnectionsServiceTests extends ESTestCase {
assertConnectedExactlyToNodes(event.state());
}
public void testReconnect() {
List<DiscoveryNode> nodes = generateNodes();
NodeConnectionsService service = new NodeConnectionsService(Settings.EMPTY, threadPool, transportService);
@ -188,7 +187,7 @@ public class NodeConnectionsServiceTests extends ESTestCase {
private final class MockTransport implements Transport {
private ResponseHandlers responseHandlers = new ResponseHandlers();
private volatile boolean randomConnectionExceptions = false;
private TransportConnectionListener listener = new TransportConnectionListener() {
private TransportMessageListener listener = new TransportMessageListener() {
};
@Override
@ -201,12 +200,12 @@ public class NodeConnectionsServiceTests extends ESTestCase {
}
@Override
public void addConnectionListener(TransportConnectionListener listener) {
public void addMessageListener(TransportMessageListener listener) {
this.listener = listener;
}
@Override
public boolean removeConnectionListener(TransportConnectionListener listener) {
public boolean removeMessageListener(TransportMessageListener listener) {
throw new UnsupportedOperationException();
}
@ -231,7 +230,6 @@ public class NodeConnectionsServiceTests extends ESTestCase {
if (randomConnectionExceptions && randomBoolean()) {
throw new ConnectTransportException(node, "simulated");
}
listener.onNodeConnected(node);
}
Connection connection = new Connection() {
@Override
@ -260,7 +258,6 @@ public class NodeConnectionsServiceTests extends ESTestCase {
return false;
}
};
listener.onConnectionOpened(connection);
return connection;
}

View File

@ -41,9 +41,9 @@ import org.elasticsearch.transport.RemoteTransportException;
import org.elasticsearch.transport.RequestHandlerRegistry;
import org.elasticsearch.transport.SendRequestTransportException;
import org.elasticsearch.transport.Transport;
import org.elasticsearch.transport.TransportConnectionListener;
import org.elasticsearch.transport.TransportException;
import org.elasticsearch.transport.TransportInterceptor;
import org.elasticsearch.transport.TransportMessageListener;
import org.elasticsearch.transport.TransportRequest;
import org.elasticsearch.transport.TransportRequestOptions;
import org.elasticsearch.transport.TransportResponse;
@ -72,7 +72,7 @@ public class CapturingTransport implements Transport {
private volatile Map<String, RequestHandlerRegistry> requestHandlers = Collections.emptyMap();
final Object requestHandlerMutex = new Object();
private final ResponseHandlers responseHandlers = new ResponseHandlers();
private TransportConnectionListener listener;
private TransportMessageListener listener;
public static class CapturedRequest {
public final DiscoveryNode node;
@ -341,7 +341,7 @@ public class CapturingTransport implements Transport {
}
@Override
public void addConnectionListener(TransportConnectionListener listener) {
public void addMessageListener(TransportMessageListener listener) {
if (this.listener != null) {
throw new IllegalStateException("listener already set");
}
@ -349,11 +349,12 @@ public class CapturingTransport implements Transport {
}
@Override
public boolean removeConnectionListener(TransportConnectionListener listener) {
public boolean removeMessageListener(TransportMessageListener listener) {
if (listener == this.listener) {
this.listener = null;
return true;
}
return false;
}
}

View File

@ -29,8 +29,8 @@ import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.transport.ConnectionProfile;
import org.elasticsearch.transport.RequestHandlerRegistry;
import org.elasticsearch.transport.Transport;
import org.elasticsearch.transport.TransportConnectionListener;
import org.elasticsearch.transport.TransportException;
import org.elasticsearch.transport.TransportMessageListener;
import org.elasticsearch.transport.TransportRequest;
import org.elasticsearch.transport.TransportRequestOptions;
import org.elasticsearch.transport.TransportStats;
@ -86,13 +86,13 @@ public class StubbableTransport implements Transport {
}
@Override
public void addConnectionListener(TransportConnectionListener listener) {
delegate.addConnectionListener(listener);
public void addMessageListener(TransportMessageListener listener) {
delegate.addMessageListener(listener);
}
@Override
public boolean removeConnectionListener(TransportConnectionListener listener) {
return delegate.removeConnectionListener(listener);
public boolean removeMessageListener(TransportMessageListener listener) {
return delegate.removeMessageListener(listener);
}
@Override
@ -179,7 +179,7 @@ public class StubbableTransport implements Transport {
return delegate.profileBoundAddresses();
}
private class WrappedConnection implements Transport.Connection {
public class WrappedConnection implements Transport.Connection {
private final Transport.Connection connection;
@ -234,6 +234,10 @@ public class StubbableTransport implements Transport {
public void close() {
connection.close();
}
public Transport.Connection getConnection() {
return connection;
}
}
@FunctionalInterface

View File

@ -34,6 +34,7 @@ import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.network.CloseableChannel;
import org.elasticsearch.common.network.NetworkService;
import org.elasticsearch.common.network.NetworkUtils;
import org.elasticsearch.common.settings.ClusterSettings;
@ -52,6 +53,7 @@ import org.elasticsearch.tasks.Task;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.VersionUtils;
import org.elasticsearch.test.transport.MockTransportService;
import org.elasticsearch.test.transport.StubbableTransport;
import org.elasticsearch.threadpool.TestThreadPool;
import org.elasticsearch.threadpool.ThreadPool;
import org.junit.After;
@ -2642,15 +2644,22 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
public void testChannelCloseWhileConnecting() {
try (MockTransportService service = build(Settings.builder().put("name", "close").build(), version0, null, true)) {
service.transport.addConnectionListener(new TransportConnectionListener() {
AtomicBoolean connectionClosedListenerCalled = new AtomicBoolean(false);
service.addConnectionListener(new TransportConnectionListener() {
@Override
public void onConnectionOpened(final Transport.Connection connection) {
closeConnectionChannel(connection);
try {
closeConnectionChannel(service.getOriginalTransport(), connection);
} catch (final IOException e) {
assertBusy(connection::isClosed);
} catch (Exception e) {
throw new AssertionError(e);
}
}
@Override
public void onConnectionClosed(Transport.Connection connection) {
connectionClosedListenerCalled.set(true);
}
});
final ConnectionProfile.Builder builder = new ConnectionProfile.Builder();
builder.addConnections(1,
@ -2662,10 +2671,15 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
final ConnectTransportException e =
expectThrows(ConnectTransportException.class, () -> service.openConnection(nodeA, builder.build()));
assertThat(e, hasToString(containsString(("a channel closed while connecting"))));
assertTrue(connectionClosedListenerCalled.get());
}
}
protected abstract void closeConnectionChannel(Transport transport, Transport.Connection connection) throws IOException;
private void closeConnectionChannel(Transport.Connection connection) {
StubbableTransport.WrappedConnection wrappedConnection = (StubbableTransport.WrappedConnection) connection;
TcpTransport.NodeChannels channels = (TcpTransport.NodeChannels) wrappedConnection.getConnection();
CloseableChannel.closeChannels(channels.getChannels().subList(0, randomIntBetween(1, channels.getChannels().size())), true);
}
@SuppressForbidden(reason = "need local ephemeral port")
private InetSocketAddress getLocalEphemeral() throws UnknownHostException {

View File

@ -21,7 +21,6 @@ package org.elasticsearch.transport;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.network.CloseableChannel;
import org.elasticsearch.common.network.NetworkService;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings;
@ -60,13 +59,4 @@ public class MockTcpTransportTests extends AbstractSimpleTransportTestCase {
public int channelsPerNodeConnection() {
return 1;
}
@Override
protected void closeConnectionChannel(Transport transport, Transport.Connection connection) throws IOException {
final MockTcpTransport t = (MockTcpTransport) transport;
final TcpTransport.NodeChannels channels =
(TcpTransport.NodeChannels) connection;
CloseableChannel.closeChannels(channels.getChannels().subList(0, randomIntBetween(1, channels.getChannels().size())), true);
}
}

View File

@ -22,7 +22,6 @@ package org.elasticsearch.transport.nio;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.network.CloseableChannel;
import org.elasticsearch.common.network.NetworkService;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings;
@ -99,12 +98,6 @@ public class SimpleMockNioTransportTests extends AbstractSimpleTransportTestCase
return 3;
}
@Override
protected void closeConnectionChannel(Transport transport, Transport.Connection connection) throws IOException {
TcpTransport.NodeChannels channels = (TcpTransport.NodeChannels) connection;
CloseableChannel.closeChannels(channels.getChannels().subList(0, randomIntBetween(1, channels.getChannels().size())), true);
}
public void testConnectException() throws UnknownHostException {
try {
serviceA.connectToNode(new DiscoveryNode("C", new TransportAddress(InetAddress.getByName("localhost"), 9876),

View File

@ -10,7 +10,6 @@ import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.SuppressForbidden;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.io.stream.OutputStreamStreamOutput;
import org.elasticsearch.common.network.CloseableChannel;
import org.elasticsearch.common.network.NetworkService;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.MockSecureSettings;
@ -35,6 +34,9 @@ import org.elasticsearch.xpack.core.common.socket.SocketAccess;
import org.elasticsearch.xpack.core.ssl.SSLConfiguration;
import org.elasticsearch.xpack.core.ssl.SSLService;
import javax.net.SocketFactory;
import javax.net.ssl.HandshakeCompletedListener;
import javax.net.ssl.SSLSocket;
import java.io.IOException;
import java.net.InetAddress;
import java.net.SocketTimeoutException;
@ -44,10 +46,6 @@ import java.util.Collections;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;
import javax.net.SocketFactory;
import javax.net.ssl.HandshakeCompletedListener;
import javax.net.ssl.SSLSocket;
import static java.util.Collections.emptyMap;
import static java.util.Collections.emptySet;
import static org.hamcrest.Matchers.containsString;
@ -118,12 +116,6 @@ public class SimpleSecurityNioTransportTests extends AbstractSimpleTransportTest
return transportService;
}
@Override
protected void closeConnectionChannel(Transport transport, Transport.Connection connection) throws IOException {
TcpTransport.NodeChannels channels = (TcpTransport.NodeChannels) connection;
CloseableChannel.closeChannels(channels.getChannels().subList(0, randomIntBetween(1, channels.getChannels().size())), true);
}
public void testConnectException() throws UnknownHostException {
try {
serviceA.connectToNode(new DiscoveryNode("C", new TransportAddress(InetAddress.getByName("localhost"), 9876),