Merge pull request #12211 from jasontedor/features/12149

Add global search timeout setting
This commit is contained in:
Jason Tedor 2015-07-17 09:23:16 -04:00
commit e379d20ae6
9 changed files with 64 additions and 10 deletions

View File

@ -46,6 +46,7 @@ import org.elasticsearch.index.query.QueryParsingException;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.search.SearchService;
import org.elasticsearch.search.internal.DefaultSearchContext;
import org.elasticsearch.search.internal.SearchContext;
import org.elasticsearch.search.internal.ShardSearchLocalRequest;
@ -172,7 +173,8 @@ public class TransportValidateQueryAction extends TransportBroadcastAction<Valid
DefaultSearchContext searchContext = new DefaultSearchContext(0,
new ShardSearchLocalRequest(request.types(), request.nowInMillis(), request.filteringAliases()),
null, searcher, indexService, indexShard,
scriptService, pageCacheRecycler, bigArrays, threadPool.estimatedTimeInMillisCounter(), parseFieldMatcher
scriptService, pageCacheRecycler, bigArrays, threadPool.estimatedTimeInMillisCounter(), parseFieldMatcher,
SearchService.NO_TIMEOUT
);
SearchContext.setCurrent(searchContext);
try {

View File

@ -45,6 +45,7 @@ import org.elasticsearch.index.query.QueryParseContext;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.search.SearchService;
import org.elasticsearch.search.SearchShardTarget;
import org.elasticsearch.search.internal.DefaultSearchContext;
import org.elasticsearch.search.internal.SearchContext;
@ -153,7 +154,9 @@ public class TransportExistsAction extends TransportBroadcastAction<ExistsReques
SearchContext context = new DefaultSearchContext(0,
new ShardSearchLocalRequest(request.types(), request.nowInMillis(), request.filteringAliases()),
shardTarget, indexShard.acquireSearcher("exists"), indexService, indexShard,
scriptService, pageCacheRecycler, bigArrays, threadPool.estimatedTimeInMillisCounter(), parseFieldMatcher);
scriptService, pageCacheRecycler, bigArrays, threadPool.estimatedTimeInMillisCounter(), parseFieldMatcher,
SearchService.NO_TIMEOUT
);
SearchContext.setCurrent(context);
try {

View File

@ -43,6 +43,7 @@ import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.search.SearchService;
import org.elasticsearch.search.internal.DefaultSearchContext;
import org.elasticsearch.search.internal.SearchContext;
import org.elasticsearch.search.internal.ShardSearchLocalRequest;
@ -114,7 +115,8 @@ public class TransportExplainAction extends TransportSingleShardAction<ExplainRe
0, new ShardSearchLocalRequest(new String[]{request.type()}, request.nowInMillis, request.filteringAlias()),
null, result.searcher(), indexService, indexShard,
scriptService, pageCacheRecycler,
bigArrays, threadPool.estimatedTimeInMillisCounter(), parseFieldMatcher
bigArrays, threadPool.estimatedTimeInMillisCounter(), parseFieldMatcher,
SearchService.NO_TIMEOUT
);
SearchContext.setCurrent(context);

View File

