From 99b2bc3461eac54dc7ecdc0706bd43fabcc5f904 Mon Sep 17 00:00:00 2001 From: Henning Andersen <33268011+henningandersen@users.noreply.github.com> Date: Mon, 18 Feb 2019 18:10:00 +0100 Subject: [PATCH] Fix potential race during TcpTransport close (#39031) Fixed two potential causes for leaked threads during tests: 1. When adding a channel to serverChannels, we add it under a monitor that we do not use when reading from it. This is potentially unsafe if there is no other happens-before relationship ensuring the safety of this. 2. Long-shot but if the thread pool was shutdown before entering this code, we would silently forget about closing server channels so added assert. Strengthened the locking to ensure that once we stop the transport, no new server channels can be made. Relates to CI failure issue: #37543 --- .../elasticsearch/transport/TcpTransport.java | 41 ++++++++++--------- 1 file changed, 21 insertions(+), 20 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/transport/TcpTransport.java b/server/src/main/java/org/elasticsearch/transport/TcpTransport.java index 2ff5ae1583e..d5a524105dd 100644 --- a/server/src/main/java/org/elasticsearch/transport/TcpTransport.java +++ b/server/src/main/java/org/elasticsearch/transport/TcpTransport.java @@ -388,28 +388,28 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements PortsRange portsRange = new PortsRange(port); final AtomicReference lastException = new AtomicReference<>(); final AtomicReference boundSocket = new AtomicReference<>(); - boolean success = portsRange.iterate(portNumber -> { - try { - TcpServerChannel channel = bind(name, new InetSocketAddress(hostAddress, portNumber)); - synchronized (serverChannels) { - List list = serverChannels.get(name); - if (list == null) { - list = new ArrayList<>(); - serverChannels.put(name, list); - } - list.add(channel); - boundSocket.set(channel.getLocalAddress()); - } - } catch (Exception e) { - lastException.set(e); - return false; + closeLock.writeLock().lock(); + try { + if (lifecycle.initialized() == false && lifecycle.started() == false) { + throw new IllegalStateException("transport has been stopped"); } - return true; - }); - if (!success) { - throw new BindTransportException("Failed to bind to [" + port + "]", lastException.get()); + boolean success = portsRange.iterate(portNumber -> { + try { + TcpServerChannel channel = bind(name, new InetSocketAddress(hostAddress, portNumber)); + serverChannels.computeIfAbsent(name, k -> new ArrayList<>()).add(channel); + boundSocket.set(channel.getLocalAddress()); + } catch (Exception e) { + lastException.set(e); + return false; + } + return true; + }); + if (!success) { + throw new BindTransportException("Failed to bind to [" + port + "]", lastException.get()); + } + } finally { + closeLock.writeLock().unlock(); } - if (logger.isDebugEnabled()) { logger.debug("Bound profile [{}] to address {{}}", name, NetworkAddress.format(boundSocket.get())); } @@ -553,6 +553,7 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements protected final void doStop() { final CountDownLatch latch = new CountDownLatch(1); // make sure we run it on another thread than a possible IO handler thread + assert threadPool.generic().isShutdown() == false : "Must stop transport before terminating underlying threadpool"; threadPool.generic().execute(() -> { closeLock.writeLock().lock(); try {