mirror of https://github.com/apache/lucene.git
LUCENE-9794: Optimize skipBytes implementation in remaining DataInput subclasses
Fix various DataInputs to no longer use skipBytesSlowly, add new tests.
This commit is contained in:
parent
eba0e25535
commit
84a35dfaea
|
@ -194,6 +194,8 @@ Improvements
|
|||
* LUCENE-9663: Adding compression to terms dict from SortedSet/Sorted DocValues.
|
||||
(Jaison Bi via Bruno Roustant)
|
||||
|
||||
* LUCENE-9794: Speed up implementations of DataInput.skipBytes(). (Greg Miller)
|
||||
|
||||
Bug fixes
|
||||
|
||||
|
||||
|
|
|
@ -654,7 +654,15 @@ public final class CompressingStoredFieldsReader extends StoredFieldsReader {
|
|||
|
||||
@Override
|
||||
public void skipBytes(long numBytes) throws IOException {
|
||||
skipBytesSlowly(numBytes);
|
||||
if (numBytes < 0) {
|
||||
throw new IllegalArgumentException("numBytes must be >= 0, got " + numBytes);
|
||||
}
|
||||
while (numBytes > bytes.length) {
|
||||
numBytes -= bytes.length;
|
||||
fillBuffer();
|
||||
}
|
||||
bytes.offset += numBytes;
|
||||
bytes.length -= numBytes;
|
||||
}
|
||||
};
|
||||
} else {
|
||||
|
|
|
@ -140,7 +140,19 @@ final class ByteSliceReader extends DataInput {
|
|||
}
|
||||
|
||||
@Override
|
||||
public void skipBytes(long numBytes) throws IOException {
|
||||
skipBytesSlowly(numBytes);
|
||||
public void skipBytes(long numBytes) {
|
||||
if (numBytes < 0) {
|
||||
throw new IllegalArgumentException("numBytes must be >= 0, got " + numBytes);
|
||||
}
|
||||
while (numBytes > 0) {
|
||||
final int numLeft = limit - upto;
|
||||
if (numLeft < numBytes) {
|
||||
numBytes -= numLeft;
|
||||
nextSlice();
|
||||
} else {
|
||||
upto += numBytes;
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -261,7 +261,11 @@ public final class ByteBuffersDataInput extends DataInput
|
|||
|
||||
@Override
|
||||
public void skipBytes(long numBytes) throws IOException {
|
||||
skipBytesSlowly(numBytes);
|
||||
if (numBytes < 0) {
|
||||
throw new IllegalArgumentException("numBytes must be >= 0, got " + numBytes);
|
||||
}
|
||||
long skipTo = position() + numBytes;
|
||||
seek(skipTo);
|
||||
}
|
||||
|
||||
public ByteBuffersDataInput slice(long offset, long length) {
|
||||
|
|
|
@ -24,6 +24,17 @@ import java.io.IOException;
|
|||
*/
|
||||
public abstract class ChecksumIndexInput extends IndexInput {
|
||||
|
||||
private static final int SKIP_BUFFER_SIZE = 1024;
|
||||
|
||||
/* This buffer is used when skipping bytes in skipBytes(). Skipping bytes
|
||||
* still requires reading in the bytes we skip in order to update the checksum.
|
||||
* The reason we need to use an instance member instead of sharing a single
|
||||
* static instance across threads is that multiple instances invoking skipBytes()
|
||||
* concurrently on different threads can clobber the contents of a shared buffer,
|
||||
* corrupting the checksum. See LUCENE-5583 for additional context.
|
||||
*/
|
||||
private byte[] skipBuffer;
|
||||
|
||||
/**
|
||||
* resourceDescription should be a non-null, opaque string describing this resource; it's returned
|
||||
* from {@link #toString}.
|
||||
|
@ -50,10 +61,23 @@ public abstract class ChecksumIndexInput extends IndexInput {
|
|||
throw new IllegalStateException(
|
||||
getClass() + " cannot seek backwards (pos=" + pos + " getFilePointer()=" + curFP + ")");
|
||||
}
|
||||
// we must skip slowly to ensure skipped bytes are still read and used
|
||||
// to update checksums
|
||||
// TODO: this "slow skip" logic should be moved into this class once
|
||||
// no longer needed as default logic in DataInput
|
||||
skipBytesSlowly(skip);
|
||||
skipByReading(skip);
|
||||
}
|
||||
|
||||
/**
|
||||
* Skip over <code>numBytes</code> bytes. The contract on this method is that it should have the
|
||||
* same behavior as reading the same number of bytes into a buffer and discarding its content.
|
||||
* Negative values of <code>numBytes</code> are not supported.
|
||||
*/
|
||||
private void skipByReading(long numBytes) throws IOException {
|
||||
if (skipBuffer == null) {
|
||||
skipBuffer = new byte[SKIP_BUFFER_SIZE];
|
||||
}
|
||||
assert skipBuffer.length == SKIP_BUFFER_SIZE;
|
||||
for (long skipped = 0; skipped < numBytes; ) {
|
||||
final int step = (int) Math.min(SKIP_BUFFER_SIZE, numBytes - skipped);
|
||||
readBytes(skipBuffer, 0, step, false);
|
||||
skipped += step;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -41,8 +41,8 @@ public abstract class DataInput implements Cloneable {
|
|||
|
||||
private static final int SKIP_BUFFER_SIZE = 1024;
|
||||
|
||||
/* This buffer is used to skip over bytes with the default implementation of
|
||||
* skipBytes. The reason why we need to use an instance member instead of
|
||||
/* This buffer is used to skip over bytes with the slow implementation of
|
||||
* skipBytesSlowly. The reason why we need to use an instance member instead of
|
||||
* sharing a single instance across threads is that some delegating
|
||||
* implementations of DataInput might want to reuse the provided buffer in
|
||||
* order to eg. update the checksum. If we shared the same buffer across
|
||||
|
|
|
@ -53,6 +53,13 @@ public class InputStreamDataInput extends DataInput implements Closeable {
|
|||
|
||||
@Override
|
||||
public void skipBytes(long numBytes) throws IOException {
|
||||
skipBytesSlowly(numBytes);
|
||||
if (numBytes < 0) {
|
||||
throw new IllegalArgumentException("numBytes must be >= 0, got " + numBytes);
|
||||
}
|
||||
long skipped = is.skip(numBytes);
|
||||
assert skipped <= numBytes;
|
||||
if (skipped < numBytes) {
|
||||
throw new EOFException();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -345,8 +345,12 @@ public final class PagedBytes implements Accountable {
|
|||
}
|
||||
|
||||
@Override
|
||||
public void skipBytes(long numBytes) throws IOException {
|
||||
skipBytesSlowly(numBytes);
|
||||
public void skipBytes(long numBytes) {
|
||||
if (numBytes < 0) {
|
||||
throw new IllegalArgumentException("numBytes must be >= 0, got " + numBytes);
|
||||
}
|
||||
final long skipTo = getPosition() + numBytes;
|
||||
setPosition(skipTo);
|
||||
}
|
||||
|
||||
private void nextBlock() {
|
||||
|
|
|
@ -0,0 +1,83 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.lucene.index;
|
||||
|
||||
import java.util.Random;
|
||||
import org.apache.lucene.util.ByteBlockPool;
|
||||
import org.apache.lucene.util.LuceneTestCase;
|
||||
import org.apache.lucene.util.TestUtil;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.BeforeClass;
|
||||
|
||||
public class TestByteSliceReader extends LuceneTestCase {
|
||||
private static byte[] RANDOM_DATA;
|
||||
private static ByteBlockPool BLOCK_POOL;
|
||||
private static int BLOCK_POOL_END;
|
||||
|
||||
@BeforeClass
|
||||
public static void beforeClass() {
|
||||
int len = atLeast(100);
|
||||
RANDOM_DATA = new byte[len];
|
||||
random().nextBytes(RANDOM_DATA);
|
||||
|
||||
BLOCK_POOL = new ByteBlockPool(new ByteBlockPool.DirectAllocator());
|
||||
BLOCK_POOL.nextBuffer();
|
||||
byte[] buffer = BLOCK_POOL.buffer;
|
||||
int upto = BLOCK_POOL.newSlice(ByteBlockPool.FIRST_LEVEL_SIZE);
|
||||
for (byte randomByte : RANDOM_DATA) {
|
||||
if ((buffer[upto] & 16) != 0) {
|
||||
upto = BLOCK_POOL.allocSlice(buffer, upto);
|
||||
buffer = BLOCK_POOL.buffer;
|
||||
}
|
||||
buffer[upto++] = randomByte;
|
||||
}
|
||||
BLOCK_POOL_END = upto;
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
public static void afterClass() {
|
||||
RANDOM_DATA = null;
|
||||
BLOCK_POOL = null;
|
||||
}
|
||||
|
||||
public void testReadByte() {
|
||||
ByteSliceReader sliceReader = new ByteSliceReader();
|
||||
sliceReader.init(BLOCK_POOL, 0, BLOCK_POOL_END);
|
||||
for (byte expected : RANDOM_DATA) {
|
||||
assertEquals(expected, sliceReader.readByte());
|
||||
}
|
||||
}
|
||||
|
||||
public void testSkipBytes() {
|
||||
Random random = random();
|
||||
ByteSliceReader sliceReader = new ByteSliceReader();
|
||||
|
||||
int maxSkipTo = RANDOM_DATA.length - 1;
|
||||
int iterations = atLeast(random, 10);
|
||||
for (int i = 0; i < iterations; i++) {
|
||||
sliceReader.init(BLOCK_POOL, 0, BLOCK_POOL_END);
|
||||
// skip random chunks of bytes until exhausted
|
||||
for (int curr = 0; curr < maxSkipTo; ) {
|
||||
int skipTo = TestUtil.nextInt(random, curr, maxSkipTo);
|
||||
int step = skipTo - curr;
|
||||
sliceReader.skipBytes(step);
|
||||
assertEquals(RANDOM_DATA[skipTo], sliceReader.readByte());
|
||||
curr = skipTo + 1; // +1 for read byte
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
|
@ -139,7 +139,7 @@ public final class TestByteBuffersDataInput extends RandomizedTest {
|
|||
}
|
||||
|
||||
@Test
|
||||
public void testSeek() throws Exception {
|
||||
public void testSeekAndSkip() throws Exception {
|
||||
for (int reps = randomIntBetween(1, 200); --reps > 0; ) {
|
||||
ByteBuffersDataOutput dst = new ByteBuffersDataOutput();
|
||||
|
||||
|
@ -169,12 +169,26 @@ public final class TestByteBuffersDataInput extends RandomizedTest {
|
|||
byte[] array = dst.toArrayCopy();
|
||||
array = ArrayUtil.copyOfSubArray(array, prefix.length, array.length);
|
||||
|
||||
// test seeking
|
||||
for (int i = 0; i < 1000; i++) {
|
||||
int offs = randomIntBetween(0, array.length - 1);
|
||||
in.seek(offs);
|
||||
assertEquals(offs, in.position());
|
||||
assertEquals(array[offs], in.readByte());
|
||||
}
|
||||
|
||||
// test skipping
|
||||
int maxSkipTo = array.length - 1;
|
||||
in.seek(0);
|
||||
// skip chunks of bytes until exhausted
|
||||
for (int curr = 0; curr < maxSkipTo; ) {
|
||||
int skipTo = randomIntBetween(curr, maxSkipTo);
|
||||
int step = skipTo - curr;
|
||||
in.skipBytes(step);
|
||||
assertEquals(array[skipTo], in.readByte());
|
||||
curr = skipTo + 1; // +1 for read byte
|
||||
}
|
||||
|
||||
in.seek(in.size());
|
||||
assertEquals(in.size(), in.position());
|
||||
LuceneTestCase.expectThrows(
|
||||
|
|
|
@ -0,0 +1,110 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.lucene.store;
|
||||
|
||||
import java.io.ByteArrayInputStream;
|
||||
import java.io.EOFException;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.util.Random;
|
||||
import org.apache.lucene.util.LuceneTestCase;
|
||||
import org.apache.lucene.util.TestUtil;
|
||||
import org.junit.After;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.Before;
|
||||
import org.junit.BeforeClass;
|
||||
|
||||
public class TestInputStreamDataInput extends LuceneTestCase {
|
||||
private static byte[] RANDOM_DATA;
|
||||
private InputStreamDataInput in;
|
||||
|
||||
@BeforeClass
|
||||
public static void beforeClass() {
|
||||
RANDOM_DATA = new byte[atLeast(100)];
|
||||
random().nextBytes(RANDOM_DATA);
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
public static void afterClass() {
|
||||
RANDOM_DATA = null;
|
||||
}
|
||||
|
||||
@Before
|
||||
public void before() {
|
||||
in = new NoReadInputStreamDataInput(new ByteArrayInputStream(RANDOM_DATA));
|
||||
}
|
||||
|
||||
@After
|
||||
public void after() throws IOException {
|
||||
in.close();
|
||||
in = null;
|
||||
}
|
||||
|
||||
public void testSkipBytes() throws IOException {
|
||||
Random random = random();
|
||||
// not using the wrapped (NoReadInputStreamDataInput) here since we want to actually read and
|
||||
// verify
|
||||
InputStreamDataInput in = new InputStreamDataInput(new ByteArrayInputStream(RANDOM_DATA));
|
||||
int maxSkipTo = RANDOM_DATA.length - 1;
|
||||
// skip chunks of bytes until exhausted
|
||||
for (int curr = 0; curr < maxSkipTo; ) {
|
||||
int skipTo = TestUtil.nextInt(random, curr, maxSkipTo);
|
||||
int step = skipTo - curr;
|
||||
in.skipBytes(step);
|
||||
assertEquals(RANDOM_DATA[skipTo], in.readByte());
|
||||
curr = skipTo + 1; // +1 for read byte
|
||||
}
|
||||
in.close();
|
||||
}
|
||||
|
||||
public void testNoReadWhenSkipping() throws IOException {
|
||||
Random random = random();
|
||||
int maxSkipTo = RANDOM_DATA.length - 1;
|
||||
// skip chunks of bytes until exhausted
|
||||
for (int curr = 0; curr < maxSkipTo; ) {
|
||||
int step = TestUtil.nextInt(random, 0, maxSkipTo - curr);
|
||||
in.skipBytes(step);
|
||||
curr += step;
|
||||
}
|
||||
}
|
||||
|
||||
public void testFullSkip() throws IOException {
|
||||
in.skipBytes(RANDOM_DATA.length);
|
||||
}
|
||||
|
||||
public void testSkipOffEnd() {
|
||||
expectThrows(EOFException.class, () -> in.skipBytes(RANDOM_DATA.length + 1));
|
||||
}
|
||||
|
||||
/** Throws if trying to read bytes to ensure skipBytes doesn't invoke read */
|
||||
private static final class NoReadInputStreamDataInput extends InputStreamDataInput {
|
||||
|
||||
public NoReadInputStreamDataInput(InputStream is) {
|
||||
super(is);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void readBytes(byte[] b, int offset, int len) {
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
||||
@Override
|
||||
public byte readByte() {
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
}
|
||||
}
|
|
@ -155,6 +155,18 @@ public class TestPagedBytes extends LuceneTestCase {
|
|||
assertEquals(answer[pos + byteUpto], slice.bytes[slice.offset + byteUpto]);
|
||||
}
|
||||
}
|
||||
|
||||
// test skipping
|
||||
final DataInput in2 = p.getDataInput();
|
||||
int maxSkipTo = numBytes - 1;
|
||||
// skip chunks of bytes until exhausted
|
||||
for (int curr = 0; curr < maxSkipTo; ) {
|
||||
int skipTo = TestUtil.nextInt(random, curr, maxSkipTo);
|
||||
int step = skipTo - curr;
|
||||
in2.skipBytes(step);
|
||||
assertEquals(answer[skipTo], in2.readByte());
|
||||
curr = skipTo + 1; // +1 for read byte
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue