diff --git a/client/src/main/java/com/metamx/druid/query/group/GroupByQuery.java b/client/src/main/java/com/metamx/druid/query/group/GroupByQuery.java index 200e207b867..3c48c4cecbc 100644 --- a/client/src/main/java/com/metamx/druid/query/group/GroupByQuery.java +++ b/client/src/main/java/com/metamx/druid/query/group/GroupByQuery.java @@ -22,11 +22,14 @@ package com.metamx.druid.query.group; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.base.Function; +import com.google.common.base.Functions; import com.google.common.base.Preconditions; +import com.google.common.base.Predicate; import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; import com.metamx.common.ISE; import com.metamx.common.guava.Sequence; +import com.metamx.common.guava.Sequences; import com.metamx.druid.BaseQuery; import com.metamx.druid.Query; import com.metamx.druid.QueryGranularity; @@ -37,13 +40,15 @@ import com.metamx.druid.query.Queries; import com.metamx.druid.query.dimension.DefaultDimensionSpec; import com.metamx.druid.query.dimension.DimensionSpec; import com.metamx.druid.query.filter.DimFilter; -import com.metamx.druid.query.group.limit.DefaultLimitSpec; -import com.metamx.druid.query.group.limit.LimitSpec; -import com.metamx.druid.query.group.limit.NoopLimitSpec; -import com.metamx.druid.query.group.limit.OrderByColumnSpec; +import com.metamx.druid.query.group.having.HavingSpec; +import com.metamx.druid.query.group.orderby.DefaultLimitSpec; +import com.metamx.druid.query.group.orderby.LimitSpec; +import com.metamx.druid.query.group.orderby.NoopLimitSpec; +import com.metamx.druid.query.group.orderby.OrderByColumnSpec; import com.metamx.druid.query.segment.LegacySegmentSpec; import com.metamx.druid.query.segment.QuerySegmentSpec; +import javax.annotation.Nullable; import java.util.List; import java.util.Map; @@ -57,6 +62,7 @@ public class GroupByQuery extends BaseQuery } private final LimitSpec limitSpec; + private final HavingSpec havingSpec; private final DimFilter dimFilter; private final QueryGranularity granularity; private final List dimensions; @@ -69,34 +75,88 @@ public class GroupByQuery extends BaseQuery public GroupByQuery( @JsonProperty("dataSource") String dataSource, @JsonProperty("intervals") QuerySegmentSpec querySegmentSpec, - @JsonProperty("limitSpec") LimitSpec limitSpec, @JsonProperty("filter") DimFilter dimFilter, @JsonProperty("granularity") QueryGranularity granularity, @JsonProperty("dimensions") List dimensions, @JsonProperty("aggregations") List aggregatorSpecs, @JsonProperty("postAggregations") List postAggregatorSpecs, + @JsonProperty("having") HavingSpec havingSpec, + @JsonProperty("limitSpec") LimitSpec limitSpec, + @JsonProperty("orderBy") LimitSpec orderBySpec, @JsonProperty("context") Map context ) { super(dataSource, querySegmentSpec, context); - this.limitSpec = (limitSpec == null) ? new NoopLimitSpec() : limitSpec; this.dimFilter = dimFilter; this.granularity = granularity; this.dimensions = dimensions == null ? ImmutableList.of() : dimensions; this.aggregatorSpecs = aggregatorSpecs; this.postAggregatorSpecs = postAggregatorSpecs == null ? ImmutableList.of() : postAggregatorSpecs; + this.havingSpec = havingSpec; + this.limitSpec = (limitSpec == null) ? (orderBySpec == null ? new NoopLimitSpec() : orderBySpec) : limitSpec; Preconditions.checkNotNull(this.granularity, "Must specify a granularity"); Preconditions.checkNotNull(this.aggregatorSpecs, "Must specify at least one aggregator"); Queries.verifyAggregations(this.aggregatorSpecs, this.postAggregatorSpecs); - orderByLimitFn = this.limitSpec.build(this.dimensions, this.aggregatorSpecs, this.postAggregatorSpecs); + Function, Sequence> postProcFn = + this.limitSpec.build(this.dimensions, this.aggregatorSpecs, this.postAggregatorSpecs); + + if (havingSpec != null) { + postProcFn = Functions.compose( + new Function, Sequence>() + { + @Override + public Sequence apply(@Nullable Sequence input) + { + return Sequences.filter( + input, + new Predicate() + { + @Override + public boolean apply(@Nullable Row input) + { + return GroupByQuery.this.havingSpec.eval(input); + } + } + ); + } + }, + postProcFn + ); + } + + orderByLimitFn = postProcFn; } - @JsonProperty - public LimitSpec getLimitSpec() + /** + * A private constructor that avoids all of the various state checks. Used by the with*() methods where the checks + * have already passed in order for the object to exist. + */ + private GroupByQuery( + String dataSource, + QuerySegmentSpec querySegmentSpec, + DimFilter dimFilter, + QueryGranularity granularity, + List dimensions, + List aggregatorSpecs, + List postAggregatorSpecs, + HavingSpec havingSpec, + LimitSpec orderBySpec, + Function, Sequence> orderByLimitFn, + Map context + ) { - return limitSpec; + super(dataSource, querySegmentSpec, context); + + this.dimFilter = dimFilter; + this.granularity = granularity; + this.dimensions = dimensions; + this.aggregatorSpecs = aggregatorSpecs; + this.postAggregatorSpecs = postAggregatorSpecs; + this.havingSpec = havingSpec; + this.limitSpec = orderBySpec; + this.orderByLimitFn = orderByLimitFn; } @JsonProperty("filter") @@ -129,6 +189,18 @@ public class GroupByQuery extends BaseQuery return postAggregatorSpecs; } + @JsonProperty("having") + public HavingSpec getHavingSpec() + { + return havingSpec; + } + + @JsonProperty + public LimitSpec getOrderBy() + { + return limitSpec; + } + @Override public boolean hasFilters() { @@ -152,12 +224,14 @@ public class GroupByQuery extends BaseQuery return new GroupByQuery( getDataSource(), getQuerySegmentSpec(), - limitSpec, dimFilter, granularity, dimensions, aggregatorSpecs, postAggregatorSpecs, + havingSpec, + limitSpec, + orderByLimitFn, computeOverridenContext(contextOverride) ); } @@ -168,12 +242,14 @@ public class GroupByQuery extends BaseQuery return new GroupByQuery( getDataSource(), spec, - limitSpec, dimFilter, granularity, dimensions, aggregatorSpecs, postAggregatorSpecs, + havingSpec, + limitSpec, + orderByLimitFn, getContext() ); } @@ -187,6 +263,8 @@ public class GroupByQuery extends BaseQuery private List dimensions; private List aggregatorSpecs; private List postAggregatorSpecs; + private HavingSpec havingSpec; + private Map context; private LimitSpec limitSpec = null; @@ -205,6 +283,9 @@ public class GroupByQuery extends BaseQuery dimensions = builder.dimensions; aggregatorSpecs = builder.aggregatorSpecs; postAggregatorSpecs = builder.postAggregatorSpecs; + havingSpec = builder.havingSpec; + limit = builder.limit; + context = builder.context; } @@ -264,7 +345,7 @@ public class GroupByQuery extends BaseQuery private void ensureFluentLimitsNotSet() { - if (! (limit == Integer.MAX_VALUE && orderByColumnSpecs.isEmpty()) ) { + if (!(limit == Integer.MAX_VALUE && orderByColumnSpecs.isEmpty())) { throw new ISE("Ambiguous build, limit[%s] or columnSpecs[%s] already set.", limit, orderByColumnSpecs); } } @@ -351,6 +432,20 @@ public class GroupByQuery extends BaseQuery return this; } + public Builder setHavingSpec(HavingSpec havingSpec) + { + this.havingSpec = havingSpec; + + return this; + } + + public Builder setLimit(Integer limit) + { + this.limit = limit; + + return this; + } + public Builder copy() { return new Builder(this); @@ -361,20 +456,21 @@ public class GroupByQuery extends BaseQuery final LimitSpec theLimitSpec; if (limitSpec == null) { theLimitSpec = new DefaultLimitSpec(orderByColumnSpecs, limit); - } - else { + } else { theLimitSpec = limitSpec; } return new GroupByQuery( dataSource, querySegmentSpec, - theLimitSpec, dimFilter, granularity, dimensions, aggregatorSpecs, postAggregatorSpecs, + havingSpec, + null, + theLimitSpec, context ); } diff --git a/client/src/main/java/com/metamx/druid/query/group/having/AlwaysHavingSpec.java b/client/src/main/java/com/metamx/druid/query/group/having/AlwaysHavingSpec.java new file mode 100644 index 00000000000..3fa27b99ace --- /dev/null +++ b/client/src/main/java/com/metamx/druid/query/group/having/AlwaysHavingSpec.java @@ -0,0 +1,14 @@ +package com.metamx.druid.query.group.having; + +import com.metamx.druid.input.Row; + +/** + */ +public class AlwaysHavingSpec implements HavingSpec +{ + @Override + public boolean eval(Row row) + { + return true; + } +} diff --git a/client/src/main/java/com/metamx/druid/query/group/having/AndHavingSpec.java b/client/src/main/java/com/metamx/druid/query/group/having/AndHavingSpec.java new file mode 100644 index 00000000000..f474bc67c95 --- /dev/null +++ b/client/src/main/java/com/metamx/druid/query/group/having/AndHavingSpec.java @@ -0,0 +1,75 @@ +package com.metamx.druid.query.group.having; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.collect.ImmutableList; +import com.metamx.druid.input.Row; + +import java.util.List; + +/** + * The logical "and" operator for the "having" clause. + */ +public class AndHavingSpec implements HavingSpec +{ + private List havingSpecs; + + @JsonCreator + public AndHavingSpec(@JsonProperty("havingSpecs") List havingSpecs) + { + this.havingSpecs = havingSpecs == null ? ImmutableList.of() : havingSpecs; + } + + @JsonProperty("havingSpecs") + public List getHavingSpecs() + { + return havingSpecs; + } + + @Override + public boolean eval(Row row) + { + for (HavingSpec havingSpec : havingSpecs) { + if (!havingSpec.eval(row)) { + return false; + } + } + + return true; + } + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + AndHavingSpec that = (AndHavingSpec) o; + + if (havingSpecs != null ? !havingSpecs.equals(that.havingSpecs) : that.havingSpecs != null) { + return false; + } + + return true; + } + + @Override + public int hashCode() + { + return havingSpecs != null ? havingSpecs.hashCode() : 0; + } + + @Override + public String toString() + { + final StringBuilder sb = new StringBuilder(); + sb.append("AndHavingSpec"); + sb.append("{havingSpecs=").append(havingSpecs); + sb.append('}'); + return sb.toString(); + } +} diff --git a/client/src/main/java/com/metamx/druid/query/group/having/EqualToHavingSpec.java b/client/src/main/java/com/metamx/druid/query/group/having/EqualToHavingSpec.java new file mode 100644 index 00000000000..f7e5060e4e9 --- /dev/null +++ b/client/src/main/java/com/metamx/druid/query/group/having/EqualToHavingSpec.java @@ -0,0 +1,94 @@ +package com.metamx.druid.query.group.having; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.metamx.druid.input.Row; + +/** + * The "=" operator in a "having" clause. This is similar to SQL's "having aggregation = value", + * except that in SQL an aggregation is an expression instead of an aggregation name as in Druid. + */ +public class EqualToHavingSpec implements HavingSpec +{ + private String aggregationName; + private Number value; + + @JsonCreator + public EqualToHavingSpec( + @JsonProperty("aggregation") String aggName, + @JsonProperty("value") Number value + ) + { + this.aggregationName = aggName; + this.value = value; + } + + @JsonProperty("value") + public Number getValue() + { + return value; + } + + @JsonProperty("aggregation") + public String getAggregationName() + { + return aggregationName; + } + + @Override + public boolean eval(Row row) + { + float metricValue = row.getFloatMetric(aggregationName); + + return Float.compare(value.floatValue(), metricValue) == 0; + } + + /** + * This method treats internal value as double mainly for ease of test. + */ + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + EqualToHavingSpec that = (EqualToHavingSpec) o; + + if (aggregationName != null ? !aggregationName.equals(that.aggregationName) : that.aggregationName != null) { + return false; + } + + if (value != null && that.value != null) { + return Double.compare(value.doubleValue(), that.value.doubleValue()) == 0; + } + + if (value == null && that.value == null) { + return true; + } + + return false; + } + + @Override + public int hashCode() + { + int result = aggregationName != null ? aggregationName.hashCode() : 0; + result = 31 * result + (value != null ? value.hashCode() : 0); + return result; + } + + @Override + public String toString() + { + final StringBuilder sb = new StringBuilder(); + sb.append("EqualToHavingSpec"); + sb.append("{aggregationName='").append(aggregationName).append('\''); + sb.append(", value=").append(value); + sb.append('}'); + return sb.toString(); + } +} diff --git a/client/src/main/java/com/metamx/druid/query/group/having/GreaterThanHavingSpec.java b/client/src/main/java/com/metamx/druid/query/group/having/GreaterThanHavingSpec.java new file mode 100644 index 00000000000..a64f5ee5050 --- /dev/null +++ b/client/src/main/java/com/metamx/druid/query/group/having/GreaterThanHavingSpec.java @@ -0,0 +1,90 @@ +package com.metamx.druid.query.group.having; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.metamx.druid.input.Row; + +/** + * The ">" operator in a "having" clause. This is similar to SQL's "having aggregation > value", + * except that an aggregation in SQL is an expression instead of an aggregation name as in Druid. + */ +public class GreaterThanHavingSpec implements HavingSpec +{ + private String aggregationName; + private Number value; + + @JsonCreator + public GreaterThanHavingSpec( + @JsonProperty("aggregation") String aggName, + @JsonProperty("value") Number value + ) + { + this.aggregationName = aggName; + this.value = value; + } + + @JsonProperty("aggregation") + public String getAggregationName() + { + return aggregationName; + } + + @JsonProperty("value") + public Number getValue() + { + return value; + } + + @Override + public boolean eval(Row row) + { + float metricValue = row.getFloatMetric(aggregationName); + + return Float.compare(metricValue, value.floatValue()) > 0; + } + + /** + * This method treats internal value as double mainly for ease of test. + */ + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + GreaterThanHavingSpec that = (GreaterThanHavingSpec) o; + + if (value != null && that.value != null) { + return Double.compare(value.doubleValue(), that.value.doubleValue()) == 0; + } + + if (value == null && that.value == null) { + return true; + } + + return false; + } + + @Override + public int hashCode() + { + int result = aggregationName != null ? aggregationName.hashCode() : 0; + result = 31 * result + (value != null ? value.hashCode() : 0); + return result; + } + + @Override + public String toString() + { + final StringBuilder sb = new StringBuilder(); + sb.append("GreaterThanHavingSpec"); + sb.append("{aggregationName='").append(aggregationName).append('\''); + sb.append(", value=").append(value); + sb.append('}'); + return sb.toString(); + } +} diff --git a/client/src/main/java/com/metamx/druid/query/group/having/HavingSpec.java b/client/src/main/java/com/metamx/druid/query/group/having/HavingSpec.java new file mode 100644 index 00000000000..b40ae662534 --- /dev/null +++ b/client/src/main/java/com/metamx/druid/query/group/having/HavingSpec.java @@ -0,0 +1,51 @@ +package com.metamx.druid.query.group.having; + +import com.fasterxml.jackson.annotation.JsonSubTypes; +import com.fasterxml.jackson.annotation.JsonTypeInfo; +import com.metamx.druid.input.Row; + +/** + * A "having" clause that filters aggregated value. This is similar to SQL's "having" + * clause. + */ +@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type", defaultImpl = AlwaysHavingSpec.class) +@JsonSubTypes(value = { + @JsonSubTypes.Type(name = "and", value = AndHavingSpec.class), + @JsonSubTypes.Type(name = "or", value = OrHavingSpec.class), + @JsonSubTypes.Type(name = "not", value = NotHavingSpec.class), + @JsonSubTypes.Type(name = "greaterThan", value = GreaterThanHavingSpec.class), + @JsonSubTypes.Type(name = "lessThan", value = LessThanHavingSpec.class), + @JsonSubTypes.Type(name = "equalTo", value = EqualToHavingSpec.class) +}) +public interface HavingSpec +{ + /** + * Evaluates if a given row satisfies the having spec. + * + * @param row A Row of data that may contain aggregated values + * + * @return true if the given row satisfies the having spec. False otherwise. + * + * @see Row + */ + public boolean eval(Row row); + + // Atoms for easy combination, but for now they are mostly useful + // for testing. + /** + * A "having" spec that always evaluates to false + */ + public static final HavingSpec NEVER = new HavingSpec() + { + @Override + public boolean eval(Row row) + { + return false; + } + }; + + /** + * A "having" spec that always evaluates to true + */ + public static final HavingSpec ALWAYS = new AlwaysHavingSpec(); +} diff --git a/client/src/main/java/com/metamx/druid/query/group/having/LessThanHavingSpec.java b/client/src/main/java/com/metamx/druid/query/group/having/LessThanHavingSpec.java new file mode 100644 index 00000000000..86ff4dfaf9c --- /dev/null +++ b/client/src/main/java/com/metamx/druid/query/group/having/LessThanHavingSpec.java @@ -0,0 +1,89 @@ +package com.metamx.druid.query.group.having; + +import com.fasterxml.jackson.annotation.JsonProperty; +import com.metamx.druid.input.Row; + +/** + * The "<" operator in a "having" clause. This is similar to SQL's "having aggregation < value", + * except that an aggregation in SQL is an expression instead of an aggregation name as in Druid. + */ +public class LessThanHavingSpec implements HavingSpec +{ + private String aggregationName; + private Number value; + + public LessThanHavingSpec + ( + @JsonProperty("aggregation") String aggName, + @JsonProperty("value") Number value + ) + { + this.aggregationName = aggName; + this.value = value; + } + + @JsonProperty("aggregation") + public String getAggregationName() + { + return aggregationName; + } + + @JsonProperty("value") + public Number getValue() + { + return value; + } + + @Override + public boolean eval(Row row) + { + float metricValue = row.getFloatMetric(aggregationName); + + return Float.compare(metricValue, value.floatValue()) < 0; + } + + /** + * This method treats internal value as double mainly for ease of test. + */ + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + LessThanHavingSpec that = (LessThanHavingSpec) o; + + if (value != null && that.value != null) { + return Double.compare(value.doubleValue(), that.value.doubleValue()) == 0; + } + + if (value == null && that.value == null) { + return true; + } + + return false; + } + + @Override + public int hashCode() + { + int result = aggregationName != null ? aggregationName.hashCode() : 0; + result = 31 * result + (value != null ? value.hashCode() : 0); + return result; + } + + @Override + public String toString() + { + final StringBuilder sb = new StringBuilder(); + sb.append("LessThanHavingSpec"); + sb.append("{aggregationName='").append(aggregationName).append('\''); + sb.append(", value=").append(value); + sb.append('}'); + return sb.toString(); + } +} diff --git a/client/src/main/java/com/metamx/druid/query/group/having/NotHavingSpec.java b/client/src/main/java/com/metamx/druid/query/group/having/NotHavingSpec.java new file mode 100644 index 00000000000..9324268fbda --- /dev/null +++ b/client/src/main/java/com/metamx/druid/query/group/having/NotHavingSpec.java @@ -0,0 +1,66 @@ +package com.metamx.druid.query.group.having; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.metamx.druid.input.Row; + +/** + * The logical "not" operator for the "having" clause. + */ +public class NotHavingSpec implements HavingSpec +{ + private HavingSpec havingSpec; + + @JsonCreator + public NotHavingSpec(@JsonProperty("havingSpec") HavingSpec havingSpec) + { + this.havingSpec = havingSpec; + } + + @JsonProperty("havingSpec") + public HavingSpec getHavingSpec() + { + return havingSpec; + } + + @Override + public boolean eval(Row row) + { + return !havingSpec.eval(row); + } + + @Override + public String toString() + { + final StringBuilder sb = new StringBuilder(); + sb.append("NotHavingSpec"); + sb.append("{havingSpec=").append(havingSpec); + sb.append('}'); + return sb.toString(); + } + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + NotHavingSpec that = (NotHavingSpec) o; + + if (havingSpec != null ? !havingSpec.equals(that.havingSpec) : that.havingSpec != null) { + return false; + } + + return true; + } + + @Override + public int hashCode() + { + return havingSpec != null ? havingSpec.hashCode() : 0; + } +} diff --git a/client/src/main/java/com/metamx/druid/query/group/having/OrHavingSpec.java b/client/src/main/java/com/metamx/druid/query/group/having/OrHavingSpec.java new file mode 100644 index 00000000000..76c8920182f --- /dev/null +++ b/client/src/main/java/com/metamx/druid/query/group/having/OrHavingSpec.java @@ -0,0 +1,67 @@ +package com.metamx.druid.query.group.having; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.collect.ImmutableList; +import com.metamx.druid.input.Row; + +import java.util.List; + +/** + * The logical "or" operator for the "having" clause. + */ +public class OrHavingSpec implements HavingSpec +{ + private List havingSpecs; + + @JsonCreator + public OrHavingSpec(@JsonProperty("havingSpecs") List havingSpecs) { + this.havingSpecs = havingSpecs == null ? ImmutableList.of() : havingSpecs; + } + + @JsonProperty("havingSpecs") + public List getHavingSpecs(){ + return havingSpecs; + } + + @Override + public boolean eval(Row row) + { + for(HavingSpec havingSpec: havingSpecs) { + if(havingSpec.eval(row)){ + return true; + } + } + + return false; + } + + @Override + public boolean equals(Object o) + { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + + OrHavingSpec that = (OrHavingSpec) o; + + if (havingSpecs != null ? !havingSpecs.equals(that.havingSpecs) : that.havingSpecs != null) return false; + + return true; + } + + @Override + public int hashCode() + { + return havingSpecs != null ? havingSpecs.hashCode() : 0; + } + + @Override + public String toString() + { + final StringBuilder sb = new StringBuilder(); + sb.append("OrHavingSpec"); + sb.append("{havingSpecs=").append(havingSpecs); + sb.append('}'); + return sb.toString(); + } +} diff --git a/client/src/main/java/com/metamx/druid/query/group/limit/DefaultLimitSpec.java b/client/src/main/java/com/metamx/druid/query/group/orderby/DefaultLimitSpec.java similarity index 73% rename from client/src/main/java/com/metamx/druid/query/group/limit/DefaultLimitSpec.java rename to client/src/main/java/com/metamx/druid/query/group/orderby/DefaultLimitSpec.java index 545c3798785..66049f925f3 100644 --- a/client/src/main/java/com/metamx/druid/query/group/limit/DefaultLimitSpec.java +++ b/client/src/main/java/com/metamx/druid/query/group/orderby/DefaultLimitSpec.java @@ -17,13 +17,14 @@ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. */ -package com.metamx.druid.query.group.limit; +package com.metamx.druid.query.group.orderby; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.base.Function; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; +import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.Ordering; import com.google.common.primitives.Longs; @@ -35,6 +36,8 @@ import com.metamx.druid.aggregation.post.PostAggregator; import com.metamx.druid.input.Row; import com.metamx.druid.query.dimension.DimensionSpec; +import javax.annotation.Nullable; +import java.util.ArrayList; import java.util.Comparator; import java.util.List; import java.util.Map; @@ -55,7 +58,7 @@ public class DefaultLimitSpec implements LimitSpec this.columns = (columns == null) ? ImmutableList.of() : columns; this.limit = (limit == null) ? Integer.MAX_VALUE : limit; - Preconditions.checkState(limit > 0, "limit[%s] must be >0", limit); + Preconditions.checkArgument(this.limit > 0, "limit[%s] must be >0", limit); } @JsonProperty @@ -75,20 +78,22 @@ public class DefaultLimitSpec implements LimitSpec List dimensions, List aggs, List postAggs ) { - // Materialize the Comparator first for fast-fail error checking. - final Comparator comparator = makeComparator(dimensions, aggs, postAggs); + if (columns.isEmpty()) { + return new LimitingFn(limit); + } - return new Function, Sequence>() - { - @Override - public Sequence apply(Sequence input) - { - return Sequences.limit(Sequences.sort(input, comparator), limit); - } - }; + // Materialize the Comparator first for fast-fail error checking. + final Ordering ordering = makeComparator(dimensions, aggs, postAggs); + + if (limit == Integer.MAX_VALUE) { + return new SortingFn(ordering); + } + else { + return new TopNFunction(ordering, limit); + } } - private Comparator makeComparator( + private Ordering makeComparator( List dimensions, List aggs, List postAggs ) { @@ -174,4 +179,57 @@ public class DefaultLimitSpec implements LimitSpec ", limit=" + limit + '}'; } + + private static class LimitingFn implements Function, Sequence> + { + private int limit; + + public LimitingFn(int limit) + { + this.limit = limit; + } + + @Override + public Sequence apply( + @Nullable Sequence input + ) + { + return Sequences.limit(input, limit); + } + } + + private static class SortingFn implements Function, Sequence> + { + private final Ordering ordering; + + public SortingFn(Ordering ordering) {this.ordering = ordering;} + + @Override + public Sequence apply(@Nullable Sequence input) + { + return Sequences.sort(input, ordering); + } + } + + private static class TopNFunction implements Function, Sequence> + { + private final TopNSorter sorter; + private final int limit; + + public TopNFunction(Ordering ordering, int limit) + { + this.limit = limit; + + this.sorter = new TopNSorter(ordering); + } + + @Override + public Sequence apply( + @Nullable Sequence input + ) + { + final ArrayList materializedList = Sequences.toList(input, Lists.newArrayList()); + return Sequences.simple(sorter.toTopN(materializedList, limit)); + } + } } diff --git a/client/src/main/java/com/metamx/druid/query/group/limit/LimitSpec.java b/client/src/main/java/com/metamx/druid/query/group/orderby/LimitSpec.java similarity index 97% rename from client/src/main/java/com/metamx/druid/query/group/limit/LimitSpec.java rename to client/src/main/java/com/metamx/druid/query/group/orderby/LimitSpec.java index 4522b4739c2..cee3c76e55d 100644 --- a/client/src/main/java/com/metamx/druid/query/group/limit/LimitSpec.java +++ b/client/src/main/java/com/metamx/druid/query/group/orderby/LimitSpec.java @@ -17,7 +17,7 @@ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. */ -package com.metamx.druid.query.group.limit; +package com.metamx.druid.query.group.orderby; import com.fasterxml.jackson.annotation.JsonSubTypes; import com.fasterxml.jackson.annotation.JsonTypeInfo; diff --git a/client/src/main/java/com/metamx/druid/query/group/limit/NoopLimitSpec.java b/client/src/main/java/com/metamx/druid/query/group/orderby/NoopLimitSpec.java similarity index 93% rename from client/src/main/java/com/metamx/druid/query/group/limit/NoopLimitSpec.java rename to client/src/main/java/com/metamx/druid/query/group/orderby/NoopLimitSpec.java index f9332f8754d..2493cd4cbf3 100644 --- a/client/src/main/java/com/metamx/druid/query/group/limit/NoopLimitSpec.java +++ b/client/src/main/java/com/metamx/druid/query/group/orderby/NoopLimitSpec.java @@ -1,4 +1,4 @@ -package com.metamx.druid.query.group.limit; +package com.metamx.druid.query.group.orderby; import com.google.common.base.Function; import com.google.common.base.Functions; diff --git a/client/src/main/java/com/metamx/druid/query/group/limit/OrderByColumnSpec.java b/client/src/main/java/com/metamx/druid/query/group/orderby/OrderByColumnSpec.java similarity index 70% rename from client/src/main/java/com/metamx/druid/query/group/limit/OrderByColumnSpec.java rename to client/src/main/java/com/metamx/druid/query/group/orderby/OrderByColumnSpec.java index ba9aa79fa87..6a02ec962f8 100644 --- a/client/src/main/java/com/metamx/druid/query/group/limit/OrderByColumnSpec.java +++ b/client/src/main/java/com/metamx/druid/query/group/orderby/OrderByColumnSpec.java @@ -1,12 +1,17 @@ -package com.metamx.druid.query.group.limit; +package com.metamx.druid.query.group.orderby; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.base.Function; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Lists; import com.metamx.common.IAE; import com.metamx.common.ISE; +import javax.annotation.Nullable; +import java.util.Arrays; +import java.util.List; import java.util.Map; /** @@ -52,6 +57,46 @@ public class OrderByColumnSpec } } + public static OrderByColumnSpec asc(String dimension) + { + return new OrderByColumnSpec(dimension, Direction.ASCENDING); + } + + public static List ascending(String... dimension) + { + return Lists.transform( + Arrays.asList(dimension), + new Function() + { + @Override + public OrderByColumnSpec apply(@Nullable String input) + { + return asc(input); + } + } + ); + } + + public static OrderByColumnSpec desc(String dimension) + { + return new OrderByColumnSpec(dimension, Direction.DESCENDING); + } + + public static List descending(String... dimension) + { + return Lists.transform( + Arrays.asList(dimension), + new Function() + { + @Override + public OrderByColumnSpec apply(@Nullable String input) + { + return desc(input); + } + } + ); + } + public OrderByColumnSpec( String dimension, Direction direction diff --git a/client/src/main/java/com/metamx/druid/query/group/orderby/OrderedPriorityQueueItems.java b/client/src/main/java/com/metamx/druid/query/group/orderby/OrderedPriorityQueueItems.java new file mode 100644 index 00000000000..5b5a3f48cb6 --- /dev/null +++ b/client/src/main/java/com/metamx/druid/query/group/orderby/OrderedPriorityQueueItems.java @@ -0,0 +1,43 @@ +package com.metamx.druid.query.group.orderby; + +import com.google.common.collect.MinMaxPriorityQueue; + +import java.util.Iterator; + +/** + * Utility class that supports iterating a priority queue in sorted order. + */ +class OrderedPriorityQueueItems implements Iterable +{ + private MinMaxPriorityQueue rows; + + public OrderedPriorityQueueItems(MinMaxPriorityQueue rows) + { + this.rows = rows; + } + + @Override + public Iterator iterator() + { + return new Iterator() { + + @Override + public boolean hasNext() + { + return !rows.isEmpty(); + } + + @Override + public T next() + { + return rows.poll(); + } + + @Override + public void remove() + { + throw new UnsupportedOperationException("Can't remove any item from an intermediary heap for orderBy/limit"); + } + }; + } +} diff --git a/client/src/main/java/com/metamx/druid/query/group/orderby/TopNSorter.java b/client/src/main/java/com/metamx/druid/query/group/orderby/TopNSorter.java new file mode 100644 index 00000000000..7160ba62074 --- /dev/null +++ b/client/src/main/java/com/metamx/druid/query/group/orderby/TopNSorter.java @@ -0,0 +1,40 @@ +package com.metamx.druid.query.group.orderby; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.MinMaxPriorityQueue; +import com.google.common.collect.Ordering; + +/** + * A utility class that sorts a list of comparable items in the given order, and keeps only the + * top N sorted items. + */ +public class TopNSorter +{ + private Ordering ordering; + + /** + * Constructs a sorter that will sort items with given ordering. + * @param ordering the order that this sorter instance will use for sorting + */ + public TopNSorter(Ordering ordering) + { + this.ordering = ordering; + } + + /** + * Sorts a list of rows and retain the top n items + * @param items the collections of items to be sorted + * @param n the number of items to be retained + * @return Top n items that are sorted in the order specified when this instance is constructed. + */ + public Iterable toTopN(Iterable items, int n) + { + if(n <= 0) { + return ImmutableList.of(); + } + + MinMaxPriorityQueue queue = MinMaxPriorityQueue.orderedBy(ordering).maximumSize(n).create(items); + + return new OrderedPriorityQueueItems(queue); + } +} diff --git a/client/src/test/java/com/metamx/druid/query/group/having/HavingSpecTest.java b/client/src/test/java/com/metamx/druid/query/group/having/HavingSpecTest.java new file mode 100644 index 00000000000..28c91878e57 --- /dev/null +++ b/client/src/test/java/com/metamx/druid/query/group/having/HavingSpecTest.java @@ -0,0 +1,227 @@ +package com.metamx.druid.query.group.having; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Lists; +import com.metamx.druid.input.MapBasedInputRow; +import com.metamx.druid.input.Row; +import com.metamx.druid.jackson.DefaultObjectMapper; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + + +public class HavingSpecTest +{ + private static final Row ROW = new MapBasedInputRow(0, new ArrayList(), ImmutableMap.of("metric", (Object)Float.valueOf(10))); + + @Test + public void testHavingClauseSerde() throws Exception { + List havings = Arrays.asList( + new GreaterThanHavingSpec("agg", Double.valueOf(1.3)), + new OrHavingSpec( + Arrays.asList( + new LessThanHavingSpec("lessAgg", Long.valueOf(1L)), + new NotHavingSpec(new EqualToHavingSpec("equalAgg", Double.valueOf(2))) + ) + ) + ); + + HavingSpec andHavingSpec = new AndHavingSpec(havings); + + Map notMap = ImmutableMap.of( + "type", "not", + "havingSpec", ImmutableMap.of("type", "equalTo", "aggregation", "equalAgg", "value", 2.0) + ); + + Map lessMap = ImmutableMap.of( + "type", "lessThan", + "aggregation", "lessAgg", + "value", 1 + ); + + Map greaterMap = ImmutableMap.of( + "type", "greaterThan", + "aggregation", "agg", + "value", 1.3 + ); + + Map orMap = ImmutableMap.of( + "type", "or", + "havingSpecs", ImmutableList.of(lessMap, notMap) + ); + + Map payloadMap = ImmutableMap.of( + "type", "and", + "havingSpecs", ImmutableList.of(greaterMap, orMap) + ); + + ObjectMapper mapper = new DefaultObjectMapper(); + assertEquals(andHavingSpec, mapper.convertValue(payloadMap, AndHavingSpec.class)); + } + + @Test + public void testGreaterThanHavingSpec() { + GreaterThanHavingSpec spec = new GreaterThanHavingSpec("metric", 10.003); + assertFalse(spec.eval(ROW)); + + spec = new GreaterThanHavingSpec("metric", 10); + assertFalse(spec.eval(ROW)); + + spec = new GreaterThanHavingSpec("metric", 9); + assertTrue(spec.eval(ROW)); + } + + private MapBasedInputRow makeRow(long ts, String dim, int value) + { + List dimensions = Lists.newArrayList(dim); + Map metrics = ImmutableMap.of("metric", (Object) Float.valueOf(value)); + + return new MapBasedInputRow(ts, dimensions, metrics); + } + + @Test + public void testLessThanHavingSpec() { + LessThanHavingSpec spec = new LessThanHavingSpec("metric", 10); + assertFalse(spec.eval(ROW)); + + spec = new LessThanHavingSpec("metric", 11); + assertTrue(spec.eval(ROW)); + + spec = new LessThanHavingSpec("metric", 9); + assertFalse(spec.eval(ROW)); + } + + @Test + public void testEqualHavingSpec() { + EqualToHavingSpec spec = new EqualToHavingSpec("metric", 10); + assertTrue(spec.eval(ROW)); + + spec = new EqualToHavingSpec("metric", 9); + assertFalse(spec.eval(ROW)); + + spec = new EqualToHavingSpec("metric", 11); + assertFalse(spec.eval(ROW)); + } + + private static class CountingHavingSpec implements HavingSpec { + + private final AtomicInteger counter; + private final boolean value; + private CountingHavingSpec(AtomicInteger counter, boolean value) + { + this.counter = counter; + this.value = value; + } + + @Override + public boolean eval(Row row) + { + counter.incrementAndGet(); + return value; + } + } + + @Test + public void testAndHavingSpecShouldSupportShortcutEvaluation () { + AtomicInteger counter = new AtomicInteger(0); + AndHavingSpec spec = new AndHavingSpec(ImmutableList.of( + (HavingSpec)new CountingHavingSpec(counter, true), + new CountingHavingSpec(counter, false), + new CountingHavingSpec(counter, true), + new CountingHavingSpec(counter, false) + )); + + spec.eval(ROW); + + assertEquals(2, counter.get()); + } + + @Test + public void testAndHavingSpec () { + AtomicInteger counter = new AtomicInteger(0); + AndHavingSpec spec = new AndHavingSpec(ImmutableList.of( + (HavingSpec)new CountingHavingSpec(counter, true), + new CountingHavingSpec(counter, true), + new CountingHavingSpec(counter, true), + new CountingHavingSpec(counter, true) + )); + + spec.eval(ROW); + + assertEquals(4, counter.get()); + + counter.set(0); + spec = new AndHavingSpec(ImmutableList.of( + (HavingSpec)new CountingHavingSpec(counter, false), + new CountingHavingSpec(counter, true), + new CountingHavingSpec(counter, true), + new CountingHavingSpec(counter, true) + )); + + spec.eval(ROW); + + assertEquals(1, counter.get()); + } + + @Test + public void testOrHavingSpecSupportsShortcutEvaluation() { + AtomicInteger counter = new AtomicInteger(0); + OrHavingSpec spec = new OrHavingSpec(ImmutableList.of( + (HavingSpec)new CountingHavingSpec(counter, true), + new CountingHavingSpec(counter, true), + new CountingHavingSpec(counter, true), + new CountingHavingSpec(counter, false) + )); + + spec.eval(ROW); + + assertEquals(1, counter.get()); + } + + @Test + public void testOrHavingSpec () { + AtomicInteger counter = new AtomicInteger(0); + OrHavingSpec spec = new OrHavingSpec(ImmutableList.of( + (HavingSpec)new CountingHavingSpec(counter, false), + new CountingHavingSpec(counter, false), + new CountingHavingSpec(counter, false), + new CountingHavingSpec(counter, false) + )); + + spec.eval(ROW); + + assertEquals(4, counter.get()); + + counter.set(0); + spec = new OrHavingSpec(ImmutableList.of( + (HavingSpec)new CountingHavingSpec(counter, false), + new CountingHavingSpec(counter, false), + new CountingHavingSpec(counter, false), + new CountingHavingSpec(counter, true) + )); + + spec.eval(ROW); + + assertEquals(4, counter.get()); + } + + @Test + public void testNotHavingSepc() { + NotHavingSpec spec = new NotHavingSpec(HavingSpec.NEVER); + assertTrue(spec.eval(ROW)); + + spec = new NotHavingSpec(HavingSpec.ALWAYS); + assertFalse(spec.eval(ROW)); + + } +} diff --git a/client/src/test/java/com/metamx/druid/query/group/orderby/TopNSorterTest.java b/client/src/test/java/com/metamx/druid/query/group/orderby/TopNSorterTest.java new file mode 100644 index 00000000000..872155e5049 --- /dev/null +++ b/client/src/test/java/com/metamx/druid/query/group/orderby/TopNSorterTest.java @@ -0,0 +1,79 @@ +package com.metamx.druid.query.group.orderby; + +import com.google.common.base.Splitter; +import com.google.common.collect.Lists; +import com.google.common.collect.Ordering; +import org.junit.Assert; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Random; + + +@RunWith(Parameterized.class) +public class TopNSorterTest +{ + private static final long SEED = 2L; + private static final Ordering ASC = Ordering.natural(); + private static final Ordering DESC = Ordering.natural().reverse(); + + private static final List EMPTY = Collections.EMPTY_LIST; + private static final List SINGLE = Lists.newArrayList("a"); + private static final List RAW_ASC = Lists.newArrayList(Splitter.fixedLength(1).split("abcdefghijk")); + private static final List RAW_DESC = Lists.newArrayList(Splitter.fixedLength(1).split("kjihgfedcba")); + + private Ordering ordering; + private List rawInput; + private int limit; + + @Parameterized.Parameters + public static Collection makeTestData(){ + Object[][] data = new Object[][] { + { ASC, RAW_ASC, RAW_ASC.size() - 2}, + { ASC, RAW_ASC, RAW_ASC.size()}, + { ASC, RAW_ASC, RAW_ASC.size() + 2}, + { ASC, RAW_ASC, 0}, + { ASC, SINGLE, 0}, + { ASC, SINGLE, 1}, + { ASC, SINGLE, 2}, + { ASC, SINGLE, 3}, + { ASC, EMPTY, 0}, + { ASC, EMPTY, 1}, + { DESC, RAW_DESC, RAW_DESC.size() - 2}, + { DESC, RAW_DESC, RAW_DESC.size()}, + { DESC, RAW_DESC, RAW_DESC.size() + 2}, + { DESC, RAW_DESC, 0}, + { DESC, RAW_DESC, 0}, + { DESC, SINGLE, 1}, + { DESC, SINGLE, 2}, + { DESC, SINGLE, 3}, + { DESC, EMPTY, 0}, + { DESC, EMPTY, 1}, + }; + + return Arrays.asList(data); + } + + public TopNSorterTest(Ordering ordering, List rawInput, int limit){ + this.ordering = ordering; + this.rawInput = rawInput; + this.limit = limit; + } + + @Test + public void testOrderByWithLimit() + { + List expected = rawInput.subList(0, Math.min(limit, rawInput.size())); + List inputs = Lists.newArrayList(rawInput); + Collections.shuffle(inputs, new Random(2)); + + Iterable result = new TopNSorter(ordering).toTopN(inputs, limit); + + Assert.assertEquals(expected, Lists.newArrayList(result)); + } +} diff --git a/indexer/pom.xml b/indexer/pom.xml index 546a7eb21ff..d659c479cb6 100644 --- a/indexer/pom.xml +++ b/indexer/pom.xml @@ -120,12 +120,6 @@ ${project.build.directory}/${project.artifactId}-${project.version}-selfcontained.jar - - - org.codehaus.jackson - druid.org.codehaus.jackson - - diff --git a/pom.xml b/pom.xml index 00b7f403c1c..d01a7e4d1f7 100644 --- a/pom.xml +++ b/pom.xml @@ -209,16 +209,6 @@ jackson-jaxrs-json-provider 2.1.4 - - org.codehaus.jackson - jackson-core-asl - 1.9.11 - - - org.codehaus.jackson - jackson-mapper-asl - 1.9.11 - javax.inject javax.inject diff --git a/server/src/test/java/com/metamx/druid/TestHelper.java b/server/src/test/java/com/metamx/druid/TestHelper.java index 53d45192f7c..8a15a73f724 100644 --- a/server/src/test/java/com/metamx/druid/TestHelper.java +++ b/server/src/test/java/com/metamx/druid/TestHelper.java @@ -69,7 +69,7 @@ public class TestHelper assertResult(failMsg, expectedNext, next); assertResult( - String.format("%sSecond iterator bad, multiple calls to iterator() should be safe", failMsg), + String.format("%s: Second iterator bad, multiple calls to iterator() should be safe", failMsg), expectedNext, next2 ); @@ -77,20 +77,20 @@ public class TestHelper if (resultsIter.hasNext()) { Assert.fail( - String.format("%sExpected resultsIter to be exhausted, next element was %s", failMsg, resultsIter.next()) + String.format("%s: Expected resultsIter to be exhausted, next element was %s", failMsg, resultsIter.next()) ); } if (resultsIter2.hasNext()) { Assert.fail( - String.format("%sExpected resultsIter2 to be exhausted, next element was %s", failMsg, resultsIter.next()) + String.format("%s: Expected resultsIter2 to be exhausted, next element was %s", failMsg, resultsIter.next()) ); } if (expectedResultsIter.hasNext()) { Assert.fail( String.format( - "%sExpected expectedResultsIter to be exhausted, next element was %s", failMsg, expectedResultsIter.next() + "%s: Expected expectedResultsIter to be exhausted, next element was %s", failMsg, expectedResultsIter.next() ) ); } @@ -109,7 +109,7 @@ public class TestHelper Assert.assertEquals(failMsg, expectedNext, next); Assert.assertEquals( - String.format("%sSecond iterator bad, multiple calls to iterator() should be safe", failMsg), + String.format("%s: Second iterator bad, multiple calls to iterator() should be safe", failMsg), expectedNext, next2 ); @@ -117,20 +117,20 @@ public class TestHelper if (resultsIter.hasNext()) { Assert.fail( - String.format("%sExpected resultsIter to be exhausted, next element was %s", failMsg, resultsIter.next()) + String.format("%s: Expected resultsIter to be exhausted, next element was %s", failMsg, resultsIter.next()) ); } if (resultsIter2.hasNext()) { Assert.fail( - String.format("%sExpected resultsIter2 to be exhausted, next element was %s", failMsg, resultsIter.next()) + String.format("%s: Expected resultsIter2 to be exhausted, next element was %s", failMsg, resultsIter.next()) ); } if (expectedResultsIter.hasNext()) { Assert.fail( String.format( - "%sExpected expectedResultsIter to be exhausted, next element was %s", failMsg, expectedResultsIter.next() + "%s: Expected expectedResultsIter to be exhausted, next element was %s", failMsg, expectedResultsIter.next() ) ); } diff --git a/server/src/test/java/com/metamx/druid/query/group/GroupByQueryRunnerTest.java b/server/src/test/java/com/metamx/druid/query/group/GroupByQueryRunnerTest.java index 7dfc83f6f4f..a6e07b52d6e 100644 --- a/server/src/test/java/com/metamx/druid/query/group/GroupByQueryRunnerTest.java +++ b/server/src/test/java/com/metamx/druid/query/group/GroupByQueryRunnerTest.java @@ -22,9 +22,12 @@ package com.metamx.druid.query.group; import com.google.common.base.Function; import com.google.common.base.Preconditions; import com.google.common.base.Supplier; +import com.google.common.collect.ImmutableList; import com.google.common.collect.Iterables; import com.google.common.collect.Lists; import com.google.common.collect.Maps; +import com.google.common.collect.Ordering; +import com.metamx.common.guava.Accumulator; import com.metamx.common.guava.Sequence; import com.metamx.common.guava.Sequences; import com.metamx.druid.PeriodGranularity; @@ -41,7 +44,12 @@ import com.metamx.druid.query.QueryRunner; import com.metamx.druid.query.QueryRunnerTestHelper; import com.metamx.druid.query.dimension.DefaultDimensionSpec; import com.metamx.druid.query.dimension.DimensionSpec; -import com.metamx.druid.query.group.limit.OrderByColumnSpec; +import com.metamx.druid.query.group.having.EqualToHavingSpec; +import com.metamx.druid.query.group.having.GreaterThanHavingSpec; +import com.metamx.druid.query.group.having.OrHavingSpec; +import com.metamx.druid.query.group.orderby.DefaultLimitSpec; +import com.metamx.druid.query.group.orderby.LimitSpec; +import com.metamx.druid.query.group.orderby.OrderByColumnSpec; import com.metamx.druid.query.segment.MultipleIntervalSegmentSpec; import org.joda.time.DateTime; import org.joda.time.DateTimeZone; @@ -56,6 +64,7 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.util.Arrays; import java.util.Collection; +import java.util.Comparator; import java.util.List; import java.util.Map; @@ -69,28 +78,30 @@ public class GroupByQueryRunnerTest public static Collection constructorFeeder() throws IOException { final GroupByQueryRunnerFactory factory = new GroupByQueryRunnerFactory( - new GroupByQueryEngine( - new GroupByQueryEngineConfig() - { - @Override - public int getMaxIntermediateRows() - { - return 10000; - } - }, - new StupidPool( - new Supplier() - { - @Override - public ByteBuffer get() - { - return ByteBuffer.allocate(1024 * 1024); - } - } - ) - ), - new GroupByQueryRunnerFactoryConfig(){} - ); + new GroupByQueryEngine( + new GroupByQueryEngineConfig() + { + @Override + public int getMaxIntermediateRows() + { + return 10000; + } + }, + new StupidPool( + new Supplier() + { + @Override + public ByteBuffer get() + { + return ByteBuffer.allocate(1024 * 1024); + } + } + ) + ), + new GroupByQueryRunnerFactoryConfig() + { + } + ); return Lists.newArrayList( Iterables.transform( @@ -106,13 +117,15 @@ public class GroupByQueryRunnerTest ); } - public GroupByQueryRunnerTest(GroupByQueryRunnerFactory factory, QueryRunner runner) { + public GroupByQueryRunnerTest(GroupByQueryRunnerFactory factory, QueryRunner runner) + { this.factory = factory; this.runner = runner; } @Test - public void testGroupBy() { + public void testGroupBy() + { GroupByQuery query = GroupByQuery .builder() .setDataSource(QueryRunnerTestHelper.dataSource) @@ -127,35 +140,36 @@ public class GroupByQueryRunnerTest .setGranularity(QueryRunnerTestHelper.dayGran) .build(); - List expectedResults = Arrays.asList( - createExpectedRow("2011-04-01", "alias", "automotive", "rows", 1L, "idx", 135L), - createExpectedRow("2011-04-01", "alias", "business", "rows", 1L, "idx", 118L), - createExpectedRow("2011-04-01", "alias", "entertainment", "rows", 1L, "idx", 158L), - createExpectedRow("2011-04-01", "alias", "health", "rows", 1L, "idx", 120L), - createExpectedRow("2011-04-01", "alias", "mezzanine", "rows", 3L, "idx", 2870L), - createExpectedRow("2011-04-01", "alias", "news", "rows", 1L, "idx", 121L), - createExpectedRow("2011-04-01", "alias", "premium", "rows", 3L, "idx", 2900L), - createExpectedRow("2011-04-01", "alias", "technology", "rows", 1L, "idx", 78L), - createExpectedRow("2011-04-01", "alias", "travel", "rows", 1L, "idx", 119L), + List expectedResults = Arrays.asList( + createExpectedRow("2011-04-01", "alias", "automotive", "rows", 1L, "idx", 135L), + createExpectedRow("2011-04-01", "alias", "business", "rows", 1L, "idx", 118L), + createExpectedRow("2011-04-01", "alias", "entertainment", "rows", 1L, "idx", 158L), + createExpectedRow("2011-04-01", "alias", "health", "rows", 1L, "idx", 120L), + createExpectedRow("2011-04-01", "alias", "mezzanine", "rows", 3L, "idx", 2870L), + createExpectedRow("2011-04-01", "alias", "news", "rows", 1L, "idx", 121L), + createExpectedRow("2011-04-01", "alias", "premium", "rows", 3L, "idx", 2900L), + createExpectedRow("2011-04-01", "alias", "technology", "rows", 1L, "idx", 78L), + createExpectedRow("2011-04-01", "alias", "travel", "rows", 1L, "idx", 119L), - createExpectedRow("2011-04-02", "alias", "automotive", "rows", 1L, "idx", 147L), - createExpectedRow("2011-04-02", "alias", "business", "rows", 1L, "idx", 112L), - createExpectedRow("2011-04-02", "alias", "entertainment", "rows", 1L, "idx", 166L), - createExpectedRow("2011-04-02", "alias", "health", "rows", 1L, "idx", 113L), - createExpectedRow("2011-04-02", "alias", "mezzanine", "rows", 3L, "idx", 2447L), - createExpectedRow("2011-04-02", "alias", "news", "rows", 1L, "idx", 114L), - createExpectedRow("2011-04-02", "alias", "premium", "rows", 3L, "idx", 2505L), - createExpectedRow("2011-04-02", "alias", "technology", "rows", 1L, "idx", 97L), - createExpectedRow("2011-04-02", "alias", "travel", "rows", 1L, "idx", 126L) - ); + createExpectedRow("2011-04-02", "alias", "automotive", "rows", 1L, "idx", 147L), + createExpectedRow("2011-04-02", "alias", "business", "rows", 1L, "idx", 112L), + createExpectedRow("2011-04-02", "alias", "entertainment", "rows", 1L, "idx", 166L), + createExpectedRow("2011-04-02", "alias", "health", "rows", 1L, "idx", 113L), + createExpectedRow("2011-04-02", "alias", "mezzanine", "rows", 3L, "idx", 2447L), + createExpectedRow("2011-04-02", "alias", "news", "rows", 1L, "idx", 114L), + createExpectedRow("2011-04-02", "alias", "premium", "rows", 3L, "idx", 2505L), + createExpectedRow("2011-04-02", "alias", "technology", "rows", 1L, "idx", 97L), + createExpectedRow("2011-04-02", "alias", "travel", "rows", 1L, "idx", 126L) + ); - Iterable results = Sequences.toList(runner.run(query), Lists.newArrayList()); + Iterable results = Sequences.toList(runner.run(query), Lists.newArrayList()); - TestHelper.assertExpectedObjects(expectedResults, results, ""); + TestHelper.assertExpectedObjects(expectedResults, results, ""); } @Test - public void testGroupByWithTimeZone() { + public void testGroupByWithTimeZone() + { DateTimeZone tz = DateTimeZone.forID("America/Los_Angeles"); GroupByQuery query = GroupByQuery.builder() @@ -187,38 +201,39 @@ public class GroupByQueryRunnerTest ) .build(); - List expectedResults = Arrays.asList( - createExpectedRow(new DateTime("2011-03-31", tz), "alias", "automotive", "rows", 1L, "idx", 135L), - createExpectedRow(new DateTime("2011-03-31", tz), "alias", "business", "rows", 1L, "idx", 118L), - createExpectedRow(new DateTime("2011-03-31", tz), "alias", "entertainment", "rows", 1L, "idx", 158L), - createExpectedRow(new DateTime("2011-03-31", tz), "alias", "health", "rows", 1L, "idx", 120L), - createExpectedRow(new DateTime("2011-03-31", tz), "alias", "mezzanine", "rows", 3L, "idx", 2870L), - createExpectedRow(new DateTime("2011-03-31", tz), "alias", "news", "rows", 1L, "idx", 121L), - createExpectedRow(new DateTime("2011-03-31", tz), "alias", "premium", "rows", 3L, "idx", 2900L), - createExpectedRow(new DateTime("2011-03-31", tz), "alias", "technology", "rows", 1L, "idx", 78L), - createExpectedRow(new DateTime("2011-03-31", tz), "alias", "travel", "rows", 1L, "idx", 119L), + List expectedResults = Arrays.asList( + createExpectedRow(new DateTime("2011-03-31", tz), "alias", "automotive", "rows", 1L, "idx", 135L), + createExpectedRow(new DateTime("2011-03-31", tz), "alias", "business", "rows", 1L, "idx", 118L), + createExpectedRow(new DateTime("2011-03-31", tz), "alias", "entertainment", "rows", 1L, "idx", 158L), + createExpectedRow(new DateTime("2011-03-31", tz), "alias", "health", "rows", 1L, "idx", 120L), + createExpectedRow(new DateTime("2011-03-31", tz), "alias", "mezzanine", "rows", 3L, "idx", 2870L), + createExpectedRow(new DateTime("2011-03-31", tz), "alias", "news", "rows", 1L, "idx", 121L), + createExpectedRow(new DateTime("2011-03-31", tz), "alias", "premium", "rows", 3L, "idx", 2900L), + createExpectedRow(new DateTime("2011-03-31", tz), "alias", "technology", "rows", 1L, "idx", 78L), + createExpectedRow(new DateTime("2011-03-31", tz), "alias", "travel", "rows", 1L, "idx", 119L), - createExpectedRow(new DateTime("2011-04-01", tz), "alias", "automotive", "rows", 1L, "idx", 147L), - createExpectedRow(new DateTime("2011-04-01", tz), "alias", "business", "rows", 1L, "idx", 112L), - createExpectedRow(new DateTime("2011-04-01", tz), "alias", "entertainment", "rows", 1L, "idx", 166L), - createExpectedRow(new DateTime("2011-04-01", tz), "alias", "health", "rows", 1L, "idx", 113L), - createExpectedRow(new DateTime("2011-04-01", tz), "alias", "mezzanine", "rows", 3L, "idx", 2447L), - createExpectedRow(new DateTime("2011-04-01", tz), "alias", "news", "rows", 1L, "idx", 114L), - createExpectedRow(new DateTime("2011-04-01", tz), "alias", "premium", "rows", 3L, "idx", 2505L), - createExpectedRow(new DateTime("2011-04-01", tz), "alias", "technology", "rows", 1L, "idx", 97L), - createExpectedRow(new DateTime("2011-04-01", tz), "alias", "travel", "rows", 1L, "idx", 126L) - ); + createExpectedRow(new DateTime("2011-04-01", tz), "alias", "automotive", "rows", 1L, "idx", 147L), + createExpectedRow(new DateTime("2011-04-01", tz), "alias", "business", "rows", 1L, "idx", 112L), + createExpectedRow(new DateTime("2011-04-01", tz), "alias", "entertainment", "rows", 1L, "idx", 166L), + createExpectedRow(new DateTime("2011-04-01", tz), "alias", "health", "rows", 1L, "idx", 113L), + createExpectedRow(new DateTime("2011-04-01", tz), "alias", "mezzanine", "rows", 3L, "idx", 2447L), + createExpectedRow(new DateTime("2011-04-01", tz), "alias", "news", "rows", 1L, "idx", 114L), + createExpectedRow(new DateTime("2011-04-01", tz), "alias", "premium", "rows", 3L, "idx", 2505L), + createExpectedRow(new DateTime("2011-04-01", tz), "alias", "technology", "rows", 1L, "idx", 97L), + createExpectedRow(new DateTime("2011-04-01", tz), "alias", "travel", "rows", 1L, "idx", 126L) + ); - Iterable results = Sequences.toList( - runner.run(query), - Lists.newArrayList() - ); + Iterable results = Sequences.toList( + runner.run(query), + Lists.newArrayList() + ); - TestHelper.assertExpectedObjects(expectedResults, results, ""); + TestHelper.assertExpectedObjects(expectedResults, results, ""); } @Test - public void testMergeResults() { + public void testMergeResults() + { GroupByQuery.Builder builder = GroupByQuery .builder() .setDataSource(QueryRunnerTestHelper.dataSource) @@ -282,7 +297,180 @@ public class GroupByQueryRunnerTest TestHelper.assertExpectedObjects(allGranExpectedResults, runner.run(allGranQuery), "direct"); TestHelper.assertExpectedObjects(allGranExpectedResults, mergedRunner.run(allGranQuery), "merged"); + } + @Test + public void testMergeResultsWithLimit() + { + for (int limit = 1; limit < 20; ++limit) { + doTestMergeResultsWithValidLimit(limit); + } + } + + private void doTestMergeResultsWithValidLimit(final int limit) + { + GroupByQuery.Builder builder = GroupByQuery + .builder() + .setDataSource(QueryRunnerTestHelper.dataSource) + .setInterval("2011-04-02/2011-04-04") + .setDimensions(Lists.newArrayList(new DefaultDimensionSpec("quality", "alias"))) + .setAggregatorSpecs( + Arrays.asList( + QueryRunnerTestHelper.rowsCount, + new LongSumAggregatorFactory("idx", "index") + ) + ) + .setGranularity(new PeriodGranularity(new Period("P1M"), null, null)) + .setLimit(Integer.valueOf(limit)); + + final GroupByQuery fullQuery = builder.build(); + + List expectedResults = Arrays.asList( + createExpectedRow("2011-04-01", "alias", "automotive", "rows", 2L, "idx", 269L), + createExpectedRow("2011-04-01", "alias", "business", "rows", 2L, "idx", 217L), + createExpectedRow("2011-04-01", "alias", "entertainment", "rows", 2L, "idx", 319L), + createExpectedRow("2011-04-01", "alias", "health", "rows", 2L, "idx", 216L), + createExpectedRow("2011-04-01", "alias", "mezzanine", "rows", 6L, "idx", 4420L), + createExpectedRow("2011-04-01", "alias", "news", "rows", 2L, "idx", 221L), + createExpectedRow("2011-04-01", "alias", "premium", "rows", 6L, "idx", 4416L), + createExpectedRow("2011-04-01", "alias", "technology", "rows", 2L, "idx", 177L), + createExpectedRow("2011-04-01", "alias", "travel", "rows", 2L, "idx", 243L) + ); + + QueryRunner mergeRunner = new GroupByQueryQueryToolChest().mergeResults(runner); + + mergeRunner.run(fullQuery).accumulate(null, new Accumulator() + { + @Override + public Object accumulate(Object o, Row row) + { + System.out.printf("%d: %s%n", limit, row); + return null; + } + }); + + TestHelper.assertExpectedObjects( + Iterables.limit(expectedResults, limit), mergeRunner.run(fullQuery), String.format("limit: %d", limit) + ); + } + + @Test(expected = IllegalArgumentException.class) + public void testMergeResultsWithNegativeLimit() + { + GroupByQuery.Builder builder = GroupByQuery + .builder() + .setDataSource(QueryRunnerTestHelper.dataSource) + .setInterval("2011-04-02/2011-04-04") + .setDimensions(Lists.newArrayList(new DefaultDimensionSpec("quality", "alias"))) + .setAggregatorSpecs( + Arrays.asList( + QueryRunnerTestHelper.rowsCount, + new LongSumAggregatorFactory("idx", "index") + ) + ) + .setGranularity(new PeriodGranularity(new Period("P1M"), null, null)) + .setLimit(Integer.valueOf(-1)); + + builder.build(); + } + + @Test + public void testMergeResultsWithOrderBy() + { + LimitSpec[] orderBySpecs = new LimitSpec[]{ + new DefaultLimitSpec(OrderByColumnSpec.ascending("idx"), null), + new DefaultLimitSpec(OrderByColumnSpec.ascending("rows", "idx"), null), + new DefaultLimitSpec(OrderByColumnSpec.descending("idx"), null), + new DefaultLimitSpec(OrderByColumnSpec.descending("rows", "idx"), null), + }; + + final Comparator idxComparator = + new Comparator() + { + @Override + public int compare(Row o1, Row o2) + { + return Float.compare(o1.getFloatMetric("idx"), o2.getFloatMetric("idx")); + } + }; + + Comparator rowsIdxComparator = + new Comparator() + { + + @Override + public int compare(Row o1, Row o2) + { + int value = Float.compare(o1.getFloatMetric("rows"), o2.getFloatMetric("rows")); + if (value != 0) { + return value; + } + + return idxComparator.compare(o1, o2); + } + }; + + List allResults = Arrays.asList( + createExpectedRow("2011-04-01", "alias", "automotive", "rows", 2L, "idx", 269L), + createExpectedRow("2011-04-01", "alias", "business", "rows", 2L, "idx", 217L), + createExpectedRow("2011-04-01", "alias", "entertainment", "rows", 2L, "idx", 319L), + createExpectedRow("2011-04-01", "alias", "health", "rows", 2L, "idx", 216L), + createExpectedRow("2011-04-01", "alias", "mezzanine", "rows", 6L, "idx", 4420L), + createExpectedRow("2011-04-01", "alias", "news", "rows", 2L, "idx", 221L), + createExpectedRow("2011-04-01", "alias", "premium", "rows", 6L, "idx", 4416L), + createExpectedRow("2011-04-01", "alias", "technology", "rows", 2L, "idx", 177L), + createExpectedRow("2011-04-01", "alias", "travel", "rows", 2L, "idx", 243L) + ); + + List> expectedResults = Lists.newArrayList( + Ordering.from(idxComparator).sortedCopy(allResults), + Ordering.from(rowsIdxComparator).sortedCopy(allResults), + Ordering.from(idxComparator).reverse().sortedCopy(allResults), + Ordering.from(rowsIdxComparator).reverse().sortedCopy(allResults) + ); + + for (int i = 0; i < orderBySpecs.length; ++i) { + doTestMergeResultsWithOrderBy(orderBySpecs[i], expectedResults.get(i)); + } + } + + private void doTestMergeResultsWithOrderBy(LimitSpec orderBySpec, List expectedResults) + { + GroupByQuery.Builder builder = GroupByQuery + .builder() + .setDataSource(QueryRunnerTestHelper.dataSource) + .setInterval("2011-04-02/2011-04-04") + .setDimensions(Lists.newArrayList(new DefaultDimensionSpec("quality", "alias"))) + .setAggregatorSpecs( + Arrays.asList( + QueryRunnerTestHelper.rowsCount, + new LongSumAggregatorFactory("idx", "index") + ) + ) + .setGranularity(new PeriodGranularity(new Period("P1M"), null, null)) + .setLimitSpec(orderBySpec); + + final GroupByQuery fullQuery = builder.build(); + + QueryRunner mergedRunner = new GroupByQueryQueryToolChest().mergeResults( + new QueryRunner() + { + @Override + public Sequence run(Query query) + { + // simulate two daily segments + final Query query1 = query.withQuerySegmentSpec( + new MultipleIntervalSegmentSpec(Lists.newArrayList(new Interval("2011-04-02/2011-04-03"))) + ); + final Query query2 = query.withQuerySegmentSpec( + new MultipleIntervalSegmentSpec(Lists.newArrayList(new Interval("2011-04-03/2011-04-04"))) + ); + return Sequences.concat(runner.run(query1), runner.run(query2)); + } + } + ); + + TestHelper.assertExpectedObjects(expectedResults, mergedRunner.run(fullQuery), "merged"); } @Test @@ -403,6 +591,59 @@ public class GroupByQueryRunnerTest ); } + @Test + public void testHavingSpec() + { + List expectedResults = Arrays.asList( + createExpectedRow("2011-04-01", "alias", "business", "rows", 2L, "idx", 217L), + createExpectedRow("2011-04-01", "alias", "mezzanine", "rows", 6L, "idx", 4420L), + createExpectedRow("2011-04-01", "alias", "premium", "rows", 6L, "idx", 4416L) + ); + + GroupByQuery.Builder builder = GroupByQuery + .builder() + .setDataSource(QueryRunnerTestHelper.dataSource) + .setInterval("2011-04-02/2011-04-04") + .setDimensions(Lists.newArrayList(new DefaultDimensionSpec("quality", "alias"))) + .setAggregatorSpecs( + Arrays.asList( + QueryRunnerTestHelper.rowsCount, + new LongSumAggregatorFactory("idx", "index") + ) + ) + .setGranularity(new PeriodGranularity(new Period("P1M"), null, null)) + .setHavingSpec( + new OrHavingSpec( + ImmutableList.of( + new GreaterThanHavingSpec("rows", 2L), + new EqualToHavingSpec("idx", 217L) + ) + ) + ); + + final GroupByQuery fullQuery = builder.build(); + + QueryRunner mergedRunner = new GroupByQueryQueryToolChest().mergeResults( + new QueryRunner() + { + @Override + public Sequence run(Query query) + { + // simulate two daily segments + final Query query1 = query.withQuerySegmentSpec( + new MultipleIntervalSegmentSpec(Lists.newArrayList(new Interval("2011-04-02/2011-04-03"))) + ); + final Query query2 = query.withQuerySegmentSpec( + new MultipleIntervalSegmentSpec(Lists.newArrayList(new Interval("2011-04-03/2011-04-04"))) + ); + return Sequences.concat(runner.run(query1), runner.run(query2)); + } + } + ); + + TestHelper.assertExpectedObjects(expectedResults, mergedRunner.run(fullQuery), "merged"); + } + private Row createExpectedRow(final String timestamp, Object... vals) { return createExpectedRow(new DateTime(timestamp), vals); @@ -413,10 +654,10 @@ public class GroupByQueryRunnerTest Preconditions.checkArgument(vals.length % 2 == 0); Map theVals = Maps.newHashMap(); - for (int i = 0; i < vals.length; i+=2) { - theVals.put(vals[i].toString(), vals[i+1]); + for (int i = 0; i < vals.length; i += 2) { + theVals.put(vals[i].toString(), vals[i + 1]); } - return new MapBasedRow(timestamp, theVals); + return new MapBasedRow(new DateTime(timestamp), theVals); } }