Set the collection mode to breadth_first in the terms aggregation when the cardinality of the field is unknown or bigger than the requested size.

closes #9825
Jim Ferenczi 2016-06-08 12:46:11 +02:00
parent 01004c72ba
commit ad232aebbe
5 changed files with 101 additions and 23 deletions

BestBucketsDeferringCollector.java

@@ -166,16 +166,16 @@ public class BestBucketsDeferringCollector extends DeferringBucketCollector {
             int doc = 0;
             for (long i = 0, end = entry.docDeltas.size(); i < end; ++i) {
                 doc += docDeltaIterator.next();
-                if (needsScores) {
-                    if (docIt.docID() < doc) {
-                        docIt.advance(doc);
-                    }
-                    // aggregations should only be replayed on matching documents
-                    assert docIt.docID() == doc;
-                }
                 final long bucket = buckets.next();
                 final long rebasedBucket = hash.find(bucket);
                 if (rebasedBucket != -1) {
+                    if (needsScores) {
+                        if (docIt.docID() < doc) {
+                            docIt.advance(doc);
+                        }
+                        // aggregations should only be replayed on matching documents
+                        assert docIt.docID() == doc;
+                    }
                     leafCollector.collect(doc, rebasedBucket);
                 }
             }
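Editorial note on the hunk above: hash.find(bucket) returns -1 for buckets that did not survive pruning, and a Lucene scorer's iterator is forward-only, so the new code positions the scorer only for documents that will actually be collected. An annotated copy of the new loop for illustration (the comments are added here and are not part of the commit):

int doc = 0;
for (long i = 0, end = entry.docDeltas.size(); i < end; ++i) {
    doc += docDeltaIterator.next();               // decode the next recorded doc id (delta-encoded)
    final long bucket = buckets.next();           // bucket the doc fell into during the first pass
    final long rebasedBucket = hash.find(bucket); // -1 when the bucket was pruned
    if (rebasedBucket != -1) {
        if (needsScores) {
            if (docIt.docID() < doc) {
                docIt.advance(doc);               // forward-only positioning, now done lazily
            }
            // aggregations should only be replayed on matching documents
            assert docIt.docID() == doc;
        }
        leafCollector.collect(doc, rebasedBucket); // run the deferred sub-aggregations
    }
}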

TermsAggregationBuilder.java

@@ -56,7 +56,7 @@ public class TermsAggregationBuilder extends ValuesSourceAggregationBuilder<Valu
     private Terms.Order order = Terms.Order.compound(Terms.Order.count(false), Terms.Order.term(true));
     private IncludeExclude includeExclude = null;
     private String executionHint = null;
-    private SubAggCollectionMode collectMode = SubAggCollectionMode.DEPTH_FIRST;
+    private SubAggCollectionMode collectMode = null;
     private TermsAggregator.BucketCountThresholds bucketCountThresholds = new TermsAggregator.BucketCountThresholds(
             DEFAULT_BUCKET_COUNT_THRESHOLDS);
     private boolean showTermDocCountError = false;
@@ -71,7 +71,7 @@ public class TermsAggregationBuilder extends ValuesSourceAggregationBuilder<Valu
     public TermsAggregationBuilder(StreamInput in) throws IOException {
         super(in, StringTerms.TYPE, ValuesSourceType.ANY);
         bucketCountThresholds = new BucketCountThresholds(in);
-        collectMode = SubAggCollectionMode.readFromStream(in);
+        collectMode = in.readOptionalWriteable(SubAggCollectionMode::readFromStream);
         executionHint = in.readOptionalString();
         includeExclude = in.readOptionalWriteable(IncludeExclude::new);
         order = InternalOrder.Streams.readOrder(in);
@ -86,7 +86,7 @@ public class TermsAggregationBuilder extends ValuesSourceAggregationBuilder<Valu
@Override @Override
protected void innerWriteTo(StreamOutput out) throws IOException { protected void innerWriteTo(StreamOutput out) throws IOException {
bucketCountThresholds.writeTo(out); bucketCountThresholds.writeTo(out);
collectMode.writeTo(out); out.writeOptionalWriteable(collectMode);
out.writeOptionalString(executionHint); out.writeOptionalString(executionHint);
out.writeOptionalWriteable(includeExclude); out.writeOptionalWriteable(includeExclude);
InternalOrder.Streams.writeOrder(order, out); InternalOrder.Streams.writeOrder(order, out);
@@ -266,7 +266,9 @@ public class TermsAggregationBuilder extends ValuesSourceAggregationBuilder<Valu
         }
         builder.field(ORDER_FIELD.getPreferredName());
         order.toXContent(builder, params);
-        builder.field(SubAggCollectionMode.KEY.getPreferredName(), collectMode.parseField().getPreferredName());
+        if (collectMode != null) {
+            builder.field(SubAggCollectionMode.KEY.getPreferredName(), collectMode.parseField().getPreferredName());
+        }
         if (includeExclude != null) {
             includeExclude.toXContent(builder, params);
         }
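The serialization change above follows the optional-writeable pattern: a presence flag precedes the value, so an unset collect mode crosses the wire as null and the factory, not the builder, gets to pick the default per shard. A minimal, self-contained sketch of that pattern, using plain java.io stand-ins rather than Elasticsearch's StreamInput/StreamOutput:

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

// Sketch only: a boolean presence flag precedes the value, so null round-trips.
final class OptionalRoundTrip {
    enum CollectMode { DEPTH_FIRST, BREADTH_FIRST }

    static void writeOptional(DataOutputStream out, CollectMode mode) throws IOException {
        out.writeBoolean(mode != null);      // presence flag
        if (mode != null) {
            out.writeUTF(mode.name());       // the value itself
        }
    }

    static CollectMode readOptional(DataInputStream in) throws IOException {
        return in.readBoolean() ? CollectMode.valueOf(in.readUTF()) : null;
    }

    public static void main(String[] args) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        writeOptional(new DataOutputStream(bytes), null);
        CollectMode mode = readOptional(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
        System.out.println(mode);            // null: "no explicit mode" survives serialization
    }
}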

TermsAggregatorFactory.java

@@ -150,14 +150,22 @@ public class TermsAggregatorFactory extends ValuesSourceAggregatorFactory<Values
                     }
                 }
             }
+            SubAggCollectionMode cm = collectMode;
+            if (cm == null) {
+                cm = SubAggCollectionMode.DEPTH_FIRST;
+                if (factories != AggregatorFactories.EMPTY) {
+                    cm = subAggCollectionMode(bucketCountThresholds.getShardSize(), maxOrd);
+                }
+            }
             DocValueFormat format = config.format();
             if ((includeExclude != null) && (includeExclude.isRegexBased()) && format != DocValueFormat.RAW) {
                 throw new AggregationExecutionException("Aggregation [" + name + "] cannot support regular expression style include/exclude "
                         + "settings as they can only be applied to string fields. Use an array of values for include/exclude clauses");
             }
             return execution.create(name, factories, valuesSource, order, format, bucketCountThresholds, includeExclude, context, parent,
-                    collectMode, showTermDocCountError, pipelineAggregators, metaData);
+                    cm, showTermDocCountError, pipelineAggregators, metaData);
         }

         if ((includeExclude != null) && (includeExclude.isRegexBased())) {
@@ -167,19 +175,27 @@ public class TermsAggregatorFactory extends ValuesSourceAggregatorFactory<Values
         if (valuesSource instanceof ValuesSource.Numeric) {
             IncludeExclude.LongFilter longFilter = null;
+            SubAggCollectionMode cm = collectMode;
+            if (cm == null) {
+                if (factories != AggregatorFactories.EMPTY) {
+                    cm = subAggCollectionMode(bucketCountThresholds.getShardSize(), -1);
+                } else {
+                    cm = SubAggCollectionMode.DEPTH_FIRST;
+                }
+            }
             if (((ValuesSource.Numeric) valuesSource).isFloatingPoint()) {
                 if (includeExclude != null) {
                     longFilter = includeExclude.convertToDoubleFilter();
                 }
                 return new DoubleTermsAggregator(name, factories, (ValuesSource.Numeric) valuesSource, config.format(), order,
-                        bucketCountThresholds, context, parent, collectMode, showTermDocCountError, longFilter,
+                        bucketCountThresholds, context, parent, cm, showTermDocCountError, longFilter,
                         pipelineAggregators, metaData);
             }
             if (includeExclude != null) {
                 longFilter = includeExclude.convertToLongFilter(config.format());
             }
             return new LongTermsAggregator(name, factories, (ValuesSource.Numeric) valuesSource, config.format(), order,
-                    bucketCountThresholds, context, parent, collectMode, showTermDocCountError, longFilter, pipelineAggregators,
+                    bucketCountThresholds, context, parent, cm, showTermDocCountError, longFilter, pipelineAggregators,
                     metaData);
         }
@@ -187,6 +203,20 @@ public class TermsAggregatorFactory extends ValuesSourceAggregatorFactory<Values
                 + "]. It can only be applied to numeric or string fields.");
     }

+    // return the SubAggCollectionMode that this aggregation should use based on the expected size
+    // and the cardinality of the field
+    static SubAggCollectionMode subAggCollectionMode(int expectedSize, long maxOrd) {
+        if (expectedSize == Integer.MAX_VALUE) {
+            // return all buckets
+            return SubAggCollectionMode.DEPTH_FIRST;
+        }
+        if (maxOrd == -1 || maxOrd > expectedSize) {
+            // use breadth_first if the cardinality is bigger than the expected size or unknown (-1)
+            return SubAggCollectionMode.BREADTH_FIRST;
+        }
+        return SubAggCollectionMode.DEPTH_FIRST;
+    }
+
     public enum ExecutionMode {

         MAP(new ParseField("map")) {

TermsAggregatorFactoryTests.java

@@ -0,0 +1,44 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.search.aggregations.bucket.terms;

import org.elasticsearch.search.aggregations.Aggregator;
import org.elasticsearch.test.ESSingleNodeTestCase;

import static org.hamcrest.Matchers.equalTo;

public class TermsAggregatorFactoryTests extends ESSingleNodeTestCase {

    public void testSubAggCollectMode() throws Exception {
        assertThat(TermsAggregatorFactory.subAggCollectionMode(Integer.MAX_VALUE, -1),
                equalTo(Aggregator.SubAggCollectionMode.DEPTH_FIRST));
        assertThat(TermsAggregatorFactory.subAggCollectionMode(10, -1),
                equalTo(Aggregator.SubAggCollectionMode.BREADTH_FIRST));
        assertThat(TermsAggregatorFactory.subAggCollectionMode(10, 5),
                equalTo(Aggregator.SubAggCollectionMode.DEPTH_FIRST));
        assertThat(TermsAggregatorFactory.subAggCollectionMode(10, 10),
                equalTo(Aggregator.SubAggCollectionMode.DEPTH_FIRST));
        assertThat(TermsAggregatorFactory.subAggCollectionMode(10, 100),
                equalTo(Aggregator.SubAggCollectionMode.BREADTH_FIRST));
        assertThat(TermsAggregatorFactory.subAggCollectionMode(1, 2),
                equalTo(Aggregator.SubAggCollectionMode.BREADTH_FIRST));
        assertThat(TermsAggregatorFactory.subAggCollectionMode(1, 100),
                equalTo(Aggregator.SubAggCollectionMode.BREADTH_FIRST));
    }
}

terms-aggregation.asciidoc

@@ -577,7 +577,8 @@ Deferring calculation of child aggregations
 For fields with many unique terms and a small number of required results it can be more efficient to delay the calculation
 of child aggregations until the top parent-level aggs have been pruned. Ordinarily, all branches of the aggregation tree
-are expanded in one depth-first pass and only then any pruning occurs. In some rare scenarios this can be very wasteful and can hit memory constraints.
+are expanded in one depth-first pass and only then any pruning occurs.
+In some scenarios this can be very wasteful and can hit memory constraints.

 An example problem scenario is querying a movie database for the 10 most popular actors and their 5 most common co-stars:

 [source,js]
@@ -602,10 +603,13 @@ An example problem scenario is querying a movie database for the 10 most popular
 }
 --------------------------------------------------

-Even though the number of movies may be comparatively small and we want only 50 result buckets there is a combinatorial explosion of buckets
-during calculation - a single movie will produce n² buckets where n is the number of actors. The sane option would be to first determine
+Even though the number of actors may be comparatively small and we want only 50 result buckets there is a combinatorial explosion of buckets
+during calculation - a single actor can produce n² buckets where n is the number of actors. The sane option would be to first determine
 the 10 most popular actors and only then examine the top co-stars for these 10 actors. This alternative strategy is what we call the `breadth_first` collection
-mode as opposed to the default `depth_first` mode:
+mode as opposed to the `depth_first` mode.
+
+NOTE: `breadth_first` is the default mode for fields with a cardinality bigger than the requested size or when the cardinality is unknown (numeric fields or scripts for instance).
+It is possible to override the default heuristic and to provide a collect mode directly in the request:

 [source,js]
 --------------------------------------------------
@@ -615,7 +619,7 @@ mode as opposed to the default `depth_first` mode:
       "terms" : {
          "field" : "actors",
          "size" : 10,
-         "collect_mode" : "breadth_first"
+         "collect_mode" : "breadth_first" <1>
       },
       "aggs" : {
          "costars" : {
@@ -630,12 +634,10 @@ mode as opposed to the default `depth_first` mode:
 }
 --------------------------------------------------
+<1> the possible values are `breadth_first` and `depth_first`

 When using `breadth_first` mode the set of documents that fall into the uppermost buckets are
 cached for subsequent replay so there is a memory overhead in doing this which is linear with the number of matching documents.
-In most requests the volume of buckets generated is smaller than the number of documents that fall into them so the default `depth_first`
-collection mode is normally the best bet but occasionally the `breadth_first` strategy can be significantly more efficient. Currently
-elasticsearch will always use the `depth_first` collect_mode unless explicitly instructed to use `breadth_first` as in the above example.

 Note that the `order` parameter can still be used to refer to data from a child aggregation when using the `breadth_first` setting - the parent
 aggregation understands that this child aggregation will need to be called first before any of the other child aggregations.
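As an editorial illustration of the caching the docs describe, here is a hypothetical, heavily simplified sketch of a deferring collector: the first pass records (doc, bucket) pairs, using memory linear in the number of matching documents, and only buckets that survive pruning are replayed into the sub-aggregation. Names and structure are invented for illustration; the real implementation is BestBucketsDeferringCollector.

import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.function.ObjLongConsumer;

// Sketch of breadth_first collection: defer sub-aggregations, record doc/bucket
// pairs, then replay only the documents whose bucket survived pruning.
final class DeferringSketch {
    private static final class Entry {
        final int doc;
        final long bucket;
        Entry(int doc, long bucket) { this.doc = doc; this.bucket = bucket; }
    }

    private final List<Entry> recorded = new ArrayList<>();

    // first pass: O(matching docs) memory, no sub-aggregation work yet
    void collect(int doc, long bucket) {
        recorded.add(new Entry(doc, bucket));
    }

    // second pass: replay only documents whose bucket survived pruning
    void replay(Set<Long> survivingBuckets, ObjLongConsumer<Integer> subAggregation) {
        for (Entry e : recorded) {
            if (survivingBuckets.contains(e.bucket)) {
                subAggregation.accept(e.doc, e.bucket);
            }
        }
    }

    public static void main(String[] args) {
        DeferringSketch sketch = new DeferringSketch();
        sketch.collect(0, 1L);   // doc 0 -> bucket 1
        sketch.collect(3, 7L);   // doc 3 -> bucket 7 (will be pruned)
        sketch.collect(9, 1L);   // doc 9 -> bucket 1
        sketch.replay(Set.of(1L),
                (doc, bucket) -> System.out.println("replay doc " + doc + " into bucket " + bucket));
    }
}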