Simplify sorted top docs merging in SearchPhaseController (#23881)
Today we have several code paths for merging top docs, chosen based on the number of search results returned from the shards. If only a single shard holds any hits we take a separate code path with considerable complexity, while for more than one shard the code is essentially duplicated to save the creation of a dense array of top docs, which can be large if there are many results. This commit removes the need for the dense array and, in turn, the justification for the optimization, and introduces a single code path to merge top docs.
commit adccdbb3cf
parent 75b4f408e0
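To illustrate the change described above, here is a minimal, self-contained sketch (not the committed code; the class, method, and parameter names are illustrative) of merging a sparse collection of per-shard top docs in a single code path. It assumes the Lucene 6.x-era TopDocs.merge(int start, int topN, TopDocs[] shardHits, boolean setShardIndex) overload that this commit itself uses.

    import java.util.List;

    import org.apache.lucene.search.ScoreDoc;
    import org.apache.lucene.search.TopDocs;

    class SparseTopDocsMergeSketch {
        // Merge only the shard results that actually contain hits. Shards without
        // hits are never added to the list, so no dense numShards-sized array has
        // to be materialized. Each result is stamped with its shard index first,
        // so the merge itself is told not to assign indices.
        static ScoreDoc[] merge(List<TopDocs> perShardHits, List<Integer> shardIndices, int from, int topN) {
            if (perShardHits.isEmpty()) {
                return new ScoreDoc[0]; // nothing to merge
            }
            for (int i = 0; i < perShardHits.size(); i++) {
                for (ScoreDoc doc : perShardHits.get(i).scoreDocs) {
                    doc.shardIndex = shardIndices.get(i); // record the originating shard
                }
            }
            if (perShardHits.size() == 1 && from == 0) {
                // single shard and no pagination: its docs are already sorted
                return perShardHits.get(0).scoreDocs;
            }
            TopDocs merged = TopDocs.merge(from, topN, perShardHits.toArray(new TopDocs[0]), false);
            return merged.scoreDocs;
        }
    }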
@@ -98,7 +98,7 @@ final class FetchSearchPhase extends SearchPhase {
         final int numShards = context.getNumShards();
         final boolean isScrollSearch = context.getRequest().scroll() != null;
         List<SearchPhaseResult> phaseResults = queryResults.asList();
-        ScoreDoc[] sortedShardDocs = searchPhaseController.sortDocs(isScrollSearch, phaseResults, context.getNumShards());
+        ScoreDoc[] sortedShardDocs = searchPhaseController.sortDocs(isScrollSearch, phaseResults);
         String scrollId = isScrollSearch ? TransportSearchHelper.buildScrollId(queryResults) : null;
         final SearchPhaseController.ReducedQueryPhase reducedQueryPhase = resultConsumer.reduce();
         final boolean queryAndFetchOptimization = queryResults.length() == 1;
@@ -33,7 +33,6 @@ import org.apache.lucene.search.TopFieldDocs;
 import org.apache.lucene.search.grouping.CollapseTopFieldDocs;
-import org.elasticsearch.common.collect.HppcMaps;
 import org.elasticsearch.common.component.AbstractComponent;
 import org.elasticsearch.common.lucene.Lucene;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.util.BigArrays;
 import org.elasticsearch.script.ScriptService;
@@ -147,156 +146,109 @@ public final class SearchPhaseController extends AbstractComponent {
      *
      * @param ignoreFrom Whether to ignore the from and sort all hits in each shard result.
      *                   Enabled only for scroll search, because that only retrieves hits of length 'size' in the query phase.
-     * @param results Shard result holder
+     * @param results the search phase results to obtain the sort docs from
      */
-    public ScoreDoc[] sortDocs(boolean ignoreFrom, Collection<? extends SearchPhaseResult> results, int numShards) throws IOException {
+    public ScoreDoc[] sortDocs(boolean ignoreFrom, Collection<? extends SearchPhaseResult> results) throws IOException {
         if (results.isEmpty()) {
             return EMPTY_DOCS;
         }
-        final QuerySearchResult result;
-        boolean canOptimize = false;
-        int shardIndex = -1;
-        if (results.size() == 1) {
-            canOptimize = true;
-            result = results.stream().findFirst().get().queryResult();
-            shardIndex = result.getShardIndex();
-        } else {
-            boolean hasResult = false;
-            QuerySearchResult resultToOptimize = null;
-            // lets see if we only got hits from a single shard, if so, we can optimize...
-            for (SearchPhaseResult entry : results) {
-                if (entry.queryResult().hasHits()) {
-                    if (hasResult) { // we already have one, can't really optimize
-                        canOptimize = false;
-                        break;
-                    }
-                    canOptimize = true;
-                    hasResult = true;
-                    resultToOptimize = entry.queryResult();
-                    shardIndex = resultToOptimize.getShardIndex();
-                }
-            }
-            result = canOptimize ? resultToOptimize : results.stream().findFirst().get().queryResult();
-            assert result != null;
-        }
-        if (canOptimize) {
-            int offset = result.from();
-            if (ignoreFrom) {
-                offset = 0;
-            }
-            ScoreDoc[] scoreDocs = result.topDocs().scoreDocs;
-            ScoreDoc[] docs;
-            int numSuggestDocs = 0;
-            final Suggest suggest = result.queryResult().suggest();
-            final List<CompletionSuggestion> completionSuggestions;
-            if (suggest != null) {
-                completionSuggestions = suggest.filter(CompletionSuggestion.class);
-                for (CompletionSuggestion suggestion : completionSuggestions) {
-                    numSuggestDocs += suggestion.getOptions().size();
-                }
-            } else {
-                completionSuggestions = Collections.emptyList();
-            }
-            int docsOffset = 0;
-            if (scoreDocs.length == 0 || scoreDocs.length < offset) {
-                docs = new ScoreDoc[numSuggestDocs];
-            } else {
-                int resultDocsSize = result.size();
-                if ((scoreDocs.length - offset) < resultDocsSize) {
-                    resultDocsSize = scoreDocs.length - offset;
-                }
-                docs = new ScoreDoc[resultDocsSize + numSuggestDocs];
-                for (int i = 0; i < resultDocsSize; i++) {
-                    ScoreDoc scoreDoc = scoreDocs[offset + i];
-                    scoreDoc.shardIndex = shardIndex;
-                    docs[i] = scoreDoc;
-                    docsOffset++;
-                }
-            }
-            for (CompletionSuggestion suggestion : completionSuggestions) {
-                for (CompletionSuggestion.Entry.Option option : suggestion.getOptions()) {
-                    ScoreDoc doc = option.getDoc();
-                    doc.shardIndex = shardIndex;
-                    docs[docsOffset++] = doc;
-                }
-            }
-            return docs;
-        }
-
-        final int topN = result.queryResult().size();
-        final int from = ignoreFrom ? 0 : result.queryResult().from();
-
-        final TopDocs mergedTopDocs;
-        if (result.queryResult().topDocs() instanceof CollapseTopFieldDocs) {
-            CollapseTopFieldDocs firstTopDocs = (CollapseTopFieldDocs) result.queryResult().topDocs();
-            final Sort sort = new Sort(firstTopDocs.fields);
-            final CollapseTopFieldDocs[] shardTopDocs = new CollapseTopFieldDocs[numShards];
-            fillTopDocs(shardTopDocs, results, new CollapseTopFieldDocs(firstTopDocs.field, 0, new FieldDoc[0],
-                sort.getSort(), new Object[0], Float.NaN));
-            mergedTopDocs = CollapseTopFieldDocs.merge(sort, from, topN, shardTopDocs, true);
-        } else if (result.queryResult().topDocs() instanceof TopFieldDocs) {
-            TopFieldDocs firstTopDocs = (TopFieldDocs) result.queryResult().topDocs();
-            final Sort sort = new Sort(firstTopDocs.fields);
-            final TopFieldDocs[] shardTopDocs = new TopFieldDocs[numShards];
-            fillTopDocs(shardTopDocs, results, new TopFieldDocs(0, new FieldDoc[0], sort.getSort(), Float.NaN));
-            mergedTopDocs = TopDocs.merge(sort, from, topN, shardTopDocs, true);
-        } else {
-            final TopDocs[] shardTopDocs = new TopDocs[numShards];
-            fillTopDocs(shardTopDocs, results, Lucene.EMPTY_TOP_DOCS);
-            mergedTopDocs = TopDocs.merge(from, topN, shardTopDocs, true);
-        }
-
-        ScoreDoc[] scoreDocs = mergedTopDocs.scoreDocs;
+        final Collection<TopDocs> topDocs = new ArrayList<>();
         final Map<String, List<Suggestion<CompletionSuggestion.Entry>>> groupedCompletionSuggestions = new HashMap<>();
-        // group suggestions and assign shard index
+        int from = -1;
+        int size = -1;
         for (SearchPhaseResult sortedResult : results) {
-            Suggest shardSuggest = sortedResult.queryResult().suggest();
-            if (shardSuggest != null) {
-                for (CompletionSuggestion suggestion : shardSuggest.filter(CompletionSuggestion.class)) {
-                    suggestion.setShardIndex(sortedResult.getShardIndex());
-                    List<Suggestion<CompletionSuggestion.Entry>> suggestions =
-                        groupedCompletionSuggestions.computeIfAbsent(suggestion.getName(), s -> new ArrayList<>());
-                    suggestions.add(suggestion);
+            /* We loop over all results once, group the completion suggestions if there are any, and collect the relevant
+             * top docs. Each top docs gets its shard index set up front to simplify merging down the road; this allowed
+             * removing the single-shard optimization here, since we no longer materialize a dense array of top docs but
+             * instead only pass relevant results / top docs to the merge method. */
+            QuerySearchResult queryResult = sortedResult.queryResult();
+            if (queryResult.hasHits()) {
+                from = queryResult.from();
+                size = queryResult.size();
+                TopDocs td = queryResult.topDocs();
+                if (td != null && td.scoreDocs.length > 0) {
+                    setShardIndex(td, queryResult.getShardIndex());
+                    topDocs.add(td);
+                }
+                Suggest shardSuggest = queryResult.suggest();
+                if (shardSuggest != null) {
+                    for (CompletionSuggestion suggestion : shardSuggest.filter(CompletionSuggestion.class)) {
+                        suggestion.setShardIndex(sortedResult.getShardIndex());
+                        List<Suggestion<CompletionSuggestion.Entry>> suggestions =
+                            groupedCompletionSuggestions.computeIfAbsent(suggestion.getName(), s -> new ArrayList<>());
+                        suggestions.add(suggestion);
+                    }
                 }
             }
         }
-        if (groupedCompletionSuggestions.isEmpty() == false) {
-            int numSuggestDocs = 0;
-            List<Suggestion<? extends Entry<? extends Entry.Option>>> completionSuggestions =
-                new ArrayList<>(groupedCompletionSuggestions.size());
-            for (List<Suggestion<CompletionSuggestion.Entry>> groupedSuggestions : groupedCompletionSuggestions.values()) {
-                final CompletionSuggestion completionSuggestion = CompletionSuggestion.reduceTo(groupedSuggestions);
-                assert completionSuggestion != null;
-                numSuggestDocs += completionSuggestion.getOptions().size();
-                completionSuggestions.add(completionSuggestion);
-            }
-            scoreDocs = new ScoreDoc[mergedTopDocs.scoreDocs.length + numSuggestDocs];
-            System.arraycopy(mergedTopDocs.scoreDocs, 0, scoreDocs, 0, mergedTopDocs.scoreDocs.length);
-            int offset = mergedTopDocs.scoreDocs.length;
-            Suggest suggestions = new Suggest(completionSuggestions);
-            for (CompletionSuggestion completionSuggestion : suggestions.filter(CompletionSuggestion.class)) {
-                for (CompletionSuggestion.Entry.Option option : completionSuggestion.getOptions()) {
-                    scoreDocs[offset++] = option.getDoc();
+        if (size != -1) {
+            final ScoreDoc[] mergedScoreDocs = mergeTopDocs(topDocs, size, ignoreFrom ? 0 : from);
+            ScoreDoc[] scoreDocs = mergedScoreDocs;
+            if (groupedCompletionSuggestions.isEmpty() == false) {
+                int numSuggestDocs = 0;
+                List<Suggestion<? extends Entry<? extends Entry.Option>>> completionSuggestions =
+                    new ArrayList<>(groupedCompletionSuggestions.size());
+                for (List<Suggestion<CompletionSuggestion.Entry>> groupedSuggestions : groupedCompletionSuggestions.values()) {
+                    final CompletionSuggestion completionSuggestion = CompletionSuggestion.reduceTo(groupedSuggestions);
+                    assert completionSuggestion != null;
+                    numSuggestDocs += completionSuggestion.getOptions().size();
+                    completionSuggestions.add(completionSuggestion);
+                }
+                scoreDocs = new ScoreDoc[mergedScoreDocs.length + numSuggestDocs];
+                System.arraycopy(mergedScoreDocs, 0, scoreDocs, 0, mergedScoreDocs.length);
+                int offset = mergedScoreDocs.length;
+                Suggest suggestions = new Suggest(completionSuggestions);
+                for (CompletionSuggestion completionSuggestion : suggestions.filter(CompletionSuggestion.class)) {
+                    for (CompletionSuggestion.Entry.Option option : completionSuggestion.getOptions()) {
+                        scoreDocs[offset++] = option.getDoc();
+                    }
                 }
             }
+            return scoreDocs;
+        } else {
+            // no relevant docs - just return an empty array
+            return EMPTY_DOCS;
         }
-        return scoreDocs;
     }
 
-    static <T extends TopDocs> void fillTopDocs(T[] shardTopDocs,
-                                                Collection<? extends SearchPhaseResult> results, T emptyTopDocs) {
-        if (results.size() != shardTopDocs.length) {
-            // TopDocs#merge can't deal with null shard TopDocs
-            Arrays.fill(shardTopDocs, emptyTopDocs);
-        }
-        for (SearchPhaseResult resultProvider : results) {
-            final T topDocs = (T) resultProvider.queryResult().topDocs();
-            assert topDocs != null : "top docs must not be null in a valid result";
-            // the 'index' field is the position in the resultsArr atomic array
-            shardTopDocs[resultProvider.getShardIndex()] = topDocs;
-        }
-    }
+    private ScoreDoc[] mergeTopDocs(Collection<TopDocs> results, int topN, int from) {
+        if (results.isEmpty()) {
+            return EMPTY_DOCS;
+        }
+        final boolean setShardIndex = false;
+        final TopDocs topDocs = results.stream().findFirst().get();
+        final TopDocs mergedTopDocs;
+        final int numShards = results.size();
+        if (numShards == 1 && from == 0) { // only one shard and no pagination: we can just return the top docs as we got them
+            return topDocs.scoreDocs;
+        } else if (topDocs instanceof CollapseTopFieldDocs) {
+            CollapseTopFieldDocs firstTopDocs = (CollapseTopFieldDocs) topDocs;
+            final Sort sort = new Sort(firstTopDocs.fields);
+            final CollapseTopFieldDocs[] shardTopDocs = results.toArray(new CollapseTopFieldDocs[numShards]);
+            mergedTopDocs = CollapseTopFieldDocs.merge(sort, from, topN, shardTopDocs, setShardIndex);
+        } else if (topDocs instanceof TopFieldDocs) {
+            TopFieldDocs firstTopDocs = (TopFieldDocs) topDocs;
+            final Sort sort = new Sort(firstTopDocs.fields);
+            final TopFieldDocs[] shardTopDocs = results.toArray(new TopFieldDocs[numShards]);
+            mergedTopDocs = TopDocs.merge(sort, from, topN, shardTopDocs, setShardIndex);
+        } else {
+            final TopDocs[] shardTopDocs = results.toArray(new TopDocs[numShards]);
+            mergedTopDocs = TopDocs.merge(from, topN, shardTopDocs, setShardIndex);
+        }
+        return mergedTopDocs.scoreDocs;
+    }
+
+    private static void setShardIndex(TopDocs topDocs, int shardIndex) {
+        for (ScoreDoc doc : topDocs.scoreDocs) {
+            if (doc.shardIndex != -1) {
+                // once a single shard index is initialized, all others will be initialized too
+                // (there are many asserts down in Lucene land that this is actually true) so we can shortcut here
+                return;
+            }
+            doc.shardIndex = shardIndex;
+        }
+    }
 
     public ScoreDoc[] getLastEmittedDocPerShard(ReducedQueryPhase reducedQueryPhase,
                                                 ScoreDoc[] sortedScoreDocs, int numShards) {
         ScoreDoc[] lastEmittedDocPerShard = new ScoreDoc[numShards];
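As a hypothetical end-to-end demonstration of the new flow above (again assuming the Lucene 6.x-era API this commit targets; the shard data and class name are made up): only shards with hits contribute a TopDocs, every hit is stamped with its shard index up front, and a single merge call produces the globally sorted docs.

    import org.apache.lucene.search.ScoreDoc;
    import org.apache.lucene.search.TopDocs;

    public class SortDocsDemo {
        public static void main(String[] args) {
            // shard 0 returned two hits and shard 2 one hit; shard 1 had no hits
            // and therefore contributes nothing - no dense three-slot array needed
            TopDocs shard0 = new TopDocs(2,
                new ScoreDoc[] { new ScoreDoc(10, 3.0f), new ScoreDoc(12, 1.0f) }, 3.0f);
            TopDocs shard2 = new TopDocs(1,
                new ScoreDoc[] { new ScoreDoc(7, 2.0f) }, 2.0f);
            for (ScoreDoc d : shard0.scoreDocs) d.shardIndex = 0;
            for (ScoreDoc d : shard2.scoreDocs) d.shardIndex = 2;
            // merge the two sparse results; shard indices were assigned above,
            // hence setShardIndex=false
            TopDocs merged = TopDocs.merge(0, 3, new TopDocs[] { shard0, shard2 }, false);
            for (ScoreDoc d : merged.scoreDocs) {
                // prints, in descending score order: doc=10 shard=0, doc=7 shard=2, doc=12 shard=0
                System.out.println("doc=" + d.doc + " score=" + d.score + " shard=" + d.shardIndex);
            }
        }
    }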
@@ -173,7 +173,7 @@ final class SearchScrollQueryAndFetchAsyncAction extends AbstractAsyncAction {
 
     private void innerFinishHim() throws Exception {
         List<QueryFetchSearchResult> queryFetchSearchResults = queryFetchResults.asList();
-        ScoreDoc[] sortedShardDocs = searchPhaseController.sortDocs(true, queryFetchResults.asList(), queryFetchResults.length());
+        ScoreDoc[] sortedShardDocs = searchPhaseController.sortDocs(true, queryFetchResults.asList());
         final InternalSearchResponse internalResponse = searchPhaseController.merge(true, sortedShardDocs,
             searchPhaseController.reducedQueryPhase(queryFetchSearchResults), queryFetchSearchResults, queryFetchResults::get);
         String scrollId = null;
@@ -171,7 +171,7 @@ final class SearchScrollQueryThenFetchAsyncAction extends AbstractAsyncAction {
     }
 
     private void executeFetchPhase() throws Exception {
-        sortedShardDocs = searchPhaseController.sortDocs(true, queryResults.asList(), queryResults.length());
+        sortedShardDocs = searchPhaseController.sortDocs(true, queryResults.asList());
         if (sortedShardDocs.length == 0) {
             finishHim(searchPhaseController.reducedQueryPhase(queryResults.asList()));
             return;
@@ -75,7 +75,7 @@ public class SearchPhaseControllerTests extends ESTestCase {
         int nShards = randomIntBetween(1, 20);
         int queryResultSize = randomBoolean() ? 0 : randomIntBetween(1, nShards * 2);
         AtomicArray<SearchPhaseResult> results = generateQueryResults(nShards, suggestions, queryResultSize, false);
-        ScoreDoc[] sortedDocs = searchPhaseController.sortDocs(true, results.asList(), nShards);
+        ScoreDoc[] sortedDocs = searchPhaseController.sortDocs(true, results.asList());
         int accumulatedLength = Math.min(queryResultSize, getTotalQueryHits(results));
         for (Suggest.Suggestion<?> suggestion : reducedSuggest(results)) {
             int suggestionSize = suggestion.getEntries().get(0).getOptions().size();
@@ -90,9 +90,9 @@ public class SearchPhaseControllerTests extends ESTestCase {
         AtomicArray<SearchPhaseResult> results = generateQueryResults(nShards, Collections.emptyList(), queryResultSize,
             randomBoolean() || true);
         boolean ignoreFrom = randomBoolean();
-        ScoreDoc[] sortedDocs = searchPhaseController.sortDocs(ignoreFrom, results.asList(), nShards);
+        ScoreDoc[] sortedDocs = searchPhaseController.sortDocs(ignoreFrom, results.asList());
 
-        ScoreDoc[] sortedDocs2 = searchPhaseController.sortDocs(ignoreFrom, results.asList(), nShards);
+        ScoreDoc[] sortedDocs2 = searchPhaseController.sortDocs(ignoreFrom, results.asList());
         assertArrayEquals(sortedDocs, sortedDocs2);
     }
 
@@ -354,31 +354,4 @@ public class SearchPhaseControllerTests extends ESTestCase {
             }
         }
     }
-
-    public void testFillTopDocs() {
-        final int maxIters = randomIntBetween(5, 15);
-        for (int iters = 0; iters < maxIters; iters++) {
-            TopDocs[] topDocs = new TopDocs[randomIntBetween(2, 100)];
-            int numShards = topDocs.length;
-            AtomicArray<SearchPhaseResult> resultProviderAtomicArray = generateQueryResults(numShards, Collections.emptyList(),
-                2, randomBoolean());
-            if (randomBoolean()) {
-                int maxNull = randomIntBetween(1, topDocs.length - 1);
-                for (int i = 0; i < maxNull; i++) {
-                    resultProviderAtomicArray.set(randomIntBetween(0, resultProviderAtomicArray.length() - 1), null);
-                }
-            }
-            SearchPhaseController.fillTopDocs(topDocs, resultProviderAtomicArray.asList(), Lucene.EMPTY_TOP_DOCS);
-            for (int i = 0; i < topDocs.length; i++) {
-                assertNotNull(topDocs[i]);
-                if (topDocs[i] == Lucene.EMPTY_TOP_DOCS) {
-                    assertNull(resultProviderAtomicArray.get(i));
-                } else {
-                    assertNotNull(resultProviderAtomicArray.get(i));
-                    assertNotNull(resultProviderAtomicArray.get(i).queryResult());
-                    assertSame(resultProviderAtomicArray.get(i).queryResult().topDocs(), topDocs[i]);
-                }
-            }
-        }
-    }
 }