Add readBytes method to RandomAccessInput (#12600)

Adds a new method to RandomAccessInput to bulk read bytes into a provided byte array.
This commit is contained in:
Ignacio Vera 2023-10-04 16:23:42 +02:00 committed by GitHub
parent 96052891e6
commit c4694c31fd
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 314 additions and 13 deletions

View File

@ -141,6 +141,8 @@ API Changes
* GITHUB#12592: Add RandomAccessInput#length method to the RandomAccessInput interface. In addition deprecate
ByteBuffersDataInput#size in favour of this new method. (Ignacio Vera)
* GITHUB#12599: Add RandomAccessInput#readBytes method to the RandomAccessInput interface. (Ignacio Vera)
New Features
---------------------
(No changes)

View File

@ -248,6 +248,27 @@ public abstract class BufferedIndexInput extends IndexInput implements RandomAcc
return buffer.get((int) index);
}
/**
 * Reads {@code len} bytes starting at {@code pos} into {@code bytes} at {@code offset}, going
 * through the internal buffer in pieces of at most {@code bufferSize} bytes.
 *
 * <p>When {@code len} is not positive nothing is read and {@code bytes} is never touched, so it
 * may be {@code null} in that case.
 */
@Override
public void readBytes(long pos, byte[] bytes, int offset, int len) throws IOException {
  while (len > 0) {
    // never ask the buffer for more than it can hold in one go
    final int chunk = Math.min(len, bufferSize);
    final long index = resolvePositionInBuffer(pos, chunk);
    buffer.get((int) index, bytes, offset, chunk);
    pos += chunk;
    offset += chunk;
    len -= chunk;
  }
}
@Override
public final short readShort(long pos) throws IOException {
long index = resolvePositionInBuffer(pos, Short.BYTES);

View File

@ -85,6 +85,11 @@ final class ByteBufferGuard {
}
}
/**
 * Copies {@code length} bytes from {@code receiver}, starting at buffer index {@code pos}, into
 * {@code dst} beginning at {@code offset}. {@code ensureValid()} is invoked before the buffer is
 * accessed.
 */
public void getBytes(ByteBuffer receiver, int pos, byte[] dst, int offset, int length) {
// NOTE(review): presumably rejects access once the guard has been invalidated - confirm
ensureValid();
// index-based bulk get: reads at the given buffer index rather than at the buffer's position
receiver.get(pos, dst, offset, length);
}
public void getBytes(ByteBuffer receiver, byte[] dst, int offset, int length) {
ensureValid();
receiver.get(dst, offset, length);

View File

@ -346,6 +346,29 @@ public abstract class ByteBufferIndexInput extends IndexInput implements RandomA
}
}
/**
 * Reads {@code len} bytes starting at {@code pos} into {@code bytes} at {@code offset}, walking
 * over as many consecutive buffers as needed.
 *
 * @throws EOFException if the request runs past the last buffer
 */
@Override
public void readBytes(long pos, byte[] bytes, int offset, int len) throws IOException {
  int bufferIndex = (int) (pos >> chunkSizePower);
  int startInBuffer = (int) (pos & chunkSizeMask);
  try {
    // bytes that can be served from the current buffer, capped by what is still requested
    int chunk = Math.min(buffers[bufferIndex].capacity() - startInBuffer, len);
    while (chunk < len) {
      guard.getBytes(buffers[bufferIndex], startInBuffer, bytes, offset, chunk);
      offset += chunk;
      len -= chunk;
      if (++bufferIndex >= buffers.length) {
        throw new EOFException("read past EOF: " + this);
      }
      // every buffer after the first one is read from its beginning
      startInBuffer = 0;
      chunk = Math.min(len, buffers[bufferIndex].capacity());
    }
    // the remainder fits into the current buffer
    guard.getBytes(buffers[bufferIndex], startInBuffer, bytes, offset, chunk);
  } catch (NullPointerException npe) {
    throw alreadyClosed(npe);
  }
}
@Override
public short readShort(long pos) throws IOException {
final int bi = (int) (pos >> chunkSizePower);
@ -569,6 +592,17 @@ public abstract class ByteBufferIndexInput extends IndexInput implements RandomA
}
}
/**
 * Reads {@code len} bytes starting at {@code pos} from the single backing buffer into {@code
 * bytes} at {@code offset}.
 *
 * <p>The underlying bulk get addresses the buffer by index and reports an out-of-range request
 * with {@link IndexOutOfBoundsException}, so that type is handed to {@code
 * handlePositionalIOOBE} together with the requested position. {@link IllegalArgumentException}
 * is still handled the same way for backward compatibility.
 */
