From 4d5be7c81778c2e32f8d6b63c28dbc871e876593 Mon Sep 17 00:00:00 2001
From: Nik Everett
Date: Wed, 27 May 2020 12:03:28 -0400
Subject: [PATCH] Save memory on numeric sig terms when not top (backport of
 #56789) (#57221)

This saves memory when running a numeric significant terms aggregation
that is not at the top level by merging its collection into the numeric
terms aggregator and relying on the optimization that we made in #55873.
---
 docs/reference/search/profile.asciidoc        |  10 +-
 .../test/search.aggregation/20_terms.yml      |   3 +-
 .../SignificantTermsSignificanceScoreIT.java  |   7 +-
 .../bucket/terms/DoubleTerms.java             |   2 +-
 .../bucket/terms/DoubleTermsAggregator.java   |  78 ---
 .../bucket/terms/LongTermsAggregator.java     | 196 ------
 .../bucket/terms/NumericTermsAggregator.java  | 655 ++++++++++++++++++
 .../terms/SignificantLongTermsAggregator.java | 136 ----
 .../SignificantTermsAggregatorFactory.java    |  49 +-
 .../SignificantTermsAggregatorSupplier.java   |   3 +
 .../bucket/terms/TermsAggregatorFactory.java  |  26 +-
 .../bucket/nested/NestedAggregatorTests.java  |   4 +-
 .../nested/ReverseNestedAggregatorTests.java  |   4 +-
 .../SignificantTermsAggregatorTests.java      |  50 ++
 .../terms/SignificantTextAggregatorTests.java |   2 +
 15 files changed, 772 insertions(+), 453 deletions(-)
 delete mode 100644 server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/DoubleTermsAggregator.java
 delete mode 100644 server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/LongTermsAggregator.java
 create mode 100644 server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/NumericTermsAggregator.java
 delete mode 100644 server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/SignificantLongTermsAggregator.java

diff --git a/docs/reference/search/profile.asciidoc b/docs/reference/search/profile.asciidoc
index 342ca2c6a7b..63486c87c56 100644
--- a/docs/reference/search/profile.asciidoc
+++ b/docs/reference/search/profile.asciidoc
@@ -782,7 +782,7 @@ This yields the following aggregation profile output:
     {
       "aggregations" : [
         {
-          "type" : "LongTermsAggregator",
+          "type" : "NumericTermsAggregator",
           "description" : "my_scoped_agg",
           "time_in_nanos" : 195386,
           "breakdown" : {
@@ -796,6 +796,7 @@ This yields the following aggregation profile output:
             "collect_count" : 4
           },
           "debug": {
+            "result_strategy": "long_terms",
             "total_buckets": 4
           }
         },
@@ -815,7 +816,7 @@ This yields the following aggregation profile output:
           },
           "children" : [
             {
-              "type" : "LongTermsAggregator",
+              "type" : "NumericTermsAggregator",
               "description" : "my_level_agg",
               "time_in_nanos" : 160329,
               "breakdown" : {
@@ -829,6 +830,7 @@ This yields the following aggregation profile output:
                 "collect_count" : 4,
               },
               "debug": {
+                "result_strategy": "long_terms",
                 "total_buckets": 4
               }
             }
@@ -845,10 +847,10 @@ This yields the following aggregation profile output:
 // TESTRESPONSE[s/"id": "\[P6-vulHtQRWuD4YnubWb7A\]\[test\]\[0\]"/"id": $body.profile.shards.0.id/]
 
 From the profile structure we can see that the `my_scoped_agg` is internally
-being run as a `LongTermsAggregator` (because the field it is aggregating,
+being run as a `NumericTermsAggregator` (because the field it is aggregating,
 `likes`, is a numeric field). At the same level, we see a `GlobalAggregator`
 which comes from `my_global_agg`. That aggregation then has a child
-`LongTermsAggregator` which comes from the second term's aggregation on `likes`.
+`NumericTermsAggregator` which comes from the second `terms` aggregation on `likes`.
 
 The `time_in_nanos` field shows the time executed by each aggregation, and is
 inclusive of all children. While the overall time is useful, the `breakdown`
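As context for the `result_strategy` entry that now shows up in the profile's `debug` section, here is a minimal, self-contained sketch of the design this patch introduces (invented names, not the Elasticsearch classes): a single numeric collector owns the per-term counts, and a pluggable result strategy only decides how those counts are turned into results. That is what lets a `significant_terms` aggregation on a numeric field reuse the `terms` collection path instead of keeping its own copy of the collection state.

    import java.util.HashMap;
    import java.util.Map;

    // Sketch of the collector/strategy split in NumericTermsAggregator: the
    // collector owns the counts, the strategy only shapes the final result.
    public class ResultStrategySketch {
        interface ResultStrategy {
            String describe(); // reported in the profile as "result_strategy"

            String buildResult(Map<Long, Long> countPerTerm);
        }

        static class LongTermsResults implements ResultStrategy {
            @Override
            public String describe() {
                return "long_terms";
            }

            @Override
            public String buildResult(Map<Long, Long> countPerTerm) {
                return "terms" + countPerTerm;
            }
        }

        static class SignificantTermsResults implements ResultStrategy {
            private final long supersetSize; // background doc count used for scoring

            SignificantTermsResults(long supersetSize) {
                this.supersetSize = supersetSize;
            }

            @Override
            public String describe() {
                return "significant_terms";
            }

            @Override
            public String buildResult(Map<Long, Long> countPerTerm) {
                StringBuilder b = new StringBuilder("significant_terms{");
                countPerTerm.forEach(
                    (term, count) -> b.append(term).append("=score(").append(count).append('/').append(supersetSize).append(") ")
                );
                return b.append('}').toString();
            }
        }

        private final Map<Long, Long> countPerTerm = new HashMap<>();
        private final ResultStrategy resultStrategy;

        ResultStrategySketch(ResultStrategy resultStrategy) {
            this.resultStrategy = resultStrategy;
        }

        // One shared collection path no matter which result is being built.
        void collect(long value) {
            countPerTerm.merge(value, 1L, Long::sum);
        }

        String buildAggregation() {
            return resultStrategy.describe() + ": " + resultStrategy.buildResult(countPerTerm);
        }

        public static void main(String[] args) {
            ResultStrategySketch terms = new ResultStrategySketch(new LongTermsResults());
            ResultStrategySketch sig = new ResultStrategySketch(new SignificantTermsResults(100));
            for (long v : new long[] { 1, 2, 2, 3 }) {
                terms.collect(v);
                sig.collect(v);
            }
            System.out.println(terms.buildAggregation()); // e.g. long_terms: terms{1=1, 2=2, 3=1}
            System.out.println(sig.buildAggregation());
        }
    }

Whether the profile reports `long_terms`, `double_terms`, or `significant_terms` is then only a question of which strategy was plugged into the shared collector.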
diff --git a/rest-api-spec/src/main/resources/rest-api-spec/test/search.aggregation/20_terms.yml b/rest-api-spec/src/main/resources/rest-api-spec/test/search.aggregation/20_terms.yml
index 03e9f850775..8ba07f1a6bf 100644
--- a/rest-api-spec/src/main/resources/rest-api-spec/test/search.aggregation/20_terms.yml
+++ b/rest-api-spec/src/main/resources/rest-api-spec/test/search.aggregation/20_terms.yml
@@ -837,7 +837,8 @@ setup:
             field: number
   - match: { aggregations.n_terms.buckets.0.key: 1 }
   - match: { aggregations.n_terms.buckets.1.key: 3 }
-  - match: { profile.shards.0.aggregations.0.type: LongTermsAggregator }
+  - match: { profile.shards.0.aggregations.0.type: NumericTermsAggregator }
   - match: { profile.shards.0.aggregations.0.description: n_terms }
   - match: { profile.shards.0.aggregations.0.breakdown.collect_count: 4 }
+  - match: { profile.shards.0.aggregations.0.debug.result_strategy: long_terms }
   - match: { profile.shards.0.aggregations.0.debug.total_buckets: 2 }

diff --git a/server/src/internalClusterTest/java/org/elasticsearch/search/aggregations/bucket/SignificantTermsSignificanceScoreIT.java b/server/src/internalClusterTest/java/org/elasticsearch/search/aggregations/bucket/SignificantTermsSignificanceScoreIT.java
index b8c98b477e9..f1eba9623f0 100644
--- a/server/src/internalClusterTest/java/org/elasticsearch/search/aggregations/bucket/SignificantTermsSignificanceScoreIT.java
+++ b/server/src/internalClusterTest/java/org/elasticsearch/search/aggregations/bucket/SignificantTermsSignificanceScoreIT.java
@@ -196,7 +196,7 @@ public class SignificantTermsSignificanceScoreIT extends ESIntegTestCase {
 
     }
 
-    public void testDeletesIssue7951() throws Exception {
+    public void testPopularTermManyDeletedDocs() throws Exception {
         String settings = "{\"index.number_of_shards\": 1, \"index.number_of_replicas\": 0}";
         assertAcked(prepareCreate(INDEX_NAME).setSettings(settings, XContentType.JSON)
             .addMapping("_doc", "text", "type=keyword", CLASS_FIELD, "type=keyword"));
@@ -238,7 +238,7 @@ public class SignificantTermsSignificanceScoreIT extends ESIntegTestCase {
                         significantTerms("sig_terms")
                             .field(TEXT_FIELD)
                             .minDocCount(1)));
-        }else
-        {
+        } else {
             request = client().prepareSearch(INDEX_NAME).setTypes(DOC_TYPE)
                 .addAggregation(
@@ -478,8 +478,7 @@ public class SignificantTermsSignificanceScoreIT extends ESIntegTestCase {
                     .subAggregation(significantText("mySignificantTerms", TEXT_FIELD)
                         .significanceHeuristic(scriptHeuristic)
                         .minDocCount(1).shardSize(2).size(2)));
-        }else
-        {
+        } else {
             request = client().prepareSearch(INDEX_NAME)
                 .addAggregation(terms("class").field(CLASS_FIELD)
                     .subAggregation(significantTerms("mySignificantTerms")

diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/DoubleTerms.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/DoubleTerms.java
index b2362360466..cf0a0642363 100644
--- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/DoubleTerms.java
+++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/DoubleTerms.java
@@ -39,7 +39,7 @@ public class DoubleTerms extends InternalMappedTerms<DoubleTerms, DoubleTerms.Bucket> {
     static class Bucket extends InternalTerms.Bucket<Bucket> {
-        private final double term;
+        double term;
 
         Bucket(double term, long docCount, InternalAggregations aggregations, boolean showDocCountError, long docCountError,
                 DocValueFormat format) {
diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/DoubleTermsAggregator.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/DoubleTermsAggregator.java
deleted file mode 100644
index 6f42cbcf6b7..00000000000
--- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/DoubleTermsAggregator.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- * Licensed to Elasticsearch under one or more contributor
- * license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright
- * ownership. Elasticsearch licenses this file to you under
- * the Apache License, Version 2.0 (the "License"); you may
- * not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.elasticsearch.search.aggregations.bucket.terms;
-
-import org.apache.lucene.index.LeafReaderContext;
-import org.apache.lucene.index.SortedNumericDocValues;
-import org.apache.lucene.util.NumericUtils;
-import org.elasticsearch.index.fielddata.FieldData;
-import org.elasticsearch.search.DocValueFormat;
-import org.elasticsearch.search.aggregations.Aggregator;
-import org.elasticsearch.search.aggregations.AggregatorFactories;
-import org.elasticsearch.search.aggregations.BucketOrder;
-import org.elasticsearch.search.aggregations.InternalAggregation;
-import org.elasticsearch.search.aggregations.bucket.terms.LongTerms.Bucket;
-import org.elasticsearch.search.aggregations.support.ValuesSource;
-import org.elasticsearch.search.aggregations.support.ValuesSource.Numeric;
-import org.elasticsearch.search.internal.SearchContext;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.Map;
-import java.util.stream.Collectors;
-
-public class DoubleTermsAggregator extends LongTermsAggregator {
-
-    DoubleTermsAggregator(String name, AggregatorFactories factories, ValuesSource.Numeric valuesSource, DocValueFormat format,
-            BucketOrder order, BucketCountThresholds bucketCountThresholds, SearchContext aggregationContext, Aggregator parent,
-            SubAggCollectionMode collectionMode, boolean showTermDocCountError, IncludeExclude.LongFilter longFilter,
-            boolean collectsFromSingleBucket, Map<String, Object> metadata) throws IOException {
-        super(name, factories, valuesSource, format, order, bucketCountThresholds, aggregationContext, parent, collectionMode,
-            showTermDocCountError, longFilter, collectsFromSingleBucket, metadata);
-    }
-
-    @Override
-    protected SortedNumericDocValues getValues(Numeric valuesSource, LeafReaderContext ctx) throws IOException {
-        return FieldData.toSortableLongBits(valuesSource.doubleValues(ctx));
-    }
-
-    @Override
-    protected InternalAggregation buildResult(long otherDocCount, List<Bucket> buckets) {
-        return convertToDouble((LongTerms) super.buildResult(otherDocCount, buckets));
-    }
-
-    @Override
-    public DoubleTerms buildEmptyAggregation() {
-        final LongTerms terms = (LongTerms) super.buildEmptyAggregation();
-        return convertToDouble(terms);
-    }
-
-    private static DoubleTerms convertToDouble(LongTerms terms) {
-        List<DoubleTerms.Bucket> buckets =
-            terms.buckets.stream().map(DoubleTermsAggregator::convertToDouble).collect(Collectors.toList());
-        return new DoubleTerms(terms.getName(), terms.order, terms.requiredSize, terms.minDocCount,
-            terms.getMetadata(), terms.format, terms.shardSize, terms.showTermDocCountError, terms.otherDocCount, buckets,
-            terms.docCountError);
-    }
-
-    private static DoubleTerms.Bucket convertToDouble(LongTerms.Bucket bucket) {
-        double value = NumericUtils.sortableLongToDouble(bucket.term);
-        return new DoubleTerms.Bucket(value, bucket.docCount, bucket.aggregations, bucket.showDocCountError, bucket.docCountError,
-            bucket.format);
-    }
-}
diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/LongTermsAggregator.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/LongTermsAggregator.java
deleted file mode 100644
index 109a318f67f..00000000000
--- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/LongTermsAggregator.java
+++ /dev/null
@@ -1,196 +0,0 @@
-/*
- * Licensed to Elasticsearch under one or more contributor
- * license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright
- * ownership. Elasticsearch licenses this file to you under
- * the Apache License, Version 2.0 (the "License"); you may
- * not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.elasticsearch.search.aggregations.bucket.terms;
-
-import org.apache.lucene.index.LeafReaderContext;
-import org.apache.lucene.index.SortedNumericDocValues;
-import org.apache.lucene.search.ScoreMode;
-import org.elasticsearch.common.lease.Releasables;
-import org.elasticsearch.search.DocValueFormat;
-import org.elasticsearch.search.aggregations.Aggregator;
-import org.elasticsearch.search.aggregations.AggregatorFactories;
-import org.elasticsearch.search.aggregations.BucketOrder;
-import org.elasticsearch.search.aggregations.InternalAggregation;
-import org.elasticsearch.search.aggregations.InternalOrder;
-import org.elasticsearch.search.aggregations.LeafBucketCollector;
-import org.elasticsearch.search.aggregations.LeafBucketCollectorBase;
-import org.elasticsearch.search.aggregations.bucket.terms.IncludeExclude.LongFilter;
-import org.elasticsearch.search.aggregations.bucket.terms.LongKeyedBucketOrds.BucketOrdsEnum;
-import org.elasticsearch.search.aggregations.bucket.terms.LongTerms.Bucket;
-import org.elasticsearch.search.aggregations.support.ValuesSource;
-import org.elasticsearch.search.internal.SearchContext;
-
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Map;
-import java.util.function.BiConsumer;
-
-import static java.util.Collections.emptyList;
-
-public class LongTermsAggregator extends TermsAggregator {
-
-    protected final ValuesSource.Numeric valuesSource;
-    protected final LongKeyedBucketOrds bucketOrds;
-    private boolean showTermDocCountError;
-    private LongFilter longFilter;
-
-    public LongTermsAggregator(String name, AggregatorFactories factories, ValuesSource.Numeric valuesSource, DocValueFormat format,
-            BucketOrder order, BucketCountThresholds bucketCountThresholds, SearchContext aggregationContext, Aggregator parent,
-            SubAggCollectionMode subAggCollectMode, boolean showTermDocCountError, IncludeExclude.LongFilter longFilter,
-            boolean collectsFromSingleBucket, Map<String, Object> metadata) throws IOException {
-        super(name, factories, aggregationContext, parent, bucketCountThresholds, order, format, subAggCollectMode, metadata);
-        this.valuesSource = valuesSource;
-        this.showTermDocCountError = showTermDocCountError;
-        this.longFilter = longFilter;
-        bucketOrds = LongKeyedBucketOrds.build(context.bigArrays(), collectsFromSingleBucket);
-    }
-
-    @Override
-    public ScoreMode scoreMode() {
-        if (valuesSource != null && valuesSource.needsScores()) {
-            return ScoreMode.COMPLETE;
-        }
-        return super.scoreMode();
-    }
-
-    protected SortedNumericDocValues getValues(ValuesSource.Numeric valuesSource, LeafReaderContext ctx) throws IOException {
-        return valuesSource.longValues(ctx);
-    }
-
-    @Override
-    public LeafBucketCollector getLeafCollector(LeafReaderContext ctx,
-            final LeafBucketCollector sub) throws IOException {
-        final SortedNumericDocValues values = getValues(valuesSource, ctx);
-        return new LeafBucketCollectorBase(sub, values) {
-            @Override
-            public void collect(int doc, long owningBucketOrd) throws IOException {
-                if (values.advanceExact(doc)) {
-                    final int valuesCount = values.docValueCount();
-
-                    long previous = Long.MAX_VALUE;
-                    for (int i = 0; i < valuesCount; ++i) {
-                        final long val = values.nextValue();
-                        if (previous != val || i == 0) {
-                            if ((longFilter == null) || (longFilter.accept(val))) {
-                                long bucketOrdinal = bucketOrds.add(owningBucketOrd, val);
-                                if (bucketOrdinal < 0) { // already seen
-                                    bucketOrdinal = -1 - bucketOrdinal;
-                                    collectExistingBucket(sub, doc, bucketOrdinal);
-                                } else {
-                                    collectBucket(sub, doc, bucketOrdinal);
-                                }
-                            }
-
-                            previous = val;
-                        }
-                    }
-                }
-            }
-        };
-    }
-
-    @Override
-    public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws IOException {
-        LongTerms.Bucket[][] topBucketsPerOrd = new LongTerms.Bucket[owningBucketOrds.length][];
-        long[] otherDocCounts = new long[owningBucketOrds.length];
-        for (int ordIdx = 0; ordIdx < owningBucketOrds.length; ordIdx++) {
-            long bucketsInOrd = bucketOrds.bucketsInOrd(owningBucketOrds[ordIdx]);
-            if (bucketCountThresholds.getMinDocCount() == 0 && (InternalOrder.isCountDesc(order) == false ||
-                    bucketsInOrd < bucketCountThresholds.getRequiredSize())) {
-                // we need to fill-in the blanks
-                for (LeafReaderContext ctx : context.searcher().getTopReaderContext().leaves()) {
-                    final SortedNumericDocValues values = getValues(valuesSource, ctx);
-                    for (int docId = 0; docId < ctx.reader().maxDoc(); ++docId) {
-                        if (values.advanceExact(docId)) {
-                            final int valueCount = values.docValueCount();
-                            for (int v = 0; v < valueCount; ++v) {
-                                long value = values.nextValue();
-                                if (longFilter == null || longFilter.accept(value)) {
-                                    bucketOrds.add(owningBucketOrds[ordIdx], value);
-                                }
-                            }
-                        }
-                    }
-                }
-                bucketsInOrd = bucketOrds.bucketsInOrd(owningBucketOrds[ordIdx]);
-            }
-
-            final int size = (int) Math.min(bucketsInOrd, bucketCountThresholds.getShardSize());
-            BucketPriorityQueue<LongTerms.Bucket> ordered = new BucketPriorityQueue<>(size, partiallyBuiltBucketComparator);
-            LongTerms.Bucket spare = null;
-            BucketOrdsEnum ordsEnum = bucketOrds.ordsEnum(owningBucketOrds[ordIdx]);
-            while (ordsEnum.next()) {
-                if (spare == null) {
-                    spare = new LongTerms.Bucket(0, 0, null, showTermDocCountError, 0, format);
-                }
-                spare.term = ordsEnum.value();
-                spare.docCount = bucketDocCount(ordsEnum.ord());
-                otherDocCounts[ordIdx] += spare.docCount;
-                spare.bucketOrd = ordsEnum.ord();
-                if (bucketCountThresholds.getShardMinDocCount() <= spare.docCount) {
-                    spare = ordered.insertWithOverflow(spare);
-                    if (spare == null) {
-                        consumeBucketsAndMaybeBreak(1);
-                    }
-                }
-            }
-
-            // Get the top buckets
-            LongTerms.Bucket[] list = topBucketsPerOrd[ordIdx] = new LongTerms.Bucket[ordered.size()];
-            for (int b = ordered.size() - 1; b >= 0; --b) {
-                list[b] = ordered.pop();
-                list[b].docCountError = 0;
-                otherDocCounts[ordIdx] -= list[b].docCount;
-            }
-        }
-
-        buildSubAggsForAllBuckets(topBucketsPerOrd, b -> b.bucketOrd, (b, aggs) -> b.aggregations = aggs);
-
-        InternalAggregation[] result = new InternalAggregation[owningBucketOrds.length];
-        for (int ordIdx = 0; ordIdx < owningBucketOrds.length; ordIdx++) {
-            result[ordIdx] = buildResult(otherDocCounts[ordIdx], Arrays.asList(topBucketsPerOrd[ordIdx]));
-        }
-        return result;
-    }
-
-    protected InternalAggregation buildResult(long otherDocCount, List<Bucket> buckets) {
-        return new LongTerms(name, order, bucketCountThresholds.getRequiredSize(), bucketCountThresholds.getMinDocCount(),
-            metadata(), format, bucketCountThresholds.getShardSize(), showTermDocCountError, otherDocCount,
-            buckets, 0);
-    }
-
-    @Override
-    public InternalAggregation buildEmptyAggregation() {
-        return new LongTerms(name, order, bucketCountThresholds.getRequiredSize(), bucketCountThresholds.getMinDocCount(),
-            metadata(), format, bucketCountThresholds.getShardSize(), showTermDocCountError, 0, emptyList(), 0);
-    }
-
-    @Override
-    public void doClose() {
-        super.doClose();
-        Releasables.close(bucketOrds);
-    }
-
-    @Override
-    public void collectDebugInfo(BiConsumer<String, Object> add) {
-        super.collectDebugInfo(add);
-        add.accept("total_buckets", bucketOrds.size());
-    }
-}
diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/NumericTermsAggregator.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/NumericTermsAggregator.java
new file mode 100644
index 00000000000..ec7d7727901
--- /dev/null
+++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/NumericTermsAggregator.java
@@ -0,0 +1,655 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.elasticsearch.search.aggregations.bucket.terms;
+
+import org.apache.lucene.index.IndexReader;
+import org.apache.lucene.index.LeafReaderContext;
+import org.apache.lucene.index.SortedNumericDocValues;
+import org.apache.lucene.search.ScoreMode;
+import org.apache.lucene.util.NumericUtils;
+import org.apache.lucene.util.PriorityQueue;
+import org.elasticsearch.common.collect.List;
+import org.elasticsearch.common.lease.Releasable;
+import org.elasticsearch.common.lease.Releasables;
+import org.elasticsearch.common.util.BigArrays;
+import org.elasticsearch.common.util.LongArray;
+import org.elasticsearch.common.util.LongHash;
+import org.elasticsearch.index.fielddata.FieldData;
+import org.elasticsearch.search.DocValueFormat;
+import org.elasticsearch.search.aggregations.Aggregator;
+import org.elasticsearch.search.aggregations.AggregatorFactories;
+import org.elasticsearch.search.aggregations.BucketOrder;
+import org.elasticsearch.search.aggregations.InternalAggregation;
+import org.elasticsearch.search.aggregations.InternalMultiBucketAggregation;
+import org.elasticsearch.search.aggregations.InternalOrder;
+import org.elasticsearch.search.aggregations.LeafBucketCollector;
+import org.elasticsearch.search.aggregations.LeafBucketCollectorBase;
+import org.elasticsearch.search.aggregations.bucket.terms.IncludeExclude.LongFilter;
+import org.elasticsearch.search.aggregations.bucket.terms.LongKeyedBucketOrds.BucketOrdsEnum;
+import org.elasticsearch.search.aggregations.bucket.terms.heuristic.SignificanceHeuristic;
+import org.elasticsearch.search.aggregations.support.ValuesSource;
+import org.elasticsearch.search.internal.ContextIndexSearcher;
+import org.elasticsearch.search.internal.SearchContext;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.function.BiConsumer;
+import java.util.function.Function;
+import java.util.function.Supplier;
+
+import static java.util.Collections.emptyList;
+
+public class NumericTermsAggregator extends TermsAggregator {
+    private final ResultStrategy<?, ?> resultStrategy;
+    private final ValuesSource.Numeric valuesSource;
+    private final LongKeyedBucketOrds bucketOrds;
+    private final LongFilter longFilter;
+
+    public NumericTermsAggregator(
+        String name,
+        AggregatorFactories factories,
+        Function<NumericTermsAggregator, ResultStrategy<?, ?>> resultStrategy,
+        ValuesSource.Numeric valuesSource,
+        DocValueFormat format,
+        BucketOrder order,
+        BucketCountThresholds bucketCountThresholds,
+        SearchContext aggregationContext,
+        Aggregator parent,
+        SubAggCollectionMode subAggCollectMode,
+        IncludeExclude.LongFilter longFilter,
+        boolean collectsFromSingleBucket,
+        Map<String, Object> metadata
+    )
+        throws IOException {
+        super(name, factories, aggregationContext, parent, bucketCountThresholds, order, format, subAggCollectMode, metadata);
+        this.resultStrategy = resultStrategy.apply(this);
+        this.valuesSource = valuesSource;
+        this.longFilter = longFilter;
+        bucketOrds = LongKeyedBucketOrds.build(context.bigArrays(), collectsFromSingleBucket);
+    }
+
+    @Override
+    public ScoreMode scoreMode() {
+        if (valuesSource != null && valuesSource.needsScores()) {
+            return ScoreMode.COMPLETE;
+        }
+        return super.scoreMode();
+    }
+
+    @Override
+    public LeafBucketCollector getLeafCollector(LeafReaderContext ctx, LeafBucketCollector sub) throws IOException {
+        SortedNumericDocValues values = resultStrategy.getValues(ctx);
+        return resultStrategy.wrapCollector(new LeafBucketCollectorBase(sub, values) {
+            @Override
+            public void collect(int doc, long owningBucketOrd) throws IOException {
+                if (values.advanceExact(doc)) {
+                    int valuesCount = values.docValueCount();
+
+                    long previous = Long.MAX_VALUE;
+                    for (int i = 0; i < valuesCount; ++i) {
+                        long val = values.nextValue();
+                        if (previous != val || i == 0) {
+                            if ((longFilter == null) || (longFilter.accept(val))) {
+                                long bucketOrdinal = bucketOrds.add(owningBucketOrd, val);
+                                if (bucketOrdinal < 0) { // already seen
+                                    bucketOrdinal = -1 - bucketOrdinal;
+                                    collectExistingBucket(sub, doc, bucketOrdinal);
+                                } else {
+                                    collectBucket(sub, doc, bucketOrdinal);
+                                }
+                            }
+
+                            previous = val;
+                        }
+                    }
+                }
+            }
+        });
+    }
+
+    @Override
+    public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws IOException {
+        return resultStrategy.buildAggregations(owningBucketOrds);
+    }
+
+    @Override
+    public InternalAggregation buildEmptyAggregation() {
+        return resultStrategy.buildEmptyResult();
+    }
+
+    @Override
+    public void doClose() {
+        Releasables.close(super::doClose, bucketOrds, resultStrategy);
+    }
+
+    @Override
+    public void collectDebugInfo(BiConsumer<String, Object> add) {
+        super.collectDebugInfo(add);
+        add.accept("result_strategy", resultStrategy.describe());
+        add.accept("total_buckets", bucketOrds.size());
+    }
+
+    /**
+     * Strategy for building results.
+     */
+    abstract class ResultStrategy<R extends InternalAggregation, B extends InternalMultiBucketAggregation.InternalBucket>
+        implements
+            Releasable {
+        private InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws IOException {
+            B[][] topBucketsPerOrd = buildTopBucketsPerOrd(owningBucketOrds.length);
+            long[] otherDocCounts = new long[owningBucketOrds.length];
+            for (int ordIdx = 0; ordIdx < owningBucketOrds.length; ordIdx++) {
+                collectZeroDocEntriesIfNeeded(owningBucketOrds[ordIdx]);
+                long bucketsInOrd = bucketOrds.bucketsInOrd(owningBucketOrds[ordIdx]);
+
+                int size = (int) Math.min(bucketsInOrd, bucketCountThresholds.getShardSize());
+                PriorityQueue<B> ordered = buildPriorityQueue(size);
+                B spare = null;
+                BucketOrdsEnum ordsEnum = bucketOrds.ordsEnum(owningBucketOrds[ordIdx]);
+                Supplier<B> emptyBucketBuilder = emptyBucketBuilder(owningBucketOrds[ordIdx]);
+                while (ordsEnum.next()) {
+                    if (spare == null) {
+                        spare = emptyBucketBuilder.get();
+                    }
+                    long docCount = bucketDocCount(ordsEnum.ord());
+                    otherDocCounts[ordIdx] += docCount;
+                    if (bucketCountThresholds.getShardMinDocCount() <= docCount) {
+                        updateBucket(spare, ordsEnum, docCount);
+                        spare = ordered.insertWithOverflow(spare);
+                        if (spare == null) {
+                            consumeBucketsAndMaybeBreak(1);
+                        }
+                    }
+                }
+
+                // Get the top buckets
+                B[] bucketsForOrd = buildBuckets(ordered.size());
+                topBucketsPerOrd[ordIdx] = bucketsForOrd;
+                for (int b = ordered.size() - 1; b >= 0; --b) {
+                    topBucketsPerOrd[ordIdx][b] = ordered.pop();
+                    otherDocCounts[ordIdx] -= topBucketsPerOrd[ordIdx][b].getDocCount();
+                }
+            }
+
+            buildSubAggs(topBucketsPerOrd);
+
+            InternalAggregation[] result = new InternalAggregation[owningBucketOrds.length];
+            for (int ordIdx = 0; ordIdx < owningBucketOrds.length; ordIdx++) {
+                result[ordIdx] = buildResult(owningBucketOrds[ordIdx], otherDocCounts[ordIdx], topBucketsPerOrd[ordIdx]);
+            }
+            return result;
+        }
+
+        /**
+         * Short description of the collection mechanism added to the profile
+         * output to help with debugging.
+         */
+        abstract String describe();
+
+        /**
+         * Resolve the doc values to collect results of this type.
+         */
+        abstract SortedNumericDocValues getValues(LeafReaderContext ctx) throws IOException;
+
+        /**
+         * Wrap the "standard" numeric terms collector to collect any more
+         * information that this result type may need.
+         */
+        abstract LeafBucketCollector wrapCollector(LeafBucketCollector primary);
+
+        /**
+         * Build an array to hold the "top" buckets for each ordinal.
+         */
+        abstract B[][] buildTopBucketsPerOrd(int size);
+
+        /**
+         * Build an array of buckets for a particular ordinal. These arrays
+         * are assigned to the value returned by {@link #buildTopBucketsPerOrd}.
+         */
+        abstract B[] buildBuckets(int size);
+
+        /**
+         * Build a {@linkplain Supplier} that can be used to build "empty"
+         * buckets. Those buckets will then be {@link #updateBucket updated}
+         * for each collected bucket.
+         */
+        abstract Supplier<B> emptyBucketBuilder(long owningBucketOrd);
+
+        /**
+         * Update fields in {@code spare} to reflect information collected for
+         * this bucket ordinal.
+         */
+        abstract void updateBucket(B spare, BucketOrdsEnum ordsEnum, long docCount) throws IOException;
+
+        /**
+         * Build a {@link PriorityQueue} to sort the buckets. After we've
+         * collected all of the buckets we'll collect all entries in the queue.
+         */
+        abstract PriorityQueue<B> buildPriorityQueue(int size);
+
+        /**
+         * Build the sub-aggregations into the buckets. This will usually
+         * delegate to {@link #buildSubAggsForAllBuckets}.
+         */
+        abstract void buildSubAggs(B[][] topBucketsPerOrd) throws IOException;
+
+        /**
+         * Collect extra entries for "zero" hit documents if they were requested
+         * and required.
+         */
+        abstract void collectZeroDocEntriesIfNeeded(long ord) throws IOException;
+
+        /**
+         * Turn the buckets into an aggregation result.
+         */
+        abstract R buildResult(long owningBucketOrd, long otherDocCounts, B[] topBuckets);
+
+        /**
+         * Build an "empty" result. Only called if there isn't any data on this
+         * shard.
+         */
+        abstract R buildEmptyResult();
+    }
+
+    abstract class StandardTermsResultStrategy<R extends InternalMappedTerms<R, B>, B extends InternalTerms.Bucket<B>> extends
+        ResultStrategy<R, B> {
+        protected final boolean showTermDocCountError;
+
+        StandardTermsResultStrategy(boolean showTermDocCountError) {
+            this.showTermDocCountError = showTermDocCountError;
+        }
+
+        @Override
+        final LeafBucketCollector wrapCollector(LeafBucketCollector primary) {
+            return primary;
+        }
+
+        @Override
+        final PriorityQueue<B> buildPriorityQueue(int size) {
+            return new BucketPriorityQueue<>(size, partiallyBuiltBucketComparator);
+        }
+
+        @Override
+        final void buildSubAggs(B[][] topBucketsPerOrd) throws IOException {
+            buildSubAggsForAllBuckets(topBucketsPerOrd, b -> b.bucketOrd, (b, aggs) -> b.aggregations = aggs);
+        }
+
+        @Override
+        Supplier<B> emptyBucketBuilder(long owningBucketOrd) {
+            return this::buildEmptyBucket;
+        }
+
+        abstract B buildEmptyBucket();
+
+        @Override
+        final void collectZeroDocEntriesIfNeeded(long ord) throws IOException {
+            if (bucketCountThresholds.getMinDocCount() != 0) {
+                return;
+            }
+            if (InternalOrder.isCountDesc(order) && bucketOrds.bucketsInOrd(ord) >= bucketCountThresholds.getRequiredSize()) {
+                return;
+            }
+            // we need to fill-in the blanks
+            for (LeafReaderContext ctx : context.searcher().getTopReaderContext().leaves()) {
+                SortedNumericDocValues values = getValues(ctx);
+                for (int docId = 0; docId < ctx.reader().maxDoc(); ++docId) {
+                    if (values.advanceExact(docId)) {
+                        int valueCount = values.docValueCount();
+                        for (int v = 0; v < valueCount; ++v) {
+                            long value = values.nextValue();
+                            if (longFilter == null || longFilter.accept(value)) {
+                                bucketOrds.add(ord, value);
+                            }
+                        }
+                    }
+                }
+            }
+        }
+
+        @Override
+        public final void close() {}
+    }
+
+    class LongTermsResults extends StandardTermsResultStrategy<LongTerms, LongTerms.Bucket> {
+        LongTermsResults(boolean showTermDocCountError) {
+            super(showTermDocCountError);
+        }
+
+        @Override
+        String describe() {
+            return "long_terms";
+        }
+
+        @Override
+        SortedNumericDocValues getValues(LeafReaderContext ctx) throws IOException {
+            return valuesSource.longValues(ctx);
+        }
+
+        @Override
+        LongTerms.Bucket[][] buildTopBucketsPerOrd(int size) {
+            return new LongTerms.Bucket[size][];
+        }
+
+        @Override
+        LongTerms.Bucket[] buildBuckets(int size) {
+            return new LongTerms.Bucket[size];
+        }
+
+        @Override
+        LongTerms.Bucket buildEmptyBucket() {
+            return new LongTerms.Bucket(0, 0, null, showTermDocCountError, 0, format);
+        }
+
+        @Override
+        void updateBucket(LongTerms.Bucket spare, BucketOrdsEnum ordsEnum, long docCount) {
+            spare.term = ordsEnum.value();
+            spare.docCount = docCount;
+            spare.bucketOrd = ordsEnum.ord();
+        }
+
+        @Override
+        LongTerms buildResult(long owningBucketOrd, long otherDocCount, LongTerms.Bucket[] topBuckets) {
+            return new LongTerms(
+                name,
+                order,
+                bucketCountThresholds.getRequiredSize(),
+                bucketCountThresholds.getMinDocCount(),
+                metadata(),
+                format,
+                bucketCountThresholds.getShardSize(),
+                showTermDocCountError,
+                otherDocCount,
+                List.of(topBuckets),
+                0
+            );
+        }
+
+        @Override
+        LongTerms buildEmptyResult() {
+            return new LongTerms(
+                name,
+                order,
+                bucketCountThresholds.getRequiredSize(),
+                bucketCountThresholds.getMinDocCount(),
+                metadata(),
+                format,
+                bucketCountThresholds.getShardSize(),
+                showTermDocCountError,
+                0,
+                emptyList(),
+                0
+            );
+        }
+    }
+
+    class DoubleTermsResults extends StandardTermsResultStrategy<DoubleTerms, DoubleTerms.Bucket> {
+        DoubleTermsResults(boolean showTermDocCountError) {
+            super(showTermDocCountError);
+        }
+
+        @Override
+        String describe() {
+            return "double_terms";
+        }
+
+        @Override
+        SortedNumericDocValues getValues(LeafReaderContext ctx) throws IOException {
+            return FieldData.toSortableLongBits(valuesSource.doubleValues(ctx));
+        }
+
+        @Override
+        DoubleTerms.Bucket[][] buildTopBucketsPerOrd(int size) {
+            return new DoubleTerms.Bucket[size][];
+        }
+
+        @Override
+        DoubleTerms.Bucket[] buildBuckets(int size) {
+            return new DoubleTerms.Bucket[size];
+        }
+
+        @Override
+        DoubleTerms.Bucket buildEmptyBucket() {
+            return new DoubleTerms.Bucket(0, 0, null, showTermDocCountError, 0, format);
+        }
+
+        @Override
+        void updateBucket(DoubleTerms.Bucket spare, BucketOrdsEnum ordsEnum, long docCount) {
+            spare.term = NumericUtils.sortableLongToDouble(ordsEnum.value());
+            spare.docCount = docCount;
+            spare.bucketOrd = ordsEnum.ord();
+        }
+
+        @Override
+        DoubleTerms buildResult(long owningBucketOrd, long otherDocCount, DoubleTerms.Bucket[] topBuckets) {
+            return new DoubleTerms(
+                name,
+                order,
+                bucketCountThresholds.getRequiredSize(),
+                bucketCountThresholds.getMinDocCount(),
+                metadata(),
+                format,
+                bucketCountThresholds.getShardSize(),
+                showTermDocCountError,
+                otherDocCount,
+                List.of(topBuckets),
+                0
+            );
+        }
+
+        @Override
+        DoubleTerms buildEmptyResult() {
+            return new DoubleTerms(
+                name,
+                order,
+                bucketCountThresholds.getRequiredSize(),
+                bucketCountThresholds.getMinDocCount(),
+                metadata(),
+                format,
+                bucketCountThresholds.getShardSize(),
+                showTermDocCountError,
+                0,
+                emptyList(),
+                0
+            );
+        }
+    }
+
+    class SignificantLongTermsResults extends ResultStrategy<SignificantLongTerms, SignificantLongTerms.Bucket> {
+        private final BackgroundFrequencies backgroundFrequencies;
+        private final long supersetSize;
+        private final SignificanceHeuristic significanceHeuristic;
+        private LongArray subsetSizes;
+
+        SignificantLongTermsResults(
+            SignificantTermsAggregatorFactory termsAggFactory,
+            SignificanceHeuristic significanceHeuristic,
+            boolean collectsFromSingleBucket
+        ) {
+            LookupBackgroundFrequencies lookup = new LookupBackgroundFrequencies(termsAggFactory);
+            backgroundFrequencies = collectsFromSingleBucket ? lookup : new CacheBackgroundFrequencies(lookup, context.bigArrays());
+            supersetSize = termsAggFactory.getSupersetNumDocs();
+            this.significanceHeuristic = significanceHeuristic;
+            subsetSizes = context.bigArrays().newLongArray(1, true);
+        }
+
+        @Override
+        SortedNumericDocValues getValues(LeafReaderContext ctx) throws IOException {
+            return valuesSource.longValues(ctx);
+        }
+
+        @Override
+        String describe() {
+            return "significant_terms";
+        }
+
+        @Override
+        LeafBucketCollector wrapCollector(LeafBucketCollector primary) {
+            return new LeafBucketCollectorBase(primary, null) {
+                @Override
+                public void collect(int doc, long owningBucketOrd) throws IOException {
+                    super.collect(doc, owningBucketOrd);
+                    subsetSizes = context.bigArrays().grow(subsetSizes, owningBucketOrd + 1);
+                    subsetSizes.increment(owningBucketOrd, 1);
+                }
+            };
+        }
+
+        @Override
+        SignificantLongTerms.Bucket[][] buildTopBucketsPerOrd(int size) {
+            return new SignificantLongTerms.Bucket[size][];
+        }
+
+        @Override
+        SignificantLongTerms.Bucket[] buildBuckets(int size) {
+            return new SignificantLongTerms.Bucket[size];
+        }
+
+        @Override
+        Supplier<SignificantLongTerms.Bucket> emptyBucketBuilder(long owningBucketOrd) {
+            long subsetSize = subsetSizes.get(owningBucketOrd);
+            return () -> new SignificantLongTerms.Bucket(0, subsetSize, 0, supersetSize, 0, null, format, 0);
+        }
+
+        @Override
+        void updateBucket(SignificantLongTerms.Bucket spare, BucketOrdsEnum ordsEnum, long docCount) throws IOException {
+            spare.term = ordsEnum.value();
+            spare.subsetDf = docCount;
+            spare.supersetDf = backgroundFrequencies.freq(spare.term);
+            spare.bucketOrd = ordsEnum.ord();
+            // During shard-local down-selection we use subset/superset stats that are for this shard only
+            // Back at the central reducer these properties will be updated with global stats
+            spare.updateScore(significanceHeuristic);
+        }
+
+        @Override
+        PriorityQueue<SignificantLongTerms.Bucket> buildPriorityQueue(int size) {
+            return new BucketSignificancePriorityQueue<>(size);
+        }
+
+        @Override
+        void buildSubAggs(SignificantLongTerms.Bucket[][] topBucketsPerOrd) throws IOException {
+            buildSubAggsForAllBuckets(topBucketsPerOrd, b -> b.bucketOrd, (b, aggs) -> b.aggregations = aggs);
+        }
+
+        @Override
+        void collectZeroDocEntriesIfNeeded(long ord) throws IOException {}
+
+        @Override
+        SignificantLongTerms buildResult(long owningBucketOrd, long otherDocCounts, SignificantLongTerms.Bucket[] topBuckets) {
+            return new SignificantLongTerms(
+                name,
+                bucketCountThresholds.getRequiredSize(),
+                bucketCountThresholds.getMinDocCount(),
+                metadata(),
+                format,
+                subsetSizes.get(owningBucketOrd),
+                supersetSize,
+                significanceHeuristic,
+                List.of(topBuckets)
+            );
+        }
+
+        @Override
+        SignificantLongTerms buildEmptyResult() {
+            // We need to account for the significance of a miss in our global stats - provide corpus size as context
+            ContextIndexSearcher searcher = context.searcher();
+            IndexReader topReader = searcher.getIndexReader();
+            int supersetSize = topReader.numDocs();
+            return new SignificantLongTerms(
+                name,
+                bucketCountThresholds.getRequiredSize(),
+                bucketCountThresholds.getMinDocCount(),
+                metadata(),
+                format,
+                0,
+                supersetSize,
+                significanceHeuristic,
+                emptyList()
+            );
+        }
+
+        @Override
+        public void close() {
+            Releasables.close(backgroundFrequencies, subsetSizes);
+        }
+    }
+
+    /**
+     * Lookup frequencies for terms.
+     */
+    private interface BackgroundFrequencies extends Releasable {
+        long freq(long term) throws IOException;
+    }
+
+    /**
+     * Lookup frequencies for terms.
+     */
+    private static class LookupBackgroundFrequencies implements BackgroundFrequencies {
+        // TODO a reference to the factory is weird - probably should be reference to what we need from it.
+        private final SignificantTermsAggregatorFactory termsAggFactory;
+
+        LookupBackgroundFrequencies(SignificantTermsAggregatorFactory termsAggFactory) {
+            this.termsAggFactory = termsAggFactory;
+        }
+
+        @Override
+        public long freq(long term) throws IOException {
+            return termsAggFactory.getBackgroundFrequency(term);
+        }
+
+        @Override
+        public void close() {
+            termsAggFactory.close();
+        }
+    }
+
+    /**
+     * Lookup and cache background frequencies for terms.
+     */
+    private static class CacheBackgroundFrequencies implements BackgroundFrequencies {
+        private final LookupBackgroundFrequencies lookup;
+        private final BigArrays bigArrays;
+        private final LongHash termToPosition;
+        private LongArray positionToFreq;
+
+        CacheBackgroundFrequencies(LookupBackgroundFrequencies lookup, BigArrays bigArrays) {
+            this.lookup = lookup;
+            this.bigArrays = bigArrays;
+            termToPosition = new LongHash(1, bigArrays);
+            positionToFreq = bigArrays.newLongArray(1, false);
+        }
+
+        @Override
+        public long freq(long term) throws IOException {
+            long position = termToPosition.add(term);
+            if (position < 0) {
+                return positionToFreq.get(-1 - position);
+            }
+            long freq = lookup.freq(term);
+            positionToFreq = bigArrays.grow(positionToFreq, position + 1);
+            positionToFreq.set(position, freq);
+            return freq;
+        }
+
+        @Override
+        public void close() {
+            Releasables.close(lookup, termToPosition, positionToFreq);
+        }
+    }
+}
diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/SignificantLongTermsAggregator.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/SignificantLongTermsAggregator.java
deleted file mode 100644
index 0d21206515b..00000000000
--- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/SignificantLongTermsAggregator.java
+++ /dev/null
@@ -1,136 +0,0 @@
-/*
- * Licensed to Elasticsearch under one or more contributor
- * license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright
- * ownership. Elasticsearch licenses this file to you under
- * the Apache License, Version 2.0 (the "License"); you may
- * not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.elasticsearch.search.aggregations.bucket.terms;
-
-import org.apache.lucene.index.IndexReader;
-import org.apache.lucene.index.LeafReaderContext;
-import org.elasticsearch.common.lease.Releasables;
-import org.elasticsearch.search.DocValueFormat;
-import org.elasticsearch.search.aggregations.Aggregator;
-import org.elasticsearch.search.aggregations.AggregatorFactories;
-import org.elasticsearch.search.aggregations.InternalAggregation;
-import org.elasticsearch.search.aggregations.LeafBucketCollector;
-import org.elasticsearch.search.aggregations.LeafBucketCollectorBase;
-import org.elasticsearch.search.aggregations.bucket.terms.LongKeyedBucketOrds.BucketOrdsEnum;
-import org.elasticsearch.search.aggregations.bucket.terms.heuristic.SignificanceHeuristic;
-import org.elasticsearch.search.aggregations.support.ValuesSource;
-import org.elasticsearch.search.internal.ContextIndexSearcher;
-import org.elasticsearch.search.internal.SearchContext;
-
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.Map;
-
-import static java.util.Collections.emptyList;
-
-public class SignificantLongTermsAggregator extends LongTermsAggregator {
-
-    public SignificantLongTermsAggregator(String name, AggregatorFactories factories, ValuesSource.Numeric valuesSource,
-            DocValueFormat format, BucketCountThresholds bucketCountThresholds, SearchContext context, Aggregator parent,
-            SignificanceHeuristic significanceHeuristic, SignificantTermsAggregatorFactory termsAggFactory,
-            IncludeExclude.LongFilter includeExclude, Map<String, Object> metadata) throws IOException {
-
-        super(name, factories, valuesSource, format, null, bucketCountThresholds, context, parent,
-            SubAggCollectionMode.BREADTH_FIRST, false, includeExclude, false, metadata);
-        this.significanceHeuristic = significanceHeuristic;
-        this.termsAggFactory = termsAggFactory;
-    }
-
-    protected long numCollectedDocs;
-    private final SignificantTermsAggregatorFactory termsAggFactory;
-    private final SignificanceHeuristic significanceHeuristic;
-
-    @Override
-    public LeafBucketCollector getLeafCollector(LeafReaderContext ctx,
-            final LeafBucketCollector sub) throws IOException {
-        return new LeafBucketCollectorBase(super.getLeafCollector(ctx, sub), null) {
-            @Override
-            public void collect(int doc, long bucket) throws IOException {
-                super.collect(doc, bucket);
-                numCollectedDocs++;
-            }
-        };
-    }
-
-    @Override
-    public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws IOException {
-        assert owningBucketOrds.length == 1 && owningBucketOrds[0] == 0;
-
-        long bucketsInOrd = bucketOrds.bucketsInOrd(0);
-        final int size = (int) Math.min(bucketsInOrd, bucketCountThresholds.getShardSize());
-
-        long supersetSize = termsAggFactory.getSupersetNumDocs();
-        long subsetSize = numCollectedDocs;
-
-        BucketSignificancePriorityQueue<SignificantLongTerms.Bucket> ordered = new BucketSignificancePriorityQueue<>(size);
-        SignificantLongTerms.Bucket spare = null;
-        BucketOrdsEnum ordsEnum = bucketOrds.ordsEnum(0);
-        while (ordsEnum.next()) {
-            final int docCount = bucketDocCount(ordsEnum.ord());
-            if (docCount < bucketCountThresholds.getShardMinDocCount()) {
-                continue;
-            }
-            if (spare == null) {
-                spare = new SignificantLongTerms.Bucket(0, 0, 0, 0, 0, null, format, 0);
-            }
-            spare.term = ordsEnum.value();
-            spare.subsetDf = docCount;
-            spare.subsetSize = subsetSize;
-            spare.supersetDf = termsAggFactory.getBackgroundFrequency(spare.term);
-            spare.supersetSize = supersetSize;
-            // During shard-local down-selection we use subset/superset stats that are for this shard only
-            // Back at the central reducer these properties will be updated with global stats
-            spare.updateScore(significanceHeuristic);
-
-            spare.bucketOrd = ordsEnum.ord();
-            spare = ordered.insertWithOverflow(spare);
-            if (spare == null) {
-                consumeBucketsAndMaybeBreak(1);
-            }
-        }
-
-        SignificantLongTerms.Bucket[] list = new SignificantLongTerms.Bucket[ordered.size()];
-        for (int i = ordered.size() - 1; i >= 0; i--) {
-            list[i] = ordered.pop();
-        }
-
-        buildSubAggsForBuckets(list, bucket -> bucket.bucketOrd, (bucket, aggs) -> bucket.aggregations = aggs);
-
-        return new InternalAggregation[] {
-            new SignificantLongTerms(name, bucketCountThresholds.getRequiredSize(), bucketCountThresholds.getMinDocCount(),
-                metadata(), format, subsetSize, supersetSize, significanceHeuristic, Arrays.asList(list))
-        };
-    }
-
-    @Override
-    public SignificantLongTerms buildEmptyAggregation() {
-        // We need to account for the significance of a miss in our global stats - provide corpus size as context
-        ContextIndexSearcher searcher = context.searcher();
-        IndexReader topReader = searcher.getIndexReader();
-        int supersetSize = topReader.numDocs();
-        return new SignificantLongTerms(name, bucketCountThresholds.getRequiredSize(), bucketCountThresholds.getMinDocCount(),
-            metadata(), format, 0, supersetSize, significanceHeuristic, emptyList());
-    }
-
-    @Override
-    public void doClose() {
-        Releasables.close(bucketOrds, termsAggFactory);
-    }
-
-}
diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/SignificantTermsAggregatorFactory.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/SignificantTermsAggregatorFactory.java
index 235d4f35001..e826929e507 100644
--- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/SignificantTermsAggregatorFactory.java
+++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/SignificantTermsAggregatorFactory.java
@@ -40,6 +40,7 @@ import org.elasticsearch.index.query.QueryShardContext;
 import org.elasticsearch.search.DocValueFormat;
 import org.elasticsearch.search.aggregations.AggregationExecutionException;
 import org.elasticsearch.search.aggregations.Aggregator;
+import org.elasticsearch.search.aggregations.Aggregator.SubAggCollectionMode;
 import org.elasticsearch.search.aggregations.AggregatorFactories;
 import org.elasticsearch.search.aggregations.AggregatorFactory;
 import org.elasticsearch.search.aggregations.InternalAggregation;
@@ -59,8 +60,7 @@ import java.io.IOException;
 import java.util.Arrays;
 import java.util.Map;
 
-public class SignificantTermsAggregatorFactory extends ValuesSourceAggregatorFactory
-        implements Releasable {
+public class SignificantTermsAggregatorFactory extends ValuesSourceAggregatorFactory implements Releasable {
     private static final DeprecationLogger deprecationLogger = new DeprecationLogger(
         LogManager.getLogger(SignificantTermsAggregatorFactory.class));
 
@@ -103,8 +103,10 @@ public class SignificantTermsAggregatorFac
                                   Aggregator parent,
                                   SignificanceHeuristic significanceHeuristic,
                                   SignificantTermsAggregatorFactory sigTermsFactory,
+                                  boolean collectsFromSingleBucket,
                                   Map<String, Object> metadata) throws IOException {
 
+                assert collectsFromSingleBucket;
                 ExecutionMode execution = null;
                 if (executionHint != null) {
                     execution = ExecutionMode.fromString(executionHint, deprecationLogger);
                 }
@@ -124,7 +126,11 @@ public class SignificantTermsAggregatorFac
                 return execution.create(name, factories, valuesSource, format, bucketCountThresholds, includeExclude, context, parent,
                     significanceHeuristic, sigTermsFactory, metadata);
+            }
 
+            @Override
+            public boolean needsToCollectFromSingleBucket() {
+                return true;
             }
         };
     }
@@ -147,6 +153,7 @@ public class SignificantTermsAggregatorFac
                                   Aggregator parent,
                                   SignificanceHeuristic significanceHeuristic,
                                   SignificantTermsAggregatorFactory sigTermsFactory,
+                                  boolean collectsFromSingleBucket,
                                   Map<String, Object> metadata) throws IOException {
 
                 if ((includeExclude != null) && (includeExclude.isRegexBased())) {
@@ -155,7 +162,8 @@ public class SignificantTermsAggregatorFac
                         "values for include/exclude clauses used to filter numeric fields");
                 }
 
-                if (((ValuesSource.Numeric) valuesSource).isFloatingPoint()) {
+                ValuesSource.Numeric numericValuesSource = (ValuesSource.Numeric) valuesSource;
+                if (numericValuesSource.isFloatingPoint()) {
                     throw new UnsupportedOperationException("No support for examining floating point numerics");
                 }
 
@@ -164,10 +172,15 @@ public class SignificantTermsAggregatorFac
                     longFilter = includeExclude.convertToLongFilter(format);
                 }
 
-                return new SignificantLongTermsAggregator(name, factories, (ValuesSource.Numeric) valuesSource, format,
-                    bucketCountThresholds, context, parent, significanceHeuristic, sigTermsFactory, longFilter,
-                    metadata);
+                return new NumericTermsAggregator(name, factories,
+                    agg -> agg.new SignificantLongTermsResults(sigTermsFactory, significanceHeuristic, collectsFromSingleBucket),
+                    numericValuesSource, format, null, bucketCountThresholds, context, parent, SubAggCollectionMode.BREADTH_FIRST,
+                    longFilter, collectsFromSingleBucket, metadata);
+            }
 
+            @Override
+            public boolean needsToCollectFromSingleBucket() {
+                return false;
             }
         };
     }
@@ -203,11 +216,12 @@ public class SignificantTermsAggregatorFac
             ? null
             : filterBuilder.toQuery(queryShardContext);
         IndexSearcher searcher = queryShardContext.searcher();
-        this.supersetNumDocs = filter == null
-            // Important - need to use the doc count that includes deleted docs
-            // or we have this issue: https://github.com/elastic/elasticsearch/issues/7951
-            ? searcher.getIndexReader().maxDoc()
-            : searcher.count(filter);
+        /*
+         * We need to use a superset size that includes deleted docs or we
+         * could end up with bad statistics that cause the whole request
+         * to fail later on.
+         */
+        this.supersetNumDocs = filter == null ? searcher.getIndexReader().maxDoc() : searcher.count(filter);
         this.bucketCountThresholds = bucketCountThresholds;
         this.significanceHeuristic = significanceHeuristic;
     }
@@ -220,6 +234,7 @@ public class SignificantTermsAggregatorFac
     }
 
     private FilterableTermsEnum getTermsEnum(String field) throws IOException {
+        // TODO this method helps because of asMultiBucketAggregator. Once we remove it we can move this logic into the aggregators.
         if (termsEnum != null) {
             return termsEnum;
         }
@@ -285,16 +300,16 @@ public class SignificantTermsAggregatorFac
                                           Aggregator parent,
                                           boolean collectsFromSingleBucket,
                                           Map<String, Object> metadata) throws IOException {
-        if (collectsFromSingleBucket == false) {
-            return asMultiBucketAggregator(this, searchContext, parent);
-        }
-
         AggregatorSupplier aggregatorSupplier = queryShardContext.getValuesSourceRegistry().getAggregator(config.valueSourceType(),
             SignificantTermsAggregationBuilder.NAME);
         if (aggregatorSupplier instanceof SignificantTermsAggregatorSupplier == false) {
             throw new AggregationExecutionException("Registry miss-match - expected SignificantTermsAggregatorSupplier, found [" +
                 aggregatorSupplier.getClass().toString() + "]");
         }
+        SignificantTermsAggregatorSupplier sigTermsAggregatorSupplier = (SignificantTermsAggregatorSupplier) aggregatorSupplier;
+        if (collectsFromSingleBucket == false && sigTermsAggregatorSupplier.needsToCollectFromSingleBucket()) {
+            return asMultiBucketAggregator(this, searchContext, parent);
+        }
 
         numberOfAggregatorsCreated++;
         BucketCountThresholds bucketCountThresholds = new BucketCountThresholds(this.bucketCountThresholds);
@@ -314,12 +329,10 @@ public class SignificantTermsAggregatorFac
             bucketCountThresholds.setShardSize(2 * BucketUtils.suggestShardSideQueueSize(bucketCountThresholds.getRequiredSize()));
         }
 
-        SignificantTermsAggregatorSupplier sigTermsAggregatorSupplier = (SignificantTermsAggregatorSupplier) aggregatorSupplier;
-
         // TODO we should refactor so that we don't need to use this Factory as a singleton (e.g. stop passing `this` to the aggregators)
         return sigTermsAggregatorSupplier.build(name, factories, valuesSource, config.format(),
             bucketCountThresholds, includeExclude, executionHint, searchContext, parent,
-            significanceHeuristic, this, metadata);
+            significanceHeuristic, this, collectsFromSingleBucket, metadata);
     }
 
     public enum ExecutionMode {
diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/SignificantTermsAggregatorSupplier.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/SignificantTermsAggregatorSupplier.java
index adb94ebd553..24f630ca4b6 100644
--- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/SignificantTermsAggregatorSupplier.java
+++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/SignificantTermsAggregatorSupplier.java
@@ -41,5 +41,8 @@ interface SignificantTermsAggregatorSupplier extends AggregatorSupplier {
                      Aggregator parent,
                      SignificanceHeuristic significanceHeuristic,
                      SignificantTermsAggregatorFactory sigTermsFactory,
+                     boolean collectsFromSingleBucket,
                      Map<String, Object> metadata) throws IOException;
+
+    boolean needsToCollectFromSingleBucket();
 }
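The new `needsToCollectFromSingleBucket()` hook is what lets `SignificantTermsAggregatorFactory` skip the `asMultiBucketAggregator` wrapper when the registered implementation can natively collect from many owning buckets. A minimal sketch of that dispatch (invented names; the wrapper is a stand-in for `AggregatorFactory.asMultiBucketAggregator`, which creates one aggregator per owning bucket and is where the old memory cost came from):

    // Sketch of the factory's dispatch on needsToCollectFromSingleBucket().
    interface SupplierSketch {
        String build(boolean collectsFromSingleBucket);

        // true -> the implementation assumes one owning bucket and must be wrapped
        boolean needsToCollectFromSingleBucket();
    }

    class FactoryDispatchSketch {
        static String create(SupplierSketch supplier, boolean collectsFromSingleBucket) {
            if (collectsFromSingleBucket == false && supplier.needsToCollectFromSingleBucket()) {
                // Stand-in for asMultiBucketAggregator: one aggregator instance
                // per owning bucket.
                return "perOwningBucketWrapper(" + supplier.build(true) + ")";
            }
            // The numeric significant terms implementation now takes this branch
            // and collects all owning buckets with a single aggregator.
            return supplier.build(collectsFromSingleBucket);
        }

        public static void main(String[] args) {
            SupplierSketch numeric = new SupplierSketch() {
                @Override
                public String build(boolean collectsFromSingleBucket) {
                    return "NumericTermsAggregator(collectsFromSingleBucket=" + collectsFromSingleBucket + ")";
                }

                @Override
                public boolean needsToCollectFromSingleBucket() {
                    return false;
                }
            };
            System.out.println(create(numeric, false));
        }
    }

The string-valued `build` here only stands in for aggregator construction; the point is the branch: wrapping happens only for implementations that still require single-bucket collection (like the bytes/string path in this patch), while the numeric path now opts out.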
org.elasticsearch.search.aggregations.bucket.BucketUtils; +import org.elasticsearch.search.aggregations.bucket.terms.NumericTermsAggregator.ResultStrategy; import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregator.BucketCountThresholds; import org.elasticsearch.search.aggregations.support.AggregatorSupplier; import org.elasticsearch.search.aggregations.support.CoreValuesSourceType; @@ -48,6 +49,7 @@ import org.elasticsearch.search.internal.SearchContext; import java.io.IOException; import java.util.Arrays; import java.util.Map; +import java.util.function.Function; public class TermsAggregatorFactory extends ValuesSourceAggregatorFactory { private static final DeprecationLogger deprecationLogger = new DeprecationLogger(LogManager.getLogger(TermsAggregatorFactory.class)); @@ -162,7 +164,6 @@ public class TermsAggregatorFactory extends ValuesSourceAggregatorFactory { + "include/exclude clauses used to filter numeric fields"); } - IncludeExclude.LongFilter longFilter = null; if (subAggCollectMode == null) { // TODO can we remove concept of AggregatorFactories.EMPTY? if (factories != AggregatorFactories.EMPTY) { @@ -171,20 +172,23 @@ public class TermsAggregatorFactory extends ValuesSourceAggregatorFactory { subAggCollectMode = SubAggCollectionMode.DEPTH_FIRST; } } - if (((ValuesSource.Numeric) valuesSource).isFloatingPoint()) { + + ValuesSource.Numeric numericValuesSource = (ValuesSource.Numeric) valuesSource; + IncludeExclude.LongFilter longFilter = null; + Function> resultStrategy; + if (numericValuesSource.isFloatingPoint()) { if (includeExclude != null) { longFilter = includeExclude.convertToDoubleFilter(); } - return new DoubleTermsAggregator(name, factories, (ValuesSource.Numeric) valuesSource, format, order, - bucketCountThresholds, context, parent, subAggCollectMode, showTermDocCountError, longFilter, - collectsFromSingleBucket, metadata); + resultStrategy = agg -> agg.new DoubleTermsResults(showTermDocCountError); + } else { + if (includeExclude != null) { + longFilter = includeExclude.convertToLongFilter(format); + } + resultStrategy = agg -> agg.new LongTermsResults(showTermDocCountError); } - if (includeExclude != null) { - longFilter = includeExclude.convertToLongFilter(format); - } - return new LongTermsAggregator(name, factories, (ValuesSource.Numeric) valuesSource, format, order, - bucketCountThresholds, context, parent, subAggCollectMode, showTermDocCountError, longFilter, - collectsFromSingleBucket, metadata); + return new NumericTermsAggregator(name, factories, resultStrategy, numericValuesSource, format, order, + bucketCountThresholds, context, parent, subAggCollectMode, longFilter, collectsFromSingleBucket, metadata); } @Override diff --git a/server/src/test/java/org/elasticsearch/search/aggregations/bucket/nested/NestedAggregatorTests.java b/server/src/test/java/org/elasticsearch/search/aggregations/bucket/nested/NestedAggregatorTests.java index 569744b182d..6d3ba2e1087 100644 --- a/server/src/test/java/org/elasticsearch/search/aggregations/bucket/nested/NestedAggregatorTests.java +++ b/server/src/test/java/org/elasticsearch/search/aggregations/bucket/nested/NestedAggregatorTests.java @@ -65,7 +65,7 @@ import org.elasticsearch.search.aggregations.bucket.filter.Filter; import org.elasticsearch.search.aggregations.bucket.filter.FilterAggregationBuilder; import org.elasticsearch.search.aggregations.bucket.terms.InternalTerms; import org.elasticsearch.search.aggregations.bucket.terms.LongTerms; -import 
+            if (numericValuesSource.isFloatingPoint()) {
                 if (includeExclude != null) {
                     longFilter = includeExclude.convertToDoubleFilter();
                 }
-                return new DoubleTermsAggregator(name, factories, (ValuesSource.Numeric) valuesSource, format, order,
-                    bucketCountThresholds, context, parent, subAggCollectMode, showTermDocCountError, longFilter,
-                    collectsFromSingleBucket, metadata);
+                resultStrategy = agg -> agg.new DoubleTermsResults(showTermDocCountError);
+            } else {
+                if (includeExclude != null) {
+                    longFilter = includeExclude.convertToLongFilter(format);
+                }
+                resultStrategy = agg -> agg.new LongTermsResults(showTermDocCountError);
             }
-            if (includeExclude != null) {
-                longFilter = includeExclude.convertToLongFilter(format);
-            }
-            return new LongTermsAggregator(name, factories, (ValuesSource.Numeric) valuesSource, format, order,
-                bucketCountThresholds, context, parent, subAggCollectMode, showTermDocCountError, longFilter,
-                collectsFromSingleBucket, metadata);
+            return new NumericTermsAggregator(name, factories, resultStrategy, numericValuesSource, format, order,
+                bucketCountThresholds, context, parent, subAggCollectMode, longFilter, collectsFromSingleBucket, metadata);
         }
 
         @Override
diff --git a/server/src/test/java/org/elasticsearch/search/aggregations/bucket/nested/NestedAggregatorTests.java b/server/src/test/java/org/elasticsearch/search/aggregations/bucket/nested/NestedAggregatorTests.java
index 569744b182d..6d3ba2e1087 100644
--- a/server/src/test/java/org/elasticsearch/search/aggregations/bucket/nested/NestedAggregatorTests.java
+++ b/server/src/test/java/org/elasticsearch/search/aggregations/bucket/nested/NestedAggregatorTests.java
@@ -65,7 +65,7 @@ import org.elasticsearch.search.aggregations.bucket.filter.Filter;
 import org.elasticsearch.search.aggregations.bucket.filter.FilterAggregationBuilder;
 import org.elasticsearch.search.aggregations.bucket.terms.InternalTerms;
 import org.elasticsearch.search.aggregations.bucket.terms.LongTerms;
-import org.elasticsearch.search.aggregations.bucket.terms.LongTermsAggregator;
+import org.elasticsearch.search.aggregations.bucket.terms.NumericTermsAggregator;
 import org.elasticsearch.search.aggregations.bucket.terms.StringTerms;
 import org.elasticsearch.search.aggregations.bucket.terms.Terms;
 import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder;
@@ -815,7 +815,7 @@ public class NestedAggregatorTests extends AggregatorTestCase {
     }
 
     /**
-     * {@link LongTermsAggregator} is the first complex bucking aggregation
+     * {@link NumericTermsAggregator} is the first complex bucketing aggregation
      * that stopped wrapping itself in {@link AggregatorFactory#asMultiBucketAggregator}
      * so this tests that nested works properly inside of it.
      */
diff --git a/server/src/test/java/org/elasticsearch/search/aggregations/bucket/nested/ReverseNestedAggregatorTests.java b/server/src/test/java/org/elasticsearch/search/aggregations/bucket/nested/ReverseNestedAggregatorTests.java
index 7d7c5b48fb2..410c3fd081f 100644
--- a/server/src/test/java/org/elasticsearch/search/aggregations/bucket/nested/ReverseNestedAggregatorTests.java
+++ b/server/src/test/java/org/elasticsearch/search/aggregations/bucket/nested/ReverseNestedAggregatorTests.java
@@ -38,7 +38,7 @@ import org.elasticsearch.search.aggregations.AggregatorFactory;
 import org.elasticsearch.search.aggregations.AggregatorTestCase;
 import org.elasticsearch.search.aggregations.InternalAggregation;
 import org.elasticsearch.search.aggregations.bucket.terms.LongTerms;
-import org.elasticsearch.search.aggregations.bucket.terms.LongTermsAggregator;
+import org.elasticsearch.search.aggregations.bucket.terms.NumericTermsAggregator;
 import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder;
 import org.elasticsearch.search.aggregations.metrics.InternalMax;
 import org.elasticsearch.search.aggregations.metrics.MaxAggregationBuilder;
@@ -247,7 +247,7 @@ public class ReverseNestedAggregatorTests extends AggregatorTestCase {
     }
 
     /**
-     * {@link LongTermsAggregator} is the first complex bucking aggregation
+     * {@link NumericTermsAggregator} is the first complex bucketing aggregation
      * that stopped wrapping itself in {@link AggregatorFactory#asMultiBucketAggregator}
      * so this tests that nested works properly inside of it.
      */
diff --git a/server/src/test/java/org/elasticsearch/search/aggregations/bucket/terms/SignificantTermsAggregatorTests.java b/server/src/test/java/org/elasticsearch/search/aggregations/bucket/terms/SignificantTermsAggregatorTests.java
index 3300bbc8ed7..1ea1a840030 100644
--- a/server/src/test/java/org/elasticsearch/search/aggregations/bucket/terms/SignificantTermsAggregatorTests.java
+++ b/server/src/test/java/org/elasticsearch/search/aggregations/bucket/terms/SignificantTermsAggregatorTests.java
@@ -23,14 +23,17 @@ import org.apache.lucene.analysis.standard.StandardAnalyzer;
 import org.apache.lucene.document.BinaryDocValuesField;
 import org.apache.lucene.document.Document;
 import org.apache.lucene.document.Field;
+import org.apache.lucene.document.SortedNumericDocValuesField;
 import org.apache.lucene.document.StoredField;
 import org.apache.lucene.index.DirectoryReader;
 import org.apache.lucene.index.IndexOptions;
 import org.apache.lucene.index.IndexReader;
 import org.apache.lucene.index.IndexWriter;
 import org.apache.lucene.index.IndexWriterConfig;
+import org.apache.lucene.index.RandomIndexWriter;
 import org.apache.lucene.index.Term;
 import org.apache.lucene.search.IndexSearcher;
+import org.apache.lucene.search.MatchAllDocsQuery;
 import org.apache.lucene.search.TermQuery;
 import org.apache.lucene.store.Directory;
 import org.apache.lucene.util.BytesRef;
@@ -60,6 +63,7 @@ import java.util.function.Function;
 import java.util.stream.Collectors;
 
 import static org.elasticsearch.search.aggregations.AggregationBuilders.significantTerms;
+import static org.hamcrest.Matchers.equalTo;
 
 public class SignificantTermsAggregatorTests extends AggregatorTestCase {
 
@@ -372,6 +376,54 @@ public class SignificantTermsAggregatorTests extends AggregatorTestCase {
         }
     }
 
+    public void testThreeLayerLong() throws IOException {
+        try (Directory dir = newDirectory()) {
+            try (RandomIndexWriter writer = new RandomIndexWriter(random(), dir)) {
+                for (int i = 0; i < 10; i++) {
+                    for (int j = 0; j < 10; j++) {
+                        for (int k = 0; k < 10; k++) {
+                            Document d = new Document();
+                            d.add(new SortedNumericDocValuesField("i", i));
+                            d.add(new SortedNumericDocValuesField("j", j));
+                            d.add(new SortedNumericDocValuesField("k", k));
+                            writer.addDocument(d);
+                        }
+                    }
+                }
+                try (IndexReader reader = maybeWrapReaderEs(writer.getReader())) {
+                    IndexSearcher searcher = newIndexSearcher(reader);
+                    SignificantTermsAggregationBuilder request = new SignificantTermsAggregationBuilder("i").field("i")
+                        .subAggregation(new SignificantTermsAggregationBuilder("j").field("j")
+                            .subAggregation(new SignificantTermsAggregationBuilder("k").field("k")));
+                    SignificantLongTerms result = search(searcher, new MatchAllDocsQuery(), request,
+                        longField("i"), longField("j"), longField("k"));
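+                    // 1000 docs, one per (i, j, k) combination: every "i" bucket should
+                    // hold 100 docs, every "j" sub-bucket 10, and every "k" sub-bucket 1.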
+                    for (int i = 0; i < 10; i++) {
+                        SignificantLongTerms.Bucket iBucket = result.getBucketByKey(Integer.toString(i));
+                        assertThat(iBucket.getDocCount(), equalTo(100L));
+                        SignificantLongTerms jAgg = iBucket.getAggregations().get("j");
+                        for (int j = 0; j < 10; j++) {
+                            SignificantLongTerms.Bucket jBucket = jAgg.getBucketByKey(Integer.toString(j));
+                            assertThat(jBucket.getDocCount(), equalTo(10L));
+                            SignificantLongTerms kAgg = jBucket.getAggregations().get("k");
+                            for (int k = 0; k < 10; k++) {
+                                SignificantLongTerms.Bucket kBucket = kAgg.getBucketByKey(Integer.toString(k));
+                                assertThat(kBucket.getDocCount(), equalTo(1L));
+                            }
+                        }
+                    }
+                }
+            }
+        }
+    }
+
+    private NumberFieldMapper.NumberFieldType longField(String name) {
+        NumberFieldMapper.NumberFieldType type = new NumberFieldMapper.NumberFieldType(NumberFieldMapper.NumberType.LONG);
+        type.setName(name);
+        return type;
+    }
+
     private void addMixedTextDocs(TextFieldType textFieldType, IndexWriter w) throws IOException {
         for (int i = 0; i < 10; i++) {
             Document doc = new Document();
diff --git a/server/src/test/java/org/elasticsearch/search/aggregations/bucket/terms/SignificantTextAggregatorTests.java b/server/src/test/java/org/elasticsearch/search/aggregations/bucket/terms/SignificantTextAggregatorTests.java
index ec79caeb59b..a1a135ea58f 100644
--- a/server/src/test/java/org/elasticsearch/search/aggregations/bucket/terms/SignificantTextAggregatorTests.java
+++ b/server/src/test/java/org/elasticsearch/search/aggregations/bucket/terms/SignificantTextAggregatorTests.java
@@ -39,6 +39,8 @@ import org.elasticsearch.index.mapper.TextFieldMapper.TextFieldType;
 import org.elasticsearch.search.aggregations.AggregatorTestCase;
 import org.elasticsearch.search.aggregations.bucket.sampler.InternalSampler;
 import org.elasticsearch.search.aggregations.bucket.sampler.SamplerAggregationBuilder;
+import org.elasticsearch.search.aggregations.bucket.terms.SignificantTerms;
+import org.elasticsearch.search.aggregations.bucket.terms.SignificantTextAggregationBuilder;
 import org.elasticsearch.search.aggregations.support.AggregationInspectionHelper;
 
 import java.io.IOException;