Optimize Selector Wakeups (#43515) (#43650)

* Use atomic boolean to guard wakeups
* Don't trigger wakeups from the select loops thread itself for registering and closing channels
* Don't needlessly queue writes

Co-authored-by:  Tim Brooks <tim@uncontended.net>
This commit is contained in:
Armin Braun 2019-06-26 20:00:42 +02:00 committed by GitHub
parent 87566c9324
commit c00e305d79
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 115 additions and 84 deletions

View File

@ -61,6 +61,7 @@ public class NioSelector implements Closeable {
private final AtomicBoolean isClosed = new AtomicBoolean(false);
private final CompletableFuture<Void> isRunningFuture = new CompletableFuture<>();
private final AtomicReference<Thread> thread = new AtomicReference<>(null);
private final AtomicBoolean wokenUp = new AtomicBoolean(false);
public NioSelector(EventHandler eventHandler) throws IOException {
this(eventHandler, Selector.open());
@ -153,7 +154,7 @@ public class NioSelector implements Closeable {
preSelect();
long nanosUntilNextTask = taskScheduler.nanosUntilNextTask(System.nanoTime());
int ready;
if (nanosUntilNextTask == 0) {
if (wokenUp.getAndSet(false) || nanosUntilNextTask == 0) {
ready = selector.selectNow();
} else {
long millisUntilNextTask = TimeUnit.NANOSECONDS.toMillis(nanosUntilNextTask);
@ -221,13 +222,10 @@ public class NioSelector implements Closeable {
if (selectionKey.isAcceptable()) {
assert context instanceof ServerChannelContext : "Only server channels can receive accept events";
ServerChannelContext serverChannelContext = (ServerChannelContext) context;
int ops = selectionKey.readyOps();
if ((ops & SelectionKey.OP_ACCEPT) != 0) {
try {
eventHandler.acceptChannel(serverChannelContext);
} catch (IOException e) {
eventHandler.acceptException(serverChannelContext, e);
}
try {
eventHandler.acceptChannel(serverChannelContext);
} catch (IOException e) {
eventHandler.acceptException(serverChannelContext, e);
}
} else {
assert context instanceof SocketChannelContext : "Only sockets channels can receive non-accept events";
@ -279,29 +277,36 @@ public class NioSelector implements Closeable {
/**
* Queues a write operation to be handled by the event loop. This can be called by any thread and is the
* api available for non-selector threads to schedule writes.
* api available for non-selector threads to schedule writes. When invoked from the selector thread the write will be executed
* right away.
*
* @param writeOperation to be queued
*/
public void queueWrite(WriteOperation writeOperation) {
queuedWrites.offer(writeOperation);
if (isOpen() == false) {
boolean wasRemoved = queuedWrites.remove(writeOperation);
if (wasRemoved) {
writeOperation.getListener().accept(null, new ClosedSelectorException());
}
if (isOnCurrentThread()) {
writeToChannel(writeOperation);
} else {
wakeup();
queuedWrites.offer(writeOperation);
if (isOpen() == false) {
boolean wasRemoved = queuedWrites.remove(writeOperation);
if (wasRemoved) {
writeOperation.getListener().accept(null, new ClosedSelectorException());
}
} else {
wakeup();
}
}
}
public void queueChannelClose(NioChannel channel) {
ChannelContext<?> context = channel.getContext();
assert context.getSelector() == this : "Must schedule a channel for closure with its selector";
channelsToClose.offer(context);
if (isOnCurrentThread() == false) {
channelsToClose.offer(context);
ensureSelectorOpenForEnqueuing(channelsToClose, context);
wakeup();
} else {
closeChannel(context);
}
}
@ -313,9 +318,13 @@ public class NioSelector implements Closeable {
*/
public void scheduleForRegistration(NioChannel channel) {
ChannelContext<?> context = channel.getContext();
channelsToRegister.add(context);
ensureSelectorOpenForEnqueuing(channelsToRegister, context);
wakeup();
if (isOnCurrentThread() == false) {
channelsToRegister.add(context);
ensureSelectorOpenForEnqueuing(channelsToRegister, context);
wakeup();
} else {
registerChannel(context);
}
}
/**
@ -326,7 +335,7 @@ public class NioSelector implements Closeable {
*
* @param writeOperation to be queued in a channel's buffer
*/
public void writeToChannel(WriteOperation writeOperation) {
private void writeToChannel(WriteOperation writeOperation) {
assertOnSelectorThread();
SocketChannelContext context = writeOperation.getChannel();
// If the channel does not currently have anything that is ready to flush, we should flush after
@ -380,8 +389,10 @@ public class NioSelector implements Closeable {
}
private void wakeup() {
// TODO: Do we need the wakeup optimizations that some other libraries use?
selector.wakeup();
assert isOnCurrentThread() == false;
if (wokenUp.compareAndSet(false, true)) {
selector.wakeup();
}
}
private void handleWrite(SocketChannelContext context) {
@ -414,30 +425,38 @@ public class NioSelector implements Closeable {
private void setUpNewChannels() {
ChannelContext<?> newChannel;
while ((newChannel = this.channelsToRegister.poll()) != null) {
assert newChannel.getSelector() == this : "The channel must be registered with the selector with which it was created";
try {
if (newChannel.isOpen()) {
eventHandler.handleRegistration(newChannel);
if (newChannel instanceof SocketChannelContext) {
attemptConnect((SocketChannelContext) newChannel, false);
}
} else {
eventHandler.registrationException(newChannel, new ClosedChannelException());
registerChannel(newChannel);
}
}
private void registerChannel(ChannelContext<?> newChannel) {
assert newChannel.getSelector() == this : "The channel must be registered with the selector with which it was created";
try {
if (newChannel.isOpen()) {
eventHandler.handleRegistration(newChannel);
if (newChannel instanceof SocketChannelContext) {
attemptConnect((SocketChannelContext) newChannel, false);
}
} catch (Exception e) {
eventHandler.registrationException(newChannel, e);
} else {
eventHandler.registrationException(newChannel, new ClosedChannelException());
}
} catch (Exception e) {
eventHandler.registrationException(newChannel, e);
}
}
private void closePendingChannels() {
ChannelContext<?> channelContext;
while ((channelContext = channelsToClose.poll()) != null) {
try {
eventHandler.handleClose(channelContext);
} catch (Exception e) {
eventHandler.closeException(channelContext, e);
}
closeChannel(channelContext);
}
}
private void closeChannel(final ChannelContext<?> channelContext) {
try {
eventHandler.handleClose(channelContext);
} catch (Exception e) {
eventHandler.closeException(channelContext, e);
}
}
@ -470,7 +489,7 @@ public class NioSelector implements Closeable {
* @param <O> the object type
*/
private <O> void ensureSelectorOpenForEnqueuing(ConcurrentLinkedQueue<O> queue, O objectAdded) {
if (isOpen() == false && isOnCurrentThread() == false) {
if (isOpen() == false) {
if (queue.remove(objectAdded)) {
throw new IllegalStateException("selector is already closed");
}

View File

@ -129,13 +129,7 @@ public abstract class SocketChannelContext extends ChannelContext<SocketChannel>
WriteOperation writeOperation = readWriteHandler.createWriteOperation(this, message, listener);
NioSelector selector = getSelector();
if (selector.isOnCurrentThread() == false) {
selector.queueWrite(writeOperation);
return;
}
selector.writeToChannel(writeOperation);
getSelector().queueWrite(writeOperation);
}
public void queueWriteOperation(WriteOperation writeOperation) {
@ -269,7 +263,7 @@ public abstract class SocketChannelContext extends ChannelContext<SocketChannel>
// Currently we limit to 64KB. This is a trade-off which means more syscalls, in exchange for less
// copying.
private final int WRITE_LIMIT = 1 << 16;
private static final int WRITE_LIMIT = 1 << 16;
protected int flushToChannel(FlushOperation flushOperation) throws IOException {
ByteBuffer ioBuffer = getSelector().getIoBuffer();

View File

@ -19,7 +19,9 @@
package org.elasticsearch.nio;
import org.elasticsearch.common.CheckedRunnable;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.test.ESTestCase;
import org.junit.Before;
import org.mockito.ArgumentCaptor;
@ -108,14 +110,14 @@ public class NioSelectorTests extends ESTestCase {
}
@SuppressWarnings({"unchecked", "rawtypes"})
public void testCloseException() throws IOException {
public void testCloseException() throws IOException, InterruptedException {
IOException ioException = new IOException();
NioChannel channel = mock(NioChannel.class);
ChannelContext context = mock(ChannelContext.class);
when(channel.getContext()).thenReturn(context);
when(context.getSelector()).thenReturn(selector);
selector.queueChannelClose(channel);
executeOnNewThread(() -> selector.queueChannelClose(channel));
doThrow(ioException).when(eventHandler).handleClose(context);
@ -198,9 +200,10 @@ public class NioSelectorTests extends ESTestCase {
verify(eventHandler).selectorException(ioException);
}
public void testSelectorClosedIfOpenAndEventLoopNotRunning() throws IOException {
public void testSelectorClosedIfOpenAndEventLoopNotRunning() throws Exception {
when(rawSelector.isOpen()).thenReturn(true);
selector.close();
executeOnNewThread(() -> selector.close());
verify(rawSelector).close();
}
@ -222,8 +225,7 @@ public class NioSelectorTests extends ESTestCase {
}
public void testRegisterServerChannelFailsDueToException() throws Exception {
selector.scheduleForRegistration(serverChannel);
executeOnNewThread(() -> selector.scheduleForRegistration(serverChannel));
ClosedChannelException closedChannelException = new ClosedChannelException();
doThrow(closedChannelException).when(eventHandler).handleRegistration(serverChannelContext);
@ -242,16 +244,18 @@ public class NioSelectorTests extends ESTestCase {
verify(eventHandler, times(0)).handleConnect(channelContext);
}
public void testRegisterSocketChannelFailsDueToException() throws Exception {
selector.scheduleForRegistration(channel);
public void testRegisterSocketChannelFailsDueToException() throws InterruptedException {
executeOnNewThread(() -> {
selector.scheduleForRegistration(channel);
ClosedChannelException closedChannelException = new ClosedChannelException();
doThrow(closedChannelException).when(eventHandler).handleRegistration(channelContext);
ClosedChannelException closedChannelException = new ClosedChannelException();
doThrow(closedChannelException).when(eventHandler).handleRegistration(channelContext);
selector.preSelect();
selector.preSelect();
verify(eventHandler).registrationException(channelContext, closedChannelException);
verify(eventHandler, times(0)).handleConnect(channelContext);
verify(eventHandler).registrationException(channelContext, closedChannelException);
verify(eventHandler, times(0)).handleConnect(channelContext);
});
}
public void testAcceptEvent() throws IOException {
@ -292,17 +296,17 @@ public class NioSelectorTests extends ESTestCase {
}
public void testQueueWriteWhenNotRunning() throws Exception {
selector.close();
selector.queueWrite(new FlushReadyWrite(channelContext, buffers, listener));
executeOnNewThread(() -> {
selector.close();
selector.queueWrite(new FlushReadyWrite(channelContext, buffers, listener));
});
verify(listener).accept(isNull(Void.class), any(ClosedSelectorException.class));
}
public void testQueueWriteChannelIsClosed() throws Exception {
WriteOperation writeOperation = new FlushReadyWrite(channelContext, buffers, listener);
selector.queueWrite(writeOperation);
executeOnNewThread(() -> selector.queueWrite(writeOperation));
when(channelContext.isOpen()).thenReturn(false);
selector.preSelect();
@ -315,7 +319,7 @@ public class NioSelectorTests extends ESTestCase {
WriteOperation writeOperation = new FlushReadyWrite(channelContext, buffers, listener);
CancelledKeyException cancelledKeyException = new CancelledKeyException();
selector.queueWrite(writeOperation);
executeOnNewThread(() -> selector.queueWrite(writeOperation));
when(channelContext.getSelectionKey()).thenReturn(selectionKey);
when(selectionKey.interestOps(anyInt())).thenThrow(cancelledKeyException);
@ -327,7 +331,7 @@ public class NioSelectorTests extends ESTestCase {
public void testQueueWriteSuccessful() throws Exception {
WriteOperation writeOperation = new FlushReadyWrite(channelContext, buffers, listener);
selector.queueWrite(writeOperation);
executeOnNewThread(() -> selector.queueWrite(writeOperation));
assertTrue((selectionKey.interestOps() & SelectionKey.OP_WRITE) == 0);
@ -343,7 +347,7 @@ public class NioSelectorTests extends ESTestCase {
assertEquals(0, (selectionKey.interestOps() & SelectionKey.OP_WRITE));
when(channelContext.readyForFlush()).thenReturn(true);
selector.writeToChannel(writeOperation);
selector.queueWrite(writeOperation);
verify(channelContext).queueWriteOperation(writeOperation);
verify(eventHandler, times(0)).handleWrite(channelContext);
@ -357,7 +361,7 @@ public class NioSelectorTests extends ESTestCase {
assertEquals(0, (selectionKey.interestOps() & SelectionKey.OP_WRITE));
when(channelContext.readyForFlush()).thenReturn(false);
selector.writeToChannel(writeOperation);
selector.queueWrite(writeOperation);
verify(channelContext).queueWriteOperation(writeOperation);
verify(eventHandler).handleWrite(channelContext);
@ -374,7 +378,7 @@ public class NioSelectorTests extends ESTestCase {
when(channelContext.getSelectionKey()).thenReturn(selectionKey);
when(channelContext.readyForFlush()).thenReturn(false);
when(selectionKey.interestOps(anyInt())).thenThrow(cancelledKeyException);
selector.writeToChannel(writeOperation);
selector.queueWrite(writeOperation);
verify(channelContext, times(0)).queueWriteOperation(writeOperation);
verify(eventHandler, times(0)).handleWrite(channelContext);
@ -477,14 +481,17 @@ public class NioSelectorTests extends ESTestCase {
public void testCleanup() throws Exception {
NioSocketChannel unregisteredChannel = mock(NioSocketChannel.class);
SocketChannelContext unregisteredContext = mock(SocketChannelContext.class);
when(unregisteredContext.getSelector()).thenReturn(selector);
when(unregisteredChannel.getContext()).thenReturn(unregisteredContext);
selector.scheduleForRegistration(channel);
executeOnNewThread(() -> selector.scheduleForRegistration(channel));
selector.preSelect();
selector.queueWrite(new FlushReadyWrite(channelContext, buffers, listener));
selector.scheduleForRegistration(unregisteredChannel);
executeOnNewThread(() -> {
selector.queueWrite(new FlushReadyWrite(channelContext, buffers, listener));
selector.scheduleForRegistration(unregisteredChannel);
});
TestSelectionKey testSelectionKey = new TestSelectionKey(0);
testSelectionKey.attach(channelContext);
@ -496,4 +503,20 @@ public class NioSelectorTests extends ESTestCase {
verify(eventHandler).handleClose(channelContext);
verify(eventHandler).handleClose(unregisteredContext);
}
private static void executeOnNewThread(CheckedRunnable<Exception> runnable) throws InterruptedException {
final Thread thread = new Thread(new AbstractRunnable() {
@Override
protected void doRun() throws Exception {
runnable.run();
}
@Override
public void onFailure(Exception e) {
throw new AssertionError(e);
}
});
thread.start();
thread.join();
}
}

View File

@ -175,7 +175,7 @@ public class SocketChannelContextTests extends ESTestCase {
when(readWriteHandler.createWriteOperation(context, buffers, listener)).thenReturn(writeOperation);
context.sendMessage(buffers, listener);
verify(selector).writeToChannel(writeOpCaptor.capture());
verify(selector).queueWrite(writeOpCaptor.capture());
WriteOperation writeOp = writeOpCaptor.getValue();
assertSame(writeOperation, writeOp);

View File

@ -180,12 +180,7 @@ public final class SSLChannelContext extends SocketChannelContext {
public void closeChannel() {
if (isClosing.compareAndSet(false, true)) {
WriteOperation writeOperation = new CloseNotifyOperation(this);
NioSelector selector = getSelector();
if (selector.isOnCurrentThread() == false) {
selector.queueWrite(writeOperation);
return;
}
selector.writeToChannel(writeOperation);
getSelector().queueWrite(writeOperation);
}
}

View File

@ -311,7 +311,7 @@ public class SSLChannelContextTests extends ESTestCase {
context.closeChannel();
ArgumentCaptor<WriteOperation> captor = ArgumentCaptor.forClass(WriteOperation.class);
verify(selector).writeToChannel(captor.capture());
verify(selector).queueWrite(captor.capture());
ArgumentCaptor<Runnable> taskCaptor = ArgumentCaptor.forClass(Runnable.class);
Runnable cancellable = mock(Runnable.class);
@ -333,7 +333,7 @@ public class SSLChannelContextTests extends ESTestCase {
context = new SSLChannelContext(channel, selector, exceptionHandler, sslDriver, readWriteHandler, channelBuffer);
context.closeChannel();
ArgumentCaptor<WriteOperation> captor = ArgumentCaptor.forClass(WriteOperation.class);
verify(selector).writeToChannel(captor.capture());
verify(selector).queueWrite(captor.capture());
ArgumentCaptor<Runnable> taskCaptor = ArgumentCaptor.forClass(Runnable.class);
Runnable cancellable = mock(Runnable.class);
when(nioTimer.scheduleAtRelativeTime(taskCaptor.capture(), anyLong())).thenReturn(cancellable);
@ -360,7 +360,7 @@ public class SSLChannelContextTests extends ESTestCase {
context.closeChannel();
ArgumentCaptor<WriteOperation> captor = ArgumentCaptor.forClass(WriteOperation.class);
verify(selector).writeToChannel(captor.capture());
verify(selector).queueWrite(captor.capture());
context.queueWriteOperation(captor.getValue());
verify(sslDriver).initiateClose();