From c09773a76ec015cc87027db6098f0c571fb5c710 Mon Sep 17 00:00:00 2001
From: Luca Cavanna
Date: Tue, 26 Feb 2019 11:41:22 +0100
Subject: [PATCH] Completion suggestions to be reduced once instead of twice (#39255)

We have been calling `reduce` against completion suggestions twice: once in
`SearchPhaseController#reducedQueryPhase`, where all suggestions get reduced,
and once more in `SearchPhaseController#sortDocs`, where we add the top
completion suggestions to the `TopDocs` so that their docs can be fetched.
There is no need to reduce twice. All suggestions can be reduced in a single
call, after which we can filter the result and pass only the already reduced
completion suggestions on to `sortDocs`.

One small but important detail is that `shardIndex`, which is currently used
only to fetch suggestion hits, needs to be set before the reduction, hence
outside of `sortDocs` where we have been setting it until now (see the
illustrative sketches appended after the diff).
---
 .../action/search/SearchPhaseController.java  |  46 +++---
 .../elasticsearch/search/suggest/Suggest.java |   2 +-
 .../completion/CompletionSuggestion.java      |  14 +-
 .../search/SearchPhaseControllerTests.java    | 135 ++++++++++++++++--
 .../completion/CompletionSuggestionTests.java |   6 +-
 5 files changed, 150 insertions(+), 53 deletions(-)

diff --git a/server/src/main/java/org/elasticsearch/action/search/SearchPhaseController.java b/server/src/main/java/org/elasticsearch/action/search/SearchPhaseController.java index d7387ac69bd..d370f60d98c 100644 --- a/server/src/main/java/org/elasticsearch/action/search/SearchPhaseController.java +++ b/server/src/main/java/org/elasticsearch/action/search/SearchPhaseController.java @@ -54,7 +54,6 @@ import org.elasticsearch.search.profile.SearchProfileShardResults; import org.elasticsearch.search.query.QuerySearchResult; import org.elasticsearch.search.suggest.Suggest; import org.elasticsearch.search.suggest.Suggest.Suggestion; -import org.elasticsearch.search.suggest.Suggest.Suggestion.Entry; import org.elasticsearch.search.suggest.completion.CompletionSuggestion; import java.util.ArrayList; @@ -155,12 +154,12 @@ public final class SearchPhaseController { * @param size the number of hits to return from the merged top docs */ static SortedTopDocs sortDocs(boolean ignoreFrom, Collection results, - final Collection bufferedTopDocs, final TopDocsStats topDocsStats, int from, int size) { + final Collection bufferedTopDocs, final TopDocsStats topDocsStats, int from, int size, + List reducedCompletionSuggestions) { if (results.isEmpty()) { return SortedTopDocs.EMPTY; } final Collection topDocs = bufferedTopDocs == null ? new ArrayList<>() : bufferedTopDocs; - final Map>> groupedCompletionSuggestions = new HashMap<>(); for (SearchPhaseResult sortedResult : results) { // TODO we can move this loop into the reduce call to only loop over this once /* We loop over all results once, group together the completion suggestions if there are any and collect relevant * top docs results.
Each top docs gets it's shard index set on all top docs to simplify top docs merging down the road @@ -177,36 +176,22 @@ public final class SearchPhaseController { topDocs.add(td.topDocs); } } - if (queryResult.hasSuggestHits()) { - Suggest shardSuggest = queryResult.suggest(); - for (CompletionSuggestion suggestion : shardSuggest.filter(CompletionSuggestion.class)) { - suggestion.setShardIndex(sortedResult.getShardIndex()); - List> suggestions = - groupedCompletionSuggestions.computeIfAbsent(suggestion.getName(), s -> new ArrayList<>()); - suggestions.add(suggestion); - } - } } - final boolean hasHits = (groupedCompletionSuggestions.isEmpty() && topDocs.isEmpty()) == false; + final boolean hasHits = (reducedCompletionSuggestions.isEmpty() && topDocs.isEmpty()) == false; if (hasHits) { final TopDocs mergedTopDocs = mergeTopDocs(topDocs, size, ignoreFrom ? 0 : from); final ScoreDoc[] mergedScoreDocs = mergedTopDocs == null ? EMPTY_DOCS : mergedTopDocs.scoreDocs; ScoreDoc[] scoreDocs = mergedScoreDocs; - if (groupedCompletionSuggestions.isEmpty() == false) { + if (reducedCompletionSuggestions.isEmpty() == false) { int numSuggestDocs = 0; - List>> completionSuggestions = - new ArrayList<>(groupedCompletionSuggestions.size()); - for (List> groupedSuggestions : groupedCompletionSuggestions.values()) { - final CompletionSuggestion completionSuggestion = CompletionSuggestion.reduceTo(groupedSuggestions); + for (CompletionSuggestion completionSuggestion : reducedCompletionSuggestions) { assert completionSuggestion != null; numSuggestDocs += completionSuggestion.getOptions().size(); - completionSuggestions.add(completionSuggestion); } scoreDocs = new ScoreDoc[mergedScoreDocs.length + numSuggestDocs]; System.arraycopy(mergedScoreDocs, 0, scoreDocs, 0, mergedScoreDocs.length); int offset = mergedScoreDocs.length; - Suggest suggestions = new Suggest(completionSuggestions); - for (CompletionSuggestion completionSuggestion : suggestions.filter(CompletionSuggestion.class)) { + for (CompletionSuggestion completionSuggestion : reducedCompletionSuggestions) { for (CompletionSuggestion.Entry.Option option : completionSuggestion.getOptions()) { scoreDocs[offset++] = option.getDoc(); } @@ -479,6 +464,10 @@ public final class SearchPhaseController { for (Suggestion> suggestion : result.suggest()) { List suggestionList = groupedSuggestions.computeIfAbsent(suggestion.getName(), s -> new ArrayList<>()); suggestionList.add(suggestion); + if (suggestion instanceof CompletionSuggestion) { + CompletionSuggestion completionSuggestion = (CompletionSuggestion) suggestion; + completionSuggestion.setShardIndex(result.getShardIndex()); + } } } if (consumeAggs) { @@ -489,15 +478,24 @@ public final class SearchPhaseController { profileResults.put(key, result.consumeProfileResult()); } } - final Suggest suggest = groupedSuggestions.isEmpty() ? null : new Suggest(Suggest.reduce(groupedSuggestions)); + final Suggest reducedSuggest; + final List reducedCompletionSuggestions; + if (groupedSuggestions.isEmpty()) { + reducedSuggest = null; + reducedCompletionSuggestions = Collections.emptyList(); + } else { + reducedSuggest = new Suggest(Suggest.reduce(groupedSuggestions)); + reducedCompletionSuggestions = reducedSuggest.filter(CompletionSuggestion.class); + } ReduceContext reduceContext = reduceContextFunction.apply(performFinalReduce); final InternalAggregations aggregations = aggregationsList.isEmpty() ? 
null : reduceAggs(aggregationsList, firstResult.pipelineAggregators(), reduceContext); final SearchProfileShardResults shardResults = profileResults.isEmpty() ? null : new SearchProfileShardResults(profileResults); - final SortedTopDocs sortedTopDocs = sortDocs(isScrollRequest, queryResults, bufferedTopDocs, topDocsStats, from, size); + final SortedTopDocs sortedTopDocs = sortDocs(isScrollRequest, queryResults, bufferedTopDocs, topDocsStats, from, size, + reducedCompletionSuggestions); final TotalHits totalHits = topDocsStats.getTotalHits(); return new ReducedQueryPhase(totalHits, topDocsStats.fetchHits, topDocsStats.getMaxScore(), - topDocsStats.timedOut, topDocsStats.terminatedEarly, suggest, aggregations, shardResults, sortedTopDocs, + topDocsStats.timedOut, topDocsStats.terminatedEarly, reducedSuggest, aggregations, shardResults, sortedTopDocs, firstResult.sortValueFormats(), numReducePhases, size, from, false); } diff --git a/server/src/main/java/org/elasticsearch/search/suggest/Suggest.java b/server/src/main/java/org/elasticsearch/search/suggest/Suggest.java index 8d78116e15a..d17f00fdc95 100644 --- a/server/src/main/java/org/elasticsearch/search/suggest/Suggest.java +++ b/server/src/main/java/org/elasticsearch/search/suggest/Suggest.java @@ -202,7 +202,7 @@ public class Suggest implements Iterable>> reduce(Map> groupedSuggestions) { List>> reduced = new ArrayList<>(groupedSuggestions.size()); - for (java.util.Map.Entry> unmergedResults : groupedSuggestions.entrySet()) { + for (Map.Entry> unmergedResults : groupedSuggestions.entrySet()) { List value = unmergedResults.getValue(); Class suggestionClass = null; for (Suggestion suggestion : value) { diff --git a/server/src/main/java/org/elasticsearch/search/suggest/completion/CompletionSuggestion.java b/server/src/main/java/org/elasticsearch/search/suggest/completion/CompletionSuggestion.java index bc2e2abdb7d..8b80e3326a8 100644 --- a/server/src/main/java/org/elasticsearch/search/suggest/completion/CompletionSuggestion.java +++ b/server/src/main/java/org/elasticsearch/search/suggest/completion/CompletionSuggestion.java @@ -169,11 +169,8 @@ public final class CompletionSuggestion extends Suggest.SuggestiontoReduce - */ - public static CompletionSuggestion reduceTo(List> toReduce) { + @Override + public CompletionSuggestion reduce(List> toReduce) { if (toReduce.isEmpty()) { return null; } else { @@ -209,7 +206,7 @@ public final class CompletionSuggestion extends Suggest.Suggestion= size) { break; @@ -223,11 +220,6 @@ public final class CompletionSuggestion extends Suggest.Suggestion reduce(List> toReduce) { - return reduceTo(toReduce); - } - public void setShardIndex(int shardIndex) { if (entries.isEmpty() == false) { for (Entry.Option option : getOptions()) { diff --git a/server/src/test/java/org/elasticsearch/action/search/SearchPhaseControllerTests.java b/server/src/test/java/org/elasticsearch/action/search/SearchPhaseControllerTests.java index 9107b75db17..1d6b6cd30ed 100644 --- a/server/src/test/java/org/elasticsearch/action/search/SearchPhaseControllerTests.java +++ b/server/src/test/java/org/elasticsearch/action/search/SearchPhaseControllerTests.java @@ -51,12 +51,16 @@ import org.elasticsearch.search.fetch.FetchSearchResult; import org.elasticsearch.search.internal.InternalSearchResponse; import org.elasticsearch.search.internal.SearchContext; import org.elasticsearch.search.query.QuerySearchResult; +import org.elasticsearch.search.suggest.SortBy; import org.elasticsearch.search.suggest.Suggest; import 
org.elasticsearch.search.suggest.completion.CompletionSuggestion; +import org.elasticsearch.search.suggest.phrase.PhraseSuggestion; +import org.elasticsearch.search.suggest.term.TermSuggestion; import org.elasticsearch.test.ESTestCase; import org.junit.Before; import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.List; @@ -87,7 +91,7 @@ public class SearchPhaseControllerTests extends ESTestCase { }); } - public void testSort() { + public void testSortDocs() { List suggestions = new ArrayList<>(); for (int i = 0; i < randomIntBetween(1, 5); i++) { suggestions.add(new CompletionSuggestion(randomAlphaOfLength(randomIntBetween(1, 5)), randomIntBetween(1, 20), false)); @@ -102,16 +106,18 @@ public class SearchPhaseControllerTests extends ESTestCase { size = first.get().queryResult().size(); } int accumulatedLength = Math.min(queryResultSize, getTotalQueryHits(results)); - ScoreDoc[] sortedDocs = SearchPhaseController.sortDocs(true, results.asList(), null, - new SearchPhaseController.TopDocsStats(SearchContext.TRACK_TOTAL_HITS_ACCURATE), from, size).scoreDocs; - for (Suggest.Suggestion suggestion : reducedSuggest(results)) { + List reducedCompletionSuggestions = reducedSuggest(results); + for (Suggest.Suggestion suggestion : reducedCompletionSuggestions) { int suggestionSize = suggestion.getEntries().get(0).getOptions().size(); accumulatedLength += suggestionSize; } + ScoreDoc[] sortedDocs = SearchPhaseController.sortDocs(true, results.asList(), null, + new SearchPhaseController.TopDocsStats(SearchContext.TRACK_TOTAL_HITS_ACCURATE), from, size, + reducedCompletionSuggestions).scoreDocs; assertThat(sortedDocs.length, equalTo(accumulatedLength)); } - public void testSortIsIdempotent() throws Exception { + public void testSortDocsIsIdempotent() throws Exception { int nShards = randomIntBetween(1, 20); int queryResultSize = randomBoolean() ? 
0 : randomIntBetween(1, nShards * 2); long randomSeed = randomLong(); @@ -126,12 +132,14 @@ public class SearchPhaseControllerTests extends ESTestCase { size = first.get().queryResult().size(); } SearchPhaseController.TopDocsStats topDocsStats = new SearchPhaseController.TopDocsStats(SearchContext.TRACK_TOTAL_HITS_ACCURATE); - ScoreDoc[] sortedDocs = SearchPhaseController.sortDocs(ignoreFrom, results.asList(), null, topDocsStats, from, size).scoreDocs; + ScoreDoc[] sortedDocs = SearchPhaseController.sortDocs(ignoreFrom, results.asList(), null, topDocsStats, from, size, + Collections.emptyList()).scoreDocs; results = generateSeededQueryResults(randomSeed, nShards, Collections.emptyList(), queryResultSize, useConstantScore); SearchPhaseController.TopDocsStats topDocsStats2 = new SearchPhaseController.TopDocsStats(SearchContext.TRACK_TOTAL_HITS_ACCURATE); - ScoreDoc[] sortedDocs2 = SearchPhaseController.sortDocs(ignoreFrom, results.asList(), null, topDocsStats2, from, size).scoreDocs; + ScoreDoc[] sortedDocs2 = SearchPhaseController.sortDocs(ignoreFrom, results.asList(), null, topDocsStats2, from, size, + Collections.emptyList()).scoreDocs; assertEquals(sortedDocs.length, sortedDocs2.length); for (int i = 0; i < sortedDocs.length; i++) { assertEquals(sortedDocs[i].doc, sortedDocs2[i].doc); @@ -204,9 +212,16 @@ public class SearchPhaseControllerTests extends ESTestCase { } } - private static AtomicArray generateQueryResults(int nShards, - List suggestions, - int searchHitsSize, boolean useConstantScore) { + /** + * Generate random query results received from the provided number of shards, including the provided + * number of search hits and randomly generated completion suggestions based on the name and size of the provided ones. + * Note that shardIndex is already set to the generated completion suggestions to simulate what + * {@link SearchPhaseController#reducedQueryPhase(Collection, boolean, int, boolean)} does, meaning that the returned query results + * can be fed directly to + * {@link SearchPhaseController#sortDocs(boolean, Collection, Collection, SearchPhaseController.TopDocsStats, int, int, List)} + */ + private static AtomicArray generateQueryResults(int nShards, List suggestions, + int searchHitsSize, boolean useConstantScore) { AtomicArray queryResults = new AtomicArray<>(nShards); for (int shardIndex = 0; shardIndex < nShards; shardIndex++) { String clusterAlias = randomBoolean() ? 
null : "remote"; @@ -265,17 +280,17 @@ public class SearchPhaseControllerTests extends ESTestCase { return resultCount; } - private static Suggest reducedSuggest(AtomicArray results) { + private static List reducedSuggest(AtomicArray results) { Map>> groupedSuggestion = new HashMap<>(); for (SearchPhaseResult entry : results.asList()) { for (Suggest.Suggestion suggestion : entry.queryResult().suggest()) { List> suggests = groupedSuggestion.computeIfAbsent(suggestion.getName(), s -> new ArrayList<>()); - suggests.add((Suggest.Suggestion) suggestion); + suggests.add((CompletionSuggestion) suggestion); } } - return new Suggest(groupedSuggestion.values().stream().map(CompletionSuggestion::reduceTo) - .collect(Collectors.toList())); + CompletionSuggestion completionSuggestion = new CompletionSuggestion(null, -1, randomBoolean()); + return groupedSuggestion.values().stream().map(completionSuggestion::reduce).collect(Collectors.toList()); } private static AtomicArray generateFetchResults(int nShards, ScoreDoc[] mergedSearchDocs, Suggest mergedSuggest) { @@ -645,4 +660,96 @@ public class SearchPhaseControllerTests extends ESTestCase { assertEquals("field", reduce.sortedTopDocs.collapseField); assertArrayEquals(collapseValues, reduce.sortedTopDocs.collapseValues); } + + public void testConsumerSuggestions() { + int expectedNumResults = randomIntBetween(1, 100); + int bufferSize = randomIntBetween(2, 200); + SearchRequest request = randomSearchRequest(); + request.setBatchedReduceSize(bufferSize); + InitialSearchPhase.ArraySearchPhaseResults consumer = + searchPhaseController.newSearchPhaseResults(request, expectedNumResults); + int maxScoreTerm = -1; + int maxScorePhrase = -1; + int maxScoreCompletion = -1; + for (int i = 0; i < expectedNumResults; i++) { + QuerySearchResult result = new QuerySearchResult(i, new SearchShardTarget("node", new ShardId("a", "b", i), + null, OriginalIndices.NONE)); + List>> suggestions = + new ArrayList<>(); + { + TermSuggestion termSuggestion = new TermSuggestion("term", 1, SortBy.SCORE); + TermSuggestion.Entry entry = new TermSuggestion.Entry(new Text("entry"), 0, 10); + int numOptions = randomIntBetween(1, 10); + for (int j = 0; j < numOptions; j++) { + int score = numOptions - j; + maxScoreTerm = Math.max(maxScoreTerm, score); + entry.addOption(new TermSuggestion.Entry.Option(new Text("option"), randomInt(), score)); + } + termSuggestion.addTerm(entry); + suggestions.add(termSuggestion); + } + { + PhraseSuggestion phraseSuggestion = new PhraseSuggestion("phrase", 1); + PhraseSuggestion.Entry entry = new PhraseSuggestion.Entry(new Text("entry"), 0, 10); + int numOptions = randomIntBetween(1, 10); + for (int j = 0; j < numOptions; j++) { + int score = numOptions - j; + maxScorePhrase = Math.max(maxScorePhrase, score); + entry.addOption(new PhraseSuggestion.Entry.Option(new Text("option"), new Text("option"), score)); + } + phraseSuggestion.addTerm(entry); + suggestions.add(phraseSuggestion); + } + { + CompletionSuggestion completionSuggestion = new CompletionSuggestion("completion", 1, false); + CompletionSuggestion.Entry entry = new CompletionSuggestion.Entry(new Text("entry"), 0, 10); + int numOptions = randomIntBetween(1, 10); + for (int j = 0; j < numOptions; j++) { + int score = numOptions - j; + maxScoreCompletion = Math.max(maxScoreCompletion, score); + CompletionSuggestion.Entry.Option option = new CompletionSuggestion.Entry.Option(j, + new Text("option"), score, Collections.emptyMap()); + entry.addOption(option); + } + completionSuggestion.addTerm(entry); + 
suggestions.add(completionSuggestion); + } + result.suggest(new Suggest(suggestions)); + result.topDocs(new TopDocsAndMaxScore(Lucene.EMPTY_TOP_DOCS, Float.NaN), new DocValueFormat[0]); + result.setShardIndex(i); + result.size(0); + consumer.consumeResult(result); + } + SearchPhaseController.ReducedQueryPhase reduce = consumer.reduce(); + assertEquals(3, reduce.suggest.size()); + { + TermSuggestion term = reduce.suggest.getSuggestion("term"); + assertEquals(1, term.getEntries().size()); + assertEquals(1, term.getEntries().get(0).getOptions().size()); + assertEquals(maxScoreTerm, term.getEntries().get(0).getOptions().get(0).getScore(), 0f); + } + { + PhraseSuggestion phrase = reduce.suggest.getSuggestion("phrase"); + assertEquals(1, phrase.getEntries().size()); + assertEquals(1, phrase.getEntries().get(0).getOptions().size()); + assertEquals(maxScorePhrase, phrase.getEntries().get(0).getOptions().get(0).getScore(), 0f); + } + { + CompletionSuggestion completion = reduce.suggest.getSuggestion("completion"); + assertEquals(1, completion.getSize()); + assertEquals(1, completion.getOptions().size()); + CompletionSuggestion.Entry.Option option = completion.getOptions().get(0); + assertEquals(maxScoreCompletion, option.getScore(), 0f); + } + assertFinalReduction(request); + assertEquals(1, reduce.sortedTopDocs.scoreDocs.length); + assertEquals(maxScoreCompletion, reduce.sortedTopDocs.scoreDocs[0].score, 0f); + assertEquals(0, reduce.sortedTopDocs.scoreDocs[0].doc); + assertNotEquals(-1, reduce.sortedTopDocs.scoreDocs[0].shardIndex); + assertEquals(0, reduce.totalHits.value); + assertFalse(reduce.sortedTopDocs.isSortedByField); + assertNull(reduce.sortedTopDocs.sortFields); + assertNull(reduce.sortedTopDocs.collapseField); + assertNull(reduce.sortedTopDocs.collapseValues); + } } diff --git a/server/src/test/java/org/elasticsearch/search/suggest/completion/CompletionSuggestionTests.java b/server/src/test/java/org/elasticsearch/search/suggest/completion/CompletionSuggestionTests.java index 8f7b731f7ae..65f59b241e0 100644 --- a/server/src/test/java/org/elasticsearch/search/suggest/completion/CompletionSuggestionTests.java +++ b/server/src/test/java/org/elasticsearch/search/suggest/completion/CompletionSuggestionTests.java @@ -34,7 +34,7 @@ import static org.hamcrest.Matchers.lessThanOrEqualTo; public class CompletionSuggestionTests extends ESTestCase { - public void testToReduce() throws Exception { + public void testToReduce() { List> shardSuggestions = new ArrayList<>(); int nShards = randomIntBetween(1, 10); String name = randomAlphaOfLength(10); @@ -54,7 +54,7 @@ public class CompletionSuggestionTests extends ESTestCase { maxScore - i, Collections.emptyMap())); } } - CompletionSuggestion reducedSuggestion = CompletionSuggestion.reduceTo(shardSuggestions); + CompletionSuggestion reducedSuggestion = (CompletionSuggestion) shardSuggestions.get(0).reduce(shardSuggestions); assertNotNull(reducedSuggestion); assertThat(reducedSuggestion.getOptions().size(), lessThanOrEqualTo(size)); int count = 0; @@ -95,7 +95,7 @@ public class CompletionSuggestionTests extends ESTestCase { .distinct() .limit(size) .collect(Collectors.toList()); - CompletionSuggestion reducedSuggestion = CompletionSuggestion.reduceTo(shardSuggestions); + CompletionSuggestion reducedSuggestion = (CompletionSuggestion) shardSuggestions.get(0).reduce(shardSuggestions); assertNotNull(reducedSuggestion); assertThat(reducedSuggestion.getOptions().size(), lessThanOrEqualTo(size)); assertEquals(expected, reducedSuggestion.getOptions());
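
The following is a minimal illustrative sketch, not part of the patch itself, of the single-reduction flow that `SearchPhaseController#reducedQueryPhase` ends up with. It is condensed from the hunks above: generics are simplified, surrounding parameters and aggregation/profile handling are omitted, and local names such as `queryResults` and `groupedSuggestions` merely mirror the ones visible in the diff.

    // Sketch only (condensed from the patched reducedQueryPhase(), simplified types).
    // shardIndex is recorded on completion suggestions while grouping, i.e. before the
    // single reduce call, so the reduced options still know which shard to fetch from.
    Map<String, List<Suggestion>> groupedSuggestions = new HashMap<>();
    for (SearchPhaseResult result : queryResults) {
        for (Suggestion suggestion : result.queryResult().suggest()) {
            groupedSuggestions.computeIfAbsent(suggestion.getName(), s -> new ArrayList<>())
                .add(suggestion);
            if (suggestion instanceof CompletionSuggestion) {
                ((CompletionSuggestion) suggestion).setShardIndex(result.getShardIndex());
            }
        }
    }

    // Reduce every suggestion exactly once, then keep a filtered view of the completion
    // suggestions so that sortDocs() can consume them without reducing a second time.
    final Suggest reducedSuggest;
    final List<CompletionSuggestion> reducedCompletionSuggestions;
    if (groupedSuggestions.isEmpty()) {
        reducedSuggest = null;
        reducedCompletionSuggestions = Collections.emptyList();
    } else {
        reducedSuggest = new Suggest(Suggest.reduce(groupedSuggestions));
        reducedCompletionSuggestions = reducedSuggest.filter(CompletionSuggestion.class);
    }

    // The already reduced completion suggestions are handed to sortDocs() as a parameter.
    SortedTopDocs sortedTopDocs = sortDocs(isScrollRequest, queryResults, bufferedTopDocs,
        topDocsStats, from, size, reducedCompletionSuggestions);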
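
Similarly, here is a condensed view, again for illustration only, of what the patched `sortDocs` does with those suggestions: it merely appends their option docs after the merged top docs, instead of grouping and reducing them itself as before.

    // Sketch only (condensed from the patched sortDocs()). The reduced completion
    // suggestions contribute their option docs after the merged score docs; the
    // shardIndex that was set before the reduction is what later lets the fetch
    // phase resolve each suggestion option like any other hit.
    int numSuggestDocs = 0;
    for (CompletionSuggestion completionSuggestion : reducedCompletionSuggestions) {
        numSuggestDocs += completionSuggestion.getOptions().size();
    }
    ScoreDoc[] scoreDocs = new ScoreDoc[mergedScoreDocs.length + numSuggestDocs];
    System.arraycopy(mergedScoreDocs, 0, scoreDocs, 0, mergedScoreDocs.length);
    int offset = mergedScoreDocs.length;
    for (CompletionSuggestion completionSuggestion : reducedCompletionSuggestions) {
        for (CompletionSuggestion.Entry.Option option : completionSuggestion.getOptions()) {
            scoreDocs[offset++] = option.getDoc();
        }
    }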