Async search: rename REST parameters (#54198)

This commit renames wait_for_completion to wait_for_completion_timeout in submit async search and get async search.
Also it renames clean_on_completion to keep_on_completion and turns around its behaviour.

Closes #54069
This commit is contained in:
Luca Cavanna 2020-03-26 09:40:05 +01:00
parent 487b273286
commit ff269160af
16 changed files with 87 additions and 86 deletions

View File

@ -47,14 +47,14 @@ final class AsyncSearchRequestConverters {
request.setEntity(RequestConverters.createEntity(asyncSearchRequest.getSearchSource(), REQUEST_BODY_CONTENT_TYPE));
}
// set async search submit specific parameters
if (asyncSearchRequest.isCleanOnCompletion() != null) {
params.putParam("clean_on_completion", asyncSearchRequest.isCleanOnCompletion().toString());
if (asyncSearchRequest.isKeepOnCompletion() != null) {
params.putParam("keep_on_completion", asyncSearchRequest.isKeepOnCompletion().toString());
}
if (asyncSearchRequest.getKeepAlive() != null) {
params.putParam("keep_alive", asyncSearchRequest.getKeepAlive().getStringRep());
}
if (asyncSearchRequest.getWaitForCompletion() != null) {
params.putParam("wait_for_completion", asyncSearchRequest.getWaitForCompletion().getStringRep());
if (asyncSearchRequest.getWaitForCompletionTimeout() != null) {
params.putParam("wait_for_completion_timeout", asyncSearchRequest.getWaitForCompletionTimeout().getStringRep());
}
request.addParameters(params.asMap());
return request;
@ -76,7 +76,7 @@ final class AsyncSearchRequestConverters {
params.withBatchedReduceSize(request.getBatchedReduceSize());
}
static Request getAsyncSearch(GetAsyncSearchRequest asyncSearchRequest) throws IOException {
static Request getAsyncSearch(GetAsyncSearchRequest asyncSearchRequest) {
String endpoint = new RequestConverters.EndpointBuilder()
.addPathPartAsIs("_async_search")
.addPathPart(asyncSearchRequest.getId())
@ -87,13 +87,13 @@ final class AsyncSearchRequestConverters {
params.putParam("keep_alive", asyncSearchRequest.getKeepAlive().getStringRep());
}
if (asyncSearchRequest.getWaitForCompletion() != null) {
params.putParam("wait_for_completion", asyncSearchRequest.getWaitForCompletion().getStringRep());
params.putParam("wait_for_completion_timeout", asyncSearchRequest.getWaitForCompletion().getStringRep());
}
request.addParameters(params.asMap());
return request;
}
static Request deleteAsyncSearch(DeleteAsyncSearchRequest deleteAsyncSearchRequest) throws IOException {
static Request deleteAsyncSearch(DeleteAsyncSearchRequest deleteAsyncSearchRequest) {
String endpoint = new RequestConverters.EndpointBuilder()
.addPathPartAsIs("_async_search")
.addPathPart(deleteAsyncSearchRequest.getId())

View File

@ -43,8 +43,8 @@ public class SubmitAsyncSearchRequest implements Validatable {
public static long MIN_KEEP_ALIVE = TimeValue.timeValueMinutes(1).millis();
private TimeValue waitForCompletion;
private Boolean cleanOnCompletion;
private TimeValue waitForCompletionTimeout;
private Boolean keepOnCompletion;
private TimeValue keepAlive;
private final SearchRequest searchRequest;
@ -70,29 +70,29 @@ public class SubmitAsyncSearchRequest implements Validatable {
/**
* Get the minimum time that the request should wait before returning a partial result (defaults to 1 second).
*/
public TimeValue getWaitForCompletion() {
return waitForCompletion;
public TimeValue getWaitForCompletionTimeout() {
return waitForCompletionTimeout;
}
/**
* Sets the minimum time that the request should wait before returning a partial result (defaults to 1 second).
*/
public void setWaitForCompletion(TimeValue waitForCompletion) {
this.waitForCompletion = waitForCompletion;
public void setWaitForCompletionTimeout(TimeValue waitForCompletionTimeout) {
this.waitForCompletionTimeout = waitForCompletionTimeout;
}
/**
* Returns whether the resource resource should be removed on completion or failure (defaults to true).
*/
public Boolean isCleanOnCompletion() {
return cleanOnCompletion;
public Boolean isKeepOnCompletion() {
return keepOnCompletion;
}
/**
* Determines if the resource should be removed on completion or failure (defaults to true).
*/
public void setCleanOnCompletion(boolean cleanOnCompletion) {
this.cleanOnCompletion = cleanOnCompletion;
public void setKeepOnCompletion(boolean keepOnCompletion) {
this.keepOnCompletion = keepOnCompletion;
}
/**
@ -273,12 +273,12 @@ public class SubmitAsyncSearchRequest implements Validatable {
SubmitAsyncSearchRequest request = (SubmitAsyncSearchRequest) o;
return Objects.equals(searchRequest, request.searchRequest)
&& Objects.equals(getKeepAlive(), request.getKeepAlive())
&& Objects.equals(getWaitForCompletion(), request.getWaitForCompletion())
&& Objects.equals(isCleanOnCompletion(), request.isCleanOnCompletion());
&& Objects.equals(getWaitForCompletionTimeout(), request.getWaitForCompletionTimeout())
&& Objects.equals(isKeepOnCompletion(), request.isKeepOnCompletion());
}
@Override
public int hashCode() {
return Objects.hash(searchRequest, getKeepAlive(), getWaitForCompletion(), isCleanOnCompletion());
return Objects.hash(searchRequest, getKeepAlive(), getWaitForCompletionTimeout(), isKeepOnCompletion());
}
}

View File

@ -56,9 +56,9 @@ public class AsyncSearchRequestConvertersTests extends ESTestCase {
setRandomIndicesOptions(submitRequest::setIndicesOptions, submitRequest::getIndicesOptions, expectedParams);
if (randomBoolean()) {
boolean cleanOnCompletion = randomBoolean();
submitRequest.setCleanOnCompletion(cleanOnCompletion);
expectedParams.put("clean_on_completion", Boolean.toString(cleanOnCompletion));
boolean keepOnCompletion = randomBoolean();
submitRequest.setKeepOnCompletion(keepOnCompletion);
expectedParams.put("keep_on_completion", Boolean.toString(keepOnCompletion));
}
if (randomBoolean()) {
TimeValue keepAlive = TimeValue.parseTimeValue(randomTimeValue(), "test");
@ -66,9 +66,9 @@ public class AsyncSearchRequestConvertersTests extends ESTestCase {
expectedParams.put("keep_alive", keepAlive.getStringRep());
}
if (randomBoolean()) {
TimeValue waitForCompletion = TimeValue.parseTimeValue(randomTimeValue(), "test");
submitRequest.setWaitForCompletion(waitForCompletion);
expectedParams.put("wait_for_completion", waitForCompletion.getStringRep());
TimeValue waitForCompletionTimeout = TimeValue.parseTimeValue(randomTimeValue(), "test");
submitRequest.setWaitForCompletionTimeout(waitForCompletionTimeout);
expectedParams.put("wait_for_completion_timeout", waitForCompletionTimeout.getStringRep());
}
Request request = AsyncSearchRequestConverters.submitAsyncSearch(submitRequest);
@ -128,7 +128,7 @@ public class AsyncSearchRequestConvertersTests extends ESTestCase {
if (randomBoolean()) {
TimeValue waitForCompletion = TimeValue.parseTimeValue(randomTimeValue(), "test");
submitRequest.setWaitForCompletion(waitForCompletion);
expectedParams.put("wait_for_completion", waitForCompletion.getStringRep());
expectedParams.put("wait_for_completion_timeout", waitForCompletion.getStringRep());
}
Request request = AsyncSearchRequestConverters.getAsyncSearch(submitRequest);

View File

@ -36,7 +36,7 @@ public class AsyncSearchIT extends ESRestHighLevelClientTestCase {
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder().query(QueryBuilders.matchAllQuery());
SubmitAsyncSearchRequest submitRequest = new SubmitAsyncSearchRequest(sourceBuilder, index);
submitRequest.setCleanOnCompletion(false);
submitRequest.setKeepOnCompletion(true);
AsyncSearchResponse submitResponse = highLevelClient().asyncSearch().submit(submitRequest, RequestOptions.DEFAULT);
assertNotNull(submitResponse.getId());
assertFalse(submitResponse.isPartial());

View File

@ -31,7 +31,7 @@ POST /sales*/_async_search?size=0
}
--------------------------------------------------
// TEST[setup:sales]
// TEST[s/size=0/size=0&wait_for_completion=10s&clean_on_completion=false/]
// TEST[s/size=0/size=0&wait_for_completion_timeout=10s&keep_on_completion=true/]
The response contains an identifier of the search being executed.
You can use this ID to later retrieve the search's final results.
@ -88,8 +88,8 @@ results are returned as part of the <<search-api-response-body,`response`>> obje
<6> How many documents are currently matching the query, which belong to the shards that have already completed the search
It is possible to block and wait until the search is completed up to a certain
timeout by providing the `wait_for_completion` parameter, which defaults to
`1` second.
timeout by providing the `wait_for_completion_timeout` parameter, which
defaults to `1` second.
You can also specify how long the async search needs to be
available through the `keep_alive` parameter, which defaults to `5d` (five days).
@ -193,11 +193,12 @@ first.
<6> Partial aggregations results, coming from the shards that have already
completed the execution of the query.
The `wait_for_completion` parameter, which defaults to `1`, can also be provided
when calling the Get Async Search API, in order to wait for the search to be
completed up until the provided timeout. Final results will be returned if
available before the timeout expires, otherwise the currently available results
will be returned once the timeout expires.
The `wait_for_completion_timeout` parameter can also be provided when calling
the Get Async Search API, in order to wait for the search to be completed up
until the provided timeout. Final results will be returned if available before
the timeout expires, otherwise the currently available results will be
returned once the timeout expires. By default no timeout is set meaning that
the currently available results will be returned without any additional wait.
The `keep_alive` parameter specifies how long the async search should be
available in the cluster. When not specified, the `keep_alive` set with the

View File

@ -127,16 +127,16 @@ public class AsyncSearchSecurityIT extends ESRestTestCase {
final Request request = new Request("POST", indexName + "/_async_search");
setRunAsHeader(request, user);
request.addParameter("q", query);
request.addParameter("wait_for_completion", waitForCompletion.toString());
request.addParameter("wait_for_completion_timeout", waitForCompletion.toString());
// we do the cleanup explicitly
request.addParameter("clean_on_completion", "false");
request.addParameter("keep_on_completion", "true");
return client().performRequest(request);
}
static Response getAsyncSearch(String id, String user) throws IOException {
final Request request = new Request("GET", "/_async_search/" + id);
setRunAsHeader(request, user);
request.addParameter("wait_for_completion", "0ms");
request.addParameter("wait_for_completion_timeout", "0ms");
return client().performRequest(request);
}

View File

@ -34,8 +34,8 @@ public class RestGetAsyncSearchAction extends BaseRestHandler {
@Override
protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) {
GetAsyncSearchAction.Request get = new GetAsyncSearchAction.Request(request.param("id"));
if (request.hasParam("wait_for_completion")) {
get.setWaitForCompletion(request.paramAsTime("wait_for_completion", get.getWaitForCompletion()));
if (request.hasParam("wait_for_completion_timeout")) {
get.setWaitForCompletion(request.paramAsTime("wait_for_completion_timeout", get.getWaitForCompletion()));
}
if (request.hasParam("keep_alive")) {
get.setKeepAlive(request.paramAsTime("keep_alive", get.getKeepAlive()));

View File

@ -51,14 +51,14 @@ public final class RestSubmitAsyncSearchAction extends BaseRestHandler {
request.withContentOrSourceParamParserOrNull(parser ->
parseSearchRequest(submit.getSearchRequest(), request, parser, setSize));
if (request.hasParam("wait_for_completion")) {
submit.setWaitForCompletion(request.paramAsTime("wait_for_completion", submit.getWaitForCompletion()));
if (request.hasParam("wait_for_completion_timeout")) {
submit.setWaitForCompletionTimeout(request.paramAsTime("wait_for_completion_timeout", submit.getWaitForCompletionTimeout()));
}
if (request.hasParam("keep_alive")) {
submit.setKeepAlive(request.paramAsTime("keep_alive", submit.getKeepAlive()));
}
if (request.hasParam("clean_on_completion")) {
submit.setCleanOnCompletion(request.paramAsBoolean("clean_on_completion", submit.isCleanOnCompletion()));
if (request.hasParam("keep_on_completion")) {
submit.setKeepOnCompletion(request.paramAsBoolean("keep_on_completion", submit.isKeepOnCompletion()));
}
return channel -> {
RestStatusToXContentListener<AsyncSearchResponse> listener = new RestStatusToXContentListener<>(channel);

View File

@ -73,7 +73,7 @@ public class TransportSubmitAsyncSearchAction extends HandledTransportAction<Sub
new ActionListener<AsyncSearchResponse>() {
@Override
public void onResponse(AsyncSearchResponse searchResponse) {
if (searchResponse.isRunning() || request.isCleanOnCompletion() == false) {
if (searchResponse.isRunning() || request.isKeepOnCompletion()) {
// the task is still running and the user cannot wait more so we create
// a document for further retrieval
try {
@ -126,7 +126,7 @@ public class TransportSubmitAsyncSearchAction extends HandledTransportAction<Sub
public void onFailure(Exception exc) {
submitListener.onFailure(exc);
}
}, request.getWaitForCompletion());
}, request.getWaitForCompletionTimeout());
}
private SearchRequest createSearchRequest(SubmitAsyncSearchRequest request, CancellableTask submitTask, TimeValue keepAlive) {

View File

@ -236,14 +236,14 @@ public class AsyncSearchActionTests extends AsyncSearchIntegTestCase {
public void testNoIndex() throws Exception {
SubmitAsyncSearchRequest request = new SubmitAsyncSearchRequest("invalid-*");
request.setWaitForCompletion(TimeValue.timeValueMillis(1));
request.setWaitForCompletionTimeout(TimeValue.timeValueMillis(1));
AsyncSearchResponse response = submitAsyncSearch(request);
assertNotNull(response.getSearchResponse());
assertFalse(response.isRunning());
assertThat(response.getSearchResponse().getTotalShards(), equalTo(0));
request = new SubmitAsyncSearchRequest("invalid");
request.setWaitForCompletion(TimeValue.timeValueMillis(1));
request.setWaitForCompletionTimeout(TimeValue.timeValueMillis(1));
response = submitAsyncSearch(request);
assertNull(response.getSearchResponse());
assertNotNull(response.getFailure());
@ -257,7 +257,7 @@ public class AsyncSearchActionTests extends AsyncSearchIntegTestCase {
request.getSearchRequest().source(
new SearchSourceBuilder().aggregation(new CancellingAggregationBuilder("test"))
);
request.setWaitForCompletion(TimeValue.timeValueMillis(1));
request.setWaitForCompletionTimeout(TimeValue.timeValueMillis(1));
AsyncSearchResponse response = submitAsyncSearch(request);
assertNotNull(response.getSearchResponse());
assertTrue(response.isRunning());

View File

@ -139,7 +139,7 @@ public abstract class AsyncSearchIntegTestCase extends ESIntegTestCase {
int progressStep) throws Exception {
SubmitAsyncSearchRequest request = new SubmitAsyncSearchRequest(source, indexName);
request.setBatchedReduceSize(progressStep);
request.setWaitForCompletion(TimeValue.timeValueMillis(1));
request.setWaitForCompletionTimeout(TimeValue.timeValueMillis(1));
ClusterSearchShardsResponse response = dataNodeClient().admin().cluster()
.prepareSearchShards(request.getSearchRequest().indices()).get();
AtomicInteger failures = new AtomicInteger(numFailures);

View File

@ -35,9 +35,9 @@ public class SubmitAsyncSearchRequestTests extends AbstractWireSerializingTransf
searchRequest = new SubmitAsyncSearchRequest();
}
if (randomBoolean()) {
searchRequest.setWaitForCompletion(TimeValue.parseTimeValue(randomPositiveTimeValue(), "wait_for_completion"));
searchRequest.setWaitForCompletionTimeout(TimeValue.parseTimeValue(randomPositiveTimeValue(), "wait_for_completion"));
}
searchRequest.setCleanOnCompletion(randomBoolean());
searchRequest.setKeepOnCompletion(randomBoolean());
if (randomBoolean()) {
searchRequest.setKeepAlive(TimeValue.parseTimeValue(randomPositiveTimeValue(), "keep_alive"));
}

View File

@ -30,8 +30,8 @@ import static org.elasticsearch.action.ValidateActions.addValidationError;
public class SubmitAsyncSearchRequest extends ActionRequest {
public static long MIN_KEEP_ALIVE = TimeValue.timeValueMinutes(1).millis();
private TimeValue waitForCompletion = TimeValue.timeValueSeconds(1);
private boolean cleanOnCompletion = true;
private TimeValue waitForCompletionTimeout = TimeValue.timeValueSeconds(1);
private boolean keepOnCompletion = false;
private TimeValue keepAlive = TimeValue.timeValueDays(5);
private final SearchRequest request;
@ -56,17 +56,17 @@ public class SubmitAsyncSearchRequest extends ActionRequest {
public SubmitAsyncSearchRequest(StreamInput in) throws IOException {
this.request = new SearchRequest(in);
this.waitForCompletion = in.readTimeValue();
this.waitForCompletionTimeout = in.readTimeValue();
this.keepAlive = in.readTimeValue();
this.cleanOnCompletion = in.readBoolean();
this.keepOnCompletion = in.readBoolean();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
request.writeTo(out);
out.writeTimeValue(waitForCompletion);
out.writeTimeValue(waitForCompletionTimeout);
out.writeTimeValue(keepAlive);
out.writeBoolean(cleanOnCompletion);
out.writeBoolean(keepOnCompletion);
}
/**
@ -84,13 +84,13 @@ public class SubmitAsyncSearchRequest extends ActionRequest {
/**
* Sets the minimum time that the request should wait before returning a partial result (defaults to 1 second).
*/
public SubmitAsyncSearchRequest setWaitForCompletion(TimeValue waitForCompletion) {
this.waitForCompletion = waitForCompletion;
public SubmitAsyncSearchRequest setWaitForCompletionTimeout(TimeValue waitForCompletionTimeout) {
this.waitForCompletionTimeout = waitForCompletionTimeout;
return this;
}
public TimeValue getWaitForCompletion() {
return waitForCompletion;
public TimeValue getWaitForCompletionTimeout() {
return waitForCompletionTimeout;
}
/**
@ -115,13 +115,13 @@ public class SubmitAsyncSearchRequest extends ActionRequest {
/**
* Should the resource be removed on completion or failure (defaults to true).
*/
public SubmitAsyncSearchRequest setCleanOnCompletion(boolean value) {
this.cleanOnCompletion = value;
public SubmitAsyncSearchRequest setKeepOnCompletion(boolean value) {
this.keepOnCompletion = value;
return this;
}
public boolean isCleanOnCompletion() {
return cleanOnCompletion;
public boolean isKeepOnCompletion() {
return keepOnCompletion;
}
@Override
@ -165,22 +165,22 @@ public class SubmitAsyncSearchRequest extends ActionRequest {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
SubmitAsyncSearchRequest request1 = (SubmitAsyncSearchRequest) o;
return cleanOnCompletion == request1.cleanOnCompletion &&
waitForCompletion.equals(request1.waitForCompletion) &&
return keepOnCompletion == request1.keepOnCompletion &&
waitForCompletionTimeout.equals(request1.waitForCompletionTimeout) &&
keepAlive.equals(request1.keepAlive) &&
request.equals(request1.request);
}
@Override
public int hashCode() {
return Objects.hash(waitForCompletion, cleanOnCompletion, keepAlive, request);
return Objects.hash(waitForCompletionTimeout, keepOnCompletion, keepAlive, request);
}
@Override
public String toString() {
return "SubmitAsyncSearchRequest{" +
"waitForCompletion=" + waitForCompletion +
", cleanOnCompletion=" + cleanOnCompletion +
"waitForCompletionTimeout=" + waitForCompletionTimeout +
", keepOnCompletion=" + keepOnCompletion +
", keepAlive=" + keepAlive +
", request=" + request +
'}';

View File

@ -21,15 +21,13 @@
]
},
"params":{
"wait_for_completion":{
"wait_for_completion_timeout":{
"type":"time",
"description":"Specify the time that the request should block waiting for the final response",
"default": "1s"
"description":"Specify the time that the request should block waiting for the final response"
},
"keep_alive": {
"type": "time",
"description": "Specify the time interval in which the results (partial or final) for this search will be available",
"default": "5d"
"description": "Specify the time interval in which the results (partial or final) for this search will be available"
},
"typed_keys":{
"type":"boolean",

View File

@ -27,18 +27,20 @@
]
},
"params":{
"wait_for_completion":{
"wait_for_completion_timeout":{
"type":"time",
"description":"Specify the time that the request should block waiting for the final response",
"default": "1s"
},
"clean_on_completion":{
"keep_on_completion":{
"type":"boolean",
"description":"Control whether the response should not be stored in the cluster if it completed within the provided [wait_for_completion] time (default: true)"
"description":"Control whether the response should be stored in the cluster if it completed within the provided [wait_for_completion] time (default: true)",
"default":false
},
"keep_alive": {
"type": "time",
"description": "Update the time interval in which the results (partial or final) for this search will be available"
"description": "Update the time interval in which the results (partial or final) for this search will be available",
"default": "5d"
},
"batched_reduce_size":{
"type":"number",

View File

@ -43,7 +43,7 @@
async_search.submit:
index: test-*
batched_reduce_size: 2
wait_for_completion: 10s
wait_for_completion_timeout: 10s
body:
query:
match_all: {}
@ -63,8 +63,8 @@
async_search.submit:
index: test-*
batched_reduce_size: 2
wait_for_completion: 10s
clean_on_completion: false
wait_for_completion_timeout: 10s
keep_on_completion: true
body:
aggs:
max:
@ -82,8 +82,8 @@
async_search.submit:
index: test-*
batched_reduce_size: 2
wait_for_completion: 10s
clean_on_completion: false
wait_for_completion_timeout: 10s
keep_on_completion: true
typed_keys: true
body:
aggs: