From 84a35dfaea27581174c1104e239187112a1b5d43 Mon Sep 17 00:00:00 2001
From: Robert Muir
Date: Wed, 24 Feb 2021 02:46:24 -0500
Subject: [PATCH] LUCENE-9794: Optimize skipBytes implementation in remaining DataInput subclasses

Fix various DataInputs to no longer use skipBytesSlowly, add new tests.
---
Reviewer note (ignored by git am): InputStreamDataInput.skipBytes() was changed
to keep skipping after a short InputStream.skip() and to confirm end of stream
with a one-byte read before throwing EOFException. The diffstat and that hunk's
header were updated to match; the "index" blob id for that file was NOT
regenerated. Line breaks and blank context lines in this patch were restored
from the hunk line counts.

 lucene/CHANGES.txt                            |   2 +
 .../CompressingStoredFieldsReader.java        |  10 +-
 .../apache/lucene/index/ByteSliceReader.java  |  16 ++-
 .../lucene/store/ByteBuffersDataInput.java    |   6 +-
 .../lucene/store/ChecksumIndexInput.java      |  34 +++++-
 .../org/apache/lucene/store/DataInput.java    |   4 +-
 .../lucene/store/InputStreamDataInput.java    |  18 ++-
 .../org/apache/lucene/util/PagedBytes.java    |   8 +-
 .../lucene/index/TestByteSliceReader.java     |  83 +++++++++++++
 .../store/TestByteBuffersDataInput.java       |  16 ++-
 .../store/TestInputStreamDataInput.java       | 110 ++++++++++++++++++
 .../apache/lucene/util/TestPagedBytes.java    |  12 ++
 12 files changed, 304 insertions(+), 15 deletions(-)
 create mode 100644 lucene/core/src/test/org/apache/lucene/index/TestByteSliceReader.java
 create mode 100644 lucene/core/src/test/org/apache/lucene/store/TestInputStreamDataInput.java

diff --git a/lucene/CHANGES.txt b/lucene/CHANGES.txt
index 9bfe01166f6..9b6bea99c38 100644
--- a/lucene/CHANGES.txt
+++ b/lucene/CHANGES.txt
@@ -194,6 +194,8 @@ Improvements
 * LUCENE-9663: Adding compression to terms dict from SortedSet/Sorted DocValues.
   (Jaison Bi via Bruno Roustant)
 
+* LUCENE-9794: Speed up implementations of DataInput.skipBytes(). (Greg Miller)
+
 Bug fixes
 
 
