Add SQL "OFFSET" clause. (#10279)

* Add SQL "OFFSET" clause.

Under the hood, this uses the new offset features from #10233 (Scan)
and #10235 (GroupBy). Since Timeseries and TopN queries do not currently
have an offset feature, SQL planning will switch from one of those to
Scan or GroupBy if users add an OFFSET.

Includes a refactoring that harmonizes offset and limit planning using an
OffsetLimit wrapper class. This is useful because its "andThen" method
ensures that the various places that collapse offsets and limits all
behave the same way.

* Fix test and add another test.
Gian Merlino 2020-08-21 14:11:54 -07:00 committed by GitHub
parent b5b3e6ecce
commit 0910d22f48
9 changed files with 610 additions and 201 deletions

View File

@ -54,6 +54,7 @@ FROM { <table> | (<subquery>) | <o1> [ INNER | LEFT ] JOIN <o2> ON condition }
[ HAVING expr ]
[ ORDER BY expr [ ASC | DESC ], expr [ ASC | DESC ], ... ]
[ LIMIT limit ]
[ OFFSET offset ]
[ UNION ALL <another query> ]
```
@ -118,11 +119,29 @@ can only order by the `__time` column. For aggregation queries, ORDER BY can ord
### LIMIT
The LIMIT clause can be used to limit the number of rows returned. It can be used with any query type. It is pushed down
to Data processes for queries that run with the native TopN query type, but not the native GroupBy query type. Future
versions of Druid will support pushing down limits using the native GroupBy query type as well. If you notice that
adding a limit doesn't change performance very much, then it's likely that Druid didn't push down the limit for your
query.
The LIMIT clause limits the number of rows returned. In some situations Druid will push down this limit to data servers,
which boosts performance. Limits are always pushed down for queries that run with the native Scan or TopN query types.
With the native GroupBy query type, it is pushed down when ordering on a column that you are grouping by. If you notice
that adding a limit doesn't change performance very much, then it's possible that Druid wasn't able to push down the
limit for your query.
### OFFSET
The OFFSET clause skips a certain number of rows when returning results.
If both LIMIT and OFFSET are provided, then OFFSET will be applied first, followed by LIMIT. For example, using
LIMIT 100 OFFSET 10 will return 100 rows, starting from row number 10.
Together, LIMIT and OFFSET can be used to implement pagination. However, note that if the underlying datasource is
modified between page fetches, then the different pages will not necessarily align with each other.
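
For intuition, the skip-then-limit ordering matches Java stream semantics; the following is an illustrative
sketch only, not part of Druid itself (the commit's own OffsetLimitTest models offset and limit the same way):

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class OffsetThenLimitExample
{
  public static void main(String[] args)
  {
    final List<Integer> rows = Arrays.asList(0, 1, 2, 3, 4, 5);

    // OFFSET is applied first, then LIMIT: skip 2 rows, then keep at most 2.
    final List<Integer> page = rows.stream()
                                   .skip(2)   // OFFSET 2
                                   .limit(2)  // LIMIT 2
                                   .collect(Collectors.toList());

    System.out.println(page); // prints [2, 3]
  }
}
```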
There are two important factors that can affect the performance of queries that use OFFSET:
- Skipped rows still need to be generated internally and then discarded, meaning that raising offsets to high values
can cause queries to use additional resources.
- OFFSET is only supported by the Scan and GroupBy [native query types](#query-types). Therefore, a query with OFFSET
will use one of those two types, even if it might otherwise have run as a Timeseries or TopN. Switching query engines
in this way can affect performance.
### UNION ALL
@ -715,14 +734,15 @@ approximate, regardless of configuration.
Druid does not support all SQL features. In particular, the following features are not supported.
- JOIN between native datasources (table, lookup, subquery) and system tables.
- JOIN between native datasources (table, lookup, subquery) and [system tables](#metadata-tables).
- JOIN conditions that are not an equality between expressions from the left- and right-hand sides.
- JOIN conditions containing a constant value inside the condition.
- JOIN conditions on a column which contains a multi-value dimension.
- OVER clauses, and analytic functions such as `LAG` and `LEAD`.
- OFFSET clauses.
- ORDER BY for a non-aggregating query, except for `ORDER BY __time` or `ORDER BY __time DESC`, which are supported.
This restriction only applies to non-aggregating queries; you can ORDER BY any column in an aggregating query.
- DDL and DML.
- Using Druid-specific functions like `TIME_PARSE` and `APPROX_QUANTILE_DS` on [metadata tables](#metadata-tables).
- Using Druid-specific functions like `TIME_PARSE` and `APPROX_QUANTILE_DS` on [system tables](#metadata-tables).
Additionally, some Druid native query features are not supported by the SQL language. Some unsupported Druid features
include:

View File

@ -23,7 +23,6 @@ import com.google.common.base.Preconditions;
import com.google.common.collect.Iterables;
import com.google.common.io.BaseEncoding;
import com.google.common.primitives.Chars;
import org.apache.calcite.rel.core.Sort;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeFactory;
import org.apache.calcite.rex.RexLiteral;
@ -370,39 +369,6 @@ public class Calcites
return StringUtils.format("%s:%s", prefix, suffix);
}
public static int getInt(RexNode rex, int defaultValue)
{
return rex == null ? defaultValue : RexLiteral.intValue(rex);
}
public static int getOffset(Sort sort)
{
return Calcites.getInt(sort.offset, 0);
}
public static int getFetch(Sort sort)
{
return Calcites.getInt(sort.fetch, -1);
}
public static int collapseFetch(int innerFetch, int outerFetch, int outerOffset)
{
final int fetch;
if (innerFetch < 0 && outerFetch < 0) {
// Neither has a limit => no limit overall.
fetch = -1;
} else if (innerFetch < 0) {
// Outer limit only.
fetch = outerFetch;
} else if (outerFetch < 0) {
// Inner limit only.
fetch = Math.max(0, innerFetch - outerOffset);
} else {
fetch = Math.max(0, Math.min(innerFetch - outerOffset, outerFetch));
}
return fetch;
}
public static Class<?> sqlTypeNameJdbcToJavaClass(SqlTypeName typeName)
{
// reference: https://docs.oracle.com/javase/1.5.0/docs/guide/jdbc/getstart/mapping.html

View File

@ -24,7 +24,6 @@ import com.google.common.base.Supplier;
import com.google.common.base.Suppliers;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.primitives.Ints;
import org.apache.calcite.DataContext;
import org.apache.calcite.adapter.java.JavaTypeFactory;
import org.apache.calcite.config.CalciteConnectionConfig;
@ -52,7 +51,6 @@ import org.apache.calcite.sql.SqlExplain;
import org.apache.calcite.sql.SqlKind;
import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.sql.parser.SqlParseException;
import org.apache.calcite.sql.type.BasicSqlType;
import org.apache.calcite.sql.type.SqlTypeName;
import org.apache.calcite.sql.validate.SqlValidator;
import org.apache.calcite.sql.validate.SqlValidatorUtil;
@ -270,7 +268,10 @@ public class DruidPlanner implements Closeable
return planExplanation(bindableRel, explain, ImmutableSet.of());
} else {
final BindableRel theRel = bindableRel;
final DataContext dataContext = plannerContext.createDataContext((JavaTypeFactory) planner.getTypeFactory(), plannerContext.getParameters());
final DataContext dataContext = plannerContext.createDataContext(
(JavaTypeFactory) planner.getTypeFactory(),
plannerContext.getParameters()
);
final Supplier<Sequence<Object[]>> resultsSupplier = () -> {
final Enumerable enumerable = theRel.bind(dataContext);
final Enumerator enumerator = enumerable.enumerator();
@ -317,12 +318,11 @@ public class DruidPlanner implements Closeable
* the web console, allowing it to apply a limit to queries without rewriting the original SQL.
*
* @param root root node
*
* @return root node wrapped with a limiting logical sort if a limit is specified in the query context.
*/
@Nullable
private RelNode possiblyWrapRootWithOuterLimitFromContext(
RelRoot root
)
private RelNode possiblyWrapRootWithOuterLimitFromContext(RelRoot root)
{
Object outerLimitObj = plannerContext.getQueryContext().get(PlannerContext.CTX_SQL_OUTER_LIMIT);
Long outerLimit = DimensionHandlerUtils.convertObjectToLong(outerLimitObj, true);
@ -331,42 +331,30 @@ public class DruidPlanner implements Closeable
}
if (root.rel instanceof Sort) {
Sort innerSort = (Sort) root.rel;
final int offset = Calcites.getOffset(innerSort);
final int innerLimit = Calcites.getFetch(innerSort);
final int fetch = Calcites.collapseFetch(
innerLimit,
Ints.checkedCast(outerLimit),
0
);
Sort sort = (Sort) root.rel;
if (fetch == innerLimit) {
final OffsetLimit originalOffsetLimit = OffsetLimit.fromSort(sort);
final OffsetLimit newOffsetLimit = originalOffsetLimit.andThen(new OffsetLimit(0, outerLimit));
if (newOffsetLimit.equals(originalOffsetLimit)) {
// nothing to do, don't bother to make a new sort
return root.rel;
}
return LogicalSort.create(
innerSort.getInput(),
innerSort.collation,
offset > 0 ? makeBigIntLiteral(offset) : null,
makeBigIntLiteral(fetch)
sort.getInput(),
sort.collation,
newOffsetLimit.getOffsetAsRexNode(rexBuilder),
newOffsetLimit.getLimitAsRexNode(rexBuilder)
);
} else {
return LogicalSort.create(
root.rel,
root.collation,
null,
new OffsetLimit(0, outerLimit).getLimitAsRexNode(rexBuilder)
);
}
return LogicalSort.create(
root.rel,
root.collation,
null,
makeBigIntLiteral(outerLimit)
);
}
private RexNode makeBigIntLiteral(long value)
{
return rexBuilder.makeLiteral(
value,
new BasicSqlType(DruidTypeSystem.INSTANCE, SqlTypeName.BIGINT),
false
);
}
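
For a concrete sense of what this collapse computes, here is a worked sketch with hypothetical values
(it assumes code in the same package, since the OffsetLimit constructor is package-private):

```java
// In package org.apache.druid.sql.calcite.planner.
// The query already ends in LIMIT 1 OFFSET 1; the sqlOuterLimit context value is 2.
OffsetLimit original = new OffsetLimit(1, 1L);
OffsetLimit wrapped = original.andThen(new OffsetLimit(0, 2L));

// andThen computes: newLimit = max(0, min(1 - 0, 2)) = 1, newOffset = 1 + 0 = 1,
// so wrapped.equals(original) and the existing Sort is returned unchanged.
```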
private PlannerResult planExplanation(

View File

@ -0,0 +1,165 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.sql.calcite.planner;
import com.google.common.base.Preconditions;
import org.apache.calcite.rel.core.Sort;
import org.apache.calcite.rex.RexBuilder;
import org.apache.calcite.rex.RexLiteral;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.sql.type.BasicSqlType;
import org.apache.calcite.sql.type.SqlTypeName;
import javax.annotation.Nullable;
import java.util.Objects;
/**
* Represents an offset and a limit.
*/
public class OffsetLimit
{
private final long offset;
@Nullable
private final Long limit;
OffsetLimit(long offset, @Nullable Long limit)
{
Preconditions.checkArgument(offset >= 0, "offset >= 0");
Preconditions.checkArgument(limit == null || limit >= 0, "limit == null || limit >= 0");
this.offset = offset;
this.limit = limit;
}
public static OffsetLimit none()
{
return new OffsetLimit(0, null);
}
public static OffsetLimit fromSort(Sort sort)
{
final long offset = sort.offset == null ? 0 : (long) RexLiteral.intValue(sort.offset);
final Long limit = sort.fetch == null ? null : (long) RexLiteral.intValue(sort.fetch);
return new OffsetLimit(offset, limit);
}
public boolean hasOffset()
{
return offset > 0;
}
public long getOffset()
{
return offset;
}
public boolean hasLimit()
{
return limit != null;
}
public long getLimit()
{
Preconditions.checkState(limit != null, "limit is not present");
return limit;
}
@Nullable
public RexNode getOffsetAsRexNode(final RexBuilder builder)
{
if (offset == 0) {
return null;
} else {
return builder.makeLiteral(
offset,
new BasicSqlType(DruidTypeSystem.INSTANCE, SqlTypeName.BIGINT),
false
);
}
}
@Nullable
public RexNode getLimitAsRexNode(final RexBuilder builder)
{
if (limit == null) {
return null;
} else {
return builder.makeLiteral(
limit,
new BasicSqlType(DruidTypeSystem.INSTANCE, SqlTypeName.BIGINT),
false
);
}
}
/**
* Return a new instance that represents applying this one first, then the {@code next} one.
*/
public OffsetLimit andThen(final OffsetLimit next)
{
final Long newLimit;
if (limit == null && next.limit == null) {
// Neither has a limit => no limit overall.
newLimit = null;
} else if (limit == null) {
// Outer limit only.
newLimit = next.limit;
} else if (next.limit == null) {
// Inner limit only.
newLimit = Math.max(0, limit - next.offset);
} else {
// Both outer and inner limit.
newLimit = Math.max(0, Math.min(limit - next.offset, next.limit));
}
return new OffsetLimit(offset + next.offset, newLimit);
}
@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
OffsetLimit that = (OffsetLimit) o;
return Objects.equals(offset, that.offset) &&
Objects.equals(limit, that.limit);
}
@Override
public int hashCode()
{
return Objects.hash(offset, limit);
}
@Override
public String toString()
{
return "OffsetLimit{" +
"offset=" + offset +
", limit=" + limit +
'}';
}
}
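
A short usage sketch of the collapsing semantics (hypothetical values; same-package access is assumed
because the constructor is package-private):

```java
// Inner query ends in LIMIT 2 OFFSET 5; an outer query adds OFFSET 2 with no limit.
OffsetLimit inner = new OffsetLimit(5, 2L);
OffsetLimit combined = inner.andThen(new OffsetLimit(2, null));
// offset = 5 + 2 = 7, limit = max(0, 2 - 2) = 0: no rows can come back,
// which is the "reduces to LIMIT 0" case exercised by the tests below.

// Offset-only inner followed by limit-only outer keeps both parts:
OffsetLimit kept = new OffsetLimit(1, null).andThen(new OffsetLimit(0, 2L));
// offset = 1, limit = 2
```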

View File

@ -37,7 +37,6 @@ import org.apache.calcite.rel.core.Sort;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rex.RexBuilder;
import org.apache.calcite.rex.RexInputRef;
import org.apache.calcite.rex.RexLiteral;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.sql.SqlKind;
import org.apache.calcite.sql.type.SqlTypeName;
@ -53,8 +52,6 @@ import org.apache.druid.query.dimension.DimensionSpec;
import org.apache.druid.query.filter.DimFilter;
import org.apache.druid.query.groupby.GroupByQuery;
import org.apache.druid.query.groupby.having.DimFilterHavingSpec;
import org.apache.druid.query.groupby.orderby.DefaultLimitSpec;
import org.apache.druid.query.groupby.orderby.NoopLimitSpec;
import org.apache.druid.query.groupby.orderby.OrderByColumnSpec;
import org.apache.druid.query.ordering.StringComparator;
import org.apache.druid.query.ordering.StringComparators;
@ -76,6 +73,7 @@ import org.apache.druid.sql.calcite.expression.DruidExpression;
import org.apache.druid.sql.calcite.expression.Expressions;
import org.apache.druid.sql.calcite.filtration.Filtration;
import org.apache.druid.sql.calcite.planner.Calcites;
import org.apache.druid.sql.calcite.planner.OffsetLimit;
import org.apache.druid.sql.calcite.planner.PlannerContext;
import org.apache.druid.sql.calcite.rule.GroupByRules;
import org.apache.druid.sql.calcite.table.RowSignatures;
@ -88,6 +86,7 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
/**
@ -525,16 +524,11 @@ public class DruidQuery
final Sort sort = Preconditions.checkNotNull(partialQuery.getSort(), "sort");
final Project sortProject = partialQuery.getSortProject();
// Extract limit.
final Long limit = sort.fetch != null ? ((Number) RexLiteral.value(sort.fetch)).longValue() : null;
final List<OrderByColumnSpec> orderBys = new ArrayList<>(sort.getChildExps().size());
if (sort.offset != null) {
// Druid cannot currently handle LIMIT with OFFSET.
throw new CannotBuildQueryException(sort);
}
// Extract limit and offset.
final OffsetLimit offsetLimit = OffsetLimit.fromSort(sort);
// Extract orderBy column specs.
final List<OrderByColumnSpec> orderBys = new ArrayList<>(sort.getChildExps().size());
for (int sortKey = 0; sortKey < sort.getChildExps().size(); sortKey++) {
final RexNode sortExpression = sort.getChildExps().get(sortKey);
final RelFieldCollation collation = sort.getCollation().getFieldCollations().get(sortKey);
@ -583,7 +577,7 @@ public class DruidQuery
projection = Projection.postAggregation(sortProject, plannerContext, rowSignature, "s");
}
return Sorting.create(orderBys, limit, projection);
return Sorting.create(orderBys, offsetLimit, projection);
}
/**
@ -762,9 +756,20 @@ public class DruidQuery
Iterables.getOnlyElement(grouping.getDimensions()).toDimensionSpec().getOutputName()
);
if (sorting != null) {
// If there is sorting, set timeseriesLimit to given value if less than Integer.Max_VALUE
if (sorting.isLimited()) {
timeseriesLimit = Ints.checkedCast(sorting.getLimit());
if (sorting.getOffsetLimit().hasOffset()) {
// Timeseries cannot handle offsets.
return null;
}
if (sorting.getOffsetLimit().hasLimit()) {
final long limit = sorting.getOffsetLimit().getLimit();
if (limit == 0) {
// Can't handle zero limit (the Timeseries query engine would treat it as unlimited).
return null;
}
timeseriesLimit = Ints.checkedCast(limit);
}
switch (sorting.getSortKind(dimensionExpression.getOutputName())) {
@ -817,14 +822,18 @@ public class DruidQuery
@Nullable
public TopNQuery toTopNQuery()
{
// Must have GROUP BY one column, no GROUPING SETS, ORDER BY ≤ 1 column, limit less than maxTopNLimit, no HAVING.
// Must have GROUP BY one column, no GROUPING SETS, ORDER BY ≤ 1 column, LIMIT > 0 and ≤ maxTopNLimit,
// no OFFSET, no HAVING.
final boolean topNOk = grouping != null
&& grouping.getDimensions().size() == 1
&& !grouping.getSubtotals().hasEffect(grouping.getDimensionSpecs())
&& sorting != null
&& (sorting.getOrderBys().size() <= 1
&& sorting.isLimited() && sorting.getLimit() <= plannerContext.getPlannerConfig()
.getMaxTopNLimit())
&& sorting.getOffsetLimit().hasLimit()
&& sorting.getOffsetLimit().getLimit() > 0
&& sorting.getOffsetLimit().getLimit() <= plannerContext.getPlannerConfig()
.getMaxTopNLimit()
&& !sorting.getOffsetLimit().hasOffset())
&& grouping.getHavingFilter() == null;
if (!topNOk) {
@ -875,7 +884,7 @@ public class DruidQuery
getVirtualColumns(true),
dimensionSpec,
topNMetricSpec,
Ints.checkedCast(sorting.getLimit()),
Ints.checkedCast(sorting.getOffsetLimit().getLimit()),
filtration.getQuerySegmentSpec(),
filtration.getDimFilter(),
Granularities.ALL,
@ -897,6 +906,11 @@ public class DruidQuery
return null;
}
if (sorting != null && sorting.getOffsetLimit().hasLimit() && sorting.getOffsetLimit().getLimit() <= 0) {
// Cannot handle zero or negative limits.
return null;
}
final Filtration filtration = Filtration.create(filter).optimize(virtualColumnRegistry.getFullRowSignature());
final DimFilterHavingSpec havingSpec;
@ -925,13 +939,7 @@ public class DruidQuery
grouping.getAggregatorFactories(),
postAggregators,
havingSpec,
sorting != null
? new DefaultLimitSpec(
sorting.getOrderBys(),
0,
sorting.isLimited() ? Ints.checkedCast(sorting.getLimit()) : null
)
: NoopLimitSpec.instance(),
Optional.ofNullable(sorting).orElse(Sorting.none()).limitSpec(),
grouping.getSubtotals().toSubtotalsSpec(grouping.getDimensionSpecs()),
ImmutableSortedMap.copyOf(plannerContext.getQueryContext())
);
@ -957,11 +965,21 @@ public class DruidQuery
final Filtration filtration = Filtration.create(filter).optimize(virtualColumnRegistry.getFullRowSignature());
final ScanQuery.Order order;
long scanOffset = 0L;
long scanLimit = 0L;
if (sorting != null) {
if (sorting.isLimited()) {
scanLimit = sorting.getLimit();
scanOffset = sorting.getOffsetLimit().getOffset();
if (sorting.getOffsetLimit().hasLimit()) {
final long limit = sorting.getOffsetLimit().getLimit();
if (limit == 0) {
// Can't handle zero limit (the Scan query engine would treat it as unlimited).
return null;
}
scanLimit = limit;
}
final Sorting.SortKind sortKind = sorting.getSortKind(ColumnHolder.TIME_COLUMN_NAME);
@ -995,7 +1013,7 @@ public class DruidQuery
getVirtualColumns(true),
ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST,
0,
0,
scanOffset,
scanLimit,
order,
filtration.getDimFilter(),
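
The null returns added above feed an engine-selection fallback: each to*Query method declines by
returning null, and planning moves on to the next engine. A rough sketch of that pattern (the driver
shown here is an assumption, not the literal Druid code):

```java
// Hypothetically, inside DruidQuery:
Query<?> query = toTimeseriesQuery();  // declines when an OFFSET is present
if (query == null) {
  query = toTopNQuery();               // declines on OFFSET or a zero limit
}
if (query == null) {
  query = toGroupByQuery();            // handles offsets via DefaultLimitSpec
}
if (query == null) {
  query = toScanQuery();               // handles offsets natively (scanOffset)
}
// If every engine declines, the query cannot be built.
```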

View File

@ -21,9 +21,16 @@ package org.apache.druid.sql.calcite.rel;
import com.google.common.base.Preconditions;
import com.google.common.collect.Iterables;
import com.google.common.primitives.Ints;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.query.groupby.orderby.DefaultLimitSpec;
import org.apache.druid.query.groupby.orderby.LimitSpec;
import org.apache.druid.query.groupby.orderby.NoopLimitSpec;
import org.apache.druid.query.groupby.orderby.OrderByColumnSpec;
import org.apache.druid.sql.calcite.planner.OffsetLimit;
import javax.annotation.Nullable;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
@ -50,27 +57,31 @@ public class Sorting
@Nullable
private final Projection projection;
@Nullable
private final Long limit;
private final OffsetLimit offsetLimit;
private Sorting(
final List<OrderByColumnSpec> orderBys,
@Nullable final Long limit,
final OffsetLimit offsetLimit,
@Nullable final Projection projection
)
{
this.orderBys = Preconditions.checkNotNull(orderBys, "orderBys");
this.limit = limit;
this.offsetLimit = offsetLimit;
this.projection = projection;
}
public static Sorting create(
final List<OrderByColumnSpec> orderBys,
@Nullable final Long limit,
final OffsetLimit offsetLimit,
@Nullable final Projection projection
)
{
return new Sorting(orderBys, limit, projection);
return new Sorting(orderBys, offsetLimit, projection);
}
public static Sorting none()
{
return new Sorting(Collections.emptyList(), OffsetLimit.none(), null);
}
public SortKind getSortKind(final String timeColumn)
@ -102,15 +113,30 @@ public class Sorting
return projection;
}
public boolean isLimited()
public OffsetLimit getOffsetLimit()
{
return limit != null;
return offsetLimit;
}
@Nullable
public Long getLimit()
/**
* Returns a LimitSpec that encapsulates the orderBys, offset, and limit of this Sorting instance. Does not
* encapsulate the projection at all; you must still call {@link #getProjection()} for that.
*/
public LimitSpec limitSpec()
{
return limit;
if (orderBys.isEmpty() && !offsetLimit.hasOffset() && !offsetLimit.hasLimit()) {
return NoopLimitSpec.instance();
} else {
final Integer offsetAsInteger = offsetLimit.hasOffset() ? Ints.checkedCast(offsetLimit.getOffset()) : null;
final Integer limitAsInteger = offsetLimit.hasLimit() ? Ints.checkedCast(offsetLimit.getLimit()) : null;
if (limitAsInteger != null && limitAsInteger == 0) {
// Zero limit would be rejected by DefaultLimitSpec.
throw new ISE("Cannot create LimitSpec with zero limit");
}
return new DefaultLimitSpec(orderBys, offsetAsInteger, limitAsInteger);
}
}
@Override
@ -125,13 +151,13 @@ public class Sorting
Sorting sorting = (Sorting) o;
return Objects.equals(orderBys, sorting.orderBys) &&
Objects.equals(projection, sorting.projection) &&
Objects.equals(limit, sorting.limit);
Objects.equals(offsetLimit, sorting.offsetLimit);
}
@Override
public int hashCode()
{
return Objects.hash(orderBys, projection, limit);
return Objects.hash(orderBys, projection, offsetLimit);
}
@Override
@ -140,7 +166,7 @@ public class Sorting
return "Sorting{" +
"orderBys=" + orderBys +
", projection=" + projection +
", limit=" + limit +
", offsetLimit=" + offsetLimit +
'}';
}
}
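
To illustrate limitSpec() (hypothetical values; constructing an OffsetLimit directly assumes
planner-package access, otherwise use OffsetLimit.fromSort or OffsetLimit.none()):

```java
// OFFSET 1, LIMIT 2, no order-by columns:
Sorting sorting = Sorting.create(Collections.emptyList(), new OffsetLimit(1, 2L), null);
LimitSpec spec = sorting.limitSpec();
// => new DefaultLimitSpec([], offset = 1, limit = 2)

// No orderBys, no offset, no limit:
LimitSpec noop = Sorting.none().limitSpec();
// => NoopLimitSpec.instance()
```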

View File

@ -22,7 +22,7 @@ package org.apache.druid.sql.calcite.rule;
import org.apache.calcite.plan.RelOptRule;
import org.apache.calcite.plan.RelOptRuleCall;
import org.apache.calcite.rel.core.Sort;
import org.apache.druid.sql.calcite.planner.Calcites;
import org.apache.druid.sql.calcite.planner.OffsetLimit;
/**
* Collapses two adjacent Sort operations together. Useful for queries like
@ -50,21 +50,13 @@ public class SortCollapseRule extends RelOptRule
if (outerSort.collation.getFieldCollations().isEmpty()
|| outerSort.collation.getFieldCollations().equals(innerSort.collation.getFieldCollations())) {
final int innerOffset = Calcites.getOffset(innerSort);
final int innerFetch = Calcites.getFetch(innerSort);
final int outerOffset = Calcites.getOffset(outerSort);
final int outerFetch = Calcites.getFetch(outerSort);
// Add up the offsets.
final int offset = innerOffset + outerOffset;
final int fetch = Calcites.collapseFetch(innerFetch, outerFetch, outerOffset);
final OffsetLimit offsetLimit = OffsetLimit.fromSort(innerSort).andThen(OffsetLimit.fromSort(outerSort));
final Sort combined = innerSort.copy(
innerSort.getTraitSet(),
innerSort.getInput(),
innerSort.getCollation(),
offset == 0 ? null : call.builder().literal(offset),
fetch < 0 ? null : call.builder().literal(fetch)
offsetLimit.getOffsetAsRexNode(call.builder().getRexBuilder()),
offsetLimit.getLimitAsRexNode(call.builder().getRexBuilder())
);
call.transformTo(combined);

View File

@ -1090,6 +1090,32 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
);
}
@Test
public void testSelectStarWithLimitAndOffset() throws Exception
{
testQuery(
PLANNER_CONFIG_DEFAULT_NO_COMPLEX_SERDE,
QUERY_CONTEXT_DEFAULT,
"SELECT * FROM druid.foo LIMIT 2 OFFSET 1",
CalciteTests.REGULAR_USER_AUTH_RESULT,
ImmutableList.of(
newScanQueryBuilder()
.dataSource(CalciteTests.DATASOURCE1)
.intervals(querySegmentSpec(Filtration.eternity()))
.columns("__time", "cnt", "dim1", "dim2", "dim3", "m1", "m2", "unique_dim1")
.offset(1)
.limit(2)
.resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST)
.context(QUERY_CONTEXT_DEFAULT)
.build()
),
ImmutableList.of(
new Object[]{timestamp("2000-01-02"), 1L, "10.1", NULL_STRING, "[\"b\",\"c\"]", 2.0f, 2.0, HLLC_STRING},
new Object[]{timestamp("2000-01-03"), 1L, "2", "", "d", 3f, 3.0, HLLC_STRING}
)
);
}
@Test
public void testSelectWithProjection() throws Exception
{
@ -1290,6 +1316,80 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
);
}
@Test
public void testSelectLimitWrappingOnTopOfOffset() throws Exception
{
testQuery(
"SELECT dim1 FROM druid.foo ORDER BY __time DESC OFFSET 1",
OUTER_LIMIT_CONTEXT,
ImmutableList.of(
newScanQueryBuilder()
.dataSource(CalciteTests.DATASOURCE1)
.intervals(querySegmentSpec(Filtration.eternity()))
.columns(ImmutableList.of("__time", "dim1"))
.offset(1)
.limit(2)
.order(ScanQuery.Order.DESCENDING)
.resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST)
.context(OUTER_LIMIT_CONTEXT)
.build()
),
ImmutableList.of(
new Object[]{"def"},
new Object[]{"1"}
)
);
}
@Test
public void testSelectLimitWrappingOnTopOfOffsetAndLowLimit() throws Exception
{
testQuery(
"SELECT dim1 FROM druid.foo ORDER BY __time DESC LIMIT 1 OFFSET 1",
OUTER_LIMIT_CONTEXT,
ImmutableList.of(
newScanQueryBuilder()
.dataSource(CalciteTests.DATASOURCE1)
.intervals(querySegmentSpec(Filtration.eternity()))
.columns(ImmutableList.of("__time", "dim1"))
.offset(1)
.limit(1)
.order(ScanQuery.Order.DESCENDING)
.resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST)
.context(OUTER_LIMIT_CONTEXT)
.build()
),
ImmutableList.of(
new Object[]{"def"}
)
);
}
@Test
public void testSelectLimitWrappingOnTopOfOffsetAndHighLimit() throws Exception
{
testQuery(
"SELECT dim1 FROM druid.foo ORDER BY __time DESC LIMIT 10 OFFSET 1",
OUTER_LIMIT_CONTEXT,
ImmutableList.of(
newScanQueryBuilder()
.dataSource(CalciteTests.DATASOURCE1)
.intervals(querySegmentSpec(Filtration.eternity()))
.columns(ImmutableList.of("__time", "dim1"))
.offset(1)
.limit(2)
.order(ScanQuery.Order.DESCENDING)
.resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST)
.context(OUTER_LIMIT_CONTEXT)
.build()
),
ImmutableList.of(
new Object[]{"def"},
new Object[]{"1"}
)
);
}
@Test
public void testSelectLimitWrappingAgainAkaIDontReallyQuiteUnderstandCalciteQueryPlanning() throws Exception
{
@ -1527,11 +1627,11 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
new DefaultDimensionSpec("dim2", "d1", ValueType.STRING)
)
.setLimitSpec(
new DefaultLimitSpec(
ImmutableList.of(
new OrderByColumnSpec("d0", Direction.DESCENDING, StringComparators.LEXICOGRAPHIC)),
2
)
DefaultLimitSpec
.builder()
.orderBy(new OrderByColumnSpec("d0", Direction.DESCENDING, StringComparators.LEXICOGRAPHIC))
.limit(2)
.build()
)
.setAggregatorSpecs(aggregators(new CountAggregatorFactory("a0")))
.setContext(OUTER_LIMIT_CONTEXT)
@ -1589,12 +1689,11 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
new DefaultDimensionSpec("dim2", "d1", ValueType.STRING)
)
.setLimitSpec(
new DefaultLimitSpec(
ImmutableList.of(
new OrderByColumnSpec("a0", Direction.DESCENDING, StringComparators.NUMERIC)
),
2
)
DefaultLimitSpec
.builder()
.orderBy(new OrderByColumnSpec("a0", Direction.DESCENDING, StringComparators.NUMERIC))
.limit(2)
.build()
)
.setAggregatorSpecs(aggregators(new CountAggregatorFactory("a0")))
.setContext(OUTER_LIMIT_CONTEXT)
@ -1701,16 +1800,16 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
.setDimensions(dimensions(new DefaultDimensionSpec("dim1", "d0")))
.setGranularity(Granularities.ALL)
.setLimitSpec(
new DefaultLimitSpec(
ImmutableList.of(
DefaultLimitSpec
.builder()
.orderBy(
new OrderByColumnSpec(
"d0",
OrderByColumnSpec.Direction.DESCENDING,
StringComparators.LEXICOGRAPHIC
)
),
Integer.MAX_VALUE
)
)
.build()
)
.setContext(QUERY_CONTEXT_DEFAULT)
.build()
@ -2868,16 +2967,16 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
.setDimensions(dimensions(new DefaultDimensionSpec("cnt", "d0", ValueType.LONG)))
.setAggregatorSpecs(aggregators(new CountAggregatorFactory("a0")))
.setLimitSpec(
new DefaultLimitSpec(
ImmutableList.of(
DefaultLimitSpec
.builder()
.orderBy(
new OrderByColumnSpec(
"d0",
OrderByColumnSpec.Direction.ASCENDING,
StringComparators.NUMERIC
)
),
Integer.MAX_VALUE
)
)
.build()
)
.setContext(QUERY_CONTEXT_DEFAULT)
.build()
@ -2926,16 +3025,16 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
.setDimensions(dimensions(new DefaultDimensionSpec("cnt", "d0", ValueType.LONG)))
.setAggregatorSpecs(aggregators(new CountAggregatorFactory("a0")))
.setLimitSpec(
new DefaultLimitSpec(
ImmutableList.of(
DefaultLimitSpec
.builder()
.orderBy(
new OrderByColumnSpec(
"d0",
OrderByColumnSpec.Direction.ASCENDING,
StringComparators.NUMERIC
)
),
Integer.MAX_VALUE
)
)
.build()
)
.setContext(QUERY_CONTEXT_DEFAULT)
.build()
@ -3386,21 +3485,23 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
expressionPostAgg("p0", "substring(\"d0\", 1, -1)"),
expressionPostAgg("p1", "strlen(\"d0\")")
))
.setLimitSpec(new DefaultLimitSpec(
ImmutableList.of(
new OrderByColumnSpec(
"p1",
OrderByColumnSpec.Direction.DESCENDING,
StringComparators.NUMERIC
),
new OrderByColumnSpec(
"d0",
OrderByColumnSpec.Direction.ASCENDING,
StringComparators.LEXICOGRAPHIC
.setLimitSpec(
DefaultLimitSpec
.builder()
.orderBy(
new OrderByColumnSpec(
"p1",
OrderByColumnSpec.Direction.DESCENDING,
StringComparators.NUMERIC
),
new OrderByColumnSpec(
"d0",
OrderByColumnSpec.Direction.ASCENDING,
StringComparators.LEXICOGRAPHIC
)
)
),
Integer.MAX_VALUE
))
.build()
)
.setContext(QUERY_CONTEXT_DEFAULT)
.build()
),
@ -4194,9 +4295,6 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
// SELECT query with order by non-__time.
"SELECT dim1 FROM druid.foo ORDER BY dim1",
// DISTINCT with OFFSET.
"SELECT DISTINCT dim2 FROM druid.foo ORDER BY dim2 LIMIT 2 OFFSET 5",
// JOIN condition with not-equals (<>).
"SELECT foo.dim1, foo.dim2, l.k, l.v\n"
+ "FROM foo INNER JOIN lookup.lookyloo l ON foo.dim2 <> l.k",
@ -5049,16 +5147,17 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
)
.setPostAggregatorSpecs(ImmutableList.of(expressionPostAgg("p0", "(\"a0\" + \"a1\")")))
.setLimitSpec(
new DefaultLimitSpec(
ImmutableList.of(
DefaultLimitSpec
.builder()
.orderBy(
new OrderByColumnSpec(
"p0",
OrderByColumnSpec.Direction.ASCENDING,
StringComparators.NUMERIC
)
),
3
)
)
.limit(3)
.build()
)
.setContext(QUERY_CONTEXT_DEFAULT)
.build()
@ -5097,16 +5196,17 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
)
)
.setLimitSpec(
new DefaultLimitSpec(
ImmutableList.of(
DefaultLimitSpec
.builder()
.orderBy(
new OrderByColumnSpec(
"p0",
OrderByColumnSpec.Direction.ASCENDING,
StringComparators.NUMERIC
)
),
3
)
)
.limit(3)
.build()
)
.setContext(QUERY_CONTEXT_NO_TOPN)
.build()
@ -5360,16 +5460,16 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
.setDimensions(dimensions(new DefaultDimensionSpec("v0", "d0", ValueType.FLOAT)))
.setAggregatorSpecs(aggregators(new CountAggregatorFactory("a0")))
.setLimitSpec(
new DefaultLimitSpec(
ImmutableList.of(
DefaultLimitSpec
.builder()
.orderBy(
new OrderByColumnSpec(
"d0",
OrderByColumnSpec.Direction.DESCENDING,
StringComparators.NUMERIC
)
),
Integer.MAX_VALUE
)
)
.build()
)
.setContext(QUERY_CONTEXT_DEFAULT)
.build()
@ -6403,18 +6503,6 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
@Test
public void testSelectDistinctWithSortAsOuterQuery3() throws Exception
{
// Query reduces to LIMIT 0.
testQuery(
"SELECT * FROM (SELECT DISTINCT dim2 FROM druid.foo ORDER BY dim2 LIMIT 2 OFFSET 5) OFFSET 2",
ImmutableList.of(),
ImmutableList.of()
);
}
@Test
public void testSelectDistinctWithSortAsOuterQuery4() throws Exception
{
testQuery(
"SELECT * FROM (SELECT DISTINCT dim2 FROM druid.foo ORDER BY dim2 DESC LIMIT 5) LIMIT 10",
@ -6444,6 +6532,42 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
);
}
@Test
public void testSelectNonAggregatingWithLimitLiterallyZero() throws Exception
{
// Query reduces to LIMIT 0.
testQuery(
"SELECT dim2 FROM druid.foo ORDER BY dim2 LIMIT 0",
ImmutableList.of(),
ImmutableList.of()
);
}
@Test
public void testSelectNonAggregatingWithLimitReducedToZero() throws Exception
{
// Query reduces to LIMIT 0.
testQuery(
"SELECT * FROM (SELECT dim2 FROM druid.foo ORDER BY dim2 LIMIT 2 OFFSET 5) OFFSET 2",
ImmutableList.of(),
ImmutableList.of()
);
}
@Test
public void testSelectAggregatingWithLimitReducedToZero() throws Exception
{
// Query reduces to LIMIT 0.
testQuery(
"SELECT * FROM (SELECT DISTINCT dim2 FROM druid.foo ORDER BY dim2 LIMIT 2 OFFSET 5) OFFSET 2",
ImmutableList.of(),
ImmutableList.of()
);
}
@Test
public void testCountDistinct() throws Exception
{
@ -11293,6 +11417,47 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
);
}
@Test
public void testTimeseriesWithLimitAndOffset() throws Exception
{
// Cannot vectorize due to expressions.
cannotVectorize();
// Timeseries cannot handle offsets, so the query morphs into a groupBy.
testQuery(
"SELECT gran, SUM(cnt)\n"
+ "FROM (\n"
+ " SELECT floor(__time TO month) AS gran, cnt\n"
+ " FROM druid.foo\n"
+ ") AS x\n"
+ "GROUP BY gran\n"
+ "LIMIT 2\n"
+ "OFFSET 1",
ImmutableList.of(
GroupByQuery.builder()
.setDataSource(CalciteTests.DATASOURCE1)
.setInterval(querySegmentSpec(Filtration.eternity()))
.setGranularity(Granularities.ALL)
.setVirtualColumns(
expressionVirtualColumn(
"v0",
"timestamp_floor(\"__time\",'P1M',null,'UTC')",
ValueType.LONG
)
)
.setDimensions(dimensions(new DefaultDimensionSpec("v0", "d0", ValueType.LONG)))
.setAggregatorSpecs(aggregators(new LongSumAggregatorFactory("a0", "cnt")))
.setLimitSpec(DefaultLimitSpec.builder().offset(1).limit(2).build())
.setContext(QUERY_CONTEXT_DEFAULT)
.build()
),
ImmutableList.of(
new Object[]{timestamp("2001-01-01"), 3L}
)
);
}
@Test
public void testTimeseriesWithOrderByAndLimit() throws Exception
{

View File

@ -0,0 +1,69 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.sql.calcite.planner;
import com.google.common.collect.ImmutableList;
import org.apache.druid.java.util.common.StringUtils;
import org.junit.Assert;
import org.junit.Test;
import java.util.List;
import java.util.stream.Collectors;
public class OffsetLimitTest
{
@Test
public void testAndThen()
{
final List<String> things = ImmutableList.of("a", "b", "c", "d", "e", "f", "g", "h");
for (int innerOffset = 0; innerOffset < 5; innerOffset++) {
for (int innerLimit = -1; innerLimit < 5; innerLimit++) {
for (int outerOffset = 0; outerOffset < 5; outerOffset++) {
for (int outerLimit = -1; outerLimit < 5; outerLimit++) {
final OffsetLimit inner = new OffsetLimit(innerOffset, innerLimit < 0 ? null : (long) innerLimit);
final OffsetLimit outer = new OffsetLimit(outerOffset, outerLimit < 0 ? null : (long) outerLimit);
final OffsetLimit combined = inner.andThen(outer);
Assert.assertEquals(
StringUtils.format(
"innerOffset[%s], innerLimit[%s], outerOffset[%s], outerLimit[%s]",
innerOffset,
innerLimit,
outerOffset,
outerLimit
),
things.stream()
.skip(innerOffset)
.limit(innerLimit < 0 ? Long.MAX_VALUE : innerLimit)
.skip(outerOffset)
.limit(outerLimit < 0 ? Long.MAX_VALUE : outerLimit)
.collect(Collectors.toList()),
things.stream()
.skip(combined.getOffset())
.limit(combined.hasLimit() ? combined.getLimit() : Long.MAX_VALUE)
.collect(Collectors.toList())
);
}
}
}
}
}
}