diff --git a/server/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java b/server/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java index f031dfa5810..d6abbf73e88 100644 --- a/server/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java +++ b/server/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java @@ -26,6 +26,7 @@ import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ShardOperationFailedException; +import org.elasticsearch.action.search.TransportSearchAction.SearchTimeProvider; import org.elasticsearch.action.support.TransportActions; import org.elasticsearch.cluster.routing.GroupShardsIterator; import org.elasticsearch.common.Nullable; @@ -43,7 +44,6 @@ import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.Executor; -import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.BiFunction; import java.util.stream.Collectors; @@ -70,7 +70,7 @@ abstract class AbstractSearchAsyncAction exten private final Object shardFailuresMutex = new Object(); private final AtomicInteger successfulOps = new AtomicInteger(); private final AtomicInteger skippedOps = new AtomicInteger(); - private final TransportSearchAction.SearchTimeProvider timeProvider; + private final SearchTimeProvider timeProvider; private final SearchResponse.Clusters clusters; AbstractSearchAsyncAction(String name, Logger logger, SearchTransportService searchTransportService, @@ -79,7 +79,7 @@ abstract class AbstractSearchAsyncAction exten Map> indexRoutings, Executor executor, SearchRequest request, ActionListener listener, GroupShardsIterator shardsIts, - TransportSearchAction.SearchTimeProvider timeProvider, long clusterStateVersion, + SearchTimeProvider timeProvider, long clusterStateVersion, SearchTask task, SearchPhaseResults resultConsumer, int maxConcurrentRequestsPerNode, SearchResponse.Clusters clusters) { super(name, request, shardsIts, logger, maxConcurrentRequestsPerNode, executor); @@ -103,8 +103,7 @@ abstract class AbstractSearchAsyncAction exten * Builds how long it took to execute the search. */ long buildTookInMillis() { - return TimeUnit.NANOSECONDS.toMillis( - timeProvider.getRelativeCurrentNanos() - timeProvider.getRelativeStartNanos()); + return timeProvider.buildTookInMillis(); } /** diff --git a/server/src/main/java/org/elasticsearch/action/search/SearchPhaseController.java b/server/src/main/java/org/elasticsearch/action/search/SearchPhaseController.java index 418d95b2077..027d9d5f10c 100644 --- a/server/src/main/java/org/elasticsearch/action/search/SearchPhaseController.java +++ b/server/src/main/java/org/elasticsearch/action/search/SearchPhaseController.java @@ -170,7 +170,7 @@ public final class SearchPhaseController { if (queryResult.hasConsumedTopDocs() == false) { // already consumed? final TopDocsAndMaxScore td = queryResult.consumeTopDocs(); assert td != null; - topDocsStats.add(td); + topDocsStats.add(td, queryResult.searchTimedOut(), queryResult.terminatedEarly()); // make sure we set the shard index before we add it - the consumer didn't do that yet if (td.topDocs.scoreDocs.length > 0) { setShardIndex(td.topDocs, queryResult.getShardIndex()); @@ -439,12 +439,10 @@ public final class SearchPhaseController { boolean performFinalReduce) { assert numReducePhases >= 0 : "num reduce phases must be >= 0 but was: " + numReducePhases; numReducePhases++; // increment for this phase - boolean timedOut = false; - Boolean terminatedEarly = null; if (queryResults.isEmpty()) { // early terminate we have nothing to reduce final TotalHits totalHits = topDocsStats.getTotalHits(); - return new ReducedQueryPhase(totalHits, topDocsStats.fetchHits, topDocsStats.maxScore, - timedOut, terminatedEarly, null, null, null, SortedTopDocs.EMPTY, null, numReducePhases, 0, 0, true); + return new ReducedQueryPhase(totalHits, topDocsStats.fetchHits, topDocsStats.getMaxScore(), + false, null, null, null, null, SortedTopDocs.EMPTY, null, numReducePhases, 0, 0, true); } final QuerySearchResult firstResult = queryResults.stream().findFirst().get().queryResult(); final boolean hasSuggest = firstResult.suggest() != null; @@ -476,16 +474,6 @@ public final class SearchPhaseController { QuerySearchResult result = entry.queryResult(); from = result.from(); size = result.size(); - if (result.searchTimedOut()) { - timedOut = true; - } - if (result.terminatedEarly() != null) { - if (terminatedEarly == null) { - terminatedEarly = result.terminatedEarly(); - } else if (result.terminatedEarly()) { - terminatedEarly = true; - } - } if (hasSuggest) { assert result.suggest() != null; for (Suggestion> suggestion : result.suggest()) { @@ -508,8 +496,8 @@ public final class SearchPhaseController { final SearchProfileShardResults shardResults = profileResults.isEmpty() ? null : new SearchProfileShardResults(profileResults); final SortedTopDocs sortedTopDocs = sortDocs(isScrollRequest, queryResults, bufferedTopDocs, topDocsStats, from, size); final TotalHits totalHits = topDocsStats.getTotalHits(); - return new ReducedQueryPhase(totalHits, topDocsStats.fetchHits, topDocsStats.maxScore, - timedOut, terminatedEarly, suggest, aggregations, shardResults, sortedTopDocs, + return new ReducedQueryPhase(totalHits, topDocsStats.fetchHits, topDocsStats.getMaxScore(), + topDocsStats.timedOut, topDocsStats.terminatedEarly, suggest, aggregations, shardResults, sortedTopDocs, firstResult.sortValueFormats(), numReducePhases, size, from, false); } @@ -577,11 +565,7 @@ public final class SearchPhaseController { } this.totalHits = totalHits; this.fetchHits = fetchHits; - if (Float.isInfinite(maxScore)) { - this.maxScore = Float.NaN; - } else { - this.maxScore = maxScore; - } + this.maxScore = maxScore; this.timedOut = timedOut; this.terminatedEarly = terminatedEarly; this.suggest = suggest; @@ -682,7 +666,7 @@ public final class SearchPhaseController { } if (hasTopDocs) { final TopDocsAndMaxScore topDocs = querySearchResult.consumeTopDocs(); // can't be null - topDocsStats.add(topDocs); + topDocsStats.add(topDocs, querySearchResult.searchTimedOut(), querySearchResult.terminatedEarly()); setShardIndex(topDocs.topDocs, querySearchResult.getShardIndex()); topDocsBuffer[i] = topDocs.topDocs; } @@ -744,11 +728,9 @@ public final class SearchPhaseController { private long totalHits; private TotalHits.Relation totalHitsRelation; long fetchHits; - float maxScore = Float.NEGATIVE_INFINITY; - - TopDocsStats() { - this(SearchContext.TRACK_TOTAL_HITS_ACCURATE); - } + private float maxScore = Float.NEGATIVE_INFINITY; + boolean timedOut; + Boolean terminatedEarly; TopDocsStats(int trackTotalHitsUpTo) { this.trackTotalHitsUpTo = trackTotalHitsUpTo; @@ -756,6 +738,10 @@ public final class SearchPhaseController { this.totalHitsRelation = Relation.EQUAL_TO; } + float getMaxScore() { + return Float.isInfinite(maxScore) ? Float.NaN : maxScore; + } + TotalHits getTotalHits() { if (trackTotalHitsUpTo == SearchContext.TRACK_TOTAL_HITS_DISABLED) { return null; @@ -766,7 +752,7 @@ public final class SearchPhaseController { if (totalHits < trackTotalHitsUpTo) { return new TotalHits(totalHits, totalHitsRelation); } else { - /** + /* * The user requested to count the total hits up to trackTotalHitsUpTo * so we return this lower bound when the total hits is greater than this value. * This can happen when multiple shards are merged since the limit to track total hits @@ -777,7 +763,7 @@ public final class SearchPhaseController { } } - void add(TopDocsAndMaxScore topDocs) { + void add(TopDocsAndMaxScore topDocs, boolean timedOut, Boolean terminatedEarly) { if (trackTotalHitsUpTo != SearchContext.TRACK_TOTAL_HITS_DISABLED) { totalHits += topDocs.topDocs.totalHits.value; if (topDocs.topDocs.totalHits.relation == Relation.GREATER_THAN_OR_EQUAL_TO) { @@ -788,6 +774,16 @@ public final class SearchPhaseController { if (!Float.isNaN(topDocs.maxScore)) { maxScore = Math.max(maxScore, topDocs.maxScore); } + if (timedOut) { + this.timedOut = true; + } + if (terminatedEarly != null) { + if (this.terminatedEarly == null) { + this.terminatedEarly = terminatedEarly; + } else if (terminatedEarly) { + this.terminatedEarly = true; + } + } } } diff --git a/server/src/main/java/org/elasticsearch/action/search/SearchResponseMerger.java b/server/src/main/java/org/elasticsearch/action/search/SearchResponseMerger.java new file mode 100644 index 00000000000..b146d42c0d2 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/action/search/SearchResponseMerger.java @@ -0,0 +1,297 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.action.search; + +import org.apache.lucene.search.FieldDoc; +import org.apache.lucene.search.ScoreDoc; +import org.apache.lucene.search.SortField; +import org.apache.lucene.search.TopDocs; +import org.apache.lucene.search.TopFieldDocs; +import org.apache.lucene.search.TotalHits; +import org.apache.lucene.search.grouping.CollapseTopFieldDocs; +import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.action.search.TransportSearchAction.SearchTimeProvider; +import org.elasticsearch.common.lucene.search.TopDocsAndMaxScore; +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.search.SearchHit; +import org.elasticsearch.search.SearchHits; +import org.elasticsearch.search.SearchShardTarget; +import org.elasticsearch.search.aggregations.InternalAggregation.ReduceContext; +import org.elasticsearch.search.aggregations.InternalAggregations; +import org.elasticsearch.search.internal.InternalSearchResponse; +import org.elasticsearch.search.profile.ProfileShardResult; +import org.elasticsearch.search.profile.SearchProfileShardResults; +import org.elasticsearch.search.suggest.Suggest; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.TreeMap; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.function.Function; + +import static org.elasticsearch.action.search.SearchPhaseController.TopDocsStats; +import static org.elasticsearch.action.search.SearchPhaseController.mergeTopDocs; +import static org.elasticsearch.action.search.SearchResponse.Clusters; + +/** + * Merges multiple search responses into one. Used in cross-cluster search when reduction is performed locally on each cluster. + * The CCS coordinating node sends one search request per remote cluster involved and gets one search response back from each one of them. + * Such responses contain all the info to be able to perform an additional reduction and return results back to the user. + * Preconditions are that only non final reduction has been performed on each cluster, meaning that buckets have not been pruned locally + * and pipeline aggregations have not yet been executed. Also, from+size search hits need to be requested to each cluster and such results + * have all already been fetched downstream. + * This approach consists of a different trade-off compared to ordinary cross-cluster search where we fan out to all the shards, no matter + * whether they belong to the local or the remote cluster. Assuming that there commonly is network latency when communicating with remote + * clusters, limiting the number of requests to one per cluster is beneficial, and outweighs the downside of fetching many more hits than + * needed downstream and returning bigger responses to the coordinating node. + * Known limitations: + * - scroll requests are not supported + * - field collapsing is supported, but whenever inner_hits are requested, they will be retrieved by each cluster locally after the fetch + * phase, through the {@link ExpandSearchPhase}. Such inner_hits are not merged together as part of hits reduction. + */ +//TODO it may make sense to integrate the remote clusters responses as a shard response in the initial search phase and ignore hits coming +//from the remote clusters in the fetch phase. This would be identical to the removed QueryAndFetch strategy except that only the remote +//cluster response would have the fetch results. +final class SearchResponseMerger { + private final int from; + private final int size; + private final int trackTotalHitsUpTo; + private final SearchTimeProvider searchTimeProvider; + private final Function reduceContextFunction; + private final List searchResponses = new CopyOnWriteArrayList<>(); + + SearchResponseMerger(int from, int size, int trackTotalHitsUpTo, SearchTimeProvider searchTimeProvider, + Function reduceContextFunction) { + this.from = from; + this.size = size; + this.trackTotalHitsUpTo = trackTotalHitsUpTo; + this.searchTimeProvider = Objects.requireNonNull(searchTimeProvider); + this.reduceContextFunction = Objects.requireNonNull(reduceContextFunction); + } + + /** + * Add a search response to the list of responses to be merged together into one. + * Merges currently happen at once when all responses are available and {@link #getMergedResponse(Clusters)} )} is called. + * That may change in the future as it's possible to introduce incremental merges as responses come in if necessary. + */ + void add(SearchResponse searchResponse) { + searchResponses.add(searchResponse); + } + + /** + * Returns the merged response. To be called once all responses have been added through {@link #add(SearchResponse)} + * so that all responses are merged into a single one. + */ + SearchResponse getMergedResponse(Clusters clusters) { + assert searchResponses.size() > 1; + int totalShards = 0; + int skippedShards = 0; + int successfulShards = 0; + //the current reduce phase counts as one + int numReducePhases = 1; + List failures = new ArrayList<>(); + Map profileResults = new HashMap<>(); + List aggs = new ArrayList<>(); + Map shards = new TreeMap<>(); + List topDocsList = new ArrayList<>(searchResponses.size()); + Map> groupedSuggestions = new HashMap<>(); + Boolean trackTotalHits = null; + + TopDocsStats topDocsStats = new TopDocsStats(trackTotalHitsUpTo); + + for (SearchResponse searchResponse : searchResponses) { + totalShards += searchResponse.getTotalShards(); + skippedShards += searchResponse.getSkippedShards(); + successfulShards += searchResponse.getSuccessfulShards(); + numReducePhases += searchResponse.getNumReducePhases(); + + Collections.addAll(failures, searchResponse.getShardFailures()); + + profileResults.putAll(searchResponse.getProfileResults()); + + if (searchResponse.getAggregations() != null) { + InternalAggregations internalAggs = (InternalAggregations) searchResponse.getAggregations(); + aggs.add(internalAggs); + } + + Suggest suggest = searchResponse.getSuggest(); + if (suggest != null) { + for (Suggest.Suggestion> entries : suggest) { + List suggestionList = groupedSuggestions.computeIfAbsent(entries.getName(), s -> new ArrayList<>()); + suggestionList.add(entries); + } + } + + SearchHits searchHits = searchResponse.getHits(); + + final TotalHits totalHits; + if (searchHits.getTotalHits() == null) { + //in case we didn't track total hits, we get null from each cluster, but we need to set 0 eq to the TopDocs + totalHits = new TotalHits(0, TotalHits.Relation.EQUAL_TO); + assert trackTotalHits == null || trackTotalHits == false; + trackTotalHits = false; + } else { + totalHits = searchHits.getTotalHits(); + assert trackTotalHits == null || trackTotalHits; + trackTotalHits = true; + } + TopDocs topDocs = searchHitsToTopDocs(searchHits, totalHits, shards); + topDocsStats.add(new TopDocsAndMaxScore(topDocs, searchHits.getMaxScore()), + searchResponse.isTimedOut(), searchResponse.isTerminatedEarly()); + topDocsList.add(topDocs); + } + + //after going through all the hits and collecting all their distinct shards, we can assign shardIndex and set it to the ScoreDocs + setShardIndex(shards, topDocsList); + TopDocs topDocs = mergeTopDocs(topDocsList, size, from); + SearchHits mergedSearchHits = topDocsToSearchHits(topDocs, topDocsStats); + Suggest suggest = groupedSuggestions.isEmpty() ? null : new Suggest(Suggest.reduce(groupedSuggestions)); + InternalAggregations reducedAggs = InternalAggregations.reduce(aggs, reduceContextFunction.apply(true)); + ShardSearchFailure[] shardFailures = failures.toArray(ShardSearchFailure.EMPTY_ARRAY); + //make failures ordering consistent with ordinary search and CCS + Arrays.sort(shardFailures, FAILURES_COMPARATOR); + InternalSearchResponse response = new InternalSearchResponse(mergedSearchHits, reducedAggs, suggest, + new SearchProfileShardResults(profileResults), topDocsStats.timedOut, topDocsStats.terminatedEarly, numReducePhases); + long tookInMillis = searchTimeProvider.buildTookInMillis(); + return new SearchResponse(response, null, totalShards, successfulShards, skippedShards, tookInMillis, shardFailures, clusters); + } + + private static final Comparator FAILURES_COMPARATOR = new Comparator() { + @Override + public int compare(ShardSearchFailure o1, ShardSearchFailure o2) { + ShardId shardId1 = extractShardId(o1); + ShardId shardId2 = extractShardId(o2); + if (shardId1 == null && shardId2 == null) { + return 0; + } + if (shardId1 == null) { + return -1; + } + if (shardId2 == null) { + return 1; + } + return shardId1.compareTo(shardId2); + } + + private ShardId extractShardId(ShardSearchFailure failure) { + SearchShardTarget shard = failure.shard(); + if (shard != null) { + return shard.getShardId(); + } + Throwable cause = failure.getCause(); + if (cause instanceof ElasticsearchException) { + ElasticsearchException e = (ElasticsearchException) cause; + return e.getShardId(); + } + return null; + } + }; + + private static TopDocs searchHitsToTopDocs(SearchHits searchHits, TotalHits totalHits, Map shards) { + SearchHit[] hits = searchHits.getHits(); + ScoreDoc[] scoreDocs = new ScoreDoc[hits.length]; + final TopDocs topDocs; + if (searchHits.getSortFields() != null) { + if (searchHits.getCollapseField() != null) { + assert searchHits.getCollapseValues() != null; + topDocs = new CollapseTopFieldDocs(searchHits.getCollapseField(), totalHits, scoreDocs, + searchHits.getSortFields(), searchHits.getCollapseValues()); + } else { + topDocs = new TopFieldDocs(totalHits, scoreDocs, searchHits.getSortFields()); + } + } else { + topDocs = new TopDocs(totalHits, scoreDocs); + } + + for (int i = 0; i < hits.length; i++) { + SearchHit hit = hits[i]; + ShardId shardId = hit.getShard().getShardId(); + shards.putIfAbsent(shardId, null); + final SortField[] sortFields = searchHits.getSortFields(); + final Object[] sortValues; + if (sortFields == null) { + sortValues = null; + } else { + if (sortFields.length == 1 && sortFields[0].getType() == SortField.Type.SCORE) { + sortValues = new Object[]{hit.getScore()}; + } else { + sortValues = hit.getRawSortValues(); + } + } + scoreDocs[i] = new FieldDocAndSearchHit(hit.docId(), hit.getScore(), sortValues, hit); + } + return topDocs; + } + + private static void setShardIndex(Map shards, List topDocsList) { + int shardIndex = 0; + for (Map.Entry shard : shards.entrySet()) { + shard.setValue(shardIndex++); + } + //and go through all the scoreDocs from each cluster and set their corresponding shardIndex + for (TopDocs topDocs : topDocsList) { + for (ScoreDoc scoreDoc : topDocs.scoreDocs) { + FieldDocAndSearchHit fieldDocAndSearchHit = (FieldDocAndSearchHit) scoreDoc; + //When hits come from the indices with same names on multiple clusters and same shard identifier, we rely on such indices + //to have a different uuid across multiple clusters. That's how they will get a different shardIndex. + ShardId shardId = fieldDocAndSearchHit.searchHit.getShard().getShardId(); + fieldDocAndSearchHit.shardIndex = shards.get(shardId); + } + } + } + + private static SearchHits topDocsToSearchHits(TopDocs topDocs, TopDocsStats topDocsStats) { + SearchHit[] searchHits = new SearchHit[topDocs.scoreDocs.length]; + for (int i = 0; i < topDocs.scoreDocs.length; i++) { + FieldDocAndSearchHit scoreDoc = (FieldDocAndSearchHit)topDocs.scoreDocs[i]; + searchHits[i] = scoreDoc.searchHit; + } + + SortField[] sortFields = null; + String collapseField = null; + Object[] collapseValues = null; + if (topDocs instanceof TopFieldDocs) { + sortFields = ((TopFieldDocs)topDocs).fields; + if (topDocs instanceof CollapseTopFieldDocs) { + CollapseTopFieldDocs collapseTopFieldDocs = (CollapseTopFieldDocs)topDocs; + collapseField = collapseTopFieldDocs.field; + collapseValues = collapseTopFieldDocs.collapseValues; + } + } + return new SearchHits(searchHits, topDocsStats.getTotalHits(), topDocsStats.getMaxScore(), + sortFields, collapseField, collapseValues); + } + + private static final class FieldDocAndSearchHit extends FieldDoc { + private final SearchHit searchHit; + + //to simplify things, we use a FieldDoc all the time, even when only a ScoreDoc is needed, in which case fields are null. + FieldDocAndSearchHit(int doc, float score, Object[] fields, SearchHit searchHit) { + super(doc, score, fields); + this.searchHit = searchHit; + } + } +} diff --git a/server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java b/server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java index 88e2764982c..3f03c521df5 100644 --- a/server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java +++ b/server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java @@ -61,6 +61,7 @@ import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.Executor; +import java.util.concurrent.TimeUnit; import java.util.function.BiFunction; import java.util.function.Function; import java.util.function.LongSupplier; @@ -140,7 +141,7 @@ public class TransportSearchAction extends HandledTransportAction suggestion : reducedSuggest(results)) { int suggestionSize = suggestion.getEntries().get(0).getOptions().size(); accumulatedLength += suggestionSize; @@ -126,12 +125,12 @@ public class SearchPhaseControllerTests extends ESTestCase { from = first.get().queryResult().from(); size = first.get().queryResult().size(); } - SearchPhaseController.TopDocsStats topDocsStats = new SearchPhaseController.TopDocsStats(); + SearchPhaseController.TopDocsStats topDocsStats = new SearchPhaseController.TopDocsStats(SearchContext.TRACK_TOTAL_HITS_ACCURATE); ScoreDoc[] sortedDocs = SearchPhaseController.sortDocs(ignoreFrom, results.asList(), null, topDocsStats, from, size).scoreDocs; results = generateSeededQueryResults(randomSeed, nShards, Collections.emptyList(), queryResultSize, useConstantScore); - SearchPhaseController.TopDocsStats topDocsStats2 = new SearchPhaseController.TopDocsStats(); + SearchPhaseController.TopDocsStats topDocsStats2 = new SearchPhaseController.TopDocsStats(SearchContext.TRACK_TOTAL_HITS_ACCURATE); ScoreDoc[] sortedDocs2 = SearchPhaseController.sortDocs(ignoreFrom, results.asList(), null, topDocsStats2, from, size).scoreDocs; assertEquals(sortedDocs.length, sortedDocs2.length); for (int i = 0; i < sortedDocs.length; i++) { @@ -139,7 +138,7 @@ public class SearchPhaseControllerTests extends ESTestCase { assertEquals(sortedDocs[i].shardIndex, sortedDocs2[i].shardIndex); assertEquals(sortedDocs[i].score, sortedDocs2[i].score, 0.0f); } - assertEquals(topDocsStats.maxScore, topDocsStats2.maxScore, 0.0f); + assertEquals(topDocsStats.getMaxScore(), topDocsStats2.getMaxScore(), 0.0f); assertEquals(topDocsStats.getTotalHits().value, topDocsStats2.getTotalHits().value); assertEquals(topDocsStats.getTotalHits().relation, topDocsStats2.getTotalHits().relation); assertEquals(topDocsStats.fetchHits, topDocsStats2.fetchHits); diff --git a/server/src/test/java/org/elasticsearch/action/search/SearchResponseMergerTests.java b/server/src/test/java/org/elasticsearch/action/search/SearchResponseMergerTests.java new file mode 100644 index 00000000000..d02b712eaae --- /dev/null +++ b/server/src/test/java/org/elasticsearch/action/search/SearchResponseMergerTests.java @@ -0,0 +1,561 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.action.search; + +import org.apache.lucene.search.SortField; +import org.apache.lucene.search.TotalHits; +import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.action.OriginalIndices; +import org.elasticsearch.action.search.TransportSearchAction.SearchTimeProvider; +import org.elasticsearch.common.collect.Tuple; +import org.elasticsearch.common.text.Text; +import org.elasticsearch.index.Index; +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.search.DocValueFormat; +import org.elasticsearch.search.SearchHit; +import org.elasticsearch.search.SearchHits; +import org.elasticsearch.search.SearchShardTarget; +import org.elasticsearch.search.aggregations.InternalAggregation; +import org.elasticsearch.search.aggregations.InternalAggregations; +import org.elasticsearch.search.aggregations.bucket.range.InternalDateRange; +import org.elasticsearch.search.aggregations.bucket.range.Range; +import org.elasticsearch.search.aggregations.metrics.InternalMax; +import org.elasticsearch.search.aggregations.metrics.Max; +import org.elasticsearch.search.internal.InternalSearchResponse; +import org.elasticsearch.search.internal.SearchContext; +import org.elasticsearch.search.profile.ProfileShardResult; +import org.elasticsearch.search.profile.SearchProfileShardResults; +import org.elasticsearch.search.profile.SearchProfileShardResultsTests; +import org.elasticsearch.search.suggest.Suggest; +import org.elasticsearch.search.suggest.completion.CompletionSuggestion; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.transport.RemoteClusterAware; +import org.junit.Before; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.PriorityQueue; +import java.util.TreeMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; + +import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.hamcrest.Matchers.lessThanOrEqualTo; + +public class SearchResponseMergerTests extends ESTestCase { + + private int numResponses; + private ExecutorService executorService; + + @Before + public void init() { + numResponses = randomIntBetween(2, 10); + executorService = Executors.newFixedThreadPool(numResponses); + } + + private void addResponse(SearchResponseMerger searchResponseMerger, SearchResponse searchResponse) { + if (randomBoolean()) { + executorService.submit(() -> searchResponseMerger.add(searchResponse)); + } else { + searchResponseMerger.add(searchResponse); + } + } + + private void awaitResponsesAdded() throws InterruptedException { + executorService.shutdown(); + executorService.awaitTermination(5, TimeUnit.SECONDS); + } + + public void testMergeTookInMillis() throws InterruptedException { + long currentRelativeTime = randomLong(); + SearchTimeProvider timeProvider = new SearchTimeProvider(randomLong(), 0, () -> currentRelativeTime); + SearchResponseMerger merger = new SearchResponseMerger(randomIntBetween(0, 1000), randomIntBetween(0, 10000), + SearchContext.TRACK_TOTAL_HITS_ACCURATE, timeProvider, flag -> null); + for (int i = 0; i < numResponses; i++) { + SearchResponse searchResponse = new SearchResponse(InternalSearchResponse.empty(), null, 1, 1, 0, randomLong(), + ShardSearchFailure.EMPTY_ARRAY, SearchResponseTests.randomClusters()); + addResponse(merger, searchResponse); + } + awaitResponsesAdded(); + SearchResponse searchResponse = merger.getMergedResponse(SearchResponse.Clusters.EMPTY); + assertEquals(TimeUnit.NANOSECONDS.toMillis(currentRelativeTime), searchResponse.getTook().millis()); + } + + public void testMergeShardFailures() throws InterruptedException { + SearchTimeProvider searchTimeProvider = new SearchTimeProvider(0, 0, () -> 0); + SearchResponseMerger merger = new SearchResponseMerger(0, 0, SearchContext.TRACK_TOTAL_HITS_ACCURATE, + searchTimeProvider, flag -> null); + PriorityQueue> priorityQueue = new PriorityQueue<>(Comparator.comparing(Tuple::v1)); + int numIndices = numResponses * randomIntBetween(1, 3); + Iterator> indicesPerCluster = randomRealisticIndices(numIndices, numResponses).entrySet().iterator(); + for (int i = 0; i < numResponses; i++) { + Map.Entry entry = indicesPerCluster.next(); + String clusterAlias = entry.getKey(); + Index[] indices = entry.getValue(); + int numFailures = randomIntBetween(1, 10); + ShardSearchFailure[] shardSearchFailures = new ShardSearchFailure[numFailures]; + for (int j = 0; j < numFailures; j++) { + ShardId shardId = new ShardId(randomFrom(indices), j); + ShardSearchFailure failure; + if (randomBoolean()) { + SearchShardTarget searchShardTarget = new SearchShardTarget(randomAlphaOfLength(6), shardId, clusterAlias, null); + failure = new ShardSearchFailure(new IllegalArgumentException(), searchShardTarget); + } else { + ElasticsearchException elasticsearchException = new ElasticsearchException(new IllegalArgumentException()); + elasticsearchException.setShard(shardId); + failure = new ShardSearchFailure(elasticsearchException); + } + shardSearchFailures[j] = failure; + priorityQueue.add(Tuple.tuple(shardId, failure)); + } + SearchResponse searchResponse = new SearchResponse(InternalSearchResponse.empty(), null, + 1, 1, 0, 100L, shardSearchFailures, SearchResponse.Clusters.EMPTY); + addResponse(merger, searchResponse); + } + awaitResponsesAdded(); + SearchResponse.Clusters clusters = SearchResponseTests.randomClusters(); + SearchResponse mergedResponse = merger.getMergedResponse(clusters); + assertSame(clusters, mergedResponse.getClusters()); + assertEquals(numResponses, mergedResponse.getTotalShards()); + assertEquals(numResponses, mergedResponse.getSuccessfulShards()); + assertEquals(0, mergedResponse.getSkippedShards()); + assertEquals(priorityQueue.size(), mergedResponse.getFailedShards()); + ShardSearchFailure[] shardFailures = mergedResponse.getShardFailures(); + assertEquals(priorityQueue.size(), shardFailures.length); + for (ShardSearchFailure shardFailure : shardFailures) { + ShardSearchFailure expected = priorityQueue.poll().v2(); + assertSame(expected, shardFailure); + } + } + + public void testMergeShardFailuresNullShardId() throws InterruptedException { + SearchTimeProvider searchTimeProvider = new SearchTimeProvider(0, 0, () -> 0); + SearchResponseMerger merger = new SearchResponseMerger(0, 0, SearchContext.TRACK_TOTAL_HITS_ACCURATE, + searchTimeProvider, flag -> null); + List expectedFailures = new ArrayList<>(); + for (int i = 0; i < numResponses; i++) { + int numFailures = randomIntBetween(1, 50); + ShardSearchFailure[] shardSearchFailures = new ShardSearchFailure[numFailures]; + for (int j = 0; j < numFailures; j++) { + ShardSearchFailure shardSearchFailure = new ShardSearchFailure(new ElasticsearchException(new IllegalArgumentException())); + shardSearchFailures[j] = shardSearchFailure; + expectedFailures.add(shardSearchFailure); + } + SearchResponse searchResponse = new SearchResponse(InternalSearchResponse.empty(), null, + 1, 1, 0, 100L, shardSearchFailures, SearchResponse.Clusters.EMPTY); + addResponse(merger, searchResponse); + } + awaitResponsesAdded(); + ShardSearchFailure[] shardFailures = merger.getMergedResponse(SearchResponse.Clusters.EMPTY).getShardFailures(); + assertThat(Arrays.asList(shardFailures), containsInAnyOrder(expectedFailures.toArray(ShardSearchFailure.EMPTY_ARRAY))); + } + + public void testMergeProfileResults() throws InterruptedException { + SearchTimeProvider searchTimeProvider = new SearchTimeProvider(0, 0, () -> 0); + SearchResponseMerger merger = new SearchResponseMerger(0, 0, SearchContext.TRACK_TOTAL_HITS_ACCURATE, + searchTimeProvider, flag -> null); + Map expectedProfile = new HashMap<>(); + for (int i = 0; i < numResponses; i++) { + SearchProfileShardResults profile = SearchProfileShardResultsTests.createTestItem(); + expectedProfile.putAll(profile.getShardResults()); + SearchHits searchHits = new SearchHits(new SearchHit[0], new TotalHits(0, TotalHits.Relation.EQUAL_TO), Float.NaN); + InternalSearchResponse internalSearchResponse = new InternalSearchResponse(searchHits, null, null, profile, false, null, 1); + SearchResponse searchResponse = new SearchResponse(internalSearchResponse, null, 1, 1, 0, 100L, + ShardSearchFailure.EMPTY_ARRAY, SearchResponse.Clusters.EMPTY); + addResponse(merger, searchResponse); + } + awaitResponsesAdded(); + SearchResponse.Clusters clusters = SearchResponseTests.randomClusters(); + SearchResponse mergedResponse = merger.getMergedResponse(clusters); + assertSame(clusters, mergedResponse.getClusters()); + assertEquals(numResponses, mergedResponse.getTotalShards()); + assertEquals(numResponses, mergedResponse.getSuccessfulShards()); + assertEquals(0, mergedResponse.getSkippedShards()); + assertEquals(0, mergedResponse.getFailedShards()); + assertEquals(0, mergedResponse.getShardFailures().length); + assertEquals(expectedProfile, mergedResponse.getProfileResults()); + } + + public void testMergeSuggestions() throws InterruptedException { + String suggestionName = randomAlphaOfLengthBetween(4, 8); + boolean skipDuplicates = randomBoolean(); + int size = randomIntBetween(1, 100); + SearchResponseMerger searchResponseMerger = new SearchResponseMerger(0, 0, 0, new SearchTimeProvider(0, 0, () -> 0), flag -> null); + for (int i = 0; i < numResponses; i++) { + List>> suggestions = + new ArrayList<>(); + CompletionSuggestion completionSuggestion = new CompletionSuggestion(suggestionName, size, skipDuplicates); + CompletionSuggestion.Entry options = new CompletionSuggestion.Entry(new Text("suggest"), 0, 10); + options.addOption(new CompletionSuggestion.Entry.Option(randomInt(), new Text("suggestion"), i, Collections.emptyMap())); + completionSuggestion.addTerm(options); + suggestions.add(completionSuggestion); + Suggest suggest = new Suggest(suggestions); + SearchHits searchHits = new SearchHits(new SearchHit[0], null, Float.NaN); + InternalSearchResponse internalSearchResponse = new InternalSearchResponse(searchHits, null, suggest, null, false, null, 1); + SearchResponse searchResponse = new SearchResponse(internalSearchResponse, null, 1, 1, 0, randomLong(), + ShardSearchFailure.EMPTY_ARRAY, SearchResponse.Clusters.EMPTY); + addResponse(searchResponseMerger, searchResponse); + } + awaitResponsesAdded(); + SearchResponse.Clusters clusters = SearchResponseTests.randomClusters(); + SearchResponse mergedResponse = searchResponseMerger.getMergedResponse(clusters); + assertSame(clusters, mergedResponse.getClusters()); + assertEquals(numResponses, mergedResponse.getTotalShards()); + assertEquals(numResponses, mergedResponse.getSuccessfulShards()); + assertEquals(0, mergedResponse.getSkippedShards()); + assertEquals(0, mergedResponse.getFailedShards()); + assertEquals(0, mergedResponse.getShardFailures().length); + Suggest.Suggestion> suggestion = + mergedResponse.getSuggest().getSuggestion(suggestionName); + assertEquals(1, suggestion.getEntries().size()); + Suggest.Suggestion.Entry options = suggestion.getEntries().get(0); + assertEquals(skipDuplicates ? 1 : Math.min(numResponses, size), options.getOptions().size()); + int i = numResponses; + for (Suggest.Suggestion.Entry.Option option : options) { + assertEquals("suggestion", option.getText().string()); + assertEquals(--i, option.getScore(), 0f); + } + } + + public void testMergeAggs() throws InterruptedException { + SearchResponseMerger searchResponseMerger = new SearchResponseMerger(0, 0, 0, new SearchTimeProvider(0, 0, () -> 0), + flag -> new InternalAggregation.ReduceContext(null, null, flag)); + String maxAggName = randomAlphaOfLengthBetween(5, 8); + String rangeAggName = randomAlphaOfLengthBetween(5, 8); + int totalCount = 0; + double maxValue = Double.MIN_VALUE; + for (int i = 0; i < numResponses; i++) { + double value = randomDouble(); + maxValue = Math.max(value, maxValue); + InternalMax max = new InternalMax(maxAggName, value, DocValueFormat.RAW, Collections.emptyList(), Collections.emptyMap()); + InternalDateRange.Factory factory = new InternalDateRange.Factory(); + int count = randomIntBetween(1, 1000); + totalCount += count; + InternalDateRange.Bucket bucket = factory.createBucket("bucket", 0, 10000, count, InternalAggregations.EMPTY, + false, DocValueFormat.RAW); + InternalDateRange range = factory.create(rangeAggName, Collections.singletonList(bucket), DocValueFormat.RAW, false, + Collections.emptyList(), Collections.emptyMap()); + InternalAggregations aggs = new InternalAggregations(Arrays.asList(range, max)); + SearchHits searchHits = new SearchHits(new SearchHit[0], null, Float.NaN); + InternalSearchResponse internalSearchResponse = new InternalSearchResponse(searchHits, aggs, null, null, false, null, 1); + SearchResponse searchResponse = new SearchResponse(internalSearchResponse, null, 1, 1, 0, randomLong(), + ShardSearchFailure.EMPTY_ARRAY, SearchResponse.Clusters.EMPTY); + addResponse(searchResponseMerger, searchResponse); + } + awaitResponsesAdded(); + SearchResponse.Clusters clusters = SearchResponseTests.randomClusters(); + SearchResponse mergedResponse = searchResponseMerger.getMergedResponse(clusters); + assertSame(clusters, mergedResponse.getClusters()); + assertEquals(numResponses, mergedResponse.getTotalShards()); + assertEquals(numResponses, mergedResponse.getSuccessfulShards()); + assertEquals(0, mergedResponse.getSkippedShards()); + assertEquals(0, mergedResponse.getFailedShards()); + assertEquals(0, mergedResponse.getShardFailures().length); + assertEquals(0, mergedResponse.getHits().getHits().length); + assertEquals(2, mergedResponse.getAggregations().asList().size()); + Max max = mergedResponse.getAggregations().get(maxAggName); + assertEquals(maxValue, max.getValue(), 0d); + Range range = mergedResponse.getAggregations().get(rangeAggName); + assertEquals(1, range.getBuckets().size()); + Range.Bucket bucket = range.getBuckets().get(0); + assertEquals("0.0", bucket.getFromAsString()); + assertEquals("10000.0", bucket.getToAsString()); + assertEquals(totalCount, bucket.getDocCount()); + } + + public void testMergeSearchHits() throws InterruptedException { + final long currentRelativeTime = randomLong(); + final SearchTimeProvider timeProvider = new SearchTimeProvider(randomLong(), 0, () -> currentRelativeTime); + final int size = randomIntBetween(0, 100); + final int from = size > 0 ? randomIntBetween(0, 100) : 0; + final int requestedSize = from + size; + final SortField[] sortFields; + final String collapseField; + boolean scoreSort = false; + if (randomBoolean()) { + int numFields = randomIntBetween(1, 3); + sortFields = new SortField[numFields]; + for (int i = 0; i < numFields; i++) { + final SortField sortField; + if (randomBoolean()) { + sortField = new SortField("field-" + i, SortField.Type.INT, randomBoolean()); + } else { + scoreSort = true; + sortField = SortField.FIELD_SCORE; + } + sortFields[i] = sortField; + } + collapseField = randomBoolean() ? "collapse" : null; + } else { + collapseField = null; + sortFields = null; + scoreSort = true; + } + Tuple randomTrackTotalHits = randomTrackTotalHits(); + int trackTotalHitsUpTo = randomTrackTotalHits.v1(); + TotalHits.Relation totalHitsRelation = randomTrackTotalHits.v2(); + + PriorityQueue priorityQueue = new PriorityQueue<>(new SearchHitComparator(sortFields)); + SearchResponseMerger searchResponseMerger = new SearchResponseMerger(from, size, trackTotalHitsUpTo, timeProvider, flag -> null); + + TotalHits expectedTotalHits = null; + int expectedTotal = 0; + int expectedSuccessful = 0; + int expectedSkipped = 0; + int expectedReducePhases = 1; + boolean expectedTimedOut = false; + Boolean expectedTerminatedEarly = null; + float expectedMaxScore = Float.NEGATIVE_INFINITY; + int numIndices = requestedSize == 0 ? 0 : randomIntBetween(1, requestedSize); + Iterator> indicesIterator = randomRealisticIndices(numIndices, numResponses).entrySet().iterator(); + for (int i = 0; i < numResponses; i++) { + Map.Entry entry = indicesIterator.next(); + String clusterAlias = entry.getKey().equals(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY) ? null : entry.getKey(); + Index[] indices = entry.getValue(); + int total = randomIntBetween(1, 1000); + expectedTotal += total; + int successful = randomIntBetween(1, total); + expectedSuccessful += successful; + int skipped = randomIntBetween(1, total); + expectedSkipped += skipped; + + TotalHits totalHits = null; + if (trackTotalHitsUpTo != SearchContext.TRACK_TOTAL_HITS_DISABLED) { + totalHits = new TotalHits(randomLongBetween(0, 1000), totalHitsRelation); + long previousValue = expectedTotalHits == null ? 0 : expectedTotalHits.value; + expectedTotalHits = new TotalHits(Math.min(previousValue + totalHits.value, trackTotalHitsUpTo), totalHitsRelation); + } + + final int numDocs = totalHits == null || totalHits.value >= requestedSize ? requestedSize : (int) totalHits.value; + int scoreFactor = randomIntBetween(1, numResponses); + float maxScore = scoreSort ? numDocs * scoreFactor : Float.NaN; + SearchHit[] hits = randomSearchHitArray(numDocs, numResponses, clusterAlias, indices, maxScore, scoreFactor, + sortFields, priorityQueue); + expectedMaxScore = Math.max(expectedMaxScore, maxScore); + + Object[] collapseValues = null; + if (collapseField != null) { + collapseValues = new Object[numDocs]; + for (int j = 0; j < numDocs; j++) { + //set different collapse values for each cluster for simplicity + collapseValues[j] = j + 1000 * i; + } + } + + SearchHits searchHits = new SearchHits(hits, totalHits, maxScore == Float.NEGATIVE_INFINITY ? Float.NaN : maxScore, + sortFields, collapseField, collapseValues); + + int numReducePhases = randomIntBetween(1, 5); + expectedReducePhases += numReducePhases; + boolean timedOut = rarely(); + expectedTimedOut = expectedTimedOut || timedOut; + Boolean terminatedEarly = frequently() ? null : true; + expectedTerminatedEarly = expectedTerminatedEarly == null ? terminatedEarly : expectedTerminatedEarly; + + InternalSearchResponse internalSearchResponse = new InternalSearchResponse( + searchHits, null, null, null, timedOut, terminatedEarly, numReducePhases); + + SearchResponse searchResponse = new SearchResponse(internalSearchResponse, null, total, successful, skipped, + randomLong(), ShardSearchFailure.EMPTY_ARRAY, SearchResponseTests.randomClusters()); + + addResponse(searchResponseMerger, searchResponse); + } + + awaitResponsesAdded(); + + final SearchResponse.Clusters clusters = SearchResponseTests.randomClusters(); + SearchResponse searchResponse = searchResponseMerger.getMergedResponse(clusters); + + assertEquals(TimeUnit.NANOSECONDS.toMillis(currentRelativeTime), searchResponse.getTook().millis()); + assertEquals(expectedTotal, searchResponse.getTotalShards()); + assertEquals(expectedSuccessful, searchResponse.getSuccessfulShards()); + assertEquals(expectedSkipped, searchResponse.getSkippedShards()); + assertEquals(0, searchResponse.getShardFailures().length); + assertEquals(expectedReducePhases, searchResponse.getNumReducePhases()); + assertEquals(expectedTimedOut, searchResponse.isTimedOut()); + assertEquals(expectedTerminatedEarly, searchResponse.isTerminatedEarly()); + + assertSame(clusters, searchResponse.getClusters()); + assertNull(searchResponse.getScrollId()); + + SearchHits searchHits = searchResponse.getHits(); + assertArrayEquals(sortFields, searchHits.getSortFields()); + assertEquals(collapseField, searchHits.getCollapseField()); + if (expectedTotalHits == null) { + assertNull(searchHits.getTotalHits()); + } else { + assertNotNull(searchHits.getTotalHits()); + assertEquals(expectedTotalHits.value, searchHits.getTotalHits().value); + assertSame(expectedTotalHits.relation, searchHits.getTotalHits().relation); + } + if (expectedMaxScore == Float.NEGATIVE_INFINITY) { + assertTrue(Float.isNaN(searchHits.getMaxScore())); + } else { + assertEquals(expectedMaxScore, searchHits.getMaxScore(), 0f); + } + + for (int i = 0; i < from; i++) { + priorityQueue.poll(); + } + SearchHit[] hits = searchHits.getHits(); + if (collapseField != null) { + assertEquals(hits.length, searchHits.getCollapseValues().length); + } else { + assertNull(searchHits.getCollapseValues()); + } + assertThat(hits.length, lessThanOrEqualTo(size)); + for (SearchHit hit : hits) { + SearchHit expected = priorityQueue.poll(); + assertSame(expected, hit); + } + } + + private static Tuple randomTrackTotalHits() { + switch(randomIntBetween(0, 2)) { + case 0: + return Tuple.tuple(SearchContext.TRACK_TOTAL_HITS_DISABLED, null); + case 1: + return Tuple.tuple(randomIntBetween(10, 1000), TotalHits.Relation.GREATER_THAN_OR_EQUAL_TO); + case 2: + return Tuple.tuple(SearchContext.TRACK_TOTAL_HITS_ACCURATE, TotalHits.Relation.EQUAL_TO); + default: + throw new UnsupportedOperationException(); + } + } + + private static SearchHit[] randomSearchHitArray(int numDocs, int numResponses, String clusterAlias, Index[] indices, float maxScore, + int scoreFactor, SortField[] sortFields, PriorityQueue priorityQueue) { + SearchHit[] hits = new SearchHit[numDocs]; + + int[] sortFieldFactors = new int[sortFields == null ? 0 : sortFields.length]; + for (int j = 0; j < sortFieldFactors.length; j++) { + sortFieldFactors[j] = randomIntBetween(1, numResponses); + } + + for (int j = 0; j < numDocs; j++) { + ShardId shardId = new ShardId(randomFrom(indices), randomIntBetween(0, 10)); + SearchShardTarget shardTarget = new SearchShardTarget(randomAlphaOfLengthBetween(3, 8), shardId, + clusterAlias, OriginalIndices.NONE); + SearchHit hit = new SearchHit(randomIntBetween(0, Integer.MAX_VALUE)); + + float score = Float.NaN; + if (Float.isNaN(maxScore) == false) { + score = (maxScore - j) * scoreFactor; + hit.score(score); + } + + hit.shard(shardTarget); + if (sortFields != null) { + Object[] rawSortValues = new Object[sortFields.length]; + DocValueFormat[] docValueFormats = new DocValueFormat[sortFields.length]; + for (int k = 0; k < sortFields.length; k++) { + SortField sortField = sortFields[k]; + if (sortField == SortField.FIELD_SCORE) { + hit.score(score); + rawSortValues[k] = score; + } else { + rawSortValues[k] = sortField.getReverse() ? numDocs * sortFieldFactors[k] - j : j; + } + docValueFormats[k] = DocValueFormat.RAW; + } + hit.sortValues(rawSortValues, docValueFormats); + } + hits[j] = hit; + priorityQueue.add(hit); + } + return hits; + } + + private static Map randomRealisticIndices(int numIndices, int numClusters) { + String[] indicesNames = new String[numIndices]; + for (int i = 0; i < numIndices; i++) { + indicesNames[i] = randomAlphaOfLengthBetween(5, 10); + } + Map indicesPerCluster = new TreeMap<>(); + for (int i = 0; i < numClusters; i++) { + Index[] indices = new Index[indicesNames.length]; + for (int j = 0; j < indices.length; j++) { + //Realistically clusters have the same indices with same names, but different uuid + indices[j] = new Index(indicesNames[j], randomAlphaOfLength(10)); + } + String clusterAlias; + if (frequently() || indicesPerCluster.containsKey(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY)) { + clusterAlias = randomAlphaOfLengthBetween(5, 10); + } else { + clusterAlias = RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY; + } + indicesPerCluster.put(clusterAlias, indices); + } + return indicesPerCluster; + } + + private static final class SearchHitComparator implements Comparator { + + private final SortField[] sortFields; + + SearchHitComparator(SortField[] sortFields) { + this.sortFields = sortFields; + } + + @Override + public int compare(SearchHit a, SearchHit b) { + if (sortFields == null) { + int scoreCompare = Float.compare(b.getScore(), a.getScore()); + if (scoreCompare != 0) { + return scoreCompare; + } + } else { + for (int i = 0; i < sortFields.length; i++) { + SortField sortField = sortFields[i]; + if (sortField == SortField.FIELD_SCORE) { + int scoreCompare = Float.compare(b.getScore(), a.getScore()); + if (scoreCompare != 0) { + return scoreCompare; + } + } else { + Integer aSortValue = (Integer)a.getRawSortValues()[i]; + Integer bSortValue = (Integer)b.getRawSortValues()[i]; + final int compare; + if (sortField.getReverse()) { + compare = Integer.compare(bSortValue, aSortValue); + } else { + compare = Integer.compare(aSortValue, bSortValue); + } + if (compare != 0) { + return compare; + } + } + } + } + int shardIdCompareTo = a.getShard().getShardId().compareTo(b.getShard().getShardId()); + if (shardIdCompareTo != 0) { + return shardIdCompareTo; + } + return Integer.compare(a.docId(), b.docId()); + } + } +} diff --git a/server/src/test/java/org/elasticsearch/action/search/SearchResponseTests.java b/server/src/test/java/org/elasticsearch/action/search/SearchResponseTests.java index ecd2dc44a70..f07be38765f 100644 --- a/server/src/test/java/org/elasticsearch/action/search/SearchResponseTests.java +++ b/server/src/test/java/org/elasticsearch/action/search/SearchResponseTests.java @@ -128,7 +128,7 @@ public class SearchResponseTests extends ESTestCase { shardSearchFailures, randomBoolean() ? randomClusters() : SearchResponse.Clusters.EMPTY); } - private static SearchResponse.Clusters randomClusters() { + static SearchResponse.Clusters randomClusters() { int totalClusters = randomIntBetween(0, 10); int successfulClusters = randomIntBetween(0, totalClusters); int skippedClusters = totalClusters - successfulClusters;