diff --git a/docs/querying/sql.md b/docs/querying/sql.md index f836efcb3ca..94faa892fd6 100644 --- a/docs/querying/sql.md +++ b/docs/querying/sql.md @@ -54,6 +54,7 @@ FROM { <table> | (<subquery>) | <o1> [ INNER | LEFT ] JOIN <o2> ON condition } [ HAVING expr ] [ ORDER BY expr [ ASC | DESC ], expr [ ASC | DESC ], ... ] [ LIMIT limit ] +[ OFFSET offset ] [ UNION ALL <another query> ] ``` @@ -118,11 +119,29 @@ can only order by the `__time` column. For aggregation queries, ORDER BY can ord ### LIMIT -The LIMIT clause can be used to limit the number of rows returned. It can be used with any query type. It is pushed down -to Data processes for queries that run with the native TopN query type, but not the native GroupBy query type. Future -versions of Druid will support pushing down limits using the native GroupBy query type as well. If you notice that -adding a limit doesn't change performance very much, then it's likely that Druid didn't push down the limit for your -query. +The LIMIT clause limits the number of rows returned. In some situations Druid will push down this limit to data servers, +which boosts performance. Limits are always pushed down for queries that run with the native Scan or TopN query types. +With the native GroupBy query type, the limit is pushed down when ordering on a column that you are grouping by. If you notice +that adding a limit doesn't change performance very much, then it's possible that Druid wasn't able to push down the +limit for your query. + +### OFFSET + +The OFFSET clause skips a certain number of rows when returning results. + +If both LIMIT and OFFSET are provided, then OFFSET will be applied first, followed by LIMIT. For example, using +LIMIT 100 OFFSET 10 will skip the first 10 rows and return the next 100 rows (rows 11 through 110). + +Together, LIMIT and OFFSET can be used to implement pagination. However, note that if the underlying datasource is +modified between page fetches, then the different pages will not necessarily align with each other. + +There are two important factors that can affect the performance of queries that use OFFSET: + +- Skipped rows still need to be generated internally and then discarded, meaning that raising offsets to high values + can cause queries to use additional resources. +- OFFSET is only supported by the Scan and GroupBy [native query types](#query-types). Therefore, a query with OFFSET + will use one of those two types, even if it might otherwise have run as a Timeseries or TopN. Switching query engines + in this way can affect performance. ### UNION ALL @@ -715,14 +734,15 @@ approximate, regardless of configuration. Druid does not support all SQL features. In particular, the following features are not supported. -- JOIN between native datasources (table, lookup, subquery) and system tables. +- JOIN between native datasources (table, lookup, subquery) and [system tables](#metadata-tables). - JOIN conditions that are not an equality between expressions from the left- and right-hand sides. - JOIN conditions containing a constant value inside the condition. - JOIN conditions on a column which contains a multi-value dimension. - OVER clauses, and analytic functions such as `LAG` and `LEAD`. -- OFFSET clauses. +- ORDER BY for a non-aggregating query, except for `ORDER BY __time` or `ORDER BY __time DESC`, which are supported. + This restriction only applies to non-aggregating queries; you can ORDER BY any column in an aggregating query. - DDL and DML. -- Using Druid-specific functions like `TIME_PARSE` and `APPROX_QUANTILE_DS` on [metadata tables](#metadata-tables). +- Using Druid-specific functions like `TIME_PARSE` and `APPROX_QUANTILE_DS` on [system tables](#metadata-tables). Additionally, some Druid native query features are not supported by the SQL language. Some unsupported Druid features include:
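To make the pagination pattern described in the new docs concrete, here is a hedged example of fetching two consecutive pages; the `wikipedia` datasource and its columns come from the Druid tutorials and are illustrative, not part of this patch:

```sql
-- Page 1: the 100 most recent rows.
SELECT __time, "channel", "page"
FROM "wikipedia"
ORDER BY __time DESC
LIMIT 100

-- Page 2: OFFSET is applied first, skipping the first 100 rows; LIMIT then returns the next 100.
SELECT __time, "channel", "page"
FROM "wikipedia"
ORDER BY __time DESC
LIMIT 100 OFFSET 100
```

Per the notes above, the skipped rows are still generated and then discarded, and the pages may not align if the datasource is modified between fetches.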
diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/planner/Calcites.java b/sql/src/main/java/org/apache/druid/sql/calcite/planner/Calcites.java index 9ba5214446f..bd738917b88 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/planner/Calcites.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/planner/Calcites.java @@ -23,7 +23,6 @@ import com.google.common.base.Preconditions; import com.google.common.collect.Iterables; import com.google.common.io.BaseEncoding; import com.google.common.primitives.Chars; -import org.apache.calcite.rel.core.Sort; import org.apache.calcite.rel.type.RelDataType; import org.apache.calcite.rel.type.RelDataTypeFactory; import org.apache.calcite.rex.RexLiteral; @@ -370,39 +369,6 @@ public class Calcites return StringUtils.format("%s:%s", prefix, suffix); } - public static int getInt(RexNode rex, int defaultValue) - { - return rex == null ? defaultValue : RexLiteral.intValue(rex); - } - - public static int getOffset(Sort sort) - { - return Calcites.getInt(sort.offset, 0); - } - - public static int getFetch(Sort sort) - { - return Calcites.getInt(sort.fetch, -1); - } - - public static int collapseFetch(int innerFetch, int outerFetch, int outerOffset) - { - final int fetch; - if (innerFetch < 0 && outerFetch < 0) { - // Neither has a limit => no limit overall. - fetch = -1; - } else if (innerFetch < 0) { - // Outer limit only. - fetch = outerFetch; - } else if (outerFetch < 0) { - // Inner limit only. - fetch = Math.max(0, innerFetch - outerOffset); - } else { - fetch = Math.max(0, Math.min(innerFetch - outerOffset, outerFetch)); - } - return fetch; - } - public static Class<?> sqlTypeNameJdbcToJavaClass(SqlTypeName typeName) { // reference: https://docs.oracle.com/javase/1.5.0/docs/guide/jdbc/getstart/mapping.html diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/planner/DruidPlanner.java b/sql/src/main/java/org/apache/druid/sql/calcite/planner/DruidPlanner.java index 795bacaa6f0..3e156545544 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/planner/DruidPlanner.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/planner/DruidPlanner.java @@ -24,7 +24,6 @@ import com.google.common.base.Supplier; import com.google.common.base.Suppliers; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; -import com.google.common.primitives.Ints; import org.apache.calcite.DataContext; import org.apache.calcite.adapter.java.JavaTypeFactory; import org.apache.calcite.config.CalciteConnectionConfig; @@ -52,7 +51,6 @@ import org.apache.calcite.sql.SqlExplain; import org.apache.calcite.sql.SqlKind; import org.apache.calcite.sql.SqlNode; import org.apache.calcite.sql.parser.SqlParseException; -import org.apache.calcite.sql.type.BasicSqlType; import org.apache.calcite.sql.type.SqlTypeName; import org.apache.calcite.sql.validate.SqlValidator; import org.apache.calcite.sql.validate.SqlValidatorUtil; @@ -270,7 +268,10 @@ public class DruidPlanner implements Closeable return planExplanation(bindableRel, explain, ImmutableSet.of()); } else { final BindableRel theRel = bindableRel; - final DataContext dataContext = plannerContext.createDataContext((JavaTypeFactory) planner.getTypeFactory(), plannerContext.getParameters()); + final DataContext dataContext =
plannerContext.createDataContext( + (JavaTypeFactory) planner.getTypeFactory(), + plannerContext.getParameters() + ); final Supplier<Sequence<Object[]>> resultsSupplier = () -> { final Enumerable<Object[]> enumerable = theRel.bind(dataContext); final Enumerator<Object[]> enumerator = enumerable.enumerator(); @@ -317,12 +318,11 @@ * the web console, allowing it to apply a limit to queries without rewriting the original SQL. * * @param root root node + * + * @return root node wrapped with a limiting logical sort if a limit is specified in the query context. */ @Nullable - private RelNode possiblyWrapRootWithOuterLimitFromContext( - RelRoot root - ) + private RelNode possiblyWrapRootWithOuterLimitFromContext(RelRoot root) { Object outerLimitObj = plannerContext.getQueryContext().get(PlannerContext.CTX_SQL_OUTER_LIMIT); Long outerLimit = DimensionHandlerUtils.convertObjectToLong(outerLimitObj, true); @@ -331,42 +331,30 @@ public class DruidPlanner implements Closeable } if (root.rel instanceof Sort) { - Sort innerSort = (Sort) root.rel; - final int offset = Calcites.getOffset(innerSort); - final int innerLimit = Calcites.getFetch(innerSort); - final int fetch = Calcites.collapseFetch( - innerLimit, - Ints.checkedCast(outerLimit), - 0 - ); + Sort sort = (Sort) root.rel; - if (fetch == innerLimit) { + final OffsetLimit originalOffsetLimit = OffsetLimit.fromSort(sort); + final OffsetLimit newOffsetLimit = originalOffsetLimit.andThen(new OffsetLimit(0, outerLimit)); + + if (newOffsetLimit.equals(originalOffsetLimit)) { // nothing to do, don't bother to make a new sort return root.rel; } return LogicalSort.create( - innerSort.getInput(), - innerSort.collation, - offset > 0 ? makeBigIntLiteral(offset) : null, - makeBigIntLiteral(fetch) + sort.getInput(), + sort.collation, + newOffsetLimit.getOffsetAsRexNode(rexBuilder), + newOffsetLimit.getLimitAsRexNode(rexBuilder) + ); + } else { + return LogicalSort.create( + root.rel, + root.collation, + null, + new OffsetLimit(0, outerLimit).getLimitAsRexNode(rexBuilder) ); } - return LogicalSort.create( - root.rel, - root.collation, - null, - makeBigIntLiteral(outerLimit) - ); - } - - private RexNode makeBigIntLiteral(long value) - { - return rexBuilder.makeLiteral( - value, - new BasicSqlType(DruidTypeSystem.INSTANCE, SqlTypeName.BIGINT), - false - ); } private PlannerResult planExplanation( diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/planner/OffsetLimit.java b/sql/src/main/java/org/apache/druid/sql/calcite/planner/OffsetLimit.java new file mode 100644 index 00000000000..8d4a375d558 --- /dev/null +++ b/sql/src/main/java/org/apache/druid/sql/calcite/planner/OffsetLimit.java @@ -0,0 +1,165 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License.
+ */ + +package org.apache.druid.sql.calcite.planner; + +import com.google.common.base.Preconditions; +import org.apache.calcite.rel.core.Sort; +import org.apache.calcite.rex.RexBuilder; +import org.apache.calcite.rex.RexLiteral; +import org.apache.calcite.rex.RexNode; +import org.apache.calcite.sql.type.BasicSqlType; +import org.apache.calcite.sql.type.SqlTypeName; + +import javax.annotation.Nullable; +import java.util.Objects; + +/** + * Represents an offset and a limit. + */ +public class OffsetLimit +{ + private final long offset; + + @Nullable + private final Long limit; + + OffsetLimit(long offset, @Nullable Long limit) + { + Preconditions.checkArgument(offset >= 0, "offset >= 0"); + Preconditions.checkArgument(limit == null || limit >= 0, "limit == null || limit >= 0"); + + this.offset = offset; + this.limit = limit; + } + + public static OffsetLimit none() + { + return new OffsetLimit(0, null); + } + + public static OffsetLimit fromSort(Sort sort) + { + final long offset = sort.offset == null ? 0 : (long) RexLiteral.intValue(sort.offset); + final Long limit = sort.fetch == null ? null : (long) RexLiteral.intValue(sort.fetch); + return new OffsetLimit(offset, limit); + } + + public boolean hasOffset() + { + return offset > 0; + } + + public long getOffset() + { + return offset; + } + + public boolean hasLimit() + { + return limit != null; + } + + public long getLimit() + { + Preconditions.checkState(limit != null, "limit is not present"); + return limit; + } + + @Nullable + public RexNode getOffsetAsRexNode(final RexBuilder builder) + { + if (offset == 0) { + return null; + } else { + return builder.makeLiteral( + offset, + new BasicSqlType(DruidTypeSystem.INSTANCE, SqlTypeName.BIGINT), + false + ); + } + } + + @Nullable + public RexNode getLimitAsRexNode(final RexBuilder builder) + { + if (limit == null) { + return null; + } else { + return builder.makeLiteral( + limit, + new BasicSqlType(DruidTypeSystem.INSTANCE, SqlTypeName.BIGINT), + false + ); + } + } + + /** + * Return a new instance that represents applying this one first, then the {@code next} one. + */ + public OffsetLimit andThen(final OffsetLimit next) + { + final Long newLimit; + + if (limit == null && next.limit == null) { + // Neither has a limit => no limit overall. + newLimit = null; + } else if (limit == null) { + // Outer limit only. + newLimit = next.limit; + } else if (next.limit == null) { + // Inner limit only. + newLimit = Math.max(0, limit - next.offset); + } else { + // Both outer and inner limit.
+ newLimit = Math.max(0, Math.min(limit - next.offset, next.limit)); + } + + return new OffsetLimit(offset + next.offset, newLimit); + } + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + OffsetLimit that = (OffsetLimit) o; + return Objects.equals(offset, that.offset) && + Objects.equals(limit, that.limit); + } + + @Override + public int hashCode() + { + return Objects.hash(offset, limit); + } + + @Override + public String toString() + { + return "OffsetLimit{" + + "offset=" + offset + + ", limit=" + limit + + '}'; + } +}
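The composition arithmetic in `andThen` is the heart of this patch: it backs both `SortCollapseRule` and `DruidPlanner`'s outer-limit wrapping. A hedged worked example of the arithmetic above (not part of the patch; the constructor is package-private, so this would live in the same package, as `OffsetLimitTest` does):

```java
// Inner "LIMIT 2 OFFSET 1" followed by an outer "LIMIT 100":
// offset = 1 + 0 = 1, limit = max(0, min(2 - 0, 100)) = 2.
// The result equals the inner OffsetLimit, so the planner's outer-limit
// wrapping leaves the original sort untouched in this case.
OffsetLimit inner = new OffsetLimit(1, 2L);
OffsetLimit unchanged = inner.andThen(new OffsetLimit(0, 100L)); // == OffsetLimit(1, 2)

// Inner "LIMIT 2 OFFSET 5" followed by an outer "OFFSET 2" (no outer limit):
// offset = 5 + 2 = 7, limit = max(0, 2 - 2) = 0.
// Queries whose bounds collapse to a zero limit plan to an empty result
// (see the "reduced to zero" tests below) instead of reaching the Scan or
// Timeseries engines, which treat a zero limit as unlimited.
OffsetLimit empty = new OffsetLimit(5, 2L).andThen(new OffsetLimit(2, null)); // == OffsetLimit(7, 0)
```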
diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidQuery.java b/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidQuery.java index 45c29d1787a..d683cecc0d0 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidQuery.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidQuery.java @@ -37,7 +37,6 @@ import org.apache.calcite.rel.core.Sort; import org.apache.calcite.rel.type.RelDataType; import org.apache.calcite.rex.RexBuilder; import org.apache.calcite.rex.RexInputRef; -import org.apache.calcite.rex.RexLiteral; import org.apache.calcite.rex.RexNode; import org.apache.calcite.sql.SqlKind; import org.apache.calcite.sql.type.SqlTypeName; @@ -53,8 +52,6 @@ import org.apache.druid.query.dimension.DimensionSpec; import org.apache.druid.query.filter.DimFilter; import org.apache.druid.query.groupby.GroupByQuery; import org.apache.druid.query.groupby.having.DimFilterHavingSpec; -import org.apache.druid.query.groupby.orderby.DefaultLimitSpec; -import org.apache.druid.query.groupby.orderby.NoopLimitSpec; import org.apache.druid.query.groupby.orderby.OrderByColumnSpec; import org.apache.druid.query.ordering.StringComparator; import org.apache.druid.query.ordering.StringComparators; @@ -76,6 +73,7 @@ import org.apache.druid.sql.calcite.expression.DruidExpression; import org.apache.druid.sql.calcite.expression.Expressions; import org.apache.druid.sql.calcite.filtration.Filtration; import org.apache.druid.sql.calcite.planner.Calcites; +import org.apache.druid.sql.calcite.planner.OffsetLimit; import org.apache.druid.sql.calcite.planner.PlannerContext; import org.apache.druid.sql.calcite.rule.GroupByRules; import org.apache.druid.sql.calcite.table.RowSignatures; @@ -88,6 +86,7 @@ import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Set; /** @@ -525,16 +524,11 @@ public class DruidQuery final Sort sort = Preconditions.checkNotNull(partialQuery.getSort(), "sort"); final Project sortProject = partialQuery.getSortProject(); - // Extract limit. - final Long limit = sort.fetch != null ? ((Number) RexLiteral.value(sort.fetch)).longValue() : null; - final List<OrderByColumnSpec> orderBys = new ArrayList<>(sort.getChildExps().size()); - - if (sort.offset != null) { - // Druid cannot currently handle LIMIT with OFFSET. - throw new CannotBuildQueryException(sort); - } + // Extract limit and offset. + final OffsetLimit offsetLimit = OffsetLimit.fromSort(sort); // Extract orderBy column specs. + final List<OrderByColumnSpec> orderBys = new ArrayList<>(sort.getChildExps().size()); for (int sortKey = 0; sortKey < sort.getChildExps().size(); sortKey++) { final RexNode sortExpression = sort.getChildExps().get(sortKey); final RelFieldCollation collation = sort.getCollation().getFieldCollations().get(sortKey); @@ -583,7 +577,7 @@ public class DruidQuery projection = Projection.postAggregation(sortProject, plannerContext, rowSignature, "s"); } - return Sorting.create(orderBys, limit, projection); + return Sorting.create(orderBys, offsetLimit, projection); } /** @@ -762,9 +756,20 @@ public class DruidQuery Iterables.getOnlyElement(grouping.getDimensions()).toDimensionSpec().getOutputName() ); if (sorting != null) { - // If there is sorting, set timeseriesLimit to given value if less than Integer.Max_VALUE - if (sorting.isLimited()) { - timeseriesLimit = Ints.checkedCast(sorting.getLimit()); + if (sorting.getOffsetLimit().hasOffset()) { + // Timeseries cannot handle offsets. + return null; + } + + if (sorting.getOffsetLimit().hasLimit()) { + final long limit = sorting.getOffsetLimit().getLimit(); + + if (limit == 0) { + // Can't handle zero limit (the Timeseries query engine would treat it as unlimited). + return null; + } + + timeseriesLimit = Ints.checkedCast(limit); } switch (sorting.getSortKind(dimensionExpression.getOutputName())) { @@ -817,14 +822,18 @@ public class DruidQuery @Nullable public TopNQuery toTopNQuery() { - // Must have GROUP BY one column, no GROUPING SETS, ORDER BY ≤ 1 column, limit less than maxTopNLimit, no HAVING. + // Must have GROUP BY one column, no GROUPING SETS, ORDER BY ≤ 1 column, LIMIT > 0 and ≤ maxTopNLimit, + // no OFFSET, no HAVING. final boolean topNOk = grouping != null && grouping.getDimensions().size() == 1 && !grouping.getSubtotals().hasEffect(grouping.getDimensionSpecs()) && sorting != null && (sorting.getOrderBys().size() <= 1 - && sorting.isLimited() && sorting.getLimit() <= plannerContext.getPlannerConfig() - .getMaxTopNLimit()) + && sorting.getOffsetLimit().hasLimit() + && sorting.getOffsetLimit().getLimit() > 0 + && sorting.getOffsetLimit().getLimit() <= plannerContext.getPlannerConfig() + .getMaxTopNLimit() + && !sorting.getOffsetLimit().hasOffset()) && grouping.getHavingFilter() == null; if (!topNOk) { @@ -875,7 +884,7 @@ public class DruidQuery getVirtualColumns(true), dimensionSpec, topNMetricSpec, - Ints.checkedCast(sorting.getLimit()), + Ints.checkedCast(sorting.getOffsetLimit().getLimit()), filtration.getQuerySegmentSpec(), filtration.getDimFilter(), Granularities.ALL, @@ -897,6 +906,11 @@ public class DruidQuery return null; } + if (sorting != null && sorting.getOffsetLimit().hasLimit() && sorting.getOffsetLimit().getLimit() <= 0) { + // Cannot handle zero or negative limits. + return null; + } + final Filtration filtration = Filtration.create(filter).optimize(virtualColumnRegistry.getFullRowSignature()); final DimFilterHavingSpec havingSpec; @@ -925,13 +939,7 @@ public class DruidQuery grouping.getAggregatorFactories(), postAggregators, havingSpec, - sorting != null - ? new DefaultLimitSpec( - sorting.getOrderBys(), - 0, - sorting.isLimited() ?
Ints.checkedCast(sorting.getLimit()) : null - ) - : NoopLimitSpec.instance(), + Optional.ofNullable(sorting).orElse(Sorting.none()).limitSpec(), grouping.getSubtotals().toSubtotalsSpec(grouping.getDimensionSpecs()), ImmutableSortedMap.copyOf(plannerContext.getQueryContext()) ); @@ -957,11 +965,21 @@ public class DruidQuery final Filtration filtration = Filtration.create(filter).optimize(virtualColumnRegistry.getFullRowSignature()); final ScanQuery.Order order; + long scanOffset = 0L; long scanLimit = 0L; if (sorting != null) { - if (sorting.isLimited()) { - scanLimit = sorting.getLimit(); + scanOffset = sorting.getOffsetLimit().getOffset(); + + if (sorting.getOffsetLimit().hasLimit()) { + final long limit = sorting.getOffsetLimit().getLimit(); + + if (limit == 0) { + // Can't handle zero limit (the Scan query engine would treat it as unlimited). + return null; + } + + scanLimit = limit; } final Sorting.SortKind sortKind = sorting.getSortKind(ColumnHolder.TIME_COLUMN_NAME); @@ -995,7 +1013,7 @@ public class DruidQuery getVirtualColumns(true), ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST, 0, - 0, + scanOffset, scanLimit, order, filtration.getDimFilter(), diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/rel/Sorting.java b/sql/src/main/java/org/apache/druid/sql/calcite/rel/Sorting.java index 4b939e66136..0f18102be2f 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/rel/Sorting.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/rel/Sorting.java @@ -21,9 +21,16 @@ package org.apache.druid.sql.calcite.rel; import com.google.common.base.Preconditions; import com.google.common.collect.Iterables; +import com.google.common.primitives.Ints; +import org.apache.druid.java.util.common.ISE; +import org.apache.druid.query.groupby.orderby.DefaultLimitSpec; +import org.apache.druid.query.groupby.orderby.LimitSpec; +import org.apache.druid.query.groupby.orderby.NoopLimitSpec; import org.apache.druid.query.groupby.orderby.OrderByColumnSpec; +import org.apache.druid.sql.calcite.planner.OffsetLimit; import javax.annotation.Nullable; +import java.util.Collections; import java.util.List; import java.util.Objects; @@ -50,27 +57,31 @@ public class Sorting @Nullable private final Projection projection; - @Nullable - private final Long limit; + private final OffsetLimit offsetLimit; private Sorting( final List<OrderByColumnSpec> orderBys, - @Nullable final Long limit, + final OffsetLimit offsetLimit, @Nullable final Projection projection ) { this.orderBys = Preconditions.checkNotNull(orderBys, "orderBys"); - this.limit = limit; + this.offsetLimit = offsetLimit; this.projection = projection; } public static Sorting create( final List<OrderByColumnSpec> orderBys, - @Nullable final Long limit, + final OffsetLimit offsetLimit, @Nullable final Projection projection ) { - return new Sorting(orderBys, limit, projection); + return new Sorting(orderBys, offsetLimit, projection); + } + + public static Sorting none() + { + return new Sorting(Collections.emptyList(), OffsetLimit.none(), null); } public SortKind getSortKind(final String timeColumn) @@ -102,15 +113,30 @@ public class Sorting return projection; } - public boolean isLimited() + public OffsetLimit getOffsetLimit() { - return limit != null; + return offsetLimit; } - @Nullable - public Long getLimit() + /** + * Returns a LimitSpec that encapsulates the orderBys, offset, and limit of this Sorting instance. Does not + * encapsulate the projection at all; you must still call {@link #getProjection()} for that.
+ */ + public LimitSpec limitSpec() { - return limit; + if (orderBys.isEmpty() && !offsetLimit.hasOffset() && !offsetLimit.hasLimit()) { + return NoopLimitSpec.instance(); + } else { + final Integer offsetAsInteger = offsetLimit.hasOffset() ? Ints.checkedCast(offsetLimit.getOffset()) : null; + final Integer limitAsInteger = offsetLimit.hasLimit() ? Ints.checkedCast(offsetLimit.getLimit()) : null; + + if (limitAsInteger != null && limitAsInteger == 0) { + // Zero limit would be rejected by DefaultLimitSpec. + throw new ISE("Cannot create LimitSpec with zero limit"); + } + + return new DefaultLimitSpec(orderBys, offsetAsInteger, limitAsInteger); + } } @Override @@ -125,13 +151,13 @@ public class Sorting Sorting sorting = (Sorting) o; return Objects.equals(orderBys, sorting.orderBys) && Objects.equals(projection, sorting.projection) && - Objects.equals(limit, sorting.limit); + Objects.equals(offsetLimit, sorting.offsetLimit); } @Override public int hashCode() { - return Objects.hash(orderBys, projection, limit); + return Objects.hash(orderBys, projection, offsetLimit); } @Override @@ -140,7 +166,7 @@ public class Sorting return "Sorting{" + "orderBys=" + orderBys + ", projection=" + projection + - ", limit=" + limit + + ", offsetLimit=" + offsetLimit + '}'; } } diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/rule/SortCollapseRule.java b/sql/src/main/java/org/apache/druid/sql/calcite/rule/SortCollapseRule.java index bad8bdef7ed..ad621c5b467 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/rule/SortCollapseRule.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/rule/SortCollapseRule.java @@ -22,7 +22,7 @@ package org.apache.druid.sql.calcite.rule; import org.apache.calcite.plan.RelOptRule; import org.apache.calcite.plan.RelOptRuleCall; import org.apache.calcite.rel.core.Sort; -import org.apache.druid.sql.calcite.planner.Calcites; +import org.apache.druid.sql.calcite.planner.OffsetLimit; /** * Collapses two adjacent Sort operations together. Useful for queries like @@ -50,21 +50,13 @@ public class SortCollapseRule extends RelOptRule if (outerSort.collation.getFieldCollations().isEmpty() || outerSort.collation.getFieldCollations().equals(innerSort.collation.getFieldCollations())) { - final int innerOffset = Calcites.getOffset(innerSort); - final int innerFetch = Calcites.getFetch(innerSort); - final int outerOffset = Calcites.getOffset(outerSort); - final int outerFetch = Calcites.getFetch(outerSort); - - // Add up the offsets. - final int offset = innerOffset + outerOffset; - final int fetch = Calcites.collapseFetch(innerFetch, outerFetch, outerOffset); - + final OffsetLimit offsetLimit = OffsetLimit.fromSort(innerSort).andThen(OffsetLimit.fromSort(outerSort)); final Sort combined = innerSort.copy( innerSort.getTraitSet(), innerSort.getInput(), innerSort.getCollation(), - offset == 0 ? null : call.builder().literal(offset), - fetch < 0 ? 
null : call.builder().literal(fetch) + offsetLimit.getOffsetAsRexNode(call.builder().getRexBuilder()), + offsetLimit.getLimitAsRexNode(call.builder().getRexBuilder()) ); call.transformTo(combined); diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java index 47d7777674a..05ea116bfed 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java @@ -1090,6 +1090,32 @@ public class CalciteQueryTest extends BaseCalciteQueryTest ); } + @Test + public void testSelectStarWithLimitAndOffset() throws Exception + { + testQuery( + PLANNER_CONFIG_DEFAULT_NO_COMPLEX_SERDE, + QUERY_CONTEXT_DEFAULT, + "SELECT * FROM druid.foo LIMIT 2 OFFSET 1", + CalciteTests.REGULAR_USER_AUTH_RESULT, + ImmutableList.of( + newScanQueryBuilder() + .dataSource(CalciteTests.DATASOURCE1) + .intervals(querySegmentSpec(Filtration.eternity())) + .columns("__time", "cnt", "dim1", "dim2", "dim3", "m1", "m2", "unique_dim1") + .offset(1) + .limit(2) + .resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST) + .context(QUERY_CONTEXT_DEFAULT) + .build() + ), + ImmutableList.of( + new Object[]{timestamp("2000-01-02"), 1L, "10.1", NULL_STRING, "[\"b\",\"c\"]", 2.0f, 2.0, HLLC_STRING}, + new Object[]{timestamp("2000-01-03"), 1L, "2", "", "d", 3f, 3.0, HLLC_STRING} + ) + ); + } + @Test public void testSelectWithProjection() throws Exception { @@ -1290,6 +1316,80 @@ public class CalciteQueryTest extends BaseCalciteQueryTest ); } + @Test + public void testSelectLimitWrappingOnTopOfOffset() throws Exception + { + testQuery( + "SELECT dim1 FROM druid.foo ORDER BY __time DESC OFFSET 1", + OUTER_LIMIT_CONTEXT, + ImmutableList.of( + newScanQueryBuilder() + .dataSource(CalciteTests.DATASOURCE1) + .intervals(querySegmentSpec(Filtration.eternity())) + .columns(ImmutableList.of("__time", "dim1")) + .offset(1) + .limit(2) + .order(ScanQuery.Order.DESCENDING) + .resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST) + .context(OUTER_LIMIT_CONTEXT) + .build() + ), + ImmutableList.of( + new Object[]{"def"}, + new Object[]{"1"} + ) + ); + } + + @Test + public void testSelectLimitWrappingOnTopOfOffsetAndLowLimit() throws Exception + { + testQuery( + "SELECT dim1 FROM druid.foo ORDER BY __time DESC LIMIT 1 OFFSET 1", + OUTER_LIMIT_CONTEXT, + ImmutableList.of( + newScanQueryBuilder() + .dataSource(CalciteTests.DATASOURCE1) + .intervals(querySegmentSpec(Filtration.eternity())) + .columns(ImmutableList.of("__time", "dim1")) + .offset(1) + .limit(1) + .order(ScanQuery.Order.DESCENDING) + .resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST) + .context(OUTER_LIMIT_CONTEXT) + .build() + ), + ImmutableList.of( + new Object[]{"def"} + ) + ); + } + + @Test + public void testSelectLimitWrappingOnTopOfOffsetAndHighLimit() throws Exception + { + testQuery( + "SELECT dim1 FROM druid.foo ORDER BY __time DESC LIMIT 10 OFFSET 1", + OUTER_LIMIT_CONTEXT, + ImmutableList.of( + newScanQueryBuilder() + .dataSource(CalciteTests.DATASOURCE1) + .intervals(querySegmentSpec(Filtration.eternity())) + .columns(ImmutableList.of("__time", "dim1")) + .offset(1) + .limit(2) + .order(ScanQuery.Order.DESCENDING) + .resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST) + .context(OUTER_LIMIT_CONTEXT) + .build() + ), + ImmutableList.of( + new Object[]{"def"}, + new Object[]{"1"} + ) + ); + } + @Test public void 
testSelectLimitWrappingAgainAkaIDontReallyQuiteUnderstandCalciteQueryPlanning() throws Exception { @@ -1527,11 +1627,11 @@ public class CalciteQueryTest extends BaseCalciteQueryTest new DefaultDimensionSpec("dim2", "d1", ValueType.STRING) ) .setLimitSpec( - new DefaultLimitSpec( - ImmutableList.of( - new OrderByColumnSpec("d0", Direction.DESCENDING, StringComparators.LEXICOGRAPHIC)), - 2 - ) + DefaultLimitSpec + .builder() + .orderBy(new OrderByColumnSpec("d0", Direction.DESCENDING, StringComparators.LEXICOGRAPHIC)) + .limit(2) + .build() ) .setAggregatorSpecs(aggregators(new CountAggregatorFactory("a0"))) .setContext(OUTER_LIMIT_CONTEXT) @@ -1589,12 +1689,11 @@ public class CalciteQueryTest extends BaseCalciteQueryTest new DefaultDimensionSpec("dim2", "d1", ValueType.STRING) ) .setLimitSpec( - new DefaultLimitSpec( - ImmutableList.of( - new OrderByColumnSpec("a0", Direction.DESCENDING, StringComparators.NUMERIC) - ), - 2 - ) + DefaultLimitSpec + .builder() + .orderBy(new OrderByColumnSpec("a0", Direction.DESCENDING, StringComparators.NUMERIC)) + .limit(2) + .build() ) .setAggregatorSpecs(aggregators(new CountAggregatorFactory("a0"))) .setContext(OUTER_LIMIT_CONTEXT) @@ -1701,16 +1800,16 @@ public class CalciteQueryTest extends BaseCalciteQueryTest .setDimensions(dimensions(new DefaultDimensionSpec("dim1", "d0"))) .setGranularity(Granularities.ALL) .setLimitSpec( - new DefaultLimitSpec( - ImmutableList.of( + DefaultLimitSpec + .builder() + .orderBy( new OrderByColumnSpec( "d0", OrderByColumnSpec.Direction.DESCENDING, StringComparators.LEXICOGRAPHIC ) - ), - Integer.MAX_VALUE - ) + ) + .build() ) .setContext(QUERY_CONTEXT_DEFAULT) .build() @@ -2868,16 +2967,16 @@ public class CalciteQueryTest extends BaseCalciteQueryTest .setDimensions(dimensions(new DefaultDimensionSpec("cnt", "d0", ValueType.LONG))) .setAggregatorSpecs(aggregators(new CountAggregatorFactory("a0"))) .setLimitSpec( - new DefaultLimitSpec( - ImmutableList.of( + DefaultLimitSpec + .builder() + .orderBy( new OrderByColumnSpec( "d0", OrderByColumnSpec.Direction.ASCENDING, StringComparators.NUMERIC ) - ), - Integer.MAX_VALUE - ) + ) + .build() ) .setContext(QUERY_CONTEXT_DEFAULT) .build() @@ -2926,16 +3025,16 @@ public class CalciteQueryTest extends BaseCalciteQueryTest .setDimensions(dimensions(new DefaultDimensionSpec("cnt", "d0", ValueType.LONG))) .setAggregatorSpecs(aggregators(new CountAggregatorFactory("a0"))) .setLimitSpec( - new DefaultLimitSpec( - ImmutableList.of( + DefaultLimitSpec + .builder() + .orderBy( new OrderByColumnSpec( "d0", OrderByColumnSpec.Direction.ASCENDING, StringComparators.NUMERIC ) - ), - Integer.MAX_VALUE - ) + ) + .build() ) .setContext(QUERY_CONTEXT_DEFAULT) .build() @@ -3386,21 +3485,23 @@ public class CalciteQueryTest extends BaseCalciteQueryTest expressionPostAgg("p0", "substring(\"d0\", 1, -1)"), expressionPostAgg("p1", "strlen(\"d0\")") )) - .setLimitSpec(new DefaultLimitSpec( - ImmutableList.of( - new OrderByColumnSpec( - "p1", - OrderByColumnSpec.Direction.DESCENDING, - StringComparators.NUMERIC - ), - new OrderByColumnSpec( - "d0", - OrderByColumnSpec.Direction.ASCENDING, - StringComparators.LEXICOGRAPHIC + .setLimitSpec( + DefaultLimitSpec + .builder() + .orderBy( + new OrderByColumnSpec( + "p1", + OrderByColumnSpec.Direction.DESCENDING, + StringComparators.NUMERIC + ), + new OrderByColumnSpec( + "d0", + OrderByColumnSpec.Direction.ASCENDING, + StringComparators.LEXICOGRAPHIC + ) ) - ), - Integer.MAX_VALUE - )) + .build() + ) .setContext(QUERY_CONTEXT_DEFAULT) .build() ), @@ -4194,9 
+4295,6 @@ public class CalciteQueryTest extends BaseCalciteQueryTest // SELECT query with order by non-__time. "SELECT dim1 FROM druid.foo ORDER BY dim1", - // DISTINCT with OFFSET. - "SELECT DISTINCT dim2 FROM druid.foo ORDER BY dim2 LIMIT 2 OFFSET 5", - // JOIN condition with not-equals (<>). "SELECT foo.dim1, foo.dim2, l.k, l.v\n" + "FROM foo INNER JOIN lookup.lookyloo l ON foo.dim2 <> l.k", @@ -5049,16 +5147,17 @@ public class CalciteQueryTest extends BaseCalciteQueryTest ) .setPostAggregatorSpecs(ImmutableList.of(expressionPostAgg("p0", "(\"a0\" + \"a1\")"))) .setLimitSpec( - new DefaultLimitSpec( - ImmutableList.of( + DefaultLimitSpec + .builder() + .orderBy( new OrderByColumnSpec( "p0", OrderByColumnSpec.Direction.ASCENDING, StringComparators.NUMERIC ) - ), - 3 - ) + ) + .limit(3) + .build() ) .setContext(QUERY_CONTEXT_DEFAULT) .build() @@ -5097,16 +5196,17 @@ public class CalciteQueryTest extends BaseCalciteQueryTest ) ) .setLimitSpec( - new DefaultLimitSpec( - ImmutableList.of( + DefaultLimitSpec + .builder() + .orderBy( new OrderByColumnSpec( "p0", OrderByColumnSpec.Direction.ASCENDING, StringComparators.NUMERIC ) - ), - 3 - ) + ) + .limit(3) + .build() ) .setContext(QUERY_CONTEXT_NO_TOPN) .build() @@ -5360,16 +5460,16 @@ public class CalciteQueryTest extends BaseCalciteQueryTest .setDimensions(dimensions(new DefaultDimensionSpec("v0", "d0", ValueType.FLOAT))) .setAggregatorSpecs(aggregators(new CountAggregatorFactory("a0"))) .setLimitSpec( - new DefaultLimitSpec( - ImmutableList.of( + DefaultLimitSpec + .builder() + .orderBy( new OrderByColumnSpec( "d0", OrderByColumnSpec.Direction.DESCENDING, StringComparators.NUMERIC ) - ), - Integer.MAX_VALUE - ) + ) + .build() ) .setContext(QUERY_CONTEXT_DEFAULT) .build() @@ -6403,18 +6503,6 @@ public class CalciteQueryTest extends BaseCalciteQueryTest @Test public void testSelectDistinctWithSortAsOuterQuery3() throws Exception - { - // Query reduces to LIMIT 0. - - testQuery( - "SELECT * FROM (SELECT DISTINCT dim2 FROM druid.foo ORDER BY dim2 LIMIT 2 OFFSET 5) OFFSET 2", - ImmutableList.of(), - ImmutableList.of() - ); - } - - @Test - public void testSelectDistinctWithSortAsOuterQuery4() throws Exception { testQuery( "SELECT * FROM (SELECT DISTINCT dim2 FROM druid.foo ORDER BY dim2 DESC LIMIT 5) LIMIT 10", @@ -6444,6 +6532,42 @@ public class CalciteQueryTest extends BaseCalciteQueryTest ); } + @Test + public void testSelectNonAggregatingWithLimitLiterallyZero() throws Exception + { + // Query reduces to LIMIT 0. + + testQuery( + "SELECT dim2 FROM druid.foo ORDER BY dim2 LIMIT 0", + ImmutableList.of(), + ImmutableList.of() + ); + } + + @Test + public void testSelectNonAggregatingWithLimitReducedToZero() throws Exception + { + // Query reduces to LIMIT 0. + + testQuery( + "SELECT * FROM (SELECT dim2 FROM druid.foo ORDER BY dim2 LIMIT 2 OFFSET 5) OFFSET 2", + ImmutableList.of(), + ImmutableList.of() + ); + } + + @Test + public void testSelectAggregatingWithLimitReducedToZero() throws Exception + { + // Query reduces to LIMIT 0. + + testQuery( + "SELECT * FROM (SELECT DISTINCT dim2 FROM druid.foo ORDER BY dim2 LIMIT 2 OFFSET 5) OFFSET 2", + ImmutableList.of(), + ImmutableList.of() + ); + } + @Test public void testCountDistinct() throws Exception { @@ -11293,6 +11417,47 @@ public class CalciteQueryTest extends BaseCalciteQueryTest ); } + @Test + public void testTimeseriesWithLimitAndOffset() throws Exception + { + // Cannot vectorize due to expressions. 
+ cannotVectorize(); + + // Timeseries cannot handle offsets, so the query morphs into a groupBy. + + testQuery( + "SELECT gran, SUM(cnt)\n" + + "FROM (\n" + + " SELECT floor(__time TO month) AS gran, cnt\n" + + " FROM druid.foo\n" + + ") AS x\n" + + "GROUP BY gran\n" + + "LIMIT 2\n" + + "OFFSET 1", + ImmutableList.of( + GroupByQuery.builder() + .setDataSource(CalciteTests.DATASOURCE1) + .setInterval(querySegmentSpec(Filtration.eternity())) + .setGranularity(Granularities.ALL) + .setVirtualColumns( + expressionVirtualColumn( + "v0", + "timestamp_floor(\"__time\",'P1M',null,'UTC')", + ValueType.LONG + ) + ) + .setDimensions(dimensions(new DefaultDimensionSpec("v0", "d0", ValueType.LONG))) + .setAggregatorSpecs(aggregators(new LongSumAggregatorFactory("a0", "cnt"))) + .setLimitSpec(DefaultLimitSpec.builder().offset(1).limit(2).build()) + .setContext(QUERY_CONTEXT_DEFAULT) + .build() + ), + ImmutableList.of( + new Object[]{timestamp("2001-01-01"), 3L} + ) + ); + } + @Test public void testTimeseriesWithOrderByAndLimit() throws Exception { diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/planner/OffsetLimitTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/planner/OffsetLimitTest.java new file mode 100644 index 00000000000..c54e46a14dd --- /dev/null +++ b/sql/src/test/java/org/apache/druid/sql/calcite/planner/OffsetLimitTest.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.sql.calcite.planner; + +import com.google.common.collect.ImmutableList; +import org.apache.druid.java.util.common.StringUtils; +import org.junit.Assert; +import org.junit.Test; + +import java.util.List; +import java.util.stream.Collectors; + +public class OffsetLimitTest +{ + @Test + public void testAndThen() + { + final List<String> things = ImmutableList.of("a", "b", "c", "d", "e", "f", "g", "h"); + + for (int innerOffset = 0; innerOffset < 5; innerOffset++) { + for (int innerLimit = -1; innerLimit < 5; innerLimit++) { + for (int outerOffset = 0; outerOffset < 5; outerOffset++) { + for (int outerLimit = -1; outerLimit < 5; outerLimit++) { + final OffsetLimit inner = new OffsetLimit(innerOffset, innerLimit < 0 ? null : (long) innerLimit); + final OffsetLimit outer = new OffsetLimit(outerOffset, outerLimit < 0 ? null : (long) outerLimit); + final OffsetLimit combined = inner.andThen(outer); + + Assert.assertEquals( + StringUtils.format( + "innerOffset[%s], innerLimit[%s], outerOffset[%s], outerLimit[%s]", + innerOffset, + innerLimit, + outerOffset, + outerLimit + ), + things.stream() + .skip(innerOffset) + .limit(innerLimit < 0 ? Long.MAX_VALUE : innerLimit) + .skip(outerOffset) + .limit(outerLimit < 0 ?
Long.MAX_VALUE : outerLimit) + .collect(Collectors.toList()), + things.stream() + .skip(combined.getOffset()) + .limit(combined.hasLimit() ? combined.getLimit() : Long.MAX_VALUE) + .collect(Collectors.toList()) + ); + } + } + } + } + } +}
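A closing note on `possiblyWrapRootWithOuterLimitFromContext`: the web console's outer limit now composes with any existing sort through `OffsetLimit.andThen` instead of being special-cased. A hedged sketch of the behavior, assuming `PlannerContext.CTX_SQL_OUTER_LIMIT` is the literal `"sqlOuterLimit"` context key (the constant's value is not shown in this patch):

```java
import com.google.common.collect.ImmutableMap;
import java.util.Map;

// Hypothetical: the context a client might send, as the web console does.
// Assumes PlannerContext.CTX_SQL_OUTER_LIMIT == "sqlOuterLimit".
Map<String, Object> context = ImmutableMap.of("sqlOuterLimit", 2L);

// With "SELECT dim1 FROM druid.foo ORDER BY __time DESC LIMIT 10 OFFSET 1",
// the root sort carries OffsetLimit(1, 10); the planner computes
// OffsetLimit(1, 10).andThen(new OffsetLimit(0, 2L)) == OffsetLimit(1, 2),
// which matches the offset(1)/limit(2) native Scan query expected by
// testSelectLimitWrappingOnTopOfOffsetAndHighLimit above.
```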
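And one on `Sorting.limitSpec()`: both bounds now funnel into a single `DefaultLimitSpec`, via the builder style the updated tests use. A hedged sketch of the mapping (the column name and comparator are illustrative):

```java
// ORDER BY d0, OFFSET 1, LIMIT 2 on a GroupBy becomes one DefaultLimitSpec:
LimitSpec spec = DefaultLimitSpec
    .builder()
    .orderBy(new OrderByColumnSpec("d0", OrderByColumnSpec.Direction.ASCENDING, StringComparators.NUMERIC))
    .offset(1)
    .limit(2)
    .build();

// With no ordering, no offset, and no limit, limitSpec() returns NoopLimitSpec.instance();
// a limit of exactly zero throws ISE, since DefaultLimitSpec would reject it and the
// planner has already turned such queries into empty results.
```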