Add prefetching support to stored fields. (#13424)

This adds `StoredFields#prefetch(int)`, which mostly delegates to
`IndexInput#prefetch`. Callers can take advantage of this API to parallelize
I/O across multiple stored documents by first calling `StoredFields#prefetch`
on all doc IDs before calling `StoredFields#document` on all doc IDs.
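
For illustration, a minimal caller-side sketch of that pattern (not part of this
change; `topDocs` and the `indexReader` variable are assumed):

    StoredFields storedFields = indexReader.storedFields();
    // First pass: hint all upcoming reads so the I/O can proceed in parallel.
    for (ScoreDoc hit : topDocs.scoreDocs) {
      storedFields.prefetch(hit.doc);
    }
    // Second pass: actually load the documents, ideally from already-warm pages.
    for (ScoreDoc hit : topDocs.scoreDocs) {
      Document doc = storedFields.document(hit.doc);
    }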

I added a cache of recently prefetched blocks to the default codec, in order to
avoid prefetching the same block multiple times in a short period of time. This
felt sensible given that doc ID reordering via recursive graph bisection or
index sorting is likely to result in search results being clustered.
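
As an illustration of that cache, here is a standalone sketch of the ring-buffer
de-duplication pattern (illustrative only; the actual implementation is in
Lucene90CompressingStoredFieldsReader below):

    // Tiny fixed-size cache of recently seen block IDs. The power-of-two size lets
    // the write index wrap with a bitmask, and the cache is small enough that a
    // linear scan on every call stays cheap.
    class RecentBlockCache {
      private static final int SIZE = 1 << 4;
      private static final int MASK = SIZE - 1;
      private final long[] recent = new long[SIZE];
      private int next;

      RecentBlockCache() {
        java.util.Arrays.fill(recent, -1); // -1 marks empty slots; real block IDs are >= 0
      }

      /** Returns true if blockID has not been seen recently and should be prefetched. */
      boolean shouldPrefetch(long blockID) {
        for (long r : recent) {
          if (r == blockID) {
            return false; // recently prefetched, skip the redundant prefetch
          }
        }
        recent[next++ & MASK] = blockID;
        return true;
      }
    }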
Adrien Grand authored 2024-06-03 09:25:23 +02:00, committed by GitHub
parent a6f920d989
commit edd7747370
13 changed files with 214 additions and 3 deletions

FieldsIndex.java

@@ -21,8 +21,19 @@ import java.io.IOException;
abstract class FieldsIndex implements Cloneable, Closeable {
/** Get the start pointer for the block that contains the given docID. */
abstract long getStartPointer(int docID);
/** Get the ID of the block that contains the given docID. */
abstract long getBlockID(int docID);
/** Get the start pointer of the block with the given ID. */
abstract long getBlockStartPointer(long blockID);
/** Get the number of bytes of the block with the given ID. */
abstract long getBlockLength(long blockID);
/** Get the start pointer of the block that contains the given docID. */
final long getStartPointer(int docID) {
return getBlockStartPointer(getBlockID(docID));
}
/** Check the integrity of the index. */
abstract void checkIntegrity() throws IOException;
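
Taken together, the new methods let a stored fields reader derive both the file
offset and the byte length of the block that holds a docID. A minimal sketch of
a caller (the `fieldsIndex` and `dataIn` variables are assumed):

    long blockID = fieldsIndex.getBlockID(docID);
    long offset = fieldsIndex.getBlockStartPointer(blockID);
    long length = fieldsIndex.getBlockLength(blockID);
    dataIn.prefetch(offset, length); // IndexInput#prefetch(long offset, long length)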

FieldsIndexReader.java

@@ -117,15 +117,31 @@ final class FieldsIndexReader extends FieldsIndex {
}
@Override
long getStartPointer(int docID) {
long getBlockID(int docID) {
Objects.checkIndex(docID, maxDoc);
long blockIndex = docs.binarySearch(0, numChunks, docID);
if (blockIndex < 0) {
blockIndex = -2 - blockIndex;
}
return blockIndex;
}
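// Illustrative note (not part of the patch): docs holds the first docID of each
// chunk, e.g. [0, 2, 4, 6, ...] with 2 docs per chunk. For getBlockID(3),
// binarySearch returns -3 (insertion point 2), and -2 - (-3) = 1, i.e. the chunk
// starting at doc 2, which contains doc 3. getBlockID(4) is an exact match and
// returns 2 directly.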
@Override
long getBlockStartPointer(long blockIndex) {
return startPointers.get(blockIndex);
}
@Override
long getBlockLength(long blockIndex) {
final long endPointer;
if (blockIndex == numChunks - 1) {
endPointer = maxPointer;
} else {
endPointer = startPointers.get(blockIndex + 1);
}
return endPointer - getBlockStartPointer(blockIndex);
}
@Override
public FieldsIndex clone() {
try {

Lucene90CompressingStoredFieldsReader.java

@@ -40,6 +40,7 @@ import static org.apache.lucene.codecs.lucene90.compressing.Lucene90CompressingS
import java.io.EOFException;
import java.io.IOException;
import java.util.Arrays;
import org.apache.lucene.codecs.CodecUtil;
import org.apache.lucene.codecs.StoredFieldsReader;
import org.apache.lucene.codecs.compressing.CompressionMode;
@@ -72,6 +73,9 @@ import org.apache.lucene.util.LongsRef;
*/
public final class Lucene90CompressingStoredFieldsReader extends StoredFieldsReader {
private static final int PREFETCH_CACHE_SIZE = 1 << 4;
private static final int PREFETCH_CACHE_MASK = PREFETCH_CACHE_SIZE - 1;
private final int version;
private final FieldInfos fieldInfos;
private final FieldsIndex indexReader;
@@ -86,6 +90,11 @@ public final class Lucene90CompressingStoredFieldsReader extends StoredFieldsRea
private final long numChunks; // number of written blocks
private final long numDirtyChunks; // number of incomplete compressed blocks written
private final long numDirtyDocs; // cumulative number of docs in incomplete chunks
// Cache of recently prefetched block IDs. This helps reduce chances of prefetching the same block
// multiple times, which is otherwise likely due to index sorting or recursive graph bisection
// clustering similar documents together. NOTE: this cache must be small since it's fully scanned.
private final long[] prefetchedBlockIDCache;
private int prefetchedBlockIDCacheIndex;
private boolean closed;
// used by clone
@@ -103,6 +112,8 @@ public final class Lucene90CompressingStoredFieldsReader extends StoredFieldsRea
this.numChunks = reader.numChunks;
this.numDirtyChunks = reader.numDirtyChunks;
this.numDirtyDocs = reader.numDirtyDocs;
this.prefetchedBlockIDCache = new long[PREFETCH_CACHE_SIZE];
Arrays.fill(prefetchedBlockIDCache, -1);
this.merging = merging;
this.state = new BlockState();
this.closed = false;
@@ -150,6 +161,8 @@ public final class Lucene90CompressingStoredFieldsReader extends StoredFieldsRea
chunkSize = metaIn.readVInt();
decompressor = compressionMode.newDecompressor();
this.prefetchedBlockIDCache = new long[PREFETCH_CACHE_SIZE];
Arrays.fill(prefetchedBlockIDCache, -1);
this.merging = false;
this.state = new BlockState();
@@ -609,6 +622,23 @@ public final class Lucene90CompressingStoredFieldsReader extends StoredFieldsRea
}
}
@Override
public void prefetch(int docID) throws IOException {
final long blockID = indexReader.getBlockID(docID);
for (long prefetchedBlockID : prefetchedBlockIDCache) {
if (prefetchedBlockID == blockID) {
return;
}
}
final long blockStartPointer = indexReader.getBlockStartPointer(blockID);
final long blockLength = indexReader.getBlockLength(blockID);
fieldsStream.prefetch(blockStartPointer, blockLength);
prefetchedBlockIDCache[prefetchedBlockIDCacheIndex++ & PREFETCH_CACHE_MASK] = blockID;
}
SerializedDocument serializedDocument(int docID) throws IOException {
if (state.contains(docID) == false) {
fieldsStream.seek(indexReader.getStartPointer(docID));

BaseCompositeReader.java

@@ -163,6 +163,15 @@ public abstract class BaseCompositeReader<R extends IndexReader> extends Composi
ensureOpen();
StoredFields[] subFields = new StoredFields[subReaders.length];
return new StoredFields() {
@Override
public void prefetch(int docID) throws IOException {
final int i = readerIndex(docID); // find subreader num
if (subFields[i] == null) {
subFields[i] = subReaders[i].storedFields();
}
subFields[i].prefetch(docID - starts[i]);
}
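// Illustrative note (not part of the patch): with segment doc bases
// starts = [0, 100, 250], prefetch(150) resolves to sub-reader 1 and delegates
// to subFields[1].prefetch(150 - 100), i.e. local doc 50.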
@Override
public void document(int docID, StoredFieldVisitor visitor) throws IOException {
final int i = readerIndex(docID); // find subreader num

CheckIndex.java

@@ -3164,6 +3164,9 @@ public final class CheckIndex implements Closeable {
// Intentionally pull even deleted documents to
// make sure they too are not corrupt:
DocumentStoredFieldVisitor visitor = new DocumentStoredFieldVisitor();
if ((j & 0x03) == 0) {
storedFields.prefetch(j);
}
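// Illustrative note (not part of the patch): (j & 0x03) == 0 prefetches every
// fourth document, presumably to exercise the new prefetch path during this full
// scan without issuing a prefetch for every single document.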
storedFields.document(j, visitor);
Document doc = visitor.getDocument();
if (liveDocs == null || liveDocs.get(j)) {

CodecReader.java

@@ -87,6 +87,13 @@ public abstract class CodecReader extends LeafReader {
public final StoredFields storedFields() throws IOException {
final StoredFields reader = getFieldsReader();
return new StoredFields() {
@Override
public void prefetch(int docID) throws IOException {
// Don't trust the codec to do proper checks
Objects.checkIndex(docID, maxDoc());
reader.prefetch(docID);
}
@Override
public void document(int docID, StoredFieldVisitor visitor) throws IOException {
// Don't trust the codec to do proper checks

ParallelLeafReader.java

@@ -280,6 +280,13 @@ public class ParallelLeafReader extends LeafReader {
fields[i] = storedFieldsReaders[i].storedFields();
}
return new StoredFields() {
@Override
public void prefetch(int docID) throws IOException {
for (StoredFields reader : fields) {
reader.prefetch(docID);
}
}
@Override
public void document(int docID, StoredFieldVisitor visitor) throws IOException {
for (StoredFields reader : fields) {

SlowCodecReaderWrapper.java

@@ -258,6 +258,11 @@ public final class SlowCodecReaderWrapper {
throw new UncheckedIOException(e);
}
return new StoredFieldsReader() {
@Override
public void prefetch(int docID) throws IOException {
storedFields.prefetch(docID);
}
@Override
public void document(int docID, StoredFieldVisitor visitor) throws IOException {
storedFields.document(docID, visitor);

SlowCompositeCodecReaderWrapper.java

@@ -154,6 +154,12 @@ final class SlowCompositeCodecReaderWrapper extends CodecReader {
}
}
@Override
public void prefetch(int docID) throws IOException {
int readerId = docIdToReaderId(docID);
readers[readerId].prefetch(docID - docStarts[readerId]);
}
@Override
public void document(int docID, StoredFieldVisitor visitor) throws IOException {
int readerId = docIdToReaderId(docID);

SortingCodecReader.java

@@ -441,6 +441,11 @@ public final class SortingCodecReader extends FilterCodecReader {
private StoredFieldsReader newStoredFieldsReader(StoredFieldsReader delegate) {
return new StoredFieldsReader() {
@Override
public void prefetch(int docID) throws IOException {
delegate.prefetch(docMap.newToOld(docID));
}
@Override
public void document(int docID, StoredFieldVisitor visitor) throws IOException {
delegate.document(docMap.newToOld(docID), visitor);

StoredFields.java

@@ -20,6 +20,7 @@ import java.io.IOException;
import java.util.Set;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.DocumentStoredFieldVisitor;
import org.apache.lucene.store.IndexInput;
import org.apache.lucene.util.Bits;
/**
@@ -32,6 +33,18 @@ public abstract class StoredFields {
/** Sole constructor. (For invocation by subclass constructors, typically implicit.) */
protected StoredFields() {}
/**
* Optional method: Give a hint to this {@link StoredFields} instance that the given document will
* be read in the near future. This typically delegates to {@link IndexInput#prefetch} and is
* useful to parallelize I/O across multiple documents.
*
* <p>NOTE: This API is expected to be called on a small enough set of doc IDs that they could all
* fit in the page cache. If you plan on retrieving a very large number of documents, it may be a
* good idea to perform calls to {@link #prefetch} and {@link #document} in batches instead of
* prefetching all documents up-front.
*/
public void prefetch(int docID) throws IOException {}
/**
* Returns the stored fields of the <code>n</code><sup>th</sup> <code>Document</code> in this
* index. This is just sugar for using {@link DocumentStoredFieldVisitor}.
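
A sketch of the batching that the NOTE above suggests (illustrative; the helper
name and batch size are assumptions, not part of this change):

    static void loadInBatches(StoredFields storedFields, int[] docIDs, int batchSize)
        throws IOException {
      for (int start = 0; start < docIDs.length; start += batchSize) {
        int end = Math.min(start + batchSize, docIDs.length);
        for (int i = start; i < end; i++) {
          storedFields.prefetch(docIDs[i]); // hint the reads for this batch only
        }
        for (int i = start; i < end; i++) {
          Document doc = storedFields.document(docIDs[i]); // then do the actual reads
          // ... consume doc ...
        }
      }
    }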

TestLucene90StoredFieldsFormat.java

@@ -16,7 +16,22 @@
*/
package org.apache.lucene.codecs.lucene90;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.lucene.codecs.Codec;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.StoredField;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.StoredFields;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FilterDirectory;
import org.apache.lucene.store.FilterIndexInput;
import org.apache.lucene.store.IOContext;
import org.apache.lucene.store.IndexInput;
import org.apache.lucene.tests.codecs.compressing.dummy.DummyCompressingCodec;
import org.apache.lucene.tests.index.BaseStoredFieldsFormatTestCase;
import org.apache.lucene.tests.util.TestUtil;
@@ -25,4 +40,82 @@ public class TestLucene90StoredFieldsFormat extends BaseStoredFieldsFormatTestCa
protected Codec getCodec() {
return TestUtil.getDefaultCodec();
}
private static class CountingPrefetchDirectory extends FilterDirectory {
private final AtomicInteger counter;
CountingPrefetchDirectory(Directory in, AtomicInteger counter) {
super(in);
this.counter = counter;
}
@Override
public IndexInput openInput(String name, IOContext context) throws IOException {
return new CountingPrefetchIndexInput(super.openInput(name, context), counter);
}
}
private static class CountingPrefetchIndexInput extends FilterIndexInput {
private final AtomicInteger counter;
public CountingPrefetchIndexInput(IndexInput input, AtomicInteger counter) {
super(input.toString(), input);
this.counter = counter;
}
@Override
public void prefetch(long offset, long length) throws IOException {
in.prefetch(offset, length);
counter.incrementAndGet();
}
@Override
public IndexInput clone() {
return new CountingPrefetchIndexInput(in.clone(), counter);
}
@Override
public IndexInput slice(String sliceDescription, long offset, long length) throws IOException {
return new CountingPrefetchIndexInput(in.slice(sliceDescription, offset, length), counter);
}
}
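The wrapper overrides clone and slice as well, presumably so that prefetch calls
issued through cloned or sliced copies of the input, which the stored fields
reader may create internally, are still counted.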
public void testSkipRedundantPrefetches() throws IOException {
// Use the "dummy" codec, which has the same base class as Lucene90StoredFieldsFormat but allows
// configuring the number of docs per chunk.
Codec codec = new DummyCompressingCodec(1 << 10, 2, false, 16);
try (Directory origDir = newDirectory()) {
AtomicInteger counter = new AtomicInteger();
Directory dir = new CountingPrefetchDirectory(origDir, counter);
try (IndexWriter w = new IndexWriter(dir, new IndexWriterConfig().setCodec(codec))) {
for (int i = 0; i < 100; ++i) {
Document doc = new Document();
doc.add(new StoredField("content", TestUtil.randomSimpleString(random())));
w.addDocument(doc);
}
w.forceMerge(1);
}
try (IndexReader reader = DirectoryReader.open(dir)) {
StoredFields storedFields = reader.storedFields();
counter.set(0);
assertEquals(0, counter.get());
storedFields.prefetch(0);
assertEquals(1, counter.get());
storedFields.prefetch(1);
// This format has 2 docs per block, so the second prefetch is skipped
assertEquals(1, counter.get());
storedFields.prefetch(15);
assertEquals(2, counter.get());
storedFields.prefetch(14);
// 14 is in the same block as 15, so the prefetch was skipped
assertEquals(2, counter.get());
// Already prefetched in the past, so skipped again
storedFields.prefetch(1);
assertEquals(2, counter.get());
}
}
}
}

AssertingLeafReader.java

@@ -129,6 +129,12 @@ public class AssertingLeafReader extends FilterLeafReader {
this.in = in;
}
@Override
public void prefetch(int docID) throws IOException {
assertThread("StoredFields", creationThread);
in.prefetch(docID);
}
@Override
public void document(int docID, StoredFieldVisitor visitor) throws IOException {
assertThread("StoredFields", creationThread);