Remove single shard optimization when suggesting shard_size (#37041)

When executing terms aggregations we set the shard_size, meaning the
number of buckets to collect on each shard, to a value that's higher than
the number of requested buckets, to guarantee some basic level of
precision. We have an optimization in place so that we leave shard_size
set to size whenever we are searching against a single shard, in which
case maximum precision is guaranteed by definition.

Such optimization requires us access to the total number of shards that
the search is executing against. In the context of cross-cluster search,
once we will introduce multiple reduction steps (one per cluster) each
cluster will only know the number of local shards, which is problematic
as we should only optimize if we are searching against a single shard in a
single cluster. It could be that we are searching against one shard per cluster
in which case the current code would optimize number of terms causing
a loss of precision.

While discussing how to address the CCS scenario, we decided that we do
not want to introduce further complexity caused by this single shard
optimization, as it benefits only a minority of cases, especially when
the benefits are not so great.

This commit removes the single shard optimization, meaning that we will
always have heuristic enabled on how many number of buckets to collect
on the shards, even when searching against a single shard.

This will cause more buckets to be collected when searching against a single
shard compared to before. If that becomes a problem for some users, they
can work around that by setting the shard_size equal to the size.

Relates to #32125
This commit is contained in:
Luca Cavanna 2019-01-02 17:45:49 +01:00 committed by GitHub
parent e0a677b033
commit 42ea644903
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 24 additions and 46 deletions

View File

@ -448,7 +448,7 @@ If the number of unique terms is greater than `size`, the returned list can be s
size buckets was not returned).
To ensure better accuracy a multiple of the final `size` is used as the number of terms to request from each shard
using a heuristic based on the number of shards. To take manual control of this setting the `shard_size` parameter
(`2 * (size * 1.5 + 10)`). To take manual control of this setting the `shard_size` parameter
can be used to control the volumes of candidate terms produced by each shard.
Low-frequency terms can turn out to be the most interesting ones once all results are combined so the

View File

@ -364,7 +364,7 @@ If the number of unique terms is greater than `size`, the returned list can be s
size buckets was not returned).
To ensure better accuracy a multiple of the final `size` is used as the number of terms to request from each shard
using a heuristic based on the number of shards. To take manual control of this setting the `shard_size` parameter
(`2 * (size * 1.5 + 10)`). To take manual control of this setting the `shard_size` parameter
can be used to control the volumes of candidate terms produced by each shard.
Low-frequency terms can turn out to be the most interesting ones once all results are combined so the

View File

