Add async_search get and delete APIs to HLRC (#53828) (#53980)

This commit adds the "_async_searhc" get and delete APIs to the
AsyncSearchClient in the High Level Rest Client.

Relates to #49091
Backport of #53828
This commit is contained in:
Christoph Büscher 2020-03-23 21:21:36 +01:00 committed by GitHub
parent d276058c6c
commit 286c3660bd
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 341 additions and 20 deletions

View File

@ -21,7 +21,10 @@ package org.elasticsearch.client;
import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionListener;
import org.elasticsearch.client.asyncsearch.AsyncSearchResponse; import org.elasticsearch.client.asyncsearch.AsyncSearchResponse;
import org.elasticsearch.client.asyncsearch.DeleteAsyncSearchRequest;
import org.elasticsearch.client.asyncsearch.GetAsyncSearchRequest;
import org.elasticsearch.client.asyncsearch.SubmitAsyncSearchRequest; import org.elasticsearch.client.asyncsearch.SubmitAsyncSearchRequest;
import org.elasticsearch.client.core.AcknowledgedResponse;
import java.io.IOException; import java.io.IOException;
@ -42,7 +45,7 @@ public class AsyncSearchClient {
* @return the response * @return the response
* @throws IOException in case there is a problem sending the request or parsing back the response * @throws IOException in case there is a problem sending the request or parsing back the response
*/ */
public AsyncSearchResponse submitAsyncSearch(SubmitAsyncSearchRequest request, RequestOptions options) throws IOException { public AsyncSearchResponse submit(SubmitAsyncSearchRequest request, RequestOptions options) throws IOException {
return restHighLevelClient.performRequestAndParseEntity(request, AsyncSearchRequestConverters::submitAsyncSearch, options, return restHighLevelClient.performRequestAndParseEntity(request, AsyncSearchRequestConverters::submitAsyncSearch, options,
AsyncSearchResponse::fromXContent, emptySet()); AsyncSearchResponse::fromXContent, emptySet());
} }
@ -57,10 +60,61 @@ public class AsyncSearchClient {
* @param listener the listener to be notified upon request completion * @param listener the listener to be notified upon request completion
* @return cancellable that may be used to cancel the request * @return cancellable that may be used to cancel the request
*/ */
public Cancellable submitAsyncSearchAsync(SubmitAsyncSearchRequest request, RequestOptions options, public Cancellable submitAsync(SubmitAsyncSearchRequest request, RequestOptions options,
ActionListener<AsyncSearchResponse> listener) { ActionListener<AsyncSearchResponse> listener) {
return restHighLevelClient.performRequestAsyncAndParseEntity(request, AsyncSearchRequestConverters::submitAsyncSearch, options, return restHighLevelClient.performRequestAsyncAndParseEntity(request, AsyncSearchRequestConverters::submitAsyncSearch, options,
AsyncSearchResponse::fromXContent, listener, emptySet()); AsyncSearchResponse::fromXContent, listener, emptySet());
} }
/**
* Get an async search request.
* See <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/async-search.html"> the docs</a> for more.
*
*/
public AsyncSearchResponse get(GetAsyncSearchRequest request, RequestOptions options) throws IOException {
return restHighLevelClient.performRequestAndParseEntity(request, AsyncSearchRequestConverters::getAsyncSearch, options,
AsyncSearchResponse::fromXContent, emptySet());
}
/**
* Asynchronously get an async search request.
* See <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/async-search.html"> the docs</a> for more.
* @param request the request
* @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized
* @param listener the listener to be notified upon request completion
* @return cancellable that may be used to cancel the request
*/
public Cancellable getAsync(GetAsyncSearchRequest request, RequestOptions options,
ActionListener<AsyncSearchResponse> listener) {
return restHighLevelClient.performRequestAsyncAndParseEntity(request, AsyncSearchRequestConverters::getAsyncSearch, options,
AsyncSearchResponse::fromXContent, listener, emptySet());
}
/**
* Delete an async search request.
* See <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/async-search.html"> the docs</a> for more.
* @param request the request
* @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized
* @return the response
* @throws IOException in case there is a problem sending the request or parsing back the response
*/
public AcknowledgedResponse delete(DeleteAsyncSearchRequest request, RequestOptions options) throws IOException {
return restHighLevelClient.performRequestAndParseEntity(request, AsyncSearchRequestConverters::deleteAsyncSearch, options,
AcknowledgedResponse::fromXContent, emptySet());
}
/**
* Asynchronously delete an async search request.
* See <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/async-search.html"> the docs</a> for more.
* @param request the request
* @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized
* @param listener the listener to be notified upon request completion
* @return cancellable that may be used to cancel the request
*/
public Cancellable deleteAsync(DeleteAsyncSearchRequest request, RequestOptions options,
ActionListener<AcknowledgedResponse> listener) {
return restHighLevelClient.performRequestAsyncAndParseEntity(request, AsyncSearchRequestConverters::deleteAsyncSearch, options,
AcknowledgedResponse::fromXContent, listener, emptySet());
}
} }

View File

@ -19,8 +19,12 @@
package org.elasticsearch.client; package org.elasticsearch.client;
import org.apache.http.client.methods.HttpDelete;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpPost; import org.apache.http.client.methods.HttpPost;
import org.elasticsearch.client.RequestConverters.Params; import org.elasticsearch.client.RequestConverters.Params;
import org.elasticsearch.client.asyncsearch.DeleteAsyncSearchRequest;
import org.elasticsearch.client.asyncsearch.GetAsyncSearchRequest;
import org.elasticsearch.client.asyncsearch.SubmitAsyncSearchRequest; import org.elasticsearch.client.asyncsearch.SubmitAsyncSearchRequest;
import org.elasticsearch.rest.action.search.RestSearchAction; import org.elasticsearch.rest.action.search.RestSearchAction;
@ -71,4 +75,29 @@ final class AsyncSearchRequestConverters {
} }
params.withBatchedReduceSize(request.getBatchedReduceSize()); params.withBatchedReduceSize(request.getBatchedReduceSize());
} }
static Request getAsyncSearch(GetAsyncSearchRequest asyncSearchRequest) throws IOException {
String endpoint = new RequestConverters.EndpointBuilder()
.addPathPartAsIs("_async_search")
.addPathPart(asyncSearchRequest.getId())
.build();
Request request = new Request(HttpGet.METHOD_NAME, endpoint);
Params params = new RequestConverters.Params();
if (asyncSearchRequest.getKeepAlive() != null) {
params.putParam("keep_alive", asyncSearchRequest.getKeepAlive().getStringRep());
}
if (asyncSearchRequest.getWaitForCompletion() != null) {
params.putParam("wait_for_completion", asyncSearchRequest.getWaitForCompletion().getStringRep());
}
request.addParameters(params.asMap());
return request;
}
static Request deleteAsyncSearch(DeleteAsyncSearchRequest deleteAsyncSearchRequest) throws IOException {
String endpoint = new RequestConverters.EndpointBuilder()
.addPathPartAsIs("_async_search")
.addPathPart(deleteAsyncSearchRequest.getId())
.build();
return new Request(HttpDelete.METHOD_NAME, endpoint);
}
} }

