Aggregations: change to default shard_size in terms aggregation

The default shard size in the terms aggregation now uses BucketUtils.suggestShardSideQueueSize() to set the shard size if the user does not specify it as a parameter.

Closes #6857
This commit is contained in:
Colin Goodheart-Smithe 2014-07-22 16:01:58 +01:00
parent 5487c56c70
commit dc9e9cb4cc
2 changed files with 167 additions and 10 deletions

View File

@ -21,6 +21,7 @@ package org.elasticsearch.search.aggregations.bucket.terms;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.search.aggregations.Aggregator;
import org.elasticsearch.search.aggregations.AggregatorFactory;
import org.elasticsearch.search.aggregations.bucket.BucketUtils;
import org.elasticsearch.search.aggregations.bucket.terms.support.IncludeExclude;
import org.elasticsearch.search.aggregations.support.ValuesSourceParser;
import org.elasticsearch.search.internal.SearchContext;
@ -32,7 +33,6 @@ import java.io.IOException;
*/
public class TermsParser implements Aggregator.Parser {
@Override
public String type() {
return StringTerms.TYPE.name();
@ -41,19 +41,22 @@ public class TermsParser implements Aggregator.Parser {
@Override
public AggregatorFactory parse(String aggregationName, XContentParser parser, SearchContext context) throws IOException {
TermsParametersParser aggParser = new TermsParametersParser();
ValuesSourceParser vsParser = ValuesSourceParser.any(aggregationName, StringTerms.TYPE, context)
.scriptable(true)
.formattable(true)
.requiresSortedValues(true)
.requiresUniqueValues(true)
.build();
ValuesSourceParser vsParser = ValuesSourceParser.any(aggregationName, StringTerms.TYPE, context).scriptable(true).formattable(true)
.requiresSortedValues(true).requiresUniqueValues(true).build();
IncludeExclude.Parser incExcParser = new IncludeExclude.Parser(aggregationName, StringTerms.TYPE, context);
aggParser.parse(aggregationName, parser, context, vsParser, incExcParser);
TermsAggregator.BucketCountThresholds bucketCountThresholds = aggParser.getBucketCountThresholds();
bucketCountThresholds.ensureValidity();
InternalOrder order = resolveOrder(aggParser.getOrderKey(), aggParser.isOrderAsc());
return new TermsAggregatorFactory(aggregationName, vsParser.config(), order, bucketCountThresholds, aggParser.getIncludeExclude(), aggParser.getExecutionHint(), aggParser.getCollectionMode());
TermsAggregator.BucketCountThresholds bucketCountThresholds = aggParser.getBucketCountThresholds();
if (!(order == InternalOrder.TERM_ASC || order == InternalOrder.TERM_DESC)
&& bucketCountThresholds.getShardSize() == aggParser.getDefaultBucketCountThresholds().getShardSize()) {
// The user has not made a shardSize selection. Use default heuristic to avoid any wrong-ranking caused by distributed counting
bucketCountThresholds.setShardSize(BucketUtils.suggestShardSideQueueSize(bucketCountThresholds.getRequiredSize(),
context.numberOfShards()));
}
bucketCountThresholds.ensureValidity();
return new TermsAggregatorFactory(aggregationName, vsParser.config(), order, bucketCountThresholds, aggParser.getIncludeExclude(),
aggParser.getExecutionHint(), aggParser.getCollectionMode());
}
static InternalOrder resolveOrder(String key, boolean asc) {

View File

@ -45,6 +45,31 @@ public class ShardSizeTermsTests extends ShardSizeTests {
.collectMode(randomFrom(SubAggCollectionMode.values())).order(Terms.Order.count(false)))
.execute().actionGet();
Terms terms = response.getAggregations().get("keys");
Collection<Terms.Bucket> buckets = terms.getBuckets();
assertThat(buckets.size(), equalTo(3));
Map<String, Long> expected = ImmutableMap.<String, Long>builder()
.put("1", 8l)
.put("3", 8l)
.put("2", 5l)
.build();
for (Terms.Bucket bucket : buckets) {
assertThat(bucket.getDocCount(), equalTo(expected.get(bucket.getKeyAsText().string())));
}
}
@Test
public void shardSizeEqualsSize_string() throws Exception {
createIdx("type=string,index=not_analyzed");
indexData();
SearchResponse response = client().prepareSearch("idx").setTypes("type")
.setQuery(matchAllQuery())
.addAggregation(terms("keys").field("key").size(3).shardSize(3)
.collectMode(randomFrom(SubAggCollectionMode.values())).order(Terms.Order.count(false)))
.execute().actionGet();
Terms terms = response.getAggregations().get("keys");
Collection<Terms.Bucket> buckets = terms.getBuckets();
assertThat(buckets.size(), equalTo(3));
@ -109,6 +134,31 @@ public class ShardSizeTermsTests extends ShardSizeTests {
assertThat(bucket.getDocCount(), equalTo(expected.get(bucket.getKey())));
}
}
@Test
public void noShardSizeTermOrder_string() throws Exception {
createIdx("type=string,index=not_analyzed");
indexData();
SearchResponse response = client().prepareSearch("idx").setTypes("type")
.setQuery(matchAllQuery())
.addAggregation(terms("keys").field("key").size(3)
.collectMode(randomFrom(SubAggCollectionMode.values())).order(Terms.Order.term(true)))
.execute().actionGet();
Terms terms = response.getAggregations().get("keys");
Collection<Terms.Bucket> buckets = terms.getBuckets();
assertThat(buckets.size(), equalTo(3));
Map<String, Long> expected = ImmutableMap.<String, Long>builder()
.put("1", 8l)
.put("2", 5l)
.put("3", 8l)
.build();
for (Terms.Bucket bucket : buckets) {
assertThat(bucket.getDocCount(), equalTo(expected.get(bucket.getKeyAsText().string())));
}
}
@Test
public void noShardSize_long() throws Exception {
@ -123,6 +173,32 @@ public class ShardSizeTermsTests extends ShardSizeTests {
.collectMode(randomFrom(SubAggCollectionMode.values())).order(Terms.Order.count(false)))
.execute().actionGet();
Terms terms = response.getAggregations().get("keys");
Collection<Terms.Bucket> buckets = terms.getBuckets();
assertThat(buckets.size(), equalTo(3));
Map<Integer, Long> expected = ImmutableMap.<Integer, Long>builder()
.put(1, 8l)
.put(3, 8l)
.put(2, 5l)
.build();
for (Terms.Bucket bucket : buckets) {
assertThat(bucket.getDocCount(), equalTo(expected.get(bucket.getKeyAsNumber().intValue())));
}
}
@Test
public void shardSizeEqualsSize_long() throws Exception {
createIdx("type=long");
indexData();
SearchResponse response = client().prepareSearch("idx").setTypes("type")
.setQuery(matchAllQuery())
.addAggregation(terms("keys").field("key").size(3).shardSize(3)
.collectMode(randomFrom(SubAggCollectionMode.values())).order(Terms.Order.count(false)))
.execute().actionGet();
Terms terms = response.getAggregations().get("keys");
Collection<Terms.Bucket> buckets = terms.getBuckets();
assertThat(buckets.size(), equalTo(3));
@ -188,6 +264,32 @@ public class ShardSizeTermsTests extends ShardSizeTests {
}
}
@Test
public void noShardSizeTermOrder_long() throws Exception {
createIdx("type=long");
indexData();
SearchResponse response = client().prepareSearch("idx").setTypes("type")
.setQuery(matchAllQuery())
.addAggregation(terms("keys").field("key").size(3)
.collectMode(randomFrom(SubAggCollectionMode.values())).order(Terms.Order.term(true)))
.execute().actionGet();
Terms terms = response.getAggregations().get("keys");
Collection<Terms.Bucket> buckets = terms.getBuckets();
assertThat(buckets.size(), equalTo(3));
Map<Integer, Long> expected = ImmutableMap.<Integer, Long>builder()
.put(1, 8l)
.put(2, 5l)
.put(3, 8l)
.build();
for (Terms.Bucket bucket : buckets) {
assertThat(bucket.getDocCount(), equalTo(expected.get(bucket.getKeyAsNumber().intValue())));
}
}
@Test
public void noShardSize_double() throws Exception {
@ -201,6 +303,32 @@ public class ShardSizeTermsTests extends ShardSizeTests {
.collectMode(randomFrom(SubAggCollectionMode.values())).order(Terms.Order.count(false)))
.execute().actionGet();
Terms terms = response.getAggregations().get("keys");
Collection<Terms.Bucket> buckets = terms.getBuckets();
assertThat(buckets.size(), equalTo(3));
Map<Integer, Long> expected = ImmutableMap.<Integer, Long>builder()
.put(1, 8l)
.put(3, 8l)
.put(2, 5l)
.build();
for (Terms.Bucket bucket : buckets) {
assertThat(bucket.getDocCount(), equalTo(expected.get(bucket.getKeyAsNumber().intValue())));
}
}
@Test
public void shardSizeEqualsSize_double() throws Exception {
createIdx("type=double");
indexData();
SearchResponse response = client().prepareSearch("idx").setTypes("type")
.setQuery(matchAllQuery())
.addAggregation(terms("keys").field("key").size(3).shardSize(3)
.collectMode(randomFrom(SubAggCollectionMode.values())).order(Terms.Order.count(false)))
.execute().actionGet();
Terms terms = response.getAggregations().get("keys");
Collection<Terms.Bucket> buckets = terms.getBuckets();
assertThat(buckets.size(), equalTo(3));
@ -265,4 +393,30 @@ public class ShardSizeTermsTests extends ShardSizeTests {
assertThat(bucket.getDocCount(), equalTo(expected.get(bucket.getKeyAsNumber().intValue())));
}
}
@Test
public void noShardSizeTermOrder_double() throws Exception {
createIdx("type=double");
indexData();
SearchResponse response = client().prepareSearch("idx").setTypes("type")
.setQuery(matchAllQuery())
.addAggregation(terms("keys").field("key").size(3)
.collectMode(randomFrom(SubAggCollectionMode.values())).order(Terms.Order.term(true)))
.execute().actionGet();
Terms terms = response.getAggregations().get("keys");
Collection<Terms.Bucket> buckets = terms.getBuckets();
assertThat(buckets.size(), equalTo(3));
Map<Integer, Long> expected = ImmutableMap.<Integer, Long>builder()
.put(1, 8l)
.put(2, 5l)
.put(3, 8l)
.build();
for (Terms.Bucket bucket : buckets) {
assertThat(bucket.getDocCount(), equalTo(expected.get(bucket.getKeyAsNumber().intValue())));
}
}
}