From ae06a9399a01e36b75db6ba4deb4bded6e694ee9 Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Mon, 12 Aug 2019 17:44:54 -0400 Subject: [PATCH] Fix bug in copying bytes for socket write (#45463) Currently we take the array of nio buffers from the netty channel outbound buffer and copy their bytes to a direct buffer. In the process we mutate the nio buffer positions. It seems like netty will continue to reuse these buffers. This means than any data that is not flushed in a call is lost. This commit fixes this by incrementing the positions after the flush has completed. This is similar to the behavior that SocketChannel would have provided and netty relied upon. Fixes #45444. --- .../transport/CopyBytesSocketChannel.java | 19 ++++++++++++------- 1 file changed, 12 insertions(+), 7 deletions(-) diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/transport/CopyBytesSocketChannel.java b/modules/transport-netty4/src/main/java/org/elasticsearch/transport/CopyBytesSocketChannel.java index 4eb89dbb3bd..dd7ba056010 100644 --- a/modules/transport-netty4/src/main/java/org/elasticsearch/transport/CopyBytesSocketChannel.java +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/transport/CopyBytesSocketChannel.java @@ -42,8 +42,6 @@ import org.elasticsearch.common.SuppressForbidden; import java.nio.ByteBuffer; import java.nio.channels.SocketChannel; -import java.util.Arrays; -import java.util.Objects; import static io.netty.channel.internal.ChannelUtils.MAX_BYTES_PER_GATHERING_WRITE_ATTEMPTED_LOW_THRESHOLD; @@ -89,7 +87,6 @@ public class CopyBytesSocketChannel extends NioSocketChannel { // Ensure the pending writes are made of ByteBufs only. int maxBytesPerGatheringWrite = writeConfig.getMaxBytesPerGatheringWrite(); ByteBuffer[] nioBuffers = in.nioBuffers(1024, maxBytesPerGatheringWrite); - assert Arrays.stream(nioBuffers).filter(Objects::nonNull).noneMatch(ByteBuffer::isDirect) : "Expected all to be heap buffers"; int nioBufferCnt = in.nioBufferCount(); if (nioBufferCnt == 0) {// We have something else beside ByteBuffers to write so fallback to normal writes. @@ -108,6 +105,7 @@ public class CopyBytesSocketChannel extends NioSocketChannel { return; } adjustMaxBytesPerGatheringWrite(attemptedBytes, localWrittenBytes, maxBytesPerGatheringWrite); + setWrittenBytes(nioBuffers, localWrittenBytes); in.removeBytes(localWrittenBytes); --writeSpinCount; } @@ -151,11 +149,18 @@ public class CopyBytesSocketChannel extends NioSocketChannel { private static void copyBytes(ByteBuffer[] source, int nioBufferCnt, ByteBuffer destination) { for (int i = 0; i < nioBufferCnt && destination.hasRemaining(); i++) { ByteBuffer buffer = source[i]; + assert buffer.hasArray() : "Buffer must have heap array"; int nBytesToCopy = Math.min(destination.remaining(), buffer.remaining()); - int initialLimit = buffer.limit(); - buffer.limit(buffer.position() + nBytesToCopy); - destination.put(buffer); - buffer.limit(initialLimit); + destination.put(buffer.array(), buffer.arrayOffset() + buffer.position(), nBytesToCopy); + } + } + + private static void setWrittenBytes(ByteBuffer[] source, int bytesWritten) { + for (int i = 0; bytesWritten > 0; i++) { + ByteBuffer buffer = source[i]; + int nBytes = Math.min(buffer.remaining(), bytesWritten); + buffer.position(buffer.position() + nBytes); + bytesWritten = bytesWritten - nBytes; } }