@ -220,8 +220,7 @@ NOTE: `shard_size` cannot be smaller than `size` (as it doesn't make much sens
override it and reset it to be equal to `size`.
The default `shard_size` will be `size` if the search request needs to go to a single shard, and `(size * 1.5 + 10)`
otherwise.
The default `shard_size` is `(size * 1.5 + 10)`.
==== Calculating Document Count Error

View File

@ -31,18 +31,12 @@ public final class BucketUtils {
*
* @param finalSize
* The number of terms required in the final reduce phase.
* @param singleShard
* whether a single shard is being queried, or multiple shards
* @return A suggested default for the size of any shard-side PriorityQueues
*/
public static int suggestShardSideQueueSize(int finalSize, boolean singleShard) {
public static int suggestShardSideQueueSize(int finalSize) {
if (finalSize < 1) {
throw new IllegalArgumentException("size must be positive, got " + finalSize);
}
if (singleShard) {
// In the case of a single shard, we do not need to over-request
return finalSize;
}
// Request 50% more buckets on the shards in order to improve accuracy
// as well as a small constant that should help with small values of 'size'
final long shardSampleSize = (long) (finalSize * 1.5 + 10);

View File

@ -157,7 +157,7 @@ public class GeoGridAggregationBuilder extends ValuesSourceAggregationBuilder<Va
if (shardSize < 0) {
// Use default heuristic to avoid any wrong-ranking caused by
// distributed counting
shardSize = BucketUtils.suggestShardSideQueueSize(requiredSize, context.numberOfShards() == 1);
shardSize = BucketUtils.suggestShardSideQueueSize(requiredSize);
}
if (requiredSize <= 0 || shardSize <= 0) {

View File

@ -195,8 +195,7 @@ public class SignificantTermsAggregatorFactory extends ValuesSourceAggregatorFac
// each shard and as
// such are impossible to differentiate from non-significant terms
// at that early stage.
bucketCountThresholds.setShardSize(2 * BucketUtils.suggestShardSideQueueSize(bucketCountThresholds.getRequiredSize(),
context.numberOfShards() == 1));
bucketCountThresholds.setShardSize(2 * BucketUtils.suggestShardSideQueueSize(bucketCountThresholds.getRequiredSize()));
}
if (valuesSource instanceof ValuesSource.Bytes) {

View File

@ -175,8 +175,7 @@ public class SignificantTextAggregatorFactory extends AggregatorFactory<Signific
// we want to find have only one occurrence on each shard and as
// such are impossible to differentiate from non-significant terms
// at that early stage.
bucketCountThresholds.setShardSize(2 * BucketUtils.suggestShardSideQueueSize(bucketCountThresholds.getRequiredSize(),
context.numberOfShards() == 1));
bucketCountThresholds.setShardSize(2 * BucketUtils.suggestShardSideQueueSize(bucketCountThresholds.getRequiredSize()));
}
// TODO - need to check with mapping that this is indeed a text field....

View File

@ -121,8 +121,7 @@ public class TermsAggregatorFactory extends ValuesSourceAggregatorFactory<Values
// The user has not made a shardSize selection. Use default
// heuristic to avoid any wrong-ranking caused by distributed
// counting
bucketCountThresholds.setShardSize(BucketUtils.suggestShardSideQueueSize(bucketCountThresholds.getRequiredSize(),
context.numberOfShards() == 1));
bucketCountThresholds.setShardSize(BucketUtils.suggestShardSideQueueSize(bucketCountThresholds.getRequiredSize()));
}
bucketCountThresholds.ensureValidity();
if (valuesSource instanceof ValuesSource.Bytes) {

View File

@ -27,22 +27,14 @@ public class BucketUtilsTests extends ESTestCase {
public void testBadInput() {
IllegalArgumentException e = expectThrows(IllegalArgumentException.class,
() -> BucketUtils.suggestShardSideQueueSize(0, randomBoolean()));
() -> BucketUtils.suggestShardSideQueueSize(0));
assertEquals(e.getMessage(), "size must be positive, got 0");
}
public void testOptimizesSingleShard() {
for (int iter = 0; iter < 10; ++iter) {
final int size = randomIntBetween(1, Integer.MAX_VALUE);
assertEquals(size, BucketUtils.suggestShardSideQueueSize( size, true));
}
}
public void testOverFlow() {
for (int iter = 0; iter < 10; ++iter) {
final int size = Integer.MAX_VALUE - randomInt(10);
final int numberOfShards = randomIntBetween(1, 10);
final int shardSize = BucketUtils.suggestShardSideQueueSize( size, numberOfShards == 1);
final int shardSize = BucketUtils.suggestShardSideQueueSize( size);
assertThat(shardSize, greaterThanOrEqualTo(shardSize));
}
}
@ -50,8 +42,7 @@ public class BucketUtilsTests extends ESTestCase {
public void testShardSizeIsGreaterThanGlobalSize() {
for (int iter = 0; iter < 10; ++iter) {
final int size = randomIntBetween(1, Integer.MAX_VALUE);
final int numberOfShards = randomIntBetween(1, 10);
final int shardSize = BucketUtils.suggestShardSideQueueSize( size, numberOfShards == 1);
final int shardSize = BucketUtils.suggestShardSideQueueSize( size);
assertThat(shardSize, greaterThanOrEqualTo(size));
}
}

View File

@ -23,11 +23,11 @@ import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.aggregations.BucketOrder;
import org.elasticsearch.search.aggregations.bucket.filter.InternalFilter;
import org.elasticsearch.search.aggregations.bucket.significant.SignificantTerms;
import org.elasticsearch.search.aggregations.bucket.significant.SignificantTermsAggregatorFactory;
import org.elasticsearch.search.aggregations.bucket.terms.Terms;
import org.elasticsearch.search.aggregations.BucketOrder;
import org.elasticsearch.test.ESIntegTestCase;
import java.util.ArrayList;
@ -35,17 +35,18 @@ import java.util.List;
import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_REPLICAS;
import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_SHARDS;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertSearchResponse;
import static org.elasticsearch.search.aggregations.AggregationBuilders.filter;
import static org.elasticsearch.search.aggregations.AggregationBuilders.significantTerms;
import static org.elasticsearch.search.aggregations.AggregationBuilders.terms;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertSearchResponse;
import static org.hamcrest.Matchers.equalTo;
public class TermsShardMinDocCountIT extends ESIntegTestCase {
private static final String index = "someindex";
private static final String type = "testtype";
public String randomExecutionHint() {
private static String randomExecutionHint() {
return randomBoolean() ? null : randomFrom(SignificantTermsAggregatorFactory.ExecutionMode.values()).toString();
}
@ -74,7 +75,7 @@ public class TermsShardMinDocCountIT extends ESIntegTestCase {
SearchResponse response = client().prepareSearch(index)
.addAggregation(
(filter("inclass", QueryBuilders.termQuery("class", true)))
.subAggregation(significantTerms("mySignificantTerms").field("text").minDocCount(2).size(2)
.subAggregation(significantTerms("mySignificantTerms").field("text").minDocCount(2).size(2).shardSize(2)
.executionHint(randomExecutionHint()))
)
.get();
@ -87,16 +88,14 @@ public class TermsShardMinDocCountIT extends ESIntegTestCase {
response = client().prepareSearch(index)
.addAggregation(
(filter("inclass", QueryBuilders.termQuery("class", true)))
.subAggregation(significantTerms("mySignificantTerms").field("text").minDocCount(2)
.shardMinDocCount(2).size(2)
.executionHint(randomExecutionHint()))
.subAggregation(significantTerms("mySignificantTerms").field("text").minDocCount(2).shardSize(2)
.shardMinDocCount(2).size(2).executionHint(randomExecutionHint()))
)
.get();
assertSearchResponse(response);
filteredBucket = response.getAggregations().get("inclass");
sigterms = filteredBucket.getAggregations().get("mySignificantTerms");
assertThat(sigterms.getBuckets().size(), equalTo(2));
}
private void addTermsDocs(String term, int numInClass, int numNotInClass, List<IndexRequestBuilder> builders) {
@ -133,7 +132,7 @@ public class TermsShardMinDocCountIT extends ESIntegTestCase {
// first, check that indeed when not setting the shardMinDocCount parameter 0 terms are returned
SearchResponse response = client().prepareSearch(index)
.addAggregation(
terms("myTerms").field("text").minDocCount(2).size(2).executionHint(randomExecutionHint())
terms("myTerms").field("text").minDocCount(2).size(2).shardSize(2).executionHint(randomExecutionHint())
.order(BucketOrder.key(true))
)
.get();
@ -141,11 +140,10 @@ public class TermsShardMinDocCountIT extends ESIntegTestCase {
Terms sigterms = response.getAggregations().get("myTerms");
assertThat(sigterms.getBuckets().size(), equalTo(0));
response = client().prepareSearch(index)
.addAggregation(
terms("myTerms").field("text").minDocCount(2).shardMinDocCount(2).size(2).executionHint(randomExecutionHint())
.order(BucketOrder.key(true))
terms("myTerms").field("text").minDocCount(2).shardMinDocCount(2).size(2).shardSize(2)
.executionHint(randomExecutionHint()).order(BucketOrder.key(true))
)
.get();
assertSearchResponse(response);
@ -154,11 +152,10 @@ public class TermsShardMinDocCountIT extends ESIntegTestCase {
}
private void addTermsDocs(String term, int numDocs, List<IndexRequestBuilder> builders) {
private static void addTermsDocs(String term, int numDocs, List<IndexRequestBuilder> builders) {
String sourceClass = "{\"text\": \"" + term + "\"}";
for (int i = 0; i < numDocs; i++) {
builders.add(client().prepareIndex(index, type).setSource(sourceClass, XContentType.JSON));
}
}
}