Search & Count: Add option to early terminate doc collection

Allow users to control document collection termination, if a specified terminate_after number is
set. Upon setting the newly added parameter, the response will include a boolean terminated_early
flag, indicating if the document collection for any shard terminated early.

closes #6876
This commit is contained in:
Areek Zillur 2014-07-18 10:53:20 -04:00
parent 66825ac851
commit 5487c56c70
32 changed files with 600 additions and 47 deletions

View File

@ -63,6 +63,12 @@ query.
|default_operator |The default operator to be used, can be `AND` or |default_operator |The default operator to be used, can be `AND` or
`OR`. Defaults to `OR`. `OR`. Defaults to `OR`.
|coming[1.4.0] terminate_after |The maximum count for each shard, upon
reaching which the query execution will terminate early.
If set, the response will have a boolean field `terminated_early` to
indicate whether the query execution has actually terminated_early.
Defaults to no terminate_after.
|======================================================================= |=======================================================================
[float] [float]

View File

@ -62,6 +62,12 @@ that point when expired. Defaults to no timeout.
`query_and_fetch`. Defaults to `query_then_fetch`. See `query_and_fetch`. Defaults to `query_then_fetch`. See
<<search-request-search-type,_Search Type_>> for <<search-request-search-type,_Search Type_>> for
more details on the different types of search that can be performed. more details on the different types of search that can be performed.
|coming[1.4.0] `terminate_after` |The maximum number of documents to collect for
each shard, upon reaching which the query execution will terminate early.
If set, the response will have a boolean field `terminated_early` to
indicate whether the query execution has actually terminated_early.
Defaults to no terminate_after.
|======================================================================= |=======================================================================
Out of the above, the `search_type` is the one that can not be passed Out of the above, the `search_type` is the one that can not be passed

View File

@ -82,6 +82,12 @@ scores and return them as part of each hit.
within the specified time value and bail with the hits accumulated up to within the specified time value and bail with the hits accumulated up to
that point when expired. Defaults to no timeout. that point when expired. Defaults to no timeout.
|coming[1.4.0] `terminate_after` |The maximum number of documents to collect for
each shard, upon reaching which the query execution will terminate early.
If set, the response will have a boolean field `terminated_early` to
indicate whether the query execution has actually terminated_early.
Defaults to no terminate_after.
|`from` |The starting from index of the hits to return. Defaults to `0`. |`from` |The starting from index of the hits to return. Defaults to `0`.
|`size` |The number of hits to return. Defaults to `10`. |`size` |The number of hits to return. Defaults to `10`.

View File

@ -20,6 +20,8 @@
package org.elasticsearch.action.count; package org.elasticsearch.action.count;
import org.elasticsearch.ElasticsearchGenerationException; import org.elasticsearch.ElasticsearchGenerationException;
import org.elasticsearch.ElasticsearchIllegalArgumentException;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionRequestValidationException; import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.support.QuerySourceBuilder; import org.elasticsearch.action.support.QuerySourceBuilder;
import org.elasticsearch.action.support.broadcast.BroadcastOperationRequest; import org.elasticsearch.action.support.broadcast.BroadcastOperationRequest;
@ -34,6 +36,8 @@ import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentHelper; import org.elasticsearch.common.xcontent.XContentHelper;
import static org.elasticsearch.search.internal.SearchContext.DEFAULT_TERMINATE_AFTER;
import java.io.IOException; import java.io.IOException;
import java.util.Arrays; import java.util.Arrays;
import java.util.Map; import java.util.Map;
@ -67,6 +71,7 @@ public class CountRequest extends BroadcastOperationRequest<CountRequest> {
private String[] types = Strings.EMPTY_ARRAY; private String[] types = Strings.EMPTY_ARRAY;
long nowInMillis; long nowInMillis;
private int terminateAfter = DEFAULT_TERMINATE_AFTER;
CountRequest() { CountRequest() {
} }
@ -221,6 +226,21 @@ public class CountRequest extends BroadcastOperationRequest<CountRequest> {
return this.preference; return this.preference;
} }
/**
* Upon reaching <code>terminateAfter</code> counts, the count request will early terminate
*/
public CountRequest terminateAfter(int terminateAfterCount) {
if (terminateAfterCount <= 0) {
throw new ElasticsearchIllegalArgumentException("terminateAfter must be > 0");
}
this.terminateAfter = terminateAfterCount;
return this;
}
public int terminateAfter() {
return this.terminateAfter;
}
@Override @Override
public void readFrom(StreamInput in) throws IOException { public void readFrom(StreamInput in) throws IOException {
super.readFrom(in); super.readFrom(in);
@ -230,6 +250,10 @@ public class CountRequest extends BroadcastOperationRequest<CountRequest> {
sourceUnsafe = false; sourceUnsafe = false;
source = in.readBytesReference(); source = in.readBytesReference();
types = in.readStringArray(); types = in.readStringArray();
if (in.getVersion().onOrAfter(Version.V_1_4_0)) {
terminateAfter = in.readVInt();
}
} }
@Override @Override
@ -240,6 +264,10 @@ public class CountRequest extends BroadcastOperationRequest<CountRequest> {
out.writeOptionalString(preference); out.writeOptionalString(preference);
out.writeBytesReference(source); out.writeBytesReference(source);
out.writeStringArray(types); out.writeStringArray(types);
if (out.getVersion().onOrAfter(Version.V_1_4_0)) {
out.writeVInt(terminateAfter);
}
} }
@Override @Override

View File