@Override
public void readBytes(long pos, byte[] bytes, int offset, int len) throws IOException {
  try {
    guard.getBytes(curBuf, (int) pos, bytes, offset, len);
  } catch (IndexOutOfBoundsException | IllegalArgumentException e) {
    throw handlePositionalIOOBE(e, "read", pos);
  } catch (NullPointerException e) {
    // NOTE(review): presumably curBuf is null once the input has been closed - confirm
    throw alreadyClosed(e);
  }
}
@Override
public short readShort(long pos) throws IOException {
try {
@ -645,6 +679,11 @@ public abstract class ByteBufferIndexInput extends IndexInput implements RandomA
return super.readByte(pos + offset);
}
/**
 * Shifts {@code pos} by this instance's {@code offset} field and delegates to the parent
 * implementation. The {@code offset} parameter (destination array index) shadows the field,
 * hence the explicit {@code this.offset}.
 */
@Override
public void readBytes(long pos, byte[] bytes, int offset, int len) throws IOException {
  final long shiftedPos = pos + this.offset;
  super.readBytes(shiftedPos, bytes, offset, len);
}
@Override
public short readShort(long pos) throws IOException {
return super.readShort(pos + offset);

View File

@ -223,6 +223,34 @@ public final class ByteBuffersDataInput extends DataInput
return blocks[blockIndex(pos)].get(blockOffset(pos));
}
/**
 * Reads {@code len} bytes starting at {@code pos} into {@code bytes} at {@code offset}, copying
 * block by block.
 *
 * @throws EOFException if a block has no bytes left at the computed position, or if a buffer or
 *     index error occurs once the absolute position has reached {@code length()}
 */
@Override
public void readBytes(long pos, byte[] bytes, int offset, int len) throws IOException {
  long absPos = this.offset + pos;
  try {
    while (len > 0) {
      ByteBuffer block = blocks[blockIndex(absPos)];
      int blockPosition = blockOffset(absPos);
      int chunk = Math.min(len, block.capacity() - blockPosition);
      // chunk is negative when blockPosition lies beyond the block's capacity; treat that the
      // same as an exhausted block instead of passing a negative length to the bulk get.
      if (chunk <= 0) {
        throw new EOFException();
      }

      block.get(blockPosition, bytes, offset, chunk);

      // Advance only after the copy succeeded.
      absPos += chunk;
      len -= chunk;
      offset += chunk;
    }
  } catch (BufferUnderflowException | IndexOutOfBoundsException e) {
    // The index-based bulk get reports violations with IndexOutOfBoundsException, of which
    // ArrayIndexOutOfBoundsException (bad block index) is a subclass.
    if (absPos >= length()) {
      throw new EOFException();
    } else {
      throw e; // Something is wrong.
    }
  }
}
@Override
public short readShort(long pos) {
long absPos = offset + pos;

View File

@ -169,6 +169,12 @@ public final class ByteBuffersIndexInput extends IndexInput implements RandomAcc
return in.readByte(pos);
}
/**
 * Reads {@code length} bytes starting at {@code pos} into {@code bytes} at {@code offset} by
 * delegating to the wrapped input {@code in}, after calling {@code ensureOpen()}.
 */
@Override
public void readBytes(long pos, byte[] bytes, int offset, int length) throws IOException {
// NOTE(review): presumably throws if this input was already closed - confirm
ensureOpen();
in.readBytes(pos, bytes, offset, length);
}
@Override
public short readShort(long pos) throws IOException {
ensureOpen();

View File

@ -160,6 +160,12 @@ public abstract class IndexInput extends DataInput implements Closeable {
return slice.readByte();
}
/**
 * Emulates a positional bulk read on {@code slice}: seeks to {@code pos}, then reads {@code
 * length} bytes sequentially into {@code bytes} at {@code offset}.
 */
@Override
public void readBytes(long pos, byte[] bytes, int offset, int length) throws IOException {
// the seek moves the slice's own file pointer; the read below continues from there
slice.seek(pos);
slice.readBytes(bytes, offset, length);
}
@Override
public short readShort(long pos) throws IOException {
slice.seek(pos);

View File

@ -34,6 +34,19 @@ public interface RandomAccessInput {
* @see DataInput#readByte
*/
public byte readByte(long pos) throws IOException;
/**
 * Reads {@code length} bytes starting at position {@code pos} and stores them in {@code bytes},
 * beginning at index {@code offset}.
 *
 * <p>This default implementation fetches the bytes one at a time through {@link
 * #readByte(long)}.
 *
 * @see DataInput#readBytes
 */
default void readBytes(long pos, byte[] bytes, int offset, int length) throws IOException {
  int copied = 0;
  while (copied < length) {
    bytes[offset + copied] = readByte(pos + copied);
    copied++;
  }
}
/**
* Reads a short (LE byte order) at the given position in the file
*

View File

@ -292,6 +292,31 @@ abstract class MemorySegmentIndexInput extends IndexInput implements RandomAcces
}
}
/**
 * Reads {@code len} bytes starting at {@code pos} into {@code b} at {@code offset}, continuing
 * into following segments when the request crosses a segment boundary.
 *
 * <p>The caller-supplied {@code pos} is left unmodified so that an out-of-bounds failure is
 * reported with the position that was actually requested, not with an in-segment offset.
 *
 * @throws EOFException if the request runs past the last segment
 */
@Override
public void readBytes(long pos, byte[] b, int offset, int len) throws IOException {
  try {
    int si = (int) (pos >> chunkSizePower);
    // offset inside the current segment; only the first segment starts at a non-zero offset
    long segmentPos = pos & chunkSizeMask;
    long curAvail = segments[si].byteSize() - segmentPos;
    while (len > curAvail) {
      MemorySegment.copy(segments[si], LAYOUT_BYTE, segmentPos, b, offset, (int) curAvail);
      len -= curAvail;
      offset += curAvail;
      si++;
      if (si >= segments.length) {
        throw new EOFException("read past EOF: " + this);
      }
      segmentPos = 0L;
      curAvail = segments[si].byteSize();
    }
    MemorySegment.copy(segments[si], LAYOUT_BYTE, segmentPos, b, offset, len);
  } catch (IndexOutOfBoundsException ioobe) {
    throw handlePositionalIOOBE(ioobe, "read", pos);
  } catch (NullPointerException | IllegalStateException e) {
    // NOTE(review): presumably raised when the input was closed - confirm
    throw alreadyClosed(e);
  }
}
// used only by random access methods to handle reads across boundaries
private void setPos(long pos, int si) throws IOException {
try {
@ -490,6 +515,17 @@ abstract class MemorySegmentIndexInput extends IndexInput implements RandomAcces
}
}
/**
 * Copies {@code length} bytes from {@code curSegment}, starting at byte offset {@code pos}, into
 * {@code bytes} beginning at index {@code offset}.
 */
@Override
public void readBytes(long pos, byte[] bytes, int offset, int length) throws IOException {
try {
MemorySegment.copy(curSegment, LAYOUT_BYTE, pos, bytes, offset, length);
} catch (IndexOutOfBoundsException e) {
// out-of-range access is passed to handlePositionalIOOBE along with the requested position
throw handlePositionalIOOBE(e, "read", pos);
} catch (NullPointerException | IllegalStateException e) {
// NOTE(review): presumably raised when the input was closed - confirm
throw alreadyClosed(e);
}
}
@Override
public short readShort(long pos) throws IOException {
try {
@ -567,6 +603,11 @@ abstract class MemorySegmentIndexInput extends IndexInput implements RandomAcces
return super.readByte(pos + offset);
}
/**
 * Shifts {@code pos} by this instance's {@code offset} field and delegates to the parent
 * implementation. The {@code offset} parameter (destination array index) shadows the field,
 * hence the explicit {@code this.offset}.
 */
@Override
public void readBytes(long pos, byte[] bytes, int offset, int length) throws IOException {
  final long shiftedPos = pos + this.offset;
  super.readBytes(shiftedPos, bytes, offset, length);
}
@Override
public short readShort(long pos) throws IOException {
return super.readShort(pos + offset);

View File

@ -290,6 +290,31 @@ abstract class MemorySegmentIndexInput extends IndexInput implements RandomAcces
}
}
/**
 * Reads {@code len} bytes starting at {@code pos} into {@code b} at {@code offset}, continuing
 * into following segments when the request crosses a segment boundary.
 *
 * <p>The caller-supplied {@code pos} is left unmodified so that an out-of-bounds failure is
 * reported with the position that was actually requested, not with an in-segment offset.
 *
 * @throws EOFException if the request runs past the last segment
 */
@Override
public void readBytes(long pos, byte[] b, int offset, int len) throws IOException {
  try {
    int si = (int) (pos >> chunkSizePower);
    // offset inside the current segment; only the first segment starts at a non-zero offset
    long segmentPos = pos & chunkSizeMask;
    long curAvail = segments[si].byteSize() - segmentPos;
    while (len > curAvail) {
      MemorySegment.copy(segments[si], LAYOUT_BYTE, segmentPos, b, offset, (int) curAvail);
      len -= curAvail;
      offset += curAvail;
      si++;
      if (si >= segments.length) {
        throw new EOFException("read past EOF: " + this);
      }
      segmentPos = 0L;
      curAvail = segments[si].byteSize();
    }
    MemorySegment.copy(segments[si], LAYOUT_BYTE, segmentPos, b, offset, len);
  } catch (IndexOutOfBoundsException ioobe) {
    throw handlePositionalIOOBE(ioobe, "read", pos);
  } catch (NullPointerException | IllegalStateException e) {
    // NOTE(review): presumably raised when the input was closed - confirm
    throw alreadyClosed(e);
  }
}
// used only by random access methods to handle reads across boundaries
private void setPos(long pos, int si) throws IOException {
try {
@ -488,6 +513,17 @@ abstract class MemorySegmentIndexInput extends IndexInput implements RandomAcces
}
}
/**
 * Copies {@code length} bytes from {@code curSegment}, starting at byte offset {@code pos}, into
 * {@code bytes} beginning at index {@code offset}.
 */
@Override
public void readBytes(long pos, byte[] bytes, int offset, int length) throws IOException {
try {
MemorySegment.copy(curSegment, LAYOUT_BYTE, pos, bytes, offset, length);
} catch (IndexOutOfBoundsException e) {
// out-of-range access is passed to handlePositionalIOOBE along with the requested position
throw handlePositionalIOOBE(e, "read", pos);
} catch (NullPointerException | IllegalStateException e) {
// NOTE(review): presumably raised when the input was closed - confirm
throw alreadyClosed(e);
}
}
@Override
public short readShort(long pos) throws IOException {
try {
@ -565,6 +601,11 @@ abstract class MemorySegmentIndexInput extends IndexInput implements RandomAcces
return super.readByte(pos + offset);
}
/**
 * Shifts {@code pos} by this instance's {@code offset} field and delegates to the parent
 * implementation. The {@code offset} parameter (destination array index) shadows the field,
 * hence the explicit {@code this.offset}.
 */
@Override
public void readBytes(long pos, byte[] bytes, int offset, int length) throws IOException {
  final long shiftedPos = pos + this.offset;
  super.readBytes(shiftedPos, bytes, offset, length);
}
@Override
public short readShort(long pos) throws IOException {
return super.readShort(pos + offset);

View File

@ -290,6 +290,31 @@ abstract class MemorySegmentIndexInput extends IndexInput implements RandomAcces
}
}
/**
 * Reads {@code len} bytes starting at {@code pos} into {@code b} at {@code offset}, continuing
 * into following segments when the request crosses a segment boundary.
 *
 * <p>The caller-supplied {@code pos} is left unmodified so that an out-of-bounds failure is
 * reported with the position that was actually requested, not with an in-segment offset.
 *
 * @throws EOFException if the request runs past the last segment
 */
@Override
public void readBytes(long pos, byte[] b, int offset, int len) throws IOException {
  try {
    int si = (int) (pos >> chunkSizePower);
    // offset inside the current segment; only the first segment starts at a non-zero offset
    long segmentPos = pos & chunkSizeMask;
    long curAvail = segments[si].byteSize() - segmentPos;
    while (len > curAvail) {
      MemorySegment.copy(segments[si], LAYOUT_BYTE, segmentPos, b, offset, (int) curAvail);
      len -= curAvail;
      offset += curAvail;
      si++;
      if (si >= segments.length) {
        throw new EOFException("read past EOF: " + this);
      }
      segmentPos = 0L;
      curAvail = segments[si].byteSize();
    }
    MemorySegment.copy(segments[si], LAYOUT_BYTE, segmentPos, b, offset, len);
  } catch (IndexOutOfBoundsException ioobe) {
    throw handlePositionalIOOBE(ioobe, "read", pos);
  } catch (NullPointerException | IllegalStateException e) {
    // NOTE(review): presumably raised when the input was closed - confirm
    throw alreadyClosed(e);
  }
}
// used only by random access methods to handle reads across boundaries
private void setPos(long pos, int si) throws IOException {
try {
@ -488,6 +513,17 @@ abstract class MemorySegmentIndexInput extends IndexInput implements RandomAcces
}
}
/**
 * Copies {@code length} bytes from {@code curSegment}, starting at byte offset {@code pos}, into
 * {@code bytes} beginning at index {@code offset}.
 */
@Override
public void readBytes(long pos, byte[] bytes, int offset, int length) throws IOException {
try {
MemorySegment.copy(curSegment, LAYOUT_BYTE, pos, bytes, offset, length);
} catch (IndexOutOfBoundsException e) {
// out-of-range access is passed to handlePositionalIOOBE along with the requested position
throw handlePositionalIOOBE(e, "read", pos);
} catch (NullPointerException | IllegalStateException e) {
// NOTE(review): presumably raised when the input was closed - confirm
throw alreadyClosed(e);
}
}
@Override
public short readShort(long pos) throws IOException {
try {
@ -565,6 +601,11 @@ abstract class MemorySegmentIndexInput extends IndexInput implements RandomAcces
return super.readByte(pos + offset);
}
/**
 * Shifts {@code pos} by this instance's {@code offset} field and delegates to the parent
 * implementation. The {@code offset} parameter (destination array index) shadows the field,
 * hence the explicit {@code this.offset}.
 */
@Override
public void readBytes(long pos, byte[] bytes, int offset, int length) throws IOException {
  final long shiftedPos = pos + this.offset;
  super.readBytes(shiftedPos, bytes, offset, length);
}
@Override
public short readShort(long pos) throws IOException {
return super.readShort(pos + offset);

View File

@ -28,6 +28,7 @@ import org.apache.lucene.store.Directory;
import org.apache.lucene.store.IOContext;
import org.apache.lucene.store.IndexInput;
import org.apache.lucene.store.IndexOutput;
import org.apache.lucene.store.RandomAccessInput;
import org.apache.lucene.tests.analysis.MockAnalyzer;
import org.apache.lucene.tests.index.RandomIndexWriter;
import org.apache.lucene.tests.util.TestUtil;
@ -297,6 +298,47 @@ public abstract class BaseChunkedDirectoryTestCase extends BaseDirectoryTestCase
dir.close();
}
/**
 * Writes random bytes to a directory obtained via {@code getDirectory(path, 16)} and checks that
 * random-access reads return them, for the whole file, for every sub-slice start offset, and for
 * copies of the file prefixed with 1 to 6 junk bytes.
 *
 * <p>All outputs and inputs are managed with try-with-resources so that they are closed even
 * when an assertion or I/O call fails part-way through.
 */
public void testBytesCrossBoundary() throws Exception {
  int num =
      TEST_NIGHTLY ? TestUtil.nextInt(random(), 100, 1000) : TestUtil.nextInt(random(), 50, 100);
  byte[] bytes = new byte[num];
  random().nextBytes(bytes);
  // NOTE(review): 16 is presumably the chunk size, small enough to force boundary crossings
  try (Directory dir = getDirectory(createTempDir("testBytesCrossBoundary"), 16)) {
    try (IndexOutput out = dir.createOutput("bytesCrossBoundary", newIOContext(random()))) {
      out.writeBytes(bytes, bytes.length);
    }
    try (IndexInput input = dir.openInput("bytesCrossBoundary", newIOContext(random()))) {
      RandomAccessInput slice = input.randomAccessSlice(0, input.length());
      assertEquals(input.length(), slice.length());
      assertBytes(slice, bytes, 0);

      // subslices
      for (int offset = 1; offset < bytes.length; offset++) {
        RandomAccessInput subslice = input.randomAccessSlice(offset, input.length() - offset);
        assertEquals(input.length() - offset, subslice.length());
        assertBytes(subslice, bytes, offset);
      }

      // with padding
      for (int i = 1; i < 7; i++) {
        String name = "bytes-" + i;
        try (IndexOutput o = dir.createOutput(name, newIOContext(random()))) {
          byte[] junk = new byte[i];
          random().nextBytes(junk);
          o.writeBytes(junk, junk.length);
          input.seek(0);
          o.copyBytes(input, input.length());
        }
        try (IndexInput padded = dir.openInput(name, newIOContext(random()))) {
          // skip the junk prefix so the slice lines up with the original bytes
          RandomAccessInput whole = padded.randomAccessSlice(i, padded.length() - i);
          assertEquals(padded.length() - i, whole.length());
          assertBytes(whole, bytes, 0);
        }
      }
    }
  }
}
public void testLittleEndianLongsCrossBoundary() throws Exception {
try (Directory dir = getDirectory(createTempDir("testLittleEndianLongsCrossBoundary"), 16)) {
try (IndexOutput out = dir.createOutput("littleEndianLongs", newIOContext(random()))) {

View File

@ -56,6 +56,7 @@ import org.apache.lucene.store.RandomAccessInput;
import org.apache.lucene.tests.mockfile.ExtrasFS;
import org.apache.lucene.tests.util.LuceneTestCase;
import org.apache.lucene.tests.util.TestUtil;
import org.apache.lucene.util.ArrayUtil;
import org.apache.lucene.util.IOUtils;
import org.junit.Assert;
@ -1098,7 +1099,10 @@ public abstract class BaseDirectoryTestCase extends LuceneTestCase {
public void testRandomByte() throws Exception {
try (Directory dir = getDirectory(createTempDir("testBytes"))) {
IndexOutput output = dir.createOutput("bytes", newIOContext(random()));
int num = TestUtil.nextInt(random(), 50, 3000);
int num =
TEST_NIGHTLY
? TestUtil.nextInt(random(), 1000, 3000)
: TestUtil.nextInt(random(), 50, 1000);
byte[] bytes = new byte[num];
random().nextBytes(bytes);
for (int i = 0; i < bytes.length; i++) {
@ -1110,22 +1114,17 @@ public abstract class BaseDirectoryTestCase extends LuceneTestCase {
IndexInput input = dir.openInput("bytes", newIOContext(random()));
RandomAccessInput slice = input.randomAccessSlice(0, input.length());
assertEquals(input.length(), slice.length());
for (int i = 0; i < bytes.length; i++) {
assertEquals(bytes[i], slice.readByte(i));
}
assertBytes(slice, bytes, 0);
// subslices
for (int i = 1; i < bytes.length; i++) {
long offset = i;
for (int offset = 1; offset < bytes.length; offset++) {
RandomAccessInput subslice = input.randomAccessSlice(offset, input.length() - offset);
assertEquals(input.length() - offset, subslice.length());
for (int j = i; j < bytes.length; j++) {
assertEquals(bytes[j], subslice.readByte(j - i));
}
assertBytes(subslice, bytes, offset);
}
// with padding
for (int i = 0; i < 7; i++) {
for (int i = 1; i < 7; i++) {
String name = "bytes-" + i;
IndexOutput o = dir.createOutput(name, newIOContext(random()));
byte[] junk = new byte[i];
@ -1137,15 +1136,32 @@ public abstract class BaseDirectoryTestCase extends LuceneTestCase {
IndexInput padded = dir.openInput(name, newIOContext(random()));
RandomAccessInput whole = padded.randomAccessSlice(i, padded.length() - i);
assertEquals(padded.length() - i, whole.length());
for (int j = 0; j < bytes.length; j++) {
assertEquals(bytes[j], whole.readByte(j));
}
assertBytes(whole, bytes, 0);
padded.close();
}
input.close();
}
}
/**
 * Verifies that {@code slice} exposes the content of {@code bytes} from {@code bytesOffset}
 * onwards. For every index {@code i} of the slice this checks the single byte at {@code i}, a
 * bulk read of the first {@code i} bytes, and a bulk read of everything from {@code i} to the
 * end. Each bulk read targets an array with a random amount of leading padding.
 */
protected void assertBytes(RandomAccessInput slice, byte[] bytes, int bytesOffset)
    throws IOException {
  final int remaining = bytes.length - bytesOffset;
  for (int split = 0; split < remaining; split++) {
    // single-byte random access
    assertEquals(bytes[bytesOffset + split], slice.readByte(split));

    // destination offset shared by both bulk reads of this iteration
    final int pad = random().nextInt(1000);

    // bulk read of [0, split)
    final byte[] head = new byte[pad + split];
    slice.readBytes(0, head, pad, split);
    final byte[] expectedHead =
        ArrayUtil.copyOfSubArray(bytes, bytesOffset, bytesOffset + split);
    assertArrayEquals(expectedHead, ArrayUtil.copyOfSubArray(head, pad, head.length));

    // bulk read of [split, remaining)
    final int tailLength = remaining - split;
    final byte[] tail = new byte[pad + tailLength];
    slice.readBytes(split, tail, pad, tailLength);
    final byte[] expectedTail =
        ArrayUtil.copyOfSubArray(bytes, bytesOffset + split, bytes.length);
    assertArrayEquals(expectedTail, ArrayUtil.copyOfSubArray(tail, pad, tail.length));
  }
}
/** try to stress slices of slices */
public void testSliceOfSlice() throws Exception {
try (Directory dir = getDirectory(createTempDir("sliceOfSlice"))) {