Allow to provide timeout parameter in request body (as well as URI parameter), closes #1604.

This commit is contained in:
Shay Banon 2012-01-12 14:19:21 +02:00
parent a380e0e169
commit 04a138db5d
11 changed files with 157 additions and 67 deletions

View File

@ -44,7 +44,6 @@ import java.util.Arrays;
import java.util.Map;
import static org.elasticsearch.action.Actions.addValidationError;
import static org.elasticsearch.common.unit.TimeValue.readTimeValue;
import static org.elasticsearch.search.Scroll.readScroll;
/**
@ -89,8 +88,6 @@ public class SearchRequest implements ActionRequest {
private String[] types = Strings.EMPTY_ARRAY;
private TimeValue timeout;
private boolean listenerThreaded = false;
private SearchOperationThreading operationThreading = SearchOperationThreading.THREAD_PER_SHARD;
@ -501,28 +498,6 @@ public class SearchRequest implements ActionRequest {
return scroll(new Scroll(TimeValue.parseTimeValue(keepAlive, null)));
}
/**
* An optional timeout to control how long search is allowed to take.
*/
public TimeValue timeout() {
return timeout;
}
/**
* An optional timeout to control how long search is allowed to take.
*/
public SearchRequest timeout(TimeValue timeout) {
this.timeout = timeout;
return this;
}
/**
* An optional timeout to control how long search is allowed to take.
*/
public SearchRequest timeout(String timeout) {
return timeout(TimeValue.parseTimeValue(timeout, null));
}
@Override
public void readFrom(StreamInput in) throws IOException {
operationThreading = SearchOperationThreading.fromId(in.readByte());
@ -546,9 +521,6 @@ public class SearchRequest implements ActionRequest {
if (in.readBoolean()) {
scroll = readScroll(in);
}
if (in.readBoolean()) {
timeout = readTimeValue(in);
}
BytesHolder bytes = in.readBytesReference();
sourceUnsafe = false;
@ -606,12 +578,6 @@ public class SearchRequest implements ActionRequest {
out.writeBoolean(true);
scroll.writeTo(out);
}
if (timeout == null) {
out.writeBoolean(false);
} else {
out.writeBoolean(true);
timeout.writeTo(out);
}
out.writeBytesHolder(source, sourceOffset, sourceLength);
out.writeBytesHolder(extraSource, extraSourceOffset, extraSourceLength);
out.writeVInt(types.length);

View File

@ -66,7 +66,6 @@ public abstract class TransportSearchHelper {
internalRequest.source(request.source(), request.sourceOffset(), request.sourceLength());
internalRequest.extraSource(request.extraSource(), request.extraSourceOffset(), request.extraSourceLength());
internalRequest.scroll(request.scroll());
internalRequest.timeout(request.timeout());
internalRequest.filteringAliases(filteringAliases);
internalRequest.types(request.types());
internalRequest.nowInMillis(nowInMillis);

View File

@ -115,7 +115,7 @@ public class SearchRequestBuilder extends BaseRequestBuilder<SearchRequest, Sear
* An optional timeout to control how long search is allowed to take.
*/
public SearchRequestBuilder setTimeout(TimeValue timeout) {
request.timeout(timeout);
sourceBuilder().timeout(timeout);
return this;
}
@ -123,7 +123,7 @@ public class SearchRequestBuilder extends BaseRequestBuilder<SearchRequest, Sear
* An optional timeout to control how long search is allowed to take.
*/
public SearchRequestBuilder setTimeout(String timeout) {
request.timeout(timeout);
sourceBuilder().timeout(timeout);
return this;
}

View File

@ -137,7 +137,6 @@ public class RestSearchAction extends BaseRestHandler {
searchRequest.scroll(new Scroll(parseTimeValue(scroll, null)));
}
searchRequest.timeout(request.paramAsTime("timeout", null));
searchRequest.types(RestActions.splitTypes(request.param("type")));
searchRequest.queryHint(request.param("query_hint"));
searchRequest.routing(request.param("routing"));
@ -198,6 +197,12 @@ public class RestSearchAction extends BaseRestHandler {
}
searchSourceBuilder.version(request.paramAsBooleanOptional("version", null));
}
if (request.hasParam("timeout")) {
if (searchSourceBuilder == null) {
searchSourceBuilder = new SearchSourceBuilder();
}
searchSourceBuilder.timeout(request.paramAsTime("timeout", null));
}
String sField = request.param("fields");
if (sField != null) {

View File

@ -28,6 +28,7 @@ import org.elasticsearch.client.Requests;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Unicode;
import org.elasticsearch.common.io.BytesStream;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
@ -93,6 +94,8 @@ public class SearchSourceBuilder implements ToXContent {
private Float minScore;
private long timeoutInMillis = -1;
private List<String> fieldNames;
private List<ScriptField> scriptFields;
private List<PartialField> partialFields;
@ -276,6 +279,22 @@ public class SearchSourceBuilder implements ToXContent {
return this;
}
/**
* An optional timeout to control how long search is allowed to take.
*/
public SearchSourceBuilder timeout(TimeValue timeout) {
this.timeoutInMillis = timeout.millis();
return this;
}
/**
* An optional timeout to control how long search is allowed to take.
*/
public SearchSourceBuilder timeout(String timeout) {
this.timeoutInMillis = TimeValue.parseTimeValue(timeout, null).millis();
return this;
}
/**
* Adds a sort against the given field name and the sort ordering.
*
@ -564,6 +583,10 @@ public class SearchSourceBuilder implements ToXContent {
builder.field("size", size);
}
if (timeoutInMillis != -1) {
builder.field("timeout", timeoutInMillis);
}
if (queryBuilder != null) {
builder.field("query");
queryBuilder.toXContent(builder, params);

View File

@ -158,9 +158,9 @@ public class ContextIndexSearcher extends ExtendedIndexSearcher {
// since that is where the filter should only work
collector = new FilteredCollector(collector, searchContext.parsedFilter());
}
if (searchContext.timeout() != null) {
if (searchContext.timeoutInMillis() != -1) {
// TODO: change to use our own counter that uses the scheduler in ThreadPool
collector = new TimeLimitingCollector(collector, TimeLimitingCollector.getGlobalCounter(), searchContext.timeout().millis());
collector = new TimeLimitingCollector(collector, TimeLimitingCollector.getGlobalCounter(), searchContext.timeoutInMillis());
}
if (scopeCollectors != null) {
List<Collector> collectors = scopeCollectors.get(processingScope);
@ -185,8 +185,7 @@ public class ContextIndexSearcher extends ExtendedIndexSearcher {
}
// we only compute the doc id set once since within a context, we execute the same query always...
if (searchContext.timeout() != null) {
searchContext.queryResult().searchTimedOut(false);
if (searchContext.timeoutInMillis() != -1) {
try {
super.search(weight, combinedFilter, collector);
} catch (TimeLimitingCollector.TimeExceededException e) {

View File

@ -26,12 +26,10 @@ import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Streamable;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.search.Scroll;
import java.io.IOException;
import static org.elasticsearch.common.unit.TimeValue.readTimeValue;
import static org.elasticsearch.search.Scroll.readScroll;
/**
@ -63,8 +61,6 @@ public class InternalSearchRequest implements Streamable {
private Scroll scroll;
private TimeValue timeout;
private String[] types = Strings.EMPTY_ARRAY;
private String[] filteringAliases;
@ -169,15 +165,6 @@ public class InternalSearchRequest implements Streamable {
return this;
}
public TimeValue timeout() {
return timeout;
}
public InternalSearchRequest timeout(TimeValue timeout) {
this.timeout = timeout;
return this;
}
public String[] filteringAliases() {
return filteringAliases;
}
@ -203,9 +190,6 @@ public class InternalSearchRequest implements Streamable {
if (in.readBoolean()) {
scroll = readScroll(in);
}
if (in.readBoolean()) {
timeout = readTimeValue(in);
}
BytesHolder bytes = in.readBytesReference();
source = bytes.bytes();
@ -248,12 +232,6 @@ public class InternalSearchRequest implements Streamable {
out.writeBoolean(true);
scroll.writeTo(out);
}
if (timeout == null) {
out.writeBoolean(false);
} else {
out.writeBoolean(true);
timeout.writeTo(out);
}
out.writeBytesHolder(source, sourceOffset, sourceLength);
out.writeBytesHolder(extraSource, extraSourceOffset, extraSourceLength);
out.writeVInt(types.length);

View File

@ -28,7 +28,6 @@ import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.analysis.AnalysisService;
import org.elasticsearch.index.cache.field.data.FieldDataCache;
import org.elasticsearch.index.cache.filter.FilterCache;
@ -112,6 +111,9 @@ public class SearchContext implements Releasable {
private float queryBoost = 1.0f;
// timeout in millis
private long timeoutInMillis = -1;
private List<String> groupStats;
@ -337,8 +339,12 @@ public class SearchContext implements Releasable {
return indexService.cache().idCache();
}
public TimeValue timeout() {
return request.timeout();
public long timeoutInMillis() {
return timeoutInMillis;
}
public void timeoutInMillis(long timeoutInMillis) {
this.timeoutInMillis = timeoutInMillis;
}
public SearchContext minimumScore(float minimumScore) {

View File

@ -68,6 +68,7 @@ public class QueryPhase implements SearchPhase {
.put("track_scores", new TrackScoresParseElement())
.put("min_score", new MinScoreParseElement())
.put("minScore", new MinScoreParseElement())
.put("timeout", new TimeoutParseElement())
.putAll(facetPhase.parseElements());
return parseElements.build();
}
@ -94,6 +95,7 @@ public class QueryPhase implements SearchPhase {
}
public void execute(SearchContext searchContext) throws QueryPhaseExecutionException {
searchContext.queryResult().searchTimedOut(false);
// set the filter on the searcher
if (searchContext.scopePhases() != null) {
// we have scoped queries, refresh the id cache

View File

@ -0,0 +1,41 @@
/*
* Licensed to ElasticSearch and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. ElasticSearch licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.search.query;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.search.SearchParseElement;
import org.elasticsearch.search.internal.SearchContext;
/**
*/
public class TimeoutParseElement implements SearchParseElement {
@Override
public void parse(XContentParser parser, SearchContext context) throws Exception {
XContentParser.Token token = parser.currentToken();
if (token == XContentParser.Token.VALUE_NUMBER) {
context.timeoutInMillis(parser.longValue());
} else {
context.timeoutInMillis(TimeValue.parseTimeValue(parser.text(), null).millis());
}
}
}

View File

@ -0,0 +1,71 @@
/*
* Licensed to ElasticSearch and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. ElasticSearch licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.test.integration.search.timeout;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.test.integration.AbstractNodesTests;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilder;
import static org.elasticsearch.index.query.FilterBuilders.scriptFilter;
import static org.elasticsearch.index.query.QueryBuilders.filteredQuery;
import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;
/**
*/
public class SearchTimeoutTests extends AbstractNodesTests {
private Client client;
@BeforeClass
public void createNodes() throws Exception {
Settings settings = settingsBuilder().put("index.number_of_shards", 2).put("index.number_of_replicas", 0).build();
startNode("node1", settings);
client = client("node1");
}
@AfterClass
public void closeNodes() {
client.close();
closeAllNodes();
}
@Test
public void simpleTimeoutTest() throws Exception {
client.admin().indices().prepareDelete().execute().actionGet();
for (int i = 0; i < 10; i++) {
client.prepareIndex("test", "type", Integer.toString(i)).setSource("field", "value").execute().actionGet();
}
client.admin().indices().prepareRefresh().execute().actionGet();
SearchResponse searchResponse = client.prepareSearch("test")
.setTimeout("10ms")
.setQuery(filteredQuery(matchAllQuery(), scriptFilter("Thread.sleep(100); return true;")))
.execute().actionGet();
assertThat(searchResponse.timedOut(), equalTo(true));
}
}