Support Window operators in decoupled planning (#15815)

This commit is contained in:
Zoltan Haindrich 2024-02-07 10:09:48 +01:00 committed by GitHub
parent 43a1c96cd1
commit fdc7cec271
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
10 changed files with 291 additions and 22 deletions

View File

@ -248,12 +248,24 @@ public class CalciteRulesManager
), ),
Programs.sequence( Programs.sequence(
druidPreProgram, druidPreProgram,
buildBaseRuleSetProgram(plannerContext),
new LoggingProgram("After baseRuleSet program", isDebug),
Programs.ofRules(logicalConventionRuleSet(plannerContext)), Programs.ofRules(logicalConventionRuleSet(plannerContext)),
new LoggingProgram("After logical volcano planner program", isDebug) new LoggingProgram("After logical volcano planner program", isDebug)
) )
); );
} }
private Program buildBaseRuleSetProgram(PlannerContext plannerContext)
{
final HepProgramBuilder builder = HepProgram.builder();
builder.addMatchLimit(CalciteRulesManager.HEP_DEFAULT_MATCH_LIMIT);
builder.addGroupBegin();
builder.addRuleCollection(baseRuleSet(plannerContext));
builder.addGroupEnd();
return Programs.of(builder.build(), true, DefaultRelMetadataProvider.INSTANCE);
}
/** /**
* Build the program that runs prior to the cost-based {@link VolcanoPlanner}. * Build the program that runs prior to the cost-based {@link VolcanoPlanner}.
* *
@ -405,7 +417,7 @@ public class CalciteRulesManager
{ {
final ImmutableList.Builder<RelOptRule> retVal = ImmutableList final ImmutableList.Builder<RelOptRule> retVal = ImmutableList
.<RelOptRule>builder() .<RelOptRule>builder()
.addAll(baseRuleSet(plannerContext)) .add(CoreRules.SORT_REMOVE)
.add(new DruidLogicalRules(plannerContext).rules().toArray(new RelOptRule[0])); .add(new DruidLogicalRules(plannerContext).rules().toArray(new RelOptRule[0]));
return retVal.build(); return retVal.build();
} }

View File

@ -29,6 +29,7 @@ import org.apache.calcite.rel.core.Project;
import org.apache.calcite.rel.core.Sort; import org.apache.calcite.rel.core.Sort;
import org.apache.calcite.rel.core.TableFunctionScan; import org.apache.calcite.rel.core.TableFunctionScan;
import org.apache.calcite.rel.core.TableScan; import org.apache.calcite.rel.core.TableScan;
import org.apache.calcite.rel.core.Window;
import org.apache.calcite.rel.logical.LogicalAggregate; import org.apache.calcite.rel.logical.LogicalAggregate;
import org.apache.calcite.rel.logical.LogicalCorrelate; import org.apache.calcite.rel.logical.LogicalCorrelate;
import org.apache.calcite.rel.logical.LogicalExchange; import org.apache.calcite.rel.logical.LogicalExchange;
@ -43,6 +44,7 @@ import org.apache.calcite.rel.logical.LogicalUnion;
import org.apache.calcite.rel.logical.LogicalValues; import org.apache.calcite.rel.logical.LogicalValues;
import org.apache.calcite.rex.RexLiteral; import org.apache.calcite.rex.RexLiteral;
import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.UOE;
import org.apache.druid.query.InlineDataSource; import org.apache.druid.query.InlineDataSource;
import org.apache.druid.segment.column.RowSignature; import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.sql.calcite.rel.PartialDruidQuery; import org.apache.druid.sql.calcite.rel.PartialDruidQuery;
@ -308,9 +310,25 @@ public class DruidQueryGenerator extends RelShuttleImpl
return visitFilter((Filter) other); return visitFilter((Filter) other);
} else if (other instanceof LogicalValues) { } else if (other instanceof LogicalValues) {
return visit((LogicalValues) other); return visit((LogicalValues) other);
} else if (other instanceof Window) {
return visitWindow((Window) other);
} }
return super.visit(other); throw new UOE("Found unsupported RelNode [%s]", other.getClass().getSimpleName());
}
private RelNode visitWindow(Window other)
{
RelNode result = super.visit(other);
if (!PartialDruidQuery.Stage.WINDOW.canFollow(currentStage)) {
queryList.add(partialDruidQuery);
queryTables.add(currentTable);
partialDruidQuery = PartialDruidQuery.createOuterQuery(partialDruidQuery);
}
partialDruidQuery = partialDruidQuery.withWindow((Window) result);
currentStage = PartialDruidQuery.Stage.WINDOW;
return result;
} }
public PartialDruidQuery getPartialDruidQuery() public PartialDruidQuery getPartialDruidQuery()

View File

@ -108,6 +108,7 @@ import org.joda.time.Interval;
import javax.annotation.Nonnull; import javax.annotation.Nonnull;
import javax.annotation.Nullable; import javax.annotation.Nullable;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections; import java.util.Collections;
import java.util.Comparator; import java.util.Comparator;
@ -1494,7 +1495,7 @@ public class DruidQuery
{ {
if (sorting == null if (sorting == null
|| sorting.getOrderBys().isEmpty() || sorting.getOrderBys().isEmpty()
|| sorting.getProjection() != null) { || (sorting.getProjection() != null && !sorting.getProjection().getVirtualColumns().isEmpty())) {
return null; return null;
} }
@ -1515,13 +1516,25 @@ public class DruidQuery
List<OperatorFactory> operators = new ArrayList<>(); List<OperatorFactory> operators = new ArrayList<>();
operators.add(new NaiveSortOperatorFactory(sortColumns)); operators.add(new NaiveSortOperatorFactory(sortColumns));
if (!sorting.getOffsetLimit().isNone()) {
final Projection projection = sorting.getProjection();
final org.apache.druid.query.operator.OffsetLimit offsetLimit = sorting.getOffsetLimit().isNone()
? null
: sorting.getOffsetLimit().toOperatorOffsetLimit();
final List<String> projectedColumns = projection == null
? null
: projection.getOutputRowSignature().getColumnNames();
if (offsetLimit != null || projectedColumns != null) {
operators.add( operators.add(
new ScanOperatorFactory( new ScanOperatorFactory(
null, null,
null, null,
sorting.getOffsetLimit().toOperatorOffsetLimit(), offsetLimit,
null, projectedColumns,
null, null,
null null
) )

View File

@ -0,0 +1,58 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.sql.calcite.rel.logical;
import org.apache.calcite.plan.RelOptCluster;
import org.apache.calcite.plan.RelOptCost;
import org.apache.calcite.plan.RelOptPlanner;
import org.apache.calcite.plan.RelTraitSet;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.core.Filter;
import org.apache.calcite.rel.core.Window;
import org.apache.calcite.rel.hint.RelHint;
import org.apache.calcite.rel.metadata.RelMetadataQuery;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rex.RexLiteral;
import java.util.List;
/**
* {@link DruidLogicalNode} convention node for {@link Filter} plan node.
*/
public class DruidWindow extends Window implements DruidLogicalNode
{
public DruidWindow(RelOptCluster cluster, RelTraitSet traitSet, List<RelHint> hints, RelNode input,
List<RexLiteral> constants, RelDataType rowType, List<Group> groups)
{
super(cluster, traitSet, hints, input, constants, rowType, groups);
}
@Override
public RelNode copy(RelTraitSet traitSet, List<RelNode> inputs)
{
return new DruidWindow(getCluster(), traitSet, hints, inputs.get(0), constants, rowType, groups);
}
@Override
public RelOptCost computeSelfCost(RelOptPlanner planner, RelMetadataQuery mq)
{
return planner.getCostFactory().makeCost(mq.getRowCount(this), 0, 0);
}
}

