Use the breadth first collection mode for significant terms aggs. (#29042)

This helps avoid memory issues when computing deep sub-aggregations. Because it
should be rare to use sub-aggregations with significant terms, we opted to always
choose breadth first as opposed to exposing a `collect_mode` option.
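Illustrative note (not part of the commit): the request this affects looks roughly like the following when built through the Java API. The index, field, and aggregation names below are invented. The point is that `SignificantTermsAggregationBuilder` accepts sub-aggregations like any other builder but, unlike `TermsAggregationBuilder`, exposes no `collectMode` setter, so with this change the child aggregations are always collected breadth first.

```java
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.bucket.significant.SignificantTermsAggregationBuilder;

public class SignificantTermsSubAggExample {
    public static void main(String[] args) {
        // significant_terms with a terms sub-aggregation; names and fields are hypothetical.
        // Note there is no collectMode(...) to call here: collection of the child
        // aggregation is always breadth_first after this change.
        SignificantTermsAggregationBuilder sigTerms = AggregationBuilders
                .significantTerms("keywords")
                .field("description")
                .subAggregation(AggregationBuilders.terms("by_category").field("category"));
        System.out.println(sigTerms.getName());
    }
}
```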

Closes #28652.
Antonio Matarrese 2019-04-11 17:38:25 -05:00 committed by Julie Tibshirani
parent 0f496842fd
commit 79c7a57737
6 changed files with 71 additions and 8 deletions


@@ -542,6 +542,12 @@ It is possible (although rarely required) to filter the values for which buckets
`exclude` parameters which are based on a regular expression string or arrays of exact terms. This functionality mirrors the features
described in the <<search-aggregations-bucket-terms-aggregation,terms aggregation>> documentation.
+ ==== Collect mode
+ To avoid memory issues, the `significant_terms` aggregation always computes child aggregations in `breadth_first` mode.
+ A description of the different collection modes can be found in the
+ <<search-aggregations-bucket-terms-aggregation-collect, terms aggregation>> documentation.
==== Execution hint
There are different mechanisms by which terms aggregations can be executed:


@@ -775,6 +775,7 @@ fields, then use `copy_to` in your mapping to create a new dedicated field at
index time which contains the values from both fields. You can aggregate on
this single field, which will benefit from the global ordinals optimization.
+ [[search-aggregations-bucket-terms-aggregation-collect]]
==== Collect mode
Deferring calculation of child aggregations
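Illustrative note (not part of the diff): the terms aggregation that this anchor points at is where the collection mode stays user-selectable. A minimal sketch of requesting deferred child collection on a plain `terms` aggregation via the Java API, with invented names and fields:

```java
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.Aggregator.SubAggCollectionMode;
import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder;

public class TermsCollectModeExample {
    public static void main(String[] args) {
        // Explicitly defer child aggregation collection on a terms aggregation.
        // Field and aggregation names are hypothetical.
        TermsAggregationBuilder byCategory = AggregationBuilders
                .terms("by_category")
                .field("category")
                .collectMode(SubAggCollectionMode.BREADTH_FIRST)
                .subAggregation(AggregationBuilders.avg("avg_price").field("price"));
        System.out.println(byCategory.getName());
    }
}
```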


@@ -65,7 +65,7 @@ public class GlobalOrdinalsSignificantTermsAggregator extends GlobalOrdinalsStri
List<PipelineAggregator> pipelineAggregators,
Map<String, Object> metaData) throws IOException {
super(name, factories, valuesSource, null, format, bucketCountThresholds, includeExclude, context, parent,
- forceRemapGlobalOrds, SubAggCollectionMode.DEPTH_FIRST, false, pipelineAggregators, metaData);
+ forceRemapGlobalOrds, SubAggCollectionMode.BREADTH_FIRST, false, pipelineAggregators, metaData);
this.significanceHeuristic = significanceHeuristic;
this.termsAggFactory = termsAggFactory;
this.numCollectedDocs = 0;
@@ -146,12 +146,19 @@ public class GlobalOrdinalsSignificantTermsAggregator extends GlobalOrdinalsStri
}
final SignificantStringTerms.Bucket[] list = new SignificantStringTerms.Bucket[ordered.size()];
+ final long[] survivingBucketOrds = new long[ordered.size()];
for (int i = ordered.size() - 1; i >= 0; i--) {
final SignificantStringTerms.Bucket bucket = ordered.pop();
+ survivingBucketOrds[i] = bucket.bucketOrd;
+ list[i] = bucket;
+ }
+ runDeferredCollections(survivingBucketOrds);
+ for (SignificantStringTerms.Bucket bucket : list) {
// the terms are owned by the BytesRefHash, we need to pull a copy since the BytesRef hash data may be recycled at some point
bucket.termBytes = BytesRef.deepCopyOf(bucket.termBytes);
bucket.aggregations = bucketAggregations(bucket.bucketOrd);
- list[i] = bucket;
}
return new SignificantStringTerms(name, bucketCountThresholds.getRequiredSize(), bucketCountThresholds.getMinDocCount(),
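Illustrative note (not part of the commit): the three aggregator changes in this commit share the same two-pass shape. The first pass only records which bucket ordinals survive pruning, `runDeferredCollections` then replays the deferred child collection for those ordinals, and only afterwards are the per-bucket sub-aggregation results attached. The self-contained toy below, plain Java with invented data and no Elasticsearch types, sketches that idea; it illustrates the pattern, not the actual implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Toy two-pass "breadth first" collection: prune buckets on doc count first,
// then do the expensive per-bucket (sub-aggregation-like) work only for survivors.
public class DeferredCollectionSketch {

    public static void main(String[] args) {
        List<String> docs = List.of("a", "b", "a", "c", "a", "b", "b", "a");
        int requiredSize = 2; // analogue of bucketCountThresholds.getRequiredSize()

        // Pass 1: count documents per bucket, computing nothing per child aggregation yet.
        Map<String, Integer> counts = new TreeMap<>();
        for (String term : docs) {
            counts.merge(term, 1, Integer::sum);
        }

        // Prune to the top buckets, the analogue of survivingBucketOrds above.
        List<String> surviving = new ArrayList<>(counts.keySet());
        surviving.sort((a, b) -> counts.get(b) - counts.get(a));
        surviving = surviving.subList(0, Math.min(requiredSize, surviving.size()));

        // Pass 2: the deferred work, run only for surviving buckets
        // (stands in for runDeferredCollections + bucketAggregations).
        for (String term : surviving) {
            long subAggValue = docs.stream().filter(term::equals).count();
            System.out.println(term + ": docCount=" + counts.get(term) + ", subAgg=" + subAggValue);
        }
    }
}
```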


@@ -50,7 +50,7 @@ public class SignificantLongTermsAggregator extends LongTermsAggregator {
List<PipelineAggregator> pipelineAggregators, Map<String, Object> metaData) throws IOException {
super(name, factories, valuesSource, format, null, bucketCountThresholds, context, parent,
- SubAggCollectionMode.DEPTH_FIRST, false, includeExclude, pipelineAggregators, metaData);
+ SubAggCollectionMode.BREADTH_FIRST, false, includeExclude, pipelineAggregators, metaData);
this.significanceHeuristic = significanceHeuristic;
this.termsAggFactory = termsAggFactory;
}
@@ -106,12 +106,20 @@ public class SignificantLongTermsAggregator extends LongTermsAggregator {
}
}
- final SignificantLongTerms.Bucket[] list = new SignificantLongTerms.Bucket[ordered.size()];
+ SignificantLongTerms.Bucket[] list = new SignificantLongTerms.Bucket[ordered.size()];
+ final long[] survivingBucketOrds = new long[ordered.size()];
for (int i = ordered.size() - 1; i >= 0; i--) {
final SignificantLongTerms.Bucket bucket = ordered.pop();
- bucket.aggregations = bucketAggregations(bucket.bucketOrd);
+ survivingBucketOrds[i] = bucket.bucketOrd;
list[i] = bucket;
}
+ runDeferredCollections(survivingBucketOrds);
+ for (SignificantLongTerms.Bucket bucket : list) {
+ bucket.aggregations = bucketAggregations(bucket.bucketOrd);
+ }
return new SignificantLongTerms(name, bucketCountThresholds.getRequiredSize(), bucketCountThresholds.getMinDocCount(),
pipelineAggregators(), metaData(), format, subsetSize, supersetSize, significanceHeuristic, Arrays.asList(list));
}


@@ -57,7 +57,7 @@ public class SignificantStringTermsAggregator extends StringTermsAggregator {
List<PipelineAggregator> pipelineAggregators, Map<String, Object> metaData) throws IOException {
super(name, factories, valuesSource, null, format, bucketCountThresholds, includeExclude, aggregationContext, parent,
- SubAggCollectionMode.DEPTH_FIRST, false, pipelineAggregators, metaData);
+ SubAggCollectionMode.BREADTH_FIRST, false, pipelineAggregators, metaData);
this.significanceHeuristic = significanceHeuristic;
this.termsAggFactory = termsAggFactory;
}
@@ -113,12 +113,20 @@ public class SignificantStringTermsAggregator extends StringTermsAggregator {
}
final SignificantStringTerms.Bucket[] list = new SignificantStringTerms.Bucket[ordered.size()];
+ final long[] survivingBucketOrds = new long[ordered.size()];
for (int i = ordered.size() - 1; i >= 0; i--) {
final SignificantStringTerms.Bucket bucket = ordered.pop();
- // the terms are owned by the BytesRefHash, we need to pull a copy since the BytesRef hash data may be recycled at some point
+ survivingBucketOrds[i] = bucket.bucketOrd;
+ list[i] = bucket;
+ }
+ runDeferredCollections(survivingBucketOrds);
+ for (SignificantStringTerms.Bucket bucket : list) {
+ // the terms are owned by the BytesRefHash, we need to pull a copy since the BytesRef hash data may be
+ // recycled at some point
bucket.termBytes = BytesRef.deepCopyOf(bucket.termBytes);
bucket.aggregations = bucketAggregations(bucket.bucketOrd);
- list[i] = bucket;
}
return new SignificantStringTerms( name, bucketCountThresholds.getRequiredSize(),


@@ -30,6 +30,7 @@ import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.XContentType;
+ import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.query.QueryShardException;
import org.elasticsearch.plugins.Plugin;
@@ -38,6 +39,7 @@ import org.elasticsearch.script.MockScriptPlugin;
import org.elasticsearch.script.Script;
import org.elasticsearch.script.ScriptType;
import org.elasticsearch.search.aggregations.Aggregation;
+ import org.elasticsearch.search.aggregations.AggregationBuilder;
import org.elasticsearch.search.aggregations.Aggregations;
import org.elasticsearch.search.aggregations.bucket.filter.InternalFilter;
import org.elasticsearch.search.aggregations.bucket.significant.SignificantTerms;
@@ -543,6 +545,37 @@ public class SignificantTermsSignificanceScoreIT extends ESIntegTestCase {
}
}
+ /**
+ * A simple test that adds a sub-aggregation to a significant terms aggregation,
+ * to help check that sub-aggregation collection is handled correctly.
+ */
+ public void testSubAggregations() throws Exception {
+ indexEqualTestData();
+ QueryBuilder query = QueryBuilders.termsQuery(TEXT_FIELD, "a", "b");
+ AggregationBuilder subAgg = terms("class").field(CLASS_FIELD);
+ AggregationBuilder agg = significantTerms("significant_terms")
+ .field(TEXT_FIELD)
+ .executionHint(randomExecutionHint())
+ .significanceHeuristic(new ChiSquare(true, true))
+ .minDocCount(1).shardSize(1000).size(1000)
+ .subAggregation(subAgg);
+ SearchResponse response = client().prepareSearch("test")
+ .setQuery(query)
+ .addAggregation(agg)
+ .get();
+ assertSearchResponse(response);
+ SignificantTerms sigTerms = response.getAggregations().get("significant_terms");
+ assertThat(sigTerms.getBuckets().size(), equalTo(2));
+ for (SignificantTerms.Bucket bucket : sigTerms) {
+ StringTerms terms = bucket.getAggregations().get("class");
+ assertThat(terms.getBuckets().size(), equalTo(2));
+ }
+ }
private void indexEqualTestData() throws ExecutionException, InterruptedException {
assertAcked(prepareCreate("test")
.setSettings(Settings.builder().put(SETTING_NUMBER_OF_SHARDS, 1).put(SETTING_NUMBER_OF_REPLICAS, 0))