Array-based aggregation for groupBy query (#4576)

* Array-based aggregation

* Fix handling missing grouping key

* Handle invalid offset

* Fix compilation

* Add cardinality check

* Fix cardinality check

* Address comments

* Address comments

* Address comments

* Address comments

* Cleanup GroupByQueryEngineV2.process

* Change to Byte.SIZE

* Add flatMap
Jihoon Son 2017-08-04 02:04:54 +09:00 committed by Roman Leventov
parent 163b0edd79
commit f3f2cd35e1
29 changed files with 994 additions and 213 deletions

View File

@@ -24,6 +24,7 @@ import com.fasterxml.jackson.dataformat.smile.SmileFactory;
import com.google.common.base.Supplier;
import com.google.common.base.Suppliers;
import com.google.common.base.Throwables;
+import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
@@ -53,10 +54,13 @@ import io.druid.query.QueryRunner;
import io.druid.query.QueryRunnerFactory;
import io.druid.query.QueryToolChest;
import io.druid.query.aggregation.AggregatorFactory;
+import io.druid.query.aggregation.DoubleMinAggregatorFactory;
+import io.druid.query.aggregation.DoubleSumAggregatorFactory;
import io.druid.query.aggregation.LongSumAggregatorFactory;
import io.druid.query.aggregation.hyperloglog.HyperUniquesSerde;
import io.druid.query.dimension.DefaultDimensionSpec;
import io.druid.query.dimension.DimensionSpec;
+import io.druid.query.filter.BoundDimFilter;
import io.druid.query.groupby.GroupByQuery;
import io.druid.query.groupby.GroupByQueryConfig;
import io.druid.query.groupby.GroupByQueryEngine;
@@ -238,6 +242,30 @@ public class GroupByBenchmark
basicQueries.put("nested", queryA);
}

+{ // basic.filter
+final QuerySegmentSpec intervalSpec = new MultipleIntervalSegmentSpec(
+Collections.singletonList(basicSchema.getDataInterval())
+);
+// Use multiple aggregators to see how the number of aggregators impacts query performance
+List<AggregatorFactory> queryAggs = ImmutableList.of(
+new LongSumAggregatorFactory("sumLongSequential", "sumLongSequential"),
+new LongSumAggregatorFactory("rows", "rows"),
+new DoubleSumAggregatorFactory("sumFloatNormal", "sumFloatNormal"),
+new DoubleMinAggregatorFactory("minFloatZipf", "minFloatZipf")
+);
+GroupByQuery queryA = GroupByQuery
+.builder()
+.setDataSource("blah")
+.setQuerySegmentSpec(intervalSpec)
+.setDimensions(ImmutableList.of(new DefaultDimensionSpec("dimUniform", null)))
+.setAggregatorSpecs(queryAggs)
+.setGranularity(Granularity.fromString(queryGranularity))
+.setDimFilter(new BoundDimFilter("dimUniform", "0", "100", true, true, null, null, null))
+.build();
+basicQueries.put("filter", queryA);
+}

SCHEMA_QUERY_MAP.put("basic", basicQueries);

// simple one column schema, for testing performance difference between querying on numeric values as Strings and

View File

@@ -153,6 +153,9 @@ threads. You can adjust this as necessary to balance concurrency and memory usage
historical nodes.
- groupBy v1 supports using [chunkPeriod](query-context.html) to parallelize merging on the broker, whereas groupBy v2
ignores chunkPeriod.
+- groupBy v2 supports both array-based aggregation and hash-based aggregation. The array-based aggregation is used only
+when the grouping key is a single indexed string column. In array-based aggregation, the dictionary-encoded value is used
+as the index, so the aggregated values in the array can be accessed directly without finding buckets based on hashing.

#### Memory tuning and resource limits

@@ -246,6 +249,7 @@ When using the "v2" strategy, the following query context parameters apply:
|`maxOnDiskStorage`|Can be used to lower the value of `druid.query.groupBy.maxOnDiskStorage` for this query.|
|`sortByDimsFirst`|Sort the results first by dimension values and then by timestamp.|
|`forcePushDownLimit`|When all fields in the orderby are part of the grouping key, the broker will push limit application down to the historical nodes. When the sorting order uses fields that are not in the grouping key, applying this optimization can result in approximate results with unknown accuracy, so this optimization is disabled by default in that case. Enabling this context flag turns on limit push down for limit/orderbys that contain non-grouping key columns.|
+|`forceHashAggregation`|Forces the use of hash-based aggregation.|

When using the "v1" strategy, the following query context parameters apply:
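For illustration, here is a hedged sketch (not part of the commit) of a query that qualifies for the array-based path -- a single string grouping dimension -- and how the new context key can force the hash-based path instead. It assumes the GroupByQuery builder exposes a setContext method; the other builder calls mirror the benchmark change above.

// Sketch: single string grouping dimension, so the array-based path applies by default;
// setting forceHashAggregation in the query context opts back into hash aggregation.
List<AggregatorFactory> aggs = ImmutableList.of(
    new LongSumAggregatorFactory("sumLongSequential", "sumLongSequential")
);
GroupByQuery query = GroupByQuery
    .builder()
    .setDataSource("blah")
    .setQuerySegmentSpec(intervalSpec) // an interval spec as in the benchmark above
    .setDimensions(ImmutableList.of(new DefaultDimensionSpec("dimUniform", null)))
    .setAggregatorSpecs(aggs)
    .setGranularity(Granularity.fromString("all"))
    .setContext(ImmutableMap.of("forceHashAggregation", true)) // assumed builder method
    .build();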

View File

@@ -27,7 +27,7 @@ import com.yahoo.sketches.theta.UpdateSketch;
import io.druid.data.input.MapBasedRow;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.CountAggregatorFactory;
-import io.druid.query.groupby.epinephelinae.BufferGrouper;
+import io.druid.query.groupby.epinephelinae.BufferHashGrouper;
import io.druid.query.groupby.epinephelinae.Grouper;
import io.druid.query.groupby.epinephelinae.GrouperTestUtil;
import io.druid.query.groupby.epinephelinae.TestColumnSelectorFactory;
@@ -36,15 +36,15 @@ import org.junit.Test;
import java.nio.ByteBuffer;

-public class BufferGrouperUsingSketchMergeAggregatorFactoryTest
+public class BufferHashGrouperUsingSketchMergeAggregatorFactoryTest
{
-private static BufferGrouper<Integer> makeGrouper(
+private static BufferHashGrouper<Integer> makeGrouper(
TestColumnSelectorFactory columnSelectorFactory,
int bufferSize,
int initialBuckets
)
{
-final BufferGrouper<Integer> grouper = new BufferGrouper<>(
+final BufferHashGrouper<Integer> grouper = new BufferHashGrouper<>(
Suppliers.ofInstance(ByteBuffer.allocate(bufferSize)),
GrouperTestUtil.intKeySerde(),
columnSelectorFactory,

View File

@@ -31,10 +31,10 @@ public class ConcatSequence<T> implements Sequence<T>
private final Sequence<Sequence<T>> baseSequences;

public ConcatSequence(
-Sequence<Sequence<T>> baseSequences
+Sequence<? extends Sequence<? extends T>> baseSequences
)
{
-this.baseSequences = baseSequences;
+this.baseSequences = (Sequence<Sequence<T>>) baseSequences;
}

@Override

View File

@@ -21,6 +21,7 @@ package io.druid.java.util.common.guava;
import com.google.common.collect.Ordering;

+import java.io.Closeable;
import java.util.concurrent.Executor;
import java.util.function.Function;
@@ -68,6 +69,13 @@ public interface Sequence<T>
return new MappedSequence<>(this, mapper);
}

+default <R> Sequence<R> flatMap(
+Function<? super T, ? extends Sequence<? extends R>> mapper
+)
+{
+return new ConcatSequence<>(this.map(mapper));
+}

default <R> Sequence<R> flatMerge(
Function<? super T, ? extends Sequence<? extends R>> mapper,
Ordering<? super R> ordering
@@ -80,4 +88,9 @@ public interface Sequence<T>
{
return Sequences.withEffect(this, effect, effectExecutor);
}

+default Sequence<T> withBaggage(Closeable baggage)
+{
+return Sequences.withBaggage(this, baggage);
+}
}
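A minimal usage sketch (not part of the commit) of the two new default methods, assuming the existing Sequences.simple and Sequences.toList helpers:

// flatMap maps each element to a sub-sequence and concatenates the results lazily;
// withBaggage attaches a Closeable that is closed once the sequence is fully consumed.
Sequence<Integer> base = Sequences.simple(ImmutableList.of(1, 2, 3));
Sequence<Integer> expanded = base.flatMap(i -> Sequences.simple(ImmutableList.of(i, i * 10)));
// expanded yields 1, 10, 2, 20, 3, 30
Closeable baggage = () -> System.out.println("released");
List<Integer> out = Sequences.toList(expanded.withBaggage(baggage), new ArrayList<>());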

View File

@@ -75,7 +75,7 @@ public class Sequences
return concat(Sequences.simple(sequences));
}

-public static <T> Sequence<T> concat(Sequence<Sequence<T>> sequences)
+public static <T> Sequence<T> concat(Sequence<? extends Sequence<T>> sequences)
{
return new ConcatSequence<>(sequences);
}

View File

@@ -36,6 +36,7 @@ public class GroupByQueryConfig
private static final String CTX_KEY_BUFFER_GROUPER_MAX_SIZE = "bufferGrouperMaxSize";
private static final String CTX_KEY_MAX_ON_DISK_STORAGE = "maxOnDiskStorage";
private static final String CTX_KEY_MAX_MERGING_DICTIONARY_SIZE = "maxMergingDictionarySize";
+private static final String CTX_KEY_FORCE_HASH_AGGREGATION = "forceHashAggregation";

@JsonProperty
private String defaultStrategy = GroupByStrategySelector.STRATEGY_V2;
@@ -70,6 +71,9 @@ public class GroupByQueryConfig
@JsonProperty
private boolean forcePushDownLimit = false;

+@JsonProperty
+private boolean forceHashAggregation = false;

public String getDefaultStrategy()
{
return defaultStrategy;
@@ -134,6 +138,11 @@ public class GroupByQueryConfig
{
return forcePushDownLimit;
}

+public boolean isForceHashAggregation()
+{
+return forceHashAggregation;
+}

public GroupByQueryConfig withOverrides(final GroupByQuery query)
{
@@ -169,6 +178,7 @@ public class GroupByQueryConfig
getMaxMergingDictionarySize()
);
newConfig.forcePushDownLimit = query.getContextBoolean(CTX_KEY_FORCE_LIMIT_PUSH_DOWN, isForcePushDownLimit());
+newConfig.forceHashAggregation = query.getContextBoolean(CTX_KEY_FORCE_HASH_AGGREGATION, isForceHashAggregation());
return newConfig;
}
@@ -185,6 +195,8 @@ public class GroupByQueryConfig
", bufferGrouperInitialBuckets=" + bufferGrouperInitialBuckets +
", maxMergingDictionarySize=" + maxMergingDictionarySize +
", maxOnDiskStorage=" + maxOnDiskStorage +
+", forcePushDownLimit=" + forcePushDownLimit +
+", forceHashAggregation=" + forceHashAggregation +
'}';
}
}

View File

@@ -28,21 +28,10 @@ import io.druid.query.aggregation.BufferAggregator;
import java.nio.ByteBuffer;

-public abstract class AbstractBufferGrouper<KeyType> implements Grouper<KeyType>
+public abstract class AbstractBufferHashGrouper<KeyType> implements Grouper<KeyType>
{
-private static final AggregateResult DICTIONARY_FULL = AggregateResult.failure(
-"Not enough dictionary space to execute this query. Try increasing "
-+ "druid.query.groupBy.maxMergingDictionarySize or enable disk spilling by setting "
-+ "druid.query.groupBy.maxOnDiskStorage to a positive number."
-);
-private static final AggregateResult HASHTABLE_FULL = AggregateResult.failure(
-"Not enough aggregation table space to execute this query. Try increasing "
-+ "druid.processing.buffer.sizeBytes or enable disk spilling by setting "
-+ "druid.query.groupBy.maxOnDiskStorage to a positive number."
-);
protected static final int HASH_SIZE = Ints.BYTES;
-protected static final Logger log = new Logger(AbstractBufferGrouper.class);
+protected static final Logger log = new Logger(AbstractBufferHashGrouper.class);

protected final Supplier<ByteBuffer> bufferSupplier;
protected final KeySerde<KeyType> keySerde;
@@ -61,7 +50,8 @@ public abstract class AbstractBufferGrouper<KeyType> implements Grouper<KeyType>
protected ByteBufferHashTable hashTable;
protected ByteBuffer hashTableBuffer; // buffer for the entire hash table (total space, not individual growth)

-public AbstractBufferGrouper(
+public AbstractBufferHashGrouper(
+// the buffer returned from the below supplier can have dirty bits and should be cleared during initialization
final Supplier<ByteBuffer> bufferSupplier,
final KeySerde<KeyType> keySerde,
final AggregatorFactory[] aggregatorFactories,
@@ -77,7 +67,7 @@ public abstract class AbstractBufferGrouper<KeyType> implements Grouper<KeyType>
}

/**
-* Called when a new bucket is used for an entry in the hash table. An implementing BufferGrouper class
+* Called when a new bucket is used for an entry in the hash table. An implementing BufferHashGrouper class
* can use this to update its own state, e.g. tracking bucket offsets in a structure outside of the hash table.
*
* @param bucketOffset offset of the new bucket, within the buffer returned by hashTable.getTableBuffer()
@@ -95,7 +85,7 @@ public abstract class AbstractBufferGrouper<KeyType> implements Grouper<KeyType>
public abstract boolean canSkipAggregate(boolean bucketWasUsed, int bucketOffset);

/**
-* Called after a row is aggregated. An implementing BufferGrouper class can use this to update
+* Called after a row is aggregated. An implementing BufferHashGrouper class can use this to update
* its own state, e.g. reading the new aggregated values for the row's key and acting on that information.
*
* @param bucketOffset Offset of the bucket containing the row that was aggregated,
@@ -134,7 +124,7 @@ public abstract class AbstractBufferGrouper<KeyType> implements Grouper<KeyType>
if (keyBuffer == null) {
// This may just trigger a spill and get ignored, which is ok. If it bubbles up to the user, the message will
// be correct.
-return DICTIONARY_FULL;
+return Groupers.DICTIONARY_FULL;
}

if (keyBuffer.remaining() != keySize) {
@@ -150,7 +140,7 @@ public abstract class AbstractBufferGrouper<KeyType> implements Grouper<KeyType>
if (bucket < 0) {
// This may just trigger a spill and get ignored, which is ok. If it bubbles up to the user, the message will
// be correct.
-return HASHTABLE_FULL;
+return Groupers.HASH_TABLE_FULL;
}

final int bucketStartOffset = hashTable.getOffsetForBucket(bucket);
@@ -181,12 +171,6 @@ public abstract class AbstractBufferGrouper<KeyType> implements Grouper<KeyType>
return AggregateResult.ok();
}

-@Override
-public AggregateResult aggregate(final KeyType key)
-{
-return aggregate(key, Groupers.hash(key));
-}

@Override
public void close()
{

View File

@@ -0,0 +1,284 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.query.groupby.epinephelinae;
import com.google.common.base.Preconditions;
import com.google.common.base.Supplier;
import io.druid.java.util.common.ISE;
import io.druid.java.util.common.logger.Logger;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.BufferAggregator;
import io.druid.query.groupby.epinephelinae.column.GroupByColumnSelectorStrategy;
import io.druid.segment.ColumnSelectorFactory;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.function.ToIntFunction;
/**
* A buffer grouper for array-based aggregation. This grouper stores aggregated values in the buffer using the grouping
* key as the index.
* <p>
* The buffer is divided into 2 separate regions, i.e., used flag buffer and value buffer. The used flag buffer is a
* bit set to represent which keys are valid. If a bit of an index is set, that key is valid. Finally, the value
* buffer is used to store aggregated values. The first index is reserved for
* {@link GroupByColumnSelectorStrategy#GROUP_BY_MISSING_VALUE}.
* <p>
* This grouper is available only when the grouping key is a single indexed dimension of a known cardinality because it
* directly uses the dimension value as the index for array access. Since the cardinality for the grouping key across
* different segments cannot be currently retrieved, this grouper can be used only when performing per-segment query
* execution.
*/
public class BufferArrayGrouper implements Grouper<Integer>
{
private static final Logger LOG = new Logger(BufferArrayGrouper.class);
private final Supplier<ByteBuffer> bufferSupplier;
private final BufferAggregator[] aggregators;
private final int[] aggregatorOffsets;
private final int cardinalityWithMissingValue;
private final int recordSize; // size of all aggregated values
private boolean initialized = false;
private ByteBuffer usedFlagBuffer;
private ByteBuffer valBuffer;
static int requiredBufferCapacity(
int cardinality,
AggregatorFactory[] aggregatorFactories
)
{
final int cardinalityWithMissingValue = cardinality + 1;
final int recordSize = Arrays.stream(aggregatorFactories)
.mapToInt(AggregatorFactory::getMaxIntermediateSize)
.sum();
return getUsedFlagBufferCapacity(cardinalityWithMissingValue) + // total used flags size
cardinalityWithMissingValue * recordSize; // total values size
}
/**
* Compute the number of bytes to store all used flag bits.
*/
private static int getUsedFlagBufferCapacity(int cardinalityWithMissingValue)
{
return (cardinalityWithMissingValue + Byte.SIZE - 1) / Byte.SIZE;
}
public BufferArrayGrouper(
// the buffer returned from the below supplier can have dirty bits and should be cleared during initialization
final Supplier<ByteBuffer> bufferSupplier,
final ColumnSelectorFactory columnSelectorFactory,
final AggregatorFactory[] aggregatorFactories,
final int cardinality
)
{
Preconditions.checkNotNull(aggregatorFactories, "aggregatorFactories");
Preconditions.checkArgument(cardinality > 0, "Cardinality must be a non-zero positive number");
this.bufferSupplier = Preconditions.checkNotNull(bufferSupplier, "bufferSupplier");
this.aggregators = new BufferAggregator[aggregatorFactories.length];
this.aggregatorOffsets = new int[aggregatorFactories.length];
this.cardinalityWithMissingValue = cardinality + 1;
int offset = 0;
for (int i = 0; i < aggregatorFactories.length; i++) {
aggregators[i] = aggregatorFactories[i].factorizeBuffered(columnSelectorFactory);
aggregatorOffsets[i] = offset;
offset += aggregatorFactories[i].getMaxIntermediateSize();
}
recordSize = offset;
}
@Override
public void init()
{
if (!initialized) {
final ByteBuffer buffer = bufferSupplier.get();
final int usedFlagBufferEnd = getUsedFlagBufferCapacity(cardinalityWithMissingValue);
buffer.position(0);
buffer.limit(usedFlagBufferEnd);
usedFlagBuffer = buffer.slice();
buffer.position(usedFlagBufferEnd);
buffer.limit(buffer.capacity());
valBuffer = buffer.slice();
reset();
initialized = true;
}
}
@Override
public boolean isInitialized()
{
return initialized;
}
@Override
public AggregateResult aggregate(Integer key, int dimIndex)
{
Preconditions.checkArgument(
dimIndex >= 0 && dimIndex < cardinalityWithMissingValue,
"Invalid dimIndex[%s]",
dimIndex
);
Preconditions.checkNotNull(key);
final int recordOffset = dimIndex * recordSize;
if (recordOffset + recordSize > valBuffer.capacity()) {
// This error cannot be recovered, and the query must fail
throw new ISE(
"A record of size [%d] cannot be written to the array buffer at offset[%d] "
+ "because it exceeds the buffer capacity[%d]. Try increasing druid.processing.buffer.sizeBytes",
recordSize,
recordOffset,
valBuffer.capacity()
);
}
if (!isUsedSlot(dimIndex)) {
initializeSlot(dimIndex);
}
for (int i = 0; i < aggregators.length; i++) {
aggregators[i].aggregate(valBuffer, recordOffset + aggregatorOffsets[i]);
}
return AggregateResult.ok();
}
private void initializeSlot(int dimIndex)
{
final int index = dimIndex / Byte.SIZE;
final int extraIndex = dimIndex % Byte.SIZE;
usedFlagBuffer.put(index, (byte) (usedFlagBuffer.get(index) | (1 << extraIndex)));
final int recordOffset = dimIndex * recordSize;
for (int i = 0; i < aggregators.length; i++) {
aggregators[i].init(valBuffer, recordOffset + aggregatorOffsets[i]);
}
}
private boolean isUsedSlot(int dimIndex)
{
final int index = dimIndex / Byte.SIZE;
final int extraIndex = dimIndex % Byte.SIZE;
final int usedFlagByte = 1 << extraIndex;
return (usedFlagBuffer.get(index) & usedFlagByte) != 0;
}
@Override
public void reset()
{
// Clear the entire usedFlagBuffer
final int usedFlagBufferCapacity = usedFlagBuffer.capacity();
// putLong() instead of put() can boost the performance of clearing the buffer
final int n = (usedFlagBufferCapacity / Long.BYTES) * Long.BYTES;
for (int i = 0; i < n; i += Long.BYTES) {
usedFlagBuffer.putLong(i, 0L);
}
for (int i = n; i < usedFlagBufferCapacity; i++) {
usedFlagBuffer.put(i, (byte) 0);
}
}
@Override
public ToIntFunction<Integer> hashFunction()
{
return key -> key + 1;
}
@Override
public void close()
{
for (BufferAggregator aggregator : aggregators) {
try {
aggregator.close();
}
catch (Exception e) {
LOG.warn(e, "Could not close aggregator [%s], skipping.", aggregator);
}
}
}
@Override
public Iterator<Entry<Integer>> iterator(boolean sorted)
{
if (sorted) {
throw new UnsupportedOperationException("sorted iterator is not supported yet");
}
return new Iterator<Entry<Integer>>()
{
int cur = -1;
boolean findNext = false;
{
cur = findNext();
}
@Override
public boolean hasNext()
{
if (findNext) {
cur = findNext();
findNext = false;
}
return cur >= 0;
}
private int findNext()
{
for (int i = cur + 1; i < cardinalityWithMissingValue; i++) {
if (isUsedSlot(i)) {
return i;
}
}
return -1;
}
@Override
public Entry<Integer> next()
{
if (cur < 0) {
throw new NoSuchElementException();
}
findNext = true;
final Object[] values = new Object[aggregators.length];
final int recordOffset = cur * recordSize;
for (int i = 0; i < aggregators.length; i++) {
values[i] = aggregators[i].get(valBuffer, recordOffset + aggregatorOffsets[i]);
}
return new Entry<>(cur - 1, values);
}
};
}
}
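To make the layout concrete, a small worked example (not part of the commit) of the capacity computation above:

// One long-typed aggregator (8-byte intermediate size) and cardinality 1000:
// cardinalityWithMissingValue = 1000 + 1 = 1001 slots (slot 0 is the "missing" key)
// used-flag region = ceil(1001 / 8) = 126 bytes (one bit per slot)
// value region     = 1001 * 8      = 8008 bytes
// required         = 126 + 8008    = 8134 bytes
final int cardinality = 1000;
final int recordSize = Long.BYTES;                                   // one aggregator
final int withMissing = cardinality + 1;
final int usedFlagBytes = (withMissing + Byte.SIZE - 1) / Byte.SIZE; // ceiling division -> 126
final int required = usedFlagBytes + withMissing * recordSize;       // 8134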

View File

@@ -35,9 +35,9 @@ import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

-public class BufferGrouper<KeyType> extends AbstractBufferGrouper<KeyType>
+public class BufferHashGrouper<KeyType> extends AbstractBufferHashGrouper<KeyType>
{
-private static final Logger log = new Logger(BufferGrouper.class);
+private static final Logger log = new Logger(BufferHashGrouper.class);

private static final int MIN_INITIAL_BUCKETS = 4;
private static final int DEFAULT_INITIAL_BUCKETS = 1024;
private static final float DEFAULT_MAX_LOAD_FACTOR = 0.7f;
@@ -51,7 +51,7 @@ public class BufferGrouper<KeyType> extends AbstractBufferGrouper<KeyType>
private ByteBuffer offsetListBuffer;
private ByteBufferIntList offsetList;

-public BufferGrouper(
+public BufferHashGrouper(
final Supplier<ByteBuffer> bufferSupplier,
final KeySerde<KeyType> keySerde,
final ColumnSelectorFactory columnSelectorFactory,

View File

@@ -238,7 +238,7 @@ public class ByteBufferHashTable
{
int offset = bucket * bucketSizeWithHash;
tableBuffer.position(offset);
-tableBuffer.putInt(keyHash | 0x80000000);
+tableBuffer.putInt(Groupers.getUsedFlag(keyHash));
tableBuffer.put(keyBuffer);

size++;

View File

@@ -41,7 +41,7 @@ public class ByteBufferMinMaxOffsetHeap
private final Comparator maxComparator;
private final ByteBuffer buf;
private final int limit;
-private final LimitedBufferGrouper.BufferGrouperOffsetHeapIndexUpdater heapIndexUpdater;
+private final LimitedBufferHashGrouper.BufferGrouperOffsetHeapIndexUpdater heapIndexUpdater;

private int heapSize;
@@ -49,7 +49,7 @@ public class ByteBufferMinMaxOffsetHeap
ByteBuffer buf,
int limit,
Comparator<Integer> minComparator,
-LimitedBufferGrouper.BufferGrouperOffsetHeapIndexUpdater heapIndexUpdater
+LimitedBufferHashGrouper.BufferGrouperOffsetHeapIndexUpdater heapIndexUpdater
)
{
this.buf = buf;

View File

@@ -187,12 +187,6 @@ public class ConcurrentGrouper<KeyType> implements Grouper<KeyType>
}
}

-@Override
-public AggregateResult aggregate(KeyType key)
-{
-return aggregate(key, Groupers.hash(key));
-}

@Override
public void reset()
{

View File

@@ -19,7 +19,7 @@
package io.druid.query.groupby.epinephelinae;

-import com.google.common.base.Function;
+import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.base.Suppliers;
import com.google.common.collect.Maps;
@@ -30,9 +30,7 @@ import io.druid.data.input.Row;
import io.druid.java.util.common.IAE;
import io.druid.java.util.common.ISE;
import io.druid.java.util.common.guava.BaseSequence;
-import io.druid.java.util.common.guava.CloseQuietly;
import io.druid.java.util.common.guava.Sequence;
-import io.druid.java.util.common.guava.Sequences;
import io.druid.query.ColumnSelectorPlus;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.dimension.ColumnSelectorStrategyFactory;
@@ -54,12 +52,13 @@ import io.druid.segment.DimensionSelector;
import io.druid.segment.StorageAdapter;
import io.druid.segment.column.ColumnCapabilities;
import io.druid.segment.column.ValueType;
+import io.druid.segment.data.IndexedInts;
import io.druid.segment.filter.Filters;
import org.joda.time.DateTime;
import org.joda.time.Interval;

+import javax.annotation.Nullable;
import java.io.Closeable;
-import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Iterator;
import java.util.List;
@@ -113,6 +112,13 @@ public class GroupByQueryEngineV2
null
);

+final boolean allSingleValueDims = query
+.getDimensions()
+.stream()
+.allMatch(dimension -> {
+final ColumnCapabilities columnCapabilities = storageAdapter.getColumnCapabilities(dimension.getDimension());
+return columnCapabilities != null && !columnCapabilities.hasMultipleValues();
+});

final ResourceHolder<ByteBuffer> bufferHolder = intermediateResultsBufferPool.take();
@@ -124,56 +130,105 @@ public class GroupByQueryEngineV2
? null
: new DateTime(Long.parseLong(fudgeTimestampString));

-return Sequences.concat(
-Sequences.withBaggage(
-Sequences.map(
-cursors,
-new Function<Cursor, Sequence<Row>>()
-{
-@Override
-public Sequence<Row> apply(final Cursor cursor)
-{
-return new BaseSequence<>(
-new BaseSequence.IteratorMaker<Row, GroupByEngineIterator>()
-{
-@Override
-public GroupByEngineIterator make()
-{
-ColumnSelectorPlus<GroupByColumnSelectorStrategy>[] selectorPlus = DimensionHandlerUtils.createColumnSelectorPluses(
-STRATEGY_FACTORY,
-query.getDimensions(),
-cursor
-);
-return new GroupByEngineIterator(
-query,
-config,
-cursor,
-bufferHolder.get(),
-fudgeTimestamp,
-createGroupBySelectorPlus(selectorPlus)
-);
-}
-
-@Override
-public void cleanup(GroupByEngineIterator iterFromMake)
-{
-iterFromMake.close();
-}
-}
-);
-}
-}
-),
-new Closeable()
-{
-@Override
-public void close() throws IOException
-{
-CloseQuietly.close(bufferHolder);
-}
-}
-)
-);
+return cursors.flatMap(
+cursor -> new BaseSequence<>(
+new BaseSequence.IteratorMaker<Row, GroupByEngineIterator<?>>()
+{
+@Override
+public GroupByEngineIterator make()
+{
+ColumnSelectorPlus<GroupByColumnSelectorStrategy>[] selectorPlus = DimensionHandlerUtils
+.createColumnSelectorPluses(
+STRATEGY_FACTORY,
+query.getDimensions(),
+cursor
+);
+GroupByColumnSelectorPlus[] dims = createGroupBySelectorPlus(selectorPlus);
+
+final ByteBuffer buffer = bufferHolder.get();
+
+// Check whether array-based aggregation is applicable
+if (isArrayAggregateApplicable(config, query, dims, storageAdapter, buffer)) {
+return new ArrayAggregateIterator(
+query,
+config,
+cursor,
+buffer,
+fudgeTimestamp,
+dims,
+allSingleValueDims,
+// There must be 0 or 1 dimension if isArrayAggregateApplicable() is true
+dims.length == 0 ? 1 : storageAdapter.getDimensionCardinality(dims[0].getName())
+);
+} else {
+return new HashAggregateIterator(
+query,
+config,
+cursor,
+buffer,
+fudgeTimestamp,
+dims,
+allSingleValueDims
+);
+}
+}
+
+@Override
+public void cleanup(GroupByEngineIterator iterFromMake)
+{
+iterFromMake.close();
+}
+}
+)
+).withBaggage(bufferHolder);
}

+private static boolean isArrayAggregateApplicable(
+GroupByQueryConfig config,
+GroupByQuery query,
+GroupByColumnSelectorPlus[] dims,
+StorageAdapter storageAdapter,
+ByteBuffer buffer
+)
+{
+if (config.isForceHashAggregation()) {
+return false;
+}
+
+final ColumnCapabilities columnCapabilities;
+final int cardinality;
+
+// Find cardinality
+if (dims.length == 0) {
+columnCapabilities = null;
+cardinality = 1;
+} else if (dims.length == 1) {
+columnCapabilities = storageAdapter.getColumnCapabilities(dims[0].getName());
+cardinality = storageAdapter.getDimensionCardinality(dims[0].getName());
+} else {
+columnCapabilities = null;
+cardinality = -1; // ArrayAggregateIterator is not available
+}
+
+// Choose array-based aggregation if the grouping key is a single string dimension of a
+// known cardinality
+if ((columnCapabilities == null || columnCapabilities.getType().equals(ValueType.STRING))
+&& cardinality > 0) {
+final AggregatorFactory[] aggregatorFactories = query
+.getAggregatorSpecs()
+.toArray(new AggregatorFactory[query.getAggregatorSpecs().size()]);
+final int requiredBufferCapacity = BufferArrayGrouper.requiredBufferCapacity(
+cardinality,
+aggregatorFactories
+);
+
+// Check that all keys and aggregated values can be contained in the buffer
+if (requiredBufferCapacity <= buffer.capacity()) {
+return true;
+}
+}
+return false;
+}
private static class GroupByStrategyFactory implements ColumnSelectorStrategyFactory<GroupByColumnSelectorStrategy>
@@ -204,22 +259,18 @@ public class GroupByQueryEngineV2
}
}

-private static class GroupByEngineIterator implements Iterator<Row>, Closeable
+private abstract static class GroupByEngineIterator<KeyType> implements Iterator<Row>, Closeable
{
-private final GroupByQuery query;
-private final GroupByQueryConfig querySpecificConfig;
-private final Cursor cursor;
-private final ByteBuffer buffer;
-private final Grouper.KeySerde<ByteBuffer> keySerde;
-private final DateTime timestamp;
-private final ByteBuffer keyBuffer;
-private final int[] stack;
-private final Object[] valuess;
-private final GroupByColumnSelectorPlus[] dims;
-private int stackp = Integer.MIN_VALUE;
-private boolean currentRowWasPartiallyAggregated = false;
-private CloseableGrouperIterator<ByteBuffer, Row> delegate = null;
+protected final GroupByQuery query;
+protected final GroupByQueryConfig querySpecificConfig;
+protected final Cursor cursor;
+protected final ByteBuffer buffer;
+protected final Grouper.KeySerde<ByteBuffer> keySerde;
+protected final GroupByColumnSelectorPlus[] dims;
+protected final DateTime timestamp;
+
+protected CloseableGrouperIterator<KeyType, Row> delegate = null;
+protected final boolean allSingleValueDims;

public GroupByEngineIterator(
final GroupByQuery query,
@@ -227,43 +278,159 @@ public class GroupByQueryEngineV2
final Cursor cursor,
final ByteBuffer buffer,
final DateTime fudgeTimestamp,
-final GroupByColumnSelectorPlus[] dims
+final GroupByColumnSelectorPlus[] dims,
+final boolean allSingleValueDims
)
{
-final int dimCount = query.getDimensions().size();
this.query = query;
this.querySpecificConfig = config.withOverrides(query);
this.cursor = cursor;
this.buffer = buffer;
this.keySerde = new GroupByEngineKeySerde(dims);
-this.keyBuffer = ByteBuffer.allocate(keySerde.keySize());
this.dims = dims;
-this.stack = new int[dimCount];
-this.valuess = new Object[dimCount];

// Time is the same for every row in the cursor
this.timestamp = fudgeTimestamp != null ? fudgeTimestamp : cursor.getTime();
+this.allSingleValueDims = allSingleValueDims;
}
+private CloseableGrouperIterator<KeyType, Row> initNewDelegate()
+{
+final Grouper<KeyType> grouper = newGrouper();
+grouper.init();
+
+if (allSingleValueDims) {
+aggregateSingleValueDims(grouper);
+} else {
+aggregateMultiValueDims(grouper);
+}
+
+return new CloseableGrouperIterator<>(
+grouper,
+false,
+entry -> {
+Map<String, Object> theMap = Maps.newLinkedHashMap();
+
+// Add dimensions.
+putToMap(entry.getKey(), theMap);
+
+convertRowTypesToOutputTypes(query.getDimensions(), theMap);
+
+// Add aggregations.
+for (int i = 0; i < entry.getValues().length; i++) {
+theMap.put(query.getAggregatorSpecs().get(i).getName(), entry.getValues()[i]);
+}
+
+return new MapBasedRow(timestamp, theMap);
+},
+grouper
+);
+}
@Override
public Row next()
{
-if (delegate != null && delegate.hasNext()) {
-return delegate.next();
-}
-if (cursor.isDone()) {
+if (delegate == null || !delegate.hasNext()) {
throw new NoSuchElementException();
}
-// Make a new delegate iterator
+return delegate.next();
+}

+@Override
+public boolean hasNext()
+{
+if (delegate != null && delegate.hasNext()) {
+return true;
+} else {
+if (!cursor.isDone()) {
+if (delegate != null) {
+delegate.close();
+}
+delegate = initNewDelegate();
+return true;
+} else {
+return false;
+}
+}
+}

+@Override
+public void remove()
+{
+throw new UnsupportedOperationException();
+}

+@Override
+public void close()
+{
if (delegate != null) {
delegate.close();
-delegate = null;
}
}

-final Grouper<ByteBuffer> grouper = new BufferGrouper<>(
+/**
+* Create a new grouper.
+*/
+protected abstract Grouper<KeyType> newGrouper();

+/**
+* Grouping dimensions are all single-valued, so the given grouper doesn't have to worry about multi-valued
+* dimensions.
+*/
+protected abstract void aggregateSingleValueDims(Grouper<KeyType> grouper);

+/**
+* Grouping dimensions can be multi-valued, so the given grouper should handle them properly during
+* aggregation.
+*/
+protected abstract void aggregateMultiValueDims(Grouper<KeyType> grouper);

+/**
+* Add the key to the result map. Some pre-processing like deserialization might be done for the key before
+* adding to the map.
+*/
+protected abstract void putToMap(KeyType key, Map<String, Object> map);

+protected int getSingleValue(IndexedInts indexedInts)
+{
+Preconditions.checkArgument(indexedInts.size() < 2, "should be single value");
+return indexedInts.size() == 1 ? indexedInts.get(0) : GroupByColumnSelectorStrategy.GROUP_BY_MISSING_VALUE;
+}
+}
+private static class HashAggregateIterator extends GroupByEngineIterator<ByteBuffer>
+{
+private final int[] stack;
+private final Object[] valuess;
+private final ByteBuffer keyBuffer;
+
+private int stackPointer = Integer.MIN_VALUE;
+protected boolean currentRowWasPartiallyAggregated = false;
+
+public HashAggregateIterator(
+GroupByQuery query,
+GroupByQueryConfig config,
+Cursor cursor,
+ByteBuffer buffer,
+DateTime fudgeTimestamp,
+GroupByColumnSelectorPlus[] dims,
+boolean allSingleValueDims
+)
+{
+super(query, config, cursor, buffer, fudgeTimestamp, dims, allSingleValueDims);
+
+final int dimCount = query.getDimensions().size();
+stack = new int[dimCount];
+valuess = new Object[dimCount];
+keyBuffer = ByteBuffer.allocate(keySerde.keySize());
+}
+
+@Override
+protected Grouper<ByteBuffer> newGrouper()
+{
+return new BufferHashGrouper<>(
Suppliers.ofInstance(buffer),
keySerde,
cursor,
@@ -273,13 +440,36 @@ public class GroupByQueryEngineV2
querySpecificConfig.getBufferGrouperMaxLoadFactor(),
querySpecificConfig.getBufferGrouperInitialBuckets()
);
-grouper.init();
+}

-outer:
+@Override
+protected void aggregateSingleValueDims(Grouper<ByteBuffer> grouper)
+{
+while (!cursor.isDone()) {
+for (int i = 0; i < dims.length; i++) {
+final GroupByColumnSelectorStrategy strategy = dims[i].getColumnSelectorStrategy();
+strategy.writeToKeyBuffer(
+dims[i].getKeyBufferPosition(),
+strategy.getOnlyValue(dims[i].getSelector()),
+keyBuffer
+);
+}
+keyBuffer.rewind();
+
+if (!grouper.aggregate(keyBuffer).isOk()) {
+return;
+}
+
+cursor.advance();
+}
+}

+@Override
+protected void aggregateMultiValueDims(Grouper<ByteBuffer> grouper)
+{
while (!cursor.isDone()) {
if (!currentRowWasPartiallyAggregated) {
// Set up stack, valuess, and first grouping in keyBuffer for this row
-stackp = stack.length - 1;
+stackPointer = stack.length - 1;

for (int i = 0; i < dims.length; i++) {
GroupByColumnSelectorStrategy strategy = dims[i].getColumnSelectorStrategy();
@@ -300,29 +490,29 @@ public class GroupByQueryEngineV2

// Aggregate groupings for this row
boolean doAggregate = true;
-while (stackp >= -1) {
+while (stackPointer >= -1) {
// Aggregate additional grouping for this row
if (doAggregate) {
keyBuffer.rewind();
if (!grouper.aggregate(keyBuffer).isOk()) {
// Buffer full while aggregating; break out and resume later
currentRowWasPartiallyAggregated = true;
-break outer;
+return;
}
doAggregate = false;
}

-if (stackp >= 0) {
-doAggregate = dims[stackp].getColumnSelectorStrategy().checkRowIndexAndAddValueToGroupingKey(
-dims[stackp].getKeyBufferPosition(),
-valuess[stackp],
-stack[stackp],
+if (stackPointer >= 0) {
+doAggregate = dims[stackPointer].getColumnSelectorStrategy().checkRowIndexAndAddValueToGroupingKey(
+dims[stackPointer].getKeyBufferPosition(),
+valuess[stackPointer],
+stack[stackPointer],
keyBuffer
);

if (doAggregate) {
-stack[stackp]++;
-for (int i = stackp + 1; i < stack.length; i++) {
+stack[stackPointer]++;
+for (int i = stackPointer + 1; i < stack.length; i++) {
dims[i].getColumnSelectorStrategy().initGroupingKeyColumnValue(
dims[i].getKeyBufferPosition(),
i,
@@ -331,12 +521,12 @@ public class GroupByQueryEngineV2
stack
);
}
-stackp = stack.length - 1;
+stackPointer = stack.length - 1;
} else {
-stackp--;
+stackPointer--;
}
} else {
-stackp--;
+stackPointer--;
}
}
}
@@ -344,66 +534,131 @@ public class GroupByQueryEngineV2

cursor.advance();
currentRowWasPartiallyAggregated = false;
}
+}
-delegate = new CloseableGrouperIterator<>(
-grouper,
-false,
-new Function<Grouper.Entry<ByteBuffer>, Row>()
-{
-@Override
-public Row apply(final Grouper.Entry<ByteBuffer> entry)
-{
-Map<String, Object> theMap = Maps.newLinkedHashMap();
-
-// Add dimensions.
-for (GroupByColumnSelectorPlus selectorPlus : dims) {
-selectorPlus.getColumnSelectorStrategy().processValueFromGroupingKey(
-selectorPlus,
-entry.getKey(),
-theMap
-);
-}
-
-convertRowTypesToOutputTypes(query.getDimensions(), theMap);
-
-// Add aggregations.
-for (int i = 0; i < entry.getValues().length; i++) {
-theMap.put(query.getAggregatorSpecs().get(i).getName(), entry.getValues()[i]);
-}
-
-return new MapBasedRow(timestamp, theMap);
-}
-},
-new Closeable()
-{
-@Override
-public void close() throws IOException
-{
-grouper.close();
-}
-}
-);
-
-return delegate.next();
-}
-
-@Override
-public boolean hasNext()
-{
-return (delegate != null && delegate.hasNext()) || !cursor.isDone();
-}
-
-@Override
-public void remove()
-{
-throw new UnsupportedOperationException();
-}
-
-@Override
-public void close()
-{
-if (delegate != null) {
-delegate.close();
-}
-}
-}
+@Override
+protected void putToMap(ByteBuffer key, Map<String, Object> map)
+{
+for (GroupByColumnSelectorPlus selectorPlus : dims) {
+selectorPlus.getColumnSelectorStrategy().processValueFromGroupingKey(
+selectorPlus,
+key,
+map
+);
+}
+}
+}
+
+private static class ArrayAggregateIterator extends GroupByEngineIterator<Integer>
+{
+private final int cardinality;
+
+@Nullable
+private final GroupByColumnSelectorPlus dim;
+
+private IndexedInts multiValues;
+private int nextValIndex;
+
+public ArrayAggregateIterator(
+GroupByQuery query,
+GroupByQueryConfig config,
+Cursor cursor,
+ByteBuffer buffer,
+DateTime fudgeTimestamp,
+GroupByColumnSelectorPlus[] dims,
+boolean allSingleValueDims,
+int cardinality
+)
+{
+super(query, config, cursor, buffer, fudgeTimestamp, dims, allSingleValueDims);
+this.cardinality = cardinality;
+
+if (dims.length == 1) {
+this.dim = dims[0];
+} else if (dims.length == 0) {
+this.dim = null;
+} else {
+throw new IAE("Group key should be a single dimension");
+}
+}
+
+@Override
+protected Grouper<Integer> newGrouper()
+{
+return new BufferArrayGrouper(
+Suppliers.ofInstance(buffer),
+cursor,
+query.getAggregatorSpecs()
+.toArray(new AggregatorFactory[query.getAggregatorSpecs().size()]),
+cardinality
+);
+}
+
+@Override
+protected void aggregateSingleValueDims(Grouper<Integer> grouper)
+{
+while (!cursor.isDone()) {
+final int key;
+if (dim != null) {
+// dim is always an indexed string dimension
+final IndexedInts indexedInts = ((DimensionSelector) dim.getSelector()).getRow();
+key = getSingleValue(indexedInts);
+} else {
+key = 0;
+}
+if (!grouper.aggregate(key).isOk()) {
+return;
+}
+cursor.advance();
+}
+}
+
+@Override
+protected void aggregateMultiValueDims(Grouper<Integer> grouper)
+{
+if (dim == null) {
+throw new ISE("dim must exist");
+}
+
+if (multiValues == null) {
+// dim is always an indexed string dimension
+multiValues = ((DimensionSelector) dim.getSelector()).getRow();
+nextValIndex = 0;
+}
+
+while (!cursor.isDone()) {
+if (multiValues.size() == 0) {
+if (!grouper.aggregate(GroupByColumnSelectorStrategy.GROUP_BY_MISSING_VALUE).isOk()) {
+return;
+}
+} else {
+for (; nextValIndex < multiValues.size(); nextValIndex++) {
+if (!grouper.aggregate(multiValues.get(nextValIndex)).isOk()) {
+return;
+}
+}
+}
+
+cursor.advance();
+if (!cursor.isDone()) {
+// dim is always an indexed string dimension
+multiValues = ((DimensionSelector) dim.getSelector()).getRow();
+nextValIndex = multiValues.size() == 0 ? -1 : 0;
+}
+}
+}
+
+@Override
+protected void putToMap(Integer key, Map<String, Object> map)
+{
+if (dim != null) {
+if (key != -1) {
+map.put(
+dim.getOutputName(),
+((DimensionSelector) dim.getSelector()).lookupName(key)
+);
+} else {
+map.put(dim.getOutputName(), "");
+}
+}
+}
+}
}
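Condensed into one loop, the per-row flow of the new array-based path looks like this (a sketch under assumed in-scope selectors, not the committed code):

// Per row: read the dictionary id of the single string dimension, shift it by +1 so
// GROUP_BY_MISSING_VALUE (-1) lands on slot 0, and aggregate directly at that slot --
// no hashing, no bucket probing.
static void arrayAggregate(Cursor cursor, DimensionSelector selector, Grouper<Integer> grouper)
{
  while (!cursor.isDone()) {
    final IndexedInts row = selector.getRow();
    final int key = row.size() == 1 ? row.get(0) : -1; // GROUP_BY_MISSING_VALUE
    if (!grouper.aggregate(key, key + 1).isOk()) {     // key + 1 mirrors BufferArrayGrouper's hashFunction
      return;                                          // buffer exhausted; the query fails
    }
    cursor.advance();
  }
}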

View File

@@ -21,6 +21,7 @@ package io.druid.query.groupby.epinephelinae;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.base.Preconditions;
import io.druid.query.aggregation.AggregatorFactory;

import java.io.Closeable;
@@ -28,6 +29,7 @@ import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Comparator;
import java.util.Iterator;
+import java.util.function.ToIntFunction;

/**
* Groupers aggregate metrics from rows that they typically get from a ColumnSelectorFactory, under
@@ -59,7 +61,7 @@ public interface Grouper<KeyType> extends Closeable
* some are not.
*
* @param key key object
-* @param keyHash result of {@link Groupers#hash(Object)} on the key
+* @param keyHash result of {@link #hashFunction()} on the key
*
* @return result that is ok if the row was aggregated, not ok if a resource limit was hit
*/
@@ -73,13 +75,22 @@ public interface Grouper<KeyType> extends Closeable
*
* @return result that is ok if the row was aggregated, not ok if a resource limit was hit
*/
-AggregateResult aggregate(KeyType key);
+default AggregateResult aggregate(KeyType key)
+{
+Preconditions.checkNotNull(key, "key");
+return aggregate(key, hashFunction().applyAsInt(key));
+}

/**
* Reset the grouper to its initial state.
*/
void reset();

+default ToIntFunction<KeyType> hashFunction()
+{
+return Groupers::hash;
+}

/**
* Close the grouper and release associated resources.
*/
@@ -87,7 +98,7 @@ public interface Grouper<KeyType> extends Closeable
void close();

/**
-* Iterate through entries. If a comparator is provided, do a sorted iteration.
+* Iterate through entries.
* <p>
* Once this method is called, writes are no longer safe. After you are done with the iterator returned by this
* method, you should either call {@link #close()} (if you are done with the Grouper), {@link #reset()} (if you
@@ -96,6 +107,9 @@ public interface Grouper<KeyType> extends Closeable
* If "sorted" is true then the iterator will return sorted results. It will use KeyType's natural ordering on
* deserialized objects, and will use the {@link KeySerde#comparator()} on serialized objects. Woe be unto you
* if these comparators are not equivalent.
+* <p>
+* Callers must process and discard the returned {@link Entry}s immediately because some implementations can reuse
+* the key objects.
*
* @param sorted return sorted results
*
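The reason hashFunction() is now overridable: BufferArrayGrouper substitutes a plain index shift for the murmur-style Groupers.hash, so the default aggregate(key) transparently routes keys to array slots instead of hash buckets. Sketched (not part of the commit):

// Hash-based groupers keep the default: keyHash = Groupers.hash(key).
// BufferArrayGrouper overrides it so the "hash" is simply the array slot:
// dictionary id -1 (missing) maps to slot 0, id 0 to slot 1, and so on.
ToIntFunction<Integer> arraySlot = key -> key + 1;
// aggregate(key) is then equivalent to aggregate(key, arraySlot.applyAsInt(key))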

View File

@@ -31,6 +31,17 @@ public class Groupers
// No instantiation
}

+static final AggregateResult DICTIONARY_FULL = AggregateResult.failure(
+"Not enough dictionary space to execute this query. Try increasing "
++ "druid.query.groupBy.maxMergingDictionarySize or enable disk spilling by setting "
++ "druid.query.groupBy.maxOnDiskStorage to a positive number."
+);
+
+static final AggregateResult HASH_TABLE_FULL = AggregateResult.failure(
+"Not enough aggregation buffer space to execute this query. Try increasing "
++ "druid.processing.buffer.sizeBytes or enable disk spilling by setting "
++ "druid.query.groupBy.maxOnDiskStorage to a positive number."
+);

private static final int C1 = 0xcc9e2d51;
private static final int C2 = 0x1b873593;
@@ -56,6 +67,11 @@ public class Groupers
}

+static int getUsedFlag(int keyHash)
+{
+return keyHash | 0x80000000;
+}

public static <KeyType> Iterator<Grouper.Entry<KeyType>> mergeIterators(
final Iterable<Iterator<Grouper.Entry<KeyType>>> iterators,
final Comparator<Grouper.Entry<KeyType>> keyTypeComparator
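getUsedFlag sets the sign bit of a key hash, which ByteBufferHashTable uses to mark a bucket as occupied; this works because the hashes produced by Groupers.hash are presumably kept non-negative, leaving the sign bit free. For example (not part of the commit):

int keyHash = 0x0000002A;          // 42
int stored = keyHash | 0x80000000; // 0x8000002A, what getUsedFlag(keyHash) returns
boolean bucketUsed = stored < 0;   // true: any negative stored hash means "occupied"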

View File

@@ -35,7 +35,7 @@ import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

-public class LimitedBufferGrouper<KeyType> extends AbstractBufferGrouper<KeyType>
+public class LimitedBufferHashGrouper<KeyType> extends AbstractBufferHashGrouper<KeyType>
{
private static final int MIN_INITIAL_BUCKETS = 4;
private static final int DEFAULT_INITIAL_BUCKETS = 1024;
@@ -51,7 +51,7 @@ public class LimitedBufferGrouper<KeyType> extends AbstractBufferGrouper<KeyType>
// Additionally, results must be resorted by grouping key to allow results to merge correctly.
private boolean sortHasNonGroupingFields;

-// Min-max heap, used for storing offsets when applying limits/sorting in the BufferGrouper
+// Min-max heap, used for storing offsets when applying limits/sorting in the BufferHashGrouper
private ByteBufferMinMaxOffsetHeap offsetHeap;

// ByteBuffer slices used by the grouper
@@ -64,7 +64,7 @@ public class LimitedBufferGrouper<KeyType> extends AbstractBufferGrouper<KeyType>
private BufferGrouperOffsetHeapIndexUpdater heapIndexUpdater;
private boolean initialized = false;

-public LimitedBufferGrouper(
+public LimitedBufferHashGrouper(
final Supplier<ByteBuffer> bufferSupplier,
final Grouper.KeySerde<KeyType> keySerde,
final ColumnSelectorFactory columnSelectorFactory,

View File

@@ -46,7 +46,7 @@ import java.util.Iterator;
import java.util.List;

/**
-* Grouper based around a single underlying {@link BufferGrouper}. Not thread-safe.
+* Grouper based around a single underlying {@link BufferHashGrouper}. Not thread-safe.
*
* When the underlying grouper is full, its contents are sorted and written to temporary files using "spillMapper".
*/
@@ -88,7 +88,7 @@ public class SpillingGrouper<KeyType> implements Grouper<KeyType>
this.keyObjComparator = keySerdeFactory.objectComparator(false);
this.defaultOrderKeyObjComparator = keySerdeFactory.objectComparator(true);
if (limitSpec != null) {
-this.grouper = new LimitedBufferGrouper<>(
+this.grouper = new LimitedBufferHashGrouper<>(
bufferSupplier,
keySerde,
columnSelectorFactory,
@@ -100,7 +100,7 @@ public class SpillingGrouper<KeyType> implements Grouper<KeyType>
sortHasNonGroupingFields
);
} else {
-this.grouper = new BufferGrouper<>(
+this.grouper = new BufferHashGrouper<>(
bufferSupplier,
keySerde,
columnSelectorFactory,
@@ -153,12 +153,6 @@ public class SpillingGrouper<KeyType> implements Grouper<KeyType>
}
}

-@Override
-public AggregateResult aggregate(KeyType key)
-{
-return aggregate(key, Groupers.hash(key));
-}

@Override
public void reset()
{

View File

@ -19,6 +19,7 @@
package io.druid.query.groupby.epinephelinae.column; package io.druid.query.groupby.epinephelinae.column;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import io.druid.segment.ColumnValueSelector; import io.druid.segment.ColumnValueSelector;
import io.druid.segment.DimensionSelector; import io.druid.segment.DimensionSelector;
@ -83,4 +84,27 @@ public class DictionaryBuildingStringGroupByColumnSelectorStrategy extends Strin
} }
valuess[columnIndex] = ArrayBasedIndexedInts.of(newIds); valuess[columnIndex] = ArrayBasedIndexedInts.of(newIds);
} }
@Override
public Object getOnlyValue(ColumnValueSelector selector)
{
final DimensionSelector dimSelector = (DimensionSelector) selector;
final IndexedInts row = dimSelector.getRow();
Preconditions.checkState(row.size() < 2, "Not supported for multi-value dimensions");
if (row.size() == 0) {
return GROUP_BY_MISSING_VALUE;
}
final String value = dimSelector.lookupName(row.get(0));
final int dictId = reverseDictionary.getInt(value);
if (dictId < 0) {
dictionary.add(value);
reverseDictionary.put(value, nextId);
return nextId++;
} else {
return dictId;
}
}
} }
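Note: a hedged sketch of the id-assignment behavior the getOnlyValue override above adds: an unseen value is appended to the dictionary and gets the next id, while a seen value reuses its existing id. A plain HashMap stands in for the class's actual dictionary structures.

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class DictionarySketch
{
  private final List<String> dictionary = new ArrayList<>();
  private final Map<String, Integer> reverseDictionary = new HashMap<>();
  private int nextId = 0;

  int idFor(String value)
  {
    final Integer existing = reverseDictionary.get(value);
    if (existing != null) {
      return existing; // value already has a dictionary id
    }
    dictionary.add(value);
    reverseDictionary.put(value, nextId);
    return nextId++; // assign the new id and advance
  }
}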

View File

@ -67,4 +67,16 @@ public class DoubleGroupByColumnSelectorStrategy implements GroupByColumnSelecto
// this method handles row values after the first in a multivalued row, so just return false // this method handles row values after the first in a multivalued row, so just return false
return false; return false;
} }
@Override
public Object getOnlyValue(ColumnValueSelector selector)
{
return ((DoubleColumnSelector) selector).get();
}
@Override
public void writeToKeyBuffer(int keyBufferPosition, Object obj, ByteBuffer keyBuffer)
{
keyBuffer.putDouble(keyBufferPosition, (Double) obj);
}
} }
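Note: a small sanity sketch of the getOnlyValue/writeToKeyBuffer pair added above: the boxed Double is written at an absolute position in the key buffer and can be read back from the same offset. The Float and Long strategies below follow the identical pattern with putFloat and putLong.

import java.nio.ByteBuffer;

class DoubleKeyRoundTrip
{
  public static void main(String[] args)
  {
    final ByteBuffer keyBuffer = ByteBuffer.allocate(Double.BYTES);
    final Object value = 3.5d;                  // what getOnlyValue would return
    keyBuffer.putDouble(0, (Double) value);     // absolute put; buffer position is unchanged
    System.out.println(keyBuffer.getDouble(0)); // prints 3.5
  }
}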

View File

@ -50,6 +50,18 @@ public class FloatGroupByColumnSelectorStrategy implements GroupByColumnSelector
valuess[columnIndex] = ((FloatColumnSelector) selector).get(); valuess[columnIndex] = ((FloatColumnSelector) selector).get();
} }
@Override
public Object getOnlyValue(ColumnValueSelector selector)
{
return ((FloatColumnSelector) selector).get();
}
@Override
public void writeToKeyBuffer(int keyBufferPosition, Object obj, ByteBuffer keyBuffer)
{
keyBuffer.putFloat(keyBufferPosition, (Float) obj);
}
@Override @Override
public void initGroupingKeyColumnValue( public void initGroupingKeyColumnValue(
int keyBufferPosition, int columnIndex, Object rowObj, ByteBuffer keyBuffer, int[] stack int keyBufferPosition, int columnIndex, Object rowObj, ByteBuffer keyBuffer, int[] stack

View File

@ -33,6 +33,8 @@ import java.util.Map;
*/ */
public interface GroupByColumnSelectorStrategy extends ColumnSelectorStrategy public interface GroupByColumnSelectorStrategy extends ColumnSelectorStrategy
{ {
int GROUP_BY_MISSING_VALUE = -1;
/** /**
* Return the size, in bytes, of this dimension's values in the grouping key. * Return the size, in bytes, of this dimension's values in the grouping key.
* *
@ -65,7 +67,7 @@ public interface GroupByColumnSelectorStrategy extends ColumnSelectorStrategy
); );
/** /**
* Retrieve a row object from the ColumnSelectorPlus and put it in valuess at columnIndex. * Retrieve a row object from the {@link ColumnValueSelector} and put it in valuess at columnIndex.
* *
* @param selector Value selector for a column. * @param selector Value selector for a column.
* @param columnIndex Index of the column within the row values array * @param columnIndex Index of the column within the row values array
@ -101,4 +103,22 @@ public interface GroupByColumnSelectorStrategy extends ColumnSelectorStrategy
* @return true if rowValIdx < size of rowObj, false otherwise * @return true if rowValIdx < size of rowObj, false otherwise
*/ */
boolean checkRowIndexAndAddValueToGroupingKey(int keyBufferPosition, Object rowObj, int rowValIdx, ByteBuffer keyBuffer); boolean checkRowIndexAndAddValueToGroupingKey(int keyBufferPosition, Object rowObj, int rowValIdx, ByteBuffer keyBuffer);
/**
* Retrieve a single object using the {@link ColumnValueSelector}. The column being read must have at most one value.
*
* @param selector Value selector for a column
*
* @return an object retrieved from the column
*/
Object getOnlyValue(ColumnValueSelector selector);
/**
* Write a given object to the keyBuffer at keyBufferPosition.
*
* @param keyBufferPosition starting offset for this column's value within the grouping key
* @param obj row value object retrieved from {@link #getOnlyValue(ColumnValueSelector)}
* @param keyBuffer grouping key
*/
void writeToKeyBuffer(int keyBufferPosition, Object obj, ByteBuffer keyBuffer);
} }
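Note: putting the two new interface methods together, a hedged sketch of the single-valued fast path they enable when building a grouping key. The method name, the strategy/selector arrays, and the key buffer are hypothetical caller-side variables; only the three interface calls come from the diff above.

static void writeSingleValuedKey(
    GroupByColumnSelectorStrategy[] strategies, // assumed: one per grouping dimension
    ColumnValueSelector[] selectors,
    ByteBuffer keyBuffer
)
{
  int position = 0;
  for (int i = 0; i < strategies.length; i++) {
    // Each dimension contributes exactly one value: fetch it once, then
    // write it at the column's fixed offset within the grouping key.
    final Object value = strategies[i].getOnlyValue(selectors[i]);
    strategies[i].writeToKeyBuffer(position, value, keyBuffer);
    position += strategies[i].getGroupingKeySize();
  }
}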

View File

@ -50,6 +50,18 @@ public class LongGroupByColumnSelectorStrategy implements GroupByColumnSelectorS
valuess[columnIndex] = ((LongColumnSelector) selector).get(); valuess[columnIndex] = ((LongColumnSelector) selector).get();
} }
@Override
public Object getOnlyValue(ColumnValueSelector selector)
{
return ((LongColumnSelector) selector).get();
}
@Override
public void writeToKeyBuffer(int keyBufferPosition, Object obj, ByteBuffer keyBuffer)
{
keyBuffer.putLong(keyBufferPosition, (Long) obj);
}
@Override @Override
public void initGroupingKeyColumnValue( public void initGroupingKeyColumnValue(
int keyBufferPosition, int columnIndex, Object rowObj, ByteBuffer keyBuffer, int[] stack int keyBufferPosition, int columnIndex, Object rowObj, ByteBuffer keyBuffer, int[] stack

View File

@ -19,6 +19,7 @@
package io.druid.query.groupby.epinephelinae.column; package io.druid.query.groupby.epinephelinae.column;
import com.google.common.base.Preconditions;
import com.google.common.primitives.Ints; import com.google.common.primitives.Ints;
import io.druid.segment.ColumnValueSelector; import io.druid.segment.ColumnValueSelector;
import io.druid.segment.DimensionSelector; import io.druid.segment.DimensionSelector;
@ -29,8 +30,6 @@ import java.util.Map;
public class StringGroupByColumnSelectorStrategy implements GroupByColumnSelectorStrategy public class StringGroupByColumnSelectorStrategy implements GroupByColumnSelectorStrategy
{ {
private static final int GROUP_BY_MISSING_VALUE = -1;
@Override @Override
public int getGroupingKeySize() public int getGroupingKeySize()
{ {
@ -61,6 +60,21 @@ public class StringGroupByColumnSelectorStrategy implements GroupByColumnSelecto
valuess[columnIndex] = row; valuess[columnIndex] = row;
} }
@Override
public Object getOnlyValue(ColumnValueSelector selector)
{
final DimensionSelector dimSelector = (DimensionSelector) selector;
final IndexedInts row = dimSelector.getRow();
Preconditions.checkState(row.size() < 2, "Not supported for multi-value dimensions");
return row.size() == 1 ? row.get(0) : GROUP_BY_MISSING_VALUE;
}
@Override
public void writeToKeyBuffer(int keyBufferPosition, Object obj, ByteBuffer keyBuffer)
{
keyBuffer.putInt(keyBufferPosition, (int) obj);
}
@Override @Override
public void initGroupingKeyColumnValue(int keyBufferPosition, int columnIndex, Object rowObj, ByteBuffer keyBuffer, int[] stack) public void initGroupingKeyColumnValue(int keyBufferPosition, int columnIndex, Object rowObj, ByteBuffer keyBuffer, int[] stack)
{ {
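Note: on the read side, a hedged sketch of how a caller might turn the int written above back into a String, treating the GROUP_BY_MISSING_VALUE sentinel as null. The method name, resultMap, and outputName are illustrative; lookupName is the DimensionSelector call used in getOnlyValue above.

static void readStringKeyPart(
    ByteBuffer keyBuffer,
    int keyBufferPosition,
    DimensionSelector dimSelector,
    Map<String, Object> resultMap, // illustrative output row
    String outputName
)
{
  final int id = keyBuffer.getInt(keyBufferPosition);
  // GROUP_BY_MISSING_VALUE (-1) marks a row that had no value for this dimension.
  resultMap.put(outputName, id == GROUP_BY_MISSING_VALUE ? null : dimSelector.lookupName(id));
}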

View File

@ -287,7 +287,7 @@ public class GroupByStrategyV2 implements GroupByStrategy
} }
); );
// Don't apply limit here for inner results, that will be pushed down to the BufferGrouper // Don't apply limit here for inner results, that will be pushed down to the BufferHashGrouper
if (query.getContextBoolean(CTX_KEY_OUTERMOST, true)) { if (query.getContextBoolean(CTX_KEY_OUTERMOST, true)) {
return query.postProcess(rowSequence); return query.postProcess(rowSequence);
} else { } else {

View File

@ -1401,7 +1401,7 @@ public class GroupByQueryRunnerTest
List<Row> expectedResults = null; List<Row> expectedResults = null;
if (config.getDefaultStrategy().equals(GroupByStrategySelector.STRATEGY_V2)) { if (config.getDefaultStrategy().equals(GroupByStrategySelector.STRATEGY_V2)) {
expectedException.expect(ResourceLimitExceededException.class); expectedException.expect(ResourceLimitExceededException.class);
expectedException.expectMessage("Not enough aggregation table space to execute this query"); expectedException.expectMessage("Not enough aggregation buffer space to execute this query");
} else { } else {
expectedResults = Arrays.asList( expectedResults = Arrays.asList(
GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", "alias", "automotive", "rows", 1L, "idx", 135L), GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", "alias", "automotive", "rows", 1L, "idx", 135L),
@ -1579,7 +1579,7 @@ public class GroupByQueryRunnerTest
GroupByQueryRunnerTestHelper.runQuery(factory, runner, query); GroupByQueryRunnerTestHelper.runQuery(factory, runner, query);
} else { } else {
expectedException.expect(ResourceLimitExceededException.class); expectedException.expect(ResourceLimitExceededException.class);
expectedException.expectMessage("Not enough aggregation table space to execute this query"); expectedException.expectMessage("Not enough aggregation buffer space to execute this query");
GroupByQueryRunnerTestHelper.runQuery(factory, runner, query); GroupByQueryRunnerTestHelper.runQuery(factory, runner, query);
} }
} }

View File

@ -0,0 +1,89 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.query.groupby.epinephelinae;
import com.google.common.base.Suppliers;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Ordering;
import com.google.common.primitives.Ints;
import io.druid.data.input.MapBasedRow;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.CountAggregatorFactory;
import io.druid.query.aggregation.LongSumAggregatorFactory;
import io.druid.query.groupby.epinephelinae.Grouper.Entry;
import org.junit.Assert;
import org.junit.Test;
import java.nio.ByteBuffer;
import java.util.Comparator;
import java.util.List;
public class BufferArrayGrouperTest
{
@Test
public void testAggregate()
{
final TestColumnSelectorFactory columnSelectorFactory = GrouperTestUtil.newColumnSelectorFactory();
final Grouper<Integer> grouper = newGrouper(columnSelectorFactory, 1024);
columnSelectorFactory.setRow(new MapBasedRow(0, ImmutableMap.of("value", 10L)));
grouper.aggregate(12);
grouper.aggregate(6);
grouper.aggregate(10);
grouper.aggregate(6);
grouper.aggregate(12);
grouper.aggregate(6);
final List<Entry<Integer>> expected = ImmutableList.of(
new Grouper.Entry<>(6, new Object[]{30L, 3L}),
new Grouper.Entry<>(10, new Object[]{10L, 1L}),
new Grouper.Entry<>(12, new Object[]{20L, 2L})
);
final List<Entry<Integer>> unsortedEntries = Lists.newArrayList(grouper.iterator(false));
Assert.assertEquals(
expected,
Ordering.from((Comparator<Entry<Integer>>) (o1, o2) -> Ints.compare(o1.getKey(), o2.getKey()))
.sortedCopy(unsortedEntries)
);
}
private BufferArrayGrouper newGrouper(
TestColumnSelectorFactory columnSelectorFactory,
int bufferSize
)
{
final ByteBuffer buffer = ByteBuffer.allocate(bufferSize);
final BufferArrayGrouper grouper = new BufferArrayGrouper(
Suppliers.ofInstance(buffer),
columnSelectorFactory,
new AggregatorFactory[]{
new LongSumAggregatorFactory("valueSum", "value"),
new CountAggregatorFactory("count")
},
1000
);
grouper.init();
return grouper;
}
}
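Note: the new test above exercises BufferArrayGrouper with int keys and what is presumably a cardinality bound of 1000 passed to the constructor. Conceptually (a hedged sketch, not the actual buffer layout), an array-based grouper maps a key in [0, cardinality) straight to its slot, with no hashing and no key comparisons:

// Conceptual illustration with on-heap arrays instead of a shared ByteBuffer.
class ArrayGrouperSketch
{
  private final long[] valueSum;
  private final long[] count;

  ArrayGrouperSketch(int cardinality)
  {
    this.valueSum = new long[cardinality];
    this.count = new long[cardinality];
  }

  void aggregate(int key, long value)
  {
    valueSum[key] += value; // the key itself is the slot index
    count[key]++;
  }
}

The trade-off is that the buffer must reserve one slot per possible key, which is why the key space has to be bounded up front.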

View File

@ -43,7 +43,7 @@ import java.nio.channels.FileChannel;
import java.util.Comparator; import java.util.Comparator;
import java.util.List; import java.util.List;
public class BufferGrouperTest public class BufferHashGrouperTest
{ {
@Rule @Rule
public TemporaryFolder temporaryFolder = new TemporaryFolder(); public TemporaryFolder temporaryFolder = new TemporaryFolder();
@ -52,7 +52,7 @@ public class BufferGrouperTest
public void testSimple() public void testSimple()
{ {
final TestColumnSelectorFactory columnSelectorFactory = GrouperTestUtil.newColumnSelectorFactory(); final TestColumnSelectorFactory columnSelectorFactory = GrouperTestUtil.newColumnSelectorFactory();
final Grouper<Integer> grouper = new BufferGrouper<>( final Grouper<Integer> grouper = new BufferHashGrouper<>(
Suppliers.ofInstance(ByteBuffer.allocate(1000)), Suppliers.ofInstance(ByteBuffer.allocate(1000)),
GrouperTestUtil.intKeySerde(), GrouperTestUtil.intKeySerde(),
columnSelectorFactory, columnSelectorFactory,
@ -182,7 +182,7 @@ public class BufferGrouperTest
Assert.assertEquals(expected, Lists.newArrayList(grouper.iterator(true))); Assert.assertEquals(expected, Lists.newArrayList(grouper.iterator(true)));
} }
private BufferGrouper<Integer> makeGrouper( private BufferHashGrouper<Integer> makeGrouper(
TestColumnSelectorFactory columnSelectorFactory, TestColumnSelectorFactory columnSelectorFactory,
int bufferSize, int bufferSize,
int initialBuckets int initialBuckets
@ -197,7 +197,7 @@ public class BufferGrouperTest
throw Throwables.propagate(e); throw Throwables.propagate(e);
} }
final BufferGrouper<Integer> grouper = new BufferGrouper<>( final BufferHashGrouper<Integer> grouper = new BufferHashGrouper<>(
Suppliers.ofInstance(buffer), Suppliers.ofInstance(buffer),
GrouperTestUtil.intKeySerde(), GrouperTestUtil.intKeySerde(),
columnSelectorFactory, columnSelectorFactory,

View File

@ -35,7 +35,7 @@ import org.junit.rules.ExpectedException;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.List; import java.util.List;
public class LimitedBufferGrouperTest public class LimitedBufferHashGrouperTest
{ {
@Rule @Rule
public ExpectedException expectedException = ExpectedException.none(); public ExpectedException expectedException = ExpectedException.none();
@ -46,7 +46,7 @@ public class LimitedBufferGrouperTest
final int limit = 100; final int limit = 100;
final int keyBase = 100000; final int keyBase = 100000;
final TestColumnSelectorFactory columnSelectorFactory = GrouperTestUtil.newColumnSelectorFactory(); final TestColumnSelectorFactory columnSelectorFactory = GrouperTestUtil.newColumnSelectorFactory();
final LimitedBufferGrouper<Integer> grouper = makeGrouper(columnSelectorFactory, 20000, 2, limit); final LimitedBufferHashGrouper<Integer> grouper = makeGrouper(columnSelectorFactory, 20000, 2, limit);
final int numRows = 1000; final int numRows = 1000;
columnSelectorFactory.setRow(new MapBasedRow(0, ImmutableMap.<String, Object>of("value", 10L))); columnSelectorFactory.setRow(new MapBasedRow(0, ImmutableMap.<String, Object>of("value", 10L)));
@ -100,7 +100,7 @@ public class LimitedBufferGrouperTest
{ {
expectedException.expect(IAE.class); expectedException.expect(IAE.class);
final TestColumnSelectorFactory columnSelectorFactory = GrouperTestUtil.newColumnSelectorFactory(); final TestColumnSelectorFactory columnSelectorFactory = GrouperTestUtil.newColumnSelectorFactory();
final LimitedBufferGrouper<Integer> grouper = makeGrouper(columnSelectorFactory, 10, 2, 100); final LimitedBufferHashGrouper<Integer> grouper = makeGrouper(columnSelectorFactory, 10, 2, 100);
} }
@Test @Test
@ -109,7 +109,7 @@ public class LimitedBufferGrouperTest
final int limit = 100; final int limit = 100;
final int keyBase = 100000; final int keyBase = 100000;
final TestColumnSelectorFactory columnSelectorFactory = GrouperTestUtil.newColumnSelectorFactory(); final TestColumnSelectorFactory columnSelectorFactory = GrouperTestUtil.newColumnSelectorFactory();
final LimitedBufferGrouper<Integer> grouper = makeGrouper(columnSelectorFactory, 11716, 2, limit); final LimitedBufferHashGrouper<Integer> grouper = makeGrouper(columnSelectorFactory, 11716, 2, limit);
final int numRows = 1000; final int numRows = 1000;
columnSelectorFactory.setRow(new MapBasedRow(0, ImmutableMap.<String, Object>of("value", 10L))); columnSelectorFactory.setRow(new MapBasedRow(0, ImmutableMap.<String, Object>of("value", 10L)));
@ -146,14 +146,14 @@ public class LimitedBufferGrouperTest
Assert.assertEquals(expected, Lists.newArrayList(grouper.iterator(true))); Assert.assertEquals(expected, Lists.newArrayList(grouper.iterator(true)));
} }
private static LimitedBufferGrouper<Integer> makeGrouper( private static LimitedBufferHashGrouper<Integer> makeGrouper(
TestColumnSelectorFactory columnSelectorFactory, TestColumnSelectorFactory columnSelectorFactory,
int bufferSize, int bufferSize,
int initialBuckets, int initialBuckets,
int limit int limit
) )
{ {
LimitedBufferGrouper<Integer> grouper = new LimitedBufferGrouper<>( LimitedBufferHashGrouper<Integer> grouper = new LimitedBufferHashGrouper<>(
Suppliers.ofInstance(ByteBuffer.allocate(bufferSize)), Suppliers.ofInstance(ByteBuffer.allocate(bufferSize)),
GrouperTestUtil.intKeySerde(), GrouperTestUtil.intKeySerde(),
columnSelectorFactory, columnSelectorFactory,