diff --git a/lucene/core/src/java/org/apache/lucene/codecs/compressing/CompressingStoredFieldsReader.java b/lucene/core/src/java/org/apache/lucene/codecs/compressing/CompressingStoredFieldsReader.java
index f169e527542..9b7fac438e1 100644
--- a/lucene/core/src/java/org/apache/lucene/codecs/compressing/CompressingStoredFieldsReader.java
+++ b/lucene/core/src/java/org/apache/lucene/codecs/compressing/CompressingStoredFieldsReader.java
@@ -654,7 +654,15 @@ public final class CompressingStoredFieldsReader extends StoredFieldsReader {
 
               @Override
               public void skipBytes(long numBytes) throws IOException {
-                skipBytesSlowly(numBytes);
+                if (numBytes < 0) {
+                  throw new IllegalArgumentException("numBytes must be >= 0, got " + numBytes);
+                }
+                while (numBytes > bytes.length) {
+                  numBytes -= bytes.length;
+                  fillBuffer();
+                }
+                bytes.offset += numBytes;
+                bytes.length -= numBytes;
               }
             };
       } else {
diff --git a/lucene/core/src/java/org/apache/lucene/index/ByteSliceReader.java b/lucene/core/src/java/org/apache/lucene/index/ByteSliceReader.java
index 951d2c66308..b0b65de4d0c 100644
--- a/lucene/core/src/java/org/apache/lucene/index/ByteSliceReader.java
+++ b/lucene/core/src/java/org/apache/lucene/index/ByteSliceReader.java
@@ -140,7 +140,19 @@ final class ByteSliceReader extends DataInput {
   }
 
   @Override
-  public void skipBytes(long numBytes) throws IOException {
-    skipBytesSlowly(numBytes);
+  public void skipBytes(long numBytes) {
+    if (numBytes < 0) {
+      throw new IllegalArgumentException("numBytes must be >= 0, got " + numBytes);
+    }
+    while (numBytes > 0) {
+      final int numLeft = limit - upto;
+      if (numLeft < numBytes) {
+        numBytes -= numLeft;
+        nextSlice();
+      } else {
+        upto += numBytes;
+        break;
+      }
+    }
   }
 }
diff --git a/lucene/core/src/java/org/apache/lucene/store/ByteBuffersDataInput.java b/lucene/core/src/java/org/apache/lucene/store/ByteBuffersDataInput.java
index 57cfd5b560f..b6572f6ee1b 100644
--- a/lucene/core/src/java/org/apache/lucene/store/ByteBuffersDataInput.java
+++ b/lucene/core/src/java/org/apache/lucene/store/ByteBuffersDataInput.java
@@ -261,7 +261,11 @@ public final class ByteBuffersDataInput extends DataInput
 
   @Override
   public void skipBytes(long numBytes) throws IOException {
-    skipBytesSlowly(numBytes);
+    if (numBytes < 0) {
+      throw new IllegalArgumentException("numBytes must be >= 0, got " + numBytes);
+    }
+    long skipTo = position() + numBytes;
+    seek(skipTo);
   }
 
   public ByteBuffersDataInput slice(long offset, long length) {
diff --git a/lucene/core/src/java/org/apache/lucene/store/ChecksumIndexInput.java b/lucene/core/src/java/org/apache/lucene/store/ChecksumIndexInput.java
index 04a21f07a06..6f4a1aef1f7 100644
--- a/lucene/core/src/java/org/apache/lucene/store/ChecksumIndexInput.java
+++ b/lucene/core/src/java/org/apache/lucene/store/ChecksumIndexInput.java
@@ -24,6 +24,17 @@ import java.io.IOException;
  */
 public abstract class ChecksumIndexInput extends IndexInput {
 
+  private static final int SKIP_BUFFER_SIZE = 1024;
+
+  /* This buffer is used when skipping bytes in skipBytes(). Skipping bytes
+   * still requires reading in the bytes we skip in order to update the checksum.
+   * The reason we need to use an instance member instead of sharing a single
+   * static instance across threads is that multiple instances invoking skipBytes()
+   * concurrently on different threads can clobber the contents of a shared buffer,
+   * corrupting the checksum. See LUCENE-5583 for additional context.
+   */
+  private byte[] skipBuffer;
+
   /**
    * resourceDescription should be a non-null, opaque string describing this resource; it's returned
    * from {@link #toString}.
@@ -50,10 +61,23 @@ public abstract class ChecksumIndexInput extends IndexInput {
       throw new IllegalStateException(
           getClass() + " cannot seek backwards (pos=" + pos + " getFilePointer()=" + curFP + ")");
     }
-    // we must skip slowly to ensure skipped bytes are still read and used
-    // to update checksums
-    // TODO: this "slow skip" logic should be moved into this class once
-    // no longer needed as default logic in DataInput
-    skipBytesSlowly(skip);
+    skipByReading(skip);
+  }
+
+  /**
+   * Skip over numBytes bytes. The contract on this method is that it should have the
+   * same behavior as reading the same number of bytes into a buffer and discarding its content.
+   * Negative values of numBytes are not supported.
+   */
+  private void skipByReading(long numBytes) throws IOException {
+    if (skipBuffer == null) {
+      skipBuffer = new byte[SKIP_BUFFER_SIZE];
+    }
+    assert skipBuffer.length == SKIP_BUFFER_SIZE;
+    for (long skipped = 0; skipped < numBytes; ) {
+      final int step = (int) Math.min(SKIP_BUFFER_SIZE, numBytes - skipped);
+      readBytes(skipBuffer, 0, step, false);
+      skipped += step;
+    }
   }
 }
diff --git a/lucene/core/src/java/org/apache/lucene/store/DataInput.java b/lucene/core/src/java/org/apache/lucene/store/DataInput.java
index 11776195728..d7d1eade8dc 100644
--- a/lucene/core/src/java/org/apache/lucene/store/DataInput.java
+++ b/lucene/core/src/java/org/apache/lucene/store/DataInput.java
@@ -41,8 +41,8 @@ public abstract class DataInput implements Cloneable {
 
   private static final int SKIP_BUFFER_SIZE = 1024;
 
-  /* This buffer is used to skip over bytes with the default implementation of
-   * skipBytes. The reason why we need to use an instance member instead of
+  /* This buffer is used to skip over bytes with the slow implementation of
+   * skipBytesSlowly. The reason why we need to use an instance member instead of
    * sharing a single instance across threads is that some delegating
    * implementations of DataInput might want to reuse the provided buffer in
    * order to eg. update the checksum. If we shared the same buffer across
diff --git a/lucene/core/src/java/org/apache/lucene/store/InputStreamDataInput.java b/lucene/core/src/java/org/apache/lucene/store/InputStreamDataInput.java
index f86185ae79b..803959711bc 100644
--- a/lucene/core/src/java/org/apache/lucene/store/InputStreamDataInput.java
+++ b/lucene/core/src/java/org/apache/lucene/store/InputStreamDataInput.java
@@ -53,6 +53,22 @@ public class InputStreamDataInput extends DataInput implements Closeable {
 
   @Override
   public void skipBytes(long numBytes) throws IOException {
-    skipBytesSlowly(numBytes);
+    if (numBytes < 0) {
+      throw new IllegalArgumentException("numBytes must be >= 0, got " + numBytes);
+    }
+    // InputStream.skip() may skip fewer bytes than requested (even zero) without being
+    // at end of stream, so loop and only report EOF once a one-byte read confirms it.
+    long remaining = numBytes;
+    while (remaining > 0) {
+      final long skipped = is.skip(remaining);
+      assert skipped <= remaining;
+      if (skipped > 0) {
+        remaining -= skipped;
+      } else if (is.read() < 0) {
+        throw new EOFException();
+      } else {
+        remaining--; // the probe read consumed one byte
+      }
+    }
   }
 }
diff --git a/lucene/core/src/java/org/apache/lucene/util/PagedBytes.java b/lucene/core/src/java/org/apache/lucene/util/PagedBytes.java
index 6916ef14834..dd495a9fff8 100644
--- a/lucene/core/src/java/org/apache/lucene/util/PagedBytes.java
+++ b/lucene/core/src/java/org/apache/lucene/util/PagedBytes.java
@@ -345,8 +345,12 @@ public final class PagedBytes implements Accountable {
     }
 
     @Override
-    public void skipBytes(long numBytes) throws IOException {
-      skipBytesSlowly(numBytes);
+    public void skipBytes(long numBytes) {
+      if (numBytes < 0) {
+        throw new IllegalArgumentException("numBytes must be >= 0, got " + numBytes);
+      }
+      final long skipTo = getPosition() + numBytes;
+      setPosition(skipTo);
     }
 
     private void nextBlock() {
diff --git a/lucene/core/src/test/org/apache/lucene/index/TestByteSliceReader.java b/lucene/core/src/test/org/apache/lucene/index/TestByteSliceReader.java
new file mode 100644
index 00000000000..b0bc2f9d54b
--- /dev/null
+++ b/lucene/core/src/test/org/apache/lucene/index/TestByteSliceReader.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.lucene.index;
+
+import java.util.Random;
+import org.apache.lucene.util.ByteBlockPool;
+import org.apache.lucene.util.LuceneTestCase;
+import org.apache.lucene.util.TestUtil;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+
+public class TestByteSliceReader extends LuceneTestCase {
+  private static byte[] RANDOM_DATA;
+  private static ByteBlockPool BLOCK_POOL;
+  private static int BLOCK_POOL_END;
+
+  @BeforeClass
+  public static void beforeClass() {
+    int len = atLeast(100);
+    RANDOM_DATA = new byte[len];
+    random().nextBytes(RANDOM_DATA);
+
+    BLOCK_POOL = new ByteBlockPool(new ByteBlockPool.DirectAllocator());
+    BLOCK_POOL.nextBuffer();
+    byte[] buffer = BLOCK_POOL.buffer;
+    int upto = BLOCK_POOL.newSlice(ByteBlockPool.FIRST_LEVEL_SIZE);
+    for (byte randomByte : RANDOM_DATA) {
+      if ((buffer[upto] & 16) != 0) {
+        upto = BLOCK_POOL.allocSlice(buffer, upto);
+        buffer = BLOCK_POOL.buffer;
+      }
+      buffer[upto++] = randomByte;
+    }
+    BLOCK_POOL_END = upto;
+  }
+
+  @AfterClass
+  public static void afterClass() {
+    RANDOM_DATA = null;
+    BLOCK_POOL = null;
+  }
+
+  public void testReadByte() {
+    ByteSliceReader sliceReader = new ByteSliceReader();
+    sliceReader.init(BLOCK_POOL, 0, BLOCK_POOL_END);
+    for (byte expected : RANDOM_DATA) {
+      assertEquals(expected, sliceReader.readByte());
+    }
+  }
+
+  public void testSkipBytes() {
+    Random random = random();
+    ByteSliceReader sliceReader = new ByteSliceReader();
+
+    int maxSkipTo = RANDOM_DATA.length - 1;
+    int iterations = atLeast(random, 10);
+    for (int i = 0; i < iterations; i++) {
+      sliceReader.init(BLOCK_POOL, 0, BLOCK_POOL_END);
+      // skip random chunks of bytes until exhausted
+      for (int curr = 0; curr < maxSkipTo; ) {
+        int skipTo = TestUtil.nextInt(random, curr, maxSkipTo);
+        int step = skipTo - curr;
+        sliceReader.skipBytes(step);
+        assertEquals(RANDOM_DATA[skipTo], sliceReader.readByte());
+        curr = skipTo + 1; // +1 for read byte
+      }
+    }
+  }
+}
diff --git a/lucene/core/src/test/org/apache/lucene/store/TestByteBuffersDataInput.java b/lucene/core/src/test/org/apache/lucene/store/TestByteBuffersDataInput.java
index 8d8bcd4dc09..fdbcd42a650 100644
--- a/lucene/core/src/test/org/apache/lucene/store/TestByteBuffersDataInput.java
+++ b/lucene/core/src/test/org/apache/lucene/store/TestByteBuffersDataInput.java
@@ -139,7 +139,7 @@ public final class TestByteBuffersDataInput extends RandomizedTest {
   }
 
   @Test
-  public void testSeek() throws Exception {
+  public void testSeekAndSkip() throws Exception {
     for (int reps = randomIntBetween(1, 200); --reps > 0; ) {
       ByteBuffersDataOutput dst = new ByteBuffersDataOutput();
 
@@ -169,12 +169,26 @@ public final class TestByteBuffersDataInput extends RandomizedTest {
 
       byte[] array = dst.toArrayCopy();
       array = ArrayUtil.copyOfSubArray(array, prefix.length, array.length);
+      // test seeking
       for (int i = 0; i < 1000; i++) {
         int offs = randomIntBetween(0, array.length - 1);
         in.seek(offs);
         assertEquals(offs, in.position());
         assertEquals(array[offs], in.readByte());
       }
+
+      // test skipping
+      int maxSkipTo = array.length - 1;
+      in.seek(0);
+      // skip chunks of bytes until exhausted
+      for (int curr = 0; curr < maxSkipTo; ) {
+        int skipTo = randomIntBetween(curr, maxSkipTo);
+        int step = skipTo - curr;
+        in.skipBytes(step);
+        assertEquals(array[skipTo], in.readByte());
+        curr = skipTo + 1; // +1 for read byte
+      }
+
       in.seek(in.size());
       assertEquals(in.size(), in.position());
       LuceneTestCase.expectThrows(
diff --git a/lucene/core/src/test/org/apache/lucene/store/TestInputStreamDataInput.java b/lucene/core/src/test/org/apache/lucene/store/TestInputStreamDataInput.java
new file mode 100644
index 00000000000..26f8fd406b8
--- /dev/null
+++ b/lucene/core/src/test/org/apache/lucene/store/TestInputStreamDataInput.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.lucene.store;
+
+import java.io.ByteArrayInputStream;
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Random;
+import org.apache.lucene.util.LuceneTestCase;
+import org.apache.lucene.util.TestUtil;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+
+public class TestInputStreamDataInput extends LuceneTestCase {
+  private static byte[] RANDOM_DATA;
+  private InputStreamDataInput in;
+
+  @BeforeClass
+  public static void beforeClass() {
+    RANDOM_DATA = new byte[atLeast(100)];
+    random().nextBytes(RANDOM_DATA);
+  }
+
+  @AfterClass
+  public static void afterClass() {
+    RANDOM_DATA = null;
+  }
+
+  @Before
+  public void before() {
+    in = new NoReadInputStreamDataInput(new ByteArrayInputStream(RANDOM_DATA));
+  }
+
+  @After
+  public void after() throws IOException {
+    in.close();
+    in = null;
+  }
+
+  public void testSkipBytes() throws IOException {
+    Random random = random();
+    // not using the wrapped (NoReadInputStreamDataInput) here since we want to actually read and
+    // verify
+    InputStreamDataInput in = new InputStreamDataInput(new ByteArrayInputStream(RANDOM_DATA));
+    int maxSkipTo = RANDOM_DATA.length - 1;
+    // skip chunks of bytes until exhausted
+    for (int curr = 0; curr < maxSkipTo; ) {
+      int skipTo = TestUtil.nextInt(random, curr, maxSkipTo);
+      int step = skipTo - curr;
+      in.skipBytes(step);
+      assertEquals(RANDOM_DATA[skipTo], in.readByte());
+      curr = skipTo + 1; // +1 for read byte
+    }
+    in.close();
+  }
+
+  public void testNoReadWhenSkipping() throws IOException {
+    Random random = random();
+    int maxSkipTo = RANDOM_DATA.length - 1;
+    // skip chunks of bytes until exhausted
+    for (int curr = 0; curr < maxSkipTo; ) {
+      int step = TestUtil.nextInt(random, 0, maxSkipTo - curr);
+      in.skipBytes(step);
+      curr += step;
+    }
+  }
+
+  public void testFullSkip() throws IOException {
+    in.skipBytes(RANDOM_DATA.length);
+  }
+
+  public void testSkipOffEnd() {
+    expectThrows(EOFException.class, () -> in.skipBytes(RANDOM_DATA.length + 1));
+  }
+
+  /** Throws if trying to read bytes to ensure skipBytes doesn't invoke read */
+  private static final class NoReadInputStreamDataInput extends InputStreamDataInput {
+
+    public NoReadInputStreamDataInput(InputStream is) {
+      super(is);
+    }
+
+    @Override
+    public void readBytes(byte[] b, int offset, int len) {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public byte readByte() {
+      throw new UnsupportedOperationException();
+    }
+  }
+}
diff --git a/lucene/core/src/test/org/apache/lucene/util/TestPagedBytes.java b/lucene/core/src/test/org/apache/lucene/util/TestPagedBytes.java
index 837259e115c..28050b4b562 100644
--- a/lucene/core/src/test/org/apache/lucene/util/TestPagedBytes.java
+++ b/lucene/core/src/test/org/apache/lucene/util/TestPagedBytes.java
@@ -155,6 +155,18 @@ public class TestPagedBytes extends LuceneTestCase {
           assertEquals(answer[pos + byteUpto], slice.bytes[slice.offset + byteUpto]);
         }
       }
+
+      // test skipping
+      final DataInput in2 = p.getDataInput();
+      int maxSkipTo = numBytes - 1;
+      // skip chunks of bytes until exhausted
+      for (int curr = 0; curr < maxSkipTo; ) {
+        int skipTo = TestUtil.nextInt(random, curr, maxSkipTo);
+        int step = skipTo - curr;
+        in2.skipBytes(step);
+        assertEquals(answer[skipTo], in2.readByte());
+        curr = skipTo + 1; // +1 for read byte
+      }
     }
   }
 