diff --git a/libs/nio/src/main/java/org/elasticsearch/nio/NioGroup.java b/libs/nio/src/main/java/org/elasticsearch/nio/NioGroup.java index fe1bc1cf404..abb46cf3aea 100644 --- a/libs/nio/src/main/java/org/elasticsearch/nio/NioGroup.java +++ b/libs/nio/src/main/java/org/elasticsearch/nio/NioGroup.java @@ -19,161 +19,26 @@ package org.elasticsearch.nio; -import org.elasticsearch.nio.utils.ExceptionsHelper; - +import java.io.Closeable; import java.io.IOException; import java.net.InetSocketAddress; -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.ThreadFactory; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.function.Function; -import java.util.function.Supplier; -import java.util.stream.Collectors; -import java.util.stream.Stream; /** - * The NioGroup is a group of selectors for interfacing with java nio. When it is started it will create the - * configured number of selectors. Each selector will be running in a dedicated thread. Server connections - * can be bound using the {@link #bindServerChannel(InetSocketAddress, ChannelFactory)} method. Client - * connections can be opened using the {@link #openChannel(InetSocketAddress, ChannelFactory)} method. - *

- * The logic specific to a particular channel is provided by the {@link ChannelFactory} passed to the method - * when the channel is created. This is what allows an NioGroup to support different channel types. + * An class for interfacing with java.nio. Implementations implement the underlying logic for opening + * channels and registering them with the OS. */ -public class NioGroup implements AutoCloseable { - - - private final List dedicatedAcceptors; - private final RoundRobinSupplier acceptorSupplier; - - private final List selectors; - private final RoundRobinSupplier selectorSupplier; - - private final AtomicBoolean isOpen = new AtomicBoolean(true); +public interface NioGroup extends Closeable { /** - * This will create an NioGroup with no dedicated acceptors. All server channels will be handled by the - * same selectors that are handling child channels. - * - * @param threadFactory factory to create selector threads - * @param selectorCount the number of selectors to be created - * @param eventHandlerFunction function for creating event handlers - * @throws IOException occurs if there is a problem while opening a java.nio.Selector + * Opens and binds a server channel to accept incoming connections. */ - public NioGroup(ThreadFactory threadFactory, int selectorCount, Function, EventHandler> eventHandlerFunction) - throws IOException { - this(null, 0, threadFactory, selectorCount, eventHandlerFunction); - } + S bindServerChannel(InetSocketAddress address, ChannelFactory factory) throws IOException; /** - * This will create an NioGroup with dedicated acceptors. All server channels will be handled by a group - * of selectors dedicated to accepting channels. These accepted channels will be handed off the - * non-server selectors. - * - * @param acceptorThreadFactory factory to create acceptor selector threads - * @param dedicatedAcceptorCount the number of dedicated acceptor selectors to be created - * @param selectorThreadFactory factory to create non-acceptor selector threads - * @param selectorCount the number of non-acceptor selectors to be created - * @param eventHandlerFunction function for creating event handlers - * @throws IOException occurs if there is a problem while opening a java.nio.Selector + * Opens a outgoing client channel. */ - public NioGroup(ThreadFactory acceptorThreadFactory, int dedicatedAcceptorCount, ThreadFactory selectorThreadFactory, int selectorCount, - Function, EventHandler> eventHandlerFunction) throws IOException { - dedicatedAcceptors = new ArrayList<>(dedicatedAcceptorCount); - selectors = new ArrayList<>(selectorCount); - - try { - List> suppliersToSet = new ArrayList<>(selectorCount); - for (int i = 0; i < selectorCount; ++i) { - RoundRobinSupplier supplier = new RoundRobinSupplier<>(); - suppliersToSet.add(supplier); - NioSelector selector = new NioSelector(eventHandlerFunction.apply(supplier)); - selectors.add(selector); - } - for (RoundRobinSupplier supplierToSet : suppliersToSet) { - supplierToSet.setSelectors(selectors.toArray(new NioSelector[0])); - assert supplierToSet.count() == selectors.size() : "Supplier should have same count as selector list."; - } - - for (int i = 0; i < dedicatedAcceptorCount; ++i) { - RoundRobinSupplier supplier = new RoundRobinSupplier<>(selectors.toArray(new NioSelector[0])); - NioSelector acceptor = new NioSelector(eventHandlerFunction.apply(supplier)); - dedicatedAcceptors.add(acceptor); - } - - if (dedicatedAcceptorCount != 0) { - acceptorSupplier = new RoundRobinSupplier<>(dedicatedAcceptors.toArray(new NioSelector[0])); - } else { - acceptorSupplier = new RoundRobinSupplier<>(selectors.toArray(new NioSelector[0])); - } - selectorSupplier = new RoundRobinSupplier<>(selectors.toArray(new NioSelector[0])); - assert selectorCount == selectors.size() : "We need to have created all the selectors at this point."; - assert dedicatedAcceptorCount == dedicatedAcceptors.size() : "We need to have created all the acceptors at this point."; - - startSelectors(selectors, selectorThreadFactory); - startSelectors(dedicatedAcceptors, acceptorThreadFactory); - } catch (Exception e) { - try { - close(); - } catch (Exception e1) { - e.addSuppressed(e1); - } - throw e; - } - } - - public S bindServerChannel(InetSocketAddress address, ChannelFactory factory) - throws IOException { - ensureOpen(); - return factory.openNioServerSocketChannel(address, acceptorSupplier); - } - - public S openChannel(InetSocketAddress address, ChannelFactory factory) throws IOException { - ensureOpen(); - return factory.openNioChannel(address, selectorSupplier); - } + S openChannel(InetSocketAddress address, ChannelFactory factory) throws IOException; @Override - public void close() throws IOException { - if (isOpen.compareAndSet(true, false)) { - List toClose = Stream.concat(dedicatedAcceptors.stream(), selectors.stream()).collect(Collectors.toList()); - List closingExceptions = new ArrayList<>(); - for (NioSelector selector : toClose) { - try { - selector.close(); - } catch (IOException e) { - closingExceptions.add(e); - } - } - ExceptionsHelper.rethrowAndSuppress(closingExceptions); - } - } - - private static void startSelectors(Iterable selectors, ThreadFactory threadFactory) { - for (NioSelector selector : selectors) { - if (selector.isRunning() == false) { - threadFactory.newThread(selector::runLoop).start(); - try { - selector.isRunningFuture().get(); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new IllegalStateException("Interrupted while waiting for selector to start.", e); - } catch (ExecutionException e) { - if (e.getCause() instanceof RuntimeException) { - throw (RuntimeException) e.getCause(); - } else { - throw new RuntimeException("Exception during selector start.", e); - } - } - } - } - } - - private void ensureOpen() { - if (isOpen.get() == false) { - throw new IllegalStateException("NioGroup is closed."); - } - } + void close() throws IOException; } diff --git a/libs/nio/src/main/java/org/elasticsearch/nio/NioSelectorGroup.java b/libs/nio/src/main/java/org/elasticsearch/nio/NioSelectorGroup.java new file mode 100644 index 00000000000..bb4d4d14a0c --- /dev/null +++ b/libs/nio/src/main/java/org/elasticsearch/nio/NioSelectorGroup.java @@ -0,0 +1,181 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.nio; + +import org.elasticsearch.nio.utils.ExceptionsHelper; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Function; +import java.util.function.Supplier; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +/** + * The NioSelectorGroup is a group of selectors for interfacing with java nio. When it is started it will create the + * configured number of selectors. Each selector will be running in a dedicated thread. Server connections + * can be bound using the {@link #bindServerChannel(InetSocketAddress, ChannelFactory)} method. Client + * connections can be opened using the {@link #openChannel(InetSocketAddress, ChannelFactory)} method. + *

+ * The logic specific to a particular channel is provided by the {@link ChannelFactory} passed to the method + * when the channel is created. This is what allows an NioSelectorGroup to support different channel types. + */ +public class NioSelectorGroup implements NioGroup { + + + private final List dedicatedAcceptors; + private final RoundRobinSupplier acceptorSupplier; + + private final List selectors; + private final RoundRobinSupplier selectorSupplier; + + private final AtomicBoolean isOpen = new AtomicBoolean(true); + + /** + * This will create an NioSelectorGroup with no dedicated acceptors. All server channels will be handled by the + * same selectors that are handling child channels. + * + * @param threadFactory factory to create selector threads + * @param selectorCount the number of selectors to be created + * @param eventHandlerFunction function for creating event handlers + * @throws IOException occurs if there is a problem while opening a java.nio.Selector + */ + public NioSelectorGroup(ThreadFactory threadFactory, int selectorCount, + Function, EventHandler> eventHandlerFunction) throws IOException { + this(null, 0, threadFactory, selectorCount, eventHandlerFunction); + } + + /** + * This will create an NioSelectorGroup with dedicated acceptors. All server channels will be handled by a group + * of selectors dedicated to accepting channels. These accepted channels will be handed off the + * non-server selectors. + * + * @param acceptorThreadFactory factory to create acceptor selector threads + * @param dedicatedAcceptorCount the number of dedicated acceptor selectors to be created + * @param selectorThreadFactory factory to create non-acceptor selector threads + * @param selectorCount the number of non-acceptor selectors to be created + * @param eventHandlerFunction function for creating event handlers + * @throws IOException occurs if there is a problem while opening a java.nio.Selector + */ + public NioSelectorGroup(ThreadFactory acceptorThreadFactory, int dedicatedAcceptorCount, ThreadFactory selectorThreadFactory, + int selectorCount, Function, EventHandler> eventHandlerFunction) throws IOException { + dedicatedAcceptors = new ArrayList<>(dedicatedAcceptorCount); + selectors = new ArrayList<>(selectorCount); + + try { + List> suppliersToSet = new ArrayList<>(selectorCount); + for (int i = 0; i < selectorCount; ++i) { + RoundRobinSupplier supplier = new RoundRobinSupplier<>(); + suppliersToSet.add(supplier); + NioSelector selector = new NioSelector(eventHandlerFunction.apply(supplier)); + selectors.add(selector); + } + for (RoundRobinSupplier supplierToSet : suppliersToSet) { + supplierToSet.setSelectors(selectors.toArray(new NioSelector[0])); + assert supplierToSet.count() == selectors.size() : "Supplier should have same count as selector list."; + } + + for (int i = 0; i < dedicatedAcceptorCount; ++i) { + RoundRobinSupplier supplier = new RoundRobinSupplier<>(selectors.toArray(new NioSelector[0])); + NioSelector acceptor = new NioSelector(eventHandlerFunction.apply(supplier)); + dedicatedAcceptors.add(acceptor); + } + + if (dedicatedAcceptorCount != 0) { + acceptorSupplier = new RoundRobinSupplier<>(dedicatedAcceptors.toArray(new NioSelector[0])); + } else { + acceptorSupplier = new RoundRobinSupplier<>(selectors.toArray(new NioSelector[0])); + } + selectorSupplier = new RoundRobinSupplier<>(selectors.toArray(new NioSelector[0])); + assert selectorCount == selectors.size() : "We need to have created all the selectors at this point."; + assert dedicatedAcceptorCount == dedicatedAcceptors.size() : "We need to have created all the acceptors at this point."; + + startSelectors(selectors, selectorThreadFactory); + startSelectors(dedicatedAcceptors, acceptorThreadFactory); + } catch (Exception e) { + try { + close(); + } catch (Exception e1) { + e.addSuppressed(e1); + } + throw e; + } + } + + @Override + public S bindServerChannel(InetSocketAddress address, ChannelFactory factory) + throws IOException { + ensureOpen(); + return factory.openNioServerSocketChannel(address, acceptorSupplier); + } + + @Override + public S openChannel(InetSocketAddress address, ChannelFactory factory) throws IOException { + ensureOpen(); + return factory.openNioChannel(address, selectorSupplier); + } + + @Override + public void close() throws IOException { + if (isOpen.compareAndSet(true, false)) { + List toClose = Stream.concat(dedicatedAcceptors.stream(), selectors.stream()).collect(Collectors.toList()); + List closingExceptions = new ArrayList<>(); + for (NioSelector selector : toClose) { + try { + selector.close(); + } catch (IOException e) { + closingExceptions.add(e); + } + } + ExceptionsHelper.rethrowAndSuppress(closingExceptions); + } + } + + private static void startSelectors(Iterable selectors, ThreadFactory threadFactory) { + for (NioSelector selector : selectors) { + if (selector.isRunning() == false) { + threadFactory.newThread(selector::runLoop).start(); + try { + selector.isRunningFuture().get(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new IllegalStateException("Interrupted while waiting for selector to start.", e); + } catch (ExecutionException e) { + if (e.getCause() instanceof RuntimeException) { + throw (RuntimeException) e.getCause(); + } else { + throw new RuntimeException("Exception during selector start.", e); + } + } + } + } + } + + private void ensureOpen() { + if (isOpen.get() == false) { + throw new IllegalStateException("NioGroup is closed."); + } + } +} diff --git a/libs/nio/src/test/java/org/elasticsearch/nio/NioGroupTests.java b/libs/nio/src/test/java/org/elasticsearch/nio/NioSelectorGroupTests.java similarity index 88% rename from libs/nio/src/test/java/org/elasticsearch/nio/NioGroupTests.java rename to libs/nio/src/test/java/org/elasticsearch/nio/NioSelectorGroupTests.java index 027f1255a59..34a8d501ea3 100644 --- a/libs/nio/src/test/java/org/elasticsearch/nio/NioGroupTests.java +++ b/libs/nio/src/test/java/org/elasticsearch/nio/NioSelectorGroupTests.java @@ -30,16 +30,16 @@ import java.util.function.Consumer; import static org.elasticsearch.common.util.concurrent.EsExecutors.daemonThreadFactory; import static org.mockito.Mockito.mock; -public class NioGroupTests extends ESTestCase { +public class NioSelectorGroupTests extends ESTestCase { - private NioGroup nioGroup; + private NioSelectorGroup nioGroup; @Override @SuppressWarnings("unchecked") public void setUp() throws Exception { super.setUp(); - nioGroup = new NioGroup(daemonThreadFactory(Settings.EMPTY, "acceptor"), 1, daemonThreadFactory(Settings.EMPTY, "selector"), 1, - (s) -> new EventHandler(mock(Consumer.class), s)); + nioGroup = new NioSelectorGroup(daemonThreadFactory(Settings.EMPTY, "acceptor"), 1, + daemonThreadFactory(Settings.EMPTY, "selector"), 1, (s) -> new EventHandler(mock(Consumer.class), s)); } @Override @@ -74,7 +74,7 @@ public class NioGroupTests extends ESTestCase { @SuppressWarnings("unchecked") public void testExceptionAtStartIsHandled() throws IOException { RuntimeException ex = new RuntimeException(); - CheckedRunnable ctor = () -> new NioGroup(r -> {throw ex;}, 1, + CheckedRunnable ctor = () -> new NioSelectorGroup(r -> {throw ex;}, 1, daemonThreadFactory(Settings.EMPTY, "selector"), 1, (s) -> new EventHandler(mock(Consumer.class), s)); RuntimeException runtimeException = expectThrows(RuntimeException.class, ctor::run); diff --git a/plugins/transport-nio/src/main/java/org/elasticsearch/http/nio/NioHttpServerTransport.java b/plugins/transport-nio/src/main/java/org/elasticsearch/http/nio/NioHttpServerTransport.java index aa77a189cb7..a5f274c7ccd 100644 --- a/plugins/transport-nio/src/main/java/org/elasticsearch/http/nio/NioHttpServerTransport.java +++ b/plugins/transport-nio/src/main/java/org/elasticsearch/http/nio/NioHttpServerTransport.java @@ -26,13 +26,11 @@ import org.elasticsearch.ElasticsearchException; import org.elasticsearch.common.Strings; import org.elasticsearch.common.network.NetworkService; import org.elasticsearch.common.recycler.Recycler; -import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.SettingsException; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.util.PageCacheRecycler; -import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.http.AbstractHttpServerTransport; import org.elasticsearch.http.HttpChannel; @@ -41,7 +39,6 @@ import org.elasticsearch.http.nio.cors.NioCorsConfig; import org.elasticsearch.http.nio.cors.NioCorsConfigBuilder; import org.elasticsearch.nio.BytesChannelContext; import org.elasticsearch.nio.ChannelFactory; -import org.elasticsearch.nio.EventHandler; import org.elasticsearch.nio.InboundChannelBuffer; import org.elasticsearch.nio.NioGroup; import org.elasticsearch.nio.NioSelector; @@ -50,6 +47,7 @@ import org.elasticsearch.nio.ServerChannelContext; import org.elasticsearch.nio.SocketChannelContext; import org.elasticsearch.rest.RestUtils; import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.nio.NioGroupFactory; import java.io.IOException; import java.net.InetSocketAddress; @@ -61,8 +59,6 @@ import java.util.function.Consumer; import java.util.regex.Pattern; import java.util.regex.PatternSyntaxException; -import static org.elasticsearch.common.settings.Setting.intSetting; -import static org.elasticsearch.common.util.concurrent.EsExecutors.daemonThreadFactory; import static org.elasticsearch.http.HttpTransportSettings.SETTING_CORS_ALLOW_CREDENTIALS; import static org.elasticsearch.http.HttpTransportSettings.SETTING_CORS_ALLOW_HEADERS; import static org.elasticsearch.http.HttpTransportSettings.SETTING_CORS_ALLOW_METHODS; @@ -83,15 +79,9 @@ import static org.elasticsearch.http.nio.cors.NioCorsHandler.ANY_ORIGIN; public class NioHttpServerTransport extends AbstractHttpServerTransport { private static final Logger logger = LogManager.getLogger(NioHttpServerTransport.class); - public static final Setting NIO_HTTP_ACCEPTOR_COUNT = - intSetting("http.nio.acceptor_count", 1, 1, Setting.Property.NodeScope); - public static final Setting NIO_HTTP_WORKER_COUNT = - new Setting<>("http.nio.worker_count", - (s) -> Integer.toString(EsExecutors.numberOfProcessors(s) * 2), - (s) -> Setting.parseInt(s, 1, "http.nio.worker_count"), Setting.Property.NodeScope); - protected final PageCacheRecycler pageCacheRecycler; protected final NioCorsConfig corsConfig; + private final NioGroupFactory nioGroupFactory; protected final boolean tcpNoDelay; protected final boolean tcpKeepAlive; @@ -99,14 +89,15 @@ public class NioHttpServerTransport extends AbstractHttpServerTransport { protected final int tcpSendBufferSize; protected final int tcpReceiveBufferSize; - private NioGroup nioGroup; + private volatile NioGroup nioGroup; private ChannelFactory channelFactory; public NioHttpServerTransport(Settings settings, NetworkService networkService, BigArrays bigArrays, PageCacheRecycler pageCacheRecycler, ThreadPool threadPool, NamedXContentRegistry xContentRegistry, - Dispatcher dispatcher) { + Dispatcher dispatcher, NioGroupFactory nioGroupFactory) { super(settings, networkService, bigArrays, threadPool, xContentRegistry, dispatcher); this.pageCacheRecycler = pageCacheRecycler; + this.nioGroupFactory = nioGroupFactory; ByteSizeValue maxChunkSize = SETTING_HTTP_MAX_CHUNK_SIZE.get(settings); ByteSizeValue maxHeaderSize = SETTING_HTTP_MAX_HEADER_SIZE.get(settings); @@ -134,11 +125,7 @@ public class NioHttpServerTransport extends AbstractHttpServerTransport { protected void doStart() { boolean success = false; try { - int acceptorCount = NIO_HTTP_ACCEPTOR_COUNT.get(settings); - int workerCount = NIO_HTTP_WORKER_COUNT.get(settings); - nioGroup = new NioGroup(daemonThreadFactory(this.settings, HTTP_SERVER_ACCEPTOR_THREAD_NAME_PREFIX), acceptorCount, - daemonThreadFactory(this.settings, HTTP_SERVER_WORKER_THREAD_NAME_PREFIX), workerCount, - (s) -> new EventHandler(this::onNonChannelException, s)); + nioGroup = nioGroupFactory.getHttpGroup(); channelFactory = channelFactory(); bindServer(); success = true; diff --git a/plugins/transport-nio/src/main/java/org/elasticsearch/transport/nio/NioGroupFactory.java b/plugins/transport-nio/src/main/java/org/elasticsearch/transport/nio/NioGroupFactory.java new file mode 100644 index 00000000000..0c17c1d8b85 --- /dev/null +++ b/plugins/transport-nio/src/main/java/org/elasticsearch/transport/nio/NioGroupFactory.java @@ -0,0 +1,172 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.transport.nio; + +import org.apache.logging.log4j.Logger; +import org.apache.logging.log4j.message.ParameterizedMessage; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.concurrent.AbstractRefCounted; +import org.elasticsearch.http.HttpServerTransport; +import org.elasticsearch.nio.ChannelFactory; +import org.elasticsearch.nio.EventHandler; +import org.elasticsearch.nio.NioGroup; +import org.elasticsearch.nio.NioSelectorGroup; +import org.elasticsearch.nio.NioServerSocketChannel; +import org.elasticsearch.nio.NioSocketChannel; +import org.elasticsearch.transport.TcpTransport; + +import java.io.IOException; +import java.io.UncheckedIOException; +import java.net.InetSocketAddress; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.atomic.AtomicBoolean; + +import static org.elasticsearch.common.util.concurrent.EsExecutors.daemonThreadFactory; + +/** + * Creates and returns {@link NioSelectorGroup} instances. It will return a shared group for + * both {@link #getHttpGroup()} and {@link #getTransportGroup()} if + * {@link NioTransportPlugin#NIO_HTTP_WORKER_COUNT} is configured to be 0. If that setting is not 0, then it + * will return a different group in the {@link #getHttpGroup()} call. + */ +public final class NioGroupFactory { + + private final Logger logger; + private final Settings settings; + private final int httpWorkerCount; + + private RefCountedNioGroup refCountedGroup; + + public NioGroupFactory(Settings settings, Logger logger) { + this.logger = logger; + this.settings = settings; + this.httpWorkerCount = NioTransportPlugin.NIO_HTTP_WORKER_COUNT.get(settings); + } + + public Settings getSettings() { + return settings; + } + + public synchronized NioGroup getTransportGroup() throws IOException { + return getGenericGroup(); + } + + public synchronized NioGroup getHttpGroup() throws IOException { + if (httpWorkerCount == 0) { + return getGenericGroup(); + } else { + return new NioSelectorGroup(daemonThreadFactory(this.settings, HttpServerTransport.HTTP_SERVER_WORKER_THREAD_NAME_PREFIX), + httpWorkerCount, (s) -> new EventHandler(this::onException, s)); + } + } + + private NioGroup getGenericGroup() throws IOException { + if (refCountedGroup == null) { + ThreadFactory threadFactory = daemonThreadFactory(this.settings, TcpTransport.TRANSPORT_WORKER_THREAD_NAME_PREFIX); + NioSelectorGroup nioGroup = new NioSelectorGroup(threadFactory, NioTransportPlugin.NIO_WORKER_COUNT.get(settings), + (s) -> new EventHandler(this::onException, s)); + this.refCountedGroup = new RefCountedNioGroup(nioGroup); + return new WrappedNioGroup(refCountedGroup); + } else { + refCountedGroup.incRef(); + return new WrappedNioGroup(refCountedGroup); + } + } + + private void onException(Exception exception) { + logger.warn(new ParameterizedMessage("exception caught on transport layer [thread={}]", Thread.currentThread().getName()), + exception); + } + + private static class RefCountedNioGroup extends AbstractRefCounted implements NioGroup { + + public static final String NAME = "ref-counted-nio-group"; + private final NioSelectorGroup nioGroup; + + private RefCountedNioGroup(NioSelectorGroup nioGroup) { + super(NAME); + this.nioGroup = nioGroup; + } + + @Override + protected void closeInternal() { + try { + nioGroup.close(); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } + + @Override + public S bindServerChannel(InetSocketAddress address, ChannelFactory factory) + throws IOException { + return nioGroup.bindServerChannel(address, factory); + } + + @Override + public S openChannel(InetSocketAddress address, ChannelFactory factory) throws IOException { + return nioGroup.openChannel(address, factory); + } + + @Override + public void close() throws IOException { + throw new UnsupportedOperationException("Should not close. Instead use decRef call."); + } + } + + /** + * Wraps the {@link RefCountedNioGroup}. Calls {@link RefCountedNioGroup#decRef()} on close. After close, + * this wrapped instance can no longer be used. + */ + private class WrappedNioGroup implements NioGroup { + + private final RefCountedNioGroup refCountedNioGroup; + + private final AtomicBoolean isOpen = new AtomicBoolean(true); + + private WrappedNioGroup(RefCountedNioGroup refCountedNioGroup) { + this.refCountedNioGroup = refCountedNioGroup; + } + + public S bindServerChannel(InetSocketAddress address, ChannelFactory factory) + throws IOException { + ensureOpen(); + return refCountedNioGroup.bindServerChannel(address, factory); + } + + public S openChannel(InetSocketAddress address, ChannelFactory factory) throws IOException { + ensureOpen(); + return refCountedNioGroup.openChannel(address, factory); + } + + @Override + public void close() { + if (isOpen.compareAndSet(true, false)) { + refCountedNioGroup.decRef(); + } + } + + private void ensureOpen() { + if (isOpen.get() == false) { + throw new IllegalStateException("NioGroup is closed."); + } + } + } +} diff --git a/plugins/transport-nio/src/main/java/org/elasticsearch/transport/nio/NioTransport.java b/plugins/transport-nio/src/main/java/org/elasticsearch/transport/nio/NioTransport.java index c5f5b414c07..a9a1c716ef8 100644 --- a/plugins/transport-nio/src/main/java/org/elasticsearch/transport/nio/NioTransport.java +++ b/plugins/transport-nio/src/main/java/org/elasticsearch/transport/nio/NioTransport.java @@ -27,14 +27,11 @@ import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.network.NetworkService; import org.elasticsearch.common.recycler.Recycler; -import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.PageCacheRecycler; -import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.indices.breaker.CircuitBreakerService; import org.elasticsearch.nio.BytesChannelContext; import org.elasticsearch.nio.ChannelFactory; -import org.elasticsearch.nio.EventHandler; import org.elasticsearch.nio.InboundChannelBuffer; import org.elasticsearch.nio.NioGroup; import org.elasticsearch.nio.NioSelector; @@ -54,25 +51,21 @@ import java.util.function.Function; import java.util.function.Supplier; import static org.elasticsearch.common.util.concurrent.ConcurrentCollections.newConcurrentMap; -import static org.elasticsearch.common.util.concurrent.EsExecutors.daemonThreadFactory; public class NioTransport extends TcpTransport { private static final Logger logger = LogManager.getLogger(NioTransport.class); - public static final Setting NIO_WORKER_COUNT = - new Setting<>("transport.nio.worker_count", - (s) -> Integer.toString(EsExecutors.numberOfProcessors(s) * 2), - (s) -> Setting.parseInt(s, 1, "transport.nio.worker_count"), Setting.Property.NodeScope); - private final ConcurrentMap profileToChannelFactory = newConcurrentMap(); + private final NioGroupFactory groupFactory; private volatile NioGroup nioGroup; private volatile Function clientChannelFactory; protected NioTransport(Settings settings, Version version, ThreadPool threadPool, NetworkService networkService, PageCacheRecycler pageCacheRecycler, NamedWriteableRegistry namedWriteableRegistry, - CircuitBreakerService circuitBreakerService) { + CircuitBreakerService circuitBreakerService, NioGroupFactory groupFactory) { super("nio", settings, version, threadPool, pageCacheRecycler, circuitBreakerService, namedWriteableRegistry, networkService); + this.groupFactory = groupFactory; } @Override @@ -91,8 +84,7 @@ public class NioTransport extends TcpTransport { protected void doStart() { boolean success = false; try { - nioGroup = new NioGroup(daemonThreadFactory(this.settings, TcpTransport.TRANSPORT_WORKER_THREAD_NAME_PREFIX), - NioTransport.NIO_WORKER_COUNT.get(settings), (s) -> new EventHandler(this::onNonChannelException, s)); + nioGroup = groupFactory.getTransportGroup(); ProfileSettings clientProfileSettings = new ProfileSettings(settings, "default"); clientChannelFactory = clientChannelFactoryFunction(clientProfileSettings); diff --git a/plugins/transport-nio/src/main/java/org/elasticsearch/transport/nio/NioTransportPlugin.java b/plugins/transport-nio/src/main/java/org/elasticsearch/transport/nio/NioTransportPlugin.java index 9c73cbc5a66..647b53ad47a 100644 --- a/plugins/transport-nio/src/main/java/org/elasticsearch/transport/nio/NioTransportPlugin.java +++ b/plugins/transport-nio/src/main/java/org/elasticsearch/transport/nio/NioTransportPlugin.java @@ -19,6 +19,9 @@ package org.elasticsearch.transport.nio; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.apache.lucene.util.SetOnce; import org.elasticsearch.Version; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.network.NetworkService; @@ -26,6 +29,7 @@ import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.util.PageCacheRecycler; +import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.http.HttpServerTransport; import org.elasticsearch.http.nio.NioHttpServerTransport; @@ -41,17 +45,29 @@ import java.util.List; import java.util.Map; import java.util.function.Supplier; +import static org.elasticsearch.common.settings.Setting.intSetting; + public class NioTransportPlugin extends Plugin implements NetworkPlugin { public static final String NIO_TRANSPORT_NAME = "nio-transport"; public static final String NIO_HTTP_TRANSPORT_NAME = "nio-http-transport"; + private static final Logger logger = LogManager.getLogger(NioTransportPlugin.class); + + public static final Setting NIO_WORKER_COUNT = + new Setting<>("transport.nio.worker_count", + (s) -> Integer.toString(EsExecutors.numberOfProcessors(s) * 2), + (s) -> Setting.parseInt(s, 1, "transport.nio.worker_count"), Setting.Property.NodeScope); + public static final Setting NIO_HTTP_WORKER_COUNT = + intSetting("http.nio.worker_count", 0, 0, Setting.Property.NodeScope); + + private final SetOnce groupFactory = new SetOnce<>(); + @Override public List> getSettings() { return Arrays.asList( - NioHttpServerTransport.NIO_HTTP_ACCEPTOR_COUNT, - NioHttpServerTransport.NIO_HTTP_WORKER_COUNT, - NioTransport.NIO_WORKER_COUNT + NIO_HTTP_WORKER_COUNT, + NIO_WORKER_COUNT ); } @@ -61,7 +77,7 @@ public class NioTransportPlugin extends Plugin implements NetworkPlugin { NamedWriteableRegistry namedWriteableRegistry, NetworkService networkService) { return Collections.singletonMap(NIO_TRANSPORT_NAME, () -> new NioTransport(settings, Version.CURRENT, threadPool, networkService, pageCacheRecycler, namedWriteableRegistry, - circuitBreakerService)); + circuitBreakerService, getNioGroupFactory(settings))); } @Override @@ -73,6 +89,17 @@ public class NioTransportPlugin extends Plugin implements NetworkPlugin { HttpServerTransport.Dispatcher dispatcher) { return Collections.singletonMap(NIO_HTTP_TRANSPORT_NAME, () -> new NioHttpServerTransport(settings, networkService, bigArrays, pageCacheRecycler, threadPool, xContentRegistry, - dispatcher)); + dispatcher, getNioGroupFactory(settings))); + } + + private synchronized NioGroupFactory getNioGroupFactory(Settings settings) { + NioGroupFactory nioGroupFactory = groupFactory.get(); + if (nioGroupFactory != null) { + assert nioGroupFactory.getSettings().equals(settings) : "Different settings than originally provided"; + return nioGroupFactory; + } else { + groupFactory.set(new NioGroupFactory(settings, logger)); + return groupFactory.get(); + } } } diff --git a/plugins/transport-nio/src/test/java/org/elasticsearch/NioIntegTestCase.java b/plugins/transport-nio/src/test/java/org/elasticsearch/NioIntegTestCase.java index 703f7acbf82..9ed64213cbf 100644 --- a/plugins/transport-nio/src/test/java/org/elasticsearch/NioIntegTestCase.java +++ b/plugins/transport-nio/src/test/java/org/elasticsearch/NioIntegTestCase.java @@ -20,10 +20,8 @@ package org.elasticsearch; import org.elasticsearch.common.network.NetworkModule; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.http.nio.NioHttpServerTransport; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.test.ESIntegTestCase; -import org.elasticsearch.transport.nio.NioTransport; import org.elasticsearch.transport.nio.NioTransportPlugin; import java.util.Collection; @@ -46,8 +44,8 @@ public abstract class NioIntegTestCase extends ESIntegTestCase { Settings.Builder builder = Settings.builder().put(super.nodeSettings(nodeOrdinal)); // randomize nio settings if (randomBoolean()) { - builder.put(NioTransport.NIO_WORKER_COUNT.getKey(), random().nextInt(3) + 1); - builder.put(NioHttpServerTransport.NIO_HTTP_WORKER_COUNT.getKey(), random().nextInt(3) + 1); + builder.put(NioTransportPlugin.NIO_WORKER_COUNT.getKey(), random().nextInt(3) + 1); + builder.put(NioTransportPlugin.NIO_HTTP_WORKER_COUNT.getKey(), random().nextInt(3) + 1); } builder.put(NetworkModule.TRANSPORT_TYPE_KEY, NioTransportPlugin.NIO_TRANSPORT_NAME); builder.put(NetworkModule.HTTP_TYPE_KEY, NioTransportPlugin.NIO_HTTP_TRANSPORT_NAME); diff --git a/plugins/transport-nio/src/test/java/org/elasticsearch/http/nio/NioHttpClient.java b/plugins/transport-nio/src/test/java/org/elasticsearch/http/nio/NioHttpClient.java index b49d5b51866..e3259b10b97 100644 --- a/plugins/transport-nio/src/test/java/org/elasticsearch/http/nio/NioHttpClient.java +++ b/plugins/transport-nio/src/test/java/org/elasticsearch/http/nio/NioHttpClient.java @@ -45,7 +45,7 @@ import org.elasticsearch.nio.ChannelFactory; import org.elasticsearch.nio.EventHandler; import org.elasticsearch.nio.FlushOperation; import org.elasticsearch.nio.InboundChannelBuffer; -import org.elasticsearch.nio.NioGroup; +import org.elasticsearch.nio.NioSelectorGroup; import org.elasticsearch.nio.NioSelector; import org.elasticsearch.nio.NioServerSocketChannel; import org.elasticsearch.nio.NioSocketChannel; @@ -88,11 +88,11 @@ class NioHttpClient implements Closeable { private static final Logger logger = LogManager.getLogger(NioHttpClient.class); - private final NioGroup nioGroup; + private final NioSelectorGroup nioGroup; NioHttpClient() { try { - nioGroup = new NioGroup(daemonThreadFactory(Settings.EMPTY, "nio-http-client"), 1, + nioGroup = new NioSelectorGroup(daemonThreadFactory(Settings.EMPTY, "nio-http-client"), 1, (s) -> new EventHandler(this::onException, s)); } catch (IOException e) { throw new UncheckedIOException(e); diff --git a/plugins/transport-nio/src/test/java/org/elasticsearch/http/nio/NioHttpServerTransportTests.java b/plugins/transport-nio/src/test/java/org/elasticsearch/http/nio/NioHttpServerTransportTests.java index 13b8e60336e..2ffd5a64147 100644 --- a/plugins/transport-nio/src/test/java/org/elasticsearch/http/nio/NioHttpServerTransportTests.java +++ b/plugins/transport-nio/src/test/java/org/elasticsearch/http/nio/NioHttpServerTransportTests.java @@ -55,6 +55,7 @@ import org.elasticsearch.rest.RestRequest; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.threadpool.TestThreadPool; import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.nio.NioGroupFactory; import org.junit.After; import org.junit.Before; @@ -202,7 +203,7 @@ public class NioHttpServerTransportTests extends ESTestCase { } }; try (NioHttpServerTransport transport = new NioHttpServerTransport(settings, networkService, bigArrays, pageRecycler, threadPool, - xContentRegistry(), dispatcher)) { + xContentRegistry(), dispatcher, new NioGroupFactory(settings, logger))) { transport.start(); final TransportAddress remoteAddress = randomFrom(transport.boundAddress().boundAddresses()); try (NioHttpClient client = new NioHttpClient()) { @@ -235,12 +236,12 @@ public class NioHttpServerTransportTests extends ESTestCase { public void testBindUnavailableAddress() { try (NioHttpServerTransport transport = new NioHttpServerTransport(Settings.EMPTY, networkService, bigArrays, pageRecycler, - threadPool, xContentRegistry(), new NullDispatcher())) { + threadPool, xContentRegistry(), new NullDispatcher(), new NioGroupFactory(Settings.EMPTY, logger))) { transport.start(); TransportAddress remoteAddress = randomFrom(transport.boundAddress().boundAddresses()); Settings settings = Settings.builder().put("http.port", remoteAddress.getPort()).build(); try (NioHttpServerTransport otherTransport = new NioHttpServerTransport(settings, networkService, bigArrays, pageRecycler, - threadPool, xContentRegistry(), new NullDispatcher())) { + threadPool, xContentRegistry(), new NullDispatcher(), new NioGroupFactory(Settings.EMPTY, logger))) { BindHttpException bindHttpException = expectThrows(BindHttpException.class, () -> otherTransport.start()); assertEquals("Failed to bind to [" + remoteAddress.getPort() + "]", bindHttpException.getMessage()); } @@ -284,7 +285,7 @@ public class NioHttpServerTransportTests extends ESTestCase { } try (NioHttpServerTransport transport = new NioHttpServerTransport(settings, networkService, bigArrays, pageRecycler, - threadPool, xContentRegistry(), dispatcher)) { + threadPool, xContentRegistry(), dispatcher, new NioGroupFactory(settings, logger))) { transport.start(); final TransportAddress remoteAddress = randomFrom(transport.boundAddress().boundAddresses()); diff --git a/plugins/transport-nio/src/test/java/org/elasticsearch/transport/nio/NioGroupFactoryTests.java b/plugins/transport-nio/src/test/java/org/elasticsearch/transport/nio/NioGroupFactoryTests.java new file mode 100644 index 00000000000..356eb39b734 --- /dev/null +++ b/plugins/transport-nio/src/test/java/org/elasticsearch/transport/nio/NioGroupFactoryTests.java @@ -0,0 +1,78 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.transport.nio; + +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.nio.ChannelFactory; +import org.elasticsearch.nio.NioGroup; +import org.elasticsearch.nio.NioSelector; +import org.elasticsearch.nio.NioServerSocketChannel; +import org.elasticsearch.nio.NioSocketChannel; +import org.elasticsearch.nio.ServerChannelContext; +import org.elasticsearch.test.ESTestCase; + +import java.io.IOException; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.nio.channels.ServerSocketChannel; +import java.nio.channels.SocketChannel; +import java.util.function.Consumer; + +public class NioGroupFactoryTests extends ESTestCase { + + public void testSharedGroupStillWorksWhenOneInstanceClosed() throws IOException { + NioGroupFactory groupFactory = new NioGroupFactory(Settings.EMPTY, logger); + + InetSocketAddress inetSocketAddress = new InetSocketAddress(InetAddress.getLoopbackAddress(), 0); + NioGroup httpGroup = groupFactory.getHttpGroup(); + try { + NioGroup transportGroup = groupFactory.getTransportGroup(); + transportGroup.close(); + expectThrows(IllegalStateException.class, () -> transportGroup.bindServerChannel(inetSocketAddress, new BindingFactory())); + + httpGroup.bindServerChannel(inetSocketAddress, new BindingFactory()); + } finally { + httpGroup.close(); + } + expectThrows(IllegalStateException.class, () -> httpGroup.bindServerChannel(inetSocketAddress, new BindingFactory())); + } + + private static class BindingFactory extends ChannelFactory { + + private BindingFactory() { + super(new ChannelFactory.RawChannelFactory(false, false, false, -1, -1)); + } + + @Override + public NioSocketChannel createChannel(NioSelector selector, SocketChannel channel) throws IOException { + throw new IOException("boom"); + } + + @Override + public NioServerSocketChannel createServerChannel(NioSelector selector, ServerSocketChannel channel) throws IOException { + NioServerSocketChannel nioChannel = new NioServerSocketChannel(channel); + Consumer exceptionHandler = (e) -> {}; + Consumer acceptor = (c) -> {}; + ServerChannelContext context = new ServerChannelContext(nioChannel, this, selector, acceptor, exceptionHandler); + nioChannel.setContext(context); + return nioChannel; + } + } +} diff --git a/plugins/transport-nio/src/test/java/org/elasticsearch/transport/nio/NioTransportIT.java b/plugins/transport-nio/src/test/java/org/elasticsearch/transport/nio/NioTransportIT.java index 61f07aa0162..334e63dc0bf 100644 --- a/plugins/transport-nio/src/test/java/org/elasticsearch/transport/nio/NioTransportIT.java +++ b/plugins/transport-nio/src/test/java/org/elasticsearch/transport/nio/NioTransportIT.java @@ -18,6 +18,8 @@ */ package org.elasticsearch.transport.nio; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.NioIntegTestCase; import org.elasticsearch.Version; @@ -54,6 +56,7 @@ import static org.hamcrest.Matchers.is; @ClusterScope(scope = Scope.TEST, supportsDedicatedMasters = false, numDataNodes = 1) public class NioTransportIT extends NioIntegTestCase { + // static so we can use it in anonymous classes private static String channelProfileName = null; @@ -86,6 +89,8 @@ public class NioTransportIT extends NioIntegTestCase { public static final class ExceptionThrowingNioTransport extends NioTransport { + private static final Logger logger = LogManager.getLogger(ExceptionThrowingNioTransport.class); + public static class TestPlugin extends Plugin implements NetworkPlugin { @Override @@ -103,7 +108,8 @@ public class NioTransportIT extends NioIntegTestCase { ExceptionThrowingNioTransport(Settings settings, ThreadPool threadPool, NetworkService networkService, PageCacheRecycler pageCacheRecycler, NamedWriteableRegistry namedWriteableRegistry, CircuitBreakerService circuitBreakerService) { - super(settings, Version.CURRENT, threadPool, networkService, pageCacheRecycler, namedWriteableRegistry, circuitBreakerService); + super(settings, Version.CURRENT, threadPool, networkService, pageCacheRecycler, namedWriteableRegistry, circuitBreakerService, + new NioGroupFactory(settings, logger)); } @Override diff --git a/plugins/transport-nio/src/test/java/org/elasticsearch/transport/nio/SimpleNioTransportTests.java b/plugins/transport-nio/src/test/java/org/elasticsearch/transport/nio/SimpleNioTransportTests.java index 82a99103627..a78e924298a 100644 --- a/plugins/transport-nio/src/test/java/org/elasticsearch/transport/nio/SimpleNioTransportTests.java +++ b/plugins/transport-nio/src/test/java/org/elasticsearch/transport/nio/SimpleNioTransportTests.java @@ -52,12 +52,12 @@ import static org.hamcrest.Matchers.instanceOf; public class SimpleNioTransportTests extends AbstractSimpleTransportTestCase { - public static MockTransportService nioFromThreadPool(Settings settings, ThreadPool threadPool, final Version version, + public MockTransportService nioFromThreadPool(Settings settings, ThreadPool threadPool, final Version version, ClusterSettings clusterSettings, boolean doHandshake) { NamedWriteableRegistry namedWriteableRegistry = new NamedWriteableRegistry(Collections.emptyList()); NetworkService networkService = new NetworkService(Collections.emptyList()); Transport transport = new NioTransport(settings, version, threadPool, networkService, new MockPageCacheRecycler(settings), - namedWriteableRegistry, new NoneCircuitBreakerService()) { + namedWriteableRegistry, new NoneCircuitBreakerService(), new NioGroupFactory(settings, logger)) { @Override public void executeHandshake(DiscoveryNode node, TcpChannel channel, ConnectionProfile profile, diff --git a/server/src/main/java/org/elasticsearch/http/AbstractHttpServerTransport.java b/server/src/main/java/org/elasticsearch/http/AbstractHttpServerTransport.java index 92f305cb67a..b79a5e77309 100644 --- a/server/src/main/java/org/elasticsearch/http/AbstractHttpServerTransport.java +++ b/server/src/main/java/org/elasticsearch/http/AbstractHttpServerTransport.java @@ -278,16 +278,6 @@ public abstract class AbstractHttpServerTransport extends AbstractLifecycleCompo logger.error(new ParameterizedMessage("exception from http server channel caught on transport layer [channel={}]", channel), e); } - /** - * Exception handler for exceptions that are not associated with a specific channel. - * - * @param exception the exception - */ - protected void onNonChannelException(Exception exception) { - String threadName = Thread.currentThread().getName(); - logger.warn(new ParameterizedMessage("exception caught on transport layer [thread={}]", threadName), exception); - } - protected void serverAcceptedChannel(HttpChannel httpChannel) { boolean addedOnThisCall = httpChannels.add(httpChannel); assert addedOnThisCall : "Channel should only be added to http channel set once"; diff --git a/server/src/main/java/org/elasticsearch/http/HttpServerTransport.java b/server/src/main/java/org/elasticsearch/http/HttpServerTransport.java index 0ce8edcf3b6..de345a39fd6 100644 --- a/server/src/main/java/org/elasticsearch/http/HttpServerTransport.java +++ b/server/src/main/java/org/elasticsearch/http/HttpServerTransport.java @@ -29,8 +29,6 @@ public interface HttpServerTransport extends LifecycleComponent { String HTTP_SERVER_WORKER_THREAD_NAME_PREFIX = "http_server_worker"; - String HTTP_SERVER_ACCEPTOR_THREAD_NAME_PREFIX = "http_server_acceptor"; - BoundTransportAddress boundAddress(); HttpInfo info(); diff --git a/server/src/main/java/org/elasticsearch/transport/TcpTransport.java b/server/src/main/java/org/elasticsearch/transport/TcpTransport.java index dd6a21acc8f..e0d939b37a3 100644 --- a/server/src/main/java/org/elasticsearch/transport/TcpTransport.java +++ b/server/src/main/java/org/elasticsearch/transport/TcpTransport.java @@ -660,16 +660,6 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements logger.error(new ParameterizedMessage("exception from server channel caught on transport layer [channel={}]", channel), e); } - /** - * Exception handler for exceptions that are not associated with a specific channel. - * - * @param exception the exception - */ - protected void onNonChannelException(Exception exception) { - logger.warn(new ParameterizedMessage("exception caught on transport layer [thread={}]", Thread.currentThread().getName()), - exception); - } - protected void serverAcceptedChannel(TcpChannel channel) { boolean addedOnThisCall = acceptedChannels.add(channel); assert addedOnThisCall : "Channel should only be added to accepted channel set once"; diff --git a/server/src/main/java/org/elasticsearch/transport/Transports.java b/server/src/main/java/org/elasticsearch/transport/Transports.java index c531d33224a..d6968a85536 100644 --- a/server/src/main/java/org/elasticsearch/transport/Transports.java +++ b/server/src/main/java/org/elasticsearch/transport/Transports.java @@ -38,7 +38,6 @@ public enum Transports { final String threadName = t.getName(); for (String s : Arrays.asList( HttpServerTransport.HTTP_SERVER_WORKER_THREAD_NAME_PREFIX, - HttpServerTransport.HTTP_SERVER_ACCEPTOR_THREAD_NAME_PREFIX, TcpTransport.TRANSPORT_WORKER_THREAD_NAME_PREFIX, TEST_MOCK_TRANSPORT_THREAD_PREFIX)) { if (threadName.contains(s)) { diff --git a/test/framework/src/main/java/org/elasticsearch/transport/nio/MockNioTransport.java b/test/framework/src/main/java/org/elasticsearch/transport/nio/MockNioTransport.java index cbadcfa1f38..a7dbce45d3c 100644 --- a/test/framework/src/main/java/org/elasticsearch/transport/nio/MockNioTransport.java +++ b/test/framework/src/main/java/org/elasticsearch/transport/nio/MockNioTransport.java @@ -37,7 +37,7 @@ import org.elasticsearch.nio.BytesChannelContext; import org.elasticsearch.nio.BytesWriteHandler; import org.elasticsearch.nio.ChannelFactory; import org.elasticsearch.nio.InboundChannelBuffer; -import org.elasticsearch.nio.NioGroup; +import org.elasticsearch.nio.NioSelectorGroup; import org.elasticsearch.nio.NioSelector; import org.elasticsearch.nio.NioServerSocketChannel; import org.elasticsearch.nio.NioSocketChannel; @@ -69,7 +69,7 @@ public class MockNioTransport extends TcpTransport { private static final Logger logger = LogManager.getLogger(MockNioTransport.class); private final ConcurrentMap profileToChannelFactory = newConcurrentMap(); - private volatile NioGroup nioGroup; + private volatile NioSelectorGroup nioGroup; private volatile MockTcpChannelFactory clientChannelFactory; public MockNioTransport(Settings settings, Version version, ThreadPool threadPool, NetworkService networkService, @@ -94,7 +94,7 @@ public class MockNioTransport extends TcpTransport { protected void doStart() { boolean success = false; try { - nioGroup = new NioGroup(daemonThreadFactory(this.settings, TcpTransport.TRANSPORT_WORKER_THREAD_NAME_PREFIX), 2, + nioGroup = new NioSelectorGroup(daemonThreadFactory(this.settings, TcpTransport.TRANSPORT_WORKER_THREAD_NAME_PREFIX), 2, (s) -> new TestingSocketEventHandler(this::onNonChannelException, s)); ProfileSettings clientProfileSettings = new ProfileSettings(settings, "default"); @@ -160,6 +160,11 @@ public class MockNioTransport extends TcpTransport { return builder.build(); } + private void onNonChannelException(Exception exception) { + logger.warn(new ParameterizedMessage("exception caught on transport layer [thread={}]", Thread.currentThread().getName()), + exception); + } + private void exceptionCaught(NioSocketChannel channel, Exception exception) { onException((TcpChannel) channel, exception); } diff --git a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/Security.java b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/Security.java index 714da7cf11c..86f615a259f 100644 --- a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/Security.java +++ b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/Security.java @@ -76,6 +76,7 @@ import org.elasticsearch.transport.Transport; import org.elasticsearch.transport.TransportInterceptor; import org.elasticsearch.transport.TransportRequest; import org.elasticsearch.transport.TransportRequestHandler; +import org.elasticsearch.transport.nio.NioGroupFactory; import org.elasticsearch.watcher.ResourceWatcherService; import org.elasticsearch.xpack.core.XPackField; import org.elasticsearch.xpack.core.XPackPlugin; @@ -286,6 +287,7 @@ public class Security extends Plugin implements ActionPlugin, IngestPlugin, Netw private final SetOnce securityActionFilter = new SetOnce<>(); private final SetOnce securityIndex = new SetOnce<>(); private final SetOnce indexAuditTrail = new SetOnce<>(); + private final SetOnce groupFactory = new SetOnce<>(); private final List bootstrapChecks; private final List securityExtensions = new ArrayList<>(); @@ -943,11 +945,12 @@ public class Security extends Plugin implements ActionPlugin, IngestPlugin, Netw return Collections.emptyMap(); } + IPFilter ipFilter = this.ipFilter.get(); Map> transports = new HashMap<>(); transports.put(SecurityField.NAME4, () -> new SecurityNetty4ServerTransport(settings, Version.CURRENT, threadPool, - networkService, pageCacheRecycler, namedWriteableRegistry, circuitBreakerService, ipFilter.get(), getSslService())); - transports.put(SecurityField.NIO, () -> new SecurityNioTransport(settings, Version.CURRENT, threadPool, - networkService, pageCacheRecycler, namedWriteableRegistry, circuitBreakerService, ipFilter.get(), getSslService())); + networkService, pageCacheRecycler, namedWriteableRegistry, circuitBreakerService, ipFilter, getSslService())); + transports.put(SecurityField.NIO, () -> new SecurityNioTransport(settings, Version.CURRENT, threadPool, networkService, + pageCacheRecycler, namedWriteableRegistry, circuitBreakerService, ipFilter, getSslService(), getNioGroupFactory(settings))); return Collections.unmodifiableMap(transports); } @@ -967,7 +970,7 @@ public class Security extends Plugin implements ActionPlugin, IngestPlugin, Netw httpTransports.put(SecurityField.NAME4, () -> new SecurityNetty4HttpServerTransport(settings, networkService, bigArrays, ipFilter.get(), getSslService(), threadPool, xContentRegistry, dispatcher)); httpTransports.put(SecurityField.NIO, () -> new SecurityNioHttpServerTransport(settings, networkService, bigArrays, - pageCacheRecycler, threadPool, xContentRegistry, dispatcher, ipFilter.get(), getSslService())); + pageCacheRecycler, threadPool, xContentRegistry, dispatcher, ipFilter.get(), getSslService(), getNioGroupFactory(settings))); return httpTransports; } @@ -1122,4 +1125,14 @@ public class Security extends Plugin implements ActionPlugin, IngestPlugin, Netw public void reloadSPI(ClassLoader loader) { securityExtensions.addAll(SecurityExtension.loadExtensions(loader)); } + + private synchronized NioGroupFactory getNioGroupFactory(Settings settings) { + if (groupFactory.get() != null) { + assert groupFactory.get().getSettings().equals(settings) : "Different settings than originally provided"; + return groupFactory.get(); + } else { + groupFactory.set(new NioGroupFactory(settings, logger)); + return groupFactory.get(); + } + } } diff --git a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/transport/nio/SecurityNioHttpServerTransport.java b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/transport/nio/SecurityNioHttpServerTransport.java index a2dab36be7e..9e0da251883 100644 --- a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/transport/nio/SecurityNioHttpServerTransport.java +++ b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/transport/nio/SecurityNioHttpServerTransport.java @@ -25,6 +25,7 @@ import org.elasticsearch.nio.NioSocketChannel; import org.elasticsearch.nio.ServerChannelContext; import org.elasticsearch.nio.SocketChannelContext; import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.nio.NioGroupFactory; import org.elasticsearch.xpack.core.ssl.SSLConfiguration; import org.elasticsearch.xpack.core.ssl.SSLService; import org.elasticsearch.xpack.security.transport.SecurityHttpExceptionHandler; @@ -54,8 +55,8 @@ public class SecurityNioHttpServerTransport extends NioHttpServerTransport { public SecurityNioHttpServerTransport(Settings settings, NetworkService networkService, BigArrays bigArrays, PageCacheRecycler pageCacheRecycler, ThreadPool threadPool, NamedXContentRegistry xContentRegistry, Dispatcher dispatcher, IPFilter ipFilter, - SSLService sslService) { - super(settings, networkService, bigArrays, pageCacheRecycler, threadPool, xContentRegistry, dispatcher); + SSLService sslService, NioGroupFactory nioGroupFactory) { + super(settings, networkService, bigArrays, pageCacheRecycler, threadPool, xContentRegistry, dispatcher, nioGroupFactory); this.securityExceptionHandler = new SecurityHttpExceptionHandler(logger, lifecycle, (c, e) -> super.onException(c, e)); this.ipFilter = ipFilter; this.nioIpFilter = new NioIPFilter(ipFilter, IPFilter.HTTP_PROFILE_NAME); diff --git a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/transport/nio/SecurityNioTransport.java b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/transport/nio/SecurityNioTransport.java index 7fdf946675f..a5b64c84c43 100644 --- a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/transport/nio/SecurityNioTransport.java +++ b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/transport/nio/SecurityNioTransport.java @@ -29,6 +29,7 @@ import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.ConnectTransportException; import org.elasticsearch.transport.TcpChannel; import org.elasticsearch.transport.TransportSettings; +import org.elasticsearch.transport.nio.NioGroupFactory; import org.elasticsearch.transport.nio.NioTcpChannel; import org.elasticsearch.transport.nio.NioTcpServerChannel; import org.elasticsearch.transport.nio.NioTransport; @@ -76,8 +77,9 @@ public class SecurityNioTransport extends NioTransport { public SecurityNioTransport(Settings settings, Version version, ThreadPool threadPool, NetworkService networkService, PageCacheRecycler pageCacheRecycler, NamedWriteableRegistry namedWriteableRegistry, CircuitBreakerService circuitBreakerService, @Nullable final IPFilter authenticator, - SSLService sslService) { - super(settings, version, threadPool, networkService, pageCacheRecycler, namedWriteableRegistry, circuitBreakerService); + SSLService sslService, NioGroupFactory groupFactory) { + super(settings, version, threadPool, networkService, pageCacheRecycler, namedWriteableRegistry, circuitBreakerService, + groupFactory); this.authenticator = authenticator; this.sslService = sslService; this.sslEnabled = XPackSettings.TRANSPORT_SSL_ENABLED.get(settings); diff --git a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/transport/nio/SecurityNioHttpServerTransportTests.java b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/transport/nio/SecurityNioHttpServerTransportTests.java index 2f26456112a..76048590cea 100644 --- a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/transport/nio/SecurityNioHttpServerTransportTests.java +++ b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/transport/nio/SecurityNioHttpServerTransportTests.java @@ -17,6 +17,7 @@ import org.elasticsearch.http.nio.NioHttpChannel; import org.elasticsearch.nio.NioSelector; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.nio.NioGroupFactory; import org.elasticsearch.xpack.core.XPackSettings; import org.elasticsearch.xpack.core.ssl.SSLClientAuth; import org.elasticsearch.xpack.core.ssl.SSLService; @@ -46,6 +47,7 @@ public class SecurityNioHttpServerTransportTests extends ESTestCase { private SSLService sslService; private Environment env; private InetSocketAddress address = new InetSocketAddress(InetAddress.getLoopbackAddress(), 0); + private NioGroupFactory nioGroupFactory; @Before public void createSSLService() { @@ -67,10 +69,11 @@ public class SecurityNioHttpServerTransportTests extends ESTestCase { Settings settings = Settings.builder() .put(env.settings()) .put(XPackSettings.HTTP_SSL_ENABLED.getKey(), true).build(); + nioGroupFactory = new NioGroupFactory(settings, logger); sslService = new SSLService(settings, env); SecurityNioHttpServerTransport transport = new SecurityNioHttpServerTransport(settings, new NetworkService(Collections.emptyList()), mock(BigArrays.class), mock(PageCacheRecycler.class), mock(ThreadPool.class), - xContentRegistry(), new NullDispatcher(), mock(IPFilter.class), sslService); + xContentRegistry(), new NullDispatcher(), mock(IPFilter.class), sslService, nioGroupFactory); SecurityNioHttpServerTransport.SecurityHttpChannelFactory factory = transport.channelFactory(); SocketChannel socketChannel = mock(SocketChannel.class); when(socketChannel.getRemoteAddress()).thenReturn(address); @@ -88,9 +91,10 @@ public class SecurityNioHttpServerTransportTests extends ESTestCase { .put(XPackSettings.HTTP_SSL_ENABLED.getKey(), true) .put("xpack.security.http.ssl.client_authentication", value).build(); sslService = new SSLService(settings, env); + nioGroupFactory = new NioGroupFactory(settings, logger); SecurityNioHttpServerTransport transport = new SecurityNioHttpServerTransport(settings, new NetworkService(Collections.emptyList()), mock(BigArrays.class), mock(PageCacheRecycler.class), mock(ThreadPool.class), - xContentRegistry(), new NullDispatcher(), mock(IPFilter.class), sslService); + xContentRegistry(), new NullDispatcher(), mock(IPFilter.class), sslService, nioGroupFactory); SecurityNioHttpServerTransport.SecurityHttpChannelFactory factory = transport.channelFactory(); SocketChannel socketChannel = mock(SocketChannel.class); @@ -107,10 +111,11 @@ public class SecurityNioHttpServerTransportTests extends ESTestCase { .put(env.settings()) .put(XPackSettings.HTTP_SSL_ENABLED.getKey(), true) .put("xpack.security.http.ssl.client_authentication", value).build(); + nioGroupFactory = new NioGroupFactory(settings, logger); sslService = new SSLService(settings, env); SecurityNioHttpServerTransport transport = new SecurityNioHttpServerTransport(settings, new NetworkService(Collections.emptyList()), mock(BigArrays.class), mock(PageCacheRecycler.class), mock(ThreadPool.class), - xContentRegistry(), new NullDispatcher(), mock(IPFilter.class), sslService); + xContentRegistry(), new NullDispatcher(), mock(IPFilter.class), sslService, nioGroupFactory); SecurityNioHttpServerTransport.SecurityHttpChannelFactory factory = transport.channelFactory(); SocketChannel socketChannel = mock(SocketChannel.class); @@ -128,9 +133,10 @@ public class SecurityNioHttpServerTransportTests extends ESTestCase { .put(XPackSettings.HTTP_SSL_ENABLED.getKey(), true) .put("xpack.security.http.ssl.client_authentication", value).build(); sslService = new SSLService(settings, env); + nioGroupFactory = new NioGroupFactory(settings, logger); SecurityNioHttpServerTransport transport = new SecurityNioHttpServerTransport(settings, new NetworkService(Collections.emptyList()), mock(BigArrays.class), mock(PageCacheRecycler.class), mock(ThreadPool.class), - xContentRegistry(), new NullDispatcher(), mock(IPFilter.class), sslService); + xContentRegistry(), new NullDispatcher(), mock(IPFilter.class), sslService, nioGroupFactory); SecurityNioHttpServerTransport.SecurityHttpChannelFactory factory = transport.channelFactory(); SocketChannel socketChannel = mock(SocketChannel.class); @@ -146,9 +152,10 @@ public class SecurityNioHttpServerTransportTests extends ESTestCase { .put(env.settings()) .put(XPackSettings.HTTP_SSL_ENABLED.getKey(), true).build(); sslService = new SSLService(settings, env); + nioGroupFactory = new NioGroupFactory(settings, logger); SecurityNioHttpServerTransport transport = new SecurityNioHttpServerTransport(settings, new NetworkService(Collections.emptyList()), mock(BigArrays.class), mock(PageCacheRecycler.class), mock(ThreadPool.class), - xContentRegistry(), new NullDispatcher(), mock(IPFilter.class), sslService); + xContentRegistry(), new NullDispatcher(), mock(IPFilter.class), sslService, nioGroupFactory); SecurityNioHttpServerTransport.SecurityHttpChannelFactory factory = transport.channelFactory(); SocketChannel socketChannel = mock(SocketChannel.class); when(socketChannel.getRemoteAddress()).thenReturn(address); @@ -161,9 +168,10 @@ public class SecurityNioHttpServerTransportTests extends ESTestCase { .put("xpack.security.http.ssl.supported_protocols", "TLSv1.2") .build(); sslService = new SSLService(settings, TestEnvironment.newEnvironment(settings)); + nioGroupFactory = new NioGroupFactory(settings, logger); transport = new SecurityNioHttpServerTransport(settings, new NetworkService(Collections.emptyList()), mock(BigArrays.class), mock(PageCacheRecycler.class), mock(ThreadPool.class), - xContentRegistry(), new NullDispatcher(), mock(IPFilter.class), sslService); + xContentRegistry(), new NullDispatcher(), mock(IPFilter.class), sslService, nioGroupFactory); factory = transport.channelFactory(); channel = factory.createChannel(mock(NioSelector.class), socketChannel); SSLEngine customEngine = SSLEngineUtils.getSSLEngine(channel); @@ -183,11 +191,12 @@ public class SecurityNioHttpServerTransportTests extends ESTestCase { .build(); env = TestEnvironment.newEnvironment(settings); sslService = new SSLService(settings, env); + nioGroupFactory = new NioGroupFactory(settings, logger); IllegalArgumentException e = expectThrows(IllegalArgumentException.class, () -> new SecurityNioHttpServerTransport(settings, new NetworkService(Collections.emptyList()), mock(BigArrays.class), mock(PageCacheRecycler.class), mock(ThreadPool.class), - xContentRegistry(), new NullDispatcher(), mock(IPFilter.class), sslService)); + xContentRegistry(), new NullDispatcher(), mock(IPFilter.class), sslService, nioGroupFactory)); assertThat(e.getMessage(), containsString("key must be provided")); } @@ -202,8 +211,9 @@ public class SecurityNioHttpServerTransportTests extends ESTestCase { .build(); env = TestEnvironment.newEnvironment(settings); sslService = new SSLService(settings, env); + nioGroupFactory = new NioGroupFactory(settings, logger); SecurityNioHttpServerTransport transport = new SecurityNioHttpServerTransport(settings, new NetworkService(Collections.emptyList()), mock(BigArrays.class), mock(PageCacheRecycler.class), mock(ThreadPool.class), - xContentRegistry(), new NullDispatcher(), mock(IPFilter.class), sslService); + xContentRegistry(), new NullDispatcher(), mock(IPFilter.class), sslService, nioGroupFactory); } } diff --git a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/transport/nio/SimpleSecurityNioTransportTests.java b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/transport/nio/SimpleSecurityNioTransportTests.java index 84d68ffd63e..a30e1329432 100644 --- a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/transport/nio/SimpleSecurityNioTransportTests.java +++ b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/transport/nio/SimpleSecurityNioTransportTests.java @@ -20,6 +20,7 @@ import org.elasticsearch.transport.ConnectionProfile; import org.elasticsearch.transport.TcpChannel; import org.elasticsearch.transport.Transport; import org.elasticsearch.transport.TransportSettings; +import org.elasticsearch.transport.nio.NioGroupFactory; import org.elasticsearch.xpack.security.transport.AbstractSimpleSecurityTransportTestCase; import java.util.Collections; @@ -34,7 +35,8 @@ public class SimpleSecurityNioTransportTests extends AbstractSimpleSecurityTrans .put(settings) .put("xpack.security.transport.ssl.enabled", true).build(); Transport transport = new SecurityNioTransport(settings1, version, threadPool, networkService, new MockPageCacheRecycler(settings), - namedWriteableRegistry, new NoneCircuitBreakerService(), null, createSSLService(settings1)) { + namedWriteableRegistry, new NoneCircuitBreakerService(), null, createSSLService(settings1), + new NioGroupFactory(settings, logger)) { @Override public void executeHandshake(DiscoveryNode node, TcpChannel channel, ConnectionProfile profile,