diff --git a/core/src/main/java/org/elasticsearch/common/lucene/Lucene.java b/core/src/main/java/org/elasticsearch/common/lucene/Lucene.java index 8508a8a2e40..fcc443c57b6 100644 --- a/core/src/main/java/org/elasticsearch/common/lucene/Lucene.java +++ b/core/src/main/java/org/elasticsearch/common/lucene/Lucene.java @@ -638,31 +638,6 @@ public class Lucene { } } - /** - * Return a Scorer that throws an ElasticsearchIllegalStateException - * on all operations with the given message. - */ - public static Scorer illegalScorer(final String message) { - return new Scorer(null) { - @Override - public float score() throws IOException { - throw new IllegalStateException(message); - } - @Override - public int freq() throws IOException { - throw new IllegalStateException(message); - } - @Override - public int docID() { - throw new IllegalStateException(message); - } - @Override - public DocIdSetIterator iterator() { - throw new IllegalStateException(message); - } - }; - } - private static final class CommitPoint extends IndexCommit { private String segmentsFileName; private final Collection files; diff --git a/core/src/main/java/org/elasticsearch/search/aggregations/AggregatorBase.java b/core/src/main/java/org/elasticsearch/search/aggregations/AggregatorBase.java index e70780ec48c..04023b04977 100644 --- a/core/src/main/java/org/elasticsearch/search/aggregations/AggregatorBase.java +++ b/core/src/main/java/org/elasticsearch/search/aggregations/AggregatorBase.java @@ -165,7 +165,7 @@ public abstract class AggregatorBase extends Aggregator { public DeferringBucketCollector getDeferringCollector() { // Default impl is a collector that selects the best buckets // but an alternative defer policy may be based on best docs. 
- return new BestBucketsDeferringCollector(); + return new BestBucketsDeferringCollector(context()); } /** diff --git a/core/src/main/java/org/elasticsearch/search/aggregations/bucket/BestBucketsDeferringCollector.java b/core/src/main/java/org/elasticsearch/search/aggregations/bucket/BestBucketsDeferringCollector.java index 8a379d1ad82..43cefdca290 100644 --- a/core/src/main/java/org/elasticsearch/search/aggregations/bucket/BestBucketsDeferringCollector.java +++ b/core/src/main/java/org/elasticsearch/search/aggregations/bucket/BestBucketsDeferringCollector.java @@ -20,6 +20,9 @@ package org.elasticsearch.search.aggregations.bucket; import org.apache.lucene.index.LeafReaderContext; +import org.apache.lucene.search.DocIdSetIterator; +import org.apache.lucene.search.Scorer; +import org.apache.lucene.search.Weight; import org.apache.lucene.util.packed.PackedInts; import org.apache.lucene.util.packed.PackedLongValues; import org.elasticsearch.common.lucene.Lucene; @@ -30,6 +33,7 @@ import org.elasticsearch.search.aggregations.Aggregator.SubAggCollectionMode; import org.elasticsearch.search.aggregations.BucketCollector; import org.elasticsearch.search.aggregations.InternalAggregation; import org.elasticsearch.search.aggregations.LeafBucketCollector; +import org.elasticsearch.search.aggregations.support.AggregationContext; import java.io.IOException; import java.util.ArrayList; @@ -56,6 +60,7 @@ public class BestBucketsDeferringCollector extends DeferringBucketCollector { final List entries = new ArrayList<>(); BucketCollector collector; + final AggregationContext aggContext; LeafReaderContext context; PackedLongValues.Builder docDeltas; PackedLongValues.Builder buckets; @@ -64,7 +69,8 @@ public class BestBucketsDeferringCollector extends DeferringBucketCollector { LongHash selectedBuckets; /** Sole constructor. 
*/ - public BestBucketsDeferringCollector() { + public BestBucketsDeferringCollector(AggregationContext context) { + this.aggContext = context; } @Override @@ -139,19 +145,34 @@ public class BestBucketsDeferringCollector extends DeferringBucketCollector { this.selectedBuckets = hash; collector.preCollection(); - if (collector.needsScores()) { - throw new IllegalStateException("Cannot defer if scores are needed"); + boolean needsScores = collector.needsScores(); + Weight weight = null; + if (needsScores) { + weight = aggContext.searchContext().searcher() + .createNormalizedWeight(aggContext.searchContext().query(), true); } - for (Entry entry : entries) { final LeafBucketCollector leafCollector = collector.getLeafCollector(entry.context); - leafCollector.setScorer(Lucene.illegalScorer("A limitation of the " + SubAggCollectionMode.BREADTH_FIRST - + " collection mode is that scores cannot be buffered along with document IDs")); + DocIdSetIterator docIt = null; + if (needsScores && entry.docDeltas.size() > 0) { + Scorer scorer = weight.scorer(entry.context); + // We don't need to check if the scorer is null + // since we are sure that there are documents to replay (entry.docDeltas is not empty). 
+ docIt = scorer.iterator(); + leafCollector.setScorer(scorer); + } final PackedLongValues.Iterator docDeltaIterator = entry.docDeltas.iterator(); final PackedLongValues.Iterator buckets = entry.buckets.iterator(); int doc = 0; for (long i = 0, end = entry.docDeltas.size(); i < end; ++i) { doc += docDeltaIterator.next(); + if (needsScores) { + if (docIt.docID() < doc) { + docIt.advance(doc); + } + // aggregations should only be replayed on matching documents + assert docIt.docID() == doc; + } final long bucket = buckets.next(); final long rebasedBucket = hash.find(bucket); if (rebasedBucket != -1) { diff --git a/core/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/TermsAggregator.java b/core/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/TermsAggregator.java index 1c59711b646..eefaf63c62b 100644 --- a/core/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/TermsAggregator.java +++ b/core/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/TermsAggregator.java @@ -199,7 +199,6 @@ public abstract class TermsAggregator extends BucketsAggregator { @Override protected boolean shouldDefer(Aggregator aggregator) { return collectMode == SubAggCollectionMode.BREADTH_FIRST - && aggregator.needsScores() == false && !aggsUsedForSorting.contains(aggregator); } diff --git a/core/src/test/java/org/elasticsearch/search/aggregations/metrics/TopHitsIT.java b/core/src/test/java/org/elasticsearch/search/aggregations/metrics/TopHitsIT.java index 36d021a5f3f..2cd1fed7fbe 100644 --- a/core/src/test/java/org/elasticsearch/search/aggregations/metrics/TopHitsIT.java +++ b/core/src/test/java/org/elasticsearch/search/aggregations/metrics/TopHitsIT.java @@ -351,8 +351,7 @@ public class TopHitsIT extends ESIntegTestCase { } - public void testBreadthFirst() throws Exception { - // breadth_first will be ignored since we need scores + public void testBreadthFirstWithScoreNeeded() throws Exception { SearchResponse response = 
client().prepareSearch("idx").setTypes("type") .addAggregation(terms("terms") .executionHint(randomExecutionHint()) @@ -382,6 +381,38 @@ public class TopHitsIT extends ESIntegTestCase { } } + public void testBreadthFirstWithAggOrderAndScoreNeeded() throws Exception { + SearchResponse response = client().prepareSearch("idx").setTypes("type") + .addAggregation(terms("terms") + .executionHint(randomExecutionHint()) + .collectMode(SubAggCollectionMode.BREADTH_FIRST) + .field(TERMS_AGGS_FIELD) + .order(Terms.Order.aggregation("max", false)) + .subAggregation(max("max").field(SORT_FIELD)) + .subAggregation(topHits("hits").size(3)) + ).get(); + + assertSearchResponse(response); + + Terms terms = response.getAggregations().get("terms"); + assertThat(terms, notNullValue()); + assertThat(terms.getName(), equalTo("terms")); + assertThat(terms.getBuckets().size(), equalTo(5)); + int id = 4; + for (Terms.Bucket bucket : terms.getBuckets()) { + assertThat(bucket, notNullValue()); + assertThat(key(bucket), equalTo("val" + id)); + assertThat(bucket.getDocCount(), equalTo(10L)); + TopHits topHits = bucket.getAggregations().get("hits"); + SearchHits hits = topHits.getHits(); + assertThat(hits.totalHits(), equalTo(10L)); + assertThat(hits.getHits().length, equalTo(3)); + + assertThat(hits.getAt(0).sourceAsMap().size(), equalTo(4)); + id --; + } + } + public void testBasicsGetProperty() throws Exception { SearchResponse searchResponse = client().prepareSearch("idx").setQuery(matchAllQuery()) .addAggregation(global("global").subAggregation(topHits("hits"))).execute().actionGet(); diff --git a/core/src/test/java/org/elasticsearch/search/profile/ProfileTests.java b/core/src/test/java/org/elasticsearch/search/profile/ProfileTests.java index 83f6efaa150..afc1c7b3cbb 100644 --- a/core/src/test/java/org/elasticsearch/search/profile/ProfileTests.java +++ b/core/src/test/java/org/elasticsearch/search/profile/ProfileTests.java @@ -164,7 +164,7 @@ public class ProfileTests extends ESTestCase { 
final LeafCollector leafCollector = profileCollector.getLeafCollector(reader.leaves().get(0)); assertThat(profileCollector.getTime(), greaterThan(0L)); long time = profileCollector.getTime(); - leafCollector.setScorer(Lucene.illegalScorer("dummy scorer")); + leafCollector.setScorer(null); assertThat(profileCollector.getTime(), greaterThan(time)); time = profileCollector.getTime(); leafCollector.collect(0); diff --git a/docs/reference/aggregations/bucket/terms-aggregation.asciidoc b/docs/reference/aggregations/bucket/terms-aggregation.asciidoc index 5d79c5580cb..14dec039435 100644 --- a/docs/reference/aggregations/bucket/terms-aggregation.asciidoc +++ b/docs/reference/aggregations/bucket/terms-aggregation.asciidoc @@ -635,9 +635,8 @@ elasticsearch will always use the `depth_first` collect_mode unless explicitly i Note that the `order` parameter can still be used to refer to data from a child aggregation when using the `breadth_first` setting - the parent aggregation understands that this child aggregation will need to be called first before any of the other child aggregations. -WARNING: It is not possible to nest aggregations such as `top_hits` which require access to match score information under an aggregation that uses -the `breadth_first` collection mode. This is because this would require a RAM buffer to hold the float score value for every document and -this would typically be too costly in terms of RAM. +WARNING: When nested under an aggregation that uses the `breadth_first` collection mode, aggregations such as `top_hits` that require +access to score information need to replay the query on the second pass, but only for the documents belonging to the top buckets. [[search-aggregations-bucket-terms-aggregation-execution-hint]] ==== Execution hint