Currently we set the defaults for ccsMinimizeRoundtrips, preFilterShardSize and requestCache on the HLRC SubmitAsyncSearchRequest in the constructor. This is no longer needed since we now only send the parameters along with the rest request that are supported (omitting e.g. ccsMinimizeRoundtrips) and the correct defaults are set on the client side. This change removes setting and sending these defaults where possible, leaving only the overwrite of batchedReduceSize with a default value of 5, since the default used in the vanilla SearchRequest is 512. However, we don't need to send this value along as a request parameter if its the default since the correct one will be set on the receiving end if no value is specified. Also adding tests for RestSubmitAsyncSearchAction that check the correct defaults are set when parameters are missing on the server side. Backport of #54200
This commit is contained in:
parent
b6a8de0d89
commit
da404bbce2
|
@ -73,7 +73,9 @@ final class AsyncSearchRequestConverters {
|
|||
if (request.getAllowPartialSearchResults() != null) {
|
||||
params.withAllowPartialResults(request.getAllowPartialSearchResults());
|
||||
}
|
||||
params.withBatchedReduceSize(request.getBatchedReduceSize());
|
||||
if (request.getBatchedReduceSize() != null) {
|
||||
params.withBatchedReduceSize(request.getBatchedReduceSize());
|
||||
}
|
||||
}
|
||||
|
||||
static Request getAsyncSearch(GetAsyncSearchRequest asyncSearchRequest) {
|
||||
|
|
|
@ -36,10 +36,7 @@ import java.util.Optional;
|
|||
*/
|
||||
public class SubmitAsyncSearchRequest implements Validatable {
|
||||
|
||||
public static final int DEFAULT_PRE_FILTER_SHARD_SIZE = 1;
|
||||
public static final int DEFAULT_BATCHED_REDUCE_SIZE = 5;
|
||||
private static final boolean DEFAULT_CCS_MINIMIZE_ROUNDTRIPS = false;
|
||||
private static final boolean DEFAULT_REQUEST_CACHE_VALUE = true;
|
||||
|
||||
public static long MIN_KEEP_ALIVE = TimeValue.timeValueMinutes(1).millis();
|
||||
|
||||
|
@ -47,16 +44,14 @@ public class SubmitAsyncSearchRequest implements Validatable {
|
|||
private Boolean keepOnCompletion;
|
||||
private TimeValue keepAlive;
|
||||
private final SearchRequest searchRequest;
|
||||
// The following is optional and will only be sent down with the request if explicitely set by the user
|
||||
private Integer batchedReduceSize;
|
||||
|
||||
/**
|
||||
* Creates a new request
|
||||
*/
|
||||
public SubmitAsyncSearchRequest(SearchSourceBuilder source, String... indices) {
|
||||
this.searchRequest = new SearchRequest(indices, source);
|
||||
searchRequest.setCcsMinimizeRoundtrips(DEFAULT_CCS_MINIMIZE_ROUNDTRIPS);
|
||||
searchRequest.setPreFilterShardSize(DEFAULT_PRE_FILTER_SHARD_SIZE);
|
||||
searchRequest.setBatchedReduceSize(DEFAULT_BATCHED_REDUCE_SIZE);
|
||||
searchRequest.requestCache(DEFAULT_REQUEST_CACHE_VALUE);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -192,33 +187,34 @@ public class SubmitAsyncSearchRequest implements Validatable {
|
|||
}
|
||||
|
||||
/**
|
||||
* Sets the number of shard results that should be reduced at once on the coordinating node. This value should be used as a protection
|
||||
* mechanism to reduce the memory overhead per search request if the potential number of shards in the request can be large.
|
||||
* Optional. Sets the number of shard results that should be reduced at once on the coordinating node.
|
||||
* This value should be used as a protection mechanism to reduce the memory overhead per search
|
||||
* request if the potential number of shards in the request can be large. Defaults to 5.
|
||||
*/
|
||||
public void setBatchedReduceSize(int batchedReduceSize) {
|
||||
this.searchRequest.setBatchedReduceSize(batchedReduceSize);
|
||||
this.batchedReduceSize = batchedReduceSize;
|
||||
}
|
||||
|
||||
/**
|
||||
* Gets the number of shard results that should be reduced at once on the coordinating node.
|
||||
* This defaults to 5 for {@link SubmitAsyncSearchRequest}.
|
||||
* Returns {@code null} if unset.
|
||||
*/
|
||||
public int getBatchedReduceSize() {
|
||||
return this.searchRequest.getBatchedReduceSize();
|
||||
public Integer getBatchedReduceSize() {
|
||||
return this.batchedReduceSize;
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets if this request should use the request cache or not, assuming that it can (for
|
||||
* example, if "now" is used, it will never be cached). By default (not set, or null,
|
||||
* will default to the index level setting if request cache is enabled or not).
|
||||
* example, if "now" is used, it will never be cached).
|
||||
* By default (if not set) this is turned on for {@link SubmitAsyncSearchRequest}.
|
||||
*/
|
||||
public void setRequestCache(Boolean requestCache) {
|
||||
this.searchRequest.requestCache(requestCache);
|
||||
}
|
||||
|
||||
/**
|
||||
* Gets if this request should use the request cache or not.
|
||||
* Defaults to `true` for {@link SubmitAsyncSearchRequest}.
|
||||
* Gets if this request should use the request cache or not, if set.
|
||||
* This defaults to `true` on the server side if unset in the client.
|
||||
*/
|
||||
public Boolean getRequestCache() {
|
||||
return this.searchRequest.requestCache();
|
||||
|
|
|
@ -50,8 +50,6 @@ public class AsyncSearchRequestConvertersTests extends ESTestCase {
|
|||
|
||||
// the following parameters might be overwritten by random ones later,
|
||||
// but we need to set these since they are the default we send over http
|
||||
expectedParams.put("request_cache", "true");
|
||||
expectedParams.put("batched_reduce_size", "5");
|
||||
setRandomSearchParams(submitRequest, expectedParams);
|
||||
setRandomIndicesOptions(submitRequest::setIndicesOptions, submitRequest::getIndicesOptions, expectedParams);
|
||||
|
||||
|
@ -108,8 +106,8 @@ public class AsyncSearchRequestConvertersTests extends ESTestCase {
|
|||
}
|
||||
if (randomBoolean()) {
|
||||
request.setBatchedReduceSize(randomIntBetween(2, Integer.MAX_VALUE));
|
||||
expectedParams.put("batched_reduce_size", Integer.toString(request.getBatchedReduceSize()));
|
||||
}
|
||||
expectedParams.put("batched_reduce_size", Integer.toString(request.getBatchedReduceSize()));
|
||||
if (randomBoolean()) {
|
||||
request.setMaxConcurrentShardRequests(randomIntBetween(1, Integer.MAX_VALUE));
|
||||
}
|
||||
|
|
|
@ -167,7 +167,7 @@ public class RestSearchAction extends BaseRestHandler {
|
|||
searchRequest.searchType(searchType);
|
||||
}
|
||||
parseSearchSource(searchRequest.source(), request, setSize);
|
||||
searchRequest.requestCache(request.paramAsBoolean("request_cache", null));
|
||||
searchRequest.requestCache(request.paramAsBoolean("request_cache", searchRequest.requestCache()));
|
||||
|
||||
String scroll = request.param("scroll");
|
||||
if (scroll != null) {
|
||||
|
|
|
@ -0,0 +1,121 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
package org.elasticsearch.xpack.search;
|
||||
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.action.ActionRequest;
|
||||
import org.elasticsearch.action.ActionResponse;
|
||||
import org.elasticsearch.action.ActionType;
|
||||
import org.elasticsearch.client.node.NodeClient;
|
||||
import org.elasticsearch.common.bytes.BytesArray;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.common.util.concurrent.ThreadContext;
|
||||
import org.elasticsearch.common.xcontent.XContentType;
|
||||
import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
|
||||
import org.elasticsearch.rest.RestController;
|
||||
import org.elasticsearch.rest.RestRequest;
|
||||
import org.elasticsearch.tasks.Task;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
import org.elasticsearch.test.rest.FakeRestChannel;
|
||||
import org.elasticsearch.test.rest.FakeRestRequest;
|
||||
import org.elasticsearch.usage.UsageService;
|
||||
import org.elasticsearch.xpack.core.search.action.SubmitAsyncSearchRequest;
|
||||
import org.junit.Before;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.function.Function;
|
||||
|
||||
public class RestSubmitAsyncSearchActionTests extends ESTestCase {
|
||||
|
||||
private RestSubmitAsyncSearchAction action;
|
||||
private ActionRequest lastCapturedRequest;
|
||||
private RestController controller;
|
||||
private NodeClient nodeClient;
|
||||
|
||||
@Before
|
||||
public void setUpController() {
|
||||
nodeClient = new NodeClient(Settings.EMPTY, null) {
|
||||
|
||||
@Override
|
||||
public <Request extends ActionRequest, Response extends ActionResponse> Task executeLocally(ActionType<Response> action,
|
||||
Request request, ActionListener<Response> listener) {
|
||||
lastCapturedRequest = request;
|
||||
return new Task(1L, "type", "action", "description", null, null);
|
||||
}
|
||||
};
|
||||
nodeClient.initialize(new HashMap<>(), () -> "local", null);
|
||||
controller = new RestController(Collections.emptySet(), null,
|
||||
nodeClient,
|
||||
new NoneCircuitBreakerService(),
|
||||
new UsageService());
|
||||
action = new RestSubmitAsyncSearchAction();
|
||||
controller.registerHandler(action);
|
||||
}
|
||||
|
||||
/**
|
||||
* Check that the appropriate defaults are set on the {@link SubmitAsyncSearchRequest} if
|
||||
* no parameters are specified on the rest request itself.
|
||||
*/
|
||||
public void testRequestParameterDefaults() throws IOException {
|
||||
RestRequest submitAsyncRestRequest = new FakeRestRequest.Builder(xContentRegistry())
|
||||
.withMethod(RestRequest.Method.POST)
|
||||
.withPath("/test_index/_async_search")
|
||||
.withContent(new BytesArray("{}"), XContentType.JSON)
|
||||
.build();
|
||||
dispatchRequest(submitAsyncRestRequest);
|
||||
SubmitAsyncSearchRequest submitRequest = (SubmitAsyncSearchRequest) lastCapturedRequest;
|
||||
assertEquals(TimeValue.timeValueSeconds(1), submitRequest.getWaitForCompletionTimeout());
|
||||
assertEquals(false, submitRequest.isKeepOnCompletion());
|
||||
assertEquals(TimeValue.timeValueDays(5), submitRequest.getKeepAlive());
|
||||
// check parameters we implicitly set in the SubmitAsyncSearchRequest ctor
|
||||
assertEquals(false, submitRequest.getSearchRequest().isCcsMinimizeRoundtrips());
|
||||
assertEquals(5, submitRequest.getSearchRequest().getBatchedReduceSize());
|
||||
assertEquals(true, submitRequest.getSearchRequest().requestCache());
|
||||
assertEquals(1, submitRequest.getSearchRequest().getPreFilterShardSize().intValue());
|
||||
}
|
||||
|
||||
public void testParameters() throws IOException {
|
||||
String tvString = randomTimeValue(1, 100);
|
||||
doTestParameter("keep_alive", tvString, TimeValue.parseTimeValue(tvString, ""), SubmitAsyncSearchRequest::getKeepAlive);
|
||||
doTestParameter("wait_for_completion_timeout", tvString, TimeValue.parseTimeValue(tvString, ""),
|
||||
SubmitAsyncSearchRequest::getWaitForCompletionTimeout);
|
||||
boolean keepOnCompletion = randomBoolean();
|
||||
doTestParameter("keep_on_completion", Boolean.toString(keepOnCompletion), keepOnCompletion,
|
||||
SubmitAsyncSearchRequest::isKeepOnCompletion);
|
||||
boolean requestCache = randomBoolean();
|
||||
doTestParameter("request_cache", Boolean.toString(requestCache), requestCache,
|
||||
r -> r.getSearchRequest().requestCache());
|
||||
Integer batchReduceSize = randomIntBetween(2, 50);
|
||||
doTestParameter("batched_reduce_size", Integer.toString(batchReduceSize), batchReduceSize,
|
||||
r -> r.getSearchRequest().getBatchedReduceSize());
|
||||
}
|
||||
|
||||
private <T> void doTestParameter(String paramName, String paramValue, T expectedValue,
|
||||
Function<SubmitAsyncSearchRequest, T> valueAccessor) {
|
||||
Map<String, String> params = new HashMap<>();
|
||||
params.put(paramName, paramValue);
|
||||
RestRequest submitAsyncRestRequest = new FakeRestRequest.Builder(xContentRegistry()).withMethod(RestRequest.Method.POST)
|
||||
.withPath("/test_index/_async_search")
|
||||
.withParams(params)
|
||||
.withContent(new BytesArray("{}"), XContentType.JSON).build();
|
||||
dispatchRequest(submitAsyncRestRequest);
|
||||
SubmitAsyncSearchRequest submitRequest = (SubmitAsyncSearchRequest) lastCapturedRequest;
|
||||
assertEquals(expectedValue, valueAccessor.apply(submitRequest));
|
||||
}
|
||||
|
||||
/**
|
||||
* Sends the given request to the test controller
|
||||
*/
|
||||
protected void dispatchRequest(RestRequest request) {
|
||||
FakeRestChannel channel = new FakeRestChannel(request, false, 1);
|
||||
ThreadContext threadContext = new ThreadContext(Settings.EMPTY);
|
||||
controller.dispatchRequest(request, channel, threadContext);
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue