Reduce refresh when lookup term in FollowingEngine (#39184)

Today we always refresh when looking up the primary term in
FollowingEngine. This is not necessary for we can simply
return none for operations before the global checkpoint.
This commit is contained in:
Nhat Nguyen 2019-02-20 12:35:02 -05:00
parent cdec11c4eb
commit a96df5d209
1 changed files with 24 additions and 23 deletions

View File

@ -151,32 +151,33 @@ public final class FollowingEngine extends InternalEngine {
}
private OptionalLong lookupPrimaryTerm(final long seqNo) throws IOException {
// Don't need to look up term for operations before the global checkpoint for they were processed on every copies already.
if (seqNo <= engineConfig.getGlobalCheckpointSupplier().getAsLong()) {
return OptionalLong.empty();
}
refreshIfNeeded("lookup_primary_term", seqNo);
try (Searcher engineSearcher = acquireSearcher("lookup_primary_term", SearcherScope.INTERNAL)) {
// We have to acquire a searcher before execute this check to ensure that the requesting seq_no is always found in the else
// branch. If the operation is at most the global checkpoint, we should not look up its term as we may have merged away the
// operation. Moreover, we won't need to replicate this operation to replicas since it was processed on every copies already.
if (seqNo <= engineConfig.getGlobalCheckpointSupplier().getAsLong()) {
return OptionalLong.empty();
} else {
final DirectoryReader reader = Lucene.wrapAllDocsLive(engineSearcher.getDirectoryReader());
final IndexSearcher searcher = new IndexSearcher(reader);
searcher.setQueryCache(null);
final Query query = new BooleanQuery.Builder()
.add(LongPoint.newExactQuery(SeqNoFieldMapper.NAME, seqNo), BooleanClause.Occur.FILTER)
// excludes the non-root nested documents which don't have primary_term.
.add(new DocValuesFieldExistsQuery(SeqNoFieldMapper.PRIMARY_TERM_NAME), BooleanClause.Occur.FILTER)
.build();
final TopDocs topDocs = searcher.search(query, 1);
if (topDocs.scoreDocs.length == 1) {
final int docId = topDocs.scoreDocs[0].doc;
final LeafReaderContext leaf = reader.leaves().get(ReaderUtil.subIndex(docId, reader.leaves()));
final NumericDocValues primaryTermDV = leaf.reader().getNumericDocValues(SeqNoFieldMapper.PRIMARY_TERM_NAME);
if (primaryTermDV != null && primaryTermDV.advanceExact(docId - leaf.docBase)) {
assert primaryTermDV.longValue() > 0 : "invalid term [" + primaryTermDV.longValue() + "]";
return OptionalLong.of(primaryTermDV.longValue());
}
final DirectoryReader reader = Lucene.wrapAllDocsLive(engineSearcher.getDirectoryReader());
final IndexSearcher searcher = new IndexSearcher(reader);
searcher.setQueryCache(null);
final Query query = new BooleanQuery.Builder()
.add(LongPoint.newExactQuery(SeqNoFieldMapper.NAME, seqNo), BooleanClause.Occur.FILTER)
// excludes the non-root nested documents which don't have primary_term.
.add(new DocValuesFieldExistsQuery(SeqNoFieldMapper.PRIMARY_TERM_NAME), BooleanClause.Occur.FILTER)
.build();
final TopDocs topDocs = searcher.search(query, 1);
if (topDocs.scoreDocs.length == 1) {
final int docId = topDocs.scoreDocs[0].doc;
final LeafReaderContext leaf = reader.leaves().get(ReaderUtil.subIndex(docId, reader.leaves()));
final NumericDocValues primaryTermDV = leaf.reader().getNumericDocValues(SeqNoFieldMapper.PRIMARY_TERM_NAME);
if (primaryTermDV != null && primaryTermDV.advanceExact(docId - leaf.docBase)) {
assert primaryTermDV.longValue() > 0 : "invalid term [" + primaryTermDV.longValue() + "]";
return OptionalLong.of(primaryTermDV.longValue());
}
}
if (seqNo <= engineConfig.getGlobalCheckpointSupplier().getAsLong()) {
return OptionalLong.empty(); // we have merged away the looking up operation.
} else {
assert false : "seq_no[" + seqNo + "] does not have primary_term, total_hits=[" + topDocs.totalHits + "]";
throw new IllegalStateException("seq_no[" + seqNo + "] does not have primary_term (total_hits=" + topDocs.totalHits + ")");
}