From f516d68fb28ef56271f2784715a68908570d9b87 Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Mon, 21 Jan 2019 13:50:56 -0700 Subject: [PATCH] Share `NioGroup` between http and transport impls (#37396) Currently we create dedicated network threads for both the http and transport implementations. Since these these threads should never perform blocking operations, these threads could be shared. This commit modifies the nio-transport to have 0 http workers be default. If the default configs are used, this will cause the http transport to be run on the transport worker threads. The http worker setting will still exist in case the user would like to configure dedicated workers. Additionally, this commmit deletes dedicated acceptor threads. We have never had these for the netty transport and they can be added back if a need is determined in the future. --- .../java/org/elasticsearch/nio/NioGroup.java | 153 +-------------- .../elasticsearch/nio/NioSelectorGroup.java | 181 ++++++++++++++++++ ...pTests.java => NioSelectorGroupTests.java} | 10 +- .../http/nio/NioHttpServerTransport.java | 25 +-- .../transport/nio/NioGroupFactory.java | 172 +++++++++++++++++ .../transport/nio/NioTransport.java | 16 +- .../transport/nio/NioTransportPlugin.java | 37 +++- .../org/elasticsearch/NioIntegTestCase.java | 6 +- .../elasticsearch/http/nio/NioHttpClient.java | 6 +- .../http/nio/NioHttpServerTransportTests.java | 9 +- .../transport/nio/NioGroupFactoryTests.java | 78 ++++++++ .../transport/nio/NioTransportIT.java | 8 +- .../nio/SimpleNioTransportTests.java | 4 +- .../http/AbstractHttpServerTransport.java | 10 - .../http/HttpServerTransport.java | 2 - .../elasticsearch/transport/TcpTransport.java | 10 - .../elasticsearch/transport/Transports.java | 1 - .../transport/nio/MockNioTransport.java | 11 +- .../xpack/security/Security.java | 21 +- .../nio/SecurityNioHttpServerTransport.java | 5 +- .../transport/nio/SecurityNioTransport.java | 6 +- .../SecurityNioHttpServerTransportTests.java | 26 ++- .../nio/SimpleSecurityNioTransportTests.java | 4 +- 23 files changed, 559 insertions(+), 242 deletions(-) create mode 100644 libs/nio/src/main/java/org/elasticsearch/nio/NioSelectorGroup.java rename libs/nio/src/test/java/org/elasticsearch/nio/{NioGroupTests.java => NioSelectorGroupTests.java} (88%) create mode 100644 plugins/transport-nio/src/main/java/org/elasticsearch/transport/nio/NioGroupFactory.java create mode 100644 plugins/transport-nio/src/test/java/org/elasticsearch/transport/nio/NioGroupFactoryTests.java diff --git a/libs/nio/src/main/java/org/elasticsearch/nio/NioGroup.java b/libs/nio/src/main/java/org/elasticsearch/nio/NioGroup.java index fe1bc1cf404..abb46cf3aea 100644 --- a/libs/nio/src/main/java/org/elasticsearch/nio/NioGroup.java +++ b/libs/nio/src/main/java/org/elasticsearch/nio/NioGroup.java @@ -19,161 +19,26 @@ package org.elasticsearch.nio; -import org.elasticsearch.nio.utils.ExceptionsHelper; - +import java.io.Closeable; import java.io.IOException; import java.net.InetSocketAddress; -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.ThreadFactory; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.function.Function; -import java.util.function.Supplier; -import java.util.stream.Collectors; -import java.util.stream.Stream; /** - * The NioGroup is a group of selectors for interfacing with java nio. When it is started it will create the - * configured number of selectors. Each selector will be running in a dedicated thread. Server connections - * can be bound using the {@link #bindServerChannel(InetSocketAddress, ChannelFactory)} method. Client - * connections can be opened using the {@link #openChannel(InetSocketAddress, ChannelFactory)} method. - *

- * The logic specific to a particular channel is provided by the {@link ChannelFactory} passed to the method - * when the channel is created. This is what allows an NioGroup to support different channel types. + * An class for interfacing with java.nio. Implementations implement the underlying logic for opening + * channels and registering them with the OS. */ -public class NioGroup implements AutoCloseable { - - - private final List dedicatedAcceptors; - private final RoundRobinSupplier acceptorSupplier; - - private final List selectors; - private final RoundRobinSupplier selectorSupplier; - - private final AtomicBoolean isOpen = new AtomicBoolean(true); +public interface NioGroup extends Closeable { /** - * This will create an NioGroup with no dedicated acceptors. All server channels will be handled by the - * same selectors that are handling child channels. - * - * @param threadFactory factory to create selector threads - * @param selectorCount the number of selectors to be created - * @param eventHandlerFunction function for creating event handlers - * @throws IOException occurs if there is a problem while opening a java.nio.Selector + * Opens and binds a server channel to accept incoming connections. */ - public NioGroup(ThreadFactory threadFactory, int selectorCount, Function, EventHandler> eventHandlerFunction) - throws IOException { - this(null, 0, threadFactory, selectorCount, eventHandlerFunction); - } + S bindServerChannel(InetSocketAddress address, ChannelFactory factory) throws IOException; /** - * This will create an NioGroup with dedicated acceptors. All server channels will be handled by a group - * of selectors dedicated to accepting channels. These accepted channels will be handed off the - * non-server selectors. - * - * @param acceptorThreadFactory factory to create acceptor selector threads - * @param dedicatedAcceptorCount the number of dedicated acceptor selectors to be created - * @param selectorThreadFactory factory to create non-acceptor selector threads - * @param selectorCount the number of non-acceptor selectors to be created - * @param eventHandlerFunction function for creating event handlers - * @throws IOException occurs if there is a problem while opening a java.nio.Selector + * Opens a outgoing client channel. */ - public NioGroup(ThreadFactory acceptorThreadFactory, int dedicatedAcceptorCount, ThreadFactory selectorThreadFactory, int selectorCount, - Function, EventHandler> eventHandlerFunction) throws IOException { - dedicatedAcceptors = new ArrayList<>(dedicatedAcceptorCount); - selectors = new ArrayList<>(selectorCount); - - try { - List> suppliersToSet = new ArrayList<>(selectorCount); - for (int i = 0; i < selectorCount; ++i) { - RoundRobinSupplier supplier = new RoundRobinSupplier<>(); - suppliersToSet.add(supplier); - NioSelector selector = new NioSelector(eventHandlerFunction.apply(supplier)); - selectors.add(selector); - } - for (RoundRobinSupplier supplierToSet : suppliersToSet) { - supplierToSet.setSelectors(selectors.toArray(new NioSelector[0])); - assert supplierToSet.count() == selectors.size() : "Supplier should have same count as selector list."; - } - - for (int i = 0; i < dedicatedAcceptorCount; ++i) { - RoundRobinSupplier supplier = new RoundRobinSupplier<>(selectors.toArray(new NioSelector[0])); - NioSelector acceptor = new NioSelector(eventHandlerFunction.apply(supplier)); - dedicatedAcceptors.add(acceptor); - } - - if (dedicatedAcceptorCount != 0) { - acceptorSupplier = new RoundRobinSupplier<>(dedicatedAcceptors.toArray(new NioSelector[0])); - } else { - acceptorSupplier = new RoundRobinSupplier<>(selectors.toArray(new NioSelector[0])); - } - selectorSupplier = new RoundRobinSupplier<>(selectors.toArray(new NioSelector[0])); - assert selectorCount == selectors.size() : "We need to have created all the selectors at this point."; - assert dedicatedAcceptorCount == dedicatedAcceptors.size() : "We need to have created all the acceptors at this point."; - - startSelectors(selectors, selectorThreadFactory); - startSelectors(dedicatedAcceptors, acceptorThreadFactory); - } catch (Exception e) { - try { - close(); - } catch (Exception e1) { - e.addSuppressed(e1); - } - throw e; - } - } - - public S bindServerChannel(InetSocketAddress address, ChannelFactory factory) - throws IOException { - ensureOpen(); - return factory.openNioServerSocketChannel(address, acceptorSupplier); - } - - public S openChannel(InetSocketAddress address, ChannelFactory factory) throws IOException { - ensureOpen(); - return factory.openNioChannel(address, selectorSupplier); - } + S openChannel(InetSocketAddress address, ChannelFactory factory) throws IOException; @Override - public void close() throws IOException { - if (isOpen.compareAndSet(true, false)) { - List toClose = Stream.concat(dedicatedAcceptors.stream(), selectors.stream()).collect(Collectors.toList()); - List closingExceptions = new ArrayList<>(); - for (NioSelector selector : toClose) { - try { - selector.close(); - } catch (IOException e) { - closingExceptions.add(e); - } - } - ExceptionsHelper.rethrowAndSuppress(closingExceptions); - } - } - - private static void startSelectors(Iterable selectors, ThreadFactory threadFactory) { - for (NioSelector selector : selectors) { - if (selector.isRunning() == false) { - threadFactory.newThread(selector::runLoop).start(); - try { - selector.isRunningFuture().get(); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new IllegalStateException("Interrupted while waiting for selector to start.", e); - } catch (ExecutionException e) { - if (e.getCause() instanceof RuntimeException) { - throw (RuntimeException) e.getCause(); - } else { - throw new RuntimeException("Exception during selector start.", e); - } - } - } - } - } - - private void ensureOpen() { - if (isOpen.get() == false) { - throw new IllegalStateException("NioGroup is closed."); - } - } + void close() throws IOException; } diff --git a/libs/nio/src/main/java/org/elasticsearch/nio/NioSelectorGroup.java b/libs/nio/src/main/java/org/elasticsearch/nio/NioSelectorGroup.java new file mode 100644 index 00000000000..bb4d4d14a0c --- /dev/null +++ b/libs/nio/src/main/java/org/elasticsearch/nio/NioSelectorGroup.java @@ -0,0 +1,181 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.nio; + +import org.elasticsearch.nio.utils.ExceptionsHelper; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Function; +import java.util.function.Supplier; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +/** + * The NioSelectorGroup is a group of selectors for interfacing with java nio. When it is started it will create the + * configured number of selectors. Each selector will be running in a dedicated thread. Server connections + * can be bound using the {@link #bindServerChannel(InetSocketAddress, ChannelFactory)} method. Client + * connections can be opened using the {@link #openChannel(InetSocketAddress, ChannelFactory)} method. + *

+ * The logic specific to a particular channel is provided by the {@link ChannelFactory} passed to the method + * when the channel is created. This is what allows an NioSelectorGroup to support different channel types. + */ +public class NioSelectorGroup implements NioGroup { + + + private final List dedicatedAcceptors; + private final RoundRobinSupplier acceptorSupplier; + + private final List selectors; + private final RoundRobinSupplier selectorSupplier; + + private final AtomicBoolean isOpen = new AtomicBoolean(true); + + /** + * This will create an NioSelectorGroup with no dedicated acceptors. All server channels will be handled by the + * same selectors that are handling child channels. + * + * @param threadFactory factory to create selector threads + * @param selectorCount the number of selectors to be created + * @param eventHandlerFunction function for creating event handlers + * @throws IOException occurs if there is a problem while opening a java.nio.Selector + */ + public NioSelectorGroup(ThreadFactory threadFactory, int selectorCount, + Function, EventHandler> eventHandlerFunction) throws IOException { + this(null, 0, threadFactory, selectorCount, eventHandlerFunction); + } + + /** + * This will create an NioSelectorGroup with dedicated acceptors. All server channels will be handled by a group + * of selectors dedicated to accepting channels. These accepted channels will be handed off the + * non-server selectors. + * + * @param acceptorThreadFactory factory to create acceptor selector threads + * @param dedicatedAcceptorCount the number of dedicated acceptor selectors to be created + * @param selectorThreadFactory factory to create non-acceptor selector threads + * @param selectorCount the number of non-acceptor selectors to be created + * @param eventHandlerFunction function for creating event handlers + * @throws IOException occurs if there is a problem while opening a java.nio.Selector + */ + public NioSelectorGroup(ThreadFactory acceptorThreadFactory, int dedicatedAcceptorCount, ThreadFactory selectorThreadFactory, + int selectorCount, Function, EventHandler> eventHandlerFunction) throws IOException { + dedicatedAcceptors = new ArrayList<>(dedicatedAcceptorCount); + selectors = new ArrayList<>(selectorCount); + + try { + List> suppliersToSet = new ArrayList<>(selectorCount); + for (int i = 0; i < selectorCount; ++i) { + RoundRobinSupplier supplier = new RoundRobinSupplier<>(); + suppliersToSet.add(supplier); + NioSelector selector = new NioSelector(eventHandlerFunction.apply(supplier)); + selectors.add(selector); + } + for (RoundRobinSupplier supplierToSet : suppliersToSet) { + supplierToSet.setSelectors(selectors.toArray(new NioSelector[0])); + assert supplierToSet.count() == selectors.size() : "Supplier should have same count as selector list."; + } + + for (int i = 0; i < dedicatedAcceptorCount; ++i) { + RoundRobinSupplier supplier = new RoundRobinSupplier<>(selectors.toArray(new NioSelector[0])); + NioSelector acceptor = new NioSelector(eventHandlerFunction.apply(supplier)); + dedicatedAcceptors.add(acceptor); + } + + if (dedicatedAcceptorCount != 0) { + acceptorSupplier = new RoundRobinSupplier<>(dedicatedAcceptors.toArray(new NioSelector[0])); + } else { + acceptorSupplier = new RoundRobinSupplier<>(selectors.toArray(new NioSelector[0])); + } + selectorSupplier = new RoundRobinSupplier<>(selectors.toArray(new NioSelector[0])); + assert selectorCount == selectors.size() : "We need to have created all the selectors at this point."; + assert dedicatedAcceptorCount == dedicatedAcceptors.size() : "We need to have created all the acceptors at this point."; + + startSelectors(selectors, selectorThreadFactory); + startSelectors(dedicatedAcceptors, acceptorThreadFactory); + } catch (Exception e) { + try { + close(); + } catch (Exception e1) { + e.addSuppressed(e1); + } + throw e; + } + } + + @Override + public S bindServerChannel(InetSocketAddress address, ChannelFactory factory) + throws IOException { + ensureOpen(); + return factory.openNioServerSocketChannel(address, acceptorSupplier); + } + + @Override + public S openChannel(InetSocketAddress address, ChannelFactory factory) throws IOException { + ensureOpen(); + return factory.openNioChannel(address, selectorSupplier); + } + + @Override + public void close() throws IOException { + if (isOpen.compareAndSet(true, false)) { + List toClose = Stream.concat(dedicatedAcceptors.stream(), selectors.stream()).collect(Collectors.toList()); + List closingExceptions = new ArrayList<>(); + for (NioSelector selector : toClose) { + try { + selector.close(); + } catch (IOException e) { + closingExceptions.add(e); + } + } + ExceptionsHelper.rethrowAndSuppress(closingExceptions); + } + } + + private static void startSelectors(Iterable selectors, ThreadFactory threadFactory) { + for (NioSelector selector : selectors) { + if (selector.isRunning() == false) { + threadFactory.newThread(selector::runLoop).start(); + try { + selector.isRunningFuture().get(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new IllegalStateException("Interrupted while waiting for selector to start.", e); + } catch (ExecutionException e) { + if (e.getCause() instanceof RuntimeException) { + throw (RuntimeException) e.getCause(); + } else { + throw new RuntimeException("Exception during selector start.", e); + } + } + } + } + } + + private void ensureOpen() { + if (isOpen.get() == false) { + throw new IllegalStateException("NioGroup is closed."); + } + } +} diff --git a/libs/nio/src/test/java/org/elasticsearch/nio/NioGroupTests.java b/libs/nio/src/test/java/org/elasticsearch/nio/NioSelectorGroupTests.java similarity index 88% rename from libs/nio/src/test/java/org/elasticsearch/nio/NioGroupTests.java rename to libs/nio/src/test/java/org/elasticsearch/nio/NioSelectorGroupTests.java index 027f1255a59..34a8d501ea3 100644 --- a/libs/nio/src/test/java/org/elasticsearch/nio/NioGroupTests.java +++ b/libs/nio/src/test/java/org/elasticsearch/nio/NioSelectorGroupTests.java @@ -30,16 +30,16 @@ import java.util.function.Consumer; import static org.elasticsearch.common.util.concurrent.EsExecutors.daemonThreadFactory; import static org.mockito.Mockito.mock; -public class NioGroupTests extends ESTestCase { +public class NioSelectorGroupTests extends ESTestCase { - private NioGroup nioGroup; + private NioSelectorGroup nioGroup; @Override @SuppressWarnings("unchecked") public void setUp() throws Exception { super.setUp(); - nioGroup = new NioGroup(daemonThreadFactory(Settings.EMPTY, "acceptor"), 1, daemonThreadFactory(Settings.EMPTY, "selector"), 1, - (s) -> new EventHandler(mock(Consumer.class), s)); + nioGroup = new NioSelectorGroup(daemonThreadFactory(Settings.EMPTY, "acceptor"), 1, + daemonThreadFactory(Settings.EMPTY, "selector"), 1, (s) -> new EventHandler(mock(Consumer.class), s)); } @Override @@ -74,7 +74,7 @@ public class NioGroupTests extends ESTestCase { @SuppressWarnings("unchecked") public void testExceptionAtStartIsHandled() throws IOException { RuntimeException ex = new RuntimeException(); - CheckedRunnable ctor = () -> new NioGroup(r -> {throw ex;}, 1, + CheckedRunnable ctor = () -> new NioSelectorGroup(r -> {throw ex;}, 1, daemonThreadFactory(Settings.EMPTY, "selector"), 1, (s) -> new EventHandler(mock(Consumer.class), s)); RuntimeException runtimeException = expectThrows(RuntimeException.class, ctor::run); diff --git a/plugins/transport-nio/src/main/java/org/elasticsearch/http/nio/NioHttpServerTransport.java b/plugins/transport-nio/src/main/java/org/elasticsearch/http/nio/NioHttpServerTransport.java index aa77a189cb7..a5f274c7ccd 100644 --- a/plugins/transport-nio/src/main/java/org/elasticsearch/http/nio/NioHttpServerTransport.java +++ b/plugins/transport-nio/src/main/java/org/elasticsearch/http/nio/NioHttpServerTransport.java @@ -26,13 +26,11 @@ import org.elasticsearch.ElasticsearchException; import org.elasticsearch.common.Strings; import org.elasticsearch.common.network.NetworkService; import org.elasticsearch.common.recycler.Recycler; -import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.SettingsException; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.util.PageCacheRecycler; -import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.http.AbstractHttpServerTransport; import org.elasticsearch.http.HttpChannel; @@ -41,7 +39,6 @@ import org.elasticsearch.http.nio.cors.NioCorsConfig; import org.elasticsearch.http.nio.cors.NioCorsConfigBuilder; import org.elasticsearch.nio.BytesChannelContext; import org.elasticsearch.nio.ChannelFactory; -import org.elasticsearch.nio.EventHandler; import org.elasticsearch.nio.InboundChannelBuffer; import org.elasticsearch.nio.NioGroup; import org.elasticsearch.nio.NioSelector; @@ -50,6 +47,7 @@ import org.elasticsearch.nio.ServerChannelContext; import org.elasticsearch.nio.SocketChannelContext; import org.elasticsearch.rest.RestUtils; import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.nio.NioGroupFactory; import java.io.IOException; import java.net.InetSocketAddress; @@ -61,8 +59,6 @@ import java.util.function.Consumer; import java.util.regex.Pattern; import java.util.regex.PatternSyntaxException; -import static org.elasticsearch.common.settings.Setting.intSetting; -import static org.elasticsearch.common.util.concurrent.EsExecutors.daemonThreadFactory; import static org.elasticsearch.http.HttpTransportSettings.SETTING_CORS_ALLOW_CREDENTIALS; import static org.elasticsearch.http.HttpTransportSettings.SETTING_CORS_ALLOW_HEADERS; import static org.elasticsearch.http.HttpTransportSettings.SETTING_CORS_ALLOW_METHODS; @@ -83,15 +79,9 @@ import static org.elasticsearch.http.nio.cors.NioCorsHandler.ANY_ORIGIN; public class NioHttpServerTransport extends AbstractHttpServerTransport { private static final Logger logger = LogManager.getLogger(NioHttpServerTransport.class); - public static final Setting NIO_HTTP_ACCEPTOR_COUNT = - intSetting("http.nio.acceptor_count", 1, 1, Setting.Property.NodeScope); - public static final Setting NIO_HTTP_WORKER_COUNT = - new Setting<>("http.nio.worker_count", - (s) -> Integer.toString(EsExecutors.numberOfProcessors(s) * 2), - (s) -> Setting.parseInt(s, 1, "http.nio.worker_count"), Setting.Property.NodeScope); - protected final PageCacheRecycler pageCacheRecycler; protected final NioCorsConfig corsConfig; + private final NioGroupFactory nioGroupFactory; protected final boolean tcpNoDelay; protected final boolean tcpKeepAlive; @@ -99,14 +89,15 @@ public class NioHttpServerTransport extends AbstractHttpServerTransport { protected final int tcpSendBufferSize; protected final int tcpReceiveBufferSize; - private NioGroup nioGroup; + private volatile NioGroup nioGroup; private ChannelFactory channelFactory; public NioHttpServerTransport(Settings settings, NetworkService networkService, BigArrays bigArrays, PageCacheRecycler pageCacheRecycler, ThreadPool threadPool, NamedXContentRegistry xContentRegistry, - Dispatcher dispatcher) { + Dispatcher dispatcher, NioGroupFactory nioGroupFactory) { super(settings, networkService, bigArrays, threadPool, xContentRegistry, dispatcher); this.pageCacheRecycler = pageCacheRecycler; + this.nioGroupFactory = nioGroupFactory; ByteSizeValue maxChunkSize = SETTING_HTTP_MAX_CHUNK_SIZE.get(settings); ByteSizeValue maxHeaderSize = SETTING_HTTP_MAX_HEADER_SIZE.get(settings); @@ -134,11 +125,7 @@ public class NioHttpServerTransport extends AbstractHttpServerTransport { protected void doStart() { boolean success = false; try { - int acceptorCount = NIO_HTTP_ACCEPTOR_COUNT.get(settings); - int workerCount = NIO_HTTP_WORKER_COUNT.get(settings); - nioGroup = new NioGroup(daemonThreadFactory(this.settings, HTTP_SERVER_ACCEPTOR_THREAD_NAME_PREFIX), acceptorCount, - daemonThreadFactory(this.settings, HTTP_SERVER_WORKER_THREAD_NAME_PREFIX), workerCount, - (s) -> new EventHandler(this::onNonChannelException, s)); + nioGroup = nioGroupFactory.getHttpGroup(); channelFactory = channelFactory(); bindServer(); success = true; diff --git a/plugins/transport-nio/src/main/java/org/elasticsearch/transport/nio/NioGroupFactory.java b/plugins/transport-nio/src/main/java/org/elasticsearch/transport/nio/NioGroupFactory.java new file mode 100644 index 00000000000..0c17c1d8b85 --- /dev/null +++ b/plugins/transport-nio/src/main/java/org/elasticsearch/transport/nio/NioGroupFactory.java @@ -0,0 +1,172 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.transport.nio; + +import org.apache.logging.log4j.Logger; +import org.apache.logging.log4j.message.ParameterizedMessage; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.concurrent.AbstractRefCounted; +import org.elasticsearch.http.HttpServerTransport; +import org.elasticsearch.nio.ChannelFactory; +import org.elasticsearch.nio.EventHandler; +import org.elasticsearch.nio.NioGroup; +import org.elasticsearch.nio.NioSelectorGroup; +import org.elasticsearch.nio.NioServerSocketChannel; +import org.elasticsearch.nio.NioSocketChannel; +import org.elasticsearch.transport.TcpTransport; + +import java.io.IOException; +import java.io.UncheckedIOException; +import java.net.InetSocketAddress; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.atomic.AtomicBoolean; + +import static org.elasticsearch.common.util.concurrent.EsExecutors.daemonThreadFactory; + +/** + * Creates and returns {@link NioSelectorGroup} instances. It will return a shared group for + * both {@link #getHttpGroup()} and {@link #getTransportGroup()} if + * {@link NioTransportPlugin#NIO_HTTP_WORKER_COUNT} is configured to be 0. If that setting is not 0, then it + * will return a different group in the {@link #getHttpGroup()} call. + */ +public final class NioGroupFactory { + + private final Logger logger; + private final Settings settings; + private final int httpWorkerCount; + + private RefCountedNioGroup refCountedGroup; + + public NioGroupFactory(Settings settings, Logger logger) { + this.logger = logger; + this.settings = settings; + this.httpWorkerCount = NioTransportPlugin.NIO_HTTP_WORKER_COUNT.get(settings); + } + + public Settings getSettings() { + return settings; + } + + public synchronized NioGroup getTransportGroup() throws IOException { + return getGenericGroup(); + } + + public synchronized NioGroup getHttpGroup() throws IOException { + if (httpWorkerCount == 0) { + return getGenericGroup(); + } else { + return new NioSelectorGroup(daemonThreadFactory(this.settings, HttpServerTransport.HTTP_SERVER_WORKER_THREAD_NAME_PREFIX), + httpWorkerCount, (s) -> new EventHandler(this::onException, s)); + } + } + + private NioGroup getGenericGroup() throws IOException { + if (refCountedGroup == null) { + ThreadFactory threadFactory = daemonThreadFactory(this.settings, TcpTransport.TRANSPORT_WORKER_THREAD_NAME_PREFIX); + NioSelectorGroup nioGroup = new NioSelectorGroup(threadFactory, NioTransportPlugin.NIO_WORKER_COUNT.get(settings), + (s) -> new EventHandler(this::onException, s)); + this.refCountedGroup = new RefCountedNioGroup(nioGroup); + return new WrappedNioGroup(refCountedGroup); + } else { + refCountedGroup.incRef(); + return new WrappedNioGroup(refCountedGroup); + } + } + + private void onException(Exception exception) { + logger.warn(new ParameterizedMessage("exception caught on transport layer [thread={}]", Thread.currentThread().getName()), + exception); + } + + private static class RefCountedNioGroup extends AbstractRefCounted implements NioGroup { + + public static final String NAME = "ref-counted-nio-group"; + private final NioSelectorGroup nioGroup; + + private RefCountedNioGroup(NioSelectorGroup nioGroup) { + super(NAME); + this.nioGroup = nioGroup; + } + + @Override + protected void closeInternal() { + try { + nioGroup.close(); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } + + @Override + public S bindServerChannel(InetSocketAddress address, ChannelFactory factory) + throws IOException { + return nioGroup.bindServerChannel(address, factory); + } + + @Override + public S openChannel(InetSocketAddress address, ChannelFactory factory) throws IOException { + return nioGroup.openChannel(address, factory); + } + + @Override + public void close() throws IOException { + throw new UnsupportedOperationException("Should not close. Instead use decRef call."); + } + } + + /** + * Wraps the {@link RefCountedNioGroup}. Calls {@link RefCountedNioGroup#decRef()} on close. After close, + * this wrapped instance can no longer be used. + */ + private class WrappedNioGroup implements NioGroup { + + private final RefCountedNioGroup refCountedNioGroup; + + private final AtomicBoolean isOpen = new AtomicBoolean(true); + + private WrappedNioGroup(RefCountedNioGroup refCountedNioGroup) { + this.refCountedNioGroup = refCountedNioGroup; + } + + public S bindServerChannel(InetSocketAddress address, ChannelFactory factory) + throws IOException { + ensureOpen(); + return refCountedNioGroup.bindServerChannel(address, factory); + } + + public S openChannel(InetSocketAddress address, ChannelFactory factory) throws IOException { + ensureOpen(); + return refCountedNioGroup.openChannel(address, factory); + } + + @Override + public void close() { + if (isOpen.compareAndSet(true, false)) { + refCountedNioGroup.decRef(); + } + } + + private void ensureOpen() { + if (isOpen.get() == false) { + throw new IllegalStateException("NioGroup is closed."); + } + } + } +} diff --git a/plugins/transport-nio/src/main/java/org/elasticsearch/transport/nio/NioTransport.java b/plugins/transport-nio/src/main/java/org/elasticsearch/transport/nio/NioTransport.java index c5f5b414c07..a9a1c716ef8 100644 --- a/plugins/transport-nio/src/main/java/org/elasticsearch/transport/nio/NioTransport.java +++ b/plugins/transport-nio/src/main/java/org/elasticsearch/transport/nio/NioTransport.java @@ -27,14 +27,11 @@ import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.network.NetworkService; import org.elasticsearch.common.recycler.Recycler; -import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.PageCacheRecycler; -import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.indices.breaker.CircuitBreakerService; import org.elasticsearch.nio.BytesChannelContext; import org.elasticsearch.nio.ChannelFactory; -import org.elasticsearch.nio.EventHandler; import org.elasticsearch.nio.InboundChannelBuffer; import org.elasticsearch.nio.NioGroup; import org.elasticsearch.nio.NioSelector; @@ -54,25 +51,21 @@ import java.util.function.Function; import java.util.function.Supplier; import static org.elasticsearch.common.util.concurrent.ConcurrentCollections.newConcurrentMap; -import static org.elasticsearch.common.util.concurrent.EsExecutors.daemonThreadFactory; public class NioTransport extends TcpTransport { private static final Logger logger = LogManager.getLogger(NioTransport.class); - public static final Setting NIO_WORKER_COUNT = - new Setting<>("transport.nio.worker_count", - (s) -> Integer.toString(EsExecutors.numberOfProcessors(s) * 2), - (s) -> Setting.parseInt(s, 1, "transport.nio.worker_count"), Setting.Property.NodeScope); - private final ConcurrentMap profileToChannelFactory = newConcurrentMap(); + private final NioGroupFactory groupFactory; private volatile NioGroup nioGroup; private volatile Function clientChannelFactory; protected NioTransport(Settings settings, Version version, ThreadPool threadPool, NetworkService networkService, PageCacheRecycler pageCacheRecycler, NamedWriteableRegistry namedWriteableRegistry, - CircuitBreakerService circuitBreakerService) { + CircuitBreakerService circuitBreakerService, NioGroupFactory groupFactory) { super("nio", settings, version, threadPool, pageCacheRecycler, circuitBreakerService, namedWriteableRegistry, networkService); + this.groupFactory = groupFactory; } @Override @@ -91,8 +84,7 @@ public class NioTransport extends TcpTransport { protected void doStart() { boolean success = false; try { - nioGroup = new NioGroup(daemonThreadFactory(this.settings, TcpTransport.TRANSPORT_WORKER_THREAD_NAME_PREFIX), - NioTransport.NIO_WORKER_COUNT.get(settings), (s) -> new EventHandler(this::onNonChannelException, s)); + nioGroup = groupFactory.getTransportGroup(); ProfileSettings clientProfileSettings = new ProfileSettings(settings, "default"); clientChannelFactory = clientChannelFactoryFunction(clientProfileSettings); diff --git a/plugins/transport-nio/src/main/java/org/elasticsearch/transport/nio/NioTransportPlugin.java b/plugins/transport-nio/src/main/java/org/elasticsearch/transport/nio/NioTransportPlugin.java index 9c73cbc5a66..647b53ad47a 100644 --- a/plugins/transport-nio/src/main/java/org/elasticsearch/transport/nio/NioTransportPlugin.java +++ b/plugins/transport-nio/src/main/java/org/elasticsearch/transport/nio/NioTransportPlugin.java @@ -19,6 +19,9 @@ package org.elasticsearch.transport.nio; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.apache.lucene.util.SetOnce; import org.elasticsearch.Version; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.network.NetworkService; @@ -26,6 +29,7 @@ import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.util.PageCacheRecycler; +import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.http.HttpServerTransport; import org.elasticsearch.http.nio.NioHttpServerTransport; @@ -41,17 +45,29 @@ import java.util.List; import java.util.Map; import java.util.function.Supplier; +import static org.elasticsearch.common.settings.Setting.intSetting; + public class NioTransportPlugin extends Plugin implements NetworkPlugin { public static final String NIO_TRANSPORT_NAME = "nio-transport"; public static final String NIO_HTTP_TRANSPORT_NAME = "nio-http-transport"; + private static final Logger logger = LogManager.getLogger(NioTransportPlugin.class); + + public static final Setting NIO_WORKER_COUNT = + new Setting<>("transport.nio.worker_count", + (s) -> Integer.toString(EsExecutors.numberOfProcessors(s) * 2), + (s) -> Setting.parseInt(s, 1, "transport.nio.worker_count"), Setting.Property.NodeScope); + public static final Setting NIO_HTTP_WORKER_COUNT = + intSetting("http.nio.worker_count", 0, 0, Setting.Property.NodeScope); + + private final SetOnce groupFactory = new SetOnce<>(); + @Override public List> getSettings() { return Arrays.asList( - NioHttpServerTransport.NIO_HTTP_ACCEPTOR_COUNT, - NioHttpServerTransport.NIO_HTTP_WORKER_COUNT, - NioTransport.NIO_WORKER_COUNT + NIO_HTTP_WORKER_COUNT, + NIO_WORKER_COUNT ); } @@ -61,7 +77,7 @@ public class NioTransportPlugin extends Plugin implements NetworkPlugin { NamedWriteableRegistry namedWriteableRegistry, NetworkService networkService) { return Collections.singletonMap(NIO_TRANSPORT_NAME, () -> new NioTransport(settings, Version.CURRENT, threadPool, networkService, pageCacheRecycler, namedWriteableRegistry, - circuitBreakerService)); + circuitBreakerService, getNioGroupFactory(settings))); } @Override @@ -73,6 +89,17 @@ public class NioTransportPlugin extends Plugin implements NetworkPlugin { HttpServerTransport.Dispatcher dispatcher) { return Collections.singletonMap(NIO_HTTP_TRANSPORT_NAME, () -> new NioHttpServerTransport(settings, networkService, bigArrays, pageCacheRecycler, threadPool, xContentRegistry, - dispatcher)); + dispatcher, getNioGroupFactory(settings))); + } + + private synchronized NioGroupFactory getNioGroupFactory(Settings settings) { + NioGroupFactory nioGroupFactory = groupFactory.get(); + if (nioGroupFactory != null) { + assert nioGroupFactory.getSettings().equals(settings) : "Different settings than originally provided"; + return nioGroupFactory; + } else { + groupFactory.set(new NioGroupFactory(settings, logger)); + return groupFactory.get(); + } } } diff --git a/plugins/transport-nio/src/test/java/org/elasticsearch/NioIntegTestCase.java b/plugins/transport-nio/src/test/java/org/elasticsearch/NioIntegTestCase.java index 703f7acbf82..9ed64213cbf 100644 --- a/plugins/transport-nio/src/test/java/org/elasticsearch/NioIntegTestCase.java +++ b/plugins/transport-nio/src/test/java/org/elasticsearch/NioIntegTestCase.java @@ -20,10 +20,8 @@ package org.elasticsearch; import org.elasticsearch.common.network.NetworkModule; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.http.nio.NioHttpServerTransport; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.test.ESIntegTestCase; -import org.elasticsearch.transport.nio.NioTransport; import org.elasticsearch.transport.nio.NioTransportPlugin; import java.util.Collection; @@ -46,8 +44,8 @@ public abstract class NioIntegTestCase extends ESIntegTestCase { Settings.Builder builder = Settings.builder().put(super.nodeSettings(nodeOrdinal)); // randomize nio settings if (randomBoolean()) { - builder.put(NioTransport.NIO_WORKER_COUNT.getKey(), random().nextInt(3) + 1); - builder.put(NioHttpServerTransport.NIO_HTTP_WORKER_COUNT.getKey(), random().nextInt(3) + 1); + builder.put(NioTransportPlugin.NIO_WORKER_COUNT.getKey(), random().nextInt(3) + 1); + builder.put(NioTransportPlugin.NIO_HTTP_WORKER_COUNT.getKey(), random().nextInt(3) + 1); } builder.put(NetworkModule.TRANSPORT_TYPE_KEY, NioTransportPlugin.NIO_TRANSPORT_NAME); builder.put(NetworkModule.HTTP_TYPE_KEY, NioTransportPlugin.NIO_HTTP_TRANSPORT_NAME); diff --git a/plugins/transport-nio/src/test/java/org/elasticsearch/http/nio/NioHttpClient.java b/plugins/transport-nio/src/test/java/org/elasticsearch/http/nio/NioHttpClient.java index b49d5b51866..e3259b10b97 100644 --- a/plugins/transport-nio/src/test/java/org/elasticsearch/http/nio/NioHttpClient.java +++ b/plugins/transport-nio/src/test/java/org/elasticsearch/http/nio/NioHttpClient.java @@ -45,7 +45,7 @@ import org.elasticsearch.nio.ChannelFactory; import org.elasticsearch.nio.EventHandler; import org.elasticsearch.nio.FlushOperation; import org.elasticsearch.nio.InboundChannelBuffer; -import org.elasticsearch.nio.NioGroup; +import org.elasticsearch.nio.NioSelectorGroup; import org.elasticsearch.nio.NioSelector; import org.elasticsearch.nio.NioServerSocketChannel; import org.elasticsearch.nio.NioSocketChannel; @@ -88,11 +88,11 @@ class NioHttpClient implements Closeable { private static final Logger logger = LogManager.getLogger(NioHttpClient.class); - private final NioGroup nioGroup; + private final NioSelectorGroup nioGroup; NioHttpClient() { try { - nioGroup = new NioGroup(daemonThreadFactory(Settings.EMPTY, "nio-http-client"), 1, + nioGroup = new NioSelectorGroup(daemonThreadFactory(Settings.EMPTY, "nio-http-client"), 1, (s) -> new EventHandler(this::onException, s)); } catch (IOException e) { throw new UncheckedIOException(e); diff --git a/plugins/transport-nio/src/test/java/org/elasticsearch/http/nio/NioHttpServerTransportTests.java b/plugins/transport-nio/src/test/java/org/elasticsearch/http/nio/NioHttpServerTransportTests.java index 13b8e60336e..2ffd5a64147 100644 --- a/plugins/transport-nio/src/test/java/org/elasticsearch/http/nio/NioHttpServerTransportTests.java +++ b/plugins/transport-nio/src/test/java/org/elasticsearch/http/nio/NioHttpServerTransportTests.java @@ -55,6 +55,7 @@ import org.elasticsearch.rest.RestRequest; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.threadpool.TestThreadPool; import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.nio.NioGroupFactory; import org.junit.After; import org.junit.Before; @@ -202,7 +203,7 @@ public class NioHttpServerTransportTests extends ESTestCase { } }; try (NioHttpServerTransport transport = new NioHttpServerTransport(settings, networkService, bigArrays, pageRecycler, threadPool, - xContentRegistry(), dispatcher)) { + xContentRegistry(), dispatcher, new NioGroupFactory(settings, logger))) { transport.start(); final TransportAddress remoteAddress = randomFrom(transport.boundAddress().boundAddresses()); try (NioHttpClient client = new NioHttpClient()) { @@ -235,12 +236,12 @@ public class NioHttpServerTransportTests extends ESTestCase { public void testBindUnavailableAddress() { try (NioHttpServerTransport transport = new NioHttpServerTransport(Settings.EMPTY, networkService, bigArrays, pageRecycler, - threadPool, xContentRegistry(), new NullDispatcher())) { + threadPool, xContentRegistry(), new NullDispatcher(), new NioGroupFactory(Settings.EMPTY, logger))) { transport.start(); TransportAddress remoteAddress = randomFrom(transport.boundAddress().boundAddresses()); Settings settings = Settings.builder().put("http.port", remoteAddress.getPort()).build(); try (NioHttpServerTransport otherTransport = new NioHttpServerTransport(settings, networkService, bigArrays, pageRecycler, - threadPool, xContentRegistry(), new NullDispatcher())) { + threadPool, xContentRegistry(), new NullDispatcher(), new NioGroupFactory(Settings.EMPTY, logger))) { BindHttpException bindHttpException = expectThrows(BindHttpException.class, () -> otherTransport.start()); assertEquals("Failed to bind to [" + remoteAddress.getPort() + "]", bindHttpException.getMessage()); } @@ -284,7 +285,7 @@ public class NioHttpServerTransportTests extends ESTestCase { } try (NioHttpServerTransport transport = new NioHttpServerTransport(settings, networkService, bigArrays, pageRecycler, - threadPool, xContentRegistry(), dispatcher)) { + threadPool, xContentRegistry(), dispatcher, new NioGroupFactory(settings, logger))) { transport.start(); final TransportAddress remoteAddress = randomFrom(transport.boundAddress().boundAddresses()); diff --git a/plugins/transport-nio/src/test/java/org/elasticsearch/transport/nio/NioGroupFactoryTests.java b/plugins/transport-nio/src/test/java/org/elasticsearch/transport/nio/NioGroupFactoryTests.java new file mode 100644 index 00000000000..356eb39b734 --- /dev/null +++ b/plugins/transport-nio/src/test/java/org/elasticsearch/transport/nio/NioGroupFactoryTests.java @@ -0,0 +1,78 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.transport.nio; + +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.nio.ChannelFactory; +import org.elasticsearch.nio.NioGroup; +import org.elasticsearch.nio.NioSelector; +import org.elasticsearch.nio.NioServerSocketChannel; +import org.elasticsearch.nio.NioSocketChannel; +import org.elasticsearch.nio.ServerChannelContext; +import org.elasticsearch.test.ESTestCase; + +import java.io.IOException; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.nio.channels.ServerSocketChannel; +import java.nio.channels.SocketChannel; +import java.util.function.Consumer; + +public class NioGroupFactoryTests extends ESTestCase { + + public void testSharedGroupStillWorksWhenOneInstanceClosed() throws IOException { + NioGroupFactory groupFactory = new NioGroupFactory(Settings.EMPTY, logger); + + InetSocketAddress inetSocketAddress = new InetSocketAddress(InetAddress.getLoopbackAddress(), 0); + NioGroup httpGroup = groupFactory.getHttpGroup(); + try { + NioGroup transportGroup = groupFactory.getTransportGroup(); + transportGroup.close(); + expectThrows(IllegalStateException.class, () -> transportGroup.bindServerChannel(inetSocketAddress, new BindingFactory())); + + httpGroup.bindServerChannel(inetSocketAddress, new BindingFactory()); + } finally { + httpGroup.close(); + } + expectThrows(IllegalStateException.class, () -> httpGroup.bindServerChannel(inetSocketAddress, new BindingFactory())); + } + + private static class BindingFactory extends ChannelFactory { + + private BindingFactory() { + super(new ChannelFactory.RawChannelFactory(false, false, false, -1, -1)); + } + + @Override + public NioSocketChannel createChannel(NioSelector selector, SocketChannel channel) throws IOException { + throw new IOException("boom"); + } + + @Override + public NioServerSocketChannel createServerChannel(NioSelector selector, ServerSocketChannel channel) throws IOException { + NioServerSocketChannel nioChannel = new NioServerSocketChannel(channel); + Consumer exceptionHandler = (e) -> {}; + Consumer acceptor = (c) -> {}; + ServerChannelContext context = new ServerChannelContext(nioChannel, this, selector, acceptor, exceptionHandler); + nioChannel.setContext(context); + return nioChannel; + } + } +} diff --git a/plugins/transport-nio/src/test/java/org/elasticsearch/transport/nio/NioTransportIT.java b/plugins/transport-nio/src/test/java/org/elasticsearch/transport/nio/NioTransportIT.java index 61f07aa0162..334e63dc0bf 100644 --- a/plugins/transport-nio/src/test/java/org/elasticsearch/transport/nio/NioTransportIT.java +++ b/plugins/transport-nio/src/test/java/org/elasticsearch/transport/nio/NioTransportIT.java @@ -18,6 +18,8 @@ */ package org.elasticsearch.transport.nio; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.NioIntegTestCase; import org.elasticsearch.Version; @@ -54,6 +56,7 @@ import static org.hamcrest.Matchers.is; @ClusterScope(scope = Scope.TEST, supportsDedicatedMasters = false, numDataNodes = 1) public class NioTransportIT extends NioIntegTestCase { + // static so we can use it in anonymous classes private static String channelProfileName = null; @@ -86,6 +89,8 @@ public class NioTransportIT extends NioIntegTestCase { public static final class ExceptionThrowingNioTransport extends NioTransport { + private static final Logger logger = LogManager.getLogger(ExceptionThrowingNioTransport.class); + public static class TestPlugin extends Plugin implements NetworkPlugin { @Override @@ -103,7 +108,8 @@ public class NioTransportIT extends NioIntegTestCase { ExceptionThrowingNioTransport(Settings settings, ThreadPool threadPool, NetworkService networkService, PageCacheRecycler pageCacheRecycler, NamedWriteableRegistry namedWriteableRegistry, CircuitBreakerService circuitBreakerService) { - super(settings, Version.CURRENT, threadPool, networkService, pageCacheRecycler, namedWriteableRegistry, circuitBreakerService); + super(settings, Version.CURRENT, threadPool, networkService, pageCacheRecycler, namedWriteableRegistry, circuitBreakerService, + new NioGroupFactory(settings, logger)); } @Override diff --git a/plugins/transport-nio/src/test/java/org/elasticsearch/transport/nio/SimpleNioTransportTests.java b/plugins/transport-nio/src/test/java/org/elasticsearch/transport/nio/SimpleNioTransportTests.java index 82a99103627..a78e924298a 100644 --- a/plugins/transport-nio/src/test/java/org/elasticsearch/transport/nio/SimpleNioTransportTests.java +++ b/plugins/transport-nio/src/test/java/org/elasticsearch/transport/nio/SimpleNioTransportTests.java @@ -52,12 +52,12 @@ import static org.hamcrest.Matchers.instanceOf; public class SimpleNioTransportTests extends AbstractSimpleTransportTestCase { - public static MockTransportService nioFromThreadPool(Settings settings, ThreadPool threadPool, final Version version, + public MockTransportService nioFromThreadPool(Settings settings, ThreadPool threadPool, final Version version, ClusterSettings clusterSettings, boolean doHandshake) { NamedWriteableRegistry namedWriteableRegistry = new NamedWriteableRegistry(Collections.emptyList()); NetworkService networkService = new NetworkService(Collections.emptyList()); Transport transport = new NioTransport(settings, version, threadPool, networkService, new MockPageCacheRecycler(settings), - namedWriteableRegistry, new NoneCircuitBreakerService()) { + namedWriteableRegistry, new NoneCircuitBreakerService(), new NioGroupFactory(settings, logger)) { @Override public void executeHandshake(DiscoveryNode node, TcpChannel channel, ConnectionProfile profile, diff --git a/server/src/main/java/org/elasticsearch/http/AbstractHttpServerTransport.java b/server/src/main/java/org/elasticsearch/http/AbstractHttpServerTransport.java index 92f305cb67a..b79a5e77309 100644 --- a/server/src/main/java/org/elasticsearch/http/AbstractHttpServerTransport.java +++ b/server/src/main/java/org/elasticsearch/http/AbstractHttpServerTransport.java @@ -278,16 +278,6 @@ public abstract class AbstractHttpServerTransport extends AbstractLifecycleCompo logger.error(new ParameterizedMessage("exception from http server channel caught on transport layer [channel={}]", channel), e); } - /** - * Exception handler for exceptions that are not associated with a specific channel. - * - * @param exception the exception - */ - protected void onNonChannelException(Exception exception) { - String threadName = Thread.currentThread().getName(); - logger.warn(new ParameterizedMessage("exception caught on transport layer [thread={}]", threadName), exception); - } - protected void serverAcceptedChannel(HttpChannel httpChannel) { boolean addedOnThisCall = httpChannels.add(httpChannel); assert addedOnThisCall : "Channel should only be added to http channel set once"; diff --git a/server/src/main/java/org/elasticsearch/http/HttpServerTransport.java b/server/src/main/java/org/elasticsearch/http/HttpServerTransport.java index 0ce8edcf3b6..de345a39fd6 100644 --- a/server/src/main/java/org/elasticsearch/http/HttpServerTransport.java +++ b/server/src/main/java/org/elasticsearch/http/HttpServerTransport.java @@ -29,8 +29,6 @@ public interface HttpServerTransport extends LifecycleComponent { String HTTP_SERVER_WORKER_THREAD_NAME_PREFIX = "http_server_worker"; - String HTTP_SERVER_ACCEPTOR_THREAD_NAME_PREFIX = "http_server_acceptor"; - BoundTransportAddress boundAddress(); HttpInfo info(); diff --git a/server/src/main/java/org/elasticsearch/transport/TcpTransport.java b/server/src/main/java/org/elasticsearch/transport/TcpTransport.java index dd6a21acc8f..e0d939b37a3 100644 --- a/server/src/main/java/org/elasticsearch/transport/TcpTransport.java +++ b/server/src/main/java/org/elasticsearch/transport/TcpTransport.java @@ -660,16 +660,6 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements logger.error(new ParameterizedMessage("exception from server channel caught on transport layer [channel={}]", channel), e); } - /** - * Exception handler for exceptions that are not associated with a specific channel. - * - * @param exception the exception - */ - protected void onNonChannelException(Exception exception) { - logger.warn(new ParameterizedMessage("exception caught on transport layer [thread={}]", Thread.currentThread().getName()), - exception); - } - protected void serverAcceptedChannel(TcpChannel channel) { boolean addedOnThisCall = acceptedChannels.add(channel); assert addedOnThisCall : "Channel should only be added to accepted channel set once"; diff --git a/server/src/main/java/org/elasticsearch/transport/Transports.java b/server/src/main/java/org/elasticsearch/transport/Transports.java index c531d33224a..d6968a85536 100644 --- a/server/src/main/java/org/elasticsearch/transport/Transports.java +++ b/server/src/main/java/org/elasticsearch/transport/Transports.java @@ -38,7 +38,6 @@ public enum Transports { final String threadName = t.getName(); for (String s : Arrays.asList( HttpServerTransport.HTTP_SERVER_WORKER_THREAD_NAME_PREFIX, - HttpServerTransport.HTTP_SERVER_ACCEPTOR_THREAD_NAME_PREFIX, TcpTransport.TRANSPORT_WORKER_THREAD_NAME_PREFIX, TEST_MOCK_TRANSPORT_THREAD_PREFIX)) { if (threadName.contains(s)) { diff --git a/test/framework/src/main/java/org/elasticsearch/transport/nio/MockNioTransport.java b/test/framework/src/main/java/org/elasticsearch/transport/nio/MockNioTransport.java index cbadcfa1f38..a7dbce45d3c 100644 --- a/test/framework/src/main/java/org/elasticsearch/transport/nio/MockNioTransport.java +++ b/test/framework/src/main/java/org/elasticsearch/transport/nio/MockNioTransport.java @@ -37,7 +37,7 @@ import org.elasticsearch.nio.BytesChannelContext; import org.elasticsearch.nio.BytesWriteHandler; import org.elasticsearch.nio.ChannelFactory; import org.elasticsearch.nio.InboundChannelBuffer; -import org.elasticsearch.nio.NioGroup; +import org.elasticsearch.nio.NioSelectorGroup; import org.elasticsearch.nio.NioSelector; import org.elasticsearch.nio.NioServerSocketChannel; import org.elasticsearch.nio.NioSocketChannel; @@ -69,7 +69,7 @@ public class MockNioTransport extends TcpTransport { private static final Logger logger = LogManager.getLogger(MockNioTransport.class); private final ConcurrentMap profileToChannelFactory = newConcurrentMap(); - private volatile NioGroup nioGroup; + private volatile NioSelectorGroup nioGroup; private volatile MockTcpChannelFactory clientChannelFactory; public MockNioTransport(Settings settings, Version version, ThreadPool threadPool, NetworkService networkService, @@ -94,7 +94,7 @@ public class MockNioTransport extends TcpTransport { protected void doStart() { boolean success = false; try { - nioGroup = new NioGroup(daemonThreadFactory(this.settings, TcpTransport.TRANSPORT_WORKER_THREAD_NAME_PREFIX), 2, + nioGroup = new NioSelectorGroup(daemonThreadFactory(this.settings, TcpTransport.TRANSPORT_WORKER_THREAD_NAME_PREFIX), 2, (s) -> new TestingSocketEventHandler(this::onNonChannelException, s)); ProfileSettings clientProfileSettings = new ProfileSettings(settings, "default"); @@ -160,6 +160,11 @@ public class MockNioTransport extends TcpTransport { return builder.build(); } + private void onNonChannelException(Exception exception) { + logger.warn(new ParameterizedMessage("exception caught on transport layer [thread={}]", Thread.currentThread().getName()), + exception); + } + private void exceptionCaught(NioSocketChannel channel, Exception exception) { onException((TcpChannel) channel, exception); } diff --git a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/Security.java b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/Security.java index 714da7cf11c..86f615a259f 100644 --- a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/Security.java +++ b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/Security.java @@ -76,6 +76,7 @@ import org.elasticsearch.transport.Transport; import org.elasticsearch.transport.TransportInterceptor; import org.elasticsearch.transport.TransportRequest; import org.elasticsearch.transport.TransportRequestHandler; +import org.elasticsearch.transport.nio.NioGroupFactory; import org.elasticsearch.watcher.ResourceWatcherService; import org.elasticsearch.xpack.core.XPackField; import org.elasticsearch.xpack.core.XPackPlugin; @@ -286,6 +287,7 @@ public class Security extends Plugin implements ActionPlugin, IngestPlugin, Netw private final SetOnce securityActionFilter = new SetOnce<>(); private final SetOnce securityIndex = new SetOnce<>(); private final SetOnce indexAuditTrail = new SetOnce<>(); + private final SetOnce groupFactory = new SetOnce<>(); private final List bootstrapChecks; private final List securityExtensions = new ArrayList<>(); @@ -943,11 +945,12 @@ public class Security extends Plugin implements ActionPlugin, IngestPlugin, Netw return Collections.emptyMap(); } + IPFilter ipFilter = this.ipFilter.get(); Map> transports = new HashMap<>(); transports.put(SecurityField.NAME4, () -> new SecurityNetty4ServerTransport(settings, Version.CURRENT, threadPool, - networkService, pageCacheRecycler, namedWriteableRegistry, circuitBreakerService, ipFilter.get(), getSslService())); - transports.put(SecurityField.NIO, () -> new SecurityNioTransport(settings, Version.CURRENT, threadPool, - networkService, pageCacheRecycler, namedWriteableRegistry, circuitBreakerService, ipFilter.get(), getSslService())); + networkService, pageCacheRecycler, namedWriteableRegistry, circuitBreakerService, ipFilter, getSslService())); + transports.put(SecurityField.NIO, () -> new SecurityNioTransport(settings, Version.CURRENT, threadPool, networkService, + pageCacheRecycler, namedWriteableRegistry, circuitBreakerService, ipFilter, getSslService(), getNioGroupFactory(settings))); return Collections.unmodifiableMap(transports); } @@ -967,7 +970,7 @@ public class Security extends Plugin implements ActionPlugin, IngestPlugin, Netw httpTransports.put(SecurityField.NAME4, () -> new SecurityNetty4HttpServerTransport(settings, networkService, bigArrays, ipFilter.get(), getSslService(), threadPool, xContentRegistry, dispatcher)); httpTransports.put(SecurityField.NIO, () -> new SecurityNioHttpServerTransport(settings, networkService, bigArrays, - pageCacheRecycler, threadPool, xContentRegistry, dispatcher, ipFilter.get(), getSslService())); + pageCacheRecycler, threadPool, xContentRegistry, dispatcher, ipFilter.get(), getSslService(), getNioGroupFactory(settings))); return httpTransports; } @@ -1122,4 +1125,14 @@ public class Security extends Plugin implements ActionPlugin, IngestPlugin, Netw public void reloadSPI(ClassLoader loader) { securityExtensions.addAll(SecurityExtension.loadExtensions(loader)); } + + private synchronized NioGroupFactory getNioGroupFactory(Settings settings) { + if (groupFactory.get() != null) { + assert groupFactory.get().getSettings().equals(settings) : "Different settings than originally provided"; + return groupFactory.get(); + } else { + groupFactory.set(new NioGroupFactory(settings, logger)); + return groupFactory.get(); + } + } } diff --git a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/transport/nio/SecurityNioHttpServerTransport.java b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/transport/nio/SecurityNioHttpServerTransport.java index a2dab36be7e..9e0da251883 100644 --- a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/transport/nio/SecurityNioHttpServerTransport.java +++ b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/transport/nio/SecurityNioHttpServerTransport.java @@ -25,6 +25,7 @@ import org.elasticsearch.nio.NioSocketChannel; import org.elasticsearch.nio.ServerChannelContext; import org.elasticsearch.nio.SocketChannelContext; import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.nio.NioGroupFactory; import org.elasticsearch.xpack.core.ssl.SSLConfiguration; import org.elasticsearch.xpack.core.ssl.SSLService; import org.elasticsearch.xpack.security.transport.SecurityHttpExceptionHandler; @@ -54,8 +55,8 @@ public class SecurityNioHttpServerTransport extends NioHttpServerTransport { public SecurityNioHttpServerTransport(Settings settings, NetworkService networkService, BigArrays bigArrays, PageCacheRecycler pageCacheRecycler, ThreadPool threadPool, NamedXContentRegistry xContentRegistry, Dispatcher dispatcher, IPFilter ipFilter, - SSLService sslService) { - super(settings, networkService, bigArrays, pageCacheRecycler, threadPool, xContentRegistry, dispatcher); + SSLService sslService, NioGroupFactory nioGroupFactory) { + super(settings, networkService, bigArrays, pageCacheRecycler, threadPool, xContentRegistry, dispatcher, nioGroupFactory); this.securityExceptionHandler = new SecurityHttpExceptionHandler(logger, lifecycle, (c, e) -> super.onException(c, e)); this.ipFilter = ipFilter; this.nioIpFilter = new NioIPFilter(ipFilter, IPFilter.HTTP_PROFILE_NAME); diff --git a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/transport/nio/SecurityNioTransport.java b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/transport/nio/SecurityNioTransport.java index 7fdf946675f..a5b64c84c43 100644 --- a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/transport/nio/SecurityNioTransport.java +++ b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/transport/nio/SecurityNioTransport.java @@ -29,6 +29,7 @@ import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.ConnectTransportException; import org.elasticsearch.transport.TcpChannel; import org.elasticsearch.transport.TransportSettings; +import org.elasticsearch.transport.nio.NioGroupFactory; import org.elasticsearch.transport.nio.NioTcpChannel; import org.elasticsearch.transport.nio.NioTcpServerChannel; import org.elasticsearch.transport.nio.NioTransport; @@ -76,8 +77,9 @@ public class SecurityNioTransport extends NioTransport { public SecurityNioTransport(Settings settings, Version version, ThreadPool threadPool, NetworkService networkService, PageCacheRecycler pageCacheRecycler, NamedWriteableRegistry namedWriteableRegistry, CircuitBreakerService circuitBreakerService, @Nullable final IPFilter authenticator, - SSLService sslService) { - super(settings, version, threadPool, networkService, pageCacheRecycler, namedWriteableRegistry, circuitBreakerService); + SSLService sslService, NioGroupFactory groupFactory) { + super(settings, version, threadPool, networkService, pageCacheRecycler, namedWriteableRegistry, circuitBreakerService, + groupFactory); this.authenticator = authenticator; this.sslService = sslService; this.sslEnabled = XPackSettings.TRANSPORT_SSL_ENABLED.get(settings); diff --git a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/transport/nio/SecurityNioHttpServerTransportTests.java b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/transport/nio/SecurityNioHttpServerTransportTests.java index 2f26456112a..76048590cea 100644 --- a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/transport/nio/SecurityNioHttpServerTransportTests.java +++ b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/transport/nio/SecurityNioHttpServerTransportTests.java @@ -17,6 +17,7 @@ import org.elasticsearch.http.nio.NioHttpChannel; import org.elasticsearch.nio.NioSelector; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.nio.NioGroupFactory; import org.elasticsearch.xpack.core.XPackSettings; import org.elasticsearch.xpack.core.ssl.SSLClientAuth; import org.elasticsearch.xpack.core.ssl.SSLService; @@ -46,6 +47,7 @@ public class SecurityNioHttpServerTransportTests extends ESTestCase { private SSLService sslService; private Environment env; private InetSocketAddress address = new InetSocketAddress(InetAddress.getLoopbackAddress(), 0); + private NioGroupFactory nioGroupFactory; @Before public void createSSLService() { @@ -67,10 +69,11 @@ public class SecurityNioHttpServerTransportTests extends ESTestCase { Settings settings = Settings.builder() .put(env.settings()) .put(XPackSettings.HTTP_SSL_ENABLED.getKey(), true).build(); + nioGroupFactory = new NioGroupFactory(settings, logger); sslService = new SSLService(settings, env); SecurityNioHttpServerTransport transport = new SecurityNioHttpServerTransport(settings, new NetworkService(Collections.emptyList()), mock(BigArrays.class), mock(PageCacheRecycler.class), mock(ThreadPool.class), - xContentRegistry(), new NullDispatcher(), mock(IPFilter.class), sslService); + xContentRegistry(), new NullDispatcher(), mock(IPFilter.class), sslService, nioGroupFactory); SecurityNioHttpServerTransport.SecurityHttpChannelFactory factory = transport.channelFactory(); SocketChannel socketChannel = mock(SocketChannel.class); when(socketChannel.getRemoteAddress()).thenReturn(address); @@ -88,9 +91,10 @@ public class SecurityNioHttpServerTransportTests extends ESTestCase { .put(XPackSettings.HTTP_SSL_ENABLED.getKey(), true) .put("xpack.security.http.ssl.client_authentication", value).build(); sslService = new SSLService(settings, env); + nioGroupFactory = new NioGroupFactory(settings, logger); SecurityNioHttpServerTransport transport = new SecurityNioHttpServerTransport(settings, new NetworkService(Collections.emptyList()), mock(BigArrays.class), mock(PageCacheRecycler.class), mock(ThreadPool.class), - xContentRegistry(), new NullDispatcher(), mock(IPFilter.class), sslService); + xContentRegistry(), new NullDispatcher(), mock(IPFilter.class), sslService, nioGroupFactory); SecurityNioHttpServerTransport.SecurityHttpChannelFactory factory = transport.channelFactory(); SocketChannel socketChannel = mock(SocketChannel.class); @@ -107,10 +111,11 @@ public class SecurityNioHttpServerTransportTests extends ESTestCase { .put(env.settings()) .put(XPackSettings.HTTP_SSL_ENABLED.getKey(), true) .put("xpack.security.http.ssl.client_authentication", value).build(); + nioGroupFactory = new NioGroupFactory(settings, logger); sslService = new SSLService(settings, env); SecurityNioHttpServerTransport transport = new SecurityNioHttpServerTransport(settings, new NetworkService(Collections.emptyList()), mock(BigArrays.class), mock(PageCacheRecycler.class), mock(ThreadPool.class), - xContentRegistry(), new NullDispatcher(), mock(IPFilter.class), sslService); + xContentRegistry(), new NullDispatcher(), mock(IPFilter.class), sslService, nioGroupFactory); SecurityNioHttpServerTransport.SecurityHttpChannelFactory factory = transport.channelFactory(); SocketChannel socketChannel = mock(SocketChannel.class); @@ -128,9 +133,10 @@ public class SecurityNioHttpServerTransportTests extends ESTestCase { .put(XPackSettings.HTTP_SSL_ENABLED.getKey(), true) .put("xpack.security.http.ssl.client_authentication", value).build(); sslService = new SSLService(settings, env); + nioGroupFactory = new NioGroupFactory(settings, logger); SecurityNioHttpServerTransport transport = new SecurityNioHttpServerTransport(settings, new NetworkService(Collections.emptyList()), mock(BigArrays.class), mock(PageCacheRecycler.class), mock(ThreadPool.class), - xContentRegistry(), new NullDispatcher(), mock(IPFilter.class), sslService); + xContentRegistry(), new NullDispatcher(), mock(IPFilter.class), sslService, nioGroupFactory); SecurityNioHttpServerTransport.SecurityHttpChannelFactory factory = transport.channelFactory(); SocketChannel socketChannel = mock(SocketChannel.class); @@ -146,9 +152,10 @@ public class SecurityNioHttpServerTransportTests extends ESTestCase { .put(env.settings()) .put(XPackSettings.HTTP_SSL_ENABLED.getKey(), true).build(); sslService = new SSLService(settings, env); + nioGroupFactory = new NioGroupFactory(settings, logger); SecurityNioHttpServerTransport transport = new SecurityNioHttpServerTransport(settings, new NetworkService(Collections.emptyList()), mock(BigArrays.class), mock(PageCacheRecycler.class), mock(ThreadPool.class), - xContentRegistry(), new NullDispatcher(), mock(IPFilter.class), sslService); + xContentRegistry(), new NullDispatcher(), mock(IPFilter.class), sslService, nioGroupFactory); SecurityNioHttpServerTransport.SecurityHttpChannelFactory factory = transport.channelFactory(); SocketChannel socketChannel = mock(SocketChannel.class); when(socketChannel.getRemoteAddress()).thenReturn(address); @@ -161,9 +168,10 @@ public class SecurityNioHttpServerTransportTests extends ESTestCase { .put("xpack.security.http.ssl.supported_protocols", "TLSv1.2") .build(); sslService = new SSLService(settings, TestEnvironment.newEnvironment(settings)); + nioGroupFactory = new NioGroupFactory(settings, logger); transport = new SecurityNioHttpServerTransport(settings, new NetworkService(Collections.emptyList()), mock(BigArrays.class), mock(PageCacheRecycler.class), mock(ThreadPool.class), - xContentRegistry(), new NullDispatcher(), mock(IPFilter.class), sslService); + xContentRegistry(), new NullDispatcher(), mock(IPFilter.class), sslService, nioGroupFactory); factory = transport.channelFactory(); channel = factory.createChannel(mock(NioSelector.class), socketChannel); SSLEngine customEngine = SSLEngineUtils.getSSLEngine(channel); @@ -183,11 +191,12 @@ public class SecurityNioHttpServerTransportTests extends ESTestCase { .build(); env = TestEnvironment.newEnvironment(settings); sslService = new SSLService(settings, env); + nioGroupFactory = new NioGroupFactory(settings, logger); IllegalArgumentException e = expectThrows(IllegalArgumentException.class, () -> new SecurityNioHttpServerTransport(settings, new NetworkService(Collections.emptyList()), mock(BigArrays.class), mock(PageCacheRecycler.class), mock(ThreadPool.class), - xContentRegistry(), new NullDispatcher(), mock(IPFilter.class), sslService)); + xContentRegistry(), new NullDispatcher(), mock(IPFilter.class), sslService, nioGroupFactory)); assertThat(e.getMessage(), containsString("key must be provided")); } @@ -202,8 +211,9 @@ public class SecurityNioHttpServerTransportTests extends ESTestCase { .build(); env = TestEnvironment.newEnvironment(settings); sslService = new SSLService(settings, env); + nioGroupFactory = new NioGroupFactory(settings, logger); SecurityNioHttpServerTransport transport = new SecurityNioHttpServerTransport(settings, new NetworkService(Collections.emptyList()), mock(BigArrays.class), mock(PageCacheRecycler.class), mock(ThreadPool.class), - xContentRegistry(), new NullDispatcher(), mock(IPFilter.class), sslService); + xContentRegistry(), new NullDispatcher(), mock(IPFilter.class), sslService, nioGroupFactory); } } diff --git a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/transport/nio/SimpleSecurityNioTransportTests.java b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/transport/nio/SimpleSecurityNioTransportTests.java index 84d68ffd63e..a30e1329432 100644 --- a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/transport/nio/SimpleSecurityNioTransportTests.java +++ b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/transport/nio/SimpleSecurityNioTransportTests.java @@ -20,6 +20,7 @@ import org.elasticsearch.transport.ConnectionProfile; import org.elasticsearch.transport.TcpChannel; import org.elasticsearch.transport.Transport; import org.elasticsearch.transport.TransportSettings; +import org.elasticsearch.transport.nio.NioGroupFactory; import org.elasticsearch.xpack.security.transport.AbstractSimpleSecurityTransportTestCase; import java.util.Collections; @@ -34,7 +35,8 @@ public class SimpleSecurityNioTransportTests extends AbstractSimpleSecurityTrans .put(settings) .put("xpack.security.transport.ssl.enabled", true).build(); Transport transport = new SecurityNioTransport(settings1, version, threadPool, networkService, new MockPageCacheRecycler(settings), - namedWriteableRegistry, new NoneCircuitBreakerService(), null, createSSLService(settings1)) { + namedWriteableRegistry, new NoneCircuitBreakerService(), null, createSSLService(settings1), + new NioGroupFactory(settings, logger)) { @Override public void executeHandshake(DiscoveryNode node, TcpChannel channel, ConnectionProfile profile,