Cut over to Lucene's TopDocs#merge for shard topdocs sorting.

Closes #6197
Martijn van Groningen 2014-05-06 13:51:52 +07:00
parent 1b996defcf
commit 16e5cdf8d0
11 changed files with 52 additions and 370 deletions
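
For context on the API this commit adopts: TopDocs.merge(Sort, int, int, TopDocs[]) merges already-sorted per-shard hits into a single sorted, paginated hit list, using each hit's shardIndex to break ties. A minimal standalone sketch (class name and values are invented for illustration; passing sort = null merges by descending score):

    import org.apache.lucene.search.ScoreDoc;
    import org.apache.lucene.search.TopDocs;

    public class TopDocsMergeSketch {
        public static void main(String[] args) {
            // Two per-shard result lists, each already sorted by score.
            // The third ScoreDoc constructor argument is the shardIndex.
            TopDocs shard0 = new TopDocs(2, new ScoreDoc[]{
                    new ScoreDoc(0, 3.0f, 0), new ScoreDoc(5, 1.0f, 0)}, 3.0f);
            TopDocs shard1 = new TopDocs(2, new ScoreDoc[]{
                    new ScoreDoc(2, 2.0f, 1), new ScoreDoc(7, 0.5f, 1)}, 2.0f);

            // sort = null merges by descending score; skip 1 hit (from), return
            // at most 2 (topN). Prints doc 2 (shard 1), then doc 5 (shard 0).
            TopDocs merged = TopDocs.merge(null, 1, 2, new TopDocs[]{shard0, shard1});
            for (ScoreDoc doc : merged.scoreDocs) {
                System.out.println("shard=" + doc.shardIndex + " doc=" + doc.doc + " score=" + doc.score);
            }
        }
    }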


@@ -133,7 +133,8 @@ public class TransportSearchDfsQueryAndFetchAction extends TransportSearchTypeAction
     }

     void innerFinishHim() throws Exception {
-        sortedShardList = searchPhaseController.sortDocs(request, useSlowScroll, queryFetchResults);
+        boolean useScroll = !useSlowScroll && request.scroll() != null;
+        sortedShardList = searchPhaseController.sortDocs(useScroll, queryFetchResults);
         final InternalSearchResponse internalResponse = searchPhaseController.merge(sortedShardList, queryFetchResults, queryFetchResults);
         String scrollId = null;
         if (request.scroll() != null) {


@@ -140,7 +140,8 @@ public class TransportSearchDfsQueryThenFetchAction extends TransportSearchTypeAction
     }

     void innerExecuteFetchPhase() throws Exception {
-        sortedShardList = searchPhaseController.sortDocs(request, useSlowScroll, queryResults);
+        boolean useScroll = !useSlowScroll && request.scroll() != null;
+        sortedShardList = searchPhaseController.sortDocs(useScroll, queryResults);
         searchPhaseController.fillDocIdsToLoad(docIdsToLoad, sortedShardList);
         if (docIdsToLoad.asList().isEmpty()) {


@@ -85,7 +85,8 @@ public class TransportSearchQueryAndFetchAction extends TransportSearchTypeAction
     }

     private void innerFinishHim() throws IOException {
-        sortedShardList = searchPhaseController.sortDocs(request, useSlowScroll, firstResults);
+        boolean useScroll = !useSlowScroll && request.scroll() != null;
+        sortedShardList = searchPhaseController.sortDocs(useScroll, firstResults);
         final InternalSearchResponse internalResponse = searchPhaseController.merge(sortedShardList, firstResults, firstResults);
         String scrollId = null;
         if (request.scroll() != null) {


@@ -82,7 +82,8 @@ public class TransportSearchQueryThenFetchAction extends TransportSearchTypeAction

     @Override
     protected void moveToSecondPhase() throws Exception {
-        sortedShardList = searchPhaseController.sortDocs(request, useSlowScroll, firstResults);
+        boolean useScroll = !useSlowScroll && request.scroll() != null;
+        sortedShardList = searchPhaseController.sortDocs(useScroll, firstResults);
         searchPhaseController.fillDocIdsToLoad(docIdsToLoad, sortedShardList);
         if (docIdsToLoad.asList().isEmpty()) {


@@ -198,12 +198,7 @@ public class TransportSearchScrollQueryAndFetchAction extends AbstractComponent
     }

     private void innerFinishHim() throws Exception {
-        ScoreDoc[] sortedShardList;
-        if (useSlowScroll) {
-            sortedShardList = searchPhaseController.sortDocs(queryFetchResults);
-        } else {
-            sortedShardList = searchPhaseController.sortDocsForScroll(queryFetchResults);
-        }
+        ScoreDoc[] sortedShardList = searchPhaseController.sortDocs(!useSlowScroll, queryFetchResults);
         final InternalSearchResponse internalResponse = searchPhaseController.merge(sortedShardList, queryFetchResults, queryFetchResults);
         String scrollId = null;
         if (request.scroll() != null) {


@@ -199,11 +199,7 @@ public class TransportSearchScrollQueryThenFetchAction extends AbstractComponent
     }

     private void executeFetchPhase() throws Exception {
-        if (useSlowScroll) {
-            sortedShardList = searchPhaseController.sortDocs(queryResults);
-        } else {
-            sortedShardList = searchPhaseController.sortDocsForScroll(queryResults);
-        }
+        sortedShardList = searchPhaseController.sortDocs(!useSlowScroll, queryResults);
         AtomicArray<IntArrayList> docIdsToLoad = new AtomicArray<>(queryResults.length());
         searchPhaseController.fillDocIdsToLoad(docIdsToLoad, sortedShardList);


@@ -56,6 +56,8 @@ public class Lucene {

     public static final ScoreDoc[] EMPTY_SCORE_DOCS = new ScoreDoc[0];

+    public static final TopDocs EMPTY_TOP_DOCS = new TopDocs(0, EMPTY_SCORE_DOCS, 0.0f);
+
     @SuppressWarnings("deprecation")
     public static Version parseVersion(@Nullable String version, Version defaultVersion, ESLogger logger) {
         if (version == null) {


@@ -1,45 +0,0 @@
-/*
- * Licensed to Elasticsearch under one or more contributor
- * license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright
- * ownership. Elasticsearch licenses this file to you under
- * the Apache License, Version 2.0 (the "License"); you may
- * not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.elasticsearch.search.controller;
-
-import org.apache.lucene.search.ScoreDoc;
-import org.apache.lucene.util.PriorityQueue;
-
-/**
- * <p>Same as lucene {@link org.apache.lucene.search.HitQueue}.
- */
-public class ScoreDocQueue extends PriorityQueue<ScoreDoc> {
-
-    public ScoreDocQueue(int size) {
-        super(size);
-    }
-
-    protected final boolean lessThan(ScoreDoc hitA, ScoreDoc hitB) {
-        if (hitA.score == hitB.score) {
-            int c = hitA.shardIndex - hitB.shardIndex;
-            if (c == 0) {
-                return hitA.doc > hitB.doc;
-            }
-            return c > 0;
-        } else {
-            return hitA.score < hitB.score;
-        }
-    }
-}
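
The only difference between this queue and Lucene's HitQueue was the shard-aware tie break: equal scores fall back to shardIndex, then doc id. TopDocs#merge applies a near-identical rule, except that the last tie within a shard is broken by the hit's slot in that shard's result array rather than by its doc id; the test changes at the bottom of this commit exist because of exactly that difference. An illustrative check, with invented values:

    import org.apache.lucene.search.ScoreDoc;
    import org.apache.lucene.search.TopDocs;

    public class TieBreakSketch {
        public static void main(String[] args) {
            // Equal scores: the hit from the lower shardIndex wins the tie,
            // even though its doc id is larger.
            TopDocs shard0 = new TopDocs(1, new ScoreDoc[]{new ScoreDoc(9, 1.0f, 0)}, 1.0f);
            TopDocs shard1 = new TopDocs(1, new ScoreDoc[]{new ScoreDoc(3, 1.0f, 1)}, 1.0f);

            TopDocs merged = TopDocs.merge(null, 0, 2, new TopDocs[]{shard0, shard1});
            for (ScoreDoc doc : merged.scoreDocs) {
                System.out.println("shard=" + doc.shardIndex + " doc=" + doc.doc);
            }
            // Prints: shard=0 doc=9, then shard=1 doc=3.
        }
    }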


@@ -24,12 +24,12 @@ import com.carrotsearch.hppc.ObjectObjectOpenHashMap;
 import com.google.common.collect.Lists;
 import org.apache.lucene.index.Term;
 import org.apache.lucene.search.*;
-import org.apache.lucene.util.PriorityQueue;
 import org.elasticsearch.action.search.SearchRequest;
 import org.elasticsearch.cache.recycler.CacheRecycler;
 import org.elasticsearch.common.collect.HppcMaps;
 import org.elasticsearch.common.component.AbstractComponent;
 import org.elasticsearch.common.inject.Inject;
+import org.elasticsearch.common.lucene.Lucene;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.util.BigArrays;
 import org.elasticsearch.common.util.concurrent.AtomicArray;
@@ -139,15 +139,11 @@ public class SearchPhaseController extends AbstractComponent {
         return Math.min(left, right) == -1 ? -1 : left + right;
     }

-    public ScoreDoc[] sortDocs(SearchRequest request, boolean useClassicSort, AtomicArray<? extends QuerySearchResultProvider> firstResults) throws IOException {
-        if (!useClassicSort && request.scroll() != null) {
-            return sortDocsForScroll(firstResults);
-        } else {
-            return sortDocs(firstResults);
-        }
-    }
-
-    public ScoreDoc[] sortDocsForScroll(AtomicArray<? extends QuerySearchResultProvider> resultsArr) throws IOException {
+    /**
+     * @param scrollSort Whether to ignore the from and sort all hits in each shard result. Only used for scroll search
+     * @param resultsArr Shard result holder
+     */
+    public ScoreDoc[] sortDocs(boolean scrollSort, AtomicArray<? extends QuerySearchResultProvider> resultsArr) throws IOException {
         List<? extends AtomicArray.Entry<? extends QuerySearchResultProvider>> results = resultsArr.asList();
         if (results.isEmpty()) {
             return EMPTY_DOCS;
@@ -217,13 +213,17 @@
             // the 'index' field is the position in the resultsArr atomic array
             shardTopDocs[sortedResult.index] = topDocs;
         }

-        // TopDocs#merge can't deal with empty shard TopDocs
+        int from = firstResult.queryResult().from();
+        if (scrollSort) {
+            from = 0;
+        }
+        // TopDocs#merge can't deal with null shard TopDocs
         for (int i = 0; i < shardTopDocs.length; i++) {
             if (shardTopDocs[i] == null) {
-                shardTopDocs[i] = new TopDocs(0, EMPTY_DOCS, 0.0f);
+                shardTopDocs[i] = Lucene.EMPTY_TOP_DOCS;
             }
         }
-        TopDocs mergedTopDocs = TopDocs.merge(sort, topN, shardTopDocs);
+        TopDocs mergedTopDocs = TopDocs.merge(sort, from, topN, shardTopDocs);
         return mergedTopDocs.scoreDocs;
     }
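
Two details in the new sortDocs are easy to miss: TopDocs#merge cannot handle null shard entries, hence the padding with Lucene.EMPTY_TOP_DOCS, and merge applies the from offset itself, except for scroll searches, where each batch starts at offset 0. A hypothetical helper showing just that shape (the name mergeShardHits and the exact from/topN arithmetic are assumptions, not the production code):

    import org.apache.lucene.search.ScoreDoc;
    import org.apache.lucene.search.Sort;
    import org.apache.lucene.search.TopDocs;

    final class ShardMergeSketch {
        private static final TopDocs EMPTY_TOP_DOCS = new TopDocs(0, new ScoreDoc[0], 0.0f);

        // Pads null per-shard results, then delegates sorting and paging to Lucene.
        static ScoreDoc[] mergeShardHits(Sort sort, int from, int topN, boolean scrollSort,
                                         TopDocs[] shardTopDocs) {
            for (int i = 0; i < shardTopDocs.length; i++) {
                if (shardTopDocs[i] == null) {      // merge() rejects null entries
                    shardTopDocs[i] = EMPTY_TOP_DOCS;
                }
            }
            int start = scrollSort ? 0 : from;      // scroll batches always start at 0
            return TopDocs.merge(sort, start, topN, shardTopDocs).scoreDocs;
        }
    }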
@@ -243,165 +243,6 @@
         return lastEmittedDocPerShard;
     }

-    public ScoreDoc[] sortDocs(AtomicArray<? extends QuerySearchResultProvider> resultsArr) {
-        List<? extends AtomicArray.Entry<? extends QuerySearchResultProvider>> results = resultsArr.asList();
-        if (results.isEmpty()) {
-            return EMPTY_DOCS;
-        }
-
-        if (optimizeSingleShard) {
-            boolean canOptimize = false;
-            QuerySearchResult result = null;
-            int shardIndex = -1;
-            if (results.size() == 1) {
-                canOptimize = true;
-                result = results.get(0).value.queryResult();
-                shardIndex = results.get(0).index;
-            } else {
-                // lets see if we only got hits from a single shard, if so, we can optimize...
-                for (AtomicArray.Entry<? extends QuerySearchResultProvider> entry : results) {
-                    if (entry.value.queryResult().topDocs().scoreDocs.length > 0) {
-                        if (result != null) { // we already have one, can't really optimize
-                            canOptimize = false;
-                            break;
-                        }
-                        canOptimize = true;
-                        result = entry.value.queryResult();
-                        shardIndex = entry.index;
-                    }
-                }
-            }
-            if (canOptimize) {
-                ScoreDoc[] scoreDocs = result.topDocs().scoreDocs;
-                if (scoreDocs.length < result.from()) {
-                    return EMPTY_DOCS;
-                }
-                int resultDocsSize = result.size();
-                if ((scoreDocs.length - result.from()) < resultDocsSize) {
-                    resultDocsSize = scoreDocs.length - result.from();
-                }
-                int offset = result.from();
-                if (result.topDocs() instanceof TopFieldDocs) {
-                    ScoreDoc[] docs = new ScoreDoc[resultDocsSize];
-                    for (int i = 0; i < resultDocsSize; i++) {
-                        ScoreDoc scoreDoc = scoreDocs[offset + i];
-                        scoreDoc.shardIndex = shardIndex;
-                        docs[i] = scoreDoc;
-                    }
-                    return docs;
-                } else {
-                    ScoreDoc[] docs = new ScoreDoc[resultDocsSize];
-                    for (int i = 0; i < resultDocsSize; i++) {
-                        ScoreDoc scoreDoc = scoreDocs[offset + i];
-                        scoreDoc.shardIndex = shardIndex;
-                        docs[i] = scoreDoc;
-                    }
-                    return docs;
-                }
-            }
-        }
-
-        @SuppressWarnings("unchecked")
-        AtomicArray.Entry<? extends QuerySearchResultProvider>[] sortedResults = results.toArray(new AtomicArray.Entry[results.size()]);
-        Arrays.sort(sortedResults, QUERY_RESULT_ORDERING);
-        QuerySearchResultProvider firstResult = sortedResults[0].value;
-
-        int totalNumDocs = 0;
-        int queueSize = firstResult.queryResult().from() + firstResult.queryResult().size();
-        if (firstResult.includeFetch()) {
-            // if we did both query and fetch on the same go, we have fetched all the docs from each shards already, use them...
-            // this is also important since we shortcut and fetch only docs from "from" and up to "size"
-            queueSize *= sortedResults.length;
-        }
-
-        // we don't use TopDocs#merge here because with TopDocs#merge, when pagination, we need to ask for "from + size" topN
-        // hits, which ends up creating a "from + size" ScoreDoc[], while in our implementation, we can actually get away with
-        // just create "size" ScoreDoc (the reverse order in the queue). would be nice to improve TopDocs#merge to allow for
-        // it in which case we won't need this logic...
-        PriorityQueue queue;
-        if (firstResult.queryResult().topDocs() instanceof TopFieldDocs) {
-            // sorting, first if the type is a String, chance CUSTOM to STRING so we handle nulls properly (since our CUSTOM String sorting might return null)
-            TopFieldDocs fieldDocs = (TopFieldDocs) firstResult.queryResult().topDocs();
-            for (int i = 0; i < fieldDocs.fields.length; i++) {
-                boolean allValuesAreNull = true;
-                boolean resolvedField = false;
-                for (AtomicArray.Entry<? extends QuerySearchResultProvider> entry : sortedResults) {
-                    for (ScoreDoc doc : entry.value.queryResult().topDocs().scoreDocs) {
-                        FieldDoc fDoc = (FieldDoc) doc;
-                        if (fDoc.fields[i] != null) {
-                            allValuesAreNull = false;
-                            if (fDoc.fields[i] instanceof String) {
-                                fieldDocs.fields[i] = new SortField(fieldDocs.fields[i].getField(), SortField.Type.STRING, fieldDocs.fields[i].getReverse());
-                            }
-                            resolvedField = true;
-                            break;
-                        }
-                    }
-                    if (resolvedField) {
-                        break;
-                    }
-                }
-                if (!resolvedField && allValuesAreNull && fieldDocs.fields[i].getField() != null) {
-                    // we did not manage to resolve a field (and its not score or doc, which have no field), and all the fields are null (which can only happen for STRING), make it a STRING
-                    fieldDocs.fields[i] = new SortField(fieldDocs.fields[i].getField(), SortField.Type.STRING, fieldDocs.fields[i].getReverse());
-                }
-            }
-            queue = new ShardFieldDocSortedHitQueue(fieldDocs.fields, queueSize);
-
-            // we need to accumulate for all and then filter the from
-            for (AtomicArray.Entry<? extends QuerySearchResultProvider> entry : sortedResults) {
-                QuerySearchResult result = entry.value.queryResult();
-                ScoreDoc[] scoreDocs = result.topDocs().scoreDocs;
-                totalNumDocs += scoreDocs.length;
-                for (ScoreDoc doc : scoreDocs) {
-                    doc.shardIndex = entry.index;
-                    if (queue.insertWithOverflow(doc) == doc) {
-                        // filled the queue, break
-                        break;
-                    }
-                }
-            }
-        } else {
-            queue = new ScoreDocQueue(queueSize); // we need to accumulate for all and then filter the from
-            for (AtomicArray.Entry<? extends QuerySearchResultProvider> entry : sortedResults) {
-                QuerySearchResult result = entry.value.queryResult();
-                ScoreDoc[] scoreDocs = result.topDocs().scoreDocs;
-                totalNumDocs += scoreDocs.length;
-                for (ScoreDoc doc : scoreDocs) {
-                    doc.shardIndex = entry.index;
-                    if (queue.insertWithOverflow(doc) == doc) {
-                        // filled the queue, break
-                        break;
-                    }
-                }
-            }
-        }
-
-        int resultDocsSize = firstResult.queryResult().size();
-        if (firstResult.includeFetch()) {
-            // if we did both query and fetch on the same go, we have fetched all the docs from each shards already, use them...
-            resultDocsSize *= sortedResults.length;
-        }
-        if (totalNumDocs < queueSize) {
-            resultDocsSize = totalNumDocs - firstResult.queryResult().from();
-        }
-
-        if (resultDocsSize <= 0) {
-            return EMPTY_DOCS;
-        }
-
-        // we only pop the first, this handles "from" nicely since the "from" are down the queue
-        // that we already fetched, so we are actually popping the "from" and up to "size"
-        ScoreDoc[] shardDocs = new ScoreDoc[resultDocsSize];
-        for (int i = resultDocsSize - 1; i >= 0; i--) { // put docs in array
-            shardDocs[i] = (ScoreDoc) queue.pop();
-        }
-        return shardDocs;
-    }
-
     /**
      * Builds an array, with potential null elements, with docs to load.
      */


@@ -1,137 +0,0 @@
-/*
- * Licensed to Elasticsearch under one or more contributor
- * license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright
- * ownership. Elasticsearch licenses this file to you under
- * the Apache License, Version 2.0 (the "License"); you may
- * not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.elasticsearch.search.controller;
-
-import org.apache.lucene.search.FieldComparator;
-import org.apache.lucene.search.FieldDoc;
-import org.apache.lucene.search.SortField;
-import org.apache.lucene.util.BytesRef;
-import org.apache.lucene.util.PriorityQueue;
-import org.elasticsearch.ElasticsearchIllegalStateException;
-
-import java.io.IOException;
-
-/**
- *
- */
-// LUCENE TRACK, Had to copy over in order ot improve same order tie break to take shards into account
-public class ShardFieldDocSortedHitQueue extends PriorityQueue<FieldDoc> {
-
-    volatile SortField[] fields = null;
-
-    // used in the case where the fields are sorted by locale
-    // based strings
-    //volatile Collator[] collators = null;
-
-    FieldComparator[] comparators = null;
-
-    /**
-     * Creates a hit queue sorted by the given list of fields.
-     *
-     * @param fields Fieldable names, in priority order (highest priority first).
-     * @param size   The number of hits to retain. Must be greater than zero.
-     */
-    public ShardFieldDocSortedHitQueue(SortField[] fields, int size) {
-        super(size);
-        setFields(fields);
-    }
-
-    /**
-     * Allows redefinition of sort fields if they are <code>null</code>.
-     * This is to handle the case using ParallelMultiSearcher where the
-     * original list contains AUTO and we don't know the actual sort
-     * type until the values come back. The fields can only be set once.
-     * This method should be synchronized external like all other PQ methods.
-     *
-     * @param fields
-     */
-    public void setFields(SortField[] fields) {
-        this.fields = fields;
-        //this.collators = hasCollators(fields);
-        try {
-            comparators = new FieldComparator[fields.length];
-            for (int fieldIDX = 0; fieldIDX < fields.length; fieldIDX++) {
-                comparators[fieldIDX] = fields[fieldIDX].getComparator(1, fieldIDX);
-            }
-        } catch (IOException e) {
-            throw new ElasticsearchIllegalStateException("failed to get comparator", e);
-        }
-    }
-
-    /**
-     * Returns the fields being used to sort.
-     */
-    SortField[] getFields() {
-        return fields;
-    }
-
-    /**
-     * Returns whether <code>a</code> is less relevant than <code>b</code>.
-     *
-     * @param docA ScoreDoc
-     * @param docB ScoreDoc
-     * @return <code>true</code> if document <code>a</code> should be sorted after document <code>b</code>.
-     */
-    @SuppressWarnings("unchecked")
-    @Override
-    protected final boolean lessThan(final FieldDoc docA, final FieldDoc docB) {
-        final int n = fields.length;
-        int c = 0;
-        for (int i = 0; i < n && c == 0; ++i) {
-            final SortField.Type type = fields[i].getType();
-            if (type == SortField.Type.STRING) {
-                final BytesRef s1 = (BytesRef) docA.fields[i];
-                final BytesRef s2 = (BytesRef) docB.fields[i];
-                // null values need to be sorted first, because of how FieldCache.getStringIndex()
-                // works - in that routine, any documents without a value in the given field are
-                // put first. If both are null, the next SortField is used
-                if (s1 == null) {
-                    c = (s2 == null) ? 0 : -1;
-                } else if (s2 == null) {
-                    c = 1;
-                } else { //if (fields[i].getLocale() == null) {
-                    c = s1.compareTo(s2);
-                }
-//                } else {
-//                    c = collators[i].compare(s1, s2);
-//                }
-            } else {
-                c = comparators[i].compareValues(docA.fields[i], docB.fields[i]);
-            }
-            // reverse sort
-            if (fields[i].getReverse()) {
-                c = -c;
-            }
-        }
-
-        // avoid random sort order that could lead to duplicates (bug #31241):
-        if (c == 0) {
-            // CHANGE: Add shard base tie breaking
-            c = docA.shardIndex - docB.shardIndex;
-            if (c == 0) {
-                return docA.doc > docB.doc;
-            }
-        }
-
-        return c > 0;
-    }
-}
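
With this queue gone, field-based sorting also runs through TopDocs#merge: when a non-null Sort is passed, every shard entry must be a TopFieldDocs whose FieldDocs carry the per-hit sort values, and the same shardIndex tie break applies. A standalone sketch of that variant (field name and values invented):

    import org.apache.lucene.search.FieldDoc;
    import org.apache.lucene.search.ScoreDoc;
    import org.apache.lucene.search.Sort;
    import org.apache.lucene.search.SortField;
    import org.apache.lucene.search.TopDocs;
    import org.apache.lucene.search.TopFieldDocs;

    public class FieldSortMergeSketch {
        public static void main(String[] args) {
            Sort sort = new Sort(new SortField("timestamp", SortField.Type.LONG, true));

            // Each FieldDoc carries its concrete sort values; scores are unused here.
            FieldDoc hit0 = new FieldDoc(0, Float.NaN, new Object[]{10L}, 0);
            FieldDoc hit1 = new FieldDoc(4, Float.NaN, new Object[]{20L}, 1);

            TopDocs[] shardHits = {
                    new TopFieldDocs(1, new ScoreDoc[]{hit0}, sort.getSort(), Float.NaN),
                    new TopFieldDocs(1, new ScoreDoc[]{hit1}, sort.getSort(), Float.NaN)
            };

            // Descending long sort: shard 1's hit (20L) sorts before shard 0's (10L).
            TopDocs merged = TopDocs.merge(sort, 0, 2, shardHits);
            for (ScoreDoc doc : merged.scoreDocs) {
                System.out.println("shard=" + doc.shardIndex + " doc=" + doc.doc);
            }
        }
    }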


@@ -41,6 +41,9 @@ import org.elasticsearch.search.rescore.RescoreBuilder.QueryRescorer;
 import org.elasticsearch.test.ElasticsearchIntegrationTest;
 import org.junit.Test;

+import java.util.Arrays;
+import java.util.Comparator;
+
 import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_SHARDS;
 import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
 import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.*;
@@ -192,6 +195,23 @@ public class QueryRescorerTests extends ElasticsearchIntegrationTest {
         assertThirdHit(searchResponse, hasId("3"));
     }

+    // Comparator that sorts hits and rescored hits in the same way.
+    // The rescore uses the docId as tie, while regular search uses the slot the hit is in as a tie if score
+    // and shard id are equal during merging shard results.
+    // This comparator uses a custom tie in case the scores are equal, so that both regular hits and rescored hits
+    // are sorted equally. This is fine since tests only care about the fact the scores should be equal, not ordering.
+    private final static Comparator<SearchHit> searchHitsComparator = new Comparator<SearchHit>() {
+        @Override
+        public int compare(SearchHit hit1, SearchHit hit2) {
+            int cmp = Float.compare(hit2.getScore(), hit1.getScore());
+            if (cmp == 0) {
+                return hit1.id().compareTo(hit2.id());
+            } else {
+                return cmp;
+            }
+        }
+    };
+
     private static void assertEquivalent(String query, SearchResponse plain, SearchResponse rescored) {
         assertNoFailures(plain);
         assertNoFailures(rescored);
@@ -201,6 +221,8 @@ public class QueryRescorerTests extends ElasticsearchIntegrationTest {
         assertThat(leftHits.getHits().length, equalTo(rightHits.getHits().length));
         SearchHit[] hits = leftHits.getHits();
         SearchHit[] rHits = rightHits.getHits();
+        Arrays.sort(hits, searchHitsComparator);
+        Arrays.sort(rHits, searchHitsComparator);
         for (int i = 0; i < hits.length; i++) {
             assertThat("query: " + query, hits[i].getScore(), equalTo(rHits[i].getScore()));
         }
@@ -213,6 +235,8 @@ public class QueryRescorerTests extends ElasticsearchIntegrationTest {
     }

     private static void assertEquivalentOrSubstringMatch(String query, SearchResponse plain, SearchResponse rescored) {
+        assertNoFailures(plain);
+        assertNoFailures(rescored);
         SearchHits leftHits = plain.getHits();
         SearchHits rightHits = rescored.getHits();
         assertThat(leftHits.getTotalHits(), equalTo(rightHits.getTotalHits()));
@@ -222,6 +246,8 @@ public class QueryRescorerTests extends ElasticsearchIntegrationTest {
         if (!hits[0].getId().equals(otherHits[0].getId())) {
             assertThat(((String) otherHits[0].sourceAsMap().get("field1")).contains(query), equalTo(true));
         } else {
+            Arrays.sort(hits, searchHitsComparator);
+            Arrays.sort(otherHits, searchHitsComparator);
             for (int i = 0; i < hits.length; i++) {
                 if (hits[i].getScore() == hits[hits.length-1].getScore()) {
                     return; // we need to cut off here since this is the tail of the queue and we might not have fetched enough docs
@@ -239,7 +265,7 @@ public class QueryRescorerTests extends ElasticsearchIntegrationTest {

         final int iters = scaledRandomIntBetween(50, 100);
         for (int i = 0; i < iters; i++) {
-            int resultSize = between(5, 30);
+            int resultSize = numDocs;
             int rescoreWindow = between(1, 3) * resultSize;
             String intToEnglish = English.intToEnglish(between(0, numDocs-1));
             String query = intToEnglish.split(" ")[0];