Add heuristics to compute pre_filter_shard_size when unspecified (#53873) (#54007)

This commit changes the pre_filter_shard_size default from 128 to unspecified.
This allows to apply heuristics based on the request and the target indices when deciding
whether the can match phase should run or not. When unspecified, this pr runs the can match phase
automatically if one of these conditions is met:
  * The request targets more than 128 shards.
  * The request contains read-only indices.
  * The primary sort of the query targets an indexed field.
Users can opt-out from this behavior by setting the `pre_filter_shard_size` to a static value.

Closes #39835
This commit is contained in:
Jim Ferenczi 2020-03-24 02:05:15 +01:00 committed by GitHub
parent 4734c645f1
commit 9e3f7f4575
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
14 changed files with 290 additions and 129 deletions

View File

@ -426,7 +426,9 @@ final class RequestConverters {
params.withIndicesOptions(searchRequest.indicesOptions());
params.withSearchType(searchRequest.searchType().name().toLowerCase(Locale.ROOT));
params.putParam("ccs_minimize_roundtrips", Boolean.toString(searchRequest.isCcsMinimizeRoundtrips()));
params.putParam("pre_filter_shard_size", Integer.toString(searchRequest.getPreFilterShardSize()));
if (searchRequest.getPreFilterShardSize() != null) {
params.putParam("pre_filter_shard_size", Integer.toString(searchRequest.getPreFilterShardSize()));
}
params.withMaxConcurrentShardRequests(searchRequest.getMaxConcurrentShardRequests());
if (searchRequest.requestCache() != null) {
params.withRequestCache(searchRequest.requestCache());

View File

@ -2043,7 +2043,9 @@ public class RequestConvertersTests extends ESTestCase {
if (randomBoolean()) {
searchRequest.setPreFilterShardSize(randomIntBetween(2, Integer.MAX_VALUE));
}
expectedParams.put("pre_filter_shard_size", Integer.toString(searchRequest.getPreFilterShardSize()));
if (searchRequest.getPreFilterShardSize() != null) {
expectedParams.put("pre_filter_shard_size", Integer.toString(searchRequest.getPreFilterShardSize()));
}
}
public static void setRandomIndicesOptions(Consumer<IndicesOptions> setter, Supplier<IndicesOptions> getter,

View File

@ -74,8 +74,8 @@ POST /twitter/_forcemerge?max_num_segments=1
== Searching a frozen index
Frozen indices are throttled in order to limit memory consumptions per node. The number of concurrently loaded frozen indices per node is
limited by the number of threads in the <<search-throttled,search_throttled>> threadpool, which is `1` by default.
Search requests will not be executed against frozen indices by default, even if a frozen index is named explicitly. This is
limited by the number of threads in the <<search-throttled,search_throttled>> threadpool, which is `1` by default.
Search requests will not be executed against frozen indices by default, even if a frozen index is named explicitly. This is
to prevent accidental slowdowns by targeting a frozen index by mistake. To include frozen indices a search request must be executed with
the query parameter `ignore_throttled=false`.
@ -85,15 +85,6 @@ GET /twitter/_search?q=user:kimchy&ignore_throttled=false
--------------------------------------------------
// TEST[setup:twitter]
[IMPORTANT]
================================
While frozen indices are slow to search, they can be pre-filtered efficiently. The request parameter `pre_filter_shard_size` specifies
a threshold that, when exceeded, will enforce a round-trip to pre-filter search shards that cannot possibly match.
This filter phase can limit the number of shards significantly. For instance, if a date range filter is applied, then all indices (frozen or unfrozen) that do not contain documents within the date range can be skipped efficiently.
The default value for `pre_filter_shard_size` is `128` but it's recommended to set it to `1` when searching frozen indices. There is no
significant overhead associated with this pre-filter phase.
================================
[role="xpack"]
[testenv="basic"]
[[monitoring_frozen_indices]]

View File

@ -23,7 +23,7 @@ GET twitter/_msearch
==== {api-description-title}
The multi search API executes several searches from a single API request.
The format of the request is similar to the bulk API format and makes use
The format of the request is similar to the bulk API format and makes use
of the newline delimited JSON (NDJSON) format.
The structure is as follows:
@ -85,7 +85,7 @@ Maximum number of concurrent searches the multi search API can execute.
--
(Optional, integer)
Maximum number of concurrent shard requests that each sub-search request
executes per node. Defaults to `5`.
executes per node. Defaults to `5`.
You can use this parameter to prevent a request from overloading a cluster. For
example, a default request hits all indices in a cluster. This could cause shard
@ -103,8 +103,13 @@ Defines a threshold that enforces a pre-filter roundtrip to prefilter search
shards based on query rewriting if the number of shards the search request
expands to exceeds the threshold. This filter roundtrip can limit the number of
shards significantly if for instance a shard can not match any documents based
on it's rewrite method i.e., if date filters are mandatory to match but the
shard bounds and the query are disjoint. Defaults to `128`.
on its rewrite method i.e., if date filters are mandatory to match but the
shard bounds and the query are disjoint.
When unspecified, the pre-filter phase is executed if any of these
conditions is met:
- The request targets more than `128` shards.
- The request targets one or more read-only index.
- The primary sort of the query targets an indexed field.
`rest_total_hits_as_int`::
(Optional, boolean)
@ -121,7 +126,7 @@ to a specific shard.
--
(Optional, string)
Indicates whether global term and document frequencies should be used when
scoring returned documents.
scoring returned documents.
Options are:
@ -134,7 +139,7 @@ This is usually faster but less accurate.
Documents are scored using global term and document frequencies across all
shards. This is usually slower but more accurate.
--
`typed_keys`::
(Optional, boolean)
Specifies whether aggregation and suggester names should be prefixed by their
@ -196,7 +201,7 @@ to a specific shard.
--
(Optional, string)
Indicates whether global term and document frequencies should be used when
scoring returned documents.
scoring returned documents.
Options are:
@ -234,18 +239,18 @@ Number of hits to return. Defaults to `10`.
==== {api-response-body-title}
`responses`::
(array) Includes the search response and status code for each search request
matching its order in the original multi search request. If there was a
complete failure for a specific search request, an object with `error` message
and corresponding status code will be returned in place of the actual search
(array) Includes the search response and status code for each search request
matching its order in the original multi search request. If there was a
complete failure for a specific search request, an object with `error` message
and corresponding status code will be returned in place of the actual search
response.
[[search-multi-search-api-example]]
==== {api-examples-title}
The header part includes which index / indices to search on, the `search_type`,
`preference`, and `routing`. The body includes the typical search body request
The header part includes which index / indices to search on, the `search_type`,
`preference`, and `routing`. The body includes the typical search body request
(including the `query`, `aggregations`, `from`, `size`, and so on).
[source,js]
@ -308,7 +313,7 @@ See <<url-access-control>>
==== Template support
Much like described in <<search-template>> for the _search resource, _msearch
also provides support for templates. Submit them like follows for inline
also provides support for templates. Submit them like follows for inline
templates:
[source,console]
@ -377,6 +382,6 @@ GET _msearch/template
[[multi-search-partial-responses]]
==== Partial responses
To ensure fast responses, the multi search API will respond with partial results
if one or more shards fail. See <<shard-failures, Shard failures>> for more
To ensure fast responses, the multi search API will respond with partial results
if one or more shards fail. See <<shard-failures, Shard failures>> for more
information.

View File

@ -25,7 +25,7 @@ GET /twitter/_search?q=tag:wow
[[search-search-api-desc]]
==== {api-description-title}
Allows you to execute a search query and get back search hits that match the
Allows you to execute a search query and get back search hits that match the
query. The query can either be provided using a simple
<<search-uri-request,query string as a parameter>>, or using a
<<search-request-body,request body>>.
@ -33,8 +33,8 @@ query. The query can either be provided using a simple
[[search-partial-responses]]
===== Partial responses
To ensure fast responses, the search API will respond with partial results if
one or more shards fail. See <<shard-failures, Shard failures>> for more
To ensure fast responses, the search API will respond with partial results if
one or more shards fail. See <<shard-failures, Shard failures>> for more
information.
[[search-search-api-path-params]]
@ -51,163 +51,167 @@ include::{docdir}/rest-api/common-parms.asciidoc[tag=allow-no-indices]
Defaults to `true`.
`allow_partial_search_results`::
(Optional, boolean) Indicates if an error should be returned if there is a
(Optional, boolean) Indicates if an error should be returned if there is a
partial search failure or timeout. Defaults to `true`.
`analyzer`::
(Optional, string) Defines the analyzer to use for the query string.
`analyze_wildcard`::
(Optional, boolean) If `true`, wildcard and prefix queries will also be
(Optional, boolean) If `true`, wildcard and prefix queries will also be
analyzed. Defaults to `false`.
`batched_reduce_size`::
(Optional, integer) The number of shard results that should be reduced at once
on the coordinating node. This value should be used as a protection mechanism
to reduce the memory overhead per search request if the potential number of
(Optional, integer) The number of shard results that should be reduced at once
on the coordinating node. This value should be used as a protection mechanism
to reduce the memory overhead per search request if the potential number of
shards in the request can be large. Defaults to `512`.
`ccs_minimize_roundtrips`::
(Optional, boolean) Indicates whether network round-trips should be minimized
(Optional, boolean) Indicates whether network round-trips should be minimized
as part of cross-cluster search requests execution. Defaults to `true`.
`default_operator`::
(Optional, string) The default operator for query string query (AND or OR).
(Optional, string) The default operator for query string query (AND or OR).
Defaults to `OR`.
`df`::
(Optional, string) Defines the field to use as default where no field prefix
(Optional, string) Defines the field to use as default where no field prefix
is given in the query string.
`docvalue_fields`::
(Optional, string) A comma-separated list of fields to return as the docvalue
(Optional, string) A comma-separated list of fields to return as the docvalue
representation of a field for each hit.
include::{docdir}/rest-api/common-parms.asciidoc[tag=expand-wildcards]
+
Defaults to `open`.
`explain`::
(Optional, boolean) If `true`, returns detailed information about score
(Optional, boolean) If `true`, returns detailed information about score
computation as part of a hit. Defaults to `false`.
`from`::
(Optional, integer) Defines the starting offset. Defaults to `0`.
`ignore_throttled`::
(Optional, boolean) If `true`, concrete, expanded or aliased indices will be
(Optional, boolean) If `true`, concrete, expanded or aliased indices will be
ignored when throttled. Defaults to `false`.
include::{docdir}/rest-api/common-parms.asciidoc[tag=index-ignore-unavailable]
`lenient`::
(Optional, boolean) If `true`, format-based query failures (such as
(Optional, boolean) If `true`, format-based query failures (such as
providing text to a numeric field) will be ignored. Defaults to `false`.
`max_concurrent_shard_requests`::
(Optional, integer) Defines the number of concurrent shard requests per node
this search executes concurrently. This value should be used to limit the
impact of the search on the cluster in order to limit the number of concurrent
(Optional, integer) Defines the number of concurrent shard requests per node
this search executes concurrently. This value should be used to limit the
impact of the search on the cluster in order to limit the number of concurrent
shard requests. Defaults to `5`.
`pre_filter_shard_size`::
(Optional, integer) Defines a threshold that enforces a pre-filter roundtrip
to prefilter search shards based on query rewriting if the number of shards
the search request expands to exceeds the threshold. This filter roundtrip can
limit the number of shards significantly if for instance a shard can not match
any documents based on it's rewrite method ie. if date filters are mandatory
to match but the shard bounds and the query are disjoint. Defaults to `128`.
(Optional, integer) Defines a threshold that enforces a pre-filter roundtrip
to prefilter search shards based on query rewriting if the number of shards
the search request expands to exceeds the threshold. This filter roundtrip can
limit the number of shards significantly if for instance a shard can not match
any documents based on its rewrite method ie. if date filters are mandatory
to match but the shard bounds and the query are disjoint.
When unspecified, the pre-filter phase is executed if any of these conditions is met:
- The request targets more than `128` shards.
- The request targets one or more read-only index.
- The primary sort of the query targets an indexed field.
`preference`::
(Optional, string) Specifies the node or shard the operation should be
(Optional, string) Specifies the node or shard the operation should be
performed on. Random by default.
`q`::
(Optional, string) Query in the Lucene query string syntax.
`request_cache`::
(Optional, boolean) If `true`, request cache will be used for this request.
(Optional, boolean) If `true`, request cache will be used for this request.
Defaults to index level settings.
`rest_total_hits_as_int`::
(Optional, boolean) Indicates whether hits.total should be rendered as an
(Optional, boolean) Indicates whether hits.total should be rendered as an
integer or an object in the rest search response. Defaults to `false`.
`routing`::
(Optional, <<time-units, time units>>) Specifies how long a consistent view of
(Optional, <<time-units, time units>>) Specifies how long a consistent view of
the index should be maintained for scrolled search.
`search_type`::
(Optional, string) Defines the type of the search operation. Available
(Optional, string) Defines the type of the search operation. Available
options:
* `query_then_fetch`
* `dfs_query_then_fetch`
`seq_no_primary_term`::
(Optional, boolean) If `true`, returns sequence number and primary term of the
(Optional, boolean) If `true`, returns sequence number and primary term of the
last modification of each hit.
`size`::
(Optional, integer) Defines the number of hits to return. Defaults to `10`.
`sort`::
(Optional, string) A comma-separated list of <field>:<direction> pairs.
`_source`::
(Optional, string) True or false to return the `_source` field or not, or a
(Optional, string) True or false to return the `_source` field or not, or a
list of fields to return.
`_source_excludes`::
(Optional, string) A list of fields to exclude from the returned `_source`
(Optional, string) A list of fields to exclude from the returned `_source`
field.
`_source_includes`::
(Optional, string) A list of fields to extract and return from the `_source`
(Optional, string) A list of fields to extract and return from the `_source`
field.
`stats`::
(Optional, string) Specific `tag` of the request for logging and statistical
(Optional, string) Specific `tag` of the request for logging and statistical
purposes.
`stored_fields`::
(Optional, string) A comma-separated list of stored fields to return as part
(Optional, string) A comma-separated list of stored fields to return as part
of a hit.
`suggest_field`::
(Optional, string) Specifies which field to use for suggestions.
`suggest_mode`::
(Optional, string) Specifies suggest mode. Defaults to `missing`. Available
(Optional, string) Specifies suggest mode. Defaults to `missing`. Available
options:
* `always`
* `missing`
* `popular`
`suggest_size`::
(Optional, integer) Defines how many suggestions to return in response.
`suggest_text`::
(Optional, string) The source text for which the suggestions should be
(Optional, string) The source text for which the suggestions should be
returned.
`terminate_after`::
(Optional, integer) The maximum number of documents to collect for each shard,
(Optional, integer) The maximum number of documents to collect for each shard,
upon reaching which the query execution will terminate early.
include::{docdir}/rest-api/common-parms.asciidoc[tag=timeout]
`track_scores`::
(Optional, boolean) If `true`, then calculates and returns scores even if they
(Optional, boolean) If `true`, then calculates and returns scores even if they
are not used for sorting.
`track_total_hits`::
(Optional, boolean) Indicates if the number of documents that match the query
(Optional, boolean) Indicates if the number of documents that match the query
should be tracked.
`typed_keys`::
(Optional, boolean) Specifies whether aggregation and suggester names should
(Optional, boolean) Specifies whether aggregation and suggester names should
be prefixed by their respective types in the response.
`version`::
(Optional, boolean) If `true`, returns document version as part of a hit.
@ -216,7 +220,7 @@ include::{docdir}/rest-api/common-parms.asciidoc[tag=timeout]
==== {api-request-body-title}
`query`::
(Optional, <<query-dsl,query object>>) Defines the search definition using the
(Optional, <<query-dsl,query object>>) Defines the search definition using the
<<query-dsl,Query DSL>>.

View File

@ -124,7 +124,7 @@ public class CCSDuelIT extends ESRestTestCase {
private static final String INDEX_NAME = "ccs_duel_index";
private static final String REMOTE_INDEX_NAME = "my_remote_cluster:" + INDEX_NAME;
private static final String[] TAGS = new String[]{"java", "xml", "sql", "html", "php", "ruby", "python", "perl"};
private static final String[] TAGS = new String[] {"java", "xml", "sql", "html", "php", "ruby", "python", "perl"};
private static RestHighLevelClient restHighLevelClient;
@ -435,6 +435,8 @@ public class CCSDuelIT extends ESRestTestCase {
public void testSortByFieldOneClusterHasNoResults() throws Exception {
assumeMultiClusterSetup();
SearchRequest searchRequest = initSearchRequest();
// set to a value greater than the number of shards to avoid differences due to the skipping of shards
searchRequest.setPreFilterShardSize(128);
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
boolean onlyRemote = randomBoolean();
sourceBuilder.query(new TermQueryBuilder("_index", onlyRemote ? REMOTE_INDEX_NAME : INDEX_NAME));

View File

@ -66,10 +66,9 @@
"type" : "boolean",
"description" : "Specify whether aggregation and suggester names should be prefixed by their respective types in the response"
},
"pre_filter_shard_size" : {
"type" : "number",
"description" : "A threshold that enforces a pre-filter roundtrip to prefilter search shards based on query rewriting if the number of shards the search request expands to exceeds the threshold. This filter roundtrip can limit the number of shards significantly if for instance a shard can not match any documents based on it's rewrite method ie. if date filters are mandatory to match but the shard bounds and the query are disjoint.",
"default" : 128
"pre_filter_shard_size":{
"type":"number",
"description" : "A threshold that enforces a pre-filter roundtrip to prefilter search shards based on query rewriting if the number of shards the search request expands to exceeds the threshold. This filter roundtrip can limit the number of shards significantly if for instance a shard can not match any documents based on its rewrite method ie. if date filters are mandatory to match but the shard bounds and the query are disjoint."
},
"max_concurrent_shard_requests" : {
"type" : "number",

View File

@ -237,8 +237,7 @@
},
"pre_filter_shard_size":{
"type":"number",
"description":"A threshold that enforces a pre-filter roundtrip to prefilter search shards based on query rewriting if the number of shards the search request expands to exceeds the threshold. This filter roundtrip can limit the number of shards significantly if for instance a shard can not match any documents based on it's rewrite method ie. if date filters are mandatory to match but the shard bounds and the query are disjoint.",
"default":128
"description":"A threshold that enforces a pre-filter roundtrip to prefilter search shards based on query rewriting if the number of shards the search request expands to exceeds the threshold. This filter roundtrip can limit the number of shards significantly if for instance a shard can not match any documents based on its rewrite method ie. if date filters are mandatory to match but the shard bounds and the query are disjoint."
},
"rest_total_hits_as_int":{
"type":"boolean",

View File

@ -89,7 +89,7 @@ public class SearchRequest extends ActionRequest implements IndicesRequest.Repla
private int maxConcurrentShardRequests = 0;
private int preFilterShardSize = DEFAULT_PRE_FILTER_SHARD_SIZE;
private Integer preFilterShardSize;
private String[] types = Strings.EMPTY_ARRAY;
@ -197,7 +197,11 @@ public class SearchRequest extends ActionRequest implements IndicesRequest.Repla
requestCache = in.readOptionalBoolean();
batchedReduceSize = in.readVInt();
maxConcurrentShardRequests = in.readVInt();
preFilterShardSize = in.readVInt();
if (in.getVersion().onOrAfter(Version.V_7_7_0)) {
preFilterShardSize = in.readOptionalVInt();
} else {
preFilterShardSize = in.readVInt();
}
if (in.getVersion().onOrAfter(Version.V_6_3_0)) {
allowPartialSearchResults = in.readOptionalBoolean();
}
@ -234,7 +238,11 @@ public class SearchRequest extends ActionRequest implements IndicesRequest.Repla
out.writeOptionalBoolean(requestCache);
out.writeVInt(batchedReduceSize);
out.writeVInt(maxConcurrentShardRequests);
out.writeVInt(preFilterShardSize);
if (out.getVersion().onOrAfter(Version.V_7_7_0)) {
out.writeOptionalVInt(preFilterShardSize);
} else {
out.writeVInt(preFilterShardSize == null ? DEFAULT_BATCHED_REDUCE_SIZE : preFilterShardSize);
}
if (out.getVersion().onOrAfter(Version.V_6_3_0)) {
out.writeOptionalBoolean(allowPartialSearchResults);
}
@ -573,8 +581,15 @@ public class SearchRequest extends ActionRequest implements IndicesRequest.Repla
/**
* Sets a threshold that enforces a pre-filter roundtrip to pre-filter search shards based on query rewriting if the number of shards
* the search request expands to exceeds the threshold. This filter roundtrip can limit the number of shards significantly if for
* instance a shard can not match any documents based on it's rewrite method ie. if date filters are mandatory to match but the shard
* bounds and the query are disjoint. The default is {@code 128}
* instance a shard can not match any documents based on its rewrite method ie. if date filters are mandatory to match but the shard
* bounds and the query are disjoint.
*
* When unspecified, the pre-filter phase is executed if any of these conditions is met:
* <ul>
* <li>The request targets more than 128 shards</li>
* <li>The request targets one or more read-only index</li>
* <li>The primary sort of the query targets an indexed field</li>
* </ul>
*/
public void setPreFilterShardSize(int preFilterShardSize) {
if (preFilterShardSize < 1) {
@ -585,11 +600,20 @@ public class SearchRequest extends ActionRequest implements IndicesRequest.Repla
/**
* Returns a threshold that enforces a pre-filter roundtrip to pre-filter search shards based on query rewriting if the number of shards
* the search request expands to exceeds the threshold. This filter roundtrip can limit the number of shards significantly if for
* instance a shard can not match any documents based on it's rewrite method ie. if date filters are mandatory to match but the shard
* bounds and the query are disjoint. The default is {@code 128}
* the search request expands to exceeds the threshold, or <code>null</code> if the threshold is unspecified.
* This filter roundtrip can limit the number of shards significantly if for
* instance a shard can not match any documents based on its rewrite method ie. if date filters are mandatory to match but the shard
* bounds and the query are disjoint.
*
* When unspecified, the pre-filter phase is executed if any of these conditions is met:
* <ul>
* <li>The request targets more than 128 shards</li>
* <li>The request targets one or more read-only index</li>
* <li>The primary sort of the query targets an indexed field</li>
* </ul>
*/
public int getPreFilterShardSize() {
@Nullable
public Integer getPreFilterShardSize() {
return preFilterShardSize;
}

View File

@ -569,8 +569,15 @@ public class SearchRequestBuilder extends ActionRequestBuilder<SearchRequest, Se
/**
* Sets a threshold that enforces a pre-filter roundtrip to pre-filter search shards based on query rewriting if the number of shards
* the search request expands to exceeds the threshold. This filter roundtrip can limit the number of shards significantly if for
* instance a shard can not match any documents based on it's rewrite method ie. if date filters are mandatory to match but the shard
* bounds and the query are disjoint. The default is {@code 128}
* instance a shard can not match any documents based on its rewrite method ie. if date filters are mandatory to match but the shard
* bounds and the query are disjoint.
*
* When unspecified, the pre-filter phase is executed if any of these conditions is met:
* <ul>
* <li>The request targets more than 128 shards</li>
* <li>The request targets one or more read-only index</li>
* <li>The primary sort of the query targets an indexed field</li>
* </ul>
*/
public SearchRequestBuilder setPreFilterShardSize(int preFilterShardSize) {
this.request.setPreFilterShardSize(preFilterShardSize);

View File

@ -29,6 +29,7 @@ import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.node.DiscoveryNode;
@ -55,7 +56,6 @@ import org.elasticsearch.search.internal.InternalSearchResponse;
import org.elasticsearch.search.internal.SearchContext;
import org.elasticsearch.search.profile.ProfileShardResult;
import org.elasticsearch.search.profile.SearchProfileShardResults;
import org.elasticsearch.search.sort.FieldSortBuilder;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.RemoteClusterAware;
@ -83,6 +83,7 @@ import java.util.function.LongSupplier;
import static org.elasticsearch.action.search.SearchType.DFS_QUERY_THEN_FETCH;
import static org.elasticsearch.action.search.SearchType.QUERY_THEN_FETCH;
import static org.elasticsearch.search.sort.FieldSortBuilder.hasPrimaryFieldSort;
public class TransportSearchAction extends HandledTransportAction<SearchRequest, SearchResponse> {
@ -512,7 +513,7 @@ public class TransportSearchAction extends HandledTransportAction<SearchRequest,
final DiscoveryNodes nodes = clusterState.nodes();
BiFunction<String, String, Transport.Connection> connectionLookup = buildConnectionLookup(searchRequest.getLocalClusterAlias(),
nodes::get, remoteConnections, searchTransportService::getConnection);
boolean preFilterSearchShards = shouldPreFilterSearchShards(searchRequest, shardIterators);
boolean preFilterSearchShards = shouldPreFilterSearchShards(clusterState, searchRequest, indices, shardIterators.size());
searchAsyncAction(task, searchRequest, shardIterators, timeProvider, connectionLookup, clusterState,
Collections.unmodifiableMap(aliasFilter), concreteIndexBoosts, routingMap, listener, preFilterSearchShards, clusters).start();
}
@ -539,12 +540,31 @@ public class TransportSearchAction extends HandledTransportAction<SearchRequest,
};
}
private static boolean shouldPreFilterSearchShards(SearchRequest searchRequest,
GroupShardsIterator<SearchShardIterator> shardIterators) {
static boolean shouldPreFilterSearchShards(ClusterState clusterState,
SearchRequest searchRequest,
Index[] indices,
int numShards) {
SearchSourceBuilder source = searchRequest.source();
Integer preFilterShardSize = searchRequest.getPreFilterShardSize();
if (preFilterShardSize == null
&& (hasReadOnlyIndices(indices, clusterState) || hasPrimaryFieldSort(source))) {
preFilterShardSize = 1;
} else if (preFilterShardSize == null) {
preFilterShardSize = SearchRequest.DEFAULT_PRE_FILTER_SHARD_SIZE;
}
return searchRequest.searchType() == QUERY_THEN_FETCH // we can't do this for DFS it needs to fan out to all shards all the time
&& (SearchService.canRewriteToMatchNone(source) || FieldSortBuilder.hasPrimaryFieldSort(source))
&& searchRequest.getPreFilterShardSize() < shardIterators.size();
&& (SearchService.canRewriteToMatchNone(source) || hasPrimaryFieldSort(source))
&& preFilterShardSize < numShards;
}
private static boolean hasReadOnlyIndices(Index[] indices, ClusterState clusterState) {
for (Index index : indices) {
ClusterBlockException writeBlock = clusterState.blocks().indexBlockedException(ClusterBlockLevel.WRITE, index.getName());
if (writeBlock != null) {
return true;
}
}
return false;
}
static GroupShardsIterator<SearchShardIterator> mergeShardsIterators(GroupShardsIterator<ShardIterator> localShardsIterator,

View File

@ -113,7 +113,10 @@ public class RestMultiSearchAction extends BaseRestHandler {
multiRequest.maxConcurrentSearchRequests(restRequest.paramAsInt("max_concurrent_searches", 0));
}
int preFilterShardSize = restRequest.paramAsInt("pre_filter_shard_size", SearchRequest.DEFAULT_PRE_FILTER_SHARD_SIZE);
Integer preFilterShardSize = null;
if (restRequest.hasParam("pre_filter_shard_size")) {
preFilterShardSize = restRequest.paramAsInt("pre_filter_shard_size", SearchRequest.DEFAULT_PRE_FILTER_SHARD_SIZE);
}
final Integer maxConcurrentShardRequests;
if (restRequest.hasParam("max_concurrent_shard_requests")) {
@ -130,10 +133,11 @@ public class RestMultiSearchAction extends BaseRestHandler {
multiRequest.add(searchRequest);
});
List<SearchRequest> requests = multiRequest.requests();
preFilterShardSize = Math.max(1, preFilterShardSize / (requests.size()+1));
for (SearchRequest request : requests) {
// preserve if it's set on the request
request.setPreFilterShardSize(Math.min(preFilterShardSize, request.getPreFilterShardSize()));
if (preFilterShardSize != null && request.getPreFilterShardSize() == null) {
request.setPreFilterShardSize(preFilterShardSize);
}
if (maxConcurrentShardRequests != null) {
request.setMaxConcurrentShardRequests(maxConcurrentShardRequests);
}

View File

@ -139,7 +139,9 @@ public class RestSearchAction extends BaseRestHandler {
final int batchedReduceSize = request.paramAsInt("batched_reduce_size", searchRequest.getBatchedReduceSize());
searchRequest.setBatchedReduceSize(batchedReduceSize);
searchRequest.setPreFilterShardSize(request.paramAsInt("pre_filter_shard_size", searchRequest.getPreFilterShardSize()));
if (request.hasParam("pre_filter_shard_size")) {
searchRequest.setPreFilterShardSize(request.paramAsInt("pre_filter_shard_size", SearchRequest.DEFAULT_PRE_FILTER_SHARD_SIZE));
}
if (request.hasParam("max_concurrent_shard_requests")) {
// only set if we have the parameter since we auto adjust the max concurrency on the coordinator

View File

@ -29,6 +29,10 @@ import org.elasticsearch.action.OriginalIndicesTests;
import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsGroup;
import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsResponse;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlocks;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.GroupShardsIterator;
import org.elasticsearch.cluster.routing.GroupShardsIteratorTests;
@ -44,6 +48,7 @@ import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.query.InnerHitBuilder;
import org.elasticsearch.index.query.MatchAllQueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.query.TermsQueryBuilder;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.rest.RestStatus;
@ -58,6 +63,7 @@ import org.elasticsearch.search.collapse.CollapseBuilder;
import org.elasticsearch.search.internal.AliasFilter;
import org.elasticsearch.search.internal.InternalSearchResponse;
import org.elasticsearch.search.internal.SearchContext;
import org.elasticsearch.search.sort.SortBuilders;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.transport.MockTransportService;
import org.elasticsearch.threadpool.TestThreadPool;
@ -535,7 +541,7 @@ public class TransportSearchActionTests extends ESTestCase {
AtomicReference<SearchResponse> response = new AtomicReference<>();
LatchedActionListener<SearchResponse> listener = new LatchedActionListener<>(
ActionListener.wrap(response::set, e -> fail("no failures expected")), latch);
TransportSearchAction.ccsRemoteReduce(searchRequest, localIndices, remoteIndicesByCluster, timeProvider,
TransportSearchAction.ccsRemoteReduce(searchRequest, localIndices, remoteIndicesByCluster, timeProvider,
aggReduceContextBuilder(), remoteClusterService, threadPool, listener, (r, l) -> setOnce.set(Tuple.tuple(r, l)));
if (localIndices == null) {
assertNull(setOnce.get());
@ -838,6 +844,100 @@ public class TransportSearchActionTests extends ESTestCase {
}
}
public void testShouldPreFilterSearchShards() {
int numIndices = randomIntBetween(1, 10);
Index[] indices = new Index[numIndices];
for (int i = 0; i < numIndices; i++) {
String indexName = randomAlphaOfLengthBetween(5, 10);
indices[i] = new Index(indexName, indexName + "-uuid");
}
ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT).build();
{
SearchRequest searchRequest = new SearchRequest();
assertFalse(TransportSearchAction.shouldPreFilterSearchShards(clusterState, searchRequest,
indices, randomIntBetween(1, 127)));
assertFalse(TransportSearchAction.shouldPreFilterSearchShards(clusterState, searchRequest,
indices, randomIntBetween(127, 10000)));
}
{
SearchRequest searchRequest = new SearchRequest()
.source(new SearchSourceBuilder().query(QueryBuilders.rangeQuery("timestamp")));
assertFalse(TransportSearchAction.shouldPreFilterSearchShards(clusterState, searchRequest,
indices, randomIntBetween(1, 127)));
assertTrue(TransportSearchAction.shouldPreFilterSearchShards(clusterState, searchRequest,
indices, randomIntBetween(127, 10000)));
}
{
SearchRequest searchRequest = new SearchRequest()
.source(new SearchSourceBuilder().sort(SortBuilders.fieldSort("timestamp")));
assertTrue(TransportSearchAction.shouldPreFilterSearchShards(clusterState, searchRequest,
indices, randomIntBetween(1, 127)));
assertTrue(TransportSearchAction.shouldPreFilterSearchShards(clusterState, searchRequest,
indices, randomIntBetween(127, 10000)));
}
{
SearchRequest searchRequest = new SearchRequest()
.source(new SearchSourceBuilder().sort(SortBuilders.fieldSort("timestamp")))
.scroll("5m");
assertTrue(TransportSearchAction.shouldPreFilterSearchShards(clusterState, searchRequest,
indices, randomIntBetween(1, 127)));
assertTrue(TransportSearchAction.shouldPreFilterSearchShards(clusterState, searchRequest,
indices, randomIntBetween(127, 10000)));
}
}
public void testShouldPreFilterSearchShardsWithReadOnly() {
int numIndices = randomIntBetween(1, 10);
int numReadOnly = randomIntBetween(1, numIndices);
Index[] indices = new Index[numIndices];
ClusterBlocks.Builder blocksBuilder = ClusterBlocks.builder();
for (int i = 0; i < numIndices; i++) {
String indexName = randomAlphaOfLengthBetween(5, 10);
indices[i] = new Index(indexName, indexName + "-uuid");
if (--numReadOnly >= 0) {
if (randomBoolean()) {
blocksBuilder.addIndexBlock(indexName, IndexMetaData.INDEX_WRITE_BLOCK);
} else {
blocksBuilder.addIndexBlock(indexName, IndexMetaData.INDEX_READ_ONLY_BLOCK);
}
}
}
ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT).blocks(blocksBuilder).build();
{
SearchRequest searchRequest = new SearchRequest();
assertFalse(TransportSearchAction.shouldPreFilterSearchShards(clusterState, searchRequest,
indices, randomIntBetween(1, 127)));
assertFalse(TransportSearchAction.shouldPreFilterSearchShards(clusterState, searchRequest,
indices, randomIntBetween(127, 10000)));
}
{
SearchRequest searchRequest = new SearchRequest()
.source(new SearchSourceBuilder().query(QueryBuilders.rangeQuery("timestamp")));
assertTrue(TransportSearchAction.shouldPreFilterSearchShards(clusterState, searchRequest,
indices, randomIntBetween(1, 127)));
assertTrue(TransportSearchAction.shouldPreFilterSearchShards(clusterState, searchRequest,
indices, randomIntBetween(127, 10000)));
}
{
SearchRequest searchRequest = new SearchRequest()
.source(new SearchSourceBuilder().query(QueryBuilders.rangeQuery("timestamp")));
searchRequest.scroll("5s");
assertTrue(TransportSearchAction.shouldPreFilterSearchShards(clusterState, searchRequest,
indices, randomIntBetween(127, 10000)));
assertTrue(TransportSearchAction.shouldPreFilterSearchShards(clusterState, searchRequest,
indices, randomIntBetween(127, 10000)));
}
{
SearchRequest searchRequest = new SearchRequest()
.source(new SearchSourceBuilder().query(QueryBuilders.rangeQuery("timestamp")));
searchRequest.searchType(SearchType.DFS_QUERY_THEN_FETCH);
assertFalse(TransportSearchAction.shouldPreFilterSearchShards(clusterState, searchRequest,
indices, randomIntBetween(127, 10000)));
assertFalse(TransportSearchAction.shouldPreFilterSearchShards(clusterState, searchRequest,
indices, randomIntBetween(127, 10000)));
}
}
private InternalAggregation.ReduceContextBuilder aggReduceContextBuilder() {
return new InternalAggregation.ReduceContextBuilder() {
@Override