diff --git a/server/src/main/java/org/elasticsearch/index/engine/LuceneChangesSnapshot.java b/server/src/main/java/org/elasticsearch/index/engine/LuceneChangesSnapshot.java
index c611752bd1b..ae5ff1f31a9 100644
--- a/server/src/main/java/org/elasticsearch/index/engine/LuceneChangesSnapshot.java
+++ b/server/src/main/java/org/elasticsearch/index/engine/LuceneChangesSnapshot.java
@@ -23,9 +23,7 @@ import org.apache.lucene.document.LongPoint;
 import org.apache.lucene.index.LeafReader;
 import org.apache.lucene.index.LeafReaderContext;
 import org.apache.lucene.index.NumericDocValues;
-import org.apache.lucene.index.ReaderUtil;
 import org.apache.lucene.index.Term;
-import org.apache.lucene.search.DocIdSetIterator;
 import org.apache.lucene.search.IndexSearcher;
 import org.apache.lucene.search.Query;
 import org.apache.lucene.search.ScoreDoc;
@@ -33,6 +31,7 @@ import org.apache.lucene.search.Sort;
 import org.apache.lucene.search.SortField;
 import org.apache.lucene.search.SortedNumericSortField;
 import org.apache.lucene.search.TopDocs;
+import org.apache.lucene.util.ArrayUtil;
 import org.elasticsearch.common.bytes.BytesReference;
 import org.elasticsearch.common.lucene.Lucene;
 import org.elasticsearch.index.VersionType;
@@ -47,6 +46,7 @@ import org.elasticsearch.index.translog.Translog;
 
 import java.io.Closeable;
 import java.io.IOException;
+import java.util.Comparator;
 import java.util.List;
 import java.util.Objects;
 
@@ -67,9 +67,8 @@ final class LuceneChangesSnapshot implements Translog.Snapshot {
     private int docIndex = 0;
     private final int totalHits;
     private ScoreDoc[] scoreDocs;
-
+    private final ParallelArray parallelArray;
     private final Closeable onClose;
-    private final CombinedDocValues[] docValues; // Cache of DocValues
 
     /**
      * Creates a new "translog" snapshot from Lucene for reading operations whose seq# in the specified range.
@@ -97,15 +96,13 @@ final class LuceneChangesSnapshot implements Translog.Snapshot {
         this.requiredFullRange = requiredFullRange;
         this.indexSearcher = new IndexSearcher(Lucene.wrapAllDocsLive(engineSearcher.getDirectoryReader()));
         this.indexSearcher.setQueryCache(null);
+        this.parallelArray = new ParallelArray(searchBatchSize);
         final TopDocs topDocs = searchOperations(null);
+        this.totalHits = Math.toIntExact(topDocs.totalHits);
         this.scoreDocs = topDocs.scoreDocs;
-        final List<LeafReaderContext> leaves = indexSearcher.getIndexReader().leaves();
-        this.docValues = new CombinedDocValues[leaves.size()];
-        for (LeafReaderContext leaf : leaves) {
-            this.docValues[leaf.ord] = new CombinedDocValues(leaf.reader());
-        }
         this.onClose = engineSearcher;
+        fillParallelArray(scoreDocs, parallelArray);
     }
 
     @Override
@@ -126,8 +123,8 @@ final class LuceneChangesSnapshot implements Translog.Snapshot {
     @Override
     public Translog.Operation next() throws IOException {
         Translog.Operation op = null;
-        for (int docId = nextDocId(); docId != DocIdSetIterator.NO_MORE_DOCS; docId = nextDocId()) {
-            op = readDocAsOp(docId);
+        for (int idx = nextDocIndex(); idx != -1; idx = nextDocIndex()) {
+            op = readDocAsOp(idx);
             if (op != null) {
                 break;
             }
@@ -156,19 +153,58 @@ final class LuceneChangesSnapshot implements Translog.Snapshot {
         }
     }
 
-    private int nextDocId() throws IOException {
+    private int nextDocIndex() throws IOException {
         // we have processed all docs in the current search - fetch the next batch
         if (docIndex == scoreDocs.length && docIndex > 0) {
             final ScoreDoc prev = scoreDocs[scoreDocs.length - 1];
             scoreDocs = searchOperations(prev).scoreDocs;
+            fillParallelArray(scoreDocs, parallelArray);
             docIndex = 0;
         }
         if (docIndex < scoreDocs.length) {
-            int docId = scoreDocs[docIndex].doc;
+            int idx = docIndex;
             docIndex++;
-            return docId;
+            return idx;
+        }
+        return -1;
+    }
+
+    private void fillParallelArray(ScoreDoc[] scoreDocs, ParallelArray parallelArray) throws IOException {
+        if (scoreDocs.length > 0) {
+            for (int i = 0; i < scoreDocs.length; i++) {
+                scoreDocs[i].shardIndex = i;
+            }
+            // for better loading performance we sort the array by docID and
+            // then visit all leaves in order.
+            ArrayUtil.introSort(scoreDocs, Comparator.comparingInt(i -> i.doc));
+            int docBase = -1;
+            int maxDoc = 0;
+            List<LeafReaderContext> leaves = indexSearcher.getIndexReader().leaves();
+            int readerIndex = 0;
+            CombinedDocValues combinedDocValues = null;
+            LeafReaderContext leaf = null;
+            for (int i = 0; i < scoreDocs.length; i++) {
+                ScoreDoc scoreDoc = scoreDocs[i];
+                if (scoreDoc.doc >= docBase + maxDoc) {
+                    do {
+                        leaf = leaves.get(readerIndex++);
+                        docBase = leaf.docBase;
+                        maxDoc = leaf.reader().maxDoc();
+                    } while (scoreDoc.doc >= docBase + maxDoc);
+                    combinedDocValues = new CombinedDocValues(leaf.reader());
+                }
+                final int segmentDocID = scoreDoc.doc - docBase;
+                final int index = scoreDoc.shardIndex;
+                parallelArray.leafReaderContexts[index] = leaf;
+                parallelArray.seqNo[index] = combinedDocValues.docSeqNo(segmentDocID);
+                parallelArray.primaryTerm[index] = combinedDocValues.docPrimaryTerm(segmentDocID);
+                parallelArray.version[index] = combinedDocValues.docVersion(segmentDocID);
+                parallelArray.isTombStone[index] = combinedDocValues.isTombstone(segmentDocID);
+                parallelArray.hasRecoverySource[index] = combinedDocValues.hasRecoverySource(segmentDocID);
+            }
+            // now sort back based on the shardIndex. we use this to store the previous index
+            ArrayUtil.introSort(scoreDocs, Comparator.comparingInt(i -> i.shardIndex));
         }
-        return DocIdSetIterator.NO_MORE_DOCS;
     }
 
     private TopDocs searchOperations(ScoreDoc after) throws IOException {
@@ -180,31 +216,30 @@ final class LuceneChangesSnapshot implements Translog.Snapshot {
         return indexSearcher.searchAfter(after, rangeQuery, searchBatchSize, sortedBySeqNoThenByTerm);
     }
 
-    private Translog.Operation readDocAsOp(int docID) throws IOException {
-        final List<LeafReaderContext> leaves = indexSearcher.getIndexReader().leaves();
-        final LeafReaderContext leaf = leaves.get(ReaderUtil.subIndex(docID, leaves));
-        final int segmentDocID = docID - leaf.docBase;
-        final long primaryTerm = docValues[leaf.ord].docPrimaryTerm(segmentDocID);
+    private Translog.Operation readDocAsOp(int docIndex) throws IOException {
+        final LeafReaderContext leaf = parallelArray.leafReaderContexts[docIndex];
+        final int segmentDocID = scoreDocs[docIndex].doc - leaf.docBase;
+        final long primaryTerm = parallelArray.primaryTerm[docIndex];
         // We don't have to read the nested child documents - those docs don't have primary terms.
         if (primaryTerm == -1) {
             skippedOperations++;
             return null;
         }
-        final long seqNo = docValues[leaf.ord].docSeqNo(segmentDocID);
+        final long seqNo = parallelArray.seqNo[docIndex];
         // Only pick the first seen seq#
         if (seqNo == lastSeenSeqNo) {
            skippedOperations++;
            return null;
        }
-        final long version = docValues[leaf.ord].docVersion(segmentDocID);
-        final String sourceField = docValues[leaf.ord].hasRecoverySource(segmentDocID) ? SourceFieldMapper.RECOVERY_SOURCE_NAME :
+        final long version = parallelArray.version[docIndex];
+        final String sourceField = parallelArray.hasRecoverySource[docIndex] ? SourceFieldMapper.RECOVERY_SOURCE_NAME :
             SourceFieldMapper.NAME;
         final FieldsVisitor fields = new FieldsVisitor(true, sourceField);
-        indexSearcher.doc(docID, fields);
+        leaf.reader().document(segmentDocID, fields);
         fields.postProcess(mapperService);
         final Translog.Operation op;
-        final boolean isTombstone = docValues[leaf.ord].isTombstone(segmentDocID);
+        final boolean isTombstone = parallelArray.isTombStone[docIndex];
         if (isTombstone && fields.uid() == null) {
             op = new Translog.NoOp(seqNo, primaryTerm, fields.source().utf8ToString());
             assert version == 1L : "Noop tombstone should have version 1L; actual version [" + version + "]";
@@ -237,16 +272,32 @@ final class LuceneChangesSnapshot implements Translog.Snapshot {
         return ndv.longValue() == 1;
     }
 
+    private static final class ParallelArray {
+        final LeafReaderContext[] leafReaderContexts;
+        final long[] version;
+        final long[] seqNo;
+        final long[] primaryTerm;
+        final boolean[] isTombStone;
+        final boolean[] hasRecoverySource;
+
+        ParallelArray(int size) {
+            version = new long[size];
+            seqNo = new long[size];
+            primaryTerm = new long[size];
+            isTombStone = new boolean[size];
+            hasRecoverySource = new boolean[size];
+            leafReaderContexts = new LeafReaderContext[size];
+        }
+    }
+
     private static final class CombinedDocValues {
-        private final LeafReader leafReader;
-        private NumericDocValues versionDV;
-        private NumericDocValues seqNoDV;
-        private NumericDocValues primaryTermDV;
-        private NumericDocValues tombstoneDV;
-        private NumericDocValues recoverySource;
+        private final NumericDocValues versionDV;
+        private final NumericDocValues seqNoDV;
+        private final NumericDocValues primaryTermDV;
+        private final NumericDocValues tombstoneDV;
+        private final NumericDocValues recoverySource;
 
         CombinedDocValues(LeafReader leafReader) throws IOException {
-            this.leafReader = leafReader;
             this.versionDV = Objects.requireNonNull(leafReader.getNumericDocValues(VersionFieldMapper.NAME), "VersionDV is missing");
             this.seqNoDV = Objects.requireNonNull(leafReader.getNumericDocValues(SeqNoFieldMapper.NAME), "SeqNoDV is missing");
             this.primaryTermDV = Objects.requireNonNull(
@@ -256,9 +307,7 @@ final class LuceneChangesSnapshot implements Translog.Snapshot {
         }
 
         long docVersion(int segmentDocId) throws IOException {
-            if (versionDV.docID() > segmentDocId) {
-                versionDV = Objects.requireNonNull(leafReader.getNumericDocValues(VersionFieldMapper.NAME), "VersionDV is missing");
-            }
+            assert versionDV.docID() < segmentDocId;
             if (versionDV.advanceExact(segmentDocId) == false) {
                 throw new IllegalStateException("DocValues for field [" + VersionFieldMapper.NAME + "] is not found");
             }
@@ -266,9 +315,7 @@ final class LuceneChangesSnapshot implements Translog.Snapshot {
         }
 
         long docSeqNo(int segmentDocId) throws IOException {
-            if (seqNoDV.docID() > segmentDocId) {
-                seqNoDV = Objects.requireNonNull(leafReader.getNumericDocValues(SeqNoFieldMapper.NAME), "SeqNoDV is missing");
-            }
+            assert seqNoDV.docID() < segmentDocId;
             if (seqNoDV.advanceExact(segmentDocId) == false) {
                 throw new IllegalStateException("DocValues for field [" + SeqNoFieldMapper.NAME + "] is not found");
             }
@@ -279,9 +326,7 @@ final class LuceneChangesSnapshot implements Translog.Snapshot {
             if (primaryTermDV == null) {
                 return -1L;
             }
-            if (primaryTermDV.docID() > segmentDocId) {
-                primaryTermDV = leafReader.getNumericDocValues(SeqNoFieldMapper.PRIMARY_TERM_NAME);
-            }
+            assert primaryTermDV.docID() < segmentDocId;
             // Use -1 for docs which don't have primary term. The caller considers those docs as nested docs.
             if (primaryTermDV.advanceExact(segmentDocId) == false) {
                 return -1;
@@ -293,9 +338,7 @@ final class LuceneChangesSnapshot implements Translog.Snapshot {
             if (tombstoneDV == null) {
                 return false;
             }
-            if (tombstoneDV.docID() > segmentDocId) {
-                tombstoneDV = leafReader.getNumericDocValues(SeqNoFieldMapper.TOMBSTONE_NAME);
-            }
+            assert tombstoneDV.docID() < segmentDocId;
            return tombstoneDV.advanceExact(segmentDocId) && tombstoneDV.longValue() > 0;
         }
 
@@ -303,9 +346,7 @@ final class LuceneChangesSnapshot implements Translog.Snapshot {
             if (recoverySource == null) {
                 return false;
             }
-            if (recoverySource.docID() > segmentDocId) {
-                recoverySource = leafReader.getNumericDocValues(SourceFieldMapper.RECOVERY_SOURCE_NAME);
-            }
+            assert recoverySource.docID() < segmentDocId;
             return recoverySource.advanceExact(segmentDocId);
         }
     }
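For context, here is a minimal standalone sketch (not part of the patch) of the loading pattern the change introduces: stash each hit's original position in the otherwise-unused ScoreDoc.shardIndex slot, sort the hits by global doc ID so each segment's forward-only doc-values iterator is advanced exactly once per batch, then sort back. The class name ParallelArrayLoader, the loadSeqNos helper, and the "seq_no" field name are illustrative assumptions, not Elasticsearch or Lucene API; only the Lucene calls themselves (leaves(), getNumericDocValues, advanceExact, ArrayUtil.introSort) are real.

import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.NumericDocValues;
import org.apache.lucene.search.ScoreDoc;
import org.apache.lucene.util.ArrayUtil;

import java.io.IOException;
import java.util.Comparator;
import java.util.List;

final class ParallelArrayLoader {

    // Loads the "seq_no" doc values (field name assumed for illustration) for a batch of
    // hits into a parallel array, visiting each segment at most once.
    static long[] loadSeqNos(IndexReader reader, ScoreDoc[] scoreDocs) throws IOException {
        final long[] seqNos = new long[scoreDocs.length];
        // Remember each hit's original position so we can restore the order afterwards.
        for (int i = 0; i < scoreDocs.length; i++) {
            scoreDocs[i].shardIndex = i;
        }
        // Doc values are forward-only iterators, so visit docs in increasing doc-ID order.
        ArrayUtil.introSort(scoreDocs, Comparator.comparingInt(d -> d.doc));
        final List<LeafReaderContext> leaves = reader.leaves();
        int readerIndex = 0;
        LeafReaderContext leaf = null;
        NumericDocValues dv = null;
        int docBase = -1;
        int maxDoc = 0;
        for (ScoreDoc scoreDoc : scoreDocs) {
            // Advance to the leaf (segment) that contains this global doc ID.
            if (scoreDoc.doc >= docBase + maxDoc) {
                do {
                    leaf = leaves.get(readerIndex++);
                    docBase = leaf.docBase;
                    maxDoc = leaf.reader().maxDoc();
                } while (scoreDoc.doc >= docBase + maxDoc);
                dv = leaf.reader().getNumericDocValues("seq_no"); // assumed field name
            }
            final int segmentDocId = scoreDoc.doc - docBase;
            if (dv == null || dv.advanceExact(segmentDocId) == false) {
                throw new IllegalStateException("seq_no doc values missing for doc " + scoreDoc.doc);
            }
            // Write the value back at the hit's original position.
            seqNos[scoreDoc.shardIndex] = dv.longValue();
        }
        // Restore the caller's original hit order.
        ArrayUtil.introSort(scoreDocs, Comparator.comparingInt(d -> d.shardIndex));
        return seqNos;
    }
}

This is the same reasoning that lets the patch replace the lazily re-fetched NumericDocValues fields in CombinedDocValues with final ones guarded by asserts: once every batch is read in ascending doc-ID order within a freshly created per-leaf CombinedDocValues, an iterator can never be asked to move backwards, so re-fetching is dead code.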