diff --git a/lucene/core/src/java/org/apache/lucene/codecs/lucene90/compressing/FieldsIndex.java b/lucene/core/src/java/org/apache/lucene/codecs/lucene90/compressing/FieldsIndex.java index 8b27aef8b1e..a0f59575b27 100644 --- a/lucene/core/src/java/org/apache/lucene/codecs/lucene90/compressing/FieldsIndex.java +++ b/lucene/core/src/java/org/apache/lucene/codecs/lucene90/compressing/FieldsIndex.java @@ -21,8 +21,19 @@ import java.io.IOException; abstract class FieldsIndex implements Cloneable, Closeable { - /** Get the start pointer for the block that contains the given docID. */ - abstract long getStartPointer(int docID); + /** Get the ID of the block that contains the given docID. */ + abstract long getBlockID(int docID); + + /** Get the start pointer of the block with the given ID. */ + abstract long getBlockStartPointer(long blockID); + + /** Get the number of bytes of the block with the given ID. */ + abstract long getBlockLength(long blockID); + + /** Get the start pointer of the block that contains the given docID. */ + final long getStartPointer(int docID) { + return getBlockStartPointer(getBlockID(docID)); + } /** Check the integrity of the index. */ abstract void checkIntegrity() throws IOException; diff --git a/lucene/core/src/java/org/apache/lucene/codecs/lucene90/compressing/FieldsIndexReader.java b/lucene/core/src/java/org/apache/lucene/codecs/lucene90/compressing/FieldsIndexReader.java index 4b21bde85a5..91ac9010f4b 100644 --- a/lucene/core/src/java/org/apache/lucene/codecs/lucene90/compressing/FieldsIndexReader.java +++ b/lucene/core/src/java/org/apache/lucene/codecs/lucene90/compressing/FieldsIndexReader.java @@ -117,15 +117,31 @@ final class FieldsIndexReader extends FieldsIndex { } @Override - long getStartPointer(int docID) { + long getBlockID(int docID) { Objects.checkIndex(docID, maxDoc); long blockIndex = docs.binarySearch(0, numChunks, docID); if (blockIndex < 0) { blockIndex = -2 - blockIndex; } + return blockIndex; + } + + @Override + long getBlockStartPointer(long blockIndex) { return startPointers.get(blockIndex); } + @Override + long getBlockLength(long blockIndex) { + final long endPointer; + if (blockIndex == numChunks - 1) { + endPointer = maxPointer; + } else { + endPointer = startPointers.get(blockIndex + 1); + } + return endPointer - getBlockStartPointer(blockIndex); + } + @Override public FieldsIndex clone() { try { diff --git a/lucene/core/src/java/org/apache/lucene/codecs/lucene90/compressing/Lucene90CompressingStoredFieldsReader.java b/lucene/core/src/java/org/apache/lucene/codecs/lucene90/compressing/Lucene90CompressingStoredFieldsReader.java index 1b5c307294b..5025e97f8b3 100644 --- a/lucene/core/src/java/org/apache/lucene/codecs/lucene90/compressing/Lucene90CompressingStoredFieldsReader.java +++ b/lucene/core/src/java/org/apache/lucene/codecs/lucene90/compressing/Lucene90CompressingStoredFieldsReader.java @@ -40,6 +40,7 @@ import static org.apache.lucene.codecs.lucene90.compressing.Lucene90CompressingS import java.io.EOFException; import java.io.IOException; +import java.util.Arrays; import org.apache.lucene.codecs.CodecUtil; import org.apache.lucene.codecs.StoredFieldsReader; import org.apache.lucene.codecs.compressing.CompressionMode; @@ -72,6 +73,9 @@ import org.apache.lucene.util.LongsRef; */ public final class Lucene90CompressingStoredFieldsReader extends StoredFieldsReader { + private static final int PREFETCH_CACHE_SIZE = 1 << 4; + private static final int PREFETCH_CACHE_MASK = PREFETCH_CACHE_SIZE - 1; + private final int version; private final FieldInfos fieldInfos; private final FieldsIndex indexReader; @@ -86,6 +90,11 @@ public final class Lucene90CompressingStoredFieldsReader extends StoredFieldsRea private final long numChunks; // number of written blocks private final long numDirtyChunks; // number of incomplete compressed blocks written private final long numDirtyDocs; // cumulative number of docs in incomplete chunks + // Cache of recently prefetched block IDs. This helps reduce chances of prefetching the same block + // multiple times, which is otherwise likely due to index sorting or recursive graph bisection + // clustering similar documents together. NOTE: this cache must be small since it's fully scanned. + private final long[] prefetchedBlockIDCache; + private int prefetchedBlockIDCacheIndex; private boolean closed; // used by clone @@ -103,6 +112,8 @@ public final class Lucene90CompressingStoredFieldsReader extends StoredFieldsRea this.numChunks = reader.numChunks; this.numDirtyChunks = reader.numDirtyChunks; this.numDirtyDocs = reader.numDirtyDocs; + this.prefetchedBlockIDCache = new long[PREFETCH_CACHE_SIZE]; + Arrays.fill(prefetchedBlockIDCache, -1); this.merging = merging; this.state = new BlockState(); this.closed = false; @@ -150,6 +161,8 @@ public final class Lucene90CompressingStoredFieldsReader extends StoredFieldsRea chunkSize = metaIn.readVInt(); decompressor = compressionMode.newDecompressor(); + this.prefetchedBlockIDCache = new long[PREFETCH_CACHE_SIZE]; + Arrays.fill(prefetchedBlockIDCache, -1); this.merging = false; this.state = new BlockState(); @@ -609,6 +622,23 @@ public final class Lucene90CompressingStoredFieldsReader extends StoredFieldsRea } } + @Override + public void prefetch(int docID) throws IOException { + final long blockID = indexReader.getBlockID(docID); + + for (long prefetchedBlockID : prefetchedBlockIDCache) { + if (prefetchedBlockID == blockID) { + return; + } + } + + final long blockStartPointer = indexReader.getBlockStartPointer(blockID); + final long blockLength = indexReader.getBlockLength(blockID); + fieldsStream.prefetch(blockStartPointer, blockLength); + + prefetchedBlockIDCache[prefetchedBlockIDCacheIndex++ & PREFETCH_CACHE_MASK] = blockID; + } + SerializedDocument serializedDocument(int docID) throws IOException { if (state.contains(docID) == false) { fieldsStream.seek(indexReader.getStartPointer(docID)); diff --git a/lucene/core/src/java/org/apache/lucene/index/BaseCompositeReader.java b/lucene/core/src/java/org/apache/lucene/index/BaseCompositeReader.java index e4e308a374e..68fc36120a6 100644 --- a/lucene/core/src/java/org/apache/lucene/index/BaseCompositeReader.java +++ b/lucene/core/src/java/org/apache/lucene/index/BaseCompositeReader.java @@ -163,6 +163,15 @@ public abstract class BaseCompositeReader extends Composi ensureOpen(); StoredFields[] subFields = new StoredFields[subReaders.length]; return new StoredFields() { + @Override + public void prefetch(int docID) throws IOException { + final int i = readerIndex(docID); // find subreader num + if (subFields[i] == null) { + subFields[i] = subReaders[i].storedFields(); + } + subFields[i].prefetch(docID - starts[i]); + } + @Override public void document(int docID, StoredFieldVisitor visitor) throws IOException { final int i = readerIndex(docID); // find subreader num diff --git a/lucene/core/src/java/org/apache/lucene/index/CheckIndex.java b/lucene/core/src/java/org/apache/lucene/index/CheckIndex.java index 6f950fc26a3..f4e3d1248d8 100644 --- a/lucene/core/src/java/org/apache/lucene/index/CheckIndex.java +++ b/lucene/core/src/java/org/apache/lucene/index/CheckIndex.java @@ -3164,6 +3164,9 @@ public final class CheckIndex implements Closeable { // Intentionally pull even deleted documents to // make sure they too are not corrupt: DocumentStoredFieldVisitor visitor = new DocumentStoredFieldVisitor(); + if ((j & 0x03) == 0) { + storedFields.prefetch(j); + } storedFields.document(j, visitor); Document doc = visitor.getDocument(); if (liveDocs == null || liveDocs.get(j)) { diff --git a/lucene/core/src/java/org/apache/lucene/index/CodecReader.java b/lucene/core/src/java/org/apache/lucene/index/CodecReader.java index bd4718c9c4c..980f2dd9582 100644 --- a/lucene/core/src/java/org/apache/lucene/index/CodecReader.java +++ b/lucene/core/src/java/org/apache/lucene/index/CodecReader.java @@ -87,6 +87,13 @@ public abstract class CodecReader extends LeafReader { public final StoredFields storedFields() throws IOException { final StoredFields reader = getFieldsReader(); return new StoredFields() { + @Override + public void prefetch(int docID) throws IOException { + // Don't trust the codec to do proper checks + Objects.checkIndex(docID, maxDoc()); + reader.prefetch(docID); + } + @Override public void document(int docID, StoredFieldVisitor visitor) throws IOException { // Don't trust the codec to do proper checks diff --git a/lucene/core/src/java/org/apache/lucene/index/ParallelLeafReader.java b/lucene/core/src/java/org/apache/lucene/index/ParallelLeafReader.java index 374dbb31194..19c2fd6bd4e 100644 --- a/lucene/core/src/java/org/apache/lucene/index/ParallelLeafReader.java +++ b/lucene/core/src/java/org/apache/lucene/index/ParallelLeafReader.java @@ -280,6 +280,13 @@ public class ParallelLeafReader extends LeafReader { fields[i] = storedFieldsReaders[i].storedFields(); } return new StoredFields() { + @Override + public void prefetch(int docID) throws IOException { + for (StoredFields reader : fields) { + reader.prefetch(docID); + } + } + @Override public void document(int docID, StoredFieldVisitor visitor) throws IOException { for (StoredFields reader : fields) { diff --git a/lucene/core/src/java/org/apache/lucene/index/SlowCodecReaderWrapper.java b/lucene/core/src/java/org/apache/lucene/index/SlowCodecReaderWrapper.java index 138995a2352..497fa1162bd 100644 --- a/lucene/core/src/java/org/apache/lucene/index/SlowCodecReaderWrapper.java +++ b/lucene/core/src/java/org/apache/lucene/index/SlowCodecReaderWrapper.java @@ -258,6 +258,11 @@ public final class SlowCodecReaderWrapper { throw new UncheckedIOException(e); } return new StoredFieldsReader() { + @Override + public void prefetch(int docID) throws IOException { + storedFields.prefetch(docID); + } + @Override public void document(int docID, StoredFieldVisitor visitor) throws IOException { storedFields.document(docID, visitor); diff --git a/lucene/core/src/java/org/apache/lucene/index/SlowCompositeCodecReaderWrapper.java b/lucene/core/src/java/org/apache/lucene/index/SlowCompositeCodecReaderWrapper.java index f01a9bb966b..5809a9aa4f4 100644 --- a/lucene/core/src/java/org/apache/lucene/index/SlowCompositeCodecReaderWrapper.java +++ b/lucene/core/src/java/org/apache/lucene/index/SlowCompositeCodecReaderWrapper.java @@ -154,6 +154,12 @@ final class SlowCompositeCodecReaderWrapper extends CodecReader { } } + @Override + public void prefetch(int docID) throws IOException { + int readerId = docIdToReaderId(docID); + readers[readerId].prefetch(docID - docStarts[readerId]); + } + @Override public void document(int docID, StoredFieldVisitor visitor) throws IOException { int readerId = docIdToReaderId(docID); diff --git a/lucene/core/src/java/org/apache/lucene/index/SortingCodecReader.java b/lucene/core/src/java/org/apache/lucene/index/SortingCodecReader.java index ae943735980..4d1cb4c8cdb 100644 --- a/lucene/core/src/java/org/apache/lucene/index/SortingCodecReader.java +++ b/lucene/core/src/java/org/apache/lucene/index/SortingCodecReader.java @@ -441,6 +441,11 @@ public final class SortingCodecReader extends FilterCodecReader { private StoredFieldsReader newStoredFieldsReader(StoredFieldsReader delegate) { return new StoredFieldsReader() { + @Override + public void prefetch(int docID) throws IOException { + delegate.prefetch(docMap.newToOld(docID)); + } + @Override public void document(int docID, StoredFieldVisitor visitor) throws IOException { delegate.document(docMap.newToOld(docID), visitor); diff --git a/lucene/core/src/java/org/apache/lucene/index/StoredFields.java b/lucene/core/src/java/org/apache/lucene/index/StoredFields.java index 2657d96d1a7..5da5f1ab447 100644 --- a/lucene/core/src/java/org/apache/lucene/index/StoredFields.java +++ b/lucene/core/src/java/org/apache/lucene/index/StoredFields.java @@ -20,6 +20,7 @@ import java.io.IOException; import java.util.Set; import org.apache.lucene.document.Document; import org.apache.lucene.document.DocumentStoredFieldVisitor; +import org.apache.lucene.store.IndexInput; import org.apache.lucene.util.Bits; /** @@ -32,6 +33,18 @@ public abstract class StoredFields { /** Sole constructor. (For invocation by subclass constructors, typically implicit.) */ protected StoredFields() {} + /** + * Optional method: Give a hint to this {@link StoredFields} instance that the given document will + * be read in the near future. This typically delegates to {@link IndexInput#prefetch} and is + * useful to parallelize I/O across multiple documents. + * + *

NOTE: This API is expected to be called on a small enough set of doc IDs that they could all + * fit in the page cache. If you plan on retrieving a very large number of documents, it may be a + * good idea to perform calls to {@link #prefetch} and {@link #document} in batches instead of + * prefetching all documents up-front. + */ + public void prefetch(int docID) throws IOException {} + /** * Returns the stored fields of the nth Document in this * index. This is just sugar for using {@link DocumentStoredFieldVisitor}. diff --git a/lucene/core/src/test/org/apache/lucene/codecs/lucene90/TestLucene90StoredFieldsFormat.java b/lucene/core/src/test/org/apache/lucene/codecs/lucene90/TestLucene90StoredFieldsFormat.java index ad6c60ca435..ae9c6d7b53f 100644 --- a/lucene/core/src/test/org/apache/lucene/codecs/lucene90/TestLucene90StoredFieldsFormat.java +++ b/lucene/core/src/test/org/apache/lucene/codecs/lucene90/TestLucene90StoredFieldsFormat.java @@ -16,7 +16,22 @@ */ package org.apache.lucene.codecs.lucene90; +import java.io.IOException; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.lucene.codecs.Codec; +import org.apache.lucene.document.Document; +import org.apache.lucene.document.StoredField; +import org.apache.lucene.index.DirectoryReader; +import org.apache.lucene.index.IndexReader; +import org.apache.lucene.index.IndexWriter; +import org.apache.lucene.index.IndexWriterConfig; +import org.apache.lucene.index.StoredFields; +import org.apache.lucene.store.Directory; +import org.apache.lucene.store.FilterDirectory; +import org.apache.lucene.store.FilterIndexInput; +import org.apache.lucene.store.IOContext; +import org.apache.lucene.store.IndexInput; +import org.apache.lucene.tests.codecs.compressing.dummy.DummyCompressingCodec; import org.apache.lucene.tests.index.BaseStoredFieldsFormatTestCase; import org.apache.lucene.tests.util.TestUtil; @@ -25,4 +40,82 @@ public class TestLucene90StoredFieldsFormat extends BaseStoredFieldsFormatTestCa protected Codec getCodec() { return TestUtil.getDefaultCodec(); } + + private static class CountingPrefetchDirectory extends FilterDirectory { + + private final AtomicInteger counter; + + CountingPrefetchDirectory(Directory in, AtomicInteger counter) { + super(in); + this.counter = counter; + } + + @Override + public IndexInput openInput(String name, IOContext context) throws IOException { + return new CountingPrefetchIndexInput(super.openInput(name, context), counter); + } + } + + private static class CountingPrefetchIndexInput extends FilterIndexInput { + + private final AtomicInteger counter; + + public CountingPrefetchIndexInput(IndexInput input, AtomicInteger counter) { + super(input.toString(), input); + this.counter = counter; + } + + @Override + public void prefetch(long offset, long length) throws IOException { + in.prefetch(offset, length); + counter.incrementAndGet(); + } + + @Override + public IndexInput clone() { + return new CountingPrefetchIndexInput(in.clone(), counter); + } + + @Override + public IndexInput slice(String sliceDescription, long offset, long length) throws IOException { + return new CountingPrefetchIndexInput(in.slice(sliceDescription, offset, length), counter); + } + } + + public void testSkipRedundantPrefetches() throws IOException { + // Use the "dummy" codec, which has the same base class as Lucene90StoredFieldsFormat but allows + // configuring the number of docs per chunk. + Codec codec = new DummyCompressingCodec(1 << 10, 2, false, 16); + try (Directory origDir = newDirectory()) { + AtomicInteger counter = new AtomicInteger(); + Directory dir = new CountingPrefetchDirectory(origDir, counter); + try (IndexWriter w = new IndexWriter(dir, new IndexWriterConfig().setCodec(codec))) { + for (int i = 0; i < 100; ++i) { + Document doc = new Document(); + doc.add(new StoredField("content", TestUtil.randomSimpleString(random()))); + w.addDocument(doc); + } + w.forceMerge(1); + } + + try (IndexReader reader = DirectoryReader.open(dir)) { + StoredFields storedFields = reader.storedFields(); + counter.set(0); + assertEquals(0, counter.get()); + storedFields.prefetch(0); + assertEquals(1, counter.get()); + storedFields.prefetch(1); + // This format has 2 docs per block, so the second prefetch is skipped + assertEquals(1, counter.get()); + storedFields.prefetch(15); + assertEquals(2, counter.get()); + storedFields.prefetch(14); + // 14 is in the same block as 15, so the prefetch was skipped + assertEquals(2, counter.get()); + // Already prefetched in the past, so skipped again + storedFields.prefetch(1); + assertEquals(2, counter.get()); + } + } + } } diff --git a/lucene/test-framework/src/java/org/apache/lucene/tests/index/AssertingLeafReader.java b/lucene/test-framework/src/java/org/apache/lucene/tests/index/AssertingLeafReader.java index c61c88ff143..8f55dfb5322 100644 --- a/lucene/test-framework/src/java/org/apache/lucene/tests/index/AssertingLeafReader.java +++ b/lucene/test-framework/src/java/org/apache/lucene/tests/index/AssertingLeafReader.java @@ -129,6 +129,12 @@ public class AssertingLeafReader extends FilterLeafReader { this.in = in; } + @Override + public void prefetch(int docID) throws IOException { + assertThread("StoredFields", creationThread); + in.prefetch(docID); + } + @Override public void document(int docID, StoredFieldVisitor visitor) throws IOException { assertThread("StoredFields", creationThread);