diff --git a/docs/querying/groupbyquery.md b/docs/querying/groupbyquery.md index 41a52f53477..48bf3fb1329 100644 --- a/docs/querying/groupbyquery.md +++ b/docs/querying/groupbyquery.md @@ -386,19 +386,16 @@ Supported runtime properties: |Property|Description|Default| |--------|-----------|-------| -|`druid.query.groupBy.maxSelectorDictionarySize`|Maximum amount of heap space (approximately) to use for per-segment string dictionaries. See [Memory tuning and resource limits](#memory-tuning-and-resource-limits) for details.|100000000| -|`druid.query.groupBy.maxMergingDictionarySize`|Maximum amount of heap space (approximately) to use for per-query string dictionaries. When the dictionary exceeds this size, a spill to disk will be triggered. See [Memory tuning and resource limits](#memory-tuning-and-resource-limits) for details.|100000000| +|`druid.query.groupBy.maxSelectorDictionarySize`|Maximum amount of heap space (approximately) to use for per-segment string dictionaries. If set to `0` (automatic), each query's dictionary can use 10% of the Java heap divided by `druid.processing.numMergeBuffers`, or 1GB, whichever is smaller.

See [Memory tuning and resource limits](#memory-tuning-and-resource-limits) for details on changing this property.|0 (automatic)| +|`druid.query.groupBy.maxMergingDictionarySize`|Maximum amount of heap space (approximately) to use for per-query string dictionaries. When the dictionary exceeds this size, a spill to disk will be triggered. If set to `0` (automatic), each query's dictionary uses 30% of the Java heap divided by `druid.processing.numMergeBuffers`, or 1GB, whichever is smaller.

See [Memory tuning and resource limits](#memory-tuning-and-resource-limits) for details on changing this property.|0 (automatic)| |`druid.query.groupBy.maxOnDiskStorage`|Maximum amount of disk space to use, per-query, for spilling result sets to disk when either the merging buffer or the dictionary fills up. Queries that exceed this limit will fail. Set to zero to disable disk spilling.|0 (disabled)| Supported query contexts: |Key|Description| |---|-----------| -|`maxSelectorDictionarySize`|Can be used to lower the value of `druid.query.groupBy.maxMergingDictionarySize` for this query.| -|`maxMergingDictionarySize`|Can be used to lower the value of `druid.query.groupBy.maxMergingDictionarySize` for this query.| |`maxOnDiskStorage`|Can be used to lower the value of `druid.query.groupBy.maxOnDiskStorage` for this query.| - ### Advanced configurations #### Common configurations for all groupBy strategies diff --git a/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryConfig.java b/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryConfig.java index 3518c0e4fb4..be4fb985a45 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryConfig.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryConfig.java @@ -20,14 +20,19 @@ package org.apache.druid.query.groupby; import com.fasterxml.jackson.annotation.JsonProperty; +import org.apache.druid.java.util.common.HumanReadableBytes; +import org.apache.druid.query.DruidProcessingConfig; import org.apache.druid.query.QueryContexts; import org.apache.druid.query.groupby.strategy.GroupByStrategySelector; +import org.apache.druid.utils.JvmUtils; /** * */ public class GroupByQueryConfig { + public static final long AUTOMATIC = 0; + public static final String CTX_KEY_STRATEGY = "groupByStrategy"; public static final String CTX_KEY_FORCE_LIMIT_PUSH_DOWN = "forceLimitPushDown"; public static final String CTX_KEY_APPLY_LIMIT_PUSH_DOWN = 
"applyLimitPushDown"; @@ -36,20 +41,32 @@ public class GroupByQueryConfig public static final String CTX_KEY_EXECUTING_NESTED_QUERY = "executingNestedQuery"; public static final String CTX_KEY_ARRAY_RESULT_ROWS = "resultAsArray"; public static final String CTX_KEY_ENABLE_MULTI_VALUE_UNNESTING = "groupByEnableMultiValueUnnesting"; + public static final String CTX_KEY_BUFFER_GROUPER_MAX_SIZE = "bufferGrouperMaxSize"; private static final String CTX_KEY_IS_SINGLE_THREADED = "groupByIsSingleThreaded"; private static final String CTX_KEY_MAX_INTERMEDIATE_ROWS = "maxIntermediateRows"; private static final String CTX_KEY_MAX_RESULTS = "maxResults"; private static final String CTX_KEY_BUFFER_GROUPER_INITIAL_BUCKETS = "bufferGrouperInitialBuckets"; private static final String CTX_KEY_BUFFER_GROUPER_MAX_LOAD_FACTOR = "bufferGrouperMaxLoadFactor"; - private static final String CTX_KEY_BUFFER_GROUPER_MAX_SIZE = "bufferGrouperMaxSize"; private static final String CTX_KEY_MAX_ON_DISK_STORAGE = "maxOnDiskStorage"; - private static final String CTX_KEY_MAX_SELECTOR_DICTIONARY_SIZE = "maxSelectorDictionarySize"; - private static final String CTX_KEY_MAX_MERGING_DICTIONARY_SIZE = "maxMergingDictionarySize"; private static final String CTX_KEY_FORCE_HASH_AGGREGATION = "forceHashAggregation"; private static final String CTX_KEY_INTERMEDIATE_COMBINE_DEGREE = "intermediateCombineDegree"; private static final String CTX_KEY_NUM_PARALLEL_COMBINE_THREADS = "numParallelCombineThreads"; private static final String CTX_KEY_MERGE_THREAD_LOCAL = "mergeThreadLocal"; + // Constants for sizing merging and selector dictionaries. Rationale for these constants: + // 1) In no case do we want total aggregate dictionary size to exceed 40% of max memory. + // 2) In no case do we want any dictionary to exceed 1GB of memory: if heaps are giant, better to spill at + // "reasonable" sizes rather than get giant dictionaries. 
(There is probably some other reason the user + // wanted a giant heap, so we shouldn't monopolize it with dictionaries.) + // 3) Use somewhat more memory for merging dictionary vs. selector dictionaries, because if a merging + // dictionary is full we must spill to disk, whereas if a selector dictionary is full we simply emit + // early to the merge buffer. So, a merging dictionary filling up has a more severe impact on + // query performance. + private static final double MERGING_DICTIONARY_HEAP_FRACTION = 0.3; + private static final double SELECTOR_DICTIONARY_HEAP_FRACTION = 0.1; + private static final long MIN_AUTOMATIC_DICTIONARY_SIZE = 1; + private static final long MAX_AUTOMATIC_DICTIONARY_SIZE = 1_000_000_000; + @JsonProperty private String defaultStrategy = GroupByStrategySelector.STRATEGY_V2; @@ -75,11 +92,11 @@ public class GroupByQueryConfig @JsonProperty // Size of on-heap string dictionary for merging, per-processing-thread; when exceeded, partial results will be // emitted to the merge buffer early. - private long maxSelectorDictionarySize = 100_000_000L; + private HumanReadableBytes maxSelectorDictionarySize = HumanReadableBytes.valueOf(AUTOMATIC); @JsonProperty // Size of on-heap string dictionary for merging, per-query; when exceeded, partial results will be spilled to disk - private long maxMergingDictionarySize = 100_000_000L; + private HumanReadableBytes maxMergingDictionarySize = HumanReadableBytes.valueOf(AUTOMATIC); @JsonProperty // Max on-disk temporary storage, per-query; when exceeded, the query fails @@ -165,14 +182,80 @@ public class GroupByQueryConfig return bufferGrouperInitialBuckets; } - public long getMaxSelectorDictionarySize() + /** + * For unit tests. Production code should use {@link #getActualMaxSelectorDictionarySize(DruidProcessingConfig)}. + */ + long getConfiguredMaxSelectorDictionarySize() { - return maxSelectorDictionarySize; + return maxSelectorDictionarySize.getBytes(); } - public long getMaxMergingDictionarySize() + /** + * For unit tests. 
Production code should use {@link #getActualMaxSelectorDictionarySize(DruidProcessingConfig)}. + */ + long getActualMaxSelectorDictionarySize(final long maxHeapSize, final int numConcurrentQueries) { - return maxMergingDictionarySize; + if (maxSelectorDictionarySize.getBytes() == AUTOMATIC) { + final long heapForDictionaries = (long) (maxHeapSize * SELECTOR_DICTIONARY_HEAP_FRACTION); + + return Math.max( + MIN_AUTOMATIC_DICTIONARY_SIZE, + Math.min( + MAX_AUTOMATIC_DICTIONARY_SIZE, + heapForDictionaries / numConcurrentQueries + ) + ); + } else { + return maxSelectorDictionarySize.getBytes(); + } + } + + public long getActualMaxSelectorDictionarySize(final DruidProcessingConfig processingConfig) + { + return getActualMaxSelectorDictionarySize( + JvmUtils.getRuntimeInfo().getMaxHeapSizeBytes(), + + // numMergeBuffers is the number of groupBy queries that can run simultaneously + processingConfig.getNumMergeBuffers() + ); + } + + /** + * For unit tests. Production code should use {@link #getActualMaxMergingDictionarySize(DruidProcessingConfig)}. + */ + long getConfiguredMaxMergingDictionarySize() + { + return maxMergingDictionarySize.getBytes(); + } + + /** + * For unit tests. Production code should use {@link #getActualMaxMergingDictionarySize(DruidProcessingConfig)}. 
+ */ + public long getActualMaxMergingDictionarySize(final long maxHeapSize, final int numConcurrentQueries) + { + if (maxMergingDictionarySize.getBytes() == AUTOMATIC) { + final long heapForDictionaries = (long) (maxHeapSize * MERGING_DICTIONARY_HEAP_FRACTION); + + return Math.max( + MIN_AUTOMATIC_DICTIONARY_SIZE, + Math.min( + MAX_AUTOMATIC_DICTIONARY_SIZE, + heapForDictionaries / numConcurrentQueries + ) + ); + } else { + return maxMergingDictionarySize.getBytes(); + } + } + + public long getActualMaxMergingDictionarySize(final DruidProcessingConfig processingConfig) + { + return getActualMaxMergingDictionarySize( + JvmUtils.getRuntimeInfo().getMaxHeapSizeBytes(), + + // numMergeBuffers is the number of groupBy queries that can run simultaneously + processingConfig.getNumMergeBuffers() + ); } public long getMaxOnDiskStorage() @@ -259,20 +342,8 @@ public class GroupByQueryConfig ((Number) query.getContextValue(CTX_KEY_MAX_ON_DISK_STORAGE, getMaxOnDiskStorage())).longValue(), getMaxOnDiskStorage() ); - newConfig.maxSelectorDictionarySize = Math.min( - ((Number) query.getContextValue( - CTX_KEY_MAX_SELECTOR_DICTIONARY_SIZE, - getMaxSelectorDictionarySize() - )).longValue(), - getMaxSelectorDictionarySize() - ); - newConfig.maxMergingDictionarySize = Math.min( - ((Number) query.getContextValue( - CTX_KEY_MAX_MERGING_DICTIONARY_SIZE, - getMaxMergingDictionarySize() - )).longValue(), - getMaxMergingDictionarySize() - ); + newConfig.maxSelectorDictionarySize = maxSelectorDictionarySize; // No overrides + newConfig.maxMergingDictionarySize = maxMergingDictionarySize; // No overrides newConfig.forcePushDownLimit = query.getContextBoolean(CTX_KEY_FORCE_LIMIT_PUSH_DOWN, isForcePushDownLimit()); newConfig.applyLimitPushDownToSegment = query.getContextBoolean( CTX_KEY_APPLY_LIMIT_PUSH_DOWN_TO_SEGMENT, diff --git a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/GroupByMergingQueryRunnerV2.java 
b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/GroupByMergingQueryRunnerV2.java index da6081c2dbf..16e3dd3ac7c 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/GroupByMergingQueryRunnerV2.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/GroupByMergingQueryRunnerV2.java @@ -44,6 +44,7 @@ import org.apache.druid.java.util.common.io.Closer; import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.query.AbstractPrioritizedQueryRunnerCallable; import org.apache.druid.query.ChainedExecutionQueryRunner; +import org.apache.druid.query.DruidProcessingConfig; import org.apache.druid.query.QueryContexts; import org.apache.druid.query.QueryInterruptedException; import org.apache.druid.query.QueryPlus; @@ -87,6 +88,7 @@ public class GroupByMergingQueryRunnerV2 implements QueryRunner private static final String CTX_KEY_MERGE_RUNNERS_USING_CHAINED_EXECUTION = "mergeRunnersUsingChainedExecution"; private final GroupByQueryConfig config; + private final DruidProcessingConfig processingConfig; private final Iterable> queryables; private final QueryProcessingPool queryProcessingPool; private final QueryWatcher queryWatcher; @@ -98,6 +100,7 @@ public class GroupByMergingQueryRunnerV2 implements QueryRunner public GroupByMergingQueryRunnerV2( GroupByQueryConfig config, + DruidProcessingConfig processingConfig, QueryProcessingPool queryProcessingPool, QueryWatcher queryWatcher, Iterable> queryables, @@ -109,6 +112,7 @@ public class GroupByMergingQueryRunnerV2 implements QueryRunner ) { this.config = config; + this.processingConfig = processingConfig; this.queryProcessingPool = queryProcessingPool; this.queryWatcher = queryWatcher; this.queryables = Iterables.unmodifiableIterable(Iterables.filter(queryables, Predicates.notNull())); @@ -197,6 +201,7 @@ public class GroupByMergingQueryRunnerV2 implements QueryRunner query, null, config, + processingConfig, 
Suppliers.ofInstance(mergeBufferHolder.get()), combineBufferHolder, concurrencyHint, diff --git a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/GroupByQueryEngineV2.java b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/GroupByQueryEngineV2.java index 55e7533f75f..e2cea67b8da 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/GroupByQueryEngineV2.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/GroupByQueryEngineV2.java @@ -33,6 +33,7 @@ import org.apache.druid.java.util.common.guava.BaseSequence; import org.apache.druid.java.util.common.guava.Sequence; import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.query.ColumnSelectorPlus; +import org.apache.druid.query.DruidProcessingConfig; import org.apache.druid.query.QueryContexts; import org.apache.druid.query.aggregation.AggregatorAdapters; import org.apache.druid.query.aggregation.AggregatorFactory; @@ -121,6 +122,7 @@ public class GroupByQueryEngineV2 @Nullable final StorageAdapter storageAdapter, final NonBlockingPool intermediateResultsBufferPool, final GroupByQueryConfig querySpecificConfig, + final DruidProcessingConfig processingConfig, @Nullable final GroupByQueryMetrics groupByQueryMetrics ) { @@ -164,6 +166,7 @@ public class GroupByQueryEngineV2 filter, interval, querySpecificConfig, + processingConfig, groupByQueryMetrics ); } else { @@ -173,6 +176,7 @@ public class GroupByQueryEngineV2 bufferHolder.get(), fudgeTimestamp, querySpecificConfig, + processingConfig, filter, interval, groupByQueryMetrics @@ -193,6 +197,7 @@ public class GroupByQueryEngineV2 final ByteBuffer processingBuffer, @Nullable final DateTime fudgeTimestamp, final GroupByQueryConfig querySpecificConfig, + final DruidProcessingConfig processingConfig, @Nullable final Filter filter, final Interval interval, @Nullable final GroupByQueryMetrics groupByQueryMetrics @@ -237,6 +242,7 @@ public class 
GroupByQueryEngineV2 return new ArrayAggregateIterator( query, querySpecificConfig, + processingConfig, cursor, processingBuffer, fudgeTimestamp, @@ -248,6 +254,7 @@ public class GroupByQueryEngineV2 return new HashAggregateIterator( query, querySpecificConfig, + processingConfig, cursor, processingBuffer, fudgeTimestamp, @@ -465,11 +472,12 @@ public class GroupByQueryEngineV2 protected CloseableGrouperIterator delegate = null; protected final boolean allSingleValueDims; protected final boolean allowMultiValueGrouping; - + protected final long maxSelectorFootprint; public GroupByEngineIterator( final GroupByQuery query, final GroupByQueryConfig querySpecificConfig, + final DruidProcessingConfig processingConfig, final Cursor cursor, final ByteBuffer buffer, @Nullable final DateTime fudgeTimestamp, @@ -479,6 +487,7 @@ public class GroupByQueryEngineV2 { this.query = query; this.querySpecificConfig = querySpecificConfig; + this.maxSelectorFootprint = querySpecificConfig.getActualMaxSelectorDictionarySize(processingConfig); this.cursor = cursor; this.buffer = buffer; this.keySerde = new GroupByEngineKeySerde(dims, query); @@ -632,6 +641,7 @@ public class GroupByQueryEngineV2 public HashAggregateIterator( GroupByQuery query, GroupByQueryConfig querySpecificConfig, + DruidProcessingConfig processingConfig, Cursor cursor, ByteBuffer buffer, @Nullable DateTime fudgeTimestamp, @@ -639,7 +649,7 @@ public class GroupByQueryEngineV2 boolean allSingleValueDims ) { - super(query, querySpecificConfig, cursor, buffer, fudgeTimestamp, dims, allSingleValueDims); + super(query, querySpecificConfig, processingConfig, cursor, buffer, fudgeTimestamp, dims, allSingleValueDims); final int dimCount = query.getDimensions().size(); stack = new int[dimCount]; @@ -747,7 +757,7 @@ public class GroupByQueryEngineV2 // Check selectorInternalFootprint after advancing the cursor. (We reset after the first row that causes // us to go past the limit.) 
- if (selectorInternalFootprint > querySpecificConfig.getMaxSelectorDictionarySize()) { + if (selectorInternalFootprint > maxSelectorFootprint) { return; } } @@ -836,7 +846,7 @@ public class GroupByQueryEngineV2 // Check selectorInternalFootprint after advancing the cursor. (We reset after the first row that causes // us to go past the limit.) - if (selectorInternalFootprint > querySpecificConfig.getMaxSelectorDictionarySize()) { + if (selectorInternalFootprint > maxSelectorFootprint) { return; } } @@ -870,6 +880,7 @@ public class GroupByQueryEngineV2 public ArrayAggregateIterator( GroupByQuery query, GroupByQueryConfig querySpecificConfig, + DruidProcessingConfig processingConfig, Cursor cursor, ByteBuffer buffer, @Nullable DateTime fudgeTimestamp, @@ -878,7 +889,7 @@ public class GroupByQueryEngineV2 int cardinality ) { - super(query, querySpecificConfig, cursor, buffer, fudgeTimestamp, dims, allSingleValueDims); + super(query, querySpecificConfig, processingConfig, cursor, buffer, fudgeTimestamp, dims, allSingleValueDims); this.cardinality = cardinality; if (dims.length == 1) { this.dim = dims[0]; diff --git a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/GroupByRowProcessor.java b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/GroupByRowProcessor.java index 7e753af1331..88c3c965442 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/GroupByRowProcessor.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/GroupByRowProcessor.java @@ -28,6 +28,7 @@ import org.apache.druid.java.util.common.guava.Accumulator; import org.apache.druid.java.util.common.guava.BaseSequence; import org.apache.druid.java.util.common.guava.Sequence; import org.apache.druid.java.util.common.io.Closer; +import org.apache.druid.query.DruidProcessingConfig; import org.apache.druid.query.ResourceLimitExceededException; import org.apache.druid.query.dimension.DimensionSpec; import 
org.apache.druid.query.groupby.GroupByQuery; @@ -37,7 +38,6 @@ import org.apache.druid.query.groupby.epinephelinae.RowBasedGrouperHelper.RowBas import org.apache.druid.query.groupby.resource.GroupByQueryResource; import javax.annotation.Nullable; - import java.io.Closeable; import java.io.File; import java.io.IOException; @@ -89,6 +89,7 @@ public class GroupByRowProcessor final GroupByQuery subquery, final Sequence rows, final GroupByQueryConfig config, + final DruidProcessingConfig processingConfig, final GroupByQueryResource resource, final ObjectMapper spillMapper, final String processingTmpDir, @@ -114,6 +115,7 @@ public class GroupByRowProcessor query, subquery, querySpecificConfig, + processingConfig, new Supplier() { @Override diff --git a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/Groupers.java b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/Groupers.java index bb351826e54..6506857625a 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/Groupers.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/Groupers.java @@ -31,16 +31,21 @@ public class Groupers private static final AggregateResult DICTIONARY_FULL_ZERO_COUNT = AggregateResult.partial( 0, - "Not enough dictionary space to execute this query. Try increasing " - + "druid.query.groupBy.maxMergingDictionarySize or enable disk spilling by setting " - + "druid.query.groupBy.maxOnDiskStorage to a positive number." + "Not enough dictionary memory to execute this query. Try enabling " + + "disk spilling by setting druid.query.groupBy.maxOnDiskStorage to an amount of bytes " + + "available on your machine for on-disk scratch files. " + + "See https://druid.apache.org/docs/latest/querying/groupbyquery.html#memory-tuning-and-resource-limits " + + "for details." 
); private static final AggregateResult HASH_TABLE_FULL_ZERO_COUNT = AggregateResult.partial( 0, - "Not enough aggregation buffer space to execute this query. Try increasing " - + "druid.processing.buffer.sizeBytes or enable disk spilling by setting " - + "druid.query.groupBy.maxOnDiskStorage to a positive number." + "Not enough merge buffer memory to execute this query. Try enabling " + + "disk spilling by setting druid.query.groupBy.maxOnDiskStorage to an amount of bytes " + + "available on your machine for on-disk scratch files. Or, if you have additional off-heap memory " + + "available, consider increasing druid.processing.buffer.sizeBytes. " + + "See https://druid.apache.org/docs/latest/querying/groupbyquery.html#memory-tuning-and-resource-limits " + + "for details." ); private static final int USED_FLAG_MASK = 0x7fffffff; diff --git a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/RowBasedGrouperHelper.java b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/RowBasedGrouperHelper.java index 4bc851b7e17..4689e37ebca 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/RowBasedGrouperHelper.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/RowBasedGrouperHelper.java @@ -41,6 +41,7 @@ import org.apache.druid.java.util.common.guava.Accumulator; import org.apache.druid.java.util.common.guava.Comparators; import org.apache.druid.query.BaseQuery; import org.apache.druid.query.ColumnSelectorPlus; +import org.apache.druid.query.DruidProcessingConfig; import org.apache.druid.query.aggregation.AggregatorFactory; import org.apache.druid.query.aggregation.GroupingAggregatorFactory; import org.apache.druid.query.dimension.ColumnSelectorStrategy; @@ -117,6 +118,7 @@ public class RowBasedGrouperHelper final GroupByQuery query, @Nullable final GroupByQuery subquery, final GroupByQueryConfig config, + final DruidProcessingConfig processingConfig, final Supplier 
bufferSupplier, final LimitedTemporaryStorage temporaryStorage, final ObjectMapper spillMapper, @@ -127,6 +129,7 @@ public class RowBasedGrouperHelper query, subquery, config, + processingConfig, bufferSupplier, null, SINGLE_THREAD_CONCURRENCY_HINT, @@ -158,6 +161,7 @@ public class RowBasedGrouperHelper * @param subquery optional subquery that we are receiving results from (see combining vs. subquery * mode above) * @param config groupBy query config + * @param processingConfig processing config * @param bufferSupplier supplier of merge buffers * @param combineBufferHolder holder of combine buffers. Unused if concurrencyHint = -1, and may be null in that case * @param concurrencyHint -1 for single-threaded Grouper, >=1 for concurrent Grouper @@ -174,6 +178,7 @@ public class RowBasedGrouperHelper final GroupByQuery query, @Nullable final GroupByQuery subquery, final GroupByQueryConfig config, + final DruidProcessingConfig processingConfig, final Supplier bufferSupplier, @Nullable final ReferenceCountingResourceHolder combineBufferHolder, final int concurrencyHint, @@ -235,11 +240,13 @@ public class RowBasedGrouperHelper aggregatorFactories = query.getAggregatorSpecs().toArray(new AggregatorFactory[0]); } + final long maxMergingDictionarySize = querySpecificConfig.getActualMaxMergingDictionarySize(processingConfig); + final Grouper.KeySerdeFactory keySerdeFactory = new RowBasedKeySerdeFactory( includeTimestamp, query.getContextSortByDimsFirst(), query.getDimensions(), - querySpecificConfig.getMaxMergingDictionarySize() / (concurrencyHint == -1 ? 1 : concurrencyHint), + maxMergingDictionarySize / (concurrencyHint == -1 ? 
1 : concurrencyHint), valueTypes, aggregatorFactories, limitSpec @@ -267,7 +274,7 @@ public class RowBasedGrouperHelper includeTimestamp, query.getContextSortByDimsFirst(), query.getDimensions(), - querySpecificConfig.getMaxMergingDictionarySize(), // use entire dictionary space for combining key serde + maxMergingDictionarySize, // use entire dictionary space for combining key serde valueTypes, aggregatorFactories, limitSpec diff --git a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/vector/VectorGroupByEngine.java b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/vector/VectorGroupByEngine.java index 2c7811c8ff1..ceaa75f85d4 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/vector/VectorGroupByEngine.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/vector/VectorGroupByEngine.java @@ -27,6 +27,7 @@ import org.apache.druid.java.util.common.guava.BaseSequence; import org.apache.druid.java.util.common.guava.Sequence; import org.apache.druid.java.util.common.io.Closer; import org.apache.druid.java.util.common.parsers.CloseableIterator; +import org.apache.druid.query.DruidProcessingConfig; import org.apache.druid.query.QueryContexts; import org.apache.druid.query.aggregation.AggregatorAdapters; import org.apache.druid.query.dimension.DimensionSpec; @@ -130,6 +131,7 @@ public class VectorGroupByEngine @Nullable final Filter filter, final Interval interval, final GroupByQueryConfig config, + final DruidProcessingConfig processingConfig, @Nullable final GroupByQueryMetrics groupByQueryMetrics ) { @@ -190,6 +192,7 @@ public class VectorGroupByEngine return new VectorGroupByEngineIterator( query, config, + processingConfig, storageAdapter, cursor, interval, @@ -228,6 +231,7 @@ public class VectorGroupByEngine { private final GroupByQuery query; private final GroupByQueryConfig querySpecificConfig; + private final DruidProcessingConfig processingConfig; private final 
StorageAdapter storageAdapter; private final VectorCursor cursor; private final List selectors; @@ -258,7 +262,8 @@ public class VectorGroupByEngine VectorGroupByEngineIterator( final GroupByQuery query, - final GroupByQueryConfig config, + final GroupByQueryConfig querySpecificConfig, + final DruidProcessingConfig processingConfig, final StorageAdapter storageAdapter, final VectorCursor cursor, final Interval queryInterval, @@ -268,7 +273,8 @@ public class VectorGroupByEngine ) { this.query = query; - this.querySpecificConfig = config; + this.querySpecificConfig = querySpecificConfig; + this.processingConfig = processingConfig; this.storageAdapter = storageAdapter; this.cursor = cursor; this.selectors = selectors; @@ -433,7 +439,7 @@ public class VectorGroupByEngine // Advance bucketInterval. bucketInterval = bucketIterator.hasNext() ? bucketIterator.next() : null; break; - } else if (selectorInternalFootprint > querySpecificConfig.getMaxSelectorDictionarySize()) { + } else if (selectorInternalFootprint > querySpecificConfig.getActualMaxSelectorDictionarySize(processingConfig)) { break; } } diff --git a/processing/src/main/java/org/apache/druid/query/groupby/strategy/GroupByStrategyV2.java b/processing/src/main/java/org/apache/druid/query/groupby/strategy/GroupByStrategyV2.java index c4ee7613f7d..e6adfa4bee1 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/strategy/GroupByStrategyV2.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/strategy/GroupByStrategyV2.java @@ -441,6 +441,7 @@ public class GroupByStrategyV2 implements GroupByStrategy wasQueryPushedDown ? 
queryToRun : subquery, subqueryResult, configSupplier.get(), + processingConfig, resource, spillMapper, processingConfig.getTmpDir(), @@ -513,6 +514,7 @@ public class GroupByStrategyV2 implements GroupByStrategy baseSubtotalQuery, queryResult, configSupplier.get(), + processingConfig, resource, spillMapper, processingConfig.getTmpDir(), @@ -575,6 +577,7 @@ public class GroupByStrategyV2 implements GroupByStrategy subtotalQuery, resultSupplierOneFinal.results(subTotalDimensionSpec), configSupplier.get(), + processingConfig, resource, spillMapper, processingConfig.getTmpDir(), @@ -680,6 +683,7 @@ public class GroupByStrategyV2 implements GroupByStrategy { return new GroupByMergingQueryRunnerV2( configSupplier.get(), + processingConfig, queryProcessingPool, queryWatcher, queryRunners, @@ -703,6 +707,7 @@ public class GroupByStrategyV2 implements GroupByStrategy storageAdapter, bufferPool, configSupplier.get().withOverrides(query), + processingConfig, groupByQueryMetrics ); } diff --git a/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryConfigTest.java b/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryConfigTest.java index 0ce29983cc6..869bd75765d 100644 --- a/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryConfigTest.java +++ b/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryConfigTest.java @@ -40,7 +40,7 @@ public class GroupByQueryConfigTest .put("maxResults", "3") .put("maxOnDiskStorage", "4") .put("maxSelectorDictionarySize", "5") - .put("maxMergingDictionarySize", "6") + .put("maxMergingDictionarySize", "6M") .put("bufferGrouperMaxLoadFactor", "7") .build(); @@ -55,8 +55,8 @@ public class GroupByQueryConfigTest Assert.assertEquals(2, config.getMaxIntermediateRows()); Assert.assertEquals(3, config.getMaxResults()); Assert.assertEquals(4, config.getMaxOnDiskStorage()); - Assert.assertEquals(5, config.getMaxSelectorDictionarySize()); - Assert.assertEquals(6, 
config.getMaxMergingDictionarySize()); + Assert.assertEquals(5, config.getConfiguredMaxSelectorDictionarySize()); + Assert.assertEquals(6_000_000, config.getConfiguredMaxMergingDictionarySize()); Assert.assertEquals(7.0, config.getBufferGrouperMaxLoadFactor(), 0.0); Assert.assertFalse(config.isApplyLimitPushDownToSegment()); } @@ -79,8 +79,8 @@ public class GroupByQueryConfigTest Assert.assertEquals(2, config2.getMaxIntermediateRows()); Assert.assertEquals(3, config2.getMaxResults()); Assert.assertEquals(4, config2.getMaxOnDiskStorage()); - Assert.assertEquals(5, config2.getMaxSelectorDictionarySize()); - Assert.assertEquals(6, config2.getMaxMergingDictionarySize()); + Assert.assertEquals(5, config2.getConfiguredMaxSelectorDictionarySize()); + Assert.assertEquals(6_000_000, config2.getConfiguredMaxMergingDictionarySize()); Assert.assertEquals(7.0, config2.getBufferGrouperMaxLoadFactor(), 0.0); Assert.assertFalse(config2.isApplyLimitPushDownToSegment()); } @@ -113,9 +113,57 @@ public class GroupByQueryConfigTest Assert.assertEquals(2, config2.getMaxIntermediateRows()); Assert.assertEquals(2, config2.getMaxResults()); Assert.assertEquals(0, config2.getMaxOnDiskStorage()); - Assert.assertEquals(3, config2.getMaxSelectorDictionarySize()); - Assert.assertEquals(4, config2.getMaxMergingDictionarySize()); + Assert.assertEquals(5 /* Can't override */, config2.getConfiguredMaxSelectorDictionarySize()); + Assert.assertEquals(6_000_000 /* Can't override */, config2.getConfiguredMaxMergingDictionarySize()); Assert.assertEquals(7.0, config2.getBufferGrouperMaxLoadFactor(), 0.0); Assert.assertTrue(config2.isApplyLimitPushDownToSegment()); } + + @Test + public void testAutomaticMergingDictionarySize() + { + final GroupByQueryConfig config = MAPPER.convertValue( + ImmutableMap.of("maxMergingDictionarySize", "0"), + GroupByQueryConfig.class + ); + + Assert.assertEquals(GroupByQueryConfig.AUTOMATIC, config.getConfiguredMaxMergingDictionarySize()); + Assert.assertEquals(150_000_000, 
config.getActualMaxMergingDictionarySize(1_000_000_000, 2)); + } + + @Test + public void testNonAutomaticMergingDictionarySize() + { + final GroupByQueryConfig config = MAPPER.convertValue( + ImmutableMap.of("maxMergingDictionarySize", "100"), + GroupByQueryConfig.class + ); + + Assert.assertEquals(100, config.getConfiguredMaxMergingDictionarySize()); + Assert.assertEquals(100, config.getActualMaxMergingDictionarySize(1_000_000_000, 2)); + } + + @Test + public void testAutomaticSelectorDictionarySize() + { + final GroupByQueryConfig config = MAPPER.convertValue( + ImmutableMap.of("maxSelectorDictionarySize", "0"), + GroupByQueryConfig.class + ); + + Assert.assertEquals(GroupByQueryConfig.AUTOMATIC, config.getConfiguredMaxSelectorDictionarySize()); + Assert.assertEquals(50_000_000, config.getActualMaxSelectorDictionarySize(1_000_000_000, 2)); + } + + @Test + public void testNonAutomaticSelectorDictionarySize() + { + final GroupByQueryConfig config = MAPPER.convertValue( + ImmutableMap.of("maxSelectorDictionarySize", "100"), + GroupByQueryConfig.class + ); + + Assert.assertEquals(100, config.getConfiguredMaxSelectorDictionarySize()); + Assert.assertEquals(100, config.getActualMaxSelectorDictionarySize(1_000_000_000, 2)); + } } diff --git a/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerTest.java b/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerTest.java index 4a3a0b5cf4d..71274c9f788 100644 --- a/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerTest.java +++ b/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerTest.java @@ -303,13 +303,13 @@ public class GroupByQueryRunnerTest extends InitializedNullHandlingTest } @Override - public long getMaxSelectorDictionarySize() + public long getConfiguredMaxSelectorDictionarySize() { return 20; } @Override - public long getMaxMergingDictionarySize() + public long getConfiguredMaxMergingDictionarySize() { return 400; } @@ 
-2956,7 +2956,7 @@ public class GroupByQueryRunnerTest extends InitializedNullHandlingTest List expectedResults = null; if (config.getDefaultStrategy().equals(GroupByStrategySelector.STRATEGY_V2)) { expectedException.expect(ResourceLimitExceededException.class); - expectedException.expectMessage("Not enough aggregation buffer space to execute this query"); + expectedException.expectMessage("Not enough merge buffer memory to execute this query"); } else { expectedResults = Arrays.asList( makeRow(query, "2011-04-01", "alias", "automotive", "rows", 1L, "idx", 135L), @@ -3003,68 +3003,6 @@ public class GroupByQueryRunnerTest extends InitializedNullHandlingTest TestHelper.assertExpectedObjects(expectedResults, results, "overide-maxOnDiskStorage"); } - @Test - public void testNotEnoughDictionarySpaceThroughContextOverride() - { - GroupByQuery query = makeQueryBuilder() - .setDataSource(QueryRunnerTestHelper.DATA_SOURCE) - .setQuerySegmentSpec(QueryRunnerTestHelper.FIRST_TO_THIRD) - .setDimensions(new DefaultDimensionSpec("quality", "alias")) - .setAggregatorSpecs(QueryRunnerTestHelper.ROWS_COUNT, new LongSumAggregatorFactory("idx", "index")) - .setGranularity(QueryRunnerTestHelper.DAY_GRAN) - .overrideContext(ImmutableMap.of("maxOnDiskStorage", 0, "maxMergingDictionarySize", 1)) - .build(); - - List expectedResults = null; - if (config.getDefaultStrategy().equals(GroupByStrategySelector.STRATEGY_V2)) { - expectedException.expect(ResourceLimitExceededException.class); - expectedException.expectMessage("Not enough dictionary space to execute this query"); - } else { - expectedResults = Arrays.asList( - makeRow(query, "2011-04-01", "alias", "automotive", "rows", 1L, "idx", 135L), - makeRow(query, "2011-04-01", "alias", "business", "rows", 1L, "idx", 118L), - makeRow( - query, - "2011-04-01", - "alias", - "entertainment", - "rows", - 1L, - "idx", - 158L - ), - makeRow(query, "2011-04-01", "alias", "health", "rows", 1L, "idx", 120L), - makeRow(query, "2011-04-01", "alias", 
"mezzanine", "rows", 3L, "idx", 2870L), - makeRow(query, "2011-04-01", "alias", "news", "rows", 1L, "idx", 121L), - makeRow(query, "2011-04-01", "alias", "premium", "rows", 3L, "idx", 2900L), - makeRow(query, "2011-04-01", "alias", "technology", "rows", 1L, "idx", 78L), - makeRow(query, "2011-04-01", "alias", "travel", "rows", 1L, "idx", 119L), - - makeRow(query, "2011-04-02", "alias", "automotive", "rows", 1L, "idx", 147L), - makeRow(query, "2011-04-02", "alias", "business", "rows", 1L, "idx", 112L), - makeRow( - query, - "2011-04-02", - "alias", - "entertainment", - "rows", - 1L, - "idx", - 166L - ), - makeRow(query, "2011-04-02", "alias", "health", "rows", 1L, "idx", 113L), - makeRow(query, "2011-04-02", "alias", "mezzanine", "rows", 3L, "idx", 2447L), - makeRow(query, "2011-04-02", "alias", "news", "rows", 1L, "idx", 114L), - makeRow(query, "2011-04-02", "alias", "premium", "rows", 3L, "idx", 2505L), - makeRow(query, "2011-04-02", "alias", "technology", "rows", 1L, "idx", 97L), - makeRow(query, "2011-04-02", "alias", "travel", "rows", 1L, "idx", 126L) - ); - } - - Iterable results = GroupByQueryRunnerTestHelper.runQuery(factory, runner, query); - TestHelper.assertExpectedObjects(expectedResults, results, "dictionary-space"); - } - @Test public void testNotEnoughDiskSpaceThroughContextOverride() { @@ -3074,7 +3012,7 @@ public class GroupByQueryRunnerTest extends InitializedNullHandlingTest .setDimensions(new DefaultDimensionSpec("quality", "alias")) .setAggregatorSpecs(QueryRunnerTestHelper.ROWS_COUNT, new LongSumAggregatorFactory("idx", "index")) .setGranularity(QueryRunnerTestHelper.DAY_GRAN) - .overrideContext(ImmutableMap.of("maxOnDiskStorage", 1, "maxMergingDictionarySize", 1)) + .overrideContext(ImmutableMap.of("maxOnDiskStorage", 1, GroupByQueryConfig.CTX_KEY_BUFFER_GROUPER_MAX_SIZE, 1)) .build(); List expectedResults = null; @@ -3084,7 +3022,7 @@ public class GroupByQueryRunnerTest extends InitializedNullHandlingTest // The error message always mentions 
disk if you have spilling enabled (maxOnDiskStorage > 0) expectedException.expectMessage("Not enough disk space to execute this query"); } else { - expectedException.expectMessage("Not enough dictionary space to execute this query"); + expectedException.expectMessage("Not enough merge buffer memory to execute this query"); } } else { expectedResults = Arrays.asList( @@ -3173,7 +3111,7 @@ public class GroupByQueryRunnerTest extends InitializedNullHandlingTest GroupByQueryRunnerTestHelper.runQuery(factory, runner, query); } else { expectedException.expect(ResourceLimitExceededException.class); - expectedException.expectMessage("Not enough aggregation buffer space to execute this query"); + expectedException.expectMessage("Not enough merge buffer memory to execute this query"); GroupByQueryRunnerTestHelper.runQuery(factory, runner, query); } } diff --git a/processing/src/test/java/org/apache/druid/query/groupby/epinephelinae/vector/VectorGroupByEngineIteratorTest.java b/processing/src/test/java/org/apache/druid/query/groupby/epinephelinae/vector/VectorGroupByEngineIteratorTest.java index c4a3e903eaf..1c36cdfc9ff 100644 --- a/processing/src/test/java/org/apache/druid/query/groupby/epinephelinae/vector/VectorGroupByEngineIteratorTest.java +++ b/processing/src/test/java/org/apache/druid/query/groupby/epinephelinae/vector/VectorGroupByEngineIteratorTest.java @@ -27,6 +27,7 @@ import org.apache.druid.query.aggregation.DoubleSumAggregatorFactory; import org.apache.druid.query.dimension.DefaultDimensionSpec; import org.apache.druid.query.groupby.GroupByQuery; import org.apache.druid.query.groupby.GroupByQueryConfig; +import org.apache.druid.query.groupby.GroupByQueryRunnerTest; import org.apache.druid.query.groupby.epinephelinae.VectorGrouper; import org.apache.druid.query.groupby.epinephelinae.vector.VectorGroupByEngine.VectorGroupByEngineIterator; import org.apache.druid.segment.ColumnProcessors; @@ -82,6 +83,7 @@ public class VectorGroupByEngineIteratorTest extends 
InitializedNullHandlingTest final VectorGroupByEngineIterator iterator = new VectorGroupByEngineIterator( query, new GroupByQueryConfig(), + GroupByQueryRunnerTest.DEFAULT_PROCESSING_CONFIG, storageAdapter, cursor, interval, diff --git a/sql/src/test/java/org/apache/druid/sql/http/SqlResourceTest.java b/sql/src/test/java/org/apache/druid/sql/http/SqlResourceTest.java index 3219d3f87d6..5c05f7ccc8f 100644 --- a/sql/src/test/java/org/apache/druid/sql/http/SqlResourceTest.java +++ b/sql/src/test/java/org/apache/druid/sql/http/SqlResourceTest.java @@ -59,6 +59,7 @@ import org.apache.druid.query.QueryRunnerFactoryConglomerate; import org.apache.druid.query.QueryTimeoutException; import org.apache.druid.query.QueryUnsupportedException; import org.apache.druid.query.ResourceLimitExceededException; +import org.apache.druid.query.groupby.GroupByQueryConfig; import org.apache.druid.server.QueryScheduler; import org.apache.druid.server.QueryStackTests; import org.apache.druid.server.initialization.ServerConfig; @@ -1271,7 +1272,7 @@ public class SqlResourceTest extends CalciteTestBase false, false, false, - ImmutableMap.of("maxMergingDictionarySize", 1, BaseQuery.SQL_QUERY_ID, "id"), + ImmutableMap.of(GroupByQueryConfig.CTX_KEY_BUFFER_GROUPER_MAX_SIZE, 1, BaseQuery.SQL_QUERY_ID, "id"), null ) ).lhs;