From d66ae5ac1079ad5104e506d4138faf4e29220e67 Mon Sep 17 00:00:00 2001 From: nishantmonu51 Date: Tue, 15 Apr 2014 13:37:31 +0530 Subject: [PATCH] fix dependent PostAggregators --- .../query/aggregation/AggregatorUtil.java | 53 ++++++++++++++ .../AggregateTopNMetricFirstAlgorithm.java | 8 ++- .../topn/DimExtractionTopNAlgorithm.java | 11 +-- .../query/topn/InvertedTopNMetricSpec.java | 6 +- .../topn/LexicographicTopNMetricSpec.java | 6 +- .../query/topn/NumericTopNMetricSpec.java | 6 +- .../druid/query/topn/PooledTopNAlgorithm.java | 26 ++++--- .../io/druid/query/topn/TopNBinaryFn.java | 24 +++++-- .../topn/TopNLexicographicResultBuilder.java | 10 +-- .../io/druid/query/topn/TopNMetricSpec.java | 4 +- .../query/topn/TopNNumericResultBuilder.java | 18 ++--- .../druid/query/topn/TopNResultBuilder.java | 4 +- .../io/druid/query/QueryRunnerTestHelper.java | 14 +++- .../query/aggregation/AggregatorUtilTest.java | 69 ++++++++++++++++++ .../druid/query/topn/TopNQueryRunnerTest.java | 71 +++++++++++++++++++ 15 files changed, 282 insertions(+), 48 deletions(-) create mode 100644 processing/src/main/java/io/druid/query/aggregation/AggregatorUtil.java create mode 100644 processing/src/test/java/io/druid/query/aggregation/AggregatorUtilTest.java diff --git a/processing/src/main/java/io/druid/query/aggregation/AggregatorUtil.java b/processing/src/main/java/io/druid/query/aggregation/AggregatorUtil.java new file mode 100644 index 00000000000..6c4b49fd067 --- /dev/null +++ b/processing/src/main/java/io/druid/query/aggregation/AggregatorUtil.java @@ -0,0 +1,53 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012, 2013, 2014 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + +package io.druid.query.aggregation; + +import com.google.common.collect.Lists; + +import java.util.HashSet; +import java.util.LinkedList; +import java.util.List; +import java.util.Set; + +public class AggregatorUtil +{ + /** returns the list of dependent postAggregators that should be calculated in order to calculate given postAgg + * @param postAggregatorList List of postAggregator, there is a restriction that the postAgg should be in order that it can + * @param postAggName name of the postAgg on which dependency is to be calculated + */ + public static List pruneDependentPostAgg(List postAggregatorList, String postAggName) + { + LinkedList rv = Lists.newLinkedList(); + Set deps = new HashSet<>(); + deps.add(postAggName); + // Iterate backwards to calculate deps + for (int i = postAggregatorList.size() - 1; i >= 0; i--) { + PostAggregator agg = postAggregatorList.get(i); + if (deps.contains(agg.getName())) { + rv.addFirst(agg); // add to the beginning of List + deps.remove(agg.getName()); + deps.addAll(agg.getDependentFields()); + } + } + + return rv; + } + +} diff --git a/processing/src/main/java/io/druid/query/topn/AggregateTopNMetricFirstAlgorithm.java b/processing/src/main/java/io/druid/query/topn/AggregateTopNMetricFirstAlgorithm.java index 84be056fea1..cd1a60e72fc 100644 --- a/processing/src/main/java/io/druid/query/topn/AggregateTopNMetricFirstAlgorithm.java +++ b/processing/src/main/java/io/druid/query/topn/AggregateTopNMetricFirstAlgorithm.java @@ -56,7 +56,6 @@ public class AggregateTopNMetricFirstAlgorithm implements TopNAlgorithm aggFactories, + List postAggs ) { - return delegate.getResultBuilder(timestamp, dimSpec, threshold, comparator); + return delegate.getResultBuilder(timestamp, dimSpec, threshold, comparator, aggFactories, postAggs); } @Override diff --git a/processing/src/main/java/io/druid/query/topn/LexicographicTopNMetricSpec.java b/processing/src/main/java/io/druid/query/topn/LexicographicTopNMetricSpec.java index 5c1b9ab41e8..b7c7c6a2565 100644 --- a/processing/src/main/java/io/druid/query/topn/LexicographicTopNMetricSpec.java +++ b/processing/src/main/java/io/druid/query/topn/LexicographicTopNMetricSpec.java @@ -80,10 +80,12 @@ public class LexicographicTopNMetricSpec implements TopNMetricSpec DateTime timestamp, DimensionSpec dimSpec, int threshold, - Comparator comparator + Comparator comparator, + List aggFactories, + List postAggs ) { - return new TopNLexicographicResultBuilder(timestamp, dimSpec, threshold, previousStop, comparator); + return new TopNLexicographicResultBuilder(timestamp, dimSpec, threshold, previousStop, comparator, aggFactories); } @Override diff --git a/processing/src/main/java/io/druid/query/topn/NumericTopNMetricSpec.java b/processing/src/main/java/io/druid/query/topn/NumericTopNMetricSpec.java index fe5bce241a6..9ad97e239cd 100644 --- a/processing/src/main/java/io/druid/query/topn/NumericTopNMetricSpec.java +++ b/processing/src/main/java/io/druid/query/topn/NumericTopNMetricSpec.java @@ -121,10 +121,12 @@ public class NumericTopNMetricSpec implements TopNMetricSpec DateTime timestamp, DimensionSpec dimSpec, int threshold, - Comparator comparator + Comparator comparator, + List aggFactories, + List postAggs ) { - return new TopNNumericResultBuilder(timestamp, dimSpec, metric, threshold, comparator); + return new TopNNumericResultBuilder(timestamp, dimSpec, metric, threshold, comparator, aggFactories, postAggs); } @Override diff --git a/processing/src/main/java/io/druid/query/topn/PooledTopNAlgorithm.java b/processing/src/main/java/io/druid/query/topn/PooledTopNAlgorithm.java index d87631c7b57..6dd44d0a4a8 100644 --- a/processing/src/main/java/io/druid/query/topn/PooledTopNAlgorithm.java +++ b/processing/src/main/java/io/druid/query/topn/PooledTopNAlgorithm.java @@ -35,7 +35,8 @@ import java.util.Comparator; /** */ -public class PooledTopNAlgorithm extends BaseTopNAlgorithm +public class PooledTopNAlgorithm + extends BaseTopNAlgorithm { private final Capabilities capabilities; private final TopNQuery query; @@ -117,7 +118,12 @@ public class PooledTopNAlgorithm extends BaseTopNAlgorithm resultsBufHolder; private final ByteBuffer resultsBuf; private final int[] aggregatorSizes; @@ -278,6 +277,11 @@ public class PooledTopNAlgorithm extends BaseTopNAlgorithm getResultsBufHolder() { return resultsBufHolder; diff --git a/processing/src/main/java/io/druid/query/topn/TopNBinaryFn.java b/processing/src/main/java/io/druid/query/topn/TopNBinaryFn.java index 8c91b8f7b53..c6ab513c726 100644 --- a/processing/src/main/java/io/druid/query/topn/TopNBinaryFn.java +++ b/processing/src/main/java/io/druid/query/topn/TopNBinaryFn.java @@ -24,6 +24,7 @@ import io.druid.granularity.AllGranularity; import io.druid.granularity.QueryGranularity; import io.druid.query.Result; import io.druid.query.aggregation.AggregatorFactory; +import io.druid.query.aggregation.AggregatorUtil; import io.druid.query.aggregation.PostAggregator; import io.druid.query.dimension.DimensionSpec; import org.joda.time.DateTime; @@ -63,9 +64,13 @@ public class TopNBinaryFn implements BinaryFn, Result, Result retVal = new LinkedHashMap(aggregations.size() + 2); retVal.put(dimension, dimensionValue); @@ -103,9 +108,7 @@ public class TopNBinaryFn implements BinaryFn, Result, Result aggFactories; private MinMaxPriorityQueue pQueue = null; public TopNLexicographicResultBuilder( @@ -48,12 +48,14 @@ public class TopNLexicographicResultBuilder implements TopNResultBuilder DimensionSpec dimSpec, int threshold, String previousStop, - final Comparator comparator + final Comparator comparator, + List aggFactories ) { this.timestamp = timestamp; this.dimSpec = dimSpec; this.previousStop = previousStop; + this.aggFactories = aggFactories; instantiatePQueue(threshold, comparator); } @@ -62,9 +64,7 @@ public class TopNLexicographicResultBuilder implements TopNResultBuilder public TopNResultBuilder addEntry( String dimName, Object dimValIndex, - Object[] metricVals, - List aggFactories, - List postAggs + Object[] metricVals ) { Map metricValues = Maps.newLinkedHashMap(); diff --git a/processing/src/main/java/io/druid/query/topn/TopNMetricSpec.java b/processing/src/main/java/io/druid/query/topn/TopNMetricSpec.java index 44103f93708..267f2f278dd 100644 --- a/processing/src/main/java/io/druid/query/topn/TopNMetricSpec.java +++ b/processing/src/main/java/io/druid/query/topn/TopNMetricSpec.java @@ -47,7 +47,9 @@ public interface TopNMetricSpec DateTime timestamp, DimensionSpec dimSpec, int threshold, - Comparator comparator + Comparator comparator, + List aggFactories, + List postAggs ); public byte[] getCacheKey(); diff --git a/processing/src/main/java/io/druid/query/topn/TopNNumericResultBuilder.java b/processing/src/main/java/io/druid/query/topn/TopNNumericResultBuilder.java index bfe142efbb5..4a40f4bb2d5 100644 --- a/processing/src/main/java/io/druid/query/topn/TopNNumericResultBuilder.java +++ b/processing/src/main/java/io/druid/query/topn/TopNNumericResultBuilder.java @@ -23,6 +23,7 @@ import com.google.common.collect.Maps; import com.google.common.collect.MinMaxPriorityQueue; import io.druid.query.Result; import io.druid.query.aggregation.AggregatorFactory; +import io.druid.query.aggregation.AggregatorUtil; import io.druid.query.aggregation.PostAggregator; import io.druid.query.dimension.DimensionSpec; import org.joda.time.DateTime; @@ -40,6 +41,8 @@ public class TopNNumericResultBuilder implements TopNResultBuilder private final DateTime timestamp; private final DimensionSpec dimSpec; private final String metricName; + private final List aggFactories; + private final List postAggs; private MinMaxPriorityQueue pQueue = null; public TopNNumericResultBuilder( @@ -47,12 +50,16 @@ public class TopNNumericResultBuilder implements TopNResultBuilder DimensionSpec dimSpec, String metricName, int threshold, - final Comparator comparator + final Comparator comparator, + List aggFactories, + List postAggs ) { this.timestamp = timestamp; this.dimSpec = dimSpec; this.metricName = metricName; + this.aggFactories = aggFactories; + this.postAggs = AggregatorUtil.pruneDependentPostAgg(postAggs, this.metricName); instantiatePQueue(threshold, comparator); } @@ -61,9 +68,7 @@ public class TopNNumericResultBuilder implements TopNResultBuilder public TopNResultBuilder addEntry( String dimName, Object dimValIndex, - Object[] metricVals, - List aggFactories, - List postAggs + Object[] metricVals ) { Map metricValues = Maps.newLinkedHashMap(); @@ -76,10 +81,7 @@ public class TopNNumericResultBuilder implements TopNResultBuilder } for (PostAggregator postAgg : postAggs) { - if (postAgg.getName().equalsIgnoreCase(metricName)) { - metricValues.put(postAgg.getName(), postAgg.compute(metricValues)); - break; - } + metricValues.put(postAgg.getName(), postAgg.compute(metricValues)); } Object topNMetricVal = metricValues.get(metricName); diff --git a/processing/src/main/java/io/druid/query/topn/TopNResultBuilder.java b/processing/src/main/java/io/druid/query/topn/TopNResultBuilder.java index 5823ee3eece..97b20175380 100644 --- a/processing/src/main/java/io/druid/query/topn/TopNResultBuilder.java +++ b/processing/src/main/java/io/druid/query/topn/TopNResultBuilder.java @@ -33,9 +33,7 @@ public interface TopNResultBuilder public TopNResultBuilder addEntry( String dimName, Object dimValIndex, - Object[] metricVals, - List aggFactories, - List postAggs + Object[] metricVals ); public TopNResultBuilder addEntry( diff --git a/processing/src/test/java/io/druid/query/QueryRunnerTestHelper.java b/processing/src/test/java/io/druid/query/QueryRunnerTestHelper.java index 7ca66b53d7e..b7e7fe0f6de 100644 --- a/processing/src/test/java/io/druid/query/QueryRunnerTestHelper.java +++ b/processing/src/test/java/io/druid/query/QueryRunnerTestHelper.java @@ -60,6 +60,7 @@ public class QueryRunnerTestHelper public static final String indexMetric = "index"; public static final String uniqueMetric = "uniques"; public static final String addRowsIndexConstantMetric = "addRowsIndexConstant"; + public static String dependentPostAggMetric = "dependentPostAgg"; public static final CountAggregatorFactory rowsCount = new CountAggregatorFactory("rows"); public static final LongSumAggregatorFactory indexLongSum = new LongSumAggregatorFactory("index", "index"); public static final DoubleSumAggregatorFactory indexDoubleSum = new DoubleSumAggregatorFactory("index", "index"); @@ -72,8 +73,19 @@ public class QueryRunnerTestHelper public static final FieldAccessPostAggregator indexPostAgg = new FieldAccessPostAggregator("index", "index"); public static final ArithmeticPostAggregator addRowsIndexConstant = new ArithmeticPostAggregator( - "addRowsIndexConstant", "+", Lists.newArrayList(constant, rowsPostAgg, indexPostAgg) + addRowsIndexConstantMetric, "+", Lists.newArrayList(constant, rowsPostAgg, indexPostAgg) ); + // dependent on AddRowsIndexContact postAgg + public static final ArithmeticPostAggregator dependentPostAgg = new ArithmeticPostAggregator( + dependentPostAggMetric, + "+", + Lists.newArrayList( + constant, + new FieldAccessPostAggregator(addRowsIndexConstantMetric, addRowsIndexConstantMetric) + ) + ); + + public static final List commonAggregators = Arrays.asList( rowsCount, indexDoubleSum, diff --git a/processing/src/test/java/io/druid/query/aggregation/AggregatorUtilTest.java b/processing/src/test/java/io/druid/query/aggregation/AggregatorUtilTest.java new file mode 100644 index 00000000000..7fd91bf74ff --- /dev/null +++ b/processing/src/test/java/io/druid/query/aggregation/AggregatorUtilTest.java @@ -0,0 +1,69 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012, 2013, 2014 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + +package io.druid.query.aggregation; + +import com.google.common.collect.Lists; +import io.druid.query.aggregation.post.ArithmeticPostAggregator; +import io.druid.query.aggregation.post.ConstantPostAggregator; +import io.druid.query.aggregation.post.FieldAccessPostAggregator; +import org.junit.Assert; +import org.junit.Test; + +import java.util.List; + +public class AggregatorUtilTest +{ + + @Test + public void testPruneDependentPostAgg() + { + PostAggregator agg1 = new ArithmeticPostAggregator( + "abc", "+", Lists.newArrayList( + new ConstantPostAggregator("1", 1L, 1L), new ConstantPostAggregator("2", 2L, 2L) + ) + ); + PostAggregator dependency1 = new ArithmeticPostAggregator( + "dep1", "+", Lists.newArrayList( + new ConstantPostAggregator("1", 1L, 1L), new ConstantPostAggregator("4", 4L, 4L) + ) + ); + PostAggregator agg2 = new FieldAccessPostAggregator("def", "def"); + PostAggregator dependency2 = new FieldAccessPostAggregator("dep2", "dep2"); + PostAggregator aggregator = new ArithmeticPostAggregator( + "finalAgg", + "+", + Lists.newArrayList( + new FieldAccessPostAggregator("dep1", "dep1"), + new FieldAccessPostAggregator("dep2", "dep2") + ) + ); + List prunedAgg = AggregatorUtil.pruneDependentPostAgg( + Lists.newArrayList( + agg1, + dependency1, + agg2, + dependency2, + aggregator + ), aggregator.getName() + ); + Assert.assertEquals(Lists.newArrayList(dependency1, dependency2, aggregator), prunedAgg); + } + +} diff --git a/processing/src/test/java/io/druid/query/topn/TopNQueryRunnerTest.java b/processing/src/test/java/io/druid/query/topn/TopNQueryRunnerTest.java index 291eab8171a..c1908c290bf 100644 --- a/processing/src/test/java/io/druid/query/topn/TopNQueryRunnerTest.java +++ b/processing/src/test/java/io/druid/query/topn/TopNQueryRunnerTest.java @@ -1176,4 +1176,75 @@ public class TopNQueryRunnerTest TestHelper.assertExpectedResults(expectedResults, runner.run(query)); } + + @Test + public void testTopNDependentPostAgg() { + TopNQuery query = new TopNQueryBuilder() + .dataSource(QueryRunnerTestHelper.dataSource) + .granularity(QueryRunnerTestHelper.allGran) + .dimension(providerDimension) + .metric(QueryRunnerTestHelper.dependentPostAggMetric) + .threshold(4) + .intervals(QueryRunnerTestHelper.fullOnInterval) + .aggregators( + Lists.newArrayList( + Iterables.concat( + QueryRunnerTestHelper.commonAggregators, + Lists.newArrayList( + new MaxAggregatorFactory("maxIndex", "index"), + new MinAggregatorFactory("minIndex", "index") + ) + ) + ) + ) + .postAggregators( + Arrays.asList( + QueryRunnerTestHelper.addRowsIndexConstant, + QueryRunnerTestHelper.dependentPostAgg + ) + ) + .build(); + + List> expectedResults = Arrays.asList( + new Result( + new DateTime("2011-01-12T00:00:00.000Z"), + new TopNResultValue( + Arrays.>asList( + ImmutableMap.builder() + .put(providerDimension, "total_market") + .put("rows", 186L) + .put("index", 215679.82879638672D) + .put("addRowsIndexConstant", 215866.82879638672D) + .put(QueryRunnerTestHelper.dependentPostAggMetric, 215867.82879638672D) + .put("uniques", QueryRunnerTestHelper.UNIQUES_2) + .put("maxIndex", 1743.9217529296875D) + .put("minIndex", 792.3260498046875D) + .build(), + ImmutableMap.builder() + .put(providerDimension, "upfront") + .put("rows", 186L) + .put("index", 192046.1060180664D) + .put("addRowsIndexConstant", 192233.1060180664D) + .put(QueryRunnerTestHelper.dependentPostAggMetric, 192234.1060180664D) + .put("uniques", QueryRunnerTestHelper.UNIQUES_2) + .put("maxIndex", 1870.06103515625D) + .put("minIndex", 545.9906005859375D) + .build(), + ImmutableMap.builder() + .put(providerDimension, "spot") + .put("rows", 837L) + .put("index", 95606.57232284546D) + .put("addRowsIndexConstant", 96444.57232284546D) + .put(QueryRunnerTestHelper.dependentPostAggMetric, 96445.57232284546D) + .put("uniques", QueryRunnerTestHelper.UNIQUES_9) + .put("maxIndex", 277.2735290527344D) + .put("minIndex", 59.02102279663086D) + .build() + ) + ) + ) + ); + + TestHelper.assertExpectedResults(expectedResults, runner.run(query)); + } }