From 8793ebcda093f66d69e46fc6c19023686b0539da Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Wed, 30 May 2018 18:36:06 -0400 Subject: [PATCH] Limit num hits when reading Lucene changes (#30908) Today we don't limit the number of hits when reading changes from a Lucene index. If both the index and the requested seq# range are large, the searcher may consume a huge amount of memory. This commit uses a fixed-size batch with search_after to avoid the problem. --- .../index/engine/InternalEngine.java | 4 +- .../index/engine/LuceneChangesSnapshot.java | 49 +++++++++++-------- .../engine/LuceneChangesSnapshotTests.java | 15 ++++-- 3 files changed, 42 insertions(+), 26 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index f43b2c19116..d58540de729 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -2349,6 +2349,7 @@ public class InternalEngine extends Engine { return numDocUpdates.count(); } + @Override public Translog.Snapshot newLuceneChangesSnapshot(String source, MapperService mapperService, long minSeqNo, long maxSeqNo, boolean requiredFullRange) throws IOException { // TODO: Should we defer the refresh until we really need it? 
@@ -2358,7 +2359,8 @@ public class InternalEngine extends Engine { } Searcher searcher = acquireSearcher(source, SearcherScope.INTERNAL); try { - LuceneChangesSnapshot snapshot = new LuceneChangesSnapshot(searcher, mapperService, minSeqNo, maxSeqNo, requiredFullRange); + LuceneChangesSnapshot snapshot = new LuceneChangesSnapshot( + searcher, mapperService, LuceneChangesSnapshot.DEFAULT_BATCH_SIZE, minSeqNo, maxSeqNo, requiredFullRange); searcher = null; return snapshot; } finally { diff --git a/server/src/main/java/org/elasticsearch/index/engine/LuceneChangesSnapshot.java b/server/src/main/java/org/elasticsearch/index/engine/LuceneChangesSnapshot.java index 1d55a834e47..17878e1143d 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/LuceneChangesSnapshot.java +++ b/server/src/main/java/org/elasticsearch/index/engine/LuceneChangesSnapshot.java @@ -25,12 +25,10 @@ import org.apache.lucene.index.LeafReaderContext; import org.apache.lucene.index.NumericDocValues; import org.apache.lucene.index.ReaderUtil; import org.apache.lucene.index.Term; -import org.apache.lucene.search.BooleanClause; -import org.apache.lucene.search.BooleanQuery; import org.apache.lucene.search.DocIdSetIterator; -import org.apache.lucene.search.DocValuesFieldExistsQuery; import org.apache.lucene.search.IndexSearcher; import org.apache.lucene.search.Query; +import org.apache.lucene.search.ScoreDoc; import org.apache.lucene.search.Sort; import org.apache.lucene.search.SortField; import org.apache.lucene.search.SortedNumericSortField; @@ -55,6 +53,9 @@ import java.util.Objects; * A {@link Translog.Snapshot} from changes in a Lucene index */ final class LuceneChangesSnapshot implements Translog.Snapshot { + static final int DEFAULT_BATCH_SIZE = 1024; + + private final int searchBatchSize; private final long fromSeqNo, toSeqNo; private long lastSeenSeqNo; private int skippedOperations; @@ -63,7 +64,8 @@ final class LuceneChangesSnapshot implements Translog.Snapshot { private final 
IndexSearcher indexSearcher; private final MapperService mapperService; private int docIndex = 0; - private final TopDocs topDocs; + private final int totalHits; + private ScoreDoc[] scoreDocs; private final Closeable onClose; private final CombinedDocValues[] docValues; // Cache of DocValues @@ -73,23 +75,30 @@ final class LuceneChangesSnapshot implements Translog.Snapshot { * * @param engineSearcher the internal engine searcher which will be taken over if the snapshot is opened successfully * @param mapperService the mapper service which will be mainly used to resolve the document's type and uid + * @param searchBatchSize the number of documents that should be returned by each search * @param fromSeqNo the min requesting seq# - inclusive * @param toSeqNo the maximum requesting seq# - inclusive * @param requiredFullRange if true, the snapshot will strictly check for the existence of operations between fromSeqNo and toSeqNo */ - LuceneChangesSnapshot(Engine.Searcher engineSearcher, MapperService mapperService, + LuceneChangesSnapshot(Engine.Searcher engineSearcher, MapperService mapperService, int searchBatchSize, long fromSeqNo, long toSeqNo, boolean requiredFullRange) throws IOException { if (fromSeqNo < 0 || toSeqNo < 0 || fromSeqNo > toSeqNo) { throw new IllegalArgumentException("Invalid range; from_seqno [" + fromSeqNo + "], to_seqno [" + toSeqNo + "]"); } + if (searchBatchSize < 0) { + throw new IllegalArgumentException("Search_batch_size must not be negative [" + searchBatchSize + "]"); + } this.mapperService = mapperService; + this.searchBatchSize = searchBatchSize; this.fromSeqNo = fromSeqNo; this.toSeqNo = toSeqNo; this.lastSeenSeqNo = fromSeqNo - 1; this.requiredFullRange = requiredFullRange; this.indexSearcher = new IndexSearcher(Lucene.wrapAllDocsLive(engineSearcher.getDirectoryReader())); this.indexSearcher.setQueryCache(null); - this.topDocs = searchOperations(indexSearcher); + final TopDocs topDocs = searchOperations(null); + this.totalHits = 
Math.toIntExact(topDocs.totalHits); + this.scoreDocs = topDocs.scoreDocs; final List leaves = indexSearcher.getIndexReader().leaves(); this.docValues = new CombinedDocValues[leaves.size()]; for (LeafReaderContext leaf : leaves) { @@ -105,7 +114,7 @@ final class LuceneChangesSnapshot implements Translog.Snapshot { @Override public int totalOperations() { - return Math.toIntExact(topDocs.totalHits); + return totalHits; } @Override @@ -146,28 +155,28 @@ final class LuceneChangesSnapshot implements Translog.Snapshot { } } - private int nextDocId() { - if (docIndex < topDocs.scoreDocs.length) { - final int docId = topDocs.scoreDocs[docIndex].doc; + private int nextDocId() throws IOException { + // we have processed all docs in the current search - fetch the next batch + if (docIndex == scoreDocs.length && docIndex > 0) { + final ScoreDoc prev = scoreDocs[scoreDocs.length - 1]; + scoreDocs = searchOperations(prev).scoreDocs; + docIndex = 0; + } + if (docIndex < scoreDocs.length) { + int docId = scoreDocs[docIndex].doc; docIndex++; return docId; - } else { - return DocIdSetIterator.NO_MORE_DOCS; } + return DocIdSetIterator.NO_MORE_DOCS; } - private TopDocs searchOperations(IndexSearcher searcher) throws IOException { - final Query rangeQuery = new BooleanQuery.Builder() - .add(new DocValuesFieldExistsQuery(SeqNoFieldMapper.PRIMARY_TERM_NAME), BooleanClause.Occur.FILTER) - .add(LongPoint.newRangeQuery(SeqNoFieldMapper.NAME, fromSeqNo, toSeqNo), BooleanClause.Occur.FILTER) - .build(); + private TopDocs searchOperations(ScoreDoc after) throws IOException { + final Query rangeQuery = LongPoint.newRangeQuery(SeqNoFieldMapper.NAME, fromSeqNo, toSeqNo); final Sort sortedBySeqNoThenByTerm = new Sort( new SortedNumericSortField(SeqNoFieldMapper.NAME, SortField.Type.LONG), new SortedNumericSortField(SeqNoFieldMapper.PRIMARY_TERM_NAME, SortField.Type.LONG, true) ); - // norelease - limits the number of hits - final long numHits = Math.min((toSeqNo + 1 - fromSeqNo) * 2, 
Integer.MAX_VALUE - 1); - return searcher.search(rangeQuery, Math.toIntExact(numHits), sortedBySeqNoThenByTerm); + return indexSearcher.searchAfter(after, rangeQuery, searchBatchSize, sortedBySeqNoThenByTerm); } private Translog.Operation readDocAsOp(int docID) throws IOException { diff --git a/server/src/test/java/org/elasticsearch/index/engine/LuceneChangesSnapshotTests.java b/server/src/test/java/org/elasticsearch/index/engine/LuceneChangesSnapshotTests.java index 17a93036780..c4fb01deef3 100644 --- a/server/src/test/java/org/elasticsearch/index/engine/LuceneChangesSnapshotTests.java +++ b/server/src/test/java/org/elasticsearch/index/engine/LuceneChangesSnapshotTests.java @@ -85,7 +85,8 @@ public class LuceneChangesSnapshotTests extends EngineTestCase { toSeqNo = randomLongBetween(fromSeqNo, numOps * 2); Engine.Searcher searcher = engine.acquireSearcher("test", Engine.SearcherScope.INTERNAL); - try (Translog.Snapshot snapshot = new LuceneChangesSnapshot(searcher, mapperService, fromSeqNo, toSeqNo, false)) { + try (Translog.Snapshot snapshot = new LuceneChangesSnapshot( + searcher, mapperService, between(1, LuceneChangesSnapshot.DEFAULT_BATCH_SIZE), fromSeqNo, toSeqNo, false)) { searcher = null; assertThat(snapshot, SnapshotMatchers.size(0)); } finally { @@ -93,7 +94,8 @@ public class LuceneChangesSnapshotTests extends EngineTestCase { } searcher = engine.acquireSearcher("test", Engine.SearcherScope.INTERNAL); - try (Translog.Snapshot snapshot = new LuceneChangesSnapshot(searcher, mapperService, fromSeqNo, toSeqNo, true)) { + try (Translog.Snapshot snapshot = new LuceneChangesSnapshot( + searcher, mapperService, between(1, LuceneChangesSnapshot.DEFAULT_BATCH_SIZE), fromSeqNo, toSeqNo, true)) { searcher = null; IllegalStateException error = expectThrows(IllegalStateException.class, () -> drainAll(snapshot)); assertThat(error.getMessage(), @@ -105,14 +107,16 @@ public class LuceneChangesSnapshotTests extends EngineTestCase { fromSeqNo = randomLongBetween(0, 
refreshedSeqNo); toSeqNo = randomLongBetween(refreshedSeqNo + 1, numOps * 2); Engine.Searcher searcher = engine.acquireSearcher("test", Engine.SearcherScope.INTERNAL); - try (Translog.Snapshot snapshot = new LuceneChangesSnapshot(searcher, mapperService, fromSeqNo, toSeqNo, false)) { + try (Translog.Snapshot snapshot = new LuceneChangesSnapshot( + searcher, mapperService, between(1, LuceneChangesSnapshot.DEFAULT_BATCH_SIZE), fromSeqNo, toSeqNo, false)) { searcher = null; assertThat(snapshot, SnapshotMatchers.containsSeqNoRange(fromSeqNo, refreshedSeqNo)); }finally { IOUtils.close(searcher); } searcher = engine.acquireSearcher("test", Engine.SearcherScope.INTERNAL); - try (Translog.Snapshot snapshot = new LuceneChangesSnapshot(searcher, mapperService, fromSeqNo, toSeqNo, true)) { + try (Translog.Snapshot snapshot = new LuceneChangesSnapshot( + searcher, mapperService, between(1, LuceneChangesSnapshot.DEFAULT_BATCH_SIZE), fromSeqNo, toSeqNo, true)) { searcher = null; IllegalStateException error = expectThrows(IllegalStateException.class, () -> drainAll(snapshot)); assertThat(error.getMessage(), @@ -122,7 +126,8 @@ public class LuceneChangesSnapshotTests extends EngineTestCase { } toSeqNo = randomLongBetween(fromSeqNo, refreshedSeqNo); searcher = engine.acquireSearcher("test", Engine.SearcherScope.INTERNAL); - try (Translog.Snapshot snapshot = new LuceneChangesSnapshot(searcher, mapperService, fromSeqNo, toSeqNo, true)) { + try (Translog.Snapshot snapshot = new LuceneChangesSnapshot( + searcher, mapperService, between(1, LuceneChangesSnapshot.DEFAULT_BATCH_SIZE), fromSeqNo, toSeqNo, true)) { searcher = null; assertThat(snapshot, SnapshotMatchers.containsSeqNoRange(fromSeqNo, toSeqNo)); }finally {