View File

@ -0,0 +1,55 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.client.asyncsearch;
import org.elasticsearch.client.Validatable;
import java.util.Objects;
public class DeleteAsyncSearchRequest implements Validatable {
private final String id;
public DeleteAsyncSearchRequest(String id) {
this.id = id;
}
public String getId() {
return this.id;
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
DeleteAsyncSearchRequest request = (DeleteAsyncSearchRequest) o;
return Objects.equals(getId(), request.getId());
}
@Override
public int hashCode() {
return Objects.hash(getId());
}
}

View File

@ -0,0 +1,93 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.client.asyncsearch;
import org.elasticsearch.client.Validatable;
import org.elasticsearch.client.ValidationException;
import org.elasticsearch.common.unit.TimeValue;
import java.util.Objects;
import java.util.Optional;
public class GetAsyncSearchRequest implements Validatable {
private TimeValue waitForCompletion;
private TimeValue keepAlive;
public static final long MIN_KEEPALIVE = TimeValue.timeValueMinutes(1).millis();
private final String id;
public GetAsyncSearchRequest(String id) {
this.id = id;
}
public String getId() {
return this.id;
}
public TimeValue getWaitForCompletion() {
return waitForCompletion;
}
public void setWaitForCompletion(TimeValue waitForCompletion) {
this.waitForCompletion = waitForCompletion;
}
public TimeValue getKeepAlive() {
return keepAlive;
}
public void setKeepAlive(TimeValue keepAlive) {
this.keepAlive = keepAlive;
}
@Override
public Optional<ValidationException> validate() {
final ValidationException validationException = new ValidationException();
if (keepAlive != null && keepAlive.getMillis() < MIN_KEEPALIVE) {
validationException.addValidationError("keep_alive must be greater than 1 minute, got: " + keepAlive.toString());
}
if (validationException.validationErrors().isEmpty()) {
return Optional.empty();
}
return Optional.of(validationException);
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
GetAsyncSearchRequest request = (GetAsyncSearchRequest) o;
return Objects.equals(getId(), request.getId())
&& Objects.equals(getKeepAlive(), request.getKeepAlive())
&& Objects.equals(getWaitForCompletion(), request.getWaitForCompletion());
}
@Override
public int hashCode() {
return Objects.hash(getId(), getKeepAlive(), getWaitForCompletion());
}
}

