Fix potential race during TcpTransport close (#39031)
Fixed two potential causes for leaked threads during tests: 1. When adding a channel to serverChannels, we add it under a monitor that we do not use when reading from it. This is potentially unsafe if there is no other happens-before relationship ensuring the safety of this. 2. Long-shot but if the thread pool was shutdown before entering this code, we would silently forget about closing server channels so added assert. Strengthened the locking to ensure that once we stop the transport, no new server channels can be made. Relates to CI failure issue: #37543
This commit is contained in:
parent
ae9243ad0a
commit
99b2bc3461
|
@ -388,18 +388,16 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements
|
||||||
PortsRange portsRange = new PortsRange(port);
|
PortsRange portsRange = new PortsRange(port);
|
||||||
final AtomicReference<Exception> lastException = new AtomicReference<>();
|
final AtomicReference<Exception> lastException = new AtomicReference<>();
|
||||||
final AtomicReference<InetSocketAddress> boundSocket = new AtomicReference<>();
|
final AtomicReference<InetSocketAddress> boundSocket = new AtomicReference<>();
|
||||||
|
closeLock.writeLock().lock();
|
||||||
|
try {
|
||||||
|
if (lifecycle.initialized() == false && lifecycle.started() == false) {
|
||||||
|
throw new IllegalStateException("transport has been stopped");
|
||||||
|
}
|
||||||
boolean success = portsRange.iterate(portNumber -> {
|
boolean success = portsRange.iterate(portNumber -> {
|
||||||
try {
|
try {
|
||||||
TcpServerChannel channel = bind(name, new InetSocketAddress(hostAddress, portNumber));
|
TcpServerChannel channel = bind(name, new InetSocketAddress(hostAddress, portNumber));
|
||||||
synchronized (serverChannels) {
|
serverChannels.computeIfAbsent(name, k -> new ArrayList<>()).add(channel);
|
||||||
List<TcpServerChannel> list = serverChannels.get(name);
|
|
||||||
if (list == null) {
|
|
||||||
list = new ArrayList<>();
|
|
||||||
serverChannels.put(name, list);
|
|
||||||
}
|
|
||||||
list.add(channel);
|
|
||||||
boundSocket.set(channel.getLocalAddress());
|
boundSocket.set(channel.getLocalAddress());
|
||||||
}
|
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
lastException.set(e);
|
lastException.set(e);
|
||||||
return false;
|
return false;
|
||||||
|
@ -409,7 +407,9 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements
|
||||||
if (!success) {
|
if (!success) {
|
||||||
throw new BindTransportException("Failed to bind to [" + port + "]", lastException.get());
|
throw new BindTransportException("Failed to bind to [" + port + "]", lastException.get());
|
||||||
}
|
}
|
||||||
|
} finally {
|
||||||
|
closeLock.writeLock().unlock();
|
||||||
|
}
|
||||||
if (logger.isDebugEnabled()) {
|
if (logger.isDebugEnabled()) {
|
||||||
logger.debug("Bound profile [{}] to address {{}}", name, NetworkAddress.format(boundSocket.get()));
|
logger.debug("Bound profile [{}] to address {{}}", name, NetworkAddress.format(boundSocket.get()));
|
||||||
}
|
}
|
||||||
|
@ -553,6 +553,7 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements
|
||||||
protected final void doStop() {
|
protected final void doStop() {
|
||||||
final CountDownLatch latch = new CountDownLatch(1);
|
final CountDownLatch latch = new CountDownLatch(1);
|
||||||
// make sure we run it on another thread than a possible IO handler thread
|
// make sure we run it on another thread than a possible IO handler thread
|
||||||
|
assert threadPool.generic().isShutdown() == false : "Must stop transport before terminating underlying threadpool";
|
||||||
threadPool.generic().execute(() -> {
|
threadPool.generic().execute(() -> {
|
||||||
closeLock.writeLock().lock();
|
closeLock.writeLock().lock();
|
||||||
try {
|
try {
|
||||||
|
|
Loading…
Reference in New Issue