From 97207cdcc7c8445bd3875bae0c41e5d2c0c9b082 Mon Sep 17 00:00:00 2001 From: Gian Merlino Date: Mon, 11 Jul 2022 08:20:50 -0700 Subject: [PATCH] Automatic sizing for GroupBy dictionaries. (#12763) * Automatic sizing for GroupBy dictionary sizes. Merging and selector dictionary sizes currently both default to 100MB. This is not optimal, because it can lead to OOM on small servers and insufficient resource utilization on larger servers. It also invites end users to try to tune it when queries run out of dictionary space, which can make things worse if the end user sets it to too high. So, this patch: - Adds automatic tuning for selector and merge dictionaries. Selectors use up to 15% of the heap and merge buffers use up to 30% of the heap (aggregate across all queries). - Updates out-of-memory error messages to emphasize enabling disk spilling vs. increasing memory parameters. With the memory parameters automatically sized, it is more likely that an end user will get benefit from enabling disk spilling. - Removes the query context parameters that allow lowering of configured dictionary sizes. These complicate the calculation, and I don't see a reasonable use case for them. * Adjust tests. * Review adjustments. * Additional comment. * Remove unused import. --- docs/querying/groupbyquery.md | 7 +- .../query/groupby/GroupByQueryConfig.java | 117 ++++++++++++++---- .../GroupByMergingQueryRunnerV2.java | 5 + .../epinephelinae/GroupByQueryEngineV2.java | 21 +++- .../epinephelinae/GroupByRowProcessor.java | 4 +- .../query/groupby/epinephelinae/Groupers.java | 17 ++- .../epinephelinae/RowBasedGrouperHelper.java | 11 +- .../vector/VectorGroupByEngine.java | 12 +- .../groupby/strategy/GroupByStrategyV2.java | 5 + .../query/groupby/GroupByQueryConfigTest.java | 62 ++++++++-- .../query/groupby/GroupByQueryRunnerTest.java | 74 +---------- .../VectorGroupByEngineIteratorTest.java | 2 + .../druid/sql/http/SqlResourceTest.java | 3 +- 13 files changed, 219 insertions(+), 121 deletions(-) diff --git a/docs/querying/groupbyquery.md b/docs/querying/groupbyquery.md index 41a52f53477..48bf3fb1329 100644 --- a/docs/querying/groupbyquery.md +++ b/docs/querying/groupbyquery.md @@ -386,19 +386,16 @@ Supported runtime properties: |Property|Description|Default| |--------|-----------|-------| -|`druid.query.groupBy.maxSelectorDictionarySize`|Maximum amount of heap space (approximately) to use for per-segment string dictionaries. See [Memory tuning and resource limits](#memory-tuning-and-resource-limits) for details.|100000000| -|`druid.query.groupBy.maxMergingDictionarySize`|Maximum amount of heap space (approximately) to use for per-query string dictionaries. When the dictionary exceeds this size, a spill to disk will be triggered. See [Memory tuning and resource limits](#memory-tuning-and-resource-limits) for details.|100000000| +|`druid.query.groupBy.maxSelectorDictionarySize`|Maximum amount of heap space (approximately) to use for per-segment string dictionaries. If set to `0` (automatic), each query's dictionary can use 10% of the Java heap divided by `druid.processing.numMergeBuffers`, or 1GB, whichever is smaller.

See [Memory tuning and resource limits](#memory-tuning-and-resource-limits) for details on changing this property.|0 (automatic)| +|`druid.query.groupBy.maxMergingDictionarySize`|Maximum amount of heap space (approximately) to use for per-query string dictionaries. When the dictionary exceeds this size, a spill to disk will be triggered. If set to `0` (automatic), each query's dictionary uses 30% of the Java heap divided by `druid.processing.numMergeBuffers`, or 1GB, whichever is smaller.

See [Memory tuning and resource limits](#memory-tuning-and-resource-limits) for details on changing this property.|0 (automatic)| |`druid.query.groupBy.maxOnDiskStorage`|Maximum amount of disk space to use, per-query, for spilling result sets to disk when either the merging buffer or the dictionary fills up. Queries that exceed this limit will fail. Set to zero to disable disk spilling.|0 (disabled)| Supported query contexts: |Key|Description| |---|-----------| -|`maxSelectorDictionarySize`|Can be used to lower the value of `druid.query.groupBy.maxMergingDictionarySize` for this query.| -|`maxMergingDictionarySize`|Can be used to lower the value of `druid.query.groupBy.maxMergingDictionarySize` for this query.| |`maxOnDiskStorage`|Can be used to lower the value of `druid.query.groupBy.maxOnDiskStorage` for this query.| - ### Advanced configurations #### Common configurations for all groupBy strategies diff --git a/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryConfig.java b/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryConfig.java index 3518c0e4fb4..be4fb985a45 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryConfig.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryConfig.java @@ -20,14 +20,19 @@ package org.apache.druid.query.groupby; import com.fasterxml.jackson.annotation.JsonProperty; +import org.apache.druid.java.util.common.HumanReadableBytes; +import org.apache.druid.query.DruidProcessingConfig; import org.apache.druid.query.QueryContexts; import org.apache.druid.query.groupby.strategy.GroupByStrategySelector; +import org.apache.druid.utils.JvmUtils; /** * */ public class GroupByQueryConfig { + public static final long AUTOMATIC = 0; + public static final String CTX_KEY_STRATEGY = "groupByStrategy"; public static final String CTX_KEY_FORCE_LIMIT_PUSH_DOWN = "forceLimitPushDown"; public static final String CTX_KEY_APPLY_LIMIT_PUSH_DOWN = "applyLimitPushDown"; @@ -36,20 +41,32 @@ public class GroupByQueryConfig public static final String CTX_KEY_EXECUTING_NESTED_QUERY = "executingNestedQuery"; public static final String CTX_KEY_ARRAY_RESULT_ROWS = "resultAsArray"; public static final String CTX_KEY_ENABLE_MULTI_VALUE_UNNESTING = "groupByEnableMultiValueUnnesting"; + public static final String CTX_KEY_BUFFER_GROUPER_MAX_SIZE = "bufferGrouperMaxSize"; private static final String CTX_KEY_IS_SINGLE_THREADED = "groupByIsSingleThreaded"; private static final String CTX_KEY_MAX_INTERMEDIATE_ROWS = "maxIntermediateRows"; private static final String CTX_KEY_MAX_RESULTS = "maxResults"; private static final String CTX_KEY_BUFFER_GROUPER_INITIAL_BUCKETS = "bufferGrouperInitialBuckets"; private static final String CTX_KEY_BUFFER_GROUPER_MAX_LOAD_FACTOR = "bufferGrouperMaxLoadFactor"; - private static final String CTX_KEY_BUFFER_GROUPER_MAX_SIZE = "bufferGrouperMaxSize"; private static final String CTX_KEY_MAX_ON_DISK_STORAGE = "maxOnDiskStorage"; - private static final String CTX_KEY_MAX_SELECTOR_DICTIONARY_SIZE = "maxSelectorDictionarySize"; - private static final String CTX_KEY_MAX_MERGING_DICTIONARY_SIZE = "maxMergingDictionarySize"; private static final String CTX_KEY_FORCE_HASH_AGGREGATION = "forceHashAggregation"; private static final String CTX_KEY_INTERMEDIATE_COMBINE_DEGREE = "intermediateCombineDegree"; private static final String CTX_KEY_NUM_PARALLEL_COMBINE_THREADS = "numParallelCombineThreads"; private static final String CTX_KEY_MERGE_THREAD_LOCAL = "mergeThreadLocal"; + // Constants for sizing merging and selector dictionaries. Rationale for these constants: + // 1) In no case do we want total aggregate dictionary size to exceed 40% of max memory. + // 2) In no case do we want any dictionary to exceed 1GB of memory: if heaps are giant, better to spill at + // "reasonable" sizes rather than get giant dictionaries. (There is probably some other reason the user + // wanted a giant heap, so we shouldn't monopolize it with dictionaries.) + // 3) Use somewhat more memory for merging dictionary vs. selector dictionaries, because if a merging + // dictionary is full we must spill to disk, whereas if a selector dictionary is full we simply emit + // early to the merge buffer. So, a merging dictionary filling up has a more severe impact on + // query performance. + private static final double MERGING_DICTIONARY_HEAP_FRACTION = 0.3; + private static final double SELECTOR_DICTIONARY_HEAP_FRACTION = 0.1; + private static final long MIN_AUTOMATIC_DICTIONARY_SIZE = 1; + private static final long MAX_AUTOMATIC_DICTIONARY_SIZE = 1_000_000_000; + @JsonProperty private String defaultStrategy = GroupByStrategySelector.STRATEGY_V2; @@ -75,11 +92,11 @@ public class GroupByQueryConfig @JsonProperty // Size of on-heap string dictionary for merging, per-processing-thread; when exceeded, partial results will be // emitted to the merge buffer early. - private long maxSelectorDictionarySize = 100_000_000L; + private HumanReadableBytes maxSelectorDictionarySize = HumanReadableBytes.valueOf(AUTOMATIC); @JsonProperty // Size of on-heap string dictionary for merging, per-query; when exceeded, partial results will be spilled to disk - private long maxMergingDictionarySize = 100_000_000L; + private HumanReadableBytes maxMergingDictionarySize = HumanReadableBytes.valueOf(AUTOMATIC); @JsonProperty // Max on-disk temporary storage, per-query; when exceeded, the query fails @@ -165,14 +182,80 @@ public class GroupByQueryConfig return bufferGrouperInitialBuckets; } - public long getMaxSelectorDictionarySize() + /** + * For unit tests. Production code should use {@link #getActualMaxSelectorDictionarySize}. + */ + long getConfiguredMaxSelectorDictionarySize() { - return maxSelectorDictionarySize; + return maxSelectorDictionarySize.getBytes(); } - public long getMaxMergingDictionarySize() + /** + * For unit tests. Production code should use {@link #getActualMaxSelectorDictionarySize}. + */ + long getActualMaxSelectorDictionarySize(final long maxHeapSize, final int numConcurrentQueries) { - return maxMergingDictionarySize; + if (maxSelectorDictionarySize.getBytes() == AUTOMATIC) { + final long heapForDictionaries = (long) (maxHeapSize * SELECTOR_DICTIONARY_HEAP_FRACTION); + + return Math.max( + MIN_AUTOMATIC_DICTIONARY_SIZE, + Math.min( + MAX_AUTOMATIC_DICTIONARY_SIZE, + heapForDictionaries / numConcurrentQueries + ) + ); + } else { + return maxSelectorDictionarySize.getBytes(); + } + } + + public long getActualMaxSelectorDictionarySize(final DruidProcessingConfig processingConfig) + { + return getActualMaxSelectorDictionarySize( + JvmUtils.getRuntimeInfo().getMaxHeapSizeBytes(), + + // numMergeBuffers is the number of groupBy queries that can run simultaneously + processingConfig.getNumMergeBuffers() + ); + } + + /** + * For unit tests. Production code should use {@link #getActualMaxMergingDictionarySize}. + */ + long getConfiguredMaxMergingDictionarySize() + { + return maxMergingDictionarySize.getBytes(); + } + + /** + * For unit tests. Production code should use {@link #getActualMaxMergingDictionarySize}. + */ + public long getActualMaxMergingDictionarySize(final long maxHeapSize, final int numConcurrentQueries) + { + if (maxMergingDictionarySize.getBytes() == AUTOMATIC) { + final long heapForDictionaries = (long) (maxHeapSize * MERGING_DICTIONARY_HEAP_FRACTION); + + return Math.max( + MIN_AUTOMATIC_DICTIONARY_SIZE, + Math.min( + MAX_AUTOMATIC_DICTIONARY_SIZE, + heapForDictionaries / numConcurrentQueries + ) + ); + } else { + return maxMergingDictionarySize.getBytes(); + } + } + + public long getActualMaxMergingDictionarySize(final DruidProcessingConfig processingConfig) + { + return getActualMaxMergingDictionarySize( + JvmUtils.getRuntimeInfo().getMaxHeapSizeBytes(), + + // numMergeBuffers is the number of groupBy queries that can run simultaneously + processingConfig.getNumMergeBuffers() + ); } public long getMaxOnDiskStorage() @@ -259,20 +342,8 @@ public class GroupByQueryConfig ((Number) query.getContextValue(CTX_KEY_MAX_ON_DISK_STORAGE, getMaxOnDiskStorage())).longValue(), getMaxOnDiskStorage() ); - newConfig.maxSelectorDictionarySize = Math.min( - ((Number) query.getContextValue( - CTX_KEY_MAX_SELECTOR_DICTIONARY_SIZE, - getMaxSelectorDictionarySize() - )).longValue(), - getMaxSelectorDictionarySize() - ); - newConfig.maxMergingDictionarySize = Math.min( - ((Number) query.getContextValue( - CTX_KEY_MAX_MERGING_DICTIONARY_SIZE, - getMaxMergingDictionarySize() - )).longValue(), - getMaxMergingDictionarySize() - ); + newConfig.maxSelectorDictionarySize = maxSelectorDictionarySize; // No overrides + newConfig.maxMergingDictionarySize = maxMergingDictionarySize; // No overrides newConfig.forcePushDownLimit = query.getContextBoolean(CTX_KEY_FORCE_LIMIT_PUSH_DOWN, isForcePushDownLimit()); newConfig.applyLimitPushDownToSegment = query.getContextBoolean( CTX_KEY_APPLY_LIMIT_PUSH_DOWN_TO_SEGMENT, diff --git a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/GroupByMergingQueryRunnerV2.java b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/GroupByMergingQueryRunnerV2.java index da6081c2dbf..16e3dd3ac7c 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/GroupByMergingQueryRunnerV2.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/GroupByMergingQueryRunnerV2.java @@ -44,6 +44,7 @@ import org.apache.druid.java.util.common.io.Closer; import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.query.AbstractPrioritizedQueryRunnerCallable; import org.apache.druid.query.ChainedExecutionQueryRunner; +import org.apache.druid.query.DruidProcessingConfig; import org.apache.druid.query.QueryContexts; import org.apache.druid.query.QueryInterruptedException; import org.apache.druid.query.QueryPlus; @@ -87,6 +88,7 @@ public class GroupByMergingQueryRunnerV2 implements QueryRunner private static final String CTX_KEY_MERGE_RUNNERS_USING_CHAINED_EXECUTION = "mergeRunnersUsingChainedExecution"; private final GroupByQueryConfig config; + private final DruidProcessingConfig processingConfig; private final Iterable> queryables; private final QueryProcessingPool queryProcessingPool; private final QueryWatcher queryWatcher; @@ -98,6 +100,7 @@ public class GroupByMergingQueryRunnerV2 implements QueryRunner public GroupByMergingQueryRunnerV2( GroupByQueryConfig config, + DruidProcessingConfig processingConfig, QueryProcessingPool queryProcessingPool, QueryWatcher queryWatcher, Iterable> queryables, @@ -109,6 +112,7 @@ public class GroupByMergingQueryRunnerV2 implements QueryRunner ) { this.config = config; + this.processingConfig = processingConfig; this.queryProcessingPool = queryProcessingPool; this.queryWatcher = queryWatcher; this.queryables = Iterables.unmodifiableIterable(Iterables.filter(queryables, Predicates.notNull())); @@ -197,6 +201,7 @@ public class GroupByMergingQueryRunnerV2 implements QueryRunner query, null, config, + processingConfig, Suppliers.ofInstance(mergeBufferHolder.get()), combineBufferHolder, concurrencyHint, diff --git a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/GroupByQueryEngineV2.java b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/GroupByQueryEngineV2.java index 55e7533f75f..e2cea67b8da 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/GroupByQueryEngineV2.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/GroupByQueryEngineV2.java @@ -33,6 +33,7 @@ import org.apache.druid.java.util.common.guava.BaseSequence; import org.apache.druid.java.util.common.guava.Sequence; import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.query.ColumnSelectorPlus; +import org.apache.druid.query.DruidProcessingConfig; import org.apache.druid.query.QueryContexts; import org.apache.druid.query.aggregation.AggregatorAdapters; import org.apache.druid.query.aggregation.AggregatorFactory; @@ -121,6 +122,7 @@ public class GroupByQueryEngineV2 @Nullable final StorageAdapter storageAdapter, final NonBlockingPool intermediateResultsBufferPool, final GroupByQueryConfig querySpecificConfig, + final DruidProcessingConfig processingConfig, @Nullable final GroupByQueryMetrics groupByQueryMetrics ) { @@ -164,6 +166,7 @@ public class GroupByQueryEngineV2 filter, interval, querySpecificConfig, + processingConfig, groupByQueryMetrics ); } else { @@ -173,6 +176,7 @@ public class GroupByQueryEngineV2 bufferHolder.get(), fudgeTimestamp, querySpecificConfig, + processingConfig, filter, interval, groupByQueryMetrics @@ -193,6 +197,7 @@ public class GroupByQueryEngineV2 final ByteBuffer processingBuffer, @Nullable final DateTime fudgeTimestamp, final GroupByQueryConfig querySpecificConfig, + final DruidProcessingConfig processingConfig, @Nullable final Filter filter, final Interval interval, @Nullable final GroupByQueryMetrics groupByQueryMetrics @@ -237,6 +242,7 @@ public class GroupByQueryEngineV2 return new ArrayAggregateIterator( query, querySpecificConfig, + processingConfig, cursor, processingBuffer, fudgeTimestamp, @@ -248,6 +254,7 @@ public class GroupByQueryEngineV2 return new HashAggregateIterator( query, querySpecificConfig, + processingConfig, cursor, processingBuffer, fudgeTimestamp, @@ -465,11 +472,12 @@ public class GroupByQueryEngineV2 protected CloseableGrouperIterator delegate = null; protected final boolean allSingleValueDims; protected final boolean allowMultiValueGrouping; - + protected final long maxSelectorFootprint; public GroupByEngineIterator( final GroupByQuery query, final GroupByQueryConfig querySpecificConfig, + final DruidProcessingConfig processingConfig, final Cursor cursor, final ByteBuffer buffer, @Nullable final DateTime fudgeTimestamp, @@ -479,6 +487,7 @@ public class GroupByQueryEngineV2 { this.query = query; this.querySpecificConfig = querySpecificConfig; + this.maxSelectorFootprint = querySpecificConfig.getActualMaxSelectorDictionarySize(processingConfig); this.cursor = cursor; this.buffer = buffer; this.keySerde = new GroupByEngineKeySerde(dims, query); @@ -632,6 +641,7 @@ public class GroupByQueryEngineV2 public HashAggregateIterator( GroupByQuery query, GroupByQueryConfig querySpecificConfig, + DruidProcessingConfig processingConfig, Cursor cursor, ByteBuffer buffer, @Nullable DateTime fudgeTimestamp, @@ -639,7 +649,7 @@ public class GroupByQueryEngineV2 boolean allSingleValueDims ) { - super(query, querySpecificConfig, cursor, buffer, fudgeTimestamp, dims, allSingleValueDims); + super(query, querySpecificConfig, processingConfig, cursor, buffer, fudgeTimestamp, dims, allSingleValueDims); final int dimCount = query.getDimensions().size(); stack = new int[dimCount]; @@ -747,7 +757,7 @@ public class GroupByQueryEngineV2 // Check selectorInternalFootprint after advancing the cursor. (We reset after the first row that causes // us to go past the limit.) - if (selectorInternalFootprint > querySpecificConfig.getMaxSelectorDictionarySize()) { + if (selectorInternalFootprint > maxSelectorFootprint) { return; } } @@ -836,7 +846,7 @@ public class GroupByQueryEngineV2 // Check selectorInternalFootprint after advancing the cursor. (We reset after the first row that causes // us to go past the limit.) - if (selectorInternalFootprint > querySpecificConfig.getMaxSelectorDictionarySize()) { + if (selectorInternalFootprint > maxSelectorFootprint) { return; } } @@ -870,6 +880,7 @@ public class GroupByQueryEngineV2 public ArrayAggregateIterator( GroupByQuery query, GroupByQueryConfig querySpecificConfig, + DruidProcessingConfig processingConfig, Cursor cursor, ByteBuffer buffer, @Nullable DateTime fudgeTimestamp, @@ -878,7 +889,7 @@ public class GroupByQueryEngineV2 int cardinality ) { - super(query, querySpecificConfig, cursor, buffer, fudgeTimestamp, dims, allSingleValueDims); + super(query, querySpecificConfig, processingConfig, cursor, buffer, fudgeTimestamp, dims, allSingleValueDims); this.cardinality = cardinality; if (dims.length == 1) { this.dim = dims[0]; diff --git a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/GroupByRowProcessor.java b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/GroupByRowProcessor.java index 7e753af1331..88c3c965442 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/GroupByRowProcessor.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/GroupByRowProcessor.java @@ -28,6 +28,7 @@ import org.apache.druid.java.util.common.guava.Accumulator; import org.apache.druid.java.util.common.guava.BaseSequence; import org.apache.druid.java.util.common.guava.Sequence; import org.apache.druid.java.util.common.io.Closer; +import org.apache.druid.query.DruidProcessingConfig; import org.apache.druid.query.ResourceLimitExceededException; import org.apache.druid.query.dimension.DimensionSpec; import org.apache.druid.query.groupby.GroupByQuery; @@ -37,7 +38,6 @@ import org.apache.druid.query.groupby.epinephelinae.RowBasedGrouperHelper.RowBas import org.apache.druid.query.groupby.resource.GroupByQueryResource; import javax.annotation.Nullable; - import java.io.Closeable; import java.io.File; import java.io.IOException; @@ -89,6 +89,7 @@ public class GroupByRowProcessor final GroupByQuery subquery, final Sequence rows, final GroupByQueryConfig config, + final DruidProcessingConfig processingConfig, final GroupByQueryResource resource, final ObjectMapper spillMapper, final String processingTmpDir, @@ -114,6 +115,7 @@ public class GroupByRowProcessor query, subquery, querySpecificConfig, + processingConfig, new Supplier() { @Override diff --git a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/Groupers.java b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/Groupers.java index bb351826e54..6506857625a 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/Groupers.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/Groupers.java @@ -31,16 +31,21 @@ public class Groupers private static final AggregateResult DICTIONARY_FULL_ZERO_COUNT = AggregateResult.partial( 0, - "Not enough dictionary space to execute this query. Try increasing " - + "druid.query.groupBy.maxMergingDictionarySize or enable disk spilling by setting " - + "druid.query.groupBy.maxOnDiskStorage to a positive number." + "Not enough dictionary memory to execute this query. Try enabling " + + "disk spilling by setting druid.query.groupBy.maxOnDiskStorage to an amount of bytes " + + "available on your machine for on-disk scratch files. " + + "See https://druid.apache.org/docs/latest/querying/groupbyquery.html#memory-tuning-and-resource-limits " + + "for details." ); private static final AggregateResult HASH_TABLE_FULL_ZERO_COUNT = AggregateResult.partial( 0, - "Not enough aggregation buffer space to execute this query. Try increasing " - + "druid.processing.buffer.sizeBytes or enable disk spilling by setting " - + "druid.query.groupBy.maxOnDiskStorage to a positive number." + "Not enough merge buffer memory to execute this query. Try enabling " + + "disk spilling by setting druid.query.groupBy.maxOnDiskStorage to an amount of bytes " + + "available on your machine for on-disk scratch files. Or, if you have additional off-heap memory " + + "available, consider increasing druid.processing.buffer.sizeBytes. " + + "See https://druid.apache.org/docs/latest/querying/groupbyquery.html#memory-tuning-and-resource-limits " + + "for details." ); private static final int USED_FLAG_MASK = 0x7fffffff; diff --git a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/RowBasedGrouperHelper.java b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/RowBasedGrouperHelper.java index 4bc851b7e17..4689e37ebca 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/RowBasedGrouperHelper.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/RowBasedGrouperHelper.java @@ -41,6 +41,7 @@ import org.apache.druid.java.util.common.guava.Accumulator; import org.apache.druid.java.util.common.guava.Comparators; import org.apache.druid.query.BaseQuery; import org.apache.druid.query.ColumnSelectorPlus; +import org.apache.druid.query.DruidProcessingConfig; import org.apache.druid.query.aggregation.AggregatorFactory; import org.apache.druid.query.aggregation.GroupingAggregatorFactory; import org.apache.druid.query.dimension.ColumnSelectorStrategy; @@ -117,6 +118,7 @@ public class RowBasedGrouperHelper final GroupByQuery query, @Nullable final GroupByQuery subquery, final GroupByQueryConfig config, + final DruidProcessingConfig processingConfig, final Supplier bufferSupplier, final LimitedTemporaryStorage temporaryStorage, final ObjectMapper spillMapper, @@ -127,6 +129,7 @@ public class RowBasedGrouperHelper query, subquery, config, + processingConfig, bufferSupplier, null, SINGLE_THREAD_CONCURRENCY_HINT, @@ -158,6 +161,7 @@ public class RowBasedGrouperHelper * @param subquery optional subquery that we are receiving results from (see combining vs. subquery * mode above) * @param config groupBy query config + * @param processingConfig processing config * @param bufferSupplier supplier of merge buffers * @param combineBufferHolder holder of combine buffers. Unused if concurrencyHint = -1, and may be null in that case * @param concurrencyHint -1 for single-threaded Grouper, >=1 for concurrent Grouper @@ -174,6 +178,7 @@ public class RowBasedGrouperHelper final GroupByQuery query, @Nullable final GroupByQuery subquery, final GroupByQueryConfig config, + final DruidProcessingConfig processingConfig, final Supplier bufferSupplier, @Nullable final ReferenceCountingResourceHolder combineBufferHolder, final int concurrencyHint, @@ -235,11 +240,13 @@ public class RowBasedGrouperHelper aggregatorFactories = query.getAggregatorSpecs().toArray(new AggregatorFactory[0]); } + final long maxMergingDictionarySize = querySpecificConfig.getActualMaxMergingDictionarySize(processingConfig); + final Grouper.KeySerdeFactory keySerdeFactory = new RowBasedKeySerdeFactory( includeTimestamp, query.getContextSortByDimsFirst(), query.getDimensions(), - querySpecificConfig.getMaxMergingDictionarySize() / (concurrencyHint == -1 ? 1 : concurrencyHint), + maxMergingDictionarySize / (concurrencyHint == -1 ? 1 : concurrencyHint), valueTypes, aggregatorFactories, limitSpec @@ -267,7 +274,7 @@ public class RowBasedGrouperHelper includeTimestamp, query.getContextSortByDimsFirst(), query.getDimensions(), - querySpecificConfig.getMaxMergingDictionarySize(), // use entire dictionary space for combining key serde + maxMergingDictionarySize, // use entire dictionary space for combining key serde valueTypes, aggregatorFactories, limitSpec diff --git a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/vector/VectorGroupByEngine.java b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/vector/VectorGroupByEngine.java index 2c7811c8ff1..ceaa75f85d4 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/vector/VectorGroupByEngine.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/vector/VectorGroupByEngine.java @@ -27,6 +27,7 @@ import org.apache.druid.java.util.common.guava.BaseSequence; import org.apache.druid.java.util.common.guava.Sequence; import org.apache.druid.java.util.common.io.Closer; import org.apache.druid.java.util.common.parsers.CloseableIterator; +import org.apache.druid.query.DruidProcessingConfig; import org.apache.druid.query.QueryContexts; import org.apache.druid.query.aggregation.AggregatorAdapters; import org.apache.druid.query.dimension.DimensionSpec; @@ -130,6 +131,7 @@ public class VectorGroupByEngine @Nullable final Filter filter, final Interval interval, final GroupByQueryConfig config, + final DruidProcessingConfig processingConfig, @Nullable final GroupByQueryMetrics groupByQueryMetrics ) { @@ -190,6 +192,7 @@ public class VectorGroupByEngine return new VectorGroupByEngineIterator( query, config, + processingConfig, storageAdapter, cursor, interval, @@ -228,6 +231,7 @@ public class VectorGroupByEngine { private final GroupByQuery query; private final GroupByQueryConfig querySpecificConfig; + private final DruidProcessingConfig processingConfig; private final StorageAdapter storageAdapter; private final VectorCursor cursor; private final List selectors; @@ -258,7 +262,8 @@ public class VectorGroupByEngine VectorGroupByEngineIterator( final GroupByQuery query, - final GroupByQueryConfig config, + final GroupByQueryConfig querySpecificConfig, + final DruidProcessingConfig processingConfig, final StorageAdapter storageAdapter, final VectorCursor cursor, final Interval queryInterval, @@ -268,7 +273,8 @@ public class VectorGroupByEngine ) { this.query = query; - this.querySpecificConfig = config; + this.querySpecificConfig = querySpecificConfig; + this.processingConfig = processingConfig; this.storageAdapter = storageAdapter; this.cursor = cursor; this.selectors = selectors; @@ -433,7 +439,7 @@ public class VectorGroupByEngine // Advance bucketInterval. bucketInterval = bucketIterator.hasNext() ? bucketIterator.next() : null; break; - } else if (selectorInternalFootprint > querySpecificConfig.getMaxSelectorDictionarySize()) { + } else if (selectorInternalFootprint > querySpecificConfig.getActualMaxSelectorDictionarySize(processingConfig)) { break; } } diff --git a/processing/src/main/java/org/apache/druid/query/groupby/strategy/GroupByStrategyV2.java b/processing/src/main/java/org/apache/druid/query/groupby/strategy/GroupByStrategyV2.java index c4ee7613f7d..e6adfa4bee1 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/strategy/GroupByStrategyV2.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/strategy/GroupByStrategyV2.java @@ -441,6 +441,7 @@ public class GroupByStrategyV2 implements GroupByStrategy wasQueryPushedDown ? queryToRun : subquery, subqueryResult, configSupplier.get(), + processingConfig, resource, spillMapper, processingConfig.getTmpDir(), @@ -513,6 +514,7 @@ public class GroupByStrategyV2 implements GroupByStrategy baseSubtotalQuery, queryResult, configSupplier.get(), + processingConfig, resource, spillMapper, processingConfig.getTmpDir(), @@ -575,6 +577,7 @@ public class GroupByStrategyV2 implements GroupByStrategy subtotalQuery, resultSupplierOneFinal.results(subTotalDimensionSpec), configSupplier.get(), + processingConfig, resource, spillMapper, processingConfig.getTmpDir(), @@ -680,6 +683,7 @@ public class GroupByStrategyV2 implements GroupByStrategy { return new GroupByMergingQueryRunnerV2( configSupplier.get(), + processingConfig, queryProcessingPool, queryWatcher, queryRunners, @@ -703,6 +707,7 @@ public class GroupByStrategyV2 implements GroupByStrategy storageAdapter, bufferPool, configSupplier.get().withOverrides(query), + processingConfig, groupByQueryMetrics ); } diff --git a/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryConfigTest.java b/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryConfigTest.java index 0ce29983cc6..869bd75765d 100644 --- a/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryConfigTest.java +++ b/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryConfigTest.java @@ -40,7 +40,7 @@ public class GroupByQueryConfigTest .put("maxResults", "3") .put("maxOnDiskStorage", "4") .put("maxSelectorDictionarySize", "5") - .put("maxMergingDictionarySize", "6") + .put("maxMergingDictionarySize", "6M") .put("bufferGrouperMaxLoadFactor", "7") .build(); @@ -55,8 +55,8 @@ public class GroupByQueryConfigTest Assert.assertEquals(2, config.getMaxIntermediateRows()); Assert.assertEquals(3, config.getMaxResults()); Assert.assertEquals(4, config.getMaxOnDiskStorage()); - Assert.assertEquals(5, config.getMaxSelectorDictionarySize()); - Assert.assertEquals(6, config.getMaxMergingDictionarySize()); + Assert.assertEquals(5, config.getConfiguredMaxSelectorDictionarySize()); + Assert.assertEquals(6_000_000, config.getConfiguredMaxMergingDictionarySize()); Assert.assertEquals(7.0, config.getBufferGrouperMaxLoadFactor(), 0.0); Assert.assertFalse(config.isApplyLimitPushDownToSegment()); } @@ -79,8 +79,8 @@ public class GroupByQueryConfigTest Assert.assertEquals(2, config2.getMaxIntermediateRows()); Assert.assertEquals(3, config2.getMaxResults()); Assert.assertEquals(4, config2.getMaxOnDiskStorage()); - Assert.assertEquals(5, config2.getMaxSelectorDictionarySize()); - Assert.assertEquals(6, config2.getMaxMergingDictionarySize()); + Assert.assertEquals(5, config2.getConfiguredMaxSelectorDictionarySize()); + Assert.assertEquals(6_000_000, config2.getConfiguredMaxMergingDictionarySize()); Assert.assertEquals(7.0, config2.getBufferGrouperMaxLoadFactor(), 0.0); Assert.assertFalse(config2.isApplyLimitPushDownToSegment()); } @@ -113,9 +113,57 @@ public class GroupByQueryConfigTest Assert.assertEquals(2, config2.getMaxIntermediateRows()); Assert.assertEquals(2, config2.getMaxResults()); Assert.assertEquals(0, config2.getMaxOnDiskStorage()); - Assert.assertEquals(3, config2.getMaxSelectorDictionarySize()); - Assert.assertEquals(4, config2.getMaxMergingDictionarySize()); + Assert.assertEquals(5 /* Can't override */, config2.getConfiguredMaxSelectorDictionarySize()); + Assert.assertEquals(6_000_000 /* Can't override */, config2.getConfiguredMaxMergingDictionarySize()); Assert.assertEquals(7.0, config2.getBufferGrouperMaxLoadFactor(), 0.0); Assert.assertTrue(config2.isApplyLimitPushDownToSegment()); } + + @Test + public void testAutomaticMergingDictionarySize() + { + final GroupByQueryConfig config = MAPPER.convertValue( + ImmutableMap.of("maxMergingDictionarySize", "0"), + GroupByQueryConfig.class + ); + + Assert.assertEquals(GroupByQueryConfig.AUTOMATIC, config.getConfiguredMaxMergingDictionarySize()); + Assert.assertEquals(150_000_000, config.getActualMaxMergingDictionarySize(1_000_000_000, 2)); + } + + @Test + public void testNonAutomaticMergingDictionarySize() + { + final GroupByQueryConfig config = MAPPER.convertValue( + ImmutableMap.of("maxMergingDictionarySize", "100"), + GroupByQueryConfig.class + ); + + Assert.assertEquals(100, config.getConfiguredMaxMergingDictionarySize()); + Assert.assertEquals(100, config.getActualMaxMergingDictionarySize(1_000_000_000, 2)); + } + + @Test + public void testAutomaticSelectorDictionarySize() + { + final GroupByQueryConfig config = MAPPER.convertValue( + ImmutableMap.of("maxSelectorDictionarySize", "0"), + GroupByQueryConfig.class + ); + + Assert.assertEquals(GroupByQueryConfig.AUTOMATIC, config.getConfiguredMaxSelectorDictionarySize()); + Assert.assertEquals(50_000_000, config.getActualMaxSelectorDictionarySize(1_000_000_000, 2)); + } + + @Test + public void testNonAutomaticSelectorDictionarySize() + { + final GroupByQueryConfig config = MAPPER.convertValue( + ImmutableMap.of("maxSelectorDictionarySize", "100"), + GroupByQueryConfig.class + ); + + Assert.assertEquals(100, config.getConfiguredMaxSelectorDictionarySize()); + Assert.assertEquals(100, config.getActualMaxSelectorDictionarySize(1_000_000_000, 2)); + } } diff --git a/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerTest.java b/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerTest.java index 4a3a0b5cf4d..71274c9f788 100644 --- a/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerTest.java +++ b/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerTest.java @@ -303,13 +303,13 @@ public class GroupByQueryRunnerTest extends InitializedNullHandlingTest } @Override - public long getMaxSelectorDictionarySize() + public long getConfiguredMaxSelectorDictionarySize() { return 20; } @Override - public long getMaxMergingDictionarySize() + public long getConfiguredMaxMergingDictionarySize() { return 400; } @@ -2956,7 +2956,7 @@ public class GroupByQueryRunnerTest extends InitializedNullHandlingTest List expectedResults = null; if (config.getDefaultStrategy().equals(GroupByStrategySelector.STRATEGY_V2)) { expectedException.expect(ResourceLimitExceededException.class); - expectedException.expectMessage("Not enough aggregation buffer space to execute this query"); + expectedException.expectMessage("Not enough merge buffer memory to execute this query"); } else { expectedResults = Arrays.asList( makeRow(query, "2011-04-01", "alias", "automotive", "rows", 1L, "idx", 135L), @@ -3003,68 +3003,6 @@ public class GroupByQueryRunnerTest extends InitializedNullHandlingTest TestHelper.assertExpectedObjects(expectedResults, results, "overide-maxOnDiskStorage"); } - @Test - public void testNotEnoughDictionarySpaceThroughContextOverride() - { - GroupByQuery query = makeQueryBuilder() - .setDataSource(QueryRunnerTestHelper.DATA_SOURCE) - .setQuerySegmentSpec(QueryRunnerTestHelper.FIRST_TO_THIRD) - .setDimensions(new DefaultDimensionSpec("quality", "alias")) - .setAggregatorSpecs(QueryRunnerTestHelper.ROWS_COUNT, new LongSumAggregatorFactory("idx", "index")) - .setGranularity(QueryRunnerTestHelper.DAY_GRAN) - .overrideContext(ImmutableMap.of("maxOnDiskStorage", 0, "maxMergingDictionarySize", 1)) - .build(); - - List expectedResults = null; - if (config.getDefaultStrategy().equals(GroupByStrategySelector.STRATEGY_V2)) { - expectedException.expect(ResourceLimitExceededException.class); - expectedException.expectMessage("Not enough dictionary space to execute this query"); - } else { - expectedResults = Arrays.asList( - makeRow(query, "2011-04-01", "alias", "automotive", "rows", 1L, "idx", 135L), - makeRow(query, "2011-04-01", "alias", "business", "rows", 1L, "idx", 118L), - makeRow( - query, - "2011-04-01", - "alias", - "entertainment", - "rows", - 1L, - "idx", - 158L - ), - makeRow(query, "2011-04-01", "alias", "health", "rows", 1L, "idx", 120L), - makeRow(query, "2011-04-01", "alias", "mezzanine", "rows", 3L, "idx", 2870L), - makeRow(query, "2011-04-01", "alias", "news", "rows", 1L, "idx", 121L), - makeRow(query, "2011-04-01", "alias", "premium", "rows", 3L, "idx", 2900L), - makeRow(query, "2011-04-01", "alias", "technology", "rows", 1L, "idx", 78L), - makeRow(query, "2011-04-01", "alias", "travel", "rows", 1L, "idx", 119L), - - makeRow(query, "2011-04-02", "alias", "automotive", "rows", 1L, "idx", 147L), - makeRow(query, "2011-04-02", "alias", "business", "rows", 1L, "idx", 112L), - makeRow( - query, - "2011-04-02", - "alias", - "entertainment", - "rows", - 1L, - "idx", - 166L - ), - makeRow(query, "2011-04-02", "alias", "health", "rows", 1L, "idx", 113L), - makeRow(query, "2011-04-02", "alias", "mezzanine", "rows", 3L, "idx", 2447L), - makeRow(query, "2011-04-02", "alias", "news", "rows", 1L, "idx", 114L), - makeRow(query, "2011-04-02", "alias", "premium", "rows", 3L, "idx", 2505L), - makeRow(query, "2011-04-02", "alias", "technology", "rows", 1L, "idx", 97L), - makeRow(query, "2011-04-02", "alias", "travel", "rows", 1L, "idx", 126L) - ); - } - - Iterable results = GroupByQueryRunnerTestHelper.runQuery(factory, runner, query); - TestHelper.assertExpectedObjects(expectedResults, results, "dictionary-space"); - } - @Test public void testNotEnoughDiskSpaceThroughContextOverride() { @@ -3074,7 +3012,7 @@ public class GroupByQueryRunnerTest extends InitializedNullHandlingTest .setDimensions(new DefaultDimensionSpec("quality", "alias")) .setAggregatorSpecs(QueryRunnerTestHelper.ROWS_COUNT, new LongSumAggregatorFactory("idx", "index")) .setGranularity(QueryRunnerTestHelper.DAY_GRAN) - .overrideContext(ImmutableMap.of("maxOnDiskStorage", 1, "maxMergingDictionarySize", 1)) + .overrideContext(ImmutableMap.of("maxOnDiskStorage", 1, GroupByQueryConfig.CTX_KEY_BUFFER_GROUPER_MAX_SIZE, 1)) .build(); List expectedResults = null; @@ -3084,7 +3022,7 @@ public class GroupByQueryRunnerTest extends InitializedNullHandlingTest // The error message always mentions disk if you have spilling enabled (maxOnDiskStorage > 0) expectedException.expectMessage("Not enough disk space to execute this query"); } else { - expectedException.expectMessage("Not enough dictionary space to execute this query"); + expectedException.expectMessage("Not enough merge buffer memory to execute this query"); } } else { expectedResults = Arrays.asList( @@ -3173,7 +3111,7 @@ public class GroupByQueryRunnerTest extends InitializedNullHandlingTest GroupByQueryRunnerTestHelper.runQuery(factory, runner, query); } else { expectedException.expect(ResourceLimitExceededException.class); - expectedException.expectMessage("Not enough aggregation buffer space to execute this query"); + expectedException.expectMessage("Not enough merge buffer memory to execute this query"); GroupByQueryRunnerTestHelper.runQuery(factory, runner, query); } } diff --git a/processing/src/test/java/org/apache/druid/query/groupby/epinephelinae/vector/VectorGroupByEngineIteratorTest.java b/processing/src/test/java/org/apache/druid/query/groupby/epinephelinae/vector/VectorGroupByEngineIteratorTest.java index c4a3e903eaf..1c36cdfc9ff 100644 --- a/processing/src/test/java/org/apache/druid/query/groupby/epinephelinae/vector/VectorGroupByEngineIteratorTest.java +++ b/processing/src/test/java/org/apache/druid/query/groupby/epinephelinae/vector/VectorGroupByEngineIteratorTest.java @@ -27,6 +27,7 @@ import org.apache.druid.query.aggregation.DoubleSumAggregatorFactory; import org.apache.druid.query.dimension.DefaultDimensionSpec; import org.apache.druid.query.groupby.GroupByQuery; import org.apache.druid.query.groupby.GroupByQueryConfig; +import org.apache.druid.query.groupby.GroupByQueryRunnerTest; import org.apache.druid.query.groupby.epinephelinae.VectorGrouper; import org.apache.druid.query.groupby.epinephelinae.vector.VectorGroupByEngine.VectorGroupByEngineIterator; import org.apache.druid.segment.ColumnProcessors; @@ -82,6 +83,7 @@ public class VectorGroupByEngineIteratorTest extends InitializedNullHandlingTest final VectorGroupByEngineIterator iterator = new VectorGroupByEngineIterator( query, new GroupByQueryConfig(), + GroupByQueryRunnerTest.DEFAULT_PROCESSING_CONFIG, storageAdapter, cursor, interval, diff --git a/sql/src/test/java/org/apache/druid/sql/http/SqlResourceTest.java b/sql/src/test/java/org/apache/druid/sql/http/SqlResourceTest.java index 3219d3f87d6..5c05f7ccc8f 100644 --- a/sql/src/test/java/org/apache/druid/sql/http/SqlResourceTest.java +++ b/sql/src/test/java/org/apache/druid/sql/http/SqlResourceTest.java @@ -59,6 +59,7 @@ import org.apache.druid.query.QueryRunnerFactoryConglomerate; import org.apache.druid.query.QueryTimeoutException; import org.apache.druid.query.QueryUnsupportedException; import org.apache.druid.query.ResourceLimitExceededException; +import org.apache.druid.query.groupby.GroupByQueryConfig; import org.apache.druid.server.QueryScheduler; import org.apache.druid.server.QueryStackTests; import org.apache.druid.server.initialization.ServerConfig; @@ -1271,7 +1272,7 @@ public class SqlResourceTest extends CalciteTestBase false, false, false, - ImmutableMap.of("maxMergingDictionarySize", 1, BaseQuery.SQL_QUERY_ID, "id"), + ImmutableMap.of(GroupByQueryConfig.CTX_KEY_BUFFER_GROUPER_MAX_SIZE, 1, BaseQuery.SQL_QUERY_ID, "id"), null ) ).lhs;