Clean up a few of vwh's rough edges (#59341) (#59807)

This cleans up a few rough edged in the `variable_width_histogram`,
mostly found by @wwang500:
1. Setting its tuning parameters in an unexpected order could cause the
   request to fail.
2. We checked that the maximum number of buckets was both less than
   50000 and MAX_BUCKETS. This drops the 50000.
3. Fixes a divide by 0 that can occur of the `shard_size` is 1.
4. Fixes a divide by 0 that can occur if the `shard_size * 3` overflows
   a signed int.
5. Requires `shard_size * 3 / 4` to be at least `buckets`. If it is less
   than `buckets` we will very consistently return fewer buckets than
   requested. For the most part we expect folks to leave it at the
   default. If they change it, we expect it to be much bigger than
   `buckets`.
6. Allocate a smaller `mergeMap` in when initially bucketing requests
   that don't use the entire `shard_size * 3 / 4`. Its just a waste.
7. Default `shard_size` to `10 * buckets` rather than `100`. It *looks*
   like that was our intention the whole time. And it feels like it'd
   keep the algorithm humming along more smoothly.
8. Default the `initial_buffer` to `min(10 * shard_size, 50000)` like
   we've documented it rather than `5000`. Like the point above, this
   feels like the right thing to do to keep the algorithm happy.

Co-authored-by: Elastic Machine <elasticmachine@users.noreply.github.com>

Co-authored-by: Elastic Machine <elasticmachine@users.noreply.github.com>
This commit is contained in:
Nik Everett 2020-07-17 15:16:09 -04:00 committed by GitHub
parent f6b08a3115
commit 514b2f3414
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 124 additions and 29 deletions

View File

@ -78,7 +78,7 @@ after the reduction step. Increasing the `shard_size` will improve the accuracy
also make it more expensive to compute the final result because bigger priority queues will have to be managed on a
shard level, and the data transfers between the nodes and the client will be larger.
TIP: Parameters `buckets`, `shard_size`, and `initial_buffer` are optional. By default, `buckets = 10`, `shard_size = 500` and `initial_buffer = min(50 * shard_size, 50000)`.
TIP: Parameters `buckets`, `shard_size`, and `initial_buffer` are optional. By default, `buckets = 10`, `shard_size = buckets * 50`, and `initial_buffer = min(10 * shard_size, 50000)`.
==== Initial Buffer
The `initial_buffer` parameter can be used to specify the number of individual documents that will be stored in memory

View File

