Serialize Outbound Message on Flush (#57084) (#57682)

Follow up to #56961:

We can be a little more efficient than just serializing at the IO loop by serializing
only when we flush to a channel. This has the advantage that we don't serialize a long
queue of messages for a channel that isn't writable for a longer period of time (unstable network,
actually writing large volumes of data, etc.).
Also, this further reduces the time for which we hold on to the write buffer for a message,
making allocations because of an empty page cache recycler pool less likely.
This commit is contained in:
Armin Braun 2020-06-04 18:06:13 +02:00 committed by GitHub
parent 80d1b12fa3
commit 24779c80f9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 27 additions and 16 deletions

View File

@ -90,15 +90,14 @@ final class Netty4MessageChannelHandler extends ChannelDuplexHandler {
}
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws IOException {
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
assert msg instanceof OutboundHandler.SendContext;
final boolean queued = queuedWrites.offer(
new WriteOperation(Netty4Utils.toByteBuf(((OutboundHandler.SendContext) msg).get()), promise));
final boolean queued = queuedWrites.offer(new WriteOperation((OutboundHandler.SendContext) msg, promise));
assert queued;
}
@Override
public void channelWritabilityChanged(ChannelHandlerContext ctx) {
public void channelWritabilityChanged(ChannelHandlerContext ctx) throws IOException {
if (ctx.channel().isWritable()) {
doFlush(ctx);
}
@ -106,7 +105,7 @@ final class Netty4MessageChannelHandler extends ChannelDuplexHandler {
}
@Override
public void flush(ChannelHandlerContext ctx) {
public void flush(ChannelHandlerContext ctx) throws IOException {
Channel channel = ctx.channel();
if (channel.isWritable() || channel.isActive() == false) {
doFlush(ctx);
@ -120,7 +119,7 @@ final class Netty4MessageChannelHandler extends ChannelDuplexHandler {
super.channelInactive(ctx);
}
private void doFlush(ChannelHandlerContext ctx) {
private void doFlush(ChannelHandlerContext ctx) throws IOException {
assert ctx.executor().inEventLoop();
final Channel channel = ctx.channel();
if (channel.isActive() == false) {
@ -138,24 +137,25 @@ final class Netty4MessageChannelHandler extends ChannelDuplexHandler {
break;
}
final WriteOperation write = currentWrite;
if (write.buf.readableBytes() == 0) {
final ByteBuf currentBuffer = write.buffer();
if (currentBuffer.readableBytes() == 0) {
write.promise.trySuccess();
currentWrite = null;
continue;
}
final int readableBytes = write.buf.readableBytes();
final int readableBytes = currentBuffer.readableBytes();
final int bufferSize = Math.min(readableBytes, 1 << 18);
final int readerIndex = write.buf.readerIndex();
final int readerIndex = currentBuffer.readerIndex();
final boolean sliced = readableBytes != bufferSize;
final ByteBuf writeBuffer;
if (sliced) {
writeBuffer = write.buf.retainedSlice(readerIndex, bufferSize);
write.buf.readerIndex(readerIndex + bufferSize);
writeBuffer = currentBuffer.retainedSlice(readerIndex, bufferSize);
currentBuffer.readerIndex(readerIndex + bufferSize);
} else {
writeBuffer = write.buf;
writeBuffer = currentBuffer;
}
final ChannelFuture writeFuture = ctx.write(writeBuffer);
if (sliced == false || write.buf.readableBytes() == 0) {
if (sliced == false || currentBuffer.readableBytes() == 0) {
currentWrite = null;
writeFuture.addListener(future -> {
assert ctx.executor().inEventLoop();
@ -190,13 +190,24 @@ final class Netty4MessageChannelHandler extends ChannelDuplexHandler {
private static final class WriteOperation {
private final ByteBuf buf;
private ByteBuf buf;
private OutboundHandler.SendContext context;
private final ChannelPromise promise;
WriteOperation(ByteBuf buf, ChannelPromise promise) {
this.buf = buf;
WriteOperation(OutboundHandler.SendContext context, ChannelPromise promise) {
this.context = context;
this.promise = promise;
}
ByteBuf buffer() throws IOException {
if (buf == null) {
buf = Netty4Utils.toByteBuf(context.get());
context = null;
}
assert context == null;
return buf;
}
}
}