Revert "SQL: Plan non-equijoin conditions as cross join followed by filter. (#14978)" (#15029)

This reverts commit 4f498e64691ecd22eaa2c940d1d0d57e769ee9e7.
This commit is contained in:
Soumyava 2023-09-25 11:35:44 -07:00 committed by GitHub
parent ba6101ad75
commit 75af741a96
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 67 additions and 187 deletions

View File

@ -320,14 +320,12 @@ Join datasources allow you to do a SQL-style join of two datasources. Stacking j
you to join arbitrarily many datasources. you to join arbitrarily many datasources.
In Druid {{DRUIDVERSION}}, joins in native queries are implemented with a broadcast hash-join algorithm. This means In Druid {{DRUIDVERSION}}, joins in native queries are implemented with a broadcast hash-join algorithm. This means
that all datasources other than the leftmost "base" datasource must fit in memory. In native queries, the join condition that all datasources other than the leftmost "base" datasource must fit in memory. It also means that the join condition
must be an equality. In SQL, any join condition is accepted, but only equalities of a certain form must be an equality. This feature is intended mainly to allow joining regular Druid tables with [lookup](#lookup),
(see [Joins in SQL](#joins-in-sql)) execute as part of a native join. Other kinds of conditions execute as a cross join [inline](#inline), and [query](#query) datasources.
(cartesian product) plus a filter.
This feature is intended mainly to allow joining regular Druid tables with [lookup](#lookup), [inline](#inline), and Refer to the [Query execution](query-execution.md#join) page for more details on how queries are executed when you
[query](#query) datasources. Refer to the [Query execution](query-execution.md#join) page for more details on how use join datasources.
queries are executed when you use join datasources.
#### Joins in SQL #### Joins in SQL
@ -337,23 +335,21 @@ SQL joins take the form:
<o1> [ INNER | LEFT [OUTER] ] JOIN <o2> ON <condition> <o1> [ INNER | LEFT [OUTER] ] JOIN <o2> ON <condition>
``` ```
Any condition is accepted, but only certain kinds of conditions execute as part of a native join. To execute efficiently The condition must involve only equalities, but functions are okay, and there can be multiple equalities ANDed together.
as part of a native join, a condition must be a single clause like the following, or an `AND` of clauses like the Conditions like `t1.x = t2.x`, or `LOWER(t1.x) = t2.x`, or `t1.x = t2.x AND t1.y = t2.y` can all be handled. Conditions
following: like `t1.x <> t2.x` cannot currently be handled.
- Equality between fields of the same type on each side, like `t1 JOIN t2 ON t1.x = t2.x`. Note that Druid SQL is less rigid than what native join datasources can handle. In cases where a SQL query does
- Equality between a function call on one side, and a field on the other side, like `t1 JOIN t2 ON LOWER(t1.x) = t2.x`. something that is not allowed as-is with a native join datasource, Druid SQL will generate a subquery. This can have
- The equality operator may be `=` (which does not match nulls) or `IS NOT DISTINCT FROM` (which does match nulls). a substantial effect on performance and scalability, so it is something to watch out for. Some examples of when the
SQL layer will generate subqueries include:
In other cases, Druid will either insert a subquery below the join, or will use a cross join (cartesian product) - Joining a regular Druid table to itself, or to another regular Druid table. The native join datasource can accept
followed by a filter. Joins executed in these ways may run into resource or performance constraints. To determine a table on the left-hand side, but not the right, so a subquery is needed.
if your query is using one of these execution paths, run `EXPLAIN PLAN FOR <query>` and look for the following:
- `query` type datasources under the `left` or `right` key of your `join` datasource. - Join conditions where the expressions on either side are of different types.
- `join` type datasource with `condition` set to `"1"` (cartesian product) followed by a `filter` that encodes the
condition you provided.
In these cases, you may be able to improve the performance of your query by rewriting it. - Join conditions where the right-hand expression is not a direct column access.
For more information about how Druid translates SQL to native queries, refer to the For more information about how Druid translates SQL to native queries, refer to the
[Druid SQL](sql-translation.md) documentation. [Druid SQL](sql-translation.md) documentation.

View File

@ -144,7 +144,7 @@ public class CalciteSelectQueryMSQTest extends CalciteQueryTest
@Ignore @Ignore
@Override @Override
public void testUnplannableScanOrderByNonTime() public void testUnplannableQueries()
{ {
} }

View File

@ -162,15 +162,6 @@ public class CalciteRulesManager
CoreRules.INTERSECT_TO_DISTINCT CoreRules.INTERSECT_TO_DISTINCT
); );
/**
* Rules from Calcite that are not part of Calcite's standard set, but that we use anyway.
*/
private static final List<RelOptRule> EXTRA_CALCITE_RULES =
ImmutableList.of(
// Useful for planning funky join conditions as filters on top of cross joins.
CoreRules.JOIN_EXTRACT_FILTER
);
/** /**
* Rules from {@link org.apache.calcite.plan.RelOptRules#ABSTRACT_RELATIONAL_RULES}, minus: * Rules from {@link org.apache.calcite.plan.RelOptRules#ABSTRACT_RELATIONAL_RULES}, minus:
* *
@ -349,7 +340,6 @@ public class CalciteRulesManager
rules.addAll(BASE_RULES); rules.addAll(BASE_RULES);
rules.addAll(ABSTRACT_RULES); rules.addAll(ABSTRACT_RULES);
rules.addAll(ABSTRACT_RELATIONAL_RULES); rules.addAll(ABSTRACT_RELATIONAL_RULES);
rules.addAll(EXTRA_CALCITE_RULES);
if (plannerContext.getJoinAlgorithm().requiresSubquery()) { if (plannerContext.getJoinAlgorithm().requiresSubquery()) {
rules.addAll(FANCY_JOIN_RULES); rules.addAll(FANCY_JOIN_RULES);

View File

@ -3470,7 +3470,7 @@ public class CalciteJoinQueryTest extends BaseCalciteQueryTest
// Cannot vectorize due to 'concat' expression. // Cannot vectorize due to 'concat' expression.
cannotVectorize(); cannotVectorize();
ScanQuery expectedQuery = newScanQueryBuilder() ScanQuery nullCompatibleModePlan = newScanQueryBuilder()
.dataSource( .dataSource(
join( join(
new TableDataSource(CalciteTests.DATASOURCE1), new TableDataSource(CalciteTests.DATASOURCE1),
@ -3496,6 +3496,33 @@ public class CalciteJoinQueryTest extends BaseCalciteQueryTest
.context(queryContext) .context(queryContext)
.build(); .build();
ScanQuery nonNullCompatibleModePlan = newScanQueryBuilder()
.dataSource(
join(
new TableDataSource(CalciteTests.DATASOURCE1),
new QueryDataSource(
GroupByQuery
.builder()
.setDataSource(new LookupDataSource("lookyloo"))
.setInterval(querySegmentSpec(Filtration.eternity()))
.setGranularity(Granularities.ALL)
.setVirtualColumns(
expressionVirtualColumn("v0", "concat(\"k\",'')", ColumnType.STRING)
)
.setDimensions(new DefaultDimensionSpec("v0", "d0"))
.build()
),
"j0.",
equalsCondition(makeColumnExpression("dim1"), makeColumnExpression("j0.d0")),
JoinType.LEFT
)
)
.intervals(querySegmentSpec(Filtration.eternity()))
.columns("dim1", "j0.d0")
.filters(notNull("j0.d0"))
.context(queryContext)
.build();
boolean isJoinFilterRewriteEnabled = queryContext.getOrDefault(QueryContexts.JOIN_FILTER_REWRITE_ENABLE_KEY, true) boolean isJoinFilterRewriteEnabled = queryContext.getOrDefault(QueryContexts.JOIN_FILTER_REWRITE_ENABLE_KEY, true)
.toString() .toString()
.equals("true"); .equals("true");
@ -3505,7 +3532,7 @@ public class CalciteJoinQueryTest extends BaseCalciteQueryTest
+ "LEFT JOIN (select k || '' as k from lookup.lookyloo group by 1) l1 ON foo.dim1 = l1.k\n" + "LEFT JOIN (select k || '' as k from lookup.lookyloo group by 1) l1 ON foo.dim1 = l1.k\n"
+ "WHERE l1.k IS NOT NULL\n", + "WHERE l1.k IS NOT NULL\n",
queryContext, queryContext,
ImmutableList.of(expectedQuery), ImmutableList.of(NullHandling.sqlCompatible() ? nullCompatibleModePlan : nonNullCompatibleModePlan),
NullHandling.sqlCompatible() || !isJoinFilterRewriteEnabled NullHandling.sqlCompatible() || !isJoinFilterRewriteEnabled
? ImmutableList.of(new Object[]{"abc", "abc"}) ? ImmutableList.of(new Object[]{"abc", "abc"})
: ImmutableList.of( : ImmutableList.of(
@ -4515,155 +4542,6 @@ public class CalciteJoinQueryTest extends BaseCalciteQueryTest
); );
} }
/**
 * Self-joins {@code foo} on {@code (x.m1 = y.m1) OR (x.m1 IS NULL AND y.m1 IS NULL)} and expects the whole
 * condition to be planned as a single native join whose condition is {@code notdistinctfrom("m1","j0.m1")},
 * with the right-hand side wrapped in a scan subquery that reads only {@code m1}.
 *
 * @param queryContext query context supplied by {@code QueryContextForJoinProvider}; it is passed to the SQL
 *                     query and to both expected native scan queries
 */
@Test
@Parameters(source = QueryContextForJoinProvider.class)
public void testJoinWithImplicitIsNotDistinctFromCondition(Map<String, Object> queryContext)
{
// Like "testInnerJoin", but uses an implied is-not-distinct-from instead of equals.
cannotVectorize();
testQuery(
"SELECT x.m1, y.m1\n"
+ "FROM foo x INNER JOIN foo y ON (x.m1 = y.m1) OR (x.m1 IS NULL AND y.m1 IS NULL)",
queryContext,
ImmutableList.of(
newScanQueryBuilder()
.dataSource(
join(
new TableDataSource(CalciteTests.DATASOURCE1),
new QueryDataSource(
newScanQueryBuilder()
.dataSource(CalciteTests.DATASOURCE1)
.intervals(querySegmentSpec(Filtration.eternity()))
.columns("m1")
.context(queryContext)
.build()
),
"j0.",
// The equals-or-both-null SQL condition is expected to collapse into one native expression.
"notdistinctfrom(\"m1\",\"j0.m1\")",
JoinType.INNER
)
)
.intervals(querySegmentSpec(Filtration.eternity()))
.columns("j0.m1", "m1")
.context(queryContext)
.build()
),
// Expected rows: each m1 value paired with itself.
ImmutableList.of(
new Object[]{1.0f, 1.0f},
new Object[]{2.0f, 2.0f},
new Object[]{3.0f, 3.0f},
new Object[]{4.0f, 4.0f},
new Object[]{5.0f, 5.0f},
new Object[]{6.0f, 6.0f}
)
);
}
/**
 * Self-joins {@code foo} on the non-equality condition {@code x.m1 > y.m1} and expects a native join with the
 * constant condition {@code "1"} plus an expression filter {@code ("m1" > "j0.m1")} on the outer scan.
 *
 * @param queryContext query context supplied by {@code QueryContextForJoinProvider}; it is passed to the SQL
 *                     query and to both expected native scan queries
 */
@Test
@Parameters(source = QueryContextForJoinProvider.class)
public void testJoinWithNonEquiCondition(Map<String, Object> queryContext)
{
// Native JOIN operator cannot handle the condition, so a SQL JOIN with greater-than is translated into a
// cross join with a filter.
cannotVectorize();
testQuery(
"SELECT x.m1, y.m1 FROM foo x INNER JOIN foo y ON x.m1 > y.m1",
queryContext,
ImmutableList.of(
newScanQueryBuilder()
.dataSource(
join(
new TableDataSource(CalciteTests.DATASOURCE1),
new QueryDataSource(
newScanQueryBuilder()
.dataSource(CalciteTests.DATASOURCE1)
.intervals(querySegmentSpec(Filtration.eternity()))
.columns("m1")
.context(queryContext)
.build()
),
"j0.",
// Constant condition "1": every left row pairs with every right row (cartesian product).
"1",
JoinType.INNER
)
)
.intervals(querySegmentSpec(Filtration.eternity()))
// The original greater-than join condition is applied here, after the cross join.
.filters(expressionFilter("(\"m1\" > \"j0.m1\")"))
.columns("j0.m1", "m1")
.context(queryContext)
.build()
),
// NOTE(review): presumably reorders the expected rows by column indexes 1 then 0 when a sort-based join
// algorithm is in use -- confirm against sortIfSortBased.
sortIfSortBased(
ImmutableList.of(
new Object[]{2.0f, 1.0f},
new Object[]{3.0f, 1.0f},
new Object[]{3.0f, 2.0f},
new Object[]{4.0f, 1.0f},
new Object[]{4.0f, 2.0f},
new Object[]{4.0f, 3.0f},
new Object[]{5.0f, 1.0f},
new Object[]{5.0f, 2.0f},
new Object[]{5.0f, 3.0f},
new Object[]{5.0f, 4.0f},
new Object[]{6.0f, 1.0f},
new Object[]{6.0f, 2.0f},
new Object[]{6.0f, 3.0f},
new Object[]{6.0f, 4.0f},
new Object[]{6.0f, 5.0f}
),
1,
0
)
);
}
/**
 * Self-joins {@code foo} on {@code x.m1 = y.m1 AND x.m1 + y.m1 = 6.0}, a plain equality ANDed with an equality
 * whose left-hand side is an expression over both inputs. The expected plan is a native join with the constant
 * condition {@code "1"}, with both clauses applied as filters on the outer scan.
 *
 * @param queryContext query context supplied by {@code QueryContextForJoinProvider}; it is passed to the SQL
 *                     query and to both expected native scan queries
 */
@Test
@Parameters(source = QueryContextForJoinProvider.class)
public void testJoinWithEquiAndNonEquiCondition(Map<String, Object> queryContext)
{
// The ON clause mixes a plain equality with "x.m1 + y.m1 = 6.0", which depends on both sides of the join.
// The expected plan does not push even the plain equality into the native join condition: the whole ON clause
// becomes a cross join followed by a filter.
cannotVectorize();
testQuery(
"SELECT x.m1, y.m1 FROM foo x INNER JOIN foo y ON x.m1 = y.m1 AND x.m1 + y.m1 = 6.0",
queryContext,
ImmutableList.of(
newScanQueryBuilder()
.dataSource(
join(
new TableDataSource(CalciteTests.DATASOURCE1),
new QueryDataSource(
newScanQueryBuilder()
.dataSource(CalciteTests.DATASOURCE1)
.intervals(querySegmentSpec(Filtration.eternity()))
.columns("m1")
.context(queryContext)
.build()
),
"j0.",
// Constant condition "1": every left row pairs with every right row (cartesian product).
"1",
JoinType.INNER
)
)
// v0 holds the sum of the two m1 columns so the second clause can be expressed as an equality filter.
.virtualColumns(expressionVirtualColumn("v0", "(\"m1\" + \"j0.m1\")", ColumnType.DOUBLE))
.intervals(querySegmentSpec(Filtration.eternity()))
.filters(
and(
expressionFilter("(\"m1\" == \"j0.m1\")"),
equality("v0", 6.0, ColumnType.DOUBLE)
)
)
.columns("j0.m1", "m1")
.context(queryContext)
.build()
),
// Only m1 = 3.0 satisfies both m1 == j0.m1 and m1 + j0.m1 == 6.0.
ImmutableList.of(new Object[]{3.0f, 3.0f})
);
}
@Test @Test
@Parameters(source = QueryContextForJoinProvider.class) @Parameters(source = QueryContextForJoinProvider.class)
public void testUsingSubqueryAsPartOfAndFilter(Map<String, Object> queryContext) public void testUsingSubqueryAsPartOfAndFilter(Map<String, Object> queryContext)

View File

@ -5688,15 +5688,32 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
@DecoupledIgnore(mode = Modes.ERROR_HANDLING) @DecoupledIgnore(mode = Modes.ERROR_HANDLING)
@Test @Test
public void testUnplannableScanOrderByNonTime() public void testUnplannableQueries()
{ {
// Scan can ORDER BY non-time in MSQ.
notMsqCompatible(); notMsqCompatible();
// All of these queries are unplannable because they rely on features Druid doesn't support.
// This test is here to confirm that we don't fall back to Calcite's interpreter or enumerable implementation.
// It's also here so when we do support these features, we can have "real" tests for these queries.
assertQueryIsUnplannable( final Map<String, String> queries = ImmutableMap.of(
// SELECT query with order by non-__time.
"SELECT dim1 FROM druid.foo ORDER BY dim1", "SELECT dim1 FROM druid.foo ORDER BY dim1",
"SQL query requires order by non-time column [[dim1 ASC]], which is not supported." "SQL query requires order by non-time column [[dim1 ASC]], which is not supported.",
// JOIN condition with not-equals (<>).
"SELECT foo.dim1, foo.dim2, l.k, l.v\n"
+ "FROM foo INNER JOIN lookup.lookyloo l ON foo.dim2 <> l.k",
"SQL requires a join with 'NOT_EQUALS' condition that is not supported.",
// JOIN condition with a function of both sides.
"SELECT foo.dim1, foo.dim2, l.k, l.v\n"
+ "FROM foo INNER JOIN lookup.lookyloo l ON CHARACTER_LENGTH(foo.dim2 || l.k) > 3\n",
"SQL requires a join with 'GREATER_THAN' condition that is not supported."
); );
for (final Map.Entry<String, String> queryErrorPair : queries.entrySet()) {
assertQueryIsUnplannable(queryErrorPair.getKey(), queryErrorPair.getValue());
}
} }
@Test @Test

View File

@ -43,7 +43,6 @@ Base64
Base64-encoded Base64-encoded
ByteBuffer ByteBuffer
bottlenecked bottlenecked
cartesian
concat concat
CIDR CIDR
CORS CORS