Update BucketUtils#suggestShardSideQueueSize signature (#33210)
`BucketUtils#suggestShardSideQueueSize` used to calculate the shard_size based on the number of shards. It returns now a different value only based on whether we are querying a single shard or multiple shards. This commit replaces the numberOfShards argument with a boolean that tells whether we are querying a single shard or not.
This commit is contained in:
parent
8c57d4af6a
commit
034fdbca28
|
@ -31,27 +31,21 @@ public final class BucketUtils {
|
||||||
*
|
*
|
||||||
* @param finalSize
|
* @param finalSize
|
||||||
* The number of terms required in the final reduce phase.
|
* The number of terms required in the final reduce phase.
|
||||||
* @param numberOfShards
|
* @param singleShard
|
||||||
* The number of shards being queried.
|
* whether a single shard is being queried, or multiple shards
|
||||||
* @return A suggested default for the size of any shard-side PriorityQueues
|
* @return A suggested default for the size of any shard-side PriorityQueues
|
||||||
*/
|
*/
|
||||||
public static int suggestShardSideQueueSize(int finalSize, int numberOfShards) {
|
public static int suggestShardSideQueueSize(int finalSize, boolean singleShard) {
|
||||||
if (finalSize < 1) {
|
if (finalSize < 1) {
|
||||||
throw new IllegalArgumentException("size must be positive, got " + finalSize);
|
throw new IllegalArgumentException("size must be positive, got " + finalSize);
|
||||||
}
|
}
|
||||||
if (numberOfShards < 1) {
|
if (singleShard) {
|
||||||
throw new IllegalArgumentException("number of shards must be positive, got " + numberOfShards);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (numberOfShards == 1) {
|
|
||||||
// In the case of a single shard, we do not need to over-request
|
// In the case of a single shard, we do not need to over-request
|
||||||
return finalSize;
|
return finalSize;
|
||||||
}
|
}
|
||||||
|
|
||||||
// Request 50% more buckets on the shards in order to improve accuracy
|
// Request 50% more buckets on the shards in order to improve accuracy
|
||||||
// as well as a small constant that should help with small values of 'size'
|
// as well as a small constant that should help with small values of 'size'
|
||||||
final long shardSampleSize = (long) (finalSize * 1.5 + 10);
|
final long shardSampleSize = (long) (finalSize * 1.5 + 10);
|
||||||
return (int) Math.min(Integer.MAX_VALUE, shardSampleSize);
|
return (int) Math.min(Integer.MAX_VALUE, shardSampleSize);
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -157,7 +157,7 @@ public class GeoGridAggregationBuilder extends ValuesSourceAggregationBuilder<Va
|
||||||
if (shardSize < 0) {
|
if (shardSize < 0) {
|
||||||
// Use default heuristic to avoid any wrong-ranking caused by
|
// Use default heuristic to avoid any wrong-ranking caused by
|
||||||
// distributed counting
|
// distributed counting
|
||||||
shardSize = BucketUtils.suggestShardSideQueueSize(requiredSize, context.numberOfShards());
|
shardSize = BucketUtils.suggestShardSideQueueSize(requiredSize, context.numberOfShards() == 1);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (requiredSize <= 0 || shardSize <= 0) {
|
if (requiredSize <= 0 || shardSize <= 0) {
|
||||||
|
|
|
@ -195,7 +195,7 @@ public class SignificantTermsAggregatorFactory extends ValuesSourceAggregatorFac
|
||||||
// such are impossible to differentiate from non-significant terms
|
// such are impossible to differentiate from non-significant terms
|
||||||
// at that early stage.
|
// at that early stage.
|
||||||
bucketCountThresholds.setShardSize(2 * BucketUtils.suggestShardSideQueueSize(bucketCountThresholds.getRequiredSize(),
|
bucketCountThresholds.setShardSize(2 * BucketUtils.suggestShardSideQueueSize(bucketCountThresholds.getRequiredSize(),
|
||||||
context.numberOfShards()));
|
context.numberOfShards() == 1));
|
||||||
}
|
}
|
||||||
|
|
||||||
if (valuesSource instanceof ValuesSource.Bytes) {
|
if (valuesSource instanceof ValuesSource.Bytes) {
|
||||||
|
|
|
@ -176,7 +176,7 @@ public class SignificantTextAggregatorFactory extends AggregatorFactory<Signific
|
||||||
// such are impossible to differentiate from non-significant terms
|
// such are impossible to differentiate from non-significant terms
|
||||||
// at that early stage.
|
// at that early stage.
|
||||||
bucketCountThresholds.setShardSize(2 * BucketUtils.suggestShardSideQueueSize(bucketCountThresholds.getRequiredSize(),
|
bucketCountThresholds.setShardSize(2 * BucketUtils.suggestShardSideQueueSize(bucketCountThresholds.getRequiredSize(),
|
||||||
context.numberOfShards()));
|
context.numberOfShards() == 1));
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO - need to check with mapping that this is indeed a text field....
|
// TODO - need to check with mapping that this is indeed a text field....
|
||||||
|
|
|
@ -122,7 +122,7 @@ public class TermsAggregatorFactory extends ValuesSourceAggregatorFactory<Values
|
||||||
// heuristic to avoid any wrong-ranking caused by distributed
|
// heuristic to avoid any wrong-ranking caused by distributed
|
||||||
// counting
|
// counting
|
||||||
bucketCountThresholds.setShardSize(BucketUtils.suggestShardSideQueueSize(bucketCountThresholds.getRequiredSize(),
|
bucketCountThresholds.setShardSize(BucketUtils.suggestShardSideQueueSize(bucketCountThresholds.getRequiredSize(),
|
||||||
context.numberOfShards()));
|
context.numberOfShards() == 1));
|
||||||
}
|
}
|
||||||
bucketCountThresholds.ensureValidity();
|
bucketCountThresholds.ensureValidity();
|
||||||
if (valuesSource instanceof ValuesSource.Bytes) {
|
if (valuesSource instanceof ValuesSource.Bytes) {
|
||||||
|
|
|
@ -27,18 +27,14 @@ public class BucketUtilsTests extends ESTestCase {
|
||||||
|
|
||||||
public void testBadInput() {
|
public void testBadInput() {
|
||||||
IllegalArgumentException e = expectThrows(IllegalArgumentException.class,
|
IllegalArgumentException e = expectThrows(IllegalArgumentException.class,
|
||||||
() -> BucketUtils.suggestShardSideQueueSize(0, 10));
|
() -> BucketUtils.suggestShardSideQueueSize(0, randomBoolean()));
|
||||||
assertEquals(e.getMessage(), "size must be positive, got 0");
|
assertEquals(e.getMessage(), "size must be positive, got 0");
|
||||||
|
|
||||||
e = expectThrows(IllegalArgumentException.class,
|
|
||||||
() -> BucketUtils.suggestShardSideQueueSize(10, 0));
|
|
||||||
assertEquals(e.getMessage(), "number of shards must be positive, got 0");
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public void testOptimizesSingleShard() {
|
public void testOptimizesSingleShard() {
|
||||||
for (int iter = 0; iter < 10; ++iter) {
|
for (int iter = 0; iter < 10; ++iter) {
|
||||||
final int size = randomIntBetween(1, Integer.MAX_VALUE);
|
final int size = randomIntBetween(1, Integer.MAX_VALUE);
|
||||||
assertEquals(size, BucketUtils.suggestShardSideQueueSize( size, 1));
|
assertEquals(size, BucketUtils.suggestShardSideQueueSize( size, true));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -46,7 +42,7 @@ public class BucketUtilsTests extends ESTestCase {
|
||||||
for (int iter = 0; iter < 10; ++iter) {
|
for (int iter = 0; iter < 10; ++iter) {
|
||||||
final int size = Integer.MAX_VALUE - randomInt(10);
|
final int size = Integer.MAX_VALUE - randomInt(10);
|
||||||
final int numberOfShards = randomIntBetween(1, 10);
|
final int numberOfShards = randomIntBetween(1, 10);
|
||||||
final int shardSize = BucketUtils.suggestShardSideQueueSize( size, numberOfShards);
|
final int shardSize = BucketUtils.suggestShardSideQueueSize( size, numberOfShards == 1);
|
||||||
assertThat(shardSize, greaterThanOrEqualTo(shardSize));
|
assertThat(shardSize, greaterThanOrEqualTo(shardSize));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -55,7 +51,7 @@ public class BucketUtilsTests extends ESTestCase {
|
||||||
for (int iter = 0; iter < 10; ++iter) {
|
for (int iter = 0; iter < 10; ++iter) {
|
||||||
final int size = randomIntBetween(1, Integer.MAX_VALUE);
|
final int size = randomIntBetween(1, Integer.MAX_VALUE);
|
||||||
final int numberOfShards = randomIntBetween(1, 10);
|
final int numberOfShards = randomIntBetween(1, 10);
|
||||||
final int shardSize = BucketUtils.suggestShardSideQueueSize( size, numberOfShards);
|
final int shardSize = BucketUtils.suggestShardSideQueueSize( size, numberOfShards == 1);
|
||||||
assertThat(shardSize, greaterThanOrEqualTo(size));
|
assertThat(shardSize, greaterThanOrEqualTo(size));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue