Combine accepting selector and socket selector (#31115)

This is related to #27260. This commit combines the AcceptingSelector
and SocketSelector classes into a single NioSelector. This change
allows the same selector to handle both server and socket channels. This
is valuable as we do not necessarily want a dedicated thread running for
accepting channels.

With this change, this commit removes the configuration for dedicated
accepting selectors for the normal transport class. The accepting
workload for new node connections is likely low, meaning that there is
no need to dedicate a thread to this process.
This commit is contained in:
Tim Brooks 2018-06-06 11:59:54 -06:00 committed by GitHub
parent dc4bb62a78
commit 67e73b4df4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
36 changed files with 988 additions and 1393 deletions

View File

@ -1,98 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.nio;
import java.io.IOException;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.stream.Collectors;
/**
* Selector implementation that handles {@link NioServerSocketChannel}. It's main piece of functionality is
* accepting new channels.
*/
public class AcceptingSelector extends ESSelector {
private final AcceptorEventHandler eventHandler;
private final ConcurrentLinkedQueue<NioServerSocketChannel> newChannels = new ConcurrentLinkedQueue<>();
public AcceptingSelector(AcceptorEventHandler eventHandler) throws IOException {
super(eventHandler);
this.eventHandler = eventHandler;
}
public AcceptingSelector(AcceptorEventHandler eventHandler, Selector selector) throws IOException {
super(eventHandler, selector);
this.eventHandler = eventHandler;
}
@Override
void processKey(SelectionKey selectionKey) {
ServerChannelContext channelContext = (ServerChannelContext) selectionKey.attachment();
if (selectionKey.isAcceptable()) {
try {
eventHandler.acceptChannel(channelContext);
} catch (IOException e) {
eventHandler.acceptException(channelContext, e);
}
}
}
@Override
void preSelect() {
setUpNewServerChannels();
}
@Override
void cleanup() {
channelsToClose.addAll(newChannels.stream().map(NioServerSocketChannel::getContext).collect(Collectors.toList()));
}
/**
* Schedules a NioServerSocketChannel to be registered with this selector. The channel will by queued and
* eventually registered next time through the event loop.
*
* @param serverSocketChannel the channel to register
*/
public void scheduleForRegistration(NioServerSocketChannel serverSocketChannel) {
newChannels.add(serverSocketChannel);
ensureSelectorOpenForEnqueuing(newChannels, serverSocketChannel);
wakeup();
}
private void setUpNewServerChannels() {
NioServerSocketChannel newChannel;
while ((newChannel = this.newChannels.poll()) != null) {
ServerChannelContext context = newChannel.getContext();
assert context.getSelector() == this : "The channel must be registered with the selector with which it was created";
try {
if (context.isOpen()) {
eventHandler.handleRegistration(context);
} else {
eventHandler.registrationException(context, new ClosedChannelException());
}
} catch (Exception e) {
eventHandler.registrationException(context, e);
}
}
}
}

View File

@ -1,81 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.nio;
import java.io.IOException;
import java.nio.channels.SelectionKey;
import java.util.function.Consumer;
import java.util.function.Supplier;
/**
* Event handler designed to handle events from server sockets
*/
public class AcceptorEventHandler extends EventHandler {
private final Supplier<SocketSelector> selectorSupplier;
public AcceptorEventHandler(Supplier<SocketSelector> selectorSupplier, Consumer<Exception> exceptionHandler) {
super(exceptionHandler);
this.selectorSupplier = selectorSupplier;
}
/**
* This method is called when a NioServerSocketChannel is being registered with the selector. It should
* only be called once per channel.
*
* @param context that was registered
*/
protected void handleRegistration(ServerChannelContext context) throws IOException {
context.register();
SelectionKey selectionKey = context.getSelectionKey();
selectionKey.attach(context);
SelectionKeyUtils.setAcceptInterested(selectionKey);
}
/**
* This method is called when an attempt to register a server channel throws an exception.
*
* @param context that was registered
* @param exception that occurred
*/
protected void registrationException(ServerChannelContext context, Exception exception) {
context.handleException(exception);
}
/**
* This method is called when a server channel signals it is ready to accept a connection. All of the
* accept logic should occur in this call.
*
* @param context that can accept a connection
*/
protected void acceptChannel(ServerChannelContext context) throws IOException {
context.acceptChannels(selectorSupplier);
}
/**
* This method is called when an attempt to accept a connection throws an exception.
*
* @param context that accepting a connection
* @param exception that occurred
*/
protected void acceptException(ServerChannelContext context, Exception exception) {
context.handleException(exception);
}
}

View File

@ -24,7 +24,7 @@ import java.util.function.Consumer;
public class BytesChannelContext extends SocketChannelContext {
public BytesChannelContext(NioSocketChannel channel, SocketSelector selector, Consumer<Exception> exceptionHandler,
public BytesChannelContext(NioSocketChannel channel, NioSelector selector, Consumer<Exception> exceptionHandler,
ReadWriteHandler handler, InboundChannelBuffer channelBuffer) {
super(channel, selector, exceptionHandler, handler, channelBuffer);
}

View File

@ -105,7 +105,7 @@ public abstract class ChannelContext<S extends SelectableChannel & NetworkChanne
*/
public abstract void closeChannel();
public abstract ESSelector getSelector();
public abstract NioSelector getSelector();
public abstract NioChannel getChannel();

View File

@ -42,30 +42,30 @@ public abstract class ChannelFactory<ServerSocket extends NioServerSocketChannel
this.rawChannelFactory = rawChannelFactory;
}
public Socket openNioChannel(InetSocketAddress remoteAddress, Supplier<SocketSelector> supplier) throws IOException {
public Socket openNioChannel(InetSocketAddress remoteAddress, Supplier<NioSelector> supplier) throws IOException {
SocketChannel rawChannel = rawChannelFactory.openNioChannel(remoteAddress);
SocketSelector selector = supplier.get();
NioSelector selector = supplier.get();
Socket channel = internalCreateChannel(selector, rawChannel);
scheduleChannel(channel, selector);
return channel;
}
public Socket acceptNioChannel(ServerChannelContext serverContext, Supplier<SocketSelector> supplier) throws IOException {
public Socket acceptNioChannel(ServerChannelContext serverContext, Supplier<NioSelector> supplier) throws IOException {
SocketChannel rawChannel = rawChannelFactory.acceptNioChannel(serverContext);
// Null is returned if there are no pending sockets to accept
if (rawChannel == null) {
return null;
} else {
SocketSelector selector = supplier.get();
NioSelector selector = supplier.get();
Socket channel = internalCreateChannel(selector, rawChannel);
scheduleChannel(channel, selector);
return channel;
}
}
public ServerSocket openNioServerSocketChannel(InetSocketAddress address, Supplier<AcceptingSelector> supplier) throws IOException {
public ServerSocket openNioServerSocketChannel(InetSocketAddress address, Supplier<NioSelector> supplier) throws IOException {
ServerSocketChannel rawChannel = rawChannelFactory.openNioServerSocketChannel(address);
AcceptingSelector selector = supplier.get();
NioSelector selector = supplier.get();
ServerSocket serverChannel = internalCreateServerChannel(selector, rawChannel);
scheduleServerChannel(serverChannel, selector);
return serverChannel;
@ -81,7 +81,7 @@ public abstract class ChannelFactory<ServerSocket extends NioServerSocketChannel
* @return the channel
* @throws IOException related to the creation of the channel
*/
public abstract Socket createChannel(SocketSelector selector, SocketChannel channel) throws IOException;
public abstract Socket createChannel(NioSelector selector, SocketChannel channel) throws IOException;
/**
* This method should return a new {@link NioServerSocketChannel} implementation. When this method has
@ -92,9 +92,9 @@ public abstract class ChannelFactory<ServerSocket extends NioServerSocketChannel
* @return the server channel
* @throws IOException related to the creation of the channel
*/
public abstract ServerSocket createServerChannel(AcceptingSelector selector, ServerSocketChannel channel) throws IOException;
public abstract ServerSocket createServerChannel(NioSelector selector, ServerSocketChannel channel) throws IOException;
private Socket internalCreateChannel(SocketSelector selector, SocketChannel rawChannel) throws IOException {
private Socket internalCreateChannel(NioSelector selector, SocketChannel rawChannel) throws IOException {
try {
Socket channel = createChannel(selector, rawChannel);
assert channel.getContext() != null : "channel context should have been set on channel";
@ -105,7 +105,7 @@ public abstract class ChannelFactory<ServerSocket extends NioServerSocketChannel
}
}
private ServerSocket internalCreateServerChannel(AcceptingSelector selector, ServerSocketChannel rawChannel) throws IOException {
private ServerSocket internalCreateServerChannel(NioSelector selector, ServerSocketChannel rawChannel) throws IOException {
try {
return createServerChannel(selector, rawChannel);
} catch (Exception e) {
@ -114,7 +114,7 @@ public abstract class ChannelFactory<ServerSocket extends NioServerSocketChannel
}
}
private void scheduleChannel(Socket channel, SocketSelector selector) {
private void scheduleChannel(Socket channel, NioSelector selector) {
try {
selector.scheduleForRegistration(channel);
} catch (IllegalStateException e) {
@ -123,7 +123,7 @@ public abstract class ChannelFactory<ServerSocket extends NioServerSocketChannel
}
}
private void scheduleServerChannel(ServerSocket channel, AcceptingSelector selector) {
private void scheduleServerChannel(ServerSocket channel, NioSelector selector) {
try {
selector.scheduleForRegistration(channel);
} catch (IllegalStateException e) {

View File

@ -1,248 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.nio;
import java.io.Closeable;
import java.io.IOException;
import java.nio.channels.CancelledKeyException;
import java.nio.channels.ClosedSelectorException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
/**
* This is a basic selector abstraction. This selector wraps a raw nio {@link Selector}. When you call
* {@link #runLoop()}, the selector will run until {@link #close()} is called. This instance handles closing
* of channels. Users should call {@link #queueChannelClose(NioChannel)} to schedule a channel for close by
* this selector.
* <p>
* Children of this class should implement the specific {@link #processKey(SelectionKey)},
* {@link #preSelect()}, and {@link #cleanup()} functionality.
*/
public abstract class ESSelector implements Closeable {
final Selector selector;
final ConcurrentLinkedQueue<ChannelContext<?>> channelsToClose = new ConcurrentLinkedQueue<>();
private final EventHandler eventHandler;
private final ReentrantLock runLock = new ReentrantLock();
private final CountDownLatch exitedLoop = new CountDownLatch(1);
private final AtomicBoolean isClosed = new AtomicBoolean(false);
private final CompletableFuture<Void> isRunningFuture = new CompletableFuture<>();
private volatile Thread thread;
ESSelector(EventHandler eventHandler) throws IOException {
this(eventHandler, Selector.open());
}
ESSelector(EventHandler eventHandler, Selector selector) {
this.eventHandler = eventHandler;
this.selector = selector;
}
/**
* Starts this selector. The selector will run until {@link #close()} is called.
*/
public void runLoop() {
if (runLock.tryLock()) {
isRunningFuture.complete(null);
try {
setThread();
while (isOpen()) {
singleLoop();
}
} finally {
try {
cleanupAndCloseChannels();
} finally {
try {
selector.close();
} catch (IOException e) {
eventHandler.selectorException(e);
} finally {
runLock.unlock();
exitedLoop.countDown();
}
}
}
} else {
throw new IllegalStateException("selector is already running");
}
}
void singleLoop() {
try {
closePendingChannels();
preSelect();
int ready = selector.select(300);
if (ready > 0) {
Set<SelectionKey> selectionKeys = selector.selectedKeys();
Iterator<SelectionKey> keyIterator = selectionKeys.iterator();
while (keyIterator.hasNext()) {
SelectionKey sk = keyIterator.next();
keyIterator.remove();
if (sk.isValid()) {
try {
processKey(sk);
} catch (CancelledKeyException cke) {
eventHandler.genericChannelException((ChannelContext<?>) sk.attachment(), cke);
}
} else {
eventHandler.genericChannelException((ChannelContext<?>) sk.attachment(), new CancelledKeyException());
}
}
}
} catch (ClosedSelectorException e) {
if (isOpen()) {
throw e;
}
} catch (IOException e) {
eventHandler.selectorException(e);
} catch (Exception e) {
eventHandler.uncaughtException(e);
}
}
void cleanupAndCloseChannels() {
cleanup();
channelsToClose.addAll(selector.keys().stream().map(sk -> (ChannelContext<?>) sk.attachment()).collect(Collectors.toList()));
closePendingChannels();
}
/**
* Called by the base {@link ESSelector} class when there is a {@link SelectionKey} to be handled.
*
* @param selectionKey the key to be handled
* @throws CancelledKeyException thrown when the key has already been cancelled
*/
abstract void processKey(SelectionKey selectionKey) throws CancelledKeyException;
/**
* Called immediately prior to a raw {@link Selector#select()} call. Should be used to implement
* channel registration, handling queued writes, and other work that is not specifically processing
* a selection key.
*/
abstract void preSelect();
/**
* Called once as the selector is being closed.
*/
abstract void cleanup();
void setThread() {
thread = Thread.currentThread();
}
public boolean isOnCurrentThread() {
return Thread.currentThread() == thread;
}
public void assertOnSelectorThread() {
assert isOnCurrentThread() : "Must be on selector thread to perform this operation. Currently on thread ["
+ Thread.currentThread().getName() + "].";
}
void wakeup() {
// TODO: Do we need the wakeup optimizations that some other libraries use?
selector.wakeup();
}
@Override
public void close() throws IOException {
if (isClosed.compareAndSet(false, true)) {
wakeup();
if (isRunning()) {
try {
exitedLoop.await();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new IllegalStateException("Thread was interrupted while waiting for selector to close", e);
}
} else if (selector.isOpen()) {
selector.close();
}
}
}
public void queueChannelClose(NioChannel channel) {
ChannelContext<?> context = channel.getContext();
assert context.getSelector() == this : "Must schedule a channel for closure with its selector";
channelsToClose.offer(context);
ensureSelectorOpenForEnqueuing(channelsToClose, context);
wakeup();
}
public Selector rawSelector() {
return selector;
}
public boolean isOpen() {
return isClosed.get() == false;
}
public boolean isRunning() {
return runLock.isLocked();
}
public Future<Void> isRunningFuture() {
return isRunningFuture;
}
/**
* This is a convenience method to be called after some object (normally channels) are enqueued with this
* selector. This method will check if the selector is still open. If it is open, normal operation can
* proceed.
*
* If the selector is closed, then we attempt to remove the object from the queue. If the removal
* succeeds then we throw an {@link IllegalStateException} indicating that normal operation failed. If
* the object cannot be removed from the queue, then the object has already been handled by the selector
* and operation can proceed normally.
*
* If this method is called from the selector thread, we will not throw an exception as the selector
* thread can manipulate its queues internally even if it is no longer open.
*
* @param queue the queue to which the object was added
* @param objectAdded the objected added
* @param <O> the object type
*/
<O> void ensureSelectorOpenForEnqueuing(ConcurrentLinkedQueue<O> queue, O objectAdded) {
if (isOpen() == false && isOnCurrentThread() == false) {
if (queue.remove(objectAdded)) {
throw new IllegalStateException("selector is already closed");
}
}
}
private void closePendingChannels() {
ChannelContext<?> channelContext;
while ((channelContext = channelsToClose.poll()) != null) {
eventHandler.handleClose(channelContext);
}
}
}

View File

@ -20,15 +20,163 @@
package org.elasticsearch.nio;
import java.io.IOException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.util.function.Consumer;
import java.util.function.Supplier;
public abstract class EventHandler {
public class EventHandler {
protected final Consumer<Exception> exceptionHandler;
private final Supplier<NioSelector> selectorSupplier;
protected EventHandler(Consumer<Exception> exceptionHandler) {
public EventHandler(Consumer<Exception> exceptionHandler, Supplier<NioSelector> selectorSupplier) {
this.exceptionHandler = exceptionHandler;
this.selectorSupplier = selectorSupplier;
}
/**
* This method is called when a server channel signals it is ready to accept a connection. All of the
* accept logic should occur in this call.
*
* @param context that can accept a connection
*/
protected void acceptChannel(ServerChannelContext context) throws IOException {
context.acceptChannels(selectorSupplier);
}
/**
* This method is called when an attempt to accept a connection throws an exception.
*
* @param context that accepting a connection
* @param exception that occurred
*/
protected void acceptException(ServerChannelContext context, Exception exception) {
context.handleException(exception);
}
/**
* This method is called when a NioChannel is being registered with the selector. It should
* only be called once per channel.
*
* @param context that was registered
*/
protected void handleRegistration(ChannelContext<?> context) throws IOException {
context.register();
SelectionKey selectionKey = context.getSelectionKey();
selectionKey.attach(context);
if (context instanceof SocketChannelContext) {
if (((SocketChannelContext) context).readyForFlush()) {
SelectionKeyUtils.setConnectReadAndWriteInterested(context.getSelectionKey());
} else {
SelectionKeyUtils.setConnectAndReadInterested(context.getSelectionKey());
}
} else {
assert context instanceof ServerChannelContext : "If not SocketChannelContext the context must be a ServerChannelContext";
SelectionKeyUtils.setAcceptInterested(context.getSelectionKey());
}
}
/**
* This method is called when an attempt to register a channel throws an exception.
*
* @param context that was registered
* @param exception that occurred
*/
protected void registrationException(ChannelContext<?> context, Exception exception) {
context.handleException(exception);
}
/**
* This method is called when a NioSocketChannel has just been accepted or if it has receive an
* OP_CONNECT event.
*
* @param context that was registered
*/
protected void handleConnect(SocketChannelContext context) throws IOException {
if (context.connect()) {
SelectionKeyUtils.removeConnectInterested(context.getSelectionKey());
}
}
/**
* This method is called when an attempt to connect a channel throws an exception.
*
* @param context that was connecting
* @param exception that occurred
*/
protected void connectException(SocketChannelContext context, Exception exception) {
context.handleException(exception);
}
/**
* This method is called when a channel signals it is ready for be read. All of the read logic should
* occur in this call.
*
* @param context that can be read
*/
protected void handleRead(SocketChannelContext context) throws IOException {
context.read();
}
/**
* This method is called when an attempt to read from a channel throws an exception.
*
* @param context that was being read
* @param exception that occurred
*/
protected void readException(SocketChannelContext context, Exception exception) {
context.handleException(exception);
}
/**
* This method is called when a channel signals it is ready to receive writes. All of the write logic
* should occur in this call.
*
* @param context that can be written to
*/
protected void handleWrite(SocketChannelContext context) throws IOException {
context.flushChannel();
}
/**
* This method is called when an attempt to write to a channel throws an exception.
*
* @param context that was being written to
* @param exception that occurred
*/
protected void writeException(SocketChannelContext context, Exception exception) {
context.handleException(exception);
}
/**
* This method is called when a listener attached to a channel operation throws an exception.
*
* @param exception that occurred
*/
protected void listenerException(Exception exception) {
exceptionHandler.accept(exception);
}
/**
* This method is called after ready events (READ, ACCEPT, WRITE, CONNECT) have been handled for a
* channel.
*
* @param context that was handled
*/
protected void postHandling(SocketChannelContext context) {
if (context.selectorShouldClose()) {
handleClose(context);
} else {
SelectionKey selectionKey = context.getSelectionKey();
boolean currentlyWriteInterested = SelectionKeyUtils.isWriteInterested(selectionKey);
boolean pendingWrites = context.readyForFlush();
if (currentlyWriteInterested == false && pendingWrites) {
SelectionKeyUtils.setWriteInterested(selectionKey);
} else if (currentlyWriteInterested && pendingWrites == false) {
SelectionKeyUtils.removeWriteInterested(selectionKey);
}
}
}
/**

View File

@ -22,10 +22,11 @@ package org.elasticsearch.nio;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.NetworkChannel;
import java.nio.channels.SocketChannel;
import java.util.function.BiConsumer;
/**
* This is a basic channel abstraction used by the {@link ESSelector}.
* This is a basic channel abstraction used by the {@link NioSelector}.
* <p>
* A channel is open once it is constructed. The channel remains open and {@link #isOpen()} will return
* true until the channel is explicitly closed.

View File

@ -35,10 +35,9 @@ import java.util.stream.Stream;
/**
* The NioGroup is a group of selectors for interfacing with java nio. When it is started it will create the
* configured number of socket and acceptor selectors. Each selector will be running in a dedicated thread.
* Server connections can be bound using the {@link #bindServerChannel(InetSocketAddress, ChannelFactory)}
* method. Client connections can be opened using the {@link #openChannel(InetSocketAddress, ChannelFactory)}
* method.
* configured number of selectors. Each selector will be running in a dedicated thread. Server connections
* can be bound using the {@link #bindServerChannel(InetSocketAddress, ChannelFactory)} method. Client
* connections can be opened using the {@link #openChannel(InetSocketAddress, ChannelFactory)} method.
* <p>
* The logic specific to a particular channel is provided by the {@link ChannelFactory} passed to the method
* when the channel is created. This is what allows an NioGroup to support different channel types.
@ -46,35 +45,75 @@ import java.util.stream.Stream;
public class NioGroup implements AutoCloseable {
private final ArrayList<AcceptingSelector> acceptors;
private final RoundRobinSupplier<AcceptingSelector> acceptorSupplier;
private final List<NioSelector> dedicatedAcceptors;
private final RoundRobinSupplier<NioSelector> acceptorSupplier;
private final ArrayList<SocketSelector> socketSelectors;
private final RoundRobinSupplier<SocketSelector> socketSelectorSupplier;
private final List<NioSelector> selectors;
private final RoundRobinSupplier<NioSelector> selectorSupplier;
private final AtomicBoolean isOpen = new AtomicBoolean(true);
public NioGroup(ThreadFactory acceptorThreadFactory, int acceptorCount,
Function<Supplier<SocketSelector>, AcceptorEventHandler> acceptorEventHandlerFunction,
ThreadFactory socketSelectorThreadFactory, int socketSelectorCount,
Supplier<SocketEventHandler> socketEventHandlerFunction) throws IOException {
acceptors = new ArrayList<>(acceptorCount);
socketSelectors = new ArrayList<>(socketSelectorCount);
/**
* This will create an NioGroup with no dedicated acceptors. All server channels will be handled by the
* same selectors that are handling child channels.
*
* @param threadFactory factory to create selector threads
* @param selectorCount the number of selectors to be created
* @param eventHandlerFunction function for creating event handlers
* @throws IOException occurs if there is a problem while opening a java.nio.Selector
*/
public NioGroup(ThreadFactory threadFactory, int selectorCount, Function<Supplier<NioSelector>, EventHandler> eventHandlerFunction)
throws IOException {
this(null, 0, threadFactory, selectorCount, eventHandlerFunction);
}
/**
* This will create an NioGroup with dedicated acceptors. All server channels will be handled by a group
* of selectors dedicated to accepting channels. These accepted channels will be handed off the
* non-server selectors.
*
* @param acceptorThreadFactory factory to create acceptor selector threads
* @param dedicatedAcceptorCount the number of dedicated acceptor selectors to be created
* @param selectorThreadFactory factory to create non-acceptor selector threads
* @param selectorCount the number of non-acceptor selectors to be created
* @param eventHandlerFunction function for creating event handlers
* @throws IOException occurs if there is a problem while opening a java.nio.Selector
*/
public NioGroup(ThreadFactory acceptorThreadFactory, int dedicatedAcceptorCount, ThreadFactory selectorThreadFactory, int selectorCount,
Function<Supplier<NioSelector>, EventHandler> eventHandlerFunction) throws IOException {
dedicatedAcceptors = new ArrayList<>(dedicatedAcceptorCount);
selectors = new ArrayList<>(selectorCount);
try {
for (int i = 0; i < socketSelectorCount; ++i) {
SocketSelector selector = new SocketSelector(socketEventHandlerFunction.get());
socketSelectors.add(selector);
List<RoundRobinSupplier<NioSelector>> suppliersToSet = new ArrayList<>(selectorCount);
for (int i = 0; i < selectorCount; ++i) {
RoundRobinSupplier<NioSelector> supplier = new RoundRobinSupplier<>();
suppliersToSet.add(supplier);
NioSelector selector = new NioSelector(eventHandlerFunction.apply(supplier));
selectors.add(selector);
}
for (RoundRobinSupplier<NioSelector> supplierToSet : suppliersToSet) {
supplierToSet.setSelectors(selectors.toArray(new NioSelector[0]));
assert supplierToSet.count() == selectors.size() : "Supplier should have same count as selector list.";
}
startSelectors(socketSelectors, socketSelectorThreadFactory);
for (int i = 0; i < acceptorCount; ++i) {
SocketSelector[] childSelectors = this.socketSelectors.toArray(new SocketSelector[this.socketSelectors.size()]);
Supplier<SocketSelector> selectorSupplier = new RoundRobinSupplier<>(childSelectors);
AcceptingSelector acceptor = new AcceptingSelector(acceptorEventHandlerFunction.apply(selectorSupplier));
acceptors.add(acceptor);
for (int i = 0; i < dedicatedAcceptorCount; ++i) {
RoundRobinSupplier<NioSelector> supplier = new RoundRobinSupplier<>(selectors.toArray(new NioSelector[0]));
NioSelector acceptor = new NioSelector(eventHandlerFunction.apply(supplier));
dedicatedAcceptors.add(acceptor);
}
startSelectors(acceptors, acceptorThreadFactory);
if (dedicatedAcceptorCount != 0) {
acceptorSupplier = new RoundRobinSupplier<>(dedicatedAcceptors.toArray(new NioSelector[0]));
} else {
acceptorSupplier = new RoundRobinSupplier<>(selectors.toArray(new NioSelector[0]));
}
selectorSupplier = new RoundRobinSupplier<>(selectors.toArray(new NioSelector[0]));
assert selectorCount == selectors.size() : "We need to have created all the selectors at this point.";
assert dedicatedAcceptorCount == dedicatedAcceptors.size() : "We need to have created all the acceptors at this point.";
startSelectors(selectors, selectorThreadFactory);
startSelectors(dedicatedAcceptors, acceptorThreadFactory);
} catch (Exception e) {
try {
close();
@ -83,31 +122,25 @@ public class NioGroup implements AutoCloseable {
}
throw e;
}
socketSelectorSupplier = new RoundRobinSupplier<>(socketSelectors.toArray(new SocketSelector[socketSelectors.size()]));
acceptorSupplier = new RoundRobinSupplier<>(acceptors.toArray(new AcceptingSelector[acceptors.size()]));
}
public <S extends NioServerSocketChannel> S bindServerChannel(InetSocketAddress address, ChannelFactory<S, ?> factory)
throws IOException {
ensureOpen();
if (acceptors.isEmpty()) {
throw new IllegalArgumentException("There are no acceptors configured. Without acceptors, server channels are not supported.");
}
return factory.openNioServerSocketChannel(address, acceptorSupplier);
}
public <S extends NioSocketChannel> S openChannel(InetSocketAddress address, ChannelFactory<?, S> factory) throws IOException {
ensureOpen();
return factory.openNioChannel(address, socketSelectorSupplier);
return factory.openNioChannel(address, selectorSupplier);
}
@Override
public void close() throws IOException {
if (isOpen.compareAndSet(true, false)) {
List<ESSelector> toClose = Stream.concat(acceptors.stream(), socketSelectors.stream()).collect(Collectors.toList());
List<NioSelector> toClose = Stream.concat(dedicatedAcceptors.stream(), selectors.stream()).collect(Collectors.toList());
List<IOException> closingExceptions = new ArrayList<>();
for (ESSelector selector : toClose) {
for (NioSelector selector : toClose) {
try {
selector.close();
} catch (IOException e) {
@ -118,12 +151,12 @@ public class NioGroup implements AutoCloseable {
}
}
private static <S extends ESSelector> void startSelectors(Iterable<S> selectors, ThreadFactory threadFactory) {
for (ESSelector acceptor : selectors) {
if (acceptor.isRunning() == false) {
threadFactory.newThread(acceptor::runLoop).start();
private static void startSelectors(Iterable<NioSelector> selectors, ThreadFactory threadFactory) {
for (NioSelector selector : selectors) {
if (selector.isRunning() == false) {
threadFactory.newThread(selector::runLoop).start();
try {
acceptor.isRunningFuture().get();
selector.isRunningFuture().get();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new IllegalStateException("Interrupted while waiting for selector to start.", e);

View File

@ -0,0 +1,428 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.nio;
import java.io.Closeable;
import java.io.IOException;
import java.nio.channels.CancelledKeyException;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.ClosedSelectorException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.BiConsumer;
import java.util.stream.Collectors;
/**
* This is a nio selector implementation. This selector wraps a raw nio {@link Selector}. When you call
* {@link #runLoop()}, the selector will run until {@link #close()} is called. This instance handles closing
* of channels. Users should call {@link #queueChannelClose(NioChannel)} to schedule a channel for close by
* this selector.
* <p>
* Children of this class should implement the specific {@link #processKey(SelectionKey)},
* {@link #preSelect()}, and {@link #cleanup()} functionality.
*/
public class NioSelector implements Closeable {
private final ConcurrentLinkedQueue<WriteOperation> queuedWrites = new ConcurrentLinkedQueue<>();
private final ConcurrentLinkedQueue<ChannelContext<?>> channelsToClose = new ConcurrentLinkedQueue<>();
private final ConcurrentLinkedQueue<ChannelContext<?>> channelsToRegister = new ConcurrentLinkedQueue<>();
private final EventHandler eventHandler;
private final Selector selector;
private final ReentrantLock runLock = new ReentrantLock();
private final CountDownLatch exitedLoop = new CountDownLatch(1);
private final AtomicBoolean isClosed = new AtomicBoolean(false);
private final CompletableFuture<Void> isRunningFuture = new CompletableFuture<>();
private final AtomicReference<Thread> thread = new AtomicReference<>(null);
public NioSelector(EventHandler eventHandler) throws IOException {
this(eventHandler, Selector.open());
}
public NioSelector(EventHandler eventHandler, Selector selector) throws IOException {
this.selector = selector;
this.eventHandler = eventHandler;
}
public Selector rawSelector() {
return selector;
}
public boolean isOpen() {
return isClosed.get() == false;
}
public boolean isRunning() {
return runLock.isLocked();
}
Future<Void> isRunningFuture() {
return isRunningFuture;
}
void setThread() {
boolean wasSet = thread.compareAndSet(null, Thread.currentThread());
assert wasSet : "Failed to set thread as it was already set. Should only set once.";
}
public boolean isOnCurrentThread() {
return Thread.currentThread() == thread.get();
}
public void assertOnSelectorThread() {
assert isOnCurrentThread() : "Must be on selector thread [" + thread.get().getName() + "} to perform this operation. " +
"Currently on thread [" + Thread.currentThread().getName() + "].";
}
/**
* Starts this selector. The selector will run until {@link #close()} is called.
*/
public void runLoop() {
if (runLock.tryLock()) {
isRunningFuture.complete(null);
try {
setThread();
while (isOpen()) {
singleLoop();
}
} finally {
try {
cleanupAndCloseChannels();
} finally {
try {
selector.close();
} catch (IOException e) {
eventHandler.selectorException(e);
} finally {
runLock.unlock();
exitedLoop.countDown();
}
}
}
} else {
throw new IllegalStateException("selector is already running");
}
}
void singleLoop() {
try {
closePendingChannels();
preSelect();
int ready = selector.select(300);
if (ready > 0) {
Set<SelectionKey> selectionKeys = selector.selectedKeys();
Iterator<SelectionKey> keyIterator = selectionKeys.iterator();
while (keyIterator.hasNext()) {
SelectionKey sk = keyIterator.next();
keyIterator.remove();
if (sk.isValid()) {
try {
processKey(sk);
} catch (CancelledKeyException cke) {
eventHandler.genericChannelException((ChannelContext<?>) sk.attachment(), cke);
}
} else {
eventHandler.genericChannelException((ChannelContext<?>) sk.attachment(), new CancelledKeyException());
}
}
}
} catch (ClosedSelectorException e) {
if (isOpen()) {
throw e;
}
} catch (IOException e) {
eventHandler.selectorException(e);
} catch (Exception e) {
eventHandler.uncaughtException(e);
}
}
void cleanupAndCloseChannels() {
cleanup();
channelsToClose.addAll(channelsToRegister);
channelsToRegister.clear();
channelsToClose.addAll(selector.keys().stream().map(sk -> (ChannelContext<?>) sk.attachment()).collect(Collectors.toList()));
closePendingChannels();
}
@Override
public void close() throws IOException {
if (isClosed.compareAndSet(false, true)) {
wakeup();
if (isRunning()) {
try {
exitedLoop.await();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new IllegalStateException("Thread was interrupted while waiting for selector to close", e);
}
} else if (selector.isOpen()) {
selector.close();
}
}
}
void processKey(SelectionKey selectionKey) {
ChannelContext<?> context = (ChannelContext<?>) selectionKey.attachment();
if (selectionKey.isAcceptable()) {
assert context instanceof ServerChannelContext : "Only server channels can receive accept events";
ServerChannelContext serverChannelContext = (ServerChannelContext) context;
int ops = selectionKey.readyOps();
if ((ops & SelectionKey.OP_ACCEPT) != 0) {
try {
eventHandler.acceptChannel(serverChannelContext);
} catch (IOException e) {
eventHandler.acceptException(serverChannelContext, e);
}
}
} else {
assert context instanceof SocketChannelContext : "Only sockets channels can receive non-accept events";
SocketChannelContext channelContext = (SocketChannelContext) context;
int ops = selectionKey.readyOps();
if ((ops & SelectionKey.OP_CONNECT) != 0) {
attemptConnect(channelContext, true);
}
if (channelContext.isConnectComplete()) {
if ((ops & SelectionKey.OP_WRITE) != 0) {
handleWrite(channelContext);
}
if ((ops & SelectionKey.OP_READ) != 0) {
handleRead(channelContext);
}
}
eventHandler.postHandling(channelContext);
}
}
/**
* Called immediately prior to a raw {@link Selector#select()} call. Should be used to implement
* channel registration, handling queued writes, and other work that is not specifically processing
* a selection key.
*/
void preSelect() {
setUpNewChannels();
handleQueuedWrites();
}
/**
* Called once as the selector is being closed.
*/
void cleanup() {
WriteOperation op;
while ((op = queuedWrites.poll()) != null) {
executeFailedListener(op.getListener(), new ClosedSelectorException());
}
}
/**
* Queues a write operation to be handled by the event loop. This can be called by any thread and is the
* api available for non-selector threads to schedule writes.
*
* @param writeOperation to be queued
*/
public void queueWrite(WriteOperation writeOperation) {
queuedWrites.offer(writeOperation);
if (isOpen() == false) {
boolean wasRemoved = queuedWrites.remove(writeOperation);
if (wasRemoved) {
writeOperation.getListener().accept(null, new ClosedSelectorException());
}
} else {
wakeup();
}
}
public void queueChannelClose(NioChannel channel) {
ChannelContext<?> context = channel.getContext();
assert context.getSelector() == this : "Must schedule a channel for closure with its selector";
channelsToClose.offer(context);
ensureSelectorOpenForEnqueuing(channelsToClose, context);
wakeup();
}
/**
* Schedules a NioChannel to be registered with this selector. The channel will by queued and
* eventually registered next time through the event loop.
*
* @param channel to register
*/
public void scheduleForRegistration(NioChannel channel) {
ChannelContext<?> context = channel.getContext();
channelsToRegister.add(context);
ensureSelectorOpenForEnqueuing(channelsToRegister, context);
wakeup();
}
/**
* Queues a write operation directly in a channel's buffer. Channel buffers are only safe to be accessed
* by the selector thread. As a result, this method should only be called by the selector thread.
*
* @param writeOperation to be queued in a channel's buffer
*/
public void queueWriteInChannelBuffer(WriteOperation writeOperation) {
assertOnSelectorThread();
SocketChannelContext context = writeOperation.getChannel();
try {
SelectionKeyUtils.setWriteInterested(context.getSelectionKey());
context.queueWriteOperation(writeOperation);
} catch (Exception e) {
executeFailedListener(writeOperation.getListener(), e);
}
}
/**
* Executes a success listener with consistent exception handling. This can only be called from current
* selector thread.
*
* @param listener to be executed
* @param value to provide to listener
*/
public <V> void executeListener(BiConsumer<V, Exception> listener, V value) {
assertOnSelectorThread();
try {
listener.accept(value, null);
} catch (Exception e) {
eventHandler.listenerException(e);
}
}
/**
* Executes a failed listener with consistent exception handling. This can only be called from current
* selector thread.
*
* @param listener to be executed
* @param exception to provide to listener
*/
public <V> void executeFailedListener(BiConsumer<V, Exception> listener, Exception exception) {
assertOnSelectorThread();
try {
listener.accept(null, exception);
} catch (Exception e) {
eventHandler.listenerException(e);
}
}
private void wakeup() {
// TODO: Do we need the wakeup optimizations that some other libraries use?
selector.wakeup();
}
private void handleWrite(SocketChannelContext context) {
try {
eventHandler.handleWrite(context);
} catch (Exception e) {
eventHandler.writeException(context, e);
}
}
private void handleRead(SocketChannelContext context) {
try {
eventHandler.handleRead(context);
} catch (Exception e) {
eventHandler.readException(context, e);
}
}
private void attemptConnect(SocketChannelContext context, boolean connectEvent) {
try {
eventHandler.handleConnect(context);
if (connectEvent && context.isConnectComplete() == false) {
eventHandler.connectException(context, new IOException("Received OP_CONNECT but connect failed"));
}
} catch (Exception e) {
eventHandler.connectException(context, e);
}
}
private void setUpNewChannels() {
ChannelContext<?> newChannel;
while ((newChannel = this.channelsToRegister.poll()) != null) {
assert newChannel.getSelector() == this : "The channel must be registered with the selector with which it was created";
try {
if (newChannel.isOpen()) {
eventHandler.handleRegistration(newChannel);
if (newChannel instanceof SocketChannelContext) {
attemptConnect((SocketChannelContext) newChannel, false);
}
} else {
eventHandler.registrationException(newChannel, new ClosedChannelException());
}
} catch (Exception e) {
eventHandler.registrationException(newChannel, e);
}
}
}
private void closePendingChannels() {
ChannelContext<?> channelContext;
while ((channelContext = channelsToClose.poll()) != null) {
eventHandler.handleClose(channelContext);
}
}
private void handleQueuedWrites() {
WriteOperation writeOperation;
while ((writeOperation = queuedWrites.poll()) != null) {
if (writeOperation.getChannel().isOpen()) {
queueWriteInChannelBuffer(writeOperation);
} else {
executeFailedListener(writeOperation.getListener(), new ClosedChannelException());
}
}
}
/**
* This is a convenience method to be called after some object (normally channels) are enqueued with this
* selector. This method will check if the selector is still open. If it is open, normal operation can
* proceed.
*
* If the selector is closed, then we attempt to remove the object from the queue. If the removal
* succeeds then we throw an {@link IllegalStateException} indicating that normal operation failed. If
* the object cannot be removed from the queue, then the object has already been handled by the selector
* and operation can proceed normally.
*
* If this method is called from the selector thread, we will not allow the queuing to occur as the
* selector thread can manipulate its queues internally even if it is no longer open.
*
* @param queue the queue to which the object was added
* @param objectAdded the objected added
* @param <O> the object type
*/
private <O> void ensureSelectorOpenForEnqueuing(ConcurrentLinkedQueue<O> queue, O objectAdded) {
if (isOpen() == false && isOnCurrentThread() == false) {
if (queue.remove(objectAdded)) {
throw new IllegalStateException("selector is already closed");
}
}
}
}

View File

@ -19,21 +19,39 @@
package org.elasticsearch.nio;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
public class RoundRobinSupplier<S> implements Supplier<S> {
final class RoundRobinSupplier<S> implements Supplier<S> {
private final S[] selectors;
private final int count;
private final AtomicBoolean selectorsSet = new AtomicBoolean(false);
private volatile S[] selectors;
private AtomicInteger counter = new AtomicInteger(0);
RoundRobinSupplier() {
this.selectors = null;
}
RoundRobinSupplier(S[] selectors) {
this.count = selectors.length;
this.selectors = selectors;
this.selectorsSet.set(true);
}
public S get() {
return selectors[counter.getAndIncrement() % count];
S[] selectors = this.selectors;
return selectors[counter.getAndIncrement() % selectors.length];
}
void setSelectors(S[] selectors) {
if (selectorsSet.compareAndSet(false, true)) {
this.selectors = selectors;
} else {
throw new AssertionError("Selectors already set. Should only be set once.");
}
}
int count() {
return selectors.length;
}
}

View File

@ -28,12 +28,12 @@ import java.util.function.Supplier;
public class ServerChannelContext extends ChannelContext<ServerSocketChannel> {
private final NioServerSocketChannel channel;
private final AcceptingSelector selector;
private final NioSelector selector;
private final Consumer<NioSocketChannel> acceptor;
private final AtomicBoolean isClosing = new AtomicBoolean(false);
private final ChannelFactory<?, ?> channelFactory;
public ServerChannelContext(NioServerSocketChannel channel, ChannelFactory<?, ?> channelFactory, AcceptingSelector selector,
public ServerChannelContext(NioServerSocketChannel channel, ChannelFactory<?, ?> channelFactory, NioSelector selector,
Consumer<NioSocketChannel> acceptor, Consumer<Exception> exceptionHandler) {
super(channel.getRawChannel(), exceptionHandler);
this.channel = channel;
@ -42,7 +42,7 @@ public class ServerChannelContext extends ChannelContext<ServerSocketChannel> {
this.acceptor = acceptor;
}
public void acceptChannels(Supplier<SocketSelector> selectorSupplier) throws IOException {
public void acceptChannels(Supplier<NioSelector> selectorSupplier) throws IOException {
NioSocketChannel acceptedChannel;
while ((acceptedChannel = channelFactory.acceptNioChannel(this, selectorSupplier)) != null) {
acceptor.accept(acceptedChannel);
@ -57,7 +57,7 @@ public class ServerChannelContext extends ChannelContext<ServerSocketChannel> {
}
@Override
public AcceptingSelector getSelector() {
public NioSelector getSelector() {
return selector;
}

View File

@ -47,14 +47,14 @@ public abstract class SocketChannelContext extends ChannelContext<SocketChannel>
protected final InboundChannelBuffer channelBuffer;
protected final AtomicBoolean isClosing = new AtomicBoolean(false);
private final ReadWriteHandler readWriteHandler;
private final SocketSelector selector;
private final NioSelector selector;
private final CompletableContext<Void> connectContext = new CompletableContext<>();
private final LinkedList<FlushOperation> pendingFlushes = new LinkedList<>();
private boolean ioException;
private boolean peerClosed;
private Exception connectException;
protected SocketChannelContext(NioSocketChannel channel, SocketSelector selector, Consumer<Exception> exceptionHandler,
protected SocketChannelContext(NioSocketChannel channel, NioSelector selector, Consumer<Exception> exceptionHandler,
ReadWriteHandler readWriteHandler, InboundChannelBuffer channelBuffer) {
super(channel.getRawChannel(), exceptionHandler);
this.selector = selector;
@ -64,7 +64,7 @@ public abstract class SocketChannelContext extends ChannelContext<SocketChannel>
}
@Override
public SocketSelector getSelector() {
public NioSelector getSelector() {
return selector;
}
@ -129,7 +129,7 @@ public abstract class SocketChannelContext extends ChannelContext<SocketChannel>
WriteOperation writeOperation = readWriteHandler.createWriteOperation(this, message, listener);
SocketSelector selector = getSelector();
NioSelector selector = getSelector();
if (selector.isOnCurrentThread() == false) {
selector.queueWrite(writeOperation);
return;

View File

@ -1,150 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.nio;
import java.io.IOException;
import java.nio.channels.SelectionKey;
import java.util.function.Consumer;
/**
* Event handler designed to handle events from non-server sockets
*/
public class SocketEventHandler extends EventHandler {
public SocketEventHandler(Consumer<Exception> exceptionHandler) {
super(exceptionHandler);
}
/**
* This method is called when a NioSocketChannel is successfully registered. It should only be called
* once per channel.
*
* @param context that was registered
*/
protected void handleRegistration(SocketChannelContext context) throws IOException {
context.register();
SelectionKey selectionKey = context.getSelectionKey();
selectionKey.attach(context);
if (context.readyForFlush()) {
SelectionKeyUtils.setConnectReadAndWriteInterested(selectionKey);
} else {
SelectionKeyUtils.setConnectAndReadInterested(selectionKey);
}
}
/**
* This method is called when an attempt to register a channel throws an exception.
*
* @param context that was registered
* @param exception that occurred
*/
protected void registrationException(SocketChannelContext context, Exception exception) {
context.handleException(exception);
}
/**
* This method is called when a NioSocketChannel has just been accepted or if it has receive an
* OP_CONNECT event.
*
* @param context that was registered
*/
protected void handleConnect(SocketChannelContext context) throws IOException {
if (context.connect()) {
SelectionKeyUtils.removeConnectInterested(context.getSelectionKey());
}
}
/**
* This method is called when an attempt to connect a channel throws an exception.
*
* @param context that was connecting
* @param exception that occurred
*/
protected void connectException(SocketChannelContext context, Exception exception) {
context.handleException(exception);
}
/**
* This method is called when a channel signals it is ready for be read. All of the read logic should
* occur in this call.
*
* @param context that can be read
*/
protected void handleRead(SocketChannelContext context) throws IOException {
context.read();
}
/**
* This method is called when an attempt to read from a channel throws an exception.
*
* @param context that was being read
* @param exception that occurred
*/
protected void readException(SocketChannelContext context, Exception exception) {
context.handleException(exception);
}
/**
* This method is called when a channel signals it is ready to receive writes. All of the write logic
* should occur in this call.
*
* @param context that can be written to
*/
protected void handleWrite(SocketChannelContext context) throws IOException {
context.flushChannel();
}
/**
* This method is called when an attempt to write to a channel throws an exception.
*
* @param context that was being written to
* @param exception that occurred
*/
protected void writeException(SocketChannelContext context, Exception exception) {
context.handleException(exception);
}
/**
* This method is called when a listener attached to a channel operation throws an exception.
*
* @param exception that occurred
*/
protected void listenerException(Exception exception) {
exceptionHandler.accept(exception);
}
/**
* @param context that was handled
*/
protected void postHandling(SocketChannelContext context) {
if (context.selectorShouldClose()) {
handleClose(context);
} else {
SelectionKey selectionKey = context.getSelectionKey();
boolean currentlyWriteInterested = SelectionKeyUtils.isWriteInterested(selectionKey);
boolean pendingWrites = context.readyForFlush();
if (currentlyWriteInterested == false && pendingWrites) {
SelectionKeyUtils.setWriteInterested(selectionKey);
} else if (currentlyWriteInterested && pendingWrites == false) {
SelectionKeyUtils.removeWriteInterested(selectionKey);
}
}
}
}

View File

@ -1,224 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.nio;
import java.io.IOException;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.ClosedSelectorException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.function.BiConsumer;
/**
* Selector implementation that handles {@link NioSocketChannel}. It's main piece of functionality is
* handling connect, read, and write events.
*/
public class SocketSelector extends ESSelector {
private final ConcurrentLinkedQueue<SocketChannelContext> newChannels = new ConcurrentLinkedQueue<>();
private final ConcurrentLinkedQueue<WriteOperation> queuedWrites = new ConcurrentLinkedQueue<>();
private final SocketEventHandler eventHandler;
public SocketSelector(SocketEventHandler eventHandler) throws IOException {
super(eventHandler);
this.eventHandler = eventHandler;
}
public SocketSelector(SocketEventHandler eventHandler, Selector selector) throws IOException {
super(eventHandler, selector);
this.eventHandler = eventHandler;
}
@Override
void processKey(SelectionKey selectionKey) {
SocketChannelContext channelContext = (SocketChannelContext) selectionKey.attachment();
int ops = selectionKey.readyOps();
if ((ops & SelectionKey.OP_CONNECT) != 0) {
attemptConnect(channelContext, true);
}
if (channelContext.isConnectComplete()) {
if ((ops & SelectionKey.OP_WRITE) != 0) {
handleWrite(channelContext);
}
if ((ops & SelectionKey.OP_READ) != 0) {
handleRead(channelContext);
}
}
eventHandler.postHandling(channelContext);
}
@Override
void preSelect() {
setUpNewChannels();
handleQueuedWrites();
}
@Override
void cleanup() {
WriteOperation op;
while ((op = queuedWrites.poll()) != null) {
executeFailedListener(op.getListener(), new ClosedSelectorException());
}
channelsToClose.addAll(newChannels);
}
/**
* Schedules a NioSocketChannel to be registered by this selector. The channel will by queued and eventually
* registered next time through the event loop.
* @param nioSocketChannel the channel to register
*/
public void scheduleForRegistration(NioSocketChannel nioSocketChannel) {
SocketChannelContext channelContext = nioSocketChannel.getContext();
newChannels.offer(channelContext);
ensureSelectorOpenForEnqueuing(newChannels, channelContext);
wakeup();
}
/**
* Queues a write operation to be handled by the event loop. This can be called by any thread and is the
* api available for non-selector threads to schedule writes.
*
* @param writeOperation to be queued
*/
public void queueWrite(WriteOperation writeOperation) {
queuedWrites.offer(writeOperation);
if (isOpen() == false) {
boolean wasRemoved = queuedWrites.remove(writeOperation);
if (wasRemoved) {
writeOperation.getListener().accept(null, new ClosedSelectorException());
}
} else {
wakeup();
}
}
/**
* Queues a write operation directly in a channel's buffer. Channel buffers are only safe to be accessed
* by the selector thread. As a result, this method should only be called by the selector thread.
*
* @param writeOperation to be queued in a channel's buffer
*/
public void queueWriteInChannelBuffer(WriteOperation writeOperation) {
assertOnSelectorThread();
SocketChannelContext context = writeOperation.getChannel();
try {
SelectionKeyUtils.setWriteInterested(context.getSelectionKey());
context.queueWriteOperation(writeOperation);
} catch (Exception e) {
executeFailedListener(writeOperation.getListener(), e);
}
}
/**
* Executes a success listener with consistent exception handling. This can only be called from current
* selector thread.
*
* @param listener to be executed
* @param value to provide to listener
*/
public <V> void executeListener(BiConsumer<V, Exception> listener, V value) {
assertOnSelectorThread();
try {
listener.accept(value, null);
} catch (Exception e) {
eventHandler.listenerException(e);
}
}
/**
* Executes a failed listener with consistent exception handling. This can only be called from current
* selector thread.
*
* @param listener to be executed
* @param exception to provide to listener
*/
public <V> void executeFailedListener(BiConsumer<V, Exception> listener, Exception exception) {
assertOnSelectorThread();
try {
listener.accept(null, exception);
} catch (Exception e) {
eventHandler.listenerException(e);
}
}
private void handleWrite(SocketChannelContext context) {
try {
eventHandler.handleWrite(context);
} catch (Exception e) {
eventHandler.writeException(context, e);
}
}
private void handleRead(SocketChannelContext context) {
try {
eventHandler.handleRead(context);
} catch (Exception e) {
eventHandler.readException(context, e);
}
}
private void handleQueuedWrites() {
WriteOperation writeOperation;
while ((writeOperation = queuedWrites.poll()) != null) {
if (writeOperation.getChannel().isOpen()) {
queueWriteInChannelBuffer(writeOperation);
} else {
executeFailedListener(writeOperation.getListener(), new ClosedChannelException());
}
}
}
private void setUpNewChannels() {
SocketChannelContext channelContext;
while ((channelContext = this.newChannels.poll()) != null) {
setupChannel(channelContext);
}
}
private void setupChannel(SocketChannelContext context) {
assert context.getSelector() == this : "The channel must be registered with the selector with which it was created";
try {
if (context.isOpen()) {
eventHandler.handleRegistration(context);
attemptConnect(context, false);
} else {
eventHandler.registrationException(context, new ClosedChannelException());
}
} catch (Exception e) {
eventHandler.registrationException(context, e);
}
}
private void attemptConnect(SocketChannelContext context, boolean connectEvent) {
try {
eventHandler.handleConnect(context);
if (connectEvent && context.isConnectComplete() == false) {
eventHandler.connectException(context, new IOException("Received OP_CONNECT but connect failed"));
}
} catch (Exception e) {
eventHandler.connectException(context, e);
}
}
}

View File

@ -1,129 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.nio;
import org.elasticsearch.test.ESTestCase;
import org.junit.Before;
import java.io.IOException;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.security.PrivilegedActionException;
import java.util.Collections;
import java.util.HashSet;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.same;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
public class AcceptingSelectorTests extends ESTestCase {
private AcceptingSelector selector;
private NioServerSocketChannel serverChannel;
private AcceptorEventHandler eventHandler;
private TestSelectionKey selectionKey;
private Selector rawSelector;
private ServerChannelContext context;
@Before
public void setUp() throws Exception {
super.setUp();
eventHandler = mock(AcceptorEventHandler.class);
serverChannel = mock(NioServerSocketChannel.class);
rawSelector = mock(Selector.class);
selector = new AcceptingSelector(eventHandler, rawSelector);
this.selector.setThread();
context = mock(ServerChannelContext.class);
selectionKey = new TestSelectionKey(0);
selectionKey.attach(context);
when(context.getSelectionKey()).thenReturn(selectionKey);
when(context.getSelector()).thenReturn(selector);
when(context.isOpen()).thenReturn(true);
when(serverChannel.getContext()).thenReturn(context);
}
public void testRegisteredChannel() throws IOException {
selector.scheduleForRegistration(serverChannel);
selector.preSelect();
verify(eventHandler).handleRegistration(context);
}
public void testClosedChannelWillNotBeRegistered() {
when(context.isOpen()).thenReturn(false);
selector.scheduleForRegistration(serverChannel);
selector.preSelect();
verify(eventHandler).registrationException(same(context), any(ClosedChannelException.class));
}
public void testRegisterChannelFailsDueToException() throws Exception {
selector.scheduleForRegistration(serverChannel);
ClosedChannelException closedChannelException = new ClosedChannelException();
doThrow(closedChannelException).when(eventHandler).handleRegistration(context);
selector.preSelect();
verify(eventHandler).registrationException(context, closedChannelException);
}
public void testAcceptEvent() throws IOException {
selectionKey.setReadyOps(SelectionKey.OP_ACCEPT);
selector.processKey(selectionKey);
verify(eventHandler).acceptChannel(context);
}
public void testAcceptException() throws IOException {
selectionKey.setReadyOps(SelectionKey.OP_ACCEPT);
IOException ioException = new IOException();
doThrow(ioException).when(eventHandler).acceptChannel(context);
selector.processKey(selectionKey);
verify(eventHandler).acceptException(context, ioException);
}
public void testCleanup() throws IOException {
selector.scheduleForRegistration(serverChannel);
selector.preSelect();
TestSelectionKey key = new TestSelectionKey(0);
key.attach(context);
when(rawSelector.keys()).thenReturn(new HashSet<>(Collections.singletonList(key)));
selector.cleanupAndCloseChannels();
verify(eventHandler).handleClose(context);
}
}

View File

@ -1,123 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.nio;
import org.elasticsearch.test.ESTestCase;
import org.junit.Before;
import java.io.IOException;
import java.nio.channels.SelectionKey;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import java.util.function.Consumer;
import static org.mockito.Matchers.same;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
public class AcceptorEventHandlerTests extends ESTestCase {
private AcceptorEventHandler handler;
private ChannelFactory<NioServerSocketChannel, NioSocketChannel> channelFactory;
private NioServerSocketChannel channel;
private DoNotRegisterContext context;
private RoundRobinSupplier<SocketSelector> selectorSupplier;
@Before
@SuppressWarnings("unchecked")
public void setUpHandler() throws IOException {
channelFactory = mock(ChannelFactory.class);
ArrayList<SocketSelector> selectors = new ArrayList<>();
selectors.add(mock(SocketSelector.class));
selectorSupplier = new RoundRobinSupplier<>(selectors.toArray(new SocketSelector[selectors.size()]));
handler = new AcceptorEventHandler(selectorSupplier, mock(Consumer.class));
channel = new NioServerSocketChannel(mock(ServerSocketChannel.class));
context = new DoNotRegisterContext(channel, mock(AcceptingSelector.class), mock(Consumer.class));
channel.setContext(context);
}
public void testHandleRegisterSetsOP_ACCEPTInterest() throws IOException {
assertNull(context.getSelectionKey());
handler.handleRegistration(context);
assertEquals(SelectionKey.OP_ACCEPT, channel.getContext().getSelectionKey().interestOps());
}
public void testRegisterAddsAttachment() throws IOException {
assertNull(context.getSelectionKey());
handler.handleRegistration(context);
assertEquals(context, context.getSelectionKey().attachment());
}
public void testHandleAcceptCallsChannelFactory() throws IOException {
NioSocketChannel childChannel = new NioSocketChannel(mock(SocketChannel.class));
NioSocketChannel nullChannel = null;
when(channelFactory.acceptNioChannel(same(context), same(selectorSupplier))).thenReturn(childChannel, nullChannel);
handler.acceptChannel(context);
verify(channelFactory, times(2)).acceptNioChannel(same(context), same(selectorSupplier));
}
@SuppressWarnings("unchecked")
public void testHandleAcceptCallsServerAcceptCallback() throws IOException {
NioSocketChannel childChannel = new NioSocketChannel(mock(SocketChannel.class));
SocketChannelContext childContext = mock(SocketChannelContext.class);
childChannel.setContext(childContext);
ServerChannelContext serverChannelContext = mock(ServerChannelContext.class);
channel = new NioServerSocketChannel(mock(ServerSocketChannel.class));
channel.setContext(serverChannelContext);
when(serverChannelContext.getChannel()).thenReturn(channel);
when(channelFactory.acceptNioChannel(same(context), same(selectorSupplier))).thenReturn(childChannel);
handler.acceptChannel(serverChannelContext);
verify(serverChannelContext).acceptChannels(selectorSupplier);
}
public void testAcceptExceptionCallsExceptionHandler() throws IOException {
ServerChannelContext serverChannelContext = mock(ServerChannelContext.class);
IOException exception = new IOException();
handler.acceptException(serverChannelContext, exception);
verify(serverChannelContext).handleException(exception);
}
private class DoNotRegisterContext extends ServerChannelContext {
@SuppressWarnings("unchecked")
DoNotRegisterContext(NioServerSocketChannel channel, AcceptingSelector selector, Consumer<NioSocketChannel> acceptor) {
super(channel, channelFactory, selector, acceptor, mock(Consumer.class));
}
@Override
public void register() {
setSelectionKey(new TestSelectionKey(0));
}
}
}

View File

@ -44,7 +44,7 @@ public class BytesChannelContextTests extends ESTestCase {
private SocketChannel rawChannel;
private BytesChannelContext context;
private InboundChannelBuffer channelBuffer;
private SocketSelector selector;
private NioSelector selector;
private BiConsumer<Void, Exception> listener;
private int messageLength;
@ -54,7 +54,7 @@ public class BytesChannelContextTests extends ESTestCase {
readConsumer = mock(CheckedFunction.class);
messageLength = randomInt(96) + 20;
selector = mock(SocketSelector.class);
selector = mock(NioSelector.class);
listener = mock(BiConsumer.class);
channel = mock(NioSocketChannel.class);
rawChannel = mock(SocketChannel.class);

View File

@ -115,7 +115,7 @@ public class ChannelContextTests extends ESTestCase {
}
@Override
public ESSelector getSelector() {
public NioSelector getSelector() {
throw new UnsupportedOperationException("not implemented");
}

View File

@ -43,18 +43,18 @@ public class ChannelFactoryTests extends ESTestCase {
private ChannelFactory.RawChannelFactory rawChannelFactory;
private SocketChannel rawChannel;
private ServerSocketChannel rawServerChannel;
private SocketSelector socketSelector;
private Supplier<SocketSelector> socketSelectorSupplier;
private Supplier<AcceptingSelector> acceptingSelectorSupplier;
private AcceptingSelector acceptingSelector;
private NioSelector socketSelector;
private Supplier<NioSelector> socketSelectorSupplier;
private Supplier<NioSelector> acceptingSelectorSupplier;
private NioSelector acceptingSelector;
@Before
@SuppressWarnings("unchecked")
public void setupFactory() throws IOException {
rawChannelFactory = mock(ChannelFactory.RawChannelFactory.class);
channelFactory = new TestChannelFactory(rawChannelFactory);
socketSelector = mock(SocketSelector.class);
acceptingSelector = mock(AcceptingSelector.class);
socketSelector = mock(NioSelector.class);
acceptingSelector = mock(NioSelector.class);
socketSelectorSupplier = mock(Supplier.class);
acceptingSelectorSupplier = mock(Supplier.class);
rawChannel = SocketChannel.open();
@ -139,14 +139,14 @@ public class ChannelFactoryTests extends ESTestCase {
@SuppressWarnings("unchecked")
@Override
public NioSocketChannel createChannel(SocketSelector selector, SocketChannel channel) throws IOException {
public NioSocketChannel createChannel(NioSelector selector, SocketChannel channel) throws IOException {
NioSocketChannel nioSocketChannel = new NioSocketChannel(channel);
nioSocketChannel.setContext(mock(SocketChannelContext.class));
return nioSocketChannel;
}
@Override
public NioServerSocketChannel createServerChannel(AcceptingSelector selector, ServerSocketChannel channel) throws IOException {
public NioServerSocketChannel createServerChannel(NioSelector selector, ServerSocketChannel channel) throws IOException {
return new NioServerSocketChannel(channel);
}
}

View File

@ -1,114 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.nio;
import org.elasticsearch.test.ESTestCase;
import org.junit.Before;
import java.io.IOException;
import java.nio.channels.CancelledKeyException;
import java.nio.channels.ClosedSelectorException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import static org.mockito.Matchers.anyInt;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
public class ESSelectorTests extends ESTestCase {
private ESSelector selector;
private EventHandler handler;
private Selector rawSelector;
@Before
public void setUp() throws Exception {
super.setUp();
handler = mock(EventHandler.class);
rawSelector = mock(Selector.class);
selector = new TestSelector(handler, rawSelector);
}
@SuppressWarnings({"unchecked", "rawtypes"})
public void testQueueChannelForClosed() throws IOException {
NioChannel channel = mock(NioChannel.class);
ChannelContext context = mock(ChannelContext.class);
when(channel.getContext()).thenReturn(context);
when(context.getSelector()).thenReturn(selector);
selector.queueChannelClose(channel);
selector.singleLoop();
verify(handler).handleClose(context);
}
public void testSelectorClosedExceptionIsNotCaughtWhileRunning() throws IOException {
boolean closedSelectorExceptionCaught = false;
when(rawSelector.select(anyInt())).thenThrow(new ClosedSelectorException());
try {
this.selector.singleLoop();
} catch (ClosedSelectorException e) {
closedSelectorExceptionCaught = true;
}
assertTrue(closedSelectorExceptionCaught);
}
public void testIOExceptionWhileSelect() throws IOException {
IOException ioException = new IOException();
when(rawSelector.select(anyInt())).thenThrow(ioException);
this.selector.singleLoop();
verify(handler).selectorException(ioException);
}
public void testSelectorClosedIfOpenAndEventLoopNotRunning() throws IOException {
when(rawSelector.isOpen()).thenReturn(true);
selector.close();
verify(rawSelector).close();
}
private static class TestSelector extends ESSelector {
TestSelector(EventHandler eventHandler, Selector selector) throws IOException {
super(eventHandler, selector);
}
@Override
void processKey(SelectionKey selectionKey) throws CancelledKeyException {
}
@Override
void preSelect() {
}
@Override
void cleanup() {
}
}
}

View File

@ -25,25 +25,29 @@ import org.junit.Before;
import java.io.IOException;
import java.nio.channels.CancelledKeyException;
import java.nio.channels.SelectionKey;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import java.util.Collections;
import java.util.function.Consumer;
import static org.mockito.Matchers.same;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
public class SocketEventHandlerTests extends ESTestCase {
public class EventHandlerTests extends ESTestCase {
private Consumer<Exception> channelExceptionHandler;
private Consumer<Exception> genericExceptionHandler;
private ReadWriteHandler readWriteHandler;
private SocketEventHandler handler;
private NioSocketChannel channel;
private SocketChannel rawChannel;
private DoNotRegisterContext context;
private EventHandler handler;
private DoNotRegisterSocketContext context;
private DoNotRegisterServerContext serverContext;
private ChannelFactory<NioServerSocketChannel, NioSocketChannel> channelFactory;
private RoundRobinSupplier<NioSelector> selectorSupplier;
@Before
@SuppressWarnings("unchecked")
@ -51,16 +55,24 @@ public class SocketEventHandlerTests extends ESTestCase {
channelExceptionHandler = mock(Consumer.class);
genericExceptionHandler = mock(Consumer.class);
readWriteHandler = mock(ReadWriteHandler.class);
SocketSelector selector = mock(SocketSelector.class);
handler = new SocketEventHandler(genericExceptionHandler);
rawChannel = mock(SocketChannel.class);
channel = new NioSocketChannel(rawChannel);
when(rawChannel.finishConnect()).thenReturn(true);
channelFactory = mock(ChannelFactory.class);
NioSelector selector = mock(NioSelector.class);
ArrayList<NioSelector> selectors = new ArrayList<>();
selectors.add(selector);
selectorSupplier = new RoundRobinSupplier<>(selectors.toArray(new NioSelector[0]));
handler = new EventHandler(genericExceptionHandler, selectorSupplier);
context = new DoNotRegisterContext(channel, selector, channelExceptionHandler, new TestSelectionKey(0), readWriteHandler);
SocketChannel rawChannel = mock(SocketChannel.class);
when(rawChannel.finishConnect()).thenReturn(true);
NioSocketChannel channel = new NioSocketChannel(rawChannel);
context = new DoNotRegisterSocketContext(channel, selector, channelExceptionHandler, readWriteHandler);
channel.setContext(context);
handler.handleRegistration(context);
NioServerSocketChannel serverChannel = new NioServerSocketChannel(mock(ServerSocketChannel.class));
serverContext = new DoNotRegisterServerContext(serverChannel, mock(NioSelector.class), mock(Consumer.class));
serverChannel.setContext(serverContext);
when(selector.isOnCurrentThread()).thenReturn(true);
}
@ -73,7 +85,7 @@ public class SocketEventHandlerTests extends ESTestCase {
verify(channelContext).register();
}
public void testRegisterAddsOP_CONNECTAndOP_READInterest() throws IOException {
public void testRegisterNonServerAddsOP_CONNECTAndOP_READInterest() throws IOException {
SocketChannelContext context = mock(SocketChannelContext.class);
when(context.getSelectionKey()).thenReturn(new TestSelectionKey(0));
handler.handleRegistration(context);
@ -81,16 +93,55 @@ public class SocketEventHandlerTests extends ESTestCase {
}
public void testRegisterAddsAttachment() throws IOException {
SocketChannelContext context = mock(SocketChannelContext.class);
ChannelContext<?> context = randomBoolean() ? mock(SocketChannelContext.class) : mock(ServerChannelContext.class);
when(context.getSelectionKey()).thenReturn(new TestSelectionKey(0));
handler.handleRegistration(context);
assertEquals(context, context.getSelectionKey().attachment());
}
public void testHandleServerRegisterSetsOP_ACCEPTInterest() throws IOException {
assertNull(serverContext.getSelectionKey());
handler.handleRegistration(serverContext);
assertEquals(SelectionKey.OP_ACCEPT, serverContext.getSelectionKey().interestOps());
}
public void testHandleAcceptCallsChannelFactory() throws IOException {
NioSocketChannel childChannel = new NioSocketChannel(mock(SocketChannel.class));
NioSocketChannel nullChannel = null;
when(channelFactory.acceptNioChannel(same(serverContext), same(selectorSupplier))).thenReturn(childChannel, nullChannel);
handler.acceptChannel(serverContext);
verify(channelFactory, times(2)).acceptNioChannel(same(serverContext), same(selectorSupplier));
}
@SuppressWarnings("unchecked")
public void testHandleAcceptCallsServerAcceptCallback() throws IOException {
NioSocketChannel childChannel = new NioSocketChannel(mock(SocketChannel.class));
SocketChannelContext childContext = mock(SocketChannelContext.class);
childChannel.setContext(childContext);
ServerChannelContext serverChannelContext = mock(ServerChannelContext.class);
when(channelFactory.acceptNioChannel(same(serverContext), same(selectorSupplier))).thenReturn(childChannel);
handler.acceptChannel(serverChannelContext);
verify(serverChannelContext).acceptChannels(selectorSupplier);
}
public void testAcceptExceptionCallsExceptionHandler() throws IOException {
ServerChannelContext serverChannelContext = mock(ServerChannelContext.class);
IOException exception = new IOException();
handler.acceptException(serverChannelContext, exception);
verify(serverChannelContext).handleException(exception);
}
public void testRegisterWithPendingWritesAddsOP_CONNECTAndOP_READAndOP_WRITEInterest() throws IOException {
FlushReadyWrite flushReadyWrite = mock(FlushReadyWrite.class);
when(readWriteHandler.writeToBytes(flushReadyWrite)).thenReturn(Collections.singletonList(flushReadyWrite));
channel.getContext().queueWriteOperation(flushReadyWrite);
context.queueWriteOperation(flushReadyWrite);
handler.handleRegistration(context);
assertEquals(SelectionKey.OP_READ | SelectionKey.OP_CONNECT | SelectionKey.OP_WRITE, context.getSelectionKey().interestOps());
}
@ -120,11 +171,7 @@ public class SocketEventHandlerTests extends ESTestCase {
}
public void testHandleReadDelegatesToContext() throws IOException {
NioSocketChannel channel = new NioSocketChannel(rawChannel);
SocketChannelContext context = mock(SocketChannelContext.class);
channel.setContext(context);
when(context.read()).thenReturn(1);
handler.handleRead(context);
verify(context).read();
}
@ -200,19 +247,31 @@ public class SocketEventHandlerTests extends ESTestCase {
verify(genericExceptionHandler).accept(listenerException);
}
private class DoNotRegisterContext extends BytesChannelContext {
private class DoNotRegisterSocketContext extends BytesChannelContext {
private final TestSelectionKey selectionKey;
DoNotRegisterContext(NioSocketChannel channel, SocketSelector selector, Consumer<Exception> exceptionHandler,
TestSelectionKey selectionKey, ReadWriteHandler handler) {
DoNotRegisterSocketContext(NioSocketChannel channel, NioSelector selector, Consumer<Exception> exceptionHandler,
ReadWriteHandler handler) {
super(channel, selector, exceptionHandler, handler, InboundChannelBuffer.allocatingInstance());
this.selectionKey = selectionKey;
}
@Override
public void register() {
setSelectionKey(selectionKey);
setSelectionKey(new TestSelectionKey(0));
}
}
private class DoNotRegisterServerContext extends ServerChannelContext {
@SuppressWarnings("unchecked")
DoNotRegisterServerContext(NioServerSocketChannel channel, NioSelector selector, Consumer<NioSocketChannel> acceptor) {
super(channel, channelFactory, selector, acceptor, mock(Consumer.class));
}
@Override
public void register() {
setSelectionKey(new TestSelectionKey(0));
}
}
}

View File

@ -38,9 +38,8 @@ public class NioGroupTests extends ESTestCase {
@SuppressWarnings("unchecked")
public void setUp() throws Exception {
super.setUp();
nioGroup = new NioGroup(daemonThreadFactory(Settings.EMPTY, "acceptor"), 1,
(s) -> new AcceptorEventHandler(s, mock(Consumer.class)), daemonThreadFactory(Settings.EMPTY, "selector"), 1,
() -> new SocketEventHandler(mock(Consumer.class)));
nioGroup = new NioGroup(daemonThreadFactory(Settings.EMPTY, "acceptor"), 1, daemonThreadFactory(Settings.EMPTY, "selector"), 1,
(s) -> new EventHandler(mock(Consumer.class), s));
}
@Override
@ -76,8 +75,8 @@ public class NioGroupTests extends ESTestCase {
public void testExceptionAtStartIsHandled() throws IOException {
RuntimeException ex = new RuntimeException();
CheckedRunnable<IOException> ctor = () -> new NioGroup(r -> {throw ex;}, 1,
(s) -> new AcceptorEventHandler(s, mock(Consumer.class)), daemonThreadFactory(Settings.EMPTY, "selector"),
1, () -> new SocketEventHandler(mock(Consumer.class)));
daemonThreadFactory(Settings.EMPTY, "selector"),
1, (s) -> new EventHandler(mock(Consumer.class), s));
RuntimeException runtimeException = expectThrows(RuntimeException.class, ctor::run);
assertSame(ex, runtimeException);
// ctor starts threads. So we are testing that a failure to construct will stop threads. Our thread

View File

@ -43,13 +43,15 @@ import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
public class SocketSelectorTests extends ESTestCase {
public class NioSelectorTests extends ESTestCase {
private SocketSelector socketSelector;
private SocketEventHandler eventHandler;
private NioSelector selector;
private EventHandler eventHandler;
private NioSocketChannel channel;
private NioServerSocketChannel serverChannel;
private TestSelectionKey selectionKey;
private SocketChannelContext channelContext;
private ServerChannelContext serverChannelContext;
private BiConsumer<Void, Exception> listener;
private ByteBuffer[] buffers = {ByteBuffer.allocate(1)};
private Selector rawSelector;
@ -59,75 +61,172 @@ public class SocketSelectorTests extends ESTestCase {
public void setUp() throws Exception {
super.setUp();
rawSelector = mock(Selector.class);
eventHandler = mock(SocketEventHandler.class);
eventHandler = mock(EventHandler.class);
channel = mock(NioSocketChannel.class);
channelContext = mock(SocketChannelContext.class);
serverChannel = mock(NioServerSocketChannel.class);
serverChannelContext = mock(ServerChannelContext.class);
listener = mock(BiConsumer.class);
selectionKey = new TestSelectionKey(0);
selectionKey.attach(channelContext);
this.socketSelector = new SocketSelector(eventHandler, rawSelector);
this.socketSelector.setThread();
this.selector = new NioSelector(eventHandler, rawSelector);
this.selector.setThread();
when(channel.getContext()).thenReturn(channelContext);
when(channelContext.isOpen()).thenReturn(true);
when(channelContext.getSelector()).thenReturn(socketSelector);
when(channelContext.getSelector()).thenReturn(selector);
when(channelContext.getSelectionKey()).thenReturn(selectionKey);
when(channelContext.isConnectComplete()).thenReturn(true);
when(serverChannel.getContext()).thenReturn(serverChannelContext);
when(serverChannelContext.isOpen()).thenReturn(true);
when(serverChannelContext.getSelector()).thenReturn(selector);
when(serverChannelContext.getSelectionKey()).thenReturn(selectionKey);
}
public void testRegisterChannel() throws Exception {
socketSelector.scheduleForRegistration(channel);
@SuppressWarnings({"unchecked", "rawtypes"})
public void testQueueChannelForClosed() throws IOException {
NioChannel channel = mock(NioChannel.class);
ChannelContext context = mock(ChannelContext.class);
when(channel.getContext()).thenReturn(context);
when(context.getSelector()).thenReturn(selector);
socketSelector.preSelect();
selector.queueChannelClose(channel);
verify(eventHandler).handleRegistration(channelContext);
selector.singleLoop();
verify(eventHandler).handleClose(context);
}
public void testClosedChannelWillNotBeRegistered() throws Exception {
public void testSelectorClosedExceptionIsNotCaughtWhileRunning() throws IOException {
boolean closedSelectorExceptionCaught = false;
when(rawSelector.select(anyInt())).thenThrow(new ClosedSelectorException());
try {
this.selector.singleLoop();
} catch (ClosedSelectorException e) {
closedSelectorExceptionCaught = true;
}
assertTrue(closedSelectorExceptionCaught);
}
public void testIOExceptionWhileSelect() throws IOException {
IOException ioException = new IOException();
when(rawSelector.select(anyInt())).thenThrow(ioException);
this.selector.singleLoop();
verify(eventHandler).selectorException(ioException);
}
public void testSelectorClosedIfOpenAndEventLoopNotRunning() throws IOException {
when(rawSelector.isOpen()).thenReturn(true);
selector.close();
verify(rawSelector).close();
}
public void testRegisteredChannel() throws IOException {
selector.scheduleForRegistration(serverChannel);
selector.preSelect();
verify(eventHandler).handleRegistration(serverChannelContext);
}
public void testClosedServerChannelWillNotBeRegistered() {
when(serverChannelContext.isOpen()).thenReturn(false);
selector.scheduleForRegistration(serverChannel);
selector.preSelect();
verify(eventHandler).registrationException(same(serverChannelContext), any(ClosedChannelException.class));
}
public void testRegisterServerChannelFailsDueToException() throws Exception {
selector.scheduleForRegistration(serverChannel);
ClosedChannelException closedChannelException = new ClosedChannelException();
doThrow(closedChannelException).when(eventHandler).handleRegistration(serverChannelContext);
selector.preSelect();
verify(eventHandler).registrationException(serverChannelContext, closedChannelException);
}
public void testClosedSocketChannelWillNotBeRegistered() throws Exception {
when(channelContext.isOpen()).thenReturn(false);
socketSelector.scheduleForRegistration(channel);
selector.scheduleForRegistration(channel);
socketSelector.preSelect();
selector.preSelect();
verify(eventHandler).registrationException(same(channelContext), any(ClosedChannelException.class));
verify(eventHandler, times(0)).handleConnect(channelContext);
}
public void testRegisterChannelFailsDueToException() throws Exception {
socketSelector.scheduleForRegistration(channel);
public void testRegisterSocketChannelFailsDueToException() throws Exception {
selector.scheduleForRegistration(channel);
ClosedChannelException closedChannelException = new ClosedChannelException();
doThrow(closedChannelException).when(eventHandler).handleRegistration(channelContext);
socketSelector.preSelect();
selector.preSelect();
verify(eventHandler).registrationException(channelContext, closedChannelException);
verify(eventHandler, times(0)).handleConnect(channelContext);
}
public void testSuccessfullyRegisterChannelWillAttemptConnect() throws Exception {
socketSelector.scheduleForRegistration(channel);
public void testAcceptEvent() throws IOException {
selectionKey.setReadyOps(SelectionKey.OP_ACCEPT);
socketSelector.preSelect();
selectionKey.attach(serverChannelContext);
selector.processKey(selectionKey);
verify(eventHandler).acceptChannel(serverChannelContext);
}
public void testAcceptException() throws IOException {
selectionKey.setReadyOps(SelectionKey.OP_ACCEPT);
IOException ioException = new IOException();
doThrow(ioException).when(eventHandler).acceptChannel(serverChannelContext);
selectionKey.attach(serverChannelContext);
selector.processKey(selectionKey);
verify(eventHandler).acceptException(serverChannelContext, ioException);
}
public void testRegisterChannel() throws Exception {
selector.scheduleForRegistration(channel);
selector.preSelect();
verify(eventHandler).handleRegistration(channelContext);
}
public void testSuccessfullyRegisterChannelWillAttemptConnect() throws Exception {
selector.scheduleForRegistration(channel);
selector.preSelect();
verify(eventHandler).handleConnect(channelContext);
}
public void testQueueWriteWhenNotRunning() throws Exception {
socketSelector.close();
selector.close();
socketSelector.queueWrite(new FlushReadyWrite(channelContext, buffers, listener));
selector.queueWrite(new FlushReadyWrite(channelContext, buffers, listener));
verify(listener).accept(isNull(Void.class), any(ClosedSelectorException.class));
}
public void testQueueWriteChannelIsClosed() throws Exception {
WriteOperation writeOperation = new FlushReadyWrite(channelContext, buffers, listener);
socketSelector.queueWrite(writeOperation);
selector.queueWrite(writeOperation);
when(channelContext.isOpen()).thenReturn(false);
socketSelector.preSelect();
selector.preSelect();
verify(channelContext, times(0)).queueWriteOperation(writeOperation);
verify(listener).accept(isNull(Void.class), any(ClosedChannelException.class));
@ -138,11 +237,11 @@ public class SocketSelectorTests extends ESTestCase {
WriteOperation writeOperation = new FlushReadyWrite(channelContext, buffers, listener);
CancelledKeyException cancelledKeyException = new CancelledKeyException();
socketSelector.queueWrite(writeOperation);
selector.queueWrite(writeOperation);
when(channelContext.getSelectionKey()).thenReturn(selectionKey);
when(selectionKey.interestOps(anyInt())).thenThrow(cancelledKeyException);
socketSelector.preSelect();
selector.preSelect();
verify(channelContext, times(0)).queueWriteOperation(writeOperation);
verify(listener).accept(null, cancelledKeyException);
@ -150,11 +249,11 @@ public class SocketSelectorTests extends ESTestCase {
public void testQueueWriteSuccessful() throws Exception {
WriteOperation writeOperation = new FlushReadyWrite(channelContext, buffers, listener);
socketSelector.queueWrite(writeOperation);
selector.queueWrite(writeOperation);
assertTrue((selectionKey.interestOps() & SelectionKey.OP_WRITE) == 0);
socketSelector.preSelect();
selector.preSelect();
verify(channelContext).queueWriteOperation(writeOperation);
assertTrue((selectionKey.interestOps() & SelectionKey.OP_WRITE) != 0);
@ -165,7 +264,7 @@ public class SocketSelectorTests extends ESTestCase {
assertTrue((selectionKey.interestOps() & SelectionKey.OP_WRITE) == 0);
socketSelector.queueWriteInChannelBuffer(writeOperation);
selector.queueWriteInChannelBuffer(writeOperation);
verify(channelContext).queueWriteOperation(writeOperation);
assertTrue((selectionKey.interestOps() & SelectionKey.OP_WRITE) != 0);
@ -179,7 +278,7 @@ public class SocketSelectorTests extends ESTestCase {
when(channelContext.getSelectionKey()).thenReturn(selectionKey);
when(selectionKey.interestOps(anyInt())).thenThrow(cancelledKeyException);
socketSelector.queueWriteInChannelBuffer(writeOperation);
selector.queueWriteInChannelBuffer(writeOperation);
verify(channelContext, times(0)).queueWriteOperation(writeOperation);
verify(listener).accept(null, cancelledKeyException);
@ -188,7 +287,8 @@ public class SocketSelectorTests extends ESTestCase {
public void testConnectEvent() throws Exception {
selectionKey.setReadyOps(SelectionKey.OP_CONNECT);
socketSelector.processKey(selectionKey);
selectionKey.attach(channelContext);
selector.processKey(selectionKey);
verify(eventHandler).handleConnect(channelContext);
}
@ -199,7 +299,8 @@ public class SocketSelectorTests extends ESTestCase {
selectionKey.setReadyOps(SelectionKey.OP_CONNECT);
doThrow(ioException).when(eventHandler).handleConnect(channelContext);
socketSelector.processKey(selectionKey);
selectionKey.attach(channelContext);
selector.processKey(selectionKey);
verify(eventHandler).connectException(channelContext, ioException);
}
@ -212,7 +313,8 @@ public class SocketSelectorTests extends ESTestCase {
doThrow(ioException).when(eventHandler).handleWrite(channelContext);
when(channelContext.isConnectComplete()).thenReturn(false);
socketSelector.processKey(selectionKey);
selectionKey.attach(channelContext);
selector.processKey(selectionKey);
verify(eventHandler, times(0)).handleWrite(channelContext);
verify(eventHandler, times(0)).handleRead(channelContext);
@ -221,7 +323,8 @@ public class SocketSelectorTests extends ESTestCase {
public void testSuccessfulWriteEvent() throws Exception {
selectionKey.setReadyOps(SelectionKey.OP_WRITE);
socketSelector.processKey(selectionKey);
selectionKey.attach(channelContext);
selector.processKey(selectionKey);
verify(eventHandler).handleWrite(channelContext);
}
@ -229,11 +332,13 @@ public class SocketSelectorTests extends ESTestCase {
public void testWriteEventWithException() throws Exception {
IOException ioException = new IOException();
selectionKey.attach(channelContext);
selectionKey.setReadyOps(SelectionKey.OP_WRITE);
doThrow(ioException).when(eventHandler).handleWrite(channelContext);
socketSelector.processKey(selectionKey);
selectionKey.attach(channelContext);
selector.processKey(selectionKey);
verify(eventHandler).writeException(channelContext, ioException);
}
@ -241,7 +346,8 @@ public class SocketSelectorTests extends ESTestCase {
public void testSuccessfulReadEvent() throws Exception {
selectionKey.setReadyOps(SelectionKey.OP_READ);
socketSelector.processKey(selectionKey);
selectionKey.attach(channelContext);
selector.processKey(selectionKey);
verify(eventHandler).handleRead(channelContext);
}
@ -253,7 +359,8 @@ public class SocketSelectorTests extends ESTestCase {
doThrow(ioException).when(eventHandler).handleRead(channelContext);
socketSelector.processKey(selectionKey);
selectionKey.attach(channelContext);
selector.processKey(selectionKey);
verify(eventHandler).readException(channelContext, ioException);
}
@ -261,7 +368,8 @@ public class SocketSelectorTests extends ESTestCase {
public void testWillCallPostHandleAfterChannelHandling() throws Exception {
selectionKey.setReadyOps(SelectionKey.OP_WRITE | SelectionKey.OP_READ);
socketSelector.processKey(selectionKey);
selectionKey.attach(channelContext);
selector.processKey(selectionKey);
verify(eventHandler).handleWrite(channelContext);
verify(eventHandler).handleRead(channelContext);
@ -273,18 +381,18 @@ public class SocketSelectorTests extends ESTestCase {
SocketChannelContext unregisteredContext = mock(SocketChannelContext.class);
when(unregisteredChannel.getContext()).thenReturn(unregisteredContext);
socketSelector.scheduleForRegistration(channel);
selector.scheduleForRegistration(channel);
socketSelector.preSelect();
selector.preSelect();
socketSelector.queueWrite(new FlushReadyWrite(channelContext, buffers, listener));
socketSelector.scheduleForRegistration(unregisteredChannel);
selector.queueWrite(new FlushReadyWrite(channelContext, buffers, listener));
selector.scheduleForRegistration(unregisteredChannel);
TestSelectionKey testSelectionKey = new TestSelectionKey(0);
testSelectionKey.attach(channelContext);
when(rawSelector.keys()).thenReturn(new HashSet<>(Collections.singletonList(testSelectionKey)));
socketSelector.cleanupAndCloseChannels();
selector.cleanupAndCloseChannels();
verify(listener).accept(isNull(Void.class), any(ClosedSelectorException.class));
verify(eventHandler).handleClose(channelContext);
@ -295,7 +403,7 @@ public class SocketSelectorTests extends ESTestCase {
RuntimeException exception = new RuntimeException();
doThrow(exception).when(listener).accept(null, null);
socketSelector.executeListener(listener, null);
selector.executeListener(listener, null);
verify(eventHandler).listenerException(exception);
}
@ -305,7 +413,7 @@ public class SocketSelectorTests extends ESTestCase {
RuntimeException exception = new RuntimeException();
doThrow(exception).when(listener).accept(null, ioException);
socketSelector.executeFailedListener(listener, ioException);
selector.executeFailedListener(listener, ioException);
verify(eventHandler).listenerException(exception);
}

View File

@ -51,7 +51,7 @@ public class SocketChannelContextTests extends ESTestCase {
private Consumer<Exception> exceptionHandler;
private NioSocketChannel channel;
private BiConsumer<Void, Exception> listener;
private SocketSelector selector;
private NioSelector selector;
private ReadWriteHandler readWriteHandler;
@SuppressWarnings("unchecked")
@ -64,7 +64,7 @@ public class SocketChannelContextTests extends ESTestCase {
listener = mock(BiConsumer.class);
when(channel.getRawChannel()).thenReturn(rawChannel);
exceptionHandler = mock(Consumer.class);
selector = mock(SocketSelector.class);
selector = mock(NioSelector.class);
readWriteHandler = mock(ReadWriteHandler.class);
InboundChannelBuffer channelBuffer = InboundChannelBuffer.allocatingInstance();
context = new TestSocketChannelContext(channel, selector, exceptionHandler, readWriteHandler, channelBuffer);
@ -275,7 +275,7 @@ public class SocketChannelContextTests extends ESTestCase {
private static class TestSocketChannelContext extends SocketChannelContext {
private TestSocketChannelContext(NioSocketChannel channel, SocketSelector selector, Consumer<Exception> exceptionHandler,
private TestSocketChannelContext(NioSocketChannel channel, NioSelector selector, Consumer<Exception> exceptionHandler,
ReadWriteHandler readWriteHandler, InboundChannelBuffer channelBuffer) {
super(channel, selector, exceptionHandler, readWriteHandler, channelBuffer);
}

View File

@ -47,10 +47,9 @@ import org.elasticsearch.http.HttpServerTransport;
import org.elasticsearch.http.HttpStats;
import org.elasticsearch.http.nio.cors.NioCorsConfig;
import org.elasticsearch.http.nio.cors.NioCorsConfigBuilder;
import org.elasticsearch.nio.AcceptingSelector;
import org.elasticsearch.nio.AcceptorEventHandler;
import org.elasticsearch.nio.BytesChannelContext;
import org.elasticsearch.nio.ChannelFactory;
import org.elasticsearch.nio.EventHandler;
import org.elasticsearch.nio.InboundChannelBuffer;
import org.elasticsearch.nio.NioChannel;
import org.elasticsearch.nio.NioGroup;
@ -58,8 +57,7 @@ import org.elasticsearch.nio.NioServerSocketChannel;
import org.elasticsearch.nio.NioSocketChannel;
import org.elasticsearch.nio.ServerChannelContext;
import org.elasticsearch.nio.SocketChannelContext;
import org.elasticsearch.nio.SocketEventHandler;
import org.elasticsearch.nio.SocketSelector;
import org.elasticsearch.nio.NioSelector;
import org.elasticsearch.rest.RestUtils;
import org.elasticsearch.threadpool.ThreadPool;
@ -180,9 +178,8 @@ public class NioHttpServerTransport extends AbstractHttpServerTransport {
int acceptorCount = NIO_HTTP_ACCEPTOR_COUNT.get(settings);
int workerCount = NIO_HTTP_WORKER_COUNT.get(settings);
nioGroup = new NioGroup(daemonThreadFactory(this.settings, TRANSPORT_ACCEPTOR_THREAD_NAME_PREFIX), acceptorCount,
(s) -> new AcceptorEventHandler(s, this::nonChannelExceptionCaught),
daemonThreadFactory(this.settings, TRANSPORT_WORKER_THREAD_NAME_PREFIX), workerCount,
() -> new SocketEventHandler(this::nonChannelExceptionCaught));
(s) -> new EventHandler(this::nonChannelExceptionCaught, s));
channelFactory = new HttpChannelFactory();
this.boundAddress = createBoundHttpAddress();
@ -360,7 +357,7 @@ public class NioHttpServerTransport extends AbstractHttpServerTransport {
}
@Override
public NioSocketChannel createChannel(SocketSelector selector, SocketChannel channel) throws IOException {
public NioSocketChannel createChannel(NioSelector selector, SocketChannel channel) throws IOException {
NioSocketChannel nioChannel = new NioSocketChannel(channel);
HttpReadWriteHandler httpReadWritePipeline = new HttpReadWriteHandler(nioChannel,NioHttpServerTransport.this,
httpHandlingSettings, xContentRegistry, corsConfig, threadPool.getThreadContext());
@ -372,7 +369,7 @@ public class NioHttpServerTransport extends AbstractHttpServerTransport {
}
@Override
public NioServerSocketChannel createServerChannel(AcceptingSelector selector, ServerSocketChannel channel) throws IOException {
public NioServerSocketChannel createServerChannel(NioSelector selector, ServerSocketChannel channel) throws IOException {
NioServerSocketChannel nioChannel = new NioServerSocketChannel(channel);
Consumer<Exception> exceptionHandler = (e) -> logger.error(() ->
new ParameterizedMessage("exception from server channel caught on transport layer [{}]", channel), e);

View File

@ -31,16 +31,14 @@ import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.PageCacheRecycler;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.indices.breaker.CircuitBreakerService;
import org.elasticsearch.nio.AcceptingSelector;
import org.elasticsearch.nio.AcceptorEventHandler;
import org.elasticsearch.nio.BytesChannelContext;
import org.elasticsearch.nio.ChannelFactory;
import org.elasticsearch.nio.EventHandler;
import org.elasticsearch.nio.InboundChannelBuffer;
import org.elasticsearch.nio.NioGroup;
import org.elasticsearch.nio.NioSelector;
import org.elasticsearch.nio.NioSocketChannel;
import org.elasticsearch.nio.ServerChannelContext;
import org.elasticsearch.nio.SocketEventHandler;
import org.elasticsearch.nio.SocketSelector;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TcpChannel;
import org.elasticsearch.transport.TcpTransport;
@ -55,23 +53,18 @@ import java.util.concurrent.ConcurrentMap;
import java.util.function.Consumer;
import java.util.function.Supplier;
import static org.elasticsearch.common.settings.Setting.intSetting;
import static org.elasticsearch.common.util.concurrent.ConcurrentCollections.newConcurrentMap;
import static org.elasticsearch.common.util.concurrent.EsExecutors.daemonThreadFactory;
public class NioTransport extends TcpTransport {
private static final String TRANSPORT_WORKER_THREAD_NAME_PREFIX = Transports.NIO_TRANSPORT_WORKER_THREAD_NAME_PREFIX;
private static final String TRANSPORT_ACCEPTOR_THREAD_NAME_PREFIX = Transports.NIO_TRANSPORT_ACCEPTOR_THREAD_NAME_PREFIX;
public static final Setting<Integer> NIO_WORKER_COUNT =
new Setting<>("transport.nio.worker_count",
(s) -> Integer.toString(EsExecutors.numberOfProcessors(s) * 2),
(s) -> Setting.parseInt(s, 1, "transport.nio.worker_count"), Setting.Property.NodeScope);
public static final Setting<Integer> NIO_ACCEPTOR_COUNT =
intSetting("transport.nio.acceptor_count", 1, 1, Setting.Property.NodeScope);
protected final PageCacheRecycler pageCacheRecycler;
private final ConcurrentMap<String, TcpChannelFactory> profileToChannelFactory = newConcurrentMap();
private volatile NioGroup nioGroup;
@ -101,20 +94,13 @@ public class NioTransport extends TcpTransport {
protected void doStart() {
boolean success = false;
try {
int acceptorCount = 0;
boolean useNetworkServer = NetworkService.NETWORK_SERVER.get(settings);
if (useNetworkServer) {
acceptorCount = NioTransport.NIO_ACCEPTOR_COUNT.get(settings);
}
nioGroup = new NioGroup(daemonThreadFactory(this.settings, TRANSPORT_ACCEPTOR_THREAD_NAME_PREFIX), acceptorCount,
(s) -> new AcceptorEventHandler(s, this::onNonChannelException),
daemonThreadFactory(this.settings, TRANSPORT_WORKER_THREAD_NAME_PREFIX), NioTransport.NIO_WORKER_COUNT.get(settings),
() -> new SocketEventHandler(this::onNonChannelException));
nioGroup = new NioGroup(daemonThreadFactory(this.settings, TRANSPORT_WORKER_THREAD_NAME_PREFIX),
NioTransport.NIO_WORKER_COUNT.get(settings), (s) -> new EventHandler(this::onNonChannelException, s));
ProfileSettings clientProfileSettings = new ProfileSettings(settings, "default");
clientChannelFactory = channelFactory(clientProfileSettings, true);
if (useNetworkServer) {
if (NetworkService.NETWORK_SERVER.get(settings)) {
// loop through all profiles and start them up, special handling for default one
for (ProfileSettings profileSettings : profileSettings) {
String profileName = profileSettings.profileName;
@ -178,7 +164,7 @@ public class NioTransport extends TcpTransport {
}
@Override
public TcpNioSocketChannel createChannel(SocketSelector selector, SocketChannel channel) throws IOException {
public TcpNioSocketChannel createChannel(NioSelector selector, SocketChannel channel) throws IOException {
TcpNioSocketChannel nioChannel = new TcpNioSocketChannel(profileName, channel);
Supplier<InboundChannelBuffer.Page> pageSupplier = () -> {
Recycler.V<byte[]> bytes = pageCacheRecycler.bytePage(false);
@ -193,7 +179,7 @@ public class NioTransport extends TcpTransport {
}
@Override
public TcpNioServerSocketChannel createServerChannel(AcceptingSelector selector, ServerSocketChannel channel) throws IOException {
public TcpNioServerSocketChannel createServerChannel(NioSelector selector, ServerSocketChannel channel) throws IOException {
TcpNioServerSocketChannel nioChannel = new TcpNioServerSocketChannel(profileName, channel);
Consumer<Exception> exceptionHandler = (e) -> logger.error(() ->
new ParameterizedMessage("exception from server channel caught on transport layer [{}]", channel), e);

View File

@ -50,8 +50,7 @@ public class NioTransportPlugin extends Plugin implements NetworkPlugin {
return Arrays.asList(
NioHttpServerTransport.NIO_HTTP_ACCEPTOR_COUNT,
NioHttpServerTransport.NIO_HTTP_WORKER_COUNT,
NioTransport.NIO_WORKER_COUNT,
NioTransport.NIO_ACCEPTOR_COUNT
NioTransport.NIO_WORKER_COUNT
);
}

View File

@ -21,8 +21,6 @@ package org.elasticsearch.transport.nio;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.nio.AcceptingSelector;
import org.elasticsearch.nio.ChannelFactory;
import org.elasticsearch.nio.NioServerSocketChannel;
import org.elasticsearch.transport.TcpChannel;

View File

@ -22,7 +22,6 @@ package org.elasticsearch.transport.nio;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.nio.NioSocketChannel;
import org.elasticsearch.nio.SocketSelector;
import org.elasticsearch.transport.TcpChannel;
import java.io.IOException;

View File

@ -30,7 +30,6 @@ public enum Transports {
public static final String TEST_MOCK_TRANSPORT_THREAD_PREFIX = "__mock_network_thread";
public static final String NIO_TRANSPORT_WORKER_THREAD_NAME_PREFIX = "es_nio_transport_worker";
public static final String NIO_TRANSPORT_ACCEPTOR_THREAD_NAME_PREFIX = "es_nio_transport_acceptor";
/**
* Utility method to detect whether a thread is a network thread. Typically
@ -44,8 +43,7 @@ public enum Transports {
TcpTransport.TRANSPORT_SERVER_WORKER_THREAD_NAME_PREFIX,
TcpTransport.TRANSPORT_CLIENT_BOSS_THREAD_NAME_PREFIX,
TEST_MOCK_TRANSPORT_THREAD_PREFIX,
NIO_TRANSPORT_WORKER_THREAD_NAME_PREFIX,
NIO_TRANSPORT_ACCEPTOR_THREAD_NAME_PREFIX)) {
NIO_TRANSPORT_WORKER_THREAD_NAME_PREFIX)) {
if (threadName.contains(s)) {
return true;
}

View File

@ -30,17 +30,15 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.PageCacheRecycler;
import org.elasticsearch.indices.breaker.CircuitBreakerService;
import org.elasticsearch.nio.AcceptingSelector;
import org.elasticsearch.nio.AcceptorEventHandler;
import org.elasticsearch.nio.BytesChannelContext;
import org.elasticsearch.nio.BytesWriteHandler;
import org.elasticsearch.nio.ChannelFactory;
import org.elasticsearch.nio.InboundChannelBuffer;
import org.elasticsearch.nio.NioGroup;
import org.elasticsearch.nio.NioSelector;
import org.elasticsearch.nio.NioServerSocketChannel;
import org.elasticsearch.nio.NioSocketChannel;
import org.elasticsearch.nio.ServerChannelContext;
import org.elasticsearch.nio.SocketSelector;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TcpChannel;
import org.elasticsearch.transport.TcpTransport;
@ -62,7 +60,6 @@ import static org.elasticsearch.common.util.concurrent.EsExecutors.daemonThreadF
public class MockNioTransport extends TcpTransport {
private static final String TRANSPORT_WORKER_THREAD_NAME_PREFIX = Transports.NIO_TRANSPORT_WORKER_THREAD_NAME_PREFIX;
private static final String TRANSPORT_ACCEPTOR_THREAD_NAME_PREFIX = Transports.NIO_TRANSPORT_ACCEPTOR_THREAD_NAME_PREFIX;
private final PageCacheRecycler pageCacheRecycler;
private final ConcurrentMap<String, MockTcpChannelFactory> profileToChannelFactory = newConcurrentMap();
@ -93,20 +90,13 @@ public class MockNioTransport extends TcpTransport {
protected void doStart() {
boolean success = false;
try {
int acceptorCount = 0;
boolean useNetworkServer = NetworkService.NETWORK_SERVER.get(settings);
if (useNetworkServer) {
acceptorCount = 1;
}
nioGroup = new NioGroup(daemonThreadFactory(this.settings, TRANSPORT_ACCEPTOR_THREAD_NAME_PREFIX), acceptorCount,
(s) -> new AcceptorEventHandler(s, this::onNonChannelException),
daemonThreadFactory(this.settings, TRANSPORT_WORKER_THREAD_NAME_PREFIX), 2,
() -> new TestingSocketEventHandler(this::onNonChannelException));
nioGroup = new NioGroup(daemonThreadFactory(this.settings, TRANSPORT_WORKER_THREAD_NAME_PREFIX), 2,
(s) -> new TestingSocketEventHandler(this::onNonChannelException, s));
ProfileSettings clientProfileSettings = new ProfileSettings(settings, "default");
clientChannelFactory = new MockTcpChannelFactory(clientProfileSettings, "client");
if (useNetworkServer) {
if (NetworkService.NETWORK_SERVER.get(settings)) {
// loop through all profiles and start them up, special handling for default one
for (ProfileSettings profileSettings : profileSettings) {
String profileName = profileSettings.profileName;
@ -159,7 +149,7 @@ public class MockNioTransport extends TcpTransport {
}
@Override
public MockSocketChannel createChannel(SocketSelector selector, SocketChannel channel) throws IOException {
public MockSocketChannel createChannel(NioSelector selector, SocketChannel channel) throws IOException {
MockSocketChannel nioChannel = new MockSocketChannel(profileName, channel, selector);
Supplier<InboundChannelBuffer.Page> pageSupplier = () -> {
Recycler.V<byte[]> bytes = pageCacheRecycler.bytePage(false);
@ -173,7 +163,7 @@ public class MockNioTransport extends TcpTransport {
}
@Override
public MockServerChannel createServerChannel(AcceptingSelector selector, ServerSocketChannel channel) throws IOException {
public MockServerChannel createServerChannel(NioSelector selector, ServerSocketChannel channel) throws IOException {
MockServerChannel nioServerChannel = new MockServerChannel(profileName, channel, this, selector);
Consumer<Exception> exceptionHandler = (e) -> logger.error(() ->
new ParameterizedMessage("exception from server channel caught on transport layer [{}]", channel), e);
@ -205,7 +195,7 @@ public class MockNioTransport extends TcpTransport {
private final String profile;
MockServerChannel(String profile, ServerSocketChannel channel, ChannelFactory<?, ?> channelFactory, AcceptingSelector selector)
MockServerChannel(String profile, ServerSocketChannel channel, ChannelFactory<?, ?> channelFactory, NioSelector selector)
throws IOException {
super(channel);
this.profile = profile;
@ -246,7 +236,7 @@ public class MockNioTransport extends TcpTransport {
private final String profile;
private MockSocketChannel(String profile, java.nio.channels.SocketChannel socketChannel, SocketSelector selector)
private MockSocketChannel(String profile, java.nio.channels.SocketChannel socketChannel, NioSelector selector)
throws IOException {
super(socketChannel);
this.profile = profile;

View File

@ -19,21 +19,23 @@
package org.elasticsearch.transport.nio;
import org.elasticsearch.nio.EventHandler;
import org.elasticsearch.nio.NioSelector;
import org.elasticsearch.nio.SocketChannelContext;
import org.elasticsearch.nio.SocketEventHandler;
import java.io.IOException;
import java.util.Collections;
import java.util.Set;
import java.util.WeakHashMap;
import java.util.function.Consumer;
import java.util.function.Supplier;
public class TestingSocketEventHandler extends SocketEventHandler {
public class TestingSocketEventHandler extends EventHandler {
private Set<SocketChannelContext> hasConnectedMap = Collections.newSetFromMap(new WeakHashMap<>());
public TestingSocketEventHandler(Consumer<Exception> exceptionHandler) {
super(exceptionHandler);
public TestingSocketEventHandler(Consumer<Exception> exceptionHandler, Supplier<NioSelector> selectorSupplier) {
super(exceptionHandler, selectorSupplier);
}
public void handleConnect(SocketChannelContext context) throws IOException {

View File

@ -11,7 +11,7 @@ import org.elasticsearch.nio.InboundChannelBuffer;
import org.elasticsearch.nio.NioSocketChannel;
import org.elasticsearch.nio.ReadWriteHandler;
import org.elasticsearch.nio.SocketChannelContext;
import org.elasticsearch.nio.SocketSelector;
import org.elasticsearch.nio.NioSelector;
import org.elasticsearch.nio.WriteOperation;
import java.io.IOException;
@ -28,7 +28,7 @@ public final class SSLChannelContext extends SocketChannelContext {
private final SSLDriver sslDriver;
SSLChannelContext(NioSocketChannel channel, SocketSelector selector, Consumer<Exception> exceptionHandler, SSLDriver sslDriver,
SSLChannelContext(NioSocketChannel channel, NioSelector selector, Consumer<Exception> exceptionHandler, SSLDriver sslDriver,
ReadWriteHandler readWriteHandler, InboundChannelBuffer channelBuffer) {
super(channel, selector, exceptionHandler, readWriteHandler, channelBuffer);
this.sslDriver = sslDriver;
@ -140,7 +140,7 @@ public final class SSLChannelContext extends SocketChannelContext {
public void closeChannel() {
if (isClosing.compareAndSet(false, true)) {
WriteOperation writeOperation = new CloseNotifyOperation(this);
SocketSelector selector = getSelector();
NioSelector selector = getSelector();
if (selector.isOnCurrentThread() == false) {
selector.queueWrite(writeOperation);
return;

View File

@ -13,11 +13,10 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.PageCacheRecycler;
import org.elasticsearch.indices.breaker.CircuitBreakerService;
import org.elasticsearch.nio.AcceptingSelector;
import org.elasticsearch.nio.InboundChannelBuffer;
import org.elasticsearch.nio.NioSocketChannel;
import org.elasticsearch.nio.ServerChannelContext;
import org.elasticsearch.nio.SocketSelector;
import org.elasticsearch.nio.NioSelector;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TcpTransport;
import org.elasticsearch.transport.nio.NioTransport;
@ -117,7 +116,7 @@ public class SecurityNioTransport extends NioTransport {
}
@Override
public TcpNioSocketChannel createChannel(SocketSelector selector, SocketChannel channel) throws IOException {
public TcpNioSocketChannel createChannel(NioSelector selector, SocketChannel channel) throws IOException {
SSLConfiguration defaultConfig = profileConfiguration.get(TcpTransport.DEFAULT_PROFILE);
SSLEngine sslEngine = sslService.createSSLEngine(profileConfiguration.getOrDefault(profileName, defaultConfig), null, -1);
SSLDriver sslDriver = new SSLDriver(sslEngine, isClient);
@ -136,7 +135,7 @@ public class SecurityNioTransport extends NioTransport {
}
@Override
public TcpNioServerSocketChannel createServerChannel(AcceptingSelector selector, ServerSocketChannel channel) throws IOException {
public TcpNioServerSocketChannel createServerChannel(NioSelector selector, ServerSocketChannel channel) throws IOException {
TcpNioServerSocketChannel nioChannel = new TcpNioServerSocketChannel(profileName, channel);
Consumer<Exception> exceptionHandler = (e) -> logger.error(() ->
new ParameterizedMessage("exception from server channel caught on transport layer [{}]", channel), e);

View File

@ -11,7 +11,7 @@ import org.elasticsearch.nio.BytesWriteHandler;
import org.elasticsearch.nio.FlushReadyWrite;
import org.elasticsearch.nio.InboundChannelBuffer;
import org.elasticsearch.nio.NioSocketChannel;
import org.elasticsearch.nio.SocketSelector;
import org.elasticsearch.nio.NioSelector;
import org.elasticsearch.nio.WriteOperation;
import org.elasticsearch.test.ESTestCase;
import org.junit.Before;
@ -40,7 +40,7 @@ public class SSLChannelContextTests extends ESTestCase {
private SocketChannel rawChannel;
private SSLChannelContext context;
private InboundChannelBuffer channelBuffer;
private SocketSelector selector;
private NioSelector selector;
private BiConsumer<Void, Exception> listener;
private Consumer exceptionHandler;
private SSLDriver sslDriver;
@ -55,7 +55,7 @@ public class SSLChannelContextTests extends ESTestCase {
TestReadWriteHandler readWriteHandler = new TestReadWriteHandler(readConsumer);
messageLength = randomInt(96) + 20;
selector = mock(SocketSelector.class);
selector = mock(NioSelector.class);
listener = mock(BiConsumer.class);
channel = mock(NioSocketChannel.class);
rawChannel = mock(SocketChannel.class);