Remove unnecessary result sorting in SearchPhaseController (#23321)

In oder to use lucene's utilities to merge top docs the results
need to be passed in a dense array where the index corresponds to the shard index in
the result list. Yet, we were sorting results before merging them just to order them
in the incoming order again for the above mentioned reason. This change removes the
obsolet sort and prevents unnecessary materializing of results.
This commit is contained in:
Simon Willnauer 2017-02-23 13:48:54 +01:00 committed by GitHub
parent 771fd1f4ea
commit 2f3f9b9961
2 changed files with 53 additions and 53 deletions

View File

@ -71,14 +71,6 @@ import java.util.stream.StreamSupport;
public class SearchPhaseController extends AbstractComponent {
private static final Comparator<AtomicArray.Entry<? extends QuerySearchResultProvider>> QUERY_RESULT_ORDERING = (o1, o2) -> {
int i = o1.value.shardTarget().getIndex().compareTo(o2.value.shardTarget().getIndex());
if (i == 0) {
i = o1.value.shardTarget().getShardId().id() - o2.value.shardTarget().getShardId().id();
}
return i;
};
private static final ScoreDoc[] EMPTY_DOCS = new ScoreDoc[0];
private final BigArrays bigArrays;
@ -150,6 +142,9 @@ public class SearchPhaseController extends AbstractComponent {
* named completion suggestion across all shards. If more than one named completion suggestion is specified in the
* request, the suggest docs for a named suggestion are ordered by the suggestion name.
*
* Note: The order of the sorted score docs depends on the shard index in the result array if the merge process needs to disambiguate
* the result. In oder to obtain stable results the shard index (index of the result in the result array) must be the same.
*
* @param ignoreFrom Whether to ignore the from and sort all hits in each shard result.
* Enabled only for scroll search, because that only retrieves hits of length 'size' in the query phase.
* @param resultsArr Shard result holder
@ -160,26 +155,31 @@ public class SearchPhaseController extends AbstractComponent {
return EMPTY_DOCS;
}
final QuerySearchResult result;
boolean canOptimize = false;
QuerySearchResult result = null;
int shardIndex = -1;
if (results.size() == 1) {
canOptimize = true;
result = results.get(0).value.queryResult();
shardIndex = results.get(0).index;
} else {
boolean hasResult = false;
QuerySearchResult resultToOptimize = null;
// lets see if we only got hits from a single shard, if so, we can optimize...
for (AtomicArray.Entry<? extends QuerySearchResultProvider> entry : results) {
if (entry.value.queryResult().hasHits()) {
if (result != null) { // we already have one, can't really optimize
if (hasResult) { // we already have one, can't really optimize
canOptimize = false;
break;
}
canOptimize = true;
result = entry.value.queryResult();
hasResult = true;
resultToOptimize = entry.value.queryResult();
shardIndex = entry.index;
}
}
result = canOptimize ? resultToOptimize : results.get(0).value.queryResult();
assert result != null;
}
if (canOptimize) {
int offset = result.from();
@ -225,74 +225,62 @@ public class SearchPhaseController extends AbstractComponent {
return docs;
}
@SuppressWarnings("unchecked")
AtomicArray.Entry<? extends QuerySearchResultProvider>[] sortedResults = results.toArray(new AtomicArray.Entry[results.size()]);
Arrays.sort(sortedResults, QUERY_RESULT_ORDERING);
QuerySearchResultProvider firstResult = sortedResults[0].value;
int topN = firstResult.queryResult().size();
int from = firstResult.queryResult().from();
if (ignoreFrom) {
from = 0;
}
final int topN = result.queryResult().size();
final int from = ignoreFrom ? 0 : result.queryResult().from();
final TopDocs mergedTopDocs;
int numShards = resultsArr.length();
if (firstResult.queryResult().topDocs() instanceof CollapseTopFieldDocs) {
CollapseTopFieldDocs firstTopDocs = (CollapseTopFieldDocs) firstResult.queryResult().topDocs();
final int numShards = resultsArr.length();
if (result.queryResult().topDocs() instanceof CollapseTopFieldDocs) {
CollapseTopFieldDocs firstTopDocs = (CollapseTopFieldDocs) result.queryResult().topDocs();
final Sort sort = new Sort(firstTopDocs.fields);
final CollapseTopFieldDocs[] shardTopDocs = new CollapseTopFieldDocs[numShards];
for (AtomicArray.Entry<? extends QuerySearchResultProvider> sortedResult : sortedResults) {
if (result.size() != shardTopDocs.length) {
// TopDocs#merge can't deal with null shard TopDocs
final CollapseTopFieldDocs empty = new CollapseTopFieldDocs(firstTopDocs.field, 0, new FieldDoc[0],
sort.getSort(), new Object[0], Float.NaN);
Arrays.fill(shardTopDocs, empty);
}
for (AtomicArray.Entry<? extends QuerySearchResultProvider> sortedResult : results) {
TopDocs topDocs = sortedResult.value.queryResult().topDocs();
// the 'index' field is the position in the resultsArr atomic array
shardTopDocs[sortedResult.index] = (CollapseTopFieldDocs) topDocs;
}
// TopDocs#merge can't deal with null shard TopDocs
for (int i = 0; i < shardTopDocs.length; ++i) {
if (shardTopDocs[i] == null) {
shardTopDocs[i] = new CollapseTopFieldDocs(firstTopDocs.field, 0, new FieldDoc[0],
sort.getSort(), new Object[0], Float.NaN);
}
}
mergedTopDocs = CollapseTopFieldDocs.merge(sort, from, topN, shardTopDocs);
} else if (firstResult.queryResult().topDocs() instanceof TopFieldDocs) {
TopFieldDocs firstTopDocs = (TopFieldDocs) firstResult.queryResult().topDocs();
} else if (result.queryResult().topDocs() instanceof TopFieldDocs) {
TopFieldDocs firstTopDocs = (TopFieldDocs) result.queryResult().topDocs();
final Sort sort = new Sort(firstTopDocs.fields);
final TopFieldDocs[] shardTopDocs = new TopFieldDocs[resultsArr.length()];
for (AtomicArray.Entry<? extends QuerySearchResultProvider> sortedResult : sortedResults) {
if (result.size() != shardTopDocs.length) {
// TopDocs#merge can't deal with null shard TopDocs
final TopFieldDocs empty = new TopFieldDocs(0, new FieldDoc[0], sort.getSort(), Float.NaN);
Arrays.fill(shardTopDocs, empty);
}
for (AtomicArray.Entry<? extends QuerySearchResultProvider> sortedResult : results) {
TopDocs topDocs = sortedResult.value.queryResult().topDocs();
// the 'index' field is the position in the resultsArr atomic array
shardTopDocs[sortedResult.index] = (TopFieldDocs) topDocs;
}
// TopDocs#merge can't deal with null shard TopDocs
for (int i = 0; i < shardTopDocs.length; ++i) {
if (shardTopDocs[i] == null) {
shardTopDocs[i] = new TopFieldDocs(0, new FieldDoc[0], sort.getSort(), Float.NaN);
}
}
mergedTopDocs = TopDocs.merge(sort, from, topN, shardTopDocs);
} else {
final TopDocs[] shardTopDocs = new TopDocs[resultsArr.length()];
for (AtomicArray.Entry<? extends QuerySearchResultProvider> sortedResult : sortedResults) {
if (result.size() != shardTopDocs.length) {
// TopDocs#merge can't deal with null shard TopDocs
Arrays.fill(shardTopDocs, Lucene.EMPTY_TOP_DOCS);
}
for (AtomicArray.Entry<? extends QuerySearchResultProvider> sortedResult : results) {
TopDocs topDocs = sortedResult.value.queryResult().topDocs();
// the 'index' field is the position in the resultsArr atomic array
shardTopDocs[sortedResult.index] = topDocs;
}
// TopDocs#merge can't deal with null shard TopDocs
for (int i = 0; i < shardTopDocs.length; ++i) {
if (shardTopDocs[i] == null) {
shardTopDocs[i] = Lucene.EMPTY_TOP_DOCS;
}
}
mergedTopDocs = TopDocs.merge(from, topN, shardTopDocs);
}
ScoreDoc[] scoreDocs = mergedTopDocs.scoreDocs;
final Map<String, List<Suggestion<CompletionSuggestion.Entry>>> groupedCompletionSuggestions = new HashMap<>();
// group suggestions and assign shard index
for (AtomicArray.Entry<? extends QuerySearchResultProvider> sortedResult : sortedResults) {
for (AtomicArray.Entry<? extends QuerySearchResultProvider> sortedResult : results) {
Suggest shardSuggest = sortedResult.value.queryResult().suggest();
if (shardSuggest != null) {
for (CompletionSuggestion suggestion : shardSuggest.filter(CompletionSuggestion.class)) {

View File

@ -73,7 +73,7 @@ public class SearchPhaseControllerTests extends ESTestCase {
}
int nShards = randomIntBetween(1, 20);
int queryResultSize = randomBoolean() ? 0 : randomIntBetween(1, nShards * 2);
AtomicArray<QuerySearchResultProvider> results = generateQueryResults(nShards, suggestions, queryResultSize);
AtomicArray<QuerySearchResultProvider> results = generateQueryResults(nShards, suggestions, queryResultSize, false);
ScoreDoc[] sortedDocs = searchPhaseController.sortDocs(true, results);
int accumulatedLength = Math.min(queryResultSize, getTotalQueryHits(results));
for (Suggest.Suggestion<?> suggestion : reducedSuggest(results)) {
@ -83,6 +83,18 @@ public class SearchPhaseControllerTests extends ESTestCase {
assertThat(sortedDocs.length, equalTo(accumulatedLength));
}
public void testSortIsIdempotent() throws IOException {
int nShards = randomIntBetween(1, 20);
int queryResultSize = randomBoolean() ? 0 : randomIntBetween(1, nShards * 2);
AtomicArray<QuerySearchResultProvider> results = generateQueryResults(nShards, Collections.emptyList(), queryResultSize,
randomBoolean() || true);
boolean ignoreFrom = randomBoolean();
ScoreDoc[] sortedDocs = searchPhaseController.sortDocs(ignoreFrom, results);
ScoreDoc[] sortedDocs2 = searchPhaseController.sortDocs(ignoreFrom, results);
assertArrayEquals(sortedDocs, sortedDocs2);
}
public void testMerge() throws IOException {
List<CompletionSuggestion> suggestions = new ArrayList<>();
for (int i = 0; i < randomIntBetween(1, 5); i++) {
@ -90,7 +102,7 @@ public class SearchPhaseControllerTests extends ESTestCase {
}
int nShards = randomIntBetween(1, 20);
int queryResultSize = randomBoolean() ? 0 : randomIntBetween(1, nShards * 2);
AtomicArray<QuerySearchResultProvider> queryResults = generateQueryResults(nShards, suggestions, queryResultSize);
AtomicArray<QuerySearchResultProvider> queryResults = generateQueryResults(nShards, suggestions, queryResultSize, false);
// calculate offsets and score doc array
List<ScoreDoc> mergedScoreDocs = new ArrayList<>();
@ -127,7 +139,7 @@ public class SearchPhaseControllerTests extends ESTestCase {
private AtomicArray<QuerySearchResultProvider> generateQueryResults(int nShards,
List<CompletionSuggestion> suggestions,
int searchHitsSize) {
int searchHitsSize, boolean useConstantScore) {
AtomicArray<QuerySearchResultProvider> queryResults = new AtomicArray<>(nShards);
for (int shardIndex = 0; shardIndex < nShards; shardIndex++) {
QuerySearchResult querySearchResult = new QuerySearchResult(shardIndex,
@ -138,7 +150,7 @@ public class SearchPhaseControllerTests extends ESTestCase {
ScoreDoc[] scoreDocs = new ScoreDoc[nDocs];
float maxScore = 0F;
for (int i = 0; i < nDocs; i++) {
float score = Math.abs(randomFloat());
float score = useConstantScore ? 1.0F : Math.abs(randomFloat());
scoreDocs[i] = new ScoreDoc(i, score);
if (score > maxScore) {
maxScore = score;