Fix bug in copying bytes for socket write (#45463)

Currently we take the array of nio buffers from the netty channel
outbound buffer and copy their bytes to a direct buffer. In the process
we mutate the nio buffer positions. It seems like netty will continue to
reuse these buffers. This means than any data that is not flushed in a
call is lost. This commit fixes this by incrementing the positions after
the flush has completed. This is similar to the behavior that
SocketChannel would have provided and netty relied upon.

Fixes #45444.
This commit is contained in:
Tim Brooks 2019-08-12 17:44:54 -04:00
parent dc1856ca53
commit ae06a9399a
No known key found for this signature in database
GPG Key ID: C2AA3BB91A889E77

View File

@ -42,8 +42,6 @@ import org.elasticsearch.common.SuppressForbidden;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel; import java.nio.channels.SocketChannel;
import java.util.Arrays;
import java.util.Objects;
import static io.netty.channel.internal.ChannelUtils.MAX_BYTES_PER_GATHERING_WRITE_ATTEMPTED_LOW_THRESHOLD; import static io.netty.channel.internal.ChannelUtils.MAX_BYTES_PER_GATHERING_WRITE_ATTEMPTED_LOW_THRESHOLD;
@ -89,7 +87,6 @@ public class CopyBytesSocketChannel extends NioSocketChannel {
// Ensure the pending writes are made of ByteBufs only. // Ensure the pending writes are made of ByteBufs only.
int maxBytesPerGatheringWrite = writeConfig.getMaxBytesPerGatheringWrite(); int maxBytesPerGatheringWrite = writeConfig.getMaxBytesPerGatheringWrite();
ByteBuffer[] nioBuffers = in.nioBuffers(1024, maxBytesPerGatheringWrite); ByteBuffer[] nioBuffers = in.nioBuffers(1024, maxBytesPerGatheringWrite);
assert Arrays.stream(nioBuffers).filter(Objects::nonNull).noneMatch(ByteBuffer::isDirect) : "Expected all to be heap buffers";
int nioBufferCnt = in.nioBufferCount(); int nioBufferCnt = in.nioBufferCount();
if (nioBufferCnt == 0) {// We have something else beside ByteBuffers to write so fallback to normal writes. if (nioBufferCnt == 0) {// We have something else beside ByteBuffers to write so fallback to normal writes.
@ -108,6 +105,7 @@ public class CopyBytesSocketChannel extends NioSocketChannel {
return; return;
} }
adjustMaxBytesPerGatheringWrite(attemptedBytes, localWrittenBytes, maxBytesPerGatheringWrite); adjustMaxBytesPerGatheringWrite(attemptedBytes, localWrittenBytes, maxBytesPerGatheringWrite);
setWrittenBytes(nioBuffers, localWrittenBytes);
in.removeBytes(localWrittenBytes); in.removeBytes(localWrittenBytes);
--writeSpinCount; --writeSpinCount;
} }
@ -151,11 +149,18 @@ public class CopyBytesSocketChannel extends NioSocketChannel {
private static void copyBytes(ByteBuffer[] source, int nioBufferCnt, ByteBuffer destination) { private static void copyBytes(ByteBuffer[] source, int nioBufferCnt, ByteBuffer destination) {
for (int i = 0; i < nioBufferCnt && destination.hasRemaining(); i++) { for (int i = 0; i < nioBufferCnt && destination.hasRemaining(); i++) {
ByteBuffer buffer = source[i]; ByteBuffer buffer = source[i];
assert buffer.hasArray() : "Buffer must have heap array";
int nBytesToCopy = Math.min(destination.remaining(), buffer.remaining()); int nBytesToCopy = Math.min(destination.remaining(), buffer.remaining());
int initialLimit = buffer.limit(); destination.put(buffer.array(), buffer.arrayOffset() + buffer.position(), nBytesToCopy);
buffer.limit(buffer.position() + nBytesToCopy); }
destination.put(buffer); }
buffer.limit(initialLimit);
private static void setWrittenBytes(ByteBuffer[] source, int bytesWritten) {
for (int i = 0; bytesWritten > 0; i++) {
ByteBuffer buffer = source[i];
int nBytes = Math.min(buffer.remaining(), bytesWritten);
buffer.position(buffer.position() + nBytes);
bytesWritten = bytesWritten - nBytes;
} }
} }