From cde86d81e20fd14255eb95fe7e39f058657e3b04 Mon Sep 17 00:00:00 2001
From: fjy
Date: Tue, 10 Jun 2014 15:50:13 -0700
Subject: [PATCH] make having and limitspec work for nested queries

---
 .../io/druid/query/groupby/GroupByQuery.java  | 41 +++++----
 .../groupby/GroupByQueryQueryToolChest.java   | 30 +++---
 .../groupby/orderby/DefaultLimitSpec.java     | 65 +++++++++----
 .../query/groupby/orderby/LimitSpec.java      |  8 +-
 .../query/groupby/orderby/NoopLimitSpec.java  |  6 ++
 .../query/groupby/GroupByQueryRunnerTest.java | 92 +++++++++++++++++++
 6 files changed, 192 insertions(+), 50 deletions(-)

diff --git a/processing/src/main/java/io/druid/query/groupby/GroupByQuery.java b/processing/src/main/java/io/druid/query/groupby/GroupByQuery.java
index d9431849ea2..98ac83e32cd 100644
--- a/processing/src/main/java/io/druid/query/groupby/GroupByQuery.java
+++ b/processing/src/main/java/io/druid/query/groupby/GroupByQuery.java
@@ -72,7 +72,7 @@ public class GroupByQuery extends BaseQuery<Row>
   private final List<AggregatorFactory> aggregatorSpecs;
   private final List<PostAggregator> postAggregatorSpecs;
 
-  private final Function<Sequence<Row>, Sequence<Row>> orderByLimitFn;
+  private final Function<Sequence<Row>, Sequence<Row>> limitFn;
 
   @JsonCreator
   public GroupByQuery(
@@ -85,8 +85,9 @@ public class GroupByQuery extends BaseQuery<Row>
       @JsonProperty("postAggregations") List<PostAggregator> postAggregatorSpecs,
       @JsonProperty("having") HavingSpec havingSpec,
       @JsonProperty("limitSpec") LimitSpec limitSpec,
-      @JsonProperty("orderBy") LimitSpec orderBySpec,
-      @JsonProperty("context") Map<String, Object> context
+      @JsonProperty("context") Map<String, Object> context,
+      // Backwards compatible
+      @JsonProperty("orderBy") LimitSpec orderBySpec
   )
   {
     super(dataSource, querySegmentSpec, context);
@@ -129,7 +130,7 @@ public class GroupByQuery extends BaseQuery<Row>
       );
     }
 
-    orderByLimitFn = postProcFn;
+    limitFn = postProcFn;
   }
 
   /**
@@ -146,7 +147,7 @@ public class GroupByQuery extends BaseQuery<Row>
       List<PostAggregator> postAggregatorSpecs,
       HavingSpec havingSpec,
       LimitSpec orderBySpec,
-      Function<Sequence<Row>, Sequence<Row>> orderByLimitFn,
+      Function<Sequence<Row>, Sequence<Row>> limitFn,
       Map<String, Object> context
   )
   {
@@ -159,7 +160,7 @@ public class GroupByQuery extends BaseQuery<Row>
     this.postAggregatorSpecs = postAggregatorSpecs;
     this.havingSpec = havingSpec;
     this.limitSpec = orderBySpec;
-    this.orderByLimitFn = orderByLimitFn;
+    this.limitFn = limitFn;
   }
 
   @JsonProperty("filter")
@@ -199,7 +200,7 @@ public class GroupByQuery extends BaseQuery<Row>
   }
 
   @JsonProperty
-  public LimitSpec getOrderBy()
+  public LimitSpec getLimitSpec()
   {
     return limitSpec;
   }
@@ -218,7 +219,7 @@ public class GroupByQuery extends BaseQuery<Row>
 
   public Sequence<Row> applyLimit(Sequence<Row> results)
   {
-    return orderByLimitFn.apply(results);
+    return limitFn.apply(results);
   }
 
   @Override
@@ -234,7 +235,7 @@ public class GroupByQuery extends BaseQuery<Row>
         postAggregatorSpecs,
         havingSpec,
         limitSpec,
-        orderByLimitFn,
+        limitFn,
         computeOverridenContext(contextOverride)
     );
   }
@@ -252,7 +253,7 @@ public class GroupByQuery extends BaseQuery<Row>
         postAggregatorSpecs,
         havingSpec,
         limitSpec,
-        orderByLimitFn,
+        limitFn,
         getContext()
     );
   }
@@ -270,7 +271,7 @@ public class GroupByQuery extends BaseQuery<Row>
         postAggregatorSpecs,
         havingSpec,
         limitSpec,
-        orderByLimitFn,
+        limitFn,
         getContext()
     );
   }
@@ -300,7 +301,7 @@ public class GroupByQuery extends BaseQuery<Row>
     {
       dataSource = query.getDataSource();
       querySegmentSpec = query.getQuerySegmentSpec();
-      limitSpec = query.getOrderBy();
+      limitSpec = query.getLimitSpec();
      dimFilter = query.getDimFilter();
       granularity = query.getGranularity();
       dimensions = query.getDimensions();
@@ -504,7 +505,11 @@ public class GroupByQuery extends BaseQuery<Row>
     {
       final LimitSpec theLimitSpec;
       if (limitSpec == null) {
-        theLimitSpec = new DefaultLimitSpec(orderByColumnSpecs, limit);
+        if (orderByColumnSpecs.isEmpty() && limit == Integer.MAX_VALUE) {
+          theLimitSpec = new NoopLimitSpec();
+        } else {
+          theLimitSpec = new DefaultLimitSpec(orderByColumnSpecs, limit);
+        }
       } else {
         theLimitSpec = limitSpec;
       }
@@ -518,9 +523,9 @@ public class GroupByQuery extends BaseQuery<Row>
           aggregatorSpecs,
           postAggregatorSpecs,
           havingSpec,
-          null,
           theLimitSpec,
-          context
+          context,
+          null
       );
     }
   }
@@ -535,7 +540,7 @@ public class GroupByQuery extends BaseQuery<Row>
            ", dimensions=" + dimensions +
            ", aggregatorSpecs=" + aggregatorSpecs +
            ", postAggregatorSpecs=" + postAggregatorSpecs +
-           ", orderByLimitFn=" + orderByLimitFn +
+           ", limitFn=" + limitFn +
            '}';
   }
 
@@ -572,7 +577,7 @@ public class GroupByQuery extends BaseQuery<Row>
     if (limitSpec != null ? !limitSpec.equals(that.limitSpec) : that.limitSpec != null) {
       return false;
     }
-    if (orderByLimitFn != null ? !orderByLimitFn.equals(that.orderByLimitFn) : that.orderByLimitFn != null) {
+    if (limitFn != null ? !limitFn.equals(that.limitFn) : that.limitFn != null) {
       return false;
     }
     if (postAggregatorSpecs != null
@@ -595,7 +600,7 @@ public class GroupByQuery extends BaseQuery<Row>
     result = 31 * result + (dimensions != null ? dimensions.hashCode() : 0);
     result = 31 * result + (aggregatorSpecs != null ? aggregatorSpecs.hashCode() : 0);
     result = 31 * result + (postAggregatorSpecs != null ? postAggregatorSpecs.hashCode() : 0);
-    result = 31 * result + (orderByLimitFn != null ? orderByLimitFn.hashCode() : 0);
+    result = 31 * result + (limitFn != null ? limitFn.hashCode() : 0);
     return result;
   }
 }
diff --git a/processing/src/main/java/io/druid/query/groupby/GroupByQueryQueryToolChest.java b/processing/src/main/java/io/druid/query/groupby/GroupByQueryQueryToolChest.java
index 681871cfd93..09cf7e431a8 100644
--- a/processing/src/main/java/io/druid/query/groupby/GroupByQueryQueryToolChest.java
+++ b/processing/src/main/java/io/druid/query/groupby/GroupByQueryQueryToolChest.java
@@ -44,7 +44,6 @@ import io.druid.query.QueryRunner;
 import io.druid.query.QueryToolChest;
 import io.druid.query.SubqueryQueryRunner;
 import io.druid.query.aggregation.AggregatorFactory;
-import io.druid.query.aggregation.DoubleSumAggregatorFactory;
 import io.druid.query.aggregation.MetricManipulationFn;
 import io.druid.query.aggregation.PostAggregator;
 import io.druid.segment.incremental.IncrementalIndex;
@@ -52,7 +51,6 @@ import io.druid.segment.incremental.IncrementalIndexStorageAdapter;
 import org.joda.time.Interval;
 import org.joda.time.Minutes;
 
-import javax.annotation.Nullable;
 import java.util.List;
 import java.util.Map;
 
@@ -89,7 +87,7 @@ public class GroupByQueryQueryToolChest extends QueryToolChest<Row, GroupByQuery>
       public Sequence<Row> run(Query<Row> input)
       {
-        if (Boolean.valueOf((String) input.getContextValue(GROUP_BY_MERGE_KEY, "true"))) {
+        if (Boolean.valueOf(input.getContextValue(GROUP_BY_MERGE_KEY, "true"))) {
           return mergeGroupByResults(((GroupByQuery) input).withOverriddenContext(NO_MERGE_CONTEXT), runner);
         } else {
           return runner.run(input);
         }
@@ -100,8 +98,6 @@ public class GroupByQueryQueryToolChest extends QueryToolChest<Row, GroupByQuery>
 
   private Sequence<Row> mergeGroupByResults(final GroupByQuery query, QueryRunner<Row> runner)
   {
-    Sequence<Row> result;
-
     // If there's a subquery, merge subquery results and then apply the aggregator
     final DataSource dataSource = query.getDataSource();
     if (dataSource instanceof QueryDataSource) {
@@ -117,25 +113,30 @@ public class GroupByQueryQueryToolChest extends QueryToolChest<Row, GroupByQuery>
-      final GroupByQuery innerQuery = new GroupByQuery.Builder(query)
-          .setAggregatorSpecs(aggs)
-          .setInterval(subquery.getIntervals())
-          .setPostAggregatorSpecs(Lists.<PostAggregator>newArrayList())
-          .build();
+
+      // We need the inner incremental index to have all the columns required by the outer query
+      final GroupByQuery innerQuery = new GroupByQuery.Builder(query)
+          .setAggregatorSpecs(aggs)
+          .setInterval(subquery.getIntervals())
+          .setPostAggregatorSpecs(Lists.<PostAggregator>newArrayList())
+          .build();
+
+      final GroupByQuery outerQuery = new GroupByQuery.Builder(query)
+          .setLimitSpec(query.getLimitSpec().merge(subquery.getLimitSpec()))
+          .build();
 
       final IncrementalIndexStorageAdapter adapter = new IncrementalIndexStorageAdapter(
           makeIncrementalIndex(innerQuery, subqueryResult)
       );
-      return engine.process(query, adapter);
+      return outerQuery.applyLimit(engine.process(outerQuery, adapter));
     } else {
-      result = runner.run(query);
-      return postAggregate(query, makeIncrementalIndex(query, result));
+      return query.applyLimit(postAggregate(query, makeIncrementalIndex(query, runner.run(query))));
     }
   }
 
-
   private Sequence<Row> postAggregate(final GroupByQuery query, IncrementalIndex index)
   {
-    Sequence<Row> sequence = Sequences.map(
+    return Sequences.map(
         Sequences.simple(index.iterableWithPostAggregations(query.getPostAggregatorSpecs())),
         new Function<Row, Row>()
         {
@@ -151,7 +152,6 @@ public class GroupByQueryQueryToolChest extends QueryToolChest<Row, GroupByQuery>
-    return query.applyLimit(sequence);
   }
 
   private IncrementalIndex makeIncrementalIndex(GroupByQuery query, Sequence<Row> rows)
diff --git a/processing/src/main/java/io/druid/query/groupby/orderby/DefaultLimitSpec.java b/processing/src/main/java/io/druid/query/groupby/orderby/DefaultLimitSpec.java
index eda54ea0dc3..3d78e112cb5 100644
--- a/processing/src/main/java/io/druid/query/groupby/orderby/DefaultLimitSpec.java
+++ b/processing/src/main/java/io/druid/query/groupby/orderby/DefaultLimitSpec.java
@@ -87,12 +87,17 @@ public class DefaultLimitSpec implements LimitSpec
 
     if (limit == Integer.MAX_VALUE) {
       return new SortingFn(ordering);
-    }
-    else {
+    } else {
       return new TopNFunction(ordering, limit);
     }
   }
 
+  @Override
+  public LimitSpec merge(LimitSpec other)
+  {
+    return this;
+  }
+
   private Ordering<Row> makeComparator(
       List<DimensionSpec> dimensions, List<AggregatorFactory> aggs, List<PostAggregator> postAggs
   )
@@ -200,12 +205,18 @@ public class DefaultLimitSpec implements LimitSpec
     @Override
     public boolean equals(Object o)
     {
-      if (this == o) return true;
-      if (o == null || getClass() != o.getClass()) return false;
+      if (this == o) {
+        return true;
+      }
+      if (o == null || getClass() != o.getClass()) {
+        return false;
+      }
 
       LimitingFn that = (LimitingFn) o;
 
-      if (limit != that.limit) return false;
+      if (limit != that.limit) {
+        return false;
+      }
 
       return true;
     }
@@ -232,12 +243,18 @@ public class DefaultLimitSpec implements LimitSpec
     @Override
     public boolean equals(Object o)
     {
-      if (this == o) return true;
-      if (o == null || getClass() != o.getClass()) return false;
+      if (this == o) {
+        return true;
+      }
+      if (o == null || getClass() != o.getClass()) {
+        return false;
+      }
 
       SortingFn sortingFn = (SortingFn) o;
 
-      if (ordering != null ? !ordering.equals(sortingFn.ordering) : sortingFn.ordering != null) return false;
+      if (ordering != null ? !ordering.equals(sortingFn.ordering) : sortingFn.ordering != null) {
+        return false;
+      }
 
       return true;
     }
@@ -273,13 +290,21 @@ public class DefaultLimitSpec implements LimitSpec
     @Override
     public boolean equals(Object o)
     {
-      if (this == o) return true;
-      if (o == null || getClass() != o.getClass()) return false;
+      if (this == o) {
+        return true;
+      }
+      if (o == null || getClass() != o.getClass()) {
+        return false;
+      }
 
       TopNFunction that = (TopNFunction) o;
 
-      if (limit != that.limit) return false;
-      if (sorter != null ? !sorter.equals(that.sorter) : that.sorter != null) return false;
+      if (limit != that.limit) {
+        return false;
+      }
+      if (sorter != null ? !sorter.equals(that.sorter) : that.sorter != null) {
+        return false;
+      }
 
       return true;
     }
@@ -296,13 +321,21 @@ public class DefaultLimitSpec implements LimitSpec
   @Override
   public boolean equals(Object o)
   {
-    if (this == o) return true;
-    if (o == null || getClass() != o.getClass()) return false;
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
 
     DefaultLimitSpec that = (DefaultLimitSpec) o;
 
-    if (limit != that.limit) return false;
-    if (columns != null ? !columns.equals(that.columns) : that.columns != null) return false;
+    if (limit != that.limit) {
+      return false;
+    }
+    if (columns != null ? !columns.equals(that.columns) : that.columns != null) {
+      return false;
+    }
 
     return true;
   }
diff --git a/processing/src/main/java/io/druid/query/groupby/orderby/LimitSpec.java b/processing/src/main/java/io/druid/query/groupby/orderby/LimitSpec.java
index 0d07f1f91c9..fa50d62016c 100644
--- a/processing/src/main/java/io/druid/query/groupby/orderby/LimitSpec.java
+++ b/processing/src/main/java/io/druid/query/groupby/orderby/LimitSpec.java
@@ -38,5 +38,11 @@ import java.util.List;
 })
 public interface LimitSpec
 {
-  public Function<Sequence<Row>, Sequence<Row>> build(List<DimensionSpec> dimensions, List<AggregatorFactory> aggs, List<PostAggregator> postAggs);
+  public Function<Sequence<Row>, Sequence<Row>> build(
+      List<DimensionSpec> dimensions,
+      List<AggregatorFactory> aggs,
+      List<PostAggregator> postAggs
+  );
+
+  public LimitSpec merge(LimitSpec other);
 }
diff --git a/processing/src/main/java/io/druid/query/groupby/orderby/NoopLimitSpec.java b/processing/src/main/java/io/druid/query/groupby/orderby/NoopLimitSpec.java
index d975e24a65f..e71038d4918 100644
--- a/processing/src/main/java/io/druid/query/groupby/orderby/NoopLimitSpec.java
+++ b/processing/src/main/java/io/druid/query/groupby/orderby/NoopLimitSpec.java
@@ -41,6 +41,12 @@ public class NoopLimitSpec implements LimitSpec
     return Functions.identity();
   }
 
+  @Override
+  public LimitSpec merge(LimitSpec other)
+  {
+    return other;
+  }
+
   @Override
   public String toString()
   {
diff --git a/processing/src/test/java/io/druid/query/groupby/GroupByQueryRunnerTest.java b/processing/src/test/java/io/druid/query/groupby/GroupByQueryRunnerTest.java
index 6b44645ac81..511b042e3e6 100644
--- a/processing/src/test/java/io/druid/query/groupby/GroupByQueryRunnerTest.java
+++ b/processing/src/test/java/io/druid/query/groupby/GroupByQueryRunnerTest.java
@@ -56,6 +56,7 @@ import io.druid.query.filter.JavaScriptDimFilter;
 import io.druid.query.filter.RegexDimFilter;
 import io.druid.query.groupby.having.EqualToHavingSpec;
 import io.druid.query.groupby.having.GreaterThanHavingSpec;
+import io.druid.query.groupby.having.HavingSpec;
 import io.druid.query.groupby.having.OrHavingSpec;
 import io.druid.query.groupby.orderby.DefaultLimitSpec;
 import io.druid.query.groupby.orderby.LimitSpec;
@@ -1113,6 +1114,97 @@ public class GroupByQueryRunnerTest
     TestHelper.assertExpectedObjects(expectedResults, results, "");
   }
 
+  @Test
+  public void testSubqueryWithPostAggregatorsAndHaving()
+  {
+    final GroupByQuery subquery = GroupByQuery
+        .builder()
+        .setDataSource(QueryRunnerTestHelper.dataSource)
+        .setQuerySegmentSpec(QueryRunnerTestHelper.firstToThird)
+        .setDimensions(Lists.<DimensionSpec>newArrayList(new DefaultDimensionSpec("quality", "alias")))
+        .setDimFilter(new JavaScriptDimFilter("quality", "function(dim){ return true; }"))
+        .setAggregatorSpecs(
+            Arrays.asList(
+                QueryRunnerTestHelper.rowsCount,
+                new LongSumAggregatorFactory("idx_subagg", "index")
+            )
+        )
+        .setPostAggregatorSpecs(
+            Arrays.<PostAggregator>asList(
+                new ArithmeticPostAggregator(
+                    "idx_subpostagg",
+                    "+",
+                    Arrays.asList(
+                        new FieldAccessPostAggregator("the_idx_subagg", "idx_subagg"),
+                        new ConstantPostAggregator("thousand", 1000, 1000)
+                    )
+                )
+
+            )
+        )
+        .setHavingSpec(
+            new HavingSpec()
+            {
+              @Override
+              public boolean eval(Row row)
+              {
+                return (row.getFloatMetric("idx_subpostagg") < 3800);
+              }
+            }
+        )
+        .addOrderByColumn("alias")
+        .setGranularity(QueryRunnerTestHelper.dayGran)
+        .build();
+
+    final GroupByQuery query = GroupByQuery
+        .builder()
+        .setDataSource(subquery)
+        .setQuerySegmentSpec(QueryRunnerTestHelper.firstToThird)
+        .setDimensions(Lists.<DimensionSpec>newArrayList(new DefaultDimensionSpec("alias", "alias")))
+        .setAggregatorSpecs(
+            Arrays.<AggregatorFactory>asList(
+                new LongSumAggregatorFactory("rows", "rows"),
+                new LongSumAggregatorFactory("idx", "idx_subpostagg")
+            )
+        )
+        .setPostAggregatorSpecs(
+            Arrays.<PostAggregator>asList(
+                new ArithmeticPostAggregator(
+                    "idx", "+", Arrays.asList(
+                    new FieldAccessPostAggregator("the_idx_agg", "idx"),
+                    new ConstantPostAggregator("ten_thousand", 10000, 10000)
+                )
+                )
+
+            )
+        )
+        .setGranularity(QueryRunnerTestHelper.dayGran)
+        .build();
+
+    List<Row> expectedResults = Arrays.asList(
+        createExpectedRow("2011-04-01", "alias", "automotive", "rows", 1L, "idx", 11135.0),
+        createExpectedRow("2011-04-01", "alias", "business", "rows", 1L, "idx", 11118.0),
+        createExpectedRow("2011-04-01", "alias", "entertainment", "rows", 1L, "idx", 11158.0),
+        createExpectedRow("2011-04-01", "alias", "health", "rows", 1L, "idx", 11120.0),
+        createExpectedRow("2011-04-01", "alias", "news", "rows", 1L, "idx", 11121.0),
+        createExpectedRow("2011-04-01", "alias", "technology", "rows", 1L, "idx", 11078.0),
+        createExpectedRow("2011-04-01", "alias", "travel", "rows", 1L, "idx", 11119.0),
+
+        createExpectedRow("2011-04-02", "alias", "automotive", "rows", 1L, "idx", 11147.0),
+        createExpectedRow("2011-04-02", "alias", "business", "rows", 1L, "idx", 11112.0),
+        createExpectedRow("2011-04-02", "alias", "entertainment", "rows", 1L, "idx", 11166.0),
+        createExpectedRow("2011-04-02", "alias", "health", "rows", 1L, "idx", 11113.0),
+        createExpectedRow("2011-04-02", "alias", "mezzanine", "rows", 3L, "idx", 13447.0),
+        createExpectedRow("2011-04-02", "alias", "news", "rows", 1L, "idx", 11114.0),
+        createExpectedRow("2011-04-02", "alias", "premium", "rows", 3L, "idx", 13505.0),
+        createExpectedRow("2011-04-02", "alias", "technology", "rows", 1L, "idx", 11097.0),
+        createExpectedRow("2011-04-02", "alias", "travel", "rows", 1L, "idx", 11126.0)
+    );
+
+    // Subqueries are handled by the ToolChest
+    Iterable<Row> results = runQuery(query);
+    TestHelper.assertExpectedObjects(expectedResults, results, "");
+  }
 
   private Iterable<Row> runQuery(GroupByQuery query)
   {
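
Note (illustrative sketch, not part of the patch): the query shape this change targets is an outer groupBy whose dataSource is another groupBy, with the outer query's having spec and orderBy applied to the merged subquery rows, which the nested path previously did not apply. Per the diff, query.getLimitSpec().merge(subquery.getLimitSpec()) lets a DefaultLimitSpec on the outer query win, while a NoopLimitSpec defers to the other side. The builder calls below are the ones that appear in the diff and test above; the datasource name, interval, column names, the QueryGranularity import and constant, and the GreaterThanHavingSpec argument types are assumptions made for a self-contained example, not taken from this commit.

import java.util.Arrays;

import com.google.common.collect.Lists;

import io.druid.granularity.QueryGranularity;  // assumed import path for the granularity constant
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.CountAggregatorFactory;
import io.druid.query.aggregation.LongSumAggregatorFactory;
import io.druid.query.dimension.DefaultDimensionSpec;
import io.druid.query.dimension.DimensionSpec;
import io.druid.query.groupby.GroupByQuery;
import io.druid.query.groupby.having.GreaterThanHavingSpec;
import org.joda.time.Interval;

public class NestedGroupByExample
{
  public static GroupByQuery buildNestedGroupBy()
  {
    // Inner query: per-"quality" rollup of a row count and a long sum over a placeholder datasource.
    final GroupByQuery subquery = GroupByQuery
        .builder()
        .setDataSource("example_source")                                    // placeholder datasource name
        .setInterval(Arrays.asList(new Interval("2011-04-01/2011-04-03")))  // placeholder interval
        .setDimensions(Lists.<DimensionSpec>newArrayList(new DefaultDimensionSpec("quality", "alias")))
        .setAggregatorSpecs(
            Arrays.<AggregatorFactory>asList(
                new CountAggregatorFactory("rows"),
                new LongSumAggregatorFactory("idx", "index")
            )
        )
        .setGranularity(QueryGranularity.ALL)
        .build();

    // Outer query: re-aggregates the subquery rows. With this patch, its having spec and
    // orderBy are applied to the merged result (see mergeGroupByResults above) rather than
    // being skipped on the nested path.
    return GroupByQuery
        .builder()
        .setDataSource(subquery)
        .setInterval(Arrays.asList(new Interval("2011-04-01/2011-04-03")))
        .setDimensions(Lists.<DimensionSpec>newArrayList(new DefaultDimensionSpec("alias", "alias")))
        .setAggregatorSpecs(
            Arrays.<AggregatorFactory>asList(
                new LongSumAggregatorFactory("rows", "rows"),
                new LongSumAggregatorFactory("idx", "idx")
            )
        )
        .setHavingSpec(new GreaterThanHavingSpec("idx", 11000))  // keep only merged rows with idx > 11000
        .addOrderByColumn("alias")
        .setGranularity(QueryGranularity.ALL)
        .build();
  }
}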