View File

@ -19,8 +19,12 @@
package org.elasticsearch.client; package org.elasticsearch.client;
import org.apache.http.client.methods.HttpDelete;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpPost; import org.apache.http.client.methods.HttpPost;
import org.elasticsearch.action.search.SearchType; import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.client.asyncsearch.DeleteAsyncSearchRequest;
import org.elasticsearch.client.asyncsearch.GetAsyncSearchRequest;
import org.elasticsearch.client.asyncsearch.SubmitAsyncSearchRequest; import org.elasticsearch.client.asyncsearch.SubmitAsyncSearchRequest;
import org.elasticsearch.common.Strings; import org.elasticsearch.common.Strings;
import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.unit.TimeValue;
@ -112,4 +116,35 @@ public class AsyncSearchRequestConvertersTests extends ESTestCase {
expectedParams.put("max_concurrent_shard_requests", Integer.toString(request.getMaxConcurrentShardRequests())); expectedParams.put("max_concurrent_shard_requests", Integer.toString(request.getMaxConcurrentShardRequests()));
} }
public void testGetAsyncSearch() throws Exception {
String id = randomAlphaOfLengthBetween(5, 10);
Map<String, String> expectedParams = new HashMap<>();
GetAsyncSearchRequest submitRequest = new GetAsyncSearchRequest(id);
if (randomBoolean()) {
TimeValue keepAlive = TimeValue.parseTimeValue(randomTimeValue(), "test");
submitRequest.setKeepAlive(keepAlive);
expectedParams.put("keep_alive", keepAlive.getStringRep());
}
if (randomBoolean()) {
TimeValue waitForCompletion = TimeValue.parseTimeValue(randomTimeValue(), "test");
submitRequest.setWaitForCompletion(waitForCompletion);
expectedParams.put("wait_for_completion", waitForCompletion.getStringRep());
}
Request request = AsyncSearchRequestConverters.getAsyncSearch(submitRequest);
String endpoint = "/_async_search/" + id;
assertEquals(HttpGet.METHOD_NAME, request.getMethod());
assertEquals(endpoint.toString(), request.getEndpoint());
assertEquals(expectedParams, request.getParameters());
}
public void testDeleteAsyncSearch() throws Exception {
String id = randomAlphaOfLengthBetween(5, 10);
DeleteAsyncSearchRequest deleteRequest = new DeleteAsyncSearchRequest(id);
Request request = AsyncSearchRequestConverters.deleteAsyncSearch(deleteRequest);
assertEquals(HttpDelete.METHOD_NAME, request.getMethod());
assertEquals("/_async_search/" + id, request.getEndpoint());
assertTrue(request.getParameters().isEmpty());
}
} }

View File

