Share netty event loops between transports (#56553)

Currently Elasticsearch creates independent event loop groups for each
transport (http and internal) transport type. This is unnecessary and
can lead to contention when different threads access shared resources
(ex: allocators). This commit moves to a model where, by default, the
event loops are shared between the transports. The previous behavior can
be attained by specifically setting the http worker count.
This commit is contained in:
Tim Brooks 2020-05-11 15:43:43 -06:00 committed by GitHub
parent e0e7b89499
commit 760ab726c2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
18 changed files with 342 additions and 76 deletions

View File

@ -29,7 +29,6 @@ import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption; import io.netty.channel.ChannelOption;
import io.netty.channel.FixedRecvByteBufAllocator; import io.netty.channel.FixedRecvByteBufAllocator;
import io.netty.channel.RecvByteBufAllocator; import io.netty.channel.RecvByteBufAllocator;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioChannelOption; import io.netty.channel.socket.nio.NioChannelOption;
import io.netty.handler.codec.ByteToMessageDecoder; import io.netty.handler.codec.ByteToMessageDecoder;
import io.netty.handler.codec.http.HttpContentCompressor; import io.netty.handler.codec.http.HttpContentCompressor;
@ -62,6 +61,7 @@ import org.elasticsearch.http.HttpReadTimeoutException;
import org.elasticsearch.http.HttpServerChannel; import org.elasticsearch.http.HttpServerChannel;
import org.elasticsearch.http.netty4.cors.Netty4CorsHandler; import org.elasticsearch.http.netty4.cors.Netty4CorsHandler;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.SharedGroupFactory;
import org.elasticsearch.transport.NettyAllocator; import org.elasticsearch.transport.NettyAllocator;
import org.elasticsearch.transport.netty4.Netty4Utils; import org.elasticsearch.transport.netty4.Netty4Utils;
@ -69,7 +69,6 @@ import java.net.InetSocketAddress;
import java.net.SocketOption; import java.net.SocketOption;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import static org.elasticsearch.common.util.concurrent.EsExecutors.daemonThreadFactory;
import static org.elasticsearch.http.HttpTransportSettings.SETTING_HTTP_MAX_CHUNK_SIZE; import static org.elasticsearch.http.HttpTransportSettings.SETTING_HTTP_MAX_CHUNK_SIZE;
import static org.elasticsearch.http.HttpTransportSettings.SETTING_HTTP_MAX_CONTENT_LENGTH; import static org.elasticsearch.http.HttpTransportSettings.SETTING_HTTP_MAX_CONTENT_LENGTH;
import static org.elasticsearch.http.HttpTransportSettings.SETTING_HTTP_MAX_HEADER_SIZE; import static org.elasticsearch.http.HttpTransportSettings.SETTING_HTTP_MAX_HEADER_SIZE;
@ -126,9 +125,7 @@ public class Netty4HttpServerTransport extends AbstractHttpServerTransport {
// Netty's CompositeByteBuf implementation does not allow less than two components. // Netty's CompositeByteBuf implementation does not allow less than two components.
}, s -> Setting.parseInt(s, 2, Integer.MAX_VALUE, SETTING_KEY_HTTP_NETTY_MAX_COMPOSITE_BUFFER_COMPONENTS), Property.NodeScope); }, s -> Setting.parseInt(s, 2, Integer.MAX_VALUE, SETTING_KEY_HTTP_NETTY_MAX_COMPOSITE_BUFFER_COMPONENTS), Property.NodeScope);
public static final Setting<Integer> SETTING_HTTP_WORKER_COUNT = new Setting<>("http.netty.worker_count", public static final Setting<Integer> SETTING_HTTP_WORKER_COUNT = Setting.intSetting("http.netty.worker_count", 0, Property.NodeScope);
(s) -> Integer.toString(EsExecutors.allocatedProcessors(s) * 2),
(s) -> Setting.parseInt(s, 1, "http.netty.worker_count"), Property.NodeScope);
public static final Setting<ByteSizeValue> SETTING_HTTP_NETTY_RECEIVE_PREDICTOR_SIZE = public static final Setting<ByteSizeValue> SETTING_HTTP_NETTY_RECEIVE_PREDICTOR_SIZE =
Setting.byteSizeSetting("http.netty.receive_predictor_size", new ByteSizeValue(64, ByteSizeUnit.KB), Property.NodeScope); Setting.byteSizeSetting("http.netty.receive_predictor_size", new ByteSizeValue(64, ByteSizeUnit.KB), Property.NodeScope);
@ -137,21 +134,23 @@ public class Netty4HttpServerTransport extends AbstractHttpServerTransport {
private final ByteSizeValue maxHeaderSize; private final ByteSizeValue maxHeaderSize;
private final ByteSizeValue maxChunkSize; private final ByteSizeValue maxChunkSize;
private final int workerCount;
private final int pipeliningMaxEvents; private final int pipeliningMaxEvents;
private final SharedGroupFactory sharedGroupFactory;
private final RecvByteBufAllocator recvByteBufAllocator; private final RecvByteBufAllocator recvByteBufAllocator;
private final int readTimeoutMillis; private final int readTimeoutMillis;
private final int maxCompositeBufferComponents; private final int maxCompositeBufferComponents;
private volatile ServerBootstrap serverBootstrap; private volatile ServerBootstrap serverBootstrap;
private volatile SharedGroupFactory.SharedGroup sharedGroup;
public Netty4HttpServerTransport(Settings settings, NetworkService networkService, BigArrays bigArrays, ThreadPool threadPool, public Netty4HttpServerTransport(Settings settings, NetworkService networkService, BigArrays bigArrays, ThreadPool threadPool,
NamedXContentRegistry xContentRegistry, Dispatcher dispatcher, ClusterSettings clusterSettings) { NamedXContentRegistry xContentRegistry, Dispatcher dispatcher, ClusterSettings clusterSettings,
SharedGroupFactory sharedGroupFactory) {
super(settings, networkService, bigArrays, threadPool, xContentRegistry, dispatcher, clusterSettings); super(settings, networkService, bigArrays, threadPool, xContentRegistry, dispatcher, clusterSettings);
Netty4Utils.setAvailableProcessors(EsExecutors.NODE_PROCESSORS_SETTING.get(settings)); Netty4Utils.setAvailableProcessors(EsExecutors.NODE_PROCESSORS_SETTING.get(settings));
this.sharedGroupFactory = sharedGroupFactory;
this.maxChunkSize = SETTING_HTTP_MAX_CHUNK_SIZE.get(settings); this.maxChunkSize = SETTING_HTTP_MAX_CHUNK_SIZE.get(settings);
this.maxHeaderSize = SETTING_HTTP_MAX_HEADER_SIZE.get(settings); this.maxHeaderSize = SETTING_HTTP_MAX_HEADER_SIZE.get(settings);
@ -159,7 +158,6 @@ public class Netty4HttpServerTransport extends AbstractHttpServerTransport {
this.pipeliningMaxEvents = SETTING_PIPELINING_MAX_EVENTS.get(settings); this.pipeliningMaxEvents = SETTING_PIPELINING_MAX_EVENTS.get(settings);
this.maxCompositeBufferComponents = SETTING_HTTP_NETTY_MAX_COMPOSITE_BUFFER_COMPONENTS.get(settings); this.maxCompositeBufferComponents = SETTING_HTTP_NETTY_MAX_COMPOSITE_BUFFER_COMPONENTS.get(settings);
this.workerCount = SETTING_HTTP_WORKER_COUNT.get(settings);
this.readTimeoutMillis = Math.toIntExact(SETTING_HTTP_READ_TIMEOUT.get(settings).getMillis()); this.readTimeoutMillis = Math.toIntExact(SETTING_HTTP_READ_TIMEOUT.get(settings).getMillis());
@ -180,10 +178,10 @@ public class Netty4HttpServerTransport extends AbstractHttpServerTransport {
protected void doStart() { protected void doStart() {
boolean success = false; boolean success = false;
try { try {
sharedGroup = sharedGroupFactory.getHttpGroup();
serverBootstrap = new ServerBootstrap(); serverBootstrap = new ServerBootstrap();
serverBootstrap.group(new NioEventLoopGroup(workerCount, daemonThreadFactory(settings, serverBootstrap.group(sharedGroup.getLowLevelGroup());
HTTP_SERVER_WORKER_THREAD_NAME_PREFIX)));
// NettyAllocator will return the channel type designed to work with the configuredAllocator // NettyAllocator will return the channel type designed to work with the configuredAllocator
serverBootstrap.channel(NettyAllocator.getServerChannelType()); serverBootstrap.channel(NettyAllocator.getServerChannelType());
@ -260,9 +258,9 @@ public class Netty4HttpServerTransport extends AbstractHttpServerTransport {
@Override @Override
protected void stopInternal() { protected void stopInternal() {
if (serverBootstrap != null) { if (sharedGroup != null) {
serverBootstrap.config().group().shutdownGracefully(0, 5, TimeUnit.SECONDS).awaitUninterruptibly(); sharedGroup.shutdown();
serverBootstrap = null; sharedGroup = null;
} }
} }

View File

@ -19,6 +19,7 @@
package org.elasticsearch.transport; package org.elasticsearch.transport;
import org.apache.lucene.util.SetOnce;
import org.elasticsearch.Version; import org.elasticsearch.Version;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.network.NetworkModule; import org.elasticsearch.common.network.NetworkModule;
@ -48,6 +49,8 @@ public class Netty4Plugin extends Plugin implements NetworkPlugin {
public static final String NETTY_TRANSPORT_NAME = "netty4"; public static final String NETTY_TRANSPORT_NAME = "netty4";
public static final String NETTY_HTTP_TRANSPORT_NAME = "netty4"; public static final String NETTY_HTTP_TRANSPORT_NAME = "netty4";
private final SetOnce<SharedGroupFactory> groupFactory = new SetOnce<>();
@Override @Override
public List<Setting<?>> getSettings() { public List<Setting<?>> getSettings() {
return Arrays.asList( return Arrays.asList(
@ -77,7 +80,7 @@ public class Netty4Plugin extends Plugin implements NetworkPlugin {
CircuitBreakerService circuitBreakerService, CircuitBreakerService circuitBreakerService,
NamedWriteableRegistry namedWriteableRegistry, NetworkService networkService) { NamedWriteableRegistry namedWriteableRegistry, NetworkService networkService) {
return Collections.singletonMap(NETTY_TRANSPORT_NAME, () -> new Netty4Transport(settings, Version.CURRENT, threadPool, return Collections.singletonMap(NETTY_TRANSPORT_NAME, () -> new Netty4Transport(settings, Version.CURRENT, threadPool,
networkService, pageCacheRecycler, namedWriteableRegistry, circuitBreakerService)); networkService, pageCacheRecycler, namedWriteableRegistry, circuitBreakerService, getSharedGroupFactory(settings)));
} }
@Override @Override
@ -90,6 +93,17 @@ public class Netty4Plugin extends Plugin implements NetworkPlugin {
ClusterSettings clusterSettings) { ClusterSettings clusterSettings) {
return Collections.singletonMap(NETTY_HTTP_TRANSPORT_NAME, return Collections.singletonMap(NETTY_HTTP_TRANSPORT_NAME,
() -> new Netty4HttpServerTransport(settings, networkService, bigArrays, threadPool, xContentRegistry, dispatcher, () -> new Netty4HttpServerTransport(settings, networkService, bigArrays, threadPool, xContentRegistry, dispatcher,
clusterSettings)); clusterSettings, getSharedGroupFactory(settings)));
}
private SharedGroupFactory getSharedGroupFactory(Settings settings) {
SharedGroupFactory groupFactory = this.groupFactory.get();
if (groupFactory != null) {
assert groupFactory.getSettings().equals(settings) : "Different settings than originally provided";
return groupFactory;
} else {
this.groupFactory.set(new SharedGroupFactory(settings));
return this.groupFactory.get();
}
} }
} }

View File

@ -0,0 +1,141 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.transport;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.util.concurrent.Future;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.AbstractRefCounted;
import org.elasticsearch.http.HttpServerTransport;
import org.elasticsearch.http.netty4.Netty4HttpServerTransport;
import org.elasticsearch.transport.netty4.Netty4Transport;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import static org.elasticsearch.common.util.concurrent.EsExecutors.daemonThreadFactory;
/**
* Creates and returns {@link io.netty.channel.EventLoopGroup} instances. It will return a shared group for
* both {@link #getHttpGroup()} and {@link #getTransportGroup()} if
* {@link org.elasticsearch.http.netty4.Netty4HttpServerTransport#SETTING_HTTP_WORKER_COUNT} is configured to be 0.
* If that setting is not 0, then it will return a different group in the {@link #getHttpGroup()} call.
*/
public final class SharedGroupFactory {
private static final Logger logger = LogManager.getLogger(SharedGroupFactory.class);
private final Settings settings;
private final int workerCount;
private final int httpWorkerCount;
private RefCountedGroup genericGroup;
private SharedGroup dedicatedHttpGroup;
public SharedGroupFactory(Settings settings) {
this.settings = settings;
this.workerCount = Netty4Transport.WORKER_COUNT.get(settings);
this.httpWorkerCount = Netty4HttpServerTransport.SETTING_HTTP_WORKER_COUNT.get(settings);
}
public Settings getSettings() {
return settings;
}
public int getTransportWorkerCount() {
return workerCount;
}
public synchronized SharedGroup getTransportGroup() {
return getGenericGroup();
}
public synchronized SharedGroup getHttpGroup() {
if (httpWorkerCount == 0) {
return getGenericGroup();
} else {
if (dedicatedHttpGroup == null) {
NioEventLoopGroup eventLoopGroup = new NioEventLoopGroup(httpWorkerCount,
daemonThreadFactory(settings, HttpServerTransport.HTTP_SERVER_WORKER_THREAD_NAME_PREFIX));
dedicatedHttpGroup = new SharedGroup(new RefCountedGroup(eventLoopGroup));
}
return dedicatedHttpGroup;
}
}
private SharedGroup getGenericGroup() {
if (genericGroup == null) {
EventLoopGroup eventLoopGroup = new NioEventLoopGroup(workerCount,
daemonThreadFactory(settings, TcpTransport.TRANSPORT_WORKER_THREAD_NAME_PREFIX));
this.genericGroup = new RefCountedGroup(eventLoopGroup);
} else {
genericGroup.incRef();
}
return new SharedGroup(genericGroup);
}
private static class RefCountedGroup extends AbstractRefCounted {
public static final String NAME = "ref-counted-event-loop-group";
private final EventLoopGroup eventLoopGroup;
private RefCountedGroup(EventLoopGroup eventLoopGroup) {
super(NAME);
this.eventLoopGroup = eventLoopGroup;
}
@Override
protected void closeInternal() {
Future<?> shutdownFuture = eventLoopGroup.shutdownGracefully(0, 5, TimeUnit.SECONDS);
shutdownFuture.awaitUninterruptibly();
if (shutdownFuture.isSuccess() == false) {
logger.warn("Error closing netty event loop group", shutdownFuture.cause());
}
}
}
/**
* Wraps the {@link RefCountedGroup}. Calls {@link RefCountedGroup#decRef()} on close. After close,
* this wrapped instance can no longer be used.
*/
public static class SharedGroup {
private final RefCountedGroup refCountedGroup;
private final AtomicBoolean isOpen = new AtomicBoolean(true);
private SharedGroup(RefCountedGroup refCountedGroup) {
this.refCountedGroup = refCountedGroup;
}
public EventLoopGroup getLowLevelGroup() {
return refCountedGroup.eventLoopGroup;
}
public void shutdown() {
if (isOpen.compareAndSet(true, false)) {
refCountedGroup.decRef();
}
}
}
}

View File

@ -16,7 +16,6 @@
* specific language governing permissions and limitations * specific language governing permissions and limitations
* under the License. * under the License.
*/ */
package org.elasticsearch.transport.netty4; package org.elasticsearch.transport.netty4;
import io.netty.bootstrap.Bootstrap; import io.netty.bootstrap.Bootstrap;
@ -31,10 +30,8 @@ import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption; import io.netty.channel.ChannelOption;
import io.netty.channel.FixedRecvByteBufAllocator; import io.netty.channel.FixedRecvByteBufAllocator;
import io.netty.channel.RecvByteBufAllocator; import io.netty.channel.RecvByteBufAllocator;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioChannelOption; import io.netty.channel.socket.nio.NioChannelOption;
import io.netty.util.AttributeKey; import io.netty.util.AttributeKey;
import io.netty.util.concurrent.Future;
import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage; import org.apache.logging.log4j.message.ParameterizedMessage;
@ -56,6 +53,7 @@ import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.core.internal.net.NetUtils; import org.elasticsearch.core.internal.net.NetUtils;
import org.elasticsearch.indices.breaker.CircuitBreakerService; import org.elasticsearch.indices.breaker.CircuitBreakerService;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.SharedGroupFactory;
import org.elasticsearch.transport.NettyAllocator; import org.elasticsearch.transport.NettyAllocator;
import org.elasticsearch.transport.TcpTransport; import org.elasticsearch.transport.TcpTransport;
import org.elasticsearch.transport.TransportSettings; import org.elasticsearch.transport.TransportSettings;
@ -64,13 +62,10 @@ import java.io.IOException;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.net.SocketOption; import java.net.SocketOption;
import java.util.Map; import java.util.Map;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import static org.elasticsearch.common.settings.Setting.byteSizeSetting; import static org.elasticsearch.common.settings.Setting.byteSizeSetting;
import static org.elasticsearch.common.settings.Setting.intSetting; import static org.elasticsearch.common.settings.Setting.intSetting;
import static org.elasticsearch.common.util.concurrent.ConcurrentCollections.newConcurrentMap; import static org.elasticsearch.common.util.concurrent.ConcurrentCollections.newConcurrentMap;
import static org.elasticsearch.common.util.concurrent.EsExecutors.daemonThreadFactory;
/** /**
* There are 4 types of connections per node, low/med/high/ping. Low if for batch oriented APIs (like recovery or * There are 4 types of connections per node, low/med/high/ping. Low if for batch oriented APIs (like recovery or
@ -96,20 +91,20 @@ public class Netty4Transport extends TcpTransport {
intSetting("transport.netty.boss_count", 1, 1, Property.NodeScope); intSetting("transport.netty.boss_count", 1, 1, Property.NodeScope);
private final SharedGroupFactory sharedGroupFactory;
private final RecvByteBufAllocator recvByteBufAllocator; private final RecvByteBufAllocator recvByteBufAllocator;
private final int workerCount;
private final ByteSizeValue receivePredictorMin; private final ByteSizeValue receivePredictorMin;
private final ByteSizeValue receivePredictorMax; private final ByteSizeValue receivePredictorMax;
private final Map<String, ServerBootstrap> serverBootstraps = newConcurrentMap(); private final Map<String, ServerBootstrap> serverBootstraps = newConcurrentMap();
private volatile Bootstrap clientBootstrap; private volatile Bootstrap clientBootstrap;
private volatile NioEventLoopGroup eventLoopGroup; private volatile SharedGroupFactory.SharedGroup sharedGroup;
public Netty4Transport(Settings settings, Version version, ThreadPool threadPool, NetworkService networkService, public Netty4Transport(Settings settings, Version version, ThreadPool threadPool, NetworkService networkService,
PageCacheRecycler pageCacheRecycler, NamedWriteableRegistry namedWriteableRegistry, PageCacheRecycler pageCacheRecycler, NamedWriteableRegistry namedWriteableRegistry,
CircuitBreakerService circuitBreakerService) { CircuitBreakerService circuitBreakerService, SharedGroupFactory sharedGroupFactory) {
super(settings, version, threadPool, pageCacheRecycler, circuitBreakerService, namedWriteableRegistry, networkService); super(settings, version, threadPool, pageCacheRecycler, circuitBreakerService, namedWriteableRegistry, networkService);
Netty4Utils.setAvailableProcessors(EsExecutors.NODE_PROCESSORS_SETTING.get(settings)); Netty4Utils.setAvailableProcessors(EsExecutors.NODE_PROCESSORS_SETTING.get(settings));
this.workerCount = WORKER_COUNT.get(settings); this.sharedGroupFactory = sharedGroupFactory;
// See AdaptiveReceiveBufferSizePredictor#DEFAULT_XXX for default values in netty..., we can use higher ones for us, even fixed one // See AdaptiveReceiveBufferSizePredictor#DEFAULT_XXX for default values in netty..., we can use higher ones for us, even fixed one
this.receivePredictorMin = NETTY_RECEIVE_PREDICTOR_MIN.get(settings); this.receivePredictorMin = NETTY_RECEIVE_PREDICTOR_MIN.get(settings);
@ -126,12 +121,11 @@ public class Netty4Transport extends TcpTransport {
protected void doStart() { protected void doStart() {
boolean success = false; boolean success = false;
try { try {
ThreadFactory threadFactory = daemonThreadFactory(settings, TRANSPORT_WORKER_THREAD_NAME_PREFIX); sharedGroup = sharedGroupFactory.getTransportGroup();
eventLoopGroup = new NioEventLoopGroup(workerCount, threadFactory); clientBootstrap = createClientBootstrap(sharedGroup);
clientBootstrap = createClientBootstrap(eventLoopGroup);
if (NetworkService.NETWORK_SERVER.get(settings)) { if (NetworkService.NETWORK_SERVER.get(settings)) {
for (ProfileSettings profileSettings : profileSettings) { for (ProfileSettings profileSettings : profileSettings) {
createServerBootstrap(profileSettings, eventLoopGroup); createServerBootstrap(profileSettings, sharedGroup);
bindServer(profileSettings); bindServer(profileSettings);
} }
} }
@ -144,9 +138,9 @@ public class Netty4Transport extends TcpTransport {
} }
} }
private Bootstrap createClientBootstrap(NioEventLoopGroup eventLoopGroup) { private Bootstrap createClientBootstrap(SharedGroupFactory.SharedGroup sharedGroup) {
final Bootstrap bootstrap = new Bootstrap(); final Bootstrap bootstrap = new Bootstrap();
bootstrap.group(eventLoopGroup); bootstrap.group(sharedGroup.getLowLevelGroup());
// NettyAllocator will return the channel type designed to work with the configured allocator // NettyAllocator will return the channel type designed to work with the configured allocator
bootstrap.channel(NettyAllocator.getChannelType()); bootstrap.channel(NettyAllocator.getChannelType());
@ -196,17 +190,17 @@ public class Netty4Transport extends TcpTransport {
return bootstrap; return bootstrap;
} }
private void createServerBootstrap(ProfileSettings profileSettings, NioEventLoopGroup eventLoopGroup) { private void createServerBootstrap(ProfileSettings profileSettings, SharedGroupFactory.SharedGroup sharedGroup) {
String name = profileSettings.profileName; String name = profileSettings.profileName;
if (logger.isDebugEnabled()) { if (logger.isDebugEnabled()) {
logger.debug("using profile[{}], worker_count[{}], port[{}], bind_host[{}], publish_host[{}], receive_predictor[{}->{}]", logger.debug("using profile[{}], worker_count[{}], port[{}], bind_host[{}], publish_host[{}], receive_predictor[{}->{}]",
name, workerCount, profileSettings.portOrRange, profileSettings.bindHosts, profileSettings.publishHosts, name, sharedGroupFactory.getTransportWorkerCount(), profileSettings.portOrRange, profileSettings.bindHosts,
receivePredictorMin, receivePredictorMax); profileSettings.publishHosts, receivePredictorMin, receivePredictorMax);
} }
final ServerBootstrap serverBootstrap = new ServerBootstrap(); final ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(eventLoopGroup); serverBootstrap.group(sharedGroup.getLowLevelGroup());
// NettyAllocator will return the channel type designed to work with the configuredAllocator // NettyAllocator will return the channel type designed to work with the configuredAllocator
serverBootstrap.channel(NettyAllocator.getServerChannelType()); serverBootstrap.channel(NettyAllocator.getServerChannelType());
@ -307,12 +301,8 @@ public class Netty4Transport extends TcpTransport {
@SuppressForbidden(reason = "debug") @SuppressForbidden(reason = "debug")
protected void stopInternal() { protected void stopInternal() {
Releasables.close(() -> { Releasables.close(() -> {
if (eventLoopGroup != null) { if (sharedGroup != null) {
Future<?> shutdownFuture = eventLoopGroup.shutdownGracefully(0, 5, TimeUnit.SECONDS); sharedGroup.shutdown();
shutdownFuture.awaitUninterruptibly();
if (shutdownFuture.isSuccess() == false) {
logger.warn("Error closing netty event loop group", shutdownFuture.cause());
}
} }
}, serverBootstraps::clear, () -> clientBootstrap = null); }, serverBootstraps::clear, () -> clientBootstrap = null);
} }

View File

@ -38,6 +38,7 @@ import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.TestThreadPool; import org.elasticsearch.threadpool.TestThreadPool;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.SharedGroupFactory;
import org.junit.After; import org.junit.After;
import org.junit.Before; import org.junit.Before;
@ -87,7 +88,8 @@ public class Netty4BadRequestTests extends ESTestCase {
}; };
try (HttpServerTransport httpServerTransport = new Netty4HttpServerTransport(Settings.EMPTY, networkService, bigArrays, threadPool, try (HttpServerTransport httpServerTransport = new Netty4HttpServerTransport(Settings.EMPTY, networkService, bigArrays, threadPool,
xContentRegistry(), dispatcher, new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS))) { xContentRegistry(), dispatcher, new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS),
new SharedGroupFactory(Settings.EMPTY))) {
httpServerTransport.start(); httpServerTransport.start();
final TransportAddress transportAddress = randomFrom(httpServerTransport.boundAddress().boundAddresses()); final TransportAddress transportAddress = randomFrom(httpServerTransport.boundAddress().boundAddresses());

View File

@ -45,6 +45,7 @@ import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.TestThreadPool; import org.elasticsearch.threadpool.TestThreadPool;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.SharedGroupFactory;
import org.junit.After; import org.junit.After;
import org.junit.Before; import org.junit.Before;
@ -119,7 +120,8 @@ public class Netty4HttpServerPipeliningTests extends ESTestCase {
Netty4HttpServerPipeliningTests.this.networkService, Netty4HttpServerPipeliningTests.this.networkService,
Netty4HttpServerPipeliningTests.this.bigArrays, Netty4HttpServerPipeliningTests.this.bigArrays,
Netty4HttpServerPipeliningTests.this.threadPool, Netty4HttpServerPipeliningTests.this.threadPool,
xContentRegistry(), new NullDispatcher(), new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)); xContentRegistry(), new NullDispatcher(), new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS),
new SharedGroupFactory(settings));
} }
@Override @Override

View File

@ -64,6 +64,7 @@ import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.TestThreadPool; import org.elasticsearch.threadpool.TestThreadPool;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.SharedGroupFactory;
import org.elasticsearch.transport.NettyAllocator; import org.elasticsearch.transport.NettyAllocator;
import org.junit.After; import org.junit.After;
import org.junit.Before; import org.junit.Before;
@ -164,7 +165,7 @@ public class Netty4HttpServerTransportTests extends ESTestCase {
} }
}; };
try (Netty4HttpServerTransport transport = new Netty4HttpServerTransport(settings, networkService, bigArrays, threadPool, try (Netty4HttpServerTransport transport = new Netty4HttpServerTransport(settings, networkService, bigArrays, threadPool,
xContentRegistry(), dispatcher, clusterSettings)) { xContentRegistry(), dispatcher, clusterSettings, new SharedGroupFactory(settings))) {
transport.start(); transport.start();
final TransportAddress remoteAddress = randomFrom(transport.boundAddress().boundAddresses()); final TransportAddress remoteAddress = randomFrom(transport.boundAddress().boundAddresses());
try (Netty4HttpClient client = new Netty4HttpClient()) { try (Netty4HttpClient client = new Netty4HttpClient()) {
@ -197,7 +198,7 @@ public class Netty4HttpServerTransportTests extends ESTestCase {
public void testBindUnavailableAddress() { public void testBindUnavailableAddress() {
try (Netty4HttpServerTransport transport = new Netty4HttpServerTransport(Settings.EMPTY, networkService, bigArrays, threadPool, try (Netty4HttpServerTransport transport = new Netty4HttpServerTransport(Settings.EMPTY, networkService, bigArrays, threadPool,
xContentRegistry(), new NullDispatcher(), clusterSettings)) { xContentRegistry(), new NullDispatcher(), clusterSettings, new SharedGroupFactory(Settings.EMPTY))) {
transport.start(); transport.start();
TransportAddress remoteAddress = randomFrom(transport.boundAddress().boundAddresses()); TransportAddress remoteAddress = randomFrom(transport.boundAddress().boundAddresses());
Settings settings = Settings.builder() Settings settings = Settings.builder()
@ -205,7 +206,7 @@ public class Netty4HttpServerTransportTests extends ESTestCase {
.put("network.host", remoteAddress.getAddress()) .put("network.host", remoteAddress.getAddress())
.build(); .build();
try (Netty4HttpServerTransport otherTransport = new Netty4HttpServerTransport(settings, networkService, bigArrays, threadPool, try (Netty4HttpServerTransport otherTransport = new Netty4HttpServerTransport(settings, networkService, bigArrays, threadPool,
xContentRegistry(), new NullDispatcher(), clusterSettings)) { xContentRegistry(), new NullDispatcher(), clusterSettings, new SharedGroupFactory(settings))) {
BindHttpException bindHttpException = expectThrows(BindHttpException.class, otherTransport::start); BindHttpException bindHttpException = expectThrows(BindHttpException.class, otherTransport::start);
assertEquals( assertEquals(
"Failed to bind to " + NetworkAddress.format(remoteAddress.address()), "Failed to bind to " + NetworkAddress.format(remoteAddress.address()),
@ -249,7 +250,8 @@ public class Netty4HttpServerTransportTests extends ESTestCase {
} }
try (Netty4HttpServerTransport transport = new Netty4HttpServerTransport( try (Netty4HttpServerTransport transport = new Netty4HttpServerTransport(
settings, networkService, bigArrays, threadPool, xContentRegistry(), dispatcher, clusterSettings)) { settings, networkService, bigArrays, threadPool, xContentRegistry(), dispatcher, clusterSettings,
new SharedGroupFactory(settings))) {
transport.start(); transport.start();
final TransportAddress remoteAddress = randomFrom(transport.boundAddress().boundAddresses()); final TransportAddress remoteAddress = randomFrom(transport.boundAddress().boundAddresses());
@ -295,7 +297,8 @@ public class Netty4HttpServerTransportTests extends ESTestCase {
.put(SETTING_CORS_ALLOW_ORIGIN.getKey(), "elastic.co").build(); .put(SETTING_CORS_ALLOW_ORIGIN.getKey(), "elastic.co").build();
try (Netty4HttpServerTransport transport = new Netty4HttpServerTransport(settings, networkService, bigArrays, threadPool, try (Netty4HttpServerTransport transport = new Netty4HttpServerTransport(settings, networkService, bigArrays, threadPool,
xContentRegistry(), dispatcher, new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS))) { xContentRegistry(), dispatcher, new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS),
new SharedGroupFactory(settings))) {
transport.start(); transport.start();
final TransportAddress remoteAddress = randomFrom(transport.boundAddress().boundAddresses()); final TransportAddress remoteAddress = randomFrom(transport.boundAddress().boundAddresses());
@ -354,7 +357,8 @@ public class Netty4HttpServerTransportTests extends ESTestCase {
NioEventLoopGroup group = new NioEventLoopGroup(); NioEventLoopGroup group = new NioEventLoopGroup();
try (Netty4HttpServerTransport transport = new Netty4HttpServerTransport(settings, networkService, bigArrays, threadPool, try (Netty4HttpServerTransport transport = new Netty4HttpServerTransport(settings, networkService, bigArrays, threadPool,
xContentRegistry(), dispatcher, new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS))) { xContentRegistry(), dispatcher, new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS),
new SharedGroupFactory(settings))) {
transport.start(); transport.start();
final TransportAddress remoteAddress = randomFrom(transport.boundAddress().boundAddresses()); final TransportAddress remoteAddress = randomFrom(transport.boundAddress().boundAddresses());

View File

@ -0,0 +1,74 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.transport;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.http.netty4.Netty4HttpServerTransport;
import org.elasticsearch.test.ESTestCase;
public final class SharedGroupFactoryTests extends ESTestCase {
public void testSharedEventLoops() throws Exception {
SharedGroupFactory sharedGroupFactory = new SharedGroupFactory(Settings.EMPTY);
SharedGroupFactory.SharedGroup httpGroup = sharedGroupFactory.getHttpGroup();
SharedGroupFactory.SharedGroup transportGroup = sharedGroupFactory.getTransportGroup();
try {
assertSame(httpGroup.getLowLevelGroup(), transportGroup.getLowLevelGroup());
} finally {
httpGroup.shutdown();
assertFalse(httpGroup.getLowLevelGroup().isShuttingDown());
assertFalse(transportGroup.getLowLevelGroup().isShuttingDown());
assertFalse(transportGroup.getLowLevelGroup().isTerminated());
assertFalse(transportGroup.getLowLevelGroup().terminationFuture().isDone());
transportGroup.shutdown();
assertTrue(httpGroup.getLowLevelGroup().isShuttingDown());
assertTrue(transportGroup.getLowLevelGroup().isShuttingDown());
assertTrue(transportGroup.getLowLevelGroup().isTerminated());
assertTrue(transportGroup.getLowLevelGroup().terminationFuture().isDone());
}
}
public void testNonSharedEventLoops() throws Exception {
Settings settings = Settings.builder()
.put(Netty4HttpServerTransport.SETTING_HTTP_WORKER_COUNT.getKey(), randomIntBetween(1, 10))
.build();
SharedGroupFactory sharedGroupFactory = new SharedGroupFactory(settings);
SharedGroupFactory.SharedGroup httpGroup = sharedGroupFactory.getHttpGroup();
SharedGroupFactory.SharedGroup transportGroup = sharedGroupFactory.getTransportGroup();
try {
assertNotSame(httpGroup.getLowLevelGroup(), transportGroup.getLowLevelGroup());
} finally {
httpGroup.shutdown();
assertTrue(httpGroup.getLowLevelGroup().isShuttingDown());
assertTrue(httpGroup.getLowLevelGroup().isTerminated());
assertTrue(httpGroup.getLowLevelGroup().terminationFuture().isDone());
assertFalse(transportGroup.getLowLevelGroup().isShuttingDown());
assertFalse(transportGroup.getLowLevelGroup().isTerminated());
assertFalse(transportGroup.getLowLevelGroup().terminationFuture().isDone());
transportGroup.shutdown();
assertTrue(transportGroup.getLowLevelGroup().isShuttingDown());
assertTrue(transportGroup.getLowLevelGroup().isTerminated());
assertTrue(transportGroup.getLowLevelGroup().terminationFuture().isDone());
}
}
}

View File

@ -30,6 +30,7 @@ import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
import org.elasticsearch.mocksocket.MockSocket; import org.elasticsearch.mocksocket.MockSocket;
import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.SharedGroupFactory;
import org.elasticsearch.transport.TransportSettings; import org.elasticsearch.transport.TransportSettings;
import org.junit.After; import org.junit.After;
import org.junit.Before; import org.junit.Before;
@ -66,7 +67,7 @@ public class Netty4SizeHeaderFrameDecoderTests extends ESTestCase {
NetworkService networkService = new NetworkService(Collections.emptyList()); NetworkService networkService = new NetworkService(Collections.emptyList());
PageCacheRecycler recycler = new MockPageCacheRecycler(Settings.EMPTY); PageCacheRecycler recycler = new MockPageCacheRecycler(Settings.EMPTY);
nettyTransport = new Netty4Transport(settings, Version.CURRENT, threadPool, networkService, recycler, nettyTransport = new Netty4Transport(settings, Version.CURRENT, threadPool, networkService, recycler,
new NamedWriteableRegistry(Collections.emptyList()), new NoneCircuitBreakerService()); new NamedWriteableRegistry(Collections.emptyList()), new NoneCircuitBreakerService(), new SharedGroupFactory(settings));
nettyTransport.start(); nettyTransport.start();
TransportAddress[] boundAddresses = nettyTransport.boundAddress().boundAddresses(); TransportAddress[] boundAddresses = nettyTransport.boundAddress().boundAddresses();

View File

@ -30,6 +30,7 @@ import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.TestThreadPool; import org.elasticsearch.threadpool.TestThreadPool;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.SharedGroupFactory;
import org.elasticsearch.transport.TcpTransport; import org.elasticsearch.transport.TcpTransport;
import org.elasticsearch.transport.TransportSettings; import org.elasticsearch.transport.TransportSettings;
import org.junit.Before; import org.junit.Before;
@ -120,7 +121,8 @@ public class NettyTransportMultiPortTests extends ESTestCase {
private TcpTransport startTransport(Settings settings, ThreadPool threadPool) { private TcpTransport startTransport(Settings settings, ThreadPool threadPool) {
PageCacheRecycler recycler = new MockPageCacheRecycler(Settings.EMPTY); PageCacheRecycler recycler = new MockPageCacheRecycler(Settings.EMPTY);
TcpTransport transport = new Netty4Transport(settings, Version.CURRENT, threadPool, new NetworkService(Collections.emptyList()), TcpTransport transport = new Netty4Transport(settings, Version.CURRENT, threadPool, new NetworkService(Collections.emptyList()),
recycler, new NamedWriteableRegistry(Collections.emptyList()), new NoneCircuitBreakerService()); recycler, new NamedWriteableRegistry(Collections.emptyList()), new NoneCircuitBreakerService(),
new SharedGroupFactory(settings));
transport.start(); transport.start();
assertThat(transport.lifecycleState(), is(Lifecycle.State.STARTED)); assertThat(transport.lifecycleState(), is(Lifecycle.State.STARTED));

View File

@ -32,6 +32,7 @@ import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
import org.elasticsearch.transport.AbstractSimpleTransportTestCase; import org.elasticsearch.transport.AbstractSimpleTransportTestCase;
import org.elasticsearch.transport.ConnectTransportException; import org.elasticsearch.transport.ConnectTransportException;
import org.elasticsearch.transport.ConnectionProfile; import org.elasticsearch.transport.ConnectionProfile;
import org.elasticsearch.transport.SharedGroupFactory;
import org.elasticsearch.transport.TcpChannel; import org.elasticsearch.transport.TcpChannel;
import org.elasticsearch.transport.Transport; import org.elasticsearch.transport.Transport;
@ -49,7 +50,8 @@ public class SimpleNetty4TransportTests extends AbstractSimpleTransportTestCase
protected Transport build(Settings settings, final Version version, ClusterSettings clusterSettings, boolean doHandshake) { protected Transport build(Settings settings, final Version version, ClusterSettings clusterSettings, boolean doHandshake) {
NamedWriteableRegistry namedWriteableRegistry = new NamedWriteableRegistry(Collections.emptyList()); NamedWriteableRegistry namedWriteableRegistry = new NamedWriteableRegistry(Collections.emptyList());
return new Netty4Transport(settings, version, threadPool, new NetworkService(Collections.emptyList()), return new Netty4Transport(settings, version, threadPool, new NetworkService(Collections.emptyList()),
PageCacheRecycler.NON_RECYCLING_INSTANCE, namedWriteableRegistry, new NoneCircuitBreakerService()) { PageCacheRecycler.NON_RECYCLING_INSTANCE, namedWriteableRegistry, new NoneCircuitBreakerService(),
new SharedGroupFactory(settings)) {
@Override @Override
public void executeHandshake(DiscoveryNode node, TcpChannel channel, ConnectionProfile profile, public void executeHandshake(DiscoveryNode node, TcpChannel channel, ConnectionProfile profile,

View File

@ -5,6 +5,7 @@
*/ */
package org.elasticsearch.xpack.core; package org.elasticsearch.xpack.core;
import org.apache.lucene.util.SetOnce;
import org.elasticsearch.Version; import org.elasticsearch.Version;
import org.elasticsearch.action.ActionResponse; import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.ActionType; import org.elasticsearch.action.ActionType;
@ -35,6 +36,7 @@ import org.elasticsearch.plugins.NetworkPlugin;
import org.elasticsearch.plugins.Plugin; import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.tasks.Task; import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.SharedGroupFactory;
import org.elasticsearch.transport.Transport; import org.elasticsearch.transport.Transport;
import org.elasticsearch.xpack.core.action.XPackInfoAction; import org.elasticsearch.xpack.core.action.XPackInfoAction;
import org.elasticsearch.xpack.core.action.XPackUsageAction; import org.elasticsearch.xpack.core.action.XPackUsageAction;
@ -282,6 +284,7 @@ public class XPackClientPlugin extends Plugin implements ActionPlugin, NetworkPl
} }
private final Settings settings; private final Settings settings;
private final SetOnce<SharedGroupFactory> sharedGroupFactory = new SetOnce<>();
public XPackClientPlugin(final Settings settings) { public XPackClientPlugin(final Settings settings) {
this.settings = settings; this.settings = settings;
@ -692,7 +695,17 @@ public class XPackClientPlugin extends Plugin implements ActionPlugin, NetworkPl
throw new RuntimeException(e); throw new RuntimeException(e);
} }
return Collections.singletonMap(SecurityField.NAME4, () -> new SecurityNetty4Transport(settings, Version.CURRENT, threadPool, return Collections.singletonMap(SecurityField.NAME4, () -> new SecurityNetty4Transport(settings, Version.CURRENT, threadPool,
networkService, pageCacheRecycler, namedWriteableRegistry, circuitBreakerService, sslService)); networkService, pageCacheRecycler, namedWriteableRegistry, circuitBreakerService, sslService,
getNettySharedGroupFactory(settings)));
} }
private synchronized SharedGroupFactory getNettySharedGroupFactory(Settings settings) {
if (sharedGroupFactory.get() != null) {
assert sharedGroupFactory.get().getSettings().equals(settings) : "Different settings than originally provided";
return sharedGroupFactory.get();
} else {
sharedGroupFactory.set(new SharedGroupFactory(settings));
return sharedGroupFactory.get();
}
}
} }

View File

@ -22,6 +22,7 @@ import org.elasticsearch.common.util.PageCacheRecycler;
import org.elasticsearch.indices.breaker.CircuitBreakerService; import org.elasticsearch.indices.breaker.CircuitBreakerService;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.ConnectTransportException; import org.elasticsearch.transport.ConnectTransportException;
import org.elasticsearch.transport.SharedGroupFactory;
import org.elasticsearch.transport.TcpChannel; import org.elasticsearch.transport.TcpChannel;
import org.elasticsearch.transport.netty4.Netty4Transport; import org.elasticsearch.transport.netty4.Netty4Transport;
import org.elasticsearch.xpack.core.XPackSettings; import org.elasticsearch.xpack.core.XPackSettings;
@ -61,8 +62,10 @@ public class SecurityNetty4Transport extends Netty4Transport {
final PageCacheRecycler pageCacheRecycler, final PageCacheRecycler pageCacheRecycler,
final NamedWriteableRegistry namedWriteableRegistry, final NamedWriteableRegistry namedWriteableRegistry,
final CircuitBreakerService circuitBreakerService, final CircuitBreakerService circuitBreakerService,
final SSLService sslService) { final SSLService sslService,
super(settings, version, threadPool, networkService, pageCacheRecycler, namedWriteableRegistry, circuitBreakerService); final SharedGroupFactory sharedGroupFactory) {
super(settings, version, threadPool, networkService, pageCacheRecycler, namedWriteableRegistry, circuitBreakerService,
sharedGroupFactory);
this.exceptionHandler = new SecurityTransportExceptionHandler(logger, lifecycle, (c, e) -> super.onException(c, e)); this.exceptionHandler = new SecurityTransportExceptionHandler(logger, lifecycle, (c, e) -> super.onException(c, e));
this.sslService = sslService; this.sslService = sslService;
this.sslEnabled = XPackSettings.TRANSPORT_SSL_ENABLED.get(settings); this.sslEnabled = XPackSettings.TRANSPORT_SSL_ENABLED.get(settings);

View File

@ -67,6 +67,7 @@ import org.elasticsearch.script.ScriptService;
import org.elasticsearch.threadpool.ExecutorBuilder; import org.elasticsearch.threadpool.ExecutorBuilder;
import org.elasticsearch.threadpool.FixedExecutorBuilder; import org.elasticsearch.threadpool.FixedExecutorBuilder;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.SharedGroupFactory;
import org.elasticsearch.transport.Transport; import org.elasticsearch.transport.Transport;
import org.elasticsearch.transport.TransportInterceptor; import org.elasticsearch.transport.TransportInterceptor;
import org.elasticsearch.transport.TransportRequest; import org.elasticsearch.transport.TransportRequest;
@ -300,7 +301,8 @@ public class Security extends Plugin implements SystemIndexPlugin, IngestPlugin,
private final SetOnce<TokenService> tokenService = new SetOnce<>(); private final SetOnce<TokenService> tokenService = new SetOnce<>();
private final SetOnce<SecurityActionFilter> securityActionFilter = new SetOnce<>(); private final SetOnce<SecurityActionFilter> securityActionFilter = new SetOnce<>();
private final SetOnce<SecurityIndexManager> securityIndex = new SetOnce<>(); private final SetOnce<SecurityIndexManager> securityIndex = new SetOnce<>();
private final SetOnce<NioGroupFactory> groupFactory = new SetOnce<>(); private final SetOnce<SharedGroupFactory> sharedGroupFactory = new SetOnce<>();
private final SetOnce<NioGroupFactory> nioGroupFactory = new SetOnce<>();
private final SetOnce<DocumentSubsetBitsetCache> dlsBitsetCache = new SetOnce<>(); private final SetOnce<DocumentSubsetBitsetCache> dlsBitsetCache = new SetOnce<>();
private final SetOnce<List<BootstrapCheck>> bootstrapChecks = new SetOnce<>(); private final SetOnce<List<BootstrapCheck>> bootstrapChecks = new SetOnce<>();
private final List<SecurityExtension> securityExtensions = new ArrayList<>(); private final List<SecurityExtension> securityExtensions = new ArrayList<>();
@ -966,7 +968,8 @@ public class Security extends Plugin implements SystemIndexPlugin, IngestPlugin,
IPFilter ipFilter = this.ipFilter.get(); IPFilter ipFilter = this.ipFilter.get();
Map<String, Supplier<Transport>> transports = new HashMap<>(); Map<String, Supplier<Transport>> transports = new HashMap<>();
transports.put(SecurityField.NAME4, () -> new SecurityNetty4ServerTransport(settings, Version.CURRENT, threadPool, transports.put(SecurityField.NAME4, () -> new SecurityNetty4ServerTransport(settings, Version.CURRENT, threadPool,
networkService, pageCacheRecycler, namedWriteableRegistry, circuitBreakerService, ipFilter, getSslService())); networkService, pageCacheRecycler, namedWriteableRegistry, circuitBreakerService, ipFilter, getSslService(),
getNettySharedGroupFactory(settings)));
transports.put(SecurityField.NIO, () -> new SecurityNioTransport(settings, Version.CURRENT, threadPool, networkService, transports.put(SecurityField.NIO, () -> new SecurityNioTransport(settings, Version.CURRENT, threadPool, networkService,
pageCacheRecycler, namedWriteableRegistry, circuitBreakerService, ipFilter, getSslService(), getNioGroupFactory(settings))); pageCacheRecycler, namedWriteableRegistry, circuitBreakerService, ipFilter, getSslService(), getNioGroupFactory(settings)));
@ -987,7 +990,8 @@ public class Security extends Plugin implements SystemIndexPlugin, IngestPlugin,
Map<String, Supplier<HttpServerTransport>> httpTransports = new HashMap<>(); Map<String, Supplier<HttpServerTransport>> httpTransports = new HashMap<>();
httpTransports.put(SecurityField.NAME4, () -> new SecurityNetty4HttpServerTransport(settings, networkService, bigArrays, httpTransports.put(SecurityField.NAME4, () -> new SecurityNetty4HttpServerTransport(settings, networkService, bigArrays,
ipFilter.get(), getSslService(), threadPool, xContentRegistry, dispatcher, clusterSettings)); ipFilter.get(), getSslService(), threadPool, xContentRegistry, dispatcher, clusterSettings,
getNettySharedGroupFactory(settings)));
httpTransports.put(SecurityField.NIO, () -> new SecurityNioHttpServerTransport(settings, networkService, bigArrays, httpTransports.put(SecurityField.NIO, () -> new SecurityNioHttpServerTransport(settings, networkService, bigArrays,
pageCacheRecycler, threadPool, xContentRegistry, dispatcher, ipFilter.get(), getSslService(), getNioGroupFactory(settings), pageCacheRecycler, threadPool, xContentRegistry, dispatcher, ipFilter.get(), getSslService(), getNioGroupFactory(settings),
clusterSettings)); clusterSettings));
@ -1113,14 +1117,23 @@ public class Security extends Plugin implements SystemIndexPlugin, IngestPlugin,
} }
private synchronized NioGroupFactory getNioGroupFactory(Settings settings) { private synchronized NioGroupFactory getNioGroupFactory(Settings settings) {
if (groupFactory.get() != null) { if (nioGroupFactory.get() != null) {
assert groupFactory.get().getSettings().equals(settings) : "Different settings than originally provided"; assert nioGroupFactory.get().getSettings().equals(settings) : "Different settings than originally provided";
return groupFactory.get(); return nioGroupFactory.get();
} else { } else {
groupFactory.set(new NioGroupFactory(settings, logger)); nioGroupFactory.set(new NioGroupFactory(settings, logger));
return groupFactory.get(); return nioGroupFactory.get();
} }
} }
private synchronized SharedGroupFactory getNettySharedGroupFactory(Settings settings) {
if (sharedGroupFactory.get() != null) {
assert sharedGroupFactory.get().getSettings().equals(settings) : "Different settings than originally provided";
return sharedGroupFactory.get();
} else {
sharedGroupFactory.set(new SharedGroupFactory(settings));
return sharedGroupFactory.get();
}
}
@Override @Override
public Collection<SystemIndexDescriptor> getSystemIndexDescriptors(Settings settings) { public Collection<SystemIndexDescriptor> getSystemIndexDescriptors(Settings settings) {

View File

@ -18,6 +18,7 @@ import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.http.HttpChannel; import org.elasticsearch.http.HttpChannel;
import org.elasticsearch.http.netty4.Netty4HttpServerTransport; import org.elasticsearch.http.netty4.Netty4HttpServerTransport;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.SharedGroupFactory;
import org.elasticsearch.xpack.core.ssl.SSLConfiguration; import org.elasticsearch.xpack.core.ssl.SSLConfiguration;
import org.elasticsearch.xpack.core.ssl.SSLService; import org.elasticsearch.xpack.core.ssl.SSLService;
import org.elasticsearch.xpack.security.transport.SecurityHttpExceptionHandler; import org.elasticsearch.xpack.security.transport.SecurityHttpExceptionHandler;
@ -37,8 +38,9 @@ public class SecurityNetty4HttpServerTransport extends Netty4HttpServerTransport
public SecurityNetty4HttpServerTransport(Settings settings, NetworkService networkService, BigArrays bigArrays, IPFilter ipFilter, public SecurityNetty4HttpServerTransport(Settings settings, NetworkService networkService, BigArrays bigArrays, IPFilter ipFilter,
SSLService sslService, ThreadPool threadPool, NamedXContentRegistry xContentRegistry, SSLService sslService, ThreadPool threadPool, NamedXContentRegistry xContentRegistry,
Dispatcher dispatcher, ClusterSettings clusterSettings) { Dispatcher dispatcher, ClusterSettings clusterSettings,
super(settings, networkService, bigArrays, threadPool, xContentRegistry, dispatcher, clusterSettings); SharedGroupFactory sharedGroupFactory) {
super(settings, networkService, bigArrays, threadPool, xContentRegistry, dispatcher, clusterSettings, sharedGroupFactory);
this.securityExceptionHandler = new SecurityHttpExceptionHandler(logger, lifecycle, (c, e) -> super.onException(c, e)); this.securityExceptionHandler = new SecurityHttpExceptionHandler(logger, lifecycle, (c, e) -> super.onException(c, e));
this.ipFilter = ipFilter; this.ipFilter = ipFilter;
final boolean ssl = HTTP_SSL_ENABLED.get(settings); final boolean ssl = HTTP_SSL_ENABLED.get(settings);

View File

@ -15,6 +15,7 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.PageCacheRecycler; import org.elasticsearch.common.util.PageCacheRecycler;
import org.elasticsearch.indices.breaker.CircuitBreakerService; import org.elasticsearch.indices.breaker.CircuitBreakerService;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.SharedGroupFactory;
import org.elasticsearch.xpack.core.security.transport.netty4.SecurityNetty4Transport; import org.elasticsearch.xpack.core.security.transport.netty4.SecurityNetty4Transport;
import org.elasticsearch.xpack.core.ssl.SSLConfiguration; import org.elasticsearch.xpack.core.ssl.SSLConfiguration;
import org.elasticsearch.xpack.core.ssl.SSLService; import org.elasticsearch.xpack.core.ssl.SSLService;
@ -33,8 +34,10 @@ public class SecurityNetty4ServerTransport extends SecurityNetty4Transport {
final NamedWriteableRegistry namedWriteableRegistry, final NamedWriteableRegistry namedWriteableRegistry,
final CircuitBreakerService circuitBreakerService, final CircuitBreakerService circuitBreakerService,
@Nullable final IPFilter authenticator, @Nullable final IPFilter authenticator,
final SSLService sslService) { final SSLService sslService,
super(settings, version, threadPool, networkService, pageCacheRecycler, namedWriteableRegistry, circuitBreakerService, sslService); final SharedGroupFactory sharedGroupFactory) {
super(settings, version, threadPool, networkService, pageCacheRecycler, namedWriteableRegistry, circuitBreakerService, sslService,
sharedGroupFactory);
this.authenticator = authenticator; this.authenticator = authenticator;
} }

View File

@ -18,6 +18,7 @@ import org.elasticsearch.env.TestEnvironment;
import org.elasticsearch.http.NullDispatcher; import org.elasticsearch.http.NullDispatcher;
import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.SharedGroupFactory;
import org.elasticsearch.xpack.core.XPackSettings; import org.elasticsearch.xpack.core.XPackSettings;
import org.elasticsearch.xpack.core.ssl.SSLClientAuth; import org.elasticsearch.xpack.core.ssl.SSLClientAuth;
import org.elasticsearch.xpack.core.ssl.SSLService; import org.elasticsearch.xpack.core.ssl.SSLService;
@ -71,7 +72,7 @@ public class SecurityNetty4HttpServerTransportTests extends ESTestCase {
SecurityNetty4HttpServerTransport transport = new SecurityNetty4HttpServerTransport(settings, SecurityNetty4HttpServerTransport transport = new SecurityNetty4HttpServerTransport(settings,
new NetworkService(Collections.emptyList()), mock(BigArrays.class), mock(IPFilter.class), sslService, new NetworkService(Collections.emptyList()), mock(BigArrays.class), mock(IPFilter.class), sslService,
mock(ThreadPool.class), xContentRegistry(), new NullDispatcher(), mock(ThreadPool.class), xContentRegistry(), new NullDispatcher(),
new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)); new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), new SharedGroupFactory(settings));
ChannelHandler handler = transport.configureServerChannelHandler(); ChannelHandler handler = transport.configureServerChannelHandler();
final EmbeddedChannel ch = new EmbeddedChannel(handler); final EmbeddedChannel ch = new EmbeddedChannel(handler);
assertThat(ch.pipeline().get(SslHandler.class).engine().getNeedClientAuth(), is(false)); assertThat(ch.pipeline().get(SslHandler.class).engine().getNeedClientAuth(), is(false));
@ -88,7 +89,7 @@ public class SecurityNetty4HttpServerTransportTests extends ESTestCase {
SecurityNetty4HttpServerTransport transport = new SecurityNetty4HttpServerTransport(settings, SecurityNetty4HttpServerTransport transport = new SecurityNetty4HttpServerTransport(settings,
new NetworkService(Collections.emptyList()), mock(BigArrays.class), mock(IPFilter.class), sslService, new NetworkService(Collections.emptyList()), mock(BigArrays.class), mock(IPFilter.class), sslService,
mock(ThreadPool.class), xContentRegistry(), new NullDispatcher(), mock(ThreadPool.class), xContentRegistry(), new NullDispatcher(),
new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)); new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), new SharedGroupFactory(settings));
ChannelHandler handler = transport.configureServerChannelHandler(); ChannelHandler handler = transport.configureServerChannelHandler();
final EmbeddedChannel ch = new EmbeddedChannel(handler); final EmbeddedChannel ch = new EmbeddedChannel(handler);
assertThat(ch.pipeline().get(SslHandler.class).engine().getNeedClientAuth(), is(false)); assertThat(ch.pipeline().get(SslHandler.class).engine().getNeedClientAuth(), is(false));
@ -105,7 +106,7 @@ public class SecurityNetty4HttpServerTransportTests extends ESTestCase {
SecurityNetty4HttpServerTransport transport = new SecurityNetty4HttpServerTransport(settings, SecurityNetty4HttpServerTransport transport = new SecurityNetty4HttpServerTransport(settings,
new NetworkService(Collections.emptyList()), mock(BigArrays.class), mock(IPFilter.class), sslService, new NetworkService(Collections.emptyList()), mock(BigArrays.class), mock(IPFilter.class), sslService,
mock(ThreadPool.class), xContentRegistry(), new NullDispatcher(), mock(ThreadPool.class), xContentRegistry(), new NullDispatcher(),
new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)); new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), new SharedGroupFactory(settings));
ChannelHandler handler = transport.configureServerChannelHandler(); ChannelHandler handler = transport.configureServerChannelHandler();
final EmbeddedChannel ch = new EmbeddedChannel(handler); final EmbeddedChannel ch = new EmbeddedChannel(handler);
assertThat(ch.pipeline().get(SslHandler.class).engine().getNeedClientAuth(), is(true)); assertThat(ch.pipeline().get(SslHandler.class).engine().getNeedClientAuth(), is(true));
@ -122,7 +123,7 @@ public class SecurityNetty4HttpServerTransportTests extends ESTestCase {
SecurityNetty4HttpServerTransport transport = new SecurityNetty4HttpServerTransport(settings, SecurityNetty4HttpServerTransport transport = new SecurityNetty4HttpServerTransport(settings,
new NetworkService(Collections.emptyList()), mock(BigArrays.class), mock(IPFilter.class), sslService, new NetworkService(Collections.emptyList()), mock(BigArrays.class), mock(IPFilter.class), sslService,
mock(ThreadPool.class), xContentRegistry(), new NullDispatcher(), mock(ThreadPool.class), xContentRegistry(), new NullDispatcher(),
new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)); new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), new SharedGroupFactory(settings));
ChannelHandler handler = transport.configureServerChannelHandler(); ChannelHandler handler = transport.configureServerChannelHandler();
final EmbeddedChannel ch = new EmbeddedChannel(handler); final EmbeddedChannel ch = new EmbeddedChannel(handler);
assertThat(ch.pipeline().get(SslHandler.class).engine().getNeedClientAuth(), is(false)); assertThat(ch.pipeline().get(SslHandler.class).engine().getNeedClientAuth(), is(false));
@ -137,7 +138,7 @@ public class SecurityNetty4HttpServerTransportTests extends ESTestCase {
SecurityNetty4HttpServerTransport transport = new SecurityNetty4HttpServerTransport(settings, SecurityNetty4HttpServerTransport transport = new SecurityNetty4HttpServerTransport(settings,
new NetworkService(Collections.emptyList()), mock(BigArrays.class), mock(IPFilter.class), sslService, new NetworkService(Collections.emptyList()), mock(BigArrays.class), mock(IPFilter.class), sslService,
mock(ThreadPool.class), xContentRegistry(), new NullDispatcher(), mock(ThreadPool.class), xContentRegistry(), new NullDispatcher(),
new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)); new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), new SharedGroupFactory(settings));
ChannelHandler handler = transport.configureServerChannelHandler(); ChannelHandler handler = transport.configureServerChannelHandler();
EmbeddedChannel ch = new EmbeddedChannel(handler); EmbeddedChannel ch = new EmbeddedChannel(handler);
SSLEngine defaultEngine = ch.pipeline().get(SslHandler.class).engine(); SSLEngine defaultEngine = ch.pipeline().get(SslHandler.class).engine();
@ -150,7 +151,7 @@ public class SecurityNetty4HttpServerTransportTests extends ESTestCase {
sslService = new SSLService(settings, TestEnvironment.newEnvironment(settings)); sslService = new SSLService(settings, TestEnvironment.newEnvironment(settings));
transport = new SecurityNetty4HttpServerTransport(settings, new NetworkService(Collections.emptyList()), transport = new SecurityNetty4HttpServerTransport(settings, new NetworkService(Collections.emptyList()),
mock(BigArrays.class), mock(IPFilter.class), sslService, mock(ThreadPool.class), xContentRegistry(), new NullDispatcher(), mock(BigArrays.class), mock(IPFilter.class), sslService, mock(ThreadPool.class), xContentRegistry(), new NullDispatcher(),
new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)); new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), new SharedGroupFactory(settings));
handler = transport.configureServerChannelHandler(); handler = transport.configureServerChannelHandler();
ch = new EmbeddedChannel(handler); ch = new EmbeddedChannel(handler);
SSLEngine customEngine = ch.pipeline().get(SslHandler.class).engine(); SSLEngine customEngine = ch.pipeline().get(SslHandler.class).engine();
@ -177,7 +178,7 @@ public class SecurityNetty4HttpServerTransportTests extends ESTestCase {
SecurityNetty4HttpServerTransport transport = new SecurityNetty4HttpServerTransport(settings, SecurityNetty4HttpServerTransport transport = new SecurityNetty4HttpServerTransport(settings,
new NetworkService(Collections.emptyList()), mock(BigArrays.class), mock(IPFilter.class), sslService, new NetworkService(Collections.emptyList()), mock(BigArrays.class), mock(IPFilter.class), sslService,
mock(ThreadPool.class), xContentRegistry(), new NullDispatcher(), mock(ThreadPool.class), xContentRegistry(), new NullDispatcher(),
new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)); new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), new SharedGroupFactory(settings));
assertNotNull(transport.configureServerChannelHandler()); assertNotNull(transport.configureServerChannelHandler());
} }
} }

