mirror of https://github.com/apache/lucene.git
Use IndexInput#prefetch for postings, skip data and impacts (#13364)
This uses the `IndexInput#prefetch` API for postings. This relies on heuristics, as we don't know ahead of time what data we will need from a postings list:
- Postings lists are prefetched entirely when they are short (< 16kB).
- Impacts enums also prefetch the first page of skip data.
- Postings enums prefetch skip data on the first call to advance().

Positions, offsets and payloads are never prefetched.

Putting the `IndexInput#prefetch` call in `TermsEnum#postings` and `TermsEnum#impacts` works well because `BooleanQuery` will first create postings/impacts enums for all clauses before it starts unioning/intersecting them. This allows the prefetching logic to run in parallel across all clauses of the same query on the same segment.
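To illustrate that last point, here is a minimal sketch (not part of this commit) of the access pattern that makes per-clause prefetching effective: a conjunction opens a `PostingsEnum` for every clause before advancing any of them, so the prefetch issued inside each `TermsEnum#postings` call can proceed in the background while the remaining clauses are still being set up. The `openAllClauses` helper below is hypothetical; it only uses the public `TermsEnum#postings(PostingsEnum, int)` API.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.lucene.index.PostingsEnum;
import org.apache.lucene.index.TermsEnum;

class PrefetchPatternSketch {
  // Hypothetical helper: open a PostingsEnum per clause up front. With this
  // commit, each postings(...) call below may issue IndexInput#prefetch, so
  // the I/O for all clauses is in flight before intersection starts.
  static List<PostingsEnum> openAllClauses(List<TermsEnum> clauses) throws IOException {
    List<PostingsEnum> postings = new ArrayList<>(clauses.size());
    for (TermsEnum termsEnum : clauses) {
      postings.add(termsEnum.postings(null, PostingsEnum.FREQS));
    }
    // By the time the caller starts advancing these enums, the prefetched
    // pages are likely already in the page cache.
    return postings;
  }
}
```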
This commit is contained in:
parent 3d671a0fbe
commit c5331df1c4
@@ -53,6 +53,9 @@ import org.apache.lucene.util.IOUtils;
  */
 public final class Lucene99PostingsReader extends PostingsReaderBase {
 
+  /** Maximum byte size of a postings list to be fully prefetched. */
+  private static final int MAX_POSTINGS_SIZE_FOR_FULL_PREFETCH = 16_384;
+
   private final IndexInput docIn;
   private final IndexInput posIn;
   private final IndexInput payIn;
@@ -321,6 +324,7 @@ public final class Lucene99PostingsReader extends PostingsReaderBase {
 
     private Lucene99SkipReader skipper;
     private boolean skipped;
+    private boolean prefetchedSkipData;
 
     final IndexInput startDocIn;
 
@@ -393,7 +397,7 @@ public final class Lucene99PostingsReader extends PostingsReaderBase {
           // lazy init
           docIn = startDocIn.clone();
         }
-        docIn.seek(docTermStartFP);
+        seekAndPrefetchPostings(docIn, termState);
       }
 
       doc = -1;
@@ -409,6 +413,7 @@ public final class Lucene99PostingsReader extends PostingsReaderBase {
       nextSkipDoc = BLOCK_SIZE - 1; // we won't skip if target is found in first block
       docBufferUpto = BLOCK_SIZE;
       skipped = false;
+      prefetchedSkipData = false;
       return this;
     }
 
@@ -501,44 +506,52 @@ public final class Lucene99PostingsReader extends PostingsReaderBase {
     public int advance(int target) throws IOException {
       // current skip docID < docIDs generated from current buffer <= next skip docID
       // we don't need to skip if target is buffered already
-      if (docFreq > BLOCK_SIZE && target > nextSkipDoc) {
-
-        if (skipper == null) {
-          // Lazy init: first time this enum has ever been used for skipping
-          skipper =
-              new Lucene99SkipReader(
-                  docIn.clone(), MAX_SKIP_LEVELS, indexHasPos, indexHasOffsets, indexHasPayloads);
-        }
-
-        if (!skipped) {
-          assert skipOffset != -1;
-          // This is the first time this enum has skipped
-          // since reset() was called; load the skip data:
-          skipper.init(docTermStartFP + skipOffset, docTermStartFP, 0, 0, docFreq);
-          skipped = true;
-        }
-
-        // always plus one to fix the result, since skip position in Lucene99SkipReader
-        // is a little different from MultiLevelSkipListReader
-        final int newDocUpto = skipper.skipTo(target) + 1;
-
-        if (newDocUpto >= blockUpto) {
-          // Skipper moved
-          assert newDocUpto % BLOCK_SIZE == 0 : "got " + newDocUpto;
-          blockUpto = newDocUpto;
-
-          // Force to read next block
-          docBufferUpto = BLOCK_SIZE;
-          accum = skipper.getDoc(); // actually, this is just lastSkipEntry
-          docIn.seek(skipper.getDocPointer()); // now point to the block we want to search
-          // even if freqBuffer were not read from the previous block, we will mark them as read,
-          // as we don't need to skip the previous block freqBuffer in refillDocs,
-          // as we have already positioned docIn where in needs to be.
-          isFreqsRead = true;
-        }
-        // next time we call advance, this is used to
-        // foresee whether skipper is necessary.
-        nextSkipDoc = skipper.getNextSkipDoc();
+      if (docFreq > BLOCK_SIZE) {
+        if (target <= nextSkipDoc) {
+          // We don't need skip data yet, but we have evidence that advance() is called, so let's
+          // prefetch skip data in the background.
+          if (prefetchedSkipData == false) {
+            prefetchSkipData(docIn, docTermStartFP, skipOffset);
+            prefetchedSkipData = true;
+          }
+        } else {
+          if (skipper == null) {
+            // Lazy init: first time this enum has ever been used for skipping
+            skipper =
+                new Lucene99SkipReader(
+                    docIn.clone(), MAX_SKIP_LEVELS, indexHasPos, indexHasOffsets, indexHasPayloads);
+          }
+
+          if (!skipped) {
+            assert skipOffset != -1;
+            // This is the first time this enum has skipped
+            // since reset() was called; load the skip data:
+            skipper.init(docTermStartFP + skipOffset, docTermStartFP, 0, 0, docFreq);
+            skipped = true;
+          }
+
+          // always plus one to fix the result, since skip position in Lucene99SkipReader
+          // is a little different from MultiLevelSkipListReader
+          final int newDocUpto = skipper.skipTo(target) + 1;
+
+          if (newDocUpto >= blockUpto) {
+            // Skipper moved
+            assert newDocUpto % BLOCK_SIZE == 0 : "got " + newDocUpto;
+            blockUpto = newDocUpto;
+
+            // Force to read next block
+            docBufferUpto = BLOCK_SIZE;
+            accum = skipper.getDoc(); // actually, this is just lastSkipEntry
+            docIn.seek(skipper.getDocPointer()); // now point to the block we want to search
+            // even if freqBuffer were not read from the previous block, we will mark them as read,
+            // as we don't need to skip the previous block freqBuffer in refillDocs,
+            // as we have already positioned docIn where in needs to be.
+            isFreqsRead = true;
+          }
+          // next time we call advance, this is used to
+          // foresee whether skipper is necessary.
+          nextSkipDoc = skipper.getNextSkipDoc();
+        }
       }
       if (docBufferUpto == BLOCK_SIZE) {
         refillDocs();
@@ -594,6 +607,7 @@ public final class Lucene99PostingsReader extends PostingsReaderBase {
 
     private Lucene99SkipReader skipper;
     private boolean skipped;
+    private boolean prefetchedSkipData;
 
     final IndexInput startDocIn;
 
@@ -715,7 +729,7 @@ public final class Lucene99PostingsReader extends PostingsReaderBase {
           // lazy init
           docIn = startDocIn.clone();
         }
-        docIn.seek(docTermStartFP);
+        seekAndPrefetchPostings(docIn, termState);
       }
       posPendingFP = posTermStartFP;
       payPendingFP = payTermStartFP;
@@ -741,6 +755,7 @@ public final class Lucene99PostingsReader extends PostingsReaderBase {
       }
       docBufferUpto = BLOCK_SIZE;
       skipped = false;
+      prefetchedSkipData = false;
       return this;
     }
 
@@ -902,6 +917,13 @@ public final class Lucene99PostingsReader extends PostingsReaderBase {
           payloadByteUpto = skipper.getPayloadByteUpto();
         }
         nextSkipDoc = skipper.getNextSkipDoc();
+      } else {
+        // We don't need skip data yet, but we have evidence that advance() is used, so prefetch it
+        // in the background.
+        if (prefetchedSkipData == false) {
+          prefetchSkipData(docIn, docTermStartFP, skipOffset);
+          prefetchedSkipData = true;
+        }
       }
       if (docBufferUpto == BLOCK_SIZE) {
         refillDocs();
@@ -1097,7 +1119,9 @@ public final class Lucene99PostingsReader extends PostingsReaderBase {
       this.docIn = Lucene99PostingsReader.this.docIn.clone();
 
       docFreq = termState.docFreq;
-      docIn.seek(termState.docStartFP);
+      seekAndPrefetchPostings(docIn, termState);
+      // Impacts almost certainly need skip data
+      prefetchSkipData(docIn, termState.docStartFP, termState.skipOffset);
 
       doc = -1;
       accum = 0;
@@ -1318,7 +1342,8 @@ public final class Lucene99PostingsReader extends PostingsReaderBase {
       posTermStartFP = termState.posStartFP;
       payTermStartFP = termState.payStartFP;
       totalTermFreq = termState.totalTermFreq;
-      docIn.seek(docTermStartFP);
+      seekAndPrefetchPostings(docIn, termState);
+      prefetchSkipData(docIn, termState.docStartFP, termState.skipOffset);
       posPendingFP = posTermStartFP;
       posPendingCount = 0;
       if (termState.totalTermFreq < BLOCK_SIZE) {
@@ -1672,7 +1697,8 @@ public final class Lucene99PostingsReader extends PostingsReaderBase {
       posTermStartFP = termState.posStartFP;
       payTermStartFP = termState.payStartFP;
       totalTermFreq = termState.totalTermFreq;
-      docIn.seek(docTermStartFP);
+      seekAndPrefetchPostings(docIn, termState);
+      prefetchSkipData(docIn, termState.docStartFP, termState.skipOffset);
       posPendingFP = posTermStartFP;
       payPendingFP = payTermStartFP;
       posPendingCount = 0;
@@ -2049,6 +2075,41 @@ public final class Lucene99PostingsReader extends PostingsReaderBase {
     }
   }
 
+  private void seekAndPrefetchPostings(IndexInput docIn, IntBlockTermState state)
+      throws IOException {
+    if (docIn.getFilePointer() != state.docStartFP) {
+      // Don't prefetch if the input is already positioned at the right offset, which suggests that
+      // the caller is streaming the entire inverted index (e.g. for merging), let the read-ahead
+      // logic do its work instead. Note that this heuristic doesn't work for terms that have skip
+      // data, since skip data is stored after the last term, but handling all terms that have <128
+      // docs is a good start already.
+      docIn.seek(state.docStartFP);
+      if (state.skipOffset < 0) {
+        // This postings list is very short as it doesn't have skip data, prefetch the page that
+        // holds the first byte of the postings list.
+        docIn.prefetch(state.docStartFP, 1);
+      } else if (state.skipOffset <= MAX_POSTINGS_SIZE_FOR_FULL_PREFETCH) {
+        // This postings list is short as it fits on a few pages, prefetch it all, plus one byte to
+        // make sure to include some skip data.
+        docIn.prefetch(state.docStartFP, state.skipOffset + 1);
+      } else {
+        // Default case: prefetch the page that holds the first byte of postings. We'll prefetch
+        // skip data when we have evidence that it is used.
+        docIn.prefetch(state.docStartFP, 1);
+      }
+    }
+    // Note: we don't prefetch positions or offsets, which are less likely to be needed.
+  }
+
+  private void prefetchSkipData(IndexInput docIn, long docStartFP, long skipOffset)
+      throws IOException {
+    if (skipOffset > MAX_POSTINGS_SIZE_FOR_FULL_PREFETCH) {
+      // If skipOffset is less than this value, skip data was already prefetched when doing
+      // #seekAndPrefetchPostings
+      docIn.prefetch(docStartFP + skipOffset, 1);
+    }
+  }
+
   @Override
   public void checkIntegrity() throws IOException {
     if (docIn != null) {
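For context, here is a minimal sketch (not from this commit) of the `IndexInput#prefetch` contract that `seekAndPrefetchPostings` and `prefetchSkipData` rely on: prefetch is an advisory, asynchronous hint that a byte range will be read soon, so issuing it before the blocking read lets the OS load the pages in the background. The directory path and file name below are hypothetical.

```java
import java.io.IOException;
import java.nio.file.Paths;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.IOContext;
import org.apache.lucene.store.IndexInput;
import org.apache.lucene.store.MMapDirectory;

class PrefetchContractSketch {
  static void prefetchThenRead() throws IOException {
    // Hypothetical index directory and file name, for illustration only.
    try (Directory dir = new MMapDirectory(Paths.get("/tmp/index"));
        IndexInput in = dir.openInput("_0.doc", IOContext.DEFAULT)) {
      // Advisory hint: we expect to read the first 16kB soon. The call
      // returns quickly; the OS can load the pages in the background.
      in.prefetch(0, 16_384);
      // ... set up other clauses / do other work while pages load ...
      byte[] buf = new byte[128];
      in.readBytes(buf, 0, buf.length); // likely served from the page cache
    }
  }
}
```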