From b3e7146b224b41d86688dfddb11805409b799d77 Mon Sep 17 00:00:00 2001 From: Adrien Grand Date: Fri, 14 Aug 2015 13:29:33 +0200 Subject: [PATCH] Simplify ContextIndexSearcher. In particular this commit moves collector wrapping logic from ContextIndexSearcher to QueryPhase. --- .../query/TransportValidateQueryAction.java | 2 +- .../percolator/PercolateContext.java | 23 +-- .../search/aggregations/AggregationPhase.java | 4 +- .../elasticsearch/search/dfs/DfsPhase.java | 4 - .../search/internal/ContextIndexSearcher.java | 132 +------------- .../search/internal/DefaultSearchContext.java | 39 ++-- .../internal/FilteredSearchContext.java | 18 +- .../search/internal/SearchContext.java | 16 +- .../search/internal/SubSearchContext.java | 5 - .../search/query/QueryPhase.java | 167 +++++++++++++----- .../search/scan/ScanContext.java | 23 ++- .../search/scan/ScanContextTests.java | 13 +- .../elasticsearch/test/TestSearchContext.java | 17 +- 13 files changed, 205 insertions(+), 258 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/action/admin/indices/validate/query/TransportValidateQueryAction.java b/core/src/main/java/org/elasticsearch/action/admin/indices/validate/query/TransportValidateQueryAction.java index 4437e80d96d..bed778bd4c9 100644 --- a/core/src/main/java/org/elasticsearch/action/admin/indices/validate/query/TransportValidateQueryAction.java +++ b/core/src/main/java/org/elasticsearch/action/admin/indices/validate/query/TransportValidateQueryAction.java @@ -185,7 +185,7 @@ public class TransportValidateQueryAction extends TransportBroadcastAction subPhaseContexts = new HashMap<>(); + private final Map, Collector> queryCollectors = new HashMap<>(); public PercolateContext(PercolateShardRequest request, SearchShardTarget searchShardTarget, IndexShard indexShard, IndexService indexService, PageCacheRecycler pageCacheRecycler, @@ -232,7 +233,6 @@ public class PercolateContext extends SearchContext { public SearchContext parsedQuery(ParsedQuery query) { this.parsedQuery = query; this.query = query.query(); - this.queryRewritten = false; return this; } @@ -246,18 +246,6 @@ public class PercolateContext extends SearchContext { return query; } - @Override - public boolean queryRewritten() { - return queryRewritten; - } - - @Override - public SearchContext updateRewriteQuery(Query rewriteQuery) { - queryRewritten = true; - query = rewriteQuery; - return this; - } - @Override public String[] types() { return types; @@ -768,4 +756,9 @@ public class PercolateContext extends SearchContext { public void copyContextAndHeadersFrom(HasContextAndHeaders other) { assert false : "percolatecontext does not support contexts & headers"; } + + @Override + public Map, Collector> queryCollectors() { + return queryCollectors; + } } diff --git a/core/src/main/java/org/elasticsearch/search/aggregations/AggregationPhase.java b/core/src/main/java/org/elasticsearch/search/aggregations/AggregationPhase.java index c983c40495f..d4ed3b1af42 100644 --- a/core/src/main/java/org/elasticsearch/search/aggregations/AggregationPhase.java +++ b/core/src/main/java/org/elasticsearch/search/aggregations/AggregationPhase.java @@ -86,7 +86,7 @@ public class AggregationPhase implements SearchPhase { if (!collectors.isEmpty()) { final BucketCollector collector = BucketCollector.wrap(collectors); collector.preCollection(); - context.searcher().queryCollectors().put(AggregationPhase.class, collector); + context.queryCollectors().put(AggregationPhase.class, collector); } } catch (IOException e) { throw new AggregationInitializationException("Could not initialize aggregators", e); @@ -162,7 +162,7 @@ public class AggregationPhase implements SearchPhase { // disable aggregations so that they don't run on next pages in case of scrolling context.aggregations(null); - context.searcher().queryCollectors().remove(AggregationPhase.class); + context.queryCollectors().remove(AggregationPhase.class); } } diff --git a/core/src/main/java/org/elasticsearch/search/dfs/DfsPhase.java b/core/src/main/java/org/elasticsearch/search/dfs/DfsPhase.java index 3a87979c719..f5522921e7d 100644 --- a/core/src/main/java/org/elasticsearch/search/dfs/DfsPhase.java +++ b/core/src/main/java/org/elasticsearch/search/dfs/DfsPhase.java @@ -57,10 +57,6 @@ public class DfsPhase implements SearchPhase { public void execute(SearchContext context) { final ObjectHashSet termsSet = new ObjectHashSet<>(); try { - if (!context.queryRewritten()) { - context.updateRewriteQuery(context.searcher().rewrite(context.query())); - } - context.searcher().createNormalizedWeight(context.query(), true).extractTerms(new DelegateSet(termsSet)); for (RescoreSearchContext rescoreContext : context.rescore()) { rescoreContext.rescorer().extractTerms(context, rescoreContext, new DelegateSet(termsSet)); diff --git a/core/src/main/java/org/elasticsearch/search/internal/ContextIndexSearcher.java b/core/src/main/java/org/elasticsearch/search/internal/ContextIndexSearcher.java index dd7489cc7a5..e80d006ea72 100644 --- a/core/src/main/java/org/elasticsearch/search/internal/ContextIndexSearcher.java +++ b/core/src/main/java/org/elasticsearch/search/internal/ContextIndexSearcher.java @@ -20,41 +20,25 @@ package org.elasticsearch.search.internal; import org.apache.lucene.index.LeafReaderContext; -import org.apache.lucene.search.BooleanClause.Occur; -import org.apache.lucene.search.BooleanQuery; import org.apache.lucene.search.Collector; import org.apache.lucene.search.Explanation; import org.apache.lucene.search.IndexSearcher; -import org.apache.lucene.search.MultiCollector; import org.apache.lucene.search.Query; -import org.apache.lucene.search.TimeLimitingCollector; import org.apache.lucene.search.Weight; import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.common.lease.Releasable; -import org.elasticsearch.common.lucene.Lucene; -import org.elasticsearch.common.lucene.MinimumScoreCollector; -import org.elasticsearch.common.lucene.search.FilteredCollector; import org.elasticsearch.index.engine.Engine; -import org.elasticsearch.search.SearchService; import org.elasticsearch.search.dfs.CachedDfSource; import org.elasticsearch.search.internal.SearchContext.Lifetime; import java.io.IOException; -import java.util.ArrayList; -import java.util.HashMap; import java.util.List; -import java.util.Map; /** * Context-aware extension of {@link IndexSearcher}. */ public class ContextIndexSearcher extends IndexSearcher implements Releasable { - public static enum Stage { - NA, - MAIN_QUERY - } - /** The wrapped {@link IndexSearcher}. The reason why we sometimes prefer delegating to this searcher instead of super is that * this instance may have more assertions, for example if it comes from MockInternalEngine which wraps the IndexSearcher into an * AssertingIndexSearcher. */ @@ -64,10 +48,6 @@ public class ContextIndexSearcher extends IndexSearcher implements Releasable { private CachedDfSource dfSource; - private Map, Collector> queryCollectors; - - private Stage currentState = Stage.NA; - public ContextIndexSearcher(SearchContext searchContext, Engine.Searcher searcher) { super(searcher.reader()); in = searcher.searcher(); @@ -83,49 +63,11 @@ public class ContextIndexSearcher extends IndexSearcher implements Releasable { this.dfSource = dfSource; } - /** - * Adds a query level collector that runs at {@link Stage#MAIN_QUERY}. Note, supports - * {@link org.elasticsearch.common.lucene.search.XCollector} allowing for a callback - * when collection is done. - */ - public Map, Collector> queryCollectors() { - if (queryCollectors == null) { - queryCollectors = new HashMap<>(); - } - return queryCollectors; - } - - public void inStage(Stage stage) { - this.currentState = stage; - } - - public void finishStage(Stage stage) { - assert currentState == stage : "Expected stage " + stage + " but was stage " + currentState; - this.currentState = Stage.NA; - } - - @Override - public Query rewrite(Query original) throws IOException { - if (original == searchContext.query() || original == searchContext.parsedQuery().query()) { - // optimize in case its the top level search query and we already rewrote it... - if (searchContext.queryRewritten()) { - return searchContext.query(); - } - Query rewriteQuery = in.rewrite(original); - searchContext.updateRewriteQuery(rewriteQuery); - return rewriteQuery; - } else { - return in.rewrite(original); - } - } - @Override public Weight createNormalizedWeight(Query query, boolean needsScores) throws IOException { - // TODO: needsScores - // can we avoid dfs stuff here if we dont need scores? try { - // if its the main query, use we have dfs data, only then do it - if (dfSource != null && (query == searchContext.query() || query == searchContext.parsedQuery().query())) { + // if scores are needed and we have dfs data then use it + if (dfSource != null && needsScores) { return dfSource.createNormalizedWeight(query, needsScores); } return in.createNormalizedWeight(query, needsScores); @@ -135,81 +77,19 @@ public class ContextIndexSearcher extends IndexSearcher implements Releasable { } } - @Override - public void search(Query query, Collector collector) throws IOException { - // Wrap the caller's collector with various wrappers e.g. those used to siphon - // matches off for aggregation or to impose a time-limit on collection. - final boolean timeoutSet = searchContext.timeoutInMillis() != SearchService.NO_TIMEOUT.millis(); - final boolean terminateAfterSet = searchContext.terminateAfter() != SearchContext.DEFAULT_TERMINATE_AFTER; - - if (timeoutSet) { - // TODO: change to use our own counter that uses the scheduler in ThreadPool - // throws TimeLimitingCollector.TimeExceededException when timeout has reached - collector = Lucene.wrapTimeLimitingCollector(collector, searchContext.timeEstimateCounter(), searchContext.timeoutInMillis()); - } - if (terminateAfterSet) { - // throws Lucene.EarlyTerminationException when given count is reached - collector = Lucene.wrapCountBasedEarlyTerminatingCollector(collector, searchContext.terminateAfter()); - } - if (currentState == Stage.MAIN_QUERY) { - if (searchContext.parsedPostFilter() != null) { - // this will only get applied to the actual search collector and not - // to any scoped collectors, also, it will only be applied to the main collector - // since that is where the filter should only work - final Weight filterWeight = createNormalizedWeight(searchContext.parsedPostFilter().query(), false); - collector = new FilteredCollector(collector, filterWeight); - } - if (queryCollectors != null && !queryCollectors.isEmpty()) { - ArrayList allCollectors = new ArrayList<>(queryCollectors.values()); - allCollectors.add(collector); - collector = MultiCollector.wrap(allCollectors); - } - - // apply the minimum score after multi collector so we filter aggs as well - if (searchContext.minimumScore() != null) { - collector = new MinimumScoreCollector(collector, searchContext.minimumScore()); - } - } - super.search(query, collector); - } - - @Override - public void search(List leaves, Weight weight, Collector collector) throws IOException { - final boolean timeoutSet = searchContext.timeoutInMillis() != SearchService.NO_TIMEOUT.millis(); - final boolean terminateAfterSet = searchContext.terminateAfter() != SearchContext.DEFAULT_TERMINATE_AFTER; + public Explanation explain(Query query, int doc) throws IOException { try { - if (timeoutSet || terminateAfterSet) { - try { - super.search(leaves, weight, collector); - } catch (TimeLimitingCollector.TimeExceededException e) { - assert timeoutSet : "TimeExceededException thrown even though timeout wasn't set"; - searchContext.queryResult().searchTimedOut(true); - } catch (Lucene.EarlyTerminationException e) { - assert terminateAfterSet : "EarlyTerminationException thrown even though terminateAfter wasn't set"; - searchContext.queryResult().terminatedEarly(true); - } - if (terminateAfterSet && searchContext.queryResult().terminatedEarly() == null) { - searchContext.queryResult().terminatedEarly(false); - } - } else { - super.search(leaves, weight, collector); - } + return in.explain(query, doc); } finally { searchContext.clearReleasables(Lifetime.COLLECTION); } } @Override - public Explanation explain(Query query, int doc) throws IOException { + protected void search(List leaves, Weight weight, Collector collector) throws IOException { try { - if (searchContext.aliasFilter() == null) { - return super.explain(query, doc); - } - BooleanQuery filteredQuery = new BooleanQuery(); - filteredQuery.add(query, Occur.MUST); - filteredQuery.add(searchContext.aliasFilter(), Occur.FILTER); - return super.explain(filteredQuery, doc); + super.search(leaves, weight, collector); } finally { searchContext.clearReleasables(Lifetime.COLLECTION); } diff --git a/core/src/main/java/org/elasticsearch/search/internal/DefaultSearchContext.java b/core/src/main/java/org/elasticsearch/search/internal/DefaultSearchContext.java index f420986272d..03c13c61e93 100644 --- a/core/src/main/java/org/elasticsearch/search/internal/DefaultSearchContext.java +++ b/core/src/main/java/org/elasticsearch/search/internal/DefaultSearchContext.java @@ -22,6 +22,7 @@ package org.elasticsearch.search.internal; import com.carrotsearch.hppc.ObjectObjectAssociativeContainer; import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; + import org.apache.lucene.search.BooleanClause.Occur; import org.apache.lucene.search.*; import org.apache.lucene.util.Counter; @@ -66,6 +67,7 @@ import org.elasticsearch.search.rescore.RescoreSearchContext; import org.elasticsearch.search.scan.ScanContext; import org.elasticsearch.search.suggest.SuggestionSearchContext; +import java.io.IOException; import java.util.*; /** @@ -119,7 +121,6 @@ public class DefaultSearchContext extends SearchContext { private SuggestionSearchContext suggest; private List rescore; private SearchLookup searchLookup; - private boolean queryRewritten; private volatile long keepAlive; private ScoreDoc lastEmittedDoc; private final long originNanoTime = System.nanoTime(); @@ -127,6 +128,7 @@ public class DefaultSearchContext extends SearchContext { private InnerHitsContext innerHitsContext; private final Map subPhaseContexts = new HashMap<>(); + private final Map, Collector> queryCollectors = new HashMap<>(); public DefaultSearchContext(long id, ShardSearchRequest request, SearchShardTarget shardTarget, Engine.Searcher engineSearcher, IndexService indexService, IndexShard indexShard, @@ -197,10 +199,15 @@ public class DefaultSearchContext extends SearchContext { parsedQuery(new ParsedQuery(filtered, parsedQuery())); } } + try { + this.query = searcher().rewrite(this.query); + } catch (IOException e) { + throw new QueryPhaseExecutionException(this, "Failed to rewrite main query", e); + } } @Override - public Filter searchFilter(String[] types) { + public Query searchFilter(String[] types) { Query filter = mapperService().searchFilter(types); if (filter == null && aliasFilter == null) { return null; @@ -212,7 +219,7 @@ public class DefaultSearchContext extends SearchContext { if (aliasFilter != null) { bq.add(aliasFilter, Occur.MUST); } - return new QueryWrapperFilter(bq); + return new ConstantScoreQuery(bq); } @Override @@ -513,7 +520,6 @@ public class DefaultSearchContext extends SearchContext { @Override public SearchContext parsedQuery(ParsedQuery query) { - queryRewritten = false; this.originalQuery = query; this.query = query.query(); return this; @@ -525,31 +531,13 @@ public class DefaultSearchContext extends SearchContext { } /** - * The query to execute, might be rewritten. + * The query to execute, in its rewritten form. */ @Override public Query query() { return this.query; } - /** - * Has the query been rewritten already? - */ - @Override - public boolean queryRewritten() { - return queryRewritten; - } - - /** - * Rewrites the query and updates it. Only happens once. - */ - @Override - public SearchContext updateRewriteQuery(Query rewriteQuery) { - query = rewriteQuery; - queryRewritten = true; - return this; - } - @Override public int from() { return from; @@ -810,4 +798,9 @@ public class DefaultSearchContext extends SearchContext { public void copyContextAndHeadersFrom(HasContextAndHeaders other) { request.copyContextAndHeadersFrom(other); } + + @Override + public Map, Collector> queryCollectors() { + return queryCollectors; + } } diff --git a/core/src/main/java/org/elasticsearch/search/internal/FilteredSearchContext.java b/core/src/main/java/org/elasticsearch/search/internal/FilteredSearchContext.java index c22842e7093..e2f6b48f02d 100644 --- a/core/src/main/java/org/elasticsearch/search/internal/FilteredSearchContext.java +++ b/core/src/main/java/org/elasticsearch/search/internal/FilteredSearchContext.java @@ -20,6 +20,8 @@ package org.elasticsearch.search.internal; import com.carrotsearch.hppc.ObjectObjectAssociativeContainer; + +import org.apache.lucene.search.Collector; import org.apache.lucene.search.Query; import org.apache.lucene.search.ScoreDoc; import org.apache.lucene.search.Sort; @@ -58,6 +60,7 @@ import org.elasticsearch.search.scan.ScanContext; import org.elasticsearch.search.suggest.SuggestionSearchContext; import java.util.List; +import java.util.Map; import java.util.Set; public abstract class FilteredSearchContext extends SearchContext { @@ -375,16 +378,6 @@ public abstract class FilteredSearchContext extends SearchContext { return in.query(); } - @Override - public boolean queryRewritten() { - return in.queryRewritten(); - } - - @Override - public SearchContext updateRewriteQuery(Query rewriteQuery) { - return in.updateRewriteQuery(rewriteQuery); - } - @Override public int from() { return in.from(); @@ -624,4 +617,9 @@ public abstract class FilteredSearchContext extends SearchContext { public SubPhaseContext getFetchSubPhaseContext(FetchSubPhase.ContextFactory contextFactory) { return in.getFetchSubPhaseContext(contextFactory); } + + @Override + public Map, Collector> queryCollectors() { + return in.queryCollectors(); + } } diff --git a/core/src/main/java/org/elasticsearch/search/internal/SearchContext.java b/core/src/main/java/org/elasticsearch/search/internal/SearchContext.java index 901b721c3b5..72feec7ba3e 100644 --- a/core/src/main/java/org/elasticsearch/search/internal/SearchContext.java +++ b/core/src/main/java/org/elasticsearch/search/internal/SearchContext.java @@ -21,6 +21,8 @@ package org.elasticsearch.search.internal; import com.google.common.collect.Iterables; import com.google.common.collect.Multimap; import com.google.common.collect.MultimapBuilder; + +import org.apache.lucene.search.Collector; import org.apache.lucene.search.Query; import org.apache.lucene.search.ScoreDoc; import org.apache.lucene.search.Sort; @@ -65,6 +67,7 @@ import org.elasticsearch.search.suggest.SuggestionSearchContext; import java.util.ArrayList; import java.util.Collection; import java.util.List; +import java.util.Map; import java.util.concurrent.atomic.AtomicBoolean; public abstract class SearchContext implements Releasable, HasContextAndHeaders { @@ -257,16 +260,6 @@ public abstract class SearchContext implements Releasable, HasContextAndHeaders */ public abstract Query query(); - /** - * Has the query been rewritten already? - */ - public abstract boolean queryRewritten(); - - /** - * Rewrites the query and updates it. Only happens once. - */ - public abstract SearchContext updateRewriteQuery(Query rewriteQuery); - public abstract int from(); public abstract SearchContext from(int from); @@ -359,6 +352,9 @@ public abstract class SearchContext implements Releasable, HasContextAndHeaders public abstract Counter timeEstimateCounter(); + /** Return a view of the additional query collectors that should be run for this context. */ + public abstract Map, Collector> queryCollectors(); + /** * The life time of an object that is used during search execution. */ diff --git a/core/src/main/java/org/elasticsearch/search/internal/SubSearchContext.java b/core/src/main/java/org/elasticsearch/search/internal/SubSearchContext.java index be445c99e88..f70c9a7e1a4 100644 --- a/core/src/main/java/org/elasticsearch/search/internal/SubSearchContext.java +++ b/core/src/main/java/org/elasticsearch/search/internal/SubSearchContext.java @@ -206,11 +206,6 @@ public class SubSearchContext extends FilteredSearchContext { throw new UnsupportedOperationException("Not supported"); } - @Override - public SearchContext updateRewriteQuery(Query rewriteQuery) { - throw new UnsupportedOperationException("Not supported"); - } - @Override public int from() { return from; diff --git a/core/src/main/java/org/elasticsearch/search/query/QueryPhase.java b/core/src/main/java/org/elasticsearch/search/query/QueryPhase.java index 00157061803..a9105d59237 100644 --- a/core/src/main/java/org/elasticsearch/search/query/QueryPhase.java +++ b/core/src/main/java/org/elasticsearch/search/query/QueryPhase.java @@ -21,24 +21,42 @@ package org.elasticsearch.search.query; import com.google.common.collect.ImmutableMap; +import org.apache.lucene.search.Collector; +import org.apache.lucene.search.FieldDoc; +import org.apache.lucene.search.IndexSearcher; +import org.apache.lucene.search.MultiCollector; import org.apache.lucene.search.Query; import org.apache.lucene.search.ScoreDoc; +import org.apache.lucene.search.TimeLimitingCollector; import org.apache.lucene.search.TopDocs; +import org.apache.lucene.search.TopDocsCollector; +import org.apache.lucene.search.TopFieldCollector; +import org.apache.lucene.search.TopScoreDocCollector; +import org.apache.lucene.search.TotalHitCountCollector; +import org.apache.lucene.search.Weight; import org.elasticsearch.action.search.SearchType; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.lucene.Lucene; +import org.elasticsearch.common.lucene.MinimumScoreCollector; +import org.elasticsearch.common.lucene.search.FilteredCollector; import org.elasticsearch.search.SearchParseElement; import org.elasticsearch.search.SearchPhase; +import org.elasticsearch.search.SearchService; import org.elasticsearch.search.aggregations.AggregationPhase; -import org.elasticsearch.search.internal.ContextIndexSearcher; import org.elasticsearch.search.internal.SearchContext; +import org.elasticsearch.search.internal.SearchContext.Lifetime; import org.elasticsearch.search.rescore.RescorePhase; import org.elasticsearch.search.rescore.RescoreSearchContext; +import org.elasticsearch.search.scan.ScanContext.ScanCollector; import org.elasticsearch.search.sort.SortParseElement; import org.elasticsearch.search.sort.TrackScoresParseElement; import org.elasticsearch.search.suggest.SuggestPhase; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; import java.util.Map; +import java.util.concurrent.Callable; /** * @@ -97,66 +115,133 @@ public class QueryPhase implements SearchPhase { searchContext.queryResult().searchTimedOut(false); - searchContext.searcher().inStage(ContextIndexSearcher.Stage.MAIN_QUERY); boolean rescore = false; try { searchContext.queryResult().from(searchContext.from()); searchContext.queryResult().size(searchContext.size()); + final IndexSearcher searcher = searchContext.searcher(); Query query = searchContext.query(); - final TopDocs topDocs; - int numDocs = searchContext.from() + searchContext.size(); + final int totalNumDocs = searcher.getIndexReader().numDocs(); + int numDocs = Math.min(searchContext.from() + searchContext.size(), totalNumDocs); + + Collector collector; + final Callable topDocsCallable; if (searchContext.size() == 0) { // no matter what the value of from is - topDocs = new TopDocs(searchContext.searcher().count(query), Lucene.EMPTY_SCORE_DOCS, 0); + final TotalHitCountCollector totalHitCountCollector = new TotalHitCountCollector(); + collector = totalHitCountCollector; + topDocsCallable = new Callable() { + @Override + public TopDocs call() throws Exception { + return new TopDocs(totalHitCountCollector.getTotalHits(), Lucene.EMPTY_SCORE_DOCS, 0); + } + }; } else if (searchContext.searchType() == SearchType.SCAN) { - topDocs = searchContext.scanContext().execute(searchContext); + query = searchContext.scanContext().wrapQuery(query); + final ScanCollector scanCollector = searchContext.scanContext().collector(searchContext); + collector = scanCollector; + topDocsCallable = new Callable() { + @Override + public TopDocs call() throws Exception { + return scanCollector.topDocs(); + } + }; } else { // Perhaps have a dedicated scroll phase? + final TopDocsCollector topDocsCollector; + ScoreDoc lastEmittedDoc; if (searchContext.request().scroll() != null) { - numDocs = searchContext.size(); - ScoreDoc lastEmittedDoc = searchContext.lastEmittedDoc(); - if (searchContext.sort() != null) { - topDocs = searchContext.searcher().searchAfter( - lastEmittedDoc, query, null, numDocs, searchContext.sort(), - searchContext.trackScores(), searchContext.trackScores() - ); - } else { - rescore = !searchContext.rescore().isEmpty(); - for (RescoreSearchContext rescoreContext : searchContext.rescore()) { - numDocs = Math.max(rescoreContext.window(), numDocs); - } - topDocs = searchContext.searcher().searchAfter(lastEmittedDoc, query, numDocs); - } - - int size = topDocs.scoreDocs.length; - if (size > 0) { - // In the case of *QUERY_AND_FETCH we don't get back to shards telling them which least - // relevant docs got emitted as hit, we can simply mark the last doc as last emitted - if (searchContext.searchType() == SearchType.QUERY_AND_FETCH || - searchContext.searchType() == SearchType.DFS_QUERY_AND_FETCH) { - searchContext.lastEmittedDoc(topDocs.scoreDocs[size - 1]); - } - } + numDocs = Math.min(searchContext.size(), totalNumDocs); + lastEmittedDoc = searchContext.lastEmittedDoc(); } else { - if (searchContext.sort() != null) { - topDocs = searchContext.searcher().search(query, null, numDocs, searchContext.sort(), - searchContext.trackScores(), searchContext.trackScores()); - } else { - rescore = !searchContext.rescore().isEmpty(); - for (RescoreSearchContext rescoreContext : searchContext.rescore()) { - numDocs = Math.max(rescoreContext.window(), numDocs); - } - topDocs = searchContext.searcher().search(query, numDocs); + lastEmittedDoc = null; + } + if (totalNumDocs == 0) { + // top collectors don't like a size of 0 + numDocs = 1; + } + assert numDocs > 0; + if (searchContext.sort() != null) { + topDocsCollector = TopFieldCollector.create(searchContext.sort(), numDocs, + (FieldDoc) lastEmittedDoc, true, searchContext.trackScores(), searchContext.trackScores()); + } else { + rescore = !searchContext.rescore().isEmpty(); + for (RescoreSearchContext rescoreContext : searchContext.rescore()) { + numDocs = Math.max(rescoreContext.window(), numDocs); + } + topDocsCollector = TopScoreDocCollector.create(numDocs, lastEmittedDoc); + } + collector = topDocsCollector; + topDocsCallable = new Callable() { + @Override + public TopDocs call() throws Exception { + return topDocsCollector.topDocs(); + } + }; + } + + final boolean terminateAfterSet = searchContext.terminateAfter() != SearchContext.DEFAULT_TERMINATE_AFTER; + if (terminateAfterSet) { + // throws Lucene.EarlyTerminationException when given count is reached + collector = Lucene.wrapCountBasedEarlyTerminatingCollector(collector, searchContext.terminateAfter()); + } + + if (searchContext.parsedPostFilter() != null) { + // this will only get applied to the actual search collector and not + // to any scoped collectors, also, it will only be applied to the main collector + // since that is where the filter should only work + final Weight filterWeight = searcher.createNormalizedWeight(searchContext.parsedPostFilter().query(), false); + collector = new FilteredCollector(collector, filterWeight); + } + + // plug in additional collectors, like aggregations + List allCollectors = new ArrayList<>(); + allCollectors.add(collector); + allCollectors.addAll(searchContext.queryCollectors().values()); + collector = MultiCollector.wrap(allCollectors); + + // apply the minimum score after multi collector so we filter aggs as well + if (searchContext.minimumScore() != null) { + collector = new MinimumScoreCollector(collector, searchContext.minimumScore()); + } + + final boolean timeoutSet = searchContext.timeoutInMillis() != SearchService.NO_TIMEOUT.millis(); + if (timeoutSet) { + // TODO: change to use our own counter that uses the scheduler in ThreadPool + // throws TimeLimitingCollector.TimeExceededException when timeout has reached + collector = Lucene.wrapTimeLimitingCollector(collector, searchContext.timeEstimateCounter(), searchContext.timeoutInMillis()); + } + + try { + searcher.search(query, collector); + } catch (TimeLimitingCollector.TimeExceededException e) { + assert timeoutSet : "TimeExceededException thrown even though timeout wasn't set"; + searchContext.queryResult().searchTimedOut(true); + } catch (Lucene.EarlyTerminationException e) { + assert terminateAfterSet : "EarlyTerminationException thrown even though terminateAfter wasn't set"; + searchContext.queryResult().terminatedEarly(true); + } + if (terminateAfterSet && searchContext.queryResult().terminatedEarly() == null) { + searchContext.queryResult().terminatedEarly(false); + } + + final TopDocs topDocs = topDocsCallable.call(); + if (searchContext.request().scroll() != null) { + int size = topDocs.scoreDocs.length; + if (size > 0) { + // In the case of *QUERY_AND_FETCH we don't get back to shards telling them which least + // relevant docs got emitted as hit, we can simply mark the last doc as last emitted + if (searchContext.searchType() == SearchType.QUERY_AND_FETCH || + searchContext.searchType() == SearchType.DFS_QUERY_AND_FETCH) { + searchContext.lastEmittedDoc(topDocs.scoreDocs[size - 1]); } } } searchContext.queryResult().topDocs(topDocs); } catch (Throwable e) { throw new QueryPhaseExecutionException(searchContext, "Failed to execute main query", e); - } finally { - searchContext.searcher().finishStage(ContextIndexSearcher.Stage.MAIN_QUERY); } if (rescore) { // only if we do a regular search rescorePhase.execute(searchContext); diff --git a/core/src/main/java/org/elasticsearch/search/scan/ScanContext.java b/core/src/main/java/org/elasticsearch/search/scan/ScanContext.java index 7aaead1c42f..c0018d41020 100644 --- a/core/src/main/java/org/elasticsearch/search/scan/ScanContext.java +++ b/core/src/main/java/org/elasticsearch/search/scan/ScanContext.java @@ -47,18 +47,23 @@ public class ScanContext { private volatile int docUpTo; - public TopDocs execute(SearchContext context) throws IOException { - return execute(context.searcher(), context.query(), context.size(), context.trackScores()); + public ScanCollector collector(SearchContext context) { + return collector(context.size(), context.trackScores()); } - TopDocs execute(IndexSearcher searcher, Query query, int size, boolean trackScores) throws IOException { - ScanCollector collector = new ScanCollector(size, trackScores); - Query q = Queries.filtered(query, new MinDocQuery(docUpTo)); - searcher.search(q, collector); - return collector.topDocs(); + /** Create a {@link ScanCollector} for the given page size. */ + ScanCollector collector(int size, boolean trackScores) { + return new ScanCollector(size, trackScores); } - private class ScanCollector extends SimpleCollector { + /** + * Wrap the query so that it can skip directly to the right document. + */ + public Query wrapQuery(Query query) { + return Queries.filtered(query, new MinDocQuery(docUpTo)); + } + + public class ScanCollector extends SimpleCollector { private final List docs; @@ -70,7 +75,7 @@ public class ScanContext { private int docBase; - ScanCollector(int size, boolean trackScores) { + private ScanCollector(int size, boolean trackScores) { this.trackScores = trackScores; this.docs = new ArrayList<>(size); this.size = size; diff --git a/core/src/test/java/org/elasticsearch/search/scan/ScanContextTests.java b/core/src/test/java/org/elasticsearch/search/scan/ScanContextTests.java index 918bdd696c8..e804393e0ed 100644 --- a/core/src/test/java/org/elasticsearch/search/scan/ScanContextTests.java +++ b/core/src/test/java/org/elasticsearch/search/scan/ScanContextTests.java @@ -31,8 +31,10 @@ import org.apache.lucene.search.QueryUtils; import org.apache.lucene.search.ScoreDoc; import org.apache.lucene.search.Sort; import org.apache.lucene.search.TermQuery; +import org.apache.lucene.search.TopDocs; import org.apache.lucene.store.Directory; import org.elasticsearch.search.scan.ScanContext.MinDocQuery; +import org.elasticsearch.search.scan.ScanContext.ScanCollector; import org.elasticsearch.test.ESTestCase; import java.io.IOException; @@ -69,6 +71,13 @@ public class ScanContextTests extends ESTestCase { dir.close(); } + private static TopDocs execute(IndexSearcher searcher, ScanContext ctx, Query query, int pageSize, boolean trackScores) throws IOException { + query = ctx.wrapQuery(query); + ScanCollector collector = ctx.collector(pageSize, trackScores); + searcher.search(query, collector); + return collector.topDocs(); + } + public void testRandom() throws Exception { final int numDocs = randomIntBetween(10, 200); final Document doc1 = new Document(); @@ -93,10 +102,10 @@ public class ScanContextTests extends ESTestCase { final List actual = new ArrayList<>(); ScanContext context = new ScanContext(); while (true) { - final ScoreDoc[] page = context.execute(searcher, query, pageSize, trackScores).scoreDocs; + final ScoreDoc[] page = execute(searcher,context, query, pageSize, trackScores).scoreDocs; assertTrue(page.length <= pageSize); if (page.length == 0) { - assertEquals(0, context.execute(searcher, query, pageSize, trackScores).scoreDocs.length); + assertEquals(0, execute(searcher, context, query, pageSize, trackScores).scoreDocs.length); break; } actual.addAll(Arrays.asList(page)); diff --git a/core/src/test/java/org/elasticsearch/test/TestSearchContext.java b/core/src/test/java/org/elasticsearch/test/TestSearchContext.java index 44004fe6f48..4527fb5f4b0 100644 --- a/core/src/test/java/org/elasticsearch/test/TestSearchContext.java +++ b/core/src/test/java/org/elasticsearch/test/TestSearchContext.java @@ -20,6 +20,7 @@ package org.elasticsearch.test; import com.carrotsearch.hppc.ObjectObjectAssociativeContainer; +import org.apache.lucene.search.Collector; import org.apache.lucene.search.Filter; import org.apache.lucene.search.Query; import org.apache.lucene.search.ScoreDoc; @@ -74,6 +75,7 @@ public class TestSearchContext extends SearchContext { final IndexFieldDataService indexFieldDataService; final BitsetFilterCache fixedBitSetFilterCache; final ThreadPool threadPool; + final Map, Collector> queryCollectors = new HashMap<>(); ContextIndexSearcher searcher; int size; @@ -410,16 +412,6 @@ public class TestSearchContext extends SearchContext { return null; } - @Override - public boolean queryRewritten() { - return false; - } - - @Override - public SearchContext updateRewriteQuery(Query rewriteQuery) { - return null; - } - @Override public int from() { return 0; @@ -667,4 +659,9 @@ public class TestSearchContext extends SearchContext { @Override public void copyContextAndHeadersFrom(HasContextAndHeaders other) {} + + @Override + public Map, Collector> queryCollectors() { + return queryCollectors; + } }