mirror of https://github.com/apache/druid.git
Merge pull request #1781 from dclim/nested-groupby-multiple-same-aggregator-fix-v2
Fix failure in nested groupBy with multiple aggregators with same fie…
commit dae488b7c0
@@ -114,21 +114,6 @@ public class ApproximateHistogramAggregatorFactory implements AggregatorFactory
    return new ApproximateHistogramFoldingAggregatorFactory(name, name, resolution, numBuckets, lowerLimit, upperLimit);
  }

  @Override
  public List<AggregatorFactory> getRequiredColumns()
  {
    return Arrays.<AggregatorFactory>asList(
        new ApproximateHistogramAggregatorFactory(
            fieldName,
            fieldName,
            resolution,
            numBuckets,
            lowerLimit,
            upperLimit
        )
    );
  }

  @Override
  public Object deserialize(Object object)
  {
@@ -62,13 +62,6 @@ public interface AggregatorFactory
   */
  public AggregatorFactory getCombiningFactory();

  /**
   * Gets a list of all columns that this AggregatorFactory will scan
   *
   * @return AggregatorFactories for the columns to scan of the parent AggregatorFactory
   */
  public List<AggregatorFactory> getRequiredColumns();

  /**
   * A method that knows how to "deserialize" the object from whatever form it might have been put into
   * in order to transfer via JSON.
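For context, not part of the commit: getRequiredColumns() returns one AggregatorFactory per column the aggregator scans, generally keyed by the fieldName it reads rather than by its output name. A minimal sketch of what that yields for a simple sum aggregator, assuming the io.druid.query.aggregation classes from this repository are on the classpath (class and variable names here are illustrative only):

import java.util.List;

import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.LongSumAggregatorFactory;

public class RequiredColumnsSketch
{
  public static void main(String[] args)
  {
    // An outer aggregator that reads the column "index" and emits "idx".
    AggregatorFactory outer = new LongSumAggregatorFactory("idx", "index");

    // Per the LongSumAggregatorFactory hunk further down, this is equivalent to
    // Arrays.<AggregatorFactory>asList(new LongSumAggregatorFactory("index", "index")):
    // one factory per scanned column, named after the fieldName it reads.
    List<AggregatorFactory> required = outer.getRequiredColumns();
    System.out.println(required);
  }
}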
@@ -75,12 +75,6 @@ public class CountAggregatorFactory implements AggregatorFactory
    return new LongSumAggregatorFactory(name, name);
  }

  @Override
  public List<AggregatorFactory> getRequiredColumns()
  {
    return Arrays.<AggregatorFactory>asList(new CountAggregatorFactory(name));
  }

  @Override
  public Object deserialize(Object object)
  {
@@ -81,12 +81,6 @@ public class DoubleMaxAggregatorFactory implements AggregatorFactory
    return new DoubleMaxAggregatorFactory(name, name);
  }

  @Override
  public List<AggregatorFactory> getRequiredColumns()
  {
    return Arrays.<AggregatorFactory>asList(new DoubleMaxAggregatorFactory(fieldName, fieldName));
  }

  @Override
  public Object deserialize(Object object)
  {
@@ -81,12 +81,6 @@ public class DoubleMinAggregatorFactory implements AggregatorFactory
    return new DoubleMinAggregatorFactory(name, name);
  }

  @Override
  public List<AggregatorFactory> getRequiredColumns()
  {
    return Arrays.<AggregatorFactory>asList(new DoubleMinAggregatorFactory(fieldName, fieldName));
  }

  @Override
  public Object deserialize(Object object)
  {
@@ -84,12 +84,6 @@ public class DoubleSumAggregatorFactory implements AggregatorFactory
    return new DoubleSumAggregatorFactory(name, name);
  }

  @Override
  public List<AggregatorFactory> getRequiredColumns()
  {
    return Arrays.<AggregatorFactory>asList(new DoubleSumAggregatorFactory(fieldName, fieldName));
  }

  @Override
  public Object deserialize(Object object)
  {
@@ -153,12 +153,6 @@ public class FilteredAggregatorFactory implements AggregatorFactory
    return filter;
  }

  @Override
  public List<AggregatorFactory> getRequiredColumns()
  {
    return delegate.getRequiredColumns();
  }

  @Override
  public String toString()
  {
@@ -98,12 +98,6 @@ public class HistogramAggregatorFactory implements AggregatorFactory
    return new HistogramAggregatorFactory(name, name, breaksList);
  }

  @Override
  public List<AggregatorFactory> getRequiredColumns()
  {
    return Arrays.<AggregatorFactory>asList(new HistogramAggregatorFactory(fieldName, fieldName, breaksList));
  }

  @Override
  public Object deserialize(Object object)
  {
@@ -135,22 +135,6 @@ public class JavaScriptAggregatorFactory implements AggregatorFactory
    return new JavaScriptAggregatorFactory(name, Lists.newArrayList(name), fnCombine, fnReset, fnCombine);
  }

  @Override
  public List<AggregatorFactory> getRequiredColumns()
  {
    return Lists.transform(
        fieldNames,
        new com.google.common.base.Function<String, AggregatorFactory>()
        {
          @Override
          public AggregatorFactory apply(String input)
          {
            return new JavaScriptAggregatorFactory(input, fieldNames, fnAggregate, fnReset, fnCombine);
          }
        }
    );
  }

  @Override
  public Object deserialize(Object object)
  {
@@ -83,12 +83,6 @@ public class LongMaxAggregatorFactory implements AggregatorFactory
    return new LongMaxAggregatorFactory(name, name);
  }

  @Override
  public List<AggregatorFactory> getRequiredColumns()
  {
    return Arrays.<AggregatorFactory>asList(new LongMaxAggregatorFactory(fieldName, fieldName));
  }

  @Override
  public Object deserialize(Object object)
  {
@@ -83,12 +83,6 @@ public class LongMinAggregatorFactory implements AggregatorFactory
    return new LongMinAggregatorFactory(name, name);
  }

  @Override
  public List<AggregatorFactory> getRequiredColumns()
  {
    return Arrays.<AggregatorFactory>asList(new LongMinAggregatorFactory(fieldName, fieldName));
  }

  @Override
  public Object deserialize(Object object)
  {
@@ -84,12 +84,6 @@ public class LongSumAggregatorFactory implements AggregatorFactory
    return new LongSumAggregatorFactory(name, name);
  }

  @Override
  public List<AggregatorFactory> getRequiredColumns()
  {
    return Arrays.<AggregatorFactory>asList(new LongSumAggregatorFactory(fieldName, fieldName));
  }

  @Override
  public Object deserialize(Object object)
  {
@@ -144,22 +144,6 @@ public class CardinalityAggregatorFactory implements AggregatorFactory
    return new HyperUniquesAggregatorFactory(name, name);
  }

  @Override
  public List<AggregatorFactory> getRequiredColumns()
  {
    return Lists.transform(
        fieldNames,
        new Function<String, AggregatorFactory>()
        {
          @Override
          public AggregatorFactory apply(String input)
          {
            return new CardinalityAggregatorFactory(input, fieldNames, byRow);
          }
        }
    );
  }

  @Override
  public Object deserialize(Object object)
  {
@@ -137,12 +137,6 @@ public class HyperUniquesAggregatorFactory implements AggregatorFactory
    return new HyperUniquesAggregatorFactory(name, name);
  }

  @Override
  public List<AggregatorFactory> getRequiredColumns()
  {
    return Arrays.<AggregatorFactory>asList(new HyperUniquesAggregatorFactory(fieldName, fieldName));
  }

  @Override
  public Object deserialize(Object object)
  {
@@ -30,6 +30,7 @@ import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Ordering;
import com.google.common.collect.Sets;
import com.google.inject.Inject;
import com.metamx.common.ISE;
import com.metamx.common.Pair;
@@ -56,6 +57,7 @@ import io.druid.query.QueryRunner;
import io.druid.query.QueryToolChest;
import io.druid.query.SubqueryQueryRunner;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.DoubleSumAggregatorFactory;
import io.druid.query.aggregation.MetricManipulationFn;
import io.druid.query.aggregation.PostAggregator;
import io.druid.query.dimension.DefaultDimensionSpec;
@@ -158,12 +160,36 @@ public class GroupByQueryQueryToolChest extends QueryToolChest<Row, GroupByQuery
    }

    final Sequence<Row> subqueryResult = mergeGroupByResults(subquery, runner, context);
    final List<AggregatorFactory> aggs = Lists.newArrayList();
    for (AggregatorFactory aggregatorFactory : query.getAggregatorSpecs()) {
      aggs.addAll(aggregatorFactory.getRequiredColumns());

    // check that all fieldName parameters in the outer query match up with a name parameter in the inner query
    // for an aggregator or a post aggregator
    Set<String> innerFieldNames = Sets.newHashSet();
    for (AggregatorFactory innerAggregator : subquery.getAggregatorSpecs()) {
      innerFieldNames.add(innerAggregator.getName());
    }
    for (PostAggregator innerPostAggregator : subquery.getPostAggregatorSpecs()) {
      innerFieldNames.add(innerPostAggregator.getName());
    }

    for (AggregatorFactory outerAggregator : query.getAggregatorSpecs()) {
      for (final String fieldName : outerAggregator.requiredFields()) {
        if (!innerFieldNames.contains(fieldName)) {
          throw new IllegalArgumentException(
              String.format("Subquery must have an aggregator or post aggregator with name '%s'", fieldName)
          );
        }
      }
    }

    // We need the inner incremental index to have all the columns required by the outer query
    final List<AggregatorFactory> aggs = Lists.newArrayList(subquery.getAggregatorSpecs());
    for (PostAggregator postAgg : subquery.getPostAggregatorSpecs()) {
      // This causes the post aggregators from the inner query to be copied to the incremental index so that they are
      // available as columns for the outer query. The data isn't modified by the aggregator since it has already
      // been fully grouped by the inner query. Somewhat of a hack to get this working with an incremental index.
      aggs.add(new DoubleSumAggregatorFactory(postAgg.getName(), postAgg.getName()));
    }

    final GroupByQuery innerQuery = new GroupByQuery.Builder(subquery)
        .setAggregatorSpecs(aggs)
        .setInterval(subquery.getIntervals())
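To make the new subquery check easier to follow in isolation, here is a minimal standalone sketch of the same idea: every field the outer aggregators require must appear among the inner query's aggregator or post aggregator output names. The class and method names below are illustrative only; the real check lives in GroupByQueryQueryToolChest as shown in the hunk above.

import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class SubqueryFieldCheckSketch
{
  // Mirrors the new check above: every field the outer aggregators read must be
  // produced by the inner query as an aggregator or post aggregator output name.
  static void validate(Set<String> innerOutputNames, List<String> outerRequiredFields)
  {
    for (String fieldName : outerRequiredFields) {
      if (!innerOutputNames.contains(fieldName)) {
        throw new IllegalArgumentException(
            String.format("Subquery must have an aggregator or post aggregator with name '%s'", fieldName)
        );
      }
    }
  }

  public static void main(String[] args)
  {
    // Output names the inner (sub)query produces, e.g. "rows", "idx", "post_agg".
    Set<String> inner = new HashSet<>(Arrays.asList("rows", "idx", "post_agg"));

    validate(inner, Arrays.asList("idx", "post_agg"));   // passes silently

    try {
      validate(inner, Arrays.asList("idx", "missing"));  // rejected
    } catch (IllegalArgumentException e) {
      System.out.println(e.getMessage());
    }
  }
}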
@@ -51,6 +51,7 @@ import io.druid.query.aggregation.FilteredAggregatorFactory;
import io.druid.query.aggregation.JavaScriptAggregatorFactory;
import io.druid.query.aggregation.LongSumAggregatorFactory;
import io.druid.query.aggregation.PostAggregator;
import io.druid.query.aggregation.cardinality.CardinalityAggregatorFactory;
import io.druid.query.aggregation.hyperloglog.HyperUniqueFinalizingPostAggregator;
import io.druid.query.aggregation.hyperloglog.HyperUniquesAggregatorFactory;
import io.druid.query.aggregation.post.ArithmeticPostAggregator;
@@ -2436,6 +2437,68 @@ public class GroupByQueryRunnerTest
    TestHelper.assertExpectedObjects(expectedResults, results, "");
  }

  @Test
  public void testDifferentGroupingSubqueryMultipleAggregatorsOnSameField()
  {
    GroupByQuery subquery = GroupByQuery
        .builder()
        .setDataSource(QueryRunnerTestHelper.dataSource)
        .setQuerySegmentSpec(QueryRunnerTestHelper.firstToThird)
        .setDimensions(Lists.<DimensionSpec>newArrayList(new DefaultDimensionSpec("quality", "alias")))
        .setAggregatorSpecs(
            Arrays.asList(
                QueryRunnerTestHelper.rowsCount,
                new LongSumAggregatorFactory("idx", "index")
            )
        )
        .setPostAggregatorSpecs(
            Lists.<PostAggregator>newArrayList(
                new ArithmeticPostAggregator(
                    "post_agg",
                    "+",
                    Lists.<PostAggregator>newArrayList(
                        new FieldAccessPostAggregator("idx", "idx"),
                        new FieldAccessPostAggregator("idx", "idx")
                    )
                ),
                new ArithmeticPostAggregator(
                    "post_agg2",
                    "quotient",
                    Lists.<PostAggregator>newArrayList(
                        new FieldAccessPostAggregator("idx", "idx"),
                        new ConstantPostAggregator("constant", 1.23)
                    )
                )
            )
        )
        .setGranularity(QueryRunnerTestHelper.dayGran)
        .build();

    GroupByQuery query = GroupByQuery
        .builder()
        .setDataSource(subquery)
        .setQuerySegmentSpec(QueryRunnerTestHelper.firstToThird)
        .setAggregatorSpecs(
            Arrays.<AggregatorFactory>asList(
                new DoubleMaxAggregatorFactory("idx1", "idx"),
                new DoubleMaxAggregatorFactory("idx2", "idx"),
                new DoubleMaxAggregatorFactory("idx3", "post_agg"),
                new DoubleMaxAggregatorFactory("idx4", "post_agg2")
            )
        )
        .setGranularity(QueryRunnerTestHelper.dayGran)
        .build();

    List<Row> expectedResults = Arrays.asList(
        GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", "idx1", 2900.0, "idx2", 2900.0,
                                                       "idx3", 5800.0, "idx4", 2357.7236328125),
        GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02", "idx1", 2505.0, "idx2", 2505.0,
                                                       "idx3", 5010.0, "idx4", 2036.5853271484375)
    );

    Iterable<Row> results = GroupByQueryRunnerTestHelper.runQuery(factory, runner, query);
    TestHelper.assertExpectedObjects(expectedResults, results, "");
  }

  @Test
  public void testDifferentGroupingSubqueryWithFilter()
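For anyone checking the expected rows in the test above: the inner query's largest daily idx is 2900.0 on 2011-04-01 and 2505.0 on 2011-04-02, so idx1 and idx2 (both DoubleMax over the same idx column) agree, idx3 doubles the value via post_agg = idx + idx, and idx4 divides it by the 1.23 constant via post_agg2. A quick arithmetic sanity check follows; the trailing digits such as 2357.7236328125 look like single-precision rounding picked up on the way through the incremental index, which is my reading rather than something stated in the commit.

public class ExpectedValueSketch
{
  public static void main(String[] args)
  {
    double idx = 2900.0;                        // max inner "idx" on 2011-04-01

    System.out.println(idx + idx);              // post_agg:  5800.0
    System.out.println((float) (idx / 1.23));   // post_agg2: 2357.7236, i.e. 2357.7236328125 as a double
  }
}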