From c3a1ce693378cb7ad548b56ed4bfd73b3c71b7b9 Mon Sep 17 00:00:00 2001 From: Gian Merlino Date: Tue, 12 Sep 2017 14:37:27 -0700 Subject: [PATCH] SQL: Fix toTimeseriesQuery and toTopNQuery. (#4780) The former would sometimes eat limits, and the latter would sometimes use the wrong dimension comparator. --- .../io/druid/query/groupby/GroupByQuery.java | 4 +- .../groupby/orderby/DefaultLimitSpec.java | 13 +++-- .../druid/sql/calcite/planner/Calcites.java | 5 ++ .../sql/calcite/rel/DruidQueryBuilder.java | 47 ++++++++++++------- .../druid/sql/calcite/rel/DruidQueryRel.java | 2 +- .../druid/sql/calcite/rule/GroupByRules.java | 2 +- .../druid/sql/calcite/CalciteQueryTest.java | 36 ++++++++++++++ 7 files changed, 83 insertions(+), 26 deletions(-) diff --git a/processing/src/main/java/io/druid/query/groupby/GroupByQuery.java b/processing/src/main/java/io/druid/query/groupby/GroupByQuery.java index 73bc86f5ac3..31d3548f8c2 100644 --- a/processing/src/main/java/io/druid/query/groupby/GroupByQuery.java +++ b/processing/src/main/java/io/druid/query/groupby/GroupByQuery.java @@ -348,7 +348,7 @@ public class GroupByQuery extends BaseQuery throw new IAE("When forcing limit push down, a limit spec must be provided."); } - if (((DefaultLimitSpec) limitSpec).getLimit() == Integer.MAX_VALUE) { + if (!((DefaultLimitSpec) limitSpec).isLimited()) { throw new IAE("When forcing limit push down, the provided limit spec must have a limit."); } @@ -373,7 +373,7 @@ public class GroupByQuery extends BaseQuery DefaultLimitSpec defaultLimitSpec = (DefaultLimitSpec) limitSpec; // If only applying an orderby without a limit, don't try to push down - if (defaultLimitSpec.getLimit() == Integer.MAX_VALUE) { + if (!defaultLimitSpec.isLimited()) { return false; } diff --git a/processing/src/main/java/io/druid/query/groupby/orderby/DefaultLimitSpec.java b/processing/src/main/java/io/druid/query/groupby/orderby/DefaultLimitSpec.java index 60707f6f090..cc3d9e6014b 100644 --- a/processing/src/main/java/io/druid/query/groupby/orderby/DefaultLimitSpec.java +++ b/processing/src/main/java/io/druid/query/groupby/orderby/DefaultLimitSpec.java @@ -110,6 +110,11 @@ public class DefaultLimitSpec implements LimitSpec return limit; } + public boolean isLimited() + { + return limit < Integer.MAX_VALUE; + } + @Override public Function, Sequence> build( List dimensions, @@ -163,16 +168,16 @@ public class DefaultLimitSpec implements LimitSpec } if (!sortingNeeded) { - return limit == Integer.MAX_VALUE ? Functions.>identity() : new LimitingFn(limit); + return isLimited() ? new LimitingFn(limit) : Functions.identity(); } // Materialize the Comparator first for fast-fail error checking. final Ordering ordering = makeComparator(dimensions, aggs, postAggs); - if (limit == Integer.MAX_VALUE) { - return new SortingFn(ordering); - } else { + if (isLimited()) { return new TopNFunction(ordering, limit); + } else { + return new SortingFn(ordering); } } diff --git a/sql/src/main/java/io/druid/sql/calcite/planner/Calcites.java b/sql/src/main/java/io/druid/sql/calcite/planner/Calcites.java index 779b7928176..83f6337ef16 100644 --- a/sql/src/main/java/io/druid/sql/calcite/planner/Calcites.java +++ b/sql/src/main/java/io/druid/sql/calcite/planner/Calcites.java @@ -133,6 +133,11 @@ public class Calcites public static StringComparator getStringComparatorForSqlTypeName(SqlTypeName sqlTypeName) { final ValueType valueType = getValueTypeForSqlTypeName(sqlTypeName); + return getStringComparatorForValueType(valueType); + } + + public static StringComparator getStringComparatorForValueType(ValueType valueType) + { if (ValueType.isNumeric(valueType)) { return StringComparators.NUMERIC; } else if (ValueType.STRING == valueType) { diff --git a/sql/src/main/java/io/druid/sql/calcite/rel/DruidQueryBuilder.java b/sql/src/main/java/io/druid/sql/calcite/rel/DruidQueryBuilder.java index 212d0270ecc..c69326ef72d 100644 --- a/sql/src/main/java/io/druid/sql/calcite/rel/DruidQueryBuilder.java +++ b/sql/src/main/java/io/druid/sql/calcite/rel/DruidQueryBuilder.java @@ -38,7 +38,6 @@ import io.druid.query.groupby.GroupByQuery; import io.druid.query.groupby.having.DimFilterHavingSpec; import io.druid.query.groupby.orderby.DefaultLimitSpec; import io.druid.query.groupby.orderby.OrderByColumnSpec; -import io.druid.query.ordering.StringComparators; import io.druid.query.select.PagingSpec; import io.druid.query.select.SelectQuery; import io.druid.query.timeseries.TimeseriesQuery; @@ -408,10 +407,12 @@ public class DruidQueryBuilder } final Granularity queryGranularity; + final boolean descending; final List dimensions = grouping.getDimensionSpecs(); if (dimensions.isEmpty()) { queryGranularity = Granularities.ALL; + descending = false; } else if (dimensions.size() == 1) { final DimensionSpec dimensionSpec = Iterables.getOnlyElement(dimensions); final Granularity gran = ExtractionFns.toQueryGranularity(dimensionSpec.getExtractionFn()); @@ -419,32 +420,42 @@ public class DruidQueryBuilder if (gran == null || !dimensionSpec.getDimension().equals(Column.TIME_COLUMN_NAME)) { // Timeseries only applies if the single dimension is granular __time. return null; + } else { + queryGranularity = gran; } - // Timeseries only applies if sort is null, or if the first sort field is the time dimension. - final boolean sortingOnTime = - limitSpec == null || limitSpec.getColumns().isEmpty() - || (limitSpec.getLimit() == Integer.MAX_VALUE - && limitSpec.getColumns().get(0).getDimension().equals(dimensionSpec.getOutputName())); + if (limitSpec != null) { + // If there is a limit spec, timeseries cannot LIMIT; and must be ORDER BY time (or nothing). - if (sortingOnTime) { - queryGranularity = gran; + if (limitSpec.isLimited()) { + return null; + } + + if (limitSpec.getColumns().isEmpty()) { + descending = false; + } else { + // We're ok if the first order by is time (since every time value is distinct, the rest of the columns + // wouldn't matter anyway). + final OrderByColumnSpec firstOrderBy = limitSpec.getColumns().get(0); + + if (firstOrderBy.getDimension().equals(dimensionSpec.getOutputName())) { + // Order by time. + descending = firstOrderBy.getDirection() == OrderByColumnSpec.Direction.DESCENDING; + } else { + // Order by something else. + return null; + } + } } else { - return null; + // No limitSpec. + descending = false; } } else { + // More than one dimension, timeseries cannot handle. return null; } final Filtration filtration = Filtration.create(filter).optimize(sourceRowSignature); - - final boolean descending; - if (limitSpec != null && !limitSpec.getColumns().isEmpty()) { - descending = limitSpec.getColumns().get(0).getDirection() == OrderByColumnSpec.Direction.DESCENDING; - } else { - descending = false; - } - final Map theContext = Maps.newHashMap(); theContext.put("skipEmptyBuckets", true); theContext.putAll(plannerContext.getQueryContext()); @@ -494,7 +505,7 @@ public class DruidQueryBuilder limitColumn = new OrderByColumnSpec( dimensionSpec.getOutputName(), OrderByColumnSpec.Direction.ASCENDING, - StringComparators.LEXICOGRAPHIC + Calcites.getStringComparatorForValueType(dimensionSpec.getOutputType()) ); } else { limitColumn = Iterables.getOnlyElement(limitSpec.getColumns()); diff --git a/sql/src/main/java/io/druid/sql/calcite/rel/DruidQueryRel.java b/sql/src/main/java/io/druid/sql/calcite/rel/DruidQueryRel.java index 875ef579a60..95afcb76a47 100644 --- a/sql/src/main/java/io/druid/sql/calcite/rel/DruidQueryRel.java +++ b/sql/src/main/java/io/druid/sql/calcite/rel/DruidQueryRel.java @@ -220,7 +220,7 @@ public class DruidQueryRel extends DruidRel cost += COST_PER_COLUMN * queryBuilder.getGrouping().getPostAggregators().size(); } - if (queryBuilder.getLimitSpec() != null && queryBuilder.getLimitSpec().getLimit() < Integer.MAX_VALUE) { + if (queryBuilder.getLimitSpec() != null && queryBuilder.getLimitSpec().isLimited()) { cost *= COST_LIMIT_MULTIPLIER; } diff --git a/sql/src/main/java/io/druid/sql/calcite/rule/GroupByRules.java b/sql/src/main/java/io/druid/sql/calcite/rule/GroupByRules.java index 5d5780a3a71..f71d5bba9f3 100644 --- a/sql/src/main/java/io/druid/sql/calcite/rule/GroupByRules.java +++ b/sql/src/main/java/io/druid/sql/calcite/rule/GroupByRules.java @@ -648,7 +648,7 @@ public class GroupByRules } } - if (!orderBys.isEmpty() || limitSpec.getLimit() < Integer.MAX_VALUE) { + if (!orderBys.isEmpty() || limitSpec.isLimited()) { return druidRel.withQueryBuilder( druidRel.getQueryBuilder() .withAdjustedGrouping( diff --git a/sql/src/test/java/io/druid/sql/calcite/CalciteQueryTest.java b/sql/src/test/java/io/druid/sql/calcite/CalciteQueryTest.java index fb765e2ecd0..e7046e066a4 100644 --- a/sql/src/test/java/io/druid/sql/calcite/CalciteQueryTest.java +++ b/sql/src/test/java/io/druid/sql/calcite/CalciteQueryTest.java @@ -5197,6 +5197,42 @@ public class CalciteQueryTest @Test public void testTimeseriesWithLimit() throws Exception + { + testQuery( + "SELECT gran, SUM(cnt)\n" + + "FROM (\n" + + " SELECT floor(__time TO month) AS gran, cnt\n" + + " FROM druid.foo\n" + + ") AS x\n" + + "GROUP BY gran\n" + + "LIMIT 1", + ImmutableList.of( + new TopNQueryBuilder() + .dataSource(CalciteTests.DATASOURCE1) + .intervals(QSS(Filtration.eternity())) + .granularity(Granularities.ALL) + .dimension( + new ExtractionDimensionSpec( + "__time", + "d0", + ValueType.LONG, + new TimeFormatExtractionFn(null, null, null, Granularities.MONTH, true) + ) + ) + .aggregators(AGGS(new LongSumAggregatorFactory("a0", "cnt"))) + .metric(new DimensionTopNMetricSpec(null, StringComparators.NUMERIC)) + .threshold(1) + .context(QUERY_CONTEXT_DEFAULT) + .build() + ), + ImmutableList.of( + new Object[]{T("2000-01-01"), 3L} + ) + ); + } + + @Test + public void testTimeseriesWithOrderByAndLimit() throws Exception { testQuery( "SELECT gran, SUM(cnt)\n"