From 98c379c507a8cc93ae6015a5355fc5b6a213c0f6 Mon Sep 17 00:00:00 2001
From: Nik Everett
Date: Thu, 4 Jun 2020 14:32:32 -0400
Subject: [PATCH] Merge remaining sig_terms into terms (#57397) (#57687)

Merges the remaining implementation of `significant_terms` into `terms`
so that we can more easily make them work properly without
`asMultiBucketAggregator` which *should* save memory and speed them up.

Relates #56487
---
 .../test/search.aggregation/20_terms.yml      |  32 +-
 .../GlobalOrdinalsStringTermsAggregator.java  |  15 +-
 .../terms/MapStringTermsAggregator.java       | 447 ++++++++++++++++++
 .../bucket/terms/NumericTermsAggregator.java  |  13 +-
 .../SignificantStringTermsAggregator.java     | 141 ------
 .../SignificantTermsAggregatorFactory.java    |  17 +-
 .../bucket/terms/StringTermsAggregator.java   | 182 -------
 .../bucket/terms/TermsAggregatorFactory.java  |  17 +-
 8 files changed, 521 insertions(+), 343 deletions(-)
 create mode 100644 server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/MapStringTermsAggregator.java
 delete mode 100644 server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/SignificantStringTermsAggregator.java
 delete mode 100644 server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/StringTermsAggregator.java

diff --git a/rest-api-spec/src/main/resources/rest-api-spec/test/search.aggregation/20_terms.yml b/rest-api-spec/src/main/resources/rest-api-spec/test/search.aggregation/20_terms.yml
index d9b65efa46b..50835223fc0 100644
--- a/rest-api-spec/src/main/resources/rest-api-spec/test/search.aggregation/20_terms.yml
+++ b/rest-api-spec/src/main/resources/rest-api-spec/test/search.aggregation/20_terms.yml
@@ -779,7 +779,7 @@ setup:
       body: { "size" : 0, "aggs" : { "no_field_terms" : { "terms" : { "size": 1 } } } }
 
 ---
-"global ords profiler":
+"string profiler":
   - skip:
       version: " - 7.8.99"
       reason: debug information added in 7.9.0
@@ -831,6 +831,36 @@ setup:
   - match: { profile.shards.0.aggregations.0.children.0.type: MaxAggregator }
   - match: { profile.shards.0.aggregations.0.children.0.description: max_number }
 
+  - do:
+      search:
+        index: test_1
+        body:
+          profile: true
+          size: 0
+          aggs:
+            str_terms:
+              terms:
+                field: str
+                collect_mode: breadth_first
+                execution_hint: map
+              aggs:
+                max_number:
+                  max:
+                    field: number
+  - match: { aggregations.str_terms.buckets.0.key: sheep }
+  - match: { aggregations.str_terms.buckets.0.max_number.value: 3 }
+  - match: { aggregations.str_terms.buckets.1.key: cow }
+  - match: { aggregations.str_terms.buckets.1.max_number.value: 1 }
+  - match: { aggregations.str_terms.buckets.2.key: pig }
+  - match: { aggregations.str_terms.buckets.2.max_number.value: 1 }
+  - match: { profile.shards.0.aggregations.0.type: MapStringTermsAggregator }
+  - match: { profile.shards.0.aggregations.0.description: str_terms }
+  - match: { profile.shards.0.aggregations.0.breakdown.collect_count: 4 }
+  - match: { profile.shards.0.aggregations.0.debug.deferred_aggregators: [ max_number ] }
+  - match: { profile.shards.0.aggregations.0.debug.result_strategy: terms }
+  - match: { profile.shards.0.aggregations.0.children.0.type: MaxAggregator }
+  - match: { profile.shards.0.aggregations.0.children.0.description: max_number }
+
   - do:
       indices.create:
           index: test_3

diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/GlobalOrdinalsStringTermsAggregator.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/GlobalOrdinalsStringTermsAggregator.java
index bd5d37e8460..0d54f998370 100644
--- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/GlobalOrdinalsStringTermsAggregator.java
+++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/GlobalOrdinalsStringTermsAggregator.java
@@ -551,11 +551,12 @@ public class GlobalOrdinalsStringTermsAggregator extends AbstractStringTermsAggr
             B[] topBuckets = buildBuckets(ordered.size());
             for (int i = ordered.size() - 1; i >= 0; --i) {
                 topBuckets[i] = convertTempBucketToRealBucket(ordered.pop());
+                otherDocCount[0] -= topBuckets[i].getDocCount();
             }
 
             buildSubAggs(topBuckets);
 
             return new InternalAggregation[] {
-                buildResult(topBuckets)
+                buildResult(topBuckets, otherDocCount[0])
             };
         }
@@ -608,7 +609,7 @@ public class GlobalOrdinalsStringTermsAggregator extends AbstractStringTermsAggr
         /**
          * Turn the buckets into an aggregation result.
          */
-        abstract R buildResult(B[] topBuckets);
+        abstract R buildResult(B[] topBuckets, long otherDocCount);
 
         /**
         * Build an "empty" result. Only called if there isn't any data on this
@@ -621,8 +622,6 @@ public class GlobalOrdinalsStringTermsAggregator extends AbstractStringTermsAggr
      * Builds results for the standard {@code terms} aggregation.
      */
     class StandardTermsResults extends ResultStrategy<StringTerms, StringTerms.Bucket, OrdBucket> {
-        private long otherDocCount;
-
         @Override
         String describe() {
             return "terms";
@@ -648,7 +647,6 @@ public class GlobalOrdinalsStringTermsAggregator extends AbstractStringTermsAggr
             spare.globalOrd = globalOrd;
             spare.bucketOrd = bucketOrd;
             spare.docCount = docCount;
-            otherDocCount += docCount;
         }
 
         @Override
@@ -660,7 +658,6 @@ public class GlobalOrdinalsStringTermsAggregator extends AbstractStringTermsAggr
             BytesRef term = BytesRef.deepCopyOf(lookupGlobalOrd.apply(temp.globalOrd));
             StringTerms.Bucket result = new StringTerms.Bucket(term, temp.docCount, null, showTermDocCountError, 0, format);
             result.bucketOrd = temp.bucketOrd;
-            otherDocCount -= temp.docCount;
             result.docCountError = 0;
             return result;
         }
@@ -671,7 +668,7 @@ public class GlobalOrdinalsStringTermsAggregator extends AbstractStringTermsAggr
         }
 
         @Override
-        StringTerms buildResult(StringTerms.Bucket[] topBuckets) {
+        StringTerms buildResult(StringTerms.Bucket[] topBuckets, long otherDocCount) {
             return new StringTerms(name, order, bucketCountThresholds.getRequiredSize(), bucketCountThresholds.getMinDocCount(),
                 metadata(), format, bucketCountThresholds.getShardSize(), showTermDocCountError, otherDocCount,
                 Arrays.asList(topBuckets), 0);
@@ -707,7 +704,7 @@ public class GlobalOrdinalsStringTermsAggregator extends AbstractStringTermsAggr
 
         @Override
         String describe() {
-            return "terms";
+            return "significant_terms";
         }
 
         @Override
@@ -763,7 +760,7 @@ public class GlobalOrdinalsStringTermsAggregator extends AbstractStringTermsAggr
         }
 
         @Override
-        SignificantStringTerms buildResult(SignificantStringTerms.Bucket[] topBuckets) {
+        SignificantStringTerms buildResult(SignificantStringTerms.Bucket[] topBuckets, long otherDocCount) {
             return new SignificantStringTerms(name, bucketCountThresholds.getRequiredSize(),
                 bucketCountThresholds.getMinDocCount(),
                 metadata(), format, subsetSize, termsAggFactory.getSupersetNumDocs(), significanceHeuristic, Arrays.asList(topBuckets));
         }

diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/MapStringTermsAggregator.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/MapStringTermsAggregator.java
new file mode 100644
index 00000000000..e672febcd64
--- /dev/null
+++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/MapStringTermsAggregator.java
@@ -0,0 +1,447 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.elasticsearch.search.aggregations.bucket.terms;
+
+import org.apache.lucene.index.LeafReaderContext;
+import org.apache.lucene.search.ScoreMode;
+import org.apache.lucene.util.BytesRef;
+import org.apache.lucene.util.BytesRefBuilder;
+import org.apache.lucene.util.PriorityQueue;
+import org.elasticsearch.common.lease.Releasable;
+import org.elasticsearch.common.lease.Releasables;
+import org.elasticsearch.common.util.BytesRefHash;
+import org.elasticsearch.index.fielddata.SortedBinaryDocValues;
+import org.elasticsearch.search.DocValueFormat;
+import org.elasticsearch.search.aggregations.Aggregator;
+import org.elasticsearch.search.aggregations.AggregatorFactories;
+import org.elasticsearch.search.aggregations.BucketOrder;
+import org.elasticsearch.search.aggregations.InternalAggregation;
+import org.elasticsearch.search.aggregations.InternalMultiBucketAggregation;
+import org.elasticsearch.search.aggregations.InternalOrder;
+import org.elasticsearch.search.aggregations.LeafBucketCollector;
+import org.elasticsearch.search.aggregations.LeafBucketCollectorBase;
+import org.elasticsearch.search.aggregations.bucket.terms.heuristic.SignificanceHeuristic;
+import org.elasticsearch.search.aggregations.support.ValuesSource;
+import org.elasticsearch.search.internal.SearchContext;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Map;
+import java.util.function.BiConsumer;
+import java.util.function.Function;
+
+/**
+ * An aggregator of string values that hashes the strings on the fly rather
+ * than up front like the {@link GlobalOrdinalsStringTermsAggregator}.
+ */
+public class MapStringTermsAggregator extends AbstractStringTermsAggregator {
+    private final ResultStrategy<?, ?> resultStrategy;
+    private final ValuesSource valuesSource;
+    private final BytesRefHash bucketOrds;
+    private final IncludeExclude.StringFilter includeExclude;
+
+    public MapStringTermsAggregator(
+        String name,
+        AggregatorFactories factories,
+        Function<MapStringTermsAggregator, ResultStrategy<?, ?>> resultStrategy,
+        ValuesSource valuesSource,
+        BucketOrder order,
+        DocValueFormat format,
+        BucketCountThresholds bucketCountThresholds,
+        IncludeExclude.StringFilter includeExclude,
+        SearchContext context,
+        Aggregator parent,
+        SubAggCollectionMode collectionMode,
+        boolean showTermDocCountError,
+        Map<String, Object> metadata
+    ) throws IOException {
+        super(name, factories, context, parent, order, format, bucketCountThresholds, collectionMode, showTermDocCountError, metadata);
+        this.resultStrategy = resultStrategy.apply(this); // ResultStrategy needs a reference to the Aggregator to do its job.
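+        // The factories below hand in either `a -> a.new StandardTermsResults()`
+        // or `a -> a.new SignificantTermsResults(...)`, so this one collection
+        // implementation can back both `terms` and `significant_terms`.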
+        this.valuesSource = valuesSource;
+        this.includeExclude = includeExclude;
+        bucketOrds = new BytesRefHash(1, context.bigArrays());
+    }
+
+    @Override
+    public ScoreMode scoreMode() {
+        if (valuesSource != null && valuesSource.needsScores()) {
+            return ScoreMode.COMPLETE;
+        }
+        return super.scoreMode();
+    }
+
+    @Override
+    public LeafBucketCollector getLeafCollector(LeafReaderContext ctx,
+            final LeafBucketCollector sub) throws IOException {
+        SortedBinaryDocValues values = valuesSource.bytesValues(ctx);
+        return resultStrategy.wrapCollector(new LeafBucketCollectorBase(sub, values) {
+            final BytesRefBuilder previous = new BytesRefBuilder();
+
+            @Override
+            public void collect(int doc, long bucket) throws IOException {
+                assert bucket == 0;
+                if (values.advanceExact(doc)) {
+                    final int valuesCount = values.docValueCount();
+
+                    // SortedBinaryDocValues don't guarantee uniqueness so we
+                    // need to take care of dups
+                    previous.clear();
+                    for (int i = 0; i < valuesCount; ++i) {
+                        final BytesRef bytes = values.nextValue();
+                        if (includeExclude != null && false == includeExclude.accept(bytes)) {
+                            continue;
+                        }
+                        if (i > 0 && previous.get().equals(bytes)) {
+                            continue;
+                        }
+                        long bucketOrdinal = bucketOrds.add(bytes);
+                        if (bucketOrdinal < 0) { // already seen
+                            bucketOrdinal = -1 - bucketOrdinal;
+                            collectExistingBucket(sub, doc, bucketOrdinal);
+                        } else {
+                            collectBucket(sub, doc, bucketOrdinal);
+                        }
+                        previous.copyBytes(bytes);
+                    }
+                }
+            }
+        });
+    }
+
+    @Override
+    public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws IOException {
+        return resultStrategy.buildAggregations(owningBucketOrds);
+    }
+
+    @Override
+    public InternalAggregation buildEmptyAggregation() {
+        return buildEmptyTermsAggregation();
+    }
+
+    @Override
+    public void collectDebugInfo(BiConsumer<String, Object> add) {
+        super.collectDebugInfo(add);
+        add.accept("result_strategy", resultStrategy.describe());
+    }
+
+    @Override
+    public void doClose() {
+        Releasables.close(bucketOrds, resultStrategy);
+    }
+
+    /**
+     * Strategy for building results.
+     */
+    abstract class ResultStrategy<
+        R extends InternalAggregation,
+        B extends InternalMultiBucketAggregation.InternalBucket> implements Releasable {
+
+        private InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws IOException {
+            assert owningBucketOrds.length == 1 && owningBucketOrds[0] == 0;
+
+            collectZeroDocEntriesIfNeeded();
+
+            int size = (int) Math.min(bucketOrds.size(), bucketCountThresholds.getShardSize());
+
+            long otherDocCount = 0;
+            PriorityQueue<B> ordered = buildPriorityQueue(size);
+            B spare = null;
+            for (int bucketOrd = 0; bucketOrd < bucketOrds.size(); bucketOrd++) {
+                long docCount = bucketDocCount(bucketOrd);
+                otherDocCount += docCount;
+                if (docCount < bucketCountThresholds.getShardMinDocCount()) {
+                    continue;
+                }
+                if (spare == null) {
+                    spare = buildEmptyBucket();
+                }
+                updateBucket(spare, bucketOrd, docCount);
+                spare = ordered.insertWithOverflow(spare);
+            }
+
+            B[] topBuckets = buildBuckets(ordered.size());
+            for (int i = ordered.size() - 1; i >= 0; --i) {
+                topBuckets[i] = ordered.pop();
+                otherDocCount -= topBuckets[i].getDocCount();
+                finalizeBucket(topBuckets[i]);
+            }
+
+            buildSubAggs(topBuckets);
+            return new InternalAggregation[] {
+                buildResult(topBuckets, otherDocCount)
+            };
+        }
+
+        /**
+         * Short description of the collection mechanism added to the profile
+         * output to help with debugging.
+         */
+        abstract String describe();
+
+        /**
+         * Wrap the "standard" string terms collector to collect any more
+         * information that this result type may need.
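+         * For the {@code significant_terms} strategy, for example, the wrapper
+         * also counts every collected document because that count becomes the
+         * subset size used when scoring significance.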
+         */
+        abstract LeafBucketCollector wrapCollector(LeafBucketCollector primary);
+
+        /**
+         * Collect extra entries for "zero" hit documents if they were requested
+         * and required.
+         */
+        abstract void collectZeroDocEntriesIfNeeded() throws IOException;
+
+        /**
+         * Build an empty temporary bucket.
+         */
+        abstract B buildEmptyBucket();
+
+        /**
+         * Build a {@link PriorityQueue} to sort the buckets. After we've
+         * collected all of the buckets we'll collect all entries in the queue.
+         */
+        abstract PriorityQueue<B> buildPriorityQueue(int size);
+
+        /**
+         * Update fields in {@code spare} to reflect information collected for
+         * this bucket ordinal.
+         */
+        abstract void updateBucket(B spare, long bucketOrd, long docCount) throws IOException;
+
+        /**
+         * Build an array of buckets for a particular ordinal to collect the
+         * results. The populated list is passed to {@link #buildResult}.
+         */
+        abstract B[] buildBuckets(int size);
+
+        /**
+         * Finalize building a bucket. Called once we know that the bucket will
+         * be included in the results.
+         */
+        abstract void finalizeBucket(B bucket);
+
+        /**
+         * Build the sub-aggregations into the buckets. This will usually
+         * delegate to {@link #buildSubAggsForAllBuckets}.
+         */
+        abstract void buildSubAggs(B[] topBuckets) throws IOException;
+
+        /**
+         * Turn the buckets into an aggregation result.
+         */
+        abstract R buildResult(B[] topBuckets, long otherDocCount);
+
+        /**
+         * Build an "empty" result. Only called if there isn't any data on this
+         * shard.
+         */
+        abstract R buildEmptyResult();
+    }
+
+    /**
+     * Builds results for the standard {@code terms} aggregation.
+     */
+    class StandardTermsResults extends ResultStrategy<StringTerms, StringTerms.Bucket> {
+        @Override
+        String describe() {
+            return "terms";
+        }
+
+        @Override
+        LeafBucketCollector wrapCollector(LeafBucketCollector primary) {
+            return primary;
+        }
+
+        @Override
+        void collectZeroDocEntriesIfNeeded() throws IOException {
+            if (bucketCountThresholds.getMinDocCount() != 0) {
+                return;
+            }
+            if (InternalOrder.isCountDesc(order) && bucketOrds.size() >= bucketCountThresholds.getRequiredSize()) {
+                return;
+            }
+            // we need to fill-in the blanks
+            for (LeafReaderContext ctx : context.searcher().getTopReaderContext().leaves()) {
+                SortedBinaryDocValues values = valuesSource.bytesValues(ctx);
+                // brute force
+                for (int docId = 0; docId < ctx.reader().maxDoc(); ++docId) {
+                    if (values.advanceExact(docId)) {
+                        int valueCount = values.docValueCount();
+                        for (int i = 0; i < valueCount; ++i) {
+                            BytesRef term = values.nextValue();
+                            if (includeExclude == null || includeExclude.accept(term)) {
+                                bucketOrds.add(term);
+                            }
+                        }
+                    }
+                }
+            }
+        }
+
+        @Override
+        StringTerms.Bucket buildEmptyBucket() {
+            return new StringTerms.Bucket(new BytesRef(), 0, null, showTermDocCountError, 0, format);
+        }
+
+        @Override
+        PriorityQueue<StringTerms.Bucket> buildPriorityQueue(int size) {
+            return new BucketPriorityQueue<>(size, partiallyBuiltBucketComparator);
+        }
+
+        @Override
+        void updateBucket(StringTerms.Bucket spare, long bucketOrd, long docCount) throws IOException {
+            bucketOrds.get(bucketOrd, spare.termBytes);
+            spare.docCount = docCount;
+            spare.bucketOrd = bucketOrd;
+        }
+
+        @Override
+        StringTerms.Bucket[] buildBuckets(int size) {
+            return new StringTerms.Bucket[size];
+        }
+
+        @Override
+        void finalizeBucket(StringTerms.Bucket bucket) {
+            /*
+             * termBytes contains a reference to the bytes held by the
+             * bucketOrds which will be invalid once the aggregation is
+             * closed so we have to copy it.
+             */
+            bucket.termBytes = BytesRef.deepCopyOf(bucket.termBytes);
+        }
+
+        @Override
+        void buildSubAggs(StringTerms.Bucket[] topBuckets) throws IOException {
+            buildSubAggsForBuckets(topBuckets, b -> b.bucketOrd, (b, a) -> b.aggregations = a);
+        }
+
+        @Override
+        StringTerms buildResult(StringTerms.Bucket[] topBuckets, long otherDocCount) {
+            return new StringTerms(name, order, bucketCountThresholds.getRequiredSize(), bucketCountThresholds.getMinDocCount(),
+                metadata(), format, bucketCountThresholds.getShardSize(), showTermDocCountError, otherDocCount,
+                Arrays.asList(topBuckets), 0);
+        }
+
+        @Override
+        StringTerms buildEmptyResult() {
+            return buildEmptyTermsAggregation();
+        }
+
+        @Override
+        public void close() {}
+    }
+
+    /**
+     * Builds results for the {@code significant_terms} aggregation.
+     */
+    class SignificantTermsResults extends ResultStrategy<SignificantStringTerms, SignificantStringTerms.Bucket> {
+        // TODO a reference to the factory is weird - probably should be reference to what we need from it.
+        private final SignificantTermsAggregatorFactory termsAggFactory;
+        private final SignificanceHeuristic significanceHeuristic;
+
+        private long subsetSize = 0;
+
+        SignificantTermsResults(SignificantTermsAggregatorFactory termsAggFactory, SignificanceHeuristic significanceHeuristic) {
+            this.termsAggFactory = termsAggFactory;
+            this.significanceHeuristic = significanceHeuristic;
+        }
+
+        @Override
+        String describe() {
+            return "significant_terms";
+        }
+
+        @Override
+        LeafBucketCollector wrapCollector(LeafBucketCollector primary) {
+            return new LeafBucketCollectorBase(primary, null) {
+                @Override
+                public void collect(int doc, long owningBucketOrd) throws IOException {
+                    super.collect(doc, owningBucketOrd);
+                    subsetSize++;
+                }
+            };
+        }
+
+        @Override
+        void collectZeroDocEntriesIfNeeded() throws IOException {}
+
+        @Override
+        SignificantStringTerms.Bucket buildEmptyBucket() {
+            return new SignificantStringTerms.Bucket(new BytesRef(), 0, 0, 0, 0, null, format, 0);
+        }
+
+        @Override
+        PriorityQueue<SignificantStringTerms.Bucket> buildPriorityQueue(int size) {
+            return new BucketSignificancePriorityQueue<>(size);
+        }
+
+        @Override
+        void updateBucket(SignificantStringTerms.Bucket spare, long bucketOrd, long docCount) throws IOException {
+            bucketOrds.get(bucketOrd, spare.termBytes);
+            spare.subsetDf = docCount;
+            spare.bucketOrd = bucketOrd;
+            spare.subsetSize = subsetSize;
+            spare.supersetDf = termsAggFactory.getBackgroundFrequency(spare.termBytes);
+            spare.supersetSize = termsAggFactory.getSupersetNumDocs();
+            /*
+             * During shard-local down-selection we use subset/superset stats
+             * that are for this shard only. Back at the central reducer these
+             * properties will be updated with global stats.
+             */
+            spare.updateScore(significanceHeuristic);
+        }
+
+        @Override
+        SignificantStringTerms.Bucket[] buildBuckets(int size) {
+            return new SignificantStringTerms.Bucket[size];
+        }
+
+        @Override
+        void finalizeBucket(SignificantStringTerms.Bucket bucket) {
+            /*
+             * termBytes contains a reference to the bytes held by the
+             * bucketOrds which will be invalid once the aggregation is
+             * closed so we have to copy it.
+             */
+            bucket.termBytes = BytesRef.deepCopyOf(bucket.termBytes);
+        }
+
+        @Override
+        void buildSubAggs(SignificantStringTerms.Bucket[] topBuckets) throws IOException {
+            buildSubAggsForBuckets(topBuckets, b -> b.bucketOrd, (b, a) -> b.aggregations = a);
+        }
+
+        @Override
+        SignificantStringTerms buildResult(SignificantStringTerms.Bucket[] topBuckets, long otherDocCount) {
+            return new SignificantStringTerms(name, bucketCountThresholds.getRequiredSize(),
+                bucketCountThresholds.getMinDocCount(),
+                metadata(), format, subsetSize, termsAggFactory.getSupersetNumDocs(), significanceHeuristic, Arrays.asList(topBuckets));
+        }
+
+        @Override
+        SignificantStringTerms buildEmptyResult() {
+            return buildEmptySignificantTermsAggregation(significanceHeuristic);
+        }
+
+        @Override
+        public void close() {
+            termsAggFactory.close();
+        }
+    }
+}

diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/NumericTermsAggregator.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/NumericTermsAggregator.java
index af5df2ba7ea..50e43d99e15 100644
--- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/NumericTermsAggregator.java
+++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/NumericTermsAggregator.java
@@ -164,15 +164,16 @@ public class NumericTermsAggregator extends TermsAggregator {
             BucketOrdsEnum ordsEnum = bucketOrds.ordsEnum(owningBucketOrds[ordIdx]);
             Supplier<B> emptyBucketBuilder = emptyBucketBuilder(owningBucketOrds[ordIdx]);
             while (ordsEnum.next()) {
+                long docCount = bucketDocCount(ordsEnum.ord());
+                otherDocCounts[ordIdx] += docCount;
+                if (docCount < bucketCountThresholds.getShardMinDocCount()) {
+                    continue;
+                }
                 if (spare == null) {
                     spare = emptyBucketBuilder.get();
                 }
-                long docCount = bucketDocCount(ordsEnum.ord());
-                otherDocCounts[ordIdx] += docCount;
-                if (bucketCountThresholds.getShardMinDocCount() <= docCount) {
-                    updateBucket(spare, ordsEnum, docCount);
-                    spare = ordered.insertWithOverflow(spare);
-                }
+                updateBucket(spare, ordsEnum, docCount);
+                spare = ordered.insertWithOverflow(spare);
             }
 
             // Get the top buckets

diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/SignificantStringTermsAggregator.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/SignificantStringTermsAggregator.java
deleted file mode 100644
index 4aa28538cb4..00000000000
--- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/SignificantStringTermsAggregator.java
+++ /dev/null
@@ -1,141 +0,0 @@
-/*
- * Licensed to Elasticsearch under one or more contributor
- * license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright
- * ownership. Elasticsearch licenses this file to you under
- * the Apache License, Version 2.0 (the "License"); you may
- * not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.elasticsearch.search.aggregations.bucket.terms;
-
-import org.apache.lucene.index.IndexReader;
-import org.apache.lucene.index.LeafReaderContext;
-import org.apache.lucene.util.BytesRef;
-import org.elasticsearch.common.lease.Releasables;
-import org.elasticsearch.search.DocValueFormat;
-import org.elasticsearch.search.aggregations.Aggregator;
-import org.elasticsearch.search.aggregations.AggregatorFactories;
-import org.elasticsearch.search.aggregations.InternalAggregation;
-import org.elasticsearch.search.aggregations.LeafBucketCollector;
-import org.elasticsearch.search.aggregations.LeafBucketCollectorBase;
-import org.elasticsearch.search.aggregations.bucket.terms.heuristic.SignificanceHeuristic;
-import org.elasticsearch.search.aggregations.support.ValuesSource;
-import org.elasticsearch.search.internal.ContextIndexSearcher;
-import org.elasticsearch.search.internal.SearchContext;
-
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.Map;
-
-import static java.util.Collections.emptyList;
-
-/**
- * An aggregator of significant string values.
- */
-public class SignificantStringTermsAggregator extends StringTermsAggregator {
-
-    protected long numCollectedDocs;
-    protected final SignificantTermsAggregatorFactory termsAggFactory;
-    private final SignificanceHeuristic significanceHeuristic;
-
-    public SignificantStringTermsAggregator(String name, AggregatorFactories factories, ValuesSource valuesSource, DocValueFormat format,
-            BucketCountThresholds bucketCountThresholds, IncludeExclude.StringFilter includeExclude, SearchContext aggregationContext,
-            Aggregator parent, SignificanceHeuristic significanceHeuristic, SignificantTermsAggregatorFactory termsAggFactory,
-            Map<String, Object> metadata) throws IOException {
-
-        super(name, factories, valuesSource, null, format, bucketCountThresholds, includeExclude, aggregationContext, parent,
-            SubAggCollectionMode.BREADTH_FIRST, false, metadata);
-        this.significanceHeuristic = significanceHeuristic;
-        this.termsAggFactory = termsAggFactory;
-    }
-
-    @Override
-    public LeafBucketCollector getLeafCollector(LeafReaderContext ctx,
-            final LeafBucketCollector sub) throws IOException {
-        return new LeafBucketCollectorBase(super.getLeafCollector(ctx, sub), null) {
-            @Override
-            public void collect(int doc, long bucket) throws IOException {
-                super.collect(doc, bucket);
-                numCollectedDocs++;
-            }
-        };
-    }
-
-    @Override
-    public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws IOException {
-        assert owningBucketOrds.length == 1 && owningBucketOrds[0] == 0;
-
-        final int size = (int) Math.min(bucketOrds.size(), bucketCountThresholds.getShardSize());
-        long supersetSize = termsAggFactory.getSupersetNumDocs();
-        long subsetSize = numCollectedDocs;
-
-        BucketSignificancePriorityQueue<SignificantStringTerms.Bucket> ordered = new BucketSignificancePriorityQueue<>(size);
-        SignificantStringTerms.Bucket spare = null;
-        for (int i = 0; i < bucketOrds.size(); i++) {
-            final int docCount = bucketDocCount(i);
-            if (docCount < bucketCountThresholds.getShardMinDocCount()) {
-                continue;
-            }
-
-            if (spare == null) {
-                spare = new SignificantStringTerms.Bucket(new BytesRef(), 0, 0, 0, 0, null, format, 0);
-            }
-
-            bucketOrds.get(i, spare.termBytes);
-            spare.subsetDf = docCount;
-            spare.subsetSize = subsetSize;
-            spare.supersetDf = termsAggFactory.getBackgroundFrequency(spare.termBytes);
-            spare.supersetSize = supersetSize;
-            // During shard-local down-selection we use subset/superset stats
-            // that are for this shard only
-            // Back at the central reducer these properties will be updated with
-            // global stats
-            spare.updateScore(significanceHeuristic);
-
-            spare.bucketOrd = i;
-            spare = ordered.insertWithOverflow(spare);
-        }
-
-        final SignificantStringTerms.Bucket[] list = new SignificantStringTerms.Bucket[ordered.size()];
-        for (int i = ordered.size() - 1; i >= 0; i--) {
-            list[i] = ordered.pop();
-            // the terms are owned by the BytesRefHash, we need to pull a copy since the BytesRef hash data may be
-            // recycled at some point
-            list[i].termBytes = BytesRef.deepCopyOf(list[i].termBytes);
-        }
-
-        buildSubAggsForBuckets(list, b -> b.bucketOrd, (b, a) -> b.aggregations = a);
-        return new InternalAggregation[] {
-            new SignificantStringTerms(name, bucketCountThresholds.getRequiredSize(),
-                bucketCountThresholds.getMinDocCount(),
-                metadata(), format, subsetSize, supersetSize, significanceHeuristic, Arrays.asList(list))
-        };
-    }
-
-    @Override
-    public SignificantStringTerms buildEmptyAggregation() {
-        // We need to account for the significance of a miss in our global stats - provide corpus size as context
-        ContextIndexSearcher searcher = context.searcher();
-        IndexReader topReader = searcher.getIndexReader();
-        int supersetSize = topReader.numDocs();
-        return new SignificantStringTerms(name, bucketCountThresholds.getRequiredSize(), bucketCountThresholds.getMinDocCount(),
-            metadata(), format, 0, supersetSize, significanceHeuristic, emptyList());
-    }
-
-    @Override
-    public void doClose() {
-        Releasables.close(bucketOrds, termsAggFactory);
-    }
-
-}

diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/SignificantTermsAggregatorFactory.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/SignificantTermsAggregatorFactory.java
index c7124b3cf28..5542fec6bd0 100644
--- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/SignificantTermsAggregatorFactory.java
+++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/SignificantTermsAggregatorFactory.java
@@ -362,8 +362,21 @@ public class SignificantTermsAggregatorFactory extends ValuesSourceAggregatorFac
                                           Map<String, Object> metadata) throws IOException {
 
             final IncludeExclude.StringFilter filter = includeExclude == null ? null : includeExclude.convertToStringFilter(format);
-            return new SignificantStringTermsAggregator(name, factories, valuesSource, format, bucketCountThresholds, filter,
-                aggregationContext, parent, significanceHeuristic, termsAggregatorFactory, metadata);
+            return new MapStringTermsAggregator(
+                name,
+                factories,
+                a -> a.new SignificantTermsResults(termsAggregatorFactory, significanceHeuristic),
+                valuesSource,
+                null,
+                format,
+                bucketCountThresholds,
+                filter,
+                aggregationContext,
+                parent,
+                SubAggCollectionMode.BREADTH_FIRST,
+                false,
+                metadata
+            );
         }

diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/StringTermsAggregator.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/StringTermsAggregator.java
deleted file mode 100644
index 5f264762948..00000000000
--- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/StringTermsAggregator.java
+++ /dev/null
@@ -1,182 +0,0 @@
-/*
- * Licensed to Elasticsearch under one or more contributor
- * license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright
- * ownership. Elasticsearch licenses this file to you under
- * the Apache License, Version 2.0 (the "License"); you may
- * not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.elasticsearch.search.aggregations.bucket.terms;
-
-import org.apache.lucene.index.LeafReaderContext;
-import org.apache.lucene.search.ScoreMode;
-import org.apache.lucene.util.BytesRef;
-import org.apache.lucene.util.BytesRefBuilder;
-import org.elasticsearch.common.lease.Releasables;
-import org.elasticsearch.common.util.BytesRefHash;
-import org.elasticsearch.index.fielddata.SortedBinaryDocValues;
-import org.elasticsearch.search.DocValueFormat;
-import org.elasticsearch.search.aggregations.Aggregator;
-import org.elasticsearch.search.aggregations.AggregatorFactories;
-import org.elasticsearch.search.aggregations.BucketOrder;
-import org.elasticsearch.search.aggregations.InternalAggregation;
-import org.elasticsearch.search.aggregations.InternalOrder;
-import org.elasticsearch.search.aggregations.LeafBucketCollector;
-import org.elasticsearch.search.aggregations.LeafBucketCollectorBase;
-import org.elasticsearch.search.aggregations.support.ValuesSource;
-import org.elasticsearch.search.internal.SearchContext;
-
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.Map;
-
-/**
- * An aggregator of string values.
- */
-public class StringTermsAggregator extends AbstractStringTermsAggregator {
-
-    private final ValuesSource valuesSource;
-    protected final BytesRefHash bucketOrds;
-    private final IncludeExclude.StringFilter includeExclude;
-
-    public StringTermsAggregator(String name, AggregatorFactories factories, ValuesSource valuesSource,
-            BucketOrder order, DocValueFormat format, BucketCountThresholds bucketCountThresholds,
-            IncludeExclude.StringFilter includeExclude, SearchContext context,
-            Aggregator parent, SubAggCollectionMode collectionMode, boolean showTermDocCountError,
-            Map<String, Object> metadata) throws IOException {
-
-        super(name, factories, context, parent, order, format, bucketCountThresholds, collectionMode, showTermDocCountError, metadata);
-        this.valuesSource = valuesSource;
-        this.includeExclude = includeExclude;
-        bucketOrds = new BytesRefHash(1, context.bigArrays());
-    }
-
-    @Override
-    public ScoreMode scoreMode() {
-        if (valuesSource != null && valuesSource.needsScores()) {
-            return ScoreMode.COMPLETE;
-        }
-        return super.scoreMode();
-    }
-
-    @Override
-    public LeafBucketCollector getLeafCollector(LeafReaderContext ctx,
-            final LeafBucketCollector sub) throws IOException {
-        final SortedBinaryDocValues values = valuesSource.bytesValues(ctx);
-        return new LeafBucketCollectorBase(sub, values) {
-            final BytesRefBuilder previous = new BytesRefBuilder();
-
-            @Override
-            public void collect(int doc, long bucket) throws IOException {
-                assert bucket == 0;
-                if (values.advanceExact(doc)) {
-                    final int valuesCount = values.docValueCount();
-
-                    // SortedBinaryDocValues don't guarantee uniqueness so we
-                    // need to take care of dups
-                    previous.clear();
-                    for (int i = 0; i < valuesCount; ++i) {
-                        final BytesRef bytes = values.nextValue();
-                        if (includeExclude != null && !includeExclude.accept(bytes)) {
-                            continue;
-                        }
-                        if (i > 0 && previous.get().equals(bytes)) {
-                            continue;
-                        }
-                        long bucketOrdinal = bucketOrds.add(bytes);
-                        if (bucketOrdinal < 0) { // already seen
-                            bucketOrdinal = -1 - bucketOrdinal;
-                            collectExistingBucket(sub, doc, bucketOrdinal);
-                        } else {
-                            collectBucket(sub, doc, bucketOrdinal);
-                        }
-                        previous.copyBytes(bytes);
-                    }
-                }
-            }
-        };
-    }
-
-    @Override
-    public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws IOException {
-        assert owningBucketOrds.length == 1 && owningBucketOrds[0] == 0;
-
-        if (bucketCountThresholds.getMinDocCount() == 0
-                && (InternalOrder.isCountDesc(order) == false
-                    || bucketOrds.size() < bucketCountThresholds.getRequiredSize())) {
-            // we need to fill-in the blanks
-            for (LeafReaderContext ctx : context.searcher().getTopReaderContext().leaves()) {
-                final SortedBinaryDocValues values = valuesSource.bytesValues(ctx);
-                // brute force
-                for (int docId = 0; docId < ctx.reader().maxDoc(); ++docId) {
-                    if (values.advanceExact(docId)) {
-                        final int valueCount = values.docValueCount();
-                        for (int i = 0; i < valueCount; ++i) {
-                            final BytesRef term = values.nextValue();
-                            if (includeExclude == null || includeExclude.accept(term)) {
-                                bucketOrds.add(term);
-                            }
-                        }
-                    }
-                }
-            }
-        }
-
-        final int size = (int) Math.min(bucketOrds.size(), bucketCountThresholds.getShardSize());
-
-        long otherDocCount = 0;
-        BucketPriorityQueue<StringTerms.Bucket> ordered = new BucketPriorityQueue<>(size, partiallyBuiltBucketComparator);
-        StringTerms.Bucket spare = null;
-        for (int i = 0; i < bucketOrds.size(); i++) {
-            if (spare == null) {
-                spare = new StringTerms.Bucket(new BytesRef(), 0, null, showTermDocCountError, 0, format);
-            }
-            bucketOrds.get(i, spare.termBytes);
-            spare.docCount = bucketDocCount(i);
-            otherDocCount += spare.docCount;
-            spare.bucketOrd = i;
-            if (bucketCountThresholds.getShardMinDocCount() <= spare.docCount) {
-                spare = ordered.insertWithOverflow(spare);
-            }
-        }
-
-        // Get the top buckets
-        final StringTerms.Bucket[] list = new StringTerms.Bucket[ordered.size()];
-        for (int i = ordered.size() - 1; i >= 0; --i) {
-            final StringTerms.Bucket bucket = ordered.pop();
-            list[i] = bucket;
-            otherDocCount -= bucket.docCount;
-            bucket.termBytes = BytesRef.deepCopyOf(list[i].termBytes);
-            bucket.docCountError = 0;
-        }
-
-        buildSubAggsForBuckets(list, b -> b.bucketOrd, (b, a) -> b.aggregations = a);
-        return new InternalAggregation[] {
-            new StringTerms(name, order, bucketCountThresholds.getRequiredSize(), bucketCountThresholds.getMinDocCount(),
-                metadata(), format, bucketCountThresholds.getShardSize(), showTermDocCountError, otherDocCount,
-                Arrays.asList(list), 0)
-        };
-    }
-
-    @Override
-    public InternalAggregation buildEmptyAggregation() {
-        return buildEmptyTermsAggregation();
-    }
-
-    @Override
-    public void doClose() {
-        Releasables.close(bucketOrds);
-    }
-
-}

diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/TermsAggregatorFactory.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/TermsAggregatorFactory.java
index 62953e229b5..bf2789ffd59 100644
--- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/TermsAggregatorFactory.java
+++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/TermsAggregatorFactory.java
@@ -324,8 +324,21 @@ public class TermsAggregatorFactory extends ValuesSourceAggregatorFactory {
                                               boolean showTermDocCountError,
                                               Map<String, Object> metadata) throws IOException {
             final IncludeExclude.StringFilter filter = includeExclude == null ? null : includeExclude.convertToStringFilter(format);
-            return new StringTermsAggregator(name, factories, valuesSource, order, format, bucketCountThresholds, filter,
-                context, parent, subAggCollectMode, showTermDocCountError, metadata);
+            return new MapStringTermsAggregator(
+                name,
+                factories,
+                a -> a.new StandardTermsResults(),
+                valuesSource,
+                order,
+                format,
+                bucketCountThresholds,
+                filter,
+                context,
+                parent,
+                subAggCollectMode,
+                showTermDocCountError,
+                metadata
+            );
         }
     },
     GLOBAL_ORDINALS(new ParseField("global_ordinals")) {