From dc9e9cb4cc87f132a32a00e6589d807350f0b8e0 Mon Sep 17 00:00:00 2001 From: Colin Goodheart-Smithe Date: Tue, 22 Jul 2014 16:01:58 +0100 Subject: [PATCH] Aggregations: change to default shard_size in terms aggregation The default shard size in the terms aggregation now uses BucketUtils.suggestShardSideQueueSize() to set the shard size if the user does not specify it as a parameter. Closes #6857 --- .../bucket/terms/TermsParser.java | 23 +-- .../bucket/ShardSizeTermsTests.java | 154 ++++++++++++++++++ 2 files changed, 167 insertions(+), 10 deletions(-) diff --git a/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/TermsParser.java b/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/TermsParser.java index c4b57064e80..c38f136dd9b 100644 --- a/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/TermsParser.java +++ b/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/TermsParser.java @@ -21,6 +21,7 @@ package org.elasticsearch.search.aggregations.bucket.terms; import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.search.aggregations.Aggregator; import org.elasticsearch.search.aggregations.AggregatorFactory; +import org.elasticsearch.search.aggregations.bucket.BucketUtils; import org.elasticsearch.search.aggregations.bucket.terms.support.IncludeExclude; import org.elasticsearch.search.aggregations.support.ValuesSourceParser; import org.elasticsearch.search.internal.SearchContext; @@ -32,7 +33,6 @@ import java.io.IOException; */ public class TermsParser implements Aggregator.Parser { - @Override public String type() { return StringTerms.TYPE.name(); @@ -41,19 +41,22 @@ public class TermsParser implements Aggregator.Parser { @Override public AggregatorFactory parse(String aggregationName, XContentParser parser, SearchContext context) throws IOException { TermsParametersParser aggParser = new TermsParametersParser(); - ValuesSourceParser vsParser = ValuesSourceParser.any(aggregationName, StringTerms.TYPE, context) - .scriptable(true) - .formattable(true) - .requiresSortedValues(true) - .requiresUniqueValues(true) - .build(); + ValuesSourceParser vsParser = ValuesSourceParser.any(aggregationName, StringTerms.TYPE, context).scriptable(true).formattable(true) + .requiresSortedValues(true).requiresUniqueValues(true).build(); IncludeExclude.Parser incExcParser = new IncludeExclude.Parser(aggregationName, StringTerms.TYPE, context); aggParser.parse(aggregationName, parser, context, vsParser, incExcParser); - TermsAggregator.BucketCountThresholds bucketCountThresholds = aggParser.getBucketCountThresholds(); - bucketCountThresholds.ensureValidity(); InternalOrder order = resolveOrder(aggParser.getOrderKey(), aggParser.isOrderAsc()); - return new TermsAggregatorFactory(aggregationName, vsParser.config(), order, bucketCountThresholds, aggParser.getIncludeExclude(), aggParser.getExecutionHint(), aggParser.getCollectionMode()); + TermsAggregator.BucketCountThresholds bucketCountThresholds = aggParser.getBucketCountThresholds(); + if (!(order == InternalOrder.TERM_ASC || order == InternalOrder.TERM_DESC) + && bucketCountThresholds.getShardSize() == aggParser.getDefaultBucketCountThresholds().getShardSize()) { + // The user has not made a shardSize selection. Use default heuristic to avoid any wrong-ranking caused by distributed counting + bucketCountThresholds.setShardSize(BucketUtils.suggestShardSideQueueSize(bucketCountThresholds.getRequiredSize(), + context.numberOfShards())); + } + bucketCountThresholds.ensureValidity(); + return new TermsAggregatorFactory(aggregationName, vsParser.config(), order, bucketCountThresholds, aggParser.getIncludeExclude(), + aggParser.getExecutionHint(), aggParser.getCollectionMode()); } static InternalOrder resolveOrder(String key, boolean asc) { diff --git a/src/test/java/org/elasticsearch/search/aggregations/bucket/ShardSizeTermsTests.java b/src/test/java/org/elasticsearch/search/aggregations/bucket/ShardSizeTermsTests.java index 7251617f374..4bdaecc646d 100644 --- a/src/test/java/org/elasticsearch/search/aggregations/bucket/ShardSizeTermsTests.java +++ b/src/test/java/org/elasticsearch/search/aggregations/bucket/ShardSizeTermsTests.java @@ -45,6 +45,31 @@ public class ShardSizeTermsTests extends ShardSizeTests { .collectMode(randomFrom(SubAggCollectionMode.values())).order(Terms.Order.count(false))) .execute().actionGet(); + Terms terms = response.getAggregations().get("keys"); + Collection buckets = terms.getBuckets(); + assertThat(buckets.size(), equalTo(3)); + Map expected = ImmutableMap.builder() + .put("1", 8l) + .put("3", 8l) + .put("2", 5l) + .build(); + for (Terms.Bucket bucket : buckets) { + assertThat(bucket.getDocCount(), equalTo(expected.get(bucket.getKeyAsText().string()))); + } + } + + @Test + public void shardSizeEqualsSize_string() throws Exception { + createIdx("type=string,index=not_analyzed"); + + indexData(); + + SearchResponse response = client().prepareSearch("idx").setTypes("type") + .setQuery(matchAllQuery()) + .addAggregation(terms("keys").field("key").size(3).shardSize(3) + .collectMode(randomFrom(SubAggCollectionMode.values())).order(Terms.Order.count(false))) + .execute().actionGet(); + Terms terms = response.getAggregations().get("keys"); Collection buckets = terms.getBuckets(); assertThat(buckets.size(), equalTo(3)); @@ -109,6 +134,31 @@ public class ShardSizeTermsTests extends ShardSizeTests { assertThat(bucket.getDocCount(), equalTo(expected.get(bucket.getKey()))); } } + + @Test + public void noShardSizeTermOrder_string() throws Exception { + createIdx("type=string,index=not_analyzed"); + + indexData(); + + SearchResponse response = client().prepareSearch("idx").setTypes("type") + .setQuery(matchAllQuery()) + .addAggregation(terms("keys").field("key").size(3) + .collectMode(randomFrom(SubAggCollectionMode.values())).order(Terms.Order.term(true))) + .execute().actionGet(); + + Terms terms = response.getAggregations().get("keys"); + Collection buckets = terms.getBuckets(); + assertThat(buckets.size(), equalTo(3)); + Map expected = ImmutableMap.builder() + .put("1", 8l) + .put("2", 5l) + .put("3", 8l) + .build(); + for (Terms.Bucket bucket : buckets) { + assertThat(bucket.getDocCount(), equalTo(expected.get(bucket.getKeyAsText().string()))); + } + } @Test public void noShardSize_long() throws Exception { @@ -123,6 +173,32 @@ public class ShardSizeTermsTests extends ShardSizeTests { .collectMode(randomFrom(SubAggCollectionMode.values())).order(Terms.Order.count(false))) .execute().actionGet(); + Terms terms = response.getAggregations().get("keys"); + Collection buckets = terms.getBuckets(); + assertThat(buckets.size(), equalTo(3)); + Map expected = ImmutableMap.builder() + .put(1, 8l) + .put(3, 8l) + .put(2, 5l) + .build(); + for (Terms.Bucket bucket : buckets) { + assertThat(bucket.getDocCount(), equalTo(expected.get(bucket.getKeyAsNumber().intValue()))); + } + } + + @Test + public void shardSizeEqualsSize_long() throws Exception { + + createIdx("type=long"); + + indexData(); + + SearchResponse response = client().prepareSearch("idx").setTypes("type") + .setQuery(matchAllQuery()) + .addAggregation(terms("keys").field("key").size(3).shardSize(3) + .collectMode(randomFrom(SubAggCollectionMode.values())).order(Terms.Order.count(false))) + .execute().actionGet(); + Terms terms = response.getAggregations().get("keys"); Collection buckets = terms.getBuckets(); assertThat(buckets.size(), equalTo(3)); @@ -188,6 +264,32 @@ public class ShardSizeTermsTests extends ShardSizeTests { } } + @Test + public void noShardSizeTermOrder_long() throws Exception { + + createIdx("type=long"); + + indexData(); + + SearchResponse response = client().prepareSearch("idx").setTypes("type") + .setQuery(matchAllQuery()) + .addAggregation(terms("keys").field("key").size(3) + .collectMode(randomFrom(SubAggCollectionMode.values())).order(Terms.Order.term(true))) + .execute().actionGet(); + + Terms terms = response.getAggregations().get("keys"); + Collection buckets = terms.getBuckets(); + assertThat(buckets.size(), equalTo(3)); + Map expected = ImmutableMap.builder() + .put(1, 8l) + .put(2, 5l) + .put(3, 8l) + .build(); + for (Terms.Bucket bucket : buckets) { + assertThat(bucket.getDocCount(), equalTo(expected.get(bucket.getKeyAsNumber().intValue()))); + } + } + @Test public void noShardSize_double() throws Exception { @@ -201,6 +303,32 @@ public class ShardSizeTermsTests extends ShardSizeTests { .collectMode(randomFrom(SubAggCollectionMode.values())).order(Terms.Order.count(false))) .execute().actionGet(); + Terms terms = response.getAggregations().get("keys"); + Collection buckets = terms.getBuckets(); + assertThat(buckets.size(), equalTo(3)); + Map expected = ImmutableMap.builder() + .put(1, 8l) + .put(3, 8l) + .put(2, 5l) + .build(); + for (Terms.Bucket bucket : buckets) { + assertThat(bucket.getDocCount(), equalTo(expected.get(bucket.getKeyAsNumber().intValue()))); + } + } + + @Test + public void shardSizeEqualsSize_double() throws Exception { + + createIdx("type=double"); + + indexData(); + + SearchResponse response = client().prepareSearch("idx").setTypes("type") + .setQuery(matchAllQuery()) + .addAggregation(terms("keys").field("key").size(3).shardSize(3) + .collectMode(randomFrom(SubAggCollectionMode.values())).order(Terms.Order.count(false))) + .execute().actionGet(); + Terms terms = response.getAggregations().get("keys"); Collection buckets = terms.getBuckets(); assertThat(buckets.size(), equalTo(3)); @@ -265,4 +393,30 @@ public class ShardSizeTermsTests extends ShardSizeTests { assertThat(bucket.getDocCount(), equalTo(expected.get(bucket.getKeyAsNumber().intValue()))); } } + + @Test + public void noShardSizeTermOrder_double() throws Exception { + + createIdx("type=double"); + + indexData(); + + SearchResponse response = client().prepareSearch("idx").setTypes("type") + .setQuery(matchAllQuery()) + .addAggregation(terms("keys").field("key").size(3) + .collectMode(randomFrom(SubAggCollectionMode.values())).order(Terms.Order.term(true))) + .execute().actionGet(); + + Terms terms = response.getAggregations().get("keys"); + Collection buckets = terms.getBuckets(); + assertThat(buckets.size(), equalTo(3)); + Map expected = ImmutableMap.builder() + .put(1, 8l) + .put(2, 5l) + .put(3, 8l) + .build(); + for (Terms.Bucket bucket : buckets) { + assertThat(bucket.getDocCount(), equalTo(expected.get(bucket.getKeyAsNumber().intValue()))); + } + } }