View File

@ -22,6 +22,7 @@ package org.apache.druid.sql.calcite.rule.logical;
import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableList;
import org.apache.calcite.plan.Convention; import org.apache.calcite.plan.Convention;
import org.apache.calcite.plan.RelOptRule; import org.apache.calcite.plan.RelOptRule;
import org.apache.calcite.rel.core.Window;
import org.apache.calcite.rel.logical.LogicalAggregate; import org.apache.calcite.rel.logical.LogicalAggregate;
import org.apache.calcite.rel.logical.LogicalFilter; import org.apache.calcite.rel.logical.LogicalFilter;
import org.apache.calcite.rel.logical.LogicalProject; import org.apache.calcite.rel.logical.LogicalProject;
@ -83,6 +84,12 @@ public class DruidLogicalRules
Convention.NONE, Convention.NONE,
DruidLogicalConvention.instance(), DruidLogicalConvention.instance(),
DruidValuesRule.class.getSimpleName() DruidValuesRule.class.getSimpleName()
),
new DruidWindowRule(
Window.class,
Convention.NONE,
DruidLogicalConvention.instance(),
DruidWindowRule.class.getSimpleName()
) )
) )
); );

View File

@ -0,0 +1,62 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.sql.calcite.rule.logical;
import org.apache.calcite.plan.RelTrait;
import org.apache.calcite.plan.RelTraitSet;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.convert.ConverterRule;
import org.apache.calcite.rel.core.Window;
import org.apache.druid.sql.calcite.rel.logical.DruidLogicalConvention;
import org.apache.druid.sql.calcite.rel.logical.DruidWindow;
import org.checkerframework.checker.nullness.qual.Nullable;
public class DruidWindowRule extends ConverterRule
{
public DruidWindowRule(Class<? extends RelNode> clazz, RelTrait in, RelTrait out, String descriptionPrefix)
{
super(
Config.INSTANCE
.withConversion(clazz, in, out, descriptionPrefix)
);
}
@Override
public @Nullable RelNode convert(RelNode rel)
{
Window w = (Window) rel;
RelTraitSet newTrait = w.getTraitSet().replace(DruidLogicalConvention.instance());
return new DruidWindow(
w.getCluster(),
newTrait,
w.getHints(),
convert(
w.getInput(),
w.getInput().getTraitSet().replace(DruidLogicalConvention.instance())
),
w.getConstants(),
w.getRowType(),
w.groups
);
}
}

