don't use guava ordering sorted copy, since it does 2 copies (instead of 1)
simply use regular toArray and sort on it (we do need to copy it)
This commit is contained in:
parent
69c9a2dca1
commit
da3b4a3bf0
|
@ -100,9 +100,7 @@ public class AtomicArray<E> {
|
|||
}
|
||||
|
||||
/**
|
||||
* Copies the content of the underlying atomic array to a normal one. If the supplied array is too small a new one will be allocated.
|
||||
* If the supplied array's length is longer than needed, the element in the array immediately following the end of the collection is set to
|
||||
* <tt>null</tt>. All in similar fashion to {@link ArrayList#toArray}
|
||||
* Copies the content of the underlying atomic array to a normal one.
|
||||
*/
|
||||
public E[] toArray(E[] a) {
|
||||
if (a.length != array.length()) {
|
||||
|
|
|
@ -20,12 +20,10 @@
|
|||
package org.elasticsearch.search.controller;
|
||||
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.common.collect.Ordering;
|
||||
import org.apache.lucene.index.Term;
|
||||
import org.apache.lucene.search.*;
|
||||
import org.apache.lucene.util.PriorityQueue;
|
||||
import org.elasticsearch.cache.recycler.CacheRecycler;
|
||||
import org.elasticsearch.common.Nullable;
|
||||
import org.elasticsearch.common.collect.XMaps;
|
||||
import org.elasticsearch.common.component.AbstractComponent;
|
||||
import org.elasticsearch.common.inject.Inject;
|
||||
|
@ -46,19 +44,16 @@ import org.elasticsearch.search.query.QuerySearchResult;
|
|||
import org.elasticsearch.search.query.QuerySearchResultProvider;
|
||||
import org.elasticsearch.search.suggest.Suggest;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.*;
|
||||
|
||||
/**
|
||||
*
|
||||
*/
|
||||
public class SearchPhaseController extends AbstractComponent {
|
||||
|
||||
public static Ordering<AtomicArray.Entry<? extends QuerySearchResultProvider>> QUERY_RESULT_ORDERING = new Ordering<AtomicArray.Entry<? extends QuerySearchResultProvider>>() {
|
||||
public static Comparator<AtomicArray.Entry<? extends QuerySearchResultProvider>> QUERY_RESULT_ORDERING = new Comparator<AtomicArray.Entry<? extends QuerySearchResultProvider>>() {
|
||||
@Override
|
||||
public int compare(@Nullable AtomicArray.Entry<? extends QuerySearchResultProvider> o1, @Nullable AtomicArray.Entry<? extends QuerySearchResultProvider> o2) {
|
||||
public int compare(AtomicArray.Entry<? extends QuerySearchResultProvider> o1, AtomicArray.Entry<? extends QuerySearchResultProvider> o2) {
|
||||
int i = o1.value.shardTarget().index().compareTo(o2.value.shardTarget().index());
|
||||
if (i == 0) {
|
||||
i = o1.value.shardTarget().shardId() - o2.value.shardTarget().shardId();
|
||||
|
@ -130,8 +125,9 @@ public class SearchPhaseController extends AbstractComponent {
|
|||
return Math.min(left, right) == -1 ? -1 : left + right;
|
||||
}
|
||||
|
||||
public ScoreDoc[] sortDocs(AtomicArray<? extends QuerySearchResultProvider> results1) {
|
||||
if (results1.asList().isEmpty()) {
|
||||
public ScoreDoc[] sortDocs(AtomicArray<? extends QuerySearchResultProvider> resultsArr) {
|
||||
List<? extends AtomicArray.Entry<? extends QuerySearchResultProvider>> results = resultsArr.asList();
|
||||
if (results.isEmpty()) {
|
||||
return EMPTY_DOCS;
|
||||
}
|
||||
|
||||
|
@ -139,13 +135,13 @@ public class SearchPhaseController extends AbstractComponent {
|
|||
boolean canOptimize = false;
|
||||
QuerySearchResult result = null;
|
||||
int shardIndex = -1;
|
||||
if (results1.asList().size() == 1) {
|
||||
if (results.size() == 1) {
|
||||
canOptimize = true;
|
||||
result = results1.asList().get(0).value.queryResult();
|
||||
shardIndex = results1.asList().get(0).index;
|
||||
result = results.get(0).value.queryResult();
|
||||
shardIndex = results.get(0).index;
|
||||
} else {
|
||||
// lets see if we only got hits from a single shard, if so, we can optimize...
|
||||
for (AtomicArray.Entry<? extends QuerySearchResultProvider> entry : results1.asList()) {
|
||||
for (AtomicArray.Entry<? extends QuerySearchResultProvider> entry : results) {
|
||||
if (entry.value.queryResult().topDocs().scoreDocs.length > 0) {
|
||||
if (result != null) { // we already have one, can't really optimize
|
||||
canOptimize = false;
|
||||
|
@ -186,8 +182,10 @@ public class SearchPhaseController extends AbstractComponent {
|
|||
}
|
||||
}
|
||||
|
||||
List<? extends AtomicArray.Entry<? extends QuerySearchResultProvider>> results = QUERY_RESULT_ORDERING.sortedCopy(results1.asList());
|
||||
QuerySearchResultProvider firstResult = results.get(0).value;
|
||||
@SuppressWarnings("unchecked")
|
||||
AtomicArray.Entry<? extends QuerySearchResultProvider>[] sortedResults = results.toArray(new AtomicArray.Entry[results.size()]);
|
||||
Arrays.sort(sortedResults, QUERY_RESULT_ORDERING);
|
||||
QuerySearchResultProvider firstResult = sortedResults[0].value;
|
||||
|
||||
int totalNumDocs = 0;
|
||||
|
||||
|
@ -195,7 +193,7 @@ public class SearchPhaseController extends AbstractComponent {
|
|||
if (firstResult.includeFetch()) {
|
||||
// if we did both query and fetch on the same go, we have fetched all the docs from each shards already, use them...
|
||||
// this is also important since we shortcut and fetch only docs from "from" and up to "size"
|
||||
queueSize *= results.size();
|
||||
queueSize *= sortedResults.length;
|
||||
}
|
||||
|
||||
// we don't use TopDocs#merge here because with TopDocs#merge, when pagination, we need to ask for "from + size" topN
|
||||
|
@ -210,7 +208,7 @@ public class SearchPhaseController extends AbstractComponent {
|
|||
for (int i = 0; i < fieldDocs.fields.length; i++) {
|
||||
boolean allValuesAreNull = true;
|
||||
boolean resolvedField = false;
|
||||
for (AtomicArray.Entry<? extends QuerySearchResultProvider> entry : results) {
|
||||
for (AtomicArray.Entry<? extends QuerySearchResultProvider> entry : sortedResults) {
|
||||
for (ScoreDoc doc : entry.value.queryResult().topDocs().scoreDocs) {
|
||||
FieldDoc fDoc = (FieldDoc) doc;
|
||||
if (fDoc.fields[i] != null) {
|
||||
|
@ -234,7 +232,7 @@ public class SearchPhaseController extends AbstractComponent {
|
|||
queue = new ShardFieldDocSortedHitQueue(fieldDocs.fields, queueSize);
|
||||
|
||||
// we need to accumulate for all and then filter the from
|
||||
for (AtomicArray.Entry<? extends QuerySearchResultProvider> entry : results) {
|
||||
for (AtomicArray.Entry<? extends QuerySearchResultProvider> entry : sortedResults) {
|
||||
QuerySearchResult result = entry.value.queryResult();
|
||||
ScoreDoc[] scoreDocs = result.topDocs().scoreDocs;
|
||||
totalNumDocs += scoreDocs.length;
|
||||
|
@ -248,7 +246,7 @@ public class SearchPhaseController extends AbstractComponent {
|
|||
}
|
||||
} else {
|
||||
queue = new ScoreDocQueue(queueSize); // we need to accumulate for all and then filter the from
|
||||
for (AtomicArray.Entry<? extends QuerySearchResultProvider> entry : results) {
|
||||
for (AtomicArray.Entry<? extends QuerySearchResultProvider> entry : sortedResults) {
|
||||
QuerySearchResult result = entry.value.queryResult();
|
||||
ScoreDoc[] scoreDocs = result.topDocs().scoreDocs;
|
||||
totalNumDocs += scoreDocs.length;
|
||||
|
@ -266,7 +264,7 @@ public class SearchPhaseController extends AbstractComponent {
|
|||
int resultDocsSize = firstResult.queryResult().size();
|
||||
if (firstResult.includeFetch()) {
|
||||
// if we did both query and fetch on the same go, we have fetched all the docs from each shards already, use them...
|
||||
resultDocsSize *= results.size();
|
||||
resultDocsSize *= sortedResults.length;
|
||||
}
|
||||
if (totalNumDocs < queueSize) {
|
||||
resultDocsSize = totalNumDocs - firstResult.queryResult().from();
|
||||
|
@ -298,20 +296,22 @@ public class SearchPhaseController extends AbstractComponent {
|
|||
}
|
||||
}
|
||||
|
||||
public InternalSearchResponse merge(ScoreDoc[] sortedDocs, AtomicArray<? extends QuerySearchResultProvider> queryResults, AtomicArray<? extends FetchSearchResultProvider> fetchResults) {
|
||||
public InternalSearchResponse merge(ScoreDoc[] sortedDocs, AtomicArray<? extends QuerySearchResultProvider> queryResultsArr, AtomicArray<? extends FetchSearchResultProvider> fetchResultsArr) {
|
||||
|
||||
boolean sorted = false;
|
||||
int sortScoreIndex = -1;
|
||||
List<? extends AtomicArray.Entry<? extends QuerySearchResultProvider>> queryResults = queryResultsArr.asList();
|
||||
List<? extends AtomicArray.Entry<? extends FetchSearchResultProvider>> fetchResults = fetchResultsArr.asList();
|
||||
|
||||
if (queryResults.asList().isEmpty()) {
|
||||
if (queryResults.isEmpty()) {
|
||||
return InternalSearchResponse.EMPTY;
|
||||
}
|
||||
|
||||
QuerySearchResult querySearchResult = queryResults.asList().get(0).value.queryResult();
|
||||
QuerySearchResult firstResult = queryResults.get(0).value.queryResult();
|
||||
|
||||
if (querySearchResult.topDocs() instanceof TopFieldDocs) {
|
||||
boolean sorted = false;
|
||||
int sortScoreIndex = -1;
|
||||
if (firstResult.topDocs() instanceof TopFieldDocs) {
|
||||
sorted = true;
|
||||
TopFieldDocs fieldDocs = (TopFieldDocs) querySearchResult.queryResult().topDocs();
|
||||
TopFieldDocs fieldDocs = (TopFieldDocs) firstResult.queryResult().topDocs();
|
||||
for (int i = 0; i < fieldDocs.fields.length; i++) {
|
||||
if (fieldDocs.fields[i].getType() == SortField.Type.SCORE) {
|
||||
sortScoreIndex = i;
|
||||
|
@ -321,15 +321,15 @@ public class SearchPhaseController extends AbstractComponent {
|
|||
|
||||
// merge facets
|
||||
InternalFacets facets = null;
|
||||
if (!queryResults.asList().isEmpty()) {
|
||||
if (!queryResults.isEmpty()) {
|
||||
// we rely on the fact that the order of facets is the same on all query results
|
||||
if (querySearchResult.facets() != null && querySearchResult.facets().facets() != null && !querySearchResult.facets().facets().isEmpty()) {
|
||||
if (firstResult.facets() != null && firstResult.facets().facets() != null && !firstResult.facets().facets().isEmpty()) {
|
||||
List<Facet> aggregatedFacets = Lists.newArrayList();
|
||||
List<Facet> namedFacets = Lists.newArrayList();
|
||||
for (Facet facet : querySearchResult.facets()) {
|
||||
for (Facet facet : firstResult.facets()) {
|
||||
// aggregate each facet name into a single list, and aggregate it
|
||||
namedFacets.clear();
|
||||
for (AtomicArray.Entry<? extends QuerySearchResultProvider> entry : queryResults.asList()) {
|
||||
for (AtomicArray.Entry<? extends QuerySearchResultProvider> entry : queryResults) {
|
||||
for (Facet facet1 : entry.value.queryResult().facets()) {
|
||||
if (facet.getName().equals(facet1.getName())) {
|
||||
namedFacets.add(facet1);
|
||||
|
@ -349,7 +349,7 @@ public class SearchPhaseController extends AbstractComponent {
|
|||
long totalHits = 0;
|
||||
float maxScore = Float.NEGATIVE_INFINITY;
|
||||
boolean timedOut = false;
|
||||
for (AtomicArray.Entry<? extends QuerySearchResultProvider> entry : queryResults.asList()) {
|
||||
for (AtomicArray.Entry<? extends QuerySearchResultProvider> entry : queryResults) {
|
||||
QuerySearchResult result = entry.value.queryResult();
|
||||
if (result.searchTimedOut()) {
|
||||
timedOut = true;
|
||||
|
@ -364,15 +364,15 @@ public class SearchPhaseController extends AbstractComponent {
|
|||
}
|
||||
|
||||
// clean the fetch counter
|
||||
for (AtomicArray.Entry<? extends FetchSearchResultProvider> entry : fetchResults.asList()) {
|
||||
for (AtomicArray.Entry<? extends FetchSearchResultProvider> entry : fetchResults) {
|
||||
entry.value.fetchResult().initCounter();
|
||||
}
|
||||
|
||||
// merge hits
|
||||
List<InternalSearchHit> hits = new ArrayList<InternalSearchHit>();
|
||||
if (!fetchResults.asList().isEmpty()) {
|
||||
if (!fetchResults.isEmpty()) {
|
||||
for (ScoreDoc shardDoc : sortedDocs) {
|
||||
FetchSearchResultProvider fetchResultProvider = fetchResults.get(shardDoc.shardIndex);
|
||||
FetchSearchResultProvider fetchResultProvider = fetchResultsArr.get(shardDoc.shardIndex);
|
||||
if (fetchResultProvider == null) {
|
||||
continue;
|
||||
}
|
||||
|
@ -398,10 +398,10 @@ public class SearchPhaseController extends AbstractComponent {
|
|||
|
||||
// merge suggest results
|
||||
Suggest suggest = null;
|
||||
if (!queryResults.asList().isEmpty()) {
|
||||
if (!queryResults.isEmpty()) {
|
||||
final Map<String, List<Suggest.Suggestion>> groupedSuggestions = new HashMap<String, List<Suggest.Suggestion>>();
|
||||
boolean hasSuggestions = false;
|
||||
for (AtomicArray.Entry<? extends QuerySearchResultProvider> entry : queryResults.asList()) {
|
||||
for (AtomicArray.Entry<? extends QuerySearchResultProvider> entry : queryResults) {
|
||||
Suggest shardResult = entry.value.queryResult().queryResult().suggest();
|
||||
|
||||
if (shardResult == null) {
|
||||
|
|
Loading…
Reference in New Issue