From 0910d22f487ab2ac603708d51a16da0d8bf942dc Mon Sep 17 00:00:00 2001
From: Gian Merlino
Date: Fri, 21 Aug 2020 14:11:54 -0700
Subject: [PATCH] Add SQL "OFFSET" clause. (#10279)

* Add SQL "OFFSET" clause.

Under the hood, this uses the new offset features from #10233 (Scan)
and #10235 (GroupBy). Since Timeseries and TopN queries do not
currently have an offset feature, SQL planning will switch from one of
those to Scan or GroupBy if users add an OFFSET.

Includes a refactoring to harmonize offset and limit planning using an
OffsetLimit wrapper class. This is useful because it ensures that the
various places that need to deal with offset and limit collapsing all
behave the same way, using its "andThen" method.

* Fix test and add another test.
---
 docs/querying/sql.md                          |  36 ++-
 .../druid/sql/calcite/planner/Calcites.java   |  34 --
 .../sql/calcite/planner/DruidPlanner.java     |  56 ++--
 .../sql/calcite/planner/OffsetLimit.java      | 165 ++++++++++
 .../druid/sql/calcite/rel/DruidQuery.java     |  76 +++--
 .../apache/druid/sql/calcite/rel/Sorting.java |  54 +++-
 .../sql/calcite/rule/SortCollapseRule.java    |  16 +-
 .../druid/sql/calcite/CalciteQueryTest.java   | 305 ++++++++++++++----
 .../sql/calcite/planner/OffsetLimitTest.java  |  69 ++++
 9 files changed, 610 insertions(+), 201 deletions(-)
 create mode 100644 sql/src/main/java/org/apache/druid/sql/calcite/planner/OffsetLimit.java
 create mode 100644 sql/src/test/java/org/apache/druid/sql/calcite/planner/OffsetLimitTest.java

diff --git a/docs/querying/sql.md b/docs/querying/sql.md
index f836efcb3ca..94faa892fd6 100644
--- a/docs/querying/sql.md
+++ b/docs/querying/sql.md
@@ -54,6 +54,7 @@ FROM { <table> | (<subquery>) | <o1> [ INNER | LEFT ] JOIN <o2> ON condition }
 [ HAVING expr ]
 [ ORDER BY expr [ ASC | DESC ], expr [ ASC | DESC ], ... ]
 [ LIMIT limit ]
+[ OFFSET offset ]
 [ UNION ALL <another query> ]
 ```
 
@@ -118,11 +119,29 @@ can only order by the `__time` column. For aggregation queries, ORDER BY can order by any column.
 
 ### LIMIT
 
-The LIMIT clause can be used to limit the number of rows returned. It can be used with any query type. It is pushed down
-to Data processes for queries that run with the native TopN query type, but not the native GroupBy query type. Future
-versions of Druid will support pushing down limits using the native GroupBy query type as well. If you notice that
-adding a limit doesn't change performance very much, then it's likely that Druid didn't push down the limit for your
-query.
+The LIMIT clause limits the number of rows returned. In some situations Druid will push down this limit to data servers,
+which boosts performance. Limits are always pushed down for queries that run with the native Scan or TopN query types.
+With the native GroupBy query type, it is pushed down when ordering on a column that you are grouping by. If you notice
+that adding a limit doesn't change performance very much, then it's possible that Druid wasn't able to push down the
+limit for your query.
+
+### OFFSET
+
+The OFFSET clause skips a certain number of rows when returning results.
+
+If both LIMIT and OFFSET are provided, then OFFSET will be applied first, followed by LIMIT. For example, using
+LIMIT 100 OFFSET 10 will return 100 rows, starting from row number 10.
+
+Together, LIMIT and OFFSET can be used to implement pagination. However, note that if the underlying datasource is
+modified between page fetches, then the different pages will not necessarily align with each other.
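+
+For example, the following queries retrieve two consecutive pages of 100 rows each. (This is an illustrative
+sketch; the datasource "t" and the column "page" are placeholders, not part of this patch.)
+
+```sql
+-- Page 1: the first 100 rows.
+SELECT __time, page FROM t ORDER BY __time LIMIT 100
+
+-- Page 2: skip the first 100 rows, then return the next 100.
+SELECT __time, page FROM t ORDER BY __time LIMIT 100 OFFSET 100
+```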
+ +There are two important factors that can affect the performance of queries that use OFFSET: + +- Skipped rows still need to be generated internally and then discarded, meaning that raising offsets to high values + can cause queries to use additional resources. +- OFFSET is only supported by the Scan and GroupBy [native query types](#query-types). Therefore, a query with OFFSET + will use one of those two types, even if it might otherwise have run as a Timeseries or TopN. Switching query engines + in this way can affect performance. ### UNION ALL @@ -715,14 +734,15 @@ approximate, regardless of configuration. Druid does not support all SQL features. In particular, the following features are not supported. -- JOIN between native datasources (table, lookup, subquery) and system tables. +- JOIN between native datasources (table, lookup, subquery) and [system tables](#metadata-tables). - JOIN conditions that are not an equality between expressions from the left- and right-hand sides. - JOIN conditions containing a constant value inside the condition. - JOIN conditions on a column which contains a multi-value dimension. - OVER clauses, and analytic functions such as `LAG` and `LEAD`. -- OFFSET clauses. +- ORDER BY for a non-aggregating query, except for `ORDER BY __time` or `ORDER BY __time DESC`, which are supported. + This restriction only applies to non-aggregating queries; you can ORDER BY any column in an aggregating query. - DDL and DML. -- Using Druid-specific functions like `TIME_PARSE` and `APPROX_QUANTILE_DS` on [metadata tables](#metadata-tables). +- Using Druid-specific functions like `TIME_PARSE` and `APPROX_QUANTILE_DS` on [system tables](#metadata-tables). Additionally, some Druid native query features are not supported by the SQL language. Some unsupported Druid features include: diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/planner/Calcites.java b/sql/src/main/java/org/apache/druid/sql/calcite/planner/Calcites.java index 9ba5214446f..bd738917b88 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/planner/Calcites.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/planner/Calcites.java @@ -23,7 +23,6 @@ import com.google.common.base.Preconditions; import com.google.common.collect.Iterables; import com.google.common.io.BaseEncoding; import com.google.common.primitives.Chars; -import org.apache.calcite.rel.core.Sort; import org.apache.calcite.rel.type.RelDataType; import org.apache.calcite.rel.type.RelDataTypeFactory; import org.apache.calcite.rex.RexLiteral; @@ -370,39 +369,6 @@ public class Calcites return StringUtils.format("%s:%s", prefix, suffix); } - public static int getInt(RexNode rex, int defaultValue) - { - return rex == null ? defaultValue : RexLiteral.intValue(rex); - } - - public static int getOffset(Sort sort) - { - return Calcites.getInt(sort.offset, 0); - } - - public static int getFetch(Sort sort) - { - return Calcites.getInt(sort.fetch, -1); - } - - public static int collapseFetch(int innerFetch, int outerFetch, int outerOffset) - { - final int fetch; - if (innerFetch < 0 && outerFetch < 0) { - // Neither has a limit => no limit overall. - fetch = -1; - } else if (innerFetch < 0) { - // Outer limit only. - fetch = outerFetch; - } else if (outerFetch < 0) { - // Inner limit only. 
-      fetch = Math.max(0, innerFetch - outerOffset);
-    } else {
-      fetch = Math.max(0, Math.min(innerFetch - outerOffset, outerFetch));
-    }
-    return fetch;
-  }
-
   public static Class sqlTypeNameJdbcToJavaClass(SqlTypeName typeName)
   {
     // reference: https://docs.oracle.com/javase/1.5.0/docs/guide/jdbc/getstart/mapping.html
diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/planner/DruidPlanner.java b/sql/src/main/java/org/apache/druid/sql/calcite/planner/DruidPlanner.java
index 795bacaa6f0..3e156545544 100644
--- a/sql/src/main/java/org/apache/druid/sql/calcite/planner/DruidPlanner.java
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/planner/DruidPlanner.java
@@ -24,7 +24,6 @@ import com.google.common.base.Supplier;
 import com.google.common.base.Suppliers;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableSet;
-import com.google.common.primitives.Ints;
 import org.apache.calcite.DataContext;
 import org.apache.calcite.adapter.java.JavaTypeFactory;
 import org.apache.calcite.config.CalciteConnectionConfig;
@@ -52,7 +51,6 @@ import org.apache.calcite.sql.SqlExplain;
 import org.apache.calcite.sql.SqlKind;
 import org.apache.calcite.sql.SqlNode;
 import org.apache.calcite.sql.parser.SqlParseException;
-import org.apache.calcite.sql.type.BasicSqlType;
 import org.apache.calcite.sql.type.SqlTypeName;
 import org.apache.calcite.sql.validate.SqlValidator;
 import org.apache.calcite.sql.validate.SqlValidatorUtil;
@@ -270,7 +268,10 @@ public class DruidPlanner implements Closeable
       return planExplanation(bindableRel, explain, ImmutableSet.of());
     } else {
       final BindableRel theRel = bindableRel;
-      final DataContext dataContext = plannerContext.createDataContext((JavaTypeFactory) planner.getTypeFactory(), plannerContext.getParameters());
+      final DataContext dataContext = plannerContext.createDataContext(
+          (JavaTypeFactory) planner.getTypeFactory(),
+          plannerContext.getParameters()
+      );
       final Supplier<Sequence<Object[]>> resultsSupplier = () -> {
         final Enumerable enumerable = theRel.bind(dataContext);
         final Enumerator enumerator = enumerable.enumerator();
@@ -317,12 +318,11 @@
    * the web console, allowing it to apply a limit to queries without rewriting the original SQL.
    *
    * @param root root node
+   *
    * @return root node wrapped with a limiting logical sort if a limit is specified in the query context.
    */
   @Nullable
-  private RelNode possiblyWrapRootWithOuterLimitFromContext(
-      RelRoot root
-  )
+  private RelNode possiblyWrapRootWithOuterLimitFromContext(RelRoot root)
   {
     Object outerLimitObj = plannerContext.getQueryContext().get(PlannerContext.CTX_SQL_OUTER_LIMIT);
     Long outerLimit = DimensionHandlerUtils.convertObjectToLong(outerLimitObj, true);
@@ -331,42 +331,30 @@ public class DruidPlanner implements Closeable
     }
 
     if (root.rel instanceof Sort) {
-      Sort innerSort = (Sort) root.rel;
-      final int offset = Calcites.getOffset(innerSort);
-      final int innerLimit = Calcites.getFetch(innerSort);
-      final int fetch = Calcites.collapseFetch(
-          innerLimit,
-          Ints.checkedCast(outerLimit),
-          0
-      );
+      Sort sort = (Sort) root.rel;
 
-      if (fetch == innerLimit) {
+      final OffsetLimit originalOffsetLimit = OffsetLimit.fromSort(sort);
+      final OffsetLimit newOffsetLimit = originalOffsetLimit.andThen(new OffsetLimit(0, outerLimit));
+
+      if (newOffsetLimit.equals(originalOffsetLimit)) {
         // nothing to do, don't bother to make a new sort
         return root.rel;
       }
 
       return LogicalSort.create(
-          innerSort.getInput(),
-          innerSort.collation,
-          offset > 0 ? 
makeBigIntLiteral(offset) : null, - makeBigIntLiteral(fetch) + sort.getInput(), + sort.collation, + newOffsetLimit.getOffsetAsRexNode(rexBuilder), + newOffsetLimit.getLimitAsRexNode(rexBuilder) + ); + } else { + return LogicalSort.create( + root.rel, + root.collation, + null, + new OffsetLimit(0, outerLimit).getLimitAsRexNode(rexBuilder) ); } - return LogicalSort.create( - root.rel, - root.collation, - null, - makeBigIntLiteral(outerLimit) - ); - } - - private RexNode makeBigIntLiteral(long value) - { - return rexBuilder.makeLiteral( - value, - new BasicSqlType(DruidTypeSystem.INSTANCE, SqlTypeName.BIGINT), - false - ); } private PlannerResult planExplanation( diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/planner/OffsetLimit.java b/sql/src/main/java/org/apache/druid/sql/calcite/planner/OffsetLimit.java new file mode 100644 index 00000000000..8d4a375d558 --- /dev/null +++ b/sql/src/main/java/org/apache/druid/sql/calcite/planner/OffsetLimit.java @@ -0,0 +1,165 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.sql.calcite.planner; + +import com.google.common.base.Preconditions; +import org.apache.calcite.rel.core.Sort; +import org.apache.calcite.rex.RexBuilder; +import org.apache.calcite.rex.RexLiteral; +import org.apache.calcite.rex.RexNode; +import org.apache.calcite.sql.type.BasicSqlType; +import org.apache.calcite.sql.type.SqlTypeName; + +import javax.annotation.Nullable; +import java.util.Objects; + +/** + * Represents an offset and a limit. + */ +public class OffsetLimit +{ + private final long offset; + + @Nullable + private final Long limit; + + OffsetLimit(long offset, @Nullable Long limit) + { + Preconditions.checkArgument(offset >= 0, "offset >= 0"); + Preconditions.checkArgument(limit == null || limit >= 0, "limit == null || limit >= 0"); + + this.offset = offset; + this.limit = limit; + } + + public static OffsetLimit none() + { + return new OffsetLimit(0, null); + } + + public static OffsetLimit fromSort(Sort sort) + { + final long offset = sort.offset == null ? 0 : (long) RexLiteral.intValue(sort.offset); + final Long limit = sort.fetch == null ? 
null : (long) RexLiteral.intValue(sort.fetch); + return new OffsetLimit(offset, limit); + } + + public boolean hasOffset() + { + return offset > 0; + } + + public long getOffset() + { + return offset; + } + + public boolean hasLimit() + { + return limit != null; + } + + public long getLimit() + { + Preconditions.checkState(limit != null, "limit is not present"); + return limit; + } + + @Nullable + public RexNode getOffsetAsRexNode(final RexBuilder builder) + { + if (offset == 0) { + return null; + } else { + return builder.makeLiteral( + offset, + new BasicSqlType(DruidTypeSystem.INSTANCE, SqlTypeName.BIGINT), + false + ); + } + } + + @Nullable + public RexNode getLimitAsRexNode(final RexBuilder builder) + { + if (limit == null) { + return null; + } else { + return builder.makeLiteral( + limit, + new BasicSqlType(DruidTypeSystem.INSTANCE, SqlTypeName.BIGINT), + false + ); + } + } + + /** + * Return a new instance that represents applying this one first, then the {@param next} one. + */ + public OffsetLimit andThen(final OffsetLimit next) + { + final Long newLimit; + + if (limit == null && next.limit == null) { + // Neither has a limit => no limit overall. + newLimit = null; + } else if (limit == null) { + // Outer limit only. + newLimit = next.limit; + } else if (next.limit == null) { + // Inner limit only. + newLimit = Math.max(0, limit - next.offset); + } else { + // Both outer and inner limit. + newLimit = Math.max(0, Math.min(limit - next.offset, next.limit)); + } + + return new OffsetLimit(offset + next.offset, newLimit); + } + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + OffsetLimit that = (OffsetLimit) o; + return Objects.equals(offset, that.offset) && + Objects.equals(limit, that.limit); + } + + @Override + public int hashCode() + { + return Objects.hash(offset, limit); + } + + @Override + public String toString() + { + return "OffsetLimit{" + + "offset=" + offset + + ", limit=" + limit + + '}'; + } +} diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidQuery.java b/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidQuery.java index 45c29d1787a..d683cecc0d0 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidQuery.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidQuery.java @@ -37,7 +37,6 @@ import org.apache.calcite.rel.core.Sort; import org.apache.calcite.rel.type.RelDataType; import org.apache.calcite.rex.RexBuilder; import org.apache.calcite.rex.RexInputRef; -import org.apache.calcite.rex.RexLiteral; import org.apache.calcite.rex.RexNode; import org.apache.calcite.sql.SqlKind; import org.apache.calcite.sql.type.SqlTypeName; @@ -53,8 +52,6 @@ import org.apache.druid.query.dimension.DimensionSpec; import org.apache.druid.query.filter.DimFilter; import org.apache.druid.query.groupby.GroupByQuery; import org.apache.druid.query.groupby.having.DimFilterHavingSpec; -import org.apache.druid.query.groupby.orderby.DefaultLimitSpec; -import org.apache.druid.query.groupby.orderby.NoopLimitSpec; import org.apache.druid.query.groupby.orderby.OrderByColumnSpec; import org.apache.druid.query.ordering.StringComparator; import org.apache.druid.query.ordering.StringComparators; @@ -76,6 +73,7 @@ import org.apache.druid.sql.calcite.expression.DruidExpression; import org.apache.druid.sql.calcite.expression.Expressions; import org.apache.druid.sql.calcite.filtration.Filtration; import org.apache.druid.sql.calcite.planner.Calcites; 
+import org.apache.druid.sql.calcite.planner.OffsetLimit;
 import org.apache.druid.sql.calcite.planner.PlannerContext;
 import org.apache.druid.sql.calcite.rule.GroupByRules;
 import org.apache.druid.sql.calcite.table.RowSignatures;
@@ -88,6 +86,7 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
 
 /**
@@ -525,16 +524,11 @@ public class DruidQuery
     final Sort sort = Preconditions.checkNotNull(partialQuery.getSort(), "sort");
     final Project sortProject = partialQuery.getSortProject();
 
-    // Extract limit.
-    final Long limit = sort.fetch != null ? ((Number) RexLiteral.value(sort.fetch)).longValue() : null;
-    final List<OrderByColumnSpec> orderBys = new ArrayList<>(sort.getChildExps().size());
-
-    if (sort.offset != null) {
-      // Druid cannot currently handle LIMIT with OFFSET.
-      throw new CannotBuildQueryException(sort);
-    }
+    // Extract limit and offset.
+    final OffsetLimit offsetLimit = OffsetLimit.fromSort(sort);
 
     // Extract orderBy column specs.
+    final List<OrderByColumnSpec> orderBys = new ArrayList<>(sort.getChildExps().size());
     for (int sortKey = 0; sortKey < sort.getChildExps().size(); sortKey++) {
       final RexNode sortExpression = sort.getChildExps().get(sortKey);
       final RelFieldCollation collation = sort.getCollation().getFieldCollations().get(sortKey);
@@ -583,7 +577,7 @@ public class DruidQuery
       projection = Projection.postAggregation(sortProject, plannerContext, rowSignature, "s");
     }
 
-    return Sorting.create(orderBys, limit, projection);
+    return Sorting.create(orderBys, offsetLimit, projection);
   }
 
   /**
@@ -762,9 +756,20 @@ public class DruidQuery
           Iterables.getOnlyElement(grouping.getDimensions()).toDimensionSpec().getOutputName()
       );
       if (sorting != null) {
-        // If there is sorting, set timeseriesLimit to given value if less than Integer.Max_VALUE
-        if (sorting.isLimited()) {
-          timeseriesLimit = Ints.checkedCast(sorting.getLimit());
+        if (sorting.getOffsetLimit().hasOffset()) {
+          // Timeseries cannot handle offsets.
+          return null;
+        }
+
+        if (sorting.getOffsetLimit().hasLimit()) {
+          final long limit = sorting.getOffsetLimit().getLimit();
+
+          if (limit == 0) {
+            // Can't handle zero limit (the Timeseries query engine would treat it as unlimited).
+            return null;
+          }
+
+          timeseriesLimit = Ints.checkedCast(limit);
         }
 
         switch (sorting.getSortKind(dimensionExpression.getOutputName())) {
@@ -817,14 +822,18 @@
   @Nullable
   public TopNQuery toTopNQuery()
   {
-    // Must have GROUP BY one column, no GROUPING SETS, ORDER BY ≤ 1 column, limit less than maxTopNLimit, no HAVING.
+    // Must have GROUP BY one column, no GROUPING SETS, ORDER BY ≤ 1 column, LIMIT > 0 and ≤ maxTopNLimit,
+    // no OFFSET, no HAVING.
final boolean topNOk = grouping != null && grouping.getDimensions().size() == 1 && !grouping.getSubtotals().hasEffect(grouping.getDimensionSpecs()) && sorting != null && (sorting.getOrderBys().size() <= 1 - && sorting.isLimited() && sorting.getLimit() <= plannerContext.getPlannerConfig() - .getMaxTopNLimit()) + && sorting.getOffsetLimit().hasLimit() + && sorting.getOffsetLimit().getLimit() > 0 + && sorting.getOffsetLimit().getLimit() <= plannerContext.getPlannerConfig() + .getMaxTopNLimit() + && !sorting.getOffsetLimit().hasOffset()) && grouping.getHavingFilter() == null; if (!topNOk) { @@ -875,7 +884,7 @@ public class DruidQuery getVirtualColumns(true), dimensionSpec, topNMetricSpec, - Ints.checkedCast(sorting.getLimit()), + Ints.checkedCast(sorting.getOffsetLimit().getLimit()), filtration.getQuerySegmentSpec(), filtration.getDimFilter(), Granularities.ALL, @@ -897,6 +906,11 @@ public class DruidQuery return null; } + if (sorting != null && sorting.getOffsetLimit().hasLimit() && sorting.getOffsetLimit().getLimit() <= 0) { + // Cannot handle zero or negative limits. + return null; + } + final Filtration filtration = Filtration.create(filter).optimize(virtualColumnRegistry.getFullRowSignature()); final DimFilterHavingSpec havingSpec; @@ -925,13 +939,7 @@ public class DruidQuery grouping.getAggregatorFactories(), postAggregators, havingSpec, - sorting != null - ? new DefaultLimitSpec( - sorting.getOrderBys(), - 0, - sorting.isLimited() ? Ints.checkedCast(sorting.getLimit()) : null - ) - : NoopLimitSpec.instance(), + Optional.ofNullable(sorting).orElse(Sorting.none()).limitSpec(), grouping.getSubtotals().toSubtotalsSpec(grouping.getDimensionSpecs()), ImmutableSortedMap.copyOf(plannerContext.getQueryContext()) ); @@ -957,11 +965,21 @@ public class DruidQuery final Filtration filtration = Filtration.create(filter).optimize(virtualColumnRegistry.getFullRowSignature()); final ScanQuery.Order order; + long scanOffset = 0L; long scanLimit = 0L; if (sorting != null) { - if (sorting.isLimited()) { - scanLimit = sorting.getLimit(); + scanOffset = sorting.getOffsetLimit().getOffset(); + + if (sorting.getOffsetLimit().hasLimit()) { + final long limit = sorting.getOffsetLimit().getLimit(); + + if (limit == 0) { + // Can't handle zero limit (the Scan query engine would treat it as unlimited). 
+        return null;
+      }
+
+      scanLimit = limit;
     }
 
     final Sorting.SortKind sortKind = sorting.getSortKind(ColumnHolder.TIME_COLUMN_NAME);
@@ -995,7 +1013,7 @@ public class DruidQuery
         getVirtualColumns(true),
         ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST,
         0,
-        0,
+        scanOffset,
         scanLimit,
         order,
         filtration.getDimFilter(),
diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/rel/Sorting.java b/sql/src/main/java/org/apache/druid/sql/calcite/rel/Sorting.java
index 4b939e66136..0f18102be2f 100644
--- a/sql/src/main/java/org/apache/druid/sql/calcite/rel/Sorting.java
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/rel/Sorting.java
@@ -21,9 +21,16 @@
 package org.apache.druid.sql.calcite.rel;
 
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Iterables;
+import com.google.common.primitives.Ints;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.query.groupby.orderby.DefaultLimitSpec;
+import org.apache.druid.query.groupby.orderby.LimitSpec;
+import org.apache.druid.query.groupby.orderby.NoopLimitSpec;
 import org.apache.druid.query.groupby.orderby.OrderByColumnSpec;
+import org.apache.druid.sql.calcite.planner.OffsetLimit;
 
 import javax.annotation.Nullable;
+import java.util.Collections;
 import java.util.List;
 import java.util.Objects;
 
@@ -50,27 +57,31 @@ public class Sorting
   @Nullable
   private final Projection projection;
 
-  @Nullable
-  private final Long limit;
+  private final OffsetLimit offsetLimit;
 
   private Sorting(
       final List<OrderByColumnSpec> orderBys,
-      @Nullable final Long limit,
+      final OffsetLimit offsetLimit,
       @Nullable final Projection projection
   )
   {
     this.orderBys = Preconditions.checkNotNull(orderBys, "orderBys");
-    this.limit = limit;
+    this.offsetLimit = offsetLimit;
     this.projection = projection;
   }
 
   public static Sorting create(
       final List<OrderByColumnSpec> orderBys,
-      @Nullable final Long limit,
+      final OffsetLimit offsetLimit,
       @Nullable final Projection projection
   )
   {
-    return new Sorting(orderBys, limit, projection);
+    return new Sorting(orderBys, offsetLimit, projection);
+  }
+
+  public static Sorting none()
+  {
+    return new Sorting(Collections.emptyList(), OffsetLimit.none(), null);
   }
 
   public SortKind getSortKind(final String timeColumn)
@@ -102,15 +113,30 @@ public class Sorting
     return projection;
   }
 
-  public boolean isLimited()
+  public OffsetLimit getOffsetLimit()
   {
-    return limit != null;
+    return offsetLimit;
   }
 
-  @Nullable
-  public Long getLimit()
+  /**
+   * Returns a LimitSpec that encapsulates the orderBys, offset, and limit of this Sorting instance. Does not
+   * encapsulate the projection at all; you must still call {@link #getProjection()} for that.
+   */
+  public LimitSpec limitSpec()
   {
-    return limit;
+    if (orderBys.isEmpty() && !offsetLimit.hasOffset() && !offsetLimit.hasLimit()) {
+      return NoopLimitSpec.instance();
+    } else {
+      final Integer offsetAsInteger = offsetLimit.hasOffset() ? Ints.checkedCast(offsetLimit.getOffset()) : null;
+      final Integer limitAsInteger = offsetLimit.hasLimit() ? Ints.checkedCast(offsetLimit.getLimit()) : null;
+
+      if (limitAsInteger != null && limitAsInteger == 0) {
+        // Zero limit would be rejected by DefaultLimitSpec.
+ throw new ISE("Cannot create LimitSpec with zero limit"); + } + + return new DefaultLimitSpec(orderBys, offsetAsInteger, limitAsInteger); + } } @Override @@ -125,13 +151,13 @@ public class Sorting Sorting sorting = (Sorting) o; return Objects.equals(orderBys, sorting.orderBys) && Objects.equals(projection, sorting.projection) && - Objects.equals(limit, sorting.limit); + Objects.equals(offsetLimit, sorting.offsetLimit); } @Override public int hashCode() { - return Objects.hash(orderBys, projection, limit); + return Objects.hash(orderBys, projection, offsetLimit); } @Override @@ -140,7 +166,7 @@ public class Sorting return "Sorting{" + "orderBys=" + orderBys + ", projection=" + projection + - ", limit=" + limit + + ", offsetLimit=" + offsetLimit + '}'; } } diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/rule/SortCollapseRule.java b/sql/src/main/java/org/apache/druid/sql/calcite/rule/SortCollapseRule.java index bad8bdef7ed..ad621c5b467 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/rule/SortCollapseRule.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/rule/SortCollapseRule.java @@ -22,7 +22,7 @@ package org.apache.druid.sql.calcite.rule; import org.apache.calcite.plan.RelOptRule; import org.apache.calcite.plan.RelOptRuleCall; import org.apache.calcite.rel.core.Sort; -import org.apache.druid.sql.calcite.planner.Calcites; +import org.apache.druid.sql.calcite.planner.OffsetLimit; /** * Collapses two adjacent Sort operations together. Useful for queries like @@ -50,21 +50,13 @@ public class SortCollapseRule extends RelOptRule if (outerSort.collation.getFieldCollations().isEmpty() || outerSort.collation.getFieldCollations().equals(innerSort.collation.getFieldCollations())) { - final int innerOffset = Calcites.getOffset(innerSort); - final int innerFetch = Calcites.getFetch(innerSort); - final int outerOffset = Calcites.getOffset(outerSort); - final int outerFetch = Calcites.getFetch(outerSort); - - // Add up the offsets. - final int offset = innerOffset + outerOffset; - final int fetch = Calcites.collapseFetch(innerFetch, outerFetch, outerOffset); - + final OffsetLimit offsetLimit = OffsetLimit.fromSort(innerSort).andThen(OffsetLimit.fromSort(outerSort)); final Sort combined = innerSort.copy( innerSort.getTraitSet(), innerSort.getInput(), innerSort.getCollation(), - offset == 0 ? null : call.builder().literal(offset), - fetch < 0 ? 
null : call.builder().literal(fetch) + offsetLimit.getOffsetAsRexNode(call.builder().getRexBuilder()), + offsetLimit.getLimitAsRexNode(call.builder().getRexBuilder()) ); call.transformTo(combined); diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java index 47d7777674a..05ea116bfed 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java @@ -1090,6 +1090,32 @@ public class CalciteQueryTest extends BaseCalciteQueryTest ); } + @Test + public void testSelectStarWithLimitAndOffset() throws Exception + { + testQuery( + PLANNER_CONFIG_DEFAULT_NO_COMPLEX_SERDE, + QUERY_CONTEXT_DEFAULT, + "SELECT * FROM druid.foo LIMIT 2 OFFSET 1", + CalciteTests.REGULAR_USER_AUTH_RESULT, + ImmutableList.of( + newScanQueryBuilder() + .dataSource(CalciteTests.DATASOURCE1) + .intervals(querySegmentSpec(Filtration.eternity())) + .columns("__time", "cnt", "dim1", "dim2", "dim3", "m1", "m2", "unique_dim1") + .offset(1) + .limit(2) + .resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST) + .context(QUERY_CONTEXT_DEFAULT) + .build() + ), + ImmutableList.of( + new Object[]{timestamp("2000-01-02"), 1L, "10.1", NULL_STRING, "[\"b\",\"c\"]", 2.0f, 2.0, HLLC_STRING}, + new Object[]{timestamp("2000-01-03"), 1L, "2", "", "d", 3f, 3.0, HLLC_STRING} + ) + ); + } + @Test public void testSelectWithProjection() throws Exception { @@ -1290,6 +1316,80 @@ public class CalciteQueryTest extends BaseCalciteQueryTest ); } + @Test + public void testSelectLimitWrappingOnTopOfOffset() throws Exception + { + testQuery( + "SELECT dim1 FROM druid.foo ORDER BY __time DESC OFFSET 1", + OUTER_LIMIT_CONTEXT, + ImmutableList.of( + newScanQueryBuilder() + .dataSource(CalciteTests.DATASOURCE1) + .intervals(querySegmentSpec(Filtration.eternity())) + .columns(ImmutableList.of("__time", "dim1")) + .offset(1) + .limit(2) + .order(ScanQuery.Order.DESCENDING) + .resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST) + .context(OUTER_LIMIT_CONTEXT) + .build() + ), + ImmutableList.of( + new Object[]{"def"}, + new Object[]{"1"} + ) + ); + } + + @Test + public void testSelectLimitWrappingOnTopOfOffsetAndLowLimit() throws Exception + { + testQuery( + "SELECT dim1 FROM druid.foo ORDER BY __time DESC LIMIT 1 OFFSET 1", + OUTER_LIMIT_CONTEXT, + ImmutableList.of( + newScanQueryBuilder() + .dataSource(CalciteTests.DATASOURCE1) + .intervals(querySegmentSpec(Filtration.eternity())) + .columns(ImmutableList.of("__time", "dim1")) + .offset(1) + .limit(1) + .order(ScanQuery.Order.DESCENDING) + .resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST) + .context(OUTER_LIMIT_CONTEXT) + .build() + ), + ImmutableList.of( + new Object[]{"def"} + ) + ); + } + + @Test + public void testSelectLimitWrappingOnTopOfOffsetAndHighLimit() throws Exception + { + testQuery( + "SELECT dim1 FROM druid.foo ORDER BY __time DESC LIMIT 10 OFFSET 1", + OUTER_LIMIT_CONTEXT, + ImmutableList.of( + newScanQueryBuilder() + .dataSource(CalciteTests.DATASOURCE1) + .intervals(querySegmentSpec(Filtration.eternity())) + .columns(ImmutableList.of("__time", "dim1")) + .offset(1) + .limit(2) + .order(ScanQuery.Order.DESCENDING) + .resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST) + .context(OUTER_LIMIT_CONTEXT) + .build() + ), + ImmutableList.of( + new Object[]{"def"}, + new Object[]{"1"} + ) + ); + } + @Test public void 
testSelectLimitWrappingAgainAkaIDontReallyQuiteUnderstandCalciteQueryPlanning() throws Exception { @@ -1527,11 +1627,11 @@ public class CalciteQueryTest extends BaseCalciteQueryTest new DefaultDimensionSpec("dim2", "d1", ValueType.STRING) ) .setLimitSpec( - new DefaultLimitSpec( - ImmutableList.of( - new OrderByColumnSpec("d0", Direction.DESCENDING, StringComparators.LEXICOGRAPHIC)), - 2 - ) + DefaultLimitSpec + .builder() + .orderBy(new OrderByColumnSpec("d0", Direction.DESCENDING, StringComparators.LEXICOGRAPHIC)) + .limit(2) + .build() ) .setAggregatorSpecs(aggregators(new CountAggregatorFactory("a0"))) .setContext(OUTER_LIMIT_CONTEXT) @@ -1589,12 +1689,11 @@ public class CalciteQueryTest extends BaseCalciteQueryTest new DefaultDimensionSpec("dim2", "d1", ValueType.STRING) ) .setLimitSpec( - new DefaultLimitSpec( - ImmutableList.of( - new OrderByColumnSpec("a0", Direction.DESCENDING, StringComparators.NUMERIC) - ), - 2 - ) + DefaultLimitSpec + .builder() + .orderBy(new OrderByColumnSpec("a0", Direction.DESCENDING, StringComparators.NUMERIC)) + .limit(2) + .build() ) .setAggregatorSpecs(aggregators(new CountAggregatorFactory("a0"))) .setContext(OUTER_LIMIT_CONTEXT) @@ -1701,16 +1800,16 @@ public class CalciteQueryTest extends BaseCalciteQueryTest .setDimensions(dimensions(new DefaultDimensionSpec("dim1", "d0"))) .setGranularity(Granularities.ALL) .setLimitSpec( - new DefaultLimitSpec( - ImmutableList.of( + DefaultLimitSpec + .builder() + .orderBy( new OrderByColumnSpec( "d0", OrderByColumnSpec.Direction.DESCENDING, StringComparators.LEXICOGRAPHIC ) - ), - Integer.MAX_VALUE - ) + ) + .build() ) .setContext(QUERY_CONTEXT_DEFAULT) .build() @@ -2868,16 +2967,16 @@ public class CalciteQueryTest extends BaseCalciteQueryTest .setDimensions(dimensions(new DefaultDimensionSpec("cnt", "d0", ValueType.LONG))) .setAggregatorSpecs(aggregators(new CountAggregatorFactory("a0"))) .setLimitSpec( - new DefaultLimitSpec( - ImmutableList.of( + DefaultLimitSpec + .builder() + .orderBy( new OrderByColumnSpec( "d0", OrderByColumnSpec.Direction.ASCENDING, StringComparators.NUMERIC ) - ), - Integer.MAX_VALUE - ) + ) + .build() ) .setContext(QUERY_CONTEXT_DEFAULT) .build() @@ -2926,16 +3025,16 @@ public class CalciteQueryTest extends BaseCalciteQueryTest .setDimensions(dimensions(new DefaultDimensionSpec("cnt", "d0", ValueType.LONG))) .setAggregatorSpecs(aggregators(new CountAggregatorFactory("a0"))) .setLimitSpec( - new DefaultLimitSpec( - ImmutableList.of( + DefaultLimitSpec + .builder() + .orderBy( new OrderByColumnSpec( "d0", OrderByColumnSpec.Direction.ASCENDING, StringComparators.NUMERIC ) - ), - Integer.MAX_VALUE - ) + ) + .build() ) .setContext(QUERY_CONTEXT_DEFAULT) .build() @@ -3386,21 +3485,23 @@ public class CalciteQueryTest extends BaseCalciteQueryTest expressionPostAgg("p0", "substring(\"d0\", 1, -1)"), expressionPostAgg("p1", "strlen(\"d0\")") )) - .setLimitSpec(new DefaultLimitSpec( - ImmutableList.of( - new OrderByColumnSpec( - "p1", - OrderByColumnSpec.Direction.DESCENDING, - StringComparators.NUMERIC - ), - new OrderByColumnSpec( - "d0", - OrderByColumnSpec.Direction.ASCENDING, - StringComparators.LEXICOGRAPHIC + .setLimitSpec( + DefaultLimitSpec + .builder() + .orderBy( + new OrderByColumnSpec( + "p1", + OrderByColumnSpec.Direction.DESCENDING, + StringComparators.NUMERIC + ), + new OrderByColumnSpec( + "d0", + OrderByColumnSpec.Direction.ASCENDING, + StringComparators.LEXICOGRAPHIC + ) ) - ), - Integer.MAX_VALUE - )) + .build() + ) .setContext(QUERY_CONTEXT_DEFAULT) .build() ), @@ -4194,9 
+4295,6 @@ public class CalciteQueryTest extends BaseCalciteQueryTest // SELECT query with order by non-__time. "SELECT dim1 FROM druid.foo ORDER BY dim1", - // DISTINCT with OFFSET. - "SELECT DISTINCT dim2 FROM druid.foo ORDER BY dim2 LIMIT 2 OFFSET 5", - // JOIN condition with not-equals (<>). "SELECT foo.dim1, foo.dim2, l.k, l.v\n" + "FROM foo INNER JOIN lookup.lookyloo l ON foo.dim2 <> l.k", @@ -5049,16 +5147,17 @@ public class CalciteQueryTest extends BaseCalciteQueryTest ) .setPostAggregatorSpecs(ImmutableList.of(expressionPostAgg("p0", "(\"a0\" + \"a1\")"))) .setLimitSpec( - new DefaultLimitSpec( - ImmutableList.of( + DefaultLimitSpec + .builder() + .orderBy( new OrderByColumnSpec( "p0", OrderByColumnSpec.Direction.ASCENDING, StringComparators.NUMERIC ) - ), - 3 - ) + ) + .limit(3) + .build() ) .setContext(QUERY_CONTEXT_DEFAULT) .build() @@ -5097,16 +5196,17 @@ public class CalciteQueryTest extends BaseCalciteQueryTest ) ) .setLimitSpec( - new DefaultLimitSpec( - ImmutableList.of( + DefaultLimitSpec + .builder() + .orderBy( new OrderByColumnSpec( "p0", OrderByColumnSpec.Direction.ASCENDING, StringComparators.NUMERIC ) - ), - 3 - ) + ) + .limit(3) + .build() ) .setContext(QUERY_CONTEXT_NO_TOPN) .build() @@ -5360,16 +5460,16 @@ public class CalciteQueryTest extends BaseCalciteQueryTest .setDimensions(dimensions(new DefaultDimensionSpec("v0", "d0", ValueType.FLOAT))) .setAggregatorSpecs(aggregators(new CountAggregatorFactory("a0"))) .setLimitSpec( - new DefaultLimitSpec( - ImmutableList.of( + DefaultLimitSpec + .builder() + .orderBy( new OrderByColumnSpec( "d0", OrderByColumnSpec.Direction.DESCENDING, StringComparators.NUMERIC ) - ), - Integer.MAX_VALUE - ) + ) + .build() ) .setContext(QUERY_CONTEXT_DEFAULT) .build() @@ -6403,18 +6503,6 @@ public class CalciteQueryTest extends BaseCalciteQueryTest @Test public void testSelectDistinctWithSortAsOuterQuery3() throws Exception - { - // Query reduces to LIMIT 0. - - testQuery( - "SELECT * FROM (SELECT DISTINCT dim2 FROM druid.foo ORDER BY dim2 LIMIT 2 OFFSET 5) OFFSET 2", - ImmutableList.of(), - ImmutableList.of() - ); - } - - @Test - public void testSelectDistinctWithSortAsOuterQuery4() throws Exception { testQuery( "SELECT * FROM (SELECT DISTINCT dim2 FROM druid.foo ORDER BY dim2 DESC LIMIT 5) LIMIT 10", @@ -6444,6 +6532,42 @@ public class CalciteQueryTest extends BaseCalciteQueryTest ); } + @Test + public void testSelectNonAggregatingWithLimitLiterallyZero() throws Exception + { + // Query reduces to LIMIT 0. + + testQuery( + "SELECT dim2 FROM druid.foo ORDER BY dim2 LIMIT 0", + ImmutableList.of(), + ImmutableList.of() + ); + } + + @Test + public void testSelectNonAggregatingWithLimitReducedToZero() throws Exception + { + // Query reduces to LIMIT 0. + + testQuery( + "SELECT * FROM (SELECT dim2 FROM druid.foo ORDER BY dim2 LIMIT 2 OFFSET 5) OFFSET 2", + ImmutableList.of(), + ImmutableList.of() + ); + } + + @Test + public void testSelectAggregatingWithLimitReducedToZero() throws Exception + { + // Query reduces to LIMIT 0. + + testQuery( + "SELECT * FROM (SELECT DISTINCT dim2 FROM druid.foo ORDER BY dim2 LIMIT 2 OFFSET 5) OFFSET 2", + ImmutableList.of(), + ImmutableList.of() + ); + } + @Test public void testCountDistinct() throws Exception { @@ -11293,6 +11417,47 @@ public class CalciteQueryTest extends BaseCalciteQueryTest ); } + @Test + public void testTimeseriesWithLimitAndOffset() throws Exception + { + // Cannot vectorize due to expressions. 
+    cannotVectorize();
+
+    // Timeseries cannot handle offsets, so the query morphs into a groupBy.
+
+    testQuery(
+        "SELECT gran, SUM(cnt)\n"
+        + "FROM (\n"
+        + "  SELECT floor(__time TO month) AS gran, cnt\n"
+        + "  FROM druid.foo\n"
+        + ") AS x\n"
+        + "GROUP BY gran\n"
+        + "LIMIT 2\n"
+        + "OFFSET 1",
+        ImmutableList.of(
+            GroupByQuery.builder()
+                        .setDataSource(CalciteTests.DATASOURCE1)
+                        .setInterval(querySegmentSpec(Filtration.eternity()))
+                        .setGranularity(Granularities.ALL)
+                        .setVirtualColumns(
+                            expressionVirtualColumn(
+                                "v0",
+                                "timestamp_floor(\"__time\",'P1M',null,'UTC')",
+                                ValueType.LONG
+                            )
+                        )
+                        .setDimensions(dimensions(new DefaultDimensionSpec("v0", "d0", ValueType.LONG)))
+                        .setAggregatorSpecs(aggregators(new LongSumAggregatorFactory("a0", "cnt")))
+                        .setLimitSpec(DefaultLimitSpec.builder().offset(1).limit(2).build())
+                        .setContext(QUERY_CONTEXT_DEFAULT)
+                        .build()
+        ),
+        ImmutableList.of(
+            new Object[]{timestamp("2001-01-01"), 3L}
+        )
+    );
+  }
+
   @Test
   public void testTimeseriesWithOrderByAndLimit() throws Exception
   {
diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/planner/OffsetLimitTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/planner/OffsetLimitTest.java
new file mode 100644
index 00000000000..c54e46a14dd
--- /dev/null
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/planner/OffsetLimitTest.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.sql.calcite.planner;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.druid.java.util.common.StringUtils;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.List;
+import java.util.stream.Collectors;
+
+public class OffsetLimitTest
+{
+  @Test
+  public void testAndThen()
+  {
+    final List<String> things = ImmutableList.of("a", "b", "c", "d", "e", "f", "g", "h");
+
+    for (int innerOffset = 0; innerOffset < 5; innerOffset++) {
+      for (int innerLimit = -1; innerLimit < 5; innerLimit++) {
+        for (int outerOffset = 0; outerOffset < 5; outerOffset++) {
+          for (int outerLimit = -1; outerLimit < 5; outerLimit++) {
+            final OffsetLimit inner = new OffsetLimit(innerOffset, innerLimit < 0 ? null : (long) innerLimit);
+            final OffsetLimit outer = new OffsetLimit(outerOffset, outerLimit < 0 ? null : (long) outerLimit);
+            final OffsetLimit combined = inner.andThen(outer);
+
+            Assert.assertEquals(
+                StringUtils.format(
+                    "innerOffset[%s], innerLimit[%s], outerOffset[%s], outerLimit[%s]",
+                    innerOffset,
+                    innerLimit,
+                    outerOffset,
+                    outerLimit
+                ),
+                things.stream()
+                      .skip(innerOffset)
+                      .limit(innerLimit < 0 ? Long.MAX_VALUE : innerLimit)
+                      .skip(outerOffset)
+                      .limit(outerLimit < 0 ? 
Long.MAX_VALUE : outerLimit) + .collect(Collectors.toList()), + things.stream() + .skip(combined.getOffset()) + .limit(combined.hasLimit() ? combined.getLimit() : Long.MAX_VALUE) + .collect(Collectors.toList()) + ); + } + } + } + } + } +}
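
As a sanity check on the "andThen" collapsing rules referenced in the commit message, here is how they play
out in SQL terms (an illustrative sketch; the table "t" and column "x" are hypothetical):

```sql
-- Inner sort: OFFSET 1, LIMIT 4. Outer sort: OFFSET 1, LIMIT 2.
SELECT * FROM (
  SELECT x FROM t ORDER BY x LIMIT 4 OFFSET 1
) LIMIT 2 OFFSET 1

-- SortCollapseRule combines the two sorts via OffsetLimit.andThen:
--   offset = innerOffset + outerOffset                         = 1 + 1 = 2
--   limit  = max(0, min(innerLimit - outerOffset, outerLimit)) = max(0, min(4 - 1, 2)) = 2
-- so the query plans as a single sort equivalent to ORDER BY x LIMIT 2 OFFSET 2.
```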