Fix an issue where, after the broker forwards a GroupByQuery to a historical, the havingSpec is still applied against postAggregations that have been removed from the forwarded query.

Add a unit test that replicates the issue.
Add a query that replicates the issue to the integration tests.
Bingkun Guo 2015-04-16 15:46:48 -05:00
parent 8b68b8f3c2
commit cf155e4eba
3 changed files with 136 additions and 1 deletion
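
For illustration, here is a minimal, self-contained sketch of why the evaluation order matters. This is not Druid code: the class, the helpers, and the second sample row are invented, and the first row's values are borrowed from the integration-test query added in this commit. The point is that a having clause referencing a post-aggregated column can only be evaluated after post-aggregations run; the forwarded query has its postAggregations stripped, so the column simply does not exist there.

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Illustrative sketch only -- not Druid code; all names and values are examples.
public class HavingOrderSketch
{
  public static void main(String[] args)
  {
    // Per-page rows as they look after aggregation but before post-aggregation.
    List<Map<String, Object>> rows = new ArrayList<>();
    rows.add(row("Crimson Typhoon", 905L));  // values from the integration test
    rows.add(row("Gypsy Danger", 57L));      // invented second row

    // Wrong order (the bug): "added_count_times_ten" is missing, so a
    // greaterThan-9000 filter on it can never match and every row is dropped.
    System.out.println("having before post-aggs: " + applyHaving(rows));

    // Correct order (the fix): compute the post-aggregation first, then filter.
    for (Map<String, Object> r : rows) {
      r.put("added_count_times_ten", ((Long) r.get("added_count")) * 10.0);
    }
    System.out.println("having after post-aggs:  " + applyHaving(rows));
  }

  // Keep rows whose "added_count_times_ten" column exceeds 9000.
  static List<Map<String, Object>> applyHaving(List<Map<String, Object>> rows)
  {
    List<Map<String, Object>> kept = new ArrayList<>();
    for (Map<String, Object> r : rows) {
      Object v = r.get("added_count_times_ten");
      if (v instanceof Double && (Double) v > 9000.0) {
        kept.add(r);
      }
    }
    return kept;
  }

  static Map<String, Object> row(String page, long addedCount)
  {
    Map<String, Object> r = new HashMap<>();
    r.put("page", page);
    r.put("added_count", addedCount);
    return r;
  }
}

With the correct order, only the first row survives (905 × 10 = 9050 > 9000); with the buggy order, nothing does.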


@@ -12,5 +12,53 @@
     "maxTime" : "2013-08-31T12:41:27.000Z"
   }
 } ]
+},
+{
+    "description":"having spec on post aggregation",
+    "query":{
+        "queryType":"groupBy",
+        "dataSource":"wikipedia_index_test",
+        "granularity":"day",
+        "dimensions":[
+            "page"
+        ],
+        "aggregations":[
+            {
+                "type":"count",
+                "name":"rows"
+            },
+            {
+                "type":"longSum",
+                "fieldName":"added",
+                "name":"added_count"
+            }
+        ],
+        "postAggregations": [
+            {
+                "type":"arithmetic",
+                "name":"added_count_times_ten",
+                "fn":"*",
+                "fields":[
+                    {"type":"fieldAccess", "name":"added_count", "fieldName":"added_count"},
+                    {"type":"constant", "name":"const", "value":10}
+                ]
+            }
+        ],
+        "having":{"type":"greaterThan", "aggregation":"added_count_times_ten", "value":9000},
+        "intervals":[
+            "2013-08-31T00:00/2013-09-01T00:00"
+        ]
+    },
+    "expectedResults":[ {
+        "version" : "v1",
+        "timestamp" : "2013-08-31T00:00:00.000Z",
+        "event" : {
+            "added_count_times_ten" : 9050.0,
+            "page" : "Crimson Typhoon",
+            "added_count" : 905,
+            "rows" : 1
+        }
+    } ]
 }
 ]
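
The arithmetic behind the expected result: only the "Crimson Typhoon" row has an added_count (905) whose value times ten (9050) exceeds the having threshold of 9000, so the query is expected to return exactly one row. Before the fix, the historical would evaluate this having clause against rows that lack added_count_times_ten entirely, since postAggregations are stripped from the forwarded query.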


@@ -190,7 +190,8 @@ public class GroupByQueryQueryToolChest extends QueryToolChest<Row, GroupByQuery
             query.getAggregatorSpecs(),
             // Don't do post aggs until the end of this method.
             ImmutableList.<PostAggregator>of(),
-            query.getHavingSpec(),
+            // Don't do "having" clause until the end of this method.
+            null,
             query.getLimitSpec(),
             query.getContext()
         ).withOverriddenContext(
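
This one-line change is the actual fix: the intermediate query constructed here, the one whose partial results get merged, now carries null in place of the original havingSpec. As the adjacent comments note, both the post-aggregations and the having clause are deferred to the end of this method, so the having clause is only evaluated after the post-aggregated columns it may reference have been computed on the fully merged rows.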


@@ -1510,6 +1510,92 @@ public class GroupByQueryRunnerTest
     TestHelper.assertExpectedObjects(expectedResults, mergedRunner.run(fullQuery, context), "merged");
   }
 
+  @Test
+  public void testMergedPostAggHavingSpec()
+  {
+    List<Row> expectedResults = Arrays.asList(
+        GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", "alias", "business", "rows", 2L, "idx", 217L, "rows_times_10", 20.0),
+        GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", "alias", "mezzanine", "rows", 6L, "idx", 4420L, "rows_times_10", 60.0),
+        GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", "alias", "premium", "rows", 6L, "idx", 4416L, "rows_times_10", 60.0)
+    );
+
+    GroupByQuery.Builder builder = GroupByQuery
+        .builder()
+        .setDataSource(QueryRunnerTestHelper.dataSource)
+        .setInterval("2011-04-02/2011-04-04")
+        .setDimensions(Lists.<DimensionSpec>newArrayList(new DefaultDimensionSpec("quality", "alias")))
+        .setAggregatorSpecs(
+            Arrays.asList(
+                QueryRunnerTestHelper.rowsCount,
+                new LongSumAggregatorFactory("idx", "index")
+            )
+        )
+        .setPostAggregatorSpecs(
+            Arrays.<PostAggregator>asList(
+                new ArithmeticPostAggregator(
+                    "rows_times_10",
+                    "*",
+                    Arrays.<PostAggregator>asList(
+                        new FieldAccessPostAggregator(
+                            "rows",
+                            "rows"
+                        ),
+                        new ConstantPostAggregator(
+                            "const",
+                            10L
+                        )
+                    )
+                )
+            )
+        )
+        .setGranularity(new PeriodGranularity(new Period("P1M"), null, null))
+        .setHavingSpec(
+            new OrHavingSpec(
+                ImmutableList.of(
+                    new GreaterThanHavingSpec("rows_times_10", 20L),
+                    new EqualToHavingSpec("idx", 217L)
+                )
+            )
+        );
+
+    final GroupByQuery fullQuery = builder.build();
+
+    QueryRunner mergedRunner = factory.getToolchest().mergeResults(
+        new QueryRunner<Row>()
+        {
+          @Override
+          public Sequence<Row> run(
+              Query<Row> query, Map<String, Object> responseContext
+          )
+          {
+            // simulate two daily segments
+            final Query query1 = query.withQuerySegmentSpec(
+                new MultipleIntervalSegmentSpec(Lists.newArrayList(new Interval("2011-04-02/2011-04-03")))
+            );
+            final Query query2 = query.withQuerySegmentSpec(
+                new MultipleIntervalSegmentSpec(Lists.newArrayList(new Interval("2011-04-03/2011-04-04")))
+            );
+            return Sequences.concat(
+                runner.run(query1, responseContext),
+                runner.run(query2, responseContext)
+            );
+          }
+        }
+    );
+
+    Map<String, Object> context = Maps.newHashMap();
+
+    // add an extra layer of merging, simulate broker forwarding query to historical
+    TestHelper.assertExpectedObjects(
+        expectedResults,
+        factory.getToolchest().postMergeQueryDecoration(
+            factory.getToolchest().mergeResults(
+                factory.getToolchest().preMergeQueryDecoration(mergedRunner)
+            )
+        ).run(fullQuery, context),
+        "merged"
+    );
+  }
+
   @Test
   public void testGroupByWithRegEx() throws Exception
   {
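
Structure of the new unit test: the inner anonymous QueryRunner splits the query across two daily segments, simulating a historical merging per-segment results, while the outer preMergeQueryDecoration / mergeResults / postMergeQueryDecoration chain simulates the broker forwarding the query. The having spec deliberately references rows_times_10, a column that exists only after post-aggregation, so with the unfixed toolchest the inner merge would apply the having clause while that column is still absent and incorrectly filter out rows the broker should return.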