Support Union in decoupled mode (#15870)

Zoltan Haindrich 2024-02-21 16:54:50 +01:00 committed by GitHub
parent 170d37f188
commit bcce0806d7
22 changed files with 749 additions and 449 deletions

View File

@@ -1262,6 +1262,12 @@
<version>${hamcrest.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.reflections</groupId>
<artifactId>reflections</artifactId>
<version>0.9.12</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>pl.pragmatists</groupId>
<artifactId>JUnitParams</artifactId>

View File

@@ -380,7 +380,6 @@
<dependency>
<groupId>org.reflections</groupId>
<artifactId>reflections</artifactId>
<version>0.9.12</version>
<scope>test</scope>
</dependency>
<dependency>

View File

@@ -262,6 +262,11 @@
<artifactId>jdbi</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.reflections</groupId>
<artifactId>reflections</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>

View File

@@ -248,20 +248,21 @@ public class CalciteRulesManager
),
Programs.sequence(
druidPreProgram,
buildBaseRuleSetProgram(plannerContext),
new LoggingProgram("After baseRuleSet program", isDebug),
buildDecoupledLogicalOptimizationProgram(plannerContext),
new LoggingProgram("After DecoupledLogicalOptimizationProgram program", isDebug),
Programs.ofRules(logicalConventionRuleSet(plannerContext)),
new LoggingProgram("After logical volcano planner program", isDebug)
)
);
}
private Program buildBaseRuleSetProgram(PlannerContext plannerContext)
private Program buildDecoupledLogicalOptimizationProgram(PlannerContext plannerContext)
{
final HepProgramBuilder builder = HepProgram.builder();
builder.addMatchLimit(CalciteRulesManager.HEP_DEFAULT_MATCH_LIMIT);
builder.addGroupBegin();
builder.addRuleCollection(baseRuleSet(plannerContext));
builder.addRuleInstance(CoreRules.UNION_MERGE);
builder.addGroupEnd();
return Programs.of(builder.build(), true, DefaultRelMetadataProvider.INSTANCE);
}
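
For reference, a minimal standalone sketch of the HEP program assembled above, reduced to the union-handling part (the real program also adds the rules from baseRuleSet(plannerContext) via addRuleCollection). CoreRules.UNION_MERGE is the Calcite rule that collapses nested set operations, rewriting Union(Union(a, b), c) into a single flat Union(a, b, c), which is what later lets DruidUnion map all branches onto one UnionDataSource:

import org.apache.calcite.plan.hep.HepProgram;
import org.apache.calcite.plan.hep.HepProgramBuilder;
import org.apache.calcite.rel.metadata.DefaultRelMetadataProvider;
import org.apache.calcite.rel.rules.CoreRules;
import org.apache.calcite.tools.Program;
import org.apache.calcite.tools.Programs;

final class UnionMergeProgramSketch
{
  // Mirrors buildDecoupledLogicalOptimizationProgram above, minus the base
  // rule collection: one HEP rule group whose extra member is UNION_MERGE.
  static Program build(int matchLimit)
  {
    final HepProgramBuilder builder = HepProgram.builder();
    builder.addMatchLimit(matchLimit);
    builder.addGroupBegin();
    builder.addRuleInstance(CoreRules.UNION_MERGE);
    builder.addGroupEnd();
    return Programs.of(builder.build(), true, DefaultRelMetadataProvider.INSTANCE);
  }
}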

View File

@@ -1,354 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.sql.calcite.planner;
import com.google.common.collect.ImmutableList;
import org.apache.calcite.plan.RelOptTable;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.RelShuttleImpl;
import org.apache.calcite.rel.core.Aggregate;
import org.apache.calcite.rel.core.Filter;
import org.apache.calcite.rel.core.Project;
import org.apache.calcite.rel.core.Sort;
import org.apache.calcite.rel.core.TableFunctionScan;
import org.apache.calcite.rel.core.TableScan;
import org.apache.calcite.rel.core.Window;
import org.apache.calcite.rel.logical.LogicalAggregate;
import org.apache.calcite.rel.logical.LogicalCorrelate;
import org.apache.calcite.rel.logical.LogicalExchange;
import org.apache.calcite.rel.logical.LogicalFilter;
import org.apache.calcite.rel.logical.LogicalIntersect;
import org.apache.calcite.rel.logical.LogicalJoin;
import org.apache.calcite.rel.logical.LogicalMatch;
import org.apache.calcite.rel.logical.LogicalMinus;
import org.apache.calcite.rel.logical.LogicalProject;
import org.apache.calcite.rel.logical.LogicalSort;
import org.apache.calcite.rel.logical.LogicalUnion;
import org.apache.calcite.rel.logical.LogicalValues;
import org.apache.calcite.rex.RexLiteral;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.UOE;
import org.apache.druid.query.InlineDataSource;
import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.sql.calcite.rel.PartialDruidQuery;
import org.apache.druid.sql.calcite.rel.logical.DruidTableScan;
import org.apache.druid.sql.calcite.rule.DruidLogicalValuesRule;
import org.apache.druid.sql.calcite.table.DruidTable;
import org.apache.druid.sql.calcite.table.InlineTable;
import org.apache.druid.sql.calcite.table.RowSignatures;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
/**
* Converts a DAG of {@link org.apache.druid.sql.calcite.rel.logical.DruidLogicalNode} convention to a native
* Druid query for execution. The conversion is done via a {@link org.apache.calcite.rel.RelShuttle} visitor
* implementation.
*/
public class DruidQueryGenerator extends RelShuttleImpl
{
private final List<PartialDruidQuery> queryList = new ArrayList<>();
private final List<DruidTable> queryTables = new ArrayList<>();
private final PlannerContext plannerContext;
private PartialDruidQuery partialDruidQuery;
private PartialDruidQuery.Stage currentStage = null;
private DruidTable currentTable = null;
private boolean isRoot = true;
public DruidQueryGenerator(PlannerContext plannerContext)
{
this.plannerContext = plannerContext;
}
@Override
public RelNode visit(TableScan scan)
{
if (!(scan instanceof DruidTableScan)) {
throw new ISE("Planning hasn't converted logical table scan to druid convention");
}
DruidTableScan druidTableScan = (DruidTableScan) scan;
isRoot = false;
RelNode result = super.visit(scan);
partialDruidQuery = PartialDruidQuery.create(scan);
currentStage = PartialDruidQuery.Stage.SCAN;
final RelOptTable table = scan.getTable();
final DruidTable druidTable = table.unwrap(DruidTable.class);
if (druidTable != null) {
currentTable = druidTable;
}
if (druidTableScan.getProject() != null) {
partialDruidQuery = partialDruidQuery.withSelectProject(druidTableScan.getProject());
currentStage = PartialDruidQuery.Stage.SELECT_PROJECT;
}
return result;
}
@Override
public RelNode visit(TableFunctionScan scan)
{
return null;
}
@Override
public RelNode visit(LogicalValues values)
{
isRoot = false;
RelNode result = super.visit(values);
final List<ImmutableList<RexLiteral>> tuples = values.getTuples();
final List<Object[]> objectTuples = tuples
.stream()
.map(tuple -> tuple
.stream()
.map(v -> DruidLogicalValuesRule.getValueFromLiteral(v, plannerContext))
.collect(Collectors.toList())
.toArray(new Object[0])
)
.collect(Collectors.toList());
RowSignature rowSignature = RowSignatures.fromRelDataType(
values.getRowType().getFieldNames(),
values.getRowType()
);
currentTable = new InlineTable(InlineDataSource.fromIterable(objectTuples, rowSignature));
if (currentStage == null) {
partialDruidQuery = PartialDruidQuery.create(values);
currentStage = PartialDruidQuery.Stage.SCAN;
} else {
throw new ISE("Values node found at non leaf node in the plan");
}
return result;
}
@Override
public RelNode visit(LogicalFilter filter)
{
return visitFilter(filter);
}
public RelNode visitFilter(Filter filter)
{
isRoot = false;
RelNode result = super.visit(filter);
if (currentStage == PartialDruidQuery.Stage.AGGREGATE) {
partialDruidQuery = partialDruidQuery.withHavingFilter(filter);
currentStage = PartialDruidQuery.Stage.HAVING_FILTER;
} else if (currentStage == PartialDruidQuery.Stage.SCAN) {
partialDruidQuery = partialDruidQuery.withWhereFilter(filter);
currentStage = PartialDruidQuery.Stage.WHERE_FILTER;
} else if (currentStage == PartialDruidQuery.Stage.SELECT_PROJECT) {
PartialDruidQuery old = partialDruidQuery;
partialDruidQuery = PartialDruidQuery.create(old.getScan());
partialDruidQuery = partialDruidQuery.withWhereFilter(filter);
partialDruidQuery = partialDruidQuery.withSelectProject(old.getSelectProject());
currentStage = PartialDruidQuery.Stage.SELECT_PROJECT;
} else {
queryList.add(partialDruidQuery);
queryTables.add(currentTable);
partialDruidQuery = PartialDruidQuery.createOuterQuery(partialDruidQuery).withWhereFilter(filter);
currentStage = PartialDruidQuery.Stage.WHERE_FILTER;
}
return result;
}
@Override
public RelNode visit(LogicalProject project)
{
return visitProject(project);
}
@Override
public RelNode visit(LogicalJoin join)
{
throw new UnsupportedOperationException("Found join");
}
@Override
public RelNode visit(LogicalCorrelate correlate)
{
return null;
}
@Override
public RelNode visit(LogicalUnion union)
{
throw new UnsupportedOperationException("Found union");
}
@Override
public RelNode visit(LogicalIntersect intersect)
{
return null;
}
@Override
public RelNode visit(LogicalMinus minus)
{
return null;
}
@Override
public RelNode visit(LogicalAggregate aggregate)
{
isRoot = false;
RelNode result = super.visit(aggregate);
if (PartialDruidQuery.Stage.AGGREGATE.canFollow(currentStage)) {
partialDruidQuery = partialDruidQuery.withAggregate(aggregate);
} else {
queryList.add(partialDruidQuery);
queryTables.add(currentTable);
partialDruidQuery = PartialDruidQuery.createOuterQuery(partialDruidQuery).withAggregate(aggregate);
}
currentStage = PartialDruidQuery.Stage.AGGREGATE;
return result;
}
@Override
public RelNode visit(LogicalMatch match)
{
return null;
}
@Override
public RelNode visit(LogicalSort sort)
{
return visitSort(sort);
}
@Override
public RelNode visit(LogicalExchange exchange)
{
return null;
}
private RelNode visitProject(Project project)
{
boolean rootForReal = isRoot;
isRoot = false;
RelNode result = super.visit(project);
if (rootForReal && (currentStage == PartialDruidQuery.Stage.AGGREGATE
|| currentStage == PartialDruidQuery.Stage.HAVING_FILTER)) {
partialDruidQuery = partialDruidQuery.withAggregateProject(project);
currentStage = PartialDruidQuery.Stage.AGGREGATE_PROJECT;
} else if (currentStage == PartialDruidQuery.Stage.SCAN || currentStage == PartialDruidQuery.Stage.WHERE_FILTER) {
partialDruidQuery = partialDruidQuery.withSelectProject(project);
currentStage = PartialDruidQuery.Stage.SELECT_PROJECT;
} else if (currentStage == PartialDruidQuery.Stage.SELECT_PROJECT) {
partialDruidQuery = partialDruidQuery.mergeProject(project);
currentStage = PartialDruidQuery.Stage.SELECT_PROJECT;
} else if (currentStage == PartialDruidQuery.Stage.SORT) {
partialDruidQuery = partialDruidQuery.withSortProject(project);
currentStage = PartialDruidQuery.Stage.SORT_PROJECT;
} else {
queryList.add(partialDruidQuery);
queryTables.add(currentTable);
partialDruidQuery = PartialDruidQuery.createOuterQuery(partialDruidQuery).withSelectProject(project);
currentStage = PartialDruidQuery.Stage.SELECT_PROJECT;
}
return result;
}
private RelNode visitSort(Sort sort)
{
isRoot = false;
RelNode result = super.visit(sort);
if (PartialDruidQuery.Stage.SORT.canFollow(currentStage)) {
partialDruidQuery = partialDruidQuery.withSort(sort);
} else {
queryList.add(partialDruidQuery);
queryTables.add(currentTable);
partialDruidQuery = PartialDruidQuery.createOuterQuery(partialDruidQuery).withSort(sort);
}
currentStage = PartialDruidQuery.Stage.SORT;
return result;
}
private RelNode visitAggregate(Aggregate aggregate)
{
isRoot = false;
RelNode result = super.visit(aggregate);
if (PartialDruidQuery.Stage.AGGREGATE.canFollow(currentStage)) {
partialDruidQuery = partialDruidQuery.withAggregate(aggregate);
} else {
queryList.add(partialDruidQuery);
queryTables.add(currentTable);
partialDruidQuery = PartialDruidQuery.createOuterQuery(partialDruidQuery).withAggregate(aggregate);
}
currentStage = PartialDruidQuery.Stage.AGGREGATE;
return result;
}
@Override
public RelNode visit(RelNode other)
{
if (other instanceof TableScan) {
return visit((TableScan) other);
} else if (other instanceof Project) {
return visitProject((Project) other);
} else if (other instanceof Sort) {
return visitSort((Sort) other);
} else if (other instanceof Aggregate) {
return visitAggregate((Aggregate) other);
} else if (other instanceof Filter) {
return visitFilter((Filter) other);
} else if (other instanceof LogicalValues) {
return visit((LogicalValues) other);
} else if (other instanceof Window) {
return visitWindow((Window) other);
}
throw new UOE("Found unsupported RelNode [%s]", other.getClass().getSimpleName());
}
private RelNode visitWindow(Window other)
{
RelNode result = super.visit(other);
if (!PartialDruidQuery.Stage.WINDOW.canFollow(currentStage)) {
queryList.add(partialDruidQuery);
queryTables.add(currentTable);
partialDruidQuery = PartialDruidQuery.createOuterQuery(partialDruidQuery);
}
partialDruidQuery = partialDruidQuery.withWindow((Window) result);
currentStage = PartialDruidQuery.Stage.WINDOW;
return result;
}
public PartialDruidQuery getPartialDruidQuery()
{
return partialDruidQuery;
}
public List<PartialDruidQuery> getQueryList()
{
return queryList;
}
public List<DruidTable> getQueryTables()
{
return queryTables;
}
public DruidTable getCurrentTable()
{
return currentTable;
}
}

View File

@@ -61,11 +61,11 @@ import org.apache.druid.java.util.common.guava.BaseSequence;
import org.apache.druid.java.util.common.guava.Sequences;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.query.Query;
import org.apache.druid.query.QueryDataSource;
import org.apache.druid.server.QueryResponse;
import org.apache.druid.server.security.Action;
import org.apache.druid.server.security.Resource;
import org.apache.druid.server.security.ResourceAction;
import org.apache.druid.sql.calcite.planner.querygen.DruidQueryGenerator;
import org.apache.druid.sql.calcite.rel.DruidConvention;
import org.apache.druid.sql.calcite.rel.DruidQuery;
import org.apache.druid.sql.calcite.rel.DruidRel;
@@ -241,7 +241,7 @@ public abstract class QueryHandler extends SqlStatementHandler.BaseStatementHand
}
// Exceptions during rule evaluations could be wrapped inside a RuntimeException by VolcanoRuleCall class.
// This block will extract a user-friendly message from the exception chain.
if (e.getMessage() != null
&& e.getCause() != null
&& e.getCause().getMessage() != null
@@ -559,36 +559,14 @@ public abstract class QueryHandler extends SqlStatementHandler.BaseStatementHand
.plus(DruidLogicalConvention.instance()),
newRoot
);
DruidQueryGenerator shuttle = new DruidQueryGenerator(plannerContext);
newRoot.accept(shuttle);
log.info("PartialDruidQuery : " + shuttle.getPartialDruidQuery());
shuttle.getQueryList().add(shuttle.getPartialDruidQuery()); // add topmost query to the list
shuttle.getQueryTables().add(shuttle.getCurrentTable());
assert !shuttle.getQueryList().isEmpty();
log.info("query list size " + shuttle.getQueryList().size());
log.info("query tables size " + shuttle.getQueryTables().size());
// build bottom-most query
DruidQuery baseQuery = shuttle.getQueryList().get(0).build(
shuttle.getQueryTables().get(0).getDataSource(),
shuttle.getQueryTables().get(0).getRowSignature(),
plannerContext,
rexBuilder,
shuttle.getQueryList().size() != 1,
null
);
// build outer queries
for (int i = 1; i < shuttle.getQueryList().size(); i++) {
baseQuery = shuttle.getQueryList().get(i).build(
new QueryDataSource(baseQuery.getQuery()),
baseQuery.getOutputRowSignature(),
plannerContext,
rexBuilder,
false
);
}
DruidQueryGenerator generator = new DruidQueryGenerator(plannerContext, newRoot, rexBuilder);
DruidQuery baseQuery = generator.buildQuery();
try {
log.info("final query : " +
new DefaultObjectMapper().writerWithDefaultPrettyPrinter().writeValueAsString(baseQuery.getQuery()));
log.info(
"final query : " +
new DefaultObjectMapper().writerWithDefaultPrettyPrinter().writeValueAsString(baseQuery.getQuery())
);
}
catch (JsonProcessingException e) {
throw new RuntimeException(e);

View File

@@ -0,0 +1,286 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.sql.calcite.planner.querygen;
import com.google.common.collect.ImmutableList;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.core.Aggregate;
import org.apache.calcite.rel.core.Filter;
import org.apache.calcite.rel.core.Project;
import org.apache.calcite.rel.core.Sort;
import org.apache.calcite.rel.core.Window;
import org.apache.calcite.rex.RexBuilder;
import org.apache.druid.error.DruidException;
import org.apache.druid.query.QueryDataSource;
import org.apache.druid.sql.calcite.planner.PlannerContext;
import org.apache.druid.sql.calcite.planner.querygen.DruidQueryGenerator.PDQVertexFactory.PDQVertex;
import org.apache.druid.sql.calcite.planner.querygen.InputDescProducer.InputDesc;
import org.apache.druid.sql.calcite.rel.DruidQuery;
import org.apache.druid.sql.calcite.rel.PartialDruidQuery;
import org.apache.druid.sql.calcite.rel.PartialDruidQuery.Stage;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
/**
* Converts a DAG of {@link org.apache.druid.sql.calcite.rel.logical.DruidLogicalNode} convention to a native
* {@link DruidQuery} for execution.
*/
public class DruidQueryGenerator
{
private final RelNode relRoot;
private final PDQVertexFactory vertexFactory;
public DruidQueryGenerator(PlannerContext plannerContext, RelNode relRoot, RexBuilder rexBuilder)
{
this.relRoot = relRoot;
this.vertexFactory = new PDQVertexFactory(plannerContext, rexBuilder);
}
public DruidQuery buildQuery()
{
Vertex vertex = buildVertexFor(relRoot, true);
return vertex.buildQuery(true);
}
private Vertex buildVertexFor(RelNode node, boolean isRoot)
{
List<Vertex> newInputs = new ArrayList<>();
for (RelNode input : node.getInputs()) {
newInputs.add(buildVertexFor(input, false));
}
Vertex vertex = processNodeWithInputs(node, newInputs, isRoot);
return vertex;
}
private Vertex processNodeWithInputs(RelNode node, List<Vertex> newInputs, boolean isRoot)
{
if (node instanceof InputDescProducer) {
return vertexFactory.createVertex(PartialDruidQuery.create(node), newInputs);
}
if (newInputs.size() == 1) {
Vertex inputVertex = newInputs.get(0);
Optional<Vertex> newVertex = inputVertex.extendWith(node, isRoot);
if (newVertex.isPresent()) {
return newVertex.get();
}
inputVertex = vertexFactory.createVertex(
PartialDruidQuery.createOuterQuery(((PDQVertex) inputVertex).partialDruidQuery),
ImmutableList.of(inputVertex)
);
newVertex = inputVertex.extendWith(node, false);
if (newVertex.isPresent()) {
return newVertex.get();
}
}
throw DruidException.defensive().build("Unable to process relNode[%s]", node);
}
/**
* Execution dag vertex - encapsulates a list of operators.
*/
private interface Vertex
{
/**
* Builds the query.
*/
DruidQuery buildQuery(boolean isRoot);
/**
* Extends the current vertex to include the specified parent.
*/
Optional<Vertex> extendWith(RelNode parentNode, boolean isRoot);
/**
* Decides whether this {@link Vertex} can be unwrapped into an {@link InputDesc}.
*/
boolean canUnwrapInput();
/**
* Unwraps this {@link Vertex} into an {@link InputDesc}.
*
* Unwraps the input of this vertex, which is only possible if the vertex does nothing beyond reading its input.
*
* @throws DruidException if unwrap is not possible.
*/
InputDesc unwrapInputDesc();
}
/**
* {@link PartialDruidQuery} based {@link Vertex} factory.
*/
protected static class PDQVertexFactory
{
private final PlannerContext plannerContext;
private final RexBuilder rexBuilder;
public PDQVertexFactory(PlannerContext plannerContext, RexBuilder rexBuilder)
{
this.plannerContext = plannerContext;
this.rexBuilder = rexBuilder;
}
Vertex createVertex(PartialDruidQuery partialDruidQuery, List<Vertex> inputs)
{
return new PDQVertex(partialDruidQuery, inputs);
}
public class PDQVertex implements Vertex
{
final PartialDruidQuery partialDruidQuery;
final List<Vertex> inputs;
public PDQVertex(PartialDruidQuery partialDruidQuery, List<Vertex> inputs)
{
this.partialDruidQuery = partialDruidQuery;
this.inputs = inputs;
}
@Override
public DruidQuery buildQuery(boolean topLevel)
{
InputDesc input = getInput();
return partialDruidQuery.build(
input.dataSource,
input.rowSignature,
plannerContext,
rexBuilder,
!topLevel
);
}
/**
* Creates the {@link InputDesc} for the current {@link Vertex}.
*/
private InputDesc getInput()
{
List<InputDesc> inputDescs = new ArrayList<>();
for (Vertex inputVertex : inputs) {
final InputDesc desc;
if (inputVertex.canUnwrapInput()) {
desc = inputVertex.unwrapInputDesc();
} else {
DruidQuery inputQuery = inputVertex.buildQuery(false);
desc = new InputDesc(new QueryDataSource(inputQuery.getQuery()), inputQuery.getOutputRowSignature());
}
inputDescs.add(desc);
}
RelNode scan = partialDruidQuery.getScan();
if (scan instanceof InputDescProducer) {
InputDescProducer inp = (InputDescProducer) scan;
return inp.getInputDesc(plannerContext, inputDescs);
}
if (inputs.size() == 1) {
return inputDescs.get(0);
}
throw DruidException.defensive("Unable to create InputDesc for Operator [%s]", scan);
}
/**
* Extends the current partial query with the new parent if possible.
*/
@Override
public Optional<Vertex> extendWith(RelNode parentNode, boolean isRoot)
{
Optional<PartialDruidQuery> newPartialQuery = extendPartialDruidQuery(parentNode, isRoot);
if (!newPartialQuery.isPresent()) {
return Optional.empty();
}
return Optional.of(createVertex(newPartialQuery.get(), inputs));
}
/**
* Merges the given {@link RelNode} into the current {@link PartialDruidQuery}.
*/
private Optional<PartialDruidQuery> extendPartialDruidQuery(RelNode parentNode, boolean isRoot)
{
if (accepts(parentNode, Stage.WHERE_FILTER, Filter.class)) {
PartialDruidQuery newPartialQuery = partialDruidQuery.withWhereFilter((Filter) parentNode);
return Optional.of(newPartialQuery);
}
if (accepts(parentNode, Stage.SELECT_PROJECT, Project.class)) {
PartialDruidQuery newPartialQuery = partialDruidQuery.withSelectProject((Project) parentNode);
return Optional.of(newPartialQuery);
}
if (accepts(parentNode, Stage.AGGREGATE, Aggregate.class)) {
PartialDruidQuery newPartialQuery = partialDruidQuery.withAggregate((Aggregate) parentNode);
return Optional.of(newPartialQuery);
}
if (accepts(parentNode, Stage.AGGREGATE_PROJECT, Project.class) && isRoot) {
PartialDruidQuery newPartialQuery = partialDruidQuery.withAggregateProject((Project) parentNode);
return Optional.of(newPartialQuery);
}
if (accepts(parentNode, Stage.HAVING_FILTER, Filter.class)) {
PartialDruidQuery newPartialQuery = partialDruidQuery.withHavingFilter((Filter) parentNode);
return Optional.of(newPartialQuery);
}
if (accepts(parentNode, Stage.SORT, Sort.class)) {
PartialDruidQuery newPartialQuery = partialDruidQuery.withSort((Sort) parentNode);
return Optional.of(newPartialQuery);
}
if (accepts(parentNode, Stage.SORT_PROJECT, Project.class)) {
PartialDruidQuery newPartialQuery = partialDruidQuery.withSortProject((Project) parentNode);
return Optional.of(newPartialQuery);
}
if (accepts(parentNode, Stage.WINDOW, Window.class)) {
PartialDruidQuery newPartialQuery = partialDruidQuery.withWindow((Window) parentNode);
return Optional.of(newPartialQuery);
}
if (accepts(parentNode, Stage.WINDOW_PROJECT, Project.class)) {
PartialDruidQuery newPartialQuery = partialDruidQuery.withWindowProject((Project) parentNode);
return Optional.of(newPartialQuery);
}
return Optional.empty();
}
private boolean accepts(RelNode node, Stage stage, Class<? extends RelNode> clazz)
{
return partialDruidQuery.canAccept(stage) && clazz.isInstance(node);
}
@Override
public InputDesc unwrapInputDesc()
{
if (canUnwrapInput()) {
DruidQuery q = buildQuery(false);
InputDesc origInput = getInput();
return new InputDesc(origInput.dataSource, q.getOutputRowSignature());
}
throw DruidException.defensive("Can't unwrap input of vertex[%s]", partialDruidQuery);
}
@Override
public boolean canUnwrapInput()
{
if (partialDruidQuery.stage() == Stage.SCAN) {
return true;
}
if (partialDruidQuery.stage() == PartialDruidQuery.Stage.SELECT_PROJECT &&
partialDruidQuery.getWhereFilter() == null &&
partialDruidQuery.getSelectProject().isMapping()) {
return true;
}
return false;
}
}
}
}
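
The control flow above is a bottom-up "extend or wrap" recursion: buildVertexFor visits children first, and processNodeWithInputs either absorbs the parent operator into the child's PartialDruidQuery (extendWith) or forces the child to become a subquery via createOuterQuery and retries exactly once. A self-contained toy sketch of just that pattern, with all names hypothetical and the stage check reduced to a string test:

import java.util.Optional;

final class ExtendOrWrapSketch
{
  // Try to absorb an operator into the current partial query; if the stage
  // ordering forbids it, wrap the query as a subquery ("outer query") and
  // retry once, mirroring processNodeWithInputs above.
  static String absorb(String partialQuery, String op)
  {
    Optional<String> extended = tryExtend(partialQuery, op);
    if (extended.isPresent()) {
      return extended.get();
    }
    String outer = "outer(" + partialQuery + ")";   // createOuterQuery stand-in
    return tryExtend(outer, op)
        .orElseThrow(() -> new IllegalStateException("Unable to process " + op));
  }

  // Stand-in for extendWith/canAccept: here an operator may not follow
  // itself, which is enough to exercise the wrap path.
  static Optional<String> tryExtend(String query, String op)
  {
    return query.startsWith(op) ? Optional.empty() : Optional.of(op + "(" + query + ")");
  }
}

// absorb("scan", "filter")         -> "filter(scan)"
// absorb("filter(scan)", "filter") -> "filter(outer(filter(scan)))"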

View File

@@ -0,0 +1,53 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.sql.calcite.planner.querygen;
import org.apache.druid.query.DataSource;
import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.sql.calcite.planner.PlannerContext;
import java.util.List;
/**
* Abstracts away the non-trivial handling of input operations over {@link DataSource}s.
*
* Examples: TableScan; Union; Join.
*/
public interface InputDescProducer
{
/**
* Utility class holding input-related details.
*
* The main reason to have this is that {@link DataSource} doesn't contain the {@link RowSignature}.
*/
class InputDesc
{
public DataSource dataSource;
public RowSignature rowSignature;
public InputDesc(DataSource dataSource, RowSignature rowSignature)
{
this.dataSource = dataSource;
this.rowSignature = rowSignature;
}
}
InputDesc getInputDesc(PlannerContext plannerContext, List<InputDesc> inputs);
}
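
To make the contract concrete: a hypothetical implementor (PassThroughProducer is illustrative, not part of this commit) that does nothing beyond forwarding its single input could return that input's description unchanged. The real producers in this commit, DruidTableScan, DruidValues and DruidUnion, each construct a DataSource appropriate to the node:

class PassThroughProducer implements InputDescProducer
{
  @Override
  public InputDesc getInputDesc(PlannerContext plannerContext, List<InputDesc> inputs)
  {
    // Pure pass-through: reuse the single input's DataSource and RowSignature
    // as this node's own input description.
    return inputs.get(0);
  }
}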

View File

@@ -19,6 +19,7 @@
package org.apache.druid.sql.calcite.rel.logical;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import org.apache.calcite.plan.RelOptCluster;
import org.apache.calcite.plan.RelOptCost;
@@ -28,37 +29,35 @@ import org.apache.calcite.plan.RelTraitSet;
import org.apache.calcite.rel.RelCollationTraitDef;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.RelWriter;
import org.apache.calcite.rel.core.Project;
import org.apache.calcite.rel.core.TableScan;
import org.apache.calcite.rel.metadata.RelMetadataQuery;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.schema.Table;
import org.apache.druid.sql.calcite.planner.PlannerContext;
import org.apache.druid.sql.calcite.planner.querygen.InputDescProducer;
import org.apache.druid.sql.calcite.table.DruidTable;
import java.util.List;
/**
* {@link DruidLogicalNode} convention node for {@link TableScan} plan node.
*/
public class DruidTableScan extends TableScan implements DruidLogicalNode
public class DruidTableScan extends TableScan implements DruidLogicalNode, InputDescProducer
{
private final Project project;
public DruidTableScan(
RelOptCluster cluster,
RelTraitSet traitSet,
RelOptTable table,
Project project
RelOptTable table
)
{
super(cluster, traitSet, table);
this.project = project;
assert getConvention() instanceof DruidLogicalConvention;
}
@Override
public RelNode copy(RelTraitSet traitSet, List<RelNode> inputs)
{
return new DruidTableScan(getCluster(), traitSet, table, project);
return new DruidTableScan(getCluster(), traitSet, table);
}
@Override
@@ -76,26 +75,15 @@ public class DruidTableScan extends TableScan implements DruidLogicalNode
@Override
public RelWriter explainTerms(RelWriter pw)
{
if (project != null) {
project.explainTerms(pw);
}
return super.explainTerms(pw).item("druid", "logical");
}
@Override
public RelDataType deriveRowType()
{
if (project != null) {
return project.getRowType();
}
return super.deriveRowType();
}
public Project getProject()
{
return project;
}
public static DruidTableScan create(RelOptCluster cluster, final RelOptTable relOptTable)
{
final Table table = relOptTable.unwrap(Table.class);
@@ -106,6 +94,21 @@ public class DruidTableScan extends TableScan implements DruidLogicalNode
}
return ImmutableList.of();
});
return new DruidTableScan(cluster, traitSet, relOptTable, null);
return new DruidTableScan(cluster, traitSet, relOptTable);
}
@Override
public InputDesc getInputDesc(PlannerContext plannerContext, List<InputDesc> inputs)
{
final DruidTable druidTable = getDruidTable();
return new InputDesc(druidTable.getDataSource(), druidTable.getRowSignature());
}
private DruidTable getDruidTable()
{
final RelOptTable table = getTable();
final DruidTable druidTable = table.unwrap(DruidTable.class);
Preconditions.checkNotNull(druidTable, "DruidTable may not be null");
return druidTable;
}
}

View File

@@ -0,0 +1,96 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.sql.calcite.rel.logical;
import org.apache.calcite.plan.RelOptCluster;
import org.apache.calcite.plan.RelOptCost;
import org.apache.calcite.plan.RelOptPlanner;
import org.apache.calcite.plan.RelTraitSet;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.core.SetOp;
import org.apache.calcite.rel.core.Union;
import org.apache.calcite.rel.hint.RelHint;
import org.apache.calcite.rel.metadata.RelMetadataQuery;
import org.apache.druid.error.DruidException;
import org.apache.druid.query.DataSource;
import org.apache.druid.query.InlineDataSource;
import org.apache.druid.query.TableDataSource;
import org.apache.druid.query.UnionDataSource;
import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.sql.calcite.planner.PlannerContext;
import org.apache.druid.sql.calcite.planner.querygen.InputDescProducer;
import java.util.ArrayList;
import java.util.List;
public class DruidUnion extends Union implements DruidLogicalNode, InputDescProducer
{
public DruidUnion(
RelOptCluster cluster,
RelTraitSet traits,
List<RelHint> hints,
List<RelNode> inputs,
boolean all)
{
super(cluster, traits, hints, inputs, all);
}
@Override
public SetOp copy(RelTraitSet traitSet, List<RelNode> inputs, boolean all)
{
return new DruidUnion(getCluster(), traitSet, hints, inputs, all);
}
@Override
public RelOptCost computeSelfCost(RelOptPlanner planner, RelMetadataQuery mq)
{
return planner.getCostFactory().makeCost(mq.getRowCount(this), 0, 0);
}
@Override
public InputDesc getInputDesc(PlannerContext plannerContext, List<InputDesc> inputs)
{
List<DataSource> dataSources = new ArrayList<>();
RowSignature signature = null;
for (InputDesc inputDesc : inputs) {
checkDataSourceSupported(inputDesc.dataSource);
dataSources.add(inputDesc.dataSource);
if (signature == null) {
signature = inputDesc.rowSignature;
} else {
if (!signature.equals(inputDesc.rowSignature)) {
throw DruidException.defensive(
"Row signature mismatch in Union inputs [%s] and [%s]",
signature,
inputDesc.rowSignature
);
}
}
}
return new InputDesc(new UnionDataSource(dataSources), signature);
}
private void checkDataSourceSupported(DataSource dataSource)
{
if (dataSource instanceof TableDataSource || dataSource instanceof InlineDataSource) {
return;
}
throw DruidException.defensive("Only Table and Values are supported as inputs for Union [%s]", dataSource);
}
}
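
Sketch of the outcome for a plain UNION ALL over two tables, assuming both branches unwrap to datasources with identical row signatures (table names are illustrative): getInputDesc assembles the branches into a single native UnionDataSource, as exercised by the constructor call above:

List<DataSource> children = new ArrayList<>();
children.add(new TableDataSource("foo"));      // first UNION ALL branch
children.add(new TableDataSource("numfoo"));   // second branch, same row signature
DataSource unionInput = new UnionDataSource(children);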

View File

@@ -29,22 +29,31 @@ import org.apache.calcite.rel.logical.LogicalValues;
import org.apache.calcite.rel.metadata.RelMetadataQuery;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rex.RexLiteral;
import org.apache.druid.query.InlineDataSource;
import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.sql.calcite.planner.PlannerContext;
import org.apache.druid.sql.calcite.planner.querygen.InputDescProducer;
import org.apache.druid.sql.calcite.rel.CostEstimates;
import org.apache.druid.sql.calcite.rule.DruidLogicalValuesRule;
import org.apache.druid.sql.calcite.table.InlineTable;
import org.apache.druid.sql.calcite.table.RowSignatures;
import java.util.List;
import java.util.stream.Collectors;
/**
* {@link DruidLogicalNode} convention node for {@link LogicalValues} plan node.
*/
public class DruidValues extends LogicalValues implements DruidLogicalNode
public class DruidValues extends LogicalValues implements DruidLogicalNode, InputDescProducer
{
private InlineTable inlineTable;
public DruidValues(
RelOptCluster cluster,
RelTraitSet traitSet,
RelDataType rowType,
ImmutableList<ImmutableList<RexLiteral>> tuples
)
ImmutableList<ImmutableList<RexLiteral>> tuples)
{
super(cluster, traitSet, rowType, tuples);
assert getConvention() instanceof DruidLogicalConvention;
@@ -61,4 +70,36 @@ public class DruidValues extends LogicalValues implements DruidLogicalNode
{
return planner.getCostFactory().makeCost(CostEstimates.COST_BASE, 0, 0);
}
@Override
public InputDesc getInputDesc(PlannerContext plannerContext, List<InputDesc> inputs)
{
if (inlineTable == null) {
inlineTable = buildInlineTable(plannerContext);
}
return new InputDesc(inlineTable.getDataSource(), inlineTable.getRowSignature());
}
private InlineTable buildInlineTable(PlannerContext plannerContext)
{
final List<ImmutableList<RexLiteral>> tuples = getTuples();
final List<Object[]> objectTuples = tuples
.stream()
.map(
tuple -> tuple
.stream()
.map(v -> DruidLogicalValuesRule.getValueFromLiteral(v, plannerContext))
.collect(Collectors.toList())
.toArray(new Object[0])
)
.collect(Collectors.toList());
RowSignature rowSignature = RowSignatures.fromRelDataType(
getRowType().getFieldNames(),
getRowType()
);
InlineTable inlineTable = new InlineTable(InlineDataSource.fromIterable(objectTuples, rowSignature));
return inlineTable;
}
}

View File

@@ -34,7 +34,7 @@ public class DruidSortUnionRule extends RelOptRule
{
private static final DruidSortUnionRule INSTANCE = new DruidSortUnionRule();
private DruidSortUnionRule()
{
super(operand(Sort.class, operand(DruidUnionRel.class, any())));

View File

@@ -28,8 +28,8 @@ import org.apache.calcite.rel.logical.LogicalFilter;
import org.apache.calcite.rel.logical.LogicalProject;
import org.apache.calcite.rel.logical.LogicalSort;
import org.apache.calcite.rel.logical.LogicalTableScan;
import org.apache.calcite.rel.logical.LogicalUnion;
import org.apache.calcite.rel.logical.LogicalValues;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.sql.calcite.planner.PlannerContext;
import org.apache.druid.sql.calcite.rel.logical.DruidLogicalConvention;
@@ -51,8 +51,10 @@ public class DruidLogicalRules
return new ArrayList<>(
ImmutableList.of(
new DruidTableScanRule(
RelOptRule.operand(LogicalTableScan.class, null, RelOptRule.any()),
StringUtils.format("%s", DruidTableScanRule.class.getSimpleName())
LogicalTableScan.class,
Convention.NONE,
DruidLogicalConvention.instance(),
DruidTableScanRule.class.getSimpleName()
),
new DruidAggregateRule(
LogicalAggregate.class,
@@ -90,6 +92,12 @@ public class DruidLogicalRules
Convention.NONE,
DruidLogicalConvention.instance(),
DruidWindowRule.class.getSimpleName()
),
new DruidUnionRule(
LogicalUnion.class,
Convention.NONE,
DruidLogicalConvention.instance(),
DruidUnionRule.class.getSimpleName()
)
)
);

View File

@@ -19,36 +19,38 @@
package org.apache.druid.sql.calcite.rule.logical;
import org.apache.calcite.plan.RelOptRule;
import org.apache.calcite.plan.RelOptRuleCall;
import org.apache.calcite.plan.RelOptRuleOperand;
import org.apache.calcite.plan.RelTrait;
import org.apache.calcite.plan.RelTraitSet;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.convert.ConverterRule;
import org.apache.calcite.rel.logical.LogicalTableScan;
import org.apache.druid.sql.calcite.rel.logical.DruidLogicalConvention;
import org.apache.druid.sql.calcite.rel.logical.DruidTableScan;
import org.checkerframework.checker.nullness.qual.Nullable;
/**
* {@link ConverterRule} to convert {@link org.apache.calcite.rel.core.TableScan} to {@link DruidTableScan}
*/
public class DruidTableScanRule extends RelOptRule
public class DruidTableScanRule extends ConverterRule
{
public DruidTableScanRule(RelOptRuleOperand operand, String description)
public DruidTableScanRule(Class<? extends RelNode> clazz, RelTrait in, RelTrait out, String descriptionPrefix)
{
super(operand, description);
super(
Config.INSTANCE
.withConversion(clazz, in, out, descriptionPrefix)
);
}
@Override
public void onMatch(RelOptRuleCall call)
public @Nullable RelNode convert(RelNode rel)
{
LogicalTableScan tableScan = call.rel(0);
LogicalTableScan tableScan = (LogicalTableScan) rel;
RelTraitSet newTrait = tableScan.getTraitSet().replace(DruidLogicalConvention.instance());
DruidTableScan druidTableScan = new DruidTableScan(
tableScan.getCluster(),
newTrait,
tableScan.getTable(),
null
tableScan.getTable()
);
call.transformTo(druidTableScan);
return druidTableScan;
}
}

View File

@@ -0,0 +1,60 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.sql.calcite.rule.logical;
import org.apache.calcite.plan.RelTrait;
import org.apache.calcite.plan.RelTraitSet;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.convert.ConverterRule;
import org.apache.calcite.rel.core.Union;
import org.apache.druid.sql.calcite.rel.logical.DruidLogicalConvention;
import org.apache.druid.sql.calcite.rel.logical.DruidUnion;
import org.checkerframework.checker.nullness.qual.Nullable;
public class DruidUnionRule extends ConverterRule
{
public DruidUnionRule(Class<? extends RelNode> clazz, RelTrait in, RelTrait out, String descriptionPrefix)
{
super(
Config.INSTANCE
.withConversion(clazz, in, out, descriptionPrefix)
);
}
@Override
public @Nullable RelNode convert(RelNode rel)
{
Union w = (Union) rel;
RelTraitSet newTrait = w.getTraitSet().replace(DruidLogicalConvention.instance());
return new DruidUnion(
w.getCluster(),
newTrait,
w.getHints(),
convertList(
w.getInputs(),
DruidLogicalConvention.instance()
),
w.all
);
}
}

View File

@@ -2939,7 +2939,7 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
);
}
@NotYetSupported(Modes.MISSING_UNION_CONVERSION)
@NotYetSupported(Modes.UNION_WITH_COMPLEX_OPERAND)
@Test
public void testUnionAllQueries()
{
@@ -2973,7 +2973,7 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
);
}
@NotYetSupported(Modes.MISSING_UNION_CONVERSION)
@NotYetSupported(Modes.UNION_WITH_COMPLEX_OPERAND)
@Test
public void testUnionAllQueriesWithLimit()
{
@@ -3413,7 +3413,6 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
* doesn't reset framework once the merge buffers
*/
@SqlTestFrameworkConfig(numMergeBuffers = 3)
@NotYetSupported(Modes.MISSING_UNION_CONVERSION)
@Test
public void testUnionAllSameTableThreeTimes()
{
@@ -3458,7 +3457,6 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
}
@SqlTestFrameworkConfig(numMergeBuffers = 3)
@NotYetSupported(Modes.MISSING_UNION_CONVERSION)
@Test
public void testExactCountDistinctUsingSubqueryOnUnionAllTables()
{
@@ -12645,7 +12643,7 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
);
}
@DecoupledTestConfig(nativeQueryIgnore = NativeQueryIgnore.EXPR_POSTAGG)
@DecoupledTestConfig(nativeQueryIgnore = NativeQueryIgnore.IMPROVED_PLAN)
@Test
public void testGroupByWithLiteralInSubqueryGrouping()
{
@@ -12834,7 +12832,7 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
);
}
@DecoupledTestConfig(nativeQueryIgnore = NativeQueryIgnore.EXPR_POSTAGG)
@DecoupledTestConfig(nativeQueryIgnore = NativeQueryIgnore.IMPROVED_PLAN)
@Test
public void testRepeatedIdenticalVirtualExpressionGrouping()
{

View File

@@ -28,6 +28,7 @@ import org.apache.druid.query.aggregation.CountAggregatorFactory;
import org.apache.druid.query.aggregation.DoubleSumAggregatorFactory;
import org.apache.druid.query.dimension.DefaultDimensionSpec;
import org.apache.druid.query.groupby.GroupByQuery;
import org.apache.druid.sql.calcite.NotYetSupported.Modes;
import org.apache.druid.sql.calcite.filtration.Filtration;
import org.apache.druid.sql.calcite.util.CalciteTests;
import org.hamcrest.MatcherAssert;
@@ -140,6 +141,7 @@ public class CalciteUnionQueryTest extends BaseCalciteQueryTest
}
}
@NotYetSupported(Modes.UNION_MORE_STRICT_ROWTYPE_CHECK)
@Test
public void testUnionAllTablesColumnTypeMismatchFloatLong()
{
@@ -186,6 +188,7 @@
);
}
@NotYetSupported(Modes.ERROR_HANDLING)
@Test
public void testUnionAllTablesColumnTypeMismatchStringLong()
{
@@ -203,6 +206,7 @@
);
}
@NotYetSupported(Modes.ERROR_HANDLING)
@Test
public void testUnionAllTablesWhenMappingIsRequired()
{
@@ -219,6 +223,7 @@
);
}
@NotYetSupported(Modes.ERROR_HANDLING)
@Test
public void testUnionIsUnplannable()
{
@@ -229,6 +234,7 @@
);
}
@NotYetSupported(Modes.ERROR_HANDLING)
@Test
public void testUnionAllTablesWhenCastAndMappingIsRequired()
{
@@ -328,6 +334,7 @@
);
}
@NotYetSupported(Modes.ERROR_HANDLING)
@Test
public void testUnionAllSameTableTwiceWithDifferentMapping()
{

View File

@@ -25,6 +25,7 @@ import org.apache.druid.server.security.AuthConfig;
import org.apache.druid.sql.calcite.NotYetSupported.NotYetSupportedProcessor;
import org.apache.druid.sql.calcite.planner.PlannerConfig;
import org.apache.druid.sql.calcite.util.SqlTestFramework;
import org.apache.druid.sql.calcite.util.SqlTestFramework.PlannerComponentSupplier;
import org.junit.Rule;
public class DecoupledPlanningCalciteQueryTest extends CalciteQueryTest
@@ -41,14 +42,14 @@
@Override
protected QueryTestBuilder testBuilder()
{
PlannerComponentSupplier componentSupplier = this;
CalciteTestConfig testConfig = new CalciteTestConfig(CONTEXT_OVERRIDES)
{
@Override
public SqlTestFramework.PlannerFixture plannerFixture(PlannerConfig plannerConfig, AuthConfig authConfig)
{
plannerConfig = plannerConfig.withOverrides(CONTEXT_OVERRIDES);
return queryFramework().plannerFixture(DecoupledPlanningCalciteQueryTest.this, plannerConfig, authConfig);
return queryFramework().plannerFixture(componentSupplier, plannerConfig, authConfig);
}
};

View File

@@ -0,0 +1,68 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.sql.calcite;
import com.google.common.collect.ImmutableMap;
import org.apache.druid.query.QueryContexts;
import org.apache.druid.server.security.AuthConfig;
import org.apache.druid.sql.calcite.NotYetSupported.NotYetSupportedProcessor;
import org.apache.druid.sql.calcite.planner.PlannerConfig;
import org.apache.druid.sql.calcite.util.SqlTestFramework;
import org.apache.druid.sql.calcite.util.SqlTestFramework.PlannerComponentSupplier;
import org.junit.Rule;
public class DecoupledPlanningCalciteUnionQueryTest extends CalciteUnionQueryTest
{
@Rule(order = 0)
public NotYetSupportedProcessor decoupledIgnoreProcessor = new NotYetSupportedProcessor();
private static final ImmutableMap<String, Object> CONTEXT_OVERRIDES = ImmutableMap.of(
PlannerConfig.CTX_NATIVE_QUERY_SQL_PLANNING_MODE, PlannerConfig.NATIVE_QUERY_SQL_PLANNING_MODE_DECOUPLED,
QueryContexts.ENABLE_DEBUG, true
);
@Override
protected QueryTestBuilder testBuilder()
{
PlannerComponentSupplier componentSupplier = this;
CalciteTestConfig testConfig = new CalciteTestConfig(CONTEXT_OVERRIDES)
{
@Override
public SqlTestFramework.PlannerFixture plannerFixture(PlannerConfig plannerConfig, AuthConfig authConfig)
{
plannerConfig = plannerConfig.withOverrides(CONTEXT_OVERRIDES);
return queryFramework().plannerFixture(componentSupplier, plannerConfig, authConfig);
}
};
QueryTestBuilder builder = new QueryTestBuilder(testConfig)
.cannotVectorize(cannotVectorize)
.skipVectorize(skipVectorize);
DecoupledTestConfig decTestConfig = queryFrameworkRule.getDescription().getAnnotation(DecoupledTestConfig.class);
if (decTestConfig != null && decTestConfig.nativeQueryIgnore().isPresent()) {
builder.verifyNativeQueries(x -> false);
}
return builder;
}
}

View File

@@ -43,11 +43,6 @@ public @interface DecoupledTestConfig
enum NativeQueryIgnore
{
NONE,
/**
* Decoupled has moved virtualcolumn to postagg (improved plan)
* caused by: {@link CoreRules#AGGREGATE_ANY_PULL_UP_CONSTANTS}
*/
EXPR_POSTAGG,
/**
* Aggregate column order changes.
*
@@ -61,7 +56,8 @@
/**
* Improved plan
*
* Seen that it was induced by {@link CoreRules#AGGREGATE_ANY_PULL_UP_CONSTANTS}
* Some of these are induced by {@link CoreRules#AGGREGATE_ANY_PULL_UP_CONSTANTS};
* in other cases the decoupled planner has moved a virtual column into a post-aggregator.
*/
IMPROVED_PLAN,
/**

View File

@@ -70,16 +70,13 @@ public @interface NotYetSupported
{
PLAN_MISMATCH(AssertionError.class, "AssertionError: query #"),
NOT_ENOUGH_RULES(DruidException.class, "not enough rules"),
CANNOT_CONVERT(DruidException.class, "Cannot convert query parts"),
ERROR_HANDLING(AssertionError.class, "(is <ADMIN> was <OPERATOR>|is <INVALID_INPUT> was <UNCATEGORIZED>|with message a string containing)"),
ERROR_HANDLING(AssertionError.class, "(is <ADMIN> was <(OPERATOR|DEVELOPER)>|is <INVALID_INPUT> was <UNCATEGORIZED>|with message a string containing)"),
EXPRESSION_NOT_GROUPED(DruidException.class, "Expression '[a-z]+' is not being grouped"),
COLUMN_NOT_FOUND(DruidException.class, "CalciteContextException.*Column.*not found in any table"),
NULLS_FIRST_LAST(DruidException.class, "NULLS (FIRST|LAST)"),
BIGINT_TO_DATE(DruidException.class, "BIGINT to type (DATE|TIME)"),
NPE_PLAIN(NullPointerException.class, "java.lang.NullPointerException"),
NPE(DruidException.class, "java.lang.NullPointerException"),
AGGREGATION_NOT_SUPPORT_TYPE(DruidException.class, "Aggregation \\[(MIN|MAX)\\] does not support type \\[STRING\\]"),
MISSING_DESC(DruidException.class, "function signature DESC"),
RESULT_COUNT_MISMATCH(AssertionError.class, "result count:"),
ALLDATA_CSV(DruidException.class, "allData.csv"),
BIGINT_TIME_COMPARE(DruidException.class, "Cannot apply '.' to arguments of type"),
@@ -88,10 +85,9 @@
T_ALLTYPES_ISSUES(AssertionError.class, "(t_alltype|allTypsUniq|fewRowsAllData).parquet.*Verifier.verify"),
RESULT_MISMATCH(AssertionError.class, "(assertResultsEquals|AssertionError: column content mismatch)"),
UNSUPPORTED_NULL_ORDERING(DruidException.class, "(A|DE)SCENDING ordering with NULLS (LAST|FIRST)"),
CANNOT_TRANSLATE(DruidException.class, "Cannot translate reference"),
MISSING_UNION_CONVERSION(DruidException.class, "Missing conversions? (is|are) LogicalUnion"),
MISSING_WINDOW_CONVERSION(DruidException.class, "Missing conversions? is Window"),
MISSING_JOIN_CONVERSION(DruidException.class, "Missing conversions? is (Logical)?Join");
MISSING_JOIN_CONVERSION(DruidException.class, "Missing conversions? is (Logical)?Join"),
UNION_WITH_COMPLEX_OPERAND(DruidException.class, "Only Table and Values are supported as inputs for Union"),
UNION_MORE_STRICT_ROWTYPE_CHECK(DruidException.class, "Row signature mismatch in Union inputs");
public Class<? extends Throwable> throwableClass;
public String regex;

View File

@@ -0,0 +1,50 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.sql.calcite;
import org.junit.Test;
import org.reflections.Reflections;
import org.reflections.scanners.MethodAnnotationsScanner;
import java.lang.reflect.Method;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import static org.junit.Assert.assertEquals;
public class NotYetSupportedUsageTest
{
@Test
public void ensureAllModesUsed()
{
Set<Method> methodsAnnotatedWith = new Reflections("org.apache.druid.sql", new MethodAnnotationsScanner())
.getMethodsAnnotatedWith(NotYetSupported.class);
Set<NotYetSupported.Modes> modes = new HashSet<>(Arrays.asList(NotYetSupported.Modes.values()));
for (Method method : methodsAnnotatedWith) {
NotYetSupported annot = method.getAnnotation(NotYetSupported.class);
modes.remove(annot.value());
}
assertEquals("There are unused modes which should be removed", Collections.emptySet(), modes);
}
}