Save memory on numeric sig terms when not top (backport of #56789) (#57221)

This saves memory when running numeric significant terms aggregations that are
not at the top level by merging their collection into the numeric terms
aggregator and relying on the optimization made in #55873. A simplified sketch
of the idea follows the commit header below.
Nik Everett 2020-05-27 12:03:28 -04:00 committed by GitHub
parent 2f6089c2b9
commit 4d5be7c817
15 changed files with 772 additions and 453 deletions
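The following is a minimal, self-contained sketch of the pattern this commit introduces: one numeric collector keyed by (owning bucket ordinal, term), with the result format supplied as a pluggable strategy, so significant-terms results reuse the same collection machinery as plain terms. All names here (SketchNumericCollector, PlainTermsResults, SignificantTermsResults, and the toy score) are hypothetical illustrations, not Elasticsearch code; the real classes in the diff are NumericTermsAggregator, LongTermsResults/DoubleTermsResults, and SignificantLongTermsResults.

// Hypothetical sketch of "collector + pluggable result strategy"; not Elasticsearch code.
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

final class SketchNumericCollector {
    // Doc counts keyed by owning bucket and then by term; stands in for LongKeyedBucketOrds.
    private final Map<Long, Map<Long, Long>> counts = new HashMap<>();
    private final ResultStrategy<?> resultStrategy;

    // Mirrors the real wiring: the factory passes a function such as
    // agg -> agg.new SignificantLongTermsResults(...) so the strategy can see the aggregator.
    SketchNumericCollector(Function<SketchNumericCollector, ResultStrategy<?>> strategy) {
        this.resultStrategy = strategy.apply(this);
    }

    void collect(long owningBucketOrd, long term) {
        counts.computeIfAbsent(owningBucketOrd, k -> new HashMap<>()).merge(term, 1L, Long::sum);
    }

    Object buildResult(long owningBucketOrd) {
        return resultStrategy.build(counts.getOrDefault(owningBucketOrd, Map.of()));
    }

    // Pluggable result format: plain terms or significant terms.
    interface ResultStrategy<R> {
        R build(Map<Long, Long> termCounts);
    }

    // Plain terms: report term -> docCount, analogous to LongTermsResults in the diff.
    static final class PlainTermsResults implements ResultStrategy<List<String>> {
        @Override
        public List<String> build(Map<Long, Long> termCounts) {
            List<String> out = new ArrayList<>();
            termCounts.forEach((term, count) -> out.add(term + ":" + count));
            return out;
        }
    }

    // Significant terms: the same collected counts, scored against background frequencies,
    // analogous to SignificantLongTermsResults in the diff.
    static final class SignificantTermsResults implements ResultStrategy<List<String>> {
        private final Map<Long, Long> backgroundCounts;
        private final long supersetSize;

        SignificantTermsResults(Map<Long, Long> backgroundCounts, long supersetSize) {
            this.backgroundCounts = backgroundCounts;
            this.supersetSize = supersetSize;
        }

        @Override
        public List<String> build(Map<Long, Long> termCounts) {
            long subsetSize = termCounts.values().stream().mapToLong(Long::longValue).sum();
            List<String> out = new ArrayList<>();
            termCounts.forEach((term, subsetDf) -> {
                long supersetDf = backgroundCounts.getOrDefault(term, 0L);
                // A toy significance score: foreground rate minus background rate.
                double score = subsetDf / (double) Math.max(1, subsetSize)
                        - supersetDf / (double) Math.max(1, supersetSize);
                out.add(term + " score=" + score);
            });
            return out;
        }
    }
}

Because both result formats share one collector and one set of keyed bucket ordinals, a numeric significant terms aggregation nested under another bucketing aggregation no longer has to be wrapped in asMultiBucketAggregator (see the needsToCollectFromSingleBucket change further down), which is where the memory saving comes from.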

View File

@ -782,7 +782,7 @@ This yields the following aggregation profile output:
{
"aggregations" : [
{
"type" : "LongTermsAggregator",
"type" : "NumericTermsAggregator",
"description" : "my_scoped_agg",
"time_in_nanos" : 195386,
"breakdown" : {
@ -796,6 +796,7 @@ This yields the following aggregation profile output:
"collect_count" : 4
},
"debug": {
"result_strategy": "long_terms",
"total_buckets": 4
}
},
@ -815,7 +816,7 @@ This yields the following aggregation profile output:
},
"children" : [
{
"type" : "LongTermsAggregator",
"type" : "NumericTermsAggregator",
"description" : "my_level_agg",
"time_in_nanos" : 160329,
"breakdown" : {
@ -829,6 +830,7 @@ This yields the following aggregation profile output:
"collect_count" : 4,
},
"debug": {
"result_strategy": "long_terms",
"total_buckets": 4
}
}
@ -845,10 +847,10 @@ This yields the following aggregation profile output:
// TESTRESPONSE[s/"id": "\[P6-vulHtQRWuD4YnubWb7A\]\[test\]\[0\]"/"id": $body.profile.shards.0.id/]
From the profile structure we can see that the `my_scoped_agg` is internally
being run as a `LongTermsAggregator` (because the field it is aggregating,
being run as a `NumericTermsAggregator` (because the field it is aggregating,
`likes`, is a numeric field). At the same level, we see a `GlobalAggregator`
which comes from `my_global_agg`. That aggregation then has a child
`LongTermsAggregator` which comes from the second term's aggregation on `likes`.
`NumericTermsAggregator` which comes from the second term's aggregation on `likes`.
The `time_in_nanos` field shows the time executed by each aggregation, and is
inclusive of all children. While the overall time is useful, the `breakdown`

View File

@ -837,7 +837,8 @@ setup:
field: number
- match: { aggregations.n_terms.buckets.0.key: 1 }
- match: { aggregations.n_terms.buckets.1.key: 3 }
- match: { profile.shards.0.aggregations.0.type: LongTermsAggregator }
- match: { profile.shards.0.aggregations.0.type: NumericTermsAggregator }
- match: { profile.shards.0.aggregations.0.description: n_terms }
- match: { profile.shards.0.aggregations.0.breakdown.collect_count: 4 }
- match: { profile.shards.0.aggregations.0.debug.result_strategy: long_terms }
- match: { profile.shards.0.aggregations.0.debug.total_buckets: 2 }

View File

@ -196,7 +196,7 @@ public class SignificantTermsSignificanceScoreIT extends ESIntegTestCase {
}
public void testDeletesIssue7951() throws Exception {
public void testPopularTermManyDeletedDocs() throws Exception {
String settings = "{\"index.number_of_shards\": 1, \"index.number_of_replicas\": 0}";
assertAcked(prepareCreate(INDEX_NAME).setSettings(settings, XContentType.JSON)
.addMapping("_doc", "text", "type=keyword", CLASS_FIELD, "type=keyword"));
@ -238,7 +238,7 @@ public class SignificantTermsSignificanceScoreIT extends ESIntegTestCase {
significantTerms("sig_terms")
.field(TEXT_FIELD)
.minDocCount(1)));
}else
} else
{
request = client().prepareSearch(INDEX_NAME).setTypes(DOC_TYPE)
.addAggregation(
@ -478,8 +478,7 @@ public class SignificantTermsSignificanceScoreIT extends ESIntegTestCase {
.subAggregation(significantText("mySignificantTerms", TEXT_FIELD)
.significanceHeuristic(scriptHeuristic)
.minDocCount(1).shardSize(2).size(2)));
}else
{
} else {
request = client().prepareSearch(INDEX_NAME)
.addAggregation(terms("class").field(CLASS_FIELD)
.subAggregation(significantTerms("mySignificantTerms")

View File

@ -39,7 +39,7 @@ public class DoubleTerms extends InternalMappedTerms<DoubleTerms, DoubleTerms.Bu
public static final String NAME = "dterms";
static class Bucket extends InternalTerms.Bucket<Bucket> {
private final double term;
double term;
Bucket(double term, long docCount, InternalAggregations aggregations, boolean showDocCountError, long docCountError,
DocValueFormat format) {

View File

@ -1,78 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.search.aggregations.bucket.terms;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.SortedNumericDocValues;
import org.apache.lucene.util.NumericUtils;
import org.elasticsearch.index.fielddata.FieldData;
import org.elasticsearch.search.DocValueFormat;
import org.elasticsearch.search.aggregations.Aggregator;
import org.elasticsearch.search.aggregations.AggregatorFactories;
import org.elasticsearch.search.aggregations.BucketOrder;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.bucket.terms.LongTerms.Bucket;
import org.elasticsearch.search.aggregations.support.ValuesSource;
import org.elasticsearch.search.aggregations.support.ValuesSource.Numeric;
import org.elasticsearch.search.internal.SearchContext;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
public class DoubleTermsAggregator extends LongTermsAggregator {
DoubleTermsAggregator(String name, AggregatorFactories factories, ValuesSource.Numeric valuesSource, DocValueFormat format,
BucketOrder order, BucketCountThresholds bucketCountThresholds, SearchContext aggregationContext, Aggregator parent,
SubAggCollectionMode collectionMode, boolean showTermDocCountError, IncludeExclude.LongFilter longFilter,
boolean collectsFromSingleBucket, Map<String, Object> metadata) throws IOException {
super(name, factories, valuesSource, format, order, bucketCountThresholds, aggregationContext, parent, collectionMode,
showTermDocCountError, longFilter, collectsFromSingleBucket, metadata);
}
@Override
protected SortedNumericDocValues getValues(Numeric valuesSource, LeafReaderContext ctx) throws IOException {
return FieldData.toSortableLongBits(valuesSource.doubleValues(ctx));
}
@Override
protected InternalAggregation buildResult(long otherDocCount, List<Bucket> buckets) {
return convertToDouble((LongTerms) super.buildResult(otherDocCount, buckets));
}
@Override
public DoubleTerms buildEmptyAggregation() {
final LongTerms terms = (LongTerms) super.buildEmptyAggregation();
return convertToDouble(terms);
}
private static DoubleTerms convertToDouble(LongTerms terms) {
List<DoubleTerms.Bucket> buckets = terms.buckets.stream().map(DoubleTermsAggregator::convertToDouble).collect(Collectors.toList());
return new DoubleTerms(terms.getName(), terms.order, terms.requiredSize, terms.minDocCount,
terms.getMetadata(), terms.format, terms.shardSize, terms.showTermDocCountError, terms.otherDocCount, buckets,
terms.docCountError);
}
private static DoubleTerms.Bucket convertToDouble(LongTerms.Bucket bucket) {
double value = NumericUtils.sortableLongToDouble(bucket.term);
return new DoubleTerms.Bucket(value, bucket.docCount, bucket.aggregations, bucket.showDocCountError, bucket.docCountError,
bucket.format);
}
}

View File

@ -1,196 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.search.aggregations.bucket.terms;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.SortedNumericDocValues;
import org.apache.lucene.search.ScoreMode;
import org.elasticsearch.common.lease.Releasables;
import org.elasticsearch.search.DocValueFormat;
import org.elasticsearch.search.aggregations.Aggregator;
import org.elasticsearch.search.aggregations.AggregatorFactories;
import org.elasticsearch.search.aggregations.BucketOrder;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.InternalOrder;
import org.elasticsearch.search.aggregations.LeafBucketCollector;
import org.elasticsearch.search.aggregations.LeafBucketCollectorBase;
import org.elasticsearch.search.aggregations.bucket.terms.IncludeExclude.LongFilter;
import org.elasticsearch.search.aggregations.bucket.terms.LongKeyedBucketOrds.BucketOrdsEnum;
import org.elasticsearch.search.aggregations.bucket.terms.LongTerms.Bucket;
import org.elasticsearch.search.aggregations.support.ValuesSource;
import org.elasticsearch.search.internal.SearchContext;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;
import static java.util.Collections.emptyList;
public class LongTermsAggregator extends TermsAggregator {
protected final ValuesSource.Numeric valuesSource;
protected final LongKeyedBucketOrds bucketOrds;
private boolean showTermDocCountError;
private LongFilter longFilter;
public LongTermsAggregator(String name, AggregatorFactories factories, ValuesSource.Numeric valuesSource, DocValueFormat format,
BucketOrder order, BucketCountThresholds bucketCountThresholds, SearchContext aggregationContext, Aggregator parent,
SubAggCollectionMode subAggCollectMode, boolean showTermDocCountError, IncludeExclude.LongFilter longFilter,
boolean collectsFromSingleBucket, Map<String, Object> metadata) throws IOException {
super(name, factories, aggregationContext, parent, bucketCountThresholds, order, format, subAggCollectMode, metadata);
this.valuesSource = valuesSource;
this.showTermDocCountError = showTermDocCountError;
this.longFilter = longFilter;
bucketOrds = LongKeyedBucketOrds.build(context.bigArrays(), collectsFromSingleBucket);
}
@Override
public ScoreMode scoreMode() {
if (valuesSource != null && valuesSource.needsScores()) {
return ScoreMode.COMPLETE;
}
return super.scoreMode();
}
protected SortedNumericDocValues getValues(ValuesSource.Numeric valuesSource, LeafReaderContext ctx) throws IOException {
return valuesSource.longValues(ctx);
}
@Override
public LeafBucketCollector getLeafCollector(LeafReaderContext ctx,
final LeafBucketCollector sub) throws IOException {
final SortedNumericDocValues values = getValues(valuesSource, ctx);
return new LeafBucketCollectorBase(sub, values) {
@Override
public void collect(int doc, long owningBucketOrd) throws IOException {
if (values.advanceExact(doc)) {
final int valuesCount = values.docValueCount();
long previous = Long.MAX_VALUE;
for (int i = 0; i < valuesCount; ++i) {
final long val = values.nextValue();
if (previous != val || i == 0) {
if ((longFilter == null) || (longFilter.accept(val))) {
long bucketOrdinal = bucketOrds.add(owningBucketOrd, val);
if (bucketOrdinal < 0) { // already seen
bucketOrdinal = -1 - bucketOrdinal;
collectExistingBucket(sub, doc, bucketOrdinal);
} else {
collectBucket(sub, doc, bucketOrdinal);
}
}
previous = val;
}
}
}
}
};
}
@Override
public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws IOException {
LongTerms.Bucket[][] topBucketsPerOrd = new LongTerms.Bucket[owningBucketOrds.length][];
long[] otherDocCounts = new long[owningBucketOrds.length];
for (int ordIdx = 0; ordIdx < owningBucketOrds.length; ordIdx++) {
long bucketsInOrd = bucketOrds.bucketsInOrd(owningBucketOrds[ordIdx]);
if (bucketCountThresholds.getMinDocCount() == 0 && (InternalOrder.isCountDesc(order) == false ||
bucketsInOrd < bucketCountThresholds.getRequiredSize())) {
// we need to fill-in the blanks
for (LeafReaderContext ctx : context.searcher().getTopReaderContext().leaves()) {
final SortedNumericDocValues values = getValues(valuesSource, ctx);
for (int docId = 0; docId < ctx.reader().maxDoc(); ++docId) {
if (values.advanceExact(docId)) {
final int valueCount = values.docValueCount();
for (int v = 0; v < valueCount; ++v) {
long value = values.nextValue();
if (longFilter == null || longFilter.accept(value)) {
bucketOrds.add(owningBucketOrds[ordIdx], value);
}
}
}
}
}
bucketsInOrd = bucketOrds.bucketsInOrd(owningBucketOrds[ordIdx]);
}
final int size = (int) Math.min(bucketsInOrd, bucketCountThresholds.getShardSize());
BucketPriorityQueue<LongTerms.Bucket> ordered = new BucketPriorityQueue<>(size, partiallyBuiltBucketComparator);
LongTerms.Bucket spare = null;
BucketOrdsEnum ordsEnum = bucketOrds.ordsEnum(owningBucketOrds[ordIdx]);
while (ordsEnum.next()) {
if (spare == null) {
spare = new LongTerms.Bucket(0, 0, null, showTermDocCountError, 0, format);
}
spare.term = ordsEnum.value();
spare.docCount = bucketDocCount(ordsEnum.ord());
otherDocCounts[ordIdx] += spare.docCount;
spare.bucketOrd = ordsEnum.ord();
if (bucketCountThresholds.getShardMinDocCount() <= spare.docCount) {
spare = ordered.insertWithOverflow(spare);
if (spare == null) {
consumeBucketsAndMaybeBreak(1);
}
}
}
// Get the top buckets
LongTerms.Bucket[] list = topBucketsPerOrd[ordIdx] = new LongTerms.Bucket[ordered.size()];
for (int b = ordered.size() - 1; b >= 0; --b) {
list[b] = ordered.pop();
list[b].docCountError = 0;
otherDocCounts[ordIdx] -= list[b].docCount;
}
}
buildSubAggsForAllBuckets(topBucketsPerOrd, b -> b.bucketOrd, (b, aggs) -> b.aggregations = aggs);
InternalAggregation[] result = new InternalAggregation[owningBucketOrds.length];
for (int ordIdx = 0; ordIdx < owningBucketOrds.length; ordIdx++) {
result[ordIdx] = buildResult(otherDocCounts[ordIdx], Arrays.asList(topBucketsPerOrd[ordIdx]));
}
return result;
}
protected InternalAggregation buildResult(long otherDocCount, List<Bucket> buckets) {
return new LongTerms(name, order, bucketCountThresholds.getRequiredSize(), bucketCountThresholds.getMinDocCount(),
metadata(), format, bucketCountThresholds.getShardSize(), showTermDocCountError, otherDocCount,
buckets, 0);
}
@Override
public InternalAggregation buildEmptyAggregation() {
return new LongTerms(name, order, bucketCountThresholds.getRequiredSize(), bucketCountThresholds.getMinDocCount(),
metadata(), format, bucketCountThresholds.getShardSize(), showTermDocCountError, 0, emptyList(), 0);
}
@Override
public void doClose() {
super.doClose();
Releasables.close(bucketOrds);
}
@Override
public void collectDebugInfo(BiConsumer<String, Object> add) {
super.collectDebugInfo(add);
add.accept("total_buckets", bucketOrds.size());
}
}

View File

@ -0,0 +1,655 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.search.aggregations.bucket.terms;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.SortedNumericDocValues;
import org.apache.lucene.search.ScoreMode;
import org.apache.lucene.util.NumericUtils;
import org.apache.lucene.util.PriorityQueue;
import org.elasticsearch.common.collect.List;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.lease.Releasables;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.LongArray;
import org.elasticsearch.common.util.LongHash;
import org.elasticsearch.index.fielddata.FieldData;
import org.elasticsearch.search.DocValueFormat;
import org.elasticsearch.search.aggregations.Aggregator;
import org.elasticsearch.search.aggregations.AggregatorFactories;
import org.elasticsearch.search.aggregations.BucketOrder;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.InternalMultiBucketAggregation;
import org.elasticsearch.search.aggregations.InternalOrder;
import org.elasticsearch.search.aggregations.LeafBucketCollector;
import org.elasticsearch.search.aggregations.LeafBucketCollectorBase;
import org.elasticsearch.search.aggregations.bucket.terms.IncludeExclude.LongFilter;
import org.elasticsearch.search.aggregations.bucket.terms.LongKeyedBucketOrds.BucketOrdsEnum;
import org.elasticsearch.search.aggregations.bucket.terms.heuristic.SignificanceHeuristic;
import org.elasticsearch.search.aggregations.support.ValuesSource;
import org.elasticsearch.search.internal.ContextIndexSearcher;
import org.elasticsearch.search.internal.SearchContext;
import java.io.IOException;
import java.util.Map;
import java.util.function.BiConsumer;
import java.util.function.Function;
import java.util.function.Supplier;
import static java.util.Collections.emptyList;
public class NumericTermsAggregator extends TermsAggregator {
private final ResultStrategy<?, ?> resultStrategy;
private final ValuesSource.Numeric valuesSource;
private final LongKeyedBucketOrds bucketOrds;
private final LongFilter longFilter;
public NumericTermsAggregator(
String name,
AggregatorFactories factories,
Function<NumericTermsAggregator, ResultStrategy<?, ?>> resultStrategy,
ValuesSource.Numeric valuesSource,
DocValueFormat format,
BucketOrder order,
BucketCountThresholds bucketCountThresholds,
SearchContext aggregationContext,
Aggregator parent,
SubAggCollectionMode subAggCollectMode,
IncludeExclude.LongFilter longFilter,
boolean collectsFromSingleBucket,
Map<String, Object> metadata
)
throws IOException {
super(name, factories, aggregationContext, parent, bucketCountThresholds, order, format, subAggCollectMode, metadata);
this.resultStrategy = resultStrategy.apply(this);
this.valuesSource = valuesSource;
this.longFilter = longFilter;
bucketOrds = LongKeyedBucketOrds.build(context.bigArrays(), collectsFromSingleBucket);
}
@Override
public ScoreMode scoreMode() {
if (valuesSource != null && valuesSource.needsScores()) {
return ScoreMode.COMPLETE;
}
return super.scoreMode();
}
@Override
public LeafBucketCollector getLeafCollector(LeafReaderContext ctx, LeafBucketCollector sub) throws IOException {
SortedNumericDocValues values = resultStrategy.getValues(ctx);
return resultStrategy.wrapCollector(new LeafBucketCollectorBase(sub, values) {
@Override
public void collect(int doc, long owningBucketOrd) throws IOException {
if (values.advanceExact(doc)) {
int valuesCount = values.docValueCount();
long previous = Long.MAX_VALUE;
for (int i = 0; i < valuesCount; ++i) {
long val = values.nextValue();
if (previous != val || i == 0) {
if ((longFilter == null) || (longFilter.accept(val))) {
long bucketOrdinal = bucketOrds.add(owningBucketOrd, val);
if (bucketOrdinal < 0) { // already seen
bucketOrdinal = -1 - bucketOrdinal;
collectExistingBucket(sub, doc, bucketOrdinal);
} else {
collectBucket(sub, doc, bucketOrdinal);
}
}
previous = val;
}
}
}
}
});
}
@Override
public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws IOException {
return resultStrategy.buildAggregations(owningBucketOrds);
}
@Override
public InternalAggregation buildEmptyAggregation() {
return resultStrategy.buildEmptyResult();
}
@Override
public void doClose() {
Releasables.close(super::doClose, bucketOrds, resultStrategy);
}
@Override
public void collectDebugInfo(BiConsumer<String, Object> add) {
super.collectDebugInfo(add);
add.accept("result_strategy", resultStrategy.describe());
add.accept("total_buckets", bucketOrds.size());
}
/**
* Strategy for building results.
*/
abstract class ResultStrategy<R extends InternalAggregation, B extends InternalMultiBucketAggregation.InternalBucket>
implements
Releasable {
private InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws IOException {
B[][] topBucketsPerOrd = buildTopBucketsPerOrd(owningBucketOrds.length);
long[] otherDocCounts = new long[owningBucketOrds.length];
for (int ordIdx = 0; ordIdx < owningBucketOrds.length; ordIdx++) {
collectZeroDocEntriesIfNeeded(owningBucketOrds[ordIdx]);
long bucketsInOrd = bucketOrds.bucketsInOrd(owningBucketOrds[ordIdx]);
int size = (int) Math.min(bucketsInOrd, bucketCountThresholds.getShardSize());
PriorityQueue<B> ordered = buildPriorityQueue(size);
B spare = null;
BucketOrdsEnum ordsEnum = bucketOrds.ordsEnum(owningBucketOrds[ordIdx]);
Supplier<B> emptyBucketBuilder = emptyBucketBuilder(owningBucketOrds[ordIdx]);
while (ordsEnum.next()) {
if (spare == null) {
spare = emptyBucketBuilder.get();
}
long docCount = bucketDocCount(ordsEnum.ord());
otherDocCounts[ordIdx] += docCount;
if (bucketCountThresholds.getShardMinDocCount() <= docCount) {
updateBucket(spare, ordsEnum, docCount);
spare = ordered.insertWithOverflow(spare);
if (spare == null) {
consumeBucketsAndMaybeBreak(1);
}
}
}
// Get the top buckets
B[] bucketsForOrd = buildBuckets(ordered.size());
topBucketsPerOrd[ordIdx] = bucketsForOrd;
for (int b = ordered.size() - 1; b >= 0; --b) {
topBucketsPerOrd[ordIdx][b] = ordered.pop();
otherDocCounts[ordIdx] -= topBucketsPerOrd[ordIdx][b].getDocCount();
}
}
buildSubAggs(topBucketsPerOrd);
InternalAggregation[] result = new InternalAggregation[owningBucketOrds.length];
for (int ordIdx = 0; ordIdx < owningBucketOrds.length; ordIdx++) {
result[ordIdx] = buildResult(owningBucketOrds[ordIdx], otherDocCounts[ordIdx], topBucketsPerOrd[ordIdx]);
}
return result;
}
/**
* Short description of the collection mechanism added to the profile
* output to help with debugging.
*/
abstract String describe();
/**
* Resolve the doc values to collect results of this type.
*/
abstract SortedNumericDocValues getValues(LeafReaderContext ctx) throws IOException;
/**
* Wrap the "standard" numeric terms collector to collect any more
* information that this result type may need.
*/
abstract LeafBucketCollector wrapCollector(LeafBucketCollector primary);
/**
* Build an array to hold the "top" buckets for each ordinal.
*/
abstract B[][] buildTopBucketsPerOrd(int size);
/**
* Build an array of buckets for a particular ordinal. These arrays
* are assigned to the value returned by {@link #buildTopBucketsPerOrd}.
*/
abstract B[] buildBuckets(int size);
/**
* Build a {@linkplain Supplier} that can be used to build "empty"
* buckets. Those buckets will then be {@link #updateBucket updated}
* for each collected bucket.
*/
abstract Supplier<B> emptyBucketBuilder(long owningBucketOrd);
/**
* Update fields in {@code spare} to reflect information collected for
* this bucket ordinal.
*/
abstract void updateBucket(B spare, BucketOrdsEnum ordsEnum, long docCount) throws IOException;
/**
* Build a {@link PriorityQueue} to sort the buckets. After we've
* collected all of the buckets we'll collect all entries in the queue.
*/
abstract PriorityQueue<B> buildPriorityQueue(int size);
/**
* Build the sub-aggregations into the buckets. This will usually
* delegate to {@link #buildSubAggsForAllBuckets}.
*/
abstract void buildSubAggs(B[][] topBucketsPerOrd) throws IOException;
/**
* Collect extra entries for "zero" hit documents if they were requested
* and required.
*/
abstract void collectZeroDocEntriesIfNeeded(long ord) throws IOException;
/**
* Turn the buckets into an aggregation result.
*/
abstract R buildResult(long owningBucketOrd, long otherDocCounts, B[] topBuckets);
/**
* Build an "empty" result. Only called if there isn't any data on this
* shard.
*/
abstract R buildEmptyResult();
}
abstract class StandardTermsResultStrategy<R extends InternalMappedTerms<R, B>, B extends InternalTerms.Bucket<B>> extends
ResultStrategy<R, B> {
protected final boolean showTermDocCountError;
StandardTermsResultStrategy(boolean showTermDocCountError) {
this.showTermDocCountError = showTermDocCountError;
}
@Override
final LeafBucketCollector wrapCollector(LeafBucketCollector primary) {
return primary;
}
@Override
final PriorityQueue<B> buildPriorityQueue(int size) {
return new BucketPriorityQueue<>(size, partiallyBuiltBucketComparator);
}
@Override
final void buildSubAggs(B[][] topBucketsPerOrd) throws IOException {
buildSubAggsForAllBuckets(topBucketsPerOrd, b -> b.bucketOrd, (b, aggs) -> b.aggregations = aggs);
}
@Override
Supplier<B> emptyBucketBuilder(long owningBucketOrd) {
return this::buildEmptyBucket;
}
abstract B buildEmptyBucket();
@Override
final void collectZeroDocEntriesIfNeeded(long ord) throws IOException {
if (bucketCountThresholds.getMinDocCount() != 0) {
return;
}
if (InternalOrder.isCountDesc(order) && bucketOrds.bucketsInOrd(ord) >= bucketCountThresholds.getRequiredSize()) {
return;
}
// we need to fill-in the blanks
for (LeafReaderContext ctx : context.searcher().getTopReaderContext().leaves()) {
SortedNumericDocValues values = getValues(ctx);
for (int docId = 0; docId < ctx.reader().maxDoc(); ++docId) {
if (values.advanceExact(docId)) {
int valueCount = values.docValueCount();
for (int v = 0; v < valueCount; ++v) {
long value = values.nextValue();
if (longFilter == null || longFilter.accept(value)) {
bucketOrds.add(ord, value);
}
}
}
}
}
}
@Override
public final void close() {}
}
class LongTermsResults extends StandardTermsResultStrategy<LongTerms, LongTerms.Bucket> {
LongTermsResults(boolean showTermDocCountError) {
super(showTermDocCountError);
}
@Override
String describe() {
return "long_terms";
}
@Override
SortedNumericDocValues getValues(LeafReaderContext ctx) throws IOException {
return valuesSource.longValues(ctx);
}
@Override
LongTerms.Bucket[][] buildTopBucketsPerOrd(int size) {
return new LongTerms.Bucket[size][];
}
@Override
LongTerms.Bucket[] buildBuckets(int size) {
return new LongTerms.Bucket[size];
}
@Override
LongTerms.Bucket buildEmptyBucket() {
return new LongTerms.Bucket(0, 0, null, showTermDocCountError, 0, format);
}
@Override
void updateBucket(LongTerms.Bucket spare, BucketOrdsEnum ordsEnum, long docCount) {
spare.term = ordsEnum.value();
spare.docCount = docCount;
spare.bucketOrd = ordsEnum.ord();
}
@Override
LongTerms buildResult(long owningBucketOrd, long otherDocCount, LongTerms.Bucket[] topBuckets) {
return new LongTerms(
name,
order,
bucketCountThresholds.getRequiredSize(),
bucketCountThresholds.getMinDocCount(),
metadata(),
format,
bucketCountThresholds.getShardSize(),
showTermDocCountError,
otherDocCount,
List.of(topBuckets),
0
);
}
@Override
LongTerms buildEmptyResult() {
return new LongTerms(
name,
order,
bucketCountThresholds.getRequiredSize(),
bucketCountThresholds.getMinDocCount(),
metadata(),
format,
bucketCountThresholds.getShardSize(),
showTermDocCountError,
0,
emptyList(),
0
);
}
}
class DoubleTermsResults extends StandardTermsResultStrategy<DoubleTerms, DoubleTerms.Bucket> {
DoubleTermsResults(boolean showTermDocCountError) {
super(showTermDocCountError);
}
@Override
String describe() {
return "double_terms";
}
@Override
SortedNumericDocValues getValues(LeafReaderContext ctx) throws IOException {
return FieldData.toSortableLongBits(valuesSource.doubleValues(ctx));
}
@Override
DoubleTerms.Bucket[][] buildTopBucketsPerOrd(int size) {
return new DoubleTerms.Bucket[size][];
}
@Override
DoubleTerms.Bucket[] buildBuckets(int size) {
return new DoubleTerms.Bucket[size];
}
@Override
DoubleTerms.Bucket buildEmptyBucket() {
return new DoubleTerms.Bucket(0, 0, null, showTermDocCountError, 0, format);
}
@Override
void updateBucket(DoubleTerms.Bucket spare, BucketOrdsEnum ordsEnum, long docCount) {
spare.term = NumericUtils.sortableLongToDouble(ordsEnum.value());
spare.docCount = docCount;
spare.bucketOrd = ordsEnum.ord();
}
@Override
DoubleTerms buildResult(long owningBucketOrd, long otherDocCount, DoubleTerms.Bucket[] topBuckets) {
return new DoubleTerms(
name,
order,
bucketCountThresholds.getRequiredSize(),
bucketCountThresholds.getMinDocCount(),
metadata(),
format,
bucketCountThresholds.getShardSize(),
showTermDocCountError,
otherDocCount,
List.of(topBuckets),
0
);
}
@Override
DoubleTerms buildEmptyResult() {
return new DoubleTerms(
name,
order,
bucketCountThresholds.getRequiredSize(),
bucketCountThresholds.getMinDocCount(),
metadata(),
format,
bucketCountThresholds.getShardSize(),
showTermDocCountError,
0,
emptyList(),
0
);
}
}
class SignificantLongTermsResults extends ResultStrategy<SignificantLongTerms, SignificantLongTerms.Bucket> {
private final BackgroundFrequencies backgroundFrequencies;
private final long supersetSize;
private final SignificanceHeuristic significanceHeuristic;
private LongArray subsetSizes;
SignificantLongTermsResults(
SignificantTermsAggregatorFactory termsAggFactory,
SignificanceHeuristic significanceHeuristic,
boolean collectsFromSingleBucket
) {
LookupBackgroundFrequencies lookup = new LookupBackgroundFrequencies(termsAggFactory);
backgroundFrequencies = collectsFromSingleBucket ? lookup : new CacheBackgroundFrequencies(lookup, context.bigArrays());
supersetSize = termsAggFactory.getSupersetNumDocs();
this.significanceHeuristic = significanceHeuristic;
subsetSizes = context.bigArrays().newLongArray(1, true);
}
@Override
SortedNumericDocValues getValues(LeafReaderContext ctx) throws IOException {
return valuesSource.longValues(ctx);
}
@Override
String describe() {
return "significant_terms";
}
@Override
LeafBucketCollector wrapCollector(LeafBucketCollector primary) {
return new LeafBucketCollectorBase(primary, null) {
@Override
public void collect(int doc, long owningBucketOrd) throws IOException {
super.collect(doc, owningBucketOrd);
subsetSizes = context.bigArrays().grow(subsetSizes, owningBucketOrd + 1);
subsetSizes.increment(owningBucketOrd, 1);
}
};
}
@Override
SignificantLongTerms.Bucket[][] buildTopBucketsPerOrd(int size) {
return new SignificantLongTerms.Bucket[size][];
}
@Override
SignificantLongTerms.Bucket[] buildBuckets(int size) {
return new SignificantLongTerms.Bucket[size];
}
@Override
Supplier<SignificantLongTerms.Bucket> emptyBucketBuilder(long owningBucketOrd) {
long subsetSize = subsetSizes.get(owningBucketOrd);
return () -> new SignificantLongTerms.Bucket(0, subsetSize, 0, supersetSize, 0, null, format, 0);
}
@Override
void updateBucket(SignificantLongTerms.Bucket spare, BucketOrdsEnum ordsEnum, long docCount) throws IOException {
spare.term = ordsEnum.value();
spare.subsetDf = docCount;
spare.supersetDf = backgroundFrequencies.freq(spare.term);
spare.bucketOrd = ordsEnum.ord();
// During shard-local down-selection we use subset/superset stats that are for this shard only
// Back at the central reducer these properties will be updated with global stats
spare.updateScore(significanceHeuristic);
}
@Override
PriorityQueue<SignificantLongTerms.Bucket> buildPriorityQueue(int size) {
return new BucketSignificancePriorityQueue<>(size);
}
@Override
void buildSubAggs(SignificantLongTerms.Bucket[][] topBucketsPerOrd) throws IOException {
buildSubAggsForAllBuckets(topBucketsPerOrd, b -> b.bucketOrd, (b, aggs) -> b.aggregations = aggs);
}
@Override
void collectZeroDocEntriesIfNeeded(long ord) throws IOException {}
@Override
SignificantLongTerms buildResult(long owningBucketOrd, long otherDocCounts, SignificantLongTerms.Bucket[] topBuckets) {
return new SignificantLongTerms(
name,
bucketCountThresholds.getRequiredSize(),
bucketCountThresholds.getMinDocCount(),
metadata(),
format,
subsetSizes.get(owningBucketOrd),
supersetSize,
significanceHeuristic,
List.of(topBuckets)
);
}
@Override
SignificantLongTerms buildEmptyResult() {
// We need to account for the significance of a miss in our global stats - provide corpus size as context
ContextIndexSearcher searcher = context.searcher();
IndexReader topReader = searcher.getIndexReader();
int supersetSize = topReader.numDocs();
return new SignificantLongTerms(
name,
bucketCountThresholds.getRequiredSize(),
bucketCountThresholds.getMinDocCount(),
metadata(),
format,
0,
supersetSize,
significanceHeuristic,
emptyList()
);
}
@Override
public void close() {
Releasables.close(backgroundFrequencies, subsetSizes);
}
}
/**
* Lookup frequencies for terms.
*/
private interface BackgroundFrequencies extends Releasable {
long freq(long term) throws IOException;
}
/**
* Lookup frequencies for terms.
*/
private static class LookupBackgroundFrequencies implements BackgroundFrequencies {
// TODO a reference to the factory is weird - probably should be reference to what we need from it.
private final SignificantTermsAggregatorFactory termsAggFactory;
LookupBackgroundFrequencies(SignificantTermsAggregatorFactory termsAggFactory) {
this.termsAggFactory = termsAggFactory;
}
@Override
public long freq(long term) throws IOException {
return termsAggFactory.getBackgroundFrequency(term);
}
@Override
public void close() {
termsAggFactory.close();
}
}
/**
* Lookup and cache background frequencies for terms.
*/
private static class CacheBackgroundFrequencies implements BackgroundFrequencies {
private final LookupBackgroundFrequencies lookup;
private final BigArrays bigArrays;
private final LongHash termToPosition;
private LongArray positionToFreq;
CacheBackgroundFrequencies(LookupBackgroundFrequencies lookup, BigArrays bigArrays) {
this.lookup = lookup;
this.bigArrays = bigArrays;
termToPosition = new LongHash(1, bigArrays);
positionToFreq = bigArrays.newLongArray(1, false);
}
@Override
public long freq(long term) throws IOException {
long position = termToPosition.add(term);
if (position < 0) {
return positionToFreq.get(-1 - position);
}
long freq = lookup.freq(term);
positionToFreq = bigArrays.grow(positionToFreq, position + 1);
positionToFreq.set(position, freq);
return freq;
}
@Override
public void close() {
Releasables.close(lookup, termToPosition, positionToFreq);
}
}
}

View File

@ -1,136 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.search.aggregations.bucket.terms;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.LeafReaderContext;
import org.elasticsearch.common.lease.Releasables;
import org.elasticsearch.search.DocValueFormat;
import org.elasticsearch.search.aggregations.Aggregator;
import org.elasticsearch.search.aggregations.AggregatorFactories;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.LeafBucketCollector;
import org.elasticsearch.search.aggregations.LeafBucketCollectorBase;
import org.elasticsearch.search.aggregations.bucket.terms.LongKeyedBucketOrds.BucketOrdsEnum;
import org.elasticsearch.search.aggregations.bucket.terms.heuristic.SignificanceHeuristic;
import org.elasticsearch.search.aggregations.support.ValuesSource;
import org.elasticsearch.search.internal.ContextIndexSearcher;
import org.elasticsearch.search.internal.SearchContext;
import java.io.IOException;
import java.util.Arrays;
import java.util.Map;
import static java.util.Collections.emptyList;
public class SignificantLongTermsAggregator extends LongTermsAggregator {
public SignificantLongTermsAggregator(String name, AggregatorFactories factories, ValuesSource.Numeric valuesSource,
DocValueFormat format, BucketCountThresholds bucketCountThresholds, SearchContext context, Aggregator parent,
SignificanceHeuristic significanceHeuristic, SignificantTermsAggregatorFactory termsAggFactory,
IncludeExclude.LongFilter includeExclude, Map<String, Object> metadata) throws IOException {
super(name, factories, valuesSource, format, null, bucketCountThresholds, context, parent,
SubAggCollectionMode.BREADTH_FIRST, false, includeExclude, false, metadata);
this.significanceHeuristic = significanceHeuristic;
this.termsAggFactory = termsAggFactory;
}
protected long numCollectedDocs;
private final SignificantTermsAggregatorFactory termsAggFactory;
private final SignificanceHeuristic significanceHeuristic;
@Override
public LeafBucketCollector getLeafCollector(LeafReaderContext ctx,
final LeafBucketCollector sub) throws IOException {
return new LeafBucketCollectorBase(super.getLeafCollector(ctx, sub), null) {
@Override
public void collect(int doc, long bucket) throws IOException {
super.collect(doc, bucket);
numCollectedDocs++;
}
};
}
@Override
public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws IOException {
assert owningBucketOrds.length == 1 && owningBucketOrds[0] == 0;
long bucketsInOrd = bucketOrds.bucketsInOrd(0);
final int size = (int) Math.min(bucketsInOrd, bucketCountThresholds.getShardSize());
long supersetSize = termsAggFactory.getSupersetNumDocs();
long subsetSize = numCollectedDocs;
BucketSignificancePriorityQueue<SignificantLongTerms.Bucket> ordered = new BucketSignificancePriorityQueue<>(size);
SignificantLongTerms.Bucket spare = null;
BucketOrdsEnum ordsEnum = bucketOrds.ordsEnum(0);
while (ordsEnum.next()) {
final int docCount = bucketDocCount(ordsEnum.ord());
if (docCount < bucketCountThresholds.getShardMinDocCount()) {
continue;
}
if (spare == null) {
spare = new SignificantLongTerms.Bucket(0, 0, 0, 0, 0, null, format, 0);
}
spare.term = ordsEnum.value();
spare.subsetDf = docCount;
spare.subsetSize = subsetSize;
spare.supersetDf = termsAggFactory.getBackgroundFrequency(spare.term);
spare.supersetSize = supersetSize;
// During shard-local down-selection we use subset/superset stats that are for this shard only
// Back at the central reducer these properties will be updated with global stats
spare.updateScore(significanceHeuristic);
spare.bucketOrd = ordsEnum.ord();
spare = ordered.insertWithOverflow(spare);
if (spare == null) {
consumeBucketsAndMaybeBreak(1);
}
}
SignificantLongTerms.Bucket[] list = new SignificantLongTerms.Bucket[ordered.size()];
for (int i = ordered.size() - 1; i >= 0; i--) {
list[i] = ordered.pop();
}
buildSubAggsForBuckets(list, bucket -> bucket.bucketOrd, (bucket, aggs) -> bucket.aggregations = aggs);
return new InternalAggregation[] {
new SignificantLongTerms(name, bucketCountThresholds.getRequiredSize(), bucketCountThresholds.getMinDocCount(),
metadata(), format, subsetSize, supersetSize, significanceHeuristic, Arrays.asList(list))
};
}
@Override
public SignificantLongTerms buildEmptyAggregation() {
// We need to account for the significance of a miss in our global stats - provide corpus size as context
ContextIndexSearcher searcher = context.searcher();
IndexReader topReader = searcher.getIndexReader();
int supersetSize = topReader.numDocs();
return new SignificantLongTerms(name, bucketCountThresholds.getRequiredSize(), bucketCountThresholds.getMinDocCount(),
metadata(), format, 0, supersetSize, significanceHeuristic, emptyList());
}
@Override
public void doClose() {
Releasables.close(bucketOrds, termsAggFactory);
}
}

View File

@ -40,6 +40,7 @@ import org.elasticsearch.index.query.QueryShardContext;
import org.elasticsearch.search.DocValueFormat;
import org.elasticsearch.search.aggregations.AggregationExecutionException;
import org.elasticsearch.search.aggregations.Aggregator;
import org.elasticsearch.search.aggregations.Aggregator.SubAggCollectionMode;
import org.elasticsearch.search.aggregations.AggregatorFactories;
import org.elasticsearch.search.aggregations.AggregatorFactory;
import org.elasticsearch.search.aggregations.InternalAggregation;
@ -59,8 +60,7 @@ import java.io.IOException;
import java.util.Arrays;
import java.util.Map;
public class SignificantTermsAggregatorFactory extends ValuesSourceAggregatorFactory
implements Releasable {
public class SignificantTermsAggregatorFactory extends ValuesSourceAggregatorFactory implements Releasable {
private static final DeprecationLogger deprecationLogger = new DeprecationLogger(
LogManager.getLogger(SignificantTermsAggregatorFactory.class));
@ -103,8 +103,10 @@ public class SignificantTermsAggregatorFactory extends ValuesSourceAggregatorFac
Aggregator parent,
SignificanceHeuristic significanceHeuristic,
SignificantTermsAggregatorFactory sigTermsFactory,
boolean collectsFromSingleBucket,
Map<String, Object> metadata) throws IOException {
assert collectsFromSingleBucket;
ExecutionMode execution = null;
if (executionHint != null) {
execution = ExecutionMode.fromString(executionHint, deprecationLogger);
@ -124,7 +126,11 @@ public class SignificantTermsAggregatorFactory extends ValuesSourceAggregatorFac
return execution.create(name, factories, valuesSource, format, bucketCountThresholds, includeExclude, context, parent,
significanceHeuristic, sigTermsFactory, metadata);
}
@Override
public boolean needsToCollectFromSingleBucket() {
return true;
}
};
}
@ -147,6 +153,7 @@ public class SignificantTermsAggregatorFactory extends ValuesSourceAggregatorFac
Aggregator parent,
SignificanceHeuristic significanceHeuristic,
SignificantTermsAggregatorFactory sigTermsFactory,
boolean collectsFromSingleBucket,
Map<String, Object> metadata) throws IOException {
if ((includeExclude != null) && (includeExclude.isRegexBased())) {
@ -155,7 +162,8 @@ public class SignificantTermsAggregatorFactory extends ValuesSourceAggregatorFac
"values for include/exclude clauses used to filter numeric fields");
}
if (((ValuesSource.Numeric) valuesSource).isFloatingPoint()) {
ValuesSource.Numeric numericValuesSource = (ValuesSource.Numeric) valuesSource;
if (numericValuesSource.isFloatingPoint()) {
throw new UnsupportedOperationException("No support for examining floating point numerics");
}
@ -164,10 +172,15 @@ public class SignificantTermsAggregatorFactory extends ValuesSourceAggregatorFac
longFilter = includeExclude.convertToLongFilter(format);
}
return new SignificantLongTermsAggregator(name, factories, (ValuesSource.Numeric) valuesSource, format,
bucketCountThresholds, context, parent, significanceHeuristic, sigTermsFactory, longFilter,
metadata);
return new NumericTermsAggregator(name, factories,
agg -> agg.new SignificantLongTermsResults(sigTermsFactory, significanceHeuristic, collectsFromSingleBucket),
numericValuesSource, format, null, bucketCountThresholds, context, parent, SubAggCollectionMode.BREADTH_FIRST,
longFilter, collectsFromSingleBucket, metadata);
}
@Override
public boolean needsToCollectFromSingleBucket() {
return false;
}
};
}
@ -203,11 +216,12 @@ public class SignificantTermsAggregatorFactory extends ValuesSourceAggregatorFac
? null
: filterBuilder.toQuery(queryShardContext);
IndexSearcher searcher = queryShardContext.searcher();
this.supersetNumDocs = filter == null
// Important - need to use the doc count that includes deleted docs
// or we have this issue: https://github.com/elastic/elasticsearch/issues/7951
? searcher.getIndexReader().maxDoc()
: searcher.count(filter);
/*
* We need to use a superset size that includes deleted docs or we
* could end up blowing up with bad statistics that cause us to blow
* up later on.
*/
this.supersetNumDocs = filter == null ? searcher.getIndexReader().maxDoc() : searcher.count(filter);
this.bucketCountThresholds = bucketCountThresholds;
this.significanceHeuristic = significanceHeuristic;
}
@ -220,6 +234,7 @@ public class SignificantTermsAggregatorFactory extends ValuesSourceAggregatorFac
}
private FilterableTermsEnum getTermsEnum(String field) throws IOException {
// TODO this method helps because of asMultiBucketAggregator. Once we remove it we can move this logic into the aggregators.
if (termsEnum != null) {
return termsEnum;
}
@ -285,16 +300,16 @@ public class SignificantTermsAggregatorFactory extends ValuesSourceAggregatorFac
Aggregator parent,
boolean collectsFromSingleBucket,
Map<String, Object> metadata) throws IOException {
if (collectsFromSingleBucket == false) {
return asMultiBucketAggregator(this, searchContext, parent);
}
AggregatorSupplier aggregatorSupplier = queryShardContext.getValuesSourceRegistry().getAggregator(config.valueSourceType(),
SignificantTermsAggregationBuilder.NAME);
if (aggregatorSupplier instanceof SignificantTermsAggregatorSupplier == false) {
throw new AggregationExecutionException("Registry miss-match - expected SignificantTermsAggregatorSupplier, found [" +
aggregatorSupplier.getClass().toString() + "]");
}
SignificantTermsAggregatorSupplier sigTermsAggregatorSupplier = (SignificantTermsAggregatorSupplier) aggregatorSupplier;
if (collectsFromSingleBucket == false && sigTermsAggregatorSupplier.needsToCollectFromSingleBucket()) {
return asMultiBucketAggregator(this, searchContext, parent);
}
numberOfAggregatorsCreated++;
BucketCountThresholds bucketCountThresholds = new BucketCountThresholds(this.bucketCountThresholds);
@ -314,12 +329,10 @@ public class SignificantTermsAggregatorFactory extends ValuesSourceAggregatorFac
bucketCountThresholds.setShardSize(2 * BucketUtils.suggestShardSideQueueSize(bucketCountThresholds.getRequiredSize()));
}
SignificantTermsAggregatorSupplier sigTermsAggregatorSupplier = (SignificantTermsAggregatorSupplier) aggregatorSupplier;
// TODO we should refactor so that we don't need to use this Factory as a singleton (e.g. stop passing `this` to the aggregators)
return sigTermsAggregatorSupplier.build(name, factories, valuesSource, config.format(),
bucketCountThresholds, includeExclude, executionHint, searchContext, parent,
significanceHeuristic, this, metadata);
significanceHeuristic, this, collectsFromSingleBucket, metadata);
}
public enum ExecutionMode {

View File

@ -41,5 +41,8 @@ interface SignificantTermsAggregatorSupplier extends AggregatorSupplier {
Aggregator parent,
SignificanceHeuristic significanceHeuristic,
SignificantTermsAggregatorFactory sigTermsFactory,
boolean collectsFromSingleBucket,
Map<String, Object> metadata) throws IOException;
boolean needsToCollectFromSingleBucket();
}

View File

@ -36,6 +36,7 @@ import org.elasticsearch.search.aggregations.InternalOrder;
import org.elasticsearch.search.aggregations.InternalOrder.CompoundOrder;
import org.elasticsearch.search.aggregations.NonCollectingAggregator;
import org.elasticsearch.search.aggregations.bucket.BucketUtils;
import org.elasticsearch.search.aggregations.bucket.terms.NumericTermsAggregator.ResultStrategy;
import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregator.BucketCountThresholds;
import org.elasticsearch.search.aggregations.support.AggregatorSupplier;
import org.elasticsearch.search.aggregations.support.CoreValuesSourceType;
@ -48,6 +49,7 @@ import org.elasticsearch.search.internal.SearchContext;
import java.io.IOException;
import java.util.Arrays;
import java.util.Map;
import java.util.function.Function;
public class TermsAggregatorFactory extends ValuesSourceAggregatorFactory {
private static final DeprecationLogger deprecationLogger = new DeprecationLogger(LogManager.getLogger(TermsAggregatorFactory.class));
@ -162,7 +164,6 @@ public class TermsAggregatorFactory extends ValuesSourceAggregatorFactory {
+ "include/exclude clauses used to filter numeric fields");
}
IncludeExclude.LongFilter longFilter = null;
if (subAggCollectMode == null) {
// TODO can we remove concept of AggregatorFactories.EMPTY?
if (factories != AggregatorFactories.EMPTY) {
@ -171,20 +172,23 @@ public class TermsAggregatorFactory extends ValuesSourceAggregatorFactory {
subAggCollectMode = SubAggCollectionMode.DEPTH_FIRST;
}
}
if (((ValuesSource.Numeric) valuesSource).isFloatingPoint()) {
ValuesSource.Numeric numericValuesSource = (ValuesSource.Numeric) valuesSource;
IncludeExclude.LongFilter longFilter = null;
Function<NumericTermsAggregator, ResultStrategy<?, ?>> resultStrategy;
if (numericValuesSource.isFloatingPoint()) {
if (includeExclude != null) {
longFilter = includeExclude.convertToDoubleFilter();
}
return new DoubleTermsAggregator(name, factories, (ValuesSource.Numeric) valuesSource, format, order,
bucketCountThresholds, context, parent, subAggCollectMode, showTermDocCountError, longFilter,
collectsFromSingleBucket, metadata);
resultStrategy = agg -> agg.new DoubleTermsResults(showTermDocCountError);
} else {
if (includeExclude != null) {
longFilter = includeExclude.convertToLongFilter(format);
}
resultStrategy = agg -> agg.new LongTermsResults(showTermDocCountError);
}
if (includeExclude != null) {
longFilter = includeExclude.convertToLongFilter(format);
}
return new LongTermsAggregator(name, factories, (ValuesSource.Numeric) valuesSource, format, order,
bucketCountThresholds, context, parent, subAggCollectMode, showTermDocCountError, longFilter,
collectsFromSingleBucket, metadata);
return new NumericTermsAggregator(name, factories, resultStrategy, numericValuesSource, format, order,
bucketCountThresholds, context, parent, subAggCollectMode, longFilter, collectsFromSingleBucket, metadata);
}
@Override

View File

@ -65,7 +65,7 @@ import org.elasticsearch.search.aggregations.bucket.filter.Filter;
import org.elasticsearch.search.aggregations.bucket.filter.FilterAggregationBuilder;
import org.elasticsearch.search.aggregations.bucket.terms.InternalTerms;
import org.elasticsearch.search.aggregations.bucket.terms.LongTerms;
import org.elasticsearch.search.aggregations.bucket.terms.LongTermsAggregator;
import org.elasticsearch.search.aggregations.bucket.terms.NumericTermsAggregator;
import org.elasticsearch.search.aggregations.bucket.terms.StringTerms;
import org.elasticsearch.search.aggregations.bucket.terms.Terms;
import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder;
@ -815,7 +815,7 @@ public class NestedAggregatorTests extends AggregatorTestCase {
}
/**
* {@link LongTermsAggregator} is the first complex bucketing aggregation
* {@link NumericTermsAggregator} is the first complex bucketing aggregation
* that stopped wrapping itself in {@link AggregatorFactory#asMultiBucketAggregator}
* so this tests that nested works properly inside of it.
*/

View File

@ -38,7 +38,7 @@ import org.elasticsearch.search.aggregations.AggregatorFactory;
import org.elasticsearch.search.aggregations.AggregatorTestCase;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.bucket.terms.LongTerms;
import org.elasticsearch.search.aggregations.bucket.terms.LongTermsAggregator;
import org.elasticsearch.search.aggregations.bucket.terms.NumericTermsAggregator;
import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder;
import org.elasticsearch.search.aggregations.metrics.InternalMax;
import org.elasticsearch.search.aggregations.metrics.MaxAggregationBuilder;
@ -247,7 +247,7 @@ public class ReverseNestedAggregatorTests extends AggregatorTestCase {
}
/**
* {@link LongTermsAggregator} is the first complex bucketing aggregation
* {@link NumericTermsAggregator} is the first complex bucketing aggregation
* that stopped wrapping itself in {@link AggregatorFactory#asMultiBucketAggregator}
* so this tests that nested works properly inside of it.
*/

View File

@ -23,14 +23,17 @@ import org.apache.lucene.analysis.standard.StandardAnalyzer;
import org.apache.lucene.document.BinaryDocValuesField;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
import org.apache.lucene.document.SortedNumericDocValuesField;
import org.apache.lucene.document.StoredField;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexOptions;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.RandomIndexWriter;
import org.apache.lucene.index.Term;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.MatchAllDocsQuery;
import org.apache.lucene.search.TermQuery;
import org.apache.lucene.store.Directory;
import org.apache.lucene.util.BytesRef;
@ -60,6 +63,7 @@ import java.util.function.Function;
import java.util.stream.Collectors;
import static org.elasticsearch.search.aggregations.AggregationBuilders.significantTerms;
import static org.hamcrest.Matchers.equalTo;
public class SignificantTermsAggregatorTests extends AggregatorTestCase {
@ -372,6 +376,52 @@ public class SignificantTermsAggregatorTests extends AggregatorTestCase {
}
}
public void testThreeLayerLong() throws IOException {
try (Directory dir = newDirectory()) {
try (RandomIndexWriter writer = new RandomIndexWriter(random(), dir)) {
for (int i = 0; i < 10; i++) {
for (int j = 0; j < 10; j++) {
for (int k = 0; k < 10; k++) {
Document d = new Document();
d.add(new SortedNumericDocValuesField("i", i));
d.add(new SortedNumericDocValuesField("j", j));
d.add(new SortedNumericDocValuesField("k", k));
writer.addDocument(d);
}
}
}
try (IndexReader reader = maybeWrapReaderEs(writer.getReader())) {
IndexSearcher searcher = newIndexSearcher(reader);
SignificantTermsAggregationBuilder request = new SignificantTermsAggregationBuilder("i").field("i")
.subAggregation(new SignificantTermsAggregationBuilder("j").field("j")
.subAggregation(new SignificantTermsAggregationBuilder("k").field("k")));
SignificantLongTerms result = search(searcher, new MatchAllDocsQuery(), request,
longField("i"), longField("j"), longField("k"));
for (int i = 0; i < 10; i++) {
SignificantLongTerms.Bucket iBucket = result.getBucketByKey(Integer.toString(i));
assertThat(iBucket.getDocCount(), equalTo(100L));
SignificantLongTerms jAgg = iBucket.getAggregations().get("j");
for (int j = 0; j < 10; j++) {
SignificantLongTerms.Bucket jBucket = jAgg.getBucketByKey(Integer.toString(j));
assertThat(jBucket.getDocCount(), equalTo(10L));
SignificantLongTerms kAgg = jBucket.getAggregations().get("k");
for (int k = 0; k < 10; k++) {
SignificantLongTerms.Bucket kBucket = kAgg.getBucketByKey(Integer.toString(k));
assertThat(kBucket.getDocCount(), equalTo(1L));
}
}
}
}
}
}
}
private NumberFieldMapper.NumberFieldType longField(String name) {
NumberFieldMapper.NumberFieldType type = new NumberFieldMapper.NumberFieldType(NumberFieldMapper.NumberType.LONG);
type.setName(name);
return type;
}
private void addMixedTextDocs(TextFieldType textFieldType, IndexWriter w) throws IOException {
for (int i = 0; i < 10; i++) {
Document doc = new Document();

View File

@ -39,6 +39,8 @@ import org.elasticsearch.index.mapper.TextFieldMapper.TextFieldType;
import org.elasticsearch.search.aggregations.AggregatorTestCase;
import org.elasticsearch.search.aggregations.bucket.sampler.InternalSampler;
import org.elasticsearch.search.aggregations.bucket.sampler.SamplerAggregationBuilder;
import org.elasticsearch.search.aggregations.bucket.terms.SignificantTerms;
import org.elasticsearch.search.aggregations.bucket.terms.SignificantTextAggregationBuilder;
import org.elasticsearch.search.aggregations.support.AggregationInspectionHelper;
import java.io.IOException;