From 0166388d7427643d565dc0e0af9a8cc58c804b24 Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Fri, 2 Nov 2018 16:31:19 -0600 Subject: [PATCH] Use single netty event loop group for transports (#35181) Currently we create a new netty event loop group for client connections and all server profiles. Each new group creates new threads for io processing. This means 2 * num of processors new threads for each group. A single group should be able to handle all io processing (for the transports). This also brings the netty module inline with what we do for nio. Additionally, this PR renames the worker threads to be the same for netty and nio. --- .../transport/netty4/Netty4Transport.java | 47 +++++++------------ .../transport/nio/NioTransport.java | 5 +- .../elasticsearch/transport/TcpTransport.java | 3 +- .../elasticsearch/transport/Transports.java | 8 +--- .../transport/nio/MockNioTransport.java | 5 +- 5 files changed, 21 insertions(+), 47 deletions(-) diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4Transport.java b/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4Transport.java index 009a75b3e33..a4e5731cd62 100644 --- a/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4Transport.java +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4Transport.java @@ -37,12 +37,10 @@ import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.util.AttributeKey; import io.netty.util.concurrent.Future; import org.apache.logging.log4j.message.ParameterizedMessage; -import org.apache.logging.log4j.util.Supplier; import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.action.ActionListener; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.SuppressForbidden; -import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.lease.Releasables; import org.elasticsearch.common.network.NetworkService; @@ -59,8 +57,6 @@ import org.elasticsearch.transport.TcpTransport; import java.io.IOException; import java.net.InetSocketAddress; -import java.util.ArrayList; -import java.util.List; import java.util.Map; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; @@ -101,8 +97,9 @@ public class Netty4Transport extends TcpTransport { private final int workerCount; private final ByteSizeValue receivePredictorMin; private final ByteSizeValue receivePredictorMax; - private volatile Bootstrap clientBootstrap; private final Map serverBootstraps = newConcurrentMap(); + private volatile Bootstrap clientBootstrap; + private volatile NioEventLoopGroup eventLoopGroup; public Netty4Transport(Settings settings, ThreadPool threadPool, NetworkService networkService, BigArrays bigArrays, NamedWriteableRegistry namedWriteableRegistry, CircuitBreakerService circuitBreakerService) { @@ -125,10 +122,12 @@ public class Netty4Transport extends TcpTransport { protected void doStart() { boolean success = false; try { - clientBootstrap = createClientBootstrap(); + ThreadFactory threadFactory = daemonThreadFactory(settings, TRANSPORT_WORKER_THREAD_NAME_PREFIX); + eventLoopGroup = new NioEventLoopGroup(workerCount, threadFactory); + clientBootstrap = createClientBootstrap(eventLoopGroup); if (NetworkService.NETWORK_SERVER.get(settings)) { for (ProfileSettings profileSettings : profileSettings) { - createServerBootstrap(profileSettings); + createServerBootstrap(profileSettings, eventLoopGroup); bindServer(profileSettings); } } @@ -141,9 +140,9 @@ public class Netty4Transport extends TcpTransport { } } - private Bootstrap createClientBootstrap() { + private Bootstrap createClientBootstrap(NioEventLoopGroup eventLoopGroup) { final Bootstrap bootstrap = new Bootstrap(); - bootstrap.group(new NioEventLoopGroup(workerCount, daemonThreadFactory(settings, TRANSPORT_CLIENT_BOSS_THREAD_NAME_PREFIX))); + bootstrap.group(eventLoopGroup); bootstrap.channel(NioSocketChannel.class); bootstrap.option(ChannelOption.TCP_NODELAY, TCP_NO_DELAY.get(settings)); @@ -167,7 +166,7 @@ public class Netty4Transport extends TcpTransport { return bootstrap; } - private void createServerBootstrap(ProfileSettings profileSettings) { + private void createServerBootstrap(ProfileSettings profileSettings, NioEventLoopGroup eventLoopGroup) { String name = profileSettings.profileName; if (logger.isDebugEnabled()) { logger.debug("using profile[{}], worker_count[{}], port[{}], bind_host[{}], publish_host[{}], compress[{}], " @@ -176,12 +175,9 @@ public class Netty4Transport extends TcpTransport { receivePredictorMin, receivePredictorMax); } - - final ThreadFactory workerFactory = daemonThreadFactory(this.settings, TRANSPORT_SERVER_WORKER_THREAD_NAME_PREFIX, name); - final ServerBootstrap serverBootstrap = new ServerBootstrap(); - serverBootstrap.group(new NioEventLoopGroup(workerCount, workerFactory)); + serverBootstrap.group(eventLoopGroup); serverBootstrap.channel(NioServerSocketChannel.class); serverBootstrap.childHandler(getServerChannelInitializer(name)); @@ -274,25 +270,14 @@ public class Netty4Transport extends TcpTransport { @SuppressForbidden(reason = "debug") protected void stopInternal() { Releasables.close(() -> { - final List>> serverBootstrapCloseFutures = new ArrayList<>(serverBootstraps.size()); - for (final Map.Entry entry : serverBootstraps.entrySet()) { - serverBootstrapCloseFutures.add( - Tuple.tuple(entry.getKey(), entry.getValue().config().group().shutdownGracefully(0, 5, TimeUnit.SECONDS))); + Future shutdownFuture = eventLoopGroup.shutdownGracefully(0, 5, TimeUnit.SECONDS); + shutdownFuture.awaitUninterruptibly(); + if (shutdownFuture.isSuccess() == false) { + logger.warn("Error closing netty event loop group", shutdownFuture.cause()); } - for (final Tuple> future : serverBootstrapCloseFutures) { - future.v2().awaitUninterruptibly(); - if (!future.v2().isSuccess()) { - logger.debug( - (Supplier) () -> new ParameterizedMessage( - "Error closing server bootstrap for profile [{}]", future.v1()), future.v2().cause()); - } - } - serverBootstraps.clear(); - if (clientBootstrap != null) { - clientBootstrap.config().group().shutdownGracefully(0, 5, TimeUnit.SECONDS).awaitUninterruptibly(); - clientBootstrap = null; - } + serverBootstraps.clear(); + clientBootstrap = null; }); } diff --git a/plugins/transport-nio/src/main/java/org/elasticsearch/transport/nio/NioTransport.java b/plugins/transport-nio/src/main/java/org/elasticsearch/transport/nio/NioTransport.java index 129f0ada77d..15f7d1e2894 100644 --- a/plugins/transport-nio/src/main/java/org/elasticsearch/transport/nio/NioTransport.java +++ b/plugins/transport-nio/src/main/java/org/elasticsearch/transport/nio/NioTransport.java @@ -41,7 +41,6 @@ import org.elasticsearch.nio.NioSocketChannel; import org.elasticsearch.nio.ServerChannelContext; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TcpTransport; -import org.elasticsearch.transport.Transports; import java.io.IOException; import java.net.InetSocketAddress; @@ -57,8 +56,6 @@ import static org.elasticsearch.common.util.concurrent.EsExecutors.daemonThreadF public class NioTransport extends TcpTransport { - private static final String TRANSPORT_WORKER_THREAD_NAME_PREFIX = Transports.NIO_TRANSPORT_WORKER_THREAD_NAME_PREFIX; - public static final Setting NIO_WORKER_COUNT = new Setting<>("transport.nio.worker_count", (s) -> Integer.toString(EsExecutors.numberOfProcessors(s) * 2), @@ -94,7 +91,7 @@ public class NioTransport extends TcpTransport { protected void doStart() { boolean success = false; try { - nioGroup = new NioGroup(daemonThreadFactory(this.settings, TRANSPORT_WORKER_THREAD_NAME_PREFIX), + nioGroup = new NioGroup(daemonThreadFactory(this.settings, TcpTransport.TRANSPORT_WORKER_THREAD_NAME_PREFIX), NioTransport.NIO_WORKER_COUNT.get(settings), (s) -> new EventHandler(this::onNonChannelException, s)); ProfileSettings clientProfileSettings = new ProfileSettings(settings, "default"); diff --git a/server/src/main/java/org/elasticsearch/transport/TcpTransport.java b/server/src/main/java/org/elasticsearch/transport/TcpTransport.java index 506d1a58649..46067930df1 100644 --- a/server/src/main/java/org/elasticsearch/transport/TcpTransport.java +++ b/server/src/main/java/org/elasticsearch/transport/TcpTransport.java @@ -118,8 +118,7 @@ import static org.elasticsearch.common.util.concurrent.ConcurrentCollections.new public abstract class TcpTransport extends AbstractLifecycleComponent implements Transport { - public static final String TRANSPORT_SERVER_WORKER_THREAD_NAME_PREFIX = "transport_server_worker"; - public static final String TRANSPORT_CLIENT_BOSS_THREAD_NAME_PREFIX = "transport_client_boss"; + public static final String TRANSPORT_WORKER_THREAD_NAME_PREFIX = "transport_worker"; public static final Setting> HOST = listSetting("transport.host", emptyList(), Function.identity(), Setting.Property.NodeScope); diff --git a/server/src/main/java/org/elasticsearch/transport/Transports.java b/server/src/main/java/org/elasticsearch/transport/Transports.java index b4579374d9e..c531d33224a 100644 --- a/server/src/main/java/org/elasticsearch/transport/Transports.java +++ b/server/src/main/java/org/elasticsearch/transport/Transports.java @@ -29,8 +29,6 @@ public enum Transports { /** threads whose name is prefixed by this string will be considered network threads, even though they aren't */ public static final String TEST_MOCK_TRANSPORT_THREAD_PREFIX = "__mock_network_thread"; - public static final String NIO_TRANSPORT_WORKER_THREAD_NAME_PREFIX = "es_nio_transport_worker"; - /** * Utility method to detect whether a thread is a network thread. Typically * used in assertions to make sure that we do not call blocking code from @@ -41,10 +39,8 @@ public enum Transports { for (String s : Arrays.asList( HttpServerTransport.HTTP_SERVER_WORKER_THREAD_NAME_PREFIX, HttpServerTransport.HTTP_SERVER_ACCEPTOR_THREAD_NAME_PREFIX, - TcpTransport.TRANSPORT_SERVER_WORKER_THREAD_NAME_PREFIX, - TcpTransport.TRANSPORT_CLIENT_BOSS_THREAD_NAME_PREFIX, - TEST_MOCK_TRANSPORT_THREAD_PREFIX, - NIO_TRANSPORT_WORKER_THREAD_NAME_PREFIX)) { + TcpTransport.TRANSPORT_WORKER_THREAD_NAME_PREFIX, + TEST_MOCK_TRANSPORT_THREAD_PREFIX)) { if (threadName.contains(s)) { return true; } diff --git a/test/framework/src/main/java/org/elasticsearch/transport/nio/MockNioTransport.java b/test/framework/src/main/java/org/elasticsearch/transport/nio/MockNioTransport.java index b36685d5645..dc08fbf257d 100644 --- a/test/framework/src/main/java/org/elasticsearch/transport/nio/MockNioTransport.java +++ b/test/framework/src/main/java/org/elasticsearch/transport/nio/MockNioTransport.java @@ -46,7 +46,6 @@ import org.elasticsearch.transport.TcpChannel; import org.elasticsearch.transport.TcpServerChannel; import org.elasticsearch.transport.TcpTransport; import org.elasticsearch.transport.TransportRequestOptions; -import org.elasticsearch.transport.Transports; import java.io.IOException; import java.net.InetSocketAddress; @@ -65,8 +64,6 @@ import static org.elasticsearch.common.util.concurrent.EsExecutors.daemonThreadF public class MockNioTransport extends TcpTransport { - private static final String TRANSPORT_WORKER_THREAD_NAME_PREFIX = Transports.NIO_TRANSPORT_WORKER_THREAD_NAME_PREFIX; - private final PageCacheRecycler pageCacheRecycler; private final ConcurrentMap profileToChannelFactory = newConcurrentMap(); private volatile NioGroup nioGroup; @@ -97,7 +94,7 @@ public class MockNioTransport extends TcpTransport { protected void doStart() { boolean success = false; try { - nioGroup = new NioGroup(daemonThreadFactory(this.settings, TRANSPORT_WORKER_THREAD_NAME_PREFIX), 2, + nioGroup = new NioGroup(daemonThreadFactory(this.settings, TcpTransport.TRANSPORT_WORKER_THREAD_NAME_PREFIX), 2, (s) -> new TestingSocketEventHandler(this::onNonChannelException, s)); ProfileSettings clientProfileSettings = new ProfileSettings(settings, "default");