Cleanup SearchPhaseController interface (#23844)

SearchPhaseController is tighly coupled to AtomicArray which makes
non-dense representations of results very difficult. This commit removes
the coupling and cuts over to Collection rather than List to ensure no
order or random access lookup is implied.
This commit is contained in:
Simon Willnauer 2017-03-31 16:25:15 +02:00 committed by GitHub
parent a8250b26e7
commit 135eae42b9
5 changed files with 24 additions and 23 deletions

View File

@ -202,7 +202,7 @@ final class FetchSearchPhase extends SearchPhase {
String scrollId, SearchPhaseController.ReducedQueryPhase reducedQueryPhase, String scrollId, SearchPhaseController.ReducedQueryPhase reducedQueryPhase,
AtomicArray<? extends SearchPhaseResult> fetchResultsArr) { AtomicArray<? extends SearchPhaseResult> fetchResultsArr) {
final InternalSearchResponse internalResponse = searchPhaseController.merge(context.getRequest().scroll() != null, final InternalSearchResponse internalResponse = searchPhaseController.merge(context.getRequest().scroll() != null,
sortedDocs, reducedQueryPhase, fetchResultsArr); sortedDocs, reducedQueryPhase, fetchResultsArr.asList(), fetchResultsArr::get);
context.executeNextPhase(this, nextPhaseFactory.apply(context.buildSearchResponse(internalResponse, scrollId))); context.executeNextPhase(this, nextPhaseFactory.apply(context.buildSearchResponse(internalResponse, scrollId)));
} }

View File