@ -91,6 +91,14 @@ public class CountRequestBuilder extends BroadcastOperationRequestBuilder<CountR
return this; return this;
} }
/**
* The query binary to execute
*/
public CountRequestBuilder setQuery(BytesReference queryBinary) {
sourceBuilder().setQuery(queryBinary);
return this;
}
/** /**
* The source to execute. * The source to execute.
*/ */
@ -115,6 +123,11 @@ public class CountRequestBuilder extends BroadcastOperationRequestBuilder<CountR
return this; return this;
} }
public CountRequestBuilder setTerminateAfter(int terminateAfter) {
request().terminateAfter(terminateAfter);
return this;
}
@Override @Override
protected void doExecute(ActionListener<CountResponse> listener) { protected void doExecute(ActionListener<CountResponse> listener) {
if (sourceBuilder != null) { if (sourceBuilder != null) {

View File

@ -19,6 +19,7 @@
package org.elasticsearch.action.count; package org.elasticsearch.action.count;
import org.elasticsearch.Version;
import org.elasticsearch.action.ShardOperationFailedException; import org.elasticsearch.action.ShardOperationFailedException;
import org.elasticsearch.action.support.broadcast.BroadcastOperationResponse; import org.elasticsearch.action.support.broadcast.BroadcastOperationResponse;
import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamInput;
@ -33,15 +34,17 @@ import java.util.List;
*/ */
public class CountResponse extends BroadcastOperationResponse { public class CountResponse extends BroadcastOperationResponse {
private boolean terminatedEarly;
private long count; private long count;
CountResponse() { CountResponse() {
} }
CountResponse(long count, int totalShards, int successfulShards, int failedShards, List<ShardOperationFailedException> shardFailures) { CountResponse(long count, boolean hasTerminatedEarly, int totalShards, int successfulShards, int failedShards, List<ShardOperationFailedException> shardFailures) {
super(totalShards, successfulShards, failedShards, shardFailures); super(totalShards, successfulShards, failedShards, shardFailures);
this.count = count; this.count = count;
this.terminatedEarly = hasTerminatedEarly;
} }
/** /**
@ -51,6 +54,13 @@ public class CountResponse extends BroadcastOperationResponse {
return count; return count;
} }
/**
* True if the request has been terminated early due to enough count
*/
public boolean terminatedEarly() {
return this.terminatedEarly;
}
public RestStatus status() { public RestStatus status() {
if (getFailedShards() == 0) { if (getFailedShards() == 0) {
if (getSuccessfulShards() == 0 && getTotalShards() > 0) { if (getSuccessfulShards() == 0 && getTotalShards() > 0) {
@ -76,11 +86,17 @@ public class CountResponse extends BroadcastOperationResponse {
public void readFrom(StreamInput in) throws IOException { public void readFrom(StreamInput in) throws IOException {
super.readFrom(in); super.readFrom(in);
count = in.readVLong(); count = in.readVLong();
if (in.getVersion().onOrAfter(Version.V_1_4_0)) {
terminatedEarly = in.readBoolean();
}
} }
@Override @Override
public void writeTo(StreamOutput out) throws IOException { public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out); super.writeTo(out);
out.writeVLong(count); out.writeVLong(count);
if (out.getVersion().onOrAfter(Version.V_1_4_0)) {
out.writeBoolean(terminatedEarly);
}
} }
} }

View File

@ -19,6 +19,7 @@
package org.elasticsearch.action.count; package org.elasticsearch.action.count;
import org.elasticsearch.Version;
import org.elasticsearch.action.support.broadcast.BroadcastShardOperationRequest; import org.elasticsearch.action.support.broadcast.BroadcastShardOperationRequest;
import org.elasticsearch.common.Nullable; import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Strings; import org.elasticsearch.common.Strings;
@ -28,12 +29,15 @@ import org.elasticsearch.common.io.stream.StreamOutput;
import java.io.IOException; import java.io.IOException;
import static org.elasticsearch.search.internal.SearchContext.DEFAULT_TERMINATE_AFTER;
/** /**
* Internal count request executed directly against a specific index shard. * Internal count request executed directly against a specific index shard.
*/ */
class ShardCountRequest extends BroadcastShardOperationRequest { class ShardCountRequest extends BroadcastShardOperationRequest {
private float minScore; private float minScore;
private int terminateAfter;
private BytesReference querySource; private BytesReference querySource;
@ -55,6 +59,7 @@ class ShardCountRequest extends BroadcastShardOperationRequest {
this.types = request.types(); this.types = request.types();
this.filteringAliases = filteringAliases; this.filteringAliases = filteringAliases;
this.nowInMillis = request.nowInMillis; this.nowInMillis = request.nowInMillis;
this.terminateAfter = request.terminateAfter();
} }
public float minScore() { public float minScore() {
@ -77,6 +82,10 @@ class ShardCountRequest extends BroadcastShardOperationRequest {
return this.nowInMillis; return this.nowInMillis;
} }
public int terminateAfter() {
return this.terminateAfter;
}
@Override @Override
public void readFrom(StreamInput in) throws IOException { public void readFrom(StreamInput in) throws IOException {
super.readFrom(in); super.readFrom(in);
@ -99,6 +108,12 @@ class ShardCountRequest extends BroadcastShardOperationRequest {
} }
} }
nowInMillis = in.readVLong(); nowInMillis = in.readVLong();
if (in.getVersion().onOrAfter(Version.V_1_4_0)) {
terminateAfter = in.readVInt();
} else {
terminateAfter = DEFAULT_TERMINATE_AFTER;
}
} }
@Override @Override
@ -121,5 +136,9 @@ class ShardCountRequest extends BroadcastShardOperationRequest {
out.writeVInt(0); out.writeVInt(0);
} }
out.writeVLong(nowInMillis); out.writeVLong(nowInMillis);
if (out.getVersion().onOrAfter(Version.V_1_4_0)) {
out.writeVInt(terminateAfter);
}
} }
} }

View File

@ -19,6 +19,7 @@
package org.elasticsearch.action.count; package org.elasticsearch.action.count;
import org.elasticsearch.Version;
import org.elasticsearch.action.support.broadcast.BroadcastShardOperationResponse; import org.elasticsearch.action.support.broadcast.BroadcastShardOperationResponse;
import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.StreamOutput;
@ -33,29 +34,41 @@ import java.io.IOException;
class ShardCountResponse extends BroadcastShardOperationResponse { class ShardCountResponse extends BroadcastShardOperationResponse {
private long count; private long count;
private boolean terminatedEarly;
ShardCountResponse() { ShardCountResponse() {
} }
public ShardCountResponse(String index, int shardId, long count) { public ShardCountResponse(String index, int shardId, long count, boolean terminatedEarly) {
super(index, shardId); super(index, shardId);
this.count = count; this.count = count;
this.terminatedEarly = terminatedEarly;
} }
public long getCount() { public long getCount() {
return this.count; return this.count;
} }
public boolean terminatedEarly() {
return this.terminatedEarly;
}
@Override @Override
public void readFrom(StreamInput in) throws IOException { public void readFrom(StreamInput in) throws IOException {
super.readFrom(in); super.readFrom(in);
count = in.readVLong(); count = in.readVLong();
if (in.getVersion().onOrAfter(Version.V_1_4_0)) {
terminatedEarly = in.readBoolean();
}
} }
@Override @Override
public void writeTo(StreamOutput out) throws IOException { public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out); super.writeTo(out);
out.writeVLong(count); out.writeVLong(count);
if (out.getVersion().onOrAfter(Version.V_1_4_0)) {
out.writeBoolean(terminatedEarly);
}
} }
} }

View File

@ -58,6 +58,7 @@ import java.util.Set;
import java.util.concurrent.atomic.AtomicReferenceArray; import java.util.concurrent.atomic.AtomicReferenceArray;
import static com.google.common.collect.Lists.newArrayList; import static com.google.common.collect.Lists.newArrayList;
import static org.elasticsearch.search.internal.SearchContext.DEFAULT_TERMINATE_AFTER;
/** /**
* *
@ -139,6 +140,7 @@ public class TransportCountAction extends TransportBroadcastOperationAction<Coun
int successfulShards = 0; int successfulShards = 0;
int failedShards = 0; int failedShards = 0;
long count = 0; long count = 0;
boolean terminatedEarly = false;
List<ShardOperationFailedException> shardFailures = null; List<ShardOperationFailedException> shardFailures = null;
for (int i = 0; i < shardsResponses.length(); i++) { for (int i = 0; i < shardsResponses.length(); i++) {
Object shardResponse = shardsResponses.get(i); Object shardResponse = shardsResponses.get(i);
@ -152,10 +154,13 @@ public class TransportCountAction extends TransportBroadcastOperationAction<Coun
shardFailures.add(new DefaultShardOperationFailedException((BroadcastShardOperationFailedException) shardResponse)); shardFailures.add(new DefaultShardOperationFailedException((BroadcastShardOperationFailedException) shardResponse));
} else { } else {
count += ((ShardCountResponse) shardResponse).getCount(); count += ((ShardCountResponse) shardResponse).getCount();
if (((ShardCountResponse) shardResponse).terminatedEarly()) {
terminatedEarly = true;
}
successfulShards++; successfulShards++;
} }
} }
return new CountResponse(count, shardsResponses.length(), successfulShards, failedShards, shardFailures); return new CountResponse(count, terminatedEarly, shardsResponses.length(), successfulShards, failedShards, shardFailures);
} }
@Override @Override
@ -186,10 +191,20 @@ public class TransportCountAction extends TransportBroadcastOperationAction<Coun
QueryParseContext.removeTypes(); QueryParseContext.removeTypes();
} }
} }
final boolean hasTerminateAfterCount = request.terminateAfter() != DEFAULT_TERMINATE_AFTER;
boolean terminatedEarly = false;
context.preProcess(); context.preProcess();
try { try {
long count = Lucene.count(context.searcher(), context.query()); long count;
return new ShardCountResponse(request.index(), request.shardId(), count); if (hasTerminateAfterCount) {
final Lucene.EarlyTerminatingCollector countCollector =
Lucene.createCountBasedEarlyTerminatingCollector(request.terminateAfter());
terminatedEarly = Lucene.countWithEarlyTermination(context.searcher(), context.query(), countCollector);
count = countCollector.count();
} else {
count = Lucene.count(context.searcher(), context.query());
}
return new ShardCountResponse(request.index(), request.shardId(), count, terminatedEarly);
} catch (Exception e) { } catch (Exception e) {
throw new QueryPhaseExecutionException(context, "failed to execute count", e); throw new QueryPhaseExecutionException(context, "failed to execute count", e);
} }

View File

@ -129,6 +129,15 @@ public class SearchRequestBuilder extends ActionRequestBuilder<SearchRequest, Se
return this; return this;
} }
/**
* An optional document count, upon collecting which the search
* query will early terminate
*/
public SearchRequestBuilder setTerminateAfter(int terminateAfter) {
sourceBuilder().terminateAfter(terminateAfter);
return this;
}
/** /**
* A comma separated list of routing values to control the shards the search will be executed on. * A comma separated list of routing values to control the shards the search will be executed on.
*/ */

View File

@ -116,6 +116,14 @@ public class SearchResponse extends ActionResponse implements StatusToXContent {
return internalResponse.timedOut(); return internalResponse.timedOut();
} }
/**
* Has the search operation terminated early due to reaching
* <code>terminateAfter</code>
*/
public Boolean isTerminatedEarly() {
return internalResponse.terminatedEarly();
}
/** /**
* How long the search took. * How long the search took.
*/ */
@ -181,6 +189,7 @@ public class SearchResponse extends ActionResponse implements StatusToXContent {
static final XContentBuilderString REASON = new XContentBuilderString("reason"); static final XContentBuilderString REASON = new XContentBuilderString("reason");
static final XContentBuilderString TOOK = new XContentBuilderString("took"); static final XContentBuilderString TOOK = new XContentBuilderString("took");
static final XContentBuilderString TIMED_OUT = new XContentBuilderString("timed_out"); static final XContentBuilderString TIMED_OUT = new XContentBuilderString("timed_out");
static final XContentBuilderString TERMINATED_EARLY = new XContentBuilderString("terminated_early");
} }
@Override @Override
@ -190,6 +199,9 @@ public class SearchResponse extends ActionResponse implements StatusToXContent {
} }
builder.field(Fields.TOOK, tookInMillis); builder.field(Fields.TOOK, tookInMillis);
builder.field(Fields.TIMED_OUT, isTimedOut()); builder.field(Fields.TIMED_OUT, isTimedOut());
if (isTerminatedEarly() != null) {
builder.field(Fields.TERMINATED_EARLY, isTerminatedEarly());
}
builder.startObject(Fields._SHARDS); builder.startObject(Fields._SHARDS);
builder.field(Fields.TOTAL, getTotalShards()); builder.field(Fields.TOTAL, getTotalShards());
builder.field(Fields.SUCCESSFUL, getSuccessfulShards()); builder.field(Fields.SUCCESSFUL, getSuccessfulShards());

