diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4Transport.java b/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4Transport.java index 009a75b3e33..a4e5731cd62 100644 --- a/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4Transport.java +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4Transport.java @@ -37,12 +37,10 @@ import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.util.AttributeKey; import io.netty.util.concurrent.Future; import org.apache.logging.log4j.message.ParameterizedMessage; -import org.apache.logging.log4j.util.Supplier; import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.action.ActionListener; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.SuppressForbidden; -import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.lease.Releasables; import org.elasticsearch.common.network.NetworkService; @@ -59,8 +57,6 @@ import org.elasticsearch.transport.TcpTransport; import java.io.IOException; import java.net.InetSocketAddress; -import java.util.ArrayList; -import java.util.List; import java.util.Map; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; @@ -101,8 +97,9 @@ public class Netty4Transport extends TcpTransport { private final int workerCount; private final ByteSizeValue receivePredictorMin; private final ByteSizeValue receivePredictorMax; - private volatile Bootstrap clientBootstrap; private final Map serverBootstraps = newConcurrentMap(); + private volatile Bootstrap clientBootstrap; + private volatile NioEventLoopGroup eventLoopGroup; public Netty4Transport(Settings settings, ThreadPool threadPool, NetworkService networkService, BigArrays bigArrays, NamedWriteableRegistry namedWriteableRegistry, CircuitBreakerService circuitBreakerService) { @@ -125,10 +122,12 @@ public class Netty4Transport extends TcpTransport { protected void doStart() { boolean success = false; try { - clientBootstrap = createClientBootstrap(); + ThreadFactory threadFactory = daemonThreadFactory(settings, TRANSPORT_WORKER_THREAD_NAME_PREFIX); + eventLoopGroup = new NioEventLoopGroup(workerCount, threadFactory); + clientBootstrap = createClientBootstrap(eventLoopGroup); if (NetworkService.NETWORK_SERVER.get(settings)) { for (ProfileSettings profileSettings : profileSettings) { - createServerBootstrap(profileSettings); + createServerBootstrap(profileSettings, eventLoopGroup); bindServer(profileSettings); } } @@ -141,9 +140,9 @@ public class Netty4Transport extends TcpTransport { } } - private Bootstrap createClientBootstrap() { + private Bootstrap createClientBootstrap(NioEventLoopGroup eventLoopGroup) { final Bootstrap bootstrap = new Bootstrap(); - bootstrap.group(new NioEventLoopGroup(workerCount, daemonThreadFactory(settings, TRANSPORT_CLIENT_BOSS_THREAD_NAME_PREFIX))); + bootstrap.group(eventLoopGroup); bootstrap.channel(NioSocketChannel.class); bootstrap.option(ChannelOption.TCP_NODELAY, TCP_NO_DELAY.get(settings)); @@ -167,7 +166,7 @@ public class Netty4Transport extends TcpTransport { return bootstrap; } - private void createServerBootstrap(ProfileSettings profileSettings) { + private void createServerBootstrap(ProfileSettings profileSettings, NioEventLoopGroup eventLoopGroup) { String name = profileSettings.profileName; if (logger.isDebugEnabled()) { logger.debug("using profile[{}], worker_count[{}], port[{}], bind_host[{}], publish_host[{}], compress[{}], " @@ -176,12 +175,9 @@ public class Netty4Transport extends TcpTransport { receivePredictorMin, receivePredictorMax); } - - final ThreadFactory workerFactory = daemonThreadFactory(this.settings, TRANSPORT_SERVER_WORKER_THREAD_NAME_PREFIX, name); - final ServerBootstrap serverBootstrap = new ServerBootstrap(); - serverBootstrap.group(new NioEventLoopGroup(workerCount, workerFactory)); + serverBootstrap.group(eventLoopGroup); serverBootstrap.channel(NioServerSocketChannel.class); serverBootstrap.childHandler(getServerChannelInitializer(name)); @@ -274,25 +270,14 @@ public class Netty4Transport extends TcpTransport { @SuppressForbidden(reason = "debug") protected void stopInternal() { Releasables.close(() -> { - final List>> serverBootstrapCloseFutures = new ArrayList<>(serverBootstraps.size()); - for (final Map.Entry entry : serverBootstraps.entrySet()) { - serverBootstrapCloseFutures.add( - Tuple.tuple(entry.getKey(), entry.getValue().config().group().shutdownGracefully(0, 5, TimeUnit.SECONDS))); + Future shutdownFuture = eventLoopGroup.shutdownGracefully(0, 5, TimeUnit.SECONDS); + shutdownFuture.awaitUninterruptibly(); + if (shutdownFuture.isSuccess() == false) { + logger.warn("Error closing netty event loop group", shutdownFuture.cause()); } - for (final Tuple> future : serverBootstrapCloseFutures) { - future.v2().awaitUninterruptibly(); - if (!future.v2().isSuccess()) { - logger.debug( - (Supplier) () -> new ParameterizedMessage( - "Error closing server bootstrap for profile [{}]", future.v1()), future.v2().cause()); - } - } - serverBootstraps.clear(); - if (clientBootstrap != null) { - clientBootstrap.config().group().shutdownGracefully(0, 5, TimeUnit.SECONDS).awaitUninterruptibly(); - clientBootstrap = null; - } + serverBootstraps.clear(); + clientBootstrap = null; }); } diff --git a/plugins/transport-nio/src/main/java/org/elasticsearch/transport/nio/NioTransport.java b/plugins/transport-nio/src/main/java/org/elasticsearch/transport/nio/NioTransport.java index 129f0ada77d..15f7d1e2894 100644 --- a/plugins/transport-nio/src/main/java/org/elasticsearch/transport/nio/NioTransport.java +++ b/plugins/transport-nio/src/main/java/org/elasticsearch/transport/nio/NioTransport.java @@ -41,7 +41,6 @@ import org.elasticsearch.nio.NioSocketChannel; import org.elasticsearch.nio.ServerChannelContext; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TcpTransport; -import org.elasticsearch.transport.Transports; import java.io.IOException; import java.net.InetSocketAddress; @@ -57,8 +56,6 @@ import static org.elasticsearch.common.util.concurrent.EsExecutors.daemonThreadF public class NioTransport extends TcpTransport { - private static final String TRANSPORT_WORKER_THREAD_NAME_PREFIX = Transports.NIO_TRANSPORT_WORKER_THREAD_NAME_PREFIX; - public static final Setting NIO_WORKER_COUNT = new Setting<>("transport.nio.worker_count", (s) -> Integer.toString(EsExecutors.numberOfProcessors(s) * 2), @@ -94,7 +91,7 @@ public class NioTransport extends TcpTransport { protected void doStart() { boolean success = false; try { - nioGroup = new NioGroup(daemonThreadFactory(this.settings, TRANSPORT_WORKER_THREAD_NAME_PREFIX), + nioGroup = new NioGroup(daemonThreadFactory(this.settings, TcpTransport.TRANSPORT_WORKER_THREAD_NAME_PREFIX), NioTransport.NIO_WORKER_COUNT.get(settings), (s) -> new EventHandler(this::onNonChannelException, s)); ProfileSettings clientProfileSettings = new ProfileSettings(settings, "default"); diff --git a/server/src/main/java/org/elasticsearch/transport/TcpTransport.java b/server/src/main/java/org/elasticsearch/transport/TcpTransport.java index 506d1a58649..46067930df1 100644 --- a/server/src/main/java/org/elasticsearch/transport/TcpTransport.java +++ b/server/src/main/java/org/elasticsearch/transport/TcpTransport.java @@ -118,8 +118,7 @@ import static org.elasticsearch.common.util.concurrent.ConcurrentCollections.new public abstract class TcpTransport extends AbstractLifecycleComponent implements Transport { - public static final String TRANSPORT_SERVER_WORKER_THREAD_NAME_PREFIX = "transport_server_worker"; - public static final String TRANSPORT_CLIENT_BOSS_THREAD_NAME_PREFIX = "transport_client_boss"; + public static final String TRANSPORT_WORKER_THREAD_NAME_PREFIX = "transport_worker"; public static final Setting> HOST = listSetting("transport.host", emptyList(), Function.identity(), Setting.Property.NodeScope); diff --git a/server/src/main/java/org/elasticsearch/transport/Transports.java b/server/src/main/java/org/elasticsearch/transport/Transports.java index b4579374d9e..c531d33224a 100644 --- a/server/src/main/java/org/elasticsearch/transport/Transports.java +++ b/server/src/main/java/org/elasticsearch/transport/Transports.java @@ -29,8 +29,6 @@ public enum Transports { /** threads whose name is prefixed by this string will be considered network threads, even though they aren't */ public static final String TEST_MOCK_TRANSPORT_THREAD_PREFIX = "__mock_network_thread"; - public static final String NIO_TRANSPORT_WORKER_THREAD_NAME_PREFIX = "es_nio_transport_worker"; - /** * Utility method to detect whether a thread is a network thread. Typically * used in assertions to make sure that we do not call blocking code from @@ -41,10 +39,8 @@ public enum Transports { for (String s : Arrays.asList( HttpServerTransport.HTTP_SERVER_WORKER_THREAD_NAME_PREFIX, HttpServerTransport.HTTP_SERVER_ACCEPTOR_THREAD_NAME_PREFIX, - TcpTransport.TRANSPORT_SERVER_WORKER_THREAD_NAME_PREFIX, - TcpTransport.TRANSPORT_CLIENT_BOSS_THREAD_NAME_PREFIX, - TEST_MOCK_TRANSPORT_THREAD_PREFIX, - NIO_TRANSPORT_WORKER_THREAD_NAME_PREFIX)) { + TcpTransport.TRANSPORT_WORKER_THREAD_NAME_PREFIX, + TEST_MOCK_TRANSPORT_THREAD_PREFIX)) { if (threadName.contains(s)) { return true; } diff --git a/test/framework/src/main/java/org/elasticsearch/transport/nio/MockNioTransport.java b/test/framework/src/main/java/org/elasticsearch/transport/nio/MockNioTransport.java index b36685d5645..dc08fbf257d 100644 --- a/test/framework/src/main/java/org/elasticsearch/transport/nio/MockNioTransport.java +++ b/test/framework/src/main/java/org/elasticsearch/transport/nio/MockNioTransport.java @@ -46,7 +46,6 @@ import org.elasticsearch.transport.TcpChannel; import org.elasticsearch.transport.TcpServerChannel; import org.elasticsearch.transport.TcpTransport; import org.elasticsearch.transport.TransportRequestOptions; -import org.elasticsearch.transport.Transports; import java.io.IOException; import java.net.InetSocketAddress; @@ -65,8 +64,6 @@ import static org.elasticsearch.common.util.concurrent.EsExecutors.daemonThreadF public class MockNioTransport extends TcpTransport { - private static final String TRANSPORT_WORKER_THREAD_NAME_PREFIX = Transports.NIO_TRANSPORT_WORKER_THREAD_NAME_PREFIX; - private final PageCacheRecycler pageCacheRecycler; private final ConcurrentMap profileToChannelFactory = newConcurrentMap(); private volatile NioGroup nioGroup; @@ -97,7 +94,7 @@ public class MockNioTransport extends TcpTransport { protected void doStart() { boolean success = false; try { - nioGroup = new NioGroup(daemonThreadFactory(this.settings, TRANSPORT_WORKER_THREAD_NAME_PREFIX), 2, + nioGroup = new NioGroup(daemonThreadFactory(this.settings, TcpTransport.TRANSPORT_WORKER_THREAD_NAME_PREFIX), 2, (s) -> new TestingSocketEventHandler(this::onNonChannelException, s)); ProfileSettings clientProfileSettings = new ProfileSettings(settings, "default");