Introduce ability to minimize round-trips in CCS (#37828)

With #37566 we introduced the ability to merge multiple search responses into one. That makes it possible to expose a new way of executing cross-cluster search requests, which makes CCS much faster whenever there is network latency between the CCS coordinating node and the remote clusters. The coordinating node can now send a single search request to each remote cluster, which performs its own reduction. `from + size` results are requested from each cluster, and the reduce phase in each cluster is non-final (meaning that buckets are not pruned and pipeline aggregations are not executed). The CCS coordinating node performs an additional, final reduction, which produces one search response out of the multiple responses received from the different clusters.
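The final reduction step can be pictured with a small sketch. This is not the actual implementation (that is the Java `SearchResponseMerger` introduced with #37566); it is a toy illustration, in Python for brevity, of how per-cluster top hits, each list already sorted by descending score, are merged into the global `from + size` window on the coordinating node:

```python
import heapq

def merge_cluster_hits(per_cluster_hits, from_=0, size=10):
    """Merge per-cluster hit lists (each sorted by descending score) and
    return the global [from_, from_ + size) window of (score, doc_id) hits."""
    # n-way merge of already-sorted inputs, highest score first
    merged = heapq.merge(*per_cluster_hits, key=lambda hit: hit[0], reverse=True)
    window = []
    for rank, hit in enumerate(merged):
        if rank >= from_ + size:
            break
        if rank >= from_:
            window.append(hit)
    return window

# each cluster has already returned its own top `from + size` hits
cluster_one = [(9.0, "one:doc1"), (5.0, "one:doc2"), (1.0, "one:doc3")]
cluster_two = [(8.0, "two:doc1"), (6.0, "two:doc2")]
print(merge_cluster_hits([cluster_one, cluster_two], from_=0, size=3))
# prints [(9.0, 'one:doc1'), (8.0, 'two:doc1'), (6.0, 'two:doc2')]
```

The real merger also combines aggregations, suggestions, profile results, and shard failures, which this sketch deliberately omits.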

This new execution path is activated by default for any CCS request, unless a scroll is provided or inner hits are requested as part of field collapsing. The search API now accepts a new parameter, ccs_minimize_roundtrips, that allows opting out of the default behaviour.

Relates to #32125
Luca Cavanna 2019-01-31 15:12:14 +01:00 committed by GitHub
parent ae9f4df361
commit 622fb7883b
37 changed files with 1182 additions and 354 deletions

View File

@ -399,6 +399,7 @@ final class RequestConverters {
params.withPreference(searchRequest.preference());
params.withIndicesOptions(searchRequest.indicesOptions());
params.putParam("search_type", searchRequest.searchType().name().toLowerCase(Locale.ROOT));
params.putParam("ccs_minimize_roundtrips", Boolean.toString(searchRequest.isCcsMinimizeRoundtrips()));
if (searchRequest.requestCache() != null) {
params.putParam("request_cache", Boolean.toString(searchRequest.requestCache()));
}

View File

@ -1239,7 +1239,7 @@ public class RequestConvertersTests extends ESTestCase {
requests.add(searchRequest);
};
MultiSearchRequest.readMultiLineFormat(new BytesArray(EntityUtils.toByteArray(request.getEntity())),
REQUEST_BODY_CONTENT_TYPE.xContent(), consumer, null, multiSearchRequest.indicesOptions(), null, null, null,
REQUEST_BODY_CONTENT_TYPE.xContent(), consumer, null, multiSearchRequest.indicesOptions(), null, null, null, null,
xContentRegistry(), true);
assertEquals(requests, multiSearchRequest.requests());
}
@ -1862,6 +1862,10 @@ public class RequestConvertersTests extends ESTestCase {
searchRequest.scroll(randomTimeValue());
expectedParams.put("scroll", searchRequest.scroll().keepAlive().getStringRep());
}
if (randomBoolean()) {
searchRequest.setCcsMinimizeRoundtrips(randomBoolean());
}
expectedParams.put("ccs_minimize_roundtrips", Boolean.toString(searchRequest.isCcsMinimizeRoundtrips()));
}
static void setRandomIndicesOptions(Consumer<IndicesOptions> setter, Supplier<IndicesOptions> getter,

View File

@ -65,6 +65,7 @@ GET /cluster_one:twitter/_search
{
"took": 150,
"timed_out": false,
"num_reduce_phases": 2,
"_shards": {
"total": 1,
"successful": 1,
@ -130,6 +131,7 @@ will be prefixed with their remote cluster name:
{
"took": 150,
"timed_out": false,
"num_reduce_phases": 3,
"_shards": {
"total": 2,
"successful": 2,
@ -222,6 +224,7 @@ GET /cluster_one:twitter,cluster_two:twitter,twitter/_search <1>
{
"took": 150,
"timed_out": false,
"num_reduce_phases": 3,
"_shards": {
"total": 2,
"successful": 2,
@ -273,3 +276,43 @@ GET /cluster_one:twitter,cluster_two:twitter,twitter/_search <1>
// TESTRESPONSE[s/"_score": 1/"_score": "$body.hits.hits.0._score"/]
// TESTRESPONSE[s/"_score": 2/"_score": "$body.hits.hits.1._score"/]
<1> The `clusters` section indicates that one cluster was unavailable and got skipped
[float]
[[ccs-reduction]]
=== CCS reduction phase
Cross-cluster search requests can be executed in two ways:

- the CCS coordinating node minimizes network round-trips by sending one search
request to each cluster. Each cluster performs the search independently,
reducing and fetching results. Once the CCS coordinating node has received all
the responses, it performs an additional reduction and returns the relevant
results back to the user. This strategy is beneficial whenever there is network
latency between the CCS coordinating node and the remote clusters involved,
which is typically the case. A single request is sent to each remote cluster,
at the cost of retrieving `from` + `size` already fetched results from each of
them. This is the default strategy, used whenever possible. If a scroll is
provided, or inner hits are requested as part of field collapsing, this
strategy is not supported, hence network round-trips cannot be minimized and
the following strategy is used instead.
- the CCS coordinating node sends a <<search-shards,search shards>> request to
each remote cluster to collect information about the remote indices involved
in the search request and the shards where their data is located. Once each
cluster has responded to this request, the search executes as if all shards
were part of the same cluster. The coordinating node sends one request to each
shard involved; each shard executes the query and returns its own results,
which are then reduced (and fetched, depending on the
<<search-request-search-type, search type>>) by the CCS coordinating node.
This strategy may be beneficial whenever there is very low network latency
between the CCS coordinating node and the remote clusters involved, as it
treats all shards the same, at the cost of sending many requests to each
remote cluster, which is problematic in the presence of network latency.

The <<search-request-body, search API>> supports the `ccs_minimize_roundtrips`
parameter, which defaults to `true` and can be set to `false` when minimizing
network round-trips is not desirable.
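
For example, round-trip minimization can be disabled for the cross-cluster
search requests shown above as follows (a sketch; the indices and query are
illustrative):

[source,js]
--------------------------------------------------
GET /cluster_one:twitter,cluster_two:twitter/_search?ccs_minimize_roundtrips=false
{
  "query": {
    "match_all": {}
  }
}
--------------------------------------------------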

Note that all communication between the nodes, regardless of which cluster
they belong to and of the selected reduce mode, happens through the
<<modules-transport,transport layer>>.

View File

@ -113,6 +113,11 @@ And here is a sample response:
reduce the memory overhead per search request if the potential number of
shards in the request can be large.
`ccs_minimize_roundtrips`::
Defaults to `true`. Set to `false` to disable minimizing network round-trips
between the coordinating node and the remote clusters when executing
cross-cluster search requests. See <<ccs-reduction>> for more.
Out of the above, the `search_type`, `request_cache` and the `allow_partial_search_results`
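
As a sketch of query-string usage (the index name and query are illustrative),
assuming a remote cluster named `cluster_one` is configured:

[source,js]
--------------------------------------------------
GET /cluster_one:twitter/_search?q=user:kimchy&ccs_minimize_roundtrips=false
--------------------------------------------------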

View File

@ -65,7 +65,6 @@ public class MultiSearchTemplateRequest extends ActionRequest implements Composi
return this;
}
/**
* Returns the number of search requests specified in this multi search request that are allowed to run concurrently.
*/

View File

@ -50,7 +50,7 @@ public class MultiSearchTemplateResponseTests extends AbstractXContentTestCase<M
int successfulShards = randomIntBetween(0, totalShards);
int skippedShards = totalShards - successfulShards;
InternalSearchResponse internalSearchResponse = InternalSearchResponse.empty();
SearchResponse.Clusters clusters = new SearchResponse.Clusters(totalShards, successfulShards, skippedShards);
SearchResponse.Clusters clusters = randomClusters();
SearchTemplateResponse searchTemplateResponse = new SearchTemplateResponse();
SearchResponse searchResponse = new SearchResponse(internalSearchResponse, null, totalShards,
successfulShards, skippedShards, tookInMillis, ShardSearchFailure.EMPTY_ARRAY, clusters);
@ -59,7 +59,13 @@ public class MultiSearchTemplateResponseTests extends AbstractXContentTestCase<M
}
return new MultiSearchTemplateResponse(items, overallTookInMillis);
}
private static SearchResponse.Clusters randomClusters() {
int totalClusters = randomIntBetween(0, 10);
int successfulClusters = randomIntBetween(0, totalClusters);
int skippedClusters = totalClusters - successfulClusters;
return new SearchResponse.Clusters(totalClusters, successfulClusters, skippedClusters);
}
private static MultiSearchTemplateResponse createTestInstanceWithFailures() {
int numItems = randomIntBetween(0, 128);
@ -67,14 +73,13 @@ public class MultiSearchTemplateResponseTests extends AbstractXContentTestCase<M
MultiSearchTemplateResponse.Item[] items = new MultiSearchTemplateResponse.Item[numItems];
for (int i = 0; i < numItems; i++) {
if (randomBoolean()) {
// Creating a minimal response is OK, because SearchResponse self
// is tested elsewhere.
// Creating a minimal response is OK, because SearchResponse is tested elsewhere.
long tookInMillis = randomNonNegativeLong();
int totalShards = randomIntBetween(1, Integer.MAX_VALUE);
int successfulShards = randomIntBetween(0, totalShards);
int skippedShards = totalShards - successfulShards;
InternalSearchResponse internalSearchResponse = InternalSearchResponse.empty();
SearchResponse.Clusters clusters = new SearchResponse.Clusters(totalShards, successfulShards, skippedShards);
SearchResponse.Clusters clusters = randomClusters();
SearchTemplateResponse searchTemplateResponse = new SearchTemplateResponse();
SearchResponse searchResponse = new SearchResponse(internalSearchResponse, null, totalShards,
successfulShards, skippedShards, tookInMillis, ShardSearchFailure.EMPTY_ARRAY, clusters);
@ -133,6 +138,5 @@ public class MultiSearchTemplateResponseTests extends AbstractXContentTestCase<M
AbstractXContentTestCase.testFromXContent(NUMBER_OF_TEST_RUNS, instanceSupplier, supportsUnknownFields, Strings.EMPTY_ARRAY,
getRandomFieldsExcludeFilterWhenResultHasErrors(), this::createParser, this::doParseInstance,
this::assertEqualInstances, assertToXContentEquivalence, ToXContent.EMPTY_PARAMS);
}
}
}

View File

@ -22,6 +22,7 @@ package org.elasticsearch.search;
import org.apache.http.HttpEntity;
import org.apache.http.entity.ContentType;
import org.apache.http.nio.entity.NStringEntity;
import org.apache.lucene.search.TotalHits;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.Version;
import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsAction;
@ -32,9 +33,11 @@ import org.elasticsearch.action.admin.cluster.state.ClusterStateAction;
import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest;
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.search.SearchAction;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchScrollRequest;
import org.elasticsearch.action.search.ShardSearchFailure;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.Response;
@ -49,6 +52,8 @@ import org.elasticsearch.common.Strings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.elasticsearch.search.aggregations.InternalAggregations;
import org.elasticsearch.search.internal.InternalSearchResponse;
import org.elasticsearch.test.rest.ESRestTestCase;
import org.elasticsearch.test.transport.MockTransportService;
import org.elasticsearch.threadpool.TestThreadPool;
@ -107,6 +112,14 @@ public class CrossClusterSearchUnavailableClusterIT extends ESRestTestCase {
channel.sendResponse(new ClusterSearchShardsResponse(new ClusterSearchShardsGroup[0],
knownNodes.toArray(new DiscoveryNode[0]), Collections.emptyMap()));
});
newService.registerRequestHandler(SearchAction.NAME, ThreadPool.Names.SAME, SearchRequest::new,
(request, channel, task) -> {
InternalSearchResponse response = new InternalSearchResponse(new SearchHits(new SearchHit[0],
new TotalHits(0, TotalHits.Relation.EQUAL_TO), Float.NaN), InternalAggregations.EMPTY, null, null, false, null, 1);
SearchResponse searchResponse = new SearchResponse(response, null, 1, 1, 0, 100, ShardSearchFailure.EMPTY_ARRAY,
SearchResponse.Clusters.EMPTY);
channel.sendResponse(searchResponse);
});
newService.registerRequestHandler(ClusterStateAction.NAME, ThreadPool.Names.SAME, ClusterStateRequest::new,
(request, channel, task) -> {
DiscoveryNodes.Builder builder = DiscoveryNodes.builder();

View File

@ -36,6 +36,10 @@
terms:
field: f1.keyword
- match: {_clusters.total: 2}
- match: {_clusters.successful: 2}
- match: {_clusters.skipped: 0}
- match: {num}
- match: { _shards.total: 5 }
- match: { hits.total: 11 }
- gte: { hits.hits.0._seq_no: 0 }
@ -59,6 +63,9 @@
terms:
field: f1.keyword
- match: {_clusters.total: 2}
- match: {_clusters.successful: 2}
- match: {_clusters.skipped: 0}
- match: { _shards.total: 5 }
- match: { hits.total: 6}
- match: { hits.hits.0._index: "my_remote_cluster:test_index"}
@ -76,6 +83,9 @@
terms:
field: f1.keyword
- match: {_clusters.total: 1}
- match: {_clusters.successful: 1}
- match: {_clusters.skipped: 0}
- match: { _shards.total: 3 }
- match: { hits.total: 6}
- match: { hits.hits.0._index: "my_remote_cluster:test_index"}
@ -93,6 +103,7 @@
terms:
field: f1.keyword
- is_false: _clusters
- match: { _shards.total: 2 }
- match: { hits.total: 5}
- match: { hits.hits.0._index: "test_index"}
@ -122,6 +133,9 @@
rest_total_hits_as_int: true
index: test_remote_cluster:test_index
- match: {_clusters.total: 1}
- match: {_clusters.successful: 1}
- match: {_clusters.skipped: 0}
- match: { _shards.total: 3 }
- match: { hits.total: 6 }
- match: { hits.hits.0._index: "test_remote_cluster:test_index" }
@ -148,6 +162,9 @@
rest_total_hits_as_int: true
index: "*:test_index"
- match: {_clusters.total: 2}
- match: {_clusters.successful: 2}
- match: {_clusters.skipped: 0}
- match: { _shards.total: 6 }
- match: { hits.total: 12 }
@ -159,6 +176,9 @@
rest_total_hits_as_int: true
index: my_remote_cluster:aliased_test_index
- match: {_clusters.total: 1}
- match: {_clusters.successful: 1}
- match: {_clusters.skipped: 0}
- match: { _shards.total: 3 }
- match: { hits.total: 2 }
- match: { hits.hits.0._source.filter_field: 1 }
@ -172,6 +192,9 @@
rest_total_hits_as_int: true
index: my_remote_cluster:aliased_test_index,my_remote_cluster:field_caps_index_1
- match: {_clusters.total: 1}
- match: {_clusters.successful: 1}
- match: {_clusters.skipped: 0}
- match: { _shards.total: 4 }
- match: { hits.total: 2 }
- match: { hits.hits.0._source.filter_field: 1 }
@ -185,6 +208,9 @@
rest_total_hits_as_int: true
index: "my_remote_cluster:single_doc_index"
- match: {_clusters.total: 1}
- match: {_clusters.successful: 1}
- match: {_clusters.skipped: 0}
- match: { _shards.total: 1 }
- match: { hits.total: 1 }
- match: { hits.hits.0._index: "my_remote_cluster:single_doc_index"}

View File

@ -29,10 +29,12 @@
rest_total_hits_as_int: true
index: "skip_shards_index,my_remote_cluster:single_doc_index"
pre_filter_shard_size: 1
ccs_minimize_roundtrips: false
body: { "size" : 10, "query" : { "range" : { "created_at" : { "gte" : "2016-02-01", "lt": "2018-02-01"} } } }
- match: { hits.total: 1 }
- match: { hits.hits.0._index: "skip_shards_index"}
- is_false: num_reduce_phases
- match: { _shards.total: 2 }
- match: { _shards.successful: 2 }
- match: { _shards.skipped : 1}
@ -45,10 +47,12 @@
rest_total_hits_as_int: true
index: "skip_shards_index,my_remote_cluster:single_doc_index"
pre_filter_shard_size: 1
ccs_minimize_roundtrips: false
body: { "size" : 10, "query" : { "range" : { "created_at" : { "gte" : "2015-02-01", "lt": "2016-02-01"} } } }
- match: { hits.total: 1 }
- match: { hits.hits.0._index: "my_remote_cluster:single_doc_index"}
- is_false: num_reduce_phases
- match: { _shards.total: 2 }
- match: { _shards.successful: 2 }
- match: { _shards.skipped : 1}

View File

@ -43,6 +43,11 @@
"type" : "boolean",
"description" : "Indicates whether hits.total should be rendered as an integer or an object in the rest search response",
"default" : false
},
"ccs_minimize_roundtrips": {
"type" : "boolean",
"description" : "Indicates whether network round-trips should be minimized as part of cross-cluster search requests execution",
"default" : "true"
}
}
},

View File

@ -33,6 +33,11 @@
"type" : "boolean",
"description" : "Indicates whether hits.total should be rendered as an integer or an object in the rest search response",
"default" : false
},
"ccs_minimize_roundtrips": {
"type" : "boolean",
"description" : "Indicates whether network round-trips should be minimized as part of cross-cluster search requests execution",
"default" : "true"
}
}
},

View File

@ -24,6 +24,11 @@
"type" : "boolean",
"description" : "Specify whether wildcard and prefix queries should be analyzed (default: false)"
},
"ccs_minimize_roundtrips": {
"type" : "boolean",
"description" : "Indicates whether network round-trips should be minimized as part of cross-cluster search requests execution",
"default" : "true"
},
"default_operator": {
"type" : "enum",
"options" : ["AND","OR"],

View File

@ -67,6 +67,11 @@
"type" : "boolean",
"description" : "Indicates whether hits.total should be rendered as an integer or an object in the rest search response",
"default" : false
},
"ccs_minimize_roundtrips": {
"type" : "boolean",
"description" : "Indicates whether network round-trips should be minimized as part of cross-cluster search requests execution",
"default" : "true"
}
}
},

View File

@ -173,6 +173,7 @@ public class MultiSearchRequest extends ActionRequest implements CompositeIndice
String[] types,
String routing,
String searchType,
Boolean ccsMinimizeRoundtrips,
NamedXContentRegistry registry,
boolean allowExplicitIndex) throws IOException {
int from = 0;
@ -205,6 +206,9 @@ public class MultiSearchRequest extends ActionRequest implements CompositeIndice
if (searchType != null) {
searchRequest.searchType(searchType);
}
if (ccsMinimizeRoundtrips != null) {
searchRequest.setCcsMinimizeRoundtrips(ccsMinimizeRoundtrips);
}
IndicesOptions defaultOptions = searchRequest.indicesOptions();
// now parse the action
if (nextMarker - from > 0) {
@ -226,6 +230,8 @@ public class MultiSearchRequest extends ActionRequest implements CompositeIndice
searchRequest.types(nodeStringArrayValue(value));
} else if ("search_type".equals(entry.getKey()) || "searchType".equals(entry.getKey())) {
searchRequest.searchType(nodeStringValue(value, null));
} else if ("ccs_minimize_roundtrips".equals(entry.getKey()) || "ccsMinimizeRoundtrips".equals(entry.getKey())) {
searchRequest.setCcsMinimizeRoundtrips(nodeBooleanValue(value));
} else if ("request_cache".equals(entry.getKey()) || "requestCache".equals(entry.getKey())) {
searchRequest.requestCache(nodeBooleanValue(value, entry.getKey()));
} else if ("preference".equals(entry.getKey())) {
@ -327,6 +333,7 @@ public class MultiSearchRequest extends ActionRequest implements CompositeIndice
if (request.searchType() != null) {
xContentBuilder.field("search_type", request.searchType().name().toLowerCase(Locale.ROOT));
}
xContentBuilder.field("ccs_minimize_roundtrips", request.isCcsMinimizeRoundtrips());
if (request.requestCache() != null) {
xContentBuilder.field("request_cache", request.requestCache());
}
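
Since the multi search line format above now parses `ccs_minimize_roundtrips`
from the header line, the flag can be set per search in an `_msearch` body. A
sketch (the index name is illustrative):

```
GET /_msearch
{"index": "cluster_one:twitter", "ccs_minimize_roundtrips": false}
{"query": {"match_all": {}}}
```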

View File

@ -93,6 +93,8 @@ public final class SearchRequest extends ActionRequest implements IndicesRequest
private String[] types = Strings.EMPTY_ARRAY;
private boolean ccsMinimizeRoundtrips = true;
public static final IndicesOptions DEFAULT_INDICES_OPTIONS = IndicesOptions.strictExpandOpenAndForbidClosedIgnoreThrottled();
private IndicesOptions indicesOptions = DEFAULT_INDICES_OPTIONS;
@ -106,21 +108,7 @@ public final class SearchRequest extends ActionRequest implements IndicesRequest
* Constructs a new search request from the provided search request
*/
public SearchRequest(SearchRequest searchRequest) {
this.allowPartialSearchResults = searchRequest.allowPartialSearchResults;
this.batchedReduceSize = searchRequest.batchedReduceSize;
this.indices = searchRequest.indices;
this.indicesOptions = searchRequest.indicesOptions;
this.maxConcurrentShardRequests = searchRequest.maxConcurrentShardRequests;
this.preference = searchRequest.preference;
this.preFilterShardSize = searchRequest.preFilterShardSize;
this.requestCache = searchRequest.requestCache;
this.routing = searchRequest.routing;
this.scroll = searchRequest.scroll;
this.searchType = searchRequest.searchType;
this.source = searchRequest.source;
this.types = searchRequest.types;
this.localClusterAlias = searchRequest.localClusterAlias;
this.absoluteStartMillis = searchRequest.absoluteStartMillis;
this(searchRequest, searchRequest.indices, searchRequest.localClusterAlias, searchRequest.absoluteStartMillis);
}
/**
@ -144,16 +132,40 @@ public final class SearchRequest extends ActionRequest implements IndicesRequest
}
/**
* Creates a new search request by providing the alias of the cluster where it will be executed, as well as the current time in
* milliseconds from the epoch time. Used when a {@link SearchRequest} is created and executed as part of a cross-cluster search
* request performing local reduction on each cluster. The coordinating CCS node provides the alias to prefix index names with in
* the returned search results, and the current time to be used on the remote clusters to ensure that the same value is used.
* Creates a new search request by providing the search request to copy all fields from, the indices to search against,
* the alias of the cluster where it will be executed, as well as the start time in milliseconds from the epoch time.
* Used when a {@link SearchRequest} is created and executed as part of a cross-cluster search request performing local reduction
* on each cluster. The coordinating CCS node provides the original search request, the indices to search against as well as the
* alias to prefix index names with in the returned search results, and the absolute start time to be used on the remote clusters
* to ensure that the same value is used.
*/
SearchRequest(String localClusterAlias, long absoluteStartMillis) {
this.localClusterAlias = Objects.requireNonNull(localClusterAlias, "cluster alias must not be null");
static SearchRequest withLocalReduction(SearchRequest originalSearchRequest, String[] indices,
String localClusterAlias, long absoluteStartMillis) {
Objects.requireNonNull(originalSearchRequest, "search request must not be null");
validateIndices(indices);
Objects.requireNonNull(localClusterAlias, "cluster alias must not be null");
if (absoluteStartMillis < 0) {
throw new IllegalArgumentException("absoluteStartMillis must not be negative but was [" + absoluteStartMillis + "]");
}
return new SearchRequest(originalSearchRequest, indices, localClusterAlias, absoluteStartMillis);
}
private SearchRequest(SearchRequest searchRequest, String[] indices, String localClusterAlias, long absoluteStartMillis) {
this.allowPartialSearchResults = searchRequest.allowPartialSearchResults;
this.batchedReduceSize = searchRequest.batchedReduceSize;
this.ccsMinimizeRoundtrips = searchRequest.ccsMinimizeRoundtrips;
this.indices = indices;
this.indicesOptions = searchRequest.indicesOptions;
this.maxConcurrentShardRequests = searchRequest.maxConcurrentShardRequests;
this.preference = searchRequest.preference;
this.preFilterShardSize = searchRequest.preFilterShardSize;
this.requestCache = searchRequest.requestCache;
this.routing = searchRequest.routing;
this.scroll = searchRequest.scroll;
this.searchType = searchRequest.searchType;
this.source = searchRequest.source;
this.types = searchRequest.types;
this.localClusterAlias = localClusterAlias;
this.absoluteStartMillis = absoluteStartMillis;
}
@ -191,6 +203,9 @@ public final class SearchRequest extends ActionRequest implements IndicesRequest
localClusterAlias = null;
absoluteStartMillis = DEFAULT_ABSOLUTE_START_MILLIS;
}
if (in.getVersion().onOrAfter(Version.V_7_0_0)) {
ccsMinimizeRoundtrips = in.readBoolean();
}
}
@Override
@ -217,33 +232,37 @@ public final class SearchRequest extends ActionRequest implements IndicesRequest
out.writeVLong(absoluteStartMillis);
}
}
if (out.getVersion().onOrAfter(Version.V_7_0_0)) {
out.writeBoolean(ccsMinimizeRoundtrips);
}
}
@Override
public ActionRequestValidationException validate() {
ActionRequestValidationException validationException = null;
final Scroll scroll = scroll();
if (source != null
&& source.trackTotalHitsUpTo() != null
&& source.trackTotalHitsUpTo() != SearchContext.TRACK_TOTAL_HITS_ACCURATE
&& scroll != null) {
validationException =
addValidationError("disabling [track_total_hits] is not allowed in a scroll context", validationException);
}
if (source != null && source.from() > 0 && scroll != null) {
validationException =
addValidationError("using [from] is not allowed in a scroll context", validationException);
}
if (requestCache != null && requestCache && scroll != null) {
validationException =
addValidationError("[request_cache] cannot be used in a scroll context", validationException);
}
if (source != null && source.size() == 0 && scroll != null) {
validationException = addValidationError("[size] cannot be [0] in a scroll context", validationException);
}
if (source != null && source.rescores() != null && source.rescores().isEmpty() == false && scroll != null) {
validationException =
addValidationError("using [rescore] is not allowed in a scroll context", validationException);
boolean scroll = scroll() != null;
if (scroll) {
if (source != null) {
if (source.trackTotalHitsUpTo() != null && source.trackTotalHitsUpTo() != SearchContext.TRACK_TOTAL_HITS_ACCURATE) {
validationException =
addValidationError("disabling [track_total_hits] is not allowed in a scroll context", validationException);
}
if (source.from() > 0) {
validationException =
addValidationError("using [from] is not allowed in a scroll context", validationException);
}
if (source.size() == 0) {
validationException = addValidationError("[size] cannot be [0] in a scroll context", validationException);
}
if (source.rescores() != null && source.rescores().isEmpty() == false) {
validationException =
addValidationError("using [rescore] is not allowed in a scroll context", validationException);
}
}
if (requestCache != null && requestCache) {
validationException =
addValidationError("[request_cache] cannot be used in a scroll context", validationException);
}
}
return validationException;
}
@ -261,8 +280,8 @@ public final class SearchRequest extends ActionRequest implements IndicesRequest
/**
* Returns the current time in milliseconds from the time epoch, to be used for the execution of this search request. Used to
* ensure that the same value, determined by the coordinating node, is used on all nodes involved in the execution of the search
* request. When created through {@link #SearchRequest(String, long)}, this method returns the provided current time, otherwise
* it will return {@link System#currentTimeMillis()}.
* request. When created through {@link #withLocalReduction(SearchRequest, String[], String, long)}, this method returns the provided
* current time, otherwise it will return {@link System#currentTimeMillis()}.
*
*/
long getOrCreateAbsoluteStartMillis() {
@ -274,12 +293,16 @@ public final class SearchRequest extends ActionRequest implements IndicesRequest
*/
@Override
public SearchRequest indices(String... indices) {
validateIndices(indices);
this.indices = indices;
return this;
}
private static void validateIndices(String... indices) {
Objects.requireNonNull(indices, "indices must not be null");
for (String index : indices) {
Objects.requireNonNull(index, "index must not be null");
}
this.indices = indices;
return this;
}
@Override
@ -292,6 +315,21 @@ public final class SearchRequest extends ActionRequest implements IndicesRequest
return this;
}
/**
* Returns whether network round-trips should be minimized when executing cross-cluster search requests.
* Defaults to <code>true</code>.
*/
public boolean isCcsMinimizeRoundtrips() {
return ccsMinimizeRoundtrips;
}
/**
* Sets whether network round-trips should be minimized when executing cross-cluster search requests. Defaults to <code>true</code>.
*/
public void setCcsMinimizeRoundtrips(boolean ccsMinimizeRoundtrips) {
this.ccsMinimizeRoundtrips = ccsMinimizeRoundtrips;
}
/**
* The document types to execute the search against. Defaults to be executed against
* all types.
@ -583,14 +621,15 @@ public final class SearchRequest extends ActionRequest implements IndicesRequest
Objects.equals(indicesOptions, that.indicesOptions) &&
Objects.equals(allowPartialSearchResults, that.allowPartialSearchResults) &&
Objects.equals(localClusterAlias, that.localClusterAlias) &&
absoluteStartMillis == that.absoluteStartMillis;
absoluteStartMillis == that.absoluteStartMillis &&
ccsMinimizeRoundtrips == that.ccsMinimizeRoundtrips;
}
@Override
public int hashCode() {
return Objects.hash(searchType, Arrays.hashCode(indices), routing, preference, source, requestCache,
scroll, Arrays.hashCode(types), indicesOptions, batchedReduceSize, maxConcurrentShardRequests, preFilterShardSize,
allowPartialSearchResults, localClusterAlias, absoluteStartMillis);
allowPartialSearchResults, localClusterAlias, absoluteStartMillis, ccsMinimizeRoundtrips);
}
@Override
@ -610,6 +649,7 @@ public final class SearchRequest extends ActionRequest implements IndicesRequest
", allowPartialSearchResults=" + allowPartialSearchResults +
", localClusterAlias=" + localClusterAlias +
", getOrCreateAbsoluteStartMillis=" + absoluteStartMillis +
", ccsMinimizeRoundtrips=" + ccsMinimizeRoundtrips +
", source=" + source + '}';
}
}

View File

@ -111,7 +111,6 @@ public class SearchResponse extends ActionResponse implements StatusToXContentOb
return internalResponse.aggregations();
}
public Suggest getSuggest() {
return internalResponse.suggest();
}
@ -349,7 +348,7 @@ public class SearchResponse extends ActionResponse implements StatusToXContentOb
SearchResponseSections searchResponseSections = new SearchResponseSections(hits, aggs, suggest, timedOut, terminatedEarly,
profile, numReducePhases);
return new SearchResponse(searchResponseSections, scrollId, totalShards, successfulShards, skippedShards, tookInMillis,
failures.toArray(new ShardSearchFailure[failures.size()]), clusters);
failures.toArray(ShardSearchFailure.EMPTY_ARRAY), clusters);
}
@Override

View File

@ -39,6 +39,7 @@ import org.elasticsearch.search.internal.InternalSearchResponse;
import org.elasticsearch.search.profile.ProfileShardResult;
import org.elasticsearch.search.profile.SearchProfileShardResults;
import org.elasticsearch.search.suggest.Suggest;
import org.elasticsearch.transport.RemoteClusterAware;
import java.util.ArrayList;
import java.util.Arrays;
@ -76,9 +77,9 @@ import static org.elasticsearch.action.search.SearchResponse.Clusters;
//from the remote clusters in the fetch phase. This would be identical to the removed QueryAndFetch strategy except that only the remote
//cluster response would have the fetch results.
final class SearchResponseMerger {
private final int from;
private final int size;
private final int trackTotalHitsUpTo;
final int from;
final int size;
final int trackTotalHitsUpTo;
private final SearchTimeProvider searchTimeProvider;
private final Function<Boolean, ReduceContext> reduceContextFunction;
private final List<SearchResponse> searchResponses = new CopyOnWriteArrayList<>();
@ -98,15 +99,28 @@ final class SearchResponseMerger {
* That may change in the future as it's possible to introduce incremental merges as responses come in if necessary.
*/
void add(SearchResponse searchResponse) {
assert searchResponse.getScrollId() == null : "merging scroll results is not supported";
searchResponses.add(searchResponse);
}
int numResponses() {
return searchResponses.size();
}
/**
* Returns the merged response. To be called once all responses have been added through {@link #add(SearchResponse)}
* so that all responses are merged into a single one.
*/
SearchResponse getMergedResponse(Clusters clusters) {
assert searchResponses.size() > 1;
//if the search is only across remote clusters, none of them are available, and all of them have skip_unavailable set to true,
//we end up calling merge without anything to merge, we just return an empty search response
if (searchResponses.size() == 0) {
SearchHits searchHits = new SearchHits(new SearchHit[0], new TotalHits(0L, TotalHits.Relation.EQUAL_TO), Float.NaN);
InternalSearchResponse internalSearchResponse = new InternalSearchResponse(searchHits,
InternalAggregations.EMPTY, null, null, false, null, 0);
return new SearchResponse(internalSearchResponse, null, 0, 0, 0, searchTimeProvider.buildTookInMillis(),
ShardSearchFailure.EMPTY_ARRAY, clusters);
}
int totalShards = 0;
int skippedShards = 0;
int successfulShards = 0;
@ -115,7 +129,7 @@ final class SearchResponseMerger {
List<ShardSearchFailure> failures = new ArrayList<>();
Map<String, ProfileShardResult> profileResults = new HashMap<>();
List<InternalAggregations> aggs = new ArrayList<>();
Map<ShardId, Integer> shards = new TreeMap<>();
Map<ShardIdAndClusterAlias, Integer> shards = new TreeMap<>();
List<TopDocs> topDocsList = new ArrayList<>(searchResponses.size());
Map<String, List<Suggest.Suggestion>> groupedSuggestions = new HashMap<>();
Boolean trackTotalHits = null;
@ -171,10 +185,11 @@ final class SearchResponseMerger {
Suggest suggest = groupedSuggestions.isEmpty() ? null : new Suggest(Suggest.reduce(groupedSuggestions));
InternalAggregations reducedAggs = InternalAggregations.reduce(aggs, reduceContextFunction.apply(true));
ShardSearchFailure[] shardFailures = failures.toArray(ShardSearchFailure.EMPTY_ARRAY);
SearchProfileShardResults profileShardResults = profileResults.isEmpty() ? null : new SearchProfileShardResults(profileResults);
//make failures ordering consistent with ordinary search and CCS
Arrays.sort(shardFailures, FAILURES_COMPARATOR);
InternalSearchResponse response = new InternalSearchResponse(mergedSearchHits, reducedAggs, suggest,
new SearchProfileShardResults(profileResults), topDocsStats.timedOut, topDocsStats.terminatedEarly, numReducePhases);
InternalSearchResponse response = new InternalSearchResponse(mergedSearchHits, reducedAggs, suggest, profileShardResults,
topDocsStats.timedOut, topDocsStats.terminatedEarly, numReducePhases);
long tookInMillis = searchTimeProvider.buildTookInMillis();
return new SearchResponse(response, null, totalShards, successfulShards, skippedShards, tookInMillis, shardFailures, clusters);
}
@ -210,7 +225,7 @@ final class SearchResponseMerger {
}
};
private static TopDocs searchHitsToTopDocs(SearchHits searchHits, TotalHits totalHits, Map<ShardId, Integer> shards) {
private static TopDocs searchHitsToTopDocs(SearchHits searchHits, TotalHits totalHits, Map<ShardIdAndClusterAlias, Integer> shards) {
SearchHit[] hits = searchHits.getHits();
ScoreDoc[] scoreDocs = new ScoreDoc[hits.length];
final TopDocs topDocs;
@ -228,7 +243,8 @@ final class SearchResponseMerger {
for (int i = 0; i < hits.length; i++) {
SearchHit hit = hits[i];
ShardId shardId = hit.getShard().getShardId();
SearchShardTarget shard = hit.getShard();
ShardIdAndClusterAlias shardId = new ShardIdAndClusterAlias(shard.getShardId(), shard.getClusterAlias());
shards.putIfAbsent(shardId, null);
final SortField[] sortFields = searchHits.getSortFields();
final Object[] sortValues;
@ -246,18 +262,21 @@ final class SearchResponseMerger {
return topDocs;
}
private static void setShardIndex(Map<ShardId, Integer> shards, List<TopDocs> topDocsList) {
int shardIndex = 0;
for (Map.Entry<ShardId, Integer> shard : shards.entrySet()) {
shard.setValue(shardIndex++);
private static void setShardIndex(Map<ShardIdAndClusterAlias, Integer> shards, List<TopDocs> topDocsList) {
{
//assign a different shardIndex to each shard, based on their shardId natural ordering and their cluster alias
int shardIndex = 0;
for (Map.Entry<ShardIdAndClusterAlias, Integer> shard : shards.entrySet()) {
shard.setValue(shardIndex++);
}
}
//and go through all the scoreDocs from each cluster and set their corresponding shardIndex
//go through all the scoreDocs from each cluster and set their corresponding shardIndex
for (TopDocs topDocs : topDocsList) {
for (ScoreDoc scoreDoc : topDocs.scoreDocs) {
FieldDocAndSearchHit fieldDocAndSearchHit = (FieldDocAndSearchHit) scoreDoc;
                //When hits come from indices with the same name and the same shard identifier on multiple clusters, we rely on such
                //indices having a different uuid across clusters. That's how they will get a different shardIndex.
ShardId shardId = fieldDocAndSearchHit.searchHit.getShard().getShardId();
SearchShardTarget shard = fieldDocAndSearchHit.searchHit.getShard();
ShardIdAndClusterAlias shardId = new ShardIdAndClusterAlias(shard.getShardId(), shard.getClusterAlias());
assert shards.containsKey(shardId);
fieldDocAndSearchHit.shardIndex = shards.get(shardId);
}
}
@ -294,4 +313,58 @@ final class SearchResponseMerger {
this.searchHit = searchHit;
}
}
    /**
     * This class is used instead of plain {@link ShardId} to support the scenario where the same remote cluster is registered twice using
     * different aliases. In that case searching across the same cluster twice would make an assertion in lucene fail
     * (see TopDocs#tieBreakLessThan line 86). Generally, indices with the same name on different clusters have different index uuids,
     * which makes their ShardIds different. That is not the case when the index is really the same one from the same cluster, hence we
     * also need to look at the cluster alias and make sure to assign a different shardIndex based on that.
     */
private static final class ShardIdAndClusterAlias implements Comparable<ShardIdAndClusterAlias> {
private final ShardId shardId;
private final String clusterAlias;
ShardIdAndClusterAlias(ShardId shardId, String clusterAlias) {
this.shardId = shardId;
this.clusterAlias = clusterAlias;
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
ShardIdAndClusterAlias that = (ShardIdAndClusterAlias) o;
return shardId.equals(that.shardId) &&
clusterAlias.equals(that.clusterAlias);
}
@Override
public int hashCode() {
return Objects.hash(shardId, clusterAlias);
}
@Override
public int compareTo(ShardIdAndClusterAlias o) {
int shardIdCompareTo = shardId.compareTo(o.shardId);
if (shardIdCompareTo != 0) {
return shardIdCompareTo;
}
int clusterAliasCompareTo = clusterAlias.compareTo(o.clusterAlias);
if (clusterAliasCompareTo != 0) {
//TODO we may want to fix this, CCS returns remote results before local ones (TransportSearchAction#mergeShardsIterators)
if (clusterAlias.equals(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY)) {
return 1;
}
if (o.clusterAlias.equals(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY)) {
return -1;
}
}
return clusterAliasCompareTo;
}
}
}
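The local-cluster tie-breaking in `compareTo` above can be illustrated with a standalone sketch (the names below are illustrative stand-ins, not the Elasticsearch classes): keys compare by shard id first, and on equal shard ids the local cluster, identified here by an empty alias, deliberately sorts after the remote ones.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class ShardSortSketch {
    // Illustrative stand-in for ShardIdAndClusterAlias: a shard number plus a cluster alias,
    // where "" denotes the local cluster, which sorts after any remote alias.
    static final class Key implements Comparable<Key> {
        final int shard;
        final String alias;

        Key(int shard, String alias) {
            this.shard = shard;
            this.alias = alias;
        }

        @Override
        public int compareTo(Key o) {
            int byShard = Integer.compare(shard, o.shard);
            if (byShard != 0) {
                return byShard;
            }
            int byAlias = alias.compareTo(o.alias);
            if (byAlias != 0) {
                // the local cluster ("") sorts last, mirroring how CCS returns remote results before local ones
                if (alias.isEmpty()) {
                    return 1;
                }
                if (o.alias.isEmpty()) {
                    return -1;
                }
            }
            return byAlias;
        }

        @Override
        public String toString() {
            return shard + "/" + (alias.isEmpty() ? "local" : alias);
        }
    }

    static List<String> sortedOrder(List<Key> keys) {
        List<Key> copy = new ArrayList<>(keys);
        Collections.sort(copy);
        List<String> out = new ArrayList<>();
        for (Key k : copy) {
            out.add(k.toString());
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(sortedOrder(List.of(new Key(0, ""), new Key(0, "cluster_a"), new Key(1, "cluster_a"))));
    }
}
```

Without the alias in the key, two hits from the same shard of the same index registered under two aliases would compare equal, which is exactly the situation the lucene assertion guards against.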

View File

@ -47,8 +47,10 @@ import org.elasticsearch.index.query.Rewriteable;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.search.SearchPhaseResult;
import org.elasticsearch.search.SearchService;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.internal.AliasFilter;
import org.elasticsearch.search.internal.SearchContext;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.RemoteClusterAware;
@ -69,6 +71,7 @@ import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.function.LongSupplier;
@ -190,8 +193,8 @@ public class TransportSearchAction extends HandledTransportAction<SearchRequest,
new SearchTimeProvider(searchRequest.getOrCreateAbsoluteStartMillis(), relativeStartNanos, System::nanoTime);
ActionListener<SearchSourceBuilder> rewriteListener = ActionListener.wrap(source -> {
if (source != searchRequest.source()) {
// only set it if it changed - we don't allow null values to be set but it might be already null be we want to catch
// situations when it possible due to a bug changes to null
// only set it if it changed - we don't allow null values to be set but it might be already null. this way we catch
// situations when source is rewritten to null due to a bug
searchRequest.source(source);
}
final ClusterState clusterState = clusterService.state();
@ -199,26 +202,31 @@ public class TransportSearchAction extends HandledTransportAction<SearchRequest,
searchRequest.indices(), idx -> indexNameExpressionResolver.hasIndexOrAlias(idx, clusterState));
OriginalIndices localIndices = remoteClusterIndices.remove(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY);
if (remoteClusterIndices.isEmpty()) {
executeSearch((SearchTask)task, timeProvider, searchRequest, localIndices, Collections.emptyList(),
(clusterName, nodeId) -> null, clusterState, Collections.emptyMap(), listener, SearchResponse.Clusters.EMPTY);
executeLocalSearch(task, timeProvider, searchRequest, localIndices, clusterState, listener);
} else {
AtomicInteger skippedClusters = new AtomicInteger(0);
collectSearchShards(searchRequest.indicesOptions(), searchRequest.preference(), searchRequest.routing(), skippedClusters,
remoteClusterIndices, remoteClusterService, threadPool,
ActionListener.wrap(
searchShardsResponses -> {
List<SearchShardIterator> remoteShardIterators = new ArrayList<>();
Map<String, AliasFilter> remoteAliasFilters = new HashMap<>();
BiFunction<String, String, DiscoveryNode> clusterNodeLookup = processRemoteShards(
searchShardsResponses, remoteClusterIndices, remoteShardIterators, remoteAliasFilters);
int localClusters = localIndices == null ? 0 : 1;
int totalClusters = remoteClusterIndices.size() + localClusters;
int successfulClusters = searchShardsResponses.size() + localClusters;
executeSearch((SearchTask) task, timeProvider, searchRequest, localIndices,
remoteShardIterators, clusterNodeLookup, clusterState, remoteAliasFilters, listener,
new SearchResponse.Clusters(totalClusters, successfulClusters, skippedClusters.get()));
},
listener::onFailure));
if (shouldMinimizeRoundtrips(searchRequest)) {
ccsRemoteReduce(searchRequest, localIndices, remoteClusterIndices, timeProvider, searchService::createReduceContext,
remoteClusterService, threadPool, listener,
(r, l) -> executeLocalSearch(task, timeProvider, r, localIndices, clusterState, l));
} else {
AtomicInteger skippedClusters = new AtomicInteger(0);
collectSearchShards(searchRequest.indicesOptions(), searchRequest.preference(), searchRequest.routing(),
skippedClusters, remoteClusterIndices, remoteClusterService, threadPool,
ActionListener.wrap(
searchShardsResponses -> {
List<SearchShardIterator> remoteShardIterators = new ArrayList<>();
Map<String, AliasFilter> remoteAliasFilters = new HashMap<>();
BiFunction<String, String, DiscoveryNode> clusterNodeLookup = processRemoteShards(
searchShardsResponses, remoteClusterIndices, remoteShardIterators, remoteAliasFilters);
int localClusters = localIndices == null ? 0 : 1;
int totalClusters = remoteClusterIndices.size() + localClusters;
int successfulClusters = searchShardsResponses.size() + localClusters;
executeSearch((SearchTask) task, timeProvider, searchRequest, localIndices,
remoteShardIterators, clusterNodeLookup, clusterState, remoteAliasFilters, listener,
new SearchResponse.Clusters(totalClusters, successfulClusters, skippedClusters.get()));
},
listener::onFailure));
}
}
}, listener::onFailure);
if (searchRequest.source() == null) {
@ -229,12 +237,79 @@ public class TransportSearchAction extends HandledTransportAction<SearchRequest,
}
}
static boolean shouldMinimizeRoundtrips(SearchRequest searchRequest) {
if (searchRequest.isCcsMinimizeRoundtrips() == false) {
return false;
}
if (searchRequest.scroll() != null) {
return false;
}
SearchSourceBuilder source = searchRequest.source();
return source == null || source.collapse() == null || source.collapse().getInnerHits() == null ||
source.collapse().getInnerHits().isEmpty();
}
static void ccsRemoteReduce(SearchRequest searchRequest, OriginalIndices localIndices, Map<String, OriginalIndices> remoteIndices,
SearchTimeProvider timeProvider, Function<Boolean, InternalAggregation.ReduceContext> reduceContext,
RemoteClusterService remoteClusterService, ThreadPool threadPool, ActionListener<SearchResponse> listener,
BiConsumer<SearchRequest, ActionListener<SearchResponse>> localSearchConsumer) {
SearchResponseMerger searchResponseMerger = createSearchResponseMerger(searchRequest.source(), timeProvider, reduceContext);
AtomicInteger skippedClusters = new AtomicInteger(0);
final AtomicReference<Exception> exceptions = new AtomicReference<>();
int totalClusters = remoteIndices.size() + (localIndices == null ? 0 : 1);
final CountDown countDown = new CountDown(totalClusters);
for (Map.Entry<String, OriginalIndices> entry : remoteIndices.entrySet()) {
String clusterAlias = entry.getKey();
boolean skipUnavailable = remoteClusterService.isSkipUnavailable(clusterAlias);
OriginalIndices indices = entry.getValue();
SearchRequest ccsSearchRequest = SearchRequest.withLocalReduction(searchRequest, indices.indices(),
clusterAlias, timeProvider.getAbsoluteStartMillis());
ActionListener<SearchResponse> ccsListener = createCCSListener(clusterAlias, skipUnavailable, countDown,
skippedClusters, exceptions, searchResponseMerger, totalClusters, listener);
Client remoteClusterClient = remoteClusterService.getRemoteClusterClient(threadPool, clusterAlias);
remoteClusterClient.search(ccsSearchRequest, ccsListener);
}
if (localIndices != null) {
ActionListener<SearchResponse> ccsListener = createCCSListener(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY,
false, countDown, skippedClusters, exceptions, searchResponseMerger, totalClusters, listener);
            //here we provide the empty string as the cluster alias, which means no prefix in the index name,
            //but the coordinating node will perform a non-final reduce as the alias is not null.
SearchRequest ccsLocalSearchRequest = SearchRequest.withLocalReduction(searchRequest, localIndices.indices(),
RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY, timeProvider.getAbsoluteStartMillis());
localSearchConsumer.accept(ccsLocalSearchRequest, ccsListener);
}
}
static SearchResponseMerger createSearchResponseMerger(SearchSourceBuilder source, SearchTimeProvider timeProvider,
Function<Boolean, InternalAggregation.ReduceContext> reduceContextFunction) {
final int from;
final int size;
final int trackTotalHitsUpTo;
if (source == null) {
from = SearchService.DEFAULT_FROM;
size = SearchService.DEFAULT_SIZE;
trackTotalHitsUpTo = SearchContext.DEFAULT_TRACK_TOTAL_HITS_UP_TO;
} else {
from = source.from() == -1 ? SearchService.DEFAULT_FROM : source.from();
size = source.size() == -1 ? SearchService.DEFAULT_SIZE : source.size();
trackTotalHitsUpTo = source.trackTotalHitsUpTo() == null
? SearchContext.DEFAULT_TRACK_TOTAL_HITS_UP_TO : source.trackTotalHitsUpTo();
//here we modify the original source so we can re-use it by setting it to each outgoing search request
source.from(0);
source.size(from + size);
//TODO when searching only against a remote cluster, we could ask directly for the final number of results and let
//the remote cluster do a final reduction, yet that is not possible as we are providing a localClusterAlias which
//will automatically make the reduction non final
}
return new SearchResponseMerger(from, size, trackTotalHitsUpTo, timeProvider, reduceContextFunction);
}
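The from/size rewrite above is what makes pagination correct across clusters: each cluster is asked for its top `from + size` hits starting at offset 0, and only the coordinating node applies the original `from`/`size` window after merging. A minimal sketch of that window arithmetic (independent of the Elasticsearch classes, assuming hits are ranked by descending score):

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

public class CcsWindowSketch {
    // Each cluster returns its own top (from + size) hits from offset 0; the coordinator
    // merges the per-cluster lists and only then applies the original from/size window.
    static List<Double> mergeTopHits(List<List<Double>> perClusterScores, int from, int size) {
        List<Double> merged = new ArrayList<>();
        for (List<Double> scores : perClusterScores) {
            // simulate what each cluster sends back: its top from + size hits
            scores.stream().sorted(Comparator.reverseOrder()).limit(from + size).forEach(merged::add);
        }
        merged.sort(Comparator.reverseOrder());
        int start = Math.min(from, merged.size());
        int end = Math.min(from + size, merged.size());
        return merged.subList(start, end);
    }

    public static void main(String[] args) {
        // from=1, size=2: the global top-3 across both clusters is 9.0, 8.0, 7.0,
        // and the window then skips the first hit
        System.out.println(mergeTopHits(List.of(List.of(9.0, 7.0, 3.0), List.of(8.0, 2.0)), 1, 2));
    }
}
```

Asking each cluster for only `size` hits from offset `from` instead would be wrong: the second page of the merged result is not the union of the second pages of each cluster.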
static void collectSearchShards(IndicesOptions indicesOptions, String preference, String routing, AtomicInteger skippedClusters,
Map<String, OriginalIndices> remoteIndicesByCluster, RemoteClusterService remoteClusterService,
ThreadPool threadPool, ActionListener<Map<String, ClusterSearchShardsResponse>> listener) {
final CountDown responsesCountDown = new CountDown(remoteIndicesByCluster.size());
final Map<String, ClusterSearchShardsResponse> searchShardsResponses = new ConcurrentHashMap<>();
final AtomicReference<RemoteTransportException> transportException = new AtomicReference<>();
final AtomicReference<Exception> exceptions = new AtomicReference<>();
for (Map.Entry<String, OriginalIndices> entry : remoteIndicesByCluster.entrySet()) {
final String clusterAlias = entry.getKey();
boolean skipUnavailable = remoteClusterService.isSkipUnavailable(clusterAlias);
@ -242,49 +317,53 @@ public class TransportSearchAction extends HandledTransportAction<SearchRequest,
final String[] indices = entry.getValue().indices();
ClusterSearchShardsRequest searchShardsRequest = new ClusterSearchShardsRequest(indices)
.indicesOptions(indicesOptions).local(true).preference(preference).routing(routing);
clusterClient.admin().cluster().searchShards(searchShardsRequest, new ActionListener<ClusterSearchShardsResponse>() {
clusterClient.admin().cluster().searchShards(searchShardsRequest,
new CCSActionListener<ClusterSearchShardsResponse, Map<String, ClusterSearchShardsResponse>>(
clusterAlias, skipUnavailable, responsesCountDown, skippedClusters, exceptions, listener) {
@Override
public void onResponse(ClusterSearchShardsResponse response) {
searchShardsResponses.put(clusterAlias, response);
maybeFinish();
void innerOnResponse(ClusterSearchShardsResponse clusterSearchShardsResponse) {
searchShardsResponses.put(clusterAlias, clusterSearchShardsResponse);
}
@Override
public void onFailure(Exception e) {
if (skipUnavailable) {
skippedClusters.incrementAndGet();
} else {
RemoteTransportException exception =
new RemoteTransportException("error while communicating with remote cluster [" + clusterAlias + "]", e);
if (transportException.compareAndSet(null, exception) == false) {
transportException.accumulateAndGet(exception, (previous, current) -> {
current.addSuppressed(previous);
return current;
});
}
}
maybeFinish();
}
private void maybeFinish() {
if (responsesCountDown.countDown()) {
RemoteTransportException exception = transportException.get();
if (exception == null) {
listener.onResponse(searchShardsResponses);
} else {
listener.onFailure(transportException.get());
}
}
Map<String, ClusterSearchShardsResponse> createFinalResponse() {
return searchShardsResponses;
}
}
);
}
}
private static ActionListener<SearchResponse> createCCSListener(String clusterAlias, boolean skipUnavailable, CountDown countDown,
AtomicInteger skippedClusters, AtomicReference<Exception> exceptions,
SearchResponseMerger searchResponseMerger, int totalClusters,
ActionListener<SearchResponse> originalListener) {
return new CCSActionListener<SearchResponse, SearchResponse>(clusterAlias, skipUnavailable, countDown, skippedClusters,
exceptions, originalListener) {
@Override
void innerOnResponse(SearchResponse searchResponse) {
searchResponseMerger.add(searchResponse);
}
@Override
SearchResponse createFinalResponse() {
SearchResponse.Clusters clusters = new SearchResponse.Clusters(totalClusters, searchResponseMerger.numResponses(),
skippedClusters.get());
return searchResponseMerger.getMergedResponse(clusters);
}
};
}
private void executeLocalSearch(Task task, SearchTimeProvider timeProvider, SearchRequest searchRequest, OriginalIndices localIndices,
ClusterState clusterState, ActionListener<SearchResponse> listener) {
executeSearch((SearchTask)task, timeProvider, searchRequest, localIndices, Collections.emptyList(),
(clusterName, nodeId) -> null, clusterState, Collections.emptyMap(), listener, SearchResponse.Clusters.EMPTY);
}
static BiFunction<String, String, DiscoveryNode> processRemoteShards(Map<String, ClusterSearchShardsResponse> searchShardsResponses,
Map<String, OriginalIndices> remoteIndicesByCluster,
List<SearchShardIterator> remoteShardIterators,
Map<String, AliasFilter> aliasFilterMap) {
Map<String, OriginalIndices> remoteIndicesByCluster,
List<SearchShardIterator> remoteShardIterators,
Map<String, AliasFilter> aliasFilterMap) {
Map<String, Map<String, DiscoveryNode>> clusterToNode = new HashMap<>();
for (Map.Entry<String, ClusterSearchShardsResponse> entry : searchShardsResponses.entrySet()) {
String clusterAlias = entry.getKey();
@ -491,4 +570,70 @@ public class TransportSearchAction extends HandledTransportAction<SearchRequest,
+ "] to a greater value if you really want to query that many shards at the same time.");
}
}
abstract static class CCSActionListener<Response, FinalResponse> implements ActionListener<Response> {
private final String clusterAlias;
private final boolean skipUnavailable;
private final CountDown countDown;
private final AtomicInteger skippedClusters;
private final AtomicReference<Exception> exceptions;
private final ActionListener<FinalResponse> originalListener;
CCSActionListener(String clusterAlias, boolean skipUnavailable, CountDown countDown, AtomicInteger skippedClusters,
AtomicReference<Exception> exceptions, ActionListener<FinalResponse> originalListener) {
this.clusterAlias = clusterAlias;
this.skipUnavailable = skipUnavailable;
this.countDown = countDown;
this.skippedClusters = skippedClusters;
this.exceptions = exceptions;
this.originalListener = originalListener;
}
@Override
public final void onResponse(Response response) {
innerOnResponse(response);
maybeFinish();
}
abstract void innerOnResponse(Response response);
@Override
public final void onFailure(Exception e) {
if (skipUnavailable) {
skippedClusters.incrementAndGet();
} else {
Exception exception = e;
if (RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY.equals(clusterAlias) == false) {
exception = new RemoteTransportException("error while communicating with remote cluster [" + clusterAlias + "]", e);
}
if (exceptions.compareAndSet(null, exception) == false) {
exceptions.accumulateAndGet(exception, (previous, current) -> {
current.addSuppressed(previous);
return current;
});
}
}
maybeFinish();
}
private void maybeFinish() {
if (countDown.countDown()) {
Exception exception = exceptions.get();
if (exception == null) {
FinalResponse response;
try {
response = createFinalResponse();
} catch(Exception e) {
originalListener.onFailure(e);
return;
}
originalListener.onResponse(response);
} else {
originalListener.onFailure(exceptions.get());
}
}
}
abstract FinalResponse createFinalResponse();
}
}
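CCSActionListener above captures a common fan-out pattern: fire one request per cluster, count responses down, accumulate failures (or swallow them for skip_unavailable clusters), and deliver exactly one merged result or failure once the last response arrives. A thread-safe sketch of the same pattern using only the JDK (the names here are illustrative, not the Elasticsearch API):

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

public class FanOutSketch {
    // Collects one result per "cluster"; completes the callback exactly once, after the
    // last onResponse/onFailure, with either all collected results or the recorded failure.
    static final class Collector<T> {
        private final AtomicInteger remaining;
        private final Queue<T> results = new ConcurrentLinkedQueue<>();
        private final AtomicReference<Exception> failure = new AtomicReference<>();
        private final Consumer<Queue<T>> onSuccess;
        private final Consumer<Exception> onError;

        Collector(int expected, Consumer<Queue<T>> onSuccess, Consumer<Exception> onError) {
            this.remaining = new AtomicInteger(expected);
            this.onSuccess = onSuccess;
            this.onError = onError;
        }

        void onResponse(T response) {
            results.add(response);
            maybeFinish();
        }

        void onFailure(Exception e, boolean skipUnavailable) {
            if (skipUnavailable == false) {
                // keep the first failure; the real listener also adds later ones as suppressed
                failure.compareAndSet(null, e);
            }
            maybeFinish();
        }

        private void maybeFinish() {
            if (remaining.decrementAndGet() == 0) {
                Exception e = failure.get();
                if (e == null) {
                    onSuccess.accept(results);
                } else {
                    onError.accept(e);
                }
            }
        }
    }

    public static void main(String[] args) {
        Collector<String> c = new Collector<>(3,
            r -> System.out.println("merged " + r.size() + " responses"),
            e -> System.out.println("failed: " + e.getMessage()));
        c.onResponse("cluster_a");
        c.onFailure(new RuntimeException("cluster_b down"), true); // skip_unavailable: counted, not fatal
        c.onResponse("local");
    }
}
```

The important invariant, in both the sketch and CCSActionListener, is that the count-down happens on every path, success or failure, so the final callback can never be lost or fired twice.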

View File

@ -1009,7 +1009,7 @@ public abstract class StreamInput extends InputStream {
}
/**
* Reads an enum with type E that was serialized based on the value of it's ordinal
* Reads an enum with type E that was serialized based on the value of its ordinal
*/
public <E extends Enum<E>> E readEnum(Class<E> enumClass) throws IOException {
int ordinal = readVInt();

View File

@ -1094,7 +1094,7 @@ public abstract class StreamOutput extends OutputStream {
}
/**
* Writes an enum with type E that by serialized it based on it's ordinal value
* Writes an enum with type E based on its ordinal value
*/
public <E extends Enum<E>> void writeEnum(E enumValue) throws IOException {
writeVInt(enumValue.ordinal());
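Ordinal-based enum serialization, as readEnum/writeEnum do, transmits only the enum's position, so both sides must agree on the constant order: reordering, inserting, or removing constants silently changes what the other side decodes. A self-contained sketch of the round trip with a bounds check on read (illustrative only, not the StreamInput/StreamOutput API):

```java
public class EnumWireSketch {
    enum SearchType { QUERY_THEN_FETCH, DFS_QUERY_THEN_FETCH }

    // Only the ordinal goes over the wire, so the constant order becomes part of the protocol.
    static int writeEnum(SearchType value) {
        return value.ordinal();
    }

    static SearchType readEnum(int ordinal) {
        SearchType[] values = SearchType.values();
        if (ordinal < 0 || ordinal >= values.length) {
            throw new IllegalArgumentException("unknown SearchType ordinal [" + ordinal + "]");
        }
        return values[ordinal];
    }

    public static void main(String[] args) {
        System.out.println(readEnum(writeEnum(SearchType.DFS_QUERY_THEN_FETCH)));
    }
}
```

The bounds check matters in mixed-version clusters: a newer node may legitimately send an ordinal the older node has no constant for, and a clear exception beats an ArrayIndexOutOfBoundsException.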

View File

@ -147,13 +147,14 @@ public class RestMultiSearchAction extends BaseRestHandler {
String[] indices = Strings.splitStringByCommaToArray(request.param("index"));
String[] types = Strings.splitStringByCommaToArray(request.param("type"));
String searchType = request.param("search_type");
boolean ccsMinimizeRoundtrips = request.paramAsBoolean("ccs_minimize_roundtrips", true);
String routing = request.param("routing");
final Tuple<XContentType, BytesReference> sourceTuple = request.contentOrSourceParam();
final XContent xContent = sourceTuple.v1().xContent();
final BytesReference data = sourceTuple.v2();
MultiSearchRequest.readMultiLineFormat(data, xContent, consumer, indices, indicesOptions, types, routing,
searchType, request.getXContentRegistry(), allowExplicitIndex);
searchType, ccsMinimizeRoundtrips, request.getXContentRegistry(), allowExplicitIndex);
}
@Override

View File

@ -173,6 +173,7 @@ public class RestSearchAction extends BaseRestHandler {
searchRequest.routing(request.param("routing"));
searchRequest.preference(request.param("preference"));
searchRequest.indicesOptions(IndicesOptions.fromRequest(request, searchRequest.indicesOptions()));
searchRequest.setCcsMinimizeRoundtrips(request.paramAsBoolean("ccs_minimize_roundtrips", true));
checkRestTotalHits(request, searchRequest);
}

View File

@ -148,6 +148,8 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv
public static final Setting<Integer> MAX_OPEN_SCROLL_CONTEXT =
Setting.intSetting("search.max_open_scroll_context", 500, 0, Property.Dynamic, Property.NodeScope);
public static final int DEFAULT_SIZE = 10;
public static final int DEFAULT_FROM = 0;
private final ThreadPool threadPool;
@ -606,10 +608,10 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv
// if the from and size are still not set, default them
if (context.from() == -1) {
context.from(0);
context.from(DEFAULT_FROM);
}
if (context.size() == -1) {
context.size(10);
context.size(DEFAULT_SIZE);
}
// pre process

View File

@ -180,7 +180,7 @@ public class MultiSearchRequestTests extends ESTestCase {
assertThat(request.requests().get(2).routing(), equalTo("123"));
}
public void testResponseErrorToXContent() throws IOException {
public void testResponseErrorToXContent() {
long tookInMillis = randomIntBetween(1, 1000);
MultiSearchResponse response = new MultiSearchResponse(
new MultiSearchResponse.Item[] {
@ -262,12 +262,12 @@ public class MultiSearchRequestTests extends ESTestCase {
parsedRequest.add(r);
};
MultiSearchRequest.readMultiLineFormat(new BytesArray(originalBytes), xContentType.xContent(),
consumer, null, null, null, null, null, xContentRegistry(), true);
consumer, null, null, null, null, null, null, xContentRegistry(), true);
assertEquals(originalRequest, parsedRequest);
}
}
public void testEqualsAndHashcode() throws IOException {
public void testEqualsAndHashcode() {
checkEqualsAndHashCode(createMultiSearchRequest(), MultiSearchRequestTests::copyRequest, MultiSearchRequestTests::mutate);
}
@ -282,7 +282,7 @@ public class MultiSearchRequestTests extends ESTestCase {
return mutation;
}
private static MultiSearchRequest copyRequest(MultiSearchRequest request) throws IOException {
private static MultiSearchRequest copyRequest(MultiSearchRequest request) {
MultiSearchRequest copy = new MultiSearchRequest();
if (request.maxConcurrentSearchRequests() > 0) {
copy.maxConcurrentSearchRequests(request.maxConcurrentSearchRequests());
@ -294,7 +294,7 @@ public class MultiSearchRequestTests extends ESTestCase {
return copy;
}
private static MultiSearchRequest createMultiSearchRequest() throws IOException {
private static MultiSearchRequest createMultiSearchRequest() {
int numSearchRequest = randomIntBetween(1, 128);
MultiSearchRequest request = new MultiSearchRequest();
for (int j = 0; j < numSearchRequest; j++) {
@ -321,7 +321,7 @@ public class MultiSearchRequestTests extends ESTestCase {
return request;
}
private static SearchRequest createSimpleSearchRequest() throws IOException {
private static SearchRequest createSimpleSearchRequest() {
return randomSearchRequest(() -> {
// No need to return a very complex SearchSourceBuilder here, that is tested elsewhere
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();

View File

@ -46,8 +46,8 @@ public class MultiSearchResponseTests extends AbstractXContentTestCase<MultiSear
int totalShards = randomIntBetween(1, Integer.MAX_VALUE);
int successfulShards = randomIntBetween(0, totalShards);
int skippedShards = totalShards - successfulShards;
SearchResponse.Clusters clusters = SearchResponseTests.randomClusters();
InternalSearchResponse internalSearchResponse = InternalSearchResponse.empty();
SearchResponse.Clusters clusters = new SearchResponse.Clusters(totalShards, successfulShards, skippedShards);
SearchResponse searchResponse = new SearchResponse(internalSearchResponse, null, totalShards,
successfulShards, skippedShards, tookInMillis, ShardSearchFailure.EMPTY_ARRAY, clusters);
items[i] = new MultiSearchResponse.Item(searchResponse, null);
@ -60,14 +60,13 @@ public class MultiSearchResponseTests extends AbstractXContentTestCase<MultiSear
MultiSearchResponse.Item[] items = new MultiSearchResponse.Item[numItems];
for (int i = 0; i < numItems; i++) {
if (randomBoolean()) {
// Creating a minimal response is OK, because SearchResponse self
// is tested elsewhere.
// Creating a minimal response is OK, because SearchResponse is tested elsewhere.
long tookInMillis = randomNonNegativeLong();
int totalShards = randomIntBetween(1, Integer.MAX_VALUE);
int successfulShards = randomIntBetween(0, totalShards);
int skippedShards = totalShards - successfulShards;
SearchResponse.Clusters clusters = SearchResponseTests.randomClusters();
InternalSearchResponse internalSearchResponse = InternalSearchResponse.empty();
SearchResponse.Clusters clusters = new SearchResponse.Clusters(totalShards, successfulShards, skippedShards);
SearchResponse searchResponse = new SearchResponse(internalSearchResponse, null, totalShards,
successfulShards, skippedShards, tookInMillis, ShardSearchFailure.EMPTY_ARRAY, clusters);
items[i] = new MultiSearchResponse.Item(searchResponse, null);

View File

@ -22,12 +22,12 @@ package org.elasticsearch.action.search;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.ArrayUtils;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.AbstractSearchTestCase;
import org.elasticsearch.search.RandomSearchRequestGenerator;
import org.elasticsearch.search.Scroll;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.rescore.QueryRescorerBuilder;
@ -48,19 +48,23 @@ public class SearchRequestTests extends AbstractSearchTestCase {
@Override
protected SearchRequest createSearchRequest() throws IOException {
SearchRequest request = super.createSearchRequest();
if (randomBoolean()) {
return super.createSearchRequest();
return request;
}
//clusterAlias and absoluteStartMillis do not have public getters/setters hence we randomize them only in this test specifically.
SearchRequest searchRequest = new SearchRequest(randomAlphaOfLengthBetween(5, 10), randomNonNegativeLong());
RandomSearchRequestGenerator.randomSearchRequest(searchRequest, this::createSearchSourceBuilder);
return searchRequest;
return SearchRequest.withLocalReduction(request, request.indices(),
randomAlphaOfLengthBetween(5, 10), randomNonNegativeLong());
}
public void testClusterAliasValidation() {
expectThrows(NullPointerException.class, () -> new SearchRequest(null, 0));
expectThrows(IllegalArgumentException.class, () -> new SearchRequest("", -1));
SearchRequest searchRequest = new SearchRequest("", 0);
public void testWithLocalReduction() {
expectThrows(NullPointerException.class, () -> SearchRequest.withLocalReduction(null, Strings.EMPTY_ARRAY, "", 0));
SearchRequest request = new SearchRequest();
expectThrows(NullPointerException.class, () -> SearchRequest.withLocalReduction(request, null, "", 0));
expectThrows(NullPointerException.class, () -> SearchRequest.withLocalReduction(request, new String[]{null}, "", 0));
expectThrows(NullPointerException.class, () -> SearchRequest.withLocalReduction(request, Strings.EMPTY_ARRAY, null, 0));
expectThrows(IllegalArgumentException.class, () -> SearchRequest.withLocalReduction(request, Strings.EMPTY_ARRAY, "", -1));
SearchRequest searchRequest = SearchRequest.withLocalReduction(request, Strings.EMPTY_ARRAY, "", 0);
assertNull(searchRequest.validate());
}
@ -72,10 +76,15 @@ public class SearchRequestTests extends AbstractSearchTestCase {
assertNotSame(deserializedRequest, searchRequest);
}
public void testClusterAliasSerialization() throws IOException {
public void testRandomVersionSerialization() throws IOException {
SearchRequest searchRequest = createSearchRequest();
Version version = VersionUtils.randomVersion(random());
SearchRequest deserializedRequest = copyWriteable(searchRequest, namedWriteableRegistry, SearchRequest::new, version);
if (version.before(Version.V_7_0_0)) {
assertTrue(deserializedRequest.isCcsMinimizeRoundtrips());
} else {
assertEquals(searchRequest.isCcsMinimizeRoundtrips(), deserializedRequest.isCcsMinimizeRoundtrips());
}
if (version.before(Version.V_6_7_0)) {
assertNull(deserializedRequest.getLocalClusterAlias());
assertAbsoluteStartMillisIsCurrentTime(deserializedRequest);
@@ -93,6 +102,7 @@ public class SearchRequestTests extends AbstractSearchTestCase {
assertArrayEquals(new String[]{"index"}, searchRequest.indices());
assertNull(searchRequest.getLocalClusterAlias());
assertAbsoluteStartMillisIsCurrentTime(searchRequest);
assertTrue(searchRequest.isCcsMinimizeRoundtrips());
}
}
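The version gate in testRandomVersionSerialization above reflects a common wire-compatibility pattern: streams written by nodes older than the version that introduced a flag never carry it, so deserialization substitutes the new default instead of reading it. A minimal standalone sketch of that decision (class name and numeric version ids are illustrative, not Elasticsearch's):

```java
public class VersionGatedFlag {
    // Illustrative stand-in for Version.V_7_0_0's internal id.
    static final int V_7_0_0 = 7_000_000;

    // Decide the flag value for a peer of the given stream version:
    // peers older than 7.0.0 never wrote the flag, so assume the new default (true);
    // otherwise use the value actually read from the wire.
    static boolean readCcsMinimizeRoundtrips(int streamVersion, boolean wireValue) {
        if (streamVersion < V_7_0_0) {
            return true;
        }
        return wireValue;
    }
}
```

This is why the test asserts `isCcsMinimizeRoundtrips()` is always true after a pre-7.0.0 round-trip, regardless of what the original request carried.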
@@ -215,6 +225,7 @@ public class SearchRequestTests extends AbstractSearchTestCase {
mutators.add(() -> mutation.searchType(randomValueOtherThan(searchRequest.searchType(),
() -> randomFrom(SearchType.DFS_QUERY_THEN_FETCH, SearchType.QUERY_THEN_FETCH))));
mutators.add(() -> mutation.source(randomValueOtherThan(searchRequest.source(), this::createSearchSourceBuilder)));
mutators.add(() -> mutation.setCcsMinimizeRoundtrips(searchRequest.isCcsMinimizeRoundtrips() == false));
randomFrom(mutators).run();
return mutation;
}


@@ -73,7 +73,7 @@ public class SearchResponseMergerTests extends ESTestCase {
@Before
public void init() {
numResponses = randomIntBetween(2, 10);
numResponses = randomIntBetween(1, 10);
executorService = Executors.newFixedThreadPool(numResponses);
}
@@ -87,7 +87,7 @@ public class SearchResponseMergerTests extends ESTestCase {
private void awaitResponsesAdded() throws InterruptedException {
executorService.shutdown();
executorService.awaitTermination(5, TimeUnit.SECONDS);
assertTrue(executorService.awaitTermination(5, TimeUnit.SECONDS));
}
public void testMergeTookInMillis() throws InterruptedException {
@@ -137,6 +137,7 @@ public class SearchResponseMergerTests extends ESTestCase {
addResponse(merger, searchResponse);
}
awaitResponsesAdded();
assertEquals(numResponses, merger.numResponses());
SearchResponse.Clusters clusters = SearchResponseTests.randomClusters();
SearchResponse mergedResponse = merger.getMergedResponse(clusters);
assertSame(clusters, mergedResponse.getClusters());
@@ -170,6 +171,7 @@ public class SearchResponseMergerTests extends ESTestCase {
addResponse(merger, searchResponse);
}
awaitResponsesAdded();
assertEquals(numResponses, merger.numResponses());
ShardSearchFailure[] shardFailures = merger.getMergedResponse(SearchResponse.Clusters.EMPTY).getShardFailures();
assertThat(Arrays.asList(shardFailures), containsInAnyOrder(expectedFailures.toArray(ShardSearchFailure.EMPTY_ARRAY)));
}
@@ -189,6 +191,7 @@ public class SearchResponseMergerTests extends ESTestCase {
addResponse(merger, searchResponse);
}
awaitResponsesAdded();
assertEquals(numResponses, merger.numResponses());
SearchResponse.Clusters clusters = SearchResponseTests.randomClusters();
SearchResponse mergedResponse = merger.getMergedResponse(clusters);
assertSame(clusters, mergedResponse.getClusters());
@@ -221,6 +224,7 @@ public class SearchResponseMergerTests extends ESTestCase {
addResponse(searchResponseMerger, searchResponse);
}
awaitResponsesAdded();
assertEquals(numResponses, searchResponseMerger.numResponses());
SearchResponse.Clusters clusters = SearchResponseTests.randomClusters();
SearchResponse mergedResponse = searchResponseMerger.getMergedResponse(clusters);
assertSame(clusters, mergedResponse.getClusters());
@@ -267,6 +271,7 @@ public class SearchResponseMergerTests extends ESTestCase {
addResponse(searchResponseMerger, searchResponse);
}
awaitResponsesAdded();
assertEquals(numResponses, searchResponseMerger.numResponses());
SearchResponse.Clusters clusters = SearchResponseTests.randomClusters();
SearchResponse mergedResponse = searchResponseMerger.getMergedResponse(clusters);
assertSame(clusters, mergedResponse.getClusters());
@@ -334,7 +339,7 @@
Iterator<Map.Entry<String, Index[]>> indicesIterator = randomRealisticIndices(numIndices, numResponses).entrySet().iterator();
for (int i = 0; i < numResponses; i++) {
Map.Entry<String, Index[]> entry = indicesIterator.next();
String clusterAlias = entry.getKey().equals(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY) ? null : entry.getKey();
String clusterAlias = entry.getKey();
Index[] indices = entry.getValue();
int total = randomIntBetween(1, 1000);
expectedTotal += total;
@@ -386,7 +391,7 @@
}
awaitResponsesAdded();
assertEquals(numResponses, searchResponseMerger.numResponses());
final SearchResponse.Clusters clusters = SearchResponseTests.randomClusters();
SearchResponse searchResponse = searchResponseMerger.getMergedResponse(clusters);
@@ -434,6 +439,33 @@
}
}
public void testMergeNoResponsesAdded() {
long currentRelativeTime = randomLong();
final SearchTimeProvider timeProvider = new SearchTimeProvider(randomLong(), 0, () -> currentRelativeTime);
SearchResponseMerger merger = new SearchResponseMerger(0, 10, Integer.MAX_VALUE, timeProvider, flag -> null);
SearchResponse.Clusters clusters = SearchResponseTests.randomClusters();
assertEquals(0, merger.numResponses());
SearchResponse response = merger.getMergedResponse(clusters);
assertSame(clusters, response.getClusters());
assertEquals(TimeUnit.NANOSECONDS.toMillis(currentRelativeTime), response.getTook().millis());
assertEquals(0, response.getTotalShards());
assertEquals(0, response.getSuccessfulShards());
assertEquals(0, response.getSkippedShards());
assertEquals(0, response.getFailedShards());
assertEquals(0, response.getNumReducePhases());
assertFalse(response.isTimedOut());
assertNotNull(response.getHits().getTotalHits());
assertEquals(0, response.getHits().getTotalHits().value);
assertEquals(0, response.getHits().getHits().length);
assertEquals(TotalHits.Relation.EQUAL_TO, response.getHits().getTotalHits().relation);
assertNull(response.getScrollId());
assertSame(InternalAggregations.EMPTY, response.getAggregations());
assertNull(response.getSuggest());
assertEquals(0, response.getProfileResults().size());
assertNull(response.isTerminatedEarly());
assertEquals(0, response.getShardFailures().length);
}
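The tests above exercise the coordinator-side reduction described in the commit message: each cluster returns its own top from + size hits already sorted, and the CCS coordinating node merges them and applies from/size exactly once. A toy standalone sketch of that hit-merging step, using plain scores in place of SearchHit (class and method names are illustrative):

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

public class TopHitsMerge {
    // Merge per-cluster top hits (each list already sorted by descending score),
    // then apply the request's from/size once on the coordinating side.
    static List<Double> merge(List<List<Double>> perCluster, int from, int size) {
        List<Double> all = new ArrayList<>();
        for (List<Double> hits : perCluster) {
            all.addAll(hits);
        }
        all.sort(Comparator.reverseOrder());
        int start = Math.min(from, all.size());
        int end = Math.min(from + size, all.size());
        return all.subList(start, end);
    }
}
```

With zero input lists this naturally yields an empty page, which is the behaviour testMergeNoResponsesAdded pins down for the real merger.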
private static Tuple<Integer, TotalHits.Relation> randomTrackTotalHits() {
switch(randomIntBetween(0, 2)) {
case 0:
@@ -499,8 +531,11 @@
for (int i = 0; i < numClusters; i++) {
Index[] indices = new Index[indicesNames.length];
for (int j = 0; j < indices.length; j++) {
//Realistically clusters have the same indices with same names, but different uuid
indices[j] = new Index(indicesNames[j], randomAlphaOfLength(10));
String indexName = indicesNames[j];
//Realistically clusters have the same indices with same names, but different uuid. Yet it can happen that the same cluster
//is registered twice with different aliases and searched multiple times as part of the same search request.
String indexUuid = frequently() ? randomAlphaOfLength(10) : indexName;
indices[j] = new Index(indexName, indexUuid);
}
String clusterAlias;
if (frequently() || indicesPerCluster.containsKey(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY)) {
@@ -551,10 +586,22 @@
}
}
}
int shardIdCompareTo = a.getShard().getShardId().compareTo(b.getShard().getShardId());
SearchShardTarget aShard = a.getShard();
SearchShardTarget bShard = b.getShard();
int shardIdCompareTo = aShard.getShardId().compareTo(bShard.getShardId());
if (shardIdCompareTo != 0) {
return shardIdCompareTo;
}
int clusterAliasCompareTo = aShard.getClusterAlias().compareTo(bShard.getClusterAlias());
if (clusterAliasCompareTo != 0) {
if (aShard.getClusterAlias().equals(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY)) {
return 1;
}
if (bShard.getClusterAlias().equals(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY)) {
return -1;
}
return clusterAliasCompareTo;
}
return Integer.compare(a.docId(), b.docId());
}
}
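The cluster-alias tie-breaking added to the comparator above deliberately orders the local cluster (whose alias is the empty string, RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY) after all remote clusters, even though "" would otherwise compare first lexicographically. A standalone sketch of just that ordering, with plain strings in place of SearchShardTarget (class and names here are illustrative):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class FailureOrdering {
    // Stand-in for RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY.
    static final String LOCAL = "";

    // Compare cluster aliases, sorting the local cluster ("") after every
    // remote cluster; otherwise fall back to natural string order.
    static int compareAliases(String a, String b) {
        int cmp = a.compareTo(b);
        if (cmp != 0) {
            if (a.equals(LOCAL)) {
                return 1;
            }
            if (b.equals(LOCAL)) {
                return -1;
            }
        }
        return cmp;
    }

    public static void main(String[] args) {
        List<String> aliases = new ArrayList<>(Arrays.asList("remote1", LOCAL, "remote0"));
        aliases.sort(FailureOrdering::compareAliases);
        System.out.println(aliases); // the local cluster sorts last
    }
}
```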


@@ -20,13 +20,11 @@
package org.elasticsearch.action.search;
import org.apache.lucene.search.TotalHits;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.NamedWriteableAwareStreamInput;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.text.Text;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
@@ -246,7 +244,8 @@ public class SearchResponseTests extends ESTestCase {
new InternalSearchResponse(
new SearchHits(hits, new TotalHits(100, TotalHits.Relation.EQUAL_TO), 1.5f), null, null, null, false, null, 1
),
null, 0, 0, 0, 0, ShardSearchFailure.EMPTY_ARRAY, new SearchResponse.Clusters(5, 3, 2));
null, 0, 0, 0, 0, ShardSearchFailure.EMPTY_ARRAY,
new SearchResponse.Clusters(5, 3, 2));
StringBuilder expectedString = new StringBuilder();
expectedString.append("{");
{
@@ -279,24 +278,18 @@
public void testSerialization() throws IOException {
SearchResponse searchResponse = createTestItem(false);
BytesStreamOutput bytesStreamOutput = new BytesStreamOutput();
searchResponse.writeTo(bytesStreamOutput);
try (StreamInput in = new NamedWriteableAwareStreamInput(
StreamInput.wrap(bytesStreamOutput.bytes().toBytesRef().bytes), namedWriteableRegistry)) {
SearchResponse serialized = new SearchResponse();
serialized.readFrom(in);
if (searchResponse.getHits().getTotalHits() == null) {
assertNull(serialized.getHits().getTotalHits());
} else {
assertEquals(searchResponse.getHits().getTotalHits().value, serialized.getHits().getTotalHits().value);
assertEquals(searchResponse.getHits().getTotalHits().relation, serialized.getHits().getTotalHits().relation);
}
assertEquals(searchResponse.getHits().getHits().length, serialized.getHits().getHits().length);
assertEquals(searchResponse.getNumReducePhases(), serialized.getNumReducePhases());
assertEquals(searchResponse.getFailedShards(), serialized.getFailedShards());
assertEquals(searchResponse.getTotalShards(), serialized.getTotalShards());
assertEquals(searchResponse.getSkippedShards(), serialized.getSkippedShards());
assertEquals(searchResponse.getClusters(), serialized.getClusters());
SearchResponse deserialized = copyStreamable(searchResponse, namedWriteableRegistry, SearchResponse::new, Version.CURRENT);
if (searchResponse.getHits().getTotalHits() == null) {
assertNull(deserialized.getHits().getTotalHits());
} else {
assertEquals(searchResponse.getHits().getTotalHits().value, deserialized.getHits().getTotalHits().value);
assertEquals(searchResponse.getHits().getTotalHits().relation, deserialized.getHits().getTotalHits().relation);
}
assertEquals(searchResponse.getHits().getHits().length, deserialized.getHits().getHits().length);
assertEquals(searchResponse.getNumReducePhases(), deserialized.getNumReducePhases());
assertEquals(searchResponse.getFailedShards(), deserialized.getFailedShards());
assertEquals(searchResponse.getTotalShards(), deserialized.getTotalShards());
assertEquals(searchResponse.getSkippedShards(), deserialized.getSkippedShards());
assertEquals(searchResponse.getClusters(), deserialized.getClusters());
}
}


@@ -23,6 +23,7 @@ import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.common.Strings;
import org.elasticsearch.index.query.RangeQueryBuilder;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.search.SearchHit;
@@ -41,7 +42,7 @@ public class TransportSearchActionSingleNodeTests extends ESSingleNodeTestCase {
assertEquals(RestStatus.CREATED, indexResponse.status());
{
SearchRequest searchRequest = new SearchRequest("local", nowInMillis);
SearchRequest searchRequest = SearchRequest.withLocalReduction(new SearchRequest(), Strings.EMPTY_ARRAY, "local", nowInMillis);
SearchResponse searchResponse = client().search(searchRequest).actionGet();
assertEquals(1, searchResponse.getHits().getTotalHits().value);
SearchHit[] hits = searchResponse.getHits().getHits();
@@ -52,7 +53,7 @@ public class TransportSearchActionSingleNodeTests extends ESSingleNodeTestCase {
assertEquals("1", hit.getId());
}
{
SearchRequest searchRequest = new SearchRequest("", nowInMillis);
SearchRequest searchRequest = SearchRequest.withLocalReduction(new SearchRequest(), Strings.EMPTY_ARRAY, "", nowInMillis);
SearchResponse searchResponse = client().search(searchRequest).actionGet();
assertEquals(1, searchResponse.getHits().getTotalHits().value);
SearchHit[] hits = searchResponse.getHits().getHits();
@@ -93,19 +94,19 @@ public class TransportSearchActionSingleNodeTests extends ESSingleNodeTestCase {
assertEquals(0, searchResponse.getTotalShards());
}
{
SearchRequest searchRequest = new SearchRequest("", 0);
SearchRequest searchRequest = SearchRequest.withLocalReduction(new SearchRequest(), Strings.EMPTY_ARRAY, "", 0);
SearchResponse searchResponse = client().search(searchRequest).actionGet();
assertEquals(2, searchResponse.getHits().getTotalHits().value);
}
{
SearchRequest searchRequest = new SearchRequest("", 0);
SearchRequest searchRequest = SearchRequest.withLocalReduction(new SearchRequest(), Strings.EMPTY_ARRAY, "", 0);
searchRequest.indices("<test-{now/d}>");
SearchResponse searchResponse = client().search(searchRequest).actionGet();
assertEquals(1, searchResponse.getHits().getTotalHits().value);
assertEquals("test-1970.01.01", searchResponse.getHits().getHits()[0].getIndex());
}
{
SearchRequest searchRequest = new SearchRequest("", 0);
SearchRequest searchRequest = SearchRequest.withLocalReduction(new SearchRequest(), Strings.EMPTY_ARRAY, "", 0);
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
RangeQueryBuilder rangeQuery = new RangeQueryBuilder("date");
rangeQuery.gte("1970-01-01");


@@ -19,6 +19,8 @@
package org.elasticsearch.action.search;
import org.apache.lucene.search.TotalHits;
import org.apache.lucene.util.SetOnce;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.LatchedActionListener;
@@ -34,13 +36,24 @@ import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.ShardRoutingState;
import org.elasticsearch.cluster.routing.TestShardRouting;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.index.query.InnerHitBuilder;
import org.elasticsearch.index.query.MatchAllQueryBuilder;
import org.elasticsearch.index.query.TermsQueryBuilder;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.search.Scroll;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.InternalAggregations;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.collapse.CollapseBuilder;
import org.elasticsearch.search.internal.AliasFilter;
import org.elasticsearch.search.internal.InternalSearchResponse;
import org.elasticsearch.search.internal.SearchContext;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.transport.MockTransportService;
import org.elasticsearch.threadpool.TestThreadPool;
@@ -320,169 +333,495 @@ public class TransportSearchActionTests extends ESTestCase {
}
}
private MockTransportService startTransport(String id, List<DiscoveryNode> knownNodes) {
return RemoteClusterConnectionTests.startTransport(id, knownNodes, Version.CURRENT, threadPool);
}
public void testCollectSearchShards() throws Exception {
int numClusters = randomIntBetween(2, 10);
private MockTransportService[] startTransport(int numClusters, DiscoveryNode[] nodes, Map<String, OriginalIndices> remoteIndices,
Settings.Builder settingsBuilder) {
MockTransportService[] mockTransportServices = new MockTransportService[numClusters];
DiscoveryNode[] nodes = new DiscoveryNode[numClusters];
Map<String, OriginalIndices> remoteIndicesByCluster = new HashMap<>();
Settings.Builder builder = Settings.builder();
for (int i = 0; i < numClusters; i++) {
List<DiscoveryNode> knownNodes = new CopyOnWriteArrayList<>();
MockTransportService remoteSeedTransport = startTransport("node_remote" + i, knownNodes);
MockTransportService remoteSeedTransport = RemoteClusterConnectionTests.startTransport("node_remote" + i, knownNodes,
Version.CURRENT, threadPool);
mockTransportServices[i] = remoteSeedTransport;
DiscoveryNode remoteSeedNode = remoteSeedTransport.getLocalDiscoNode();
knownNodes.add(remoteSeedNode);
nodes[i] = remoteSeedNode;
builder.put("cluster.remote.remote" + i + ".seeds", remoteSeedNode.getAddress().toString());
remoteIndicesByCluster.put("remote" + i, new OriginalIndices(new String[]{"index"}, IndicesOptions.lenientExpandOpen()));
settingsBuilder.put("cluster.remote.remote" + i + ".seeds", remoteSeedNode.getAddress().toString());
remoteIndices.put("remote" + i, new OriginalIndices(new String[]{"index"}, IndicesOptions.lenientExpandOpen()));
}
return mockTransportServices;
}
private static SearchResponse emptySearchResponse() {
InternalSearchResponse response = new InternalSearchResponse(new SearchHits(new SearchHit[0],
new TotalHits(0, TotalHits.Relation.EQUAL_TO), Float.NaN), InternalAggregations.EMPTY, null, null, false, null, 1);
return new SearchResponse(response, null, 1, 1, 0, 100, ShardSearchFailure.EMPTY_ARRAY, SearchResponse.Clusters.EMPTY);
}
public void testCCSRemoteReduceMergeFails() throws Exception {
int numClusters = randomIntBetween(2, 10);
DiscoveryNode[] nodes = new DiscoveryNode[numClusters];
Map<String, OriginalIndices> remoteIndicesByCluster = new HashMap<>();
Settings.Builder builder = Settings.builder();
MockTransportService[] mockTransportServices = startTransport(numClusters, nodes, remoteIndicesByCluster, builder);
Settings settings = builder.build();
try {
try (MockTransportService service = MockTransportService.createNewService(settings, Version.CURRENT, threadPool, null)) {
service.start();
service.acceptIncomingRequests();
RemoteClusterService remoteClusterService = service.getRemoteClusterService();
{
final CountDownLatch latch = new CountDownLatch(1);
AtomicReference<Map<String, ClusterSearchShardsResponse>> response = new AtomicReference<>();
AtomicInteger skippedClusters = new AtomicInteger();
TransportSearchAction.collectSearchShards(IndicesOptions.lenientExpandOpen(), null, null, skippedClusters,
remoteIndicesByCluster, remoteClusterService, threadPool,
new LatchedActionListener<>(ActionListener.wrap(response::set, e -> fail("no failures expected")), latch));
awaitLatch(latch, 5, TimeUnit.SECONDS);
assertEquals(0, skippedClusters.get());
assertNotNull(response.get());
Map<String, ClusterSearchShardsResponse> map = response.get();
assertEquals(numClusters, map.size());
for (int i = 0; i < numClusters; i++) {
String clusterAlias = "remote" + i;
assertTrue(map.containsKey(clusterAlias));
ClusterSearchShardsResponse shardsResponse = map.get(clusterAlias);
assertEquals(1, shardsResponse.getNodes().length);
}
}
{
final CountDownLatch latch = new CountDownLatch(1);
AtomicReference<Exception> failure = new AtomicReference<>();
AtomicInteger skippedClusters = new AtomicInteger(0);
TransportSearchAction.collectSearchShards(IndicesOptions.lenientExpandOpen(), "index_not_found", null, skippedClusters,
remoteIndicesByCluster, remoteClusterService, threadPool,
new LatchedActionListener<>(ActionListener.wrap(r -> fail("no response expected"), failure::set), latch));
awaitLatch(latch, 5, TimeUnit.SECONDS);
assertEquals(0, skippedClusters.get());
assertNotNull(failure.get());
assertThat(failure.get(), instanceOf(RemoteTransportException.class));
RemoteTransportException remoteTransportException = (RemoteTransportException) failure.get();
assertEquals(RestStatus.NOT_FOUND, remoteTransportException.status());
}
int numDisconnectedClusters = randomIntBetween(1, numClusters);
Set<DiscoveryNode> disconnectedNodes = new HashSet<>(numDisconnectedClusters);
Set<Integer> disconnectedNodesIndices = new HashSet<>(numDisconnectedClusters);
while (disconnectedNodes.size() < numDisconnectedClusters) {
int i = randomIntBetween(0, numClusters - 1);
if (disconnectedNodes.add(nodes[i])) {
assertTrue(disconnectedNodesIndices.add(i));
}
}
CountDownLatch disconnectedLatch = new CountDownLatch(numDisconnectedClusters);
RemoteClusterServiceTests.addConnectionListener(remoteClusterService, new TransportConnectionListener() {
@Override
public void onNodeDisconnected(DiscoveryNode node) {
if (disconnectedNodes.remove(node)) {
disconnectedLatch.countDown();
}
}
});
for (DiscoveryNode disconnectedNode : disconnectedNodes) {
service.addFailToSendNoConnectRule(disconnectedNode.getAddress());
}
{
final CountDownLatch latch = new CountDownLatch(1);
AtomicInteger skippedClusters = new AtomicInteger(0);
AtomicReference<Exception> failure = new AtomicReference<>();
TransportSearchAction.collectSearchShards(IndicesOptions.lenientExpandOpen(), null, null, skippedClusters,
remoteIndicesByCluster, remoteClusterService, threadPool,
new LatchedActionListener<>(ActionListener.wrap(r -> fail("no response expected"), failure::set), latch));
awaitLatch(latch, 5, TimeUnit.SECONDS);
assertEquals(0, skippedClusters.get());
assertNotNull(failure.get());
assertThat(failure.get(), instanceOf(RemoteTransportException.class));
assertThat(failure.get().getMessage(), containsString("error while communicating with remote cluster ["));
assertThat(failure.get().getCause(), instanceOf(NodeDisconnectedException.class));
}
//setting skip_unavailable to true for all the disconnected clusters will make the request succeed again
for (int i : disconnectedNodesIndices) {
RemoteClusterServiceTests.updateSkipUnavailable(remoteClusterService, "remote" + i, true);
}
{
final CountDownLatch latch = new CountDownLatch(1);
AtomicInteger skippedClusters = new AtomicInteger(0);
AtomicReference<Map<String, ClusterSearchShardsResponse>> response = new AtomicReference<>();
TransportSearchAction.collectSearchShards(IndicesOptions.lenientExpandOpen(), null, null, skippedClusters,
remoteIndicesByCluster, remoteClusterService, threadPool,
new LatchedActionListener<>(ActionListener.wrap(response::set, e -> fail("no failures expected")), latch));
awaitLatch(latch, 5, TimeUnit.SECONDS);
assertNotNull(response.get());
Map<String, ClusterSearchShardsResponse> map = response.get();
assertEquals(numClusters - disconnectedNodesIndices.size(), map.size());
assertEquals(skippedClusters.get(), disconnectedNodesIndices.size());
for (int i = 0; i < numClusters; i++) {
String clusterAlias = "remote" + i;
if (disconnectedNodesIndices.contains(i)) {
assertFalse(map.containsKey(clusterAlias));
} else {
assertNotNull(map.get(clusterAlias));
}
}
}
//give transport service enough time to realize that the node is down, and to notify the connection listeners
//so that RemoteClusterConnection is left with no connected nodes, hence it will retry connecting the next time it is accessed
assertTrue(disconnectedLatch.await(5, TimeUnit.SECONDS));
service.clearAllRules();
if (randomBoolean()) {
for (int i : disconnectedNodesIndices) {
if (randomBoolean()) {
RemoteClusterServiceTests.updateSkipUnavailable(remoteClusterService, "remote" + i, true);
}
}
}
{
final CountDownLatch latch = new CountDownLatch(1);
AtomicInteger skippedClusters = new AtomicInteger(0);
AtomicReference<Map<String, ClusterSearchShardsResponse>> response = new AtomicReference<>();
TransportSearchAction.collectSearchShards(IndicesOptions.lenientExpandOpen(), null, null, skippedClusters,
remoteIndicesByCluster, remoteClusterService, threadPool,
new LatchedActionListener<>(ActionListener.wrap(response::set, e -> fail("no failures expected")), latch));
awaitLatch(latch, 5, TimeUnit.SECONDS);
assertEquals(0, skippedClusters.get());
assertNotNull(response.get());
Map<String, ClusterSearchShardsResponse> map = response.get();
assertEquals(numClusters, map.size());
for (int i = 0; i < numClusters; i++) {
String clusterAlias = "remote" + i;
assertTrue(map.containsKey(clusterAlias));
assertNotNull(map.get(clusterAlias));
}
}
assertEquals(0, service.getConnectionManager().size());
boolean local = randomBoolean();
OriginalIndices localIndices = local ? new OriginalIndices(new String[]{"index"}, SearchRequest.DEFAULT_INDICES_OPTIONS) : null;
TransportSearchAction.SearchTimeProvider timeProvider = new TransportSearchAction.SearchTimeProvider(0, 0, () -> 0);
Function<Boolean, InternalAggregation.ReduceContext> reduceContext = finalReduce -> null;
try (MockTransportService service = MockTransportService.createNewService(settings, Version.CURRENT, threadPool, null)) {
service.start();
service.acceptIncomingRequests();
RemoteClusterService remoteClusterService = service.getRemoteClusterService();
SearchRequest searchRequest = new SearchRequest();
searchRequest.preference("null_target");
final CountDownLatch latch = new CountDownLatch(1);
SetOnce<Tuple<SearchRequest, ActionListener<SearchResponse>>> setOnce = new SetOnce<>();
AtomicReference<Exception> failure = new AtomicReference<>();
LatchedActionListener<SearchResponse> listener = new LatchedActionListener<>(
ActionListener.wrap(r -> fail("no response expected"), failure::set), latch);
TransportSearchAction.ccsRemoteReduce(searchRequest, localIndices, remoteIndicesByCluster, timeProvider, reduceContext,
remoteClusterService, threadPool, listener, (r, l) -> setOnce.set(Tuple.tuple(r, l)));
if (localIndices == null) {
assertNull(setOnce.get());
} else {
Tuple<SearchRequest, ActionListener<SearchResponse>> tuple = setOnce.get();
assertEquals("", tuple.v1().getLocalClusterAlias());
assertThat(tuple.v2(), instanceOf(TransportSearchAction.CCSActionListener.class));
tuple.v2().onResponse(emptySearchResponse());
}
awaitLatch(latch, 5, TimeUnit.SECONDS);
assertNotNull(failure.get());
//the intention here is not to test that we throw NPE, rather to trigger a situation that makes
//SearchResponseMerger#getMergedResponse fail unexpectedly and verify that the listener is properly notified with the NPE
assertThat(failure.get(), instanceOf(NullPointerException.class));
assertEquals(0, service.getConnectionManager().size());
} finally {
for (MockTransportService mockTransportService : mockTransportServices) {
mockTransportService.close();
}
}
}
public void testCCSRemoteReduce() throws Exception {
int numClusters = randomIntBetween(2, 10);
DiscoveryNode[] nodes = new DiscoveryNode[numClusters];
Map<String, OriginalIndices> remoteIndicesByCluster = new HashMap<>();
Settings.Builder builder = Settings.builder();
MockTransportService[] mockTransportServices = startTransport(numClusters, nodes, remoteIndicesByCluster, builder);
Settings settings = builder.build();
boolean local = randomBoolean();
OriginalIndices localIndices = local ? new OriginalIndices(new String[]{"index"}, SearchRequest.DEFAULT_INDICES_OPTIONS) : null;
int totalClusters = numClusters + (local ? 1 : 0);
TransportSearchAction.SearchTimeProvider timeProvider = new TransportSearchAction.SearchTimeProvider(0, 0, () -> 0);
Function<Boolean, InternalAggregation.ReduceContext> reduceContext = finalReduce -> null;
try (MockTransportService service = MockTransportService.createNewService(settings, Version.CURRENT, threadPool, null)) {
service.start();
service.acceptIncomingRequests();
RemoteClusterService remoteClusterService = service.getRemoteClusterService();
{
SearchRequest searchRequest = new SearchRequest();
final CountDownLatch latch = new CountDownLatch(1);
SetOnce<Tuple<SearchRequest, ActionListener<SearchResponse>>> setOnce = new SetOnce<>();
AtomicReference<SearchResponse> response = new AtomicReference<>();
LatchedActionListener<SearchResponse> listener = new LatchedActionListener<>(
ActionListener.wrap(response::set, e -> fail("no failures expected")), latch);
TransportSearchAction.ccsRemoteReduce(searchRequest, localIndices, remoteIndicesByCluster, timeProvider, reduceContext,
remoteClusterService, threadPool, listener, (r, l) -> setOnce.set(Tuple.tuple(r, l)));
if (localIndices == null) {
assertNull(setOnce.get());
} else {
Tuple<SearchRequest, ActionListener<SearchResponse>> tuple = setOnce.get();
assertEquals("", tuple.v1().getLocalClusterAlias());
assertThat(tuple.v2(), instanceOf(TransportSearchAction.CCSActionListener.class));
tuple.v2().onResponse(emptySearchResponse());
}
awaitLatch(latch, 5, TimeUnit.SECONDS);
SearchResponse searchResponse = response.get();
assertEquals(0, searchResponse.getClusters().getSkipped());
assertEquals(totalClusters, searchResponse.getClusters().getTotal());
assertEquals(totalClusters, searchResponse.getClusters().getSuccessful());
assertEquals(totalClusters + 1, searchResponse.getNumReducePhases());
}
{
SearchRequest searchRequest = new SearchRequest();
searchRequest.preference("index_not_found");
final CountDownLatch latch = new CountDownLatch(1);
SetOnce<Tuple<SearchRequest, ActionListener<SearchResponse>>> setOnce = new SetOnce<>();
AtomicReference<Exception> failure = new AtomicReference<>();
LatchedActionListener<SearchResponse> listener = new LatchedActionListener<>(
ActionListener.wrap(r -> fail("no response expected"), failure::set), latch);
TransportSearchAction.ccsRemoteReduce(searchRequest, localIndices, remoteIndicesByCluster, timeProvider, reduceContext,
remoteClusterService, threadPool, listener, (r, l) -> setOnce.set(Tuple.tuple(r, l)));
if (localIndices == null) {
assertNull(setOnce.get());
} else {
Tuple<SearchRequest, ActionListener<SearchResponse>> tuple = setOnce.get();
assertEquals("", tuple.v1().getLocalClusterAlias());
assertThat(tuple.v2(), instanceOf(TransportSearchAction.CCSActionListener.class));
tuple.v2().onResponse(emptySearchResponse());
}
awaitLatch(latch, 5, TimeUnit.SECONDS);
assertNotNull(failure.get());
assertThat(failure.get(), instanceOf(RemoteTransportException.class));
RemoteTransportException remoteTransportException = (RemoteTransportException) failure.get();
assertEquals(RestStatus.NOT_FOUND, remoteTransportException.status());
}
int numDisconnectedClusters = randomIntBetween(1, numClusters);
Set<DiscoveryNode> disconnectedNodes = new HashSet<>(numDisconnectedClusters);
Set<Integer> disconnectedNodesIndices = new HashSet<>(numDisconnectedClusters);
while (disconnectedNodes.size() < numDisconnectedClusters) {
int i = randomIntBetween(0, numClusters - 1);
if (disconnectedNodes.add(nodes[i])) {
assertTrue(disconnectedNodesIndices.add(i));
}
}
CountDownLatch disconnectedLatch = new CountDownLatch(numDisconnectedClusters);
RemoteClusterServiceTests.addConnectionListener(remoteClusterService, new TransportConnectionListener() {
@Override
public void onNodeDisconnected(DiscoveryNode node) {
if (disconnectedNodes.remove(node)) {
disconnectedLatch.countDown();
}
}
});
for (DiscoveryNode disconnectedNode : disconnectedNodes) {
service.addFailToSendNoConnectRule(disconnectedNode.getAddress());
}
{
SearchRequest searchRequest = new SearchRequest();
final CountDownLatch latch = new CountDownLatch(1);
SetOnce<Tuple<SearchRequest, ActionListener<SearchResponse>>> setOnce = new SetOnce<>();
AtomicReference<Exception> failure = new AtomicReference<>();
LatchedActionListener<SearchResponse> listener = new LatchedActionListener<>(
ActionListener.wrap(r -> fail("no response expected"), failure::set), latch);
TransportSearchAction.ccsRemoteReduce(searchRequest, localIndices, remoteIndicesByCluster, timeProvider, reduceContext,
remoteClusterService, threadPool, listener, (r, l) -> setOnce.set(Tuple.tuple(r, l)));
if (localIndices == null) {
assertNull(setOnce.get());
} else {
Tuple<SearchRequest, ActionListener<SearchResponse>> tuple = setOnce.get();
assertEquals("", tuple.v1().getLocalClusterAlias());
assertThat(tuple.v2(), instanceOf(TransportSearchAction.CCSActionListener.class));
tuple.v2().onResponse(emptySearchResponse());
}
awaitLatch(latch, 5, TimeUnit.SECONDS);
assertNotNull(failure.get());
assertThat(failure.get(), instanceOf(RemoteTransportException.class));
RemoteTransportException remoteTransportException = (RemoteTransportException) failure.get();
assertThat(failure.get().getMessage(), containsString("error while communicating with remote cluster ["));
assertThat(failure.get().getCause(), instanceOf(NodeDisconnectedException.class));
}
//setting skip_unavailable to true for all the disconnected clusters will make the request succeed again
for (int i : disconnectedNodesIndices) {
RemoteClusterServiceTests.updateSkipUnavailable(remoteClusterService, "remote" + i, true);
}
{
SearchRequest searchRequest = new SearchRequest();
final CountDownLatch latch = new CountDownLatch(1);
SetOnce<Tuple<SearchRequest, ActionListener<SearchResponse>>> setOnce = new SetOnce<>();
AtomicReference<SearchResponse> response = new AtomicReference<>();
LatchedActionListener<SearchResponse> listener = new LatchedActionListener<>(
ActionListener.wrap(response::set, e -> fail("no failures expected")), latch);
TransportSearchAction.ccsRemoteReduce(searchRequest, localIndices, remoteIndicesByCluster, timeProvider, reduceContext,
remoteClusterService, threadPool, listener, (r, l) -> setOnce.set(Tuple.tuple(r, l)));
if (localIndices == null) {
assertNull(setOnce.get());
} else {
Tuple<SearchRequest, ActionListener<SearchResponse>> tuple = setOnce.get();
assertEquals("", tuple.v1().getLocalClusterAlias());
assertThat(tuple.v2(), instanceOf(TransportSearchAction.CCSActionListener.class));
tuple.v2().onResponse(emptySearchResponse());
}
awaitLatch(latch, 5, TimeUnit.SECONDS);
SearchResponse searchResponse = response.get();
assertEquals(disconnectedNodesIndices.size(), searchResponse.getClusters().getSkipped());
assertEquals(totalClusters, searchResponse.getClusters().getTotal());
int successful = totalClusters - disconnectedNodesIndices.size();
assertEquals(successful, searchResponse.getClusters().getSuccessful());
assertEquals(successful == 0 ? 0 : successful + 1, searchResponse.getNumReducePhases());
}
//give the transport service enough time to realize that the node is down and to notify the connection listeners,
//so that RemoteClusterConnection is left with no connected nodes and hence retries connecting the next time it is used
assertTrue(disconnectedLatch.await(5, TimeUnit.SECONDS));
service.clearAllRules();
if (randomBoolean()) {
for (int i : disconnectedNodesIndices) {
if (randomBoolean()) {
RemoteClusterServiceTests.updateSkipUnavailable(remoteClusterService, "remote" + i, true);
}
}
}
{
SearchRequest searchRequest = new SearchRequest();
final CountDownLatch latch = new CountDownLatch(1);
SetOnce<Tuple<SearchRequest, ActionListener<SearchResponse>>> setOnce = new SetOnce<>();
AtomicReference<SearchResponse> response = new AtomicReference<>();
LatchedActionListener<SearchResponse> listener = new LatchedActionListener<>(
ActionListener.wrap(response::set, e -> fail("no failures expected")), latch);
TransportSearchAction.ccsRemoteReduce(searchRequest, localIndices, remoteIndicesByCluster, timeProvider, reduceContext,
remoteClusterService, threadPool, listener, (r, l) -> setOnce.set(Tuple.tuple(r, l)));
if (localIndices == null) {
assertNull(setOnce.get());
} else {
Tuple<SearchRequest, ActionListener<SearchResponse>> tuple = setOnce.get();
assertEquals("", tuple.v1().getLocalClusterAlias());
assertThat(tuple.v2(), instanceOf(TransportSearchAction.CCSActionListener.class));
tuple.v2().onResponse(emptySearchResponse());
}
awaitLatch(latch, 5, TimeUnit.SECONDS);
SearchResponse searchResponse = response.get();
assertEquals(0, searchResponse.getClusters().getSkipped());
assertEquals(totalClusters, searchResponse.getClusters().getTotal());
assertEquals(totalClusters, searchResponse.getClusters().getSuccessful());
assertEquals(totalClusters + 1, searchResponse.getNumReducePhases());
}
assertEquals(0, service.getConnectionManager().size());
} finally {
for (MockTransportService mockTransportService : mockTransportServices) {
mockTransportService.close();
}
}
}
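The getNumReducePhases() assertions above (successful == 0 ? 0 : successful + 1, and totalClusters + 1 once all clusters succeed) follow from the execution model in the commit message: each successful remote cluster performs one non-final reduce, and the CCS coordinating node adds one final reduction. A minimal standalone sketch of that arithmetic (the method name is illustrative, not part of the real API):

```java
// Sketch: expected reduce-phase count for a CCS search that minimizes
// round-trips. Each successful remote cluster performs one non-final
// reduce; the CCS coordinating node then adds one final reduction.
final class ReducePhases {
    static int expected(int successfulClusters) {
        // no successful clusters means nothing was ever reduced
        return successfulClusters == 0 ? 0 : successfulClusters + 1;
    }

    public static void main(String[] args) {
        System.out.println(expected(0)); // 0
        System.out.println(expected(3)); // 4
    }
}
```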
public void testCollectSearchShards() throws Exception {
int numClusters = randomIntBetween(2, 10);
DiscoveryNode[] nodes = new DiscoveryNode[numClusters];
Map<String, OriginalIndices> remoteIndicesByCluster = new HashMap<>();
Settings.Builder builder = Settings.builder();
MockTransportService[] mockTransportServices = startTransport(numClusters, nodes, remoteIndicesByCluster, builder);
Settings settings = builder.build();
try (MockTransportService service = MockTransportService.createNewService(settings, Version.CURRENT, threadPool, null)) {
service.start();
service.acceptIncomingRequests();
RemoteClusterService remoteClusterService = service.getRemoteClusterService();
{
final CountDownLatch latch = new CountDownLatch(1);
AtomicReference<Map<String, ClusterSearchShardsResponse>> response = new AtomicReference<>();
AtomicInteger skippedClusters = new AtomicInteger();
TransportSearchAction.collectSearchShards(IndicesOptions.lenientExpandOpen(), null, null, skippedClusters,
remoteIndicesByCluster, remoteClusterService, threadPool,
new LatchedActionListener<>(ActionListener.wrap(response::set, e -> fail("no failures expected")), latch));
awaitLatch(latch, 5, TimeUnit.SECONDS);
assertEquals(0, skippedClusters.get());
assertNotNull(response.get());
Map<String, ClusterSearchShardsResponse> map = response.get();
assertEquals(numClusters, map.size());
for (int i = 0; i < numClusters; i++) {
String clusterAlias = "remote" + i;
assertTrue(map.containsKey(clusterAlias));
ClusterSearchShardsResponse shardsResponse = map.get(clusterAlias);
assertEquals(1, shardsResponse.getNodes().length);
}
}
{
final CountDownLatch latch = new CountDownLatch(1);
AtomicReference<Exception> failure = new AtomicReference<>();
AtomicInteger skippedClusters = new AtomicInteger(0);
TransportSearchAction.collectSearchShards(IndicesOptions.lenientExpandOpen(), "index_not_found", null, skippedClusters,
remoteIndicesByCluster, remoteClusterService, threadPool,
new LatchedActionListener<>(ActionListener.wrap(r -> fail("no response expected"), failure::set), latch));
awaitLatch(latch, 5, TimeUnit.SECONDS);
assertEquals(0, skippedClusters.get());
assertNotNull(failure.get());
assertThat(failure.get(), instanceOf(RemoteTransportException.class));
RemoteTransportException remoteTransportException = (RemoteTransportException) failure.get();
assertEquals(RestStatus.NOT_FOUND, remoteTransportException.status());
}
int numDisconnectedClusters = randomIntBetween(1, numClusters);
Set<DiscoveryNode> disconnectedNodes = new HashSet<>(numDisconnectedClusters);
Set<Integer> disconnectedNodesIndices = new HashSet<>(numDisconnectedClusters);
while (disconnectedNodes.size() < numDisconnectedClusters) {
int i = randomIntBetween(0, numClusters - 1);
if (disconnectedNodes.add(nodes[i])) {
assertTrue(disconnectedNodesIndices.add(i));
}
}
CountDownLatch disconnectedLatch = new CountDownLatch(numDisconnectedClusters);
RemoteClusterServiceTests.addConnectionListener(remoteClusterService, new TransportConnectionListener() {
@Override
public void onNodeDisconnected(DiscoveryNode node) {
if (disconnectedNodes.remove(node)) {
disconnectedLatch.countDown();
}
}
});
for (DiscoveryNode disconnectedNode : disconnectedNodes) {
service.addFailToSendNoConnectRule(disconnectedNode.getAddress());
}
{
final CountDownLatch latch = new CountDownLatch(1);
AtomicInteger skippedClusters = new AtomicInteger(0);
AtomicReference<Exception> failure = new AtomicReference<>();
TransportSearchAction.collectSearchShards(IndicesOptions.lenientExpandOpen(), null, null, skippedClusters,
remoteIndicesByCluster, remoteClusterService, threadPool,
new LatchedActionListener<>(ActionListener.wrap(r -> fail("no response expected"), failure::set), latch));
awaitLatch(latch, 5, TimeUnit.SECONDS);
assertEquals(0, skippedClusters.get());
assertNotNull(failure.get());
assertThat(failure.get(), instanceOf(RemoteTransportException.class));
assertThat(failure.get().getMessage(), containsString("error while communicating with remote cluster ["));
assertThat(failure.get().getCause(), instanceOf(NodeDisconnectedException.class));
}
//setting skip_unavailable to true for all the disconnected clusters will make the request succeed again
for (int i : disconnectedNodesIndices) {
RemoteClusterServiceTests.updateSkipUnavailable(remoteClusterService, "remote" + i, true);
}
{
final CountDownLatch latch = new CountDownLatch(1);
AtomicInteger skippedClusters = new AtomicInteger(0);
AtomicReference<Map<String, ClusterSearchShardsResponse>> response = new AtomicReference<>();
TransportSearchAction.collectSearchShards(IndicesOptions.lenientExpandOpen(), null, null, skippedClusters,
remoteIndicesByCluster, remoteClusterService, threadPool,
new LatchedActionListener<>(ActionListener.wrap(response::set, e -> fail("no failures expected")), latch));
awaitLatch(latch, 5, TimeUnit.SECONDS);
assertNotNull(response.get());
Map<String, ClusterSearchShardsResponse> map = response.get();
assertEquals(numClusters - disconnectedNodesIndices.size(), map.size());
assertEquals(skippedClusters.get(), disconnectedNodesIndices.size());
for (int i = 0; i < numClusters; i++) {
String clusterAlias = "remote" + i;
if (disconnectedNodesIndices.contains(i)) {
assertFalse(map.containsKey(clusterAlias));
} else {
assertNotNull(map.get(clusterAlias));
}
}
}
//give the transport service enough time to realize that the node is down and to notify the connection listeners,
//so that RemoteClusterConnection is left with no connected nodes and hence retries connecting the next time it is used
assertTrue(disconnectedLatch.await(5, TimeUnit.SECONDS));
service.clearAllRules();
if (randomBoolean()) {
for (int i : disconnectedNodesIndices) {
if (randomBoolean()) {
RemoteClusterServiceTests.updateSkipUnavailable(remoteClusterService, "remote" + i, true);
}
}
}
{
final CountDownLatch latch = new CountDownLatch(1);
AtomicInteger skippedClusters = new AtomicInteger(0);
AtomicReference<Map<String, ClusterSearchShardsResponse>> response = new AtomicReference<>();
TransportSearchAction.collectSearchShards(IndicesOptions.lenientExpandOpen(), null, null, skippedClusters,
remoteIndicesByCluster, remoteClusterService, threadPool,
new LatchedActionListener<>(ActionListener.wrap(response::set, e -> fail("no failures expected")), latch));
awaitLatch(latch, 5, TimeUnit.SECONDS);
assertEquals(0, skippedClusters.get());
assertNotNull(response.get());
Map<String, ClusterSearchShardsResponse> map = response.get();
assertEquals(numClusters, map.size());
for (int i = 0; i < numClusters; i++) {
String clusterAlias = "remote" + i;
assertTrue(map.containsKey(clusterAlias));
assertNotNull(map.get(clusterAlias));
}
}
assertEquals(0, service.getConnectionManager().size());
} finally {
for (MockTransportService mockTransportService : mockTransportServices) {
mockTransportService.close();
}
}
}
public void testCreateSearchResponseMerger() {
TransportSearchAction.SearchTimeProvider timeProvider = new TransportSearchAction.SearchTimeProvider(0, 0, () -> 0);
Function<Boolean, InternalAggregation.ReduceContext> reduceContext = flag -> null;
{
SearchSourceBuilder source = new SearchSourceBuilder();
assertEquals(-1, source.size());
assertEquals(-1, source.from());
assertNull(source.trackTotalHitsUpTo());
SearchResponseMerger merger = TransportSearchAction.createSearchResponseMerger(source, timeProvider, reduceContext);
assertEquals(0, merger.from);
assertEquals(10, merger.size);
assertEquals(SearchContext.DEFAULT_TRACK_TOTAL_HITS_UP_TO, merger.trackTotalHitsUpTo);
assertEquals(0, source.from());
assertEquals(10, source.size());
assertNull(source.trackTotalHitsUpTo());
}
{
SearchResponseMerger merger = TransportSearchAction.createSearchResponseMerger(null, timeProvider, reduceContext);
assertEquals(0, merger.from);
assertEquals(10, merger.size);
assertEquals(SearchContext.DEFAULT_TRACK_TOTAL_HITS_UP_TO, merger.trackTotalHitsUpTo);
}
{
SearchSourceBuilder source = new SearchSourceBuilder();
int originalFrom = randomIntBetween(0, 1000);
source.from(originalFrom);
int originalSize = randomIntBetween(0, 1000);
source.size(originalSize);
int trackTotalHitsUpTo = randomIntBetween(0, Integer.MAX_VALUE);
source.trackTotalHitsUpTo(trackTotalHitsUpTo);
SearchResponseMerger merger = TransportSearchAction.createSearchResponseMerger(source, timeProvider, reduceContext);
assertEquals(0, source.from());
assertEquals(originalFrom + originalSize, source.size());
assertEquals(trackTotalHitsUpTo, (int)source.trackTotalHitsUpTo());
assertEquals(originalFrom, merger.from);
assertEquals(originalSize, merger.size);
assertEquals(trackTotalHitsUpTo, merger.trackTotalHitsUpTo);
}
}
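The assertions in testCreateSearchResponseMerger above encode how the request window is rewritten for each remote cluster: every cluster is queried with from=0 and size=from+size, while the original from/size (and trackTotalHitsUpTo) are kept on the merger and applied only during the final merge on the CCS coordinating node. A self-contained sketch of that relation (class and field names are illustrative):

```java
// Sketch: relation between the user's from/size window and the window
// sent to each remote cluster. Each cluster must return its top
// from + size hits so the final merge can apply the original window.
final class CcsWindow {
    final int clusterFrom = 0; // each cluster searches from offset 0
    final int clusterSize;     // and returns the top from + size hits
    final int mergeFrom;       // original from, applied at the final merge
    final int mergeSize;       // original size, applied at the final merge

    CcsWindow(int from, int size) {
        this.clusterSize = from + size;
        this.mergeFrom = from;
        this.mergeSize = size;
    }

    public static void main(String[] args) {
        CcsWindow window = new CcsWindow(10, 5);
        System.out.println(window.clusterSize); // 15
    }
}
```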
public void testShouldMinimizeRoundtrips() throws Exception {
{
SearchRequest searchRequest = new SearchRequest();
assertTrue(TransportSearchAction.shouldMinimizeRoundtrips(searchRequest));
}
{
SearchRequest searchRequest = new SearchRequest();
searchRequest.source(new SearchSourceBuilder());
assertTrue(TransportSearchAction.shouldMinimizeRoundtrips(searchRequest));
}
{
SearchRequest searchRequest = new SearchRequest();
searchRequest.scroll("5s");
assertFalse(TransportSearchAction.shouldMinimizeRoundtrips(searchRequest));
}
{
SearchRequest searchRequest = new SearchRequest();
SearchSourceBuilder source = new SearchSourceBuilder();
searchRequest.source(source);
CollapseBuilder collapseBuilder = new CollapseBuilder("field");
source.collapse(collapseBuilder);
collapseBuilder.setInnerHits(new InnerHitBuilder("inner"));
assertFalse(TransportSearchAction.shouldMinimizeRoundtrips(searchRequest));
}
{
SearchRequestTests searchRequestTests = new SearchRequestTests();
searchRequestTests.setUp();
SearchRequest searchRequest = searchRequestTests.createSearchRequest();
searchRequest.scroll((Scroll)null);
SearchSourceBuilder source = searchRequest.source();
if (source != null) {
CollapseBuilder collapse = source.collapse();
if (collapse != null) {
collapse.setInnerHits(Collections.emptyList());
}
}
searchRequest.setCcsMinimizeRoundtrips(true);
assertTrue(TransportSearchAction.shouldMinimizeRoundtrips(searchRequest));
searchRequest.setCcsMinimizeRoundtrips(false);
assertFalse(TransportSearchAction.shouldMinimizeRoundtrips(searchRequest));
}
}
}
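testShouldMinimizeRoundtrips above enumerates when the new execution path applies: it is on by default, and disabled when a scroll is set, when inner hits are requested via field collapsing, or when the caller opts out with ccs_minimize_roundtrips=false. The rules can be sketched standalone as follows (a simplified stand-in for the real TransportSearchAction.shouldMinimizeRoundtrips(SearchRequest), using booleans instead of the request objects):

```java
// Sketch of the minimize-roundtrips decision: default on, disabled by
// scroll, by inner hits from field collapsing, or by explicit opt-out.
final class MinimizeRoundtrips {
    static boolean shouldMinimize(boolean ccsMinimizeRoundtrips,
                                  boolean hasScroll,
                                  boolean hasCollapseInnerHits) {
        return ccsMinimizeRoundtrips && hasScroll == false && hasCollapseInnerHits == false;
    }

    public static void main(String[] args) {
        System.out.println(shouldMinimize(true, false, false)); // true: default path
        System.out.println(shouldMinimize(true, true, false));  // false: scroll request
    }
}
```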


@ -813,7 +813,7 @@ public class BytesStreamsTests extends ESTestCase {
assertEquals(0, input.available());
}
private void assertEqualityAfterSerialize(TimeValue value, int expectedSize) throws IOException {
private static void assertEqualityAfterSerialize(TimeValue value, int expectedSize) throws IOException {
BytesStreamOutput out = new BytesStreamOutput();
out.writeTimeValue(value);
assertEquals(expectedSize, out.size());


@ -62,10 +62,10 @@ public class RemoteClusterClientTests extends ESTestCase {
ClusterStateResponse clusterStateResponse = client.admin().cluster().prepareState().execute().get();
assertNotNull(clusterStateResponse);
assertEquals("foo_bar_cluster", clusterStateResponse.getState().getClusterName().value());
// also test a failure, there is no handler for search registered
// also test a failure, there is no handler for scroll registered
ActionNotFoundTransportException ex = expectThrows(ActionNotFoundTransportException.class,
() -> client.prepareSearch().get());
assertEquals("No handler for action [indices:data/read/search]", ex.getMessage());
() -> client.prepareSearchScroll("").get());
assertEquals("No handler for action [indices:data/read/scroll]", ex.getMessage());
}
}
}


@ -18,6 +18,7 @@
*/
package org.elasticsearch.transport;
import org.apache.lucene.search.TotalHits;
import org.apache.lucene.store.AlreadyClosedException;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
@ -29,6 +30,10 @@ import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsResponse
import org.elasticsearch.action.admin.cluster.state.ClusterStateAction;
import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest;
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
import org.elasticsearch.action.search.SearchAction;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.ShardSearchFailure;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.node.DiscoveryNode;
@ -47,6 +52,10 @@ import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.mocksocket.MockServerSocket;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.aggregations.InternalAggregations;
import org.elasticsearch.search.internal.InternalSearchResponse;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.VersionUtils;
import org.elasticsearch.test.junit.annotations.TestLogging;
@ -130,6 +139,24 @@ public class RemoteClusterConnectionTests extends ESTestCase {
knownNodes.toArray(new DiscoveryNode[0]), Collections.emptyMap()));
}
});
newService.registerRequestHandler(SearchAction.NAME, ThreadPool.Names.SAME, SearchRequest::new,
(request, channel, task) -> {
if ("index_not_found".equals(request.preference())) {
channel.sendResponse(new IndexNotFoundException("index"));
return;
}
SearchHits searchHits;
if ("null_target".equals(request.preference())) {
searchHits = new SearchHits(new SearchHit[] {new SearchHit(0)}, new TotalHits(1, TotalHits.Relation.EQUAL_TO), 1F);
} else {
searchHits = new SearchHits(new SearchHit[0], new TotalHits(0, TotalHits.Relation.EQUAL_TO), Float.NaN);
}
InternalSearchResponse response = new InternalSearchResponse(searchHits,
InternalAggregations.EMPTY, null, null, false, null, 1);
SearchResponse searchResponse = new SearchResponse(response, null, 1, 1, 0, 100, ShardSearchFailure.EMPTY_ARRAY,
SearchResponse.Clusters.EMPTY);
channel.sendResponse(searchResponse);
});
newService.registerRequestHandler(ClusterStateAction.NAME, ThreadPool.Names.SAME, ClusterStateRequest::new,
(request, channel, task) -> {
DiscoveryNodes.Builder builder = DiscoveryNodes.builder();


@ -84,18 +84,11 @@ public class RandomSearchRequestGenerator {
* {@link #randomSearchSourceBuilder(Supplier, Supplier, Supplier, Supplier, Supplier)}.
*/
public static SearchRequest randomSearchRequest(Supplier<SearchSourceBuilder> randomSearchSourceBuilder) {
return randomSearchRequest(new SearchRequest(), randomSearchSourceBuilder);
}
/**
* Set random fields to the provided search request.
*
* @param searchRequest the search request
* @param randomSearchSourceBuilder builds a random {@link SearchSourceBuilder}. You can use
* {@link #randomSearchSourceBuilder(Supplier, Supplier, Supplier, Supplier, Supplier)}.
*/
public static SearchRequest randomSearchRequest(SearchRequest searchRequest, Supplier<SearchSourceBuilder> randomSearchSourceBuilder) {
SearchRequest searchRequest = new SearchRequest();
searchRequest.allowPartialSearchResults(true);
if (randomBoolean()) {
searchRequest.setCcsMinimizeRoundtrips(randomBoolean());
}
if (randomBoolean()) {
searchRequest.indices(generateRandomStringArray(10, 10, false, false));
}


@ -97,6 +97,9 @@ teardown:
terms:
field: f1.keyword
- match: {_clusters.total: 2}
- match: {_clusters.successful: 2}
- match: {_clusters.skipped: 0}
- match: { _shards.total: 5 }
- match: { hits.total: 6}
- match: { hits.hits.0._index: "my_remote_cluster:test_index"}
@ -115,6 +118,9 @@ teardown:
terms:
field: f1.keyword
- match: {_clusters.total: 1}
- match: {_clusters.successful: 1}
- match: {_clusters.skipped: 0}
- match: { _shards.total: 3 }
- match: { hits.total: 6}
- match: { hits.hits.0._index: "my_remote_cluster:test_index"}
@ -134,6 +140,9 @@ teardown:
terms:
field: f1.keyword
- match: {_clusters.total: 1}
- match: {_clusters.successful: 1}
- match: {_clusters.skipped: 0}
- match: { _shards.total: 3 }
- match: { hits.total: 6}
- match: { hits.hits.0._index: "my_remote_cluster:test_index"}
@ -152,6 +161,7 @@ teardown:
terms:
field: f1.keyword
- is_false: _clusters
- match: { _shards.total: 2 }
- match: { hits.total: 5}
- match: { hits.hits.0._index: "local_index"}
@ -182,6 +192,9 @@ teardown:
rest_total_hits_as_int: true
index: test_remote_cluster:test_index
- match: {_clusters.total: 1}
- match: {_clusters.successful: 1}
- match: {_clusters.skipped: 0}
- match: { _shards.total: 3 }
- match: { hits.total: 6 }
- match: { hits.hits.0._index: "test_remote_cluster:test_index" }
@ -193,6 +206,9 @@ teardown:
rest_total_hits_as_int: true
index: "*_remote_cluster:test_ind*"
- match: {_clusters.total: 2}
- match: {_clusters.successful: 2}
- match: {_clusters.skipped: 0}
- match: { _shards.total: 6 }
- match: { hits.total: 12 }
@ -205,6 +221,9 @@ teardown:
rest_total_hits_as_int: true
index: my_remote_cluster:aliased_test_index
- match: {_clusters.total: 1}
- match: {_clusters.successful: 1}
- match: {_clusters.skipped: 0}
- match: { _shards.total: 3 }
- match: { hits.total: 2 }
- match: { hits.hits.0._source.filter_field: 1 }
@ -219,6 +238,9 @@ teardown:
rest_total_hits_as_int: true
index: my_remote_cluster:secure_alias # TODO make this a wildcard once
- match: {_clusters.total: 1}
- match: {_clusters.successful: 1}
- match: {_clusters.skipped: 0}
- match: { _shards.total: 2 }
- match: { hits.total: 1 }
- is_true: hits.hits.0._source.secure


@ -52,6 +52,9 @@ teardown:
query:
match_all: {}
- match: {_clusters.total: 1}
- match: {_clusters.successful: 1}
- match: {_clusters.skipped: 0}
- set: {_scroll_id: scroll_id}
- match: {hits.total: 6 }
- length: {hits.hits: 4 }
@ -66,6 +69,7 @@ teardown:
rest_total_hits_as_int: true
body: { "scroll_id": "$scroll_id", "scroll": "1m"}
- is_false: _clusters
- match: {hits.total: 6 }
- length: {hits.hits: 2 }
- match: {hits.hits.0._source.filter_field: 1 }
@ -100,6 +104,9 @@ teardown:
query:
match_all: {}
- match: {_clusters.total: 1}
- match: {_clusters.successful: 1}
- match: {_clusters.skipped: 0}
- set: {_scroll_id: scroll_id}
- match: {hits.total: 6 }
- length: {hits.hits: 4 }


@ -66,6 +66,7 @@ teardown:
- do:
headers: { Authorization: "Basic am9lOnMza3JpdA==" }
search:
ccs_minimize_roundtrips: false
rest_total_hits_as_int: true
index: "skip_shards_index,my_remote_cluster:single_doc_index"
pre_filter_shard_size: 1
@ -83,6 +84,7 @@ teardown:
- do:
headers: { Authorization: "Basic am9lOnMza3JpdA==" }
search:
ccs_minimize_roundtrips: false
rest_total_hits_as_int: true
index: "skip_shards_index,my_remote_cluster:single_doc_index"
pre_filter_shard_size: 1