Prevent open channel leaks if handshake times out or is interrupted (#22554)

The low level TCP handshake can cause channel / connection leaks if it's interrupted
since the caller doesn't close the channel / connection if the handshake was not successful.
This commit fixes the channel leak and adds general test infrastructure to detect channel leaks
in the future.
This commit is contained in:
Simon Willnauer 2017-01-11 17:02:36 +01:00 committed by GitHub
parent abb7d7841f
commit 6810125a8b
2 changed files with 34 additions and 3 deletions

View File

@ -477,8 +477,10 @@ public abstract class TcpTransport<Channel> extends AbstractLifecycleComponent i
@Override
public final NodeChannels openConnection(DiscoveryNode node, ConnectionProfile connectionProfile) throws IOException {
boolean success = false;
NodeChannels nodeChannels = null;
try {
NodeChannels nodeChannels = connectToChannels(node, connectionProfile);
nodeChannels = connectToChannels(node, connectionProfile);
final Channel channel = nodeChannels.getChannels().get(0); // one channel is guaranteed by the connection profile
final TimeValue connectTimeout = connectionProfile.getConnectTimeout() == null ?
defaultConnectionProfile.getConnectTimeout() :
@ -487,13 +489,19 @@ public abstract class TcpTransport<Channel> extends AbstractLifecycleComponent i
connectTimeout : connectionProfile.getHandshakeTimeout();
final Version version = executeHandshake(node, channel, handshakeTimeout);
transportServiceAdapter.onConnectionOpened(node);
return new NodeChannels(nodeChannels, version); // clone the channels - we now have the correct version
nodeChannels = new NodeChannels(nodeChannels, version);// clone the channels - we now have the correct version
success = true;
return nodeChannels;
} catch (ConnectTransportException e) {
throw e;
} catch (Exception e) {
// ConnectTransportExceptions are handled specifically on the caller end - we wrap the actual exception to ensure
// only relevant exceptions are logged on the caller end.. this is the same as in connectToNode
throw new ConnectTransportException(node, "general node connection failure", e);
} finally {
if (success == false) {
IOUtils.closeWhileHandlingException(nodeChannels);
}
}
}
@ -832,7 +840,7 @@ public abstract class TcpTransport<Channel> extends AbstractLifecycleComponent i
}
@Override
protected final void doClose() {
protected void doClose() {
}
@Override

View File

@ -49,6 +49,8 @@ import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketException;
import java.net.SocketTimeoutException;
import java.util.HashMap;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@ -74,6 +76,8 @@ public class MockTcpTransport extends TcpTransport<MockTcpTransport.MockChannel>
*/
public static final ConnectionProfile LIGHT_PROFILE;
private final Map<MockChannel, Boolean> openChannels = new IdentityHashMap<>();
static {
ConnectionProfile.Builder builder = new ConnectionProfile.Builder();
builder.addConnections(1,
@ -284,6 +288,9 @@ public class MockTcpTransport extends TcpTransport<MockTcpTransport.MockChannel>
this.serverSocket = null;
this.profile = profile;
this.onClose = () -> onClose.accept(this);
synchronized (openChannels) {
openChannels.put(this, Boolean.TRUE);
}
}
/**
@ -353,12 +360,17 @@ public class MockTcpTransport extends TcpTransport<MockTcpTransport.MockChannel>
@Override
public void close() throws IOException {
if (isOpen.compareAndSet(true, false)) {
final Boolean removedChannel;
synchronized (openChannels) {
removedChannel = openChannels.remove(this);
}
//establish a happens-before edge between closing and accepting a new connection
synchronized (this) {
onChannelClosed(this);
IOUtils.close(serverSocket, activeChannel, () -> IOUtils.close(workerChannels.keySet()),
() -> cancellableThreads.cancel("channel closed"), onClose);
}
assert removedChannel : "Channel was not removed or removed twice?";
}
}
}
@ -395,5 +407,16 @@ public class MockTcpTransport extends TcpTransport<MockTcpTransport.MockChannel>
return mockVersion;
}
@Override
protected void doClose() {
if (Thread.currentThread().isInterrupted() == false) {
// TCPTransport might be interrupted due to a timeout waiting for connections to be closed.
// in this case the thread is interrupted and we can't tell if we really missed something or if we are
// still closing connections. in such a case we don't assert the open channels
synchronized (openChannels) {
assert openChannels.isEmpty() : "there are still open channels: " + openChannels;
}
}
}
}