Merge pull request #593 from metamx/fix-groupyb

Support all standard Druid functions in GroupBy and nested groupBy
fjy 2014-06-13 15:37:55 -06:00
commit d5d2873d3c
22 changed files with 794 additions and 178 deletions

View File

@@ -88,7 +88,6 @@ public class GroupByParallelQueryRunner implements QueryRunner<Row>
   @Override
   public Sequence<Row> run(final Query<Row> queryParam)
   {
     final GroupByQuery query = (GroupByQuery) queryParam;
     final Pair<IncrementalIndex, Accumulator<IncrementalIndex, Row>> indexAccumulatorPair = GroupByQueryHelper.createIndexAccumulatorPair(
         query,

View File

@@ -26,9 +26,9 @@ import java.util.List;
 /**
  * Processing related interface
- *
+ * <p/>
  * An AggregatorFactory is an object that knows how to generate an Aggregator using a ColumnSelectorFactory.
- *
+ * <p/>
  * This is useful as an abstraction to allow Aggregator classes to be written in terms of MetricSelector objects
  * without making any assumptions about how they are pulling values out of the base data. That is, the data is
  * provided to the Aggregator through the MetricSelector object, so whatever creates that object gets to choose how
@@ -37,7 +37,9 @@ import java.util.List;
 public interface AggregatorFactory
 {
   public Aggregator factorize(ColumnSelectorFactory metricFactory);
+
   public BufferAggregator factorizeBuffered(ColumnSelectorFactory metricFactory);
+
   public Comparator getComparator();

   /**
@@ -48,6 +50,7 @@ public interface AggregatorFactory
    *
    * @param lhs The left hand side of the combine
    * @param rhs The right hand side of the combine
+   *
    * @return an object representing the combination of lhs and rhs, this can be a new object or a mutation of the inputs
    */
   public Object combine(Object lhs, Object rhs);
@@ -61,11 +64,19 @@ public interface AggregatorFactory
    */
   public AggregatorFactory getCombiningFactory();

+  /**
+   * Gets a list of all columns that this AggregatorFactory will scan
+   *
+   * @return AggregatorFactories for the columns to scan of the parent AggregatorFactory
+   */
+  public List<AggregatorFactory> getRequiredColumns();
+
   /**
    * A method that knows how to "deserialize" the object from whatever form it might have been put into
    * in order to transfer via JSON.
    *
    * @param object the object to deserialize
+   *
    * @return the deserialized object
    */
   public Object deserialize(Object object);
@@ -75,13 +86,17 @@ public interface AggregatorFactory
    * intermediate format than their final resultant output.
    *
    * @param object the object to be finalized
+   *
    * @return the finalized value that should be returned for the initial query
    */
   public Object finalizeComputation(Object object);

   public String getName();
+
   public List<String> requiredFields();
+
   public byte[] getCacheKey();
+
   public String getTypeName();

   /**
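
Note on the new getRequiredColumns() contract: for a nested groupBy, the outer query's aggregators may reference columns that exist only in the subquery's output, and the toolchest (later in this diff) uses this method to work out which columns the inner incremental index must materialize. A minimal sketch of the contract, assuming the demo class and values (illustrative, not part of this PR):

import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.LongSumAggregatorFactory;

public class RequiredColumnsDemo
{
  public static void main(String[] args)
  {
    // An outer query sums "idx_subpostagg", a column produced by its subquery.
    AggregatorFactory outer = new LongSumAggregatorFactory("idx", "idx_subpostagg");

    // Per this PR, a longSum factory reports one required column, keyed on its
    // field name, so the inner index knows it must aggregate "idx_subpostagg".
    for (AggregatorFactory required : outer.getRequiredColumns()) {
      System.out.println(required.getName()); // prints "idx_subpostagg"
    }
  }
}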

View File

@@ -26,6 +26,7 @@ import com.google.common.collect.ImmutableList;
 import com.google.common.primitives.Longs;
 import io.druid.segment.ColumnSelectorFactory;

+import java.util.Arrays;
 import java.util.Comparator;
 import java.util.List;
@@ -76,6 +77,12 @@ public class CountAggregatorFactory implements AggregatorFactory
     return new LongSumAggregatorFactory(name, name);
   }

+  @Override
+  public List<AggregatorFactory> getRequiredColumns()
+  {
+    return Arrays.<AggregatorFactory>asList(new CountAggregatorFactory(name));
+  }
+
   @Override
   public Object deserialize(Object object)
   {
@@ -136,12 +143,18 @@ public class CountAggregatorFactory implements AggregatorFactory
   @Override
   public boolean equals(Object o)
   {
-    if (this == o) return true;
-    if (o == null || getClass() != o.getClass()) return false;
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }

     CountAggregatorFactory that = (CountAggregatorFactory) o;

-    if (name != null ? !name.equals(that.name) : that.name != null) return false;
+    if (name != null ? !name.equals(that.name) : that.name != null) {
+      return false;
+    }

     return true;
   }

View File

@@ -85,6 +85,12 @@ public class DoubleSumAggregatorFactory implements AggregatorFactory
     return new DoubleSumAggregatorFactory(name, name);
   }

+  @Override
+  public List<AggregatorFactory> getRequiredColumns()
+  {
+    return Arrays.<AggregatorFactory>asList(new DoubleSumAggregatorFactory(fieldName, fieldName));
+  }
+
   @Override
   public Object deserialize(Object object)
   {
@@ -158,13 +164,21 @@ public class DoubleSumAggregatorFactory implements AggregatorFactory
   @Override
   public boolean equals(Object o)
   {
-    if (this == o) return true;
-    if (o == null || getClass() != o.getClass()) return false;
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }

     DoubleSumAggregatorFactory that = (DoubleSumAggregatorFactory) o;

-    if (fieldName != null ? !fieldName.equals(that.fieldName) : that.fieldName != null) return false;
-    if (name != null ? !name.equals(that.name) : that.name != null) return false;
+    if (fieldName != null ? !fieldName.equals(that.fieldName) : that.fieldName != null) {
+      return false;
+    }
+    if (name != null ? !name.equals(that.name) : that.name != null) {
+      return false;
+    }

     return true;
   }

View File

@@ -56,7 +56,7 @@ public class HistogramAggregatorFactory implements AggregatorFactory
     this.name = name;
     this.fieldName = fieldName;
-    this.breaksList = (breaksList == null) ? Lists.<Float>newArrayList() :breaksList;
+    this.breaksList = (breaksList == null) ? Lists.<Float>newArrayList() : breaksList;
     this.breaks = new float[this.breaksList.size()];
     for (int i = 0; i < this.breaksList.size(); ++i) {
       this.breaks[i] = this.breaksList.get(i);
@@ -100,6 +100,12 @@ public class HistogramAggregatorFactory implements AggregatorFactory
     return new HistogramAggregatorFactory(name, name, breaksList);
   }

+  @Override
+  public List<AggregatorFactory> getRequiredColumns()
+  {
+    return Arrays.<AggregatorFactory>asList(new HistogramAggregatorFactory(fieldName, fieldName, breaksList));
+  }
+
   @Override
   public Object deserialize(Object object)
   {
@@ -183,15 +189,27 @@ public class HistogramAggregatorFactory implements AggregatorFactory
   @Override
   public boolean equals(Object o)
   {
-    if (this == o) return true;
-    if (o == null || getClass() != o.getClass()) return false;
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }

     HistogramAggregatorFactory that = (HistogramAggregatorFactory) o;

-    if (!Arrays.equals(breaks, that.breaks)) return false;
-    if (breaksList != null ? !breaksList.equals(that.breaksList) : that.breaksList != null) return false;
-    if (fieldName != null ? !fieldName.equals(that.fieldName) : that.fieldName != null) return false;
-    if (name != null ? !name.equals(that.name) : that.name != null) return false;
+    if (!Arrays.equals(breaks, that.breaks)) {
+      return false;
+    }
+    if (breaksList != null ? !breaksList.equals(that.breaksList) : that.breaksList != null) {
+      return false;
+    }
+    if (fieldName != null ? !fieldName.equals(that.fieldName) : that.fieldName != null) {
+      return false;
+    }
+    if (name != null ? !name.equals(that.name) : that.name != null) {
+      return false;
+    }

     return true;
   }

View File

@@ -140,6 +140,22 @@ public class JavaScriptAggregatorFactory implements AggregatorFactory
     return new JavaScriptAggregatorFactory(name, Lists.newArrayList(name), fnCombine, fnReset, fnCombine);
   }

+  @Override
+  public List<AggregatorFactory> getRequiredColumns()
+  {
+    return Lists.transform(
+        fieldNames,
+        new com.google.common.base.Function<String, AggregatorFactory>()
+        {
+          @Override
+          public AggregatorFactory apply(String input)
+          {
+            return new JavaScriptAggregatorFactory(input, fieldNames, fnAggregate, fnReset, fnCombine);
+          }
+        }
+    );
+  }
+
   @Override
   public Object deserialize(Object object)
   {

View File

@@ -31,11 +31,11 @@ import java.util.Comparator;
 import java.util.List;

 /**
  */
 public class LongSumAggregatorFactory implements AggregatorFactory
 {
   private static final byte CACHE_TYPE_ID = 0x1;

   private final String fieldName;
   private final String name;
@@ -85,6 +85,12 @@ public class LongSumAggregatorFactory implements AggregatorFactory
     return new LongSumAggregatorFactory(name, name);
   }

+  @Override
+  public List<AggregatorFactory> getRequiredColumns()
+  {
+    return Arrays.<AggregatorFactory>asList(new LongSumAggregatorFactory(fieldName, fieldName));
+  }
+
   @Override
   public Object deserialize(Object object)
   {
@@ -154,13 +160,21 @@ public class LongSumAggregatorFactory implements AggregatorFactory
   @Override
   public boolean equals(Object o)
   {
-    if (this == o) return true;
-    if (o == null || getClass() != o.getClass()) return false;
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }

     LongSumAggregatorFactory that = (LongSumAggregatorFactory) o;

-    if (fieldName != null ? !fieldName.equals(that.fieldName) : that.fieldName != null) return false;
-    if (name != null ? !name.equals(that.name) : that.name != null) return false;
+    if (fieldName != null ? !fieldName.equals(that.fieldName) : that.fieldName != null) {
+      return false;
+    }
+    if (name != null ? !name.equals(that.name) : that.name != null) {
+      return false;
+    }

     return true;
   }

View File

@@ -82,6 +82,12 @@ public class MaxAggregatorFactory implements AggregatorFactory
     return new MaxAggregatorFactory(name, name);
   }

+  @Override
+  public List<AggregatorFactory> getRequiredColumns()
+  {
+    return Arrays.<AggregatorFactory>asList(new MaxAggregatorFactory(fieldName, fieldName));
+  }
+
   @Override
   public Object deserialize(Object object)
   {

View File

@@ -82,6 +82,12 @@ public class MinAggregatorFactory implements AggregatorFactory
     return new MinAggregatorFactory(name, name);
   }

+  @Override
+  public List<AggregatorFactory> getRequiredColumns()
+  {
+    return Arrays.<AggregatorFactory>asList(new MinAggregatorFactory(fieldName, fieldName));
+  }
+
   @Override
   public Object deserialize(Object object)
   {

View File

@@ -65,6 +65,12 @@ public class ToLowerCaseAggregatorFactory implements AggregatorFactory
     return baseAggregatorFactory.getCombiningFactory();
   }

+  @Override
+  public List<AggregatorFactory> getRequiredColumns()
+  {
+    return baseAggregatorFactory.getRequiredColumns();
+  }
+
   @Override
   public Object deserialize(Object object)
   {

View File

@@ -32,12 +32,14 @@ import io.druid.query.aggregation.AggregatorFactory;
 import io.druid.query.aggregation.Aggregators;
 import io.druid.query.aggregation.BufferAggregator;
 import io.druid.query.aggregation.hyperloglog.HyperLogLogCollector;
+import io.druid.query.aggregation.hyperloglog.HyperUniquesAggregatorFactory;
 import io.druid.segment.ColumnSelectorFactory;
 import io.druid.segment.DimensionSelector;
 import org.apache.commons.codec.binary.Base64;

 import javax.annotation.Nullable;
 import java.nio.ByteBuffer;
+import java.util.Arrays;
 import java.util.Comparator;
 import java.util.List;
@@ -142,7 +144,23 @@ public class CardinalityAggregatorFactory implements AggregatorFactory
   @Override
   public AggregatorFactory getCombiningFactory()
   {
-    return new CardinalityAggregatorFactory(name, fieldNames, byRow);
+    return new HyperUniquesAggregatorFactory(name, name);
+  }
+
+  @Override
+  public List<AggregatorFactory> getRequiredColumns()
+  {
+    return Lists.transform(
+        fieldNames,
+        new Function<String, AggregatorFactory>()
+        {
+          @Override
+          public AggregatorFactory apply(String input)
+          {
+            return new CardinalityAggregatorFactory(input, fieldNames, byRow);
+          }
+        }
+    );
   }

   @Override
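
The getCombiningFactory() change above deserves a note: cardinality intermediates are HyperLogLogCollectors, so combining partial results is a hyperUnique aggregation over the already-computed output column rather than a second cardinality pass over the raw dimensions. A hedged sketch (the demo class is illustrative):

import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.cardinality.CardinalityAggregatorFactory;

import java.util.Arrays;

public class CombiningFactoryDemo
{
  public static void main(String[] args)
  {
    AggregatorFactory cardinality =
        new CardinalityAggregatorFactory("card", Arrays.asList("quality"), false);

    // Previously this returned another CardinalityAggregatorFactory, which cannot
    // combine HyperLogLogCollector intermediates; it is now a HyperUniques factory
    // reading and writing the "card" column.
    System.out.println(cardinality.getCombiningFactory().getClass().getSimpleName());
  }
}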

View File

@@ -73,12 +73,13 @@ public class HyperUniquesAggregatorFactory implements AggregatorFactory
       return Aggregators.noopAggregator();
     }

-    if (HyperLogLogCollector.class.isAssignableFrom(selector.classOfObject())) {
+    final Class classOfObject = selector.classOfObject();
+    if (classOfObject.equals(Object.class) || HyperLogLogCollector.class.isAssignableFrom(classOfObject)) {
       return new HyperUniquesAggregator(name, selector);
     }

     throw new IAE(
-        "Incompatible type for metric[%s], expected a HyperUnique, got a %s", fieldName, selector.classOfObject()
+        "Incompatible type for metric[%s], expected a HyperUnique, got a %s", fieldName, classOfObject
     );
   }
@@ -91,12 +92,13 @@ public class HyperUniquesAggregatorFactory implements AggregatorFactory
       return Aggregators.noopBufferAggregator();
     }

-    if (HyperLogLogCollector.class.isAssignableFrom(selector.classOfObject())) {
+    final Class classOfObject = selector.classOfObject();
+    if (classOfObject.equals(Object.class) || HyperLogLogCollector.class.isAssignableFrom(classOfObject)) {
       return new HyperUniquesBufferAggregator(selector);
     }

     throw new IAE(
-        "Incompatible type for metric[%s], expected a HyperUnique, got a %s", fieldName, selector.classOfObject()
+        "Incompatible type for metric[%s], expected a HyperUnique, got a %s", fieldName, classOfObject
     );
   }
@@ -131,6 +133,12 @@ public class HyperUniquesAggregatorFactory implements AggregatorFactory
     return new HyperUniquesAggregatorFactory(name, name);
   }

+  @Override
+  public List<AggregatorFactory> getRequiredColumns()
+  {
+    return Arrays.<AggregatorFactory>asList(new HyperUniquesAggregatorFactory(fieldName, fieldName));
+  }
+
   @Override
   public Object deserialize(Object object)
   {
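
The relaxed type check pairs with the IncrementalIndex change later in this diff: with deserializeComplexMetrics == false, the index hands out selectors whose classOfObject() is Object.class, and the factory must accept them alongside real HyperLogLogCollector selectors. A small sketch of the new predicate (the demo class is illustrative; it mirrors the condition in factorize()):

import io.druid.query.aggregation.hyperloglog.HyperLogLogCollector;

public class ClassOfObjectDemo
{
  static boolean accepts(Class<?> classOfObject)
  {
    return classOfObject.equals(Object.class)
           || HyperLogLogCollector.class.isAssignableFrom(classOfObject);
  }

  public static void main(String[] args)
  {
    System.out.println(accepts(Object.class));               // true (new in this PR)
    System.out.println(accepts(HyperLogLogCollector.class)); // true (as before)
    System.out.println(accepts(String.class));               // false; callers throw IAE
  }
}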

View File

@@ -72,7 +72,7 @@ public class GroupByQuery extends BaseQuery<Row>
   private final List<AggregatorFactory> aggregatorSpecs;
   private final List<PostAggregator> postAggregatorSpecs;

-  private final Function<Sequence<Row>, Sequence<Row>> orderByLimitFn;
+  private final Function<Sequence<Row>, Sequence<Row>> limitFn;

   @JsonCreator
   public GroupByQuery(
@@ -85,8 +85,9 @@ public class GroupByQuery extends BaseQuery<Row>
       @JsonProperty("postAggregations") List<PostAggregator> postAggregatorSpecs,
       @JsonProperty("having") HavingSpec havingSpec,
       @JsonProperty("limitSpec") LimitSpec limitSpec,
-      @JsonProperty("orderBy") LimitSpec orderBySpec,
-      @JsonProperty("context") Map<String, Object> context
+      @JsonProperty("context") Map<String, Object> context,
+      // Backwards compatible
+      @JsonProperty("orderBy") LimitSpec orderBySpec
   )
   {
     super(dataSource, querySegmentSpec, context);
@@ -129,7 +130,7 @@ public class GroupByQuery extends BaseQuery<Row>
       );
     }

-    orderByLimitFn = postProcFn;
+    limitFn = postProcFn;
   }

   /**
@@ -146,7 +147,7 @@ public class GroupByQuery extends BaseQuery<Row>
       List<PostAggregator> postAggregatorSpecs,
       HavingSpec havingSpec,
       LimitSpec orderBySpec,
-      Function<Sequence<Row>, Sequence<Row>> orderByLimitFn,
+      Function<Sequence<Row>, Sequence<Row>> limitFn,
       Map<String, Object> context
   )
   {
@@ -159,7 +160,7 @@ public class GroupByQuery extends BaseQuery<Row>
     this.postAggregatorSpecs = postAggregatorSpecs;
     this.havingSpec = havingSpec;
     this.limitSpec = orderBySpec;
-    this.orderByLimitFn = orderByLimitFn;
+    this.limitFn = limitFn;
   }

   @JsonProperty("filter")
@@ -199,7 +200,7 @@ public class GroupByQuery extends BaseQuery<Row>
   }

   @JsonProperty
-  public LimitSpec getOrderBy()
+  public LimitSpec getLimitSpec()
   {
     return limitSpec;
   }
@@ -218,7 +219,7 @@ public class GroupByQuery extends BaseQuery<Row>
   public Sequence<Row> applyLimit(Sequence<Row> results)
   {
-    return orderByLimitFn.apply(results);
+    return limitFn.apply(results);
   }

   @Override
@@ -234,7 +235,7 @@ public class GroupByQuery extends BaseQuery<Row>
         postAggregatorSpecs,
         havingSpec,
         limitSpec,
-        orderByLimitFn,
+        limitFn,
         computeOverridenContext(contextOverride)
     );
   }
@@ -252,7 +253,7 @@ public class GroupByQuery extends BaseQuery<Row>
         postAggregatorSpecs,
         havingSpec,
         limitSpec,
-        orderByLimitFn,
+        limitFn,
         getContext()
     );
   }
@@ -270,7 +271,7 @@ public class GroupByQuery extends BaseQuery<Row>
         postAggregatorSpecs,
         havingSpec,
         limitSpec,
-        orderByLimitFn,
+        limitFn,
         getContext()
     );
   }
@@ -292,11 +293,25 @@ public class GroupByQuery extends BaseQuery<Row>
     private List<OrderByColumnSpec> orderByColumnSpecs = Lists.newArrayList();
     private int limit = Integer.MAX_VALUE;

-    private Builder()
+    public Builder()
     {
     }

-    private Builder(Builder builder)
+    public Builder(GroupByQuery query)
+    {
+      dataSource = query.getDataSource();
+      querySegmentSpec = query.getQuerySegmentSpec();
+      limitSpec = query.getLimitSpec();
+      dimFilter = query.getDimFilter();
+      granularity = query.getGranularity();
+      dimensions = query.getDimensions();
+      aggregatorSpecs = query.getAggregatorSpecs();
+      postAggregatorSpecs = query.getPostAggregatorSpecs();
+      havingSpec = query.getHavingSpec();
+      context = query.getContext();
+    }
+
+    public Builder(Builder builder)
     {
       dataSource = builder.dataSource;
       querySegmentSpec = builder.querySegmentSpec;
@@ -490,7 +505,11 @@ public class GroupByQuery extends BaseQuery<Row>
     {
       final LimitSpec theLimitSpec;
       if (limitSpec == null) {
-        theLimitSpec = new DefaultLimitSpec(orderByColumnSpecs, limit);
+        if (orderByColumnSpecs.isEmpty() && limit == Integer.MAX_VALUE) {
+          theLimitSpec = new NoopLimitSpec();
+        } else {
+          theLimitSpec = new DefaultLimitSpec(orderByColumnSpecs, limit);
+        }
       } else {
         theLimitSpec = limitSpec;
       }
@@ -504,9 +523,9 @@ public class GroupByQuery extends BaseQuery<Row>
           aggregatorSpecs,
           postAggregatorSpecs,
           havingSpec,
-          null,
           theLimitSpec,
-          context
+          context,
+          null
      );
    }
  }
@@ -515,36 +534,57 @@ public class GroupByQuery extends BaseQuery<Row>
   public String toString()
   {
     return "GroupByQuery{" +
            "limitSpec=" + limitSpec +
            ", dimFilter=" + dimFilter +
            ", granularity=" + granularity +
            ", dimensions=" + dimensions +
            ", aggregatorSpecs=" + aggregatorSpecs +
            ", postAggregatorSpecs=" + postAggregatorSpecs +
-           ", orderByLimitFn=" + orderByLimitFn +
+           ", limitFn=" + limitFn +
            '}';
   }

   @Override
   public boolean equals(Object o)
   {
-    if (this == o) return true;
-    if (o == null || getClass() != o.getClass()) return false;
-    if (!super.equals(o)) return false;
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    if (!super.equals(o)) {
+      return false;
+    }

     GroupByQuery that = (GroupByQuery) o;

-    if (aggregatorSpecs != null ? !aggregatorSpecs.equals(that.aggregatorSpecs) : that.aggregatorSpecs != null)
-      return false;
-    if (dimFilter != null ? !dimFilter.equals(that.dimFilter) : that.dimFilter != null) return false;
-    if (dimensions != null ? !dimensions.equals(that.dimensions) : that.dimensions != null) return false;
-    if (granularity != null ? !granularity.equals(that.granularity) : that.granularity != null) return false;
-    if (havingSpec != null ? !havingSpec.equals(that.havingSpec) : that.havingSpec != null) return false;
-    if (limitSpec != null ? !limitSpec.equals(that.limitSpec) : that.limitSpec != null) return false;
-    if (orderByLimitFn != null ? !orderByLimitFn.equals(that.orderByLimitFn) : that.orderByLimitFn != null)
-      return false;
-    if (postAggregatorSpecs != null ? !postAggregatorSpecs.equals(that.postAggregatorSpecs) : that.postAggregatorSpecs != null)
-      return false;
+    if (aggregatorSpecs != null ? !aggregatorSpecs.equals(that.aggregatorSpecs) : that.aggregatorSpecs != null) {
+      return false;
+    }
+    if (dimFilter != null ? !dimFilter.equals(that.dimFilter) : that.dimFilter != null) {
+      return false;
+    }
+    if (dimensions != null ? !dimensions.equals(that.dimensions) : that.dimensions != null) {
+      return false;
+    }
+    if (granularity != null ? !granularity.equals(that.granularity) : that.granularity != null) {
+      return false;
+    }
+    if (havingSpec != null ? !havingSpec.equals(that.havingSpec) : that.havingSpec != null) {
+      return false;
+    }
+    if (limitSpec != null ? !limitSpec.equals(that.limitSpec) : that.limitSpec != null) {
+      return false;
+    }
+    if (limitFn != null ? !limitFn.equals(that.limitFn) : that.limitFn != null) {
+      return false;
+    }
+    if (postAggregatorSpecs != null
+        ? !postAggregatorSpecs.equals(that.postAggregatorSpecs)
+        : that.postAggregatorSpecs != null) {
+      return false;
+    }

     return true;
   }
@@ -560,7 +600,7 @@ public class GroupByQuery extends BaseQuery<Row>
     result = 31 * result + (dimensions != null ? dimensions.hashCode() : 0);
     result = 31 * result + (aggregatorSpecs != null ? aggregatorSpecs.hashCode() : 0);
     result = 31 * result + (postAggregatorSpecs != null ? postAggregatorSpecs.hashCode() : 0);
-    result = 31 * result + (orderByLimitFn != null ? orderByLimitFn.hashCode() : 0);
+    result = 31 * result + (limitFn != null ? limitFn.hashCode() : 0);
     return result;
   }
 }
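
The Builder changes make GroupByQuery cheap to derive from: the new public Builder(GroupByQuery) copy constructor is what lets the toolchest build inner and outer variants of a subquery without restating every field. A hedged sketch using this PR's own test fixtures (the demo class and the chosen limit are illustrative; OrderByColumnSpec is assumed to live in io.druid.query.groupby.orderby):

import io.druid.query.QueryRunnerTestHelper;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.groupby.GroupByQuery;
import io.druid.query.groupby.orderby.DefaultLimitSpec;
import io.druid.query.groupby.orderby.OrderByColumnSpec;

import java.util.Arrays;

public class CopyBuilderDemo
{
  public static GroupByQuery limitedCopy()
  {
    GroupByQuery base = GroupByQuery
        .builder()
        .setDataSource(QueryRunnerTestHelper.dataSource)
        .setQuerySegmentSpec(QueryRunnerTestHelper.firstToThird)
        .setGranularity(QueryRunnerTestHelper.allGran)
        .setAggregatorSpecs(Arrays.<AggregatorFactory>asList(QueryRunnerTestHelper.rowsCount))
        .build();

    // Copy everything from base, then override only the limit spec.
    return new GroupByQuery.Builder(base)
        .setLimitSpec(
            new DefaultLimitSpec(
                Arrays.asList(new OrderByColumnSpec("rows", OrderByColumnSpec.Direction.DESCENDING)),
                5
            )
        )
        .build();
  }
}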

View File

@@ -24,12 +24,14 @@ import com.google.common.collect.Lists;
 import com.metamx.common.ISE;
 import com.metamx.common.Pair;
 import com.metamx.common.guava.Accumulator;
+import io.druid.data.input.MapBasedRow;
 import io.druid.data.input.Row;
 import io.druid.data.input.Rows;
 import io.druid.granularity.QueryGranularity;
 import io.druid.query.aggregation.AggregatorFactory;
 import io.druid.query.dimension.DimensionSpec;
 import io.druid.segment.incremental.IncrementalIndex;
+import io.druid.segment.incremental.IncrementalIndexSchema;

 import javax.annotation.Nullable;
 import java.util.List;
@@ -53,7 +55,7 @@ public class GroupByQueryHelper
         new Function<AggregatorFactory, AggregatorFactory>()
         {
           @Override
-          public AggregatorFactory apply(@Nullable AggregatorFactory input)
+          public AggregatorFactory apply(AggregatorFactory input)
           {
             return input.getCombiningFactory();
           }
@@ -64,7 +66,7 @@ public class GroupByQueryHelper
         new Function<DimensionSpec, String>()
         {
           @Override
-          public String apply(@Nullable DimensionSpec input)
+          public String apply(DimensionSpec input)
           {
             return input.getOutputName();
           }
@@ -83,14 +85,14 @@ public class GroupByQueryHelper
       @Override
       public IncrementalIndex accumulate(IncrementalIndex accumulated, Row in)
       {
-        if (accumulated.add(Rows.toCaseInsensitiveInputRow(in, dimensions)) > config.getMaxResults()) {
+        if (accumulated.add(Rows.toCaseInsensitiveInputRow(in, dimensions), false) > config.getMaxResults()) {
           throw new ISE("Computation exceeds maxRows limit[%s]", config.getMaxResults());
         }
         return accumulated;
       }
     };

-    return new Pair<IncrementalIndex, Accumulator<IncrementalIndex, Row>>(index, accumulator);
+    return new Pair<>(index, accumulator);
   }
 }

View File

@@ -24,6 +24,7 @@ import com.google.common.base.Function;
 import com.google.common.base.Joiner;
 import com.google.common.base.Supplier;
 import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.inject.Inject;
 import com.metamx.common.Pair;
@@ -44,11 +45,13 @@ import io.druid.query.QueryToolChest;
 import io.druid.query.SubqueryQueryRunner;
 import io.druid.query.aggregation.AggregatorFactory;
 import io.druid.query.aggregation.MetricManipulationFn;
+import io.druid.query.aggregation.PostAggregator;
 import io.druid.segment.incremental.IncrementalIndex;
 import io.druid.segment.incremental.IncrementalIndexStorageAdapter;
 import org.joda.time.Interval;
 import org.joda.time.Minutes;

+import java.util.List;
 import java.util.Map;

 /**
@@ -59,7 +62,10 @@ public class GroupByQueryQueryToolChest extends QueryToolChest<Row, GroupByQuery
   {
   };
   private static final String GROUP_BY_MERGE_KEY = "groupByMerge";
-  private static final Map<String, Object> NO_MERGE_CONTEXT = ImmutableMap.<String, Object>of(GROUP_BY_MERGE_KEY, "false");
+  private static final Map<String, Object> NO_MERGE_CONTEXT = ImmutableMap.<String, Object>of(
+      GROUP_BY_MERGE_KEY,
+      "false"
+  );

   private final Supplier<GroupByQueryConfig> configSupplier;
   private GroupByQueryEngine engine; // For running the outer query around a subquery
@@ -81,7 +87,7 @@ public class GroupByQueryQueryToolChest extends QueryToolChest<Row, GroupByQuery
       @Override
       public Sequence<Row> run(Query<Row> input)
       {
-        if (Boolean.valueOf((String) input.getContextValue(GROUP_BY_MERGE_KEY, "true"))) {
+        if (Boolean.valueOf(input.getContextValue(GROUP_BY_MERGE_KEY, "true"))) {
           return mergeGroupByResults(((GroupByQuery) input).withOverriddenContext(NO_MERGE_CONTEXT), runner);
         } else {
           return runner.run(input);
@@ -92,33 +98,45 @@ public class GroupByQueryQueryToolChest extends QueryToolChest<Row, GroupByQuery
   private Sequence<Row> mergeGroupByResults(final GroupByQuery query, QueryRunner<Row> runner)
   {
-    Sequence<Row> result;
-
     // If there's a subquery, merge subquery results and then apply the aggregator
-    DataSource dataSource = query.getDataSource();
+    final DataSource dataSource = query.getDataSource();
     if (dataSource instanceof QueryDataSource) {
       GroupByQuery subquery;
       try {
         subquery = (GroupByQuery) ((QueryDataSource) dataSource).getQuery();
-      } catch (ClassCastException e) {
+      }
+      catch (ClassCastException e) {
         throw new UnsupportedOperationException("Subqueries must be of type 'group by'");
       }
-      Sequence<Row> subqueryResult = mergeGroupByResults(subquery, runner);
-      IncrementalIndexStorageAdapter adapter
-          = new IncrementalIndexStorageAdapter(makeIncrementalIndex(subquery, subqueryResult));
-      result = engine.process(query, adapter);
+      final Sequence<Row> subqueryResult = mergeGroupByResults(subquery, runner);
+      final List<AggregatorFactory> aggs = Lists.newArrayList();
+      for (AggregatorFactory aggregatorFactory : query.getAggregatorSpecs()) {
+        aggs.addAll(aggregatorFactory.getRequiredColumns());
+      }
+
+      // We need the inner incremental index to have all the columns required by the outer query
+      final GroupByQuery innerQuery = new GroupByQuery.Builder(query)
+          .setAggregatorSpecs(aggs)
+          .setInterval(subquery.getIntervals())
+          .setPostAggregatorSpecs(Lists.<PostAggregator>newArrayList())
+          .build();
+      final GroupByQuery outerQuery = new GroupByQuery.Builder(query)
+          .setLimitSpec(query.getLimitSpec().merge(subquery.getLimitSpec()))
+          .build();
+
+      final IncrementalIndexStorageAdapter adapter = new IncrementalIndexStorageAdapter(
+          makeIncrementalIndex(innerQuery, subqueryResult)
+      );
+      return outerQuery.applyLimit(engine.process(outerQuery, adapter));
     } else {
-      result = runner.run(query);
+      return query.applyLimit(postAggregate(query, makeIncrementalIndex(query, runner.run(query))));
     }
-
-    return postAggregate(query, makeIncrementalIndex(query, result));
   }

   private Sequence<Row> postAggregate(final GroupByQuery query, IncrementalIndex index)
   {
-    Sequence<Row> sequence = Sequences.map(
+    return Sequences.map(
         Sequences.simple(index.iterableWithPostAggregations(query.getPostAggregatorSpecs())),
         new Function<Row, Row>()
         {
@@ -128,13 +146,12 @@ public class GroupByQueryQueryToolChest extends QueryToolChest<Row, GroupByQuery
             final MapBasedRow row = (MapBasedRow) input;
             return new MapBasedRow(
                 query.getGranularity()
                      .toDateTime(row.getTimestampFromEpoch()),
                 row.getEvent()
             );
           }
         }
     );
-
-    return query.applyLimit(sequence);
   }
   private IncrementalIndex makeIncrementalIndex(GroupByQuery query, Sequence<Row> rows)
@@ -152,7 +169,7 @@ public class GroupByQueryQueryToolChest extends QueryToolChest<Row, GroupByQuery
   @Override
   public Sequence<Row> mergeSequences(Sequence<Sequence<Row>> seqOfSequences)
   {
-    return new ConcatSequence<Row>(seqOfSequences);
+    return new ConcatSequence<>(seqOfSequences);
   }

   @Override
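
Pulling the pieces together: a nested groupBy is simply a GroupByQuery whose dataSource is another GroupByQuery, as exercised by the new tests at the bottom of this diff. A trimmed sketch with filters, post-aggregations, and having clauses elided (the demo class is illustrative):

import com.google.common.collect.Lists;
import io.druid.query.QueryRunnerTestHelper;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.LongSumAggregatorFactory;
import io.druid.query.dimension.DefaultDimensionSpec;
import io.druid.query.dimension.DimensionSpec;
import io.druid.query.groupby.GroupByQuery;

import java.util.Arrays;

public class NestedGroupByDemo
{
  public static GroupByQuery nested()
  {
    GroupByQuery subquery = GroupByQuery
        .builder()
        .setDataSource(QueryRunnerTestHelper.dataSource)
        .setQuerySegmentSpec(QueryRunnerTestHelper.firstToThird)
        .setDimensions(Lists.<DimensionSpec>newArrayList(new DefaultDimensionSpec("quality", "alias")))
        .setAggregatorSpecs(Arrays.<AggregatorFactory>asList(new LongSumAggregatorFactory("idx", "index")))
        .setGranularity(QueryRunnerTestHelper.dayGran)
        .build();

    // The outer query consumes the subquery's output column "idx"; before running,
    // the toolchest expands the outer aggregators via getRequiredColumns() so the
    // inner incremental index materializes that column.
    return GroupByQuery
        .builder()
        .setDataSource(subquery) // presumably wrapped as a QueryDataSource by the builder
        .setQuerySegmentSpec(QueryRunnerTestHelper.firstToThird)
        .setDimensions(Lists.<DimensionSpec>newArrayList(new DefaultDimensionSpec("alias", "alias")))
        .setAggregatorSpecs(Arrays.<AggregatorFactory>asList(new LongSumAggregatorFactory("idx", "idx")))
        .setGranularity(QueryRunnerTestHelper.dayGran)
        .build();
  }
}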

View File

@@ -87,12 +87,17 @@ public class DefaultLimitSpec implements LimitSpec

     if (limit == Integer.MAX_VALUE) {
       return new SortingFn(ordering);
-    }
-    else {
+    } else {
       return new TopNFunction(ordering, limit);
     }
   }

+  @Override
+  public LimitSpec merge(LimitSpec other)
+  {
+    return this;
+  }
+
   private Ordering<Row> makeComparator(
       List<DimensionSpec> dimensions, List<AggregatorFactory> aggs, List<PostAggregator> postAggs
   )
@@ -200,12 +205,18 @@ public class DefaultLimitSpec implements LimitSpec
     @Override
     public boolean equals(Object o)
     {
-      if (this == o) return true;
-      if (o == null || getClass() != o.getClass()) return false;
+      if (this == o) {
+        return true;
+      }
+      if (o == null || getClass() != o.getClass()) {
+        return false;
+      }

       LimitingFn that = (LimitingFn) o;

-      if (limit != that.limit) return false;
+      if (limit != that.limit) {
+        return false;
+      }

       return true;
     }
@@ -232,12 +243,18 @@ public class DefaultLimitSpec implements LimitSpec
     @Override
     public boolean equals(Object o)
     {
-      if (this == o) return true;
-      if (o == null || getClass() != o.getClass()) return false;
+      if (this == o) {
+        return true;
+      }
+      if (o == null || getClass() != o.getClass()) {
+        return false;
+      }

       SortingFn sortingFn = (SortingFn) o;

-      if (ordering != null ? !ordering.equals(sortingFn.ordering) : sortingFn.ordering != null) return false;
+      if (ordering != null ? !ordering.equals(sortingFn.ordering) : sortingFn.ordering != null) {
+        return false;
+      }

       return true;
     }
@@ -273,13 +290,21 @@ public class DefaultLimitSpec implements LimitSpec
     @Override
     public boolean equals(Object o)
     {
-      if (this == o) return true;
-      if (o == null || getClass() != o.getClass()) return false;
+      if (this == o) {
+        return true;
+      }
+      if (o == null || getClass() != o.getClass()) {
+        return false;
+      }

       TopNFunction that = (TopNFunction) o;

-      if (limit != that.limit) return false;
-      if (sorter != null ? !sorter.equals(that.sorter) : that.sorter != null) return false;
+      if (limit != that.limit) {
+        return false;
+      }
+      if (sorter != null ? !sorter.equals(that.sorter) : that.sorter != null) {
+        return false;
+      }

       return true;
     }
@@ -296,13 +321,21 @@ public class DefaultLimitSpec implements LimitSpec
   @Override
   public boolean equals(Object o)
   {
-    if (this == o) return true;
-    if (o == null || getClass() != o.getClass()) return false;
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }

     DefaultLimitSpec that = (DefaultLimitSpec) o;

-    if (limit != that.limit) return false;
-    if (columns != null ? !columns.equals(that.columns) : that.columns != null) return false;
+    if (limit != that.limit) {
+      return false;
+    }
+    if (columns != null ? !columns.equals(that.columns) : that.columns != null) {
+      return false;
+    }

     return true;
   }

View File

@@ -38,5 +38,11 @@ import java.util.List;
 })
 public interface LimitSpec
 {
-  public Function<Sequence<Row>, Sequence<Row>> build(List<DimensionSpec> dimensions, List<AggregatorFactory> aggs, List<PostAggregator> postAggs);
+  public Function<Sequence<Row>, Sequence<Row>> build(
+      List<DimensionSpec> dimensions,
+      List<AggregatorFactory> aggs,
+      List<PostAggregator> postAggs
+  );
+
+  public LimitSpec merge(LimitSpec other);
 }

View File

@@ -41,6 +41,12 @@ public class NoopLimitSpec implements LimitSpec
     return Functions.identity();
   }

+  @Override
+  public LimitSpec merge(LimitSpec other)
+  {
+    return other;
+  }
+
   @Override
   public String toString()
   {
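
The merge() direction matters: in mergeGroupByResults the toolchest calls query.getLimitSpec().merge(subquery.getLimitSpec()), so an explicit outer DefaultLimitSpec keeps itself while an outer NoopLimitSpec defers to the subquery's spec. A hedged sketch (the demo class is illustrative; NoopLimitSpec is assumed to live in io.druid.query.groupby.orderby):

import io.druid.query.groupby.orderby.DefaultLimitSpec;
import io.druid.query.groupby.orderby.LimitSpec;
import io.druid.query.groupby.orderby.NoopLimitSpec;
import io.druid.query.groupby.orderby.OrderByColumnSpec;

import java.util.Arrays;

public class LimitSpecMergeDemo
{
  public static void main(String[] args)
  {
    LimitSpec outer = new NoopLimitSpec();
    LimitSpec inner = new DefaultLimitSpec(
        Arrays.asList(new OrderByColumnSpec("alias", OrderByColumnSpec.Direction.DESCENDING)),
        10
    );

    // NoopLimitSpec.merge(other) returns other: the subquery's limit survives.
    System.out.println(outer.merge(inner) == inner); // true

    // DefaultLimitSpec.merge(other) returns this: an explicit outer limit wins.
    System.out.println(inner.merge(outer) == inner); // true
  }
}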

View File

@@ -26,7 +26,6 @@ import io.druid.query.QueryRunnerHelper;
 import io.druid.query.Result;
 import io.druid.query.aggregation.Aggregator;
 import io.druid.query.aggregation.AggregatorFactory;
-import io.druid.query.aggregation.PostAggregator;
 import io.druid.segment.Cursor;
 import io.druid.segment.StorageAdapter;
 import io.druid.segment.filter.Filters;
@@ -46,45 +45,43 @@ public class TimeseriesQueryEngine
     }

     return QueryRunnerHelper.makeCursorBasedQuery(
         adapter,
         query.getQuerySegmentSpec().getIntervals(),
         Filters.convertDimensionFilters(query.getDimensionsFilter()),
         query.getGranularity(),
         new Function<Cursor, Result<TimeseriesResultValue>>()
         {
           private final List<AggregatorFactory> aggregatorSpecs = query.getAggregatorSpecs();
-          private final List<PostAggregator> postAggregatorSpecs = query.getPostAggregatorSpecs();

           @Override
           public Result<TimeseriesResultValue> apply(Cursor cursor)
           {
             Aggregator[] aggregators = QueryRunnerHelper.makeAggregators(cursor, aggregatorSpecs);
             try {
               while (!cursor.isDone()) {
                 for (Aggregator aggregator : aggregators) {
                   aggregator.aggregate();
                 }
                 cursor.advance();
               }
               TimeseriesResultBuilder bob = new TimeseriesResultBuilder(cursor.getTime());
               for (Aggregator aggregator : aggregators) {
                 bob.addMetric(aggregator);
               }
               Result<TimeseriesResultValue> retVal = bob.build();
               return retVal;
             }
             finally {
               // cleanup
               for (Aggregator agg : aggregators) {
                 agg.close();
               }
             }
           }
         }
     );
   }
 }

View File

@@ -46,6 +46,7 @@ import io.druid.segment.DimensionSelector;
 import io.druid.segment.FloatColumnSelector;
 import io.druid.segment.ObjectColumnSelector;
 import io.druid.segment.TimestampColumnSelector;
+import io.druid.segment.data.IndexedInts;
 import io.druid.segment.serde.ComplexMetricExtractor;
 import io.druid.segment.serde.ComplexMetricSerde;
 import io.druid.segment.serde.ComplexMetrics;
@@ -53,6 +54,7 @@ import org.joda.time.DateTime;
 import org.joda.time.Interval;

 import javax.annotation.Nullable;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Iterator;
 import java.util.LinkedHashMap;
@@ -133,17 +135,29 @@ public class IncrementalIndex implements Iterable<Row>
     );
   }

+  public int add(InputRow row)
+  {
+    // this is an ugly workaround to call ComplexMetricExtractor.extractValue at ingestion time
+    return add(row, true);
+  }
+
   /**
    * Adds a new row. The row might correspond with another row that already exists, in which case this will
    * update that row instead of inserting a new one.
    * <p/>
-   * This is *not* thread-safe. Calls to add() should always happen on the same thread.
+   *
+   * Calls to add() are thread safe.
+   *
+   * Setting deserializeComplexMetrics to false is necessary for intermediate aggregation such as groupBy that
+   * should not deserialize input columns using ComplexMetricSerde for aggregators that return complex metrics.
    *
    * @param row the row of data to add
+   * @param deserializeComplexMetrics flag whether or not to call ComplexMetricExtractor.extractValue() on the input
+   *                                  value for aggregators that return metrics other than float.
    *
    * @return the number of rows in the data set after adding the InputRow
    */
-  public int add(InputRow row)
+  public int add(InputRow row, final boolean deserializeComplexMetrics)
   {
     row = spatialDimensionRowFormatter.formatRow(row);
@@ -186,7 +200,7 @@ public class IncrementalIndex implements Iterable<Row>
       dims = newDims;
     }

-    TimeAndDims key = new TimeAndDims(Math.max(gran.truncate(row.getTimestampFromEpoch()), minTimestamp), dims);
+    final TimeAndDims key = new TimeAndDims(Math.max(gran.truncate(row.getTimestampFromEpoch()), minTimestamp), dims);

     Aggregator[] aggs = facts.get(key);
     if (aggs == null) {
@@ -231,54 +245,108 @@ public class IncrementalIndex implements Iterable<Row>
           final String typeName = agg.getTypeName();
           final String columnName = column.toLowerCase();

-          if (typeName.equals("float")) {
-            return new ObjectColumnSelector<Float>()
-            {
-              @Override
-              public Class classOfObject()
-              {
-                return Float.TYPE;
-              }
-
-              @Override
-              public Float get()
-              {
-                return in.getFloatMetric(columnName);
-              }
-            };
-          }
-
-          final ComplexMetricSerde serde = ComplexMetrics.getSerdeForType(typeName);
-          if (serde == null) {
-            throw new ISE("Don't know how to handle type[%s]", typeName);
-          }
-
-          final ComplexMetricExtractor extractor = serde.getExtractor();
-          return new ObjectColumnSelector()
-          {
-            @Override
-            public Class classOfObject()
-            {
-              return extractor.extractedClass();
-            }
-
-            @Override
-            public Object get()
-            {
-              return extractor.extractValue(in, columnName);
-            }
-          };
+          final ObjectColumnSelector<Object> rawColumnSelector = new ObjectColumnSelector<Object>()
+          {
+            @Override
+            public Class classOfObject()
+            {
+              return Object.class;
+            }
+
+            @Override
+            public Object get()
+            {
+              return in.getRaw(columnName);
+            }
+          };
+
+          if(!deserializeComplexMetrics) {
+            return rawColumnSelector;
+          } else {
+            if (typeName.equals("float")) {
+              return rawColumnSelector;
+            }
+
+            final ComplexMetricSerde serde = ComplexMetrics.getSerdeForType(typeName);
+            if (serde == null) {
+              throw new ISE("Don't know how to handle type[%s]", typeName);
+            }
+
+            final ComplexMetricExtractor extractor = serde.getExtractor();
+            return new ObjectColumnSelector()
+            {
+              @Override
+              public Class classOfObject()
+              {
+                return extractor.extractedClass();
+              }
+
+              @Override
+              public Object get()
+              {
+                return extractor.extractValue(in, columnName);
+              }
+            };
+          }
         }

         @Override
-        public DimensionSelector makeDimensionSelector(String dimension)
+        public DimensionSelector makeDimensionSelector(final String dimension)
         {
-          // we should implement this, but this is going to be rewritten soon anyways
-          throw new UnsupportedOperationException(
-              "Incremental index aggregation does not support dimension selectors"
-          );
+          final String dimensionName = dimension.toLowerCase();
+          return new DimensionSelector()
+          {
+            @Override
+            public IndexedInts getRow()
+            {
+              final List<String> dimensionValues = in.getDimension(dimensionName);
+              final ArrayList<Integer> vals = Lists.newArrayList();
+              if (dimensionValues != null) {
+                for (int i = 0; i < dimensionValues.size(); ++i) {
+                  vals.add(i);
+                }
+              }
+
+              return new IndexedInts()
+              {
+                @Override
+                public int size()
+                {
+                  return vals.size();
+                }
+
+                @Override
+                public int get(int index)
+                {
+                  return vals.get(index);
+                }
+
+                @Override
+                public Iterator<Integer> iterator()
+                {
+                  return vals.iterator();
+                }
+              };
+            }
+
+            @Override
+            public int getValueCardinality()
+            {
+              throw new UnsupportedOperationException("value cardinality is unknown in incremental index");
+            }
+
+            @Override
+            public String lookupName(int id)
+            {
+              return in.getDimension(dimensionName).get(id);
+            }
+
+            @Override
+            public int lookupId(String name)
+            {
+              return in.getDimension(dimensionName).indexOf(name);
+            }
+          };
         }
       }
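
A note on the two add() entry points above: ingestion keeps the old behavior (complex input columns run through ComplexMetricExtractor.extractValue), while groupBy's intermediate merge pass, as seen in GroupByQueryHelper, passes false so complex intermediates such as HyperLogLogCollectors reach the aggregators raw. A minimal sketch (the demo class is illustrative):

import io.druid.data.input.InputRow;
import io.druid.segment.incremental.IncrementalIndex;

public class AddRowDemo
{
  // Ingestion path: equivalent to index.add(row, true), which deserializes
  // complex metric columns via their ComplexMetricSerde extractor.
  static int ingest(IncrementalIndex index, InputRow row)
  {
    return index.add(row);
  }

  // GroupBy merge path: raw objects are handed straight to the aggregators,
  // matching the add(..., false) call in GroupByQueryHelper.
  static int mergeIntermediate(IncrementalIndex index, InputRow row)
  {
    return index.add(row, false);
  }
}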

View File

@@ -28,6 +28,7 @@ import io.druid.query.aggregation.CountAggregatorFactory;
 import io.druid.query.aggregation.DoubleSumAggregatorFactory;
 import io.druid.query.aggregation.JavaScriptAggregatorFactory;
 import io.druid.query.aggregation.LongSumAggregatorFactory;
+import io.druid.query.aggregation.cardinality.CardinalityAggregatorFactory;
 import io.druid.query.aggregation.hyperloglog.HyperUniqueFinalizingPostAggregator;
 import io.druid.query.aggregation.hyperloglog.HyperUniquesAggregatorFactory;
 import io.druid.query.aggregation.post.ArithmeticPostAggregator;
@@ -110,6 +111,11 @@ public class QueryRunnerTestHelper
       "uniques",
       "quality_uniques"
   );
+  public static final CardinalityAggregatorFactory qualityCardinality = new CardinalityAggregatorFactory(
+      "cardinality",
+      Arrays.asList("quality"),
+      false
+  );
   public static final ConstantPostAggregator constant = new ConstantPostAggregator("const", 1L, null);
   public static final FieldAccessPostAggregator rowsPostAgg = new FieldAccessPostAggregator("rows", "rows");
   public static final FieldAccessPostAggregator indexPostAgg = new FieldAccessPostAggregator("index", "index");

View File

@@ -42,8 +42,13 @@ import io.druid.query.QueryRunnerTestHelper;
 import io.druid.query.QueryToolChest;
 import io.druid.query.aggregation.AggregatorFactory;
 import io.druid.query.aggregation.DoubleSumAggregatorFactory;
+import io.druid.query.aggregation.JavaScriptAggregatorFactory;
 import io.druid.query.aggregation.LongSumAggregatorFactory;
 import io.druid.query.aggregation.MaxAggregatorFactory;
+import io.druid.query.aggregation.PostAggregator;
+import io.druid.query.aggregation.post.ArithmeticPostAggregator;
+import io.druid.query.aggregation.post.ConstantPostAggregator;
+import io.druid.query.aggregation.post.FieldAccessPostAggregator;
 import io.druid.query.dimension.DefaultDimensionSpec;
 import io.druid.query.dimension.DimensionSpec;
 import io.druid.query.dimension.ExtractionDimensionSpec;
@@ -52,6 +57,7 @@ import io.druid.query.filter.JavaScriptDimFilter;
 import io.druid.query.filter.RegexDimFilter;
 import io.druid.query.groupby.having.EqualToHavingSpec;
 import io.druid.query.groupby.having.GreaterThanHavingSpec;
+import io.druid.query.groupby.having.HavingSpec;
 import io.druid.query.groupby.having.OrHavingSpec;
 import io.druid.query.groupby.orderby.DefaultLimitSpec;
 import io.druid.query.groupby.orderby.LimitSpec;
@@ -211,6 +217,36 @@ public class GroupByQueryRunnerTest
     TestHelper.assertExpectedObjects(expectedResults, results, "");
   }
@Test
public void testGroupByWithCardinality()
{
GroupByQuery query = GroupByQuery
.builder()
.setDataSource(QueryRunnerTestHelper.dataSource)
.setQuerySegmentSpec(QueryRunnerTestHelper.firstToThird)
.setAggregatorSpecs(
Arrays.<AggregatorFactory>asList(
QueryRunnerTestHelper.rowsCount,
QueryRunnerTestHelper.qualityCardinality
)
)
.setGranularity(QueryRunnerTestHelper.allGran)
.build();
List<Row> expectedResults = Arrays.asList(
createExpectedRow(
"2011-04-01",
"rows",
26L,
"cardinality",
QueryRunnerTestHelper.UNIQUES_9
)
);
Iterable<Row> results = runQuery(query);
TestHelper.assertExpectedObjects(expectedResults, results, "");
}
  @Test
  public void testGroupByWithDimExtractionFn()
  {
@@ -1029,6 +1065,278 @@ public class GroupByQueryRunnerTest
    Assert.assertFalse(results.iterator().hasNext());
  }
@Test
public void testSubqueryWithPostAggregators()
{
final GroupByQuery subquery = GroupByQuery
.builder()
.setDataSource(QueryRunnerTestHelper.dataSource)
.setQuerySegmentSpec(QueryRunnerTestHelper.firstToThird)
.setDimensions(Lists.<DimensionSpec>newArrayList(new DefaultDimensionSpec("quality", "alias")))
.setDimFilter(new JavaScriptDimFilter("quality", "function(dim){ return true; }"))
.setAggregatorSpecs(
Arrays.<AggregatorFactory>asList(
QueryRunnerTestHelper.rowsCount,
new LongSumAggregatorFactory("idx_subagg", "index")
)
)
.setPostAggregatorSpecs(
Arrays.<PostAggregator>asList(
new ArithmeticPostAggregator(
"idx_subpostagg", "+", Arrays.<PostAggregator>asList(
new FieldAccessPostAggregator("the_idx_subagg", "idx_subagg"),
new ConstantPostAggregator("thousand", 1000, 1000)
)
)
)
)
.setGranularity(QueryRunnerTestHelper.dayGran)
.build();
final GroupByQuery query = GroupByQuery
.builder()
.setDataSource(subquery)
.setQuerySegmentSpec(QueryRunnerTestHelper.firstToThird)
.setDimensions(Lists.<DimensionSpec>newArrayList(new DefaultDimensionSpec("alias", "alias")))
.setAggregatorSpecs(
Arrays.<AggregatorFactory>asList(
new LongSumAggregatorFactory("rows", "rows"),
new LongSumAggregatorFactory("idx", "idx_subpostagg")
)
)
.setPostAggregatorSpecs(
Arrays.<PostAggregator>asList(
new ArithmeticPostAggregator(
"idx", "+", Arrays.asList(
new FieldAccessPostAggregator("the_idx_agg", "idx"),
new ConstantPostAggregator("ten_thousand", 10000, 10000)
)
)
)
)
.setGranularity(QueryRunnerTestHelper.dayGran)
.build();
List<Row> expectedResults = Arrays.asList(
createExpectedRow("2011-04-01", "alias", "automotive", "rows", 1L, "idx", 11135.0),
createExpectedRow("2011-04-01", "alias", "business", "rows", 1L, "idx", 11118.0),
createExpectedRow("2011-04-01", "alias", "entertainment", "rows", 1L, "idx", 11158.0),
createExpectedRow("2011-04-01", "alias", "health", "rows", 1L, "idx", 11120.0),
createExpectedRow("2011-04-01", "alias", "mezzanine", "rows", 3L, "idx", 13870.0),
createExpectedRow("2011-04-01", "alias", "news", "rows", 1L, "idx", 11121.0),
createExpectedRow("2011-04-01", "alias", "premium", "rows", 3L, "idx", 13900.0),
createExpectedRow("2011-04-01", "alias", "technology", "rows", 1L, "idx", 11078.0),
createExpectedRow("2011-04-01", "alias", "travel", "rows", 1L, "idx", 11119.0),
createExpectedRow("2011-04-02", "alias", "automotive", "rows", 1L, "idx", 11147.0),
createExpectedRow("2011-04-02", "alias", "business", "rows", 1L, "idx", 11112.0),
createExpectedRow("2011-04-02", "alias", "entertainment", "rows", 1L, "idx", 11166.0),
createExpectedRow("2011-04-02", "alias", "health", "rows", 1L, "idx", 11113.0),
createExpectedRow("2011-04-02", "alias", "mezzanine", "rows", 3L, "idx", 13447.0),
createExpectedRow("2011-04-02", "alias", "news", "rows", 1L, "idx", 11114.0),
createExpectedRow("2011-04-02", "alias", "premium", "rows", 3L, "idx", 13505.0),
createExpectedRow("2011-04-02", "alias", "technology", "rows", 1L, "idx", 11097.0),
createExpectedRow("2011-04-02", "alias", "travel", "rows", 1L, "idx", 11126.0)
);
// Subqueries are handled by the ToolChest
Iterable<Row> results = runQuery(query);
TestHelper.assertExpectedObjects(expectedResults, results, "");
}
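
Where the expected values above come from (the per-row figure of 135 is inferred from the expected output rather than stated in the test data): the inner longSum of "index" for, say, the 2011-04-01 "automotive" row is 135; the inner post-aggregator adds the "thousand" constant; the outer longSum over that single row passes the value through; and the outer post-aggregator adds "ten_thousand". A worked check (illustrative class):

public class SubqueryArithmeticDemo
{
  public static void main(String[] args)
  {
    double idxSubagg = 135.0;                 // inner longSum of "index" (inferred)
    double idxSubpostagg = idxSubagg + 1000;  // inner post-agg: + "thousand" -> 1135.0
    double idx = idxSubpostagg + 10000;       // outer longSum + "ten_thousand" -> 11135.0
    System.out.println(idx);                  // 11135.0, the expected "idx"
  }
}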
@Test
public void testSubqueryWithPostAggregatorsAndHaving()
{
final GroupByQuery subquery = GroupByQuery
.builder()
.setDataSource(QueryRunnerTestHelper.dataSource)
.setQuerySegmentSpec(QueryRunnerTestHelper.firstToThird)
.setDimensions(Lists.<DimensionSpec>newArrayList(new DefaultDimensionSpec("quality", "alias")))
.setDimFilter(new JavaScriptDimFilter("quality", "function(dim){ return true; }"))
.setAggregatorSpecs(
Arrays.asList(
QueryRunnerTestHelper.rowsCount,
new LongSumAggregatorFactory("idx_subagg", "index")
)
)
.setPostAggregatorSpecs(
Arrays.<PostAggregator>asList(
new ArithmeticPostAggregator(
"idx_subpostagg",
"+",
Arrays.asList(
new FieldAccessPostAggregator("the_idx_subagg", "idx_subagg"),
new ConstantPostAggregator("thousand", 1000, 1000)
)
)
)
)
.setHavingSpec(
new HavingSpec()
{
@Override
public boolean eval(Row row)
{
return (row.getFloatMetric("idx_subpostagg") < 3800);
}
}
)
.addOrderByColumn("alias")
.setGranularity(QueryRunnerTestHelper.dayGran)
.build();
final GroupByQuery query = GroupByQuery
.builder()
.setDataSource(subquery)
.setQuerySegmentSpec(QueryRunnerTestHelper.firstToThird)
.setDimensions(Lists.<DimensionSpec>newArrayList(new DefaultDimensionSpec("alias", "alias")))
.setAggregatorSpecs(
Arrays.<AggregatorFactory>asList(
new LongSumAggregatorFactory("rows", "rows"),
new LongSumAggregatorFactory("idx", "idx_subpostagg")
)
)
.setPostAggregatorSpecs(
Arrays.<PostAggregator>asList(
new ArithmeticPostAggregator(
"idx", "+", Arrays.asList(
new FieldAccessPostAggregator("the_idx_agg", "idx"),
new ConstantPostAggregator("ten_thousand", 10000, 10000)
)
)
)
)
.setGranularity(QueryRunnerTestHelper.dayGran)
.build();
List<Row> expectedResults = Arrays.asList(
createExpectedRow("2011-04-01", "alias", "automotive", "rows", 1L, "idx", 11135.0),
createExpectedRow("2011-04-01", "alias", "business", "rows", 1L, "idx", 11118.0),
createExpectedRow("2011-04-01", "alias", "entertainment", "rows", 1L, "idx", 11158.0),
createExpectedRow("2011-04-01", "alias", "health", "rows", 1L, "idx", 11120.0),
createExpectedRow("2011-04-01", "alias", "news", "rows", 1L, "idx", 11121.0),
createExpectedRow("2011-04-01", "alias", "technology", "rows", 1L, "idx", 11078.0),
createExpectedRow("2011-04-01", "alias", "travel", "rows", 1L, "idx", 11119.0),
createExpectedRow("2011-04-02", "alias", "automotive", "rows", 1L, "idx", 11147.0),
createExpectedRow("2011-04-02", "alias", "business", "rows", 1L, "idx", 11112.0),
createExpectedRow("2011-04-02", "alias", "entertainment", "rows", 1L, "idx", 11166.0),
createExpectedRow("2011-04-02", "alias", "health", "rows", 1L, "idx", 11113.0),
createExpectedRow("2011-04-02", "alias", "mezzanine", "rows", 3L, "idx", 13447.0),
createExpectedRow("2011-04-02", "alias", "news", "rows", 1L, "idx", 11114.0),
createExpectedRow("2011-04-02", "alias", "premium", "rows", 3L, "idx", 13505.0),
createExpectedRow("2011-04-02", "alias", "technology", "rows", 1L, "idx", 11097.0),
createExpectedRow("2011-04-02", "alias", "travel", "rows", 1L, "idx", 11126.0)
);
// Subqueries are handled by the ToolChest
Iterable<Row> results = runQuery(query);
TestHelper.assertExpectedObjects(expectedResults, results, "");
}
@Test
public void testSubqueryWithMultiColumnAggregators()
{
final GroupByQuery subquery = GroupByQuery
.builder()
.setDataSource(QueryRunnerTestHelper.dataSource)
.setQuerySegmentSpec(QueryRunnerTestHelper.firstToThird)
.setDimensions(Lists.<DimensionSpec>newArrayList(new DefaultDimensionSpec("quality", "alias")))
.setDimFilter(new JavaScriptDimFilter("provider", "function(dim){ return true; }"))
.setAggregatorSpecs(
Arrays.asList(
QueryRunnerTestHelper.rowsCount,
new DoubleSumAggregatorFactory("idx_subagg", "index"),
new JavaScriptAggregatorFactory(
"js_agg",
Arrays.asList("index", "provider"),
"function(current, index, dim){return current + index + dim.length;}",
"function(){return 0;}",
"function(a,b){return a + b;}"
)
)
)
.setPostAggregatorSpecs(
Arrays.<PostAggregator>asList(
new ArithmeticPostAggregator(
"idx_subpostagg",
"+",
Arrays.asList(
new FieldAccessPostAggregator("the_idx_subagg", "idx_subagg"),
new ConstantPostAggregator("thousand", 1000, 1000)
)
)
)
)
.setHavingSpec(
new HavingSpec()
{
@Override
public boolean eval(Row row)
{
return (row.getFloatMetric("idx_subpostagg") < 3800);
}
}
)
.addOrderByColumn("alias")
.setGranularity(QueryRunnerTestHelper.dayGran)
.build();
final GroupByQuery query = GroupByQuery
.builder()
.setDataSource(subquery)
.setQuerySegmentSpec(QueryRunnerTestHelper.firstToThird)
.setDimensions(Lists.<DimensionSpec>newArrayList(new DefaultDimensionSpec("alias", "alias")))
.setAggregatorSpecs(
Arrays.<AggregatorFactory>asList(
new LongSumAggregatorFactory("rows", "rows"),
new LongSumAggregatorFactory("idx", "idx_subpostagg"),
new DoubleSumAggregatorFactory("js_outer_agg", "js_agg")
)
)
.setPostAggregatorSpecs(
Arrays.<PostAggregator>asList(
new ArithmeticPostAggregator(
"idx", "+", Arrays.asList(
new FieldAccessPostAggregator("the_idx_agg", "idx"),
new ConstantPostAggregator("ten_thousand", 10000, 10000)
)
)
)
)
.setLimitSpec(
new DefaultLimitSpec(
Arrays.asList(
new OrderByColumnSpec(
"alias",
OrderByColumnSpec.Direction.DESCENDING
)
),
5
)
)
.setGranularity(QueryRunnerTestHelper.dayGran)
.build();
List<Row> expectedResults = Arrays.asList(
createExpectedRow("2011-04-01", "alias", "travel", "rows", 1L, "idx", 11119.0, "js_outer_agg", 123.92274475097656),
createExpectedRow("2011-04-01", "alias", "technology", "rows", 1L, "idx", 11078.0, "js_outer_agg", 82.62254333496094),
createExpectedRow("2011-04-01", "alias", "news", "rows", 1L, "idx", 11121.0, "js_outer_agg", 125.58358001708984),
createExpectedRow("2011-04-01", "alias", "health", "rows", 1L, "idx", 11120.0, "js_outer_agg", 124.13470458984375),
createExpectedRow("2011-04-01", "alias", "entertainment", "rows", 1L, "idx", 11158.0, "js_outer_agg", 162.74722290039062)
);
// Subqueries are handled by the ToolChest
Iterable<Row> results = runQuery(query);
TestHelper.assertExpectedObjects(expectedResults, results, "");
}
  private Iterable<Row> runQuery(GroupByQuery query)
  {