View File

@ -15,6 +15,7 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.PageCacheRecycler; import org.elasticsearch.common.util.PageCacheRecycler;
import org.elasticsearch.indices.breaker.NoneCircuitBreakerService; import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
import org.elasticsearch.transport.ConnectionProfile; import org.elasticsearch.transport.ConnectionProfile;
import org.elasticsearch.transport.SharedGroupFactory;
import org.elasticsearch.transport.TcpChannel; import org.elasticsearch.transport.TcpChannel;
import org.elasticsearch.transport.Transport; import org.elasticsearch.transport.Transport;
import org.elasticsearch.xpack.security.transport.AbstractSimpleSecurityTransportTestCase; import org.elasticsearch.xpack.security.transport.AbstractSimpleSecurityTransportTestCase;
@ -32,7 +33,7 @@ public class SimpleSecurityNetty4ServerTransportTests extends AbstractSimpleSecu
.put("xpack.security.transport.ssl.enabled", true).build(); .put("xpack.security.transport.ssl.enabled", true).build();
return new SecurityNetty4ServerTransport(settings1, version, threadPool, return new SecurityNetty4ServerTransport(settings1, version, threadPool,
networkService, PageCacheRecycler.NON_RECYCLING_INSTANCE, namedWriteableRegistry, networkService, PageCacheRecycler.NON_RECYCLING_INSTANCE, namedWriteableRegistry,
new NoneCircuitBreakerService(), null, createSSLService(settings1)) { new NoneCircuitBreakerService(), null, createSSLService(settings1), new SharedGroupFactory(settings1)) {
@Override @Override
public void executeHandshake(DiscoveryNode node, TcpChannel channel, ConnectionProfile profile, public void executeHandshake(DiscoveryNode node, TcpChannel channel, ConnectionProfile profile,