From 80224df36a23e3400c4f0a223df2de5a89c12fba Mon Sep 17 00:00:00 2001 From: Gian Merlino Date: Tue, 28 Aug 2018 10:59:32 -0700 Subject: [PATCH] SQL: Fix post-aggregator naming logic for sort-project. (#6250) The old code assumes that post-aggregator prefixes are one character long followed by numbers. This isn't always true (we may pad with underscores to avoid conflicts). Instead, the new code uses a different base prefix for sort-project post-aggregators ("s" instead of "p") and uses the usual Calcites.findUnusedPrefix function to avoid conflicts. --- .../io/druid/sql/calcite/rel/DruidQuery.java | 20 +++--- .../druid/sql/calcite/CalciteQueryTest.java | 65 ++++++++++++++++++- 2 files changed, 73 insertions(+), 12 deletions(-) diff --git a/sql/src/main/java/io/druid/sql/calcite/rel/DruidQuery.java b/sql/src/main/java/io/druid/sql/calcite/rel/DruidQuery.java index e6d5ba5e767..a65e2c1819c 100644 --- a/sql/src/main/java/io/druid/sql/calcite/rel/DruidQuery.java +++ b/sql/src/main/java/io/druid/sql/calcite/rel/DruidQuery.java @@ -89,7 +89,6 @@ import javax.annotation.Nullable; import java.util.ArrayList; import java.util.List; import java.util.Map; -import java.util.OptionalInt; import java.util.TreeSet; import java.util.stream.Collectors; @@ -298,7 +297,7 @@ public class DruidQuery plannerContext, aggregateRowSignature, aggregateProject, - 0 + "p" ); projectRowOrderAndPostAggregations.postAggregations.forEach( postAggregator -> aggregations.add(Aggregation.create(postAggregator)) @@ -337,17 +336,11 @@ public class DruidQuery if (sortProject == null) { return null; } else { - final List postAggregators = grouping.getPostAggregators(); - final OptionalInt maybeMaxCounter = postAggregators - .stream() - .mapToInt(postAggregator -> Integer.parseInt(postAggregator.getName().substring(1))) - .max(); - final ProjectRowOrderAndPostAggregations projectRowOrderAndPostAggregations = computePostAggregations( plannerContext, sortingInputRowSignature, sortProject, - 
maybeMaxCounter.orElse(-1) + 1 // 0 if max doesn't exist + "s" ); return new SortProject( @@ -374,12 +367,17 @@ public class DruidQuery PlannerContext plannerContext, RowSignature inputRowSignature, Project project, - int outputNameCounter + String basePrefix ) { final List rowOrder = new ArrayList<>(); final List aggregations = new ArrayList<>(); + final String outputNamePrefix = Calcites.findUnusedPrefix( + basePrefix, + new TreeSet<>(inputRowSignature.getRowOrder()) + ); + int outputNameCounter = 0; for (final RexNode postAggregatorRexNode : project.getChildExps()) { // Attempt to convert to PostAggregator. final DruidExpression postAggregatorExpression = Expressions.toDruidExpression( @@ -397,7 +395,7 @@ public class DruidQuery // (There might be a SQL-level type cast that we don't care about) rowOrder.add(postAggregatorExpression.getDirectColumn()); } else { - final String postAggregatorName = "p" + outputNameCounter++; + final String postAggregatorName = outputNamePrefix + outputNameCounter++; final PostAggregator postAggregator = new ExpressionPostAggregator( postAggregatorName, postAggregatorExpression.getExpression(), diff --git a/sql/src/test/java/io/druid/sql/calcite/CalciteQueryTest.java b/sql/src/test/java/io/druid/sql/calcite/CalciteQueryTest.java index 6e4da269663..848e76d3ad5 100644 --- a/sql/src/test/java/io/druid/sql/calcite/CalciteQueryTest.java +++ b/sql/src/test/java/io/druid/sql/calcite/CalciteQueryTest.java @@ -4397,6 +4397,69 @@ public class CalciteQueryTest extends CalciteTestBase ); } + @Test + public void testMinMaxAvgDailyCountWithLimit() throws Exception + { + testQuery( + "SELECT * FROM (" + + " SELECT max(cnt), min(cnt), avg(cnt), TIME_EXTRACT(max(t), 'EPOCH') last_time, count(1) num_days FROM (\n" + + " SELECT TIME_FLOOR(__time, 'P1D') AS t, count(1) cnt\n" + + " FROM \"foo\"\n" + + " GROUP BY 1\n" + + " )" + + ") LIMIT 1\n", + ImmutableList.of( + GroupByQuery.builder() + .setDataSource( + new QueryDataSource( + 
GroupByQuery.builder() + .setDataSource(CalciteTests.DATASOURCE1) + .setInterval(QSS(Filtration.eternity())) + .setGranularity(Granularities.ALL) + .setVirtualColumns( + EXPRESSION_VIRTUAL_COLUMN( + "d0:v", + "timestamp_floor(\"__time\",'P1D',null,'UTC')", + ValueType.LONG + ) + ) + .setDimensions(DIMS(new DefaultDimensionSpec("d0:v", "d0", ValueType.LONG))) + .setAggregatorSpecs(AGGS(new CountAggregatorFactory("a0"))) + .setContext(QUERY_CONTEXT_DEFAULT) + .build() + ) + ) + .setInterval(QSS(Filtration.eternity())) + .setGranularity(Granularities.ALL) + .setAggregatorSpecs(AGGS( + new LongMaxAggregatorFactory("_a0", "a0"), + new LongMinAggregatorFactory("_a1", "a0"), + new LongSumAggregatorFactory("_a2:sum", "a0"), + new CountAggregatorFactory("_a2:count"), + new LongMaxAggregatorFactory("_a3", "d0"), + new CountAggregatorFactory("_a4") + )) + .setPostAggregatorSpecs( + ImmutableList.of( + new ArithmeticPostAggregator( + "_a2", + "quotient", + ImmutableList.of( + new FieldAccessPostAggregator(null, "_a2:sum"), + new FieldAccessPostAggregator(null, "_a2:count") + ) + ), + EXPRESSION_POST_AGG("s0", "timestamp_extract(\"_a3\",'EPOCH','UTC')") + ) + ) + .setLimit(1) + .setContext(QUERY_CONTEXT_DEFAULT) + .build() + ), + ImmutableList.of(new Object[]{1L, 1L, 1L, 978480000L, 6L}) + ); + } + @Test public void testAvgDailyCountDistinct() throws Exception { @@ -6997,7 +7060,7 @@ public class CalciteQueryTest extends CalciteTestBase AGGS(new CountAggregatorFactory("a0"), new DoubleSumAggregatorFactory("a1", "m2")) ) .setPostAggregatorSpecs(Collections.singletonList(EXPRESSION_POST_AGG( - "p0", + "s0", "(\"a1\" / \"a0\")" ))) .setLimitSpec(