Optimize FilterStreamInput for Network Reads (#52395) (#52403)

When `FilterStreamInput` wraps a Netty `ByteBuf` based stream it
did not forward the bulk primitive reads to the delegate.
These are optimized on the delegate but if they're not forwarded
then the delegate will be called e.g. 4 times to read an `int`.
This happens for essentially all network reads prior to this
change because they all run from a `NamedWritableAwareStreamInput`.

This also required optimising `BufferedChecksumStreamInput` individually to use bulk reads from the buffer because it implicitly assumed that the filter stream input  wouldn't override any of the bulk operations.
This commit is contained in:
Armin Braun 2020-02-17 13:07:20 +01:00 committed by GitHub
parent 81e47e9cab
commit cc628748e1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 39 additions and 0 deletions

View File

@ -45,6 +45,21 @@ public abstract class FilterStreamInput extends StreamInput {
delegate.readBytes(b, offset, len);
}
@Override
public short readShort() throws IOException {
return delegate.readShort();
}
@Override
public int readInt() throws IOException {
return delegate.readInt();
}
@Override
public long readLong() throws IOException {
return delegate.readLong();
}
@Override
public void reset() throws IOException {
delegate.reset();

View File

@ -70,6 +70,30 @@ public final class BufferedChecksumStreamInput extends FilterStreamInput {
digest.update(b, offset, len);
}
private static final ThreadLocal<byte[]> buffer = ThreadLocal.withInitial(() -> new byte[8]);
@Override
public short readShort() throws IOException {
final byte[] buf = buffer.get();
readBytes(buf, 0, 2);
return (short) (((buf[0] & 0xFF) << 8) | (buf[1] & 0xFF));
}
@Override
public int readInt() throws IOException {
final byte[] buf = buffer.get();
readBytes(buf, 0, 4);
return ((buf[0] & 0xFF) << 24) | ((buf[1] & 0xFF) << 16) | ((buf[2] & 0xFF) << 8) | (buf[3] & 0xFF);
}
@Override
public long readLong() throws IOException {
final byte[] buf = buffer.get();
readBytes(buf, 0, 8);
return (((long) (((buf[0] & 0xFF) << 24) | ((buf[1] & 0xFF) << 16) | ((buf[2] & 0xFF) << 8) | (buf[3] & 0xFF))) << 32)
| ((((buf[4] & 0xFF) << 24) | ((buf[5] & 0xFF) << 16) | ((buf[6] & 0xFF) << 8) | (buf[7] & 0xFF)) & 0xFFFFFFFFL);
}
@Override
public void reset() throws IOException {
delegate.reset();