diff --git a/modules/transport-netty4/src/test/java/org/elasticsearch/transport/netty4/SimpleNetty4TransportTests.java b/modules/transport-netty4/src/test/java/org/elasticsearch/transport/netty4/SimpleNetty4TransportTests.java index 8d628ace2ee..9d6f016086c 100644 --- a/modules/transport-netty4/src/test/java/org/elasticsearch/transport/netty4/SimpleNetty4TransportTests.java +++ b/modules/transport-netty4/src/test/java/org/elasticsearch/transport/netty4/SimpleNetty4TransportTests.java @@ -22,7 +22,6 @@ package org.elasticsearch.transport.netty4; import org.elasticsearch.Version; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; -import org.elasticsearch.common.network.CloseableChannel; import org.elasticsearch.common.network.NetworkService; import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Settings; @@ -87,13 +86,6 @@ public class SimpleNetty4TransportTests extends AbstractSimpleTransportTestCase return transportService; } - @Override - protected void closeConnectionChannel(Transport transport, Transport.Connection connection) throws IOException { - final Netty4Transport t = (Netty4Transport) transport; - final TcpTransport.NodeChannels channels = (TcpTransport.NodeChannels) connection; - CloseableChannel.closeChannels(channels.getChannels().subList(0, randomIntBetween(1, channels.getChannels().size())), true); - } - public void testConnectException() throws UnknownHostException { try { serviceA.connectToNode(new DiscoveryNode("C", new TransportAddress(InetAddress.getByName("localhost"), 9876), diff --git a/plugins/transport-nio/src/test/java/org/elasticsearch/transport/nio/SimpleNioTransportTests.java b/plugins/transport-nio/src/test/java/org/elasticsearch/transport/nio/SimpleNioTransportTests.java index 9322bfd7122..baae00f81a3 100644 --- a/plugins/transport-nio/src/test/java/org/elasticsearch/transport/nio/SimpleNioTransportTests.java +++ b/plugins/transport-nio/src/test/java/org/elasticsearch/transport/nio/SimpleNioTransportTests.java @@ -22,7 +22,6 @@ package org.elasticsearch.transport.nio; import org.elasticsearch.Version; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; -import org.elasticsearch.common.network.CloseableChannel; import org.elasticsearch.common.network.NetworkService; import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Settings; @@ -93,12 +92,6 @@ public class SimpleNioTransportTests extends AbstractSimpleTransportTestCase { return transportService; } - @Override - protected void closeConnectionChannel(Transport transport, Transport.Connection connection) throws IOException { - TcpTransport.NodeChannels channels = (TcpTransport.NodeChannels) connection; - CloseableChannel.closeChannels(channels.getChannels().subList(0, randomIntBetween(1, channels.getChannels().size())), true); - } - public void testConnectException() throws UnknownHostException { try { serviceA.connectToNode(new DiscoveryNode("C", new TransportAddress(InetAddress.getByName("localhost"), 9876), diff --git a/server/src/main/java/org/elasticsearch/transport/ConnectionManager.java b/server/src/main/java/org/elasticsearch/transport/ConnectionManager.java index 1ff8b701a83..84c337399d5 100644 --- a/server/src/main/java/org/elasticsearch/transport/ConnectionManager.java +++ b/server/src/main/java/org/elasticsearch/transport/ConnectionManager.java @@ -91,7 +91,8 @@ public class ConnectionManager implements Closeable { } public Transport.Connection openConnection(DiscoveryNode node, ConnectionProfile connectionProfile) { - return transport.openConnection(node, ConnectionProfile.resolveConnectionProfile(connectionProfile, defaultProfile)); + ConnectionProfile resolvedProfile = ConnectionProfile.resolveConnectionProfile(connectionProfile, defaultProfile); + return internalOpenConnection(node, resolvedProfile); } /** @@ -115,7 +116,7 @@ public class ConnectionManager implements Closeable { } boolean success = false; try { - connection = transport.openConnection(node, resolvedProfile); + connection = internalOpenConnection(node, resolvedProfile); connectionValidator.accept(connection, resolvedProfile); // we acquire a connection lock, so no way there is an existing connection connectedNodes.put(node, connection); @@ -227,6 +228,19 @@ public class ConnectionManager implements Closeable { } } + private Transport.Connection internalOpenConnection(DiscoveryNode node, ConnectionProfile connectionProfile) { + Transport.Connection connection = transport.openConnection(node, connectionProfile); + try { + connectionListener.onConnectionOpened(connection); + } finally { + connection.addCloseListener(ActionListener.wrap(() -> connectionListener.onConnectionClosed(connection))); + } + if (connection.isClosed()) { + throw new ConnectTransportException(node, "a channel closed while connecting"); + } + return connection; + } + private void ensureOpen() { if (lifecycle.started() == false) { throw new IllegalStateException("connection manager is closed"); @@ -289,6 +303,20 @@ public class ConnectionManager implements Closeable { listener.onNodeConnected(node); } } + + @Override + public void onConnectionOpened(Transport.Connection connection) { + for (TransportConnectionListener listener : listeners) { + listener.onConnectionOpened(connection); + } + } + + @Override + public void onConnectionClosed(Transport.Connection connection) { + for (TransportConnectionListener listener : listeners) { + listener.onConnectionClosed(connection); + } + } } static ConnectionProfile buildDefaultConnectionProfile(Settings settings) { diff --git a/server/src/main/java/org/elasticsearch/transport/TcpTransport.java b/server/src/main/java/org/elasticsearch/transport/TcpTransport.java index 0b82417cfaa..6d4ab80a892 100644 --- a/server/src/main/java/org/elasticsearch/transport/TcpTransport.java +++ b/server/src/main/java/org/elasticsearch/transport/TcpTransport.java @@ -184,7 +184,7 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements protected final NetworkService networkService; protected final Set profileSettings; - private final DelegatingTransportConnectionListener transportListener = new DelegatingTransportConnectionListener(); + private final DelegatingTransportMessageListener messageListener = new DelegatingTransportMessageListener(); private final ConcurrentMap profileBoundAddresses = newConcurrentMap(); private final Map> serverChannels = newConcurrentMap(); @@ -248,14 +248,12 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements protected void doStart() { } - @Override - public void addConnectionListener(TransportConnectionListener listener) { - transportListener.listeners.add(listener); + public void addMessageListener(TransportMessageListener listener) { + messageListener.listeners.add(listener); } - @Override - public boolean removeConnectionListener(TransportConnectionListener listener) { - return transportListener.listeners.remove(listener); + public boolean removeMessageListener(TransportMessageListener listener) { + return messageListener.listeners.remove(listener); } @Override @@ -344,10 +342,6 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements return connectionTypeHandle.getChannel(channels); } - boolean allChannelsOpen() { - return channels.stream().allMatch(TcpChannel::isOpen); - } - @Override public boolean sendPing() { for (TcpChannel channel : channels) { @@ -481,11 +475,6 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements // underlying channels. nodeChannels = new NodeChannels(node, channels, connectionProfile, version); final NodeChannels finalNodeChannels = nodeChannels; - try { - transportListener.onConnectionOpened(nodeChannels); - } finally { - nodeChannels.addCloseListener(ActionListener.wrap(() -> transportListener.onConnectionClosed(finalNodeChannels))); - } Consumer onClose = c -> { assert c.isOpen() == false : "channel is still open when onClose is called"; @@ -493,10 +482,6 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements }; nodeChannels.channels.forEach(ch -> ch.addCloseListener(ActionListener.wrap(() -> onClose.accept(ch)))); - - if (nodeChannels.allChannelsOpen() == false) { - throw new ConnectTransportException(node, "a channel closed while connecting"); - } success = true; return nodeChannels; } catch (ConnectTransportException e) { @@ -907,7 +892,7 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements final TransportRequestOptions finalOptions = options; // this might be called in a different thread SendListener onRequestSent = new SendListener(channel, stream, - () -> transportListener.onRequestSent(node, requestId, action, request, finalOptions), message.length()); + () -> messageListener.onRequestSent(node, requestId, action, request, finalOptions), message.length()); internalSendMessage(channel, message, onRequestSent); addedReleaseListener = true; } finally { @@ -961,7 +946,7 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements final BytesReference header = buildHeader(requestId, status, nodeVersion, bytes.length()); CompositeBytesReference message = new CompositeBytesReference(header, bytes); SendListener onResponseSent = new SendListener(channel, null, - () -> transportListener.onResponseSent(requestId, action, error), message.length()); + () -> messageListener.onResponseSent(requestId, action, error), message.length()); internalSendMessage(channel, message, onResponseSent); } } @@ -1010,7 +995,7 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements final TransportResponseOptions finalOptions = options; // this might be called in a different thread SendListener listener = new SendListener(channel, stream, - () -> transportListener.onResponseSent(requestId, action, response, finalOptions), message.length()); + () -> messageListener.onResponseSent(requestId, action, response, finalOptions), message.length()); internalSendMessage(channel, message, listener); addedReleaseListener = true; } finally { @@ -1266,7 +1251,7 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements if (isHandshake) { handler = pendingHandshakes.remove(requestId); } else { - TransportResponseHandler theHandler = responseHandlers.onResponseReceived(requestId, transportListener); + TransportResponseHandler theHandler = responseHandlers.onResponseReceived(requestId, messageListener); if (theHandler == null && TransportStatus.isError(status)) { handler = pendingHandshakes.remove(requestId); } else { @@ -1373,7 +1358,7 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements features = Collections.emptySet(); } final String action = stream.readString(); - transportListener.onRequestReceived(requestId, action); + messageListener.onRequestReceived(requestId, action); TransportChannel transportChannel = null; try { if (TransportStatus.isHandshake(status)) { @@ -1682,26 +1667,27 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements } } - private static final class DelegatingTransportConnectionListener implements TransportConnectionListener { - private final List listeners = new CopyOnWriteArrayList<>(); + private static final class DelegatingTransportMessageListener implements TransportMessageListener { + + private final List listeners = new CopyOnWriteArrayList<>(); @Override public void onRequestReceived(long requestId, String action) { - for (TransportConnectionListener listener : listeners) { + for (TransportMessageListener listener : listeners) { listener.onRequestReceived(requestId, action); } } @Override public void onResponseSent(long requestId, String action, TransportResponse response, TransportResponseOptions finalOptions) { - for (TransportConnectionListener listener : listeners) { + for (TransportMessageListener listener : listeners) { listener.onResponseSent(requestId, action, response, finalOptions); } } @Override public void onResponseSent(long requestId, String action, Exception error) { - for (TransportConnectionListener listener : listeners) { + for (TransportMessageListener listener : listeners) { listener.onResponseSent(requestId, action, error); } } @@ -1709,42 +1695,14 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements @Override public void onRequestSent(DiscoveryNode node, long requestId, String action, TransportRequest request, TransportRequestOptions finalOptions) { - for (TransportConnectionListener listener : listeners) { + for (TransportMessageListener listener : listeners) { listener.onRequestSent(node, requestId, action, request, finalOptions); } } - @Override - public void onNodeDisconnected(DiscoveryNode key) { - for (TransportConnectionListener listener : listeners) { - listener.onNodeDisconnected(key); - } - } - - @Override - public void onConnectionOpened(Connection nodeChannels) { - for (TransportConnectionListener listener : listeners) { - listener.onConnectionOpened(nodeChannels); - } - } - - @Override - public void onNodeConnected(DiscoveryNode node) { - for (TransportConnectionListener listener : listeners) { - listener.onNodeConnected(node); - } - } - - @Override - public void onConnectionClosed(Connection nodeChannels) { - for (TransportConnectionListener listener : listeners) { - listener.onConnectionClosed(nodeChannels); - } - } - @Override public void onResponseReceived(long requestId, ResponseContext holder) { - for (TransportConnectionListener listener : listeners) { + for (TransportMessageListener listener : listeners) { listener.onResponseReceived(requestId, holder); } } diff --git a/server/src/main/java/org/elasticsearch/transport/Transport.java b/server/src/main/java/org/elasticsearch/transport/Transport.java index 9538119f43b..90adf2ab9e7 100644 --- a/server/src/main/java/org/elasticsearch/transport/Transport.java +++ b/server/src/main/java/org/elasticsearch/transport/Transport.java @@ -56,18 +56,9 @@ public interface Transport extends LifecycleComponent { */ RequestHandlerRegistry getRequestHandler(String action); - /** - * Adds a new event listener - * @param listener the listener to add - */ - void addConnectionListener(TransportConnectionListener listener); + void addMessageListener(TransportMessageListener listener); - /** - * Removes an event listener - * @param listener the listener to remove - * @return true iff the listener was removed otherwise false - */ - boolean removeConnectionListener(TransportConnectionListener listener); + boolean removeMessageListener(TransportMessageListener listener); /** * The address the transport is bound on. @@ -254,7 +245,7 @@ public interface Transport extends LifecycleComponent { * sent request (before any processing or deserialization was done). Returns the appropriate response handler or null if not * found. */ - public TransportResponseHandler onResponseReceived(final long requestId, TransportConnectionListener listener) { + public TransportResponseHandler onResponseReceived(final long requestId, TransportMessageListener listener) { ResponseContext context = handlers.remove(requestId); listener.onResponseReceived(requestId, context); if (context == null) { diff --git a/server/src/main/java/org/elasticsearch/transport/TransportConnectionListener.java b/server/src/main/java/org/elasticsearch/transport/TransportConnectionListener.java index 0ee2ed5828d..c41a328637c 100644 --- a/server/src/main/java/org/elasticsearch/transport/TransportConnectionListener.java +++ b/server/src/main/java/org/elasticsearch/transport/TransportConnectionListener.java @@ -28,42 +28,6 @@ import org.elasticsearch.cluster.node.DiscoveryNode; */ public interface TransportConnectionListener { - /** - * Called once a request is received - * @param requestId the internal request ID - * @param action the request action - * - */ - default void onRequestReceived(long requestId, String action) {} - - /** - * Called for every action response sent after the response has been passed to the underlying network implementation. - * @param requestId the request ID (unique per client) - * @param action the request action - * @param response the response send - * @param finalOptions the response options - */ - default void onResponseSent(long requestId, String action, TransportResponse response, TransportResponseOptions finalOptions) {} - - /*** - * Called for every failed action response after the response has been passed to the underlying network implementation. - * @param requestId the request ID (unique per client) - * @param action the request action - * @param error the error sent back to the caller - */ - default void onResponseSent(long requestId, String action, Exception error) {} - - /** - * Called for every request sent to a server after the request has been passed to the underlying network implementation - * @param node the node the request was sent to - * @param requestId the internal request id - * @param action the action name - * @param request the actual request - * @param finalOptions the request options - */ - default void onRequestSent(DiscoveryNode node, long requestId, String action, TransportRequest request, - TransportRequestOptions finalOptions) {} - /** * Called once a connection was opened * @param connection the connection @@ -76,13 +40,6 @@ public interface TransportConnectionListener { */ default void onConnectionClosed(Transport.Connection connection) {} - /** - * Called for every response received - * @param requestId the request id for this reponse - * @param context the response context or null if the context was already processed ie. due to a timeout. - */ - default void onResponseReceived(long requestId, Transport.ResponseContext context) {} - /** * Called once a node connection is opened and registered. */ diff --git a/server/src/main/java/org/elasticsearch/transport/TransportMessageListener.java b/server/src/main/java/org/elasticsearch/transport/TransportMessageListener.java new file mode 100644 index 00000000000..a872c761b36 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/transport/TransportMessageListener.java @@ -0,0 +1,67 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.transport; + +import org.elasticsearch.cluster.node.DiscoveryNode; + +public interface TransportMessageListener { + + /** + * Called once a request is received + * @param requestId the internal request ID + * @param action the request action + * + */ + default void onRequestReceived(long requestId, String action) {} + + /** + * Called for every action response sent after the response has been passed to the underlying network implementation. + * @param requestId the request ID (unique per client) + * @param action the request action + * @param response the response send + * @param finalOptions the response options + */ + default void onResponseSent(long requestId, String action, TransportResponse response, TransportResponseOptions finalOptions) {} + + /*** + * Called for every failed action response after the response has been passed to the underlying network implementation. + * @param requestId the request ID (unique per client) + * @param action the request action + * @param error the error sent back to the caller + */ + default void onResponseSent(long requestId, String action, Exception error) {} + + /** + * Called for every request sent to a server after the request has been passed to the underlying network implementation + * @param node the node the request was sent to + * @param requestId the internal request id + * @param action the action name + * @param request the actual request + * @param finalOptions the request options + */ + default void onRequestSent(DiscoveryNode node, long requestId, String action, TransportRequest request, + TransportRequestOptions finalOptions) {} + + /** + * Called for every response received + * @param requestId the request id for this reponse + * @param context the response context or null if the context was already processed ie. due to a timeout. + */ + default void onResponseReceived(long requestId, Transport.ResponseContext context) {} +} diff --git a/server/src/main/java/org/elasticsearch/transport/TransportService.java b/server/src/main/java/org/elasticsearch/transport/TransportService.java index 8eca6504b70..fb14ae96dbf 100644 --- a/server/src/main/java/org/elasticsearch/transport/TransportService.java +++ b/server/src/main/java/org/elasticsearch/transport/TransportService.java @@ -77,7 +77,7 @@ import static org.elasticsearch.common.settings.Setting.intSetting; import static org.elasticsearch.common.settings.Setting.listSetting; import static org.elasticsearch.common.settings.Setting.timeSetting; -public class TransportService extends AbstractLifecycleComponent implements TransportConnectionListener { +public class TransportService extends AbstractLifecycleComponent implements TransportMessageListener, TransportConnectionListener { public static final Setting CONNECTIONS_PER_NODE_RECOVERY = intSetting("transport.connections_per_node.recovery", 2, 1, Setting.Property.NodeScope); @@ -248,7 +248,8 @@ public class TransportService extends AbstractLifecycleComponent implements Tran @Override protected void doStart() { - transport.addConnectionListener(this); + transport.addMessageListener(this); + connectionManager.addListener(this); transport.start(); if (transport.boundAddress() != null && logger.isInfoEnabled()) { logger.info("{}", transport.boundAddress()); @@ -506,12 +507,10 @@ public class TransportService extends AbstractLifecycleComponent implements Tran } public void addConnectionListener(TransportConnectionListener listener) { - transport.addConnectionListener(listener); connectionManager.addListener(listener); } public void removeConnectionListener(TransportConnectionListener listener) { - transport.removeConnectionListener(listener); connectionManager.removeListener(listener); } diff --git a/server/src/test/java/org/elasticsearch/client/transport/FailAndRetryMockTransport.java b/server/src/test/java/org/elasticsearch/client/transport/FailAndRetryMockTransport.java index 513f07b733c..e9ff4048bfe 100644 --- a/server/src/test/java/org/elasticsearch/client/transport/FailAndRetryMockTransport.java +++ b/server/src/test/java/org/elasticsearch/client/transport/FailAndRetryMockTransport.java @@ -38,8 +38,8 @@ import org.elasticsearch.transport.ConnectTransportException; import org.elasticsearch.transport.ConnectionProfile; import org.elasticsearch.transport.RequestHandlerRegistry; import org.elasticsearch.transport.Transport; -import org.elasticsearch.transport.TransportConnectionListener; import org.elasticsearch.transport.TransportException; +import org.elasticsearch.transport.TransportMessageListener; import org.elasticsearch.transport.TransportRequest; import org.elasticsearch.transport.TransportRequestOptions; import org.elasticsearch.transport.TransportResponse; @@ -62,7 +62,7 @@ abstract class FailAndRetryMockTransport imp private volatile Map requestHandlers = Collections.emptyMap(); private final Object requestHandlerMutex = new Object(); private final ResponseHandlers responseHandlers = new ResponseHandlers(); - private TransportConnectionListener listener; + private TransportMessageListener listener; private boolean connectMode = true; @@ -223,13 +223,15 @@ abstract class FailAndRetryMockTransport imp return requestHandlers.get(action); } + @Override - public void addConnectionListener(TransportConnectionListener listener) { + public void addMessageListener(TransportMessageListener listener) { this.listener = listener; } @Override - public boolean removeConnectionListener(TransportConnectionListener listener) { + public boolean removeMessageListener(TransportMessageListener listener) { throw new UnsupportedOperationException(); } + } diff --git a/server/src/test/java/org/elasticsearch/cluster/NodeConnectionsServiceTests.java b/server/src/test/java/org/elasticsearch/cluster/NodeConnectionsServiceTests.java index 92c1016d3e6..473f5152e8f 100644 --- a/server/src/test/java/org/elasticsearch/cluster/NodeConnectionsServiceTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/NodeConnectionsServiceTests.java @@ -37,9 +37,9 @@ import org.elasticsearch.transport.ConnectTransportException; import org.elasticsearch.transport.ConnectionProfile; import org.elasticsearch.transport.RequestHandlerRegistry; import org.elasticsearch.transport.Transport; -import org.elasticsearch.transport.TransportConnectionListener; import org.elasticsearch.transport.TransportException; import org.elasticsearch.transport.TransportInterceptor; +import org.elasticsearch.transport.TransportMessageListener; import org.elasticsearch.transport.TransportRequest; import org.elasticsearch.transport.TransportRequestOptions; import org.elasticsearch.transport.TransportService; @@ -107,7 +107,6 @@ public class NodeConnectionsServiceTests extends ESTestCase { assertConnectedExactlyToNodes(event.state()); } - public void testReconnect() { List nodes = generateNodes(); NodeConnectionsService service = new NodeConnectionsService(Settings.EMPTY, threadPool, transportService); @@ -188,7 +187,7 @@ public class NodeConnectionsServiceTests extends ESTestCase { private final class MockTransport implements Transport { private ResponseHandlers responseHandlers = new ResponseHandlers(); private volatile boolean randomConnectionExceptions = false; - private TransportConnectionListener listener = new TransportConnectionListener() { + private TransportMessageListener listener = new TransportMessageListener() { }; @Override @@ -201,12 +200,12 @@ public class NodeConnectionsServiceTests extends ESTestCase { } @Override - public void addConnectionListener(TransportConnectionListener listener) { + public void addMessageListener(TransportMessageListener listener) { this.listener = listener; } @Override - public boolean removeConnectionListener(TransportConnectionListener listener) { + public boolean removeMessageListener(TransportMessageListener listener) { throw new UnsupportedOperationException(); } @@ -231,7 +230,6 @@ public class NodeConnectionsServiceTests extends ESTestCase { if (randomConnectionExceptions && randomBoolean()) { throw new ConnectTransportException(node, "simulated"); } - listener.onNodeConnected(node); } Connection connection = new Connection() { @Override @@ -260,7 +258,6 @@ public class NodeConnectionsServiceTests extends ESTestCase { return false; } }; - listener.onConnectionOpened(connection); return connection; } diff --git a/test/framework/src/main/java/org/elasticsearch/test/transport/CapturingTransport.java b/test/framework/src/main/java/org/elasticsearch/test/transport/CapturingTransport.java index a7d3e85d300..60133a16a10 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/transport/CapturingTransport.java +++ b/test/framework/src/main/java/org/elasticsearch/test/transport/CapturingTransport.java @@ -41,9 +41,9 @@ import org.elasticsearch.transport.RemoteTransportException; import org.elasticsearch.transport.RequestHandlerRegistry; import org.elasticsearch.transport.SendRequestTransportException; import org.elasticsearch.transport.Transport; -import org.elasticsearch.transport.TransportConnectionListener; import org.elasticsearch.transport.TransportException; import org.elasticsearch.transport.TransportInterceptor; +import org.elasticsearch.transport.TransportMessageListener; import org.elasticsearch.transport.TransportRequest; import org.elasticsearch.transport.TransportRequestOptions; import org.elasticsearch.transport.TransportResponse; @@ -72,7 +72,7 @@ public class CapturingTransport implements Transport { private volatile Map requestHandlers = Collections.emptyMap(); final Object requestHandlerMutex = new Object(); private final ResponseHandlers responseHandlers = new ResponseHandlers(); - private TransportConnectionListener listener; + private TransportMessageListener listener; public static class CapturedRequest { public final DiscoveryNode node; @@ -341,7 +341,7 @@ public class CapturingTransport implements Transport { } @Override - public void addConnectionListener(TransportConnectionListener listener) { + public void addMessageListener(TransportMessageListener listener) { if (this.listener != null) { throw new IllegalStateException("listener already set"); } @@ -349,11 +349,12 @@ public class CapturingTransport implements Transport { } @Override - public boolean removeConnectionListener(TransportConnectionListener listener) { + public boolean removeMessageListener(TransportMessageListener listener) { if (listener == this.listener) { this.listener = null; return true; } return false; } + } diff --git a/test/framework/src/main/java/org/elasticsearch/test/transport/StubbableTransport.java b/test/framework/src/main/java/org/elasticsearch/test/transport/StubbableTransport.java index 5a0dd3b7f6d..2e78f8a9a4f 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/transport/StubbableTransport.java +++ b/test/framework/src/main/java/org/elasticsearch/test/transport/StubbableTransport.java @@ -29,8 +29,8 @@ import org.elasticsearch.common.transport.TransportAddress; import org.elasticsearch.transport.ConnectionProfile; import org.elasticsearch.transport.RequestHandlerRegistry; import org.elasticsearch.transport.Transport; -import org.elasticsearch.transport.TransportConnectionListener; import org.elasticsearch.transport.TransportException; +import org.elasticsearch.transport.TransportMessageListener; import org.elasticsearch.transport.TransportRequest; import org.elasticsearch.transport.TransportRequestOptions; import org.elasticsearch.transport.TransportStats; @@ -86,13 +86,13 @@ public class StubbableTransport implements Transport { } @Override - public void addConnectionListener(TransportConnectionListener listener) { - delegate.addConnectionListener(listener); + public void addMessageListener(TransportMessageListener listener) { + delegate.addMessageListener(listener); } @Override - public boolean removeConnectionListener(TransportConnectionListener listener) { - return delegate.removeConnectionListener(listener); + public boolean removeMessageListener(TransportMessageListener listener) { + return delegate.removeMessageListener(listener); } @Override @@ -179,7 +179,7 @@ public class StubbableTransport implements Transport { return delegate.profileBoundAddresses(); } - private class WrappedConnection implements Transport.Connection { + public class WrappedConnection implements Transport.Connection { private final Transport.Connection connection; @@ -234,6 +234,10 @@ public class StubbableTransport implements Transport { public void close() { connection.close(); } + + public Transport.Connection getConnection() { + return connection; + } } @FunctionalInterface diff --git a/test/framework/src/main/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java b/test/framework/src/main/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java index 29997b16ba0..4e59aaecf8d 100644 --- a/test/framework/src/main/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java @@ -34,6 +34,7 @@ import org.elasticsearch.common.io.stream.BytesStreamOutput; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.network.CloseableChannel; import org.elasticsearch.common.network.NetworkService; import org.elasticsearch.common.network.NetworkUtils; import org.elasticsearch.common.settings.ClusterSettings; @@ -52,6 +53,7 @@ import org.elasticsearch.tasks.Task; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.VersionUtils; import org.elasticsearch.test.transport.MockTransportService; +import org.elasticsearch.test.transport.StubbableTransport; import org.elasticsearch.threadpool.TestThreadPool; import org.elasticsearch.threadpool.ThreadPool; import org.junit.After; @@ -2642,15 +2644,22 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase { public void testChannelCloseWhileConnecting() { try (MockTransportService service = build(Settings.builder().put("name", "close").build(), version0, null, true)) { - service.transport.addConnectionListener(new TransportConnectionListener() { + AtomicBoolean connectionClosedListenerCalled = new AtomicBoolean(false); + service.addConnectionListener(new TransportConnectionListener() { @Override public void onConnectionOpened(final Transport.Connection connection) { + closeConnectionChannel(connection); try { - closeConnectionChannel(service.getOriginalTransport(), connection); - } catch (final IOException e) { + assertBusy(connection::isClosed); + } catch (Exception e) { throw new AssertionError(e); } } + + @Override + public void onConnectionClosed(Transport.Connection connection) { + connectionClosedListenerCalled.set(true); + } }); final ConnectionProfile.Builder builder = new ConnectionProfile.Builder(); builder.addConnections(1, @@ -2662,10 +2671,15 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase { final ConnectTransportException e = expectThrows(ConnectTransportException.class, () -> service.openConnection(nodeA, builder.build())); assertThat(e, hasToString(containsString(("a channel closed while connecting")))); + assertTrue(connectionClosedListenerCalled.get()); } } - protected abstract void closeConnectionChannel(Transport transport, Transport.Connection connection) throws IOException; + private void closeConnectionChannel(Transport.Connection connection) { + StubbableTransport.WrappedConnection wrappedConnection = (StubbableTransport.WrappedConnection) connection; + TcpTransport.NodeChannels channels = (TcpTransport.NodeChannels) wrappedConnection.getConnection(); + CloseableChannel.closeChannels(channels.getChannels().subList(0, randomIntBetween(1, channels.getChannels().size())), true); + } @SuppressForbidden(reason = "need local ephemeral port") private InetSocketAddress getLocalEphemeral() throws UnknownHostException { diff --git a/test/framework/src/test/java/org/elasticsearch/transport/MockTcpTransportTests.java b/test/framework/src/test/java/org/elasticsearch/transport/MockTcpTransportTests.java index 6d1e5116474..42658b1d9a6 100644 --- a/test/framework/src/test/java/org/elasticsearch/transport/MockTcpTransportTests.java +++ b/test/framework/src/test/java/org/elasticsearch/transport/MockTcpTransportTests.java @@ -21,7 +21,6 @@ package org.elasticsearch.transport; import org.elasticsearch.Version; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; -import org.elasticsearch.common.network.CloseableChannel; import org.elasticsearch.common.network.NetworkService; import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Settings; @@ -60,13 +59,4 @@ public class MockTcpTransportTests extends AbstractSimpleTransportTestCase { public int channelsPerNodeConnection() { return 1; } - - @Override - protected void closeConnectionChannel(Transport transport, Transport.Connection connection) throws IOException { - final MockTcpTransport t = (MockTcpTransport) transport; - final TcpTransport.NodeChannels channels = - (TcpTransport.NodeChannels) connection; - CloseableChannel.closeChannels(channels.getChannels().subList(0, randomIntBetween(1, channels.getChannels().size())), true); - } - } diff --git a/test/framework/src/test/java/org/elasticsearch/transport/nio/SimpleMockNioTransportTests.java b/test/framework/src/test/java/org/elasticsearch/transport/nio/SimpleMockNioTransportTests.java index 3a78f366bd8..8b3e7dce367 100644 --- a/test/framework/src/test/java/org/elasticsearch/transport/nio/SimpleMockNioTransportTests.java +++ b/test/framework/src/test/java/org/elasticsearch/transport/nio/SimpleMockNioTransportTests.java @@ -22,7 +22,6 @@ package org.elasticsearch.transport.nio; import org.elasticsearch.Version; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; -import org.elasticsearch.common.network.CloseableChannel; import org.elasticsearch.common.network.NetworkService; import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Settings; @@ -99,12 +98,6 @@ public class SimpleMockNioTransportTests extends AbstractSimpleTransportTestCase return 3; } - @Override - protected void closeConnectionChannel(Transport transport, Transport.Connection connection) throws IOException { - TcpTransport.NodeChannels channels = (TcpTransport.NodeChannels) connection; - CloseableChannel.closeChannels(channels.getChannels().subList(0, randomIntBetween(1, channels.getChannels().size())), true); - } - public void testConnectException() throws UnknownHostException { try { serviceA.connectToNode(new DiscoveryNode("C", new TransportAddress(InetAddress.getByName("localhost"), 9876), diff --git a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/transport/nio/SimpleSecurityNioTransportTests.java b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/transport/nio/SimpleSecurityNioTransportTests.java index 70ab085fcf7..7397ebc8c7d 100644 --- a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/transport/nio/SimpleSecurityNioTransportTests.java +++ b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/transport/nio/SimpleSecurityNioTransportTests.java @@ -10,7 +10,6 @@ import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.SuppressForbidden; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.io.stream.OutputStreamStreamOutput; -import org.elasticsearch.common.network.CloseableChannel; import org.elasticsearch.common.network.NetworkService; import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.MockSecureSettings; @@ -35,6 +34,9 @@ import org.elasticsearch.xpack.core.common.socket.SocketAccess; import org.elasticsearch.xpack.core.ssl.SSLConfiguration; import org.elasticsearch.xpack.core.ssl.SSLService; +import javax.net.SocketFactory; +import javax.net.ssl.HandshakeCompletedListener; +import javax.net.ssl.SSLSocket; import java.io.IOException; import java.net.InetAddress; import java.net.SocketTimeoutException; @@ -44,10 +46,6 @@ import java.util.Collections; import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicReference; -import javax.net.SocketFactory; -import javax.net.ssl.HandshakeCompletedListener; -import javax.net.ssl.SSLSocket; - import static java.util.Collections.emptyMap; import static java.util.Collections.emptySet; import static org.hamcrest.Matchers.containsString; @@ -118,12 +116,6 @@ public class SimpleSecurityNioTransportTests extends AbstractSimpleTransportTest return transportService; } - @Override - protected void closeConnectionChannel(Transport transport, Transport.Connection connection) throws IOException { - TcpTransport.NodeChannels channels = (TcpTransport.NodeChannels) connection; - CloseableChannel.closeChannels(channels.getChannels().subList(0, randomIntBetween(1, channels.getChannels().size())), true); - } - public void testConnectException() throws UnknownHostException { try { serviceA.connectToNode(new DiscoveryNode("C", new TransportAddress(InetAddress.getByName("localhost"), 9876),