Move raw selector usage into ESSelector (#26825)

Currently we only log generic messages about errors in logs from the
nio event handler. This means that we do not know which channel had
issues connection, reading, writing, etc.

This commit changes the logs to include the local and remote addresses
and profile for a channel.
This commit is contained in:
Tim Brooks 2017-10-01 17:59:57 -06:00 committed by GitHub
parent 5869a7482b
commit 9ae7a80ba5
9 changed files with 128 additions and 164 deletions

View File

@ -51,21 +51,22 @@ public class AcceptingSelector extends ESSelector {
}
@Override
void doSelect(int timeout) throws IOException, ClosedSelectorException {
setUpNewServerChannels();
int ready = selector.select(timeout);
if (ready > 0) {
Set<SelectionKey> selectionKeys = selector.selectedKeys();
Iterator<SelectionKey> keyIterator = selectionKeys.iterator();
while (keyIterator.hasNext()) {
SelectionKey sk = keyIterator.next();
keyIterator.remove();
acceptChannel(sk);
void processKey(SelectionKey selectionKey) {
NioServerSocketChannel serverChannel = (NioServerSocketChannel) selectionKey.attachment();
if (selectionKey.isAcceptable()) {
try {
eventHandler.acceptChannel(serverChannel);
} catch (IOException e) {
eventHandler.acceptException(serverChannel, e);
}
}
}
@Override
void preSelect() {
setUpNewServerChannels();
}
@Override
void cleanup() {
channelsToClose.addAll(newChannels);
@ -74,6 +75,7 @@ public class AcceptingSelector extends ESSelector {
/**
* Schedules a NioServerSocketChannel to be registered with this selector. The channel will by queued and
* eventually registered next time through the event loop.
*
* @param serverSocketChannel the channel to register
*/
public void scheduleForRegistration(NioServerSocketChannel serverSocketChannel) {
@ -82,7 +84,7 @@ public class AcceptingSelector extends ESSelector {
wakeup();
}
private void setUpNewServerChannels() throws ClosedChannelException {
private void setUpNewServerChannels() {
NioServerSocketChannel newChannel;
while ((newChannel = this.newChannels.poll()) != null) {
assert newChannel.getSelector() == this : "The channel must be registered with the selector with which it was created";
@ -101,23 +103,4 @@ public class AcceptingSelector extends ESSelector {
}
}
}
private void acceptChannel(SelectionKey sk) {
NioServerSocketChannel serverChannel = (NioServerSocketChannel) sk.attachment();
if (sk.isValid()) {
try {
if (sk.isAcceptable()) {
try {
eventHandler.acceptChannel(serverChannel);
} catch (IOException e) {
eventHandler.acceptException(serverChannel, e);
}
}
} catch (CancelledKeyException ex) {
eventHandler.genericServerChannelException(serverChannel, ex);
}
} else {
eventHandler.genericServerChannelException(serverChannel, new CancelledKeyException());
}
}
}

View File

@ -87,16 +87,4 @@ public class AcceptorEventHandler extends EventHandler {
logger.debug(() -> new ParameterizedMessage("exception while accepting new channel from server channel: {}",
nioServerChannel), exception);
}
/**
* This method is called when handling an event from a channel fails due to an unexpected exception.
* An example would be if checking ready ops on a {@link java.nio.channels.SelectionKey} threw
* {@link java.nio.channels.CancelledKeyException}.
*
* @param channel that caused the exception
* @param exception that was thrown
*/
void genericServerChannelException(NioServerSocketChannel channel, Exception exception) {
logger.debug(() -> new ParameterizedMessage("exception while handling event for server channel: {}", channel), exception);
}
}

View File

@ -24,9 +24,12 @@ import org.elasticsearch.transport.nio.channel.NioChannel;
import java.io.Closeable;
import java.io.IOException;
import java.nio.channels.CancelledKeyException;
import java.nio.channels.ClosedSelectorException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.util.Collections;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
@ -40,8 +43,8 @@ import java.util.concurrent.locks.ReentrantLock;
* {@link #close()} is called. This instance handles closing of channels. Users should call
* {@link #queueChannelClose(NioChannel)} to schedule a channel for close by this selector.
* <p>
* Children of this class should implement the specific {@link #doSelect(int)} and {@link #cleanup()}
* functionality.
* Children of this class should implement the specific {@link #processKey(SelectionKey)},
* {@link #preSelect()}, and {@link #cleanup()} functionality.
*/
public abstract class ESSelector implements Closeable {
@ -98,7 +101,26 @@ public abstract class ESSelector implements Closeable {
void singleLoop() {
try {
closePendingChannels();
doSelect(300);
preSelect();
int ready = selector.select(300);
if (ready > 0) {
Set<SelectionKey> selectionKeys = selector.selectedKeys();
Iterator<SelectionKey> keyIterator = selectionKeys.iterator();
while (keyIterator.hasNext()) {
SelectionKey sk = keyIterator.next();
keyIterator.remove();
if (sk.isValid()) {
try {
processKey(sk);
} catch (CancelledKeyException cke) {
eventHandler.genericChannelException((NioChannel) sk.attachment(), cke);
}
} else {
eventHandler.genericChannelException((NioChannel) sk.attachment(), new CancelledKeyException());
}
}
}
} catch (ClosedSelectorException e) {
if (isOpen()) {
throw e;
@ -117,13 +139,19 @@ public abstract class ESSelector implements Closeable {
}
/**
* Should implement the specific select logic. This will be called once per {@link #singleLoop()}
* Called by the base {@link ESSelector} class when there is a {@link SelectionKey} to be handled.
*
* @param timeout to pass to the raw select operation
* @throws IOException thrown by the raw select operation
* @throws ClosedSelectorException thrown if the raw selector is closed
* @param selectionKey the key to be handled
* @throws CancelledKeyException thrown when the key has already been cancelled
*/
abstract void doSelect(int timeout) throws IOException, ClosedSelectorException;
abstract void processKey(SelectionKey selectionKey) throws CancelledKeyException;
/**
* Called immediately prior to a raw {@link Selector#select()} call. Should be used to implement
* channel registration, handling queued writes, and other work that is not specifically processing
* a selection key.
*/
abstract void preSelect();
/**
* Called once as the selector is being closed.

View File

@ -88,4 +88,16 @@ public abstract class EventHandler {
void closeException(NioChannel channel, Exception exception) {
logger.debug(() -> new ParameterizedMessage("exception while closing channel: {}", channel), exception);
}
/**
* This method is called when handling an event from a channel fails due to an unexpected exception.
* An example would be if checking ready ops on a {@link java.nio.channels.SelectionKey} threw
* {@link java.nio.channels.CancelledKeyException}.
*
* @param channel that caused the exception
* @param exception that was thrown
*/
void genericChannelException(NioChannel channel, Exception exception) {
logger.debug(() -> new ParameterizedMessage("exception while handling event for channel: {}", channel), exception);
}
}

View File

@ -21,6 +21,7 @@ package org.elasticsearch.transport.nio;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.transport.nio.channel.NioChannel;
import org.elasticsearch.transport.nio.channel.NioSocketChannel;
import org.elasticsearch.transport.nio.channel.SelectionKeyUtils;
import org.elasticsearch.transport.nio.channel.WriteContext;
@ -82,7 +83,6 @@ public class SocketEventHandler extends EventHandler {
void connectException(NioSocketChannel channel, Exception exception) {
logger.debug(() -> new ParameterizedMessage("failed to connect to socket channel: {}", channel), exception);
exceptionCaught(channel, exception);
}
/**
@ -144,9 +144,9 @@ public class SocketEventHandler extends EventHandler {
* @param channel that caused the exception
* @param exception that was thrown
*/
void genericChannelException(NioSocketChannel channel, Exception exception) {
logger.debug(() -> new ParameterizedMessage("exception while handling event for socket channel: {}", channel), exception);
exceptionCaught(channel, exception);
void genericChannelException(NioChannel channel, Exception exception) {
super.genericChannelException(channel, exception);
exceptionCaught((NioSocketChannel) channel, exception);
}
private void exceptionCaught(NioSocketChannel channel, Exception e) {

View File

@ -24,13 +24,10 @@ import org.elasticsearch.transport.nio.channel.SelectionKeyUtils;
import org.elasticsearch.transport.nio.channel.WriteContext;
import java.io.IOException;
import java.nio.channels.CancelledKeyException;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.ClosedSelectorException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
/**
@ -54,15 +51,28 @@ public class SocketSelector extends ESSelector {
}
@Override
void doSelect(int timeout) throws IOException, ClosedSelectorException {
void processKey(SelectionKey selectionKey) {
NioSocketChannel nioSocketChannel = (NioSocketChannel) selectionKey.attachment();
int ops = selectionKey.readyOps();
if ((ops & SelectionKey.OP_CONNECT) != 0) {
attemptConnect(nioSocketChannel, true);
}
if (nioSocketChannel.isConnectComplete()) {
if ((ops & SelectionKey.OP_WRITE) != 0) {
handleWrite(nioSocketChannel);
}
if ((ops & SelectionKey.OP_READ) != 0) {
handleRead(nioSocketChannel);
}
}
}
@Override
void preSelect() {
setUpNewChannels();
handleQueuedWrites();
int ready = selector.select(timeout);
if (ready > 0) {
Set<SelectionKey> selectionKeys = selector.selectedKeys();
processKeys(selectionKeys);
}
}
@Override
@ -122,38 +132,6 @@ public class SocketSelector extends ESSelector {
}
}
private void processKeys(Set<SelectionKey> selectionKeys) {
Iterator<SelectionKey> keyIterator = selectionKeys.iterator();
while (keyIterator.hasNext()) {
SelectionKey sk = keyIterator.next();
keyIterator.remove();
NioSocketChannel nioSocketChannel = (NioSocketChannel) sk.attachment();
if (sk.isValid()) {
try {
int ops = sk.readyOps();
if ((ops & SelectionKey.OP_CONNECT) != 0) {
attemptConnect(nioSocketChannel, true);
}
if (nioSocketChannel.isConnectComplete()) {
if ((ops & SelectionKey.OP_WRITE) != 0) {
handleWrite(nioSocketChannel);
}
if ((ops & SelectionKey.OP_READ) != 0) {
handleRead(nioSocketChannel);
}
}
} catch (CancelledKeyException e) {
eventHandler.genericChannelException(nioSocketChannel, e);
}
} else {
eventHandler.genericChannelException(nioSocketChannel, new CancelledKeyException());
}
}
}
private void handleWrite(NioSocketChannel nioSocketChannel) {
try {
eventHandler.handleWrite(nioSocketChannel);

View File

@ -46,7 +46,6 @@ public class AcceptingSelectorTests extends ESTestCase {
private NioServerSocketChannel serverChannel;
private AcceptorEventHandler eventHandler;
private TestSelectionKey selectionKey;
private HashSet<SelectionKey> keySet = new HashSet<>();
@Before
public void setUp() throws Exception {
@ -64,14 +63,12 @@ public class AcceptingSelectorTests extends ESTestCase {
when(serverChannel.getSelectionKey()).thenReturn(selectionKey);
when(serverChannel.getSelector()).thenReturn(selector);
when(serverChannel.isOpen()).thenReturn(true);
when(rawSelector.selectedKeys()).thenReturn(keySet);
when(rawSelector.select(0)).thenReturn(1);
}
public void testRegisteredChannel() throws IOException, PrivilegedActionException {
selector.scheduleForRegistration(serverChannel);
selector.doSelect(0);
selector.preSelect();
verify(eventHandler).serverChannelRegistered(serverChannel);
Set<NioChannel> registeredChannels = selector.getRegisteredChannels();
@ -83,7 +80,7 @@ public class AcceptingSelectorTests extends ESTestCase {
when(serverChannel.isOpen()).thenReturn(false);
selector.scheduleForRegistration(serverChannel);
selector.doSelect(0);
selector.preSelect();
verify(eventHandler).registrationException(same(serverChannel), any(ClosedChannelException.class));
@ -98,7 +95,7 @@ public class AcceptingSelectorTests extends ESTestCase {
ClosedChannelException closedChannelException = new ClosedChannelException();
doThrow(closedChannelException).when(serverChannel).register();
selector.doSelect(0);
selector.preSelect();
verify(eventHandler).registrationException(serverChannel, closedChannelException);
@ -109,21 +106,19 @@ public class AcceptingSelectorTests extends ESTestCase {
public void testAcceptEvent() throws IOException {
selectionKey.setReadyOps(SelectionKey.OP_ACCEPT);
keySet.add(selectionKey);
selector.doSelect(0);
selector.processKey(selectionKey);
verify(eventHandler).acceptChannel(serverChannel);
}
public void testAcceptException() throws IOException {
selectionKey.setReadyOps(SelectionKey.OP_ACCEPT);
keySet.add(selectionKey);
IOException ioException = new IOException();
doThrow(ioException).when(eventHandler).acceptChannel(serverChannel);
selector.doSelect(0);
selector.processKey(selectionKey);
verify(eventHandler).acceptException(serverChannel, ioException);
}
@ -131,7 +126,7 @@ public class AcceptingSelectorTests extends ESTestCase {
public void testCleanup() throws IOException {
selector.scheduleForRegistration(serverChannel);
selector.doSelect(0);
selector.preSelect();
assertEquals(1, selector.getRegisteredChannels().size());

View File

@ -24,8 +24,12 @@ import org.elasticsearch.transport.nio.channel.NioChannel;
import org.junit.Before;
import java.io.IOException;
import java.nio.channels.CancelledKeyException;
import java.nio.channels.ClosedSelectorException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import static org.mockito.Matchers.anyInt;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@ -34,12 +38,14 @@ public class ESSelectorTests extends ESTestCase {
private ESSelector selector;
private EventHandler handler;
private Selector rawSelector;
@Before
public void setUp() throws Exception {
super.setUp();
handler = mock(EventHandler.class);
selector = new TestSelector(handler);
rawSelector = mock(Selector.class);
selector = new TestSelector(handler, rawSelector);
}
public void testQueueChannelForClosed() throws IOException {
@ -61,9 +67,8 @@ public class ESSelectorTests extends ESTestCase {
}
public void testSelectorClosedExceptionIsNotCaughtWhileRunning() throws IOException {
((TestSelector) this.selector).setClosedSelectorException(new ClosedSelectorException());
boolean closedSelectorExceptionCaught = false;
when(rawSelector.select(anyInt())).thenThrow(new ClosedSelectorException());
try {
this.selector.singleLoop();
} catch (ClosedSelectorException e) {
@ -75,7 +80,8 @@ public class ESSelectorTests extends ESTestCase {
public void testIOExceptionWhileSelect() throws IOException {
IOException ioException = new IOException();
((TestSelector) this.selector).setIOException(ioException);
when(rawSelector.select(anyInt())).thenThrow(ioException);
this.selector.singleLoop();
@ -84,35 +90,24 @@ public class ESSelectorTests extends ESTestCase {
private static class TestSelector extends ESSelector {
private ClosedSelectorException closedSelectorException;
private IOException ioException;
protected TestSelector(EventHandler eventHandler) throws IOException {
super(eventHandler);
TestSelector(EventHandler eventHandler, Selector selector) throws IOException {
super(eventHandler, selector);
}
@Override
void doSelect(int timeout) throws IOException, ClosedSelectorException {
if (closedSelectorException != null) {
throw closedSelectorException;
}
if (ioException != null) {
throw ioException;
}
void processKey(SelectionKey selectionKey) throws CancelledKeyException {
}
@Override
void preSelect() {
}
@Override
void cleanup() {
}
public void setClosedSelectorException(ClosedSelectorException exception) {
this.closedSelectorException = exception;
}
public void setIOException(IOException ioException) {
this.ioException = ioException;
}
}
}

View File

@ -53,7 +53,6 @@ public class SocketSelectorTests extends ESTestCase {
private NioSocketChannel channel;
private TestSelectionKey selectionKey;
private WriteContext writeContext;
private HashSet<SelectionKey> keySet = new HashSet<>();
private ActionListener<NioChannel> listener;
private NetworkBytesReference bufferReference = NetworkBytesReference.wrap(new BytesArray(new byte[1]));
@ -72,8 +71,6 @@ public class SocketSelectorTests extends ESTestCase {
this.socketSelector = new SocketSelector(eventHandler, rawSelector);
this.socketSelector.setThread();
when(rawSelector.selectedKeys()).thenReturn(keySet);
when(rawSelector.select(0)).thenReturn(1);
when(channel.isOpen()).thenReturn(true);
when(channel.getSelectionKey()).thenReturn(selectionKey);
when(channel.getWriteContext()).thenReturn(writeContext);
@ -84,7 +81,7 @@ public class SocketSelectorTests extends ESTestCase {
public void testRegisterChannel() throws Exception {
socketSelector.scheduleForRegistration(channel);
socketSelector.doSelect(0);
socketSelector.preSelect();
verify(eventHandler).handleRegistration(channel);
@ -97,7 +94,7 @@ public class SocketSelectorTests extends ESTestCase {
when(channel.isOpen()).thenReturn(false);
socketSelector.scheduleForRegistration(channel);
socketSelector.doSelect(0);
socketSelector.preSelect();
verify(eventHandler).registrationException(same(channel), any(ClosedChannelException.class));
verify(channel, times(0)).finishConnect();
@ -113,7 +110,7 @@ public class SocketSelectorTests extends ESTestCase {
ClosedChannelException closedChannelException = new ClosedChannelException();
doThrow(closedChannelException).when(channel).register();
socketSelector.doSelect(0);
socketSelector.preSelect();
verify(eventHandler).registrationException(channel, closedChannelException);
verify(channel, times(0)).finishConnect();
@ -128,7 +125,7 @@ public class SocketSelectorTests extends ESTestCase {
when(channel.finishConnect()).thenReturn(true);
socketSelector.doSelect(0);
socketSelector.preSelect();
verify(eventHandler).handleConnect(channel);
}
@ -138,7 +135,7 @@ public class SocketSelectorTests extends ESTestCase {
when(channel.finishConnect()).thenReturn(false);
socketSelector.doSelect(0);
socketSelector.preSelect();
verify(eventHandler, times(0)).handleConnect(channel);
}
@ -156,7 +153,7 @@ public class SocketSelectorTests extends ESTestCase {
socketSelector.queueWrite(writeOperation);
when(channel.isWritable()).thenReturn(false);
socketSelector.doSelect(0);
socketSelector.preSelect();
verify(writeContext, times(0)).queueWriteOperations(writeOperation);
verify(listener).onFailure(any(ClosedChannelException.class));
@ -172,7 +169,7 @@ public class SocketSelectorTests extends ESTestCase {
when(channel.isWritable()).thenReturn(true);
when(channel.getSelectionKey()).thenReturn(selectionKey);
when(selectionKey.interestOps(anyInt())).thenThrow(cancelledKeyException);
socketSelector.doSelect(0);
socketSelector.preSelect();
verify(writeContext, times(0)).queueWriteOperations(writeOperation);
verify(listener).onFailure(cancelledKeyException);
@ -185,7 +182,7 @@ public class SocketSelectorTests extends ESTestCase {
assertTrue((selectionKey.interestOps() & SelectionKey.OP_WRITE) == 0);
when(channel.isWritable()).thenReturn(true);
socketSelector.doSelect(0);
socketSelector.preSelect();
verify(writeContext).queueWriteOperations(writeOperation);
assertTrue((selectionKey.interestOps() & SelectionKey.OP_WRITE) != 0);
@ -219,42 +216,36 @@ public class SocketSelectorTests extends ESTestCase {
}
public void testConnectEvent() throws Exception {
keySet.add(selectionKey);
selectionKey.setReadyOps(SelectionKey.OP_CONNECT);
when(channel.finishConnect()).thenReturn(true);
socketSelector.doSelect(0);
socketSelector.processKey(selectionKey);
verify(eventHandler).handleConnect(channel);
}
public void testConnectEventFinishUnsuccessful() throws Exception {
keySet.add(selectionKey);
selectionKey.setReadyOps(SelectionKey.OP_CONNECT);
when(channel.finishConnect()).thenReturn(false);
socketSelector.doSelect(0);
socketSelector.processKey(selectionKey);
verify(eventHandler, times(0)).handleConnect(channel);
}
public void testConnectEventFinishThrowException() throws Exception {
keySet.add(selectionKey);
IOException ioException = new IOException();
selectionKey.setReadyOps(SelectionKey.OP_CONNECT);
when(channel.finishConnect()).thenThrow(ioException);
socketSelector.doSelect(0);
socketSelector.processKey(selectionKey);
verify(eventHandler, times(0)).handleConnect(channel);
verify(eventHandler).connectException(channel, ioException);
}
public void testWillNotConsiderWriteOrReadUntilConnectionComplete() throws Exception {
keySet.add(selectionKey);
IOException ioException = new IOException();
selectionKey.setReadyOps(SelectionKey.OP_WRITE | SelectionKey.OP_READ);
@ -262,54 +253,48 @@ public class SocketSelectorTests extends ESTestCase {
doThrow(ioException).when(eventHandler).handleWrite(channel);
when(channel.isConnectComplete()).thenReturn(false);
socketSelector.doSelect(0);
socketSelector.processKey(selectionKey);
verify(eventHandler, times(0)).handleWrite(channel);
verify(eventHandler, times(0)).handleRead(channel);
}
public void testSuccessfulWriteEvent() throws Exception {
keySet.add(selectionKey);
selectionKey.setReadyOps(SelectionKey.OP_WRITE);
socketSelector.doSelect(0);
socketSelector.processKey(selectionKey);
verify(eventHandler).handleWrite(channel);
}
public void testWriteEventWithException() throws Exception {
keySet.add(selectionKey);
IOException ioException = new IOException();
selectionKey.setReadyOps(SelectionKey.OP_WRITE);
doThrow(ioException).when(eventHandler).handleWrite(channel);
socketSelector.doSelect(0);
socketSelector.processKey(selectionKey);
verify(eventHandler).writeException(channel, ioException);
}
public void testSuccessfulReadEvent() throws Exception {
keySet.add(selectionKey);
selectionKey.setReadyOps(SelectionKey.OP_READ);
socketSelector.doSelect(0);
socketSelector.processKey(selectionKey);
verify(eventHandler).handleRead(channel);
}
public void testReadEventWithException() throws Exception {
keySet.add(selectionKey);
IOException ioException = new IOException();
selectionKey.setReadyOps(SelectionKey.OP_READ);
doThrow(ioException).when(eventHandler).handleRead(channel);
socketSelector.doSelect(0);
socketSelector.processKey(selectionKey);
verify(eventHandler).readException(channel, ioException);
}
@ -319,7 +304,7 @@ public class SocketSelectorTests extends ESTestCase {
socketSelector.scheduleForRegistration(channel);
socketSelector.doSelect(0);
socketSelector.preSelect();
NetworkBytesReference networkBuffer = NetworkBytesReference.wrap(new BytesArray(new byte[1]));
socketSelector.queueWrite(new WriteOperation(mock(NioSocketChannel.class), networkBuffer, listener));