Use a single dictionary while combining

Jihoon Son 2017-08-25 20:23:59 +09:00
parent 30c0038853
commit c79959f51c
11 changed files with 634 additions and 395 deletions

View File

@ -116,10 +116,10 @@ import java.util.concurrent.TimeUnit;
@Measurement(iterations = 30)
public class GroupByBenchmark
{
@Param({"4"})
@Param({"8"})
private int numSegments;
@Param({"2", "4"})
@Param({"8"})
private int numProcessingThreads;
@Param({"-1"})
@ -128,13 +128,13 @@ public class GroupByBenchmark
@Param({"100000"})
private int rowsPerSegment;
@Param({"basic.A", "basic.nested"})
@Param({"basic.A"})
private String schemaAndQuery;
@Param({"v1", "v2"})
@Param({"v2"})
private String defaultStrategy;
@Param({"all", "day"})
@Param({"all"})
private String queryGranularity;
private static final Logger log = new Logger(GroupByBenchmark.class);

View File

@ -19,16 +19,17 @@
package io.druid.query.groupby.epinephelinae;
import com.google.common.base.Function;
import com.google.common.base.Throwables;
import io.druid.query.groupby.epinephelinae.Grouper.Entry;
import java.io.Closeable;
import java.io.IOException;
import java.util.Iterator;
import java.util.function.Function;
public class CloseableGrouperIterator<KeyType, T> implements Iterator<T>, Closeable
{
private final Function<Grouper.Entry<KeyType>, T> transformer;
private final Function<Entry<KeyType>, T> transformer;
private final Closeable closer;
private final Iterator<Grouper.Entry<KeyType>> iterator;

View File

@ -23,18 +23,19 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Preconditions;
import com.google.common.base.Supplier;
import com.google.common.base.Suppliers;
import com.google.common.collect.ImmutableList;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import io.druid.collections.ResourceHolder;
import io.druid.java.util.common.ISE;
import io.druid.java.util.common.Pair;
import io.druid.java.util.common.logger.Logger;
import io.druid.query.AbstractPrioritizedCallable;
import io.druid.query.QueryInterruptedException;
import io.druid.query.ResourceLimitExceededException;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.dimension.DimensionSpec;
import io.druid.query.groupby.epinephelinae.RowBasedGrouperHelper.RowBasedKeySerde;
import io.druid.query.groupby.orderby.DefaultLimitSpec;
import io.druid.query.monomorphicprocessing.RuntimeShapeInspector;
import io.druid.segment.ColumnSelectorFactory;
@ -51,8 +52,10 @@ import javax.annotation.Nullable;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
@ -72,7 +75,6 @@ import java.util.stream.Collectors;
*/
public class ConcurrentGrouper<KeyType> implements Grouper<KeyType>
{
private static final Logger LOG = new Logger(ConcurrentGrouper.class);
private static final int MINIMUM_COMBINE_DEGREE = 2;
private final List<SpillingGrouper<KeyType>> groupers;
@ -92,6 +94,7 @@ public class ConcurrentGrouper<KeyType> implements Grouper<KeyType>
private final ObjectMapper spillMapper;
private final int concurrencyHint;
private final KeySerdeFactory<KeyType> keySerdeFactory;
private final KeySerdeFactory<KeyType> mergingKeySerdeFactory;
private final DefaultLimitSpec limitSpec;
private final boolean sortHasNonGroupingFields;
private final Comparator<Grouper.Entry<KeyType>> keyObjComparator;
@ -107,6 +110,7 @@ public class ConcurrentGrouper<KeyType> implements Grouper<KeyType>
final Supplier<ByteBuffer> bufferSupplier,
final Supplier<ResourceHolder<ByteBuffer>> combineBufferSupplier,
final KeySerdeFactory<KeyType> keySerdeFactory,
final KeySerdeFactory<KeyType> mergingKeySerdeFactory,
final ColumnSelectorFactory columnSelectorFactory,
final AggregatorFactory[] aggregatorFactories,
final int bufferGrouperMaxSize,
@ -147,6 +151,7 @@ public class ConcurrentGrouper<KeyType> implements Grouper<KeyType>
this.spillMapper = spillMapper;
this.concurrencyHint = concurrencyHint;
this.keySerdeFactory = keySerdeFactory;
this.mergingKeySerdeFactory = mergingKeySerdeFactory;
this.limitSpec = limitSpec;
this.sortHasNonGroupingFields = sortHasNonGroupingFields;
this.keyObjComparator = keySerdeFactory.objectComparator(sortHasNonGroupingFields);
@ -267,10 +272,12 @@ public class ConcurrentGrouper<KeyType> implements Grouper<KeyType>
// Parallel combine is used only when data are spilled. This is because ConcurrentGrouper uses two different modes
// depending on whether data are spilled or not. If data are not spilled, all inputs are already fully aggregated and
// no further aggregation is required.
return parallelCombine(sortedIterators);
} else {
return Groupers.mergeIterators(sortedIterators, sorted ? keyObjComparator : null);
final List<String> mergedDictionary = mergeDictionary();
if (mergedDictionary != null) {
return parallelCombine(sortedIterators, mergedDictionary);
}
}
return Groupers.mergeIterators(sortedIterators, sorted ? keyObjComparator : null);
}
private boolean isParallelizable()
@ -321,6 +328,31 @@ public class ConcurrentGrouper<KeyType> implements Grouper<KeyType>
.collect(Collectors.toList());
}
/**
* Merge the dictionaries of the {@link Grouper.KeySerde}s of the underlying {@link Grouper}s. The resulting
* dictionary contains only unique string keys.
*
* @return the merged dictionary if its estimated size does not exceed the max dictionary size; otherwise null
*/
@Nullable
private List<String> mergeDictionary()
{
final long maxDictionarySize = mergingKeySerdeFactory.getMaxDictionarySize();
final Set<String> mergedDictionary = new HashSet<>();
long totalDictionarySize = 0L;
for (SpillingGrouper<KeyType> grouper : groupers) {
final List<String> dictionary = grouper.getDictionary();
totalDictionarySize += dictionary.stream().mapToLong(RowBasedKeySerde::estimateStringKeySize).sum();
if (totalDictionarySize > maxDictionarySize) {
return null;
}
mergedDictionary.addAll(dictionary);
}
return ImmutableList.copyOf(mergedDictionary);
}
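A worked example of the threshold check, with hypothetical sizes: if getMaxDictionarySize() is 100 MB and three spilled groupers report dictionaries whose keys estimate to 30 MB, 40 MB, and 50 MB, the running total reaches 120 MB at the third grouper, so this method returns null and iterator() falls back to Groupers.mergeIterators() without parallel combine. Note that the total is accumulated per grouper before de-duplication, so the check is conservative when groupers share many keys.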
/**
* Build a combining tree over the input iterators which combines input entries asynchronously. This method is called
* when data are spilled, in which case streaming combine is preferred to avoid excessive disk access.
@ -329,7 +361,10 @@ public class ConcurrentGrouper<KeyType> implements Grouper<KeyType>
*
* @return an iterator of the root grouper of the combining tree
*/
private Iterator<Entry<KeyType>> parallelCombine(List<Iterator<Entry<KeyType>>> sortedIterators)
private Iterator<Entry<KeyType>> parallelCombine(
List<Iterator<Entry<KeyType>>> sortedIterators,
List<String> mergedDictionary
)
{
// CombineBuffer is initialized when this method is called
final ResourceHolder<ByteBuffer> combineBufferHolder = combineBufferSupplier.get();
@ -339,7 +374,7 @@ public class ConcurrentGrouper<KeyType> implements Grouper<KeyType>
combiningFactories[i] = aggregatorFactories[i].getCombiningFactory();
}
final int minimumRequiredBufferCapacity = StreamingMergeSortedGrouper.requiredBufferCapacity(
keySerdeFactory.factorize(),
mergingKeySerdeFactory.factorizeWithDictionary(mergedDictionary),
combiningFactories
);
final Pair<Integer, Integer> degreeAndBufferNum = findCombineDegreeAndNumBuffers(
@ -383,7 +418,8 @@ public class ConcurrentGrouper<KeyType> implements Grouper<KeyType>
bufferSupplier,
combiningFactories,
combineDegree,
combineFutures
combineFutures,
mergedDictionary
);
return new Iterator<Entry<KeyType>>()
@ -492,7 +528,8 @@ public class ConcurrentGrouper<KeyType> implements Grouper<KeyType>
Supplier<ByteBuffer> bufferSupplier,
AggregatorFactory[] combiningFactories,
int combineDegree,
List<Future> combineFutures
List<Future> combineFutures,
List<String> dictionary
)
{
final int numIterators = sortedIterators.size();
@ -506,13 +543,14 @@ public class ConcurrentGrouper<KeyType> implements Grouper<KeyType>
bufferSupplier,
combiningFactories,
combineDegree,
combineFutures
combineFutures,
dictionary
)
);
}
return runCombiner(childIterators, bufferSupplier.get(), combiningFactories, combineFutures).iterator();
return runCombiner(childIterators, bufferSupplier.get(), combiningFactories, combineFutures, dictionary).iterator();
} else {
return runCombiner(sortedIterators, bufferSupplier.get(), combiningFactories, combineFutures).iterator();
return runCombiner(sortedIterators, bufferSupplier.get(), combiningFactories, combineFutures, dictionary).iterator();
}
}
@ -528,7 +566,8 @@ public class ConcurrentGrouper<KeyType> implements Grouper<KeyType>
List<Iterator<Entry<KeyType>>> iterators,
ByteBuffer combineBuffer,
AggregatorFactory[] combiningFactories,
List<Future> combineFutures
List<Future> combineFutures,
List<String> dictionary
)
{
final Iterator<Entry<KeyType>> mergedIterator = Groupers.mergeIterators(iterators, keyObjComparator);
@ -536,7 +575,7 @@ public class ConcurrentGrouper<KeyType> implements Grouper<KeyType>
new SettableColumnSelectorFactory(aggregatorFactories);
final StreamingMergeSortedGrouper<KeyType> grouper = new StreamingMergeSortedGrouper<>(
Suppliers.ofInstance(combineBuffer),
keySerdeFactory.factorize(),
mergingKeySerdeFactory.factorizeWithDictionary(dictionary),
settableColumnSelectorFactory,
combiningFactories
);

View File

@ -22,6 +22,7 @@ package io.druid.query.groupby.epinephelinae;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.base.Suppliers;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Maps;
import io.druid.collections.NonBlockingPool;
import io.druid.collections.ResourceHolder;
@ -730,6 +731,12 @@ public class GroupByQueryEngineV2
return ByteBuffer.class;
}
@Override
public List<String> getDictionary()
{
return ImmutableList.of();
}
@Override
public ByteBuffer toByteBuffer(ByteBuffer key)
{

View File

@ -29,6 +29,7 @@ import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.function.ToIntFunction;
/**
@ -186,10 +187,22 @@ public interface Grouper<KeyType> extends Closeable
interface KeySerdeFactory<T>
{
/**
* Create a new KeySerde, which may be stateful.
* Return max dictionary size threshold.
*
* @return max dictionary size
*/
long getMaxDictionarySize();
/**
* Create a new {@link KeySerde}, which may be stateful.
*/
KeySerde<T> factorize();
/**
* Create a new {@link KeySerde} with the given dictionary.
*/
KeySerde<T> factorizeWithDictionary(List<String> dictionary);
/**
* Return an object that knows how to compare two serialized key instances. Will be called by the
* {@link #iterator(boolean)} method if sorting is enabled.
@ -217,6 +230,11 @@ public interface Grouper<KeyType> extends Closeable
*/
Class<T> keyClazz();
/**
* Return the dictionary of this KeySerde. The return value should not be null.
*/
List<String> getDictionary();
/**
* Serialize a key. This will be called by the {@link #aggregate(Comparable)} method. The buffer will not
* be retained after the aggregate method returns, so reusing buffers is OK.
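For key types that carry no string dictionary, the new methods can be implemented trivially, as the test key serdes later in this commit do. A partial sketch, using a hypothetical LongKeySerdeFactory and showing only the methods added by this commit; the remaining factory methods and the serde internals are unchanged and omitted:

  // Illustrative sketch only; LongKeySerdeFactory and LongKeySerde are hypothetical names.
  class LongKeySerdeFactory implements Grouper.KeySerdeFactory<Long>
  {
    @Override
    public long getMaxDictionarySize()
    {
      // No string dictionary is kept for purely numeric keys.
      return 0;
    }

    @Override
    public Grouper.KeySerde<Long> factorize()
    {
      // A serde whose getDictionary() returns an empty, immutable list.
      return new LongKeySerde();
    }

    @Override
    public Grouper.KeySerde<Long> factorizeWithDictionary(List<String> dictionary)
    {
      // The dictionary is irrelevant for numeric keys, so simply delegate to factorize().
      return factorize();
    }

    // objectComparator(...) and the LongKeySerde implementation are omitted; this commit does not change them.
  }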

View File

@ -22,7 +22,6 @@ package io.druid.query.groupby.epinephelinae;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonValue;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.base.Supplier;
@ -38,6 +37,7 @@ import io.druid.collections.ResourceHolder;
import io.druid.data.input.MapBasedRow;
import io.druid.data.input.Row;
import io.druid.java.util.common.IAE;
import io.druid.java.util.common.ISE;
import io.druid.java.util.common.Pair;
import io.druid.java.util.common.granularity.AllGranularity;
import io.druid.java.util.common.guava.Accumulator;
@ -50,6 +50,7 @@ import io.druid.query.dimension.DimensionSpec;
import io.druid.query.groupby.GroupByQuery;
import io.druid.query.groupby.GroupByQueryConfig;
import io.druid.query.groupby.RowBasedColumnSelectorFactory;
import io.druid.query.groupby.epinephelinae.Grouper.BufferComparator;
import io.druid.query.groupby.orderby.DefaultLimitSpec;
import io.druid.query.groupby.orderby.OrderByColumnSpec;
import io.druid.query.groupby.strategy.GroupByStrategyV2;
@ -65,6 +66,8 @@ import io.druid.segment.LongColumnSelector;
import io.druid.segment.column.ColumnCapabilities;
import io.druid.segment.column.ValueType;
import io.druid.segment.data.IndexedInts;
import it.unimi.dsi.fastutil.objects.Object2IntMap;
import it.unimi.dsi.fastutil.objects.Object2IntOpenHashMap;
import org.joda.time.DateTime;
import javax.annotation.Nullable;
@ -77,6 +80,7 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;
// this class contains shared code between GroupByMergingQueryRunnerV2 and GroupByRowProcessor
public class RowBasedGrouperHelper
@ -192,10 +196,21 @@ public class RowBasedGrouperHelper
sortHasNonGroupingFields
);
} else {
final Grouper.KeySerdeFactory<RowBasedKey> mergingKeySerdeFactory = new RowBasedKeySerdeFactory(
includeTimestamp,
query.getContextSortByDimsFirst(),
query.getDimensions(),
querySpecificConfig.getMaxMergingDictionarySize(), // use entire dictionary space for one key serde
valueTypes,
aggregatorFactories,
limitSpec
);
grouper = new ConcurrentGrouper<>(
bufferSupplier,
combineBufferSupplier,
keySerdeFactory,
mergingKeySerdeFactory,
columnSelectorFactory,
aggregatorFactories,
querySpecificConfig.getBufferGrouperMaxSize(),
@ -646,6 +661,12 @@ public class RowBasedGrouperHelper
this.valueTypes = valueTypes;
}
@Override
public long getMaxDictionarySize()
{
return maxDictionarySize;
}
@Override
public Grouper.KeySerde<RowBasedKey> factorize()
{
@ -655,7 +676,22 @@ public class RowBasedGrouperHelper
dimensions,
maxDictionarySize,
limitSpec,
valueTypes
valueTypes,
null
);
}
@Override
public Grouper.KeySerde<RowBasedKey> factorizeWithDictionary(List<String> dictionary)
{
return new RowBasedKeySerde(
includeTimestamp,
sortByDimsFirst,
dimensions,
maxDictionarySize,
limitSpec,
valueTypes,
dictionary
);
}
@ -893,10 +929,12 @@ public class RowBasedGrouperHelper
}
}
private static class RowBasedKeySerde implements Grouper.KeySerde<RowBasedGrouperHelper.RowBasedKey>
static class RowBasedKeySerde implements Grouper.KeySerde<RowBasedGrouperHelper.RowBasedKey>
{
// Entry in dictionary, node pointer in reverseDictionary, hash + k/v/next pointer in reverseDictionary nodes
private static final int ROUGH_OVERHEAD_PER_DICTIONARY_ENTRY = Longs.BYTES * 5 + Ints.BYTES;
private static final int DICTIONARY_INITIAL_CAPACITY = 10000;
private static final int UNKNOWN_DICTIONARY_KEY = -1;
private final boolean includeTimestamp;
private final boolean sortByDimsFirst;
@ -904,14 +942,18 @@ public class RowBasedGrouperHelper
private final int dimCount;
private final int keySize;
private final ByteBuffer keyBuffer;
private final List<String> dictionary = Lists.newArrayList();
private final Map<String, Integer> reverseDictionary = Maps.newHashMap();
private final List<RowBasedKeySerdeHelper> serdeHelpers;
private final DefaultLimitSpec limitSpec;
private final List<ValueType> valueTypes;
private final boolean enableRuntimeDictionaryGeneration;
private final List<String> dictionary;
private final Object2IntMap<String> reverseDictionary;
// Size limiting for the dictionary, in (roughly estimated) bytes.
private final long maxDictionarySize;
private long currentEstimatedSize = 0;
// dictionary id -> its position if it were sorted by dictionary value
@ -923,19 +965,57 @@ public class RowBasedGrouperHelper
final List<DimensionSpec> dimensions,
final long maxDictionarySize,
final DefaultLimitSpec limitSpec,
final List<ValueType> valueTypes
final List<ValueType> valueTypes,
@Nullable final List<String> dictionary
)
{
this.includeTimestamp = includeTimestamp;
this.sortByDimsFirst = sortByDimsFirst;
this.dimensions = dimensions;
this.dimCount = dimensions.size();
this.maxDictionarySize = maxDictionarySize;
this.valueTypes = valueTypes;
this.limitSpec = limitSpec;
this.serdeHelpers = makeSerdeHelpers();
this.enableRuntimeDictionaryGeneration = dictionary == null;
this.dictionary = enableRuntimeDictionaryGeneration ? new ArrayList<>(DICTIONARY_INITIAL_CAPACITY) : dictionary;
this.reverseDictionary = enableRuntimeDictionaryGeneration ?
new Object2IntOpenHashMap<>(DICTIONARY_INITIAL_CAPACITY) :
new Object2IntOpenHashMap<>(dictionary.size());
this.reverseDictionary.defaultReturnValue(UNKNOWN_DICTIONARY_KEY);
this.maxDictionarySize = maxDictionarySize;
this.serdeHelpers = makeSerdeHelpers(limitSpec != null, enableRuntimeDictionaryGeneration);
this.keySize = (includeTimestamp ? Longs.BYTES : 0) + getTotalKeySize();
this.keyBuffer = ByteBuffer.allocate(keySize);
if (!enableRuntimeDictionaryGeneration) {
final long initialDictionarySize = dictionary.stream().mapToLong(RowBasedKeySerde::estimateStringKeySize).sum();
Preconditions.checkState(
maxDictionarySize >= initialDictionarySize,
"Dictionary size[%s] exceeds threshold[%s]",
initialDictionarySize,
maxDictionarySize
);
for (int i = 0; i < dictionary.size(); i++) {
reverseDictionary.put(dictionary.get(i), i);
}
initializeSortableIds();
}
}
private void initializeSortableIds()
{
final int dictionarySize = dictionary.size();
final Pair<String, Integer>[] dictAndIds = new Pair[dictionarySize];
for (int id = 0; id < dictionarySize; id++) {
dictAndIds[id] = new Pair<>(dictionary.get(id), id);
}
Arrays.sort(dictAndIds, Comparator.comparing(pair -> pair.lhs));
sortableIds = new int[dictionarySize];
for (int i = 0; i < dictionarySize; i++) {
sortableIds[dictAndIds[i].rhs] = i;
}
}
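A small worked example of the remapping above, with a hypothetical dictionary:

  // dictionary (insertion order):  id 0 = "banana", id 1 = "apple", id 2 = "cherry"
  // sorted by value:               "apple", "banana", "cherry"
  // resulting sortableIds:         [1, 0, 2]
  // e.g. sortableIds[0] == 1 because "banana" (id 0) comes second in sorted order,
  // so comparing sortableIds gives the same ordering as comparing the strings themselves.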
@Override
@ -950,6 +1030,12 @@ public class RowBasedGrouperHelper
return RowBasedKey.class;
}
@Override
public List<String> getDictionary()
{
return dictionary;
}
@Override
public ByteBuffer toByteBuffer(RowBasedKey key)
{
@ -1002,17 +1088,7 @@ public class RowBasedGrouperHelper
public Grouper.BufferComparator bufferComparator()
{
if (sortableIds == null) {
final int dictionarySize = dictionary.size();
final Pair<String, Integer>[] dictAndIds = new Pair[dictionarySize];
for (int id = 0; id < dictionarySize; id++) {
dictAndIds[id] = new Pair<>(dictionary.get(id), id);
}
Arrays.sort(dictAndIds, Comparator.comparing(pair -> pair.lhs));
sortableIds = new int[dictionarySize];
for (int i = 0; i < dictionarySize; i++) {
sortableIds[dictAndIds[i].rhs] = i;
}
initializeSortableIds();
}
if (includeTimestamp) {
@ -1024,7 +1100,6 @@ public class RowBasedGrouperHelper
{
final int cmp = compareDimsInBuffersForNullFudgeTimestamp(
serdeHelpers,
sortableIds,
dimCount,
lhsBuffer,
rhsBuffer,
@ -1052,7 +1127,6 @@ public class RowBasedGrouperHelper
return compareDimsInBuffersForNullFudgeTimestamp(
serdeHelpers,
sortableIds,
dimCount,
lhsBuffer,
rhsBuffer,
@ -1113,35 +1187,19 @@ public class RowBasedGrouperHelper
int aggIndex = OrderByColumnSpec.getAggIndexForOrderBy(orderSpec, Arrays.asList(aggregatorFactories));
if (aggIndex >= 0) {
final RowBasedKeySerdeHelper serdeHelper;
final StringComparator cmp = orderSpec.getDimensionComparator();
final boolean cmpIsNumeric = cmp == StringComparators.NUMERIC;
final StringComparator stringComparator = orderSpec.getDimensionComparator();
final String typeName = aggregatorFactories[aggIndex].getTypeName();
final int aggOffset = aggregatorOffsets[aggIndex] - Ints.BYTES;
aggCount++;
if (typeName.equals("long")) {
if (cmpIsNumeric) {
serdeHelper = new LongRowBasedKeySerdeHelper(aggOffset);
} else {
serdeHelper = new LimitPushDownLongRowBasedKeySerdeHelper(aggOffset, cmp);
}
} else if (typeName.equals("float")) {
if (cmpIsNumeric) {
serdeHelper = new FloatRowBasedKeySerdeHelper(aggOffset);
} else {
serdeHelper = new LimitPushDownFloatRowBasedKeySerdeHelper(aggOffset, cmp);
}
} else if (typeName.equals("double")) {
if (cmpIsNumeric) {
serdeHelper = new DoubleRowBasedKeySerdeHelper(aggOffset);
} else {
serdeHelper = new LimitPushDownDoubleRowBasedKeySerdeHelper(aggOffset, cmp);
}
} else {
final ValueType valueType = ValueType.fromString(typeName);
if (!ValueType.isNumeric(valueType)) {
throw new IAE("Cannot order by a non-numeric aggregator[%s]", orderSpec);
}
serdeHelper = makeNumericSerdeHelper(valueType, aggOffset, true, stringComparator);
orderByHelpers.add(serdeHelper);
needsReverses.add(needsReverse);
}
@ -1244,98 +1302,15 @@ public class RowBasedGrouperHelper
}
}
private static int compareDimsInBuffersForNullFudgeTimestamp(
List<RowBasedKeySerdeHelper> serdeHelpers,
int[] sortableIds,
int dimCount,
ByteBuffer lhsBuffer,
ByteBuffer rhsBuffer,
int lhsPosition,
int rhsPosition
)
{
for (int i = 0; i < dimCount; i++) {
final int cmp = serdeHelpers.get(i).compare(
lhsBuffer,
rhsBuffer,
lhsPosition + Longs.BYTES,
rhsPosition + Longs.BYTES
);
if (cmp != 0) {
return cmp;
}
}
return 0;
}
private static int compareDimsInBuffersForNullFudgeTimestampForPushDown(
List<RowBasedKeySerdeHelper> serdeHelpers,
List<Boolean> needsReverses,
int dimCount,
ByteBuffer lhsBuffer,
ByteBuffer rhsBuffer,
int lhsPosition,
int rhsPosition
)
{
for (int i = 0; i < dimCount; i++) {
final int cmp;
if (needsReverses.get(i)) {
cmp = serdeHelpers.get(i).compare(
rhsBuffer,
lhsBuffer,
rhsPosition + Longs.BYTES,
lhsPosition + Longs.BYTES
);
} else {
cmp = serdeHelpers.get(i).compare(
lhsBuffer,
rhsBuffer,
lhsPosition + Longs.BYTES,
rhsPosition + Longs.BYTES
);
}
if (cmp != 0) {
return cmp;
}
}
return 0;
}
@Override
public void reset()
{
dictionary.clear();
reverseDictionary.clear();
sortableIds = null;
currentEstimatedSize = 0;
}
/**
* Adds s to the dictionary. If the dictionary's size limit would be exceeded by adding this key, then
* this returns -1.
*
* @param s a string
*
* @return id for this string, or -1
*/
private int addToDictionary(final String s)
{
Integer idx = reverseDictionary.get(s);
if (idx == null) {
final long additionalEstimatedSize = (long) s.length() * Chars.BYTES + ROUGH_OVERHEAD_PER_DICTIONARY_ENTRY;
if (currentEstimatedSize + additionalEstimatedSize > maxDictionarySize) {
return -1;
}
idx = dictionary.size();
reverseDictionary.put(s, idx);
dictionary.add(s);
currentEstimatedSize += additionalEstimatedSize;
if (enableRuntimeDictionaryGeneration) {
dictionary.clear();
reverseDictionary.clear();
sortableIds = null;
currentEstimatedSize = 0;
}
return idx;
}
private int getTotalKeySize()
@ -1347,143 +1322,127 @@ public class RowBasedGrouperHelper
return size;
}
private List<RowBasedKeySerdeHelper> makeSerdeHelpers()
private List<RowBasedKeySerdeHelper> makeSerdeHelpers(
boolean pushLimitDown,
boolean enableRuntimeDictionaryGeneration
)
{
if (limitSpec != null) {
return makeSerdeHelpersForLimitPushDown();
}
List<RowBasedKeySerdeHelper> helpers = new ArrayList<>();
final List<RowBasedKeySerdeHelper> helpers = new ArrayList<>();
int keyBufferPosition = 0;
for (ValueType valType : valueTypes) {
RowBasedKeySerdeHelper helper;
switch (valType) {
case STRING:
helper = new StringRowBasedKeySerdeHelper(keyBufferPosition);
break;
case LONG:
helper = new LongRowBasedKeySerdeHelper(keyBufferPosition);
break;
case FLOAT:
helper = new FloatRowBasedKeySerdeHelper(keyBufferPosition);
break;
case DOUBLE:
helper = new DoubleRowBasedKeySerdeHelper(keyBufferPosition);
break;
default:
throw new IAE("invalid type: %s", valType);
for (int i = 0; i < dimCount; i++) {
final StringComparator stringComparator;
if (limitSpec != null) {
final String dimName = dimensions.get(i).getOutputName();
stringComparator = DefaultLimitSpec.getComparatorForDimName(limitSpec, dimName);
} else {
stringComparator = null;
}
RowBasedKeySerdeHelper helper = makeSerdeHelper(
valueTypes.get(i),
keyBufferPosition,
pushLimitDown,
stringComparator,
enableRuntimeDictionaryGeneration
);
keyBufferPosition += helper.getKeyBufferValueSize();
helpers.add(helper);
}
return helpers;
}
private List<RowBasedKeySerdeHelper> makeSerdeHelpersForLimitPushDown()
private RowBasedKeySerdeHelper makeSerdeHelper(
ValueType valueType,
int keyBufferPosition,
boolean pushLimitDown,
@Nullable StringComparator stringComparator,
boolean enableRuntimeDictionaryGeneration
)
{
List<RowBasedKeySerdeHelper> helpers = new ArrayList<>();
int keyBufferPosition = 0;
for (int i = 0; i < valueTypes.size(); i++) {
final ValueType valType = valueTypes.get(i);
final String dimName = dimensions.get(i).getOutputName();
StringComparator cmp = DefaultLimitSpec.getComparatorForDimName(limitSpec, dimName);
final boolean cmpIsNumeric = cmp == StringComparators.NUMERIC;
RowBasedKeySerdeHelper helper;
switch (valType) {
case STRING:
if (cmp == null) {
cmp = StringComparators.LEXICOGRAPHIC;
}
helper = new LimitPushDownStringRowBasedKeySerdeHelper(keyBufferPosition, cmp);
break;
case LONG:
if (cmp == null || cmpIsNumeric) {
helper = new LongRowBasedKeySerdeHelper(keyBufferPosition);
} else {
helper = new LimitPushDownLongRowBasedKeySerdeHelper(keyBufferPosition, cmp);
}
break;
case FLOAT:
if (cmp == null || cmpIsNumeric) {
helper = new FloatRowBasedKeySerdeHelper(keyBufferPosition);
} else {
helper = new LimitPushDownFloatRowBasedKeySerdeHelper(keyBufferPosition, cmp);
}
break;
case DOUBLE:
if (cmp == null || cmpIsNumeric) {
helper = new DoubleRowBasedKeySerdeHelper(keyBufferPosition);
} else {
helper = new LimitPushDownDoubleRowBasedKeySerdeHelper(keyBufferPosition, cmp);
}
break;
default:
throw new IAE("invalid type: %s", valType);
}
keyBufferPosition += helper.getKeyBufferValueSize();
helpers.add(helper);
switch (valueType) {
case STRING:
if (enableRuntimeDictionaryGeneration) {
return new DynamicDictionaryStringRowBasedKeySerdeHelper(
keyBufferPosition,
pushLimitDown,
stringComparator
);
} else {
return new StaticDictionaryStringRowBasedKeySerdeHelper(
keyBufferPosition,
pushLimitDown,
stringComparator
);
}
case LONG:
case FLOAT:
case DOUBLE:
return makeNumericSerdeHelper(valueType, keyBufferPosition, pushLimitDown, stringComparator);
default:
throw new IAE("invalid type: %s", valueType);
}
return helpers;
}
private interface RowBasedKeySerdeHelper
private RowBasedKeySerdeHelper makeNumericSerdeHelper(
ValueType valueType,
int keyBufferPosition,
boolean pushLimitDown,
@Nullable StringComparator stringComparator
)
{
/**
* @return The size in bytes for a value of the column handled by this SerdeHelper.
*/
int getKeyBufferValueSize();
/**
* Read a value from RowBasedKey at `idx` and put the value at the current position of RowBasedKeySerde's keyBuffer.
* advancing the position by the size returned by getKeyBufferValueSize().
*
* If an internal resource limit has been reached and the value could not be added to the keyBuffer,
* (e.g., maximum dictionary size exceeded for Strings), this method returns false.
*
* @param key RowBasedKey containing the grouping key values for a row.
* @param idx Index of the grouping key column within that this SerdeHelper handles
*
* @return true if the value was added to the key, false otherwise
*/
boolean putToKeyBuffer(RowBasedKey key, int idx);
/**
* Read a value from a ByteBuffer containing a grouping key in the same format as RowBasedKeySerde's keyBuffer and
* put the value in `dimValues` at `dimValIdx`.
*
* The value to be read resides in the buffer at position (`initialOffset` + the SerdeHelper's keyBufferPosition).
*
* @param buffer ByteBuffer containing an array of grouping keys for a row
* @param initialOffset Offset where non-timestamp grouping key columns start, needed because timestamp is not
* always included in the buffer.
* @param dimValIdx Index within dimValues to store the value read from the buffer
* @param dimValues Output array containing grouping key values for a row
*/
void getFromByteBuffer(ByteBuffer buffer, int initialOffset, int dimValIdx, Comparable[] dimValues);
/**
* Compare the values at lhsBuffer[lhsPosition] and rhsBuffer[rhsPosition] using the natural ordering
* for this SerdeHelper's value type.
*
* @param lhsBuffer ByteBuffer containing an array of grouping keys for a row
* @param rhsBuffer ByteBuffer containing an array of grouping keys for a row
* @param lhsPosition Position of value within lhsBuffer
* @param rhsPosition Position of value within rhsBuffer
*
* @return Negative number if lhs < rhs, positive if lhs > rhs, 0 if lhs == rhs
*/
int compare(ByteBuffer lhsBuffer, ByteBuffer rhsBuffer, int lhsPosition, int rhsPosition);
switch (valueType) {
case LONG:
return new LongRowBasedKeySerdeHelper(keyBufferPosition, pushLimitDown, stringComparator);
case FLOAT:
return new FloatRowBasedKeySerdeHelper(keyBufferPosition, pushLimitDown, stringComparator);
case DOUBLE:
return new DoubleRowBasedKeySerdeHelper(keyBufferPosition, pushLimitDown, stringComparator);
default:
throw new IAE("invalid type: %s", valueType);
}
}
private class StringRowBasedKeySerdeHelper implements RowBasedKeySerdeHelper
static long estimateStringKeySize(String key)
{
final int keyBufferPosition;
return (long) key.length() * Chars.BYTES + ROUGH_OVERHEAD_PER_DICTIONARY_ENTRY;
}
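As a worked example of this estimate: ROUGH_OVERHEAD_PER_DICTIONARY_ENTRY is Longs.BYTES * 5 + Ints.BYTES = 44 bytes and Chars.BYTES is 2, so a 10-character key is estimated at 10 * 2 + 44 = 64 bytes; for short keys the fixed per-entry overhead dominates the estimate.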
public StringRowBasedKeySerdeHelper(int keyBufferPosition)
private static boolean isPrimitiveComparable(boolean pushLimitDown, @Nullable StringComparator stringComparator)
{
return !pushLimitDown || stringComparator == null || stringComparator == StringComparators.NUMERIC;
}
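The effect of this check, spelled out for the cases that occur below (each line follows directly from the boolean expression above):

  // isPrimitiveComparable(false, anything)                        -> true   (no push-down: natural numeric order)
  // isPrimitiveComparable(true,  StringComparators.NUMERIC)       -> true   (numeric push-down keeps the primitive compare)
  // isPrimitiveComparable(true,  StringComparators.LEXICOGRAPHIC) -> false  (values are compared via String.valueOf)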
private abstract class AbstractStringRowBasedKeySerdeHelper implements RowBasedKeySerdeHelper
{
protected final int keyBufferPosition;
protected final BufferComparator bufferComparator;
protected AbstractStringRowBasedKeySerdeHelper(
int keyBufferPosition,
boolean pushLimitDown,
@Nullable StringComparator stringComparator
)
{
this.keyBufferPosition = keyBufferPosition;
if (!pushLimitDown) {
bufferComparator = (lhsBuffer, rhsBuffer, lhsPosition, rhsPosition) -> Ints.compare(
sortableIds[lhsBuffer.getInt(lhsPosition + keyBufferPosition)],
sortableIds[rhsBuffer.getInt(rhsPosition + keyBufferPosition)]
);
} else {
final StringComparator realComparator = stringComparator == null ?
StringComparators.LEXICOGRAPHIC :
stringComparator;
bufferComparator = (lhsBuffer, rhsBuffer, lhsPosition, rhsPosition) -> {
String lhsStr = dictionary.get(lhsBuffer.getInt(lhsPosition + keyBufferPosition));
String rhsStr = dictionary.get(rhsBuffer.getInt(rhsPosition + keyBufferPosition));
return realComparator.compare(lhsStr, rhsStr);
};
}
}
@Override
@ -1492,6 +1451,30 @@ public class RowBasedGrouperHelper
return Ints.BYTES;
}
@Override
public void getFromByteBuffer(ByteBuffer buffer, int initialOffset, int dimValIdx, Comparable[] dimValues)
{
dimValues[dimValIdx] = dictionary.get(buffer.getInt(initialOffset + keyBufferPosition));
}
@Override
public int compare(ByteBuffer lhsBuffer, ByteBuffer rhsBuffer, int lhsPosition, int rhsPosition)
{
return bufferComparator.compare(lhsBuffer, rhsBuffer, lhsPosition, rhsPosition);
}
}
private class DynamicDictionaryStringRowBasedKeySerdeHelper extends AbstractStringRowBasedKeySerdeHelper
{
DynamicDictionaryStringRowBasedKeySerdeHelper(
int keyBufferPosition,
boolean pushLimitDown,
@Nullable StringComparator stringComparator
)
{
super(keyBufferPosition, pushLimitDown, stringComparator);
}
@Override
public boolean putToKeyBuffer(RowBasedKey key, int idx)
{
@ -1503,48 +1486,82 @@ public class RowBasedGrouperHelper
return true;
}
@Override
public void getFromByteBuffer(ByteBuffer buffer, int initialOffset, int dimValIdx, Comparable[] dimValues)
/**
* Adds s to the dictionary. If the dictionary's size limit would be exceeded by adding this key, then
* this returns -1.
*
* @param s a string
*
* @return id for this string, or -1
*/
private int addToDictionary(final String s)
{
dimValues[dimValIdx] = dictionary.get(buffer.getInt(initialOffset + keyBufferPosition));
}
int idx = reverseDictionary.getInt(s);
if (idx == UNKNOWN_DICTIONARY_KEY) {
final long additionalEstimatedSize = estimateStringKeySize(s);
if (currentEstimatedSize + additionalEstimatedSize > maxDictionarySize) {
return -1;
}
@Override
public int compare(ByteBuffer lhsBuffer, ByteBuffer rhsBuffer, int lhsPosition, int rhsPosition)
{
return Ints.compare(
sortableIds[lhsBuffer.getInt(lhsPosition + keyBufferPosition)],
sortableIds[rhsBuffer.getInt(rhsPosition + keyBufferPosition)]
);
idx = dictionary.size();
reverseDictionary.put(s, idx);
dictionary.add(s);
currentEstimatedSize += additionalEstimatedSize;
}
return idx;
}
}
private class LimitPushDownStringRowBasedKeySerdeHelper extends StringRowBasedKeySerdeHelper
private class StaticDictionaryStringRowBasedKeySerdeHelper extends AbstractStringRowBasedKeySerdeHelper
{
final StringComparator cmp;
public LimitPushDownStringRowBasedKeySerdeHelper(int keyBufferPosition, StringComparator cmp)
StaticDictionaryStringRowBasedKeySerdeHelper(
int keyBufferPosition,
boolean pushLimitDown,
@Nullable StringComparator stringComparator
)
{
super(keyBufferPosition);
this.cmp = cmp;
super(keyBufferPosition, pushLimitDown, stringComparator);
}
@Override
public int compare(ByteBuffer lhsBuffer, ByteBuffer rhsBuffer, int lhsPosition, int rhsPosition)
public boolean putToKeyBuffer(RowBasedKey key, int idx)
{
String lhsStr = dictionary.get(lhsBuffer.getInt(lhsPosition + keyBufferPosition));
String rhsStr = dictionary.get(rhsBuffer.getInt(rhsPosition + keyBufferPosition));
return cmp.compare(lhsStr, rhsStr);
final String stringKey = (String) key.getKey()[idx];
final int dictIndex = reverseDictionary.getInt(stringKey);
if (dictIndex == UNKNOWN_DICTIONARY_KEY) {
throw new ISE("Cannot find key[%s] from dictionary", stringKey);
}
keyBuffer.putInt(dictIndex);
return true;
}
}
private class LongRowBasedKeySerdeHelper implements RowBasedKeySerdeHelper
{
final int keyBufferPosition;
final BufferComparator bufferComparator;
public LongRowBasedKeySerdeHelper(int keyBufferPosition)
LongRowBasedKeySerdeHelper(
int keyBufferPosition,
boolean pushLimitDown,
@Nullable StringComparator stringComparator
)
{
this.keyBufferPosition = keyBufferPosition;
if (isPrimitiveComparable(pushLimitDown, stringComparator)) {
bufferComparator = (lhsBuffer, rhsBuffer, lhsPosition, rhsPosition) -> Longs.compare(
lhsBuffer.getLong(lhsPosition + keyBufferPosition),
rhsBuffer.getLong(rhsPosition + keyBufferPosition)
);
} else {
bufferComparator = (lhsBuffer, rhsBuffer, lhsPosition, rhsPosition) -> {
long lhs = lhsBuffer.getLong(lhsPosition + keyBufferPosition);
long rhs = rhsBuffer.getLong(rhsPosition + keyBufferPosition);
return stringComparator.compare(String.valueOf(lhs), String.valueOf(rhs));
};
}
}
@Override
@ -1569,40 +1586,33 @@ public class RowBasedGrouperHelper
@Override
public int compare(ByteBuffer lhsBuffer, ByteBuffer rhsBuffer, int lhsPosition, int rhsPosition)
{
return Longs.compare(
lhsBuffer.getLong(lhsPosition + keyBufferPosition),
rhsBuffer.getLong(rhsPosition + keyBufferPosition)
);
}
}
private class LimitPushDownLongRowBasedKeySerdeHelper extends LongRowBasedKeySerdeHelper
{
final StringComparator cmp;
public LimitPushDownLongRowBasedKeySerdeHelper(int keyBufferPosition, StringComparator cmp)
{
super(keyBufferPosition);
this.cmp = cmp;
}
@Override
public int compare(ByteBuffer lhsBuffer, ByteBuffer rhsBuffer, int lhsPosition, int rhsPosition)
{
long lhs = lhsBuffer.getLong(lhsPosition + keyBufferPosition);
long rhs = rhsBuffer.getLong(rhsPosition + keyBufferPosition);
return cmp.compare(String.valueOf(lhs), String.valueOf(rhs));
return bufferComparator.compare(lhsBuffer, rhsBuffer, lhsPosition, rhsPosition);
}
}
private class FloatRowBasedKeySerdeHelper implements RowBasedKeySerdeHelper
{
final int keyBufferPosition;
final BufferComparator bufferComparator;
public FloatRowBasedKeySerdeHelper(int keyBufferPosition)
FloatRowBasedKeySerdeHelper(
int keyBufferPosition,
boolean pushLimitDown,
@Nullable StringComparator stringComparator)
{
this.keyBufferPosition = keyBufferPosition;
if (isPrimitiveComparable(pushLimitDown, stringComparator)) {
bufferComparator = (lhsBuffer, rhsBuffer, lhsPosition, rhsPosition) -> Float.compare(
lhsBuffer.getFloat(lhsPosition + keyBufferPosition),
rhsBuffer.getFloat(rhsPosition + keyBufferPosition)
);
} else {
bufferComparator = (lhsBuffer, rhsBuffer, lhsPosition, rhsPosition) -> {
float lhs = lhsBuffer.getFloat(lhsPosition + keyBufferPosition);
float rhs = rhsBuffer.getFloat(rhsPosition + keyBufferPosition);
return stringComparator.compare(String.valueOf(lhs), String.valueOf(rhs));
};
}
}
@Override
@ -1627,39 +1637,34 @@ public class RowBasedGrouperHelper
@Override
public int compare(ByteBuffer lhsBuffer, ByteBuffer rhsBuffer, int lhsPosition, int rhsPosition)
{
return Float.compare(
lhsBuffer.getFloat(lhsPosition + keyBufferPosition),
rhsBuffer.getFloat(rhsPosition + keyBufferPosition)
);
}
}
private class LimitPushDownFloatRowBasedKeySerdeHelper extends FloatRowBasedKeySerdeHelper
{
final StringComparator cmp;
public LimitPushDownFloatRowBasedKeySerdeHelper(int keyBufferPosition, StringComparator cmp)
{
super(keyBufferPosition);
this.cmp = cmp;
}
@Override
public int compare(ByteBuffer lhsBuffer, ByteBuffer rhsBuffer, int lhsPosition, int rhsPosition)
{
float lhs = lhsBuffer.getFloat(lhsPosition + keyBufferPosition);
float rhs = rhsBuffer.getFloat(rhsPosition + keyBufferPosition);
return cmp.compare(String.valueOf(lhs), String.valueOf(rhs));
return bufferComparator.compare(lhsBuffer, rhsBuffer, lhsPosition, rhsPosition);
}
}
private class DoubleRowBasedKeySerdeHelper implements RowBasedKeySerdeHelper
{
final int keyBufferPosition;
final BufferComparator bufferComparator;
public DoubleRowBasedKeySerdeHelper(int keyBufferPosition)
DoubleRowBasedKeySerdeHelper(
int keyBufferPosition,
boolean pushLimitDown,
@Nullable StringComparator stringComparator
)
{
this.keyBufferPosition = keyBufferPosition;
if (isPrimitiveComparable(pushLimitDown, stringComparator)) {
bufferComparator = (lhsBuffer, rhsBuffer, lhsPosition, rhsPosition) -> Double.compare(
lhsBuffer.getDouble(lhsPosition + keyBufferPosition),
rhsBuffer.getDouble(rhsPosition + keyBufferPosition)
);
} else {
bufferComparator = (lhsBuffer, rhsBuffer, lhsPosition, rhsPosition) -> {
double lhs = lhsBuffer.getDouble(lhsPosition + keyBufferPosition);
double rhs = rhsBuffer.getDouble(rhsPosition + keyBufferPosition);
return stringComparator.compare(String.valueOf(lhs), String.valueOf(rhs));
};
}
}
@Override
@ -1684,30 +1689,67 @@ public class RowBasedGrouperHelper
@Override
public int compare(ByteBuffer lhsBuffer, ByteBuffer rhsBuffer, int lhsPosition, int rhsPosition)
{
return Double.compare(
lhsBuffer.getDouble(lhsPosition + keyBufferPosition),
rhsBuffer.getDouble(rhsPosition + keyBufferPosition)
);
}
}
private class LimitPushDownDoubleRowBasedKeySerdeHelper extends DoubleRowBasedKeySerdeHelper
{
final StringComparator cmp;
public LimitPushDownDoubleRowBasedKeySerdeHelper(int keyBufferPosition, StringComparator cmp)
{
super(keyBufferPosition);
this.cmp = cmp;
}
@Override
public int compare(ByteBuffer lhsBuffer, ByteBuffer rhsBuffer, int lhsPosition, int rhsPosition)
{
double lhs = lhsBuffer.getDouble(lhsPosition + keyBufferPosition);
double rhs = rhsBuffer.getDouble(rhsPosition + keyBufferPosition);
return cmp.compare(String.valueOf(lhs), String.valueOf(rhs));
return bufferComparator.compare(lhsBuffer, rhsBuffer, lhsPosition, rhsPosition);
}
}
}
static int compareDimsInBuffersForNullFudgeTimestamp(
List<RowBasedKeySerdeHelper> serdeHelpers,
int dimCount,
ByteBuffer lhsBuffer,
ByteBuffer rhsBuffer,
int lhsPosition,
int rhsPosition
)
{
for (int i = 0; i < dimCount; i++) {
final int cmp = serdeHelpers.get(i).compare(
lhsBuffer,
rhsBuffer,
lhsPosition + Longs.BYTES,
rhsPosition + Longs.BYTES
);
if (cmp != 0) {
return cmp;
}
}
return 0;
}
static int compareDimsInBuffersForNullFudgeTimestampForPushDown(
List<RowBasedKeySerdeHelper> serdeHelpers,
List<Boolean> needsReverses,
int dimCount,
ByteBuffer lhsBuffer,
ByteBuffer rhsBuffer,
int lhsPosition,
int rhsPosition
)
{
for (int i = 0; i < dimCount; i++) {
final int cmp;
if (needsReverses.get(i)) {
cmp = serdeHelpers.get(i).compare(
rhsBuffer,
lhsBuffer,
rhsPosition + Longs.BYTES,
lhsPosition + Longs.BYTES
);
} else {
cmp = serdeHelpers.get(i).compare(
lhsBuffer,
rhsBuffer,
lhsPosition + Longs.BYTES,
rhsPosition + Longs.BYTES
);
}
if (cmp != 0) {
return cmp;
}
}
return 0;
}
}

View File

@ -0,0 +1,73 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.query.groupby.epinephelinae;
import io.druid.query.groupby.epinephelinae.RowBasedGrouperHelper.RowBasedKey;
import java.nio.ByteBuffer;
interface RowBasedKeySerdeHelper
{
/**
* @return The size in bytes for a value of the column handled by this SerdeHelper.
*/
int getKeyBufferValueSize();
/**
* Read a value from RowBasedKey at `idx` and put the value at the current position of RowBasedKeySerde's keyBuffer,
* advancing the position by the size returned by getKeyBufferValueSize().
*
* If an internal resource limit has been reached and the value could not be added to the keyBuffer
* (e.g., the maximum dictionary size was exceeded for Strings), this method returns false.
*
* @param key RowBasedKey containing the grouping key values for a row.
* @param idx Index of the grouping key column within the key that this SerdeHelper handles
*
* @return true if the value was added to the key, false otherwise
*/
boolean putToKeyBuffer(RowBasedKey key, int idx);
/**
* Read a value from a ByteBuffer containing a grouping key in the same format as RowBasedKeySerde's keyBuffer and
* put the value in `dimValues` at `dimValIdx`.
*
* The value to be read resides in the buffer at position (`initialOffset` + the SerdeHelper's keyBufferPosition).
*
* @param buffer ByteBuffer containing an array of grouping keys for a row
* @param initialOffset Offset where non-timestamp grouping key columns start, needed because timestamp is not
* always included in the buffer.
* @param dimValIdx Index within dimValues to store the value read from the buffer
* @param dimValues Output array containing grouping key values for a row
*/
void getFromByteBuffer(ByteBuffer buffer, int initialOffset, int dimValIdx, Comparable[] dimValues);
/**
* Compare the values at lhsBuffer[lhsPosition] and rhsBuffer[rhsPosition] using the natural ordering
* for this SerdeHelper's value type.
*
* @param lhsBuffer ByteBuffer containing an array of grouping keys for a row
* @param rhsBuffer ByteBuffer containing an array of grouping keys for a row
* @param lhsPosition Position of value within lhsBuffer
* @param rhsPosition Position of value within rhsBuffer
*
* @return Negative number if lhs < rhs, positive if lhs > rhs, 0 if lhs == rhs
*/
int compare(ByteBuffer lhsBuffer, ByteBuffer rhsBuffer, int lhsPosition, int rhsPosition);
}
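A minimal sketch of an implementer, assuming a hypothetical fixed-width helper for a plain int column (the real helpers in RowBasedGrouperHelper additionally handle the string dictionary and limit push-down); Ints is Guava's com.google.common.primitives.Ints, as used elsewhere in this commit:

  // Illustrative only; IntColumnSerdeHelper is not part of this commit.
  class IntColumnSerdeHelper implements RowBasedKeySerdeHelper
  {
    private final int keyBufferPosition; // offset of this column inside a serialized key
    private final ByteBuffer keyBuffer;  // the enclosing serde's reusable key buffer

    IntColumnSerdeHelper(int keyBufferPosition, ByteBuffer keyBuffer)
    {
      this.keyBufferPosition = keyBufferPosition;
      this.keyBuffer = keyBuffer;
    }

    @Override
    public int getKeyBufferValueSize()
    {
      return Ints.BYTES;
    }

    @Override
    public boolean putToKeyBuffer(RowBasedKey key, int idx)
    {
      // Fixed-width values have no resource limit to hit, so this always succeeds.
      keyBuffer.putInt((Integer) key.getKey()[idx]);
      return true;
    }

    @Override
    public void getFromByteBuffer(ByteBuffer buffer, int initialOffset, int dimValIdx, Comparable[] dimValues)
    {
      dimValues[dimValIdx] = buffer.getInt(initialOffset + keyBufferPosition);
    }

    @Override
    public int compare(ByteBuffer lhsBuffer, ByteBuffer rhsBuffer, int lhsPosition, int rhsPosition)
    {
      return Ints.compare(
          lhsBuffer.getInt(lhsPosition + keyBufferPosition),
          rhsBuffer.getInt(rhsPosition + keyBufferPosition)
      );
    }
  }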

View File

@ -42,8 +42,10 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
/**
* Grouper based around a single underlying {@link BufferHashGrouper}. Not thread-safe.
@ -64,6 +66,7 @@ public class SpillingGrouper<KeyType> implements Grouper<KeyType>
private final Comparator<Grouper.Entry<KeyType>> defaultOrderKeyObjComparator;
private final List<File> files = Lists.newArrayList();
private final List<File> dictionaryFiles = Lists.newArrayList();
private final List<Closeable> closeables = Lists.newArrayList();
private final boolean sortHasNonGroupingFields;
@ -167,6 +170,30 @@ public class SpillingGrouper<KeyType> implements Grouper<KeyType>
deleteFiles();
}
public List<String> getDictionary()
{
final Set<String> mergedDictionary = new HashSet<>();
mergedDictionary.addAll(keySerde.getDictionary());
for (File dictFile : dictionaryFiles) {
try {
final MappingIterator<String> dictIterator = spillMapper.readValues(
spillMapper.getFactory().createParser(new LZ4BlockInputStream(new FileInputStream(dictFile))),
spillMapper.getTypeFactory().constructType(String.class)
);
while (dictIterator.hasNext()) {
mergedDictionary.add(dictIterator.next());
}
}
catch (IOException e) {
throw new RuntimeException(e);
}
}
return new ArrayList<>(mergedDictionary);
}
public void setSpillingAllowed(final boolean spillingAllowed)
{
this.spillingAllowed = spillingAllowed;
@ -214,24 +241,27 @@ public class SpillingGrouper<KeyType> implements Grouper<KeyType>
private void spill() throws IOException
{
final File outFile;
files.add(spill(grouper.iterator(true)));
dictionaryFiles.add(spill(keySerde.getDictionary().iterator()));
grouper.reset();
}
private <T> File spill(Iterator<T> iterator) throws IOException
{
try (
final LimitedTemporaryStorage.LimitedOutputStream out = temporaryStorage.createFile();
final LZ4BlockOutputStream compressedOut = new LZ4BlockOutputStream(out);
final JsonGenerator jsonGenerator = spillMapper.getFactory().createGenerator(compressedOut)
) {
outFile = out.getFile();
final Iterator<Entry<KeyType>> it = grouper.iterator(true);
while (it.hasNext()) {
while (iterator.hasNext()) {
BaseQuery.checkInterrupted();
jsonGenerator.writeObject(it.next());
jsonGenerator.writeObject(iterator.next());
}
}
files.add(outFile);
grouper.reset();
return out.getFile();
}
}
private MappingIterator<Entry<KeyType>> read(final File file, final Class<KeyType> keyClazz)

View File

@ -170,32 +170,33 @@ public class StreamingMergeSortedGrouper<KeyType> implements Grouper<KeyType>
@Override
public AggregateResult aggregate(KeyType key)
{
final ByteBuffer keyBuffer = keySerde.toByteBuffer(key);
if (keyBuffer == null) {
// This should bubble up to the user, so call finish() here.
try {
final ByteBuffer keyBuffer = keySerde.toByteBuffer(key);
if (keyBuffer.remaining() != keySize) {
throw new IAE(
"keySerde.toByteBuffer(key).remaining[%s] != keySerde.keySize[%s], buffer was the wrong size?!",
keyBuffer.remaining(),
keySize
);
}
final int prevRecordOffset = curWriteIndex * recordSize;
if (curWriteIndex == -1 || !keyEquals(keyBuffer, buffer, prevRecordOffset)) {
initNewSlot(keyBuffer);
}
final int curRecordOffset = curWriteIndex * recordSize;
for (int i = 0; i < aggregatorOffsets.length; i++) {
aggregators[i].aggregate(buffer, curRecordOffset + aggregatorOffsets[i]);
}
return AggregateResult.ok();
}
catch (Throwable t) {
finish();
return DICTIONARY_FULL;
throw t;
}
if (keyBuffer.remaining() != keySize) {
throw new IAE(
"keySerde.toByteBuffer(key).remaining[%s] != keySerde.keySize[%s], buffer was the wrong size?!",
keyBuffer.remaining(),
keySize
);
}
final int prevRecordOffset = curWriteIndex * recordSize;
if (curWriteIndex == -1 || !keyEquals(keyBuffer, buffer, prevRecordOffset)) {
initNewSlot(keyBuffer);
}
final int curRecordOffset = curWriteIndex * recordSize;
for (int i = 0; i < aggregatorOffsets.length; i++) {
aggregators[i].aggregate(buffer, curRecordOffset + aggregatorOffsets[i]);
}
return AggregateResult.ok();
}
private boolean keyEquals(ByteBuffer curKeyBuffer, ByteBuffer buffer, int bufferOffset)

View File

@ -20,6 +20,7 @@
package io.druid.query.groupby.epinephelinae;
import com.google.common.base.Supplier;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.google.common.primitives.Longs;
import com.google.common.util.concurrent.MoreExecutors;
@ -122,6 +123,12 @@ public class ConcurrentGrouperTest
private static final KeySerdeFactory<Long> keySerdeFactory = new KeySerdeFactory<Long>()
{
@Override
public long getMaxDictionarySize()
{
return 0;
}
@Override
public KeySerde<Long> factorize()
{
@ -141,6 +148,12 @@ public class ConcurrentGrouperTest
return Long.class;
}
@Override
public List<String> getDictionary()
{
return ImmutableList.of();
}
@Override
public ByteBuffer toByteBuffer(Long key)
{
@ -183,6 +196,12 @@ public class ConcurrentGrouperTest
};
}
@Override
public KeySerde<Long> factorizeWithDictionary(List<String> dictionary)
{
return factorize();
}
@Override
public Comparator<Grouper.Entry<Long>> objectComparator(boolean forceDefaultOrder)
{
@ -243,6 +262,7 @@ public class ConcurrentGrouperTest
bufferSupplier,
combineBufferSupplier,
keySerdeFactory,
keySerdeFactory,
null_factory,
new AggregatorFactory[]{new CountAggregatorFactory("cnt")},
24,

View File

@ -19,11 +19,13 @@
package io.druid.query.groupby.epinephelinae;
import com.google.common.collect.ImmutableList;
import com.google.common.primitives.Ints;
import io.druid.query.aggregation.AggregatorFactory;
import java.nio.ByteBuffer;
import java.util.Comparator;
import java.util.List;
public class IntKeySerde implements Grouper.KeySerde<Integer>
{
@ -66,6 +68,12 @@ public class IntKeySerde implements Grouper.KeySerde<Integer>
return Integer.class;
}
@Override
public List<String> getDictionary()
{
return ImmutableList.of();
}
@Override
public ByteBuffer toByteBuffer(Integer key)
{