Simplify BytesReference StreamInput (#61681) (#62014)

Flattening both streams into a single stream here saves a few objects and some indirection.
Also, removed the redundant `offset` field which added nothing but complexity by forcing the
incrementation of two counters on every read.
This commit is contained in:
Armin Braun 2020-09-05 10:45:52 +02:00 committed by GitHub
parent ab8f65a099
commit 1e3edbbe74
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 110 additions and 188 deletions

View File

@ -25,7 +25,6 @@ import org.elasticsearch.common.xcontent.XContentBuilder;
import java.io.EOFException; import java.io.EOFException;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream; import java.io.OutputStream;
import java.util.function.ToIntBiFunction; import java.util.function.ToIntBiFunction;
@ -51,7 +50,7 @@ public abstract class AbstractBytesReference implements BytesReference {
@Override @Override
public StreamInput streamInput() throws IOException { public StreamInput streamInput() throws IOException {
return new MarkSupportingStreamInputWrapper(this); return new BytesReferenceStreamInput();
} }
@Override @Override
@ -181,61 +180,139 @@ public abstract class AbstractBytesReference implements BytesReference {
ref.offset += length; ref.offset += length;
} }
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
BytesRef bytes = toBytesRef();
return builder.value(bytes.bytes, bytes.offset, bytes.length);
}
/** /**
* Instead of adding the complexity of {@link InputStream#reset()} etc to the actual impl * A StreamInput that reads off a {@link BytesRefIterator}. This is used to provide
* this wrapper builds it on top of the BytesReferenceStreamInput which is much simpler * generic stream access to {@link BytesReference} instances without materializing the
* that way. * underlying bytes reference.
*/ */
private static final class MarkSupportingStreamInputWrapper extends StreamInput { private final class BytesReferenceStreamInput extends StreamInput {
// can't use FilterStreamInput it needs to reset the delegate
private final BytesReference reference; private BytesRefIterator iterator;
private BytesReferenceStreamInput input; private int sliceIndex;
private BytesRef slice;
private int sliceStartOffset; // the offset on the stream at which the current slice starts
private int mark = 0; private int mark = 0;
private MarkSupportingStreamInputWrapper(BytesReference reference) throws IOException { BytesReferenceStreamInput() throws IOException {
this.reference = reference; this.iterator = iterator();
this.input = new BytesReferenceStreamInput(reference.iterator(), reference.length()); this.slice = iterator.next();
this.sliceStartOffset = 0;
this.sliceIndex = 0;
} }
@Override @Override
public byte readByte() throws IOException { public byte readByte() throws IOException {
return input.readByte(); if (offset() >= length()) {
throw new EOFException();
}
maybeNextSlice();
return slice.bytes[slice.offset + (sliceIndex++)];
}
private int offset() {
return sliceStartOffset + sliceIndex;
}
private void maybeNextSlice() throws IOException {
while (sliceIndex == slice.length) {
sliceStartOffset += sliceIndex;
slice = iterator.next();
sliceIndex = 0;
if (slice == null) {
throw new EOFException();
}
}
} }
@Override @Override
public void readBytes(byte[] b, int offset, int len) throws IOException { public void readBytes(byte[] b, int bOffset, int len) throws IOException {
input.readBytes(b, offset, len); final int length = length();
final int offset = offset();
if (offset + len > length) {
throw new IndexOutOfBoundsException(
"Cannot read " + len + " bytes from stream with length " + length + " at offset " + offset);
} }
read(b, bOffset, len);
@Override
public int read(byte[] b, int off, int len) throws IOException {
return input.read(b, off, len);
}
@Override
public void close() throws IOException {
input.close();
} }
@Override @Override
public int read() throws IOException { public int read() throws IOException {
return input.read(); if (offset() >= length()) {
return -1;
}
return Byte.toUnsignedInt(readByte());
} }
@Override @Override
public int available() throws IOException { public int read(final byte[] b, final int bOffset, final int len) throws IOException {
return input.available(); final int length = length();
final int offset = offset();
if (offset >= length) {
return -1;
}
final int numBytesToCopy = Math.min(len, length - offset);
int remaining = numBytesToCopy; // copy the full length or the remaining part
int destOffset = bOffset;
while (remaining > 0) {
maybeNextSlice();
final int currentLen = Math.min(remaining, slice.length - sliceIndex);
assert currentLen > 0 : "length has to be > 0 to make progress but was: " + currentLen;
System.arraycopy(slice.bytes, slice.offset + sliceIndex, b, destOffset, currentLen);
destOffset += currentLen;
remaining -= currentLen;
sliceIndex += currentLen;
assert remaining >= 0 : "remaining: " + remaining;
}
return numBytesToCopy;
} }
@Override @Override
protected void ensureCanReadBytes(int length) throws EOFException { public void close() {
input.ensureCanReadBytes(length); // do nothing
}
@Override
public int available() {
return length() - offset();
}
@Override
protected void ensureCanReadBytes(int bytesToRead) throws EOFException {
int bytesAvailable = length() - offset();
if (bytesAvailable < bytesToRead) {
throw new EOFException("tried to read: " + bytesToRead + " bytes but only " + bytesAvailable + " remaining");
}
}
@Override
public long skip(long n) throws IOException {
final int skip = (int) Math.min(Integer.MAX_VALUE, n);
final int numBytesSkipped = Math.min(skip, length() - offset());
int remaining = numBytesSkipped;
while (remaining > 0) {
maybeNextSlice();
int currentLen = Math.min(remaining, slice.length - sliceIndex);
remaining -= currentLen;
sliceIndex += currentLen;
assert remaining >= 0 : "remaining: " + remaining;
}
return numBytesSkipped;
} }
@Override @Override
public void reset() throws IOException { public void reset() throws IOException {
input = new BytesReferenceStreamInput(reference.iterator(), reference.length()); iterator = iterator();
input.skip(mark); slice = iterator.next();
sliceStartOffset = 0;
sliceIndex = 0;
skip(mark);
} }
@Override @Override
@ -247,18 +324,7 @@ public abstract class AbstractBytesReference implements BytesReference {
public void mark(int readLimit) { public void mark(int readLimit) {
// readLimit is optional it only guarantees that the stream remembers data upto this limit but it can remember more // readLimit is optional it only guarantees that the stream remembers data upto this limit but it can remember more
// which we do in our case // which we do in our case
this.mark = input.getOffset(); this.mark = offset();
}
@Override
public long skip(long n) throws IOException {
return input.skip(n);
} }
} }
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
BytesRef bytes = toBytesRef();
return builder.value(bytes.bytes, bytes.offset, bytes.length);
}
} }

