From 875e0696e01c4348fa31c77ec6fa333a324a53d8 Mon Sep 17 00:00:00 2001 From: Gian Merlino Date: Tue, 8 Mar 2022 13:13:11 -0800 Subject: [PATCH] GroupBy: Cap dictionary-building selector memory usage. (#12309) * GroupBy: Cap dictionary-building selector memory usage. New context parameter "maxSelectorDictionarySize" controls when the per-segment processing code should return early and trigger a trip to the merge buffer. Includes: - Vectorized and nonvectorized implementations. - Adjustments to GroupByQueryRunnerTest to exercise this code in the v2SmallDictionary suite. (Both the selector dictionary and the merging dictionary will be small in that suite.) - Tests for the new config parameter. * Fix issues from tests. * Add "pre-existing" to dictionary. * Simplify GroupByColumnSelectorStrategy interface by removing one of the writeToKeyBuffer methods. * Adjustments from review comments. --- docs/configuration/index.md | 4 +- docs/querying/groupbyquery.md | 15 +++- .../query/groupby/GroupByQueryConfig.java | 24 +++++- .../epinephelinae/DictionaryBuilding.java | 63 ++++++++++++++++ .../epinephelinae/GroupByQueryEngineV2.java | 44 ++++++++++- .../epinephelinae/RowBasedGrouperHelper.java | 36 +++------ ...ayDoubleGroupByColumnSelectorStrategy.java | 6 +- ...rrayLongGroupByColumnSelectorStrategy.java | 8 +- ...yNumericGroupByColumnSelectorStrategy.java | 68 ++++++++++++----- ...ayStringGroupByColumnSelectorStrategy.java | 73 ++++++++++++++----- ...ngStringGroupByColumnSelectorStrategy.java | 43 +++++++---- .../DoubleGroupByColumnSelectorStrategy.java | 31 +++++--- .../FloatGroupByColumnSelectorStrategy.java | 31 +++++--- .../column/GroupByColumnSelectorStrategy.java | 52 ++++++++----- .../LongGroupByColumnSelectorStrategy.java | 31 +++++--- ...eNumericGroupByColumnSelectorStrategy.java | 51 ++++++++----- .../StringGroupByColumnSelectorStrategy.java | 35 ++++++--- ...alueStringGroupByVectorColumnSelector.java | 26 +++++-- .../DoubleGroupByVectorColumnSelector.java | 10 ++- 
.../FloatGroupByVectorColumnSelector.java | 10 ++- .../vector/GroupByVectorColumnSelector.java | 20 ++++- .../LongGroupByVectorColumnSelector.java | 10 ++- .../NilGroupByVectorColumnSelector.java | 9 ++- ...ableDoubleGroupByVectorColumnSelector.java | 10 ++- ...lableFloatGroupByVectorColumnSelector.java | 10 ++- ...llableLongGroupByVectorColumnSelector.java | 10 ++- ...alueStringGroupByVectorColumnSelector.java | 10 ++- .../vector/VectorGroupByEngine.java | 15 +++- .../query/groupby/GroupByQueryConfigTest.java | 35 +++++---- .../query/groupby/GroupByQueryRunnerTest.java | 6 ++ ...ubleGroupByColumnSelectorStrategyTest.java | 12 +-- ...LongGroupByColumnSelectorStrategyTest.java | 12 +-- ...ringGroupByColumnSelectorStrategyTest.java | 6 +- website/.spelling | 2 + 34 files changed, 601 insertions(+), 227 deletions(-) create mode 100644 processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/DictionaryBuilding.java diff --git a/docs/configuration/index.md b/docs/configuration/index.md index 727876d3ef3..d22faae18c2 100644 --- a/docs/configuration/index.md +++ b/docs/configuration/index.md @@ -2047,13 +2047,15 @@ Supported runtime properties: |Property|Description|Default| |--------|-----------|-------| -|`druid.query.groupBy.maxMergingDictionarySize`|Maximum amount of heap space (approximately) to use for the string dictionary during merging. When the dictionary exceeds this size, a spill to disk will be triggered.|100000000| +|`druid.query.groupBy.maxSelectorDictionarySize`|Maximum amount of heap space (approximately) to use for per-segment string dictionaries. See [groupBy memory tuning and resource limits](../querying/groupbyquery.md#memory-tuning-and-resource-limits) for details.|100000000| +|`druid.query.groupBy.maxMergingDictionarySize`|Maximum amount of heap space (approximately) to use for per-query string dictionaries. When the dictionary exceeds this size, a spill to disk will be triggered. 
See [groupBy memory tuning and resource limits](../querying/groupbyquery.md#memory-tuning-and-resource-limits) for details.|100000000| |`druid.query.groupBy.maxOnDiskStorage`|Maximum amount of disk space to use, per-query, for spilling result sets to disk when either the merging buffer or the dictionary fills up. Queries that exceed this limit will fail. Set to zero to disable disk spilling.|0 (disabled)| Supported query contexts: |Key|Description| |---|-----------| +|`maxSelectorDictionarySize`|Can be used to lower the value of `druid.query.groupBy.maxSelectorDictionarySize` for this query.| |`maxMergingDictionarySize`|Can be used to lower the value of `druid.query.groupBy.maxMergingDictionarySize` for this query.| |`maxOnDiskStorage`|Can be used to lower the value of `druid.query.groupBy.maxOnDiskStorage` for this query.| diff --git a/docs/querying/groupbyquery.md b/docs/querying/groupbyquery.md index 550238786b6..4defd0001d2 100644 --- a/docs/querying/groupbyquery.md +++ b/docs/querying/groupbyquery.md @@ -275,13 +275,18 @@ as the index, so the aggregated values in the array can be accessed directly wit ### Memory tuning and resource limits -When using groupBy v2, three parameters control resource usage and limits: +When using groupBy v2, four parameters control resource usage and limits: - `druid.processing.buffer.sizeBytes`: size of the off-heap hash table used for aggregation, per query, in bytes. At most `druid.processing.numMergeBuffers` of these will be created at once, which also serves as an upper limit on the number of concurrently running groupBy queries. -- `druid.query.groupBy.maxMergingDictionarySize`: size of the on-heap dictionary used when grouping on strings, per query, -in bytes. Note that this is based on a rough estimate of the dictionary size, not the actual size. 
+- `druid.query.groupBy.maxSelectorDictionarySize`: size of the on-heap segment-level dictionary used when grouping on +string or array-valued expressions that do not have pre-existing dictionaries. There is at most one dictionary per +processing thread; therefore there are up to `druid.processing.numThreads` of these. Note that the size is based on a +rough estimate of the dictionary footprint. +- `druid.query.groupBy.maxMergingDictionarySize`: size of the on-heap query-level dictionary used when grouping on +any string expression. There is at most one dictionary per concurrently-running query; therefore there are up to +`druid.server.http.numThreads` of these. Note that the size is based on a rough estimate of the dictionary footprint. - `druid.query.groupBy.maxOnDiskStorage`: amount of space on disk used for aggregation, per query, in bytes. By default, this is 0, which means aggregation will not use disk. @@ -381,13 +386,15 @@ Supported runtime properties: |Property|Description|Default| |--------|-----------|-------| -|`druid.query.groupBy.maxMergingDictionarySize`|Maximum amount of heap space (approximately) to use for the string dictionary during merging. When the dictionary exceeds this size, a spill to disk will be triggered.|100000000| +|`druid.query.groupBy.maxSelectorDictionarySize`|Maximum amount of heap space (approximately) to use for per-segment string dictionaries. See [Memory tuning and resource limits](#memory-tuning-and-resource-limits) for details.|100000000| +|`druid.query.groupBy.maxMergingDictionarySize`|Maximum amount of heap space (approximately) to use for per-query string dictionaries. When the dictionary exceeds this size, a spill to disk will be triggered. See [Memory tuning and resource limits](#memory-tuning-and-resource-limits) for details.|100000000| |`druid.query.groupBy.maxOnDiskStorage`|Maximum amount of disk space to use, per-query, for spilling result sets to disk when either the merging buffer or the dictionary fills up. 
Queries that exceed this limit will fail. Set to zero to disable disk spilling.|0 (disabled)| Supported query contexts: |Key|Description| |---|-----------| +|`maxSelectorDictionarySize`|Can be used to lower the value of `druid.query.groupBy.maxSelectorDictionarySize` for this query.| |`maxMergingDictionarySize`|Can be used to lower the value of `druid.query.groupBy.maxMergingDictionarySize` for this query.| |`maxOnDiskStorage`|Can be used to lower the value of `druid.query.groupBy.maxOnDiskStorage` for this query.| diff --git a/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryConfig.java b/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryConfig.java index 5e32e88f230..986f4d2920d 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryConfig.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryConfig.java @@ -24,6 +24,7 @@ import org.apache.druid.query.QueryContexts; import org.apache.druid.query.groupby.strategy.GroupByStrategySelector; /** + * */ public class GroupByQueryConfig { @@ -42,6 +43,7 @@ public class GroupByQueryConfig private static final String CTX_KEY_BUFFER_GROUPER_MAX_LOAD_FACTOR = "bufferGrouperMaxLoadFactor"; private static final String CTX_KEY_BUFFER_GROUPER_MAX_SIZE = "bufferGrouperMaxSize"; private static final String CTX_KEY_MAX_ON_DISK_STORAGE = "maxOnDiskStorage"; + private static final String CTX_KEY_MAX_SELECTOR_DICTIONARY_SIZE = "maxSelectorDictionarySize"; private static final String CTX_KEY_MAX_MERGING_DICTIONARY_SIZE = "maxMergingDictionarySize"; private static final String CTX_KEY_FORCE_HASH_AGGREGATION = "forceHashAggregation"; private static final String CTX_KEY_INTERMEDIATE_COMBINE_DEGREE = "intermediateCombineDegree"; @@ -69,6 +71,11 @@ public class GroupByQueryConfig @JsonProperty private int bufferGrouperInitialBuckets = 0; + @JsonProperty + // Size of on-heap string dictionary for selectors, per-processing-thread; when exceeded, partial 
results will be + // emitted to the merge buffer early. + private long maxSelectorDictionarySize = 100_000_000L; + @JsonProperty // Size of on-heap string dictionary for merging, per-query; when exceeded, partial results will be spilled to disk private long maxMergingDictionarySize = 100_000_000L; @@ -151,6 +158,11 @@ public class GroupByQueryConfig return bufferGrouperInitialBuckets; } + public long getMaxSelectorDictionarySize() + { + return maxSelectorDictionarySize; + } + public long getMaxMergingDictionarySize() { return maxMergingDictionarySize; @@ -230,6 +242,13 @@ public class GroupByQueryConfig ((Number) query.getContextValue(CTX_KEY_MAX_ON_DISK_STORAGE, getMaxOnDiskStorage())).longValue(), getMaxOnDiskStorage() ); + newConfig.maxSelectorDictionarySize = Math.min( + ((Number) query.getContextValue( + CTX_KEY_MAX_SELECTOR_DICTIONARY_SIZE, + getMaxSelectorDictionarySize() + )).longValue(), + getMaxSelectorDictionarySize() + ); newConfig.maxMergingDictionarySize = Math.min( ((Number) query.getContextValue( CTX_KEY_MAX_MERGING_DICTIONARY_SIZE, @@ -243,7 +262,10 @@ public class GroupByQueryConfig isApplyLimitPushDownToSegment() ); newConfig.forceHashAggregation = query.getContextBoolean(CTX_KEY_FORCE_HASH_AGGREGATION, isForceHashAggregation()); - newConfig.forcePushDownNestedQuery = query.getContextBoolean(CTX_KEY_FORCE_PUSH_DOWN_NESTED_QUERY, isForcePushDownNestedQuery()); + newConfig.forcePushDownNestedQuery = query.getContextBoolean( + CTX_KEY_FORCE_PUSH_DOWN_NESTED_QUERY, + isForcePushDownNestedQuery() + ); newConfig.intermediateCombineDegree = query.getContextValue( CTX_KEY_INTERMEDIATE_COMBINE_DEGREE, getIntermediateCombineDegree() diff --git a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/DictionaryBuilding.java b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/DictionaryBuilding.java new file mode 100644 index 00000000000..edbe3528ef9 --- /dev/null +++ 
b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/DictionaryBuilding.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query.groupby.epinephelinae; + +import it.unimi.dsi.fastutil.objects.Object2IntMap; +import it.unimi.dsi.fastutil.objects.Object2IntOpenHashMap; +import org.apache.druid.segment.DimensionDictionary; + +import java.util.ArrayList; +import java.util.List; + +/** + * Utilities for parts of the groupBy engine that need to build dictionaries. + */ +public class DictionaryBuilding +{ + // Entry in dictionary, node pointer in reverseDictionary, hash + k/v/next pointer in reverseDictionary nodes + private static final int ROUGH_OVERHEAD_PER_DICTIONARY_ENTRY = Long.BYTES * 5 + Integer.BYTES; + + /** + * Creates a forward dictionary (dictionary ID -> value). + */ + public static List createDictionary() + { + return new ArrayList<>(); + } + + /** + * Creates a reverse dictionary (value -> dictionary ID). If a value is not present in the reverse dictionary, + * {@link Object2IntMap#getInt} will return {@link DimensionDictionary#ABSENT_VALUE_ID}. 
+ */ + public static Object2IntMap createReverseDictionary() + { + final Object2IntOpenHashMap m = new Object2IntOpenHashMap<>(); + m.defaultReturnValue(DimensionDictionary.ABSENT_VALUE_ID); + return m; + } + + /** + * Estimated footprint of a new entry. + */ + public static int estimateEntryFootprint(final int valueFootprint) + { + return valueFootprint + ROUGH_OVERHEAD_PER_DICTIONARY_ENTRY; + } +} diff --git a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/GroupByQueryEngineV2.java b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/GroupByQueryEngineV2.java index f38f6ce2c10..602b2eb5164 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/GroupByQueryEngineV2.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/GroupByQueryEngineV2.java @@ -623,7 +623,10 @@ public class GroupByQueryEngineV2 private final ByteBuffer keyBuffer; private int stackPointer = Integer.MIN_VALUE; - protected boolean currentRowWasPartiallyAggregated = false; + private boolean currentRowWasPartiallyAggregated = false; + + // Sum of internal state footprint across all "dims". 
+ private long selectorInternalFootprint = 0; public HashAggregateIterator( GroupByQuery query, @@ -717,12 +720,19 @@ public class GroupByQueryEngineV2 @Override protected void aggregateSingleValueDims(Grouper grouper) { + if (!currentRowWasPartiallyAggregated) { + for (GroupByColumnSelectorPlus dim : dims) { + dim.getColumnSelectorStrategy().reset(); + } + selectorInternalFootprint = 0; + } + while (!cursor.isDone()) { for (GroupByColumnSelectorPlus dim : dims) { final GroupByColumnSelectorStrategy strategy = dim.getColumnSelectorStrategy(); - strategy.writeToKeyBuffer( + selectorInternalFootprint += strategy.writeToKeyBuffer( dim.getKeyBufferPosition(), - strategy.getOnlyValue(dim.getSelector()), + dim.getSelector(), keyBuffer ); } @@ -731,13 +741,27 @@ public class GroupByQueryEngineV2 if (!grouper.aggregate(keyBuffer).isOk()) { return; } + cursor.advance(); + + // Check selectorInternalFootprint after advancing the cursor. (We reset after the first row that causes + // us to go past the limit.) 
+ if (selectorInternalFootprint > querySpecificConfig.getMaxSelectorDictionarySize()) { + return; + } } } @Override protected void aggregateMultiValueDims(Grouper grouper) { + if (!currentRowWasPartiallyAggregated) { + for (GroupByColumnSelectorPlus dim : dims) { + dim.getColumnSelectorStrategy().reset(); + } + selectorInternalFootprint = 0; + } + while (!cursor.isDone()) { if (!currentRowWasPartiallyAggregated) { // Set up stack, valuess, and first grouping in keyBuffer for this row @@ -745,7 +769,7 @@ public class GroupByQueryEngineV2 for (int i = 0; i < dims.length; i++) { GroupByColumnSelectorStrategy strategy = dims[i].getColumnSelectorStrategy(); - strategy.initColumnValues( + selectorInternalFootprint += strategy.initColumnValues( dims[i].getSelector(), i, valuess @@ -808,6 +832,12 @@ public class GroupByQueryEngineV2 // Advance to next row cursor.advance(); currentRowWasPartiallyAggregated = false; + + // Check selectorInternalFootprint after advancing the cursor. (We reset after the first row that causes + // us to go past the limit.) + if (selectorInternalFootprint > querySpecificConfig.getMaxSelectorDictionarySize()) { + return; + } } } @@ -882,6 +912,9 @@ public class GroupByQueryEngineV2 private void aggregateSingleValueDims(IntGrouper grouper) { + // No need to track strategy internal state footprint, because array-based grouping does not use strategies. + // It accesses dimension selectors directly and only works on truly dictionary-coded columns. + while (!cursor.isDone()) { final int key; if (dim != null) { @@ -900,6 +933,9 @@ public class GroupByQueryEngineV2 private void aggregateMultiValueDims(IntGrouper grouper) { + // No need to track strategy internal state footprint, because array-based grouping does not use strategies. + // It accesses dimension selectors directly and only works on truly dictionary-coded columns. 
+ if (dim == null) { throw new ISE("dim must exist"); } diff --git a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/RowBasedGrouperHelper.java b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/RowBasedGrouperHelper.java index 1f825f503ee..95227158bae 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/RowBasedGrouperHelper.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/RowBasedGrouperHelper.java @@ -29,7 +29,6 @@ import com.google.common.primitives.Longs; import com.google.common.util.concurrent.ListeningExecutorService; import it.unimi.dsi.fastutil.ints.IntArrays; import it.unimi.dsi.fastutil.objects.Object2IntMap; -import it.unimi.dsi.fastutil.objects.Object2IntOpenHashMap; import org.apache.druid.collections.ReferenceCountingResourceHolder; import org.apache.druid.common.config.NullHandling; import org.apache.druid.common.guava.SettableSupplier; @@ -63,6 +62,7 @@ import org.apache.druid.segment.BaseLongColumnValueSelector; import org.apache.druid.segment.ColumnInspector; import org.apache.druid.segment.ColumnSelectorFactory; import org.apache.druid.segment.ColumnValueSelector; +import org.apache.druid.segment.DimensionDictionary; import org.apache.druid.segment.DimensionHandlerUtils; import org.apache.druid.segment.DimensionSelector; import org.apache.druid.segment.RowAdapter; @@ -101,9 +101,6 @@ import java.util.stream.IntStream; */ public class RowBasedGrouperHelper { - // Entry in dictionary, node pointer in reverseDictionary, hash + k/v/next pointer in reverseDictionary nodes - private static final int ROUGH_OVERHEAD_PER_DICTIONARY_ENTRY = Long.BYTES * 5 + Integer.BYTES; - private static final int SINGLE_THREAD_CONCURRENCY_HINT = -1; private static final int UNKNOWN_THREAD_PRIORITY = -1; private static final long UNKNOWN_TIMEOUT = -1L; @@ -1144,14 +1141,11 @@ public class RowBasedGrouperHelper static long estimateStringKeySize(@Nullable String key) { 
- long length = key == null ? 0 : key.length(); - return length * Character.BYTES + ROUGH_OVERHEAD_PER_DICTIONARY_ENTRY; + return DictionaryBuilding.estimateEntryFootprint((key == null ? 0 : key.length()) * Character.BYTES); } private static class RowBasedKeySerde implements Grouper.KeySerde { - private static final int UNKNOWN_DICTIONARY_ID = -1; - private final boolean includeTimestamp; private final boolean sortByDimsFirst; private final List dimensions; @@ -1203,20 +1197,14 @@ public class RowBasedGrouperHelper this.valueTypes = valueTypes; this.limitSpec = limitSpec; this.enableRuntimeDictionaryGeneration = dictionary == null; - this.dictionary = enableRuntimeDictionaryGeneration ? new ArrayList<>() : dictionary; - this.reverseDictionary = enableRuntimeDictionaryGeneration ? - new Object2IntOpenHashMap<>() : - new Object2IntOpenHashMap<>(dictionary.size()); + this.dictionary = enableRuntimeDictionaryGeneration ? DictionaryBuilding.createDictionary() : dictionary; + this.reverseDictionary = DictionaryBuilding.createReverseDictionary(); - this.arrayDictionary = new ArrayList<>(); - this.reverseArrayDictionary = new Object2IntOpenHashMap<>(); + this.arrayDictionary = DictionaryBuilding.createDictionary(); + this.reverseArrayDictionary = DictionaryBuilding.createReverseDictionary(); - this.listDictionary = new ArrayList<>(); - this.reverseListDictionary = new Object2IntOpenHashMap<>(); - - this.reverseDictionary.defaultReturnValue(UNKNOWN_DICTIONARY_ID); - this.reverseArrayDictionary.defaultReturnValue(UNKNOWN_DICTIONARY_ID); - this.reverseListDictionary.defaultReturnValue(UNKNOWN_DICTIONARY_ID); + this.listDictionary = DictionaryBuilding.createDictionary(); + this.reverseListDictionary = DictionaryBuilding.createReverseDictionary(); this.maxDictionarySize = maxDictionarySize; this.serdeHelpers = makeSerdeHelpers(limitSpec != null, enableRuntimeDictionaryGeneration); @@ -1534,7 +1522,7 @@ public class RowBasedGrouperHelper { final ComparableList comparableList = 
(ComparableList) key.getKey()[idx]; int id = reverseDictionary.getInt(comparableList); - if (id == UNKNOWN_DICTIONARY_ID) { + if (id == DimensionDictionary.ABSENT_VALUE_ID) { id = listDictionary.size(); reverseListDictionary.put(comparableList, id); listDictionary.add(comparableList); @@ -1610,7 +1598,7 @@ public class RowBasedGrouperHelper private int addToArrayDictionary(final ComparableStringArray s) { int idx = reverseArrayDictionary.getInt(s); - if (idx == UNKNOWN_DICTIONARY_ID) { + if (idx == DimensionDictionary.ABSENT_VALUE_ID) { idx = arrayDictionary.size(); reverseArrayDictionary.put(s, idx); arrayDictionary.add(s); @@ -1700,7 +1688,7 @@ public class RowBasedGrouperHelper private int addToDictionary(final String s) { int idx = reverseDictionary.getInt(s); - if (idx == UNKNOWN_DICTIONARY_ID) { + if (idx == DimensionDictionary.ABSENT_VALUE_ID) { final long additionalEstimatedSize = estimateStringKeySize(s); if (currentEstimatedSize + additionalEstimatedSize > maxDictionarySize) { return -1; @@ -1732,7 +1720,7 @@ public class RowBasedGrouperHelper final String stringKey = (String) key.getKey()[idx]; final int dictIndex = reverseDictionary.getInt(stringKey); - if (dictIndex == UNKNOWN_DICTIONARY_ID) { + if (dictIndex == DimensionDictionary.ABSENT_VALUE_ID) { throw new ISE("Cannot find key[%s] from dictionary", stringKey); } keyBuffer.putInt(dictIndex); diff --git a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/column/ArrayDoubleGroupByColumnSelectorStrategy.java b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/column/ArrayDoubleGroupByColumnSelectorStrategy.java index 46cfe0d9c09..6c56406c6f7 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/column/ArrayDoubleGroupByColumnSelectorStrategy.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/column/ArrayDoubleGroupByColumnSelectorStrategy.java @@ -33,7 +33,7 @@ public class 
ArrayDoubleGroupByColumnSelectorStrategy extends ArrayNumericGroupB { public ArrayDoubleGroupByColumnSelectorStrategy() { - + super(Double.BYTES); } @VisibleForTesting @@ -42,11 +42,11 @@ public class ArrayDoubleGroupByColumnSelectorStrategy extends ArrayNumericGroupB Object2IntOpenHashMap> reverseDictionary ) { - super(dictionary, reverseDictionary); + super(dictionary, reverseDictionary, Double.BYTES); } @Override - public Object getOnlyValue(ColumnValueSelector selector) + protected int computeDictionaryId(ColumnValueSelector selector) { Object object = selector.getObject(); if (object == null) { diff --git a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/column/ArrayLongGroupByColumnSelectorStrategy.java b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/column/ArrayLongGroupByColumnSelectorStrategy.java index ff137c68264..23b4ceb17ff 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/column/ArrayLongGroupByColumnSelectorStrategy.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/column/ArrayLongGroupByColumnSelectorStrategy.java @@ -31,10 +31,9 @@ import java.util.stream.Collectors; public class ArrayLongGroupByColumnSelectorStrategy extends ArrayNumericGroupByColumnSelectorStrategy { - public ArrayLongGroupByColumnSelectorStrategy() { - + super(Long.BYTES); } @VisibleForTesting @@ -43,12 +42,11 @@ public class ArrayLongGroupByColumnSelectorStrategy extends ArrayNumericGroupByC Object2IntOpenHashMap> reverseDictionary ) { - super(dictionary, reverseDictionary); + super(dictionary, reverseDictionary, Long.BYTES); } - @Override - public Object getOnlyValue(ColumnValueSelector selector) + protected int computeDictionaryId(ColumnValueSelector selector) { Object object = selector.getObject(); if (object == null) { diff --git a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/column/ArrayNumericGroupByColumnSelectorStrategy.java 
b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/column/ArrayNumericGroupByColumnSelectorStrategy.java index 55dc29feda7..dfaac786aaf 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/column/ArrayNumericGroupByColumnSelectorStrategy.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/column/ArrayNumericGroupByColumnSelectorStrategy.java @@ -20,8 +20,10 @@ package org.apache.druid.query.groupby.epinephelinae.column; import com.google.common.annotations.VisibleForTesting; +import it.unimi.dsi.fastutil.objects.Object2IntMap; import it.unimi.dsi.fastutil.objects.Object2IntOpenHashMap; import org.apache.druid.query.groupby.ResultRow; +import org.apache.druid.query.groupby.epinephelinae.DictionaryBuilding; import org.apache.druid.query.groupby.epinephelinae.Grouper; import org.apache.druid.query.ordering.StringComparator; import org.apache.druid.query.ordering.StringComparators; @@ -30,31 +32,36 @@ import org.apache.druid.segment.data.ComparableList; import javax.annotation.Nullable; import java.nio.ByteBuffer; -import java.util.ArrayList; import java.util.List; -public abstract class ArrayNumericGroupByColumnSelectorStrategy implements GroupByColumnSelectorStrategy +public abstract class ArrayNumericGroupByColumnSelectorStrategy + implements GroupByColumnSelectorStrategy { protected static final int GROUP_BY_MISSING_VALUE = -1; protected final List> dictionary; - protected final Object2IntOpenHashMap> reverseDictionary; + protected final Object2IntMap> reverseDictionary; + protected long estimatedFootprint = 0L; - public ArrayNumericGroupByColumnSelectorStrategy() + private final int valueFootprint; + + public ArrayNumericGroupByColumnSelectorStrategy(final int valueFootprint) { - dictionary = new ArrayList<>(); - reverseDictionary = new Object2IntOpenHashMap<>(); - reverseDictionary.defaultReturnValue(-1); + this.dictionary = DictionaryBuilding.createDictionary(); + 
this.reverseDictionary = DictionaryBuilding.createReverseDictionary(); + this.valueFootprint = valueFootprint; } @VisibleForTesting ArrayNumericGroupByColumnSelectorStrategy( List> dictionary, - Object2IntOpenHashMap> reverseDictionary + Object2IntOpenHashMap> reverseDictionary, + int valueFootprint ) { this.dictionary = dictionary; this.reverseDictionary = reverseDictionary; + this.valueFootprint = valueFootprint; } @Override @@ -83,16 +90,17 @@ public abstract class ArrayNumericGroupByColumnSelectorStrategy t) + protected int addToIndexedDictionary(List t) { final int dictId = reverseDictionary.getInt(t); if (dictId < 0) { final int size = dictionary.size(); dictionary.add(t); reverseDictionary.put(t, size); + + // Footprint estimate: one pointer, one value per list entry. + estimatedFootprint += DictionaryBuilding.estimateEntryFootprint(t.size() * (Long.BYTES + valueFootprint)); return size; } return dictId; @@ -178,4 +192,18 @@ public abstract class ArrayNumericGroupByColumnSelectorStrategy 1 private final BiMap intListToInt; + private long estimatedFootprint = 0L; + @Override public int getGroupingKeySize() { @@ -98,20 +100,22 @@ public class ArrayStringGroupByColumnSelectorStrategy } @Override - public void initColumnValues( + public int initColumnValues( ColumnValueSelector selector, int columnIndex, Object[] valuess ) { - final int groupingKey = (int) getOnlyValue(selector); + final long priorFootprint = estimatedFootprint; + final int groupingKey = computeDictionaryId(selector); valuess[columnIndex] = groupingKey; + return (int) (estimatedFootprint - priorFootprint); } @Override public void initGroupingKeyColumnValue( int keyBufferPosition, - int columnIndex, + int dimensionIndex, Object rowObj, ByteBuffer keyBuffer, int[] stack @@ -120,9 +124,9 @@ public class ArrayStringGroupByColumnSelectorStrategy final int groupingKey = (int) rowObj; writeToKeyBuffer(keyBufferPosition, groupingKey, keyBuffer); if (groupingKey == GROUP_BY_MISSING_VALUE) { - 
stack[columnIndex] = 0; + stack[dimensionIndex] = 0; } else { - stack[columnIndex] = 1; + stack[dimensionIndex] = 1; } } @@ -137,8 +141,11 @@ public class ArrayStringGroupByColumnSelectorStrategy return false; } - @Override - public Object getOnlyValue(ColumnValueSelector selector) + /** + * Compute dictionary ID for the given selector. Updates {@link #estimatedFootprint} as necessary. + */ + @VisibleForTesting + int computeDictionaryId(ColumnValueSelector selector) { final int[] intRepresentation; Object object = selector.getObject(); @@ -172,9 +179,15 @@ public class ArrayStringGroupByColumnSelectorStrategy final ComparableIntArray comparableIntArray = ComparableIntArray.of(intRepresentation); final int dictId = intListToInt.getOrDefault(comparableIntArray, GROUP_BY_MISSING_VALUE); if (dictId == GROUP_BY_MISSING_VALUE) { - final int dictionarySize = intListToInt.keySet().size(); - intListToInt.put(comparableIntArray, dictionarySize); - return dictionarySize; + final int nextId = intListToInt.keySet().size(); + intListToInt.put(comparableIntArray, nextId); + + // We're not using the dictionary and reverseDictionary from DictionaryBuilding, but the BiMap is close enough + // that we expect this footprint calculation to still be useful. (It doesn't have to be exact.) + estimatedFootprint += + DictionaryBuilding.estimateEntryFootprint(comparableIntArray.getDelegate().length * Integer.BYTES); + + return nextId; } else { return dictId; } @@ -184,18 +197,29 @@ public class ArrayStringGroupByColumnSelectorStrategy { final Integer dictId = dictionaryToInt.get(value); if (dictId == null) { - final int size = dictionaryToInt.size(); - dictionaryToInt.put(value, dictionaryToInt.size()); - return size; + final int nextId = dictionaryToInt.size(); + dictionaryToInt.put(value, nextId); + + // We're not using the dictionary and reverseDictionary from DictionaryBuilding, but the BiMap is close enough + // that we expect this footprint calculation to still be useful. 
(It doesn't have to be exact.) + estimatedFootprint += + DictionaryBuilding.estimateEntryFootprint((value == null ? 0 : value.length()) * Character.BYTES); + + return nextId; } else { return dictId; } } @Override - public void writeToKeyBuffer(int keyBufferPosition, Object obj, ByteBuffer keyBuffer) + public int writeToKeyBuffer(int keyBufferPosition, ColumnValueSelector selector, ByteBuffer keyBuffer) { - keyBuffer.putInt(keyBufferPosition, (int) obj); + final long priorFootprint = estimatedFootprint; + + // computeDictionaryId updates estimatedFootprint + keyBuffer.putInt(keyBufferPosition, computeDictionaryId(selector)); + + return (int) (estimatedFootprint - priorFootprint); } @Override @@ -230,5 +254,18 @@ public class ArrayStringGroupByColumnSelectorStrategy } }; } -} + @Override + public void reset() + { + dictionaryToInt.clear(); + intListToInt.clear(); + estimatedFootprint = 0; + } + + @VisibleForTesting + void writeToKeyBuffer(int keyBufferPosition, int groupingKey, ByteBuffer keyBuffer) + { + keyBuffer.putInt(keyBufferPosition, groupingKey); + } +} diff --git a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/column/DictionaryBuildingStringGroupByColumnSelectorStrategy.java b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/column/DictionaryBuildingStringGroupByColumnSelectorStrategy.java index 025819cb6c0..dfc5149d52a 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/column/DictionaryBuildingStringGroupByColumnSelectorStrategy.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/column/DictionaryBuildingStringGroupByColumnSelectorStrategy.java @@ -20,20 +20,21 @@ package org.apache.druid.query.groupby.epinephelinae.column; import com.google.common.base.Preconditions; -import it.unimi.dsi.fastutil.objects.Object2IntOpenHashMap; +import it.unimi.dsi.fastutil.objects.Object2IntMap; import org.apache.druid.common.config.NullHandling; import 
org.apache.druid.query.groupby.ResultRow; +import org.apache.druid.query.groupby.epinephelinae.DictionaryBuilding; import org.apache.druid.query.groupby.epinephelinae.Grouper; import org.apache.druid.query.ordering.StringComparator; import org.apache.druid.query.ordering.StringComparators; import org.apache.druid.segment.ColumnValueSelector; +import org.apache.druid.segment.DimensionDictionary; import org.apache.druid.segment.DimensionSelector; import org.apache.druid.segment.data.ArrayBasedIndexedInts; import org.apache.druid.segment.data.IndexedInts; import javax.annotation.Nullable; import java.nio.ByteBuffer; -import java.util.ArrayList; import java.util.List; /** @@ -44,13 +45,8 @@ public class DictionaryBuildingStringGroupByColumnSelectorStrategy extends Strin { private static final int GROUP_BY_MISSING_VALUE = -1; - private int nextId = 0; - private final List dictionary = new ArrayList<>(); - private final Object2IntOpenHashMap reverseDictionary = new Object2IntOpenHashMap<>(); - - { - reverseDictionary.defaultReturnValue(-1); - } + private final List dictionary = DictionaryBuilding.createDictionary(); + private final Object2IntMap reverseDictionary = DictionaryBuilding.createReverseDictionary(); public DictionaryBuildingStringGroupByColumnSelectorStrategy() { @@ -77,10 +73,11 @@ public class DictionaryBuildingStringGroupByColumnSelectorStrategy extends Strin } @Override - public void initColumnValues(ColumnValueSelector selector, int columnIndex, Object[] valuess) + public int initColumnValues(ColumnValueSelector selector, int columnIndex, Object[] valuess) { final DimensionSelector dimSelector = (DimensionSelector) selector; final IndexedInts row = dimSelector.getRow(); + int stateFootprintIncrease = 0; ArrayBasedIndexedInts newRow = (ArrayBasedIndexedInts) valuess[columnIndex]; if (newRow == null) { newRow = new ArrayBasedIndexedInts(); @@ -92,19 +89,22 @@ public class DictionaryBuildingStringGroupByColumnSelectorStrategy extends Strin final String value 
= dimSelector.lookupName(row.get(i)); final int dictId = reverseDictionary.getInt(value); if (dictId < 0) { + final int nextId = dictionary.size(); dictionary.add(value); reverseDictionary.put(value, nextId); newRow.setValue(i, nextId); - nextId++; + stateFootprintIncrease += + DictionaryBuilding.estimateEntryFootprint((value == null ? 0 : value.length()) * Character.BYTES); } else { newRow.setValue(i, dictId); } } newRow.setSize(rowSize); + return stateFootprintIncrease; } @Override - public Object getOnlyValue(ColumnValueSelector selector) + public int writeToKeyBuffer(int keyBufferPosition, ColumnValueSelector selector, ByteBuffer keyBuffer) { final DimensionSelector dimSelector = (DimensionSelector) selector; final IndexedInts row = dimSelector.getRow(); @@ -112,17 +112,21 @@ public class DictionaryBuildingStringGroupByColumnSelectorStrategy extends Strin Preconditions.checkState(row.size() < 2, "Not supported for multi-value dimensions"); if (row.size() == 0) { - return GROUP_BY_MISSING_VALUE; + writeToKeyBuffer(keyBufferPosition, GROUP_BY_MISSING_VALUE, keyBuffer); + return 0; } final String value = dimSelector.lookupName(row.get(0)); final int dictId = reverseDictionary.getInt(value); - if (dictId < 0) { + if (dictId == DimensionDictionary.ABSENT_VALUE_ID) { + final int nextId = dictionary.size(); dictionary.add(value); reverseDictionary.put(value, nextId); - return nextId++; + writeToKeyBuffer(keyBufferPosition, nextId, keyBuffer); + return DictionaryBuilding.estimateEntryFootprint((value == null ? 
0 : value.length()) * Character.BYTES); } else { - return dictId; + writeToKeyBuffer(keyBufferPosition, dictId, keyBuffer); + return 0; } } @@ -138,4 +142,11 @@ public class DictionaryBuildingStringGroupByColumnSelectorStrategy extends Strin return realComparator.compare(lhsStr, rhsStr); }; } + + @Override + public void reset() + { + dictionary.clear(); + reverseDictionary.clear(); + } } diff --git a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/column/DoubleGroupByColumnSelectorStrategy.java b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/column/DoubleGroupByColumnSelectorStrategy.java index 86522b5b47b..6fcbf9ca9d5 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/column/DoubleGroupByColumnSelectorStrategy.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/column/DoubleGroupByColumnSelectorStrategy.java @@ -50,34 +50,30 @@ public class DoubleGroupByColumnSelectorStrategy implements GroupByColumnSelecto } @Override - public void initColumnValues(ColumnValueSelector selector, int columnIndex, Object[] values) + public int initColumnValues(ColumnValueSelector selector, int columnIndex, Object[] values) { values[columnIndex] = selector.getDouble(); + return 0; } @Override - public Object getOnlyValue(ColumnValueSelector selector) + public int writeToKeyBuffer(int keyBufferPosition, ColumnValueSelector selector, ByteBuffer keyBuffer) { - return selector.getDouble(); - } - - @Override - public void writeToKeyBuffer(int keyBufferPosition, @Nullable Object obj, ByteBuffer keyBuffer) - { - keyBuffer.putDouble(keyBufferPosition, DimensionHandlerUtils.nullToZero((Double) obj)); + keyBuffer.putDouble(keyBufferPosition, selector.getDouble()); + return 0; } @Override public void initGroupingKeyColumnValue( int keyBufferPosition, - int columnIndex, + int dimensionIndex, Object rowObj, ByteBuffer keyBuffer, int[] stack ) { - writeToKeyBuffer(keyBufferPosition, rowObj, 
keyBuffer); - stack[columnIndex] = 1; + writeToKeyBuffer(keyBufferPosition, DimensionHandlerUtils.nullToZero((Double) rowObj), keyBuffer); + stack[dimensionIndex] = 1; } @Override @@ -102,4 +98,15 @@ public class DoubleGroupByColumnSelectorStrategy implements GroupByColumnSelecto stringComparator ); } + + @Override + public void reset() + { + // Nothing to do. + } + + private void writeToKeyBuffer(int keyBufferPosition, double value, ByteBuffer keyBuffer) + { + keyBuffer.putDouble(keyBufferPosition, value); + } } diff --git a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/column/FloatGroupByColumnSelectorStrategy.java b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/column/FloatGroupByColumnSelectorStrategy.java index 49fc1fb11bd..a01c3c3bd1b 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/column/FloatGroupByColumnSelectorStrategy.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/column/FloatGroupByColumnSelectorStrategy.java @@ -51,21 +51,17 @@ public class FloatGroupByColumnSelectorStrategy implements GroupByColumnSelector } @Override - public void initColumnValues(ColumnValueSelector selector, int columnIndex, Object[] valuess) + public int initColumnValues(ColumnValueSelector selector, int columnIndex, Object[] valuess) { valuess[columnIndex] = selector.getFloat(); + return 0; } @Override - public Object getOnlyValue(ColumnValueSelector selector) + public int writeToKeyBuffer(int keyBufferPosition, ColumnValueSelector selector, ByteBuffer keyBuffer) { - return selector.getFloat(); - } - - @Override - public void writeToKeyBuffer(int keyBufferPosition, @Nullable Object obj, ByteBuffer keyBuffer) - { - keyBuffer.putFloat(keyBufferPosition, DimensionHandlerUtils.nullToZero((Float) obj)); + keyBuffer.putFloat(keyBufferPosition, selector.getFloat()); + return 0; } @Override @@ -84,14 +80,14 @@ public class FloatGroupByColumnSelectorStrategy implements 
GroupByColumnSelector @Override public void initGroupingKeyColumnValue( int keyBufferPosition, - int columnIndex, + int dimensionIndex, Object rowObj, ByteBuffer keyBuffer, int[] stack ) { - writeToKeyBuffer(keyBufferPosition, rowObj, keyBuffer); - stack[columnIndex] = 1; + writeToKeyBuffer(keyBufferPosition, DimensionHandlerUtils.nullToZero((Float) rowObj), keyBuffer); + stack[dimensionIndex] = 1; } @Override @@ -106,4 +102,15 @@ public class FloatGroupByColumnSelectorStrategy implements GroupByColumnSelector // this method handles row values after the first in a multivalued row, so just return false return false; } + + @Override + public void reset() + { + // Nothing to do. + } + + private void writeToKeyBuffer(int keyBufferPosition, float value, ByteBuffer keyBuffer) + { + keyBuffer.putFloat(keyBufferPosition, value); + } } diff --git a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/column/GroupByColumnSelectorStrategy.java b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/column/GroupByColumnSelectorStrategy.java index 58250090f48..1a2bfc580a3 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/column/GroupByColumnSelectorStrategy.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/column/GroupByColumnSelectorStrategy.java @@ -33,6 +33,14 @@ import java.nio.ByteBuffer; * GroupByQueryEngineV2. * * Each GroupByColumnSelectorStrategy is associated with a single dimension. + * + * Strategies may have internal state, such as the dictionary maintained by + * {@link DictionaryBuildingStringGroupByColumnSelectorStrategy}. Callers should assume that the internal + * state footprint starts out empty (zero bytes) and is also reset to zero on each call to {@link #reset()}. 
Each call + * to {@link #initColumnValues} or {@link #writeToKeyBuffer(int, ColumnValueSelector, ByteBuffer)} returns the + * incremental increase in internal state footprint that happened as a result of that particular call. + * + * @see org.apache.druid.query.groupby.epinephelinae.vector.GroupByVectorColumnSelector the vectorized version */ public interface GroupByColumnSelectorStrategy extends ColumnSelectorStrategy { @@ -77,8 +85,11 @@ public interface GroupByColumnSelectorStrategy extends ColumnSelectorStrategy * @param selector Value selector for a column. * @param columnIndex Index of the column within the row values array * @param valuess Row values array, one index per column + * + * @return estimated increase in internal state footprint, in bytes, as a result of this operation. May be zero if + * memory did not increase as a result of this operation. Will not be negative. */ - void initColumnValues(ColumnValueSelector selector, int columnIndex, Object[] valuess); + int initColumnValues(ColumnValueSelector selector, int columnIndex, Object[] valuess); /** * Read the first value within a row values object (e. g. {@link org.apache.druid.segment.data.IndexedInts}, as the value in @@ -88,14 +99,14 @@ public interface GroupByColumnSelectorStrategy extends ColumnSelectorStrategy * If the size of the row is > 0, write 1 to stack[] at columnIndex, otherwise write 0. * * @param keyBufferPosition Starting offset for this column's value within the grouping key. 
- * @param columnIndex Index of the column within the row values array + * @param dimensionIndex Index of this dimension within the {@code stack} array * @param rowObj Row value object for this column * @param keyBuffer grouping key * @param stack array containing the current within-row value index for each column */ void initGroupingKeyColumnValue( int keyBufferPosition, - int columnIndex, + int dimensionIndex, Object rowObj, ByteBuffer keyBuffer, int[] stack @@ -123,30 +134,35 @@ public interface GroupByColumnSelectorStrategy extends ColumnSelectorStrategy ); /** - * Retrieve a single object using the {@link ColumnValueSelector}. The reading column must have a single value. - * - * @param selector Value selector for a column - * - * @return an object retrieved from the column - */ - Object getOnlyValue(ColumnValueSelector selector); - - /** - * Write a given object to the keyBuffer at keyBufferPosition. + * Write a single object from the given selector to the keyBuffer at keyBufferPosition. The reading column must + * have a single value. The position of the keyBuffer may be modified. * * @param keyBufferPosition starting offset for this column's value within the grouping key - * @param obj row value object retrieved from {@link #getOnlyValue(ColumnValueSelector)} + * @param selector selector to retrieve row value object from * @param keyBuffer grouping key + * + * @return estimated increase in internal state footprint, in bytes, as a result of this operation. May be zero if + * memory did not increase as a result of this operation. Will not be negative. */ - void writeToKeyBuffer(int keyBufferPosition, Object obj, ByteBuffer keyBuffer); + int writeToKeyBuffer(int keyBufferPosition, ColumnValueSelector selector, ByteBuffer keyBuffer); /** * Return BufferComparator for values written using this strategy when limit is pushed down to segment scan. 
+ * * @param keyBufferPosition starting offset for this column's value within the grouping key - * @param stringComparator stringComparator from LimitSpec for this column. If this is null, implementations - * will use the {@link org.apache.druid.query.ordering.StringComparators#LEXICOGRAPHIC} - * comparator. + * @param stringComparator stringComparator from LimitSpec for this column. If this is null, implementations + * will use the {@link org.apache.druid.query.ordering.StringComparators#LEXICOGRAPHIC} + * comparator. + * * @return BufferComparator for comparing values written */ Grouper.BufferComparator bufferComparator(int keyBufferPosition, @Nullable StringComparator stringComparator); + + /** + * Reset any internal state held by this selector. + * + * After this method is called, any row objects or key objects generated by any methods of this class must be + * considered unreadable. Calling {@link #processValueFromGroupingKey} on that memory has undefined behavior. + */ + void reset(); } diff --git a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/column/LongGroupByColumnSelectorStrategy.java b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/column/LongGroupByColumnSelectorStrategy.java index b70c5c22896..95d57e03da1 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/column/LongGroupByColumnSelectorStrategy.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/column/LongGroupByColumnSelectorStrategy.java @@ -51,21 +51,17 @@ public class LongGroupByColumnSelectorStrategy implements GroupByColumnSelectorS } @Override - public void initColumnValues(ColumnValueSelector selector, int columnIndex, Object[] valuess) + public int initColumnValues(ColumnValueSelector selector, int columnIndex, Object[] valuess) { valuess[columnIndex] = selector.getLong(); + return 0; } @Override - public Object getOnlyValue(ColumnValueSelector selector) + public int 
writeToKeyBuffer(int keyBufferPosition, ColumnValueSelector selector, ByteBuffer keyBuffer) { - return selector.getLong(); - } - - @Override - public void writeToKeyBuffer(int keyBufferPosition, @Nullable Object obj, ByteBuffer keyBuffer) - { - keyBuffer.putLong(keyBufferPosition, DimensionHandlerUtils.nullToZero((Long) obj)); + keyBuffer.putLong(keyBufferPosition, selector.getLong()); + return 0; } @Override @@ -84,14 +80,14 @@ public class LongGroupByColumnSelectorStrategy implements GroupByColumnSelectorS @Override public void initGroupingKeyColumnValue( int keyBufferPosition, - int columnIndex, + int dimensionIndex, Object rowObj, ByteBuffer keyBuffer, int[] stack ) { - writeToKeyBuffer(keyBufferPosition, rowObj, keyBuffer); - stack[columnIndex] = 1; + writeToKeyBuffer(keyBufferPosition, DimensionHandlerUtils.nullToZero((Long) rowObj), keyBuffer); + stack[dimensionIndex] = 1; } @Override @@ -106,4 +102,15 @@ public class LongGroupByColumnSelectorStrategy implements GroupByColumnSelectorS // this method handles row values after the first in a multivalued row, so just return false return false; } + + @Override + public void reset() + { + // Nothing to do. 
+ } + + public void writeToKeyBuffer(int keyBufferPosition, long value, ByteBuffer keyBuffer) + { + keyBuffer.putLong(keyBufferPosition, value); + } } diff --git a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/column/NullableNumericGroupByColumnSelectorStrategy.java b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/column/NullableNumericGroupByColumnSelectorStrategy.java index 663a0dd2b21..34af7621139 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/column/NullableNumericGroupByColumnSelectorStrategy.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/column/NullableNumericGroupByColumnSelectorStrategy.java @@ -38,10 +38,13 @@ import java.nio.ByteBuffer; public class NullableNumericGroupByColumnSelectorStrategy implements GroupByColumnSelectorStrategy { private final GroupByColumnSelectorStrategy delegate; + private final byte[] nullKeyBytes; public NullableNumericGroupByColumnSelectorStrategy(GroupByColumnSelectorStrategy delegate) { this.delegate = delegate; + this.nullKeyBytes = new byte[delegate.getGroupingKeySize() + 1]; + this.nullKeyBytes[0] = NullHandling.IS_NULL_BYTE; } @Override @@ -66,34 +69,27 @@ public class NullableNumericGroupByColumnSelectorStrategy implements GroupByColu } @Override - public void initColumnValues(ColumnValueSelector selector, int columnIndex, Object[] values) + public int initColumnValues(ColumnValueSelector selector, int columnIndex, Object[] values) { if (selector.isNull()) { values[columnIndex] = null; + return 0; } else { - delegate.initColumnValues(selector, columnIndex, values); + return delegate.initColumnValues(selector, columnIndex, values); } } @Override - @Nullable - public Object getOnlyValue(ColumnValueSelector selector) + public int writeToKeyBuffer(int keyBufferPosition, ColumnValueSelector selector, ByteBuffer keyBuffer) { if (selector.isNull()) { - return null; - } - return delegate.getOnlyValue(selector); - 
} - - @Override - public void writeToKeyBuffer(int keyBufferPosition, @Nullable Object obj, ByteBuffer keyBuffer) - { - if (obj == null) { - keyBuffer.put(keyBufferPosition, NullHandling.IS_NULL_BYTE); + keyBuffer.position(keyBufferPosition); + keyBuffer.put(nullKeyBytes); + return 0; } else { keyBuffer.put(keyBufferPosition, NullHandling.IS_NOT_NULL_BYTE); + return delegate.writeToKeyBuffer(keyBufferPosition + Byte.BYTES, selector, keyBuffer); } - delegate.writeToKeyBuffer(keyBufferPosition + Byte.BYTES, obj, keyBuffer); } @Override @@ -111,14 +107,27 @@ public class NullableNumericGroupByColumnSelectorStrategy implements GroupByColu @Override public void initGroupingKeyColumnValue( int keyBufferPosition, - int columnIndex, + int dimensionIndex, Object rowObj, ByteBuffer keyBuffer, int[] stack ) { - writeToKeyBuffer(keyBufferPosition, rowObj, keyBuffer); - stack[columnIndex] = 1; + if (rowObj == null) { + keyBuffer.position(keyBufferPosition); + keyBuffer.put(nullKeyBytes); + } else { + keyBuffer.put(keyBufferPosition, NullHandling.IS_NOT_NULL_BYTE); + + // No need to update stack ourselves; we expect the delegate to do this. 
+ delegate.initGroupingKeyColumnValue( + keyBufferPosition + Byte.BYTES, + dimensionIndex, + rowObj, + keyBuffer, + stack + ); + } } @Override @@ -133,4 +142,10 @@ public class NullableNumericGroupByColumnSelectorStrategy implements GroupByColu // this method handles row values after the first in a multivalued row, so just return false return false; } + + @Override + public void reset() + { + delegate.reset(); + } } diff --git a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/column/StringGroupByColumnSelectorStrategy.java b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/column/StringGroupByColumnSelectorStrategy.java index a128d10eba3..8c25c775d71 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/column/StringGroupByColumnSelectorStrategy.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/column/StringGroupByColumnSelectorStrategy.java @@ -76,32 +76,39 @@ public class StringGroupByColumnSelectorStrategy implements GroupByColumnSelecto } @Override - public void initColumnValues(ColumnValueSelector selector, int columnIndex, Object[] valuess) + public int initColumnValues(ColumnValueSelector selector, int columnIndex, Object[] valuess) { DimensionSelector dimSelector = (DimensionSelector) selector; IndexedInts row = dimSelector.getRow(); valuess[columnIndex] = row; + return 0; + } + + /** + * Writes a dictionary ID to the grouping key. + * + * Protected so subclasses can access it, like {@link DictionaryBuildingStringGroupByColumnSelectorStrategy}. 
+ */ + protected void writeToKeyBuffer(int keyBufferPosition, int dictId, ByteBuffer keyBuffer) + { + keyBuffer.putInt(keyBufferPosition, dictId); } @Override - public Object getOnlyValue(ColumnValueSelector selector) + public int writeToKeyBuffer(int keyBufferPosition, ColumnValueSelector selector, ByteBuffer keyBuffer) { final DimensionSelector dimSelector = (DimensionSelector) selector; final IndexedInts row = dimSelector.getRow(); Preconditions.checkState(row.size() < 2, "Not supported for multi-value dimensions"); - return row.size() == 1 ? row.get(0) : GROUP_BY_MISSING_VALUE; - } - - @Override - public void writeToKeyBuffer(int keyBufferPosition, Object obj, ByteBuffer keyBuffer) - { - keyBuffer.putInt(keyBufferPosition, (int) obj); + final int dictId = row.size() == 1 ? row.get(0) : GROUP_BY_MISSING_VALUE; + keyBuffer.putInt(keyBufferPosition, dictId); + return 0; } @Override public void initGroupingKeyColumnValue( int keyBufferPosition, - int columnIndex, + int dimensionIndex, Object rowObj, ByteBuffer keyBuffer, int[] stack @@ -111,7 +118,7 @@ public class StringGroupByColumnSelectorStrategy implements GroupByColumnSelecto int rowSize = row.size(); initializeGroupingKeyV2Dimension(row, rowSize, keyBuffer, keyBufferPosition); - stack[columnIndex] = rowSize == 0 ? 0 : 1; + stack[dimensionIndex] = rowSize == 0 ? 0 : 1; } @Override @@ -172,4 +179,10 @@ public class StringGroupByColumnSelectorStrategy implements GroupByColumnSelecto }; } } + + @Override + public void reset() + { + // Nothing to do. 
+ } } diff --git a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/vector/DictionaryBuildingSingleValueStringGroupByVectorColumnSelector.java b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/vector/DictionaryBuildingSingleValueStringGroupByVectorColumnSelector.java index d83166f328f..d5db5c984c5 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/vector/DictionaryBuildingSingleValueStringGroupByVectorColumnSelector.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/vector/DictionaryBuildingSingleValueStringGroupByVectorColumnSelector.java @@ -24,6 +24,7 @@ import org.apache.datasketches.memory.Memory; import org.apache.datasketches.memory.WritableMemory; import org.apache.druid.common.config.NullHandling; import org.apache.druid.query.groupby.ResultRow; +import org.apache.druid.query.groupby.epinephelinae.DictionaryBuilding; import org.apache.druid.segment.vector.VectorObjectSelector; import java.util.ArrayList; @@ -42,20 +43,15 @@ public class DictionaryBuildingSingleValueStringGroupByVectorColumnSelector impl private final VectorObjectSelector selector; - private int nextId = 0; private final List dictionary = new ArrayList<>(); private final Object2IntOpenHashMap reverseDictionary = new Object2IntOpenHashMap<>(); - { - reverseDictionary.defaultReturnValue(-1); - } - public DictionaryBuildingSingleValueStringGroupByVectorColumnSelector(VectorObjectSelector selector) { this.selector = selector; + this.reverseDictionary.defaultReturnValue(-1); } - @Override public int getGroupingKeySize() { @@ -63,7 +59,7 @@ public class DictionaryBuildingSingleValueStringGroupByVectorColumnSelector impl } @Override - public void writeKeys( + public int writeKeys( final WritableMemory keySpace, final int keySize, final int keyOffset, @@ -72,19 +68,26 @@ public class DictionaryBuildingSingleValueStringGroupByVectorColumnSelector impl ) { final Object[] vector = 
selector.getObjectVector(); + int stateFootprintIncrease = 0; for (int i = startRow, j = keyOffset; i < endRow; i++, j += keySize) { final String value = (String) vector[i]; final int dictId = reverseDictionary.getInt(value); if (dictId < 0) { + final int nextId = dictionary.size(); dictionary.add(value); reverseDictionary.put(value, nextId); keySpace.putInt(j, nextId); - nextId++; + + // Use same ROUGH_OVERHEAD_PER_DICTIONARY_ENTRY as the nonvectorized version; dictionary structure is the same. + stateFootprintIncrease += + DictionaryBuilding.estimateEntryFootprint((value == null ? 0 : value.length()) * Character.BYTES); } else { keySpace.putInt(j, dictId); } } + + return stateFootprintIncrease; } @Override @@ -104,4 +107,11 @@ public class DictionaryBuildingSingleValueStringGroupByVectorColumnSelector impl resultRow.set(resultRowPosition, NullHandling.defaultStringValue()); } } + + @Override + public void reset() + { + dictionary.clear(); + reverseDictionary.clear(); + } } diff --git a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/vector/DoubleGroupByVectorColumnSelector.java b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/vector/DoubleGroupByVectorColumnSelector.java index 437d2f82eb6..7830483f46d 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/vector/DoubleGroupByVectorColumnSelector.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/vector/DoubleGroupByVectorColumnSelector.java @@ -40,7 +40,7 @@ public class DoubleGroupByVectorColumnSelector implements GroupByVectorColumnSel } @Override - public void writeKeys( + public int writeKeys( final WritableMemory keySpace, final int keySize, final int keyOffset, @@ -57,6 +57,8 @@ public class DoubleGroupByVectorColumnSelector implements GroupByVectorColumnSel keySpace.putDouble(j, vector[i]); } } + + return 0; } @Override @@ -69,4 +71,10 @@ public class DoubleGroupByVectorColumnSelector implements 
GroupByVectorColumnSel { resultRow.set(resultRowPosition, keyMemory.getDouble(keyOffset)); } + + @Override + public void reset() + { + // Nothing to do. + } } diff --git a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/vector/FloatGroupByVectorColumnSelector.java b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/vector/FloatGroupByVectorColumnSelector.java index c404932746e..c0dc92b6b99 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/vector/FloatGroupByVectorColumnSelector.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/vector/FloatGroupByVectorColumnSelector.java @@ -40,7 +40,7 @@ public class FloatGroupByVectorColumnSelector implements GroupByVectorColumnSele } @Override - public void writeKeys( + public int writeKeys( final WritableMemory keySpace, final int keySize, final int keyOffset, @@ -57,6 +57,8 @@ public class FloatGroupByVectorColumnSelector implements GroupByVectorColumnSele keySpace.putFloat(j, vector[i]); } } + + return 0; } @Override @@ -69,4 +71,10 @@ public class FloatGroupByVectorColumnSelector implements GroupByVectorColumnSele { resultRow.set(resultRowPosition, keyMemory.getFloat(keyOffset)); } + + @Override + public void reset() + { + // Nothing to do. + } } diff --git a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/vector/GroupByVectorColumnSelector.java b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/vector/GroupByVectorColumnSelector.java index 707980f1011..70beec0e364 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/vector/GroupByVectorColumnSelector.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/vector/GroupByVectorColumnSelector.java @@ -26,7 +26,14 @@ import org.apache.druid.query.groupby.ResultRow; /** * Column processor for groupBy dimensions. 
* + * Processors may have internal state, such as the dictionary maintained by + * {@link DictionaryBuildingSingleValueStringGroupByVectorColumnSelector}. Callers should assume that the internal + * state footprint starts out empty (zero bytes) and is also reset to zero on each call to {@link #reset()}. Each call + * to {@link #writeKeys} returns the incremental increase in internal state footprint that happened as a result + * of that particular call. + * * @see GroupByVectorColumnProcessorFactory + * @see org.apache.druid.query.groupby.epinephelinae.column.GroupByColumnSelectorStrategy the nonvectorized version */ public interface GroupByVectorColumnSelector { @@ -44,10 +51,13 @@ public interface GroupByVectorColumnSelector * @param keyOffset starting position for the first key part within keySpace * @param startRow starting row (inclusive) within the current vector * @param endRow ending row (exclusive) within the current vector + * + * @return estimated increase in internal state footprint, in bytes, as a result of this operation. May be zero if + * memory did not increase as a result of this operation. Will not be negative. */ // False positive unused inspection warning for "keySize": https://youtrack.jetbrains.com/issue/IDEA-231034 @SuppressWarnings("unused") - void writeKeys(WritableMemory keySpace, int keySize, int keyOffset, int startRow, int endRow); + int writeKeys(WritableMemory keySpace, int keySize, int keyOffset, int startRow, int endRow); /** * Write key parts for this column into a particular result row. @@ -63,4 +73,12 @@ public interface GroupByVectorColumnSelector ResultRow resultRow, int resultRowPosition ); + + /** + * Reset any internal state held by this selector. + * + * After this method is called, any memory previously written by {@link #writeKeys} must be considered unreadable. + * Calling {@link #writeKeyToResultRow} on that memory has undefined behavior. 
+ */ + void reset(); } diff --git a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/vector/LongGroupByVectorColumnSelector.java b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/vector/LongGroupByVectorColumnSelector.java index f806ef8481b..c8756967525 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/vector/LongGroupByVectorColumnSelector.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/vector/LongGroupByVectorColumnSelector.java @@ -40,7 +40,7 @@ public class LongGroupByVectorColumnSelector implements GroupByVectorColumnSelec } @Override - public void writeKeys( + public int writeKeys( final WritableMemory keySpace, final int keySize, final int keyOffset, @@ -57,6 +57,8 @@ public class LongGroupByVectorColumnSelector implements GroupByVectorColumnSelec keySpace.putLong(j, vector[i]); } } + + return 0; } @Override @@ -69,4 +71,10 @@ public class LongGroupByVectorColumnSelector implements GroupByVectorColumnSelec { resultRow.set(resultRowPosition, keyMemory.getLong(keyOffset)); } + + @Override + public void reset() + { + // Nothing to do. 
+ } } diff --git a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/vector/NilGroupByVectorColumnSelector.java b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/vector/NilGroupByVectorColumnSelector.java index e70eaa796aa..26c33697fc1 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/vector/NilGroupByVectorColumnSelector.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/vector/NilGroupByVectorColumnSelector.java @@ -42,9 +42,10 @@ public class NilGroupByVectorColumnSelector implements GroupByVectorColumnSelect } @Override - public void writeKeys(WritableMemory keySpace, int keySize, int keyOffset, int startRow, int endRow) + public int writeKeys(WritableMemory keySpace, int keySize, int keyOffset, int startRow, int endRow) { // Nothing to do. + return 0; } @Override @@ -52,4 +53,10 @@ public class NilGroupByVectorColumnSelector implements GroupByVectorColumnSelect { resultRow.set(resultRowPosition, null); } + + @Override + public void reset() + { + // Nothing to do. 
+ } } diff --git a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/vector/NullableDoubleGroupByVectorColumnSelector.java b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/vector/NullableDoubleGroupByVectorColumnSelector.java index 26a5844f99c..db9d470f3d3 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/vector/NullableDoubleGroupByVectorColumnSelector.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/vector/NullableDoubleGroupByVectorColumnSelector.java @@ -41,7 +41,7 @@ public class NullableDoubleGroupByVectorColumnSelector implements GroupByVectorC } @Override - public void writeKeys( + public int writeKeys( final WritableMemory keySpace, final int keySize, final int keyOffset, @@ -63,6 +63,8 @@ public class NullableDoubleGroupByVectorColumnSelector implements GroupByVectorC keySpace.putDouble(j + 1, vector[i]); } } + + return 0; } @Override @@ -79,4 +81,10 @@ public class NullableDoubleGroupByVectorColumnSelector implements GroupByVectorC resultRow.set(resultRowPosition, keyMemory.getDouble(keyOffset + 1)); } } + + @Override + public void reset() + { + // Nothing to do. 
+ } } diff --git a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/vector/NullableFloatGroupByVectorColumnSelector.java b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/vector/NullableFloatGroupByVectorColumnSelector.java index a2a0c609474..238e4cf01f0 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/vector/NullableFloatGroupByVectorColumnSelector.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/vector/NullableFloatGroupByVectorColumnSelector.java @@ -41,7 +41,7 @@ public class NullableFloatGroupByVectorColumnSelector implements GroupByVectorCo } @Override - public void writeKeys( + public int writeKeys( final WritableMemory keySpace, final int keySize, final int keyOffset, @@ -63,6 +63,8 @@ public class NullableFloatGroupByVectorColumnSelector implements GroupByVectorCo keySpace.putFloat(j + 1, vector[i]); } } + + return 0; } @Override @@ -79,4 +81,10 @@ public class NullableFloatGroupByVectorColumnSelector implements GroupByVectorCo resultRow.set(resultRowPosition, keyMemory.getFloat(keyOffset + 1)); } } + + @Override + public void reset() + { + // Nothing to do. 
+ } } diff --git a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/vector/NullableLongGroupByVectorColumnSelector.java b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/vector/NullableLongGroupByVectorColumnSelector.java index ab020e282f6..8ea0b7b8511 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/vector/NullableLongGroupByVectorColumnSelector.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/vector/NullableLongGroupByVectorColumnSelector.java @@ -41,7 +41,7 @@ public class NullableLongGroupByVectorColumnSelector implements GroupByVectorCol } @Override - public void writeKeys( + public int writeKeys( final WritableMemory keySpace, final int keySize, final int keyOffset, @@ -63,6 +63,8 @@ public class NullableLongGroupByVectorColumnSelector implements GroupByVectorCol keySpace.putLong(j + 1, vector[i]); } } + + return 0; } @Override @@ -79,4 +81,10 @@ public class NullableLongGroupByVectorColumnSelector implements GroupByVectorCol resultRow.set(resultRowPosition, keyMemory.getLong(keyOffset + 1)); } } + + @Override + public void reset() + { + // Nothing to do. 
+ } } diff --git a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/vector/SingleValueStringGroupByVectorColumnSelector.java b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/vector/SingleValueStringGroupByVectorColumnSelector.java index 2dfdb0e25cf..f82f4db4132 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/vector/SingleValueStringGroupByVectorColumnSelector.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/vector/SingleValueStringGroupByVectorColumnSelector.java @@ -40,7 +40,7 @@ public class SingleValueStringGroupByVectorColumnSelector implements GroupByVect } @Override - public void writeKeys( + public int writeKeys( final WritableMemory keySpace, final int keySize, final int keyOffset, @@ -57,6 +57,8 @@ public class SingleValueStringGroupByVectorColumnSelector implements GroupByVect keySpace.putInt(j, vector[i]); } } + + return 0; } @Override @@ -70,4 +72,10 @@ public class SingleValueStringGroupByVectorColumnSelector implements GroupByVect final int id = keyMemory.getInt(keyOffset); resultRow.set(resultRowPosition, selector.lookupName(id)); } + + @Override + public void reset() + { + // Nothing to do. + } } diff --git a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/vector/VectorGroupByEngine.java b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/vector/VectorGroupByEngine.java index 1ead1d851cb..c8674258a0e 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/vector/VectorGroupByEngine.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/vector/VectorGroupByEngine.java @@ -244,8 +244,13 @@ public class VectorGroupByEngine @Nullable private Interval bucketInterval; + // -1 if the current vector was fully aggregated after a call to "initNewDelegate". Otherwise, the number of + // rows of the current vector that were aggregated. 
private int partiallyAggregatedRows = -1; + // Sum of internal state footprint across all "selectors". + private long selectorInternalFootprint = 0; + @Nullable private CloseableGrouperIterator delegate = null; @@ -304,6 +309,8 @@ public class VectorGroupByEngine if (delegate != null) { delegate.close(); vectorGrouper.reset(); + selectors.forEach(GroupByVectorColumnSelector::reset); + selectorInternalFootprint = 0; } delegate = initNewDelegate(); @@ -390,7 +397,11 @@ public class VectorGroupByEngine // Write keys to the keySpace. int keyOffset = 0; for (final GroupByVectorColumnSelector selector : selectors) { - selector.writeKeys(keySpace, keySize, keyOffset, startOffset, granulizer.getEndOffset()); + // Update selectorInternalFootprint now, but check it later. (We reset on the first vector that causes us + // to go past the limit.) + selectorInternalFootprint += + selector.writeKeys(keySpace, keySize, keyOffset, startOffset, granulizer.getEndOffset()); + keyOffset += selector.getGroupingKeySize(); } @@ -420,6 +431,8 @@ public class VectorGroupByEngine // Advance bucketInterval. bucketInterval = bucketIterator.hasNext() ? 
bucketIterator.next() : null; break; + } else if (selectorInternalFootprint > querySpecificConfig.getMaxSelectorDictionarySize()) { + break; } } diff --git a/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryConfigTest.java b/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryConfigTest.java index fb0090f4212..0ce29983cc6 100644 --- a/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryConfigTest.java +++ b/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryConfigTest.java @@ -39,8 +39,9 @@ public class GroupByQueryConfigTest .put("maxIntermediateRows", "2") .put("maxResults", "3") .put("maxOnDiskStorage", "4") - .put("maxMergingDictionarySize", "5") - .put("bufferGrouperMaxLoadFactor", "6") + .put("maxSelectorDictionarySize", "5") + .put("maxMergingDictionarySize", "6") + .put("bufferGrouperMaxLoadFactor", "7") .build(); @Test @@ -54,8 +55,9 @@ public class GroupByQueryConfigTest Assert.assertEquals(2, config.getMaxIntermediateRows()); Assert.assertEquals(3, config.getMaxResults()); Assert.assertEquals(4, config.getMaxOnDiskStorage()); - Assert.assertEquals(5, config.getMaxMergingDictionarySize()); - Assert.assertEquals(6.0, config.getBufferGrouperMaxLoadFactor(), 0.0); + Assert.assertEquals(5, config.getMaxSelectorDictionarySize()); + Assert.assertEquals(6, config.getMaxMergingDictionarySize()); + Assert.assertEquals(7.0, config.getBufferGrouperMaxLoadFactor(), 0.0); Assert.assertFalse(config.isApplyLimitPushDownToSegment()); } @@ -77,8 +79,9 @@ public class GroupByQueryConfigTest Assert.assertEquals(2, config2.getMaxIntermediateRows()); Assert.assertEquals(3, config2.getMaxResults()); Assert.assertEquals(4, config2.getMaxOnDiskStorage()); - Assert.assertEquals(5, config2.getMaxMergingDictionarySize()); - Assert.assertEquals(6.0, config2.getBufferGrouperMaxLoadFactor(), 0.0); + Assert.assertEquals(5, config2.getMaxSelectorDictionarySize()); + Assert.assertEquals(6, 
config2.getMaxMergingDictionarySize()); + Assert.assertEquals(7.0, config2.getBufferGrouperMaxLoadFactor(), 0.0); Assert.assertFalse(config2.isApplyLimitPushDownToSegment()); } @@ -92,13 +95,14 @@ public class GroupByQueryConfigTest .setInterval(Intervals.of("2000/P1D")) .setGranularity(Granularities.ALL) .setContext( - ImmutableMap.of( - "groupByStrategy", "v1", - "maxOnDiskStorage", 0, - "maxResults", 2, - "maxMergingDictionarySize", 3, - "applyLimitPushDownToSegment", true - ) + ImmutableMap.<String, Object>builder() + .put("groupByStrategy", "v1") + .put("maxOnDiskStorage", 0) + .put("maxResults", 2) + .put("maxSelectorDictionarySize", 3) + .put("maxMergingDictionarySize", 4) + .put("applyLimitPushDownToSegment", true) + .build() ) .build() ); @@ -109,8 +113,9 @@ public class GroupByQueryConfigTest Assert.assertEquals(2, config2.getMaxIntermediateRows()); Assert.assertEquals(2, config2.getMaxResults()); Assert.assertEquals(0, config2.getMaxOnDiskStorage()); - Assert.assertEquals(3, config2.getMaxMergingDictionarySize()); - Assert.assertEquals(6.0, config2.getBufferGrouperMaxLoadFactor(), 0.0); + Assert.assertEquals(3, config2.getMaxSelectorDictionarySize()); + Assert.assertEquals(4, config2.getMaxMergingDictionarySize()); + Assert.assertEquals(7.0, config2.getBufferGrouperMaxLoadFactor(), 0.0); Assert.assertTrue(config2.isApplyLimitPushDownToSegment()); } } diff --git a/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerTest.java b/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerTest.java index 5f1deeeb593..ee1340dada3 100644 --- a/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerTest.java +++ b/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerTest.java @@ -297,6 +297,12 @@ public class GroupByQueryRunnerTest extends InitializedNullHandlingTest return GroupByStrategySelector.STRATEGY_V2; } + @Override + public long getMaxSelectorDictionarySize() + { + return 20; + } + @Override 
public long getMaxMergingDictionarySize() { diff --git a/processing/src/test/java/org/apache/druid/query/groupby/epinephelinae/column/ArrayDoubleGroupByColumnSelectorStrategyTest.java b/processing/src/test/java/org/apache/druid/query/groupby/epinephelinae/column/ArrayDoubleGroupByColumnSelectorStrategyTest.java index 7b66225f004..4acdef07483 100644 --- a/processing/src/test/java/org/apache/druid/query/groupby/epinephelinae/column/ArrayDoubleGroupByColumnSelectorStrategyTest.java +++ b/processing/src/test/java/org/apache/druid/query/groupby/epinephelinae/column/ArrayDoubleGroupByColumnSelectorStrategyTest.java @@ -115,7 +115,7 @@ public class ArrayDoubleGroupByColumnSelectorStrategyTest { ColumnValueSelector columnValueSelector = Mockito.mock(ColumnValueSelector.class); Mockito.when(columnValueSelector.getObject()).thenReturn(ImmutableList.of(1.0, 2.0)); - Assert.assertEquals(0, strategy.getOnlyValue(columnValueSelector)); + Assert.assertEquals(0, strategy.computeDictionaryId(columnValueSelector)); GroupByColumnSelectorPlus groupByColumnSelectorPlus = Mockito.mock(GroupByColumnSelectorPlus.class); Mockito.when(groupByColumnSelectorPlus.getResultRowPosition()).thenReturn(0); @@ -123,7 +123,7 @@ public class ArrayDoubleGroupByColumnSelectorStrategyTest buffer1.putInt(0); strategy.processValueFromGroupingKey(groupByColumnSelectorPlus, buffer1, row, 0); - Assert.assertEquals(new ComparableList(ImmutableList.of(1.0, 2.0)), row.get(0)); + Assert.assertEquals(new ComparableList<>(ImmutableList.of(1.0, 2.0)), row.get(0)); } @@ -132,7 +132,7 @@ public class ArrayDoubleGroupByColumnSelectorStrategyTest { ColumnValueSelector columnValueSelector = Mockito.mock(ColumnValueSelector.class); Mockito.when(columnValueSelector.getObject()).thenReturn(ImmutableList.of(4.0, 2.0)); - Assert.assertEquals(3, strategy.getOnlyValue(columnValueSelector)); + Assert.assertEquals(3, strategy.computeDictionaryId(columnValueSelector)); GroupByColumnSelectorPlus groupByColumnSelectorPlus = 
Mockito.mock(GroupByColumnSelectorPlus.class); Mockito.when(groupByColumnSelectorPlus.getResultRowPosition()).thenReturn(0); @@ -140,7 +140,7 @@ public class ArrayDoubleGroupByColumnSelectorStrategyTest buffer1.putInt(3); strategy.processValueFromGroupingKey(groupByColumnSelectorPlus, buffer1, row, 0); - Assert.assertEquals(new ComparableList(ImmutableList.of(4.0, 2.0)), row.get(0)); + Assert.assertEquals(new ComparableList<>(ImmutableList.of(4.0, 2.0)), row.get(0)); } @Test @@ -148,14 +148,14 @@ public class ArrayDoubleGroupByColumnSelectorStrategyTest { ColumnValueSelector columnValueSelector = Mockito.mock(ColumnValueSelector.class); Mockito.when(columnValueSelector.getObject()).thenReturn(new Object[]{4.0D, 2.0D}); - Assert.assertEquals(3, strategy.getOnlyValue(columnValueSelector)); + Assert.assertEquals(3, strategy.computeDictionaryId(columnValueSelector)); GroupByColumnSelectorPlus groupByColumnSelectorPlus = Mockito.mock(GroupByColumnSelectorPlus.class); Mockito.when(groupByColumnSelectorPlus.getResultRowPosition()).thenReturn(0); ResultRow row = ResultRow.create(1); buffer1.putInt(3); strategy.processValueFromGroupingKey(groupByColumnSelectorPlus, buffer1, row, 0); - Assert.assertEquals(new ComparableList(ImmutableList.of(4.0, 2.0)), row.get(0)); + Assert.assertEquals(new ComparableList<>(ImmutableList.of(4.0, 2.0)), row.get(0)); } @After diff --git a/processing/src/test/java/org/apache/druid/query/groupby/epinephelinae/column/ArrayLongGroupByColumnSelectorStrategyTest.java b/processing/src/test/java/org/apache/druid/query/groupby/epinephelinae/column/ArrayLongGroupByColumnSelectorStrategyTest.java index 51325b50031..74be5380c84 100644 --- a/processing/src/test/java/org/apache/druid/query/groupby/epinephelinae/column/ArrayLongGroupByColumnSelectorStrategyTest.java +++ b/processing/src/test/java/org/apache/druid/query/groupby/epinephelinae/column/ArrayLongGroupByColumnSelectorStrategyTest.java @@ -118,7 +118,7 @@ public class 
ArrayLongGroupByColumnSelectorStrategyTest { ColumnValueSelector columnValueSelector = Mockito.mock(ColumnValueSelector.class); Mockito.when(columnValueSelector.getObject()).thenReturn(ImmutableList.of(1L, 2L)); - Assert.assertEquals(0, strategy.getOnlyValue(columnValueSelector)); + Assert.assertEquals(0, strategy.computeDictionaryId(columnValueSelector)); GroupByColumnSelectorPlus groupByColumnSelectorPlus = Mockito.mock(GroupByColumnSelectorPlus.class); Mockito.when(groupByColumnSelectorPlus.getResultRowPosition()).thenReturn(0); @@ -126,7 +126,7 @@ public class ArrayLongGroupByColumnSelectorStrategyTest buffer1.putInt(0); strategy.processValueFromGroupingKey(groupByColumnSelectorPlus, buffer1, row, 0); - Assert.assertEquals(new ComparableList(ImmutableList.of(1L, 2L)), row.get(0)); + Assert.assertEquals(new ComparableList<>(ImmutableList.of(1L, 2L)), row.get(0)); } @@ -135,7 +135,7 @@ public class ArrayLongGroupByColumnSelectorStrategyTest { ColumnValueSelector columnValueSelector = Mockito.mock(ColumnValueSelector.class); Mockito.when(columnValueSelector.getObject()).thenReturn(ImmutableList.of(4L, 2L)); - Assert.assertEquals(3, strategy.getOnlyValue(columnValueSelector)); + Assert.assertEquals(3, strategy.computeDictionaryId(columnValueSelector)); GroupByColumnSelectorPlus groupByColumnSelectorPlus = Mockito.mock(GroupByColumnSelectorPlus.class); Mockito.when(groupByColumnSelectorPlus.getResultRowPosition()).thenReturn(0); @@ -143,7 +143,7 @@ public class ArrayLongGroupByColumnSelectorStrategyTest buffer1.putInt(3); strategy.processValueFromGroupingKey(groupByColumnSelectorPlus, buffer1, row, 0); - Assert.assertEquals(new ComparableList(ImmutableList.of(4L, 2L)), row.get(0)); + Assert.assertEquals(new ComparableList<>(ImmutableList.of(4L, 2L)), row.get(0)); } @Test @@ -151,14 +151,14 @@ public class ArrayLongGroupByColumnSelectorStrategyTest { ColumnValueSelector columnValueSelector = Mockito.mock(ColumnValueSelector.class); 
Mockito.when(columnValueSelector.getObject()).thenReturn(new Object[]{4L, 2L}); - Assert.assertEquals(3, strategy.getOnlyValue(columnValueSelector)); + Assert.assertEquals(3, strategy.computeDictionaryId(columnValueSelector)); GroupByColumnSelectorPlus groupByColumnSelectorPlus = Mockito.mock(GroupByColumnSelectorPlus.class); Mockito.when(groupByColumnSelectorPlus.getResultRowPosition()).thenReturn(0); ResultRow row = ResultRow.create(1); buffer1.putInt(3); strategy.processValueFromGroupingKey(groupByColumnSelectorPlus, buffer1, row, 0); - Assert.assertEquals(new ComparableList(ImmutableList.of(4L, 2L)), row.get(0)); + Assert.assertEquals(new ComparableList<>(ImmutableList.of(4L, 2L)), row.get(0)); } @After diff --git a/processing/src/test/java/org/apache/druid/query/groupby/epinephelinae/column/ArrayStringGroupByColumnSelectorStrategyTest.java b/processing/src/test/java/org/apache/druid/query/groupby/epinephelinae/column/ArrayStringGroupByColumnSelectorStrategyTest.java index a1d70b54fc3..3be1354412d 100644 --- a/processing/src/test/java/org/apache/druid/query/groupby/epinephelinae/column/ArrayStringGroupByColumnSelectorStrategyTest.java +++ b/processing/src/test/java/org/apache/druid/query/groupby/epinephelinae/column/ArrayStringGroupByColumnSelectorStrategyTest.java @@ -127,7 +127,7 @@ public class ArrayStringGroupByColumnSelectorStrategyTest { ColumnValueSelector columnValueSelector = Mockito.mock(ColumnValueSelector.class); Mockito.when(columnValueSelector.getObject()).thenReturn(ImmutableList.of("a", "b")); - Assert.assertEquals(0, strategy.getOnlyValue(columnValueSelector)); + Assert.assertEquals(0, strategy.computeDictionaryId(columnValueSelector)); GroupByColumnSelectorPlus groupByColumnSelectorPlus = Mockito.mock(GroupByColumnSelectorPlus.class); Mockito.when(groupByColumnSelectorPlus.getResultRowPosition()).thenReturn(0); @@ -144,7 +144,7 @@ public class ArrayStringGroupByColumnSelectorStrategyTest { ColumnValueSelector columnValueSelector = 
Mockito.mock(ColumnValueSelector.class); Mockito.when(columnValueSelector.getObject()).thenReturn(ImmutableList.of("f", "a")); - Assert.assertEquals(3, strategy.getOnlyValue(columnValueSelector)); + Assert.assertEquals(3, strategy.computeDictionaryId(columnValueSelector)); GroupByColumnSelectorPlus groupByColumnSelectorPlus = Mockito.mock(GroupByColumnSelectorPlus.class); Mockito.when(groupByColumnSelectorPlus.getResultRowPosition()).thenReturn(0); @@ -160,7 +160,7 @@ public class ArrayStringGroupByColumnSelectorStrategyTest { ColumnValueSelector columnValueSelector = Mockito.mock(ColumnValueSelector.class); Mockito.when(columnValueSelector.getObject()).thenReturn(new Object[]{"f", "a"}); - Assert.assertEquals(3, strategy.getOnlyValue(columnValueSelector)); + Assert.assertEquals(3, strategy.computeDictionaryId(columnValueSelector)); GroupByColumnSelectorPlus groupByColumnSelectorPlus = Mockito.mock(GroupByColumnSelectorPlus.class); Mockito.when(groupByColumnSelectorPlus.getResultRowPosition()).thenReturn(0); diff --git a/website/.spelling b/website/.spelling index e3bc69e21f4..25b64b407dd 100644 --- a/website/.spelling +++ b/website/.spelling @@ -1554,6 +1554,7 @@ D1 D2 D3 druid.query.groupBy.defaultStrategy +druid.query.groupBy.maxSelectorDictionarySize druid.query.groupBy.maxMergingDictionarySize druid.query.groupBy.maxOnDiskStorage druid.query.groupBy.maxResults. @@ -1563,6 +1564,7 @@ maxResults orderby orderbys outputName +pre-existing pushdown row1 subtotalsSpec