Immediately flush channel after writing to buffer (#31301)

This is related to #27260. Currently when we queue a write with a
channel we set OP_WRITE and wait until the next selection loop to flush
the write. However, if the channel does not have a pending write, it
is probably ready to flush. This PR implements an optimistic flush logic
that will attempt this flush.
This commit is contained in:
Tim Brooks 2018-06-13 15:32:13 -06:00 committed by GitHub
parent 509729f9c1
commit 700357d04e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 54 additions and 30 deletions

View File

@ -66,7 +66,7 @@ public abstract class ChannelContext<S extends SelectableChannel & NetworkChanne
* @throws IOException during channel / context close * @throws IOException during channel / context close
*/ */
public void closeFromSelector() throws IOException { public void closeFromSelector() throws IOException {
if (closeContext.isDone() == false) { if (isOpen()) {
try { try {
rawChannel.close(); rawChannel.close();
closeContext.complete(null); closeContext.complete(null);

View File

@ -159,8 +159,7 @@ public class EventHandler {
} }
/** /**
* This method is called after ready events (READ, ACCEPT, WRITE, CONNECT) have been handled for a * This method is called after events (READ, WRITE, CONNECT) have been handled for a channel.
* channel.
* *
* @param context that was handled * @param context that was handled
*/ */

View File

@ -43,9 +43,6 @@ import java.util.stream.Collectors;
* {@link #runLoop()}, the selector will run until {@link #close()} is called. This instance handles closing * {@link #runLoop()}, the selector will run until {@link #close()} is called. This instance handles closing
* of channels. Users should call {@link #queueChannelClose(NioChannel)} to schedule a channel for close by * of channels. Users should call {@link #queueChannelClose(NioChannel)} to schedule a channel for close by
* this selector. * this selector.
* <p>
* Children of this class should implement the specific {@link #processKey(SelectionKey)},
* {@link #preSelect()}, and {@link #cleanup()} functionality.
*/ */
public class NioSelector implements Closeable { public class NioSelector implements Closeable {
@ -65,7 +62,7 @@ public class NioSelector implements Closeable {
this(eventHandler, Selector.open()); this(eventHandler, Selector.open());
} }
public NioSelector(EventHandler eventHandler, Selector selector) throws IOException { public NioSelector(EventHandler eventHandler, Selector selector) {
this.selector = selector; this.selector = selector;
this.eventHandler = eventHandler; this.eventHandler = eventHandler;
} }
@ -165,7 +162,7 @@ public class NioSelector implements Closeable {
} }
void cleanupAndCloseChannels() { void cleanupAndCloseChannels() {
cleanup(); cleanupPendingWrites();
channelsToClose.addAll(channelsToRegister); channelsToClose.addAll(channelsToRegister);
channelsToRegister.clear(); channelsToRegister.clear();
channelsToClose.addAll(selector.keys().stream().map(sk -> (ChannelContext<?>) sk.attachment()).collect(Collectors.toList())); channelsToClose.addAll(selector.keys().stream().map(sk -> (ChannelContext<?>) sk.attachment()).collect(Collectors.toList()));
@ -234,16 +231,6 @@ public class NioSelector implements Closeable {
handleQueuedWrites(); handleQueuedWrites();
} }
/**
* Called once as the selector is being closed.
*/
void cleanup() {
WriteOperation op;
while ((op = queuedWrites.poll()) != null) {
executeFailedListener(op.getListener(), new ClosedSelectorException());
}
}
/** /**
* Queues a write operation to be handled by the event loop. This can be called by any thread and is the * Queues a write operation to be handled by the event loop. This can be called by any thread and is the
* api available for non-selector threads to schedule writes. * api available for non-selector threads to schedule writes.
@ -284,20 +271,31 @@ public class NioSelector implements Closeable {
} }
/** /**
* Queues a write operation directly in a channel's buffer. Channel buffers are only safe to be accessed * Queues a write operation directly in a channel's buffer. If this channel does not have pending writes
* by the selector thread. As a result, this method should only be called by the selector thread. * already, the channel will be flushed. Channel buffers are only safe to be accessed by the selector
* thread. As a result, this method should only be called by the selector thread. If this channel does
* not have pending writes already, the channel will be flushed.
* *
* @param writeOperation to be queued in a channel's buffer * @param writeOperation to be queued in a channel's buffer
*/ */
public void queueWriteInChannelBuffer(WriteOperation writeOperation) { public void writeToChannel(WriteOperation writeOperation) {
assertOnSelectorThread(); assertOnSelectorThread();
SocketChannelContext context = writeOperation.getChannel(); SocketChannelContext context = writeOperation.getChannel();
// If the channel does not currently have anything that is ready to flush, we should flush after
// the write operation is queued.
boolean shouldFlushAfterQueuing = context.readyForFlush() == false;
try { try {
SelectionKeyUtils.setWriteInterested(context.getSelectionKey()); SelectionKeyUtils.setWriteInterested(context.getSelectionKey());
context.queueWriteOperation(writeOperation); context.queueWriteOperation(writeOperation);
} catch (Exception e) { } catch (Exception e) {
shouldFlushAfterQueuing = false;
executeFailedListener(writeOperation.getListener(), e); executeFailedListener(writeOperation.getListener(), e);
} }
if (shouldFlushAfterQueuing) {
handleWrite(context);
eventHandler.postHandling(context);
}
} }
/** /**
@ -332,6 +330,13 @@ public class NioSelector implements Closeable {
} }
} }
private void cleanupPendingWrites() {
WriteOperation op;
while ((op = queuedWrites.poll()) != null) {
executeFailedListener(op.getListener(), new ClosedSelectorException());
}
}
private void wakeup() { private void wakeup() {
// TODO: Do we need the wakeup optimizations that some other libraries use? // TODO: Do we need the wakeup optimizations that some other libraries use?
selector.wakeup(); selector.wakeup();
@ -394,7 +399,7 @@ public class NioSelector implements Closeable {
WriteOperation writeOperation; WriteOperation writeOperation;
while ((writeOperation = queuedWrites.poll()) != null) { while ((writeOperation = queuedWrites.poll()) != null) {
if (writeOperation.getChannel().isOpen()) { if (writeOperation.getChannel().isOpen()) {
queueWriteInChannelBuffer(writeOperation); writeToChannel(writeOperation);
} else { } else {
executeFailedListener(writeOperation.getListener(), new ClosedChannelException()); executeFailedListener(writeOperation.getListener(), new ClosedChannelException());
} }

View File

@ -135,7 +135,7 @@ public abstract class SocketChannelContext extends ChannelContext<SocketChannel>
return; return;
} }
selector.queueWriteInChannelBuffer(writeOperation); selector.writeToChannel(writeOperation);
} }
public void queueWriteOperation(WriteOperation writeOperation) { public void queueWriteOperation(WriteOperation writeOperation) {
@ -164,7 +164,7 @@ public abstract class SocketChannelContext extends ChannelContext<SocketChannel>
@Override @Override
public void closeFromSelector() throws IOException { public void closeFromSelector() throws IOException {
getSelector().assertOnSelectorThread(); getSelector().assertOnSelectorThread();
if (channel.isOpen()) { if (isOpen()) {
ArrayList<IOException> closingExceptions = new ArrayList<>(3); ArrayList<IOException> closingExceptions = new ArrayList<>(3);
try { try {
super.closeFromSelector(); super.closeFromSelector();

View File

@ -262,11 +262,28 @@ public class NioSelectorTests extends ESTestCase {
public void testQueueDirectlyInChannelBufferSuccessful() throws Exception { public void testQueueDirectlyInChannelBufferSuccessful() throws Exception {
WriteOperation writeOperation = new FlushReadyWrite(channelContext, buffers, listener); WriteOperation writeOperation = new FlushReadyWrite(channelContext, buffers, listener);
assertTrue((selectionKey.interestOps() & SelectionKey.OP_WRITE) == 0); assertEquals(0, (selectionKey.interestOps() & SelectionKey.OP_WRITE));
selector.queueWriteInChannelBuffer(writeOperation); when(channelContext.readyForFlush()).thenReturn(true);
selector.writeToChannel(writeOperation);
verify(channelContext).queueWriteOperation(writeOperation); verify(channelContext).queueWriteOperation(writeOperation);
verify(eventHandler, times(0)).handleWrite(channelContext);
verify(eventHandler, times(0)).postHandling(channelContext);
assertTrue((selectionKey.interestOps() & SelectionKey.OP_WRITE) != 0);
}
public void testShouldFlushIfNoPendingFlushes() throws Exception {
WriteOperation writeOperation = new FlushReadyWrite(channelContext, buffers, listener);
assertEquals(0, (selectionKey.interestOps() & SelectionKey.OP_WRITE));
when(channelContext.readyForFlush()).thenReturn(false);
selector.writeToChannel(writeOperation);
verify(channelContext).queueWriteOperation(writeOperation);
verify(eventHandler).handleWrite(channelContext);
verify(eventHandler).postHandling(channelContext);
assertTrue((selectionKey.interestOps() & SelectionKey.OP_WRITE) != 0); assertTrue((selectionKey.interestOps() & SelectionKey.OP_WRITE) != 0);
} }
@ -277,10 +294,13 @@ public class NioSelectorTests extends ESTestCase {
CancelledKeyException cancelledKeyException = new CancelledKeyException(); CancelledKeyException cancelledKeyException = new CancelledKeyException();
when(channelContext.getSelectionKey()).thenReturn(selectionKey); when(channelContext.getSelectionKey()).thenReturn(selectionKey);
when(channelContext.readyForFlush()).thenReturn(false);
when(selectionKey.interestOps(anyInt())).thenThrow(cancelledKeyException); when(selectionKey.interestOps(anyInt())).thenThrow(cancelledKeyException);
selector.queueWriteInChannelBuffer(writeOperation); selector.writeToChannel(writeOperation);
verify(channelContext, times(0)).queueWriteOperation(writeOperation); verify(channelContext, times(0)).queueWriteOperation(writeOperation);
verify(eventHandler, times(0)).handleWrite(channelContext);
verify(eventHandler, times(0)).postHandling(channelContext);
verify(listener).accept(null, cancelledKeyException); verify(listener).accept(null, cancelledKeyException);
} }

View File

@ -170,7 +170,7 @@ public class SocketChannelContextTests extends ESTestCase {
when(readWriteHandler.createWriteOperation(context, buffers, listener)).thenReturn(writeOperation); when(readWriteHandler.createWriteOperation(context, buffers, listener)).thenReturn(writeOperation);
context.sendMessage(buffers, listener); context.sendMessage(buffers, listener);
verify(selector).queueWriteInChannelBuffer(writeOpCaptor.capture()); verify(selector).writeToChannel(writeOpCaptor.capture());
WriteOperation writeOp = writeOpCaptor.getValue(); WriteOperation writeOp = writeOpCaptor.getValue();
assertSame(writeOperation, writeOp); assertSame(writeOperation, writeOp);

View File

@ -145,7 +145,7 @@ public final class SSLChannelContext extends SocketChannelContext {
selector.queueWrite(writeOperation); selector.queueWrite(writeOperation);
return; return;
} }
selector.queueWriteInChannelBuffer(writeOperation); selector.writeToChannel(writeOperation);
} }
} }

View File

@ -345,7 +345,7 @@ public class SSLChannelContextTests extends ESTestCase {
context.closeChannel(); context.closeChannel();
ArgumentCaptor<WriteOperation> captor = ArgumentCaptor.forClass(WriteOperation.class); ArgumentCaptor<WriteOperation> captor = ArgumentCaptor.forClass(WriteOperation.class);
verify(selector).queueWriteInChannelBuffer(captor.capture()); verify(selector).writeToChannel(captor.capture());
context.queueWriteOperation(captor.getValue()); context.queueWriteOperation(captor.getValue());
verify(sslDriver).initiateClose(); verify(sslDriver).initiateClose();