Search - add allow_partial_search_results flag with default setting false (#28440)

Adds allow_partial_search_results flag to search requests with default setting = true.
When false, will error if search either timeouts, has partial errors or has missing shards rather
than returning partial search results. A cluster-level setting provides a default for search requests with no flag.

Closes #27435
This commit is contained in:
markharwood 2018-01-31 15:51:29 +00:00 committed by GitHub
parent 4c154b70d3
commit 77d2dd203e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
33 changed files with 392 additions and 37 deletions

View File

@ -145,7 +145,7 @@ task verifyVersions {
* after the backport of the backcompat code is complete.
*/
allprojects {
ext.bwc_tests_enabled = true
ext.bwc_tests_enabled = false
}
task verifyBwcTestsEnabled {

View File

@ -442,6 +442,9 @@ public final class Request {
if (searchRequest.requestCache() != null) {
params.putParam("request_cache", Boolean.toString(searchRequest.requestCache()));
}
if (searchRequest.allowPartialSearchResults() != null) {
params.putParam("allow_partial_search_results", Boolean.toString(searchRequest.allowPartialSearchResults()));
}
params.putParam("batched_reduce_size", Integer.toString(searchRequest.getBatchedReduceSize()));
if (searchRequest.scroll() != null) {
params.putParam("scroll", searchRequest.scroll().keepAlive());

View File

@ -844,6 +844,10 @@ public class RequestTests extends ESTestCase {
searchRequest.requestCache(randomBoolean());
expectedParams.put("request_cache", Boolean.toString(searchRequest.requestCache()));
}
if (randomBoolean()) {
searchRequest.allowPartialSearchResults(randomBoolean());
expectedParams.put("allow_partial_search_results", Boolean.toString(searchRequest.allowPartialSearchResults()));
}
if (randomBoolean()) {
searchRequest.setBatchedReduceSize(randomIntBetween(2, Integer.MAX_VALUE));
}

View File

@ -86,6 +86,12 @@ And here is a sample response:
aggregations and suggestions (no top hits returned).
See <<shard-request-cache>>.
`allow_partial_search_results`::
Set to `false` to return an overall failure if the request would produce partial
results. Defaults to true, which will allow partial results in the case of timeouts
or partial failures.
`terminate_after`::
The maximum number of documents to collect for each shard,
@ -103,9 +109,9 @@ And here is a sample response:
Out of the above, the `search_type` and the `request_cache` must be passed as
query-string parameters. The rest of the search request should be passed
within the body itself. The body content can also be passed as a REST
Out of the above, the `search_type`, `request_cache` and the `allow_partial_search_results`
settings must be passed as query-string parameters. The rest of the search request should
be passed within the body itself. The body content can also be passed as a REST
parameter named `source`.
Both HTTP GET and HTTP POST can be used to execute search with body. Since not

View File

@ -122,4 +122,8 @@ Defaults to no terminate_after.
Defaults to `query_then_fetch`. See
<<search-request-search-type,_Search Type_>> for
more details on the different types of search that can be performed.
|`allow_partial_search_results` |Set to `false` to return an overall failure if the request would produce
partial results. Defaults to true, which will allow partial results in the case of timeouts
or partial failures..
|=======================================================================

View File

@ -164,6 +164,9 @@ class BulkByScrollParallelizationHelper {
.requestCache(request.requestCache())
.scroll(request.scroll())
.indicesOptions(request.indicesOptions());
if (request.allowPartialSearchResults() != null) {
slices[slice].allowPartialSearchResults(request.allowPartialSearchResults());
}
}
return slices;
}

View File

@ -147,6 +147,11 @@
"type" : "boolean",
"description": "Indicate if the number of documents that match the query should be tracked"
},
"allow_partial_search_results": {
"type" : "boolean",
"default" : true,
"description": "Indicate if an error should be returned if there is a partial search failure or timeout"
},
"typed_keys": {
"type" : "boolean",
"description" : "Specify whether aggregation and suggester names should be prefixed by their respective types in the response"

View File

@ -131,13 +131,27 @@ abstract class AbstractSearchAsyncAction<Result extends SearchPhaseResult> exten
}
onPhaseFailure(currentPhase, "all shards failed", cause);
} else {
if (logger.isTraceEnabled()) {
final String resultsFrom = results.getSuccessfulResults()
.map(r -> r.getSearchShardTarget().toString()).collect(Collectors.joining(","));
logger.trace("[{}] Moving to next phase: [{}], based on results from: {} (cluster state version: {})",
currentPhase.getName(), nextPhase.getName(), resultsFrom, clusterStateVersion);
Boolean allowPartialResults = request.allowPartialSearchResults();
assert allowPartialResults != null : "SearchRequest missing setting for allowPartialSearchResults";
if (allowPartialResults == false && shardFailures.get() != null ){
if (logger.isDebugEnabled()) {
final ShardOperationFailedException[] shardSearchFailures = ExceptionsHelper.groupBy(buildShardFailures());
Throwable cause = shardSearchFailures.length == 0 ? null :
ElasticsearchException.guessRootCauses(shardSearchFailures[0].getCause())[0];
logger.debug((Supplier<?>) () -> new ParameterizedMessage("{} shards failed for phase: [{}]",
shardSearchFailures.length, getName()),
cause);
}
onPhaseFailure(currentPhase, "Partial shards failure", null);
} else {
if (logger.isTraceEnabled()) {
final String resultsFrom = results.getSuccessfulResults()
.map(r -> r.getSearchShardTarget().toString()).collect(Collectors.joining(","));
logger.trace("[{}] Moving to next phase: [{}], based on results from: {} (cluster state version: {})",
currentPhase.getName(), nextPhase.getName(), resultsFrom, clusterStateVersion);
}
executePhase(nextPhase);
}
executePhase(nextPhase);
}
}
@ -265,8 +279,16 @@ abstract class AbstractSearchAsyncAction<Result extends SearchPhaseResult> exten
@Override
public final SearchResponse buildSearchResponse(InternalSearchResponse internalSearchResponse, String scrollId) {
ShardSearchFailure[] failures = buildShardFailures();
Boolean allowPartialResults = request.allowPartialSearchResults();
assert allowPartialResults != null : "SearchRequest missing setting for allowPartialSearchResults";
if (allowPartialResults == false && failures.length > 0){
raisePhaseFailure(new SearchPhaseExecutionException("", "Shard failures", null, failures));
}
return new SearchResponse(internalSearchResponse, scrollId, getNumShards(), successfulOps.get(),
skippedOps.get(), buildTookInMillis(), buildShardFailures(), clusters);
skippedOps.get(), buildTookInMillis(), failures, clusters);
}
@Override

View File

@ -128,6 +128,9 @@ final class ExpandSearchPhase extends SearchPhase {
.preference(orig.preference())
.routing(orig.routing())
.searchType(orig.searchType());
if (orig.allowPartialSearchResults() != null){
groupRequest.allowPartialSearchResults(orig.allowPartialSearchResults());
}
if (orig.isMaxConcurrentShardRequestsSet()) {
groupRequest.setMaxConcurrentShardRequests(orig.getMaxConcurrentShardRequests());
}

View File

@ -146,7 +146,27 @@ abstract class InitialSearchPhase<FirstResult extends SearchPhaseResult> extends
if (shardsIts.size() > 0) {
int maxConcurrentShardRequests = Math.min(this.maxConcurrentShardRequests, shardsIts.size());
final boolean success = shardExecutionIndex.compareAndSet(0, maxConcurrentShardRequests);
assert success;
assert success;
assert request.allowPartialSearchResults() != null : "SearchRequest missing setting for allowPartialSearchResults";
if (request.allowPartialSearchResults() == false) {
final StringBuilder missingShards = new StringBuilder();
// Fail-fast verification of all shards being available
for (int index = 0; index < shardsIts.size(); index++) {
final SearchShardIterator shardRoutings = shardsIts.get(index);
if (shardRoutings.size() == 0) {
if(missingShards.length() >0 ){
missingShards.append(", ");
}
missingShards.append(shardRoutings.shardId());
}
}
if (missingShards.length() >0) {
//Status red - shard is missing all copies and would produce partial results for an index search
final String msg = "Search rejected due to missing shards ["+ missingShards +
"]. Consider using `allow_partial_search_results` setting to bypass this error.";
throw new SearchPhaseExecutionException(getName(), msg, null, ShardSearchFailure.EMPTY_ARRAY);
}
}
for (int index = 0; index < maxConcurrentShardRequests; index++) {
final SearchShardIterator shardRoutings = shardsIts.get(index);
assert shardRoutings.skip() == false;

View File

@ -225,6 +225,8 @@ public class MultiSearchRequest extends ActionRequest implements CompositeIndice
searchRequest.preference(nodeStringValue(value, null));
} else if ("routing".equals(entry.getKey())) {
searchRequest.routing(nodeStringValue(value, null));
} else if ("allow_partial_search_results".equals(entry.getKey())) {
searchRequest.allowPartialSearchResults(nodeBooleanValue(value, null));
}
}
defaultOptions = IndicesOptions.fromMap(source, defaultOptions);
@ -296,6 +298,9 @@ public class MultiSearchRequest extends ActionRequest implements CompositeIndice
if (request.routing() != null) {
xContentBuilder.field("routing", request.routing());
}
if (request.allowPartialSearchResults() != null) {
xContentBuilder.field("allow_partial_search_results", request.allowPartialSearchResults());
}
xContentBuilder.endObject();
xContentBuilder.bytes().writeTo(output);
}

View File

@ -74,6 +74,9 @@ public final class SearchRequest extends ActionRequest implements IndicesRequest
private Boolean requestCache;
private Boolean allowPartialSearchResults;
private Scroll scroll;
private int batchedReduceSize = 512;
@ -135,6 +138,9 @@ public final class SearchRequest extends ActionRequest implements IndicesRequest
maxConcurrentShardRequests = in.readVInt();
preFilterShardSize = in.readVInt();
}
if (in.getVersion().onOrAfter(Version.V_7_0_0_alpha1)) {
allowPartialSearchResults = in.readOptionalBoolean();
}
}
@Override
@ -157,6 +163,9 @@ public final class SearchRequest extends ActionRequest implements IndicesRequest
out.writeVInt(maxConcurrentShardRequests);
out.writeVInt(preFilterShardSize);
}
if (out.getVersion().onOrAfter(Version.V_7_0_0_alpha1)) {
out.writeOptionalBoolean(allowPartialSearchResults);
}
}
@Override
@ -351,6 +360,20 @@ public final class SearchRequest extends ActionRequest implements IndicesRequest
public Boolean requestCache() {
return this.requestCache;
}
/**
* Sets if this request should allow partial results. (If method is not called,
* will default to the cluster level setting).
*/
public SearchRequest allowPartialSearchResults(boolean allowPartialSearchResults) {
this.allowPartialSearchResults = allowPartialSearchResults;
return this;
}
public Boolean allowPartialSearchResults() {
return this.allowPartialSearchResults;
}
/**
* Sets the number of shard results that should be reduced at once on the coordinating node. This value should be used as a protection
@ -478,13 +501,15 @@ public final class SearchRequest extends ActionRequest implements IndicesRequest
Objects.equals(batchedReduceSize, that.batchedReduceSize) &&
Objects.equals(maxConcurrentShardRequests, that.maxConcurrentShardRequests) &&
Objects.equals(preFilterShardSize, that.preFilterShardSize) &&
Objects.equals(indicesOptions, that.indicesOptions);
Objects.equals(indicesOptions, that.indicesOptions) &&
Objects.equals(allowPartialSearchResults, that.allowPartialSearchResults);
}
@Override
public int hashCode() {
return Objects.hash(searchType, Arrays.hashCode(indices), routing, preference, source, requestCache,
scroll, Arrays.hashCode(types), indicesOptions, batchedReduceSize, maxConcurrentShardRequests, preFilterShardSize);
scroll, Arrays.hashCode(types), indicesOptions, batchedReduceSize, maxConcurrentShardRequests, preFilterShardSize,
allowPartialSearchResults);
}
@Override
@ -501,6 +526,7 @@ public final class SearchRequest extends ActionRequest implements IndicesRequest
", maxConcurrentShardRequests=" + maxConcurrentShardRequests +
", batchedReduceSize=" + batchedReduceSize +
", preFilterShardSize=" + preFilterShardSize +
", allowPartialSearchResults=" + allowPartialSearchResults +
", source=" + source + '}';
}
}

View File

@ -488,6 +488,16 @@ public class SearchRequestBuilder extends ActionRequestBuilder<SearchRequest, Se
request.requestCache(requestCache);
return this;
}
/**
* Sets if this request should allow partial results. (If method is not called,
* will default to the cluster level setting).
*/
public SearchRequestBuilder setAllowPartialSearchResults(boolean allowPartialSearchResults) {
request.allowPartialSearchResults(allowPartialSearchResults);
return this;
}
/**
* Should the query be profiled. Defaults to <code>false</code>

View File

@ -316,6 +316,10 @@ public class TransportSearchAction extends HandledTransportAction<SearchRequest,
// if we only have one group, then we always want Q_T_F, no need for DFS, and no need to do THEN since we hit one shard
searchRequest.searchType(QUERY_THEN_FETCH);
}
if (searchRequest.allowPartialSearchResults() == null) {
// No user preference defined in search request - apply cluster service default
searchRequest.allowPartialSearchResults(searchService.defaultAllowPartialSearchResults());
}
if (searchRequest.isSuggestOnly()) {
// disable request cache if we have only suggest
searchRequest.requestCache(false);

View File

@ -264,6 +264,7 @@ public final class ClusterSettings extends AbstractScopedSettings {
HierarchyCircuitBreakerService.ACCOUNTING_CIRCUIT_BREAKER_OVERHEAD_SETTING,
ClusterService.CLUSTER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING,
SearchService.DEFAULT_SEARCH_TIMEOUT_SETTING,
SearchService.DEFAULT_ALLOW_PARTIAL_SEARCH_RESULTS,
ElectMasterService.DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING,
TransportSearchAction.SHARD_COUNT_LIMIT_SETTING,
RemoteClusterAware.REMOTE_CLUSTERS_SEEDS,

View File

@ -124,6 +124,11 @@ public class RestSearchAction extends BaseRestHandler {
searchRequest.setMaxConcurrentShardRequests(maxConcurrentShardRequests);
}
if (request.hasParam("allow_partial_search_results")) {
// only set if we have the parameter passed to override the cluster-level default
searchRequest.allowPartialSearchResults(request.paramAsBoolean("allow_partial_search_results", null));
}
// do not allow 'query_and_fetch' or 'dfs_query_and_fetch' search types
// from the REST layer. these modes are an internal optimization and should
// not be specified explicitly by the user.

View File

@ -132,6 +132,8 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv
public static final TimeValue NO_TIMEOUT = timeValueMillis(-1);
public static final Setting<TimeValue> DEFAULT_SEARCH_TIMEOUT_SETTING =
Setting.timeSetting("search.default_search_timeout", NO_TIMEOUT, Property.Dynamic, Property.NodeScope);
public static final Setting<Boolean> DEFAULT_ALLOW_PARTIAL_SEARCH_RESULTS =
Setting.boolSetting("search.default_allow_partial_results", true, Property.Dynamic, Property.NodeScope);
private final ThreadPool threadPool;
@ -158,6 +160,8 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv
private volatile TimeValue defaultSearchTimeout;
private volatile boolean defaultAllowPartialSearchResults;
private volatile boolean lowLevelCancellation;
private final Cancellable keepAliveReaper;
@ -193,6 +197,11 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv
defaultSearchTimeout = DEFAULT_SEARCH_TIMEOUT_SETTING.get(settings);
clusterService.getClusterSettings().addSettingsUpdateConsumer(DEFAULT_SEARCH_TIMEOUT_SETTING, this::setDefaultSearchTimeout);
defaultAllowPartialSearchResults = DEFAULT_ALLOW_PARTIAL_SEARCH_RESULTS.get(settings);
clusterService.getClusterSettings().addSettingsUpdateConsumer(DEFAULT_ALLOW_PARTIAL_SEARCH_RESULTS,
this::setDefaultAllowPartialSearchResults);
lowLevelCancellation = LOW_LEVEL_CANCELLATION_SETTING.get(settings);
clusterService.getClusterSettings().addSettingsUpdateConsumer(LOW_LEVEL_CANCELLATION_SETTING, this::setLowLevelCancellation);
}
@ -215,6 +224,14 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv
this.defaultSearchTimeout = defaultSearchTimeout;
}
private void setDefaultAllowPartialSearchResults(boolean defaultAllowPartialSearchResults) {
this.defaultAllowPartialSearchResults = defaultAllowPartialSearchResults;
}
public boolean defaultAllowPartialSearchResults() {
return defaultAllowPartialSearchResults;
}
private void setLowLevelCancellation(Boolean lowLevelCancellation) {
this.lowLevelCancellation = lowLevelCancellation;
}

View File

@ -73,6 +73,7 @@ public class ShardSearchLocalRequest implements ShardSearchRequest {
private SearchSourceBuilder source;
private Boolean requestCache;
private long nowInMillis;
private boolean allowPartialSearchResults;
private boolean profile;
@ -82,7 +83,11 @@ public class ShardSearchLocalRequest implements ShardSearchRequest {
ShardSearchLocalRequest(SearchRequest searchRequest, ShardId shardId, int numberOfShards,
AliasFilter aliasFilter, float indexBoost, long nowInMillis, String clusterAlias) {
this(shardId, numberOfShards, searchRequest.searchType(),
searchRequest.source(), searchRequest.types(), searchRequest.requestCache(), aliasFilter, indexBoost);
searchRequest.source(), searchRequest.types(), searchRequest.requestCache(), aliasFilter, indexBoost,
searchRequest.allowPartialSearchResults());
// If allowPartialSearchResults is unset (ie null), the cluster-level default should have been substituted
// at this stage. Any NPEs in the above are therefore an error in request preparation logic.
assert searchRequest.allowPartialSearchResults() != null;
this.scroll = searchRequest.scroll();
this.nowInMillis = nowInMillis;
this.clusterAlias = clusterAlias;
@ -97,7 +102,7 @@ public class ShardSearchLocalRequest implements ShardSearchRequest {
}
public ShardSearchLocalRequest(ShardId shardId, int numberOfShards, SearchType searchType, SearchSourceBuilder source, String[] types,
Boolean requestCache, AliasFilter aliasFilter, float indexBoost) {
Boolean requestCache, AliasFilter aliasFilter, float indexBoost, boolean allowPartialSearchResults) {
this.shardId = shardId;
this.numberOfShards = numberOfShards;
this.searchType = searchType;
@ -106,6 +111,7 @@ public class ShardSearchLocalRequest implements ShardSearchRequest {
this.requestCache = requestCache;
this.aliasFilter = aliasFilter;
this.indexBoost = indexBoost;
this.allowPartialSearchResults = allowPartialSearchResults;
}
@ -163,6 +169,12 @@ public class ShardSearchLocalRequest implements ShardSearchRequest {
public Boolean requestCache() {
return requestCache;
}
@Override
public Boolean allowPartialSearchResults() {
return allowPartialSearchResults;
}
@Override
public Scroll scroll() {
@ -210,6 +222,9 @@ public class ShardSearchLocalRequest implements ShardSearchRequest {
if (in.getVersion().onOrAfter(Version.V_5_6_0)) {
clusterAlias = in.readOptionalString();
}
if (in.getVersion().onOrAfter(Version.V_7_0_0_alpha1)) {
allowPartialSearchResults = in.readOptionalBoolean();
}
}
protected void innerWriteTo(StreamOutput out, boolean asKey) throws IOException {
@ -232,6 +247,10 @@ public class ShardSearchLocalRequest implements ShardSearchRequest {
if (out.getVersion().onOrAfter(Version.V_5_6_0)) {
out.writeOptionalString(clusterAlias);
}
if (out.getVersion().onOrAfter(Version.V_7_0_0_alpha1)) {
out.writeOptionalBoolean(allowPartialSearchResults);
}
}
@Override

View File

@ -68,6 +68,8 @@ public interface ShardSearchRequest {
long nowInMillis();
Boolean requestCache();
Boolean allowPartialSearchResults();
Scroll scroll();

View File

@ -151,6 +151,11 @@ public class ShardSearchTransportRequest extends TransportRequest implements Sha
public Boolean requestCache() {
return shardSearchLocalRequest.requestCache();
}
@Override
public Boolean allowPartialSearchResults() {
return shardSearchLocalRequest.allowPartialSearchResults();
}
@Override
public Scroll scroll() {

View File

@ -265,6 +265,11 @@ public class QueryPhase implements SearchPhase {
searcher.search(query, queryCollector);
} catch (TimeExceededException e) {
assert timeoutSet : "TimeExceededException thrown even though timeout wasn't set";
if (searchContext.request().allowPartialSearchResults() == false) {
// Can't rethrow TimeExceededException because not serializable
throw new QueryPhaseExecutionException(searchContext, "Time exceeded");
}
queryResult.searchTimedOut(true);
} finally {
searchContext.clearReleasables(SearchContext.Lifetime.COLLECTION);

View File

@ -61,6 +61,7 @@ public class AbstractSearchAsyncActionTests extends ESTestCase {
}
final SearchRequest request = new SearchRequest();
request.allowPartialSearchResults(true);
return new AbstractSearchAsyncAction<SearchPhaseResult>("test", null, null, null,
Collections.singletonMap("foo", new AliasFilter(new MatchAllQueryBuilder())), Collections.singletonMap("foo", 2.0f), null,
request, null, new GroupShardsIterator<>(Collections.singletonList(

View File

@ -76,12 +76,15 @@ public class CanMatchPreFilterSearchPhaseTests extends ESTestCase {
GroupShardsIterator<SearchShardIterator> shardsIter = SearchAsyncActionTests.getShardsIter("idx",
new OriginalIndices(new String[]{"idx"}, IndicesOptions.strictExpandOpenAndForbidClosed()),
2, randomBoolean(), primaryNode, replicaNode);
final SearchRequest searchRequest = new SearchRequest();
searchRequest.allowPartialSearchResults(true);
CanMatchPreFilterSearchPhase canMatchPhase = new CanMatchPreFilterSearchPhase(logger,
searchTransportService,
(clusterAlias, node) -> lookup.get(node),
Collections.singletonMap("_na_", new AliasFilter(null, Strings.EMPTY_ARRAY)),
Collections.emptyMap(), EsExecutors.newDirectExecutorService(),
new SearchRequest(), null, shardsIter, timeProvider, 0, null,
searchRequest, null, shardsIter, timeProvider, 0, null,
(iter) -> new SearchPhase("test") {
@Override
public void run() throws IOException {
@ -153,12 +156,16 @@ public class CanMatchPreFilterSearchPhaseTests extends ESTestCase {
GroupShardsIterator<SearchShardIterator> shardsIter = SearchAsyncActionTests.getShardsIter("idx",
new OriginalIndices(new String[]{"idx"}, IndicesOptions.strictExpandOpenAndForbidClosed()),
2, randomBoolean(), primaryNode, replicaNode);
final SearchRequest searchRequest = new SearchRequest();
searchRequest.allowPartialSearchResults(true);
CanMatchPreFilterSearchPhase canMatchPhase = new CanMatchPreFilterSearchPhase(logger,
searchTransportService,
(clusterAlias, node) -> lookup.get(node),
Collections.singletonMap("_na_", new AliasFilter(null, Strings.EMPTY_ARRAY)),
Collections.emptyMap(), EsExecutors.newDirectExecutorService(),
new SearchRequest(), null, shardsIter, timeProvider, 0, null,
searchRequest, null, shardsIter, timeProvider, 0, null,
(iter) -> new SearchPhase("test") {
@Override
public void run() throws IOException {
@ -207,6 +214,8 @@ public class CanMatchPreFilterSearchPhaseTests extends ESTestCase {
final GroupShardsIterator<SearchShardIterator> shardsIter =
SearchAsyncActionTests.getShardsIter("idx", originalIndices, 4096, randomBoolean(), primaryNode, replicaNode);
final ExecutorService executor = Executors.newFixedThreadPool(randomIntBetween(1, Runtime.getRuntime().availableProcessors()));
final SearchRequest searchRequest = new SearchRequest();
searchRequest.allowPartialSearchResults(true);
final CanMatchPreFilterSearchPhase canMatchPhase = new CanMatchPreFilterSearchPhase(
logger,
searchTransportService,
@ -214,13 +223,14 @@ public class CanMatchPreFilterSearchPhaseTests extends ESTestCase {
Collections.singletonMap("_na_", new AliasFilter(null, Strings.EMPTY_ARRAY)),
Collections.emptyMap(),
EsExecutors.newDirectExecutorService(),
new SearchRequest(),
searchRequest,
null,
shardsIter,
timeProvider,
0,
null,
(iter) -> new InitialSearchPhase<SearchPhaseResult>("test", null, iter, logger, randomIntBetween(1, 32), executor) {
(iter) -> new InitialSearchPhase<SearchPhaseResult>("test", searchRequest,
iter, logger, randomIntBetween(1, 32), executor) {
@Override
void onPhaseDone() {
latch.countDown();

View File

@ -270,6 +270,10 @@ public class MultiSearchRequestTests extends ESTestCase {
for (int j = 0; j < numSearchRequest; j++) {
SearchRequest searchRequest = createSimpleSearchRequest();
if (randomBoolean()) {
searchRequest.allowPartialSearchResults(true);
}
// scroll is not supported in the current msearch api, so unset it:
searchRequest.scroll((Scroll) null);

View File

@ -59,6 +59,7 @@ public class SearchAsyncActionTests extends ESTestCase {
public void testSkipSearchShards() throws InterruptedException {
SearchRequest request = new SearchRequest();
request.allowPartialSearchResults(true);
CountDownLatch latch = new CountDownLatch(1);
AtomicReference<TestSearchResponse> response = new AtomicReference<>();
ActionListener<SearchResponse> responseListener = new ActionListener<SearchResponse>() {
@ -154,6 +155,7 @@ public class SearchAsyncActionTests extends ESTestCase {
public void testLimitConcurrentShardRequests() throws InterruptedException {
SearchRequest request = new SearchRequest();
request.allowPartialSearchResults(true);
int numConcurrent = randomIntBetween(1, 5);
request.setMaxConcurrentShardRequests(numConcurrent);
CountDownLatch latch = new CountDownLatch(1);
@ -253,6 +255,7 @@ public class SearchAsyncActionTests extends ESTestCase {
public void testFanOutAndCollect() throws InterruptedException {
SearchRequest request = new SearchRequest();
request.allowPartialSearchResults(true);
request.setMaxConcurrentShardRequests(randomIntBetween(1, 100));
CountDownLatch latch = new CountDownLatch(1);
AtomicReference<TestSearchResponse> response = new AtomicReference<>();

View File

@ -198,13 +198,13 @@ public class NoMasterNodeIT extends ESIntegTestCase {
GetResponse getResponse = client().prepareGet("test1", "type1", "1").get();
assertExists(getResponse);
SearchResponse countResponse = client().prepareSearch("test1").setSize(0).get();
SearchResponse countResponse = client().prepareSearch("test1").setAllowPartialSearchResults(true).setSize(0).get();
assertHitCount(countResponse, 1L);
SearchResponse searchResponse = client().prepareSearch("test1").get();
SearchResponse searchResponse = client().prepareSearch("test1").setAllowPartialSearchResults(true).get();
assertHitCount(searchResponse, 1L);
countResponse = client().prepareSearch("test2").setSize(0).get();
countResponse = client().prepareSearch("test2").setAllowPartialSearchResults(true).setSize(0).get();
assertThat(countResponse.getTotalShards(), equalTo(2));
assertThat(countResponse.getSuccessfulShards(), equalTo(1));

View File

@ -112,6 +112,11 @@ public class SearchSlowLogTests extends ESSingleNodeTestCase {
return null;
}
@Override
public Boolean allowPartialSearchResults() {
return null;
}
@Override
public Scroll scroll() {
return null;

View File

@ -157,6 +157,7 @@ public class SearchRequestTests extends AbstractSearchTestCase {
result.preference(searchRequest.preference());
result.routing(searchRequest.routing());
result.requestCache(searchRequest.requestCache());
result.allowPartialSearchResults(searchRequest.allowPartialSearchResults());
result.scroll(searchRequest.scroll());
if (searchRequest.source() != null) {
result.source(searchRequest.source());

View File

@ -212,7 +212,8 @@ public class SearchServiceTests extends ESSingleNodeTestCase {
try {
SearchPhaseResult searchPhaseResult = service.executeQueryPhase(
new ShardSearchLocalRequest(indexShard.shardId(), 1, SearchType.DEFAULT,
new SearchSourceBuilder(), new String[0], false, new AliasFilter(null, Strings.EMPTY_ARRAY), 1.0f),
new SearchSourceBuilder(), new String[0], false, new AliasFilter(null, Strings.EMPTY_ARRAY), 1.0f,
true),
new SearchTask(123L, "", "", "", null, Collections.emptyMap()));
IntArrayList intCursors = new IntArrayList(1);
intCursors.add(0);
@ -248,7 +249,7 @@ public class SearchServiceTests extends ESSingleNodeTestCase {
new String[0],
false,
new AliasFilter(null, Strings.EMPTY_ARRAY),
1.0f)
1.0f, true)
);
try {
// the search context should inherit the default timeout
@ -268,7 +269,7 @@ public class SearchServiceTests extends ESSingleNodeTestCase {
new String[0],
false,
new AliasFilter(null, Strings.EMPTY_ARRAY),
1.0f)
1.0f, true)
);
try {
// the search context should inherit the query timeout
@ -296,12 +297,12 @@ public class SearchServiceTests extends ESSingleNodeTestCase {
searchSourceBuilder.docValueField("field" + i);
}
try (SearchContext context = service.createContext(new ShardSearchLocalRequest(indexShard.shardId(), 1, SearchType.DEFAULT,
searchSourceBuilder, new String[0], false, new AliasFilter(null, Strings.EMPTY_ARRAY), 1.0f))) {
searchSourceBuilder, new String[0], false, new AliasFilter(null, Strings.EMPTY_ARRAY), 1.0f, true))) {
assertNotNull(context);
searchSourceBuilder.docValueField("one_field_too_much");
IllegalArgumentException ex = expectThrows(IllegalArgumentException.class,
() -> service.createContext(new ShardSearchLocalRequest(indexShard.shardId(), 1, SearchType.DEFAULT,
searchSourceBuilder, new String[0], false, new AliasFilter(null, Strings.EMPTY_ARRAY), 1.0f)));
searchSourceBuilder, new String[0], false, new AliasFilter(null, Strings.EMPTY_ARRAY), 1.0f, true)));
assertEquals(
"Trying to retrieve too many docvalue_fields. Must be less than or equal to: [100] but was [101]. "
+ "This limit can be set by changing the [index.max_docvalue_fields_search] index level setting.",
@ -327,13 +328,13 @@ public class SearchServiceTests extends ESSingleNodeTestCase {
new Script(ScriptType.INLINE, MockScriptEngine.NAME, CustomScriptPlugin.DUMMY_SCRIPT, Collections.emptyMap()));
}
try (SearchContext context = service.createContext(new ShardSearchLocalRequest(indexShard.shardId(), 1, SearchType.DEFAULT,
searchSourceBuilder, new String[0], false, new AliasFilter(null, Strings.EMPTY_ARRAY), 1.0f))) {
searchSourceBuilder, new String[0], false, new AliasFilter(null, Strings.EMPTY_ARRAY), 1.0f, true))) {
assertNotNull(context);
searchSourceBuilder.scriptField("anotherScriptField",
new Script(ScriptType.INLINE, MockScriptEngine.NAME, CustomScriptPlugin.DUMMY_SCRIPT, Collections.emptyMap()));
IllegalArgumentException ex = expectThrows(IllegalArgumentException.class,
() -> service.createContext(new ShardSearchLocalRequest(indexShard.shardId(), 1, SearchType.DEFAULT,
searchSourceBuilder, new String[0], false, new AliasFilter(null, Strings.EMPTY_ARRAY), 1.0f)));
searchSourceBuilder, new String[0], false, new AliasFilter(null, Strings.EMPTY_ARRAY), 1.0f, true)));
assertEquals(
"Trying to retrieve too many script_fields. Must be less than or equal to: [" + maxScriptFields + "] but was ["
+ (maxScriptFields + 1)
@ -403,28 +404,30 @@ public class SearchServiceTests extends ESSingleNodeTestCase {
final IndicesService indicesService = getInstanceFromNode(IndicesService.class);
final IndexService indexService = indicesService.indexServiceSafe(resolveIndex("index"));
final IndexShard indexShard = indexService.getShard(0);
final boolean allowPartialSearchResults = true;
assertTrue(service.canMatch(new ShardSearchLocalRequest(indexShard.shardId(), 1, SearchType.QUERY_THEN_FETCH, null,
Strings.EMPTY_ARRAY, false, new AliasFilter(null, Strings.EMPTY_ARRAY), 1f)));
Strings.EMPTY_ARRAY, false, new AliasFilter(null, Strings.EMPTY_ARRAY), 1f, allowPartialSearchResults)));
assertTrue(service.canMatch(new ShardSearchLocalRequest(indexShard.shardId(), 1, SearchType.QUERY_THEN_FETCH,
new SearchSourceBuilder(), Strings.EMPTY_ARRAY, false, new AliasFilter(null, Strings.EMPTY_ARRAY), 1f)));
new SearchSourceBuilder(), Strings.EMPTY_ARRAY, false, new AliasFilter(null, Strings.EMPTY_ARRAY), 1f,
allowPartialSearchResults)));
assertTrue(service.canMatch(new ShardSearchLocalRequest(indexShard.shardId(), 1, SearchType.QUERY_THEN_FETCH,
new SearchSourceBuilder().query(new MatchAllQueryBuilder()), Strings.EMPTY_ARRAY, false,
new AliasFilter(null, Strings.EMPTY_ARRAY), 1f)));
new AliasFilter(null, Strings.EMPTY_ARRAY), 1f, allowPartialSearchResults)));
assertTrue(service.canMatch(new ShardSearchLocalRequest(indexShard.shardId(), 1, SearchType.QUERY_THEN_FETCH,
new SearchSourceBuilder().query(new MatchNoneQueryBuilder())
.aggregation(new TermsAggregationBuilder("test", ValueType.STRING).minDocCount(0)), Strings.EMPTY_ARRAY, false,
new AliasFilter(null, Strings.EMPTY_ARRAY), 1f)));
new AliasFilter(null, Strings.EMPTY_ARRAY), 1f, allowPartialSearchResults)));
assertTrue(service.canMatch(new ShardSearchLocalRequest(indexShard.shardId(), 1, SearchType.QUERY_THEN_FETCH,
new SearchSourceBuilder().query(new MatchNoneQueryBuilder())
.aggregation(new GlobalAggregationBuilder("test")), Strings.EMPTY_ARRAY, false,
new AliasFilter(null, Strings.EMPTY_ARRAY), 1f)));
new AliasFilter(null, Strings.EMPTY_ARRAY), 1f, allowPartialSearchResults)));
assertFalse(service.canMatch(new ShardSearchLocalRequest(indexShard.shardId(), 1, SearchType.QUERY_THEN_FETCH,
new SearchSourceBuilder().query(new MatchNoneQueryBuilder()), Strings.EMPTY_ARRAY, false,
new AliasFilter(null, Strings.EMPTY_ARRAY), 1f)));
new AliasFilter(null, Strings.EMPTY_ARRAY), 1f, allowPartialSearchResults)));
}

View File

@ -19,13 +19,17 @@
package org.elasticsearch.search;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.script.MockScriptPlugin;
import org.elasticsearch.script.Script;
import org.elasticsearch.script.ScriptType;
import org.elasticsearch.search.aggregations.AggregatorFactories;
import org.elasticsearch.search.query.QueryPhaseExecutionException;
import org.elasticsearch.test.ESIntegTestCase;
import java.util.Collection;
@ -58,9 +62,23 @@ public class SearchTimeoutIT extends ESIntegTestCase {
SearchResponse searchResponse = client().prepareSearch("test").setTimeout(new TimeValue(10, TimeUnit.MILLISECONDS))
.setQuery(scriptQuery(
new Script(ScriptType.INLINE, "mockscript", SCRIPT_NAME, Collections.emptyMap())))
.setAllowPartialSearchResults(true)
.execute().actionGet();
assertThat(searchResponse.isTimedOut(), equalTo(true));
}
public void testPartialResultsIntolerantTimeout() throws Exception {
client().prepareIndex("test", "type", "1").setSource("field", "value").setRefreshPolicy(IMMEDIATE).get();
ElasticsearchException ex = expectThrows(ElasticsearchException.class, () ->
client().prepareSearch("test").setTimeout(new TimeValue(10, TimeUnit.MILLISECONDS))
.setQuery(scriptQuery(
new Script(ScriptType.INLINE, "mockscript", SCRIPT_NAME, Collections.emptyMap())))
.setAllowPartialSearchResults(false) // this line causes timeouts to report failures
.execute().actionGet()
);
assertTrue(ex.toString().contains("Time exceeded"));
}
public static class ScriptedTimeoutPlugin extends MockScriptPlugin {
static final String SCRIPT_NAME = "search_timeout";

View File

@ -0,0 +1,139 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.search.basic;
import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsResponse;
import org.elasticsearch.action.search.SearchPhaseExecutionException;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.health.ClusterHealthStatus;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.ShardRoutingState;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.search.SearchService;
import org.elasticsearch.test.ESIntegTestCase;
import org.junit.After;
import java.util.List;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.lessThan;
@ESIntegTestCase.ClusterScope(minNumDataNodes = 2)
public class SearchRedStateIndexIT extends ESIntegTestCase {
public void testAllowPartialsWithRedState() throws Exception {
final int numShards = cluster().numDataNodes()+2;
buildRedIndex(numShards);
SearchResponse searchResponse = client().prepareSearch().setSize(0).setAllowPartialSearchResults(true)
.execute().actionGet();
assertThat(RestStatus.OK, equalTo(searchResponse.status()));
assertThat("Expect no shards failed", searchResponse.getFailedShards(), equalTo(0));
assertThat("Expect no shards skipped", searchResponse.getSkippedShards(), equalTo(0));
assertThat("Expect subset of shards successful", searchResponse.getSuccessfulShards(), lessThan(numShards));
assertThat("Expected total shards", searchResponse.getTotalShards(), equalTo(numShards));
}
public void testClusterAllowPartialsWithRedState() throws Exception {
final int numShards = cluster().numDataNodes()+2;
buildRedIndex(numShards);
setClusterDefaultAllowPartialResults(true);
SearchResponse searchResponse = client().prepareSearch().setSize(0).execute().actionGet();
assertThat(RestStatus.OK, equalTo(searchResponse.status()));
assertThat("Expect no shards failed", searchResponse.getFailedShards(), equalTo(0));
assertThat("Expect no shards skipped", searchResponse.getSkippedShards(), equalTo(0));
assertThat("Expect subset of shards successful", searchResponse.getSuccessfulShards(), lessThan(numShards));
assertThat("Expected total shards", searchResponse.getTotalShards(), equalTo(numShards));
}
public void testDisallowPartialsWithRedState() throws Exception {
buildRedIndex(cluster().numDataNodes()+2);
SearchPhaseExecutionException ex = expectThrows(SearchPhaseExecutionException.class,
() ->
client().prepareSearch().setSize(0).setAllowPartialSearchResults(false).execute().actionGet()
);
assertThat(ex.getDetailedMessage(), containsString("Search rejected due to missing shard"));
}
public void testClusterDisallowPartialsWithRedState() throws Exception {
buildRedIndex(cluster().numDataNodes()+2);
setClusterDefaultAllowPartialResults(false);
SearchPhaseExecutionException ex = expectThrows(SearchPhaseExecutionException.class,
() ->
client().prepareSearch().setSize(0).execute().actionGet()
);
assertThat(ex.getDetailedMessage(), containsString("Search rejected due to missing shard"));
}
private void setClusterDefaultAllowPartialResults(boolean allowPartialResults) {
String key = SearchService.DEFAULT_ALLOW_PARTIAL_SEARCH_RESULTS.getKey();
Settings transientSettings = Settings.builder().put(key, allowPartialResults).build();
ClusterUpdateSettingsResponse response1 = client().admin().cluster()
.prepareUpdateSettings()
.setTransientSettings(transientSettings)
.execute()
.actionGet();
assertAcked(response1);
assertEquals(response1.getTransientSettings().getAsBoolean(key, null), allowPartialResults);
}
private void buildRedIndex(int numShards) throws Exception {
assertAcked(prepareCreate("test").setSettings(Settings.builder().put("index.number_of_shards",
numShards).put("index.number_of_replicas", 0)));
ensureGreen();
for (int i = 0; i < 10; i++) {
client().prepareIndex("test", "type1", ""+i).setSource("field1", "value1").execute().actionGet();
}
refresh();
internalCluster().stopRandomDataNode();
client().admin().cluster().prepareHealth().setWaitForStatus(ClusterHealthStatus.RED).execute().actionGet();
assertBusy(() -> {
ClusterState clusterState = client().admin().cluster().prepareState().execute().actionGet().getState();
List<ShardRouting> unassigneds = clusterState.getRoutingTable().shardsWithState(ShardRoutingState.UNASSIGNED);
assertThat(unassigneds.size(), greaterThan(0));
});
}
@After
public void cleanup() throws Exception {
assertAcked(client().admin().cluster().prepareUpdateSettings()
.setTransientSettings(Settings.builder().putNull(SearchService.DEFAULT_ALLOW_PARTIAL_SEARCH_RESULTS.getKey())));
}
}

View File

@ -278,7 +278,8 @@ public class SimpleQueryStringIT extends ESIntegTestCase {
client().prepareIndex("test2", "type1", "10").setSource("field", 5));
refresh();
SearchResponse searchResponse = client().prepareSearch().setQuery(simpleQueryStringQuery("foo").field("field")).get();
SearchResponse searchResponse = client().prepareSearch().setAllowPartialSearchResults(true)
.setQuery(simpleQueryStringQuery("foo").field("field")).get();
assertFailures(searchResponse);
assertHitCount(searchResponse, 1L);
assertSearchHits(searchResponse, "1");

View File

@ -82,6 +82,7 @@ public class RandomSearchRequestGenerator {
*/
public static SearchRequest randomSearchRequest(Supplier<SearchSourceBuilder> randomSearchSourceBuilder) throws IOException {
SearchRequest searchRequest = new SearchRequest();
searchRequest.allowPartialSearchResults(true);
if (randomBoolean()) {
searchRequest.indices(generateRandomStringArray(10, 10, false, false));
}