diff --git a/src/main/java/org/elasticsearch/percolator/QueryCollector.java b/src/main/java/org/elasticsearch/percolator/QueryCollector.java index 2653c2de1b7..f289e188167 100644 --- a/src/main/java/org/elasticsearch/percolator/QueryCollector.java +++ b/src/main/java/org/elasticsearch/percolator/QueryCollector.java @@ -92,6 +92,7 @@ abstract class QueryCollector extends SimpleCollector { context.aggregations().aggregators(aggregators); } aggregatorCollector = BucketCollector.wrap(aggregatorCollectors); + aggregatorCollector.preCollection(); } public void postMatch(int doc) throws IOException { diff --git a/src/main/java/org/elasticsearch/search/aggregations/AggregationPhase.java b/src/main/java/org/elasticsearch/search/aggregations/AggregationPhase.java index 9d627310142..387d365c62d 100644 --- a/src/main/java/org/elasticsearch/search/aggregations/AggregationPhase.java +++ b/src/main/java/org/elasticsearch/search/aggregations/AggregationPhase.java @@ -76,18 +76,20 @@ public class AggregationPhase implements SearchPhase { Aggregator[] aggregators; try { aggregators = context.aggregations().factories().createTopLevelAggregators(aggregationContext); + for (int i = 0; i < aggregators.length; i++) { + if (aggregators[i] instanceof GlobalAggregator == false) { + collectors.add(aggregators[i]); + } + } + context.aggregations().aggregators(aggregators); + if (!collectors.isEmpty()) { + final BucketCollector collector = BucketCollector.wrap(collectors); + collector.preCollection(); + context.searcher().queryCollectors().put(AggregationPhase.class, collector); + } } catch (IOException e) { throw new AggregationInitializationException("Could not initialize aggregators", e); } - for (int i = 0; i < aggregators.length; i++) { - if (aggregators[i] instanceof GlobalAggregator == false) { - collectors.add(aggregators[i]); - } - } - context.aggregations().aggregators(aggregators); - if (!collectors.isEmpty()) { - context.searcher().queryCollectors().put(AggregationPhase.class, (BucketCollector.wrap(collectors))); - } } } @@ -113,14 +115,15 @@ public class AggregationPhase implements SearchPhase { // optimize the global collector based execution if (!globals.isEmpty()) { - BucketCollector collector = BucketCollector.wrap(globals); + BucketCollector globalsCollector = BucketCollector.wrap(globals); Query query = new ConstantScoreQuery(Queries.MATCH_ALL_FILTER); Filter searchFilter = context.searchFilter(context.types()); if (searchFilter != null) { query = new FilteredQuery(query, searchFilter); } try { - context.searcher().search(query, collector); + globalsCollector.preCollection(); + context.searcher().search(query, globalsCollector); } catch (Exception e) { throw new QueryPhaseExecutionException(context, "Failed to execute global aggregators", e); } diff --git a/src/main/java/org/elasticsearch/search/aggregations/AggregatorBase.java b/src/main/java/org/elasticsearch/search/aggregations/AggregatorBase.java index 874ad71cfa8..4d83603a088 100644 --- a/src/main/java/org/elasticsearch/search/aggregations/AggregatorBase.java +++ b/src/main/java/org/elasticsearch/search/aggregations/AggregatorBase.java @@ -99,7 +99,12 @@ public abstract class AggregatorBase extends Aggregator { */ @Override public boolean needsScores() { - return collectableSubAggregators.needsScores(); + for (Aggregator agg : subAggregators) { + if (agg.needsScores()) { + return true; + } + } + return false; } public Map metaData() { @@ -145,6 +150,7 @@ public abstract class AggregatorBase extends Aggregator { } collectableSubAggregators = BucketCollector.wrap(collectors); doPreCollection(); + collectableSubAggregators.preCollection(); } /** diff --git a/src/main/java/org/elasticsearch/search/aggregations/AggregatorFactories.java b/src/main/java/org/elasticsearch/search/aggregations/AggregatorFactories.java index 10ea7f74c2c..98cc7e39e1a 100644 --- a/src/main/java/org/elasticsearch/search/aggregations/AggregatorFactories.java +++ b/src/main/java/org/elasticsearch/search/aggregations/AggregatorFactories.java @@ -44,15 +44,6 @@ public class AggregatorFactories { this.factories = factories; } - private static Aggregator createAndRegisterContextAware(AggregationContext context, AggregatorFactory factory, Aggregator parent, boolean collectsFromSingleBucket) throws IOException { - final Aggregator aggregator = factory.create(context, parent, collectsFromSingleBucket); - // Once the aggregator is fully constructed perform any initialisation - - // can't do everything in constructors if Aggregator base class needs - // to delegate to subclasses as part of construction. - aggregator.preCollection(); - return aggregator; - } - /** * Create all aggregators so that they can be consumed with multiple buckets. */ @@ -64,7 +55,7 @@ public class AggregatorFactories { // propagate the fact that only bucket 0 will be collected with single-bucket // aggs final boolean collectsFromSingleBucket = false; - aggregators[i] = createAndRegisterContextAware(parent.context(), factories[i], parent, collectsFromSingleBucket); + aggregators[i] = factories[i].create(parent.context(), parent, collectsFromSingleBucket); } return aggregators; } @@ -75,7 +66,7 @@ public class AggregatorFactories { for (int i = 0; i < factories.length; i++) { // top-level aggs only get called with bucket 0 final boolean collectsFromSingleBucket = true; - aggregators[i] = createAndRegisterContextAware(ctx, factories[i], null, collectsFromSingleBucket); + aggregators[i] = factories[i].create(ctx, null, collectsFromSingleBucket); } return aggregators; } diff --git a/src/main/java/org/elasticsearch/search/aggregations/bucket/DeferringBucketCollector.java b/src/main/java/org/elasticsearch/search/aggregations/bucket/DeferringBucketCollector.java index 9249da7334b..09686e662d5 100644 --- a/src/main/java/org/elasticsearch/search/aggregations/bucket/DeferringBucketCollector.java +++ b/src/main/java/org/elasticsearch/search/aggregations/bucket/DeferringBucketCollector.java @@ -73,7 +73,7 @@ public final class DeferringBucketCollector extends BucketCollector { if (collector == null) { throw new ElasticsearchIllegalStateException(); } - return collector.needsScores(); + return false; } /** Set the deferred collectors. */ @@ -138,6 +138,9 @@ public final class DeferringBucketCollector extends BucketCollector { this.selectedBuckets = hash; collector.preCollection(); + if (collector.needsScores()) { + throw new ElasticsearchIllegalStateException("Cannot defer if scores are needed"); + } for (Entry entry : entries) { final LeafBucketCollector leafCollector = collector.getLeafCollector(entry.context); diff --git a/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/TermsAggregator.java b/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/TermsAggregator.java index ef254bb0594..4cfe549a452 100644 --- a/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/TermsAggregator.java +++ b/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/TermsAggregator.java @@ -43,7 +43,7 @@ public abstract class TermsAggregator extends BucketsAggregator { private Explicit shardMinDocCount; private Explicit requiredSize; private Explicit shardSize; - + public BucketCountThresholds(long minDocCount, long shardMinDocCount, int requiredSize, int shardSize) { this.minDocCount = new Explicit<>(minDocCount, false); this.shardMinDocCount = new Explicit<>(shardMinDocCount, false); @@ -157,7 +157,9 @@ public abstract class TermsAggregator extends BucketsAggregator { @Override protected boolean shouldDefer(Aggregator aggregator) { - return (collectMode == SubAggCollectionMode.BREADTH_FIRST) && (!aggsUsedForSorting.contains(aggregator)); + return collectMode == SubAggCollectionMode.BREADTH_FIRST + && aggregator.needsScores() == false + && !aggsUsedForSorting.contains(aggregator); } - + } diff --git a/src/test/java/org/elasticsearch/search/aggregations/RandomTests.java b/src/test/java/org/elasticsearch/search/aggregations/EquivalenceTests.java similarity index 87% rename from src/test/java/org/elasticsearch/search/aggregations/RandomTests.java rename to src/test/java/org/elasticsearch/search/aggregations/EquivalenceTests.java index c87f00bdadc..5079e6730dd 100644 --- a/src/test/java/org/elasticsearch/search/aggregations/RandomTests.java +++ b/src/test/java/org/elasticsearch/search/aggregations/EquivalenceTests.java @@ -42,7 +42,10 @@ import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregatorFactory import org.elasticsearch.search.aggregations.metrics.sum.Sum; import org.elasticsearch.test.ElasticsearchIntegrationTest; +import java.io.IOException; +import java.util.ArrayList; import java.util.HashMap; +import java.util.Iterator; import java.util.List; import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; @@ -67,7 +70,7 @@ import static org.hamcrest.core.IsNull.notNullValue; * the growth of dynamic arrays is tested. */ @Slow -public class RandomTests extends ElasticsearchIntegrationTest { +public class EquivalenceTests extends ElasticsearchIntegrationTest { // Make sure that unordered, reversed, disjoint and/or overlapping ranges are supported // Duel with filters @@ -380,4 +383,56 @@ public class RandomTests extends ElasticsearchIntegrationTest { assertEquals(value >= 6 ? value : 0, sum.getValue(), 0d); } + private void assertEquals(Terms t1, Terms t2) { + List t1Buckets = t1.getBuckets(); + List t2Buckets = t1.getBuckets(); + assertEquals(t1Buckets.size(), t2Buckets.size()); + for (Iterator it1 = t1Buckets.iterator(), it2 = t2Buckets.iterator(); it1.hasNext(); ) { + final Terms.Bucket b1 = it1.next(); + final Terms.Bucket b2 = it2.next(); + assertEquals(b1.getDocCount(), b2.getDocCount()); + assertEquals(b1.getKey(), b2.getKey()); + } + } + + public void testDuelDepthBreadthFirst() throws Exception { + createIndex("idx"); + final int numDocs = randomIntBetween(100, 500); + List reqs = new ArrayList<>(); + for (int i = 0; i < numDocs; ++i) { + final int v1 = randomInt(1 << randomInt(7)); + final int v2 = randomInt(1 << randomInt(7)); + final int v3 = randomInt(1 << randomInt(7)); + reqs.add(client().prepareIndex("idx", "type").setSource("f1", v1, "f2", v2, "f3", v3)); + } + indexRandom(true, reqs); + + final SearchResponse r1 = client().prepareSearch("idx").addAggregation( + terms("f1").field("f1").collectMode(SubAggCollectionMode.DEPTH_FIRST) + .subAggregation(terms("f2").field("f2").collectMode(SubAggCollectionMode.DEPTH_FIRST) + .subAggregation(terms("f3").field("f3").collectMode(SubAggCollectionMode.DEPTH_FIRST)))).get(); + assertSearchResponse(r1); + final SearchResponse r2 = client().prepareSearch("idx").addAggregation( + terms("f1").field("f1").collectMode(SubAggCollectionMode.BREADTH_FIRST) + .subAggregation(terms("f2").field("f2").collectMode(SubAggCollectionMode.BREADTH_FIRST) + .subAggregation(terms("f3").field("f3").collectMode(SubAggCollectionMode.BREADTH_FIRST)))).get(); + assertSearchResponse(r2); + + final Terms t1 = r1.getAggregations().get("f1"); + final Terms t2 = r2.getAggregations().get("f1"); + assertEquals(t1, t2); + for (Terms.Bucket b1 : t1.getBuckets()) { + final Terms.Bucket b2 = t2.getBucketByKey(b1.getKeyAsString()); + final Terms sub1 = b1.getAggregations().get("f2"); + final Terms sub2 = b2.getAggregations().get("f2"); + assertEquals(sub1, sub2); + for (Terms.Bucket subB1 : sub1.getBuckets()) { + final Terms.Bucket subB2 = sub2.getBucketByKey(subB1.getKeyAsString()); + final Terms subSub1 = subB1.getAggregations().get("f3"); + final Terms subSub2 = subB2.getAggregations().get("f3"); + assertEquals(subSub1, subSub2); + } + } + } + } diff --git a/src/test/java/org/elasticsearch/search/aggregations/bucket/TopHitsTests.java b/src/test/java/org/elasticsearch/search/aggregations/bucket/TopHitsTests.java index 3e51fe9ccd2..0b8b2ba810f 100644 --- a/src/test/java/org/elasticsearch/search/aggregations/bucket/TopHitsTests.java +++ b/src/test/java/org/elasticsearch/search/aggregations/bucket/TopHitsTests.java @@ -46,7 +46,6 @@ import org.junit.Test; import java.util.ArrayList; import java.util.Iterator; import java.util.List; -import java.util.Map; import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; import static org.elasticsearch.common.xcontent.XContentFactory.smileBuilder; @@ -271,6 +270,38 @@ public class TopHitsTests extends ElasticsearchIntegrationTest { } } + @Test + public void testBreadthFirst() throws Exception { + // breadth_first will be ignored since we need scores + SearchResponse response = client().prepareSearch("idx").setTypes("type") + .addAggregation(terms("terms") + .executionHint(randomExecutionHint()) + .collectMode(SubAggCollectionMode.BREADTH_FIRST) + .field(TERMS_AGGS_FIELD) + .subAggregation(topHits("hits").setSize(3)) + ).get(); + + assertSearchResponse(response); + + Terms terms = response.getAggregations().get("terms"); + assertThat(terms, notNullValue()); + assertThat(terms.getName(), equalTo("terms")); + assertThat(terms.getBuckets().size(), equalTo(5)); + + for (int i = 0; i < 5; i++) { + Terms.Bucket bucket = terms.getBucketByKey("val" + i); + assertThat(bucket, notNullValue()); + assertThat(key(bucket), equalTo("val" + i)); + assertThat(bucket.getDocCount(), equalTo(10l)); + TopHits topHits = bucket.getAggregations().get("hits"); + SearchHits hits = topHits.getHits(); + assertThat(hits.totalHits(), equalTo(10l)); + assertThat(hits.getHits().length, equalTo(3)); + + assertThat(hits.getAt(0).sourceAsMap().size(), equalTo(4)); + } + } + @Test public void testBasics_getProperty() throws Exception { SearchResponse searchResponse = client().prepareSearch("idx").setQuery(matchAllQuery()) @@ -531,37 +562,6 @@ public class TopHitsTests extends ElasticsearchIntegrationTest { assertThat(e.getMessage(), containsString("Aggregator [top_tags_hits] of type [top_hits] cannot accept sub-aggregations")); } } - - @Test - public void testFailDeferredOnlyWhenScorerIsUsed() throws Exception { - // No track_scores or score based sort defined in top_hits agg, so don't fail: - SearchResponse response = client().prepareSearch("idx") - .setTypes("type") - .addAggregation( - terms("terms").executionHint(randomExecutionHint()).field(TERMS_AGGS_FIELD) - .collectMode(SubAggCollectionMode.BREADTH_FIRST) - .subAggregation(topHits("hits").addSort(SortBuilders.fieldSort(SORT_FIELD).order(SortOrder.DESC)))) - .get(); - assertSearchResponse(response); - - // Score based, so fail with deferred aggs: - try { - client().prepareSearch("idx") - .setTypes("type") - .addAggregation( - terms("terms").executionHint(randomExecutionHint()).field(TERMS_AGGS_FIELD) - .collectMode(SubAggCollectionMode.BREADTH_FIRST) - .subAggregation(topHits("hits"))) - .get(); - fail(); - } catch (Exception e) { - // It is considered a parse failure if the search request asks for top_hits - // under an aggregation with collect_mode set to breadth_first as this would - // require the buffering of scores alongside each document ID and that is a - // a RAM cost we are not willing to pay - assertThat(e.getMessage(), containsString("ElasticsearchIllegalStateException")); - } - } @Test public void testEmptyIndex() throws Exception { diff --git a/src/test/java/org/elasticsearch/search/aggregations/bucket/nested/NestedAggregatorTest.java b/src/test/java/org/elasticsearch/search/aggregations/bucket/nested/NestedAggregatorTest.java index 7cdff38d7c8..cea6efd8747 100644 --- a/src/test/java/org/elasticsearch/search/aggregations/bucket/nested/NestedAggregatorTest.java +++ b/src/test/java/org/elasticsearch/search/aggregations/bucket/nested/NestedAggregatorTest.java @@ -125,6 +125,7 @@ public class NestedAggregatorTest extends ElasticsearchSingleNodeLuceneTestCase searchContext.aggregations(new SearchContextAggregations(factories)); Aggregator[] aggs = factories.createTopLevelAggregators(context); BucketCollector collector = BucketCollector.wrap(Arrays.asList(aggs)); + collector.preCollection(); // A regular search always exclude nested docs, so we use NonNestedDocsFilter.INSTANCE here (otherwise MatchAllDocsQuery would be sufficient) // We exclude root doc with uid type#2, this will trigger the bug if we don't reset the root doc when we process a new segment, because // root doc type#3 and root doc type#1 have the same segment docid