Add async_search.submit to HLRC #53592 (#53852)

This commit adds a new AsyncSearchClient to the High Level Rest Client which
initially supporst the submitAsyncSearch in its blocking and non-blocking
flavour. Also adding client side request and response objects and parsing code
to parse the xContent output of the client side AsyncSearchResponse together
with parsing roundtrip tests and a simple roundtrip integration test.

Relates to #49091
Backport of #53592
This commit is contained in:
Christoph Büscher 2020-03-20 13:15:58 +01:00 committed by GitHub
parent 10a2a95b25
commit 8eacb153df
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
14 changed files with 1028 additions and 29 deletions

View File

@ -0,0 +1,66 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.client;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.client.asyncsearch.AsyncSearchResponse;
import org.elasticsearch.client.asyncsearch.SubmitAsyncSearchRequest;
import java.io.IOException;
import static java.util.Collections.emptySet;
public class AsyncSearchClient {
private final RestHighLevelClient restHighLevelClient;
AsyncSearchClient(RestHighLevelClient restHighLevelClient) {
this.restHighLevelClient = restHighLevelClient;
}
/**
* Submit a new async search request.
* See <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/async-search.html"> the docs</a> for more.
* @param request the request
* @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized
* @return the response
* @throws IOException in case there is a problem sending the request or parsing back the response
*/
public AsyncSearchResponse submitAsyncSearch(SubmitAsyncSearchRequest request, RequestOptions options) throws IOException {
return restHighLevelClient.performRequestAndParseEntity(request, AsyncSearchRequestConverters::submitAsyncSearch, options,
AsyncSearchResponse::fromXContent, emptySet());
}
/**
* Asynchronously submit a new async search request.
* See <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/async-search.html"> the docs</a> for more.
* <a href="https://www.elastic.co/guide/en/elasticsearch/client/java-rest/current/java-rest-high-ilm-ilm-get-lifecycle-policy.html">
* the docs</a> for more.
* @param request the request
* @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized
* @param listener the listener to be notified upon request completion
* @return cancellable that may be used to cancel the request
*/
public Cancellable submitAsyncSearchAsync(SubmitAsyncSearchRequest request, RequestOptions options,
ActionListener<AsyncSearchResponse> listener) {
return restHighLevelClient.performRequestAsyncAndParseEntity(request, AsyncSearchRequestConverters::submitAsyncSearch, options,
AsyncSearchResponse::fromXContent, listener, emptySet());
}
}

View File

@ -0,0 +1,74 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.client;
import org.apache.http.client.methods.HttpPost;
import org.elasticsearch.client.RequestConverters.Params;
import org.elasticsearch.client.asyncsearch.SubmitAsyncSearchRequest;
import org.elasticsearch.rest.action.search.RestSearchAction;
import java.io.IOException;
import java.util.Locale;
import static org.elasticsearch.client.RequestConverters.REQUEST_BODY_CONTENT_TYPE;
final class AsyncSearchRequestConverters {
static Request submitAsyncSearch(SubmitAsyncSearchRequest asyncSearchRequest) throws IOException {
String endpoint = new RequestConverters.EndpointBuilder().addCommaSeparatedPathParts(
asyncSearchRequest.getIndices())
.addPathPartAsIs("_async_search").build();
Request request = new Request(HttpPost.METHOD_NAME, endpoint);
Params params = new RequestConverters.Params();
// add all typical search params and search request source as body
addSearchRequestParams(params, asyncSearchRequest);
if (asyncSearchRequest.getSearchSource() != null) {
request.setEntity(RequestConverters.createEntity(asyncSearchRequest.getSearchSource(), REQUEST_BODY_CONTENT_TYPE));
}
// set async search submit specific parameters
if (asyncSearchRequest.isCleanOnCompletion() != null) {
params.putParam("clean_on_completion", asyncSearchRequest.isCleanOnCompletion().toString());
}
if (asyncSearchRequest.getKeepAlive() != null) {
params.putParam("keep_alive", asyncSearchRequest.getKeepAlive().getStringRep());
}
if (asyncSearchRequest.getWaitForCompletion() != null) {
params.putParam("wait_for_completion", asyncSearchRequest.getWaitForCompletion().getStringRep());
}
request.addParameters(params.asMap());
return request;
}
static void addSearchRequestParams(Params params, SubmitAsyncSearchRequest request) {
params.putParam(RestSearchAction.TYPED_KEYS_PARAM, "true");
params.withRouting(request.getRouting());
params.withPreference(request.getPreference());
params.withIndicesOptions(request.getIndicesOptions());
params.withSearchType(request.getSearchType().name().toLowerCase(Locale.ROOT));
params.withMaxConcurrentShardRequests(request.getMaxConcurrentShardRequests());
if (request.getRequestCache() != null) {
params.withRequestCache(request.getRequestCache());
}
if (request.getAllowPartialSearchResults() != null) {
params.withAllowPartialResults(request.getAllowPartialSearchResults());
}
params.withBatchedReduceSize(request.getBatchedReduceSize());
}
}