View File

@ -119,7 +119,7 @@ public class TransportSearchScrollScanAction extends AbstractComponent {
public void start() { public void start() {
if (scrollId.getContext().length == 0) { if (scrollId.getContext().length == 0) {
final InternalSearchResponse internalResponse = new InternalSearchResponse(new InternalSearchHits(InternalSearchHits.EMPTY, Long.parseLong(this.scrollId.getAttributes().get("total_hits")), 0.0f), null, null, null, false); final InternalSearchResponse internalResponse = new InternalSearchResponse(new InternalSearchHits(InternalSearchHits.EMPTY, Long.parseLong(this.scrollId.getAttributes().get("total_hits")), 0.0f), null, null, null, false, null);
listener.onResponse(new SearchResponse(internalResponse, request.scrollId(), 0, 0, 0l, buildShardFailures())); listener.onResponse(new SearchResponse(internalResponse, request.scrollId(), 0, 0, 0l, buildShardFailures()));
return; return;
} }

View File

@ -29,6 +29,7 @@ import org.apache.lucene.store.IOContext;
import org.apache.lucene.store.IndexInput; import org.apache.lucene.store.IndexInput;
import org.apache.lucene.util.BytesRef; import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.Version; import org.apache.lucene.util.Version;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchIllegalArgumentException; import org.elasticsearch.ElasticsearchIllegalArgumentException;
import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.common.Nullable; import org.elasticsearch.common.Nullable;
@ -39,6 +40,7 @@ import org.elasticsearch.index.analysis.AnalyzerScope;
import org.elasticsearch.index.analysis.NamedAnalyzer; import org.elasticsearch.index.analysis.NamedAnalyzer;
import org.elasticsearch.index.fielddata.IndexFieldData; import org.elasticsearch.index.fielddata.IndexFieldData;
import static org.elasticsearch.common.lucene.search.NoopCollector.NOOP_COLLECTOR;
import java.io.IOException; import java.io.IOException;
/** /**
@ -121,12 +123,110 @@ public class Lucene {
public static long count(IndexSearcher searcher, Query query) throws IOException { public static long count(IndexSearcher searcher, Query query) throws IOException {
TotalHitCountCollector countCollector = new TotalHitCountCollector(); TotalHitCountCollector countCollector = new TotalHitCountCollector();
query = wrapCountQuery(query);
searcher.search(query, countCollector);
return countCollector.getTotalHits();
}
/**
* Performs a count on the <code>searcher</code> for <code>query</code>. Terminates
* early when the count has reached <code>terminateAfter</code>
*/
public static long count(IndexSearcher searcher, Query query, int terminateAfterCount) throws IOException {
EarlyTerminatingCollector countCollector = createCountBasedEarlyTerminatingCollector(terminateAfterCount);
countWithEarlyTermination(searcher, query, countCollector);
return countCollector.count();
}
/**
* Creates count based early termination collector with a threshold of <code>maxCountHits</code>
*/
public final static EarlyTerminatingCollector createCountBasedEarlyTerminatingCollector(int maxCountHits) {
return new EarlyTerminatingCollector(maxCountHits);
}
/**
* Wraps <code>delegate</code> with count based early termination collector with a threshold of <code>maxCountHits</code>
*/
public final static EarlyTerminatingCollector wrapCountBasedEarlyTerminatingCollector(final Collector delegate, int maxCountHits) {
return new EarlyTerminatingCollector(delegate, maxCountHits);
}
/**
* Wraps <code>delegate</code> with a time limited collector with a timeout of <code>timeoutInMillis</code>
*/
public final static TimeLimitingCollector wrapTimeLimitingCollector(final Collector delegate, long timeoutInMillis) {
return new TimeLimitingCollector(delegate, TimeLimitingCollector.getGlobalCounter(), timeoutInMillis);
}
/**
* Performs an exists (count > 0) query on the <code>searcher</code> for <code>query</code>
* with <code>filter</code> using the given <code>collector</code>
*
* The <code>collector</code> can be instantiated using <code>Lucene.createExistsCollector()</code>
*/
public static boolean exists(IndexSearcher searcher, Query query, Filter filter,
EarlyTerminatingCollector collector) throws IOException {
collector.reset();
countWithEarlyTermination(searcher, filter, query, collector);
return collector.exists();
}
/**
* Performs an exists (count > 0) query on the <code>searcher</code> for <code>query</code>
* using the given <code>collector</code>
*
* The <code>collector</code> can be instantiated using <code>Lucene.createExistsCollector()</code>
*/
public static boolean exists(IndexSearcher searcher, Query query, EarlyTerminatingCollector collector) throws IOException {
collector.reset();
countWithEarlyTermination(searcher, query, collector);
return collector.exists();
}
/**
* Calls <code>countWithEarlyTermination(searcher, null, query, collector)</code>
*/
public static boolean countWithEarlyTermination(IndexSearcher searcher, Query query,
EarlyTerminatingCollector collector) throws IOException {
return countWithEarlyTermination(searcher, null, query, collector);
}
/**
* Performs a count on <code>query</code> and <code>filter</code> with early termination using <code>searcher</code>.
* The early termination threshold is specified by the provided <code>collector</code>
*/
public static boolean countWithEarlyTermination(IndexSearcher searcher, Filter filter, Query query,
EarlyTerminatingCollector collector) throws IOException {
query = wrapCountQuery(query);
try {
if (filter == null) {
searcher.search(query, collector);
} else {
searcher.search(query, filter, collector);
}
} catch (EarlyTerminationException e) {
// early termination
return true;
}
return false;
}
/**
* Creates an {@link org.elasticsearch.common.lucene.Lucene.EarlyTerminatingCollector}
* with a threshold of <code>1</code>
*/
public final static EarlyTerminatingCollector createExistsCollector() {
return createCountBasedEarlyTerminatingCollector(1);
}
private final static Query wrapCountQuery(Query query) {
// we don't need scores, so wrap it in a constant score query // we don't need scores, so wrap it in a constant score query
if (!(query instanceof ConstantScoreQuery)) { if (!(query instanceof ConstantScoreQuery)) {
query = new ConstantScoreQuery(query); query = new ConstantScoreQuery(query);
} }
searcher.search(query, countCollector); return query;
return countCollector.getTotalHits();
} }
/** /**
@ -355,35 +455,70 @@ public class Lucene {
} }
} }
public static class ExistsCollector extends Collector { /**
* This exception is thrown when {@link org.elasticsearch.common.lucene.Lucene.EarlyTerminatingCollector}
* reaches early termination
* */
public final static class EarlyTerminationException extends ElasticsearchException {
private boolean exists; public EarlyTerminationException(String msg) {
super(msg);
}
}
/**
* A collector that terminates early by throwing {@link org.elasticsearch.common.lucene.Lucene.EarlyTerminationException}
* when count of matched documents has reached <code>maxCountHits</code>
*/
public final static class EarlyTerminatingCollector extends Collector {
private final int maxCountHits;
private final Collector delegate;
private int count = 0;
EarlyTerminatingCollector(int maxCountHits) {
this.maxCountHits = maxCountHits;
this.delegate = NOOP_COLLECTOR;
}
EarlyTerminatingCollector(final Collector delegate, int maxCountHits) {
this.maxCountHits = maxCountHits;
this.delegate = (delegate == null) ? NOOP_COLLECTOR : delegate;
}
public void reset() { public void reset() {
exists = false; count = 0;
}
public int count() {
return count;
} }
public boolean exists() { public boolean exists() {
return exists; return count > 0;
} }
@Override @Override
public void setScorer(Scorer scorer) throws IOException { public void setScorer(Scorer scorer) throws IOException {
this.exists = false; delegate.setScorer(scorer);
} }
@Override @Override
public void collect(int doc) throws IOException { public void collect(int doc) throws IOException {
exists = true; delegate.collect(doc);
if (++count >= maxCountHits) {
throw new EarlyTerminationException("early termination [CountBased]");
}
} }
@Override @Override
public void setNextReader(AtomicReaderContext context) throws IOException { public void setNextReader(AtomicReaderContext atomicReaderContext) throws IOException {
delegate.setNextReader(atomicReaderContext);
} }
@Override @Override
public boolean acceptsDocsOutOfOrder() { public boolean acceptsDocsOutOfOrder() {
return true; return delegate.acceptsDocsOutOfOrder();
} }
} }

View File

@ -485,6 +485,16 @@ public class PercolateContext extends SearchContext {
throw new UnsupportedOperationException(); throw new UnsupportedOperationException();
} }
@Override
public int terminateAfter() {
return DEFAULT_TERMINATE_AFTER;
}
@Override
public void terminateAfter(int terminateAfter) {
throw new UnsupportedOperationException();
}
@Override @Override
public SearchContext minimumScore(float minimumScore) { public SearchContext minimumScore(float minimumScore) {
throw new UnsupportedOperationException(); throw new UnsupportedOperationException();

View File

@ -446,14 +446,13 @@ public class PercolatorService extends AbstractComponent {
@Override @Override
public PercolateShardResponse doPercolate(PercolateShardRequest request, PercolateContext context, boolean isNested) { public PercolateShardResponse doPercolate(PercolateShardRequest request, PercolateContext context, boolean isNested) {
long count = 0; long count = 0;
Lucene.ExistsCollector collector = new Lucene.ExistsCollector(); Lucene.EarlyTerminatingCollector collector = Lucene.createExistsCollector();
for (Map.Entry<BytesRef, Query> entry : context.percolateQueries().entrySet()) { for (Map.Entry<BytesRef, Query> entry : context.percolateQueries().entrySet()) {
collector.reset();
try { try {
if (isNested) { if (isNested) {
context.docSearcher().search(entry.getValue(), NonNestedDocsFilter.INSTANCE, collector); Lucene.exists(context.docSearcher(), entry.getValue(), NonNestedDocsFilter.INSTANCE, collector);
} else { } else {
context.docSearcher().search(entry.getValue(), collector); Lucene.exists(context.docSearcher(), entry.getValue(), collector);
} }
} catch (Throwable e) { } catch (Throwable e) {
logger.debug("[" + entry.getKey() + "] failed to execute query", e); logger.debug("[" + entry.getKey() + "] failed to execute query", e);
@ -543,19 +542,18 @@ public class PercolatorService extends AbstractComponent {
long count = 0; long count = 0;
List<BytesRef> matches = new ArrayList<>(); List<BytesRef> matches = new ArrayList<>();
List<Map<String, HighlightField>> hls = new ArrayList<>(); List<Map<String, HighlightField>> hls = new ArrayList<>();
Lucene.ExistsCollector collector = new Lucene.ExistsCollector(); Lucene.EarlyTerminatingCollector collector = Lucene.createExistsCollector();
for (Map.Entry<BytesRef, Query> entry : context.percolateQueries().entrySet()) { for (Map.Entry<BytesRef, Query> entry : context.percolateQueries().entrySet()) {
collector.reset();
if (context.highlight() != null) { if (context.highlight() != null) {
context.parsedQuery(new ParsedQuery(entry.getValue(), ImmutableMap.<String, Filter>of())); context.parsedQuery(new ParsedQuery(entry.getValue(), ImmutableMap.<String, Filter>of()));
context.hitContext().cache().clear(); context.hitContext().cache().clear();
} }
try { try {
if (isNested) { if (isNested) {
context.docSearcher().search(entry.getValue(), NonNestedDocsFilter.INSTANCE, collector); Lucene.exists(context.docSearcher(), entry.getValue(), NonNestedDocsFilter.INSTANCE, collector);
} else { } else {
context.docSearcher().search(entry.getValue(), collector); Lucene.exists(context.docSearcher(), entry.getValue(), collector);
} }
} catch (Throwable e) { } catch (Throwable e) {
logger.debug("[" + entry.getKey() + "] failed to execute query", e); logger.debug("[" + entry.getKey() + "] failed to execute query", e);

View File

@ -58,7 +58,7 @@ abstract class QueryCollector extends Collector {
final ESLogger logger; final ESLogger logger;
boolean isNestedDoc = false; boolean isNestedDoc = false;
final Lucene.ExistsCollector collector = new Lucene.ExistsCollector(); final Lucene.EarlyTerminatingCollector collector = Lucene.createExistsCollector();
BytesRef current; BytesRef current;
SortedBinaryDocValues values; SortedBinaryDocValues values;
@ -200,16 +200,15 @@ abstract class QueryCollector extends Collector {
} }
// run the query // run the query
try { try {
collector.reset();
if (context.highlight() != null) { if (context.highlight() != null) {
context.parsedQuery(new ParsedQuery(query, ImmutableMap.<String, Filter>of())); context.parsedQuery(new ParsedQuery(query, ImmutableMap.<String, Filter>of()));
context.hitContext().cache().clear(); context.hitContext().cache().clear();
} }
if (isNestedDoc) { if (isNestedDoc) {
searcher.search(query, NonNestedDocsFilter.INSTANCE, collector); Lucene.exists(searcher, query, NonNestedDocsFilter.INSTANCE, collector);
} else { } else {
searcher.search(query, collector); Lucene.exists(searcher, query, collector);
} }
if (collector.exists()) { if (collector.exists()) {
if (!limit || counter < size) { if (!limit || counter < size) {
@ -259,11 +258,10 @@ abstract class QueryCollector extends Collector {
} }
// run the query // run the query
try { try {
collector.reset();
if (isNestedDoc) { if (isNestedDoc) {
searcher.search(query, NonNestedDocsFilter.INSTANCE, collector); Lucene.exists(searcher, query, NonNestedDocsFilter.INSTANCE, collector);
} else { } else {
searcher.search(query, collector); Lucene.exists(searcher, query, collector);
} }
if (collector.exists()) { if (collector.exists()) {
topDocsCollector.collect(doc); topDocsCollector.collect(doc);
@ -323,15 +321,14 @@ abstract class QueryCollector extends Collector {
} }
// run the query // run the query
try { try {
collector.reset();
if (context.highlight() != null) { if (context.highlight() != null) {
context.parsedQuery(new ParsedQuery(query, ImmutableMap.<String, Filter>of())); context.parsedQuery(new ParsedQuery(query, ImmutableMap.<String, Filter>of()));
context.hitContext().cache().clear(); context.hitContext().cache().clear();
} }
if (isNestedDoc) { if (isNestedDoc) {
searcher.search(query, NonNestedDocsFilter.INSTANCE, collector); Lucene.exists(searcher, query, NonNestedDocsFilter.INSTANCE, collector);
} else { } else {
searcher.search(query, collector); Lucene.exists(searcher, query, collector);
} }
if (collector.exists()) { if (collector.exists()) {
if (!limit || counter < size) { if (!limit || counter < size) {
@ -389,11 +386,10 @@ abstract class QueryCollector extends Collector {
} }
// run the query // run the query
try { try {
collector.reset();
if (isNestedDoc) { if (isNestedDoc) {
searcher.search(query, NonNestedDocsFilter.INSTANCE, collector); Lucene.exists(searcher, query, NonNestedDocsFilter.INSTANCE, collector);
} else { } else {
searcher.search(query, collector); Lucene.exists(searcher, query, collector);
} }
if (collector.exists()) { if (collector.exists()) {
counter++; counter++;

View File

@ -19,6 +19,7 @@
package org.elasticsearch.rest.action.count; package org.elasticsearch.rest.action.count;
import org.elasticsearch.ElasticsearchIllegalArgumentException;
import org.elasticsearch.action.count.CountRequest; import org.elasticsearch.action.count.CountRequest;
import org.elasticsearch.action.count.CountResponse; import org.elasticsearch.action.count.CountResponse;
import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.action.support.IndicesOptions;
@ -33,6 +34,7 @@ import org.elasticsearch.rest.action.support.RestActions;
import org.elasticsearch.rest.action.support.RestBuilderListener; import org.elasticsearch.rest.action.support.RestBuilderListener;
import static org.elasticsearch.action.count.CountRequest.DEFAULT_MIN_SCORE; import static org.elasticsearch.action.count.CountRequest.DEFAULT_MIN_SCORE;
import static org.elasticsearch.search.internal.SearchContext.DEFAULT_TERMINATE_AFTER;
import static org.elasticsearch.rest.RestRequest.Method.GET; import static org.elasticsearch.rest.RestRequest.Method.GET;
import static org.elasticsearch.rest.RestRequest.Method.POST; import static org.elasticsearch.rest.RestRequest.Method.POST;
import static org.elasticsearch.rest.action.support.RestActions.buildBroadcastShardsHeader; import static org.elasticsearch.rest.action.support.RestActions.buildBroadcastShardsHeader;
@ -76,12 +78,20 @@ public class RestCountAction extends BaseRestHandler {
countRequest.types(Strings.splitStringByCommaToArray(request.param("type"))); countRequest.types(Strings.splitStringByCommaToArray(request.param("type")));
countRequest.preference(request.param("preference")); countRequest.preference(request.param("preference"));
final int terminateAfter = request.paramAsInt("terminate_after", DEFAULT_TERMINATE_AFTER);
if (terminateAfter < 0) {
throw new ElasticsearchIllegalArgumentException("terminateAfter must be > 0");
} else if (terminateAfter > 0) {
countRequest.terminateAfter(terminateAfter);
}
client.count(countRequest, new RestBuilderListener<CountResponse>(channel) { client.count(countRequest, new RestBuilderListener<CountResponse>(channel) {
@Override @Override
public RestResponse buildResponse(CountResponse response, XContentBuilder builder) throws Exception { public RestResponse buildResponse(CountResponse response, XContentBuilder builder) throws Exception {
builder.startObject(); builder.startObject();
if (terminateAfter != DEFAULT_TERMINATE_AFTER) {
builder.field("terminated_early", response.terminatedEarly());
}
builder.field("count", response.getCount()); builder.field("count", response.getCount());
buildBroadcastShardsHeader(builder, response); buildBroadcastShardsHeader(builder, response);
builder.endObject(); builder.endObject();

View File

@ -37,6 +37,7 @@ import org.elasticsearch.rest.action.support.RestStatusToXContentListener;
import org.elasticsearch.search.Scroll; import org.elasticsearch.search.Scroll;
import org.elasticsearch.search.builder.SearchSourceBuilder; import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.fetch.source.FetchSourceContext; import org.elasticsearch.search.fetch.source.FetchSourceContext;
import org.elasticsearch.search.internal.SearchContext;
import org.elasticsearch.search.sort.SortOrder; import org.elasticsearch.search.sort.SortOrder;
import static org.elasticsearch.common.unit.TimeValue.parseTimeValue; import static org.elasticsearch.common.unit.TimeValue.parseTimeValue;
@ -172,6 +173,18 @@ public class RestSearchAction extends BaseRestHandler {
} }
searchSourceBuilder.timeout(request.paramAsTime("timeout", null)); searchSourceBuilder.timeout(request.paramAsTime("timeout", null));
} }
if (request.hasParam("terminate_after")) {
if (searchSourceBuilder == null) {
searchSourceBuilder = new SearchSourceBuilder();
}
int terminateAfter = request.paramAsInt("terminate_after",
SearchContext.DEFAULT_TERMINATE_AFTER);
if (terminateAfter < 0) {
throw new ElasticsearchIllegalArgumentException("terminateAfter must be > 0");
} else if (terminateAfter > 0) {
searchSourceBuilder.terminateAfter(terminateAfter);
}
}
String sField = request.param("fields"); String sField = request.param("fields");
if (sField != null) { if (sField != null) {

View File

@ -367,6 +367,16 @@ public class TopHitsContext extends SearchContext {
throw new UnsupportedOperationException("Not supported"); throw new UnsupportedOperationException("Not supported");
} }
@Override
public int terminateAfter() {
return context.terminateAfter();
}
@Override
public void terminateAfter(int terminateAfter) {
throw new UnsupportedOperationException("Not supported");
}
@Override @Override
public SearchContext minimumScore(float minimumScore) { public SearchContext minimumScore(float minimumScore) {
throw new UnsupportedOperationException("Not supported"); throw new UnsupportedOperationException("Not supported");

View File

@ -24,6 +24,7 @@ import com.google.common.base.Charsets;
import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import org.elasticsearch.ElasticsearchGenerationException; import org.elasticsearch.ElasticsearchGenerationException;
import org.elasticsearch.ElasticsearchIllegalArgumentException;
import org.elasticsearch.client.Requests; import org.elasticsearch.client.Requests;
import org.elasticsearch.common.Nullable; import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Strings; import org.elasticsearch.common.Strings;
@ -40,6 +41,7 @@ import org.elasticsearch.search.aggregations.AbstractAggregationBuilder;
import org.elasticsearch.search.facet.FacetBuilder; import org.elasticsearch.search.facet.FacetBuilder;
import org.elasticsearch.search.fetch.source.FetchSourceContext; import org.elasticsearch.search.fetch.source.FetchSourceContext;
import org.elasticsearch.search.highlight.HighlightBuilder; import org.elasticsearch.search.highlight.HighlightBuilder;
import org.elasticsearch.search.internal.SearchContext;
import org.elasticsearch.search.rescore.RescoreBuilder; import org.elasticsearch.search.rescore.RescoreBuilder;
import org.elasticsearch.search.sort.SortBuilder; import org.elasticsearch.search.sort.SortBuilder;
import org.elasticsearch.search.sort.SortBuilders; import org.elasticsearch.search.sort.SortBuilders;
@ -97,6 +99,7 @@ public class SearchSourceBuilder implements ToXContent {
private Float minScore; private Float minScore;
private long timeoutInMillis = -1; private long timeoutInMillis = -1;
private int terminateAfter = SearchContext.DEFAULT_TERMINATE_AFTER;
private List<String> fieldNames; private List<String> fieldNames;
private List<String> fieldDataFields; private List<String> fieldDataFields;
@ -308,6 +311,17 @@ public class SearchSourceBuilder implements ToXContent {
return this; return this;
} }
/**
* An optional terminate_after to terminate the search after
* collecting <code>terminateAfter</code> documents
*/
public SearchSourceBuilder terminateAfter(int terminateAfter) {
if (terminateAfter <= 0) {
throw new ElasticsearchIllegalArgumentException("terminateAfter must be > 0");
}
this.terminateAfter = terminateAfter;
return this;
}
/** /**
* Adds a sort against the given field name and the sort ordering. * Adds a sort against the given field name and the sort ordering.
* *
@ -738,6 +752,10 @@ public class SearchSourceBuilder implements ToXContent {
builder.field("timeout", timeoutInMillis); builder.field("timeout", timeoutInMillis);
} }
if (terminateAfter != SearchContext.DEFAULT_TERMINATE_AFTER) {
builder.field("terminate_after", terminateAfter);
}
if (queryBuilder != null) { if (queryBuilder != null) {
builder.field("query"); builder.field("query");
queryBuilder.toXContent(builder, params); queryBuilder.toXContent(builder, params);

View File

@ -318,11 +318,19 @@ public class SearchPhaseController extends AbstractComponent {
long totalHits = 0; long totalHits = 0;
float maxScore = Float.NEGATIVE_INFINITY; float maxScore = Float.NEGATIVE_INFINITY;
boolean timedOut = false; boolean timedOut = false;
Boolean terminatedEarly = null;
for (AtomicArray.Entry<? extends QuerySearchResultProvider> entry : queryResults) { for (AtomicArray.Entry<? extends QuerySearchResultProvider> entry : queryResults) {
QuerySearchResult result = entry.value.queryResult(); QuerySearchResult result = entry.value.queryResult();
if (result.searchTimedOut()) { if (result.searchTimedOut()) {
timedOut = true; timedOut = true;
} }
if (result.terminatedEarly() != null) {
if (terminatedEarly == null) {
terminatedEarly = result.terminatedEarly();
} else if (result.terminatedEarly()) {
terminatedEarly = true;
}
}
totalHits += result.topDocs().totalHits; totalHits += result.topDocs().totalHits;
if (!Float.isNaN(result.topDocs().getMaxScore())) { if (!Float.isNaN(result.topDocs().getMaxScore())) {
maxScore = Math.max(maxScore, result.topDocs().getMaxScore()); maxScore = Math.max(maxScore, result.topDocs().getMaxScore());
@ -397,7 +405,7 @@ public class SearchPhaseController extends AbstractComponent {
InternalSearchHits searchHits = new InternalSearchHits(hits.toArray(new InternalSearchHit[hits.size()]), totalHits, maxScore); InternalSearchHits searchHits = new InternalSearchHits(hits.toArray(new InternalSearchHit[hits.size()]), totalHits, maxScore);
return new InternalSearchResponse(searchHits, facets, aggregations, suggest, timedOut); return new InternalSearchResponse(searchHits, facets, aggregations, suggest, timedOut, terminatedEarly);
} }
} }

View File

@ -23,6 +23,7 @@ import org.apache.lucene.index.AtomicReaderContext;
import org.apache.lucene.search.*; import org.apache.lucene.search.*;
import org.elasticsearch.common.lease.Releasable; import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.lease.Releasables; import org.elasticsearch.common.lease.Releasables;
import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.common.lucene.MinimumScoreCollector; import org.elasticsearch.common.lucene.MinimumScoreCollector;
import org.elasticsearch.common.lucene.MultiCollector; import org.elasticsearch.common.lucene.MultiCollector;
import org.elasticsearch.common.lucene.search.FilteredCollector; import org.elasticsearch.common.lucene.search.FilteredCollector;
@ -138,9 +139,17 @@ public class ContextIndexSearcher extends IndexSearcher implements Releasable {
@Override @Override
public void search(List<AtomicReaderContext> leaves, Weight weight, Collector collector) throws IOException { public void search(List<AtomicReaderContext> leaves, Weight weight, Collector collector) throws IOException {
if (searchContext.timeoutInMillis() != -1) { final boolean timeoutSet = searchContext.timeoutInMillis() != -1;
final boolean terminateAfterSet = searchContext.terminateAfter() != SearchContext.DEFAULT_TERMINATE_AFTER;
if (timeoutSet) {
// TODO: change to use our own counter that uses the scheduler in ThreadPool // TODO: change to use our own counter that uses the scheduler in ThreadPool
collector = new TimeLimitingCollector(collector, TimeLimitingCollector.getGlobalCounter(), searchContext.timeoutInMillis()); // throws TimeLimitingCollector.TimeExceededException when timeout has reached
collector = Lucene.wrapTimeLimitingCollector(collector, searchContext.timeoutInMillis());
}
if (terminateAfterSet) {
// throws Lucene.EarlyTerminationException when given count is reached
collector = Lucene.wrapCountBasedEarlyTerminatingCollector(collector, searchContext.terminateAfter());
} }
if (currentState == Stage.MAIN_QUERY) { if (currentState == Stage.MAIN_QUERY) {
if (enableMainDocIdSetCollector) { if (enableMainDocIdSetCollector) {
@ -165,11 +174,18 @@ public class ContextIndexSearcher extends IndexSearcher implements Releasable {
// we only compute the doc id set once since within a context, we execute the same query always... // we only compute the doc id set once since within a context, we execute the same query always...
try { try {
if (searchContext.timeoutInMillis() != -1) { if (timeoutSet || terminateAfterSet) {
try { try {
super.search(leaves, weight, collector); super.search(leaves, weight, collector);
} catch (TimeLimitingCollector.TimeExceededException e) { } catch (TimeLimitingCollector.TimeExceededException e) {
assert timeoutSet : "TimeExceededException thrown even though timeout wasn't set";
searchContext.queryResult().searchTimedOut(true); searchContext.queryResult().searchTimedOut(true);
} catch (Lucene.EarlyTerminationException e) {
assert terminateAfterSet : "EarlyTerminationException thrown even though terminateAfter wasn't set";
searchContext.queryResult().terminatedEarly(true);
}
if (terminateAfterSet && searchContext.queryResult().terminatedEarly() == null) {
searchContext.queryResult().terminatedEarly(false);
} }
} else { } else {
super.search(leaves, weight, collector); super.search(leaves, weight, collector);
@ -205,4 +221,4 @@ public class ContextIndexSearcher extends IndexSearcher implements Releasable {
searchContext.clearReleasables(Lifetime.COLLECTION); searchContext.clearReleasables(Lifetime.COLLECTION);
} }
} }
} }

View File

@ -117,6 +117,9 @@ public class DefaultSearchContext extends SearchContext {
// timeout in millis // timeout in millis
private long timeoutInMillis = -1; private long timeoutInMillis = -1;
// terminate after count
private int terminateAfter = DEFAULT_TERMINATE_AFTER;
private List<String> groupStats; private List<String> groupStats;
@ -471,6 +474,16 @@ public class DefaultSearchContext extends SearchContext {
this.timeoutInMillis = timeoutInMillis; this.timeoutInMillis = timeoutInMillis;
} }
@Override
public int terminateAfter() {
return terminateAfter;
}
@Override
public void terminateAfter(int terminateAfter) {
this.terminateAfter = terminateAfter;
}
public SearchContext minimumScore(float minimumScore) { public SearchContext minimumScore(float minimumScore) {
this.minimumScore = minimumScore; this.minimumScore = minimumScore;
return this; return this;

View File

@ -19,6 +19,7 @@
package org.elasticsearch.search.internal; package org.elasticsearch.search.internal;
import org.elasticsearch.Version;
import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Streamable; import org.elasticsearch.common.io.stream.Streamable;
@ -41,7 +42,7 @@ import static org.elasticsearch.search.internal.InternalSearchHits.readSearchHit
public class InternalSearchResponse implements Streamable, ToXContent { public class InternalSearchResponse implements Streamable, ToXContent {
public static InternalSearchResponse empty() { public static InternalSearchResponse empty() {
return new InternalSearchResponse(InternalSearchHits.empty(), null, null, null, false); return new InternalSearchResponse(InternalSearchHits.empty(), null, null, null, false, null);
} }
private InternalSearchHits hits; private InternalSearchHits hits;
@ -54,21 +55,28 @@ public class InternalSearchResponse implements Streamable, ToXContent {
private boolean timedOut; private boolean timedOut;
private Boolean terminatedEarly = null;
private InternalSearchResponse() { private InternalSearchResponse() {
} }
public InternalSearchResponse(InternalSearchHits hits, InternalFacets facets, InternalAggregations aggregations, Suggest suggest, boolean timedOut) { public InternalSearchResponse(InternalSearchHits hits, InternalFacets facets, InternalAggregations aggregations, Suggest suggest, boolean timedOut, Boolean terminatedEarly) {
this.hits = hits; this.hits = hits;
this.facets = facets; this.facets = facets;
this.aggregations = aggregations; this.aggregations = aggregations;
this.suggest = suggest; this.suggest = suggest;
this.timedOut = timedOut; this.timedOut = timedOut;
this.terminatedEarly = terminatedEarly;
} }
public boolean timedOut() { public boolean timedOut() {
return this.timedOut; return this.timedOut;
} }
public Boolean terminatedEarly() {
return this.terminatedEarly;
}
public SearchHits hits() { public SearchHits hits() {
return hits; return hits;
} }
@ -119,6 +127,10 @@ public class InternalSearchResponse implements Streamable, ToXContent {
suggest = Suggest.readSuggest(Suggest.Fields.SUGGEST, in); suggest = Suggest.readSuggest(Suggest.Fields.SUGGEST, in);
} }
timedOut = in.readBoolean(); timedOut = in.readBoolean();
if (in.getVersion().onOrAfter(Version.V_1_4_0)) {
terminatedEarly = in.readOptionalBoolean();
}
} }
@Override @Override
@ -143,5 +155,10 @@ public class InternalSearchResponse implements Streamable, ToXContent {
suggest.writeTo(out); suggest.writeTo(out);
} }
out.writeBoolean(timedOut); out.writeBoolean(timedOut);
if (out.getVersion().onOrAfter(Version.V_1_4_0)) {
out.writeOptionalBoolean(terminatedEarly);
}
} }
} }

View File

@ -72,6 +72,7 @@ import java.util.List;
public abstract class SearchContext implements Releasable { public abstract class SearchContext implements Releasable {
private static ThreadLocal<SearchContext> current = new ThreadLocal<>(); private static ThreadLocal<SearchContext> current = new ThreadLocal<>();
public final static int DEFAULT_TERMINATE_AFTER = 0;
public static void setCurrent(SearchContext value) { public static void setCurrent(SearchContext value) {
current.set(value); current.set(value);
@ -212,6 +213,10 @@ public abstract class SearchContext implements Releasable {
public abstract void timeoutInMillis(long timeoutInMillis); public abstract void timeoutInMillis(long timeoutInMillis);
public abstract int terminateAfter();
public abstract void terminateAfter(int terminateAfter);
public abstract SearchContext minimumScore(float minimumScore); public abstract SearchContext minimumScore(float minimumScore);
public abstract Float minimumScore(); public abstract Float minimumScore();

View File

@ -79,6 +79,7 @@ public class QueryPhase implements SearchPhase {
.put("min_score", new MinScoreParseElement()) .put("min_score", new MinScoreParseElement())
.put("minScore", new MinScoreParseElement()) .put("minScore", new MinScoreParseElement())
.put("timeout", new TimeoutParseElement()) .put("timeout", new TimeoutParseElement())
.put("terminate_after", new TerminateAfterParseElement())
.putAll(facetPhase.parseElements()) .putAll(facetPhase.parseElements())
.putAll(aggregationPhase.parseElements()) .putAll(aggregationPhase.parseElements())
.putAll(suggestPhase.parseElements()) .putAll(suggestPhase.parseElements())

View File

@ -20,6 +20,7 @@
package org.elasticsearch.search.query; package org.elasticsearch.search.query;
import org.apache.lucene.search.TopDocs; import org.apache.lucene.search.TopDocs;
import org.elasticsearch.Version;
import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.search.SearchShardTarget; import org.elasticsearch.search.SearchShardTarget;
@ -49,6 +50,7 @@ public class QuerySearchResult extends TransportResponse implements QuerySearchR
private InternalAggregations aggregations; private InternalAggregations aggregations;
private Suggest suggest; private Suggest suggest;
private boolean searchTimedOut; private boolean searchTimedOut;
private Boolean terminatedEarly = null;
public QuerySearchResult() { public QuerySearchResult() {
@ -90,6 +92,14 @@ public class QuerySearchResult extends TransportResponse implements QuerySearchR
return searchTimedOut; return searchTimedOut;
} }
public void terminatedEarly(boolean terminatedEarly) {
this.terminatedEarly = terminatedEarly;
}
public Boolean terminatedEarly() {
return this.terminatedEarly;
}
public TopDocs topDocs() { public TopDocs topDocs() {
return topDocs; return topDocs;
} }
@ -164,6 +174,9 @@ public class QuerySearchResult extends TransportResponse implements QuerySearchR
suggest = Suggest.readSuggest(Suggest.Fields.SUGGEST, in); suggest = Suggest.readSuggest(Suggest.Fields.SUGGEST, in);
} }
searchTimedOut = in.readBoolean(); searchTimedOut = in.readBoolean();
if (in.getVersion().onOrAfter(Version.V_1_4_0)) {
terminatedEarly = in.readOptionalBoolean();
}
} }
@Override @Override
@ -193,5 +206,8 @@ public class QuerySearchResult extends TransportResponse implements QuerySearchR
suggest.writeTo(out); suggest.writeTo(out);
} }
out.writeBoolean(searchTimedOut); out.writeBoolean(searchTimedOut);
if (out.getVersion().onOrAfter(Version.V_1_4_0)) {
out.writeOptionalBoolean(terminatedEarly);
}
} }
} }

View File

@ -0,0 +1,43 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.search.query;
import org.elasticsearch.ElasticsearchIllegalArgumentException;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.search.SearchParseElement;
import org.elasticsearch.search.internal.SearchContext;
/**
* Parser element for 'terminate_after'
*/
public class TerminateAfterParseElement implements SearchParseElement {
@Override
public void parse(XContentParser parser, SearchContext context) throws Exception {
XContentParser.Token token = parser.currentToken();
if (token == XContentParser.Token.VALUE_NUMBER) {
int terminateAfterCount = parser.intValue();
if (terminateAfterCount <= 0) {
throw new ElasticsearchIllegalArgumentException("terminateAfter must be > 0");
}
context.terminateAfter(parser.intValue());
}
}
}

View File

@ -169,12 +169,14 @@ public final class PhraseSuggester extends Suggester<PhraseSuggestionContext> {
req = client.prepareSearch() req = client.prepareSearch()
.setPreference(suggestions.getPreference()) .setPreference(suggestions.getPreference())
.setQuery(QueryBuilders.constantScoreQuery(FilterBuilders.bytesFilter(querySource))) .setQuery(QueryBuilders.constantScoreQuery(FilterBuilders.bytesFilter(querySource)))
.setSearchType(SearchType.COUNT); .setSearchType(SearchType.COUNT)
.setTerminateAfter(1);
} else { } else {
req = client.prepareSearch() req = client.prepareSearch()
.setPreference(suggestions.getPreference()) .setPreference(suggestions.getPreference())
.setQuery(querySource) .setQuery(querySource)
.setSearchType(SearchType.COUNT); .setSearchType(SearchType.COUNT)
.setTerminateAfter(1);
} }
multiSearchRequestBuilder.add(req); multiSearchRequestBuilder.add(req);
} }

View File

@ -20,13 +20,18 @@
package org.elasticsearch.count.simple; package org.elasticsearch.count.simple;
import org.elasticsearch.action.count.CountResponse; import org.elasticsearch.action.count.CountResponse;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.test.ElasticsearchIntegrationTest; import org.elasticsearch.test.ElasticsearchIntegrationTest;
import org.junit.Test; import org.junit.Test;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_REPLICAS;
import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_SHARDS;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.elasticsearch.index.query.QueryBuilders.boolQuery; import static org.elasticsearch.index.query.QueryBuilders.boolQuery;
import static org.elasticsearch.index.query.QueryBuilders.rangeQuery; import static org.elasticsearch.index.query.QueryBuilders.rangeQuery;
@ -113,6 +118,45 @@ public class SimpleCountTests extends ElasticsearchIntegrationTest {
assertHitCount(countResponse, 2l); assertHitCount(countResponse, 2l);
} }
@Test
public void simpleCountEarlyTerminationTests() throws Exception {
// set up one shard only to test early termination
prepareCreate("test").setSettings(
SETTING_NUMBER_OF_SHARDS, 1,
SETTING_NUMBER_OF_REPLICAS, 0).get();
ensureGreen();
int max = randomIntBetween(3, 29);
List<IndexRequestBuilder> docbuilders = new ArrayList<>(max);
for (int i = 1; i <= max; i++) {
String id = String.valueOf(i);
docbuilders.add(client().prepareIndex("test", "type1", id).setSource("field", "2010-01-"+ id +"T02:00"));
}
indexRandom(true, docbuilders);
ensureGreen();
refresh();
String upperBound = "2010-01-" + String.valueOf(max+1) + "||+2d";
String lowerBound = "2009-12-01||+2d";
// sanity check
CountResponse countResponse = client().prepareCount("test").setQuery(QueryBuilders.rangeQuery("field").gte(lowerBound).lte(upperBound)).execute().actionGet();
assertHitCount(countResponse, max);
// threshold <= actual count
for (int i = 1; i <= max; i++) {
countResponse = client().prepareCount("test").setQuery(QueryBuilders.rangeQuery("field").gte(lowerBound).lte(upperBound)).setTerminateAfter(i).execute().actionGet();
assertHitCount(countResponse, i);
assertTrue(countResponse.terminatedEarly());
}
// threshold > actual count
countResponse = client().prepareCount("test").setQuery(QueryBuilders.rangeQuery("field").gte(lowerBound).lte(upperBound)).setTerminateAfter(max + randomIntBetween(1, max)).execute().actionGet();
assertHitCount(countResponse, max);
assertFalse(countResponse.terminatedEarly());
}
@Test @Test
public void localDependentDateTests() throws Exception { public void localDependentDateTests() throws Exception {
assertAcked(prepareCreate("test") assertAcked(prepareCreate("test")

View File

@ -20,6 +20,7 @@
package org.elasticsearch.search.simple; package org.elasticsearch.search.simple;
import org.elasticsearch.ElasticsearchIllegalArgumentException; import org.elasticsearch.ElasticsearchIllegalArgumentException;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.common.settings.ImmutableSettings; import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.common.xcontent.XContentFactory;
@ -28,8 +29,12 @@ import org.elasticsearch.test.ElasticsearchIntegrationTest;
import org.elasticsearch.test.junit.annotations.TestLogging; import org.elasticsearch.test.junit.annotations.TestLogging;
import org.junit.Test; import org.junit.Test;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_REPLICAS;
import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_SHARDS;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.elasticsearch.index.query.QueryBuilders.boolQuery; import static org.elasticsearch.index.query.QueryBuilders.boolQuery;
import static org.elasticsearch.index.query.QueryBuilders.rangeQuery; import static org.elasticsearch.index.query.QueryBuilders.rangeQuery;
@ -200,4 +205,43 @@ public class SimpleSearchTests extends ElasticsearchIntegrationTest {
} }
} }
@Test
public void simpleTerminateAfterCountTests() throws Exception {
prepareCreate("test").setSettings(
SETTING_NUMBER_OF_SHARDS, 1,
SETTING_NUMBER_OF_REPLICAS, 0).get();
ensureGreen();
int max = randomIntBetween(3, 29);
List<IndexRequestBuilder> docbuilders = new ArrayList<>(max);
for (int i = 1; i <= max; i++) {
String id = String.valueOf(i);
docbuilders.add(client().prepareIndex("test", "type1", id).setSource("field", "2010-01-"+ id +"T02:00"));
}
indexRandom(true, docbuilders);
ensureGreen();
refresh();
String upperBound = "2010-01-" + String.valueOf(max+1) + "||+2d";
String lowerBound = "2009-12-01||+2d";
SearchResponse searchResponse;
for (int i = 1; i <= max; i++) {
searchResponse = client().prepareSearch("test")
.setQuery(QueryBuilders.rangeQuery("field").gte(lowerBound).lte(upperBound))
.setTerminateAfter(i).execute().actionGet();
assertHitCount(searchResponse, (long)i);
assertTrue(searchResponse.isTerminatedEarly());
}
searchResponse = client().prepareSearch("test")
.setQuery(QueryBuilders.rangeQuery("field").gte(lowerBound).lte(upperBound))
.setTerminateAfter(2 * max).execute().actionGet();
assertHitCount(searchResponse, max);
assertFalse(searchResponse.isTerminatedEarly());
}
} }

View File

@ -76,6 +76,7 @@ public class TestSearchContext extends SearchContext {
ContextIndexSearcher searcher; ContextIndexSearcher searcher;
int size; int size;
private int terminateAfter = DEFAULT_TERMINATE_AFTER;
public TestSearchContext(ThreadPool threadPool, CacheRecycler cacheRecycler, PageCacheRecycler pageCacheRecycler, BigArrays bigArrays, IndexService indexService, FilterCache filterCache, IndexFieldDataService indexFieldDataService) { public TestSearchContext(ThreadPool threadPool, CacheRecycler cacheRecycler, PageCacheRecycler pageCacheRecycler, BigArrays bigArrays, IndexService indexService, FilterCache filterCache, IndexFieldDataService indexFieldDataService) {
this.cacheRecycler = cacheRecycler; this.cacheRecycler = cacheRecycler;
@ -351,6 +352,16 @@ public class TestSearchContext extends SearchContext {
public void timeoutInMillis(long timeoutInMillis) { public void timeoutInMillis(long timeoutInMillis) {
} }
@Override
public int terminateAfter() {
return terminateAfter;
}
@Override
public void terminateAfter(int terminateAfter) {
this.terminateAfter = terminateAfter;
}
@Override @Override
public SearchContext minimumScore(float minimumScore) { public SearchContext minimumScore(float minimumScore) {
return null; return null;