View File

@ -111,6 +111,7 @@ import org.apache.druid.segment.VirtualColumns;
import org.apache.druid.segment.column.ColumnType; import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.column.RowSignature; import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.segment.join.JoinType; import org.apache.druid.segment.join.JoinType;
import org.apache.druid.sql.calcite.DecoupledTestConfig.NativeQueryIgnore;
import org.apache.druid.sql.calcite.NotYetSupported.Modes; import org.apache.druid.sql.calcite.NotYetSupported.Modes;
import org.apache.druid.sql.calcite.expression.DruidExpression; import org.apache.druid.sql.calcite.expression.DruidExpression;
import org.apache.druid.sql.calcite.filtration.Filtration; import org.apache.druid.sql.calcite.filtration.Filtration;
@ -7605,7 +7606,6 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
); );
} }
@NotYetSupported(Modes.PLAN_MISMATCH)
@Test @Test
public void testExactCountDistinctUsingSubqueryWithWherePushDown() public void testExactCountDistinctUsingSubqueryWithWherePushDown()
{ {
@ -8159,6 +8159,7 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
); );
} }
@DecoupledTestConfig(nativeQueryIgnore = NativeQueryIgnore.AGG_COL_EXCHANGE)
@Test @Test
public void testGroupBySortPushDown() public void testGroupBySortPushDown()
{ {
@ -8254,6 +8255,7 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
); );
} }
@DecoupledTestConfig(nativeQueryIgnore = NativeQueryIgnore.IMPROVED_PLAN)
@Test @Test
public void testGroupByLimitPushdownExtraction() public void testGroupByLimitPushdownExtraction()
{ {
@ -8708,6 +8710,7 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
); );
} }
@DecoupledTestConfig(nativeQueryIgnore = NativeQueryIgnore.SLIGHTLY_WORSE_PLAN)
@SqlTestFrameworkConfig(numMergeBuffers = 3) @SqlTestFrameworkConfig(numMergeBuffers = 3)
@Test @Test
public void testQueryWithSelectProjectAndIdentityProjectDoesNotRename() public void testQueryWithSelectProjectAndIdentityProjectDoesNotRename()
@ -10168,7 +10171,6 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
); );
} }
@Test @Test
public void testGroupByExtractYear() public void testGroupByExtractYear()
{ {
@ -12618,6 +12620,7 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
); );
} }
@DecoupledTestConfig(nativeQueryIgnore = NativeQueryIgnore.EXPR_POSTAGG)
@Test @Test
public void testGroupByWithLiteralInSubqueryGrouping() public void testGroupByWithLiteralInSubqueryGrouping()
{ {
@ -12806,6 +12809,7 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
); );
} }
@DecoupledTestConfig(nativeQueryIgnore = NativeQueryIgnore.EXPR_POSTAGG)
@Test @Test
public void testRepeatedIdenticalVirtualExpressionGrouping() public void testRepeatedIdenticalVirtualExpressionGrouping()
{ {
@ -14259,7 +14263,6 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
); );
} }
@NotYetSupported(Modes.PLAN_MISMATCH)
@Test @Test
public void testPlanWithInFilterLessThanInSubQueryThreshold() public void testPlanWithInFilterLessThanInSubQueryThreshold()
{ {
@ -14410,6 +14413,7 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
); );
} }
@DecoupledTestConfig(nativeQueryIgnore = NativeQueryIgnore.AGGREGATE_REMOVE_NOT_FIRED)
@Test @Test
public void testSubqueryTypeMismatchWithLiterals() public void testSubqueryTypeMismatchWithLiterals()
{ {
@ -15100,7 +15104,7 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
.run(); .run();
} }
@NotYetSupported(Modes.CANNOT_TRANSLATE) @DecoupledTestConfig(nativeQueryIgnore = NativeQueryIgnore.SLIGHTLY_WORSE_PLAN)
@Test @Test
public void testWindowingWithScanAndSort() public void testWindowingWithScanAndSort()
{ {
@ -15200,7 +15204,6 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
.run(); .run();
} }
@NotYetSupported(Modes.CANNOT_TRANSLATE)
@Test @Test
public void testWindowingWithOrderBy() public void testWindowingWithOrderBy()
{ {

View File

@ -35,22 +35,33 @@ public class DecoupledPlanningCalciteQueryTest extends CalciteQueryTest
private static final ImmutableMap<String, Object> CONTEXT_OVERRIDES = ImmutableMap.of( private static final ImmutableMap<String, Object> CONTEXT_OVERRIDES = ImmutableMap.of(
PlannerConfig.CTX_NATIVE_QUERY_SQL_PLANNING_MODE, PlannerConfig.NATIVE_QUERY_SQL_PLANNING_MODE_DECOUPLED, PlannerConfig.CTX_NATIVE_QUERY_SQL_PLANNING_MODE, PlannerConfig.NATIVE_QUERY_SQL_PLANNING_MODE_DECOUPLED,
QueryContexts.ENABLE_DEBUG, true); QueryContexts.ENABLE_DEBUG, true
);
@Override @Override
protected QueryTestBuilder testBuilder() protected QueryTestBuilder testBuilder()
{ {
return new QueryTestBuilder(
new CalciteTestConfig(CONTEXT_OVERRIDES) CalciteTestConfig testConfig = new CalciteTestConfig(CONTEXT_OVERRIDES)
{ {
@Override @Override
public SqlTestFramework.PlannerFixture plannerFixture(PlannerConfig plannerConfig, AuthConfig authConfig) public SqlTestFramework.PlannerFixture plannerFixture(PlannerConfig plannerConfig, AuthConfig authConfig)
{ {
plannerConfig = plannerConfig.withOverrides(CONTEXT_OVERRIDES); plannerConfig = plannerConfig.withOverrides(CONTEXT_OVERRIDES);
return queryFramework().plannerFixture(DecoupledPlanningCalciteQueryTest.this, plannerConfig, authConfig); return queryFramework().plannerFixture(DecoupledPlanningCalciteQueryTest.this, plannerConfig, authConfig);
} }
}) };
QueryTestBuilder builder = new QueryTestBuilder(testConfig)
.cannotVectorize(cannotVectorize) .cannotVectorize(cannotVectorize)
.skipVectorize(skipVectorize); .skipVectorize(skipVectorize);
DecoupledTestConfig decTestConfig = queryFrameworkRule.getDescription().getAnnotation(DecoupledTestConfig.class);
if (decTestConfig != null && decTestConfig.nativeQueryIgnore().isPresent()) {
builder.verifyNativeQueries(x -> false);
}
return builder;
} }
} }

