From 1e3f94237e464aeb5793d3a9c70f353ef67cd557 Mon Sep 17 00:00:00 2001 From: Gian Merlino Date: Wed, 7 Sep 2016 12:14:59 -0700 Subject: [PATCH] groupBy v2: Configurable load factor. (#3437) Also change defaults: - bufferGrouperMaxLoadFactor from 0.75 to 0.7. - maxMergingDictionarySize to 100MB from 25MB, should be more appropriate for most heaps. --- docs/content/querying/groupbyquery.md | 6 +++-- .../query/groupby/GroupByQueryConfig.java | 25 ++++++++++++++----- .../groupby/epinephelinae/BufferGrouper.java | 13 +++++++--- .../epinephelinae/ConcurrentGrouper.java | 2 ++ .../epinephelinae/GroupByQueryEngineV2.java | 1 + .../epinephelinae/RowBasedGrouperHelper.java | 2 ++ .../epinephelinae/SpillingGrouper.java | 2 ++ .../epinephelinae/BufferGrouperTest.java | 4 ++- 8 files changed, 43 insertions(+), 12 deletions(-) diff --git a/docs/content/querying/groupbyquery.md b/docs/content/querying/groupbyquery.md index d1858a0104f..61ff7031b02 100644 --- a/docs/content/querying/groupbyquery.md +++ b/docs/content/querying/groupbyquery.md @@ -172,8 +172,9 @@ When using the "v2" strategy, the following runtime properties apply: |Property|Description|Default| |--------|-----------|-------| |`druid.query.groupBy.defaultStrategy`|Default groupBy query strategy.|v1| -|`druid.query.groupBy.bufferGrouperInitialBuckets`|Initial number of buckets in the off-heap hash table used for grouping results. Set to -1 to use a reasonable default.|-1| -|`druid.query.groupBy.maxMergingDictionarySize`|Maximum amount of heap space (approximately) to use for the string dictionary during merging. When the dictionary exceeds this size, a spill to disk will be triggered.|25000000| +|`druid.query.groupBy.bufferGrouperInitialBuckets`|Initial number of buckets in the off-heap hash table used for grouping results. Set to 0 to use a reasonable default.|0| +|`druid.query.groupBy.bufferGrouperMaxLoadFactor`|Maximum load factor of the off-heap hash table used for grouping results. 
When the load factor exceeds this value, the table will be grown or spilled to disk. Set to 0 to use a reasonable default.|0| +|`druid.query.groupBy.maxMergingDictionarySize`|Maximum amount of heap space (approximately) to use for the string dictionary during merging. When the dictionary exceeds this size, a spill to disk will be triggered.|100000000| |`druid.query.groupBy.maxOnDiskStorage`|Maximum amount of disk space to use, per-query, for spilling result sets to disk when either the merging buffer or the dictionary fills up. Queries that exceed this limit will fail. Set to zero to disable disk spilling.|0 (disabled)| Additionally, the "v2" strategy uses merging buffers for merging. It is currently the only query implementation that @@ -203,4 +204,5 @@ When using the "v2" strategy, the following query context parameters apply: |--------|-----------| |`groupByStrategy`|Overrides the value of `druid.query.groupBy.defaultStrategy` for this query.| |`bufferGrouperInitialBuckets`|Overrides the value of `druid.query.groupBy.bufferGrouperInitialBuckets` for this query.| +|`bufferGrouperMaxLoadFactor`|Overrides the value of `druid.query.groupBy.bufferGrouperMaxLoadFactor` for this query.| |`maxOnDiskStorage`|Can be used to lower the value of `druid.query.groupBy.maxOnDiskStorage` for this query.| diff --git a/processing/src/main/java/io/druid/query/groupby/GroupByQueryConfig.java b/processing/src/main/java/io/druid/query/groupby/GroupByQueryConfig.java index 9fb38558aeb..4b71baa90f5 100644 --- a/processing/src/main/java/io/druid/query/groupby/GroupByQueryConfig.java +++ b/processing/src/main/java/io/druid/query/groupby/GroupByQueryConfig.java @@ -31,6 +31,7 @@ public class GroupByQueryConfig private static final String CTX_KEY_MAX_INTERMEDIATE_ROWS = "maxIntermediateRows"; private static final String CTX_KEY_MAX_RESULTS = "maxResults"; private static final String CTX_KEY_BUFFER_GROUPER_INITIAL_BUCKETS = "bufferGrouperInitialBuckets"; + private static final String 
CTX_KEY_BUFFER_GROUPER_MAX_LOAD_FACTOR = "bufferGrouperMaxLoadFactor"; private static final String CTX_KEY_BUFFER_GROUPER_MAX_SIZE = "bufferGrouperMaxSize"; private static final String CTX_KEY_MAX_ON_DISK_STORAGE = "maxOnDiskStorage"; @@ -51,11 +52,14 @@ public class GroupByQueryConfig private int bufferGrouperMaxSize = Integer.MAX_VALUE; @JsonProperty - private int bufferGrouperInitialBuckets = -1; + private float bufferGrouperMaxLoadFactor = 0; + + @JsonProperty + private int bufferGrouperInitialBuckets = 0; @JsonProperty // Size of on-heap string dictionary for merging, per-query; when exceeded, partial results will be spilled to disk - private long maxMergingDictionarySize = 25_000_000L; + private long maxMergingDictionarySize = 100_000_000L; @JsonProperty // Max on-disk temporary storage, per-query; when exceeded, the query fails @@ -101,6 +105,11 @@ public class GroupByQueryConfig return bufferGrouperMaxSize; } + public float getBufferGrouperMaxLoadFactor() + { + return bufferGrouperMaxLoadFactor; + } + public int getBufferGrouperInitialBuckets() { return bufferGrouperInitialBuckets; @@ -129,14 +138,18 @@ public class GroupByQueryConfig query.getContextValue(CTX_KEY_MAX_RESULTS, getMaxResults()), getMaxResults() ); - newConfig.bufferGrouperInitialBuckets = query.getContextValue( - CTX_KEY_BUFFER_GROUPER_INITIAL_BUCKETS, - getBufferGrouperInitialBuckets() - ); newConfig.bufferGrouperMaxSize = Math.min( query.getContextValue(CTX_KEY_BUFFER_GROUPER_MAX_SIZE, getBufferGrouperMaxSize()), getBufferGrouperMaxSize() ); + newConfig.bufferGrouperMaxLoadFactor = query.getContextValue( + CTX_KEY_BUFFER_GROUPER_MAX_LOAD_FACTOR, + getBufferGrouperMaxLoadFactor() + ); + newConfig.bufferGrouperInitialBuckets = query.getContextValue( + CTX_KEY_BUFFER_GROUPER_INITIAL_BUCKETS, + getBufferGrouperInitialBuckets() + ); newConfig.maxOnDiskStorage = Math.min( ((Number)query.getContextValue(CTX_KEY_MAX_ON_DISK_STORAGE, getMaxOnDiskStorage())).longValue(), getMaxOnDiskStorage() diff 
--git a/processing/src/main/java/io/druid/query/groupby/epinephelinae/BufferGrouper.java b/processing/src/main/java/io/druid/query/groupby/epinephelinae/BufferGrouper.java index 273158e69e3..183ced6d7ef 100644 --- a/processing/src/main/java/io/druid/query/groupby/epinephelinae/BufferGrouper.java +++ b/processing/src/main/java/io/druid/query/groupby/epinephelinae/BufferGrouper.java @@ -62,7 +62,7 @@ public class BufferGrouper> implements Group private static final int MIN_INITIAL_BUCKETS = 4; private static final int DEFAULT_INITIAL_BUCKETS = 1024; - private static final float MAX_LOAD_FACTOR = 0.75f; + private static final float DEFAULT_MAX_LOAD_FACTOR = 0.7f; private static final int HASH_SIZE = Ints.BYTES; private final ByteBuffer buffer; @@ -74,6 +74,7 @@ public class BufferGrouper> implements Group private final int bucketSize; private final int tableArenaSize; private final int bufferGrouperMaxSize; // Integer.MAX_VALUE in production, only used for unit tests + private final float maxLoadFactor; // Buffer pointing to the current table (it moves around as the table grows) private ByteBuffer tableBuffer; @@ -96,6 +97,7 @@ public class BufferGrouper> implements Group final ColumnSelectorFactory columnSelectorFactory, final AggregatorFactory[] aggregatorFactories, final int bufferGrouperMaxSize, + final float maxLoadFactor, final int initialBuckets ) { @@ -105,8 +107,13 @@ public class BufferGrouper> implements Group this.aggregators = new BufferAggregator[aggregatorFactories.length]; this.aggregatorOffsets = new int[aggregatorFactories.length]; this.bufferGrouperMaxSize = bufferGrouperMaxSize; + this.maxLoadFactor = maxLoadFactor > 0 ? maxLoadFactor : DEFAULT_MAX_LOAD_FACTOR; this.initialBuckets = initialBuckets > 0 ? 
Math.max(MIN_INITIAL_BUCKETS, initialBuckets) : DEFAULT_INITIAL_BUCKETS; + if (this.maxLoadFactor >= 1.0f) { + throw new IAE("Invalid maxLoadFactor[%f], must be < 1.0", maxLoadFactor); + } + int offset = HASH_SIZE + keySize; for (int i = 0; i < aggregatorFactories.length; i++) { aggregators[i] = aggregatorFactories[i].factorizeBuffered(columnSelectorFactory); @@ -434,9 +441,9 @@ public class BufferGrouper> implements Group } } - private static int maxSizeForBuckets(int buckets) + private int maxSizeForBuckets(int buckets) { - return Math.max(1, (int) (buckets * MAX_LOAD_FACTOR)); + return Math.max(1, (int) (buckets * maxLoadFactor)); } /** diff --git a/processing/src/main/java/io/druid/query/groupby/epinephelinae/ConcurrentGrouper.java b/processing/src/main/java/io/druid/query/groupby/epinephelinae/ConcurrentGrouper.java index d597ecb51c8..77064593fbe 100644 --- a/processing/src/main/java/io/druid/query/groupby/epinephelinae/ConcurrentGrouper.java +++ b/processing/src/main/java/io/druid/query/groupby/epinephelinae/ConcurrentGrouper.java @@ -46,6 +46,7 @@ public class ConcurrentGrouper> implements G final LimitedTemporaryStorage temporaryStorage, final ObjectMapper spillMapper, final int bufferGrouperMaxSize, + final float bufferGrouperMaxLoadFactor, final int bufferGrouperInitialBuckets, final KeySerdeFactory keySerdeFactory, final ColumnSelectorFactory columnSelectorFactory, @@ -68,6 +69,7 @@ public class ConcurrentGrouper> implements G temporaryStorage, spillMapper, bufferGrouperMaxSize, + bufferGrouperMaxLoadFactor, bufferGrouperInitialBuckets ) ); diff --git a/processing/src/main/java/io/druid/query/groupby/epinephelinae/GroupByQueryEngineV2.java b/processing/src/main/java/io/druid/query/groupby/epinephelinae/GroupByQueryEngineV2.java index 47dd3998596..93ef0b3a65a 100644 --- a/processing/src/main/java/io/druid/query/groupby/epinephelinae/GroupByQueryEngineV2.java +++ b/processing/src/main/java/io/druid/query/groupby/epinephelinae/GroupByQueryEngineV2.java @@ 
-214,6 +214,7 @@ public class GroupByQueryEngineV2 query.getAggregatorSpecs() .toArray(new AggregatorFactory[query.getAggregatorSpecs().size()]), querySpecificConfig.getBufferGrouperMaxSize(), + querySpecificConfig.getBufferGrouperMaxLoadFactor(), querySpecificConfig.getBufferGrouperInitialBuckets() ); diff --git a/processing/src/main/java/io/druid/query/groupby/epinephelinae/RowBasedGrouperHelper.java b/processing/src/main/java/io/druid/query/groupby/epinephelinae/RowBasedGrouperHelper.java index 269b127adae..a02cbe26570 100644 --- a/processing/src/main/java/io/druid/query/groupby/epinephelinae/RowBasedGrouperHelper.java +++ b/processing/src/main/java/io/druid/query/groupby/epinephelinae/RowBasedGrouperHelper.java @@ -97,6 +97,7 @@ public class RowBasedGrouperHelper temporaryStorage, spillMapper, querySpecificConfig.getBufferGrouperMaxSize(), + querySpecificConfig.getBufferGrouperMaxLoadFactor(), querySpecificConfig.getBufferGrouperInitialBuckets() ); } else { @@ -106,6 +107,7 @@ public class RowBasedGrouperHelper temporaryStorage, spillMapper, querySpecificConfig.getBufferGrouperMaxSize(), + querySpecificConfig.getBufferGrouperMaxLoadFactor(), querySpecificConfig.getBufferGrouperInitialBuckets(), keySerdeFactory, columnSelectorFactory, diff --git a/processing/src/main/java/io/druid/query/groupby/epinephelinae/SpillingGrouper.java b/processing/src/main/java/io/druid/query/groupby/epinephelinae/SpillingGrouper.java index 59cf05ccebb..b89bb00ea38 100644 --- a/processing/src/main/java/io/druid/query/groupby/epinephelinae/SpillingGrouper.java +++ b/processing/src/main/java/io/druid/query/groupby/epinephelinae/SpillingGrouper.java @@ -69,6 +69,7 @@ public class SpillingGrouper> implements Gro final LimitedTemporaryStorage temporaryStorage, final ObjectMapper spillMapper, final int bufferGrouperMaxSize, + final float bufferGrouperMaxLoadFactor, final int bufferGrouperInitialBuckets ) { @@ -79,6 +80,7 @@ public class SpillingGrouper> implements Gro columnSelectorFactory, 
aggregatorFactories, bufferGrouperMaxSize, + bufferGrouperMaxLoadFactor, bufferGrouperInitialBuckets ); this.aggregatorFactories = aggregatorFactories; diff --git a/processing/src/test/java/io/druid/query/groupby/epinephelinae/BufferGrouperTest.java b/processing/src/test/java/io/druid/query/groupby/epinephelinae/BufferGrouperTest.java index 827167e7ad0..06d20cedc1f 100644 --- a/processing/src/test/java/io/druid/query/groupby/epinephelinae/BufferGrouperTest.java +++ b/processing/src/test/java/io/druid/query/groupby/epinephelinae/BufferGrouperTest.java @@ -50,7 +50,8 @@ public class BufferGrouperTest new CountAggregatorFactory("count") }, Integer.MAX_VALUE, - -1 + 0, + 0 ); columnSelectorFactory.setRow(new MapBasedRow(0, ImmutableMap.of("value", 10L))); @@ -156,6 +157,7 @@ public class BufferGrouperTest new CountAggregatorFactory("count") }, Integer.MAX_VALUE, + 0.75f, initialBuckets ); }