@ -21,36 +21,50 @@ package org.elasticsearch.client.asyncsearch;
import org.elasticsearch.client.ESRestHighLevelClientTestCase; import org.elasticsearch.client.ESRestHighLevelClientTestCase;
import org.elasticsearch.client.RequestOptions; import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.core.AcknowledgedResponse;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.builder.SearchSourceBuilder; import org.elasticsearch.search.builder.SearchSourceBuilder;
import java.io.IOException; import java.io.IOException;
import java.util.concurrent.TimeUnit;
public class AsyncSearchIT extends ESRestHighLevelClientTestCase { public class AsyncSearchIT extends ESRestHighLevelClientTestCase {
public void testSubmitAsyncSearchRequest() throws IOException { public void testAsyncSearch() throws IOException {
String index = "test-index"; String index = "test-index";
createIndex(index, Settings.EMPTY); createIndex(index, Settings.EMPTY);
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder().query(QueryBuilders.matchAllQuery()); SearchSourceBuilder sourceBuilder = new SearchSourceBuilder().query(QueryBuilders.matchAllQuery());
SubmitAsyncSearchRequest request = new SubmitAsyncSearchRequest(sourceBuilder, index); SubmitAsyncSearchRequest submitRequest = new SubmitAsyncSearchRequest(sourceBuilder, index);
// 15 sec should be enough to make sure we always complete right away submitRequest.setCleanOnCompletion(false);
request.setWaitForCompletion(new TimeValue(15, TimeUnit.SECONDS)); AsyncSearchResponse submitResponse = highLevelClient().asyncSearch().submit(submitRequest, RequestOptions.DEFAULT);
AsyncSearchResponse response = highLevelClient().asyncSearch().submitAsyncSearch(request, RequestOptions.DEFAULT); assertNotNull(submitResponse.getId());
assertFalse(response.isPartial()); assertFalse(submitResponse.isPartial());
assertTrue(response.getStartTime() > 0); assertTrue(submitResponse.getStartTime() > 0);
assertTrue(response.getExpirationTime() > 0); assertTrue(submitResponse.getExpirationTime() > 0);
assertNotNull(response.getSearchResponse()); assertNotNull(submitResponse.getSearchResponse());
if (response.isRunning() == false) { if (submitResponse.isRunning() == false) {
assertNull(response.getId()); assertFalse(submitResponse.isPartial());
assertFalse(response.isPartial());
} else { } else {
assertTrue(response.isPartial()); assertTrue(submitResponse.isPartial());
assertNotNull(response.getId());
} }
}
GetAsyncSearchRequest getRequest = new GetAsyncSearchRequest(submitResponse.getId());
AsyncSearchResponse getResponse = highLevelClient().asyncSearch().get(getRequest, RequestOptions.DEFAULT);
while (getResponse.isRunning()) {
getResponse = highLevelClient().asyncSearch().get(getRequest, RequestOptions.DEFAULT);
}
assertFalse(getResponse.isRunning());
assertFalse(getResponse.isPartial());
assertTrue(getResponse.getStartTime() > 0);
assertTrue(getResponse.getExpirationTime() > 0);
assertNotNull(getResponse.getSearchResponse());
DeleteAsyncSearchRequest deleteRequest = new DeleteAsyncSearchRequest(submitResponse.getId());
AcknowledgedResponse deleteAsyncSearchResponse = highLevelClient().asyncSearch().delete(deleteRequest,
RequestOptions.DEFAULT);
assertNotNull(deleteAsyncSearchResponse);
assertNotNull(deleteAsyncSearchResponse.isAcknowledged());
}
} }

View File

@ -0,0 +1,41 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.client.asyncsearch;
import org.elasticsearch.client.ValidationException;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.test.ESTestCase;
import java.util.concurrent.TimeUnit;
public class GetAsyncSearchRequestTests extends ESTestCase {
public void testValidation() {
GetAsyncSearchRequest getAsyncSearchRequest = new GetAsyncSearchRequest(randomAlphaOfLength(10));
getAsyncSearchRequest.setKeepAlive(new TimeValue(0));
assertTrue(getAsyncSearchRequest.validate().isPresent());
ValidationException validationException = getAsyncSearchRequest.validate().get();
assertEquals(1, validationException.validationErrors().size());
assertEquals("Validation Failed: 1: keep_alive must be greater than 1 minute, got: 0s;", validationException.getMessage());
getAsyncSearchRequest.setKeepAlive(new TimeValue(1, TimeUnit.MINUTES));
assertFalse(getAsyncSearchRequest.validate().isPresent());
}
}

View File

@ -38,9 +38,9 @@ import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.tasks.Task; import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskManager; import org.elasticsearch.tasks.TaskManager;
import org.elasticsearch.xpack.core.search.action.AsyncSearchResponse; import org.elasticsearch.xpack.core.search.action.AsyncSearchResponse;
import org.elasticsearch.xpack.core.security.SecurityContext;
import org.elasticsearch.xpack.core.security.authc.Authentication; import org.elasticsearch.xpack.core.security.authc.Authentication;
import org.elasticsearch.xpack.core.security.authc.support.AuthenticationContextSerializer; import org.elasticsearch.xpack.core.security.authc.support.AuthenticationContextSerializer;
import org.elasticsearch.xpack.core.security.SecurityContext;
import java.io.IOException; import java.io.IOException;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;