GroupBy: Cap dictionary-building selector memory usage. (#12309)

* GroupBy: Cap dictionary-building selector memory usage.

New context parameter "maxSelectorDictionarySize" controls when the
per-segment processing code should return early and trigger a trip
to the merge buffer.
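As an illustration (not part of this patch), a minimal sketch of how a query context could carry the new key. The effective limit is clamped with `Math.min` against the `druid.query.groupBy.maxSelectorDictionarySize` runtime property, as the `GroupByQueryConfig` change below does, so the context can lower but never raise the server-side setting.

```java
// Illustrative only: a plain map stands in for a query context; the key name matches this patch.
import java.util.HashMap;
import java.util.Map;

public class SelectorDictionaryContextSketch
{
  public static void main(String[] args)
  {
    final long runtimeProperty = 100_000_000L; // druid.query.groupBy.maxSelectorDictionarySize default

    final Map<String, Object> context = new HashMap<>();
    context.put("maxSelectorDictionarySize", 10_000_000L); // per-query override (can only lower the limit)

    final long requested = ((Number) context.get("maxSelectorDictionarySize")).longValue();
    final long effective = Math.min(requested, runtimeProperty);

    System.out.println("effective selector dictionary limit = " + effective); // 10000000
  }
}
```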

Includes:

- Vectorized and nonvectorized implementations.
- Adjustments to GroupByQueryRunnerTest to exercise this code in
  the v2SmallDictionary suite. (Both the selector dictionary and
  the merging dictionary will be small in that suite.)
- Tests for the new config parameter.

* Fix issues from tests.

* Add "pre-existing" to dictionary.

* Simplify GroupByColumnSelectorStrategy interface by removing one of the writeToKeyBuffer methods.

* Adjustments from review comments.
Gian Merlino 2022-03-08 13:13:11 -08:00, committed by GitHub
parent baea3ec614
commit 875e0696e0
34 changed files with 601 additions and 227 deletions

View File

@ -2047,13 +2047,15 @@ Supported runtime properties:
|Property|Description|Default|
|--------|-----------|-------|
|`druid.query.groupBy.maxMergingDictionarySize`|Maximum amount of heap space (approximately) to use for the string dictionary during merging. When the dictionary exceeds this size, a spill to disk will be triggered.|100000000|
|`druid.query.groupBy.maxSelectorDictionarySize`|Maximum amount of heap space (approximately) to use for per-segment string dictionaries. See [groupBy memory tuning and resource limits](../querying/groupbyquery.md#memory-tuning-and-resource-limits) for details.|100000000|
|`druid.query.groupBy.maxMergingDictionarySize`|Maximum amount of heap space (approximately) to use for per-query string dictionaries. When the dictionary exceeds this size, a spill to disk will be triggered. See [groupBy memory tuning and resource limits](../querying/groupbyquery.md#memory-tuning-and-resource-limits) for details.|100000000|
|`druid.query.groupBy.maxOnDiskStorage`|Maximum amount of disk space to use, per-query, for spilling result sets to disk when either the merging buffer or the dictionary fills up. Queries that exceed this limit will fail. Set to zero to disable disk spilling.|0 (disabled)|
Supported query contexts:
|Key|Description|
|---|-----------|
|`maxSelectorDictionarySize`|Can be used to lower the value of `druid.query.groupBy.maxSelectorDictionarySize` for this query.|
|`maxMergingDictionarySize`|Can be used to lower the value of `druid.query.groupBy.maxMergingDictionarySize` for this query.|
|`maxOnDiskStorage`|Can be used to lower the value of `druid.query.groupBy.maxOnDiskStorage` for this query.|

View File

@ -275,13 +275,18 @@ as the index, so the aggregated values in the array can be accessed directly wit
### Memory tuning and resource limits
When using groupBy v2, three parameters control resource usage and limits:
When using groupBy v2, four parameters control resource usage and limits:
- `druid.processing.buffer.sizeBytes`: size of the off-heap hash table used for aggregation, per query, in bytes. At
most `druid.processing.numMergeBuffers` of these will be created at once, which also serves as an upper limit on the
number of concurrently running groupBy queries.
- `druid.query.groupBy.maxMergingDictionarySize`: size of the on-heap dictionary used when grouping on strings, per query,
in bytes. Note that this is based on a rough estimate of the dictionary size, not the actual size.
- `druid.query.groupBy.maxSelectorDictionarySize`: size of the on-heap segment-level dictionary used when grouping on
string or array-valued expressions that do not have pre-existing dictionaries. There is at most one dictionary per
processing thread; therefore there are up to `druid.processing.numThreads` of these. Note that the size is based on a
rough estimate of the dictionary footprint.
- `druid.query.groupBy.maxMergingDictionarySize`: size of the on-heap query-level dictionary used when grouping on
any string expression. There is at most one dictionary per concurrently-running query; therefore there are up to
`druid.server.http.numThreads` of these. Note that the size is based on a rough estimate of the dictionary footprint.
- `druid.query.groupBy.maxOnDiskStorage`: amount of space on disk used for aggregation, per query, in bytes. By default,
this is 0, which means aggregation will not use disk.
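Taken together, these limits imply a rough per-process heap budget. The sketch below is a back-of-the-envelope estimate under assumed thread counts (`druid.processing.numThreads` and `druid.server.http.numThreads` are placeholders), not an exact accounting.

```java
// Rough worst-case heap estimate for the dictionary limits described above.
public class GroupByHeapBudgetSketch
{
  public static void main(String[] args)
  {
    final int processingThreads = 8;   // druid.processing.numThreads (assumed)
    final int httpThreads = 40;        // druid.server.http.numThreads (assumed)
    final long maxSelectorDictionarySize = 100_000_000L;
    final long maxMergingDictionarySize = 100_000_000L;

    // At most one selector dictionary per processing thread, and one merging dictionary per
    // concurrently-running query (bounded by HTTP threads), per the docs above.
    final long worstCaseBytes =
        processingThreads * maxSelectorDictionarySize + httpThreads * maxMergingDictionarySize;

    System.out.println("worst-case dictionary heap ~ " + worstCaseBytes + " bytes");
  }
}
```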
@ -381,13 +386,15 @@ Supported runtime properties:
|Property|Description|Default|
|--------|-----------|-------|
|`druid.query.groupBy.maxMergingDictionarySize`|Maximum amount of heap space (approximately) to use for the string dictionary during merging. When the dictionary exceeds this size, a spill to disk will be triggered.|100000000|
|`druid.query.groupBy.maxSelectorDictionarySize`|Maximum amount of heap space (approximately) to use for per-segment string dictionaries. See [Memory tuning and resource limits](#memory-tuning-and-resource-limits) for details.|100000000|
|`druid.query.groupBy.maxMergingDictionarySize`|Maximum amount of heap space (approximately) to use for per-query string dictionaries. When the dictionary exceeds this size, a spill to disk will be triggered. See [Memory tuning and resource limits](#memory-tuning-and-resource-limits) for details.|100000000|
|`druid.query.groupBy.maxOnDiskStorage`|Maximum amount of disk space to use, per-query, for spilling result sets to disk when either the merging buffer or the dictionary fills up. Queries that exceed this limit will fail. Set to zero to disable disk spilling.|0 (disabled)|
Supported query contexts:
|Key|Description|
|---|-----------|
|`maxSelectorDictionarySize`|Can be used to lower the value of `druid.query.groupBy.maxSelectorDictionarySize` for this query.|
|`maxMergingDictionarySize`|Can be used to lower the value of `druid.query.groupBy.maxMergingDictionarySize` for this query.|
|`maxOnDiskStorage`|Can be used to lower the value of `druid.query.groupBy.maxOnDiskStorage` for this query.|

View File

@ -24,6 +24,7 @@ import org.apache.druid.query.QueryContexts;
import org.apache.druid.query.groupby.strategy.GroupByStrategySelector;
/**
*
*/
public class GroupByQueryConfig
{
@ -42,6 +43,7 @@ public class GroupByQueryConfig
private static final String CTX_KEY_BUFFER_GROUPER_MAX_LOAD_FACTOR = "bufferGrouperMaxLoadFactor";
private static final String CTX_KEY_BUFFER_GROUPER_MAX_SIZE = "bufferGrouperMaxSize";
private static final String CTX_KEY_MAX_ON_DISK_STORAGE = "maxOnDiskStorage";
private static final String CTX_KEY_MAX_SELECTOR_DICTIONARY_SIZE = "maxSelectorDictionarySize";
private static final String CTX_KEY_MAX_MERGING_DICTIONARY_SIZE = "maxMergingDictionarySize";
private static final String CTX_KEY_FORCE_HASH_AGGREGATION = "forceHashAggregation";
private static final String CTX_KEY_INTERMEDIATE_COMBINE_DEGREE = "intermediateCombineDegree";
@ -69,6 +71,11 @@ public class GroupByQueryConfig
@JsonProperty
private int bufferGrouperInitialBuckets = 0;
@JsonProperty
// Size of on-heap string dictionary used for per-segment (selector) grouping, per-processing-thread; when
// exceeded, partial results will be emitted to the merge buffer early.
private long maxSelectorDictionarySize = 100_000_000L;
@JsonProperty
// Size of on-heap string dictionary for merging, per-query; when exceeded, partial results will be spilled to disk
private long maxMergingDictionarySize = 100_000_000L;
@ -151,6 +158,11 @@ public class GroupByQueryConfig
return bufferGrouperInitialBuckets;
}
public long getMaxSelectorDictionarySize()
{
return maxSelectorDictionarySize;
}
public long getMaxMergingDictionarySize()
{
return maxMergingDictionarySize;
@ -230,6 +242,13 @@ public class GroupByQueryConfig
((Number) query.getContextValue(CTX_KEY_MAX_ON_DISK_STORAGE, getMaxOnDiskStorage())).longValue(),
getMaxOnDiskStorage()
);
newConfig.maxSelectorDictionarySize = Math.min(
((Number) query.getContextValue(
CTX_KEY_MAX_SELECTOR_DICTIONARY_SIZE,
getMaxSelectorDictionarySize()
)).longValue(),
getMaxSelectorDictionarySize()
);
newConfig.maxMergingDictionarySize = Math.min(
((Number) query.getContextValue(
CTX_KEY_MAX_MERGING_DICTIONARY_SIZE,
@ -243,7 +262,10 @@ public class GroupByQueryConfig
isApplyLimitPushDownToSegment()
);
newConfig.forceHashAggregation = query.getContextBoolean(CTX_KEY_FORCE_HASH_AGGREGATION, isForceHashAggregation());
newConfig.forcePushDownNestedQuery = query.getContextBoolean(CTX_KEY_FORCE_PUSH_DOWN_NESTED_QUERY, isForcePushDownNestedQuery());
newConfig.forcePushDownNestedQuery = query.getContextBoolean(
CTX_KEY_FORCE_PUSH_DOWN_NESTED_QUERY,
isForcePushDownNestedQuery()
);
newConfig.intermediateCombineDegree = query.getContextValue(
CTX_KEY_INTERMEDIATE_COMBINE_DEGREE,
getIntermediateCombineDegree()

View File

@ -0,0 +1,63 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.query.groupby.epinephelinae;
import it.unimi.dsi.fastutil.objects.Object2IntMap;
import it.unimi.dsi.fastutil.objects.Object2IntOpenHashMap;
import org.apache.druid.segment.DimensionDictionary;
import java.util.ArrayList;
import java.util.List;
/**
* Utilities for parts of the groupBy engine that need to build dictionaries.
*/
public class DictionaryBuilding
{
// Entry in dictionary, node pointer in reverseDictionary, hash + k/v/next pointer in reverseDictionary nodes
private static final int ROUGH_OVERHEAD_PER_DICTIONARY_ENTRY = Long.BYTES * 5 + Integer.BYTES;
/**
* Creates a forward dictionary (dictionary ID -> value).
*/
public static <T> List<T> createDictionary()
{
return new ArrayList<>();
}
/**
* Creates a reverse dictionary (value -> dictionary ID). If a value is not present in the reverse dictionary,
* {@link Object2IntMap#getInt} will return {@link DimensionDictionary#ABSENT_VALUE_ID}.
*/
public static <T> Object2IntMap<T> createReverseDictionary()
{
final Object2IntOpenHashMap<T> m = new Object2IntOpenHashMap<>();
m.defaultReturnValue(DimensionDictionary.ABSENT_VALUE_ID);
return m;
}
/**
* Estimated footprint of a new entry.
*/
public static int estimateEntryFootprint(final int valueFootprint)
{
return valueFootprint + ROUGH_OVERHEAD_PER_DICTIONARY_ENTRY;
}
}
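For reference, a hypothetical caller of the helper class added above might look like this (illustrative only; assumes Druid and fastutil on the classpath):

```java
// Illustrative use of DictionaryBuilding: interning a string and tracking the estimated
// on-heap footprint of the new entry.
import it.unimi.dsi.fastutil.objects.Object2IntMap;
import java.util.List;
import org.apache.druid.query.groupby.epinephelinae.DictionaryBuilding;
import org.apache.druid.segment.DimensionDictionary;

public class DictionaryBuildingSketch
{
  public static void main(String[] args)
  {
    final List<String> dictionary = DictionaryBuilding.createDictionary();
    final Object2IntMap<String> reverseDictionary = DictionaryBuilding.createReverseDictionary();

    long footprint = 0;
    final String value = "wikipedia";

    int id = reverseDictionary.getInt(value);
    if (id == DimensionDictionary.ABSENT_VALUE_ID) {
      id = dictionary.size();
      dictionary.add(value);
      reverseDictionary.put(value, id);
      footprint += DictionaryBuilding.estimateEntryFootprint(value.length() * Character.BYTES);
    }

    System.out.println("id=" + id + ", estimated footprint=" + footprint + " bytes");
  }
}
```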

View File

@ -623,7 +623,10 @@ public class GroupByQueryEngineV2
private final ByteBuffer keyBuffer;
private int stackPointer = Integer.MIN_VALUE;
protected boolean currentRowWasPartiallyAggregated = false;
private boolean currentRowWasPartiallyAggregated = false;
// Sum of internal state footprint across all "dims".
private long selectorInternalFootprint = 0;
public HashAggregateIterator(
GroupByQuery query,
@ -717,12 +720,19 @@ public class GroupByQueryEngineV2
@Override
protected void aggregateSingleValueDims(Grouper<ByteBuffer> grouper)
{
if (!currentRowWasPartiallyAggregated) {
for (GroupByColumnSelectorPlus dim : dims) {
dim.getColumnSelectorStrategy().reset();
}
selectorInternalFootprint = 0;
}
while (!cursor.isDone()) {
for (GroupByColumnSelectorPlus dim : dims) {
final GroupByColumnSelectorStrategy strategy = dim.getColumnSelectorStrategy();
strategy.writeToKeyBuffer(
selectorInternalFootprint += strategy.writeToKeyBuffer(
dim.getKeyBufferPosition(),
strategy.getOnlyValue(dim.getSelector()),
dim.getSelector(),
keyBuffer
);
}
@ -731,13 +741,27 @@ public class GroupByQueryEngineV2
if (!grouper.aggregate(keyBuffer).isOk()) {
return;
}
cursor.advance();
// Check selectorInternalFootprint after advancing the cursor. (We reset after the first row that causes
// us to go past the limit.)
if (selectorInternalFootprint > querySpecificConfig.getMaxSelectorDictionarySize()) {
return;
}
}
}
@Override
protected void aggregateMultiValueDims(Grouper<ByteBuffer> grouper)
{
if (!currentRowWasPartiallyAggregated) {
for (GroupByColumnSelectorPlus dim : dims) {
dim.getColumnSelectorStrategy().reset();
}
selectorInternalFootprint = 0;
}
while (!cursor.isDone()) {
if (!currentRowWasPartiallyAggregated) {
// Set up stack, valuess, and first grouping in keyBuffer for this row
@ -745,7 +769,7 @@ public class GroupByQueryEngineV2
for (int i = 0; i < dims.length; i++) {
GroupByColumnSelectorStrategy strategy = dims[i].getColumnSelectorStrategy();
strategy.initColumnValues(
selectorInternalFootprint += strategy.initColumnValues(
dims[i].getSelector(),
i,
valuess
@ -808,6 +832,12 @@ public class GroupByQueryEngineV2
// Advance to next row
cursor.advance();
currentRowWasPartiallyAggregated = false;
// Check selectorInternalFootprint after advancing the cursor. (We reset after the first row that causes
// us to go past the limit.)
if (selectorInternalFootprint > querySpecificConfig.getMaxSelectorDictionarySize()) {
return;
}
}
}
@ -882,6 +912,9 @@ public class GroupByQueryEngineV2
private void aggregateSingleValueDims(IntGrouper grouper)
{
// No need to track strategy internal state footprint, because array-based grouping does not use strategies.
// It accesses dimension selectors directly and only works on truly dictionary-coded columns.
while (!cursor.isDone()) {
final int key;
if (dim != null) {
@ -900,6 +933,9 @@ public class GroupByQueryEngineV2
private void aggregateMultiValueDims(IntGrouper grouper)
{
// No need to track strategy internal state footprint, because array-based grouping does not use strategies.
// It accesses dimension selectors directly and only works on truly dictionary-coded columns.
if (dim == null) {
throw new ISE("dim must exist");
}
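The early-return pattern described in the comments above can be reduced to a standalone sketch. The cursor, grouper, and footprint source here are stand-ins rather than Druid types:

```java
// Standalone sketch of the "accumulate footprint, check after consuming the row" loop shaped
// like the engine code above. Row and keyFootprintIncrease are hypothetical stand-ins.
public class SelectorFootprintLoopSketch
{
  interface Row
  {
    int keyFootprintIncrease();
  }

  static boolean aggregate(java.util.Iterator<Row> cursor, long maxSelectorDictionarySize)
  {
    long selectorInternalFootprint = 0;
    while (cursor.hasNext()) {
      selectorInternalFootprint += cursor.next().keyFootprintIncrease();
      // Check only after consuming the row, so the row that pushes us past the limit is still
      // aggregated; the caller then flushes to the merge buffer and resets the dictionaries.
      if (selectorInternalFootprint > maxSelectorDictionarySize) {
        return false; // stopped early; remaining rows go to the next pass
      }
    }
    return true; // fully aggregated
  }

  public static void main(String[] args)
  {
    final java.util.List<Row> rows =
        java.util.Arrays.asList(() -> 40, () -> 40, () -> 40, () -> 40);
    System.out.println(aggregate(rows.iterator(), 100)); // false: stops after the third row
  }
}
```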

View File

@ -29,7 +29,6 @@ import com.google.common.primitives.Longs;
import com.google.common.util.concurrent.ListeningExecutorService;
import it.unimi.dsi.fastutil.ints.IntArrays;
import it.unimi.dsi.fastutil.objects.Object2IntMap;
import it.unimi.dsi.fastutil.objects.Object2IntOpenHashMap;
import org.apache.druid.collections.ReferenceCountingResourceHolder;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.common.guava.SettableSupplier;
@ -63,6 +62,7 @@ import org.apache.druid.segment.BaseLongColumnValueSelector;
import org.apache.druid.segment.ColumnInspector;
import org.apache.druid.segment.ColumnSelectorFactory;
import org.apache.druid.segment.ColumnValueSelector;
import org.apache.druid.segment.DimensionDictionary;
import org.apache.druid.segment.DimensionHandlerUtils;
import org.apache.druid.segment.DimensionSelector;
import org.apache.druid.segment.RowAdapter;
@ -101,9 +101,6 @@ import java.util.stream.IntStream;
*/
public class RowBasedGrouperHelper
{
// Entry in dictionary, node pointer in reverseDictionary, hash + k/v/next pointer in reverseDictionary nodes
private static final int ROUGH_OVERHEAD_PER_DICTIONARY_ENTRY = Long.BYTES * 5 + Integer.BYTES;
private static final int SINGLE_THREAD_CONCURRENCY_HINT = -1;
private static final int UNKNOWN_THREAD_PRIORITY = -1;
private static final long UNKNOWN_TIMEOUT = -1L;
@ -1144,14 +1141,11 @@ public class RowBasedGrouperHelper
static long estimateStringKeySize(@Nullable String key)
{
long length = key == null ? 0 : key.length();
return length * Character.BYTES + ROUGH_OVERHEAD_PER_DICTIONARY_ENTRY;
return DictionaryBuilding.estimateEntryFootprint((key == null ? 0 : key.length()) * Character.BYTES);
}
private static class RowBasedKeySerde implements Grouper.KeySerde<RowBasedGrouperHelper.RowBasedKey>
{
private static final int UNKNOWN_DICTIONARY_ID = -1;
private final boolean includeTimestamp;
private final boolean sortByDimsFirst;
private final List<DimensionSpec> dimensions;
@ -1203,20 +1197,14 @@ public class RowBasedGrouperHelper
this.valueTypes = valueTypes;
this.limitSpec = limitSpec;
this.enableRuntimeDictionaryGeneration = dictionary == null;
this.dictionary = enableRuntimeDictionaryGeneration ? new ArrayList<>() : dictionary;
this.reverseDictionary = enableRuntimeDictionaryGeneration ?
new Object2IntOpenHashMap<>() :
new Object2IntOpenHashMap<>(dictionary.size());
this.dictionary = enableRuntimeDictionaryGeneration ? DictionaryBuilding.createDictionary() : dictionary;
this.reverseDictionary = DictionaryBuilding.createReverseDictionary();
this.arrayDictionary = new ArrayList<>();
this.reverseArrayDictionary = new Object2IntOpenHashMap<>();
this.arrayDictionary = DictionaryBuilding.createDictionary();
this.reverseArrayDictionary = DictionaryBuilding.createReverseDictionary();
this.listDictionary = new ArrayList<>();
this.reverseListDictionary = new Object2IntOpenHashMap<>();
this.reverseDictionary.defaultReturnValue(UNKNOWN_DICTIONARY_ID);
this.reverseArrayDictionary.defaultReturnValue(UNKNOWN_DICTIONARY_ID);
this.reverseListDictionary.defaultReturnValue(UNKNOWN_DICTIONARY_ID);
this.listDictionary = DictionaryBuilding.createDictionary();
this.reverseListDictionary = DictionaryBuilding.createReverseDictionary();
this.maxDictionarySize = maxDictionarySize;
this.serdeHelpers = makeSerdeHelpers(limitSpec != null, enableRuntimeDictionaryGeneration);
@ -1534,7 +1522,7 @@ public class RowBasedGrouperHelper
{
final ComparableList comparableList = (ComparableList) key.getKey()[idx];
int id = reverseDictionary.getInt(comparableList);
if (id == UNKNOWN_DICTIONARY_ID) {
if (id == DimensionDictionary.ABSENT_VALUE_ID) {
id = listDictionary.size();
reverseListDictionary.put(comparableList, id);
listDictionary.add(comparableList);
@ -1610,7 +1598,7 @@ public class RowBasedGrouperHelper
private int addToArrayDictionary(final ComparableStringArray s)
{
int idx = reverseArrayDictionary.getInt(s);
if (idx == UNKNOWN_DICTIONARY_ID) {
if (idx == DimensionDictionary.ABSENT_VALUE_ID) {
idx = arrayDictionary.size();
reverseArrayDictionary.put(s, idx);
arrayDictionary.add(s);
@ -1700,7 +1688,7 @@ public class RowBasedGrouperHelper
private int addToDictionary(final String s)
{
int idx = reverseDictionary.getInt(s);
if (idx == UNKNOWN_DICTIONARY_ID) {
if (idx == DimensionDictionary.ABSENT_VALUE_ID) {
final long additionalEstimatedSize = estimateStringKeySize(s);
if (currentEstimatedSize + additionalEstimatedSize > maxDictionarySize) {
return -1;
@ -1732,7 +1720,7 @@ public class RowBasedGrouperHelper
final String stringKey = (String) key.getKey()[idx];
final int dictIndex = reverseDictionary.getInt(stringKey);
if (dictIndex == UNKNOWN_DICTIONARY_ID) {
if (dictIndex == DimensionDictionary.ABSENT_VALUE_ID) {
throw new ISE("Cannot find key[%s] from dictionary", stringKey);
}
keyBuffer.putInt(dictIndex);

View File

@ -33,7 +33,7 @@ public class ArrayDoubleGroupByColumnSelectorStrategy extends ArrayNumericGroupB
{
public ArrayDoubleGroupByColumnSelectorStrategy()
{
super(Double.BYTES);
}
@VisibleForTesting
@ -42,11 +42,11 @@ public class ArrayDoubleGroupByColumnSelectorStrategy extends ArrayNumericGroupB
Object2IntOpenHashMap<List<Double>> reverseDictionary
)
{
super(dictionary, reverseDictionary);
super(dictionary, reverseDictionary, Double.BYTES);
}
@Override
public Object getOnlyValue(ColumnValueSelector selector)
protected int computeDictionaryId(ColumnValueSelector selector)
{
Object object = selector.getObject();
if (object == null) {

View File

@ -31,10 +31,9 @@ import java.util.stream.Collectors;
public class ArrayLongGroupByColumnSelectorStrategy extends ArrayNumericGroupByColumnSelectorStrategy<Long>
{
public ArrayLongGroupByColumnSelectorStrategy()
{
super(Long.BYTES);
}
@VisibleForTesting
@ -43,12 +42,11 @@ public class ArrayLongGroupByColumnSelectorStrategy extends ArrayNumericGroupByC
Object2IntOpenHashMap<List<Long>> reverseDictionary
)
{
super(dictionary, reverseDictionary);
super(dictionary, reverseDictionary, Long.BYTES);
}
@Override
public Object getOnlyValue(ColumnValueSelector selector)
protected int computeDictionaryId(ColumnValueSelector selector)
{
Object object = selector.getObject();
if (object == null) {

View File

@ -20,8 +20,10 @@
package org.apache.druid.query.groupby.epinephelinae.column;
import com.google.common.annotations.VisibleForTesting;
import it.unimi.dsi.fastutil.objects.Object2IntMap;
import it.unimi.dsi.fastutil.objects.Object2IntOpenHashMap;
import org.apache.druid.query.groupby.ResultRow;
import org.apache.druid.query.groupby.epinephelinae.DictionaryBuilding;
import org.apache.druid.query.groupby.epinephelinae.Grouper;
import org.apache.druid.query.ordering.StringComparator;
import org.apache.druid.query.ordering.StringComparators;
@ -30,31 +32,36 @@ import org.apache.druid.segment.data.ComparableList;
import javax.annotation.Nullable;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
public abstract class ArrayNumericGroupByColumnSelectorStrategy<T extends Comparable> implements GroupByColumnSelectorStrategy
public abstract class ArrayNumericGroupByColumnSelectorStrategy<T extends Comparable>
implements GroupByColumnSelectorStrategy
{
protected static final int GROUP_BY_MISSING_VALUE = -1;
protected final List<List<T>> dictionary;
protected final Object2IntOpenHashMap<List<T>> reverseDictionary;
protected final Object2IntMap<List<T>> reverseDictionary;
protected long estimatedFootprint = 0L;
public ArrayNumericGroupByColumnSelectorStrategy()
private final int valueFootprint;
public ArrayNumericGroupByColumnSelectorStrategy(final int valueFootprint)
{
dictionary = new ArrayList<>();
reverseDictionary = new Object2IntOpenHashMap<>();
reverseDictionary.defaultReturnValue(-1);
this.dictionary = DictionaryBuilding.createDictionary();
this.reverseDictionary = DictionaryBuilding.createReverseDictionary();
this.valueFootprint = valueFootprint;
}
@VisibleForTesting
ArrayNumericGroupByColumnSelectorStrategy(
List<List<T>> dictionary,
Object2IntOpenHashMap<List<T>> reverseDictionary
Object2IntOpenHashMap<List<T>> reverseDictionary,
int valueFootprint
)
{
this.dictionary = dictionary;
this.reverseDictionary = reverseDictionary;
this.valueFootprint = valueFootprint;
}
@Override
@ -83,16 +90,17 @@ public abstract class ArrayNumericGroupByColumnSelectorStrategy<T extends Compar
}
@Override
public void initColumnValues(ColumnValueSelector selector, int columnIndex, Object[] valuess)
public int initColumnValues(ColumnValueSelector selector, int columnIndex, Object[] valuess)
{
final int groupingKey = (int) getOnlyValue(selector);
valuess[columnIndex] = groupingKey;
final long priorFootprint = estimatedFootprint;
valuess[columnIndex] = computeDictionaryId(selector);
return (int) (estimatedFootprint - priorFootprint);
}
@Override
public void initGroupingKeyColumnValue(
int keyBufferPosition,
int columnIndex,
int dimensionIndex,
Object rowObj,
ByteBuffer keyBuffer,
int[] stack
@ -101,9 +109,9 @@ public abstract class ArrayNumericGroupByColumnSelectorStrategy<T extends Compar
final int groupingKey = (int) rowObj;
writeToKeyBuffer(keyBufferPosition, groupingKey, keyBuffer);
if (groupingKey == GROUP_BY_MISSING_VALUE) {
stack[columnIndex] = 0;
stack[dimensionIndex] = 0;
} else {
stack[columnIndex] = 1;
stack[dimensionIndex] = 1;
}
}
@ -119,23 +127,29 @@ public abstract class ArrayNumericGroupByColumnSelectorStrategy<T extends Compar
return false;
}
@Override
public abstract Object getOnlyValue(ColumnValueSelector selector);
protected abstract int computeDictionaryId(ColumnValueSelector selector);
@Override
public void writeToKeyBuffer(int keyBufferPosition, Object obj, ByteBuffer keyBuffer)
public int writeToKeyBuffer(int keyBufferPosition, ColumnValueSelector selector, ByteBuffer keyBuffer)
{
keyBuffer.putInt(keyBufferPosition, (int) obj);
final long priorFootprint = estimatedFootprint;
// computeDictionaryId updates estimatedFootprint
keyBuffer.putInt(keyBufferPosition, computeDictionaryId(selector));
return (int) (estimatedFootprint - priorFootprint);
}
int addToIndexedDictionary(List<T> t)
protected int addToIndexedDictionary(List<T> t)
{
final int dictId = reverseDictionary.getInt(t);
if (dictId < 0) {
final int size = dictionary.size();
dictionary.add(t);
reverseDictionary.put(t, size);
// Footprint estimate: one pointer, one value per list entry.
estimatedFootprint += DictionaryBuilding.estimateEntryFootprint(t.size() * (Long.BYTES + valueFootprint));
return size;
}
return dictId;
@ -178,4 +192,18 @@ public abstract class ArrayNumericGroupByColumnSelectorStrategy<T extends Compar
}
};
}
@Override
public void reset()
{
dictionary.clear();
reverseDictionary.clear();
estimatedFootprint = 0;
}
@VisibleForTesting
void writeToKeyBuffer(int keyBufferPosition, int groupingKey, ByteBuffer keyBuffer)
{
keyBuffer.putInt(keyBufferPosition, groupingKey);
}
}

View File

@ -24,6 +24,7 @@ import com.google.common.collect.BiMap;
import com.google.common.collect.HashBiMap;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.query.groupby.ResultRow;
import org.apache.druid.query.groupby.epinephelinae.DictionaryBuilding;
import org.apache.druid.query.groupby.epinephelinae.Grouper;
import org.apache.druid.query.ordering.StringComparator;
import org.apache.druid.query.ordering.StringComparators;
@ -35,8 +36,7 @@ import javax.annotation.Nullable;
import java.nio.ByteBuffer;
import java.util.List;
public class ArrayStringGroupByColumnSelectorStrategy
implements GroupByColumnSelectorStrategy
public class ArrayStringGroupByColumnSelectorStrategy implements GroupByColumnSelectorStrategy
{
private static final int GROUP_BY_MISSING_VALUE = -1;
@ -50,6 +50,8 @@ public class ArrayStringGroupByColumnSelectorStrategy
// [1,2,3] <-> 1
private final BiMap<ComparableIntArray, Integer> intListToInt;
private long estimatedFootprint = 0L;
@Override
public int getGroupingKeySize()
{
@ -98,20 +100,22 @@ public class ArrayStringGroupByColumnSelectorStrategy
}
@Override
public void initColumnValues(
public int initColumnValues(
ColumnValueSelector selector,
int columnIndex,
Object[] valuess
)
{
final int groupingKey = (int) getOnlyValue(selector);
final long priorFootprint = estimatedFootprint;
final int groupingKey = computeDictionaryId(selector);
valuess[columnIndex] = groupingKey;
return (int) (estimatedFootprint - priorFootprint);
}
@Override
public void initGroupingKeyColumnValue(
int keyBufferPosition,
int columnIndex,
int dimensionIndex,
Object rowObj,
ByteBuffer keyBuffer,
int[] stack
@ -120,9 +124,9 @@ public class ArrayStringGroupByColumnSelectorStrategy
final int groupingKey = (int) rowObj;
writeToKeyBuffer(keyBufferPosition, groupingKey, keyBuffer);
if (groupingKey == GROUP_BY_MISSING_VALUE) {
stack[columnIndex] = 0;
stack[dimensionIndex] = 0;
} else {
stack[columnIndex] = 1;
stack[dimensionIndex] = 1;
}
}
@ -137,8 +141,11 @@ public class ArrayStringGroupByColumnSelectorStrategy
return false;
}
@Override
public Object getOnlyValue(ColumnValueSelector selector)
/**
* Compute dictionary ID for the given selector. Updates {@link #estimatedFootprint} as necessary.
*/
@VisibleForTesting
int computeDictionaryId(ColumnValueSelector selector)
{
final int[] intRepresentation;
Object object = selector.getObject();
@ -172,9 +179,15 @@ public class ArrayStringGroupByColumnSelectorStrategy
final ComparableIntArray comparableIntArray = ComparableIntArray.of(intRepresentation);
final int dictId = intListToInt.getOrDefault(comparableIntArray, GROUP_BY_MISSING_VALUE);
if (dictId == GROUP_BY_MISSING_VALUE) {
final int dictionarySize = intListToInt.keySet().size();
intListToInt.put(comparableIntArray, dictionarySize);
return dictionarySize;
final int nextId = intListToInt.keySet().size();
intListToInt.put(comparableIntArray, nextId);
// We're not using the dictionary and reverseDictionary from DictionaryBuilding, but the BiMap is close enough
// that we expect this footprint calculation to still be useful. (It doesn't have to be exact.)
estimatedFootprint +=
DictionaryBuilding.estimateEntryFootprint(comparableIntArray.getDelegate().length * Integer.BYTES);
return nextId;
} else {
return dictId;
}
@ -184,18 +197,29 @@ public class ArrayStringGroupByColumnSelectorStrategy
{
final Integer dictId = dictionaryToInt.get(value);
if (dictId == null) {
final int size = dictionaryToInt.size();
dictionaryToInt.put(value, dictionaryToInt.size());
return size;
final int nextId = dictionaryToInt.size();
dictionaryToInt.put(value, nextId);
// We're not using the dictionary and reverseDictionary from DictionaryBuilding, but the BiMap is close enough
// that we expect this footprint calculation to still be useful. (It doesn't have to be exact.)
estimatedFootprint +=
DictionaryBuilding.estimateEntryFootprint((value == null ? 0 : value.length()) * Character.BYTES);
return nextId;
} else {
return dictId;
}
}
@Override
public void writeToKeyBuffer(int keyBufferPosition, Object obj, ByteBuffer keyBuffer)
public int writeToKeyBuffer(int keyBufferPosition, ColumnValueSelector selector, ByteBuffer keyBuffer)
{
keyBuffer.putInt(keyBufferPosition, (int) obj);
final long priorFootprint = estimatedFootprint;
// computeDictionaryId updates estimatedFootprint
keyBuffer.putInt(keyBufferPosition, computeDictionaryId(selector));
return (int) (estimatedFootprint - priorFootprint);
}
@Override
@ -230,5 +254,18 @@ public class ArrayStringGroupByColumnSelectorStrategy
}
};
}
}
@Override
public void reset()
{
dictionaryToInt.clear();
intListToInt.clear();
estimatedFootprint = 0;
}
@VisibleForTesting
void writeToKeyBuffer(int keyBufferPosition, int groupingKey, ByteBuffer keyBuffer)
{
keyBuffer.putInt(keyBufferPosition, groupingKey);
}
}

View File

@ -20,20 +20,21 @@
package org.apache.druid.query.groupby.epinephelinae.column;
import com.google.common.base.Preconditions;
import it.unimi.dsi.fastutil.objects.Object2IntOpenHashMap;
import it.unimi.dsi.fastutil.objects.Object2IntMap;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.query.groupby.ResultRow;
import org.apache.druid.query.groupby.epinephelinae.DictionaryBuilding;
import org.apache.druid.query.groupby.epinephelinae.Grouper;
import org.apache.druid.query.ordering.StringComparator;
import org.apache.druid.query.ordering.StringComparators;
import org.apache.druid.segment.ColumnValueSelector;
import org.apache.druid.segment.DimensionDictionary;
import org.apache.druid.segment.DimensionSelector;
import org.apache.druid.segment.data.ArrayBasedIndexedInts;
import org.apache.druid.segment.data.IndexedInts;
import javax.annotation.Nullable;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
/**
@ -44,13 +45,8 @@ public class DictionaryBuildingStringGroupByColumnSelectorStrategy extends Strin
{
private static final int GROUP_BY_MISSING_VALUE = -1;
private int nextId = 0;
private final List<String> dictionary = new ArrayList<>();
private final Object2IntOpenHashMap<String> reverseDictionary = new Object2IntOpenHashMap<>();
{
reverseDictionary.defaultReturnValue(-1);
}
private final List<String> dictionary = DictionaryBuilding.createDictionary();
private final Object2IntMap<String> reverseDictionary = DictionaryBuilding.createReverseDictionary();
public DictionaryBuildingStringGroupByColumnSelectorStrategy()
{
@ -77,10 +73,11 @@ public class DictionaryBuildingStringGroupByColumnSelectorStrategy extends Strin
}
@Override
public void initColumnValues(ColumnValueSelector selector, int columnIndex, Object[] valuess)
public int initColumnValues(ColumnValueSelector selector, int columnIndex, Object[] valuess)
{
final DimensionSelector dimSelector = (DimensionSelector) selector;
final IndexedInts row = dimSelector.getRow();
int stateFootprintIncrease = 0;
ArrayBasedIndexedInts newRow = (ArrayBasedIndexedInts) valuess[columnIndex];
if (newRow == null) {
newRow = new ArrayBasedIndexedInts();
@ -92,19 +89,22 @@ public class DictionaryBuildingStringGroupByColumnSelectorStrategy extends Strin
final String value = dimSelector.lookupName(row.get(i));
final int dictId = reverseDictionary.getInt(value);
if (dictId < 0) {
final int nextId = dictionary.size();
dictionary.add(value);
reverseDictionary.put(value, nextId);
newRow.setValue(i, nextId);
nextId++;
stateFootprintIncrease +=
DictionaryBuilding.estimateEntryFootprint((value == null ? 0 : value.length()) * Character.BYTES);
} else {
newRow.setValue(i, dictId);
}
}
newRow.setSize(rowSize);
return stateFootprintIncrease;
}
@Override
public Object getOnlyValue(ColumnValueSelector selector)
public int writeToKeyBuffer(int keyBufferPosition, ColumnValueSelector selector, ByteBuffer keyBuffer)
{
final DimensionSelector dimSelector = (DimensionSelector) selector;
final IndexedInts row = dimSelector.getRow();
@ -112,17 +112,21 @@ public class DictionaryBuildingStringGroupByColumnSelectorStrategy extends Strin
Preconditions.checkState(row.size() < 2, "Not supported for multi-value dimensions");
if (row.size() == 0) {
return GROUP_BY_MISSING_VALUE;
writeToKeyBuffer(keyBufferPosition, GROUP_BY_MISSING_VALUE, keyBuffer);
return 0;
}
final String value = dimSelector.lookupName(row.get(0));
final int dictId = reverseDictionary.getInt(value);
if (dictId < 0) {
if (dictId == DimensionDictionary.ABSENT_VALUE_ID) {
final int nextId = dictionary.size();
dictionary.add(value);
reverseDictionary.put(value, nextId);
return nextId++;
writeToKeyBuffer(keyBufferPosition, nextId, keyBuffer);
return DictionaryBuilding.estimateEntryFootprint((value == null ? 0 : value.length()) * Character.BYTES);
} else {
return dictId;
writeToKeyBuffer(keyBufferPosition, dictId, keyBuffer);
return 0;
}
}
@ -138,4 +142,11 @@ public class DictionaryBuildingStringGroupByColumnSelectorStrategy extends Strin
return realComparator.compare(lhsStr, rhsStr);
};
}
@Override
public void reset()
{
dictionary.clear();
reverseDictionary.clear();
}
}

View File

@ -50,34 +50,30 @@ public class DoubleGroupByColumnSelectorStrategy implements GroupByColumnSelecto
}
@Override
public void initColumnValues(ColumnValueSelector selector, int columnIndex, Object[] values)
public int initColumnValues(ColumnValueSelector selector, int columnIndex, Object[] values)
{
values[columnIndex] = selector.getDouble();
return 0;
}
@Override
public Object getOnlyValue(ColumnValueSelector selector)
public int writeToKeyBuffer(int keyBufferPosition, ColumnValueSelector selector, ByteBuffer keyBuffer)
{
return selector.getDouble();
}
@Override
public void writeToKeyBuffer(int keyBufferPosition, @Nullable Object obj, ByteBuffer keyBuffer)
{
keyBuffer.putDouble(keyBufferPosition, DimensionHandlerUtils.nullToZero((Double) obj));
keyBuffer.putDouble(keyBufferPosition, selector.getDouble());
return 0;
}
@Override
public void initGroupingKeyColumnValue(
int keyBufferPosition,
int columnIndex,
int dimensionIndex,
Object rowObj,
ByteBuffer keyBuffer,
int[] stack
)
{
writeToKeyBuffer(keyBufferPosition, rowObj, keyBuffer);
stack[columnIndex] = 1;
writeToKeyBuffer(keyBufferPosition, DimensionHandlerUtils.nullToZero((Double) rowObj), keyBuffer);
stack[dimensionIndex] = 1;
}
@Override
@ -102,4 +98,15 @@ public class DoubleGroupByColumnSelectorStrategy implements GroupByColumnSelecto
stringComparator
);
}
@Override
public void reset()
{
// Nothing to do.
}
private void writeToKeyBuffer(int keyBufferPosition, double value, ByteBuffer keyBuffer)
{
keyBuffer.putDouble(keyBufferPosition, value);
}
}

View File

@ -51,21 +51,17 @@ public class FloatGroupByColumnSelectorStrategy implements GroupByColumnSelector
}
@Override
public void initColumnValues(ColumnValueSelector selector, int columnIndex, Object[] valuess)
public int initColumnValues(ColumnValueSelector selector, int columnIndex, Object[] valuess)
{
valuess[columnIndex] = selector.getFloat();
return 0;
}
@Override
public Object getOnlyValue(ColumnValueSelector selector)
public int writeToKeyBuffer(int keyBufferPosition, ColumnValueSelector selector, ByteBuffer keyBuffer)
{
return selector.getFloat();
}
@Override
public void writeToKeyBuffer(int keyBufferPosition, @Nullable Object obj, ByteBuffer keyBuffer)
{
keyBuffer.putFloat(keyBufferPosition, DimensionHandlerUtils.nullToZero((Float) obj));
keyBuffer.putFloat(keyBufferPosition, selector.getFloat());
return 0;
}
@Override
@ -84,14 +80,14 @@ public class FloatGroupByColumnSelectorStrategy implements GroupByColumnSelector
@Override
public void initGroupingKeyColumnValue(
int keyBufferPosition,
int columnIndex,
int dimensionIndex,
Object rowObj,
ByteBuffer keyBuffer,
int[] stack
)
{
writeToKeyBuffer(keyBufferPosition, rowObj, keyBuffer);
stack[columnIndex] = 1;
writeToKeyBuffer(keyBufferPosition, DimensionHandlerUtils.nullToZero((Float) rowObj), keyBuffer);
stack[dimensionIndex] = 1;
}
@Override
@ -106,4 +102,15 @@ public class FloatGroupByColumnSelectorStrategy implements GroupByColumnSelector
// this method handles row values after the first in a multivalued row, so just return false
return false;
}
@Override
public void reset()
{
// Nothing to do.
}
private void writeToKeyBuffer(int keyBufferPosition, float value, ByteBuffer keyBuffer)
{
keyBuffer.putFloat(keyBufferPosition, value);
}
}

View File

@ -33,6 +33,14 @@ import java.nio.ByteBuffer;
* GroupByQueryEngineV2.
*
* Each GroupByColumnSelectorStrategy is associated with a single dimension.
*
* Strategies may have internal state, such as the dictionary maintained by
* {@link DictionaryBuildingStringGroupByColumnSelectorStrategy}. Callers should assume that the internal
* state footprint starts out empty (zero bytes) and is also reset to zero on each call to {@link #reset()}. Each call
* to {@link #initColumnValues} or {@link #writeToKeyBuffer(int, ColumnValueSelector, ByteBuffer)} returns the
* incremental increase in internal state footprint that happened as a result of that particular call.
*
* @see org.apache.druid.query.groupby.epinephelinae.vector.GroupByVectorColumnSelector the vectorized version
*/
public interface GroupByColumnSelectorStrategy extends ColumnSelectorStrategy
{
@ -77,8 +85,11 @@ public interface GroupByColumnSelectorStrategy extends ColumnSelectorStrategy
* @param selector Value selector for a column.
* @param columnIndex Index of the column within the row values array
* @param valuess Row values array, one index per column
*
* @return estimated increase in internal state footprint, in bytes, as a result of this operation. May be zero if
* memory did not increase as a result of this operation. Will not be negative.
*/
void initColumnValues(ColumnValueSelector selector, int columnIndex, Object[] valuess);
int initColumnValues(ColumnValueSelector selector, int columnIndex, Object[] valuess);
/**
* Read the first value within a row values object (e. g. {@link org.apache.druid.segment.data.IndexedInts}, as the value in
@ -88,14 +99,14 @@ public interface GroupByColumnSelectorStrategy extends ColumnSelectorStrategy
* If the size of the row is > 0, write 1 to stack[] at columnIndex, otherwise write 0.
*
* @param keyBufferPosition Starting offset for this column's value within the grouping key.
* @param columnIndex Index of the column within the row values array
* @param dimensionIndex Index of this dimension within the {@code stack} array
* @param rowObj Row value object for this column
* @param keyBuffer grouping key
* @param stack array containing the current within-row value index for each column
*/
void initGroupingKeyColumnValue(
int keyBufferPosition,
int columnIndex,
int dimensionIndex,
Object rowObj,
ByteBuffer keyBuffer,
int[] stack
@ -123,30 +134,35 @@ public interface GroupByColumnSelectorStrategy extends ColumnSelectorStrategy
);
/**
* Retrieve a single object using the {@link ColumnValueSelector}. The reading column must have a single value.
*
* @param selector Value selector for a column
*
* @return an object retrieved from the column
*/
Object getOnlyValue(ColumnValueSelector selector);
/**
* Write a given object to the keyBuffer at keyBufferPosition.
* Write a single object from the given selector to the keyBuffer at keyBufferPosition. The reading column must
* have a single value. The position of the keyBuffer may be modified.
*
* @param keyBufferPosition starting offset for this column's value within the grouping key
* @param obj row value object retrieved from {@link #getOnlyValue(ColumnValueSelector)}
* @param selector selector to retrieve row value object from
* @param keyBuffer grouping key
*
* @return estimated increase in internal state footprint, in bytes, as a result of this operation. May be zero if
* memory did not increase as a result of this operation. Will not be negative.
*/
void writeToKeyBuffer(int keyBufferPosition, Object obj, ByteBuffer keyBuffer);
int writeToKeyBuffer(int keyBufferPosition, ColumnValueSelector selector, ByteBuffer keyBuffer);
/**
* Return BufferComparator for values written using this strategy when limit is pushed down to segment scan.
*
* @param keyBufferPosition starting offset for this column's value within the grouping key
* @param stringComparator stringComparator from LimitSpec for this column. If this is null, implementations
* will use the {@link org.apache.druid.query.ordering.StringComparators#LEXICOGRAPHIC}
* comparator.
* @param stringComparator stringComparator from LimitSpec for this column. If this is null, implementations
* will use the {@link org.apache.druid.query.ordering.StringComparators#LEXICOGRAPHIC}
* comparator.
*
* @return BufferComparator for comparing values written
*/
Grouper.BufferComparator bufferComparator(int keyBufferPosition, @Nullable StringComparator stringComparator);
/**
* Reset any internal state held by this selector.
*
* After this method is called, any row objects or key objects generated by any methods of this class must be
* considered unreadable. Calling {@link #processValueFromGroupingKey} on that memory has undefined behavior.
*/
void reset();
}
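To make the documented contract concrete, here is a toy class (not an implementation of the real interface; names are illustrative) in which every state-growing call returns only the footprint increase it caused, and reset() clears the state:

```java
// Toy illustration of the footprint-reporting contract documented above.
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class FootprintReportingDictionary
{
  private static final int ROUGH_OVERHEAD_PER_ENTRY = Long.BYTES * 5 + Integer.BYTES;

  private final List<String> dictionary = new ArrayList<>();
  private final Map<String, Integer> reverseDictionary = new HashMap<>();

  /** Interns the value and returns the estimated footprint increase (zero if already present). */
  public int intern(String value)
  {
    if (reverseDictionary.containsKey(value)) {
      return 0;
    }
    reverseDictionary.put(value, dictionary.size());
    dictionary.add(value);
    return value.length() * Character.BYTES + ROUGH_OVERHEAD_PER_ENTRY;
  }

  /** Clears internal state; previously returned IDs must be considered unreadable afterwards. */
  public void reset()
  {
    dictionary.clear();
    reverseDictionary.clear();
  }

  public static void main(String[] args)
  {
    final FootprintReportingDictionary dict = new FootprintReportingDictionary();
    long footprint = 0;
    footprint += dict.intern("a");
    footprint += dict.intern("a"); // second intern of the same value adds nothing
    System.out.println(footprint); // 46 = 2 value bytes + 44 bytes of per-entry overhead
  }
}
```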

View File

@ -51,21 +51,17 @@ public class LongGroupByColumnSelectorStrategy implements GroupByColumnSelectorS
}
@Override
public void initColumnValues(ColumnValueSelector selector, int columnIndex, Object[] valuess)
public int initColumnValues(ColumnValueSelector selector, int columnIndex, Object[] valuess)
{
valuess[columnIndex] = selector.getLong();
return 0;
}
@Override
public Object getOnlyValue(ColumnValueSelector selector)
public int writeToKeyBuffer(int keyBufferPosition, ColumnValueSelector selector, ByteBuffer keyBuffer)
{
return selector.getLong();
}
@Override
public void writeToKeyBuffer(int keyBufferPosition, @Nullable Object obj, ByteBuffer keyBuffer)
{
keyBuffer.putLong(keyBufferPosition, DimensionHandlerUtils.nullToZero((Long) obj));
keyBuffer.putLong(keyBufferPosition, selector.getLong());
return 0;
}
@Override
@ -84,14 +80,14 @@ public class LongGroupByColumnSelectorStrategy implements GroupByColumnSelectorS
@Override
public void initGroupingKeyColumnValue(
int keyBufferPosition,
int columnIndex,
int dimensionIndex,
Object rowObj,
ByteBuffer keyBuffer,
int[] stack
)
{
writeToKeyBuffer(keyBufferPosition, rowObj, keyBuffer);
stack[columnIndex] = 1;
writeToKeyBuffer(keyBufferPosition, DimensionHandlerUtils.nullToZero((Long) rowObj), keyBuffer);
stack[dimensionIndex] = 1;
}
@Override
@ -106,4 +102,15 @@ public class LongGroupByColumnSelectorStrategy implements GroupByColumnSelectorS
// this method handles row values after the first in a multivalued row, so just return false
return false;
}
@Override
public void reset()
{
// Nothing to do.
}
public void writeToKeyBuffer(int keyBufferPosition, long value, ByteBuffer keyBuffer)
{
keyBuffer.putLong(keyBufferPosition, value);
}
}

View File

@ -38,10 +38,13 @@ import java.nio.ByteBuffer;
public class NullableNumericGroupByColumnSelectorStrategy implements GroupByColumnSelectorStrategy
{
private final GroupByColumnSelectorStrategy delegate;
private final byte[] nullKeyBytes;
public NullableNumericGroupByColumnSelectorStrategy(GroupByColumnSelectorStrategy delegate)
{
this.delegate = delegate;
this.nullKeyBytes = new byte[delegate.getGroupingKeySize() + 1];
this.nullKeyBytes[0] = NullHandling.IS_NULL_BYTE;
}
@Override
@ -66,34 +69,27 @@ public class NullableNumericGroupByColumnSelectorStrategy implements GroupByColu
}
@Override
public void initColumnValues(ColumnValueSelector selector, int columnIndex, Object[] values)
public int initColumnValues(ColumnValueSelector selector, int columnIndex, Object[] values)
{
if (selector.isNull()) {
values[columnIndex] = null;
return 0;
} else {
delegate.initColumnValues(selector, columnIndex, values);
return delegate.initColumnValues(selector, columnIndex, values);
}
}
@Override
@Nullable
public Object getOnlyValue(ColumnValueSelector selector)
public int writeToKeyBuffer(int keyBufferPosition, ColumnValueSelector selector, ByteBuffer keyBuffer)
{
if (selector.isNull()) {
return null;
}
return delegate.getOnlyValue(selector);
}
@Override
public void writeToKeyBuffer(int keyBufferPosition, @Nullable Object obj, ByteBuffer keyBuffer)
{
if (obj == null) {
keyBuffer.put(keyBufferPosition, NullHandling.IS_NULL_BYTE);
keyBuffer.position(keyBufferPosition);
keyBuffer.put(nullKeyBytes);
return 0;
} else {
keyBuffer.put(keyBufferPosition, NullHandling.IS_NOT_NULL_BYTE);
return delegate.writeToKeyBuffer(keyBufferPosition + Byte.BYTES, selector, keyBuffer);
}
delegate.writeToKeyBuffer(keyBufferPosition + Byte.BYTES, obj, keyBuffer);
}
@Override
@ -111,14 +107,27 @@ public class NullableNumericGroupByColumnSelectorStrategy implements GroupByColu
@Override
public void initGroupingKeyColumnValue(
int keyBufferPosition,
int columnIndex,
int dimensionIndex,
Object rowObj,
ByteBuffer keyBuffer,
int[] stack
)
{
writeToKeyBuffer(keyBufferPosition, rowObj, keyBuffer);
stack[columnIndex] = 1;
if (rowObj == null) {
keyBuffer.position(keyBufferPosition);
keyBuffer.put(nullKeyBytes);
} else {
keyBuffer.put(keyBufferPosition, NullHandling.IS_NOT_NULL_BYTE);
// No need to update stack ourselves; we expect the delegate to do this.
delegate.initGroupingKeyColumnValue(
keyBufferPosition + Byte.BYTES,
dimensionIndex,
rowObj,
keyBuffer,
stack
);
}
}
@Override
@ -133,4 +142,10 @@ public class NullableNumericGroupByColumnSelectorStrategy implements GroupByColu
// this method handles row values after the first in a multivalued row, so just return false
return false;
}
@Override
public void reset()
{
delegate.reset();
}
}
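The key layout used by the nullable wrapper above can be sketched with a plain ByteBuffer: one null-indicator byte followed by the delegate's fixed-width key, with a precomputed zeroed array (like nullKeyBytes) written when the value is null. The indicator byte values below are assumed stand-ins for NullHandling's constants:

```java
// Sketch of the nullable key layout: indicator byte, then the delegate's key at position + 1.
import java.nio.ByteBuffer;

public class NullableKeyLayoutSketch
{
  private static final byte IS_NULL_BYTE = 0x01;     // assumed stand-in for NullHandling.IS_NULL_BYTE
  private static final byte IS_NOT_NULL_BYTE = 0x00; // assumed stand-in for NullHandling.IS_NOT_NULL_BYTE

  public static void main(String[] args)
  {
    final int keyBufferPosition = 0;
    final ByteBuffer keyBuffer = ByteBuffer.allocate(Byte.BYTES + Long.BYTES);

    // Non-null value: indicator byte, then the delegate writes its long key after it.
    keyBuffer.put(keyBufferPosition, IS_NOT_NULL_BYTE);
    keyBuffer.putLong(keyBufferPosition + Byte.BYTES, 42L);

    // Null value: overwrite the whole region with a precomputed zeroed array, like nullKeyBytes.
    final byte[] nullKeyBytes = new byte[Byte.BYTES + Long.BYTES];
    nullKeyBytes[0] = IS_NULL_BYTE;
    keyBuffer.position(keyBufferPosition);
    keyBuffer.put(nullKeyBytes);

    System.out.println("first byte = " + keyBuffer.get(0)); // 1: null indicator set
  }
}
```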

View File

@ -76,32 +76,39 @@ public class StringGroupByColumnSelectorStrategy implements GroupByColumnSelecto
}
@Override
public void initColumnValues(ColumnValueSelector selector, int columnIndex, Object[] valuess)
public int initColumnValues(ColumnValueSelector selector, int columnIndex, Object[] valuess)
{
DimensionSelector dimSelector = (DimensionSelector) selector;
IndexedInts row = dimSelector.getRow();
valuess[columnIndex] = row;
return 0;
}
/**
* Writes a dictionary ID to the grouping key.
*
* Protected so subclasses can access it, like {@link DictionaryBuildingStringGroupByColumnSelectorStrategy}.
*/
protected void writeToKeyBuffer(int keyBufferPosition, int dictId, ByteBuffer keyBuffer)
{
keyBuffer.putInt(keyBufferPosition, dictId);
}
@Override
public Object getOnlyValue(ColumnValueSelector selector)
public int writeToKeyBuffer(int keyBufferPosition, ColumnValueSelector selector, ByteBuffer keyBuffer)
{
final DimensionSelector dimSelector = (DimensionSelector) selector;
final IndexedInts row = dimSelector.getRow();
Preconditions.checkState(row.size() < 2, "Not supported for multi-value dimensions");
return row.size() == 1 ? row.get(0) : GROUP_BY_MISSING_VALUE;
}
@Override
public void writeToKeyBuffer(int keyBufferPosition, Object obj, ByteBuffer keyBuffer)
{
keyBuffer.putInt(keyBufferPosition, (int) obj);
final int dictId = row.size() == 1 ? row.get(0) : GROUP_BY_MISSING_VALUE;
keyBuffer.putInt(keyBufferPosition, dictId);
return 0;
}
@Override
public void initGroupingKeyColumnValue(
int keyBufferPosition,
int columnIndex,
int dimensionIndex,
Object rowObj,
ByteBuffer keyBuffer,
int[] stack
@ -111,7 +118,7 @@ public class StringGroupByColumnSelectorStrategy implements GroupByColumnSelecto
int rowSize = row.size();
initializeGroupingKeyV2Dimension(row, rowSize, keyBuffer, keyBufferPosition);
stack[columnIndex] = rowSize == 0 ? 0 : 1;
stack[dimensionIndex] = rowSize == 0 ? 0 : 1;
}
@Override
@ -172,4 +179,10 @@ public class StringGroupByColumnSelectorStrategy implements GroupByColumnSelecto
};
}
}
@Override
public void reset()
{
// Nothing to do.
}
}

View File

@ -24,6 +24,7 @@ import org.apache.datasketches.memory.Memory;
import org.apache.datasketches.memory.WritableMemory;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.query.groupby.ResultRow;
import org.apache.druid.query.groupby.epinephelinae.DictionaryBuilding;
import org.apache.druid.segment.vector.VectorObjectSelector;
import java.util.ArrayList;
@ -42,20 +43,15 @@ public class DictionaryBuildingSingleValueStringGroupByVectorColumnSelector impl
private final VectorObjectSelector selector;
private int nextId = 0;
private final List<String> dictionary = new ArrayList<>();
private final Object2IntOpenHashMap<String> reverseDictionary = new Object2IntOpenHashMap<>();
{
reverseDictionary.defaultReturnValue(-1);
}
public DictionaryBuildingSingleValueStringGroupByVectorColumnSelector(VectorObjectSelector selector)
{
this.selector = selector;
this.reverseDictionary.defaultReturnValue(-1);
}
@Override
public int getGroupingKeySize()
{
@ -63,7 +59,7 @@ public class DictionaryBuildingSingleValueStringGroupByVectorColumnSelector impl
}
@Override
public void writeKeys(
public int writeKeys(
final WritableMemory keySpace,
final int keySize,
final int keyOffset,
@ -72,19 +68,26 @@ public class DictionaryBuildingSingleValueStringGroupByVectorColumnSelector impl
)
{
final Object[] vector = selector.getObjectVector();
int stateFootprintIncrease = 0;
for (int i = startRow, j = keyOffset; i < endRow; i++, j += keySize) {
final String value = (String) vector[i];
final int dictId = reverseDictionary.getInt(value);
if (dictId < 0) {
final int nextId = dictionary.size();
dictionary.add(value);
reverseDictionary.put(value, nextId);
keySpace.putInt(j, nextId);
nextId++;
// Use same ROUGH_OVERHEAD_PER_DICTIONARY_ENTRY as the nonvectorized version; dictionary structure is the same.
stateFootprintIncrease +=
DictionaryBuilding.estimateEntryFootprint((value == null ? 0 : value.length()) * Character.BYTES);
} else {
keySpace.putInt(j, dictId);
}
}
return stateFootprintIncrease;
}
@Override
@ -104,4 +107,11 @@ public class DictionaryBuildingSingleValueStringGroupByVectorColumnSelector impl
resultRow.set(resultRowPosition, NullHandling.defaultStringValue());
}
}
@Override
public void reset()
{
dictionary.clear();
reverseDictionary.clear();
}
}

View File

@ -40,7 +40,7 @@ public class DoubleGroupByVectorColumnSelector implements GroupByVectorColumnSel
}
@Override
public void writeKeys(
public int writeKeys(
final WritableMemory keySpace,
final int keySize,
final int keyOffset,
@ -57,6 +57,8 @@ public class DoubleGroupByVectorColumnSelector implements GroupByVectorColumnSel
keySpace.putDouble(j, vector[i]);
}
}
return 0;
}
@Override
@ -69,4 +71,10 @@ public class DoubleGroupByVectorColumnSelector implements GroupByVectorColumnSel
{
resultRow.set(resultRowPosition, keyMemory.getDouble(keyOffset));
}
@Override
public void reset()
{
// Nothing to do.
}
}

View File

@ -40,7 +40,7 @@ public class FloatGroupByVectorColumnSelector implements GroupByVectorColumnSele
}
@Override
public void writeKeys(
public int writeKeys(
final WritableMemory keySpace,
final int keySize,
final int keyOffset,
@ -57,6 +57,8 @@ public class FloatGroupByVectorColumnSelector implements GroupByVectorColumnSele
keySpace.putFloat(j, vector[i]);
}
}
return 0;
}
@Override
@ -69,4 +71,10 @@ public class FloatGroupByVectorColumnSelector implements GroupByVectorColumnSele
{
resultRow.set(resultRowPosition, keyMemory.getFloat(keyOffset));
}
@Override
public void reset()
{
// Nothing to do.
}
}

View File

@ -26,7 +26,14 @@ import org.apache.druid.query.groupby.ResultRow;
/**
* Column processor for groupBy dimensions.
*
* Processors may have internal state, such as the dictionary maintained by
* {@link DictionaryBuildingSingleValueStringGroupByVectorColumnSelector}. Callers should assume that the internal
* state footprint starts out empty (zero bytes) and is also reset to zero on each call to {@link #reset()}. Each call
* to {@link #writeKeys} returns the incremental increase in internal state footprint that happened as a result
* of that particular call.
*
* @see GroupByVectorColumnProcessorFactory
* @see org.apache.druid.query.groupby.epinephelinae.column.GroupByColumnSelectorStrategy the nonvectorized version
*/
public interface GroupByVectorColumnSelector
{
@ -44,10 +51,13 @@ public interface GroupByVectorColumnSelector
* @param keyOffset starting position for the first key part within keySpace
* @param startRow starting row (inclusive) within the current vector
* @param endRow ending row (exclusive) within the current vector
*
* @return estimated increase in internal state footprint, in bytes, as a result of this operation. May be zero if
* memory did not increase as a result of this operation. Will not be negative.
*/
// False positive unused inspection warning for "keySize": https://youtrack.jetbrains.com/issue/IDEA-231034
@SuppressWarnings("unused")
void writeKeys(WritableMemory keySpace, int keySize, int keyOffset, int startRow, int endRow);
int writeKeys(WritableMemory keySpace, int keySize, int keyOffset, int startRow, int endRow);
/**
* Write key parts for this column into a particular result row.
@ -63,4 +73,12 @@ public interface GroupByVectorColumnSelector
ResultRow resultRow,
int resultRowPosition
);
/**
* Reset any internal state held by this selector.
*
* After this method is called, any memory previously written by {@link #writeKeys} must be considered unreadable.
* Calling {@link #writeKeyToResultRow} on that memory has undefined behavior.
*/
void reset();
}
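The vectorized contract mirrors the row-by-row one: writeKeys returns the footprint increase for the whole vector, and the engine sums those values across selectors before deciding whether to stop early, as the VectorGroupByEngine change below does. A stand-in sketch, with Selector as a hypothetical type:

```java
// Sketch of accumulating writeKeys() return values across selectors for one vector.
import java.util.Arrays;
import java.util.List;

public class VectorFootprintAccumulationSketch
{
  interface Selector
  {
    int writeKeys(); // stand-in: footprint increase caused by writing this vector's keys
  }

  public static void main(String[] args)
  {
    final List<Selector> selectors = Arrays.asList(() -> 0, () -> 1_000, () -> 0);
    final long maxSelectorDictionarySize = 500;

    long selectorInternalFootprint = 0;
    for (Selector selector : selectors) {
      // Update now, but check only after the whole vector is aggregated, so results stay correct.
      selectorInternalFootprint += selector.writeKeys();
    }

    if (selectorInternalFootprint > maxSelectorDictionarySize) {
      System.out.println("limit exceeded; flush to merge buffer and reset selectors");
    }
  }
}
```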

View File

@ -40,7 +40,7 @@ public class LongGroupByVectorColumnSelector implements GroupByVectorColumnSelec
}
@Override
public void writeKeys(
public int writeKeys(
final WritableMemory keySpace,
final int keySize,
final int keyOffset,
@ -57,6 +57,8 @@ public class LongGroupByVectorColumnSelector implements GroupByVectorColumnSelec
keySpace.putLong(j, vector[i]);
}
}
return 0;
}
@Override
@ -69,4 +71,10 @@ public class LongGroupByVectorColumnSelector implements GroupByVectorColumnSelec
{
resultRow.set(resultRowPosition, keyMemory.getLong(keyOffset));
}
@Override
public void reset()
{
// Nothing to do.
}
}

View File

@ -42,9 +42,10 @@ public class NilGroupByVectorColumnSelector implements GroupByVectorColumnSelect
}
@Override
public void writeKeys(WritableMemory keySpace, int keySize, int keyOffset, int startRow, int endRow)
public int writeKeys(WritableMemory keySpace, int keySize, int keyOffset, int startRow, int endRow)
{
// Nothing to do.
return 0;
}
@Override
@ -52,4 +53,10 @@ public class NilGroupByVectorColumnSelector implements GroupByVectorColumnSelect
{
resultRow.set(resultRowPosition, null);
}
@Override
public void reset()
{
// Nothing to do.
}
}

View File

@ -41,7 +41,7 @@ public class NullableDoubleGroupByVectorColumnSelector implements GroupByVectorC
}
@Override
public void writeKeys(
public int writeKeys(
final WritableMemory keySpace,
final int keySize,
final int keyOffset,
@ -63,6 +63,8 @@ public class NullableDoubleGroupByVectorColumnSelector implements GroupByVectorC
keySpace.putDouble(j + 1, vector[i]);
}
}
return 0;
}
@Override
@ -79,4 +81,10 @@ public class NullableDoubleGroupByVectorColumnSelector implements GroupByVectorC
resultRow.set(resultRowPosition, keyMemory.getDouble(keyOffset + 1));
}
}
@Override
public void reset()
{
// Nothing to do.
}
}

View File

@ -41,7 +41,7 @@ public class NullableFloatGroupByVectorColumnSelector implements GroupByVectorCo
}
@Override
public void writeKeys(
public int writeKeys(
final WritableMemory keySpace,
final int keySize,
final int keyOffset,
@ -63,6 +63,8 @@ public class NullableFloatGroupByVectorColumnSelector implements GroupByVectorCo
keySpace.putFloat(j + 1, vector[i]);
}
}
return 0;
}
@Override
@ -79,4 +81,10 @@ public class NullableFloatGroupByVectorColumnSelector implements GroupByVectorCo
resultRow.set(resultRowPosition, keyMemory.getFloat(keyOffset + 1));
}
}
@Override
public void reset()
{
// Nothing to do.
}
}

View File

@ -41,7 +41,7 @@ public class NullableLongGroupByVectorColumnSelector implements GroupByVectorCol
}
@Override
public void writeKeys(
public int writeKeys(
final WritableMemory keySpace,
final int keySize,
final int keyOffset,
@ -63,6 +63,8 @@ public class NullableLongGroupByVectorColumnSelector implements GroupByVectorCol
keySpace.putLong(j + 1, vector[i]);
}
}
return 0;
}
@Override
@ -79,4 +81,10 @@ public class NullableLongGroupByVectorColumnSelector implements GroupByVectorCol
resultRow.set(resultRowPosition, keyMemory.getLong(keyOffset + 1));
}
}
@Override
public void reset()
{
// Nothing to do.
}
}

View File

@ -40,7 +40,7 @@ public class SingleValueStringGroupByVectorColumnSelector implements GroupByVect
}
@Override
public void writeKeys(
public int writeKeys(
final WritableMemory keySpace,
final int keySize,
final int keyOffset,
@ -57,6 +57,8 @@ public class SingleValueStringGroupByVectorColumnSelector implements GroupByVect
keySpace.putInt(j, vector[i]);
}
}
return 0;
}
@Override
@ -70,4 +72,10 @@ public class SingleValueStringGroupByVectorColumnSelector implements GroupByVect
final int id = keyMemory.getInt(keyOffset);
resultRow.set(resultRowPosition, selector.lookupName(id));
}
@Override
public void reset()
{
// Nothing to do.
}
}

View File

@ -244,8 +244,13 @@ public class VectorGroupByEngine
@Nullable
private Interval bucketInterval;
// -1 if the current vector was fully aggregated after a call to "initNewDelegate". Otherwise, the number of
// rows of the current vector that were aggregated.
private int partiallyAggregatedRows = -1;
// Sum of internal state footprint across all "selectors".
private long selectorInternalFootprint = 0;
@Nullable
private CloseableGrouperIterator<Memory, ResultRow> delegate = null;
@ -304,6 +309,8 @@ public class VectorGroupByEngine
if (delegate != null) {
delegate.close();
vectorGrouper.reset();
selectors.forEach(GroupByVectorColumnSelector::reset);
selectorInternalFootprint = 0;
}
delegate = initNewDelegate();
@ -390,7 +397,11 @@ public class VectorGroupByEngine
// Write keys to the keySpace.
int keyOffset = 0;
for (final GroupByVectorColumnSelector selector : selectors) {
selector.writeKeys(keySpace, keySize, keyOffset, startOffset, granulizer.getEndOffset());
// Update selectorInternalFootprint now, but check it later. (We reset on the first vector that causes us
// to go past the limit.)
selectorInternalFootprint +=
selector.writeKeys(keySpace, keySize, keyOffset, startOffset, granulizer.getEndOffset());
keyOffset += selector.getGroupingKeySize();
}
@ -420,6 +431,8 @@ public class VectorGroupByEngine
// Advance bucketInterval.
bucketInterval = bucketIterator.hasNext() ? bucketIterator.next() : null;
break;
} else if (selectorInternalFootprint > querySpecificConfig.getMaxSelectorDictionarySize()) {
break;
}
}
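The comments above describe an "accumulate now, check later" pattern: the footprint is added for every vector as its keys are written, and the limit is only consulted afterwards, so the vector that crosses the limit is still aggregated before the engine breaks out, hands the partial results to the merge buffer, and resets the grouper and the selectors. A self-contained toy sketch of that control flow, with simplified names that are not the actual engine code:

```java
import java.util.List;

// Illustrative sketch only (not Druid code) of the check-after-write pattern used above.
class SelectorFootprintLoopSketch
{
  static int processSegment(List<Integer> footprintPerVector, long maxSelectorDictionarySize)
  {
    long selectorInternalFootprint = 0;
    int vectorsProcessed = 0;

    for (int footprintIncrease : footprintPerVector) {
      // "Write keys" for this vector; in the real engine this is the sum of the
      // writeKeys() return values across all selectors.
      selectorInternalFootprint += footprintIncrease;
      vectorsProcessed++;

      if (selectorInternalFootprint > maxSelectorDictionarySize) {
        // Return early; the caller flushes partial results to the merge buffer and then
        // resets the grouper and the selectors, zeroing the footprint.
        break;
      }
    }

    return vectorsProcessed;
  }

  public static void main(String[] args)
  {
    // With a 100-byte limit, the third vector pushes the footprint to 120 and triggers the break.
    System.out.println(processSegment(List.of(40, 40, 40, 40), 100)); // prints 3
  }
}
```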

View File

@ -39,8 +39,9 @@ public class GroupByQueryConfigTest
.put("maxIntermediateRows", "2")
.put("maxResults", "3")
.put("maxOnDiskStorage", "4")
.put("maxMergingDictionarySize", "5")
.put("bufferGrouperMaxLoadFactor", "6")
.put("maxSelectorDictionarySize", "5")
.put("maxMergingDictionarySize", "6")
.put("bufferGrouperMaxLoadFactor", "7")
.build();
@Test
@ -54,8 +55,9 @@ public class GroupByQueryConfigTest
Assert.assertEquals(2, config.getMaxIntermediateRows());
Assert.assertEquals(3, config.getMaxResults());
Assert.assertEquals(4, config.getMaxOnDiskStorage());
Assert.assertEquals(5, config.getMaxMergingDictionarySize());
Assert.assertEquals(6.0, config.getBufferGrouperMaxLoadFactor(), 0.0);
Assert.assertEquals(5, config.getMaxSelectorDictionarySize());
Assert.assertEquals(6, config.getMaxMergingDictionarySize());
Assert.assertEquals(7.0, config.getBufferGrouperMaxLoadFactor(), 0.0);
Assert.assertFalse(config.isApplyLimitPushDownToSegment());
}
@ -77,8 +79,9 @@ public class GroupByQueryConfigTest
Assert.assertEquals(2, config2.getMaxIntermediateRows());
Assert.assertEquals(3, config2.getMaxResults());
Assert.assertEquals(4, config2.getMaxOnDiskStorage());
Assert.assertEquals(5, config2.getMaxMergingDictionarySize());
Assert.assertEquals(6.0, config2.getBufferGrouperMaxLoadFactor(), 0.0);
Assert.assertEquals(5, config2.getMaxSelectorDictionarySize());
Assert.assertEquals(6, config2.getMaxMergingDictionarySize());
Assert.assertEquals(7.0, config2.getBufferGrouperMaxLoadFactor(), 0.0);
Assert.assertFalse(config2.isApplyLimitPushDownToSegment());
}
@ -92,13 +95,14 @@ public class GroupByQueryConfigTest
.setInterval(Intervals.of("2000/P1D"))
.setGranularity(Granularities.ALL)
.setContext(
ImmutableMap.of(
"groupByStrategy", "v1",
"maxOnDiskStorage", 0,
"maxResults", 2,
"maxMergingDictionarySize", 3,
"applyLimitPushDownToSegment", true
)
ImmutableMap.<String, Object>builder()
.put("groupByStrategy", "v1")
.put("maxOnDiskStorage", 0)
.put("maxResults", 2)
.put("maxSelectorDictionarySize", 3)
.put("maxMergingDictionarySize", 4)
.put("applyLimitPushDownToSegment", true)
.build()
)
.build()
);
@ -109,8 +113,9 @@ public class GroupByQueryConfigTest
Assert.assertEquals(2, config2.getMaxIntermediateRows());
Assert.assertEquals(2, config2.getMaxResults());
Assert.assertEquals(0, config2.getMaxOnDiskStorage());
Assert.assertEquals(3, config2.getMaxMergingDictionarySize());
Assert.assertEquals(6.0, config2.getBufferGrouperMaxLoadFactor(), 0.0);
Assert.assertEquals(3, config2.getMaxSelectorDictionarySize());
Assert.assertEquals(4, config2.getMaxMergingDictionarySize());
Assert.assertEquals(7.0, config2.getBufferGrouperMaxLoadFactor(), 0.0);
Assert.assertTrue(config2.isApplyLimitPushDownToSegment());
}
}
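As the test above shows, the new limit can be tuned per query through the context as well as globally through runtime properties; in the test, the context value of 3 wins over the runtime property of 5. A minimal sketch of building such a context map, using arbitrary example values:

```java
import com.google.common.collect.ImmutableMap;
import java.util.Map;

class MaxSelectorDictionarySizeContextExample
{
  public static void main(String[] args)
  {
    // Arbitrary example values: lower the per-segment and per-query dictionary limits
    // for a single query, mirroring the context keys exercised in the test above.
    Map<String, Object> context = ImmutableMap.<String, Object>builder()
        .put("maxSelectorDictionarySize", 3)
        .put("maxMergingDictionarySize", 4)
        .put("maxOnDiskStorage", 0)
        .build();

    System.out.println(context);
  }
}
```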

View File

@ -297,6 +297,12 @@ public class GroupByQueryRunnerTest extends InitializedNullHandlingTest
return GroupByStrategySelector.STRATEGY_V2;
}
@Override
public long getMaxSelectorDictionarySize()
{
return 20;
}
@Override
public long getMaxMergingDictionarySize()
{

View File

@ -115,7 +115,7 @@ public class ArrayDoubleGroupByColumnSelectorStrategyTest
{
ColumnValueSelector columnValueSelector = Mockito.mock(ColumnValueSelector.class);
Mockito.when(columnValueSelector.getObject()).thenReturn(ImmutableList.of(1.0, 2.0));
Assert.assertEquals(0, strategy.getOnlyValue(columnValueSelector));
Assert.assertEquals(0, strategy.computeDictionaryId(columnValueSelector));
GroupByColumnSelectorPlus groupByColumnSelectorPlus = Mockito.mock(GroupByColumnSelectorPlus.class);
Mockito.when(groupByColumnSelectorPlus.getResultRowPosition()).thenReturn(0);
@ -123,7 +123,7 @@ public class ArrayDoubleGroupByColumnSelectorStrategyTest
buffer1.putInt(0);
strategy.processValueFromGroupingKey(groupByColumnSelectorPlus, buffer1, row, 0);
Assert.assertEquals(new ComparableList(ImmutableList.of(1.0, 2.0)), row.get(0));
Assert.assertEquals(new ComparableList<>(ImmutableList.of(1.0, 2.0)), row.get(0));
}
@ -132,7 +132,7 @@ public class ArrayDoubleGroupByColumnSelectorStrategyTest
{
ColumnValueSelector columnValueSelector = Mockito.mock(ColumnValueSelector.class);
Mockito.when(columnValueSelector.getObject()).thenReturn(ImmutableList.of(4.0, 2.0));
Assert.assertEquals(3, strategy.getOnlyValue(columnValueSelector));
Assert.assertEquals(3, strategy.computeDictionaryId(columnValueSelector));
GroupByColumnSelectorPlus groupByColumnSelectorPlus = Mockito.mock(GroupByColumnSelectorPlus.class);
Mockito.when(groupByColumnSelectorPlus.getResultRowPosition()).thenReturn(0);
@ -140,7 +140,7 @@ public class ArrayDoubleGroupByColumnSelectorStrategyTest
buffer1.putInt(3);
strategy.processValueFromGroupingKey(groupByColumnSelectorPlus, buffer1, row, 0);
Assert.assertEquals(new ComparableList(ImmutableList.of(4.0, 2.0)), row.get(0));
Assert.assertEquals(new ComparableList<>(ImmutableList.of(4.0, 2.0)), row.get(0));
}
@Test
@ -148,14 +148,14 @@ public class ArrayDoubleGroupByColumnSelectorStrategyTest
{
ColumnValueSelector columnValueSelector = Mockito.mock(ColumnValueSelector.class);
Mockito.when(columnValueSelector.getObject()).thenReturn(new Object[]{4.0D, 2.0D});
Assert.assertEquals(3, strategy.getOnlyValue(columnValueSelector));
Assert.assertEquals(3, strategy.computeDictionaryId(columnValueSelector));
GroupByColumnSelectorPlus groupByColumnSelectorPlus = Mockito.mock(GroupByColumnSelectorPlus.class);
Mockito.when(groupByColumnSelectorPlus.getResultRowPosition()).thenReturn(0);
ResultRow row = ResultRow.create(1);
buffer1.putInt(3);
strategy.processValueFromGroupingKey(groupByColumnSelectorPlus, buffer1, row, 0);
Assert.assertEquals(new ComparableList(ImmutableList.of(4.0, 2.0)), row.get(0));
Assert.assertEquals(new ComparableList<>(ImmutableList.of(4.0, 2.0)), row.get(0));
}
@After

View File

@ -118,7 +118,7 @@ public class ArrayLongGroupByColumnSelectorStrategyTest
{
ColumnValueSelector columnValueSelector = Mockito.mock(ColumnValueSelector.class);
Mockito.when(columnValueSelector.getObject()).thenReturn(ImmutableList.of(1L, 2L));
Assert.assertEquals(0, strategy.getOnlyValue(columnValueSelector));
Assert.assertEquals(0, strategy.computeDictionaryId(columnValueSelector));
GroupByColumnSelectorPlus groupByColumnSelectorPlus = Mockito.mock(GroupByColumnSelectorPlus.class);
Mockito.when(groupByColumnSelectorPlus.getResultRowPosition()).thenReturn(0);
@ -126,7 +126,7 @@ public class ArrayLongGroupByColumnSelectorStrategyTest
buffer1.putInt(0);
strategy.processValueFromGroupingKey(groupByColumnSelectorPlus, buffer1, row, 0);
Assert.assertEquals(new ComparableList(ImmutableList.of(1L, 2L)), row.get(0));
Assert.assertEquals(new ComparableList<>(ImmutableList.of(1L, 2L)), row.get(0));
}
@ -135,7 +135,7 @@ public class ArrayLongGroupByColumnSelectorStrategyTest
{
ColumnValueSelector columnValueSelector = Mockito.mock(ColumnValueSelector.class);
Mockito.when(columnValueSelector.getObject()).thenReturn(ImmutableList.of(4L, 2L));
Assert.assertEquals(3, strategy.getOnlyValue(columnValueSelector));
Assert.assertEquals(3, strategy.computeDictionaryId(columnValueSelector));
GroupByColumnSelectorPlus groupByColumnSelectorPlus = Mockito.mock(GroupByColumnSelectorPlus.class);
Mockito.when(groupByColumnSelectorPlus.getResultRowPosition()).thenReturn(0);
@ -143,7 +143,7 @@ public class ArrayLongGroupByColumnSelectorStrategyTest
buffer1.putInt(3);
strategy.processValueFromGroupingKey(groupByColumnSelectorPlus, buffer1, row, 0);
Assert.assertEquals(new ComparableList(ImmutableList.of(4L, 2L)), row.get(0));
Assert.assertEquals(new ComparableList<>(ImmutableList.of(4L, 2L)), row.get(0));
}
@Test
@ -151,14 +151,14 @@ public class ArrayLongGroupByColumnSelectorStrategyTest
{
ColumnValueSelector columnValueSelector = Mockito.mock(ColumnValueSelector.class);
Mockito.when(columnValueSelector.getObject()).thenReturn(new Object[]{4L, 2L});
Assert.assertEquals(3, strategy.getOnlyValue(columnValueSelector));
Assert.assertEquals(3, strategy.computeDictionaryId(columnValueSelector));
GroupByColumnSelectorPlus groupByColumnSelectorPlus = Mockito.mock(GroupByColumnSelectorPlus.class);
Mockito.when(groupByColumnSelectorPlus.getResultRowPosition()).thenReturn(0);
ResultRow row = ResultRow.create(1);
buffer1.putInt(3);
strategy.processValueFromGroupingKey(groupByColumnSelectorPlus, buffer1, row, 0);
Assert.assertEquals(new ComparableList(ImmutableList.of(4L, 2L)), row.get(0));
Assert.assertEquals(new ComparableList<>(ImmutableList.of(4L, 2L)), row.get(0));
}
@After

View File

@ -127,7 +127,7 @@ public class ArrayStringGroupByColumnSelectorStrategyTest
{
ColumnValueSelector columnValueSelector = Mockito.mock(ColumnValueSelector.class);
Mockito.when(columnValueSelector.getObject()).thenReturn(ImmutableList.of("a", "b"));
Assert.assertEquals(0, strategy.getOnlyValue(columnValueSelector));
Assert.assertEquals(0, strategy.computeDictionaryId(columnValueSelector));
GroupByColumnSelectorPlus groupByColumnSelectorPlus = Mockito.mock(GroupByColumnSelectorPlus.class);
Mockito.when(groupByColumnSelectorPlus.getResultRowPosition()).thenReturn(0);
@ -144,7 +144,7 @@ public class ArrayStringGroupByColumnSelectorStrategyTest
{
ColumnValueSelector columnValueSelector = Mockito.mock(ColumnValueSelector.class);
Mockito.when(columnValueSelector.getObject()).thenReturn(ImmutableList.of("f", "a"));
Assert.assertEquals(3, strategy.getOnlyValue(columnValueSelector));
Assert.assertEquals(3, strategy.computeDictionaryId(columnValueSelector));
GroupByColumnSelectorPlus groupByColumnSelectorPlus = Mockito.mock(GroupByColumnSelectorPlus.class);
Mockito.when(groupByColumnSelectorPlus.getResultRowPosition()).thenReturn(0);
@ -160,7 +160,7 @@ public class ArrayStringGroupByColumnSelectorStrategyTest
{
ColumnValueSelector columnValueSelector = Mockito.mock(ColumnValueSelector.class);
Mockito.when(columnValueSelector.getObject()).thenReturn(new Object[]{"f", "a"});
Assert.assertEquals(3, strategy.getOnlyValue(columnValueSelector));
Assert.assertEquals(3, strategy.computeDictionaryId(columnValueSelector));
GroupByColumnSelectorPlus groupByColumnSelectorPlus = Mockito.mock(GroupByColumnSelectorPlus.class);
Mockito.when(groupByColumnSelectorPlus.getResultRowPosition()).thenReturn(0);

View File

@ -1554,6 +1554,7 @@ D1
D2
D3
druid.query.groupBy.defaultStrategy
druid.query.groupBy.maxSelectorDictionarySize
druid.query.groupBy.maxMergingDictionarySize
druid.query.groupBy.maxOnDiskStorage
druid.query.groupBy.maxResults.
@ -1563,6 +1564,7 @@ maxResults
orderby
orderbys
outputName
pre-existing
pushdown
row1
subtotalsSpec