Simplify ContextIndexSearcher.
In particular this commit moves collector wrapping logic from ContextIndexSearcher to QueryPhase.
This commit is contained in:
parent
470f5370b9
commit
b3e7146b22
|
@ -185,7 +185,7 @@ public class TransportValidateQueryAction extends TransportBroadcastAction<Valid
|
|||
|
||||
valid = true;
|
||||
if (request.explain()) {
|
||||
explanation = searchContext.query().toString();
|
||||
explanation = searchContext.parsedQuery().query().toString();
|
||||
}
|
||||
if (request.rewrite()) {
|
||||
explanation = getRewrittenQuery(searcher.searcher(), searchContext.query());
|
||||
|
|
|
@ -20,9 +20,11 @@ package org.elasticsearch.percolator;
|
|||
|
||||
import com.carrotsearch.hppc.ObjectObjectAssociativeContainer;
|
||||
import com.google.common.collect.ImmutableList;
|
||||
|
||||
import org.apache.lucene.index.IndexReader;
|
||||
import org.apache.lucene.index.IndexableField;
|
||||
import org.apache.lucene.index.LeafReaderContext;
|
||||
import org.apache.lucene.search.Collector;
|
||||
import org.apache.lucene.search.IndexSearcher;
|
||||
import org.apache.lucene.search.Query;
|
||||
import org.apache.lucene.search.ScoreDoc;
|
||||
|
@ -32,7 +34,6 @@ import org.apache.lucene.util.Counter;
|
|||
import org.elasticsearch.action.percolate.PercolateShardRequest;
|
||||
import org.elasticsearch.action.search.SearchType;
|
||||
import org.elasticsearch.cache.recycler.PageCacheRecycler;
|
||||
import org.elasticsearch.client.Client;
|
||||
import org.elasticsearch.common.*;
|
||||
import org.elasticsearch.common.collect.ImmutableOpenMap;
|
||||
import org.elasticsearch.common.lease.Releasables;
|
||||
|
@ -111,13 +112,13 @@ public class PercolateContext extends SearchContext {
|
|||
private SearchLookup searchLookup;
|
||||
private ParsedQuery parsedQuery;
|
||||
private Query query;
|
||||
private boolean queryRewritten;
|
||||
private Query percolateQuery;
|
||||
private FetchSubPhase.HitContext hitContext;
|
||||
private SearchContextAggregations aggregations;
|
||||
private QuerySearchResult querySearchResult;
|
||||
private Sort sort;
|
||||
private final Map<String, FetchSubPhaseContext> subPhaseContexts = new HashMap<>();
|
||||
private final Map<Class<?>, Collector> queryCollectors = new HashMap<>();
|
||||
|
||||
public PercolateContext(PercolateShardRequest request, SearchShardTarget searchShardTarget, IndexShard indexShard,
|
||||
IndexService indexService, PageCacheRecycler pageCacheRecycler,
|
||||
|
@ -232,7 +233,6 @@ public class PercolateContext extends SearchContext {
|
|||
public SearchContext parsedQuery(ParsedQuery query) {
|
||||
this.parsedQuery = query;
|
||||
this.query = query.query();
|
||||
this.queryRewritten = false;
|
||||
return this;
|
||||
}
|
||||
|
||||
|
@ -246,18 +246,6 @@ public class PercolateContext extends SearchContext {
|
|||
return query;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean queryRewritten() {
|
||||
return queryRewritten;
|
||||
}
|
||||
|
||||
@Override
|
||||
public SearchContext updateRewriteQuery(Query rewriteQuery) {
|
||||
queryRewritten = true;
|
||||
query = rewriteQuery;
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String[] types() {
|
||||
return types;
|
||||
|
@ -768,4 +756,9 @@ public class PercolateContext extends SearchContext {
|
|||
public void copyContextAndHeadersFrom(HasContextAndHeaders other) {
|
||||
assert false : "percolatecontext does not support contexts & headers";
|
||||
}
|
||||
|
||||
@Override
|
||||
public Map<Class<?>, Collector> queryCollectors() {
|
||||
return queryCollectors;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -86,7 +86,7 @@ public class AggregationPhase implements SearchPhase {
|
|||
if (!collectors.isEmpty()) {
|
||||
final BucketCollector collector = BucketCollector.wrap(collectors);
|
||||
collector.preCollection();
|
||||
context.searcher().queryCollectors().put(AggregationPhase.class, collector);
|
||||
context.queryCollectors().put(AggregationPhase.class, collector);
|
||||
}
|
||||
} catch (IOException e) {
|
||||
throw new AggregationInitializationException("Could not initialize aggregators", e);
|
||||
|
@ -162,7 +162,7 @@ public class AggregationPhase implements SearchPhase {
|
|||
|
||||
// disable aggregations so that they don't run on next pages in case of scrolling
|
||||
context.aggregations(null);
|
||||
context.searcher().queryCollectors().remove(AggregationPhase.class);
|
||||
context.queryCollectors().remove(AggregationPhase.class);
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -57,10 +57,6 @@ public class DfsPhase implements SearchPhase {
|
|||
public void execute(SearchContext context) {
|
||||
final ObjectHashSet<Term> termsSet = new ObjectHashSet<>();
|
||||
try {
|
||||
if (!context.queryRewritten()) {
|
||||
context.updateRewriteQuery(context.searcher().rewrite(context.query()));
|
||||
}
|
||||
|
||||
context.searcher().createNormalizedWeight(context.query(), true).extractTerms(new DelegateSet(termsSet));
|
||||
for (RescoreSearchContext rescoreContext : context.rescore()) {
|
||||
rescoreContext.rescorer().extractTerms(context, rescoreContext, new DelegateSet(termsSet));
|
||||
|
|
|
@ -20,41 +20,25 @@
|
|||
package org.elasticsearch.search.internal;
|
||||
|
||||
import org.apache.lucene.index.LeafReaderContext;
|
||||
import org.apache.lucene.search.BooleanClause.Occur;
|
||||
import org.apache.lucene.search.BooleanQuery;
|
||||
import org.apache.lucene.search.Collector;
|
||||
import org.apache.lucene.search.Explanation;
|
||||
import org.apache.lucene.search.IndexSearcher;
|
||||
import org.apache.lucene.search.MultiCollector;
|
||||
import org.apache.lucene.search.Query;
|
||||
import org.apache.lucene.search.TimeLimitingCollector;
|
||||
import org.apache.lucene.search.Weight;
|
||||
import org.elasticsearch.ExceptionsHelper;
|
||||
import org.elasticsearch.common.lease.Releasable;
|
||||
import org.elasticsearch.common.lucene.Lucene;
|
||||
import org.elasticsearch.common.lucene.MinimumScoreCollector;
|
||||
import org.elasticsearch.common.lucene.search.FilteredCollector;
|
||||
import org.elasticsearch.index.engine.Engine;
|
||||
import org.elasticsearch.search.SearchService;
|
||||
import org.elasticsearch.search.dfs.CachedDfSource;
|
||||
import org.elasticsearch.search.internal.SearchContext.Lifetime;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* Context-aware extension of {@link IndexSearcher}.
|
||||
*/
|
||||
public class ContextIndexSearcher extends IndexSearcher implements Releasable {
|
||||
|
||||
public static enum Stage {
|
||||
NA,
|
||||
MAIN_QUERY
|
||||
}
|
||||
|
||||
/** The wrapped {@link IndexSearcher}. The reason why we sometimes prefer delegating to this searcher instead of <tt>super</tt> is that
|
||||
* this instance may have more assertions, for example if it comes from MockInternalEngine which wraps the IndexSearcher into an
|
||||
* AssertingIndexSearcher. */
|
||||
|
@ -64,10 +48,6 @@ public class ContextIndexSearcher extends IndexSearcher implements Releasable {
|
|||
|
||||
private CachedDfSource dfSource;
|
||||
|
||||
private Map<Class<?>, Collector> queryCollectors;
|
||||
|
||||
private Stage currentState = Stage.NA;
|
||||
|
||||
public ContextIndexSearcher(SearchContext searchContext, Engine.Searcher searcher) {
|
||||
super(searcher.reader());
|
||||
in = searcher.searcher();
|
||||
|
@ -83,49 +63,11 @@ public class ContextIndexSearcher extends IndexSearcher implements Releasable {
|
|||
this.dfSource = dfSource;
|
||||
}
|
||||
|
||||
/**
|
||||
* Adds a query level collector that runs at {@link Stage#MAIN_QUERY}. Note, supports
|
||||
* {@link org.elasticsearch.common.lucene.search.XCollector} allowing for a callback
|
||||
* when collection is done.
|
||||
*/
|
||||
public Map<Class<?>, Collector> queryCollectors() {
|
||||
if (queryCollectors == null) {
|
||||
queryCollectors = new HashMap<>();
|
||||
}
|
||||
return queryCollectors;
|
||||
}
|
||||
|
||||
public void inStage(Stage stage) {
|
||||
this.currentState = stage;
|
||||
}
|
||||
|
||||
public void finishStage(Stage stage) {
|
||||
assert currentState == stage : "Expected stage " + stage + " but was stage " + currentState;
|
||||
this.currentState = Stage.NA;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Query rewrite(Query original) throws IOException {
|
||||
if (original == searchContext.query() || original == searchContext.parsedQuery().query()) {
|
||||
// optimize in case its the top level search query and we already rewrote it...
|
||||
if (searchContext.queryRewritten()) {
|
||||
return searchContext.query();
|
||||
}
|
||||
Query rewriteQuery = in.rewrite(original);
|
||||
searchContext.updateRewriteQuery(rewriteQuery);
|
||||
return rewriteQuery;
|
||||
} else {
|
||||
return in.rewrite(original);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public Weight createNormalizedWeight(Query query, boolean needsScores) throws IOException {
|
||||
// TODO: needsScores
|
||||
// can we avoid dfs stuff here if we dont need scores?
|
||||
try {
|
||||
// if its the main query, use we have dfs data, only then do it
|
||||
if (dfSource != null && (query == searchContext.query() || query == searchContext.parsedQuery().query())) {
|
||||
// if scores are needed and we have dfs data then use it
|
||||
if (dfSource != null && needsScores) {
|
||||
return dfSource.createNormalizedWeight(query, needsScores);
|
||||
}
|
||||
return in.createNormalizedWeight(query, needsScores);
|
||||
|
@ -135,81 +77,19 @@ public class ContextIndexSearcher extends IndexSearcher implements Releasable {
|
|||
}
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public void search(Query query, Collector collector) throws IOException {
|
||||
// Wrap the caller's collector with various wrappers e.g. those used to siphon
|
||||
// matches off for aggregation or to impose a time-limit on collection.
|
||||
final boolean timeoutSet = searchContext.timeoutInMillis() != SearchService.NO_TIMEOUT.millis();
|
||||
final boolean terminateAfterSet = searchContext.terminateAfter() != SearchContext.DEFAULT_TERMINATE_AFTER;
|
||||
|
||||
if (timeoutSet) {
|
||||
// TODO: change to use our own counter that uses the scheduler in ThreadPool
|
||||
// throws TimeLimitingCollector.TimeExceededException when timeout has reached
|
||||
collector = Lucene.wrapTimeLimitingCollector(collector, searchContext.timeEstimateCounter(), searchContext.timeoutInMillis());
|
||||
}
|
||||
if (terminateAfterSet) {
|
||||
// throws Lucene.EarlyTerminationException when given count is reached
|
||||
collector = Lucene.wrapCountBasedEarlyTerminatingCollector(collector, searchContext.terminateAfter());
|
||||
}
|
||||
if (currentState == Stage.MAIN_QUERY) {
|
||||
if (searchContext.parsedPostFilter() != null) {
|
||||
// this will only get applied to the actual search collector and not
|
||||
// to any scoped collectors, also, it will only be applied to the main collector
|
||||
// since that is where the filter should only work
|
||||
final Weight filterWeight = createNormalizedWeight(searchContext.parsedPostFilter().query(), false);
|
||||
collector = new FilteredCollector(collector, filterWeight);
|
||||
}
|
||||
if (queryCollectors != null && !queryCollectors.isEmpty()) {
|
||||
ArrayList<Collector> allCollectors = new ArrayList<>(queryCollectors.values());
|
||||
allCollectors.add(collector);
|
||||
collector = MultiCollector.wrap(allCollectors);
|
||||
}
|
||||
|
||||
// apply the minimum score after multi collector so we filter aggs as well
|
||||
if (searchContext.minimumScore() != null) {
|
||||
collector = new MinimumScoreCollector(collector, searchContext.minimumScore());
|
||||
}
|
||||
}
|
||||
super.search(query, collector);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void search(List<LeafReaderContext> leaves, Weight weight, Collector collector) throws IOException {
|
||||
final boolean timeoutSet = searchContext.timeoutInMillis() != SearchService.NO_TIMEOUT.millis();
|
||||
final boolean terminateAfterSet = searchContext.terminateAfter() != SearchContext.DEFAULT_TERMINATE_AFTER;
|
||||
public Explanation explain(Query query, int doc) throws IOException {
|
||||
try {
|
||||
if (timeoutSet || terminateAfterSet) {
|
||||
try {
|
||||
super.search(leaves, weight, collector);
|
||||
} catch (TimeLimitingCollector.TimeExceededException e) {
|
||||
assert timeoutSet : "TimeExceededException thrown even though timeout wasn't set";
|
||||
searchContext.queryResult().searchTimedOut(true);
|
||||
} catch (Lucene.EarlyTerminationException e) {
|
||||
assert terminateAfterSet : "EarlyTerminationException thrown even though terminateAfter wasn't set";
|
||||
searchContext.queryResult().terminatedEarly(true);
|
||||
}
|
||||
if (terminateAfterSet && searchContext.queryResult().terminatedEarly() == null) {
|
||||
searchContext.queryResult().terminatedEarly(false);
|
||||
}
|
||||
} else {
|
||||
super.search(leaves, weight, collector);
|
||||
}
|
||||
return in.explain(query, doc);
|
||||
} finally {
|
||||
searchContext.clearReleasables(Lifetime.COLLECTION);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public Explanation explain(Query query, int doc) throws IOException {
|
||||
protected void search(List<LeafReaderContext> leaves, Weight weight, Collector collector) throws IOException {
|
||||
try {
|
||||
if (searchContext.aliasFilter() == null) {
|
||||
return super.explain(query, doc);
|
||||
}
|
||||
BooleanQuery filteredQuery = new BooleanQuery();
|
||||
filteredQuery.add(query, Occur.MUST);
|
||||
filteredQuery.add(searchContext.aliasFilter(), Occur.FILTER);
|
||||
return super.explain(filteredQuery, doc);
|
||||
super.search(leaves, weight, collector);
|
||||
} finally {
|
||||
searchContext.clearReleasables(Lifetime.COLLECTION);
|
||||
}
|
||||
|
|
|
@ -22,6 +22,7 @@ package org.elasticsearch.search.internal;
|
|||
import com.carrotsearch.hppc.ObjectObjectAssociativeContainer;
|
||||
import com.google.common.collect.ImmutableList;
|
||||
import com.google.common.collect.Lists;
|
||||
|
||||
import org.apache.lucene.search.BooleanClause.Occur;
|
||||
import org.apache.lucene.search.*;
|
||||
import org.apache.lucene.util.Counter;
|
||||
|
@ -66,6 +67,7 @@ import org.elasticsearch.search.rescore.RescoreSearchContext;
|
|||
import org.elasticsearch.search.scan.ScanContext;
|
||||
import org.elasticsearch.search.suggest.SuggestionSearchContext;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.*;
|
||||
|
||||
/**
|
||||
|
@ -119,7 +121,6 @@ public class DefaultSearchContext extends SearchContext {
|
|||
private SuggestionSearchContext suggest;
|
||||
private List<RescoreSearchContext> rescore;
|
||||
private SearchLookup searchLookup;
|
||||
private boolean queryRewritten;
|
||||
private volatile long keepAlive;
|
||||
private ScoreDoc lastEmittedDoc;
|
||||
private final long originNanoTime = System.nanoTime();
|
||||
|
@ -127,6 +128,7 @@ public class DefaultSearchContext extends SearchContext {
|
|||
private InnerHitsContext innerHitsContext;
|
||||
|
||||
private final Map<String, FetchSubPhaseContext> subPhaseContexts = new HashMap<>();
|
||||
private final Map<Class<?>, Collector> queryCollectors = new HashMap<>();
|
||||
|
||||
public DefaultSearchContext(long id, ShardSearchRequest request, SearchShardTarget shardTarget,
|
||||
Engine.Searcher engineSearcher, IndexService indexService, IndexShard indexShard,
|
||||
|
@ -197,10 +199,15 @@ public class DefaultSearchContext extends SearchContext {
|
|||
parsedQuery(new ParsedQuery(filtered, parsedQuery()));
|
||||
}
|
||||
}
|
||||
try {
|
||||
this.query = searcher().rewrite(this.query);
|
||||
} catch (IOException e) {
|
||||
throw new QueryPhaseExecutionException(this, "Failed to rewrite main query", e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public Filter searchFilter(String[] types) {
|
||||
public Query searchFilter(String[] types) {
|
||||
Query filter = mapperService().searchFilter(types);
|
||||
if (filter == null && aliasFilter == null) {
|
||||
return null;
|
||||
|
@ -212,7 +219,7 @@ public class DefaultSearchContext extends SearchContext {
|
|||
if (aliasFilter != null) {
|
||||
bq.add(aliasFilter, Occur.MUST);
|
||||
}
|
||||
return new QueryWrapperFilter(bq);
|
||||
return new ConstantScoreQuery(bq);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -513,7 +520,6 @@ public class DefaultSearchContext extends SearchContext {
|
|||
|
||||
@Override
|
||||
public SearchContext parsedQuery(ParsedQuery query) {
|
||||
queryRewritten = false;
|
||||
this.originalQuery = query;
|
||||
this.query = query.query();
|
||||
return this;
|
||||
|
@ -525,31 +531,13 @@ public class DefaultSearchContext extends SearchContext {
|
|||
}
|
||||
|
||||
/**
|
||||
* The query to execute, might be rewritten.
|
||||
* The query to execute, in its rewritten form.
|
||||
*/
|
||||
@Override
|
||||
public Query query() {
|
||||
return this.query;
|
||||
}
|
||||
|
||||
/**
|
||||
* Has the query been rewritten already?
|
||||
*/
|
||||
@Override
|
||||
public boolean queryRewritten() {
|
||||
return queryRewritten;
|
||||
}
|
||||
|
||||
/**
|
||||
* Rewrites the query and updates it. Only happens once.
|
||||
*/
|
||||
@Override
|
||||
public SearchContext updateRewriteQuery(Query rewriteQuery) {
|
||||
query = rewriteQuery;
|
||||
queryRewritten = true;
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int from() {
|
||||
return from;
|
||||
|
@ -810,4 +798,9 @@ public class DefaultSearchContext extends SearchContext {
|
|||
public void copyContextAndHeadersFrom(HasContextAndHeaders other) {
|
||||
request.copyContextAndHeadersFrom(other);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Map<Class<?>, Collector> queryCollectors() {
|
||||
return queryCollectors;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -20,6 +20,8 @@
|
|||
package org.elasticsearch.search.internal;
|
||||
|
||||
import com.carrotsearch.hppc.ObjectObjectAssociativeContainer;
|
||||
|
||||
import org.apache.lucene.search.Collector;
|
||||
import org.apache.lucene.search.Query;
|
||||
import org.apache.lucene.search.ScoreDoc;
|
||||
import org.apache.lucene.search.Sort;
|
||||
|
@ -58,6 +60,7 @@ import org.elasticsearch.search.scan.ScanContext;
|
|||
import org.elasticsearch.search.suggest.SuggestionSearchContext;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
|
||||
public abstract class FilteredSearchContext extends SearchContext {
|
||||
|
@ -375,16 +378,6 @@ public abstract class FilteredSearchContext extends SearchContext {
|
|||
return in.query();
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean queryRewritten() {
|
||||
return in.queryRewritten();
|
||||
}
|
||||
|
||||
@Override
|
||||
public SearchContext updateRewriteQuery(Query rewriteQuery) {
|
||||
return in.updateRewriteQuery(rewriteQuery);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int from() {
|
||||
return in.from();
|
||||
|
@ -624,4 +617,9 @@ public abstract class FilteredSearchContext extends SearchContext {
|
|||
public <SubPhaseContext extends FetchSubPhaseContext> SubPhaseContext getFetchSubPhaseContext(FetchSubPhase.ContextFactory<SubPhaseContext> contextFactory) {
|
||||
return in.getFetchSubPhaseContext(contextFactory);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Map<Class<?>, Collector> queryCollectors() {
|
||||
return in.queryCollectors();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -21,6 +21,8 @@ package org.elasticsearch.search.internal;
|
|||
import com.google.common.collect.Iterables;
|
||||
import com.google.common.collect.Multimap;
|
||||
import com.google.common.collect.MultimapBuilder;
|
||||
|
||||
import org.apache.lucene.search.Collector;
|
||||
import org.apache.lucene.search.Query;
|
||||
import org.apache.lucene.search.ScoreDoc;
|
||||
import org.apache.lucene.search.Sort;
|
||||
|
@ -65,6 +67,7 @@ import org.elasticsearch.search.suggest.SuggestionSearchContext;
|
|||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
|
||||
public abstract class SearchContext implements Releasable, HasContextAndHeaders {
|
||||
|
@ -257,16 +260,6 @@ public abstract class SearchContext implements Releasable, HasContextAndHeaders
|
|||
*/
|
||||
public abstract Query query();
|
||||
|
||||
/**
|
||||
* Has the query been rewritten already?
|
||||
*/
|
||||
public abstract boolean queryRewritten();
|
||||
|
||||
/**
|
||||
* Rewrites the query and updates it. Only happens once.
|
||||
*/
|
||||
public abstract SearchContext updateRewriteQuery(Query rewriteQuery);
|
||||
|
||||
public abstract int from();
|
||||
|
||||
public abstract SearchContext from(int from);
|
||||
|
@ -359,6 +352,9 @@ public abstract class SearchContext implements Releasable, HasContextAndHeaders
|
|||
|
||||
public abstract Counter timeEstimateCounter();
|
||||
|
||||
/** Return a view of the additional query collectors that should be run for this context. */
|
||||
public abstract Map<Class<?>, Collector> queryCollectors();
|
||||
|
||||
/**
|
||||
* The life time of an object that is used during search execution.
|
||||
*/
|
||||
|
|
|
@ -206,11 +206,6 @@ public class SubSearchContext extends FilteredSearchContext {
|
|||
throw new UnsupportedOperationException("Not supported");
|
||||
}
|
||||
|
||||
@Override
|
||||
public SearchContext updateRewriteQuery(Query rewriteQuery) {
|
||||
throw new UnsupportedOperationException("Not supported");
|
||||
}
|
||||
|
||||
@Override
|
||||
public int from() {
|
||||
return from;
|
||||
|
|
|
@ -21,24 +21,42 @@ package org.elasticsearch.search.query;
|
|||
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
|
||||
import org.apache.lucene.search.Collector;
|
||||
import org.apache.lucene.search.FieldDoc;
|
||||
import org.apache.lucene.search.IndexSearcher;
|
||||
import org.apache.lucene.search.MultiCollector;
|
||||
import org.apache.lucene.search.Query;
|
||||
import org.apache.lucene.search.ScoreDoc;
|
||||
import org.apache.lucene.search.TimeLimitingCollector;
|
||||
import org.apache.lucene.search.TopDocs;
|
||||
import org.apache.lucene.search.TopDocsCollector;
|
||||
import org.apache.lucene.search.TopFieldCollector;
|
||||
import org.apache.lucene.search.TopScoreDocCollector;
|
||||
import org.apache.lucene.search.TotalHitCountCollector;
|
||||
import org.apache.lucene.search.Weight;
|
||||
import org.elasticsearch.action.search.SearchType;
|
||||
import org.elasticsearch.common.inject.Inject;
|
||||
import org.elasticsearch.common.lucene.Lucene;
|
||||
import org.elasticsearch.common.lucene.MinimumScoreCollector;
|
||||
import org.elasticsearch.common.lucene.search.FilteredCollector;
|
||||
import org.elasticsearch.search.SearchParseElement;
|
||||
import org.elasticsearch.search.SearchPhase;
|
||||
import org.elasticsearch.search.SearchService;
|
||||
import org.elasticsearch.search.aggregations.AggregationPhase;
|
||||
import org.elasticsearch.search.internal.ContextIndexSearcher;
|
||||
import org.elasticsearch.search.internal.SearchContext;
|
||||
import org.elasticsearch.search.internal.SearchContext.Lifetime;
|
||||
import org.elasticsearch.search.rescore.RescorePhase;
|
||||
import org.elasticsearch.search.rescore.RescoreSearchContext;
|
||||
import org.elasticsearch.search.scan.ScanContext.ScanCollector;
|
||||
import org.elasticsearch.search.sort.SortParseElement;
|
||||
import org.elasticsearch.search.sort.TrackScoresParseElement;
|
||||
import org.elasticsearch.search.suggest.SuggestPhase;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.Callable;
|
||||
|
||||
/**
|
||||
*
|
||||
|
@ -97,39 +115,120 @@ public class QueryPhase implements SearchPhase {
|
|||
|
||||
searchContext.queryResult().searchTimedOut(false);
|
||||
|
||||
searchContext.searcher().inStage(ContextIndexSearcher.Stage.MAIN_QUERY);
|
||||
boolean rescore = false;
|
||||
try {
|
||||
searchContext.queryResult().from(searchContext.from());
|
||||
searchContext.queryResult().size(searchContext.size());
|
||||
|
||||
final IndexSearcher searcher = searchContext.searcher();
|
||||
Query query = searchContext.query();
|
||||
|
||||
final TopDocs topDocs;
|
||||
int numDocs = searchContext.from() + searchContext.size();
|
||||
final int totalNumDocs = searcher.getIndexReader().numDocs();
|
||||
int numDocs = Math.min(searchContext.from() + searchContext.size(), totalNumDocs);
|
||||
|
||||
Collector collector;
|
||||
final Callable<TopDocs> topDocsCallable;
|
||||
|
||||
if (searchContext.size() == 0) { // no matter what the value of from is
|
||||
topDocs = new TopDocs(searchContext.searcher().count(query), Lucene.EMPTY_SCORE_DOCS, 0);
|
||||
final TotalHitCountCollector totalHitCountCollector = new TotalHitCountCollector();
|
||||
collector = totalHitCountCollector;
|
||||
topDocsCallable = new Callable<TopDocs>() {
|
||||
@Override
|
||||
public TopDocs call() throws Exception {
|
||||
return new TopDocs(totalHitCountCollector.getTotalHits(), Lucene.EMPTY_SCORE_DOCS, 0);
|
||||
}
|
||||
};
|
||||
} else if (searchContext.searchType() == SearchType.SCAN) {
|
||||
topDocs = searchContext.scanContext().execute(searchContext);
|
||||
query = searchContext.scanContext().wrapQuery(query);
|
||||
final ScanCollector scanCollector = searchContext.scanContext().collector(searchContext);
|
||||
collector = scanCollector;
|
||||
topDocsCallable = new Callable<TopDocs>() {
|
||||
@Override
|
||||
public TopDocs call() throws Exception {
|
||||
return scanCollector.topDocs();
|
||||
}
|
||||
};
|
||||
} else {
|
||||
// Perhaps have a dedicated scroll phase?
|
||||
final TopDocsCollector<?> topDocsCollector;
|
||||
ScoreDoc lastEmittedDoc;
|
||||
if (searchContext.request().scroll() != null) {
|
||||
numDocs = searchContext.size();
|
||||
ScoreDoc lastEmittedDoc = searchContext.lastEmittedDoc();
|
||||
numDocs = Math.min(searchContext.size(), totalNumDocs);
|
||||
lastEmittedDoc = searchContext.lastEmittedDoc();
|
||||
} else {
|
||||
lastEmittedDoc = null;
|
||||
}
|
||||
if (totalNumDocs == 0) {
|
||||
// top collectors don't like a size of 0
|
||||
numDocs = 1;
|
||||
}
|
||||
assert numDocs > 0;
|
||||
if (searchContext.sort() != null) {
|
||||
topDocs = searchContext.searcher().searchAfter(
|
||||
lastEmittedDoc, query, null, numDocs, searchContext.sort(),
|
||||
searchContext.trackScores(), searchContext.trackScores()
|
||||
);
|
||||
topDocsCollector = TopFieldCollector.create(searchContext.sort(), numDocs,
|
||||
(FieldDoc) lastEmittedDoc, true, searchContext.trackScores(), searchContext.trackScores());
|
||||
} else {
|
||||
rescore = !searchContext.rescore().isEmpty();
|
||||
for (RescoreSearchContext rescoreContext : searchContext.rescore()) {
|
||||
numDocs = Math.max(rescoreContext.window(), numDocs);
|
||||
}
|
||||
topDocs = searchContext.searcher().searchAfter(lastEmittedDoc, query, numDocs);
|
||||
topDocsCollector = TopScoreDocCollector.create(numDocs, lastEmittedDoc);
|
||||
}
|
||||
collector = topDocsCollector;
|
||||
topDocsCallable = new Callable<TopDocs>() {
|
||||
@Override
|
||||
public TopDocs call() throws Exception {
|
||||
return topDocsCollector.topDocs();
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
final boolean terminateAfterSet = searchContext.terminateAfter() != SearchContext.DEFAULT_TERMINATE_AFTER;
|
||||
if (terminateAfterSet) {
|
||||
// throws Lucene.EarlyTerminationException when given count is reached
|
||||
collector = Lucene.wrapCountBasedEarlyTerminatingCollector(collector, searchContext.terminateAfter());
|
||||
}
|
||||
|
||||
if (searchContext.parsedPostFilter() != null) {
|
||||
// this will only get applied to the actual search collector and not
|
||||
// to any scoped collectors, also, it will only be applied to the main collector
|
||||
// since that is where the filter should only work
|
||||
final Weight filterWeight = searcher.createNormalizedWeight(searchContext.parsedPostFilter().query(), false);
|
||||
collector = new FilteredCollector(collector, filterWeight);
|
||||
}
|
||||
|
||||
// plug in additional collectors, like aggregations
|
||||
List<Collector> allCollectors = new ArrayList<>();
|
||||
allCollectors.add(collector);
|
||||
allCollectors.addAll(searchContext.queryCollectors().values());
|
||||
collector = MultiCollector.wrap(allCollectors);
|
||||
|
||||
// apply the minimum score after multi collector so we filter aggs as well
|
||||
if (searchContext.minimumScore() != null) {
|
||||
collector = new MinimumScoreCollector(collector, searchContext.minimumScore());
|
||||
}
|
||||
|
||||
final boolean timeoutSet = searchContext.timeoutInMillis() != SearchService.NO_TIMEOUT.millis();
|
||||
if (timeoutSet) {
|
||||
// TODO: change to use our own counter that uses the scheduler in ThreadPool
|
||||
// throws TimeLimitingCollector.TimeExceededException when timeout has reached
|
||||
collector = Lucene.wrapTimeLimitingCollector(collector, searchContext.timeEstimateCounter(), searchContext.timeoutInMillis());
|
||||
}
|
||||
|
||||
try {
|
||||
searcher.search(query, collector);
|
||||
} catch (TimeLimitingCollector.TimeExceededException e) {
|
||||
assert timeoutSet : "TimeExceededException thrown even though timeout wasn't set";
|
||||
searchContext.queryResult().searchTimedOut(true);
|
||||
} catch (Lucene.EarlyTerminationException e) {
|
||||
assert terminateAfterSet : "EarlyTerminationException thrown even though terminateAfter wasn't set";
|
||||
searchContext.queryResult().terminatedEarly(true);
|
||||
}
|
||||
if (terminateAfterSet && searchContext.queryResult().terminatedEarly() == null) {
|
||||
searchContext.queryResult().terminatedEarly(false);
|
||||
}
|
||||
|
||||
final TopDocs topDocs = topDocsCallable.call();
|
||||
if (searchContext.request().scroll() != null) {
|
||||
int size = topDocs.scoreDocs.length;
|
||||
if (size > 0) {
|
||||
// In the case of *QUERY_AND_FETCH we don't get back to shards telling them which least
|
||||
|
@ -139,24 +238,10 @@ public class QueryPhase implements SearchPhase {
|
|||
searchContext.lastEmittedDoc(topDocs.scoreDocs[size - 1]);
|
||||
}
|
||||
}
|
||||
} else {
|
||||
if (searchContext.sort() != null) {
|
||||
topDocs = searchContext.searcher().search(query, null, numDocs, searchContext.sort(),
|
||||
searchContext.trackScores(), searchContext.trackScores());
|
||||
} else {
|
||||
rescore = !searchContext.rescore().isEmpty();
|
||||
for (RescoreSearchContext rescoreContext : searchContext.rescore()) {
|
||||
numDocs = Math.max(rescoreContext.window(), numDocs);
|
||||
}
|
||||
topDocs = searchContext.searcher().search(query, numDocs);
|
||||
}
|
||||
}
|
||||
}
|
||||
searchContext.queryResult().topDocs(topDocs);
|
||||
} catch (Throwable e) {
|
||||
throw new QueryPhaseExecutionException(searchContext, "Failed to execute main query", e);
|
||||
} finally {
|
||||
searchContext.searcher().finishStage(ContextIndexSearcher.Stage.MAIN_QUERY);
|
||||
}
|
||||
if (rescore) { // only if we do a regular search
|
||||
rescorePhase.execute(searchContext);
|
||||
|
|
|
@ -47,18 +47,23 @@ public class ScanContext {
|
|||
|
||||
private volatile int docUpTo;
|
||||
|
||||
public TopDocs execute(SearchContext context) throws IOException {
|
||||
return execute(context.searcher(), context.query(), context.size(), context.trackScores());
|
||||
public ScanCollector collector(SearchContext context) {
|
||||
return collector(context.size(), context.trackScores());
|
||||
}
|
||||
|
||||
TopDocs execute(IndexSearcher searcher, Query query, int size, boolean trackScores) throws IOException {
|
||||
ScanCollector collector = new ScanCollector(size, trackScores);
|
||||
Query q = Queries.filtered(query, new MinDocQuery(docUpTo));
|
||||
searcher.search(q, collector);
|
||||
return collector.topDocs();
|
||||
/** Create a {@link ScanCollector} for the given page size. */
|
||||
ScanCollector collector(int size, boolean trackScores) {
|
||||
return new ScanCollector(size, trackScores);
|
||||
}
|
||||
|
||||
private class ScanCollector extends SimpleCollector {
|
||||
/**
|
||||
* Wrap the query so that it can skip directly to the right document.
|
||||
*/
|
||||
public Query wrapQuery(Query query) {
|
||||
return Queries.filtered(query, new MinDocQuery(docUpTo));
|
||||
}
|
||||
|
||||
public class ScanCollector extends SimpleCollector {
|
||||
|
||||
private final List<ScoreDoc> docs;
|
||||
|
||||
|
@ -70,7 +75,7 @@ public class ScanContext {
|
|||
|
||||
private int docBase;
|
||||
|
||||
ScanCollector(int size, boolean trackScores) {
|
||||
private ScanCollector(int size, boolean trackScores) {
|
||||
this.trackScores = trackScores;
|
||||
this.docs = new ArrayList<>(size);
|
||||
this.size = size;
|
||||
|
|
|
@ -31,8 +31,10 @@ import org.apache.lucene.search.QueryUtils;
|
|||
import org.apache.lucene.search.ScoreDoc;
|
||||
import org.apache.lucene.search.Sort;
|
||||
import org.apache.lucene.search.TermQuery;
|
||||
import org.apache.lucene.search.TopDocs;
|
||||
import org.apache.lucene.store.Directory;
|
||||
import org.elasticsearch.search.scan.ScanContext.MinDocQuery;
|
||||
import org.elasticsearch.search.scan.ScanContext.ScanCollector;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
|
||||
import java.io.IOException;
|
||||
|
@ -69,6 +71,13 @@ public class ScanContextTests extends ESTestCase {
|
|||
dir.close();
|
||||
}
|
||||
|
||||
private static TopDocs execute(IndexSearcher searcher, ScanContext ctx, Query query, int pageSize, boolean trackScores) throws IOException {
|
||||
query = ctx.wrapQuery(query);
|
||||
ScanCollector collector = ctx.collector(pageSize, trackScores);
|
||||
searcher.search(query, collector);
|
||||
return collector.topDocs();
|
||||
}
|
||||
|
||||
public void testRandom() throws Exception {
|
||||
final int numDocs = randomIntBetween(10, 200);
|
||||
final Document doc1 = new Document();
|
||||
|
@ -93,10 +102,10 @@ public class ScanContextTests extends ESTestCase {
|
|||
final List<ScoreDoc> actual = new ArrayList<>();
|
||||
ScanContext context = new ScanContext();
|
||||
while (true) {
|
||||
final ScoreDoc[] page = context.execute(searcher, query, pageSize, trackScores).scoreDocs;
|
||||
final ScoreDoc[] page = execute(searcher,context, query, pageSize, trackScores).scoreDocs;
|
||||
assertTrue(page.length <= pageSize);
|
||||
if (page.length == 0) {
|
||||
assertEquals(0, context.execute(searcher, query, pageSize, trackScores).scoreDocs.length);
|
||||
assertEquals(0, execute(searcher, context, query, pageSize, trackScores).scoreDocs.length);
|
||||
break;
|
||||
}
|
||||
actual.addAll(Arrays.asList(page));
|
||||
|
|
|
@ -20,6 +20,7 @@ package org.elasticsearch.test;
|
|||
|
||||
import com.carrotsearch.hppc.ObjectObjectAssociativeContainer;
|
||||
|
||||
import org.apache.lucene.search.Collector;
|
||||
import org.apache.lucene.search.Filter;
|
||||
import org.apache.lucene.search.Query;
|
||||
import org.apache.lucene.search.ScoreDoc;
|
||||
|
@ -74,6 +75,7 @@ public class TestSearchContext extends SearchContext {
|
|||
final IndexFieldDataService indexFieldDataService;
|
||||
final BitsetFilterCache fixedBitSetFilterCache;
|
||||
final ThreadPool threadPool;
|
||||
final Map<Class<?>, Collector> queryCollectors = new HashMap<>();
|
||||
|
||||
ContextIndexSearcher searcher;
|
||||
int size;
|
||||
|
@ -410,16 +412,6 @@ public class TestSearchContext extends SearchContext {
|
|||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean queryRewritten() {
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public SearchContext updateRewriteQuery(Query rewriteQuery) {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int from() {
|
||||
return 0;
|
||||
|
@ -667,4 +659,9 @@ public class TestSearchContext extends SearchContext {
|
|||
|
||||
@Override
|
||||
public void copyContextAndHeadersFrom(HasContextAndHeaders other) {}
|
||||
|
||||
@Override
|
||||
public Map<Class<?>, Collector> queryCollectors() {
|
||||
return queryCollectors;
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue