From 514b2f34149b993d0433db193e9bc2cf4c3911ef Mon Sep 17 00:00:00 2001
From: Nik Everett
Date: Fri, 17 Jul 2020 15:16:09 -0400
Subject: [PATCH] Clean up a few of vwh's rough edges (#59341) (#59807)

This cleans up a few rough edges in the `variable_width_histogram`,
mostly found by @wwang500:
1. Setting its tuning parameters in an unexpected order could cause the
   request to fail.
2. We checked that the maximum number of buckets was both less than
   50000 and MAX_BUCKETS. This drops the 50000.
3. Fixes a divide by 0 that can occur if the `shard_size` is 1.
4. Fixes a divide by 0 that can occur if the `shard_size * 3` overflows
   a signed int.
5. Requires `shard_size * 3 / 4` to be at least `buckets`. If it is
   less than `buckets` we will very consistently return fewer buckets
   than requested. For the most part we expect folks to leave it at the
   default. If they change it, we expect it to be much bigger than
   `buckets`.
6. Allocates a smaller `mergeMap` when initially bucketing requests
   that don't use the entire `shard_size * 3 / 4`. It's just a waste.
7. Defaults `shard_size` to `50 * buckets`, computed when it is read,
   rather than freezing it at construction time. It *looks* like that
   was our intention the whole time. And it feels like it'd keep the
   algorithm humming along more smoothly.
8. Defaults the `initial_buffer` to `min(10 * shard_size, 50000)` like
   we've documented it rather than `5000`. Like the point above, this
   feels like the right thing to do to keep the algorithm happy.

Co-authored-by: Elastic Machine
Co-authored-by: Elastic Machine
---
 ...ariablewidthhistogram-aggregation.asciidoc |  2 +-
 ...iableWidthHistogramAggregationBuilder.java | 75 +++++++++++++------
 .../VariableWidthHistogramAggregator.java     | 11 ++-
 ...VariableWidthHistogramAggregatorTests.java | 65 +++++++++++++++-
 4 files changed, 124 insertions(+), 29 deletions(-)
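A note on point 4: `shard_size * 3` is evaluated in 32-bit `int`
arithmetic, so a large enough `shard_size` wraps negative before the
division. A standalone sketch of the failure mode (the class name and
value are hypothetical, not part of this patch):

    // OverflowSketch.java - why the merge-phase target needs a widening cast.
    public class OverflowSketch {
        public static void main(String[] args) {
            int shardSize = 1_000_000_000;                 // hypothetical value above Integer.MAX_VALUE / 3
            int broken = shardSize * 3 / 4;                // 3_000_000_000 wraps negative: prints -323741824
            int fixed = (int) ((long) shardSize * 3 / 4);  // widen to long first: prints 750000000
            System.out.println(broken + " vs " + fixed);
        }
    }

The widened form is what `mergePhaseInitialBucketCount` below uses.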
diff --git a/docs/reference/aggregations/bucket/variablewidthhistogram-aggregation.asciidoc b/docs/reference/aggregations/bucket/variablewidthhistogram-aggregation.asciidoc
index 1f5435891b9..ad5ce3dd0d9 100644
--- a/docs/reference/aggregations/bucket/variablewidthhistogram-aggregation.asciidoc
+++ b/docs/reference/aggregations/bucket/variablewidthhistogram-aggregation.asciidoc
@@ -78,7 +78,7 @@ after the reduction step. Increasing the `shard_size` will improve the accuracy
 also make it more expensive to compute the final result because bigger priority queues will have to be
 managed on a shard level, and the data transfers between the nodes and the client will be larger.
 
-TIP: Parameters `buckets`, `shard_size`, and `initial_buffer` are optional. By default, `buckets = 10`, `shard_size = 500` and `initial_buffer = min(50 * shard_size, 50000)`.
+TIP: Parameters `buckets`, `shard_size`, and `initial_buffer` are optional. By default, `buckets = 10`, `shard_size = buckets * 50`, and `initial_buffer = min(10 * shard_size, 50000)`.
 
 ==== Initial Buffer
 The `initial_buffer` parameter can be used to specify the number of individual documents that will be stored in memory
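Chaining the documented defaults: with `buckets = 10`, `shard_size`
resolves to `10 * 50 = 500` and `initial_buffer` to
`min(10 * 500, 50000) = 5000`. A minimal sketch of that arithmetic
(hypothetical class, mirroring the formulas in the TIP above):

    // DocumentedDefaults.java - the default chain from the TIP.
    public class DocumentedDefaults {
        public static void main(String[] args) {
            int buckets = 10;                                     // documented default
            int shardSize = buckets * 50;                         // 500
            int initialBuffer = Math.min(10 * shardSize, 50000);  // 5000
            System.out.println(shardSize + " / " + initialBuffer);
        }
    }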
diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/VariableWidthHistogramAggregationBuilder.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/VariableWidthHistogramAggregationBuilder.java
index 5b7b42e6253..8f2424880f4 100644
--- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/VariableWidthHistogramAggregationBuilder.java
+++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/VariableWidthHistogramAggregationBuilder.java
@@ -38,6 +38,7 @@ import org.elasticsearch.search.aggregations.support.ValuesSourceRegistry;
 import org.elasticsearch.search.aggregations.support.ValuesSourceType;
 
 import java.io.IOException;
+import java.util.Locale;
 import java.util.Map;
 import java.util.Objects;
 
@@ -48,7 +49,7 @@ public class VariableWidthHistogramAggregationBuilder
 
     private static final ParseField NUM_BUCKETS_FIELD = new ParseField("buckets");
 
-    private static ParseField INITIAL_BUFFER_FIELD = new ParseField("initial_buffer");
+    private static final ParseField INITIAL_BUFFER_FIELD = new ParseField("initial_buffer");
 
     private static final ParseField SHARD_SIZE_FIELD = new ParseField("shard_size");
 
@@ -62,12 +63,13 @@ public class VariableWidthHistogramAggregationBuilder
     }
 
     private int numBuckets = 10;
-    private int shardSize = numBuckets * 50;
-    private int initialBuffer = Math.min(10 * this.shardSize, 50000);
+    private int shardSize = -1;
+    private int initialBuffer = -1;
 
     public static void registerAggregators(ValuesSourceRegistry.Builder builder) {
         VariableWidthHistogramAggregatorFactory.registerAggregators(builder);
     }
+
     /** Create a new builder with the given name. */
     public VariableWidthHistogramAggregationBuilder(String name) {
         super(name);
@@ -93,41 +95,48 @@ public class VariableWidthHistogramAggregationBuilder
 
     public VariableWidthHistogramAggregationBuilder setNumBuckets(int numBuckets){
         if (numBuckets <= 0) {
-            throw new IllegalArgumentException(NUM_BUCKETS_FIELD.getPreferredName() + " must be greater than 0 for ["
-                + name + "]");
-        } else if (numBuckets > 50000){
-            throw new IllegalArgumentException(NUM_BUCKETS_FIELD.getPreferredName() + " must not be greater than 50,000 for ["
+            throw new IllegalArgumentException(NUM_BUCKETS_FIELD.getPreferredName() + " must be greater than [0] for ["
                 + name + "]");
         }
         this.numBuckets = numBuckets;
         return this;
     }
 
-    public VariableWidthHistogramAggregationBuilder setShardSize(int shardSize){
-        if (shardSize < numBuckets) {
-            throw new IllegalArgumentException(SHARD_SIZE_FIELD.getPreferredName() + " must not be less than "
-                + NUM_BUCKETS_FIELD.getPreferredName() + " for [" + name + "]");
+    public VariableWidthHistogramAggregationBuilder setShardSize(int shardSize) {
+        if (shardSize <= 1) {
+            // A shard size of 1 will cause divide by 0s and, even if it worked, would produce garbage results.
+            throw new IllegalArgumentException(SHARD_SIZE_FIELD.getPreferredName() + " must be greater than [1] for [" + name + "]");
         }
         this.shardSize = shardSize;
         return this;
     }
 
-    public VariableWidthHistogramAggregationBuilder setInitialBuffer(int initialBuffer){
-        if (initialBuffer < numBuckets) {
-            // If numBuckets buckets are being returned, then at least that many must be stored in memory
-            throw new IllegalArgumentException(INITIAL_BUFFER_FIELD.getPreferredName() + " must be greater than numBuckets "
-                + NUM_BUCKETS_FIELD.getPreferredName() + " for [" + name + "]");
-
+    public VariableWidthHistogramAggregationBuilder setInitialBuffer(int initialBuffer) {
+        if (initialBuffer <= 0) {
+            throw new IllegalArgumentException(INITIAL_BUFFER_FIELD.getPreferredName() + " must be greater than [0] for ["
+                + name + "]");
         }
         this.initialBuffer = initialBuffer;
         return this;
     }
 
-    public int getNumBuckets(){ return numBuckets; }
+    public int getNumBuckets() {
+        return numBuckets;
+    }
 
-    public int getShardSize(){ return shardSize; }
+    public int getShardSize() {
+        if (shardSize == -1) {
+            return numBuckets * 50;
+        }
+        return shardSize;
+    }
 
-    public int getInitialBuffer(){ return initialBuffer; }
+    public int getInitialBuffer() {
+        if (initialBuffer == -1) {
+            return Math.min(10 * getShardSize(), 50000);
+        }
+        return initialBuffer;
+    }
 
     @Override
     public BucketCardinality bucketCardinality() {
@@ -149,12 +158,32 @@ public class VariableWidthHistogramAggregationBuilder
                                                        ValuesSourceConfig config,
                                                        AggregatorFactory parent,
                                                        AggregatorFactories.Builder subFactoriesBuilder) throws IOException {
-
         Settings settings = queryShardContext.getIndexSettings().getNodeSettings();
         int maxBuckets = MultiBucketConsumerService.MAX_BUCKET_SETTING.get(settings);
         if (numBuckets > maxBuckets) {
-            throw new IllegalArgumentException(NUM_BUCKETS_FIELD.getPreferredName()+
-                " must be less than " + maxBuckets);
+            throw new IllegalArgumentException(NUM_BUCKETS_FIELD.getPreferredName() + " must be less than " + maxBuckets);
+        }
+        int initialBuffer = getInitialBuffer();
+        int shardSize = getShardSize();
+        if (initialBuffer < numBuckets) {
+            // If numBuckets buckets are being returned, then at least that many must be stored in memory
+            throw new IllegalArgumentException(
+                String.format(
+                    Locale.ROOT,
+                    "%s must be at least %s but was [%s<%s] for [%s]",
+                    INITIAL_BUFFER_FIELD.getPreferredName(),
+                    NUM_BUCKETS_FIELD.getPreferredName(),
+                    initialBuffer,
+                    numBuckets,
+                    name
+                )
+            );
+        }
+        int mergePhaseInit = VariableWidthHistogramAggregator.mergePhaseInitialBucketCount(shardSize);
+        if (mergePhaseInit < numBuckets) {
+            // If the initial buckets from the merge phase is super low we will consistently return too few buckets
+            throw new IllegalArgumentException("3/4 of " + SHARD_SIZE_FIELD.getPreferredName() + " must be at least "
+                + NUM_BUCKETS_FIELD.getPreferredName() + " but was [" + mergePhaseInit + "<" + numBuckets + "] for [" + name + "]");
         }
         return new VariableWidthHistogramAggregatorFactory(name, config, numBuckets, shardSize, initialBuffer,
             queryShardContext, parent, subFactoriesBuilder, metadata);
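The `-1` sentinels above are what make the setters order-independent:
defaults are derived when read instead of being frozen at construction.
A quick driver against the builder (hypothetical class name; requires
the Elasticsearch server artifact on the classpath; the expected values
match the new tests below):

    import org.elasticsearch.search.aggregations.bucket.histogram.VariableWidthHistogramAggregationBuilder;

    public class LazyDefaultsDemo {
        public static void main(String[] args) {
            VariableWidthHistogramAggregationBuilder b = new VariableWidthHistogramAggregationBuilder("demo");
            b.setNumBuckets(3);                       // shrink buckets *after* construction
            System.out.println(b.getShardSize());     // 150 = 3 * 50, now tracks numBuckets
            System.out.println(b.getInitialBuffer()); // 1500 = min(10 * 150, 50000)
        }
    }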
diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/VariableWidthHistogramAggregator.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/VariableWidthHistogramAggregator.java
index 63247d70b65..63e40557663 100644
--- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/VariableWidthHistogramAggregator.java
+++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/VariableWidthHistogramAggregator.java
@@ -170,8 +170,7 @@ public class VariableWidthHistogramAggregator extends DeferableBucketAggregator
 
         MergeBucketsPhase(DoubleArray buffer, int bufferSize) {
             // Cluster the documents to reduce the number of buckets
-            // Target shardSizes * (3/4) buckets so that there's room for more distant buckets to be added during rest of collection
-            bucketBufferedDocs(buffer, bufferSize, shardSize * 3 / 4);
+            bucketBufferedDocs(buffer, bufferSize, mergePhaseInitialBucketCount(shardSize));
 
             if(bufferSize > 1) {
                 updateAvgBucketDistance();
@@ -232,7 +231,7 @@ public class VariableWidthHistogramAggregator extends DeferableBucketAggregator
          * By just creating a merge map, we eliminate the need to actually sort buffer. We can just
          * use the merge map to find any doc's sorted index.
          */
-        private void bucketBufferedDocs(final DoubleArray buffer, final int bufferSize, final int numBuckets){
+        private void bucketBufferedDocs(final DoubleArray buffer, final int bufferSize, final int numBuckets) {
             // Allocate space for the clusters about to be created
             clusterMins = bigArrays.newDoubleArray(1);
             clusterMaxes = bigArrays.newDoubleArray(1);
@@ -265,7 +264,7 @@ public class VariableWidthHistogramAggregator extends DeferableBucketAggregator
                 }
             }
 
-            mergeBuckets(mergeMap, numBuckets);
+            mergeBuckets(mergeMap, bucketOrd + 1);
             if (deferringCollector != null) {
                 deferringCollector.mergeBuckets(mergeMap);
             }
@@ -584,5 +583,9 @@ public class VariableWidthHistogramAggregator extends DeferableBucketAggregator
 
         Releasables.close(collector);
     }
 
+    public static int mergePhaseInitialBucketCount(int shardSize) {
+        // Target shardSizes * (3/4) buckets so that there's room for more distant buckets to be added during rest of collection
+        return (int) ((long) shardSize * 3 / 4);
+    }
 }
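Point 5's requirement falls out of `mergePhaseInitialBucketCount`: the
merge phase targets `shard_size * 3 / 4` initial buckets, so that figure
must cover `buckets`. Tracing the boundary that the new
`testSmallShardSize` test exercises (hypothetical class name):

    // MergePhaseBoundary.java - the [1<2] case rejected when building the factory.
    public class MergePhaseBoundary {
        // Mirrors VariableWidthHistogramAggregator#mergePhaseInitialBucketCount
        static int mergePhaseInitialBucketCount(int shardSize) {
            return (int) ((long) shardSize * 3 / 4);
        }

        public static void main(String[] args) {
            System.out.println(mergePhaseInitialBucketCount(2)); // 1, below buckets=2, so the request is rejected
            System.out.println(mergePhaseInitialBucketCount(4)); // 3, the smallest shard_size that supports buckets=3
        }
    }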
diff --git a/server/src/test/java/org/elasticsearch/search/aggregations/bucket/histogram/VariableWidthHistogramAggregatorTests.java b/server/src/test/java/org/elasticsearch/search/aggregations/bucket/histogram/VariableWidthHistogramAggregatorTests.java
index 0876d25cc9d..968da0dbed3 100644
--- a/server/src/test/java/org/elasticsearch/search/aggregations/bucket/histogram/VariableWidthHistogramAggregatorTests.java
+++ b/server/src/test/java/org/elasticsearch/search/aggregations/bucket/histogram/VariableWidthHistogramAggregatorTests.java
@@ -88,7 +88,7 @@ public class VariableWidthHistogramAggregatorTests extends AggregatorTestCase {
         expectedMins.put(10d, 10d);
 
         testBothCases(DEFAULT_QUERY, dataset, true,
-            aggregation -> aggregation.field(NUMERIC_FIELD).setNumBuckets(4).setShardSize(4),
+            aggregation -> aggregation.field(NUMERIC_FIELD).setNumBuckets(4),
             histogram -> {
                 final List<InternalVariableWidthHistogram.Bucket> buckets = histogram.getBuckets();
                 assertEquals(expectedDocCount.size(), buckets.size());
@@ -499,6 +499,69 @@ public class VariableWidthHistogramAggregatorTests extends AggregatorTestCase {
         assertThat(e.getMessage(), containsString("cannot be nested"));
     }
 
+    public void testShardSizeTooSmall() throws Exception{
+        Exception e = expectThrows(IllegalArgumentException.class, () ->
+            new VariableWidthHistogramAggregationBuilder("test").setShardSize(1));
+        assertThat(e.getMessage(), equalTo("shard_size must be greater than [1] for [test]"));
+    }
+
+    public void testSmallShardSize() throws Exception {
+        Exception e = expectThrows(IllegalArgumentException.class, () -> testSearchCase(
+            DEFAULT_QUERY,
+            org.elasticsearch.common.collect.List.of(),
+            true,
+            aggregation -> aggregation.field(NUMERIC_FIELD).setNumBuckets(2).setShardSize(2),
+            histogram -> {fail();}
+        ));
+        assertThat(e.getMessage(), equalTo("3/4 of shard_size must be at least buckets but was [1<2] for [_name]"));
+    }
+
+    public void testHugeShardSize() throws Exception {
+        final List<Number> dataset = Arrays.asList(1, 2, 3);
+        testBothCases(DEFAULT_QUERY, dataset, true, aggregation -> aggregation.field(NUMERIC_FIELD).setShardSize(1000000000), histogram -> {
+            assertThat(
+                histogram.getBuckets().stream().map(InternalVariableWidthHistogram.Bucket::getKey).collect(toList()),
+                equalTo(org.elasticsearch.common.collect.List.of(1.0, 2.0, 3.0))
+            );
+        });
+    }
+
+    public void testSmallInitialBuffer() throws Exception {
+        Exception e = expectThrows(IllegalArgumentException.class, () -> testSearchCase(
+            DEFAULT_QUERY,
+            org.elasticsearch.common.collect.List.of(),
+            true,
+            aggregation -> aggregation.field(NUMERIC_FIELD).setInitialBuffer(1),
+            histogram -> {fail();}
+        ));
+        assertThat(e.getMessage(), equalTo("initial_buffer must be at least buckets but was [1<10] for [_name]"));
+    }
+
+    public void testOutOfOrderInitialBuffer() throws Exception {
+        final List<Number> dataset = Arrays.asList(1, 2, 3);
+        testBothCases(
+            DEFAULT_QUERY,
+            dataset,
+            true,
+            aggregation -> aggregation.field(NUMERIC_FIELD).setInitialBuffer(3).setNumBuckets(3),
+            histogram -> {
+                assertThat(
+                    histogram.getBuckets().stream().map(InternalVariableWidthHistogram.Bucket::getKey).collect(toList()),
+                    equalTo(org.elasticsearch.common.collect.List.of(1.0, 2.0, 3.0))
+                );
+            }
+        );
+    }
+
+    public void testDefaultShardSizeDependsOnNumBuckets() throws Exception {
+        assertThat(new VariableWidthHistogramAggregationBuilder("test").setNumBuckets(3).getShardSize(), equalTo(150));
+    }
+
+    public void testDefaultInitialBufferDependsOnNumBuckets() throws Exception {
+        assertThat(new VariableWidthHistogramAggregationBuilder("test").setShardSize(50).getInitialBuffer(), equalTo(500));
+        assertThat(new VariableWidthHistogramAggregationBuilder("test").setShardSize(10000).getInitialBuffer(), equalTo(50000));
+        assertThat(new VariableWidthHistogramAggregationBuilder("test").setNumBuckets(3).getInitialBuffer(), equalTo(1500));
+    }
+
     private void testSearchCase(final Query query, final List<Number> dataset, boolean multipleSegments,
                                 final Consumer<VariableWidthHistogramAggregationBuilder> configure,