Share NioGroup between http and transport impls (#37396)

Currently we create dedicated network threads for both the http and
transport implementations. Since these these threads should never
perform blocking operations, these threads could be shared. This commit
modifies the nio-transport to have 0 http workers be default. If the
default configs are used, this will cause the http transport to be run
on the transport worker threads. The http worker setting will still exist
in case the user would like to configure dedicated workers. Additionally,
this commmit deletes dedicated acceptor threads. We have never had these
for the netty transport and they can be added back if a need is
determined in the future.
This commit is contained in:
Tim Brooks 2019-01-21 13:50:56 -07:00 committed by GitHub
parent 9a34b20233
commit f516d68fb2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
23 changed files with 559 additions and 242 deletions

View File

@ -19,161 +19,26 @@
package org.elasticsearch.nio;
import org.elasticsearch.nio.utils.ExceptionsHelper;
import java.io.Closeable;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;
/**
* The NioGroup is a group of selectors for interfacing with java nio. When it is started it will create the
* configured number of selectors. Each selector will be running in a dedicated thread. Server connections
* can be bound using the {@link #bindServerChannel(InetSocketAddress, ChannelFactory)} method. Client
* connections can be opened using the {@link #openChannel(InetSocketAddress, ChannelFactory)} method.
* <p>
* The logic specific to a particular channel is provided by the {@link ChannelFactory} passed to the method
* when the channel is created. This is what allows an NioGroup to support different channel types.
* An class for interfacing with java.nio. Implementations implement the underlying logic for opening
* channels and registering them with the OS.
*/
public class NioGroup implements AutoCloseable {
private final List<NioSelector> dedicatedAcceptors;
private final RoundRobinSupplier<NioSelector> acceptorSupplier;
private final List<NioSelector> selectors;
private final RoundRobinSupplier<NioSelector> selectorSupplier;
private final AtomicBoolean isOpen = new AtomicBoolean(true);
public interface NioGroup extends Closeable {
/**
* This will create an NioGroup with no dedicated acceptors. All server channels will be handled by the
* same selectors that are handling child channels.
*
* @param threadFactory factory to create selector threads
* @param selectorCount the number of selectors to be created
* @param eventHandlerFunction function for creating event handlers
* @throws IOException occurs if there is a problem while opening a java.nio.Selector
* Opens and binds a server channel to accept incoming connections.
*/
public NioGroup(ThreadFactory threadFactory, int selectorCount, Function<Supplier<NioSelector>, EventHandler> eventHandlerFunction)
throws IOException {
this(null, 0, threadFactory, selectorCount, eventHandlerFunction);
}
<S extends NioServerSocketChannel> S bindServerChannel(InetSocketAddress address, ChannelFactory<S, ?> factory) throws IOException;
/**
* This will create an NioGroup with dedicated acceptors. All server channels will be handled by a group
* of selectors dedicated to accepting channels. These accepted channels will be handed off the
* non-server selectors.
*
* @param acceptorThreadFactory factory to create acceptor selector threads
* @param dedicatedAcceptorCount the number of dedicated acceptor selectors to be created
* @param selectorThreadFactory factory to create non-acceptor selector threads
* @param selectorCount the number of non-acceptor selectors to be created
* @param eventHandlerFunction function for creating event handlers
* @throws IOException occurs if there is a problem while opening a java.nio.Selector
* Opens a outgoing client channel.
*/
public NioGroup(ThreadFactory acceptorThreadFactory, int dedicatedAcceptorCount, ThreadFactory selectorThreadFactory, int selectorCount,
Function<Supplier<NioSelector>, EventHandler> eventHandlerFunction) throws IOException {
dedicatedAcceptors = new ArrayList<>(dedicatedAcceptorCount);
selectors = new ArrayList<>(selectorCount);
try {
List<RoundRobinSupplier<NioSelector>> suppliersToSet = new ArrayList<>(selectorCount);
for (int i = 0; i < selectorCount; ++i) {
RoundRobinSupplier<NioSelector> supplier = new RoundRobinSupplier<>();
suppliersToSet.add(supplier);
NioSelector selector = new NioSelector(eventHandlerFunction.apply(supplier));
selectors.add(selector);
}
for (RoundRobinSupplier<NioSelector> supplierToSet : suppliersToSet) {
supplierToSet.setSelectors(selectors.toArray(new NioSelector[0]));
assert supplierToSet.count() == selectors.size() : "Supplier should have same count as selector list.";
}
for (int i = 0; i < dedicatedAcceptorCount; ++i) {
RoundRobinSupplier<NioSelector> supplier = new RoundRobinSupplier<>(selectors.toArray(new NioSelector[0]));
NioSelector acceptor = new NioSelector(eventHandlerFunction.apply(supplier));
dedicatedAcceptors.add(acceptor);
}
if (dedicatedAcceptorCount != 0) {
acceptorSupplier = new RoundRobinSupplier<>(dedicatedAcceptors.toArray(new NioSelector[0]));
} else {
acceptorSupplier = new RoundRobinSupplier<>(selectors.toArray(new NioSelector[0]));
}
selectorSupplier = new RoundRobinSupplier<>(selectors.toArray(new NioSelector[0]));
assert selectorCount == selectors.size() : "We need to have created all the selectors at this point.";
assert dedicatedAcceptorCount == dedicatedAcceptors.size() : "We need to have created all the acceptors at this point.";
startSelectors(selectors, selectorThreadFactory);
startSelectors(dedicatedAcceptors, acceptorThreadFactory);
} catch (Exception e) {
try {
close();
} catch (Exception e1) {
e.addSuppressed(e1);
}
throw e;
}
}
public <S extends NioServerSocketChannel> S bindServerChannel(InetSocketAddress address, ChannelFactory<S, ?> factory)
throws IOException {
ensureOpen();
return factory.openNioServerSocketChannel(address, acceptorSupplier);
}
public <S extends NioSocketChannel> S openChannel(InetSocketAddress address, ChannelFactory<?, S> factory) throws IOException {
ensureOpen();
return factory.openNioChannel(address, selectorSupplier);
}
<S extends NioSocketChannel> S openChannel(InetSocketAddress address, ChannelFactory<?, S> factory) throws IOException;
@Override
public void close() throws IOException {
if (isOpen.compareAndSet(true, false)) {
List<NioSelector> toClose = Stream.concat(dedicatedAcceptors.stream(), selectors.stream()).collect(Collectors.toList());
List<IOException> closingExceptions = new ArrayList<>();
for (NioSelector selector : toClose) {
try {
selector.close();
} catch (IOException e) {
closingExceptions.add(e);
}
}
ExceptionsHelper.rethrowAndSuppress(closingExceptions);
}
}
private static void startSelectors(Iterable<NioSelector> selectors, ThreadFactory threadFactory) {
for (NioSelector selector : selectors) {
if (selector.isRunning() == false) {
threadFactory.newThread(selector::runLoop).start();
try {
selector.isRunningFuture().get();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new IllegalStateException("Interrupted while waiting for selector to start.", e);
} catch (ExecutionException e) {
if (e.getCause() instanceof RuntimeException) {
throw (RuntimeException) e.getCause();
} else {
throw new RuntimeException("Exception during selector start.", e);
}
}
}
}
}
private void ensureOpen() {
if (isOpen.get() == false) {
throw new IllegalStateException("NioGroup is closed.");
}
}
void close() throws IOException;
}

View File

@ -0,0 +1,181 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.nio;
import org.elasticsearch.nio.utils.ExceptionsHelper;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;
/**
* The NioSelectorGroup is a group of selectors for interfacing with java nio. When it is started it will create the
* configured number of selectors. Each selector will be running in a dedicated thread. Server connections
* can be bound using the {@link #bindServerChannel(InetSocketAddress, ChannelFactory)} method. Client
* connections can be opened using the {@link #openChannel(InetSocketAddress, ChannelFactory)} method.
* <p>
* The logic specific to a particular channel is provided by the {@link ChannelFactory} passed to the method
* when the channel is created. This is what allows an NioSelectorGroup to support different channel types.
*/
public class NioSelectorGroup implements NioGroup {
private final List<NioSelector> dedicatedAcceptors;
private final RoundRobinSupplier<NioSelector> acceptorSupplier;
private final List<NioSelector> selectors;
private final RoundRobinSupplier<NioSelector> selectorSupplier;
private final AtomicBoolean isOpen = new AtomicBoolean(true);
/**
* This will create an NioSelectorGroup with no dedicated acceptors. All server channels will be handled by the
* same selectors that are handling child channels.
*
* @param threadFactory factory to create selector threads
* @param selectorCount the number of selectors to be created
* @param eventHandlerFunction function for creating event handlers
* @throws IOException occurs if there is a problem while opening a java.nio.Selector
*/
public NioSelectorGroup(ThreadFactory threadFactory, int selectorCount,
Function<Supplier<NioSelector>, EventHandler> eventHandlerFunction) throws IOException {
this(null, 0, threadFactory, selectorCount, eventHandlerFunction);
}
/**
* This will create an NioSelectorGroup with dedicated acceptors. All server channels will be handled by a group
* of selectors dedicated to accepting channels. These accepted channels will be handed off the
* non-server selectors.
*
* @param acceptorThreadFactory factory to create acceptor selector threads
* @param dedicatedAcceptorCount the number of dedicated acceptor selectors to be created
* @param selectorThreadFactory factory to create non-acceptor selector threads
* @param selectorCount the number of non-acceptor selectors to be created
* @param eventHandlerFunction function for creating event handlers
* @throws IOException occurs if there is a problem while opening a java.nio.Selector
*/
public NioSelectorGroup(ThreadFactory acceptorThreadFactory, int dedicatedAcceptorCount, ThreadFactory selectorThreadFactory,
int selectorCount, Function<Supplier<NioSelector>, EventHandler> eventHandlerFunction) throws IOException {
dedicatedAcceptors = new ArrayList<>(dedicatedAcceptorCount);
selectors = new ArrayList<>(selectorCount);
try {
List<RoundRobinSupplier<NioSelector>> suppliersToSet = new ArrayList<>(selectorCount);
for (int i = 0; i < selectorCount; ++i) {
RoundRobinSupplier<NioSelector> supplier = new RoundRobinSupplier<>();
suppliersToSet.add(supplier);
NioSelector selector = new NioSelector(eventHandlerFunction.apply(supplier));
selectors.add(selector);
}
for (RoundRobinSupplier<NioSelector> supplierToSet : suppliersToSet) {
supplierToSet.setSelectors(selectors.toArray(new NioSelector[0]));
assert supplierToSet.count() == selectors.size() : "Supplier should have same count as selector list.";
}
for (int i = 0; i < dedicatedAcceptorCount; ++i) {
RoundRobinSupplier<NioSelector> supplier = new RoundRobinSupplier<>(selectors.toArray(new NioSelector[0]));
NioSelector acceptor = new NioSelector(eventHandlerFunction.apply(supplier));
dedicatedAcceptors.add(acceptor);
}
if (dedicatedAcceptorCount != 0) {
acceptorSupplier = new RoundRobinSupplier<>(dedicatedAcceptors.toArray(new NioSelector[0]));
} else {
acceptorSupplier = new RoundRobinSupplier<>(selectors.toArray(new NioSelector[0]));
}
selectorSupplier = new RoundRobinSupplier<>(selectors.toArray(new NioSelector[0]));
assert selectorCount == selectors.size() : "We need to have created all the selectors at this point.";
assert dedicatedAcceptorCount == dedicatedAcceptors.size() : "We need to have created all the acceptors at this point.";
startSelectors(selectors, selectorThreadFactory);
startSelectors(dedicatedAcceptors, acceptorThreadFactory);
} catch (Exception e) {
try {
close();
} catch (Exception e1) {
e.addSuppressed(e1);
}
throw e;
}
}
@Override
public <S extends NioServerSocketChannel> S bindServerChannel(InetSocketAddress address, ChannelFactory<S, ?> factory)
throws IOException {
ensureOpen();
return factory.openNioServerSocketChannel(address, acceptorSupplier);
}
@Override
public <S extends NioSocketChannel> S openChannel(InetSocketAddress address, ChannelFactory<?, S> factory) throws IOException {
ensureOpen();
return factory.openNioChannel(address, selectorSupplier);
}
@Override
public void close() throws IOException {
if (isOpen.compareAndSet(true, false)) {
List<NioSelector> toClose = Stream.concat(dedicatedAcceptors.stream(), selectors.stream()).collect(Collectors.toList());
List<IOException> closingExceptions = new ArrayList<>();
for (NioSelector selector : toClose) {
try {
selector.close();
} catch (IOException e) {
closingExceptions.add(e);
}
}
ExceptionsHelper.rethrowAndSuppress(closingExceptions);
}
}
private static void startSelectors(Iterable<NioSelector> selectors, ThreadFactory threadFactory) {
for (NioSelector selector : selectors) {
if (selector.isRunning() == false) {
threadFactory.newThread(selector::runLoop).start();
try {
selector.isRunningFuture().get();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new IllegalStateException("Interrupted while waiting for selector to start.", e);
} catch (ExecutionException e) {
if (e.getCause() instanceof RuntimeException) {
throw (RuntimeException) e.getCause();
} else {
throw new RuntimeException("Exception during selector start.", e);
}
}
}
}
}
private void ensureOpen() {
if (isOpen.get() == false) {
throw new IllegalStateException("NioGroup is closed.");
}
}
}

View File

@ -30,16 +30,16 @@ import java.util.function.Consumer;
import static org.elasticsearch.common.util.concurrent.EsExecutors.daemonThreadFactory;
import static org.mockito.Mockito.mock;
public class NioGroupTests extends ESTestCase {
public class NioSelectorGroupTests extends ESTestCase {
private NioGroup nioGroup;
private NioSelectorGroup nioGroup;
@Override
@SuppressWarnings("unchecked")
public void setUp() throws Exception {
super.setUp();
nioGroup = new NioGroup(daemonThreadFactory(Settings.EMPTY, "acceptor"), 1, daemonThreadFactory(Settings.EMPTY, "selector"), 1,
(s) -> new EventHandler(mock(Consumer.class), s));
nioGroup = new NioSelectorGroup(daemonThreadFactory(Settings.EMPTY, "acceptor"), 1,
daemonThreadFactory(Settings.EMPTY, "selector"), 1, (s) -> new EventHandler(mock(Consumer.class), s));
}
@Override
@ -74,7 +74,7 @@ public class NioGroupTests extends ESTestCase {
@SuppressWarnings("unchecked")
public void testExceptionAtStartIsHandled() throws IOException {
RuntimeException ex = new RuntimeException();
CheckedRunnable<IOException> ctor = () -> new NioGroup(r -> {throw ex;}, 1,
CheckedRunnable<IOException> ctor = () -> new NioSelectorGroup(r -> {throw ex;}, 1,
daemonThreadFactory(Settings.EMPTY, "selector"),
1, (s) -> new EventHandler(mock(Consumer.class), s));
RuntimeException runtimeException = expectThrows(RuntimeException.class, ctor::run);

View File

@ -26,13 +26,11 @@ import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.network.NetworkService;
import org.elasticsearch.common.recycler.Recycler;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.settings.SettingsException;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.PageCacheRecycler;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.http.AbstractHttpServerTransport;
import org.elasticsearch.http.HttpChannel;
@ -41,7 +39,6 @@ import org.elasticsearch.http.nio.cors.NioCorsConfig;
import org.elasticsearch.http.nio.cors.NioCorsConfigBuilder;
import org.elasticsearch.nio.BytesChannelContext;
import org.elasticsearch.nio.ChannelFactory;
import org.elasticsearch.nio.EventHandler;
import org.elasticsearch.nio.InboundChannelBuffer;
import org.elasticsearch.nio.NioGroup;
import org.elasticsearch.nio.NioSelector;
@ -50,6 +47,7 @@ import org.elasticsearch.nio.ServerChannelContext;
import org.elasticsearch.nio.SocketChannelContext;
import org.elasticsearch.rest.RestUtils;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.nio.NioGroupFactory;
import java.io.IOException;
import java.net.InetSocketAddress;
@ -61,8 +59,6 @@ import java.util.function.Consumer;
import java.util.regex.Pattern;
import java.util.regex.PatternSyntaxException;
import static org.elasticsearch.common.settings.Setting.intSetting;
import static org.elasticsearch.common.util.concurrent.EsExecutors.daemonThreadFactory;
import static org.elasticsearch.http.HttpTransportSettings.SETTING_CORS_ALLOW_CREDENTIALS;
import static org.elasticsearch.http.HttpTransportSettings.SETTING_CORS_ALLOW_HEADERS;
import static org.elasticsearch.http.HttpTransportSettings.SETTING_CORS_ALLOW_METHODS;
@ -83,15 +79,9 @@ import static org.elasticsearch.http.nio.cors.NioCorsHandler.ANY_ORIGIN;
public class NioHttpServerTransport extends AbstractHttpServerTransport {
private static final Logger logger = LogManager.getLogger(NioHttpServerTransport.class);
public static final Setting<Integer> NIO_HTTP_ACCEPTOR_COUNT =
intSetting("http.nio.acceptor_count", 1, 1, Setting.Property.NodeScope);
public static final Setting<Integer> NIO_HTTP_WORKER_COUNT =
new Setting<>("http.nio.worker_count",
(s) -> Integer.toString(EsExecutors.numberOfProcessors(s) * 2),
(s) -> Setting.parseInt(s, 1, "http.nio.worker_count"), Setting.Property.NodeScope);
protected final PageCacheRecycler pageCacheRecycler;
protected final NioCorsConfig corsConfig;
private final NioGroupFactory nioGroupFactory;
protected final boolean tcpNoDelay;
protected final boolean tcpKeepAlive;
@ -99,14 +89,15 @@ public class NioHttpServerTransport extends AbstractHttpServerTransport {
protected final int tcpSendBufferSize;
protected final int tcpReceiveBufferSize;
private NioGroup nioGroup;
private volatile NioGroup nioGroup;
private ChannelFactory<NioHttpServerChannel, NioHttpChannel> channelFactory;
public NioHttpServerTransport(Settings settings, NetworkService networkService, BigArrays bigArrays,
PageCacheRecycler pageCacheRecycler, ThreadPool threadPool, NamedXContentRegistry xContentRegistry,
Dispatcher dispatcher) {
Dispatcher dispatcher, NioGroupFactory nioGroupFactory) {
super(settings, networkService, bigArrays, threadPool, xContentRegistry, dispatcher);
this.pageCacheRecycler = pageCacheRecycler;
this.nioGroupFactory = nioGroupFactory;
ByteSizeValue maxChunkSize = SETTING_HTTP_MAX_CHUNK_SIZE.get(settings);
ByteSizeValue maxHeaderSize = SETTING_HTTP_MAX_HEADER_SIZE.get(settings);
@ -134,11 +125,7 @@ public class NioHttpServerTransport extends AbstractHttpServerTransport {
protected void doStart() {
boolean success = false;
try {
int acceptorCount = NIO_HTTP_ACCEPTOR_COUNT.get(settings);
int workerCount = NIO_HTTP_WORKER_COUNT.get(settings);
nioGroup = new NioGroup(daemonThreadFactory(this.settings, HTTP_SERVER_ACCEPTOR_THREAD_NAME_PREFIX), acceptorCount,
daemonThreadFactory(this.settings, HTTP_SERVER_WORKER_THREAD_NAME_PREFIX), workerCount,
(s) -> new EventHandler(this::onNonChannelException, s));
nioGroup = nioGroupFactory.getHttpGroup();
channelFactory = channelFactory();
bindServer();
success = true;

View File

@ -0,0 +1,172 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.transport.nio;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.AbstractRefCounted;
import org.elasticsearch.http.HttpServerTransport;
import org.elasticsearch.nio.ChannelFactory;
import org.elasticsearch.nio.EventHandler;
import org.elasticsearch.nio.NioGroup;
import org.elasticsearch.nio.NioSelectorGroup;
import org.elasticsearch.nio.NioServerSocketChannel;
import org.elasticsearch.nio.NioSocketChannel;
import org.elasticsearch.transport.TcpTransport;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetSocketAddress;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicBoolean;
import static org.elasticsearch.common.util.concurrent.EsExecutors.daemonThreadFactory;
/**
* Creates and returns {@link NioSelectorGroup} instances. It will return a shared group for
* both {@link #getHttpGroup()} and {@link #getTransportGroup()} if
* {@link NioTransportPlugin#NIO_HTTP_WORKER_COUNT} is configured to be 0. If that setting is not 0, then it
* will return a different group in the {@link #getHttpGroup()} call.
*/
public final class NioGroupFactory {
private final Logger logger;
private final Settings settings;
private final int httpWorkerCount;
private RefCountedNioGroup refCountedGroup;
public NioGroupFactory(Settings settings, Logger logger) {
this.logger = logger;
this.settings = settings;
this.httpWorkerCount = NioTransportPlugin.NIO_HTTP_WORKER_COUNT.get(settings);
}
public Settings getSettings() {
return settings;
}
public synchronized NioGroup getTransportGroup() throws IOException {
return getGenericGroup();
}
public synchronized NioGroup getHttpGroup() throws IOException {
if (httpWorkerCount == 0) {
return getGenericGroup();
} else {
return new NioSelectorGroup(daemonThreadFactory(this.settings, HttpServerTransport.HTTP_SERVER_WORKER_THREAD_NAME_PREFIX),
httpWorkerCount, (s) -> new EventHandler(this::onException, s));
}
}
private NioGroup getGenericGroup() throws IOException {
if (refCountedGroup == null) {
ThreadFactory threadFactory = daemonThreadFactory(this.settings, TcpTransport.TRANSPORT_WORKER_THREAD_NAME_PREFIX);
NioSelectorGroup nioGroup = new NioSelectorGroup(threadFactory, NioTransportPlugin.NIO_WORKER_COUNT.get(settings),
(s) -> new EventHandler(this::onException, s));
this.refCountedGroup = new RefCountedNioGroup(nioGroup);
return new WrappedNioGroup(refCountedGroup);
} else {
refCountedGroup.incRef();
return new WrappedNioGroup(refCountedGroup);
}
}
private void onException(Exception exception) {
logger.warn(new ParameterizedMessage("exception caught on transport layer [thread={}]", Thread.currentThread().getName()),
exception);
}
private static class RefCountedNioGroup extends AbstractRefCounted implements NioGroup {
public static final String NAME = "ref-counted-nio-group";
private final NioSelectorGroup nioGroup;
private RefCountedNioGroup(NioSelectorGroup nioGroup) {
super(NAME);
this.nioGroup = nioGroup;
}
@Override
protected void closeInternal() {
try {
nioGroup.close();
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}
@Override
public <S extends NioServerSocketChannel> S bindServerChannel(InetSocketAddress address, ChannelFactory<S, ?> factory)
throws IOException {
return nioGroup.bindServerChannel(address, factory);
}
@Override
public <S extends NioSocketChannel> S openChannel(InetSocketAddress address, ChannelFactory<?, S> factory) throws IOException {
return nioGroup.openChannel(address, factory);
}
@Override
public void close() throws IOException {
throw new UnsupportedOperationException("Should not close. Instead use decRef call.");
}
}
/**
* Wraps the {@link RefCountedNioGroup}. Calls {@link RefCountedNioGroup#decRef()} on close. After close,
* this wrapped instance can no longer be used.
*/
private class WrappedNioGroup implements NioGroup {
private final RefCountedNioGroup refCountedNioGroup;
private final AtomicBoolean isOpen = new AtomicBoolean(true);
private WrappedNioGroup(RefCountedNioGroup refCountedNioGroup) {
this.refCountedNioGroup = refCountedNioGroup;
}
public <S extends NioServerSocketChannel> S bindServerChannel(InetSocketAddress address, ChannelFactory<S, ?> factory)
throws IOException {
ensureOpen();
return refCountedNioGroup.bindServerChannel(address, factory);
}
public <S extends NioSocketChannel> S openChannel(InetSocketAddress address, ChannelFactory<?, S> factory) throws IOException {
ensureOpen();
return refCountedNioGroup.openChannel(address, factory);
}
@Override
public void close() {
if (isOpen.compareAndSet(true, false)) {
refCountedNioGroup.decRef();
}
}
private void ensureOpen() {
if (isOpen.get() == false) {
throw new IllegalStateException("NioGroup is closed.");
}
}
}
}

View File

@ -27,14 +27,11 @@ import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.network.NetworkService;
import org.elasticsearch.common.recycler.Recycler;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.PageCacheRecycler;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.indices.breaker.CircuitBreakerService;
import org.elasticsearch.nio.BytesChannelContext;
import org.elasticsearch.nio.ChannelFactory;
import org.elasticsearch.nio.EventHandler;
import org.elasticsearch.nio.InboundChannelBuffer;
import org.elasticsearch.nio.NioGroup;
import org.elasticsearch.nio.NioSelector;
@ -54,25 +51,21 @@ import java.util.function.Function;
import java.util.function.Supplier;
import static org.elasticsearch.common.util.concurrent.ConcurrentCollections.newConcurrentMap;
import static org.elasticsearch.common.util.concurrent.EsExecutors.daemonThreadFactory;
public class NioTransport extends TcpTransport {
private static final Logger logger = LogManager.getLogger(NioTransport.class);
public static final Setting<Integer> NIO_WORKER_COUNT =
new Setting<>("transport.nio.worker_count",
(s) -> Integer.toString(EsExecutors.numberOfProcessors(s) * 2),
(s) -> Setting.parseInt(s, 1, "transport.nio.worker_count"), Setting.Property.NodeScope);
private final ConcurrentMap<String, TcpChannelFactory> profileToChannelFactory = newConcurrentMap();
private final NioGroupFactory groupFactory;
private volatile NioGroup nioGroup;
private volatile Function<DiscoveryNode, TcpChannelFactory> clientChannelFactory;
protected NioTransport(Settings settings, Version version, ThreadPool threadPool, NetworkService networkService,
PageCacheRecycler pageCacheRecycler, NamedWriteableRegistry namedWriteableRegistry,
CircuitBreakerService circuitBreakerService) {
CircuitBreakerService circuitBreakerService, NioGroupFactory groupFactory) {
super("nio", settings, version, threadPool, pageCacheRecycler, circuitBreakerService, namedWriteableRegistry, networkService);
this.groupFactory = groupFactory;
}
@Override
@ -91,8 +84,7 @@ public class NioTransport extends TcpTransport {
protected void doStart() {
boolean success = false;
try {
nioGroup = new NioGroup(daemonThreadFactory(this.settings, TcpTransport.TRANSPORT_WORKER_THREAD_NAME_PREFIX),
NioTransport.NIO_WORKER_COUNT.get(settings), (s) -> new EventHandler(this::onNonChannelException, s));
nioGroup = groupFactory.getTransportGroup();
ProfileSettings clientProfileSettings = new ProfileSettings(settings, "default");
clientChannelFactory = clientChannelFactoryFunction(clientProfileSettings);

View File

@ -19,6 +19,9 @@
package org.elasticsearch.transport.nio;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.lucene.util.SetOnce;
import org.elasticsearch.Version;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.network.NetworkService;
@ -26,6 +29,7 @@ import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.PageCacheRecycler;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.http.HttpServerTransport;
import org.elasticsearch.http.nio.NioHttpServerTransport;
@ -41,17 +45,29 @@ import java.util.List;
import java.util.Map;
import java.util.function.Supplier;
import static org.elasticsearch.common.settings.Setting.intSetting;
public class NioTransportPlugin extends Plugin implements NetworkPlugin {
public static final String NIO_TRANSPORT_NAME = "nio-transport";
public static final String NIO_HTTP_TRANSPORT_NAME = "nio-http-transport";
private static final Logger logger = LogManager.getLogger(NioTransportPlugin.class);
public static final Setting<Integer> NIO_WORKER_COUNT =
new Setting<>("transport.nio.worker_count",
(s) -> Integer.toString(EsExecutors.numberOfProcessors(s) * 2),
(s) -> Setting.parseInt(s, 1, "transport.nio.worker_count"), Setting.Property.NodeScope);
public static final Setting<Integer> NIO_HTTP_WORKER_COUNT =
intSetting("http.nio.worker_count", 0, 0, Setting.Property.NodeScope);
private final SetOnce<NioGroupFactory> groupFactory = new SetOnce<>();
@Override
public List<Setting<?>> getSettings() {
return Arrays.asList(
NioHttpServerTransport.NIO_HTTP_ACCEPTOR_COUNT,
NioHttpServerTransport.NIO_HTTP_WORKER_COUNT,
NioTransport.NIO_WORKER_COUNT
NIO_HTTP_WORKER_COUNT,
NIO_WORKER_COUNT
);
}
@ -61,7 +77,7 @@ public class NioTransportPlugin extends Plugin implements NetworkPlugin {
NamedWriteableRegistry namedWriteableRegistry, NetworkService networkService) {
return Collections.singletonMap(NIO_TRANSPORT_NAME,
() -> new NioTransport(settings, Version.CURRENT, threadPool, networkService, pageCacheRecycler, namedWriteableRegistry,
circuitBreakerService));
circuitBreakerService, getNioGroupFactory(settings)));
}
@Override
@ -73,6 +89,17 @@ public class NioTransportPlugin extends Plugin implements NetworkPlugin {
HttpServerTransport.Dispatcher dispatcher) {
return Collections.singletonMap(NIO_HTTP_TRANSPORT_NAME,
() -> new NioHttpServerTransport(settings, networkService, bigArrays, pageCacheRecycler, threadPool, xContentRegistry,
dispatcher));
dispatcher, getNioGroupFactory(settings)));
}
private synchronized NioGroupFactory getNioGroupFactory(Settings settings) {
NioGroupFactory nioGroupFactory = groupFactory.get();
if (nioGroupFactory != null) {
assert nioGroupFactory.getSettings().equals(settings) : "Different settings than originally provided";
return nioGroupFactory;
} else {
groupFactory.set(new NioGroupFactory(settings, logger));
return groupFactory.get();
}
}
}

View File

@ -20,10 +20,8 @@ package org.elasticsearch;
import org.elasticsearch.common.network.NetworkModule;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.http.nio.NioHttpServerTransport;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.transport.nio.NioTransport;
import org.elasticsearch.transport.nio.NioTransportPlugin;
import java.util.Collection;
@ -46,8 +44,8 @@ public abstract class NioIntegTestCase extends ESIntegTestCase {
Settings.Builder builder = Settings.builder().put(super.nodeSettings(nodeOrdinal));
// randomize nio settings
if (randomBoolean()) {
builder.put(NioTransport.NIO_WORKER_COUNT.getKey(), random().nextInt(3) + 1);
builder.put(NioHttpServerTransport.NIO_HTTP_WORKER_COUNT.getKey(), random().nextInt(3) + 1);
builder.put(NioTransportPlugin.NIO_WORKER_COUNT.getKey(), random().nextInt(3) + 1);
builder.put(NioTransportPlugin.NIO_HTTP_WORKER_COUNT.getKey(), random().nextInt(3) + 1);
}
builder.put(NetworkModule.TRANSPORT_TYPE_KEY, NioTransportPlugin.NIO_TRANSPORT_NAME);
builder.put(NetworkModule.HTTP_TYPE_KEY, NioTransportPlugin.NIO_HTTP_TRANSPORT_NAME);

View File

@ -45,7 +45,7 @@ import org.elasticsearch.nio.ChannelFactory;
import org.elasticsearch.nio.EventHandler;
import org.elasticsearch.nio.FlushOperation;
import org.elasticsearch.nio.InboundChannelBuffer;
import org.elasticsearch.nio.NioGroup;
import org.elasticsearch.nio.NioSelectorGroup;
import org.elasticsearch.nio.NioSelector;
import org.elasticsearch.nio.NioServerSocketChannel;
import org.elasticsearch.nio.NioSocketChannel;
@ -88,11 +88,11 @@ class NioHttpClient implements Closeable {
private static final Logger logger = LogManager.getLogger(NioHttpClient.class);
private final NioGroup nioGroup;
private final NioSelectorGroup nioGroup;
NioHttpClient() {
try {
nioGroup = new NioGroup(daemonThreadFactory(Settings.EMPTY, "nio-http-client"), 1,
nioGroup = new NioSelectorGroup(daemonThreadFactory(Settings.EMPTY, "nio-http-client"), 1,
(s) -> new EventHandler(this::onException, s));
} catch (IOException e) {
throw new UncheckedIOException(e);

View File

@ -55,6 +55,7 @@ import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.TestThreadPool;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.nio.NioGroupFactory;
import org.junit.After;
import org.junit.Before;
@ -202,7 +203,7 @@ public class NioHttpServerTransportTests extends ESTestCase {
}
};
try (NioHttpServerTransport transport = new NioHttpServerTransport(settings, networkService, bigArrays, pageRecycler, threadPool,
xContentRegistry(), dispatcher)) {
xContentRegistry(), dispatcher, new NioGroupFactory(settings, logger))) {
transport.start();
final TransportAddress remoteAddress = randomFrom(transport.boundAddress().boundAddresses());
try (NioHttpClient client = new NioHttpClient()) {
@ -235,12 +236,12 @@ public class NioHttpServerTransportTests extends ESTestCase {
public void testBindUnavailableAddress() {
try (NioHttpServerTransport transport = new NioHttpServerTransport(Settings.EMPTY, networkService, bigArrays, pageRecycler,
threadPool, xContentRegistry(), new NullDispatcher())) {
threadPool, xContentRegistry(), new NullDispatcher(), new NioGroupFactory(Settings.EMPTY, logger))) {
transport.start();
TransportAddress remoteAddress = randomFrom(transport.boundAddress().boundAddresses());
Settings settings = Settings.builder().put("http.port", remoteAddress.getPort()).build();
try (NioHttpServerTransport otherTransport = new NioHttpServerTransport(settings, networkService, bigArrays, pageRecycler,
threadPool, xContentRegistry(), new NullDispatcher())) {
threadPool, xContentRegistry(), new NullDispatcher(), new NioGroupFactory(Settings.EMPTY, logger))) {
BindHttpException bindHttpException = expectThrows(BindHttpException.class, () -> otherTransport.start());
assertEquals("Failed to bind to [" + remoteAddress.getPort() + "]", bindHttpException.getMessage());
}
@ -284,7 +285,7 @@ public class NioHttpServerTransportTests extends ESTestCase {
}
try (NioHttpServerTransport transport = new NioHttpServerTransport(settings, networkService, bigArrays, pageRecycler,
threadPool, xContentRegistry(), dispatcher)) {
threadPool, xContentRegistry(), dispatcher, new NioGroupFactory(settings, logger))) {
transport.start();
final TransportAddress remoteAddress = randomFrom(transport.boundAddress().boundAddresses());

View File

@ -0,0 +1,78 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.transport.nio;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.nio.ChannelFactory;
import org.elasticsearch.nio.NioGroup;
import org.elasticsearch.nio.NioSelector;
import org.elasticsearch.nio.NioServerSocketChannel;
import org.elasticsearch.nio.NioSocketChannel;
import org.elasticsearch.nio.ServerChannelContext;
import org.elasticsearch.test.ESTestCase;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.function.Consumer;
public class NioGroupFactoryTests extends ESTestCase {
public void testSharedGroupStillWorksWhenOneInstanceClosed() throws IOException {
NioGroupFactory groupFactory = new NioGroupFactory(Settings.EMPTY, logger);
InetSocketAddress inetSocketAddress = new InetSocketAddress(InetAddress.getLoopbackAddress(), 0);
NioGroup httpGroup = groupFactory.getHttpGroup();
try {
NioGroup transportGroup = groupFactory.getTransportGroup();
transportGroup.close();
expectThrows(IllegalStateException.class, () -> transportGroup.bindServerChannel(inetSocketAddress, new BindingFactory()));
httpGroup.bindServerChannel(inetSocketAddress, new BindingFactory());
} finally {
httpGroup.close();
}
expectThrows(IllegalStateException.class, () -> httpGroup.bindServerChannel(inetSocketAddress, new BindingFactory()));
}
private static class BindingFactory extends ChannelFactory<NioServerSocketChannel, NioSocketChannel> {
private BindingFactory() {
super(new ChannelFactory.RawChannelFactory(false, false, false, -1, -1));
}
@Override
public NioSocketChannel createChannel(NioSelector selector, SocketChannel channel) throws IOException {
throw new IOException("boom");
}
@Override
public NioServerSocketChannel createServerChannel(NioSelector selector, ServerSocketChannel channel) throws IOException {
NioServerSocketChannel nioChannel = new NioServerSocketChannel(channel);
Consumer<Exception> exceptionHandler = (e) -> {};
Consumer<NioSocketChannel> acceptor = (c) -> {};
ServerChannelContext context = new ServerChannelContext(nioChannel, this, selector, acceptor, exceptionHandler);
nioChannel.setContext(context);
return nioChannel;
}
}
}

View File

@ -18,6 +18,8 @@
*/
package org.elasticsearch.transport.nio;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.NioIntegTestCase;
import org.elasticsearch.Version;
@ -54,6 +56,7 @@ import static org.hamcrest.Matchers.is;
@ClusterScope(scope = Scope.TEST, supportsDedicatedMasters = false, numDataNodes = 1)
public class NioTransportIT extends NioIntegTestCase {
// static so we can use it in anonymous classes
private static String channelProfileName = null;
@ -86,6 +89,8 @@ public class NioTransportIT extends NioIntegTestCase {
public static final class ExceptionThrowingNioTransport extends NioTransport {
private static final Logger logger = LogManager.getLogger(ExceptionThrowingNioTransport.class);
public static class TestPlugin extends Plugin implements NetworkPlugin {
@Override
@ -103,7 +108,8 @@ public class NioTransportIT extends NioIntegTestCase {
ExceptionThrowingNioTransport(Settings settings, ThreadPool threadPool, NetworkService networkService,
PageCacheRecycler pageCacheRecycler, NamedWriteableRegistry namedWriteableRegistry,
CircuitBreakerService circuitBreakerService) {
super(settings, Version.CURRENT, threadPool, networkService, pageCacheRecycler, namedWriteableRegistry, circuitBreakerService);
super(settings, Version.CURRENT, threadPool, networkService, pageCacheRecycler, namedWriteableRegistry, circuitBreakerService,
new NioGroupFactory(settings, logger));
}
@Override

View File

@ -52,12 +52,12 @@ import static org.hamcrest.Matchers.instanceOf;
public class SimpleNioTransportTests extends AbstractSimpleTransportTestCase {
public static MockTransportService nioFromThreadPool(Settings settings, ThreadPool threadPool, final Version version,
public MockTransportService nioFromThreadPool(Settings settings, ThreadPool threadPool, final Version version,
ClusterSettings clusterSettings, boolean doHandshake) {
NamedWriteableRegistry namedWriteableRegistry = new NamedWriteableRegistry(Collections.emptyList());
NetworkService networkService = new NetworkService(Collections.emptyList());
Transport transport = new NioTransport(settings, version, threadPool, networkService, new MockPageCacheRecycler(settings),
namedWriteableRegistry, new NoneCircuitBreakerService()) {
namedWriteableRegistry, new NoneCircuitBreakerService(), new NioGroupFactory(settings, logger)) {
@Override
public void executeHandshake(DiscoveryNode node, TcpChannel channel, ConnectionProfile profile,

View File

@ -278,16 +278,6 @@ public abstract class AbstractHttpServerTransport extends AbstractLifecycleCompo
logger.error(new ParameterizedMessage("exception from http server channel caught on transport layer [channel={}]", channel), e);
}
/**
* Exception handler for exceptions that are not associated with a specific channel.
*
* @param exception the exception
*/
protected void onNonChannelException(Exception exception) {
String threadName = Thread.currentThread().getName();
logger.warn(new ParameterizedMessage("exception caught on transport layer [thread={}]", threadName), exception);
}
protected void serverAcceptedChannel(HttpChannel httpChannel) {
boolean addedOnThisCall = httpChannels.add(httpChannel);
assert addedOnThisCall : "Channel should only be added to http channel set once";

View File

@ -29,8 +29,6 @@ public interface HttpServerTransport extends LifecycleComponent {
String HTTP_SERVER_WORKER_THREAD_NAME_PREFIX = "http_server_worker";
String HTTP_SERVER_ACCEPTOR_THREAD_NAME_PREFIX = "http_server_acceptor";
BoundTransportAddress boundAddress();
HttpInfo info();

View File

@ -660,16 +660,6 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements
logger.error(new ParameterizedMessage("exception from server channel caught on transport layer [channel={}]", channel), e);
}
/**
* Exception handler for exceptions that are not associated with a specific channel.
*
* @param exception the exception
*/
protected void onNonChannelException(Exception exception) {
logger.warn(new ParameterizedMessage("exception caught on transport layer [thread={}]", Thread.currentThread().getName()),
exception);
}
protected void serverAcceptedChannel(TcpChannel channel) {
boolean addedOnThisCall = acceptedChannels.add(channel);
assert addedOnThisCall : "Channel should only be added to accepted channel set once";

View File

@ -38,7 +38,6 @@ public enum Transports {
final String threadName = t.getName();
for (String s : Arrays.asList(
HttpServerTransport.HTTP_SERVER_WORKER_THREAD_NAME_PREFIX,
HttpServerTransport.HTTP_SERVER_ACCEPTOR_THREAD_NAME_PREFIX,
TcpTransport.TRANSPORT_WORKER_THREAD_NAME_PREFIX,
TEST_MOCK_TRANSPORT_THREAD_PREFIX)) {
if (threadName.contains(s)) {

View File

@ -37,7 +37,7 @@ import org.elasticsearch.nio.BytesChannelContext;
import org.elasticsearch.nio.BytesWriteHandler;
import org.elasticsearch.nio.ChannelFactory;
import org.elasticsearch.nio.InboundChannelBuffer;
import org.elasticsearch.nio.NioGroup;
import org.elasticsearch.nio.NioSelectorGroup;
import org.elasticsearch.nio.NioSelector;
import org.elasticsearch.nio.NioServerSocketChannel;
import org.elasticsearch.nio.NioSocketChannel;
@ -69,7 +69,7 @@ public class MockNioTransport extends TcpTransport {
private static final Logger logger = LogManager.getLogger(MockNioTransport.class);
private final ConcurrentMap<String, MockTcpChannelFactory> profileToChannelFactory = newConcurrentMap();
private volatile NioGroup nioGroup;
private volatile NioSelectorGroup nioGroup;
private volatile MockTcpChannelFactory clientChannelFactory;
public MockNioTransport(Settings settings, Version version, ThreadPool threadPool, NetworkService networkService,
@ -94,7 +94,7 @@ public class MockNioTransport extends TcpTransport {
protected void doStart() {
boolean success = false;
try {
nioGroup = new NioGroup(daemonThreadFactory(this.settings, TcpTransport.TRANSPORT_WORKER_THREAD_NAME_PREFIX), 2,
nioGroup = new NioSelectorGroup(daemonThreadFactory(this.settings, TcpTransport.TRANSPORT_WORKER_THREAD_NAME_PREFIX), 2,
(s) -> new TestingSocketEventHandler(this::onNonChannelException, s));
ProfileSettings clientProfileSettings = new ProfileSettings(settings, "default");
@ -160,6 +160,11 @@ public class MockNioTransport extends TcpTransport {
return builder.build();
}
private void onNonChannelException(Exception exception) {
logger.warn(new ParameterizedMessage("exception caught on transport layer [thread={}]", Thread.currentThread().getName()),
exception);
}
private void exceptionCaught(NioSocketChannel channel, Exception exception) {
onException((TcpChannel) channel, exception);
}

View File

@ -76,6 +76,7 @@ import org.elasticsearch.transport.Transport;
import org.elasticsearch.transport.TransportInterceptor;
import org.elasticsearch.transport.TransportRequest;
import org.elasticsearch.transport.TransportRequestHandler;
import org.elasticsearch.transport.nio.NioGroupFactory;
import org.elasticsearch.watcher.ResourceWatcherService;
import org.elasticsearch.xpack.core.XPackField;
import org.elasticsearch.xpack.core.XPackPlugin;
@ -286,6 +287,7 @@ public class Security extends Plugin implements ActionPlugin, IngestPlugin, Netw
private final SetOnce<SecurityActionFilter> securityActionFilter = new SetOnce<>();
private final SetOnce<SecurityIndexManager> securityIndex = new SetOnce<>();
private final SetOnce<IndexAuditTrail> indexAuditTrail = new SetOnce<>();
private final SetOnce<NioGroupFactory> groupFactory = new SetOnce<>();
private final List<BootstrapCheck> bootstrapChecks;
private final List<SecurityExtension> securityExtensions = new ArrayList<>();
@ -943,11 +945,12 @@ public class Security extends Plugin implements ActionPlugin, IngestPlugin, Netw
return Collections.emptyMap();
}
IPFilter ipFilter = this.ipFilter.get();
Map<String, Supplier<Transport>> transports = new HashMap<>();
transports.put(SecurityField.NAME4, () -> new SecurityNetty4ServerTransport(settings, Version.CURRENT, threadPool,
networkService, pageCacheRecycler, namedWriteableRegistry, circuitBreakerService, ipFilter.get(), getSslService()));
transports.put(SecurityField.NIO, () -> new SecurityNioTransport(settings, Version.CURRENT, threadPool,
networkService, pageCacheRecycler, namedWriteableRegistry, circuitBreakerService, ipFilter.get(), getSslService()));
networkService, pageCacheRecycler, namedWriteableRegistry, circuitBreakerService, ipFilter, getSslService()));
transports.put(SecurityField.NIO, () -> new SecurityNioTransport(settings, Version.CURRENT, threadPool, networkService,
pageCacheRecycler, namedWriteableRegistry, circuitBreakerService, ipFilter, getSslService(), getNioGroupFactory(settings)));
return Collections.unmodifiableMap(transports);
}
@ -967,7 +970,7 @@ public class Security extends Plugin implements ActionPlugin, IngestPlugin, Netw
httpTransports.put(SecurityField.NAME4, () -> new SecurityNetty4HttpServerTransport(settings, networkService, bigArrays,
ipFilter.get(), getSslService(), threadPool, xContentRegistry, dispatcher));
httpTransports.put(SecurityField.NIO, () -> new SecurityNioHttpServerTransport(settings, networkService, bigArrays,
pageCacheRecycler, threadPool, xContentRegistry, dispatcher, ipFilter.get(), getSslService()));
pageCacheRecycler, threadPool, xContentRegistry, dispatcher, ipFilter.get(), getSslService(), getNioGroupFactory(settings)));
return httpTransports;
}
@ -1122,4 +1125,14 @@ public class Security extends Plugin implements ActionPlugin, IngestPlugin, Netw
public void reloadSPI(ClassLoader loader) {
securityExtensions.addAll(SecurityExtension.loadExtensions(loader));
}
private synchronized NioGroupFactory getNioGroupFactory(Settings settings) {
if (groupFactory.get() != null) {
assert groupFactory.get().getSettings().equals(settings) : "Different settings than originally provided";
return groupFactory.get();
} else {
groupFactory.set(new NioGroupFactory(settings, logger));
return groupFactory.get();
}
}
}

View File

@ -25,6 +25,7 @@ import org.elasticsearch.nio.NioSocketChannel;
import org.elasticsearch.nio.ServerChannelContext;
import org.elasticsearch.nio.SocketChannelContext;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.nio.NioGroupFactory;
import org.elasticsearch.xpack.core.ssl.SSLConfiguration;
import org.elasticsearch.xpack.core.ssl.SSLService;
import org.elasticsearch.xpack.security.transport.SecurityHttpExceptionHandler;
@ -54,8 +55,8 @@ public class SecurityNioHttpServerTransport extends NioHttpServerTransport {
public SecurityNioHttpServerTransport(Settings settings, NetworkService networkService, BigArrays bigArrays,
PageCacheRecycler pageCacheRecycler, ThreadPool threadPool,
NamedXContentRegistry xContentRegistry, Dispatcher dispatcher, IPFilter ipFilter,
SSLService sslService) {
super(settings, networkService, bigArrays, pageCacheRecycler, threadPool, xContentRegistry, dispatcher);
SSLService sslService, NioGroupFactory nioGroupFactory) {
super(settings, networkService, bigArrays, pageCacheRecycler, threadPool, xContentRegistry, dispatcher, nioGroupFactory);
this.securityExceptionHandler = new SecurityHttpExceptionHandler(logger, lifecycle, (c, e) -> super.onException(c, e));
this.ipFilter = ipFilter;
this.nioIpFilter = new NioIPFilter(ipFilter, IPFilter.HTTP_PROFILE_NAME);

View File

@ -29,6 +29,7 @@ import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.ConnectTransportException;
import org.elasticsearch.transport.TcpChannel;
import org.elasticsearch.transport.TransportSettings;
import org.elasticsearch.transport.nio.NioGroupFactory;
import org.elasticsearch.transport.nio.NioTcpChannel;
import org.elasticsearch.transport.nio.NioTcpServerChannel;
import org.elasticsearch.transport.nio.NioTransport;
@ -76,8 +77,9 @@ public class SecurityNioTransport extends NioTransport {
public SecurityNioTransport(Settings settings, Version version, ThreadPool threadPool, NetworkService networkService,
PageCacheRecycler pageCacheRecycler, NamedWriteableRegistry namedWriteableRegistry,
CircuitBreakerService circuitBreakerService, @Nullable final IPFilter authenticator,
SSLService sslService) {
super(settings, version, threadPool, networkService, pageCacheRecycler, namedWriteableRegistry, circuitBreakerService);
SSLService sslService, NioGroupFactory groupFactory) {
super(settings, version, threadPool, networkService, pageCacheRecycler, namedWriteableRegistry, circuitBreakerService,
groupFactory);
this.authenticator = authenticator;
this.sslService = sslService;
this.sslEnabled = XPackSettings.TRANSPORT_SSL_ENABLED.get(settings);

View File

@ -17,6 +17,7 @@ import org.elasticsearch.http.nio.NioHttpChannel;
import org.elasticsearch.nio.NioSelector;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.nio.NioGroupFactory;
import org.elasticsearch.xpack.core.XPackSettings;
import org.elasticsearch.xpack.core.ssl.SSLClientAuth;
import org.elasticsearch.xpack.core.ssl.SSLService;
@ -46,6 +47,7 @@ public class SecurityNioHttpServerTransportTests extends ESTestCase {
private SSLService sslService;
private Environment env;
private InetSocketAddress address = new InetSocketAddress(InetAddress.getLoopbackAddress(), 0);
private NioGroupFactory nioGroupFactory;
@Before
public void createSSLService() {
@ -67,10 +69,11 @@ public class SecurityNioHttpServerTransportTests extends ESTestCase {
Settings settings = Settings.builder()
.put(env.settings())
.put(XPackSettings.HTTP_SSL_ENABLED.getKey(), true).build();
nioGroupFactory = new NioGroupFactory(settings, logger);
sslService = new SSLService(settings, env);
SecurityNioHttpServerTransport transport = new SecurityNioHttpServerTransport(settings,
new NetworkService(Collections.emptyList()), mock(BigArrays.class), mock(PageCacheRecycler.class), mock(ThreadPool.class),
xContentRegistry(), new NullDispatcher(), mock(IPFilter.class), sslService);
xContentRegistry(), new NullDispatcher(), mock(IPFilter.class), sslService, nioGroupFactory);
SecurityNioHttpServerTransport.SecurityHttpChannelFactory factory = transport.channelFactory();
SocketChannel socketChannel = mock(SocketChannel.class);
when(socketChannel.getRemoteAddress()).thenReturn(address);
@ -88,9 +91,10 @@ public class SecurityNioHttpServerTransportTests extends ESTestCase {
.put(XPackSettings.HTTP_SSL_ENABLED.getKey(), true)
.put("xpack.security.http.ssl.client_authentication", value).build();
sslService = new SSLService(settings, env);
nioGroupFactory = new NioGroupFactory(settings, logger);
SecurityNioHttpServerTransport transport = new SecurityNioHttpServerTransport(settings,
new NetworkService(Collections.emptyList()), mock(BigArrays.class), mock(PageCacheRecycler.class), mock(ThreadPool.class),
xContentRegistry(), new NullDispatcher(), mock(IPFilter.class), sslService);
xContentRegistry(), new NullDispatcher(), mock(IPFilter.class), sslService, nioGroupFactory);
SecurityNioHttpServerTransport.SecurityHttpChannelFactory factory = transport.channelFactory();
SocketChannel socketChannel = mock(SocketChannel.class);
@ -107,10 +111,11 @@ public class SecurityNioHttpServerTransportTests extends ESTestCase {
.put(env.settings())
.put(XPackSettings.HTTP_SSL_ENABLED.getKey(), true)
.put("xpack.security.http.ssl.client_authentication", value).build();
nioGroupFactory = new NioGroupFactory(settings, logger);
sslService = new SSLService(settings, env);
SecurityNioHttpServerTransport transport = new SecurityNioHttpServerTransport(settings,
new NetworkService(Collections.emptyList()), mock(BigArrays.class), mock(PageCacheRecycler.class), mock(ThreadPool.class),
xContentRegistry(), new NullDispatcher(), mock(IPFilter.class), sslService);
xContentRegistry(), new NullDispatcher(), mock(IPFilter.class), sslService, nioGroupFactory);
SecurityNioHttpServerTransport.SecurityHttpChannelFactory factory = transport.channelFactory();
SocketChannel socketChannel = mock(SocketChannel.class);
@ -128,9 +133,10 @@ public class SecurityNioHttpServerTransportTests extends ESTestCase {
.put(XPackSettings.HTTP_SSL_ENABLED.getKey(), true)
.put("xpack.security.http.ssl.client_authentication", value).build();
sslService = new SSLService(settings, env);
nioGroupFactory = new NioGroupFactory(settings, logger);
SecurityNioHttpServerTransport transport = new SecurityNioHttpServerTransport(settings,
new NetworkService(Collections.emptyList()), mock(BigArrays.class), mock(PageCacheRecycler.class), mock(ThreadPool.class),
xContentRegistry(), new NullDispatcher(), mock(IPFilter.class), sslService);
xContentRegistry(), new NullDispatcher(), mock(IPFilter.class), sslService, nioGroupFactory);
SecurityNioHttpServerTransport.SecurityHttpChannelFactory factory = transport.channelFactory();
SocketChannel socketChannel = mock(SocketChannel.class);
@ -146,9 +152,10 @@ public class SecurityNioHttpServerTransportTests extends ESTestCase {
.put(env.settings())
.put(XPackSettings.HTTP_SSL_ENABLED.getKey(), true).build();
sslService = new SSLService(settings, env);
nioGroupFactory = new NioGroupFactory(settings, logger);
SecurityNioHttpServerTransport transport = new SecurityNioHttpServerTransport(settings,
new NetworkService(Collections.emptyList()), mock(BigArrays.class), mock(PageCacheRecycler.class), mock(ThreadPool.class),
xContentRegistry(), new NullDispatcher(), mock(IPFilter.class), sslService);
xContentRegistry(), new NullDispatcher(), mock(IPFilter.class), sslService, nioGroupFactory);
SecurityNioHttpServerTransport.SecurityHttpChannelFactory factory = transport.channelFactory();
SocketChannel socketChannel = mock(SocketChannel.class);
when(socketChannel.getRemoteAddress()).thenReturn(address);
@ -161,9 +168,10 @@ public class SecurityNioHttpServerTransportTests extends ESTestCase {
.put("xpack.security.http.ssl.supported_protocols", "TLSv1.2")
.build();
sslService = new SSLService(settings, TestEnvironment.newEnvironment(settings));
nioGroupFactory = new NioGroupFactory(settings, logger);
transport = new SecurityNioHttpServerTransport(settings,
new NetworkService(Collections.emptyList()), mock(BigArrays.class), mock(PageCacheRecycler.class), mock(ThreadPool.class),
xContentRegistry(), new NullDispatcher(), mock(IPFilter.class), sslService);
xContentRegistry(), new NullDispatcher(), mock(IPFilter.class), sslService, nioGroupFactory);
factory = transport.channelFactory();
channel = factory.createChannel(mock(NioSelector.class), socketChannel);
SSLEngine customEngine = SSLEngineUtils.getSSLEngine(channel);
@ -183,11 +191,12 @@ public class SecurityNioHttpServerTransportTests extends ESTestCase {
.build();
env = TestEnvironment.newEnvironment(settings);
sslService = new SSLService(settings, env);
nioGroupFactory = new NioGroupFactory(settings, logger);
IllegalArgumentException e = expectThrows(IllegalArgumentException.class,
() -> new SecurityNioHttpServerTransport(settings,
new NetworkService(Collections.emptyList()), mock(BigArrays.class), mock(PageCacheRecycler.class), mock(ThreadPool.class),
xContentRegistry(), new NullDispatcher(), mock(IPFilter.class), sslService));
xContentRegistry(), new NullDispatcher(), mock(IPFilter.class), sslService, nioGroupFactory));
assertThat(e.getMessage(), containsString("key must be provided"));
}
@ -202,8 +211,9 @@ public class SecurityNioHttpServerTransportTests extends ESTestCase {
.build();
env = TestEnvironment.newEnvironment(settings);
sslService = new SSLService(settings, env);
nioGroupFactory = new NioGroupFactory(settings, logger);
SecurityNioHttpServerTransport transport = new SecurityNioHttpServerTransport(settings,
new NetworkService(Collections.emptyList()), mock(BigArrays.class), mock(PageCacheRecycler.class), mock(ThreadPool.class),
xContentRegistry(), new NullDispatcher(), mock(IPFilter.class), sslService);
xContentRegistry(), new NullDispatcher(), mock(IPFilter.class), sslService, nioGroupFactory);
}
}

View File

@ -20,6 +20,7 @@ import org.elasticsearch.transport.ConnectionProfile;
import org.elasticsearch.transport.TcpChannel;
import org.elasticsearch.transport.Transport;
import org.elasticsearch.transport.TransportSettings;
import org.elasticsearch.transport.nio.NioGroupFactory;
import org.elasticsearch.xpack.security.transport.AbstractSimpleSecurityTransportTestCase;
import java.util.Collections;
@ -34,7 +35,8 @@ public class SimpleSecurityNioTransportTests extends AbstractSimpleSecurityTrans
.put(settings)
.put("xpack.security.transport.ssl.enabled", true).build();
Transport transport = new SecurityNioTransport(settings1, version, threadPool, networkService, new MockPageCacheRecycler(settings),
namedWriteableRegistry, new NoneCircuitBreakerService(), null, createSSLService(settings1)) {
namedWriteableRegistry, new NoneCircuitBreakerService(), null, createSSLService(settings1),
new NioGroupFactory(settings, logger)) {
@Override
public void executeHandshake(DiscoveryNode node, TcpChannel channel, ConnectionProfile profile,