Use single netty event loop group for transports (#35181)

Currently we create a new netty event loop group for client connections
and all server profiles. Each new group creates new threads for io
processing. This means 2 * num of processors new threads for each group.
A single group should be able to handle all io processing (for the
transports). This also brings the netty module inline with what we do
for nio.

Additionally, this PR renames the worker threads to be the same for
netty and nio.
This commit is contained in:
Tim Brooks 2018-11-02 16:31:19 -06:00 committed by GitHub
parent 3c36ba1f5e
commit 0166388d74
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 21 additions and 47 deletions

View File

@ -37,12 +37,10 @@ import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.util.AttributeKey; import io.netty.util.AttributeKey;
import io.netty.util.concurrent.Future; import io.netty.util.concurrent.Future;
import org.apache.logging.log4j.message.ParameterizedMessage; import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.logging.log4j.util.Supplier;
import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.SuppressForbidden; import org.elasticsearch.common.SuppressForbidden;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.lease.Releasables; import org.elasticsearch.common.lease.Releasables;
import org.elasticsearch.common.network.NetworkService; import org.elasticsearch.common.network.NetworkService;
@ -59,8 +57,6 @@ import org.elasticsearch.transport.TcpTransport;
import java.io.IOException; import java.io.IOException;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
@ -101,8 +97,9 @@ public class Netty4Transport extends TcpTransport {
private final int workerCount; private final int workerCount;
private final ByteSizeValue receivePredictorMin; private final ByteSizeValue receivePredictorMin;
private final ByteSizeValue receivePredictorMax; private final ByteSizeValue receivePredictorMax;
private volatile Bootstrap clientBootstrap;
private final Map<String, ServerBootstrap> serverBootstraps = newConcurrentMap(); private final Map<String, ServerBootstrap> serverBootstraps = newConcurrentMap();
private volatile Bootstrap clientBootstrap;
private volatile NioEventLoopGroup eventLoopGroup;
public Netty4Transport(Settings settings, ThreadPool threadPool, NetworkService networkService, BigArrays bigArrays, public Netty4Transport(Settings settings, ThreadPool threadPool, NetworkService networkService, BigArrays bigArrays,
NamedWriteableRegistry namedWriteableRegistry, CircuitBreakerService circuitBreakerService) { NamedWriteableRegistry namedWriteableRegistry, CircuitBreakerService circuitBreakerService) {
@ -125,10 +122,12 @@ public class Netty4Transport extends TcpTransport {
protected void doStart() { protected void doStart() {
boolean success = false; boolean success = false;
try { try {
clientBootstrap = createClientBootstrap(); ThreadFactory threadFactory = daemonThreadFactory(settings, TRANSPORT_WORKER_THREAD_NAME_PREFIX);
eventLoopGroup = new NioEventLoopGroup(workerCount, threadFactory);
clientBootstrap = createClientBootstrap(eventLoopGroup);
if (NetworkService.NETWORK_SERVER.get(settings)) { if (NetworkService.NETWORK_SERVER.get(settings)) {
for (ProfileSettings profileSettings : profileSettings) { for (ProfileSettings profileSettings : profileSettings) {
createServerBootstrap(profileSettings); createServerBootstrap(profileSettings, eventLoopGroup);
bindServer(profileSettings); bindServer(profileSettings);
} }
} }
@ -141,9 +140,9 @@ public class Netty4Transport extends TcpTransport {
} }
} }
private Bootstrap createClientBootstrap() { private Bootstrap createClientBootstrap(NioEventLoopGroup eventLoopGroup) {
final Bootstrap bootstrap = new Bootstrap(); final Bootstrap bootstrap = new Bootstrap();
bootstrap.group(new NioEventLoopGroup(workerCount, daemonThreadFactory(settings, TRANSPORT_CLIENT_BOSS_THREAD_NAME_PREFIX))); bootstrap.group(eventLoopGroup);
bootstrap.channel(NioSocketChannel.class); bootstrap.channel(NioSocketChannel.class);
bootstrap.option(ChannelOption.TCP_NODELAY, TCP_NO_DELAY.get(settings)); bootstrap.option(ChannelOption.TCP_NODELAY, TCP_NO_DELAY.get(settings));
@ -167,7 +166,7 @@ public class Netty4Transport extends TcpTransport {
return bootstrap; return bootstrap;
} }
private void createServerBootstrap(ProfileSettings profileSettings) { private void createServerBootstrap(ProfileSettings profileSettings, NioEventLoopGroup eventLoopGroup) {
String name = profileSettings.profileName; String name = profileSettings.profileName;
if (logger.isDebugEnabled()) { if (logger.isDebugEnabled()) {
logger.debug("using profile[{}], worker_count[{}], port[{}], bind_host[{}], publish_host[{}], compress[{}], " logger.debug("using profile[{}], worker_count[{}], port[{}], bind_host[{}], publish_host[{}], compress[{}], "
@ -176,12 +175,9 @@ public class Netty4Transport extends TcpTransport {
receivePredictorMin, receivePredictorMax); receivePredictorMin, receivePredictorMax);
} }
final ThreadFactory workerFactory = daemonThreadFactory(this.settings, TRANSPORT_SERVER_WORKER_THREAD_NAME_PREFIX, name);
final ServerBootstrap serverBootstrap = new ServerBootstrap(); final ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(new NioEventLoopGroup(workerCount, workerFactory)); serverBootstrap.group(eventLoopGroup);
serverBootstrap.channel(NioServerSocketChannel.class); serverBootstrap.channel(NioServerSocketChannel.class);
serverBootstrap.childHandler(getServerChannelInitializer(name)); serverBootstrap.childHandler(getServerChannelInitializer(name));
@ -274,25 +270,14 @@ public class Netty4Transport extends TcpTransport {
@SuppressForbidden(reason = "debug") @SuppressForbidden(reason = "debug")
protected void stopInternal() { protected void stopInternal() {
Releasables.close(() -> { Releasables.close(() -> {
final List<Tuple<String, Future<?>>> serverBootstrapCloseFutures = new ArrayList<>(serverBootstraps.size()); Future<?> shutdownFuture = eventLoopGroup.shutdownGracefully(0, 5, TimeUnit.SECONDS);
for (final Map.Entry<String, ServerBootstrap> entry : serverBootstraps.entrySet()) { shutdownFuture.awaitUninterruptibly();
serverBootstrapCloseFutures.add( if (shutdownFuture.isSuccess() == false) {
Tuple.tuple(entry.getKey(), entry.getValue().config().group().shutdownGracefully(0, 5, TimeUnit.SECONDS))); logger.warn("Error closing netty event loop group", shutdownFuture.cause());
} }
for (final Tuple<String, Future<?>> future : serverBootstrapCloseFutures) {
future.v2().awaitUninterruptibly();
if (!future.v2().isSuccess()) {
logger.debug(
(Supplier<?>) () -> new ParameterizedMessage(
"Error closing server bootstrap for profile [{}]", future.v1()), future.v2().cause());
}
}
serverBootstraps.clear();
if (clientBootstrap != null) { serverBootstraps.clear();
clientBootstrap.config().group().shutdownGracefully(0, 5, TimeUnit.SECONDS).awaitUninterruptibly(); clientBootstrap = null;
clientBootstrap = null;
}
}); });
} }

View File

@ -41,7 +41,6 @@ import org.elasticsearch.nio.NioSocketChannel;
import org.elasticsearch.nio.ServerChannelContext; import org.elasticsearch.nio.ServerChannelContext;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TcpTransport; import org.elasticsearch.transport.TcpTransport;
import org.elasticsearch.transport.Transports;
import java.io.IOException; import java.io.IOException;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
@ -57,8 +56,6 @@ import static org.elasticsearch.common.util.concurrent.EsExecutors.daemonThreadF
public class NioTransport extends TcpTransport { public class NioTransport extends TcpTransport {
private static final String TRANSPORT_WORKER_THREAD_NAME_PREFIX = Transports.NIO_TRANSPORT_WORKER_THREAD_NAME_PREFIX;
public static final Setting<Integer> NIO_WORKER_COUNT = public static final Setting<Integer> NIO_WORKER_COUNT =
new Setting<>("transport.nio.worker_count", new Setting<>("transport.nio.worker_count",
(s) -> Integer.toString(EsExecutors.numberOfProcessors(s) * 2), (s) -> Integer.toString(EsExecutors.numberOfProcessors(s) * 2),
@ -94,7 +91,7 @@ public class NioTransport extends TcpTransport {
protected void doStart() { protected void doStart() {
boolean success = false; boolean success = false;
try { try {
nioGroup = new NioGroup(daemonThreadFactory(this.settings, TRANSPORT_WORKER_THREAD_NAME_PREFIX), nioGroup = new NioGroup(daemonThreadFactory(this.settings, TcpTransport.TRANSPORT_WORKER_THREAD_NAME_PREFIX),
NioTransport.NIO_WORKER_COUNT.get(settings), (s) -> new EventHandler(this::onNonChannelException, s)); NioTransport.NIO_WORKER_COUNT.get(settings), (s) -> new EventHandler(this::onNonChannelException, s));
ProfileSettings clientProfileSettings = new ProfileSettings(settings, "default"); ProfileSettings clientProfileSettings = new ProfileSettings(settings, "default");

View File

@ -118,8 +118,7 @@ import static org.elasticsearch.common.util.concurrent.ConcurrentCollections.new
public abstract class TcpTransport extends AbstractLifecycleComponent implements Transport { public abstract class TcpTransport extends AbstractLifecycleComponent implements Transport {
public static final String TRANSPORT_SERVER_WORKER_THREAD_NAME_PREFIX = "transport_server_worker"; public static final String TRANSPORT_WORKER_THREAD_NAME_PREFIX = "transport_worker";
public static final String TRANSPORT_CLIENT_BOSS_THREAD_NAME_PREFIX = "transport_client_boss";
public static final Setting<List<String>> HOST = public static final Setting<List<String>> HOST =
listSetting("transport.host", emptyList(), Function.identity(), Setting.Property.NodeScope); listSetting("transport.host", emptyList(), Function.identity(), Setting.Property.NodeScope);

View File

@ -29,8 +29,6 @@ public enum Transports {
/** threads whose name is prefixed by this string will be considered network threads, even though they aren't */ /** threads whose name is prefixed by this string will be considered network threads, even though they aren't */
public static final String TEST_MOCK_TRANSPORT_THREAD_PREFIX = "__mock_network_thread"; public static final String TEST_MOCK_TRANSPORT_THREAD_PREFIX = "__mock_network_thread";
public static final String NIO_TRANSPORT_WORKER_THREAD_NAME_PREFIX = "es_nio_transport_worker";
/** /**
* Utility method to detect whether a thread is a network thread. Typically * Utility method to detect whether a thread is a network thread. Typically
* used in assertions to make sure that we do not call blocking code from * used in assertions to make sure that we do not call blocking code from
@ -41,10 +39,8 @@ public enum Transports {
for (String s : Arrays.asList( for (String s : Arrays.asList(
HttpServerTransport.HTTP_SERVER_WORKER_THREAD_NAME_PREFIX, HttpServerTransport.HTTP_SERVER_WORKER_THREAD_NAME_PREFIX,
HttpServerTransport.HTTP_SERVER_ACCEPTOR_THREAD_NAME_PREFIX, HttpServerTransport.HTTP_SERVER_ACCEPTOR_THREAD_NAME_PREFIX,
TcpTransport.TRANSPORT_SERVER_WORKER_THREAD_NAME_PREFIX, TcpTransport.TRANSPORT_WORKER_THREAD_NAME_PREFIX,
TcpTransport.TRANSPORT_CLIENT_BOSS_THREAD_NAME_PREFIX, TEST_MOCK_TRANSPORT_THREAD_PREFIX)) {
TEST_MOCK_TRANSPORT_THREAD_PREFIX,
NIO_TRANSPORT_WORKER_THREAD_NAME_PREFIX)) {
if (threadName.contains(s)) { if (threadName.contains(s)) {
return true; return true;
} }

View File

@ -46,7 +46,6 @@ import org.elasticsearch.transport.TcpChannel;
import org.elasticsearch.transport.TcpServerChannel; import org.elasticsearch.transport.TcpServerChannel;
import org.elasticsearch.transport.TcpTransport; import org.elasticsearch.transport.TcpTransport;
import org.elasticsearch.transport.TransportRequestOptions; import org.elasticsearch.transport.TransportRequestOptions;
import org.elasticsearch.transport.Transports;
import java.io.IOException; import java.io.IOException;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
@ -65,8 +64,6 @@ import static org.elasticsearch.common.util.concurrent.EsExecutors.daemonThreadF
public class MockNioTransport extends TcpTransport { public class MockNioTransport extends TcpTransport {
private static final String TRANSPORT_WORKER_THREAD_NAME_PREFIX = Transports.NIO_TRANSPORT_WORKER_THREAD_NAME_PREFIX;
private final PageCacheRecycler pageCacheRecycler; private final PageCacheRecycler pageCacheRecycler;
private final ConcurrentMap<String, MockTcpChannelFactory> profileToChannelFactory = newConcurrentMap(); private final ConcurrentMap<String, MockTcpChannelFactory> profileToChannelFactory = newConcurrentMap();
private volatile NioGroup nioGroup; private volatile NioGroup nioGroup;
@ -97,7 +94,7 @@ public class MockNioTransport extends TcpTransport {
protected void doStart() { protected void doStart() {
boolean success = false; boolean success = false;
try { try {
nioGroup = new NioGroup(daemonThreadFactory(this.settings, TRANSPORT_WORKER_THREAD_NAME_PREFIX), 2, nioGroup = new NioGroup(daemonThreadFactory(this.settings, TcpTransport.TRANSPORT_WORKER_THREAD_NAME_PREFIX), 2,
(s) -> new TestingSocketEventHandler(this::onNonChannelException, s)); (s) -> new TestingSocketEventHandler(this::onNonChannelException, s));
ProfileSettings clientProfileSettings = new ProfileSettings(settings, "default"); ProfileSettings clientProfileSettings = new ProfileSettings(settings, "default");