Limit num hits when reading Lucene changes (#30908)

Today we don't limit the number of hits when reading changes from Lucene
index. If the index and the requesting seq# range both are large, the
searcher may consume a huge amount of memory.

This commit uses a fixed size batch with search_after to avoid the
problem.
This commit is contained in:
Nhat Nguyen 2018-05-30 18:36:06 -04:00 committed by GitHub
parent f25ee254cc
commit 8793ebcda0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 42 additions and 26 deletions

View File

@ -2349,6 +2349,7 @@ public class InternalEngine extends Engine {
return numDocUpdates.count();
}
@Override
public Translog.Snapshot newLuceneChangesSnapshot(String source, MapperService mapperService,
long minSeqNo, long maxSeqNo, boolean requiredFullRange) throws IOException {
// TODO: Should we defer the refresh until we really need it?
@ -2358,7 +2359,8 @@ public class InternalEngine extends Engine {
}
Searcher searcher = acquireSearcher(source, SearcherScope.INTERNAL);
try {
LuceneChangesSnapshot snapshot = new LuceneChangesSnapshot(searcher, mapperService, minSeqNo, maxSeqNo, requiredFullRange);
LuceneChangesSnapshot snapshot = new LuceneChangesSnapshot(
searcher, mapperService, LuceneChangesSnapshot.DEFAULT_BATCH_SIZE, minSeqNo, maxSeqNo, requiredFullRange);
searcher = null;
return snapshot;
} finally {

View File

@ -25,12 +25,10 @@ import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.NumericDocValues;
import org.apache.lucene.index.ReaderUtil;
import org.apache.lucene.index.Term;
import org.apache.lucene.search.BooleanClause;
import org.apache.lucene.search.BooleanQuery;
import org.apache.lucene.search.DocIdSetIterator;
import org.apache.lucene.search.DocValuesFieldExistsQuery;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.Query;
import org.apache.lucene.search.ScoreDoc;
import org.apache.lucene.search.Sort;
import org.apache.lucene.search.SortField;
import org.apache.lucene.search.SortedNumericSortField;
@ -55,6 +53,9 @@ import java.util.Objects;
* A {@link Translog.Snapshot} from changes in a Lucene index
*/
final class LuceneChangesSnapshot implements Translog.Snapshot {
static final int DEFAULT_BATCH_SIZE = 1024;
private final int searchBatchSize;
private final long fromSeqNo, toSeqNo;
private long lastSeenSeqNo;
private int skippedOperations;
@ -63,7 +64,8 @@ final class LuceneChangesSnapshot implements Translog.Snapshot {
private final IndexSearcher indexSearcher;
private final MapperService mapperService;
private int docIndex = 0;
private final TopDocs topDocs;
private final int totalHits;
private ScoreDoc[] scoreDocs;
private final Closeable onClose;
private final CombinedDocValues[] docValues; // Cache of DocValues
@ -73,23 +75,30 @@ final class LuceneChangesSnapshot implements Translog.Snapshot {
*
* @param engineSearcher the internal engine searcher which will be taken over if the snapshot is opened successfully
* @param mapperService the mapper service which will be mainly used to resolve the document's type and uid
* @param searchBatchSize the number of documents should be returned by each search
* @param fromSeqNo the min requesting seq# - inclusive
* @param toSeqNo the maximum requesting seq# - inclusive
* @param requiredFullRange if true, the snapshot will strictly check for the existence of operations between fromSeqNo and toSeqNo
*/
LuceneChangesSnapshot(Engine.Searcher engineSearcher, MapperService mapperService,
LuceneChangesSnapshot(Engine.Searcher engineSearcher, MapperService mapperService, int searchBatchSize,
long fromSeqNo, long toSeqNo, boolean requiredFullRange) throws IOException {
if (fromSeqNo < 0 || toSeqNo < 0 || fromSeqNo > toSeqNo) {
throw new IllegalArgumentException("Invalid range; from_seqno [" + fromSeqNo + "], to_seqno [" + toSeqNo + "]");
}
if (searchBatchSize < 0) {
throw new IllegalArgumentException("Search_batch_size must not be negative [" + searchBatchSize + "]");
}
this.mapperService = mapperService;
this.searchBatchSize = searchBatchSize;
this.fromSeqNo = fromSeqNo;
this.toSeqNo = toSeqNo;
this.lastSeenSeqNo = fromSeqNo - 1;
this.requiredFullRange = requiredFullRange;
this.indexSearcher = new IndexSearcher(Lucene.wrapAllDocsLive(engineSearcher.getDirectoryReader()));
this.indexSearcher.setQueryCache(null);
this.topDocs = searchOperations(indexSearcher);
final TopDocs topDocs = searchOperations(null);
this.totalHits = Math.toIntExact(topDocs.totalHits);
this.scoreDocs = topDocs.scoreDocs;
final List<LeafReaderContext> leaves = indexSearcher.getIndexReader().leaves();
this.docValues = new CombinedDocValues[leaves.size()];
for (LeafReaderContext leaf : leaves) {
@ -105,7 +114,7 @@ final class LuceneChangesSnapshot implements Translog.Snapshot {
@Override
public int totalOperations() {
return Math.toIntExact(topDocs.totalHits);
return totalHits;
}
@Override
@ -146,28 +155,28 @@ final class LuceneChangesSnapshot implements Translog.Snapshot {
}
}
private int nextDocId() {
if (docIndex < topDocs.scoreDocs.length) {
final int docId = topDocs.scoreDocs[docIndex].doc;
private int nextDocId() throws IOException {
// we have processed all docs in the current search - fetch the next batch
if (docIndex == scoreDocs.length && docIndex > 0) {
final ScoreDoc prev = scoreDocs[scoreDocs.length - 1];
scoreDocs = searchOperations(prev).scoreDocs;
docIndex = 0;
}
if (docIndex < scoreDocs.length) {
int docId = scoreDocs[docIndex].doc;
docIndex++;
return docId;
} else {
}
return DocIdSetIterator.NO_MORE_DOCS;
}
}
private TopDocs searchOperations(IndexSearcher searcher) throws IOException {
final Query rangeQuery = new BooleanQuery.Builder()
.add(new DocValuesFieldExistsQuery(SeqNoFieldMapper.PRIMARY_TERM_NAME), BooleanClause.Occur.FILTER)
.add(LongPoint.newRangeQuery(SeqNoFieldMapper.NAME, fromSeqNo, toSeqNo), BooleanClause.Occur.FILTER)
.build();
private TopDocs searchOperations(ScoreDoc after) throws IOException {
final Query rangeQuery = LongPoint.newRangeQuery(SeqNoFieldMapper.NAME, fromSeqNo, toSeqNo);
final Sort sortedBySeqNoThenByTerm = new Sort(
new SortedNumericSortField(SeqNoFieldMapper.NAME, SortField.Type.LONG),
new SortedNumericSortField(SeqNoFieldMapper.PRIMARY_TERM_NAME, SortField.Type.LONG, true)
);
// norelease - limits the number of hits
final long numHits = Math.min((toSeqNo + 1 - fromSeqNo) * 2, Integer.MAX_VALUE - 1);
return searcher.search(rangeQuery, Math.toIntExact(numHits), sortedBySeqNoThenByTerm);
return indexSearcher.searchAfter(after, rangeQuery, searchBatchSize, sortedBySeqNoThenByTerm);
}
private Translog.Operation readDocAsOp(int docID) throws IOException {

View File

@ -85,7 +85,8 @@ public class LuceneChangesSnapshotTests extends EngineTestCase {
toSeqNo = randomLongBetween(fromSeqNo, numOps * 2);
Engine.Searcher searcher = engine.acquireSearcher("test", Engine.SearcherScope.INTERNAL);
try (Translog.Snapshot snapshot = new LuceneChangesSnapshot(searcher, mapperService, fromSeqNo, toSeqNo, false)) {
try (Translog.Snapshot snapshot = new LuceneChangesSnapshot(
searcher, mapperService, between(1, LuceneChangesSnapshot.DEFAULT_BATCH_SIZE), fromSeqNo, toSeqNo, false)) {
searcher = null;
assertThat(snapshot, SnapshotMatchers.size(0));
} finally {
@ -93,7 +94,8 @@ public class LuceneChangesSnapshotTests extends EngineTestCase {
}
searcher = engine.acquireSearcher("test", Engine.SearcherScope.INTERNAL);
try (Translog.Snapshot snapshot = new LuceneChangesSnapshot(searcher, mapperService, fromSeqNo, toSeqNo, true)) {
try (Translog.Snapshot snapshot = new LuceneChangesSnapshot(
searcher, mapperService, between(1, LuceneChangesSnapshot.DEFAULT_BATCH_SIZE), fromSeqNo, toSeqNo, true)) {
searcher = null;
IllegalStateException error = expectThrows(IllegalStateException.class, () -> drainAll(snapshot));
assertThat(error.getMessage(),
@ -105,14 +107,16 @@ public class LuceneChangesSnapshotTests extends EngineTestCase {
fromSeqNo = randomLongBetween(0, refreshedSeqNo);
toSeqNo = randomLongBetween(refreshedSeqNo + 1, numOps * 2);
Engine.Searcher searcher = engine.acquireSearcher("test", Engine.SearcherScope.INTERNAL);
try (Translog.Snapshot snapshot = new LuceneChangesSnapshot(searcher, mapperService, fromSeqNo, toSeqNo, false)) {
try (Translog.Snapshot snapshot = new LuceneChangesSnapshot(
searcher, mapperService, between(1, LuceneChangesSnapshot.DEFAULT_BATCH_SIZE), fromSeqNo, toSeqNo, false)) {
searcher = null;
assertThat(snapshot, SnapshotMatchers.containsSeqNoRange(fromSeqNo, refreshedSeqNo));
}finally {
IOUtils.close(searcher);
}
searcher = engine.acquireSearcher("test", Engine.SearcherScope.INTERNAL);
try (Translog.Snapshot snapshot = new LuceneChangesSnapshot(searcher, mapperService, fromSeqNo, toSeqNo, true)) {
try (Translog.Snapshot snapshot = new LuceneChangesSnapshot(
searcher, mapperService, between(1, LuceneChangesSnapshot.DEFAULT_BATCH_SIZE), fromSeqNo, toSeqNo, true)) {
searcher = null;
IllegalStateException error = expectThrows(IllegalStateException.class, () -> drainAll(snapshot));
assertThat(error.getMessage(),
@ -122,7 +126,8 @@ public class LuceneChangesSnapshotTests extends EngineTestCase {
}
toSeqNo = randomLongBetween(fromSeqNo, refreshedSeqNo);
searcher = engine.acquireSearcher("test", Engine.SearcherScope.INTERNAL);
try (Translog.Snapshot snapshot = new LuceneChangesSnapshot(searcher, mapperService, fromSeqNo, toSeqNo, true)) {
try (Translog.Snapshot snapshot = new LuceneChangesSnapshot(
searcher, mapperService, between(1, LuceneChangesSnapshot.DEFAULT_BATCH_SIZE), fromSeqNo, toSeqNo, true)) {
searcher = null;
assertThat(snapshot, SnapshotMatchers.containsSeqNoRange(fromSeqNo, toSeqNo));
}finally {