@ -38,6 +38,7 @@ import org.elasticsearch.search.aggregations.support.ValuesSourceRegistry;
import org.elasticsearch.search.aggregations.support.ValuesSourceType;
import java.io.IOException;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
@ -48,7 +49,7 @@ public class VariableWidthHistogramAggregationBuilder
private static final ParseField NUM_BUCKETS_FIELD = new ParseField("buckets");
private static ParseField INITIAL_BUFFER_FIELD = new ParseField("initial_buffer");
private static final ParseField INITIAL_BUFFER_FIELD = new ParseField("initial_buffer");
private static final ParseField SHARD_SIZE_FIELD = new ParseField("shard_size");
@ -62,12 +63,13 @@ public class VariableWidthHistogramAggregationBuilder
}
private int numBuckets = 10;
private int shardSize = numBuckets * 50;
private int initialBuffer = Math.min(10 * this.shardSize, 50000);
private int shardSize = -1;
private int initialBuffer = -1;
public static void registerAggregators(ValuesSourceRegistry.Builder builder) {
VariableWidthHistogramAggregatorFactory.registerAggregators(builder);
}
/** Create a new builder with the given name. */
public VariableWidthHistogramAggregationBuilder(String name) {
super(name);
@ -93,41 +95,48 @@ public class VariableWidthHistogramAggregationBuilder
public VariableWidthHistogramAggregationBuilder setNumBuckets(int numBuckets){
if (numBuckets <= 0) {
throw new IllegalArgumentException(NUM_BUCKETS_FIELD.getPreferredName() + " must be greater than 0 for ["
+ name + "]");
} else if (numBuckets > 50000){
throw new IllegalArgumentException(NUM_BUCKETS_FIELD.getPreferredName() + " must not be greater than 50,000 for ["
throw new IllegalArgumentException(NUM_BUCKETS_FIELD.getPreferredName() + " must be greater than [0] for ["
+ name + "]");
}
this.numBuckets = numBuckets;
return this;
}
public VariableWidthHistogramAggregationBuilder setShardSize(int shardSize){
if (shardSize < numBuckets) {
throw new IllegalArgumentException(SHARD_SIZE_FIELD.getPreferredName() + " must not be less than "
+ NUM_BUCKETS_FIELD.getPreferredName() + " for [" + name + "]");
public VariableWidthHistogramAggregationBuilder setShardSize(int shardSize) {
if (shardSize <= 1) {
// A shard size of 1 will cause divide by 0s and, even if it worked, would produce garbage results.
throw new IllegalArgumentException(SHARD_SIZE_FIELD.getPreferredName() + " must be greater than [1] for [" + name + "]");
}
this.shardSize = shardSize;
return this;
}
public VariableWidthHistogramAggregationBuilder setInitialBuffer(int initialBuffer){
if (initialBuffer < numBuckets) {
// If numBuckets buckets are being returned, then at least that many must be stored in memory
throw new IllegalArgumentException(INITIAL_BUFFER_FIELD.getPreferredName() + " must be greater than numBuckets "
+ NUM_BUCKETS_FIELD.getPreferredName() + " for [" + name + "]");
public VariableWidthHistogramAggregationBuilder setInitialBuffer(int initialBuffer) {
if (initialBuffer <= 0) {
throw new IllegalArgumentException(INITIAL_BUFFER_FIELD.getPreferredName() + " must be greater than [0] for ["
+ name + "]");
}
this.initialBuffer = initialBuffer;
return this;
}
public int getNumBuckets(){ return numBuckets; }
public int getNumBuckets() {
return numBuckets;
}
public int getShardSize(){ return shardSize; }
public int getShardSize() {
if (shardSize == -1) {
return numBuckets * 50;
}
return shardSize;
}
public int getInitialBuffer(){ return initialBuffer; }
public int getInitialBuffer() {
if (initialBuffer == -1) {
return Math.min(10 * getShardSize(), 50000);
}
return initialBuffer;
}
@Override
public BucketCardinality bucketCardinality() {
@ -149,12 +158,32 @@ public class VariableWidthHistogramAggregationBuilder
ValuesSourceConfig config,
AggregatorFactory parent,
AggregatorFactories.Builder subFactoriesBuilder) throws IOException {
Settings settings = queryShardContext.getIndexSettings().getNodeSettings();
int maxBuckets = MultiBucketConsumerService.MAX_BUCKET_SETTING.get(settings);
if (numBuckets > maxBuckets) {
throw new IllegalArgumentException(NUM_BUCKETS_FIELD.getPreferredName()+
" must be less than " + maxBuckets);
throw new IllegalArgumentException(NUM_BUCKETS_FIELD.getPreferredName() + " must be less than " + maxBuckets);
}
int initialBuffer = getInitialBuffer();
int shardSize = getShardSize();
if (initialBuffer < numBuckets) {
// If numBuckets buckets are being returned, then at least that many must be stored in memory
throw new IllegalArgumentException(
String.format(
Locale.ROOT,
"%s must be at least %s but was [%s<%s] for [%s]",
INITIAL_BUFFER_FIELD.getPreferredName(),
NUM_BUCKETS_FIELD.getPreferredName(),
initialBuffer,
numBuckets,
name
)
);
}
int mergePhaseInit = VariableWidthHistogramAggregator.mergePhaseInitialBucketCount(shardSize);
if (mergePhaseInit < numBuckets) {
// If the initial buckets from the merge phase is super low we will consistently return too few buckets
throw new IllegalArgumentException("3/4 of " + SHARD_SIZE_FIELD.getPreferredName() + " must be at least "
+ NUM_BUCKETS_FIELD.getPreferredName() + " but was [" + mergePhaseInit + "<" + numBuckets + "] for [" + name + "]");
}
return new VariableWidthHistogramAggregatorFactory(name, config, numBuckets, shardSize, initialBuffer,
queryShardContext, parent, subFactoriesBuilder, metadata);

View File

@ -170,8 +170,7 @@ public class VariableWidthHistogramAggregator extends DeferableBucketAggregator
MergeBucketsPhase(DoubleArray buffer, int bufferSize) {
// Cluster the documents to reduce the number of buckets
// Target shardSizes * (3/4) buckets so that there's room for more distant buckets to be added during rest of collection
bucketBufferedDocs(buffer, bufferSize, shardSize * 3 / 4);
bucketBufferedDocs(buffer, bufferSize, mergePhaseInitialBucketCount(shardSize));
if(bufferSize > 1) {
updateAvgBucketDistance();
@ -232,7 +231,7 @@ public class VariableWidthHistogramAggregator extends DeferableBucketAggregator
* By just creating a merge map, we eliminate the need to actually sort <code>buffer</code>. We can just
* use the merge map to find any doc's sorted index.
*/
private void bucketBufferedDocs(final DoubleArray buffer, final int bufferSize, final int numBuckets){
private void bucketBufferedDocs(final DoubleArray buffer, final int bufferSize, final int numBuckets) {
// Allocate space for the clusters about to be created
clusterMins = bigArrays.newDoubleArray(1);
clusterMaxes = bigArrays.newDoubleArray(1);
@ -265,7 +264,7 @@ public class VariableWidthHistogramAggregator extends DeferableBucketAggregator
}
}
mergeBuckets(mergeMap, numBuckets);
mergeBuckets(mergeMap, bucketOrd + 1);
if (deferringCollector != null) {
deferringCollector.mergeBuckets(mergeMap);
}
@ -584,5 +583,9 @@ public class VariableWidthHistogramAggregator extends DeferableBucketAggregator
Releasables.close(collector);
}
public static int mergePhaseInitialBucketCount(int shardSize) {
// Target shardSizes * (3/4) buckets so that there's room for more distant buckets to be added during rest of collection
return (int) ((long) shardSize * 3 / 4);
}
}

View File

@ -88,7 +88,7 @@ public class VariableWidthHistogramAggregatorTests extends AggregatorTestCase {
expectedMins.put(10d, 10d);
testBothCases(DEFAULT_QUERY, dataset, true,
aggregation -> aggregation.field(NUMERIC_FIELD).setNumBuckets(4).setShardSize(4),
aggregation -> aggregation.field(NUMERIC_FIELD).setNumBuckets(4),
histogram -> {
final List<InternalVariableWidthHistogram.Bucket> buckets = histogram.getBuckets();
assertEquals(expectedDocCount.size(), buckets.size());
@ -499,6 +499,69 @@ public class VariableWidthHistogramAggregatorTests extends AggregatorTestCase {
assertThat(e.getMessage(), containsString("cannot be nested"));
}
public void testShardSizeTooSmall() throws Exception{
Exception e = expectThrows(IllegalArgumentException.class, () ->
new VariableWidthHistogramAggregationBuilder("test").setShardSize(1));
assertThat(e.getMessage(), equalTo("shard_size must be greater than [1] for [test]"));
}
public void testSmallShardSize() throws Exception {
Exception e = expectThrows(IllegalArgumentException.class, () -> testSearchCase(
DEFAULT_QUERY,
org.elasticsearch.common.collect.List.of(),
true,
aggregation -> aggregation.field(NUMERIC_FIELD).setNumBuckets(2).setShardSize(2),
histogram -> {fail();}
));
assertThat(e.getMessage(), equalTo("3/4 of shard_size must be at least buckets but was [1<2] for [_name]"));
}
public void testHugeShardSize() throws Exception {
final List<Number> dataset = Arrays.asList(1, 2, 3);
testBothCases(DEFAULT_QUERY, dataset, true, aggregation -> aggregation.field(NUMERIC_FIELD).setShardSize(1000000000), histogram -> {
assertThat(
histogram.getBuckets().stream().map(InternalVariableWidthHistogram.Bucket::getKey).collect(toList()),
equalTo(org.elasticsearch.common.collect.List.of(1.0, 2.0, 3.0))
);
});
}
public void testSmallInitialBuffer() throws Exception {
Exception e = expectThrows(IllegalArgumentException.class, () -> testSearchCase(
DEFAULT_QUERY,
org.elasticsearch.common.collect.List.of(),
true,
aggregation -> aggregation.field(NUMERIC_FIELD).setInitialBuffer(1),
histogram -> {fail();}
));
assertThat(e.getMessage(), equalTo("initial_buffer must be at least buckets but was [1<10] for [_name]"));
}
public void testOutOfOrderInitialBuffer() throws Exception {
final List<Number> dataset = Arrays.asList(1, 2, 3);
testBothCases(
DEFAULT_QUERY,
dataset,
true,
aggregation -> aggregation.field(NUMERIC_FIELD).setInitialBuffer(3).setNumBuckets(3),
histogram -> {
assertThat(
histogram.getBuckets().stream().map(InternalVariableWidthHistogram.Bucket::getKey).collect(toList()),
equalTo(org.elasticsearch.common.collect.List.of(1.0, 2.0, 3.0))
);
}
);
}
public void testDefaultShardSizeDependsOnNumBuckets() throws Exception {
assertThat(new VariableWidthHistogramAggregationBuilder("test").setNumBuckets(3).getShardSize(), equalTo(150));
}
public void testDefaultInitialBufferDependsOnNumBuckets() throws Exception {
assertThat(new VariableWidthHistogramAggregationBuilder("test").setShardSize(50).getInitialBuffer(), equalTo(500));
assertThat(new VariableWidthHistogramAggregationBuilder("test").setShardSize(10000).getInitialBuffer(), equalTo(50000));
assertThat(new VariableWidthHistogramAggregationBuilder("test").setNumBuckets(3).getInitialBuffer(), equalTo(1500));
}
private void testSearchCase(final Query query, final List<Number> dataset, boolean multipleSegments,
final Consumer<VariableWidthHistogramAggregationBuilder> configure,