Minor search controller changes (#36479)

This commit contains a few minor changes to our search code:

- adjust the visibility of a couple of methods in our search code to package private from public or protected.
- make some of the `SearchPhaseController` methods static where possible
- rename one of the `SearchPhaseController#reducedQueryPhase` methods (used only for scroll requests) to `reducedScrollQueryPhase` without the `isScrollRequest` argument which was always set to `true`
- replace leniency in `SearchPhaseController#setShardIndex` with an assert to make sure that we never set the shard index twice
- remove two null checks where the checked field can never be null
- resolve an unchecked warning
- replace `List#toArray` invocation that creates an array providing the true size with array creation of length 0
- correct a couple of typos in comments
This commit is contained in:
Luca Cavanna 2018-12-11 20:24:29 +01:00 committed by GitHub
parent fb18b35347
commit dafea3cc23
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 27 additions and 42 deletions

View File

@ -72,7 +72,7 @@ abstract class AbstractSearchAsyncAction<Result extends SearchPhaseResult> exten
private final TransportSearchAction.SearchTimeProvider timeProvider;
private final SearchResponse.Clusters clusters;
protected AbstractSearchAsyncAction(String name, Logger logger, SearchTransportService searchTransportService,
AbstractSearchAsyncAction(String name, Logger logger, SearchTransportService searchTransportService,
BiFunction<String, String, Transport.Connection> nodeIdToConnection,
Map<String, AliasFilter> aliasFilter, Map<String, Float> concreteIndexBoosts,
Map<String, Set<String>> indexRoutings,

View File

@ -44,7 +44,7 @@ import java.util.stream.Stream;
* and collect the results. If a shard request returns a failure this class handles the advance to the next replica of the shard until
* the shards replica iterator is exhausted. Each shard is referenced by position in the {@link GroupShardsIterator} which is later
* referred to as the {@code shardIndex}.
* The fan out and collect algorithm is traditionally used as the initial phase which can either be a query execution or collection
* The fan out and collect algorithm is traditionally used as the initial phase which can either be a query execution or collection of
* distributed frequencies
*/
abstract class InitialSearchPhase<FirstResult extends SearchPhaseResult> extends SearchPhase {
@ -327,7 +327,6 @@ abstract class InitialSearchPhase<FirstResult extends SearchPhaseResult> extends
}
}
/**
* Executed once all shard results have been received and processed
* @see #onShardFailure(int, SearchShardTarget, Exception)
@ -367,7 +366,7 @@ abstract class InitialSearchPhase<FirstResult extends SearchPhaseResult> extends
abstract static class SearchPhaseResults<Result extends SearchPhaseResult> {
private final int numShards;
protected SearchPhaseResults(int numShards) {
SearchPhaseResults(int numShards) {
this.numShards = numShards;
}
/**

View File

@ -21,7 +21,6 @@ package org.elasticsearch.action.search;
import com.carrotsearch.hppc.IntArrayList;
import com.carrotsearch.hppc.ObjectObjectHashMap;
import org.apache.lucene.index.Term;
import org.apache.lucene.search.CollectionStatistics;
import org.apache.lucene.search.FieldDoc;
@ -154,7 +153,7 @@ public final class SearchPhaseController {
* @param from the offset into the search results top docs
* @param size the number of hits to return from the merged top docs
*/
public SortedTopDocs sortDocs(boolean ignoreFrom, Collection<? extends SearchPhaseResult> results,
static SortedTopDocs sortDocs(boolean ignoreFrom, Collection<? extends SearchPhaseResult> results,
final Collection<TopDocs> bufferedTopDocs, final TopDocsStats topDocsStats, int from, int size) {
if (results.isEmpty()) {
return SortedTopDocs.EMPTY;
@ -214,7 +213,7 @@ public final class SearchPhaseController {
}
final boolean isSortedByField;
final SortField[] sortFields;
if (mergedTopDocs != null && mergedTopDocs instanceof TopFieldDocs) {
if (mergedTopDocs instanceof TopFieldDocs) {
TopFieldDocs fieldDocs = (TopFieldDocs) mergedTopDocs;
isSortedByField = (fieldDocs instanceof CollapseTopFieldDocs &&
fieldDocs.fields.length == 1 && fieldDocs.fields[0].getType() == SortField.Type.SCORE) == false;
@ -230,11 +229,10 @@ public final class SearchPhaseController {
}
}
TopDocs mergeTopDocs(Collection<TopDocs> results, int topN, int from) {
static TopDocs mergeTopDocs(Collection<TopDocs> results, int topN, int from) {
if (results.isEmpty()) {
return null;
}
assert results.isEmpty() == false;
final boolean setShardIndex = false;
final TopDocs topDocs = results.stream().findFirst().get();
final TopDocs mergedTopDocs;
@ -259,12 +257,8 @@ public final class SearchPhaseController {
}
private static void setShardIndex(TopDocs topDocs, int shardIndex) {
assert topDocs.scoreDocs.length == 0 || topDocs.scoreDocs[0].shardIndex == -1 : "shardIndex is already set";
for (ScoreDoc doc : topDocs.scoreDocs) {
if (doc.shardIndex != -1) {
// once there is a single shard index initialized all others will be initialized too
// there are many asserts down in lucene land that this is actually true. we can shortcut it here.
return;
}
doc.shardIndex = shardIndex;
}
}
@ -283,7 +277,6 @@ public final class SearchPhaseController {
}
}
return lastEmittedDocPerShard;
}
/**
@ -402,15 +395,15 @@ public final class SearchPhaseController {
hits.add(searchHit);
}
}
return new SearchHits(hits.toArray(new SearchHit[hits.size()]), reducedQueryPhase.totalHits, reducedQueryPhase.maxScore);
return new SearchHits(hits.toArray(new SearchHit[0]), reducedQueryPhase.totalHits, reducedQueryPhase.maxScore);
}
/**
* Reduces the given query results and consumes all aggregations and profile results.
* @param queryResults a list of non-null query shard results
*/
public ReducedQueryPhase reducedQueryPhase(Collection<? extends SearchPhaseResult> queryResults, boolean isScrollRequest) {
return reducedQueryPhase(queryResults, isScrollRequest, true);
public ReducedQueryPhase reducedScrollQueryPhase(Collection<? extends SearchPhaseResult> queryResults) {
return reducedQueryPhase(queryResults, true, true);
}
/**
@ -422,7 +415,6 @@ public final class SearchPhaseController {
return reducedQueryPhase(queryResults, null, new ArrayList<>(), new TopDocsStats(trackTotalHits), 0, isScrollRequest);
}
/**
* Reduces the given query results and consumes all aggregations and profile results.
* @param queryResults a list of non-null query shard results
@ -507,15 +499,13 @@ public final class SearchPhaseController {
final InternalAggregations aggregations = aggregationsList.isEmpty() ? null : reduceAggs(aggregationsList,
firstResult.pipelineAggregators(), reduceContext);
final SearchProfileShardResults shardResults = profileResults.isEmpty() ? null : new SearchProfileShardResults(profileResults);
final SortedTopDocs scoreDocs = this.sortDocs(isScrollRequest, queryResults, bufferedTopDocs, topDocsStats, from, size);
final SortedTopDocs scoreDocs = sortDocs(isScrollRequest, queryResults, bufferedTopDocs, topDocsStats, from, size);
final TotalHits totalHits = topDocsStats.getTotalHits();
return new ReducedQueryPhase(totalHits, topDocsStats.fetchHits, topDocsStats.maxScore,
timedOut, terminatedEarly, suggest, aggregations, shardResults, scoreDocs.scoreDocs, scoreDocs.sortFields,
firstResult != null ? firstResult.sortValueFormats() : null,
numReducePhases, scoreDocs.isSortedByField, size, from, firstResult == null);
firstResult.sortValueFormats(), numReducePhases, scoreDocs.isSortedByField, size, from, false);
}
/**
* Performs an intermediate reduce phase on the aggregations. For instance with this reduce phase never prune information
* that relevant for the final reduce step. For final reduce see {@link #reduceAggs(List, List, ReduceContext)}
@ -526,7 +516,7 @@ public final class SearchPhaseController {
null, reduceContext);
}
private InternalAggregations reduceAggs(List<InternalAggregations> aggregationsList,
private static InternalAggregations reduceAggs(List<InternalAggregations> aggregationsList,
List<SiblingPipelineAggregator> pipelineAggregators, ReduceContext reduceContext) {
InternalAggregations aggregations = InternalAggregations.reduce(aggregationsList, reduceContext);
if (pipelineAggregators != null) {
@ -657,7 +647,6 @@ public final class SearchPhaseController {
this.hasTopDocs = hasTopDocs;
this.hasAggs = hasAggs;
this.bufferSize = bufferSize;
}
@Override
@ -675,10 +664,9 @@ public final class SearchPhaseController {
aggsBuffer[0] = reducedAggs;
}
if (hasTopDocs) {
TopDocs reducedTopDocs = controller.mergeTopDocs(Arrays.asList(topDocsBuffer),
TopDocs reducedTopDocs = mergeTopDocs(Arrays.asList(topDocsBuffer),
// we have to merge here in the same way we collect on a shard
querySearchResult.from() + querySearchResult.size()
, 0);
querySearchResult.from() + querySearchResult.size(), 0);
Arrays.fill(topDocsBuffer, null);
topDocsBuffer[0] = reducedTopDocs;
}
@ -692,7 +680,7 @@ public final class SearchPhaseController {
if (hasTopDocs) {
final TopDocsAndMaxScore topDocs = querySearchResult.consumeTopDocs(); // can't be null
topDocsStats.add(topDocs);
SearchPhaseController.setShardIndex(topDocs.topDocs, querySearchResult.getShardIndex());
setShardIndex(topDocs.topDocs, querySearchResult.getShardIndex());
topDocsBuffer[i] = topDocs.topDocs;
}
}
@ -705,7 +693,6 @@ public final class SearchPhaseController {
return hasTopDocs ? Arrays.asList(topDocsBuffer).subList(0, index) : null;
}
@Override
public ReducedQueryPhase reduce() {
return controller.reducedQueryPhase(results.asList(), getRemainingAggs(), getRemainingTopDocs(), topDocsStats,
@ -739,7 +726,7 @@ public final class SearchPhaseController {
return new QueryPhaseResultConsumer(this, numShards, request.getBatchedReduceSize(), hasTopDocs, hasAggs);
}
}
return new InitialSearchPhase.ArraySearchPhaseResults(numShards) {
return new InitialSearchPhase.ArraySearchPhaseResults<SearchPhaseResult>(numShards) {
@Override
public ReducedQueryPhase reduce() {
return reducedQueryPhase(results.asList(), isScrollRequest, trackTotalHits);

View File

@ -52,7 +52,7 @@ final class SearchScrollQueryAndFetchAsyncAction extends SearchScrollAsyncAction
@Override
protected SearchPhase moveToNextPhase(BiFunction<String, String, DiscoveryNode> clusterNodeLookup) {
return sendResponsePhase(searchPhaseController.reducedQueryPhase(queryFetchResults.asList(), true), queryFetchResults);
return sendResponsePhase(searchPhaseController.reducedScrollQueryPhase(queryFetchResults.asList()), queryFetchResults);
}
@Override

View File

@ -69,8 +69,8 @@ final class SearchScrollQueryThenFetchAsyncAction extends SearchScrollAsyncActio
return new SearchPhase("fetch") {
@Override
public void run() throws IOException {
final SearchPhaseController.ReducedQueryPhase reducedQueryPhase = searchPhaseController.reducedQueryPhase(
queryResults.asList(), true);
final SearchPhaseController.ReducedQueryPhase reducedQueryPhase = searchPhaseController.reducedScrollQueryPhase(
queryResults.asList());
if (reducedQueryPhase.scoreDocs.length == 0) {
sendResponse(reducedQueryPhase, fetchResults);
return;

View File

@ -25,7 +25,7 @@ import org.elasticsearch.search.query.QuerySearchResult;
import org.elasticsearch.transport.TransportResponse;
/**
* This class is a base class for all search releated results. It contains the shard target it
* This class is a base class for all search related results. It contains the shard target it
* was executed against, a shard index used to reference the result on the coordinating node
* and a request ID that is used to reference the request context on the executing node. The
* request ID is particularly important since it is used to reference and maintain a context

View File

@ -65,7 +65,6 @@ public class InternalTopHits extends InternalAggregation implements TopHits {
from = in.readVInt();
size = in.readVInt();
topDocs = Lucene.readTopDocs(in);
assert topDocs != null;
searchHits = SearchHits.readSearchHits(in);
}

View File

@ -30,6 +30,8 @@ import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.concurrent.AtomicArray;
import org.elasticsearch.index.Index;
import org.elasticsearch.search.DocValueFormat;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.SearchPhaseResult;
import org.elasticsearch.search.SearchShardTarget;
import org.elasticsearch.search.aggregations.AggregationBuilders;
@ -38,8 +40,6 @@ import org.elasticsearch.search.aggregations.InternalAggregations;
import org.elasticsearch.search.aggregations.metrics.InternalMax;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.fetch.FetchSearchResult;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.internal.InternalSearchResponse;
import org.elasticsearch.search.query.QuerySearchResult;
import org.elasticsearch.search.suggest.Suggest;
@ -73,7 +73,7 @@ public class SearchPhaseControllerTests extends ESTestCase {
(b) -> new InternalAggregation.ReduceContext(BigArrays.NON_RECYCLING_INSTANCE, null, b));
}
public void testSort() throws Exception {
public void testSort() {
List<CompletionSuggestion> suggestions = new ArrayList<>();
for (int i = 0; i < randomIntBetween(1, 5); i++) {
suggestions.add(new CompletionSuggestion(randomAlphaOfLength(randomIntBetween(1, 5)), randomIntBetween(1, 20), false));
@ -88,7 +88,7 @@ public class SearchPhaseControllerTests extends ESTestCase {
size = first.get().queryResult().size();
}
int accumulatedLength = Math.min(queryResultSize, getTotalQueryHits(results));
ScoreDoc[] sortedDocs = searchPhaseController.sortDocs(true, results.asList(), null, new SearchPhaseController.TopDocsStats(),
ScoreDoc[] sortedDocs = SearchPhaseController.sortDocs(true, results.asList(), null, new SearchPhaseController.TopDocsStats(),
from, size)
.scoreDocs;
for (Suggest.Suggestion<?> suggestion : reducedSuggest(results)) {
@ -113,12 +113,12 @@ public class SearchPhaseControllerTests extends ESTestCase {
size = first.get().queryResult().size();
}
SearchPhaseController.TopDocsStats topDocsStats = new SearchPhaseController.TopDocsStats();
ScoreDoc[] sortedDocs = searchPhaseController.sortDocs(ignoreFrom, results.asList(), null, topDocsStats, from, size).scoreDocs;
ScoreDoc[] sortedDocs = SearchPhaseController.sortDocs(ignoreFrom, results.asList(), null, topDocsStats, from, size).scoreDocs;
results = generateSeededQueryResults(randomSeed, nShards, Collections.emptyList(), queryResultSize,
useConstantScore);
SearchPhaseController.TopDocsStats topDocsStats2 = new SearchPhaseController.TopDocsStats();
ScoreDoc[] sortedDocs2 = searchPhaseController.sortDocs(ignoreFrom, results.asList(), null, topDocsStats2, from, size).scoreDocs;
ScoreDoc[] sortedDocs2 = SearchPhaseController.sortDocs(ignoreFrom, results.asList(), null, topDocsStats2, from, size).scoreDocs;
assertEquals(sortedDocs.length, sortedDocs2.length);
for (int i = 0; i < sortedDocs.length; i++) {
assertEquals(sortedDocs[i].doc, sortedDocs2[i].doc);