View File

@ -1,144 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.common.bytes;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.BytesRefIterator;
import org.elasticsearch.common.io.stream.StreamInput;
import java.io.EOFException;
import java.io.IOException;
/**
* A StreamInput that reads off a {@link BytesRefIterator}. This is used to provide
* generic stream access to {@link BytesReference} instances without materializing the
* underlying bytes reference.
*/
final class BytesReferenceStreamInput extends StreamInput {
private final BytesRefIterator iterator;
private int sliceIndex;
private BytesRef slice;
private final int length; // the total size of the stream
private int offset; // the current position of the stream
BytesReferenceStreamInput(BytesRefIterator iterator, final int length) throws IOException {
this.iterator = iterator;
this.slice = iterator.next();
this.length = length;
this.offset = 0;
this.sliceIndex = 0;
}
@Override
public byte readByte() throws IOException {
if (offset >= length) {
throw new EOFException();
}
maybeNextSlice();
byte b = slice.bytes[slice.offset + (sliceIndex++)];
offset++;
return b;
}
private void maybeNextSlice() throws IOException {
while (sliceIndex == slice.length) {
slice = iterator.next();
sliceIndex = 0;
if (slice == null) {
throw new EOFException();
}
}
}
@Override
public void readBytes(byte[] b, int bOffset, int len) throws IOException {
if (offset + len > length) {
throw new IndexOutOfBoundsException("Cannot read " + len + " bytes from stream with length " + length + " at offset " + offset);
}
read(b, bOffset, len);
}
@Override
public int read() throws IOException {
if (offset >= length) {
return -1;
}
return Byte.toUnsignedInt(readByte());
}
@Override
public int read(final byte[] b, final int bOffset, final int len) throws IOException {
if (offset >= length) {
return -1;
}
final int numBytesToCopy = Math.min(len, length - offset);
int remaining = numBytesToCopy; // copy the full length or the remaining part
int destOffset = bOffset;
while (remaining > 0) {
maybeNextSlice();
final int currentLen = Math.min(remaining, slice.length - sliceIndex);
assert currentLen > 0 : "length has to be > 0 to make progress but was: " + currentLen;
System.arraycopy(slice.bytes, slice.offset + sliceIndex, b, destOffset, currentLen);
destOffset += currentLen;
remaining -= currentLen;
sliceIndex += currentLen;
offset += currentLen;
assert remaining >= 0 : "remaining: " + remaining;
}
return numBytesToCopy;
}
@Override
public void close() throws IOException {
// do nothing
}
@Override
public int available() throws IOException {
return length - offset;
}
@Override
protected void ensureCanReadBytes(int bytesToRead) throws EOFException {
int bytesAvailable = length - offset;
if (bytesAvailable < bytesToRead) {
throw new EOFException("tried to read: " + bytesToRead + " bytes but only " + bytesAvailable + " remaining");
}
}
@Override
public long skip(long n) throws IOException {
final int skip = (int) Math.min(Integer.MAX_VALUE, n);
final int numBytesSkipped = Math.min(skip, length - offset);
int remaining = numBytesSkipped;
while (remaining > 0) {
maybeNextSlice();
int currentLen = Math.min(remaining, slice.length - sliceIndex);
remaining -= currentLen;
sliceIndex += currentLen;
offset += currentLen;
assert remaining >= 0 : "remaining: " + remaining;
}
return numBytesSkipped;
}
int getOffset() {
return offset;
}
}