diff --git a/docs/reference/search/profile.asciidoc b/docs/reference/search/profile.asciidoc index 342ca2c6a7b..63486c87c56 100644 --- a/docs/reference/search/profile.asciidoc +++ b/docs/reference/search/profile.asciidoc @@ -782,7 +782,7 @@ This yields the following aggregation profile output: { "aggregations" : [ { - "type" : "LongTermsAggregator", + "type" : "NumericTermsAggregator", "description" : "my_scoped_agg", "time_in_nanos" : 195386, "breakdown" : { @@ -796,6 +796,7 @@ This yields the following aggregation profile output: "collect_count" : 4 }, "debug": { + "result_strategy": "long_terms", "total_buckets": 4 } }, @@ -815,7 +816,7 @@ This yields the following aggregation profile output: }, "children" : [ { - "type" : "LongTermsAggregator", + "type" : "NumericTermsAggregator", "description" : "my_level_agg", "time_in_nanos" : 160329, "breakdown" : { @@ -829,6 +830,7 @@ This yields the following aggregation profile output: "collect_count" : 4, }, "debug": { + "result_strategy": "long_terms", "total_buckets": 4 } } @@ -845,10 +847,10 @@ This yields the following aggregation profile output: // TESTRESPONSE[s/"id": "\[P6-vulHtQRWuD4YnubWb7A\]\[test\]\[0\]"/"id": $body.profile.shards.0.id/] From the profile structure we can see that the `my_scoped_agg` is internally -being run as a `LongTermsAggregator` (because the field it is aggregating, +being run as a `NumericTermsAggregator` (because the field it is aggregating, `likes`, is a numeric field). At the same level, we see a `GlobalAggregator` which comes from `my_global_agg`. That aggregation then has a child -`LongTermsAggregator` which comes from the second term's aggregation on `likes`. +`NumericTermsAggregator` which comes from the second terms aggregation on `likes`. The `time_in_nanos` field shows the time executed by each aggregation, and is inclusive of all children.
While the overall time is useful, the `breakdown` diff --git a/rest-api-spec/src/main/resources/rest-api-spec/test/search.aggregation/20_terms.yml b/rest-api-spec/src/main/resources/rest-api-spec/test/search.aggregation/20_terms.yml index 03e9f850775..8ba07f1a6bf 100644 --- a/rest-api-spec/src/main/resources/rest-api-spec/test/search.aggregation/20_terms.yml +++ b/rest-api-spec/src/main/resources/rest-api-spec/test/search.aggregation/20_terms.yml @@ -837,7 +837,8 @@ setup: field: number - match: { aggregations.n_terms.buckets.0.key: 1 } - match: { aggregations.n_terms.buckets.1.key: 3 } - - match: { profile.shards.0.aggregations.0.type: LongTermsAggregator } + - match: { profile.shards.0.aggregations.0.type: NumericTermsAggregator } - match: { profile.shards.0.aggregations.0.description: n_terms } - match: { profile.shards.0.aggregations.0.breakdown.collect_count: 4 } + - match: { profile.shards.0.aggregations.0.debug.result_strategy: long_terms } - match: { profile.shards.0.aggregations.0.debug.total_buckets: 2 } diff --git a/server/src/internalClusterTest/java/org/elasticsearch/search/aggregations/bucket/SignificantTermsSignificanceScoreIT.java b/server/src/internalClusterTest/java/org/elasticsearch/search/aggregations/bucket/SignificantTermsSignificanceScoreIT.java index b8c98b477e9..f1eba9623f0 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/search/aggregations/bucket/SignificantTermsSignificanceScoreIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/search/aggregations/bucket/SignificantTermsSignificanceScoreIT.java @@ -196,7 +196,7 @@ public class SignificantTermsSignificanceScoreIT extends ESIntegTestCase { } - public void testDeletesIssue7951() throws Exception { + public void testPopularTermManyDeletedDocs() throws Exception { String settings = "{\"index.number_of_shards\": 1, \"index.number_of_replicas\": 0}"; assertAcked(prepareCreate(INDEX_NAME).setSettings(settings, XContentType.JSON) .addMapping("_doc", "text", "type=keyword", CLASS_FIELD, "type=keyword")); @@ -238,7 +238,7 @@ public class SignificantTermsSignificanceScoreIT extends ESIntegTestCase { significantTerms("sig_terms") .field(TEXT_FIELD) .minDocCount(1))); - }else + } else { request = client().prepareSearch(INDEX_NAME).setTypes(DOC_TYPE) .addAggregation( @@ -478,8 +478,7 @@ public class SignificantTermsSignificanceScoreIT extends ESIntegTestCase { .subAggregation(significantText("mySignificantTerms", TEXT_FIELD) .significanceHeuristic(scriptHeuristic) .minDocCount(1).shardSize(2).size(2))); - }else - { + } else { request = client().prepareSearch(INDEX_NAME) .addAggregation(terms("class").field(CLASS_FIELD) .subAggregation(significantTerms("mySignificantTerms") diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/DoubleTerms.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/DoubleTerms.java index b2362360466..cf0a0642363 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/DoubleTerms.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/DoubleTerms.java @@ -39,7 +39,7 @@ public class DoubleTerms extends InternalMappedTerms { - private final double term; + double term; Bucket(double term, long docCount, InternalAggregations aggregations, boolean showDocCountError, long docCountError, DocValueFormat format) { diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/DoubleTermsAggregator.java 
b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/DoubleTermsAggregator.java deleted file mode 100644 index 6f42cbcf6b7..00000000000 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/DoubleTermsAggregator.java +++ /dev/null @@ -1,78 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.elasticsearch.search.aggregations.bucket.terms; - -import org.apache.lucene.index.LeafReaderContext; -import org.apache.lucene.index.SortedNumericDocValues; -import org.apache.lucene.util.NumericUtils; -import org.elasticsearch.index.fielddata.FieldData; -import org.elasticsearch.search.DocValueFormat; -import org.elasticsearch.search.aggregations.Aggregator; -import org.elasticsearch.search.aggregations.AggregatorFactories; -import org.elasticsearch.search.aggregations.BucketOrder; -import org.elasticsearch.search.aggregations.InternalAggregation; -import org.elasticsearch.search.aggregations.bucket.terms.LongTerms.Bucket; -import org.elasticsearch.search.aggregations.support.ValuesSource; -import org.elasticsearch.search.aggregations.support.ValuesSource.Numeric; -import org.elasticsearch.search.internal.SearchContext; - -import java.io.IOException; -import java.util.List; -import java.util.Map; -import java.util.stream.Collectors; - -public class DoubleTermsAggregator extends LongTermsAggregator { - - DoubleTermsAggregator(String name, AggregatorFactories factories, ValuesSource.Numeric valuesSource, DocValueFormat format, - BucketOrder order, BucketCountThresholds bucketCountThresholds, SearchContext aggregationContext, Aggregator parent, - SubAggCollectionMode collectionMode, boolean showTermDocCountError, IncludeExclude.LongFilter longFilter, - boolean collectsFromSingleBucket, Map metadata) throws IOException { - super(name, factories, valuesSource, format, order, bucketCountThresholds, aggregationContext, parent, collectionMode, - showTermDocCountError, longFilter, collectsFromSingleBucket, metadata); - } - - @Override - protected SortedNumericDocValues getValues(Numeric valuesSource, LeafReaderContext ctx) throws IOException { - return FieldData.toSortableLongBits(valuesSource.doubleValues(ctx)); - } - - @Override - protected InternalAggregation buildResult(long otherDocCount, List buckets) { - return convertToDouble((LongTerms) super.buildResult(otherDocCount, buckets)); - } - - @Override - public DoubleTerms buildEmptyAggregation() { - final LongTerms terms = (LongTerms) super.buildEmptyAggregation(); - return convertToDouble(terms); - } - - private static DoubleTerms convertToDouble(LongTerms terms) { - List buckets = terms.buckets.stream().map(DoubleTermsAggregator::convertToDouble).collect(Collectors.toList()); - return new DoubleTerms(terms.getName(), terms.order, terms.requiredSize, 
terms.minDocCount, - terms.getMetadata(), terms.format, terms.shardSize, terms.showTermDocCountError, terms.otherDocCount, buckets, - terms.docCountError); - } - - private static DoubleTerms.Bucket convertToDouble(LongTerms.Bucket bucket) { - double value = NumericUtils.sortableLongToDouble(bucket.term); - return new DoubleTerms.Bucket(value, bucket.docCount, bucket.aggregations, bucket.showDocCountError, bucket.docCountError, - bucket.format); - } -} diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/LongTermsAggregator.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/LongTermsAggregator.java deleted file mode 100644 index 109a318f67f..00000000000 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/LongTermsAggregator.java +++ /dev/null @@ -1,196 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.elasticsearch.search.aggregations.bucket.terms; - -import org.apache.lucene.index.LeafReaderContext; -import org.apache.lucene.index.SortedNumericDocValues; -import org.apache.lucene.search.ScoreMode; -import org.elasticsearch.common.lease.Releasables; -import org.elasticsearch.search.DocValueFormat; -import org.elasticsearch.search.aggregations.Aggregator; -import org.elasticsearch.search.aggregations.AggregatorFactories; -import org.elasticsearch.search.aggregations.BucketOrder; -import org.elasticsearch.search.aggregations.InternalAggregation; -import org.elasticsearch.search.aggregations.InternalOrder; -import org.elasticsearch.search.aggregations.LeafBucketCollector; -import org.elasticsearch.search.aggregations.LeafBucketCollectorBase; -import org.elasticsearch.search.aggregations.bucket.terms.IncludeExclude.LongFilter; -import org.elasticsearch.search.aggregations.bucket.terms.LongKeyedBucketOrds.BucketOrdsEnum; -import org.elasticsearch.search.aggregations.bucket.terms.LongTerms.Bucket; -import org.elasticsearch.search.aggregations.support.ValuesSource; -import org.elasticsearch.search.internal.SearchContext; - -import java.io.IOException; -import java.util.Arrays; -import java.util.List; -import java.util.Map; -import java.util.function.BiConsumer; - -import static java.util.Collections.emptyList; - -public class LongTermsAggregator extends TermsAggregator { - - protected final ValuesSource.Numeric valuesSource; - protected final LongKeyedBucketOrds bucketOrds; - private boolean showTermDocCountError; - private LongFilter longFilter; - - public LongTermsAggregator(String name, AggregatorFactories factories, ValuesSource.Numeric valuesSource, DocValueFormat format, - BucketOrder order, BucketCountThresholds bucketCountThresholds, SearchContext aggregationContext, Aggregator parent, - SubAggCollectionMode subAggCollectMode, boolean 
showTermDocCountError, IncludeExclude.LongFilter longFilter, - boolean collectsFromSingleBucket, Map metadata) throws IOException { - super(name, factories, aggregationContext, parent, bucketCountThresholds, order, format, subAggCollectMode, metadata); - this.valuesSource = valuesSource; - this.showTermDocCountError = showTermDocCountError; - this.longFilter = longFilter; - bucketOrds = LongKeyedBucketOrds.build(context.bigArrays(), collectsFromSingleBucket); - } - - @Override - public ScoreMode scoreMode() { - if (valuesSource != null && valuesSource.needsScores()) { - return ScoreMode.COMPLETE; - } - return super.scoreMode(); - } - - protected SortedNumericDocValues getValues(ValuesSource.Numeric valuesSource, LeafReaderContext ctx) throws IOException { - return valuesSource.longValues(ctx); - } - - @Override - public LeafBucketCollector getLeafCollector(LeafReaderContext ctx, - final LeafBucketCollector sub) throws IOException { - final SortedNumericDocValues values = getValues(valuesSource, ctx); - return new LeafBucketCollectorBase(sub, values) { - @Override - public void collect(int doc, long owningBucketOrd) throws IOException { - if (values.advanceExact(doc)) { - final int valuesCount = values.docValueCount(); - - long previous = Long.MAX_VALUE; - for (int i = 0; i < valuesCount; ++i) { - final long val = values.nextValue(); - if (previous != val || i == 0) { - if ((longFilter == null) || (longFilter.accept(val))) { - long bucketOrdinal = bucketOrds.add(owningBucketOrd, val); - if (bucketOrdinal < 0) { // already seen - bucketOrdinal = -1 - bucketOrdinal; - collectExistingBucket(sub, doc, bucketOrdinal); - } else { - collectBucket(sub, doc, bucketOrdinal); - } - } - - previous = val; - } - } - } - } - }; - } - - @Override - public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws IOException { - LongTerms.Bucket[][] topBucketsPerOrd = new LongTerms.Bucket[owningBucketOrds.length][]; - long[] otherDocCounts = new long[owningBucketOrds.length]; - for (int ordIdx = 0; ordIdx < owningBucketOrds.length; ordIdx++) { - long bucketsInOrd = bucketOrds.bucketsInOrd(owningBucketOrds[ordIdx]); - if (bucketCountThresholds.getMinDocCount() == 0 && (InternalOrder.isCountDesc(order) == false || - bucketsInOrd < bucketCountThresholds.getRequiredSize())) { - // we need to fill-in the blanks - for (LeafReaderContext ctx : context.searcher().getTopReaderContext().leaves()) { - final SortedNumericDocValues values = getValues(valuesSource, ctx); - for (int docId = 0; docId < ctx.reader().maxDoc(); ++docId) { - if (values.advanceExact(docId)) { - final int valueCount = values.docValueCount(); - for (int v = 0; v < valueCount; ++v) { - long value = values.nextValue(); - if (longFilter == null || longFilter.accept(value)) { - bucketOrds.add(owningBucketOrds[ordIdx], value); - } - } - } - } - } - bucketsInOrd = bucketOrds.bucketsInOrd(owningBucketOrds[ordIdx]); - } - - final int size = (int) Math.min(bucketsInOrd, bucketCountThresholds.getShardSize()); - BucketPriorityQueue ordered = new BucketPriorityQueue<>(size, partiallyBuiltBucketComparator); - LongTerms.Bucket spare = null; - BucketOrdsEnum ordsEnum = bucketOrds.ordsEnum(owningBucketOrds[ordIdx]); - while (ordsEnum.next()) { - if (spare == null) { - spare = new LongTerms.Bucket(0, 0, null, showTermDocCountError, 0, format); - } - spare.term = ordsEnum.value(); - spare.docCount = bucketDocCount(ordsEnum.ord()); - otherDocCounts[ordIdx] += spare.docCount; - spare.bucketOrd = ordsEnum.ord(); - if 
(bucketCountThresholds.getShardMinDocCount() <= spare.docCount) { - spare = ordered.insertWithOverflow(spare); - if (spare == null) { - consumeBucketsAndMaybeBreak(1); - } - } - } - - // Get the top buckets - LongTerms.Bucket[] list = topBucketsPerOrd[ordIdx] = new LongTerms.Bucket[ordered.size()]; - for (int b = ordered.size() - 1; b >= 0; --b) { - list[b] = ordered.pop(); - list[b].docCountError = 0; - otherDocCounts[ordIdx] -= list[b].docCount; - } - } - - buildSubAggsForAllBuckets(topBucketsPerOrd, b -> b.bucketOrd, (b, aggs) -> b.aggregations = aggs); - - InternalAggregation[] result = new InternalAggregation[owningBucketOrds.length]; - for (int ordIdx = 0; ordIdx < owningBucketOrds.length; ordIdx++) { - result[ordIdx] = buildResult(otherDocCounts[ordIdx], Arrays.asList(topBucketsPerOrd[ordIdx])); - } - return result; - } - - protected InternalAggregation buildResult(long otherDocCount, List buckets) { - return new LongTerms(name, order, bucketCountThresholds.getRequiredSize(), bucketCountThresholds.getMinDocCount(), - metadata(), format, bucketCountThresholds.getShardSize(), showTermDocCountError, otherDocCount, - buckets, 0); - } - - @Override - public InternalAggregation buildEmptyAggregation() { - return new LongTerms(name, order, bucketCountThresholds.getRequiredSize(), bucketCountThresholds.getMinDocCount(), - metadata(), format, bucketCountThresholds.getShardSize(), showTermDocCountError, 0, emptyList(), 0); - } - - @Override - public void doClose() { - super.doClose(); - Releasables.close(bucketOrds); - } - - @Override - public void collectDebugInfo(BiConsumer add) { - super.collectDebugInfo(add); - add.accept("total_buckets", bucketOrds.size()); - } -} diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/NumericTermsAggregator.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/NumericTermsAggregator.java new file mode 100644 index 00000000000..ec7d7727901 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/NumericTermsAggregator.java @@ -0,0 +1,655 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.elasticsearch.search.aggregations.bucket.terms; + +import org.apache.lucene.index.IndexReader; +import org.apache.lucene.index.LeafReaderContext; +import org.apache.lucene.index.SortedNumericDocValues; +import org.apache.lucene.search.ScoreMode; +import org.apache.lucene.util.NumericUtils; +import org.apache.lucene.util.PriorityQueue; +import org.elasticsearch.common.collect.List; +import org.elasticsearch.common.lease.Releasable; +import org.elasticsearch.common.lease.Releasables; +import org.elasticsearch.common.util.BigArrays; +import org.elasticsearch.common.util.LongArray; +import org.elasticsearch.common.util.LongHash; +import org.elasticsearch.index.fielddata.FieldData; +import org.elasticsearch.search.DocValueFormat; +import org.elasticsearch.search.aggregations.Aggregator; +import org.elasticsearch.search.aggregations.AggregatorFactories; +import org.elasticsearch.search.aggregations.BucketOrder; +import org.elasticsearch.search.aggregations.InternalAggregation; +import org.elasticsearch.search.aggregations.InternalMultiBucketAggregation; +import org.elasticsearch.search.aggregations.InternalOrder; +import org.elasticsearch.search.aggregations.LeafBucketCollector; +import org.elasticsearch.search.aggregations.LeafBucketCollectorBase; +import org.elasticsearch.search.aggregations.bucket.terms.IncludeExclude.LongFilter; +import org.elasticsearch.search.aggregations.bucket.terms.LongKeyedBucketOrds.BucketOrdsEnum; +import org.elasticsearch.search.aggregations.bucket.terms.heuristic.SignificanceHeuristic; +import org.elasticsearch.search.aggregations.support.ValuesSource; +import org.elasticsearch.search.internal.ContextIndexSearcher; +import org.elasticsearch.search.internal.SearchContext; + +import java.io.IOException; +import java.util.Map; +import java.util.function.BiConsumer; +import java.util.function.Function; +import java.util.function.Supplier; + +import static java.util.Collections.emptyList; + +public class NumericTermsAggregator extends TermsAggregator { + private final ResultStrategy<?, ?> resultStrategy; + private final ValuesSource.Numeric valuesSource; + private final LongKeyedBucketOrds bucketOrds; + private final LongFilter longFilter; + + public NumericTermsAggregator( + String name, + AggregatorFactories factories, + Function<NumericTermsAggregator, ResultStrategy<?, ?>> resultStrategy, + ValuesSource.Numeric valuesSource, + DocValueFormat format, + BucketOrder order, + BucketCountThresholds bucketCountThresholds, + SearchContext aggregationContext, + Aggregator parent, + SubAggCollectionMode subAggCollectMode, + IncludeExclude.LongFilter longFilter, + boolean collectsFromSingleBucket, + Map<String, Object> metadata + ) + throws IOException { + super(name, factories, aggregationContext, parent, bucketCountThresholds, order, format, subAggCollectMode, metadata); + this.resultStrategy = resultStrategy.apply(this); + this.valuesSource = valuesSource; + this.longFilter = longFilter; + bucketOrds = LongKeyedBucketOrds.build(context.bigArrays(), collectsFromSingleBucket); + } + + @Override + public ScoreMode scoreMode() { + if (valuesSource != null && valuesSource.needsScores()) { + return ScoreMode.COMPLETE; + } + return super.scoreMode(); + } + + @Override + public LeafBucketCollector getLeafCollector(LeafReaderContext ctx, LeafBucketCollector sub) throws IOException { + SortedNumericDocValues values = resultStrategy.getValues(ctx); + return resultStrategy.wrapCollector(new LeafBucketCollectorBase(sub, values) { + @Override + public void collect(int doc, long owningBucketOrd) throws IOException { + if
(values.advanceExact(doc)) { + int valuesCount = values.docValueCount(); + + long previous = Long.MAX_VALUE; + for (int i = 0; i < valuesCount; ++i) { + long val = values.nextValue(); + if (previous != val || i == 0) { + if ((longFilter == null) || (longFilter.accept(val))) { + long bucketOrdinal = bucketOrds.add(owningBucketOrd, val); + if (bucketOrdinal < 0) { // already seen + bucketOrdinal = -1 - bucketOrdinal; + collectExistingBucket(sub, doc, bucketOrdinal); + } else { + collectBucket(sub, doc, bucketOrdinal); + } + } + + previous = val; + } + } + } + } + }); + } + + @Override + public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws IOException { + return resultStrategy.buildAggregations(owningBucketOrds); + } + + @Override + public InternalAggregation buildEmptyAggregation() { + return resultStrategy.buildEmptyResult(); + } + + @Override + public void doClose() { + Releasables.close(super::doClose, bucketOrds, resultStrategy); + } + + @Override + public void collectDebugInfo(BiConsumer<String, Object> add) { + super.collectDebugInfo(add); + add.accept("result_strategy", resultStrategy.describe()); + add.accept("total_buckets", bucketOrds.size()); + } + + /** + * Strategy for building results. + */ + abstract class ResultStrategy<R extends InternalAggregation, B extends InternalMultiBucketAggregation.InternalBucket> + implements + Releasable { + private InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws IOException { + B[][] topBucketsPerOrd = buildTopBucketsPerOrd(owningBucketOrds.length); + long[] otherDocCounts = new long[owningBucketOrds.length]; + for (int ordIdx = 0; ordIdx < owningBucketOrds.length; ordIdx++) { + collectZeroDocEntriesIfNeeded(owningBucketOrds[ordIdx]); + long bucketsInOrd = bucketOrds.bucketsInOrd(owningBucketOrds[ordIdx]); + + int size = (int) Math.min(bucketsInOrd, bucketCountThresholds.getShardSize()); + PriorityQueue<B> ordered = buildPriorityQueue(size); + B spare = null; + BucketOrdsEnum ordsEnum = bucketOrds.ordsEnum(owningBucketOrds[ordIdx]); + Supplier<B> emptyBucketBuilder = emptyBucketBuilder(owningBucketOrds[ordIdx]); + while (ordsEnum.next()) { + if (spare == null) { + spare = emptyBucketBuilder.get(); + } + long docCount = bucketDocCount(ordsEnum.ord()); + otherDocCounts[ordIdx] += docCount; + if (bucketCountThresholds.getShardMinDocCount() <= docCount) { + updateBucket(spare, ordsEnum, docCount); + spare = ordered.insertWithOverflow(spare); + if (spare == null) { + consumeBucketsAndMaybeBreak(1); + } + } + } + + // Get the top buckets + B[] bucketsForOrd = buildBuckets(ordered.size()); + topBucketsPerOrd[ordIdx] = bucketsForOrd; + for (int b = ordered.size() - 1; b >= 0; --b) { + topBucketsPerOrd[ordIdx][b] = ordered.pop(); + otherDocCounts[ordIdx] -= topBucketsPerOrd[ordIdx][b].getDocCount(); + } + } + + buildSubAggs(topBucketsPerOrd); + + InternalAggregation[] result = new InternalAggregation[owningBucketOrds.length]; + for (int ordIdx = 0; ordIdx < owningBucketOrds.length; ordIdx++) { + result[ordIdx] = buildResult(owningBucketOrds[ordIdx], otherDocCounts[ordIdx], topBucketsPerOrd[ordIdx]); + } + return result; + } + + /** + * Short description of the collection mechanism added to the profile + * output to help with debugging. + */ + abstract String describe(); + + /** + * Resolve the doc values to collect results of this type. + */ + abstract SortedNumericDocValues getValues(LeafReaderContext ctx) throws IOException; + + /** + * Wrap the "standard" numeric terms collector to collect any more + * information that this result type may need.
+ */ + abstract LeafBucketCollector wrapCollector(LeafBucketCollector primary); + + /** + * Build an array to hold the "top" buckets for each ordinal. + */ + abstract B[][] buildTopBucketsPerOrd(int size); + + /** + * Build an array of buckets for a particular ordinal. These arrays + * are assigned to the value returned by {@link #buildTopBucketsPerOrd}. + */ + abstract B[] buildBuckets(int size); + + /** + * Build a {@linkplain Supplier} that can be used to build "empty" + * buckets. Those buckets will then be {@link #updateBucket updated} + * for each collected bucket. + */ + abstract Supplier<B> emptyBucketBuilder(long owningBucketOrd); + + /** + * Update fields in {@code spare} to reflect information collected for + * this bucket ordinal. + */ + abstract void updateBucket(B spare, BucketOrdsEnum ordsEnum, long docCount) throws IOException; + + /** + * Build a {@link PriorityQueue} to sort the buckets. After we've + * collected all of the buckets we'll collect all entries in the queue. + */ + abstract PriorityQueue<B> buildPriorityQueue(int size); + + /** + * Build the sub-aggregations into the buckets. This will usually + * delegate to {@link #buildSubAggsForAllBuckets}. + */ + abstract void buildSubAggs(B[][] topBucketsPerOrd) throws IOException; + + /** + * Collect extra entries for "zero" hit documents if they were requested + * and required. + */ + abstract void collectZeroDocEntriesIfNeeded(long ord) throws IOException; + + /** + * Turn the buckets into an aggregation result. + */ + abstract R buildResult(long owningBucketOrd, long otherDocCounts, B[] topBuckets); + + /** + * Build an "empty" result. Only called if there isn't any data on this + * shard. + */ + abstract R buildEmptyResult(); + } + + abstract class StandardTermsResultStrategy<R extends InternalMappedTerms<R, B>, B extends InternalTerms.Bucket<B>> extends + ResultStrategy<R, B> { + protected final boolean showTermDocCountError; + + StandardTermsResultStrategy(boolean showTermDocCountError) { + this.showTermDocCountError = showTermDocCountError; + } + + @Override + final LeafBucketCollector wrapCollector(LeafBucketCollector primary) { + return primary; + } + + @Override + final PriorityQueue<B> buildPriorityQueue(int size) { + return new BucketPriorityQueue<>(size, partiallyBuiltBucketComparator); + } + + @Override + final void buildSubAggs(B[][] topBucketsPerOrd) throws IOException { + buildSubAggsForAllBuckets(topBucketsPerOrd, b -> b.bucketOrd, (b, aggs) -> b.aggregations = aggs); + } + + @Override + Supplier<B> emptyBucketBuilder(long owningBucketOrd) { + return this::buildEmptyBucket; + } + + abstract B buildEmptyBucket(); + + @Override + final void collectZeroDocEntriesIfNeeded(long ord) throws IOException { + if (bucketCountThresholds.getMinDocCount() != 0) { + return; + } + if (InternalOrder.isCountDesc(order) && bucketOrds.bucketsInOrd(ord) >= bucketCountThresholds.getRequiredSize()) { + return; + } + // we need to fill-in the blanks + for (LeafReaderContext ctx : context.searcher().getTopReaderContext().leaves()) { + SortedNumericDocValues values = getValues(ctx); + for (int docId = 0; docId < ctx.reader().maxDoc(); ++docId) { + if (values.advanceExact(docId)) { + int valueCount = values.docValueCount(); + for (int v = 0; v < valueCount; ++v) { + long value = values.nextValue(); + if (longFilter == null || longFilter.accept(value)) { + bucketOrds.add(ord, value); + } + } + } + } + } + } + + @Override + public final void close() {} + } + + class LongTermsResults extends StandardTermsResultStrategy<LongTerms, LongTerms.Bucket> { + LongTermsResults(boolean showTermDocCountError) { +
super(showTermDocCountError); + } + + @Override + String describe() { + return "long_terms"; + } + + @Override + SortedNumericDocValues getValues(LeafReaderContext ctx) throws IOException { + return valuesSource.longValues(ctx); + } + + @Override + LongTerms.Bucket[][] buildTopBucketsPerOrd(int size) { + return new LongTerms.Bucket[size][]; + } + + @Override + LongTerms.Bucket[] buildBuckets(int size) { + return new LongTerms.Bucket[size]; + } + + @Override + LongTerms.Bucket buildEmptyBucket() { + return new LongTerms.Bucket(0, 0, null, showTermDocCountError, 0, format); + } + + @Override + void updateBucket(LongTerms.Bucket spare, BucketOrdsEnum ordsEnum, long docCount) { + spare.term = ordsEnum.value(); + spare.docCount = docCount; + spare.bucketOrd = ordsEnum.ord(); + } + + @Override + LongTerms buildResult(long owningBucketOrd, long otherDocCount, LongTerms.Bucket[] topBuckets) { + return new LongTerms( + name, + order, + bucketCountThresholds.getRequiredSize(), + bucketCountThresholds.getMinDocCount(), + metadata(), + format, + bucketCountThresholds.getShardSize(), + showTermDocCountError, + otherDocCount, + List.of(topBuckets), + 0 + ); + } + + @Override + LongTerms buildEmptyResult() { + return new LongTerms( + name, + order, + bucketCountThresholds.getRequiredSize(), + bucketCountThresholds.getMinDocCount(), + metadata(), + format, + bucketCountThresholds.getShardSize(), + showTermDocCountError, + 0, + emptyList(), + 0 + ); + } + } + + class DoubleTermsResults extends StandardTermsResultStrategy<DoubleTerms, DoubleTerms.Bucket> { + DoubleTermsResults(boolean showTermDocCountError) { + super(showTermDocCountError); + } + + @Override + String describe() { + return "double_terms"; + } + + @Override + SortedNumericDocValues getValues(LeafReaderContext ctx) throws IOException { + return FieldData.toSortableLongBits(valuesSource.doubleValues(ctx)); + } + + @Override + DoubleTerms.Bucket[][] buildTopBucketsPerOrd(int size) { + return new DoubleTerms.Bucket[size][]; + } + + @Override + DoubleTerms.Bucket[] buildBuckets(int size) { + return new DoubleTerms.Bucket[size]; + } + + @Override + DoubleTerms.Bucket buildEmptyBucket() { + return new DoubleTerms.Bucket(0, 0, null, showTermDocCountError, 0, format); + } + + @Override + void updateBucket(DoubleTerms.Bucket spare, BucketOrdsEnum ordsEnum, long docCount) { + spare.term = NumericUtils.sortableLongToDouble(ordsEnum.value()); + spare.docCount = docCount; + spare.bucketOrd = ordsEnum.ord(); + } + + @Override + DoubleTerms buildResult(long owningBucketOrd, long otherDocCount, DoubleTerms.Bucket[] topBuckets) { + return new DoubleTerms( + name, + order, + bucketCountThresholds.getRequiredSize(), + bucketCountThresholds.getMinDocCount(), + metadata(), + format, + bucketCountThresholds.getShardSize(), + showTermDocCountError, + otherDocCount, + List.of(topBuckets), + 0 + ); + } + + @Override + DoubleTerms buildEmptyResult() { + return new DoubleTerms( + name, + order, + bucketCountThresholds.getRequiredSize(), + bucketCountThresholds.getMinDocCount(), + metadata(), + format, + bucketCountThresholds.getShardSize(), + showTermDocCountError, + 0, + emptyList(), + 0 + ); + } + } + + class SignificantLongTermsResults extends ResultStrategy<SignificantLongTerms, SignificantLongTerms.Bucket> { + private final BackgroundFrequencies backgroundFrequencies; + private final long supersetSize; + private final SignificanceHeuristic significanceHeuristic; + private LongArray subsetSizes; + + SignificantLongTermsResults( + SignificantTermsAggregatorFactory termsAggFactory, + SignificanceHeuristic significanceHeuristic, + boolean
collectsFromSingleBucket + ) { + LookupBackgroundFrequencies lookup = new LookupBackgroundFrequencies(termsAggFactory); + backgroundFrequencies = collectsFromSingleBucket ? lookup : new CacheBackgroundFrequencies(lookup, context.bigArrays()); + supersetSize = termsAggFactory.getSupersetNumDocs(); + this.significanceHeuristic = significanceHeuristic; + subsetSizes = context.bigArrays().newLongArray(1, true); + } + + @Override + SortedNumericDocValues getValues(LeafReaderContext ctx) throws IOException { + return valuesSource.longValues(ctx); + } + + @Override + String describe() { + return "significant_terms"; + } + + @Override + LeafBucketCollector wrapCollector(LeafBucketCollector primary) { + return new LeafBucketCollectorBase(primary, null) { + @Override + public void collect(int doc, long owningBucketOrd) throws IOException { + super.collect(doc, owningBucketOrd); + subsetSizes = context.bigArrays().grow(subsetSizes, owningBucketOrd + 1); + subsetSizes.increment(owningBucketOrd, 1); + } + }; + } + + @Override + SignificantLongTerms.Bucket[][] buildTopBucketsPerOrd(int size) { + return new SignificantLongTerms.Bucket[size][]; + } + + @Override + SignificantLongTerms.Bucket[] buildBuckets(int size) { + return new SignificantLongTerms.Bucket[size]; + } + + @Override + Supplier<SignificantLongTerms.Bucket> emptyBucketBuilder(long owningBucketOrd) { + long subsetSize = subsetSizes.get(owningBucketOrd); + return () -> new SignificantLongTerms.Bucket(0, subsetSize, 0, supersetSize, 0, null, format, 0); + } + + @Override + void updateBucket(SignificantLongTerms.Bucket spare, BucketOrdsEnum ordsEnum, long docCount) throws IOException { + spare.term = ordsEnum.value(); + spare.subsetDf = docCount; + spare.supersetDf = backgroundFrequencies.freq(spare.term); + spare.bucketOrd = ordsEnum.ord(); + // During shard-local down-selection we use subset/superset stats that are for this shard only + // Back at the central reducer these properties will be updated with global stats + spare.updateScore(significanceHeuristic); + } + + @Override + PriorityQueue<SignificantLongTerms.Bucket> buildPriorityQueue(int size) { + return new BucketSignificancePriorityQueue<>(size); + } + + @Override + void buildSubAggs(SignificantLongTerms.Bucket[][] topBucketsPerOrd) throws IOException { + buildSubAggsForAllBuckets(topBucketsPerOrd, b -> b.bucketOrd, (b, aggs) -> b.aggregations = aggs); + } + + @Override + void collectZeroDocEntriesIfNeeded(long ord) throws IOException {} + + @Override + SignificantLongTerms buildResult(long owningBucketOrd, long otherDocCounts, SignificantLongTerms.Bucket[] topBuckets) { + return new SignificantLongTerms( + name, + bucketCountThresholds.getRequiredSize(), + bucketCountThresholds.getMinDocCount(), + metadata(), + format, + subsetSizes.get(owningBucketOrd), + supersetSize, + significanceHeuristic, + List.of(topBuckets) + ); + } + + @Override + SignificantLongTerms buildEmptyResult() { + // We need to account for the significance of a miss in our global stats - provide corpus size as context + ContextIndexSearcher searcher = context.searcher(); + IndexReader topReader = searcher.getIndexReader(); + int supersetSize = topReader.numDocs(); + return new SignificantLongTerms( + name, + bucketCountThresholds.getRequiredSize(), + bucketCountThresholds.getMinDocCount(), + metadata(), + format, + 0, + supersetSize, + significanceHeuristic, + emptyList() + ); + } + + @Override + public void close() { + Releasables.close(backgroundFrequencies, subsetSizes); + } + } + + /** + * Lookup frequencies for terms.
+ */ + private interface BackgroundFrequencies extends Releasable { + long freq(long term) throws IOException; + } + + /** + * Lookup frequencies for terms. + */ + private static class LookupBackgroundFrequencies implements BackgroundFrequencies { + // TODO a reference to the factory is weird - probably should be reference to what we need from it. + private final SignificantTermsAggregatorFactory termsAggFactory; + + LookupBackgroundFrequencies(SignificantTermsAggregatorFactory termsAggFactory) { + this.termsAggFactory = termsAggFactory; + } + + @Override + public long freq(long term) throws IOException { + return termsAggFactory.getBackgroundFrequency(term); + } + + @Override + public void close() { + termsAggFactory.close(); + } + } + + /** + * Lookup and cache background frequencies for terms. + */ + private static class CacheBackgroundFrequencies implements BackgroundFrequencies { + private final LookupBackgroundFrequencies lookup; + private final BigArrays bigArrays; + private final LongHash termToPosition; + private LongArray positionToFreq; + + CacheBackgroundFrequencies(LookupBackgroundFrequencies lookup, BigArrays bigArrays) { + this.lookup = lookup; + this.bigArrays = bigArrays; + termToPosition = new LongHash(1, bigArrays); + positionToFreq = bigArrays.newLongArray(1, false); + } + + @Override + public long freq(long term) throws IOException { + long position = termToPosition.add(term); + if (position < 0) { + return positionToFreq.get(-1 - position); + } + long freq = lookup.freq(term); + positionToFreq = bigArrays.grow(positionToFreq, position + 1); + positionToFreq.set(position, freq); + return freq; + } + + @Override + public void close() { + Releasables.close(lookup, termToPosition, positionToFreq); + } + } +} diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/SignificantLongTermsAggregator.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/SignificantLongTermsAggregator.java deleted file mode 100644 index 0d21206515b..00000000000 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/SignificantLongTermsAggregator.java +++ /dev/null @@ -1,136 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package org.elasticsearch.search.aggregations.bucket.terms; - -import org.apache.lucene.index.IndexReader; -import org.apache.lucene.index.LeafReaderContext; -import org.elasticsearch.common.lease.Releasables; -import org.elasticsearch.search.DocValueFormat; -import org.elasticsearch.search.aggregations.Aggregator; -import org.elasticsearch.search.aggregations.AggregatorFactories; -import org.elasticsearch.search.aggregations.InternalAggregation; -import org.elasticsearch.search.aggregations.LeafBucketCollector; -import org.elasticsearch.search.aggregations.LeafBucketCollectorBase; -import org.elasticsearch.search.aggregations.bucket.terms.LongKeyedBucketOrds.BucketOrdsEnum; -import org.elasticsearch.search.aggregations.bucket.terms.heuristic.SignificanceHeuristic; -import org.elasticsearch.search.aggregations.support.ValuesSource; -import org.elasticsearch.search.internal.ContextIndexSearcher; -import org.elasticsearch.search.internal.SearchContext; - -import java.io.IOException; -import java.util.Arrays; -import java.util.Map; - -import static java.util.Collections.emptyList; - -public class SignificantLongTermsAggregator extends LongTermsAggregator { - - public SignificantLongTermsAggregator(String name, AggregatorFactories factories, ValuesSource.Numeric valuesSource, - DocValueFormat format, BucketCountThresholds bucketCountThresholds, SearchContext context, Aggregator parent, - SignificanceHeuristic significanceHeuristic, SignificantTermsAggregatorFactory termsAggFactory, - IncludeExclude.LongFilter includeExclude, Map metadata) throws IOException { - - super(name, factories, valuesSource, format, null, bucketCountThresholds, context, parent, - SubAggCollectionMode.BREADTH_FIRST, false, includeExclude, false, metadata); - this.significanceHeuristic = significanceHeuristic; - this.termsAggFactory = termsAggFactory; - } - - protected long numCollectedDocs; - private final SignificantTermsAggregatorFactory termsAggFactory; - private final SignificanceHeuristic significanceHeuristic; - - @Override - public LeafBucketCollector getLeafCollector(LeafReaderContext ctx, - final LeafBucketCollector sub) throws IOException { - return new LeafBucketCollectorBase(super.getLeafCollector(ctx, sub), null) { - @Override - public void collect(int doc, long bucket) throws IOException { - super.collect(doc, bucket); - numCollectedDocs++; - } - }; - } - - @Override - public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws IOException { - assert owningBucketOrds.length == 1 && owningBucketOrds[0] == 0; - - long bucketsInOrd = bucketOrds.bucketsInOrd(0); - final int size = (int) Math.min(bucketsInOrd, bucketCountThresholds.getShardSize()); - - long supersetSize = termsAggFactory.getSupersetNumDocs(); - long subsetSize = numCollectedDocs; - - BucketSignificancePriorityQueue ordered = new BucketSignificancePriorityQueue<>(size); - SignificantLongTerms.Bucket spare = null; - BucketOrdsEnum ordsEnum = bucketOrds.ordsEnum(0); - while (ordsEnum.next()) { - final int docCount = bucketDocCount(ordsEnum.ord()); - if (docCount < bucketCountThresholds.getShardMinDocCount()) { - continue; - } - if (spare == null) { - spare = new SignificantLongTerms.Bucket(0, 0, 0, 0, 0, null, format, 0); - } - spare.term = ordsEnum.value(); - spare.subsetDf = docCount; - spare.subsetSize = subsetSize; - spare.supersetDf = termsAggFactory.getBackgroundFrequency(spare.term); - spare.supersetSize = supersetSize; - // During shard-local down-selection we use subset/superset stats that are for this shard only 
- // Back at the central reducer these properties will be updated with global stats - spare.updateScore(significanceHeuristic); - - spare.bucketOrd = ordsEnum.ord(); - spare = ordered.insertWithOverflow(spare); - if (spare == null) { - consumeBucketsAndMaybeBreak(1); - } - } - - SignificantLongTerms.Bucket[] list = new SignificantLongTerms.Bucket[ordered.size()]; - for (int i = ordered.size() - 1; i >= 0; i--) { - list[i] = ordered.pop(); - } - - buildSubAggsForBuckets(list, bucket -> bucket.bucketOrd, (bucket, aggs) -> bucket.aggregations = aggs); - - return new InternalAggregation[] { - new SignificantLongTerms(name, bucketCountThresholds.getRequiredSize(), bucketCountThresholds.getMinDocCount(), - metadata(), format, subsetSize, supersetSize, significanceHeuristic, Arrays.asList(list)) - }; - } - - @Override - public SignificantLongTerms buildEmptyAggregation() { - // We need to account for the significance of a miss in our global stats - provide corpus size as context - ContextIndexSearcher searcher = context.searcher(); - IndexReader topReader = searcher.getIndexReader(); - int supersetSize = topReader.numDocs(); - return new SignificantLongTerms(name, bucketCountThresholds.getRequiredSize(), bucketCountThresholds.getMinDocCount(), - metadata(), format, 0, supersetSize, significanceHeuristic, emptyList()); - } - - @Override - public void doClose() { - Releasables.close(bucketOrds, termsAggFactory); - } - -} diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/SignificantTermsAggregatorFactory.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/SignificantTermsAggregatorFactory.java index 235d4f35001..e826929e507 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/SignificantTermsAggregatorFactory.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/SignificantTermsAggregatorFactory.java @@ -40,6 +40,7 @@ import org.elasticsearch.index.query.QueryShardContext; import org.elasticsearch.search.DocValueFormat; import org.elasticsearch.search.aggregations.AggregationExecutionException; import org.elasticsearch.search.aggregations.Aggregator; +import org.elasticsearch.search.aggregations.Aggregator.SubAggCollectionMode; import org.elasticsearch.search.aggregations.AggregatorFactories; import org.elasticsearch.search.aggregations.AggregatorFactory; import org.elasticsearch.search.aggregations.InternalAggregation; @@ -59,8 +60,7 @@ import java.io.IOException; import java.util.Arrays; import java.util.Map; -public class SignificantTermsAggregatorFactory extends ValuesSourceAggregatorFactory - implements Releasable { +public class SignificantTermsAggregatorFactory extends ValuesSourceAggregatorFactory implements Releasable { private static final DeprecationLogger deprecationLogger = new DeprecationLogger( LogManager.getLogger(SignificantTermsAggregatorFactory.class)); @@ -103,8 +103,10 @@ public class SignificantTermsAggregatorFactory extends ValuesSourceAggregatorFac Aggregator parent, SignificanceHeuristic significanceHeuristic, SignificantTermsAggregatorFactory sigTermsFactory, + boolean collectsFromSingleBucket, Map metadata) throws IOException { + assert collectsFromSingleBucket; ExecutionMode execution = null; if (executionHint != null) { execution = ExecutionMode.fromString(executionHint, deprecationLogger); @@ -124,7 +126,11 @@ public class SignificantTermsAggregatorFactory extends ValuesSourceAggregatorFac return execution.create(name, factories, valuesSource, 
format, bucketCountThresholds, includeExclude, context, parent, significanceHeuristic, sigTermsFactory, metadata); + } + @Override + public boolean needsToCollectFromSingleBucket() { + return true; } }; } @@ -147,6 +153,7 @@ public class SignificantTermsAggregatorFactory extends ValuesSourceAggregatorFac Aggregator parent, SignificanceHeuristic significanceHeuristic, SignificantTermsAggregatorFactory sigTermsFactory, + boolean collectsFromSingleBucket, Map metadata) throws IOException { if ((includeExclude != null) && (includeExclude.isRegexBased())) { @@ -155,7 +162,8 @@ public class SignificantTermsAggregatorFactory extends ValuesSourceAggregatorFac "values for include/exclude clauses used to filter numeric fields"); } - if (((ValuesSource.Numeric) valuesSource).isFloatingPoint()) { + ValuesSource.Numeric numericValuesSource = (ValuesSource.Numeric) valuesSource; + if (numericValuesSource.isFloatingPoint()) { throw new UnsupportedOperationException("No support for examining floating point numerics"); } @@ -164,10 +172,15 @@ public class SignificantTermsAggregatorFactory extends ValuesSourceAggregatorFac longFilter = includeExclude.convertToLongFilter(format); } - return new SignificantLongTermsAggregator(name, factories, (ValuesSource.Numeric) valuesSource, format, - bucketCountThresholds, context, parent, significanceHeuristic, sigTermsFactory, longFilter, - metadata); + return new NumericTermsAggregator(name, factories, + agg -> agg.new SignificantLongTermsResults(sigTermsFactory, significanceHeuristic, collectsFromSingleBucket), + numericValuesSource, format, null, bucketCountThresholds, context, parent, SubAggCollectionMode.BREADTH_FIRST, + longFilter, collectsFromSingleBucket, metadata); + } + @Override + public boolean needsToCollectFromSingleBucket() { + return false; } }; } @@ -203,11 +216,12 @@ public class SignificantTermsAggregatorFactory extends ValuesSourceAggregatorFac ? null : filterBuilder.toQuery(queryShardContext); IndexSearcher searcher = queryShardContext.searcher(); - this.supersetNumDocs = filter == null - // Important - need to use the doc count that includes deleted docs - // or we have this issue: https://github.com/elastic/elasticsearch/issues/7951 - ? searcher.getIndexReader().maxDoc() - : searcher.count(filter); + /* + * We need to use a superset size that includes deleted docs or we + * could end up blowing up with bad statistics that cause us to blow + * up later on. + */ + this.supersetNumDocs = filter == null ? searcher.getIndexReader().maxDoc() : searcher.count(filter); this.bucketCountThresholds = bucketCountThresholds; this.significanceHeuristic = significanceHeuristic; } @@ -220,6 +234,7 @@ public class SignificantTermsAggregatorFactory extends ValuesSourceAggregatorFac } private FilterableTermsEnum getTermsEnum(String field) throws IOException { + // TODO this method helps because of asMultiBucketAggregator. Once we remove it we can move this logic into the aggregators. 
if (termsEnum != null) { return termsEnum; } @@ -285,16 +300,16 @@ public class SignificantTermsAggregatorFactory extends ValuesSourceAggregatorFac Aggregator parent, boolean collectsFromSingleBucket, Map metadata) throws IOException { - if (collectsFromSingleBucket == false) { - return asMultiBucketAggregator(this, searchContext, parent); - } - AggregatorSupplier aggregatorSupplier = queryShardContext.getValuesSourceRegistry().getAggregator(config.valueSourceType(), SignificantTermsAggregationBuilder.NAME); if (aggregatorSupplier instanceof SignificantTermsAggregatorSupplier == false) { throw new AggregationExecutionException("Registry miss-match - expected SignificantTermsAggregatorSupplier, found [" + aggregatorSupplier.getClass().toString() + "]"); } + SignificantTermsAggregatorSupplier sigTermsAggregatorSupplier = (SignificantTermsAggregatorSupplier) aggregatorSupplier; + if (collectsFromSingleBucket == false && sigTermsAggregatorSupplier.needsToCollectFromSingleBucket()) { + return asMultiBucketAggregator(this, searchContext, parent); + } numberOfAggregatorsCreated++; BucketCountThresholds bucketCountThresholds = new BucketCountThresholds(this.bucketCountThresholds); @@ -314,12 +329,10 @@ public class SignificantTermsAggregatorFactory extends ValuesSourceAggregatorFac bucketCountThresholds.setShardSize(2 * BucketUtils.suggestShardSideQueueSize(bucketCountThresholds.getRequiredSize())); } - SignificantTermsAggregatorSupplier sigTermsAggregatorSupplier = (SignificantTermsAggregatorSupplier) aggregatorSupplier; - // TODO we should refactor so that we don't need to use this Factory as a singleton (e.g. stop passing `this` to the aggregators) return sigTermsAggregatorSupplier.build(name, factories, valuesSource, config.format(), bucketCountThresholds, includeExclude, executionHint, searchContext, parent, - significanceHeuristic, this, metadata); + significanceHeuristic, this, collectsFromSingleBucket, metadata); } public enum ExecutionMode { diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/SignificantTermsAggregatorSupplier.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/SignificantTermsAggregatorSupplier.java index adb94ebd553..24f630ca4b6 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/SignificantTermsAggregatorSupplier.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/SignificantTermsAggregatorSupplier.java @@ -41,5 +41,8 @@ interface SignificantTermsAggregatorSupplier extends AggregatorSupplier { Aggregator parent, SignificanceHeuristic significanceHeuristic, SignificantTermsAggregatorFactory sigTermsFactory, + boolean collectsFromSingleBucket, Map metadata) throws IOException; + + boolean needsToCollectFromSingleBucket(); } diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/TermsAggregatorFactory.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/TermsAggregatorFactory.java index 30235ce0ef1..e56a0393856 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/TermsAggregatorFactory.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/TermsAggregatorFactory.java @@ -36,6 +36,7 @@ import org.elasticsearch.search.aggregations.InternalOrder; import org.elasticsearch.search.aggregations.InternalOrder.CompoundOrder; import org.elasticsearch.search.aggregations.NonCollectingAggregator; import 
org.elasticsearch.search.aggregations.bucket.BucketUtils; +import org.elasticsearch.search.aggregations.bucket.terms.NumericTermsAggregator.ResultStrategy; import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregator.BucketCountThresholds; import org.elasticsearch.search.aggregations.support.AggregatorSupplier; import org.elasticsearch.search.aggregations.support.CoreValuesSourceType; @@ -48,6 +49,7 @@ import org.elasticsearch.search.internal.SearchContext; import java.io.IOException; import java.util.Arrays; import java.util.Map; +import java.util.function.Function; public class TermsAggregatorFactory extends ValuesSourceAggregatorFactory { private static final DeprecationLogger deprecationLogger = new DeprecationLogger(LogManager.getLogger(TermsAggregatorFactory.class)); @@ -162,7 +164,6 @@ public class TermsAggregatorFactory extends ValuesSourceAggregatorFactory { + "include/exclude clauses used to filter numeric fields"); } - IncludeExclude.LongFilter longFilter = null; if (subAggCollectMode == null) { // TODO can we remove concept of AggregatorFactories.EMPTY? if (factories != AggregatorFactories.EMPTY) { @@ -171,20 +172,23 @@ public class TermsAggregatorFactory extends ValuesSourceAggregatorFactory { subAggCollectMode = SubAggCollectionMode.DEPTH_FIRST; } } - if (((ValuesSource.Numeric) valuesSource).isFloatingPoint()) { + + ValuesSource.Numeric numericValuesSource = (ValuesSource.Numeric) valuesSource; + IncludeExclude.LongFilter longFilter = null; + Function<NumericTermsAggregator, ResultStrategy<?, ?>> resultStrategy; + if (numericValuesSource.isFloatingPoint()) { if (includeExclude != null) { longFilter = includeExclude.convertToDoubleFilter(); } - return new DoubleTermsAggregator(name, factories, (ValuesSource.Numeric) valuesSource, format, order, - bucketCountThresholds, context, parent, subAggCollectMode, showTermDocCountError, longFilter, - collectsFromSingleBucket, metadata); + resultStrategy = agg -> agg.new DoubleTermsResults(showTermDocCountError); + } else { + if (includeExclude != null) { + longFilter = includeExclude.convertToLongFilter(format); + } + resultStrategy = agg -> agg.new LongTermsResults(showTermDocCountError); } - if (includeExclude != null) { - longFilter = includeExclude.convertToLongFilter(format); - } - return new LongTermsAggregator(name, factories, (ValuesSource.Numeric) valuesSource, format, order, - bucketCountThresholds, context, parent, subAggCollectMode, showTermDocCountError, longFilter, - collectsFromSingleBucket, metadata); + return new NumericTermsAggregator(name, factories, resultStrategy, numericValuesSource, format, order, + bucketCountThresholds, context, parent, subAggCollectMode, longFilter, collectsFromSingleBucket, metadata); } @Override diff --git a/server/src/test/java/org/elasticsearch/search/aggregations/bucket/nested/NestedAggregatorTests.java b/server/src/test/java/org/elasticsearch/search/aggregations/bucket/nested/NestedAggregatorTests.java index 569744b182d..6d3ba2e1087 100644 --- a/server/src/test/java/org/elasticsearch/search/aggregations/bucket/nested/NestedAggregatorTests.java +++ b/server/src/test/java/org/elasticsearch/search/aggregations/bucket/nested/NestedAggregatorTests.java @@ -65,7 +65,7 @@ import org.elasticsearch.search.aggregations.bucket.filter.Filter; import org.elasticsearch.search.aggregations.bucket.filter.FilterAggregationBuilder; import org.elasticsearch.search.aggregations.bucket.terms.InternalTerms; import org.elasticsearch.search.aggregations.bucket.terms.LongTerms; -import
org.elasticsearch.search.aggregations.bucket.terms.LongTermsAggregator; +import org.elasticsearch.search.aggregations.bucket.terms.NumericTermsAggregator; import org.elasticsearch.search.aggregations.bucket.terms.StringTerms; import org.elasticsearch.search.aggregations.bucket.terms.Terms; import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder; @@ -815,7 +815,7 @@ public class NestedAggregatorTests extends AggregatorTestCase { } /** - * {@link LongTermsAggregator} is the first complex bucking aggregation + * {@link NumericTermsAggregator} is the first complex bucketing aggregation * that stopped wrapping itself in {@link AggregatorFactory#asMultiBucketAggregator} * so this tests that nested works properly inside of it. */ diff --git a/server/src/test/java/org/elasticsearch/search/aggregations/bucket/nested/ReverseNestedAggregatorTests.java b/server/src/test/java/org/elasticsearch/search/aggregations/bucket/nested/ReverseNestedAggregatorTests.java index 7d7c5b48fb2..410c3fd081f 100644 --- a/server/src/test/java/org/elasticsearch/search/aggregations/bucket/nested/ReverseNestedAggregatorTests.java +++ b/server/src/test/java/org/elasticsearch/search/aggregations/bucket/nested/ReverseNestedAggregatorTests.java @@ -38,7 +38,7 @@ import org.elasticsearch.search.aggregations.AggregatorFactory; import org.elasticsearch.search.aggregations.AggregatorTestCase; import org.elasticsearch.search.aggregations.InternalAggregation; import org.elasticsearch.search.aggregations.bucket.terms.LongTerms; -import org.elasticsearch.search.aggregations.bucket.terms.LongTermsAggregator; +import org.elasticsearch.search.aggregations.bucket.terms.NumericTermsAggregator; import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder; import org.elasticsearch.search.aggregations.metrics.InternalMax; import org.elasticsearch.search.aggregations.metrics.MaxAggregationBuilder; @@ -247,7 +247,7 @@ public class ReverseNestedAggregatorTests extends AggregatorTestCase { } /** - * {@link LongTermsAggregator} is the first complex bucking aggregation + * {@link NumericTermsAggregator} is the first complex bucketing aggregation * that stopped wrapping itself in {@link AggregatorFactory#asMultiBucketAggregator} * so this tests that nested works properly inside of it.
*/ diff --git a/server/src/test/java/org/elasticsearch/search/aggregations/bucket/terms/SignificantTermsAggregatorTests.java b/server/src/test/java/org/elasticsearch/search/aggregations/bucket/terms/SignificantTermsAggregatorTests.java index 3300bbc8ed7..1ea1a840030 100644 --- a/server/src/test/java/org/elasticsearch/search/aggregations/bucket/terms/SignificantTermsAggregatorTests.java +++ b/server/src/test/java/org/elasticsearch/search/aggregations/bucket/terms/SignificantTermsAggregatorTests.java @@ -23,14 +23,17 @@ import org.apache.lucene.analysis.standard.StandardAnalyzer; import org.apache.lucene.document.BinaryDocValuesField; import org.apache.lucene.document.Document; import org.apache.lucene.document.Field; +import org.apache.lucene.document.SortedNumericDocValuesField; import org.apache.lucene.document.StoredField; import org.apache.lucene.index.DirectoryReader; import org.apache.lucene.index.IndexOptions; import org.apache.lucene.index.IndexReader; import org.apache.lucene.index.IndexWriter; import org.apache.lucene.index.IndexWriterConfig; +import org.apache.lucene.index.RandomIndexWriter; import org.apache.lucene.index.Term; import org.apache.lucene.search.IndexSearcher; +import org.apache.lucene.search.MatchAllDocsQuery; import org.apache.lucene.search.TermQuery; import org.apache.lucene.store.Directory; import org.apache.lucene.util.BytesRef; @@ -60,6 +63,7 @@ import java.util.function.Function; import java.util.stream.Collectors; import static org.elasticsearch.search.aggregations.AggregationBuilders.significantTerms; +import static org.hamcrest.Matchers.equalTo; public class SignificantTermsAggregatorTests extends AggregatorTestCase { @@ -372,6 +376,52 @@ public class SignificantTermsAggregatorTests extends AggregatorTestCase { } } + public void testThreeLayerLong() throws IOException { + try (Directory dir = newDirectory()) { + try (RandomIndexWriter writer = new RandomIndexWriter(random(), dir)) { + for (int i = 0; i < 10; i++) { + for (int j = 0; j < 10; j++) { + for (int k = 0; k < 10; k++) { + Document d = new Document(); + d.add(new SortedNumericDocValuesField("i", i)); + d.add(new SortedNumericDocValuesField("j", j)); + d.add(new SortedNumericDocValuesField("k", k)); + writer.addDocument(d); + } + } + } + try (IndexReader reader = maybeWrapReaderEs(writer.getReader())) { + IndexSearcher searcher = newIndexSearcher(reader); + SignificantTermsAggregationBuilder request = new SignificantTermsAggregationBuilder("i").field("i") + .subAggregation(new SignificantTermsAggregationBuilder("j").field("j") + .subAggregation(new SignificantTermsAggregationBuilder("k").field("k"))); + SignificantLongTerms result = search(searcher, new MatchAllDocsQuery(), request, + longField("i"), longField("j"), longField("k")); + for (int i = 0; i < 10; i++) { + SignificantLongTerms.Bucket iBucket = result.getBucketByKey(Integer.toString(i)); + assertThat(iBucket.getDocCount(), equalTo(100L)); + SignificantLongTerms jAgg = iBucket.getAggregations().get("j"); + for (int j = 0; j < 10; j++) { + SignificantLongTerms.Bucket jBucket = jAgg.getBucketByKey(Integer.toString(j)); + assertThat(jBucket.getDocCount(), equalTo(10L)); + SignificantLongTerms kAgg = jBucket.getAggregations().get("k"); + for (int k = 0; k < 10; k++) { + SignificantLongTerms.Bucket kBucket = kAgg.getBucketByKey(Integer.toString(k)); + assertThat(kBucket.getDocCount(), equalTo(1L)); + } + } + } + } + } + } + } + + private NumberFieldMapper.NumberFieldType longField(String name) { + NumberFieldMapper.NumberFieldType type = new 
NumberFieldMapper.NumberFieldType(NumberFieldMapper.NumberType.LONG); + type.setName(name); + return type; + } + private void addMixedTextDocs(TextFieldType textFieldType, IndexWriter w) throws IOException { for (int i = 0; i < 10; i++) { Document doc = new Document(); diff --git a/server/src/test/java/org/elasticsearch/search/aggregations/bucket/terms/SignificantTextAggregatorTests.java b/server/src/test/java/org/elasticsearch/search/aggregations/bucket/terms/SignificantTextAggregatorTests.java index ec79caeb59b..a1a135ea58f 100644 --- a/server/src/test/java/org/elasticsearch/search/aggregations/bucket/terms/SignificantTextAggregatorTests.java +++ b/server/src/test/java/org/elasticsearch/search/aggregations/bucket/terms/SignificantTextAggregatorTests.java @@ -39,6 +39,8 @@ import org.elasticsearch.index.mapper.TextFieldMapper.TextFieldType; import org.elasticsearch.search.aggregations.AggregatorTestCase; import org.elasticsearch.search.aggregations.bucket.sampler.InternalSampler; import org.elasticsearch.search.aggregations.bucket.sampler.SamplerAggregationBuilder; +import org.elasticsearch.search.aggregations.bucket.terms.SignificantTerms; +import org.elasticsearch.search.aggregations.bucket.terms.SignificantTextAggregationBuilder; import org.elasticsearch.search.aggregations.support.AggregationInspectionHelper; import java.io.IOException;
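
NOTE: the heart of this refactoring is the "result strategy" pattern introduced in NumericTermsAggregator above. Instead of three aggregator subclasses (LongTermsAggregator, DoubleTermsAggregator, SignificantLongTermsAggregator) there is now one aggregator that delegates result building to a ResultStrategy chosen by the factory: the factory passes a Function<NumericTermsAggregator, ResultStrategy<?, ?>> and the constructor binds it with resultStrategy.apply(this), which is why TermsAggregatorFactory can write agg -> agg.new LongTermsResults(showTermDocCountError). The following is a minimal, self-contained sketch of that shape. It is illustrative only and uses hypothetical names (StrategySketch, Aggregator, LongResults, DoubleResults), not the actual Elasticsearch classes:

import java.util.function.Function;

public class StrategySketch {
    static class Aggregator {
        private final ResultStrategy resultStrategy;

        Aggregator(Function<Aggregator, ResultStrategy> resultStrategy) {
            // Mirrors `this.resultStrategy = resultStrategy.apply(this)` in
            // NumericTermsAggregator: the strategy is bound to this instance.
            this.resultStrategy = resultStrategy.apply(this);
        }

        String buildAggregation() {
            return resultStrategy.describe() + " result";
        }

        // The strategies are non-static inner classes so each one can read
        // the state of the aggregator that owns it (via Aggregator.this).
        abstract class ResultStrategy {
            abstract String describe();
        }

        class LongResults extends ResultStrategy {
            @Override
            String describe() {
                return "long_terms";
            }
        }

        class DoubleResults extends ResultStrategy {
            @Override
            String describe() {
                return "double_terms";
            }
        }
    }

    public static void main(String[] args) {
        // The factory picks a strategy with the qualified `agg.new` syntax,
        // as TermsAggregatorFactory does with `agg -> agg.new LongTermsResults(...)`.
        Aggregator longAgg = new Aggregator(agg -> agg.new LongResults());
        Aggregator doubleAgg = new Aggregator(agg -> agg.new DoubleResults());
        System.out.println(longAgg.buildAggregation());   // prints: long_terms result
        System.out.println(doubleAgg.buildAggregation()); // prints: double_terms result
    }
}

Because the strategies live inside the aggregator, they can use bucketOrds, format, and the bucket count thresholds without any extra constructor plumbing, and the string returned by describe() is what surfaces as the new result_strategy entry in the profile's debug output.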