Fix failure in nested groupBy with multiple aggregators with same fieldName

Version 2 - Throws an exception if an outer query references an
aggregator that doesn't exist in the inner query, and then uses the
inner query aggregator names to form the columns for the intermediate
incremental index.

Also deleted all the getRequiredColumns() methods which are no longer
being used.

We do something wacky by adding an aggregator factory for the post
aggregators when building the intermediate incremental index, otherwise
queries on post aggregate results fail because the data isn't in the
incremental index.

Closes #1419
This commit is contained in:
David Lim 2015-09-22 15:45:00 -06:00 committed by dclim
parent 8199ecf1a4
commit 70ae5ca922
16 changed files with 78 additions and 117 deletions

View File

@ -114,21 +114,6 @@ public class ApproximateHistogramAggregatorFactory implements AggregatorFactory
return new ApproximateHistogramFoldingAggregatorFactory(name, name, resolution, numBuckets, lowerLimit, upperLimit);
}
@Override
public List<AggregatorFactory> getRequiredColumns()
{
return Arrays.<AggregatorFactory>asList(
new ApproximateHistogramAggregatorFactory(
fieldName,
fieldName,
resolution,
numBuckets,
lowerLimit,
upperLimit
)
);
}
@Override
public Object deserialize(Object object)
{

View File

@ -62,13 +62,6 @@ public interface AggregatorFactory
*/
public AggregatorFactory getCombiningFactory();
/**
* Gets a list of all columns that this AggregatorFactory will scan
*
* @return AggregatorFactories for the columns to scan of the parent AggregatorFactory
*/
public List<AggregatorFactory> getRequiredColumns();
/**
* A method that knows how to "deserialize" the object from whatever form it might have been put into
* in order to transfer via JSON.

View File

@ -75,12 +75,6 @@ public class CountAggregatorFactory implements AggregatorFactory
return new LongSumAggregatorFactory(name, name);
}
@Override
public List<AggregatorFactory> getRequiredColumns()
{
return Arrays.<AggregatorFactory>asList(new CountAggregatorFactory(name));
}
@Override
public Object deserialize(Object object)
{

View File

@ -81,12 +81,6 @@ public class DoubleMaxAggregatorFactory implements AggregatorFactory
return new DoubleMaxAggregatorFactory(name, name);
}
@Override
public List<AggregatorFactory> getRequiredColumns()
{
return Arrays.<AggregatorFactory>asList(new DoubleMaxAggregatorFactory(fieldName, fieldName));
}
@Override
public Object deserialize(Object object)
{

View File

@ -81,12 +81,6 @@ public class DoubleMinAggregatorFactory implements AggregatorFactory
return new DoubleMinAggregatorFactory(name, name);
}
@Override
public List<AggregatorFactory> getRequiredColumns()
{
return Arrays.<AggregatorFactory>asList(new DoubleMinAggregatorFactory(fieldName, fieldName));
}
@Override
public Object deserialize(Object object)
{

View File

@ -84,12 +84,6 @@ public class DoubleSumAggregatorFactory implements AggregatorFactory
return new DoubleSumAggregatorFactory(name, name);
}
@Override
public List<AggregatorFactory> getRequiredColumns()
{
return Arrays.<AggregatorFactory>asList(new DoubleSumAggregatorFactory(fieldName, fieldName));
}
@Override
public Object deserialize(Object object)
{

View File

@ -153,12 +153,6 @@ public class FilteredAggregatorFactory implements AggregatorFactory
return filter;
}
@Override
public List<AggregatorFactory> getRequiredColumns()
{
return delegate.getRequiredColumns();
}
@Override
public String toString()
{

View File

@ -98,12 +98,6 @@ public class HistogramAggregatorFactory implements AggregatorFactory
return new HistogramAggregatorFactory(name, name, breaksList);
}
@Override
public List<AggregatorFactory> getRequiredColumns()
{
return Arrays.<AggregatorFactory>asList(new HistogramAggregatorFactory(fieldName, fieldName, breaksList));
}
@Override
public Object deserialize(Object object)
{

View File

@ -135,22 +135,6 @@ public class JavaScriptAggregatorFactory implements AggregatorFactory
return new JavaScriptAggregatorFactory(name, Lists.newArrayList(name), fnCombine, fnReset, fnCombine);
}
@Override
public List<AggregatorFactory> getRequiredColumns()
{
return Lists.transform(
fieldNames,
new com.google.common.base.Function<String, AggregatorFactory>()
{
@Override
public AggregatorFactory apply(String input)
{
return new JavaScriptAggregatorFactory(input, fieldNames, fnAggregate, fnReset, fnCombine);
}
}
);
}
@Override
public Object deserialize(Object object)
{

View File

@ -83,12 +83,6 @@ public class LongMaxAggregatorFactory implements AggregatorFactory
return new LongMaxAggregatorFactory(name, name);
}
@Override
public List<AggregatorFactory> getRequiredColumns()
{
return Arrays.<AggregatorFactory>asList(new LongMaxAggregatorFactory(fieldName, fieldName));
}
@Override
public Object deserialize(Object object)
{

View File

@ -83,12 +83,6 @@ public class LongMinAggregatorFactory implements AggregatorFactory
return new LongMinAggregatorFactory(name, name);
}
@Override
public List<AggregatorFactory> getRequiredColumns()
{
return Arrays.<AggregatorFactory>asList(new LongMinAggregatorFactory(fieldName, fieldName));
}
@Override
public Object deserialize(Object object)
{

View File

@ -84,12 +84,6 @@ public class LongSumAggregatorFactory implements AggregatorFactory
return new LongSumAggregatorFactory(name, name);
}
@Override
public List<AggregatorFactory> getRequiredColumns()
{
return Arrays.<AggregatorFactory>asList(new LongSumAggregatorFactory(fieldName, fieldName));
}
@Override
public Object deserialize(Object object)
{

View File

@ -144,22 +144,6 @@ public class CardinalityAggregatorFactory implements AggregatorFactory
return new HyperUniquesAggregatorFactory(name, name);
}
@Override
public List<AggregatorFactory> getRequiredColumns()
{
return Lists.transform(
fieldNames,
new Function<String, AggregatorFactory>()
{
@Override
public AggregatorFactory apply(String input)
{
return new CardinalityAggregatorFactory(input, fieldNames, byRow);
}
}
);
}
@Override
public Object deserialize(Object object)
{

View File

@ -137,12 +137,6 @@ public class HyperUniquesAggregatorFactory implements AggregatorFactory
return new HyperUniquesAggregatorFactory(name, name);
}
@Override
public List<AggregatorFactory> getRequiredColumns()
{
return Arrays.<AggregatorFactory>asList(new HyperUniquesAggregatorFactory(fieldName, fieldName));
}
@Override
public Object deserialize(Object object)
{

View File

@ -30,6 +30,7 @@ import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Ordering;
import com.google.common.collect.Sets;
import com.google.inject.Inject;
import com.metamx.common.ISE;
import com.metamx.common.Pair;
@ -56,6 +57,7 @@ import io.druid.query.QueryRunner;
import io.druid.query.QueryToolChest;
import io.druid.query.SubqueryQueryRunner;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.CountAggregatorFactory;
import io.druid.query.aggregation.MetricManipulationFn;
import io.druid.query.aggregation.PostAggregator;
import io.druid.query.dimension.DefaultDimensionSpec;
@ -158,12 +160,33 @@ public class GroupByQueryQueryToolChest extends QueryToolChest<Row, GroupByQuery
}
final Sequence<Row> subqueryResult = mergeGroupByResults(subquery, runner, context);
final List<AggregatorFactory> aggs = Lists.newArrayList();
for (AggregatorFactory aggregatorFactory : query.getAggregatorSpecs()) {
aggs.addAll(aggregatorFactory.getRequiredColumns());
// check that all fieldName parameters in the outer query match up with a name parameter in the inner query
// for an aggregator or a post aggregator
Set<String> innerFieldNames = Sets.newHashSet();
for (AggregatorFactory innerAggregator : subquery.getAggregatorSpecs()) {
innerFieldNames.add(innerAggregator.getName());
}
for (PostAggregator innerPostAggregator : subquery.getPostAggregatorSpecs()) {
innerFieldNames.add(innerPostAggregator.getName());
}
for (AggregatorFactory outerAggregator : query.getAggregatorSpecs()) {
for (final String fieldName : outerAggregator.requiredFields()) {
if (!innerFieldNames.contains(fieldName)) {
throw new IllegalArgumentException(
String.format("Subquery must have an aggregator or post aggregator with name '%s'", fieldName)
);
}
}
}
// We need the inner incremental index to have all the columns required by the outer query
final List<AggregatorFactory> aggs = Lists.newArrayList(subquery.getAggregatorSpecs());
for (PostAggregator postAgg : subquery.getPostAggregatorSpecs()) {
aggs.add(new CountAggregatorFactory(postAgg.getName())); // aggregator type doesn't matter here
}
final GroupByQuery innerQuery = new GroupByQuery.Builder(subquery)
.setAggregatorSpecs(aggs)
.setInterval(subquery.getIntervals())

View File

@ -51,6 +51,7 @@ import io.druid.query.aggregation.FilteredAggregatorFactory;
import io.druid.query.aggregation.JavaScriptAggregatorFactory;
import io.druid.query.aggregation.LongSumAggregatorFactory;
import io.druid.query.aggregation.PostAggregator;
import io.druid.query.aggregation.cardinality.CardinalityAggregatorFactory;
import io.druid.query.aggregation.hyperloglog.HyperUniqueFinalizingPostAggregator;
import io.druid.query.aggregation.hyperloglog.HyperUniquesAggregatorFactory;
import io.druid.query.aggregation.post.ArithmeticPostAggregator;
@ -2436,6 +2437,57 @@ public class GroupByQueryRunnerTest
TestHelper.assertExpectedObjects(expectedResults, results, "");
}
@Test
public void testDifferentGroupingSubqueryMultipleAggregatorsOnSameField()
{
GroupByQuery subquery = GroupByQuery
.builder()
.setDataSource(QueryRunnerTestHelper.dataSource)
.setQuerySegmentSpec(QueryRunnerTestHelper.firstToThird)
.setDimensions(Lists.<DimensionSpec>newArrayList(new DefaultDimensionSpec("quality", "alias")))
.setAggregatorSpecs(
Arrays.asList(
QueryRunnerTestHelper.rowsCount,
new LongSumAggregatorFactory("idx", "index")
)
)
.setPostAggregatorSpecs(
Lists.<PostAggregator>newArrayList(
new ArithmeticPostAggregator(
"post_agg",
"+",
Lists.<PostAggregator>newArrayList(
new FieldAccessPostAggregator("idx", "idx"),
new FieldAccessPostAggregator("idx", "idx")
)
)
)
)
.setGranularity(QueryRunnerTestHelper.dayGran)
.build();
GroupByQuery query = GroupByQuery
.builder()
.setDataSource(subquery)
.setQuerySegmentSpec(QueryRunnerTestHelper.firstToThird)
.setAggregatorSpecs(
Arrays.<AggregatorFactory>asList(
new DoubleMaxAggregatorFactory("idx1", "idx"),
new DoubleMaxAggregatorFactory("idx2", "idx"),
new DoubleMaxAggregatorFactory("idx3", "post_agg")
)
)
.setGranularity(QueryRunnerTestHelper.dayGran)
.build();
List<Row> expectedResults = Arrays.asList(
GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", "idx1", 2900.0, "idx2", 2900.0, "idx3", 5800.0),
GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02", "idx1", 2505.0, "idx2", 2505.0, "idx3", 5010.0)
);
Iterable<Row> results = GroupByQueryRunnerTestHelper.runQuery(factory, runner, query);
TestHelper.assertExpectedObjects(expectedResults, results, "");
}
@Test
public void testDifferentGroupingSubqueryWithFilter()