From cc628748e10569abc2bc57e65f315145440a8c5a Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Mon, 17 Feb 2020 13:07:20 +0100 Subject: [PATCH] Optimize FilterStreamInput for Network Reads (#52395) (#52403) When `FilterStreamInput` wraps a Netty `ByteBuf` based stream it did not forward the bulk primitive reads to the delegate. These are optimized on the delegate but if they're not forwarded then the delegate will be called e.g. 4 times to read an `int`. This happens for essentially all network reads prior to this change because they all run from a `NamedWritableAwareStreamInput`. This also required optimising `BufferedChecksumStreamInput` individually to use bulk reads from the buffer because it implicitly assumed that the filter stream input wouldn't override any of the bulk operations. --- .../common/io/stream/FilterStreamInput.java | 15 ++++++++++++ .../translog/BufferedChecksumStreamInput.java | 24 +++++++++++++++++++ 2 files changed, 39 insertions(+) diff --git a/server/src/main/java/org/elasticsearch/common/io/stream/FilterStreamInput.java b/server/src/main/java/org/elasticsearch/common/io/stream/FilterStreamInput.java index 1a3f9fe601d..2fa700634b8 100644 --- a/server/src/main/java/org/elasticsearch/common/io/stream/FilterStreamInput.java +++ b/server/src/main/java/org/elasticsearch/common/io/stream/FilterStreamInput.java @@ -45,6 +45,21 @@ public abstract class FilterStreamInput extends StreamInput { delegate.readBytes(b, offset, len); } + @Override + public short readShort() throws IOException { + return delegate.readShort(); + } + + @Override + public int readInt() throws IOException { + return delegate.readInt(); + } + + @Override + public long readLong() throws IOException { + return delegate.readLong(); + } + @Override public void reset() throws IOException { delegate.reset(); diff --git a/server/src/main/java/org/elasticsearch/index/translog/BufferedChecksumStreamInput.java b/server/src/main/java/org/elasticsearch/index/translog/BufferedChecksumStreamInput.java index 8e815d3599a..fa37991d82c 100644 --- a/server/src/main/java/org/elasticsearch/index/translog/BufferedChecksumStreamInput.java +++ b/server/src/main/java/org/elasticsearch/index/translog/BufferedChecksumStreamInput.java @@ -70,6 +70,30 @@ public final class BufferedChecksumStreamInput extends FilterStreamInput { digest.update(b, offset, len); } + private static final ThreadLocal buffer = ThreadLocal.withInitial(() -> new byte[8]); + + @Override + public short readShort() throws IOException { + final byte[] buf = buffer.get(); + readBytes(buf, 0, 2); + return (short) (((buf[0] & 0xFF) << 8) | (buf[1] & 0xFF)); + } + + @Override + public int readInt() throws IOException { + final byte[] buf = buffer.get(); + readBytes(buf, 0, 4); + return ((buf[0] & 0xFF) << 24) | ((buf[1] & 0xFF) << 16) | ((buf[2] & 0xFF) << 8) | (buf[3] & 0xFF); + } + + @Override + public long readLong() throws IOException { + final byte[] buf = buffer.get(); + readBytes(buf, 0, 8); + return (((long) (((buf[0] & 0xFF) << 24) | ((buf[1] & 0xFF) << 16) | ((buf[2] & 0xFF) << 8) | (buf[3] & 0xFF))) << 32) + | ((((buf[4] & 0xFF) << 24) | ((buf[5] & 0xFF) << 16) | ((buf[6] & 0xFF) << 8) | (buf[7] & 0xFF)) & 0xFFFFFFFFL); + } + @Override public void reset() throws IOException { delegate.reset();