Support join in decoupled mode (#15957)

* plan join(s) in decoupled mode
* configure DecoupledPlanningCalciteJoinQueryTest
        the test has 593 cases; however, quite a few of them are parameterized
        variants of the 107 methods annotated with @Test - 42 are not yet working
 * replace the isRoot hack in DruidQueryGenerator with logic that instead looks ahead at the next node and doesn't let the previous node take the Project - this makes it plan more like the existing planner
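
   In essence, the look-ahead replaces the isRoot flag by peeking at the parent node on the stack; a simplified extract of the accepts() change further below (fields as in DruidQueryGenerator, trimmed to the Aggregate case):

   private boolean accepts(Stack<DruidLogicalNode> stack, Stage stage, Class<? extends RelNode> clazz)
   {
     DruidLogicalNode currentNode = stack.peek();
     if (Project.class == clazz && stack.size() >= 2) {
       // look ahead at the parent instead of relying on an isRoot flag
       DruidLogicalNode parentNode = stack.get(stack.size() - 2);
       if (stage.ordinal() > Stage.AGGREGATE.ordinal()
           && parentNode instanceof DruidAggregate
           && !partialDruidQuery.canAccept(Stage.AGGREGATE)) {
         // postpone the Project; the upcoming Aggregate must start a new query stage
         return false;
       }
     }
     return partialDruidQuery.canAccept(stage) && clazz.isInstance(currentNode);
   }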
Zoltan Haindrich 2024-03-06 02:10:13 +01:00 committed by GitHub
parent d38703281c
commit 65c3b4d31a
22 changed files with 683 additions and 191 deletions

View File

@ -207,6 +207,16 @@
<artifactId>junit-vintage-engine</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-params</artifactId>
<scope>test</scope>
</dependency>
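The junit-jupiter-params artifact enables JUnit 5 parameterized tests; a minimal, hypothetical usage sketch (not part of this patch):

import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import static org.junit.jupiter.api.Assertions.assertNotNull;

class ExampleParameterizedTest
{
  @ParameterizedTest
  @ValueSource(strings = {"a", "b", "c"})
  void runsOncePerValue(String value)
  {
    // each value in @ValueSource produces one test case
    assertNotNull(value);
  }
}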
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>pl.pragmatists</groupId>
<artifactId>JUnitParams</artifactId>

View File

@ -71,6 +71,7 @@ import org.apache.druid.sql.calcite.rel.DruidQuery;
import org.apache.druid.sql.calcite.rel.DruidRel;
import org.apache.druid.sql.calcite.rel.DruidUnionRel;
import org.apache.druid.sql.calcite.rel.logical.DruidLogicalConvention;
import org.apache.druid.sql.calcite.rel.logical.DruidLogicalNode;
import org.apache.druid.sql.calcite.run.EngineFeature;
import org.apache.druid.sql.calcite.run.QueryMaker;
import org.apache.druid.sql.calcite.table.DruidTable;
@ -561,7 +562,7 @@ public abstract class QueryHandler extends SqlStatementHandler.BaseStatementHand
newRoot
);
DruidQueryGenerator generator = new DruidQueryGenerator(plannerContext, newRoot, rexBuilder);
DruidQueryGenerator generator = new DruidQueryGenerator(plannerContext, (DruidLogicalNode) newRoot, rexBuilder);
DruidQuery baseQuery = generator.buildQuery();
try {
log.info(

View File

@ -31,25 +31,28 @@ import org.apache.druid.error.DruidException;
import org.apache.druid.query.QueryDataSource;
import org.apache.druid.sql.calcite.planner.PlannerContext;
import org.apache.druid.sql.calcite.planner.querygen.DruidQueryGenerator.PDQVertexFactory.PDQVertex;
import org.apache.druid.sql.calcite.planner.querygen.InputDescProducer.InputDesc;
import org.apache.druid.sql.calcite.planner.querygen.SourceDescProducer.SourceDesc;
import org.apache.druid.sql.calcite.rel.DruidQuery;
import org.apache.druid.sql.calcite.rel.PartialDruidQuery;
import org.apache.druid.sql.calcite.rel.PartialDruidQuery.Stage;
import org.apache.druid.sql.calcite.rel.logical.DruidAggregate;
import org.apache.druid.sql.calcite.rel.logical.DruidLogicalNode;
import org.apache.druid.sql.calcite.rel.logical.DruidSort;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.Stack;
/**
* Converts a DAG of {@link org.apache.druid.sql.calcite.rel.logical.DruidLogicalNode} convention to a native
* {@link DruidQuery} for execution.
* Converts a DAG of {@link DruidLogicalNode} convention to a native {@link DruidQuery} for execution.
*/
public class DruidQueryGenerator
{
private final RelNode relRoot;
private final DruidLogicalNode relRoot;
private final PDQVertexFactory vertexFactory;
public DruidQueryGenerator(PlannerContext plannerContext, RelNode relRoot, RexBuilder rexBuilder)
public DruidQueryGenerator(PlannerContext plannerContext, DruidLogicalNode relRoot, RexBuilder rexBuilder)
{
this.relRoot = relRoot;
this.vertexFactory = new PDQVertexFactory(plannerContext, rexBuilder);
@ -57,28 +60,34 @@ public class DruidQueryGenerator
public DruidQuery buildQuery()
{
Vertex vertex = buildVertexFor(relRoot, true);
Stack<DruidLogicalNode> stack = new Stack<>();
stack.push(relRoot);
Vertex vertex = buildVertexFor(stack);
return vertex.buildQuery(true);
}
private Vertex buildVertexFor(RelNode node, boolean isRoot)
private Vertex buildVertexFor(Stack<DruidLogicalNode> stack)
{
List<Vertex> newInputs = new ArrayList<>();
for (RelNode input : node.getInputs()) {
newInputs.add(buildVertexFor(input, false));
for (RelNode input : stack.peek().getInputs()) {
stack.push((DruidLogicalNode) input);
newInputs.add(buildVertexFor(stack));
stack.pop();
}
Vertex vertex = processNodeWithInputs(node, newInputs, isRoot);
Vertex vertex = processNodeWithInputs(stack, newInputs);
return vertex;
}
private Vertex processNodeWithInputs(RelNode node, List<Vertex> newInputs, boolean isRoot)
private Vertex processNodeWithInputs(Stack<DruidLogicalNode> stack, List<Vertex> newInputs)
{
if (node instanceof InputDescProducer) {
DruidLogicalNode node = stack.peek();
if (node instanceof SourceDescProducer) {
return vertexFactory.createVertex(PartialDruidQuery.create(node), newInputs);
}
if (newInputs.size() == 1) {
Vertex inputVertex = newInputs.get(0);
Optional<Vertex> newVertex = inputVertex.extendWith(node, isRoot);
Optional<Vertex> newVertex = inputVertex.extendWith(stack);
if (newVertex.isPresent()) {
return newVertex.get();
}
@ -86,7 +95,7 @@ public class DruidQueryGenerator
PartialDruidQuery.createOuterQuery(((PDQVertex) inputVertex).partialDruidQuery),
ImmutableList.of(inputVertex)
);
newVertex = inputVertex.extendWith(node, false);
newVertex = inputVertex.extendWith(stack);
if (newVertex.isPresent()) {
return newVertex.get();
}
@ -107,21 +116,21 @@ public class DruidQueryGenerator
/**
* Extends the current vertex to include the specified parent.
*/
Optional<Vertex> extendWith(RelNode parentNode, boolean isRoot);
Optional<Vertex> extendWith(Stack<DruidLogicalNode> stack);
/**
* Decides whether this {@link Vertex} can be unwrapped into an {@link InputDesc}.
* Decides whether this {@link Vertex} can be unwrapped into a {@link SourceDesc}.
*/
boolean canUnwrapInput();
boolean canUnwrapSourceDesc();
/**
* Unwraps this {@link Vertex} into an {@link InputDesc}.
* Unwraps this {@link Vertex} into a {@link SourceDesc}.
*
* Unwraps the input of this vertex - if it doesn't do anything beyond reading its input.
* Unwraps the source of this vertex - if it doesn't do anything beyond reading its input.
*
* @throws DruidException if unwrap is not possible.
*/
InputDesc unwrapInputDesc();
SourceDesc unwrapSourceDesc();
}
/**
@ -157,10 +166,10 @@ public class DruidQueryGenerator
@Override
public DruidQuery buildQuery(boolean topLevel)
{
InputDesc input = getInput();
SourceDesc source = getSource();
return partialDruidQuery.build(
input.dataSource,
input.rowSignature,
source.dataSource,
source.rowSignature,
plannerContext,
rexBuilder,
!topLevel
@ -168,39 +177,39 @@ public class DruidQueryGenerator
}
/**
* Creates the {@link InputDesc} for the current {@link Vertex}.
* Creates the {@link SourceDesc} for the current {@link Vertex}.
*/
private InputDesc getInput()
private SourceDesc getSource()
{
List<InputDesc> inputDescs = new ArrayList<>();
List<SourceDesc> sourceDescs = new ArrayList<>();
for (Vertex inputVertex : inputs) {
final InputDesc desc;
if (inputVertex.canUnwrapInput()) {
desc = inputVertex.unwrapInputDesc();
final SourceDesc desc;
if (inputVertex.canUnwrapSourceDesc()) {
desc = inputVertex.unwrapSourceDesc();
} else {
DruidQuery inputQuery = inputVertex.buildQuery(false);
desc = new InputDesc(new QueryDataSource(inputQuery.getQuery()), inputQuery.getOutputRowSignature());
desc = new SourceDesc(new QueryDataSource(inputQuery.getQuery()), inputQuery.getOutputRowSignature());
}
inputDescs.add(desc);
sourceDescs.add(desc);
}
RelNode scan = partialDruidQuery.getScan();
if (scan instanceof InputDescProducer) {
InputDescProducer inp = (InputDescProducer) scan;
return inp.getInputDesc(plannerContext, inputDescs);
if (scan instanceof SourceDescProducer) {
SourceDescProducer inp = (SourceDescProducer) scan;
return inp.getSourceDesc(plannerContext, sourceDescs);
}
if (inputs.size() == 1) {
return inputDescs.get(0);
return sourceDescs.get(0);
}
throw DruidException.defensive("Unable to create InputDesc for Operator [%s]", scan);
throw DruidException.defensive("Unable to create SourceDesc for Operator [%s]", scan);
}
/**
* Extends the current partial query with the new parent if possible.
*/
@Override
public Optional<Vertex> extendWith(RelNode parentNode, boolean isRoot)
public Optional<Vertex> extendWith(Stack<DruidLogicalNode> stack)
{
Optional<PartialDruidQuery> newPartialQuery = extendPartialDruidQuery(parentNode, isRoot);
Optional<PartialDruidQuery> newPartialQuery = extendPartialDruidQuery(stack);
if (!newPartialQuery.isPresent()) {
return Optional.empty();
}
@ -210,65 +219,81 @@ public class DruidQueryGenerator
/**
* Merges the given {@link RelNode} into the current {@link PartialDruidQuery}.
*/
private Optional<PartialDruidQuery> extendPartialDruidQuery(RelNode parentNode, boolean isRoot)
private Optional<PartialDruidQuery> extendPartialDruidQuery(Stack<DruidLogicalNode> stack)
{
if (accepts(parentNode, Stage.WHERE_FILTER, Filter.class)) {
DruidLogicalNode parentNode = stack.peek();
if (accepts(stack, Stage.WHERE_FILTER, Filter.class)) {
PartialDruidQuery newPartialQuery = partialDruidQuery.withWhereFilter((Filter) parentNode);
return Optional.of(newPartialQuery);
}
if (accepts(parentNode, Stage.SELECT_PROJECT, Project.class)) {
if (accepts(stack, Stage.SELECT_PROJECT, Project.class)) {
PartialDruidQuery newPartialQuery = partialDruidQuery.withSelectProject((Project) parentNode);
return Optional.of(newPartialQuery);
}
if (accepts(parentNode, Stage.AGGREGATE, Aggregate.class)) {
if (accepts(stack, Stage.AGGREGATE, Aggregate.class)) {
PartialDruidQuery newPartialQuery = partialDruidQuery.withAggregate((Aggregate) parentNode);
return Optional.of(newPartialQuery);
}
if (accepts(parentNode, Stage.AGGREGATE_PROJECT, Project.class) && isRoot) {
if (accepts(stack, Stage.AGGREGATE_PROJECT, Project.class)) {
PartialDruidQuery newPartialQuery = partialDruidQuery.withAggregateProject((Project) parentNode);
return Optional.of(newPartialQuery);
}
if (accepts(parentNode, Stage.HAVING_FILTER, Filter.class)) {
if (accepts(stack, Stage.HAVING_FILTER, Filter.class)) {
PartialDruidQuery newPartialQuery = partialDruidQuery.withHavingFilter((Filter) parentNode);
return Optional.of(newPartialQuery);
}
if (accepts(parentNode, Stage.SORT, Sort.class)) {
if (accepts(stack, Stage.SORT, Sort.class)) {
PartialDruidQuery newPartialQuery = partialDruidQuery.withSort((Sort) parentNode);
return Optional.of(newPartialQuery);
}
if (accepts(parentNode, Stage.SORT_PROJECT, Project.class)) {
if (accepts(stack, Stage.SORT_PROJECT, Project.class)) {
PartialDruidQuery newPartialQuery = partialDruidQuery.withSortProject((Project) parentNode);
return Optional.of(newPartialQuery);
}
if (accepts(parentNode, Stage.WINDOW, Window.class)) {
if (accepts(stack, Stage.WINDOW, Window.class)) {
PartialDruidQuery newPartialQuery = partialDruidQuery.withWindow((Window) parentNode);
return Optional.of(newPartialQuery);
}
if (accepts(parentNode, Stage.WINDOW_PROJECT, Project.class)) {
if (accepts(stack, Stage.WINDOW_PROJECT, Project.class)) {
PartialDruidQuery newPartialQuery = partialDruidQuery.withWindowProject((Project) parentNode);
return Optional.of(newPartialQuery);
}
return Optional.empty();
}
private boolean accepts(RelNode node, Stage whereFilter, Class<? extends RelNode> class1)
private boolean accepts(Stack<DruidLogicalNode> stack, Stage stage, Class<? extends RelNode> clazz)
{
return partialDruidQuery.canAccept(whereFilter) && class1.isInstance(node);
DruidLogicalNode currentNode = stack.peek();
if (Project.class == clazz && stack.size() >= 2) {
// peek at parent and postpone project for next query stage
DruidLogicalNode parentNode = stack.get(stack.size() - 2);
if (stage.ordinal() > Stage.AGGREGATE.ordinal()
&& parentNode instanceof DruidAggregate
&& !partialDruidQuery.canAccept(Stage.AGGREGATE)) {
return false;
}
if (stage.ordinal() > Stage.SORT.ordinal()
&& parentNode instanceof DruidSort
&& !partialDruidQuery.canAccept(Stage.SORT)) {
return false;
}
}
return partialDruidQuery.canAccept(stage) && clazz.isInstance(currentNode);
}
@Override
public InputDesc unwrapInputDesc()
public SourceDesc unwrapSourceDesc()
{
if (canUnwrapInput()) {
if (canUnwrapSourceDesc()) {
DruidQuery q = buildQuery(false);
InputDesc origInput = getInput();
return new InputDesc(origInput.dataSource, q.getOutputRowSignature());
SourceDesc origInput = getSource();
return new SourceDesc(origInput.dataSource, q.getOutputRowSignature());
}
throw DruidException.defensive("Can't unwrap input of vertex[%s]", partialDruidQuery);
throw DruidException.defensive("Can't unwrap source of vertex[%s]", partialDruidQuery);
}
@Override
public boolean canUnwrapInput()
public boolean canUnwrapSourceDesc()
{
if (partialDruidQuery.stage() == Stage.SCAN) {
return true;

View File

@ -22,6 +22,7 @@ package org.apache.druid.sql.calcite.planner.querygen;
import org.apache.druid.query.DataSource;
import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.sql.calcite.planner.PlannerContext;
import org.apache.druid.sql.calcite.rel.VirtualColumnRegistry;
import java.util.List;
@ -30,24 +31,31 @@ import java.util.List;
*
* Example: TableScan; Union; Join.
*/
public interface InputDescProducer
public interface SourceDescProducer
{
/**
* Utility class to hold source-related details.
*
* The main reason to have this is that {@link DataSource} doesn't contain the {@link RowSignature}.
*/
class InputDesc
class SourceDesc
{
public DataSource dataSource;
public RowSignature rowSignature;
public final DataSource dataSource;
public final RowSignature rowSignature;
public final VirtualColumnRegistry virtualColumnRegistry;
public InputDesc(DataSource dataSource, RowSignature rowSignature)
public SourceDesc(DataSource dataSource, RowSignature rowSignature)
{
this(dataSource, rowSignature, null);
}
public SourceDesc(DataSource dataSource, RowSignature rowSignature, VirtualColumnRegistry virtualColumnRegistry)
{
this.dataSource = dataSource;
this.rowSignature = rowSignature;
this.virtualColumnRegistry = virtualColumnRegistry;
}
}
InputDesc getInputDesc(PlannerContext plannerContext, List<InputDesc> inputs);
SourceDesc getSourceDesc(PlannerContext plannerContext, List<SourceDesc> sources);
}
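
A SourceDesc pairs a native DataSource with its RowSignature (and, for joins, a VirtualColumnRegistry); hypothetical consuming code, for illustration only:

SourceDesc desc = producer.getSourceDesc(plannerContext, sources);
DataSource dataSource = desc.dataSource;       // the native datasource to read
RowSignature rowSignature = desc.rowSignature; // column names/types of that source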

View File

@ -55,9 +55,11 @@ import org.apache.druid.sql.calcite.expression.Expressions;
import org.apache.druid.sql.calcite.planner.Calcites;
import org.apache.druid.sql.calcite.planner.PlannerConfig;
import org.apache.druid.sql.calcite.planner.PlannerContext;
import org.apache.druid.sql.calcite.planner.querygen.SourceDescProducer.SourceDesc;
import org.apache.druid.sql.calcite.table.RowSignatures;
import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
@ -142,19 +144,13 @@ public class DruidJoinQueryRel extends DruidRel<DruidJoinQueryRel>
);
}
@Override
public DruidQuery toDruidQuery(final boolean finalizeAggregations)
private SourceDesc buildLeftSourceDesc()
{
final SourceDesc leftDesc;
final DruidRel<?> leftDruidRel = (DruidRel<?>) left;
final DruidQuery leftQuery = Preconditions.checkNotNull(leftDruidRel.toDruidQuery(false), "leftQuery");
final RowSignature leftSignature = leftQuery.getOutputRowSignature();
final DataSource leftDataSource;
final DruidRel<?> rightDruidRel = (DruidRel<?>) right;
final DruidQuery rightQuery = Preconditions.checkNotNull(rightDruidRel.toDruidQuery(false), "rightQuery");
final RowSignature rightSignature = rightQuery.getOutputRowSignature();
final DataSource rightDataSource;
if (computeLeftRequiresSubquery(getPlannerContext(), leftDruidRel)) {
leftDataSource = new QueryDataSource(leftQuery.getQuery());
if (leftFilter != null) {
@ -163,37 +159,54 @@ public class DruidJoinQueryRel extends DruidRel<DruidJoinQueryRel>
} else {
leftDataSource = leftQuery.getDataSource();
}
leftDesc = new SourceDesc(leftDataSource, leftSignature);
return leftDesc;
}
private SourceDesc buildRightSourceDesc()
{
final SourceDesc rightDesc;
final DruidRel<?> rightDruidRel = (DruidRel<?>) right;
final DruidQuery rightQuery = Preconditions.checkNotNull(rightDruidRel.toDruidQuery(false), "rightQuery");
final RowSignature rightSignature = rightQuery.getOutputRowSignature();
final DataSource rightDataSource;
if (computeRightRequiresSubquery(getPlannerContext(), rightDruidRel)) {
rightDataSource = new QueryDataSource(rightQuery.getQuery());
} else {
rightDataSource = rightQuery.getDataSource();
}
rightDesc = new SourceDesc(rightDataSource, rightSignature);
return rightDesc;
}
public static SourceDesc buildJoinSourceDesc(final SourceDesc leftDesc, final SourceDesc rightDesc, PlannerContext plannerContext, Join joinRel, Filter leftFilter)
{
final Pair<String, RowSignature> prefixSignaturePair = computeJoinRowSignature(
leftSignature,
rightSignature,
findExistingJoinPrefixes(leftDataSource, rightDataSource)
leftDesc.rowSignature,
rightDesc.rowSignature,
findExistingJoinPrefixes(leftDesc.dataSource, rightDesc.dataSource)
);
String prefix = prefixSignaturePair.lhs;
RowSignature signature = prefixSignaturePair.rhs;
VirtualColumnRegistry virtualColumnRegistry = VirtualColumnRegistry.create(
prefixSignaturePair.rhs,
getPlannerContext().getExpressionParser(),
getPlannerContext().getPlannerConfig().isForceExpressionVirtualColumns()
signature,
plannerContext.getExpressionParser(),
plannerContext.getPlannerConfig().isForceExpressionVirtualColumns()
);
getPlannerContext().setJoinExpressionVirtualColumnRegistry(virtualColumnRegistry);
plannerContext.setJoinExpressionVirtualColumnRegistry(virtualColumnRegistry);
// Generate the condition for this join as a Druid expression.
final DruidExpression condition = Expressions.toDruidExpression(
getPlannerContext(),
prefixSignaturePair.rhs,
plannerContext,
signature,
joinRel.getCondition()
);
// Unsetting it to avoid any VC registry leaks in case there are multiple Druid queries for the SQL
// This should be fixed soon with changes to the interface of SqlOperatorConversion and the Expressions bridge class
getPlannerContext().setJoinExpressionVirtualColumnRegistry(null);
plannerContext.setJoinExpressionVirtualColumnRegistry(null);
// DruidJoinRule should not have created us if "condition" is null. Check defensively anyway, which also
// quiets static code analysis.
@ -201,25 +214,40 @@ public class DruidJoinQueryRel extends DruidRel<DruidJoinQueryRel>
throw new CannotBuildQueryException(joinRel, joinRel.getCondition());
}
return partialQuery.build(
JoinDataSource.create(
leftDataSource,
rightDataSource,
prefixSignaturePair.lhs,
JoinDataSource joinDataSource = JoinDataSource.create(
leftDesc.dataSource,
rightDesc.dataSource,
prefix,
JoinConditionAnalysis.forExpression(
condition.getExpression(),
getPlannerContext().parseExpression(condition.getExpression()),
prefixSignaturePair.lhs
plannerContext.parseExpression(condition.getExpression()),
prefix
),
toDruidJoinType(joinRel.getJoinType()),
getDimFilter(getPlannerContext(), leftSignature, leftFilter),
getPlannerContext().getJoinableFactoryWrapper()
),
prefixSignaturePair.rhs,
getDimFilter(plannerContext, leftDesc.rowSignature, leftFilter),
plannerContext.getJoinableFactoryWrapper()
);
SourceDesc sourceDesc = new SourceDesc(joinDataSource, signature, virtualColumnRegistry);
return sourceDesc;
}
@Override
public DruidQuery toDruidQuery(final boolean finalizeAggregations)
{
final SourceDesc leftDesc = buildLeftSourceDesc();
final SourceDesc rightDesc = buildRightSourceDesc();
SourceDesc sourceDesc = buildJoinSourceDesc(leftDesc, rightDesc, getPlannerContext(), joinRel, leftFilter);
return partialQuery.build(
sourceDesc.dataSource,
sourceDesc.rowSignature,
getPlannerContext(),
getCluster().getRexBuilder(),
finalizeAggregations,
virtualColumnRegistry
sourceDesc.virtualColumnRegistry
);
}

View File

@ -0,0 +1,79 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.sql.calcite.rel.logical;
import org.apache.calcite.plan.RelOptCluster;
import org.apache.calcite.plan.RelOptCost;
import org.apache.calcite.plan.RelOptPlanner;
import org.apache.calcite.plan.RelTraitSet;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.core.CorrelationId;
import org.apache.calcite.rel.core.Join;
import org.apache.calcite.rel.core.JoinRelType;
import org.apache.calcite.rel.hint.RelHint;
import org.apache.calcite.rel.metadata.RelMetadataQuery;
import org.apache.calcite.rex.RexNode;
import org.apache.druid.sql.calcite.planner.PlannerContext;
import org.apache.druid.sql.calcite.planner.querygen.SourceDescProducer;
import org.apache.druid.sql.calcite.rel.DruidJoinQueryRel;
import java.util.List;
import java.util.Set;
public class DruidJoin extends Join implements DruidLogicalNode, SourceDescProducer
{
public DruidJoin(RelOptCluster cluster,
RelTraitSet traitSet,
List<RelHint> hints,
RelNode left,
RelNode right,
RexNode condition,
Set<CorrelationId> variablesSet,
JoinRelType joinType)
{
super(cluster, traitSet, hints, left, right, condition, variablesSet, joinType);
}
@Override
public Join copy(
RelTraitSet traitSet,
RexNode conditionExpr,
RelNode left,
RelNode right,
JoinRelType joinType,
boolean semiJoinDone)
{
return new DruidJoin(getCluster(), traitSet, hints, left, right, conditionExpr, variablesSet, joinType);
}
@Override
public RelOptCost computeSelfCost(RelOptPlanner planner, RelMetadataQuery mq)
{
return planner.getCostFactory().makeCost(mq.getRowCount(this), 0, 0);
}
@Override
public SourceDesc getSourceDesc(PlannerContext plannerContext, List<SourceDesc> sources)
{
SourceDesc leftDesc = sources.get(0);
SourceDesc rightDesc = sources.get(1);
return DruidJoinQueryRel.buildJoinSourceDesc(leftDesc, rightDesc, plannerContext, this, null);
}
}
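
For illustration, the decoupled generator feeds the join's two input vertices in as already-built SourceDescs (hypothetical calling code, simplified):

SourceDesc joinDesc = druidJoin.getSourceDesc(plannerContext, ImmutableList.of(leftDesc, rightDesc));
// joinDesc.dataSource is a JoinDataSource; joinDesc.virtualColumnRegistry carries any
// virtual columns created for the join condition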

View File

@ -34,7 +34,7 @@ import org.apache.calcite.rel.metadata.RelMetadataQuery;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.schema.Table;
import org.apache.druid.sql.calcite.planner.PlannerContext;
import org.apache.druid.sql.calcite.planner.querygen.InputDescProducer;
import org.apache.druid.sql.calcite.planner.querygen.SourceDescProducer;
import org.apache.druid.sql.calcite.table.DruidTable;
import java.util.List;
@ -42,7 +42,7 @@ import java.util.List;
/**
* {@link DruidLogicalNode} convention node for {@link TableScan} plan node.
*/
public class DruidTableScan extends TableScan implements DruidLogicalNode, InputDescProducer
public class DruidTableScan extends TableScan implements DruidLogicalNode, SourceDescProducer
{
public DruidTableScan(
RelOptCluster cluster,
@ -98,10 +98,10 @@ public class DruidTableScan extends TableScan implements DruidLogicalNode, Input
}
@Override
public InputDesc getInputDesc(PlannerContext plannerContext, List<InputDesc> inputs)
public SourceDesc getSourceDesc(PlannerContext plannerContext, List<SourceDesc> sources)
{
final DruidTable druidTable = getDruidTable();
return new InputDesc(druidTable.getDataSource(), druidTable.getRowSignature());
return new SourceDesc(druidTable.getDataSource(), druidTable.getRowSignature());
}
private DruidTable getDruidTable()

View File

@ -35,11 +35,11 @@ import org.apache.druid.query.TableDataSource;
import org.apache.druid.query.UnionDataSource;
import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.sql.calcite.planner.PlannerContext;
import org.apache.druid.sql.calcite.planner.querygen.InputDescProducer;
import org.apache.druid.sql.calcite.planner.querygen.SourceDescProducer;
import java.util.ArrayList;
import java.util.List;
public class DruidUnion extends Union implements DruidLogicalNode, InputDescProducer
public class DruidUnion extends Union implements DruidLogicalNode, SourceDescProducer
{
public DruidUnion(
RelOptCluster cluster,
@ -64,26 +64,26 @@ public class DruidUnion extends Union implements DruidLogicalNode, InputDescProd
}
@Override
public InputDesc getInputDesc(PlannerContext plannerContext, List<InputDesc> inputs)
public SourceDesc getSourceDesc(PlannerContext plannerContext, List<SourceDesc> sources)
{
List<DataSource> dataSources = new ArrayList<>();
RowSignature signature = null;
for (InputDesc inputDesc : inputs) {
checkDataSourceSupported(inputDesc.dataSource);
dataSources.add(inputDesc.dataSource);
for (SourceDesc sourceDesc : sources) {
checkDataSourceSupported(sourceDesc.dataSource);
dataSources.add(sourceDesc.dataSource);
if (signature == null) {
signature = inputDesc.rowSignature;
signature = sourceDesc.rowSignature;
} else {
if (!signature.equals(inputDesc.rowSignature)) {
if (!signature.equals(sourceDesc.rowSignature)) {
throw DruidException.defensive(
"Row signature mismatch in Union inputs [%s] and [%s]",
signature,
inputDesc.rowSignature
sourceDesc.rowSignature
);
}
}
}
return new InputDesc(new UnionDataSource(dataSources), signature);
return new SourceDesc(new UnionDataSource(dataSources), signature);
}
private void checkDataSourceSupported(DataSource dataSource)

View File

@ -32,7 +32,7 @@ import org.apache.calcite.rex.RexLiteral;
import org.apache.druid.query.InlineDataSource;
import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.sql.calcite.planner.PlannerContext;
import org.apache.druid.sql.calcite.planner.querygen.InputDescProducer;
import org.apache.druid.sql.calcite.planner.querygen.SourceDescProducer;
import org.apache.druid.sql.calcite.rel.CostEstimates;
import org.apache.druid.sql.calcite.rule.DruidLogicalValuesRule;
import org.apache.druid.sql.calcite.table.InlineTable;
@ -44,7 +44,7 @@ import java.util.stream.Collectors;
/**
* {@link DruidLogicalNode} convention node for {@link LogicalValues} plan node.
*/
public class DruidValues extends LogicalValues implements DruidLogicalNode, InputDescProducer
public class DruidValues extends LogicalValues implements DruidLogicalNode, SourceDescProducer
{
private InlineTable inlineTable;
@ -72,12 +72,12 @@ public class DruidValues extends LogicalValues implements DruidLogicalNode, Inpu
}
@Override
public InputDesc getInputDesc(PlannerContext plannerContext, List<InputDesc> inputs)
public SourceDesc getSourceDesc(PlannerContext plannerContext, List<SourceDesc> sources)
{
if (inlineTable == null) {
inlineTable = buildInlineTable(plannerContext);
}
return new InputDesc(inlineTable.getDataSource(), inlineTable.getRowSignature());
return new SourceDesc(inlineTable.getDataSource(), inlineTable.getRowSignature());
}
private InlineTable buildInlineTable(PlannerContext plannerContext)

View File

@ -20,6 +20,7 @@
package org.apache.druid.sql.calcite.rule;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.collect.Iterables;
import org.apache.calcite.plan.RelOptRule;
@ -47,6 +48,7 @@ import org.apache.calcite.tools.RelBuilder;
import org.apache.calcite.util.ImmutableBitSet;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.error.DruidException;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.query.LookupDataSource;
import org.apache.druid.sql.calcite.planner.PlannerContext;
import org.apache.druid.sql.calcite.rel.DruidJoinQueryRel;
@ -125,6 +127,7 @@ public class DruidJoinRule extends RelOptRule
join.getLeft().getRowType(),
rexBuilder
);
plannerContext.setPlanningError(conditionAnalysis.errorStr);
final boolean isLeftDirectAccessPossible = enableLeftScanDirect && (left instanceof DruidQueryRel);
if (!plannerContext.getJoinAlgorithm().requiresSubquery()
@ -247,6 +250,7 @@ public class DruidJoinRule extends RelOptRule
)
{
ConditionAnalysis conditionAnalysis = analyzeCondition(condition, leftRowType, rexBuilder);
plannerContext.setPlanningError(conditionAnalysis.errorStr);
// if the right side requires a subquery, then even lookup will be transformed to a QueryDataSource
// thereby allowing join conditions on both k and v columns of the lookup
if (right != null
@ -304,12 +308,15 @@ public class DruidJoinRule extends RelOptRule
private final Set<RexInputRef> rightColumns;
public final String errorStr;
ConditionAnalysis(
int numLeftFields,
List<RexEquality> equalitySubConditions,
List<RexLiteral> literalSubConditions,
List<RexNode> unsupportedOnSubConditions,
Set<RexInputRef> rightColumns
Set<RexInputRef> rightColumns,
String errorStr
)
{
this.numLeftFields = numLeftFields;
@ -317,6 +324,7 @@ public class DruidJoinRule extends RelOptRule
this.literalSubConditions = literalSubConditions;
this.unsupportedOnSubConditions = unsupportedOnSubConditions;
this.rightColumns = rightColumns;
this.errorStr = errorStr;
}
public ConditionAnalysis pushThroughLeftProject(final Project leftProject)
@ -340,7 +348,8 @@ public class DruidJoinRule extends RelOptRule
.collect(Collectors.toList()),
literalSubConditions,
unsupportedOnSubConditions,
rightColumns
rightColumns,
null
);
}
@ -369,7 +378,8 @@ public class DruidJoinRule extends RelOptRule
.collect(Collectors.toList()),
literalSubConditions,
unsupportedOnSubConditions,
rightColumns
rightColumns,
null
);
}
@ -429,7 +439,7 @@ public class DruidJoinRule extends RelOptRule
* that can be extracted into post join filter.
* {@code f(LeftRel) = RightColumn}, then return a {@link ConditionAnalysis}.
*/
public ConditionAnalysis analyzeCondition(
public static ConditionAnalysis analyzeCondition(
final RexNode condition,
final RelDataType leftRowType,
final RexBuilder rexBuilder
@ -441,6 +451,7 @@ public class DruidJoinRule extends RelOptRule
final List<RexNode> unSupportedSubConditions = new ArrayList<>();
final Set<RexInputRef> rightColumns = new HashSet<>();
final int numLeftFields = leftRowType.getFieldCount();
final List<String> errors = new ArrayList<>();
for (RexNode subCondition : subConditions) {
if (RexUtil.isLiteral(subCondition, true)) {
@ -475,10 +486,12 @@ public class DruidJoinRule extends RelOptRule
comparisonKind = SqlKind.EQUALS;
if (!SqlTypeName.BOOLEAN_TYPES.contains(secondOperand.getType().getSqlTypeName())) {
plannerContext.setPlanningError(
errors.add(
StringUtils.format(
"SQL requires a join with '%s' condition where the column is of the type %s, that is not supported",
subCondition.getKind(),
secondOperand.getType().getSqlTypeName()
)
);
unSupportedSubConditions.add(subCondition);
continue;
@ -492,9 +505,11 @@ public class DruidJoinRule extends RelOptRule
comparisonKind = subCondition.getKind();
} else {
// If it's not EQUALS or a BOOLEAN input ref, it's not supported.
plannerContext.setPlanningError(
errors.add(
StringUtils.format(
"SQL requires a join with '%s' condition that is not supported.",
subCondition.getKind()
)
);
unSupportedSubConditions.add(subCondition);
continue;
@ -509,17 +524,27 @@ public class DruidJoinRule extends RelOptRule
rightColumns.add((RexInputRef) firstOperand);
} else {
// Cannot handle this condition.
plannerContext.setPlanningError("SQL is resulting in a join that has unsupported operand types.");
errors.add(
StringUtils.format(
"SQL is resulting in a join that has unsupported operand types."
)
);
unSupportedSubConditions.add(subCondition);
}
}
final String errorStr;
if (errors.size() > 0) {
errorStr = Joiner.on('\n').join(errors);
} else {
errorStr = null;
}
return new ConditionAnalysis(
numLeftFields,
equalitySubConditions,
literalSubConditions,
unSupportedSubConditions,
rightColumns
rightColumns,
errorStr
);
}
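
With errorStr carried on ConditionAnalysis, unsupported conditions can be reported in one shot; the decoupled DruidJoinRule below uses it roughly as follows:

ConditionAnalysis analysis = DruidJoinRule.analyzeCondition(condition, leftRowType, rexBuilder);
if (analysis.errorStr != null) {
  // all collected condition problems, newline-joined
  throw InvalidSqlInput.exception(analysis.errorStr);
}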

View File

@ -0,0 +1,76 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.sql.calcite.rule.logical;
import org.apache.calcite.plan.RelTrait;
import org.apache.calcite.plan.RelTraitSet;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.convert.ConverterRule;
import org.apache.calcite.rel.core.Join;
import org.apache.druid.error.InvalidSqlInput;
import org.apache.druid.sql.calcite.rel.logical.DruidJoin;
import org.apache.druid.sql.calcite.rel.logical.DruidLogicalConvention;
import org.apache.druid.sql.calcite.rule.DruidJoinRule.ConditionAnalysis;
import org.checkerframework.checker.nullness.qual.Nullable;
public class DruidJoinRule extends ConverterRule
{
public DruidJoinRule(Class<? extends RelNode> clazz, RelTrait in, RelTrait out, String descriptionPrefix)
{
super(Config.INSTANCE.withConversion(clazz, in, out, descriptionPrefix));
}
@Override
public @Nullable RelNode convert(RelNode rel)
{
Join join = (Join) rel;
RelTraitSet newTrait = join.getTraitSet().replace(DruidLogicalConvention.instance());
ConditionAnalysis analysis = org.apache.druid.sql.calcite.rule.DruidJoinRule.analyzeCondition(
join.getCondition(),
join.getLeft().getRowType(),
join.getCluster().getRexBuilder()
);
if (analysis.errorStr != null) {
// reject the query in case the analysis detected any issues
throw InvalidSqlInput.exception(analysis.errorStr);
}
return new DruidJoin(
join.getCluster(),
newTrait,
join.getHints(),
convert(
join.getLeft(),
DruidLogicalConvention.instance()
),
convert(
join.getRight(),
DruidLogicalConvention.instance()
),
join.getCondition(),
join.getVariablesSet(),
join.getJoinType()
);
}
}

View File

@ -25,6 +25,7 @@ import org.apache.calcite.plan.RelOptRule;
import org.apache.calcite.rel.core.Window;
import org.apache.calcite.rel.logical.LogicalAggregate;
import org.apache.calcite.rel.logical.LogicalFilter;
import org.apache.calcite.rel.logical.LogicalJoin;
import org.apache.calcite.rel.logical.LogicalProject;
import org.apache.calcite.rel.logical.LogicalSort;
import org.apache.calcite.rel.logical.LogicalTableScan;
@ -98,6 +99,12 @@ public class DruidLogicalRules
Convention.NONE,
DruidLogicalConvention.instance(),
DruidUnionRule.class.getSimpleName()
),
new DruidJoinRule(
LogicalJoin.class,
Convention.NONE,
DruidLogicalConvention.instance(),
DruidJoinRule.class.getSimpleName()
)
)
);

View File

@ -31,6 +31,8 @@ import org.apache.commons.text.StringEscapeUtils;
import org.apache.druid.annotations.UsedByJUnitParamsRunner;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.error.DruidException;
import org.apache.druid.error.DruidException.Category;
import org.apache.druid.error.DruidException.Persona;
import org.apache.druid.error.DruidExceptionMatcher;
import org.apache.druid.guice.DruidInjectorBuilder;
import org.apache.druid.hll.VersionOneHyperLogLogCollector;
@ -126,6 +128,7 @@ import org.junit.rules.ExpectedException;
import org.junit.rules.TemporaryFolder;
import javax.annotation.Nullable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
@ -770,18 +773,21 @@ public class BaseCalciteQueryTest extends CalciteTestBase
catch (DruidException e) {
MatcherAssert.assertThat(
e,
new DruidExceptionMatcher(DruidException.Persona.ADMIN, DruidException.Category.INVALID_INPUT, "general")
.expectMessageIs(
StringUtils.format(
"Query could not be planned. A possible reason is [%s]",
expectedError
)
)
buildUnplannableExceptionMatcher().expectMessageContains(expectedError)
);
}
catch (Exception e) {
log.error(e, "Expected DruidException for query: %s", sql);
Assert.fail(sql);
throw e;
}
}
private DruidExceptionMatcher buildUnplannableExceptionMatcher()
{
if (testBuilder().isDecoupledMode()) {
return new DruidExceptionMatcher(Persona.USER, Category.INVALID_INPUT, "invalidInput");
} else {
return new DruidExceptionMatcher(Persona.ADMIN, Category.INVALID_INPUT, "general");
}
}
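
In decoupled mode, unplannable joins now surface as user-facing invalidInput errors rather than the admin-facing "general" category; a hypothetical assertion using the matcher above:

MatcherAssert.assertThat(
    e,
    new DruidExceptionMatcher(Persona.USER, Category.INVALID_INPUT, "invalidInput")
        .expectMessageContains("condition that is not supported")
);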

View File

@ -85,6 +85,8 @@ import org.apache.druid.segment.join.JoinType;
import org.apache.druid.segment.virtual.ListFilteredVirtualColumn;
import org.apache.druid.server.QueryLifecycle;
import org.apache.druid.server.security.Access;
import org.apache.druid.sql.calcite.DecoupledTestConfig.NativeQueryIgnore;
import org.apache.druid.sql.calcite.NotYetSupported.Modes;
import org.apache.druid.sql.calcite.expression.DruidExpression;
import org.apache.druid.sql.calcite.filtration.Filtration;
import org.apache.druid.sql.calcite.planner.PlannerConfig;
@ -188,6 +190,7 @@ public class CalciteJoinQueryTest extends BaseCalciteQueryTest
// to compute the query with limit 1.
@SqlTestFrameworkConfig(minTopNThreshold = 1)
@Test
@DecoupledTestConfig(nativeQueryIgnore = NativeQueryIgnore.EQUIV_PLAN)
public void testExactTopNOnInnerJoinWithLimit()
{
Map<String, Object> context = new HashMap<>(QUERY_CONTEXT_DEFAULT);
@ -236,6 +239,7 @@ public class CalciteJoinQueryTest extends BaseCalciteQueryTest
}
@Test
@NotYetSupported(Modes.STACK_OVERFLOW)
public void testJoinOuterGroupByAndSubqueryHasLimit()
{
// Cannot vectorize JOIN operator.
@ -323,6 +327,7 @@ public class CalciteJoinQueryTest extends BaseCalciteQueryTest
@Test
@Parameters(source = QueryContextForJoinProvider.class)
@NotYetSupported(Modes.JOIN_TABLE_TABLE)
public void testJoinOuterGroupByAndSubqueryNoLimit(Map<String, Object> queryContext)
{
// Fully removing the join allows this query to vectorize.
@ -406,6 +411,7 @@ public class CalciteJoinQueryTest extends BaseCalciteQueryTest
}
@Test
@NotYetSupported(Modes.JOIN_TABLE_TABLE)
public void testJoinWithLimitBeforeJoining()
{
// Cannot vectorize JOIN operator.
@ -492,6 +498,7 @@ public class CalciteJoinQueryTest extends BaseCalciteQueryTest
}
@Test
@DecoupledTestConfig(nativeQueryIgnore = NativeQueryIgnore.JOIN_FILTER_LOCATIONS)
public void testJoinOnTimeseriesWithFloorOnTime()
{
// Cannot vectorize JOIN operator.
@ -546,6 +553,7 @@ public class CalciteJoinQueryTest extends BaseCalciteQueryTest
}
@Test
@DecoupledTestConfig(nativeQueryIgnore = NativeQueryIgnore.JOIN_FILTER_LOCATIONS)
public void testJoinOnGroupByInsteadOfTimeseriesWithFloorOnTime()
{
// Cannot vectorize JOIN operator.
@ -612,6 +620,7 @@ public class CalciteJoinQueryTest extends BaseCalciteQueryTest
}
@Test
@DecoupledTestConfig(nativeQueryIgnore = NativeQueryIgnore.JOIN_FILTER_LOCATIONS)
public void testJoinOnGroupByInsteadOfTimeseriesWithFloorOnTimeWithNoAggregateMultipleValues()
{
// Cannot vectorize JOIN operator.
@ -679,6 +688,7 @@ public class CalciteJoinQueryTest extends BaseCalciteQueryTest
@Test
@Parameters(source = QueryContextForJoinProvider.class)
@NotYetSupported(Modes.CANNOT_JOIN_LOOKUP_NON_KEY)
public void testFilterAndGroupByLookupUsingJoinOperatorWithValueFilterPushdownMatchesNothing(Map<String, Object> queryContext)
{
@ -760,6 +770,7 @@ public class CalciteJoinQueryTest extends BaseCalciteQueryTest
@Test
@Parameters(source = QueryContextForJoinProvider.class)
@NotYetSupported(Modes.JOIN_CONDITION_NOT_PUSHED_CONDITION)
public void testFilterAndGroupByLookupUsingJoinOperatorBackwards(Map<String, Object> queryContext)
{
// Like "testFilterAndGroupByLookupUsingJoinOperator", but with the table and lookup reversed.
@ -815,6 +826,7 @@ public class CalciteJoinQueryTest extends BaseCalciteQueryTest
@Test
@Parameters(source = QueryContextForJoinProvider.class)
@NotYetSupported(Modes.JOIN_CONDITION_NOT_PUSHED_CONDITION)
public void testFilterAndGroupByLookupUsingJoinOperatorWithNotFilter(Map<String, Object> queryContext)
{
@ -857,6 +869,7 @@ public class CalciteJoinQueryTest extends BaseCalciteQueryTest
);
}
@NotYetSupported(Modes.JOIN_CONDITION_NOT_PUSHED_CONDITION)
@Test
@Parameters(source = QueryContextForJoinProvider.class)
public void testJoinUnionTablesOnLookup(Map<String, Object> queryContext)
@ -911,6 +924,7 @@ public class CalciteJoinQueryTest extends BaseCalciteQueryTest
@Test
@Parameters(source = QueryContextForJoinProvider.class)
@NotYetSupported(Modes.CANNOT_JOIN_LOOKUP_NON_KEY)
public void testFilterAndGroupByLookupUsingJoinOperator(Map<String, Object> queryContext)
{
// Cannot vectorize JOIN operator.
@ -1120,6 +1134,7 @@ public class CalciteJoinQueryTest extends BaseCalciteQueryTest
@Test
@Parameters(source = QueryContextForJoinProvider.class)
@NotYetSupported(Modes.JOIN_CONDITION_UNSUPORTED_OPERAND)
public void testInnerJoinTableLookupLookupWithFilterWithOuterLimit(Map<String, Object> queryContext)
{
testQuery(
@ -1163,6 +1178,7 @@ public class CalciteJoinQueryTest extends BaseCalciteQueryTest
@Test
@Parameters(source = QueryContextForJoinProvider.class)
@NotYetSupported(Modes.JOIN_CONDITION_UNSUPORTED_OPERAND)
public void testInnerJoinTableLookupLookupWithFilterWithoutLimit(Map<String, Object> queryContext)
{
testQuery(
@ -1204,6 +1220,7 @@ public class CalciteJoinQueryTest extends BaseCalciteQueryTest
@Test
@Parameters(source = QueryContextForJoinProvider.class)
@NotYetSupported(Modes.JOIN_CONDITION_UNSUPORTED_OPERAND)
public void testInnerJoinTableLookupLookupWithFilterWithOuterLimitWithAllColumns(Map<String, Object> queryContext)
{
@ -1248,6 +1265,7 @@ public class CalciteJoinQueryTest extends BaseCalciteQueryTest
@Test
@Parameters(source = QueryContextForJoinProvider.class)
@NotYetSupported(Modes.JOIN_CONDITION_UNSUPORTED_OPERAND)
public void testInnerJoinTableLookupLookupWithFilterWithoutLimitWithAllColumns(Map<String, Object> queryContext)
{
testQuery(
@ -1289,6 +1307,7 @@ public class CalciteJoinQueryTest extends BaseCalciteQueryTest
@Test
@Parameters(source = QueryContextForJoinProvider.class)
@NotYetSupported(Modes.JOIN_CONDITION_UNSUPORTED_OPERAND)
public void testManyManyInnerJoinOnManyManyLookup(Map<String, Object> queryContext)
{
testQuery(
@ -1518,6 +1537,7 @@ public class CalciteJoinQueryTest extends BaseCalciteQueryTest
@Test
@Parameters(source = QueryContextForJoinProvider.class)
@DecoupledTestConfig(nativeQueryIgnore = NativeQueryIgnore.FINALIZING_FIELD_ACCESS)
public void testInnerJoinQueryOfLookup(Map<String, Object> queryContext)
{
// Cannot vectorize the subquery.
@ -1597,6 +1617,7 @@ public class CalciteJoinQueryTest extends BaseCalciteQueryTest
@Test
@Parameters(source = QueryContextForJoinProvider.class)
@DecoupledTestConfig(nativeQueryIgnore = NativeQueryIgnore.DEFINETLY_WORSE_PLAN)
public void testInnerJoinQueryOfLookupRemovable(Map<String, Object> queryContext)
{
// Like "testInnerJoinQueryOfLookup", but the subquery is removable.
@ -1635,6 +1656,7 @@ public class CalciteJoinQueryTest extends BaseCalciteQueryTest
@Test
@Parameters(source = QueryContextForJoinProvider.class)
@DecoupledTestConfig(nativeQueryIgnore = NativeQueryIgnore.EQUIV_PLAN)
public void testInnerJoinTwoLookupsToTableUsingNumericColumn(Map<String, Object> queryContext)
{
// Regression test for https://github.com/apache/druid/issues/9646.
@ -1696,6 +1718,7 @@ public class CalciteJoinQueryTest extends BaseCalciteQueryTest
@Test
@Parameters(source = QueryContextForJoinProvider.class)
@NotYetSupported(Modes.JOIN_TABLE_TABLE)
public void testInnerJoinTwoLookupsToTableUsingNumericColumnInReverse(Map<String, Object> queryContext)
{
@ -1753,6 +1776,7 @@ public class CalciteJoinQueryTest extends BaseCalciteQueryTest
@Test
@Parameters(source = QueryContextForJoinProvider.class)
@NotYetSupported(Modes.JOIN_TABLE_TABLE)
public void testInnerJoinLookupTableTable(Map<String, Object> queryContext)
{
// Regression test for https://github.com/apache/druid/issues/9646.
@ -1835,6 +1859,7 @@ public class CalciteJoinQueryTest extends BaseCalciteQueryTest
@Test
@Parameters(source = QueryContextForJoinProvider.class)
@NotYetSupported(Modes.JOIN_TABLE_TABLE)
public void testInnerJoinLookupTableTableChained(Map<String, Object> queryContext)
{
// Cannot vectorize JOIN operator.
@ -1957,6 +1982,7 @@ public class CalciteJoinQueryTest extends BaseCalciteQueryTest
}
@Test
@DecoupledTestConfig(nativeQueryIgnore = NativeQueryIgnore.JOIN_FILTER_LOCATIONS)
public void testCommaJoinLeftFunction()
{
testQuery(
@ -1995,6 +2021,7 @@ public class CalciteJoinQueryTest extends BaseCalciteQueryTest
// Hence, comma join will result in a cross join with filter on outermost
@Test
@Parameters(source = QueryContextForJoinProvider.class)
@NotYetSupported(Modes.JOIN_CONDITION_UNSUPORTED_OPERAND)
public void testCommaJoinTableLookupTableMismatchedTypes(Map<String, Object> queryContext)
{
// Regression test for https://github.com/apache/druid/issues/9646.
@ -2061,6 +2088,7 @@ public class CalciteJoinQueryTest extends BaseCalciteQueryTest
@Test
@Parameters(source = QueryContextForJoinProvider.class)
@NotYetSupported(Modes.JOIN_TABLE_TABLE)
public void testJoinTableLookupTableMismatchedTypesWithoutComma(Map<String, Object> queryContext)
{
// Empty-dataset aggregation queries in MSQ return an empty row, rather than a single row as SQL requires.
@ -2131,6 +2159,7 @@ public class CalciteJoinQueryTest extends BaseCalciteQueryTest
@Test
@Parameters(source = QueryContextForJoinProvider.class)
@DecoupledTestConfig(nativeQueryIgnore = NativeQueryIgnore.JOIN_FILTER_LOCATIONS)
public void testInnerJoinCastLeft(Map<String, Object> queryContext)
{
// foo.m1 is FLOAT, l.k is STRING.
@ -2259,6 +2288,7 @@ public class CalciteJoinQueryTest extends BaseCalciteQueryTest
@Test
@Parameters(source = QueryContextForJoinProvider.class)
@DecoupledTestConfig(nativeQueryIgnore = NativeQueryIgnore.JOIN_FILTER_LOCATIONS)
public void testInnerJoinLeftFunction(Map<String, Object> queryContext)
{
testQuery(
@ -2711,6 +2741,7 @@ public class CalciteJoinQueryTest extends BaseCalciteQueryTest
@Test
@Parameters(source = QueryContextForJoinProvider.class)
@DecoupledTestConfig(nativeQueryIgnore = NativeQueryIgnore.JOIN_FILTER_LOCATIONS)
public void testUsingSubqueryWithExtractionFns(Map<String, Object> queryContext)
{
// Cannot vectorize JOIN operator.
@ -2771,6 +2802,7 @@ public class CalciteJoinQueryTest extends BaseCalciteQueryTest
@Test
@Parameters(source = QueryContextForJoinProvider.class)
@NotYetSupported(Modes.JOIN_CONDITION_NOT_PUSHED_CONDITION)
public void testInnerJoinWithIsNullFilter(Map<String, Object> queryContext)
{
testQuery(
@ -2913,6 +2945,7 @@ public class CalciteJoinQueryTest extends BaseCalciteQueryTest
@Test
@Parameters(source = QueryContextForJoinProvider.class)
@DecoupledTestConfig(nativeQueryIgnore = NativeQueryIgnore.JOIN_LEFT_DIRECT_ACCESS)
public void testLeftJoinOnTwoInlineDataSourcesWithTimeFilter_withLeftDirectAccess(Map<String, Object> queryContext)
{
queryContext = withLeftDirectAccessEnabled(queryContext);
@ -3024,6 +3057,7 @@ public class CalciteJoinQueryTest extends BaseCalciteQueryTest
@Test
@Parameters(source = QueryContextForJoinProvider.class)
@DecoupledTestConfig(nativeQueryIgnore = NativeQueryIgnore.JOIN_LEFT_DIRECT_ACCESS)
public void testLeftJoinOnTwoInlineDataSourcesWithOuterWhere_withLeftDirectAccess(Map<String, Object> queryContext)
{
queryContext = withLeftDirectAccessEnabled(queryContext);
@ -3125,6 +3159,7 @@ public class CalciteJoinQueryTest extends BaseCalciteQueryTest
@Test
@Parameters(source = QueryContextForJoinProvider.class)
@DecoupledTestConfig(nativeQueryIgnore = NativeQueryIgnore.JOIN_LEFT_DIRECT_ACCESS)
public void testLeftJoinOnTwoInlineDataSources_withLeftDirectAccess(Map<String, Object> queryContext)
{
queryContext = withLeftDirectAccessEnabled(queryContext);
@ -3226,6 +3261,7 @@ public class CalciteJoinQueryTest extends BaseCalciteQueryTest
@Test
@Parameters(source = QueryContextForJoinProvider.class)
@DecoupledTestConfig(nativeQueryIgnore = NativeQueryIgnore.JOIN_LEFT_DIRECT_ACCESS)
public void testInnerJoinOnTwoInlineDataSourcesWithOuterWhere_withLeftDirectAccess(Map<String, Object> queryContext)
{
queryContext = withLeftDirectAccessEnabled(queryContext);
@ -3327,6 +3363,7 @@ public class CalciteJoinQueryTest extends BaseCalciteQueryTest
@Test
@Parameters(source = QueryContextForJoinProvider.class)
@DecoupledTestConfig(nativeQueryIgnore = NativeQueryIgnore.EQUIV_PLAN)
public void testGroupByOverGroupByOverInnerJoinOnTwoInlineDataSources(Map<String, Object> queryContext)
{
skipVectorize();
@ -3412,6 +3449,7 @@ public class CalciteJoinQueryTest extends BaseCalciteQueryTest
@Test
@Parameters(source = QueryContextForJoinProvider.class)
@DecoupledTestConfig(nativeQueryIgnore = NativeQueryIgnore.JOIN_LEFT_DIRECT_ACCESS)
public void testInnerJoinOnTwoInlineDataSources_withLeftDirectAccess(Map<String, Object> queryContext)
{
queryContext = withLeftDirectAccessEnabled(queryContext);
@ -3640,6 +3678,7 @@ public class CalciteJoinQueryTest extends BaseCalciteQueryTest
@Test
@Parameters(source = QueryContextForJoinProvider.class)
@DecoupledTestConfig(nativeQueryIgnore = NativeQueryIgnore.EQUIV_PLAN)
public void testLeftJoinSubqueryWithSelectorFilter(Map<String, Object> queryContext)
{
// Cannot vectorize due to 'concat' expression.
@ -3693,6 +3732,7 @@ public class CalciteJoinQueryTest extends BaseCalciteQueryTest
@Test
@Parameters(source = QueryContextForJoinProvider.class)
@NotYetSupported(Modes.JOIN_TABLE_TABLE)
public void testLeftJoinWithNotNullFilter(Map<String, Object> queryContext)
{
testQuery(
@ -3740,6 +3780,7 @@ public class CalciteJoinQueryTest extends BaseCalciteQueryTest
@Test
@Parameters(source = QueryContextForJoinProvider.class)
@NotYetSupported(Modes.JOIN_TABLE_TABLE)
public void testInnerJoin(Map<String, Object> queryContext)
{
testQuery(
@ -3794,6 +3835,7 @@ public class CalciteJoinQueryTest extends BaseCalciteQueryTest
@Test
@Parameters(source = QueryContextForJoinProvider.class)
@NotYetSupported(Modes.JOIN_TABLE_TABLE)
public void testJoinWithExplicitIsNotDistinctFromCondition(Map<String, Object> queryContext)
{
// Like "testInnerJoin", but uses IS NOT DISTINCT FROM instead of equals.
@ -3839,6 +3881,7 @@ public class CalciteJoinQueryTest extends BaseCalciteQueryTest
@Test
@Parameters(source = QueryContextForJoinProvider.class)
@DecoupledTestConfig(nativeQueryIgnore = NativeQueryIgnore.EQUIV_PLAN)
public void testInnerJoinSubqueryWithSelectorFilter(Map<String, Object> queryContext)
{
if (sortBasedJoin) {
@ -3898,6 +3941,7 @@ public class CalciteJoinQueryTest extends BaseCalciteQueryTest
}
@Test
@NotYetSupported(Modes.JOIN_CONDITION_NOT_PUSHED_CONDITION)
public void testSemiJoinWithOuterTimeExtractScan()
{
testQuery(
@ -3946,6 +3990,7 @@ public class CalciteJoinQueryTest extends BaseCalciteQueryTest
@Test
@Parameters(source = QueryContextForJoinProvider.class)
@NotYetSupported(Modes.JOIN_CONDITION_NOT_PUSHED_CONDITION)
public void testTwoSemiJoinsSimultaneously(Map<String, Object> queryContext)
{
// Fully removing the join allows this query to vectorize.
@ -4117,6 +4162,7 @@ public class CalciteJoinQueryTest extends BaseCalciteQueryTest
@Test
@Parameters(source = QueryContextForJoinProvider.class)
@DecoupledTestConfig(nativeQueryIgnore = NativeQueryIgnore.EQUIV_PLAN)
public void testSemiAndAntiJoinSimultaneouslyUsingExplicitJoins(Map<String, Object> queryContext)
{
cannotVectorize();
@ -4184,6 +4230,7 @@ public class CalciteJoinQueryTest extends BaseCalciteQueryTest
}
@Test
@NotYetSupported(Modes.JOIN_CONDITION_NOT_PUSHED_CONDITION)
public void testSemiJoinWithOuterTimeExtractAggregateWithOrderBy()
{
// Cannot vectorize due to virtual columns.
@ -4278,6 +4325,7 @@ public class CalciteJoinQueryTest extends BaseCalciteQueryTest
@Test
@Parameters(source = QueryContextForJoinProvider.class)
@NotYetSupported(Modes.UNION_WITH_COMPLEX_OPERAND)
public void testUnionAllTwoQueriesLeftQueryIsJoin(Map<String, Object> queryContext)
{
// MSQ does not support UNION ALL.
@ -4322,6 +4370,7 @@ public class CalciteJoinQueryTest extends BaseCalciteQueryTest
@Test
@Parameters(source = QueryContextForJoinProvider.class)
@NotYetSupported(Modes.UNION_WITH_COMPLEX_OPERAND)
public void testUnionAllTwoQueriesRightQueryIsJoin(Map<String, Object> queryContext)
{
// MSQ does not support UNION ALL.
@ -4364,6 +4413,7 @@ public class CalciteJoinQueryTest extends BaseCalciteQueryTest
);
}
@NotYetSupported(Modes.UNION_WITH_COMPLEX_OPERAND)
@Test
public void testUnionAllTwoQueriesBothQueriesAreJoin()
{
@ -4636,6 +4686,7 @@ public class CalciteJoinQueryTest extends BaseCalciteQueryTest
@Test
@Parameters(source = QueryContextForJoinProvider.class)
@NotYetSupported(Modes.JOIN_CONDITION_NOT_PUSHED_CONDITION)
public void testJoinWithNonEquiCondition(Map<String, Object> queryContext)
{
// Native JOIN operator cannot handle the condition, so a SQL JOIN with greater-than is translated into a
@ -4698,6 +4749,7 @@ public class CalciteJoinQueryTest extends BaseCalciteQueryTest
@Test
@Parameters(source = QueryContextForJoinProvider.class)
@NotYetSupported(Modes.JOIN_CONDITION_UNSUPORTED_OPERAND)
public void testJoinWithEquiAndNonEquiCondition(Map<String, Object> queryContext)
{
// Native JOIN operator cannot handle the condition, so a SQL JOIN with greater-than is translated into a
@ -4743,6 +4795,7 @@ public class CalciteJoinQueryTest extends BaseCalciteQueryTest
@Test
@Parameters(source = QueryContextForJoinProvider.class)
@NotYetSupported(Modes.JOIN_CONDITION_NOT_PUSHED_CONDITION)
public void testUsingSubqueryAsPartOfAndFilter(Map<String, Object> queryContext)
{
// Fully removing the join allows this query to vectorize.
@ -4902,6 +4955,7 @@ public class CalciteJoinQueryTest extends BaseCalciteQueryTest
@Test
@Parameters(source = QueryContextForJoinProvider.class)
@NotYetSupported(Modes.JOIN_CONDITION_UNSUPORTED_OPERAND)
public void testNestedGroupByOnInlineDataSourceWithFilter(Map<String, Object> queryContext)
{
// Cannot vectorize due to virtual columns.
@ -5094,6 +5148,7 @@ public class CalciteJoinQueryTest extends BaseCalciteQueryTest
@Test
@Parameters(source = QueryContextForJoinProvider.class)
@DecoupledTestConfig(nativeQueryIgnore = NativeQueryIgnore.EQUIV_PLAN)
public void testTopNOnStringWithNonSortedOrUniqueDictionary(Map<String, Object> queryContext)
{
testQuery(
@ -5134,6 +5189,7 @@ public class CalciteJoinQueryTest extends BaseCalciteQueryTest
@Test
@Parameters(source = QueryContextForJoinProvider.class)
@DecoupledTestConfig(nativeQueryIgnore = NativeQueryIgnore.EQUIV_PLAN)
public void testTopNOnStringWithNonSortedOrUniqueDictionaryOrderByDim(Map<String, Object> queryContext)
{
@ -5174,6 +5230,7 @@ public class CalciteJoinQueryTest extends BaseCalciteQueryTest
@Test
@Parameters(source = QueryContextForJoinProvider.class)
@DecoupledTestConfig(nativeQueryIgnore = NativeQueryIgnore.SLIGHTLY_WORSE_PLAN)
public void testVirtualColumnOnMVFilterJoinExpression(Map<String, Object> queryContext)
{
// Doesn't work in MSQ, although it's not really MSQ's fault. In MSQ, the second field (foo2.dim3) is returned as
@ -5230,6 +5287,7 @@ public class CalciteJoinQueryTest extends BaseCalciteQueryTest
@Test
@Parameters(source = QueryContextForJoinProvider.class)
@DecoupledTestConfig(nativeQueryIgnore = NativeQueryIgnore.DEFINETLY_WORSE_PLAN)
public void testVirtualColumnOnMVFilterMultiJoinExpression(Map<String, Object> queryContext)
{
// Doesn't work in MSQ, although it's not really MSQ's fault. In MSQ, the second field (foo2.dim3) is returned as
@ -5309,6 +5367,7 @@ public class CalciteJoinQueryTest extends BaseCalciteQueryTest
@Test
@Parameters(source = QueryContextForJoinProvider.class)
@NotYetSupported(Modes.JOIN_CONDITION_NOT_PUSHED_CONDITION)
public void testInnerJoinWithFilterPushdownAndManyFiltersEmptyResults(Map<String, Object> queryContext)
{
// create the query we expect
@ -5416,6 +5475,7 @@ public class CalciteJoinQueryTest extends BaseCalciteQueryTest
@Test
@Parameters(source = QueryContextForJoinProvider.class)
@NotYetSupported(Modes.JOIN_CONDITION_NOT_PUSHED_CONDITION)
public void testInnerJoinWithFilterPushdownAndManyFiltersNonEmptyResults(Map<String, Object> queryContext)
{
// create the query we expect
@ -5584,6 +5644,7 @@ public class CalciteJoinQueryTest extends BaseCalciteQueryTest
@Test
@Parameters(source = QueryContextForJoinProvider.class)
@NotYetSupported(Modes.SORT_REMOVE_TROUBLE)
public void testRegressionFilteredAggregatorsSubqueryJoins(Map<String, Object> queryContext)
{
cannotVectorize();
@ -5778,6 +5839,7 @@ public class CalciteJoinQueryTest extends BaseCalciteQueryTest
@SqlTestFrameworkConfig(minTopNThreshold = 1)
@Test
@NotYetSupported(Modes.JOIN_TABLE_TABLE)
public void testJoinWithAliasAndOrderByNoGroupBy()
{
Map<String, Object> context = new HashMap<>(QUERY_CONTEXT_DEFAULT);
@ -5796,8 +5858,7 @@ public class CalciteJoinQueryTest extends BaseCalciteQueryTest
new QueryDataSource(
newScanQueryBuilder()
.dataSource(CalciteTests.DATASOURCE3)
.intervals(querySegmentSpec(Intervals.of(
"-146136543-09-08T08:23:32.096Z/146140482-04-24T15:36:27.903Z")))
.intervals(querySegmentSpec(Filtration.eternity()))
.columns("dim2")
.context(context)
.build()
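The interval replacement in the hunk above is behavior-preserving: Filtration.eternity() is the shared test helper for the all-time interval, and the literal it replaces is that interval's canonical string form. A minimal sketch of the equivalence, assuming Filtration.eternity() returns Intervals.ETERNITY as the surrounding suites rely on:

Assert.assertEquals(
    Intervals.of("-146136543-09-08T08:23:32.096Z/146140482-04-24T15:36:27.903Z"),
    Filtration.eternity()
);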
@ -5970,6 +6031,7 @@ public class CalciteJoinQueryTest extends BaseCalciteQueryTest
}
@Test
@DecoupledTestConfig(nativeQueryIgnore = NativeQueryIgnore.JOIN_FILTER_LOCATIONS)
public void testJoinWithInputRefCondition()
{
cannotVectorize();
@ -6088,6 +6150,7 @@ public class CalciteJoinQueryTest extends BaseCalciteQueryTest
}
@Test
@NotYetSupported(Modes.CORRELATE_CONVERSION)
public void testJoinsWithUnnestOnLeft()
{
// Segment map function of MSQ needs some work
@ -6143,6 +6206,7 @@ public class CalciteJoinQueryTest extends BaseCalciteQueryTest
}
@Test
@NotYetSupported(Modes.CORRELATE_CONVERSION)
public void testJoinsWithUnnestOverFilteredDSOnLeft()
{
// Segment map function of MSQ needs some work
@ -6201,6 +6265,7 @@ public class CalciteJoinQueryTest extends BaseCalciteQueryTest
}
@Test
@NotYetSupported(Modes.CORRELATE_CONVERSION)
public void testJoinsWithUnnestOverJoin()
{
// Segment map function of MSQ needs some work
@ -6287,6 +6352,7 @@ public class CalciteJoinQueryTest extends BaseCalciteQueryTest
}
@Test
@NotYetSupported(Modes.CORRELATE_CONVERSION)
public void testSelfJoinsWithUnnestOnLeftAndRight()
{
// Segment map function of MSQ needs some work
@ -6356,6 +6422,7 @@ public class CalciteJoinQueryTest extends BaseCalciteQueryTest
}
@Test
@NotYetSupported(Modes.JOIN_CONDITION_NOT_PUSHED_CONDITION)
public void testJoinsOverUnnestOverFilterDSOverJoin()
{
// Segment map function of MSQ needs some work

View File

@ -2817,7 +2817,6 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
);
}
@NotYetSupported(Modes.PLAN_MISMATCH)
@Test
public void testGroupByWithSelectAndOrderByProjections()
{
@ -2902,7 +2901,6 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
);
}
@NotYetSupported(Modes.PLAN_MISMATCH)
@Test
public void testTopNWithSelectAndOrderByProjections()
{
@ -4868,7 +4866,6 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
);
}
@NotYetSupported(Modes.PLAN_MISMATCH)
@Test
public void testGroupByWithSortOnPostAggregationDefault()
{
@ -4900,7 +4897,6 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
);
}
@NotYetSupported(Modes.PLAN_MISMATCH)
@Test
public void testGroupByWithSortOnPostAggregationNoTopNConfig()
{
@ -4944,7 +4940,6 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
);
}
@NotYetSupported(Modes.PLAN_MISMATCH)
@Test
public void testGroupByWithSortOnPostAggregationNoTopNContext()
{
@ -5784,7 +5779,6 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
);
}
@NotYetSupported(Modes.ERROR_HANDLING)
@Test
public void testUnplannableJoinQueriesInNonSQLCompatibleMode()
{
@ -6931,7 +6925,7 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
);
}
@NotYetSupported(Modes.PLAN_MISMATCH)
@DecoupledTestConfig(nativeQueryIgnore = NativeQueryIgnore.AGG_COL_EXCHANGE)
@Test
public void testExactCountDistinctWithGroupingAndOtherAggregators()
{
@ -6986,7 +6980,7 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
);
}
@NotYetSupported(Modes.MISSING_JOIN_CONVERSION)
@DecoupledTestConfig(nativeQueryIgnore = NativeQueryIgnore.AGG_COL_EXCHANGE)
@Test
public void testMultipleExactCountDistinctWithGroupingAndOtherAggregatorsUsingJoin()
{
@ -10515,7 +10509,7 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
);
}
@NotYetSupported(Modes.PLAN_MISMATCH)
@DecoupledTestConfig(nativeQueryIgnore = NativeQueryIgnore.IMPROVED_PLAN)
@Test
public void testGroupByTimeFloorAndDimOnGroupByTimeFloorAndDim()
{
@ -12149,7 +12143,6 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
);
}
@NotYetSupported(Modes.MISSING_JOIN_CONVERSION)
@Test
public void testRequireTimeConditionPositive()
{
@ -12178,7 +12171,11 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
new Object[]{3L, timestamp("2001-01-01")}
)
);
}
@Test
public void testRequireTimeConditionPositive2()
{
// nested GROUP BY only requires a time condition for the innermost query
testQuery(
PLANNER_CONFIG_REQUIRE_TIME_CONDITION,
@ -12221,7 +12218,13 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
new Object[]{6L, 4L}
)
);
}
// __time >= x remains in the join condition
@NotYetSupported(Modes.JOIN_CONDITION_NOT_PUSHED_CONDITION)
@Test
public void testRequireTimeConditionPositive3()
{
// Cannot vectorize next test due to extraction dimension spec.
cannotVectorize();
@ -12353,7 +12356,6 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
);
}
@NotYetSupported(Modes.MISSING_JOIN_CONVERSION2)
@Test
public void testRequireTimeConditionSemiJoinNegative()
{
@ -14648,7 +14650,6 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
);
}
@NotYetSupported(Modes.MISSING_JOIN_CONVERSION)
@Test
public void testOrderByAlongWithInternalScanQuery()
{
@ -14691,7 +14692,6 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
);
}
@NotYetSupported(Modes.MISSING_JOIN_CONVERSION)
@Test
public void testOrderByAlongWithInternalScanQueryNoDistinct()
{

View File

@ -0,0 +1,85 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.sql.calcite;
import com.google.common.collect.ImmutableMap;
import junitparams.Parameters;
import org.apache.druid.query.QueryContexts;
import org.apache.druid.server.security.AuthConfig;
import org.apache.druid.sql.calcite.NotYetSupported.NotYetSupportedProcessor;
import org.apache.druid.sql.calcite.planner.PlannerConfig;
import org.apache.druid.sql.calcite.util.SqlTestFramework;
import org.apache.druid.sql.calcite.util.SqlTestFramework.PlannerComponentSupplier;
import org.junit.Rule;
import org.junit.Test;
import java.util.Map;
import static org.junit.Assert.assertNotNull;
public class DecoupledPlanningCalciteJoinQueryTest extends CalciteJoinQueryTest
{
@Rule(order = 0)
public NotYetSupportedProcessor decoupledIgnoreProcessor = new NotYetSupportedProcessor();
private static final ImmutableMap<String, Object> CONTEXT_OVERRIDES =
ImmutableMap.<String, Object>builder()
.putAll(BaseCalciteQueryTest.QUERY_CONTEXT_DEFAULT)
.put(PlannerConfig.CTX_NATIVE_QUERY_SQL_PLANNING_MODE, PlannerConfig.NATIVE_QUERY_SQL_PLANNING_MODE_DECOUPLED)
.put(QueryContexts.ENABLE_DEBUG, true)
.build();
@Override
protected QueryTestBuilder testBuilder()
{
PlannerComponentSupplier componentSupplier = this;
CalciteTestConfig testConfig = new CalciteTestConfig(CONTEXT_OVERRIDES)
{
@Override
public SqlTestFramework.PlannerFixture plannerFixture(PlannerConfig plannerConfig, AuthConfig authConfig)
{
plannerConfig = plannerConfig.withOverrides(CONTEXT_OVERRIDES);
return queryFramework().plannerFixture(componentSupplier, plannerConfig, authConfig);
}
};
QueryTestBuilder builder = new QueryTestBuilder(testConfig)
.cannotVectorize(cannotVectorize)
.skipVectorize(skipVectorize);
DecoupledTestConfig decTestConfig = queryFrameworkRule.getAnnotation(DecoupledTestConfig.class);
if (decTestConfig != null && decTestConfig.nativeQueryIgnore().isPresent()) {
builder.verifyNativeQueries(x -> false);
}
return builder;
}
@Test
@Parameters(source = QueryContextForJoinProvider.class)
@DecoupledTestConfig
public void ensureDecoupledTestConfigAnnotationWorks(Map<String, Object> queryContext)
{
assertNotNull(queryFrameworkRule.getAnnotation(DecoupledTestConfig.class));
assertNotNull(queryContext);
}
}
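The two keys in CONTEXT_OVERRIDES are what switch every inherited test into decoupled planning; native-query verification is then skipped only for tests carrying a @DecoupledTestConfig ignore reason. A minimal sketch of opting a query context into decoupled mode with the same constants (illustrative only; the overrides above additionally layer on QUERY_CONTEXT_DEFAULT):

Map<String, Object> decoupledContext = ImmutableMap.<String, Object>builder()
    .put(PlannerConfig.CTX_NATIVE_QUERY_SQL_PLANNING_MODE,
         PlannerConfig.NATIVE_QUERY_SQL_PLANNING_MODE_DECOUPLED)
    .put(QueryContexts.ENABLE_DEBUG, true)
    .build();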

View File

@ -59,7 +59,7 @@ public class DecoupledPlanningCalciteQueryTest extends CalciteQueryTest
.cannotVectorize(cannotVectorize)
.skipVectorize(skipVectorize);
DecoupledTestConfig decTestConfig = queryFrameworkRule.getDescription().getAnnotation(DecoupledTestConfig.class);
DecoupledTestConfig decTestConfig = queryFrameworkRule.getAnnotation(DecoupledTestConfig.class);
if (decTestConfig != null && decTestConfig.nativeQueryIgnore().isPresent()) {
builder.verifyNativeQueries(x -> false);

View File

@ -59,7 +59,7 @@ public class DecoupledPlanningCalciteUnionQueryTest extends CalciteUnionQueryTes
.cannotVectorize(cannotVectorize)
.skipVectorize(skipVectorize);
DecoupledTestConfig decTestConfig = queryFrameworkRule.getDescription().getAnnotation(DecoupledTestConfig.class);
DecoupledTestConfig decTestConfig = queryFrameworkRule.getAnnotation(DecoupledTestConfig.class);
if (decTestConfig != null && decTestConfig.nativeQueryIgnore().isPresent()) {
builder.verifyNativeQueries(x -> false);

View File

@ -20,6 +20,10 @@
package org.apache.druid.sql.calcite;
import org.apache.calcite.rel.rules.CoreRules;
import org.apache.druid.query.QueryContexts;
import org.apache.druid.query.aggregation.post.FinalizingFieldAccessPostAggregator;
import org.apache.druid.query.scan.ScanQuery;
import org.apache.druid.query.timeseries.TimeseriesQuery;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
@ -63,7 +67,41 @@ public @interface DecoupledTestConfig
/**
* Worse plan; may lose vectorization, but no extra queries
*/
SLIGHTLY_WORSE_PLAN;
SLIGHTLY_WORSE_PLAN,
/**
* {@link TimeseriesQuery} to {@link ScanQuery} change.
*
* Not yet sure whether this is an improvement or an issue.
*/
TS_TO_SCAN,
/**
* GroupBy result isn't sorted?!
*/
GBY_DOESNT_SORT,
/**
* Equivalent plan.
*
* Renamed variable
*/
EQUIV_PLAN,
/**
* {@link QueryContexts#SQL_JOIN_LEFT_SCAN_DIRECT} not supported.
*/
JOIN_LEFT_DIRECT_ACCESS,
/**
* Different filter layout.
*
* Filter is pushed below the join, on the left side.
*/
JOIN_FILTER_LOCATIONS,
/**
* New scans / etc.
*/
DEFINETLY_WORSE_PLAN,
/**
* A new {@link FinalizingFieldAccessPostAggregator} appeared in the plan.
*/
FINALIZING_FIELD_ACCESS;
public boolean isPresent()
{
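A hypothetical usage sketch for the new ignore reasons above (the method name is made up; the pattern mirrors the annotated tests earlier in this diff). A test whose decoupled plan replaces a TimeseriesQuery with a ScanQuery keeps running and keeps verifying results, but skips the expected-native-query comparison:

@DecoupledTestConfig(nativeQueryIgnore = NativeQueryIgnore.TS_TO_SCAN)
@Test
public void testHypotheticalTimeseriesToScanChange()
{
  // testQuery(...) as in the surrounding suites; only the native query
  // comparison is skipped, via the testBuilder() override shown earlier.
}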

View File

@ -20,18 +20,27 @@
package org.apache.druid.sql.calcite;
import com.google.common.base.Throwables;
import junitparams.JUnitParamsRunner;
import org.apache.commons.lang3.RegExUtils;
import org.apache.druid.error.DruidException;
import org.apache.druid.java.util.common.ISE;
import org.junit.AssumptionViolatedException;
import org.junit.rules.TestRule;
import org.junit.runner.Description;
import org.junit.runner.RunWith;
import org.junit.runners.model.Statement;
import java.lang.annotation.Annotation;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import static org.junit.Assert.assertThrows;
@ -68,7 +77,6 @@ public @interface NotYetSupported
enum Modes
{
PLAN_MISMATCH(AssertionError.class, "AssertionError: query #"),
NOT_ENOUGH_RULES(DruidException.class, "not enough rules"),
ERROR_HANDLING(AssertionError.class, "targetPersona: is <[A-Z]+> and category: is <[A-Z_]+> and errorCode: is"),
EXPRESSION_NOT_GROUPED(DruidException.class, "Expression '[a-z]+' is not being grouped"),
@ -83,12 +91,17 @@ public @interface NotYetSupported
INCORRECT_SYNTAX(DruidException.class, "Incorrect syntax near the keyword"),
// at least c7 is represented oddly in the parquet file
T_ALLTYPES_ISSUES(AssertionError.class, "(t_alltype|allTypsUniq|fewRowsAllData).parquet.*Verifier.verify"),
RESULT_MISMATCH(AssertionError.class, "(assertResultsEquals|AssertionError: column content mismatch)"),
UNSUPPORTED_NULL_ORDERING(DruidException.class, "(A|DE)SCENDING ordering with NULLS (LAST|FIRST)"),
MISSING_JOIN_CONVERSION(DruidException.class, "Missing conversions? (was|is) (Logical)?Join"),
MISSING_JOIN_CONVERSION2(AssertionError.class, "Missing conversions? (was|is) (Logical)?Join"),
UNION_WITH_COMPLEX_OPERAND(DruidException.class, "Only Table and Values are supported as inputs for Union"),
UNION_MORE_STRICT_ROWTYPE_CHECK(DruidException.class, "Row signature mismatch in Union inputs");
UNION_MORE_STRICT_ROWTYPE_CHECK(DruidException.class, "Row signature mismatch in Union inputs"),
JOIN_CONDITION_NOT_PUSHED_CONDITION(DruidException.class, "SQL requires a join with '.*' condition"),
JOIN_CONDITION_UNSUPORTED_OPERAND(DruidException.class, "SQL .* unsupported operand type"),
JOIN_TABLE_TABLE(ISE.class, "Cannot handle subquery structure for dataSource: JoinDataSource"),
CORRELATE_CONVERSION(DruidException.class, "Missing conversion( is|s are) LogicalCorrelate"),
SORT_REMOVE_TROUBLE(DruidException.class, "Calcite assertion violated.*Sort\\.<init>"),
STACK_OVERFLOW(StackOverflowError.class, ""),
CANNOT_JOIN_LOOKUP_NON_KEY(RuntimeException.class, "Cannot join lookup with condition referring to non-key");
public Class<? extends Throwable> throwableClass;
public String regex;
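A hedged sketch of the check these (throwableClass, regex) pairs drive; the real rule wraps the test Statement and inspects the thrown failure, so this standalone version is illustrative only:

Throwable t = new ISE("Cannot handle subquery structure for dataSource: JoinDataSource");
Modes mode = Modes.JOIN_TABLE_TABLE;
boolean expectedFailure = mode.throwableClass.isInstance(t)
    && Pattern.compile(mode.regex).matcher(Throwables.getStackTraceAsString(t)).find();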
@ -116,7 +129,7 @@ public @interface NotYetSupported
@Override
public Statement apply(Statement base, Description description)
{
NotYetSupported annotation = description.getAnnotation(NotYetSupported.class);
NotYetSupported annotation = getAnnotation(description, NotYetSupported.class);
if (annotation == null) {
return base;
@ -159,5 +172,38 @@ public @interface NotYetSupported
}
};
}
private static Method getMethodForName(Class<?> testClass, String realMethodName)
{
List<Method> matches = Stream.of(testClass.getMethods())
.filter(m -> realMethodName.equals(m.getName()))
.collect(Collectors.toList());
switch (matches.size()) {
case 0:
throw new IllegalArgumentException("Expected to find method...but there is none?");
case 1:
return matches.get(0);
default:
throw new IllegalArgumentException("method overrides are not supported");
}
}
public static <T extends Annotation> T getAnnotation(Description description, Class<T> annotationType)
{
T annotation = description.getAnnotation(annotationType);
if (annotation != null) {
return annotation;
}
Class<?> testClass = description.getTestClass();
RunWith runWith = testClass.getAnnotation(RunWith.class);
if (runWith == null || !runWith.value().equals(JUnitParamsRunner.class)) {
return null;
}
String methodName = description.getMethodName();
String realMethodName = RegExUtils.replaceAll(methodName, "\\(.*", "");
Method m = getMethodForName(testClass, realMethodName);
return m.getAnnotation(annotationType);
}
}
}
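The regex above exists because, under JUnitParamsRunner, Description#getMethodName embeds the parameter list, so the annotation has to be read reflectively from the underlying method. A small sketch of the normalization, assuming that name format (the input string is a made-up example):

String junitParamsName = "testSerde(OBJECT) [0]";
String realName = RegExUtils.replaceAll(junitParamsName, "\\(.*", "");
// realName is now "testSerde", which getMethodForName resolves on the test class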

View File

@ -20,6 +20,7 @@
package org.apache.druid.sql.calcite;
import org.apache.druid.query.topn.TopNQueryConfig;
import org.apache.druid.sql.calcite.NotYetSupported.NotYetSupportedProcessor;
import org.apache.druid.sql.calcite.util.CacheTestHelperModule.ResultCacheMode;
import org.apache.druid.sql.calcite.util.SqlTestFramework;
import org.apache.druid.sql.calcite.util.SqlTestFramework.QueryComponentSupplier;
@ -27,6 +28,7 @@ import org.junit.rules.ExternalResource;
import org.junit.runner.Description;
import org.junit.runners.model.Statement;
import java.lang.annotation.Annotation;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
@ -119,9 +121,9 @@ public @interface SqlTestFrameworkConfig
return getConfigurationInstance().framework;
}
public Description getDescription()
public <T extends Annotation> T getAnnotation(Class<T> annotationType)
{
return description;
return NotYetSupportedProcessor.getAnnotation(description, annotationType);
}
private ConfigurationInstance getConfigurationInstance()
@ -133,12 +135,10 @@ public @interface SqlTestFrameworkConfig
{
return new ConfigurationInstance(config, testHost);
}
}
class ConfigurationInstance
{
public SqlTestFramework framework;
ConfigurationInstance(SqlTestFrameworkConfig config, QueryComponentSupplier testHost)
@ -156,5 +156,4 @@ public @interface SqlTestFrameworkConfig
framework.close();
}
}
}

View File

@ -21,25 +21,21 @@ package org.apache.druid.sql.http;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import junitparams.JUnitParamsRunner;
import junitparams.Parameters;
import org.apache.druid.annotations.UsedByJUnitParamsRunner;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.StringUtils;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;
import java.util.EnumSet;
@RunWith(JUnitParamsRunner.class)
public class ResultFormatTest
{
private final ObjectMapper jsonMapper = new DefaultObjectMapper();
@Test
@Parameters(source = ResultFormatTypeProvider.class)
@ParameterizedTest
@MethodSource("provideResultFormats")
public void testSerde(ResultFormat target) throws JsonProcessingException
{
final String json = jsonMapper.writeValueAsString(target);
@ -56,9 +52,6 @@ public class ResultFormatTest
Assert.assertEquals(ResultFormat.OBJECTLINES, jsonMapper.readValue("\"oBjEcTlInEs\"", ResultFormat.class));
}
public static class ResultFormatTypeProvider
{
@UsedByJUnitParamsRunner
public static Object[] provideResultFormats()
{
return EnumSet.allOf(ResultFormat.class)
@ -67,4 +60,3 @@ public class ResultFormatTest
.toArray(Object[]::new);
}
}
}
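For reference, the JUnit 5 shape this file migrates to, as a self-contained sketch (hypothetical enum and class names, not from this PR):

import java.util.stream.Stream;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;
import static org.junit.jupiter.api.Assertions.assertNotNull;

class EnumSerdeTest
{
  enum Format { JSON, CSV }

  // Static factory resolved by name from @MethodSource, replacing the
  // JUnitParams provider-class indirection removed above.
  static Stream<Format> provideFormats()
  {
    return Stream.of(Format.values());
  }

  @ParameterizedTest
  @MethodSource("provideFormats")
  void testEachFormat(Format target)
  {
    assertNotNull(target);
  }
}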