Ensure LuceneChangesSnapshot reads in leaf order (#31246)

Today we re-initialize the DV (doc-values) instances while we read docs
for the snapshot. This is caused by the fact that we sort the
docs by seq#, which causes them to be out of docID order. This change
temporarily sorts the documents by docID, fetches the metadata (but not the source)
into an in-memory data structure, and then sorts them back into seq# order.
This allows efficient reuse of the DV instances.
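
The pattern, in a minimal standalone sketch (the metadata reads are elided and `fill` is a hypothetical name, not code from this commit):

```java
import java.util.Comparator;

import org.apache.lucene.search.ScoreDoc;
import org.apache.lucene.util.ArrayUtil;

final class LeafOrderSketch {
    // 1) remember each hit's slot in the seq#-sorted result via shardIndex,
    // 2) sort by docID so every leaf is visited once, in forward order,
    // 3) read the per-doc metadata into parallel arrays at the remembered slot,
    // 4) sort back so callers still consume the hits in seq# order.
    static void fill(ScoreDoc[] hits) {
        for (int i = 0; i < hits.length; i++) {
            hits[i].shardIndex = i; // original (seq#-sorted) position
        }
        ArrayUtil.introSort(hits, Comparator.comparingInt(d -> d.doc));
        for (ScoreDoc hit : hits) {
            // read doc-values for hit.doc here; targets only move forward,
            // so a DV instance per leaf can be created once and reused
        }
        ArrayUtil.introSort(hits, Comparator.comparingInt(d -> d.shardIndex));
    }
}
```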
Simon Willnauer 2018-06-12 10:34:06 +02:00 committed by GitHub
parent 9feff9809b
commit b4469f82b9
1 changed file with 88 additions and 47 deletions

@@ -23,9 +23,7 @@ import org.apache.lucene.document.LongPoint;
 import org.apache.lucene.index.LeafReader;
 import org.apache.lucene.index.LeafReaderContext;
 import org.apache.lucene.index.NumericDocValues;
-import org.apache.lucene.index.ReaderUtil;
 import org.apache.lucene.index.Term;
-import org.apache.lucene.search.DocIdSetIterator;
 import org.apache.lucene.search.IndexSearcher;
 import org.apache.lucene.search.Query;
 import org.apache.lucene.search.ScoreDoc;
@@ -33,6 +31,7 @@ import org.apache.lucene.search.Sort;
 import org.apache.lucene.search.SortField;
 import org.apache.lucene.search.SortedNumericSortField;
 import org.apache.lucene.search.TopDocs;
+import org.apache.lucene.util.ArrayUtil;
 import org.elasticsearch.common.bytes.BytesReference;
 import org.elasticsearch.common.lucene.Lucene;
 import org.elasticsearch.index.VersionType;
@@ -47,6 +46,7 @@ import org.elasticsearch.index.translog.Translog;
 import java.io.Closeable;
 import java.io.IOException;
+import java.util.Comparator;
 import java.util.List;
 import java.util.Objects;
@@ -67,9 +67,8 @@ final class LuceneChangesSnapshot implements Translog.Snapshot {
     private int docIndex = 0;
     private final int totalHits;
     private ScoreDoc[] scoreDocs;
+    private final ParallelArray parallelArray;
     private final Closeable onClose;
-    private final CombinedDocValues[] docValues; // Cache of DocValues
 
     /**
      * Creates a new "translog" snapshot from Lucene for reading operations whose seq# in the specified range.
@@ -97,15 +96,13 @@ final class LuceneChangesSnapshot implements Translog.Snapshot {
         this.requiredFullRange = requiredFullRange;
         this.indexSearcher = new IndexSearcher(Lucene.wrapAllDocsLive(engineSearcher.getDirectoryReader()));
         this.indexSearcher.setQueryCache(null);
+        this.parallelArray = new ParallelArray(searchBatchSize);
         final TopDocs topDocs = searchOperations(null);
         this.totalHits = Math.toIntExact(topDocs.totalHits);
         this.scoreDocs = topDocs.scoreDocs;
-        final List<LeafReaderContext> leaves = indexSearcher.getIndexReader().leaves();
-        this.docValues = new CombinedDocValues[leaves.size()];
-        for (LeafReaderContext leaf : leaves) {
-            this.docValues[leaf.ord] = new CombinedDocValues(leaf.reader());
-        }
         this.onClose = engineSearcher;
+        fillParallelArray(scoreDocs, parallelArray);
     }
@Override
@@ -126,8 +123,8 @@ final class LuceneChangesSnapshot implements Translog.Snapshot {
     @Override
     public Translog.Operation next() throws IOException {
         Translog.Operation op = null;
-        for (int docId = nextDocId(); docId != DocIdSetIterator.NO_MORE_DOCS; docId = nextDocId()) {
-            op = readDocAsOp(docId);
+        for (int idx = nextDocIndex(); idx != -1; idx = nextDocIndex()) {
+            op = readDocAsOp(idx);
             if (op != null) {
                 break;
             }
@@ -156,19 +153,58 @@ final class LuceneChangesSnapshot implements Translog.Snapshot {
         }
     }
 
-    private int nextDocId() throws IOException {
+    private int nextDocIndex() throws IOException {
+        // we have processed all docs in the current search - fetch the next batch
         if (docIndex == scoreDocs.length && docIndex > 0) {
             final ScoreDoc prev = scoreDocs[scoreDocs.length - 1];
             scoreDocs = searchOperations(prev).scoreDocs;
+            fillParallelArray(scoreDocs, parallelArray);
             docIndex = 0;
         }
         if (docIndex < scoreDocs.length) {
-            int docId = scoreDocs[docIndex].doc;
+            int idx = docIndex;
             docIndex++;
-            return docId;
+            return idx;
         }
+        return -1;
+    }
+
+    private void fillParallelArray(ScoreDoc[] scoreDocs, ParallelArray parallelArray) throws IOException {
+        if (scoreDocs.length > 0) {
+            for (int i = 0; i < scoreDocs.length; i++) {
+                scoreDocs[i].shardIndex = i;
+            }
+            // for better loading performance we sort the array by docID and
+            // then visit all leaves in order.
+            ArrayUtil.introSort(scoreDocs, Comparator.comparingInt(i -> i.doc));
+            int docBase = -1;
+            int maxDoc = 0;
+            List<LeafReaderContext> leaves = indexSearcher.getIndexReader().leaves();
+            int readerIndex = 0;
+            CombinedDocValues combinedDocValues = null;
+            LeafReaderContext leaf = null;
+            for (int i = 0; i < scoreDocs.length; i++) {
+                ScoreDoc scoreDoc = scoreDocs[i];
+                if (scoreDoc.doc >= docBase + maxDoc) {
+                    do {
+                        leaf = leaves.get(readerIndex++);
+                        docBase = leaf.docBase;
+                        maxDoc = leaf.reader().maxDoc();
+                    } while (scoreDoc.doc >= docBase + maxDoc);
+                    combinedDocValues = new CombinedDocValues(leaf.reader());
+                }
+                final int segmentDocID = scoreDoc.doc - docBase;
+                final int index = scoreDoc.shardIndex;
+                parallelArray.leafReaderContexts[index] = leaf;
+                parallelArray.seqNo[index] = combinedDocValues.docSeqNo(segmentDocID);
+                parallelArray.primaryTerm[index] = combinedDocValues.docPrimaryTerm(segmentDocID);
+                parallelArray.version[index] = combinedDocValues.docVersion(segmentDocID);
+                parallelArray.isTombStone[index] = combinedDocValues.isTombstone(segmentDocID);
+                parallelArray.hasRecoverySource[index] = combinedDocValues.hasRecoverySource(segmentDocID);
+            }
+            // now sort back based on the shardIndex. we use this to store the previous index
+            ArrayUtil.introSort(scoreDocs, Comparator.comparingInt(i -> i.shardIndex));
+        }
-        return DocIdSetIterator.NO_MORE_DOCS;
     }
 
     private TopDocs searchOperations(ScoreDoc after) throws IOException {
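
For reference, the batch refill above relies on Lucene's `searchAfter` paging; a rough, self-contained sketch of that loop (all names hypothetical, the per-hit processing elided):

```java
import java.io.IOException;

import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.Query;
import org.apache.lucene.search.ScoreDoc;
import org.apache.lucene.search.Sort;
import org.apache.lucene.search.TopDocs;

final class SearchAfterSketch {
    // Each call resumes the sorted search after the last hit of the
    // previous page, so only batchSize hits (and one ParallelArray of
    // that size) need to be alive at any time.
    static void drain(IndexSearcher searcher, Query query, Sort sort, int batchSize) throws IOException {
        TopDocs page = searcher.searchAfter(null, query, batchSize, sort);
        while (page.scoreDocs.length > 0) {
            for (ScoreDoc hit : page.scoreDocs) {
                // fill the parallel arrays / emit operations for this batch
            }
            ScoreDoc last = page.scoreDocs[page.scoreDocs.length - 1];
            page = searcher.searchAfter(last, query, batchSize, sort);
        }
    }
}
```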
@@ -180,31 +216,30 @@ final class LuceneChangesSnapshot implements Translog.Snapshot {
         return indexSearcher.searchAfter(after, rangeQuery, searchBatchSize, sortedBySeqNoThenByTerm);
     }
 
-    private Translog.Operation readDocAsOp(int docID) throws IOException {
-        final List<LeafReaderContext> leaves = indexSearcher.getIndexReader().leaves();
-        final LeafReaderContext leaf = leaves.get(ReaderUtil.subIndex(docID, leaves));
-        final int segmentDocID = docID - leaf.docBase;
-        final long primaryTerm = docValues[leaf.ord].docPrimaryTerm(segmentDocID);
+    private Translog.Operation readDocAsOp(int docIndex) throws IOException {
+        final LeafReaderContext leaf = parallelArray.leafReaderContexts[docIndex];
+        final int segmentDocID = scoreDocs[docIndex].doc - leaf.docBase;
+        final long primaryTerm = parallelArray.primaryTerm[docIndex];
         // We don't have to read the nested child documents - those docs don't have primary terms.
         if (primaryTerm == -1) {
            skippedOperations++;
            return null;
        }
-        final long seqNo = docValues[leaf.ord].docSeqNo(segmentDocID);
+        final long seqNo = parallelArray.seqNo[docIndex];
         // Only pick the first seen seq#
         if (seqNo == lastSeenSeqNo) {
             skippedOperations++;
             return null;
         }
-        final long version = docValues[leaf.ord].docVersion(segmentDocID);
-        final String sourceField = docValues[leaf.ord].hasRecoverySource(segmentDocID) ? SourceFieldMapper.RECOVERY_SOURCE_NAME :
+        final long version = parallelArray.version[docIndex];
+        final String sourceField = parallelArray.hasRecoverySource[docIndex] ? SourceFieldMapper.RECOVERY_SOURCE_NAME :
             SourceFieldMapper.NAME;
         final FieldsVisitor fields = new FieldsVisitor(true, sourceField);
-        indexSearcher.doc(docID, fields);
+        leaf.reader().document(segmentDocID, fields);
         fields.postProcess(mapperService);
 
         final Translog.Operation op;
-        final boolean isTombstone = docValues[leaf.ord].isTombstone(segmentDocID);
+        final boolean isTombstone = parallelArray.isTombStone[docIndex];
         if (isTombstone && fields.uid() == null) {
             op = new Translog.NoOp(seqNo, primaryTerm, fields.source().utf8ToString());
             assert version == 1L : "Noop tombstone should have version 1L; actual version [" + version + "]";
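
The removed lines above show what the new per-hit `leafReaderContexts` cache replaces: a per-document binary search over the leaves. Roughly (a hypothetical helper using the same Lucene calls as the removed code):

```java
import java.util.List;

import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.ReaderUtil;

final class DocIdMappingSketch {
    // Old approach: binary-search the leaf containing a top-level docID
    // (ReaderUtil.subIndex), then subtract its docBase to get the
    // segment-local docID. This ran once per document read.
    static int toSegmentDocId(int topLevelDocId, List<LeafReaderContext> leaves) {
        LeafReaderContext leaf = leaves.get(ReaderUtil.subIndex(topLevelDocId, leaves));
        return topLevelDocId - leaf.docBase;
    }
}
```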
@@ -237,16 +272,32 @@ final class LuceneChangesSnapshot implements Translog.Snapshot {
         return ndv.longValue() == 1;
     }
 
+    private static final class ParallelArray {
+        final LeafReaderContext[] leafReaderContexts;
+        final long[] version;
+        final long[] seqNo;
+        final long[] primaryTerm;
+        final boolean[] isTombStone;
+        final boolean[] hasRecoverySource;
+
+        ParallelArray(int size) {
+            version = new long[size];
+            seqNo = new long[size];
+            primaryTerm = new long[size];
+            isTombStone = new boolean[size];
+            hasRecoverySource = new boolean[size];
+            leafReaderContexts = new LeafReaderContext[size];
+        }
+    }
+
     private static final class CombinedDocValues {
-        private final LeafReader leafReader;
-        private NumericDocValues versionDV;
-        private NumericDocValues seqNoDV;
-        private NumericDocValues primaryTermDV;
-        private NumericDocValues tombstoneDV;
-        private NumericDocValues recoverySource;
+        private final NumericDocValues versionDV;
+        private final NumericDocValues seqNoDV;
+        private final NumericDocValues primaryTermDV;
+        private final NumericDocValues tombstoneDV;
+        private final NumericDocValues recoverySource;
 
         CombinedDocValues(LeafReader leafReader) throws IOException {
-            this.leafReader = leafReader;
             this.versionDV = Objects.requireNonNull(leafReader.getNumericDocValues(VersionFieldMapper.NAME), "VersionDV is missing");
             this.seqNoDV = Objects.requireNonNull(leafReader.getNumericDocValues(SeqNoFieldMapper.NAME), "SeqNoDV is missing");
             this.primaryTermDV = Objects.requireNonNull(
@@ -256,9 +307,7 @@ final class LuceneChangesSnapshot implements Translog.Snapshot {
         }
 
         long docVersion(int segmentDocId) throws IOException {
-            if (versionDV.docID() > segmentDocId) {
-                versionDV = Objects.requireNonNull(leafReader.getNumericDocValues(VersionFieldMapper.NAME), "VersionDV is missing");
-            }
+            assert versionDV.docID() < segmentDocId;
             if (versionDV.advanceExact(segmentDocId) == false) {
                 throw new IllegalStateException("DocValues for field [" + VersionFieldMapper.NAME + "] is not found");
             }
@@ -266,9 +315,7 @@ final class LuceneChangesSnapshot implements Translog.Snapshot {
         }
 
         long docSeqNo(int segmentDocId) throws IOException {
-            if (seqNoDV.docID() > segmentDocId) {
-                seqNoDV = Objects.requireNonNull(leafReader.getNumericDocValues(SeqNoFieldMapper.NAME), "SeqNoDV is missing");
-            }
+            assert seqNoDV.docID() < segmentDocId;
             if (seqNoDV.advanceExact(segmentDocId) == false) {
                 throw new IllegalStateException("DocValues for field [" + SeqNoFieldMapper.NAME + "] is not found");
             }
@@ -279,9 +326,7 @@ final class LuceneChangesSnapshot implements Translog.Snapshot {
             if (primaryTermDV == null) {
                 return -1L;
             }
-            if (primaryTermDV.docID() > segmentDocId) {
-                primaryTermDV = leafReader.getNumericDocValues(SeqNoFieldMapper.PRIMARY_TERM_NAME);
-            }
+            assert primaryTermDV.docID() < segmentDocId;
             // Use -1 for docs which don't have primary term. The caller considers those docs as nested docs.
             if (primaryTermDV.advanceExact(segmentDocId) == false) {
                 return -1;
@@ -293,9 +338,7 @@ final class LuceneChangesSnapshot implements Translog.Snapshot {
             if (tombstoneDV == null) {
                 return false;
             }
-            if (tombstoneDV.docID() > segmentDocId) {
-                tombstoneDV = leafReader.getNumericDocValues(SeqNoFieldMapper.TOMBSTONE_NAME);
-            }
+            assert tombstoneDV.docID() < segmentDocId;
             return tombstoneDV.advanceExact(segmentDocId) && tombstoneDV.longValue() > 0;
         }
@@ -303,9 +346,7 @@ final class LuceneChangesSnapshot implements Translog.Snapshot {
             if (recoverySource == null) {
                 return false;
             }
-            if (recoverySource.docID() > segmentDocId) {
-                recoverySource = leafReader.getNumericDocValues(SourceFieldMapper.RECOVERY_SOURCE_NAME);
-            }
+            assert recoverySource.docID() < segmentDocId;
             return recoverySource.advanceExact(segmentDocId);
         }
     }
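
The asserts that replace the re-initialization blocks encode the doc-values contract this change relies on: a `NumericDocValues` is a forward-only iterator, so `advanceExact` may not be called with a target behind the current `docID()`. A minimal sketch of that contract (hypothetical helper; `ascendingDocIds` must be strictly increasing, which `fillParallelArray` guarantees per leaf by visiting docs in docID order):

```java
import java.io.IOException;

import org.apache.lucene.index.LeafReader;
import org.apache.lucene.index.NumericDocValues;

final class ForwardOnlySketch {
    static void readSeqNos(LeafReader reader, int[] ascendingDocIds) throws IOException {
        // one instance per leaf, reused across all docs of that leaf
        NumericDocValues dv = reader.getNumericDocValues("_seq_no"); // SeqNoFieldMapper.NAME
        if (dv == null) {
            return; // field has no doc values in this leaf
        }
        for (int docId : ascendingDocIds) {
            assert dv.docID() < docId; // going backwards would require a fresh instance
            if (dv.advanceExact(docId)) {
                long seqNo = dv.longValue();
                // ... consume seqNo ...
            }
        }
    }
}
```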