mirror of https://github.com/apache/druid.git

commit 2d03888a2c

Merge branch 'master' into offheap-incremental-index

Conflicts:
	processing/src/main/java/io/druid/query/groupby/GroupByQueryHelper.java
	processing/src/main/java/io/druid/query/groupby/GroupByQueryQueryToolChest.java
	processing/src/main/java/io/druid/segment/incremental/IncrementalIndex.java
@@ -28,7 +28,7 @@
     <parent>
         <groupId>io.druid</groupId>
         <artifactId>druid</artifactId>
-        <version>0.6.122-SNAPSHOT</version>
+        <version>0.6.124-SNAPSHOT</version>
     </parent>

     <dependencies>
(The identical parent-version bump is repeated in the pom.xml of each of the other modules in this commit.)
pom.xml
@@ -18,13 +18,12 @@
  ~ Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
  -->

-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
     <modelVersion>4.0.0</modelVersion>
     <groupId>io.druid</groupId>
     <artifactId>druid</artifactId>
     <packaging>pom</packaging>
-    <version>0.6.122-SNAPSHOT</version>
+    <version>0.6.124-SNAPSHOT</version>
     <name>druid</name>
     <description>druid</description>
     <scm>
@@ -28,6 +28,7 @@ import com.metamx.common.guava.Yielders;
 import com.metamx.common.guava.YieldingAccumulator;
 import org.joda.time.DateTime;

+import java.util.Arrays;
 import java.util.List;

 /**
@@ -55,15 +56,10 @@ public class BySegmentQueryRunner<T> implements QueryRunner<T>
   {
     if (query.getContextBySegment(false)) {
       final Sequence<T> baseSequence = base.run(query);
-      return new Sequence<T>()
-      {
-        @Override
-        public <OutType> OutType accumulate(OutType initValue, Accumulator<OutType, T> accumulator)
-        {
-          List<T> results = Sequences.toList(baseSequence, Lists.<T>newArrayList());
-
-          return accumulator.accumulate(
-              initValue,
+      final List<T> results = Sequences.toList(baseSequence, Lists.<T>newArrayList());
+      return Sequences.simple(
+          Arrays.asList(
               (T) new Result<BySegmentResultValueClass<T>>(
                   timestamp,
                   new BySegmentResultValueClass<T>(
@@ -72,29 +68,8 @@ public class BySegmentQueryRunner<T> implements QueryRunner<T>
                       query.getIntervals().get(0)
                   )
               )
-          );
-        }
-
-        @Override
-        public <OutType> Yielder<OutType> toYielder(OutType initValue, YieldingAccumulator<OutType, T> accumulator)
-        {
-          List<T> results = Sequences.toList(baseSequence, Lists.<T>newArrayList());
-
-          final OutType retVal = accumulator.accumulate(
-              initValue,
-              (T) new Result<BySegmentResultValueClass<T>>(
-                  timestamp,
-                  new BySegmentResultValueClass<T>(
-                      results,
-                      segmentIdentifier,
-                      query.getIntervals().get(0)
-                  )
-              )
-          );
-
-          return Yielders.done(retVal, null);
-        }
-      };
+          )
+      );
     }

     return base.run(query);
@@ -94,7 +94,6 @@ public class GroupByParallelQueryRunner implements QueryRunner<Row>
   @Override
   public Sequence<Row> run(final Query<Row> queryParam)
   {
-
     final GroupByQuery query = (GroupByQuery) queryParam;
     final Pair<IncrementalIndex, Accumulator<IncrementalIndex, Row>> indexAccumulatorPair = GroupByQueryHelper.createIndexAccumulatorPair(
         query,
@@ -26,9 +26,9 @@ import java.util.List;

 /**
  * Processing related interface
- *
+ * <p/>
  * An AggregatorFactory is an object that knows how to generate an Aggregator using a ColumnSelectorFactory.
- *
+ * <p/>
  * This is useful as an abstraction to allow Aggregator classes to be written in terms of MetricSelector objects
  * without making any assumptions about how they are pulling values out of the base data. That is, the data is
  * provided to the Aggregator through the MetricSelector object, so whatever creates that object gets to choose how
@@ -37,7 +37,9 @@ import java.util.List;
 public interface AggregatorFactory
 {
   public Aggregator factorize(ColumnSelectorFactory metricFactory);
+
+  public BufferAggregator factorizeBuffered(ColumnSelectorFactory metricFactory);

   public Comparator getComparator();

   /**
@@ -48,6 +50,7 @@ public interface AggregatorFactory
    *
    * @param lhs The left hand side of the combine
    * @param rhs The right hand side of the combine
+   *
    * @return an object representing the combination of lhs and rhs, this can be a new object or a mutation of the inputs
    */
   public Object combine(Object lhs, Object rhs);
@@ -61,11 +64,19 @@ public interface AggregatorFactory
    */
   public AggregatorFactory getCombiningFactory();

+  /**
+   * Gets a list of all columns that this AggregatorFactory will scan
+   *
+   * @return AggregatorFactories for the columns to scan of the parent AggregatorFactory
+   */
+  public List<AggregatorFactory> getRequiredColumns();
+
   /**
    * A method that knows how to "deserialize" the object from whatever form it might have been put into
    * in order to transfer via JSON.
    *
    * @param object the object to deserialize
+   *
    * @return the deserialized object
    */
   public Object deserialize(Object object);
@@ -75,13 +86,17 @@ public interface AggregatorFactory
    * intermediate format than their final resultant output.
    *
    * @param object the object to be finalized
+   *
    * @return the finalized value that should be returned for the initial query
    */
   public Object finalizeComputation(Object object);

   public String getName();

   public List<String> requiredFields();

   public byte[] getCacheKey();

+  public String getTypeName();
+
   /**
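The getRequiredColumns() addition is the keystone of the subquery changes further down: it lets the toolchest rewrite an inner groupBy so its output rows carry every raw column the outer query's aggregators will read. A runnable sketch of the contract using pared-down stand-ins (MiniAggregatorFactory and MiniSumFactory are illustrative names, not Druid classes):

import java.util.Collections;
import java.util.List;

// Pared-down stand-in for io.druid.query.aggregation.AggregatorFactory.
interface MiniAggregatorFactory
{
  // Factory that merges already-aggregated values (reads and writes the same output name).
  MiniAggregatorFactory getCombiningFactory();

  // Factories for the raw input columns this aggregator scans; an inner query
  // can be rewritten with these so its output feeds the outer aggregation.
  List<MiniAggregatorFactory> getRequiredColumns();
}

// A sum-like factory: reads fieldName, writes name.
class MiniSumFactory implements MiniAggregatorFactory
{
  final String name;
  final String fieldName;

  MiniSumFactory(String name, String fieldName)
  {
    this.name = name;
    this.fieldName = fieldName;
  }

  @Override
  public MiniAggregatorFactory getCombiningFactory()
  {
    // Combining reads its own output column.
    return new MiniSumFactory(name, name);
  }

  @Override
  public List<MiniAggregatorFactory> getRequiredColumns()
  {
    // The inner query must produce fieldName, so scan and re-emit it under its own name.
    return Collections.<MiniAggregatorFactory>singletonList(new MiniSumFactory(fieldName, fieldName));
  }
}

The per-factory implementations in the hunks below (Count, DoubleSum, LongSum, Max, Min, Histogram, JavaScript, Cardinality, HyperUniques) all follow this same shape.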
@@ -26,6 +26,7 @@ import com.google.common.collect.ImmutableList;
 import com.google.common.primitives.Longs;
 import io.druid.segment.ColumnSelectorFactory;

+import java.util.Arrays;
 import java.util.Comparator;
 import java.util.List;

@@ -76,6 +77,12 @@ public class CountAggregatorFactory implements AggregatorFactory
     return new LongSumAggregatorFactory(name, name);
   }

+  @Override
+  public List<AggregatorFactory> getRequiredColumns()
+  {
+    return Arrays.<AggregatorFactory>asList(new CountAggregatorFactory(name));
+  }
+
   @Override
   public Object deserialize(Object object)
   {
@@ -136,12 +143,18 @@ public class CountAggregatorFactory implements AggregatorFactory
   @Override
   public boolean equals(Object o)
   {
-    if (this == o) return true;
-    if (o == null || getClass() != o.getClass()) return false;
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }

     CountAggregatorFactory that = (CountAggregatorFactory) o;

-    if (name != null ? !name.equals(that.name) : that.name != null) return false;
+    if (name != null ? !name.equals(that.name) : that.name != null) {
+      return false;
+    }

     return true;
   }
@@ -85,6 +85,12 @@ public class DoubleSumAggregatorFactory implements AggregatorFactory
     return new DoubleSumAggregatorFactory(name, name);
   }

+  @Override
+  public List<AggregatorFactory> getRequiredColumns()
+  {
+    return Arrays.<AggregatorFactory>asList(new DoubleSumAggregatorFactory(fieldName, fieldName));
+  }
+
   @Override
   public Object deserialize(Object object)
   {
@@ -158,13 +164,21 @@ public class DoubleSumAggregatorFactory implements AggregatorFactory
   @Override
   public boolean equals(Object o)
   {
-    if (this == o) return true;
-    if (o == null || getClass() != o.getClass()) return false;
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }

     DoubleSumAggregatorFactory that = (DoubleSumAggregatorFactory) o;

-    if (fieldName != null ? !fieldName.equals(that.fieldName) : that.fieldName != null) return false;
-    if (name != null ? !name.equals(that.name) : that.name != null) return false;
+    if (fieldName != null ? !fieldName.equals(that.fieldName) : that.fieldName != null) {
+      return false;
+    }
+    if (name != null ? !name.equals(that.name) : that.name != null) {
+      return false;
+    }

     return true;
   }
@@ -56,7 +56,7 @@ public class HistogramAggregatorFactory implements AggregatorFactory

     this.name = name;
     this.fieldName = fieldName;
-    this.breaksList = (breaksList == null) ? Lists.<Float>newArrayList() :breaksList;
+    this.breaksList = (breaksList == null) ? Lists.<Float>newArrayList() : breaksList;
     this.breaks = new float[this.breaksList.size()];
     for (int i = 0; i < this.breaksList.size(); ++i) {
       this.breaks[i] = this.breaksList.get(i);
@@ -100,6 +100,12 @@ public class HistogramAggregatorFactory implements AggregatorFactory
     return new HistogramAggregatorFactory(name, name, breaksList);
   }

+  @Override
+  public List<AggregatorFactory> getRequiredColumns()
+  {
+    return Arrays.<AggregatorFactory>asList(new HistogramAggregatorFactory(fieldName, fieldName, breaksList));
+  }
+
   @Override
   public Object deserialize(Object object)
   {
@@ -183,15 +189,27 @@ public class HistogramAggregatorFactory implements AggregatorFactory
   @Override
   public boolean equals(Object o)
   {
-    if (this == o) return true;
-    if (o == null || getClass() != o.getClass()) return false;
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }

     HistogramAggregatorFactory that = (HistogramAggregatorFactory) o;

-    if (!Arrays.equals(breaks, that.breaks)) return false;
-    if (breaksList != null ? !breaksList.equals(that.breaksList) : that.breaksList != null) return false;
-    if (fieldName != null ? !fieldName.equals(that.fieldName) : that.fieldName != null) return false;
-    if (name != null ? !name.equals(that.name) : that.name != null) return false;
+    if (!Arrays.equals(breaks, that.breaks)) {
+      return false;
+    }
+    if (breaksList != null ? !breaksList.equals(that.breaksList) : that.breaksList != null) {
+      return false;
+    }
+    if (fieldName != null ? !fieldName.equals(that.fieldName) : that.fieldName != null) {
+      return false;
+    }
+    if (name != null ? !name.equals(that.name) : that.name != null) {
+      return false;
+    }

     return true;
   }
@@ -140,6 +140,22 @@ public class JavaScriptAggregatorFactory implements AggregatorFactory
     return new JavaScriptAggregatorFactory(name, Lists.newArrayList(name), fnCombine, fnReset, fnCombine);
   }

+  @Override
+  public List<AggregatorFactory> getRequiredColumns()
+  {
+    return Lists.transform(
+        fieldNames,
+        new com.google.common.base.Function<String, AggregatorFactory>()
+        {
+          @Override
+          public AggregatorFactory apply(String input)
+          {
+            return new JavaScriptAggregatorFactory(input, fieldNames, fnAggregate, fnReset, fnCombine);
+          }
+        }
+    );
+  }
+
   @Override
   public Object deserialize(Object object)
   {
@@ -31,11 +31,11 @@ import java.util.Comparator;
 import java.util.List;

 /**
-*/
+ */
 public class LongSumAggregatorFactory implements AggregatorFactory
 {
   private static final byte CACHE_TYPE_ID = 0x1;

-
   private final String fieldName;
   private final String name;

@@ -85,6 +85,12 @@ public class LongSumAggregatorFactory implements AggregatorFactory
     return new LongSumAggregatorFactory(name, name);
   }

+  @Override
+  public List<AggregatorFactory> getRequiredColumns()
+  {
+    return Arrays.<AggregatorFactory>asList(new LongSumAggregatorFactory(fieldName, fieldName));
+  }
+
   @Override
   public Object deserialize(Object object)
   {
@@ -154,13 +160,21 @@ public class LongSumAggregatorFactory implements AggregatorFactory
   @Override
   public boolean equals(Object o)
   {
-    if (this == o) return true;
-    if (o == null || getClass() != o.getClass()) return false;
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }

     LongSumAggregatorFactory that = (LongSumAggregatorFactory) o;

-    if (fieldName != null ? !fieldName.equals(that.fieldName) : that.fieldName != null) return false;
-    if (name != null ? !name.equals(that.name) : that.name != null) return false;
+    if (fieldName != null ? !fieldName.equals(that.fieldName) : that.fieldName != null) {
+      return false;
+    }
+    if (name != null ? !name.equals(that.name) : that.name != null) {
+      return false;
+    }

     return true;
   }
@@ -82,6 +82,12 @@ public class MaxAggregatorFactory implements AggregatorFactory
     return new MaxAggregatorFactory(name, name);
   }

+  @Override
+  public List<AggregatorFactory> getRequiredColumns()
+  {
+    return Arrays.<AggregatorFactory>asList(new MaxAggregatorFactory(fieldName, fieldName));
+  }
+
   @Override
   public Object deserialize(Object object)
   {
@@ -82,6 +82,12 @@ public class MinAggregatorFactory implements AggregatorFactory
     return new MinAggregatorFactory(name, name);
   }

+  @Override
+  public List<AggregatorFactory> getRequiredColumns()
+  {
+    return Arrays.<AggregatorFactory>asList(new MinAggregatorFactory(fieldName, fieldName));
+  }
+
   @Override
   public Object deserialize(Object object)
   {
@@ -65,6 +65,12 @@ public class ToLowerCaseAggregatorFactory implements AggregatorFactory
     return baseAggregatorFactory.getCombiningFactory();
   }

+  @Override
+  public List<AggregatorFactory> getRequiredColumns()
+  {
+    return baseAggregatorFactory.getRequiredColumns();
+  }
+
   @Override
   public Object deserialize(Object object)
   {
@@ -32,12 +32,14 @@ import io.druid.query.aggregation.AggregatorFactory;
 import io.druid.query.aggregation.Aggregators;
 import io.druid.query.aggregation.BufferAggregator;
 import io.druid.query.aggregation.hyperloglog.HyperLogLogCollector;
+import io.druid.query.aggregation.hyperloglog.HyperUniquesAggregatorFactory;
 import io.druid.segment.ColumnSelectorFactory;
 import io.druid.segment.DimensionSelector;
 import org.apache.commons.codec.binary.Base64;

 import javax.annotation.Nullable;
 import java.nio.ByteBuffer;
+import java.util.Arrays;
 import java.util.Comparator;
 import java.util.List;

@@ -142,7 +144,23 @@ public class CardinalityAggregatorFactory implements AggregatorFactory
   @Override
   public AggregatorFactory getCombiningFactory()
   {
-    return new CardinalityAggregatorFactory(name, fieldNames, byRow);
+    return new HyperUniquesAggregatorFactory(name, name);
+  }
+
+  @Override
+  public List<AggregatorFactory> getRequiredColumns()
+  {
+    return Lists.transform(
+        fieldNames,
+        new Function<String, AggregatorFactory>()
+        {
+          @Override
+          public AggregatorFactory apply(String input)
+          {
+            return new CardinalityAggregatorFactory(input, fieldNames, byRow);
+          }
+        }
+    );
   }

   @Override
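The combining factory switches to hyperUnique because a cardinality aggregator's intermediate per-group value is an HLL collector, so the merge pass must consume collectors rather than raw dimension values. A runnable illustration of that asymmetry, with an exact set standing in for the HLL collector (all names here are illustrative, not Druid's):

import java.util.HashSet;
import java.util.Set;

class SketchLikeExample
{
  // First pass: build a "collector" per group from raw dimension values.
  static Set<String> aggregateRaw(String... dimensionValues)
  {
    Set<String> collector = new HashSet<>();
    for (String v : dimensionValues) {
      collector.add(v);
    }
    return collector;
  }

  // Combining pass: folds collectors together; it never sees raw values,
  // which is why it must be a different aggregator than the first pass.
  static Set<String> combine(Set<String> lhs, Set<String> rhs)
  {
    Set<String> merged = new HashSet<>(lhs);
    merged.addAll(rhs);
    return merged;
  }

  public static void main(String[] args)
  {
    Set<String> segmentA = aggregateRaw("a", "b");
    Set<String> segmentB = aggregateRaw("b", "c");
    System.out.println(combine(segmentA, segmentB).size()); // 3 distinct values
  }
}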
@@ -73,12 +73,13 @@ public class HyperUniquesAggregatorFactory implements AggregatorFactory
       return Aggregators.noopAggregator();
     }

-    if (HyperLogLogCollector.class.isAssignableFrom(selector.classOfObject())) {
+    final Class classOfObject = selector.classOfObject();
+    if (classOfObject.equals(Object.class) || HyperLogLogCollector.class.isAssignableFrom(classOfObject)) {
       return new HyperUniquesAggregator(name, selector);
     }

     throw new IAE(
-        "Incompatible type for metric[%s], expected a HyperUnique, got a %s", fieldName, selector.classOfObject()
+        "Incompatible type for metric[%s], expected a HyperUnique, got a %s", fieldName, classOfObject
     );
   }

@@ -91,12 +92,13 @@ public class HyperUniquesAggregatorFactory implements AggregatorFactory
       return Aggregators.noopBufferAggregator();
     }

-    if (HyperLogLogCollector.class.isAssignableFrom(selector.classOfObject())) {
+    final Class classOfObject = selector.classOfObject();
+    if (classOfObject.equals(Object.class) || HyperLogLogCollector.class.isAssignableFrom(classOfObject)) {
       return new HyperUniquesBufferAggregator(selector);
     }

     throw new IAE(
-        "Incompatible type for metric[%s], expected a HyperUnique, got a %s", fieldName, selector.classOfObject()
+        "Incompatible type for metric[%s], expected a HyperUnique, got a %s", fieldName, classOfObject
     );
   }

@@ -131,6 +133,12 @@ public class HyperUniquesAggregatorFactory implements AggregatorFactory
     return new HyperUniquesAggregatorFactory(name, name);
   }

+  @Override
+  public List<AggregatorFactory> getRequiredColumns()
+  {
+    return Arrays.<AggregatorFactory>asList(new HyperUniquesAggregatorFactory(fieldName, fieldName));
+  }
+
   @Override
   public Object deserialize(Object object)
   {
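factorize() now also accepts a selector whose classOfObject() is Object.class, which is exactly what the raw (non-deserializing) selector added to IncrementalIndex below reports. A minimal sketch of the relaxed guard, assuming a stand-in collector class (nothing here is Druid's actual API):

// Stand-in for io.druid.query.aggregation.hyperloglog.HyperLogLogCollector.
class SketchCollector
{
}

class SelectorTypeGuard
{
  // Mirrors the relaxed check above: a raw selector can only promise Object.class,
  // so Object.class must be accepted alongside genuine collector types.
  static boolean canAggregate(Class<?> classOfObject)
  {
    return classOfObject.equals(Object.class)
           || SketchCollector.class.isAssignableFrom(classOfObject);
  }

  public static void main(String[] args)
  {
    System.out.println(canAggregate(Object.class));          // true: raw selector
    System.out.println(canAggregate(SketchCollector.class)); // true: typed selector
    System.out.println(canAggregate(String.class));          // false: Druid would throw IAE here
  }
}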
@@ -72,7 +72,7 @@ public class GroupByQuery extends BaseQuery<Row>
   private final List<AggregatorFactory> aggregatorSpecs;
   private final List<PostAggregator> postAggregatorSpecs;

-  private final Function<Sequence<Row>, Sequence<Row>> orderByLimitFn;
+  private final Function<Sequence<Row>, Sequence<Row>> limitFn;

   @JsonCreator
   public GroupByQuery(
@@ -85,8 +85,9 @@ public class GroupByQuery extends BaseQuery<Row>
       @JsonProperty("postAggregations") List<PostAggregator> postAggregatorSpecs,
       @JsonProperty("having") HavingSpec havingSpec,
       @JsonProperty("limitSpec") LimitSpec limitSpec,
-      @JsonProperty("orderBy") LimitSpec orderBySpec,
-      @JsonProperty("context") Map<String, Object> context
+      @JsonProperty("context") Map<String, Object> context,
+      // Backwards compatible
+      @JsonProperty("orderBy") LimitSpec orderBySpec
   )
   {
     super(dataSource, querySegmentSpec, context);
@@ -129,7 +130,7 @@ public class GroupByQuery extends BaseQuery<Row>
       );
     }

-    orderByLimitFn = postProcFn;
+    limitFn = postProcFn;
   }

   /**
@@ -146,7 +147,7 @@ public class GroupByQuery extends BaseQuery<Row>
       List<PostAggregator> postAggregatorSpecs,
       HavingSpec havingSpec,
       LimitSpec orderBySpec,
-      Function<Sequence<Row>, Sequence<Row>> orderByLimitFn,
+      Function<Sequence<Row>, Sequence<Row>> limitFn,
       Map<String, Object> context
   )
   {
@@ -159,7 +160,7 @@ public class GroupByQuery extends BaseQuery<Row>
     this.postAggregatorSpecs = postAggregatorSpecs;
     this.havingSpec = havingSpec;
     this.limitSpec = orderBySpec;
-    this.orderByLimitFn = orderByLimitFn;
+    this.limitFn = limitFn;
   }

   @JsonProperty("filter")
@@ -199,7 +200,7 @@ public class GroupByQuery extends BaseQuery<Row>
   }

   @JsonProperty
-  public LimitSpec getOrderBy()
+  public LimitSpec getLimitSpec()
   {
     return limitSpec;
   }
@@ -218,7 +219,7 @@ public class GroupByQuery extends BaseQuery<Row>

   public Sequence<Row> applyLimit(Sequence<Row> results)
   {
-    return orderByLimitFn.apply(results);
+    return limitFn.apply(results);
   }

   @Override
@@ -234,7 +235,7 @@ public class GroupByQuery extends BaseQuery<Row>
         postAggregatorSpecs,
         havingSpec,
         limitSpec,
-        orderByLimitFn,
+        limitFn,
         computeOverridenContext(contextOverride)
     );
   }
@@ -252,7 +253,7 @@ public class GroupByQuery extends BaseQuery<Row>
         postAggregatorSpecs,
         havingSpec,
         limitSpec,
-        orderByLimitFn,
+        limitFn,
         getContext()
     );
   }
@@ -270,7 +271,7 @@ public class GroupByQuery extends BaseQuery<Row>
         postAggregatorSpecs,
         havingSpec,
         limitSpec,
-        orderByLimitFn,
+        limitFn,
         getContext()
     );
   }
@@ -292,11 +293,25 @@ public class GroupByQuery extends BaseQuery<Row>
     private List<OrderByColumnSpec> orderByColumnSpecs = Lists.newArrayList();
     private int limit = Integer.MAX_VALUE;

-    private Builder()
+    public Builder()
     {
     }

-    private Builder(Builder builder)
+    public Builder(GroupByQuery query)
+    {
+      dataSource = query.getDataSource();
+      querySegmentSpec = query.getQuerySegmentSpec();
+      limitSpec = query.getLimitSpec();
+      dimFilter = query.getDimFilter();
+      granularity = query.getGranularity();
+      dimensions = query.getDimensions();
+      aggregatorSpecs = query.getAggregatorSpecs();
+      postAggregatorSpecs = query.getPostAggregatorSpecs();
+      havingSpec = query.getHavingSpec();
+      context = query.getContext();
+    }
+
+    public Builder(Builder builder)
     {
       dataSource = builder.dataSource;
       querySegmentSpec = builder.querySegmentSpec;
@@ -490,7 +505,11 @@ public class GroupByQuery extends BaseQuery<Row>
     {
       final LimitSpec theLimitSpec;
       if (limitSpec == null) {
-        theLimitSpec = new DefaultLimitSpec(orderByColumnSpecs, limit);
+        if (orderByColumnSpecs.isEmpty() && limit == Integer.MAX_VALUE) {
+          theLimitSpec = new NoopLimitSpec();
+        } else {
+          theLimitSpec = new DefaultLimitSpec(orderByColumnSpecs, limit);
+        }
       } else {
         theLimitSpec = limitSpec;
       }
@@ -504,9 +523,9 @@ public class GroupByQuery extends BaseQuery<Row>
           aggregatorSpecs,
           postAggregatorSpecs,
           havingSpec,
-          null,
           theLimitSpec,
-          context
+          context,
+          null
       );
     }
   }
@@ -515,36 +534,57 @@ public class GroupByQuery extends BaseQuery<Row>
   public String toString()
   {
     return "GroupByQuery{" +
            "limitSpec=" + limitSpec +
            ", dimFilter=" + dimFilter +
            ", granularity=" + granularity +
            ", dimensions=" + dimensions +
            ", aggregatorSpecs=" + aggregatorSpecs +
            ", postAggregatorSpecs=" + postAggregatorSpecs +
-           ", orderByLimitFn=" + orderByLimitFn +
+           ", limitFn=" + limitFn +
            '}';
   }

   @Override
   public boolean equals(Object o)
   {
-    if (this == o) return true;
-    if (o == null || getClass() != o.getClass()) return false;
-    if (!super.equals(o)) return false;
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    if (!super.equals(o)) {
+      return false;
+    }

     GroupByQuery that = (GroupByQuery) o;

-    if (aggregatorSpecs != null ? !aggregatorSpecs.equals(that.aggregatorSpecs) : that.aggregatorSpecs != null)
-      return false;
-    if (dimFilter != null ? !dimFilter.equals(that.dimFilter) : that.dimFilter != null) return false;
-    if (dimensions != null ? !dimensions.equals(that.dimensions) : that.dimensions != null) return false;
-    if (granularity != null ? !granularity.equals(that.granularity) : that.granularity != null) return false;
-    if (havingSpec != null ? !havingSpec.equals(that.havingSpec) : that.havingSpec != null) return false;
-    if (limitSpec != null ? !limitSpec.equals(that.limitSpec) : that.limitSpec != null) return false;
-    if (orderByLimitFn != null ? !orderByLimitFn.equals(that.orderByLimitFn) : that.orderByLimitFn != null)
-      return false;
-    if (postAggregatorSpecs != null ? !postAggregatorSpecs.equals(that.postAggregatorSpecs) : that.postAggregatorSpecs != null)
-      return false;
+    if (aggregatorSpecs != null ? !aggregatorSpecs.equals(that.aggregatorSpecs) : that.aggregatorSpecs != null) {
+      return false;
+    }
+    if (dimFilter != null ? !dimFilter.equals(that.dimFilter) : that.dimFilter != null) {
+      return false;
+    }
+    if (dimensions != null ? !dimensions.equals(that.dimensions) : that.dimensions != null) {
+      return false;
+    }
+    if (granularity != null ? !granularity.equals(that.granularity) : that.granularity != null) {
+      return false;
+    }
+    if (havingSpec != null ? !havingSpec.equals(that.havingSpec) : that.havingSpec != null) {
+      return false;
+    }
+    if (limitSpec != null ? !limitSpec.equals(that.limitSpec) : that.limitSpec != null) {
+      return false;
+    }
+    if (limitFn != null ? !limitFn.equals(that.limitFn) : that.limitFn != null) {
+      return false;
+    }
+    if (postAggregatorSpecs != null
+        ? !postAggregatorSpecs.equals(that.postAggregatorSpecs)
+        : that.postAggregatorSpecs != null) {
+      return false;
+    }

     return true;
   }
@@ -560,7 +600,7 @@ public class GroupByQuery extends BaseQuery<Row>
     result = 31 * result + (dimensions != null ? dimensions.hashCode() : 0);
     result = 31 * result + (aggregatorSpecs != null ? aggregatorSpecs.hashCode() : 0);
     result = 31 * result + (postAggregatorSpecs != null ? postAggregatorSpecs.hashCode() : 0);
-    result = 31 * result + (orderByLimitFn != null ? orderByLimitFn.hashCode() : 0);
+    result = 31 * result + (limitFn != null ? limitFn.hashCode() : 0);
     return result;
   }
 }
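Making Builder(GroupByQuery) public is what allows code outside the class, such as the toolchest below, to copy a query and override just a few parts. A self-contained sketch of the pattern (MiniQuery is an illustrative stand-in, not a Druid class):

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Sketch of the now-public copy-constructor Builder: copy every field of an
// existing query, then override only the parts being rewritten.
final class MiniQuery
{
  final String dataSource;
  final List<String> aggregators;

  MiniQuery(String dataSource, List<String> aggregators)
  {
    this.dataSource = dataSource;
    this.aggregators = aggregators;
  }

  static final class Builder
  {
    private String dataSource;
    private List<String> aggregators = new ArrayList<>();

    Builder(String dataSource)
    {
      this.dataSource = dataSource;
    }

    // Analogue of Builder(GroupByQuery): previously private, now public so a
    // toolchest can rebuild queries it did not construct itself.
    Builder(MiniQuery query)
    {
      this.dataSource = query.dataSource;
      this.aggregators = new ArrayList<>(query.aggregators);
    }

    Builder setAggregators(List<String> aggregators)
    {
      this.aggregators = new ArrayList<>(aggregators);
      return this;
    }

    MiniQuery build()
    {
      return new MiniQuery(dataSource, Collections.unmodifiableList(aggregators));
    }
  }

  public static void main(String[] args)
  {
    MiniQuery original = new Builder("wikipedia").setAggregators(Arrays.asList("count")).build();
    MiniQuery rewritten = new Builder(original).setAggregators(Arrays.asList("count", "index")).build();
    System.out.println(rewritten.dataSource + " " + rewritten.aggregators); // wikipedia [count, index]
  }
}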
@@ -32,7 +32,6 @@ import io.druid.query.aggregation.AggregatorFactory;
 import io.druid.query.dimension.DimensionSpec;
 import io.druid.segment.incremental.IncrementalIndex;

-import javax.annotation.Nullable;
 import java.nio.ByteBuffer;
 import java.util.List;

@@ -57,7 +56,7 @@ public class GroupByQueryHelper
         new Function<AggregatorFactory, AggregatorFactory>()
         {
           @Override
-          public AggregatorFactory apply(@Nullable AggregatorFactory input)
+          public AggregatorFactory apply(AggregatorFactory input)
           {
             return input.getCombiningFactory();
           }
@@ -68,7 +67,7 @@ public class GroupByQueryHelper
         new Function<DimensionSpec, String>()
         {
           @Override
-          public String apply(@Nullable DimensionSpec input)
+          public String apply(DimensionSpec input)
           {
             return input.getOutputName();
           }
@@ -80,7 +79,8 @@ public class GroupByQueryHelper
         granTimeStart,
         gran,
         aggs.toArray(new AggregatorFactory[aggs.size()]),
-        bufferPool
+        bufferPool,
+        false
     );

     Accumulator<IncrementalIndex, Row> accumulator = new Accumulator<IncrementalIndex, Row>()
@@ -95,7 +95,7 @@ public class GroupByQueryHelper
         return accumulated;
       }
     };
-    return new Pair<IncrementalIndex, Accumulator<IncrementalIndex, Row>>(index, accumulator);
+    return new Pair<>(index, accumulator);
   }

 }
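createIndexAccumulatorPair returns the index together with an accumulator that folds each streamed row into it; passing deserializeComplexMetrics = false keeps intermediate complex values (such as HLL collectors) raw instead of routing them through a serde. A runnable sketch of the pairing, with a List standing in for the IncrementalIndex (names are illustrative):

import java.util.ArrayList;
import java.util.List;
import java.util.function.BiFunction;

class AccumulatorSketch
{
  public static void main(String[] args)
  {
    List<String> index = new ArrayList<>();                      // stand-in for IncrementalIndex
    BiFunction<List<String>, String, List<String>> accumulator = // stand-in for Accumulator<IncrementalIndex, Row>
        (accumulated, row) -> {
          accumulated.add(row);                                  // index.add(row) in the real code
          return accumulated;
        };

    // A sequence of rows is folded into the one shared index, row by row.
    List<String> result = accumulator.apply(accumulator.apply(index, "row1"), "row2");
    System.out.println(result); // [row1, row2]
  }
}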
@@ -24,6 +24,7 @@ import com.google.common.base.Function;
 import com.google.common.base.Joiner;
 import com.google.common.base.Supplier;
 import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.inject.Inject;
 import com.metamx.common.Pair;
@@ -47,12 +48,14 @@ import io.druid.query.QueryToolChest;
 import io.druid.query.SubqueryQueryRunner;
 import io.druid.query.aggregation.AggregatorFactory;
 import io.druid.query.aggregation.MetricManipulationFn;
+import io.druid.query.aggregation.PostAggregator;
 import io.druid.segment.incremental.IncrementalIndex;
 import io.druid.segment.incremental.IncrementalIndexStorageAdapter;
 import org.joda.time.Interval;
 import org.joda.time.Minutes;

 import java.nio.ByteBuffer;
+import java.util.List;
 import java.util.Map;

 /**
@@ -92,7 +95,7 @@ public class GroupByQueryQueryToolChest extends QueryToolChest<Row, GroupByQuery
       @Override
       public Sequence<Row> run(Query<Row> input)
       {
-        if (Boolean.valueOf((String) input.getContextValue(GROUP_BY_MERGE_KEY, "true"))) {
+        if (Boolean.valueOf(input.getContextValue(GROUP_BY_MERGE_KEY, "true"))) {
           return mergeGroupByResults(((GroupByQuery) input).withOverriddenContext(NO_MERGE_CONTEXT), runner);
         } else {
           return runner.run(input);
@@ -104,8 +107,9 @@ public class GroupByQueryQueryToolChest extends QueryToolChest<Row, GroupByQuery
   private Sequence<Row> mergeGroupByResults(final GroupByQuery query, QueryRunner<Row> runner)
   {
     // If there's a subquery, merge subquery results and then apply the aggregator
-    DataSource dataSource = query.getDataSource();
-    final IncrementalIndex index;
+
+    final DataSource dataSource = query.getDataSource();
+
     if (dataSource instanceof QueryDataSource) {
       GroupByQuery subquery;
       try {
@@ -114,22 +118,45 @@ public class GroupByQueryQueryToolChest extends QueryToolChest<Row, GroupByQuery
       catch (ClassCastException e) {
         throw new UnsupportedOperationException("Subqueries must be of type 'group by'");
       }
-      Sequence<Row> subqueryResult = mergeGroupByResults(subquery, runner);
-      final IncrementalIndex subQueryResultIndex = makeIncrementalIndex(subquery, subqueryResult);
-
-      Sequence<Row> result = engine.process(query, new IncrementalIndexStorageAdapter(subQueryResultIndex));
-      index = makeIncrementalIndex(query, result);
-      subQueryResultIndex.close();
+      final Sequence<Row> subqueryResult = mergeGroupByResults(subquery, runner);
+      final List<AggregatorFactory> aggs = Lists.newArrayList();
+      for (AggregatorFactory aggregatorFactory : query.getAggregatorSpecs()) {
+        aggs.addAll(aggregatorFactory.getRequiredColumns());
+      }
+
+      // We need the inner incremental index to have all the columns required by the outer query
+      final GroupByQuery innerQuery = new GroupByQuery.Builder(query)
+          .setAggregatorSpecs(aggs)
+          .setInterval(subquery.getIntervals())
+          .setPostAggregatorSpecs(Lists.<PostAggregator>newArrayList())
+          .build();
+
+      final GroupByQuery outerQuery = new GroupByQuery.Builder(query)
+          .setLimitSpec(query.getLimitSpec().merge(subquery.getLimitSpec()))
+          .build();
+      IncrementalIndex index = makeIncrementalIndex(innerQuery, subqueryResult);
+
+      return new ResourceClosingSequence<>(
+          outerQuery.applyLimit(
+              engine.process(
+                  outerQuery,
+                  new IncrementalIndexStorageAdapter(
+                      index
+                  )
+              )
+          ),
+          index
+      );
     } else {
-      index = makeIncrementalIndex(query, runner.run(query));
+      final IncrementalIndex index = makeIncrementalIndex(query, runner.run(query));
+      return new ResourceClosingSequence<>(query.applyLimit(postAggregate(query, index)), index);
     }
-    return new ResourceClosingSequence<Row>(postAggregate(query, index), index);
   }

   private Sequence<Row> postAggregate(final GroupByQuery query, IncrementalIndex index)
   {
-    Sequence<Row> sequence = Sequences.map(
+    return Sequences.map(
         Sequences.simple(index.iterableWithPostAggregations(query.getPostAggregatorSpecs())),
         new Function<Row, Row>()
         {
@@ -145,7 +172,6 @@ public class GroupByQueryQueryToolChest extends QueryToolChest<Row, GroupByQuery
           }
         }
     );
-    return query.applyLimit(sequence);
   }

   private IncrementalIndex makeIncrementalIndex(GroupByQuery query, Sequence<Row> rows)
@@ -163,7 +189,7 @@ public class GroupByQueryQueryToolChest extends QueryToolChest<Row, GroupByQuery
   @Override
   public Sequence<Row> mergeSequences(Sequence<Sequence<Row>> seqOfSequences)
   {
-    return new ConcatSequence<Row>(seqOfSequences);
+    return new ConcatSequence<>(seqOfSequences);
   }

   @Override
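The rewritten mergeGroupByResults runs two passes: an inner query whose aggregators are the outer query's required columns, then the outer aggregation plus the merged limit, with ResourceClosingSequence deferring index.close() until the result sequence is fully consumed. A runnable sketch of that two-pass shape under those assumptions, with list operations standing in for Druid's engine and index (nothing here is Druid's API):

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class SubqueryFlowSketch
{
  // Inner pass: aggregate the subquery with the outer query's *required columns*,
  // so the intermediate rows carry every input the outer pass will read.
  static List<Integer> runInner(List<Integer> rawRows)
  {
    return new ArrayList<>(rawRows); // stand-in for makeIncrementalIndex(innerQuery, subqueryResult)
  }

  // Outer pass: aggregate over the inner result, then apply the merged limit.
  static List<Integer> runOuterAndLimit(List<Integer> innerRows, int limit)
  {
    List<Integer> aggregated = new ArrayList<>(innerRows); // stand-in for engine.process(outerQuery, ...)
    return aggregated.subList(0, Math.min(limit, aggregated.size())); // outerQuery.applyLimit(...)
  }

  public static void main(String[] args)
  {
    List<Integer> result = runOuterAndLimit(runInner(Arrays.asList(3, 1, 2)), 2);
    System.out.println(result); // [3, 1]
  }
}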
@@ -87,12 +87,17 @@ public class DefaultLimitSpec implements LimitSpec

     if (limit == Integer.MAX_VALUE) {
       return new SortingFn(ordering);
-    }
-    else {
+    } else {
       return new TopNFunction(ordering, limit);
     }
   }

+  @Override
+  public LimitSpec merge(LimitSpec other)
+  {
+    return this;
+  }
+
   private Ordering<Row> makeComparator(
       List<DimensionSpec> dimensions, List<AggregatorFactory> aggs, List<PostAggregator> postAggs
   )
@@ -200,12 +205,18 @@ public class DefaultLimitSpec implements LimitSpec
     @Override
     public boolean equals(Object o)
     {
-      if (this == o) return true;
-      if (o == null || getClass() != o.getClass()) return false;
+      if (this == o) {
+        return true;
+      }
+      if (o == null || getClass() != o.getClass()) {
+        return false;
+      }

       LimitingFn that = (LimitingFn) o;

-      if (limit != that.limit) return false;
+      if (limit != that.limit) {
+        return false;
+      }

       return true;
     }
@@ -232,12 +243,18 @@ public class DefaultLimitSpec implements LimitSpec
     @Override
     public boolean equals(Object o)
     {
-      if (this == o) return true;
-      if (o == null || getClass() != o.getClass()) return false;
+      if (this == o) {
+        return true;
+      }
+      if (o == null || getClass() != o.getClass()) {
+        return false;
+      }

       SortingFn sortingFn = (SortingFn) o;

-      if (ordering != null ? !ordering.equals(sortingFn.ordering) : sortingFn.ordering != null) return false;
+      if (ordering != null ? !ordering.equals(sortingFn.ordering) : sortingFn.ordering != null) {
+        return false;
+      }

       return true;
     }
@@ -273,13 +290,21 @@ public class DefaultLimitSpec implements LimitSpec
     @Override
     public boolean equals(Object o)
     {
-      if (this == o) return true;
-      if (o == null || getClass() != o.getClass()) return false;
+      if (this == o) {
+        return true;
+      }
+      if (o == null || getClass() != o.getClass()) {
+        return false;
+      }

       TopNFunction that = (TopNFunction) o;

-      if (limit != that.limit) return false;
-      if (sorter != null ? !sorter.equals(that.sorter) : that.sorter != null) return false;
+      if (limit != that.limit) {
+        return false;
+      }
+      if (sorter != null ? !sorter.equals(that.sorter) : that.sorter != null) {
+        return false;
+      }

       return true;
     }
@@ -296,13 +321,21 @@ public class DefaultLimitSpec implements LimitSpec
   @Override
   public boolean equals(Object o)
   {
-    if (this == o) return true;
-    if (o == null || getClass() != o.getClass()) return false;
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }

     DefaultLimitSpec that = (DefaultLimitSpec) o;

-    if (limit != that.limit) return false;
-    if (columns != null ? !columns.equals(that.columns) : that.columns != null) return false;
+    if (limit != that.limit) {
+      return false;
+    }
+    if (columns != null ? !columns.equals(that.columns) : that.columns != null) {
+      return false;
+    }

     return true;
   }
@@ -38,5 +38,11 @@
 })
 public interface LimitSpec
 {
-  public Function<Sequence<Row>, Sequence<Row>> build(List<DimensionSpec> dimensions, List<AggregatorFactory> aggs, List<PostAggregator> postAggs);
+  public Function<Sequence<Row>, Sequence<Row>> build(
+      List<DimensionSpec> dimensions,
+      List<AggregatorFactory> aggs,
+      List<PostAggregator> postAggs
+  );
+
+  public LimitSpec merge(LimitSpec other);
 }
@@ -41,6 +41,12 @@ public class NoopLimitSpec implements LimitSpec
     return Functions.identity();
   }

+  @Override
+  public LimitSpec merge(LimitSpec other)
+  {
+    return other;
+  }
+
   @Override
   public String toString()
   {
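Taken together, DefaultLimitSpec.merge returning this and NoopLimitSpec.merge returning other mean that query.getLimitSpec().merge(subquery.getLimitSpec()) in the toolchest resolves to the outer spec unless the outer query carries no limit at all. A self-contained sketch of that contract (the Mini* names are illustrative, not Druid classes):

interface MiniLimitSpec
{
  MiniLimitSpec merge(MiniLimitSpec other);
}

final class MiniDefaultLimitSpec implements MiniLimitSpec
{
  final int limit;

  MiniDefaultLimitSpec(int limit)
  {
    this.limit = limit;
  }

  @Override
  public MiniLimitSpec merge(MiniLimitSpec other)
  {
    return this; // a concrete limit keeps itself, as in DefaultLimitSpec above
  }
}

final class MiniNoopLimitSpec implements MiniLimitSpec
{
  @Override
  public MiniLimitSpec merge(MiniLimitSpec other)
  {
    return other; // no opinion: defer to the other side, as in NoopLimitSpec above
  }
}

class MergeDemo
{
  public static void main(String[] args)
  {
    MiniLimitSpec outer = new MiniNoopLimitSpec();
    MiniLimitSpec inner = new MiniDefaultLimitSpec(100);
    System.out.println(outer.merge(inner) == inner); // true: the subquery's limit survives
  }
}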
@@ -26,7 +26,6 @@ import io.druid.query.QueryRunnerHelper;
 import io.druid.query.Result;
 import io.druid.query.aggregation.Aggregator;
 import io.druid.query.aggregation.AggregatorFactory;
-import io.druid.query.aggregation.PostAggregator;
 import io.druid.segment.Cursor;
 import io.druid.segment.StorageAdapter;
 import io.druid.segment.filter.Filters;
@@ -46,45 +45,43 @@ public class TimeseriesQueryEngine
     }

     return QueryRunnerHelper.makeCursorBasedQuery(
         adapter,
         query.getQuerySegmentSpec().getIntervals(),
         Filters.convertDimensionFilters(query.getDimensionsFilter()),
         query.getGranularity(),
         new Function<Cursor, Result<TimeseriesResultValue>>()
         {
           private final List<AggregatorFactory> aggregatorSpecs = query.getAggregatorSpecs();
-          private final List<PostAggregator> postAggregatorSpecs = query.getPostAggregatorSpecs();

           @Override
           public Result<TimeseriesResultValue> apply(Cursor cursor)
           {
             Aggregator[] aggregators = QueryRunnerHelper.makeAggregators(cursor, aggregatorSpecs);
-
             try {
               while (!cursor.isDone()) {
                 for (Aggregator aggregator : aggregators) {
                   aggregator.aggregate();
                 }
                 cursor.advance();
               }

               TimeseriesResultBuilder bob = new TimeseriesResultBuilder(cursor.getTime());

               for (Aggregator aggregator : aggregators) {
                 bob.addMetric(aggregator);
               }

               Result<TimeseriesResultValue> retVal = bob.build();
               return retVal;
             }
             finally {
               // cleanup
               for (Aggregator agg : aggregators) {
                 agg.close();
               }
             }
           }
         }
     );
   }
 }
@@ -49,6 +49,7 @@ import io.druid.segment.DimensionSelector;
 import io.druid.segment.FloatColumnSelector;
 import io.druid.segment.ObjectColumnSelector;
 import io.druid.segment.TimestampColumnSelector;
+import io.druid.segment.data.IndexedInts;
 import io.druid.segment.serde.ComplexMetricExtractor;
 import io.druid.segment.serde.ComplexMetricSerde;
 import io.druid.segment.serde.ComplexMetrics;
@@ -59,6 +60,7 @@ import javax.annotation.Nullable;
 import java.io.Closeable;
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Iterator;
 import java.util.LinkedHashMap;
@@ -96,7 +98,20 @@ public class IncrementalIndex implements Iterable<Row>, Closeable
   // This is modified on add() in a critical section.
   private ThreadLocal<InputRow> in = new ThreadLocal<>();

-  public IncrementalIndex(IncrementalIndexSchema incrementalIndexSchema, StupidPool<ByteBuffer> bufferPool)
+  /**
+   * Setting deserializeComplexMetrics to false is necessary for intermediate aggregation such as groupBy that
+   * should not deserialize input columns using ComplexMetricSerde for aggregators that return complex metrics.
+   *
+   * @param incrementalIndexSchema
+   * @param bufferPool
+   * @param deserializeComplexMetrics flag whether or not to call ComplexMetricExtractor.extractValue() on the input
+   *                                  value for aggregators that return metrics other than float.
+   */
+  public IncrementalIndex(
+      IncrementalIndexSchema incrementalIndexSchema,
+      StupidPool<ByteBuffer> bufferPool,
+      final boolean deserializeComplexMetrics
+  )
   {
     this.minTimestamp = incrementalIndexSchema.getMinTimestamp();
     this.gran = incrementalIndexSchema.getGran();
@@ -146,54 +161,108 @@ public class IncrementalIndex implements Iterable<Row>, Closeable
         final String typeName = agg.getTypeName();
         final String columnName = column.toLowerCase();

-        if (typeName.equals("float")) {
-          return new ObjectColumnSelector<Float>()
-          {
-            @Override
-            public Class classOfObject()
-            {
-              return Float.TYPE;
-            }
-
-            @Override
-            public Float get()
-            {
-              return in.get().getFloatMetric(columnName);
-            }
-          };
-        }
-
-        final ComplexMetricSerde serde = ComplexMetrics.getSerdeForType(typeName);
-
-        if (serde == null) {
-          throw new ISE("Don't know how to handle type[%s]", typeName);
-        }
-
-        final ComplexMetricExtractor extractor = serde.getExtractor();
-
-        return new ObjectColumnSelector()
+        final ObjectColumnSelector<Object> rawColumnSelector = new ObjectColumnSelector<Object>()
         {
           @Override
           public Class classOfObject()
           {
-            return extractor.extractedClass();
+            return Object.class;
           }

           @Override
           public Object get()
           {
-            return extractor.extractValue(in.get(), columnName);
+            return in.get().getRaw(columnName);
           }
         };
+
+        if (!deserializeComplexMetrics) {
+          return rawColumnSelector;
+        } else {
+          if (typeName.equals("float")) {
+            return rawColumnSelector;
+          }
+
+          final ComplexMetricSerde serde = ComplexMetrics.getSerdeForType(typeName);
+          if (serde == null) {
+            throw new ISE("Don't know how to handle type[%s]", typeName);
+          }
+
+          final ComplexMetricExtractor extractor = serde.getExtractor();
+          return new ObjectColumnSelector()
+          {
+            @Override
+            public Class classOfObject()
+            {
+              return extractor.extractedClass();
+            }
+
+            @Override
+            public Object get()
+            {
+              return extractor.extractValue(in.get(), columnName);
+            }
+          };
+        }
       }

       @Override
-      public DimensionSelector makeDimensionSelector(String dimension)
+      public DimensionSelector makeDimensionSelector(final String dimension)
       {
-        // we should implement this, but this is going to be rewritten soon anyways
-        throw new UnsupportedOperationException(
-            "Incremental index aggregation does not support dimension selectors"
-        );
+        final String dimensionName = dimension.toLowerCase();
+        return new DimensionSelector()
+        {
+          @Override
+          public IndexedInts getRow()
+          {
+            final List<String> dimensionValues = in.get().getDimension(dimensionName);
+            final ArrayList<Integer> vals = Lists.newArrayList();
+            if (dimensionValues != null) {
+              for (int i = 0; i < dimensionValues.size(); ++i) {
+                vals.add(i);
+              }
+            }
+
+            return new IndexedInts()
+            {
+              @Override
+              public int size()
+              {
+                return vals.size();
+              }
+
+              @Override
+              public int get(int index)
+              {
+                return vals.get(index);
+              }
+
+              @Override
+              public Iterator<Integer> iterator()
+              {
+                return vals.iterator();
+              }
+            };
+          }
+
+          @Override
+          public int getValueCardinality()
+          {
+            throw new UnsupportedOperationException("value cardinality is unknown in incremental index");
+          }
+
+          @Override
+          public String lookupName(int id)
+          {
+            return in.get().getDimension(dimensionName).get(id);
+          }
+
+          @Override
+          public int lookupId(String name)
+          {
+            return in.get().getDimension(dimensionName).indexOf(name);
+          }
+        };
       }
     }
 );
@@ -236,7 +305,34 @@ public class IncrementalIndex implements Iterable<Row>, Closeable
             .withQueryGranularity(gran)
             .withMetrics(metrics)
             .build(),
-        bufferPool
+        bufferPool,
+        true
     );
   }

+  public IncrementalIndex(
+      IncrementalIndexSchema incrementalIndexSchema,
+      StupidPool<ByteBuffer> bufferPool
+  )
+  {
+    this(incrementalIndexSchema, bufferPool, true);
+  }
+
+  public IncrementalIndex(
+      long minTimestamp,
+      QueryGranularity gran,
+      final AggregatorFactory[] metrics,
+      StupidPool<ByteBuffer> bufferPool,
+      boolean deserializeComplexMetrics
+  )
+  {
+    this(
+        new IncrementalIndexSchema.Builder().withMinTimestamp(minTimestamp)
+                                            .withQueryGranularity(gran)
+                                            .withMetrics(metrics)
+                                            .build(),
+        bufferPool,
+        deserializeComplexMetrics
+    );
+  }
+
@@ -244,7 +340,9 @@ public class IncrementalIndex implements Iterable<Row>, Closeable
    * Adds a new row. The row might correspond with another row that already exists, in which case this will
    * update that row instead of inserting a new one.
    * <p/>
-   * This method is thread-safe.
+   * Calls to add() are thread safe.
+   * <p/>
    *
    * @param row the row of data to add
    *
@@ -253,7 +351,6 @@ public class IncrementalIndex implements Iterable<Row>, Closeable
   public int add(InputRow row)
   {
     row = spatialDimensionRowFormatter.formatRow(row);
-
     if (row.getTimestampFromEpoch() < minTimestamp) {
       throw new IAE("Cannot add row[%s] because it is below the minTimestamp[%s]", row, new DateTime(minTimestamp));
     }
@@ -293,7 +390,7 @@ public class IncrementalIndex implements Iterable<Row>, Closeable
       dims = newDims;
     }

-    TimeAndDims key = new TimeAndDims(Math.max(gran.truncate(row.getTimestampFromEpoch()), minTimestamp), dims);
+    final TimeAndDims key = new TimeAndDims(Math.max(gran.truncate(row.getTimestampFromEpoch()), minTimestamp), dims);

     synchronized (this) {
       if (!facts.containsKey(key)) {
@@ -305,6 +402,7 @@ public class IncrementalIndex implements Iterable<Row>, Closeable
           aggs[i].init(bufferHolder.get(), getMetricPosition(rowOffset, i));
         }
         facts.put(key, rowOffset);
+
       }
     }
     in.set(row);
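The deserializeComplexMetrics flag decides which ObjectColumnSelector an aggregator sees: the raw selector (classOfObject() is Object.class, values come straight from getRaw) when the flag is false or the type is float, otherwise one backed by the type's ComplexMetricSerde extractor. A compact sketch of that decision with plain-Java stand-ins (nothing here is Druid's actual API):

interface MiniObjectColumnSelector
{
  Class<?> classOfObject();

  Object get();
}

class SelectorChoiceSketch
{
  static MiniObjectColumnSelector makeColumnValueSelector(
      final String typeName,
      final boolean deserializeComplexMetrics,
      final Object rawValue
  )
  {
    final MiniObjectColumnSelector rawColumnSelector = new MiniObjectColumnSelector()
    {
      @Override
      public Class<?> classOfObject()
      {
        // A raw selector cannot promise anything more specific than Object.
        return Object.class;
      }

      @Override
      public Object get()
      {
        return rawValue; // i.e. in.get().getRaw(columnName) in the real code
      }
    };

    if (!deserializeComplexMetrics || "float".equals(typeName)) {
      return rawColumnSelector;
    }
    return rawColumnSelector; // placeholder: a serde-backed extractor selector would be built here
  }

  public static void main(String[] args)
  {
    MiniObjectColumnSelector s = makeColumnValueSelector("hyperUnique", false, "raw-sketch-bytes");
    System.out.println(s.classOfObject() + " -> " + s.get()); // class java.lang.Object -> raw-sketch-bytes
  }
}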
@@ -139,7 +139,9 @@ public class ChainedExecutionQueryRunnerTest
     Assert.assertTrue(future.isCancelled());
     Assert.assertTrue(runner1.hasStarted);
     Assert.assertTrue(runner2.hasStarted);
-    Assert.assertFalse(runner3.hasStarted);
+    Assert.assertTrue(runner1.interrupted);
+    Assert.assertTrue(runner2.interrupted);
+    Assert.assertTrue(!runner3.hasStarted || runner3.interrupted);
     Assert.assertFalse(runner1.hasCompleted);
     Assert.assertFalse(runner2.hasCompleted);
     Assert.assertFalse(runner3.hasCompleted);
@@ -243,7 +245,9 @@ public class ChainedExecutionQueryRunnerTest
     Assert.assertTrue(future.isCancelled());
     Assert.assertTrue(runner1.hasStarted);
     Assert.assertTrue(runner2.hasStarted);
-    Assert.assertFalse(runner3.hasStarted);
+    Assert.assertTrue(runner1.interrupted);
+    Assert.assertTrue(runner2.interrupted);
+    Assert.assertTrue(!runner3.hasStarted || runner3.interrupted);
     Assert.assertFalse(runner1.hasCompleted);
     Assert.assertFalse(runner2.hasCompleted);
     Assert.assertFalse(runner3.hasCompleted);
@@ -256,6 +260,7 @@ public class ChainedExecutionQueryRunnerTest
     private final CountDownLatch latch;
     private boolean hasStarted = false;
     private boolean hasCompleted = false;
+    private boolean interrupted = false;

     public DyingQueryRunner(CountDownLatch latch)
     {
@@ -268,6 +273,7 @@ public class ChainedExecutionQueryRunnerTest
       hasStarted = true;
       latch.countDown();
       if (Thread.interrupted()) {
+        interrupted = true;
         throw new QueryInterruptedException("I got killed");
       }

@@ -276,6 +282,7 @@ public class ChainedExecutionQueryRunnerTest
         Thread.sleep(500);
       }
       catch (InterruptedException e) {
+        interrupted = true;
         throw new QueryInterruptedException("I got killed");
       }

@@ -28,6 +28,7 @@ import io.druid.query.aggregation.CountAggregatorFactory;
 import io.druid.query.aggregation.DoubleSumAggregatorFactory;
 import io.druid.query.aggregation.JavaScriptAggregatorFactory;
 import io.druid.query.aggregation.LongSumAggregatorFactory;
+import io.druid.query.aggregation.cardinality.CardinalityAggregatorFactory;
 import io.druid.query.aggregation.hyperloglog.HyperUniqueFinalizingPostAggregator;
 import io.druid.query.aggregation.hyperloglog.HyperUniquesAggregatorFactory;
 import io.druid.query.aggregation.post.ArithmeticPostAggregator;
@@ -110,6 +111,11 @@ public class QueryRunnerTestHelper
       "uniques",
       "quality_uniques"
   );
+  public static final CardinalityAggregatorFactory qualityCardinality = new CardinalityAggregatorFactory(
+      "cardinality",
+      Arrays.asList("quality"),
+      false
+  );
   public static final ConstantPostAggregator constant = new ConstantPostAggregator("const", 1L, null);
   public static final FieldAccessPostAggregator rowsPostAgg = new FieldAccessPostAggregator("rows", "rows");
   public static final FieldAccessPostAggregator indexPostAgg = new FieldAccessPostAggregator("index", "index");
|
|
@ -43,8 +43,13 @@ import io.druid.query.QueryToolChest;
|
|||
import io.druid.query.TestQueryRunners;
|
||||
import io.druid.query.aggregation.AggregatorFactory;
|
||||
import io.druid.query.aggregation.DoubleSumAggregatorFactory;
|
||||
import io.druid.query.aggregation.JavaScriptAggregatorFactory;
|
||||
import io.druid.query.aggregation.LongSumAggregatorFactory;
|
||||
import io.druid.query.aggregation.MaxAggregatorFactory;
|
||||
import io.druid.query.aggregation.PostAggregator;
|
||||
import io.druid.query.aggregation.post.ArithmeticPostAggregator;
|
||||
import io.druid.query.aggregation.post.ConstantPostAggregator;
|
||||
import io.druid.query.aggregation.post.FieldAccessPostAggregator;
|
||||
import io.druid.query.dimension.DefaultDimensionSpec;
|
||||
import io.druid.query.dimension.DimensionSpec;
|
||||
import io.druid.query.dimension.ExtractionDimensionSpec;
|
||||
|
@ -53,6 +58,7 @@ import io.druid.query.filter.JavaScriptDimFilter;
|
|||
import io.druid.query.filter.RegexDimFilter;
|
||||
import io.druid.query.groupby.having.EqualToHavingSpec;
|
||||
import io.druid.query.groupby.having.GreaterThanHavingSpec;
|
||||
import io.druid.query.groupby.having.HavingSpec;
|
||||
import io.druid.query.groupby.having.OrHavingSpec;
|
||||
import io.druid.query.groupby.orderby.DefaultLimitSpec;
|
||||
import io.druid.query.groupby.orderby.LimitSpec;
|
||||
|
@@ -213,6 +219,36 @@ public class GroupByQueryRunnerTest
     TestHelper.assertExpectedObjects(expectedResults, results, "");
   }
 
+  @Test
+  public void testGroupByWithCardinality()
+  {
+    GroupByQuery query = GroupByQuery
+        .builder()
+        .setDataSource(QueryRunnerTestHelper.dataSource)
+        .setQuerySegmentSpec(QueryRunnerTestHelper.firstToThird)
+        .setAggregatorSpecs(
+            Arrays.<AggregatorFactory>asList(
+                QueryRunnerTestHelper.rowsCount,
+                QueryRunnerTestHelper.qualityCardinality
+            )
+        )
+        .setGranularity(QueryRunnerTestHelper.allGran)
+        .build();
+
+    List<Row> expectedResults = Arrays.asList(
+        createExpectedRow(
+            "2011-04-01",
+            "rows",
+            26L,
+            "cardinality",
+            QueryRunnerTestHelper.UNIQUES_9
+        )
+    );
+
+    Iterable<Row> results = runQuery(query);
+    TestHelper.assertExpectedObjects(expectedResults, results, "");
+  }
+
   @Test
   public void testGroupByWithDimExtractionFn()
   {
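A brief aside on the assertion above: the test data contains nine distinct quality values (they appear individually in the expected rows of the subquery tests below), and QueryRunnerTestHelper.UNIQUES_9 is the helper's precomputed HyperLogLog estimate for them, so the test compares against the estimate rather than a literal 9.0. A hedged sketch of that relationship; the concrete estimate used here is an illustrative assumption, not quoted from the helper:

public class Uniques9Sketch
{
  public static void main(String[] args)
  {
    // Assumed approximate value of QueryRunnerTestHelper.UNIQUES_9, for
    // illustration; HLL estimates land near, not exactly on, the true count.
    double uniques9 = 9.019;
    long trueCardinality = 9; // distinct quality values in the test data
    System.out.println(Math.abs(uniques9 - trueCardinality) < 0.1); // true
  }
}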
@@ -1031,6 +1067,278 @@ public class GroupByQueryRunnerTest
     Assert.assertFalse(results.iterator().hasNext());
   }
 
+  @Test
+  public void testSubqueryWithPostAggregators()
+  {
+    final GroupByQuery subquery = GroupByQuery
+        .builder()
+        .setDataSource(QueryRunnerTestHelper.dataSource)
+        .setQuerySegmentSpec(QueryRunnerTestHelper.firstToThird)
+        .setDimensions(Lists.<DimensionSpec>newArrayList(new DefaultDimensionSpec("quality", "alias")))
+        .setDimFilter(new JavaScriptDimFilter("quality", "function(dim){ return true; }"))
+        .setAggregatorSpecs(
+            Arrays.<AggregatorFactory>asList(
+                QueryRunnerTestHelper.rowsCount,
+                new LongSumAggregatorFactory("idx_subagg", "index")
+            )
+        )
+        .setPostAggregatorSpecs(
+            Arrays.<PostAggregator>asList(
+                new ArithmeticPostAggregator(
+                    "idx_subpostagg", "+", Arrays.<PostAggregator>asList(
+                    new FieldAccessPostAggregator("the_idx_subagg", "idx_subagg"),
+                    new ConstantPostAggregator("thousand", 1000, 1000)
+                )
+                )
+
+            )
+        )
+        .setGranularity(QueryRunnerTestHelper.dayGran)
+        .build();
+
+    final GroupByQuery query = GroupByQuery
+        .builder()
+        .setDataSource(subquery)
+        .setQuerySegmentSpec(QueryRunnerTestHelper.firstToThird)
+        .setDimensions(Lists.<DimensionSpec>newArrayList(new DefaultDimensionSpec("alias", "alias")))
+        .setAggregatorSpecs(
+            Arrays.<AggregatorFactory>asList(
+                new LongSumAggregatorFactory("rows", "rows"),
+                new LongSumAggregatorFactory("idx", "idx_subpostagg")
+            )
+        )
+        .setPostAggregatorSpecs(
+            Arrays.<PostAggregator>asList(
+                new ArithmeticPostAggregator(
+                    "idx", "+", Arrays.asList(
+                    new FieldAccessPostAggregator("the_idx_agg", "idx"),
+                    new ConstantPostAggregator("ten_thousand", 10000, 10000)
+                )
+                )
+
+            )
+        )
+        .setGranularity(QueryRunnerTestHelper.dayGran)
+        .build();
+
+    List<Row> expectedResults = Arrays.asList(
+        createExpectedRow("2011-04-01", "alias", "automotive", "rows", 1L, "idx", 11135.0),
+        createExpectedRow("2011-04-01", "alias", "business", "rows", 1L, "idx", 11118.0),
+        createExpectedRow("2011-04-01", "alias", "entertainment", "rows", 1L, "idx", 11158.0),
+        createExpectedRow("2011-04-01", "alias", "health", "rows", 1L, "idx", 11120.0),
+        createExpectedRow("2011-04-01", "alias", "mezzanine", "rows", 3L, "idx", 13870.0),
+        createExpectedRow("2011-04-01", "alias", "news", "rows", 1L, "idx", 11121.0),
+        createExpectedRow("2011-04-01", "alias", "premium", "rows", 3L, "idx", 13900.0),
+        createExpectedRow("2011-04-01", "alias", "technology", "rows", 1L, "idx", 11078.0),
+        createExpectedRow("2011-04-01", "alias", "travel", "rows", 1L, "idx", 11119.0),
+
+        createExpectedRow("2011-04-02", "alias", "automotive", "rows", 1L, "idx", 11147.0),
+        createExpectedRow("2011-04-02", "alias", "business", "rows", 1L, "idx", 11112.0),
+        createExpectedRow("2011-04-02", "alias", "entertainment", "rows", 1L, "idx", 11166.0),
+        createExpectedRow("2011-04-02", "alias", "health", "rows", 1L, "idx", 11113.0),
+        createExpectedRow("2011-04-02", "alias", "mezzanine", "rows", 3L, "idx", 13447.0),
+        createExpectedRow("2011-04-02", "alias", "news", "rows", 1L, "idx", 11114.0),
+        createExpectedRow("2011-04-02", "alias", "premium", "rows", 3L, "idx", 13505.0),
+        createExpectedRow("2011-04-02", "alias", "technology", "rows", 1L, "idx", 11097.0),
+        createExpectedRow("2011-04-02", "alias", "travel", "rows", 1L, "idx", 11126.0)
+    );
+
+    // Subqueries are handled by the ToolChest
+    Iterable<Row> results = runQuery(query);
+    TestHelper.assertExpectedObjects(expectedResults, results, "");
+  }
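The expected idx values in this test decompose cleanly: the inner query sums the raw index per quality and adds the constant 1000 post-aggregator, then the outer query sums that column and adds 10000. A worked check for the first row; the raw sum of 135 is inferred from the expected output, not shown in this hunk:

public class SubqueryIdxCheck
{
  public static void main(String[] args)
  {
    long rawIndexSum = 135L;                   // inferred inner LongSum for automotive, 2011-04-01
    double idxSubpostagg = rawIndexSum + 1000; // inner ArithmeticPostAggregator adds "thousand"
    double idx = idxSubpostagg + 10000;        // outer ArithmeticPostAggregator adds "ten_thousand"
    System.out.println(idx == 11135.0);        // matches the first expected row
  }
}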
+
+  @Test
+  public void testSubqueryWithPostAggregatorsAndHaving()
+  {
+    final GroupByQuery subquery = GroupByQuery
+        .builder()
+        .setDataSource(QueryRunnerTestHelper.dataSource)
+        .setQuerySegmentSpec(QueryRunnerTestHelper.firstToThird)
+        .setDimensions(Lists.<DimensionSpec>newArrayList(new DefaultDimensionSpec("quality", "alias")))
+        .setDimFilter(new JavaScriptDimFilter("quality", "function(dim){ return true; }"))
+        .setAggregatorSpecs(
+            Arrays.asList(
+                QueryRunnerTestHelper.rowsCount,
+                new LongSumAggregatorFactory("idx_subagg", "index")
+            )
+        )
+        .setPostAggregatorSpecs(
+            Arrays.<PostAggregator>asList(
+                new ArithmeticPostAggregator(
+                    "idx_subpostagg",
+                    "+",
+                    Arrays.asList(
+                        new FieldAccessPostAggregator("the_idx_subagg", "idx_subagg"),
+                        new ConstantPostAggregator("thousand", 1000, 1000)
+                    )
+                )
+
+            )
+        )
+        .setHavingSpec(
+            new HavingSpec()
+            {
+              @Override
+              public boolean eval(Row row)
+              {
+                return (row.getFloatMetric("idx_subpostagg") < 3800);
+              }
+            }
+        )
+        .addOrderByColumn("alias")
+        .setGranularity(QueryRunnerTestHelper.dayGran)
+        .build();
+
+    final GroupByQuery query = GroupByQuery
+        .builder()
+        .setDataSource(subquery)
+        .setQuerySegmentSpec(QueryRunnerTestHelper.firstToThird)
+        .setDimensions(Lists.<DimensionSpec>newArrayList(new DefaultDimensionSpec("alias", "alias")))
+        .setAggregatorSpecs(
+            Arrays.<AggregatorFactory>asList(
+                new LongSumAggregatorFactory("rows", "rows"),
+                new LongSumAggregatorFactory("idx", "idx_subpostagg")
+            )
+        )
+        .setPostAggregatorSpecs(
+            Arrays.<PostAggregator>asList(
+                new ArithmeticPostAggregator(
+                    "idx", "+", Arrays.asList(
+                    new FieldAccessPostAggregator("the_idx_agg", "idx"),
+                    new ConstantPostAggregator("ten_thousand", 10000, 10000)
+                )
+                )
+
+            )
+        )
+        .setGranularity(QueryRunnerTestHelper.dayGran)
+        .build();
+
+    List<Row> expectedResults = Arrays.asList(
+        createExpectedRow("2011-04-01", "alias", "automotive", "rows", 1L, "idx", 11135.0),
+        createExpectedRow("2011-04-01", "alias", "business", "rows", 1L, "idx", 11118.0),
+        createExpectedRow("2011-04-01", "alias", "entertainment", "rows", 1L, "idx", 11158.0),
+        createExpectedRow("2011-04-01", "alias", "health", "rows", 1L, "idx", 11120.0),
+        createExpectedRow("2011-04-01", "alias", "news", "rows", 1L, "idx", 11121.0),
+        createExpectedRow("2011-04-01", "alias", "technology", "rows", 1L, "idx", 11078.0),
+        createExpectedRow("2011-04-01", "alias", "travel", "rows", 1L, "idx", 11119.0),
+
+        createExpectedRow("2011-04-02", "alias", "automotive", "rows", 1L, "idx", 11147.0),
+        createExpectedRow("2011-04-02", "alias", "business", "rows", 1L, "idx", 11112.0),
+        createExpectedRow("2011-04-02", "alias", "entertainment", "rows", 1L, "idx", 11166.0),
+        createExpectedRow("2011-04-02", "alias", "health", "rows", 1L, "idx", 11113.0),
+        createExpectedRow("2011-04-02", "alias", "mezzanine", "rows", 3L, "idx", 13447.0),
+        createExpectedRow("2011-04-02", "alias", "news", "rows", 1L, "idx", 11114.0),
+        createExpectedRow("2011-04-02", "alias", "premium", "rows", 3L, "idx", 13505.0),
+        createExpectedRow("2011-04-02", "alias", "technology", "rows", 1L, "idx", 11097.0),
+        createExpectedRow("2011-04-02", "alias", "travel", "rows", 1L, "idx", 11126.0)
+    );
+
+    // Subqueries are handled by the ToolChest
+    Iterable<Row> results = runQuery(query);
+    TestHelper.assertExpectedObjects(expectedResults, results, "");
+  }
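Note how the inner having spec shapes the expected rows here: filtering on idx_subpostagg < 3800 drops mezzanine and premium for 2011-04-01 (inner values 3870 and 3900, read off the previous test's 13870.0 and 13900.0 by subtracting the outer +10000), while the 2011-04-02 values 3447 and 3505 pass. A quick check of that arithmetic:

public class HavingFilterCheck
{
  public static void main(String[] args)
  {
    // Inner idx_subpostagg values recovered from the expected outer idx
    // by subtracting the outer "ten_thousand" post-aggregator.
    double[] mezzanine = {13870.0 - 10000, 13447.0 - 10000}; // Apr 1, Apr 2
    for (double v : mezzanine) {
      System.out.println(v + " passes: " + (v < 3800)); // 3870.0 false, 3447.0 true
    }
  }
}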
+
+  @Test
+  public void testSubqueryWithMultiColumnAggregators()
+  {
+    final GroupByQuery subquery = GroupByQuery
+        .builder()
+        .setDataSource(QueryRunnerTestHelper.dataSource)
+        .setQuerySegmentSpec(QueryRunnerTestHelper.firstToThird)
+        .setDimensions(Lists.<DimensionSpec>newArrayList(new DefaultDimensionSpec("quality", "alias")))
+        .setDimFilter(new JavaScriptDimFilter("provider", "function(dim){ return true; }"))
+        .setAggregatorSpecs(
+            Arrays.asList(
+                QueryRunnerTestHelper.rowsCount,
+                new DoubleSumAggregatorFactory("idx_subagg", "index"),
+                new JavaScriptAggregatorFactory(
+                    "js_agg",
+                    Arrays.asList("index", "provider"),
+                    "function(current, index, dim){return current + index + dim.length;}",
+                    "function(){return 0;}",
+                    "function(a,b){return a + b;}"
+                )
+            )
+        )
+        .setPostAggregatorSpecs(
+            Arrays.<PostAggregator>asList(
+                new ArithmeticPostAggregator(
+                    "idx_subpostagg",
+                    "+",
+                    Arrays.asList(
+                        new FieldAccessPostAggregator("the_idx_subagg", "idx_subagg"),
+                        new ConstantPostAggregator("thousand", 1000, 1000)
+                    )
+                )
+
+            )
+        )
+        .setHavingSpec(
+            new HavingSpec()
+            {
+              @Override
+              public boolean eval(Row row)
+              {
+                return (row.getFloatMetric("idx_subpostagg") < 3800);
+              }
+            }
+        )
+        .addOrderByColumn("alias")
+        .setGranularity(QueryRunnerTestHelper.dayGran)
+        .build();
+
+    final GroupByQuery query = GroupByQuery
+        .builder()
+        .setDataSource(subquery)
+        .setQuerySegmentSpec(QueryRunnerTestHelper.firstToThird)
+        .setDimensions(Lists.<DimensionSpec>newArrayList(new DefaultDimensionSpec("alias", "alias")))
+        .setAggregatorSpecs(
+            Arrays.<AggregatorFactory>asList(
+                new LongSumAggregatorFactory("rows", "rows"),
+                new LongSumAggregatorFactory("idx", "idx_subpostagg"),
+                new DoubleSumAggregatorFactory("js_outer_agg", "js_agg")
+            )
+        )
+        .setPostAggregatorSpecs(
+            Arrays.<PostAggregator>asList(
+                new ArithmeticPostAggregator(
+                    "idx", "+", Arrays.asList(
+                    new FieldAccessPostAggregator("the_idx_agg", "idx"),
+                    new ConstantPostAggregator("ten_thousand", 10000, 10000)
+                )
+                )
+
+            )
+        )
+        .setLimitSpec(
+            new DefaultLimitSpec(
+                Arrays.asList(
+                    new OrderByColumnSpec(
+                        "alias",
+                        OrderByColumnSpec.Direction.DESCENDING
+                    )
+                ),
+                5
+            )
+        )
+        .setGranularity(QueryRunnerTestHelper.dayGran)
+        .build();
+
+    List<Row> expectedResults = Arrays.asList(
+        createExpectedRow("2011-04-01", "alias", "travel", "rows", 1L, "idx", 11119.0, "js_outer_agg", 123.92274475097656),
+        createExpectedRow("2011-04-01", "alias", "technology", "rows", 1L, "idx", 11078.0, "js_outer_agg", 82.62254333496094),
+        createExpectedRow("2011-04-01", "alias", "news", "rows", 1L, "idx", 11121.0, "js_outer_agg", 125.58358001708984),
+        createExpectedRow("2011-04-01", "alias", "health", "rows", 1L, "idx", 11120.0, "js_outer_agg", 124.13470458984375),
+        createExpectedRow("2011-04-01", "alias", "entertainment", "rows", 1L, "idx", 11158.0, "js_outer_agg", 162.74722290039062)
+    );
+
+    // Subqueries are handled by the ToolChest
+    Iterable<Row> results = runQuery(query);
+    TestHelper.assertExpectedObjects(expectedResults, results, "");
+  }
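Two things are worth calling out in this last test: the JavaScript aggregator folds two columns (the numeric index plus the string length of provider) into one running sum, and the outer DefaultLimitSpec orders by alias descending and keeps five rows, which is why only travel through entertainment appear. A sketch of the per-row fold the three JS functions express; the row values used here are illustrative assumptions:

public class JsAggFoldSketch
{
  public static void main(String[] args)
  {
    double current = 0;               // reset fn: function(){return 0;}
    double index = 119.0;             // assumed row metric value
    String provider = "total_market"; // assumed row dimension value

    // aggregate fn: function(current, index, dim){return current + index + dim.length;}
    current = current + index + provider.length();

    // combine fn: function(a,b){return a + b;} simply adds partial sums
    System.out.println(current); // 119.0 + 12 = 131.0
  }
}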
+
   private Iterable<Row> runQuery(GroupByQuery query)
   {
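The diff view cuts off before the runQuery body. Given the "// Subqueries are handled by the ToolChest" comments in the tests above, a plausible shape for such a helper is to route the query through the toolchest's mergeResults so the nested query datasource gets unwrapped. This is a hypothetical sketch, not the commit's actual code; the factory and runner fields are assumed test-class members:

// Hypothetical sketch only -- the real body is truncated by the diff view.
private Iterable<Row> runQuery(GroupByQuery query)
{
  QueryToolChest<Row, GroupByQuery> toolChest = factory.getToolchest(); // assumed field
  Sequence<Row> result = toolChest.mergeResults(
      toolChest.preMergeQueryDecoration(runner)                         // assumed field
  ).run(query);
  return Sequences.toList(result, Lists.<Row>newArrayList());
}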
@@ -9,7 +9,7 @@
   <parent>
     <groupId>io.druid</groupId>
     <artifactId>druid</artifactId>
-    <version>0.6.122-SNAPSHOT</version>
+    <version>0.6.124-SNAPSHOT</version>
   </parent>
 
   <dependencies>
@@ -28,7 +28,7 @@
   <parent>
     <groupId>io.druid</groupId>
     <artifactId>druid</artifactId>
-    <version>0.6.122-SNAPSHOT</version>
+    <version>0.6.124-SNAPSHOT</version>
   </parent>
 
   <dependencies>
@@ -18,8 +18,7 @@
   ~ Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
   -->
 
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-        xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
     <modelVersion>4.0.0</modelVersion>
     <groupId>io.druid</groupId>
     <artifactId>druid-server</artifactId>

@@ -29,7 +28,7 @@
   <parent>
     <groupId>io.druid</groupId>
     <artifactId>druid</artifactId>
-    <version>0.6.122-SNAPSHOT</version>
+    <version>0.6.124-SNAPSHOT</version>
   </parent>
 
   <dependencies>
@@ -27,7 +27,7 @@
   <parent>
     <groupId>io.druid</groupId>
     <artifactId>druid</artifactId>
-    <version>0.6.122-SNAPSHOT</version>
+    <version>0.6.124-SNAPSHOT</version>
   </parent>
 
   <dependencies>