@ -34,6 +34,7 @@ import org.elasticsearch.indices.breaker.HierarchyCircuitBreakerService;
import org.elasticsearch.indices.recovery.RecoverySettings;
import org.elasticsearch.indices.store.IndicesStore;
import org.elasticsearch.indices.ttl.IndicesTTLService;
import org.elasticsearch.search.SearchService;
import org.elasticsearch.threadpool.ThreadPool;
/**
@ -100,6 +101,7 @@ public class ClusterDynamicSettingsModule extends AbstractModule {
clusterDynamicSettings.addDynamicSetting(HierarchyCircuitBreakerService.REQUEST_CIRCUIT_BREAKER_LIMIT_SETTING, Validator.MEMORY_SIZE);
clusterDynamicSettings.addDynamicSetting(HierarchyCircuitBreakerService.REQUEST_CIRCUIT_BREAKER_OVERHEAD_SETTING, Validator.NON_NEGATIVE_DOUBLE);
clusterDynamicSettings.addDynamicSetting(InternalClusterService.SETTING_CLUSTER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD, Validator.TIME_NON_NEGATIVE);
clusterDynamicSettings.addDynamicSetting(SearchService.DEFAULT_SEARCH_TIMEOUT, Validator.TIMEOUT);
}
public void addDynamicSettings(String... settings) {

View File

@ -57,6 +57,25 @@ public interface Validator {
}
};
public static final Validator TIMEOUT = new Validator() {
@Override
public String validate(String setting, String value) {
try {
if (value == null) {
throw new NullPointerException("value must not be null");
}
TimeValue timeValue = TimeValue.parseTimeValue(value, null, setting);
assert timeValue != null;
if (timeValue.millis() < 0 && timeValue.millis() != -1) {
return "cannot parse value [" + value + "] as a timeout";
}
} catch (ElasticsearchParseException ex) {
return ex.getMessage();
}
return null;
}
};
public static final Validator TIME_NON_NEGATIVE = new Validator() {
@Override
public String validate(String setting, String value) {

View File

@ -75,6 +75,7 @@ import org.elasticsearch.indices.IndicesWarmer;
import org.elasticsearch.indices.IndicesWarmer.TerminationHandle;
import org.elasticsearch.indices.IndicesWarmer.WarmerContext;
import org.elasticsearch.indices.cache.request.IndicesRequestCache;
import org.elasticsearch.node.settings.NodeSettingsService;
import org.elasticsearch.script.ExecutableScript;
import org.elasticsearch.script.Script.ScriptParseException;
import org.elasticsearch.script.ScriptContext;
@ -101,6 +102,7 @@ import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.atomic.AtomicLong;
import static org.elasticsearch.common.Strings.hasLength;
import static org.elasticsearch.common.unit.TimeValue.timeValueMillis;
import static org.elasticsearch.common.unit.TimeValue.timeValueMinutes;
/**
@ -111,7 +113,9 @@ public class SearchService extends AbstractLifecycleComponent<SearchService> {
public static final String NORMS_LOADING_KEY = "index.norms.loading";
public static final String DEFAULT_KEEPALIVE_KEY = "search.default_keep_alive";
public static final String KEEPALIVE_INTERVAL_KEY = "search.keep_alive_interval";
public static final String DEFAULT_SEARCH_TIMEOUT = "search.default_search_timeout";
public static final TimeValue NO_TIMEOUT = timeValueMillis(-1);
private final ThreadPool threadPool;
@ -137,6 +141,8 @@ public class SearchService extends AbstractLifecycleComponent<SearchService> {
private final long defaultKeepAlive;
private volatile TimeValue defaultSearchTimeout;
private final ScheduledFuture<?> keepAliveReaper;
private final AtomicLong idGenerator = new AtomicLong();
@ -148,7 +154,7 @@ public class SearchService extends AbstractLifecycleComponent<SearchService> {
private final ParseFieldMatcher parseFieldMatcher;
@Inject
public SearchService(Settings settings, ClusterService clusterService, IndicesService indicesService,IndicesWarmer indicesWarmer, ThreadPool threadPool,
public SearchService(Settings settings, NodeSettingsService nodeSettingsService, ClusterService clusterService, IndicesService indicesService,IndicesWarmer indicesWarmer, ThreadPool threadPool,
ScriptService scriptService, PageCacheRecycler pageCacheRecycler, BigArrays bigArrays, DfsPhase dfsPhase, QueryPhase queryPhase, FetchPhase fetchPhase,
IndicesRequestCache indicesQueryCache) {
super(settings);
@ -202,6 +208,20 @@ public class SearchService extends AbstractLifecycleComponent<SearchService> {
this.indicesWarmer.addListener(new NormsWarmer());
this.indicesWarmer.addListener(new FieldDataWarmer());
this.indicesWarmer.addListener(new SearchWarmer());
defaultSearchTimeout = settings.getAsTime(DEFAULT_SEARCH_TIMEOUT, NO_TIMEOUT);
nodeSettingsService.addListener(new SearchSettingsListener());
}
class SearchSettingsListener implements NodeSettingsService.Listener {
@Override
public void onRefreshSettings(Settings settings) {
final TimeValue maybeNewDefaultSearchTimeout = settings.getAsTime(SearchService.DEFAULT_SEARCH_TIMEOUT, SearchService.this.defaultSearchTimeout);
if (!maybeNewDefaultSearchTimeout.equals(SearchService.this.defaultSearchTimeout)) {
logger.info("updating [{}] from [{}] to [{}]", SearchService.DEFAULT_SEARCH_TIMEOUT, SearchService.this.defaultSearchTimeout, maybeNewDefaultSearchTimeout);
SearchService.this.defaultSearchTimeout = maybeNewDefaultSearchTimeout;
}
}
}
protected void putContext(SearchContext context) {
@ -619,7 +639,7 @@ public class SearchService extends AbstractLifecycleComponent<SearchService> {
Engine.Searcher engineSearcher = searcher == null ? indexShard.acquireSearcher("search") : searcher;
SearchContext context = new DefaultSearchContext(idGenerator.incrementAndGet(), request, shardTarget, engineSearcher, indexService, indexShard, scriptService, pageCacheRecycler, bigArrays, threadPool.estimatedTimeInMillisCounter(), parseFieldMatcher);
SearchContext context = new DefaultSearchContext(idGenerator.incrementAndGet(), request, shardTarget, engineSearcher, indexService, indexShard, scriptService, pageCacheRecycler, bigArrays, threadPool.estimatedTimeInMillisCounter(), parseFieldMatcher, defaultSearchTimeout);
SearchContext.setCurrent(context);
try {
context.scroll(request.scroll());

View File

@ -35,6 +35,7 @@ import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.common.lucene.MinimumScoreCollector;
import org.elasticsearch.common.lucene.search.FilteredCollector;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.search.SearchService;
import org.elasticsearch.search.dfs.CachedDfSource;
import org.elasticsearch.search.internal.SearchContext.Lifetime;
@ -139,7 +140,7 @@ public class ContextIndexSearcher extends IndexSearcher implements Releasable {
public void search(Query query, Collector collector) throws IOException {
// Wrap the caller's collector with various wrappers e.g. those used to siphon
// matches off for aggregation or to impose a time-limit on collection.
final boolean timeoutSet = searchContext.timeoutInMillis() != -1;
final boolean timeoutSet = searchContext.timeoutInMillis() != SearchService.NO_TIMEOUT.millis();
final boolean terminateAfterSet = searchContext.terminateAfter() != SearchContext.DEFAULT_TERMINATE_AFTER;
if (timeoutSet) {

View File

@ -33,6 +33,7 @@ import org.elasticsearch.common.lease.Releasables;
import org.elasticsearch.common.lucene.search.Queries;
import org.elasticsearch.common.lucene.search.function.BoostScoreFunction;
import org.elasticsearch.common.lucene.search.function.FunctionScoreQuery;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.analysis.AnalysisService;
@ -90,7 +91,7 @@ public class DefaultSearchContext extends SearchContext {
private ScanContext scanContext;
private float queryBoost = 1.0f;
// timeout in millis
private long timeoutInMillis = -1;
private long timeoutInMillis;
// terminate after count
private int terminateAfter = DEFAULT_TERMINATE_AFTER;
private List<String> groupStats;
@ -127,7 +128,9 @@ public class DefaultSearchContext extends SearchContext {
public DefaultSearchContext(long id, ShardSearchRequest request, SearchShardTarget shardTarget,
Engine.Searcher engineSearcher, IndexService indexService, IndexShard indexShard,
ScriptService scriptService, PageCacheRecycler pageCacheRecycler,
BigArrays bigArrays, Counter timeEstimateCounter, ParseFieldMatcher parseFieldMatcher) {
BigArrays bigArrays, Counter timeEstimateCounter, ParseFieldMatcher parseFieldMatcher,
TimeValue timeout
) {
super(parseFieldMatcher);
this.id = id;
this.request = request;
@ -145,6 +148,7 @@ public class DefaultSearchContext extends SearchContext {
this.indexService = indexService;
this.searcher = new ContextIndexSearcher(this, engineSearcher);
this.timeEstimateCounter = timeEstimateCounter;
this.timeoutInMillis = timeout.millis();
}
@Override

View File

@ -27,6 +27,7 @@ import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.indices.IndicesWarmer;
import org.elasticsearch.indices.cache.request.IndicesRequestCache;
import org.elasticsearch.node.settings.NodeSettingsService;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.search.SearchService;
import org.elasticsearch.search.dfs.DfsPhase;
@ -52,10 +53,10 @@ public class MockSearchService extends SearchService {
}
@Inject
public MockSearchService(Settings settings, ClusterService clusterService, IndicesService indicesService, IndicesWarmer indicesWarmer,
public MockSearchService(Settings settings, NodeSettingsService nodeSettingsService, ClusterService clusterService, IndicesService indicesService, IndicesWarmer indicesWarmer,
ThreadPool threadPool, ScriptService scriptService, PageCacheRecycler pageCacheRecycler, BigArrays bigArrays,
DfsPhase dfsPhase, QueryPhase queryPhase, FetchPhase fetchPhase, IndicesRequestCache indicesQueryCache) {
super(settings, clusterService, indicesService, indicesWarmer, threadPool, scriptService, pageCacheRecycler, bigArrays, dfsPhase,
super(settings, nodeSettingsService, clusterService, indicesService, indicesWarmer, threadPool, scriptService, pageCacheRecycler, bigArrays, dfsPhase,
queryPhase, fetchPhase, indicesQueryCache);
}