Fine grained buffer management for groupby (#3863)

* Fine-grained buffer management for group by queries

* Remove maxQueryCount from GroupByRules

* Fix code style

* Merge master

* Fix compilation failure

* Address comments

* Address comments

- Revert Sequence
- Add isInitialized() to Grouper
- Initialize the grouper in RowBasedGrouperHelper.Accumulator
- Simple refactoring RowBasedGrouperHelper.Accumulator
- Add tests for checking the number of used merge buffers
- Improve docs

* Revert unnecessary changes

* change to visible to testing

* fix misspelling
This commit is contained in:
Jihoon Son 2017-02-15 05:55:54 +09:00 committed by Gian Merlino
parent 78b0d134ae
commit a459db68b6
19 changed files with 1186 additions and 189 deletions

View File

@ -19,6 +19,7 @@
package io.druid.collections;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Supplier;
@ -78,4 +79,10 @@ public class BlockingPool<T>
}
);
}
@VisibleForTesting
protected int getQueueSize()
{
return objects.size();
}
}

View File

@ -157,10 +157,10 @@ inner query's results stream with off-heap fact map and on-heap string dictionar
strategy perform the outer query on the broker in a single-threaded fashion.
Note that groupBys require a separate merge buffer on the broker for each layer beyond the first layer of the groupBy.
With the v2 groupBy strategy, this can potentially lead to deadlocks for groupBys nested beyond two layers, since the
merge buffers are limited in number and are acquired one-by-one and not as a complete set. At this time we recommend
that you avoid deeply-nested groupBys with the v2 strategy. Doubly-nested groupBys (groupBy -> groupBy -> table) are
safe and do not suffer from this issue.
This merge buffer is immediately released once they are not used anymore during the query processing. However, deeply
nested groupBys (there are two or more groupBy layers beyond the first one) can potentially lead to deadlocks since the
merge buffers are limited in number and are acquired one-by-one instead of a complete set. At this time, we recommend
that you avoid too many concurrent execution of deeply nested groupBys with the v2 strategy.
#### Server configuration
@ -185,7 +185,8 @@ When using the "v2" strategy, the following runtime properties apply:
Additionally, the "v2" strategy uses merging buffers for merging. It is currently the only query implementation that
does so. By default, Druid is configured without any merging buffer pool, so to use the "v2" strategy you must also
set `druid.processing.numMergeBuffers` to some non-zero number.
set `druid.processing.numMergeBuffers` to some non-zero number. Furthermore, if you want to execute deeply nested groupBys,
you must set `druid.processing.numMergeBuffers` to at least 2.
This may require allocating more direct memory. The amount of direct memory needed by Druid is at least
`druid.processing.buffer.sizeBytes * (druid.processing.numMergeBuffers + druid.processing.numThreads + 1)`. You can

View File

