From 6b16b32174063a17d9755960479766358bfc3700 Mon Sep 17 00:00:00 2001
From: Adrien Grand
Date: Fri, 3 Apr 2015 11:13:19 +0200
Subject: [PATCH] Aggregations: Fix multi-level breadth-first aggregations.

The refactoring in #9544 introduced a regression that broke multi-level
aggregations using breadth-first collection. Sub-aggregators created their
deferred collectors before their parent aggregator did, so the parent then
collected the sub-aggregators directly instead of going through the
deferred wrapper. This commit fixes the issue, but we should still try to
simplify the pre/post collection logic that we have.

Also, `breadth_first` is now automatically ignored if the sub-aggregators
need scores (just like we ignore `execution_mode` when the value does not
make sense, such as using ordinals on a script).

Close #9823
---
 .../percolator/QueryCollector.java            |  1 +
 .../search/aggregations/AggregationPhase.java | 25 ++++----
 .../search/aggregations/AggregatorBase.java   |  8 ++-
 .../aggregations/AggregatorFactories.java     | 13 +---
 .../bucket/DeferringBucketCollector.java      |  5 +-
 .../bucket/terms/TermsAggregator.java         |  8 ++-
 ...RandomTests.java => EquivalenceTests.java} | 57 ++++++++++++++++-
 .../aggregations/bucket/TopHitsTests.java     | 64 +++++++++----------
 .../bucket/nested/NestedAggregatorTest.java   |  1 +
 9 files changed, 122 insertions(+), 60 deletions(-)
 rename src/test/java/org/elasticsearch/search/aggregations/{RandomTests.java => EquivalenceTests.java} (87%)
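Illustration only, not part of the patch: the regression described above shows
up with a multi-level breadth_first request such as the one below, written
against the same Java test API used by the new tests in this patch (client(),
terms(...), collectMode(...) inside an ElasticsearchIntegrationTest). The index
"idx" and the fields f1/f2/f3 mirror the new testDuelDepthBreadthFirst test and
are placeholders.

    // Three nested terms aggregations, all collected breadth-first. Before this
    // fix, the parent could collect the deferred f2/f3 sub-aggregators directly
    // instead of through their deferred wrapper; after it, the buckets match the
    // depth_first results.
    SearchResponse response = client().prepareSearch("idx")
            .addAggregation(terms("f1").field("f1").collectMode(SubAggCollectionMode.BREADTH_FIRST)
                    .subAggregation(terms("f2").field("f2").collectMode(SubAggCollectionMode.BREADTH_FIRST)
                            .subAggregation(terms("f3").field("f3").collectMode(SubAggCollectionMode.BREADTH_FIRST))))
            .get();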
diff --git a/src/main/java/org/elasticsearch/percolator/QueryCollector.java b/src/main/java/org/elasticsearch/percolator/QueryCollector.java
index 2653c2de1b7..f289e188167 100644
--- a/src/main/java/org/elasticsearch/percolator/QueryCollector.java
+++ b/src/main/java/org/elasticsearch/percolator/QueryCollector.java
@@ -92,6 +92,7 @@ abstract class QueryCollector extends SimpleCollector {
             context.aggregations().aggregators(aggregators);
         }
         aggregatorCollector = BucketCollector.wrap(aggregatorCollectors);
+        aggregatorCollector.preCollection();
     }

     public void postMatch(int doc) throws IOException {
diff --git a/src/main/java/org/elasticsearch/search/aggregations/AggregationPhase.java b/src/main/java/org/elasticsearch/search/aggregations/AggregationPhase.java
index 9d627310142..387d365c62d 100644
--- a/src/main/java/org/elasticsearch/search/aggregations/AggregationPhase.java
+++ b/src/main/java/org/elasticsearch/search/aggregations/AggregationPhase.java
@@ -76,18 +76,20 @@ public class AggregationPhase implements SearchPhase {
             Aggregator[] aggregators;
             try {
                 aggregators = context.aggregations().factories().createTopLevelAggregators(aggregationContext);
+                for (int i = 0; i < aggregators.length; i++) {
+                    if (aggregators[i] instanceof GlobalAggregator == false) {
+                        collectors.add(aggregators[i]);
+                    }
+                }
+                context.aggregations().aggregators(aggregators);
+                if (!collectors.isEmpty()) {
+                    final BucketCollector collector = BucketCollector.wrap(collectors);
+                    collector.preCollection();
+                    context.searcher().queryCollectors().put(AggregationPhase.class, collector);
+                }
             } catch (IOException e) {
                 throw new AggregationInitializationException("Could not initialize aggregators", e);
             }
-            for (int i = 0; i < aggregators.length; i++) {
-                if (aggregators[i] instanceof GlobalAggregator == false) {
-                    collectors.add(aggregators[i]);
-                }
-            }
-            context.aggregations().aggregators(aggregators);
-            if (!collectors.isEmpty()) {
-                context.searcher().queryCollectors().put(AggregationPhase.class, (BucketCollector.wrap(collectors)));
-            }
         }
     }
@@ -113,14 +115,15 @@ public class AggregationPhase implements SearchPhase {

         // optimize the global collector based execution
         if (!globals.isEmpty()) {
-            BucketCollector collector = BucketCollector.wrap(globals);
+            BucketCollector globalsCollector = BucketCollector.wrap(globals);
             Query query = new ConstantScoreQuery(Queries.MATCH_ALL_FILTER);
             Filter searchFilter = context.searchFilter(context.types());
             if (searchFilter != null) {
                 query = new FilteredQuery(query, searchFilter);
             }
             try {
-                context.searcher().search(query, collector);
+                globalsCollector.preCollection();
+                context.searcher().search(query, globalsCollector);
             } catch (Exception e) {
                 throw new QueryPhaseExecutionException(context, "Failed to execute global aggregators", e);
             }
diff --git a/src/main/java/org/elasticsearch/search/aggregations/AggregatorBase.java b/src/main/java/org/elasticsearch/search/aggregations/AggregatorBase.java
index 874ad71cfa8..4d83603a088 100644
--- a/src/main/java/org/elasticsearch/search/aggregations/AggregatorBase.java
+++ b/src/main/java/org/elasticsearch/search/aggregations/AggregatorBase.java
@@ -99,7 +99,12 @@ public abstract class AggregatorBase extends Aggregator {
      */
     @Override
     public boolean needsScores() {
-        return collectableSubAggregators.needsScores();
+        for (Aggregator agg : subAggregators) {
+            if (agg.needsScores()) {
+                return true;
+            }
+        }
+        return false;
     }

     public Map<String, Object> metaData() {
@@ -145,6 +150,7 @@ public abstract class AggregatorBase extends Aggregator {
         }
         collectableSubAggregators = BucketCollector.wrap(collectors);
         doPreCollection();
+        collectableSubAggregators.preCollection();
     }

     /**
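Sketch, not part of the patch: the changes above establish a simple contract for
code that drives aggregators by hand. Wrap the top-level aggregators, call
preCollection() on the wrapper before any document is collected, and only then
run the search. The NestedAggregatorTest change at the end of this patch follows
the same pattern; factories, context, searcher and query stand for objects the
calling code already has in hand.

    // Minimal driver sketch, mirroring AggregationPhase and NestedAggregatorTest.
    Aggregator[] aggs = factories.createTopLevelAggregators(context);
    BucketCollector collector = BucketCollector.wrap(Arrays.asList(aggs));
    collector.preCollection();          // must happen before collection starts
    searcher.search(query, collector);  // e.g. context.searcher().search(query, collector)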
diff --git a/src/main/java/org/elasticsearch/search/aggregations/AggregatorFactories.java b/src/main/java/org/elasticsearch/search/aggregations/AggregatorFactories.java
index 10ea7f74c2c..98cc7e39e1a 100644
--- a/src/main/java/org/elasticsearch/search/aggregations/AggregatorFactories.java
+++ b/src/main/java/org/elasticsearch/search/aggregations/AggregatorFactories.java
@@ -44,15 +44,6 @@ public class AggregatorFactories {
         this.factories = factories;
     }

-    private static Aggregator createAndRegisterContextAware(AggregationContext context, AggregatorFactory factory, Aggregator parent, boolean collectsFromSingleBucket) throws IOException {
-        final Aggregator aggregator = factory.create(context, parent, collectsFromSingleBucket);
-        // Once the aggregator is fully constructed perform any initialisation -
-        // can't do everything in constructors if Aggregator base class needs
-        // to delegate to subclasses as part of construction.
-        aggregator.preCollection();
-        return aggregator;
-    }
-
     /**
      * Create all aggregators so that they can be consumed with multiple buckets.
      */
@@ -64,7 +55,7 @@ public class AggregatorFactories {
             // propagate the fact that only bucket 0 will be collected with single-bucket
             // aggs
             final boolean collectsFromSingleBucket = false;
-            aggregators[i] = createAndRegisterContextAware(parent.context(), factories[i], parent, collectsFromSingleBucket);
+            aggregators[i] = factories[i].create(parent.context(), parent, collectsFromSingleBucket);
         }
         return aggregators;
     }
@@ -75,7 +66,7 @@ public class AggregatorFactories {
         for (int i = 0; i < factories.length; i++) {
             // top-level aggs only get called with bucket 0
             final boolean collectsFromSingleBucket = true;
-            aggregators[i] = createAndRegisterContextAware(ctx, factories[i], null, collectsFromSingleBucket);
+            aggregators[i] = factories[i].create(ctx, null, collectsFromSingleBucket);
         }
         return aggregators;
     }
diff --git a/src/main/java/org/elasticsearch/search/aggregations/bucket/DeferringBucketCollector.java b/src/main/java/org/elasticsearch/search/aggregations/bucket/DeferringBucketCollector.java
index 9249da7334b..09686e662d5 100644
--- a/src/main/java/org/elasticsearch/search/aggregations/bucket/DeferringBucketCollector.java
+++ b/src/main/java/org/elasticsearch/search/aggregations/bucket/DeferringBucketCollector.java
@@ -73,7 +73,7 @@ public final class DeferringBucketCollector extends BucketCollector {
         if (collector == null) {
             throw new ElasticsearchIllegalStateException();
         }
-        return collector.needsScores();
+        return false;
     }

     /** Set the deferred collectors. */
@@ -138,6 +138,9 @@ public final class DeferringBucketCollector extends BucketCollector {
         this.selectedBuckets = hash;

         collector.preCollection();
+        if (collector.needsScores()) {
+            throw new ElasticsearchIllegalStateException("Cannot defer if scores are needed");
+        }

         for (Entry entry : entries) {
             final LeafBucketCollector leafCollector = collector.getLeafCollector(entry.context);
diff --git a/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/TermsAggregator.java b/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/TermsAggregator.java
index ef254bb0594..4cfe549a452 100644
--- a/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/TermsAggregator.java
+++ b/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/TermsAggregator.java
@@ -43,7 +43,7 @@ public abstract class TermsAggregator extends BucketsAggregator {
         private Explicit<Long> shardMinDocCount;
         private Explicit<Integer> requiredSize;
         private Explicit<Integer> shardSize;
-
+
         public BucketCountThresholds(long minDocCount, long shardMinDocCount, int requiredSize, int shardSize) {
             this.minDocCount = new Explicit<>(minDocCount, false);
             this.shardMinDocCount = new Explicit<>(shardMinDocCount, false);
@@ -157,7 +157,9 @@ public abstract class TermsAggregator extends BucketsAggregator {

     @Override
     protected boolean shouldDefer(Aggregator aggregator) {
-        return (collectMode == SubAggCollectionMode.BREADTH_FIRST) && (!aggsUsedForSorting.contains(aggregator));
+        return collectMode == SubAggCollectionMode.BREADTH_FIRST
+                && aggregator.needsScores() == false
+                && !aggsUsedForSorting.contains(aggregator);
     }
-
+
 }
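Illustrative restatement, not an Elasticsearch API: the new shouldDefer rule
above boils down to three conditions that must all hold before a sub-aggregator
is pushed to the second, breadth-first pass. A scoring sub-aggregation such as
top_hits fails the needsScores() check, so it is simply collected eagerly
instead of the request being rejected.

    // Paraphrase of TermsAggregator#shouldDefer after this patch; the helper
    // name "deferrable" is made up for the example.
    static boolean deferrable(SubAggCollectionMode mode, Aggregator sub, Set<Aggregator> aggsUsedForSorting) {
        return mode == SubAggCollectionMode.BREADTH_FIRST      // only breadth_first defers at all
                && sub.needsScores() == false                  // scores cannot be replayed in pass two
                && aggsUsedForSorting.contains(sub) == false;  // sort keys must be computed in pass one
    }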
diff --git a/src/test/java/org/elasticsearch/search/aggregations/RandomTests.java b/src/test/java/org/elasticsearch/search/aggregations/EquivalenceTests.java
similarity index 87%
rename from src/test/java/org/elasticsearch/search/aggregations/RandomTests.java
rename to src/test/java/org/elasticsearch/search/aggregations/EquivalenceTests.java
index c87f00bdadc..5079e6730dd 100644
--- a/src/test/java/org/elasticsearch/search/aggregations/RandomTests.java
+++ b/src/test/java/org/elasticsearch/search/aggregations/EquivalenceTests.java
@@ -42,7 +42,10 @@ import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregatorFactory
 import org.elasticsearch.search.aggregations.metrics.sum.Sum;
 import org.elasticsearch.test.ElasticsearchIntegrationTest;

+import java.io.IOException;
+import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.List;

 import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
@@ -67,7 +70,7 @@ import static org.hamcrest.core.IsNull.notNullValue;
  * the growth of dynamic arrays is tested.
  */
 @Slow
-public class RandomTests extends ElasticsearchIntegrationTest {
+public class EquivalenceTests extends ElasticsearchIntegrationTest {

     // Make sure that unordered, reversed, disjoint and/or overlapping ranges are supported
     // Duel with filters
@@ -380,4 +383,56 @@ public class RandomTests extends ElasticsearchIntegrationTest {
         assertEquals(value >= 6 ? value : 0, sum.getValue(), 0d);
     }

+    private void assertEquals(Terms t1, Terms t2) {
+        List<Terms.Bucket> t1Buckets = t1.getBuckets();
+        List<Terms.Bucket> t2Buckets = t2.getBuckets();
+        assertEquals(t1Buckets.size(), t2Buckets.size());
+        for (Iterator<Terms.Bucket> it1 = t1Buckets.iterator(), it2 = t2Buckets.iterator(); it1.hasNext(); ) {
+            final Terms.Bucket b1 = it1.next();
+            final Terms.Bucket b2 = it2.next();
+            assertEquals(b1.getDocCount(), b2.getDocCount());
+            assertEquals(b1.getKey(), b2.getKey());
+        }
+    }
+
+    public void testDuelDepthBreadthFirst() throws Exception {
+        createIndex("idx");
+        final int numDocs = randomIntBetween(100, 500);
+        List<IndexRequestBuilder> reqs = new ArrayList<>();
+        for (int i = 0; i < numDocs; ++i) {
+            final int v1 = randomInt(1 << randomInt(7));
+            final int v2 = randomInt(1 << randomInt(7));
+            final int v3 = randomInt(1 << randomInt(7));
+            reqs.add(client().prepareIndex("idx", "type").setSource("f1", v1, "f2", v2, "f3", v3));
+        }
+        indexRandom(true, reqs);
+
+        final SearchResponse r1 = client().prepareSearch("idx").addAggregation(
+                terms("f1").field("f1").collectMode(SubAggCollectionMode.DEPTH_FIRST)
+                        .subAggregation(terms("f2").field("f2").collectMode(SubAggCollectionMode.DEPTH_FIRST)
+                                .subAggregation(terms("f3").field("f3").collectMode(SubAggCollectionMode.DEPTH_FIRST)))).get();
+        assertSearchResponse(r1);
+        final SearchResponse r2 = client().prepareSearch("idx").addAggregation(
+                terms("f1").field("f1").collectMode(SubAggCollectionMode.BREADTH_FIRST)
+                        .subAggregation(terms("f2").field("f2").collectMode(SubAggCollectionMode.BREADTH_FIRST)
+                                .subAggregation(terms("f3").field("f3").collectMode(SubAggCollectionMode.BREADTH_FIRST)))).get();
+        assertSearchResponse(r2);
+
+        final Terms t1 = r1.getAggregations().get("f1");
+        final Terms t2 = r2.getAggregations().get("f1");
+        assertEquals(t1, t2);
+        for (Terms.Bucket b1 : t1.getBuckets()) {
+            final Terms.Bucket b2 = t2.getBucketByKey(b1.getKeyAsString());
+            final Terms sub1 = b1.getAggregations().get("f2");
+            final Terms sub2 = b2.getAggregations().get("f2");
+            assertEquals(sub1, sub2);
+            for (Terms.Bucket subB1 : sub1.getBuckets()) {
+                final Terms.Bucket subB2 = sub2.getBucketByKey(subB1.getKeyAsString());
+                final Terms subSub1 = subB1.getAggregations().get("f3");
+                final Terms subSub2 = subB2.getAggregations().get("f3");
+                assertEquals(subSub1, subSub2);
+            }
+        }
+    }
+
 }
diff --git a/src/test/java/org/elasticsearch/search/aggregations/bucket/TopHitsTests.java b/src/test/java/org/elasticsearch/search/aggregations/bucket/TopHitsTests.java
index 3e51fe9ccd2..0b8b2ba810f 100644
--- a/src/test/java/org/elasticsearch/search/aggregations/bucket/TopHitsTests.java
+++ b/src/test/java/org/elasticsearch/search/aggregations/bucket/TopHitsTests.java
@@ -46,7 +46,6 @@ import org.junit.Test;
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
-import java.util.Map;

 import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
 import static org.elasticsearch.common.xcontent.XContentFactory.smileBuilder;
@@ -271,6 +270,38 @@ public class TopHitsTests extends ElasticsearchIntegrationTest {
         }
     }

+    @Test
+    public void testBreadthFirst() throws Exception {
+        // breadth_first will be ignored since we need scores
+        SearchResponse response = client().prepareSearch("idx").setTypes("type")
+                .addAggregation(terms("terms")
+                        .executionHint(randomExecutionHint())
+                        .collectMode(SubAggCollectionMode.BREADTH_FIRST)
+                        .field(TERMS_AGGS_FIELD)
+                        .subAggregation(topHits("hits").setSize(3))
+                ).get();
+
+        assertSearchResponse(response);
+
+        Terms terms = response.getAggregations().get("terms");
+        assertThat(terms, notNullValue());
+        assertThat(terms.getName(), equalTo("terms"));
+        assertThat(terms.getBuckets().size(), equalTo(5));
+
+        for (int i = 0; i < 5; i++) {
+            Terms.Bucket bucket = terms.getBucketByKey("val" + i);
+            assertThat(bucket, notNullValue());
+            assertThat(key(bucket), equalTo("val" + i));
+            assertThat(bucket.getDocCount(), equalTo(10l));
+            TopHits topHits = bucket.getAggregations().get("hits");
+            SearchHits hits = topHits.getHits();
+            assertThat(hits.totalHits(), equalTo(10l));
+            assertThat(hits.getHits().length, equalTo(3));
+
+            assertThat(hits.getAt(0).sourceAsMap().size(), equalTo(4));
+        }
+    }
+
     @Test
     public void testBasics_getProperty() throws Exception {
         SearchResponse searchResponse = client().prepareSearch("idx").setQuery(matchAllQuery())
@@ -531,37 +562,6 @@ public class TopHitsTests extends ElasticsearchIntegrationTest {
             assertThat(e.getMessage(), containsString("Aggregator [top_tags_hits] of type [top_hits] cannot accept sub-aggregations"));
         }
     }
-
-    @Test
-    public void testFailDeferredOnlyWhenScorerIsUsed() throws Exception {
-        // No track_scores or score based sort defined in top_hits agg, so don't fail:
-        SearchResponse response = client().prepareSearch("idx")
-                .setTypes("type")
-                .addAggregation(
-                        terms("terms").executionHint(randomExecutionHint()).field(TERMS_AGGS_FIELD)
-                                .collectMode(SubAggCollectionMode.BREADTH_FIRST)
-                                .subAggregation(topHits("hits").addSort(SortBuilders.fieldSort(SORT_FIELD).order(SortOrder.DESC))))
-                .get();
-        assertSearchResponse(response);
-
-        // Score based, so fail with deferred aggs:
-        try {
-            client().prepareSearch("idx")
-                    .setTypes("type")
-                    .addAggregation(
-                            terms("terms").executionHint(randomExecutionHint()).field(TERMS_AGGS_FIELD)
-                                    .collectMode(SubAggCollectionMode.BREADTH_FIRST)
-                                    .subAggregation(topHits("hits")))
-                    .get();
-            fail();
-        } catch (Exception e) {
-            // It is considered a parse failure if the search request asks for top_hits
-            // under an aggregation with collect_mode set to breadth_first as this would
-            // require the buffering of scores alongside each document ID and that is a
-            // a RAM cost we are not willing to pay
-            assertThat(e.getMessage(), containsString("ElasticsearchIllegalStateException"));
-        }
-    }

     @Test
     public void testEmptyIndex() throws Exception {
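Illustration only, not part of the patch: taken together, the removed
testFailDeferredOnlyWhenScorerIsUsed and the new testBreadthFirst document the
behaviour change for users. A request that combines breadth_first with a
scoring top_hits sub-aggregation used to fail with
ElasticsearchIllegalStateException and now simply runs, because the terms
aggregation no longer defers the scoring child. The index, type and field names
below are the fixtures used throughout this test class.

    SearchResponse response = client().prepareSearch("idx").setTypes("type")
            .addAggregation(terms("terms").field(TERMS_AGGS_FIELD)
                    .collectMode(SubAggCollectionMode.BREADTH_FIRST)
                    .subAggregation(topHits("hits")))   // needs scores, so it is collected eagerly
            .get();
    assertSearchResponse(response);                     // no longer throws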
diff --git a/src/test/java/org/elasticsearch/search/aggregations/bucket/nested/NestedAggregatorTest.java b/src/test/java/org/elasticsearch/search/aggregations/bucket/nested/NestedAggregatorTest.java
index 7cdff38d7c8..cea6efd8747 100644
--- a/src/test/java/org/elasticsearch/search/aggregations/bucket/nested/NestedAggregatorTest.java
+++ b/src/test/java/org/elasticsearch/search/aggregations/bucket/nested/NestedAggregatorTest.java
@@ -125,6 +125,7 @@ public class NestedAggregatorTest extends ElasticsearchSingleNodeLuceneTestCase
         searchContext.aggregations(new SearchContextAggregations(factories));
         Aggregator[] aggs = factories.createTopLevelAggregators(context);
         BucketCollector collector = BucketCollector.wrap(Arrays.asList(aggs));
+        collector.preCollection();
         // A regular search always exclude nested docs, so we use NonNestedDocsFilter.INSTANCE here (otherwise MatchAllDocsQuery would be sufficient)
         // We exclude root doc with uid type#2, this will trigger the bug if we don't reset the root doc when we process a new segment, because
         // root doc type#3 and root doc type#1 have the same segment docid