diff --git a/core/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java b/core/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java index 642748bd031..f9103f0cddc 100644 --- a/core/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java +++ b/core/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java @@ -46,6 +46,7 @@ import org.elasticsearch.search.internal.InternalSearchResponse; import org.elasticsearch.search.internal.ShardSearchTransportRequest; import org.elasticsearch.search.query.QuerySearchResult; import org.elasticsearch.search.query.QuerySearchResultProvider; +import org.elasticsearch.search.suggest.Suggest; import org.elasticsearch.threadpool.ThreadPool; import java.util.List; @@ -74,7 +75,7 @@ abstract class AbstractSearchAsyncAction protected final AtomicArray firstResults; private volatile AtomicArray shardFailures; private final Object shardFailuresMutex = new Object(); - protected volatile ScoreDoc[] sortedShardList; + protected volatile ScoreDoc[] sortedShardDocs; protected AbstractSearchAsyncAction(ESLogger logger, SearchTransportService searchTransportService, ClusterService clusterService, IndexNameExpressionResolver indexNameExpressionResolver, @@ -321,8 +322,11 @@ abstract class AbstractSearchAsyncAction // we only release search context that we did not fetch from if we are not scrolling if (request.scroll() == null) { for (AtomicArray.Entry entry : queryResults.asList()) { - final TopDocs topDocs = entry.value.queryResult().queryResult().topDocs(); - if (topDocs != null && topDocs.scoreDocs.length > 0 // the shard had matches + QuerySearchResult queryResult = entry.value.queryResult().queryResult(); + final TopDocs topDocs = queryResult.topDocs(); + final Suggest suggest = queryResult.suggest(); + if (((topDocs != null && topDocs.scoreDocs.length > 0) // the shard had matches + ||suggest != null && suggest.hasScoreDocs()) // or had suggest docs && docIdsToLoad.get(entry.index) == null) { // but none of them made it to the global top docs try { DiscoveryNode node = nodes.get(entry.value.queryResult().shardTarget().nodeId()); @@ -343,12 +347,8 @@ abstract class AbstractSearchAsyncAction protected ShardFetchSearchRequest createFetchRequest(QuerySearchResult queryResult, AtomicArray.Entry entry, ScoreDoc[] lastEmittedDocPerShard) { - if (lastEmittedDocPerShard != null) { - ScoreDoc lastEmittedDoc = lastEmittedDocPerShard[entry.index]; - return new ShardFetchSearchRequest(request, queryResult.id(), entry.value, lastEmittedDoc); - } else { - return new ShardFetchSearchRequest(request, queryResult.id(), entry.value); - } + final ScoreDoc lastEmittedDoc = (lastEmittedDocPerShard != null) ? lastEmittedDocPerShard[entry.index] : null; + return new ShardFetchSearchRequest(request, queryResult.id(), entry.value, lastEmittedDoc); } protected abstract void sendExecuteFirstPhase(DiscoveryNode node, ShardSearchTransportRequest request, diff --git a/core/src/main/java/org/elasticsearch/action/search/SearchDfsQueryAndFetchAsyncAction.java b/core/src/main/java/org/elasticsearch/action/search/SearchDfsQueryAndFetchAsyncAction.java index e19540e26d5..8614d7b1188 100644 --- a/core/src/main/java/org/elasticsearch/action/search/SearchDfsQueryAndFetchAsyncAction.java +++ b/core/src/main/java/org/elasticsearch/action/search/SearchDfsQueryAndFetchAsyncAction.java @@ -118,8 +118,8 @@ class SearchDfsQueryAndFetchAsyncAction extends AbstractSearchAsyncAction(listener) { @Override public void doRun() throws IOException { - sortedShardList = searchPhaseController.sortDocs(true, queryFetchResults); - final InternalSearchResponse internalResponse = searchPhaseController.merge(sortedShardList, queryFetchResults, + sortedShardDocs = searchPhaseController.sortDocs(true, queryFetchResults); + final InternalSearchResponse internalResponse = searchPhaseController.merge(true, sortedShardDocs, queryFetchResults, queryFetchResults); String scrollId = null; if (request.scroll() != null) { diff --git a/core/src/main/java/org/elasticsearch/action/search/SearchDfsQueryThenFetchAsyncAction.java b/core/src/main/java/org/elasticsearch/action/search/SearchDfsQueryThenFetchAsyncAction.java index cf3f9716710..9d8305cf6b1 100644 --- a/core/src/main/java/org/elasticsearch/action/search/SearchDfsQueryThenFetchAsyncAction.java +++ b/core/src/main/java/org/elasticsearch/action/search/SearchDfsQueryThenFetchAsyncAction.java @@ -135,18 +135,17 @@ class SearchDfsQueryThenFetchAsyncAction extends AbstractSearchAsyncAction entry : docIdsToLoad.asList()) { QuerySearchResult queryResult = queryResults.get(entry.index); @@ -196,12 +195,10 @@ class SearchDfsQueryThenFetchAsyncAction extends AbstractSearchAsyncAction(listener) { @Override public void doRun() throws IOException { - final InternalSearchResponse internalResponse = searchPhaseController.merge(sortedShardList, queryResults, + final boolean isScrollRequest = request.scroll() != null; + final InternalSearchResponse internalResponse = searchPhaseController.merge(isScrollRequest, sortedShardDocs, queryResults, fetchResults); - String scrollId = null; - if (request.scroll() != null) { - scrollId = TransportSearchHelper.buildScrollId(request.searchType(), firstResults); - } + String scrollId = isScrollRequest ? TransportSearchHelper.buildScrollId(request.searchType(), firstResults) : null; listener.onResponse(new SearchResponse(internalResponse, scrollId, expectedSuccessfulOps, successfulOps.get(), buildTookInMillis(), buildShardFailures())); releaseIrrelevantSearchContexts(queryResults, docIdsToLoad); diff --git a/core/src/main/java/org/elasticsearch/action/search/SearchQueryAndFetchAsyncAction.java b/core/src/main/java/org/elasticsearch/action/search/SearchQueryAndFetchAsyncAction.java index 5d55dd468a5..fad4d60275d 100644 --- a/core/src/main/java/org/elasticsearch/action/search/SearchQueryAndFetchAsyncAction.java +++ b/core/src/main/java/org/elasticsearch/action/search/SearchQueryAndFetchAsyncAction.java @@ -60,14 +60,11 @@ class SearchQueryAndFetchAsyncAction extends AbstractSearchAsyncAction(listener) { @Override public void doRun() throws IOException { - boolean useScroll = request.scroll() != null; - sortedShardList = searchPhaseController.sortDocs(useScroll, firstResults); - final InternalSearchResponse internalResponse = searchPhaseController.merge(sortedShardList, firstResults, + final boolean isScrollRequest = request.scroll() != null; + sortedShardDocs = searchPhaseController.sortDocs(isScrollRequest, firstResults); + final InternalSearchResponse internalResponse = searchPhaseController.merge(isScrollRequest, sortedShardDocs, firstResults, firstResults); - String scrollId = null; - if (request.scroll() != null) { - scrollId = TransportSearchHelper.buildScrollId(request.searchType(), firstResults); - } + String scrollId = isScrollRequest ? TransportSearchHelper.buildScrollId(request.searchType(), firstResults) : null; listener.onResponse(new SearchResponse(internalResponse, scrollId, expectedSuccessfulOps, successfulOps.get(), buildTookInMillis(), buildShardFailures())); } diff --git a/core/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java b/core/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java index a6f9aa26f59..5f90d291dd2 100644 --- a/core/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java +++ b/core/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java @@ -68,18 +68,17 @@ class SearchQueryThenFetchAsyncAction extends AbstractSearchAsyncAction entry : docIdsToLoad.asList()) { QuerySearchResultProvider queryResult = firstResults.get(entry.index); @@ -129,12 +128,10 @@ class SearchQueryThenFetchAsyncAction extends AbstractSearchAsyncAction(listener) { @Override public void doRun() throws IOException { - final InternalSearchResponse internalResponse = searchPhaseController.merge(sortedShardList, firstResults, + final boolean isScrollRequest = request.scroll() != null; + final InternalSearchResponse internalResponse = searchPhaseController.merge(isScrollRequest, sortedShardDocs, firstResults, fetchResults); - String scrollId = null; - if (request.scroll() != null) { - scrollId = TransportSearchHelper.buildScrollId(request.searchType(), firstResults); - } + String scrollId = isScrollRequest ? TransportSearchHelper.buildScrollId(request.searchType(), firstResults) : null; listener.onResponse(new SearchResponse(internalResponse, scrollId, expectedSuccessfulOps, successfulOps.get(), buildTookInMillis(), buildShardFailures())); releaseIrrelevantSearchContexts(firstResults, docIdsToLoad); diff --git a/core/src/main/java/org/elasticsearch/action/search/SearchScrollQueryAndFetchAsyncAction.java b/core/src/main/java/org/elasticsearch/action/search/SearchScrollQueryAndFetchAsyncAction.java index 94ce1887c34..72154f224d2 100644 --- a/core/src/main/java/org/elasticsearch/action/search/SearchScrollQueryAndFetchAsyncAction.java +++ b/core/src/main/java/org/elasticsearch/action/search/SearchScrollQueryAndFetchAsyncAction.java @@ -168,8 +168,8 @@ class SearchScrollQueryAndFetchAsyncAction extends AbstractAsyncAction { } private void innerFinishHim() throws Exception { - ScoreDoc[] sortedShardList = searchPhaseController.sortDocs(true, queryFetchResults); - final InternalSearchResponse internalResponse = searchPhaseController.merge(sortedShardList, queryFetchResults, + ScoreDoc[] sortedShardDocs = searchPhaseController.sortDocs(true, queryFetchResults); + final InternalSearchResponse internalResponse = searchPhaseController.merge(true, sortedShardDocs, queryFetchResults, queryFetchResults); String scrollId = null; if (request.scroll() != null) { diff --git a/core/src/main/java/org/elasticsearch/action/search/SearchScrollQueryThenFetchAsyncAction.java b/core/src/main/java/org/elasticsearch/action/search/SearchScrollQueryThenFetchAsyncAction.java index ac8715eeb9f..d9f649a7a55 100644 --- a/core/src/main/java/org/elasticsearch/action/search/SearchScrollQueryThenFetchAsyncAction.java +++ b/core/src/main/java/org/elasticsearch/action/search/SearchScrollQueryThenFetchAsyncAction.java @@ -53,7 +53,7 @@ class SearchScrollQueryThenFetchAsyncAction extends AbstractAsyncAction { private volatile AtomicArray shardFailures; final AtomicArray queryResults; final AtomicArray fetchResults; - private volatile ScoreDoc[] sortedShardList; + private volatile ScoreDoc[] sortedShardDocs; private final AtomicInteger successfulOps; SearchScrollQueryThenFetchAsyncAction(ESLogger logger, ClusterService clusterService, @@ -165,9 +165,9 @@ class SearchScrollQueryThenFetchAsyncAction extends AbstractAsyncAction { } private void executeFetchPhase() throws Exception { - sortedShardList = searchPhaseController.sortDocs(true, queryResults); + sortedShardDocs = searchPhaseController.sortDocs(true, queryResults); AtomicArray docIdsToLoad = new AtomicArray<>(queryResults.length()); - searchPhaseController.fillDocIdsToLoad(docIdsToLoad, sortedShardList); + searchPhaseController.fillDocIdsToLoad(docIdsToLoad, sortedShardDocs); if (docIdsToLoad.asList().isEmpty()) { finishHim(); @@ -175,7 +175,8 @@ class SearchScrollQueryThenFetchAsyncAction extends AbstractAsyncAction { } - final ScoreDoc[] lastEmittedDocPerShard = searchPhaseController.getLastEmittedDocPerShard(sortedShardList, queryResults.length()); + final ScoreDoc[] lastEmittedDocPerShard = searchPhaseController.getLastEmittedDocPerShard(queryResults.asList(), + sortedShardDocs, queryResults.length()); final AtomicInteger counter = new AtomicInteger(docIdsToLoad.asList().size()); for (final AtomicArray.Entry entry : docIdsToLoad.asList()) { IntArrayList docIds = entry.value; @@ -216,7 +217,7 @@ class SearchScrollQueryThenFetchAsyncAction extends AbstractAsyncAction { } private void innerFinishHim() { - InternalSearchResponse internalResponse = searchPhaseController.merge(sortedShardList, queryResults, fetchResults); + InternalSearchResponse internalResponse = searchPhaseController.merge(true, sortedShardDocs, queryResults, fetchResults); String scrollId = null; if (request.scroll() != null) { scrollId = request.scrollId(); diff --git a/core/src/main/java/org/elasticsearch/search/SearchService.java b/core/src/main/java/org/elasticsearch/search/SearchService.java index bfcfcb9d4c8..4d618eb057a 100644 --- a/core/src/main/java/org/elasticsearch/search/SearchService.java +++ b/core/src/main/java/org/elasticsearch/search/SearchService.java @@ -21,6 +21,7 @@ package org.elasticsearch.search; import com.carrotsearch.hppc.ObjectFloatHashMap; import org.apache.lucene.search.FieldDoc; +import org.apache.lucene.search.ScoreDoc; import org.apache.lucene.search.TopDocs; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ExceptionsHelper; @@ -87,6 +88,8 @@ import org.elasticsearch.search.rescore.RescoreBuilder; import org.elasticsearch.search.searchafter.SearchAfterBuilder; import org.elasticsearch.search.sort.SortAndFormats; import org.elasticsearch.search.sort.SortBuilder; +import org.elasticsearch.search.suggest.Suggest; +import org.elasticsearch.search.suggest.completion.CompletionSuggestion; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool.Cancellable; import org.elasticsearch.threadpool.ThreadPool.Names; @@ -94,6 +97,7 @@ import org.elasticsearch.threadpool.ThreadPool.Names; import java.io.IOException; import java.util.Collections; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.Optional; import java.util.concurrent.ExecutionException; @@ -265,7 +269,7 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv loadOrExecuteQueryPhase(request, context); - if (context.queryResult().topDocs().scoreDocs.length == 0 && context.scrollContext() == null) { + if (hasHits(context.queryResult()) == false && context.scrollContext() == null) { freeContext(context.id()); } else { contextProcessedSuccessfully(context); @@ -320,7 +324,7 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv operationListener.onPreQueryPhase(context); long time = System.nanoTime(); queryPhase.execute(context); - if (context.queryResult().topDocs().scoreDocs.length == 0 && context.scrollContext() == null) { + if (hasHits(context.queryResult()) == false && context.scrollContext() == null) { // no hits, we can release the context since there will be no fetch phase freeContext(context.id()); } else { @@ -811,40 +815,55 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv } } - private static final int[] EMPTY_DOC_IDS = new int[0]; - /** * Shortcut ids to load, we load only "from" and up to "size". The phase controller * handles this as well since the result is always size * shards for Q_A_F */ private void shortcutDocIdsToLoad(SearchContext context) { + final int[] docIdsToLoad; + int docsOffset = 0; + final Suggest suggest = context.queryResult().suggest(); + int numSuggestDocs = 0; + final List completionSuggestions; + if (suggest != null && suggest.hasScoreDocs()) { + completionSuggestions = suggest.filter(CompletionSuggestion.class); + for (CompletionSuggestion completionSuggestion : completionSuggestions) { + numSuggestDocs += completionSuggestion.getOptions().size(); + } + } else { + completionSuggestions = Collections.emptyList(); + } if (context.request().scroll() != null) { TopDocs topDocs = context.queryResult().topDocs(); - int[] docIdsToLoad = new int[topDocs.scoreDocs.length]; + docIdsToLoad = new int[topDocs.scoreDocs.length + numSuggestDocs]; for (int i = 0; i < topDocs.scoreDocs.length; i++) { - docIdsToLoad[i] = topDocs.scoreDocs[i].doc; + docIdsToLoad[docsOffset++] = topDocs.scoreDocs[i].doc; } - context.docIdsToLoad(docIdsToLoad, 0, docIdsToLoad.length); } else { TopDocs topDocs = context.queryResult().topDocs(); if (topDocs.scoreDocs.length < context.from()) { // no more docs... - context.docIdsToLoad(EMPTY_DOC_IDS, 0, 0); - return; - } - int totalSize = context.from() + context.size(); - int[] docIdsToLoad = new int[Math.min(topDocs.scoreDocs.length - context.from(), context.size())]; - int counter = 0; - for (int i = context.from(); i < totalSize; i++) { - if (i < topDocs.scoreDocs.length) { - docIdsToLoad[counter] = topDocs.scoreDocs[i].doc; - } else { - break; + docIdsToLoad = new int[numSuggestDocs]; + } else { + int totalSize = context.from() + context.size(); + docIdsToLoad = new int[Math.min(topDocs.scoreDocs.length - context.from(), context.size()) + + numSuggestDocs]; + for (int i = context.from(); i < Math.min(totalSize, topDocs.scoreDocs.length); i++) { + docIdsToLoad[docsOffset++] = topDocs.scoreDocs[i].doc; } - counter++; } - context.docIdsToLoad(docIdsToLoad, 0, counter); } + for (CompletionSuggestion completionSuggestion : completionSuggestions) { + for (CompletionSuggestion.Entry.Option option : completionSuggestion.getOptions()) { + docIdsToLoad[docsOffset++] = option.getDoc().doc; + } + } + context.docIdsToLoad(docIdsToLoad, 0, docIdsToLoad.length); + } + + private static boolean hasHits(final QuerySearchResult searchResult) { + return searchResult.topDocs().scoreDocs.length > 0 || + (searchResult.suggest() != null && searchResult.suggest().hasScoreDocs()); } private void processScroll(InternalScrollSearchRequest request, SearchContext context) { diff --git a/core/src/main/java/org/elasticsearch/search/controller/SearchPhaseController.java b/core/src/main/java/org/elasticsearch/search/controller/SearchPhaseController.java index b2ce044e4fc..97f3b191aa9 100644 --- a/core/src/main/java/org/elasticsearch/search/controller/SearchPhaseController.java +++ b/core/src/main/java/org/elasticsearch/search/controller/SearchPhaseController.java @@ -30,7 +30,6 @@ import org.apache.lucene.search.SortField; import org.apache.lucene.search.TermStatistics; import org.apache.lucene.search.TopDocs; import org.apache.lucene.search.TopFieldDocs; -import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.collect.HppcMaps; import org.elasticsearch.common.component.AbstractComponent; @@ -53,18 +52,22 @@ import org.elasticsearch.search.internal.InternalSearchHits; import org.elasticsearch.search.internal.InternalSearchResponse; import org.elasticsearch.search.profile.ProfileShardResult; import org.elasticsearch.search.profile.SearchProfileShardResults; -import org.elasticsearch.search.profile.query.QueryProfileShardResult; import org.elasticsearch.search.query.QuerySearchResult; import org.elasticsearch.search.query.QuerySearchResultProvider; import org.elasticsearch.search.suggest.Suggest; +import org.elasticsearch.search.suggest.Suggest.Suggestion; +import org.elasticsearch.search.suggest.Suggest.Suggestion.Entry; +import org.elasticsearch.search.suggest.completion.CompletionSuggestion; import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.Comparator; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.TreeMap; import java.util.stream.Collectors; import java.util.stream.StreamSupport; @@ -154,6 +157,10 @@ public class SearchPhaseController extends AbstractComponent { } /** + * Returns a score doc array of top N search docs across all shards, followed by top suggest docs for each + * named completion suggestion across all shards. If more than one named completion suggestion is specified in the + * request, the suggest docs for a named suggestion are ordered by the suggestion name. + * * @param ignoreFrom Whether to ignore the from and sort all hits in each shard result. * Enabled only for scroll search, because that only retrieves hits of length 'size' in the query phase. * @param resultsArr Shard result holder @@ -191,19 +198,40 @@ public class SearchPhaseController extends AbstractComponent { offset = 0; } ScoreDoc[] scoreDocs = result.topDocs().scoreDocs; + ScoreDoc[] docs; + int numSuggestDocs = 0; + final Suggest suggest = result.queryResult().suggest(); + final List completionSuggestions; + if (suggest != null) { + completionSuggestions = suggest.filter(CompletionSuggestion.class); + for (CompletionSuggestion suggestion : completionSuggestions) { + numSuggestDocs += suggestion.getOptions().size(); + } + } else { + completionSuggestions = Collections.emptyList(); + } + int docsOffset = 0; if (scoreDocs.length == 0 || scoreDocs.length < offset) { - return EMPTY_DOCS; + docs = new ScoreDoc[numSuggestDocs]; + } else { + int resultDocsSize = result.size(); + if ((scoreDocs.length - offset) < resultDocsSize) { + resultDocsSize = scoreDocs.length - offset; + } + docs = new ScoreDoc[resultDocsSize + numSuggestDocs]; + for (int i = 0; i < resultDocsSize; i++) { + ScoreDoc scoreDoc = scoreDocs[offset + i]; + scoreDoc.shardIndex = shardIndex; + docs[i] = scoreDoc; + docsOffset++; + } } - - int resultDocsSize = result.size(); - if ((scoreDocs.length - offset) < resultDocsSize) { - resultDocsSize = scoreDocs.length - offset; - } - ScoreDoc[] docs = new ScoreDoc[resultDocsSize]; - for (int i = 0; i < resultDocsSize; i++) { - ScoreDoc scoreDoc = scoreDocs[offset + i]; - scoreDoc.shardIndex = shardIndex; - docs[i] = scoreDoc; + for (CompletionSuggestion suggestion: completionSuggestions) { + for (CompletionSuggestion.Entry.Option option : suggestion.getOptions()) { + ScoreDoc doc = option.getDoc(); + doc.shardIndex = shardIndex; + docs[docsOffset++] = doc; + } } return docs; } @@ -213,13 +241,7 @@ public class SearchPhaseController extends AbstractComponent { Arrays.sort(sortedResults, QUERY_RESULT_ORDERING); QuerySearchResultProvider firstResult = sortedResults[0].value; - int topN = firstResult.queryResult().size(); - if (firstResult.includeFetch()) { - // if we did both query and fetch on the same go, we have fetched all the docs from each shards already, use them... - // this is also important since we shortcut and fetch only docs from "from" and up to "size" - topN *= sortedResults.length; - } - + int topN = topN(results); int from = firstResult.queryResult().from(); if (ignoreFrom) { from = 0; @@ -258,40 +280,86 @@ public class SearchPhaseController extends AbstractComponent { } mergedTopDocs = TopDocs.merge(from, topN, shardTopDocs); } - return mergedTopDocs.scoreDocs; - } - public ScoreDoc[] getLastEmittedDocPerShard(SearchRequest request, ScoreDoc[] sortedShardList, int numShards) { - if (request.scroll() != null) { - return getLastEmittedDocPerShard(sortedShardList, numShards); - } else { - return null; + ScoreDoc[] scoreDocs = mergedTopDocs.scoreDocs; + final Map>> groupedCompletionSuggestions = new HashMap<>(); + // group suggestions and assign shard index + for (AtomicArray.Entry sortedResult : sortedResults) { + Suggest shardSuggest = sortedResult.value.queryResult().suggest(); + if (shardSuggest != null) { + for (CompletionSuggestion suggestion : shardSuggest.filter(CompletionSuggestion.class)) { + suggestion.setShardIndex(sortedResult.index); + List> suggestions = + groupedCompletionSuggestions.computeIfAbsent(suggestion.getName(), s -> new ArrayList<>()); + suggestions.add(suggestion); + } + } } + if (groupedCompletionSuggestions.isEmpty() == false) { + int numSuggestDocs = 0; + List>> completionSuggestions = + new ArrayList<>(groupedCompletionSuggestions.size()); + for (List> groupedSuggestions : groupedCompletionSuggestions.values()) { + final CompletionSuggestion completionSuggestion = CompletionSuggestion.reduceTo(groupedSuggestions); + assert completionSuggestion != null; + numSuggestDocs += completionSuggestion.getOptions().size(); + completionSuggestions.add(completionSuggestion); + } + scoreDocs = new ScoreDoc[mergedTopDocs.scoreDocs.length + numSuggestDocs]; + System.arraycopy(mergedTopDocs.scoreDocs, 0, scoreDocs, 0, mergedTopDocs.scoreDocs.length); + int offset = mergedTopDocs.scoreDocs.length; + Suggest suggestions = new Suggest(completionSuggestions); + for (CompletionSuggestion completionSuggestion : suggestions.filter(CompletionSuggestion.class)) { + for (CompletionSuggestion.Entry.Option option : completionSuggestion.getOptions()) { + scoreDocs[offset++] = option.getDoc(); + } + } + } + return scoreDocs; } - public ScoreDoc[] getLastEmittedDocPerShard(ScoreDoc[] sortedShardList, int numShards) { + public ScoreDoc[] getLastEmittedDocPerShard(List> queryResults, + ScoreDoc[] sortedScoreDocs, int numShards) { ScoreDoc[] lastEmittedDocPerShard = new ScoreDoc[numShards]; - for (ScoreDoc scoreDoc : sortedShardList) { - lastEmittedDocPerShard[scoreDoc.shardIndex] = scoreDoc; + if (queryResults.isEmpty() == false) { + long fetchHits = 0; + for (AtomicArray.Entry queryResult : queryResults) { + fetchHits += queryResult.value.queryResult().topDocs().scoreDocs.length; + } + // from is always zero as when we use scroll, we ignore from + long size = Math.min(fetchHits, topN(queryResults)); + for (int sortedDocsIndex = 0; sortedDocsIndex < size; sortedDocsIndex++) { + ScoreDoc scoreDoc = sortedScoreDocs[sortedDocsIndex]; + lastEmittedDocPerShard[scoreDoc.shardIndex] = scoreDoc; + } } return lastEmittedDocPerShard; + } /** * Builds an array, with potential null elements, with docs to load. */ - public void fillDocIdsToLoad(AtomicArray docsIdsToLoad, ScoreDoc[] shardDocs) { + public void fillDocIdsToLoad(AtomicArray docIdsToLoad, ScoreDoc[] shardDocs) { for (ScoreDoc shardDoc : shardDocs) { - IntArrayList list = docsIdsToLoad.get(shardDoc.shardIndex); - if (list == null) { - list = new IntArrayList(); // can't be shared!, uses unsafe on it later on - docsIdsToLoad.set(shardDoc.shardIndex, list); + IntArrayList shardDocIdsToLoad = docIdsToLoad.get(shardDoc.shardIndex); + if (shardDocIdsToLoad == null) { + shardDocIdsToLoad = new IntArrayList(); // can't be shared!, uses unsafe on it later on + docIdsToLoad.set(shardDoc.shardIndex, shardDocIdsToLoad); } - list.add(shardDoc.doc); + shardDocIdsToLoad.add(shardDoc.doc); } } - public InternalSearchResponse merge(ScoreDoc[] sortedDocs, AtomicArray queryResultsArr, + /** + * Enriches search hits and completion suggestion hits from sortedDocs using fetchResultsArr, + * merges suggestions, aggregations and profile results + * + * Expects sortedDocs to have top search docs across all shards, optionally followed by top suggest docs for each named + * completion suggestion ordered by suggestion name + */ + public InternalSearchResponse merge(boolean ignoreFrom, ScoreDoc[] sortedDocs, + AtomicArray queryResultsArr, AtomicArray fetchResultsArr) { List> queryResults = queryResultsArr.asList(); @@ -317,6 +385,7 @@ public class SearchPhaseController extends AbstractComponent { // count the total (we use the query result provider here, since we might not get any hits (we scrolled past them)) long totalHits = 0; + long fetchHits = 0; float maxScore = Float.NEGATIVE_INFINITY; boolean timedOut = false; Boolean terminatedEarly = null; @@ -333,6 +402,7 @@ public class SearchPhaseController extends AbstractComponent { } } totalHits += result.topDocs().totalHits; + fetchHits += result.topDocs().scoreDocs.length; if (!Float.isNaN(result.topDocs().getMaxScore())) { maxScore = Math.max(maxScore, result.topDocs().getMaxScore()); } @@ -345,11 +415,13 @@ public class SearchPhaseController extends AbstractComponent { for (AtomicArray.Entry entry : fetchResults) { entry.value.fetchResult().initCounter(); } - + int from = ignoreFrom ? 0 : firstResult.queryResult().from(); + int numSearchHits = (int) Math.min(fetchHits - from, topN(queryResults)); // merge hits List hits = new ArrayList<>(); if (!fetchResults.isEmpty()) { - for (ScoreDoc shardDoc : sortedDocs) { + for (int i = 0; i < numSearchHits; i++) { + ScoreDoc shardDoc = sortedDocs[i]; FetchSearchResultProvider fetchResultProvider = fetchResultsArr.get(shardDoc.shardIndex); if (fetchResultProvider == null) { continue; @@ -360,7 +432,6 @@ public class SearchPhaseController extends AbstractComponent { InternalSearchHit searchHit = fetchResult.hits().internalHits()[index]; searchHit.score(shardDoc.score); searchHit.shard(fetchResult.shardTarget()); - if (sorted) { FieldDoc fieldDoc = (FieldDoc) shardDoc; searchHit.sortValues(fieldDoc.fields, firstResult.sortValueFormats()); @@ -368,7 +439,6 @@ public class SearchPhaseController extends AbstractComponent { searchHit.score(((Number) fieldDoc.fields[sortScoreIndex]).floatValue()); } } - hits.add(searchHit); } } @@ -376,38 +446,72 @@ public class SearchPhaseController extends AbstractComponent { // merge suggest results Suggest suggest = null; - if (!queryResults.isEmpty()) { - final Map> groupedSuggestions = new HashMap<>(); - boolean hasSuggestions = false; - for (AtomicArray.Entry entry : queryResults) { - Suggest shardResult = entry.value.queryResult().queryResult().suggest(); - - if (shardResult == null) { - continue; + if (firstResult.suggest() != null) { + final Map> groupedSuggestions = new HashMap<>(); + for (AtomicArray.Entry queryResult : queryResults) { + Suggest shardSuggest = queryResult.value.queryResult().suggest(); + if (shardSuggest != null) { + for (Suggestion> suggestion : shardSuggest) { + List suggestionList = groupedSuggestions.computeIfAbsent(suggestion.getName(), s -> new ArrayList<>()); + suggestionList.add(suggestion); + } + } + } + if (groupedSuggestions.isEmpty() == false) { + suggest = new Suggest(Suggest.reduce(groupedSuggestions)); + if (!fetchResults.isEmpty()) { + int currentOffset = numSearchHits; + for (CompletionSuggestion suggestion : suggest.filter(CompletionSuggestion.class)) { + final List suggestionOptions = suggestion.getOptions(); + for (int scoreDocIndex = currentOffset; scoreDocIndex < currentOffset + suggestionOptions.size(); scoreDocIndex++) { + ScoreDoc shardDoc = sortedDocs[scoreDocIndex]; + FetchSearchResultProvider fetchSearchResultProvider = fetchResultsArr.get(shardDoc.shardIndex); + if (fetchSearchResultProvider == null) { + continue; + } + FetchSearchResult fetchResult = fetchSearchResultProvider.fetchResult(); + int fetchResultIndex = fetchResult.counterGetAndIncrement(); + if (fetchResultIndex < fetchResult.hits().internalHits().length) { + InternalSearchHit hit = fetchResult.hits().internalHits()[fetchResultIndex]; + CompletionSuggestion.Entry.Option suggestOption = + suggestionOptions.get(scoreDocIndex - currentOffset); + hit.score(shardDoc.score); + hit.shard(fetchResult.shardTarget()); + suggestOption.setHit(hit); + } + } + currentOffset += suggestionOptions.size(); + } + assert currentOffset == sortedDocs.length : "expected no more score doc slices"; } - hasSuggestions = true; - Suggest.group(groupedSuggestions, shardResult); } - - suggest = hasSuggestions ? new Suggest(Suggest.reduce(groupedSuggestions)) : null; } - // merge addAggregation + // merge Aggregation InternalAggregations aggregations = null; - if (!queryResults.isEmpty()) { - if (firstResult.aggregations() != null && firstResult.aggregations().asList() != null) { - List aggregationsList = new ArrayList<>(queryResults.size()); - for (AtomicArray.Entry entry : queryResults) { - aggregationsList.add((InternalAggregations) entry.value.queryResult().aggregations()); + if (firstResult.aggregations() != null && firstResult.aggregations().asList() != null) { + List aggregationsList = new ArrayList<>(queryResults.size()); + for (AtomicArray.Entry entry : queryResults) { + aggregationsList.add((InternalAggregations) entry.value.queryResult().aggregations()); + } + ReduceContext reduceContext = new ReduceContext(bigArrays, scriptService, clusterService.state()); + aggregations = InternalAggregations.reduce(aggregationsList, reduceContext); + List pipelineAggregators = firstResult.pipelineAggregators(); + if (pipelineAggregators != null) { + List newAggs = StreamSupport.stream(aggregations.spliterator(), false) + .map((p) -> (InternalAggregation) p) + .collect(Collectors.toList()); + for (SiblingPipelineAggregator pipelineAggregator : pipelineAggregators) { + InternalAggregation newAgg = pipelineAggregator.doReduce(new InternalAggregations(newAggs), reduceContext); + newAggs.add(newAgg); } - ReduceContext reduceContext = new ReduceContext(bigArrays, scriptService, clusterService.state()); - aggregations = InternalAggregations.reduce(aggregationsList, reduceContext); + aggregations = new InternalAggregations(newAggs); } } //Collect profile results SearchProfileShardResults shardResults = null; - if (!queryResults.isEmpty() && firstResult.profileResults() != null) { + if (firstResult.profileResults() != null) { Map profileResults = new HashMap<>(queryResults.size()); for (AtomicArray.Entry entry : queryResults) { String key = entry.value.queryResult().shardTarget().toString(); @@ -416,24 +520,22 @@ public class SearchPhaseController extends AbstractComponent { shardResults = new SearchProfileShardResults(profileResults); } - if (aggregations != null) { - List pipelineAggregators = firstResult.pipelineAggregators(); - if (pipelineAggregators != null) { - List newAggs = StreamSupport.stream(aggregations.spliterator(), false).map((p) -> { - return (InternalAggregation) p; - }).collect(Collectors.toList()); - for (SiblingPipelineAggregator pipelineAggregator : pipelineAggregators) { - ReduceContext reduceContext = new ReduceContext(bigArrays, scriptService, clusterService.state()); - InternalAggregation newAgg = pipelineAggregator.doReduce(new InternalAggregations(newAggs), reduceContext); - newAggs.add(newAgg); - } - aggregations = new InternalAggregations(newAggs); - } - } - InternalSearchHits searchHits = new InternalSearchHits(hits.toArray(new InternalSearchHit[hits.size()]), totalHits, maxScore); return new InternalSearchResponse(searchHits, aggregations, suggest, shardResults, timedOut, terminatedEarly); } + /** + * returns the number of top results to be considered across all shards + */ + private static int topN(List> queryResults) { + QuerySearchResultProvider firstResult = queryResults.get(0).value; + int topN = firstResult.queryResult().size(); + if (firstResult.includeFetch()) { + // if we did both query and fetch on the same go, we have fetched all the docs from each shards already, use them... + // this is also important since we shortcut and fetch only docs from "from" and up to "size" + topN *= queryResults.size(); + } + return topN; + } } diff --git a/core/src/main/java/org/elasticsearch/search/fetch/ShardFetchSearchRequest.java b/core/src/main/java/org/elasticsearch/search/fetch/ShardFetchSearchRequest.java index d908aca0fc8..f6738f99725 100644 --- a/core/src/main/java/org/elasticsearch/search/fetch/ShardFetchSearchRequest.java +++ b/core/src/main/java/org/elasticsearch/search/fetch/ShardFetchSearchRequest.java @@ -39,10 +39,7 @@ public class ShardFetchSearchRequest extends ShardFetchRequest implements Indice private OriginalIndices originalIndices; public ShardFetchSearchRequest() { - } - public ShardFetchSearchRequest(SearchRequest request, long id, IntArrayList list) { - this(request, id, list, null); } public ShardFetchSearchRequest(SearchRequest request, long id, IntArrayList list, ScoreDoc lastEmittedDoc) { diff --git a/core/src/main/java/org/elasticsearch/search/fetch/matchedqueries/MatchedQueriesFetchSubPhase.java b/core/src/main/java/org/elasticsearch/search/fetch/matchedqueries/MatchedQueriesFetchSubPhase.java index 59225f93a61..17f5e5ac705 100644 --- a/core/src/main/java/org/elasticsearch/search/fetch/matchedqueries/MatchedQueriesFetchSubPhase.java +++ b/core/src/main/java/org/elasticsearch/search/fetch/matchedqueries/MatchedQueriesFetchSubPhase.java @@ -43,7 +43,9 @@ public final class MatchedQueriesFetchSubPhase implements FetchSubPhase { @Override public void hitsExecute(SearchContext context, InternalSearchHit[] hits) { - if (hits.length == 0) { + if (hits.length == 0 || + // in case the request has only suggest, parsed query is null + context.parsedQuery() == null) { return; } hits = hits.clone(); // don't modify the incoming hits diff --git a/core/src/main/java/org/elasticsearch/search/internal/InternalSearchHit.java b/core/src/main/java/org/elasticsearch/search/internal/InternalSearchHit.java index 191537b4de5..e1d46dd5fd2 100644 --- a/core/src/main/java/org/elasticsearch/search/internal/InternalSearchHit.java +++ b/core/src/main/java/org/elasticsearch/search/internal/InternalSearchHit.java @@ -415,8 +415,8 @@ public class InternalSearchHit implements SearchHit { static final String INNER_HITS = "inner_hits"; } - @Override - public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + // public because we render hit as part of completion suggestion option + public XContentBuilder toInnerXContent(XContentBuilder builder, Params params) throws IOException { List metaFields = new ArrayList<>(); List otherFields = new ArrayList<>(); if (fields != null && !fields.isEmpty()) { @@ -432,7 +432,6 @@ public class InternalSearchHit implements SearchHit { } } - builder.startObject(); // For inner_hit hits shard is null and that is ok, because the parent search hit has all this information. // Even if this was included in the inner_hit hits this would be the same, so better leave it out. if (explanation() != null && shard != null) { @@ -516,7 +515,6 @@ public class InternalSearchHit implements SearchHit { } builder.endObject(); } - builder.endObject(); return builder; } @@ -533,6 +531,15 @@ public class InternalSearchHit implements SearchHit { builder.endArray(); } builder.endObject(); + + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(); + toInnerXContent(builder, params); + builder.endObject(); + return builder; } public static InternalSearchHit readSearchHit(StreamInput in, InternalSearchHits.StreamContext context) throws IOException { diff --git a/core/src/main/java/org/elasticsearch/search/suggest/Suggest.java b/core/src/main/java/org/elasticsearch/search/suggest/Suggest.java index f8fbdaf969e..95612693f8b 100644 --- a/core/src/main/java/org/elasticsearch/search/suggest/Suggest.java +++ b/core/src/main/java/org/elasticsearch/search/suggest/Suggest.java @@ -40,6 +40,7 @@ import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.stream.Collectors; /** * Top level suggest result, containing the result for each suggestion. @@ -48,18 +49,16 @@ public class Suggest implements Iterable COMPARATOR = new Comparator() { - @Override - public int compare(Option first, Option second) { - int cmp = Float.compare(second.getScore(), first.getScore()); - if (cmp != 0) { - return cmp; - } - return first.getText().compareTo(second.getText()); - } - }; + public static final Comparator