From 67e73b4df481f3902a4aeee78d34532297ff7b6b Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Wed, 6 Jun 2018 11:59:54 -0600 Subject: [PATCH] Combine accepting selector and socket selector (#31115) This is related to #27260. This commit combines the AcceptingSelector and SocketSelector classes into a single NioSelector. This change allows the same selector to handle both server and socket channels. This is valuable as we do not necessarily want a dedicated thread running for accepting channels. With this change, this commit removes the configuration for dedicated accepting selectors for the normal transport class. The accepting workload for new node connections is likely low, meaning that there is no need to dedicate a thread to this process. --- .../elasticsearch/nio/AcceptingSelector.java | 98 ---- .../nio/AcceptorEventHandler.java | 81 ---- .../nio/BytesChannelContext.java | 2 +- .../org/elasticsearch/nio/ChannelContext.java | 2 +- .../org/elasticsearch/nio/ChannelFactory.java | 24 +- .../org/elasticsearch/nio/ESSelector.java | 248 ---------- .../org/elasticsearch/nio/EventHandler.java | 152 ++++++- .../org/elasticsearch/nio/NioChannel.java | 3 +- .../java/org/elasticsearch/nio/NioGroup.java | 109 +++-- .../org/elasticsearch/nio/NioSelector.java | 428 ++++++++++++++++++ .../elasticsearch/nio/RoundRobinSupplier.java | 28 +- .../nio/ServerChannelContext.java | 8 +- .../nio/SocketChannelContext.java | 8 +- .../elasticsearch/nio/SocketEventHandler.java | 150 ------ .../org/elasticsearch/nio/SocketSelector.java | 224 --------- .../nio/AcceptingSelectorTests.java | 129 ------ .../nio/AcceptorEventHandlerTests.java | 123 ----- .../nio/BytesChannelContextTests.java | 4 +- .../nio/ChannelContextTests.java | 2 +- .../nio/ChannelFactoryTests.java | 16 +- .../elasticsearch/nio/ESSelectorTests.java | 114 ----- ...ndlerTests.java => EventHandlerTests.java} | 107 ++++- .../org/elasticsearch/nio/NioGroupTests.java | 9 +- ...lectorTests.java => NioSelectorTests.java} | 200 ++++++-- .../nio/SocketChannelContextTests.java | 6 +- .../http/nio/NioHttpServerTransport.java | 13 +- .../transport/nio/NioTransport.java | 28 +- .../transport/nio/NioTransportPlugin.java | 3 +- .../nio/TcpNioServerSocketChannel.java | 2 - .../transport/nio/TcpNioSocketChannel.java | 1 - .../elasticsearch/transport/Transports.java | 4 +- .../transport/nio/MockNioTransport.java | 26 +- .../nio/TestingSocketEventHandler.java | 10 +- .../transport/nio/SSLChannelContext.java | 6 +- .../transport/nio/SecurityNioTransport.java | 7 +- .../transport/nio/SSLChannelContextTests.java | 6 +- 36 files changed, 988 insertions(+), 1393 deletions(-) delete mode 100644 libs/elasticsearch-nio/src/main/java/org/elasticsearch/nio/AcceptingSelector.java delete mode 100644 libs/elasticsearch-nio/src/main/java/org/elasticsearch/nio/AcceptorEventHandler.java delete mode 100644 libs/elasticsearch-nio/src/main/java/org/elasticsearch/nio/ESSelector.java create mode 100644 libs/elasticsearch-nio/src/main/java/org/elasticsearch/nio/NioSelector.java delete mode 100644 libs/elasticsearch-nio/src/main/java/org/elasticsearch/nio/SocketEventHandler.java delete mode 100644 libs/elasticsearch-nio/src/main/java/org/elasticsearch/nio/SocketSelector.java delete mode 100644 libs/elasticsearch-nio/src/test/java/org/elasticsearch/nio/AcceptingSelectorTests.java delete mode 100644 libs/elasticsearch-nio/src/test/java/org/elasticsearch/nio/AcceptorEventHandlerTests.java delete mode 100644 libs/elasticsearch-nio/src/test/java/org/elasticsearch/nio/ESSelectorTests.java rename libs/elasticsearch-nio/src/test/java/org/elasticsearch/nio/{SocketEventHandlerTests.java => EventHandlerTests.java} (66%) rename libs/elasticsearch-nio/src/test/java/org/elasticsearch/nio/{SocketSelectorTests.java => NioSelectorTests.java} (61%) diff --git a/libs/elasticsearch-nio/src/main/java/org/elasticsearch/nio/AcceptingSelector.java b/libs/elasticsearch-nio/src/main/java/org/elasticsearch/nio/AcceptingSelector.java deleted file mode 100644 index da64020daa8..00000000000 --- a/libs/elasticsearch-nio/src/main/java/org/elasticsearch/nio/AcceptingSelector.java +++ /dev/null @@ -1,98 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.nio; - -import java.io.IOException; -import java.nio.channels.ClosedChannelException; -import java.nio.channels.SelectionKey; -import java.nio.channels.Selector; -import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.stream.Collectors; - -/** - * Selector implementation that handles {@link NioServerSocketChannel}. It's main piece of functionality is - * accepting new channels. - */ -public class AcceptingSelector extends ESSelector { - - private final AcceptorEventHandler eventHandler; - private final ConcurrentLinkedQueue newChannels = new ConcurrentLinkedQueue<>(); - - public AcceptingSelector(AcceptorEventHandler eventHandler) throws IOException { - super(eventHandler); - this.eventHandler = eventHandler; - } - - public AcceptingSelector(AcceptorEventHandler eventHandler, Selector selector) throws IOException { - super(eventHandler, selector); - this.eventHandler = eventHandler; - } - - @Override - void processKey(SelectionKey selectionKey) { - ServerChannelContext channelContext = (ServerChannelContext) selectionKey.attachment(); - if (selectionKey.isAcceptable()) { - try { - eventHandler.acceptChannel(channelContext); - } catch (IOException e) { - eventHandler.acceptException(channelContext, e); - } - } - } - - @Override - void preSelect() { - setUpNewServerChannels(); - } - - @Override - void cleanup() { - channelsToClose.addAll(newChannels.stream().map(NioServerSocketChannel::getContext).collect(Collectors.toList())); - } - - /** - * Schedules a NioServerSocketChannel to be registered with this selector. The channel will by queued and - * eventually registered next time through the event loop. - * - * @param serverSocketChannel the channel to register - */ - public void scheduleForRegistration(NioServerSocketChannel serverSocketChannel) { - newChannels.add(serverSocketChannel); - ensureSelectorOpenForEnqueuing(newChannels, serverSocketChannel); - wakeup(); - } - - private void setUpNewServerChannels() { - NioServerSocketChannel newChannel; - while ((newChannel = this.newChannels.poll()) != null) { - ServerChannelContext context = newChannel.getContext(); - assert context.getSelector() == this : "The channel must be registered with the selector with which it was created"; - try { - if (context.isOpen()) { - eventHandler.handleRegistration(context); - } else { - eventHandler.registrationException(context, new ClosedChannelException()); - } - } catch (Exception e) { - eventHandler.registrationException(context, e); - } - } - } -} diff --git a/libs/elasticsearch-nio/src/main/java/org/elasticsearch/nio/AcceptorEventHandler.java b/libs/elasticsearch-nio/src/main/java/org/elasticsearch/nio/AcceptorEventHandler.java deleted file mode 100644 index f3aab9c9be1..00000000000 --- a/libs/elasticsearch-nio/src/main/java/org/elasticsearch/nio/AcceptorEventHandler.java +++ /dev/null @@ -1,81 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.nio; - -import java.io.IOException; -import java.nio.channels.SelectionKey; -import java.util.function.Consumer; -import java.util.function.Supplier; - -/** - * Event handler designed to handle events from server sockets - */ -public class AcceptorEventHandler extends EventHandler { - - private final Supplier selectorSupplier; - - public AcceptorEventHandler(Supplier selectorSupplier, Consumer exceptionHandler) { - super(exceptionHandler); - this.selectorSupplier = selectorSupplier; - } - - /** - * This method is called when a NioServerSocketChannel is being registered with the selector. It should - * only be called once per channel. - * - * @param context that was registered - */ - protected void handleRegistration(ServerChannelContext context) throws IOException { - context.register(); - SelectionKey selectionKey = context.getSelectionKey(); - selectionKey.attach(context); - SelectionKeyUtils.setAcceptInterested(selectionKey); - } - - /** - * This method is called when an attempt to register a server channel throws an exception. - * - * @param context that was registered - * @param exception that occurred - */ - protected void registrationException(ServerChannelContext context, Exception exception) { - context.handleException(exception); - } - - /** - * This method is called when a server channel signals it is ready to accept a connection. All of the - * accept logic should occur in this call. - * - * @param context that can accept a connection - */ - protected void acceptChannel(ServerChannelContext context) throws IOException { - context.acceptChannels(selectorSupplier); - } - - /** - * This method is called when an attempt to accept a connection throws an exception. - * - * @param context that accepting a connection - * @param exception that occurred - */ - protected void acceptException(ServerChannelContext context, Exception exception) { - context.handleException(exception); - } -} diff --git a/libs/elasticsearch-nio/src/main/java/org/elasticsearch/nio/BytesChannelContext.java b/libs/elasticsearch-nio/src/main/java/org/elasticsearch/nio/BytesChannelContext.java index ef1e188a22e..a82d381951b 100644 --- a/libs/elasticsearch-nio/src/main/java/org/elasticsearch/nio/BytesChannelContext.java +++ b/libs/elasticsearch-nio/src/main/java/org/elasticsearch/nio/BytesChannelContext.java @@ -24,7 +24,7 @@ import java.util.function.Consumer; public class BytesChannelContext extends SocketChannelContext { - public BytesChannelContext(NioSocketChannel channel, SocketSelector selector, Consumer exceptionHandler, + public BytesChannelContext(NioSocketChannel channel, NioSelector selector, Consumer exceptionHandler, ReadWriteHandler handler, InboundChannelBuffer channelBuffer) { super(channel, selector, exceptionHandler, handler, channelBuffer); } diff --git a/libs/elasticsearch-nio/src/main/java/org/elasticsearch/nio/ChannelContext.java b/libs/elasticsearch-nio/src/main/java/org/elasticsearch/nio/ChannelContext.java index 93930bbabf0..e3702c2880a 100644 --- a/libs/elasticsearch-nio/src/main/java/org/elasticsearch/nio/ChannelContext.java +++ b/libs/elasticsearch-nio/src/main/java/org/elasticsearch/nio/ChannelContext.java @@ -105,7 +105,7 @@ public abstract class ChannelContext supplier) throws IOException { + public Socket openNioChannel(InetSocketAddress remoteAddress, Supplier supplier) throws IOException { SocketChannel rawChannel = rawChannelFactory.openNioChannel(remoteAddress); - SocketSelector selector = supplier.get(); + NioSelector selector = supplier.get(); Socket channel = internalCreateChannel(selector, rawChannel); scheduleChannel(channel, selector); return channel; } - public Socket acceptNioChannel(ServerChannelContext serverContext, Supplier supplier) throws IOException { + public Socket acceptNioChannel(ServerChannelContext serverContext, Supplier supplier) throws IOException { SocketChannel rawChannel = rawChannelFactory.acceptNioChannel(serverContext); // Null is returned if there are no pending sockets to accept if (rawChannel == null) { return null; } else { - SocketSelector selector = supplier.get(); + NioSelector selector = supplier.get(); Socket channel = internalCreateChannel(selector, rawChannel); scheduleChannel(channel, selector); return channel; } } - public ServerSocket openNioServerSocketChannel(InetSocketAddress address, Supplier supplier) throws IOException { + public ServerSocket openNioServerSocketChannel(InetSocketAddress address, Supplier supplier) throws IOException { ServerSocketChannel rawChannel = rawChannelFactory.openNioServerSocketChannel(address); - AcceptingSelector selector = supplier.get(); + NioSelector selector = supplier.get(); ServerSocket serverChannel = internalCreateServerChannel(selector, rawChannel); scheduleServerChannel(serverChannel, selector); return serverChannel; @@ -81,7 +81,7 @@ public abstract class ChannelFactory - * Children of this class should implement the specific {@link #processKey(SelectionKey)}, - * {@link #preSelect()}, and {@link #cleanup()} functionality. - */ -public abstract class ESSelector implements Closeable { - - final Selector selector; - final ConcurrentLinkedQueue> channelsToClose = new ConcurrentLinkedQueue<>(); - - private final EventHandler eventHandler; - private final ReentrantLock runLock = new ReentrantLock(); - private final CountDownLatch exitedLoop = new CountDownLatch(1); - private final AtomicBoolean isClosed = new AtomicBoolean(false); - private final CompletableFuture isRunningFuture = new CompletableFuture<>(); - private volatile Thread thread; - - ESSelector(EventHandler eventHandler) throws IOException { - this(eventHandler, Selector.open()); - } - - ESSelector(EventHandler eventHandler, Selector selector) { - this.eventHandler = eventHandler; - this.selector = selector; - } - - /** - * Starts this selector. The selector will run until {@link #close()} is called. - */ - public void runLoop() { - if (runLock.tryLock()) { - isRunningFuture.complete(null); - try { - setThread(); - while (isOpen()) { - singleLoop(); - } - } finally { - try { - cleanupAndCloseChannels(); - } finally { - try { - selector.close(); - } catch (IOException e) { - eventHandler.selectorException(e); - } finally { - runLock.unlock(); - exitedLoop.countDown(); - } - } - } - } else { - throw new IllegalStateException("selector is already running"); - } - } - - void singleLoop() { - try { - closePendingChannels(); - preSelect(); - - int ready = selector.select(300); - if (ready > 0) { - Set selectionKeys = selector.selectedKeys(); - Iterator keyIterator = selectionKeys.iterator(); - while (keyIterator.hasNext()) { - SelectionKey sk = keyIterator.next(); - keyIterator.remove(); - if (sk.isValid()) { - try { - processKey(sk); - } catch (CancelledKeyException cke) { - eventHandler.genericChannelException((ChannelContext) sk.attachment(), cke); - } - } else { - eventHandler.genericChannelException((ChannelContext) sk.attachment(), new CancelledKeyException()); - } - } - } - } catch (ClosedSelectorException e) { - if (isOpen()) { - throw e; - } - } catch (IOException e) { - eventHandler.selectorException(e); - } catch (Exception e) { - eventHandler.uncaughtException(e); - } - } - - void cleanupAndCloseChannels() { - cleanup(); - channelsToClose.addAll(selector.keys().stream().map(sk -> (ChannelContext) sk.attachment()).collect(Collectors.toList())); - closePendingChannels(); - } - - /** - * Called by the base {@link ESSelector} class when there is a {@link SelectionKey} to be handled. - * - * @param selectionKey the key to be handled - * @throws CancelledKeyException thrown when the key has already been cancelled - */ - abstract void processKey(SelectionKey selectionKey) throws CancelledKeyException; - - /** - * Called immediately prior to a raw {@link Selector#select()} call. Should be used to implement - * channel registration, handling queued writes, and other work that is not specifically processing - * a selection key. - */ - abstract void preSelect(); - - /** - * Called once as the selector is being closed. - */ - abstract void cleanup(); - - void setThread() { - thread = Thread.currentThread(); - } - - public boolean isOnCurrentThread() { - return Thread.currentThread() == thread; - } - - public void assertOnSelectorThread() { - assert isOnCurrentThread() : "Must be on selector thread to perform this operation. Currently on thread [" - + Thread.currentThread().getName() + "]."; - } - - void wakeup() { - // TODO: Do we need the wakeup optimizations that some other libraries use? - selector.wakeup(); - } - - @Override - public void close() throws IOException { - if (isClosed.compareAndSet(false, true)) { - wakeup(); - if (isRunning()) { - try { - exitedLoop.await(); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new IllegalStateException("Thread was interrupted while waiting for selector to close", e); - } - } else if (selector.isOpen()) { - selector.close(); - } - } - } - - public void queueChannelClose(NioChannel channel) { - ChannelContext context = channel.getContext(); - assert context.getSelector() == this : "Must schedule a channel for closure with its selector"; - channelsToClose.offer(context); - ensureSelectorOpenForEnqueuing(channelsToClose, context); - wakeup(); - } - - public Selector rawSelector() { - return selector; - } - - public boolean isOpen() { - return isClosed.get() == false; - } - - public boolean isRunning() { - return runLock.isLocked(); - } - - public Future isRunningFuture() { - return isRunningFuture; - } - - /** - * This is a convenience method to be called after some object (normally channels) are enqueued with this - * selector. This method will check if the selector is still open. If it is open, normal operation can - * proceed. - * - * If the selector is closed, then we attempt to remove the object from the queue. If the removal - * succeeds then we throw an {@link IllegalStateException} indicating that normal operation failed. If - * the object cannot be removed from the queue, then the object has already been handled by the selector - * and operation can proceed normally. - * - * If this method is called from the selector thread, we will not throw an exception as the selector - * thread can manipulate its queues internally even if it is no longer open. - * - * @param queue the queue to which the object was added - * @param objectAdded the objected added - * @param the object type - */ - void ensureSelectorOpenForEnqueuing(ConcurrentLinkedQueue queue, O objectAdded) { - if (isOpen() == false && isOnCurrentThread() == false) { - if (queue.remove(objectAdded)) { - throw new IllegalStateException("selector is already closed"); - } - } - } - - private void closePendingChannels() { - ChannelContext channelContext; - while ((channelContext = channelsToClose.poll()) != null) { - eventHandler.handleClose(channelContext); - } - } -} diff --git a/libs/elasticsearch-nio/src/main/java/org/elasticsearch/nio/EventHandler.java b/libs/elasticsearch-nio/src/main/java/org/elasticsearch/nio/EventHandler.java index cb4d43af4fd..3c52423c7af 100644 --- a/libs/elasticsearch-nio/src/main/java/org/elasticsearch/nio/EventHandler.java +++ b/libs/elasticsearch-nio/src/main/java/org/elasticsearch/nio/EventHandler.java @@ -20,15 +20,163 @@ package org.elasticsearch.nio; import java.io.IOException; +import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.util.function.Consumer; +import java.util.function.Supplier; -public abstract class EventHandler { +public class EventHandler { protected final Consumer exceptionHandler; + private final Supplier selectorSupplier; - protected EventHandler(Consumer exceptionHandler) { + public EventHandler(Consumer exceptionHandler, Supplier selectorSupplier) { this.exceptionHandler = exceptionHandler; + this.selectorSupplier = selectorSupplier; + } + + /** + * This method is called when a server channel signals it is ready to accept a connection. All of the + * accept logic should occur in this call. + * + * @param context that can accept a connection + */ + protected void acceptChannel(ServerChannelContext context) throws IOException { + context.acceptChannels(selectorSupplier); + } + + /** + * This method is called when an attempt to accept a connection throws an exception. + * + * @param context that accepting a connection + * @param exception that occurred + */ + protected void acceptException(ServerChannelContext context, Exception exception) { + context.handleException(exception); + } + + /** + * This method is called when a NioChannel is being registered with the selector. It should + * only be called once per channel. + * + * @param context that was registered + */ + protected void handleRegistration(ChannelContext context) throws IOException { + context.register(); + SelectionKey selectionKey = context.getSelectionKey(); + selectionKey.attach(context); + if (context instanceof SocketChannelContext) { + if (((SocketChannelContext) context).readyForFlush()) { + SelectionKeyUtils.setConnectReadAndWriteInterested(context.getSelectionKey()); + } else { + SelectionKeyUtils.setConnectAndReadInterested(context.getSelectionKey()); + } + } else { + assert context instanceof ServerChannelContext : "If not SocketChannelContext the context must be a ServerChannelContext"; + SelectionKeyUtils.setAcceptInterested(context.getSelectionKey()); + } + } + + /** + * This method is called when an attempt to register a channel throws an exception. + * + * @param context that was registered + * @param exception that occurred + */ + protected void registrationException(ChannelContext context, Exception exception) { + context.handleException(exception); + } + + /** + * This method is called when a NioSocketChannel has just been accepted or if it has receive an + * OP_CONNECT event. + * + * @param context that was registered + */ + protected void handleConnect(SocketChannelContext context) throws IOException { + if (context.connect()) { + SelectionKeyUtils.removeConnectInterested(context.getSelectionKey()); + } + } + + /** + * This method is called when an attempt to connect a channel throws an exception. + * + * @param context that was connecting + * @param exception that occurred + */ + protected void connectException(SocketChannelContext context, Exception exception) { + context.handleException(exception); + } + + /** + * This method is called when a channel signals it is ready for be read. All of the read logic should + * occur in this call. + * + * @param context that can be read + */ + protected void handleRead(SocketChannelContext context) throws IOException { + context.read(); + } + + /** + * This method is called when an attempt to read from a channel throws an exception. + * + * @param context that was being read + * @param exception that occurred + */ + protected void readException(SocketChannelContext context, Exception exception) { + context.handleException(exception); + } + + /** + * This method is called when a channel signals it is ready to receive writes. All of the write logic + * should occur in this call. + * + * @param context that can be written to + */ + protected void handleWrite(SocketChannelContext context) throws IOException { + context.flushChannel(); + } + + /** + * This method is called when an attempt to write to a channel throws an exception. + * + * @param context that was being written to + * @param exception that occurred + */ + protected void writeException(SocketChannelContext context, Exception exception) { + context.handleException(exception); + } + + /** + * This method is called when a listener attached to a channel operation throws an exception. + * + * @param exception that occurred + */ + protected void listenerException(Exception exception) { + exceptionHandler.accept(exception); + } + + /** + * This method is called after ready events (READ, ACCEPT, WRITE, CONNECT) have been handled for a + * channel. + * + * @param context that was handled + */ + protected void postHandling(SocketChannelContext context) { + if (context.selectorShouldClose()) { + handleClose(context); + } else { + SelectionKey selectionKey = context.getSelectionKey(); + boolean currentlyWriteInterested = SelectionKeyUtils.isWriteInterested(selectionKey); + boolean pendingWrites = context.readyForFlush(); + if (currentlyWriteInterested == false && pendingWrites) { + SelectionKeyUtils.setWriteInterested(selectionKey); + } else if (currentlyWriteInterested && pendingWrites == false) { + SelectionKeyUtils.removeWriteInterested(selectionKey); + } + } } /** diff --git a/libs/elasticsearch-nio/src/main/java/org/elasticsearch/nio/NioChannel.java b/libs/elasticsearch-nio/src/main/java/org/elasticsearch/nio/NioChannel.java index ea633bd3276..8262d9c87e3 100644 --- a/libs/elasticsearch-nio/src/main/java/org/elasticsearch/nio/NioChannel.java +++ b/libs/elasticsearch-nio/src/main/java/org/elasticsearch/nio/NioChannel.java @@ -22,10 +22,11 @@ package org.elasticsearch.nio; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.channels.NetworkChannel; +import java.nio.channels.SocketChannel; import java.util.function.BiConsumer; /** - * This is a basic channel abstraction used by the {@link ESSelector}. + * This is a basic channel abstraction used by the {@link NioSelector}. *

* A channel is open once it is constructed. The channel remains open and {@link #isOpen()} will return * true until the channel is explicitly closed. diff --git a/libs/elasticsearch-nio/src/main/java/org/elasticsearch/nio/NioGroup.java b/libs/elasticsearch-nio/src/main/java/org/elasticsearch/nio/NioGroup.java index 3f2fd44259c..fe1bc1cf404 100644 --- a/libs/elasticsearch-nio/src/main/java/org/elasticsearch/nio/NioGroup.java +++ b/libs/elasticsearch-nio/src/main/java/org/elasticsearch/nio/NioGroup.java @@ -35,10 +35,9 @@ import java.util.stream.Stream; /** * The NioGroup is a group of selectors for interfacing with java nio. When it is started it will create the - * configured number of socket and acceptor selectors. Each selector will be running in a dedicated thread. - * Server connections can be bound using the {@link #bindServerChannel(InetSocketAddress, ChannelFactory)} - * method. Client connections can be opened using the {@link #openChannel(InetSocketAddress, ChannelFactory)} - * method. + * configured number of selectors. Each selector will be running in a dedicated thread. Server connections + * can be bound using the {@link #bindServerChannel(InetSocketAddress, ChannelFactory)} method. Client + * connections can be opened using the {@link #openChannel(InetSocketAddress, ChannelFactory)} method. *

* The logic specific to a particular channel is provided by the {@link ChannelFactory} passed to the method * when the channel is created. This is what allows an NioGroup to support different channel types. @@ -46,35 +45,75 @@ import java.util.stream.Stream; public class NioGroup implements AutoCloseable { - private final ArrayList acceptors; - private final RoundRobinSupplier acceptorSupplier; + private final List dedicatedAcceptors; + private final RoundRobinSupplier acceptorSupplier; - private final ArrayList socketSelectors; - private final RoundRobinSupplier socketSelectorSupplier; + private final List selectors; + private final RoundRobinSupplier selectorSupplier; private final AtomicBoolean isOpen = new AtomicBoolean(true); - public NioGroup(ThreadFactory acceptorThreadFactory, int acceptorCount, - Function, AcceptorEventHandler> acceptorEventHandlerFunction, - ThreadFactory socketSelectorThreadFactory, int socketSelectorCount, - Supplier socketEventHandlerFunction) throws IOException { - acceptors = new ArrayList<>(acceptorCount); - socketSelectors = new ArrayList<>(socketSelectorCount); + /** + * This will create an NioGroup with no dedicated acceptors. All server channels will be handled by the + * same selectors that are handling child channels. + * + * @param threadFactory factory to create selector threads + * @param selectorCount the number of selectors to be created + * @param eventHandlerFunction function for creating event handlers + * @throws IOException occurs if there is a problem while opening a java.nio.Selector + */ + public NioGroup(ThreadFactory threadFactory, int selectorCount, Function, EventHandler> eventHandlerFunction) + throws IOException { + this(null, 0, threadFactory, selectorCount, eventHandlerFunction); + } + + /** + * This will create an NioGroup with dedicated acceptors. All server channels will be handled by a group + * of selectors dedicated to accepting channels. These accepted channels will be handed off the + * non-server selectors. + * + * @param acceptorThreadFactory factory to create acceptor selector threads + * @param dedicatedAcceptorCount the number of dedicated acceptor selectors to be created + * @param selectorThreadFactory factory to create non-acceptor selector threads + * @param selectorCount the number of non-acceptor selectors to be created + * @param eventHandlerFunction function for creating event handlers + * @throws IOException occurs if there is a problem while opening a java.nio.Selector + */ + public NioGroup(ThreadFactory acceptorThreadFactory, int dedicatedAcceptorCount, ThreadFactory selectorThreadFactory, int selectorCount, + Function, EventHandler> eventHandlerFunction) throws IOException { + dedicatedAcceptors = new ArrayList<>(dedicatedAcceptorCount); + selectors = new ArrayList<>(selectorCount); try { - for (int i = 0; i < socketSelectorCount; ++i) { - SocketSelector selector = new SocketSelector(socketEventHandlerFunction.get()); - socketSelectors.add(selector); + List> suppliersToSet = new ArrayList<>(selectorCount); + for (int i = 0; i < selectorCount; ++i) { + RoundRobinSupplier supplier = new RoundRobinSupplier<>(); + suppliersToSet.add(supplier); + NioSelector selector = new NioSelector(eventHandlerFunction.apply(supplier)); + selectors.add(selector); + } + for (RoundRobinSupplier supplierToSet : suppliersToSet) { + supplierToSet.setSelectors(selectors.toArray(new NioSelector[0])); + assert supplierToSet.count() == selectors.size() : "Supplier should have same count as selector list."; } - startSelectors(socketSelectors, socketSelectorThreadFactory); - for (int i = 0; i < acceptorCount; ++i) { - SocketSelector[] childSelectors = this.socketSelectors.toArray(new SocketSelector[this.socketSelectors.size()]); - Supplier selectorSupplier = new RoundRobinSupplier<>(childSelectors); - AcceptingSelector acceptor = new AcceptingSelector(acceptorEventHandlerFunction.apply(selectorSupplier)); - acceptors.add(acceptor); + for (int i = 0; i < dedicatedAcceptorCount; ++i) { + RoundRobinSupplier supplier = new RoundRobinSupplier<>(selectors.toArray(new NioSelector[0])); + NioSelector acceptor = new NioSelector(eventHandlerFunction.apply(supplier)); + dedicatedAcceptors.add(acceptor); } - startSelectors(acceptors, acceptorThreadFactory); + + if (dedicatedAcceptorCount != 0) { + acceptorSupplier = new RoundRobinSupplier<>(dedicatedAcceptors.toArray(new NioSelector[0])); + } else { + acceptorSupplier = new RoundRobinSupplier<>(selectors.toArray(new NioSelector[0])); + } + selectorSupplier = new RoundRobinSupplier<>(selectors.toArray(new NioSelector[0])); + assert selectorCount == selectors.size() : "We need to have created all the selectors at this point."; + assert dedicatedAcceptorCount == dedicatedAcceptors.size() : "We need to have created all the acceptors at this point."; + + startSelectors(selectors, selectorThreadFactory); + startSelectors(dedicatedAcceptors, acceptorThreadFactory); } catch (Exception e) { try { close(); @@ -83,31 +122,25 @@ public class NioGroup implements AutoCloseable { } throw e; } - - socketSelectorSupplier = new RoundRobinSupplier<>(socketSelectors.toArray(new SocketSelector[socketSelectors.size()])); - acceptorSupplier = new RoundRobinSupplier<>(acceptors.toArray(new AcceptingSelector[acceptors.size()])); } public S bindServerChannel(InetSocketAddress address, ChannelFactory factory) throws IOException { ensureOpen(); - if (acceptors.isEmpty()) { - throw new IllegalArgumentException("There are no acceptors configured. Without acceptors, server channels are not supported."); - } return factory.openNioServerSocketChannel(address, acceptorSupplier); } public S openChannel(InetSocketAddress address, ChannelFactory factory) throws IOException { ensureOpen(); - return factory.openNioChannel(address, socketSelectorSupplier); + return factory.openNioChannel(address, selectorSupplier); } @Override public void close() throws IOException { if (isOpen.compareAndSet(true, false)) { - List toClose = Stream.concat(acceptors.stream(), socketSelectors.stream()).collect(Collectors.toList()); + List toClose = Stream.concat(dedicatedAcceptors.stream(), selectors.stream()).collect(Collectors.toList()); List closingExceptions = new ArrayList<>(); - for (ESSelector selector : toClose) { + for (NioSelector selector : toClose) { try { selector.close(); } catch (IOException e) { @@ -118,12 +151,12 @@ public class NioGroup implements AutoCloseable { } } - private static void startSelectors(Iterable selectors, ThreadFactory threadFactory) { - for (ESSelector acceptor : selectors) { - if (acceptor.isRunning() == false) { - threadFactory.newThread(acceptor::runLoop).start(); + private static void startSelectors(Iterable selectors, ThreadFactory threadFactory) { + for (NioSelector selector : selectors) { + if (selector.isRunning() == false) { + threadFactory.newThread(selector::runLoop).start(); try { - acceptor.isRunningFuture().get(); + selector.isRunningFuture().get(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); throw new IllegalStateException("Interrupted while waiting for selector to start.", e); diff --git a/libs/elasticsearch-nio/src/main/java/org/elasticsearch/nio/NioSelector.java b/libs/elasticsearch-nio/src/main/java/org/elasticsearch/nio/NioSelector.java new file mode 100644 index 00000000000..ab6709bcc5b --- /dev/null +++ b/libs/elasticsearch-nio/src/main/java/org/elasticsearch/nio/NioSelector.java @@ -0,0 +1,428 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.nio; + +import java.io.Closeable; +import java.io.IOException; +import java.nio.channels.CancelledKeyException; +import java.nio.channels.ClosedChannelException; +import java.nio.channels.ClosedSelectorException; +import java.nio.channels.SelectionKey; +import java.nio.channels.Selector; +import java.util.Iterator; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Future; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.ReentrantLock; +import java.util.function.BiConsumer; +import java.util.stream.Collectors; + +/** + * This is a nio selector implementation. This selector wraps a raw nio {@link Selector}. When you call + * {@link #runLoop()}, the selector will run until {@link #close()} is called. This instance handles closing + * of channels. Users should call {@link #queueChannelClose(NioChannel)} to schedule a channel for close by + * this selector. + *

+ * Children of this class should implement the specific {@link #processKey(SelectionKey)}, + * {@link #preSelect()}, and {@link #cleanup()} functionality. + */ +public class NioSelector implements Closeable { + + private final ConcurrentLinkedQueue queuedWrites = new ConcurrentLinkedQueue<>(); + private final ConcurrentLinkedQueue> channelsToClose = new ConcurrentLinkedQueue<>(); + private final ConcurrentLinkedQueue> channelsToRegister = new ConcurrentLinkedQueue<>(); + private final EventHandler eventHandler; + private final Selector selector; + + private final ReentrantLock runLock = new ReentrantLock(); + private final CountDownLatch exitedLoop = new CountDownLatch(1); + private final AtomicBoolean isClosed = new AtomicBoolean(false); + private final CompletableFuture isRunningFuture = new CompletableFuture<>(); + private final AtomicReference thread = new AtomicReference<>(null); + + public NioSelector(EventHandler eventHandler) throws IOException { + this(eventHandler, Selector.open()); + } + + public NioSelector(EventHandler eventHandler, Selector selector) throws IOException { + this.selector = selector; + this.eventHandler = eventHandler; + } + + public Selector rawSelector() { + return selector; + } + + public boolean isOpen() { + return isClosed.get() == false; + } + + public boolean isRunning() { + return runLock.isLocked(); + } + + Future isRunningFuture() { + return isRunningFuture; + } + + void setThread() { + boolean wasSet = thread.compareAndSet(null, Thread.currentThread()); + assert wasSet : "Failed to set thread as it was already set. Should only set once."; + } + + public boolean isOnCurrentThread() { + return Thread.currentThread() == thread.get(); + } + + public void assertOnSelectorThread() { + assert isOnCurrentThread() : "Must be on selector thread [" + thread.get().getName() + "} to perform this operation. " + + "Currently on thread [" + Thread.currentThread().getName() + "]."; + } + + /** + * Starts this selector. The selector will run until {@link #close()} is called. + */ + public void runLoop() { + if (runLock.tryLock()) { + isRunningFuture.complete(null); + try { + setThread(); + while (isOpen()) { + singleLoop(); + } + } finally { + try { + cleanupAndCloseChannels(); + } finally { + try { + selector.close(); + } catch (IOException e) { + eventHandler.selectorException(e); + } finally { + runLock.unlock(); + exitedLoop.countDown(); + } + } + } + } else { + throw new IllegalStateException("selector is already running"); + } + } + + void singleLoop() { + try { + closePendingChannels(); + preSelect(); + + int ready = selector.select(300); + if (ready > 0) { + Set selectionKeys = selector.selectedKeys(); + Iterator keyIterator = selectionKeys.iterator(); + while (keyIterator.hasNext()) { + SelectionKey sk = keyIterator.next(); + keyIterator.remove(); + if (sk.isValid()) { + try { + processKey(sk); + } catch (CancelledKeyException cke) { + eventHandler.genericChannelException((ChannelContext) sk.attachment(), cke); + } + } else { + eventHandler.genericChannelException((ChannelContext) sk.attachment(), new CancelledKeyException()); + } + } + } + } catch (ClosedSelectorException e) { + if (isOpen()) { + throw e; + } + } catch (IOException e) { + eventHandler.selectorException(e); + } catch (Exception e) { + eventHandler.uncaughtException(e); + } + } + + void cleanupAndCloseChannels() { + cleanup(); + channelsToClose.addAll(channelsToRegister); + channelsToRegister.clear(); + channelsToClose.addAll(selector.keys().stream().map(sk -> (ChannelContext) sk.attachment()).collect(Collectors.toList())); + closePendingChannels(); + } + + @Override + public void close() throws IOException { + if (isClosed.compareAndSet(false, true)) { + wakeup(); + if (isRunning()) { + try { + exitedLoop.await(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new IllegalStateException("Thread was interrupted while waiting for selector to close", e); + } + } else if (selector.isOpen()) { + selector.close(); + } + } + } + + void processKey(SelectionKey selectionKey) { + ChannelContext context = (ChannelContext) selectionKey.attachment(); + if (selectionKey.isAcceptable()) { + assert context instanceof ServerChannelContext : "Only server channels can receive accept events"; + ServerChannelContext serverChannelContext = (ServerChannelContext) context; + int ops = selectionKey.readyOps(); + if ((ops & SelectionKey.OP_ACCEPT) != 0) { + try { + eventHandler.acceptChannel(serverChannelContext); + } catch (IOException e) { + eventHandler.acceptException(serverChannelContext, e); + } + } + } else { + assert context instanceof SocketChannelContext : "Only sockets channels can receive non-accept events"; + SocketChannelContext channelContext = (SocketChannelContext) context; + int ops = selectionKey.readyOps(); + if ((ops & SelectionKey.OP_CONNECT) != 0) { + attemptConnect(channelContext, true); + } + + if (channelContext.isConnectComplete()) { + if ((ops & SelectionKey.OP_WRITE) != 0) { + handleWrite(channelContext); + } + + if ((ops & SelectionKey.OP_READ) != 0) { + handleRead(channelContext); + } + } + eventHandler.postHandling(channelContext); + } + + } + + /** + * Called immediately prior to a raw {@link Selector#select()} call. Should be used to implement + * channel registration, handling queued writes, and other work that is not specifically processing + * a selection key. + */ + void preSelect() { + setUpNewChannels(); + handleQueuedWrites(); + } + + /** + * Called once as the selector is being closed. + */ + void cleanup() { + WriteOperation op; + while ((op = queuedWrites.poll()) != null) { + executeFailedListener(op.getListener(), new ClosedSelectorException()); + } + } + + /** + * Queues a write operation to be handled by the event loop. This can be called by any thread and is the + * api available for non-selector threads to schedule writes. + * + * @param writeOperation to be queued + */ + public void queueWrite(WriteOperation writeOperation) { + queuedWrites.offer(writeOperation); + if (isOpen() == false) { + boolean wasRemoved = queuedWrites.remove(writeOperation); + if (wasRemoved) { + writeOperation.getListener().accept(null, new ClosedSelectorException()); + } + } else { + wakeup(); + } + } + + public void queueChannelClose(NioChannel channel) { + ChannelContext context = channel.getContext(); + assert context.getSelector() == this : "Must schedule a channel for closure with its selector"; + channelsToClose.offer(context); + ensureSelectorOpenForEnqueuing(channelsToClose, context); + wakeup(); + } + + /** + * Schedules a NioChannel to be registered with this selector. The channel will by queued and + * eventually registered next time through the event loop. + * + * @param channel to register + */ + public void scheduleForRegistration(NioChannel channel) { + ChannelContext context = channel.getContext(); + channelsToRegister.add(context); + ensureSelectorOpenForEnqueuing(channelsToRegister, context); + wakeup(); + } + + /** + * Queues a write operation directly in a channel's buffer. Channel buffers are only safe to be accessed + * by the selector thread. As a result, this method should only be called by the selector thread. + * + * @param writeOperation to be queued in a channel's buffer + */ + public void queueWriteInChannelBuffer(WriteOperation writeOperation) { + assertOnSelectorThread(); + SocketChannelContext context = writeOperation.getChannel(); + try { + SelectionKeyUtils.setWriteInterested(context.getSelectionKey()); + context.queueWriteOperation(writeOperation); + } catch (Exception e) { + executeFailedListener(writeOperation.getListener(), e); + } + } + + /** + * Executes a success listener with consistent exception handling. This can only be called from current + * selector thread. + * + * @param listener to be executed + * @param value to provide to listener + */ + public void executeListener(BiConsumer listener, V value) { + assertOnSelectorThread(); + try { + listener.accept(value, null); + } catch (Exception e) { + eventHandler.listenerException(e); + } + } + + /** + * Executes a failed listener with consistent exception handling. This can only be called from current + * selector thread. + * + * @param listener to be executed + * @param exception to provide to listener + */ + public void executeFailedListener(BiConsumer listener, Exception exception) { + assertOnSelectorThread(); + try { + listener.accept(null, exception); + } catch (Exception e) { + eventHandler.listenerException(e); + } + } + + private void wakeup() { + // TODO: Do we need the wakeup optimizations that some other libraries use? + selector.wakeup(); + } + + private void handleWrite(SocketChannelContext context) { + try { + eventHandler.handleWrite(context); + } catch (Exception e) { + eventHandler.writeException(context, e); + } + } + + private void handleRead(SocketChannelContext context) { + try { + eventHandler.handleRead(context); + } catch (Exception e) { + eventHandler.readException(context, e); + } + } + + private void attemptConnect(SocketChannelContext context, boolean connectEvent) { + try { + eventHandler.handleConnect(context); + if (connectEvent && context.isConnectComplete() == false) { + eventHandler.connectException(context, new IOException("Received OP_CONNECT but connect failed")); + } + } catch (Exception e) { + eventHandler.connectException(context, e); + } + } + + private void setUpNewChannels() { + ChannelContext newChannel; + while ((newChannel = this.channelsToRegister.poll()) != null) { + assert newChannel.getSelector() == this : "The channel must be registered with the selector with which it was created"; + try { + if (newChannel.isOpen()) { + eventHandler.handleRegistration(newChannel); + if (newChannel instanceof SocketChannelContext) { + attemptConnect((SocketChannelContext) newChannel, false); + } + } else { + eventHandler.registrationException(newChannel, new ClosedChannelException()); + } + } catch (Exception e) { + eventHandler.registrationException(newChannel, e); + } + } + } + + private void closePendingChannels() { + ChannelContext channelContext; + while ((channelContext = channelsToClose.poll()) != null) { + eventHandler.handleClose(channelContext); + } + } + + private void handleQueuedWrites() { + WriteOperation writeOperation; + while ((writeOperation = queuedWrites.poll()) != null) { + if (writeOperation.getChannel().isOpen()) { + queueWriteInChannelBuffer(writeOperation); + } else { + executeFailedListener(writeOperation.getListener(), new ClosedChannelException()); + } + } + } + + /** + * This is a convenience method to be called after some object (normally channels) are enqueued with this + * selector. This method will check if the selector is still open. If it is open, normal operation can + * proceed. + * + * If the selector is closed, then we attempt to remove the object from the queue. If the removal + * succeeds then we throw an {@link IllegalStateException} indicating that normal operation failed. If + * the object cannot be removed from the queue, then the object has already been handled by the selector + * and operation can proceed normally. + * + * If this method is called from the selector thread, we will not allow the queuing to occur as the + * selector thread can manipulate its queues internally even if it is no longer open. + * + * @param queue the queue to which the object was added + * @param objectAdded the objected added + * @param the object type + */ + private void ensureSelectorOpenForEnqueuing(ConcurrentLinkedQueue queue, O objectAdded) { + if (isOpen() == false && isOnCurrentThread() == false) { + if (queue.remove(objectAdded)) { + throw new IllegalStateException("selector is already closed"); + } + } + } +} diff --git a/libs/elasticsearch-nio/src/main/java/org/elasticsearch/nio/RoundRobinSupplier.java b/libs/elasticsearch-nio/src/main/java/org/elasticsearch/nio/RoundRobinSupplier.java index 311403a4885..4d4de689fe7 100644 --- a/libs/elasticsearch-nio/src/main/java/org/elasticsearch/nio/RoundRobinSupplier.java +++ b/libs/elasticsearch-nio/src/main/java/org/elasticsearch/nio/RoundRobinSupplier.java @@ -19,21 +19,39 @@ package org.elasticsearch.nio; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Supplier; -public class RoundRobinSupplier implements Supplier { +final class RoundRobinSupplier implements Supplier { - private final S[] selectors; - private final int count; + private final AtomicBoolean selectorsSet = new AtomicBoolean(false); + private volatile S[] selectors; private AtomicInteger counter = new AtomicInteger(0); + RoundRobinSupplier() { + this.selectors = null; + } + RoundRobinSupplier(S[] selectors) { - this.count = selectors.length; this.selectors = selectors; + this.selectorsSet.set(true); } public S get() { - return selectors[counter.getAndIncrement() % count]; + S[] selectors = this.selectors; + return selectors[counter.getAndIncrement() % selectors.length]; + } + + void setSelectors(S[] selectors) { + if (selectorsSet.compareAndSet(false, true)) { + this.selectors = selectors; + } else { + throw new AssertionError("Selectors already set. Should only be set once."); + } + } + + int count() { + return selectors.length; } } diff --git a/libs/elasticsearch-nio/src/main/java/org/elasticsearch/nio/ServerChannelContext.java b/libs/elasticsearch-nio/src/main/java/org/elasticsearch/nio/ServerChannelContext.java index 4b47ce063f9..9e1af3e9973 100644 --- a/libs/elasticsearch-nio/src/main/java/org/elasticsearch/nio/ServerChannelContext.java +++ b/libs/elasticsearch-nio/src/main/java/org/elasticsearch/nio/ServerChannelContext.java @@ -28,12 +28,12 @@ import java.util.function.Supplier; public class ServerChannelContext extends ChannelContext { private final NioServerSocketChannel channel; - private final AcceptingSelector selector; + private final NioSelector selector; private final Consumer acceptor; private final AtomicBoolean isClosing = new AtomicBoolean(false); private final ChannelFactory channelFactory; - public ServerChannelContext(NioServerSocketChannel channel, ChannelFactory channelFactory, AcceptingSelector selector, + public ServerChannelContext(NioServerSocketChannel channel, ChannelFactory channelFactory, NioSelector selector, Consumer acceptor, Consumer exceptionHandler) { super(channel.getRawChannel(), exceptionHandler); this.channel = channel; @@ -42,7 +42,7 @@ public class ServerChannelContext extends ChannelContext { this.acceptor = acceptor; } - public void acceptChannels(Supplier selectorSupplier) throws IOException { + public void acceptChannels(Supplier selectorSupplier) throws IOException { NioSocketChannel acceptedChannel; while ((acceptedChannel = channelFactory.acceptNioChannel(this, selectorSupplier)) != null) { acceptor.accept(acceptedChannel); @@ -57,7 +57,7 @@ public class ServerChannelContext extends ChannelContext { } @Override - public AcceptingSelector getSelector() { + public NioSelector getSelector() { return selector; } diff --git a/libs/elasticsearch-nio/src/main/java/org/elasticsearch/nio/SocketChannelContext.java b/libs/elasticsearch-nio/src/main/java/org/elasticsearch/nio/SocketChannelContext.java index 6a769b4d173..53be0e7f89f 100644 --- a/libs/elasticsearch-nio/src/main/java/org/elasticsearch/nio/SocketChannelContext.java +++ b/libs/elasticsearch-nio/src/main/java/org/elasticsearch/nio/SocketChannelContext.java @@ -47,14 +47,14 @@ public abstract class SocketChannelContext extends ChannelContext protected final InboundChannelBuffer channelBuffer; protected final AtomicBoolean isClosing = new AtomicBoolean(false); private final ReadWriteHandler readWriteHandler; - private final SocketSelector selector; + private final NioSelector selector; private final CompletableContext connectContext = new CompletableContext<>(); private final LinkedList pendingFlushes = new LinkedList<>(); private boolean ioException; private boolean peerClosed; private Exception connectException; - protected SocketChannelContext(NioSocketChannel channel, SocketSelector selector, Consumer exceptionHandler, + protected SocketChannelContext(NioSocketChannel channel, NioSelector selector, Consumer exceptionHandler, ReadWriteHandler readWriteHandler, InboundChannelBuffer channelBuffer) { super(channel.getRawChannel(), exceptionHandler); this.selector = selector; @@ -64,7 +64,7 @@ public abstract class SocketChannelContext extends ChannelContext } @Override - public SocketSelector getSelector() { + public NioSelector getSelector() { return selector; } @@ -129,7 +129,7 @@ public abstract class SocketChannelContext extends ChannelContext WriteOperation writeOperation = readWriteHandler.createWriteOperation(this, message, listener); - SocketSelector selector = getSelector(); + NioSelector selector = getSelector(); if (selector.isOnCurrentThread() == false) { selector.queueWrite(writeOperation); return; diff --git a/libs/elasticsearch-nio/src/main/java/org/elasticsearch/nio/SocketEventHandler.java b/libs/elasticsearch-nio/src/main/java/org/elasticsearch/nio/SocketEventHandler.java deleted file mode 100644 index b486243f219..00000000000 --- a/libs/elasticsearch-nio/src/main/java/org/elasticsearch/nio/SocketEventHandler.java +++ /dev/null @@ -1,150 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.nio; - -import java.io.IOException; -import java.nio.channels.SelectionKey; -import java.util.function.Consumer; - -/** - * Event handler designed to handle events from non-server sockets - */ -public class SocketEventHandler extends EventHandler { - - public SocketEventHandler(Consumer exceptionHandler) { - super(exceptionHandler); - } - - /** - * This method is called when a NioSocketChannel is successfully registered. It should only be called - * once per channel. - * - * @param context that was registered - */ - protected void handleRegistration(SocketChannelContext context) throws IOException { - context.register(); - SelectionKey selectionKey = context.getSelectionKey(); - selectionKey.attach(context); - if (context.readyForFlush()) { - SelectionKeyUtils.setConnectReadAndWriteInterested(selectionKey); - } else { - SelectionKeyUtils.setConnectAndReadInterested(selectionKey); - } - } - - /** - * This method is called when an attempt to register a channel throws an exception. - * - * @param context that was registered - * @param exception that occurred - */ - protected void registrationException(SocketChannelContext context, Exception exception) { - context.handleException(exception); - } - - /** - * This method is called when a NioSocketChannel has just been accepted or if it has receive an - * OP_CONNECT event. - * - * @param context that was registered - */ - protected void handleConnect(SocketChannelContext context) throws IOException { - if (context.connect()) { - SelectionKeyUtils.removeConnectInterested(context.getSelectionKey()); - } - } - - /** - * This method is called when an attempt to connect a channel throws an exception. - * - * @param context that was connecting - * @param exception that occurred - */ - protected void connectException(SocketChannelContext context, Exception exception) { - context.handleException(exception); - } - - /** - * This method is called when a channel signals it is ready for be read. All of the read logic should - * occur in this call. - * - * @param context that can be read - */ - protected void handleRead(SocketChannelContext context) throws IOException { - context.read(); - } - - /** - * This method is called when an attempt to read from a channel throws an exception. - * - * @param context that was being read - * @param exception that occurred - */ - protected void readException(SocketChannelContext context, Exception exception) { - context.handleException(exception); - } - - /** - * This method is called when a channel signals it is ready to receive writes. All of the write logic - * should occur in this call. - * - * @param context that can be written to - */ - protected void handleWrite(SocketChannelContext context) throws IOException { - context.flushChannel(); - } - - /** - * This method is called when an attempt to write to a channel throws an exception. - * - * @param context that was being written to - * @param exception that occurred - */ - protected void writeException(SocketChannelContext context, Exception exception) { - context.handleException(exception); - } - - /** - * This method is called when a listener attached to a channel operation throws an exception. - * - * @param exception that occurred - */ - protected void listenerException(Exception exception) { - exceptionHandler.accept(exception); - } - - /** - * @param context that was handled - */ - protected void postHandling(SocketChannelContext context) { - if (context.selectorShouldClose()) { - handleClose(context); - } else { - SelectionKey selectionKey = context.getSelectionKey(); - boolean currentlyWriteInterested = SelectionKeyUtils.isWriteInterested(selectionKey); - boolean pendingWrites = context.readyForFlush(); - if (currentlyWriteInterested == false && pendingWrites) { - SelectionKeyUtils.setWriteInterested(selectionKey); - } else if (currentlyWriteInterested && pendingWrites == false) { - SelectionKeyUtils.removeWriteInterested(selectionKey); - } - } - } -} diff --git a/libs/elasticsearch-nio/src/main/java/org/elasticsearch/nio/SocketSelector.java b/libs/elasticsearch-nio/src/main/java/org/elasticsearch/nio/SocketSelector.java deleted file mode 100644 index 30ef7b317a3..00000000000 --- a/libs/elasticsearch-nio/src/main/java/org/elasticsearch/nio/SocketSelector.java +++ /dev/null @@ -1,224 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.nio; - -import java.io.IOException; -import java.nio.channels.ClosedChannelException; -import java.nio.channels.ClosedSelectorException; -import java.nio.channels.SelectionKey; -import java.nio.channels.Selector; -import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.function.BiConsumer; - -/** - * Selector implementation that handles {@link NioSocketChannel}. It's main piece of functionality is - * handling connect, read, and write events. - */ -public class SocketSelector extends ESSelector { - - private final ConcurrentLinkedQueue newChannels = new ConcurrentLinkedQueue<>(); - private final ConcurrentLinkedQueue queuedWrites = new ConcurrentLinkedQueue<>(); - private final SocketEventHandler eventHandler; - - public SocketSelector(SocketEventHandler eventHandler) throws IOException { - super(eventHandler); - this.eventHandler = eventHandler; - } - - public SocketSelector(SocketEventHandler eventHandler, Selector selector) throws IOException { - super(eventHandler, selector); - this.eventHandler = eventHandler; - } - - @Override - void processKey(SelectionKey selectionKey) { - SocketChannelContext channelContext = (SocketChannelContext) selectionKey.attachment(); - int ops = selectionKey.readyOps(); - if ((ops & SelectionKey.OP_CONNECT) != 0) { - attemptConnect(channelContext, true); - } - - if (channelContext.isConnectComplete()) { - if ((ops & SelectionKey.OP_WRITE) != 0) { - handleWrite(channelContext); - } - - if ((ops & SelectionKey.OP_READ) != 0) { - handleRead(channelContext); - } - } - - eventHandler.postHandling(channelContext); - } - - @Override - void preSelect() { - setUpNewChannels(); - handleQueuedWrites(); - } - - @Override - void cleanup() { - WriteOperation op; - while ((op = queuedWrites.poll()) != null) { - executeFailedListener(op.getListener(), new ClosedSelectorException()); - } - channelsToClose.addAll(newChannels); - } - - /** - * Schedules a NioSocketChannel to be registered by this selector. The channel will by queued and eventually - * registered next time through the event loop. - * @param nioSocketChannel the channel to register - */ - public void scheduleForRegistration(NioSocketChannel nioSocketChannel) { - SocketChannelContext channelContext = nioSocketChannel.getContext(); - newChannels.offer(channelContext); - ensureSelectorOpenForEnqueuing(newChannels, channelContext); - wakeup(); - } - - - /** - * Queues a write operation to be handled by the event loop. This can be called by any thread and is the - * api available for non-selector threads to schedule writes. - * - * @param writeOperation to be queued - */ - public void queueWrite(WriteOperation writeOperation) { - queuedWrites.offer(writeOperation); - if (isOpen() == false) { - boolean wasRemoved = queuedWrites.remove(writeOperation); - if (wasRemoved) { - writeOperation.getListener().accept(null, new ClosedSelectorException()); - } - } else { - wakeup(); - } - } - - /** - * Queues a write operation directly in a channel's buffer. Channel buffers are only safe to be accessed - * by the selector thread. As a result, this method should only be called by the selector thread. - * - * @param writeOperation to be queued in a channel's buffer - */ - public void queueWriteInChannelBuffer(WriteOperation writeOperation) { - assertOnSelectorThread(); - SocketChannelContext context = writeOperation.getChannel(); - try { - SelectionKeyUtils.setWriteInterested(context.getSelectionKey()); - context.queueWriteOperation(writeOperation); - } catch (Exception e) { - executeFailedListener(writeOperation.getListener(), e); - } - } - - /** - * Executes a success listener with consistent exception handling. This can only be called from current - * selector thread. - * - * @param listener to be executed - * @param value to provide to listener - */ - public void executeListener(BiConsumer listener, V value) { - assertOnSelectorThread(); - try { - listener.accept(value, null); - } catch (Exception e) { - eventHandler.listenerException(e); - } - } - - /** - * Executes a failed listener with consistent exception handling. This can only be called from current - * selector thread. - * - * @param listener to be executed - * @param exception to provide to listener - */ - public void executeFailedListener(BiConsumer listener, Exception exception) { - assertOnSelectorThread(); - try { - listener.accept(null, exception); - } catch (Exception e) { - eventHandler.listenerException(e); - } - } - - private void handleWrite(SocketChannelContext context) { - try { - eventHandler.handleWrite(context); - } catch (Exception e) { - eventHandler.writeException(context, e); - } - } - - private void handleRead(SocketChannelContext context) { - try { - eventHandler.handleRead(context); - } catch (Exception e) { - eventHandler.readException(context, e); - } - } - - private void handleQueuedWrites() { - WriteOperation writeOperation; - while ((writeOperation = queuedWrites.poll()) != null) { - if (writeOperation.getChannel().isOpen()) { - queueWriteInChannelBuffer(writeOperation); - } else { - executeFailedListener(writeOperation.getListener(), new ClosedChannelException()); - } - } - } - - private void setUpNewChannels() { - SocketChannelContext channelContext; - while ((channelContext = this.newChannels.poll()) != null) { - setupChannel(channelContext); - } - } - - private void setupChannel(SocketChannelContext context) { - assert context.getSelector() == this : "The channel must be registered with the selector with which it was created"; - try { - if (context.isOpen()) { - eventHandler.handleRegistration(context); - attemptConnect(context, false); - } else { - eventHandler.registrationException(context, new ClosedChannelException()); - } - } catch (Exception e) { - eventHandler.registrationException(context, e); - } - } - - private void attemptConnect(SocketChannelContext context, boolean connectEvent) { - try { - eventHandler.handleConnect(context); - if (connectEvent && context.isConnectComplete() == false) { - eventHandler.connectException(context, new IOException("Received OP_CONNECT but connect failed")); - } - } catch (Exception e) { - eventHandler.connectException(context, e); - } - } -} diff --git a/libs/elasticsearch-nio/src/test/java/org/elasticsearch/nio/AcceptingSelectorTests.java b/libs/elasticsearch-nio/src/test/java/org/elasticsearch/nio/AcceptingSelectorTests.java deleted file mode 100644 index 7536ad9d1e1..00000000000 --- a/libs/elasticsearch-nio/src/test/java/org/elasticsearch/nio/AcceptingSelectorTests.java +++ /dev/null @@ -1,129 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.nio; - -import org.elasticsearch.test.ESTestCase; -import org.junit.Before; - -import java.io.IOException; -import java.nio.channels.ClosedChannelException; -import java.nio.channels.SelectionKey; -import java.nio.channels.Selector; -import java.security.PrivilegedActionException; -import java.util.Collections; -import java.util.HashSet; - -import static org.mockito.Matchers.any; -import static org.mockito.Matchers.same; -import static org.mockito.Mockito.doThrow; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; - -public class AcceptingSelectorTests extends ESTestCase { - - private AcceptingSelector selector; - private NioServerSocketChannel serverChannel; - private AcceptorEventHandler eventHandler; - private TestSelectionKey selectionKey; - private Selector rawSelector; - private ServerChannelContext context; - - @Before - public void setUp() throws Exception { - super.setUp(); - - eventHandler = mock(AcceptorEventHandler.class); - serverChannel = mock(NioServerSocketChannel.class); - - rawSelector = mock(Selector.class); - selector = new AcceptingSelector(eventHandler, rawSelector); - this.selector.setThread(); - - context = mock(ServerChannelContext.class); - selectionKey = new TestSelectionKey(0); - selectionKey.attach(context); - when(context.getSelectionKey()).thenReturn(selectionKey); - when(context.getSelector()).thenReturn(selector); - when(context.isOpen()).thenReturn(true); - when(serverChannel.getContext()).thenReturn(context); - } - - public void testRegisteredChannel() throws IOException { - selector.scheduleForRegistration(serverChannel); - - selector.preSelect(); - - verify(eventHandler).handleRegistration(context); - } - - public void testClosedChannelWillNotBeRegistered() { - when(context.isOpen()).thenReturn(false); - selector.scheduleForRegistration(serverChannel); - - selector.preSelect(); - - verify(eventHandler).registrationException(same(context), any(ClosedChannelException.class)); - } - - public void testRegisterChannelFailsDueToException() throws Exception { - selector.scheduleForRegistration(serverChannel); - - ClosedChannelException closedChannelException = new ClosedChannelException(); - doThrow(closedChannelException).when(eventHandler).handleRegistration(context); - - selector.preSelect(); - - verify(eventHandler).registrationException(context, closedChannelException); - } - - public void testAcceptEvent() throws IOException { - selectionKey.setReadyOps(SelectionKey.OP_ACCEPT); - - selector.processKey(selectionKey); - - verify(eventHandler).acceptChannel(context); - } - - public void testAcceptException() throws IOException { - selectionKey.setReadyOps(SelectionKey.OP_ACCEPT); - IOException ioException = new IOException(); - - doThrow(ioException).when(eventHandler).acceptChannel(context); - - selector.processKey(selectionKey); - - verify(eventHandler).acceptException(context, ioException); - } - - public void testCleanup() throws IOException { - selector.scheduleForRegistration(serverChannel); - - selector.preSelect(); - - TestSelectionKey key = new TestSelectionKey(0); - key.attach(context); - when(rawSelector.keys()).thenReturn(new HashSet<>(Collections.singletonList(key))); - - selector.cleanupAndCloseChannels(); - - verify(eventHandler).handleClose(context); - } -} diff --git a/libs/elasticsearch-nio/src/test/java/org/elasticsearch/nio/AcceptorEventHandlerTests.java b/libs/elasticsearch-nio/src/test/java/org/elasticsearch/nio/AcceptorEventHandlerTests.java deleted file mode 100644 index a162a8e234c..00000000000 --- a/libs/elasticsearch-nio/src/test/java/org/elasticsearch/nio/AcceptorEventHandlerTests.java +++ /dev/null @@ -1,123 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.nio; - -import org.elasticsearch.test.ESTestCase; -import org.junit.Before; - -import java.io.IOException; -import java.nio.channels.SelectionKey; -import java.nio.channels.ServerSocketChannel; -import java.nio.channels.SocketChannel; -import java.util.ArrayList; -import java.util.function.Consumer; - -import static org.mockito.Matchers.same; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; - -public class AcceptorEventHandlerTests extends ESTestCase { - - private AcceptorEventHandler handler; - private ChannelFactory channelFactory; - private NioServerSocketChannel channel; - private DoNotRegisterContext context; - private RoundRobinSupplier selectorSupplier; - - @Before - @SuppressWarnings("unchecked") - public void setUpHandler() throws IOException { - channelFactory = mock(ChannelFactory.class); - ArrayList selectors = new ArrayList<>(); - selectors.add(mock(SocketSelector.class)); - selectorSupplier = new RoundRobinSupplier<>(selectors.toArray(new SocketSelector[selectors.size()])); - handler = new AcceptorEventHandler(selectorSupplier, mock(Consumer.class)); - - channel = new NioServerSocketChannel(mock(ServerSocketChannel.class)); - context = new DoNotRegisterContext(channel, mock(AcceptingSelector.class), mock(Consumer.class)); - channel.setContext(context); - } - - public void testHandleRegisterSetsOP_ACCEPTInterest() throws IOException { - assertNull(context.getSelectionKey()); - - handler.handleRegistration(context); - - assertEquals(SelectionKey.OP_ACCEPT, channel.getContext().getSelectionKey().interestOps()); - } - - public void testRegisterAddsAttachment() throws IOException { - assertNull(context.getSelectionKey()); - - handler.handleRegistration(context); - - assertEquals(context, context.getSelectionKey().attachment()); - } - - public void testHandleAcceptCallsChannelFactory() throws IOException { - NioSocketChannel childChannel = new NioSocketChannel(mock(SocketChannel.class)); - NioSocketChannel nullChannel = null; - when(channelFactory.acceptNioChannel(same(context), same(selectorSupplier))).thenReturn(childChannel, nullChannel); - - handler.acceptChannel(context); - - verify(channelFactory, times(2)).acceptNioChannel(same(context), same(selectorSupplier)); - } - - @SuppressWarnings("unchecked") - public void testHandleAcceptCallsServerAcceptCallback() throws IOException { - NioSocketChannel childChannel = new NioSocketChannel(mock(SocketChannel.class)); - SocketChannelContext childContext = mock(SocketChannelContext.class); - childChannel.setContext(childContext); - ServerChannelContext serverChannelContext = mock(ServerChannelContext.class); - channel = new NioServerSocketChannel(mock(ServerSocketChannel.class)); - channel.setContext(serverChannelContext); - when(serverChannelContext.getChannel()).thenReturn(channel); - when(channelFactory.acceptNioChannel(same(context), same(selectorSupplier))).thenReturn(childChannel); - - handler.acceptChannel(serverChannelContext); - - verify(serverChannelContext).acceptChannels(selectorSupplier); - } - - public void testAcceptExceptionCallsExceptionHandler() throws IOException { - ServerChannelContext serverChannelContext = mock(ServerChannelContext.class); - IOException exception = new IOException(); - handler.acceptException(serverChannelContext, exception); - - verify(serverChannelContext).handleException(exception); - } - - private class DoNotRegisterContext extends ServerChannelContext { - - - @SuppressWarnings("unchecked") - DoNotRegisterContext(NioServerSocketChannel channel, AcceptingSelector selector, Consumer acceptor) { - super(channel, channelFactory, selector, acceptor, mock(Consumer.class)); - } - - @Override - public void register() { - setSelectionKey(new TestSelectionKey(0)); - } - } -} diff --git a/libs/elasticsearch-nio/src/test/java/org/elasticsearch/nio/BytesChannelContextTests.java b/libs/elasticsearch-nio/src/test/java/org/elasticsearch/nio/BytesChannelContextTests.java index e5c236e48a8..2ab20522db6 100644 --- a/libs/elasticsearch-nio/src/test/java/org/elasticsearch/nio/BytesChannelContextTests.java +++ b/libs/elasticsearch-nio/src/test/java/org/elasticsearch/nio/BytesChannelContextTests.java @@ -44,7 +44,7 @@ public class BytesChannelContextTests extends ESTestCase { private SocketChannel rawChannel; private BytesChannelContext context; private InboundChannelBuffer channelBuffer; - private SocketSelector selector; + private NioSelector selector; private BiConsumer listener; private int messageLength; @@ -54,7 +54,7 @@ public class BytesChannelContextTests extends ESTestCase { readConsumer = mock(CheckedFunction.class); messageLength = randomInt(96) + 20; - selector = mock(SocketSelector.class); + selector = mock(NioSelector.class); listener = mock(BiConsumer.class); channel = mock(NioSocketChannel.class); rawChannel = mock(SocketChannel.class); diff --git a/libs/elasticsearch-nio/src/test/java/org/elasticsearch/nio/ChannelContextTests.java b/libs/elasticsearch-nio/src/test/java/org/elasticsearch/nio/ChannelContextTests.java index 586dae83d08..13372055668 100644 --- a/libs/elasticsearch-nio/src/test/java/org/elasticsearch/nio/ChannelContextTests.java +++ b/libs/elasticsearch-nio/src/test/java/org/elasticsearch/nio/ChannelContextTests.java @@ -115,7 +115,7 @@ public class ChannelContextTests extends ESTestCase { } @Override - public ESSelector getSelector() { + public NioSelector getSelector() { throw new UnsupportedOperationException("not implemented"); } diff --git a/libs/elasticsearch-nio/src/test/java/org/elasticsearch/nio/ChannelFactoryTests.java b/libs/elasticsearch-nio/src/test/java/org/elasticsearch/nio/ChannelFactoryTests.java index 858f547f8a6..8ff0cfcd0c8 100644 --- a/libs/elasticsearch-nio/src/test/java/org/elasticsearch/nio/ChannelFactoryTests.java +++ b/libs/elasticsearch-nio/src/test/java/org/elasticsearch/nio/ChannelFactoryTests.java @@ -43,18 +43,18 @@ public class ChannelFactoryTests extends ESTestCase { private ChannelFactory.RawChannelFactory rawChannelFactory; private SocketChannel rawChannel; private ServerSocketChannel rawServerChannel; - private SocketSelector socketSelector; - private Supplier socketSelectorSupplier; - private Supplier acceptingSelectorSupplier; - private AcceptingSelector acceptingSelector; + private NioSelector socketSelector; + private Supplier socketSelectorSupplier; + private Supplier acceptingSelectorSupplier; + private NioSelector acceptingSelector; @Before @SuppressWarnings("unchecked") public void setupFactory() throws IOException { rawChannelFactory = mock(ChannelFactory.RawChannelFactory.class); channelFactory = new TestChannelFactory(rawChannelFactory); - socketSelector = mock(SocketSelector.class); - acceptingSelector = mock(AcceptingSelector.class); + socketSelector = mock(NioSelector.class); + acceptingSelector = mock(NioSelector.class); socketSelectorSupplier = mock(Supplier.class); acceptingSelectorSupplier = mock(Supplier.class); rawChannel = SocketChannel.open(); @@ -139,14 +139,14 @@ public class ChannelFactoryTests extends ESTestCase { @SuppressWarnings("unchecked") @Override - public NioSocketChannel createChannel(SocketSelector selector, SocketChannel channel) throws IOException { + public NioSocketChannel createChannel(NioSelector selector, SocketChannel channel) throws IOException { NioSocketChannel nioSocketChannel = new NioSocketChannel(channel); nioSocketChannel.setContext(mock(SocketChannelContext.class)); return nioSocketChannel; } @Override - public NioServerSocketChannel createServerChannel(AcceptingSelector selector, ServerSocketChannel channel) throws IOException { + public NioServerSocketChannel createServerChannel(NioSelector selector, ServerSocketChannel channel) throws IOException { return new NioServerSocketChannel(channel); } } diff --git a/libs/elasticsearch-nio/src/test/java/org/elasticsearch/nio/ESSelectorTests.java b/libs/elasticsearch-nio/src/test/java/org/elasticsearch/nio/ESSelectorTests.java deleted file mode 100644 index 05b84345f45..00000000000 --- a/libs/elasticsearch-nio/src/test/java/org/elasticsearch/nio/ESSelectorTests.java +++ /dev/null @@ -1,114 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.nio; - -import org.elasticsearch.test.ESTestCase; -import org.junit.Before; - -import java.io.IOException; -import java.nio.channels.CancelledKeyException; -import java.nio.channels.ClosedSelectorException; -import java.nio.channels.SelectionKey; -import java.nio.channels.Selector; - -import static org.mockito.Matchers.anyInt; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; - -public class ESSelectorTests extends ESTestCase { - - private ESSelector selector; - private EventHandler handler; - private Selector rawSelector; - - @Before - public void setUp() throws Exception { - super.setUp(); - handler = mock(EventHandler.class); - rawSelector = mock(Selector.class); - selector = new TestSelector(handler, rawSelector); - } - - @SuppressWarnings({"unchecked", "rawtypes"}) - public void testQueueChannelForClosed() throws IOException { - NioChannel channel = mock(NioChannel.class); - ChannelContext context = mock(ChannelContext.class); - when(channel.getContext()).thenReturn(context); - when(context.getSelector()).thenReturn(selector); - - selector.queueChannelClose(channel); - - selector.singleLoop(); - - verify(handler).handleClose(context); - } - - public void testSelectorClosedExceptionIsNotCaughtWhileRunning() throws IOException { - boolean closedSelectorExceptionCaught = false; - when(rawSelector.select(anyInt())).thenThrow(new ClosedSelectorException()); - try { - this.selector.singleLoop(); - } catch (ClosedSelectorException e) { - closedSelectorExceptionCaught = true; - } - - assertTrue(closedSelectorExceptionCaught); - } - - public void testIOExceptionWhileSelect() throws IOException { - IOException ioException = new IOException(); - - when(rawSelector.select(anyInt())).thenThrow(ioException); - - this.selector.singleLoop(); - - verify(handler).selectorException(ioException); - } - - public void testSelectorClosedIfOpenAndEventLoopNotRunning() throws IOException { - when(rawSelector.isOpen()).thenReturn(true); - selector.close(); - verify(rawSelector).close(); - } - - private static class TestSelector extends ESSelector { - - TestSelector(EventHandler eventHandler, Selector selector) throws IOException { - super(eventHandler, selector); - } - - @Override - void processKey(SelectionKey selectionKey) throws CancelledKeyException { - - } - - @Override - void preSelect() { - - } - - @Override - void cleanup() { - - } - } - -} diff --git a/libs/elasticsearch-nio/src/test/java/org/elasticsearch/nio/SocketEventHandlerTests.java b/libs/elasticsearch-nio/src/test/java/org/elasticsearch/nio/EventHandlerTests.java similarity index 66% rename from libs/elasticsearch-nio/src/test/java/org/elasticsearch/nio/SocketEventHandlerTests.java rename to libs/elasticsearch-nio/src/test/java/org/elasticsearch/nio/EventHandlerTests.java index c85d9c0c5a8..a9e1836199e 100644 --- a/libs/elasticsearch-nio/src/test/java/org/elasticsearch/nio/SocketEventHandlerTests.java +++ b/libs/elasticsearch-nio/src/test/java/org/elasticsearch/nio/EventHandlerTests.java @@ -25,25 +25,29 @@ import org.junit.Before; import java.io.IOException; import java.nio.channels.CancelledKeyException; import java.nio.channels.SelectionKey; +import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; +import java.util.ArrayList; import java.util.Collections; import java.util.function.Consumer; +import static org.mockito.Matchers.same; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; -public class SocketEventHandlerTests extends ESTestCase { +public class EventHandlerTests extends ESTestCase { private Consumer channelExceptionHandler; private Consumer genericExceptionHandler; private ReadWriteHandler readWriteHandler; - private SocketEventHandler handler; - private NioSocketChannel channel; - private SocketChannel rawChannel; - private DoNotRegisterContext context; + private EventHandler handler; + private DoNotRegisterSocketContext context; + private DoNotRegisterServerContext serverContext; + private ChannelFactory channelFactory; + private RoundRobinSupplier selectorSupplier; @Before @SuppressWarnings("unchecked") @@ -51,16 +55,24 @@ public class SocketEventHandlerTests extends ESTestCase { channelExceptionHandler = mock(Consumer.class); genericExceptionHandler = mock(Consumer.class); readWriteHandler = mock(ReadWriteHandler.class); - SocketSelector selector = mock(SocketSelector.class); - handler = new SocketEventHandler(genericExceptionHandler); - rawChannel = mock(SocketChannel.class); - channel = new NioSocketChannel(rawChannel); - when(rawChannel.finishConnect()).thenReturn(true); + channelFactory = mock(ChannelFactory.class); + NioSelector selector = mock(NioSelector.class); + ArrayList selectors = new ArrayList<>(); + selectors.add(selector); + selectorSupplier = new RoundRobinSupplier<>(selectors.toArray(new NioSelector[0])); + handler = new EventHandler(genericExceptionHandler, selectorSupplier); - context = new DoNotRegisterContext(channel, selector, channelExceptionHandler, new TestSelectionKey(0), readWriteHandler); + SocketChannel rawChannel = mock(SocketChannel.class); + when(rawChannel.finishConnect()).thenReturn(true); + NioSocketChannel channel = new NioSocketChannel(rawChannel); + context = new DoNotRegisterSocketContext(channel, selector, channelExceptionHandler, readWriteHandler); channel.setContext(context); handler.handleRegistration(context); + NioServerSocketChannel serverChannel = new NioServerSocketChannel(mock(ServerSocketChannel.class)); + serverContext = new DoNotRegisterServerContext(serverChannel, mock(NioSelector.class), mock(Consumer.class)); + serverChannel.setContext(serverContext); + when(selector.isOnCurrentThread()).thenReturn(true); } @@ -73,7 +85,7 @@ public class SocketEventHandlerTests extends ESTestCase { verify(channelContext).register(); } - public void testRegisterAddsOP_CONNECTAndOP_READInterest() throws IOException { + public void testRegisterNonServerAddsOP_CONNECTAndOP_READInterest() throws IOException { SocketChannelContext context = mock(SocketChannelContext.class); when(context.getSelectionKey()).thenReturn(new TestSelectionKey(0)); handler.handleRegistration(context); @@ -81,16 +93,55 @@ public class SocketEventHandlerTests extends ESTestCase { } public void testRegisterAddsAttachment() throws IOException { - SocketChannelContext context = mock(SocketChannelContext.class); + ChannelContext context = randomBoolean() ? mock(SocketChannelContext.class) : mock(ServerChannelContext.class); when(context.getSelectionKey()).thenReturn(new TestSelectionKey(0)); handler.handleRegistration(context); assertEquals(context, context.getSelectionKey().attachment()); } + public void testHandleServerRegisterSetsOP_ACCEPTInterest() throws IOException { + assertNull(serverContext.getSelectionKey()); + + handler.handleRegistration(serverContext); + + assertEquals(SelectionKey.OP_ACCEPT, serverContext.getSelectionKey().interestOps()); + } + + public void testHandleAcceptCallsChannelFactory() throws IOException { + NioSocketChannel childChannel = new NioSocketChannel(mock(SocketChannel.class)); + NioSocketChannel nullChannel = null; + when(channelFactory.acceptNioChannel(same(serverContext), same(selectorSupplier))).thenReturn(childChannel, nullChannel); + + handler.acceptChannel(serverContext); + + verify(channelFactory, times(2)).acceptNioChannel(same(serverContext), same(selectorSupplier)); + } + + @SuppressWarnings("unchecked") + public void testHandleAcceptCallsServerAcceptCallback() throws IOException { + NioSocketChannel childChannel = new NioSocketChannel(mock(SocketChannel.class)); + SocketChannelContext childContext = mock(SocketChannelContext.class); + childChannel.setContext(childContext); + ServerChannelContext serverChannelContext = mock(ServerChannelContext.class); + when(channelFactory.acceptNioChannel(same(serverContext), same(selectorSupplier))).thenReturn(childChannel); + + handler.acceptChannel(serverChannelContext); + + verify(serverChannelContext).acceptChannels(selectorSupplier); + } + + public void testAcceptExceptionCallsExceptionHandler() throws IOException { + ServerChannelContext serverChannelContext = mock(ServerChannelContext.class); + IOException exception = new IOException(); + handler.acceptException(serverChannelContext, exception); + + verify(serverChannelContext).handleException(exception); + } + public void testRegisterWithPendingWritesAddsOP_CONNECTAndOP_READAndOP_WRITEInterest() throws IOException { FlushReadyWrite flushReadyWrite = mock(FlushReadyWrite.class); when(readWriteHandler.writeToBytes(flushReadyWrite)).thenReturn(Collections.singletonList(flushReadyWrite)); - channel.getContext().queueWriteOperation(flushReadyWrite); + context.queueWriteOperation(flushReadyWrite); handler.handleRegistration(context); assertEquals(SelectionKey.OP_READ | SelectionKey.OP_CONNECT | SelectionKey.OP_WRITE, context.getSelectionKey().interestOps()); } @@ -120,11 +171,7 @@ public class SocketEventHandlerTests extends ESTestCase { } public void testHandleReadDelegatesToContext() throws IOException { - NioSocketChannel channel = new NioSocketChannel(rawChannel); SocketChannelContext context = mock(SocketChannelContext.class); - channel.setContext(context); - - when(context.read()).thenReturn(1); handler.handleRead(context); verify(context).read(); } @@ -200,19 +247,31 @@ public class SocketEventHandlerTests extends ESTestCase { verify(genericExceptionHandler).accept(listenerException); } - private class DoNotRegisterContext extends BytesChannelContext { + private class DoNotRegisterSocketContext extends BytesChannelContext { - private final TestSelectionKey selectionKey; - DoNotRegisterContext(NioSocketChannel channel, SocketSelector selector, Consumer exceptionHandler, - TestSelectionKey selectionKey, ReadWriteHandler handler) { + DoNotRegisterSocketContext(NioSocketChannel channel, NioSelector selector, Consumer exceptionHandler, + ReadWriteHandler handler) { super(channel, selector, exceptionHandler, handler, InboundChannelBuffer.allocatingInstance()); - this.selectionKey = selectionKey; } @Override public void register() { - setSelectionKey(selectionKey); + setSelectionKey(new TestSelectionKey(0)); + } + } + + private class DoNotRegisterServerContext extends ServerChannelContext { + + + @SuppressWarnings("unchecked") + DoNotRegisterServerContext(NioServerSocketChannel channel, NioSelector selector, Consumer acceptor) { + super(channel, channelFactory, selector, acceptor, mock(Consumer.class)); + } + + @Override + public void register() { + setSelectionKey(new TestSelectionKey(0)); } } } diff --git a/libs/elasticsearch-nio/src/test/java/org/elasticsearch/nio/NioGroupTests.java b/libs/elasticsearch-nio/src/test/java/org/elasticsearch/nio/NioGroupTests.java index 13ce2c13654..027f1255a59 100644 --- a/libs/elasticsearch-nio/src/test/java/org/elasticsearch/nio/NioGroupTests.java +++ b/libs/elasticsearch-nio/src/test/java/org/elasticsearch/nio/NioGroupTests.java @@ -38,9 +38,8 @@ public class NioGroupTests extends ESTestCase { @SuppressWarnings("unchecked") public void setUp() throws Exception { super.setUp(); - nioGroup = new NioGroup(daemonThreadFactory(Settings.EMPTY, "acceptor"), 1, - (s) -> new AcceptorEventHandler(s, mock(Consumer.class)), daemonThreadFactory(Settings.EMPTY, "selector"), 1, - () -> new SocketEventHandler(mock(Consumer.class))); + nioGroup = new NioGroup(daemonThreadFactory(Settings.EMPTY, "acceptor"), 1, daemonThreadFactory(Settings.EMPTY, "selector"), 1, + (s) -> new EventHandler(mock(Consumer.class), s)); } @Override @@ -76,8 +75,8 @@ public class NioGroupTests extends ESTestCase { public void testExceptionAtStartIsHandled() throws IOException { RuntimeException ex = new RuntimeException(); CheckedRunnable ctor = () -> new NioGroup(r -> {throw ex;}, 1, - (s) -> new AcceptorEventHandler(s, mock(Consumer.class)), daemonThreadFactory(Settings.EMPTY, "selector"), - 1, () -> new SocketEventHandler(mock(Consumer.class))); + daemonThreadFactory(Settings.EMPTY, "selector"), + 1, (s) -> new EventHandler(mock(Consumer.class), s)); RuntimeException runtimeException = expectThrows(RuntimeException.class, ctor::run); assertSame(ex, runtimeException); // ctor starts threads. So we are testing that a failure to construct will stop threads. Our thread diff --git a/libs/elasticsearch-nio/src/test/java/org/elasticsearch/nio/SocketSelectorTests.java b/libs/elasticsearch-nio/src/test/java/org/elasticsearch/nio/NioSelectorTests.java similarity index 61% rename from libs/elasticsearch-nio/src/test/java/org/elasticsearch/nio/SocketSelectorTests.java rename to libs/elasticsearch-nio/src/test/java/org/elasticsearch/nio/NioSelectorTests.java index f8775d03b42..dd3fea8bf50 100644 --- a/libs/elasticsearch-nio/src/test/java/org/elasticsearch/nio/SocketSelectorTests.java +++ b/libs/elasticsearch-nio/src/test/java/org/elasticsearch/nio/NioSelectorTests.java @@ -43,13 +43,15 @@ import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; -public class SocketSelectorTests extends ESTestCase { +public class NioSelectorTests extends ESTestCase { - private SocketSelector socketSelector; - private SocketEventHandler eventHandler; + private NioSelector selector; + private EventHandler eventHandler; private NioSocketChannel channel; + private NioServerSocketChannel serverChannel; private TestSelectionKey selectionKey; private SocketChannelContext channelContext; + private ServerChannelContext serverChannelContext; private BiConsumer listener; private ByteBuffer[] buffers = {ByteBuffer.allocate(1)}; private Selector rawSelector; @@ -59,75 +61,172 @@ public class SocketSelectorTests extends ESTestCase { public void setUp() throws Exception { super.setUp(); rawSelector = mock(Selector.class); - eventHandler = mock(SocketEventHandler.class); + eventHandler = mock(EventHandler.class); channel = mock(NioSocketChannel.class); channelContext = mock(SocketChannelContext.class); + serverChannel = mock(NioServerSocketChannel.class); + serverChannelContext = mock(ServerChannelContext.class); listener = mock(BiConsumer.class); selectionKey = new TestSelectionKey(0); - selectionKey.attach(channelContext); - this.socketSelector = new SocketSelector(eventHandler, rawSelector); - this.socketSelector.setThread(); + this.selector = new NioSelector(eventHandler, rawSelector); + this.selector.setThread(); when(channel.getContext()).thenReturn(channelContext); when(channelContext.isOpen()).thenReturn(true); - when(channelContext.getSelector()).thenReturn(socketSelector); + when(channelContext.getSelector()).thenReturn(selector); when(channelContext.getSelectionKey()).thenReturn(selectionKey); when(channelContext.isConnectComplete()).thenReturn(true); + + when(serverChannel.getContext()).thenReturn(serverChannelContext); + when(serverChannelContext.isOpen()).thenReturn(true); + when(serverChannelContext.getSelector()).thenReturn(selector); + when(serverChannelContext.getSelectionKey()).thenReturn(selectionKey); } - public void testRegisterChannel() throws Exception { - socketSelector.scheduleForRegistration(channel); + @SuppressWarnings({"unchecked", "rawtypes"}) + public void testQueueChannelForClosed() throws IOException { + NioChannel channel = mock(NioChannel.class); + ChannelContext context = mock(ChannelContext.class); + when(channel.getContext()).thenReturn(context); + when(context.getSelector()).thenReturn(selector); - socketSelector.preSelect(); + selector.queueChannelClose(channel); - verify(eventHandler).handleRegistration(channelContext); + selector.singleLoop(); + + verify(eventHandler).handleClose(context); } - public void testClosedChannelWillNotBeRegistered() throws Exception { + public void testSelectorClosedExceptionIsNotCaughtWhileRunning() throws IOException { + boolean closedSelectorExceptionCaught = false; + when(rawSelector.select(anyInt())).thenThrow(new ClosedSelectorException()); + try { + this.selector.singleLoop(); + } catch (ClosedSelectorException e) { + closedSelectorExceptionCaught = true; + } + + assertTrue(closedSelectorExceptionCaught); + } + + public void testIOExceptionWhileSelect() throws IOException { + IOException ioException = new IOException(); + + when(rawSelector.select(anyInt())).thenThrow(ioException); + + this.selector.singleLoop(); + + verify(eventHandler).selectorException(ioException); + } + + public void testSelectorClosedIfOpenAndEventLoopNotRunning() throws IOException { + when(rawSelector.isOpen()).thenReturn(true); + selector.close(); + verify(rawSelector).close(); + } + + public void testRegisteredChannel() throws IOException { + selector.scheduleForRegistration(serverChannel); + + selector.preSelect(); + + verify(eventHandler).handleRegistration(serverChannelContext); + } + + public void testClosedServerChannelWillNotBeRegistered() { + when(serverChannelContext.isOpen()).thenReturn(false); + selector.scheduleForRegistration(serverChannel); + + selector.preSelect(); + + verify(eventHandler).registrationException(same(serverChannelContext), any(ClosedChannelException.class)); + } + + public void testRegisterServerChannelFailsDueToException() throws Exception { + selector.scheduleForRegistration(serverChannel); + + ClosedChannelException closedChannelException = new ClosedChannelException(); + doThrow(closedChannelException).when(eventHandler).handleRegistration(serverChannelContext); + + selector.preSelect(); + + verify(eventHandler).registrationException(serverChannelContext, closedChannelException); + } + + public void testClosedSocketChannelWillNotBeRegistered() throws Exception { when(channelContext.isOpen()).thenReturn(false); - socketSelector.scheduleForRegistration(channel); + selector.scheduleForRegistration(channel); - socketSelector.preSelect(); + selector.preSelect(); verify(eventHandler).registrationException(same(channelContext), any(ClosedChannelException.class)); verify(eventHandler, times(0)).handleConnect(channelContext); } - public void testRegisterChannelFailsDueToException() throws Exception { - socketSelector.scheduleForRegistration(channel); + public void testRegisterSocketChannelFailsDueToException() throws Exception { + selector.scheduleForRegistration(channel); ClosedChannelException closedChannelException = new ClosedChannelException(); doThrow(closedChannelException).when(eventHandler).handleRegistration(channelContext); - socketSelector.preSelect(); + selector.preSelect(); verify(eventHandler).registrationException(channelContext, closedChannelException); verify(eventHandler, times(0)).handleConnect(channelContext); } - public void testSuccessfullyRegisterChannelWillAttemptConnect() throws Exception { - socketSelector.scheduleForRegistration(channel); + public void testAcceptEvent() throws IOException { + selectionKey.setReadyOps(SelectionKey.OP_ACCEPT); - socketSelector.preSelect(); + selectionKey.attach(serverChannelContext); + selector.processKey(selectionKey); + + verify(eventHandler).acceptChannel(serverChannelContext); + } + + public void testAcceptException() throws IOException { + selectionKey.setReadyOps(SelectionKey.OP_ACCEPT); + IOException ioException = new IOException(); + + doThrow(ioException).when(eventHandler).acceptChannel(serverChannelContext); + + selectionKey.attach(serverChannelContext); + selector.processKey(selectionKey); + + verify(eventHandler).acceptException(serverChannelContext, ioException); + } + + public void testRegisterChannel() throws Exception { + selector.scheduleForRegistration(channel); + + selector.preSelect(); + + verify(eventHandler).handleRegistration(channelContext); + } + + public void testSuccessfullyRegisterChannelWillAttemptConnect() throws Exception { + selector.scheduleForRegistration(channel); + + selector.preSelect(); verify(eventHandler).handleConnect(channelContext); } public void testQueueWriteWhenNotRunning() throws Exception { - socketSelector.close(); + selector.close(); - socketSelector.queueWrite(new FlushReadyWrite(channelContext, buffers, listener)); + selector.queueWrite(new FlushReadyWrite(channelContext, buffers, listener)); verify(listener).accept(isNull(Void.class), any(ClosedSelectorException.class)); } public void testQueueWriteChannelIsClosed() throws Exception { WriteOperation writeOperation = new FlushReadyWrite(channelContext, buffers, listener); - socketSelector.queueWrite(writeOperation); + selector.queueWrite(writeOperation); when(channelContext.isOpen()).thenReturn(false); - socketSelector.preSelect(); + selector.preSelect(); verify(channelContext, times(0)).queueWriteOperation(writeOperation); verify(listener).accept(isNull(Void.class), any(ClosedChannelException.class)); @@ -138,11 +237,11 @@ public class SocketSelectorTests extends ESTestCase { WriteOperation writeOperation = new FlushReadyWrite(channelContext, buffers, listener); CancelledKeyException cancelledKeyException = new CancelledKeyException(); - socketSelector.queueWrite(writeOperation); + selector.queueWrite(writeOperation); when(channelContext.getSelectionKey()).thenReturn(selectionKey); when(selectionKey.interestOps(anyInt())).thenThrow(cancelledKeyException); - socketSelector.preSelect(); + selector.preSelect(); verify(channelContext, times(0)).queueWriteOperation(writeOperation); verify(listener).accept(null, cancelledKeyException); @@ -150,11 +249,11 @@ public class SocketSelectorTests extends ESTestCase { public void testQueueWriteSuccessful() throws Exception { WriteOperation writeOperation = new FlushReadyWrite(channelContext, buffers, listener); - socketSelector.queueWrite(writeOperation); + selector.queueWrite(writeOperation); assertTrue((selectionKey.interestOps() & SelectionKey.OP_WRITE) == 0); - socketSelector.preSelect(); + selector.preSelect(); verify(channelContext).queueWriteOperation(writeOperation); assertTrue((selectionKey.interestOps() & SelectionKey.OP_WRITE) != 0); @@ -165,7 +264,7 @@ public class SocketSelectorTests extends ESTestCase { assertTrue((selectionKey.interestOps() & SelectionKey.OP_WRITE) == 0); - socketSelector.queueWriteInChannelBuffer(writeOperation); + selector.queueWriteInChannelBuffer(writeOperation); verify(channelContext).queueWriteOperation(writeOperation); assertTrue((selectionKey.interestOps() & SelectionKey.OP_WRITE) != 0); @@ -179,7 +278,7 @@ public class SocketSelectorTests extends ESTestCase { when(channelContext.getSelectionKey()).thenReturn(selectionKey); when(selectionKey.interestOps(anyInt())).thenThrow(cancelledKeyException); - socketSelector.queueWriteInChannelBuffer(writeOperation); + selector.queueWriteInChannelBuffer(writeOperation); verify(channelContext, times(0)).queueWriteOperation(writeOperation); verify(listener).accept(null, cancelledKeyException); @@ -188,7 +287,8 @@ public class SocketSelectorTests extends ESTestCase { public void testConnectEvent() throws Exception { selectionKey.setReadyOps(SelectionKey.OP_CONNECT); - socketSelector.processKey(selectionKey); + selectionKey.attach(channelContext); + selector.processKey(selectionKey); verify(eventHandler).handleConnect(channelContext); } @@ -199,7 +299,8 @@ public class SocketSelectorTests extends ESTestCase { selectionKey.setReadyOps(SelectionKey.OP_CONNECT); doThrow(ioException).when(eventHandler).handleConnect(channelContext); - socketSelector.processKey(selectionKey); + selectionKey.attach(channelContext); + selector.processKey(selectionKey); verify(eventHandler).connectException(channelContext, ioException); } @@ -212,7 +313,8 @@ public class SocketSelectorTests extends ESTestCase { doThrow(ioException).when(eventHandler).handleWrite(channelContext); when(channelContext.isConnectComplete()).thenReturn(false); - socketSelector.processKey(selectionKey); + selectionKey.attach(channelContext); + selector.processKey(selectionKey); verify(eventHandler, times(0)).handleWrite(channelContext); verify(eventHandler, times(0)).handleRead(channelContext); @@ -221,7 +323,8 @@ public class SocketSelectorTests extends ESTestCase { public void testSuccessfulWriteEvent() throws Exception { selectionKey.setReadyOps(SelectionKey.OP_WRITE); - socketSelector.processKey(selectionKey); + selectionKey.attach(channelContext); + selector.processKey(selectionKey); verify(eventHandler).handleWrite(channelContext); } @@ -229,11 +332,13 @@ public class SocketSelectorTests extends ESTestCase { public void testWriteEventWithException() throws Exception { IOException ioException = new IOException(); + selectionKey.attach(channelContext); selectionKey.setReadyOps(SelectionKey.OP_WRITE); doThrow(ioException).when(eventHandler).handleWrite(channelContext); - socketSelector.processKey(selectionKey); + selectionKey.attach(channelContext); + selector.processKey(selectionKey); verify(eventHandler).writeException(channelContext, ioException); } @@ -241,7 +346,8 @@ public class SocketSelectorTests extends ESTestCase { public void testSuccessfulReadEvent() throws Exception { selectionKey.setReadyOps(SelectionKey.OP_READ); - socketSelector.processKey(selectionKey); + selectionKey.attach(channelContext); + selector.processKey(selectionKey); verify(eventHandler).handleRead(channelContext); } @@ -253,7 +359,8 @@ public class SocketSelectorTests extends ESTestCase { doThrow(ioException).when(eventHandler).handleRead(channelContext); - socketSelector.processKey(selectionKey); + selectionKey.attach(channelContext); + selector.processKey(selectionKey); verify(eventHandler).readException(channelContext, ioException); } @@ -261,7 +368,8 @@ public class SocketSelectorTests extends ESTestCase { public void testWillCallPostHandleAfterChannelHandling() throws Exception { selectionKey.setReadyOps(SelectionKey.OP_WRITE | SelectionKey.OP_READ); - socketSelector.processKey(selectionKey); + selectionKey.attach(channelContext); + selector.processKey(selectionKey); verify(eventHandler).handleWrite(channelContext); verify(eventHandler).handleRead(channelContext); @@ -273,18 +381,18 @@ public class SocketSelectorTests extends ESTestCase { SocketChannelContext unregisteredContext = mock(SocketChannelContext.class); when(unregisteredChannel.getContext()).thenReturn(unregisteredContext); - socketSelector.scheduleForRegistration(channel); + selector.scheduleForRegistration(channel); - socketSelector.preSelect(); + selector.preSelect(); - socketSelector.queueWrite(new FlushReadyWrite(channelContext, buffers, listener)); - socketSelector.scheduleForRegistration(unregisteredChannel); + selector.queueWrite(new FlushReadyWrite(channelContext, buffers, listener)); + selector.scheduleForRegistration(unregisteredChannel); TestSelectionKey testSelectionKey = new TestSelectionKey(0); testSelectionKey.attach(channelContext); when(rawSelector.keys()).thenReturn(new HashSet<>(Collections.singletonList(testSelectionKey))); - socketSelector.cleanupAndCloseChannels(); + selector.cleanupAndCloseChannels(); verify(listener).accept(isNull(Void.class), any(ClosedSelectorException.class)); verify(eventHandler).handleClose(channelContext); @@ -295,7 +403,7 @@ public class SocketSelectorTests extends ESTestCase { RuntimeException exception = new RuntimeException(); doThrow(exception).when(listener).accept(null, null); - socketSelector.executeListener(listener, null); + selector.executeListener(listener, null); verify(eventHandler).listenerException(exception); } @@ -305,7 +413,7 @@ public class SocketSelectorTests extends ESTestCase { RuntimeException exception = new RuntimeException(); doThrow(exception).when(listener).accept(null, ioException); - socketSelector.executeFailedListener(listener, ioException); + selector.executeFailedListener(listener, ioException); verify(eventHandler).listenerException(exception); } diff --git a/libs/elasticsearch-nio/src/test/java/org/elasticsearch/nio/SocketChannelContextTests.java b/libs/elasticsearch-nio/src/test/java/org/elasticsearch/nio/SocketChannelContextTests.java index f27052ac5d5..fdb4a77b922 100644 --- a/libs/elasticsearch-nio/src/test/java/org/elasticsearch/nio/SocketChannelContextTests.java +++ b/libs/elasticsearch-nio/src/test/java/org/elasticsearch/nio/SocketChannelContextTests.java @@ -51,7 +51,7 @@ public class SocketChannelContextTests extends ESTestCase { private Consumer exceptionHandler; private NioSocketChannel channel; private BiConsumer listener; - private SocketSelector selector; + private NioSelector selector; private ReadWriteHandler readWriteHandler; @SuppressWarnings("unchecked") @@ -64,7 +64,7 @@ public class SocketChannelContextTests extends ESTestCase { listener = mock(BiConsumer.class); when(channel.getRawChannel()).thenReturn(rawChannel); exceptionHandler = mock(Consumer.class); - selector = mock(SocketSelector.class); + selector = mock(NioSelector.class); readWriteHandler = mock(ReadWriteHandler.class); InboundChannelBuffer channelBuffer = InboundChannelBuffer.allocatingInstance(); context = new TestSocketChannelContext(channel, selector, exceptionHandler, readWriteHandler, channelBuffer); @@ -275,7 +275,7 @@ public class SocketChannelContextTests extends ESTestCase { private static class TestSocketChannelContext extends SocketChannelContext { - private TestSocketChannelContext(NioSocketChannel channel, SocketSelector selector, Consumer exceptionHandler, + private TestSocketChannelContext(NioSocketChannel channel, NioSelector selector, Consumer exceptionHandler, ReadWriteHandler readWriteHandler, InboundChannelBuffer channelBuffer) { super(channel, selector, exceptionHandler, readWriteHandler, channelBuffer); } diff --git a/plugins/transport-nio/src/main/java/org/elasticsearch/http/nio/NioHttpServerTransport.java b/plugins/transport-nio/src/main/java/org/elasticsearch/http/nio/NioHttpServerTransport.java index ce0ed83aad4..495dbf97293 100644 --- a/plugins/transport-nio/src/main/java/org/elasticsearch/http/nio/NioHttpServerTransport.java +++ b/plugins/transport-nio/src/main/java/org/elasticsearch/http/nio/NioHttpServerTransport.java @@ -47,10 +47,9 @@ import org.elasticsearch.http.HttpServerTransport; import org.elasticsearch.http.HttpStats; import org.elasticsearch.http.nio.cors.NioCorsConfig; import org.elasticsearch.http.nio.cors.NioCorsConfigBuilder; -import org.elasticsearch.nio.AcceptingSelector; -import org.elasticsearch.nio.AcceptorEventHandler; import org.elasticsearch.nio.BytesChannelContext; import org.elasticsearch.nio.ChannelFactory; +import org.elasticsearch.nio.EventHandler; import org.elasticsearch.nio.InboundChannelBuffer; import org.elasticsearch.nio.NioChannel; import org.elasticsearch.nio.NioGroup; @@ -58,8 +57,7 @@ import org.elasticsearch.nio.NioServerSocketChannel; import org.elasticsearch.nio.NioSocketChannel; import org.elasticsearch.nio.ServerChannelContext; import org.elasticsearch.nio.SocketChannelContext; -import org.elasticsearch.nio.SocketEventHandler; -import org.elasticsearch.nio.SocketSelector; +import org.elasticsearch.nio.NioSelector; import org.elasticsearch.rest.RestUtils; import org.elasticsearch.threadpool.ThreadPool; @@ -180,9 +178,8 @@ public class NioHttpServerTransport extends AbstractHttpServerTransport { int acceptorCount = NIO_HTTP_ACCEPTOR_COUNT.get(settings); int workerCount = NIO_HTTP_WORKER_COUNT.get(settings); nioGroup = new NioGroup(daemonThreadFactory(this.settings, TRANSPORT_ACCEPTOR_THREAD_NAME_PREFIX), acceptorCount, - (s) -> new AcceptorEventHandler(s, this::nonChannelExceptionCaught), daemonThreadFactory(this.settings, TRANSPORT_WORKER_THREAD_NAME_PREFIX), workerCount, - () -> new SocketEventHandler(this::nonChannelExceptionCaught)); + (s) -> new EventHandler(this::nonChannelExceptionCaught, s)); channelFactory = new HttpChannelFactory(); this.boundAddress = createBoundHttpAddress(); @@ -360,7 +357,7 @@ public class NioHttpServerTransport extends AbstractHttpServerTransport { } @Override - public NioSocketChannel createChannel(SocketSelector selector, SocketChannel channel) throws IOException { + public NioSocketChannel createChannel(NioSelector selector, SocketChannel channel) throws IOException { NioSocketChannel nioChannel = new NioSocketChannel(channel); HttpReadWriteHandler httpReadWritePipeline = new HttpReadWriteHandler(nioChannel,NioHttpServerTransport.this, httpHandlingSettings, xContentRegistry, corsConfig, threadPool.getThreadContext()); @@ -372,7 +369,7 @@ public class NioHttpServerTransport extends AbstractHttpServerTransport { } @Override - public NioServerSocketChannel createServerChannel(AcceptingSelector selector, ServerSocketChannel channel) throws IOException { + public NioServerSocketChannel createServerChannel(NioSelector selector, ServerSocketChannel channel) throws IOException { NioServerSocketChannel nioChannel = new NioServerSocketChannel(channel); Consumer exceptionHandler = (e) -> logger.error(() -> new ParameterizedMessage("exception from server channel caught on transport layer [{}]", channel), e); diff --git a/plugins/transport-nio/src/main/java/org/elasticsearch/transport/nio/NioTransport.java b/plugins/transport-nio/src/main/java/org/elasticsearch/transport/nio/NioTransport.java index 2ef49d77912..b85d707dcd9 100644 --- a/plugins/transport-nio/src/main/java/org/elasticsearch/transport/nio/NioTransport.java +++ b/plugins/transport-nio/src/main/java/org/elasticsearch/transport/nio/NioTransport.java @@ -31,16 +31,14 @@ import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.util.PageCacheRecycler; import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.indices.breaker.CircuitBreakerService; -import org.elasticsearch.nio.AcceptingSelector; -import org.elasticsearch.nio.AcceptorEventHandler; import org.elasticsearch.nio.BytesChannelContext; import org.elasticsearch.nio.ChannelFactory; +import org.elasticsearch.nio.EventHandler; import org.elasticsearch.nio.InboundChannelBuffer; import org.elasticsearch.nio.NioGroup; +import org.elasticsearch.nio.NioSelector; import org.elasticsearch.nio.NioSocketChannel; import org.elasticsearch.nio.ServerChannelContext; -import org.elasticsearch.nio.SocketEventHandler; -import org.elasticsearch.nio.SocketSelector; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TcpChannel; import org.elasticsearch.transport.TcpTransport; @@ -55,23 +53,18 @@ import java.util.concurrent.ConcurrentMap; import java.util.function.Consumer; import java.util.function.Supplier; -import static org.elasticsearch.common.settings.Setting.intSetting; import static org.elasticsearch.common.util.concurrent.ConcurrentCollections.newConcurrentMap; import static org.elasticsearch.common.util.concurrent.EsExecutors.daemonThreadFactory; public class NioTransport extends TcpTransport { private static final String TRANSPORT_WORKER_THREAD_NAME_PREFIX = Transports.NIO_TRANSPORT_WORKER_THREAD_NAME_PREFIX; - private static final String TRANSPORT_ACCEPTOR_THREAD_NAME_PREFIX = Transports.NIO_TRANSPORT_ACCEPTOR_THREAD_NAME_PREFIX; public static final Setting NIO_WORKER_COUNT = new Setting<>("transport.nio.worker_count", (s) -> Integer.toString(EsExecutors.numberOfProcessors(s) * 2), (s) -> Setting.parseInt(s, 1, "transport.nio.worker_count"), Setting.Property.NodeScope); - public static final Setting NIO_ACCEPTOR_COUNT = - intSetting("transport.nio.acceptor_count", 1, 1, Setting.Property.NodeScope); - protected final PageCacheRecycler pageCacheRecycler; private final ConcurrentMap profileToChannelFactory = newConcurrentMap(); private volatile NioGroup nioGroup; @@ -101,20 +94,13 @@ public class NioTransport extends TcpTransport { protected void doStart() { boolean success = false; try { - int acceptorCount = 0; - boolean useNetworkServer = NetworkService.NETWORK_SERVER.get(settings); - if (useNetworkServer) { - acceptorCount = NioTransport.NIO_ACCEPTOR_COUNT.get(settings); - } - nioGroup = new NioGroup(daemonThreadFactory(this.settings, TRANSPORT_ACCEPTOR_THREAD_NAME_PREFIX), acceptorCount, - (s) -> new AcceptorEventHandler(s, this::onNonChannelException), - daemonThreadFactory(this.settings, TRANSPORT_WORKER_THREAD_NAME_PREFIX), NioTransport.NIO_WORKER_COUNT.get(settings), - () -> new SocketEventHandler(this::onNonChannelException)); + nioGroup = new NioGroup(daemonThreadFactory(this.settings, TRANSPORT_WORKER_THREAD_NAME_PREFIX), + NioTransport.NIO_WORKER_COUNT.get(settings), (s) -> new EventHandler(this::onNonChannelException, s)); ProfileSettings clientProfileSettings = new ProfileSettings(settings, "default"); clientChannelFactory = channelFactory(clientProfileSettings, true); - if (useNetworkServer) { + if (NetworkService.NETWORK_SERVER.get(settings)) { // loop through all profiles and start them up, special handling for default one for (ProfileSettings profileSettings : profileSettings) { String profileName = profileSettings.profileName; @@ -178,7 +164,7 @@ public class NioTransport extends TcpTransport { } @Override - public TcpNioSocketChannel createChannel(SocketSelector selector, SocketChannel channel) throws IOException { + public TcpNioSocketChannel createChannel(NioSelector selector, SocketChannel channel) throws IOException { TcpNioSocketChannel nioChannel = new TcpNioSocketChannel(profileName, channel); Supplier pageSupplier = () -> { Recycler.V bytes = pageCacheRecycler.bytePage(false); @@ -193,7 +179,7 @@ public class NioTransport extends TcpTransport { } @Override - public TcpNioServerSocketChannel createServerChannel(AcceptingSelector selector, ServerSocketChannel channel) throws IOException { + public TcpNioServerSocketChannel createServerChannel(NioSelector selector, ServerSocketChannel channel) throws IOException { TcpNioServerSocketChannel nioChannel = new TcpNioServerSocketChannel(profileName, channel); Consumer exceptionHandler = (e) -> logger.error(() -> new ParameterizedMessage("exception from server channel caught on transport layer [{}]", channel), e); diff --git a/plugins/transport-nio/src/main/java/org/elasticsearch/transport/nio/NioTransportPlugin.java b/plugins/transport-nio/src/main/java/org/elasticsearch/transport/nio/NioTransportPlugin.java index 422e3e9b833..1cc94f18dd3 100644 --- a/plugins/transport-nio/src/main/java/org/elasticsearch/transport/nio/NioTransportPlugin.java +++ b/plugins/transport-nio/src/main/java/org/elasticsearch/transport/nio/NioTransportPlugin.java @@ -50,8 +50,7 @@ public class NioTransportPlugin extends Plugin implements NetworkPlugin { return Arrays.asList( NioHttpServerTransport.NIO_HTTP_ACCEPTOR_COUNT, NioHttpServerTransport.NIO_HTTP_WORKER_COUNT, - NioTransport.NIO_WORKER_COUNT, - NioTransport.NIO_ACCEPTOR_COUNT + NioTransport.NIO_WORKER_COUNT ); } diff --git a/plugins/transport-nio/src/main/java/org/elasticsearch/transport/nio/TcpNioServerSocketChannel.java b/plugins/transport-nio/src/main/java/org/elasticsearch/transport/nio/TcpNioServerSocketChannel.java index c63acc9f4de..946563225c6 100644 --- a/plugins/transport-nio/src/main/java/org/elasticsearch/transport/nio/TcpNioServerSocketChannel.java +++ b/plugins/transport-nio/src/main/java/org/elasticsearch/transport/nio/TcpNioServerSocketChannel.java @@ -21,8 +21,6 @@ package org.elasticsearch.transport.nio; import org.elasticsearch.action.ActionListener; import org.elasticsearch.common.bytes.BytesReference; -import org.elasticsearch.nio.AcceptingSelector; -import org.elasticsearch.nio.ChannelFactory; import org.elasticsearch.nio.NioServerSocketChannel; import org.elasticsearch.transport.TcpChannel; diff --git a/plugins/transport-nio/src/main/java/org/elasticsearch/transport/nio/TcpNioSocketChannel.java b/plugins/transport-nio/src/main/java/org/elasticsearch/transport/nio/TcpNioSocketChannel.java index 44ab17457e8..ef2bc875aa9 100644 --- a/plugins/transport-nio/src/main/java/org/elasticsearch/transport/nio/TcpNioSocketChannel.java +++ b/plugins/transport-nio/src/main/java/org/elasticsearch/transport/nio/TcpNioSocketChannel.java @@ -22,7 +22,6 @@ package org.elasticsearch.transport.nio; import org.elasticsearch.action.ActionListener; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.nio.NioSocketChannel; -import org.elasticsearch.nio.SocketSelector; import org.elasticsearch.transport.TcpChannel; import java.io.IOException; diff --git a/server/src/main/java/org/elasticsearch/transport/Transports.java b/server/src/main/java/org/elasticsearch/transport/Transports.java index d07846835c2..2adf07bad23 100644 --- a/server/src/main/java/org/elasticsearch/transport/Transports.java +++ b/server/src/main/java/org/elasticsearch/transport/Transports.java @@ -30,7 +30,6 @@ public enum Transports { public static final String TEST_MOCK_TRANSPORT_THREAD_PREFIX = "__mock_network_thread"; public static final String NIO_TRANSPORT_WORKER_THREAD_NAME_PREFIX = "es_nio_transport_worker"; - public static final String NIO_TRANSPORT_ACCEPTOR_THREAD_NAME_PREFIX = "es_nio_transport_acceptor"; /** * Utility method to detect whether a thread is a network thread. Typically @@ -44,8 +43,7 @@ public enum Transports { TcpTransport.TRANSPORT_SERVER_WORKER_THREAD_NAME_PREFIX, TcpTransport.TRANSPORT_CLIENT_BOSS_THREAD_NAME_PREFIX, TEST_MOCK_TRANSPORT_THREAD_PREFIX, - NIO_TRANSPORT_WORKER_THREAD_NAME_PREFIX, - NIO_TRANSPORT_ACCEPTOR_THREAD_NAME_PREFIX)) { + NIO_TRANSPORT_WORKER_THREAD_NAME_PREFIX)) { if (threadName.contains(s)) { return true; } diff --git a/test/framework/src/main/java/org/elasticsearch/transport/nio/MockNioTransport.java b/test/framework/src/main/java/org/elasticsearch/transport/nio/MockNioTransport.java index 9481f60d933..cb9e243660a 100644 --- a/test/framework/src/main/java/org/elasticsearch/transport/nio/MockNioTransport.java +++ b/test/framework/src/main/java/org/elasticsearch/transport/nio/MockNioTransport.java @@ -30,17 +30,15 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.util.PageCacheRecycler; import org.elasticsearch.indices.breaker.CircuitBreakerService; -import org.elasticsearch.nio.AcceptingSelector; -import org.elasticsearch.nio.AcceptorEventHandler; import org.elasticsearch.nio.BytesChannelContext; import org.elasticsearch.nio.BytesWriteHandler; import org.elasticsearch.nio.ChannelFactory; import org.elasticsearch.nio.InboundChannelBuffer; import org.elasticsearch.nio.NioGroup; +import org.elasticsearch.nio.NioSelector; import org.elasticsearch.nio.NioServerSocketChannel; import org.elasticsearch.nio.NioSocketChannel; import org.elasticsearch.nio.ServerChannelContext; -import org.elasticsearch.nio.SocketSelector; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TcpChannel; import org.elasticsearch.transport.TcpTransport; @@ -62,7 +60,6 @@ import static org.elasticsearch.common.util.concurrent.EsExecutors.daemonThreadF public class MockNioTransport extends TcpTransport { private static final String TRANSPORT_WORKER_THREAD_NAME_PREFIX = Transports.NIO_TRANSPORT_WORKER_THREAD_NAME_PREFIX; - private static final String TRANSPORT_ACCEPTOR_THREAD_NAME_PREFIX = Transports.NIO_TRANSPORT_ACCEPTOR_THREAD_NAME_PREFIX; private final PageCacheRecycler pageCacheRecycler; private final ConcurrentMap profileToChannelFactory = newConcurrentMap(); @@ -93,20 +90,13 @@ public class MockNioTransport extends TcpTransport { protected void doStart() { boolean success = false; try { - int acceptorCount = 0; - boolean useNetworkServer = NetworkService.NETWORK_SERVER.get(settings); - if (useNetworkServer) { - acceptorCount = 1; - } - nioGroup = new NioGroup(daemonThreadFactory(this.settings, TRANSPORT_ACCEPTOR_THREAD_NAME_PREFIX), acceptorCount, - (s) -> new AcceptorEventHandler(s, this::onNonChannelException), - daemonThreadFactory(this.settings, TRANSPORT_WORKER_THREAD_NAME_PREFIX), 2, - () -> new TestingSocketEventHandler(this::onNonChannelException)); + nioGroup = new NioGroup(daemonThreadFactory(this.settings, TRANSPORT_WORKER_THREAD_NAME_PREFIX), 2, + (s) -> new TestingSocketEventHandler(this::onNonChannelException, s)); ProfileSettings clientProfileSettings = new ProfileSettings(settings, "default"); clientChannelFactory = new MockTcpChannelFactory(clientProfileSettings, "client"); - if (useNetworkServer) { + if (NetworkService.NETWORK_SERVER.get(settings)) { // loop through all profiles and start them up, special handling for default one for (ProfileSettings profileSettings : profileSettings) { String profileName = profileSettings.profileName; @@ -159,7 +149,7 @@ public class MockNioTransport extends TcpTransport { } @Override - public MockSocketChannel createChannel(SocketSelector selector, SocketChannel channel) throws IOException { + public MockSocketChannel createChannel(NioSelector selector, SocketChannel channel) throws IOException { MockSocketChannel nioChannel = new MockSocketChannel(profileName, channel, selector); Supplier pageSupplier = () -> { Recycler.V bytes = pageCacheRecycler.bytePage(false); @@ -173,7 +163,7 @@ public class MockNioTransport extends TcpTransport { } @Override - public MockServerChannel createServerChannel(AcceptingSelector selector, ServerSocketChannel channel) throws IOException { + public MockServerChannel createServerChannel(NioSelector selector, ServerSocketChannel channel) throws IOException { MockServerChannel nioServerChannel = new MockServerChannel(profileName, channel, this, selector); Consumer exceptionHandler = (e) -> logger.error(() -> new ParameterizedMessage("exception from server channel caught on transport layer [{}]", channel), e); @@ -205,7 +195,7 @@ public class MockNioTransport extends TcpTransport { private final String profile; - MockServerChannel(String profile, ServerSocketChannel channel, ChannelFactory channelFactory, AcceptingSelector selector) + MockServerChannel(String profile, ServerSocketChannel channel, ChannelFactory channelFactory, NioSelector selector) throws IOException { super(channel); this.profile = profile; @@ -246,7 +236,7 @@ public class MockNioTransport extends TcpTransport { private final String profile; - private MockSocketChannel(String profile, java.nio.channels.SocketChannel socketChannel, SocketSelector selector) + private MockSocketChannel(String profile, java.nio.channels.SocketChannel socketChannel, NioSelector selector) throws IOException { super(socketChannel); this.profile = profile; diff --git a/test/framework/src/main/java/org/elasticsearch/transport/nio/TestingSocketEventHandler.java b/test/framework/src/main/java/org/elasticsearch/transport/nio/TestingSocketEventHandler.java index 810e4201022..cecd3c60613 100644 --- a/test/framework/src/main/java/org/elasticsearch/transport/nio/TestingSocketEventHandler.java +++ b/test/framework/src/main/java/org/elasticsearch/transport/nio/TestingSocketEventHandler.java @@ -19,21 +19,23 @@ package org.elasticsearch.transport.nio; +import org.elasticsearch.nio.EventHandler; +import org.elasticsearch.nio.NioSelector; import org.elasticsearch.nio.SocketChannelContext; -import org.elasticsearch.nio.SocketEventHandler; import java.io.IOException; import java.util.Collections; import java.util.Set; import java.util.WeakHashMap; import java.util.function.Consumer; +import java.util.function.Supplier; -public class TestingSocketEventHandler extends SocketEventHandler { +public class TestingSocketEventHandler extends EventHandler { private Set hasConnectedMap = Collections.newSetFromMap(new WeakHashMap<>()); - public TestingSocketEventHandler(Consumer exceptionHandler) { - super(exceptionHandler); + public TestingSocketEventHandler(Consumer exceptionHandler, Supplier selectorSupplier) { + super(exceptionHandler, selectorSupplier); } public void handleConnect(SocketChannelContext context) throws IOException { diff --git a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/transport/nio/SSLChannelContext.java b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/transport/nio/SSLChannelContext.java index 171507de741..95af7665157 100644 --- a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/transport/nio/SSLChannelContext.java +++ b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/transport/nio/SSLChannelContext.java @@ -11,7 +11,7 @@ import org.elasticsearch.nio.InboundChannelBuffer; import org.elasticsearch.nio.NioSocketChannel; import org.elasticsearch.nio.ReadWriteHandler; import org.elasticsearch.nio.SocketChannelContext; -import org.elasticsearch.nio.SocketSelector; +import org.elasticsearch.nio.NioSelector; import org.elasticsearch.nio.WriteOperation; import java.io.IOException; @@ -28,7 +28,7 @@ public final class SSLChannelContext extends SocketChannelContext { private final SSLDriver sslDriver; - SSLChannelContext(NioSocketChannel channel, SocketSelector selector, Consumer exceptionHandler, SSLDriver sslDriver, + SSLChannelContext(NioSocketChannel channel, NioSelector selector, Consumer exceptionHandler, SSLDriver sslDriver, ReadWriteHandler readWriteHandler, InboundChannelBuffer channelBuffer) { super(channel, selector, exceptionHandler, readWriteHandler, channelBuffer); this.sslDriver = sslDriver; @@ -140,7 +140,7 @@ public final class SSLChannelContext extends SocketChannelContext { public void closeChannel() { if (isClosing.compareAndSet(false, true)) { WriteOperation writeOperation = new CloseNotifyOperation(this); - SocketSelector selector = getSelector(); + NioSelector selector = getSelector(); if (selector.isOnCurrentThread() == false) { selector.queueWrite(writeOperation); return; diff --git a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/transport/nio/SecurityNioTransport.java b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/transport/nio/SecurityNioTransport.java index 1c9d779c2cc..39ce1a0150c 100644 --- a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/transport/nio/SecurityNioTransport.java +++ b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/transport/nio/SecurityNioTransport.java @@ -13,11 +13,10 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.util.PageCacheRecycler; import org.elasticsearch.indices.breaker.CircuitBreakerService; -import org.elasticsearch.nio.AcceptingSelector; import org.elasticsearch.nio.InboundChannelBuffer; import org.elasticsearch.nio.NioSocketChannel; import org.elasticsearch.nio.ServerChannelContext; -import org.elasticsearch.nio.SocketSelector; +import org.elasticsearch.nio.NioSelector; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TcpTransport; import org.elasticsearch.transport.nio.NioTransport; @@ -117,7 +116,7 @@ public class SecurityNioTransport extends NioTransport { } @Override - public TcpNioSocketChannel createChannel(SocketSelector selector, SocketChannel channel) throws IOException { + public TcpNioSocketChannel createChannel(NioSelector selector, SocketChannel channel) throws IOException { SSLConfiguration defaultConfig = profileConfiguration.get(TcpTransport.DEFAULT_PROFILE); SSLEngine sslEngine = sslService.createSSLEngine(profileConfiguration.getOrDefault(profileName, defaultConfig), null, -1); SSLDriver sslDriver = new SSLDriver(sslEngine, isClient); @@ -136,7 +135,7 @@ public class SecurityNioTransport extends NioTransport { } @Override - public TcpNioServerSocketChannel createServerChannel(AcceptingSelector selector, ServerSocketChannel channel) throws IOException { + public TcpNioServerSocketChannel createServerChannel(NioSelector selector, ServerSocketChannel channel) throws IOException { TcpNioServerSocketChannel nioChannel = new TcpNioServerSocketChannel(profileName, channel); Consumer exceptionHandler = (e) -> logger.error(() -> new ParameterizedMessage("exception from server channel caught on transport layer [{}]", channel), e); diff --git a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/transport/nio/SSLChannelContextTests.java b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/transport/nio/SSLChannelContextTests.java index 168dcd64e6c..14a22d300d1 100644 --- a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/transport/nio/SSLChannelContextTests.java +++ b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/transport/nio/SSLChannelContextTests.java @@ -11,7 +11,7 @@ import org.elasticsearch.nio.BytesWriteHandler; import org.elasticsearch.nio.FlushReadyWrite; import org.elasticsearch.nio.InboundChannelBuffer; import org.elasticsearch.nio.NioSocketChannel; -import org.elasticsearch.nio.SocketSelector; +import org.elasticsearch.nio.NioSelector; import org.elasticsearch.nio.WriteOperation; import org.elasticsearch.test.ESTestCase; import org.junit.Before; @@ -40,7 +40,7 @@ public class SSLChannelContextTests extends ESTestCase { private SocketChannel rawChannel; private SSLChannelContext context; private InboundChannelBuffer channelBuffer; - private SocketSelector selector; + private NioSelector selector; private BiConsumer listener; private Consumer exceptionHandler; private SSLDriver sslDriver; @@ -55,7 +55,7 @@ public class SSLChannelContextTests extends ESTestCase { TestReadWriteHandler readWriteHandler = new TestReadWriteHandler(readConsumer); messageLength = randomInt(96) + 20; - selector = mock(SocketSelector.class); + selector = mock(NioSelector.class); listener = mock(BiConsumer.class); channel = mock(NioSocketChannel.class); rawChannel = mock(SocketChannel.class);