Separate reduce (aggs, suggest and profile) from merging fetched hits (#23017)

Today we carry along all search results, including aggs, suggest and profile results,
until we have successfully fetched all hits for the search request. This can potentially
hold on to a large amount of memory if there are heavy aggregations involved. With
this change, aggs and profile results are entirely consumed and released for GC before the fetch
phase executes. This is a first step towards reducing results on the fly when the number
of non-empty responses is large.
Simon Willnauer 2017-02-08 10:11:51 +01:00 committed by GitHub
parent 9154686623
commit a8b376670c
7 changed files with 290 additions and 210 deletions
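The core of the change is visible in QuerySearchResult below: the reduce step reads the aggregations (and profile results) exactly once and nulls out the reference, so the heavy objects become eligible for GC before the fetch phase runs. As a reading aid, here is a minimal, self-contained sketch of that consume-once pattern, using simplified stand-in types rather than the real QuerySearchResult class:

// Consume-once pattern, condensed. Simplified stand-in types, not the actual class.
final class ShardQueryResult {
    private Object aggregations; // stands in for InternalAggregations
    private boolean hasAggs;

    void aggregations(Object aggs) {
        this.aggregations = aggs;
        this.hasAggs = aggs != null;
    }

    // true if this result still carries unconsumed aggs
    boolean hasAggs() {
        return hasAggs;
    }

    // returns and nulls out the aggs so they can be GC'ed right after the reduce
    Object consumeAggs() {
        if (aggregations == null) {
            throw new IllegalStateException("aggs already consumed");
        }
        Object aggs = aggregations;
        aggregations = null; // reference released before the fetch phase executes
        return aggs;
    }
}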

File: AbstractSearchAsyncAction.java

@@ -55,7 +55,6 @@ import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Function;
 import java.util.function.IntConsumer;
 abstract class AbstractSearchAsyncAction<FirstResult extends SearchPhaseResult> extends AbstractAsyncAction {
     private static final float DEFAULT_INDEX_BOOST = 1.0f;
     protected final Logger logger;
@@ -319,10 +318,10 @@ abstract class AbstractSearchAsyncAction<FirstResult extends SearchPhaseResult>
         }
     }
-    protected ShardFetchSearchRequest createFetchRequest(QuerySearchResult queryResult, int index, IntArrayList entry,
+    protected ShardFetchSearchRequest createFetchRequest(long queryId, int index, IntArrayList entry,
                                                          ScoreDoc[] lastEmittedDocPerShard) {
         final ScoreDoc lastEmittedDoc = (lastEmittedDocPerShard != null) ? lastEmittedDocPerShard[index] : null;
-        return new ShardFetchSearchRequest(request, queryResult.id(), entry, lastEmittedDoc);
+        return new ShardFetchSearchRequest(request, queryId, entry, lastEmittedDoc);
     }
     protected abstract void sendExecuteFirstPhase(Transport.Connection connection, ShardSearchTransportRequest request,
@@ -435,24 +434,51 @@ abstract class AbstractSearchAsyncAction<FirstResult extends SearchPhaseResult>
         @Override
         public void run() throws Exception {
+            getExecutor().execute(new ActionRunnable<SearchResponse>(listener) {
+                @Override
+                public void doRun() throws IOException {
+                    // we do the heavy lifting in this inner run method where we reduce aggs etc. that's why we fork this phase
+                    // off immediately instead of forking when we send back the response to the user since there we only need
+                    // to merge together the fetched results which is a linear operation.
+                    innerRun();
+                }
+                @Override
+                public void onFailure(Exception e) {
+                    ReduceSearchPhaseException failure = new ReduceSearchPhaseException("fetch", "", e, buildShardFailures());
+                    if (logger.isDebugEnabled()) {
+                        logger.debug("failed to reduce search", failure);
+                    }
+                    super.onFailure(failure);
+                }
+            });
+        }
+
+        private void innerRun() throws IOException {
+            final int numShards = shardsIts.size();
             final boolean isScrollRequest = request.scroll() != null;
             ScoreDoc[] sortedShardDocs = searchPhaseController.sortDocs(isScrollRequest, queryResults);
-            if (queryResults.length() == 1) {
+            String scrollId = isScrollRequest ? TransportSearchHelper.buildScrollId(queryResults) : null;
+            List<AtomicArray.Entry<QuerySearchResultProvider>> queryResultsAsList = queryResults.asList();
+            final SearchPhaseController.ReducedQueryPhase reducedQueryPhase = searchPhaseController.reducedQueryPhase(queryResultsAsList);
+            final boolean queryAndFetchOptimization = queryResults.length() == 1;
+            final IntConsumer finishPhase = successOpts
+                -> sendResponse(searchPhaseController, sortedShardDocs, scrollId, reducedQueryPhase, queryAndFetchOptimization ?
+                    queryResults : fetchResults);
+            if (queryAndFetchOptimization) {
                 assert queryResults.get(0) == null || queryResults.get(0).fetchResult() != null;
                 // query AND fetch optimization
-                sendResponseAsync("fetch", searchPhaseController, sortedShardDocs, queryResults, queryResults);
+                finishPhase.accept(successfulOps.get());
             } else {
-                final IntArrayList[] docIdsToLoad = searchPhaseController.fillDocIdsToLoad(queryResults.length(), sortedShardDocs);
-                final IntConsumer finishPhase = successOpts
-                    -> sendResponseAsync("fetch", searchPhaseController, sortedShardDocs, queryResults, fetchResults);
+                final IntArrayList[] docIdsToLoad = searchPhaseController.fillDocIdsToLoad(numShards, sortedShardDocs);
                 if (sortedShardDocs.length == 0) { // no docs to fetch -- sidestep everything and return
-                    queryResults.asList().stream()
+                    queryResultsAsList.stream()
                         .map(e -> e.value.queryResult())
                         .forEach(this::releaseIrrelevantSearchContext); // we have to release contexts here to free up resources
                     finishPhase.accept(successfulOps.get());
                 } else {
                     final ScoreDoc[] lastEmittedDocPerShard = isScrollRequest ?
-                        searchPhaseController.getLastEmittedDocPerShard(queryResults.asList(), sortedShardDocs, queryResults.length())
+                        searchPhaseController.getLastEmittedDocPerShard(reducedQueryPhase, sortedShardDocs, numShards)
                         : null;
                     final CountedCollector<FetchSearchResult> counter = new CountedCollector<>(fetchResults,
                         docIdsToLoad.length, // we count down every shard in the result no matter if we got any results or not
@@ -471,7 +497,7 @@ abstract class AbstractSearchAsyncAction<FirstResult extends SearchPhaseResult>
                         counter.countDown();
                     } else {
                         Transport.Connection connection = nodeIdToConnection.apply(queryResult.shardTarget().getNodeId());
-                        ShardFetchSearchRequest fetchSearchRequest = createFetchRequest(queryResult.queryResult(), i, entry,
+                        ShardFetchSearchRequest fetchSearchRequest = createFetchRequest(queryResult.queryResult().id(), i, entry,
                             lastEmittedDocPerShard);
                         executeFetch(i, queryResult.shardTarget(), counter, fetchSearchRequest, queryResult.queryResult(),
                             connection);
@@ -524,34 +550,20 @@ abstract class AbstractSearchAsyncAction<FirstResult extends SearchPhaseResult>
                 }
             }
         }
+        /**
+         * Sends back a result to the user.
+         */
+        private void sendResponse(SearchPhaseController searchPhaseController, ScoreDoc[] sortedDocs,
+                                  String scrollId, SearchPhaseController.ReducedQueryPhase reducedQueryPhase,
+                                  AtomicArray<? extends QuerySearchResultProvider> fetchResultsArr) {
+            final boolean isScrollRequest = request.scroll() != null;
+            final InternalSearchResponse internalResponse = searchPhaseController.merge(isScrollRequest, sortedDocs, reducedQueryPhase,
+                fetchResultsArr);
+            listener.onResponse(new SearchResponse(internalResponse, scrollId, expectedSuccessfulOps, successfulOps.get(),
+                buildTookInMillis(), buildShardFailures()));
+        }
     }
-    /**
-     * Sends back a result to the user. This method will create the sorted docs if they are null and will build the scrollID for the
-     * response. Note: This method will send the response in a different thread depending on the executor.
-     */
-    final void sendResponseAsync(String phase, SearchPhaseController searchPhaseController, ScoreDoc[] sortedDocs,
-                                 AtomicArray<? extends QuerySearchResultProvider> queryResultsArr,
-                                 AtomicArray<? extends QuerySearchResultProvider> fetchResultsArr) {
-        getExecutor().execute(new ActionRunnable<SearchResponse>(listener) {
-            @Override
-            public void doRun() throws IOException {
-                final boolean isScrollRequest = request.scroll() != null;
-                final InternalSearchResponse internalResponse = searchPhaseController.merge(isScrollRequest, sortedDocs, queryResultsArr,
-                    fetchResultsArr);
-                String scrollId = isScrollRequest ? TransportSearchHelper.buildScrollId(queryResultsArr) : null;
-                listener.onResponse(new SearchResponse(internalResponse, scrollId, expectedSuccessfulOps, successfulOps.get(),
-                    buildTookInMillis(), buildShardFailures()));
-            }
-            @Override
-            public void onFailure(Exception e) {
-                ReduceSearchPhaseException failure = new ReduceSearchPhaseException(phase, "", e, buildShardFailures());
-                if (logger.isDebugEnabled()) {
-                    logger.debug("failed to reduce search", failure);
-                }
-                super.onFailure(failure);
-            }
-        });
-    }
 }

File: SearchPhaseController.java

@@ -244,7 +244,7 @@ public class SearchPhaseController extends AbstractComponent {
         Arrays.sort(sortedResults, QUERY_RESULT_ORDERING);
         QuerySearchResultProvider firstResult = sortedResults[0].value;
-        int topN = topN(results);
+        int topN = firstResult.queryResult().size();
         int from = firstResult.queryResult().from();
         if (ignoreFrom) {
             from = 0;
@@ -340,16 +340,12 @@ public class SearchPhaseController extends AbstractComponent {
         return scoreDocs;
     }
-    public ScoreDoc[] getLastEmittedDocPerShard(List<? extends AtomicArray.Entry<? extends QuerySearchResultProvider>> queryResults,
+    public ScoreDoc[] getLastEmittedDocPerShard(ReducedQueryPhase reducedQueryPhase,
                                                 ScoreDoc[] sortedScoreDocs, int numShards) {
         ScoreDoc[] lastEmittedDocPerShard = new ScoreDoc[numShards];
-        if (queryResults.isEmpty() == false) {
-            long fetchHits = 0;
-            for (AtomicArray.Entry<? extends QuerySearchResultProvider> queryResult : queryResults) {
-                fetchHits += queryResult.value.queryResult().topDocs().scoreDocs.length;
-            }
+        if (reducedQueryPhase.isEmpty() == false) {
             // from is always zero as when we use scroll, we ignore from
-            long size = Math.min(fetchHits, topN(queryResults));
+            long size = Math.min(reducedQueryPhase.fetchHits, reducedQueryPhase.oneResult.size());
             // with collapsing we can have more hits than sorted docs
             size = Math.min(sortedScoreDocs.length, size);
             for (int sortedDocsIndex = 0; sortedDocsIndex < size; sortedDocsIndex++) {
@@ -384,22 +380,50 @@ public class SearchPhaseController extends AbstractComponent {
      * completion suggestion ordered by suggestion name
      */
     public InternalSearchResponse merge(boolean ignoreFrom, ScoreDoc[] sortedDocs,
-                                        AtomicArray<? extends QuerySearchResultProvider> queryResultsArr,
+                                        ReducedQueryPhase reducedQueryPhase,
                                         AtomicArray<? extends QuerySearchResultProvider> fetchResultsArr) {
-        List<? extends AtomicArray.Entry<? extends QuerySearchResultProvider>> queryResults = queryResultsArr.asList();
-        List<? extends AtomicArray.Entry<? extends QuerySearchResultProvider>> fetchResults = fetchResultsArr.asList();
-        if (queryResults.isEmpty()) {
+        if (reducedQueryPhase.isEmpty()) {
             return InternalSearchResponse.empty();
         }
+        List<? extends AtomicArray.Entry<? extends QuerySearchResultProvider>> fetchResults = fetchResultsArr.asList();
+        InternalSearchHits hits = getHits(reducedQueryPhase, ignoreFrom, sortedDocs, fetchResultsArr);
+        if (reducedQueryPhase.suggest != null) {
+            if (!fetchResults.isEmpty()) {
+                int currentOffset = hits.getHits().length;
+                for (CompletionSuggestion suggestion : reducedQueryPhase.suggest.filter(CompletionSuggestion.class)) {
+                    final List<CompletionSuggestion.Entry.Option> suggestionOptions = suggestion.getOptions();
+                    for (int scoreDocIndex = currentOffset; scoreDocIndex < currentOffset + suggestionOptions.size(); scoreDocIndex++) {
+                        ScoreDoc shardDoc = sortedDocs[scoreDocIndex];
+                        QuerySearchResultProvider searchResultProvider = fetchResultsArr.get(shardDoc.shardIndex);
+                        if (searchResultProvider == null) {
+                            continue;
+                        }
+                        FetchSearchResult fetchResult = searchResultProvider.fetchResult();
+                        int fetchResultIndex = fetchResult.counterGetAndIncrement();
+                        if (fetchResultIndex < fetchResult.hits().internalHits().length) {
+                            InternalSearchHit hit = fetchResult.hits().internalHits()[fetchResultIndex];
+                            CompletionSuggestion.Entry.Option suggestOption =
+                                suggestionOptions.get(scoreDocIndex - currentOffset);
+                            hit.score(shardDoc.score);
+                            hit.shard(fetchResult.shardTarget());
+                            suggestOption.setHit(hit);
+                        }
+                    }
+                    currentOffset += suggestionOptions.size();
+                }
+                assert currentOffset == sortedDocs.length : "expected no more score doc slices";
+            }
+        }
+        return reducedQueryPhase.buildResponse(hits);
+    }
-        QuerySearchResult firstResult = queryResults.get(0).value.queryResult();
+    private InternalSearchHits getHits(ReducedQueryPhase reducedQueryPhase, boolean ignoreFrom, ScoreDoc[] sortedDocs,
+                                       AtomicArray<? extends QuerySearchResultProvider> fetchResultsArr) {
+        List<? extends AtomicArray.Entry<? extends QuerySearchResultProvider>> fetchResults = fetchResultsArr.asList();
         boolean sorted = false;
         int sortScoreIndex = -1;
-        if (firstResult.topDocs() instanceof TopFieldDocs) {
-            TopFieldDocs fieldDocs = (TopFieldDocs) firstResult.queryResult().topDocs();
+        if (reducedQueryPhase.oneResult.topDocs() instanceof TopFieldDocs) {
+            TopFieldDocs fieldDocs = (TopFieldDocs) reducedQueryPhase.oneResult.queryResult().topDocs();
             if (fieldDocs instanceof CollapseTopFieldDocs &&
                 fieldDocs.fields.length == 1 && fieldDocs.fields[0].getType() == SortField.Type.SCORE) {
                 sorted = false;
@@ -412,41 +436,12 @@ public class SearchPhaseController extends AbstractComponent {
                 }
             }
         }
-        // count the total (we use the query result provider here, since we might not get any hits (we scrolled past them))
-        long totalHits = 0;
-        long fetchHits = 0;
-        float maxScore = Float.NEGATIVE_INFINITY;
-        boolean timedOut = false;
-        Boolean terminatedEarly = null;
-        for (AtomicArray.Entry<? extends QuerySearchResultProvider> entry : queryResults) {
-            QuerySearchResult result = entry.value.queryResult();
-            if (result.searchTimedOut()) {
-                timedOut = true;
-            }
-            if (result.terminatedEarly() != null) {
-                if (terminatedEarly == null) {
-                    terminatedEarly = result.terminatedEarly();
-                } else if (result.terminatedEarly()) {
-                    terminatedEarly = true;
-                }
-            }
-            totalHits += result.topDocs().totalHits;
-            fetchHits += result.topDocs().scoreDocs.length;
-            if (!Float.isNaN(result.topDocs().getMaxScore())) {
-                maxScore = Math.max(maxScore, result.topDocs().getMaxScore());
-            }
-        }
-        if (Float.isInfinite(maxScore)) {
-            maxScore = Float.NaN;
-        }
         // clean the fetch counter
         for (AtomicArray.Entry<? extends QuerySearchResultProvider> entry : fetchResults) {
             entry.value.fetchResult().initCounter();
         }
-        int from = ignoreFrom ? 0 : firstResult.queryResult().from();
-        int numSearchHits = (int) Math.min(fetchHits - from, topN(queryResults));
+        int from = ignoreFrom ? 0 : reducedQueryPhase.oneResult.queryResult().from();
+        int numSearchHits = (int) Math.min(reducedQueryPhase.fetchHits - from, reducedQueryPhase.oneResult.size());
         // with collapsing we can have more fetch hits than sorted docs
         numSearchHits = Math.min(sortedDocs.length, numSearchHits);
         // merge hits
@@ -466,7 +461,7 @@ public class SearchPhaseController extends AbstractComponent {
                 searchHit.shard(fetchResult.shardTarget());
                 if (sorted) {
                     FieldDoc fieldDoc = (FieldDoc) shardDoc;
-                    searchHit.sortValues(fieldDoc.fields, firstResult.sortValueFormats());
+                    searchHit.sortValues(fieldDoc.fields, reducedQueryPhase.oneResult.sortValueFormats());
                     if (sortScoreIndex != -1) {
                         searchHit.score(((Number) fieldDoc.fields[sortScoreIndex]).floatValue());
                     }
@@ -475,99 +470,142 @@ public class SearchPhaseController extends AbstractComponent {
                 }
             }
         }
-        // merge suggest results
-        Suggest suggest = null;
-        if (firstResult.suggest() != null) {
-            final Map<String, List<Suggestion>> groupedSuggestions = new HashMap<>();
-            for (AtomicArray.Entry<? extends QuerySearchResultProvider> queryResult : queryResults) {
-                Suggest shardSuggest = queryResult.value.queryResult().suggest();
-                if (shardSuggest != null) {
-                    for (Suggestion<? extends Suggestion.Entry<? extends Suggestion.Entry.Option>> suggestion : shardSuggest) {
-                        List<Suggestion> suggestionList = groupedSuggestions.computeIfAbsent(suggestion.getName(), s -> new ArrayList<>());
-                        suggestionList.add(suggestion);
-                    }
-                }
-            }
-            if (groupedSuggestions.isEmpty() == false) {
-                suggest = new Suggest(Suggest.reduce(groupedSuggestions));
-                if (!fetchResults.isEmpty()) {
-                    int currentOffset = numSearchHits;
-                    for (CompletionSuggestion suggestion : suggest.filter(CompletionSuggestion.class)) {
-                        final List<CompletionSuggestion.Entry.Option> suggestionOptions = suggestion.getOptions();
-                        for (int scoreDocIndex = currentOffset; scoreDocIndex < currentOffset + suggestionOptions.size(); scoreDocIndex++) {
-                            ScoreDoc shardDoc = sortedDocs[scoreDocIndex];
-                            QuerySearchResultProvider searchResultProvider = fetchResultsArr.get(shardDoc.shardIndex);
-                            if (searchResultProvider == null) {
-                                continue;
-                            }
-                            FetchSearchResult fetchResult = searchResultProvider.fetchResult();
-                            int fetchResultIndex = fetchResult.counterGetAndIncrement();
-                            if (fetchResultIndex < fetchResult.hits().internalHits().length) {
-                                InternalSearchHit hit = fetchResult.hits().internalHits()[fetchResultIndex];
-                                CompletionSuggestion.Entry.Option suggestOption =
-                                    suggestionOptions.get(scoreDocIndex - currentOffset);
-                                hit.score(shardDoc.score);
-                                hit.shard(fetchResult.shardTarget());
-                                suggestOption.setHit(hit);
-                            }
-                        }
-                        currentOffset += suggestionOptions.size();
-                    }
-                    assert currentOffset == sortedDocs.length : "expected no more score doc slices";
-                }
-            }
-        }
-        // merge Aggregation
-        InternalAggregations aggregations = null;
-        if (firstResult.aggregations() != null && firstResult.aggregations().asList() != null) {
-            List<InternalAggregations> aggregationsList = new ArrayList<>(queryResults.size());
-            for (AtomicArray.Entry<? extends QuerySearchResultProvider> entry : queryResults) {
-                aggregationsList.add((InternalAggregations) entry.value.queryResult().aggregations());
-            }
-            ReduceContext reduceContext = new ReduceContext(bigArrays, scriptService);
-            aggregations = InternalAggregations.reduce(aggregationsList, reduceContext);
-            List<SiblingPipelineAggregator> pipelineAggregators = firstResult.pipelineAggregators();
-            if (pipelineAggregators != null) {
-                List<InternalAggregation> newAggs = StreamSupport.stream(aggregations.spliterator(), false)
-                    .map((p) -> (InternalAggregation) p)
-                    .collect(Collectors.toList());
-                for (SiblingPipelineAggregator pipelineAggregator : pipelineAggregators) {
-                    InternalAggregation newAgg = pipelineAggregator.doReduce(new InternalAggregations(newAggs), reduceContext);
-                    newAggs.add(newAgg);
-                }
-                aggregations = new InternalAggregations(newAggs);
-            }
-        }
-        // Collect profile results
-        SearchProfileShardResults shardResults = null;
-        if (firstResult.profileResults() != null) {
-            Map<String, ProfileShardResult> profileResults = new HashMap<>(queryResults.size());
-            for (AtomicArray.Entry<? extends QuerySearchResultProvider> entry : queryResults) {
-                String key = entry.value.queryResult().shardTarget().toString();
-                profileResults.put(key, entry.value.queryResult().profileResults());
-            }
-            shardResults = new SearchProfileShardResults(profileResults);
-        }
-        InternalSearchHits searchHits = new InternalSearchHits(hits.toArray(new InternalSearchHit[hits.size()]), totalHits, maxScore);
-        return new InternalSearchResponse(searchHits, aggregations, suggest, shardResults, timedOut, terminatedEarly);
+        return new InternalSearchHits(hits.toArray(new InternalSearchHit[hits.size()]), reducedQueryPhase.totalHits,
+            reducedQueryPhase.maxScore);
     }
     /**
-     * returns the number of top results to be considered across all shards
+     * Reduces the given query results and consumes all aggregations and profile results.
+     * @see QuerySearchResult#consumeAggs()
+     * @see QuerySearchResult#consumeProfileResult()
      */
-    private static int topN(List<? extends AtomicArray.Entry<? extends QuerySearchResultProvider>> queryResults) {
-        QuerySearchResultProvider firstResult = queryResults.get(0).value;
-        int topN = firstResult.queryResult().size();
-        if (firstResult.fetchResult() != null) {
-            // if we did both query and fetch on the same go, we have fetched all the docs from each shards already, use them...
-            // this is also important since we shortcut and fetch only docs from "from" and up to "size"
-            topN *= queryResults.size();
-        }
-        return topN;
-    }
+    public final ReducedQueryPhase reducedQueryPhase(List<? extends AtomicArray.Entry<? extends QuerySearchResultProvider>> queryResults) {
+        long totalHits = 0;
+        long fetchHits = 0;
+        float maxScore = Float.NEGATIVE_INFINITY;
+        boolean timedOut = false;
+        Boolean terminatedEarly = null;
+        if (queryResults.isEmpty()) {
+            return new ReducedQueryPhase(totalHits, fetchHits, maxScore, timedOut, terminatedEarly, null, null, null, null);
+        }
+        QuerySearchResult firstResult = queryResults.get(0).value.queryResult();
+        final boolean hasSuggest = firstResult.suggest() != null;
+        final boolean hasAggs = firstResult.hasAggs();
+        final boolean hasProfileResults = firstResult.hasProfileResults();
+        final List<InternalAggregations> aggregationsList = hasAggs ? new ArrayList<>(queryResults.size()) : Collections.emptyList();
+        // count the total (we use the query result provider here, since we might not get any hits (we scrolled past them))
+        final Map<String, List<Suggestion>> groupedSuggestions = hasSuggest ? new HashMap<>() : Collections.emptyMap();
+        final Map<String, ProfileShardResult> profileResults = hasProfileResults ? new HashMap<>(queryResults.size())
+            : Collections.emptyMap();
+        for (AtomicArray.Entry<? extends QuerySearchResultProvider> entry : queryResults) {
+            QuerySearchResult result = entry.value.queryResult();
+            if (result.searchTimedOut()) {
+                timedOut = true;
+            }
+            if (result.terminatedEarly() != null) {
+                if (terminatedEarly == null) {
+                    terminatedEarly = result.terminatedEarly();
+                } else if (result.terminatedEarly()) {
+                    terminatedEarly = true;
+                }
+            }
+            totalHits += result.topDocs().totalHits;
+            fetchHits += result.topDocs().scoreDocs.length;
+            if (!Float.isNaN(result.topDocs().getMaxScore())) {
+                maxScore = Math.max(maxScore, result.topDocs().getMaxScore());
+            }
+            if (hasSuggest) {
+                assert result.suggest() != null;
+                for (Suggestion<? extends Suggestion.Entry<? extends Suggestion.Entry.Option>> suggestion : result.suggest()) {
+                    List<Suggestion> suggestionList = groupedSuggestions.computeIfAbsent(suggestion.getName(), s -> new ArrayList<>());
+                    suggestionList.add(suggestion);
+                }
+            }
+            if (hasAggs) {
+                aggregationsList.add((InternalAggregations) result.consumeAggs());
+            }
+            if (hasProfileResults) {
+                String key = result.shardTarget().toString();
+                profileResults.put(key, result.consumeProfileResult());
+            }
+        }
+        final Suggest suggest = groupedSuggestions.isEmpty() ? null : new Suggest(Suggest.reduce(groupedSuggestions));
+        final InternalAggregations aggregations = aggregationsList.isEmpty() ? null : reduceAggs(aggregationsList,
+            firstResult.pipelineAggregators());
+        final SearchProfileShardResults shardResults = profileResults.isEmpty() ? null : new SearchProfileShardResults(profileResults);
+        return new ReducedQueryPhase(totalHits, fetchHits, maxScore, timedOut, terminatedEarly, firstResult, suggest, aggregations,
+            shardResults);
+    }
+    private InternalAggregations reduceAggs(List<InternalAggregations> aggregationsList,
+                                            List<SiblingPipelineAggregator> pipelineAggregators) {
+        ReduceContext reduceContext = new ReduceContext(bigArrays, scriptService);
+        InternalAggregations aggregations = InternalAggregations.reduce(aggregationsList, reduceContext);
+        if (pipelineAggregators != null) {
+            List<InternalAggregation> newAggs = StreamSupport.stream(aggregations.spliterator(), false)
+                .map((p) -> (InternalAggregation) p)
+                .collect(Collectors.toList());
+            for (SiblingPipelineAggregator pipelineAggregator : pipelineAggregators) {
+                InternalAggregation newAgg = pipelineAggregator.doReduce(new InternalAggregations(newAggs), reduceContext);
+                newAggs.add(newAgg);
+            }
+            return new InternalAggregations(newAggs);
+        }
+        return aggregations;
+    }
+    public static final class ReducedQueryPhase {
+        // the sum of all hits across all reduced shards
+        final long totalHits;
+        // the number of returned hits (doc IDs) across all reduced shards
+        final long fetchHits;
+        // the max score across all reduced hits or {@link Float#NaN} if no hits returned
+        final float maxScore;
+        // <code>true</code> if at least one reduced result timed out
+        final boolean timedOut;
+        // non-null and true if at least one reduced result was terminated early
+        final Boolean terminatedEarly;
+        // a non-null arbitrary query result if there was at least one reduced result
+        final QuerySearchResult oneResult;
+        // the reduced suggest results
+        final Suggest suggest;
+        // the reduced internal aggregations
+        final InternalAggregations aggregations;
+        // the reduced profile results
+        final SearchProfileShardResults shardResults;
+
+        ReducedQueryPhase(long totalHits, long fetchHits, float maxScore, boolean timedOut, Boolean terminatedEarly,
+                          QuerySearchResult oneResult, Suggest suggest, InternalAggregations aggregations,
+                          SearchProfileShardResults shardResults) {
+            this.totalHits = totalHits;
+            this.fetchHits = fetchHits;
+            if (Float.isInfinite(maxScore)) {
+                this.maxScore = Float.NaN;
+            } else {
+                this.maxScore = maxScore;
+            }
+            this.timedOut = timedOut;
+            this.terminatedEarly = terminatedEarly;
+            this.oneResult = oneResult;
+            this.suggest = suggest;
+            this.aggregations = aggregations;
+            this.shardResults = shardResults;
+        }
+
+        /**
+         * Creates a new search response from the given merged hits.
+         * @see #merge(boolean, ScoreDoc[], ReducedQueryPhase, AtomicArray)
+         */
+        public InternalSearchResponse buildResponse(InternalSearchHits hits) {
+            return new InternalSearchResponse(hits, aggregations, suggest, shardResults, timedOut, terminatedEarly);
+        }
+
+        /**
+         * Returns <code>true</code> iff the query phase had no results. Otherwise <code>false</code>.
+         */
+        public boolean isEmpty() {
+            return oneResult == null;
+        }
+    }
 }
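Callers now reduce once up front and hand the same ReducedQueryPhase to both getLastEmittedDocPerShard and merge, as the scroll actions below do. A condensed, hypothetical helper illustrating the new call sequence (simplified, not part of the commit):

// Hypothetical driver, condensed from the async actions changed in this commit:
// reduce once, run the fetch phase, then merge the fetched hits into the reduced result.
InternalSearchResponse reduceThenMerge(SearchPhaseController controller,
                                       AtomicArray<QuerySearchResultProvider> queryResults,
                                       AtomicArray<QuerySearchResultProvider> fetchResults,
                                       ScoreDoc[] sortedShardDocs) {
    // consumes aggs/profile results; the per-shard agg objects are GC-eligible from here on
    SearchPhaseController.ReducedQueryPhase reduced =
        controller.reducedQueryPhase(queryResults.asList());
    // ... the fetch phase would execute here, filling fetchResults ...
    return controller.merge(true, sortedShardDocs, reduced, fetchResults);
}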

File: SearchScrollQueryAndFetchAsyncAction.java

@@ -171,8 +171,8 @@ class SearchScrollQueryAndFetchAsyncAction extends AbstractAsyncAction {
     private void innerFinishHim() throws Exception {
         ScoreDoc[] sortedShardDocs = searchPhaseController.sortDocs(true, queryFetchResults);
-        final InternalSearchResponse internalResponse = searchPhaseController.merge(true, sortedShardDocs, queryFetchResults,
-            queryFetchResults);
+        final InternalSearchResponse internalResponse = searchPhaseController.merge(true, sortedShardDocs,
+            searchPhaseController.reducedQueryPhase(queryFetchResults.asList()), queryFetchResults);
         String scrollId = null;
         if (request.scroll() != null) {
             scrollId = request.scrollId();

File: SearchScrollQueryThenFetchAsyncAction.java

@@ -30,6 +30,7 @@ import org.elasticsearch.cluster.node.DiscoveryNodes;
 import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.util.concurrent.AtomicArray;
 import org.elasticsearch.search.fetch.FetchSearchResult;
+import org.elasticsearch.search.fetch.QueryFetchSearchResult;
 import org.elasticsearch.search.fetch.ShardFetchRequest;
 import org.elasticsearch.search.internal.InternalScrollSearchRequest;
 import org.elasticsearch.search.internal.InternalSearchResponse;
@@ -170,13 +171,14 @@ class SearchScrollQueryThenFetchAsyncAction extends AbstractAsyncAction {
     private void executeFetchPhase() throws Exception {
         sortedShardDocs = searchPhaseController.sortDocs(true, queryResults);
         if (sortedShardDocs.length == 0) {
-            finishHim();
+            finishHim(searchPhaseController.reducedQueryPhase(queryResults.asList()));
             return;
         }
         final IntArrayList[] docIdsToLoad = searchPhaseController.fillDocIdsToLoad(queryResults.length(), sortedShardDocs);
+        SearchPhaseController.ReducedQueryPhase reducedQueryPhase = searchPhaseController.reducedQueryPhase(queryResults.asList());
-        final ScoreDoc[] lastEmittedDocPerShard = searchPhaseController.getLastEmittedDocPerShard(queryResults.asList(),
-            sortedShardDocs, queryResults.length());
+        final ScoreDoc[] lastEmittedDocPerShard = searchPhaseController.getLastEmittedDocPerShard(reducedQueryPhase, sortedShardDocs,
+            queryResults.length());
         final AtomicInteger counter = new AtomicInteger(docIdsToLoad.length);
         for (int i = 0; i < docIdsToLoad.length; i++) {
             final int index = i;
@@ -192,7 +194,7 @@ class SearchScrollQueryThenFetchAsyncAction extends AbstractAsyncAction {
                     result.shardTarget(querySearchResult.shardTarget());
                     fetchResults.set(index, result);
                     if (counter.decrementAndGet() == 0) {
-                        finishHim();
+                        finishHim(reducedQueryPhase);
                     }
                 }
@@ -203,34 +205,30 @@ class SearchScrollQueryThenFetchAsyncAction extends AbstractAsyncAction {
                     }
                     successfulOps.decrementAndGet();
                     if (counter.decrementAndGet() == 0) {
-                        finishHim();
+                        finishHim(reducedQueryPhase);
                     }
                 }
             });
         } else {
             // the counter is set to the total size of docIdsToLoad which can have null values so we have to count them down too
             if (counter.decrementAndGet() == 0) {
-                finishHim();
+                finishHim(reducedQueryPhase);
             }
         }
     }
 }
-    private void finishHim() {
+    private void finishHim(SearchPhaseController.ReducedQueryPhase queryPhase) {
         try {
-            innerFinishHim();
+            final InternalSearchResponse internalResponse = searchPhaseController.merge(true, sortedShardDocs, queryPhase, fetchResults);
+            String scrollId = null;
+            if (request.scroll() != null) {
+                scrollId = request.scrollId();
+            }
+            listener.onResponse(new SearchResponse(internalResponse, scrollId, this.scrollId.getContext().length, successfulOps.get(),
+                buildTookInMillis(), buildShardFailures()));
         } catch (Exception e) {
             listener.onFailure(new ReduceSearchPhaseException("fetch", "inner finish failed", e, buildShardFailures()));
         }
     }
-    private void innerFinishHim() {
-        InternalSearchResponse internalResponse = searchPhaseController.merge(true, sortedShardDocs, queryResults, fetchResults);
-        String scrollId = null;
-        if (request.scroll() != null) {
-            scrollId = request.scrollId();
-        }
-        listener.onResponse(new SearchResponse(internalResponse, scrollId, this.scrollId.getContext().length, successfulOps.get(),
-            buildTookInMillis(), buildShardFailures()));
-    }
 }

File: AggregationPhase.java

@@ -84,7 +84,7 @@ public class AggregationPhase implements SearchPhase {
             return;
         }
-        if (context.queryResult().aggregations() != null) {
+        if (context.queryResult().hasAggs()) {
             // no need to compute the aggs twice, they should be computed on a per context basis
             return;
         }

File: QuerySearchResult.java

@@ -21,7 +21,6 @@ package org.elasticsearch.search.query;
 import org.apache.lucene.search.FieldDoc;
 import org.apache.lucene.search.TopDocs;
-import org.elasticsearch.common.Nullable;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;
 import org.elasticsearch.search.DocValueFormat;
@@ -41,7 +40,7 @@ import static java.util.Collections.emptyList;
 import static org.elasticsearch.common.lucene.Lucene.readTopDocs;
 import static org.elasticsearch.common.lucene.Lucene.writeTopDocs;
-public class QuerySearchResult extends QuerySearchResultProvider {
+public final class QuerySearchResult extends QuerySearchResultProvider {
     private long id;
     private SearchShardTarget shardTarget;
@@ -50,14 +49,15 @@ public class QuerySearchResult extends QuerySearchResultProvider {
     private TopDocs topDocs;
     private DocValueFormat[] sortValueFormats;
     private InternalAggregations aggregations;
+    private boolean hasAggs;
     private List<SiblingPipelineAggregator> pipelineAggregators;
     private Suggest suggest;
     private boolean searchTimedOut;
     private Boolean terminatedEarly = null;
     private ProfileShardResult profileShardResults;
+    private boolean hasProfileResults;
     public QuerySearchResult() {
     }
     public QuerySearchResult(long id, SearchShardTarget shardTarget) {
@@ -121,20 +121,47 @@ public class QuerySearchResult extends QuerySearchResultProvider {
         return sortValueFormats;
     }
-    public Aggregations aggregations() {
-        return aggregations;
-    }
+    /**
+     * Returns <code>true</code> if this query result has unconsumed aggregations
+     */
+    public boolean hasAggs() {
+        return hasAggs;
+    }
+
+    /**
+     * Returns and nulls out the aggregations for this search result. This allows freeing up memory once the aggregations are consumed.
+     * @throws IllegalStateException if the aggregations have already been consumed.
+     */
+    public Aggregations consumeAggs() {
+        if (aggregations == null) {
+            throw new IllegalStateException("aggs already consumed");
+        }
+        Aggregations aggs = aggregations;
+        aggregations = null;
+        return aggs;
+    }
     public void aggregations(InternalAggregations aggregations) {
         this.aggregations = aggregations;
+        hasAggs = aggregations != null;
     }
     /**
-     * Returns the profiled results for this search, or potentially null if result was empty
-     * @return The profiled results, or null
+     * Returns and nulls out the profiled results for this search, or potentially null if the result was empty.
+     * This allows freeing up memory once the profiled result is consumed.
+     * @throws IllegalStateException if the profiled result has already been consumed.
      */
-    @Nullable public ProfileShardResult profileResults() {
-        return profileShardResults;
-    }
+    public ProfileShardResult consumeProfileResult() {
+        if (profileShardResults == null) {
+            throw new IllegalStateException("profile results already consumed");
+        }
+        ProfileShardResult result = profileShardResults;
+        profileShardResults = null;
+        return result;
+    }
+
+    public boolean hasProfileResults() {
+        return hasProfileResults;
+    }
     /**
@@ -143,6 +170,7 @@ public class QuerySearchResult extends QuerySearchResultProvider {
      */
     public void profileResults(ProfileShardResult shardResults) {
         this.profileShardResults = shardResults;
+        hasProfileResults = shardResults != null;
     }
     public List<SiblingPipelineAggregator> pipelineAggregators() {
@@ -170,6 +198,9 @@ public class QuerySearchResult extends QuerySearchResultProvider {
         return this;
     }
+    /**
+     * Returns the maximum size of this result's top docs.
+     */
     public int size() {
         return size;
     }
@@ -212,7 +243,7 @@ public class QuerySearchResult extends QuerySearchResultProvider {
             }
         }
         topDocs = readTopDocs(in);
-        if (in.readBoolean()) {
+        if (hasAggs = in.readBoolean()) {
             aggregations = InternalAggregations.readAggregations(in);
         }
         pipelineAggregators = in.readNamedWriteableList(PipelineAggregator.class).stream().map(a -> (SiblingPipelineAggregator) a)
@@ -223,6 +254,7 @@ public class QuerySearchResult extends QuerySearchResultProvider {
         searchTimedOut = in.readBoolean();
         terminatedEarly = in.readOptionalBoolean();
         profileShardResults = in.readOptionalWriteable(ProfileShardResult::new);
+        hasProfileResults = profileShardResults != null;
     }
     @Override

File: SearchPhaseControllerTests.java

@@ -98,8 +98,8 @@ public class SearchPhaseControllerTests extends ESTestCase {
             }
         }
         ScoreDoc[] sortedDocs = mergedScoreDocs.toArray(new ScoreDoc[mergedScoreDocs.size()]);
-        InternalSearchResponse mergedResponse = searchPhaseController.merge(true, sortedDocs, queryResults,
+        InternalSearchResponse mergedResponse = searchPhaseController.merge(true, sortedDocs,
+            searchPhaseController.reducedQueryPhase(queryResults.asList()),
             generateFetchResults(nShards, mergedSearchDocs, mergedSuggest));
         assertThat(mergedResponse.hits().getHits().length, equalTo(mergedSearchDocs.length));
         Suggest suggestResult = mergedResponse.suggest();