diff --git a/core/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java b/core/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java
index 41dc741de66..0a7a4fe8612 100644
--- a/core/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java
+++ b/core/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java
@@ -55,7 +55,6 @@ import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Function;
 import java.util.function.IntConsumer;
-

 abstract class AbstractSearchAsyncAction<FirstResult extends SearchPhaseResult> extends AbstractAsyncAction {
     private static final float DEFAULT_INDEX_BOOST = 1.0f;
     protected final Logger logger;
@@ -319,10 +318,10 @@ abstract class AbstractSearchAsyncAction
         }
     }

-    protected ShardFetchSearchRequest createFetchRequest(QuerySearchResult queryResult, int index, IntArrayList entry,
+    protected ShardFetchSearchRequest createFetchRequest(long queryId, int index, IntArrayList entry,
                                                          ScoreDoc[] lastEmittedDocPerShard) {
         final ScoreDoc lastEmittedDoc = (lastEmittedDocPerShard != null) ? lastEmittedDocPerShard[index] : null;
-        return new ShardFetchSearchRequest(request, queryResult.id(), entry, lastEmittedDoc);
+        return new ShardFetchSearchRequest(request, queryId, entry, lastEmittedDoc);
     }

     protected abstract void sendExecuteFirstPhase(Transport.Connection connection, ShardSearchTransportRequest request,
@@ -435,24 +434,51 @@ abstract class AbstractSearchAsyncAction
         @Override
         public void run() throws Exception {
+            getExecutor().execute(new ActionRunnable<SearchResponse>(listener) {
+                @Override
+                public void doRun() throws IOException {
+                    // we do the heavy lifting in this inner run method where we reduce aggs etc. that's why we fork this phase
+                    // off immediately instead of forking when we send back the response to the user since there we only need
+                    // to merge together the fetched results which is a linear operation.
+                    innerRun();
+                }
+
+                @Override
+                public void onFailure(Exception e) {
+                    ReduceSearchPhaseException failure = new ReduceSearchPhaseException("fetch", "", e, buildShardFailures());
+                    if (logger.isDebugEnabled()) {
+                        logger.debug("failed to reduce search", failure);
+                    }
+                    super.onFailure(failure);
+                }
+            });
+        }
+
+        private void innerRun() throws IOException {
+            final int numShards = shardsIts.size();
             final boolean isScrollRequest = request.scroll() != null;
             ScoreDoc[] sortedShardDocs = searchPhaseController.sortDocs(isScrollRequest, queryResults);
-            if (queryResults.length() == 1) {
+            String scrollId = isScrollRequest ? TransportSearchHelper.buildScrollId(queryResults) : null;
+            List<AtomicArray.Entry<QuerySearchResultProvider>> queryResultsAsList = queryResults.asList();
+            final SearchPhaseController.ReducedQueryPhase reducedQueryPhase = searchPhaseController.reducedQueryPhase(queryResultsAsList);
+            final boolean queryAndFetchOptimization = queryResults.length() == 1;
+            final IntConsumer finishPhase = successOpts
+                -> sendResponse(searchPhaseController, sortedShardDocs, scrollId, reducedQueryPhase, queryAndFetchOptimization ?
+                    queryResults : fetchResults);
+            if (queryAndFetchOptimization) {
                 assert queryResults.get(0) == null || queryResults.get(0).fetchResult() != null;
                 // query AND fetch optimization
-                sendResponseAsync("fetch", searchPhaseController, sortedShardDocs, queryResults, queryResults);
+                finishPhase.accept(successfulOps.get());
             } else {
-                final IntArrayList[] docIdsToLoad = searchPhaseController.fillDocIdsToLoad(queryResults.length(), sortedShardDocs);
-                final IntConsumer finishPhase = successOpts
-                    -> sendResponseAsync("fetch", searchPhaseController, sortedShardDocs, queryResults, fetchResults);
+                final IntArrayList[] docIdsToLoad = searchPhaseController.fillDocIdsToLoad(numShards, sortedShardDocs);
                 if (sortedShardDocs.length == 0) { // no docs to fetch -- sidestep everything and return
-                    queryResults.asList().stream()
+                    queryResultsAsList.stream()
                         .map(e -> e.value.queryResult())
                         .forEach(this::releaseIrrelevantSearchContext); // we have to release contexts here to free up resources
                     finishPhase.accept(successfulOps.get());
                 } else {
                     final ScoreDoc[] lastEmittedDocPerShard = isScrollRequest ?
-                        searchPhaseController.getLastEmittedDocPerShard(queryResults.asList(), sortedShardDocs, queryResults.length())
+                        searchPhaseController.getLastEmittedDocPerShard(reducedQueryPhase, sortedShardDocs, numShards)
                         : null;
                     final CountedCollector<FetchSearchResult> counter = new CountedCollector<>(fetchResults, docIdsToLoad.length,
                         // we count down every shard in the result no matter if we got any results or not
@@ -471,7 +497,7 @@ abstract class AbstractSearchAsyncAction
                             counter.countDown();
                         } else {
                             Transport.Connection connection = nodeIdToConnection.apply(queryResult.shardTarget().getNodeId());
-                            ShardFetchSearchRequest fetchSearchRequest = createFetchRequest(queryResult.queryResult(), i, entry,
+                            ShardFetchSearchRequest fetchSearchRequest = createFetchRequest(queryResult.queryResult().id(), i, entry,
                                 lastEmittedDocPerShard);
                             executeFetch(i, queryResult.shardTarget(), counter, fetchSearchRequest, queryResult.queryResult(),
                                 connection);
@@ -524,34 +550,20 @@ abstract class AbstractSearchAsyncAction
                 }
             }
         }
+
+        /**
+         * Sends back a result to the user.
+         */
+        private void sendResponse(SearchPhaseController searchPhaseController, ScoreDoc[] sortedDocs,
+                                  String scrollId, SearchPhaseController.ReducedQueryPhase reducedQueryPhase,
+                                  AtomicArray<? extends QuerySearchResultProvider> fetchResultsArr) {
+            final boolean isScrollRequest = request.scroll() != null;
+            final InternalSearchResponse internalResponse = searchPhaseController.merge(isScrollRequest, sortedDocs, reducedQueryPhase,
+                fetchResultsArr);
+            listener.onResponse(new SearchResponse(internalResponse, scrollId, expectedSuccessfulOps, successfulOps.get(),
+                buildTookInMillis(), buildShardFailures()));
+        }
     }

-    /**
-     * Sends back a result to the user. This method will create the sorted docs if they are null and will build the scrollID for the
-     * response. Note: This method will send the response in a different thread depending on the executor.
-     */
-    final void sendResponseAsync(String phase, SearchPhaseController searchPhaseController, ScoreDoc[] sortedDocs,
-                                 AtomicArray<? extends QuerySearchResultProvider> queryResultsArr,
-                                 AtomicArray<? extends QuerySearchResultProvider> fetchResultsArr) {
-        getExecutor().execute(new ActionRunnable<SearchResponse>(listener) {
-            @Override
-            public void doRun() throws IOException {
-                final boolean isScrollRequest = request.scroll() != null;
-                final InternalSearchResponse internalResponse = searchPhaseController.merge(isScrollRequest, sortedDocs, queryResultsArr,
-                    fetchResultsArr);
-                String scrollId = isScrollRequest ? TransportSearchHelper.buildScrollId(queryResultsArr) : null;
-                listener.onResponse(new SearchResponse(internalResponse, scrollId, expectedSuccessfulOps, successfulOps.get(),
-                    buildTookInMillis(), buildShardFailures()));
-            }
-
-            @Override
-            public void onFailure(Exception e) {
-                ReduceSearchPhaseException failure = new ReduceSearchPhaseException(phase, "", e, buildShardFailures());
-                if (logger.isDebugEnabled()) {
-                    logger.debug("failed to reduce search", failure);
-                }
-                super.onFailure(failure);
-            }
-        });
-    }
 }
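The change above moves the expensive reduce work off the calling thread exactly once: run() forks to the search executor immediately, and the response is then built inline instead of forking a second time. A minimal, self-contained sketch of that threading pattern follows; all names in it are illustrative, not Elasticsearch API:

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.function.Consumer;

    final class ForkedReducePattern {
        private final ExecutorService executor = Executors.newSingleThreadExecutor();

        /** Fork once, up front: the reduce is the heavy part; merging the fetched
         *  hits and notifying the listener is linear and stays on the same thread. */
        void finishPhase(Consumer<String> listener, Consumer<Exception> onFailure) {
            executor.execute(() -> {
                try {
                    String reduced = reduce();        // heavy: aggs, suggest, profile results
                    listener.accept(merge(reduced));  // cheap: linear merge of fetched hits
                } catch (Exception e) {
                    onFailure.accept(e);              // stands in for ReduceSearchPhaseException handling
                }
            });
        }

        private String reduce() { return "reduced"; }
        private String merge(String reduced) { return reduced + "+hits"; }
    }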
diff --git a/core/src/main/java/org/elasticsearch/action/search/SearchPhaseController.java b/core/src/main/java/org/elasticsearch/action/search/SearchPhaseController.java
index d104917411b..33c8859e044 100644
--- a/core/src/main/java/org/elasticsearch/action/search/SearchPhaseController.java
+++ b/core/src/main/java/org/elasticsearch/action/search/SearchPhaseController.java
@@ -244,7 +244,7 @@ public class SearchPhaseController extends AbstractComponent {
         Arrays.sort(sortedResults, QUERY_RESULT_ORDERING);
         QuerySearchResultProvider firstResult = sortedResults[0].value;
-        int topN = topN(results);
+        int topN = firstResult.queryResult().size();
         int from = firstResult.queryResult().from();
         if (ignoreFrom) {
             from = 0;
         }
@@ -340,16 +340,12 @@ public class SearchPhaseController extends AbstractComponent {
         return scoreDocs;
     }

-    public ScoreDoc[] getLastEmittedDocPerShard(List<? extends AtomicArray.Entry<? extends QuerySearchResultProvider>> queryResults,
+    public ScoreDoc[] getLastEmittedDocPerShard(ReducedQueryPhase reducedQueryPhase,
                                                 ScoreDoc[] sortedScoreDocs, int numShards) {
         ScoreDoc[] lastEmittedDocPerShard = new ScoreDoc[numShards];
-        if (queryResults.isEmpty() == false) {
-            long fetchHits = 0;
-            for (AtomicArray.Entry<? extends QuerySearchResultProvider> queryResult : queryResults) {
-                fetchHits += queryResult.value.queryResult().topDocs().scoreDocs.length;
-            }
+        if (reducedQueryPhase.isEmpty() == false) {
             // from is always zero as when we use scroll, we ignore from
-            long size = Math.min(fetchHits, topN(queryResults));
+            long size = Math.min(reducedQueryPhase.fetchHits, reducedQueryPhase.oneResult.size());
             // with collapsing we can have more hits than sorted docs
             size = Math.min(sortedScoreDocs.length, size);
             for (int sortedDocsIndex = 0; sortedDocsIndex < size; sortedDocsIndex++) {
@@ -384,22 +380,50 @@ public class SearchPhaseController extends AbstractComponent {
      * completion suggestion ordered by suggestion name
      */
     public InternalSearchResponse merge(boolean ignoreFrom, ScoreDoc[] sortedDocs,
-                                        AtomicArray<? extends QuerySearchResultProvider> queryResultsArr,
+                                        ReducedQueryPhase reducedQueryPhase,
                                         AtomicArray<? extends QuerySearchResultProvider> fetchResultsArr) {
-
-        List<? extends AtomicArray.Entry<? extends QuerySearchResultProvider>> queryResults = queryResultsArr.asList();
-        List<? extends AtomicArray.Entry<? extends QuerySearchResultProvider>> fetchResults = fetchResultsArr.asList();
-
-        if (queryResults.isEmpty()) {
+        if (reducedQueryPhase.isEmpty()) {
             return InternalSearchResponse.empty();
         }
+        List<? extends AtomicArray.Entry<? extends QuerySearchResultProvider>> fetchResults = fetchResultsArr.asList();
+        InternalSearchHits hits = getHits(reducedQueryPhase, ignoreFrom, sortedDocs, fetchResultsArr);
+        if (reducedQueryPhase.suggest != null) {
+            if (!fetchResults.isEmpty()) {
+                int currentOffset = hits.getHits().length;
+                for (CompletionSuggestion suggestion : reducedQueryPhase.suggest.filter(CompletionSuggestion.class)) {
+                    final List<CompletionSuggestion.Entry.Option> suggestionOptions = suggestion.getOptions();
+                    for (int scoreDocIndex = currentOffset; scoreDocIndex < currentOffset + suggestionOptions.size(); scoreDocIndex++) {
+                        ScoreDoc shardDoc = sortedDocs[scoreDocIndex];
+                        QuerySearchResultProvider searchResultProvider = fetchResultsArr.get(shardDoc.shardIndex);
+                        if (searchResultProvider == null) {
+                            continue;
+                        }
+                        FetchSearchResult fetchResult = searchResultProvider.fetchResult();
+                        int fetchResultIndex = fetchResult.counterGetAndIncrement();
+                        if (fetchResultIndex < fetchResult.hits().internalHits().length) {
+                            InternalSearchHit hit = fetchResult.hits().internalHits()[fetchResultIndex];
+                            CompletionSuggestion.Entry.Option suggestOption =
+                                suggestionOptions.get(scoreDocIndex - currentOffset);
+                            hit.score(shardDoc.score);
+                            hit.shard(fetchResult.shardTarget());
+                            suggestOption.setHit(hit);
+                        }
+                    }
+                    currentOffset += suggestionOptions.size();
+                }
+                assert currentOffset == sortedDocs.length : "expected no more score doc slices";
+            }
+        }
+        return reducedQueryPhase.buildResponse(hits);
+    }

-        QuerySearchResult firstResult = queryResults.get(0).value.queryResult();
-
+    private InternalSearchHits getHits(ReducedQueryPhase reducedQueryPhase, boolean ignoreFrom, ScoreDoc[] sortedDocs,
+                                       AtomicArray<? extends QuerySearchResultProvider> fetchResultsArr) {
+        List<? extends AtomicArray.Entry<? extends QuerySearchResultProvider>> fetchResults = fetchResultsArr.asList();
         boolean sorted = false;
         int sortScoreIndex = -1;
-        if (firstResult.topDocs() instanceof TopFieldDocs) {
-            TopFieldDocs fieldDocs = (TopFieldDocs) firstResult.queryResult().topDocs();
+        if (reducedQueryPhase.oneResult.topDocs() instanceof TopFieldDocs) {
+            TopFieldDocs fieldDocs = (TopFieldDocs) reducedQueryPhase.oneResult.queryResult().topDocs();
             if (fieldDocs instanceof CollapseTopFieldDocs && fieldDocs.fields.length == 1
                     && fieldDocs.fields[0].getType() == SortField.Type.SCORE) {
                 sorted = false;
@@ -412,41 +436,12 @@ public class SearchPhaseController extends AbstractComponent {
                 }
             }
         }
-
-        // count the total (we use the query result provider here, since we might not get any hits (we scrolled past them))
-        long totalHits = 0;
-        long fetchHits = 0;
-        float maxScore = Float.NEGATIVE_INFINITY;
-        boolean timedOut = false;
-        Boolean terminatedEarly = null;
-        for (AtomicArray.Entry<? extends QuerySearchResultProvider> entry : queryResults) {
-            QuerySearchResult result = entry.value.queryResult();
-            if (result.searchTimedOut()) {
-                timedOut = true;
-            }
-            if (result.terminatedEarly() != null) {
-                if (terminatedEarly == null) {
-                    terminatedEarly = result.terminatedEarly();
-                } else if (result.terminatedEarly()) {
-                    terminatedEarly = true;
-                }
-            }
-            totalHits += result.topDocs().totalHits;
-            fetchHits += result.topDocs().scoreDocs.length;
-            if (!Float.isNaN(result.topDocs().getMaxScore())) {
-                maxScore = Math.max(maxScore, result.topDocs().getMaxScore());
-            }
-        }
-        if (Float.isInfinite(maxScore)) {
-            maxScore = Float.NaN;
-        }
-
         // clean the fetch counter
         for (AtomicArray.Entry<? extends QuerySearchResultProvider> entry : fetchResults) {
             entry.value.fetchResult().initCounter();
         }
-        int from = ignoreFrom ? 0 : firstResult.queryResult().from();
-        int numSearchHits = (int) Math.min(fetchHits - from, topN(queryResults));
+        int from = ignoreFrom ? 0 : reducedQueryPhase.oneResult.queryResult().from();
+        int numSearchHits = (int) Math.min(reducedQueryPhase.fetchHits - from, reducedQueryPhase.oneResult.size());
         // with collapsing we can have more fetch hits than sorted docs
         numSearchHits = Math.min(sortedDocs.length, numSearchHits);
         // merge hits
@@ -466,7 +461,7 @@ public class SearchPhaseController extends AbstractComponent {
                 searchHit.shard(fetchResult.shardTarget());
                 if (sorted) {
                     FieldDoc fieldDoc = (FieldDoc) shardDoc;
-                    searchHit.sortValues(fieldDoc.fields, firstResult.sortValueFormats());
+                    searchHit.sortValues(fieldDoc.fields, reducedQueryPhase.oneResult.sortValueFormats());
                     if (sortScoreIndex != -1) {
                         searchHit.score(((Number) fieldDoc.fields[sortScoreIndex]).floatValue());
                     }
@@ -475,99 +470,142 @@ public class SearchPhaseController extends AbstractComponent {
                 }
             }
         }
-
-        // merge suggest results
-        Suggest suggest = null;
-        if (firstResult.suggest() != null) {
-            final Map<String, List<Suggestion>> groupedSuggestions = new HashMap<>();
-            for (AtomicArray.Entry<? extends QuerySearchResultProvider> queryResult : queryResults) {
-                Suggest shardSuggest = queryResult.value.queryResult().suggest();
-                if (shardSuggest != null) {
-                    for (Suggestion<? extends Suggestion.Entry<? extends Suggestion.Entry.Option>> suggestion : shardSuggest) {
-                        List<Suggestion> suggestionList = groupedSuggestions.computeIfAbsent(suggestion.getName(), s -> new ArrayList<>());
-                        suggestionList.add(suggestion);
-                    }
-                }
-            }
-            if (groupedSuggestions.isEmpty() == false) {
-                suggest = new Suggest(Suggest.reduce(groupedSuggestions));
-                if (!fetchResults.isEmpty()) {
-                    int currentOffset = numSearchHits;
-                    for (CompletionSuggestion suggestion : suggest.filter(CompletionSuggestion.class)) {
-                        final List<CompletionSuggestion.Entry.Option> suggestionOptions = suggestion.getOptions();
-                        for (int scoreDocIndex = currentOffset; scoreDocIndex < currentOffset + suggestionOptions.size(); scoreDocIndex++) {
-                            ScoreDoc shardDoc = sortedDocs[scoreDocIndex];
-                            QuerySearchResultProvider searchResultProvider = fetchResultsArr.get(shardDoc.shardIndex);
-                            if (searchResultProvider == null) {
-                                continue;
-                            }
-                            FetchSearchResult fetchResult = searchResultProvider.fetchResult();
-                            int fetchResultIndex = fetchResult.counterGetAndIncrement();
-                            if (fetchResultIndex < fetchResult.hits().internalHits().length) {
-                                InternalSearchHit hit = fetchResult.hits().internalHits()[fetchResultIndex];
-                                CompletionSuggestion.Entry.Option suggestOption =
-                                    suggestionOptions.get(scoreDocIndex - currentOffset);
-                                hit.score(shardDoc.score);
-                                hit.shard(fetchResult.shardTarget());
-                                suggestOption.setHit(hit);
-                            }
-                        }
-                        currentOffset += suggestionOptions.size();
-                    }
-                    assert currentOffset == sortedDocs.length : "expected no more score doc slices";
-                }
-            }
-        }
-
-        // merge Aggregation
-        InternalAggregations aggregations = null;
-        if (firstResult.aggregations() != null && firstResult.aggregations().asList() != null) {
-            List<InternalAggregations> aggregationsList = new ArrayList<>(queryResults.size());
-            for (AtomicArray.Entry<? extends QuerySearchResultProvider> entry : queryResults) {
-                aggregationsList.add((InternalAggregations) entry.value.queryResult().aggregations());
-            }
-            ReduceContext reduceContext = new ReduceContext(bigArrays, scriptService);
-            aggregations = InternalAggregations.reduce(aggregationsList, reduceContext);
-            List<SiblingPipelineAggregator> pipelineAggregators = firstResult.pipelineAggregators();
-            if (pipelineAggregators != null) {
-                List<InternalAggregation> newAggs = StreamSupport.stream(aggregations.spliterator(), false)
-                    .map((p) -> (InternalAggregation) p)
-                    .collect(Collectors.toList());
-                for (SiblingPipelineAggregator pipelineAggregator : pipelineAggregators) {
-                    InternalAggregation newAgg = pipelineAggregator.doReduce(new InternalAggregations(newAggs), reduceContext);
-                    newAggs.add(newAgg);
-                }
-                aggregations = new InternalAggregations(newAggs);
-            }
-        }
-
-        //Collect profile results
-        SearchProfileShardResults shardResults = null;
-        if (firstResult.profileResults() != null) {
-            Map<String, ProfileShardResult> profileResults = new HashMap<>(queryResults.size());
-            for (AtomicArray.Entry<? extends QuerySearchResultProvider> entry : queryResults) {
-                String key = entry.value.queryResult().shardTarget().toString();
-                profileResults.put(key, entry.value.queryResult().profileResults());
-            }
-            shardResults = new SearchProfileShardResults(profileResults);
-        }
-
-        InternalSearchHits searchHits = new InternalSearchHits(hits.toArray(new InternalSearchHit[hits.size()]), totalHits, maxScore);
-
-        return new InternalSearchResponse(searchHits, aggregations, suggest, shardResults, timedOut, terminatedEarly);
+        return new InternalSearchHits(hits.toArray(new InternalSearchHit[hits.size()]), reducedQueryPhase.totalHits,
+            reducedQueryPhase.maxScore);
     }

     /**
-     * returns the number of top results to be considered across all shards
+     * Reduces the given query results and consumes all aggregations and profile results.
+     * @see QuerySearchResult#consumeAggs()
+     * @see QuerySearchResult#consumeProfileResult()
      */
-    private static int topN(List<? extends AtomicArray.Entry<? extends QuerySearchResultProvider>> queryResults) {
-        QuerySearchResultProvider firstResult = queryResults.get(0).value;
-        int topN = firstResult.queryResult().size();
-        if (firstResult.fetchResult() != null) {
-            // if we did both query and fetch on the same go, we have fetched all the docs from each shards already, use them...
-            // this is also important since we shortcut and fetch only docs from "from" and up to "size"
-            topN *= queryResults.size();
+    public final ReducedQueryPhase reducedQueryPhase(List<? extends AtomicArray.Entry<? extends QuerySearchResultProvider>> queryResults) {
+        long totalHits = 0;
+        long fetchHits = 0;
+        float maxScore = Float.NEGATIVE_INFINITY;
+        boolean timedOut = false;
+        Boolean terminatedEarly = null;
+        if (queryResults.isEmpty()) {
+            return new ReducedQueryPhase(totalHits, fetchHits, maxScore, timedOut, terminatedEarly, null, null, null, null);
         }
-        return topN;
+        QuerySearchResult firstResult = queryResults.get(0).value.queryResult();
+        final boolean hasSuggest = firstResult.suggest() != null;
+        final boolean hasAggs = firstResult.hasAggs();
+        final boolean hasProfileResults = firstResult.hasProfileResults();
+        final List<InternalAggregations> aggregationsList = hasAggs ? new ArrayList<>(queryResults.size()) : Collections.emptyList();
+        // count the total (we use the query result provider here, since we might not get any hits (we scrolled past them))
+        final Map<String, List<Suggestion>> groupedSuggestions = hasSuggest ? new HashMap<>() : Collections.emptyMap();
+        final Map<String, ProfileShardResult> profileResults = hasProfileResults ? new HashMap<>(queryResults.size())
+            : Collections.emptyMap();
+        for (AtomicArray.Entry<? extends QuerySearchResultProvider> entry : queryResults) {
+            QuerySearchResult result = entry.value.queryResult();
+            if (result.searchTimedOut()) {
+                timedOut = true;
+            }
+            if (result.terminatedEarly() != null) {
+                if (terminatedEarly == null) {
+                    terminatedEarly = result.terminatedEarly();
+                } else if (result.terminatedEarly()) {
+                    terminatedEarly = true;
+                }
+            }
+            totalHits += result.topDocs().totalHits;
+            fetchHits += result.topDocs().scoreDocs.length;
+            if (!Float.isNaN(result.topDocs().getMaxScore())) {
+                maxScore = Math.max(maxScore, result.topDocs().getMaxScore());
+            }
+            if (hasSuggest) {
+                assert result.suggest() != null;
+                for (Suggestion<? extends Suggestion.Entry<? extends Suggestion.Entry.Option>> suggestion : result.suggest()) {
+                    List<Suggestion> suggestionList = groupedSuggestions.computeIfAbsent(suggestion.getName(), s -> new ArrayList<>());
+                    suggestionList.add(suggestion);
+                }
+            }
+            if (hasAggs) {
+                aggregationsList.add((InternalAggregations) result.consumeAggs());
+            }
+            if (hasProfileResults) {
+                String key = result.shardTarget().toString();
+                profileResults.put(key, result.consumeProfileResult());
+            }
+        }
+        final Suggest suggest = groupedSuggestions.isEmpty() ? null : new Suggest(Suggest.reduce(groupedSuggestions));
+        final InternalAggregations aggregations = aggregationsList.isEmpty() ? null : reduceAggs(aggregationsList,
+            firstResult.pipelineAggregators());
+        final SearchProfileShardResults shardResults = profileResults.isEmpty() ? null : new SearchProfileShardResults(profileResults);
+        return new ReducedQueryPhase(totalHits, fetchHits, maxScore, timedOut, terminatedEarly, firstResult, suggest, aggregations,
+            shardResults);
     }
+
+    private InternalAggregations reduceAggs(List<InternalAggregations> aggregationsList,
+                                            List<SiblingPipelineAggregator> pipelineAggregators) {
+        ReduceContext reduceContext = new ReduceContext(bigArrays, scriptService);
+        InternalAggregations aggregations = InternalAggregations.reduce(aggregationsList, reduceContext);
+        if (pipelineAggregators != null) {
+            List<InternalAggregation> newAggs = StreamSupport.stream(aggregations.spliterator(), false)
+                .map((p) -> (InternalAggregation) p)
+                .collect(Collectors.toList());
+            for (SiblingPipelineAggregator pipelineAggregator : pipelineAggregators) {
+                InternalAggregation newAgg = pipelineAggregator.doReduce(new InternalAggregations(newAggs), reduceContext);
+                newAggs.add(newAgg);
+            }
+            return new InternalAggregations(newAggs);
+        }
+        return aggregations;
+    }
+
+    public static final class ReducedQueryPhase {
+        // the sum of all hits across all reduced shards
+        final long totalHits;
+        // the number of returned hits (doc IDs) across all reduced shards
+        final long fetchHits;
+        // the max score across all reduced hits or {@link Float#NaN} if no hits returned
+        final float maxScore;
+        // true if at least one reduced result timed out
+        final boolean timedOut;
+        // non-null and true if at least one reduced result was terminated early
+        final Boolean terminatedEarly;
+        // a non-null arbitrary query result if there was at least one reduced result
+        final QuerySearchResult oneResult;
+        // the reduced suggest results
+        final Suggest suggest;
+        // the reduced internal aggregations
+        final InternalAggregations aggregations;
+        // the reduced profile results
+        final SearchProfileShardResults shardResults;
+
+        ReducedQueryPhase(long totalHits, long fetchHits, float maxScore, boolean timedOut, Boolean terminatedEarly,
+                          QuerySearchResult oneResult, Suggest suggest, InternalAggregations aggregations,
+                          SearchProfileShardResults shardResults) {
+            this.totalHits = totalHits;
+            this.fetchHits = fetchHits;
+            if (Float.isInfinite(maxScore)) {
+                this.maxScore = Float.NaN;
+            } else {
+                this.maxScore = maxScore;
+            }
+            this.timedOut = timedOut;
+            this.terminatedEarly = terminatedEarly;
+            this.oneResult = oneResult;
+            this.suggest = suggest;
+            this.aggregations = aggregations;
+            this.shardResults = shardResults;
+        }
+
+        /**
+         * Creates a new search response from the given merged hits.
+         * @see #merge(boolean, ScoreDoc[], ReducedQueryPhase, AtomicArray)
+         */
+        public InternalSearchResponse buildResponse(InternalSearchHits hits) {
+            return new InternalSearchResponse(hits, aggregations, suggest, shardResults, timedOut, terminatedEarly);
+        }

+        /**
+         * Returns true iff the query phase had no results, otherwise false.
+         */
+        public boolean isEmpty() {
+            return oneResult == null;
+        }
+    }
+
 }
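The core of the refactoring is visible above: a single pass over the per-shard query results now accumulates hit counts, max score, timeout and early-termination flags, and consumes aggregations and profile results as it goes. Here is a stand-alone model of that accumulation, using hypothetical types and modern Java records for brevity; only the tri-state terminatedEarly handling and the NaN max-score convention mirror the diff:

    import java.util.List;

    final class ReduceSketch {
        record ShardResult(long totalHits, int returnedDocs, float maxScore,
                           boolean timedOut, Boolean terminatedEarly) {}

        record Reduced(long totalHits, long fetchHits, float maxScore,
                       boolean timedOut, Boolean terminatedEarly) {}

        static Reduced reduce(List<ShardResult> results) {
            long totalHits = 0, fetchHits = 0;
            float maxScore = Float.NEGATIVE_INFINITY;
            boolean timedOut = false;
            Boolean terminatedEarly = null;
            for (ShardResult r : results) {
                timedOut |= r.timedOut();
                if (r.terminatedEarly() != null) {   // tri-state: stays null until any shard reports
                    terminatedEarly = terminatedEarly == null
                        ? r.terminatedEarly()
                        : terminatedEarly | r.terminatedEarly();
                }
                totalHits += r.totalHits();
                fetchHits += r.returnedDocs();
                if (!Float.isNaN(r.maxScore())) {
                    maxScore = Math.max(maxScore, r.maxScore());
                }
            }
            // no scoring hit at all becomes NaN, like the ReducedQueryPhase constructor
            return new Reduced(totalHits, fetchHits,
                Float.isInfinite(maxScore) ? Float.NaN : maxScore, timedOut, terminatedEarly);
        }
    }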
diff --git a/core/src/main/java/org/elasticsearch/action/search/SearchScrollQueryAndFetchAsyncAction.java b/core/src/main/java/org/elasticsearch/action/search/SearchScrollQueryAndFetchAsyncAction.java
index bf53fc719c6..b005c0fc2fe 100644
--- a/core/src/main/java/org/elasticsearch/action/search/SearchScrollQueryAndFetchAsyncAction.java
+++ b/core/src/main/java/org/elasticsearch/action/search/SearchScrollQueryAndFetchAsyncAction.java
@@ -171,8 +171,8 @@ class SearchScrollQueryAndFetchAsyncAction extends AbstractAsyncAction {

     private void innerFinishHim() throws Exception {
         ScoreDoc[] sortedShardDocs = searchPhaseController.sortDocs(true, queryFetchResults);
-        final InternalSearchResponse internalResponse = searchPhaseController.merge(true, sortedShardDocs, queryFetchResults,
-            queryFetchResults);
+        final InternalSearchResponse internalResponse = searchPhaseController.merge(true, sortedShardDocs,
+            searchPhaseController.reducedQueryPhase(queryFetchResults.asList()), queryFetchResults);
         String scrollId = null;
         if (request.scroll() != null) {
             scrollId = request.scrollId();
diff --git a/core/src/main/java/org/elasticsearch/action/search/SearchScrollQueryThenFetchAsyncAction.java b/core/src/main/java/org/elasticsearch/action/search/SearchScrollQueryThenFetchAsyncAction.java
index f7c213c6f1f..6d8d3a42be3 100644
--- a/core/src/main/java/org/elasticsearch/action/search/SearchScrollQueryThenFetchAsyncAction.java
+++ b/core/src/main/java/org/elasticsearch/action/search/SearchScrollQueryThenFetchAsyncAction.java
@@ -30,6 +30,7 @@ import org.elasticsearch.cluster.node.DiscoveryNodes;
 import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.util.concurrent.AtomicArray;
 import org.elasticsearch.search.fetch.FetchSearchResult;
+import org.elasticsearch.search.fetch.QueryFetchSearchResult;
 import org.elasticsearch.search.fetch.ShardFetchRequest;
 import org.elasticsearch.search.internal.InternalScrollSearchRequest;
 import org.elasticsearch.search.internal.InternalSearchResponse;
@@ -170,13 +171,14 @@ class SearchScrollQueryThenFetchAsyncAction extends AbstractAsyncAction {
     private void executeFetchPhase() throws Exception {
         sortedShardDocs = searchPhaseController.sortDocs(true, queryResults);
         if (sortedShardDocs.length == 0) {
-            finishHim();
+            finishHim(searchPhaseController.reducedQueryPhase(queryResults.asList()));
             return;
         }

         final IntArrayList[] docIdsToLoad = searchPhaseController.fillDocIdsToLoad(queryResults.length(), sortedShardDocs);
-        final ScoreDoc[] lastEmittedDocPerShard = searchPhaseController.getLastEmittedDocPerShard(queryResults.asList(),
-            sortedShardDocs, queryResults.length());
+        SearchPhaseController.ReducedQueryPhase reducedQueryPhase = searchPhaseController.reducedQueryPhase(queryResults.asList());
+        final ScoreDoc[] lastEmittedDocPerShard = searchPhaseController.getLastEmittedDocPerShard(reducedQueryPhase, sortedShardDocs,
+            queryResults.length());
         final AtomicInteger counter = new AtomicInteger(docIdsToLoad.length);
         for (int i = 0; i < docIdsToLoad.length; i++) {
             final int index = i;
@@ -192,7 +194,7 @@ class SearchScrollQueryThenFetchAsyncAction extends AbstractAsyncAction {
                     result.shardTarget(querySearchResult.shardTarget());
                     fetchResults.set(index, result);
                     if (counter.decrementAndGet() == 0) {
-                        finishHim();
+                        finishHim(reducedQueryPhase);
                     }
                 }

@@ -203,34 +205,30 @@ class SearchScrollQueryThenFetchAsyncAction extends AbstractAsyncAction {
                     }
                     successfulOps.decrementAndGet();
                     if (counter.decrementAndGet() == 0) {
-                        finishHim();
+                        finishHim(reducedQueryPhase);
                    }
                 }
             });
         } else {
             // the counter is set to the total size of docIdsToLoad which can have null values so we have to count them down too
             if (counter.decrementAndGet() == 0) {
-                finishHim();
+                finishHim(reducedQueryPhase);
             }
         }
     }
     }

-    private void finishHim() {
+    private void finishHim(SearchPhaseController.ReducedQueryPhase queryPhase) {
         try {
-            innerFinishHim();
+            final InternalSearchResponse internalResponse = searchPhaseController.merge(true, sortedShardDocs, queryPhase, fetchResults);
+            String scrollId = null;
+            if (request.scroll() != null) {
+                scrollId = request.scrollId();
+            }
+            listener.onResponse(new SearchResponse(internalResponse, scrollId, this.scrollId.getContext().length, successfulOps.get(),
+                buildTookInMillis(), buildShardFailures()));
         } catch (Exception e) {
             listener.onFailure(new ReduceSearchPhaseException("fetch", "inner finish failed", e, buildShardFailures()));
         }
     }
-
-    private void innerFinishHim() {
-        InternalSearchResponse internalResponse = searchPhaseController.merge(true, sortedShardDocs, queryResults, fetchResults);
-        String scrollId = null;
-        if (request.scroll() != null) {
-            scrollId = request.scrollId();
-        }
-        listener.onResponse(new SearchResponse(internalResponse, scrollId, this.scrollId.getContext().length, successfulOps.get(),
-            buildTookInMillis(), buildShardFailures()));
-    }
 }
diff --git a/core/src/main/java/org/elasticsearch/search/aggregations/AggregationPhase.java b/core/src/main/java/org/elasticsearch/search/aggregations/AggregationPhase.java
index 03c21c9dad5..cb7c716198e 100644
--- a/core/src/main/java/org/elasticsearch/search/aggregations/AggregationPhase.java
+++ b/core/src/main/java/org/elasticsearch/search/aggregations/AggregationPhase.java
@@ -84,7 +84,7 @@ public class AggregationPhase implements SearchPhase {
             return;
         }

-        if (context.queryResult().aggregations() != null) {
+        if (context.queryResult().hasAggs()) {
            // no need to compute the aggs twice, they should be computed on a per context basis
            return;
        }
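Both scroll actions above now compute the reduced query phase exactly once, before any fetch round-trip is dispatched, and hand the same object to whichever fetch callback is the last to count down. A minimal sketch of that compute-once, count-down shape (hypothetical names, not Elasticsearch API):

    import java.util.concurrent.atomic.AtomicInteger;
    import java.util.function.Consumer;

    final class CountDownFinish<R> {
        private final AtomicInteger counter;
        private final R reduced;            // computed once, before any fetch is sent
        private final Consumer<R> finish;

        CountDownFinish(int shards, R reduced, Consumer<R> finish) {
            this.counter = new AtomicInteger(shards);
            this.reduced = reduced;
            this.finish = finish;
        }

        /** Called from every fetch callback, success or failure; the last one finishes. */
        void onShardDone() {
            if (counter.decrementAndGet() == 0) {
                finish.accept(reduced);     // reuse the reduction instead of re-reducing
            }
        }
    }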
diff --git a/core/src/main/java/org/elasticsearch/search/query/QuerySearchResult.java b/core/src/main/java/org/elasticsearch/search/query/QuerySearchResult.java
index f930e4beeb2..a8d8ae74062 100644
--- a/core/src/main/java/org/elasticsearch/search/query/QuerySearchResult.java
+++ b/core/src/main/java/org/elasticsearch/search/query/QuerySearchResult.java
@@ -21,7 +21,6 @@ package org.elasticsearch.search.query;

 import org.apache.lucene.search.FieldDoc;
 import org.apache.lucene.search.TopDocs;
-import org.elasticsearch.common.Nullable;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;
 import org.elasticsearch.search.DocValueFormat;
@@ -41,7 +40,7 @@ import static java.util.Collections.emptyList;
 import static org.elasticsearch.common.lucene.Lucene.readTopDocs;
 import static org.elasticsearch.common.lucene.Lucene.writeTopDocs;

-public class QuerySearchResult extends QuerySearchResultProvider {
+public final class QuerySearchResult extends QuerySearchResultProvider {

     private long id;
     private SearchShardTarget shardTarget;
@@ -50,14 +49,15 @@ public class QuerySearchResult extends QuerySearchResultProvider {
     private TopDocs topDocs;
     private DocValueFormat[] sortValueFormats;
     private InternalAggregations aggregations;
+    private boolean hasAggs;
     private List<SiblingPipelineAggregator> pipelineAggregators;
     private Suggest suggest;
     private boolean searchTimedOut;
     private Boolean terminatedEarly = null;
     private ProfileShardResult profileShardResults;
+    private boolean hasProfileResults;

     public QuerySearchResult() {
-
     }

     public QuerySearchResult(long id, SearchShardTarget shardTarget) {
@@ -121,20 +121,47 @@ public class QuerySearchResult extends QuerySearchResultProvider {
         return sortValueFormats;
     }

-    public Aggregations aggregations() {
-        return aggregations;
+    /**
+     * Returns true if this query result has unconsumed aggregations
+     */
+    public boolean hasAggs() {
+        return hasAggs;
+    }
+
+    /**
+     * Returns and nulls out the aggregations for this search result. This allows to free up memory once the aggregations are consumed.
+     * @throws IllegalStateException if the aggregations have already been consumed.
+     */
+    public Aggregations consumeAggs() {
+        if (aggregations == null) {
+            throw new IllegalStateException("aggs already consumed");
+        }
+        Aggregations aggs = aggregations;
+        aggregations = null;
+        return aggs;
     }

     public void aggregations(InternalAggregations aggregations) {
         this.aggregations = aggregations;
+        hasAggs = aggregations != null;
     }

     /**
-     * Returns the profiled results for this search, or potentially null if result was empty
-     * @return The profiled results, or null
+     * Returns and nulls out the profiled results for this search, or potentially null if result was empty.
+     * This allows to free up memory once the profiled result is consumed.
+     * @throws IllegalStateException if the profiled result has already been consumed.
      */
-    @Nullable public ProfileShardResult profileResults() {
-        return profileShardResults;
+    public ProfileShardResult consumeProfileResult() {
+        if (profileShardResults == null) {
+            throw new IllegalStateException("profile results already consumed");
+        }
+        ProfileShardResult result = profileShardResults;
+        profileShardResults = null;
+        return result;
+    }
+
+    public boolean hasProfileResults() {
+        return hasProfileResults;
     }

     /**
@@ -143,6 +170,7 @@ public class QuerySearchResult extends QuerySearchResultProvider {
      */
     public void profileResults(ProfileShardResult shardResults) {
         this.profileShardResults = shardResults;
+        hasProfileResults = shardResults != null;
     }

     public List<SiblingPipelineAggregator> pipelineAggregators() {
@@ -170,6 +198,9 @@ public class QuerySearchResult extends QuerySearchResultProvider {
         return this;
     }

+    /**
+     * Returns the maximum size of this result's top docs.
+     */
     public int size() {
         return size;
     }
@@ -212,7 +243,7 @@ public class QuerySearchResult extends QuerySearchResultProvider {
             }
         }
         topDocs = readTopDocs(in);
-        if (in.readBoolean()) {
+        if (hasAggs = in.readBoolean()) {
             aggregations = InternalAggregations.readAggregations(in);
         }
         pipelineAggregators = in.readNamedWriteableList(PipelineAggregator.class).stream().map(a -> (SiblingPipelineAggregator) a)
@@ -223,6 +254,7 @@ public class QuerySearchResult extends QuerySearchResultProvider {
         searchTimedOut = in.readBoolean();
         terminatedEarly = in.readOptionalBoolean();
         profileShardResults = in.readOptionalWriteable(ProfileShardResult::new);
+        hasProfileResults = profileShardResults != null;
     }

     @Override
diff --git a/core/src/test/java/org/elasticsearch/action/search/SearchPhaseControllerTests.java b/core/src/test/java/org/elasticsearch/action/search/SearchPhaseControllerTests.java
index fd1ac6eb917..db729cac1c2 100644
--- a/core/src/test/java/org/elasticsearch/action/search/SearchPhaseControllerTests.java
+++ b/core/src/test/java/org/elasticsearch/action/search/SearchPhaseControllerTests.java
@@ -98,8 +98,8 @@ public class SearchPhaseControllerTests extends ESTestCase {
             }
         }
         ScoreDoc[] sortedDocs = mergedScoreDocs.toArray(new ScoreDoc[mergedScoreDocs.size()]);
-
-        InternalSearchResponse mergedResponse = searchPhaseController.merge(true, sortedDocs, queryResults,
+        InternalSearchResponse mergedResponse = searchPhaseController.merge(true, sortedDocs,
+            searchPhaseController.reducedQueryPhase(queryResults.asList()),
             generateFetchResults(nShards, mergedSearchDocs, mergedSuggest));
         assertThat(mergedResponse.hits().getHits().length, equalTo(mergedSearchDocs.length));
         Suggest suggestResult = mergedResponse.suggest();
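Finally, QuerySearchResult's consume-once accessors let the reduce phase release aggregations and profile results as soon as they are folded in. The contract, modeled in isolation with a hypothetical helper class that mirrors the consumeAggs() semantics in the diff:

    final class ConsumeOnce<T> {
        private T value;
        private boolean has;

        void set(T value) {
            this.value = value;
            this.has = value != null;
        }

        /** True if a value was ever set; as in the diff, the flag is not cleared on consume. */
        boolean has() {
            return has;
        }

        /** Returns and nulls out the value so the memory can be reclaimed after the reduce. */
        T consume() {
            if (value == null) {
                throw new IllegalStateException("already consumed");
            }
            T v = value;
            value = null;
            return v;
        }
    }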