Detach handshake from connect to node (#22037)

Today we connect and publish the nodes connection before we execute a
handshake with the node we connect to. In the case of connecting to a node
that won't pass the handshake this connection is already `published` and other
code paths can use it. This commit detaches the connection and the publish of the
connection such that `TransportService` can do a handshake before actually connect
and publish the connection.
This commit is contained in:
Simon Willnauer 2016-12-10 10:03:26 +01:00 committed by GitHub
parent 3adefb7b4a
commit 01d67e09b9
28 changed files with 492 additions and 286 deletions

View File

@ -68,7 +68,6 @@ import org.elasticsearch.monitor.jvm.JvmInfo;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.threadpool.ThreadPool;
import java.io.Closeable;
import java.io.IOException;
import java.io.StreamCorruptedException;
import java.net.BindException;
@ -81,7 +80,6 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.EnumMap;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
@ -313,13 +311,15 @@ public abstract class TcpTransport<Channel> extends AbstractLifecycleComponent i
}
}
public final class NodeChannels implements Closeable {
public final class NodeChannels implements Connection {
private final Map<TransportRequestOptions.Type, ConnectionProfile.ConnectionTypeHandle> typeMapping
= new EnumMap<>(TransportRequestOptions.Type.class);
private final Channel[] channels;
private final AtomicBoolean establishedAllConnections = new AtomicBoolean(false);
private final DiscoveryNode node;
private final AtomicBoolean closed = new AtomicBoolean(false);
public NodeChannels(Channel[] channels, ConnectionProfile connectionProfile) {
public NodeChannels(DiscoveryNode node, Channel[] channels, ConnectionProfile connectionProfile) {
this.node = node;
this.channels = channels;
assert channels.length == connectionProfile.getNumConnections() : "expected channels size to be == "
+ connectionProfile.getNumConnections() + " but was: [" + channels.length + "]";
@ -329,12 +329,6 @@ public abstract class TcpTransport<Channel> extends AbstractLifecycleComponent i
}
}
public void connectionsEstablished() {
if (establishedAllConnections.compareAndSet(false, true) == false) {
throw new AssertionError("connected more than once");
}
}
public boolean hasChannel(Channel channel) {
for (Channel channel1 : channels) {
@ -346,15 +340,10 @@ public abstract class TcpTransport<Channel> extends AbstractLifecycleComponent i
}
public List<Channel> getChannels() {
if (establishedAllConnections.get()) { // don't expose the channels until we are connected
return Arrays.asList(channels);
} else {
return Collections.emptyList();
}
return Arrays.asList(channels);
}
public Channel channel(TransportRequestOptions.Type type) {
assert establishedAllConnections.get();
ConnectionProfile.ConnectionTypeHandle connectionTypeHandle = typeMapping.get(type);
if (connectionTypeHandle == null) {
throw new IllegalArgumentException("no type channel for [" + type + "]");
@ -364,7 +353,24 @@ public abstract class TcpTransport<Channel> extends AbstractLifecycleComponent i
@Override
public synchronized void close() throws IOException {
closeChannels(Arrays.asList(channels).stream().filter(Objects::nonNull).collect(Collectors.toList()));
if (closed.compareAndSet(false, true)) {
closeChannels(Arrays.stream(channels).filter(Objects::nonNull).collect(Collectors.toList()));
}
}
@Override
public DiscoveryNode getNode() {
return this.node;
}
@Override
public void sendRequest(long requestId, String action, TransportRequest request, TransportRequestOptions options)
throws IOException, TransportException {
if (closed.get()) {
throw new NodeNotConnectedException(node, "connection already closed");
}
Channel channel = channel(options.type());
sendRequestToChannel(this.node, channel, requestId, action, request, options);
}
}
@ -395,20 +401,18 @@ public abstract class TcpTransport<Channel> extends AbstractLifecycleComponent i
}
try {
try {
nodeChannels = connectToChannels(node, connectionProfile);
nodeChannels = openConnection(node, connectionProfile);
} catch (Exception e) {
logger.trace(
(Supplier<?>) () -> new ParameterizedMessage(
"failed to connect to [{}], cleaning dangling connections", node), e);
throw e;
}
// we acquire a connection lock, so no way there is an existing connection
nodeChannels.connectionsEstablished();
connectedNodes.put(node, nodeChannels);
if (logger.isDebugEnabled()) {
logger.debug("connected to node [{}]", node);
}
transportServiceAdapter.raiseNodeConnected(node);
transportServiceAdapter.onNodeConnected(node);
} catch (ConnectTransportException e) {
throw e;
} catch (Exception e) {
@ -419,6 +423,14 @@ public abstract class TcpTransport<Channel> extends AbstractLifecycleComponent i
globalLock.readLock().unlock();
}
}
@Override
public final NodeChannels openConnection(DiscoveryNode node, ConnectionProfile profile) throws IOException {
NodeChannels nodeChannels = connectToChannels(node, profile);
transportServiceAdapter.onConnectionOpened(node);
return nodeChannels;
}
/**
* Disconnects from a node, only if the relevant channel is found to be part of the node channels.
*/
@ -432,13 +444,7 @@ public abstract class TcpTransport<Channel> extends AbstractLifecycleComponent i
// check again within the connection lock, if its still applicable to remove it
if (nodeChannels != null && nodeChannels.hasChannel(channel)) {
connectedNodes.remove(node);
try {
logger.debug("disconnecting from [{}], {}", node, reason);
IOUtils.closeWhileHandlingException(nodeChannels);
} finally {
logger.trace("disconnected from [{}], {}", node, reason);
transportServiceAdapter.raiseNodeDisconnected(node);
}
closeAndNotify(node, nodeChannels, reason);
return true;
}
}
@ -446,6 +452,16 @@ public abstract class TcpTransport<Channel> extends AbstractLifecycleComponent i
return false;
}
private void closeAndNotify(DiscoveryNode node, NodeChannels nodeChannels, String reason) {
try {
logger.debug("disconnecting from [{}], {}", node, reason);
IOUtils.closeWhileHandlingException(nodeChannels);
} finally {
logger.trace("disconnected from [{}], {}", node, reason);
transportServiceAdapter.onNodeDisconnected(node);
}
}
/**
* Disconnects from a node if a channel is found as part of that nodes channels.
*/
@ -469,12 +485,13 @@ public abstract class TcpTransport<Channel> extends AbstractLifecycleComponent i
});
}
protected Channel nodeChannel(DiscoveryNode node, TransportRequestOptions options) throws ConnectTransportException {
@Override
public Connection getConnection(DiscoveryNode node) {
NodeChannels nodeChannels = connectedNodes.get(node);
if (nodeChannels == null) {
throw new NodeNotConnectedException(node, "Node not connected");
}
return nodeChannels.channel(options.type());
return nodeChannels;
}
@Override
@ -482,13 +499,7 @@ public abstract class TcpTransport<Channel> extends AbstractLifecycleComponent i
try (Releasable ignored = connectionLock.acquire(node.getId())) {
NodeChannels nodeChannels = connectedNodes.remove(node);
if (nodeChannels != null) {
try {
logger.debug("disconnecting from [{}] due to explicit disconnect call", node);
IOUtils.closeWhileHandlingException(nodeChannels);
} finally {
logger.trace("disconnected from [{}] due to explicit disconnect call", node);
transportServiceAdapter.raiseNodeDisconnected(node);
}
closeAndNotify(node, nodeChannels, "due to explicit disconnect call");
}
}
}
@ -883,10 +894,10 @@ public abstract class TcpTransport<Channel> extends AbstractLifecycleComponent i
return compress && (!(request instanceof BytesTransportRequest));
}
@Override
public void sendRequest(final DiscoveryNode node, final long requestId, final String action, final TransportRequest request,
TransportRequestOptions options) throws IOException, TransportException {
Channel targetChannel = nodeChannel(node, options);
protected void sendRequestToChannel(DiscoveryNode node, Channel targetChannel, final long requestId, final String action,
final TransportRequest request, TransportRequestOptions options) throws IOException,
TransportException {
if (compress) {
options = TransportRequestOptions.builder(options).withCompress(true).build();
}

View File

@ -28,6 +28,7 @@ import org.elasticsearch.common.settings.Setting.Property;
import org.elasticsearch.common.transport.BoundTransportAddress;
import org.elasticsearch.common.transport.TransportAddress;
import java.io.Closeable;
import java.io.IOException;
import java.net.UnknownHostException;
import java.util.List;
@ -35,7 +36,6 @@ import java.util.Map;
public interface Transport extends LifecycleComponent {
Setting<Boolean> TRANSPORT_TCP_COMPRESS = Setting.boolSetting("transport.tcp.compress", false, Property.NodeScope);
void transportServiceAdapter(TransportServiceAdapter service);
@ -72,13 +72,6 @@ public interface Transport extends LifecycleComponent {
*/
void disconnectFromNode(DiscoveryNode node);
/**
* Sends the request to the node.
* @throws NodeNotConnectedException if the given node is not connected
*/
void sendRequest(DiscoveryNode node, long requestId, String action, TransportRequest request, TransportRequestOptions options) throws
IOException, TransportException;
/**
* Returns count of currently open connections
*/
@ -89,4 +82,39 @@ public interface Transport extends LifecycleComponent {
default CircuitBreaker getInFlightRequestBreaker() {
return new NoopCircuitBreaker("in-flight-noop");
}
/**
* Returns a connection for the given node if the node is connected.
* Connections returned from this method must not be closed. The lifecylce of this connection is maintained by the Transport
* implementation.
*
* @throws NodeNotConnectedException if the node is not connected
* @see #connectToNode(DiscoveryNode, ConnectionProfile)
*/
Connection getConnection(DiscoveryNode node);
/**
* Opens a new connection to the given node and returns it. In contrast to {@link #connectToNode(DiscoveryNode, ConnectionProfile)}
* the returned connection is not managed by the transport implementation. This connection must be closed once it's not needed anymore.
* This connection type can be used to execute a handshake between two nodes before the node will be published via
* {@link #connectToNode(DiscoveryNode, ConnectionProfile)}.
*/
Connection openConnection(DiscoveryNode node, ConnectionProfile profile) throws IOException;
/**
* A unidirectional connection to a {@link DiscoveryNode}
*/
interface Connection extends Closeable {
/**
* The node this connection is associated with
*/
DiscoveryNode getNode();
/**
* Sends the request to the node this connection is associated with
* @throws NodeNotConnectedException if the given node is not connected
*/
void sendRequest(long requestId, String action, TransportRequest request, TransportRequestOptions options) throws
IOException, TransportException;
}
}

View File

@ -23,7 +23,18 @@ import org.elasticsearch.cluster.node.DiscoveryNode;
public interface TransportConnectionListener {
void onNodeConnected(DiscoveryNode node);
/**
* Called once a node connection is opened and registered.
*/
default void onNodeConnected(DiscoveryNode node) {}
void onNodeDisconnected(DiscoveryNode node);
/**
* Called once a node connection is closed and unregistered.
*/
default void onNodeDisconnected(DiscoveryNode node) {}
/**
* Called once a node connection is opened.
*/
default void onConnectionOpened(DiscoveryNode node) {}
}

View File

@ -51,10 +51,11 @@ public interface TransportInterceptor {
/**
* A simple interface to decorate
* {@link #sendRequest(DiscoveryNode, String, TransportRequest, TransportRequestOptions, TransportResponseHandler)}
* {@link #sendRequest(Transport.Connection, String, TransportRequest, TransportRequestOptions, TransportResponseHandler)}
*/
interface AsyncSender {
<T extends TransportResponse> void sendRequest(final DiscoveryNode node, final String action, final TransportRequest request,
final TransportRequestOptions options, TransportResponseHandler<T> handler);
<T extends TransportResponse> void sendRequest(Transport.Connection connection, final String action,
final TransportRequest request, final TransportRequestOptions options,
TransportResponseHandler<T> handler);
}
}

View File

@ -121,6 +121,22 @@ public class TransportService extends AbstractLifecycleComponent {
/** if set will call requests sent to this id to shortcut and executed locally */
volatile DiscoveryNode localNode = null;
private final Transport.Connection localNodeConnection = new Transport.Connection() {
@Override
public DiscoveryNode getNode() {
return localNode;
}
@Override
public void sendRequest(long requestId, String action, TransportRequest request, TransportRequestOptions options)
throws IOException, TransportException {
sendLocalRequest(requestId, action, request);
}
@Override
public void close() throws IOException {
}
};
/**
* Build the service.
@ -297,22 +313,6 @@ public class TransportService extends AbstractLifecycleComponent {
transport.connectToNode(node, connectionProfile);
}
/**
* Lightly connect to the specified node, and handshake cluster
* name and version
*
* @param node the node to connect to
* @param handshakeTimeout handshake timeout
* @return the connected node with version set
* @throws ConnectTransportException if the connection or the
* handshake failed
*/
public DiscoveryNode connectToNodeAndHandshake(
final DiscoveryNode node,
final long handshakeTimeout) throws ConnectTransportException {
return connectToNodeAndHandshake(node, handshakeTimeout, true);
}
/**
* Lightly connect to the specified node, returning updated node
* information. The handshake will fail if the cluster name on the
@ -321,50 +321,45 @@ public class TransportService extends AbstractLifecycleComponent {
*
* @param node the node to connect to
* @param handshakeTimeout handshake timeout
* @param checkClusterName whether or not to ignore cluster name
* mismatches
* @return the connected node
* @throws ConnectTransportException if the connection failed
* @throws IllegalStateException if the handshake failed
*/
public DiscoveryNode connectToNodeAndHandshake(
final DiscoveryNode node,
final long handshakeTimeout,
final boolean checkClusterName) {
final DiscoveryNode node,
final long handshakeTimeout) throws IOException {
if (node.equals(localNode)) {
return localNode;
}
transport.connectToNode(node, ConnectionProfile.LIGHT_PROFILE);
try {
return handshake(node, handshakeTimeout, checkClusterName);
} catch (ConnectTransportException | IllegalStateException e) {
transport.disconnectFromNode(node);
throw e;
DiscoveryNode handshakeNode;
try (Transport.Connection connection = transport.openConnection(node, ConnectionProfile.LIGHT_PROFILE)) {
handshakeNode = handshake(connection, handshakeTimeout);
}
connectToNode(node, ConnectionProfile.LIGHT_PROFILE);
return handshakeNode;
}
private DiscoveryNode handshake(
final DiscoveryNode node,
final long handshakeTimeout,
final boolean checkClusterName) throws ConnectTransportException {
final Transport.Connection connection,
final long handshakeTimeout) throws ConnectTransportException {
final HandshakeResponse response;
final DiscoveryNode node = connection.getNode();
try {
response = this.submitRequest(
node,
HANDSHAKE_ACTION_NAME,
HandshakeRequest.INSTANCE,
TransportRequestOptions.builder().withTimeout(handshakeTimeout).build(),
PlainTransportFuture<HandshakeResponse> futureHandler = new PlainTransportFuture<>(
new FutureTransportResponseHandler<HandshakeResponse>() {
@Override
public HandshakeResponse newInstance() {
return new HandshakeResponse();
}
}).txGet();
@Override
public HandshakeResponse newInstance() {
return new HandshakeResponse();
}
});
sendRequest(connection, HANDSHAKE_ACTION_NAME, HandshakeRequest.INSTANCE,
TransportRequestOptions.builder().withTimeout(handshakeTimeout).build(), futureHandler);
response = futureHandler.txGet();
} catch (Exception e) {
throw new IllegalStateException("handshake failed with " + node, e);
}
if (checkClusterName && !Objects.equals(clusterName, response.clusterName)) {
if (!Objects.equals(clusterName, response.clusterName)) {
throw new IllegalStateException("handshake failed, mismatched cluster name [" + response.clusterName + "] - " + node);
} else if (response.version.isCompatible((localNode != null ? localNode.getVersion() : Version.CURRENT)) == false) {
throw new IllegalStateException("handshake failed, incompatible version [" + response.version + "] - " + node);
@ -437,21 +432,59 @@ public class TransportService extends AbstractLifecycleComponent {
TransportRequestOptions options,
TransportResponseHandler<T> handler) throws TransportException {
PlainTransportFuture<T> futureHandler = new PlainTransportFuture<>(handler);
sendRequest(node, action, request, options, futureHandler);
try {
Transport.Connection connection = getConnection(node);
sendRequest(connection, action, request, options, futureHandler);
} catch (NodeNotConnectedException ex) {
// the caller might not handle this so we invoke the handler
futureHandler.handleException(ex);
}
return futureHandler;
}
public <T extends TransportResponse> void sendRequest(final DiscoveryNode node, final String action,
final TransportRequest request,
final TransportResponseHandler<T> handler) {
sendRequest(node, action, request, TransportRequestOptions.EMPTY, handler);
try {
Transport.Connection connection = getConnection(node);
sendRequest(connection, action, request, TransportRequestOptions.EMPTY, handler);
} catch (NodeNotConnectedException ex) {
// the caller might not handle this so we invoke the handler
handler.handleException(ex);
}
}
public final <T extends TransportResponse> void sendRequest(final DiscoveryNode node, final String action,
final TransportRequest request,
final TransportRequestOptions options,
TransportResponseHandler<T> handler) {
asyncSender.sendRequest(node, action, request, options, handler);
try {
Transport.Connection connection = getConnection(node);
sendRequest(connection, action, request, options, handler);
} catch (NodeNotConnectedException ex) {
// the caller might not handle this so we invoke the handler
handler.handleException(ex);
}
}
final <T extends TransportResponse> void sendRequest(final Transport.Connection connection, final String action,
final TransportRequest request,
final TransportRequestOptions options,
TransportResponseHandler<T> handler) {
asyncSender.sendRequest(connection, action, request, options, handler);
}
/**
* Returns either a real transport connection or a local node connection if we are using the local node optimization.
* @throws NodeNotConnectedException if the given node is not connected
*/
private Transport.Connection getConnection(DiscoveryNode node) {
if (Objects.requireNonNull(node, "node must be non-null").equals(localNode)) {
return localNodeConnection;
} else {
return transport.getConnection(node);
}
}
public <T extends TransportResponse> void sendChildRequest(final DiscoveryNode node, final String action,
@ -467,21 +500,26 @@ public class TransportService extends AbstractLifecycleComponent {
request.setParentTask(localNode.getId(), parentTask.getId());
try {
taskManager.registerChildTask(parentTask, node.getId());
sendRequest(node, action, request, options, handler);
final Transport.Connection connection = getConnection(node);
sendRequest(connection, action, request, options, handler);
} catch (TaskCancelledException ex) {
// The parent task is already cancelled - just fail the request
handler.handleException(new TransportException(ex));
} catch (NodeNotConnectedException ex) {
// the caller might not handle this so we invoke the handler
handler.handleException(ex);
}
}
private <T extends TransportResponse> void sendRequestInternal(final DiscoveryNode node, final String action,
private <T extends TransportResponse> void sendRequestInternal(final Transport.Connection connection, final String action,
final TransportRequest request,
final TransportRequestOptions options,
TransportResponseHandler<T> handler) {
if (node == null) {
throw new IllegalStateException("can't send request to a null node");
if (connection == null) {
throw new IllegalStateException("can't send request to a null connection");
}
DiscoveryNode node = connection.getNode();
final long requestId = newRequestId();
final TimeoutHandler timeoutHandler;
try {
@ -493,7 +531,7 @@ public class TransportService extends AbstractLifecycleComponent {
}
TransportResponseHandler<T> responseHandler =
new ContextRestoreResponseHandler<>(threadPool.getThreadContext().newStoredContext(), handler);
clientHandlers.put(requestId, new RequestHolder<>(responseHandler, node, action, timeoutHandler));
clientHandlers.put(requestId, new RequestHolder<>(responseHandler, connection.getNode(), action, timeoutHandler));
if (lifecycle.stoppedOrClosed()) {
// if we are not started the exception handling will remove the RequestHolder again and calls the handler to notify
// the caller. It will only notify if the toStop code hasn't done the work yet.
@ -503,11 +541,7 @@ public class TransportService extends AbstractLifecycleComponent {
assert options.timeout() != null;
timeoutHandler.future = threadPool.schedule(options.timeout(), ThreadPool.Names.GENERIC, timeoutHandler);
}
if (node.equals(localNode)) {
sendLocalRequest(requestId, action, request);
} else {
transport.sendRequest(node, requestId, action, request, options);
}
connection.sendRequest(requestId, action, request, options); // local node optimization happens upstream
} catch (final Exception e) {
// usually happen either because we failed to connect to the node
// or because we failed serializing the message
@ -595,7 +629,6 @@ public class TransportService extends AbstractLifecycleComponent {
"failed to notify channel of error message for action [{}]", action), inner);
}
}
}
private boolean shouldTraceAction(String action) {
@ -777,7 +810,7 @@ public class TransportService extends AbstractLifecycleComponent {
}
@Override
public void raiseNodeConnected(final DiscoveryNode node) {
public void onNodeConnected(final DiscoveryNode node) {
threadPool.generic().execute(() -> {
for (TransportConnectionListener connectionListener : connectionListeners) {
connectionListener.onNodeConnected(node);
@ -786,11 +819,22 @@ public class TransportService extends AbstractLifecycleComponent {
}
@Override
public void raiseNodeDisconnected(final DiscoveryNode node) {
try {
for (final TransportConnectionListener connectionListener : connectionListeners) {
threadPool.generic().execute(() -> connectionListener.onNodeDisconnected(node));
public void onConnectionOpened(DiscoveryNode node) {
threadPool.generic().execute(() -> {
for (TransportConnectionListener connectionListener : connectionListeners) {
connectionListener.onConnectionOpened(node);
}
});
}
@Override
public void onNodeDisconnected(final DiscoveryNode node) {
try {
threadPool.generic().execute( () -> {
for (final TransportConnectionListener connectionListener : connectionListeners) {
connectionListener.onNodeDisconnected(node);
}
});
for (Map.Entry<Long, RequestHolder> entry : clientHandlers.entrySet()) {
RequestHolder holder = entry.getValue();
if (holder.node().equals(node)) {

View File

@ -21,7 +21,7 @@ package org.elasticsearch.transport;
import org.elasticsearch.cluster.node.DiscoveryNode;
public interface TransportServiceAdapter {
public interface TransportServiceAdapter extends TransportConnectionListener {
void addBytesReceived(long size);
@ -50,9 +50,4 @@ public interface TransportServiceAdapter {
void onRequestReceived(long requestId, String action);
RequestHandlerRegistry getRequestHandler(String action);
void raiseNodeConnected(DiscoveryNode node);
void raiseNodeDisconnected(DiscoveryNode node);
}

View File

@ -72,48 +72,68 @@ abstract class FailAndRetryMockTransport<Response extends TransportResponse> imp
protected abstract ClusterState getMockClusterState(DiscoveryNode node);
@Override
@SuppressWarnings("unchecked")
public void sendRequest(DiscoveryNode node, long requestId, String action, TransportRequest request, TransportRequestOptions options)
throws IOException, TransportException {
//we make sure that nodes get added to the connected ones when calling addTransportAddress, by returning proper nodes info
if (connectMode) {
if (TransportLivenessAction.NAME.equals(action)) {
TransportResponseHandler transportResponseHandler = transportServiceAdapter.onResponseReceived(requestId);
transportResponseHandler.handleResponse(new LivenessResponse(ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY),
node));
} else if (ClusterStateAction.NAME.equals(action)) {
TransportResponseHandler transportResponseHandler = transportServiceAdapter.onResponseReceived(requestId);
ClusterState clusterState = getMockClusterState(node);
transportResponseHandler.handleResponse(new ClusterStateResponse(clusterName, clusterState));
} else {
throw new UnsupportedOperationException("Mock transport does not understand action " + action);
public Connection getConnection(DiscoveryNode node) {
return new Connection() {
@Override
public DiscoveryNode getNode() {
return node;
}
return;
}
//once nodes are connected we'll just return errors for each sendRequest call
triedNodes.add(node);
@Override
public void sendRequest(long requestId, String action, TransportRequest request, TransportRequestOptions options)
throws IOException, TransportException {
if (random.nextInt(100) > 10) {
connectTransportExceptions.incrementAndGet();
throw new ConnectTransportException(node, "node not available");
} else {
if (random.nextBoolean()) {
failures.incrementAndGet();
//throw whatever exception that is not a subclass of ConnectTransportException
throw new IllegalStateException();
} else {
TransportResponseHandler transportResponseHandler = transportServiceAdapter.onResponseReceived(requestId);
if (random.nextBoolean()) {
successes.incrementAndGet();
transportResponseHandler.handleResponse(newResponse());
//we make sure that nodes get added to the connected ones when calling addTransportAddress, by returning proper nodes info
if (connectMode) {
if (TransportLivenessAction.NAME.equals(action)) {
TransportResponseHandler transportResponseHandler = transportServiceAdapter.onResponseReceived(requestId);
transportResponseHandler.handleResponse(new LivenessResponse(ClusterName.CLUSTER_NAME_SETTING.
getDefault(Settings.EMPTY),
node));
} else if (ClusterStateAction.NAME.equals(action)) {
TransportResponseHandler transportResponseHandler = transportServiceAdapter.onResponseReceived(requestId);
ClusterState clusterState = getMockClusterState(node);
transportResponseHandler.handleResponse(new ClusterStateResponse(clusterName, clusterState));
} else {
throw new UnsupportedOperationException("Mock transport does not understand action " + action);
}
return;
}
//once nodes are connected we'll just return errors for each sendRequest call
triedNodes.add(node);
if (random.nextInt(100) > 10) {
connectTransportExceptions.incrementAndGet();
throw new ConnectTransportException(node, "node not available");
} else {
failures.incrementAndGet();
transportResponseHandler.handleException(new TransportException("transport exception"));
if (random.nextBoolean()) {
failures.incrementAndGet();
//throw whatever exception that is not a subclass of ConnectTransportException
throw new IllegalStateException();
} else {
TransportResponseHandler transportResponseHandler = transportServiceAdapter.onResponseReceived(requestId);
if (random.nextBoolean()) {
successes.incrementAndGet();
transportResponseHandler.handleResponse(newResponse());
} else {
failures.incrementAndGet();
transportResponseHandler.handleException(new TransportException("transport exception"));
}
}
}
}
}
@Override
public void close() throws IOException {
}
};
}
@Override
public Connection openConnection(DiscoveryNode node, ConnectionProfile profile) throws IOException {
return getConnection(node);
}
protected abstract Response newResponse();

View File

@ -41,6 +41,7 @@ import org.elasticsearch.plugins.PluginsService;
import org.elasticsearch.test.transport.MockTransportService;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.MockTransportClient;
import org.elasticsearch.transport.Transport;
import org.elasticsearch.transport.TransportException;
import org.elasticsearch.transport.TransportInterceptor;
import org.elasticsearch.transport.TransportRequest;
@ -149,12 +150,14 @@ public class TransportClientHeadersTests extends AbstractClientHeadersTestCase {
public AsyncSender interceptSender(AsyncSender sender) {
return new AsyncSender() {
@Override
public <T extends TransportResponse> void sendRequest(DiscoveryNode node, String action, TransportRequest request,
TransportRequestOptions options, TransportResponseHandler<T> handler) {
public <T extends TransportResponse> void sendRequest(Transport.Connection connection, String action,
TransportRequest request,
TransportRequestOptions options,
TransportResponseHandler<T> handler) {
if (TransportLivenessAction.NAME.equals(action)) {
assertHeaders(threadPool);
((TransportResponseHandler<LivenessResponse>) handler).handleResponse(
new LivenessResponse(new ClusterName("cluster1"), node));
new LivenessResponse(new ClusterName("cluster1"), connection.getNode()));
return;
}
if (ClusterStateAction.NAME.equals(action)) {

View File

@ -43,6 +43,7 @@ import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.TestThreadPool;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.Transport;
import org.elasticsearch.transport.TransportException;
import org.elasticsearch.transport.TransportInterceptor;
import org.elasticsearch.transport.TransportRequest;
@ -129,12 +130,15 @@ public class TransportClientNodesServiceTests extends ESTestCase {
public AsyncSender interceptSender(AsyncSender sender) {
return new AsyncSender() {
@Override
public <T extends TransportResponse> void sendRequest(DiscoveryNode node, String action, TransportRequest request,
TransportRequestOptions options, TransportResponseHandler<T> handler) {
public <T extends TransportResponse> void sendRequest(Transport.Connection connection, String action,
TransportRequest request,
TransportRequestOptions options,
TransportResponseHandler<T> handler) {
if (TransportLivenessAction.NAME.equals(action)) {
sender.sendRequest(node, action, request, options, wrapLivenessResponseHandler(handler, node, clusterName));
sender.sendRequest(connection, action, request, options, wrapLivenessResponseHandler(handler,
connection.getNode(), clusterName));
} else {
sender.sendRequest(node, action, request, options, handler);
sender.sendRequest(connection, action, request, options, handler);
}
}
};

View File

@ -48,6 +48,8 @@ import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.InternalTestCluster;
import org.elasticsearch.test.transport.MockTransportService;
import org.elasticsearch.transport.ConnectionProfile;
import org.elasticsearch.transport.Transport;
import org.elasticsearch.transport.TransportException;
import org.elasticsearch.transport.TransportRequest;
import org.elasticsearch.transport.TransportRequestOptions;
@ -196,15 +198,15 @@ public class ClusterInfoServiceIT extends ESIntegTestCase {
for (DiscoveryNode node : internalTestCluster.clusterService().state().getNodes()) {
mockTransportService.addDelegate(internalTestCluster.getInstance(TransportService.class, node.getName()), new MockTransportService.DelegateTransport(mockTransportService.original()) {
@Override
public void sendRequest(DiscoveryNode node, long requestId, String action, TransportRequest request,
TransportRequestOptions options) throws IOException, TransportException {
protected void sendRequest(Connection connection, long requestId, String action, TransportRequest request,
TransportRequestOptions options) throws IOException {
if (blockedActions.contains(action)) {
if (timeout.get()) {
logger.info("dropping [{}] to [{}]", action, node);
return;
}
}
super.sendRequest(node, requestId, action, request, options);
super.sendRequest(connection, requestId, action, request, options);
}
});
}

View File

@ -214,9 +214,29 @@ public class NodeConnectionsServiceTests extends ESTestCase {
}
@Override
public void sendRequest(DiscoveryNode node, long requestId, String action, TransportRequest request,
TransportRequestOptions options) throws IOException, TransportException {
public Connection getConnection(DiscoveryNode node) {
return new Connection() {
@Override
public DiscoveryNode getNode() {
return node;
}
@Override
public void sendRequest(long requestId, String action, TransportRequest request, TransportRequestOptions options)
throws IOException, TransportException {
}
@Override
public void close() throws IOException {
}
};
}
@Override
public Connection openConnection(DiscoveryNode node, ConnectionProfile profile) throws IOException {
return getConnection(node);
}
@Override

View File

@ -84,7 +84,9 @@ import org.elasticsearch.test.disruption.SingleNodeDisruption;
import org.elasticsearch.test.disruption.SlowClusterStateProcessing;
import org.elasticsearch.test.junit.annotations.TestLogging;
import org.elasticsearch.test.transport.MockTransportService;
import org.elasticsearch.transport.ConnectionProfile;
import org.elasticsearch.transport.TcpTransport;
import org.elasticsearch.transport.Transport;
import org.elasticsearch.transport.TransportException;
import org.elasticsearch.transport.TransportRequest;
import org.elasticsearch.transport.TransportRequestOptions;
@ -953,13 +955,19 @@ public class DiscoveryWithServiceDisruptionsIT extends ESIntegTestCase {
nonMasterTransportService.addDelegate(masterTranspotService, new MockTransportService.DelegateTransport(nonMasterTransportService
.original()) {
@Override
public void sendRequest(DiscoveryNode node, long requestId, String action, TransportRequest request, TransportRequestOptions
options) throws IOException, TransportException {
protected void sendRequest(Connection connection, long requestId, String action, TransportRequest request,
TransportRequestOptions options) throws IOException {
if (action.equals(MembershipAction.DISCOVERY_JOIN_ACTION_NAME)) {
countDownLatch.countDown();
}
super.sendRequest(node, requestId, action, request, options);
super.sendRequest(connection, requestId, action, request, options);
}
@Override
public Connection openConnection(DiscoveryNode node, ConnectionProfile profile) throws IOException {
return super.openConnection(node, profile);
}
});
countDownLatch.await();

View File

@ -579,6 +579,10 @@ public class UnicastZenPingTests extends ESTestCase {
@Override
public void onNodeConnected(DiscoveryNode node) {
}
@Override
public void onConnectionOpened(DiscoveryNode node) {
counters.computeIfAbsent(node.getAddress(), k -> new AtomicInteger());
counters.get(node.getAddress()).incrementAndGet();
}

View File

@ -57,6 +57,8 @@ import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.InternalTestCluster;
import org.elasticsearch.test.junit.annotations.TestLogging;
import org.elasticsearch.test.transport.MockTransportService;
import org.elasticsearch.transport.ConnectionProfile;
import org.elasticsearch.transport.Transport;
import org.elasticsearch.transport.TransportException;
import org.elasticsearch.transport.TransportRequest;
import org.elasticsearch.transport.TransportRequestOptions;
@ -493,14 +495,13 @@ public class IndexWithShadowReplicasIT extends ESIntegTestCase {
new MockTransportService.DelegateTransport(mockTransportService.original()) {
@Override
public void sendRequest(DiscoveryNode node, long requestId, String action,
TransportRequest request, TransportRequestOptions options)
throws IOException, TransportException {
protected void sendRequest(Connection connection, long requestId, String action, TransportRequest request,
TransportRequestOptions options) throws IOException {
if (keepFailing.get() && action.equals(PeerRecoveryTargetService.Actions.TRANSLOG_OPS)) {
logger.info("--> failing translog ops");
throw new ElasticsearchException("failing on purpose");
}
super.sendRequest(node, requestId, action, request, options);
super.sendRequest(connection, requestId, action, request, options);
}
});

View File

@ -73,6 +73,8 @@ import org.elasticsearch.test.MockIndexEventListener;
import org.elasticsearch.test.junit.annotations.TestLogging;
import org.elasticsearch.test.store.MockFSIndexStore;
import org.elasticsearch.test.transport.MockTransportService;
import org.elasticsearch.transport.ConnectionProfile;
import org.elasticsearch.transport.Transport;
import org.elasticsearch.transport.TransportException;
import org.elasticsearch.transport.TransportRequest;
import org.elasticsearch.transport.TransportRequestOptions;
@ -348,7 +350,7 @@ public class CorruptedFileIT extends ESIntegTestCase {
mockTransportService.addDelegate(internalCluster().getInstance(TransportService.class, unluckyNode.getNode().getName()), new MockTransportService.DelegateTransport(mockTransportService.original()) {
@Override
public void sendRequest(DiscoveryNode node, long requestId, String action, TransportRequest request, TransportRequestOptions options) throws IOException, TransportException {
protected void sendRequest(Connection connection, long requestId, String action, TransportRequest request, TransportRequestOptions options) throws IOException {
if (corrupt.get() && action.equals(PeerRecoveryTargetService.Actions.FILE_CHUNK)) {
RecoveryFileChunkRequest req = (RecoveryFileChunkRequest) request;
byte[] array = BytesRef.deepCopyOf(req.content().toBytesRef()).bytes;
@ -356,7 +358,7 @@ public class CorruptedFileIT extends ESIntegTestCase {
array[i] = (byte) ~array[i]; // flip one byte in the content
hasCorrupted.countDown();
}
super.sendRequest(node, requestId, action, request, options);
super.sendRequest(connection, requestId, action, request, options);
}
});
}
@ -420,7 +422,7 @@ public class CorruptedFileIT extends ESIntegTestCase {
mockTransportService.addDelegate(internalCluster().getInstance(TransportService.class, unluckyNode.getNode().getName()), new MockTransportService.DelegateTransport(mockTransportService.original()) {
@Override
public void sendRequest(DiscoveryNode node, long requestId, String action, TransportRequest request, TransportRequestOptions options) throws IOException, TransportException {
protected void sendRequest(Connection connection, long requestId, String action, TransportRequest request, TransportRequestOptions options) throws IOException {
if (action.equals(PeerRecoveryTargetService.Actions.FILE_CHUNK)) {
RecoveryFileChunkRequest req = (RecoveryFileChunkRequest) request;
if (truncate && req.length() > 1) {
@ -434,7 +436,7 @@ public class CorruptedFileIT extends ESIntegTestCase {
array[i] = (byte) ~array[i]; // flip one byte in the content
}
}
super.sendRequest(node, requestId, action, request, options);
super.sendRequest(connection, requestId, action, request, options);
}
});
}

View File

@ -39,6 +39,7 @@ import org.elasticsearch.search.SearchHit;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.transport.MockTransportService;
import org.elasticsearch.transport.ConnectTransportException;
import org.elasticsearch.transport.Transport;
import org.elasticsearch.transport.TransportException;
import org.elasticsearch.transport.TransportRequest;
import org.elasticsearch.transport.TransportRequestOptions;
@ -99,14 +100,13 @@ public class ExceptionRetryIT extends ESIntegTestCase {
dataNode.getNode().getName()));
mockTransportService.addDelegate(internalCluster().getInstance(TransportService.class, unluckyNode.getNode().getName()),
new MockTransportService.DelegateTransport(mockTransportService.original()) {
@Override
public void sendRequest(DiscoveryNode node, long requestId, String action, TransportRequest request,
TransportRequestOptions options) throws IOException, TransportException {
super.sendRequest(node, requestId, action, request, options);
protected void sendRequest(Connection connection, long requestId, String action, TransportRequest request,
TransportRequestOptions options) throws IOException {
super.sendRequest(connection, requestId, action, request, options);
if (action.equals(TransportShardBulkAction.ACTION_NAME) && exceptionThrown.compareAndSet(false, true)) {
logger.debug("Throw ConnectTransportException");
throw new ConnectTransportException(node, action);
throw new ConnectTransportException(connection.getNode(), action);
}
}
});

View File

@ -640,16 +640,17 @@ public class IndexRecoveryIT extends ESIntegTestCase {
}
@Override
public void sendRequest(DiscoveryNode node, long requestId, String action, TransportRequest request, TransportRequestOptions options) throws IOException, TransportException {
protected void sendRequest(Connection connection, long requestId, String action, TransportRequest request,
TransportRequestOptions options) throws IOException {
if (recoveryActionToBlock.equals(action) || requestBlocked.getCount() == 0) {
logger.info("--> preventing {} request", action);
requestBlocked.countDown();
if (dropRequests) {
return;
}
throw new ConnectTransportException(node, "DISCONNECT: prevented " + action + " request");
throw new ConnectTransportException(connection.getNode(), "DISCONNECT: prevented " + action + " request");
}
transport.sendRequest(node, requestId, action, request, options);
super.sendRequest(connection, requestId, action, request, options);
}
}
}

View File

@ -56,6 +56,7 @@ import org.elasticsearch.test.disruption.BlockClusterStateProcessing;
import org.elasticsearch.test.disruption.SingleNodeDisruption;
import org.elasticsearch.test.transport.MockTransportService;
import org.elasticsearch.transport.ConnectTransportException;
import org.elasticsearch.transport.Transport;
import org.elasticsearch.transport.TransportException;
import org.elasticsearch.transport.TransportRequest;
import org.elasticsearch.transport.TransportRequestOptions;
@ -198,13 +199,13 @@ public class IndicesStoreIntegrationIT extends ESIntegTestCase {
final CountDownLatch shardActiveRequestSent = new CountDownLatch(1);
transportServiceNode_1.addDelegate(transportServiceNode_2, new MockTransportService.DelegateTransport(transportServiceNode_1.original()) {
@Override
public void sendRequest(DiscoveryNode node, long requestId, String action, TransportRequest request, TransportRequestOptions options) throws IOException, TransportException {
protected void sendRequest(Connection connection, long requestId, String action, TransportRequest request, TransportRequestOptions options) throws IOException {
if (action.equals("internal:index/shard/exists") && shardActiveRequestSent.getCount() > 0) {
shardActiveRequestSent.countDown();
logger.info("prevent shard active request from being sent");
throw new ConnectTransportException(node, "DISCONNECT: simulated");
throw new ConnectTransportException(connection.getNode(), "DISCONNECT: simulated");
}
super.sendRequest(node, requestId, action, request, options);
super.sendRequest(connection, requestId, action, request, options);
}
});

View File

@ -533,20 +533,20 @@ public class RelocationIT extends ESIntegTestCase {
}
@Override
public void sendRequest(DiscoveryNode node, long requestId, String action, TransportRequest request, TransportRequestOptions options) throws IOException, TransportException {
protected void sendRequest(Connection connection, long requestId, String action, TransportRequest request, TransportRequestOptions options) throws IOException {
if (action.equals(PeerRecoveryTargetService.Actions.FILE_CHUNK)) {
RecoveryFileChunkRequest chunkRequest = (RecoveryFileChunkRequest) request;
if (chunkRequest.name().startsWith(IndexFileNames.SEGMENTS)) {
// corrupting the segments_N files in order to make sure future recovery re-send files
logger.debug("corrupting [{}] to {}. file name: [{}]", action, node, chunkRequest.name());
logger.debug("corrupting [{}] to {}. file name: [{}]", action, connection.getNode(), chunkRequest.name());
assert chunkRequest.content().toBytesRef().bytes == chunkRequest.content().toBytesRef().bytes : "no internal reference!!";
byte[] array = chunkRequest.content().toBytesRef().bytes;
array[0] = (byte) ~array[0]; // flip one byte in the content
corruptionCount.countDown();
}
transport.sendRequest(node, requestId, action, request, options);
super.sendRequest(connection, requestId, action, request, options);
} else {
transport.sendRequest(node, requestId, action, request, options);
super.sendRequest(connection, requestId, action, request, options);
}
}
}

View File

@ -36,6 +36,8 @@ import org.elasticsearch.node.RecoverySettingsChunkSizePlugin;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.transport.MockTransportService;
import org.elasticsearch.transport.ConnectionProfile;
import org.elasticsearch.transport.Transport;
import org.elasticsearch.transport.TransportException;
import org.elasticsearch.transport.TransportRequest;
import org.elasticsearch.transport.TransportRequestOptions;
@ -121,7 +123,8 @@ public class TruncatedRecoveryIT extends ESIntegTestCase {
mockTransportService.addDelegate(internalCluster().getInstance(TransportService.class, unluckyNode.getNode().getName()), new MockTransportService.DelegateTransport(mockTransportService.original()) {
@Override
public void sendRequest(DiscoveryNode node, long requestId, String action, TransportRequest request, TransportRequestOptions options) throws IOException, TransportException {
protected void sendRequest(Connection connection, long requestId, String action, TransportRequest request,
TransportRequestOptions options) throws IOException {
if (action.equals(PeerRecoveryTargetService.Actions.FILE_CHUNK)) {
RecoveryFileChunkRequest req = (RecoveryFileChunkRequest) request;
logger.debug("file chunk [{}] lastChunk: {}", req, req.lastChunk());
@ -130,7 +133,7 @@ public class TruncatedRecoveryIT extends ESIntegTestCase {
throw new RuntimeException("Caused some truncated files for fun and profit");
}
}
super.sendRequest(node, requestId, action, request, options);
super.sendRequest(connection, requestId, action, request, options);
}
});
}

View File

@ -193,7 +193,7 @@ public class TCPTransportTests extends ESTestCase {
@Override
protected NodeChannels connectToChannels(DiscoveryNode node, ConnectionProfile profile) throws IOException {
return new NodeChannels(new Object[profile.getNumConnections()], profile);
return new NodeChannels(node, new Object[profile.getNumConnections()], profile);
}
@Override
@ -207,13 +207,14 @@ public class TCPTransportTests extends ESTestCase {
}
@Override
protected Object nodeChannel(DiscoveryNode node, TransportRequestOptions options) throws ConnectTransportException {
return new NodeChannels(new Object[ConnectionProfile.LIGHT_PROFILE.getNumConnections()],
public Connection getConnection(DiscoveryNode node) {
return new NodeChannels(node, new Object[ConnectionProfile.LIGHT_PROFILE.getNumConnections()],
ConnectionProfile.LIGHT_PROFILE);
}
};
DiscoveryNode node = new DiscoveryNode("foo", buildNewFakeTransportAddress(), Version.CURRENT);
transport.sendRequest(node, 42, "foobar", request, TransportRequestOptions.EMPTY);
Transport.Connection connection = transport.getConnection(node);
connection.sendRequest(42, "foobar", request, TransportRequestOptions.EMPTY);
assertTrue(called.get());
} finally {
ThreadPool.terminate(threadPool, 10, TimeUnit.SECONDS);

View File

@ -35,6 +35,7 @@ import org.junit.After;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
@ -97,7 +98,7 @@ public class TransportServiceHandshakeTests extends ESTestCase {
threadPool = null;
}
public void testConnectToNodeLight() {
public void testConnectToNodeLight() throws IOException {
Settings settings = Settings.builder().put("cluster.name", "test").build();
NetworkHandle handleA = startServices("TS_A", settings, Version.CURRENT);
@ -155,29 +156,6 @@ public class TransportServiceHandshakeTests extends ESTestCase {
assertFalse(handleA.transportService.nodeConnected(discoveryNode));
}
public void testIgnoreMismatchedClusterName() {
Settings settings = Settings.builder().put("cluster.name", "a").build();
NetworkHandle handleA = startServices("TS_A", settings, Version.CURRENT);
NetworkHandle handleB =
startServices(
"TS_B",
Settings.builder().put("cluster.name", "b").build(),
VersionUtils.randomVersionBetween(random(), Version.CURRENT.minimumCompatibilityVersion(), Version.CURRENT)
);
DiscoveryNode discoveryNode = new DiscoveryNode(
"",
handleB.discoveryNode.getAddress(),
emptyMap(),
emptySet(),
Version.CURRENT.minimumCompatibilityVersion());
DiscoveryNode connectedNode = handleA.transportService.connectToNodeAndHandshake(discoveryNode, timeout, false);
assertNotNull(connectedNode);
assertEquals(connectedNode.getName(), "TS_B");
assertEquals(connectedNode.getVersion(), handleB.discoveryNode.getVersion());
assertTrue(handleA.transportService.nodeConnected(discoveryNode));
}
private static class NetworkHandle {
private TransportService transportService;
private DiscoveryNode discoveryNode;

View File

@ -46,7 +46,6 @@ import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.SuppressForbidden;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.lease.Releasables;
import org.elasticsearch.common.network.NetworkService;
@ -143,7 +142,6 @@ public class Netty4Transport extends TcpTransport<Channel> {
protected volatile Bootstrap bootstrap;
protected final Map<String, ServerBootstrap> serverBootstraps = newConcurrentMap();
@Inject
public Netty4Transport(Settings settings, ThreadPool threadPool, NetworkService networkService, BigArrays bigArrays,
NamedWriteableRegistry namedWriteableRegistry, CircuitBreakerService circuitBreakerService) {
super("netty", settings, threadPool, bigArrays, circuitBreakerService, namedWriteableRegistry, networkService);
@ -342,7 +340,7 @@ public class Netty4Transport extends TcpTransport<Channel> {
@Override
protected NodeChannels connectToChannels(DiscoveryNode node, ConnectionProfile profile) {
final Channel[] channels = new Channel[profile.getNumConnections()];
final NodeChannels nodeChannels = new NodeChannels(channels, profile);
final NodeChannels nodeChannels = new NodeChannels(node, channels, profile);
boolean success = false;
try {
final TimeValue connectTimeout;

View File

@ -41,6 +41,7 @@ import org.elasticsearch.transport.TransportResponse;
import org.elasticsearch.transport.TransportServiceAdapter;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Collections;
@ -187,12 +188,26 @@ public class CapturingTransport implements Transport {
}
@Override
public void sendRequest(DiscoveryNode node, long requestId, String action, TransportRequest request, TransportRequestOptions options)
throws IOException, TransportException {
requests.put(requestId, Tuple.tuple(node, action));
capturedRequests.add(new CapturedRequest(node, requestId, action, request));
}
public Connection openConnection(DiscoveryNode node, ConnectionProfile profile) throws IOException {
return new Connection() {
@Override
public DiscoveryNode getNode() {
return node;
}
@Override
public void sendRequest(long requestId, String action, TransportRequest request, TransportRequestOptions options)
throws IOException, TransportException {
requests.put(requestId, Tuple.tuple(node, action));
capturedRequests.add(new CapturedRequest(node, requestId, action, request));
}
@Override
public void close() throws IOException {
}
};
}
@Override
public void transportServiceAdapter(TransportServiceAdapter adapter) {
@ -263,4 +278,13 @@ public class CapturingTransport implements Transport {
return Collections.emptyList();
}
@Override
public Connection getConnection(DiscoveryNode node) {
try {
return openConnection(node, null);
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}
}

View File

@ -183,9 +183,9 @@ public final class MockTransportService extends TransportService {
}
@Override
public void sendRequest(DiscoveryNode node, long requestId, String action, TransportRequest request,
TransportRequestOptions options) throws IOException, TransportException {
throw new ConnectTransportException(node, "DISCONNECT: simulated");
protected void sendRequest(Connection connection, long requestId, String action, TransportRequest request,
TransportRequestOptions options) throws IOException {
throw new ConnectTransportException(connection.getNode(), "DISCONNECT: simulated");
}
});
}
@ -226,13 +226,13 @@ public final class MockTransportService extends TransportService {
}
@Override
public void sendRequest(DiscoveryNode node, long requestId, String action, TransportRequest request,
TransportRequestOptions options) throws IOException, TransportException {
protected void sendRequest(Connection connection, long requestId, String action, TransportRequest request,
TransportRequestOptions options) throws IOException {
if (blockedActions.contains(action)) {
logger.info("--> preventing {} request", action);
throw new ConnectTransportException(node, "DISCONNECT: prevented " + action + " request");
throw new ConnectTransportException(connection.getNode(), "DISCONNECT: prevented " + action + " request");
}
original.sendRequest(node, requestId, action, request, options);
connection.sendRequest(requestId, action, request, options);
}
});
}
@ -260,8 +260,8 @@ public final class MockTransportService extends TransportService {
}
@Override
public void sendRequest(DiscoveryNode node, long requestId, String action, TransportRequest request,
TransportRequestOptions options) throws IOException, TransportException {
protected void sendRequest(Connection connection, long requestId, String action, TransportRequest request,
TransportRequestOptions options) throws IOException {
// don't send anything, the receiving node is unresponsive
}
});
@ -320,13 +320,12 @@ public final class MockTransportService extends TransportService {
}
@Override
public void sendRequest(final DiscoveryNode node, final long requestId, final String action, TransportRequest request,
final TransportRequestOptions options) throws IOException, TransportException {
protected void sendRequest(Connection connection, long requestId, String action, TransportRequest request,
TransportRequestOptions options) throws IOException {
// delayed sending - even if larger then the request timeout to simulated a potential late response from target node
TimeValue delay = getDelay();
if (delay.millis() <= 0) {
original.sendRequest(node, requestId, action, request, options);
connection.sendRequest(requestId, action, request, options);
return;
}
@ -348,7 +347,7 @@ public final class MockTransportService extends TransportService {
@Override
protected void doRun() throws IOException {
if (requestSent.compareAndSet(false, true)) {
original.sendRequest(node, requestId, action, clonedRequest, options);
connection.sendRequest(requestId, action, clonedRequest, options);
}
}
};
@ -364,7 +363,6 @@ public final class MockTransportService extends TransportService {
}
}
@Override
public void clearRule() {
synchronized (this) {
@ -439,9 +437,13 @@ public final class MockTransportService extends TransportService {
}
@Override
public void sendRequest(DiscoveryNode node, long requestId, String action, TransportRequest request,
TransportRequestOptions options) throws IOException, TransportException {
getTransport(node).sendRequest(node, requestId, action, request, options);
public Connection getConnection(DiscoveryNode node) {
return getTransport(node).getConnection(node);
}
@Override
public Connection openConnection(DiscoveryNode node, ConnectionProfile profile) throws IOException {
return getTransport(node).openConnection(node, profile);
}
}
@ -488,12 +490,6 @@ public final class MockTransportService extends TransportService {
transport.disconnectFromNode(node);
}
@Override
public void sendRequest(DiscoveryNode node, long requestId, String action, TransportRequest request,
TransportRequestOptions options) throws IOException, TransportException {
transport.sendRequest(node, requestId, action, request, options);
}
@Override
public long serverOpen() {
return transport.serverOpen();
@ -504,6 +500,28 @@ public final class MockTransportService extends TransportService {
return transport.getLocalAddresses();
}
@Override
public Connection getConnection(DiscoveryNode node) {
return new FilteredConnection(transport.getConnection(node)) {
@Override
public void sendRequest(long requestId, String action, TransportRequest request, TransportRequestOptions options)
throws IOException, TransportException {
DelegateTransport.this.sendRequest(connection, requestId, action, request, options);
}
};
}
@Override
public Connection openConnection(DiscoveryNode node, ConnectionProfile profile) throws IOException {
return new FilteredConnection(transport.openConnection(node, profile)) {
@Override
public void sendRequest(long requestId, String action, TransportRequest request, TransportRequestOptions options)
throws IOException, TransportException {
DelegateTransport.this.sendRequest(connection, requestId, action, request, options);
}
};
}
@Override
public Lifecycle.State lifecycleState() {
return transport.lifecycleState();
@ -538,6 +556,11 @@ public final class MockTransportService extends TransportService {
public Map<String, BoundTransportAddress> profileBoundAddresses() {
return transport.profileBoundAddresses();
}
protected void sendRequest(Transport.Connection connection, long requestId, String action, TransportRequest request,
TransportRequestOptions options) throws IOException {
connection.sendRequest(requestId, action, request, options);
}
}
/**
@ -641,4 +664,28 @@ public final class MockTransportService extends TransportService {
}
}
}
private static class FilteredConnection implements Transport.Connection {
protected final Transport.Connection connection;
private FilteredConnection(Transport.Connection connection) {
this.connection = connection;
}
@Override
public DiscoveryNode getNode() {
return connection.getNode();
}
@Override
public void sendRequest(long requestId, String action, TransportRequest request, TransportRequestOptions options)
throws IOException, TransportException {
connection.sendRequest(requestId, action, request, options);
}
@Override
public void close() throws IOException {
connection.close();
}
}
}

View File

@ -48,9 +48,7 @@ import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.sql.Time;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@ -568,17 +566,22 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
PlainActionFuture<TestResponse> listener = new PlainActionFuture<>();
final String info = sender + "_" + iter;
final DiscoveryNode node = nodeB; // capture now
serviceA.sendRequest(node, "test", new TestRequest(info),
new ActionListenerResponseHandler<>(listener, TestResponse::new));
try {
listener.actionGet();
} catch (ConnectTransportException e) {
// ok!
} catch (Exception e) {
logger.error(
(Supplier<?>) () -> new ParameterizedMessage("caught exception while sending to node {}", node), e);
sendingErrors.add(e);
serviceA.sendRequest(node, "test", new TestRequest(info),
new ActionListenerResponseHandler<>(listener, TestResponse::new));
try {
listener.actionGet();
} catch (ConnectTransportException e) {
// ok!
} catch (Exception e) {
logger.error(
(Supplier<?>) () -> new ParameterizedMessage("caught exception while sending to node {}", node), e);
sendingErrors.add(e);
}
} catch (NodeNotConnectedException ex) {
// ok
}
}
}
@ -1184,16 +1187,13 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
public void testVersionFrom1to1() throws Exception {
serviceB.registerRequestHandler("/version", Version1Request::new, ThreadPool.Names.SAME,
new TransportRequestHandler<Version1Request>() {
@Override
public void messageReceived(Version1Request request, TransportChannel channel) throws Exception {
assertThat(request.value1, equalTo(1));
assertThat(request.value2, equalTo(2));
Version1Response response = new Version1Response();
response.value1 = 1;
response.value2 = 2;
channel.sendResponse(response);
}
(request, channel) -> {
assertThat(request.value1, equalTo(1));
assertThat(request.value2, equalTo(2));
Version1Response response = new Version1Response();
response.value1 = 1;
response.value2 = 2;
channel.sendResponse(response);
});
Version1Request version1Request = new Version1Request();
@ -1266,7 +1266,7 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
assertThat(version0Response.value1, equalTo(1));
}
public void testMockFailToSendNoConnectRule() {
public void testMockFailToSendNoConnectRule() throws IOException {
serviceA.registerRequestHandler("sayHello", StringMessageRequest::new, ThreadPool.Names.GENERIC,
new TransportRequestHandler<StringMessageRequest>() {
@Override
@ -1318,12 +1318,12 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
try {
serviceB.connectToNodeAndHandshake(nodeA, 100);
fail("exception should be thrown");
} catch (ConnectTransportException e) {
} catch (IllegalStateException e) {
// all is well
}
}
public void testMockUnresponsiveRule() {
public void testMockUnresponsiveRule() throws IOException {
serviceA.registerRequestHandler("sayHello", StringMessageRequest::new, ThreadPool.Names.GENERIC,
new TransportRequestHandler<StringMessageRequest>() {
@Override
@ -1376,7 +1376,7 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
try {
serviceB.connectToNodeAndHandshake(nodeA, 100);
fail("exception should be thrown");
} catch (ConnectTransportException e) {
} catch (IllegalStateException e) {
// all is well
}
}

View File

@ -19,7 +19,6 @@
package org.elasticsearch.transport;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.io.stream.Streamable;
import org.elasticsearch.common.settings.Settings;
@ -92,11 +91,11 @@ public final class AssertingTransportInterceptor implements TransportInterceptor
public AsyncSender interceptSender(final AsyncSender sender) {
return new AsyncSender() {
@Override
public <T extends TransportResponse> void sendRequest(DiscoveryNode node, String action, TransportRequest request,
public <T extends TransportResponse> void sendRequest(Transport.Connection connection, String action, TransportRequest request,
TransportRequestOptions options,
final TransportResponseHandler<T> handler) {
assertVersionSerializable(request);
sender.sendRequest(node, action, request, options, new TransportResponseHandler<T>() {
sender.sendRequest(connection, action, request, options, new TransportResponseHandler<T>() {
@Override
public T newInstance() {
return handler.newInstance();

View File

@ -161,7 +161,7 @@ public class MockTcpTransport extends TcpTransport<MockTcpTransport.MockChannel>
@Override
protected NodeChannels connectToChannels(DiscoveryNode node, ConnectionProfile profile) throws IOException {
final MockChannel[] mockChannels = new MockChannel[1];
final NodeChannels nodeChannels = new NodeChannels(mockChannels, ConnectionProfile.LIGHT_PROFILE); // we always use light here
final NodeChannels nodeChannels = new NodeChannels(node, mockChannels, ConnectionProfile.LIGHT_PROFILE); // we always use light here
boolean success = false;
final Socket socket = new Socket();
try {