Merge pull request #18127 from jimferenczi/breadth_first_needs_score

Add the ability to use the breadth_first mode with nested aggregations (such as `top_hits`) which require access to score information.
This commit is contained in:
Jim Ferenczi 2016-05-04 15:57:08 +02:00
commit 52eb6f3830
7 changed files with 64 additions and 39 deletions

View File

@ -638,31 +638,6 @@ public class Lucene {
}
}
/**
* Return a Scorer that throws an ElasticsearchIllegalStateException
* on all operations with the given message.
*/
public static Scorer illegalScorer(final String message) {
return new Scorer(null) {
@Override
public float score() throws IOException {
throw new IllegalStateException(message);
}
@Override
public int freq() throws IOException {
throw new IllegalStateException(message);
}
@Override
public int docID() {
throw new IllegalStateException(message);
}
@Override
public DocIdSetIterator iterator() {
throw new IllegalStateException(message);
}
};
}
private static final class CommitPoint extends IndexCommit {
private String segmentsFileName;
private final Collection<String> files;

View File

@ -165,7 +165,7 @@ public abstract class AggregatorBase extends Aggregator {
public DeferringBucketCollector getDeferringCollector() {
// Default impl is a collector that selects the best buckets
// but an alternative defer policy may be based on best docs.
return new BestBucketsDeferringCollector();
return new BestBucketsDeferringCollector(context());
}
/**

View File

@ -20,6 +20,9 @@
package org.elasticsearch.search.aggregations.bucket;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.search.DocIdSetIterator;
import org.apache.lucene.search.Scorer;
import org.apache.lucene.search.Weight;
import org.apache.lucene.util.packed.PackedInts;
import org.apache.lucene.util.packed.PackedLongValues;
import org.elasticsearch.common.lucene.Lucene;
@ -30,6 +33,7 @@ import org.elasticsearch.search.aggregations.Aggregator.SubAggCollectionMode;
import org.elasticsearch.search.aggregations.BucketCollector;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.LeafBucketCollector;
import org.elasticsearch.search.aggregations.support.AggregationContext;
import java.io.IOException;
import java.util.ArrayList;
@ -56,6 +60,7 @@ public class BestBucketsDeferringCollector extends DeferringBucketCollector {
final List<Entry> entries = new ArrayList<>();
BucketCollector collector;
final AggregationContext aggContext;
LeafReaderContext context;
PackedLongValues.Builder docDeltas;
PackedLongValues.Builder buckets;
@ -64,7 +69,8 @@ public class BestBucketsDeferringCollector extends DeferringBucketCollector {
LongHash selectedBuckets;
/** Sole constructor. */
public BestBucketsDeferringCollector() {
public BestBucketsDeferringCollector(AggregationContext context) {
this.aggContext = context;
}
@Override
@ -139,19 +145,34 @@ public class BestBucketsDeferringCollector extends DeferringBucketCollector {
this.selectedBuckets = hash;
collector.preCollection();
if (collector.needsScores()) {
throw new IllegalStateException("Cannot defer if scores are needed");
boolean needsScores = collector.needsScores();
Weight weight = null;
if (needsScores) {
weight = aggContext.searchContext().searcher()
.createNormalizedWeight(aggContext.searchContext().query(), true);
}
for (Entry entry : entries) {
final LeafBucketCollector leafCollector = collector.getLeafCollector(entry.context);
leafCollector.setScorer(Lucene.illegalScorer("A limitation of the " + SubAggCollectionMode.BREADTH_FIRST
+ " collection mode is that scores cannot be buffered along with document IDs"));
DocIdSetIterator docIt = null;
if (needsScores && entry.docDeltas.size() > 0) {
Scorer scorer = weight.scorer(entry.context);
// We don't need to check if the scorer is null
// since we are sure that there are documents to replay (entry.docDeltas it not empty).
docIt = scorer.iterator();
leafCollector.setScorer(scorer);
}
final PackedLongValues.Iterator docDeltaIterator = entry.docDeltas.iterator();
final PackedLongValues.Iterator buckets = entry.buckets.iterator();
int doc = 0;
for (long i = 0, end = entry.docDeltas.size(); i < end; ++i) {
doc += docDeltaIterator.next();
if (needsScores) {
if (docIt.docID() < doc) {
docIt.advance(doc);
}
// aggregations should only be replayed on matching documents
assert docIt.docID() == doc;
}
final long bucket = buckets.next();
final long rebasedBucket = hash.find(bucket);
if (rebasedBucket != -1) {

View File

@ -199,7 +199,6 @@ public abstract class TermsAggregator extends BucketsAggregator {
@Override
protected boolean shouldDefer(Aggregator aggregator) {
return collectMode == SubAggCollectionMode.BREADTH_FIRST
&& aggregator.needsScores() == false
&& !aggsUsedForSorting.contains(aggregator);
}

View File

@ -351,8 +351,7 @@ public class TopHitsIT extends ESIntegTestCase {
}
public void testBreadthFirst() throws Exception {
// breadth_first will be ignored since we need scores
public void testBreadthFirstWithScoreNeeded() throws Exception {
SearchResponse response = client().prepareSearch("idx").setTypes("type")
.addAggregation(terms("terms")
.executionHint(randomExecutionHint())
@ -382,6 +381,38 @@ public class TopHitsIT extends ESIntegTestCase {
}
}
public void testBreadthFirstWithAggOrderAndScoreNeeded() throws Exception {
SearchResponse response = client().prepareSearch("idx").setTypes("type")
.addAggregation(terms("terms")
.executionHint(randomExecutionHint())
.collectMode(SubAggCollectionMode.BREADTH_FIRST)
.field(TERMS_AGGS_FIELD)
.order(Terms.Order.aggregation("max", false))
.subAggregation(max("max").field(SORT_FIELD))
.subAggregation(topHits("hits").size(3))
).get();
assertSearchResponse(response);
Terms terms = response.getAggregations().get("terms");
assertThat(terms, notNullValue());
assertThat(terms.getName(), equalTo("terms"));
assertThat(terms.getBuckets().size(), equalTo(5));
int id = 4;
for (Terms.Bucket bucket : terms.getBuckets()) {
assertThat(bucket, notNullValue());
assertThat(key(bucket), equalTo("val" + id));
assertThat(bucket.getDocCount(), equalTo(10L));
TopHits topHits = bucket.getAggregations().get("hits");
SearchHits hits = topHits.getHits();
assertThat(hits.totalHits(), equalTo(10L));
assertThat(hits.getHits().length, equalTo(3));
assertThat(hits.getAt(0).sourceAsMap().size(), equalTo(4));
id --;
}
}
public void testBasicsGetProperty() throws Exception {
SearchResponse searchResponse = client().prepareSearch("idx").setQuery(matchAllQuery())
.addAggregation(global("global").subAggregation(topHits("hits"))).execute().actionGet();

View File

@ -164,7 +164,7 @@ public class ProfileTests extends ESTestCase {
final LeafCollector leafCollector = profileCollector.getLeafCollector(reader.leaves().get(0));
assertThat(profileCollector.getTime(), greaterThan(0L));
long time = profileCollector.getTime();
leafCollector.setScorer(Lucene.illegalScorer("dummy scorer"));
leafCollector.setScorer(null);
assertThat(profileCollector.getTime(), greaterThan(time));
time = profileCollector.getTime();
leafCollector.collect(0);

View File

@ -635,9 +635,8 @@ elasticsearch will always use the `depth_first` collect_mode unless explicitly i
Note that the `order` parameter can still be used to refer to data from a child aggregation when using the `breadth_first` setting - the parent
aggregation understands that this child aggregation will need to be called first before any of the other child aggregations.
WARNING: It is not possible to nest aggregations such as `top_hits` which require access to match score information under an aggregation that uses
the `breadth_first` collection mode. This is because this would require a RAM buffer to hold the float score value for every document and
this would typically be too costly in terms of RAM.
WARNING: Nested aggregations such as `top_hits` which require access to score information under an aggregation that uses the `breadth_first`
collection mode need to replay the query on the second pass but only for the documents belonging to the top buckets.
[[search-aggregations-bucket-terms-aggregation-execution-hint]]
==== Execution hint