From c5331df1c42a48b0670ca8d7c0c127f34bb47c95 Mon Sep 17 00:00:00 2001 From: Adrien Grand Date: Fri, 17 May 2024 09:07:07 +0200 Subject: [PATCH] Use IndexInput#prefetch for postings, skip data and impacts (#13364) This uses the `IndexInput#prefetch` API for postings. This relies on heuristics, as we don't know ahead of time what data we will need from a postings list: - Postings lists are prefetched entirely when they are short (< 16kB). - Impacts enums also prefetch the first page of skip data. - Postings enums prefetc skip data on the first call to advance(). Positions, offsets and payloads are never prefetched. Putting the `IndexInput#prefetch` call in `TermsEnum#postings` and `TermsEnum#impacts` works well because `BooleanQuery` will first create postings/impacts enums for all clauses before it starts unioning/intersecting them. This allows the prefetching logic to run in parallel across all clauses of the same query on the same segment. --- .../lucene99/Lucene99PostingsReader.java | 143 +++++++++++++----- 1 file changed, 102 insertions(+), 41 deletions(-) diff --git a/lucene/core/src/java/org/apache/lucene/codecs/lucene99/Lucene99PostingsReader.java b/lucene/core/src/java/org/apache/lucene/codecs/lucene99/Lucene99PostingsReader.java index 362fb34539e..48c093b7570 100644 --- a/lucene/core/src/java/org/apache/lucene/codecs/lucene99/Lucene99PostingsReader.java +++ b/lucene/core/src/java/org/apache/lucene/codecs/lucene99/Lucene99PostingsReader.java @@ -53,6 +53,9 @@ import org.apache.lucene.util.IOUtils; */ public final class Lucene99PostingsReader extends PostingsReaderBase { + /** Maximum byte size of a postings list to be fully prefetched. */ + private static final int MAX_POSTINGS_SIZE_FOR_FULL_PREFETCH = 16_384; + private final IndexInput docIn; private final IndexInput posIn; private final IndexInput payIn; @@ -321,6 +324,7 @@ public final class Lucene99PostingsReader extends PostingsReaderBase { private Lucene99SkipReader skipper; private boolean skipped; + private boolean prefetchedSkipData; final IndexInput startDocIn; @@ -393,7 +397,7 @@ public final class Lucene99PostingsReader extends PostingsReaderBase { // lazy init docIn = startDocIn.clone(); } - docIn.seek(docTermStartFP); + seekAndPrefetchPostings(docIn, termState); } doc = -1; @@ -409,6 +413,7 @@ public final class Lucene99PostingsReader extends PostingsReaderBase { nextSkipDoc = BLOCK_SIZE - 1; // we won't skip if target is found in first block docBufferUpto = BLOCK_SIZE; skipped = false; + prefetchedSkipData = false; return this; } @@ -501,44 +506,52 @@ public final class Lucene99PostingsReader extends PostingsReaderBase { public int advance(int target) throws IOException { // current skip docID < docIDs generated from current buffer <= next skip docID // we don't need to skip if target is buffered already - if (docFreq > BLOCK_SIZE && target > nextSkipDoc) { + if (docFreq > BLOCK_SIZE) { + if (target <= nextSkipDoc) { + // We don't need skip data yet, but we have evidence that advance() is called, so let's + // prefetch skip data in the background. + if (prefetchedSkipData == false) { + prefetchSkipData(docIn, docTermStartFP, skipOffset); + prefetchedSkipData = true; + } + } else { + if (skipper == null) { + // Lazy init: first time this enum has ever been used for skipping + skipper = + new Lucene99SkipReader( + docIn.clone(), MAX_SKIP_LEVELS, indexHasPos, indexHasOffsets, indexHasPayloads); + } - if (skipper == null) { - // Lazy init: first time this enum has ever been used for skipping - skipper = - new Lucene99SkipReader( - docIn.clone(), MAX_SKIP_LEVELS, indexHasPos, indexHasOffsets, indexHasPayloads); + if (!skipped) { + assert skipOffset != -1; + // This is the first time this enum has skipped + // since reset() was called; load the skip data: + skipper.init(docTermStartFP + skipOffset, docTermStartFP, 0, 0, docFreq); + skipped = true; + } + + // always plus one to fix the result, since skip position in Lucene99SkipReader + // is a little different from MultiLevelSkipListReader + final int newDocUpto = skipper.skipTo(target) + 1; + + if (newDocUpto >= blockUpto) { + // Skipper moved + assert newDocUpto % BLOCK_SIZE == 0 : "got " + newDocUpto; + blockUpto = newDocUpto; + + // Force to read next block + docBufferUpto = BLOCK_SIZE; + accum = skipper.getDoc(); // actually, this is just lastSkipEntry + docIn.seek(skipper.getDocPointer()); // now point to the block we want to search + // even if freqBuffer were not read from the previous block, we will mark them as read, + // as we don't need to skip the previous block freqBuffer in refillDocs, + // as we have already positioned docIn where in needs to be. + isFreqsRead = true; + } + // next time we call advance, this is used to + // foresee whether skipper is necessary. + nextSkipDoc = skipper.getNextSkipDoc(); } - - if (!skipped) { - assert skipOffset != -1; - // This is the first time this enum has skipped - // since reset() was called; load the skip data: - skipper.init(docTermStartFP + skipOffset, docTermStartFP, 0, 0, docFreq); - skipped = true; - } - - // always plus one to fix the result, since skip position in Lucene99SkipReader - // is a little different from MultiLevelSkipListReader - final int newDocUpto = skipper.skipTo(target) + 1; - - if (newDocUpto >= blockUpto) { - // Skipper moved - assert newDocUpto % BLOCK_SIZE == 0 : "got " + newDocUpto; - blockUpto = newDocUpto; - - // Force to read next block - docBufferUpto = BLOCK_SIZE; - accum = skipper.getDoc(); // actually, this is just lastSkipEntry - docIn.seek(skipper.getDocPointer()); // now point to the block we want to search - // even if freqBuffer were not read from the previous block, we will mark them as read, - // as we don't need to skip the previous block freqBuffer in refillDocs, - // as we have already positioned docIn where in needs to be. - isFreqsRead = true; - } - // next time we call advance, this is used to - // foresee whether skipper is necessary. - nextSkipDoc = skipper.getNextSkipDoc(); } if (docBufferUpto == BLOCK_SIZE) { refillDocs(); @@ -594,6 +607,7 @@ public final class Lucene99PostingsReader extends PostingsReaderBase { private Lucene99SkipReader skipper; private boolean skipped; + private boolean prefetchedSkipData; final IndexInput startDocIn; @@ -715,7 +729,7 @@ public final class Lucene99PostingsReader extends PostingsReaderBase { // lazy init docIn = startDocIn.clone(); } - docIn.seek(docTermStartFP); + seekAndPrefetchPostings(docIn, termState); } posPendingFP = posTermStartFP; payPendingFP = payTermStartFP; @@ -741,6 +755,7 @@ public final class Lucene99PostingsReader extends PostingsReaderBase { } docBufferUpto = BLOCK_SIZE; skipped = false; + prefetchedSkipData = false; return this; } @@ -902,6 +917,13 @@ public final class Lucene99PostingsReader extends PostingsReaderBase { payloadByteUpto = skipper.getPayloadByteUpto(); } nextSkipDoc = skipper.getNextSkipDoc(); + } else { + // We don't need skip data yet, but we have evidence that advance() is used, so prefetch it + // in the background. + if (prefetchedSkipData == false) { + prefetchSkipData(docIn, docTermStartFP, skipOffset); + prefetchedSkipData = true; + } } if (docBufferUpto == BLOCK_SIZE) { refillDocs(); @@ -1097,7 +1119,9 @@ public final class Lucene99PostingsReader extends PostingsReaderBase { this.docIn = Lucene99PostingsReader.this.docIn.clone(); docFreq = termState.docFreq; - docIn.seek(termState.docStartFP); + seekAndPrefetchPostings(docIn, termState); + // Impacts almost certainly need skip data + prefetchSkipData(docIn, termState.docStartFP, termState.skipOffset); doc = -1; accum = 0; @@ -1318,7 +1342,8 @@ public final class Lucene99PostingsReader extends PostingsReaderBase { posTermStartFP = termState.posStartFP; payTermStartFP = termState.payStartFP; totalTermFreq = termState.totalTermFreq; - docIn.seek(docTermStartFP); + seekAndPrefetchPostings(docIn, termState); + prefetchSkipData(docIn, termState.docStartFP, termState.skipOffset); posPendingFP = posTermStartFP; posPendingCount = 0; if (termState.totalTermFreq < BLOCK_SIZE) { @@ -1672,7 +1697,8 @@ public final class Lucene99PostingsReader extends PostingsReaderBase { posTermStartFP = termState.posStartFP; payTermStartFP = termState.payStartFP; totalTermFreq = termState.totalTermFreq; - docIn.seek(docTermStartFP); + seekAndPrefetchPostings(docIn, termState); + prefetchSkipData(docIn, termState.docStartFP, termState.skipOffset); posPendingFP = posTermStartFP; payPendingFP = payTermStartFP; posPendingCount = 0; @@ -2049,6 +2075,41 @@ public final class Lucene99PostingsReader extends PostingsReaderBase { } } + private void seekAndPrefetchPostings(IndexInput docIn, IntBlockTermState state) + throws IOException { + if (docIn.getFilePointer() != state.docStartFP) { + // Don't prefetch if the input is already positioned at the right offset, which suggests that + // the caller is streaming the entire inverted index (e.g. for merging), let the read-ahead + // logic do its work instead. Note that this heuristic doesn't work for terms that have skip + // data, since skip data is stored after the last term, but handling all terms that have <128 + // docs is a good start already. + docIn.seek(state.docStartFP); + if (state.skipOffset < 0) { + // This postings list is very short as it doesn't have skip data, prefetch the page that + // holds the first byte of the postings list. + docIn.prefetch(state.docStartFP, 1); + } else if (state.skipOffset <= MAX_POSTINGS_SIZE_FOR_FULL_PREFETCH) { + // This postings list is short as it fits on a few pages, prefetch it all, plus one byte to + // make sure to include some skip data. + docIn.prefetch(state.docStartFP, state.skipOffset + 1); + } else { + // Default case: prefetch the page that holds the first byte of postings. We'll prefetch + // skip data when we have evidence that it is used. + docIn.prefetch(state.docStartFP, 1); + } + } + // Note: we don't prefetch positions or offsets, which are less likely to be needed. + } + + private void prefetchSkipData(IndexInput docIn, long docStartFP, long skipOffset) + throws IOException { + if (skipOffset > MAX_POSTINGS_SIZE_FOR_FULL_PREFETCH) { + // If skipOffset is less than this value, skip data was already prefetched when doing + // #seekAndPrefetchPostings + docIn.prefetch(docStartFP + skipOffset, 1); + } + } + @Override public void checkIntegrity() throws IOException { if (docIn != null) {