mirror of https://github.com/apache/druid.git
Support multiple outer aggregators of same type and provide more helpful exception when the same inner aggregator is referenced by multiple types of outer aggregators
This commit is contained in:
parent
4da1575680
commit
f4e0a76820
|
@ -30,7 +30,9 @@ import com.google.common.collect.Iterables;
|
||||||
import com.google.common.collect.Lists;
|
import com.google.common.collect.Lists;
|
||||||
import com.google.common.collect.Maps;
|
import com.google.common.collect.Maps;
|
||||||
import com.google.common.collect.Ordering;
|
import com.google.common.collect.Ordering;
|
||||||
|
import com.google.common.collect.Sets;
|
||||||
import com.google.inject.Inject;
|
import com.google.inject.Inject;
|
||||||
|
import com.metamx.common.IAE;
|
||||||
import com.metamx.common.ISE;
|
import com.metamx.common.ISE;
|
||||||
import com.metamx.common.Pair;
|
import com.metamx.common.Pair;
|
||||||
import com.metamx.common.guava.Accumulator;
|
import com.metamx.common.guava.Accumulator;
|
||||||
|
@ -158,14 +160,27 @@ public class GroupByQueryQueryToolChest extends QueryToolChest<Row, GroupByQuery
|
||||||
}
|
}
|
||||||
|
|
||||||
final Sequence<Row> subqueryResult = mergeGroupByResults(subquery, runner, context);
|
final Sequence<Row> subqueryResult = mergeGroupByResults(subquery, runner, context);
|
||||||
final List<AggregatorFactory> aggs = Lists.newArrayList();
|
final Set<AggregatorFactory> aggs = Sets.newHashSet();
|
||||||
|
|
||||||
for (AggregatorFactory aggregatorFactory : query.getAggregatorSpecs()) {
|
for (AggregatorFactory aggregatorFactory : query.getAggregatorSpecs()) {
|
||||||
aggs.addAll(aggregatorFactory.getRequiredColumns());
|
for (final AggregatorFactory transferAgg : aggregatorFactory.getRequiredColumns()) {
|
||||||
|
if (Iterables.any(aggs, new Predicate<AggregatorFactory>() {
|
||||||
|
@Override
|
||||||
|
public boolean apply(AggregatorFactory agg) {
|
||||||
|
return agg.getName().equals(transferAgg.getName()) && !agg.equals(transferAgg);
|
||||||
|
}
|
||||||
|
})) {
|
||||||
|
throw new IAE("Inner aggregator can currently only be referenced by a single type of outer aggregator" +
|
||||||
|
" for '%s'", transferAgg.getName());
|
||||||
|
}
|
||||||
|
|
||||||
|
aggs.add(transferAgg);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// We need the inner incremental index to have all the columns required by the outer query
|
// We need the inner incremental index to have all the columns required by the outer query
|
||||||
final GroupByQuery innerQuery = new GroupByQuery.Builder(subquery)
|
final GroupByQuery innerQuery = new GroupByQuery.Builder(subquery)
|
||||||
.setAggregatorSpecs(aggs)
|
.setAggregatorSpecs(Lists.newArrayList(aggs))
|
||||||
.setInterval(subquery.getIntervals())
|
.setInterval(subquery.getIntervals())
|
||||||
.setPostAggregatorSpecs(Lists.<PostAggregator>newArrayList())
|
.setPostAggregatorSpecs(Lists.<PostAggregator>newArrayList())
|
||||||
.build();
|
.build();
|
||||||
|
|
|
@ -2436,6 +2436,61 @@ public class GroupByQueryRunnerTest
|
||||||
TestHelper.assertExpectedObjects(expectedResults, results, "");
|
TestHelper.assertExpectedObjects(expectedResults, results, "");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testDifferentGroupingSubqueryMultipleAggregatorsOnSameField()
|
||||||
|
{
|
||||||
|
GroupByQuery subquery = GroupByQuery
|
||||||
|
.builder()
|
||||||
|
.setDataSource(QueryRunnerTestHelper.dataSource)
|
||||||
|
.setQuerySegmentSpec(QueryRunnerTestHelper.firstToThird)
|
||||||
|
.setDimensions(Lists.<DimensionSpec>newArrayList(new DefaultDimensionSpec("quality", "alias")))
|
||||||
|
.setAggregatorSpecs(
|
||||||
|
Arrays.asList(
|
||||||
|
QueryRunnerTestHelper.rowsCount,
|
||||||
|
new LongSumAggregatorFactory("idx", "index")
|
||||||
|
)
|
||||||
|
)
|
||||||
|
.setPostAggregatorSpecs(
|
||||||
|
Lists.<PostAggregator>newArrayList(
|
||||||
|
new ArithmeticPostAggregator(
|
||||||
|
"post_agg",
|
||||||
|
"+",
|
||||||
|
Lists.<PostAggregator>newArrayList(
|
||||||
|
new FieldAccessPostAggregator("idx", "idx"),
|
||||||
|
new FieldAccessPostAggregator("idx", "idx")
|
||||||
|
)
|
||||||
|
)
|
||||||
|
)
|
||||||
|
)
|
||||||
|
.setGranularity(QueryRunnerTestHelper.dayGran)
|
||||||
|
.build();
|
||||||
|
|
||||||
|
GroupByQuery query = GroupByQuery
|
||||||
|
.builder()
|
||||||
|
.setDataSource(subquery)
|
||||||
|
.setQuerySegmentSpec(QueryRunnerTestHelper.firstToThird)
|
||||||
|
.setAggregatorSpecs(
|
||||||
|
Arrays.<AggregatorFactory>asList(
|
||||||
|
new DoubleMaxAggregatorFactory("idx1", "idx"),
|
||||||
|
new DoubleMaxAggregatorFactory("idx2", "idx"),
|
||||||
|
new DoubleMaxAggregatorFactory("idx3", "post_agg"),
|
||||||
|
new DoubleMaxAggregatorFactory("idx4", "post_agg")
|
||||||
|
)
|
||||||
|
)
|
||||||
|
.setGranularity(QueryRunnerTestHelper.dayGran)
|
||||||
|
.build();
|
||||||
|
|
||||||
|
List<Row> expectedResults = Arrays.asList(
|
||||||
|
GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", "idx1", 2900.0, "idx2", 2900.0,
|
||||||
|
"idx3", 5800.0, "idx4", 5800.0),
|
||||||
|
GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02", "idx1", 2505.0, "idx2", 2505.0,
|
||||||
|
"idx3", 5010.0, "idx4", 5010.0)
|
||||||
|
);
|
||||||
|
|
||||||
|
Iterable<Row> results = GroupByQueryRunnerTestHelper.runQuery(factory, runner, query);
|
||||||
|
TestHelper.assertExpectedObjects(expectedResults, results, "");
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testDifferentGroupingSubqueryWithFilter()
|
public void testDifferentGroupingSubqueryWithFilter()
|
||||||
|
|
Loading…
Reference in New Issue