Search: Make SCAN faster.
When scrolling, SCAN previously collected documents until it reached where it had stopped on the previous iteration. This makes pagination slower and slower as you request deep pages. With this change, SCAN now directly jumps to the doc ID where is had previously stopped.
This commit is contained in:
parent
f05808d59e
commit
22bba91a16
|
@ -1,100 +0,0 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.common.lucene.docset;
|
||||
|
||||
import org.apache.lucene.search.DocIdSet;
|
||||
import org.apache.lucene.search.DocIdSetIterator;
|
||||
import org.apache.lucene.util.Bits;
|
||||
import org.apache.lucene.util.RamUsageEstimator;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
/**
|
||||
* A {@link DocIdSet} that matches all docs up to a {@code maxDoc}.
|
||||
*/
|
||||
public class AllDocIdSet extends DocIdSet {
|
||||
|
||||
private final int maxDoc;
|
||||
|
||||
public AllDocIdSet(int maxDoc) {
|
||||
this.maxDoc = maxDoc;
|
||||
}
|
||||
|
||||
/**
|
||||
* Does not go to the reader and ask for data, so can be cached.
|
||||
*/
|
||||
@Override
|
||||
public boolean isCacheable() {
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public long ramBytesUsed() {
|
||||
return RamUsageEstimator.NUM_BYTES_INT;
|
||||
}
|
||||
|
||||
@Override
|
||||
public DocIdSetIterator iterator() throws IOException {
|
||||
return new Iterator(maxDoc);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Bits bits() throws IOException {
|
||||
return new Bits.MatchAllBits(maxDoc);
|
||||
}
|
||||
|
||||
public static final class Iterator extends DocIdSetIterator {
|
||||
|
||||
private final int maxDoc;
|
||||
private int doc = -1;
|
||||
|
||||
public Iterator(int maxDoc) {
|
||||
this.maxDoc = maxDoc;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int docID() {
|
||||
return doc;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int nextDoc() throws IOException {
|
||||
if (++doc < maxDoc) {
|
||||
return doc;
|
||||
}
|
||||
return doc = NO_MORE_DOCS;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int advance(int target) throws IOException {
|
||||
doc = target;
|
||||
if (doc < maxDoc) {
|
||||
return doc;
|
||||
}
|
||||
return doc = NO_MORE_DOCS;
|
||||
}
|
||||
|
||||
@Override
|
||||
public long cost() {
|
||||
|
||||
return maxDoc;
|
||||
}
|
||||
}
|
||||
}
|
|
@ -204,9 +204,7 @@ public class DefaultSearchContext extends SearchContext {
|
|||
|
||||
@Override
|
||||
public void doClose() {
|
||||
if (scanContext != null) {
|
||||
scanContext.clear();
|
||||
}
|
||||
scanContext = null;
|
||||
// clear and scope phase we have
|
||||
Releasables.close(searcher, engineSearcher);
|
||||
}
|
||||
|
|
|
@ -19,59 +19,50 @@
|
|||
|
||||
package org.elasticsearch.search.scan;
|
||||
|
||||
import org.apache.lucene.index.IndexReader;
|
||||
import org.apache.lucene.index.LeafReaderContext;
|
||||
import org.apache.lucene.search.BitsFilteredDocIdSet;
|
||||
import org.apache.lucene.search.DocIdSet;
|
||||
import org.apache.lucene.search.Filter;
|
||||
import org.apache.lucene.search.FilteredQuery;
|
||||
import org.apache.lucene.search.CollectionTerminatedException;
|
||||
import org.apache.lucene.search.ConstantScoreScorer;
|
||||
import org.apache.lucene.search.ConstantScoreWeight;
|
||||
import org.apache.lucene.search.DocIdSetIterator;
|
||||
import org.apache.lucene.search.IndexSearcher;
|
||||
import org.apache.lucene.search.Query;
|
||||
import org.apache.lucene.search.ScoreDoc;
|
||||
import org.apache.lucene.search.Scorer;
|
||||
import org.apache.lucene.search.SimpleCollector;
|
||||
import org.apache.lucene.search.TopDocs;
|
||||
import org.apache.lucene.search.Weight;
|
||||
import org.apache.lucene.util.Bits;
|
||||
import org.elasticsearch.common.lucene.docset.AllDocIdSet;
|
||||
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
|
||||
import org.elasticsearch.common.lucene.search.Queries;
|
||||
import org.elasticsearch.search.internal.SearchContext;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.concurrent.ConcurrentMap;
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* The scan context allows to optimize readers we already processed during scanning. We do that by keeping track
|
||||
* of the count per reader, and if we are done with it, we no longer process it by using a filter that returns
|
||||
* null docIdSet for this reader.
|
||||
* of the last collected doc ID and only collecting doc IDs that are greater.
|
||||
*/
|
||||
public class ScanContext {
|
||||
|
||||
private final ConcurrentMap<IndexReader, ReaderState> readerStates = ConcurrentCollections.newConcurrentMap();
|
||||
|
||||
public void clear() {
|
||||
readerStates.clear();
|
||||
}
|
||||
private volatile int docUpTo;
|
||||
|
||||
public TopDocs execute(SearchContext context) throws IOException {
|
||||
ScanCollector collector = new ScanCollector(readerStates, context.from(), context.size(), context.trackScores());
|
||||
Query query = new FilteredQuery(context.query(), new ScanFilter(readerStates, collector));
|
||||
try {
|
||||
context.searcher().search(query, collector);
|
||||
} catch (ScanCollector.StopCollectingException e) {
|
||||
// all is well
|
||||
}
|
||||
return execute(context.searcher(), context.query(), context.size(), context.trackScores());
|
||||
}
|
||||
|
||||
TopDocs execute(IndexSearcher searcher, Query query, int size, boolean trackScores) throws IOException {
|
||||
ScanCollector collector = new ScanCollector(size, trackScores);
|
||||
Query q = Queries.filtered(query, new MinDocQuery(docUpTo));
|
||||
searcher.search(q, collector);
|
||||
return collector.topDocs();
|
||||
}
|
||||
|
||||
static class ScanCollector extends SimpleCollector {
|
||||
private class ScanCollector extends SimpleCollector {
|
||||
|
||||
private final ConcurrentMap<IndexReader, ReaderState> readerStates;
|
||||
private final List<ScoreDoc> docs;
|
||||
|
||||
private final int from;
|
||||
|
||||
private final int to;
|
||||
|
||||
private final ArrayList<ScoreDoc> docs;
|
||||
private final int size;
|
||||
|
||||
private final boolean trackScores;
|
||||
|
||||
|
@ -79,21 +70,10 @@ public class ScanContext {
|
|||
|
||||
private int docBase;
|
||||
|
||||
private int counter;
|
||||
|
||||
private IndexReader currentReader;
|
||||
private ReaderState readerState;
|
||||
|
||||
ScanCollector(ConcurrentMap<IndexReader, ReaderState> readerStates, int from, int size, boolean trackScores) {
|
||||
this.readerStates = readerStates;
|
||||
this.from = from;
|
||||
this.to = from + size;
|
||||
ScanCollector(int size, boolean trackScores) {
|
||||
this.trackScores = trackScores;
|
||||
this.docs = new ArrayList<>(size);
|
||||
}
|
||||
|
||||
void incCounter(int count) {
|
||||
this.counter += count;
|
||||
this.size = size;
|
||||
}
|
||||
|
||||
public TopDocs topDocs() {
|
||||
|
@ -112,70 +92,114 @@ public class ScanContext {
|
|||
|
||||
@Override
|
||||
public void collect(int doc) throws IOException {
|
||||
if (counter >= from) {
|
||||
docs.add(new ScoreDoc(docBase + doc, trackScores ? scorer.score() : 0f));
|
||||
}
|
||||
readerState.count++;
|
||||
counter++;
|
||||
if (counter >= to) {
|
||||
throw StopCollectingException;
|
||||
int topLevelDoc = docBase + doc;
|
||||
docs.add(new ScoreDoc(topLevelDoc, trackScores ? scorer.score() : 0f));
|
||||
// record that we collected up to this document
|
||||
assert topLevelDoc >= docUpTo;
|
||||
docUpTo = topLevelDoc + 1;
|
||||
if (docs.size() >= size) {
|
||||
throw new CollectionTerminatedException();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void doSetNextReader(LeafReaderContext context) throws IOException {
|
||||
// if we have a reader state, and we haven't registered one already, register it
|
||||
// we need to check in readersState since even when the filter return null, setNextReader is still
|
||||
// called for that reader (before)
|
||||
if (currentReader != null && !readerStates.containsKey(currentReader)) {
|
||||
assert readerState != null;
|
||||
readerState.done = true;
|
||||
readerStates.put(currentReader, readerState);
|
||||
}
|
||||
this.currentReader = context.reader();
|
||||
this.docBase = context.docBase;
|
||||
this.readerState = new ReaderState();
|
||||
}
|
||||
|
||||
public static final RuntimeException StopCollectingException = new StopCollectingException();
|
||||
|
||||
static class StopCollectingException extends RuntimeException {
|
||||
@Override
|
||||
public Throwable fillInStackTrace() {
|
||||
return null;
|
||||
if (docs.size() >= size || context.docBase + context.reader().maxDoc() <= docUpTo) {
|
||||
// no need to collect a new segment, we either already collected enough
|
||||
// or the segment is not competitive
|
||||
throw new CollectionTerminatedException();
|
||||
}
|
||||
docBase = context.docBase;
|
||||
}
|
||||
}
|
||||
|
||||
public static class ScanFilter extends Filter {
|
||||
/**
|
||||
* A filtering query that matches all doc IDs that are not deleted and
|
||||
* greater than or equal to the configured doc ID.
|
||||
*/
|
||||
// pkg-private for testing
|
||||
static class MinDocQuery extends Query {
|
||||
|
||||
private final ConcurrentMap<IndexReader, ReaderState> readerStates;
|
||||
private final int minDoc;
|
||||
|
||||
private final ScanCollector scanCollector;
|
||||
|
||||
public ScanFilter(ConcurrentMap<IndexReader, ReaderState> readerStates, ScanCollector scanCollector) {
|
||||
this.readerStates = readerStates;
|
||||
this.scanCollector = scanCollector;
|
||||
MinDocQuery(int minDoc) {
|
||||
this.minDoc = minDoc;
|
||||
}
|
||||
|
||||
@Override
|
||||
public DocIdSet getDocIdSet(LeafReaderContext context, Bits acceptedDocs) throws IOException {
|
||||
ReaderState readerState = readerStates.get(context.reader());
|
||||
if (readerState != null && readerState.done) {
|
||||
scanCollector.incCounter(readerState.count);
|
||||
return null;
|
||||
public int hashCode() {
|
||||
return 31 * super.hashCode() + minDoc;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object obj) {
|
||||
if (super.equals(obj) == false) {
|
||||
return false;
|
||||
}
|
||||
return BitsFilteredDocIdSet.wrap(new AllDocIdSet(context.reader().maxDoc()), acceptedDocs);
|
||||
MinDocQuery that = (MinDocQuery) obj;
|
||||
return minDoc == that.minDoc;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Weight createWeight(IndexSearcher searcher, boolean needsScores) throws IOException {
|
||||
return new ConstantScoreWeight(this) {
|
||||
@Override
|
||||
public Scorer scorer(LeafReaderContext context, final Bits acceptDocs) throws IOException {
|
||||
final int maxDoc = context.reader().maxDoc();
|
||||
if (context.docBase + maxDoc <= minDoc) {
|
||||
return null;
|
||||
}
|
||||
final int segmentMinDoc = Math.max(0, minDoc - context.docBase);
|
||||
final DocIdSetIterator disi = new DocIdSetIterator() {
|
||||
|
||||
int doc = -1;
|
||||
|
||||
@Override
|
||||
public int docID() {
|
||||
return doc;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int nextDoc() throws IOException {
|
||||
return advance(doc + 1);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int advance(int target) throws IOException {
|
||||
assert target > doc;
|
||||
if (doc == -1) {
|
||||
// skip directly to minDoc
|
||||
doc = Math.max(target, segmentMinDoc);
|
||||
} else {
|
||||
doc = target;
|
||||
}
|
||||
while (doc < maxDoc) {
|
||||
if (acceptDocs == null || acceptDocs.get(doc)) {
|
||||
break;
|
||||
}
|
||||
doc += 1;
|
||||
}
|
||||
if (doc >= maxDoc) {
|
||||
doc = NO_MORE_DOCS;
|
||||
}
|
||||
return doc;
|
||||
}
|
||||
|
||||
@Override
|
||||
public long cost() {
|
||||
return maxDoc - minDoc;
|
||||
}
|
||||
|
||||
};
|
||||
return new ConstantScoreScorer(this, score(), disi);
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString(String field) {
|
||||
return "ScanFilter";
|
||||
return "MinDocQuery(minDoc=" + minDoc + ")";
|
||||
}
|
||||
}
|
||||
|
||||
static class ReaderState {
|
||||
public int count;
|
||||
public boolean done;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,116 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.search.scan;
|
||||
|
||||
import org.apache.lucene.document.Document;
|
||||
import org.apache.lucene.document.Field.Store;
|
||||
import org.apache.lucene.document.StringField;
|
||||
import org.apache.lucene.index.IndexReader;
|
||||
import org.apache.lucene.index.RandomIndexWriter;
|
||||
import org.apache.lucene.index.Term;
|
||||
import org.apache.lucene.search.IndexSearcher;
|
||||
import org.apache.lucene.search.Query;
|
||||
import org.apache.lucene.search.QueryUtils;
|
||||
import org.apache.lucene.search.ScoreDoc;
|
||||
import org.apache.lucene.search.Sort;
|
||||
import org.apache.lucene.search.TermQuery;
|
||||
import org.apache.lucene.store.Directory;
|
||||
import org.elasticsearch.search.scan.ScanContext.MinDocQuery;
|
||||
import org.elasticsearch.test.ElasticsearchTestCase;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
|
||||
public class ScanContextTests extends ElasticsearchTestCase {
|
||||
|
||||
public void testMinDocQueryBasics() {
|
||||
MinDocQuery query1 = new MinDocQuery(42);
|
||||
MinDocQuery query2 = new MinDocQuery(42);
|
||||
MinDocQuery query3 = new MinDocQuery(43);
|
||||
QueryUtils.check(query1);
|
||||
QueryUtils.checkEqual(query1, query2);
|
||||
QueryUtils.checkUnequal(query1, query3);
|
||||
}
|
||||
|
||||
public void testMinDocQueryRandom() throws IOException {
|
||||
final int numDocs = randomIntBetween(10, 200);
|
||||
final Document doc = new Document();
|
||||
final Directory dir = newDirectory();
|
||||
final RandomIndexWriter w = new RandomIndexWriter(getRandom(), dir);
|
||||
for (int i = 0; i < numDocs; ++i) {
|
||||
w.addDocument(doc);
|
||||
}
|
||||
final IndexReader reader = w.getReader();
|
||||
final IndexSearcher searcher = newSearcher(reader);
|
||||
for (int i = 0; i <= numDocs; ++i) {
|
||||
assertEquals(numDocs - i, searcher.count(new MinDocQuery(i)));
|
||||
}
|
||||
w.close();
|
||||
reader.close();
|
||||
dir.close();
|
||||
}
|
||||
|
||||
public void testRandom() throws Exception {
|
||||
final int numDocs = randomIntBetween(10, 200);
|
||||
final Document doc1 = new Document();
|
||||
doc1.add(new StringField("foo", "bar", Store.NO));
|
||||
final Document doc2 = new Document();
|
||||
final Directory dir = newDirectory();
|
||||
final RandomIndexWriter w = new RandomIndexWriter(getRandom(), dir);
|
||||
for (int i = 0; i < numDocs; ++i) {
|
||||
w.addDocument(randomBoolean() ? doc1 : doc2);
|
||||
}
|
||||
final IndexReader reader = w.getReader();
|
||||
final IndexSearcher searcher = newSearcher(reader);
|
||||
|
||||
final boolean trackScores = randomBoolean();
|
||||
final int pageSize = randomIntBetween(1, numDocs / 2);
|
||||
Query query = new TermQuery(new Term("foo", "bar"));
|
||||
if (trackScores == false) {
|
||||
query.setBoost(0f);
|
||||
}
|
||||
final ScoreDoc[] expected = searcher.search(query, numDocs, Sort.INDEXORDER, true, true).scoreDocs;
|
||||
|
||||
final List<ScoreDoc> actual = new ArrayList<>();
|
||||
ScanContext context = new ScanContext();
|
||||
while (true) {
|
||||
final ScoreDoc[] page = context.execute(searcher, query, pageSize, trackScores).scoreDocs;
|
||||
assertTrue(page.length <= pageSize);
|
||||
if (page.length == 0) {
|
||||
assertEquals(0, context.execute(searcher, query, pageSize, trackScores).scoreDocs.length);
|
||||
break;
|
||||
}
|
||||
actual.addAll(Arrays.asList(page));
|
||||
}
|
||||
assertEquals(expected.length, actual.size());
|
||||
for (int i = 0; i < expected.length; ++i) {
|
||||
ScoreDoc sd1 = expected[i];
|
||||
ScoreDoc sd2 = actual.get(i);
|
||||
assertEquals(sd1.doc, sd2.doc);
|
||||
assertEquals(sd1.score, sd2.score, 0.001f);
|
||||
}
|
||||
w.close();
|
||||
reader.close();
|
||||
dir.close();
|
||||
}
|
||||
|
||||
}
|
Loading…
Reference in New Issue