Automatic sizing for GroupBy dictionaries. (#12763)

* Automatic sizing for GroupBy dictionaries.

Merging and selector dictionary sizes currently both default to 100MB.
This is not optimal: it can lead to OOM on small servers and to
insufficient resource utilization on larger servers. It also invites
end users to tune the limits when queries run out of dictionary space,
which can make things worse if they set the limits too high.

So, this patch:

- Adds automatic tuning for selector and merging dictionaries. Selector
  dictionaries use up to 10% of the heap and merging dictionaries use up
  to 30% of the heap (aggregate across all concurrent queries), with each
  query capped at 1GB per dictionary. A sizing sketch follows this list.

- Updates out-of-memory error messages to emphasize enabling disk
  spilling rather than increasing memory parameters. With the memory
  parameters automatically sized, an end user is more likely to benefit
  from enabling disk spilling.

- Removes the query context parameters that allow lowering the
  configured dictionary sizes. These complicate the calculation, and I
  don't see a reasonable use case for them.
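
As a rough sketch of the sizing rule this patch adopts: the fractions, the
1GB cap, and the division by druid.processing.numMergeBuffers come from the
code below, while the standalone helper, class name, and example numbers are
illustrative only.

public class DictionarySizingSketch
{
  // 1GB cap on any automatically sized dictionary (matches the patch).
  static final long MAX_AUTOMATIC_DICTIONARY_SIZE = 1_000_000_000L;

  // fraction is 0.1 for selector dictionaries and 0.3 for merging dictionaries;
  // numMergeBuffers bounds the number of concurrent groupBy queries.
  static long automaticDictionarySize(double fraction, long maxHeapSize, int numMergeBuffers)
  {
    final long heapForDictionaries = (long) (maxHeapSize * fraction);
    return Math.max(1L, Math.min(MAX_AUTOMATIC_DICTIONARY_SIZE, heapForDictionaries / numMergeBuffers));
  }

  public static void main(String[] args)
  {
    final long heap = 8L * 1024 * 1024 * 1024; // hypothetical 8GiB max heap
    final int mergeBuffers = 4;                // hypothetical numMergeBuffers
    System.out.println(automaticDictionarySize(0.1, heap, mergeBuffers)); // 214748364 (~215MB)
    System.out.println(automaticDictionarySize(0.3, heap, mergeBuffers)); // 644245094 (~644MB)
  }
}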

* Adjust tests.

* Review adjustments.

* Additional comment.

* Remove unused import.
Gian Merlino 2022-07-11 08:20:50 -07:00 committed by GitHub
parent d2576584a0
commit 97207cdcc7
13 changed files with 219 additions and 121 deletions

View File: docs/querying/groupbyquery.md

@ -386,19 +386,16 @@ Supported runtime properties:
|Property|Description|Default|
|--------|-----------|-------|
|`druid.query.groupBy.maxSelectorDictionarySize`|Maximum amount of heap space (approximately) to use for per-segment string dictionaries. See [Memory tuning and resource limits](#memory-tuning-and-resource-limits) for details.|100000000|
|`druid.query.groupBy.maxMergingDictionarySize`|Maximum amount of heap space (approximately) to use for per-query string dictionaries. When the dictionary exceeds this size, a spill to disk will be triggered. See [Memory tuning and resource limits](#memory-tuning-and-resource-limits) for details.|100000000|
|`druid.query.groupBy.maxSelectorDictionarySize`|Maximum amount of heap space (approximately) to use for per-segment string dictionaries. If set to `0` (automatic), each query's dictionary can use 10% of the Java heap divided by `druid.processing.numMergeBuffers`, or 1GB, whichever is smaller.<br /><br />See [Memory tuning and resource limits](#memory-tuning-and-resource-limits) for details on changing this property.|0 (automatic)|
|`druid.query.groupBy.maxMergingDictionarySize`|Maximum amount of heap space (approximately) to use for per-query string dictionaries. When the dictionary exceeds this size, a spill to disk will be triggered. If set to `0` (automatic), each query's dictionary uses 30% of the Java heap divided by `druid.processing.numMergeBuffers`, or 1GB, whichever is smaller.<br /><br />See [Memory tuning and resource limits](#memory-tuning-and-resource-limits) for details on changing this property.|0 (automatic)|
|`druid.query.groupBy.maxOnDiskStorage`|Maximum amount of disk space to use, per-query, for spilling result sets to disk when either the merging buffer or the dictionary fills up. Queries that exceed this limit will fail. Set to zero to disable disk spilling.|0 (disabled)|
Supported query contexts:
|Key|Description|
|---|-----------|
|`maxSelectorDictionarySize`|Can be used to lower the value of `druid.query.groupBy.maxSelectorDictionarySize` for this query.|
|`maxMergingDictionarySize`|Can be used to lower the value of `druid.query.groupBy.maxMergingDictionarySize` for this query.|
|`maxOnDiskStorage`|Can be used to lower the value of `druid.query.groupBy.maxOnDiskStorage` for this query.|
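
For illustration, disk spilling can be enabled for a single query through the
`maxOnDiskStorage` context key. A minimal sketch in the style of the
GroupByQueryRunnerTest changes later in this diff (`makeQueryBuilder()` is that
test class's helper; the 1GB figure is an arbitrary example value):

GroupByQuery query = makeQueryBuilder()
    .setDataSource(QueryRunnerTestHelper.DATA_SOURCE)
    .setQuerySegmentSpec(QueryRunnerTestHelper.FIRST_TO_THIRD)
    .setDimensions(new DefaultDimensionSpec("quality", "alias"))
    .setAggregatorSpecs(QueryRunnerTestHelper.ROWS_COUNT, new LongSumAggregatorFactory("idx", "index"))
    .setGranularity(QueryRunnerTestHelper.DAY_GRAN)
    // Allow up to ~1GB of on-disk scratch files for this query.
    .overrideContext(ImmutableMap.of("maxOnDiskStorage", 1_000_000_000L))
    .build();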
### Advanced configurations
#### Common configurations for all groupBy strategies

View File: GroupByQueryConfig.java

@ -20,14 +20,19 @@
package org.apache.druid.query.groupby;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.druid.java.util.common.HumanReadableBytes;
import org.apache.druid.query.DruidProcessingConfig;
import org.apache.druid.query.QueryContexts;
import org.apache.druid.query.groupby.strategy.GroupByStrategySelector;
import org.apache.druid.utils.JvmUtils;
/**
*
*/
public class GroupByQueryConfig
{
public static final long AUTOMATIC = 0;
public static final String CTX_KEY_STRATEGY = "groupByStrategy";
public static final String CTX_KEY_FORCE_LIMIT_PUSH_DOWN = "forceLimitPushDown";
public static final String CTX_KEY_APPLY_LIMIT_PUSH_DOWN = "applyLimitPushDown";
@ -36,20 +41,32 @@ public class GroupByQueryConfig
public static final String CTX_KEY_EXECUTING_NESTED_QUERY = "executingNestedQuery";
public static final String CTX_KEY_ARRAY_RESULT_ROWS = "resultAsArray";
public static final String CTX_KEY_ENABLE_MULTI_VALUE_UNNESTING = "groupByEnableMultiValueUnnesting";
public static final String CTX_KEY_BUFFER_GROUPER_MAX_SIZE = "bufferGrouperMaxSize";
private static final String CTX_KEY_IS_SINGLE_THREADED = "groupByIsSingleThreaded";
private static final String CTX_KEY_MAX_INTERMEDIATE_ROWS = "maxIntermediateRows";
private static final String CTX_KEY_MAX_RESULTS = "maxResults";
private static final String CTX_KEY_BUFFER_GROUPER_INITIAL_BUCKETS = "bufferGrouperInitialBuckets";
private static final String CTX_KEY_BUFFER_GROUPER_MAX_LOAD_FACTOR = "bufferGrouperMaxLoadFactor";
private static final String CTX_KEY_BUFFER_GROUPER_MAX_SIZE = "bufferGrouperMaxSize";
private static final String CTX_KEY_MAX_ON_DISK_STORAGE = "maxOnDiskStorage";
private static final String CTX_KEY_MAX_SELECTOR_DICTIONARY_SIZE = "maxSelectorDictionarySize";
private static final String CTX_KEY_MAX_MERGING_DICTIONARY_SIZE = "maxMergingDictionarySize";
private static final String CTX_KEY_FORCE_HASH_AGGREGATION = "forceHashAggregation";
private static final String CTX_KEY_INTERMEDIATE_COMBINE_DEGREE = "intermediateCombineDegree";
private static final String CTX_KEY_NUM_PARALLEL_COMBINE_THREADS = "numParallelCombineThreads";
private static final String CTX_KEY_MERGE_THREAD_LOCAL = "mergeThreadLocal";
// Constants for sizing merging and selector dictionaries. Rationale for these constants:
// 1) In no case do we want total aggregate dictionary size to exceed 40% of max memory.
// 2) In no case do we want any dictionary to exceed 1GB of memory: if heaps are giant, better to spill at
// "reasonable" sizes rather than get giant dictionaries. (There is probably some other reason the user
// wanted a giant heap, so we shouldn't monopolize it with dictionaries.)
// 3) Use somewhat more memory for merging dictionary vs. selector dictionaries, because if a merging
// dictionary is full we must spill to disk, whereas if a selector dictionary is full we simply emit
// early to the merge buffer. So, a merging dictionary filling up has a more severe impact on
// query performance.
private static final double MERGING_DICTIONARY_HEAP_FRACTION = 0.3;
private static final double SELECTOR_DICTIONARY_HEAP_FRACTION = 0.1;
private static final long MIN_AUTOMATIC_DICTIONARY_SIZE = 1;
private static final long MAX_AUTOMATIC_DICTIONARY_SIZE = 1_000_000_000;
@JsonProperty
private String defaultStrategy = GroupByStrategySelector.STRATEGY_V2;
@ -75,11 +92,11 @@ public class GroupByQueryConfig
@JsonProperty
// Size of on-heap string dictionary used by per-segment selectors, per-processing-thread; when exceeded,
// partial results will be emitted to the merge buffer early.
private long maxSelectorDictionarySize = 100_000_000L;
private HumanReadableBytes maxSelectorDictionarySize = HumanReadableBytes.valueOf(AUTOMATIC);
@JsonProperty
// Size of on-heap string dictionary for merging, per-query; when exceeded, partial results will be spilled to disk
private long maxMergingDictionarySize = 100_000_000L;
private HumanReadableBytes maxMergingDictionarySize = HumanReadableBytes.valueOf(AUTOMATIC);
@JsonProperty
// Max on-disk temporary storage, per-query; when exceeded, the query fails
@ -165,14 +182,80 @@ public class GroupByQueryConfig
return bufferGrouperInitialBuckets;
}
public long getMaxSelectorDictionarySize()
/**
* For unit tests. Production code should use {@link #getActualMaxSelectorDictionarySize}.
*/
long getConfiguredMaxSelectorDictionarySize()
{
return maxSelectorDictionarySize;
return maxSelectorDictionarySize.getBytes();
}
public long getMaxMergingDictionarySize()
/**
* For unit tests. Production code should use {@link #getActualMaxSelectorDictionarySize(DruidProcessingConfig)}.
*/
long getActualMaxSelectorDictionarySize(final long maxHeapSize, final int numConcurrentQueries)
{
return maxMergingDictionarySize;
if (maxSelectorDictionarySize.getBytes() == AUTOMATIC) {
final long heapForDictionaries = (long) (maxHeapSize * SELECTOR_DICTIONARY_HEAP_FRACTION);
return Math.max(
MIN_AUTOMATIC_DICTIONARY_SIZE,
Math.min(
MAX_AUTOMATIC_DICTIONARY_SIZE,
heapForDictionaries / numConcurrentQueries
)
);
} else {
return maxSelectorDictionarySize.getBytes();
}
}
public long getActualMaxSelectorDictionarySize(final DruidProcessingConfig processingConfig)
{
return getActualMaxSelectorDictionarySize(
JvmUtils.getRuntimeInfo().getMaxHeapSizeBytes(),
// numMergeBuffers is the number of groupBy queries that can run simultaneously
processingConfig.getNumMergeBuffers()
);
}
/**
* For unit tests. Production code should use {@link #getActualMaxMergingDictionarySize}.
*/
long getConfiguredMaxMergingDictionarySize()
{
return maxMergingDictionarySize.getBytes();
}
/**
* For unit tests. Production code should use {@link #getActualMaxMergingDictionarySize(DruidProcessingConfig)}.
*/
public long getActualMaxMergingDictionarySize(final long maxHeapSize, final int numConcurrentQueries)
{
if (maxMergingDictionarySize.getBytes() == AUTOMATIC) {
final long heapForDictionaries = (long) (maxHeapSize * MERGING_DICTIONARY_HEAP_FRACTION);
return Math.max(
MIN_AUTOMATIC_DICTIONARY_SIZE,
Math.min(
MAX_AUTOMATIC_DICTIONARY_SIZE,
heapForDictionaries / numConcurrentQueries
)
);
} else {
return maxMergingDictionarySize.getBytes();
}
}
public long getActualMaxMergingDictionarySize(final DruidProcessingConfig processingConfig)
{
return getActualMaxMergingDictionarySize(
JvmUtils.getRuntimeInfo().getMaxHeapSizeBytes(),
// numMergeBuffers is the number of groupBy queries that can run simultaneously
processingConfig.getNumMergeBuffers()
);
}
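// Worked example (hypothetical numbers, for illustration): with an 8GiB max
// heap and druid.processing.numMergeBuffers = 4, the automatic limits per
// concurrent query are min(0.3 * 8GiB / 4, 1GB) ~= 644MB for merging
// dictionaries and min(0.1 * 8GiB / 4, 1GB) ~= 215MB for selector dictionaries.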
public long getMaxOnDiskStorage()
@ -259,20 +342,8 @@ public class GroupByQueryConfig
((Number) query.getContextValue(CTX_KEY_MAX_ON_DISK_STORAGE, getMaxOnDiskStorage())).longValue(),
getMaxOnDiskStorage()
);
newConfig.maxSelectorDictionarySize = Math.min(
((Number) query.getContextValue(
CTX_KEY_MAX_SELECTOR_DICTIONARY_SIZE,
getMaxSelectorDictionarySize()
)).longValue(),
getMaxSelectorDictionarySize()
);
newConfig.maxMergingDictionarySize = Math.min(
((Number) query.getContextValue(
CTX_KEY_MAX_MERGING_DICTIONARY_SIZE,
getMaxMergingDictionarySize()
)).longValue(),
getMaxMergingDictionarySize()
);
newConfig.maxSelectorDictionarySize = maxSelectorDictionarySize; // No overrides
newConfig.maxMergingDictionarySize = maxMergingDictionarySize; // No overrides
newConfig.forcePushDownLimit = query.getContextBoolean(CTX_KEY_FORCE_LIMIT_PUSH_DOWN, isForcePushDownLimit());
newConfig.applyLimitPushDownToSegment = query.getContextBoolean(
CTX_KEY_APPLY_LIMIT_PUSH_DOWN_TO_SEGMENT,

View File: GroupByMergingQueryRunnerV2.java

@ -44,6 +44,7 @@ import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.query.AbstractPrioritizedQueryRunnerCallable;
import org.apache.druid.query.ChainedExecutionQueryRunner;
import org.apache.druid.query.DruidProcessingConfig;
import org.apache.druid.query.QueryContexts;
import org.apache.druid.query.QueryInterruptedException;
import org.apache.druid.query.QueryPlus;
@ -87,6 +88,7 @@ public class GroupByMergingQueryRunnerV2 implements QueryRunner<ResultRow>
private static final String CTX_KEY_MERGE_RUNNERS_USING_CHAINED_EXECUTION = "mergeRunnersUsingChainedExecution";
private final GroupByQueryConfig config;
private final DruidProcessingConfig processingConfig;
private final Iterable<QueryRunner<ResultRow>> queryables;
private final QueryProcessingPool queryProcessingPool;
private final QueryWatcher queryWatcher;
@ -98,6 +100,7 @@ public class GroupByMergingQueryRunnerV2 implements QueryRunner<ResultRow>
public GroupByMergingQueryRunnerV2(
GroupByQueryConfig config,
DruidProcessingConfig processingConfig,
QueryProcessingPool queryProcessingPool,
QueryWatcher queryWatcher,
Iterable<QueryRunner<ResultRow>> queryables,
@ -109,6 +112,7 @@ public class GroupByMergingQueryRunnerV2 implements QueryRunner<ResultRow>
)
{
this.config = config;
this.processingConfig = processingConfig;
this.queryProcessingPool = queryProcessingPool;
this.queryWatcher = queryWatcher;
this.queryables = Iterables.unmodifiableIterable(Iterables.filter(queryables, Predicates.notNull()));
@ -197,6 +201,7 @@ public class GroupByMergingQueryRunnerV2 implements QueryRunner<ResultRow>
query,
null,
config,
processingConfig,
Suppliers.ofInstance(mergeBufferHolder.get()),
combineBufferHolder,
concurrencyHint,

View File: GroupByQueryEngineV2.java

@ -33,6 +33,7 @@ import org.apache.druid.java.util.common.guava.BaseSequence;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.query.ColumnSelectorPlus;
import org.apache.druid.query.DruidProcessingConfig;
import org.apache.druid.query.QueryContexts;
import org.apache.druid.query.aggregation.AggregatorAdapters;
import org.apache.druid.query.aggregation.AggregatorFactory;
@ -121,6 +122,7 @@ public class GroupByQueryEngineV2
@Nullable final StorageAdapter storageAdapter,
final NonBlockingPool<ByteBuffer> intermediateResultsBufferPool,
final GroupByQueryConfig querySpecificConfig,
final DruidProcessingConfig processingConfig,
@Nullable final GroupByQueryMetrics groupByQueryMetrics
)
{
@ -164,6 +166,7 @@ public class GroupByQueryEngineV2
filter,
interval,
querySpecificConfig,
processingConfig,
groupByQueryMetrics
);
} else {
@ -173,6 +176,7 @@ public class GroupByQueryEngineV2
bufferHolder.get(),
fudgeTimestamp,
querySpecificConfig,
processingConfig,
filter,
interval,
groupByQueryMetrics
@ -193,6 +197,7 @@ public class GroupByQueryEngineV2
final ByteBuffer processingBuffer,
@Nullable final DateTime fudgeTimestamp,
final GroupByQueryConfig querySpecificConfig,
final DruidProcessingConfig processingConfig,
@Nullable final Filter filter,
final Interval interval,
@Nullable final GroupByQueryMetrics groupByQueryMetrics
@ -237,6 +242,7 @@ public class GroupByQueryEngineV2
return new ArrayAggregateIterator(
query,
querySpecificConfig,
processingConfig,
cursor,
processingBuffer,
fudgeTimestamp,
@ -248,6 +254,7 @@ public class GroupByQueryEngineV2
return new HashAggregateIterator(
query,
querySpecificConfig,
processingConfig,
cursor,
processingBuffer,
fudgeTimestamp,
@ -465,11 +472,12 @@ public class GroupByQueryEngineV2
protected CloseableGrouperIterator<KeyType, ResultRow> delegate = null;
protected final boolean allSingleValueDims;
protected final boolean allowMultiValueGrouping;
protected final long maxSelectorFootprint;
public GroupByEngineIterator(
final GroupByQuery query,
final GroupByQueryConfig querySpecificConfig,
final DruidProcessingConfig processingConfig,
final Cursor cursor,
final ByteBuffer buffer,
@Nullable final DateTime fudgeTimestamp,
@ -479,6 +487,7 @@ public class GroupByQueryEngineV2
{
this.query = query;
this.querySpecificConfig = querySpecificConfig;
this.maxSelectorFootprint = querySpecificConfig.getActualMaxSelectorDictionarySize(processingConfig);
this.cursor = cursor;
this.buffer = buffer;
this.keySerde = new GroupByEngineKeySerde(dims, query);
@ -632,6 +641,7 @@ public class GroupByQueryEngineV2
public HashAggregateIterator(
GroupByQuery query,
GroupByQueryConfig querySpecificConfig,
DruidProcessingConfig processingConfig,
Cursor cursor,
ByteBuffer buffer,
@Nullable DateTime fudgeTimestamp,
@ -639,7 +649,7 @@ public class GroupByQueryEngineV2
boolean allSingleValueDims
)
{
super(query, querySpecificConfig, cursor, buffer, fudgeTimestamp, dims, allSingleValueDims);
super(query, querySpecificConfig, processingConfig, cursor, buffer, fudgeTimestamp, dims, allSingleValueDims);
final int dimCount = query.getDimensions().size();
stack = new int[dimCount];
@ -747,7 +757,7 @@ public class GroupByQueryEngineV2
// Check selectorInternalFootprint after advancing the cursor. (We reset after the first row that causes
// us to go past the limit.)
if (selectorInternalFootprint > querySpecificConfig.getMaxSelectorDictionarySize()) {
if (selectorInternalFootprint > maxSelectorFootprint) {
return;
}
}
@ -836,7 +846,7 @@ public class GroupByQueryEngineV2
// Check selectorInternalFootprint after advancing the cursor. (We reset after the first row that causes
// us to go past the limit.)
if (selectorInternalFootprint > querySpecificConfig.getMaxSelectorDictionarySize()) {
if (selectorInternalFootprint > maxSelectorFootprint) {
return;
}
}
@ -870,6 +880,7 @@ public class GroupByQueryEngineV2
public ArrayAggregateIterator(
GroupByQuery query,
GroupByQueryConfig querySpecificConfig,
DruidProcessingConfig processingConfig,
Cursor cursor,
ByteBuffer buffer,
@Nullable DateTime fudgeTimestamp,
@ -878,7 +889,7 @@ public class GroupByQueryEngineV2
int cardinality
)
{
super(query, querySpecificConfig, cursor, buffer, fudgeTimestamp, dims, allSingleValueDims);
super(query, querySpecificConfig, processingConfig, cursor, buffer, fudgeTimestamp, dims, allSingleValueDims);
this.cardinality = cardinality;
if (dims.length == 1) {
this.dim = dims[0];

View File: GroupByRowProcessor.java

@ -28,6 +28,7 @@ import org.apache.druid.java.util.common.guava.Accumulator;
import org.apache.druid.java.util.common.guava.BaseSequence;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.query.DruidProcessingConfig;
import org.apache.druid.query.ResourceLimitExceededException;
import org.apache.druid.query.dimension.DimensionSpec;
import org.apache.druid.query.groupby.GroupByQuery;
@ -37,7 +38,6 @@ import org.apache.druid.query.groupby.epinephelinae.RowBasedGrouperHelper.RowBas
import org.apache.druid.query.groupby.resource.GroupByQueryResource;
import javax.annotation.Nullable;
import java.io.Closeable;
import java.io.File;
import java.io.IOException;
@ -89,6 +89,7 @@ public class GroupByRowProcessor
final GroupByQuery subquery,
final Sequence<ResultRow> rows,
final GroupByQueryConfig config,
final DruidProcessingConfig processingConfig,
final GroupByQueryResource resource,
final ObjectMapper spillMapper,
final String processingTmpDir,
@ -114,6 +115,7 @@ public class GroupByRowProcessor
query,
subquery,
querySpecificConfig,
processingConfig,
new Supplier<ByteBuffer>()
{
@Override

View File: Groupers.java

@ -31,16 +31,21 @@ public class Groupers
private static final AggregateResult DICTIONARY_FULL_ZERO_COUNT = AggregateResult.partial(
0,
"Not enough dictionary space to execute this query. Try increasing "
+ "druid.query.groupBy.maxMergingDictionarySize or enable disk spilling by setting "
+ "druid.query.groupBy.maxOnDiskStorage to a positive number."
"Not enough dictionary memory to execute this query. Try enabling "
+ "disk spilling by setting druid.query.groupBy.maxOnDiskStorage to an amount of bytes "
+ "available on your machine for on-disk scratch files. "
+ "See https://druid.apache.org/docs/latest/querying/groupbyquery.html#memory-tuning-and-resource-limits "
+ "for details."
);
private static final AggregateResult HASH_TABLE_FULL_ZERO_COUNT = AggregateResult.partial(
0,
"Not enough aggregation buffer space to execute this query. Try increasing "
+ "druid.processing.buffer.sizeBytes or enable disk spilling by setting "
+ "druid.query.groupBy.maxOnDiskStorage to a positive number."
"Not enough merge buffer memory to execute this query. Try enabling "
+ "disk spilling by setting druid.query.groupBy.maxOnDiskStorage to an amount of bytes "
+ "available on your machine for on-disk scratch files. Or, if you have additional off-heap memory "
+ "available, consider increasing druid.processing.buffer.sizeBytes. "
+ "See https://druid.apache.org/docs/latest/querying/groupbyquery.html#memory-tuning-and-resource-limits "
+ "for details."
);
private static final int USED_FLAG_MASK = 0x7fffffff;

View File: RowBasedGrouperHelper.java

@ -41,6 +41,7 @@ import org.apache.druid.java.util.common.guava.Accumulator;
import org.apache.druid.java.util.common.guava.Comparators;
import org.apache.druid.query.BaseQuery;
import org.apache.druid.query.ColumnSelectorPlus;
import org.apache.druid.query.DruidProcessingConfig;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.GroupingAggregatorFactory;
import org.apache.druid.query.dimension.ColumnSelectorStrategy;
@ -117,6 +118,7 @@ public class RowBasedGrouperHelper
final GroupByQuery query,
@Nullable final GroupByQuery subquery,
final GroupByQueryConfig config,
final DruidProcessingConfig processingConfig,
final Supplier<ByteBuffer> bufferSupplier,
final LimitedTemporaryStorage temporaryStorage,
final ObjectMapper spillMapper,
@ -127,6 +129,7 @@ public class RowBasedGrouperHelper
query,
subquery,
config,
processingConfig,
bufferSupplier,
null,
SINGLE_THREAD_CONCURRENCY_HINT,
@ -158,6 +161,7 @@ public class RowBasedGrouperHelper
* @param subquery optional subquery that we are receiving results from (see combining vs. subquery
* mode above)
* @param config groupBy query config
* @param processingConfig processing config
* @param bufferSupplier supplier of merge buffers
* @param combineBufferHolder holder of combine buffers. Unused if concurrencyHint = -1, and may be null in that case
* @param concurrencyHint -1 for single-threaded Grouper, >=1 for concurrent Grouper
@ -174,6 +178,7 @@ public class RowBasedGrouperHelper
final GroupByQuery query,
@Nullable final GroupByQuery subquery,
final GroupByQueryConfig config,
final DruidProcessingConfig processingConfig,
final Supplier<ByteBuffer> bufferSupplier,
@Nullable final ReferenceCountingResourceHolder<ByteBuffer> combineBufferHolder,
final int concurrencyHint,
@ -235,11 +240,13 @@ public class RowBasedGrouperHelper
aggregatorFactories = query.getAggregatorSpecs().toArray(new AggregatorFactory[0]);
}
final long maxMergingDictionarySize = querySpecificConfig.getActualMaxMergingDictionarySize(processingConfig);
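// Each of the concurrencyHint processing threads gets an equal share of the
// per-query merging dictionary budget; concurrencyHint == -1 means a
// single-threaded grouper, which keeps the entire budget.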
final Grouper.KeySerdeFactory<RowBasedKey> keySerdeFactory = new RowBasedKeySerdeFactory(
includeTimestamp,
query.getContextSortByDimsFirst(),
query.getDimensions(),
querySpecificConfig.getMaxMergingDictionarySize() / (concurrencyHint == -1 ? 1 : concurrencyHint),
maxMergingDictionarySize / (concurrencyHint == -1 ? 1 : concurrencyHint),
valueTypes,
aggregatorFactories,
limitSpec
@ -267,7 +274,7 @@ public class RowBasedGrouperHelper
includeTimestamp,
query.getContextSortByDimsFirst(),
query.getDimensions(),
querySpecificConfig.getMaxMergingDictionarySize(), // use entire dictionary space for combining key serde
maxMergingDictionarySize, // use entire dictionary space for combining key serde
valueTypes,
aggregatorFactories,
limitSpec

View File: VectorGroupByEngine.java

@ -27,6 +27,7 @@ import org.apache.druid.java.util.common.guava.BaseSequence;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.java.util.common.parsers.CloseableIterator;
import org.apache.druid.query.DruidProcessingConfig;
import org.apache.druid.query.QueryContexts;
import org.apache.druid.query.aggregation.AggregatorAdapters;
import org.apache.druid.query.dimension.DimensionSpec;
@ -130,6 +131,7 @@ public class VectorGroupByEngine
@Nullable final Filter filter,
final Interval interval,
final GroupByQueryConfig config,
final DruidProcessingConfig processingConfig,
@Nullable final GroupByQueryMetrics groupByQueryMetrics
)
{
@ -190,6 +192,7 @@ public class VectorGroupByEngine
return new VectorGroupByEngineIterator(
query,
config,
processingConfig,
storageAdapter,
cursor,
interval,
@ -228,6 +231,7 @@ public class VectorGroupByEngine
{
private final GroupByQuery query;
private final GroupByQueryConfig querySpecificConfig;
private final DruidProcessingConfig processingConfig;
private final StorageAdapter storageAdapter;
private final VectorCursor cursor;
private final List<GroupByVectorColumnSelector> selectors;
@ -258,7 +262,8 @@ public class VectorGroupByEngine
VectorGroupByEngineIterator(
final GroupByQuery query,
final GroupByQueryConfig config,
final GroupByQueryConfig querySpecificConfig,
final DruidProcessingConfig processingConfig,
final StorageAdapter storageAdapter,
final VectorCursor cursor,
final Interval queryInterval,
@ -268,7 +273,8 @@ public class VectorGroupByEngine
)
{
this.query = query;
this.querySpecificConfig = config;
this.querySpecificConfig = querySpecificConfig;
this.processingConfig = processingConfig;
this.storageAdapter = storageAdapter;
this.cursor = cursor;
this.selectors = selectors;
@ -433,7 +439,7 @@ public class VectorGroupByEngine
// Advance bucketInterval.
bucketInterval = bucketIterator.hasNext() ? bucketIterator.next() : null;
break;
} else if (selectorInternalFootprint > querySpecificConfig.getMaxSelectorDictionarySize()) {
} else if (selectorInternalFootprint > querySpecificConfig.getActualMaxSelectorDictionarySize(processingConfig)) {
break;
}
}

View File: GroupByStrategyV2.java

@ -441,6 +441,7 @@ public class GroupByStrategyV2 implements GroupByStrategy
wasQueryPushedDown ? queryToRun : subquery,
subqueryResult,
configSupplier.get(),
processingConfig,
resource,
spillMapper,
processingConfig.getTmpDir(),
@ -513,6 +514,7 @@ public class GroupByStrategyV2 implements GroupByStrategy
baseSubtotalQuery,
queryResult,
configSupplier.get(),
processingConfig,
resource,
spillMapper,
processingConfig.getTmpDir(),
@ -575,6 +577,7 @@ public class GroupByStrategyV2 implements GroupByStrategy
subtotalQuery,
resultSupplierOneFinal.results(subTotalDimensionSpec),
configSupplier.get(),
processingConfig,
resource,
spillMapper,
processingConfig.getTmpDir(),
@ -680,6 +683,7 @@ public class GroupByStrategyV2 implements GroupByStrategy
{
return new GroupByMergingQueryRunnerV2(
configSupplier.get(),
processingConfig,
queryProcessingPool,
queryWatcher,
queryRunners,
@ -703,6 +707,7 @@ public class GroupByStrategyV2 implements GroupByStrategy
storageAdapter,
bufferPool,
configSupplier.get().withOverrides(query),
processingConfig,
groupByQueryMetrics
);
}

View File: GroupByQueryConfigTest.java

@ -40,7 +40,7 @@ public class GroupByQueryConfigTest
.put("maxResults", "3")
.put("maxOnDiskStorage", "4")
.put("maxSelectorDictionarySize", "5")
.put("maxMergingDictionarySize", "6")
.put("maxMergingDictionarySize", "6M")
.put("bufferGrouperMaxLoadFactor", "7")
.build();
@ -55,8 +55,8 @@ public class GroupByQueryConfigTest
Assert.assertEquals(2, config.getMaxIntermediateRows());
Assert.assertEquals(3, config.getMaxResults());
Assert.assertEquals(4, config.getMaxOnDiskStorage());
Assert.assertEquals(5, config.getMaxSelectorDictionarySize());
Assert.assertEquals(6, config.getMaxMergingDictionarySize());
Assert.assertEquals(5, config.getConfiguredMaxSelectorDictionarySize());
Assert.assertEquals(6_000_000, config.getConfiguredMaxMergingDictionarySize());
Assert.assertEquals(7.0, config.getBufferGrouperMaxLoadFactor(), 0.0);
Assert.assertFalse(config.isApplyLimitPushDownToSegment());
}
@ -79,8 +79,8 @@ public class GroupByQueryConfigTest
Assert.assertEquals(2, config2.getMaxIntermediateRows());
Assert.assertEquals(3, config2.getMaxResults());
Assert.assertEquals(4, config2.getMaxOnDiskStorage());
Assert.assertEquals(5, config2.getMaxSelectorDictionarySize());
Assert.assertEquals(6, config2.getMaxMergingDictionarySize());
Assert.assertEquals(5, config2.getConfiguredMaxSelectorDictionarySize());
Assert.assertEquals(6_000_000, config2.getConfiguredMaxMergingDictionarySize());
Assert.assertEquals(7.0, config2.getBufferGrouperMaxLoadFactor(), 0.0);
Assert.assertFalse(config2.isApplyLimitPushDownToSegment());
}
@ -113,9 +113,57 @@ public class GroupByQueryConfigTest
Assert.assertEquals(2, config2.getMaxIntermediateRows());
Assert.assertEquals(2, config2.getMaxResults());
Assert.assertEquals(0, config2.getMaxOnDiskStorage());
Assert.assertEquals(3, config2.getMaxSelectorDictionarySize());
Assert.assertEquals(4, config2.getMaxMergingDictionarySize());
Assert.assertEquals(5 /* Can't override */, config2.getConfiguredMaxSelectorDictionarySize());
Assert.assertEquals(6_000_000 /* Can't override */, config2.getConfiguredMaxMergingDictionarySize());
Assert.assertEquals(7.0, config2.getBufferGrouperMaxLoadFactor(), 0.0);
Assert.assertTrue(config2.isApplyLimitPushDownToSegment());
}
@Test
public void testAutomaticMergingDictionarySize()
{
final GroupByQueryConfig config = MAPPER.convertValue(
ImmutableMap.of("maxMergingDictionarySize", "0"),
GroupByQueryConfig.class
);
Assert.assertEquals(GroupByQueryConfig.AUTOMATIC, config.getConfiguredMaxMergingDictionarySize());
Assert.assertEquals(150_000_000, config.getActualMaxMergingDictionarySize(1_000_000_000, 2));
}
@Test
public void testNonAutomaticMergingDictionarySize()
{
final GroupByQueryConfig config = MAPPER.convertValue(
ImmutableMap.of("maxMergingDictionarySize", "100"),
GroupByQueryConfig.class
);
Assert.assertEquals(100, config.getConfiguredMaxMergingDictionarySize());
Assert.assertEquals(100, config.getActualMaxMergingDictionarySize(1_000_000_000, 2));
}
@Test
public void testAutomaticSelectorDictionarySize()
{
final GroupByQueryConfig config = MAPPER.convertValue(
ImmutableMap.of("maxSelectorDictionarySize", "0"),
GroupByQueryConfig.class
);
Assert.assertEquals(GroupByQueryConfig.AUTOMATIC, config.getConfiguredMaxSelectorDictionarySize());
Assert.assertEquals(50_000_000, config.getActualMaxSelectorDictionarySize(1_000_000_000, 2));
}
@Test
public void testNonAutomaticSelectorDictionarySize()
{
final GroupByQueryConfig config = MAPPER.convertValue(
ImmutableMap.of("maxSelectorDictionarySize", "100"),
GroupByQueryConfig.class
);
Assert.assertEquals(100, config.getConfiguredMaxSelectorDictionarySize());
Assert.assertEquals(100, config.getActualMaxSelectorDictionarySize(1_000_000_000, 2));
}
}

View File: GroupByQueryRunnerTest.java

@ -303,13 +303,13 @@ public class GroupByQueryRunnerTest extends InitializedNullHandlingTest
}
@Override
public long getMaxSelectorDictionarySize()
public long getConfiguredMaxSelectorDictionarySize()
{
return 20;
}
@Override
public long getMaxMergingDictionarySize()
public long getConfiguredMaxMergingDictionarySize()
{
return 400;
}
@ -2956,7 +2956,7 @@ public class GroupByQueryRunnerTest extends InitializedNullHandlingTest
List<ResultRow> expectedResults = null;
if (config.getDefaultStrategy().equals(GroupByStrategySelector.STRATEGY_V2)) {
expectedException.expect(ResourceLimitExceededException.class);
expectedException.expectMessage("Not enough aggregation buffer space to execute this query");
expectedException.expectMessage("Not enough merge buffer memory to execute this query");
} else {
expectedResults = Arrays.asList(
makeRow(query, "2011-04-01", "alias", "automotive", "rows", 1L, "idx", 135L),
@ -3003,68 +3003,6 @@ public class GroupByQueryRunnerTest extends InitializedNullHandlingTest
TestHelper.assertExpectedObjects(expectedResults, results, "overide-maxOnDiskStorage");
}
@Test
public void testNotEnoughDictionarySpaceThroughContextOverride()
{
GroupByQuery query = makeQueryBuilder()
.setDataSource(QueryRunnerTestHelper.DATA_SOURCE)
.setQuerySegmentSpec(QueryRunnerTestHelper.FIRST_TO_THIRD)
.setDimensions(new DefaultDimensionSpec("quality", "alias"))
.setAggregatorSpecs(QueryRunnerTestHelper.ROWS_COUNT, new LongSumAggregatorFactory("idx", "index"))
.setGranularity(QueryRunnerTestHelper.DAY_GRAN)
.overrideContext(ImmutableMap.of("maxOnDiskStorage", 0, "maxMergingDictionarySize", 1))
.build();
List<ResultRow> expectedResults = null;
if (config.getDefaultStrategy().equals(GroupByStrategySelector.STRATEGY_V2)) {
expectedException.expect(ResourceLimitExceededException.class);
expectedException.expectMessage("Not enough dictionary space to execute this query");
} else {
expectedResults = Arrays.asList(
makeRow(query, "2011-04-01", "alias", "automotive", "rows", 1L, "idx", 135L),
makeRow(query, "2011-04-01", "alias", "business", "rows", 1L, "idx", 118L),
makeRow(
query,
"2011-04-01",
"alias",
"entertainment",
"rows",
1L,
"idx",
158L
),
makeRow(query, "2011-04-01", "alias", "health", "rows", 1L, "idx", 120L),
makeRow(query, "2011-04-01", "alias", "mezzanine", "rows", 3L, "idx", 2870L),
makeRow(query, "2011-04-01", "alias", "news", "rows", 1L, "idx", 121L),
makeRow(query, "2011-04-01", "alias", "premium", "rows", 3L, "idx", 2900L),
makeRow(query, "2011-04-01", "alias", "technology", "rows", 1L, "idx", 78L),
makeRow(query, "2011-04-01", "alias", "travel", "rows", 1L, "idx", 119L),
makeRow(query, "2011-04-02", "alias", "automotive", "rows", 1L, "idx", 147L),
makeRow(query, "2011-04-02", "alias", "business", "rows", 1L, "idx", 112L),
makeRow(
query,
"2011-04-02",
"alias",
"entertainment",
"rows",
1L,
"idx",
166L
),
makeRow(query, "2011-04-02", "alias", "health", "rows", 1L, "idx", 113L),
makeRow(query, "2011-04-02", "alias", "mezzanine", "rows", 3L, "idx", 2447L),
makeRow(query, "2011-04-02", "alias", "news", "rows", 1L, "idx", 114L),
makeRow(query, "2011-04-02", "alias", "premium", "rows", 3L, "idx", 2505L),
makeRow(query, "2011-04-02", "alias", "technology", "rows", 1L, "idx", 97L),
makeRow(query, "2011-04-02", "alias", "travel", "rows", 1L, "idx", 126L)
);
}
Iterable<ResultRow> results = GroupByQueryRunnerTestHelper.runQuery(factory, runner, query);
TestHelper.assertExpectedObjects(expectedResults, results, "dictionary-space");
}
@Test
public void testNotEnoughDiskSpaceThroughContextOverride()
{
@ -3074,7 +3012,7 @@ public class GroupByQueryRunnerTest extends InitializedNullHandlingTest
.setDimensions(new DefaultDimensionSpec("quality", "alias"))
.setAggregatorSpecs(QueryRunnerTestHelper.ROWS_COUNT, new LongSumAggregatorFactory("idx", "index"))
.setGranularity(QueryRunnerTestHelper.DAY_GRAN)
.overrideContext(ImmutableMap.of("maxOnDiskStorage", 1, "maxMergingDictionarySize", 1))
.overrideContext(ImmutableMap.of("maxOnDiskStorage", 1, GroupByQueryConfig.CTX_KEY_BUFFER_GROUPER_MAX_SIZE, 1))
.build();
List<ResultRow> expectedResults = null;
@ -3084,7 +3022,7 @@ public class GroupByQueryRunnerTest extends InitializedNullHandlingTest
// The error message always mentions disk if you have spilling enabled (maxOnDiskStorage > 0)
expectedException.expectMessage("Not enough disk space to execute this query");
} else {
expectedException.expectMessage("Not enough dictionary space to execute this query");
expectedException.expectMessage("Not enough merge buffer memory to execute this query");
}
} else {
expectedResults = Arrays.asList(
@ -3173,7 +3111,7 @@ public class GroupByQueryRunnerTest extends InitializedNullHandlingTest
GroupByQueryRunnerTestHelper.runQuery(factory, runner, query);
} else {
expectedException.expect(ResourceLimitExceededException.class);
expectedException.expectMessage("Not enough aggregation buffer space to execute this query");
expectedException.expectMessage("Not enough merge buffer memory to execute this query");
GroupByQueryRunnerTestHelper.runQuery(factory, runner, query);
}
}

View File: VectorGroupByEngineIteratorTest.java

@ -27,6 +27,7 @@ import org.apache.druid.query.aggregation.DoubleSumAggregatorFactory;
import org.apache.druid.query.dimension.DefaultDimensionSpec;
import org.apache.druid.query.groupby.GroupByQuery;
import org.apache.druid.query.groupby.GroupByQueryConfig;
import org.apache.druid.query.groupby.GroupByQueryRunnerTest;
import org.apache.druid.query.groupby.epinephelinae.VectorGrouper;
import org.apache.druid.query.groupby.epinephelinae.vector.VectorGroupByEngine.VectorGroupByEngineIterator;
import org.apache.druid.segment.ColumnProcessors;
@ -82,6 +83,7 @@ public class VectorGroupByEngineIteratorTest extends InitializedNullHandlingTest
final VectorGroupByEngineIterator iterator = new VectorGroupByEngineIterator(
query,
new GroupByQueryConfig(),
GroupByQueryRunnerTest.DEFAULT_PROCESSING_CONFIG,
storageAdapter,
cursor,
interval,

View File: SqlResourceTest.java

@ -59,6 +59,7 @@ import org.apache.druid.query.QueryRunnerFactoryConglomerate;
import org.apache.druid.query.QueryTimeoutException;
import org.apache.druid.query.QueryUnsupportedException;
import org.apache.druid.query.ResourceLimitExceededException;
import org.apache.druid.query.groupby.GroupByQueryConfig;
import org.apache.druid.server.QueryScheduler;
import org.apache.druid.server.QueryStackTests;
import org.apache.druid.server.initialization.ServerConfig;
@ -1271,7 +1272,7 @@ public class SqlResourceTest extends CalciteTestBase
false,
false,
false,
ImmutableMap.of("maxMergingDictionarySize", 1, BaseQuery.SQL_QUERY_ID, "id"),
ImmutableMap.of(GroupByQueryConfig.CTX_KEY_BUFFER_GROUPER_MAX_SIZE, 1, BaseQuery.SQL_QUERY_ID, "id"),
null
)
).lhs;