Automatically early terminate search query based on index sorting (#24864)

This commit refactors the query phase in order to be able
to automatically detect queries that can be early terminated.
If the index sort matches the query sort, the top docs collection is early terminated
on each segment and the computing of the total number of hits that match the query is delegated to a simple TotalHitCountCollector.
This change also adds a new parameter to the search request called `track_total_hits`.
It indicates if the total number of hits that match the query should be tracked.
If false, queries sorted by the index sort will not try to compute this information and 
and will limit the collection to the first N documents per segment.
Aggregations are not impacted and will continue to see every document
even when the index sort matches the query sort and `track_total_hits` is false.

Relates #6720
This commit is contained in:
Jim Ferenczi 2017-06-08 12:10:46 +02:00 committed by GitHub
parent 21a57c1494
commit 36a5cf8f35
30 changed files with 1575 additions and 467 deletions

View File

@ -40,7 +40,7 @@ import java.util.Collection;
abstract class CollapsingDocValuesSource<T> extends GroupSelector<T> {
protected final String field;
CollapsingDocValuesSource(String field) throws IOException {
CollapsingDocValuesSource(String field) {
this.field = field;
}
@ -58,7 +58,7 @@ abstract class CollapsingDocValuesSource<T> extends GroupSelector<T> {
private long value;
private boolean hasValue;
Numeric(String field) throws IOException {
Numeric(String field) {
super(field);
}
@ -148,7 +148,7 @@ abstract class CollapsingDocValuesSource<T> extends GroupSelector<T> {
private SortedDocValues values;
private int ord;
Keyword(String field) throws IOException {
Keyword(String field) {
super(field);
}

View File

@ -46,7 +46,7 @@ public final class CollapsingTopDocsCollector<T> extends FirstPassGroupingCollec
private final boolean trackMaxScore;
CollapsingTopDocsCollector(GroupSelector<T> groupSelector, String collapseField, Sort sort,
int topN, boolean trackMaxScore) throws IOException {
int topN, boolean trackMaxScore) {
super(groupSelector, sort, topN);
this.collapseField = collapseField;
this.trackMaxScore = trackMaxScore;
@ -60,7 +60,7 @@ public final class CollapsingTopDocsCollector<T> extends FirstPassGroupingCollec
/**
* Transform {@link FirstPassGroupingCollector#getTopGroups(int, boolean)} output in
* {@link CollapseTopFieldDocs}. The collapsing needs only one pass so we can create the final top docs at the end
* {@link CollapseTopFieldDocs}. The collapsing needs only one pass so we can get the final top docs at the end
* of the first pass.
*/
public CollapseTopFieldDocs getTopDocs() throws IOException {
@ -132,10 +132,9 @@ public final class CollapsingTopDocsCollector<T> extends FirstPassGroupingCollec
* This must be non-null, ie, if you want to groupSort by relevance
* use Sort.RELEVANCE.
* @param topN How many top groups to keep.
* @throws IOException When I/O related errors occur
*/
public static CollapsingTopDocsCollector<?> createNumeric(String collapseField, Sort sort,
int topN, boolean trackMaxScore) throws IOException {
int topN, boolean trackMaxScore) {
return new CollapsingTopDocsCollector<>(new CollapsingDocValuesSource.Numeric(collapseField),
collapseField, sort, topN, trackMaxScore);
}
@ -152,12 +151,10 @@ public final class CollapsingTopDocsCollector<T> extends FirstPassGroupingCollec
* document per collapsed key.
* This must be non-null, ie, if you want to groupSort by relevance use Sort.RELEVANCE.
* @param topN How many top groups to keep.
* @throws IOException When I/O related errors occur
*/
public static CollapsingTopDocsCollector<?> createKeyword(String collapseField, Sort sort,
int topN, boolean trackMaxScore) throws IOException {
int topN, boolean trackMaxScore) {
return new CollapsingTopDocsCollector<>(new CollapsingDocValuesSource.Keyword(collapseField),
collapseField, sort, topN, trackMaxScore);
}
}

View File

@ -829,8 +829,7 @@ public class ElasticsearchException extends RuntimeException implements ToXConte
org.elasticsearch.transport.SendRequestTransportException::new, 58, UNKNOWN_VERSION_ADDED),
ES_REJECTED_EXECUTION_EXCEPTION(org.elasticsearch.common.util.concurrent.EsRejectedExecutionException.class,
org.elasticsearch.common.util.concurrent.EsRejectedExecutionException::new, 59, UNKNOWN_VERSION_ADDED),
EARLY_TERMINATION_EXCEPTION(org.elasticsearch.common.lucene.Lucene.EarlyTerminationException.class,
org.elasticsearch.common.lucene.Lucene.EarlyTerminationException::new, 60, UNKNOWN_VERSION_ADDED),
// 60 used to be for EarlyTerminationException
// 61 used to be for RoutingValidationException
NOT_SERIALIZABLE_EXCEPTION_WRAPPER(org.elasticsearch.common.io.stream.NotSerializableExceptionWrapper.class,
org.elasticsearch.common.io.stream.NotSerializableExceptionWrapper::new, 62, UNKNOWN_VERSION_ADDED),

View File

@ -405,9 +405,18 @@ public final class SearchPhaseController extends AbstractComponent {
* @param queryResults a list of non-null query shard results
*/
public ReducedQueryPhase reducedQueryPhase(Collection<? extends SearchPhaseResult> queryResults, boolean isScrollRequest) {
return reducedQueryPhase(queryResults, null, new ArrayList<>(), new TopDocsStats(), 0, isScrollRequest);
return reducedQueryPhase(queryResults, isScrollRequest, true);
}
/**
* Reduces the given query results and consumes all aggregations and profile results.
* @param queryResults a list of non-null query shard results
*/
public ReducedQueryPhase reducedQueryPhase(Collection<? extends SearchPhaseResult> queryResults, boolean isScrollRequest, boolean trackTotalHits) {
return reducedQueryPhase(queryResults, null, new ArrayList<>(), new TopDocsStats(trackTotalHits), 0, isScrollRequest);
}
/**
* Reduces the given query results and consumes all aggregations and profile results.
* @param queryResults a list of non-null query shard results
@ -711,6 +720,7 @@ public final class SearchPhaseController extends AbstractComponent {
boolean isScrollRequest = request.scroll() != null;
final boolean hasAggs = source != null && source.aggregations() != null;
final boolean hasTopDocs = source == null || source.size() != 0;
final boolean trackTotalHits = source == null || source.trackTotalHits();
if (isScrollRequest == false && (hasAggs || hasTopDocs)) {
// no incremental reduce if scroll is used - we only hit a single shard or sometimes more...
@ -722,18 +732,30 @@ public final class SearchPhaseController extends AbstractComponent {
return new InitialSearchPhase.SearchPhaseResults(numShards) {
@Override
public ReducedQueryPhase reduce() {
return reducedQueryPhase(results.asList(), isScrollRequest);
return reducedQueryPhase(results.asList(), isScrollRequest, trackTotalHits);
}
};
}
static final class TopDocsStats {
final boolean trackTotalHits;
long totalHits;
long fetchHits;
float maxScore = Float.NEGATIVE_INFINITY;
TopDocsStats() {
this(true);
}
TopDocsStats(boolean trackTotalHits) {
this.trackTotalHits = trackTotalHits;
this.totalHits = trackTotalHits ? 0 : -1;
}
void add(TopDocs topDocs) {
totalHits += topDocs.totalHits;
if (trackTotalHits) {
totalHits += topDocs.totalHits;
}
fetchHits += topDocs.scoreDocs.length;
if (!Float.isNaN(topDocs.getMaxScore())) {
maxScore = Math.max(maxScore, topDocs.getMaxScore());

View File

@ -39,6 +39,8 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.Objects;
import static org.elasticsearch.action.ValidateActions.addValidationError;
/**
* A request to execute search against one or more indices (or all). Best created using
* {@link org.elasticsearch.client.Requests#searchRequest(String...)}.
@ -102,7 +104,12 @@ public final class SearchRequest extends ActionRequest implements IndicesRequest
@Override
public ActionRequestValidationException validate() {
return null;
ActionRequestValidationException validationException = null;
if (source != null && source.trackTotalHits() == false && scroll() != null) {
validationException =
addValidationError("disabling [track_total_hits] is not allowed in a scroll context", validationException);
}
return validationException;
}
/**

View File

@ -363,14 +363,21 @@ public class SearchRequestBuilder extends ActionRequestBuilder<SearchRequest, Se
}
/**
* Applies when sorting, and controls if scores will be tracked as well. Defaults to
* <tt>false</tt>.
* Applies when sorting, and controls if scores will be tracked as well. Defaults to <tt>false</tt>.
*/
public SearchRequestBuilder setTrackScores(boolean trackScores) {
sourceBuilder().trackScores(trackScores);
return this;
}
/**
* Indicates if the total hit count for the query should be tracked. Defaults to <tt>true</tt>
*/
public SearchRequestBuilder setTrackTotalHits(boolean trackTotalHits) {
sourceBuilder().trackTotalHits(trackTotalHits);
return this;
}
/**
* Adds stored fields to load and return (note, it must be stored) as part of the search request.
* To disable the stored fields entirely (source and metadata fields) use {@code storedField("_none_")}.

View File

@ -246,20 +246,6 @@ public class Lucene {
}.run();
}
/**
* Wraps <code>delegate</code> with count based early termination collector with a threshold of <code>maxCountHits</code>
*/
public static final EarlyTerminatingCollector wrapCountBasedEarlyTerminatingCollector(final Collector delegate, int maxCountHits) {
return new EarlyTerminatingCollector(delegate, maxCountHits);
}
/**
* Wraps <code>delegate</code> with a time limited collector with a timeout of <code>timeoutInMillis</code>
*/
public static final TimeLimitingCollector wrapTimeLimitingCollector(final Collector delegate, final Counter counter, long timeoutInMillis) {
return new TimeLimitingCollector(delegate, counter, timeoutInMillis);
}
/**
* Check whether there is one or more documents matching the provided query.
*/
@ -618,71 +604,6 @@ public class Lucene {
}
}
/**
* This exception is thrown when {@link org.elasticsearch.common.lucene.Lucene.EarlyTerminatingCollector}
* reaches early termination
* */
public static final class EarlyTerminationException extends ElasticsearchException {
public EarlyTerminationException(String msg) {
super(msg);
}
public EarlyTerminationException(StreamInput in) throws IOException{
super(in);
}
}
/**
* A collector that terminates early by throwing {@link org.elasticsearch.common.lucene.Lucene.EarlyTerminationException}
* when count of matched documents has reached <code>maxCountHits</code>
*/
public static final class EarlyTerminatingCollector extends SimpleCollector {
private final int maxCountHits;
private final Collector delegate;
private int count = 0;
private LeafCollector leafCollector;
EarlyTerminatingCollector(final Collector delegate, int maxCountHits) {
this.maxCountHits = maxCountHits;
this.delegate = Objects.requireNonNull(delegate);
}
public int count() {
return count;
}
public boolean exists() {
return count > 0;
}
@Override
public void setScorer(Scorer scorer) throws IOException {
leafCollector.setScorer(scorer);
}
@Override
public void collect(int doc) throws IOException {
leafCollector.collect(doc);
if (++count >= maxCountHits) {
throw new EarlyTerminationException("early termination [CountBased]");
}
}
@Override
public void doSetNextReader(LeafReaderContext atomicReaderContext) throws IOException {
leafCollector = delegate.getLeafCollector(atomicReaderContext);
}
@Override
public boolean needsScores() {
return delegate.needsScores();
}
}
private Lucene() {
}

View File

@ -197,6 +197,10 @@ public class RestSearchAction extends BaseRestHandler {
searchSourceBuilder.trackScores(request.paramAsBoolean("track_scores", false));
}
if (request.hasParam("track_total_hits")) {
searchSourceBuilder.trackTotalHits(request.paramAsBoolean("track_total_hits", true));
}
String sSorts = request.param("sort");
if (sSorts != null) {
String[] sorts = Strings.splitStringByCommaToArray(sSorts);

View File

@ -114,6 +114,7 @@ final class DefaultSearchContext extends SearchContext {
private SortAndFormats sort;
private Float minimumScore;
private boolean trackScores = false; // when sorting, track scores as well...
private boolean trackTotalHits = true;
private FieldDoc searchAfter;
private CollapseContext collapse;
private boolean lowLevelCancellation;
@ -548,6 +549,17 @@ final class DefaultSearchContext extends SearchContext {
return this.trackScores;
}
@Override
public SearchContext trackTotalHits(boolean trackTotalHits) {
this.trackTotalHits = trackTotalHits;
return this;
}
@Override
public boolean trackTotalHits() {
return trackTotalHits;
}
@Override
public SearchContext searchAfter(FieldDoc searchAfter) {
this.searchAfter = searchAfter;

View File

@ -19,6 +19,7 @@
package org.elasticsearch.search;
import org.elasticsearch.Version;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Streamable;
@ -178,7 +179,17 @@ public final class SearchHits implements Streamable, ToXContent, Iterable<Search
@Override
public void readFrom(StreamInput in) throws IOException {
totalHits = in.readVLong();
final boolean hasTotalHits;
if (in.getVersion().onOrAfter(Version.V_6_0_0_alpha3)) {
hasTotalHits = in.readBoolean();
} else {
hasTotalHits = true;
}
if (hasTotalHits) {
totalHits = in.readVLong();
} else {
totalHits = -1;
}
maxScore = in.readFloat();
int size = in.readVInt();
if (size == 0) {
@ -193,7 +204,17 @@ public final class SearchHits implements Streamable, ToXContent, Iterable<Search
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeVLong(totalHits);
final boolean hasTotalHits;
if (out.getVersion().onOrAfter(Version.V_6_0_0_alpha3)) {
hasTotalHits = totalHits >= 0;
out.writeBoolean(hasTotalHits);
} else {
assert totalHits >= 0;
hasTotalHits = true;
}
if (hasTotalHits) {
out.writeVLong(totalHits);
}
out.writeFloat(maxScore);
out.writeVInt(hits.length);
if (hits.length > 0) {

View File

@ -635,6 +635,10 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv
}
}
context.trackScores(source.trackScores());
if (source.trackTotalHits() == false && context.scrollContext() != null) {
throw new SearchContextException(context, "disabling [track_total_hits] is not allowed in a scroll context");
}
context.trackTotalHits(source.trackTotalHits());
if (source.minScore() != null) {
context.minimumScore(source.minScore());
}

View File

@ -92,6 +92,7 @@ public final class SearchSourceBuilder extends ToXContentToBytes implements Writ
public static final ParseField IGNORE_FAILURE_FIELD = new ParseField("ignore_failure");
public static final ParseField SORT_FIELD = new ParseField("sort");
public static final ParseField TRACK_SCORES_FIELD = new ParseField("track_scores");
public static final ParseField TRACK_TOTAL_HITS_FIELD = new ParseField("track_total_hits");
public static final ParseField INDICES_BOOST_FIELD = new ParseField("indices_boost");
public static final ParseField AGGREGATIONS_FIELD = new ParseField("aggregations");
public static final ParseField AGGS_FIELD = new ParseField("aggs");
@ -142,6 +143,8 @@ public final class SearchSourceBuilder extends ToXContentToBytes implements Writ
private boolean trackScores = false;
private boolean trackTotalHits = true;
private SearchAfterBuilder searchAfterBuilder;
private SliceBuilder sliceBuilder;
@ -224,6 +227,11 @@ public final class SearchSourceBuilder extends ToXContentToBytes implements Writ
if (in.getVersion().onOrAfter(Version.V_5_3_0)) {
collapse = in.readOptionalWriteable(CollapseBuilder::new);
}
if (in.getVersion().onOrAfter(Version.V_6_0_0_alpha3)) {
trackTotalHits = in.readBoolean();
} else {
trackTotalHits = true;
}
}
@Override
@ -275,6 +283,9 @@ public final class SearchSourceBuilder extends ToXContentToBytes implements Writ
if (out.getVersion().onOrAfter(Version.V_5_3_0)) {
out.writeOptionalWriteable(collapse);
}
if (out.getVersion().onOrAfter(Version.V_6_0_0_alpha3)) {
out.writeBoolean(trackTotalHits);
}
}
/**
@ -489,6 +500,17 @@ public final class SearchSourceBuilder extends ToXContentToBytes implements Writ
return trackScores;
}
/**
* Indicates if the total hit count for the query should be tracked.
*/
public boolean trackTotalHits() {
return trackTotalHits;
}
public SearchSourceBuilder trackTotalHits(boolean trackTotalHits) {
this.trackTotalHits = trackTotalHits;
return this;
}
/**
* The sort values that indicates which docs this request should "search after".
@ -926,6 +948,7 @@ public final class SearchSourceBuilder extends ToXContentToBytes implements Writ
rewrittenBuilder.terminateAfter = terminateAfter;
rewrittenBuilder.timeout = timeout;
rewrittenBuilder.trackScores = trackScores;
rewrittenBuilder.trackTotalHits = trackTotalHits;
rewrittenBuilder.version = version;
rewrittenBuilder.collapse = collapse;
return rewrittenBuilder;
@ -964,6 +987,8 @@ public final class SearchSourceBuilder extends ToXContentToBytes implements Writ
explain = parser.booleanValue();
} else if (TRACK_SCORES_FIELD.match(currentFieldName)) {
trackScores = parser.booleanValue();
} else if (TRACK_TOTAL_HITS_FIELD.match(currentFieldName)) {
trackTotalHits = parser.booleanValue();
} else if (_SOURCE_FIELD.match(currentFieldName)) {
fetchSourceContext = FetchSourceContext.fromXContent(context.parser());
} else if (STORED_FIELDS_FIELD.match(currentFieldName)) {
@ -1174,6 +1199,10 @@ public final class SearchSourceBuilder extends ToXContentToBytes implements Writ
builder.field(TRACK_SCORES_FIELD.getPreferredName(), true);
}
if (trackTotalHits == false) {
builder.field(TRACK_TOTAL_HITS_FIELD.getPreferredName(), false);
}
if (searchAfterBuilder != null) {
builder.array(SEARCH_AFTER.getPreferredName(), searchAfterBuilder.getSortValues());
}
@ -1433,7 +1462,7 @@ public final class SearchSourceBuilder extends ToXContentToBytes implements Writ
return Objects.hash(aggregations, explain, fetchSourceContext, docValueFields, storedFieldsContext, from, highlightBuilder,
indexBoosts, minScore, postQueryBuilder, queryBuilder, rescoreBuilders, scriptFields, size,
sorts, searchAfterBuilder, sliceBuilder, stats, suggestBuilder, terminateAfter, timeout, trackScores, version,
profile, extBuilders, collapse);
profile, extBuilders, collapse, trackTotalHits);
}
@Override
@ -1470,6 +1499,7 @@ public final class SearchSourceBuilder extends ToXContentToBytes implements Writ
&& Objects.equals(version, other.version)
&& Objects.equals(profile, other.profile)
&& Objects.equals(extBuilders, other.extBuilders)
&& Objects.equals(collapse, other.collapse);
&& Objects.equals(collapse, other.collapse)
&& Objects.equals(trackTotalHits, other.trackTotalHits);
}
}

View File

@ -56,7 +56,7 @@ public class CollapseContext {
return innerHits;
}
public CollapsingTopDocsCollector<?> createTopDocs(Sort sort, int topN, boolean trackMaxScore) throws IOException {
public CollapsingTopDocsCollector<?> createTopDocs(Sort sort, int topN, boolean trackMaxScore) {
if (fieldType instanceof KeywordFieldMapper.KeywordFieldType) {
return CollapsingTopDocsCollector.createKeyword(fieldType.name(), sort, topN, trackMaxScore);
} else if (fieldType instanceof NumberFieldMapper.NumberFieldType) {

View File

@ -321,6 +321,16 @@ public abstract class FilteredSearchContext extends SearchContext {
return in.trackScores();
}
@Override
public SearchContext trackTotalHits(boolean trackTotalHits) {
return in.trackTotalHits(trackTotalHits);
}
@Override
public boolean trackTotalHits() {
return in.trackTotalHits();
}
@Override
public SearchContext searchAfter(FieldDoc searchAfter) {
return in.searchAfter(searchAfter);

View File

@ -240,6 +240,13 @@ public abstract class SearchContext extends AbstractRefCounted implements Releas
public abstract boolean trackScores();
public abstract SearchContext trackTotalHits(boolean trackTotalHits);
/**
* Indicates if the total hit count for the query should be tracked. Defaults to <tt>true</tt>
*/
public abstract boolean trackTotalHits();
public abstract SearchContext searchAfter(FieldDoc searchAfter);
public abstract FieldDoc searchAfter();

View File

@ -0,0 +1,65 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.search.query;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.search.CollectionTerminatedException;
import org.apache.lucene.search.Collector;
import org.apache.lucene.search.FilterCollector;
import org.apache.lucene.search.FilterLeafCollector;
import org.apache.lucene.search.LeafCollector;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicBoolean;
/**
* A {@link Collector} that early terminates collection after <code>maxCountHits</code> docs have been collected.
*/
public class EarlyTerminatingCollector extends FilterCollector {
private final int maxCountHits;
private int numCollected;
private boolean terminatedEarly = false;
EarlyTerminatingCollector(final Collector delegate, int maxCountHits) {
super(delegate);
this.maxCountHits = maxCountHits;
}
@Override
public LeafCollector getLeafCollector(LeafReaderContext context) throws IOException {
if (numCollected >= maxCountHits) {
throw new CollectionTerminatedException();
}
return new FilterLeafCollector(super.getLeafCollector(context)) {
@Override
public void collect(int doc) throws IOException {
super.collect(doc);
if (++numCollected >= maxCountHits) {
terminatedEarly = true;
throw new CollectionTerminatedException();
}
};
};
}
public boolean terminatedEarly() {
return terminatedEarly;
}
}

View File

@ -0,0 +1,275 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.search.query;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.search.Collector;
import org.apache.lucene.search.EarlyTerminatingSortingCollector;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.MultiCollector;
import org.apache.lucene.search.Query;
import org.apache.lucene.search.Sort;
import org.apache.lucene.search.TimeLimitingCollector;
import org.apache.lucene.search.TopDocs;
import org.apache.lucene.search.TotalHitCountCollector;
import org.apache.lucene.search.Weight;
import org.apache.lucene.util.Counter;
import org.elasticsearch.common.inject.Provider;
import org.elasticsearch.common.lucene.MinimumScoreCollector;
import org.elasticsearch.common.lucene.search.FilteredCollector;
import org.elasticsearch.search.profile.query.InternalProfileCollector;
import org.elasticsearch.tasks.TaskCancelledException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.function.BooleanSupplier;
import java.util.function.IntSupplier;
import static org.elasticsearch.search.profile.query.CollectorResult.REASON_SEARCH_CANCELLED;
import static org.elasticsearch.search.profile.query.CollectorResult.REASON_SEARCH_MIN_SCORE;
import static org.elasticsearch.search.profile.query.CollectorResult.REASON_SEARCH_MULTI;
import static org.elasticsearch.search.profile.query.CollectorResult.REASON_SEARCH_POST_FILTER;
import static org.elasticsearch.search.profile.query.CollectorResult.REASON_SEARCH_TERMINATE_AFTER_COUNT;
import static org.elasticsearch.search.profile.query.CollectorResult.REASON_SEARCH_TIMEOUT;
import static org.elasticsearch.search.query.TopDocsCollectorContext.shortcutTotalHitCount;
abstract class QueryCollectorContext {
private String profilerName;
QueryCollectorContext(String profilerName) {
this.profilerName = profilerName;
}
/**
* Creates a collector that delegates documents to the provided <code>in</code> collector.
* @param in The delegate collector
*/
abstract Collector create(Collector in) throws IOException;
/**
* Wraps this collector with a profiler
*/
protected InternalProfileCollector createWithProfiler(InternalProfileCollector in) throws IOException {
final Collector collector = create(in);
return new InternalProfileCollector(collector, profilerName, in != null ? Collections.singletonList(in) : Collections.emptyList());
}
/**
* A value of <code>false</code> indicates that the underlying collector can infer
* its results directly from the context (search is not needed).
* Default to true (search is needed).
*/
boolean shouldCollect() {
return true;
}
/**
* Post-process <code>result</code> after search execution.
*
* @param result The query search result to populate
* @param hasCollected True if search was executed
*/
void postProcess(QuerySearchResult result, boolean hasCollected) throws IOException {}
/**
* Creates the collector tree from the provided <code>collectors</code>
* @param collectors Ordered list of collector context
*/
static Collector createQueryCollector(List<QueryCollectorContext> collectors) throws IOException {
Collector collector = null;
for (QueryCollectorContext ctx : collectors) {
collector = ctx.create(collector);
}
return collector;
}
/**
* Creates the collector tree from the provided <code>collectors</code> and wraps each collector with a profiler
* @param collectors Ordered list of collector context
*/
static InternalProfileCollector createQueryCollectorWithProfiler(List<QueryCollectorContext> collectors) throws IOException {
InternalProfileCollector collector = null;
for (QueryCollectorContext ctx : collectors) {
collector = ctx.createWithProfiler(collector);
}
return collector;
}
/**
* Filters documents with a query score greater than <code>minScore</code>
* @param minScore The minimum score filter
*/
static QueryCollectorContext createMinScoreCollectorContext(float minScore) {
return new QueryCollectorContext(REASON_SEARCH_MIN_SCORE) {
@Override
Collector create(Collector in) {
return new MinimumScoreCollector(in, minScore);
}
};
}
/**
* Filters documents based on the provided <code>query</code>
*/
static QueryCollectorContext createFilteredCollectorContext(IndexSearcher searcher, Query query) {
return new QueryCollectorContext(REASON_SEARCH_POST_FILTER) {
@Override
Collector create(Collector in ) throws IOException {
final Weight filterWeight = searcher.createNormalizedWeight(query, false);
return new FilteredCollector(in, filterWeight);
}
};
}
/**
* Creates a multi collector from the provided <code>subs</code>
*/
static QueryCollectorContext createMultiCollectorContext(Collection<Collector> subs) {
return new QueryCollectorContext(REASON_SEARCH_MULTI) {
@Override
Collector create(Collector in) throws IOException {
List<Collector> subCollectors = new ArrayList<> ();
subCollectors.add(in);
subCollectors.addAll(subs);
return MultiCollector.wrap(subCollectors);
}
@Override
protected InternalProfileCollector createWithProfiler(InternalProfileCollector in) throws IOException {
final List<InternalProfileCollector> subCollectors = new ArrayList<> ();
subCollectors.add(in);
if (subs.stream().anyMatch((col) -> col instanceof InternalProfileCollector == false)) {
throw new IllegalArgumentException("non-profiling collector");
}
for (Collector collector : subs) {
subCollectors.add((InternalProfileCollector) collector);
}
final Collector collector = MultiCollector.wrap(subCollectors);
return new InternalProfileCollector(collector, REASON_SEARCH_MULTI, subCollectors);
}
};
}
/**
* Creates a time limiting collector limiting the collection to <code>timeOutMillis</code>ms.
*/
static QueryCollectorContext createTimeoutCollectorContext(Counter timeEstimate, long timeoutMillis) {
return new QueryCollectorContext(REASON_SEARCH_TIMEOUT) {
@Override
Collector create(Collector in) throws IOException {
return new TimeLimitingCollector(in, timeEstimate, timeoutMillis);
}
@Override
boolean shouldCollect() {
return false;
}
};
}
/**
* Creates a collector that throws {@link TaskCancelledException} if the search is cancelled
*/
static QueryCollectorContext createCancellableCollectorContext(Provider<Boolean> cancelled, boolean lowLevelCancellation) {
return new QueryCollectorContext(REASON_SEARCH_CANCELLED) {
@Override
Collector create(Collector in) throws IOException {
return new CancellableCollector(cancelled, lowLevelCancellation, in);
}
@Override
boolean shouldCollect() {
return false;
}
};
}
/**
* Creates collector limiting the collection to the first <code>numHits</code> documents
*/
static QueryCollectorContext createEarlyTerminationCollectorContext(int numHits) {
return new QueryCollectorContext(REASON_SEARCH_TERMINATE_AFTER_COUNT) {
private EarlyTerminatingCollector collector;
@Override
Collector create(Collector in) throws IOException {
assert collector == null;
this.collector = new EarlyTerminatingCollector(in, numHits);
return collector;
}
@Override
void postProcess(QuerySearchResult result, boolean hasCollected) throws IOException {
if (hasCollected && collector.terminatedEarly()) {
result.terminatedEarly(true);
}
}
};
}
/**
* Creates a sorting termination collector limiting the collection to the first <code>numHits</code> per segment.
* The total hit count matching the query is also computed if <code>trackTotalHits</code> is true.
*/
static QueryCollectorContext createEarlySortingTerminationCollectorContext(IndexReader reader,
Query query,
Sort indexSort,
int numHits,
boolean trackTotalHits,
boolean shouldCollect) {
return new QueryCollectorContext(REASON_SEARCH_TERMINATE_AFTER_COUNT) {
private BooleanSupplier terminatedEarlySupplier;
private IntSupplier countSupplier = null;
@Override
Collector create(Collector in) throws IOException {
EarlyTerminatingSortingCollector sortingCollector = new EarlyTerminatingSortingCollector(in, indexSort, numHits);
terminatedEarlySupplier = sortingCollector::terminatedEarly;
Collector collector = sortingCollector;
if (trackTotalHits) {
int count = shouldCollect ? -1 : shortcutTotalHitCount(reader, query);
if (count == -1) {
TotalHitCountCollector countCollector = new TotalHitCountCollector();
collector = MultiCollector.wrap(sortingCollector, countCollector);
this.countSupplier = countCollector::getTotalHits;
} else {
this.countSupplier = () -> count;
}
}
return collector;
}
@Override
void postProcess(QuerySearchResult result, boolean hasCollected) throws IOException {
if (terminatedEarlySupplier.getAsBoolean()) {
result.terminatedEarly(true);
}
if (countSupplier != null) {
final TopDocs topDocs = result.topDocs();
topDocs.totalHits = countSupplier.getAsInt();
result.topDocs(topDocs, result.sortValueFormats());
}
}
};
}
}

View File

@ -19,35 +19,23 @@
package org.elasticsearch.search.query;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.Term;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.queries.MinDocQuery;
import org.apache.lucene.search.BooleanClause;
import org.apache.lucene.search.BooleanQuery;
import org.apache.lucene.search.Collector;
import org.apache.lucene.search.ConstantScoreQuery;
import org.apache.lucene.search.FieldDoc;
import org.apache.lucene.search.EarlyTerminatingSortingCollector;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.MatchAllDocsQuery;
import org.apache.lucene.search.MultiCollector;
import org.apache.lucene.search.Query;
import org.apache.lucene.search.ScoreDoc;
import org.apache.lucene.search.Sort;
import org.apache.lucene.search.TermQuery;
import org.apache.lucene.search.TimeLimitingCollector;
import org.apache.lucene.search.TopDocs;
import org.apache.lucene.search.TopDocsCollector;
import org.apache.lucene.search.TopFieldCollector;
import org.apache.lucene.search.TopScoreDocCollector;
import org.apache.lucene.search.TotalHitCountCollector;
import org.apache.lucene.search.Weight;
import org.apache.lucene.search.grouping.CollapsingTopDocsCollector;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.common.lucene.MinimumScoreCollector;
import org.elasticsearch.common.lucene.search.FilteredCollector;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.search.collapse.CollapseContext;
import org.elasticsearch.search.DocValueFormat;
import org.elasticsearch.search.SearchPhase;
import org.elasticsearch.search.SearchService;
@ -56,18 +44,22 @@ import org.elasticsearch.search.internal.ScrollContext;
import org.elasticsearch.search.internal.SearchContext;
import org.elasticsearch.search.profile.ProfileShardResult;
import org.elasticsearch.search.profile.SearchProfileShardResults;
import org.elasticsearch.search.profile.query.CollectorResult;
import org.elasticsearch.search.profile.query.InternalProfileCollector;
import org.elasticsearch.search.rescore.RescorePhase;
import org.elasticsearch.search.rescore.RescoreSearchContext;
import org.elasticsearch.search.sort.SortAndFormats;
import org.elasticsearch.search.suggest.SuggestPhase;
import java.util.AbstractList;
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.Callable;
import static org.elasticsearch.search.query.QueryCollectorContext.createCancellableCollectorContext;
import static org.elasticsearch.search.query.QueryCollectorContext.createEarlySortingTerminationCollectorContext;
import static org.elasticsearch.search.query.QueryCollectorContext.createEarlyTerminationCollectorContext;
import static org.elasticsearch.search.query.QueryCollectorContext.createFilteredCollectorContext;
import static org.elasticsearch.search.query.QueryCollectorContext.createMinScoreCollectorContext;
import static org.elasticsearch.search.query.QueryCollectorContext.createMultiCollectorContext;
import static org.elasticsearch.search.query.QueryCollectorContext.createTimeoutCollectorContext;
import static org.elasticsearch.search.query.TopDocsCollectorContext.createTopDocsCollectorContext;
/**
* Query phase of a search request, used to run the query and get back from each shard information about the matching documents
@ -104,8 +96,9 @@ public class QueryPhase implements SearchPhase {
// request, preProcess is called on the DFS phase phase, this is why we pre-process them
// here to make sure it happens during the QUERY phase
aggregationPhase.preProcess(searchContext);
boolean rescore = execute(searchContext, searchContext.searcher());
Sort indexSort = searchContext.mapperService().getIndexSettings().getIndexSortConfig()
.buildIndexSort(searchContext.mapperService()::fullName, searchContext.fieldData()::getForField);
boolean rescore = execute(searchContext, searchContext.searcher(), indexSort);
if (rescore) { // only if we do a regular search
rescorePhase.execute(searchContext);
@ -120,298 +113,149 @@ public class QueryPhase implements SearchPhase {
}
}
private static boolean returnsDocsInOrder(Query query, SortAndFormats sf) {
/**
* In a package-private method so that it can be tested without having to
* wire everything (mapperService, etc.)
* @return whether the rescoring phase should be executed
*/
static boolean execute(SearchContext searchContext, final IndexSearcher searcher, @Nullable Sort indexSort) throws QueryPhaseExecutionException {
QuerySearchResult queryResult = searchContext.queryResult();
queryResult.searchTimedOut(false);
try {
queryResult.from(searchContext.from());
queryResult.size(searchContext.size());
Query query = searchContext.query();
assert query == searcher.rewrite(query); // already rewritten
final ScrollContext scrollContext = searchContext.scrollContext();
if (scrollContext != null) {
if (returnsDocsInOrder(query, searchContext.sort())) {
if (scrollContext.totalHits == -1) {
// first round
assert scrollContext.lastEmittedDoc == null;
// there is not much that we can optimize here since we want to collect all
// documents in order to get the total number of hits
} else {
// now this gets interesting: since we sort in index-order, we can directly
// skip to the desired doc
final ScoreDoc after = scrollContext.lastEmittedDoc;
if (after != null) {
BooleanQuery bq = new BooleanQuery.Builder()
.add(query, BooleanClause.Occur.MUST)
.add(new MinDocQuery(after.doc + 1), BooleanClause.Occur.FILTER)
.build();
query = bq;
}
// ... and stop collecting after ${size} matches
searchContext.terminateAfter(searchContext.size());
searchContext.trackTotalHits(false);
}
}
}
final LinkedList<QueryCollectorContext> collectors = new LinkedList<>();
if (searchContext.parsedPostFilter() != null) {
// add post filters before aggregations
// it will only be applied to top hits
collectors.add(createFilteredCollectorContext(searcher, searchContext.parsedPostFilter().query()));
}
if (searchContext.queryCollectors().isEmpty() == false) {
// plug in additional collectors, like aggregations
collectors.add(createMultiCollectorContext(searchContext.queryCollectors().values()));
}
if (searchContext.minimumScore() != null) {
// apply the minimum score after multi collector so we filter aggs as well
collectors.add(createMinScoreCollectorContext(searchContext.minimumScore()));
}
if (searchContext.terminateAfter() != SearchContext.DEFAULT_TERMINATE_AFTER) {
// apply terminate after after all filters collectors
collectors.add(createEarlyTerminationCollectorContext(searchContext.terminateAfter()));
}
boolean timeoutSet = scrollContext == null && searchContext.timeout() != null &&
searchContext.timeout().equals(SearchService.NO_TIMEOUT) == false;
if (timeoutSet) {
// TODO: change to use our own counter that uses the scheduler in ThreadPool
// throws TimeLimitingCollector.TimeExceededException when timeout has reached
collectors.add(createTimeoutCollectorContext(searchContext.timeEstimateCounter(), searchContext.timeout().millis()));
}
// add cancellable
collectors.add(createCancellableCollectorContext(searchContext.getTask()::isCancelled, searchContext.lowLevelCancellation()));
final IndexReader reader = searcher.getIndexReader();
final boolean doProfile = searchContext.getProfilers() != null;
// create the top docs collector last when the other collectors are known
final TopDocsCollectorContext topDocsFactory = createTopDocsCollectorContext(searchContext, reader,
collectors.stream().anyMatch(QueryCollectorContext::shouldCollect));
final boolean shouldCollect = topDocsFactory.shouldCollect();
if (scrollContext == null && topDocsFactory.numHits() > 0 && canEarlyTerminate(indexSort, searchContext)) {
// top docs collection can be early terminated based on index sort
// add the collector context first so we don't early terminate aggs but only top docs
collectors.addFirst(createEarlySortingTerminationCollectorContext(reader, searchContext.query(), indexSort,
topDocsFactory.numHits(), searchContext.trackTotalHits(), shouldCollect));
}
// add the top docs collector, the first collector context in the chain
collectors.addFirst(topDocsFactory);
final Collector queryCollector;
if (doProfile) {
InternalProfileCollector profileCollector = QueryCollectorContext.createQueryCollectorWithProfiler(collectors);
searchContext.getProfilers().getCurrentQueryProfiler().setCollector(profileCollector);
queryCollector = profileCollector;
} else {
queryCollector = QueryCollectorContext.createQueryCollector(collectors);
}
try {
if (shouldCollect) {
searcher.search(query, queryCollector);
}
} catch (TimeLimitingCollector.TimeExceededException e) {
assert timeoutSet : "TimeExceededException thrown even though timeout wasn't set";
queryResult.searchTimedOut(true);
} finally {
searchContext.clearReleasables(SearchContext.Lifetime.COLLECTION);
}
final QuerySearchResult result = searchContext.queryResult();
for (QueryCollectorContext ctx : collectors) {
ctx.postProcess(result, shouldCollect);
}
if (searchContext.getProfilers() != null) {
ProfileShardResult shardResults = SearchProfileShardResults.buildShardResults(searchContext.getProfilers());
result.profileResults(shardResults);
}
return topDocsFactory.shouldRescore();
} catch (Exception e) {
throw new QueryPhaseExecutionException(searchContext, "Failed to execute main query", e);
}
}
/**
* Returns true if the provided <code>query</code> returns docs in index order (internal doc ids).
* @param query The query to execute
* @param sf The query sort
*/
static boolean returnsDocsInOrder(Query query, SortAndFormats sf) {
if (sf == null || Sort.RELEVANCE.equals(sf.sort)) {
// sort by score
// queries that return constant scores will return docs in index
// order since Lucene tie-breaks on the doc id
return query.getClass() == ConstantScoreQuery.class
|| query.getClass() == MatchAllDocsQuery.class;
|| query.getClass() == MatchAllDocsQuery.class;
} else {
return Sort.INDEXORDER.equals(sf.sort);
}
}
/**
* In a package-private method so that it can be tested without having to
* wire everything (mapperService, etc.)
* @return whether the rescoring phase should be executed
* Returns true if the provided <code>searchContext</code> can early terminate based on <code>indexSort</code>
* @param indexSort The index sort specification
* @param context The search context for the request
*/
static boolean execute(SearchContext searchContext, final IndexSearcher searcher) throws QueryPhaseExecutionException {
QuerySearchResult queryResult = searchContext.queryResult();
queryResult.searchTimedOut(false);
final boolean doProfile = searchContext.getProfilers() != null;
boolean rescore = false;
try {
queryResult.from(searchContext.from());
queryResult.size(searchContext.size());
Query query = searchContext.query();
final int totalNumDocs = searcher.getIndexReader().numDocs();
int numDocs = Math.min(searchContext.from() + searchContext.size(), totalNumDocs);
Collector collector;
Callable<TopDocs> topDocsCallable;
DocValueFormat[] sortValueFormats = new DocValueFormat[0];
assert query == searcher.rewrite(query); // already rewritten
if (searchContext.size() == 0) { // no matter what the value of from is
final TotalHitCountCollector totalHitCountCollector = new TotalHitCountCollector();
collector = totalHitCountCollector;
if (searchContext.getProfilers() != null) {
collector = new InternalProfileCollector(collector, CollectorResult.REASON_SEARCH_COUNT, Collections.emptyList());
}
topDocsCallable = () -> new TopDocs(totalHitCountCollector.getTotalHits(), Lucene.EMPTY_SCORE_DOCS, 0);
} else {
// Perhaps have a dedicated scroll phase?
final ScrollContext scrollContext = searchContext.scrollContext();
assert (scrollContext != null) == (searchContext.request().scroll() != null);
final Collector topDocsCollector;
ScoreDoc after = null;
if (searchContext.request().scroll() != null) {
numDocs = Math.min(searchContext.size(), totalNumDocs);
after = scrollContext.lastEmittedDoc;
if (returnsDocsInOrder(query, searchContext.sort())) {
if (scrollContext.totalHits == -1) {
// first round
assert scrollContext.lastEmittedDoc == null;
// there is not much that we can optimize here since we want to collect all
// documents in order to get the total number of hits
} else {
// now this gets interesting: since we sort in index-order, we can directly
// skip to the desired doc and stop collecting after ${size} matches
if (scrollContext.lastEmittedDoc != null) {
BooleanQuery bq = new BooleanQuery.Builder()
.add(query, BooleanClause.Occur.MUST)
.add(new MinDocQuery(after.doc + 1), BooleanClause.Occur.FILTER)
.build();
query = bq;
}
searchContext.terminateAfter(numDocs);
}
}
} else {
after = searchContext.searchAfter();
}
if (totalNumDocs == 0) {
// top collectors don't like a size of 0
numDocs = 1;
}
assert numDocs > 0;
if (searchContext.collapse() == null) {
if (searchContext.sort() != null) {
SortAndFormats sf = searchContext.sort();
topDocsCollector = TopFieldCollector.create(sf.sort, numDocs,
(FieldDoc) after, true, searchContext.trackScores(), searchContext.trackScores());
sortValueFormats = sf.formats;
} else {
rescore = !searchContext.rescore().isEmpty();
for (RescoreSearchContext rescoreContext : searchContext.rescore()) {
numDocs = Math.max(rescoreContext.window(), numDocs);
}
topDocsCollector = TopScoreDocCollector.create(numDocs, after);
}
} else {
Sort sort = Sort.RELEVANCE;
if (searchContext.sort() != null) {
sort = searchContext.sort().sort;
}
CollapseContext collapse = searchContext.collapse();
topDocsCollector = collapse.createTopDocs(sort, numDocs, searchContext.trackScores());
if (searchContext.sort() == null) {
sortValueFormats = new DocValueFormat[] {DocValueFormat.RAW};
} else {
sortValueFormats = searchContext.sort().formats;
}
}
collector = topDocsCollector;
if (doProfile) {
collector = new InternalProfileCollector(collector, CollectorResult.REASON_SEARCH_TOP_HITS, Collections.emptyList());
}
topDocsCallable = () -> {
final TopDocs topDocs;
if (topDocsCollector instanceof TopDocsCollector) {
topDocs = ((TopDocsCollector<?>) topDocsCollector).topDocs();
} else if (topDocsCollector instanceof CollapsingTopDocsCollector) {
topDocs = ((CollapsingTopDocsCollector) topDocsCollector).getTopDocs();
} else {
throw new IllegalStateException("Unknown top docs collector " + topDocsCollector.getClass().getName());
}
if (scrollContext != null) {
if (scrollContext.totalHits == -1) {
// first round
scrollContext.totalHits = topDocs.totalHits;
scrollContext.maxScore = topDocs.getMaxScore();
} else {
// subsequent round: the total number of hits and
// the maximum score were computed on the first round
topDocs.totalHits = scrollContext.totalHits;
topDocs.setMaxScore(scrollContext.maxScore);
}
if (searchContext.request().numberOfShards() == 1) {
// if we fetch the document in the same roundtrip, we already know the last emitted doc
if (topDocs.scoreDocs.length > 0) {
// set the last emitted doc
scrollContext.lastEmittedDoc = topDocs.scoreDocs[topDocs.scoreDocs.length - 1];
}
}
}
return topDocs;
};
}
final boolean terminateAfterSet = searchContext.terminateAfter() != SearchContext.DEFAULT_TERMINATE_AFTER;
if (terminateAfterSet) {
final Collector child = collector;
// throws Lucene.EarlyTerminationException when given count is reached
collector = Lucene.wrapCountBasedEarlyTerminatingCollector(collector, searchContext.terminateAfter());
if (doProfile) {
collector = new InternalProfileCollector(collector, CollectorResult.REASON_SEARCH_TERMINATE_AFTER_COUNT,
Collections.singletonList((InternalProfileCollector) child));
}
}
if (searchContext.parsedPostFilter() != null) {
final Collector child = collector;
// this will only get applied to the actual search collector and not
// to any scoped collectors, also, it will only be applied to the main collector
// since that is where the filter should only work
final Weight filterWeight = searcher.createNormalizedWeight(searchContext.parsedPostFilter().query(), false);
collector = new FilteredCollector(collector, filterWeight);
if (doProfile) {
collector = new InternalProfileCollector(collector, CollectorResult.REASON_SEARCH_POST_FILTER,
Collections.singletonList((InternalProfileCollector) child));
}
}
// plug in additional collectors, like aggregations
final List<Collector> subCollectors = new ArrayList<>();
subCollectors.add(collector);
subCollectors.addAll(searchContext.queryCollectors().values());
collector = MultiCollector.wrap(subCollectors);
if (doProfile && collector instanceof InternalProfileCollector == false) {
// When there is a single collector to wrap, MultiCollector returns it
// directly, so only wrap in the case that there are several sub collectors
final List<InternalProfileCollector> children = new AbstractList<InternalProfileCollector>() {
@Override
public InternalProfileCollector get(int index) {
return (InternalProfileCollector) subCollectors.get(index);
}
@Override
public int size() {
return subCollectors.size();
}
};
collector = new InternalProfileCollector(collector, CollectorResult.REASON_SEARCH_MULTI, children);
}
// apply the minimum score after multi collector so we filter aggs as well
if (searchContext.minimumScore() != null) {
final Collector child = collector;
collector = new MinimumScoreCollector(collector, searchContext.minimumScore());
if (doProfile) {
collector = new InternalProfileCollector(collector, CollectorResult.REASON_SEARCH_MIN_SCORE,
Collections.singletonList((InternalProfileCollector) child));
}
}
if (collector.getClass() == TotalHitCountCollector.class) {
// Optimize counts in simple cases to return in constant time
// instead of using a collector
while (true) {
// remove wrappers that don't matter for counts
// this is necessary so that we don't only optimize match_all
// queries but also match_all queries that are nested in
// a constant_score query
if (query instanceof ConstantScoreQuery) {
query = ((ConstantScoreQuery) query).getQuery();
} else {
break;
}
}
if (query.getClass() == MatchAllDocsQuery.class) {
collector = null;
topDocsCallable = new Callable<TopDocs>() {
@Override
public TopDocs call() throws Exception {
int count = searcher.getIndexReader().numDocs();
return new TopDocs(count, Lucene.EMPTY_SCORE_DOCS, 0);
}
};
} else if (query.getClass() == TermQuery.class && searcher.getIndexReader().hasDeletions() == false) {
final Term term = ((TermQuery) query).getTerm();
collector = null;
topDocsCallable = new Callable<TopDocs>() {
@Override
public TopDocs call() throws Exception {
int count = 0;
for (LeafReaderContext context : searcher.getIndexReader().leaves()) {
count += context.reader().docFreq(term);
}
return new TopDocs(count, Lucene.EMPTY_SCORE_DOCS, 0);
}
};
}
}
final boolean timeoutSet = searchContext.timeout() != null && !searchContext.timeout().equals(SearchService.NO_TIMEOUT);
if (timeoutSet && collector != null) { // collector might be null if no collection is actually needed
final Collector child = collector;
// TODO: change to use our own counter that uses the scheduler in ThreadPool
// throws TimeLimitingCollector.TimeExceededException when timeout has reached
collector = Lucene.wrapTimeLimitingCollector(collector, searchContext.timeEstimateCounter(), searchContext.timeout().millis());
if (doProfile) {
collector = new InternalProfileCollector(collector, CollectorResult.REASON_SEARCH_TIMEOUT,
Collections.singletonList((InternalProfileCollector) child));
}
}
if (collector != null) {
final Collector child = collector;
collector = new CancellableCollector(searchContext.getTask()::isCancelled, searchContext.lowLevelCancellation(), collector);
if (doProfile) {
collector = new InternalProfileCollector(collector, CollectorResult.REASON_SEARCH_CANCELLED,
Collections.singletonList((InternalProfileCollector) child));
}
}
try {
if (collector != null) {
if (doProfile) {
searchContext.getProfilers().getCurrentQueryProfiler().setCollector((InternalProfileCollector) collector);
}
searcher.search(query, collector);
}
} catch (TimeLimitingCollector.TimeExceededException e) {
assert timeoutSet : "TimeExceededException thrown even though timeout wasn't set";
queryResult.searchTimedOut(true);
} catch (Lucene.EarlyTerminationException e) {
assert terminateAfterSet : "EarlyTerminationException thrown even though terminateAfter wasn't set";
queryResult.terminatedEarly(true);
} finally {
searchContext.clearReleasables(SearchContext.Lifetime.COLLECTION);
}
if (terminateAfterSet && queryResult.terminatedEarly() == null) {
queryResult.terminatedEarly(false);
}
queryResult.topDocs(topDocsCallable.call(), sortValueFormats);
if (searchContext.getProfilers() != null) {
ProfileShardResult shardResults = SearchProfileShardResults
.buildShardResults(searchContext.getProfilers());
searchContext.queryResult().profileResults(shardResults);
}
return rescore;
} catch (Exception e) {
throw new QueryPhaseExecutionException(searchContext, "Failed to execute main query", e);
}
static boolean canEarlyTerminate(Sort indexSort, SearchContext context) {
final Sort sort = context.sort() == null ? Sort.RELEVANCE : context.sort().sort;
return indexSort != null && EarlyTerminatingSortingCollector.canEarlyTerminate(sort, indexSort);
}
}

View File

@ -0,0 +1,306 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.search.query;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.Term;
import org.apache.lucene.search.BoostQuery;
import org.apache.lucene.search.Collector;
import org.apache.lucene.search.ConstantScoreQuery;
import org.apache.lucene.search.FieldDoc;
import org.apache.lucene.search.MatchAllDocsQuery;
import org.apache.lucene.search.Query;
import org.apache.lucene.search.ScoreDoc;
import org.apache.lucene.search.Sort;
import org.apache.lucene.search.TermQuery;
import org.apache.lucene.search.TopDocs;
import org.apache.lucene.search.TopDocsCollector;
import org.apache.lucene.search.TopFieldCollector;
import org.apache.lucene.search.TopScoreDocCollector;
import org.apache.lucene.search.TotalHitCountCollector;
import org.apache.lucene.search.grouping.CollapsingTopDocsCollector;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.search.DocValueFormat;
import org.elasticsearch.search.collapse.CollapseContext;
import org.elasticsearch.search.internal.ScrollContext;
import org.elasticsearch.search.internal.SearchContext;
import org.elasticsearch.search.rescore.RescoreSearchContext;
import org.elasticsearch.search.sort.SortAndFormats;
import java.io.IOException;
import java.util.Objects;
import static org.elasticsearch.search.profile.query.CollectorResult.REASON_SEARCH_COUNT;
import static org.elasticsearch.search.profile.query.CollectorResult.REASON_SEARCH_TOP_HITS;
/**
* A {@link QueryCollectorContext} that creates top docs collector
*/
abstract class TopDocsCollectorContext extends QueryCollectorContext {
protected final int numHits;
TopDocsCollectorContext(String profilerName, int numHits) {
super(profilerName);
this.numHits = numHits;
}
/**
* Returns the number of top docs to retrieve
*/
final int numHits() {
return numHits;
}
/**
* Returns true if the top docs should be re-scored after initial search
*/
boolean shouldRescore() {
return false;
}
static class TotalHitCountCollectorContext extends TopDocsCollectorContext {
private final TotalHitCountCollector collector;
private final int hitCount;
/**
* Ctr
* @param reader The index reader
* @param query The query to execute
* @param shouldCollect True if any previous collector context in the chain forces the search to be executed, false otherwise
*/
private TotalHitCountCollectorContext(IndexReader reader, Query query, boolean shouldCollect) throws IOException {
super(REASON_SEARCH_COUNT, 0);
this.collector = new TotalHitCountCollector();
// implicit total hit counts are valid only when there is no filter collector in the chain
// so we check the shortcut only if shouldCollect is true
this.hitCount = shouldCollect ? -1 : shortcutTotalHitCount(reader, query);
}
@Override
boolean shouldCollect() {
return hitCount == -1;
}
Collector create(Collector in) {
assert in == null;
return collector;
}
@Override
void postProcess(QuerySearchResult result, boolean hasCollected) {
final int totalHitCount;
if (hasCollected) {
totalHitCount = collector.getTotalHits();
} else {
assert hitCount != -1;
totalHitCount = hitCount;
}
result.topDocs(new TopDocs(totalHitCount, Lucene.EMPTY_SCORE_DOCS, 0), null);
}
}
static class CollapsingTopDocsCollectorContext extends TopDocsCollectorContext {
private final DocValueFormat[] sortFmt;
private final CollapsingTopDocsCollector<?> topDocsCollector;
/**
* Ctr
* @param collapseContext The collapsing context
* @param sortAndFormats The query sort
* @param numHits The number of collapsed top hits to retrieve.
* @param trackMaxScore True if max score should be tracked
*/
private CollapsingTopDocsCollectorContext(CollapseContext collapseContext,
@Nullable SortAndFormats sortAndFormats,
int numHits,
boolean trackMaxScore) {
super(REASON_SEARCH_TOP_HITS, numHits);
assert numHits > 0;
assert collapseContext != null;
Sort sort = sortAndFormats == null ? Sort.RELEVANCE : sortAndFormats.sort;
this.sortFmt = sortAndFormats == null ? new DocValueFormat[] { DocValueFormat.RAW } : sortAndFormats.formats;
this.topDocsCollector = collapseContext.createTopDocs(sort, numHits, trackMaxScore);
}
@Override
Collector create(Collector in) throws IOException {
assert in == null;
return topDocsCollector;
}
@Override
void postProcess(QuerySearchResult result, boolean hasCollected) throws IOException {
assert hasCollected;
result.topDocs(topDocsCollector.getTopDocs(), sortFmt);
}
}
abstract static class SimpleTopDocsCollectorContext extends TopDocsCollectorContext {
private final @Nullable SortAndFormats sortAndFormats;
private final TopDocsCollector<?> topDocsCollector;
/**
* Ctr
* @param sortAndFormats The query sort
* @param numHits The number of top hits to retrieve
* @param searchAfter The doc this request should "search after"
* @param trackMaxScore True if max score should be tracked
*/
private SimpleTopDocsCollectorContext(@Nullable SortAndFormats sortAndFormats,
@Nullable ScoreDoc searchAfter,
int numHits,
boolean trackMaxScore) throws IOException {
super(REASON_SEARCH_TOP_HITS, numHits);
this.sortAndFormats = sortAndFormats;
if (sortAndFormats == null) {
this.topDocsCollector = TopScoreDocCollector.create(numHits, searchAfter);
} else {
this.topDocsCollector = TopFieldCollector.create(sortAndFormats.sort, numHits,
(FieldDoc) searchAfter, true, trackMaxScore, trackMaxScore);
}
}
@Override
Collector create(Collector in) {
assert in == null;
return topDocsCollector;
}
@Override
void postProcess(QuerySearchResult result, boolean hasCollected) throws IOException {
assert hasCollected;
final TopDocs topDocs = topDocsCollector.topDocs();
result.topDocs(topDocs, sortAndFormats == null ? null : sortAndFormats.formats);
}
}
static class ScrollingTopDocsCollectorContext extends SimpleTopDocsCollectorContext {
private final ScrollContext scrollContext;
private final int numberOfShards;
private ScrollingTopDocsCollectorContext(ScrollContext scrollContext,
@Nullable SortAndFormats sortAndFormats,
int numHits,
boolean trackMaxScore,
int numberOfShards) throws IOException {
super(sortAndFormats, scrollContext.lastEmittedDoc, numHits, trackMaxScore);
this.scrollContext = Objects.requireNonNull(scrollContext);
this.numberOfShards = numberOfShards;
}
@Override
void postProcess(QuerySearchResult result, boolean hasCollected) throws IOException {
super.postProcess(result, hasCollected);
final TopDocs topDocs = result.topDocs();
if (scrollContext.totalHits == -1) {
// first round
scrollContext.totalHits = topDocs.totalHits;
scrollContext.maxScore = topDocs.getMaxScore();
} else {
// subsequent round: the total number of hits and
// the maximum score were computed on the first round
topDocs.totalHits = scrollContext.totalHits;
topDocs.setMaxScore(scrollContext.maxScore);
}
if (numberOfShards == 1) {
// if we fetch the document in the same roundtrip, we already know the last emitted doc
if (topDocs.scoreDocs.length > 0) {
// set the last emitted doc
scrollContext.lastEmittedDoc = topDocs.scoreDocs[topDocs.scoreDocs.length - 1];
}
}
result.topDocs(topDocs, result.sortValueFormats());
}
}
/**
* Returns query total hit count if the <code>query</code> is a {@link MatchAllDocsQuery}
* or a {@link TermQuery} and the <code>reader</code> has no deletions,
* -1 otherwise.
*/
static int shortcutTotalHitCount(IndexReader reader, Query query) throws IOException {
while (true) {
// remove wrappers that don't matter for counts
// this is necessary so that we don't only optimize match_all
// queries but also match_all queries that are nested in
// a constant_score query
if (query instanceof ConstantScoreQuery) {
query = ((ConstantScoreQuery) query).getQuery();
} else if (query instanceof BoostQuery) {
query = ((BoostQuery) query).getQuery();
} else {
break;
}
}
if (query.getClass() == MatchAllDocsQuery.class) {
return reader.numDocs();
} else if (query.getClass() == TermQuery.class && reader.hasDeletions() == false) {
final Term term = ((TermQuery) query).getTerm();
int count = 0;
for (LeafReaderContext context : reader.leaves()) {
count += context.reader().docFreq(term);
}
return count;
} else {
return -1;
}
}
/**
* Creates a {@link TopDocsCollectorContext} from the provided <code>searchContext</code>
*/
static TopDocsCollectorContext createTopDocsCollectorContext(SearchContext searchContext,
IndexReader reader,
boolean shouldCollect) throws IOException {
final Query query = searchContext.query();
// top collectors don't like a size of 0
final int totalNumDocs = Math.max(1, reader.numDocs());
if (searchContext.size() == 0) {
// no matter what the value of from is
return new TotalHitCountCollectorContext(reader, query, shouldCollect);
} else if (searchContext.scrollContext() != null) {
// no matter what the value of from is
int numDocs = Math.min(searchContext.size(), totalNumDocs);
return new ScrollingTopDocsCollectorContext(searchContext.scrollContext(),
searchContext.sort(), numDocs, searchContext.trackScores(), searchContext.numberOfShards());
} else if (searchContext.collapse() != null) {
int numDocs = Math.min(searchContext.from() + searchContext.size(), totalNumDocs);
return new CollapsingTopDocsCollectorContext(searchContext.collapse(),
searchContext.sort(), numDocs, searchContext.trackScores());
} else {
int numDocs = Math.min(searchContext.from() + searchContext.size(), totalNumDocs);
final boolean rescore = searchContext.rescore().isEmpty() == false;
for (RescoreSearchContext rescoreContext : searchContext.rescore()) {
numDocs = Math.max(numDocs, rescoreContext.window());
}
return new SimpleTopDocsCollectorContext(searchContext.sort(),
searchContext.searchAfter(),
numDocs,
searchContext.trackScores()) {
@Override
boolean shouldRescore() {
return rescore;
}
};
}
}
}

View File

@ -720,7 +720,7 @@ public class ExceptionSerializationTests extends ESTestCase {
ids.put(57, org.elasticsearch.indices.IndexTemplateMissingException.class);
ids.put(58, org.elasticsearch.transport.SendRequestTransportException.class);
ids.put(59, org.elasticsearch.common.util.concurrent.EsRejectedExecutionException.class);
ids.put(60, org.elasticsearch.common.lucene.Lucene.EarlyTerminationException.class);
ids.put(60, null); // EarlyTerminationException was removed in 6.0
ids.put(61, null); // RoutingValidationException was removed in 5.0
ids.put(62, org.elasticsearch.common.io.stream.NotSerializableExceptionWrapper.class);
ids.put(63, org.elasticsearch.indices.AliasFilterParsingException.class);

View File

@ -148,29 +148,35 @@ public class SearchPhaseControllerTests extends ESTestCase {
int nShards = randomIntBetween(1, 20);
int queryResultSize = randomBoolean() ? 0 : randomIntBetween(1, nShards * 2);
AtomicArray<SearchPhaseResult> queryResults = generateQueryResults(nShards, suggestions, queryResultSize, false);
SearchPhaseController.ReducedQueryPhase reducedQueryPhase = searchPhaseController.reducedQueryPhase(queryResults.asList(), false);
AtomicArray<SearchPhaseResult> searchPhaseResultAtomicArray = generateFetchResults(nShards, reducedQueryPhase.scoreDocs,
reducedQueryPhase.suggest);
InternalSearchResponse mergedResponse = searchPhaseController.merge(false,
reducedQueryPhase,
searchPhaseResultAtomicArray.asList(), searchPhaseResultAtomicArray::get);
int suggestSize = 0;
for (Suggest.Suggestion s : reducedQueryPhase.suggest) {
Stream<CompletionSuggestion.Entry> stream = s.getEntries().stream();
suggestSize += stream.collect(Collectors.summingInt(e -> e.getOptions().size()));
}
assertThat(suggestSize, lessThanOrEqualTo(maxSuggestSize));
assertThat(mergedResponse.hits().getHits().length, equalTo(reducedQueryPhase.scoreDocs.length-suggestSize));
Suggest suggestResult = mergedResponse.suggest();
for (Suggest.Suggestion<?> suggestion : reducedQueryPhase.suggest) {
assertThat(suggestion, instanceOf(CompletionSuggestion.class));
if (suggestion.getEntries().get(0).getOptions().size() > 0) {
CompletionSuggestion suggestionResult = suggestResult.getSuggestion(suggestion.getName());
assertNotNull(suggestionResult);
List<CompletionSuggestion.Entry.Option> options = suggestionResult.getEntries().get(0).getOptions();
assertThat(options.size(), equalTo(suggestion.getEntries().get(0).getOptions().size()));
for (CompletionSuggestion.Entry.Option option : options) {
assertNotNull(option.getHit());
for (boolean trackTotalHits : new boolean[] {true, false}) {
SearchPhaseController.ReducedQueryPhase reducedQueryPhase =
searchPhaseController.reducedQueryPhase(queryResults.asList(), false, trackTotalHits);
AtomicArray<SearchPhaseResult> searchPhaseResultAtomicArray = generateFetchResults(nShards, reducedQueryPhase.scoreDocs,
reducedQueryPhase.suggest);
InternalSearchResponse mergedResponse = searchPhaseController.merge(false,
reducedQueryPhase,
searchPhaseResultAtomicArray.asList(), searchPhaseResultAtomicArray::get);
if (trackTotalHits == false) {
assertThat(mergedResponse.hits.totalHits, equalTo(-1L));
}
int suggestSize = 0;
for (Suggest.Suggestion s : reducedQueryPhase.suggest) {
Stream<CompletionSuggestion.Entry> stream = s.getEntries().stream();
suggestSize += stream.collect(Collectors.summingInt(e -> e.getOptions().size()));
}
assertThat(suggestSize, lessThanOrEqualTo(maxSuggestSize));
assertThat(mergedResponse.hits().getHits().length, equalTo(reducedQueryPhase.scoreDocs.length - suggestSize));
Suggest suggestResult = mergedResponse.suggest();
for (Suggest.Suggestion<?> suggestion : reducedQueryPhase.suggest) {
assertThat(suggestion, instanceOf(CompletionSuggestion.class));
if (suggestion.getEntries().get(0).getOptions().size() > 0) {
CompletionSuggestion suggestionResult = suggestResult.getSuggestion(suggestion.getName());
assertNotNull(suggestionResult);
List<CompletionSuggestion.Entry.Option> options = suggestionResult.getEntries().get(0).getOptions();
assertThat(options.size(), equalTo(suggestion.getEntries().get(0).getOptions().size()));
for (CompletionSuggestion.Entry.Option option : options) {
assertNotNull(option.getHit());
}
}
}
}

View File

@ -19,6 +19,7 @@
package org.elasticsearch.search;
import org.apache.lucene.util.TestUtil;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.text.Text;
import org.elasticsearch.common.xcontent.ToXContent;
@ -44,7 +45,7 @@ public class SearchHitsTests extends ESTestCase {
for (int i = 0; i < searchHits; i++) {
hits[i] = SearchHitTests.createTestItem(false); // creating random innerHits could create loops
}
long totalHits = randomLong();
long totalHits = frequently() ? TestUtil.nextLong(random(), 0, Long.MAX_VALUE) : -1;
float maxScore = frequently() ? randomFloat() : Float.NaN;
return new SearchHits(hits, totalHits, maxScore);
}

View File

@ -21,7 +21,9 @@ package org.elasticsearch.search.query;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field.Store;
import org.apache.lucene.document.NumericDocValuesField;
import org.apache.lucene.document.StringField;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.LeafReaderContext;
@ -29,19 +31,30 @@ import org.apache.lucene.index.MultiReader;
import org.apache.lucene.index.NoMergePolicy;
import org.apache.lucene.index.RandomIndexWriter;
import org.apache.lucene.index.Term;
import org.apache.lucene.queries.MinDocQuery;
import org.apache.lucene.search.BooleanClause.Occur;
import org.apache.lucene.search.BooleanQuery;
import org.apache.lucene.search.Collector;
import org.apache.lucene.search.ConstantScoreQuery;
import org.apache.lucene.search.FieldDoc;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.MatchAllDocsQuery;
import org.apache.lucene.search.MatchNoDocsQuery;
import org.apache.lucene.search.Query;
import org.apache.lucene.search.ScoreDoc;
import org.apache.lucene.search.Sort;
import org.apache.lucene.search.SortField;
import org.apache.lucene.search.TermQuery;
import org.apache.lucene.search.TotalHitCountCollector;
import org.apache.lucene.search.Weight;
import org.apache.lucene.store.Directory;
import org.elasticsearch.action.search.SearchTask;
import org.elasticsearch.index.query.ParsedQuery;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.DocValueFormat;
import org.elasticsearch.search.internal.ScrollContext;
import org.elasticsearch.search.internal.SearchContext;
import org.elasticsearch.search.sort.SortAndFormats;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.TestSearchContext;
@ -49,6 +62,14 @@ import java.io.IOException;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import static org.hamcrest.Matchers.anyOf;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.lessThan;
import static org.hamcrest.Matchers.nullValue;
public class QueryPhaseTests extends ESTestCase {
private void countTestCase(Query query, IndexReader reader, boolean shouldCollect) throws Exception {
@ -66,7 +87,7 @@ public class QueryPhaseTests extends ESTestCase {
}
};
final boolean rescore = QueryPhase.execute(context, contextSearcher);
final boolean rescore = QueryPhase.execute(context, contextSearcher, null);
assertFalse(rescore);
assertEquals(searcher.count(query), context.queryResult().topDocs().totalHits);
assertEquals(shouldCollect, collected.get());
@ -135,12 +156,12 @@ public class QueryPhaseTests extends ESTestCase {
}
};
QueryPhase.execute(context, contextSearcher);
QueryPhase.execute(context, contextSearcher, null);
assertEquals(0, context.queryResult().topDocs().totalHits);
assertFalse(collected.get());
context.parsedPostFilter(new ParsedQuery(new MatchNoDocsQuery()));
QueryPhase.execute(context, contextSearcher);
QueryPhase.execute(context, contextSearcher, null);
assertEquals(0, context.queryResult().topDocs().totalHits);
assertTrue(collected.get());
}
@ -159,14 +180,264 @@ public class QueryPhaseTests extends ESTestCase {
}
};
QueryPhase.execute(context, contextSearcher);
QueryPhase.execute(context, contextSearcher, null);
assertEquals(0, context.queryResult().topDocs().totalHits);
assertFalse(collected.get());
context.minimumScore(1);
QueryPhase.execute(context, contextSearcher);
QueryPhase.execute(context, contextSearcher, null);
assertEquals(0, context.queryResult().topDocs().totalHits);
assertTrue(collected.get());
}
public void testInOrderScrollOptimization() throws Exception {
Directory dir = newDirectory();
final Sort sort = new Sort(new SortField("rank", SortField.Type.INT));
IndexWriterConfig iwc = newIndexWriterConfig()
.setIndexSort(sort);
RandomIndexWriter w = new RandomIndexWriter(random(), dir, iwc);
final int numDocs = scaledRandomIntBetween(100, 200);
for (int i = 0; i < numDocs; ++i) {
w.addDocument(new Document());
}
w.close();
final AtomicBoolean collected = new AtomicBoolean();
IndexReader reader = DirectoryReader.open(dir);
IndexSearcher contextSearcher = new IndexSearcher(reader) {
protected void search(List<LeafReaderContext> leaves, Weight weight, Collector collector) throws IOException {
collected.set(true);
super.search(leaves, weight, collector);
}
};
TestSearchContext context = new TestSearchContext(null);
context.parsedQuery(new ParsedQuery(new MatchAllDocsQuery()));
ScrollContext scrollContext = new ScrollContext();
scrollContext.lastEmittedDoc = null;
scrollContext.maxScore = Float.NaN;
scrollContext.totalHits = -1;
context.scrollContext(scrollContext);
context.setTask(new SearchTask(123L, "", "", "", null));
context.setSize(10);
QueryPhase.execute(context, contextSearcher, null);
assertThat(context.queryResult().topDocs().totalHits, equalTo(numDocs));
assertTrue(collected.get());
assertNull(context.queryResult().terminatedEarly());
assertThat(context.terminateAfter(), equalTo(0));
assertThat(context.queryResult().getTotalHits(), equalTo(numDocs));
QueryPhase.execute(context, contextSearcher, null);
assertThat(context.queryResult().topDocs().totalHits, equalTo(numDocs));
assertTrue(collected.get());
assertTrue(context.queryResult().terminatedEarly());
assertThat(context.terminateAfter(), equalTo(10));
assertThat(context.queryResult().getTotalHits(), equalTo(numDocs));
assertThat(context.queryResult().topDocs().scoreDocs[0].doc, greaterThanOrEqualTo(10));
reader.close();
dir.close();
}
public void testTerminateAfterEarlyTermination() throws Exception {
Directory dir = newDirectory();
IndexWriterConfig iwc = newIndexWriterConfig();
RandomIndexWriter w = new RandomIndexWriter(random(), dir, iwc);
final int numDocs = scaledRandomIntBetween(100, 200);
for (int i = 0; i < numDocs; ++i) {
Document doc = new Document();
if (randomBoolean()) {
doc.add(new StringField("foo", "bar", Store.NO));
}
if (randomBoolean()) {
doc.add(new StringField("foo", "baz", Store.NO));
}
doc.add(new NumericDocValuesField("rank", numDocs - i));
w.addDocument(doc);
}
w.close();
TestSearchContext context = new TestSearchContext(null);
context.setTask(new SearchTask(123L, "", "", "", null));
context.parsedQuery(new ParsedQuery(new MatchAllDocsQuery()));
context.terminateAfter(1);
final AtomicBoolean collected = new AtomicBoolean();
final IndexReader reader = DirectoryReader.open(dir);
IndexSearcher contextSearcher = new IndexSearcher(reader) {
protected void search(List<LeafReaderContext> leaves, Weight weight, Collector collector) throws IOException {
collected.set(true);
super.search(leaves, weight, collector);
}
};
{
context.setSize(1);
QueryPhase.execute(context, contextSearcher, null);
assertTrue(collected.get());
assertTrue(context.queryResult().terminatedEarly());
assertThat(context.queryResult().topDocs().totalHits, equalTo(1));
assertThat(context.queryResult().topDocs().scoreDocs.length, equalTo(1));
context.setSize(0);
QueryPhase.execute(context, contextSearcher, null);
assertTrue(collected.get());
assertTrue(context.queryResult().terminatedEarly());
assertThat(context.queryResult().topDocs().totalHits, equalTo(1));
assertThat(context.queryResult().topDocs().scoreDocs.length, equalTo(0));
}
{
context.setSize(1);
QueryPhase.execute(context, contextSearcher, null);
assertTrue(collected.get());
assertTrue(context.queryResult().terminatedEarly());
assertThat(context.queryResult().topDocs().totalHits, equalTo(1));
assertThat(context.queryResult().topDocs().scoreDocs.length, equalTo(1));
}
{
context.setSize(1);
BooleanQuery bq = new BooleanQuery.Builder()
.add(new TermQuery(new Term("foo", "bar")), Occur.SHOULD)
.add(new TermQuery(new Term("foo", "baz")), Occur.SHOULD)
.build();
context.parsedQuery(new ParsedQuery(bq));
collected.set(false);
QueryPhase.execute(context, contextSearcher, null);
assertTrue(collected.get());
assertTrue(context.queryResult().terminatedEarly());
assertThat(context.queryResult().topDocs().totalHits, equalTo(1));
assertThat(context.queryResult().topDocs().scoreDocs.length, equalTo(1));
context.setSize(0);
context.parsedQuery(new ParsedQuery(bq));
collected.set(false);
QueryPhase.execute(context, contextSearcher, null);
assertTrue(collected.get());
assertTrue(context.queryResult().terminatedEarly());
assertThat(context.queryResult().topDocs().totalHits, equalTo(1));
assertThat(context.queryResult().topDocs().scoreDocs.length, equalTo(0));
}
{
context.setSize(1);
collected.set(false);
TotalHitCountCollector collector = new TotalHitCountCollector();
context.queryCollectors().put(TotalHitCountCollector.class, collector);
QueryPhase.execute(context, contextSearcher, null);
assertTrue(collected.get());
assertTrue(context.queryResult().terminatedEarly());
assertThat(context.queryResult().topDocs().totalHits, equalTo(1));
assertThat(context.queryResult().topDocs().scoreDocs.length, equalTo(1));
}
{
context.setSize(0);
collected.set(false);
TotalHitCountCollector collector = new TotalHitCountCollector();
context.queryCollectors().put(TotalHitCountCollector.class, collector);
QueryPhase.execute(context, contextSearcher, null);
assertTrue(collected.get());
assertTrue(context.queryResult().terminatedEarly());
assertThat(context.queryResult().topDocs().totalHits, equalTo(1));
assertThat(context.queryResult().topDocs().scoreDocs.length, equalTo(0));
assertThat(collector.getTotalHits(), equalTo(1));
}
reader.close();
dir.close();
}
public void testIndexSortingEarlyTermination() throws Exception {
Directory dir = newDirectory();
final Sort sort = new Sort(new SortField("rank", SortField.Type.INT));
IndexWriterConfig iwc = newIndexWriterConfig()
.setIndexSort(sort);
RandomIndexWriter w = new RandomIndexWriter(random(), dir, iwc);
final int numDocs = scaledRandomIntBetween(100, 200);
for (int i = 0; i < numDocs; ++i) {
Document doc = new Document();
if (randomBoolean()) {
doc.add(new StringField("foo", "bar", Store.NO));
}
if (randomBoolean()) {
doc.add(new StringField("foo", "baz", Store.NO));
}
doc.add(new NumericDocValuesField("rank", numDocs - i));
w.addDocument(doc);
}
w.close();
TestSearchContext context = new TestSearchContext(null);
context.parsedQuery(new ParsedQuery(new MatchAllDocsQuery()));
context.setSize(1);
context.setTask(new SearchTask(123L, "", "", "", null));
context.sort(new SortAndFormats(sort, new DocValueFormat[] {DocValueFormat.RAW}));
final AtomicBoolean collected = new AtomicBoolean();
final IndexReader reader = DirectoryReader.open(dir);
IndexSearcher contextSearcher = new IndexSearcher(reader) {
protected void search(List<LeafReaderContext> leaves, Weight weight, Collector collector) throws IOException {
collected.set(true);
super.search(leaves, weight, collector);
}
};
QueryPhase.execute(context, contextSearcher, sort);
assertTrue(collected.get());
assertTrue(context.queryResult().terminatedEarly());
assertThat(context.queryResult().topDocs().totalHits, equalTo(numDocs));
assertThat(context.queryResult().topDocs().scoreDocs.length, equalTo(1));
assertThat(context.queryResult().topDocs().scoreDocs[0], instanceOf(FieldDoc.class));
FieldDoc fieldDoc = (FieldDoc) context.queryResult().topDocs().scoreDocs[0];
assertThat(fieldDoc.fields[0], equalTo(1));
{
collected.set(false);
context.parsedPostFilter(new ParsedQuery(new MinDocQuery(1)));
QueryPhase.execute(context, contextSearcher, sort);
assertTrue(collected.get());
assertTrue(context.queryResult().terminatedEarly());
assertThat(context.queryResult().topDocs().totalHits, equalTo(numDocs - 1));
assertThat(context.queryResult().topDocs().scoreDocs.length, equalTo(1));
assertThat(context.queryResult().topDocs().scoreDocs[0], instanceOf(FieldDoc.class));
assertThat(fieldDoc.fields[0], anyOf(equalTo(1), equalTo(2)));
context.parsedPostFilter(null);
final TotalHitCountCollector totalHitCountCollector = new TotalHitCountCollector();
context.queryCollectors().put(TotalHitCountCollector.class, totalHitCountCollector);
collected.set(false);
QueryPhase.execute(context, contextSearcher, sort);
assertTrue(collected.get());
assertTrue(context.queryResult().terminatedEarly());
assertThat(context.queryResult().topDocs().totalHits, equalTo(numDocs));
assertThat(context.queryResult().topDocs().scoreDocs.length, equalTo(1));
assertThat(context.queryResult().topDocs().scoreDocs[0], instanceOf(FieldDoc.class));
assertThat(fieldDoc.fields[0], anyOf(equalTo(1), equalTo(2)));
assertThat(totalHitCountCollector.getTotalHits(), equalTo(numDocs));
context.queryCollectors().clear();
}
{
collected.set(false);
context.trackTotalHits(false);
QueryPhase.execute(context, contextSearcher, sort);
assertTrue(collected.get());
assertTrue(context.queryResult().terminatedEarly());
assertThat(context.queryResult().topDocs().totalHits, lessThan(numDocs));
assertThat(context.queryResult().topDocs().scoreDocs.length, equalTo(1));
assertThat(context.queryResult().topDocs().scoreDocs[0], instanceOf(FieldDoc.class));
assertThat(fieldDoc.fields[0], anyOf(equalTo(1), equalTo(2)));
final TotalHitCountCollector totalHitCountCollector = new TotalHitCountCollector();
context.queryCollectors().put(TotalHitCountCollector.class, totalHitCountCollector);
collected.set(false);
QueryPhase.execute(context, contextSearcher, sort);
assertTrue(collected.get());
assertTrue(context.queryResult().terminatedEarly());
assertThat(context.queryResult().topDocs().totalHits, lessThan(numDocs));
assertThat(context.queryResult().topDocs().scoreDocs.length, equalTo(1));
assertThat(context.queryResult().topDocs().scoreDocs[0], instanceOf(FieldDoc.class));
assertThat(fieldDoc.fields[0], anyOf(equalTo(1), equalTo(2)));
assertThat(totalHitCountCollector.getTotalHits(), equalTo(numDocs));
}
reader.close();
dir.close();
}
}

View File

@ -30,7 +30,9 @@ import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.search.SearchContextException;
import org.elasticsearch.search.rescore.QueryRescorerBuilder;
import org.elasticsearch.search.sort.SortOrder;
import org.elasticsearch.test.ESIntegTestCase;
import java.util.ArrayList;
@ -51,6 +53,8 @@ import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertFail
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailures;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.lessThanOrEqualTo;
public class SimpleSearchIT extends ESIntegTestCase {
@ -285,7 +289,50 @@ public class SimpleSearchIT extends ESIntegTestCase {
.setTerminateAfter(2 * max).execute().actionGet();
assertHitCount(searchResponse, max);
assertFalse(searchResponse.isTerminatedEarly());
assertNull(searchResponse.isTerminatedEarly());
}
public void testSimpleIndexSortEarlyTerminate() throws Exception {
prepareCreate("test")
.setSettings(Settings.builder()
.put(SETTING_NUMBER_OF_SHARDS, 1)
.put(SETTING_NUMBER_OF_REPLICAS, 0)
.put("index.sort.field", "rank")
)
.addMapping("type1", "rank", "type=integer")
.get();
ensureGreen();
int max = randomIntBetween(3, 29);
List<IndexRequestBuilder> docbuilders = new ArrayList<>(max);
for (int i = max-1; i >= 0; i--) {
String id = String.valueOf(i);
docbuilders.add(client().prepareIndex("test", "type1", id).setSource("rank", i));
}
indexRandom(true, docbuilders);
ensureGreen();
refresh();
SearchResponse searchResponse;
boolean hasEarlyTerminated = false;
for (int i = 1; i < max; i++) {
searchResponse = client().prepareSearch("test")
.addDocValueField("rank")
.setTrackTotalHits(false)
.addSort("rank", SortOrder.ASC)
.setSize(i).execute().actionGet();
assertThat(searchResponse.getHits().getTotalHits(), equalTo(-1L));
if (searchResponse.isTerminatedEarly() != null) {
assertTrue(searchResponse.isTerminatedEarly());
hasEarlyTerminated = true;
}
for (int j = 0; j < i; j++) {
assertThat(searchResponse.getHits().getAt(j).field("rank").getValue(),
equalTo((long) j));
}
}
assertTrue(hasEarlyTerminated);
}
public void testInsaneFromAndSize() throws Exception {

View File

@ -104,10 +104,110 @@ Index sorting supports the following settings:
[WARNING]
Index sorting can be defined only once at index creation. It is not allowed to add or update
a sort on an existing index.
a sort on an existing index. Index sorting also has a cost in terms of indexing throughput since
documents must be sorted at flush and merge time. You should test the impact on your application
before activating this feature.
// TODO: Also document how index sorting can be used to early-terminate
// sorted search requests when the total number of matches is not needed
[float]
[[early-terminate]]
=== Early termination of search request
By default in elasticsearch a search request must visit every document that match a query to
retrieve the top documents sorted by a specified sort.
Though when the index sort and the search sort are the same it is possible to limit
the number of documents that should be visited per segment to retrieve the N top ranked documents globally.
For example, let's say we have an index that contains events sorted by a timestamp field:
[source,js]
--------------------------------------------------
PUT events
{
"settings" : {
"index" : {
"sort.field" : "timestamp",
"sort.order" : "desc" <2>
}
},
"mappings": {
"doc": {
"properties": {
"timestamp": {
"type": "date"
}
}
}
}
}
--------------------------------------------------
// CONSOLE
<1> This index is sorted by timestamp in descending order (most recent first)
You can search for the last 10 events with:
[source,js]
--------------------------------------------------
GET /events/_search
{
"size": 10,
"sort": [
{ "timestamp": "desc" }
]
}
--------------------------------------------------
// CONSOLE
// TEST[continued]
Elasticsearch will detect that the top docs of each segment are already sorted in the index
and will only compare the first N documents per segment.
The rest of the documents matching the query are collected to count the total number of results
and to build aggregations.
If you're only looking for the last 10 events and have no interest in
the total number of documents that match the query you can set `track_total_hits`
to false:
[source,js]
--------------------------------------------------
GET /events/_search
{
"size": 10,
"sort": [
{ "timestamp": "desc" }
],
"track_total_hits": false
}
--------------------------------------------------
// CONSOLE
// TEST[continued]
<1> The index sort will be used to rank the top documents and each segment will early terminate the collection after the first 10 matches.
This time, Elasticsearch will not try to count the number of documents and will be able to terminate the query
as soon as N documents have been collected per segment.
[source,js]
--------------------------------------------------
{
"_shards": ...
"hits" : {
"total" : -1, <1>
"max_score" : null,
"hits" : []
},
"took": 20,
"terminated_early": true, <2>
"timed_out": false
}
--------------------------------------------------
// TESTRESPONSE[s/"_shards": \.\.\./"_shards": "$body._shards",/]
// TESTRESPONSE[s/"took": 20,/"took": "$body.took",/]
// TESTRESPONSE[s/"terminated_early": true,//]
<1> The total number of hits matching the query is unknown because of early termination.
<2> Indicates whether the top docs retrieval has actually terminated_early.
NOTE: Aggregations will collect all documents that match the query regardless of the value of `track_total_hits`
[[index-modules-index-sorting-conjunctions]]
=== Use index sorting to speed up conjunctions
@ -128,4 +228,3 @@ close to each other.
For instance if you were indexing cars for sale, it might be interesting to
sort by fuel type, body type, make, year of registration and finally mileage.

View File

@ -97,6 +97,11 @@ is important).
|`track_scores` |When sorting, set to `true` in order to still track
scores and return them as part of each hit.
|`track_total_hits` |Set to `false` in order to disable the tracking
of the total number of hits that match the query.
(see <<index-modules-index-sorting,_Index Sorting_>> for more details).
Defaults to true.
|`timeout` |A search timeout, bounding the search request to be executed
within the specified time value and bail with the hits accumulated up to
that point when expired. Defaults to no timeout.

View File

@ -147,6 +147,10 @@
"type" : "boolean",
"description": "Whether to calculate and return scores even if they are not used for sorting"
},
"track_total_hits": {
"type" : "boolean",
"description": "Indicate if the number of documents that match the query should be tracked"
},
"typed_keys": {
"type" : "boolean",
"description" : "Specify whether aggregation and suggester names should be prefixed by their respective types in the response"

View File

@ -11,7 +11,7 @@
body:
settings:
number_of_shards: 1
number_of_replicas: 0
number_of_replicas: 1
index.sort.field: rank
mappings:
test:
@ -19,10 +19,6 @@
rank:
type: integer
- do:
cluster.health:
wait_for_status: green
- do:
index:
index: test
@ -55,6 +51,105 @@
indices.refresh:
index: test
- do:
index:
index: test
type: test
id: "5"
body: { "rank": 8 }
- do:
index:
index: test
type: test
id: "6"
body: { "rank": 6 }
- do:
index:
index: test
type: test
id: "7"
body: { "rank": 5 }
- do:
index:
index: test
type: test
id: "8"
body: { "rank": 7 }
- do:
index:
index: test
type: test
id: "8"
body: { "rank": 7 }
- do:
indices.refresh:
index: test
- do:
search:
index: test
type: test
body:
sort: ["rank"]
size: 1
- is_true: terminated_early
- match: {hits.total: 8 }
- length: {hits.hits: 1 }
- match: {hits.hits.0._id: "2" }
- do:
search:
index: test
type: test
body:
sort: ["rank"]
query: {"range": { "rank": { "from": 0 } } }
track_total_hits: false
size: 1
- match: {terminated_early: true}
- match: {hits.total: -1 }
- length: {hits.hits: 1 }
- match: {hits.hits.0._id: "2" }
- do:
search:
index: test
type: test
body:
sort: ["rank"]
size: 3
- is_true: terminated_early
- match: {hits.total: 8 }
- length: {hits.hits: 3 }
- match: {hits.hits.0._id: "2" }
- match: {hits.hits.1._id: "4" }
- match: {hits.hits.2._id: "3" }
- do:
search:
index: test
type: test
track_total_hits: false
body:
query: {"range": { "rank": { "from": 0 } } }
sort: ["rank"]
size: 3
- match: {terminated_early: true }
- match: {hits.total: -1 }
- length: {hits.hits: 3 }
- match: {hits.hits.0._id: "2" }
- match: {hits.hits.1._id: "4" }
- match: {hits.hits.2._id: "3" }
- do:
indices.forcemerge:
index: test
@ -64,14 +159,6 @@
indices.refresh:
index: test
- do:
indices.segments:
index: test
- match: { _shards.total: 1}
- length: { indices.test.shards.0: 1}
- length: { indices.test.shards.0.0.segments: 1}
- do:
search:
index: test
@ -79,9 +166,44 @@
body:
sort: _doc
- match: {hits.total: 4 }
- length: {hits.hits: 4 }
- match: {hits.hits.0._id: "2" }
- match: {hits.hits.1._id: "4" }
- match: {hits.hits.2._id: "3" }
- match: {hits.hits.3._id: "1" }
- is_false: terminated_early
- match: {hits.total: 8 }
- length: {hits.hits: 8 }
- match: {hits.hits.0._id: "2" }
- match: {hits.hits.1._id: "4" }
- match: {hits.hits.2._id: "3" }
- match: {hits.hits.3._id: "1" }
- match: {hits.hits.4._id: "7" }
- match: {hits.hits.5._id: "6" }
- match: {hits.hits.6._id: "8" }
- match: {hits.hits.7._id: "5" }
- do:
search:
index: test
type: test
body:
sort: ["rank"]
query: {"range": { "rank": { "from": 0 } } }
track_total_hits: false
size: 3
- match: {terminated_early: true }
- match: {hits.total: -1 }
- length: {hits.hits: 3 }
- match: {hits.hits.0._id: "2" }
- match: {hits.hits.1._id: "4" }
- match: {hits.hits.2._id: "3" }
- do:
catch: /disabling \[track_total_hits\] is not allowed in a scroll context/
search:
index: test
type: test
scroll: 1m
body:
sort: ["rank"]
query: {"range": { "rank": { "from": 0 } } }
track_total_hits: false
size: 3

View File

@ -143,6 +143,9 @@ public class RandomSearchRequestGenerator {
if (randomBoolean()) {
builder.terminateAfter(randomIntBetween(1, 100000));
}
if (randomBoolean()) {
builder.trackTotalHits(randomBoolean());
}
switch(randomInt(2)) {
case 0:

View File

@ -60,6 +60,7 @@ import org.elasticsearch.search.sort.SortAndFormats;
import org.elasticsearch.search.suggest.SuggestionSearchContext;
import org.elasticsearch.threadpool.ThreadPool;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@ -81,11 +82,15 @@ public class TestSearchContext extends SearchContext {
Query query;
Float minScore;
SearchTask task;
SortAndFormats sort;
boolean trackScores = false;
boolean trackTotalHits = true;
ContextIndexSearcher searcher;
int size;
private int terminateAfter = DEFAULT_TERMINATE_AFTER;
private SearchContextAggregations aggregations;
private ScrollContext scrollContext;
private final long originNanoTime = System.nanoTime();
private final Map<String, SearchExtBuilder> searchExtBuilders = new HashMap<>();
@ -161,12 +166,13 @@ public class TestSearchContext extends SearchContext {
@Override
public ScrollContext scrollContext() {
return null;
return scrollContext;
}
@Override
public SearchContext scrollContext(ScrollContext scrollContext) {
throw new UnsupportedOperationException();
this.scrollContext = scrollContext;
return this;
}
@Override
@ -210,7 +216,7 @@ public class TestSearchContext extends SearchContext {
@Override
public List<RescoreSearchContext> rescore() {
return null;
return Collections.emptyList();
}
@Override
@ -336,22 +342,35 @@ public class TestSearchContext extends SearchContext {
@Override
public SearchContext sort(SortAndFormats sort) {
return null;
this.sort = sort;
return this;
}
@Override
public SortAndFormats sort() {
return null;
return sort;
}
@Override
public SearchContext trackScores(boolean trackScores) {
return null;
this.trackScores = trackScores;
return this;
}
@Override
public boolean trackScores() {
return false;
return trackScores;
}
@Override
public SearchContext trackTotalHits(boolean trackTotalHits) {
this.trackTotalHits = trackTotalHits;
return this;
}
@Override
public boolean trackTotalHits() {
return trackTotalHits;
}
@Override