View File

@ -0,0 +1,78 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.sql.calcite;
import org.apache.calcite.rel.rules.CoreRules;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
/**
* Specifies test case level matching customizations for decoupled mode.
*/
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.METHOD})
public @interface DecoupledTestConfig
{
/**
* Enables the framework to ignore native query differences.
*
* The value of this field should describe the root cause of the difference.
*/
NativeQueryIgnore nativeQueryIgnore() default NativeQueryIgnore.NONE;
enum NativeQueryIgnore
{
NONE,
/**
* Decoupled has moved virtualcolumn to postagg (improved plan)
* caused by: {@link CoreRules#AGGREGATE_ANY_PULL_UP_CONSTANTS}
*/
EXPR_POSTAGG,
/**
* Aggregate column order changes.
*
* dim1/dim2 exchange
*/
AGG_COL_EXCHANGE,
/**
* This happens when {@link CoreRules#AGGREGATE_REMOVE} gets supressed by {@link CoreRules#AGGREGATE_CASE_TO_FILTER}
*/
AGGREGATE_REMOVE_NOT_FIRED,
/**
* Improved plan
*
* Seen that it was induced by {@link CoreRules#AGGREGATE_ANY_PULL_UP_CONSTANTS}
*/
IMPROVED_PLAN,
/**
* Worse plan; may loose vectorization; but no extra queries
*/
SLIGHTLY_WORSE_PLAN;
public boolean isPresent()
{
return this != NONE;
}
};
}

