From 0a56c87e93898bde2d0fb4ffafdbd793834632da Mon Sep 17 00:00:00 2001 From: Abhishek Agarwal <1477457+abhishekagarwal87@users.noreply.github.com> Date: Wed, 29 Nov 2023 13:46:11 +0530 Subject: [PATCH] SQL: Plan non-equijoin conditions as cross join followed by filter (#15302) This PR revives #14978 with a few more bells and whistles. Instead of an unconditional cross-join, we will now split the join condition such that some conditions are now evaluated post-join. To decide what sub-condition goes where, I have refactored DruidJoinRule class to extract unsupported sub-conditions. We build a postJoinFilter out of these unsupported sub-conditions and push to the join. --- docs/querying/datasource.md | 38 +- .../apache/druid/msq/exec/MSQSelectTest.java | 2 +- .../msq/test/CalciteSelectQueryMSQTest.java | 9 +- .../druid/sql/calcite/rule/DruidJoinRule.java | 389 ++++++++++-------- .../sql/calcite/CalciteJoinQueryTest.java | 135 +++++- .../druid/sql/calcite/CalciteQueryTest.java | 27 +- .../druid/sql/calcite/NotYetSupported.java | 22 +- .../sql/calcite/rule/DruidJoinRuleTest.java | 106 ++++- website/.spelling | 2 + 9 files changed, 508 insertions(+), 222 deletions(-) diff --git a/docs/querying/datasource.md b/docs/querying/datasource.md index 5853f1e6dfc..5a1118deff7 100644 --- a/docs/querying/datasource.md +++ b/docs/querying/datasource.md @@ -320,12 +320,16 @@ Join datasources allow you to do a SQL-style join of two datasources. Stacking j you to join arbitrarily many datasources. In Druid {{DRUIDVERSION}}, joins in native queries are implemented with a broadcast hash-join algorithm. This means -that all datasources other than the leftmost "base" datasource must fit in memory. It also means that the join condition -must be an equality. This feature is intended mainly to allow joining regular Druid tables with [lookup](#lookup), -[inline](#inline), and [query](#query) datasources. +that all datasources other than the leftmost "base" datasource must fit in memory. In native queries, the join condition +must be an equality. In SQL, any join condition is accepted, but only equalities of a certain form +(see [Joins in SQL](#joins-in-sql)) execute efficiently as part of a native join. For other kinds of conditions, planner will try +to re-arrange condition such that some of the sub-conditions are evaluated as a filter on top of join and other +sub-conditions are left out in the join condition. In worst case scenario, SQL will execute the join condition as a +cross join (cartesian product) plus a filter. -Refer to the [Query execution](query-execution.md#join) page for more details on how queries are executed when you -use join datasources. +This feature is intended mainly to allow joining regular Druid tables with [lookup](#lookup), [inline](#inline), and +[query](#query) datasources. Refer to the [Query execution](query-execution.md#join) page for more details on how +queries are executed when you use join datasources. #### Joins in SQL @@ -335,21 +339,23 @@ SQL joins take the form: [ INNER | LEFT [OUTER] ] JOIN ON ``` -The condition must involve only equalities, but functions are okay, and there can be multiple equalities ANDed together. -Conditions like `t1.x = t2.x`, or `LOWER(t1.x) = t2.x`, or `t1.x = t2.x AND t1.y = t2.y` can all be handled. Conditions -like `t1.x <> t2.x` cannot currently be handled. +Any condition is accepted, but only certain kinds of conditions execute efficiently +as a native join. The condition must be a single clause like the following, or an `AND` of clauses involving at +least one of the following: -Note that Druid SQL is less rigid than what native join datasources can handle. In cases where a SQL query does -something that is not allowed as-is with a native join datasource, Druid SQL will generate a subquery. This can have -a substantial effect on performance and scalability, so it is something to watch out for. Some examples of when the -SQL layer will generate subqueries include: +- Equality between fields of the same type on each side, like `t1 JOIN t2 ON t1.x = t2.x`. +- Equality between a function call on one side, and a field on the other side, like `t1 JOIN t2 ON LOWER(t1.x) = t2.x`. +- The equality operator may be `=` (which does not match nulls) or `IS NOT DISTINCT FROM` (which does match nulls). -- Joining a regular Druid table to itself, or to another regular Druid table. The native join datasource can accept -a table on the left-hand side, but not the right, so a subquery is needed. +In other cases, Druid will either insert a subquery below the join, or will use a cross join (cartesian product) +followed by a filter. Joins executed in these ways may run into resource or performance constraints. To determine +if your query is using one of these execution paths, run `EXPLAIN PLAN FOR ` and look for the following: -- Join conditions where the expressions on either side are of different types. +- `query` type datasources under the `left` or `right` key of your `join` datasource. +- `join` type datasource with `condition` set to `"1"` (cartesian product) followed by a `filter` that encodes the + condition you provided. -- Join conditions where the right-hand expression is not a direct column access. +In these cases, you may be able to improve the performance of your query by rewriting it. For more information about how Druid translates SQL to native queries, refer to the [Druid SQL](sql-translation.md) documentation. diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQSelectTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQSelectTest.java index e1c671a8665..c0dfe0f77f3 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQSelectTest.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQSelectTest.java @@ -772,7 +772,7 @@ public class MSQSelectTest extends MSQTestBase DruidExpression.ofColumn(ColumnType.STRING, "dim2"), DruidExpression.ofColumn(ColumnType.STRING, "j0.k") ), - JoinType.LEFT + NullHandling.sqlCompatible() ? JoinType.INNER : JoinType.LEFT ) ) .setInterval(querySegmentSpec(Filtration.eternity())) diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/CalciteSelectQueryMSQTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/CalciteSelectQueryMSQTest.java index c83ec91f454..974eed48734 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/CalciteSelectQueryMSQTest.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/CalciteSelectQueryMSQTest.java @@ -145,7 +145,14 @@ public class CalciteSelectQueryMSQTest extends CalciteQueryTest @Ignore @Override - public void testUnplannableQueries() + public void testUnplannableScanOrderByNonTime() + { + + } + + @Ignore + @Override + public void testUnplannableJoinQueriesInNonSQLCompatibleMode() { } diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/rule/DruidJoinRule.java b/sql/src/main/java/org/apache/druid/sql/calcite/rule/DruidJoinRule.java index 6dc8ff00531..35e3e6eca80 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/rule/DruidJoinRule.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/rule/DruidJoinRule.java @@ -28,8 +28,10 @@ import org.apache.calcite.plan.RelOptUtil; import org.apache.calcite.rel.RelNode; import org.apache.calcite.rel.core.Filter; import org.apache.calcite.rel.core.Join; +import org.apache.calcite.rel.core.JoinRelType; import org.apache.calcite.rel.core.Project; import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rel.type.RelDataTypeField; import org.apache.calcite.rex.RexBuilder; import org.apache.calcite.rex.RexCall; import org.apache.calcite.rex.RexInputRef; @@ -43,6 +45,7 @@ import org.apache.calcite.sql.fun.SqlStdOperatorTable; import org.apache.calcite.sql.type.SqlTypeName; import org.apache.calcite.tools.RelBuilder; import org.apache.calcite.util.ImmutableBitSet; +import org.apache.druid.common.config.NullHandling; import org.apache.druid.error.DruidException; import org.apache.druid.query.LookupDataSource; import org.apache.druid.sql.calcite.planner.PlannerContext; @@ -54,7 +57,6 @@ import org.apache.druid.sql.calcite.rel.PartialDruidQuery; import java.util.ArrayList; import java.util.HashSet; import java.util.List; -import java.util.Optional; import java.util.Set; import java.util.Stack; import java.util.stream.Collectors; @@ -82,7 +84,7 @@ public class DruidJoinRule extends RelOptRule { return new DruidJoinRule(plannerContext); } - + @Override public boolean matches(RelOptRuleCall call) { @@ -93,9 +95,14 @@ public class DruidJoinRule extends RelOptRule // 1) Can handle the join condition as a native join. // 2) Left has a PartialDruidQuery (i.e., is a real query, not top-level UNION ALL). // 3) Right has a PartialDruidQuery (i.e., is a real query, not top-level UNION ALL). - return canHandleCondition(join.getCondition(), join.getLeft().getRowType(), right, join.getCluster().getRexBuilder()) - && left.getPartialDruidQuery() != null - && right.getPartialDruidQuery() != null; + return canHandleCondition( + join.getCondition(), + join.getLeft().getRowType(), + right, + join.getJoinType(), + join.getSystemFieldList(), + join.getCluster().getRexBuilder() + ) && left.getPartialDruidQuery() != null && right.getPartialDruidQuery() != null; } @Override @@ -112,14 +119,12 @@ public class DruidJoinRule extends RelOptRule final Filter leftFilter; final List newProjectExprs = new ArrayList<>(); - // Already verified to be present in "matches", so just call "get". // Can't be final, because we're going to reassign it up to a couple of times. ConditionAnalysis conditionAnalysis = analyzeCondition( join.getCondition(), join.getLeft().getRowType(), - right, rexBuilder - ).get(); + ); final boolean isLeftDirectAccessPossible = enableLeftScanDirect && (left instanceof DruidQueryRel); if (!plannerContext.getJoinAlgorithm().requiresSubquery() @@ -184,7 +189,7 @@ public class DruidJoinRule extends RelOptRule final DruidJoinQueryRel druidJoin = DruidJoinQueryRel.create( join.copy( join.getTraitSet(), - conditionAnalysis.getCondition(rexBuilder), + conditionAnalysis.getConditionWithUnsupportedSubConditionsIgnored(rexBuilder), newLeft, newRight, join.getJoinType(), @@ -194,7 +199,7 @@ public class DruidJoinRule extends RelOptRule left.getPlannerContext() ); - final RelBuilder relBuilder = + RelBuilder relBuilder = call.builder() .push(druidJoin) .project( @@ -205,6 +210,12 @@ public class DruidJoinRule extends RelOptRule ) ); + // Build a post-join filter with whatever join sub-conditions were not supported. + RexNode postJoinFilter = RexUtil.composeConjunction(rexBuilder, conditionAnalysis.getUnsupportedOnSubConditions(), true); + if (postJoinFilter != null) { + relBuilder = relBuilder.filter(postJoinFilter); + } + call.transformTo(relBuilder.build()); } @@ -222,30 +233,214 @@ public class DruidJoinRule extends RelOptRule } /** - * Returns whether {@link #analyzeCondition} would return something. + * Returns whether we can handle the join condition. In case, some conditions in an AND expression are not supported, + * they are extracted into a post-join filter instead. */ @VisibleForTesting - boolean canHandleCondition(final RexNode condition, final RelDataType leftRowType, DruidRel right, final RexBuilder rexBuilder) + public boolean canHandleCondition( + final RexNode condition, + final RelDataType leftRowType, + DruidRel right, + JoinRelType joinType, + List systemFieldList, + final RexBuilder rexBuilder + ) { - return analyzeCondition(condition, leftRowType, right, rexBuilder).isPresent(); + ConditionAnalysis conditionAnalysis = analyzeCondition(condition, leftRowType, rexBuilder); + // if the right side requires a subquery, then even lookup will be transformed to a QueryDataSource + // thereby allowing join conditions on both k and v columns of the lookup + if (right != null + && !DruidJoinQueryRel.computeRightRequiresSubquery(plannerContext, DruidJoinQueryRel.getSomeDruidChild(right)) + && right instanceof DruidQueryRel) { + DruidQueryRel druidQueryRel = (DruidQueryRel) right; + if (druidQueryRel.getDruidTable().getDataSource() instanceof LookupDataSource) { + long distinctRightColumns = conditionAnalysis.rightColumns.stream().map(RexSlot::getIndex).distinct().count(); + if (distinctRightColumns > 1) { + // it means that the join's right side is lookup and the join condition contains both key and value columns of lookup. + // currently, the lookup datasource in the native engine doesn't support using value column in the join condition. + plannerContext.setPlanningError( + "SQL is resulting in a join involving lookup where value column is used in the condition."); + return false; + } + } + } + + if (joinType != JoinRelType.INNER || !systemFieldList.isEmpty() || NullHandling.replaceWithDefault()) { + // I am not sure in what case, the list of system fields will be not empty. I have just picked up this logic + // directly from https://github.com/apache/calcite/blob/calcite-1.35.0/core/src/main/java/org/apache/calcite/rel/rules/AbstractJoinExtractFilterRule.java#L58 + + // Also to avoid results changes for existing queries in non-null handling mode, we don't handle unsupported + // conditions. Otherwise, some left/right joins with a condition that doesn't allow nulls on join input will + // be converted to inner joins. See Test CalciteJoinQueryTest#testFilterAndGroupByLookupUsingJoinOperatorBackwards + // for an example. + return conditionAnalysis.getUnsupportedOnSubConditions().isEmpty(); + } + + return true; + } + + public static class ConditionAnalysis + { + /** + * Number of fields on the left-hand side. Useful for identifying if a particular field is from on the left + * or right side of a join. + */ + private final int numLeftFields; + + /** + * Each equality subcondition is an equality of the form f(LeftRel) = g(RightRel). + */ + private final List equalitySubConditions; + + /** + * Each literal subcondition is... a literal. + */ + private final List literalSubConditions; + + /** + * Sub-conditions in join clause that cannot be handled by the DruidJoinRule. + */ + private final List unsupportedOnSubConditions; + + private final Set rightColumns; + + ConditionAnalysis( + int numLeftFields, + List equalitySubConditions, + List literalSubConditions, + List unsupportedOnSubConditions, + Set rightColumns + ) + { + this.numLeftFields = numLeftFields; + this.equalitySubConditions = equalitySubConditions; + this.literalSubConditions = literalSubConditions; + this.unsupportedOnSubConditions = unsupportedOnSubConditions; + this.rightColumns = rightColumns; + } + + public ConditionAnalysis pushThroughLeftProject(final Project leftProject) + { + // Pushing through the project will shift right-hand field references by this amount. + final int rhsShift = + leftProject.getInput().getRowType().getFieldCount() - leftProject.getRowType().getFieldCount(); + + // We leave unsupportedSubConditions un-touched as they are evaluated above join anyway. + return new ConditionAnalysis( + leftProject.getInput().getRowType().getFieldCount(), + equalitySubConditions + .stream() + .map( + equality -> new RexEquality( + RelOptUtil.pushPastProject(equality.left, leftProject), + (RexInputRef) RexUtil.shift(equality.right, rhsShift), + equality.kind + ) + ) + .collect(Collectors.toList()), + literalSubConditions, + unsupportedOnSubConditions, + rightColumns + ); + } + + public ConditionAnalysis pushThroughRightProject(final Project rightProject) + { + Preconditions.checkArgument(onlyUsesMappingsFromRightProject(rightProject), "Cannot push through"); + + // We leave unsupportedSubConditions un-touched as they are evaluated above join anyway. + return new ConditionAnalysis( + numLeftFields, + equalitySubConditions + .stream() + .map( + equality -> new RexEquality( + equality.left, + (RexInputRef) RexUtil.shift( + RelOptUtil.pushPastProject( + RexUtil.shift(equality.right, -numLeftFields), + rightProject + ), + numLeftFields + ), + equality.kind + ) + ) + .collect(Collectors.toList()), + literalSubConditions, + unsupportedOnSubConditions, + rightColumns + ); + } + + public boolean onlyUsesMappingsFromRightProject(final Project rightProject) + { + for (final RexEquality equality : equalitySubConditions) { + final int rightIndex = equality.right.getIndex() - numLeftFields; + + if (!rightProject.getProjects().get(rightIndex).isA(SqlKind.INPUT_REF)) { + return false; + } + } + + return true; + } + + public RexNode getConditionWithUnsupportedSubConditionsIgnored(final RexBuilder rexBuilder) + { + return RexUtil.composeConjunction( + rexBuilder, + Iterables.concat( + literalSubConditions, + equalitySubConditions + .stream() + .map(equality -> equality.makeCall(rexBuilder)) + .collect(Collectors.toList()) + ), + false + ); + } + + public List getUnsupportedOnSubConditions() + { + return unsupportedOnSubConditions; + } + + @Override + public String toString() + { + return "ConditionAnalysis{" + + "numLeftFields=" + numLeftFields + + ", equalitySubConditions=" + equalitySubConditions + + ", literalSubConditions=" + literalSubConditions + + ", unsupportedSubConditions=" + unsupportedOnSubConditions + + ", rightColumns=" + rightColumns + + '}'; + } } /** - * If this condition is an AND of some combination of (1) literals; (2) equality conditions of the form + * If this condition is an AND of some combination of + * (1) literals; + * (2) equality conditions of the form + * (3) unsupported conditions + *

+ * Returns empty if the join cannot be supported at all. It can return non-empty with some unsupported conditions + * that can be extracted into post join filter. * {@code f(LeftRel) = RightColumn}, then return a {@link ConditionAnalysis}. */ - private Optional analyzeCondition( + public ConditionAnalysis analyzeCondition( final RexNode condition, final RelDataType leftRowType, - final DruidRel right, final RexBuilder rexBuilder ) { final List subConditions = decomposeAnd(condition); final List equalitySubConditions = new ArrayList<>(); final List literalSubConditions = new ArrayList<>(); - final int numLeftFields = leftRowType.getFieldCount(); + final List unSupportedSubConditions = new ArrayList<>(); final Set rightColumns = new HashSet<>(); + final int numLeftFields = leftRowType.getFieldCount(); for (RexNode subCondition : subConditions) { if (RexUtil.isLiteral(subCondition, true)) { @@ -259,8 +454,9 @@ public class DruidJoinRule extends RelOptRule // If the types are the same, unwrap the cast and use the underlying literal. literalSubConditions.add((RexLiteral) call.getOperands().get(0)); } else { - // If the types are not the same, return Optional.empty() indicating the condition is not supported. - return Optional.empty(); + // If the types are not the same, add to unsupported conditions. + unSupportedSubConditions.add(subCondition); + continue; } } else { // Literals are always OK. @@ -284,7 +480,8 @@ public class DruidJoinRule extends RelOptRule subCondition.getKind(), secondOperand.getType().getSqlTypeName() ); - return Optional.empty(); + unSupportedSubConditions.add(subCondition); + continue; } } else if (subCondition.isA(SqlKind.EQUALS) || subCondition.isA(SqlKind.IS_NOT_DISTINCT_FROM)) { @@ -299,7 +496,8 @@ public class DruidJoinRule extends RelOptRule "SQL requires a join with '%s' condition that is not supported.", subCondition.getKind() ); - return Optional.empty(); + unSupportedSubConditions.add(subCondition); + continue; } if (isLeftExpression(firstOperand, numLeftFields) && isRightInputRef(secondOperand, numLeftFields)) { @@ -312,34 +510,16 @@ public class DruidJoinRule extends RelOptRule } else { // Cannot handle this condition. plannerContext.setPlanningError("SQL is resulting in a join that has unsupported operand types."); - return Optional.empty(); + unSupportedSubConditions.add(subCondition); } } - // if the right side requires a subquery, then even lookup will be transformed to a QueryDataSource - // thereby allowing join conditions on both k and v columns of the lookup - if (right != null - && !DruidJoinQueryRel.computeRightRequiresSubquery(plannerContext, DruidJoinQueryRel.getSomeDruidChild(right)) - && right instanceof DruidQueryRel) { - DruidQueryRel druidQueryRel = (DruidQueryRel) right; - if (druidQueryRel.getDruidTable().getDataSource() instanceof LookupDataSource) { - long distinctRightColumns = rightColumns.stream().map(RexSlot::getIndex).distinct().count(); - if (distinctRightColumns > 1) { - // it means that the join's right side is lookup and the join condition contains both key and value columns of lookup. - // currently, the lookup datasource in the native engine doesn't support using value column in the join condition. - plannerContext.setPlanningError( - "SQL is resulting in a join involving lookup where value column is used in the condition."); - return Optional.empty(); - } - } - } - - return Optional.of( - new ConditionAnalysis( - numLeftFields, - equalitySubConditions, - literalSubConditions - ) + return new ConditionAnalysis( + numLeftFields, + equalitySubConditions, + literalSubConditions, + unSupportedSubConditions, + rightColumns ); } @@ -369,7 +549,7 @@ public class DruidJoinRule extends RelOptRule return retVal; } - private boolean isLeftExpression(final RexNode rexNode, final int numLeftFields) + private static boolean isLeftExpression(final RexNode rexNode, final int numLeftFields) { return ImmutableBitSet.range(numLeftFields).contains(RelOptUtil.InputFinder.bits(rexNode)); } @@ -379,121 +559,6 @@ public class DruidJoinRule extends RelOptRule return rexNode.isA(SqlKind.INPUT_REF) && ((RexInputRef) rexNode).getIndex() >= numLeftFields; } - static class ConditionAnalysis - { - /** - * Number of fields on the left-hand side. Useful for identifying if a particular field is from on the left - * or right side of a join. - */ - private final int numLeftFields; - - /** - * Each equality subcondition is an equality of the form f(LeftRel) = g(RightRel). - */ - private final List equalitySubConditions; - - /** - * Each literal subcondition is... a literal. - */ - private final List literalSubConditions; - - ConditionAnalysis( - int numLeftFields, - List equalitySubConditions, - List literalSubConditions - ) - { - this.numLeftFields = numLeftFields; - this.equalitySubConditions = equalitySubConditions; - this.literalSubConditions = literalSubConditions; - } - - public ConditionAnalysis pushThroughLeftProject(final Project leftProject) - { - // Pushing through the project will shift right-hand field references by this amount. - final int rhsShift = - leftProject.getInput().getRowType().getFieldCount() - leftProject.getRowType().getFieldCount(); - - return new ConditionAnalysis( - leftProject.getInput().getRowType().getFieldCount(), - equalitySubConditions - .stream() - .map( - equality -> new RexEquality( - RelOptUtil.pushPastProject(equality.left, leftProject), - (RexInputRef) RexUtil.shift(equality.right, rhsShift), - equality.kind - ) - ) - .collect(Collectors.toList()), - literalSubConditions - ); - } - - public ConditionAnalysis pushThroughRightProject(final Project rightProject) - { - Preconditions.checkArgument(onlyUsesMappingsFromRightProject(rightProject), "Cannot push through"); - - return new ConditionAnalysis( - numLeftFields, - equalitySubConditions - .stream() - .map( - equality -> new RexEquality( - equality.left, - (RexInputRef) RexUtil.shift( - RelOptUtil.pushPastProject( - RexUtil.shift(equality.right, -numLeftFields), - rightProject - ), - numLeftFields - ), - equality.kind - ) - ) - .collect(Collectors.toList()), - literalSubConditions - ); - } - - public boolean onlyUsesMappingsFromRightProject(final Project rightProject) - { - for (final RexEquality equality : equalitySubConditions) { - final int rightIndex = equality.right.getIndex() - numLeftFields; - - if (!rightProject.getProjects().get(rightIndex).isA(SqlKind.INPUT_REF)) { - return false; - } - } - - return true; - } - - public RexNode getCondition(final RexBuilder rexBuilder) - { - return RexUtil.composeConjunction( - rexBuilder, - Iterables.concat( - literalSubConditions, - equalitySubConditions - .stream() - .map(equality -> equality.makeCall(rexBuilder)) - .collect(Collectors.toList()) - ), - false - ); - } - - @Override - public String toString() - { - return "ConditionAnalysis{" + - "numLeftFields=" + numLeftFields + - ", equalitySubConditions=" + equalitySubConditions + - ", literalSubConditions=" + literalSubConditions + - '}'; - } - } /** * Like {@link org.apache.druid.segment.join.Equality} but uses {@link RexNode} instead of diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteJoinQueryTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteJoinQueryTest.java index ea74678ba53..a8e16de2675 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteJoinQueryTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteJoinQueryTest.java @@ -723,7 +723,7 @@ public class CalciteJoinQueryTest extends BaseCalciteQueryTest ), "j0.", equalsCondition(makeColumnExpression("k"), makeColumnExpression("j0.dim2")), - JoinType.RIGHT + NullHandling.sqlCompatible() ? JoinType.INNER : JoinType.RIGHT ) ) .setInterval(querySegmentSpec(Filtration.eternity())) @@ -740,7 +740,6 @@ public class CalciteJoinQueryTest extends BaseCalciteQueryTest new Object[]{"xabc", 1L} ) : ImmutableList.of( - new Object[]{null, 5L}, new Object[]{"xabc", 1L} ) ); @@ -768,7 +767,7 @@ public class CalciteJoinQueryTest extends BaseCalciteQueryTest new LookupDataSource("lookyloo"), "j0.", equalsCondition(makeColumnExpression("dim2"), makeColumnExpression("j0.k")), - JoinType.LEFT + NullHandling.sqlCompatible() ? JoinType.INNER : JoinType.LEFT ) ) .setInterval(querySegmentSpec(Filtration.eternity())) @@ -784,8 +783,7 @@ public class CalciteJoinQueryTest extends BaseCalciteQueryTest new Object[]{NULL_STRING, 3L}, new Object[]{"xabc", 1L} ) - : - ImmutableList.of( + : ImmutableList.of( new Object[]{"xabc", 1L} ) ); @@ -821,7 +819,7 @@ public class CalciteJoinQueryTest extends BaseCalciteQueryTest new LookupDataSource("lookyloo"), "j0.", equalsCondition(makeColumnExpression("dim2"), makeColumnExpression("j0.k")), - JoinType.LEFT + NullHandling.sqlCompatible() ? JoinType.INNER : JoinType.LEFT ) ) .setInterval(querySegmentSpec(Filtration.eternity())) @@ -1965,17 +1963,25 @@ public class CalciteJoinQueryTest extends BaseCalciteQueryTest .build() ), "_j0.", - "1", + NullHandling.sqlCompatible() ? + equalsCondition( + DruidExpression.fromExpression("CAST(\"j0.k\", 'LONG')"), + DruidExpression.ofColumn(ColumnType.LONG, "_j0.cnt") + ) + : "1", JoinType.INNER ) ) .intervals(querySegmentSpec(Filtration.eternity())) .granularity(Granularities.ALL) .aggregators(new CountAggregatorFactory("a0")) - .filters(and( - expressionFilter("(\"cnt\" == CAST(\"j0.k\", 'LONG'))"), - expressionFilter("(CAST(\"j0.k\", 'LONG') == \"_j0.cnt\")") - )) + .filters( + NullHandling.sqlCompatible() ? + expressionFilter("(\"cnt\" == CAST(\"j0.k\", 'LONG'))") + : and( + expressionFilter("(\"cnt\" == CAST(\"j0.k\", 'LONG'))"), + expressionFilter("(CAST(\"j0.k\", 'LONG') == \"_j0.cnt\")") + )) .context(QUERY_CONTEXT_DEFAULT) .build() ), @@ -4558,6 +4564,113 @@ public class CalciteJoinQueryTest extends BaseCalciteQueryTest ); } + @Test + @Parameters(source = QueryContextForJoinProvider.class) + public void testJoinWithNonEquiCondition(Map queryContext) + { + // Native JOIN operator cannot handle the condition, so a SQL JOIN with greater-than is translated into a + // cross join with a filter. + cannotVectorize(); + + // We don't handle non-equi join conditions for non-sql compatible mode. + Assume.assumeFalse(NullHandling.replaceWithDefault()); + + testQuery( + "SELECT x.m1, y.m1 FROM foo x INNER JOIN foo y ON x.m1 > y.m1", + queryContext, + ImmutableList.of( + newScanQueryBuilder() + .dataSource( + join( + new TableDataSource(CalciteTests.DATASOURCE1), + new QueryDataSource( + newScanQueryBuilder() + .dataSource(CalciteTests.DATASOURCE1) + .intervals(querySegmentSpec(Filtration.eternity())) + .columns("m1") + .context(queryContext) + .build() + ), + "j0.", + "1", + JoinType.INNER + ) + ) + .intervals(querySegmentSpec(Filtration.eternity())) + .filters(expressionFilter("(\"m1\" > \"j0.m1\")")) + .columns("j0.m1", "m1") + .context(queryContext) + .build() + ), + sortIfSortBased( + ImmutableList.of( + new Object[]{2.0f, 1.0f}, + new Object[]{3.0f, 1.0f}, + new Object[]{3.0f, 2.0f}, + new Object[]{4.0f, 1.0f}, + new Object[]{4.0f, 2.0f}, + new Object[]{4.0f, 3.0f}, + new Object[]{5.0f, 1.0f}, + new Object[]{5.0f, 2.0f}, + new Object[]{5.0f, 3.0f}, + new Object[]{5.0f, 4.0f}, + new Object[]{6.0f, 1.0f}, + new Object[]{6.0f, 2.0f}, + new Object[]{6.0f, 3.0f}, + new Object[]{6.0f, 4.0f}, + new Object[]{6.0f, 5.0f} + ), + 1, + 0 + ) + ); + } + + @Test + @Parameters(source = QueryContextForJoinProvider.class) + public void testJoinWithEquiAndNonEquiCondition(Map queryContext) + { + // Native JOIN operator cannot handle the condition, so a SQL JOIN with greater-than is translated into a + // cross join with a filter. + cannotVectorize(); + + // We don't handle non-equi join conditions for non-sql compatible mode. + Assume.assumeFalse(NullHandling.replaceWithDefault()); + + testQuery( + "SELECT x.m1, y.m1 FROM foo x INNER JOIN foo y ON x.m1 = y.m1 AND x.m1 + y.m1 = 6.0", + queryContext, + ImmutableList.of( + newScanQueryBuilder() + .dataSource( + join( + new TableDataSource(CalciteTests.DATASOURCE1), + new QueryDataSource( + newScanQueryBuilder() + .dataSource(CalciteTests.DATASOURCE1) + .intervals(querySegmentSpec(Filtration.eternity())) + .columns("m1") + .context(queryContext) + .build() + ), + "j0.", + equalsCondition(makeColumnExpression("m1"), makeColumnExpression("j0.m1")), + JoinType.INNER + ) + ) + .virtualColumns(expressionVirtualColumn("v0", "(\"m1\" + \"j0.m1\")", ColumnType.DOUBLE)) + .intervals(querySegmentSpec(Filtration.eternity())) + .filters( + equality("v0", 6.0, ColumnType.DOUBLE) + ) + .columns("j0.m1", "m1") + .context(queryContext) + .build() + ), + ImmutableList.of(new Object[]{3.0f, 3.0f}) + ); + } + @Test @Parameters(source = QueryContextForJoinProvider.class) public void testUsingSubqueryAsPartOfAndFilter(Map queryContext) diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java index e03130a04f1..00ea933bb1e 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java @@ -126,6 +126,7 @@ import org.joda.time.DateTimeZone; import org.joda.time.Interval; import org.joda.time.Period; import org.junit.Assert; +import org.junit.Assume; import org.junit.Ignore; import org.junit.Test; import org.junit.internal.matchers.ThrowableMessageMatcher; @@ -5414,32 +5415,40 @@ public class CalciteQueryTest extends BaseCalciteQueryTest @NotYetSupported(Modes.ERROR_HANDLING) @Test - public void testUnplannableQueries() + public void testUnplannableScanOrderByNonTime() { msqIncompatible(); // All of these queries are unplannable because they rely on features Druid doesn't support. // This test is here to confirm that we don't fall back to Calcite's interpreter or enumerable implementation. // It's also here so when we do support these features, we can have "real" tests for these queries. - final Map queries = ImmutableMap.of( - // SELECT query with order by non-__time. + assertQueryIsUnplannable( "SELECT dim1 FROM druid.foo ORDER BY dim1", - "SQL query requires ordering a table by non-time column [[dim1]], which is not supported.", + "SQL query requires ordering a table by non-time column [[dim1]], which is not supported." + ); + } + @NotYetSupported(Modes.ERROR_HANDLING) + @Test + public void testUnplannableJoinQueriesInNonSQLCompatibleMode() + { + msqIncompatible(); + + Assume.assumeFalse(NullHandling.sqlCompatible()); + + assertQueryIsUnplannable( // JOIN condition with not-equals (<>). "SELECT foo.dim1, foo.dim2, l.k, l.v\n" + "FROM foo INNER JOIN lookup.lookyloo l ON foo.dim2 <> l.k", - "SQL requires a join with 'NOT_EQUALS' condition that is not supported.", + "SQL requires a join with 'NOT_EQUALS' condition that is not supported." + ); + assertQueryIsUnplannable( // JOIN condition with a function of both sides. "SELECT foo.dim1, foo.dim2, l.k, l.v\n" + "FROM foo INNER JOIN lookup.lookyloo l ON CHARACTER_LENGTH(foo.dim2 || l.k) > 3\n", "SQL requires a join with 'GREATER_THAN' condition that is not supported." ); - - for (final Map.Entry queryErrorPair : queries.entrySet()) { - assertQueryIsUnplannable(queryErrorPair.getKey(), queryErrorPair.getValue()); - } } @Test diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/NotYetSupported.java b/sql/src/test/java/org/apache/druid/sql/calcite/NotYetSupported.java index c660e0cae21..a2372f3e0e5 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/NotYetSupported.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/NotYetSupported.java @@ -129,11 +129,27 @@ public @interface NotYetSupported public void evaluate() { Modes ignoreMode = annotation.value(); - Throwable e = assertThrows( + Throwable e = null; + try { + base.evaluate(); + } + catch (Throwable t) { + e = t; + } + // If the base test case is supposed to be ignored already, just skip the further evaluation + if (e instanceof AssumptionViolatedException) { + throw (AssumptionViolatedException) e; + } + Throwable finalE = e; + assertThrows( "Expected that this testcase will fail - it might got fixed; or failure have changed?", ignoreMode.throwableClass, - base::evaluate - ); + () -> { + if (finalE != null) { + throw finalE; + } + } + ); String trace = Throwables.getStackTraceAsString(e); Matcher m = annotation.value().getPattern().matcher(trace); diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/rule/DruidJoinRuleTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/rule/DruidJoinRuleTest.java index e531580162e..fa76ba3a3c9 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/rule/DruidJoinRuleTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/rule/DruidJoinRuleTest.java @@ -22,6 +22,7 @@ package org.apache.druid.sql.calcite.rule; import com.google.common.collect.ImmutableList; import com.google.common.collect.Iterables; import org.apache.calcite.jdbc.JavaTypeFactoryImpl; +import org.apache.calcite.rel.core.JoinRelType; import org.apache.calcite.rel.type.RelDataType; import org.apache.calcite.rel.type.RelDataTypeFactory; import org.apache.calcite.rex.RexBuilder; @@ -29,6 +30,7 @@ import org.apache.calcite.rex.RexNode; import org.apache.calcite.sql.fun.SqlStdOperatorTable; import org.apache.calcite.sql.type.SqlTypeFactoryImpl; import org.apache.calcite.sql.type.SqlTypeName; +import org.apache.druid.common.config.NullHandling; import org.apache.druid.query.QueryContext; import org.apache.druid.sql.calcite.planner.DruidTypeSystem; import org.apache.druid.sql.calcite.planner.JoinAlgorithm; @@ -39,6 +41,7 @@ import org.junit.Test; import org.mockito.Mockito; import java.math.BigDecimal; +import java.util.Collections; import java.util.List; public class DruidJoinRuleTest @@ -67,6 +70,7 @@ public class DruidJoinRuleTest @Before public void setup() { + NullHandling.initializeForTests(); PlannerContext plannerContext = Mockito.mock(PlannerContext.class); Mockito.when(plannerContext.queryContext()).thenReturn(QueryContext.empty()); Mockito.when(plannerContext.getJoinAlgorithm()).thenReturn(JoinAlgorithm.BROADCAST); @@ -85,6 +89,8 @@ public class DruidJoinRuleTest ), leftType, null, + JoinRelType.INNER, + ImmutableList.of(), rexBuilder ) ); @@ -106,6 +112,8 @@ public class DruidJoinRuleTest ), leftType, null, + JoinRelType.INNER, + ImmutableList.of(), rexBuilder ) ); @@ -113,6 +121,71 @@ public class DruidJoinRuleTest @Test public void test_canHandleCondition_leftEqRightFn() + { + Assert.assertEquals( + NullHandling.sqlCompatible(), // We don't handle non-equi join conditions for non-sql compatible mode. + druidJoinRule.canHandleCondition( + rexBuilder.makeCall( + SqlStdOperatorTable.EQUALS, + rexBuilder.makeInputRef(typeFactory.createSqlType(SqlTypeName.VARCHAR), 0), + rexBuilder.makeCall( + SqlStdOperatorTable.CONCAT, + rexBuilder.makeLiteral("foo"), + rexBuilder.makeInputRef(typeFactory.createSqlType(SqlTypeName.VARCHAR), 1) + ) + ), + leftType, + null, + JoinRelType.INNER, + ImmutableList.of(), + rexBuilder + ) + ); + } + + @Test + public void test_canHandleCondition_leftEqLeft() + { + + Assert.assertEquals( + NullHandling.sqlCompatible(), // We don't handle non-equi join conditions for non-sql compatible mode. + druidJoinRule.canHandleCondition( + rexBuilder.makeCall( + SqlStdOperatorTable.EQUALS, + rexBuilder.makeInputRef(typeFactory.createSqlType(SqlTypeName.VARCHAR), 0), + rexBuilder.makeInputRef(typeFactory.createSqlType(SqlTypeName.VARCHAR), 0) + ), + leftType, + null, + JoinRelType.INNER, + ImmutableList.of(), + rexBuilder + ) + ); + } + + @Test + public void test_canHandleCondition_rightEqRight() + { + Assert.assertEquals( + NullHandling.sqlCompatible(), // We don't handle non-equi join conditions for non-sql compatible mode. + druidJoinRule.canHandleCondition( + rexBuilder.makeCall( + SqlStdOperatorTable.EQUALS, + rexBuilder.makeInputRef(typeFactory.createSqlType(SqlTypeName.VARCHAR), 1), + rexBuilder.makeInputRef(typeFactory.createSqlType(SqlTypeName.VARCHAR), 1) + ), + leftType, + null, + JoinRelType.INNER, + ImmutableList.of(), + rexBuilder + ) + ); + } + + @Test + public void test_canHandleCondition_leftEqRightFn_leftJoin() { Assert.assertFalse( druidJoinRule.canHandleCondition( @@ -127,40 +200,31 @@ public class DruidJoinRuleTest ), leftType, null, + JoinRelType.LEFT, + ImmutableList.of(), rexBuilder ) ); } @Test - public void test_canHandleCondition_leftEqLeft() + public void test_canHandleCondition_leftEqRightFn_systemFields() { Assert.assertFalse( druidJoinRule.canHandleCondition( rexBuilder.makeCall( SqlStdOperatorTable.EQUALS, rexBuilder.makeInputRef(typeFactory.createSqlType(SqlTypeName.VARCHAR), 0), - rexBuilder.makeInputRef(typeFactory.createSqlType(SqlTypeName.VARCHAR), 0) - ), - leftType, - null, - rexBuilder - ) - ); - } - - @Test - public void test_canHandleCondition_rightEqRight() - { - Assert.assertFalse( - druidJoinRule.canHandleCondition( - rexBuilder.makeCall( - SqlStdOperatorTable.EQUALS, - rexBuilder.makeInputRef(typeFactory.createSqlType(SqlTypeName.VARCHAR), 1), - rexBuilder.makeInputRef(typeFactory.createSqlType(SqlTypeName.VARCHAR), 1) + rexBuilder.makeCall( + SqlStdOperatorTable.CONCAT, + rexBuilder.makeLiteral("foo"), + rexBuilder.makeInputRef(typeFactory.createSqlType(SqlTypeName.VARCHAR), 1) + ) ), leftType, null, + JoinRelType.INNER, + Collections.singletonList(null), rexBuilder ) ); @@ -174,6 +238,8 @@ public class DruidJoinRuleTest rexBuilder.makeLiteral(true), leftType, null, + JoinRelType.INNER, + ImmutableList.of(), rexBuilder ) ); @@ -187,6 +253,8 @@ public class DruidJoinRuleTest rexBuilder.makeLiteral(false), leftType, null, + JoinRelType.INNER, + ImmutableList.of(), rexBuilder ) ); diff --git a/website/.spelling b/website/.spelling index 78e25b3285f..002998c442c 100644 --- a/website/.spelling +++ b/website/.spelling @@ -44,6 +44,7 @@ Base64 Base64-encoded ByteBuffer bottlenecked +cartesian concat CIDR CORS @@ -504,6 +505,7 @@ stdout storages stringDictionaryEncoding stringified +sub-conditions subarray subnet subqueries