From 6810125a8b332c1d3ed83d15f30b1125a0ba6162 Mon Sep 17 00:00:00 2001 From: Simon Willnauer Date: Wed, 11 Jan 2017 17:02:36 +0100 Subject: [PATCH] Prevent open channel leaks if handshake times out or is interrupted (#22554) The low level TCP handshake can cause channel / connection leaks if it's interrupted since the caller doesn't close the channel / connection if the handshake was not successful. This commit fixes the channel leak and adds general test infrastructure to detect channel leaks in the future. --- .../elasticsearch/transport/TcpTransport.java | 14 ++++++++--- .../transport/MockTcpTransport.java | 23 +++++++++++++++++++ 2 files changed, 34 insertions(+), 3 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/transport/TcpTransport.java b/core/src/main/java/org/elasticsearch/transport/TcpTransport.java index c2f0832b75e..f2b29706caf 100644 --- a/core/src/main/java/org/elasticsearch/transport/TcpTransport.java +++ b/core/src/main/java/org/elasticsearch/transport/TcpTransport.java @@ -477,8 +477,10 @@ public abstract class TcpTransport extends AbstractLifecycleComponent i @Override public final NodeChannels openConnection(DiscoveryNode node, ConnectionProfile connectionProfile) throws IOException { + boolean success = false; + NodeChannels nodeChannels = null; try { - NodeChannels nodeChannels = connectToChannels(node, connectionProfile); + nodeChannels = connectToChannels(node, connectionProfile); final Channel channel = nodeChannels.getChannels().get(0); // one channel is guaranteed by the connection profile final TimeValue connectTimeout = connectionProfile.getConnectTimeout() == null ? defaultConnectionProfile.getConnectTimeout() : @@ -487,13 +489,19 @@ public abstract class TcpTransport extends AbstractLifecycleComponent i connectTimeout : connectionProfile.getHandshakeTimeout(); final Version version = executeHandshake(node, channel, handshakeTimeout); transportServiceAdapter.onConnectionOpened(node); - return new NodeChannels(nodeChannels, version); // clone the channels - we now have the correct version + nodeChannels = new NodeChannels(nodeChannels, version);// clone the channels - we now have the correct version + success = true; + return nodeChannels; } catch (ConnectTransportException e) { throw e; } catch (Exception e) { // ConnectTransportExceptions are handled specifically on the caller end - we wrap the actual exception to ensure // only relevant exceptions are logged on the caller end.. this is the same as in connectToNode throw new ConnectTransportException(node, "general node connection failure", e); + } finally { + if (success == false) { + IOUtils.closeWhileHandlingException(nodeChannels); + } } } @@ -832,7 +840,7 @@ public abstract class TcpTransport extends AbstractLifecycleComponent i } @Override - protected final void doClose() { + protected void doClose() { } @Override diff --git a/test/framework/src/main/java/org/elasticsearch/transport/MockTcpTransport.java b/test/framework/src/main/java/org/elasticsearch/transport/MockTcpTransport.java index 3b5b430f606..a08660bb388 100644 --- a/test/framework/src/main/java/org/elasticsearch/transport/MockTcpTransport.java +++ b/test/framework/src/main/java/org/elasticsearch/transport/MockTcpTransport.java @@ -49,6 +49,8 @@ import java.net.ServerSocket; import java.net.Socket; import java.net.SocketException; import java.net.SocketTimeoutException; +import java.util.HashMap; +import java.util.IdentityHashMap; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; @@ -74,6 +76,8 @@ public class MockTcpTransport extends TcpTransport */ public static final ConnectionProfile LIGHT_PROFILE; + private final Map openChannels = new IdentityHashMap<>(); + static { ConnectionProfile.Builder builder = new ConnectionProfile.Builder(); builder.addConnections(1, @@ -284,6 +288,9 @@ public class MockTcpTransport extends TcpTransport this.serverSocket = null; this.profile = profile; this.onClose = () -> onClose.accept(this); + synchronized (openChannels) { + openChannels.put(this, Boolean.TRUE); + } } /** @@ -353,12 +360,17 @@ public class MockTcpTransport extends TcpTransport @Override public void close() throws IOException { if (isOpen.compareAndSet(true, false)) { + final Boolean removedChannel; + synchronized (openChannels) { + removedChannel = openChannels.remove(this); + } //establish a happens-before edge between closing and accepting a new connection synchronized (this) { onChannelClosed(this); IOUtils.close(serverSocket, activeChannel, () -> IOUtils.close(workerChannels.keySet()), () -> cancellableThreads.cancel("channel closed"), onClose); } + assert removedChannel : "Channel was not removed or removed twice?"; } } } @@ -395,5 +407,16 @@ public class MockTcpTransport extends TcpTransport return mockVersion; } + @Override + protected void doClose() { + if (Thread.currentThread().isInterrupted() == false) { + // TCPTransport might be interrupted due to a timeout waiting for connections to be closed. + // in this case the thread is interrupted and we can't tell if we really missed something or if we are + // still closing connections. in such a case we don't assert the open channels + synchronized (openChannels) { + assert openChannels.isEmpty() : "there are still open channels: " + openChannels; + } + } + } }