From da404bbce20c34020a6c0a2e1891ccb83f7f6e6b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Christoph=20B=C3=BCscher?= Date: Thu, 26 Mar 2020 19:01:17 +0100 Subject: [PATCH] HLRC: Don't send defaults for SubmitAsyncSearchRequest (#54200) (#54266) Currently we set the defaults for ccsMinimizeRoundtrips, preFilterShardSize and requestCache on the HLRC SubmitAsyncSearchRequest in the constructor. This is no longer needed since we now only send the parameters along with the rest request that are supported (omitting e.g. ccsMinimizeRoundtrips) and the correct defaults are set on the client side. This change removes setting and sending these defaults where possible, leaving only the overwrite of batchedReduceSize with a default value of 5, since the default used in the vanilla SearchRequest is 512. However, we don't need to send this value along as a request parameter if its the default since the correct one will be set on the receiving end if no value is specified. Also adding tests for RestSubmitAsyncSearchAction that check the correct defaults are set when parameters are missing on the server side. Backport of #54200 --- .../client/AsyncSearchRequestConverters.java | 4 +- .../asyncsearch/SubmitAsyncSearchRequest.java | 30 ++--- .../AsyncSearchRequestConvertersTests.java | 4 +- .../rest/action/search/RestSearchAction.java | 2 +- .../RestSubmitAsyncSearchActionTests.java | 121 ++++++++++++++++++ 5 files changed, 139 insertions(+), 22 deletions(-) create mode 100644 x-pack/plugin/async-search/src/test/java/org/elasticsearch/xpack/search/RestSubmitAsyncSearchActionTests.java diff --git a/client/rest-high-level/src/main/java/org/elasticsearch/client/AsyncSearchRequestConverters.java b/client/rest-high-level/src/main/java/org/elasticsearch/client/AsyncSearchRequestConverters.java index c5e70815752..fcc736cc28a 100644 --- a/client/rest-high-level/src/main/java/org/elasticsearch/client/AsyncSearchRequestConverters.java +++ b/client/rest-high-level/src/main/java/org/elasticsearch/client/AsyncSearchRequestConverters.java @@ -73,7 +73,9 @@ final class AsyncSearchRequestConverters { if (request.getAllowPartialSearchResults() != null) { params.withAllowPartialResults(request.getAllowPartialSearchResults()); } - params.withBatchedReduceSize(request.getBatchedReduceSize()); + if (request.getBatchedReduceSize() != null) { + params.withBatchedReduceSize(request.getBatchedReduceSize()); + } } static Request getAsyncSearch(GetAsyncSearchRequest asyncSearchRequest) { diff --git a/client/rest-high-level/src/main/java/org/elasticsearch/client/asyncsearch/SubmitAsyncSearchRequest.java b/client/rest-high-level/src/main/java/org/elasticsearch/client/asyncsearch/SubmitAsyncSearchRequest.java index 65892fb562d..eba121d7cc0 100644 --- a/client/rest-high-level/src/main/java/org/elasticsearch/client/asyncsearch/SubmitAsyncSearchRequest.java +++ b/client/rest-high-level/src/main/java/org/elasticsearch/client/asyncsearch/SubmitAsyncSearchRequest.java @@ -36,10 +36,7 @@ import java.util.Optional; */ public class SubmitAsyncSearchRequest implements Validatable { - public static final int DEFAULT_PRE_FILTER_SHARD_SIZE = 1; public static final int DEFAULT_BATCHED_REDUCE_SIZE = 5; - private static final boolean DEFAULT_CCS_MINIMIZE_ROUNDTRIPS = false; - private static final boolean DEFAULT_REQUEST_CACHE_VALUE = true; public static long MIN_KEEP_ALIVE = TimeValue.timeValueMinutes(1).millis(); @@ -47,16 +44,14 @@ public class SubmitAsyncSearchRequest implements Validatable { private Boolean keepOnCompletion; private TimeValue keepAlive; private final SearchRequest searchRequest; + // The following is optional and will only be sent down with the request if explicitely set by the user + private Integer batchedReduceSize; /** * Creates a new request */ public SubmitAsyncSearchRequest(SearchSourceBuilder source, String... indices) { this.searchRequest = new SearchRequest(indices, source); - searchRequest.setCcsMinimizeRoundtrips(DEFAULT_CCS_MINIMIZE_ROUNDTRIPS); - searchRequest.setPreFilterShardSize(DEFAULT_PRE_FILTER_SHARD_SIZE); - searchRequest.setBatchedReduceSize(DEFAULT_BATCHED_REDUCE_SIZE); - searchRequest.requestCache(DEFAULT_REQUEST_CACHE_VALUE); } /** @@ -192,33 +187,34 @@ public class SubmitAsyncSearchRequest implements Validatable { } /** - * Sets the number of shard results that should be reduced at once on the coordinating node. This value should be used as a protection - * mechanism to reduce the memory overhead per search request if the potential number of shards in the request can be large. + * Optional. Sets the number of shard results that should be reduced at once on the coordinating node. + * This value should be used as a protection mechanism to reduce the memory overhead per search + * request if the potential number of shards in the request can be large. Defaults to 5. */ public void setBatchedReduceSize(int batchedReduceSize) { - this.searchRequest.setBatchedReduceSize(batchedReduceSize); + this.batchedReduceSize = batchedReduceSize; } /** * Gets the number of shard results that should be reduced at once on the coordinating node. - * This defaults to 5 for {@link SubmitAsyncSearchRequest}. + * Returns {@code null} if unset. */ - public int getBatchedReduceSize() { - return this.searchRequest.getBatchedReduceSize(); + public Integer getBatchedReduceSize() { + return this.batchedReduceSize; } /** * Sets if this request should use the request cache or not, assuming that it can (for - * example, if "now" is used, it will never be cached). By default (not set, or null, - * will default to the index level setting if request cache is enabled or not). + * example, if "now" is used, it will never be cached). + * By default (if not set) this is turned on for {@link SubmitAsyncSearchRequest}. */ public void setRequestCache(Boolean requestCache) { this.searchRequest.requestCache(requestCache); } /** - * Gets if this request should use the request cache or not. - * Defaults to `true` for {@link SubmitAsyncSearchRequest}. + * Gets if this request should use the request cache or not, if set. + * This defaults to `true` on the server side if unset in the client. */ public Boolean getRequestCache() { return this.searchRequest.requestCache(); diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/AsyncSearchRequestConvertersTests.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/AsyncSearchRequestConvertersTests.java index 9a863c0ef66..3f7fd7bf07c 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/AsyncSearchRequestConvertersTests.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/AsyncSearchRequestConvertersTests.java @@ -50,8 +50,6 @@ public class AsyncSearchRequestConvertersTests extends ESTestCase { // the following parameters might be overwritten by random ones later, // but we need to set these since they are the default we send over http - expectedParams.put("request_cache", "true"); - expectedParams.put("batched_reduce_size", "5"); setRandomSearchParams(submitRequest, expectedParams); setRandomIndicesOptions(submitRequest::setIndicesOptions, submitRequest::getIndicesOptions, expectedParams); @@ -108,8 +106,8 @@ public class AsyncSearchRequestConvertersTests extends ESTestCase { } if (randomBoolean()) { request.setBatchedReduceSize(randomIntBetween(2, Integer.MAX_VALUE)); + expectedParams.put("batched_reduce_size", Integer.toString(request.getBatchedReduceSize())); } - expectedParams.put("batched_reduce_size", Integer.toString(request.getBatchedReduceSize())); if (randomBoolean()) { request.setMaxConcurrentShardRequests(randomIntBetween(1, Integer.MAX_VALUE)); } diff --git a/server/src/main/java/org/elasticsearch/rest/action/search/RestSearchAction.java b/server/src/main/java/org/elasticsearch/rest/action/search/RestSearchAction.java index 85129cd3425..c81c9dcb551 100644 --- a/server/src/main/java/org/elasticsearch/rest/action/search/RestSearchAction.java +++ b/server/src/main/java/org/elasticsearch/rest/action/search/RestSearchAction.java @@ -167,7 +167,7 @@ public class RestSearchAction extends BaseRestHandler { searchRequest.searchType(searchType); } parseSearchSource(searchRequest.source(), request, setSize); - searchRequest.requestCache(request.paramAsBoolean("request_cache", null)); + searchRequest.requestCache(request.paramAsBoolean("request_cache", searchRequest.requestCache())); String scroll = request.param("scroll"); if (scroll != null) { diff --git a/x-pack/plugin/async-search/src/test/java/org/elasticsearch/xpack/search/RestSubmitAsyncSearchActionTests.java b/x-pack/plugin/async-search/src/test/java/org/elasticsearch/xpack/search/RestSubmitAsyncSearchActionTests.java new file mode 100644 index 00000000000..748c6d12acc --- /dev/null +++ b/x-pack/plugin/async-search/src/test/java/org/elasticsearch/xpack/search/RestSubmitAsyncSearchActionTests.java @@ -0,0 +1,121 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.search; + +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.ActionRequest; +import org.elasticsearch.action.ActionResponse; +import org.elasticsearch.action.ActionType; +import org.elasticsearch.client.node.NodeClient; +import org.elasticsearch.common.bytes.BytesArray; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.util.concurrent.ThreadContext; +import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.indices.breaker.NoneCircuitBreakerService; +import org.elasticsearch.rest.RestController; +import org.elasticsearch.rest.RestRequest; +import org.elasticsearch.tasks.Task; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.test.rest.FakeRestChannel; +import org.elasticsearch.test.rest.FakeRestRequest; +import org.elasticsearch.usage.UsageService; +import org.elasticsearch.xpack.core.search.action.SubmitAsyncSearchRequest; +import org.junit.Before; + +import java.io.IOException; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.function.Function; + +public class RestSubmitAsyncSearchActionTests extends ESTestCase { + + private RestSubmitAsyncSearchAction action; + private ActionRequest lastCapturedRequest; + private RestController controller; + private NodeClient nodeClient; + + @Before + public void setUpController() { + nodeClient = new NodeClient(Settings.EMPTY, null) { + + @Override + public Task executeLocally(ActionType action, + Request request, ActionListener listener) { + lastCapturedRequest = request; + return new Task(1L, "type", "action", "description", null, null); + } + }; + nodeClient.initialize(new HashMap<>(), () -> "local", null); + controller = new RestController(Collections.emptySet(), null, + nodeClient, + new NoneCircuitBreakerService(), + new UsageService()); + action = new RestSubmitAsyncSearchAction(); + controller.registerHandler(action); + } + + /** + * Check that the appropriate defaults are set on the {@link SubmitAsyncSearchRequest} if + * no parameters are specified on the rest request itself. + */ + public void testRequestParameterDefaults() throws IOException { + RestRequest submitAsyncRestRequest = new FakeRestRequest.Builder(xContentRegistry()) + .withMethod(RestRequest.Method.POST) + .withPath("/test_index/_async_search") + .withContent(new BytesArray("{}"), XContentType.JSON) + .build(); + dispatchRequest(submitAsyncRestRequest); + SubmitAsyncSearchRequest submitRequest = (SubmitAsyncSearchRequest) lastCapturedRequest; + assertEquals(TimeValue.timeValueSeconds(1), submitRequest.getWaitForCompletionTimeout()); + assertEquals(false, submitRequest.isKeepOnCompletion()); + assertEquals(TimeValue.timeValueDays(5), submitRequest.getKeepAlive()); + // check parameters we implicitly set in the SubmitAsyncSearchRequest ctor + assertEquals(false, submitRequest.getSearchRequest().isCcsMinimizeRoundtrips()); + assertEquals(5, submitRequest.getSearchRequest().getBatchedReduceSize()); + assertEquals(true, submitRequest.getSearchRequest().requestCache()); + assertEquals(1, submitRequest.getSearchRequest().getPreFilterShardSize().intValue()); + } + + public void testParameters() throws IOException { + String tvString = randomTimeValue(1, 100); + doTestParameter("keep_alive", tvString, TimeValue.parseTimeValue(tvString, ""), SubmitAsyncSearchRequest::getKeepAlive); + doTestParameter("wait_for_completion_timeout", tvString, TimeValue.parseTimeValue(tvString, ""), + SubmitAsyncSearchRequest::getWaitForCompletionTimeout); + boolean keepOnCompletion = randomBoolean(); + doTestParameter("keep_on_completion", Boolean.toString(keepOnCompletion), keepOnCompletion, + SubmitAsyncSearchRequest::isKeepOnCompletion); + boolean requestCache = randomBoolean(); + doTestParameter("request_cache", Boolean.toString(requestCache), requestCache, + r -> r.getSearchRequest().requestCache()); + Integer batchReduceSize = randomIntBetween(2, 50); + doTestParameter("batched_reduce_size", Integer.toString(batchReduceSize), batchReduceSize, + r -> r.getSearchRequest().getBatchedReduceSize()); + } + + private void doTestParameter(String paramName, String paramValue, T expectedValue, + Function valueAccessor) { + Map params = new HashMap<>(); + params.put(paramName, paramValue); + RestRequest submitAsyncRestRequest = new FakeRestRequest.Builder(xContentRegistry()).withMethod(RestRequest.Method.POST) + .withPath("/test_index/_async_search") + .withParams(params) + .withContent(new BytesArray("{}"), XContentType.JSON).build(); + dispatchRequest(submitAsyncRestRequest); + SubmitAsyncSearchRequest submitRequest = (SubmitAsyncSearchRequest) lastCapturedRequest; + assertEquals(expectedValue, valueAccessor.apply(submitRequest)); + } + + /** + * Sends the given request to the test controller + */ + protected void dispatchRequest(RestRequest request) { + FakeRestChannel channel = new FakeRestChannel(request, false, 1); + ThreadContext threadContext = new ThreadContext(Settings.EMPTY); + controller.dispatchRequest(request, channel, threadContext); + } +}