diff --git a/core/src/main/java/org/elasticsearch/transport/TcpTransport.java b/core/src/main/java/org/elasticsearch/transport/TcpTransport.java index 6749d782853..7522ab98ed4 100644 --- a/core/src/main/java/org/elasticsearch/transport/TcpTransport.java +++ b/core/src/main/java/org/elasticsearch/transport/TcpTransport.java @@ -68,7 +68,6 @@ import org.elasticsearch.monitor.jvm.JvmInfo; import org.elasticsearch.rest.RestStatus; import org.elasticsearch.threadpool.ThreadPool; -import java.io.Closeable; import java.io.IOException; import java.io.StreamCorruptedException; import java.net.BindException; @@ -81,7 +80,6 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.EnumMap; -import java.util.EnumSet; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; @@ -313,13 +311,15 @@ public abstract class TcpTransport extends AbstractLifecycleComponent i } } - public final class NodeChannels implements Closeable { + public final class NodeChannels implements Connection { private final Map typeMapping = new EnumMap<>(TransportRequestOptions.Type.class); private final Channel[] channels; - private final AtomicBoolean establishedAllConnections = new AtomicBoolean(false); + private final DiscoveryNode node; + private final AtomicBoolean closed = new AtomicBoolean(false); - public NodeChannels(Channel[] channels, ConnectionProfile connectionProfile) { + public NodeChannels(DiscoveryNode node, Channel[] channels, ConnectionProfile connectionProfile) { + this.node = node; this.channels = channels; assert channels.length == connectionProfile.getNumConnections() : "expected channels size to be == " + connectionProfile.getNumConnections() + " but was: [" + channels.length + "]"; @@ -329,12 +329,6 @@ public abstract class TcpTransport extends AbstractLifecycleComponent i } } - public void connectionsEstablished() { - if (establishedAllConnections.compareAndSet(false, true) == false) { - throw new AssertionError("connected more than once"); - } - - } public boolean hasChannel(Channel channel) { for (Channel channel1 : channels) { @@ -346,15 +340,10 @@ public abstract class TcpTransport extends AbstractLifecycleComponent i } public List getChannels() { - if (establishedAllConnections.get()) { // don't expose the channels until we are connected - return Arrays.asList(channels); - } else { - return Collections.emptyList(); - } + return Arrays.asList(channels); } public Channel channel(TransportRequestOptions.Type type) { - assert establishedAllConnections.get(); ConnectionProfile.ConnectionTypeHandle connectionTypeHandle = typeMapping.get(type); if (connectionTypeHandle == null) { throw new IllegalArgumentException("no type channel for [" + type + "]"); @@ -364,7 +353,24 @@ public abstract class TcpTransport extends AbstractLifecycleComponent i @Override public synchronized void close() throws IOException { - closeChannels(Arrays.asList(channels).stream().filter(Objects::nonNull).collect(Collectors.toList())); + if (closed.compareAndSet(false, true)) { + closeChannels(Arrays.stream(channels).filter(Objects::nonNull).collect(Collectors.toList())); + } + } + + @Override + public DiscoveryNode getNode() { + return this.node; + } + + @Override + public void sendRequest(long requestId, String action, TransportRequest request, TransportRequestOptions options) + throws IOException, TransportException { + if (closed.get()) { + throw new NodeNotConnectedException(node, "connection already closed"); + } + Channel channel = channel(options.type()); + sendRequestToChannel(this.node, channel, requestId, action, request, options); } } @@ -395,20 +401,18 @@ public abstract class TcpTransport extends AbstractLifecycleComponent i } try { try { - nodeChannels = connectToChannels(node, connectionProfile); + nodeChannels = openConnection(node, connectionProfile); } catch (Exception e) { logger.trace( (Supplier) () -> new ParameterizedMessage( "failed to connect to [{}], cleaning dangling connections", node), e); throw e; } - // we acquire a connection lock, so no way there is an existing connection - nodeChannels.connectionsEstablished(); connectedNodes.put(node, nodeChannels); if (logger.isDebugEnabled()) { logger.debug("connected to node [{}]", node); } - transportServiceAdapter.raiseNodeConnected(node); + transportServiceAdapter.onNodeConnected(node); } catch (ConnectTransportException e) { throw e; } catch (Exception e) { @@ -419,6 +423,14 @@ public abstract class TcpTransport extends AbstractLifecycleComponent i globalLock.readLock().unlock(); } } + + @Override + public final NodeChannels openConnection(DiscoveryNode node, ConnectionProfile profile) throws IOException { + NodeChannels nodeChannels = connectToChannels(node, profile); + transportServiceAdapter.onConnectionOpened(node); + return nodeChannels; + } + /** * Disconnects from a node, only if the relevant channel is found to be part of the node channels. */ @@ -432,13 +444,7 @@ public abstract class TcpTransport extends AbstractLifecycleComponent i // check again within the connection lock, if its still applicable to remove it if (nodeChannels != null && nodeChannels.hasChannel(channel)) { connectedNodes.remove(node); - try { - logger.debug("disconnecting from [{}], {}", node, reason); - IOUtils.closeWhileHandlingException(nodeChannels); - } finally { - logger.trace("disconnected from [{}], {}", node, reason); - transportServiceAdapter.raiseNodeDisconnected(node); - } + closeAndNotify(node, nodeChannels, reason); return true; } } @@ -446,6 +452,16 @@ public abstract class TcpTransport extends AbstractLifecycleComponent i return false; } + private void closeAndNotify(DiscoveryNode node, NodeChannels nodeChannels, String reason) { + try { + logger.debug("disconnecting from [{}], {}", node, reason); + IOUtils.closeWhileHandlingException(nodeChannels); + } finally { + logger.trace("disconnected from [{}], {}", node, reason); + transportServiceAdapter.onNodeDisconnected(node); + } + } + /** * Disconnects from a node if a channel is found as part of that nodes channels. */ @@ -469,12 +485,13 @@ public abstract class TcpTransport extends AbstractLifecycleComponent i }); } - protected Channel nodeChannel(DiscoveryNode node, TransportRequestOptions options) throws ConnectTransportException { + @Override + public Connection getConnection(DiscoveryNode node) { NodeChannels nodeChannels = connectedNodes.get(node); if (nodeChannels == null) { throw new NodeNotConnectedException(node, "Node not connected"); } - return nodeChannels.channel(options.type()); + return nodeChannels; } @Override @@ -482,13 +499,7 @@ public abstract class TcpTransport extends AbstractLifecycleComponent i try (Releasable ignored = connectionLock.acquire(node.getId())) { NodeChannels nodeChannels = connectedNodes.remove(node); if (nodeChannels != null) { - try { - logger.debug("disconnecting from [{}] due to explicit disconnect call", node); - IOUtils.closeWhileHandlingException(nodeChannels); - } finally { - logger.trace("disconnected from [{}] due to explicit disconnect call", node); - transportServiceAdapter.raiseNodeDisconnected(node); - } + closeAndNotify(node, nodeChannels, "due to explicit disconnect call"); } } } @@ -883,10 +894,10 @@ public abstract class TcpTransport extends AbstractLifecycleComponent i return compress && (!(request instanceof BytesTransportRequest)); } - @Override - public void sendRequest(final DiscoveryNode node, final long requestId, final String action, final TransportRequest request, - TransportRequestOptions options) throws IOException, TransportException { - Channel targetChannel = nodeChannel(node, options); + + protected void sendRequestToChannel(DiscoveryNode node, Channel targetChannel, final long requestId, final String action, + final TransportRequest request, TransportRequestOptions options) throws IOException, + TransportException { if (compress) { options = TransportRequestOptions.builder(options).withCompress(true).build(); } diff --git a/core/src/main/java/org/elasticsearch/transport/Transport.java b/core/src/main/java/org/elasticsearch/transport/Transport.java index 96dcd61483d..b0821c609c0 100644 --- a/core/src/main/java/org/elasticsearch/transport/Transport.java +++ b/core/src/main/java/org/elasticsearch/transport/Transport.java @@ -28,6 +28,7 @@ import org.elasticsearch.common.settings.Setting.Property; import org.elasticsearch.common.transport.BoundTransportAddress; import org.elasticsearch.common.transport.TransportAddress; +import java.io.Closeable; import java.io.IOException; import java.net.UnknownHostException; import java.util.List; @@ -35,7 +36,6 @@ import java.util.Map; public interface Transport extends LifecycleComponent { - Setting TRANSPORT_TCP_COMPRESS = Setting.boolSetting("transport.tcp.compress", false, Property.NodeScope); void transportServiceAdapter(TransportServiceAdapter service); @@ -72,13 +72,6 @@ public interface Transport extends LifecycleComponent { */ void disconnectFromNode(DiscoveryNode node); - /** - * Sends the request to the node. - * @throws NodeNotConnectedException if the given node is not connected - */ - void sendRequest(DiscoveryNode node, long requestId, String action, TransportRequest request, TransportRequestOptions options) throws - IOException, TransportException; - /** * Returns count of currently open connections */ @@ -89,4 +82,39 @@ public interface Transport extends LifecycleComponent { default CircuitBreaker getInFlightRequestBreaker() { return new NoopCircuitBreaker("in-flight-noop"); } + + /** + * Returns a connection for the given node if the node is connected. + * Connections returned from this method must not be closed. The lifecylce of this connection is maintained by the Transport + * implementation. + * + * @throws NodeNotConnectedException if the node is not connected + * @see #connectToNode(DiscoveryNode, ConnectionProfile) + */ + Connection getConnection(DiscoveryNode node); + + /** + * Opens a new connection to the given node and returns it. In contrast to {@link #connectToNode(DiscoveryNode, ConnectionProfile)} + * the returned connection is not managed by the transport implementation. This connection must be closed once it's not needed anymore. + * This connection type can be used to execute a handshake between two nodes before the node will be published via + * {@link #connectToNode(DiscoveryNode, ConnectionProfile)}. + */ + Connection openConnection(DiscoveryNode node, ConnectionProfile profile) throws IOException; + + /** + * A unidirectional connection to a {@link DiscoveryNode} + */ + interface Connection extends Closeable { + /** + * The node this connection is associated with + */ + DiscoveryNode getNode(); + + /** + * Sends the request to the node this connection is associated with + * @throws NodeNotConnectedException if the given node is not connected + */ + void sendRequest(long requestId, String action, TransportRequest request, TransportRequestOptions options) throws + IOException, TransportException; + } } diff --git a/core/src/main/java/org/elasticsearch/transport/TransportConnectionListener.java b/core/src/main/java/org/elasticsearch/transport/TransportConnectionListener.java index 32bb9ca4ec2..3f277a0ee11 100644 --- a/core/src/main/java/org/elasticsearch/transport/TransportConnectionListener.java +++ b/core/src/main/java/org/elasticsearch/transport/TransportConnectionListener.java @@ -23,7 +23,18 @@ import org.elasticsearch.cluster.node.DiscoveryNode; public interface TransportConnectionListener { - void onNodeConnected(DiscoveryNode node); + /** + * Called once a node connection is opened and registered. + */ + default void onNodeConnected(DiscoveryNode node) {} - void onNodeDisconnected(DiscoveryNode node); + /** + * Called once a node connection is closed and unregistered. + */ + default void onNodeDisconnected(DiscoveryNode node) {} + + /** + * Called once a node connection is opened. + */ + default void onConnectionOpened(DiscoveryNode node) {} } diff --git a/core/src/main/java/org/elasticsearch/transport/TransportInterceptor.java b/core/src/main/java/org/elasticsearch/transport/TransportInterceptor.java index 7b478ce48e6..8dfcf39bd9d 100644 --- a/core/src/main/java/org/elasticsearch/transport/TransportInterceptor.java +++ b/core/src/main/java/org/elasticsearch/transport/TransportInterceptor.java @@ -51,10 +51,11 @@ public interface TransportInterceptor { /** * A simple interface to decorate - * {@link #sendRequest(DiscoveryNode, String, TransportRequest, TransportRequestOptions, TransportResponseHandler)} + * {@link #sendRequest(Transport.Connection, String, TransportRequest, TransportRequestOptions, TransportResponseHandler)} */ interface AsyncSender { - void sendRequest(final DiscoveryNode node, final String action, final TransportRequest request, - final TransportRequestOptions options, TransportResponseHandler handler); + void sendRequest(Transport.Connection connection, final String action, + final TransportRequest request, final TransportRequestOptions options, + TransportResponseHandler handler); } } diff --git a/core/src/main/java/org/elasticsearch/transport/TransportService.java b/core/src/main/java/org/elasticsearch/transport/TransportService.java index c15735ea1de..543cb5c940e 100644 --- a/core/src/main/java/org/elasticsearch/transport/TransportService.java +++ b/core/src/main/java/org/elasticsearch/transport/TransportService.java @@ -121,6 +121,22 @@ public class TransportService extends AbstractLifecycleComponent { /** if set will call requests sent to this id to shortcut and executed locally */ volatile DiscoveryNode localNode = null; + private final Transport.Connection localNodeConnection = new Transport.Connection() { + @Override + public DiscoveryNode getNode() { + return localNode; + } + + @Override + public void sendRequest(long requestId, String action, TransportRequest request, TransportRequestOptions options) + throws IOException, TransportException { + sendLocalRequest(requestId, action, request); + } + + @Override + public void close() throws IOException { + } + }; /** * Build the service. @@ -297,22 +313,6 @@ public class TransportService extends AbstractLifecycleComponent { transport.connectToNode(node, connectionProfile); } - /** - * Lightly connect to the specified node, and handshake cluster - * name and version - * - * @param node the node to connect to - * @param handshakeTimeout handshake timeout - * @return the connected node with version set - * @throws ConnectTransportException if the connection or the - * handshake failed - */ - public DiscoveryNode connectToNodeAndHandshake( - final DiscoveryNode node, - final long handshakeTimeout) throws ConnectTransportException { - return connectToNodeAndHandshake(node, handshakeTimeout, true); - } - /** * Lightly connect to the specified node, returning updated node * information. The handshake will fail if the cluster name on the @@ -321,50 +321,45 @@ public class TransportService extends AbstractLifecycleComponent { * * @param node the node to connect to * @param handshakeTimeout handshake timeout - * @param checkClusterName whether or not to ignore cluster name - * mismatches * @return the connected node * @throws ConnectTransportException if the connection failed * @throws IllegalStateException if the handshake failed */ public DiscoveryNode connectToNodeAndHandshake( - final DiscoveryNode node, - final long handshakeTimeout, - final boolean checkClusterName) { + final DiscoveryNode node, + final long handshakeTimeout) throws IOException { if (node.equals(localNode)) { return localNode; } - transport.connectToNode(node, ConnectionProfile.LIGHT_PROFILE); - try { - return handshake(node, handshakeTimeout, checkClusterName); - } catch (ConnectTransportException | IllegalStateException e) { - transport.disconnectFromNode(node); - throw e; + DiscoveryNode handshakeNode; + try (Transport.Connection connection = transport.openConnection(node, ConnectionProfile.LIGHT_PROFILE)) { + handshakeNode = handshake(connection, handshakeTimeout); } + connectToNode(node, ConnectionProfile.LIGHT_PROFILE); + return handshakeNode; } private DiscoveryNode handshake( - final DiscoveryNode node, - final long handshakeTimeout, - final boolean checkClusterName) throws ConnectTransportException { + final Transport.Connection connection, + final long handshakeTimeout) throws ConnectTransportException { final HandshakeResponse response; + final DiscoveryNode node = connection.getNode(); try { - response = this.submitRequest( - node, - HANDSHAKE_ACTION_NAME, - HandshakeRequest.INSTANCE, - TransportRequestOptions.builder().withTimeout(handshakeTimeout).build(), + PlainTransportFuture futureHandler = new PlainTransportFuture<>( new FutureTransportResponseHandler() { - @Override - public HandshakeResponse newInstance() { - return new HandshakeResponse(); - } - }).txGet(); + @Override + public HandshakeResponse newInstance() { + return new HandshakeResponse(); + } + }); + sendRequest(connection, HANDSHAKE_ACTION_NAME, HandshakeRequest.INSTANCE, + TransportRequestOptions.builder().withTimeout(handshakeTimeout).build(), futureHandler); + response = futureHandler.txGet(); } catch (Exception e) { throw new IllegalStateException("handshake failed with " + node, e); } - if (checkClusterName && !Objects.equals(clusterName, response.clusterName)) { + if (!Objects.equals(clusterName, response.clusterName)) { throw new IllegalStateException("handshake failed, mismatched cluster name [" + response.clusterName + "] - " + node); } else if (response.version.isCompatible((localNode != null ? localNode.getVersion() : Version.CURRENT)) == false) { throw new IllegalStateException("handshake failed, incompatible version [" + response.version + "] - " + node); @@ -437,21 +432,59 @@ public class TransportService extends AbstractLifecycleComponent { TransportRequestOptions options, TransportResponseHandler handler) throws TransportException { PlainTransportFuture futureHandler = new PlainTransportFuture<>(handler); - sendRequest(node, action, request, options, futureHandler); + try { + Transport.Connection connection = getConnection(node); + sendRequest(connection, action, request, options, futureHandler); + } catch (NodeNotConnectedException ex) { + // the caller might not handle this so we invoke the handler + futureHandler.handleException(ex); + } return futureHandler; } public void sendRequest(final DiscoveryNode node, final String action, final TransportRequest request, final TransportResponseHandler handler) { - sendRequest(node, action, request, TransportRequestOptions.EMPTY, handler); + try { + Transport.Connection connection = getConnection(node); + sendRequest(connection, action, request, TransportRequestOptions.EMPTY, handler); + } catch (NodeNotConnectedException ex) { + // the caller might not handle this so we invoke the handler + handler.handleException(ex); + } } public final void sendRequest(final DiscoveryNode node, final String action, final TransportRequest request, final TransportRequestOptions options, TransportResponseHandler handler) { - asyncSender.sendRequest(node, action, request, options, handler); + try { + Transport.Connection connection = getConnection(node); + sendRequest(connection, action, request, options, handler); + } catch (NodeNotConnectedException ex) { + // the caller might not handle this so we invoke the handler + handler.handleException(ex); + } + } + + final void sendRequest(final Transport.Connection connection, final String action, + final TransportRequest request, + final TransportRequestOptions options, + TransportResponseHandler handler) { + + asyncSender.sendRequest(connection, action, request, options, handler); + } + + /** + * Returns either a real transport connection or a local node connection if we are using the local node optimization. + * @throws NodeNotConnectedException if the given node is not connected + */ + private Transport.Connection getConnection(DiscoveryNode node) { + if (Objects.requireNonNull(node, "node must be non-null").equals(localNode)) { + return localNodeConnection; + } else { + return transport.getConnection(node); + } } public void sendChildRequest(final DiscoveryNode node, final String action, @@ -467,21 +500,26 @@ public class TransportService extends AbstractLifecycleComponent { request.setParentTask(localNode.getId(), parentTask.getId()); try { taskManager.registerChildTask(parentTask, node.getId()); - sendRequest(node, action, request, options, handler); + final Transport.Connection connection = getConnection(node); + sendRequest(connection, action, request, options, handler); } catch (TaskCancelledException ex) { // The parent task is already cancelled - just fail the request handler.handleException(new TransportException(ex)); + } catch (NodeNotConnectedException ex) { + // the caller might not handle this so we invoke the handler + handler.handleException(ex); } } - private void sendRequestInternal(final DiscoveryNode node, final String action, + private void sendRequestInternal(final Transport.Connection connection, final String action, final TransportRequest request, final TransportRequestOptions options, TransportResponseHandler handler) { - if (node == null) { - throw new IllegalStateException("can't send request to a null node"); + if (connection == null) { + throw new IllegalStateException("can't send request to a null connection"); } + DiscoveryNode node = connection.getNode(); final long requestId = newRequestId(); final TimeoutHandler timeoutHandler; try { @@ -493,7 +531,7 @@ public class TransportService extends AbstractLifecycleComponent { } TransportResponseHandler responseHandler = new ContextRestoreResponseHandler<>(threadPool.getThreadContext().newStoredContext(), handler); - clientHandlers.put(requestId, new RequestHolder<>(responseHandler, node, action, timeoutHandler)); + clientHandlers.put(requestId, new RequestHolder<>(responseHandler, connection.getNode(), action, timeoutHandler)); if (lifecycle.stoppedOrClosed()) { // if we are not started the exception handling will remove the RequestHolder again and calls the handler to notify // the caller. It will only notify if the toStop code hasn't done the work yet. @@ -503,11 +541,7 @@ public class TransportService extends AbstractLifecycleComponent { assert options.timeout() != null; timeoutHandler.future = threadPool.schedule(options.timeout(), ThreadPool.Names.GENERIC, timeoutHandler); } - if (node.equals(localNode)) { - sendLocalRequest(requestId, action, request); - } else { - transport.sendRequest(node, requestId, action, request, options); - } + connection.sendRequest(requestId, action, request, options); // local node optimization happens upstream } catch (final Exception e) { // usually happen either because we failed to connect to the node // or because we failed serializing the message @@ -595,7 +629,6 @@ public class TransportService extends AbstractLifecycleComponent { "failed to notify channel of error message for action [{}]", action), inner); } } - } private boolean shouldTraceAction(String action) { @@ -777,7 +810,7 @@ public class TransportService extends AbstractLifecycleComponent { } @Override - public void raiseNodeConnected(final DiscoveryNode node) { + public void onNodeConnected(final DiscoveryNode node) { threadPool.generic().execute(() -> { for (TransportConnectionListener connectionListener : connectionListeners) { connectionListener.onNodeConnected(node); @@ -786,11 +819,22 @@ public class TransportService extends AbstractLifecycleComponent { } @Override - public void raiseNodeDisconnected(final DiscoveryNode node) { - try { - for (final TransportConnectionListener connectionListener : connectionListeners) { - threadPool.generic().execute(() -> connectionListener.onNodeDisconnected(node)); + public void onConnectionOpened(DiscoveryNode node) { + threadPool.generic().execute(() -> { + for (TransportConnectionListener connectionListener : connectionListeners) { + connectionListener.onConnectionOpened(node); } + }); + } + + @Override + public void onNodeDisconnected(final DiscoveryNode node) { + try { + threadPool.generic().execute( () -> { + for (final TransportConnectionListener connectionListener : connectionListeners) { + connectionListener.onNodeDisconnected(node); + } + }); for (Map.Entry entry : clientHandlers.entrySet()) { RequestHolder holder = entry.getValue(); if (holder.node().equals(node)) { diff --git a/core/src/main/java/org/elasticsearch/transport/TransportServiceAdapter.java b/core/src/main/java/org/elasticsearch/transport/TransportServiceAdapter.java index 34910532a02..70748b01a68 100644 --- a/core/src/main/java/org/elasticsearch/transport/TransportServiceAdapter.java +++ b/core/src/main/java/org/elasticsearch/transport/TransportServiceAdapter.java @@ -21,7 +21,7 @@ package org.elasticsearch.transport; import org.elasticsearch.cluster.node.DiscoveryNode; -public interface TransportServiceAdapter { +public interface TransportServiceAdapter extends TransportConnectionListener { void addBytesReceived(long size); @@ -50,9 +50,4 @@ public interface TransportServiceAdapter { void onRequestReceived(long requestId, String action); RequestHandlerRegistry getRequestHandler(String action); - - void raiseNodeConnected(DiscoveryNode node); - - void raiseNodeDisconnected(DiscoveryNode node); - } diff --git a/core/src/test/java/org/elasticsearch/client/transport/FailAndRetryMockTransport.java b/core/src/test/java/org/elasticsearch/client/transport/FailAndRetryMockTransport.java index 0416f6aff41..fabd0c70f16 100644 --- a/core/src/test/java/org/elasticsearch/client/transport/FailAndRetryMockTransport.java +++ b/core/src/test/java/org/elasticsearch/client/transport/FailAndRetryMockTransport.java @@ -72,48 +72,68 @@ abstract class FailAndRetryMockTransport imp protected abstract ClusterState getMockClusterState(DiscoveryNode node); @Override - @SuppressWarnings("unchecked") - public void sendRequest(DiscoveryNode node, long requestId, String action, TransportRequest request, TransportRequestOptions options) - throws IOException, TransportException { - - //we make sure that nodes get added to the connected ones when calling addTransportAddress, by returning proper nodes info - if (connectMode) { - if (TransportLivenessAction.NAME.equals(action)) { - TransportResponseHandler transportResponseHandler = transportServiceAdapter.onResponseReceived(requestId); - transportResponseHandler.handleResponse(new LivenessResponse(ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY), - node)); - } else if (ClusterStateAction.NAME.equals(action)) { - TransportResponseHandler transportResponseHandler = transportServiceAdapter.onResponseReceived(requestId); - ClusterState clusterState = getMockClusterState(node); - transportResponseHandler.handleResponse(new ClusterStateResponse(clusterName, clusterState)); - } else { - throw new UnsupportedOperationException("Mock transport does not understand action " + action); + public Connection getConnection(DiscoveryNode node) { + return new Connection() { + @Override + public DiscoveryNode getNode() { + return node; } - return; - } - //once nodes are connected we'll just return errors for each sendRequest call - triedNodes.add(node); + @Override + public void sendRequest(long requestId, String action, TransportRequest request, TransportRequestOptions options) + throws IOException, TransportException { - if (random.nextInt(100) > 10) { - connectTransportExceptions.incrementAndGet(); - throw new ConnectTransportException(node, "node not available"); - } else { - if (random.nextBoolean()) { - failures.incrementAndGet(); - //throw whatever exception that is not a subclass of ConnectTransportException - throw new IllegalStateException(); - } else { - TransportResponseHandler transportResponseHandler = transportServiceAdapter.onResponseReceived(requestId); - if (random.nextBoolean()) { - successes.incrementAndGet(); - transportResponseHandler.handleResponse(newResponse()); + //we make sure that nodes get added to the connected ones when calling addTransportAddress, by returning proper nodes info + if (connectMode) { + if (TransportLivenessAction.NAME.equals(action)) { + TransportResponseHandler transportResponseHandler = transportServiceAdapter.onResponseReceived(requestId); + transportResponseHandler.handleResponse(new LivenessResponse(ClusterName.CLUSTER_NAME_SETTING. + getDefault(Settings.EMPTY), + node)); + } else if (ClusterStateAction.NAME.equals(action)) { + TransportResponseHandler transportResponseHandler = transportServiceAdapter.onResponseReceived(requestId); + ClusterState clusterState = getMockClusterState(node); + transportResponseHandler.handleResponse(new ClusterStateResponse(clusterName, clusterState)); + } else { + throw new UnsupportedOperationException("Mock transport does not understand action " + action); + } + return; + } + + //once nodes are connected we'll just return errors for each sendRequest call + triedNodes.add(node); + + if (random.nextInt(100) > 10) { + connectTransportExceptions.incrementAndGet(); + throw new ConnectTransportException(node, "node not available"); } else { - failures.incrementAndGet(); - transportResponseHandler.handleException(new TransportException("transport exception")); + if (random.nextBoolean()) { + failures.incrementAndGet(); + //throw whatever exception that is not a subclass of ConnectTransportException + throw new IllegalStateException(); + } else { + TransportResponseHandler transportResponseHandler = transportServiceAdapter.onResponseReceived(requestId); + if (random.nextBoolean()) { + successes.incrementAndGet(); + transportResponseHandler.handleResponse(newResponse()); + } else { + failures.incrementAndGet(); + transportResponseHandler.handleException(new TransportException("transport exception")); + } + } } } - } + + @Override + public void close() throws IOException { + + } + }; + } + + @Override + public Connection openConnection(DiscoveryNode node, ConnectionProfile profile) throws IOException { + return getConnection(node); } protected abstract Response newResponse(); diff --git a/core/src/test/java/org/elasticsearch/client/transport/TransportClientHeadersTests.java b/core/src/test/java/org/elasticsearch/client/transport/TransportClientHeadersTests.java index d25ae28cf28..d5ae66c6bd6 100644 --- a/core/src/test/java/org/elasticsearch/client/transport/TransportClientHeadersTests.java +++ b/core/src/test/java/org/elasticsearch/client/transport/TransportClientHeadersTests.java @@ -41,6 +41,7 @@ import org.elasticsearch.plugins.PluginsService; import org.elasticsearch.test.transport.MockTransportService; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.MockTransportClient; +import org.elasticsearch.transport.Transport; import org.elasticsearch.transport.TransportException; import org.elasticsearch.transport.TransportInterceptor; import org.elasticsearch.transport.TransportRequest; @@ -149,12 +150,14 @@ public class TransportClientHeadersTests extends AbstractClientHeadersTestCase { public AsyncSender interceptSender(AsyncSender sender) { return new AsyncSender() { @Override - public void sendRequest(DiscoveryNode node, String action, TransportRequest request, - TransportRequestOptions options, TransportResponseHandler handler) { + public void sendRequest(Transport.Connection connection, String action, + TransportRequest request, + TransportRequestOptions options, + TransportResponseHandler handler) { if (TransportLivenessAction.NAME.equals(action)) { assertHeaders(threadPool); ((TransportResponseHandler) handler).handleResponse( - new LivenessResponse(new ClusterName("cluster1"), node)); + new LivenessResponse(new ClusterName("cluster1"), connection.getNode())); return; } if (ClusterStateAction.NAME.equals(action)) { diff --git a/core/src/test/java/org/elasticsearch/client/transport/TransportClientNodesServiceTests.java b/core/src/test/java/org/elasticsearch/client/transport/TransportClientNodesServiceTests.java index f513660de95..eaaafe1e1ec 100644 --- a/core/src/test/java/org/elasticsearch/client/transport/TransportClientNodesServiceTests.java +++ b/core/src/test/java/org/elasticsearch/client/transport/TransportClientNodesServiceTests.java @@ -43,6 +43,7 @@ import org.elasticsearch.common.transport.TransportAddress; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.threadpool.TestThreadPool; import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.Transport; import org.elasticsearch.transport.TransportException; import org.elasticsearch.transport.TransportInterceptor; import org.elasticsearch.transport.TransportRequest; @@ -129,12 +130,15 @@ public class TransportClientNodesServiceTests extends ESTestCase { public AsyncSender interceptSender(AsyncSender sender) { return new AsyncSender() { @Override - public void sendRequest(DiscoveryNode node, String action, TransportRequest request, - TransportRequestOptions options, TransportResponseHandler handler) { + public void sendRequest(Transport.Connection connection, String action, + TransportRequest request, + TransportRequestOptions options, + TransportResponseHandler handler) { if (TransportLivenessAction.NAME.equals(action)) { - sender.sendRequest(node, action, request, options, wrapLivenessResponseHandler(handler, node, clusterName)); + sender.sendRequest(connection, action, request, options, wrapLivenessResponseHandler(handler, + connection.getNode(), clusterName)); } else { - sender.sendRequest(node, action, request, options, handler); + sender.sendRequest(connection, action, request, options, handler); } } }; diff --git a/core/src/test/java/org/elasticsearch/cluster/ClusterInfoServiceIT.java b/core/src/test/java/org/elasticsearch/cluster/ClusterInfoServiceIT.java index 3d3e87db083..637b6733e22 100644 --- a/core/src/test/java/org/elasticsearch/cluster/ClusterInfoServiceIT.java +++ b/core/src/test/java/org/elasticsearch/cluster/ClusterInfoServiceIT.java @@ -48,6 +48,8 @@ import org.elasticsearch.plugins.Plugin; import org.elasticsearch.test.ESIntegTestCase; import org.elasticsearch.test.InternalTestCluster; import org.elasticsearch.test.transport.MockTransportService; +import org.elasticsearch.transport.ConnectionProfile; +import org.elasticsearch.transport.Transport; import org.elasticsearch.transport.TransportException; import org.elasticsearch.transport.TransportRequest; import org.elasticsearch.transport.TransportRequestOptions; @@ -196,15 +198,15 @@ public class ClusterInfoServiceIT extends ESIntegTestCase { for (DiscoveryNode node : internalTestCluster.clusterService().state().getNodes()) { mockTransportService.addDelegate(internalTestCluster.getInstance(TransportService.class, node.getName()), new MockTransportService.DelegateTransport(mockTransportService.original()) { @Override - public void sendRequest(DiscoveryNode node, long requestId, String action, TransportRequest request, - TransportRequestOptions options) throws IOException, TransportException { + protected void sendRequest(Connection connection, long requestId, String action, TransportRequest request, + TransportRequestOptions options) throws IOException { if (blockedActions.contains(action)) { if (timeout.get()) { logger.info("dropping [{}] to [{}]", action, node); return; } } - super.sendRequest(node, requestId, action, request, options); + super.sendRequest(connection, requestId, action, request, options); } }); } diff --git a/core/src/test/java/org/elasticsearch/cluster/NodeConnectionsServiceTests.java b/core/src/test/java/org/elasticsearch/cluster/NodeConnectionsServiceTests.java index 7372218a9ed..d5d3099c4d0 100644 --- a/core/src/test/java/org/elasticsearch/cluster/NodeConnectionsServiceTests.java +++ b/core/src/test/java/org/elasticsearch/cluster/NodeConnectionsServiceTests.java @@ -214,9 +214,29 @@ public class NodeConnectionsServiceTests extends ESTestCase { } @Override - public void sendRequest(DiscoveryNode node, long requestId, String action, TransportRequest request, - TransportRequestOptions options) throws IOException, TransportException { + public Connection getConnection(DiscoveryNode node) { + return new Connection() { + @Override + public DiscoveryNode getNode() { + return node; + } + @Override + public void sendRequest(long requestId, String action, TransportRequest request, TransportRequestOptions options) + throws IOException, TransportException { + + } + + @Override + public void close() throws IOException { + + } + }; + } + + @Override + public Connection openConnection(DiscoveryNode node, ConnectionProfile profile) throws IOException { + return getConnection(node); } @Override diff --git a/core/src/test/java/org/elasticsearch/discovery/DiscoveryWithServiceDisruptionsIT.java b/core/src/test/java/org/elasticsearch/discovery/DiscoveryWithServiceDisruptionsIT.java index 9a54419bc5c..ce57e7be05b 100644 --- a/core/src/test/java/org/elasticsearch/discovery/DiscoveryWithServiceDisruptionsIT.java +++ b/core/src/test/java/org/elasticsearch/discovery/DiscoveryWithServiceDisruptionsIT.java @@ -84,7 +84,9 @@ import org.elasticsearch.test.disruption.SingleNodeDisruption; import org.elasticsearch.test.disruption.SlowClusterStateProcessing; import org.elasticsearch.test.junit.annotations.TestLogging; import org.elasticsearch.test.transport.MockTransportService; +import org.elasticsearch.transport.ConnectionProfile; import org.elasticsearch.transport.TcpTransport; +import org.elasticsearch.transport.Transport; import org.elasticsearch.transport.TransportException; import org.elasticsearch.transport.TransportRequest; import org.elasticsearch.transport.TransportRequestOptions; @@ -953,13 +955,19 @@ public class DiscoveryWithServiceDisruptionsIT extends ESIntegTestCase { nonMasterTransportService.addDelegate(masterTranspotService, new MockTransportService.DelegateTransport(nonMasterTransportService .original()) { @Override - public void sendRequest(DiscoveryNode node, long requestId, String action, TransportRequest request, TransportRequestOptions - options) throws IOException, TransportException { + protected void sendRequest(Connection connection, long requestId, String action, TransportRequest request, + TransportRequestOptions options) throws IOException { if (action.equals(MembershipAction.DISCOVERY_JOIN_ACTION_NAME)) { countDownLatch.countDown(); } - super.sendRequest(node, requestId, action, request, options); + super.sendRequest(connection, requestId, action, request, options); } + + @Override + public Connection openConnection(DiscoveryNode node, ConnectionProfile profile) throws IOException { + return super.openConnection(node, profile); + } + }); countDownLatch.await(); diff --git a/core/src/test/java/org/elasticsearch/discovery/zen/UnicastZenPingTests.java b/core/src/test/java/org/elasticsearch/discovery/zen/UnicastZenPingTests.java index c9c1e984182..9886abb900a 100644 --- a/core/src/test/java/org/elasticsearch/discovery/zen/UnicastZenPingTests.java +++ b/core/src/test/java/org/elasticsearch/discovery/zen/UnicastZenPingTests.java @@ -579,6 +579,10 @@ public class UnicastZenPingTests extends ESTestCase { @Override public void onNodeConnected(DiscoveryNode node) { + } + + @Override + public void onConnectionOpened(DiscoveryNode node) { counters.computeIfAbsent(node.getAddress(), k -> new AtomicInteger()); counters.get(node.getAddress()).incrementAndGet(); } diff --git a/core/src/test/java/org/elasticsearch/index/IndexWithShadowReplicasIT.java b/core/src/test/java/org/elasticsearch/index/IndexWithShadowReplicasIT.java index d5a003003ac..e0f5d2a4377 100644 --- a/core/src/test/java/org/elasticsearch/index/IndexWithShadowReplicasIT.java +++ b/core/src/test/java/org/elasticsearch/index/IndexWithShadowReplicasIT.java @@ -57,6 +57,8 @@ import org.elasticsearch.test.ESIntegTestCase; import org.elasticsearch.test.InternalTestCluster; import org.elasticsearch.test.junit.annotations.TestLogging; import org.elasticsearch.test.transport.MockTransportService; +import org.elasticsearch.transport.ConnectionProfile; +import org.elasticsearch.transport.Transport; import org.elasticsearch.transport.TransportException; import org.elasticsearch.transport.TransportRequest; import org.elasticsearch.transport.TransportRequestOptions; @@ -493,14 +495,13 @@ public class IndexWithShadowReplicasIT extends ESIntegTestCase { new MockTransportService.DelegateTransport(mockTransportService.original()) { @Override - public void sendRequest(DiscoveryNode node, long requestId, String action, - TransportRequest request, TransportRequestOptions options) - throws IOException, TransportException { + protected void sendRequest(Connection connection, long requestId, String action, TransportRequest request, + TransportRequestOptions options) throws IOException { if (keepFailing.get() && action.equals(PeerRecoveryTargetService.Actions.TRANSLOG_OPS)) { logger.info("--> failing translog ops"); throw new ElasticsearchException("failing on purpose"); } - super.sendRequest(node, requestId, action, request, options); + super.sendRequest(connection, requestId, action, request, options); } }); diff --git a/core/src/test/java/org/elasticsearch/index/store/CorruptedFileIT.java b/core/src/test/java/org/elasticsearch/index/store/CorruptedFileIT.java index d1be0d77613..1d8534246c0 100644 --- a/core/src/test/java/org/elasticsearch/index/store/CorruptedFileIT.java +++ b/core/src/test/java/org/elasticsearch/index/store/CorruptedFileIT.java @@ -73,6 +73,8 @@ import org.elasticsearch.test.MockIndexEventListener; import org.elasticsearch.test.junit.annotations.TestLogging; import org.elasticsearch.test.store.MockFSIndexStore; import org.elasticsearch.test.transport.MockTransportService; +import org.elasticsearch.transport.ConnectionProfile; +import org.elasticsearch.transport.Transport; import org.elasticsearch.transport.TransportException; import org.elasticsearch.transport.TransportRequest; import org.elasticsearch.transport.TransportRequestOptions; @@ -348,7 +350,7 @@ public class CorruptedFileIT extends ESIntegTestCase { mockTransportService.addDelegate(internalCluster().getInstance(TransportService.class, unluckyNode.getNode().getName()), new MockTransportService.DelegateTransport(mockTransportService.original()) { @Override - public void sendRequest(DiscoveryNode node, long requestId, String action, TransportRequest request, TransportRequestOptions options) throws IOException, TransportException { + protected void sendRequest(Connection connection, long requestId, String action, TransportRequest request, TransportRequestOptions options) throws IOException { if (corrupt.get() && action.equals(PeerRecoveryTargetService.Actions.FILE_CHUNK)) { RecoveryFileChunkRequest req = (RecoveryFileChunkRequest) request; byte[] array = BytesRef.deepCopyOf(req.content().toBytesRef()).bytes; @@ -356,7 +358,7 @@ public class CorruptedFileIT extends ESIntegTestCase { array[i] = (byte) ~array[i]; // flip one byte in the content hasCorrupted.countDown(); } - super.sendRequest(node, requestId, action, request, options); + super.sendRequest(connection, requestId, action, request, options); } }); } @@ -420,7 +422,7 @@ public class CorruptedFileIT extends ESIntegTestCase { mockTransportService.addDelegate(internalCluster().getInstance(TransportService.class, unluckyNode.getNode().getName()), new MockTransportService.DelegateTransport(mockTransportService.original()) { @Override - public void sendRequest(DiscoveryNode node, long requestId, String action, TransportRequest request, TransportRequestOptions options) throws IOException, TransportException { + protected void sendRequest(Connection connection, long requestId, String action, TransportRequest request, TransportRequestOptions options) throws IOException { if (action.equals(PeerRecoveryTargetService.Actions.FILE_CHUNK)) { RecoveryFileChunkRequest req = (RecoveryFileChunkRequest) request; if (truncate && req.length() > 1) { @@ -434,7 +436,7 @@ public class CorruptedFileIT extends ESIntegTestCase { array[i] = (byte) ~array[i]; // flip one byte in the content } } - super.sendRequest(node, requestId, action, request, options); + super.sendRequest(connection, requestId, action, request, options); } }); } diff --git a/core/src/test/java/org/elasticsearch/index/store/ExceptionRetryIT.java b/core/src/test/java/org/elasticsearch/index/store/ExceptionRetryIT.java index aa9de8de871..373937784b3 100644 --- a/core/src/test/java/org/elasticsearch/index/store/ExceptionRetryIT.java +++ b/core/src/test/java/org/elasticsearch/index/store/ExceptionRetryIT.java @@ -39,6 +39,7 @@ import org.elasticsearch.search.SearchHit; import org.elasticsearch.test.ESIntegTestCase; import org.elasticsearch.test.transport.MockTransportService; import org.elasticsearch.transport.ConnectTransportException; +import org.elasticsearch.transport.Transport; import org.elasticsearch.transport.TransportException; import org.elasticsearch.transport.TransportRequest; import org.elasticsearch.transport.TransportRequestOptions; @@ -99,14 +100,13 @@ public class ExceptionRetryIT extends ESIntegTestCase { dataNode.getNode().getName())); mockTransportService.addDelegate(internalCluster().getInstance(TransportService.class, unluckyNode.getNode().getName()), new MockTransportService.DelegateTransport(mockTransportService.original()) { - @Override - public void sendRequest(DiscoveryNode node, long requestId, String action, TransportRequest request, - TransportRequestOptions options) throws IOException, TransportException { - super.sendRequest(node, requestId, action, request, options); + protected void sendRequest(Connection connection, long requestId, String action, TransportRequest request, + TransportRequestOptions options) throws IOException { + super.sendRequest(connection, requestId, action, request, options); if (action.equals(TransportShardBulkAction.ACTION_NAME) && exceptionThrown.compareAndSet(false, true)) { logger.debug("Throw ConnectTransportException"); - throw new ConnectTransportException(node, action); + throw new ConnectTransportException(connection.getNode(), action); } } }); diff --git a/core/src/test/java/org/elasticsearch/indices/recovery/IndexRecoveryIT.java b/core/src/test/java/org/elasticsearch/indices/recovery/IndexRecoveryIT.java index e0a6111833b..4e3c516aa72 100644 --- a/core/src/test/java/org/elasticsearch/indices/recovery/IndexRecoveryIT.java +++ b/core/src/test/java/org/elasticsearch/indices/recovery/IndexRecoveryIT.java @@ -640,16 +640,17 @@ public class IndexRecoveryIT extends ESIntegTestCase { } @Override - public void sendRequest(DiscoveryNode node, long requestId, String action, TransportRequest request, TransportRequestOptions options) throws IOException, TransportException { + protected void sendRequest(Connection connection, long requestId, String action, TransportRequest request, + TransportRequestOptions options) throws IOException { if (recoveryActionToBlock.equals(action) || requestBlocked.getCount() == 0) { logger.info("--> preventing {} request", action); requestBlocked.countDown(); if (dropRequests) { return; } - throw new ConnectTransportException(node, "DISCONNECT: prevented " + action + " request"); + throw new ConnectTransportException(connection.getNode(), "DISCONNECT: prevented " + action + " request"); } - transport.sendRequest(node, requestId, action, request, options); + super.sendRequest(connection, requestId, action, request, options); } } } diff --git a/core/src/test/java/org/elasticsearch/indices/store/IndicesStoreIntegrationIT.java b/core/src/test/java/org/elasticsearch/indices/store/IndicesStoreIntegrationIT.java index d8e7e7c4ac3..29845e5396d 100644 --- a/core/src/test/java/org/elasticsearch/indices/store/IndicesStoreIntegrationIT.java +++ b/core/src/test/java/org/elasticsearch/indices/store/IndicesStoreIntegrationIT.java @@ -56,6 +56,7 @@ import org.elasticsearch.test.disruption.BlockClusterStateProcessing; import org.elasticsearch.test.disruption.SingleNodeDisruption; import org.elasticsearch.test.transport.MockTransportService; import org.elasticsearch.transport.ConnectTransportException; +import org.elasticsearch.transport.Transport; import org.elasticsearch.transport.TransportException; import org.elasticsearch.transport.TransportRequest; import org.elasticsearch.transport.TransportRequestOptions; @@ -198,13 +199,13 @@ public class IndicesStoreIntegrationIT extends ESIntegTestCase { final CountDownLatch shardActiveRequestSent = new CountDownLatch(1); transportServiceNode_1.addDelegate(transportServiceNode_2, new MockTransportService.DelegateTransport(transportServiceNode_1.original()) { @Override - public void sendRequest(DiscoveryNode node, long requestId, String action, TransportRequest request, TransportRequestOptions options) throws IOException, TransportException { + protected void sendRequest(Connection connection, long requestId, String action, TransportRequest request, TransportRequestOptions options) throws IOException { if (action.equals("internal:index/shard/exists") && shardActiveRequestSent.getCount() > 0) { shardActiveRequestSent.countDown(); logger.info("prevent shard active request from being sent"); - throw new ConnectTransportException(node, "DISCONNECT: simulated"); + throw new ConnectTransportException(connection.getNode(), "DISCONNECT: simulated"); } - super.sendRequest(node, requestId, action, request, options); + super.sendRequest(connection, requestId, action, request, options); } }); diff --git a/core/src/test/java/org/elasticsearch/recovery/RelocationIT.java b/core/src/test/java/org/elasticsearch/recovery/RelocationIT.java index 2017b2796b4..f4de4bdea0b 100644 --- a/core/src/test/java/org/elasticsearch/recovery/RelocationIT.java +++ b/core/src/test/java/org/elasticsearch/recovery/RelocationIT.java @@ -533,20 +533,20 @@ public class RelocationIT extends ESIntegTestCase { } @Override - public void sendRequest(DiscoveryNode node, long requestId, String action, TransportRequest request, TransportRequestOptions options) throws IOException, TransportException { + protected void sendRequest(Connection connection, long requestId, String action, TransportRequest request, TransportRequestOptions options) throws IOException { if (action.equals(PeerRecoveryTargetService.Actions.FILE_CHUNK)) { RecoveryFileChunkRequest chunkRequest = (RecoveryFileChunkRequest) request; if (chunkRequest.name().startsWith(IndexFileNames.SEGMENTS)) { // corrupting the segments_N files in order to make sure future recovery re-send files - logger.debug("corrupting [{}] to {}. file name: [{}]", action, node, chunkRequest.name()); + logger.debug("corrupting [{}] to {}. file name: [{}]", action, connection.getNode(), chunkRequest.name()); assert chunkRequest.content().toBytesRef().bytes == chunkRequest.content().toBytesRef().bytes : "no internal reference!!"; byte[] array = chunkRequest.content().toBytesRef().bytes; array[0] = (byte) ~array[0]; // flip one byte in the content corruptionCount.countDown(); } - transport.sendRequest(node, requestId, action, request, options); + super.sendRequest(connection, requestId, action, request, options); } else { - transport.sendRequest(node, requestId, action, request, options); + super.sendRequest(connection, requestId, action, request, options); } } } diff --git a/core/src/test/java/org/elasticsearch/recovery/TruncatedRecoveryIT.java b/core/src/test/java/org/elasticsearch/recovery/TruncatedRecoveryIT.java index b81de043921..564eca54d8e 100644 --- a/core/src/test/java/org/elasticsearch/recovery/TruncatedRecoveryIT.java +++ b/core/src/test/java/org/elasticsearch/recovery/TruncatedRecoveryIT.java @@ -36,6 +36,8 @@ import org.elasticsearch.node.RecoverySettingsChunkSizePlugin; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.test.ESIntegTestCase; import org.elasticsearch.test.transport.MockTransportService; +import org.elasticsearch.transport.ConnectionProfile; +import org.elasticsearch.transport.Transport; import org.elasticsearch.transport.TransportException; import org.elasticsearch.transport.TransportRequest; import org.elasticsearch.transport.TransportRequestOptions; @@ -121,7 +123,8 @@ public class TruncatedRecoveryIT extends ESIntegTestCase { mockTransportService.addDelegate(internalCluster().getInstance(TransportService.class, unluckyNode.getNode().getName()), new MockTransportService.DelegateTransport(mockTransportService.original()) { @Override - public void sendRequest(DiscoveryNode node, long requestId, String action, TransportRequest request, TransportRequestOptions options) throws IOException, TransportException { + protected void sendRequest(Connection connection, long requestId, String action, TransportRequest request, + TransportRequestOptions options) throws IOException { if (action.equals(PeerRecoveryTargetService.Actions.FILE_CHUNK)) { RecoveryFileChunkRequest req = (RecoveryFileChunkRequest) request; logger.debug("file chunk [{}] lastChunk: {}", req, req.lastChunk()); @@ -130,7 +133,7 @@ public class TruncatedRecoveryIT extends ESIntegTestCase { throw new RuntimeException("Caused some truncated files for fun and profit"); } } - super.sendRequest(node, requestId, action, request, options); + super.sendRequest(connection, requestId, action, request, options); } }); } diff --git a/core/src/test/java/org/elasticsearch/transport/TCPTransportTests.java b/core/src/test/java/org/elasticsearch/transport/TCPTransportTests.java index 3df135df236..f081cfa45e4 100644 --- a/core/src/test/java/org/elasticsearch/transport/TCPTransportTests.java +++ b/core/src/test/java/org/elasticsearch/transport/TCPTransportTests.java @@ -193,7 +193,7 @@ public class TCPTransportTests extends ESTestCase { @Override protected NodeChannels connectToChannels(DiscoveryNode node, ConnectionProfile profile) throws IOException { - return new NodeChannels(new Object[profile.getNumConnections()], profile); + return new NodeChannels(node, new Object[profile.getNumConnections()], profile); } @Override @@ -207,13 +207,14 @@ public class TCPTransportTests extends ESTestCase { } @Override - protected Object nodeChannel(DiscoveryNode node, TransportRequestOptions options) throws ConnectTransportException { - return new NodeChannels(new Object[ConnectionProfile.LIGHT_PROFILE.getNumConnections()], + public Connection getConnection(DiscoveryNode node) { + return new NodeChannels(node, new Object[ConnectionProfile.LIGHT_PROFILE.getNumConnections()], ConnectionProfile.LIGHT_PROFILE); } }; DiscoveryNode node = new DiscoveryNode("foo", buildNewFakeTransportAddress(), Version.CURRENT); - transport.sendRequest(node, 42, "foobar", request, TransportRequestOptions.EMPTY); + Transport.Connection connection = transport.getConnection(node); + connection.sendRequest(42, "foobar", request, TransportRequestOptions.EMPTY); assertTrue(called.get()); } finally { ThreadPool.terminate(threadPool, 10, TimeUnit.SECONDS); diff --git a/core/src/test/java/org/elasticsearch/transport/TransportServiceHandshakeTests.java b/core/src/test/java/org/elasticsearch/transport/TransportServiceHandshakeTests.java index 45aca7fe2c0..16735f34efe 100644 --- a/core/src/test/java/org/elasticsearch/transport/TransportServiceHandshakeTests.java +++ b/core/src/test/java/org/elasticsearch/transport/TransportServiceHandshakeTests.java @@ -35,6 +35,7 @@ import org.junit.After; import org.junit.AfterClass; import org.junit.BeforeClass; +import java.io.IOException; import java.util.ArrayList; import java.util.Collections; import java.util.List; @@ -97,7 +98,7 @@ public class TransportServiceHandshakeTests extends ESTestCase { threadPool = null; } - public void testConnectToNodeLight() { + public void testConnectToNodeLight() throws IOException { Settings settings = Settings.builder().put("cluster.name", "test").build(); NetworkHandle handleA = startServices("TS_A", settings, Version.CURRENT); @@ -155,29 +156,6 @@ public class TransportServiceHandshakeTests extends ESTestCase { assertFalse(handleA.transportService.nodeConnected(discoveryNode)); } - public void testIgnoreMismatchedClusterName() { - Settings settings = Settings.builder().put("cluster.name", "a").build(); - - NetworkHandle handleA = startServices("TS_A", settings, Version.CURRENT); - NetworkHandle handleB = - startServices( - "TS_B", - Settings.builder().put("cluster.name", "b").build(), - VersionUtils.randomVersionBetween(random(), Version.CURRENT.minimumCompatibilityVersion(), Version.CURRENT) - ); - DiscoveryNode discoveryNode = new DiscoveryNode( - "", - handleB.discoveryNode.getAddress(), - emptyMap(), - emptySet(), - Version.CURRENT.minimumCompatibilityVersion()); - DiscoveryNode connectedNode = handleA.transportService.connectToNodeAndHandshake(discoveryNode, timeout, false); - assertNotNull(connectedNode); - assertEquals(connectedNode.getName(), "TS_B"); - assertEquals(connectedNode.getVersion(), handleB.discoveryNode.getVersion()); - assertTrue(handleA.transportService.nodeConnected(discoveryNode)); - } - private static class NetworkHandle { private TransportService transportService; private DiscoveryNode discoveryNode; diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4Transport.java b/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4Transport.java index e1e497a4161..048b1f90015 100644 --- a/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4Transport.java +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4Transport.java @@ -46,7 +46,6 @@ import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.SuppressForbidden; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.collect.Tuple; -import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.lease.Releasables; import org.elasticsearch.common.network.NetworkService; @@ -143,7 +142,6 @@ public class Netty4Transport extends TcpTransport { protected volatile Bootstrap bootstrap; protected final Map serverBootstraps = newConcurrentMap(); - @Inject public Netty4Transport(Settings settings, ThreadPool threadPool, NetworkService networkService, BigArrays bigArrays, NamedWriteableRegistry namedWriteableRegistry, CircuitBreakerService circuitBreakerService) { super("netty", settings, threadPool, bigArrays, circuitBreakerService, namedWriteableRegistry, networkService); @@ -342,7 +340,7 @@ public class Netty4Transport extends TcpTransport { @Override protected NodeChannels connectToChannels(DiscoveryNode node, ConnectionProfile profile) { final Channel[] channels = new Channel[profile.getNumConnections()]; - final NodeChannels nodeChannels = new NodeChannels(channels, profile); + final NodeChannels nodeChannels = new NodeChannels(node, channels, profile); boolean success = false; try { final TimeValue connectTimeout; diff --git a/test/framework/src/main/java/org/elasticsearch/test/transport/CapturingTransport.java b/test/framework/src/main/java/org/elasticsearch/test/transport/CapturingTransport.java index 7dd4569dc56..809fff8bc0d 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/transport/CapturingTransport.java +++ b/test/framework/src/main/java/org/elasticsearch/test/transport/CapturingTransport.java @@ -41,6 +41,7 @@ import org.elasticsearch.transport.TransportResponse; import org.elasticsearch.transport.TransportServiceAdapter; import java.io.IOException; +import java.io.UncheckedIOException; import java.net.UnknownHostException; import java.util.ArrayList; import java.util.Collections; @@ -187,12 +188,26 @@ public class CapturingTransport implements Transport { } @Override - public void sendRequest(DiscoveryNode node, long requestId, String action, TransportRequest request, TransportRequestOptions options) - throws IOException, TransportException { - requests.put(requestId, Tuple.tuple(node, action)); - capturedRequests.add(new CapturedRequest(node, requestId, action, request)); - } + public Connection openConnection(DiscoveryNode node, ConnectionProfile profile) throws IOException { + return new Connection() { + @Override + public DiscoveryNode getNode() { + return node; + } + @Override + public void sendRequest(long requestId, String action, TransportRequest request, TransportRequestOptions options) + throws IOException, TransportException { + requests.put(requestId, Tuple.tuple(node, action)); + capturedRequests.add(new CapturedRequest(node, requestId, action, request)); + } + + @Override + public void close() throws IOException { + + } + }; + } @Override public void transportServiceAdapter(TransportServiceAdapter adapter) { @@ -263,4 +278,13 @@ public class CapturingTransport implements Transport { return Collections.emptyList(); } + @Override + public Connection getConnection(DiscoveryNode node) { + try { + return openConnection(node, null); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } + } diff --git a/test/framework/src/main/java/org/elasticsearch/test/transport/MockTransportService.java b/test/framework/src/main/java/org/elasticsearch/test/transport/MockTransportService.java index 2790b548b18..fad7203283a 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/transport/MockTransportService.java +++ b/test/framework/src/main/java/org/elasticsearch/test/transport/MockTransportService.java @@ -183,9 +183,9 @@ public final class MockTransportService extends TransportService { } @Override - public void sendRequest(DiscoveryNode node, long requestId, String action, TransportRequest request, - TransportRequestOptions options) throws IOException, TransportException { - throw new ConnectTransportException(node, "DISCONNECT: simulated"); + protected void sendRequest(Connection connection, long requestId, String action, TransportRequest request, + TransportRequestOptions options) throws IOException { + throw new ConnectTransportException(connection.getNode(), "DISCONNECT: simulated"); } }); } @@ -226,13 +226,13 @@ public final class MockTransportService extends TransportService { } @Override - public void sendRequest(DiscoveryNode node, long requestId, String action, TransportRequest request, - TransportRequestOptions options) throws IOException, TransportException { + protected void sendRequest(Connection connection, long requestId, String action, TransportRequest request, + TransportRequestOptions options) throws IOException { if (blockedActions.contains(action)) { logger.info("--> preventing {} request", action); - throw new ConnectTransportException(node, "DISCONNECT: prevented " + action + " request"); + throw new ConnectTransportException(connection.getNode(), "DISCONNECT: prevented " + action + " request"); } - original.sendRequest(node, requestId, action, request, options); + connection.sendRequest(requestId, action, request, options); } }); } @@ -260,8 +260,8 @@ public final class MockTransportService extends TransportService { } @Override - public void sendRequest(DiscoveryNode node, long requestId, String action, TransportRequest request, - TransportRequestOptions options) throws IOException, TransportException { + protected void sendRequest(Connection connection, long requestId, String action, TransportRequest request, + TransportRequestOptions options) throws IOException { // don't send anything, the receiving node is unresponsive } }); @@ -320,13 +320,12 @@ public final class MockTransportService extends TransportService { } @Override - public void sendRequest(final DiscoveryNode node, final long requestId, final String action, TransportRequest request, - final TransportRequestOptions options) throws IOException, TransportException { + protected void sendRequest(Connection connection, long requestId, String action, TransportRequest request, + TransportRequestOptions options) throws IOException { // delayed sending - even if larger then the request timeout to simulated a potential late response from target node - TimeValue delay = getDelay(); if (delay.millis() <= 0) { - original.sendRequest(node, requestId, action, request, options); + connection.sendRequest(requestId, action, request, options); return; } @@ -348,7 +347,7 @@ public final class MockTransportService extends TransportService { @Override protected void doRun() throws IOException { if (requestSent.compareAndSet(false, true)) { - original.sendRequest(node, requestId, action, clonedRequest, options); + connection.sendRequest(requestId, action, clonedRequest, options); } } }; @@ -364,7 +363,6 @@ public final class MockTransportService extends TransportService { } } - @Override public void clearRule() { synchronized (this) { @@ -439,9 +437,13 @@ public final class MockTransportService extends TransportService { } @Override - public void sendRequest(DiscoveryNode node, long requestId, String action, TransportRequest request, - TransportRequestOptions options) throws IOException, TransportException { - getTransport(node).sendRequest(node, requestId, action, request, options); + public Connection getConnection(DiscoveryNode node) { + return getTransport(node).getConnection(node); + } + + @Override + public Connection openConnection(DiscoveryNode node, ConnectionProfile profile) throws IOException { + return getTransport(node).openConnection(node, profile); } } @@ -488,12 +490,6 @@ public final class MockTransportService extends TransportService { transport.disconnectFromNode(node); } - @Override - public void sendRequest(DiscoveryNode node, long requestId, String action, TransportRequest request, - TransportRequestOptions options) throws IOException, TransportException { - transport.sendRequest(node, requestId, action, request, options); - } - @Override public long serverOpen() { return transport.serverOpen(); @@ -504,6 +500,28 @@ public final class MockTransportService extends TransportService { return transport.getLocalAddresses(); } + @Override + public Connection getConnection(DiscoveryNode node) { + return new FilteredConnection(transport.getConnection(node)) { + @Override + public void sendRequest(long requestId, String action, TransportRequest request, TransportRequestOptions options) + throws IOException, TransportException { + DelegateTransport.this.sendRequest(connection, requestId, action, request, options); + } + }; + } + + @Override + public Connection openConnection(DiscoveryNode node, ConnectionProfile profile) throws IOException { + return new FilteredConnection(transport.openConnection(node, profile)) { + @Override + public void sendRequest(long requestId, String action, TransportRequest request, TransportRequestOptions options) + throws IOException, TransportException { + DelegateTransport.this.sendRequest(connection, requestId, action, request, options); + } + }; + } + @Override public Lifecycle.State lifecycleState() { return transport.lifecycleState(); @@ -538,6 +556,11 @@ public final class MockTransportService extends TransportService { public Map profileBoundAddresses() { return transport.profileBoundAddresses(); } + + protected void sendRequest(Transport.Connection connection, long requestId, String action, TransportRequest request, + TransportRequestOptions options) throws IOException { + connection.sendRequest(requestId, action, request, options); + } } /** @@ -641,4 +664,28 @@ public final class MockTransportService extends TransportService { } } } + + private static class FilteredConnection implements Transport.Connection { + protected final Transport.Connection connection; + + private FilteredConnection(Transport.Connection connection) { + this.connection = connection; + } + + @Override + public DiscoveryNode getNode() { + return connection.getNode(); + } + + @Override + public void sendRequest(long requestId, String action, TransportRequest request, TransportRequestOptions options) + throws IOException, TransportException { + connection.sendRequest(requestId, action, request, options); + } + + @Override + public void close() throws IOException { + connection.close(); + } + } } diff --git a/test/framework/src/main/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java b/test/framework/src/main/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java index 434990afba9..aa2271c9655 100644 --- a/test/framework/src/main/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java @@ -48,9 +48,7 @@ import java.io.IOException; import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.ServerSocket; -import java.sql.Time; import java.util.ArrayList; -import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -568,17 +566,22 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase { PlainActionFuture listener = new PlainActionFuture<>(); final String info = sender + "_" + iter; final DiscoveryNode node = nodeB; // capture now - serviceA.sendRequest(node, "test", new TestRequest(info), - new ActionListenerResponseHandler<>(listener, TestResponse::new)); try { - listener.actionGet(); - } catch (ConnectTransportException e) { - // ok! - } catch (Exception e) { - logger.error( - (Supplier) () -> new ParameterizedMessage("caught exception while sending to node {}", node), e); - sendingErrors.add(e); + serviceA.sendRequest(node, "test", new TestRequest(info), + new ActionListenerResponseHandler<>(listener, TestResponse::new)); + try { + listener.actionGet(); + } catch (ConnectTransportException e) { + // ok! + } catch (Exception e) { + logger.error( + (Supplier) () -> new ParameterizedMessage("caught exception while sending to node {}", node), e); + sendingErrors.add(e); + } + } catch (NodeNotConnectedException ex) { + // ok } + } } @@ -1184,16 +1187,13 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase { public void testVersionFrom1to1() throws Exception { serviceB.registerRequestHandler("/version", Version1Request::new, ThreadPool.Names.SAME, - new TransportRequestHandler() { - @Override - public void messageReceived(Version1Request request, TransportChannel channel) throws Exception { - assertThat(request.value1, equalTo(1)); - assertThat(request.value2, equalTo(2)); - Version1Response response = new Version1Response(); - response.value1 = 1; - response.value2 = 2; - channel.sendResponse(response); - } + (request, channel) -> { + assertThat(request.value1, equalTo(1)); + assertThat(request.value2, equalTo(2)); + Version1Response response = new Version1Response(); + response.value1 = 1; + response.value2 = 2; + channel.sendResponse(response); }); Version1Request version1Request = new Version1Request(); @@ -1266,7 +1266,7 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase { assertThat(version0Response.value1, equalTo(1)); } - public void testMockFailToSendNoConnectRule() { + public void testMockFailToSendNoConnectRule() throws IOException { serviceA.registerRequestHandler("sayHello", StringMessageRequest::new, ThreadPool.Names.GENERIC, new TransportRequestHandler() { @Override @@ -1318,12 +1318,12 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase { try { serviceB.connectToNodeAndHandshake(nodeA, 100); fail("exception should be thrown"); - } catch (ConnectTransportException e) { + } catch (IllegalStateException e) { // all is well } } - public void testMockUnresponsiveRule() { + public void testMockUnresponsiveRule() throws IOException { serviceA.registerRequestHandler("sayHello", StringMessageRequest::new, ThreadPool.Names.GENERIC, new TransportRequestHandler() { @Override @@ -1376,7 +1376,7 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase { try { serviceB.connectToNodeAndHandshake(nodeA, 100); fail("exception should be thrown"); - } catch (ConnectTransportException e) { + } catch (IllegalStateException e) { // all is well } } diff --git a/test/framework/src/main/java/org/elasticsearch/transport/AssertingTransportInterceptor.java b/test/framework/src/main/java/org/elasticsearch/transport/AssertingTransportInterceptor.java index 37ebebc64a6..0c6c18e1a20 100644 --- a/test/framework/src/main/java/org/elasticsearch/transport/AssertingTransportInterceptor.java +++ b/test/framework/src/main/java/org/elasticsearch/transport/AssertingTransportInterceptor.java @@ -19,7 +19,6 @@ package org.elasticsearch.transport; import org.elasticsearch.Version; -import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.io.stream.Streamable; import org.elasticsearch.common.settings.Settings; @@ -92,11 +91,11 @@ public final class AssertingTransportInterceptor implements TransportInterceptor public AsyncSender interceptSender(final AsyncSender sender) { return new AsyncSender() { @Override - public void sendRequest(DiscoveryNode node, String action, TransportRequest request, + public void sendRequest(Transport.Connection connection, String action, TransportRequest request, TransportRequestOptions options, final TransportResponseHandler handler) { assertVersionSerializable(request); - sender.sendRequest(node, action, request, options, new TransportResponseHandler() { + sender.sendRequest(connection, action, request, options, new TransportResponseHandler() { @Override public T newInstance() { return handler.newInstance(); diff --git a/test/framework/src/main/java/org/elasticsearch/transport/MockTcpTransport.java b/test/framework/src/main/java/org/elasticsearch/transport/MockTcpTransport.java index e9a97e030b2..0778c344bcc 100644 --- a/test/framework/src/main/java/org/elasticsearch/transport/MockTcpTransport.java +++ b/test/framework/src/main/java/org/elasticsearch/transport/MockTcpTransport.java @@ -161,7 +161,7 @@ public class MockTcpTransport extends TcpTransport @Override protected NodeChannels connectToChannels(DiscoveryNode node, ConnectionProfile profile) throws IOException { final MockChannel[] mockChannels = new MockChannel[1]; - final NodeChannels nodeChannels = new NodeChannels(mockChannels, ConnectionProfile.LIGHT_PROFILE); // we always use light here + final NodeChannels nodeChannels = new NodeChannels(node, mockChannels, ConnectionProfile.LIGHT_PROFILE); // we always use light here boolean success = false; final Socket socket = new Socket(); try {