make having and limitspec work for nested queries

This commit is contained in:
fjy 2014-06-10 15:50:13 -07:00
parent 5955ecf032
commit cde86d81e2
6 changed files with 192 additions and 50 deletions

View File

@ -72,7 +72,7 @@ public class GroupByQuery extends BaseQuery<Row>
private final List<AggregatorFactory> aggregatorSpecs; private final List<AggregatorFactory> aggregatorSpecs;
private final List<PostAggregator> postAggregatorSpecs; private final List<PostAggregator> postAggregatorSpecs;
private final Function<Sequence<Row>, Sequence<Row>> orderByLimitFn; private final Function<Sequence<Row>, Sequence<Row>> limitFn;
@JsonCreator @JsonCreator
public GroupByQuery( public GroupByQuery(
@ -85,8 +85,9 @@ public class GroupByQuery extends BaseQuery<Row>
@JsonProperty("postAggregations") List<PostAggregator> postAggregatorSpecs, @JsonProperty("postAggregations") List<PostAggregator> postAggregatorSpecs,
@JsonProperty("having") HavingSpec havingSpec, @JsonProperty("having") HavingSpec havingSpec,
@JsonProperty("limitSpec") LimitSpec limitSpec, @JsonProperty("limitSpec") LimitSpec limitSpec,
@JsonProperty("orderBy") LimitSpec orderBySpec, @JsonProperty("context") Map<String, Object> context,
@JsonProperty("context") Map<String, Object> context // Backwards compatible
@JsonProperty("orderBy") LimitSpec orderBySpec
) )
{ {
super(dataSource, querySegmentSpec, context); super(dataSource, querySegmentSpec, context);
@ -129,7 +130,7 @@ public class GroupByQuery extends BaseQuery<Row>
); );
} }
orderByLimitFn = postProcFn; limitFn = postProcFn;
} }
/** /**
@ -146,7 +147,7 @@ public class GroupByQuery extends BaseQuery<Row>
List<PostAggregator> postAggregatorSpecs, List<PostAggregator> postAggregatorSpecs,
HavingSpec havingSpec, HavingSpec havingSpec,
LimitSpec orderBySpec, LimitSpec orderBySpec,
Function<Sequence<Row>, Sequence<Row>> orderByLimitFn, Function<Sequence<Row>, Sequence<Row>> limitFn,
Map<String, Object> context Map<String, Object> context
) )
{ {
@ -159,7 +160,7 @@ public class GroupByQuery extends BaseQuery<Row>
this.postAggregatorSpecs = postAggregatorSpecs; this.postAggregatorSpecs = postAggregatorSpecs;
this.havingSpec = havingSpec; this.havingSpec = havingSpec;
this.limitSpec = orderBySpec; this.limitSpec = orderBySpec;
this.orderByLimitFn = orderByLimitFn; this.limitFn = limitFn;
} }
@JsonProperty("filter") @JsonProperty("filter")
@ -199,7 +200,7 @@ public class GroupByQuery extends BaseQuery<Row>
} }
@JsonProperty @JsonProperty
public LimitSpec getOrderBy() public LimitSpec getLimitSpec()
{ {
return limitSpec; return limitSpec;
} }
@ -218,7 +219,7 @@ public class GroupByQuery extends BaseQuery<Row>
public Sequence<Row> applyLimit(Sequence<Row> results) public Sequence<Row> applyLimit(Sequence<Row> results)
{ {
return orderByLimitFn.apply(results); return limitFn.apply(results);
} }
@Override @Override
@ -234,7 +235,7 @@ public class GroupByQuery extends BaseQuery<Row>
postAggregatorSpecs, postAggregatorSpecs,
havingSpec, havingSpec,
limitSpec, limitSpec,
orderByLimitFn, limitFn,
computeOverridenContext(contextOverride) computeOverridenContext(contextOverride)
); );
} }
@ -252,7 +253,7 @@ public class GroupByQuery extends BaseQuery<Row>
postAggregatorSpecs, postAggregatorSpecs,
havingSpec, havingSpec,
limitSpec, limitSpec,
orderByLimitFn, limitFn,
getContext() getContext()
); );
} }
@ -270,7 +271,7 @@ public class GroupByQuery extends BaseQuery<Row>
postAggregatorSpecs, postAggregatorSpecs,
havingSpec, havingSpec,
limitSpec, limitSpec,
orderByLimitFn, limitFn,
getContext() getContext()
); );
} }
@ -300,7 +301,7 @@ public class GroupByQuery extends BaseQuery<Row>
{ {
dataSource = query.getDataSource(); dataSource = query.getDataSource();
querySegmentSpec = query.getQuerySegmentSpec(); querySegmentSpec = query.getQuerySegmentSpec();
limitSpec = query.getOrderBy(); limitSpec = query.getLimitSpec();
dimFilter = query.getDimFilter(); dimFilter = query.getDimFilter();
granularity = query.getGranularity(); granularity = query.getGranularity();
dimensions = query.getDimensions(); dimensions = query.getDimensions();
@ -504,7 +505,11 @@ public class GroupByQuery extends BaseQuery<Row>
{ {
final LimitSpec theLimitSpec; final LimitSpec theLimitSpec;
if (limitSpec == null) { if (limitSpec == null) {
theLimitSpec = new DefaultLimitSpec(orderByColumnSpecs, limit); if (orderByColumnSpecs.isEmpty() && limit == Integer.MAX_VALUE) {
theLimitSpec = new NoopLimitSpec();
} else {
theLimitSpec = new DefaultLimitSpec(orderByColumnSpecs, limit);
}
} else { } else {
theLimitSpec = limitSpec; theLimitSpec = limitSpec;
} }
@ -518,9 +523,9 @@ public class GroupByQuery extends BaseQuery<Row>
aggregatorSpecs, aggregatorSpecs,
postAggregatorSpecs, postAggregatorSpecs,
havingSpec, havingSpec,
null,
theLimitSpec, theLimitSpec,
context context,
null
); );
} }
} }
@ -535,7 +540,7 @@ public class GroupByQuery extends BaseQuery<Row>
", dimensions=" + dimensions + ", dimensions=" + dimensions +
", aggregatorSpecs=" + aggregatorSpecs + ", aggregatorSpecs=" + aggregatorSpecs +
", postAggregatorSpecs=" + postAggregatorSpecs + ", postAggregatorSpecs=" + postAggregatorSpecs +
", orderByLimitFn=" + orderByLimitFn + ", limitFn=" + limitFn +
'}'; '}';
} }
@ -572,7 +577,7 @@ public class GroupByQuery extends BaseQuery<Row>
if (limitSpec != null ? !limitSpec.equals(that.limitSpec) : that.limitSpec != null) { if (limitSpec != null ? !limitSpec.equals(that.limitSpec) : that.limitSpec != null) {
return false; return false;
} }
if (orderByLimitFn != null ? !orderByLimitFn.equals(that.orderByLimitFn) : that.orderByLimitFn != null) { if (limitFn != null ? !limitFn.equals(that.limitFn) : that.limitFn != null) {
return false; return false;
} }
if (postAggregatorSpecs != null if (postAggregatorSpecs != null
@ -595,7 +600,7 @@ public class GroupByQuery extends BaseQuery<Row>
result = 31 * result + (dimensions != null ? dimensions.hashCode() : 0); result = 31 * result + (dimensions != null ? dimensions.hashCode() : 0);
result = 31 * result + (aggregatorSpecs != null ? aggregatorSpecs.hashCode() : 0); result = 31 * result + (aggregatorSpecs != null ? aggregatorSpecs.hashCode() : 0);
result = 31 * result + (postAggregatorSpecs != null ? postAggregatorSpecs.hashCode() : 0); result = 31 * result + (postAggregatorSpecs != null ? postAggregatorSpecs.hashCode() : 0);
result = 31 * result + (orderByLimitFn != null ? orderByLimitFn.hashCode() : 0); result = 31 * result + (limitFn != null ? limitFn.hashCode() : 0);
return result; return result;
} }
} }

View File

@ -44,7 +44,6 @@ import io.druid.query.QueryRunner;
import io.druid.query.QueryToolChest; import io.druid.query.QueryToolChest;
import io.druid.query.SubqueryQueryRunner; import io.druid.query.SubqueryQueryRunner;
import io.druid.query.aggregation.AggregatorFactory; import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.DoubleSumAggregatorFactory;
import io.druid.query.aggregation.MetricManipulationFn; import io.druid.query.aggregation.MetricManipulationFn;
import io.druid.query.aggregation.PostAggregator; import io.druid.query.aggregation.PostAggregator;
import io.druid.segment.incremental.IncrementalIndex; import io.druid.segment.incremental.IncrementalIndex;
@ -52,7 +51,6 @@ import io.druid.segment.incremental.IncrementalIndexStorageAdapter;
import org.joda.time.Interval; import org.joda.time.Interval;
import org.joda.time.Minutes; import org.joda.time.Minutes;
import javax.annotation.Nullable;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
@ -89,7 +87,7 @@ public class GroupByQueryQueryToolChest extends QueryToolChest<Row, GroupByQuery
@Override @Override
public Sequence<Row> run(Query<Row> input) public Sequence<Row> run(Query<Row> input)
{ {
if (Boolean.valueOf((String) input.getContextValue(GROUP_BY_MERGE_KEY, "true"))) { if (Boolean.valueOf(input.getContextValue(GROUP_BY_MERGE_KEY, "true"))) {
return mergeGroupByResults(((GroupByQuery) input).withOverriddenContext(NO_MERGE_CONTEXT), runner); return mergeGroupByResults(((GroupByQuery) input).withOverriddenContext(NO_MERGE_CONTEXT), runner);
} else { } else {
return runner.run(input); return runner.run(input);
@ -100,8 +98,6 @@ public class GroupByQueryQueryToolChest extends QueryToolChest<Row, GroupByQuery
private Sequence<Row> mergeGroupByResults(final GroupByQuery query, QueryRunner<Row> runner) private Sequence<Row> mergeGroupByResults(final GroupByQuery query, QueryRunner<Row> runner)
{ {
Sequence<Row> result;
// If there's a subquery, merge subquery results and then apply the aggregator // If there's a subquery, merge subquery results and then apply the aggregator
final DataSource dataSource = query.getDataSource(); final DataSource dataSource = query.getDataSource();
if (dataSource instanceof QueryDataSource) { if (dataSource instanceof QueryDataSource) {
@ -117,25 +113,30 @@ public class GroupByQueryQueryToolChest extends QueryToolChest<Row, GroupByQuery
for (AggregatorFactory aggregatorFactory : query.getAggregatorSpecs()) { for (AggregatorFactory aggregatorFactory : query.getAggregatorSpecs()) {
aggs.addAll(aggregatorFactory.getBaseFactories()); aggs.addAll(aggregatorFactory.getBaseFactories());
} }
final GroupByQuery innerQuery = new GroupByQuery.Builder(query).setAggregatorSpecs(aggs)
.setInterval(subquery.getIntervals()) // We need the inner incremental index to have all the columns required by the outer query
.setPostAggregatorSpecs(Lists.<PostAggregator>newArrayList()) final GroupByQuery innerQuery = new GroupByQuery.Builder(query)
.build(); .setAggregatorSpecs(aggs)
.setInterval(subquery.getIntervals())
.setPostAggregatorSpecs(Lists.<PostAggregator>newArrayList())
.build();
final GroupByQuery outerQuery = new GroupByQuery.Builder(query)
.setLimitSpec(query.getLimitSpec().merge(subquery.getLimitSpec()))
.build();
final IncrementalIndexStorageAdapter adapter = new IncrementalIndexStorageAdapter( final IncrementalIndexStorageAdapter adapter = new IncrementalIndexStorageAdapter(
makeIncrementalIndex(innerQuery, subqueryResult) makeIncrementalIndex(innerQuery, subqueryResult)
); );
return engine.process(query, adapter); return outerQuery.applyLimit(engine.process(outerQuery, adapter));
} else { } else {
result = runner.run(query); return query.applyLimit(postAggregate(query, makeIncrementalIndex(query, runner.run(query))));
return postAggregate(query, makeIncrementalIndex(query, result));
} }
} }
private Sequence<Row> postAggregate(final GroupByQuery query, IncrementalIndex index) private Sequence<Row> postAggregate(final GroupByQuery query, IncrementalIndex index)
{ {
Sequence<Row> sequence = Sequences.map( return Sequences.map(
Sequences.simple(index.iterableWithPostAggregations(query.getPostAggregatorSpecs())), Sequences.simple(index.iterableWithPostAggregations(query.getPostAggregatorSpecs())),
new Function<Row, Row>() new Function<Row, Row>()
{ {
@ -151,7 +152,6 @@ public class GroupByQueryQueryToolChest extends QueryToolChest<Row, GroupByQuery
} }
} }
); );
return query.applyLimit(sequence);
} }
private IncrementalIndex makeIncrementalIndex(GroupByQuery query, Sequence<Row> rows) private IncrementalIndex makeIncrementalIndex(GroupByQuery query, Sequence<Row> rows)

View File

@ -87,12 +87,17 @@ public class DefaultLimitSpec implements LimitSpec
if (limit == Integer.MAX_VALUE) { if (limit == Integer.MAX_VALUE) {
return new SortingFn(ordering); return new SortingFn(ordering);
} } else {
else {
return new TopNFunction(ordering, limit); return new TopNFunction(ordering, limit);
} }
} }
@Override
public LimitSpec merge(LimitSpec other)
{
return this;
}
private Ordering<Row> makeComparator( private Ordering<Row> makeComparator(
List<DimensionSpec> dimensions, List<AggregatorFactory> aggs, List<PostAggregator> postAggs List<DimensionSpec> dimensions, List<AggregatorFactory> aggs, List<PostAggregator> postAggs
) )
@ -200,12 +205,18 @@ public class DefaultLimitSpec implements LimitSpec
@Override @Override
public boolean equals(Object o) public boolean equals(Object o)
{ {
if (this == o) return true; if (this == o) {
if (o == null || getClass() != o.getClass()) return false; return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
LimitingFn that = (LimitingFn) o; LimitingFn that = (LimitingFn) o;
if (limit != that.limit) return false; if (limit != that.limit) {
return false;
}
return true; return true;
} }
@ -232,12 +243,18 @@ public class DefaultLimitSpec implements LimitSpec
@Override @Override
public boolean equals(Object o) public boolean equals(Object o)
{ {
if (this == o) return true; if (this == o) {
if (o == null || getClass() != o.getClass()) return false; return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
SortingFn sortingFn = (SortingFn) o; SortingFn sortingFn = (SortingFn) o;
if (ordering != null ? !ordering.equals(sortingFn.ordering) : sortingFn.ordering != null) return false; if (ordering != null ? !ordering.equals(sortingFn.ordering) : sortingFn.ordering != null) {
return false;
}
return true; return true;
} }
@ -273,13 +290,21 @@ public class DefaultLimitSpec implements LimitSpec
@Override @Override
public boolean equals(Object o) public boolean equals(Object o)
{ {
if (this == o) return true; if (this == o) {
if (o == null || getClass() != o.getClass()) return false; return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
TopNFunction that = (TopNFunction) o; TopNFunction that = (TopNFunction) o;
if (limit != that.limit) return false; if (limit != that.limit) {
if (sorter != null ? !sorter.equals(that.sorter) : that.sorter != null) return false; return false;
}
if (sorter != null ? !sorter.equals(that.sorter) : that.sorter != null) {
return false;
}
return true; return true;
} }
@ -296,13 +321,21 @@ public class DefaultLimitSpec implements LimitSpec
@Override @Override
public boolean equals(Object o) public boolean equals(Object o)
{ {
if (this == o) return true; if (this == o) {
if (o == null || getClass() != o.getClass()) return false; return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
DefaultLimitSpec that = (DefaultLimitSpec) o; DefaultLimitSpec that = (DefaultLimitSpec) o;
if (limit != that.limit) return false; if (limit != that.limit) {
if (columns != null ? !columns.equals(that.columns) : that.columns != null) return false; return false;
}
if (columns != null ? !columns.equals(that.columns) : that.columns != null) {
return false;
}
return true; return true;
} }

View File

@ -38,5 +38,11 @@ import java.util.List;
}) })
public interface LimitSpec public interface LimitSpec
{ {
public Function<Sequence<Row>, Sequence<Row>> build(List<DimensionSpec> dimensions, List<AggregatorFactory> aggs, List<PostAggregator> postAggs); public Function<Sequence<Row>, Sequence<Row>> build(
List<DimensionSpec> dimensions,
List<AggregatorFactory> aggs,
List<PostAggregator> postAggs
);
public LimitSpec merge(LimitSpec other);
} }

View File

@ -41,6 +41,12 @@ public class NoopLimitSpec implements LimitSpec
return Functions.identity(); return Functions.identity();
} }
@Override
public LimitSpec merge(LimitSpec other)
{
return other;
}
@Override @Override
public String toString() public String toString()
{ {

View File

@ -56,6 +56,7 @@ import io.druid.query.filter.JavaScriptDimFilter;
import io.druid.query.filter.RegexDimFilter; import io.druid.query.filter.RegexDimFilter;
import io.druid.query.groupby.having.EqualToHavingSpec; import io.druid.query.groupby.having.EqualToHavingSpec;
import io.druid.query.groupby.having.GreaterThanHavingSpec; import io.druid.query.groupby.having.GreaterThanHavingSpec;
import io.druid.query.groupby.having.HavingSpec;
import io.druid.query.groupby.having.OrHavingSpec; import io.druid.query.groupby.having.OrHavingSpec;
import io.druid.query.groupby.orderby.DefaultLimitSpec; import io.druid.query.groupby.orderby.DefaultLimitSpec;
import io.druid.query.groupby.orderby.LimitSpec; import io.druid.query.groupby.orderby.LimitSpec;
@ -1113,6 +1114,97 @@ public class GroupByQueryRunnerTest
TestHelper.assertExpectedObjects(expectedResults, results, ""); TestHelper.assertExpectedObjects(expectedResults, results, "");
} }
@Test
public void testSubqueryWithPostAggregatorsAndHaving()
{
final GroupByQuery subquery = GroupByQuery
.builder()
.setDataSource(QueryRunnerTestHelper.dataSource)
.setQuerySegmentSpec(QueryRunnerTestHelper.firstToThird)
.setDimensions(Lists.<DimensionSpec>newArrayList(new DefaultDimensionSpec("quality", "alias")))
.setDimFilter(new JavaScriptDimFilter("quality", "function(dim){ return true; }"))
.setAggregatorSpecs(
Arrays.asList(
QueryRunnerTestHelper.rowsCount,
new LongSumAggregatorFactory("idx_subagg", "index")
)
)
.setPostAggregatorSpecs(
Arrays.<PostAggregator>asList(
new ArithmeticPostAggregator(
"idx_subpostagg",
"+",
Arrays.asList(
new FieldAccessPostAggregator("the_idx_subagg", "idx_subagg"),
new ConstantPostAggregator("thousand", 1000, 1000)
)
)
)
)
.setHavingSpec(
new HavingSpec()
{
@Override
public boolean eval(Row row)
{
return (row.getFloatMetric("idx_subpostagg") < 3800);
}
}
)
.addOrderByColumn("alias")
.setGranularity(QueryRunnerTestHelper.dayGran)
.build();
final GroupByQuery query = GroupByQuery
.builder()
.setDataSource(subquery)
.setQuerySegmentSpec(QueryRunnerTestHelper.firstToThird)
.setDimensions(Lists.<DimensionSpec>newArrayList(new DefaultDimensionSpec("alias", "alias")))
.setAggregatorSpecs(
Arrays.<AggregatorFactory>asList(
new LongSumAggregatorFactory("rows", "rows"),
new LongSumAggregatorFactory("idx", "idx_subpostagg")
)
)
.setPostAggregatorSpecs(
Arrays.<PostAggregator>asList(
new ArithmeticPostAggregator(
"idx", "+", Arrays.asList(
new FieldAccessPostAggregator("the_idx_agg", "idx"),
new ConstantPostAggregator("ten_thousand", 10000, 10000)
)
)
)
)
.setGranularity(QueryRunnerTestHelper.dayGran)
.build();
List<Row> expectedResults = Arrays.asList(
createExpectedRow("2011-04-01", "alias", "automotive", "rows", 1L, "idx", 11135.0),
createExpectedRow("2011-04-01", "alias", "business", "rows", 1L, "idx", 11118.0),
createExpectedRow("2011-04-01", "alias", "entertainment", "rows", 1L, "idx", 11158.0),
createExpectedRow("2011-04-01", "alias", "health", "rows", 1L, "idx", 11120.0),
createExpectedRow("2011-04-01", "alias", "news", "rows", 1L, "idx", 11121.0),
createExpectedRow("2011-04-01", "alias", "technology", "rows", 1L, "idx", 11078.0),
createExpectedRow("2011-04-01", "alias", "travel", "rows", 1L, "idx", 11119.0),
createExpectedRow("2011-04-02", "alias", "automotive", "rows", 1L, "idx", 11147.0),
createExpectedRow("2011-04-02", "alias", "business", "rows", 1L, "idx", 11112.0),
createExpectedRow("2011-04-02", "alias", "entertainment", "rows", 1L, "idx", 11166.0),
createExpectedRow("2011-04-02", "alias", "health", "rows", 1L, "idx", 11113.0),
createExpectedRow("2011-04-02", "alias", "mezzanine", "rows", 3L, "idx", 13447.0),
createExpectedRow("2011-04-02", "alias", "news", "rows", 1L, "idx", 11114.0),
createExpectedRow("2011-04-02", "alias", "premium", "rows", 3L, "idx", 13505.0),
createExpectedRow("2011-04-02", "alias", "technology", "rows", 1L, "idx", 11097.0),
createExpectedRow("2011-04-02", "alias", "travel", "rows", 1L, "idx", 11126.0)
);
// Subqueries are handled by the ToolChest
Iterable<Row> results = runQuery(query);
TestHelper.assertExpectedObjects(expectedResults, results, "");
}
private Iterable<Row> runQuery(GroupByQuery query) private Iterable<Row> runQuery(GroupByQuery query)
{ {