SQL: Plan non-equijoin conditions as cross join followed by filter (#15302)

This PR revives #14978 with a few additions. Instead of planning an unconditional cross join, the planner now splits the join condition so that the sub-conditions a native join cannot evaluate are applied after the join. To decide which sub-condition goes where, I have refactored the DruidJoinRule class to extract unsupported sub-conditions. We build a postJoinFilter out of these unsupported sub-conditions and apply it as a filter on top of the join.
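The split described above can be sketched in plain Java. This is only an illustration under simplifying assumptions, not the code in DruidJoinRule: `JoinConditionSplit`, `SubCondition`, `Split`, and the three boolean flags are hypothetical stand-ins for the checks the rule performs on Calcite `RexNode`s, and the sketch ignores literals and the mirrored operand order.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class JoinConditionSplit
{
  /** Hypothetical stand-in for one AND-ed sub-condition of a join's ON clause. */
  static final class SubCondition
  {
    final String sql;
    final boolean isEquality;                 // operator is = or IS NOT DISTINCT FROM
    final boolean firstOperandUsesOnlyLeft;   // first operand references only left-hand columns
    final boolean secondOperandIsRightColumn; // second operand is a direct right-hand column

    SubCondition(
        String sql,
        boolean isEquality,
        boolean firstOperandUsesOnlyLeft,
        boolean secondOperandIsRightColumn
    )
    {
      this.sql = sql;
      this.isEquality = isEquality;
      this.firstOperandUsesOnlyLeft = firstOperandUsesOnlyLeft;
      this.secondOperandIsRightColumn = secondOperandIsRightColumn;
    }

    /** True when the sub-condition has the shape f(left) = rightColumn. */
    boolean isNativeJoinable()
    {
      return isEquality && firstOperandUsesOnlyLeft && secondOperandIsRightColumn;
    }
  }

  /** Result of the split: what stays in the join, and what is filtered afterwards. */
  static final class Split
  {
    final List<String> joinCondition = new ArrayList<>();
    final List<String> postJoinFilter = new ArrayList<>();
  }

  static Split split(List<SubCondition> subConditions)
  {
    final Split result = new Split();
    for (SubCondition subCondition : subConditions) {
      if (subCondition.isNativeJoinable()) {
        result.joinCondition.add(subCondition.sql);
      } else {
        // Not dropped: evaluated as a filter on top of the join instead.
        result.postJoinFilter.add(subCondition.sql);
      }
    }
    return result;
  }

  public static void main(String[] args)
  {
    // Mirrors the ON clause used by testJoinWithEquiAndNonEquiCondition in this commit.
    final Split split = split(
        Arrays.asList(
            new SubCondition("x.m1 = y.m1", true, true, true),
            new SubCondition("x.m1 + y.m1 = 6.0", true, false, false)
        )
    );
    System.out.println("join condition:   " + String.join(" AND ", split.joinCondition));
    System.out.println("post-join filter: " + String.join(" AND ", split.postJoinFilter));
  }
}
```

When no sub-condition is joinable, the join condition list stays empty. That corresponds to the cross join case, which testJoinWithNonEquiCondition in this commit expects to be planned with the native condition `"1"` plus a filter.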
Abhishek Agarwal 2023-11-29 13:46:11 +05:30 committed by GitHub
parent ee6ad36fab
commit 0a56c87e93
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 508 additions and 222 deletions


@ -320,12 +320,16 @@ Join datasources allow you to do a SQL-style join of two datasources. Stacking j
you to join arbitrarily many datasources.
In Druid {{DRUIDVERSION}}, joins in native queries are implemented with a broadcast hash-join algorithm. This means
that all datasources other than the leftmost "base" datasource must fit in memory. It also means that the join condition
must be an equality. This feature is intended mainly to allow joining regular Druid tables with [lookup](#lookup),
[inline](#inline), and [query](#query) datasources.
that all datasources other than the leftmost "base" datasource must fit in memory. In native queries, the join condition
must be an equality. In SQL, any join condition is accepted, but only equalities of a certain form
(see [Joins in SQL](#joins-in-sql)) execute efficiently as part of a native join. For other kinds of conditions, the planner
tries to rearrange the condition so that some sub-conditions are evaluated as a filter on top of the join while the
remaining sub-conditions stay in the join condition. In the worst case, SQL executes the join condition as a
cross join (cartesian product) plus a filter.
Refer to the [Query execution](query-execution.md#join) page for more details on how queries are executed when you
use join datasources.
This feature is intended mainly to allow joining regular Druid tables with [lookup](#lookup), [inline](#inline), and
[query](#query) datasources. Refer to the [Query execution](query-execution.md#join) page for more details on how
queries are executed when you use join datasources.
#### Joins in SQL
@ -335,21 +339,23 @@ SQL joins take the form:
<o1> [ INNER | LEFT [OUTER] ] JOIN <o2> ON <condition>
```
The condition must involve only equalities, but functions are okay, and there can be multiple equalities ANDed together.
Conditions like `t1.x = t2.x`, or `LOWER(t1.x) = t2.x`, or `t1.x = t2.x AND t1.y = t2.y` can all be handled. Conditions
like `t1.x <> t2.x` cannot currently be handled.
Any condition is accepted, but only certain kinds of conditions execute efficiently
as a native join. The condition must be a single clause like the following, or an `AND` of clauses involving at
least one of the following:
Note that Druid SQL is less rigid than what native join datasources can handle. In cases where a SQL query does
something that is not allowed as-is with a native join datasource, Druid SQL will generate a subquery. This can have
a substantial effect on performance and scalability, so it is something to watch out for. Some examples of when the
SQL layer will generate subqueries include:
- Equality between fields of the same type on each side, like `t1 JOIN t2 ON t1.x = t2.x`.
- Equality between a function call on one side, and a field on the other side, like `t1 JOIN t2 ON LOWER(t1.x) = t2.x`.
- The equality operator may be `=` (which does not match nulls) or `IS NOT DISTINCT FROM` (which does match nulls).
- Joining a regular Druid table to itself, or to another regular Druid table. The native join datasource can accept
a table on the left-hand side, but not the right, so a subquery is needed.
In other cases, Druid will either insert a subquery below the join, or will use a cross join (cartesian product)
followed by a filter. Joins executed in these ways may run into resource or performance constraints. To determine
if your query is using one of these execution paths, run `EXPLAIN PLAN FOR <query>` and look for the following:
- Join conditions where the expressions on either side are of different types.
- `query` type datasources under the `left` or `right` key of your `join` datasource.
- `join` type datasource with `condition` set to `"1"` (cartesian product) followed by a `filter` that encodes the
condition you provided.
- Join conditions where the right-hand expression is not a direct column access.
In these cases, you may be able to improve the performance of your query by rewriting it.
For more information about how Druid translates SQL to native queries, refer to the
[Druid SQL](sql-translation.md) documentation.


@ -772,7 +772,7 @@ public class MSQSelectTest extends MSQTestBase
DruidExpression.ofColumn(ColumnType.STRING, "dim2"),
DruidExpression.ofColumn(ColumnType.STRING, "j0.k")
),
JoinType.LEFT
NullHandling.sqlCompatible() ? JoinType.INNER : JoinType.LEFT
)
)
.setInterval(querySegmentSpec(Filtration.eternity()))


@ -145,7 +145,14 @@ public class CalciteSelectQueryMSQTest extends CalciteQueryTest
@Ignore
@Override
public void testUnplannableQueries()
public void testUnplannableScanOrderByNonTime()
{
}
@Ignore
@Override
public void testUnplannableJoinQueriesInNonSQLCompatibleMode()
{
}


@ -28,8 +28,10 @@ import org.apache.calcite.plan.RelOptUtil;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.core.Filter;
import org.apache.calcite.rel.core.Join;
import org.apache.calcite.rel.core.JoinRelType;
import org.apache.calcite.rel.core.Project;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeField;
import org.apache.calcite.rex.RexBuilder;
import org.apache.calcite.rex.RexCall;
import org.apache.calcite.rex.RexInputRef;
@ -43,6 +45,7 @@ import org.apache.calcite.sql.fun.SqlStdOperatorTable;
import org.apache.calcite.sql.type.SqlTypeName;
import org.apache.calcite.tools.RelBuilder;
import org.apache.calcite.util.ImmutableBitSet;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.error.DruidException;
import org.apache.druid.query.LookupDataSource;
import org.apache.druid.sql.calcite.planner.PlannerContext;
@ -54,7 +57,6 @@ import org.apache.druid.sql.calcite.rel.PartialDruidQuery;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.Stack;
import java.util.stream.Collectors;
@ -82,7 +84,7 @@ public class DruidJoinRule extends RelOptRule
{
return new DruidJoinRule(plannerContext);
}
@Override
public boolean matches(RelOptRuleCall call)
{
@ -93,9 +95,14 @@ public class DruidJoinRule extends RelOptRule
// 1) Can handle the join condition as a native join.
// 2) Left has a PartialDruidQuery (i.e., is a real query, not top-level UNION ALL).
// 3) Right has a PartialDruidQuery (i.e., is a real query, not top-level UNION ALL).
return canHandleCondition(join.getCondition(), join.getLeft().getRowType(), right, join.getCluster().getRexBuilder())
&& left.getPartialDruidQuery() != null
&& right.getPartialDruidQuery() != null;
return canHandleCondition(
join.getCondition(),
join.getLeft().getRowType(),
right,
join.getJoinType(),
join.getSystemFieldList(),
join.getCluster().getRexBuilder()
) && left.getPartialDruidQuery() != null && right.getPartialDruidQuery() != null;
}
@Override
@ -112,14 +119,12 @@ public class DruidJoinRule extends RelOptRule
final Filter leftFilter;
final List<RexNode> newProjectExprs = new ArrayList<>();
// Already verified to be present in "matches", so just call "get".
// Can't be final, because we're going to reassign it up to a couple of times.
ConditionAnalysis conditionAnalysis = analyzeCondition(
join.getCondition(),
join.getLeft().getRowType(),
right,
rexBuilder
).get();
);
final boolean isLeftDirectAccessPossible = enableLeftScanDirect && (left instanceof DruidQueryRel);
if (!plannerContext.getJoinAlgorithm().requiresSubquery()
@ -184,7 +189,7 @@ public class DruidJoinRule extends RelOptRule
final DruidJoinQueryRel druidJoin = DruidJoinQueryRel.create(
join.copy(
join.getTraitSet(),
conditionAnalysis.getCondition(rexBuilder),
conditionAnalysis.getConditionWithUnsupportedSubConditionsIgnored(rexBuilder),
newLeft,
newRight,
join.getJoinType(),
@ -194,7 +199,7 @@ public class DruidJoinRule extends RelOptRule
left.getPlannerContext()
);
final RelBuilder relBuilder =
RelBuilder relBuilder =
call.builder()
.push(druidJoin)
.project(
@ -205,6 +210,12 @@ public class DruidJoinRule extends RelOptRule
)
);
// Build a post-join filter with whatever join sub-conditions were not supported.
RexNode postJoinFilter = RexUtil.composeConjunction(rexBuilder, conditionAnalysis.getUnsupportedOnSubConditions(), true);
if (postJoinFilter != null) {
relBuilder = relBuilder.filter(postJoinFilter);
}
call.transformTo(relBuilder.build());
}
@ -222,30 +233,214 @@ public class DruidJoinRule extends RelOptRule
}
/**
* Returns whether {@link #analyzeCondition} would return something.
* Returns whether we can handle the join condition. In case, some conditions in an AND expression are not supported,
* they are extracted into a post-join filter instead.
*/
@VisibleForTesting
boolean canHandleCondition(final RexNode condition, final RelDataType leftRowType, DruidRel<?> right, final RexBuilder rexBuilder)
public boolean canHandleCondition(
final RexNode condition,
final RelDataType leftRowType,
DruidRel<?> right,
JoinRelType joinType,
List<RelDataTypeField> systemFieldList,
final RexBuilder rexBuilder
)
{
return analyzeCondition(condition, leftRowType, right, rexBuilder).isPresent();
ConditionAnalysis conditionAnalysis = analyzeCondition(condition, leftRowType, rexBuilder);
// if the right side requires a subquery, then even lookup will be transformed to a QueryDataSource
// thereby allowing join conditions on both k and v columns of the lookup
if (right != null
&& !DruidJoinQueryRel.computeRightRequiresSubquery(plannerContext, DruidJoinQueryRel.getSomeDruidChild(right))
&& right instanceof DruidQueryRel) {
DruidQueryRel druidQueryRel = (DruidQueryRel) right;
if (druidQueryRel.getDruidTable().getDataSource() instanceof LookupDataSource) {
long distinctRightColumns = conditionAnalysis.rightColumns.stream().map(RexSlot::getIndex).distinct().count();
if (distinctRightColumns > 1) {
// it means that the join's right side is lookup and the join condition contains both key and value columns of lookup.
// currently, the lookup datasource in the native engine doesn't support using value column in the join condition.
plannerContext.setPlanningError(
"SQL is resulting in a join involving lookup where value column is used in the condition.");
return false;
}
}
}
if (joinType != JoinRelType.INNER || !systemFieldList.isEmpty() || NullHandling.replaceWithDefault()) {
// I am not sure in what case, the list of system fields will be not empty. I have just picked up this logic
// directly from https://github.com/apache/calcite/blob/calcite-1.35.0/core/src/main/java/org/apache/calcite/rel/rules/AbstractJoinExtractFilterRule.java#L58
// Also to avoid results changes for existing queries in non-null handling mode, we don't handle unsupported
// conditions. Otherwise, some left/right joins with a condition that doesn't allow nulls on join input will
// be converted to inner joins. See Test CalciteJoinQueryTest#testFilterAndGroupByLookupUsingJoinOperatorBackwards
// for an example.
return conditionAnalysis.getUnsupportedOnSubConditions().isEmpty();
}
return true;
}
public static class ConditionAnalysis
{
/**
* Number of fields on the left-hand side. Useful for identifying if a particular field is from on the left
* or right side of a join.
*/
private final int numLeftFields;
/**
* Each equality subcondition is an equality of the form f(LeftRel) = g(RightRel).
*/
private final List<RexEquality> equalitySubConditions;
/**
* Each literal subcondition is... a literal.
*/
private final List<RexLiteral> literalSubConditions;
/**
* Sub-conditions in join clause that cannot be handled by the DruidJoinRule.
*/
private final List<RexNode> unsupportedOnSubConditions;
private final Set<RexInputRef> rightColumns;
ConditionAnalysis(
int numLeftFields,
List<RexEquality> equalitySubConditions,
List<RexLiteral> literalSubConditions,
List<RexNode> unsupportedOnSubConditions,
Set<RexInputRef> rightColumns
)
{
this.numLeftFields = numLeftFields;
this.equalitySubConditions = equalitySubConditions;
this.literalSubConditions = literalSubConditions;
this.unsupportedOnSubConditions = unsupportedOnSubConditions;
this.rightColumns = rightColumns;
}
public ConditionAnalysis pushThroughLeftProject(final Project leftProject)
{
// Pushing through the project will shift right-hand field references by this amount.
final int rhsShift =
leftProject.getInput().getRowType().getFieldCount() - leftProject.getRowType().getFieldCount();
// We leave unsupportedSubConditions un-touched as they are evaluated above join anyway.
return new ConditionAnalysis(
leftProject.getInput().getRowType().getFieldCount(),
equalitySubConditions
.stream()
.map(
equality -> new RexEquality(
RelOptUtil.pushPastProject(equality.left, leftProject),
(RexInputRef) RexUtil.shift(equality.right, rhsShift),
equality.kind
)
)
.collect(Collectors.toList()),
literalSubConditions,
unsupportedOnSubConditions,
rightColumns
);
}
public ConditionAnalysis pushThroughRightProject(final Project rightProject)
{
Preconditions.checkArgument(onlyUsesMappingsFromRightProject(rightProject), "Cannot push through");
// We leave unsupportedSubConditions un-touched as they are evaluated above join anyway.
return new ConditionAnalysis(
numLeftFields,
equalitySubConditions
.stream()
.map(
equality -> new RexEquality(
equality.left,
(RexInputRef) RexUtil.shift(
RelOptUtil.pushPastProject(
RexUtil.shift(equality.right, -numLeftFields),
rightProject
),
numLeftFields
),
equality.kind
)
)
.collect(Collectors.toList()),
literalSubConditions,
unsupportedOnSubConditions,
rightColumns
);
}
public boolean onlyUsesMappingsFromRightProject(final Project rightProject)
{
for (final RexEquality equality : equalitySubConditions) {
final int rightIndex = equality.right.getIndex() - numLeftFields;
if (!rightProject.getProjects().get(rightIndex).isA(SqlKind.INPUT_REF)) {
return false;
}
}
return true;
}
public RexNode getConditionWithUnsupportedSubConditionsIgnored(final RexBuilder rexBuilder)
{
return RexUtil.composeConjunction(
rexBuilder,
Iterables.concat(
literalSubConditions,
equalitySubConditions
.stream()
.map(equality -> equality.makeCall(rexBuilder))
.collect(Collectors.toList())
),
false
);
}
public List<RexNode> getUnsupportedOnSubConditions()
{
return unsupportedOnSubConditions;
}
@Override
public String toString()
{
return "ConditionAnalysis{" +
"numLeftFields=" + numLeftFields +
", equalitySubConditions=" + equalitySubConditions +
", literalSubConditions=" + literalSubConditions +
", unsupportedSubConditions=" + unsupportedOnSubConditions +
", rightColumns=" + rightColumns +
'}';
}
}
/**
* If this condition is an AND of some combination of (1) literals; (2) equality conditions of the form
* If this condition is an AND of some combination of
* (1) literals;
* (2) equality conditions of the form
* (3) unsupported conditions
* <p>
* Returns empty if the join cannot be supported at all. It can return non-empty with some unsupported conditions
* that can be extracted into post join filter.
* {@code f(LeftRel) = RightColumn}, then return a {@link ConditionAnalysis}.
*/
private Optional<ConditionAnalysis> analyzeCondition(
public ConditionAnalysis analyzeCondition(
final RexNode condition,
final RelDataType leftRowType,
final DruidRel<?> right,
final RexBuilder rexBuilder
)
{
final List<RexNode> subConditions = decomposeAnd(condition);
final List<RexEquality> equalitySubConditions = new ArrayList<>();
final List<RexLiteral> literalSubConditions = new ArrayList<>();
final int numLeftFields = leftRowType.getFieldCount();
final List<RexNode> unSupportedSubConditions = new ArrayList<>();
final Set<RexInputRef> rightColumns = new HashSet<>();
final int numLeftFields = leftRowType.getFieldCount();
for (RexNode subCondition : subConditions) {
if (RexUtil.isLiteral(subCondition, true)) {
@ -259,8 +454,9 @@ public class DruidJoinRule extends RelOptRule
// If the types are the same, unwrap the cast and use the underlying literal.
literalSubConditions.add((RexLiteral) call.getOperands().get(0));
} else {
// If the types are not the same, return Optional.empty() indicating the condition is not supported.
return Optional.empty();
// If the types are not the same, add to unsupported conditions.
unSupportedSubConditions.add(subCondition);
continue;
}
} else {
// Literals are always OK.
@ -284,7 +480,8 @@ public class DruidJoinRule extends RelOptRule
subCondition.getKind(),
secondOperand.getType().getSqlTypeName()
);
return Optional.empty();
unSupportedSubConditions.add(subCondition);
continue;
}
} else if (subCondition.isA(SqlKind.EQUALS) || subCondition.isA(SqlKind.IS_NOT_DISTINCT_FROM)) {
@ -299,7 +496,8 @@ public class DruidJoinRule extends RelOptRule
"SQL requires a join with '%s' condition that is not supported.",
subCondition.getKind()
);
return Optional.empty();
unSupportedSubConditions.add(subCondition);
continue;
}
if (isLeftExpression(firstOperand, numLeftFields) && isRightInputRef(secondOperand, numLeftFields)) {
@ -312,34 +510,16 @@ public class DruidJoinRule extends RelOptRule
} else {
// Cannot handle this condition.
plannerContext.setPlanningError("SQL is resulting in a join that has unsupported operand types.");
return Optional.empty();
unSupportedSubConditions.add(subCondition);
}
}
// if the right side requires a subquery, then even lookup will be transformed to a QueryDataSource
// thereby allowing join conditions on both k and v columns of the lookup
if (right != null
&& !DruidJoinQueryRel.computeRightRequiresSubquery(plannerContext, DruidJoinQueryRel.getSomeDruidChild(right))
&& right instanceof DruidQueryRel) {
DruidQueryRel druidQueryRel = (DruidQueryRel) right;
if (druidQueryRel.getDruidTable().getDataSource() instanceof LookupDataSource) {
long distinctRightColumns = rightColumns.stream().map(RexSlot::getIndex).distinct().count();
if (distinctRightColumns > 1) {
// it means that the join's right side is lookup and the join condition contains both key and value columns of lookup.
// currently, the lookup datasource in the native engine doesn't support using value column in the join condition.
plannerContext.setPlanningError(
"SQL is resulting in a join involving lookup where value column is used in the condition.");
return Optional.empty();
}
}
}
return Optional.of(
new ConditionAnalysis(
numLeftFields,
equalitySubConditions,
literalSubConditions
)
return new ConditionAnalysis(
numLeftFields,
equalitySubConditions,
literalSubConditions,
unSupportedSubConditions,
rightColumns
);
}
@ -369,7 +549,7 @@ public class DruidJoinRule extends RelOptRule
return retVal;
}
private boolean isLeftExpression(final RexNode rexNode, final int numLeftFields)
private static boolean isLeftExpression(final RexNode rexNode, final int numLeftFields)
{
return ImmutableBitSet.range(numLeftFields).contains(RelOptUtil.InputFinder.bits(rexNode));
}
@ -379,121 +559,6 @@ public class DruidJoinRule extends RelOptRule
return rexNode.isA(SqlKind.INPUT_REF) && ((RexInputRef) rexNode).getIndex() >= numLeftFields;
}
static class ConditionAnalysis
{
/**
* Number of fields on the left-hand side. Useful for identifying if a particular field is from on the left
* or right side of a join.
*/
private final int numLeftFields;
/**
* Each equality subcondition is an equality of the form f(LeftRel) = g(RightRel).
*/
private final List<RexEquality> equalitySubConditions;
/**
* Each literal subcondition is... a literal.
*/
private final List<RexLiteral> literalSubConditions;
ConditionAnalysis(
int numLeftFields,
List<RexEquality> equalitySubConditions,
List<RexLiteral> literalSubConditions
)
{
this.numLeftFields = numLeftFields;
this.equalitySubConditions = equalitySubConditions;
this.literalSubConditions = literalSubConditions;
}
public ConditionAnalysis pushThroughLeftProject(final Project leftProject)
{
// Pushing through the project will shift right-hand field references by this amount.
final int rhsShift =
leftProject.getInput().getRowType().getFieldCount() - leftProject.getRowType().getFieldCount();
return new ConditionAnalysis(
leftProject.getInput().getRowType().getFieldCount(),
equalitySubConditions
.stream()
.map(
equality -> new RexEquality(
RelOptUtil.pushPastProject(equality.left, leftProject),
(RexInputRef) RexUtil.shift(equality.right, rhsShift),
equality.kind
)
)
.collect(Collectors.toList()),
literalSubConditions
);
}
public ConditionAnalysis pushThroughRightProject(final Project rightProject)
{
Preconditions.checkArgument(onlyUsesMappingsFromRightProject(rightProject), "Cannot push through");
return new ConditionAnalysis(
numLeftFields,
equalitySubConditions
.stream()
.map(
equality -> new RexEquality(
equality.left,
(RexInputRef) RexUtil.shift(
RelOptUtil.pushPastProject(
RexUtil.shift(equality.right, -numLeftFields),
rightProject
),
numLeftFields
),
equality.kind
)
)
.collect(Collectors.toList()),
literalSubConditions
);
}
public boolean onlyUsesMappingsFromRightProject(final Project rightProject)
{
for (final RexEquality equality : equalitySubConditions) {
final int rightIndex = equality.right.getIndex() - numLeftFields;
if (!rightProject.getProjects().get(rightIndex).isA(SqlKind.INPUT_REF)) {
return false;
}
}
return true;
}
public RexNode getCondition(final RexBuilder rexBuilder)
{
return RexUtil.composeConjunction(
rexBuilder,
Iterables.concat(
literalSubConditions,
equalitySubConditions
.stream()
.map(equality -> equality.makeCall(rexBuilder))
.collect(Collectors.toList())
),
false
);
}
@Override
public String toString()
{
return "ConditionAnalysis{" +
"numLeftFields=" + numLeftFields +
", equalitySubConditions=" + equalitySubConditions +
", literalSubConditions=" + literalSubConditions +
'}';
}
}
/**
* Like {@link org.apache.druid.segment.join.Equality} but uses {@link RexNode} instead of
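
The guard in `canHandleCondition` defers unsupported sub-conditions to a post-join filter only for INNER joins. The following is a minimal sketch of why, written as plain Java over int arrays rather than Druid or Calcite code; `PostJoinFilterDemo` and its methods are hypothetical names. It compares evaluating `l = r AND l + r = 6` entirely in the ON clause against joining on `l = r` and filtering on `l + r = 6` afterwards.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiPredicate;

final class PostJoinFilterDemo
{
  static final int[] LEFT = {1, 2, 3};
  static final int[] RIGHT = {1, 2, 3};

  /**
   * Nested-loop join. For a left outer join, a left row with no ON match is emitted once with a
   * null right value. The post-join filter is applied to every row the join produces.
   */
  static List<String> join(
      boolean leftOuter,
      BiPredicate<Integer, Integer> on,
      BiPredicate<Integer, Integer> postJoinFilter
  )
  {
    final List<String> rows = new ArrayList<>();
    for (int l : LEFT) {
      boolean matched = false;
      for (int r : RIGHT) {
        if (on.test(l, r)) {
          matched = true;
          if (postJoinFilter.test(l, r)) {
            rows.add("(" + l + ", " + r + ")");
          }
        }
      }
      if (leftOuter && !matched && postJoinFilter.test(l, null)) {
        rows.add("(" + l + ", null)");
      }
    }
    return rows;
  }

  /** Whole condition evaluated inside the join; no filter afterwards. */
  static List<String> fullOnCondition(boolean leftOuter)
  {
    return join(leftOuter, (l, r) -> l.equals(r) && l + r == 6, (l, r) -> true);
  }

  /** Equality kept in the join; the other sub-condition moved to a post-join filter. */
  static List<String> equiJoinThenFilter(boolean leftOuter)
  {
    // A comparison against a null right value is not true, so null-extended rows are filtered out.
    return join(leftOuter, (l, r) -> l.equals(r), (l, r) -> r != null && l + r == 6);
  }

  public static void main(String[] args)
  {
    System.out.println("INNER, full ON:     " + fullOnCondition(false));    // [(3, 3)]
    System.out.println("INNER, join+filter: " + equiJoinThenFilter(false)); // [(3, 3)]
    System.out.println("LEFT,  full ON:     " + fullOnCondition(true));     // [(1, null), (2, null), (3, 3)]
    System.out.println("LEFT,  join+filter: " + equiJoinThenFilter(true));  // [(3, 3)]
  }
}
```

For the inner join both plans return the same rows. For the left join they differ: the post-join filter discards left rows that the full ON condition would have kept as null-extended rows, which is why the rewrite is only safe for inner joins.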


@ -723,7 +723,7 @@ public class CalciteJoinQueryTest extends BaseCalciteQueryTest
),
"j0.",
equalsCondition(makeColumnExpression("k"), makeColumnExpression("j0.dim2")),
JoinType.RIGHT
NullHandling.sqlCompatible() ? JoinType.INNER : JoinType.RIGHT
)
)
.setInterval(querySegmentSpec(Filtration.eternity()))
@ -740,7 +740,6 @@ public class CalciteJoinQueryTest extends BaseCalciteQueryTest
new Object[]{"xabc", 1L}
)
: ImmutableList.of(
new Object[]{null, 5L},
new Object[]{"xabc", 1L}
)
);
@ -768,7 +767,7 @@ public class CalciteJoinQueryTest extends BaseCalciteQueryTest
new LookupDataSource("lookyloo"),
"j0.",
equalsCondition(makeColumnExpression("dim2"), makeColumnExpression("j0.k")),
JoinType.LEFT
NullHandling.sqlCompatible() ? JoinType.INNER : JoinType.LEFT
)
)
.setInterval(querySegmentSpec(Filtration.eternity()))
@ -784,8 +783,7 @@ public class CalciteJoinQueryTest extends BaseCalciteQueryTest
new Object[]{NULL_STRING, 3L},
new Object[]{"xabc", 1L}
)
:
ImmutableList.of(
: ImmutableList.of(
new Object[]{"xabc", 1L}
)
);
@ -821,7 +819,7 @@ public class CalciteJoinQueryTest extends BaseCalciteQueryTest
new LookupDataSource("lookyloo"),
"j0.",
equalsCondition(makeColumnExpression("dim2"), makeColumnExpression("j0.k")),
JoinType.LEFT
NullHandling.sqlCompatible() ? JoinType.INNER : JoinType.LEFT
)
)
.setInterval(querySegmentSpec(Filtration.eternity()))
@ -1965,17 +1963,25 @@ public class CalciteJoinQueryTest extends BaseCalciteQueryTest
.build()
),
"_j0.",
"1",
NullHandling.sqlCompatible() ?
equalsCondition(
DruidExpression.fromExpression("CAST(\"j0.k\", 'LONG')"),
DruidExpression.ofColumn(ColumnType.LONG, "_j0.cnt")
)
: "1",
JoinType.INNER
)
)
.intervals(querySegmentSpec(Filtration.eternity()))
.granularity(Granularities.ALL)
.aggregators(new CountAggregatorFactory("a0"))
.filters(and(
expressionFilter("(\"cnt\" == CAST(\"j0.k\", 'LONG'))"),
expressionFilter("(CAST(\"j0.k\", 'LONG') == \"_j0.cnt\")")
))
.filters(
NullHandling.sqlCompatible() ?
expressionFilter("(\"cnt\" == CAST(\"j0.k\", 'LONG'))")
: and(
expressionFilter("(\"cnt\" == CAST(\"j0.k\", 'LONG'))"),
expressionFilter("(CAST(\"j0.k\", 'LONG') == \"_j0.cnt\")")
))
.context(QUERY_CONTEXT_DEFAULT)
.build()
),
@ -4558,6 +4564,113 @@ public class CalciteJoinQueryTest extends BaseCalciteQueryTest
);
}
@Test
@Parameters(source = QueryContextForJoinProvider.class)
public void testJoinWithNonEquiCondition(Map<String, Object> queryContext)
{
// Native JOIN operator cannot handle the condition, so a SQL JOIN with greater-than is translated into a
// cross join with a filter.
cannotVectorize();
// We don't handle non-equi join conditions for non-sql compatible mode.
Assume.assumeFalse(NullHandling.replaceWithDefault());
testQuery(
"SELECT x.m1, y.m1 FROM foo x INNER JOIN foo y ON x.m1 > y.m1",
queryContext,
ImmutableList.of(
newScanQueryBuilder()
.dataSource(
join(
new TableDataSource(CalciteTests.DATASOURCE1),
new QueryDataSource(
newScanQueryBuilder()
.dataSource(CalciteTests.DATASOURCE1)
.intervals(querySegmentSpec(Filtration.eternity()))
.columns("m1")
.context(queryContext)
.build()
),
"j0.",
"1",
JoinType.INNER
)
)
.intervals(querySegmentSpec(Filtration.eternity()))
.filters(expressionFilter("(\"m1\" > \"j0.m1\")"))
.columns("j0.m1", "m1")
.context(queryContext)
.build()
),
sortIfSortBased(
ImmutableList.of(
new Object[]{2.0f, 1.0f},
new Object[]{3.0f, 1.0f},
new Object[]{3.0f, 2.0f},
new Object[]{4.0f, 1.0f},
new Object[]{4.0f, 2.0f},
new Object[]{4.0f, 3.0f},
new Object[]{5.0f, 1.0f},
new Object[]{5.0f, 2.0f},
new Object[]{5.0f, 3.0f},
new Object[]{5.0f, 4.0f},
new Object[]{6.0f, 1.0f},
new Object[]{6.0f, 2.0f},
new Object[]{6.0f, 3.0f},
new Object[]{6.0f, 4.0f},
new Object[]{6.0f, 5.0f}
),
1,
0
)
);
}
@Test
@Parameters(source = QueryContextForJoinProvider.class)
public void testJoinWithEquiAndNonEquiCondition(Map<String, Object> queryContext)
{
// Native JOIN operator cannot handle the condition, so a SQL JOIN with greater-than is translated into a
// cross join with a filter.
cannotVectorize();
// We don't handle non-equi join conditions for non-sql compatible mode.
Assume.assumeFalse(NullHandling.replaceWithDefault());
testQuery(
"SELECT x.m1, y.m1 FROM foo x INNER JOIN foo y ON x.m1 = y.m1 AND x.m1 + y.m1 = 6.0",
queryContext,
ImmutableList.of(
newScanQueryBuilder()
.dataSource(
join(
new TableDataSource(CalciteTests.DATASOURCE1),
new QueryDataSource(
newScanQueryBuilder()
.dataSource(CalciteTests.DATASOURCE1)
.intervals(querySegmentSpec(Filtration.eternity()))
.columns("m1")
.context(queryContext)
.build()
),
"j0.",
equalsCondition(makeColumnExpression("m1"), makeColumnExpression("j0.m1")),
JoinType.INNER
)
)
.virtualColumns(expressionVirtualColumn("v0", "(\"m1\" + \"j0.m1\")", ColumnType.DOUBLE))
.intervals(querySegmentSpec(Filtration.eternity()))
.filters(
equality("v0", 6.0, ColumnType.DOUBLE)
)
.columns("j0.m1", "m1")
.context(queryContext)
.build()
),
ImmutableList.of(new Object[]{3.0f, 3.0f})
);
}
@Test
@Parameters(source = QueryContextForJoinProvider.class)
public void testUsingSubqueryAsPartOfAndFilter(Map<String, Object> queryContext)


@ -126,6 +126,7 @@ import org.joda.time.DateTimeZone;
import org.joda.time.Interval;
import org.joda.time.Period;
import org.junit.Assert;
import org.junit.Assume;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.internal.matchers.ThrowableMessageMatcher;
@ -5414,32 +5415,40 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
@NotYetSupported(Modes.ERROR_HANDLING)
@Test
public void testUnplannableQueries()
public void testUnplannableScanOrderByNonTime()
{
msqIncompatible();
// All of these queries are unplannable because they rely on features Druid doesn't support.
// This test is here to confirm that we don't fall back to Calcite's interpreter or enumerable implementation.
// It's also here so when we do support these features, we can have "real" tests for these queries.
final Map<String, String> queries = ImmutableMap.of(
// SELECT query with order by non-__time.
assertQueryIsUnplannable(
"SELECT dim1 FROM druid.foo ORDER BY dim1",
"SQL query requires ordering a table by non-time column [[dim1]], which is not supported.",
"SQL query requires ordering a table by non-time column [[dim1]], which is not supported."
);
}
@NotYetSupported(Modes.ERROR_HANDLING)
@Test
public void testUnplannableJoinQueriesInNonSQLCompatibleMode()
{
msqIncompatible();
Assume.assumeFalse(NullHandling.sqlCompatible());
assertQueryIsUnplannable(
// JOIN condition with not-equals (<>).
"SELECT foo.dim1, foo.dim2, l.k, l.v\n"
+ "FROM foo INNER JOIN lookup.lookyloo l ON foo.dim2 <> l.k",
"SQL requires a join with 'NOT_EQUALS' condition that is not supported.",
"SQL requires a join with 'NOT_EQUALS' condition that is not supported."
);
assertQueryIsUnplannable(
// JOIN condition with a function of both sides.
"SELECT foo.dim1, foo.dim2, l.k, l.v\n"
+ "FROM foo INNER JOIN lookup.lookyloo l ON CHARACTER_LENGTH(foo.dim2 || l.k) > 3\n",
"SQL requires a join with 'GREATER_THAN' condition that is not supported."
);
for (final Map.Entry<String, String> queryErrorPair : queries.entrySet()) {
assertQueryIsUnplannable(queryErrorPair.getKey(), queryErrorPair.getValue());
}
}
@Test


@ -129,11 +129,27 @@ public @interface NotYetSupported
public void evaluate()
{
Modes ignoreMode = annotation.value();
Throwable e = assertThrows(
Throwable e = null;
try {
base.evaluate();
}
catch (Throwable t) {
e = t;
}
// If the base test case is supposed to be ignored already, just skip the further evaluation
if (e instanceof AssumptionViolatedException) {
throw (AssumptionViolatedException) e;
}
Throwable finalE = e;
assertThrows(
"Expected that this testcase will fail - it might got fixed; or failure have changed?",
ignoreMode.throwableClass,
base::evaluate
);
() -> {
if (finalE != null) {
throw finalE;
}
}
);
String trace = Throwables.getStackTraceAsString(e);
Matcher m = annotation.value().getPattern().matcher(trace);


@ -22,6 +22,7 @@ package org.apache.druid.sql.calcite.rule;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import org.apache.calcite.jdbc.JavaTypeFactoryImpl;
import org.apache.calcite.rel.core.JoinRelType;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeFactory;
import org.apache.calcite.rex.RexBuilder;
@ -29,6 +30,7 @@ import org.apache.calcite.rex.RexNode;
import org.apache.calcite.sql.fun.SqlStdOperatorTable;
import org.apache.calcite.sql.type.SqlTypeFactoryImpl;
import org.apache.calcite.sql.type.SqlTypeName;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.query.QueryContext;
import org.apache.druid.sql.calcite.planner.DruidTypeSystem;
import org.apache.druid.sql.calcite.planner.JoinAlgorithm;
@ -39,6 +41,7 @@ import org.junit.Test;
import org.mockito.Mockito;
import java.math.BigDecimal;
import java.util.Collections;
import java.util.List;
public class DruidJoinRuleTest
@ -67,6 +70,7 @@ public class DruidJoinRuleTest
@Before
public void setup()
{
NullHandling.initializeForTests();
PlannerContext plannerContext = Mockito.mock(PlannerContext.class);
Mockito.when(plannerContext.queryContext()).thenReturn(QueryContext.empty());
Mockito.when(plannerContext.getJoinAlgorithm()).thenReturn(JoinAlgorithm.BROADCAST);
@ -85,6 +89,8 @@ public class DruidJoinRuleTest
),
leftType,
null,
JoinRelType.INNER,
ImmutableList.of(),
rexBuilder
)
);
@ -106,6 +112,8 @@ public class DruidJoinRuleTest
),
leftType,
null,
JoinRelType.INNER,
ImmutableList.of(),
rexBuilder
)
);
@ -113,6 +121,71 @@ public class DruidJoinRuleTest
@Test
public void test_canHandleCondition_leftEqRightFn()
{
Assert.assertEquals(
NullHandling.sqlCompatible(), // We don't handle non-equi join conditions for non-sql compatible mode.
druidJoinRule.canHandleCondition(
rexBuilder.makeCall(
SqlStdOperatorTable.EQUALS,
rexBuilder.makeInputRef(typeFactory.createSqlType(SqlTypeName.VARCHAR), 0),
rexBuilder.makeCall(
SqlStdOperatorTable.CONCAT,
rexBuilder.makeLiteral("foo"),
rexBuilder.makeInputRef(typeFactory.createSqlType(SqlTypeName.VARCHAR), 1)
)
),
leftType,
null,
JoinRelType.INNER,
ImmutableList.of(),
rexBuilder
)
);
}
@Test
public void test_canHandleCondition_leftEqLeft()
{
Assert.assertEquals(
NullHandling.sqlCompatible(), // We don't handle non-equi join conditions for non-sql compatible mode.
druidJoinRule.canHandleCondition(
rexBuilder.makeCall(
SqlStdOperatorTable.EQUALS,
rexBuilder.makeInputRef(typeFactory.createSqlType(SqlTypeName.VARCHAR), 0),
rexBuilder.makeInputRef(typeFactory.createSqlType(SqlTypeName.VARCHAR), 0)
),
leftType,
null,
JoinRelType.INNER,
ImmutableList.of(),
rexBuilder
)
);
}
@Test
public void test_canHandleCondition_rightEqRight()
{
Assert.assertEquals(
NullHandling.sqlCompatible(), // We don't handle non-equi join conditions for non-sql compatible mode.
druidJoinRule.canHandleCondition(
rexBuilder.makeCall(
SqlStdOperatorTable.EQUALS,
rexBuilder.makeInputRef(typeFactory.createSqlType(SqlTypeName.VARCHAR), 1),
rexBuilder.makeInputRef(typeFactory.createSqlType(SqlTypeName.VARCHAR), 1)
),
leftType,
null,
JoinRelType.INNER,
ImmutableList.of(),
rexBuilder
)
);
}
@Test
public void test_canHandleCondition_leftEqRightFn_leftJoin()
{
Assert.assertFalse(
druidJoinRule.canHandleCondition(
@ -127,40 +200,31 @@ public class DruidJoinRuleTest
),
leftType,
null,
JoinRelType.LEFT,
ImmutableList.of(),
rexBuilder
)
);
}
@Test
public void test_canHandleCondition_leftEqLeft()
public void test_canHandleCondition_leftEqRightFn_systemFields()
{
Assert.assertFalse(
druidJoinRule.canHandleCondition(
rexBuilder.makeCall(
SqlStdOperatorTable.EQUALS,
rexBuilder.makeInputRef(typeFactory.createSqlType(SqlTypeName.VARCHAR), 0),
rexBuilder.makeInputRef(typeFactory.createSqlType(SqlTypeName.VARCHAR), 0)
),
leftType,
null,
rexBuilder
)
);
}
@Test
public void test_canHandleCondition_rightEqRight()
{
Assert.assertFalse(
druidJoinRule.canHandleCondition(
rexBuilder.makeCall(
SqlStdOperatorTable.EQUALS,
rexBuilder.makeInputRef(typeFactory.createSqlType(SqlTypeName.VARCHAR), 1),
rexBuilder.makeInputRef(typeFactory.createSqlType(SqlTypeName.VARCHAR), 1)
rexBuilder.makeCall(
SqlStdOperatorTable.CONCAT,
rexBuilder.makeLiteral("foo"),
rexBuilder.makeInputRef(typeFactory.createSqlType(SqlTypeName.VARCHAR), 1)
)
),
leftType,
null,
JoinRelType.INNER,
Collections.singletonList(null),
rexBuilder
)
);
@ -174,6 +238,8 @@ public class DruidJoinRuleTest
rexBuilder.makeLiteral(true),
leftType,
null,
JoinRelType.INNER,
ImmutableList.of(),
rexBuilder
)
);
@ -187,6 +253,8 @@ public class DruidJoinRuleTest
rexBuilder.makeLiteral(false),
leftType,
null,
JoinRelType.INNER,
ImmutableList.of(),
rexBuilder
)
);


@ -44,6 +44,7 @@ Base64
Base64-encoded
ByteBuffer
bottlenecked
cartesian
concat
CIDR
CORS
@ -504,6 +505,7 @@ stdout
storages
stringDictionaryEncoding
stringified
sub-conditions
subarray
subnet
subqueries