Prevent JOIN reducing to a JOIN with constant in the ON condition (#9941)

* Prevent Join reducing to on constant condition

* Prevent Join reducing to on constant condition

* addreess comments

* set queryContext in tests
This commit is contained in:
Maytas Monsereenusorn 2020-06-01 06:39:06 -10:00 committed by GitHub
parent acfcfd35b1
commit 821c5d5a5c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 354 additions and 3 deletions

View File

@ -123,14 +123,17 @@ public class Rules
ProjectTableScanRule.INTERPRETER
);
// Rules from RelOptUtil's registerReductionRules.
// Rules from RelOptUtil's registerReductionRules, minus:
//
// 1) ReduceExpressionsRule.JOIN_INSTANCE
// Removed by https://github.com/apache/druid/pull/9941 due to issue in https://github.com/apache/druid/issues/9942
// TODO: Re-enable when https://github.com/apache/druid/issues/9942 is fixed
private static final List<RelOptRule> REDUCTION_RULES =
ImmutableList.of(
ReduceExpressionsRule.PROJECT_INSTANCE,
ReduceExpressionsRule.FILTER_INSTANCE,
ReduceExpressionsRule.CALC_INSTANCE,
ReduceExpressionsRule.WINDOW_INSTANCE,
ReduceExpressionsRule.JOIN_INSTANCE,
ValuesReduceRule.FILTER_INSTANCE,
ValuesReduceRule.PROJECT_FILTER_INSTANCE,
ValuesReduceRule.PROJECT_INSTANCE,
@ -166,6 +169,9 @@ public class Rules
// 2) SemiJoinRule.PROJECT and SemiJoinRule.JOIN (we don't need to detect semi-joins, because they are handled
// fine as-is by DruidJoinRule).
// 3) JoinCommuteRule (we don't support reordering joins yet).
// 4) FilterJoinRule.FILTER_ON_JOIN and FilterJoinRule.JOIN
// Removed by https://github.com/apache/druid/pull/9773 due to issue in https://github.com/apache/druid/issues/9843
// TODO: Re-enable when https://github.com/apache/druid/issues/9843 is fixed
private static final List<RelOptRule> ABSTRACT_RELATIONAL_RULES =
ImmutableList.of(
AbstractConverter.ExpandConversionRule.INSTANCE,

View File

@ -24,6 +24,7 @@ import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import junitparams.JUnitParamsRunner;
import junitparams.Parameters;
import org.apache.calcite.plan.RelOptPlanner;
import org.apache.calcite.runtime.CalciteContextException;
import org.apache.calcite.tools.ValidationException;
import org.apache.druid.annotations.UsedByJUnitParamsRunner;
@ -79,6 +80,7 @@ import org.apache.druid.query.filter.BoundDimFilter;
import org.apache.druid.query.filter.DimFilter;
import org.apache.druid.query.filter.InDimFilter;
import org.apache.druid.query.filter.LikeDimFilter;
import org.apache.druid.query.filter.NotDimFilter;
import org.apache.druid.query.filter.SelectorDimFilter;
import org.apache.druid.query.groupby.GroupByQuery;
import org.apache.druid.query.groupby.orderby.DefaultLimitSpec;
@ -9169,7 +9171,7 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
.build()
),
"j0.",
"0",
equalsCondition(DruidExpression.fromColumn("dim1"), DruidExpression.fromColumn("j0.d0")),
JoinType.INNER
)
)
@ -14077,6 +14079,349 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
);
}
@Test
@Parameters(source = QueryContextForJoinProvider.class)
public void testCountOnSemiJoinSingleColumn(Map<String, Object> queryContext) throws Exception
{
testQuery(
"SELECT dim1 FROM foo WHERE dim1 IN (SELECT dim1 FROM foo WHERE dim1 = '10.1')\n",
queryContext,
ImmutableList.of(
newScanQueryBuilder()
.dataSource(
join(
new TableDataSource(CalciteTests.DATASOURCE1),
new QueryDataSource(
GroupByQuery.builder()
.setDataSource(CalciteTests.DATASOURCE1)
.setInterval(querySegmentSpec(Filtration.eternity()))
.setDimFilter(
selector("dim1", "10.1", null)
)
.setGranularity(Granularities.ALL)
.setDimensions(dimensions(new DefaultDimensionSpec("dim1", "d0")))
.setContext(queryContext)
.build()
),
"j0.",
equalsCondition(DruidExpression.fromColumn("dim1"), DruidExpression.fromColumn("j0.d0")),
JoinType.INNER
)
)
.intervals(querySegmentSpec(Filtration.eternity()))
.virtualColumns(expressionVirtualColumn("v0", "\'10.1\'", ValueType.STRING))
.columns("v0")
.context(queryContext)
.build()
),
ImmutableList.of(
new Object[]{"10.1"}
)
);
}
@Test
@Parameters(source = QueryContextForJoinProvider.class)
public void testLeftJoinOnTwoInlineDataSourcesWithTimeFilter(Map<String, Object> queryContext) throws Exception
{
testQuery(
"with abc as\n"
+ "(\n"
+ " SELECT dim1, \"__time\", m1 from foo WHERE \"dim1\" = '10.1' AND \"__time\" >= '1999'\n"
+ ")\n"
+ "SELECT t1.dim1, t1.\"__time\" from abc as t1 LEFT JOIN abc as t2 on t1.dim1 = t2.dim1 WHERE t1.dim1 = '10.1'\n",
queryContext,
ImmutableList.of(
newScanQueryBuilder()
.dataSource(
join(
new QueryDataSource(
newScanQueryBuilder()
.dataSource(CalciteTests.DATASOURCE1)
.intervals(
querySegmentSpec(
Intervals.utc(
DateTimes.of("1999-01-01").getMillis(),
JodaUtils.MAX_INSTANT
)
)
)
.filters(new SelectorDimFilter("dim1", "10.1", null))
.virtualColumns(expressionVirtualColumn("v0", "\'10.1\'", ValueType.STRING))
.columns(ImmutableList.of("__time", "v0"))
.resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST)
.context(queryContext)
.build()
),
new QueryDataSource(
newScanQueryBuilder()
.dataSource(CalciteTests.DATASOURCE1)
.intervals(
querySegmentSpec(
Intervals.utc(
DateTimes.of("1999-01-01").getMillis(),
JodaUtils.MAX_INSTANT
)
)
)
.filters(new SelectorDimFilter("dim1", "10.1", null))
.virtualColumns(expressionVirtualColumn("v0", "\'10.1\'", ValueType.STRING))
.columns(ImmutableList.of("v0"))
.resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST)
.context(queryContext)
.build()
),
"j0.",
equalsCondition(DruidExpression.fromColumn("v0"), DruidExpression.fromColumn("j0.v0")),
JoinType.LEFT
)
)
.intervals(querySegmentSpec(Filtration.eternity()))
.virtualColumns(expressionVirtualColumn("_v0", "\'10.1\'", ValueType.STRING))
.columns("__time", "_v0")
.filters(new SelectorDimFilter("v0", "10.1", null))
.context(queryContext)
.build()
),
ImmutableList.of(
new Object[]{"10.1", 946771200000L}
)
);
}
@Test
@Parameters(source = QueryContextForJoinProvider.class)
public void testLeftJoinOnTwoInlineDataSourcesWithOuterWhere(Map<String, Object> queryContext) throws Exception
{
testQuery(
"with abc as\n"
+ "(\n"
+ " SELECT dim1, \"__time\", m1 from foo WHERE \"dim1\" = '10.1'\n"
+ ")\n"
+ "SELECT t1.dim1, t1.\"__time\" from abc as t1 LEFT JOIN abc as t2 on t1.dim1 = t2.dim1 WHERE t1.dim1 = '10.1'\n",
queryContext,
ImmutableList.of(
newScanQueryBuilder()
.dataSource(
join(
new QueryDataSource(
newScanQueryBuilder()
.dataSource(CalciteTests.DATASOURCE1)
.intervals(querySegmentSpec(Filtration.eternity()))
.filters(new SelectorDimFilter("dim1", "10.1", null))
.virtualColumns(expressionVirtualColumn("v0", "\'10.1\'", ValueType.STRING))
.columns(ImmutableList.of("__time", "v0"))
.resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST)
.context(queryContext)
.build()
),
new QueryDataSource(
newScanQueryBuilder()
.dataSource(CalciteTests.DATASOURCE1)
.intervals(querySegmentSpec(Filtration.eternity()))
.filters(new SelectorDimFilter("dim1", "10.1", null))
.columns(ImmutableList.of("dim1"))
.resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST)
.context(queryContext)
.build()
),
"j0.",
equalsCondition(DruidExpression.fromColumn("v0"), DruidExpression.fromColumn("j0.dim1")),
JoinType.LEFT
)
)
.intervals(querySegmentSpec(Filtration.eternity()))
.virtualColumns(expressionVirtualColumn("_v0", "\'10.1\'", ValueType.STRING))
.columns("__time", "_v0")
.filters(new SelectorDimFilter("v0", "10.1", null))
.context(queryContext)
.build()
),
ImmutableList.of(
new Object[]{"10.1", 946771200000L}
)
);
}
@Test
@Parameters(source = QueryContextForJoinProvider.class)
public void testLeftJoinOnTwoInlineDataSources(Map<String, Object> queryContext) throws Exception
{
testQuery(
"with abc as\n"
+ "(\n"
+ " SELECT dim1, \"__time\", m1 from foo WHERE \"dim1\" = '10.1'\n"
+ ")\n"
+ "SELECT t1.dim1, t1.\"__time\" from abc as t1 LEFT JOIN abc as t2 on t1.dim1 = t2.dim1\n",
queryContext,
ImmutableList.of(
newScanQueryBuilder()
.dataSource(
join(
new QueryDataSource(
newScanQueryBuilder()
.dataSource(CalciteTests.DATASOURCE1)
.intervals(querySegmentSpec(Filtration.eternity()))
.filters(new SelectorDimFilter("dim1", "10.1", null))
.virtualColumns(expressionVirtualColumn("v0", "\'10.1\'", ValueType.STRING))
.columns(ImmutableList.of("__time", "v0"))
.resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST)
.context(queryContext)
.build()
),
new QueryDataSource(
newScanQueryBuilder()
.dataSource(CalciteTests.DATASOURCE1)
.intervals(querySegmentSpec(Filtration.eternity()))
.filters(new SelectorDimFilter("dim1", "10.1", null))
.columns(ImmutableList.of("dim1"))
.resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST)
.context(queryContext)
.build()
),
"j0.",
equalsCondition(DruidExpression.fromColumn("v0"), DruidExpression.fromColumn("j0.dim1")),
JoinType.LEFT
)
)
.intervals(querySegmentSpec(Filtration.eternity()))
.virtualColumns(expressionVirtualColumn("_v0", "\'10.1\'", ValueType.STRING))
.columns("__time", "_v0")
.context(queryContext)
.build()
),
ImmutableList.of(
new Object[]{"10.1", 946771200000L}
)
);
}
@Test
@Parameters(source = QueryContextForJoinProvider.class)
public void testInnerJoinOnTwoInlineDataSourcesWithOuterWhere(Map<String, Object> queryContext) throws Exception
{
testQuery(
"with abc as\n"
+ "(\n"
+ " SELECT dim1, \"__time\", m1 from foo WHERE \"dim1\" = '10.1'\n"
+ ")\n"
+ "SELECT t1.dim1, t1.\"__time\" from abc as t1 INNER JOIN abc as t2 on t1.dim1 = t2.dim1 WHERE t1.dim1 = '10.1'\n",
queryContext,
ImmutableList.of(
newScanQueryBuilder()
.dataSource(
join(
new QueryDataSource(
newScanQueryBuilder()
.dataSource(CalciteTests.DATASOURCE1)
.intervals(querySegmentSpec(Filtration.eternity()))
.filters(new SelectorDimFilter("dim1", "10.1", null))
.virtualColumns(expressionVirtualColumn("v0", "\'10.1\'", ValueType.STRING))
.columns(ImmutableList.of("__time", "v0"))
.resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST)
.context(queryContext)
.build()
),
new QueryDataSource(
newScanQueryBuilder()
.dataSource(CalciteTests.DATASOURCE1)
.intervals(querySegmentSpec(Filtration.eternity()))
.filters(new SelectorDimFilter("dim1", "10.1", null))
.columns(ImmutableList.of("dim1"))
.resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST)
.context(queryContext)
.build()
),
"j0.",
equalsCondition(DruidExpression.fromColumn("v0"), DruidExpression.fromColumn("j0.dim1")),
JoinType.INNER
)
)
.intervals(querySegmentSpec(Filtration.eternity()))
.virtualColumns(expressionVirtualColumn("_v0", "\'10.1\'", ValueType.STRING))
.columns("__time", "_v0")
.filters(new NotDimFilter(new SelectorDimFilter("v0", null, null)))
.context(queryContext)
.build()
),
ImmutableList.of(
new Object[]{"10.1", 946771200000L}
)
);
}
@Test
@Parameters(source = QueryContextForJoinProvider.class)
public void testInnerJoinOnTwoInlineDataSources(Map<String, Object> queryContext) throws Exception
{
testQuery(
"with abc as\n"
+ "(\n"
+ " SELECT dim1, \"__time\", m1 from foo WHERE \"dim1\" = '10.1'\n"
+ ")\n"
+ "SELECT t1.dim1, t1.\"__time\" from abc as t1 INNER JOIN abc as t2 on t1.dim1 = t2.dim1\n",
queryContext,
ImmutableList.of(
newScanQueryBuilder()
.dataSource(
join(
new QueryDataSource(
newScanQueryBuilder()
.dataSource(CalciteTests.DATASOURCE1)
.intervals(querySegmentSpec(Filtration.eternity()))
.filters(new SelectorDimFilter("dim1", "10.1", null))
.virtualColumns(expressionVirtualColumn("v0", "\'10.1\'", ValueType.STRING))
.columns(ImmutableList.of("__time", "v0"))
.resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST)
.context(queryContext)
.build()
),
new QueryDataSource(
newScanQueryBuilder()
.dataSource(CalciteTests.DATASOURCE1)
.intervals(querySegmentSpec(Filtration.eternity()))
.filters(new SelectorDimFilter("dim1", "10.1", null))
.columns(ImmutableList.of("dim1"))
.resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST)
.context(queryContext)
.build()
),
"j0.",
equalsCondition(DruidExpression.fromColumn("v0"), DruidExpression.fromColumn("j0.dim1")),
JoinType.INNER
)
)
.intervals(querySegmentSpec(Filtration.eternity()))
.virtualColumns(expressionVirtualColumn("_v0", "\'10.1\'", ValueType.STRING))
.columns("__time", "_v0")
.context(queryContext)
.build()
),
ImmutableList.of(
new Object[]{"10.1", 946771200000L}
)
);
}
// This query is expected to fail as we do not support join with constant in the on condition
// (see issue https://github.com/apache/druid/issues/9942 for more information)
// TODO: Remove expected Exception when https://github.com/apache/druid/issues/9942 is fixed
@Test(expected = RelOptPlanner.CannotPlanException.class)
@Parameters(source = QueryContextForJoinProvider.class)
public void testJoinOnConstantShouldFail(Map<String, Object> queryContext) throws Exception
{
cannotVectorize();
final String query = "SELECT t1.dim1 from foo as t1 LEFT JOIN foo as t2 on t1.dim1 = '10.1'";
testQuery(
query,
queryContext,
ImmutableList.of(),
ImmutableList.of()
);
}
@Test
public void testRepeatedIdenticalVirtualExpressionGrouping() throws Exception
{