Cumulative backport of async search changes (#53635)

* Submit async search to work only with POST (#53368)

Currently the submit async search API can be called using both GET and POST at REST, but given that it submits a call and creates internal state, POST should be the only allowed method.

* Refine SearchProgressListener internal API (#53373)

The following cumulative improvements have been made:
- rename `onReduce` and `notifyReduce` to `onFinalReduce` and `notifyFinalReduce`
- add unit test for `SearchShard`
- on* methods in `SearchProgressListener` shouldn't need to be public as they should never be called directly, they only need to be overridden hence they can be made protected. They are actually called directly from a test which required some adapting, like making `AsyncSearchTask.Listener` class package private instead of private
- Instead of overriding `getProgressListener` in `AsyncSearchTask`, as it feels weird to override a getter method, added a specific method that allows to retrieve the Listener directly without needing to cast it. Made the getter and setter for the listener final in the base class.
- rename `SearchProgressListener#searchShards` methods to `buildSearchShards` and make it static given that it accesses no instance members
- make `SearchShard` and `SearchShardTask` classes final

* Move async search yaml tests to x-pack yaml test folder (#53537)

The yaml tests for async search currently sit in its qa folder. There is no reason though for them to live in a separate folder as they don't require particular setup. This commit moves them to the main folder together with the other x-pack yaml tests so that they will be run by the client test runners too.

* [DOCS] Add temporary redirect for async-search (#53454)

The following API spec files contain a link to a not-yet-created
async search docs page:

* [async_search.delete.json][0]
* [async_search.get.json][1]
* [async_search.submit.json][2]

The Elaticsearch-js client uses these spec files to create their docs.
This created a broken link in the Elaticsearch-js docs, which has broken
the docs build.

This PR adds a temporary redirect for the docs page. This redirect
should be removed when the actual API docs are added.

[0]: https://github.com/elastic/elasticsearch/blob/master/x-pack/plugin/src/test/resources/rest-api-spec/api/async_search.delete.json
[1]: https://github.com/elastic/elasticsearch/blob/master/x-pack/plugin/src/test/resources/rest-api-spec/api/async_search.get.json
[2]: https://github.com/elastic/elasticsearch/blob/master/x-pack/plugin/src/test/resources/rest-api-spec/api/async_search.submit.json

Co-authored-by: James Rodewig <james.rodewig@elastic.co>
This commit is contained in:
Luca Cavanna 2020-03-17 00:08:17 +01:00 committed by GitHub
parent 9845dbb7d6
commit c3d2417448
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
23 changed files with 151 additions and 182 deletions

View File

@ -602,3 +602,8 @@ See <<slm-api-stop>>.
=== How {ccs} works
See <<ccs-gateway-seed-nodes>> and <<ccs-min-roundtrips>>.
[role="exclude",id="async-search"]
=== Asynchronous search
coming::[7.x]

View File

@ -54,8 +54,8 @@ final class SearchDfsQueryThenFetchAsyncAction extends AbstractSearchAsyncAction
this.searchPhaseController = searchPhaseController;
SearchProgressListener progressListener = task.getProgressListener();
SearchSourceBuilder sourceBuilder = request.source();
progressListener.notifyListShards(progressListener.searchShards(this.shardsIts),
progressListener.searchShards(toSkipShardsIts), clusters, sourceBuilder == null || sourceBuilder.size() != 0);
progressListener.notifyListShards(SearchProgressListener.buildSearchShards(this.shardsIts),
SearchProgressListener.buildSearchShards(toSkipShardsIts), clusters, sourceBuilder == null || sourceBuilder.size() != 0);
}
@Override

View File

@ -679,7 +679,7 @@ public final class SearchPhaseController {
numReducePhases++;
index = 1;
if (hasAggs || hasTopDocs) {
progressListener.notifyPartialReduce(progressListener.searchShards(processedShards),
progressListener.notifyPartialReduce(SearchProgressListener.buildSearchShards(processedShards),
topDocsStats.getTotalHits(), hasAggs ? aggsBuffer[0] : null, numReducePhases);
}
}
@ -710,7 +710,7 @@ public final class SearchPhaseController {
ReducedQueryPhase reducePhase = controller.reducedQueryPhase(results.asList(),
getRemainingAggs(), getRemainingTopDocs(), topDocsStats, numReducePhases, false,
aggReduceContextBuilder, performFinalReduce);
progressListener.notifyReduce(progressListener.searchShards(results.asList()),
progressListener.notifyFinalReduce(SearchProgressListener.buildSearchShards(results.asList()),
reducePhase.totalHits, reducePhase.aggregations, reducePhase.numReducePhases);
return reducePhase;
}
@ -767,8 +767,8 @@ public final class SearchPhaseController {
List<SearchPhaseResult> resultList = results.asList();
final ReducedQueryPhase reducePhase =
reducedQueryPhase(resultList, isScrollRequest, trackTotalHitsUpTo, aggReduceContextBuilder, request.isFinalReduce());
listener.notifyReduce(listener.searchShards(resultList), reducePhase.totalHits,
reducePhase.aggregations, reducePhase.numReducePhases);
listener.notifyFinalReduce(SearchProgressListener.buildSearchShards(resultList),
reducePhase.totalHits, reducePhase.aggregations, reducePhase.numReducePhases);
return reducePhase;
}
};

View File

@ -54,14 +54,14 @@ abstract class SearchProgressListener {
* @param clusters The statistics for remote clusters included in the search.
* @param fetchPhase <code>true</code> if the search needs a fetch phase, <code>false</code> otherwise.
**/
public void onListShards(List<SearchShard> shards, List<SearchShard> skippedShards, Clusters clusters, boolean fetchPhase) {}
protected void onListShards(List<SearchShard> shards, List<SearchShard> skippedShards, Clusters clusters, boolean fetchPhase) {}
/**
* Executed when a shard returns a query result.
*
* @param shardIndex The index of the shard in the list provided by {@link SearchProgressListener#onListShards} )}.
*/
public void onQueryResult(int shardIndex) {}
protected void onQueryResult(int shardIndex) {}
/**
* Executed when a shard reports a query failure.
@ -70,7 +70,7 @@ abstract class SearchProgressListener {
* @param shardTarget The last shard target that thrown an exception.
* @param exc The cause of the failure.
*/
public void onQueryFailure(int shardIndex, SearchShardTarget shardTarget, Exception exc) {}
protected void onQueryFailure(int shardIndex, SearchShardTarget shardTarget, Exception exc) {}
/**
* Executed when a partial reduce is created. The number of partial reduce can be controlled via
@ -81,7 +81,7 @@ abstract class SearchProgressListener {
* @param aggs The partial result for aggregations.
* @param reducePhase The version number for this reduce.
*/
public void onPartialReduce(List<SearchShard> shards, TotalHits totalHits, InternalAggregations aggs, int reducePhase) {}
protected void onPartialReduce(List<SearchShard> shards, TotalHits totalHits, InternalAggregations aggs, int reducePhase) {}
/**
* Executed once when the final reduce is created.
@ -91,14 +91,14 @@ abstract class SearchProgressListener {
* @param aggs The final result for aggregations.
* @param reducePhase The version number for this reduce.
*/
public void onReduce(List<SearchShard> shards, TotalHits totalHits, InternalAggregations aggs, int reducePhase) {}
protected void onFinalReduce(List<SearchShard> shards, TotalHits totalHits, InternalAggregations aggs, int reducePhase) {}
/**
* Executed when a shard returns a fetch result.
*
* @param shardIndex The index of the shard in the list provided by {@link SearchProgressListener#onListShards})}.
*/
public void onFetchResult(int shardIndex) {}
protected void onFetchResult(int shardIndex) {}
/**
* Executed when a shard reports a fetch failure.
@ -106,7 +106,7 @@ abstract class SearchProgressListener {
* @param shardIndex The index of the shard in the list provided by {@link SearchProgressListener#onListShards})}.
* @param exc The cause of the failure.
*/
public void onFetchFailure(int shardIndex, Exception exc) {}
protected void onFetchFailure(int shardIndex, Exception exc) {}
final void notifyListShards(List<SearchShard> shards, List<SearchShard> skippedShards, Clusters clusters, boolean fetchPhase) {
this.shards = shards;
@ -143,9 +143,9 @@ abstract class SearchProgressListener {
}
}
final void notifyReduce(List<SearchShard> shards, TotalHits totalHits, InternalAggregations aggs, int reducePhase) {
protected final void notifyFinalReduce(List<SearchShard> shards, TotalHits totalHits, InternalAggregations aggs, int reducePhase) {
try {
onReduce(shards, totalHits, aggs, reducePhase);
onFinalReduce(shards, totalHits, aggs, reducePhase);
} catch (Exception e) {
logger.warn(() -> new ParameterizedMessage("Failed to execute progress listener on reduce"), e);
}
@ -169,7 +169,7 @@ abstract class SearchProgressListener {
}
}
final List<SearchShard> searchShards(List<? extends SearchPhaseResult> results) {
static List<SearchShard> buildSearchShards(List<? extends SearchPhaseResult> results) {
List<SearchShard> lst = results.stream()
.filter(Objects::nonNull)
.map(SearchPhaseResult::getSearchShardTarget)
@ -178,7 +178,7 @@ abstract class SearchProgressListener {
return Collections.unmodifiableList(lst);
}
final List<SearchShard> searchShards(SearchShardTarget[] results) {
static List<SearchShard> buildSearchShards(SearchShardTarget[] results) {
List<SearchShard> lst = Arrays.stream(results)
.filter(Objects::nonNull)
.map(e -> new SearchShard(e.getClusterAlias(), e.getShardId()))
@ -186,7 +186,7 @@ abstract class SearchProgressListener {
return Collections.unmodifiableList(lst);
}
final List<SearchShard> searchShards(GroupShardsIterator<SearchShardIterator> its) {
static List<SearchShard> buildSearchShards(GroupShardsIterator<SearchShardIterator> its) {
List<SearchShard> lst = StreamSupport.stream(its.spliterator(), false)
.map(e -> new SearchShard(e.getClusterAlias(), e.shardId()))
.collect(Collectors.toList());

View File

@ -57,8 +57,8 @@ final class SearchQueryThenFetchAsyncAction extends AbstractSearchAsyncAction<Se
this.progressListener = task.getProgressListener();
final SearchProgressListener progressListener = task.getProgressListener();
final SearchSourceBuilder sourceBuilder = request.source();
progressListener.notifyListShards(progressListener.searchShards(this.shardsIts),
progressListener.searchShards(toSkipShardsIts), clusters, sourceBuilder == null || sourceBuilder.size() != 0);
progressListener.notifyListShards(SearchProgressListener.buildSearchShards(this.shardsIts),
SearchProgressListener.buildSearchShards(toSkipShardsIts), clusters, sourceBuilder == null || sourceBuilder.size() != 0);
}
protected void executePhaseOnShard(final SearchShardIterator shardIt, final ShardRouting shard,

View File

@ -29,7 +29,7 @@ import java.util.Objects;
* A class that encapsulates the {@link ShardId} and the cluster alias
* of a shard used during the search action.
*/
public class SearchShard implements Comparable<SearchShard> {
public final class SearchShard implements Comparable<SearchShard> {
@Nullable
private final String clusterAlias;
private final ShardId shardId;
@ -40,8 +40,7 @@ public class SearchShard implements Comparable<SearchShard> {
}
/**
* Return the cluster alias if the shard is on a remote cluster and <code>null</code>
* otherwise (local).
* Return the cluster alias if we are executing a cross cluster search request, <code>null</code> otherwise.
*/
@Nullable
public String getClusterAlias() {
@ -51,7 +50,6 @@ public class SearchShard implements Comparable<SearchShard> {
/**
* Return the {@link ShardId} of this shard.
*/
@Nullable
public ShardId getShardId() {
return shardId;
}

View File

@ -40,5 +40,4 @@ public class SearchShardTask extends CancellableTask {
public boolean shouldCancelChildrenOnCancellation() {
return false;
}
}

View File

@ -37,14 +37,14 @@ public class SearchTask extends CancellableTask {
/**
* Attach a {@link SearchProgressListener} to this task.
*/
public void setProgressListener(SearchProgressListener progressListener) {
public final void setProgressListener(SearchProgressListener progressListener) {
this.progressListener = progressListener;
}
/**
* Return the {@link SearchProgressListener} attached to this task.
*/
public SearchProgressListener getProgressListener() {
public final SearchProgressListener getProgressListener() {
return progressListener;
}

View File

@ -847,7 +847,7 @@ public class SearchPhaseControllerTests extends ESTestCase {
}
@Override
public void onReduce(List<SearchShard> shards, TotalHits totalHits, InternalAggregations aggs, int reducePhase) {
public void onFinalReduce(List<SearchShard> shards, TotalHits totalHits, InternalAggregations aggs, int reducePhase) {
totalHitsListener.set(totalHits);
finalAggsListener.set(aggs);
numReduceListener.incrementAndGet();

View File

@ -178,7 +178,7 @@ public class SearchProgressActionListenerIT extends ESSingleNodeTestCase {
}
@Override
public void onReduce(List<SearchShard> shards, TotalHits totalHits, InternalAggregations aggs, int reducePhase) {
public void onFinalReduce(List<SearchShard> shards, TotalHits totalHits, InternalAggregations aggs, int reducePhase) {
numReduces.incrementAndGet();
}

View File

@ -0,0 +1,83 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.search;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.EqualsHashCodeTestUtils;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
public class SearchShardTests extends ESTestCase {
public void testEqualsAndHashcode() {
String index = randomAlphaOfLengthBetween(5, 10);
SearchShard searchShard = new SearchShard(randomBoolean() ? null : randomAlphaOfLengthBetween(3, 10),
new ShardId(index, index + "-uuid", randomIntBetween(0, 1024)));
EqualsHashCodeTestUtils.checkEqualsAndHashCode(searchShard,
s -> new SearchShard(s.getClusterAlias(), s.getShardId()),
s -> {
if (randomBoolean()) {
return new SearchShard(s.getClusterAlias() == null ? randomAlphaOfLengthBetween(3, 10) : null, s.getShardId());
} else {
String indexName = s.getShardId().getIndexName();
int shardId = s.getShardId().getId();
if (randomBoolean()) {
indexName += randomAlphaOfLength(5);
} else {
shardId += randomIntBetween(1, 1024);
}
return new SearchShard(s.getClusterAlias(), new ShardId(indexName, indexName + "-uuid", shardId));
}
});
}
public void testCompareTo() {
List<SearchShard> searchShards = new ArrayList<>();
Index index0 = new Index("index0", "index0-uuid");
Index index1 = new Index("index1", "index1-uuid");
searchShards.add(new SearchShard(null, new ShardId(index0, 0)));
searchShards.add(new SearchShard(null, new ShardId(index1, 0)));
searchShards.add(new SearchShard(null, new ShardId(index0, 1)));
searchShards.add(new SearchShard(null, new ShardId(index1, 1)));
searchShards.add(new SearchShard(null, new ShardId(index0, 2)));
searchShards.add(new SearchShard(null, new ShardId(index1, 2)));
searchShards.add(new SearchShard("", new ShardId(index0, 0)));
searchShards.add(new SearchShard("", new ShardId(index1, 0)));
searchShards.add(new SearchShard("", new ShardId(index0, 1)));
searchShards.add(new SearchShard("", new ShardId(index1, 1)));
searchShards.add(new SearchShard("remote0", new ShardId(index0, 0)));
searchShards.add(new SearchShard("remote0", new ShardId(index1, 0)));
searchShards.add(new SearchShard("remote0", new ShardId(index0, 1)));
searchShards.add(new SearchShard("remote0", new ShardId(index0, 2)));
searchShards.add(new SearchShard("remote1", new ShardId(index0, 0)));
searchShards.add(new SearchShard("remote1", new ShardId(index1, 0)));
searchShards.add(new SearchShard("remote1", new ShardId(index0, 1)));
searchShards.add(new SearchShard("remote1", new ShardId(index1, 1)));
List<SearchShard> sorted = new ArrayList<>(searchShards);
Collections.sort(sorted);
assertEquals(searchShards, sorted);
}
}

View File

@ -1,30 +0,0 @@
import org.elasticsearch.gradle.test.RestIntegTestTask
apply plugin: 'elasticsearch.testclusters'
apply plugin: 'elasticsearch.standalone-test'
restResources {
restApi {
includeXpack 'async_search'
}
}
dependencies {
testCompile project(path: xpackModule('core'), configuration: 'testArtifacts')
testCompile project(path: xpackModule('async-search'), configuration: 'runtime')
}
task restTest(type: RestIntegTestTask) {
mustRunAfter(precommit)
}
testClusters.restTest {
testDistribution = 'DEFAULT'
setting 'xpack.ml.enabled', 'false'
setting 'xpack.monitoring.enabled', 'false'
setting 'xpack.security.enabled', 'true'
user username: 'async-search-user', password: 'async-search-password'
}
check.dependsOn restTest
test.enabled = false

View File

@ -1,35 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.search;
import com.carrotsearch.randomizedtesting.annotations.ParametersFactory;
import org.elasticsearch.common.settings.SecureString;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.test.rest.yaml.ClientYamlTestCandidate;
import org.elasticsearch.test.rest.yaml.ESClientYamlSuiteTestCase;
import static org.elasticsearch.xpack.core.security.authc.support.UsernamePasswordToken.basicAuthHeaderValue;
public class AsyncSearchRestIT extends ESClientYamlSuiteTestCase {
public AsyncSearchRestIT(final ClientYamlTestCandidate testCandidate) {
super(testCandidate);
}
@ParametersFactory
public static Iterable<Object[]> parameters() throws Exception {
return ESClientYamlSuiteTestCase.createParameters();
}
@Override
protected Settings restClientSettings() {
final String userAuthHeaderValue = basicAuthHeaderValue("async-search-user",
new SecureString("async-search-password".toCharArray()));
return Settings.builder().put(ThreadContext.PREFIX + ".Authorization", userAuthHeaderValue).build();
}
}

View File

@ -19,6 +19,7 @@ import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.elasticsearch.test.rest.ESRestTestCase;
import org.junit.Before;
import java.io.IOException;
@ -31,7 +32,7 @@ import static org.elasticsearch.xpack.search.AsyncSearchIndexService.INDEX;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
public class AsyncSearchSecurityIT extends AsyncSearchRestTestCase {
public class AsyncSearchSecurityIT extends ESRestTestCase {
/**
* All tests run as a superuser but use <code>es-security-runas-user</code> to become a less privileged user.
*/
@ -44,7 +45,7 @@ public class AsyncSearchSecurityIT extends AsyncSearchRestTestCase {
}
@Before
private void indexDocuments() throws IOException {
public void indexDocuments() throws IOException {
createIndex("index", Settings.EMPTY);
index("index", "0", "foo", "bar");
refresh("index");
@ -122,12 +123,8 @@ public class AsyncSearchSecurityIT extends AsyncSearchRestTestCase {
return client().performRequest(request);
}
static Response submitAsyncSearch(String indexName, String query, String user) throws IOException {
return submitAsyncSearch(indexName, query, TimeValue.MINUS_ONE, user);
}
static Response submitAsyncSearch(String indexName, String query, TimeValue waitForCompletion, String user) throws IOException {
final Request request = new Request("GET", indexName + "/_async_search");
final Request request = new Request("POST", indexName + "/_async_search");
setRunAsHeader(request, user);
request.addParameter("q", query);
request.addParameter("wait_for_completion", waitForCompletion.toString());
@ -136,13 +133,6 @@ public class AsyncSearchSecurityIT extends AsyncSearchRestTestCase {
return client().performRequest(request);
}
static Response search(String indexName, String query, String user) throws IOException {
final Request request = new Request("GET", indexName + "/_search");
setRunAsHeader(request, user);
request.addParameter("q", query);
return client().performRequest(request);
}
static Response getAsyncSearch(String id, String user) throws IOException {
final Request request = new Request("GET", "/_async_search/" + id);
setRunAsHeader(request, user);

View File

@ -1,15 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.search;
import org.elasticsearch.test.rest.ESRestTestCase;
public class AsyncSearchRestTestCase extends ESRestTestCase {
@Override
protected boolean preserveClusterUponCompletion() {
return true;
}
}

View File

@ -41,7 +41,7 @@ import java.util.function.Supplier;
/**
* Task that tracks the progress of a currently running {@link SearchRequest}.
*/
class AsyncSearchTask extends SearchTask {
final class AsyncSearchTask extends SearchTask {
private final AsyncSearchId searchId;
private final Client client;
private final ThreadPool threadPool;
@ -111,8 +111,7 @@ class AsyncSearchTask extends SearchTask {
return searchId;
}
@Override
public SearchProgressActionListener getProgressListener() {
Listener getSearchProgressActionListener() {
return progressListener;
}
@ -193,7 +192,7 @@ class AsyncSearchTask extends SearchTask {
if (hasCompleted) {
executeImmediately = true;
} else {
completionListeners.put(completionId++, resp -> listener.accept(resp));
completionListeners.put(completionId++, listener::accept);
}
}
if (executeImmediately) {
@ -300,31 +299,31 @@ class AsyncSearchTask extends SearchTask {
}
}
private class Listener extends SearchProgressActionListener {
class Listener extends SearchProgressActionListener {
@Override
public void onQueryResult(int shardIndex) {
protected void onQueryResult(int shardIndex) {
checkExpiration();
}
@Override
public void onFetchResult(int shardIndex) {
protected void onFetchResult(int shardIndex) {
checkExpiration();
}
@Override
public void onQueryFailure(int shardIndex, SearchShardTarget shardTarget, Exception exc) {
protected void onQueryFailure(int shardIndex, SearchShardTarget shardTarget, Exception exc) {
// best effort to cancel expired tasks
checkExpiration();
searchResponse.get().addShardFailure(shardIndex, new ShardSearchFailure(exc, shardTarget));
}
@Override
public void onFetchFailure(int shardIndex, Exception exc) {
protected void onFetchFailure(int shardIndex, Exception exc) {
checkExpiration();
}
@Override
public void onListShards(List<SearchShard> shards, List<SearchShard> skipped, Clusters clusters, boolean fetchPhase) {
protected void onListShards(List<SearchShard> shards, List<SearchShard> skipped, Clusters clusters, boolean fetchPhase) {
// best effort to cancel expired tasks
checkExpiration();
searchResponse.compareAndSet(null,
@ -342,7 +341,7 @@ class AsyncSearchTask extends SearchTask {
}
@Override
public void onReduce(List<SearchShard> shards, TotalHits totalHits, InternalAggregations aggs, int reducePhase) {
public void onFinalReduce(List<SearchShard> shards, TotalHits totalHits, InternalAggregations aggs, int reducePhase) {
// best effort to cancel expired tasks
checkExpiration();
searchResponse.get().updatePartialResponse(shards.size(),

View File

@ -16,13 +16,12 @@ import org.elasticsearch.xpack.core.search.action.SubmitAsyncSearchRequest;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.function.IntConsumer;
import java.util.List;
import static java.util.Arrays.asList;
import static java.util.Collections.unmodifiableList;
import static org.elasticsearch.rest.RestRequest.Method.GET;
import static org.elasticsearch.rest.RestRequest.Method.POST;
import static org.elasticsearch.rest.action.search.RestSearchAction.parseSearchRequest;
@ -34,9 +33,7 @@ public final class RestSubmitAsyncSearchAction extends BaseRestHandler {
public List<Route> routes() {
return unmodifiableList(asList(
new Route(POST, "/_async_search"),
new Route(GET, "/_async_search"),
new Route(POST, "/{index}/_async_search"),
new Route(GET, "/{index}/_async_search")));
new Route(POST, "/{index}/_async_search")));
}
@Override

View File

@ -68,7 +68,7 @@ public class TransportSubmitAsyncSearchAction extends HandledTransportAction<Sub
CancellableTask submitTask = (CancellableTask) task;
final SearchRequest searchRequest = createSearchRequest(request, submitTask.getId(), request.getKeepAlive());
AsyncSearchTask searchTask = (AsyncSearchTask) taskManager.register("transport", SearchAction.INSTANCE.name(), searchRequest);
searchAction.execute(searchTask, searchRequest, searchTask.getProgressListener());
searchAction.execute(searchTask, searchRequest, searchTask.getSearchProgressActionListener());
searchTask.addCompletionListener(
new ActionListener<AsyncSearchResponse>() {
@Override

View File

@ -38,7 +38,6 @@ import static org.hamcrest.Matchers.lessThanOrEqualTo;
public class AsyncSearchActionTests extends AsyncSearchIntegTestCase {
private String indexName;
private int numShards;
private int numDocs;
private int numKeywords;
private Map<String, AtomicInteger> keywordFreqs;
@ -140,7 +139,7 @@ public class AsyncSearchActionTests extends AsyncSearchIntegTestCase {
StringTerms terms = response.getSearchResponse().getAggregations().get("terms");
assertThat(terms.getBuckets().size(), greaterThanOrEqualTo(0));
assertThat(terms.getBuckets().size(), lessThanOrEqualTo(numKeywords));
for (InternalTerms.Bucket bucket : terms.getBuckets()) {
for (InternalTerms.Bucket<?> bucket : terms.getBuckets()) {
long count = keywordFreqs.getOrDefault(bucket.getKeyAsString(), new AtomicInteger(0)).get();
assertThat(bucket.getDocCount(), lessThanOrEqualTo(count));
}
@ -155,7 +154,7 @@ public class AsyncSearchActionTests extends AsyncSearchIntegTestCase {
StringTerms terms = response.getSearchResponse().getAggregations().get("terms");
assertThat(terms.getBuckets().size(), greaterThanOrEqualTo(0));
assertThat(terms.getBuckets().size(), lessThanOrEqualTo(numKeywords));
for (InternalTerms.Bucket bucket : terms.getBuckets()) {
for (InternalTerms.Bucket<?> bucket : terms.getBuckets()) {
long count = keywordFreqs.getOrDefault(bucket.getKeyAsString(), new AtomicInteger(0)).get();
if (numFailures > 0) {
assertThat(bucket.getDocCount(), lessThanOrEqualTo(count));
@ -236,14 +235,14 @@ public class AsyncSearchActionTests extends AsyncSearchIntegTestCase {
}
public void testNoIndex() throws Exception {
SubmitAsyncSearchRequest request = new SubmitAsyncSearchRequest(new String[] { "invalid-*" });
SubmitAsyncSearchRequest request = new SubmitAsyncSearchRequest("invalid-*");
request.setWaitForCompletion(TimeValue.timeValueMillis(1));
AsyncSearchResponse response = submitAsyncSearch(request);
assertNotNull(response.getSearchResponse());
assertFalse(response.isRunning());
assertThat(response.getSearchResponse().getTotalShards(), equalTo(0));
request = new SubmitAsyncSearchRequest(new String[] { "invalid" });
request = new SubmitAsyncSearchRequest("invalid");
request.setWaitForCompletion(TimeValue.timeValueMillis(1));
response = submitAsyncSearch(request);
assertNull(response.getSearchResponse());

View File

@ -165,7 +165,7 @@ public abstract class AsyncSearchIntegTestCase extends ESIntegTestCase {
.collect(
Collectors.toMap(
Function.identity(),
id -> new ShardIdLatch(id, new CountDownLatch(1), failures.decrementAndGet() >= 0 ? true : false)
id -> new ShardIdLatch(id, new CountDownLatch(1), failures.decrementAndGet() >= 0)
)
);
ShardIdLatch[] shardLatchArray = shardLatchMap.values().stream()
@ -186,7 +186,6 @@ public abstract class AsyncSearchIntegTestCase extends ESIntegTestCase {
private AsyncSearchResponse response = initial;
private int shardIndex = 0;
private boolean isFirst = true;
private int shardFailures = 0;
@Override
public boolean hasNext() {
@ -213,8 +212,6 @@ public abstract class AsyncSearchIntegTestCase extends ESIntegTestCase {
while (index < step && shardIndex < shardLatchArray.length) {
if (shardLatchArray[shardIndex].shouldFail == false) {
++index;
} else {
++shardFailures;
}
shardLatchArray[shardIndex++].countDown();
}
@ -225,13 +222,13 @@ public abstract class AsyncSearchIntegTestCase extends ESIntegTestCase {
if (newResponse.isRunning()) {
assertThat(newResponse.status(), equalTo(RestStatus.OK));
assertTrue(newResponse.isPartial());
assertFalse(newResponse.getFailure() != null);
assertNull(newResponse.getFailure());
assertNotNull(newResponse.getSearchResponse());
assertThat(newResponse.getSearchResponse().getTotalShards(), equalTo(shardLatchArray.length));
assertThat(newResponse.getSearchResponse().getShardFailures().length, lessThanOrEqualTo(numFailures));
} else if (numFailures == shardLatchArray.length) {
assertThat(newResponse.status(), equalTo(RestStatus.INTERNAL_SERVER_ERROR));
assertTrue(newResponse.getFailure() != null);
assertNotNull(newResponse.getFailure());
assertTrue(newResponse.isPartial());
assertNotNull(newResponse.getSearchResponse());
assertThat(newResponse.getSearchResponse().getTotalShards(), equalTo(shardLatchArray.length));
@ -312,7 +309,7 @@ public abstract class AsyncSearchIntegTestCase extends ESIntegTestCase {
}
@Override
protected void doWriteTo(StreamOutput out) throws IOException {}
protected void doWriteTo(StreamOutput out) {}
@Override
protected void doXContent(XContentBuilder builder, Params params) throws IOException {

View File

@ -60,7 +60,6 @@ public class AsyncSearchTaskTests extends ESTestCase {
skippedShards.add(new SearchShard(null, new ShardId("0", "0", 1)));
}
List<Thread> threads = new ArrayList<>();
int numThreads = randomIntBetween(1, 10);
CountDownLatch latch = new CountDownLatch(numThreads);
for (int i = 0; i < numThreads; i++) {
@ -79,11 +78,10 @@ public class AsyncSearchTaskTests extends ESTestCase {
}
}, TimeValue.timeValueMillis(1)));
threads.add(thread);
thread.start();
}
assertFalse(latch.await(numThreads*2, TimeUnit.MILLISECONDS));
task.getProgressListener().onListShards(shards, skippedShards, SearchResponse.Clusters.EMPTY, false);
task.getSearchProgressActionListener().onListShards(shards, skippedShards, SearchResponse.Clusters.EMPTY, false);
latch.await();
}
@ -91,18 +89,6 @@ public class AsyncSearchTaskTests extends ESTestCase {
AsyncSearchTask task = new AsyncSearchTask(0L, "", "", new TaskId("node1", 0), TimeValue.timeValueHours(1),
Collections.emptyMap(), Collections.emptyMap(), new AsyncSearchId("0", new TaskId("node1", 1)),
new NoOpClient(threadPool), threadPool, null);
int numShards = randomIntBetween(0, 10);
List<SearchShard> shards = new ArrayList<>();
for (int i = 0; i < numShards; i++) {
shards.add(new SearchShard(null, new ShardId("0", "0", 1)));
}
List<SearchShard> skippedShards = new ArrayList<>();
int numSkippedShards = randomIntBetween(0, 10);
for (int i = 0; i < numSkippedShards; i++) {
skippedShards.add(new SearchShard(null, new ShardId("0", "0", 1)));
}
List<Thread> threads = new ArrayList<>();
int numThreads = randomIntBetween(1, 10);
CountDownLatch latch = new CountDownLatch(numThreads);
for (int i = 0; i < numThreads; i++) {
@ -120,11 +106,10 @@ public class AsyncSearchTaskTests extends ESTestCase {
throw new AssertionError(e);
}
}, TimeValue.timeValueMillis(1)));
threads.add(thread);
thread.start();
}
assertFalse(latch.await(numThreads*2, TimeUnit.MILLISECONDS));
task.getProgressListener().onFailure(new Exception("boom"));
task.getSearchProgressActionListener().onFailure(new Exception("boom"));
latch.await();
}
@ -144,22 +129,23 @@ public class AsyncSearchTaskTests extends ESTestCase {
}
int numShardFailures = 0;
task.getProgressListener().onListShards(shards, skippedShards, SearchResponse.Clusters.EMPTY, false);
task.getSearchProgressActionListener().onListShards(shards, skippedShards, SearchResponse.Clusters.EMPTY, false);
for (int i = 0; i < numShards; i++) {
task.getProgressListener().onPartialReduce(shards.subList(i, i+1),
task.getSearchProgressActionListener().onPartialReduce(shards.subList(i, i+1),
new TotalHits(0, TotalHits.Relation.GREATER_THAN_OR_EQUAL_TO), null, 0);
assertCompletionListeners(task, numShards+numSkippedShards, numSkippedShards, numShardFailures, true);
}
task.getProgressListener().onReduce(shards,
task.getSearchProgressActionListener().onFinalReduce(shards,
new TotalHits(0, TotalHits.Relation.GREATER_THAN_OR_EQUAL_TO), null, 0);
assertCompletionListeners(task, numShards+numSkippedShards, numSkippedShards, numShardFailures, true);
task.getProgressListener().onResponse(newSearchResponse(numShards+numSkippedShards, numShards, numSkippedShards));
((AsyncSearchTask.Listener)task.getProgressListener()).onResponse(
newSearchResponse(numShards+numSkippedShards, numShards, numSkippedShards));
assertCompletionListeners(task, numShards+numSkippedShards,
numSkippedShards, numShardFailures, false);
threadPool.shutdownNow();
}
private SearchResponse newSearchResponse(int totalShards, int successfulShards, int skippedShards) {
private static SearchResponse newSearchResponse(int totalShards, int successfulShards, int skippedShards) {
InternalSearchResponse response = new InternalSearchResponse(SearchHits.empty(),
InternalAggregations.EMPTY, null, null, false, null, 1);
return new SearchResponse(response, null, totalShards, successfulShards, skippedShards,
@ -171,7 +157,6 @@ public class AsyncSearchTaskTests extends ESTestCase {
int expectedSkippedShards,
int expectedShardFailures,
boolean isPartial) throws InterruptedException {
List<Thread> threads = new ArrayList<>();
int numThreads = randomIntBetween(1, 10);
CountDownLatch latch = new CountDownLatch(numThreads);
for (int i = 0; i < numThreads; i++) {
@ -190,7 +175,6 @@ public class AsyncSearchTaskTests extends ESTestCase {
throw new AssertionError(e);
}
}, TimeValue.timeValueMillis(1)));
threads.add(thread);
thread.start();
}
latch.await();

View File

@ -9,14 +9,12 @@
{
"path":"/_async_search",
"methods":[
"GET",
"POST"
]
},
{
"path":"/{index}/_async_search",
"methods":[
"GET",
"POST"
],
"parts":{