From 1e3edbbe745c27daae76eba5919d831a2864a038 Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Sat, 5 Sep 2020 10:45:52 +0200 Subject: [PATCH] Simplify BytesReference StreamInput (#61681) (#62014) Flattening both streams into a single stream here saves a few objects and some indirection. Also, removed the redundant `offset` field which added nothing but complexity by forcing the incrementation of two counters on every read. --- .../common/bytes/AbstractBytesReference.java | 154 +++++++++++++----- .../bytes/BytesReferenceStreamInput.java | 144 ---------------- 2 files changed, 110 insertions(+), 188 deletions(-) delete mode 100644 server/src/main/java/org/elasticsearch/common/bytes/BytesReferenceStreamInput.java diff --git a/server/src/main/java/org/elasticsearch/common/bytes/AbstractBytesReference.java b/server/src/main/java/org/elasticsearch/common/bytes/AbstractBytesReference.java index 8c9ef67a0b5..5ca0d06ad5a 100644 --- a/server/src/main/java/org/elasticsearch/common/bytes/AbstractBytesReference.java +++ b/server/src/main/java/org/elasticsearch/common/bytes/AbstractBytesReference.java @@ -25,7 +25,6 @@ import org.elasticsearch.common.xcontent.XContentBuilder; import java.io.EOFException; import java.io.IOException; -import java.io.InputStream; import java.io.OutputStream; import java.util.function.ToIntBiFunction; @@ -51,7 +50,7 @@ public abstract class AbstractBytesReference implements BytesReference { @Override public StreamInput streamInput() throws IOException { - return new MarkSupportingStreamInputWrapper(this); + return new BytesReferenceStreamInput(); } @Override @@ -181,61 +180,139 @@ public abstract class AbstractBytesReference implements BytesReference { ref.offset += length; } + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + BytesRef bytes = toBytesRef(); + return builder.value(bytes.bytes, bytes.offset, bytes.length); + } + /** - * Instead of adding the complexity of {@link InputStream#reset()} etc to the actual impl - * this wrapper builds it on top of the BytesReferenceStreamInput which is much simpler - * that way. + * A StreamInput that reads off a {@link BytesRefIterator}. This is used to provide + * generic stream access to {@link BytesReference} instances without materializing the + * underlying bytes reference. */ - private static final class MarkSupportingStreamInputWrapper extends StreamInput { - // can't use FilterStreamInput it needs to reset the delegate - private final BytesReference reference; - private BytesReferenceStreamInput input; + private final class BytesReferenceStreamInput extends StreamInput { + + private BytesRefIterator iterator; + private int sliceIndex; + private BytesRef slice; + private int sliceStartOffset; // the offset on the stream at which the current slice starts + private int mark = 0; - private MarkSupportingStreamInputWrapper(BytesReference reference) throws IOException { - this.reference = reference; - this.input = new BytesReferenceStreamInput(reference.iterator(), reference.length()); + BytesReferenceStreamInput() throws IOException { + this.iterator = iterator(); + this.slice = iterator.next(); + this.sliceStartOffset = 0; + this.sliceIndex = 0; } @Override public byte readByte() throws IOException { - return input.readByte(); + if (offset() >= length()) { + throw new EOFException(); + } + maybeNextSlice(); + return slice.bytes[slice.offset + (sliceIndex++)]; + } + + private int offset() { + return sliceStartOffset + sliceIndex; + } + + private void maybeNextSlice() throws IOException { + while (sliceIndex == slice.length) { + sliceStartOffset += sliceIndex; + slice = iterator.next(); + sliceIndex = 0; + if (slice == null) { + throw new EOFException(); + } + } } @Override - public void readBytes(byte[] b, int offset, int len) throws IOException { - input.readBytes(b, offset, len); - } - - @Override - public int read(byte[] b, int off, int len) throws IOException { - return input.read(b, off, len); - } - - @Override - public void close() throws IOException { - input.close(); + public void readBytes(byte[] b, int bOffset, int len) throws IOException { + final int length = length(); + final int offset = offset(); + if (offset + len > length) { + throw new IndexOutOfBoundsException( + "Cannot read " + len + " bytes from stream with length " + length + " at offset " + offset); + } + read(b, bOffset, len); } @Override public int read() throws IOException { - return input.read(); + if (offset() >= length()) { + return -1; + } + return Byte.toUnsignedInt(readByte()); } @Override - public int available() throws IOException { - return input.available(); + public int read(final byte[] b, final int bOffset, final int len) throws IOException { + final int length = length(); + final int offset = offset(); + if (offset >= length) { + return -1; + } + final int numBytesToCopy = Math.min(len, length - offset); + int remaining = numBytesToCopy; // copy the full length or the remaining part + int destOffset = bOffset; + while (remaining > 0) { + maybeNextSlice(); + final int currentLen = Math.min(remaining, slice.length - sliceIndex); + assert currentLen > 0 : "length has to be > 0 to make progress but was: " + currentLen; + System.arraycopy(slice.bytes, slice.offset + sliceIndex, b, destOffset, currentLen); + destOffset += currentLen; + remaining -= currentLen; + sliceIndex += currentLen; + assert remaining >= 0 : "remaining: " + remaining; + } + return numBytesToCopy; } @Override - protected void ensureCanReadBytes(int length) throws EOFException { - input.ensureCanReadBytes(length); + public void close() { + // do nothing + } + + @Override + public int available() { + return length() - offset(); + } + + @Override + protected void ensureCanReadBytes(int bytesToRead) throws EOFException { + int bytesAvailable = length() - offset(); + if (bytesAvailable < bytesToRead) { + throw new EOFException("tried to read: " + bytesToRead + " bytes but only " + bytesAvailable + " remaining"); + } + } + + @Override + public long skip(long n) throws IOException { + final int skip = (int) Math.min(Integer.MAX_VALUE, n); + final int numBytesSkipped = Math.min(skip, length() - offset()); + int remaining = numBytesSkipped; + while (remaining > 0) { + maybeNextSlice(); + int currentLen = Math.min(remaining, slice.length - sliceIndex); + remaining -= currentLen; + sliceIndex += currentLen; + assert remaining >= 0 : "remaining: " + remaining; + } + return numBytesSkipped; } @Override public void reset() throws IOException { - input = new BytesReferenceStreamInput(reference.iterator(), reference.length()); - input.skip(mark); + iterator = iterator(); + slice = iterator.next(); + sliceStartOffset = 0; + sliceIndex = 0; + skip(mark); } @Override @@ -247,18 +324,7 @@ public abstract class AbstractBytesReference implements BytesReference { public void mark(int readLimit) { // readLimit is optional it only guarantees that the stream remembers data upto this limit but it can remember more // which we do in our case - this.mark = input.getOffset(); + this.mark = offset(); } - - @Override - public long skip(long n) throws IOException { - return input.skip(n); - } - } - - @Override - public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { - BytesRef bytes = toBytesRef(); - return builder.value(bytes.bytes, bytes.offset, bytes.length); } } diff --git a/server/src/main/java/org/elasticsearch/common/bytes/BytesReferenceStreamInput.java b/server/src/main/java/org/elasticsearch/common/bytes/BytesReferenceStreamInput.java deleted file mode 100644 index f7f1cdc6501..00000000000 --- a/server/src/main/java/org/elasticsearch/common/bytes/BytesReferenceStreamInput.java +++ /dev/null @@ -1,144 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.elasticsearch.common.bytes; - -import org.apache.lucene.util.BytesRef; -import org.apache.lucene.util.BytesRefIterator; -import org.elasticsearch.common.io.stream.StreamInput; - -import java.io.EOFException; -import java.io.IOException; - -/** - * A StreamInput that reads off a {@link BytesRefIterator}. This is used to provide - * generic stream access to {@link BytesReference} instances without materializing the - * underlying bytes reference. - */ -final class BytesReferenceStreamInput extends StreamInput { - private final BytesRefIterator iterator; - private int sliceIndex; - private BytesRef slice; - private final int length; // the total size of the stream - private int offset; // the current position of the stream - - BytesReferenceStreamInput(BytesRefIterator iterator, final int length) throws IOException { - this.iterator = iterator; - this.slice = iterator.next(); - this.length = length; - this.offset = 0; - this.sliceIndex = 0; - } - - @Override - public byte readByte() throws IOException { - if (offset >= length) { - throw new EOFException(); - } - maybeNextSlice(); - byte b = slice.bytes[slice.offset + (sliceIndex++)]; - offset++; - return b; - } - - private void maybeNextSlice() throws IOException { - while (sliceIndex == slice.length) { - slice = iterator.next(); - sliceIndex = 0; - if (slice == null) { - throw new EOFException(); - } - } - } - - @Override - public void readBytes(byte[] b, int bOffset, int len) throws IOException { - if (offset + len > length) { - throw new IndexOutOfBoundsException("Cannot read " + len + " bytes from stream with length " + length + " at offset " + offset); - } - read(b, bOffset, len); - } - - @Override - public int read() throws IOException { - if (offset >= length) { - return -1; - } - return Byte.toUnsignedInt(readByte()); - } - - @Override - public int read(final byte[] b, final int bOffset, final int len) throws IOException { - if (offset >= length) { - return -1; - } - final int numBytesToCopy = Math.min(len, length - offset); - int remaining = numBytesToCopy; // copy the full length or the remaining part - int destOffset = bOffset; - while (remaining > 0) { - maybeNextSlice(); - final int currentLen = Math.min(remaining, slice.length - sliceIndex); - assert currentLen > 0 : "length has to be > 0 to make progress but was: " + currentLen; - System.arraycopy(slice.bytes, slice.offset + sliceIndex, b, destOffset, currentLen); - destOffset += currentLen; - remaining -= currentLen; - sliceIndex += currentLen; - offset += currentLen; - assert remaining >= 0 : "remaining: " + remaining; - } - return numBytesToCopy; - } - - @Override - public void close() throws IOException { - // do nothing - } - - @Override - public int available() throws IOException { - return length - offset; - } - - @Override - protected void ensureCanReadBytes(int bytesToRead) throws EOFException { - int bytesAvailable = length - offset; - if (bytesAvailable < bytesToRead) { - throw new EOFException("tried to read: " + bytesToRead + " bytes but only " + bytesAvailable + " remaining"); - } - } - - @Override - public long skip(long n) throws IOException { - final int skip = (int) Math.min(Integer.MAX_VALUE, n); - final int numBytesSkipped = Math.min(skip, length - offset); - int remaining = numBytesSkipped; - while (remaining > 0) { - maybeNextSlice(); - int currentLen = Math.min(remaining, slice.length - sliceIndex); - remaining -= currentLen; - sliceIndex += currentLen; - offset += currentLen; - assert remaining >= 0 : "remaining: " + remaining; - } - return numBytesSkipped; - } - - int getOffset() { - return offset; - } -}