@ -117,11 +117,10 @@ SELECT COUNT(*) FROM (SELECT DISTINCT col FROM data_source)
```
Note that groupBys require a separate merge buffer on the broker for each layer beyond the first layer of the groupBy.
With the v2 groupBy strategy, this can potentially lead to deadlocks for groupBys nested beyond two layers, since the
merge buffers are limited in number and are acquired one-by-one and not as a complete set. At this time we recommend
that you avoid deeply-nested groupBys with the v2 strategy. Doubly-nested groupBys (groupBy -> groupBy -> table) are
safe and do not suffer from this issue. If you like, you can forbid deeper nesting by setting
`druid.sql.planner.maxQueryCount = 2`.
This merge buffer is immediately released once they are not used anymore during the query processing. However, deeply
nested groupBys (there are two or more groupBy layers beyond the first one) can potentially lead to deadlocks since the
merge buffers are limited in number and are acquired one-by-one instead of a complete set. At this time, we recommend
that you avoid too many concurrent execution of deeply nested groupBys with the v2 strategy.
#### Semi-joins

View File

@ -23,17 +23,38 @@ package io.druid.java.util.common.guava;
* A Sequence represents an iterable sequence of elements. Unlike normal Iterators however, it doesn't expose
* a way for you to extract values from it, instead you provide it with a worker (an Accumulator) and that defines
* what happens with the data.
*
* <p>
* This inversion of control is in place to allow the Sequence to do resource management. It can enforce that close()
* methods get called and other resources get cleaned up whenever processing is complete. Without this inversion
* it is very easy to unintentionally leak resources when iterating over something that is backed by a resource.
*
* <p>
* Sequences also expose {#see com.metamx.common.guava.Yielder} Yielder objects which allow you to implement a
* continuation over the Sequence. Yielder do not offer the same guarantees of automagic resource management
* continuation over the Sequence. Yielder do not offer the same guarantees of automatic resource management
* as the accumulate method, but they are Closeable and will do the proper cleanup when close() is called on them.
*/
public interface Sequence<T>
{
public <OutType> OutType accumulate(OutType initValue, Accumulator<OutType, T> accumulator);
public <OutType> Yielder<OutType> toYielder(OutType initValue, YieldingAccumulator<OutType, T> accumulator);
/**
* Accumulate this sequence using the given accumulator.
*
* @param initValue the initial value to pass along to start the accumulation.
* @param accumulator the accumulator which is responsible for accumulating input values.
* @param <OutType> the type of accumulated value.
*
* @return accumulated value.
*/
<OutType> OutType accumulate(OutType initValue, Accumulator<OutType, T> accumulator);
/**
* Return an Yielder for accumulated sequence.
*
* @param initValue the initial value to pass along to start the accumulation.
* @param accumulator the accumulator which is responsible for accumulating input values.
* @param <OutType> the type of accumulated value.
*
* @return an Yielder for accumulated sequence.
*
* @see Yielder
*/
<OutType> Yielder<OutType> toYielder(OutType initValue, YieldingAccumulator<OutType, T> accumulator);
}

View File

@ -19,6 +19,7 @@
package io.druid.query.groupby.epinephelinae;
import com.google.common.base.Supplier;
import com.google.common.primitives.Ints;
import io.druid.java.util.common.IAE;
import io.druid.java.util.common.ISE;
@ -64,17 +65,19 @@ public class BufferGrouper<KeyType> implements Grouper<KeyType>
private static final float DEFAULT_MAX_LOAD_FACTOR = 0.7f;
private static final int HASH_SIZE = Ints.BYTES;
private final ByteBuffer buffer;
private final Supplier<ByteBuffer> bufferSupplier;
private final KeySerde<KeyType> keySerde;
private final int keySize;
private final BufferAggregator[] aggregators;
private final int[] aggregatorOffsets;
private final int initialBuckets;
private final int bucketSize;
private final int tableArenaSize;
private final int bufferGrouperMaxSize; // Integer.MAX_VALUE in production, only used for unit tests
private final float maxLoadFactor;
private ByteBuffer buffer;
private int tableArenaSize = -1;
// Buffer pointing to the current table (it moves around as the table grows)
private ByteBuffer tableBuffer;
@ -90,8 +93,10 @@ public class BufferGrouper<KeyType> implements Grouper<KeyType>
// Maximum number of elements in the table before it must be resized
private int maxSize;
private boolean initialized = false;
public BufferGrouper(
final ByteBuffer buffer,
final Supplier<ByteBuffer> bufferSupplier,
final KeySerde<KeyType> keySerde,
final ColumnSelectorFactory columnSelectorFactory,
final AggregatorFactory[] aggregatorFactories,
@ -100,7 +105,7 @@ public class BufferGrouper<KeyType> implements Grouper<KeyType>
final int initialBuckets
)
{
this.buffer = buffer;
this.bufferSupplier = bufferSupplier;
this.keySerde = keySerde;
this.keySize = keySerde.keySize();
this.aggregators = new BufferAggregator[aggregatorFactories.length];
@ -121,9 +126,23 @@ public class BufferGrouper<KeyType> implements Grouper<KeyType>
}
this.bucketSize = offset;
this.tableArenaSize = (buffer.capacity() / (bucketSize + Ints.BYTES)) * bucketSize;
}
@Override
public void init()
{
if (!initialized) {
this.buffer = bufferSupplier.get();
this.tableArenaSize = (buffer.capacity() / (bucketSize + Ints.BYTES)) * bucketSize;
reset();
initialized = true;
}
}
@Override
public boolean isInitialized()
{
return initialized;
}
@Override

View File

@ -21,6 +21,8 @@ package io.druid.query.groupby.epinephelinae;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Preconditions;
import com.google.common.base.Supplier;
import com.google.common.base.Suppliers;
import io.druid.java.util.common.ISE;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.segment.ColumnSelectorFactory;
@ -34,7 +36,7 @@ import java.util.concurrent.atomic.AtomicInteger;
/**
* Grouper based around a set of underlying {@link SpillingGrouper} instances. Thread-safe.
*
* <p>
* The passed-in buffer is cut up into concurrencyHint slices, and each slice is passed to a different underlying
* grouper. Access to each slice is separately synchronized. As long as the result set fits in memory, keys are
* partitioned between buffers based on their hash, and multiple threads can write into the same buffer. When
@ -50,8 +52,21 @@ public class ConcurrentGrouper<KeyType> implements Grouper<KeyType>
private volatile boolean closed = false;
private final Comparator<KeyType> keyObjComparator;
private final Supplier<ByteBuffer> bufferSupplier;
private final ColumnSelectorFactory columnSelectorFactory;
private final AggregatorFactory[] aggregatorFactories;
private final int bufferGrouperMaxSize;
private final float bufferGrouperMaxLoadFactor;
private final int bufferGrouperInitialBuckets;
private final LimitedTemporaryStorage temporaryStorage;
private final ObjectMapper spillMapper;
private final int concurrencyHint;
private final KeySerdeFactory<KeyType> keySerdeFactory;
private volatile boolean initialized = false;
public ConcurrentGrouper(
final ByteBuffer buffer,
final Supplier<ByteBuffer> bufferSupplier,
final KeySerdeFactory<KeyType> keySerdeFactory,
final ColumnSelectorFactory columnSelectorFactory,
final AggregatorFactory[] aggregatorFactories,
@ -75,15 +90,34 @@ public class ConcurrentGrouper<KeyType> implements Grouper<KeyType>
}
};
this.bufferSupplier = bufferSupplier;
this.columnSelectorFactory = columnSelectorFactory;
this.aggregatorFactories = aggregatorFactories;
this.bufferGrouperMaxSize = bufferGrouperMaxSize;
this.bufferGrouperMaxLoadFactor = bufferGrouperMaxLoadFactor;
this.bufferGrouperInitialBuckets = bufferGrouperInitialBuckets;
this.temporaryStorage = temporaryStorage;
this.spillMapper = spillMapper;
this.concurrencyHint = concurrencyHint;
this.keySerdeFactory = keySerdeFactory;
this.keyObjComparator = keySerdeFactory.objectComparator();
}
@Override
public void init()
{
if (!initialized) {
synchronized (bufferSupplier) {
if (!initialized) {
final ByteBuffer buffer = bufferSupplier.get();
final int sliceSize = (buffer.capacity() / concurrencyHint);
for (int i = 0; i < concurrencyHint; i++) {
final ByteBuffer slice = buffer.duplicate();
slice.position(sliceSize * i);
slice.limit(slice.position() + sliceSize);
groupers.add(
new SpillingGrouper<>(
slice.slice(),
final SpillingGrouper<KeyType> grouper = new SpillingGrouper<>(
Suppliers.ofInstance(slice.slice()),
keySerdeFactory,
columnSelectorFactory,
aggregatorFactories,
@ -93,16 +127,30 @@ public class ConcurrentGrouper<KeyType> implements Grouper<KeyType>
temporaryStorage,
spillMapper,
false
)
);
grouper.init();
groupers.add(grouper);
}
this.keyObjComparator = keySerdeFactory.objectComparator();
initialized = true;
}
}
}
}
@Override
public boolean isInitialized()
{
return initialized;
}
@Override
public boolean aggregate(KeyType key, int keyHash)
{
if (!initialized) {
throw new ISE("Grouper is not initialized");
}
if (closed) {
throw new ISE("Grouper is closed");
}
@ -139,6 +187,10 @@ public class ConcurrentGrouper<KeyType> implements Grouper<KeyType>
@Override
public void reset()
{
if (!initialized) {
throw new ISE("Grouper is not initialized");
}
if (closed) {
throw new ISE("Grouper is closed");
}
@ -153,6 +205,10 @@ public class ConcurrentGrouper<KeyType> implements Grouper<KeyType>
@Override
public Iterator<Entry<KeyType>> iterator(final boolean sorted)
{
if (!initialized) {
throw new ISE("Grouper is not initialized");
}
if (closed) {
throw new ISE("Grouper is closed");
}

View File

@ -22,6 +22,7 @@ package io.druid.query.groupby.epinephelinae;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Function;
import com.google.common.base.Predicates;
import com.google.common.base.Suppliers;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
@ -69,7 +70,7 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
public class GroupByMergingQueryRunnerV2 implements QueryRunner
public class GroupByMergingQueryRunnerV2 implements QueryRunner<Row>
{
private static final Logger log = new Logger(GroupByMergingQueryRunnerV2.class);
private static final String CTX_KEY_MERGE_RUNNERS_USING_CHAINED_EXECUTION = "mergeRunnersUsingChainedExecution";
@ -181,7 +182,7 @@ public class GroupByMergingQueryRunnerV2 implements QueryRunner
false,
null,
config,
mergeBufferHolder.get(),
Suppliers.ofInstance(mergeBufferHolder.get()),
concurrencyHint,
temporaryStorage,
spillMapper,
@ -189,6 +190,7 @@ public class GroupByMergingQueryRunnerV2 implements QueryRunner
);
final Grouper<RowBasedKey> grouper = pair.lhs;
final Accumulator<Grouper<RowBasedKey>, Row> accumulator = pair.rhs;
grouper.init();
final ReferenceCountingResourceHolder<Grouper<RowBasedKey>> grouperHolder =
ReferenceCountingResourceHolder.fromCloseable(grouper);

View File

@ -21,6 +21,7 @@ package io.druid.query.groupby.epinephelinae;
import com.google.common.base.Function;
import com.google.common.base.Strings;
import com.google.common.base.Suppliers;
import com.google.common.collect.Maps;
import io.druid.collections.ResourceHolder;
import io.druid.collections.StupidPool;
@ -259,7 +260,7 @@ public class GroupByQueryEngineV2
}
final Grouper<ByteBuffer> grouper = new BufferGrouper<>(
buffer,
Suppliers.ofInstance(buffer),
keySerde,
cursor,
query.getAggregatorSpecs()
@ -268,6 +269,7 @@ public class GroupByQueryEngineV2
querySpecificConfig.getBufferGrouperMaxLoadFactor(),
querySpecificConfig.getBufferGrouperInitialBuckets()
);
grouper.init();
outer:
while (!cursor.isDone()) {

View File

@ -21,6 +21,7 @@ package io.druid.query.groupby.epinephelinae;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Predicate;
import com.google.common.base.Supplier;
import com.google.common.collect.Lists;
import io.druid.collections.BlockingPool;
import io.druid.collections.ReferenceCountingResourceHolder;
@ -114,6 +115,7 @@ public class GroupByRowProcessor
for (Interval queryInterval : queryIntervals) {
if (queryInterval.contains(rowTime)) {
inInterval = true;
break;
}
}
if (!inInterval) {
@ -141,24 +143,33 @@ public class GroupByRowProcessor
closeOnFailure.add(temporaryStorage);
final ReferenceCountingResourceHolder<ByteBuffer> mergeBufferHolder;
try {
// This will potentially block if there are no merge buffers left in the pool.
if (timeout <= 0 || (mergeBufferHolder = mergeBufferPool.take(timeout)) == null) {
throw new QueryInterruptedException(new TimeoutException());
}
closeOnFailure.add(mergeBufferHolder);
}
catch (InterruptedException e) {
throw new QueryInterruptedException(e);
}
final SettableSupplier<ReferenceCountingResourceHolder<ByteBuffer>> bufferHolderSupplier = new SettableSupplier<>();
Pair<Grouper<RowBasedKey>, Accumulator<Grouper<RowBasedKey>, Row>> pair = RowBasedGrouperHelper.createGrouperAccumulatorPair(
query,
true,
rowSignature,
querySpecificConfig,
mergeBufferHolder.get(),
new Supplier<ByteBuffer>()
{
@Override
public ByteBuffer get()
{
final ReferenceCountingResourceHolder<ByteBuffer> mergeBufferHolder;
try {
if (timeout <= 0 || (mergeBufferHolder = mergeBufferPool.take(timeout)) == null) {
throw new QueryInterruptedException(new TimeoutException());
}
bufferHolderSupplier.set(mergeBufferHolder);
closeOnFailure.add(mergeBufferHolder);
return mergeBufferHolder.get();
}
catch (InterruptedException e) {
throw new QueryInterruptedException(e);
}
}
},
-1,
temporaryStorage,
spillMapper,
@ -168,7 +179,10 @@ public class GroupByRowProcessor
final Accumulator<Grouper<RowBasedKey>, Row> accumulator = pair.rhs;
closeOnFailure.add(grouper);
final Grouper<RowBasedKey> retVal = filteredSequence.accumulate(grouper, accumulator);
final Grouper<RowBasedKey> retVal = filteredSequence.accumulate(
grouper,
accumulator
);
if (retVal != grouper) {
throw new ResourceLimitExceededException("Grouping resources exhausted");
}
@ -182,7 +196,7 @@ public class GroupByRowProcessor
public void close() throws IOException
{
grouper.close();
mergeBufferHolder.close();
CloseQuietly.close(bufferHolderSupplier.get());
CloseQuietly.close(temporaryStorage);
}
}

View File

@ -32,7 +32,7 @@ import java.util.Iterator;
* Groupers aggregate metrics from rows that they typically get from a ColumnSelectorFactory, under
* grouping keys that some outside driver is passing in. They can also iterate over the grouped
* rows after the aggregation is done.
*
* <p>
* They work sort of like a map of KeyType to aggregated values, except they don't support
* random lookups.
*
@ -40,6 +40,19 @@ import java.util.Iterator;
*/
public interface Grouper<KeyType> extends Closeable
{
/**
* Initialize the grouper.
* This method needs to be called before calling {@link #aggregate(Object)} and {@link #aggregate(Object, int)}.
*/
void init();
/**
* Check this grouper is initialized or not.
*
* @return true if the grouper is already initialized, otherwise false.
*/
boolean isInitialized();
/**
* Aggregate the current row with the provided key. Some implementations are thread-safe and
* some are not.
@ -74,11 +87,11 @@ public interface Grouper<KeyType> extends Closeable
/**
* Iterate through entries. If a comparator is provided, do a sorted iteration.
*
* <p>
* Once this method is called, writes are no longer safe. After you are done with the iterator returned by this
* method, you should either call {@link #close()} (if you are done with the Grouper), {@link #reset()} (if you
* want to reuse it), or {@link #iterator(boolean)} again if you want another iterator.
*
* <p>
* If "sorted" is true then the iterator will return sorted results. It will use KeyType's natural ordering on
* deserialized objects, and will use the {@link KeySerde#comparator()} on serialized objects. Woe be unto you
* if these comparators are not equivalent.
@ -188,7 +201,7 @@ public interface Grouper<KeyType> extends Closeable
/**
* Serialize a key. This will be called by the {@link #aggregate(Comparable)} method. The buffer will not
* be retained after the aggregate method returns, so reusing buffers is OK.
*
* <p>
* This method may return null, which indicates that some internal resource limit has been reached and
* no more keys can be generated. In this situation you can call {@link #reset()} and try again, although
* beware the caveats on that method.

View File

@ -55,6 +55,7 @@ import io.druid.segment.column.ValueType;
import io.druid.segment.data.IndexedInts;
import org.joda.time.DateTime;
import javax.annotation.Nullable;
import java.io.Closeable;
import java.nio.ByteBuffer;
import java.util.ArrayList;
@ -76,7 +77,7 @@ public class RowBasedGrouperHelper
final boolean isInputRaw,
final Map<String, ValueType> rawInputRowSignature,
final GroupByQueryConfig config,
final ByteBuffer buffer,
final Supplier<ByteBuffer> bufferSupplier,
final int concurrencyHint,
final LimitedTemporaryStorage temporaryStorage,
final ObjectMapper spillMapper,
@ -105,7 +106,7 @@ public class RowBasedGrouperHelper
final Grouper<RowBasedKey> grouper;
if (concurrencyHint == -1) {
grouper = new SpillingGrouper<>(
buffer,
bufferSupplier,
keySerdeFactory,
columnSelectorFactory,
aggregatorFactories,
@ -118,7 +119,7 @@ public class RowBasedGrouperHelper
);
} else {
grouper = new ConcurrentGrouper<>(
buffer,
bufferSupplier,
keySerdeFactory,
columnSelectorFactory,
aggregatorFactories,
@ -131,16 +132,15 @@ public class RowBasedGrouperHelper
);
}
final Supplier[] inputRawSuppliers;
if (isInputRaw) {
inputRawSuppliers = getValueSuppliersForDimensions(
final int keySize = includeTimestamp ? query.getDimensions().size() + 1 : query.getDimensions().size();
final ValueExtractFunction valueExtractFn = makeValueExtractFunction(
query,
isInputRaw,
includeTimestamp,
columnSelectorFactory,
query.getDimensions(),
rawInputRowSignature
rawInputRowSignature,
valueTypes
);
} else {
inputRawSuppliers = null;
}
final Accumulator<Grouper<RowBasedKey>, Row> accumulator = new Accumulator<Grouper<RowBasedKey>, Row>()
{
@ -159,62 +159,14 @@ public class RowBasedGrouperHelper
return null;
}
if (!theGrouper.isInitialized()) {
theGrouper.init();
}
columnSelectorRow.set(row);
final int dimStart;
final Comparable[] key;
if (includeTimestamp) {
key = new Comparable[query.getDimensions().size() + 1];
final long timestamp;
if (isInputRaw) {
if (query.getGranularity() instanceof AllGranularity) {
timestamp = query.getIntervals().get(0).getStartMillis();
} else {
timestamp = query.getGranularity().truncate(row.getTimestampFromEpoch());
}
} else {
timestamp = row.getTimestampFromEpoch();
}
key[0] = timestamp;
dimStart = 1;
} else {
key = new Comparable[query.getDimensions().size()];
dimStart = 0;
}
for (int i = dimStart; i < key.length; i++) {
final ValueType type = valueTypes.get(i - dimStart);
Object valObj;
if (isInputRaw) {
valObj = inputRawSuppliers[i - dimStart].get();
} else {
valObj = row.getRaw(query.getDimensions().get(i - dimStart).getOutputName());
}
// convert values to the output type specified by the DimensionSpec, for merging purposes
switch (type) {
case STRING:
valObj = valObj == null ? "" : valObj.toString();
break;
case LONG:
valObj = DimensionHandlerUtils.convertObjectToLong(valObj);
if (valObj == null) {
valObj = 0L;
}
break;
case FLOAT:
valObj = DimensionHandlerUtils.convertObjectToFloat(valObj);
if (valObj == null) {
valObj = 0.0f;
}
break;
default:
throw new IAE("invalid type: [%s]", type);
}
key[i] = (Comparable) valObj;
}
final Comparable[] key = new Comparable[keySize];
valueExtractFn.apply(row, key);
final boolean didAggregate = theGrouper.aggregate(new RowBasedKey(key));
if (!didAggregate) {
@ -230,6 +182,135 @@ public class RowBasedGrouperHelper
return new Pair<>(grouper, accumulator);
}
private interface TimestampExtractFunction
{
long apply(Row row);
}
private static TimestampExtractFunction makeTimestampExtractFunction(
final GroupByQuery query,
final boolean isInputRaw
)
{
if (isInputRaw) {
if (query.getGranularity() instanceof AllGranularity) {
return new TimestampExtractFunction()
{
@Override
public long apply(Row row)
{
return query.getIntervals().get(0).getStartMillis();
}
};
} else {
return new TimestampExtractFunction()
{
@Override
public long apply(Row row)
{
return query.getGranularity().truncate(row.getTimestampFromEpoch());
}
};
}
} else {
return new TimestampExtractFunction()
{
@Override
public long apply(Row row)
{
return row.getTimestampFromEpoch();
}
};
}
}
private interface ValueExtractFunction
{
Comparable[] apply(Row row, Comparable[] key);
}
private static ValueExtractFunction makeValueExtractFunction(
final GroupByQuery query,
final boolean isInputRaw,
final boolean includeTimestamp,
final ColumnSelectorFactory columnSelectorFactory,
final Map<String, ValueType> rawInputRowSignature,
final List<ValueType> valueTypes
)
{
final TimestampExtractFunction timestampExtractFn = includeTimestamp ?
makeTimestampExtractFunction(query, isInputRaw) :
null;
final Function<Comparable, Comparable>[] valueConvertFns = makeValueConvertFunctions(valueTypes);
if (isInputRaw) {
final Supplier<Comparable>[] inputRawSuppliers = getValueSuppliersForDimensions(
columnSelectorFactory,
query.getDimensions(),
rawInputRowSignature
);
if (includeTimestamp) {
return new ValueExtractFunction()
{
@Override
public Comparable[] apply(Row row, Comparable[] key)
{
key[0] = timestampExtractFn.apply(row);
for (int i = 1; i < key.length; i++) {
final Comparable val = inputRawSuppliers[i - 1].get();
key[i] = valueConvertFns[i - 1].apply(val);
}
return key;
}
};
} else {
return new ValueExtractFunction()
{
@Override
public Comparable[] apply(Row row, Comparable[] key)
{
for (int i = 0; i < key.length; i++) {
final Comparable val = inputRawSuppliers[i].get();
key[i] = valueConvertFns[i].apply(val);
}
return key;
}
};
}
} else {
if (includeTimestamp) {
return new ValueExtractFunction()
{
@Override
public Comparable[] apply(Row row, Comparable[] key)
{
key[0] = timestampExtractFn.apply(row);
for (int i = 1; i < key.length; i++) {
final Comparable val = (Comparable) row.getRaw(query.getDimensions().get(i - 1).getOutputName());
key[i] = valueConvertFns[i - 1].apply(val);
}
return key;
}
};
} else {
return new ValueExtractFunction()
{
@Override
public Comparable[] apply(Row row, Comparable[] key)
{
for (int i = 0; i < key.length; i++) {
final Comparable val = (Comparable) row.getRaw(query.getDimensions().get(i).getOutputName());
key[i] = valueConvertFns[i].apply(val);
}
return key;
}
};
}
}
}
public static CloseableGrouperIterator<RowBasedKey, Row> makeGrouperIterator(
final Grouper<RowBasedKey> grouper,
final GroupByQuery query,
@ -340,7 +421,8 @@ public class RowBasedGrouperHelper
}
}
private static Supplier[] getValueSuppliersForDimensions(
@SuppressWarnings("unchecked")
private static Supplier<Comparable>[] getValueSuppliersForDimensions(
final ColumnSelectorFactory columnSelectorFactory,
final List<DimensionSpec> dimensions,
final Map<String, ValueType> rawInputRowSignature
@ -360,10 +442,10 @@ public class RowBasedGrouperHelper
}
switch (type) {
case STRING:
inputRawSuppliers[i] = new Supplier()
inputRawSuppliers[i] = new Supplier<Comparable>()
{
@Override
public Object get()
public Comparable get()
{
final String value;
IndexedInts index = ((DimensionSelector) selector).getRow();
@ -375,20 +457,20 @@ public class RowBasedGrouperHelper
};
break;
case LONG:
inputRawSuppliers[i] = new Supplier()
inputRawSuppliers[i] = new Supplier<Comparable>()
{
@Override
public Object get()
public Comparable get()
{
return ((LongColumnSelector) selector).get();
}
};
break;
case FLOAT:
inputRawSuppliers[i] = new Supplier()
inputRawSuppliers[i] = new Supplier<Comparable>()
{
@Override
public Object get()
public Comparable get()
{
return ((FloatColumnSelector) selector).get();
}
@ -401,6 +483,74 @@ public class RowBasedGrouperHelper
return inputRawSuppliers;
}
@SuppressWarnings("unchecked")
private static Function<Comparable, Comparable>[] makeValueConvertFunctions(
final Map<String, ValueType> rawInputRowSignature,
final List<DimensionSpec> dimensions
)
{
final List<ValueType> valueTypes = Lists.newArrayListWithCapacity(dimensions.size());
for (DimensionSpec dimensionSpec : dimensions) {
final ValueType valueType = rawInputRowSignature.get(dimensionSpec);
valueTypes.add(valueType == null ? ValueType.STRING : valueType);
}
return makeValueConvertFunctions(valueTypes);
}
@SuppressWarnings("unchecked")
private static Function<Comparable, Comparable>[] makeValueConvertFunctions(
final List<ValueType> valueTypes
)
{
final Function<Comparable, Comparable>[] functions = new Function[valueTypes.size()];
for (int i = 0; i < functions.length; i++) {
ValueType type = valueTypes.get(i);
// Subquery post-aggs aren't added to the rowSignature (see rowSignatureFor() in GroupByQueryHelper) because
// their types aren't known, so default to String handling.
type = type == null ? ValueType.STRING : type;
switch (type) {
case STRING:
functions[i] = new Function<Comparable, Comparable>()
{
@Override
public Comparable apply(@Nullable Comparable input)
{
return input == null ? "" : input.toString();
}
};
break;
case LONG:
functions[i] = new Function<Comparable, Comparable>()
{
@Override
public Comparable apply(@Nullable Comparable input)
{
final Long val = DimensionHandlerUtils.convertObjectToLong(input);
return val == null ? 0L : val;
}
};
break;
case FLOAT:
functions[i] = new Function<Comparable, Comparable>()
{
@Override
public Comparable apply(@Nullable Comparable input)
{
final Float val = DimensionHandlerUtils.convertObjectToFloat(input);
return val == null ? 0.f : val;
}
};
break;
default:
throw new IAE("invalid type: [%s]", type);
}
}
return functions;
}
private static class RowBasedKeySerdeFactory implements Grouper.KeySerdeFactory<RowBasedKey>
{
private final boolean includeTimestamp;

View File

@ -23,11 +23,11 @@ import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.databind.MappingIterator;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Function;
import com.google.common.base.Supplier;
import com.google.common.base.Throwables;
import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
import io.druid.java.util.common.guava.CloseQuietly;
import io.druid.java.util.common.logger.Logger;
import io.druid.query.QueryInterruptedException;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.segment.ColumnSelectorFactory;
@ -51,8 +51,6 @@ import java.util.List;
*/
public class SpillingGrouper<KeyType> implements Grouper<KeyType>
{
private static final Logger log = new Logger(SpillingGrouper.class);
private final BufferGrouper<KeyType> grouper;
private final KeySerde<KeyType> keySerde;
private final LimitedTemporaryStorage temporaryStorage;
@ -66,7 +64,7 @@ public class SpillingGrouper<KeyType> implements Grouper<KeyType>
private boolean spillingAllowed = false;
public SpillingGrouper(
final ByteBuffer buffer,
final Supplier<ByteBuffer> bufferSupplier,
final KeySerdeFactory<KeyType> keySerdeFactory,
final ColumnSelectorFactory columnSelectorFactory,
final AggregatorFactory[] aggregatorFactories,
@ -81,7 +79,7 @@ public class SpillingGrouper<KeyType> implements Grouper<KeyType>
this.keySerde = keySerdeFactory.factorize();
this.keyObjComparator = keySerdeFactory.objectComparator();
this.grouper = new BufferGrouper<>(
buffer,
bufferSupplier,
keySerde,
columnSelectorFactory,
aggregatorFactories,
@ -95,6 +93,18 @@ public class SpillingGrouper<KeyType> implements Grouper<KeyType>
this.spillingAllowed = spillingAllowed;
}
@Override
public void init()
{
grouper.init();
}
@Override
public boolean isInitialized()
{
return grouper.isInitialized();
}
@Override
public boolean aggregate(KeyType key, int keyHash)
{

View File

@ -0,0 +1,346 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.query.groupby;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Supplier;
import com.google.common.base.Suppliers;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.MoreExecutors;
import io.druid.collections.BlockingPool;
import io.druid.collections.ReferenceCountingResourceHolder;
import io.druid.collections.StupidPool;
import io.druid.data.input.Row;
import io.druid.granularity.QueryGranularities;
import io.druid.query.DruidProcessingConfig;
import io.druid.query.QueryContextKeys;
import io.druid.query.QueryDataSource;
import io.druid.query.QueryRunner;
import io.druid.query.QueryRunnerTestHelper;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.LongSumAggregatorFactory;
import io.druid.query.dimension.DefaultDimensionSpec;
import io.druid.query.dimension.DimensionSpec;
import io.druid.query.groupby.strategy.GroupByStrategySelector;
import io.druid.query.groupby.strategy.GroupByStrategyV1;
import io.druid.query.groupby.strategy.GroupByStrategyV2;
import org.bouncycastle.util.Integers;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Collection;
import java.util.List;
import static org.junit.Assert.assertEquals;
@RunWith(Parameterized.class)
public class GroupByQueryMergeBufferTest
{
private static class TestBlockingPool extends BlockingPool<ByteBuffer>
{
private int minRemainBufferNum;
public TestBlockingPool(Supplier<ByteBuffer> generator, int limit)
{
super(generator, limit);
minRemainBufferNum = limit;
}
@Override
public ReferenceCountingResourceHolder<ByteBuffer> take(final long timeout) throws InterruptedException
{
final ReferenceCountingResourceHolder<ByteBuffer> holder = super.take(timeout);
final int queueSize = getQueueSize();
if (minRemainBufferNum > queueSize) {
minRemainBufferNum = queueSize;
}
return holder;
}
public void resetMinRemainBufferNum()
{
minRemainBufferNum = PROCESSING_CONFIG.getNumMergeBuffers();
}
public int getMinRemainBufferNum()
{
return minRemainBufferNum;
}
}
public static final DruidProcessingConfig PROCESSING_CONFIG = new DruidProcessingConfig()
{
@Override
public String getFormatString()
{
return null;
}
@Override
public int intermediateComputeSizeBytes()
{
return 10 * 1024 * 1024;
}
@Override
public int getNumMergeBuffers()
{
return 3;
}
@Override
public int getNumThreads()
{
return 1;
}
};
private static GroupByQueryRunnerFactory makeQueryRunnerFactory(
final ObjectMapper mapper,
final GroupByQueryConfig config
)
{
final Supplier<GroupByQueryConfig> configSupplier = Suppliers.ofInstance(config);
final StupidPool<ByteBuffer> bufferPool = new StupidPool<>(
"GroupByQueryEngine-bufferPool",
new Supplier<ByteBuffer>()
{
@Override
public ByteBuffer get()
{
return ByteBuffer.allocateDirect(PROCESSING_CONFIG.intermediateComputeSizeBytes());
}
}
);
final GroupByStrategySelector strategySelector = new GroupByStrategySelector(
configSupplier,
new GroupByStrategyV1(
configSupplier,
new GroupByQueryEngine(configSupplier, bufferPool),
QueryRunnerTestHelper.NOOP_QUERYWATCHER,
bufferPool
),
new GroupByStrategyV2(
PROCESSING_CONFIG,
configSupplier,
bufferPool,
mergeBufferPool,
mapper,
QueryRunnerTestHelper.NOOP_QUERYWATCHER
)
);
final GroupByQueryQueryToolChest toolChest = new GroupByQueryQueryToolChest(
configSupplier,
strategySelector,
bufferPool,
QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator()
);
return new GroupByQueryRunnerFactory(
strategySelector,
toolChest
);
}
private final static TestBlockingPool mergeBufferPool = new TestBlockingPool(
new Supplier<ByteBuffer>()
{
@Override
public ByteBuffer get ()
{
return ByteBuffer.allocateDirect(PROCESSING_CONFIG.intermediateComputeSizeBytes());
}
},
PROCESSING_CONFIG.getNumMergeBuffers()
);
private static final GroupByQueryRunnerFactory factory = makeQueryRunnerFactory(
GroupByQueryRunnerTest.DEFAULT_MAPPER,
new GroupByQueryConfig()
{
public String getDefaultStrategy()
{
return "v2";
}
}
);
private QueryRunner<Row> runner;
@Parameters(name = "{0}")
public static Collection<Object[]> constructorFeeder() throws IOException
{
final List<Object[]> args = Lists.newArrayList();
for (QueryRunner<Row> runner : QueryRunnerTestHelper.makeQueryRunners(factory)) {
args.add(new Object[]{runner});
}
return args;
}
public GroupByQueryMergeBufferTest(QueryRunner<Row> runner)
{
this.runner = factory.mergeRunners(MoreExecutors.sameThreadExecutor(), ImmutableList.<QueryRunner<Row>>of(runner));
}
@Before
public void setup()
{
mergeBufferPool.resetMinRemainBufferNum();
}
@Test
public void testSimpleGroupBy()
{
final GroupByQuery query = GroupByQuery
.builder()
.setDataSource(QueryRunnerTestHelper.dataSource)
.setGranularity(QueryGranularities.ALL)
.setInterval(QueryRunnerTestHelper.firstToThird)
.setAggregatorSpecs(Lists.<AggregatorFactory>newArrayList(new LongSumAggregatorFactory("rows", "rows")))
.setContext(ImmutableMap.<String, Object>of(QueryContextKeys.TIMEOUT, Integers.valueOf(500)))
.build();
GroupByQueryRunnerTestHelper.runQuery(factory, runner, query);
assertEquals(2, mergeBufferPool.getMinRemainBufferNum());
}
@Test
public void testNestedGroupBy()
{
final GroupByQuery query = GroupByQuery
.builder()
.setDataSource(
new QueryDataSource(
GroupByQuery.builder()
.setDataSource(QueryRunnerTestHelper.dataSource)
.setInterval(QueryRunnerTestHelper.firstToThird)
.setGranularity(QueryGranularities.ALL)
.setDimensions(Lists.<DimensionSpec>newArrayList(new DefaultDimensionSpec("quality", "alias")))
.setAggregatorSpecs(Lists.<AggregatorFactory>newArrayList(QueryRunnerTestHelper.rowsCount))
.build()
)
)
.setGranularity(QueryGranularities.ALL)
.setInterval(QueryRunnerTestHelper.firstToThird)
.setAggregatorSpecs(Lists.<AggregatorFactory>newArrayList(new LongSumAggregatorFactory("rows", "rows")))
.setContext(ImmutableMap.<String, Object>of(QueryContextKeys.TIMEOUT, Integers.valueOf(500)))
.build();
GroupByQueryRunnerTestHelper.runQuery(factory, runner, query);
assertEquals(1, mergeBufferPool.getMinRemainBufferNum());
}
@Test
public void testDoubleNestedGroupBy()
{
final GroupByQuery query = GroupByQuery
.builder()
.setDataSource(
new QueryDataSource(
GroupByQuery.builder()
.setDataSource(
GroupByQuery.builder()
.setDataSource(QueryRunnerTestHelper.dataSource)
.setInterval(QueryRunnerTestHelper.firstToThird)
.setGranularity(QueryGranularities.ALL)
.setDimensions(Lists.<DimensionSpec>newArrayList(
new DefaultDimensionSpec("quality", "alias"),
new DefaultDimensionSpec("market", null)
))
.setAggregatorSpecs(Lists.<AggregatorFactory>newArrayList(QueryRunnerTestHelper.rowsCount))
.build()
)
.setInterval(QueryRunnerTestHelper.firstToThird)
.setGranularity(QueryGranularities.ALL)
.setDimensions(Lists.<DimensionSpec>newArrayList(new DefaultDimensionSpec("quality", "alias")))
.setAggregatorSpecs(Lists.<AggregatorFactory>newArrayList(QueryRunnerTestHelper.rowsCount))
.build()
)
)
.setGranularity(QueryGranularities.ALL)
.setInterval(QueryRunnerTestHelper.firstToThird)
.setAggregatorSpecs(Lists.<AggregatorFactory>newArrayList(new LongSumAggregatorFactory("rows", "rows")))
.setContext(ImmutableMap.<String, Object>of(QueryContextKeys.TIMEOUT, Integers.valueOf(500)))
.build();
GroupByQueryRunnerTestHelper.runQuery(factory, runner, query);
assertEquals(1, mergeBufferPool.getMinRemainBufferNum());
}
@Test
public void testTripleNestedGroupBy()
{
final GroupByQuery query = GroupByQuery
.builder()
.setDataSource(
new QueryDataSource(
GroupByQuery.builder()
.setDataSource(
GroupByQuery.builder()
.setDataSource(
GroupByQuery.builder()
.setDataSource(QueryRunnerTestHelper.dataSource)
.setInterval(QueryRunnerTestHelper.firstToThird)
.setGranularity(QueryGranularities.ALL)
.setDimensions(Lists.<DimensionSpec>newArrayList(
new DefaultDimensionSpec("quality", "alias"),
new DefaultDimensionSpec("market", null),
new DefaultDimensionSpec("placement", null)
))
.setAggregatorSpecs(Lists.<AggregatorFactory>newArrayList(QueryRunnerTestHelper.rowsCount))
.build()
)
.setInterval(QueryRunnerTestHelper.firstToThird)
.setGranularity(QueryGranularities.ALL)
.setDimensions(Lists.<DimensionSpec>newArrayList(
new DefaultDimensionSpec("quality", "alias"),
new DefaultDimensionSpec("market", null)
))
.setAggregatorSpecs(Lists.<AggregatorFactory>newArrayList(QueryRunnerTestHelper.rowsCount))
.build()
)
.setInterval(QueryRunnerTestHelper.firstToThird)
.setGranularity(QueryGranularities.ALL)
.setDimensions(Lists.<DimensionSpec>newArrayList(new DefaultDimensionSpec("quality", "alias")))
.setAggregatorSpecs(Lists.<AggregatorFactory>newArrayList(QueryRunnerTestHelper.rowsCount))
.build()
)
)
.setGranularity(QueryGranularities.ALL)
.setInterval(QueryRunnerTestHelper.firstToThird)
.setAggregatorSpecs(Lists.<AggregatorFactory>newArrayList(new LongSumAggregatorFactory("rows", "rows")))
.setContext(ImmutableMap.<String, Object>of(QueryContextKeys.TIMEOUT, Integers.valueOf(500)))
.build();
GroupByQueryRunnerTestHelper.runQuery(factory, runner, query);
assertEquals(1, mergeBufferPool.getMinRemainBufferNum());
}
}

View File

@ -0,0 +1,142 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.query.groupby;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.MoreExecutors;
import io.druid.data.input.Row;
import io.druid.granularity.QueryGranularities;
import io.druid.query.DruidProcessingConfig;
import io.druid.query.QueryContextKeys;
import io.druid.query.QueryDataSource;
import io.druid.query.QueryInterruptedException;
import io.druid.query.QueryRunner;
import io.druid.query.QueryRunnerTestHelper;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.LongSumAggregatorFactory;
import io.druid.query.dimension.DefaultDimensionSpec;
import io.druid.query.dimension.DimensionSpec;
import org.bouncycastle.util.Integers;
import org.hamcrest.CoreMatchers;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;
import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.TimeoutException;
@RunWith(Parameterized.class)
public class GroupByQueryRunnerFailureTest
{
public static final DruidProcessingConfig DEFAULT_PROCESSING_CONFIG = new DruidProcessingConfig()
{
@Override
public String getFormatString()
{
return null;
}
@Override
public int intermediateComputeSizeBytes()
{
return 10 * 1024 * 1024;
}
@Override
public int getNumMergeBuffers()
{
return 1;
}
@Override
public int getNumThreads()
{
return 2;
}
};
@Rule
public ExpectedException expectedException = ExpectedException.none();
private static final GroupByQueryRunnerFactory factory = GroupByQueryRunnerTest.makeQueryRunnerFactory(
GroupByQueryRunnerTest.DEFAULT_MAPPER,
new GroupByQueryConfig()
{
public String getDefaultStrategy()
{
return "v2";
}
},
DEFAULT_PROCESSING_CONFIG
);
private QueryRunner<Row> runner;
@Parameters(name = "{0}")
public static Collection<Object[]> constructorFeeder() throws IOException
{
final List<Object[]> args = Lists.newArrayList();
for (QueryRunner<Row> runner : QueryRunnerTestHelper.makeQueryRunners(factory)) {
args.add(new Object[]{runner});
}
return args;
}
public GroupByQueryRunnerFailureTest(QueryRunner<Row> runner)
{
this.runner = factory.mergeRunners(MoreExecutors.sameThreadExecutor(), ImmutableList.<QueryRunner<Row>>of(runner));
}
@Test(timeout = 10000)
public void testLackOfMergeBuffers() throws IOException
{
expectedException.expect(QueryInterruptedException.class);
expectedException.expectCause(CoreMatchers.<Throwable>instanceOf(TimeoutException.class));
final GroupByQuery query = GroupByQuery
.builder()
.setDataSource(
new QueryDataSource(
GroupByQuery.builder()
.setDataSource(QueryRunnerTestHelper.dataSource)
.setInterval(QueryRunnerTestHelper.firstToThird)
.setGranularity(QueryGranularities.ALL)
.setDimensions(Lists.<DimensionSpec>newArrayList(new DefaultDimensionSpec("quality", "alias")))
.setAggregatorSpecs(Lists.<AggregatorFactory>newArrayList(QueryRunnerTestHelper.rowsCount))
.build()
)
)
.setGranularity(QueryGranularities.ALL)
.setInterval(QueryRunnerTestHelper.firstToThird)
.setAggregatorSpecs(Lists.<AggregatorFactory>newArrayList(new LongSumAggregatorFactory("rows", "rows")))
.setContext(ImmutableMap.<String, Object>of(QueryContextKeys.TIMEOUT, Integers.valueOf(500)))
.build();
GroupByQueryRunnerTestHelper.runQuery(factory, runner, query);
}
}

View File

@ -19,6 +19,7 @@
package io.druid.query.groupby.epinephelinae;
import com.google.common.base.Suppliers;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
@ -42,7 +43,7 @@ public class BufferGrouperTest
{
final TestColumnSelectorFactory columnSelectorFactory = GrouperTestUtil.newColumnSelectorFactory();
final Grouper<Integer> grouper = new BufferGrouper<>(
ByteBuffer.allocate(1000),
Suppliers.ofInstance(ByteBuffer.allocate(1000)),
GrouperTestUtil.intKeySerde(),
columnSelectorFactory,
new AggregatorFactory[]{
@ -53,6 +54,7 @@ public class BufferGrouperTest
0,
0
);
grouper.init();
columnSelectorFactory.setRow(new MapBasedRow(0, ImmutableMap.<String, Object>of("value", 10L)));
grouper.aggregate(12);
@ -148,8 +150,8 @@ public class BufferGrouperTest
int initialBuckets
)
{
return new BufferGrouper<>(
ByteBuffer.allocate(bufferSize),
final BufferGrouper<Integer> grouper = new BufferGrouper<>(
Suppliers.ofInstance(ByteBuffer.allocate(bufferSize)),
GrouperTestUtil.intKeySerde(),
columnSelectorFactory,
new AggregatorFactory[]{
@ -160,5 +162,7 @@ public class BufferGrouperTest
0.75f,
initialBuckets
);
grouper.init();
return grouper;
}
}

View File

@ -0,0 +1,212 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.query.groupby.epinephelinae;
import com.google.common.base.Supplier;
import com.google.common.primitives.Longs;
import io.druid.java.util.common.IAE;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.CountAggregatorFactory;
import io.druid.query.dimension.DimensionSpec;
import io.druid.query.groupby.epinephelinae.Grouper.KeyComparator;
import io.druid.query.groupby.epinephelinae.Grouper.KeySerde;
import io.druid.query.groupby.epinephelinae.Grouper.KeySerdeFactory;
import io.druid.segment.ColumnSelectorFactory;
import io.druid.segment.DimensionSelector;
import io.druid.segment.FloatColumnSelector;
import io.druid.segment.LongColumnSelector;
import io.druid.segment.ObjectColumnSelector;
import io.druid.segment.column.ColumnCapabilities;
import org.junit.AfterClass;
import org.junit.Test;
import java.nio.ByteBuffer;
import java.util.Comparator;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;
public class ConcurrentGrouperTest
{
private static final ExecutorService service = Executors.newFixedThreadPool(8);
@AfterClass
public static void teardown()
{
service.shutdown();
}
private static final Supplier<ByteBuffer> bufferSupplier = new Supplier<ByteBuffer>()
{
private final AtomicBoolean called = new AtomicBoolean(false);
@Override
public ByteBuffer get()
{
if (called.compareAndSet(false, true)) {
return ByteBuffer.allocate(192);
} else {
throw new IAE("should be called once");
}
}
};
private static final KeySerdeFactory<Long> keySerdeFactory = new KeySerdeFactory<Long>()
{
@Override
public KeySerde<Long> factorize()
{
return new KeySerde<Long>()
{
final ByteBuffer buffer = ByteBuffer.allocate(8);
@Override
public int keySize()
{
return 8;
}
@Override
public Class<Long> keyClazz()
{
return Long.class;
}
@Override
public ByteBuffer toByteBuffer(Long key)
{
buffer.rewind();
buffer.putLong(key);
buffer.position(0);
return buffer;
}
@Override
public Long fromByteBuffer(ByteBuffer buffer, int position)
{
return buffer.getLong(position);
}
@Override
public KeyComparator bufferComparator()
{
return new KeyComparator()
{
@Override
public int compare(ByteBuffer lhsBuffer, ByteBuffer rhsBuffer, int lhsPosition, int rhsPosition)
{
return Longs.compare(lhsBuffer.getLong(lhsPosition), rhsBuffer.getLong(rhsPosition));
}
};
}
@Override
public void reset() {}
};
}
@Override
public Comparator<Long> objectComparator()
{
return new Comparator<Long>()
{
@Override
public int compare(Long o1, Long o2)
{
return o1.compareTo(o2);
}
};
}
};
private static final ColumnSelectorFactory null_factory = new ColumnSelectorFactory()
{
@Override
public DimensionSelector makeDimensionSelector(DimensionSpec dimensionSpec)
{
return null;
}
@Override
public FloatColumnSelector makeFloatColumnSelector(String columnName)
{
return null;
}
@Override
public LongColumnSelector makeLongColumnSelector(String columnName)
{
return null;
}
@Override
public ObjectColumnSelector makeObjectColumnSelector(String columnName)
{
return null;
}
@Override
public ColumnCapabilities getColumnCapabilities(String columnName)
{
return null;
}
};
@Test
public void testAggregate() throws InterruptedException, ExecutionException
{
final ConcurrentGrouper<Long> grouper = new ConcurrentGrouper<>(
bufferSupplier,
keySerdeFactory,
null_factory,
new AggregatorFactory[]{new CountAggregatorFactory("cnt")},
24,
0.7f,
1,
null,
null,
8
);
Future<?>[] futures = new Future[8];
for (int i = 0; i < 8; i++) {
futures[i] = service.submit(new Runnable()
{
@Override
public void run()
{
grouper.init();
for (long i = 0; i < 100; i++) {
grouper.aggregate(0L);
}
}
});
}
for (Future eachFuture : futures) {
eachFuture.get();
}
grouper.close();
}
}

View File

@ -184,8 +184,7 @@ public class GroupByRules
null,
aggregate,
operatorTable,
plannerConfig.isUseApproximateCountDistinct(),
plannerConfig.getMaxQueryCount()
plannerConfig.isUseApproximateCountDistinct()
);
if (newDruidRel != null) {
call.transformTo(newDruidRel);
@ -226,8 +225,7 @@ public class GroupByRules
project,
aggregate,
operatorTable,
plannerConfig.isUseApproximateCountDistinct(),
plannerConfig.getMaxQueryCount()
plannerConfig.isUseApproximateCountDistinct()
);
if (newDruidRel != null) {
call.transformTo(newDruidRel);
@ -270,8 +268,7 @@ public class GroupByRules
project,
aggregate,
operatorTable,
plannerConfig.isUseApproximateCountDistinct(),
plannerConfig.getMaxQueryCount()
plannerConfig.isUseApproximateCountDistinct()
);
if (newDruidRel != null) {
call.transformTo(newDruidRel);
@ -382,8 +379,7 @@ public class GroupByRules
final Project project0,
final Aggregate aggregate,
final DruidOperatorTable operatorTable,
final boolean approximateCountDistinct,
final int maxQueryCount
final boolean approximateCountDistinct
)
{
Preconditions.checkState(canApplyAggregate(druidRel, filter0, project0, aggregate), "Cannot applyAggregate.");
@ -490,20 +486,13 @@ public class GroupByRules
if (isNestedQuery) {
// Nested groupBy.
final DruidNestedGroupBy retVal = DruidNestedGroupBy.from(
return DruidNestedGroupBy.from(
druidRel,
filter,
Grouping.create(dimensions, aggregations),
aggregate.getRowType(),
rowOrder
);
// Check maxQueryCount.
if (maxQueryCount > 0 && retVal.getQueryCount() > maxQueryCount) {
return null;
}
return retVal;
} else {
// groupBy on a base dataSource.
return druidRel.withQueryBuilder(

View File

@ -1814,32 +1814,42 @@ public class CalciteQueryTest
+ " ) t1\n"
+ " GROUP BY dim2\n"
+ ") t2",
null,
ImmutableList.<Query>of(
GroupByQuery.builder()
.setDataSource(
GroupByQuery.builder()
.setDataSource(
GroupByQuery.builder()
.setDataSource(CalciteTests.DATASOURCE1)
.setInterval(QSS(Filtration.eternity()))
.setGranularity(QueryGranularities.ALL)
.setDimensions(DIMS(
new DefaultDimensionSpec("dim1", "d0"),
new DefaultDimensionSpec("dim2", "d1")
))
.setAggregatorSpecs(AGGS(new CountAggregatorFactory("a0")))
.build()
)
.setInterval(QSS(Filtration.eternity()))
.setGranularity(QueryGranularities.ALL)
.setDimensions(DIMS(new DefaultDimensionSpec("d1", "d0")))
.setAggregatorSpecs(AGGS(new LongSumAggregatorFactory("a0", "a0")))
.build()
)
.setInterval(QSS(Filtration.eternity()))
.setGranularity(QueryGranularities.ALL)
.setAggregatorSpecs(AGGS(
new LongSumAggregatorFactory("a0", "a0"),
new CountAggregatorFactory("a1")
))
.build()
),
ImmutableList.of(
new Object[]{6L, 3L}
)
);
}
@Test
public void testDoubleNestedGroupByForbiddenByConfig() throws Exception
{
assertQueryIsUnplannable(
PLANNER_CONFIG_SINGLE_NESTING_ONLY,
"SELECT SUM(cnt), COUNT(*) FROM (\n"
+ " SELECT dim2, SUM(t1.cnt) cnt FROM (\n"
+ " SELECT\n"
+ " dim1,\n"
+ " dim2,\n"
+ " COUNT(*) cnt\n"
+ " FROM druid.foo\n"
+ " GROUP BY dim1, dim2\n"
+ " ) t1\n"
+ " GROUP BY dim2\n"
+ ") t2"
);
}
@Test
public void testExactCountDistinctUsingSubquery() throws Exception
{

View File

@ -171,8 +171,8 @@ public class CalciteTests
@Override
public int getNumMergeBuffers()
{
// Need 3 buffers for CalciteQueryTest.testDoubleNestedGroupby.
return 3;
// Need 2 buffers for CalciteQueryTest.testDoubleNestedGroupby.
return 2;
}
}
)