Merge pull request #11180 from jpountz/enhancement/faster_scan

Search: Make SCAN faster.

commit 1a967ce267

--- a/src/main/java/org/elasticsearch/common/lucene/docset/AllDocIdSet.java
+++ /dev/null
@@ -1,100 +0,0 @@
-/*
- * Licensed to Elasticsearch under one or more contributor
- * license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright
- * ownership. Elasticsearch licenses this file to you under
- * the Apache License, Version 2.0 (the "License"); you may
- * not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.elasticsearch.common.lucene.docset;
-
-import org.apache.lucene.search.DocIdSet;
-import org.apache.lucene.search.DocIdSetIterator;
-import org.apache.lucene.util.Bits;
-import org.apache.lucene.util.RamUsageEstimator;
-
-import java.io.IOException;
-
-/**
- * A {@link DocIdSet} that matches all docs up to a {@code maxDoc}.
- */
-public class AllDocIdSet extends DocIdSet {
-
-    private final int maxDoc;
-
-    public AllDocIdSet(int maxDoc) {
-        this.maxDoc = maxDoc;
-    }
-
-    /**
-     * Does not go to the reader and ask for data, so can be cached.
-     */
-    @Override
-    public boolean isCacheable() {
-        return true;
-    }
-
-    @Override
-    public long ramBytesUsed() {
-        return RamUsageEstimator.NUM_BYTES_INT;
-    }
-
-    @Override
-    public DocIdSetIterator iterator() throws IOException {
-        return new Iterator(maxDoc);
-    }
-
-    @Override
-    public Bits bits() throws IOException {
-        return new Bits.MatchAllBits(maxDoc);
-    }
-
-    public static final class Iterator extends DocIdSetIterator {
-
-        private final int maxDoc;
-        private int doc = -1;
-
-        public Iterator(int maxDoc) {
-            this.maxDoc = maxDoc;
-        }
-
-        @Override
-        public int docID() {
-            return doc;
-        }
-
-        @Override
-        public int nextDoc() throws IOException {
-            if (++doc < maxDoc) {
-                return doc;
-            }
-            return doc = NO_MORE_DOCS;
-        }
-
-        @Override
-        public int advance(int target) throws IOException {
-            doc = target;
-            if (doc < maxDoc) {
-                return doc;
-            }
-            return doc = NO_MORE_DOCS;
-        }
-
-        @Override
-        public long cost() {
-
-            return maxDoc;
-        }
-    }
-}
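The deleted AllDocIdSet existed to hand the old ScanFilter a dense "every doc up to maxDoc" iterator; the commit removes ScanFilter below, leaving the class with no callers. Stock Lucene exposes the same iterator as DocIdSetIterator.all, so nothing is lost. A minimal sketch (illustration only, not part of this commit):

```java
import org.apache.lucene.search.DocIdSetIterator;

import java.io.IOException;

public class AllDocsDemo {
    public static void main(String[] args) throws IOException {
        // Lucene's built-in equivalent of the deleted AllDocIdSet.Iterator:
        // a dense iterator over doc IDs 0 .. maxDoc-1.
        DocIdSetIterator it = DocIdSetIterator.all(5);
        for (int doc = it.nextDoc(); doc != DocIdSetIterator.NO_MORE_DOCS; doc = it.nextDoc()) {
            System.out.println(doc); // prints 0 through 4
        }
    }
}
```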
--- a/src/main/java/org/elasticsearch/search/internal/DefaultSearchContext.java
+++ b/src/main/java/org/elasticsearch/search/internal/DefaultSearchContext.java
@@ -204,9 +204,7 @@ public class DefaultSearchContext extends SearchContext {
 
     @Override
     public void doClose() {
-        if (scanContext != null) {
-            scanContext.clear();
-        }
+        scanContext = null;
         // clear and scope phase we have
        Releasables.close(searcher, engineSearcher);
     }
--- a/src/main/java/org/elasticsearch/search/scan/ScanContext.java
+++ b/src/main/java/org/elasticsearch/search/scan/ScanContext.java
@@ -19,59 +19,50 @@
 package org.elasticsearch.search.scan;
 
-import org.apache.lucene.index.IndexReader;
 import org.apache.lucene.index.LeafReaderContext;
-import org.apache.lucene.search.BitsFilteredDocIdSet;
-import org.apache.lucene.search.DocIdSet;
-import org.apache.lucene.search.Filter;
-import org.apache.lucene.search.FilteredQuery;
+import org.apache.lucene.search.CollectionTerminatedException;
+import org.apache.lucene.search.ConstantScoreScorer;
+import org.apache.lucene.search.ConstantScoreWeight;
+import org.apache.lucene.search.DocIdSetIterator;
+import org.apache.lucene.search.IndexSearcher;
 import org.apache.lucene.search.Query;
 import org.apache.lucene.search.ScoreDoc;
 import org.apache.lucene.search.Scorer;
 import org.apache.lucene.search.SimpleCollector;
 import org.apache.lucene.search.TopDocs;
+import org.apache.lucene.search.Weight;
 import org.apache.lucene.util.Bits;
-import org.elasticsearch.common.lucene.docset.AllDocIdSet;
-import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
+import org.elasticsearch.common.lucene.search.Queries;
 import org.elasticsearch.search.internal.SearchContext;
 
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.concurrent.ConcurrentMap;
 import java.util.List;
 
 /**
  * The scan context allows to optimize readers we already processed during scanning. We do that by keeping track
- * of the count per reader, and if we are done with it, we no longer process it by using a filter that returns
- * null docIdSet for this reader.
+ * of the last collected doc ID and only collecting doc IDs that are greater.
  */
 public class ScanContext {
 
-    private final ConcurrentMap<IndexReader, ReaderState> readerStates = ConcurrentCollections.newConcurrentMap();
-
-    public void clear() {
-        readerStates.clear();
-    }
+    private volatile int docUpTo;
 
     public TopDocs execute(SearchContext context) throws IOException {
-        ScanCollector collector = new ScanCollector(readerStates, context.from(), context.size(), context.trackScores());
-        Query query = new FilteredQuery(context.query(), new ScanFilter(readerStates, collector));
-        try {
-            context.searcher().search(query, collector);
-        } catch (ScanCollector.StopCollectingException e) {
-            // all is well
-        }
+        return execute(context.searcher(), context.query(), context.size(), context.trackScores());
     }
 
-    static class ScanCollector extends SimpleCollector {
+    TopDocs execute(IndexSearcher searcher, Query query, int size, boolean trackScores) throws IOException {
+        ScanCollector collector = new ScanCollector(size, trackScores);
+        Query q = Queries.filtered(query, new MinDocQuery(docUpTo));
+        searcher.search(q, collector);
+        return collector.topDocs();
+    }
+
+    private class ScanCollector extends SimpleCollector {
 
-        private final ConcurrentMap<IndexReader, ReaderState> readerStates;
+        private final List<ScoreDoc> docs;
 
-        private final int from;
-
-        private final int to;
-
-        private final ArrayList<ScoreDoc> docs;
+        private final int size;
 
         private final boolean trackScores;
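With this change, each scan page re-executes the user query in conjunction with MinDocQuery(docUpTo) instead of replaying cached per-reader counts, so a page resumes exactly where the previous one stopped. In raw Lucene 5.x terms the conjunction that Queries.filtered builds is equivalent to a BooleanQuery whose filter clause matches without contributing to the score — a sketch of the semantics, not necessarily the helper's implementation:

```java
package org.elasticsearch.search.scan;

import org.apache.lucene.search.BooleanClause.Occur;
import org.apache.lucene.search.BooleanQuery;
import org.apache.lucene.search.Query;

// Sketch: the per-page scan query expressed as a plain Lucene 5.x conjunction.
class ScanQuerySketch {
    static Query scanQuery(Query userQuery, int docUpTo) {
        BooleanQuery bq = new BooleanQuery();
        bq.add(userQuery, Occur.MUST);                               // scores as usual
        bq.add(new ScanContext.MinDocQuery(docUpTo), Occur.FILTER);  // matches, never scores
        return bq;
    }
}
```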
@@ -79,21 +70,10 @@ public class ScanContext {
 
         private int docBase;
 
-        private int counter;
-
-        private IndexReader currentReader;
-        private ReaderState readerState;
-
-        ScanCollector(ConcurrentMap<IndexReader, ReaderState> readerStates, int from, int size, boolean trackScores) {
-            this.readerStates = readerStates;
-            this.from = from;
-            this.to = from + size;
+        ScanCollector(int size, boolean trackScores) {
             this.trackScores = trackScores;
             this.docs = new ArrayList<>(size);
+            this.size = size;
         }
 
-        void incCounter(int count) {
-            this.counter += count;
-        }
-
         public TopDocs topDocs() {
@@ -112,70 +92,114 @@ public class ScanContext {
 
         @Override
         public void collect(int doc) throws IOException {
-            if (counter >= from) {
-                docs.add(new ScoreDoc(docBase + doc, trackScores ? scorer.score() : 0f));
-            }
-            readerState.count++;
-            counter++;
-            if (counter >= to) {
-                throw StopCollectingException;
+            int topLevelDoc = docBase + doc;
+            docs.add(new ScoreDoc(topLevelDoc, trackScores ? scorer.score() : 0f));
+            // record that we collected up to this document
+            assert topLevelDoc >= docUpTo;
+            docUpTo = topLevelDoc + 1;
+            if (docs.size() >= size) {
+                throw new CollectionTerminatedException();
             }
         }
 
         @Override
         public void doSetNextReader(LeafReaderContext context) throws IOException {
-            // if we have a reader state, and we haven't registered one already, register it
-            // we need to check in readersState since even when the filter return null, setNextReader is still
-            // called for that reader (before)
-            if (currentReader != null && !readerStates.containsKey(currentReader)) {
-                assert readerState != null;
-                readerState.done = true;
-                readerStates.put(currentReader, readerState);
-            }
-            this.currentReader = context.reader();
-            this.docBase = context.docBase;
-            this.readerState = new ReaderState();
-        }
-
-        public static final RuntimeException StopCollectingException = new StopCollectingException();
-
-        static class StopCollectingException extends RuntimeException {
-            @Override
-            public Throwable fillInStackTrace() {
-                return null;
+            if (docs.size() >= size || context.docBase + context.reader().maxDoc() <= docUpTo) {
+                // no need to collect a new segment, we either already collected enough
+                // or the segment is not competitive
+                throw new CollectionTerminatedException();
             }
+            docBase = context.docBase;
         }
     }
 
-    public static class ScanFilter extends Filter {
+    /**
+     * A filtering query that matches all doc IDs that are not deleted and
+     * greater than or equal to the configured doc ID.
+     */
+    // pkg-private for testing
+    static class MinDocQuery extends Query {
 
-        private final ConcurrentMap<IndexReader, ReaderState> readerStates;
+        private final int minDoc;
 
-        private final ScanCollector scanCollector;
-
-        public ScanFilter(ConcurrentMap<IndexReader, ReaderState> readerStates, ScanCollector scanCollector) {
-            this.readerStates = readerStates;
-            this.scanCollector = scanCollector;
+        MinDocQuery(int minDoc) {
+            this.minDoc = minDoc;
         }
 
         @Override
-        public DocIdSet getDocIdSet(LeafReaderContext context, Bits acceptedDocs) throws IOException {
-            ReaderState readerState = readerStates.get(context.reader());
-            if (readerState != null && readerState.done) {
-                scanCollector.incCounter(readerState.count);
-                return null;
+        public int hashCode() {
+            return 31 * super.hashCode() + minDoc;
+        }
+
+        @Override
+        public boolean equals(Object obj) {
+            if (super.equals(obj) == false) {
+                return false;
             }
-            return BitsFilteredDocIdSet.wrap(new AllDocIdSet(context.reader().maxDoc()), acceptedDocs);
+            MinDocQuery that = (MinDocQuery) obj;
+            return minDoc == that.minDoc;
         }
 
         @Override
+        public Weight createWeight(IndexSearcher searcher, boolean needsScores) throws IOException {
+            return new ConstantScoreWeight(this) {
+                @Override
+                public Scorer scorer(LeafReaderContext context, final Bits acceptDocs) throws IOException {
+                    final int maxDoc = context.reader().maxDoc();
+                    if (context.docBase + maxDoc <= minDoc) {
+                        return null;
+                    }
+                    final int segmentMinDoc = Math.max(0, minDoc - context.docBase);
+                    final DocIdSetIterator disi = new DocIdSetIterator() {
+
+                        int doc = -1;
+
+                        @Override
+                        public int docID() {
+                            return doc;
+                        }
+
+                        @Override
+                        public int nextDoc() throws IOException {
+                            return advance(doc + 1);
+                        }
+
+                        @Override
+                        public int advance(int target) throws IOException {
+                            assert target > doc;
+                            if (doc == -1) {
+                                // skip directly to minDoc
+                                doc = Math.max(target, segmentMinDoc);
+                            } else {
+                                doc = target;
+                            }
+                            while (doc < maxDoc) {
+                                if (acceptDocs == null || acceptDocs.get(doc)) {
+                                    break;
+                                }
+                                doc += 1;
+                            }
+                            if (doc >= maxDoc) {
+                                doc = NO_MORE_DOCS;
+                            }
+                            return doc;
+                        }
+
+                        @Override
+                        public long cost() {
+                            return maxDoc - minDoc;
+                        }
+
+                    };
+                    return new ConstantScoreScorer(this, score(), disi);
+                }
+            };
+        }
+
+        @Override
         public String toString(String field) {
-            return "ScanFilter";
+            return "MinDocQuery(minDoc=" + minDoc + ")";
         }
     }
-
-    static class ReaderState {
-        public int count;
-        public boolean done;
-    }
 }
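Most of the win is in MinDocQuery.createWeight above: a segment whose global doc ID range ends at or before minDoc returns a null scorer and is skipped outright, while a partially consumed segment starts iterating at the cutoff translated into its local doc ID space. A worked example of that arithmetic, with made-up segment sizes:

```java
// Illustration of MinDocQuery's per-segment skipping, using a hypothetical
// index of three segments; the numbers are invented for the example.
public class MinDocSkipDemo {
    public static void main(String[] args) {
        int[] docBase = {0, 40, 90};  // first global doc ID of each segment
        int[] maxDoc = {40, 50, 40};  // docs per segment
        int minDoc = 75;              // docUpTo carried over from the previous page

        for (int i = 0; i < docBase.length; i++) {
            if (docBase[i] + maxDoc[i] <= minDoc) {
                // scorer() returns null: the whole segment was consumed by earlier pages
                System.out.println("segment " + i + ": skipped");
            } else {
                // translate the global cutoff into this segment's local doc ID space
                int segmentMinDoc = Math.max(0, minDoc - docBase[i]);
                System.out.println("segment " + i + ": resume at local doc " + segmentMinDoc);
            }
        }
        // segment 0: skipped (0 + 40 <= 75)
        // segment 1: resume at local doc 35 (75 - 40)
        // segment 2: resume at local doc 0 (75 - 90 clamps to 0)
    }
}
```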
--- /dev/null
+++ b/src/test/java/org/elasticsearch/search/scan/ScanContextTests.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.search.scan;
+
+import org.apache.lucene.document.Document;
+import org.apache.lucene.document.Field.Store;
+import org.apache.lucene.document.StringField;
+import org.apache.lucene.index.IndexReader;
+import org.apache.lucene.index.RandomIndexWriter;
+import org.apache.lucene.index.Term;
+import org.apache.lucene.search.IndexSearcher;
+import org.apache.lucene.search.Query;
+import org.apache.lucene.search.QueryUtils;
+import org.apache.lucene.search.ScoreDoc;
+import org.apache.lucene.search.Sort;
+import org.apache.lucene.search.TermQuery;
+import org.apache.lucene.store.Directory;
+import org.elasticsearch.search.scan.ScanContext.MinDocQuery;
+import org.elasticsearch.test.ElasticsearchTestCase;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+public class ScanContextTests extends ElasticsearchTestCase {
+
+    public void testMinDocQueryBasics() {
+        MinDocQuery query1 = new MinDocQuery(42);
+        MinDocQuery query2 = new MinDocQuery(42);
+        MinDocQuery query3 = new MinDocQuery(43);
+        QueryUtils.check(query1);
+        QueryUtils.checkEqual(query1, query2);
+        QueryUtils.checkUnequal(query1, query3);
+    }
+
+    public void testMinDocQueryRandom() throws IOException {
+        final int numDocs = randomIntBetween(10, 200);
+        final Document doc = new Document();
+        final Directory dir = newDirectory();
+        final RandomIndexWriter w = new RandomIndexWriter(getRandom(), dir);
+        for (int i = 0; i < numDocs; ++i) {
+            w.addDocument(doc);
+        }
+        final IndexReader reader = w.getReader();
+        final IndexSearcher searcher = newSearcher(reader);
+        for (int i = 0; i <= numDocs; ++i) {
+            assertEquals(numDocs - i, searcher.count(new MinDocQuery(i)));
+        }
+        w.close();
+        reader.close();
+        dir.close();
+    }
+
+    public void testRandom() throws Exception {
+        final int numDocs = randomIntBetween(10, 200);
+        final Document doc1 = new Document();
+        doc1.add(new StringField("foo", "bar", Store.NO));
+        final Document doc2 = new Document();
+        final Directory dir = newDirectory();
+        final RandomIndexWriter w = new RandomIndexWriter(getRandom(), dir);
+        for (int i = 0; i < numDocs; ++i) {
+            w.addDocument(randomBoolean() ? doc1 : doc2);
+        }
+        final IndexReader reader = w.getReader();
+        final IndexSearcher searcher = newSearcher(reader);
+
+        final boolean trackScores = randomBoolean();
+        final int pageSize = randomIntBetween(1, numDocs / 2);
+        Query query = new TermQuery(new Term("foo", "bar"));
+        if (trackScores == false) {
+            query.setBoost(0f);
+        }
+        final ScoreDoc[] expected = searcher.search(query, numDocs, Sort.INDEXORDER, true, true).scoreDocs;
+
+        final List<ScoreDoc> actual = new ArrayList<>();
+        ScanContext context = new ScanContext();
+        while (true) {
+            final ScoreDoc[] page = context.execute(searcher, query, pageSize, trackScores).scoreDocs;
+            assertTrue(page.length <= pageSize);
+            if (page.length == 0) {
+                assertEquals(0, context.execute(searcher, query, pageSize, trackScores).scoreDocs.length);
+                break;
+            }
+            actual.addAll(Arrays.asList(page));
+        }
+        assertEquals(expected.length, actual.size());
+        for (int i = 0; i < expected.length; ++i) {
+            ScoreDoc sd1 = expected[i];
+            ScoreDoc sd2 = actual.get(i);
+            assertEquals(sd1.doc, sd2.doc);
+            assertEquals(sd1.score, sd2.score, 0.001f);
+        }
+        w.close();
+        reader.close();
+        dir.close();
+    }
+
+}
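The tests drive ScanContext directly; at the REST and client level nothing changes, since each scroll round trip simply re-runs the query with a higher cutoff. For reference, a typical scan+scroll loop against the 1.x-era Java client (index name, query, page size, and timeout are placeholders):

```java
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;

public class ScanScrollExample {

    /** Streams every document matching foo:bar out of "my_index", one page per round trip. */
    public static void scanAll(Client client) {
        SearchResponse response = client.prepareSearch("my_index")
                .setSearchType(SearchType.SCAN)            // index order, no scoring
                .setScroll(TimeValue.timeValueMinutes(1))  // keep the scroll context alive
                .setQuery(QueryBuilders.termQuery("foo", "bar"))
                .setSize(100)                              // docs per shard per page
                .get();

        while (true) {
            response = client.prepareSearchScroll(response.getScrollId())
                    .setScroll(TimeValue.timeValueMinutes(1))
                    .get();
            if (response.getHits().getHits().length == 0) {
                break;  // scan exhausted
            }
            for (SearchHit hit : response.getHits()) {
                // process hit.getSourceAsString() ...
            }
        }
    }
}
```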