View File

@ -81,6 +81,7 @@ public @interface SqlTestFrameworkConfig
private SqlTestFrameworkConfig config; private SqlTestFrameworkConfig config;
private ClassRule classRule; private ClassRule classRule;
private QueryComponentSupplier testHost; private QueryComponentSupplier testHost;
private Description description;
public MethodRule(ClassRule classRule, QueryComponentSupplier testHost) public MethodRule(ClassRule classRule, QueryComponentSupplier testHost)
{ {
@ -105,6 +106,7 @@ public @interface SqlTestFrameworkConfig
@Override @Override
public Statement apply(Statement base, Description description) public Statement apply(Statement base, Description description)
{ {
this.description = description;
config = description.getAnnotation(SqlTestFrameworkConfig.class); config = description.getAnnotation(SqlTestFrameworkConfig.class);
if (config == null) { if (config == null) {
config = defaultConfig(); config = defaultConfig();
@ -117,6 +119,11 @@ public @interface SqlTestFrameworkConfig
return getConfigurationInstance().framework; return getConfigurationInstance().framework;
} }
public Description getDescription()
{
return description;
}
private ConfigurationInstance getConfigurationInstance() private ConfigurationInstance getConfigurationInstance()
{ {
return classRule.configMap.computeIfAbsent(config, this::buildConfiguration); return classRule.configMap.computeIfAbsent(config, this::buildConfiguration);