diff --git a/core/src/main/java/org/elasticsearch/transport/TcpTransport.java b/core/src/main/java/org/elasticsearch/transport/TcpTransport.java index a40284463a6..0617ecd64bb 100644 --- a/core/src/main/java/org/elasticsearch/transport/TcpTransport.java +++ b/core/src/main/java/org/elasticsearch/transport/TcpTransport.java @@ -20,8 +20,6 @@ package org.elasticsearch.transport; import com.carrotsearch.hppc.IntHashSet; import com.carrotsearch.hppc.IntSet; -import com.carrotsearch.hppc.LongObjectHashMap; -import com.carrotsearch.hppc.LongObjectMap; import org.apache.logging.log4j.message.ParameterizedMessage; import org.apache.logging.log4j.util.Supplier; import org.apache.lucene.util.IOUtils; @@ -88,7 +86,9 @@ import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.Optional; import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @@ -180,7 +180,7 @@ public abstract class TcpTransport extends AbstractLifecycleComponent i private final String transportName; protected final ConnectionProfile defaultConnectionProfile; - private final LongObjectMap> pendingHandshakes = new LongObjectHashMap<>(); + private final ConcurrentMap pendingHandshakes = new ConcurrentHashMap<>(); private final AtomicLong requestIdGenerator = new AtomicLong(); private final CounterMetric numHandshakes = new CounterMetric(); private static final String HANDSHAKE_ACTION_NAME = "internal:tcp/handshake"; @@ -242,6 +242,51 @@ public abstract class TcpTransport extends AbstractLifecycleComponent i this.transportServiceAdapter = service; } + private static class HandshakeResponseHandler implements TransportResponseHandler { + final AtomicReference versionRef = new AtomicReference<>(); + final CountDownLatch latch = new CountDownLatch(1); + final AtomicBoolean handshakeNotSupported = new AtomicBoolean(false); + final AtomicReference exceptionRef = new AtomicReference<>(); + final Channel channel; + + public HandshakeResponseHandler(Channel channel) { + this.channel = channel; + } + + @Override + public VersionHandshakeResponse newInstance() { + return new VersionHandshakeResponse(); + } + + @Override + public void handleResponse(VersionHandshakeResponse response) { + final boolean success = versionRef.compareAndSet(null, response.version); + assert success; + latch.countDown(); + } + + @Override + public void handleException(TransportException exp) { + Throwable cause = exp.getCause(); + if (cause != null + && cause instanceof ActionNotFoundTransportException + // this will happen if we talk to a node (pre 5.2) that doesn't have a handshake handler + // we will just treat the node as a 5.0.0 node unless the discovery node that is used to connect has a higher version. + && cause.getMessage().equals("No handler for action [internal:tcp/handshake]")) { + handshakeNotSupported.set(true); + } else { + final boolean success = exceptionRef.compareAndSet(null, exp); + assert success; + } + latch.countDown(); + } + + @Override + public String executor() { + return ThreadPool.Names.SAME; + } + } + public class ScheduledPing extends AbstractLifecycleRunnable { /** @@ -462,9 +507,17 @@ public abstract class TcpTransport extends AbstractLifecycleComponent i @Override public final NodeChannels openConnection(DiscoveryNode node, ConnectionProfile profile) throws IOException { - NodeChannels nodeChannels = connectToChannels(node, profile); - transportServiceAdapter.onConnectionOpened(node); - return nodeChannels; + try { + NodeChannels nodeChannels = connectToChannels(node, profile); + transportServiceAdapter.onConnectionOpened(node); + return nodeChannels; + } catch (ConnectTransportException e) { + throw e; + } catch (Exception e) { + // ConnectTransportExceptions are handled specifically on the caller end - we wrap the actual exception to ensure + // only relevant exceptions are logged on the caller end.. this is the same as in connectToNode + throw new ConnectTransportException(node, "general node connection failure", e); + } } /** @@ -1466,47 +1519,12 @@ public abstract class TcpTransport extends AbstractLifecycleComponent i // pkg private for testing final Version executeHandshake(DiscoveryNode node, Channel channel, TimeValue timeout) throws IOException, InterruptedException { - CountDownLatch latch = new CountDownLatch(1); - AtomicReference versionRef = new AtomicReference<>(); - AtomicReference exceptionRef = new AtomicReference<>(); - AtomicBoolean handshakeNotSupported = new AtomicBoolean(false); numHandshakes.inc(); final long requestId = newRequestId(); - pendingHandshakes.put(requestId, new TransportResponseHandler() { - - @Override - public VersionHandshakeResponse newInstance() { - return new VersionHandshakeResponse(); - } - - @Override - public void handleResponse(VersionHandshakeResponse response) { - final boolean success = versionRef.compareAndSet(null, response.version); - assert success; - latch.countDown(); - } - - @Override - public void handleException(TransportException exp) { - Throwable cause = exp.getCause(); - if (cause != null - && cause instanceof ActionNotFoundTransportException - // this will happen if we talk to a node (pre 5.2) that doesn't haven a handshake handler - // we will just treat the node as a 5.0.0 node unless the discovery node that is used to connect has a higher version. - && cause.getMessage().equals("No handler for action [internal:tcp/handshake]")) { - handshakeNotSupported.set(true); - } else { - final boolean success = exceptionRef.compareAndSet(null, exp); - assert success; - } - latch.countDown(); - } - - @Override - public String executor() { - return ThreadPool.Names.SAME; - } - }); + final HandshakeResponseHandler handler = new HandshakeResponseHandler(channel); + AtomicReference versionRef = handler.versionRef; + AtomicReference exceptionRef = handler.exceptionRef; + pendingHandshakes.put(requestId, handler); boolean success = false; try { // for the request we use the minCompatVersion since we don't know what's the version of the node we talk to @@ -1515,11 +1533,11 @@ public abstract class TcpTransport extends AbstractLifecycleComponent i final Version minCompatVersion = getCurrentVersion().minimumCompatibilityVersion(); sendRequestToChannel(node, channel, requestId, HANDSHAKE_ACTION_NAME, TransportRequest.Empty.INSTANCE, TransportRequestOptions.EMPTY, minCompatVersion, TransportStatus.setHandshake((byte)0)); - if (latch.await(timeout.millis(), TimeUnit.MILLISECONDS) == false) { + if (handler.latch.await(timeout.millis(), TimeUnit.MILLISECONDS) == false) { throw new ConnectTransportException(node, "handshake_timeout[" + timeout + "]"); } success = true; - if (handshakeNotSupported.get()) { + if (handler.handshakeNotSupported.get()) { // this is a BWC layer, if we talk to a pre 5.2 node then the handshake is not supported // this will go away in master once it's all ported to 5.2 but for now we keep this to make // the backport straight forward @@ -1555,4 +1573,18 @@ public abstract class TcpTransport extends AbstractLifecycleComponent i public long newRequestId() { return requestIdGenerator.incrementAndGet(); } + + /** + * Called by sub-classes for each channel that is closed + */ + protected final void onChannelClosed(Channel channel) { + Optional> first = pendingHandshakes.entrySet().stream() + .filter((entry) -> entry.getValue().channel == channel).findFirst(); + if(first.isPresent()) { + final Long requestId = first.get().getKey(); + HandshakeResponseHandler handler = first.get().getValue(); + pendingHandshakes.remove(requestId); + handler.handleException(new TransportException("connection reset")); + } + } } diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4Transport.java b/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4Transport.java index 048b1f90015..d92313fb0bb 100644 --- a/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4Transport.java +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4Transport.java @@ -385,7 +385,6 @@ public class Netty4Transport extends TcpTransport { } throw e; } - onAfterChannelsConnected(nodeChannels); success = true; } finally { if (success == false) { @@ -399,14 +398,6 @@ public class Netty4Transport extends TcpTransport { return nodeChannels; } - /** - * Allows for logic to be executed after a connection has been made on all channels. While this method is being executed, the node is - * not listed as being connected to. - * @param nodeChannels the {@link NodeChannels} that have been connected - */ - protected void onAfterChannelsConnected(NodeChannels nodeChannels) { - } - private class ChannelCloseListener implements ChannelFutureListener { private final DiscoveryNode node; @@ -417,6 +408,7 @@ public class Netty4Transport extends TcpTransport { @Override public void operationComplete(final ChannelFuture future) throws Exception { + onChannelClosed(future.channel()); NodeChannels nodeChannels = connectedNodes.get(node); if (nodeChannels != null && nodeChannels.hasChannel(future.channel())) { threadPool.generic().execute(() -> disconnectFromNode(node, future.channel(), "channel closed event")); diff --git a/test/framework/src/main/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java b/test/framework/src/main/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java index 529013cbaca..1d19e538807 100644 --- a/test/framework/src/main/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java @@ -46,9 +46,11 @@ import org.junit.After; import org.junit.Before; import java.io.IOException; +import java.io.UncheckedIOException; import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.ServerSocket; +import java.net.Socket; import java.util.ArrayList; import java.util.HashMap; import java.util.List; @@ -1847,4 +1849,39 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase { assertEquals("[][" + dummy.getAddress() +"] handshake_timeout[1ms]", ex.getMessage()); } } + + public void testTcpHandshakeConnectionReset() throws IOException, InterruptedException { + try (ServerSocket socket = new ServerSocket()) { + socket.bind(new InetSocketAddress(InetAddress.getLocalHost(), 0), 1); + socket.setReuseAddress(true); + DiscoveryNode dummy = new DiscoveryNode("TEST", new TransportAddress(socket.getInetAddress(), + socket.getLocalPort()), emptyMap(), + emptySet(), version0); + Thread t = new Thread() { + @Override + public void run() { + try { + Socket accept = socket.accept(); + accept.close(); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } + }; + t.start(); + ConnectionProfile.Builder builder = new ConnectionProfile.Builder(); + builder.addConnections(1, + TransportRequestOptions.Type.BULK, + TransportRequestOptions.Type.PING, + TransportRequestOptions.Type.RECOVERY, + TransportRequestOptions.Type.REG, + TransportRequestOptions.Type.STATE); + builder.setHandshakeTimeout(TimeValue.timeValueHours(1)); + ConnectTransportException ex = expectThrows(ConnectTransportException.class, + () -> serviceA.connectToNode(dummy, builder.build())); + assertEquals("[][" + dummy.getAddress() +"] general node connection failure", ex.getMessage()); + assertEquals("handshake failed", ex.getCause().getMessage()); + t.join(); + } + } } diff --git a/test/framework/src/main/java/org/elasticsearch/transport/MockTcpTransport.java b/test/framework/src/main/java/org/elasticsearch/transport/MockTcpTransport.java index 056daf417bf..acda061a704 100644 --- a/test/framework/src/main/java/org/elasticsearch/transport/MockTcpTransport.java +++ b/test/framework/src/main/java/org/elasticsearch/transport/MockTcpTransport.java @@ -336,6 +336,7 @@ public class MockTcpTransport extends TcpTransport if (isOpen.compareAndSet(true, false)) { //establish a happens-before edge between closing and accepting a new connection synchronized (this) { + onChannelClosed(this); IOUtils.close(serverSocket, activeChannel, () -> IOUtils.close(workerChannels.keySet()), () -> cancellableThreads.cancel("channel closed"), onClose); }