diff --git a/core/src/main/java/org/elasticsearch/action/search/FetchSearchPhase.java b/core/src/main/java/org/elasticsearch/action/search/FetchSearchPhase.java
index 920dd1b0009..a0e313f1d73 100644
--- a/core/src/main/java/org/elasticsearch/action/search/FetchSearchPhase.java
+++ b/core/src/main/java/org/elasticsearch/action/search/FetchSearchPhase.java
@@ -98,27 +98,26 @@ final class FetchSearchPhase extends SearchPhase {
         final int numShards = context.getNumShards();
         final boolean isScrollSearch = context.getRequest().scroll() != null;
         List<SearchPhaseResult> phaseResults = queryResults.asList();
-        ScoreDoc[] sortedShardDocs = searchPhaseController.sortDocs(isScrollSearch, phaseResults);
         String scrollId = isScrollSearch ? TransportSearchHelper.buildScrollId(queryResults) : null;
         final SearchPhaseController.ReducedQueryPhase reducedQueryPhase = resultConsumer.reduce();
         final boolean queryAndFetchOptimization = queryResults.length() == 1;
         final Runnable finishPhase = ()
-            -> moveToNextPhase(searchPhaseController, sortedShardDocs, scrollId, reducedQueryPhase, queryAndFetchOptimization ?
+            -> moveToNextPhase(searchPhaseController, scrollId, reducedQueryPhase, queryAndFetchOptimization ?
             queryResults : fetchResults);
         if (queryAndFetchOptimization) {
             assert phaseResults.isEmpty() || phaseResults.get(0).fetchResult() != null;
             // query AND fetch optimization
             finishPhase.run();
         } else {
-            final IntArrayList[] docIdsToLoad = searchPhaseController.fillDocIdsToLoad(numShards, sortedShardDocs);
-            if (sortedShardDocs.length == 0) { // no docs to fetch -- sidestep everything and return
+            final IntArrayList[] docIdsToLoad = searchPhaseController.fillDocIdsToLoad(numShards, reducedQueryPhase.scoreDocs);
+            if (reducedQueryPhase.scoreDocs.length == 0) { // no docs to fetch -- sidestep everything and return
                 phaseResults.stream()
                     .map(e -> e.queryResult())
                     .forEach(this::releaseIrrelevantSearchContext); // we have to release contexts here to free up resources
                 finishPhase.run();
             } else {
                 final ScoreDoc[] lastEmittedDocPerShard = isScrollSearch ?
-                    searchPhaseController.getLastEmittedDocPerShard(reducedQueryPhase, sortedShardDocs, numShards)
+                    searchPhaseController.getLastEmittedDocPerShard(reducedQueryPhase, numShards)
                     : null;
                 final CountedCollector<FetchSearchResult> counter = new CountedCollector<>(r -> fetchResults.set(r.getShardIndex(), r),
                     docIdsToLoad.length, // we count down every shard in the result no matter if we got any results or not
@@ -188,7 +187,7 @@ final class FetchSearchPhase extends SearchPhase {
     private void releaseIrrelevantSearchContext(QuerySearchResult queryResult) {
         // we only release search context that we did not fetch from if we are not scrolling
         // and if it has at least one hit that didn't make it to the global topDocs
-        if (context.getRequest().scroll() == null && queryResult.hasHits()) {
+        if (context.getRequest().scroll() == null && queryResult.hasSearchContext()) {
             try {
                 Transport.Connection connection = context.getConnection(queryResult.getSearchShardTarget().getNodeId());
                 context.sendReleaseSearchContext(queryResult.getRequestId(), connection);
@@ -198,11 +197,11 @@ final class FetchSearchPhase extends SearchPhase {
         }
     }

-    private void moveToNextPhase(SearchPhaseController searchPhaseController, ScoreDoc[] sortedDocs,
+    private void moveToNextPhase(SearchPhaseController searchPhaseController,
                                  String scrollId, SearchPhaseController.ReducedQueryPhase reducedQueryPhase,
                                  AtomicArray<? extends SearchPhaseResult> fetchResultsArr) {
         final InternalSearchResponse internalResponse = searchPhaseController.merge(context.getRequest().scroll() != null,
-            sortedDocs, reducedQueryPhase, fetchResultsArr.asList(), fetchResultsArr::get);
+            reducedQueryPhase, fetchResultsArr.asList(), fetchResultsArr::get);
         context.executeNextPhase(this, nextPhaseFactory.apply(context.buildSearchResponse(internalResponse, scrollId)));
     }
diff --git a/core/src/main/java/org/elasticsearch/action/search/SearchPhaseController.java b/core/src/main/java/org/elasticsearch/action/search/SearchPhaseController.java
index 38d793b93ed..13b4b2f73d4 100644
--- a/core/src/main/java/org/elasticsearch/action/search/SearchPhaseController.java
+++ b/core/src/main/java/org/elasticsearch/action/search/SearchPhaseController.java
@@ -36,6 +36,7 @@ import org.elasticsearch.common.component.AbstractComponent;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.util.BigArrays;
 import org.elasticsearch.script.ScriptService;
+import org.elasticsearch.search.DocValueFormat;
 import org.elasticsearch.search.SearchHit;
 import org.elasticsearch.search.SearchHits;
 import org.elasticsearch.search.SearchPhaseResult;
@@ -56,7 +57,6 @@ import org.elasticsearch.search.suggest.Suggest.Suggestion;
 import org.elasticsearch.search.suggest.Suggest.Suggestion.Entry;
 import org.elasticsearch.search.suggest.completion.CompletionSuggestion;

-import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
@@ -147,42 +147,47 @@ public final class SearchPhaseController extends AbstractComponent {
      * @param ignoreFrom Whether to ignore the from and sort all hits in each shard result.
      *                   Enabled only for scroll search, because that only retrieves hits of length 'size' in the query phase.
      * @param results the search phase results to obtain the sort docs from
+     * @param bufferedTopDocs the pre-consumed buffered top docs
+     * @param topDocsStats the top docs stats to fill
+     * @param from the offset into the search results top docs
+     * @param size the number of hits to return from the merged top docs
      */
-    public ScoreDoc[] sortDocs(boolean ignoreFrom, Collection<? extends SearchPhaseResult> results) throws IOException {
+    public SortedTopDocs sortDocs(boolean ignoreFrom, Collection<? extends SearchPhaseResult> results,
+                                  final Collection<TopDocs> bufferedTopDocs, final TopDocsStats topDocsStats, int from, int size) {
         if (results.isEmpty()) {
-            return EMPTY_DOCS;
+            return SortedTopDocs.EMPTY;
         }
-        final Collection<TopDocs> topDocs = new ArrayList<>();
+        final Collection<TopDocs> topDocs = bufferedTopDocs == null ? new ArrayList<>() : bufferedTopDocs;
         final Map<String, List<Suggestion<CompletionSuggestion.Entry>>> groupedCompletionSuggestions = new HashMap<>();
-        int from = -1;
-        int size = -1;
-        for (SearchPhaseResult sortedResult : results) {
+        for (SearchPhaseResult sortedResult : results) { // TODO we can move this loop into the reduce call to only loop over this once
            /* We loop over all results once, group together the completion suggestions if there are any and collect relevant
            * top docs results. Each top docs gets its shard index set on all top docs to simplify top docs merging down the road.
            * This allows us to remove the shared optimization code here since we no longer materialize a dense array of
            * top docs but instead only pass relevant results / top docs to the merge method */
            QuerySearchResult queryResult = sortedResult.queryResult();
-           if (queryResult.hasHits()) {
-               from = queryResult.from();
-               size = queryResult.size();
-               TopDocs td = queryResult.topDocs();
-               if (td != null && td.scoreDocs.length > 0) {
+           if (queryResult.hasConsumedTopDocs() == false) { // already consumed?
+               final TopDocs td = queryResult.consumeTopDocs();
+               assert td != null;
+               topDocsStats.add(td);
+               if (td.scoreDocs.length > 0) { // make sure we set the shard index before we add it - the consumer didn't do that yet
                    setShardIndex(td, queryResult.getShardIndex());
                    topDocs.add(td);
                }
+           }
+           if (queryResult.hasSuggestHits()) {
                Suggest shardSuggest = queryResult.suggest();
-               if (shardSuggest != null) {
-                   for (CompletionSuggestion suggestion : shardSuggest.filter(CompletionSuggestion.class)) {
-                       suggestion.setShardIndex(sortedResult.getShardIndex());
-                       List<Suggestion<CompletionSuggestion.Entry>> suggestions =
-                           groupedCompletionSuggestions.computeIfAbsent(suggestion.getName(), s -> new ArrayList<>());
-                       suggestions.add(suggestion);
-                   }
+               for (CompletionSuggestion suggestion : shardSuggest.filter(CompletionSuggestion.class)) {
+                   suggestion.setShardIndex(sortedResult.getShardIndex());
+                   List<Suggestion<CompletionSuggestion.Entry>> suggestions =
+                       groupedCompletionSuggestions.computeIfAbsent(suggestion.getName(), s -> new ArrayList<>());
+                   suggestions.add(suggestion);
                }
            }
        }
-        if (size != -1) {
-            final ScoreDoc[] mergedScoreDocs = mergeTopDocs(topDocs, size, ignoreFrom ? 0 : from);
+        final boolean hasHits = (groupedCompletionSuggestions.isEmpty() && topDocs.isEmpty()) == false;
+        if (hasHits) {
+            final TopDocs mergedTopDocs = mergeTopDocs(topDocs, size, ignoreFrom ? 0 : from);
+            final ScoreDoc[] mergedScoreDocs = mergedTopDocs == null ? EMPTY_DOCS : mergedTopDocs.scoreDocs;
             ScoreDoc[] scoreDocs = mergedScoreDocs;
             if (groupedCompletionSuggestions.isEmpty() == false) {
                 int numSuggestDocs = 0;
@@ -204,23 +209,35 @@ public final class SearchPhaseController extends AbstractComponent {
                     }
                 }
             }
-            return scoreDocs;
+            final boolean isSortedByField;
+            final SortField[] sortFields;
+            if (mergedTopDocs != null && mergedTopDocs instanceof TopFieldDocs) {
+                TopFieldDocs fieldDocs = (TopFieldDocs) mergedTopDocs;
+                isSortedByField = (fieldDocs instanceof CollapseTopFieldDocs &&
+                    fieldDocs.fields.length == 1 && fieldDocs.fields[0].getType() == SortField.Type.SCORE) == false;
+                sortFields = fieldDocs.fields;
+            } else {
+                isSortedByField = false;
+                sortFields = null;
+            }
+            return new SortedTopDocs(scoreDocs, isSortedByField, sortFields);
         } else {
-            // no relevant docs - just return an empty array
-            return EMPTY_DOCS;
+            // no relevant docs
+            return SortedTopDocs.EMPTY;
         }
     }

-    private ScoreDoc[] mergeTopDocs(Collection<TopDocs> results, int topN, int from) {
+    TopDocs mergeTopDocs(Collection<TopDocs> results, int topN, int from) {
         if (results.isEmpty()) {
-            return EMPTY_DOCS;
+            return null;
         }
+        assert results.isEmpty() == false;
         final boolean setShardIndex = false;
         final TopDocs topDocs = results.stream().findFirst().get();
         final TopDocs mergedTopDocs;
         final int numShards = results.size();
         if (numShards == 1 && from == 0) { // only one shard and no pagination we can just return the topDocs as we got them.
-            return topDocs.scoreDocs;
+            return topDocs;
         } else if (topDocs instanceof CollapseTopFieldDocs) {
             CollapseTopFieldDocs firstTopDocs = (CollapseTopFieldDocs) topDocs;
             final Sort sort = new Sort(firstTopDocs.fields);
@@ -235,7 +252,7 @@ public final class SearchPhaseController extends AbstractComponent {
             final TopDocs[] shardTopDocs = results.toArray(new TopDocs[numShards]);
             mergedTopDocs = TopDocs.merge(from, topN, shardTopDocs, setShardIndex);
         }
-        return mergedTopDocs.scoreDocs;
+        return mergedTopDocs;
     }

     private static void setShardIndex(TopDocs topDocs, int shardIndex) {
@@ -249,12 +266,12 @@ public final class SearchPhaseController extends AbstractComponent {
         }
     }

-    public ScoreDoc[] getLastEmittedDocPerShard(ReducedQueryPhase reducedQueryPhase,
-                                                ScoreDoc[] sortedScoreDocs, int numShards) {
-        ScoreDoc[] lastEmittedDocPerShard = new ScoreDoc[numShards];
-        if (reducedQueryPhase.isEmpty() == false) {
+    public ScoreDoc[] getLastEmittedDocPerShard(ReducedQueryPhase reducedQueryPhase, int numShards) {
+        final ScoreDoc[] lastEmittedDocPerShard = new ScoreDoc[numShards];
+        if (reducedQueryPhase.isEmptyResult == false) {
+            final ScoreDoc[] sortedScoreDocs = reducedQueryPhase.scoreDocs;
             // from is always zero as when we use scroll, we ignore from
-            long size = Math.min(reducedQueryPhase.fetchHits, reducedQueryPhase.oneResult.size());
+            long size = Math.min(reducedQueryPhase.fetchHits, reducedQueryPhase.size);
             // with collapsing we can have more hits than sorted docs
             size = Math.min(sortedScoreDocs.length, size);
             for (int sortedDocsIndex = 0; sortedDocsIndex < size; sortedDocsIndex++) {
@@ -288,13 +305,13 @@ public final class SearchPhaseController extends AbstractComponent {
      * Expects sortedDocs to have top search docs across all shards, optionally followed by top suggest docs for each named
      * completion suggestion ordered by suggestion name
      */
-    public InternalSearchResponse merge(boolean ignoreFrom, ScoreDoc[] sortedDocs,
-                                        ReducedQueryPhase reducedQueryPhase,
+    public InternalSearchResponse merge(boolean ignoreFrom, ReducedQueryPhase reducedQueryPhase,
                                         Collection<? extends SearchPhaseResult> fetchResults, IntFunction<SearchPhaseResult> resultsLookup) {
-        if (reducedQueryPhase.isEmpty()) {
+        if (reducedQueryPhase.isEmptyResult) {
             return InternalSearchResponse.empty();
         }
-        SearchHits hits = getHits(reducedQueryPhase, ignoreFrom, sortedDocs, fetchResults, resultsLookup);
+        ScoreDoc[] sortedDocs = reducedQueryPhase.scoreDocs;
+        SearchHits hits = getHits(reducedQueryPhase, ignoreFrom, fetchResults, resultsLookup);
         if (reducedQueryPhase.suggest != null) {
             if (!fetchResults.isEmpty()) {
                 int currentOffset = hits.getHits().length;
@@ -329,21 +346,15 @@ public final class SearchPhaseController extends AbstractComponent {
         return reducedQueryPhase.buildResponse(hits);
     }

-    private SearchHits getHits(ReducedQueryPhase reducedQueryPhase, boolean ignoreFrom, ScoreDoc[] sortedDocs,
+    private SearchHits getHits(ReducedQueryPhase reducedQueryPhase, boolean ignoreFrom,
                                Collection<? extends SearchPhaseResult> fetchResults, IntFunction<SearchPhaseResult> resultsLookup) {
-        boolean sorted = false;
+        final boolean sorted = reducedQueryPhase.isSortedByField;
+        ScoreDoc[] sortedDocs = reducedQueryPhase.scoreDocs;
         int sortScoreIndex = -1;
-        if (reducedQueryPhase.oneResult.topDocs() instanceof TopFieldDocs) {
-            TopFieldDocs fieldDocs = (TopFieldDocs) reducedQueryPhase.oneResult.queryResult().topDocs();
-            if (fieldDocs instanceof CollapseTopFieldDocs &&
-                fieldDocs.fields.length == 1 && fieldDocs.fields[0].getType() == SortField.Type.SCORE) {
-                sorted = false;
-            } else {
-                sorted = true;
-                for (int i = 0; i < fieldDocs.fields.length; i++) {
-                    if (fieldDocs.fields[i].getType() == SortField.Type.SCORE) {
-                        sortScoreIndex = i;
-                    }
+        if (sorted) {
+            for (int i = 0; i < reducedQueryPhase.sortField.length; i++) {
+                if (reducedQueryPhase.sortField[i].getType() == SortField.Type.SCORE) {
+                    sortScoreIndex = i;
                 }
             }
         }
@@ -351,8 +362,8 @@ public final class SearchPhaseController extends AbstractComponent {
         for (SearchPhaseResult entry : fetchResults) {
             entry.fetchResult().initCounter();
         }
-        int from = ignoreFrom ? 0 : reducedQueryPhase.oneResult.queryResult().from();
-        int numSearchHits = (int) Math.min(reducedQueryPhase.fetchHits - from, reducedQueryPhase.oneResult.size());
+        int from = ignoreFrom ? 0 : reducedQueryPhase.from;
+        int numSearchHits = (int) Math.min(reducedQueryPhase.fetchHits - from, reducedQueryPhase.size);
         // with collapsing we can have more fetch hits than sorted docs
         numSearchHits = Math.min(sortedDocs.length, numSearchHits);
         // merge hits
@@ -376,7 +387,7 @@ public final class SearchPhaseController extends AbstractComponent {
                     searchHit.shard(fetchResult.getSearchShardTarget());
                     if (sorted) {
                         FieldDoc fieldDoc = (FieldDoc) shardDoc;
-                        searchHit.sortValues(fieldDoc.fields, reducedQueryPhase.oneResult.sortValueFormats());
+                        searchHit.sortValues(fieldDoc.fields, reducedQueryPhase.sortValueFormats);
                         if (sortScoreIndex != -1) {
                             searchHit.score(((Number) fieldDoc.fields[sortScoreIndex]).floatValue());
                         }
@@ -393,42 +404,42 @@ public final class SearchPhaseController extends AbstractComponent {
      * Reduces the given query results and consumes all aggregations and profile results.
      * @param queryResults a list of non-null query shard results
      */
-    public ReducedQueryPhase reducedQueryPhase(List<SearchPhaseResult> queryResults) {
-        return reducedQueryPhase(queryResults, null, 0);
+    public ReducedQueryPhase reducedQueryPhase(Collection<? extends SearchPhaseResult> queryResults, boolean isScrollRequest) {
+        return reducedQueryPhase(queryResults, null, new ArrayList<>(), new TopDocsStats(), 0, isScrollRequest);
     }

     /**
      * Reduces the given query results and consumes all aggregations and profile results.
      * @param queryResults a list of non-null query shard results
-     * @param bufferdAggs a list of pre-collected / buffered aggregations. if this list is non-null all aggregations have been consumed
+     * @param bufferedAggs a list of pre-collected / buffered aggregations. if this list is non-null all aggregations have been consumed
+     *                     from all non-null query results.
+     * @param bufferedTopDocs a list of pre-collected / buffered top docs. if this list is non-null all top docs have been consumed
      *                        from all non-null query results.
      * @param numReducePhases the number of non-final reduce phases applied to the query results.
      * @see QuerySearchResult#consumeAggs()
      * @see QuerySearchResult#consumeProfileResult()
      */
     private ReducedQueryPhase reducedQueryPhase(Collection<? extends SearchPhaseResult> queryResults,
-                                                List<InternalAggregations> bufferdAggs, int numReducePhases) {
+                                                List<InternalAggregations> bufferedAggs, List<TopDocs> bufferedTopDocs,
+                                                TopDocsStats topDocsStats, int numReducePhases, boolean isScrollRequest) {
         assert numReducePhases >= 0 : "num reduce phases must be >= 0 but was: " + numReducePhases;
         numReducePhases++; // increment for this phase
-        long totalHits = 0;
-        long fetchHits = 0;
-        float maxScore = Float.NEGATIVE_INFINITY;
         boolean timedOut = false;
         Boolean terminatedEarly = null;
         if (queryResults.isEmpty()) { // early terminate we have nothing to reduce
-            return new ReducedQueryPhase(totalHits, fetchHits, maxScore, timedOut, terminatedEarly, null, null, null, null,
-                numReducePhases);
+            return new ReducedQueryPhase(topDocsStats.totalHits, topDocsStats.fetchHits, topDocsStats.maxScore,
+                timedOut, terminatedEarly, null, null, null, EMPTY_DOCS, null, null, numReducePhases, false, 0, 0, true);
         }
         final QuerySearchResult firstResult = queryResults.stream().findFirst().get().queryResult();
         final boolean hasSuggest = firstResult.suggest() != null;
         final boolean hasProfileResults = firstResult.hasProfileResults();
         final boolean consumeAggs;
         final List<InternalAggregations> aggregationsList;
-        if (bufferdAggs != null) {
+        if (bufferedAggs != null) {
             consumeAggs = false; // we already have results from intermediate reduces and just need to perform the final reduce
             assert firstResult.hasAggs() : "firstResult has no aggs but we got non null buffered aggs?";
-            aggregationsList = bufferdAggs;
+            aggregationsList = bufferedAggs;
         } else if (firstResult.hasAggs()) {
             // the number of shards was less than the buffer size so we reduce agg results directly
             aggregationsList = new ArrayList<>(queryResults.size());
@@ -443,8 +454,12 @@ public final class SearchPhaseController extends AbstractComponent {
         final Map<String, List<Suggestion>> groupedSuggestions = hasSuggest ? new HashMap<>() : Collections.emptyMap();
         final Map<String, ProfileShardResult> profileResults = hasProfileResults ? new HashMap<>(queryResults.size())
             : Collections.emptyMap();
+        int from = 0;
+        int size = 0;
         for (SearchPhaseResult entry : queryResults) {
             QuerySearchResult result = entry.queryResult();
+            from = result.from();
+            size = result.size();
             if (result.searchTimedOut()) {
                 timedOut = true;
             }
@@ -455,11 +470,6 @@ public final class SearchPhaseController extends AbstractComponent {
                     terminatedEarly = true;
                 }
             }
-            totalHits += result.topDocs().totalHits;
-            fetchHits += result.topDocs().scoreDocs.length;
-            if (!Float.isNaN(result.topDocs().getMaxScore())) {
-                maxScore = Math.max(maxScore, result.topDocs().getMaxScore());
-            }
             if (hasSuggest) {
                 assert result.suggest() != null;
                 for (Suggestion<? extends Suggestion.Entry<? extends Suggestion.Entry.Option>> suggestion : result.suggest()) {
@@ -480,8 +490,11 @@ public final class SearchPhaseController extends AbstractComponent {
         final InternalAggregations aggregations = aggregationsList.isEmpty() ? null : reduceAggs(aggregationsList,
             firstResult.pipelineAggregators(), reduceContext);
         final SearchProfileShardResults shardResults = profileResults.isEmpty() ? null : new SearchProfileShardResults(profileResults);
-        return new ReducedQueryPhase(totalHits, fetchHits, maxScore, timedOut, terminatedEarly, firstResult, suggest, aggregations,
-            shardResults, numReducePhases);
+        final SortedTopDocs scoreDocs = this.sortDocs(isScrollRequest, queryResults, bufferedTopDocs, topDocsStats, from, size);
+        return new ReducedQueryPhase(topDocsStats.totalHits, topDocsStats.fetchHits, topDocsStats.maxScore,
+            timedOut, terminatedEarly, suggest, aggregations, shardResults, scoreDocs.scoreDocs, scoreDocs.sortFields,
+            firstResult != null ? firstResult.sortValueFormats() : null,
+            numReducePhases, scoreDocs.isSortedByField, size, from, firstResult == null);
     }

@@ -522,8 +535,6 @@ public final class SearchPhaseController extends AbstractComponent {
         final boolean timedOut;
         // non null and true if at least one reduced result was terminated early
         final Boolean terminatedEarly;
-        // an non-null arbitrary query result if was at least one reduced result
-        final QuerySearchResult oneResult;
         // the reduced suggest results
         final Suggest suggest;
         // the reduced internal aggregations
@@ -532,10 +543,25 @@ public final class SearchPhaseController extends AbstractComponent {
         final SearchProfileShardResults shardResults;
         // the number of reduce phases
         final int numReducePhases;
+        // the search's merged top docs
+        final ScoreDoc[] scoreDocs;
+        // the top docs sort fields used to sort the score docs, null if the results are not sorted
+        final SortField[] sortField;
+        // true iff the result score docs are sorted by a field (not score), this implies that sortField is set.
+        final boolean isSortedByField;
+        // the size of the top hits to return
+        final int size;
+        // true iff the query phase had no results. Otherwise false
+        final boolean isEmptyResult;
+        // the offset into the merged top hits
+        final int from;
+        // sort value formats used to sort / format the result
+        final DocValueFormat[] sortValueFormats;

-        ReducedQueryPhase(long totalHits, long fetchHits, float maxScore, boolean timedOut, Boolean terminatedEarly,
-                          QuerySearchResult oneResult, Suggest suggest, InternalAggregations aggregations,
-                          SearchProfileShardResults shardResults, int numReducePhases) {
+        ReducedQueryPhase(long totalHits, long fetchHits, float maxScore, boolean timedOut, Boolean terminatedEarly, Suggest suggest,
+                          InternalAggregations aggregations, SearchProfileShardResults shardResults, ScoreDoc[] scoreDocs,
+                          SortField[] sortFields, DocValueFormat[] sortValueFormats, int numReducePhases, boolean isSortedByField, int size,
+                          int from, boolean isEmptyResult) {
             if (numReducePhases <= 0) {
                 throw new IllegalArgumentException("at least one reduce phase must have been applied but was: " + numReducePhases);
             }
@@ -548,27 +574,26 @@ public final class SearchPhaseController extends AbstractComponent {
             }
             this.timedOut = timedOut;
             this.terminatedEarly = terminatedEarly;
-            this.oneResult = oneResult;
             this.suggest = suggest;
             this.aggregations = aggregations;
             this.shardResults = shardResults;
             this.numReducePhases = numReducePhases;
+            this.scoreDocs = scoreDocs;
+            this.sortField = sortFields;
+            this.isSortedByField = isSortedByField;
+            this.size = size;
+            this.from = from;
+            this.isEmptyResult = isEmptyResult;
+            this.sortValueFormats = sortValueFormats;
         }

         /**
          * Creates a new search response from the given merged hits.
-         * @see #merge(boolean, ScoreDoc[], ReducedQueryPhase, Collection, IntFunction)
+         * @see #merge(boolean, ReducedQueryPhase, Collection, IntFunction)
          */
         public InternalSearchResponse buildResponse(SearchHits hits) {
             return new InternalSearchResponse(hits, aggregations, suggest, shardResults, timedOut, terminatedEarly, numReducePhases);
         }
-
-        /**
-         * Returns true iff the query phase had no results. Otherwise false
-         */
-        public boolean isEmpty() {
-            return oneResult == null;
-        }
     }

     /**
@@ -577,12 +602,16 @@ public final class SearchPhaseController extends AbstractComponent {
      * This implementation can be configured to batch up a certain amount of results and only reduce them
      * iff the buffer is exhausted.
      */
-    static final class QueryPhaseResultConsumer
-        extends InitialSearchPhase.SearchPhaseResults<SearchPhaseResult> {
-        private final InternalAggregations[] buffer;
+    static final class QueryPhaseResultConsumer extends InitialSearchPhase.SearchPhaseResults<SearchPhaseResult> {
+        private final InternalAggregations[] aggsBuffer;
+        private final TopDocs[] topDocsBuffer;
+        private final boolean hasAggs;
+        private final boolean hasTopDocs;
+        private final int bufferSize;
         private int index;
         private final SearchPhaseController controller;
         private int numReducePhases = 0;
+        private final TopDocsStats topDocsStats = new TopDocsStats();

         /**
          * Creates a new {@link QueryPhaseResultConsumer}
@@ -591,7 +620,8 @@ public final class SearchPhaseController extends AbstractComponent {
          * @param bufferSize the size of the reduce buffer. if the buffer size is smaller than the number of expected results
          *                   the buffer is used to incrementally reduce aggregation results before all shards responded.
          */
-        private QueryPhaseResultConsumer(SearchPhaseController controller, int expectedResultSize, int bufferSize) {
+        private QueryPhaseResultConsumer(SearchPhaseController controller, int expectedResultSize, int bufferSize,
+                                         boolean hasTopDocs, boolean hasAggs) {
             super(expectedResultSize);
             if (expectedResultSize != 1 && bufferSize < 2) {
                 throw new IllegalArgumentException("buffer size must be >= 2 if there is more than one expected result");
@@ -599,39 +629,68 @@ public final class SearchPhaseController extends AbstractComponent {
             if (expectedResultSize <= bufferSize) {
                 throw new IllegalArgumentException("buffer size must be less than the expected result size");
             }
+            if (hasAggs == false && hasTopDocs == false) {
+                throw new IllegalArgumentException("either aggs or top docs must be present");
+            }
             this.controller = controller;
             // no need to buffer anything if we have fewer expected results. in this case we don't consume any results ahead of time.
-            this.buffer = new InternalAggregations[bufferSize];
+            this.aggsBuffer = new InternalAggregations[hasAggs ? bufferSize : 0];
+            this.topDocsBuffer = new TopDocs[hasTopDocs ? bufferSize : 0];
+            this.hasTopDocs = hasTopDocs;
+            this.hasAggs = hasAggs;
+            this.bufferSize = bufferSize;
         }

         @Override
         public void consumeResult(SearchPhaseResult result) {
             super.consumeResult(result);
             QuerySearchResult queryResult = result.queryResult();
-            assert queryResult.hasAggs() : "this collector should only be used if aggs are requested";
             consumeInternal(queryResult);
         }

         private synchronized void consumeInternal(QuerySearchResult querySearchResult) {
-            InternalAggregations aggregations = (InternalAggregations) querySearchResult.consumeAggs();
-            if (index == buffer.length) {
-                InternalAggregations reducedAggs = controller.reduceAggsIncrementally(Arrays.asList(buffer));
-                Arrays.fill(buffer, null);
+            if (index == bufferSize) {
+                if (hasAggs) {
+                    InternalAggregations reducedAggs = controller.reduceAggsIncrementally(Arrays.asList(aggsBuffer));
+                    Arrays.fill(aggsBuffer, null);
+                    aggsBuffer[0] = reducedAggs;
+                }
+                if (hasTopDocs) {
+                    TopDocs reducedTopDocs = controller.mergeTopDocs(Arrays.asList(topDocsBuffer),
+                        querySearchResult.from() + querySearchResult.size() // we have to merge here in the same way we collect on a shard
+                        , 0);
+                    Arrays.fill(topDocsBuffer, null);
+                    topDocsBuffer[0] = reducedTopDocs;
+                }
                 numReducePhases++;
-                buffer[0] = reducedAggs;
                 index = 1;
             }
             final int i = index++;
-            buffer[i] = aggregations;
+            if (hasAggs) {
+                aggsBuffer[i] = (InternalAggregations) querySearchResult.consumeAggs();
+            }
+            if (hasTopDocs) {
+                final TopDocs topDocs = querySearchResult.consumeTopDocs(); // can't be null
+                topDocsStats.add(topDocs);
+                SearchPhaseController.setShardIndex(topDocs, querySearchResult.getShardIndex());
+                topDocsBuffer[i] = topDocs;
+            }
         }

-        private synchronized List<InternalAggregations> getRemaining() {
-            return Arrays.asList(buffer).subList(0, index);
+        private synchronized List<InternalAggregations> getRemainingAggs() {
+            return hasAggs ? Arrays.asList(aggsBuffer).subList(0, index) : null;
         }

+        private synchronized List<TopDocs> getRemainingTopDocs() {
+            return hasTopDocs ? Arrays.asList(topDocsBuffer).subList(0, index) : null;
+        }
+
         @Override
         public ReducedQueryPhase reduce() {
-            return controller.reducedQueryPhase(results.asList(), getRemaining(), numReducePhases);
+            return controller.reducedQueryPhase(results.asList(), getRemainingAggs(), getRemainingTopDocs(), topDocsStats,
+                numReducePhases, false);
         }

         /**
@@ -649,17 +708,49 @@ public final class SearchPhaseController extends AbstractComponent {
      */
     InitialSearchPhase.SearchPhaseResults<SearchPhaseResult> newSearchPhaseResults(SearchRequest request, int numShards) {
         SearchSourceBuilder source = request.source();
-        if (source != null && source.aggregations() != null) {
+        boolean isScrollRequest = request.scroll() != null;
+        final boolean hasAggs = source != null && source.aggregations() != null;
+        final boolean hasTopDocs = source == null || source.size() != 0;
+
+        if (isScrollRequest == false && (hasAggs || hasTopDocs)) {
+            // no incremental reduce if scroll is used - we only hit a single shard or sometimes more...
             if (request.getBatchedReduceSize() < numShards) {
                 // only use this if there are aggs and if there are more shards than we should reduce at once
-                return new QueryPhaseResultConsumer(this, numShards, request.getBatchedReduceSize());
+                return new QueryPhaseResultConsumer(this, numShards, request.getBatchedReduceSize(), hasTopDocs, hasAggs);
             }
         }
         return new InitialSearchPhase.SearchPhaseResults<SearchPhaseResult>(numShards) {
             @Override
             public ReducedQueryPhase reduce() {
-                return reducedQueryPhase(results.asList());
+                return reducedQueryPhase(results.asList(), isScrollRequest);
             }
         };
     }
+
+    static final class TopDocsStats {
+        long totalHits;
+        long fetchHits;
+        float maxScore = Float.NEGATIVE_INFINITY;
+
+        void add(TopDocs topDocs) {
+            totalHits += topDocs.totalHits;
+            fetchHits += topDocs.scoreDocs.length;
+            if (!Float.isNaN(topDocs.getMaxScore())) {
+                maxScore = Math.max(maxScore, topDocs.getMaxScore());
+            }
+        }
+    }
+
+    static final class SortedTopDocs {
+        static final SortedTopDocs EMPTY = new SortedTopDocs(EMPTY_DOCS, false, null);
+        final ScoreDoc[] scoreDocs;
+        final boolean isSortedByField;
+        final SortField[] sortFields;
+
+        SortedTopDocs(ScoreDoc[] scoreDocs, boolean isSortedByField, SortField[] sortFields) {
+            this.scoreDocs = scoreDocs;
+            this.isSortedByField = isSortedByField;
+            this.sortFields = sortFields;
+        }
+    }
 }
diff --git a/core/src/main/java/org/elasticsearch/action/search/SearchScrollQueryAndFetchAsyncAction.java b/core/src/main/java/org/elasticsearch/action/search/SearchScrollQueryAndFetchAsyncAction.java
index c39a9fe6f25..b3ebaed3cb6 100644
--- a/core/src/main/java/org/elasticsearch/action/search/SearchScrollQueryAndFetchAsyncAction.java
+++ b/core/src/main/java/org/elasticsearch/action/search/SearchScrollQueryAndFetchAsyncAction.java
@@ -173,9 +173,8 @@ final class SearchScrollQueryAndFetchAsyncAction extends AbstractAsyncAction {

     private void innerFinishHim() throws Exception {
         List<QueryFetchSearchResult> queryFetchSearchResults = queryFetchResults.asList();
-        ScoreDoc[] sortedShardDocs = searchPhaseController.sortDocs(true, queryFetchResults.asList());
-        final InternalSearchResponse internalResponse = searchPhaseController.merge(true, sortedShardDocs,
-            searchPhaseController.reducedQueryPhase(queryFetchSearchResults), queryFetchSearchResults, queryFetchResults::get);
+        final InternalSearchResponse internalResponse = searchPhaseController.merge(true,
+            searchPhaseController.reducedQueryPhase(queryFetchSearchResults, true), queryFetchSearchResults, queryFetchResults::get);
         String scrollId = null;
         if (request.scroll() != null) {
             scrollId = request.scrollId();
diff --git a/core/src/main/java/org/elasticsearch/action/search/SearchScrollQueryThenFetchAsyncAction.java b/core/src/main/java/org/elasticsearch/action/search/SearchScrollQueryThenFetchAsyncAction.java
index 37071485a03..709738dcafb 100644
--- a/core/src/main/java/org/elasticsearch/action/search/SearchScrollQueryThenFetchAsyncAction.java
+++ b/core/src/main/java/org/elasticsearch/action/search/SearchScrollQueryThenFetchAsyncAction.java
@@ -55,7 +55,6 @@ final class SearchScrollQueryThenFetchAsyncAction extends AbstractAsyncAction {
     private volatile AtomicArray<ShardSearchFailure> shardFailures;
     final AtomicArray<QuerySearchResult> queryResults;
     final AtomicArray<FetchSearchResult> fetchResults;
-    private volatile ScoreDoc[] sortedShardDocs;
     private final AtomicInteger successfulOps;

     SearchScrollQueryThenFetchAsyncAction(Logger logger, ClusterService clusterService, SearchTransportService searchTransportService,
@@ -171,16 +170,15 @@ final class SearchScrollQueryThenFetchAsyncAction extends AbstractAsyncAction {
     }

     private void executeFetchPhase() throws Exception {
-        sortedShardDocs = searchPhaseController.sortDocs(true, queryResults.asList());
-        if (sortedShardDocs.length == 0) {
-            finishHim(searchPhaseController.reducedQueryPhase(queryResults.asList()));
+        final SearchPhaseController.ReducedQueryPhase reducedQueryPhase = searchPhaseController.reducedQueryPhase(queryResults.asList(),
+            true);
+        if (reducedQueryPhase.scoreDocs.length == 0) {
+            finishHim(reducedQueryPhase);
             return;
         }
-        final IntArrayList[] docIdsToLoad = searchPhaseController.fillDocIdsToLoad(queryResults.length(), sortedShardDocs);
-        SearchPhaseController.ReducedQueryPhase reducedQueryPhase = searchPhaseController.reducedQueryPhase(queryResults.asList());
-        final ScoreDoc[] lastEmittedDocPerShard = searchPhaseController.getLastEmittedDocPerShard(reducedQueryPhase, sortedShardDocs,
-            queryResults.length());
+        final IntArrayList[] docIdsToLoad = searchPhaseController.fillDocIdsToLoad(queryResults.length(), reducedQueryPhase.scoreDocs);
+        final ScoreDoc[] lastEmittedDocPerShard = searchPhaseController.getLastEmittedDocPerShard(reducedQueryPhase, queryResults.length());
         final CountDown counter = new CountDown(docIdsToLoad.length);
         for (int i = 0; i < docIdsToLoad.length; i++) {
             final int index = i;
@@ -222,8 +220,8 @@ final class SearchScrollQueryThenFetchAsyncAction extends AbstractAsyncAction {

     private void finishHim(SearchPhaseController.ReducedQueryPhase queryPhase) {
         try {
-            final InternalSearchResponse internalResponse = searchPhaseController.merge(true, sortedShardDocs, queryPhase,
-                fetchResults.asList(), fetchResults::get);
+            final InternalSearchResponse internalResponse = searchPhaseController.merge(true, queryPhase, fetchResults.asList(),
+                fetchResults::get);
             String scrollId = null;
             if (request.scroll() != null) {
                 scrollId = request.scrollId();
diff --git a/core/src/main/java/org/elasticsearch/common/util/concurrent/AtomicArray.java b/core/src/main/java/org/elasticsearch/common/util/concurrent/AtomicArray.java
index 2bf5e50a1c2..fa82aa0ac63 100644
--- a/core/src/main/java/org/elasticsearch/common/util/concurrent/AtomicArray.java
+++ b/core/src/main/java/org/elasticsearch/common/util/concurrent/AtomicArray.java
@@ -31,14 +31,6 @@ import java.util.concurrent.atomic.AtomicReferenceArray;
 * to get the concrete values as a list using {@link #asList()}.
 */
 public class AtomicArray<E> {
-
-    private static final AtomicArray EMPTY = new AtomicArray(0);
-
-    @SuppressWarnings("unchecked")
-    public static <E> E empty() {
-        return (E) EMPTY;
-    }
-
     private final AtomicReferenceArray<E> array;
     private volatile List<E> nonNullList;

@@ -53,7 +45,6 @@ public class AtomicArray<E> {
         return array.length();
     }

-
     /**
      * Sets the element at position {@code i} to the given value.
      *
diff --git a/core/src/main/java/org/elasticsearch/search/SearchService.java b/core/src/main/java/org/elasticsearch/search/SearchService.java
index a0352281952..e601cec0fea 100644
--- a/core/src/main/java/org/elasticsearch/search/SearchService.java
+++ b/core/src/main/java/org/elasticsearch/search/SearchService.java
@@ -259,7 +259,7 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv
                 loadOrExecuteQueryPhase(request, context);
-                if (context.queryResult().hasHits() == false && context.scrollContext() == null) {
+                if (context.queryResult().hasSearchContext() == false && context.scrollContext() == null) {
                     freeContext(context.id());
                 } else {
                     contextProcessedSuccessfully(context);
@@ -341,7 +341,7 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv
             operationListener.onPreQueryPhase(context);
             long time = System.nanoTime();
             queryPhase.execute(context);
-            if (context.queryResult().hasHits() == false && context.scrollContext() == null) {
+            if (context.queryResult().hasSearchContext() == false && context.scrollContext() == null) {
                 // no hits, we can release the context since there will be no fetch phase
                 freeContext(context.id());
             } else {
diff --git a/core/src/main/java/org/elasticsearch/search/fetch/FetchPhase.java b/core/src/main/java/org/elasticsearch/search/fetch/FetchPhase.java
index 83af0b9abd4..97f2681252b 100644
--- a/core/src/main/java/org/elasticsearch/search/fetch/FetchPhase.java
+++ b/core/src/main/java/org/elasticsearch/search/fetch/FetchPhase.java
@@ -166,7 +166,7 @@ public class FetchPhase implements SearchPhase {
             fetchSubPhase.hitsExecute(context, hits);
         }

-        context.fetchResult().hits(new SearchHits(hits, context.queryResult().topDocs().totalHits, context.queryResult().topDocs().getMaxScore()));
+        context.fetchResult().hits(new SearchHits(hits, context.queryResult().getTotalHits(), context.queryResult().getMaxScore()));
     }

     private int findRootDocumentIfNested(SearchContext context, LeafReaderContext subReaderContext, int subDocId) throws IOException {
diff --git a/core/src/main/java/org/elasticsearch/search/query/QueryPhase.java b/core/src/main/java/org/elasticsearch/search/query/QueryPhase.java
index 13f32f74d0d..272c57fe980 100644
--- a/core/src/main/java/org/elasticsearch/search/query/QueryPhase.java
+++ b/core/src/main/java/org/elasticsearch/search/query/QueryPhase.java
@@ -142,7 +142,6 @@ public class QueryPhase implements SearchPhase {
         queryResult.searchTimedOut(false);

         final boolean doProfile = searchContext.getProfilers() != null;
-        final SearchType searchType = searchContext.searchType();
         boolean rescore = false;
         try {
             queryResult.from(searchContext.from());
@@ -165,12 +164,7 @@ public class QueryPhase implements SearchPhase {
                 if (searchContext.getProfilers() != null) {
                     collector = new InternalProfileCollector(collector, CollectorResult.REASON_SEARCH_COUNT, Collections.emptyList());
                 }
-                topDocsCallable = new Callable<TopDocs>() {
-                    @Override
-                    public TopDocs call() throws Exception {
-                        return new TopDocs(totalHitCountCollector.getTotalHits(), Lucene.EMPTY_SCORE_DOCS, 0);
-                    }
-                };
+                topDocsCallable = () -> new TopDocs(totalHitCountCollector.getTotalHits(), Lucene.EMPTY_SCORE_DOCS, 0);
             } else {
                 // Perhaps have a dedicated scroll phase?
                 final ScrollContext scrollContext = searchContext.scrollContext();
@@ -238,38 +232,35 @@ public class QueryPhase implements SearchPhase {
                 if (doProfile) {
                     collector = new InternalProfileCollector(collector, CollectorResult.REASON_SEARCH_TOP_HITS, Collections.emptyList());
                 }
-                topDocsCallable = new Callable<TopDocs>() {
-                    @Override
-                    public TopDocs call() throws Exception {
-                        final TopDocs topDocs;
-                        if (topDocsCollector instanceof TopDocsCollector) {
-                            topDocs = ((TopDocsCollector) topDocsCollector).topDocs();
-                        } else if (topDocsCollector instanceof CollapsingTopDocsCollector) {
-                            topDocs = ((CollapsingTopDocsCollector) topDocsCollector).getTopDocs();
-                        } else {
-                            throw new IllegalStateException("Unknown top docs collector " + topDocsCollector.getClass().getName());
-                        }
-                        if (scrollContext != null) {
-                            if (scrollContext.totalHits == -1) {
-                                // first round
-                                scrollContext.totalHits = topDocs.totalHits;
-                                scrollContext.maxScore = topDocs.getMaxScore();
-                            } else {
-                                // subsequent round: the total number of hits and
-                                // the maximum score were computed on the first round
-                                topDocs.totalHits = scrollContext.totalHits;
-                                topDocs.setMaxScore(scrollContext.maxScore);
-                            }
-                            if (searchContext.request().numberOfShards() == 1) {
-                                // if we fetch the document in the same roundtrip, we already know the last emitted doc
-                                if (topDocs.scoreDocs.length > 0) {
-                                    // set the last emitted doc
-                                    scrollContext.lastEmittedDoc = topDocs.scoreDocs[topDocs.scoreDocs.length - 1];
-                                }
-                            }
-                        }
-                        return topDocs;
+                topDocsCallable = () -> {
+                    final TopDocs topDocs;
+                    if (topDocsCollector instanceof TopDocsCollector) {
+                        topDocs = ((TopDocsCollector) topDocsCollector).topDocs();
+                    } else if (topDocsCollector instanceof CollapsingTopDocsCollector) {
+                        topDocs = ((CollapsingTopDocsCollector) topDocsCollector).getTopDocs();
+                    } else {
+                        throw new IllegalStateException("Unknown top docs collector " + topDocsCollector.getClass().getName());
                     }
+                    if (scrollContext != null) {
+                        if (scrollContext.totalHits == -1) {
+                            // first round
+                            scrollContext.totalHits = topDocs.totalHits;
+                            scrollContext.maxScore = topDocs.getMaxScore();
+                        } else {
+                            // subsequent round: the total number of hits and
+                            // the maximum score were computed on the first round
+                            topDocs.totalHits = scrollContext.totalHits;
+                            topDocs.setMaxScore(scrollContext.maxScore);
+                        }
+                        if (searchContext.request().numberOfShards() == 1) {
+                            // if we fetch the document in the same roundtrip, we already know the last emitted doc
+                            if (topDocs.scoreDocs.length > 0) {
+                                // set the last emitted doc
+                                scrollContext.lastEmittedDoc = topDocs.scoreDocs[topDocs.scoreDocs.length - 1];
+                            }
+                        }
+                    }
+                    return topDocs;
                 };
             }
diff --git a/core/src/main/java/org/elasticsearch/search/query/QuerySearchResult.java b/core/src/main/java/org/elasticsearch/search/query/QuerySearchResult.java
index 15403f99677..f071c62f12c 100644
--- a/core/src/main/java/org/elasticsearch/search/query/QuerySearchResult.java
+++ b/core/src/main/java/org/elasticsearch/search/query/QuerySearchResult.java
@@ -55,6 +55,9 @@ public final class QuerySearchResult extends SearchPhaseResult {
     private Boolean terminatedEarly = null;
     private ProfileShardResult profileShardResults;
     private boolean hasProfileResults;
+    private boolean hasScoreDocs;
+    private int totalHits;
+    private float maxScore;

     public QuerySearchResult() {
     }
@@ -87,11 +90,34 @@ public final class QuerySearchResult extends SearchPhaseResult {
     }

     public TopDocs topDocs() {
+        if (topDocs == null) {
+            throw new IllegalStateException("topDocs already consumed");
+        }
+        return topDocs;
+    }
+
+    /**
+     * Returns true iff the top docs have already been consumed.
+     */
+    public boolean hasConsumedTopDocs() {
+        return topDocs == null;
+    }
+
+    /**
+     * Returns and nulls out the top docs for this search result. This allows us to free up memory once the top docs are consumed.
+     * @throws IllegalStateException if the top docs have already been consumed.
+     */
+    public TopDocs consumeTopDocs() {
+        TopDocs topDocs = this.topDocs;
+        if (topDocs == null) {
+            throw new IllegalStateException("topDocs already consumed");
+        }
+        this.topDocs = null;
         return topDocs;
     }

     public void topDocs(TopDocs topDocs, DocValueFormat[] sortValueFormats) {
-        this.topDocs = topDocs;
+        setTopDocs(topDocs);
         if (topDocs.scoreDocs.length > 0 && topDocs.scoreDocs[0] instanceof FieldDoc) {
             int numFields = ((FieldDoc) topDocs.scoreDocs[0]).fields.length;
             if (numFields != sortValueFormats.length) {
@@ -102,12 +128,19 @@ public final class QuerySearchResult extends SearchPhaseResult {
         this.sortValueFormats = sortValueFormats;
     }

+    private void setTopDocs(TopDocs topDocs) {
+        this.topDocs = topDocs;
+        hasScoreDocs = topDocs.scoreDocs.length > 0;
+        this.totalHits = topDocs.totalHits;
+        this.maxScore = topDocs.getMaxScore();
+    }
+
     public DocValueFormat[] sortValueFormats() {
         return sortValueFormats;
     }

     /**
-     * Retruns true if this query result has unconsumed aggregations
+     * Returns true if this query result has unconsumed aggregations
      */
     public boolean hasAggs() {
         return hasAggs;
@@ -195,10 +228,15 @@ public final class QuerySearchResult extends SearchPhaseResult {
         return this;
     }

-    /** Returns true iff the result has hits */
-    public boolean hasHits() {
-        return (topDocs != null && topDocs.scoreDocs.length > 0) ||
-            (suggest != null && suggest.hasScoreDocs());
+    /**
+     * Returns true if this result has any suggest score docs
+     */
+    public boolean hasSuggestHits() {
+        return (suggest != null && suggest.hasScoreDocs());
+    }
+
+    public boolean hasSearchContext() {
+        return hasScoreDocs || hasSuggestHits();
     }

     public static QuerySearchResult readQuerySearchResult(StreamInput in) throws IOException {
@@ -227,7 +265,7 @@ public final class QuerySearchResult extends SearchPhaseResult {
                 sortValueFormats[i] = in.readNamedWriteable(DocValueFormat.class);
             }
         }
-        topDocs = readTopDocs(in);
+        setTopDocs(readTopDocs(in));
         if (hasAggs = in.readBoolean()) {
             aggregations = InternalAggregations.readAggregations(in);
         }
@@ -278,4 +316,12 @@ public final class QuerySearchResult extends SearchPhaseResult {
         out.writeOptionalBoolean(terminatedEarly);
         out.writeOptionalWriteable(profileShardResults);
     }
+
+    public int getTotalHits() {
+        return totalHits;
+    }
+
+    public float getMaxScore() {
+        return maxScore;
+    }
 }
diff --git a/core/src/test/java/org/elasticsearch/action/search/SearchPhaseControllerTests.java b/core/src/test/java/org/elasticsearch/action/search/SearchPhaseControllerTests.java
index 207183bae4e..c92caef628a 100644
--- a/core/src/test/java/org/elasticsearch/action/search/SearchPhaseControllerTests.java
+++ b/core/src/test/java/org/elasticsearch/action/search/SearchPhaseControllerTests.java
@@ -19,6 +19,7 @@

 package org.elasticsearch.action.search;

+import com.carrotsearch.randomizedtesting.RandomizedContext;
 import org.apache.lucene.search.ScoreDoc;
 import org.apache.lucene.search.TopDocs;
 import org.elasticsearch.common.lucene.Lucene;
@@ -42,6 +43,7 @@ import org.elasticsearch.search.query.QuerySearchResult;
 import org.elasticsearch.search.suggest.Suggest;
 import org.elasticsearch.search.suggest.completion.CompletionSuggestion;
 import org.elasticsearch.test.ESTestCase;
+import org.elasticsearch.test.TestCluster;
 import org.junit.Before;

 import java.io.IOException;
@@ -51,12 +53,16 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.Callable;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
+import java.util.stream.Stream;

 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.instanceOf;
+import static org.hamcrest.Matchers.lessThanOrEqualTo;
 import static org.hamcrest.Matchers.not;

 public class SearchPhaseControllerTests extends ESTestCase {
@@ -75,8 +81,16 @@ public class SearchPhaseControllerTests extends ESTestCase {
         int nShards = randomIntBetween(1, 20);
         int queryResultSize = randomBoolean() ? 0 : randomIntBetween(1, nShards * 2);
         AtomicArray<SearchPhaseResult> results = generateQueryResults(nShards, suggestions, queryResultSize, false);
-        ScoreDoc[] sortedDocs = searchPhaseController.sortDocs(true, results.asList());
+        Optional<SearchPhaseResult> first = results.asList().stream().findFirst();
+        int from = 0, size = 0;
+        if (first.isPresent()) {
+            from = first.get().queryResult().from();
+            size = first.get().queryResult().size();
+        }
         int accumulatedLength = Math.min(queryResultSize, getTotalQueryHits(results));
+        ScoreDoc[] sortedDocs = searchPhaseController.sortDocs(true, results.asList(), null, new SearchPhaseController.TopDocsStats(),
+            from, size)
+            .scoreDocs;
         for (Suggest.Suggestion suggestion : reducedSuggest(results)) {
             int suggestionSize = suggestion.getEntries().get(0).getOptions().size();
             accumulatedLength += suggestionSize;
@@ -84,48 +98,71 @@ public class SearchPhaseControllerTests extends ESTestCase {
         assertThat(sortedDocs.length, equalTo(accumulatedLength));
     }

-    public void testSortIsIdempotent() throws IOException {
+    public void testSortIsIdempotent() throws Exception {
         int nShards = randomIntBetween(1, 20);
         int queryResultSize = randomBoolean() ? 0 : randomIntBetween(1, nShards * 2);
-        AtomicArray<SearchPhaseResult> results = generateQueryResults(nShards, Collections.emptyList(), queryResultSize,
-            randomBoolean() || true);
+        long randomSeed = randomLong();
+        boolean useConstantScore = randomBoolean();
+        AtomicArray<SearchPhaseResult> results = generateSeededQueryResults(randomSeed, nShards, Collections.emptyList(), queryResultSize,
+            useConstantScore);
         boolean ignoreFrom = randomBoolean();
-        ScoreDoc[] sortedDocs = searchPhaseController.sortDocs(ignoreFrom, results.asList());
+        Optional<SearchPhaseResult> first = results.asList().stream().findFirst();
+        int from = 0, size = 0;
+        if (first.isPresent()) {
+            from = first.get().queryResult().from();
+            size = first.get().queryResult().size();
+        }
+        SearchPhaseController.TopDocsStats topDocsStats = new SearchPhaseController.TopDocsStats();
+        ScoreDoc[] sortedDocs = searchPhaseController.sortDocs(ignoreFrom, results.asList(), null, topDocsStats, from, size).scoreDocs;

-        ScoreDoc[] sortedDocs2 = searchPhaseController.sortDocs(ignoreFrom, results.asList());
-        assertArrayEquals(sortedDocs, sortedDocs2);
+        results = generateSeededQueryResults(randomSeed, nShards, Collections.emptyList(), queryResultSize,
+            useConstantScore);
+        SearchPhaseController.TopDocsStats topDocsStats2 = new SearchPhaseController.TopDocsStats();
+        ScoreDoc[] sortedDocs2 = searchPhaseController.sortDocs(ignoreFrom, results.asList(), null, topDocsStats2, from, size).scoreDocs;
+        assertEquals(sortedDocs.length, sortedDocs2.length);
+        for (int i = 0; i < sortedDocs.length; i++) {
+            assertEquals(sortedDocs[i].doc, sortedDocs2[i].doc);
+            assertEquals(sortedDocs[i].shardIndex, sortedDocs2[i].shardIndex);
+            assertEquals(sortedDocs[i].score, sortedDocs2[i].score, 0.0f);
+        }
+        assertEquals(topDocsStats.maxScore, topDocsStats2.maxScore, 0.0f);
+        assertEquals(topDocsStats.totalHits, topDocsStats2.totalHits);
+        assertEquals(topDocsStats.fetchHits, topDocsStats2.fetchHits);
+    }
+
+    private AtomicArray<SearchPhaseResult> generateSeededQueryResults(long seed, int nShards,
+                                                                      List<CompletionSuggestion> suggestions,
+                                                                      int searchHitsSize, boolean useConstantScore) throws Exception {
+        return RandomizedContext.current().runWithPrivateRandomness(seed,
+            () -> generateQueryResults(nShards, suggestions, searchHitsSize, useConstantScore));
     }

     public void testMerge() throws IOException {
         List<CompletionSuggestion> suggestions = new ArrayList<>();
+        int maxSuggestSize = 0;
         for (int i = 0; i < randomIntBetween(1, 5); i++) {
-            suggestions.add(new CompletionSuggestion(randomAlphaOfLength(randomIntBetween(1, 5)), randomIntBetween(1, 20)));
+            int size = randomIntBetween(1, 20);
+            maxSuggestSize += size;
+            suggestions.add(new CompletionSuggestion(randomAlphaOfLength(randomIntBetween(1, 5)), size));
         }
         int nShards = randomIntBetween(1, 20);
         int queryResultSize = randomBoolean() ? 0 : randomIntBetween(1, nShards * 2);
         AtomicArray<SearchPhaseResult> queryResults = generateQueryResults(nShards, suggestions, queryResultSize, false);
-
-        // calculate offsets and score doc array
-        List<ScoreDoc> mergedScoreDocs = new ArrayList<>();
-        ScoreDoc[] mergedSearchDocs = getTopShardDocs(queryResults);
-        mergedScoreDocs.addAll(Arrays.asList(mergedSearchDocs));
-        Suggest mergedSuggest = reducedSuggest(queryResults);
-        for (Suggest.Suggestion suggestion : mergedSuggest) {
-            if (suggestion instanceof CompletionSuggestion) {
-                CompletionSuggestion completionSuggestion = ((CompletionSuggestion) suggestion);
-                mergedScoreDocs.addAll(completionSuggestion.getOptions().stream()
-                    .map(CompletionSuggestion.Entry.Option::getDoc)
-                    .collect(Collectors.toList()));
-            }
-        }
-        ScoreDoc[] sortedDocs = mergedScoreDocs.toArray(new ScoreDoc[mergedScoreDocs.size()]);
-        AtomicArray<SearchPhaseResult> searchPhaseResultAtomicArray = generateFetchResults(nShards, mergedSearchDocs, mergedSuggest);
-        InternalSearchResponse mergedResponse = searchPhaseController.merge(true, sortedDocs,
-            searchPhaseController.reducedQueryPhase(queryResults.asList()),
+        SearchPhaseController.ReducedQueryPhase reducedQueryPhase = searchPhaseController.reducedQueryPhase(queryResults.asList(), false);
+        AtomicArray<SearchPhaseResult> searchPhaseResultAtomicArray = generateFetchResults(nShards, reducedQueryPhase.scoreDocs,
+            reducedQueryPhase.suggest);
+        InternalSearchResponse mergedResponse = searchPhaseController.merge(false,
+            reducedQueryPhase,
             searchPhaseResultAtomicArray.asList(), searchPhaseResultAtomicArray::get);
-        assertThat(mergedResponse.hits().getHits().length, equalTo(mergedSearchDocs.length));
+        int suggestSize = 0;
+        for (Suggest.Suggestion s : reducedQueryPhase.suggest) {
+            Stream<CompletionSuggestion.Entry> stream = s.getEntries().stream();
+            suggestSize += stream.collect(Collectors.summingInt(e -> e.getOptions().size()));
+        }
+        assertThat(suggestSize, lessThanOrEqualTo(maxSuggestSize));
+        assertThat(mergedResponse.hits().getHits().length, equalTo(reducedQueryPhase.scoreDocs.length - suggestSize));
         Suggest suggestResult = mergedResponse.suggest();
-        for (Suggest.Suggestion<?> suggestion : mergedSuggest) {
+        for (Suggest.Suggestion<?> suggestion : reducedQueryPhase.suggest) {
             assertThat(suggestion, instanceOf(CompletionSuggestion.class));
             if (suggestion.getEntries().get(0).getOptions().size() > 0) {
                 CompletionSuggestion suggestionResult = suggestResult.getSuggestion(suggestion.getName());
@@ -209,16 +246,6 @@ public class SearchPhaseControllerTests extends ESTestCase {
             .collect(Collectors.toList()));
     }

-    private ScoreDoc[] getTopShardDocs(AtomicArray<SearchPhaseResult> results) throws IOException {
-        List<SearchPhaseResult> resultList = results.asList();
-        TopDocs[] shardTopDocs = new TopDocs[resultList.size()];
-        for (int i = 0; i < resultList.size(); i++) {
-            shardTopDocs[i] = resultList.get(i).queryResult().topDocs();
-        }
-        int topN = Math.min(results.get(0).queryResult().size(), getTotalQueryHits(results));
-        return TopDocs.merge(topN, shardTopDocs).scoreDocs;
-    }
-
     private AtomicArray<SearchPhaseResult> generateFetchResults(int nShards, ScoreDoc[] mergedSearchDocs, Suggest mergedSuggest) {
         AtomicArray<SearchPhaseResult> fetchResults = new AtomicArray<>(nShards);
         for (int shardIndex = 0; shardIndex < nShards; shardIndex++) {
@@ -309,30 +336,96 @@ public class SearchPhaseControllerTests extends ESTestCase {
         InitialSearchPhase.SearchPhaseResults<SearchPhaseResult> consumer =
             searchPhaseController.newSearchPhaseResults(request, expectedNumResults);
         AtomicInteger max = new AtomicInteger();
-        CountDownLatch latch = new CountDownLatch(expectedNumResults);
+        Thread[] threads = new Thread[expectedNumResults];
         for (int i = 0; i < expectedNumResults; i++) {
             int id = i;
-            Thread t = new Thread(() -> {
+            threads[i] = new Thread(() -> {
                 int number = randomIntBetween(1, 1000);
                 max.updateAndGet(prev -> Math.max(prev, number));
                 QuerySearchResult result = new QuerySearchResult(id, new SearchShardTarget("node", new Index("a", "b"), id));
-                result.topDocs(new TopDocs(id, new ScoreDoc[0], 0.0F), new DocValueFormat[0]);
+                result.topDocs(new TopDocs(1, new ScoreDoc[] {new ScoreDoc(0, number)}, number), new DocValueFormat[0]);
                 InternalAggregations aggs = new InternalAggregations(Arrays.asList(new InternalMax("test", (double) number,
                     DocValueFormat.RAW, Collections.emptyList(), Collections.emptyMap())));
                 result.aggregations(aggs);
                 result.setShardIndex(id);
+                result.size(1);
                 consumer.consumeResult(result);
-                latch.countDown();
             });
-            t.start();
+            threads[i].start();
+        }
+        for (int i = 0; i < expectedNumResults; i++) {
+            threads[i].join();
         }
-        latch.await();
         SearchPhaseController.ReducedQueryPhase reduce = consumer.reduce();
         InternalMax internalMax = (InternalMax) reduce.aggregations.asList().get(0);
         assertEquals(max.get(), internalMax.getValue(), 0.0D);
+        assertEquals(1, reduce.scoreDocs.length);
+        assertEquals(max.get(), reduce.maxScore, 0.0f);
+        assertEquals(expectedNumResults, reduce.totalHits);
+        assertEquals(max.get(), reduce.scoreDocs[0].score, 0.0f);
     }

+    public void testConsumerOnlyAggs() throws InterruptedException {
+        int expectedNumResults = randomIntBetween(1, 100);
+        int bufferSize = randomIntBetween(2, 200);
+        SearchRequest request = new SearchRequest();
+        request.source(new SearchSourceBuilder().aggregation(AggregationBuilders.avg("foo")).size(0));
+        request.setBatchedReduceSize(bufferSize);
+        InitialSearchPhase.SearchPhaseResults<SearchPhaseResult> consumer =
+            searchPhaseController.newSearchPhaseResults(request, expectedNumResults);
+        AtomicInteger max = new AtomicInteger();
+        for (int i = 0; i < expectedNumResults; i++) {
+            int id = i;
+            int number = randomIntBetween(1, 1000);
+            max.updateAndGet(prev -> Math.max(prev, number));
+            QuerySearchResult result = new QuerySearchResult(id, new SearchShardTarget("node", new Index("a", "b"), id));
+            result.topDocs(new TopDocs(1, new ScoreDoc[0], number), new DocValueFormat[0]);
+            InternalAggregations aggs = new InternalAggregations(Arrays.asList(new InternalMax("test", (double) number,
+                DocValueFormat.RAW, Collections.emptyList(), Collections.emptyMap())));
+            result.aggregations(aggs);
+            result.setShardIndex(id);
+            result.size(1);
+            consumer.consumeResult(result);
+        }
+        SearchPhaseController.ReducedQueryPhase reduce = consumer.reduce();
+        InternalMax internalMax = (InternalMax) reduce.aggregations.asList().get(0);
+        assertEquals(max.get(), internalMax.getValue(), 0.0D);
+        assertEquals(0, reduce.scoreDocs.length);
+        assertEquals(max.get(), reduce.maxScore, 0.0f);
+        assertEquals(expectedNumResults, reduce.totalHits);
+    }
+
+    public void testConsumerOnlyHits() throws InterruptedException {
+        int expectedNumResults = randomIntBetween(1, 100);
+        int bufferSize = randomIntBetween(2, 200);
+        SearchRequest request = new SearchRequest();
+        if (randomBoolean()) {
+            request.source(new SearchSourceBuilder().size(randomIntBetween(1, 10)));
+        }
+        request.setBatchedReduceSize(bufferSize);
+        InitialSearchPhase.SearchPhaseResults<SearchPhaseResult> consumer =
+            searchPhaseController.newSearchPhaseResults(request, expectedNumResults);
+        AtomicInteger max = new AtomicInteger();
+        for (int i = 0; i < expectedNumResults; i++) {
+            int id = i;
+            int number = randomIntBetween(1, 1000);
+            max.updateAndGet(prev -> Math.max(prev, number));
+            QuerySearchResult result = new QuerySearchResult(id, new SearchShardTarget("node", new Index("a", "b"), id));
+            result.topDocs(new TopDocs(1, new ScoreDoc[] {new ScoreDoc(0, number)}, number), new DocValueFormat[0]);
+            result.setShardIndex(id);
+            result.size(1);
+            consumer.consumeResult(result);
+        }
+        SearchPhaseController.ReducedQueryPhase reduce = consumer.reduce();
+        assertEquals(1, reduce.scoreDocs.length);
+        assertEquals(max.get(), reduce.maxScore, 0.0f);
+        assertEquals(expectedNumResults, reduce.totalHits);
+        assertEquals(max.get(), reduce.scoreDocs[0].score, 0.0f);
+    }
+
     public void testNewSearchPhaseResults() {
         for (int i = 0; i < 10; i++) {
             int expectedNumResults = randomIntBetween(1, 10);
@@ -342,10 +435,22 @@ public class SearchPhaseControllerTests extends ESTestCase {
             if ((hasAggs = randomBoolean())) {
                 request.source(new SearchSourceBuilder().aggregation(AggregationBuilders.avg("foo")));
             }
+            final boolean hasTopDocs;
+            if ((hasTopDocs = randomBoolean())) {
+                if (request.source() != null) {
+                    request.source().size(randomIntBetween(1, 100));
+                } // no source means size = 10
+            } else {
+                if (request.source() == null) {
+                    request.source(new SearchSourceBuilder().size(0));
+                } else {
+                    request.source().size(0);
+                }
+            }
             request.setBatchedReduceSize(bufferSize);
             InitialSearchPhase.SearchPhaseResults<SearchPhaseResult> consumer
                 = searchPhaseController.newSearchPhaseResults(request, expectedNumResults);
-            if (hasAggs && expectedNumResults > bufferSize) {
+            if ((hasAggs || hasTopDocs) && expectedNumResults > bufferSize) {
                 assertThat("expectedNumResults: " + expectedNumResults + " bufferSize: " + bufferSize,
                     consumer, instanceOf(SearchPhaseController.QueryPhaseResultConsumer.class));
             } else {
@@ -354,4 +459,36 @@ public class SearchPhaseControllerTests extends ESTestCase {
             }
         }
     }
+
+    public void testReduceTopNWithFromOffset() {
+        SearchRequest request = new SearchRequest();
+        request.source(new SearchSourceBuilder().size(5).from(5));
+        request.setBatchedReduceSize(randomIntBetween(2, 4));
+        InitialSearchPhase.SearchPhaseResults<SearchPhaseResult> consumer =
+            searchPhaseController.newSearchPhaseResults(request, 4);
+        int score = 100;
+        for (int i = 0; i < 4; i++) {
+            QuerySearchResult result = new QuerySearchResult(i, new SearchShardTarget("node", new Index("a", "b"), i));
+            ScoreDoc[] docs = new ScoreDoc[3];
+            for (int j = 0; j < docs.length; j++) {
+                docs[j] = new ScoreDoc(0, score--);
+            }
+            result.topDocs(new TopDocs(3, docs, docs[0].score), new DocValueFormat[0]);
+            result.setShardIndex(i);
+            result.size(5);
+            result.from(5);
+            consumer.consumeResult(result);
+        }
+        // 4*3 = 12 results total; with from=5 and size=5 we get the hits at positions 5 through 9
+
+        SearchPhaseController.ReducedQueryPhase reduce = consumer.reduce();
+        assertEquals(5, reduce.scoreDocs.length);
+        assertEquals(100.f, reduce.maxScore, 0.0f);
+        assertEquals(12, reduce.totalHits);
+        assertEquals(95.0f, reduce.scoreDocs[0].score, 0.0f);
+        assertEquals(94.0f, reduce.scoreDocs[1].score, 0.0f);
+        assertEquals(93.0f, reduce.scoreDocs[2].score, 0.0f);
+        assertEquals(92.0f, reduce.scoreDocs[3].score, 0.0f);
+        assertEquals(91.0f, reduce.scoreDocs[4].score, 0.0f);
+    }
 }
diff --git a/core/src/test/java/org/elasticsearch/search/SearchServiceTests.java b/core/src/test/java/org/elasticsearch/search/SearchServiceTests.java
index f3ff6be1cc1..6fc795a8825 100644
--- a/core/src/test/java/org/elasticsearch/search/SearchServiceTests.java
+++ b/core/src/test/java/org/elasticsearch/search/SearchServiceTests.java
@@ -223,8 +223,13 @@ public class SearchServiceTests extends ESSingleNodeTestCase {
                 new AliasFilter(null, Strings.EMPTY_ARRAY), 1.0f),
             null);
-        // the search context should inherit the default timeout
-        assertThat(contextWithDefaultTimeout.timeout(), equalTo(TimeValue.timeValueSeconds(5)));
+        try {
+            // the search context should inherit the default timeout
+            assertThat(contextWithDefaultTimeout.timeout(), equalTo(TimeValue.timeValueSeconds(5)));
+        } finally {
+            contextWithDefaultTimeout.decRef();
+            service.freeContext(contextWithDefaultTimeout.id());
+        }

         final long seconds = randomIntBetween(6, 10);
         final SearchContext context = service.createContext(
@@ -238,8 +243,14 @@ public class SearchServiceTests extends ESSingleNodeTestCase {
                 new AliasFilter(null, Strings.EMPTY_ARRAY), 1.0f),
             null);
-        // the search context should inherit the query timeout
-        assertThat(context.timeout(), equalTo(TimeValue.timeValueSeconds(seconds)));
+        try {
+            // the search context should inherit the query timeout
+            assertThat(context.timeout(), equalTo(TimeValue.timeValueSeconds(seconds)));
+        } finally {
+            context.decRef();
+            service.freeContext(context.id());
+        }
+
     }

     public static class FailOnRewriteQueryPlugin extends Plugin implements SearchPlugin {
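
Note on the core change: QueryPhaseResultConsumer now buffers and incrementally merges top docs the same way it already batched aggregations, so the coordinating node no longer has to keep one TopDocs per shard alive until the final reduce. The stand-alone sketch below illustrates that buffering pattern in isolation. It is hypothetical illustration code, not part of this change: the class and method names are invented, and it assumes the Lucene TopDocs.merge(int from, int topN, TopDocs[] shardHits, boolean setShardIndex) overload that the diff itself calls, with shard indices assigned up front (mirroring setShardIndex(...) above) and setShardIndex = false. Field-sorted results would additionally need the TopFieldDocs/Sort overload shown in the mergeTopDocs hunk.

    import org.apache.lucene.search.ScoreDoc;
    import org.apache.lucene.search.TopDocs;

    import java.util.Arrays;

    /** Illustrative, simplified top-docs reducer; not Elasticsearch API. */
    public class IncrementalTopDocsReducer {
        private final TopDocs[] buffer; // partial results; collapsed to one entry whenever full
        private final int topN;         // from + size: how many hits a partial reduce must retain
        private int index;
        // running stats, mirroring SearchPhaseController.TopDocsStats
        long totalHits;
        long fetchHits;
        float maxScore = Float.NEGATIVE_INFINITY;

        IncrementalTopDocsReducer(int bufferSize, int topN) {
            if (bufferSize < 2) {
                throw new IllegalArgumentException("buffer size must be >= 2");
            }
            this.buffer = new TopDocs[bufferSize];
            this.topN = topN;
        }

        /** Consumes one shard's top docs; merges the buffer down to a single element when it fills up. */
        synchronized void consume(TopDocs shardTopDocs, int shardIndex) {
            for (ScoreDoc doc : shardTopDocs.scoreDocs) {
                doc.shardIndex = shardIndex; // set up front so merge(..., setShardIndex=false) preserves it
            }
            totalHits += shardTopDocs.totalHits;
            fetchHits += shardTopDocs.scoreDocs.length;
            if (Float.isNaN(shardTopDocs.getMaxScore()) == false) {
                maxScore = Math.max(maxScore, shardTopDocs.getMaxScore());
            }
            if (index == buffer.length) {
                // intermediate reduce: keep only the top `topN` docs and release the rest of the buffer
                TopDocs reduced = TopDocs.merge(0, topN, Arrays.copyOf(buffer, index), false);
                Arrays.fill(buffer, null);
                buffer[0] = reduced;
                index = 1;
            }
            buffer[index++] = shardTopDocs;
        }

        /** Final reduce over whatever is still buffered. */
        synchronized TopDocs reduce() {
            return TopDocs.merge(0, topN, Arrays.copyOf(buffer, index), false);
        }
    }

With a buffer of 5 and 100 shards, at most five TopDocs instances are alive at any point instead of 100, which is the same memory bound the batched reduce in this change provides for both aggregations and top docs.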