From 42ea6449033d82902a86285ae6db84412c8c7f35 Mon Sep 17 00:00:00 2001 From: Luca Cavanna Date: Wed, 2 Jan 2019 17:45:49 +0100 Subject: [PATCH] Remove single shard optimization when suggesting shard_size (#37041) When executing terms aggregations we set the shard_size, meaning the number of buckets to collect on each shard, to a value that's higher than the number of requested buckets, to guarantee some basic level of precision. We have an optimization in place so that we leave shard_size set to size whenever we are searching against a single shard, in which case maximum precision is guaranteed by definition. Such optimization requires us access to the total number of shards that the search is executing against. In the context of cross-cluster search, once we will introduce multiple reduction steps (one per cluster) each cluster will only know the number of local shards, which is problematic as we should only optimize if we are searching against a single shard in a single cluster. It could be that we are searching against one shard per cluster in which case the current code would optimize number of terms causing a loss of precision. While discussing how to address the CCS scenario, we decided that we do not want to introduce further complexity caused by this single shard optimization, as it benefits only a minority of cases, especially when the benefits are not so great. This commit removes the single shard optimization, meaning that we will always have heuristic enabled on how many number of buckets to collect on the shards, even when searching against a single shard. This will cause more buckets to be collected when searching against a single shard compared to before. If that becomes a problem for some users, they can work around that by setting the shard_size equal to the size. Relates to #32125 --- .../significantterms-aggregation.asciidoc | 2 +- .../significanttext-aggregation.asciidoc | 2 +- .../bucket/terms-aggregation.asciidoc | 3 +- .../aggregations/bucket/BucketUtils.java | 8 +---- .../geogrid/GeoGridAggregationBuilder.java | 2 +- .../SignificantTermsAggregatorFactory.java | 3 +- .../SignificantTextAggregatorFactory.java | 3 +- .../bucket/terms/TermsAggregatorFactory.java | 3 +- .../aggregations/bucket/BucketUtilsTests.java | 15 ++-------- .../bucket/TermsShardMinDocCountIT.java | 29 +++++++++---------- 10 files changed, 24 insertions(+), 46 deletions(-) diff --git a/docs/reference/aggregations/bucket/significantterms-aggregation.asciidoc b/docs/reference/aggregations/bucket/significantterms-aggregation.asciidoc index 0a8a46a0b67..bfaeecc1f82 100644 --- a/docs/reference/aggregations/bucket/significantterms-aggregation.asciidoc +++ b/docs/reference/aggregations/bucket/significantterms-aggregation.asciidoc @@ -448,7 +448,7 @@ If the number of unique terms is greater than `size`, the returned list can be s size buckets was not returned). To ensure better accuracy a multiple of the final `size` is used as the number of terms to request from each shard -using a heuristic based on the number of shards. To take manual control of this setting the `shard_size` parameter +(`2 * (size * 1.5 + 10)`). To take manual control of this setting the `shard_size` parameter can be used to control the volumes of candidate terms produced by each shard. Low-frequency terms can turn out to be the most interesting ones once all results are combined so the diff --git a/docs/reference/aggregations/bucket/significanttext-aggregation.asciidoc b/docs/reference/aggregations/bucket/significanttext-aggregation.asciidoc index a541eb0ac14..429c822d362 100644 --- a/docs/reference/aggregations/bucket/significanttext-aggregation.asciidoc +++ b/docs/reference/aggregations/bucket/significanttext-aggregation.asciidoc @@ -364,7 +364,7 @@ If the number of unique terms is greater than `size`, the returned list can be s size buckets was not returned). To ensure better accuracy a multiple of the final `size` is used as the number of terms to request from each shard -using a heuristic based on the number of shards. To take manual control of this setting the `shard_size` parameter +(`2 * (size * 1.5 + 10)`). To take manual control of this setting the `shard_size` parameter can be used to control the volumes of candidate terms produced by each shard. Low-frequency terms can turn out to be the most interesting ones once all results are combined so the diff --git a/docs/reference/aggregations/bucket/terms-aggregation.asciidoc b/docs/reference/aggregations/bucket/terms-aggregation.asciidoc index 1562bf41074..188b2ed3774 100644 --- a/docs/reference/aggregations/bucket/terms-aggregation.asciidoc +++ b/docs/reference/aggregations/bucket/terms-aggregation.asciidoc @@ -220,8 +220,7 @@ NOTE: `shard_size` cannot be smaller than `size` (as it doesn't make much sens override it and reset it to be equal to `size`. -The default `shard_size` will be `size` if the search request needs to go to a single shard, and `(size * 1.5 + 10)` -otherwise. +The default `shard_size` is `(size * 1.5 + 10)`. ==== Calculating Document Count Error diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/BucketUtils.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/BucketUtils.java index 17b50fa9bef..823a2b1e434 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/BucketUtils.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/BucketUtils.java @@ -31,18 +31,12 @@ public final class BucketUtils { * * @param finalSize * The number of terms required in the final reduce phase. - * @param singleShard - * whether a single shard is being queried, or multiple shards * @return A suggested default for the size of any shard-side PriorityQueues */ - public static int suggestShardSideQueueSize(int finalSize, boolean singleShard) { + public static int suggestShardSideQueueSize(int finalSize) { if (finalSize < 1) { throw new IllegalArgumentException("size must be positive, got " + finalSize); } - if (singleShard) { - // In the case of a single shard, we do not need to over-request - return finalSize; - } // Request 50% more buckets on the shards in order to improve accuracy // as well as a small constant that should help with small values of 'size' final long shardSampleSize = (long) (finalSize * 1.5 + 10); diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/geogrid/GeoGridAggregationBuilder.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/geogrid/GeoGridAggregationBuilder.java index 353f391f213..38469ff8753 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/geogrid/GeoGridAggregationBuilder.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/geogrid/GeoGridAggregationBuilder.java @@ -157,7 +157,7 @@ public class GeoGridAggregationBuilder extends ValuesSourceAggregationBuilder BucketUtils.suggestShardSideQueueSize(0, randomBoolean())); + () -> BucketUtils.suggestShardSideQueueSize(0)); assertEquals(e.getMessage(), "size must be positive, got 0"); } - public void testOptimizesSingleShard() { - for (int iter = 0; iter < 10; ++iter) { - final int size = randomIntBetween(1, Integer.MAX_VALUE); - assertEquals(size, BucketUtils.suggestShardSideQueueSize( size, true)); - } - } - public void testOverFlow() { for (int iter = 0; iter < 10; ++iter) { final int size = Integer.MAX_VALUE - randomInt(10); - final int numberOfShards = randomIntBetween(1, 10); - final int shardSize = BucketUtils.suggestShardSideQueueSize( size, numberOfShards == 1); + final int shardSize = BucketUtils.suggestShardSideQueueSize( size); assertThat(shardSize, greaterThanOrEqualTo(shardSize)); } } @@ -50,8 +42,7 @@ public class BucketUtilsTests extends ESTestCase { public void testShardSizeIsGreaterThanGlobalSize() { for (int iter = 0; iter < 10; ++iter) { final int size = randomIntBetween(1, Integer.MAX_VALUE); - final int numberOfShards = randomIntBetween(1, 10); - final int shardSize = BucketUtils.suggestShardSideQueueSize( size, numberOfShards == 1); + final int shardSize = BucketUtils.suggestShardSideQueueSize( size); assertThat(shardSize, greaterThanOrEqualTo(size)); } } diff --git a/server/src/test/java/org/elasticsearch/search/aggregations/bucket/TermsShardMinDocCountIT.java b/server/src/test/java/org/elasticsearch/search/aggregations/bucket/TermsShardMinDocCountIT.java index 0f685ded62c..a3311db1135 100644 --- a/server/src/test/java/org/elasticsearch/search/aggregations/bucket/TermsShardMinDocCountIT.java +++ b/server/src/test/java/org/elasticsearch/search/aggregations/bucket/TermsShardMinDocCountIT.java @@ -23,11 +23,11 @@ import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.index.query.QueryBuilders; +import org.elasticsearch.search.aggregations.BucketOrder; import org.elasticsearch.search.aggregations.bucket.filter.InternalFilter; import org.elasticsearch.search.aggregations.bucket.significant.SignificantTerms; import org.elasticsearch.search.aggregations.bucket.significant.SignificantTermsAggregatorFactory; import org.elasticsearch.search.aggregations.bucket.terms.Terms; -import org.elasticsearch.search.aggregations.BucketOrder; import org.elasticsearch.test.ESIntegTestCase; import java.util.ArrayList; @@ -35,17 +35,18 @@ import java.util.List; import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_REPLICAS; import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_SHARDS; -import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; -import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertSearchResponse; import static org.elasticsearch.search.aggregations.AggregationBuilders.filter; import static org.elasticsearch.search.aggregations.AggregationBuilders.significantTerms; import static org.elasticsearch.search.aggregations.AggregationBuilders.terms; +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertSearchResponse; import static org.hamcrest.Matchers.equalTo; public class TermsShardMinDocCountIT extends ESIntegTestCase { private static final String index = "someindex"; private static final String type = "testtype"; - public String randomExecutionHint() { + + private static String randomExecutionHint() { return randomBoolean() ? null : randomFrom(SignificantTermsAggregatorFactory.ExecutionMode.values()).toString(); } @@ -74,7 +75,7 @@ public class TermsShardMinDocCountIT extends ESIntegTestCase { SearchResponse response = client().prepareSearch(index) .addAggregation( (filter("inclass", QueryBuilders.termQuery("class", true))) - .subAggregation(significantTerms("mySignificantTerms").field("text").minDocCount(2).size(2) + .subAggregation(significantTerms("mySignificantTerms").field("text").minDocCount(2).size(2).shardSize(2) .executionHint(randomExecutionHint())) ) .get(); @@ -87,16 +88,14 @@ public class TermsShardMinDocCountIT extends ESIntegTestCase { response = client().prepareSearch(index) .addAggregation( (filter("inclass", QueryBuilders.termQuery("class", true))) - .subAggregation(significantTerms("mySignificantTerms").field("text").minDocCount(2) - .shardMinDocCount(2).size(2) - .executionHint(randomExecutionHint())) + .subAggregation(significantTerms("mySignificantTerms").field("text").minDocCount(2).shardSize(2) + .shardMinDocCount(2).size(2).executionHint(randomExecutionHint())) ) .get(); assertSearchResponse(response); filteredBucket = response.getAggregations().get("inclass"); sigterms = filteredBucket.getAggregations().get("mySignificantTerms"); assertThat(sigterms.getBuckets().size(), equalTo(2)); - } private void addTermsDocs(String term, int numInClass, int numNotInClass, List builders) { @@ -133,19 +132,18 @@ public class TermsShardMinDocCountIT extends ESIntegTestCase { // first, check that indeed when not setting the shardMinDocCount parameter 0 terms are returned SearchResponse response = client().prepareSearch(index) .addAggregation( - terms("myTerms").field("text").minDocCount(2).size(2).executionHint(randomExecutionHint()) - .order(BucketOrder.key(true)) + terms("myTerms").field("text").minDocCount(2).size(2).shardSize(2).executionHint(randomExecutionHint()) + .order(BucketOrder.key(true)) ) .get(); assertSearchResponse(response); Terms sigterms = response.getAggregations().get("myTerms"); assertThat(sigterms.getBuckets().size(), equalTo(0)); - response = client().prepareSearch(index) .addAggregation( - terms("myTerms").field("text").minDocCount(2).shardMinDocCount(2).size(2).executionHint(randomExecutionHint()) - .order(BucketOrder.key(true)) + terms("myTerms").field("text").minDocCount(2).shardMinDocCount(2).size(2).shardSize(2) + .executionHint(randomExecutionHint()).order(BucketOrder.key(true)) ) .get(); assertSearchResponse(response); @@ -154,11 +152,10 @@ public class TermsShardMinDocCountIT extends ESIntegTestCase { } - private void addTermsDocs(String term, int numDocs, List builders) { + private static void addTermsDocs(String term, int numDocs, List builders) { String sourceClass = "{\"text\": \"" + term + "\"}"; for (int i = 0; i < numDocs; i++) { builders.add(client().prepareIndex(index, type).setSource(sourceClass, XContentType.JSON)); } - } }