From b4469f82b9d6fcb2197e38d6610b229d3b2b993a Mon Sep 17 00:00:00 2001 From: Simon Willnauer Date: Tue, 12 Jun 2018 10:34:06 +0200 Subject: [PATCH] Ensure LuceneChangesSnapshot reads in leaf order (#31246) Today we re-initialize DV instances while we read docs for the snapshot. This is caused by the fact that we sort the docs by seqID which causes then to be our of order. This change sorts documents temporarily by docID, fetches the metadata (not source) into a in-memory datastructure and sorts it back. This allows efficient reuse of DV instances. --- .../index/engine/LuceneChangesSnapshot.java | 135 ++++++++++++------ 1 file changed, 88 insertions(+), 47 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/engine/LuceneChangesSnapshot.java b/server/src/main/java/org/elasticsearch/index/engine/LuceneChangesSnapshot.java index c611752bd1b..ae5ff1f31a9 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/LuceneChangesSnapshot.java +++ b/server/src/main/java/org/elasticsearch/index/engine/LuceneChangesSnapshot.java @@ -23,9 +23,7 @@ import org.apache.lucene.document.LongPoint; import org.apache.lucene.index.LeafReader; import org.apache.lucene.index.LeafReaderContext; import org.apache.lucene.index.NumericDocValues; -import org.apache.lucene.index.ReaderUtil; import org.apache.lucene.index.Term; -import org.apache.lucene.search.DocIdSetIterator; import org.apache.lucene.search.IndexSearcher; import org.apache.lucene.search.Query; import org.apache.lucene.search.ScoreDoc; @@ -33,6 +31,7 @@ import org.apache.lucene.search.Sort; import org.apache.lucene.search.SortField; import org.apache.lucene.search.SortedNumericSortField; import org.apache.lucene.search.TopDocs; +import org.apache.lucene.util.ArrayUtil; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.lucene.Lucene; import org.elasticsearch.index.VersionType; @@ -47,6 +46,7 @@ import org.elasticsearch.index.translog.Translog; import java.io.Closeable; import java.io.IOException; +import java.util.Comparator; import java.util.List; import java.util.Objects; @@ -67,9 +67,8 @@ final class LuceneChangesSnapshot implements Translog.Snapshot { private int docIndex = 0; private final int totalHits; private ScoreDoc[] scoreDocs; - + private final ParallelArray parallelArray; private final Closeable onClose; - private final CombinedDocValues[] docValues; // Cache of DocValues /** * Creates a new "translog" snapshot from Lucene for reading operations whose seq# in the specified range. @@ -97,15 +96,13 @@ final class LuceneChangesSnapshot implements Translog.Snapshot { this.requiredFullRange = requiredFullRange; this.indexSearcher = new IndexSearcher(Lucene.wrapAllDocsLive(engineSearcher.getDirectoryReader())); this.indexSearcher.setQueryCache(null); + this.parallelArray = new ParallelArray(searchBatchSize); final TopDocs topDocs = searchOperations(null); + this.totalHits = Math.toIntExact(topDocs.totalHits); this.scoreDocs = topDocs.scoreDocs; - final List leaves = indexSearcher.getIndexReader().leaves(); - this.docValues = new CombinedDocValues[leaves.size()]; - for (LeafReaderContext leaf : leaves) { - this.docValues[leaf.ord] = new CombinedDocValues(leaf.reader()); - } this.onClose = engineSearcher; + fillParallelArray(scoreDocs, parallelArray); } @Override @@ -126,8 +123,8 @@ final class LuceneChangesSnapshot implements Translog.Snapshot { @Override public Translog.Operation next() throws IOException { Translog.Operation op = null; - for (int docId = nextDocId(); docId != DocIdSetIterator.NO_MORE_DOCS; docId = nextDocId()) { - op = readDocAsOp(docId); + for (int idx = nextDocIndex(); idx != -1; idx = nextDocIndex()) { + op = readDocAsOp(idx); if (op != null) { break; } @@ -156,19 +153,58 @@ final class LuceneChangesSnapshot implements Translog.Snapshot { } } - private int nextDocId() throws IOException { + private int nextDocIndex() throws IOException { // we have processed all docs in the current search - fetch the next batch if (docIndex == scoreDocs.length && docIndex > 0) { final ScoreDoc prev = scoreDocs[scoreDocs.length - 1]; scoreDocs = searchOperations(prev).scoreDocs; + fillParallelArray(scoreDocs, parallelArray); docIndex = 0; } if (docIndex < scoreDocs.length) { - int docId = scoreDocs[docIndex].doc; + int idx = docIndex; docIndex++; - return docId; + return idx; + } + return -1; + } + + private void fillParallelArray(ScoreDoc[] scoreDocs, ParallelArray parallelArray) throws IOException { + if (scoreDocs.length > 0) { + for (int i = 0; i < scoreDocs.length; i++) { + scoreDocs[i].shardIndex = i; + } + // for better loading performance we sort the array by docID and + // then visit all leaves in order. + ArrayUtil.introSort(scoreDocs, Comparator.comparingInt(i -> i.doc)); + int docBase = -1; + int maxDoc = 0; + List leaves = indexSearcher.getIndexReader().leaves(); + int readerIndex = 0; + CombinedDocValues combinedDocValues = null; + LeafReaderContext leaf = null; + for (int i = 0; i < scoreDocs.length; i++) { + ScoreDoc scoreDoc = scoreDocs[i]; + if (scoreDoc.doc >= docBase + maxDoc) { + do { + leaf = leaves.get(readerIndex++); + docBase = leaf.docBase; + maxDoc = leaf.reader().maxDoc(); + } while (scoreDoc.doc >= docBase + maxDoc); + combinedDocValues = new CombinedDocValues(leaf.reader()); + } + final int segmentDocID = scoreDoc.doc - docBase; + final int index = scoreDoc.shardIndex; + parallelArray.leafReaderContexts[index] = leaf; + parallelArray.seqNo[index] = combinedDocValues.docSeqNo(segmentDocID); + parallelArray.primaryTerm[index] = combinedDocValues.docPrimaryTerm(segmentDocID); + parallelArray.version[index] = combinedDocValues.docVersion(segmentDocID); + parallelArray.isTombStone[index] = combinedDocValues.isTombstone(segmentDocID); + parallelArray.hasRecoverySource[index] = combinedDocValues.hasRecoverySource(segmentDocID); + } + // now sort back based on the shardIndex. we use this to store the previous index + ArrayUtil.introSort(scoreDocs, Comparator.comparingInt(i -> i.shardIndex)); } - return DocIdSetIterator.NO_MORE_DOCS; } private TopDocs searchOperations(ScoreDoc after) throws IOException { @@ -180,31 +216,30 @@ final class LuceneChangesSnapshot implements Translog.Snapshot { return indexSearcher.searchAfter(after, rangeQuery, searchBatchSize, sortedBySeqNoThenByTerm); } - private Translog.Operation readDocAsOp(int docID) throws IOException { - final List leaves = indexSearcher.getIndexReader().leaves(); - final LeafReaderContext leaf = leaves.get(ReaderUtil.subIndex(docID, leaves)); - final int segmentDocID = docID - leaf.docBase; - final long primaryTerm = docValues[leaf.ord].docPrimaryTerm(segmentDocID); + private Translog.Operation readDocAsOp(int docIndex) throws IOException { + final LeafReaderContext leaf = parallelArray.leafReaderContexts[docIndex]; + final int segmentDocID = scoreDocs[docIndex].doc - leaf.docBase; + final long primaryTerm = parallelArray.primaryTerm[docIndex]; // We don't have to read the nested child documents - those docs don't have primary terms. if (primaryTerm == -1) { skippedOperations++; return null; } - final long seqNo = docValues[leaf.ord].docSeqNo(segmentDocID); + final long seqNo = parallelArray.seqNo[docIndex]; // Only pick the first seen seq# if (seqNo == lastSeenSeqNo) { skippedOperations++; return null; } - final long version = docValues[leaf.ord].docVersion(segmentDocID); - final String sourceField = docValues[leaf.ord].hasRecoverySource(segmentDocID) ? SourceFieldMapper.RECOVERY_SOURCE_NAME : + final long version = parallelArray.version[docIndex]; + final String sourceField = parallelArray.hasRecoverySource[docIndex] ? SourceFieldMapper.RECOVERY_SOURCE_NAME : SourceFieldMapper.NAME; final FieldsVisitor fields = new FieldsVisitor(true, sourceField); - indexSearcher.doc(docID, fields); + leaf.reader().document(segmentDocID, fields); fields.postProcess(mapperService); final Translog.Operation op; - final boolean isTombstone = docValues[leaf.ord].isTombstone(segmentDocID); + final boolean isTombstone = parallelArray.isTombStone[docIndex]; if (isTombstone && fields.uid() == null) { op = new Translog.NoOp(seqNo, primaryTerm, fields.source().utf8ToString()); assert version == 1L : "Noop tombstone should have version 1L; actual version [" + version + "]"; @@ -237,16 +272,32 @@ final class LuceneChangesSnapshot implements Translog.Snapshot { return ndv.longValue() == 1; } + private static final class ParallelArray { + final LeafReaderContext[] leafReaderContexts; + final long[] version; + final long[] seqNo; + final long[] primaryTerm; + final boolean[] isTombStone; + final boolean[] hasRecoverySource; + + ParallelArray(int size) { + version = new long[size]; + seqNo = new long[size]; + primaryTerm = new long[size]; + isTombStone = new boolean[size]; + hasRecoverySource = new boolean[size]; + leafReaderContexts = new LeafReaderContext[size]; + } + } + private static final class CombinedDocValues { - private final LeafReader leafReader; - private NumericDocValues versionDV; - private NumericDocValues seqNoDV; - private NumericDocValues primaryTermDV; - private NumericDocValues tombstoneDV; - private NumericDocValues recoverySource; + private final NumericDocValues versionDV; + private final NumericDocValues seqNoDV; + private final NumericDocValues primaryTermDV; + private final NumericDocValues tombstoneDV; + private final NumericDocValues recoverySource; CombinedDocValues(LeafReader leafReader) throws IOException { - this.leafReader = leafReader; this.versionDV = Objects.requireNonNull(leafReader.getNumericDocValues(VersionFieldMapper.NAME), "VersionDV is missing"); this.seqNoDV = Objects.requireNonNull(leafReader.getNumericDocValues(SeqNoFieldMapper.NAME), "SeqNoDV is missing"); this.primaryTermDV = Objects.requireNonNull( @@ -256,9 +307,7 @@ final class LuceneChangesSnapshot implements Translog.Snapshot { } long docVersion(int segmentDocId) throws IOException { - if (versionDV.docID() > segmentDocId) { - versionDV = Objects.requireNonNull(leafReader.getNumericDocValues(VersionFieldMapper.NAME), "VersionDV is missing"); - } + assert versionDV.docID() < segmentDocId; if (versionDV.advanceExact(segmentDocId) == false) { throw new IllegalStateException("DocValues for field [" + VersionFieldMapper.NAME + "] is not found"); } @@ -266,9 +315,7 @@ final class LuceneChangesSnapshot implements Translog.Snapshot { } long docSeqNo(int segmentDocId) throws IOException { - if (seqNoDV.docID() > segmentDocId) { - seqNoDV = Objects.requireNonNull(leafReader.getNumericDocValues(SeqNoFieldMapper.NAME), "SeqNoDV is missing"); - } + assert seqNoDV.docID() < segmentDocId; if (seqNoDV.advanceExact(segmentDocId) == false) { throw new IllegalStateException("DocValues for field [" + SeqNoFieldMapper.NAME + "] is not found"); } @@ -279,9 +326,7 @@ final class LuceneChangesSnapshot implements Translog.Snapshot { if (primaryTermDV == null) { return -1L; } - if (primaryTermDV.docID() > segmentDocId) { - primaryTermDV = leafReader.getNumericDocValues(SeqNoFieldMapper.PRIMARY_TERM_NAME); - } + assert primaryTermDV.docID() < segmentDocId; // Use -1 for docs which don't have primary term. The caller considers those docs as nested docs. if (primaryTermDV.advanceExact(segmentDocId) == false) { return -1; @@ -293,9 +338,7 @@ final class LuceneChangesSnapshot implements Translog.Snapshot { if (tombstoneDV == null) { return false; } - if (tombstoneDV.docID() > segmentDocId) { - tombstoneDV = leafReader.getNumericDocValues(SeqNoFieldMapper.TOMBSTONE_NAME); - } + assert tombstoneDV.docID() < segmentDocId; return tombstoneDV.advanceExact(segmentDocId) && tombstoneDV.longValue() > 0; } @@ -303,9 +346,7 @@ final class LuceneChangesSnapshot implements Translog.Snapshot { if (recoverySource == null) { return false; } - if (recoverySource.docID() > segmentDocId) { - recoverySource = leafReader.getNumericDocValues(SourceFieldMapper.RECOVERY_SOURCE_NAME); - } + assert recoverySource.docID() < segmentDocId; return recoverySource.advanceExact(segmentDocId); } }