@ -36,7 +36,6 @@ import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.lucene.Lucene; import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.concurrent.AtomicArray;
import org.elasticsearch.script.ScriptService; import org.elasticsearch.script.ScriptService;
import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits; import org.elasticsearch.search.SearchHits;
@ -61,14 +60,16 @@ import org.elasticsearch.search.suggest.completion.CompletionSuggestion;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collection;
import java.util.Collections; import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.function.IntFunction;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import java.util.stream.StreamSupport; import java.util.stream.StreamSupport;
public class SearchPhaseController extends AbstractComponent { public final class SearchPhaseController extends AbstractComponent {
private static final ScoreDoc[] EMPTY_DOCS = new ScoreDoc[0]; private static final ScoreDoc[] EMPTY_DOCS = new ScoreDoc[0];
@ -81,7 +82,7 @@ public class SearchPhaseController extends AbstractComponent {
this.scriptService = scriptService; this.scriptService = scriptService;
} }
public AggregatedDfs aggregateDfs(List<DfsSearchResult> results) { public AggregatedDfs aggregateDfs(Collection<DfsSearchResult> results) {
ObjectObjectHashMap<Term, TermStatistics> termStatistics = HppcMaps.newNoNullKeysMap(); ObjectObjectHashMap<Term, TermStatistics> termStatistics = HppcMaps.newNoNullKeysMap();
ObjectObjectHashMap<String, CollectionStatistics> fieldStatistics = HppcMaps.newNoNullKeysMap(); ObjectObjectHashMap<String, CollectionStatistics> fieldStatistics = HppcMaps.newNoNullKeysMap();
long aggMaxDoc = 0; long aggMaxDoc = 0;
@ -148,7 +149,7 @@ public class SearchPhaseController extends AbstractComponent {
* Enabled only for scroll search, because that only retrieves hits of length 'size' in the query phase. * Enabled only for scroll search, because that only retrieves hits of length 'size' in the query phase.
* @param results Shard result holder * @param results Shard result holder
*/ */
public ScoreDoc[] sortDocs(boolean ignoreFrom, List<? extends SearchPhaseResult> results, int numShards) throws IOException { public ScoreDoc[] sortDocs(boolean ignoreFrom, Collection<? extends SearchPhaseResult> results, int numShards) throws IOException {
if (results.isEmpty()) { if (results.isEmpty()) {
return EMPTY_DOCS; return EMPTY_DOCS;
} }
@ -158,7 +159,7 @@ public class SearchPhaseController extends AbstractComponent {
int shardIndex = -1; int shardIndex = -1;
if (results.size() == 1) { if (results.size() == 1) {
canOptimize = true; canOptimize = true;
result = results.get(0).queryResult(); result = results.stream().findFirst().get().queryResult();
shardIndex = result.getShardIndex(); shardIndex = result.getShardIndex();
} else { } else {
boolean hasResult = false; boolean hasResult = false;
@ -176,7 +177,7 @@ public class SearchPhaseController extends AbstractComponent {
shardIndex = resultToOptimize.getShardIndex(); shardIndex = resultToOptimize.getShardIndex();
} }
} }
result = canOptimize ? resultToOptimize : results.get(0).queryResult(); result = canOptimize ? resultToOptimize : results.stream().findFirst().get().queryResult();
assert result != null; assert result != null;
} }
if (canOptimize) { if (canOptimize) {
@ -284,8 +285,7 @@ public class SearchPhaseController extends AbstractComponent {
} }
static <T extends TopDocs> void fillTopDocs(T[] shardTopDocs, static <T extends TopDocs> void fillTopDocs(T[] shardTopDocs,
List<? extends SearchPhaseResult> results, Collection<? extends SearchPhaseResult> results, T empytTopDocs) {
T empytTopDocs) {
if (results.size() != shardTopDocs.length) { if (results.size() != shardTopDocs.length) {
// TopDocs#merge can't deal with null shard TopDocs // TopDocs#merge can't deal with null shard TopDocs
Arrays.fill(shardTopDocs, empytTopDocs); Arrays.fill(shardTopDocs, empytTopDocs);
@ -338,12 +338,11 @@ public class SearchPhaseController extends AbstractComponent {
*/ */
public InternalSearchResponse merge(boolean ignoreFrom, ScoreDoc[] sortedDocs, public InternalSearchResponse merge(boolean ignoreFrom, ScoreDoc[] sortedDocs,
ReducedQueryPhase reducedQueryPhase, ReducedQueryPhase reducedQueryPhase,
AtomicArray<? extends SearchPhaseResult> fetchResultsArr) { Collection<? extends SearchPhaseResult> fetchResults, IntFunction<SearchPhaseResult> resultsLookup) {
if (reducedQueryPhase.isEmpty()) { if (reducedQueryPhase.isEmpty()) {
return InternalSearchResponse.empty(); return InternalSearchResponse.empty();
} }
List<? extends SearchPhaseResult> fetchResults = fetchResultsArr.asList(); SearchHits hits = getHits(reducedQueryPhase, ignoreFrom, sortedDocs, fetchResults, resultsLookup);
SearchHits hits = getHits(reducedQueryPhase, ignoreFrom, sortedDocs, fetchResultsArr);
if (reducedQueryPhase.suggest != null) { if (reducedQueryPhase.suggest != null) {
if (!fetchResults.isEmpty()) { if (!fetchResults.isEmpty()) {
int currentOffset = hits.getHits().length; int currentOffset = hits.getHits().length;
@ -351,7 +350,7 @@ public class SearchPhaseController extends AbstractComponent {
final List<CompletionSuggestion.Entry.Option> suggestionOptions = suggestion.getOptions(); final List<CompletionSuggestion.Entry.Option> suggestionOptions = suggestion.getOptions();
for (int scoreDocIndex = currentOffset; scoreDocIndex < currentOffset + suggestionOptions.size(); scoreDocIndex++) { for (int scoreDocIndex = currentOffset; scoreDocIndex < currentOffset + suggestionOptions.size(); scoreDocIndex++) {
ScoreDoc shardDoc = sortedDocs[scoreDocIndex]; ScoreDoc shardDoc = sortedDocs[scoreDocIndex];
SearchPhaseResult searchResultProvider = fetchResultsArr.get(shardDoc.shardIndex); SearchPhaseResult searchResultProvider = resultsLookup.apply(shardDoc.shardIndex);
if (searchResultProvider == null) { if (searchResultProvider == null) {
continue; continue;
} }
@ -375,8 +374,7 @@ public class SearchPhaseController extends AbstractComponent {
} }
private SearchHits getHits(ReducedQueryPhase reducedQueryPhase, boolean ignoreFrom, ScoreDoc[] sortedDocs, private SearchHits getHits(ReducedQueryPhase reducedQueryPhase, boolean ignoreFrom, ScoreDoc[] sortedDocs,
AtomicArray<? extends SearchPhaseResult> fetchResultsArr) { Collection<? extends SearchPhaseResult> fetchResults, IntFunction<SearchPhaseResult> resultsLookup) {
List<? extends SearchPhaseResult> fetchResults = fetchResultsArr.asList();
boolean sorted = false; boolean sorted = false;
int sortScoreIndex = -1; int sortScoreIndex = -1;
if (reducedQueryPhase.oneResult.topDocs() instanceof TopFieldDocs) { if (reducedQueryPhase.oneResult.topDocs() instanceof TopFieldDocs) {
@ -406,7 +404,7 @@ public class SearchPhaseController extends AbstractComponent {
if (!fetchResults.isEmpty()) { if (!fetchResults.isEmpty()) {
for (int i = 0; i < numSearchHits; i++) { for (int i = 0; i < numSearchHits; i++) {
ScoreDoc shardDoc = sortedDocs[i]; ScoreDoc shardDoc = sortedDocs[i];
SearchPhaseResult fetchResultProvider = fetchResultsArr.get(shardDoc.shardIndex); SearchPhaseResult fetchResultProvider = resultsLookup.apply(shardDoc.shardIndex);
if (fetchResultProvider == null) { if (fetchResultProvider == null) {
continue; continue;
} }
@ -435,7 +433,7 @@ public class SearchPhaseController extends AbstractComponent {
* Reduces the given query results and consumes all aggregations and profile results. * Reduces the given query results and consumes all aggregations and profile results.
* @param queryResults a list of non-null query shard results * @param queryResults a list of non-null query shard results
*/ */
public final ReducedQueryPhase reducedQueryPhase(List<? extends SearchPhaseResult> queryResults) { public ReducedQueryPhase reducedQueryPhase(List<? extends SearchPhaseResult> queryResults) {
return reducedQueryPhase(queryResults, null, 0); return reducedQueryPhase(queryResults, null, 0);
} }
@ -448,7 +446,7 @@ public class SearchPhaseController extends AbstractComponent {
* @see QuerySearchResult#consumeAggs() * @see QuerySearchResult#consumeAggs()
* @see QuerySearchResult#consumeProfileResult() * @see QuerySearchResult#consumeProfileResult()
*/ */
private ReducedQueryPhase reducedQueryPhase(List<? extends SearchPhaseResult> queryResults, private ReducedQueryPhase reducedQueryPhase(Collection<? extends SearchPhaseResult> queryResults,
List<InternalAggregations> bufferdAggs, int numReducePhases) { List<InternalAggregations> bufferdAggs, int numReducePhases) {
assert numReducePhases >= 0 : "num reduce phases must be >= 0 but was: " + numReducePhases; assert numReducePhases >= 0 : "num reduce phases must be >= 0 but was: " + numReducePhases;
numReducePhases++; // increment for this phase numReducePhases++; // increment for this phase
@ -461,7 +459,7 @@ public class SearchPhaseController extends AbstractComponent {
return new ReducedQueryPhase(totalHits, fetchHits, maxScore, timedOut, terminatedEarly, null, null, null, null, return new ReducedQueryPhase(totalHits, fetchHits, maxScore, timedOut, terminatedEarly, null, null, null, null,
numReducePhases); numReducePhases);
} }
final QuerySearchResult firstResult = queryResults.get(0).queryResult(); final QuerySearchResult firstResult = queryResults.stream().findFirst().get().queryResult();
final boolean hasSuggest = firstResult.suggest() != null; final boolean hasSuggest = firstResult.suggest() != null;
final boolean hasProfileResults = firstResult.hasProfileResults(); final boolean hasProfileResults = firstResult.hasProfileResults();
final boolean consumeAggs; final boolean consumeAggs;
@ -599,7 +597,7 @@ public class SearchPhaseController extends AbstractComponent {
/** /**
* Creates a new search response from the given merged hits. * Creates a new search response from the given merged hits.
* @see #merge(boolean, ScoreDoc[], ReducedQueryPhase, AtomicArray) * @see #merge(boolean, ScoreDoc[], ReducedQueryPhase, Collection, IntFunction)
*/ */
public InternalSearchResponse buildResponse(SearchHits hits) { public InternalSearchResponse buildResponse(SearchHits hits) {
return new InternalSearchResponse(hits, aggregations, suggest, shardResults, timedOut, terminatedEarly, numReducePhases); return new InternalSearchResponse(hits, aggregations, suggest, shardResults, timedOut, terminatedEarly, numReducePhases);

View File

@ -172,9 +172,10 @@ final class SearchScrollQueryAndFetchAsyncAction extends AbstractAsyncAction {
} }
private void innerFinishHim() throws Exception { private void innerFinishHim() throws Exception {
List<QueryFetchSearchResult> queryFetchSearchResults = queryFetchResults.asList();
ScoreDoc[] sortedShardDocs = searchPhaseController.sortDocs(true, queryFetchResults.asList(), queryFetchResults.length()); ScoreDoc[] sortedShardDocs = searchPhaseController.sortDocs(true, queryFetchResults.asList(), queryFetchResults.length());
final InternalSearchResponse internalResponse = searchPhaseController.merge(true, sortedShardDocs, final InternalSearchResponse internalResponse = searchPhaseController.merge(true, sortedShardDocs,
searchPhaseController.reducedQueryPhase(queryFetchResults.asList()), queryFetchResults); searchPhaseController.reducedQueryPhase(queryFetchSearchResults), queryFetchSearchResults, queryFetchResults::get);
String scrollId = null; String scrollId = null;
if (request.scroll() != null) { if (request.scroll() != null) {
scrollId = request.scrollId(); scrollId = request.scrollId();

View File

@ -222,7 +222,8 @@ final class SearchScrollQueryThenFetchAsyncAction extends AbstractAsyncAction {
private void finishHim(SearchPhaseController.ReducedQueryPhase queryPhase) { private void finishHim(SearchPhaseController.ReducedQueryPhase queryPhase) {
try { try {
final InternalSearchResponse internalResponse = searchPhaseController.merge(true, sortedShardDocs, queryPhase, fetchResults); final InternalSearchResponse internalResponse = searchPhaseController.merge(true, sortedShardDocs, queryPhase,
fetchResults.asList(), fetchResults::get);
String scrollId = null; String scrollId = null;
if (request.scroll() != null) { if (request.scroll() != null) {
scrollId = request.scrollId(); scrollId = request.scrollId();

View File

@ -119,9 +119,10 @@ public class SearchPhaseControllerTests extends ESTestCase {
} }
} }
ScoreDoc[] sortedDocs = mergedScoreDocs.toArray(new ScoreDoc[mergedScoreDocs.size()]); ScoreDoc[] sortedDocs = mergedScoreDocs.toArray(new ScoreDoc[mergedScoreDocs.size()]);
AtomicArray<SearchPhaseResult> searchPhaseResultAtomicArray = generateFetchResults(nShards, mergedSearchDocs, mergedSuggest);
InternalSearchResponse mergedResponse = searchPhaseController.merge(true, sortedDocs, InternalSearchResponse mergedResponse = searchPhaseController.merge(true, sortedDocs,
searchPhaseController.reducedQueryPhase(queryResults.asList()), searchPhaseController.reducedQueryPhase(queryResults.asList()),
generateFetchResults(nShards, mergedSearchDocs, mergedSuggest)); searchPhaseResultAtomicArray.asList(), searchPhaseResultAtomicArray::get);
assertThat(mergedResponse.hits().getHits().length, equalTo(mergedSearchDocs.length)); assertThat(mergedResponse.hits().getHits().length, equalTo(mergedSearchDocs.length));
Suggest suggestResult = mergedResponse.suggest(); Suggest suggestResult = mergedResponse.suggest();
for (Suggest.Suggestion<?> suggestion : mergedSuggest) { for (Suggest.Suggestion<?> suggestion : mergedSuggest) {