From d4cace385fa8f314a947aae4d9b55d0ef71cb111 Mon Sep 17 00:00:00 2001 From: Gian Merlino Date: Sat, 23 Oct 2021 17:18:43 -0700 Subject: [PATCH] SQL: Allow Scans to be used as outer queries. (#11831) * SQL: Allow Scans to be used as outer queries. This has been possible in the native query system for a while, but the capability hasn't yet propagated into the SQL layer. One example of where this is useful is a query like: SELECT * FROM (... LIMIT X) WHERE Because this expands the kinds of subquery structures the SQL layer will consider, it was also necessary to improve the cost calculations. These changes appear in PartialDruidQuery and DruidOuterQueryRel. The ideas are: - Attach per-column penalties to the output signature of each query, instead of to the initial projection that starts a query. This encourages moving projections into subqueries instead of leaving them on outer queries. - Only attach penalties to projections if there are actually expressions happening. So, now, projections that simply reorder or remove fields are free. - Attach a constant penalty to every outer query. This discourages creating them when they are not needed. The changes are generally beneficial to the test cases we have in CalciteQueryTest. Most plans are unchanged, or are changed in purely cosmetic ways. Two have changed for the better: - testUsingSubqueryWithLimit now returns a constant from the subquery, instead of returning every column. - testJoinOuterGroupByAndSubqueryHasLimit returns a minimal set of columns from the innermost subquery; two unnecessary columns are no longer there. * Fix various DS operator conversions. These were all implemented as direct conversions, which isn't appropriate because they do not actually map onto native functions. These are only usable as post-aggregations. * Test case adjustment. --- .../HllSketchEstimateOperatorConversion.java | 9 +- ...mateWithErrorBoundsOperatorConversion.java | 10 +- .../HllSketchToStringOperatorConversion.java | 9 +- ...ublesSketchQuantileOperatorConversion.java | 2 +- .../DoublesSketchRankOperatorConversion.java | 2 +- ...SketchSingleArgBaseOperatorConversion.java | 19 ++-- ...oublesSketchSummaryOperatorConversion.java | 9 +- ...ThetaSketchEstimateOperatorConversion.java | 9 +- ...mateWithErrorBoundsOperatorConversion.java | 10 +- .../expression/DirectOperatorConversion.java | 3 + .../druid/sql/calcite/rel/CostEstimates.java | 12 +-- .../sql/calcite/rel/DruidJoinQueryRel.java | 4 +- .../sql/calcite/rel/DruidOuterQueryRel.java | 5 +- .../druid/sql/calcite/rel/DruidQuery.java | 11 +- .../sql/calcite/rel/PartialDruidQuery.java | 28 +++-- .../druid/sql/calcite/rule/DruidJoinRule.java | 37 +++++-- .../druid/sql/calcite/rule/DruidRules.java | 86 ++++----------- .../calcite/CalciteCorrelatedQueryTest.java | 7 +- .../druid/sql/calcite/CalciteQueryTest.java | 101 +++++++++++++----- 19 files changed, 197 insertions(+), 176 deletions(-) diff --git a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/sql/HllSketchEstimateOperatorConversion.java b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/sql/HllSketchEstimateOperatorConversion.java index 11bd15d3ae9..2379a144ba0 100644 --- a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/sql/HllSketchEstimateOperatorConversion.java +++ b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/sql/HllSketchEstimateOperatorConversion.java @@ -32,16 +32,16 @@ import org.apache.druid.query.aggregation.PostAggregator; import org.apache.druid.query.aggregation.datasketches.hll.HllSketchAggregatorFactory; import org.apache.druid.query.aggregation.datasketches.hll.HllSketchToEstimatePostAggregator; import org.apache.druid.segment.column.RowSignature; -import org.apache.druid.sql.calcite.expression.DirectOperatorConversion; import org.apache.druid.sql.calcite.expression.DruidExpression; import org.apache.druid.sql.calcite.expression.OperatorConversions; import org.apache.druid.sql.calcite.expression.PostAggregatorVisitor; +import org.apache.druid.sql.calcite.expression.SqlOperatorConversion; import org.apache.druid.sql.calcite.planner.PlannerContext; import javax.annotation.Nullable; import java.util.List; -public class HllSketchEstimateOperatorConversion extends DirectOperatorConversion +public class HllSketchEstimateOperatorConversion implements SqlOperatorConversion { private static final String FUNCTION_NAME = "HLL_SKETCH_ESTIMATE"; private static final SqlFunction SQL_FUNCTION = OperatorConversions @@ -51,11 +51,6 @@ public class HllSketchEstimateOperatorConversion extends DirectOperatorConversio .returnTypeInference(ReturnTypes.DOUBLE) .build(); - public HllSketchEstimateOperatorConversion() - { - super(SQL_FUNCTION, FUNCTION_NAME); - } - @Override public SqlOperator calciteOperator() { diff --git a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/sql/HllSketchEstimateWithErrorBoundsOperatorConversion.java b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/sql/HllSketchEstimateWithErrorBoundsOperatorConversion.java index c645ca724a5..9082e00211c 100644 --- a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/sql/HllSketchEstimateWithErrorBoundsOperatorConversion.java +++ b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/sql/HllSketchEstimateWithErrorBoundsOperatorConversion.java @@ -31,16 +31,16 @@ import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.query.aggregation.PostAggregator; import org.apache.druid.query.aggregation.datasketches.hll.HllSketchToEstimateWithBoundsPostAggregator; import org.apache.druid.segment.column.RowSignature; -import org.apache.druid.sql.calcite.expression.DirectOperatorConversion; import org.apache.druid.sql.calcite.expression.DruidExpression; import org.apache.druid.sql.calcite.expression.OperatorConversions; import org.apache.druid.sql.calcite.expression.PostAggregatorVisitor; +import org.apache.druid.sql.calcite.expression.SqlOperatorConversion; import org.apache.druid.sql.calcite.planner.PlannerContext; import javax.annotation.Nullable; import java.util.List; -public class HllSketchEstimateWithErrorBoundsOperatorConversion extends DirectOperatorConversion +public class HllSketchEstimateWithErrorBoundsOperatorConversion implements SqlOperatorConversion { private static final String FUNCTION_NAME = "HLL_SKETCH_ESTIMATE_WITH_ERROR_BOUNDS"; private static final SqlFunction SQL_FUNCTION = OperatorConversions @@ -50,12 +50,6 @@ public class HllSketchEstimateWithErrorBoundsOperatorConversion extends DirectOp .returnTypeNonNull(SqlTypeName.OTHER) .build(); - - public HllSketchEstimateWithErrorBoundsOperatorConversion() - { - super(SQL_FUNCTION, FUNCTION_NAME); - } - @Override public SqlOperator calciteOperator() { diff --git a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/sql/HllSketchToStringOperatorConversion.java b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/sql/HllSketchToStringOperatorConversion.java index fe0c56b56ad..c143c6647c7 100644 --- a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/sql/HllSketchToStringOperatorConversion.java +++ b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/sql/HllSketchToStringOperatorConversion.java @@ -29,16 +29,16 @@ import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.query.aggregation.PostAggregator; import org.apache.druid.query.aggregation.datasketches.hll.HllSketchToStringPostAggregator; import org.apache.druid.segment.column.RowSignature; -import org.apache.druid.sql.calcite.expression.DirectOperatorConversion; import org.apache.druid.sql.calcite.expression.DruidExpression; import org.apache.druid.sql.calcite.expression.OperatorConversions; import org.apache.druid.sql.calcite.expression.PostAggregatorVisitor; +import org.apache.druid.sql.calcite.expression.SqlOperatorConversion; import org.apache.druid.sql.calcite.planner.PlannerContext; import javax.annotation.Nullable; import java.util.List; -public class HllSketchToStringOperatorConversion extends DirectOperatorConversion +public class HllSketchToStringOperatorConversion implements SqlOperatorConversion { private static final String FUNCTION_NAME = "HLL_SKETCH_TO_STRING"; private static final SqlFunction SQL_FUNCTION = OperatorConversions @@ -47,11 +47,6 @@ public class HllSketchToStringOperatorConversion extends DirectOperatorConversio .returnTypeNonNull(SqlTypeName.VARCHAR) .build(); - public HllSketchToStringOperatorConversion() - { - super(SQL_FUNCTION, FUNCTION_NAME); - } - @Override public SqlOperator calciteOperator() { diff --git a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/sql/DoublesSketchQuantileOperatorConversion.java b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/sql/DoublesSketchQuantileOperatorConversion.java index 2387fe8cccb..c54f6ddf564 100644 --- a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/sql/DoublesSketchQuantileOperatorConversion.java +++ b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/sql/DoublesSketchQuantileOperatorConversion.java @@ -40,7 +40,7 @@ public class DoublesSketchQuantileOperatorConversion extends DoublesSketchSingle public DoublesSketchQuantileOperatorConversion() { - super(SQL_FUNCTION, FUNCTION_NAME); + super(SQL_FUNCTION); } @Override diff --git a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/sql/DoublesSketchRankOperatorConversion.java b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/sql/DoublesSketchRankOperatorConversion.java index 327f757a718..613c9f2da4e 100644 --- a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/sql/DoublesSketchRankOperatorConversion.java +++ b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/sql/DoublesSketchRankOperatorConversion.java @@ -39,7 +39,7 @@ public class DoublesSketchRankOperatorConversion extends DoublesSketchSingleArgB public DoublesSketchRankOperatorConversion() { - super(SQL_FUNCTION, FUNCTION_NAME); + super(SQL_FUNCTION); } @Override diff --git a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/sql/DoublesSketchSingleArgBaseOperatorConversion.java b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/sql/DoublesSketchSingleArgBaseOperatorConversion.java index a284c96de11..e1756507ee5 100644 --- a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/sql/DoublesSketchSingleArgBaseOperatorConversion.java +++ b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/sql/DoublesSketchSingleArgBaseOperatorConversion.java @@ -26,23 +26,28 @@ import org.apache.calcite.sql.SqlKind; import org.apache.calcite.sql.SqlOperator; import org.apache.druid.query.aggregation.PostAggregator; import org.apache.druid.segment.column.RowSignature; -import org.apache.druid.sql.calcite.expression.DirectOperatorConversion; import org.apache.druid.sql.calcite.expression.DruidExpression; import org.apache.druid.sql.calcite.expression.OperatorConversions; import org.apache.druid.sql.calcite.expression.PostAggregatorVisitor; +import org.apache.druid.sql.calcite.expression.SqlOperatorConversion; import org.apache.druid.sql.calcite.planner.PlannerContext; import javax.annotation.Nullable; import java.util.List; -public abstract class DoublesSketchSingleArgBaseOperatorConversion extends DirectOperatorConversion +public abstract class DoublesSketchSingleArgBaseOperatorConversion implements SqlOperatorConversion { - protected DoublesSketchSingleArgBaseOperatorConversion( - SqlOperator operator, - String druidFunctionName - ) + private final SqlOperator operator; + + protected DoublesSketchSingleArgBaseOperatorConversion(SqlOperator operator) { - super(operator, druidFunctionName); + this.operator = operator; + } + + @Override + public SqlOperator calciteOperator() + { + return operator; } @Override diff --git a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/sql/DoublesSketchSummaryOperatorConversion.java b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/sql/DoublesSketchSummaryOperatorConversion.java index 4dd01fd8540..c0a742cc78e 100644 --- a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/sql/DoublesSketchSummaryOperatorConversion.java +++ b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/sql/DoublesSketchSummaryOperatorConversion.java @@ -29,16 +29,16 @@ import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.query.aggregation.PostAggregator; import org.apache.druid.query.aggregation.datasketches.quantiles.DoublesSketchToStringPostAggregator; import org.apache.druid.segment.column.RowSignature; -import org.apache.druid.sql.calcite.expression.DirectOperatorConversion; import org.apache.druid.sql.calcite.expression.DruidExpression; import org.apache.druid.sql.calcite.expression.OperatorConversions; import org.apache.druid.sql.calcite.expression.PostAggregatorVisitor; +import org.apache.druid.sql.calcite.expression.SqlOperatorConversion; import org.apache.druid.sql.calcite.planner.PlannerContext; import javax.annotation.Nullable; import java.util.List; -public class DoublesSketchSummaryOperatorConversion extends DirectOperatorConversion +public class DoublesSketchSummaryOperatorConversion implements SqlOperatorConversion { private static final String FUNCTION_NAME = "DS_QUANTILE_SUMMARY"; private static final SqlFunction SQL_FUNCTION = OperatorConversions @@ -47,11 +47,6 @@ public class DoublesSketchSummaryOperatorConversion extends DirectOperatorConver .returnTypeNonNull(SqlTypeName.VARCHAR) .build(); - public DoublesSketchSummaryOperatorConversion() - { - super(SQL_FUNCTION, FUNCTION_NAME); - } - @Override public SqlOperator calciteOperator() { diff --git a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/theta/sql/ThetaSketchEstimateOperatorConversion.java b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/theta/sql/ThetaSketchEstimateOperatorConversion.java index 48702345cc0..cd943c71928 100644 --- a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/theta/sql/ThetaSketchEstimateOperatorConversion.java +++ b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/theta/sql/ThetaSketchEstimateOperatorConversion.java @@ -29,16 +29,16 @@ import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.query.aggregation.PostAggregator; import org.apache.druid.query.aggregation.datasketches.theta.SketchEstimatePostAggregator; import org.apache.druid.segment.column.RowSignature; -import org.apache.druid.sql.calcite.expression.DirectOperatorConversion; import org.apache.druid.sql.calcite.expression.DruidExpression; import org.apache.druid.sql.calcite.expression.OperatorConversions; import org.apache.druid.sql.calcite.expression.PostAggregatorVisitor; +import org.apache.druid.sql.calcite.expression.SqlOperatorConversion; import org.apache.druid.sql.calcite.planner.PlannerContext; import javax.annotation.Nullable; import java.util.List; -public class ThetaSketchEstimateOperatorConversion extends DirectOperatorConversion +public class ThetaSketchEstimateOperatorConversion implements SqlOperatorConversion { private static final String FUNCTION_NAME = "THETA_SKETCH_ESTIMATE"; private static final SqlFunction SQL_FUNCTION = OperatorConversions @@ -47,11 +47,6 @@ public class ThetaSketchEstimateOperatorConversion extends DirectOperatorConvers .returnTypeInference(ReturnTypes.DOUBLE) .build(); - public ThetaSketchEstimateOperatorConversion() - { - super(SQL_FUNCTION, FUNCTION_NAME); - } - @Override public SqlOperator calciteOperator() { diff --git a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/theta/sql/ThetaSketchEstimateWithErrorBoundsOperatorConversion.java b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/theta/sql/ThetaSketchEstimateWithErrorBoundsOperatorConversion.java index c54f2c59b45..206777ac82f 100644 --- a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/theta/sql/ThetaSketchEstimateWithErrorBoundsOperatorConversion.java +++ b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/theta/sql/ThetaSketchEstimateWithErrorBoundsOperatorConversion.java @@ -31,16 +31,16 @@ import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.query.aggregation.PostAggregator; import org.apache.druid.query.aggregation.datasketches.theta.SketchEstimatePostAggregator; import org.apache.druid.segment.column.RowSignature; -import org.apache.druid.sql.calcite.expression.DirectOperatorConversion; import org.apache.druid.sql.calcite.expression.DruidExpression; import org.apache.druid.sql.calcite.expression.OperatorConversions; import org.apache.druid.sql.calcite.expression.PostAggregatorVisitor; +import org.apache.druid.sql.calcite.expression.SqlOperatorConversion; import org.apache.druid.sql.calcite.planner.PlannerContext; import javax.annotation.Nullable; import java.util.List; -public class ThetaSketchEstimateWithErrorBoundsOperatorConversion extends DirectOperatorConversion +public class ThetaSketchEstimateWithErrorBoundsOperatorConversion implements SqlOperatorConversion { private static final String FUNCTION_NAME = "THETA_SKETCH_ESTIMATE_WITH_ERROR_BOUNDS"; private static final SqlFunction SQL_FUNCTION = OperatorConversions @@ -49,12 +49,6 @@ public class ThetaSketchEstimateWithErrorBoundsOperatorConversion extends Direct .returnTypeNonNull(SqlTypeName.OTHER) .build(); - - public ThetaSketchEstimateWithErrorBoundsOperatorConversion() - { - super(SQL_FUNCTION, FUNCTION_NAME); - } - @Override public SqlOperator calciteOperator() { diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/expression/DirectOperatorConversion.java b/sql/src/main/java/org/apache/druid/sql/calcite/expression/DirectOperatorConversion.java index 4246e4710a6..34be8e54b15 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/expression/DirectOperatorConversion.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/expression/DirectOperatorConversion.java @@ -26,6 +26,9 @@ import org.apache.druid.sql.calcite.planner.PlannerContext; import javax.annotation.Nullable; +/** + * Conversion for SQL operators that map 1-1 onto native functions. + */ public class DirectOperatorConversion implements SqlOperatorConversion { private final SqlOperator operator; diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/rel/CostEstimates.java b/sql/src/main/java/org/apache/druid/sql/calcite/rel/CostEstimates.java index da93ddfbd17..28f7c21182d 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/rel/CostEstimates.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/rel/CostEstimates.java @@ -35,9 +35,9 @@ public class CostEstimates static final double COST_BASE = 1; /** - * Cost to read a value out of a column directly. + * Cost to include a column in query output. */ - static final double COST_COLUMN_READ = 0.05; + static final double COST_OUTPUT_COLUMN = 0.05; /** * Cost to compute and read an expression. @@ -77,14 +77,14 @@ public class CostEstimates static final double MULTIPLIER_OUTER_QUERY = .1; /** - * Cost to add to a join when either side is a subquery. Strongly encourages avoiding subqueries, since they must be - * inlined and then the join must run on the Broker. + * Cost to add to a subquery. Strongly encourages avoiding subqueries, since they must be inlined and then the join + * must run on the Broker. */ - static final double COST_JOIN_SUBQUERY = 1e5; + static final double COST_SUBQUERY = 1e5; /** * Cost to perform a cross join. Strongly encourages pushing down filters into join conditions, even if it means - * we need to add a subquery (this is higher than {@link #COST_JOIN_SUBQUERY}). + * we need to add a subquery (this is higher than {@link #COST_SUBQUERY}). */ static final double COST_JOIN_CROSS = 1e8; diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidJoinQueryRel.java b/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidJoinQueryRel.java index 0c53257d0e1..886798c50bd 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidJoinQueryRel.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidJoinQueryRel.java @@ -316,7 +316,7 @@ public class DruidJoinQueryRel extends DruidRel double cost; if (computeLeftRequiresSubquery(getSomeDruidChild(left))) { - cost = CostEstimates.COST_JOIN_SUBQUERY; + cost = CostEstimates.COST_SUBQUERY; } else { cost = partialQuery.estimateCost(); if (joinRel.getJoinType() == JoinRelType.INNER && plannerConfig.isComputeInnerJoinCostAsFilter()) { @@ -325,7 +325,7 @@ public class DruidJoinQueryRel extends DruidRel } if (computeRightRequiresSubquery(getSomeDruidChild(right))) { - cost += CostEstimates.COST_JOIN_SUBQUERY; + cost += CostEstimates.COST_SUBQUERY; } if (joinRel.getCondition().isA(SqlKind.LITERAL) && !joinRel.getCondition().isAlwaysFalse()) { diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidOuterQueryRel.java b/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidOuterQueryRel.java index 32e4b746d7e..e26acf6d450 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidOuterQueryRel.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidOuterQueryRel.java @@ -72,7 +72,7 @@ public class DruidOuterQueryRel extends DruidRel { return new DruidOuterQueryRel( sourceRel.getCluster(), - sourceRel.getTraitSet(), + sourceRel.getTraitSet().plusAll(partialQuery.getRelTraits()), sourceRel, partialQuery, sourceRel.getQueryMaker() @@ -217,6 +217,7 @@ public class DruidOuterQueryRel extends DruidRel { return planner.getCostFactory() .makeCost(partialQuery.estimateCost(), 0, 0) - .multiplyBy(CostEstimates.MULTIPLIER_OUTER_QUERY); + .multiplyBy(CostEstimates.MULTIPLIER_OUTER_QUERY) + .plus(planner.getCostFactory().makeCost(CostEstimates.COST_SUBQUERY, 0, 0)); } } diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidQuery.java b/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidQuery.java index cb674d2012e..acf6c8fccad 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidQuery.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidQuery.java @@ -735,15 +735,14 @@ public class DruidQuery private Query computeQuery() { if (dataSource instanceof QueryDataSource) { - // If there is a subquery then the outer query must be a groupBy. + // If there is a subquery, then we prefer the outer query to be a groupBy if possible, since this potentially + // enables more efficient execution. (The groupBy query toolchest can handle some subqueries by itself, without + // requiring the Broker to inline results.) final GroupByQuery outerQuery = toGroupByQuery(); - if (outerQuery == null) { - // Bug in the planner rules. They shouldn't allow this to happen. - throw new IllegalStateException("Can't use QueryDataSource without an outer groupBy query!"); + if (outerQuery != null) { + return outerQuery; } - - return outerQuery; } final TimeseriesQuery tsQuery = toTimeseriesQuery(); diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/rel/PartialDruidQuery.java b/sql/src/main/java/org/apache/druid/sql/calcite/rel/PartialDruidQuery.java index 7a721a18133..7108e604722 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/rel/PartialDruidQuery.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/rel/PartialDruidQuery.java @@ -404,11 +404,10 @@ public class PartialDruidQuery { double cost = CostEstimates.COST_BASE; + // Account for the cost of post-scan expressions. if (getSelectProject() != null) { for (final RexNode rexNode : getSelectProject().getChildExps()) { - if (rexNode.isA(SqlKind.INPUT_REF)) { - cost += CostEstimates.COST_COLUMN_READ; - } else { + if (!rexNode.isA(SqlKind.INPUT_REF)) { cost += CostEstimates.COST_EXPRESSION; } } @@ -421,12 +420,6 @@ public class PartialDruidQuery } if (getAggregate() != null) { - if (getSelectProject() == null) { - // No projection before aggregation, that means the aggregate operator is reading things directly. - // Account for the costs. - cost += CostEstimates.COST_COLUMN_READ * getAggregate().getGroupSet().size(); - } - cost += CostEstimates.COST_DIMENSION * getAggregate().getGroupSet().size(); cost += CostEstimates.COST_AGGREGATION * getAggregate().getAggCallList().size(); } @@ -441,14 +434,27 @@ public class PartialDruidQuery } } + // Account for the cost of post-aggregation expressions. if (getAggregateProject() != null) { - cost += CostEstimates.COST_EXPRESSION * getAggregateProject().getChildExps().size(); + for (final RexNode rexNode : getAggregateProject().getChildExps()) { + if (!rexNode.isA(SqlKind.INPUT_REF)) { + cost += CostEstimates.COST_EXPRESSION; + } + } } + // Account for the cost of post-sort expressions. if (getSortProject() != null) { - cost += CostEstimates.COST_EXPRESSION * getSortProject().getChildExps().size(); + for (final RexNode rexNode : getSortProject().getChildExps()) { + if (!rexNode.isA(SqlKind.INPUT_REF)) { + cost += CostEstimates.COST_EXPRESSION; + } + } } + // Account for the cost of generating outputs. + cost += CostEstimates.COST_OUTPUT_COLUMN * getRowType().getFieldCount(); + return cost; } diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/rule/DruidJoinRule.java b/sql/src/main/java/org/apache/druid/sql/calcite/rule/DruidJoinRule.java index b692a986fb0..a8843eefacf 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/rule/DruidJoinRule.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/rule/DruidJoinRule.java @@ -112,7 +112,11 @@ public class DruidJoinRule extends RelOptRule // Already verified to be present in "matches", so just call "get". // Can't be final, because we're going to reassign it up to a couple of times. - ConditionAnalysis conditionAnalysis = analyzeCondition(join.getCondition(), join.getLeft().getRowType(), right).get(); + ConditionAnalysis conditionAnalysis = analyzeCondition( + join.getCondition(), + join.getLeft().getRowType(), + right + ).get(); final boolean isLeftDirectAccessPossible = enableLeftScanDirect && (left instanceof DruidQueryRel); if (left.getPartialDruidQuery().stage() == PartialDruidQuery.Stage.SELECT_PROJECT @@ -147,10 +151,14 @@ public class DruidJoinRule extends RelOptRule final Project rightProject = right.getPartialDruidQuery().getSelectProject(); // Right-side projection expressions rewritten to be on top of the join. - Iterables.addAll( - newProjectExprs, - RexUtil.shift(rightProject.getProjects(), newLeft.getRowType().getFieldCount()) - ); + for (final RexNode rexNode : RexUtil.shift(rightProject.getProjects(), newLeft.getRowType().getFieldCount())) { + if (join.getJoinType().generatesNullsOnRight()) { + newProjectExprs.add(makeNullableIfLiteral(rexNode, rexBuilder)); + } else { + newProjectExprs.add(rexNode); + } + } + newRight = right.withPartialQuery(PartialDruidQuery.create(rightScan)); conditionAnalysis = conditionAnalysis.pushThroughRightProject(rightProject); } else { @@ -195,6 +203,19 @@ public class DruidJoinRule extends RelOptRule call.transformTo(relBuilder.build()); } + private static RexNode makeNullableIfLiteral(final RexNode rexNode, final RexBuilder rexBuilder) + { + if (rexNode.isA(SqlKind.LITERAL)) { + return rexBuilder.makeLiteral( + RexLiteral.value(rexNode), + rexBuilder.getTypeFactory().createTypeWithNullability(rexNode.getType(), true), + false + ); + } else { + return rexNode; + } + } + /** * Returns whether {@link #analyzeCondition} would return something. */ @@ -208,7 +229,11 @@ public class DruidJoinRule extends RelOptRule * If this condition is an AND of some combination of (1) literals; (2) equality conditions of the form * {@code f(LeftRel) = RightColumn}, then return a {@link ConditionAnalysis}. */ - private static Optional analyzeCondition(final RexNode condition, final RelDataType leftRowType, DruidRel right) + private static Optional analyzeCondition( + final RexNode condition, + final RelDataType leftRowType, + DruidRel right + ) { final List subConditions = decomposeAnd(condition); final List> equalitySubConditions = new ArrayList<>(); diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/rule/DruidRules.java b/sql/src/main/java/org/apache/druid/sql/calcite/rule/DruidRules.java index b3931f8b78f..6ba4dc75b2f 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/rule/DruidRules.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/rule/DruidRules.java @@ -19,7 +19,6 @@ package org.apache.druid.sql.calcite.rule; -import com.google.common.base.Predicate; import com.google.common.collect.ImmutableList; import org.apache.calcite.plan.RelOptRule; import org.apache.calcite.plan.RelOptRuleCall; @@ -38,9 +37,11 @@ import org.apache.druid.sql.calcite.rel.PartialDruidQuery; import java.util.List; import java.util.function.BiFunction; +import java.util.function.Predicate; public class DruidRules { + @SuppressWarnings("rawtypes") public static final Predicate CAN_BUILD_ON = druidRel -> druidRel.getPartialDruidQuery() != null; private DruidRules() @@ -88,10 +89,9 @@ public class DruidRules PartialDruidQuery::withSortProject ), DruidOuterQueryRule.AGGREGATE, - DruidOuterQueryRule.FILTER_AGGREGATE, - DruidOuterQueryRule.FILTER_PROJECT_AGGREGATE, - DruidOuterQueryRule.PROJECT_AGGREGATE, - DruidOuterQueryRule.AGGREGATE_SORT_PROJECT, + DruidOuterQueryRule.WHERE_FILTER, + DruidOuterQueryRule.SELECT_PROJECT, + DruidOuterQueryRule.SORT, DruidUnionRule.instance(), DruidUnionDataSourceRule.instance(), DruidSortUnionRule.instance(), @@ -111,7 +111,7 @@ public class DruidRules ) { super( - operand(relClass, operand(DruidRel.class, null, CAN_BUILD_ON, any())), + operand(relClass, operandJ(DruidRel.class, null, CAN_BUILD_ON, any())), StringUtils.format("%s(%s)", DruidQueryRule.class.getSimpleName(), stage) ); this.stage = stage; @@ -143,7 +143,7 @@ public class DruidRules public abstract static class DruidOuterQueryRule extends RelOptRule { public static final RelOptRule AGGREGATE = new DruidOuterQueryRule( - operand(Aggregate.class, operand(DruidRel.class, null, CAN_BUILD_ON, any())), + operand(Aggregate.class, operandJ(DruidRel.class, null, CAN_BUILD_ON, any())), "AGGREGATE" ) { @@ -164,23 +164,21 @@ public class DruidRules } }; - public static final RelOptRule FILTER_AGGREGATE = new DruidOuterQueryRule( - operand(Aggregate.class, operand(Filter.class, operand(DruidRel.class, null, CAN_BUILD_ON, any()))), - "FILTER_AGGREGATE" + public static final RelOptRule WHERE_FILTER = new DruidOuterQueryRule( + operand(Filter.class, operandJ(DruidRel.class, null, CAN_BUILD_ON, any())), + "WHERE_FILTER" ) { @Override public void onMatch(final RelOptRuleCall call) { - final Aggregate aggregate = call.rel(0); - final Filter filter = call.rel(1); - final DruidRel druidRel = call.rel(2); + final Filter filter = call.rel(0); + final DruidRel druidRel = call.rel(1); final DruidOuterQueryRel outerQueryRel = DruidOuterQueryRel.create( druidRel, PartialDruidQuery.create(druidRel.getPartialDruidQuery().leafRel()) .withWhereFilter(filter) - .withAggregate(aggregate) ); if (outerQueryRel.isValidDruidQuery()) { call.transformTo(outerQueryRel); @@ -188,28 +186,21 @@ public class DruidRules } }; - public static final RelOptRule FILTER_PROJECT_AGGREGATE = new DruidOuterQueryRule( - operand( - Aggregate.class, - operand(Project.class, operand(Filter.class, operand(DruidRel.class, null, CAN_BUILD_ON, any()))) - ), - "FILTER_PROJECT_AGGREGATE" + public static final RelOptRule SELECT_PROJECT = new DruidOuterQueryRule( + operand(Project.class, operandJ(DruidRel.class, null, CAN_BUILD_ON, any())), + "SELECT_PROJECT" ) { @Override public void onMatch(final RelOptRuleCall call) { - final Aggregate aggregate = call.rel(0); - final Project project = call.rel(1); - final Filter filter = call.rel(2); - final DruidRel druidRel = call.rel(3); + final Project filter = call.rel(0); + final DruidRel druidRel = call.rel(1); final DruidOuterQueryRel outerQueryRel = DruidOuterQueryRel.create( druidRel, PartialDruidQuery.create(druidRel.getPartialDruidQuery().leafRel()) - .withWhereFilter(filter) - .withSelectProject(project) - .withAggregate(aggregate) + .withSelectProject(filter) ); if (outerQueryRel.isValidDruidQuery()) { call.transformTo(outerQueryRel); @@ -217,52 +208,21 @@ public class DruidRules } }; - public static final RelOptRule PROJECT_AGGREGATE = new DruidOuterQueryRule( - operand(Aggregate.class, operand(Project.class, operand(DruidRel.class, null, CAN_BUILD_ON, any()))), - "PROJECT_AGGREGATE" + public static final RelOptRule SORT = new DruidOuterQueryRule( + operand(Sort.class, operandJ(DruidRel.class, null, CAN_BUILD_ON, any())), + "SORT" ) { @Override public void onMatch(final RelOptRuleCall call) { - final Aggregate aggregate = call.rel(0); - final Project project = call.rel(1); - final DruidRel druidRel = call.rel(2); + final Sort sort = call.rel(0); + final DruidRel druidRel = call.rel(1); final DruidOuterQueryRel outerQueryRel = DruidOuterQueryRel.create( druidRel, PartialDruidQuery.create(druidRel.getPartialDruidQuery().leafRel()) - .withSelectProject(project) - .withAggregate(aggregate) - ); - if (outerQueryRel.isValidDruidQuery()) { - call.transformTo(outerQueryRel); - } - } - }; - - public static final RelOptRule AGGREGATE_SORT_PROJECT = new DruidOuterQueryRule( - operand( - Project.class, - operand(Sort.class, operand(Aggregate.class, operand(DruidRel.class, null, CAN_BUILD_ON, any()))) - ), - "AGGREGATE_SORT_PROJECT" - ) - { - @Override - public void onMatch(RelOptRuleCall call) - { - final Project sortProject = call.rel(0); - final Sort sort = call.rel(1); - final Aggregate aggregate = call.rel(2); - final DruidRel druidRel = call.rel(3); - - final DruidOuterQueryRel outerQueryRel = DruidOuterQueryRel.create( - druidRel, - PartialDruidQuery.create(druidRel.getPartialDruidQuery().leafRel()) - .withAggregate(aggregate) .withSort(sort) - .withSortProject(sortProject) ); if (outerQueryRel.isValidDruidQuery()) { call.transformTo(outerQueryRel); diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteCorrelatedQueryTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteCorrelatedQueryTest.java index 2ce8842242d..a933dcd2059 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteCorrelatedQueryTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteCorrelatedQueryTest.java @@ -131,7 +131,12 @@ public class CalciteCorrelatedQueryTest extends BaseCalciteQueryTest .setDimensions(new DefaultDimensionSpec("d1", "_d0")) .setAggregatorSpecs( new LongSumAggregatorFactory("_a0:sum", "a0"), - new CountAggregatorFactory("_a0:count") + useDefault + ? new CountAggregatorFactory("_a0:count") + : new FilteredAggregatorFactory( + new CountAggregatorFactory("_a0:count"), + not(selector("a0", null, null)) + ) ) .setPostAggregatorSpecs(Collections.singletonList(new ArithmeticPostAggregator( "_a0", diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java index 17724c44070..91d02f8720b 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java @@ -437,7 +437,7 @@ public class CalciteQueryTest extends BaseCalciteQueryTest ) .intervals(querySegmentSpec(Filtration.eternity())) .limit(10) - .columns("dim2", "j0.m1", "m1", "m2") + .columns("dim2", "m2") .context(QUERY_CONTEXT_DEFAULT) .build() ) @@ -2161,6 +2161,42 @@ public class CalciteQueryTest extends BaseCalciteQueryTest ); } + @Test + public void testOrderThenLimitThenFilter() throws Exception + { + testQuery( + "SELECT dim1 FROM " + + "(SELECT __time, dim1 FROM druid.foo ORDER BY __time DESC LIMIT 4) " + + "WHERE dim1 IN ('abc', 'def')", + ImmutableList.of( + newScanQueryBuilder() + .dataSource( + new QueryDataSource( + newScanQueryBuilder() + .dataSource(CalciteTests.DATASOURCE1) + .intervals(querySegmentSpec(Filtration.eternity())) + .columns(ImmutableList.of("__time", "dim1")) + .limit(4) + .order(ScanQuery.Order.DESCENDING) + .resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST) + .context(QUERY_CONTEXT_DEFAULT) + .build() + ) + ) + .intervals(querySegmentSpec(Filtration.eternity())) + .columns(ImmutableList.of("dim1")) + .filters(in("dim1", Arrays.asList("abc", "def"), null)) + .resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST) + .context(QUERY_CONTEXT_DEFAULT) + .build() + ), + ImmutableList.of( + new Object[]{"abc"}, + new Object[]{"def"} + ) + ); + } + @Test public void testGroupBySingleColumnDescendingNoTopN() throws Exception { @@ -7966,8 +8002,8 @@ public class CalciteQueryTest extends BaseCalciteQueryTest .setInterval(querySegmentSpec(Filtration.eternity())) .setGranularity(Granularities.ALL) .setDimensions(dimensions( - new DefaultDimensionSpec("dim2", "d0"), - new DefaultDimensionSpec("dim1", "d1") + new DefaultDimensionSpec("dim1", "d0"), + new DefaultDimensionSpec("dim2", "d1") )) .setAggregatorSpecs(aggregators(new LongSumAggregatorFactory("a0", "cnt"))) .setContext(QUERY_CONTEXT_DEFAULT) @@ -7976,12 +8012,12 @@ public class CalciteQueryTest extends BaseCalciteQueryTest ) .setInterval(querySegmentSpec(Filtration.eternity())) .setGranularity(Granularities.ALL) - .setDimensions(dimensions(new DefaultDimensionSpec("d0", "_d0"))) + .setDimensions(dimensions(new DefaultDimensionSpec("d1", "_d0"))) .setAggregatorSpecs(aggregators( new LongSumAggregatorFactory("_a0", "a0"), new FilteredAggregatorFactory( new CountAggregatorFactory("_a1"), - not(selector("d1", null, null)) + not(selector("d0", null, null)) ) )) .setContext(QUERY_CONTEXT_DEFAULT) @@ -8212,8 +8248,8 @@ public class CalciteQueryTest extends BaseCalciteQueryTest .setInterval(querySegmentSpec(Filtration.eternity())) .setGranularity(Granularities.ALL) .setDimensions(dimensions( - new DefaultDimensionSpec("m2", "d0", ColumnType.DOUBLE), - new DefaultDimensionSpec("dim1", "d1") + new DefaultDimensionSpec("dim1", "d0"), + new DefaultDimensionSpec("m2", "d1", ColumnType.DOUBLE) )) .setDimFilter(new SelectorDimFilter("m1", "5.0", null)) .setAggregatorSpecs(aggregators(new LongMaxAggregatorFactory("a0", "__time"))) @@ -8231,7 +8267,7 @@ public class CalciteQueryTest extends BaseCalciteQueryTest ) .setDimensions(dimensions( new DefaultDimensionSpec("v0", "_d0", ColumnType.LONG), - new DefaultDimensionSpec("d1", "_d1", ColumnType.STRING) + new DefaultDimensionSpec("d0", "_d1", ColumnType.STRING) )) .setAggregatorSpecs( aggregators( @@ -8239,7 +8275,7 @@ public class CalciteQueryTest extends BaseCalciteQueryTest ? new CountAggregatorFactory("_a0") : new FilteredAggregatorFactory( new CountAggregatorFactory("_a0"), - not(selector("d0", null, null)) + not(selector("d1", null, null)) ) ) ) @@ -8513,10 +8549,9 @@ public class CalciteQueryTest extends BaseCalciteQueryTest new FieldAccessPostAggregator(null, "_a2:count") ) ), - expressionPostAgg("s0", "timestamp_extract(\"_a3\",'EPOCH','UTC')") + expressionPostAgg("p0", "timestamp_extract(\"_a3\",'EPOCH','UTC')") ) ) - .setLimit(1) .setContext(QUERY_CONTEXT_DEFAULT) .build() ), @@ -9478,8 +9513,8 @@ public class CalciteQueryTest extends BaseCalciteQueryTest .setGranularity(Granularities.ALL) .setDimensions( dimensions( - new DefaultDimensionSpec("dim2", "d0"), - new DefaultDimensionSpec("dim1", "d1") + new DefaultDimensionSpec("dim1", "d0"), + new DefaultDimensionSpec("dim2", "d1") ) ) .setAggregatorSpecs( @@ -9490,7 +9525,7 @@ public class CalciteQueryTest extends BaseCalciteQueryTest .setLimitSpec( new DefaultLimitSpec( ImmutableList.of( - new OrderByColumnSpec("d1", OrderByColumnSpec.Direction.ASCENDING) + new OrderByColumnSpec("d0", OrderByColumnSpec.Direction.ASCENDING) ), 4 ) @@ -15352,7 +15387,8 @@ public class CalciteQueryTest extends BaseCalciteQueryTest newScanQueryBuilder() .dataSource(CalciteTests.DATASOURCE1) .intervals(querySegmentSpec(Filtration.eternity())) - .columns("__time", "cnt", "dim1", "dim2", "dim3", "m1", "m2", "unique_dim1") + .virtualColumns(expressionVirtualColumn("v0", "0", ColumnType.LONG)) + .columns("v0") .limit(10) .context(QUERY_CONTEXT_DEFAULT) .build() @@ -15593,27 +15629,40 @@ public class CalciteQueryTest extends BaseCalciteQueryTest .setDataSource(CalciteTests.DATASOURCE1) .setInterval(querySegmentSpec(Filtration.eternity())) .setGranularity(Granularities.ALL) - .setDimensions(dimensions( - new DefaultDimensionSpec("__time", "d0", ColumnType.LONG), - new DefaultDimensionSpec("m2", "d1", ColumnType.DOUBLE), - new DefaultDimensionSpec("dim1", "d2") - )) + .setDimensions( + useDefault + ? dimensions( + new DefaultDimensionSpec("__time", "d0", ColumnType.LONG), + new DefaultDimensionSpec("m2", "d1", ColumnType.DOUBLE), + new DefaultDimensionSpec("dim1", "d2") + ) : dimensions( + new DefaultDimensionSpec("__time", "d0", ColumnType.LONG), + new DefaultDimensionSpec("dim1", "d1"), + new DefaultDimensionSpec("m2", "d2", ColumnType.DOUBLE) + ) + ) .setContext(QUERY_CONTEXT_DEFAULT) .build() ) .setInterval(querySegmentSpec(Filtration.eternity())) .setGranularity(Granularities.ALL) - .setDimensions(dimensions( - new DefaultDimensionSpec("d0", "_d0", ColumnType.LONG), - new DefaultDimensionSpec("d2", "_d1", ColumnType.STRING) - )) + .setDimensions( + useDefault + ? dimensions( + new DefaultDimensionSpec("d0", "_d0", ColumnType.LONG), + new DefaultDimensionSpec("d2", "_d1", ColumnType.STRING) + ) : dimensions( + new DefaultDimensionSpec("d0", "_d0", ColumnType.LONG), + new DefaultDimensionSpec("d1", "_d1", ColumnType.STRING) + ) + ) .setAggregatorSpecs( aggregators( useDefault ? new CountAggregatorFactory("a0") : new FilteredAggregatorFactory( new CountAggregatorFactory("a0"), - not(selector("d1", null, null)) + not(selector("d2", null, null)) ) ) ) @@ -17621,7 +17670,7 @@ public class CalciteQueryTest extends BaseCalciteQueryTest if (useDefault) { rightTable = InlineDataSource.fromIterable( ImmutableList.of(), - RowSignature.builder().add("dim2", ColumnType.STRING).build() + RowSignature.builder().add("dim2", ColumnType.STRING).add("m2", ColumnType.DOUBLE).build() ); } else { rightTable = new QueryDataSource(