View File

@ -419,22 +419,22 @@ final class RequestConverters {
return request;
}
private static void addSearchRequestParams(Params params, SearchRequest searchRequest) {
static void addSearchRequestParams(Params params, SearchRequest searchRequest) {
params.putParam(RestSearchAction.TYPED_KEYS_PARAM, "true");
params.withRouting(searchRequest.routing());
params.withPreference(searchRequest.preference());
params.withIndicesOptions(searchRequest.indicesOptions());
params.putParam("search_type", searchRequest.searchType().name().toLowerCase(Locale.ROOT));
params.withSearchType(searchRequest.searchType().name().toLowerCase(Locale.ROOT));
params.putParam("ccs_minimize_roundtrips", Boolean.toString(searchRequest.isCcsMinimizeRoundtrips()));
params.putParam("pre_filter_shard_size", Integer.toString(searchRequest.getPreFilterShardSize()));
params.putParam("max_concurrent_shard_requests", Integer.toString(searchRequest.getMaxConcurrentShardRequests()));
params.withMaxConcurrentShardRequests(searchRequest.getMaxConcurrentShardRequests());
if (searchRequest.requestCache() != null) {
params.putParam("request_cache", Boolean.toString(searchRequest.requestCache()));
params.withRequestCache(searchRequest.requestCache());
}
if (searchRequest.allowPartialSearchResults() != null) {
params.putParam("allow_partial_search_results", Boolean.toString(searchRequest.allowPartialSearchResults()));
params.withAllowPartialResults(searchRequest.allowPartialSearchResults());
}
params.putParam("batched_reduce_size", Integer.toString(searchRequest.getBatchedReduceSize()));
params.withBatchedReduceSize(searchRequest.getBatchedReduceSize());
if (searchRequest.scroll() != null) {
params.putParam("scroll", searchRequest.scroll().keepAlive());
}
@ -872,6 +872,26 @@ final class RequestConverters {
return putParam("preference", preference);
}
Params withSearchType(String searchType) {
return putParam("search_type", searchType);
}
Params withMaxConcurrentShardRequests(int maxConcurrentShardRequests) {
return putParam("max_concurrent_shard_requests", Integer.toString(maxConcurrentShardRequests));
}
Params withBatchedReduceSize(int batchedReduceSize) {
return putParam("batched_reduce_size", Integer.toString(batchedReduceSize));
}
Params withRequestCache(boolean requestCache) {
return putParam("request_cache", Boolean.toString(requestCache));
}
Params withAllowPartialResults(boolean allowPartialSearchResults) {
return putParam("allow_partial_search_results", Boolean.toString(allowPartialSearchResults));
}
Params withRealtime(boolean realtime) {
if (realtime == false) {
return putParam("realtime", Boolean.FALSE.toString());

View File

@ -265,6 +265,7 @@ public class RestHighLevelClient implements Closeable {
private final TransformClient transformClient = new TransformClient(this);
private final EnrichClient enrichClient = new EnrichClient(this);
private final EqlClient eqlClient = new EqlClient(this);
private final AsyncSearchClient asyncSearchClient = new AsyncSearchClient(this);
/**
* Creates a {@link RestHighLevelClient} given the low level {@link RestClientBuilder} that allows to build the
@ -428,13 +429,23 @@ public class RestHighLevelClient implements Closeable {
* A wrapper for the {@link RestHighLevelClient} that provides methods for
* accessing the Elastic Index Lifecycle APIs.
* <p>
* See the <a href="http://FILL-ME-IN-WE-HAVE-NO-DOCS-YET.com"> X-Pack APIs
* See the <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/index-lifecycle-management-api.html"> X-Pack APIs
* on elastic.co</a> for more information.
*/
public IndexLifecycleClient indexLifecycle() {
return ilmClient;
}
/**
* A wrapper for the {@link RestHighLevelClient} that provides methods for accessing the Elastic Index Async Search APIs.
* <p>
* See the <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/async-search.html"> X-Pack APIs on elastic.co</a>
* for more information.
*/
public AsyncSearchClient asyncSearch() {
return asyncSearchClient;
}
/**
* Provides methods for accessing the Elastic Licensed Migration APIs that
* are shipped with the default distribution of Elasticsearch. All of

View File

@ -0,0 +1,214 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.client.asyncsearch;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.XContentParser.Token;
import java.io.IOException;
import static org.elasticsearch.common.xcontent.ConstructingObjectParser.constructorArg;
import static org.elasticsearch.common.xcontent.ConstructingObjectParser.optionalConstructorArg;
import static org.elasticsearch.common.xcontent.XContentParserUtils.ensureExpectedToken;
/**
* A response of an async search request.
*/
public class AsyncSearchResponse implements ToXContentObject {
@Nullable
private final String id;
private final int version;
@Nullable
private final SearchResponse searchResponse;
@Nullable
private final ElasticsearchException error;
private final boolean isRunning;
private final boolean isPartial;
private final long startTimeMillis;
private final long expirationTimeMillis;
/**
* Creates an {@link AsyncSearchResponse} with the arguments that are always present in the server response
*/
AsyncSearchResponse(int version,
boolean isPartial,
boolean isRunning,
long startTimeMillis,
long expirationTimeMillis,
@Nullable String id,
@Nullable SearchResponse searchResponse,
@Nullable ElasticsearchException error) {
this.version = version;
this.isPartial = isPartial;
this.isRunning = isRunning;
this.startTimeMillis = startTimeMillis;
this.expirationTimeMillis = expirationTimeMillis;
this.id = id;
this.searchResponse = searchResponse;
this.error = error;
}
/**
* Returns the id of the async search request or null if the response is not stored in the cluster.
*/
@Nullable
public String getId() {
return id;
}
/**
* Returns the version of this response.
*/
public int getVersion() {
return version;
}
/**
* Returns the current {@link SearchResponse} or <code>null</code> if not available.
*
* See {@link #isPartial()} to determine whether the response contains partial or complete
* results.
*/
public SearchResponse getSearchResponse() {
return searchResponse;
}
/**
* Returns the failure reason or null if the query is running or has completed normally.
*/
public ElasticsearchException getFailure() {
return error;
}
/**
* Returns <code>true</code> if the {@link SearchResponse} contains partial
* results computed from a subset of the total shards.
*/
public boolean isPartial() {
return isPartial;
}
/**
* Whether the search is still running in the cluster.
*
* A value of <code>false</code> indicates that the response is final
* even if {@link #isPartial()} returns <code>true</code>. In such case,
* the partial response represents the status of the search before a
* non-recoverable failure.
*/
public boolean isRunning() {
return isRunning;
}
/**
* When this response was created as a timestamp in milliseconds since epoch.
*/
public long getStartTime() {
return startTimeMillis;
}
/**
* When this response will expired as a timestamp in milliseconds since epoch.
*/
public long getExpirationTime() {
return expirationTimeMillis;
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
if (id != null) {
builder.field("id", id);
}
builder.field("version", version);
builder.field("is_partial", isPartial);
builder.field("is_running", isRunning);
builder.field("start_time_in_millis", startTimeMillis);
builder.field("expiration_time_in_millis", expirationTimeMillis);
if (searchResponse != null) {
builder.field("response");
searchResponse.toXContent(builder, params);
}
if (error != null) {
builder.startObject("error");
error.toXContent(builder, params);
builder.endObject();
}
builder.endObject();
return builder;
}
public static final ParseField ID_FIELD = new ParseField("id");
public static final ParseField VERSION_FIELD = new ParseField("version");
public static final ParseField IS_PARTIAL_FIELD = new ParseField("is_partial");
public static final ParseField IS_RUNNING_FIELD = new ParseField("is_running");
public static final ParseField START_TIME_FIELD = new ParseField("start_time_in_millis");
public static final ParseField EXPIRATION_FIELD = new ParseField("expiration_time_in_millis");
public static final ParseField RESPONSE_FIELD = new ParseField("response");
public static final ParseField ERROR_FIELD = new ParseField("error");
public static final ConstructingObjectParser<AsyncSearchResponse, Void> PARSER = new ConstructingObjectParser<>(
"submit_async_search_response", true,
args -> new AsyncSearchResponse(
(int) args[0],
(boolean) args[1],
(boolean) args[2],
(long) args[3],
(long) args[4],
(String) args[5],
(SearchResponse) args[6],
(ElasticsearchException) args[7]));
static {
PARSER.declareInt(constructorArg(), VERSION_FIELD);
PARSER.declareBoolean(constructorArg(), IS_PARTIAL_FIELD);
PARSER.declareBoolean(constructorArg(), IS_RUNNING_FIELD);
PARSER.declareLong(constructorArg(), START_TIME_FIELD);
PARSER.declareLong(constructorArg(), EXPIRATION_FIELD);
PARSER.declareString(optionalConstructorArg(), ID_FIELD);
PARSER.declareObject(optionalConstructorArg(), (p, c) -> AsyncSearchResponse.parseSearchResponse(p),
RESPONSE_FIELD);
PARSER.declareObject(optionalConstructorArg(), (p, c) -> ElasticsearchException.fromXContent(p), ERROR_FIELD);
}
private static SearchResponse parseSearchResponse(XContentParser p) throws IOException {
// we should be before the opening START_OBJECT of the response
ensureExpectedToken(Token.START_OBJECT, p.currentToken(), p::getTokenLocation);
p.nextToken();
return SearchResponse.innerFromXContent(p);
}
public static AsyncSearchResponse fromXContent(XContentParser parser) throws IOException {
return PARSER.apply(parser, null);
}
@Override
public String toString() {
return Strings.toString(this);
}
}

View File

@ -0,0 +1,284 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.client.asyncsearch;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.client.Validatable;
import org.elasticsearch.client.ValidationException;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import java.util.Objects;
import java.util.Optional;
/**
* A request to track asynchronously the progress of a search against one or more indices.
*/
public class SubmitAsyncSearchRequest implements Validatable {
public static final int DEFAULT_PRE_FILTER_SHARD_SIZE = 1;
public static final int DEFAULT_BATCHED_REDUCE_SIZE = 5;
private static final boolean DEFAULT_CCS_MINIMIZE_ROUNDTRIPS = false;
private static final boolean DEFAULT_REQUEST_CACHE_VALUE = true;
public static long MIN_KEEP_ALIVE = TimeValue.timeValueMinutes(1).millis();
private TimeValue waitForCompletion;
private Boolean cleanOnCompletion;
private TimeValue keepAlive;
private final SearchRequest searchRequest;
/**
* Creates a new request
*/
public SubmitAsyncSearchRequest(SearchSourceBuilder source, String... indices) {
this.searchRequest = new SearchRequest(indices, source);
searchRequest.setCcsMinimizeRoundtrips(DEFAULT_CCS_MINIMIZE_ROUNDTRIPS);
searchRequest.setPreFilterShardSize(DEFAULT_PRE_FILTER_SHARD_SIZE);
searchRequest.setBatchedReduceSize(DEFAULT_BATCHED_REDUCE_SIZE);
searchRequest.requestCache(DEFAULT_REQUEST_CACHE_VALUE);
}
/**
* Get the target indices
*/
public String[] getIndices() {
return this.searchRequest.indices();
}
/**
* Get the minimum time that the request should wait before returning a partial result (defaults to 1 second).
*/
public TimeValue getWaitForCompletion() {
return waitForCompletion;
}
/**
* Sets the minimum time that the request should wait before returning a partial result (defaults to 1 second).
*/
public void setWaitForCompletion(TimeValue waitForCompletion) {
this.waitForCompletion = waitForCompletion;
}
/**
* Returns whether the resource resource should be removed on completion or failure (defaults to true).
*/
public Boolean isCleanOnCompletion() {
return cleanOnCompletion;
}
/**
* Determines if the resource should be removed on completion or failure (defaults to true).
*/
public void setCleanOnCompletion(boolean cleanOnCompletion) {
this.cleanOnCompletion = cleanOnCompletion;
}
/**
* Get the amount of time after which the result will expire (defaults to 5 days).
*/
public TimeValue getKeepAlive() {
return keepAlive;
}
/**
* Sets the amount of time after which the result will expire (defaults to 5 days).
*/
public void setKeepAlive(TimeValue keepAlive) {
this.keepAlive = keepAlive;
}
// setters for request parameters of the wrapped SearchRequest
/**
* Set the routing value to control the shards that the search will be executed on.
* A comma separated list of routing values to control the shards the search will be executed on.
*/
public void setRouting(String routing) {
this.searchRequest.routing(routing);
}
/**
* Set the routing values to control the shards that the search will be executed on.
*/
public void setRoutings(String... routings) {
this.searchRequest.routing(routings);
}
/**
* Get the routing value to control the shards that the search will be executed on.
*/
public String getRouting() {
return this.searchRequest.routing();
}
/**
* Sets the preference to execute the search. Defaults to randomize across shards. Can be set to
* {@code _local} to prefer local shards or a custom value, which guarantees that the same order
* will be used across different requests.
*/
public void setPreference(String preference) {
this.searchRequest.preference(preference);
}
/**
* Get the preference to execute the search.
*/
public String getPreference() {
return this.searchRequest.preference();
}
/**
* Specifies what type of requested indices to ignore and how to deal with indices wildcard expressions.
*/
public void setIndicesOptions(IndicesOptions indicesOptions) {
this.searchRequest.indicesOptions(indicesOptions);
}
/**
* Get the indices Options.
*/
public IndicesOptions getIndicesOptions() {
return this.searchRequest.indicesOptions();
}
/**
* The search type to execute, defaults to {@link SearchType#DEFAULT}.
*/
public void setSearchType(SearchType searchType) {
this.searchRequest.searchType(searchType);
}
/**
* Get the search type to execute, defaults to {@link SearchType#DEFAULT}.
*/
public SearchType getSearchType() {
return this.searchRequest.searchType();
}
/**
* Sets if this request should allow partial results. (If method is not called,
* will default to the cluster level setting).
*/
public void setAllowPartialSearchResults(boolean allowPartialSearchResults) {
this.searchRequest.allowPartialSearchResults(allowPartialSearchResults);
}
/**
* Gets if this request should allow partial results.
*/
public Boolean getAllowPartialSearchResults() {
return this.searchRequest.allowPartialSearchResults();
}
/**
* Sets the number of shard results that should be reduced at once on the coordinating node. This value should be used as a protection
* mechanism to reduce the memory overhead per search request if the potential number of shards in the request can be large.
*/
public void setBatchedReduceSize(int batchedReduceSize) {
this.searchRequest.setBatchedReduceSize(batchedReduceSize);
}
/**
* Gets the number of shard results that should be reduced at once on the coordinating node.
* This defaults to 5 for {@link SubmitAsyncSearchRequest}.
*/
public int getBatchedReduceSize() {
return this.searchRequest.getBatchedReduceSize();
}
/**
* Sets if this request should use the request cache or not, assuming that it can (for
* example, if "now" is used, it will never be cached). By default (not set, or null,
* will default to the index level setting if request cache is enabled or not).
*/
public void setRequestCache(Boolean requestCache) {
this.searchRequest.requestCache(requestCache);
}
/**
* Gets if this request should use the request cache or not.
* Defaults to `true` for {@link SubmitAsyncSearchRequest}.
*/
public Boolean getRequestCache() {
return this.searchRequest.requestCache();
}
/**
* Returns the number of shard requests that should be executed concurrently on a single node.
* The default is {@code 5}.
*/
public int getMaxConcurrentShardRequests() {
return this.searchRequest.getMaxConcurrentShardRequests();
}
/**
* Sets the number of shard requests that should be executed concurrently on a single node.
* The default is {@code 5}.
*/
public void setMaxConcurrentShardRequests(int maxConcurrentShardRequests) {
this.searchRequest.setMaxConcurrentShardRequests(maxConcurrentShardRequests);
}
/**
* Gets if the source of the {@link SearchSourceBuilder} initially used on this request.
*/
public SearchSourceBuilder getSearchSource() {
return this.searchRequest.source();
}
@Override
public Optional<ValidationException> validate() {
final ValidationException validationException = new ValidationException();
if (searchRequest.isSuggestOnly()) {
validationException.addValidationError("suggest-only queries are not supported");
}
if (keepAlive != null && keepAlive.getMillis() < MIN_KEEP_ALIVE) {
validationException.addValidationError("[keep_alive] must be greater than 1 minute, got: " + keepAlive.toString());
}
if (validationException.validationErrors().isEmpty()) {
return Optional.empty();
}
return Optional.of(validationException);
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
SubmitAsyncSearchRequest request = (SubmitAsyncSearchRequest) o;
return Objects.equals(searchRequest, request.searchRequest)
&& Objects.equals(getKeepAlive(), request.getKeepAlive())
&& Objects.equals(getWaitForCompletion(), request.getWaitForCompletion())
&& Objects.equals(isCleanOnCompletion(), request.isCleanOnCompletion());
}
@Override
public int hashCode() {
return Objects.hash(searchRequest, getKeepAlive(), getWaitForCompletion(), isCleanOnCompletion());
}
}

View File

@ -0,0 +1,115 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.client;
import org.apache.http.client.methods.HttpPost;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.client.asyncsearch.SubmitAsyncSearchRequest;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.rest.action.search.RestSearchAction;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.test.ESTestCase;
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
import java.util.StringJoiner;
import static org.elasticsearch.client.RequestConvertersTests.createTestSearchSourceBuilder;
import static org.elasticsearch.client.RequestConvertersTests.setRandomIndicesOptions;
public class AsyncSearchRequestConvertersTests extends ESTestCase {
public void testSubmitAsyncSearch() throws Exception {
String[] indices = RequestConvertersTests.randomIndicesNames(0, 5);
Map<String, String> expectedParams = new HashMap<>();
SearchSourceBuilder searchSourceBuilder = createTestSearchSourceBuilder();
SubmitAsyncSearchRequest submitRequest = new SubmitAsyncSearchRequest(searchSourceBuilder, indices);
// the following parameters might be overwritten by random ones later,
// but we need to set these since they are the default we send over http
expectedParams.put("request_cache", "true");
expectedParams.put("batched_reduce_size", "5");
setRandomSearchParams(submitRequest, expectedParams);
setRandomIndicesOptions(submitRequest::setIndicesOptions, submitRequest::getIndicesOptions, expectedParams);
if (randomBoolean()) {
boolean cleanOnCompletion = randomBoolean();
submitRequest.setCleanOnCompletion(cleanOnCompletion);
expectedParams.put("clean_on_completion", Boolean.toString(cleanOnCompletion));
}
if (randomBoolean()) {
TimeValue keepAlive = TimeValue.parseTimeValue(randomTimeValue(), "test");
submitRequest.setKeepAlive(keepAlive);
expectedParams.put("keep_alive", keepAlive.getStringRep());
}
if (randomBoolean()) {
TimeValue waitForCompletion = TimeValue.parseTimeValue(randomTimeValue(), "test");
submitRequest.setWaitForCompletion(waitForCompletion);
expectedParams.put("wait_for_completion", waitForCompletion.getStringRep());
}
Request request = AsyncSearchRequestConverters.submitAsyncSearch(submitRequest);
StringJoiner endpoint = new StringJoiner("/", "/", "");
String index = String.join(",", indices);
if (Strings.hasLength(index)) {
endpoint.add(index);
}
endpoint.add("_async_search");
assertEquals(HttpPost.METHOD_NAME, request.getMethod());
assertEquals(endpoint.toString(), request.getEndpoint());
assertEquals(expectedParams, request.getParameters());
RequestConvertersTests.assertToXContentBody(searchSourceBuilder, request.getEntity());
}
private static void setRandomSearchParams(SubmitAsyncSearchRequest request, Map<String, String> expectedParams) {
expectedParams.put(RestSearchAction.TYPED_KEYS_PARAM, "true");
if (randomBoolean()) {
request.setRouting(randomAlphaOfLengthBetween(3, 10));
expectedParams.put("routing", request.getRouting());
}
if (randomBoolean()) {
request.setPreference(randomAlphaOfLengthBetween(3, 10));
expectedParams.put("preference", request.getPreference());
}
if (randomBoolean()) {
request.setSearchType(randomFrom(SearchType.CURRENTLY_SUPPORTED));
}
expectedParams.put("search_type", request.getSearchType().name().toLowerCase(Locale.ROOT));
if (randomBoolean()) {
request.setAllowPartialSearchResults(randomBoolean());
expectedParams.put("allow_partial_search_results", Boolean.toString(request.getAllowPartialSearchResults()));
}
if (randomBoolean()) {
request.setRequestCache(randomBoolean());
expectedParams.put("request_cache", Boolean.toString(request.getRequestCache()));
}
if (randomBoolean()) {
request.setBatchedReduceSize(randomIntBetween(2, Integer.MAX_VALUE));
}
expectedParams.put("batched_reduce_size", Integer.toString(request.getBatchedReduceSize()));
if (randomBoolean()) {
request.setMaxConcurrentShardRequests(randomIntBetween(1, Integer.MAX_VALUE));
}
expectedParams.put("max_concurrent_shard_requests", Integer.toString(request.getMaxConcurrentShardRequests()));
}
}

View File

@ -586,8 +586,9 @@ public class RequestConvertersTests extends ESTestCase {
Request request = RequestConverters.updateByQuery(updateByQueryRequest);
StringJoiner joiner = new StringJoiner("/", "/", "");
joiner.add(String.join(",", updateByQueryRequest.indices()));
if (updateByQueryRequest.getDocTypes().length > 0)
if (updateByQueryRequest.getDocTypes().length > 0) {
joiner.add(String.join(",", updateByQueryRequest.getDocTypes()));
}
joiner.add("_update_by_query");
assertEquals(joiner.toString(), request.getEndpoint());
assertEquals(HttpPost.METHOD_NAME, request.getMethod());
@ -656,8 +657,9 @@ public class RequestConvertersTests extends ESTestCase {
Request request = RequestConverters.deleteByQuery(deleteByQueryRequest);
StringJoiner joiner = new StringJoiner("/", "/", "");
joiner.add(String.join(",", deleteByQueryRequest.indices()));
if (deleteByQueryRequest.getDocTypes().length > 0)
if (deleteByQueryRequest.getDocTypes().length > 0) {
joiner.add(String.join(",", deleteByQueryRequest.getDocTypes()));
}
joiner.add("_delete_by_query");
assertEquals(joiner.toString(), request.getEndpoint());
assertEquals(HttpPost.METHOD_NAME, request.getMethod());
@ -1160,6 +1162,27 @@ public class RequestConvertersTests extends ESTestCase {
public void testSearch() throws Exception {
String searchEndpoint = randomFrom("_" + randomAlphaOfLength(5));
String[] indices = randomIndicesNames(0, 5);
Map<String, String> expectedParams = new HashMap<>();
SearchRequest searchRequest = createTestSearchRequest(indices, expectedParams);
Request request = RequestConverters.search(searchRequest, searchEndpoint);
StringJoiner endpoint = new StringJoiner("/", "/", "");
String index = String.join(",", indices);
if (Strings.hasLength(index)) {
endpoint.add(index);
}
String type = String.join(",", searchRequest.types());
if (Strings.hasLength(type)) {
endpoint.add(type);
}
endpoint.add(searchEndpoint);
assertEquals(HttpPost.METHOD_NAME, request.getMethod());
assertEquals(endpoint.toString(), request.getEndpoint());
assertEquals(expectedParams, request.getParameters());
assertToXContentBody(searchRequest.source(), request.getEntity());
}
public static SearchRequest createTestSearchRequest(String[] indices, Map<String, String> expectedParams) {
SearchRequest searchRequest = new SearchRequest(indices);
int numTypes = randomIntBetween(0, 5);
@ -1169,10 +1192,15 @@ public class RequestConvertersTests extends ESTestCase {
}
searchRequest.types(types);
Map<String, String> expectedParams = new HashMap<>();
setRandomSearchParams(searchRequest, expectedParams);
setRandomIndicesOptions(searchRequest::indicesOptions, searchRequest::indicesOptions, expectedParams);
SearchSourceBuilder searchSourceBuilder = createTestSearchSourceBuilder();
searchRequest.source(searchSourceBuilder);
return searchRequest;
}
public static SearchSourceBuilder createTestSearchSourceBuilder() {
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
// rarely skip setting the search source completely
if (frequently()) {
@ -1216,25 +1244,10 @@ public class RequestConvertersTests extends ESTestCase {
searchSourceBuilder.collapse(new CollapseBuilder(randomAlphaOfLengthBetween(3, 10)));
}
}
searchRequest.source(searchSourceBuilder);
}
return searchSourceBuilder;
}
Request request = RequestConverters.search(searchRequest, searchEndpoint);
StringJoiner endpoint = new StringJoiner("/", "/", "");
String index = String.join(",", indices);
if (Strings.hasLength(index)) {
endpoint.add(index);
}
String type = String.join(",", types);
if (Strings.hasLength(type)) {
endpoint.add(type);
}
endpoint.add(searchEndpoint);
assertEquals(HttpPost.METHOD_NAME, request.getMethod());
assertEquals(endpoint.toString(), request.getEndpoint());
assertEquals(expectedParams, request.getParameters());
assertToXContentBody(searchSourceBuilder, request.getEntity());
}
public void testSearchNullIndicesAndTypes() {
expectThrows(NullPointerException.class, () -> new SearchRequest((String[]) null));
@ -2033,7 +2046,7 @@ public class RequestConvertersTests extends ESTestCase {
expectedParams.put("pre_filter_shard_size", Integer.toString(searchRequest.getPreFilterShardSize()));
}
static void setRandomIndicesOptions(Consumer<IndicesOptions> setter, Supplier<IndicesOptions> getter,
public static void setRandomIndicesOptions(Consumer<IndicesOptions> setter, Supplier<IndicesOptions> getter,
Map<String, String> expectedParams) {
if (randomBoolean()) {

View File

@ -895,6 +895,7 @@ public class RestHighLevelClientTests extends ESTestCase {
apiName.startsWith("eql.") == false &&
apiName.endsWith("freeze") == false &&
apiName.endsWith("reload_analyzers") == false &&
apiName.startsWith("async_search") == false &&
// IndicesClientIT.getIndexTemplate should be renamed "getTemplate" in version 8.0 when we
// can get rid of 7.0's deprecated "getTemplate"
apiName.equals("indices.get_index_template") == false) {

View File

@ -0,0 +1,57 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.client.asyncsearch;
import org.elasticsearch.client.ESRestHighLevelClientTestCase;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
public class AsyncSearchIT extends ESRestHighLevelClientTestCase {
public void testSubmitAsyncSearchRequest() throws IOException {
String index = "test-index";
createIndex(index, Settings.EMPTY);
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder().query(QueryBuilders.matchAllQuery());
SubmitAsyncSearchRequest request = new SubmitAsyncSearchRequest(sourceBuilder, index);
// 15 sec should be enough to make sure we always complete right away
request.setWaitForCompletion(new TimeValue(15, TimeUnit.SECONDS));
AsyncSearchResponse response = highLevelClient().asyncSearch().submitAsyncSearch(request, RequestOptions.DEFAULT);
assertTrue(response.getVersion() >= 0);
assertFalse(response.isPartial());
assertTrue(response.getStartTime() > 0);
assertTrue(response.getExpirationTime() > 0);
assertNotNull(response.getSearchResponse());
if (response.isRunning() == false) {
assertNull(response.getId());
assertFalse(response.isPartial());
} else {
assertTrue(response.isPartial());
assertNotNull(response.getId());
}
}
}

View File

@ -0,0 +1,85 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.client.asyncsearch;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchResponse.Clusters;
import org.elasticsearch.action.search.ShardSearchFailure;
import org.elasticsearch.client.AbstractResponseTestCase;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.search.internal.InternalSearchResponse;
import java.io.IOException;
import static org.hamcrest.Matchers.containsString;
public class AsyncSearchResponseTests
extends AbstractResponseTestCase<org.elasticsearch.xpack.core.search.action.AsyncSearchResponse, AsyncSearchResponse> {
@Override
protected org.elasticsearch.xpack.core.search.action.AsyncSearchResponse createServerTestInstance(XContentType xContentType) {
int version = randomIntBetween(0, Integer.MAX_VALUE);
boolean isPartial = randomBoolean();
boolean isRunning = randomBoolean();
long startTimeMillis = randomLongBetween(0, Long.MAX_VALUE);
long expirationTimeMillis = randomLongBetween(0, Long.MAX_VALUE);
String id = randomBoolean() ? null : randomAlphaOfLength(10);
ElasticsearchException error = randomBoolean() ? null : new ElasticsearchException(randomAlphaOfLength(10));
// add search response, minimal object is okay since the full randomization of parsing is tested in SearchResponseTests
SearchResponse searchResponse = randomBoolean() ? null
: new SearchResponse(InternalSearchResponse.empty(), randomAlphaOfLength(10), 1, 1, 0, randomIntBetween(0, 10000),
ShardSearchFailure.EMPTY_ARRAY, Clusters.EMPTY);
org.elasticsearch.xpack.core.search.action.AsyncSearchResponse testResponse =
new org.elasticsearch.xpack.core.search.action.AsyncSearchResponse(id, version, searchResponse, error, isPartial, isRunning,
startTimeMillis, expirationTimeMillis);
return testResponse;
}
@Override
protected AsyncSearchResponse doParseToClientInstance(XContentParser parser) throws IOException {
return AsyncSearchResponse.fromXContent(parser);
}
@Override
protected void assertInstances(org.elasticsearch.xpack.core.search.action.AsyncSearchResponse expected, AsyncSearchResponse parsed) {
assertNotSame(parsed, expected);
assertEquals(expected.getId(), parsed.getId());
assertEquals(expected.getVersion(), parsed.getVersion());
assertEquals(expected.isRunning(), parsed.isRunning());
assertEquals(expected.isPartial(), parsed.isPartial());
assertEquals(expected.getStartTime(), parsed.getStartTime());
assertEquals(expected.getExpirationTime(), parsed.getExpirationTime());
// we cannot directly compare error since Exceptions are wrapped differently on parsing, but we can check original message
if (expected.getFailure() != null) {
assertThat(parsed.getFailure().getMessage(), containsString(expected.getFailure().getMessage()));
} else {
assertNull(parsed.getFailure());
}
// we don't need to check the complete parsed search response since this is done elsewhere
// only spot-check some randomized properties for equality here
if (expected.getSearchResponse() != null) {
assertEquals(expected.getSearchResponse().getTook(), parsed.getSearchResponse().getTook());
assertEquals(expected.getSearchResponse().getScrollId(), parsed.getSearchResponse().getScrollId());
} else {
assertNull(parsed.getSearchResponse());
}
}
}

View File

@ -0,0 +1,57 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.client.asyncsearch;
import org.elasticsearch.client.ValidationException;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.suggest.SuggestBuilder;
import org.elasticsearch.test.ESTestCase;
import java.util.Optional;
public class SubmitAsyncSearchRequestTests extends ESTestCase {
public void testValidation() {
{
SearchSourceBuilder source = new SearchSourceBuilder();
SubmitAsyncSearchRequest request = new SubmitAsyncSearchRequest(source, "test");
Optional<ValidationException> validation = request.validate();
assertFalse(validation.isPresent());
}
{
SearchSourceBuilder source = new SearchSourceBuilder().suggest(new SuggestBuilder());
SubmitAsyncSearchRequest request = new SubmitAsyncSearchRequest(source, "test");
Optional<ValidationException> validation = request.validate();
assertTrue(validation.isPresent());
assertEquals(1, validation.get().validationErrors().size());
assertEquals("suggest-only queries are not supported", validation.get().validationErrors().get(0));
}
{
SubmitAsyncSearchRequest request = new SubmitAsyncSearchRequest(new SearchSourceBuilder(), "test");
request.setKeepAlive(new TimeValue(1));
Optional<ValidationException> validation = request.validate();
assertTrue(validation.isPresent());
assertEquals(1, validation.get().validationErrors().size());
assertEquals("[keep_alive] must be greater than 1 minute, got: 1ms", validation.get().validationErrors().get(0));
}
}
}

View File

@ -265,7 +265,7 @@ public class SearchResponse extends ActionResponse implements StatusToXContentOb
return innerFromXContent(parser);
}
static SearchResponse innerFromXContent(XContentParser parser) throws IOException {
public static SearchResponse innerFromXContent(XContentParser parser) throws IOException {
ensureExpectedToken(Token.FIELD_NAME, parser.currentToken(), parser::getTokenLocation);
String currentFieldName = parser.currentName();
SearchHits hits = null;

View File

@ -26,7 +26,9 @@ public class AsyncSearchResponse extends ActionResponse implements StatusToXCont
@Nullable
private final String id;
private final int version;
@Nullable
private final SearchResponse searchResponse;
@Nullable
private final ElasticsearchException error;
private final boolean isRunning;
private final boolean isPartial;