Add prefetching support to term vectors. (#13758)

This follows the same approach that we used for stored fields.
Adrien Grand 2024-09-13 07:45:12 +02:00 committed by GitHub
parent 3a8f75ccfc
commit e8e179cd45
10 changed files with 199 additions and 2 deletions

View File

@@ -31,6 +31,7 @@ import static org.apache.lucene.codecs.lucene90.compressing.Lucene90CompressingT
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.NoSuchElementException;
@@ -77,6 +78,9 @@ import org.apache.lucene.util.packed.PackedInts;
*/
public final class Lucene90CompressingTermVectorsReader extends TermVectorsReader {
private static final int PREFETCH_CACHE_SIZE = 1 << 4;
private static final int PREFETCH_CACHE_MASK = PREFETCH_CACHE_SIZE - 1;
private final FieldInfos fieldInfos;
final FieldsIndex indexReader;
final IndexInput vectorsStream;
@@ -93,6 +97,11 @@ public final class Lucene90CompressingTermVectorsReade
private final long numDirtyDocs; // cumulative number of docs in incomplete chunks
private final long maxPointer; // end of the data section
private BlockState blockState = new BlockState(-1, -1, 0);
// Cache of recently prefetched block IDs. This helps reduce chances of prefetching the same block
// multiple times, which is otherwise likely due to index sorting or recursive graph bisection
// clustering similar documents together. NOTE: this cache must be small since it's fully scanned.
private final long[] prefetchedBlockIDCache;
private int prefetchedBlockIDCacheIndex;
// used by clone
private Lucene90CompressingTermVectorsReader(Lucene90CompressingTermVectorsReader reader) {
@@ -111,6 +120,8 @@ public final class Lucene90CompressingTermVectorsReade
this.numDirtyChunks = reader.numDirtyChunks;
this.numDirtyDocs = reader.numDirtyDocs;
this.maxPointer = reader.maxPointer;
this.prefetchedBlockIDCache = new long[PREFETCH_CACHE_SIZE];
Arrays.fill(prefetchedBlockIDCache, -1);
this.closed = false;
}
@@ -212,6 +223,9 @@ public final class Lucene90CompressingTermVectorsReade
CodecUtil.checkFooter(metaIn, null);
metaIn.close();
this.prefetchedBlockIDCache = new long[PREFETCH_CACHE_SIZE];
Arrays.fill(prefetchedBlockIDCache, -1);
success = true;
} catch (Throwable t) {
if (metaIn != null) {
@@ -327,6 +341,23 @@ public final class Lucene90CompressingTermVectorsReade
private record BlockState(long startPointer, int docBase, int chunkDocs) {}
@Override
public void prefetch(int docID) throws IOException {
final long blockID = indexReader.getBlockID(docID);
for (long prefetchedBlockID : prefetchedBlockIDCache) {
if (prefetchedBlockID == blockID) {
return;
}
}
final long blockStartPointer = indexReader.getBlockStartPointer(blockID);
final long blockLength = indexReader.getBlockLength(blockID);
vectorsStream.prefetch(blockStartPointer, blockLength);
prefetchedBlockIDCache[prefetchedBlockIDCacheIndex++ & PREFETCH_CACHE_MASK] = blockID;
}
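For reference, prefetchedBlockIDCache above is just a small ring buffer of recently prefetched block IDs that is scanned linearly before each prefetch call. A standalone sketch of the same idea (illustrative only, not part of this patch; the RecentBlockCache name and its methods are made up):

// Illustrative sketch of the ring-buffer dedup cache used by prefetch() above.
// A fixed power-of-two capacity keeps the linear scan cheap; slots are overwritten
// in insertion order, so only the most recent prefetches are remembered.
final class RecentBlockCache {
  private static final int SIZE = 1 << 4; // must stay small: it is fully scanned
  private static final int MASK = SIZE - 1;
  private final long[] ids = new long[SIZE];
  private int next;

  RecentBlockCache() {
    java.util.Arrays.fill(ids, -1); // -1 marks an empty slot; real block IDs are >= 0
  }

  /** Returns true if blockID was prefetched recently, otherwise records it and returns false. */
  boolean seenRecently(long blockID) {
    for (long id : ids) {
      if (id == blockID) {
        return true;
      }
    }
    ids[next++ & MASK] = blockID;
    return false;
  }
}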
@Override
public Fields get(int doc) throws IOException {
ensureOpen();

View File

@@ -117,6 +117,15 @@ public abstract class BaseCompositeReader<R extends IndexReader> extends Composi
ensureOpen();
TermVectors[] subVectors = new TermVectors[subReaders.length];
return new TermVectors() {
@Override
public void prefetch(int docID) throws IOException {
final int i = readerIndex(docID); // find subreader num
if (subVectors[i] == null) {
subVectors[i] = subReaders[i].termVectors();
}
subVectors[i].prefetch(docID - starts[i]);
}
@Override
public Fields get(int docID) throws IOException {
final int i = readerIndex(docID); // find subreader num
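The readerIndex(docID) / starts[i] pair used above maps a top-level doc ID to a subreader ordinal and a subreader-local doc ID. A rough sketch of that mapping (illustrative only; Lucene's real implementation is not shown in this diff):

// Illustrative (not from this patch): find the subreader owning a composite docID,
// given ascending docBase offsets in `starts` with starts[0] == 0.
static int subReaderIndex(int docID, int[] starts) {
  int lo = 0, hi = starts.length - 1;
  while (lo <= hi) {
    int mid = (lo + hi) >>> 1;
    if (starts[mid] <= docID) {
      lo = mid + 1;
    } else {
      hi = mid - 1;
    }
  }
  return hi; // the subreader-local doc ID is then docID - starts[hi]
}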

View File

@@ -3810,6 +3810,9 @@ public final class CheckIndex implements Closeable {
if (vectorsReader != null) {
vectorsReader = vectorsReader.getMergeInstance();
for (int j = 0; j < reader.maxDoc(); ++j) {
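// Note (editorial, not part of the patch): (j & 0x03) == 0 selects every 4th doc, so the
// new prefetch API is exercised on a sample of documents rather than on every one.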
if ((j & 0x03) == 0) {
vectorsReader.prefetch(j);
}
// Intentionally pull/visit (but don't count in
// stats) deleted documents to make sure they too
// are not corrupt:

View File

@@ -48,6 +48,7 @@ import org.apache.lucene.util.Version;
* behavior</em>.
*/
public class ParallelLeafReader extends LeafReader {
private final FieldInfos fieldInfos;
private final LeafReader[] parallelReaders, storedFieldsReaders;
private final Set<LeafReader> completeReaderSet =
@@ -325,14 +326,32 @@ public class ParallelLeafReader extends LeafReader {
@Override
public TermVectors termVectors() throws IOException {
ensureOpen();
Map<LeafReader, TermVectors> readerToTermVectors = new IdentityHashMap<>();
for (LeafReader reader : parallelReaders) {
if (reader.getFieldInfos().hasVectors()) {
TermVectors termVectors = reader.termVectors();
readerToTermVectors.put(reader, termVectors);
}
}
return new TermVectors() {
@Override
public void prefetch(int docID) throws IOException {
// Prefetch all vectors. Note that this may be wasteful if the consumer doesn't need to read
// all the fields, but we have no way to know which fields the consumer needs.
for (TermVectors termVectors : readerToTermVectors.values()) {
termVectors.prefetch(docID);
}
}
@Override
public Fields get(int docID) throws IOException {
ParallelFields fields = null;
for (Map.Entry<String, LeafReader> ent : tvFieldToReader.entrySet()) {
String fieldName = ent.getKey();
TermVectors termVectors = readerToTermVectors.get(ent.getValue());
Terms vector = termVectors.get(docID, fieldName);
if (vector != null) {
if (fields == null) {
fields = new ParallelFields();

View File

@@ -296,6 +296,11 @@ public final class SlowCodecReaderWrapper {
throw new UncheckedIOException(e);
}
return new TermVectorsReader() {
@Override
public void prefetch(int docID) throws IOException {
termVectors.prefetch(docID);
}
@Override
public Fields get(int docID) throws IOException {
return termVectors.get(docID);

View File

@@ -244,6 +244,15 @@ final class SlowCompositeCodecReaderWrapper extends CodecReader {
}
}
@Override
public void prefetch(int doc) throws IOException {
int readerId = docIdToReaderId(doc);
TermVectorsReader reader = readers[readerId];
if (reader != null) {
reader.prefetch(doc - docStarts[readerId]);
}
}
@Override
public Fields get(int doc) throws IOException {
int readerId = docIdToReaderId(doc);

View File

@@ -655,6 +655,11 @@ public final class SortingCodecReader extends FilterCodecReader {
private TermVectorsReader newTermVectorsReader(TermVectorsReader delegate) {
return new TermVectorsReader() {
@Override
public void prefetch(int doc) throws IOException {
delegate.prefetch(docMap.newToOld(doc));
}
@Override
public Fields get(int doc) throws IOException {
return delegate.get(docMap.newToOld(doc));

View File

@@ -18,6 +18,7 @@ package org.apache.lucene.index;
import java.io.IOException;
import org.apache.lucene.analysis.tokenattributes.OffsetAttribute; // javadocs
import org.apache.lucene.store.IndexInput;
/**
* API for reading term vectors.
@@ -30,6 +31,18 @@ public abstract class TermVectors {
/** Sole constructor. (For invocation by subclass constructors, typically implicit.) */
protected TermVectors() {}
/**
* Optional method: Give a hint to this {@link TermVectors} instance that the given document will
* be read in the near future. This typically delegates to {@link IndexInput#prefetch} and is
* useful to parallelize I/O across multiple documents.
*
* <p>NOTE: This API is expected to be called on a small enough set of doc IDs that they could all
* fit in the page cache. If you plan on retrieving a very large number of documents, it may be a
* good idea to perform calls to {@link #prefetch} and {@link #get} in batches instead of
* prefetching all documents up-front.
*/
public void prefetch(int docID) throws IOException {}
/**
* Returns term vectors for this document, or null if term vectors were not indexed.
*
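A sketch of the batched prefetch-then-get pattern that the note above recommends (hypothetical helper; the window size, method and variable names are illustrative, not Lucene API):

// Hypothetical usage sketch: prefetch and read term vectors in small batches so the
// prefetched blocks are still in the page cache when they are read.
static void readInBatches(TermVectors termVectors, int[] docs) throws IOException {
  final int window = 64; // small batch, assumed to fit comfortably in the page cache
  for (int start = 0; start < docs.length; start += window) {
    int end = Math.min(start + window, docs.length);
    for (int i = start; i < end; i++) {
      termVectors.prefetch(docs[i]); // hint upcoming reads so I/O can be parallelized
    }
    for (int i = start; i < end; i++) {
      Fields fields = termVectors.get(docs[i]); // null if the doc has no term vectors
      // ... consume fields ...
    }
  }
}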

View File

@@ -16,7 +16,24 @@
*/
package org.apache.lucene.codecs.lucene90;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.lucene.codecs.Codec;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
import org.apache.lucene.document.FieldType;
import org.apache.lucene.document.TextField;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.TermVectors;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FilterDirectory;
import org.apache.lucene.store.FilterIndexInput;
import org.apache.lucene.store.IOContext;
import org.apache.lucene.store.IndexInput;
import org.apache.lucene.tests.codecs.compressing.dummy.DummyCompressingCodec;
import org.apache.lucene.tests.index.BaseTermVectorsFormatTestCase;
import org.apache.lucene.tests.util.TestUtil;
@@ -25,4 +42,84 @@ public class TestLucene90TermVectorsFormat extends BaseTermVectorsFormatTestCase
protected Codec getCodec() {
return TestUtil.getDefaultCodec();
}
private static class CountingPrefetchDirectory extends FilterDirectory {
private final AtomicInteger counter;
CountingPrefetchDirectory(Directory in, AtomicInteger counter) {
super(in);
this.counter = counter;
}
@Override
public IndexInput openInput(String name, IOContext context) throws IOException {
return new CountingPrefetchIndexInput(super.openInput(name, context), counter);
}
}
private static class CountingPrefetchIndexInput extends FilterIndexInput {
private final AtomicInteger counter;
public CountingPrefetchIndexInput(IndexInput input, AtomicInteger counter) {
super(input.toString(), input);
this.counter = counter;
}
@Override
public void prefetch(long offset, long length) throws IOException {
in.prefetch(offset, length);
counter.incrementAndGet();
}
@Override
public IndexInput clone() {
return new CountingPrefetchIndexInput(in.clone(), counter);
}
@Override
public IndexInput slice(String sliceDescription, long offset, long length) throws IOException {
return new CountingPrefetchIndexInput(in.slice(sliceDescription, offset, length), counter);
}
}
public void testSkipRedundantPrefetches() throws IOException {
// Use the "dummy" codec, whose term vectors format has the same base class as the default
// codec's term vectors format but allows configuring the number of docs per chunk.
Codec codec = new DummyCompressingCodec(1 << 10, 2, false, 16);
try (Directory origDir = newDirectory()) {
AtomicInteger counter = new AtomicInteger();
Directory dir = new CountingPrefetchDirectory(origDir, counter);
try (IndexWriter w = new IndexWriter(dir, new IndexWriterConfig().setCodec(codec))) {
FieldType ft = new FieldType(TextField.TYPE_NOT_STORED);
ft.setStoreTermVectors(true);
for (int i = 0; i < 100; ++i) {
Document doc = new Document();
doc.add(new Field("content", Integer.toString(i), ft));
w.addDocument(doc);
}
w.forceMerge(1);
}
try (IndexReader reader = DirectoryReader.open(dir)) {
TermVectors termVectors = reader.termVectors();
counter.set(0);
assertEquals(0, counter.get());
termVectors.prefetch(0);
assertEquals(1, counter.get());
termVectors.prefetch(1);
// This format has 2 docs per block, so the second prefetch is skipped
assertEquals(1, counter.get());
termVectors.prefetch(15);
assertEquals(2, counter.get());
termVectors.prefetch(14);
// 14 is in the same block as 15, so the prefetch was skipped
assertEquals(2, counter.get());
// Already prefetched in the past, so skipped again
termVectors.prefetch(1);
assertEquals(2, counter.get());
}
}
}
}

View File

@@ -153,6 +153,12 @@ public class AssertingLeafReader extends FilterLeafReader {
this.in = in;
}
@Override
public void prefetch(int docID) throws IOException {
assertThread("TermVectors", creationThread);
in.prefetch(docID);
}
@Override
public Fields get(int doc) throws IOException {
assertThread("TermVectors", creationThread);