From cac2eec7d298f5e3fcafde73bb975028bfd36741 Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Thu, 29 Jun 2017 15:16:07 -0500 Subject: [PATCH] Add NioTransport threads to thread name checks (#25477) We have various assertions that check we never block on transport threads. This commit adds the thread names for the NioTransport to these assertions. With this change I had to fix two places where we were calling blocking methods from the transport threads. --- .../org/elasticsearch/transport/Transports.java | 7 ++++++- .../elasticsearch/transport/nio/NioTransport.java | 15 +++++++++++---- .../transport/nio/channel/AbstractNioChannel.java | 5 ----- .../transport/nio/channel/ConnectFuture.java | 4 +++- 4 files changed, 20 insertions(+), 11 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/transport/Transports.java b/core/src/main/java/org/elasticsearch/transport/Transports.java index c7a0fe4d4f5..d07846835c2 100644 --- a/core/src/main/java/org/elasticsearch/transport/Transports.java +++ b/core/src/main/java/org/elasticsearch/transport/Transports.java @@ -29,6 +29,9 @@ public enum Transports { /** threads whose name is prefixed by this string will be considered network threads, even though they aren't */ public static final String TEST_MOCK_TRANSPORT_THREAD_PREFIX = "__mock_network_thread"; + public static final String NIO_TRANSPORT_WORKER_THREAD_NAME_PREFIX = "es_nio_transport_worker"; + public static final String NIO_TRANSPORT_ACCEPTOR_THREAD_NAME_PREFIX = "es_nio_transport_acceptor"; + /** * Utility method to detect whether a thread is a network thread. Typically * used in assertions to make sure that we do not call blocking code from @@ -40,7 +43,9 @@ public enum Transports { HttpServerTransport.HTTP_SERVER_WORKER_THREAD_NAME_PREFIX, TcpTransport.TRANSPORT_SERVER_WORKER_THREAD_NAME_PREFIX, TcpTransport.TRANSPORT_CLIENT_BOSS_THREAD_NAME_PREFIX, - TEST_MOCK_TRANSPORT_THREAD_PREFIX)) { + TEST_MOCK_TRANSPORT_THREAD_PREFIX, + NIO_TRANSPORT_WORKER_THREAD_NAME_PREFIX, + NIO_TRANSPORT_ACCEPTOR_THREAD_NAME_PREFIX)) { if (threadName.contains(s)) { return true; } diff --git a/test/framework/src/main/java/org/elasticsearch/transport/nio/NioTransport.java b/test/framework/src/main/java/org/elasticsearch/transport/nio/NioTransport.java index 05c818476a1..8b0d435a08e 100644 --- a/test/framework/src/main/java/org/elasticsearch/transport/nio/NioTransport.java +++ b/test/framework/src/main/java/org/elasticsearch/transport/nio/NioTransport.java @@ -36,6 +36,7 @@ import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.ConnectionProfile; import org.elasticsearch.transport.TcpTransport; import org.elasticsearch.transport.TransportSettings; +import org.elasticsearch.transport.Transports; import org.elasticsearch.transport.nio.channel.ChannelFactory; import org.elasticsearch.transport.nio.channel.NioChannel; import org.elasticsearch.transport.nio.channel.NioServerSocketChannel; @@ -57,9 +58,8 @@ import static org.elasticsearch.common.util.concurrent.EsExecutors.daemonThreadF public class NioTransport extends TcpTransport { - // TODO: Need to add to places where we check if transport thread - public static final String TRANSPORT_WORKER_THREAD_NAME_PREFIX = "transport_worker"; - public static final String TRANSPORT_ACCEPTOR_THREAD_NAME_PREFIX = "transport_acceptor"; + public static final String TRANSPORT_WORKER_THREAD_NAME_PREFIX = Transports.NIO_TRANSPORT_WORKER_THREAD_NAME_PREFIX; + public static final String TRANSPORT_ACCEPTOR_THREAD_NAME_PREFIX = Transports.NIO_TRANSPORT_ACCEPTOR_THREAD_NAME_PREFIX; public static final Setting NIO_WORKER_COUNT = new Setting<>("transport.nio.worker_count", @@ -108,7 +108,14 @@ public class NioTransport extends TcpTransport { for (final NioChannel channel : channels) { if (channel != null && channel.isOpen()) { try { - channel.closeAsync().awaitClose(); + // If we are currently on the selector thread that handles this channel, we should prefer + // the closeFromSelector method. This method always closes the channel immediately. + ESSelector selector = channel.getSelector(); + if (selector != null && selector.isOnCurrentThread()) { + channel.closeFromSelector(); + } else { + channel.closeAsync().awaitClose(); + } } catch (Exception e) { if (closingExceptions == null) { closingExceptions = new IOException("failed to close channels"); diff --git a/test/framework/src/main/java/org/elasticsearch/transport/nio/channel/AbstractNioChannel.java b/test/framework/src/main/java/org/elasticsearch/transport/nio/channel/AbstractNioChannel.java index be8dbe3f468..9792f9e64cc 100644 --- a/test/framework/src/main/java/org/elasticsearch/transport/nio/channel/AbstractNioChannel.java +++ b/test/framework/src/main/java/org/elasticsearch/transport/nio/channel/AbstractNioChannel.java @@ -102,11 +102,6 @@ public abstract class AbstractNioChannel { if (isDone()) { try { // Get should always return without blocking as we already checked 'isDone' - return super.get(); + return super.get(0, TimeUnit.NANOSECONDS); } catch (InterruptedException e) { Thread.currentThread().interrupt(); return null; } catch (ExecutionException e) { return null; + } catch (TimeoutException e) { + throw new AssertionError("This should never happen as we only call get() after isDone() is true."); } } else { return null;