mirror of https://github.com/apache/druid.git
SQL: More straightforward handling of join planning. (#9648)
* SQL: More straightforward handling of join planning.

Two changes that simplify how joins are planned:

1) Stop using JoinProjectTransposeRule as a way of guiding subquery removal. Instead, add logic to DruidJoinRule that identifies removable subqueries and removes them at the point of creating a DruidJoinQueryRel. This approach reduces the size of the planning space and allows the planner to complete quickly.

2) Remove rules that reorder joins. Not because of an impact on the planning time (it seems minimal), but because the decisions that the planner was making in the new tests were sometimes worse than the user-provided order. I think we'll need to go with the user-provided order for now, and revisit reordering when we can add more smarts to the cost estimator.

A third change updates the numeric ExprEval classes to store their value as a boxed type that corresponds to what it is supposed to be. This is useful because it affects the behavior of "asString", and is included in this patch because it is needed for the new test "testInnerJoinTwoLookupsToTableUsingNumericColumnInReverse". This test relies on CAST('6', 'DOUBLE') stringifying to "6.0" like an actual double would.

Fixes #9646.

* Fix comments.

* Fix tests.
parent eb45981b60
commit 75c543b50f
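To make the third change concrete, here is a minimal standalone sketch (plain Java, not Druid code) of why the boxed type matters for stringification: the same numeric value prints differently depending on whether it is boxed as a Long or a Double, which is exactly what the "asString" behavior and the CAST('6', 'DOUBLE') test depend on.

// Standalone sketch, not Druid code: String.valueOf on a boxed Long vs Double.
public class BoxedStringifySketch
{
  public static void main(String[] args)
  {
    final Number rawValue = 6L;                       // value as handed in
    final Number asDouble = rawValue.doubleValue();   // auto-boxed to Double, as the patch stores it

    System.out.println(String.valueOf(rawValue));     // prints "6"
    System.out.println(String.valueOf(asDouble));     // prints "6.0"
  }
}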
ExprEval.java

@@ -244,7 +244,7 @@ public abstract class ExprEval<T>
   {
     private DoubleExprEval(@Nullable Number value)
     {
-      super(value == null ? NullHandling.defaultDoubleValue() : value);
+      super(value == null ? NullHandling.defaultDoubleValue() : (Double) value.doubleValue());
     }

     @Override
@@ -304,7 +304,7 @@ public abstract class ExprEval<T>
   {
     private LongExprEval(@Nullable Number value)
     {
-      super(value == null ? NullHandling.defaultLongValue() : value);
+      super(value == null ? NullHandling.defaultLongValue() : (Long) value.longValue());
     }

     @Override
FunctionTest.java

@@ -175,7 +175,7 @@ public class FunctionTest extends InitializedNullHandlingTest
   public void testArrayLength()
   {
     assertExpr("array_length([1,2,3])", 3L);
-    assertExpr("array_length(a)", 4);
+    assertExpr("array_length(a)", 4L);
   }

   @Test
@@ -199,7 +199,7 @@ public class FunctionTest extends InitializedNullHandlingTest
   {
     assertExpr("array_offset_of([1, 2, 3], 3)", 2L);
     assertExpr("array_offset_of([1, 2, 3], 4)", NullHandling.replaceWithDefault() ? -1L : null);
-    assertExpr("array_offset_of(a, 'baz')", 2);
+    assertExpr("array_offset_of(a, 'baz')", 2L);
   }

   @Test
@@ -207,7 +207,7 @@ public class FunctionTest extends InitializedNullHandlingTest
   {
     assertExpr("array_ordinal_of([1, 2, 3], 3)", 3L);
     assertExpr("array_ordinal_of([1, 2, 3], 4)", NullHandling.replaceWithDefault() ? -1L : null);
-    assertExpr("array_ordinal_of(a, 'baz')", 3);
+    assertExpr("array_ordinal_of(a, 'baz')", 3L);
   }

   @Test
Rules.java

@@ -47,10 +47,7 @@ import org.apache.calcite.rel.rules.FilterMergeRule;
 import org.apache.calcite.rel.rules.FilterProjectTransposeRule;
 import org.apache.calcite.rel.rules.FilterTableScanRule;
 import org.apache.calcite.rel.rules.IntersectToDistinctRule;
-import org.apache.calcite.rel.rules.JoinCommuteRule;
-import org.apache.calcite.rel.rules.JoinProjectTransposeRule;
 import org.apache.calcite.rel.rules.JoinPushExpressionsRule;
-import org.apache.calcite.rel.rules.JoinPushThroughJoinRule;
 import org.apache.calcite.rel.rules.MatchRule;
 import org.apache.calcite.rel.rules.ProjectFilterTransposeRule;
 import org.apache.calcite.rel.rules.ProjectMergeRule;
@@ -95,6 +92,8 @@ public class Rules
   // 2) AggregateReduceFunctionsRule (it'll be added back for the Bindable rule set, but we don't want it for Druid
   //    rules since it expands AVG, STDDEV, VAR, etc, and we have aggregators specifically designed for those
   //    functions).
+  // 3) JoinCommuteRule (we don't support reordering joins yet).
+  // 4) JoinPushThroughJoinRule (we don't support reordering joins yet).
   private static final List<RelOptRule> BASE_RULES =
       ImmutableList.of(
           AggregateStarTableRule.INSTANCE,
@@ -110,9 +109,6 @@ public class Rules
           FilterAggregateTransposeRule.INSTANCE,
           ProjectWindowTransposeRule.INSTANCE,
           MatchRule.INSTANCE,
-          JoinCommuteRule.SWAP_OUTER,
-          JoinPushThroughJoinRule.RIGHT,
-          JoinPushThroughJoinRule.LEFT,
           SortProjectTransposeRule.INSTANCE,
           SortJoinTransposeRule.INSTANCE,
           SortRemoveConstantKeysRule.INSTANCE,
@@ -167,8 +163,12 @@ public class Rules
           IntersectToDistinctRule.INSTANCE
       );

-  // Rules from RelOptUtil's registerAbstractRelationalRules, except AggregateMergeRule. (It causes
-  // testDoubleNestedGroupBy2 to fail).
+  // Rules from RelOptUtil's registerAbstractRelationalRules, minus:
+  //
+  // 1) AggregateMergeRule (it causes testDoubleNestedGroupBy2 to fail)
+  // 2) SemiJoinRule.PROJECT and SemiJoinRule.JOIN (we don't need to detect semi-joins, because they are handled
+  //    fine as-is by DruidJoinRule).
+  // 3) JoinCommuteRule (we don't support reordering joins yet).
   private static final List<RelOptRule> ABSTRACT_RELATIONAL_RULES =
       ImmutableList.of(
           FilterJoinRule.FILTER_ON_JOIN,
@@ -183,13 +183,6 @@ public class Rules
           SortRemoveRule.INSTANCE
       );

-  // Rules that pull projections up above a join. This lets us eliminate some subqueries.
-  private static final List<RelOptRule> JOIN_PROJECT_TRANSPOSE_RULES =
-      ImmutableList.of(
-          JoinProjectTransposeRule.RIGHT_PROJECT,
-          JoinProjectTransposeRule.LEFT_PROJECT
-      );
-
   private Rules()
   {
     // No instantiation.
@@ -197,15 +190,17 @@ public class Rules

   public static List<Program> programs(final PlannerContext plannerContext, final QueryMaker queryMaker)
   {
-    final Program hepProgram =
+    // Program that pre-processes the tree before letting the full-on VolcanoPlanner loose.
+    final Program preProgram =
         Programs.sequence(
             Programs.subQuery(DefaultRelMetadataProvider.INSTANCE),
-            new DecorrelateAndTrimFieldsProgram(),
+            DecorrelateAndTrimFieldsProgram.INSTANCE,
             Programs.hep(REDUCTION_RULES, true, DefaultRelMetadataProvider.INSTANCE)
         );

     return ImmutableList.of(
-        Programs.sequence(hepProgram, Programs.ofRules(druidConventionRuleSet(plannerContext, queryMaker))),
-        Programs.sequence(hepProgram, Programs.ofRules(bindableConventionRuleSet(plannerContext)))
+        Programs.sequence(preProgram, Programs.ofRules(druidConventionRuleSet(plannerContext, queryMaker))),
+        Programs.sequence(preProgram, Programs.ofRules(bindableConventionRuleSet(plannerContext)))
     );
   }

@@ -242,7 +237,6 @@ public class Rules
     rules.addAll(BASE_RULES);
     rules.addAll(ABSTRACT_RULES);
     rules.addAll(ABSTRACT_RELATIONAL_RULES);
-    rules.addAll(JOIN_PROJECT_TRANSPOSE_RULES);

     if (!plannerConfig.isUseApproximateCountDistinct()) {
       // For some reason, even though we support grouping sets, using AggregateExpandDistinctAggregatesRule.INSTANCE
@@ -261,6 +255,8 @@ public class Rules
   // accessible through Programs.standard (which we don't want, since it also adds Enumerable rules).
   private static class DecorrelateAndTrimFieldsProgram implements Program
   {
+    private static final DecorrelateAndTrimFieldsProgram INSTANCE = new DecorrelateAndTrimFieldsProgram();
+
     @Override
     public RelNode run(
         RelOptPlanner planner,
@@ -270,8 +266,8 @@ public class Rules
         List<RelOptLattice> lattices
     )
     {
-      final RelNode decorrelatedRel = RelDecorrelator.decorrelateQuery(rel);
-      final RelBuilder relBuilder = RelFactories.LOGICAL_BUILDER.create(decorrelatedRel.getCluster(), null);
+      final RelBuilder relBuilder = RelFactories.LOGICAL_BUILDER.create(rel.getCluster(), null);
+      final RelNode decorrelatedRel = RelDecorrelator.decorrelateQuery(rel, relBuilder);
       return new RelFieldTrimmer(null, relBuilder).trim(decorrelatedRel);
     }
   }
CostEstimates.java

@@ -74,13 +74,19 @@ public class CostEstimates
    * Multiplier to apply to an outer query via {@link DruidOuterQueryRel}. Encourages pushing down time-saving
    * operations to the lowest level of the query stack, because they'll have bigger impact there.
    */
-  static final double MULTIPLIER_OUTER_QUERY = 0.1;
+  static final double MULTIPLIER_OUTER_QUERY = .1;

   /**
-   * Multiplier to apply to a join when the left-hand side is a subquery. Encourages avoiding subqueries. Subqueries
-   * inside joins must be inlined, which incurs substantial reduction in scalability, so this high number is justified.
+   * Cost to add to a join when either side is a subquery. Strongly encourages avoiding subqueries, since they must be
+   * inlined and then the join must run on the Broker.
    */
-  static final double MULTIPLIER_JOIN_SUBQUERY = 1000000000;
+  static final double COST_JOIN_SUBQUERY = 1e5;
+
+  /**
+   * Cost to perform a cross join. Strongly encourages pushing down filters into join conditions, even if it means
+   * we need to add a subquery (this is higher than {@link #COST_JOIN_SUBQUERY}).
+   */
+  static final double COST_JOIN_CROSS = 1e8;

   private CostEstimates()
   {
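For intuition about the constants above: they now combine additively in DruidJoinQueryRel's cost function (shown further down in this diff) instead of multiplying a base cost. A rough sketch of that shape, with hypothetical names — this is illustrative, not Druid's API:

// Illustrative sketch of the additive join-cost shape; "baseCost" and the
// boolean inputs are hypothetical, the constants mirror the patch.
public class JoinCostSketch
{
  static double joinCost(double baseCost, boolean leftSubquery, boolean rightSubquery, boolean crossJoin)
  {
    // A subquery on the left replaces the base cost; one on the right adds to it.
    double cost = leftSubquery ? 1e5 : baseCost;   // COST_JOIN_SUBQUERY
    if (rightSubquery) {
      cost += 1e5;
    }
    if (crossJoin) {
      cost += 1e8;                                 // COST_JOIN_CROSS dominates everything else
    }
    return cost;
  }

  public static void main(String[] args)
  {
    System.out.println(joinCost(100, false, true, false)); // 100100.0
  }
}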
DruidJoinQueryRel.java

@@ -22,11 +22,13 @@ package org.apache.druid.sql.calcite.rel;
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
 import org.apache.calcite.plan.RelOptCluster;
 import org.apache.calcite.plan.RelOptCost;
 import org.apache.calcite.plan.RelOptPlanner;
 import org.apache.calcite.plan.RelOptRule;
 import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.plan.volcano.RelSubset;
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.RelWriter;
 import org.apache.calcite.rel.core.Join;
@@ -34,6 +36,7 @@ import org.apache.calcite.rel.core.JoinRelType;
 import org.apache.calcite.rel.metadata.RelMetadataQuery;
 import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.sql.SqlKind;
 import org.apache.druid.java.util.common.IAE;
 import org.apache.druid.java.util.common.Pair;
 import org.apache.druid.java.util.common.StringUtils;
@@ -66,32 +69,10 @@ public class DruidJoinQueryRel extends DruidRel<DruidJoinQueryRel>
   private RelNode left;
   private RelNode right;

-  /**
-   * True if {@link #left} requires a subquery.
-   *
-   * This is useful to store in a variable because {@link #left} is sometimes not actually a {@link DruidRel} when
-   * {@link #computeSelfCost} is called. (It might be a {@link org.apache.calcite.plan.volcano.RelSubset}.)
-   *
-   * @see #computeLeftRequiresSubquery(DruidRel)
-   */
-  private final boolean leftRequiresSubquery;
-
-  /**
-   * True if {@link #right} requires a subquery.
-   *
-   * This is useful to store in a variable because {@link #left} is sometimes not actually a {@link DruidRel} when
-   * {@link #computeSelfCost} is called. (It might be a {@link org.apache.calcite.plan.volcano.RelSubset}.)
-   *
-   * @see #computeLeftRequiresSubquery(DruidRel)
-   */
-  private final boolean rightRequiresSubquery;
-
   private DruidJoinQueryRel(
       RelOptCluster cluster,
       RelTraitSet traitSet,
       Join joinRel,
-      boolean leftRequiresSubquery,
-      boolean rightRequiresSubquery,
       PartialDruidQuery partialQuery,
       QueryMaker queryMaker
   )
@@ -100,8 +81,6 @@ public class DruidJoinQueryRel extends DruidRel<DruidJoinQueryRel>
     this.joinRel = joinRel;
     this.left = joinRel.getLeft();
     this.right = joinRel.getRight();
-    this.leftRequiresSubquery = leftRequiresSubquery;
-    this.rightRequiresSubquery = rightRequiresSubquery;
     this.partialQuery = partialQuery;
   }

@@ -110,18 +89,15 @@ public class DruidJoinQueryRel extends DruidRel<DruidJoinQueryRel>
    */
   public static DruidJoinQueryRel create(
       final Join joinRel,
-      final DruidRel<?> left,
-      final DruidRel<?> right
+      final QueryMaker queryMaker
   )
   {
     return new DruidJoinQueryRel(
         joinRel.getCluster(),
         joinRel.getTraitSet(),
         joinRel,
-        computeLeftRequiresSubquery(left),
-        computeRightRequiresSubquery(right),
         PartialDruidQuery.create(joinRel),
-        left.getQueryMaker()
+        queryMaker
     );
   }

@@ -149,19 +125,11 @@ public class DruidJoinQueryRel extends DruidRel<DruidJoinQueryRel>
         getCluster(),
         getTraitSet().plusAll(newQueryBuilder.getRelTraits()),
         joinRel,
-        leftRequiresSubquery,
-        rightRequiresSubquery,
         newQueryBuilder,
         getQueryMaker()
     );
   }

-  @Override
-  public int getQueryCount()
-  {
-    return ((DruidRel<?>) left).getQueryCount() + ((DruidRel<?>) right).getQueryCount();
-  }
-
   @Override
   public DruidQuery toDruidQuery(final boolean finalizeAggregations)
   {
@@ -176,18 +144,14 @@ public class DruidJoinQueryRel extends DruidRel<DruidJoinQueryRel>
     final DataSource rightDataSource;

     if (computeLeftRequiresSubquery(leftDruidRel)) {
-      assert leftRequiresSubquery;
       leftDataSource = new QueryDataSource(leftQuery.getQuery());
     } else {
-      assert !leftRequiresSubquery;
       leftDataSource = leftQuery.getDataSource();
     }

     if (computeRightRequiresSubquery(rightDruidRel)) {
-      assert rightRequiresSubquery;
       rightDataSource = new QueryDataSource(rightQuery.getQuery());
     } else {
-      assert !rightRequiresSubquery;
       rightDataSource = rightQuery.getDataSource();
     }

@@ -250,8 +214,6 @@ public class DruidJoinQueryRel extends DruidRel<DruidJoinQueryRel>
             .map(input -> RelOptRule.convert(input, DruidConvention.instance()))
             .collect(Collectors.toList())
         ),
-        leftRequiresSubquery,
-        rightRequiresSubquery,
         partialQuery,
         getQueryMaker()
     );
@@ -290,8 +252,6 @@ public class DruidJoinQueryRel extends DruidRel<DruidJoinQueryRel>
         getCluster(),
         traitSet,
         joinRel.copy(joinRel.getTraitSet(), inputs),
-        leftRequiresSubquery,
-        rightRequiresSubquery,
         getPartialDruidQuery(),
         getQueryMaker()
     );
@@ -319,12 +279,9 @@ public class DruidJoinQueryRel extends DruidRel<DruidJoinQueryRel>
       throw new RuntimeException(e);
     }

-    return pw.input("left", left)
-             .input("right", right)
-             .item("condition", joinRel.getCondition())
-             .item("joinType", joinRel.getJoinType())
-             .item("query", queryString)
-             .item("signature", druidQuery.getOutputRowSignature());
+    return joinRel.explainTerms(pw)
+                  .item("query", queryString)
+                  .item("signature", druidQuery.getOutputRowSignature());
   }

   @Override
@@ -336,10 +293,23 @@ public class DruidJoinQueryRel extends DruidRel<DruidJoinQueryRel>
   @Override
   public RelOptCost computeSelfCost(final RelOptPlanner planner, final RelMetadataQuery mq)
   {
-    return planner.getCostFactory()
-                  .makeCost(partialQuery.estimateCost(), 0, 0)
-                  .multiplyBy(leftRequiresSubquery ? CostEstimates.MULTIPLIER_JOIN_SUBQUERY : 1)
-                  .multiplyBy(rightRequiresSubquery ? CostEstimates.MULTIPLIER_JOIN_SUBQUERY : 1);
+    double cost;
+
+    if (computeLeftRequiresSubquery(getSomeDruidChild(left))) {
+      cost = CostEstimates.COST_JOIN_SUBQUERY;
+    } else {
+      cost = partialQuery.estimateCost();
+    }
+
+    if (computeRightRequiresSubquery(getSomeDruidChild(right))) {
+      cost += CostEstimates.COST_JOIN_SUBQUERY;
+    }
+
+    if (joinRel.getCondition().isA(SqlKind.LITERAL) && !joinRel.getCondition().isAlwaysFalse()) {
+      cost += CostEstimates.COST_JOIN_CROSS;
+    }
+
+    return planner.getCostFactory().makeCost(cost, 0, 0);
   }

   private static JoinType toDruidJoinType(JoinRelType calciteJoinType)
@@ -395,4 +365,14 @@ public class DruidJoinQueryRel extends DruidRel<DruidJoinQueryRel>

     return Pair.of(rightPrefix, signatureBuilder.build());
   }
+
+  private static DruidRel<?> getSomeDruidChild(final RelNode child)
+  {
+    if (child instanceof DruidRel) {
+      return (DruidRel<?>) child;
+    } else {
+      final RelSubset subset = (RelSubset) child;
+      return (DruidRel<?>) Iterables.getFirst(subset.getRels(), null);
+    }
+  }
 }
DruidOuterQueryRel.java

@@ -113,12 +113,6 @@ public class DruidOuterQueryRel extends DruidRel<DruidOuterQueryRel>
     );
   }

-  @Override
-  public int getQueryCount()
-  {
-    return 1 + ((DruidRel) sourceRel).getQueryCount();
-  }
-
   @Override
   public DruidQuery toDruidQuery(final boolean finalizeAggregations)
   {
DruidQuery.java

@@ -112,7 +112,6 @@ public class DruidQuery
   private final Sorting sorting;

   private final Query query;
-  private final RowSignature sourceRowSignature;
   private final RowSignature outputRowSignature;
   private final RelDataType outputRowType;
   private final VirtualColumnRegistry virtualColumnRegistry;
@@ -135,7 +134,6 @@ public class DruidQuery
     this.selectProjection = selectProjection;
     this.grouping = grouping;
     this.sorting = sorting;
-    this.sourceRowSignature = Preconditions.checkNotNull(sourceRowSignature, "sourceRowSignature");
     this.outputRowSignature = computeOutputRowSignature(sourceRowSignature, selectProjection, grouping, sorting);
     this.outputRowType = Preconditions.checkNotNull(outputRowType, "outputRowType");
     this.virtualColumnRegistry = Preconditions.checkNotNull(virtualColumnRegistry, "virtualColumnRegistry");
@@ -976,6 +974,7 @@ public class DruidQuery

     // Compute the list of columns to select.
     final Set<String> columns = new HashSet<>(outputRowSignature.getColumnNames());
+
     if (order != ScanQuery.Order.NONE) {
       columns.add(ColumnHolder.TIME_COLUMN_NAME);
     }
DruidQueryRel.java

@@ -136,12 +136,6 @@ public class DruidQueryRel extends DruidRel<DruidQueryRel>
     );
   }

-  @Override
-  public int getQueryCount()
-  {
-    return 1;
-  }
-
   @Override
   public Sequence<Object[]> runQuery()
   {
DruidRel.java

@@ -45,13 +45,6 @@ public abstract class DruidRel<T extends DruidRel> extends AbstractRelNode
   @Nullable
   public abstract PartialDruidQuery getPartialDruidQuery();

-  /**
-   * Return the number of Druid queries this rel involves, including sub-queries. Simple queries will return 1.
-   *
-   * @return number of nested queries
-   */
-  public abstract int getQueryCount();
-
   public abstract Sequence<Object[]> runQuery();

   public abstract T withPartialQuery(PartialDruidQuery newQueryBuilder);
DruidUnionRel.java

@@ -87,12 +87,6 @@ public class DruidUnionRel extends DruidRel<DruidUnionRel>
     return null;
   }

-  @Override
-  public int getQueryCount()
-  {
-    return rels.stream().mapToInt(rel -> ((DruidRel) rel).getQueryCount()).sum();
-  }
-
   @Override
   @SuppressWarnings("unchecked")
   public Sequence<Object[]> runQuery()
DruidJoinRule.java

@@ -21,22 +21,35 @@ package org.apache.druid.sql.calcite.rule;

 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
+import com.google.common.collect.Iterables;
 import org.apache.calcite.plan.RelOptRule;
 import org.apache.calcite.plan.RelOptRuleCall;
 import org.apache.calcite.plan.RelOptUtil;
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.core.Join;
+import org.apache.calcite.rel.core.Project;
 import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rex.RexBuilder;
 import org.apache.calcite.rex.RexCall;
 import org.apache.calcite.rex.RexInputRef;
+import org.apache.calcite.rex.RexLiteral;
 import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.rex.RexUtil;
 import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.sql.fun.SqlStdOperatorTable;
+import org.apache.calcite.tools.RelBuilder;
 import org.apache.calcite.util.ImmutableBitSet;
+import org.apache.druid.java.util.common.Pair;
 import org.apache.druid.sql.calcite.rel.DruidJoinQueryRel;
 import org.apache.druid.sql.calcite.rel.DruidRel;
+import org.apache.druid.sql.calcite.rel.PartialDruidQuery;

+import java.util.ArrayList;
 import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
 import java.util.Stack;
+import java.util.stream.Collectors;

 public class DruidJoinRule extends RelOptRule
 {
@@ -47,8 +60,8 @@ public class DruidJoinRule extends RelOptRule
     super(
         operand(
             Join.class,
-            operand(DruidRel.class, none()),
-            operand(DruidRel.class, none())
+            operand(DruidRel.class, any()),
+            operand(DruidRel.class, any())
         )
     );
   }
@@ -62,12 +75,7 @@ public class DruidJoinRule extends RelOptRule
   public boolean matches(RelOptRuleCall call)
   {
     final Join join = call.rel(0);
-    final DruidRel<?> right = call.rel(2);
-
-    // 1) Condition must be handleable.
-    // 2) Right cannot be a join; we want to generate left-heavy trees.
-    return canHandleCondition(join.getCondition(), join.getLeft().getRowType())
-           && !(right instanceof DruidJoinQueryRel);
+    return canHandleCondition(join.getCondition(), join.getLeft().getRowType());
   }

   @Override
@@ -77,50 +85,140 @@ public class DruidJoinRule extends RelOptRule
     final DruidRel<?> left = call.rel(1);
     final DruidRel<?> right = call.rel(2);

-    // Preconditions were already verified in "matches".
-    call.transformTo(DruidJoinQueryRel.create(join, left, right));
+    final RexBuilder rexBuilder = join.getCluster().getRexBuilder();
+
+    final DruidRel<?> newLeft;
+    final DruidRel<?> newRight;
+    final List<RexNode> newProjectExprs = new ArrayList<>();
+
+    // Already verified to be present in "matches", so just call "get".
+    // Can't be final, because we're going to reassign it up to a couple of times.
+    ConditionAnalysis conditionAnalysis = analyzeCondition(join.getCondition(), join.getLeft().getRowType()).get();
+
+    if (left.getPartialDruidQuery().stage() == PartialDruidQuery.Stage.SELECT_PROJECT
+        && left.getPartialDruidQuery().getWhereFilter() == null) {
+      // Swap the left-side projection above the join, so the left side is a simple scan or mapping. This helps us
+      // avoid subqueries.
+      final RelNode leftScan = left.getPartialDruidQuery().getScan();
+      final Project leftProject = left.getPartialDruidQuery().getSelectProject();
+
+      // Left-side projection expressions rewritten to be on top of the join.
+      newProjectExprs.addAll(leftProject.getProjects());
+      newLeft = left.withPartialQuery(PartialDruidQuery.create(leftScan));
+      conditionAnalysis = conditionAnalysis.pushThroughLeftProject(leftProject);
+    } else {
+      // Leave left as-is. Write input refs that do nothing.
+      for (int i = 0; i < left.getRowType().getFieldCount(); i++) {
+        newProjectExprs.add(rexBuilder.makeInputRef(join.getRowType().getFieldList().get(i).getType(), i));
+      }
+
+      newLeft = left;
+    }
+
+    if (right.getPartialDruidQuery().stage() == PartialDruidQuery.Stage.SELECT_PROJECT
+        && right.getPartialDruidQuery().getWhereFilter() == null
+        && !right.getPartialDruidQuery().getSelectProject().isMapping()
+        && conditionAnalysis.onlyUsesMappingsFromRightProject(right.getPartialDruidQuery().getSelectProject())) {
+      // Swap the right-side projection above the join, so the right side is a simple scan or mapping. This helps us
+      // avoid subqueries.
+      final RelNode rightScan = right.getPartialDruidQuery().getScan();
+      final Project rightProject = right.getPartialDruidQuery().getSelectProject();
+
+      // Right-side projection expressions rewritten to be on top of the join.
+      Iterables.addAll(
+          newProjectExprs,
+          RexUtil.shift(rightProject.getProjects(), newLeft.getRowType().getFieldCount())
+      );
+      newRight = right.withPartialQuery(PartialDruidQuery.create(rightScan));
+      conditionAnalysis = conditionAnalysis.pushThroughRightProject(rightProject);
+    } else {
+      // Leave right as-is. Write input refs that do nothing.
+      for (int i = 0; i < right.getRowType().getFieldCount(); i++) {
+        newProjectExprs.add(
+            rexBuilder.makeInputRef(
+                join.getRowType().getFieldList().get(left.getRowType().getFieldCount() + i).getType(),
+                newLeft.getRowType().getFieldCount() + i
+            )
+        );
+      }
+
+      newRight = right;
+    }
+
+    // Druid join written on top of the new left and right sides.
+    final DruidJoinQueryRel druidJoin = DruidJoinQueryRel.create(
+        join.copy(
+            join.getTraitSet(),
+            conditionAnalysis.getCondition(rexBuilder),
+            newLeft,
+            newRight,
+            join.getJoinType(),
+            join.isSemiJoinDone()
+        ),
+        left.getQueryMaker()
+    );
+
+    final RelBuilder relBuilder =
+        call.builder()
+            .push(druidJoin)
+            .project(
+                RexUtil.fixUp(
+                    rexBuilder,
+                    newProjectExprs,
+                    RelOptUtil.getFieldTypeList(druidJoin.getRowType())
+                )
+            );
+
+    call.transformTo(relBuilder.build());
   }

   /**
-   * Returns true if this condition is an AND of equality conditions of the form: f(LeftRel) = RightColumn.
-   *
-   * @see org.apache.druid.segment.join.JoinConditionAnalysis where "equiCondition" is the same concept.
+   * Returns whether {@link #analyzeCondition} would return something.
    */
   @VisibleForTesting
   static boolean canHandleCondition(final RexNode condition, final RelDataType leftRowType)
   {
+    return analyzeCondition(condition, leftRowType).isPresent();
+  }
+
+  /**
+   * If this condition is an AND of some combination of (1) literals; (2) equality conditions of the form
+   * {@code f(LeftRel) = RightColumn}, then return a {@link ConditionAnalysis}.
+   */
+  private static Optional<ConditionAnalysis> analyzeCondition(final RexNode condition, final RelDataType leftRowType)
+  {
     final List<RexNode> subConditions = decomposeAnd(condition);
+    final List<Pair<RexNode, RexInputRef>> equalitySubConditions = new ArrayList<>();
+    final List<RexLiteral> literalSubConditions = new ArrayList<>();
+    final int numLeftFields = leftRowType.getFieldCount();

     for (RexNode subCondition : subConditions) {
+      if (subCondition.isA(SqlKind.LITERAL)) {
+        // Literals are always OK.
+        literalSubConditions.add((RexLiteral) subCondition);
+        continue;
+      }
+
       if (!subCondition.isA(SqlKind.EQUALS)) {
         // If it's not EQUALS, it's not supported.
-        return false;
+        return Optional.empty();
       }

       final List<RexNode> operands = ((RexCall) subCondition).getOperands();
       Preconditions.checkState(operands.size() == 2, "Expected 2 operands, got[%,d]", operands.size());

-      final int numLeftFields = leftRowType.getFieldList().size();
-
-      final boolean rhsIsFieldOfRightRel =
-          operands.get(1).isA(SqlKind.INPUT_REF)
-          && ((RexInputRef) operands.get(1)).getIndex() >= numLeftFields;
-
-      final boolean lhsIsExpressionOfLeftRel =
-          RelOptUtil.InputFinder.bits(operands.get(0)).intersects(ImmutableBitSet.range(numLeftFields));
-
-      if (!(lhsIsExpressionOfLeftRel && rhsIsFieldOfRightRel)) {
+      if (isLeftExpression(operands.get(0), numLeftFields) && isRightInputRef(operands.get(1), numLeftFields)) {
+        equalitySubConditions.add(Pair.of(operands.get(0), (RexInputRef) operands.get(1)));
+      } else if (isRightInputRef(operands.get(0), numLeftFields)
+                 && isLeftExpression(operands.get(1), numLeftFields)) {
+        equalitySubConditions.add(Pair.of(operands.get(1), (RexInputRef) operands.get(0)));
+      } else {
         // Cannot handle this condition.
-        return false;
+        return Optional.empty();
       }
     }

-    return true;
+    return Optional.of(new ConditionAnalysis(numLeftFields, equalitySubConditions, literalSubConditions));
   }

   @VisibleForTesting
@@ -148,4 +246,148 @@ public class DruidJoinRule extends RelOptRule

     return retVal;
   }
+
+  private static boolean isLeftExpression(final RexNode rexNode, final int numLeftFields)
+  {
+    return ImmutableBitSet.range(numLeftFields).contains(RelOptUtil.InputFinder.bits(rexNode));
+  }
+
+  private static boolean isRightInputRef(final RexNode rexNode, final int numLeftFields)
+  {
+    return rexNode.isA(SqlKind.INPUT_REF) && ((RexInputRef) rexNode).getIndex() >= numLeftFields;
+  }
+
+  @VisibleForTesting
+  static class ConditionAnalysis
+  {
+    /**
+     * Number of fields on the left-hand side. Useful for identifying if a particular field is from on the left
+     * or right side of a join.
+     */
+    private final int numLeftFields;
+
+    /**
+     * Each equality subcondition is an equality of the form f(LeftRel) = g(RightRel).
+     */
+    private final List<Pair<RexNode, RexInputRef>> equalitySubConditions;
+
+    /**
+     * Each literal subcondition is... a literal.
+     */
+    private final List<RexLiteral> literalSubConditions;
+
+    ConditionAnalysis(
+        int numLeftFields,
+        List<Pair<RexNode, RexInputRef>> equalitySubConditions,
+        List<RexLiteral> literalSubConditions
+    )
+    {
+      this.numLeftFields = numLeftFields;
+      this.equalitySubConditions = equalitySubConditions;
+      this.literalSubConditions = literalSubConditions;
+    }
+
+    public ConditionAnalysis pushThroughLeftProject(final Project leftProject)
+    {
+      // Pushing through the project will shift right-hand field references by this amount.
+      final int rhsShift =
+          leftProject.getInput().getRowType().getFieldCount() - leftProject.getRowType().getFieldCount();
+
+      return new ConditionAnalysis(
+          leftProject.getInput().getRowType().getFieldCount(),
+          equalitySubConditions
+              .stream()
+              .map(
+                  equality -> Pair.of(
+                      RelOptUtil.pushPastProject(equality.lhs, leftProject),
+                      (RexInputRef) RexUtil.shift(equality.rhs, rhsShift)
+                  )
+              )
+              .collect(Collectors.toList()),
+          literalSubConditions
+      );
+    }
+
+    public ConditionAnalysis pushThroughRightProject(final Project rightProject)
+    {
+      Preconditions.checkArgument(onlyUsesMappingsFromRightProject(rightProject), "Cannot push through");
+
+      return new ConditionAnalysis(
+          numLeftFields,
+          equalitySubConditions
+              .stream()
+              .map(
+                  equality -> Pair.of(
+                      equality.lhs,
+                      (RexInputRef) RexUtil.shift(
+                          RelOptUtil.pushPastProject(
+                              RexUtil.shift(equality.rhs, -numLeftFields),
+                              rightProject
+                          ),
+                          numLeftFields
+                      )
+                  )
+              )
+              .collect(Collectors.toList()),
+          literalSubConditions
+      );
+    }
+
+    public boolean onlyUsesMappingsFromRightProject(final Project rightProject)
+    {
+      for (Pair<RexNode, RexInputRef> equality : equalitySubConditions) {
+        final int rightIndex = equality.rhs.getIndex() - numLeftFields;
+
+        if (!rightProject.getProjects().get(rightIndex).isA(SqlKind.INPUT_REF)) {
+          return false;
+        }
+      }
+
+      return true;
+    }
+
+    public RexNode getCondition(final RexBuilder rexBuilder)
+    {
+      return RexUtil.composeConjunction(
+          rexBuilder,
+          Iterables.concat(
+              literalSubConditions,
+              equalitySubConditions
+                  .stream()
+                  .map(equality -> rexBuilder.makeCall(SqlStdOperatorTable.EQUALS, equality.lhs, equality.rhs))
+                  .collect(Collectors.toList())
+          ),
+          false
+      );
+    }
+
+    @Override
+    public boolean equals(Object o)
+    {
+      if (this == o) {
+        return true;
+      }
+      if (o == null || getClass() != o.getClass()) {
+        return false;
+      }
+      ConditionAnalysis that = (ConditionAnalysis) o;
+      return Objects.equals(equalitySubConditions, that.equalitySubConditions) &&
+             Objects.equals(literalSubConditions, that.literalSubConditions);
+    }
+
+    @Override
+    public int hashCode()
+    {
+      return Objects.hash(equalitySubConditions, literalSubConditions);
+    }
+
+    @Override
+    public String toString()
+    {
+      return "ConditionAnalysis{" +
+             "equalitySubConditions=" + equalitySubConditions +
+             ", literalSubConditions=" + literalSubConditions +
+             '}';
+    }
+  }
 }
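The projection pull-up performed by onMatch above is easiest to see on a toy model. The sketch below is hypothetical (column references reduced to plain integers, no Calcite types): pulling the left-side projection above the join keeps its expressions and appends identity references for the right side, shifted past the left scan's fields, mirroring what the rule does with RexUtil.shift and makeInputRef.

import java.util.ArrayList;
import java.util.List;

// Toy model of DruidJoinRule's projection pull-up, with column references as ints.
// Join(Project[exprs](scan L), R)  ==>  Project[exprs ++ identity-refs-for-R](Join(scan L, R))
public class ProjectPullUpSketch
{
  static List<Integer> pullUpLeftProject(List<Integer> leftProjectRefs, int leftFieldCount, int rightFieldCount)
  {
    final List<Integer> newProjectRefs = new ArrayList<>(leftProjectRefs);
    for (int i = 0; i < rightFieldCount; i++) {
      newProjectRefs.add(leftFieldCount + i); // right-side refs sit past the left scan's fields
    }
    return newProjectRefs;
  }

  public static void main(String[] args)
  {
    // Left scan has 3 fields, its project keeps fields 2 and 0; right side has 2 fields.
    System.out.println(pullUpLeftProject(List.of(2, 0), 3, 2)); // [2, 0, 3, 4]
  }
}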
CalciteQueryTest.java

@@ -3680,7 +3680,10 @@ public class CalciteQueryTest extends BaseCalciteQueryTest

         // JOIN condition with a function of both sides.
         "SELECT foo.dim1, foo.dim2, l.k, l.v\n"
-        + "FROM foo INNER JOIN lookup.lookyloo l ON CHARACTER_LENGTH(foo.dim2 || l.k) > 3\n"
+        + "FROM foo INNER JOIN lookup.lookyloo l ON CHARACTER_LENGTH(foo.dim2 || l.k) > 3\n",
+
+        // Interpreted as a JOIN against VALUES.
+        "SELECT COUNT(*) FROM foo WHERE dim1 IN (NULL)"
     );

     for (final String query : queries) {
@@ -6796,7 +6799,7 @@ public class CalciteQueryTest extends BaseCalciteQueryTest

     final String explanation =
         "DruidOuterQueryRel(query=[{\"queryType\":\"timeseries\",\"dataSource\":{\"type\":\"table\",\"name\":\"__subquery__\"},\"intervals\":{\"type\":\"intervals\",\"intervals\":[\"-146136543-09-08T08:23:32.096Z/146140482-04-24T15:36:27.903Z\"]},\"descending\":false,\"virtualColumns\":[],\"filter\":null,\"granularity\":{\"type\":\"all\"},\"aggregations\":[{\"type\":\"count\",\"name\":\"a0\"}],\"postAggregations\":[],\"limit\":2147483647,\"context\":{\"defaultTimeout\":300000,\"maxScatterGatherBytes\":9223372036854775807,\"skipEmptyBuckets\":true,\"sqlCurrentTimestamp\":\"2000-01-01T00:00:00Z\",\"sqlQueryId\":\"dummy\",\"vectorize\":\"false\"}}], signature=[{a0:LONG}])\n"
-        + "  DruidJoinQueryRel(condition=[=(SUBSTRING($3, 1, 1), $8)], joinType=[INNER], query=[{\"queryType\":\"groupBy\",\"dataSource\":{\"type\":\"table\",\"name\":\"__join__\"},\"intervals\":{\"type\":\"intervals\",\"intervals\":[\"-146136543-09-08T08:23:32.096Z/146140482-04-24T15:36:27.903Z\"]},\"virtualColumns\":[],\"filter\":null,\"granularity\":{\"type\":\"all\"},\"dimensions\":[{\"type\":\"default\",\"dimension\":\"dim2\",\"outputName\":\"d0\",\"outputType\":\"STRING\"}],\"aggregations\":[],\"postAggregations\":[],\"having\":null,\"limitSpec\":{\"type\":\"NoopLimitSpec\"},\"context\":{\"defaultTimeout\":300000,\"maxScatterGatherBytes\":9223372036854775807,\"sqlCurrentTimestamp\":\"2000-01-01T00:00:00Z\",\"sqlQueryId\":\"dummy\",\"vectorize\":\"false\"},\"descending\":false}], signature=[{d0:STRING}])\n"
+        + "  DruidJoinQueryRel(condition=[=(SUBSTRING($3, 1, 1), $8)], joinType=[inner], query=[{\"queryType\":\"groupBy\",\"dataSource\":{\"type\":\"table\",\"name\":\"__join__\"},\"intervals\":{\"type\":\"intervals\",\"intervals\":[\"-146136543-09-08T08:23:32.096Z/146140482-04-24T15:36:27.903Z\"]},\"virtualColumns\":[],\"filter\":null,\"granularity\":{\"type\":\"all\"},\"dimensions\":[{\"type\":\"default\",\"dimension\":\"dim2\",\"outputName\":\"d0\",\"outputType\":\"STRING\"}],\"aggregations\":[],\"postAggregations\":[],\"having\":null,\"limitSpec\":{\"type\":\"NoopLimitSpec\"},\"context\":{\"defaultTimeout\":300000,\"maxScatterGatherBytes\":9223372036854775807,\"sqlCurrentTimestamp\":\"2000-01-01T00:00:00Z\",\"sqlQueryId\":\"dummy\",\"vectorize\":\"false\"},\"descending\":false}], signature=[{d0:STRING}])\n"
         + "    DruidQueryRel(query=[{\"queryType\":\"scan\",\"dataSource\":{\"type\":\"table\",\"name\":\"foo\"},\"intervals\":{\"type\":\"intervals\",\"intervals\":[\"-146136543-09-08T08:23:32.096Z/146140482-04-24T15:36:27.903Z\"]},\"virtualColumns\":[],\"resultFormat\":\"compactedList\",\"batchSize\":20480,\"limit\":9223372036854775807,\"order\":\"none\",\"filter\":null,\"columns\":[\"__time\",\"cnt\",\"dim1\",\"dim2\",\"dim3\",\"m1\",\"m2\",\"unique_dim1\"],\"legacy\":false,\"context\":{\"defaultTimeout\":300000,\"maxScatterGatherBytes\":9223372036854775807,\"sqlCurrentTimestamp\":\"2000-01-01T00:00:00Z\",\"sqlQueryId\":\"dummy\",\"vectorize\":\"false\"},\"descending\":false,\"granularity\":{\"type\":\"all\"}}], signature=[{__time:LONG, cnt:LONG, dim1:STRING, dim2:STRING, dim3:STRING, m1:FLOAT, m2:DOUBLE, unique_dim1:COMPLEX}])\n"
         + "    DruidQueryRel(query=[{\"queryType\":\"groupBy\",\"dataSource\":{\"type\":\"table\",\"name\":\"foo\"},\"intervals\":{\"type\":\"intervals\",\"intervals\":[\"-146136543-09-08T08:23:32.096Z/146140482-04-24T15:36:27.903Z\"]},\"virtualColumns\":[],\"filter\":{\"type\":\"not\",\"field\":{\"type\":\"selector\",\"dimension\":\"dim1\",\"value\":null,\"extractionFn\":null}},\"granularity\":{\"type\":\"all\"},\"dimensions\":[{\"type\":\"extraction\",\"dimension\":\"dim1\",\"outputName\":\"d0\",\"outputType\":\"STRING\",\"extractionFn\":{\"type\":\"substring\",\"index\":0,\"length\":1}}],\"aggregations\":[],\"postAggregations\":[],\"having\":null,\"limitSpec\":{\"type\":\"NoopLimitSpec\"},\"context\":{\"defaultTimeout\":300000,\"maxScatterGatherBytes\":9223372036854775807,\"sqlCurrentTimestamp\":\"2000-01-01T00:00:00Z\",\"sqlQueryId\":\"dummy\",\"vectorize\":\"false\"},\"descending\":false}], signature=[{d0:STRING}])\n";

@@ -8069,10 +8072,11 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
   @Test
   public void testFilterAndGroupByLookupUsingJoinOperatorBackwards() throws Exception
   {
+    // Like "testFilterAndGroupByLookupUsingJoinOperator", but with the table and lookup reversed.
+
     // Cannot vectorize JOIN operator.
     cannotVectorize();

-    // Like "testFilterAndGroupByLookupUsingJoinOperator", but with the table and lookup reversed.
     testQuery(
         "SELECT lookyloo.v, COUNT(*)\n"
         + "FROM lookup.lookyloo RIGHT JOIN foo ON foo.dim2 = lookyloo.k\n"
@@ -8082,17 +8086,24 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
         GroupByQuery.builder()
                     .setDataSource(
                         join(
-                            new TableDataSource(CalciteTests.DATASOURCE1),
                             new LookupDataSource("lookyloo"),
+                            new QueryDataSource(
+                                newScanQueryBuilder()
+                                    .dataSource(CalciteTests.DATASOURCE1)
+                                    .intervals(querySegmentSpec(Filtration.eternity()))
+                                    .columns("dim2")
+                                    .context(QUERY_CONTEXT_DEFAULT)
+                                    .build()
+                            ),
                             "j0.",
-                            equalsCondition(DruidExpression.fromColumn("dim2"), DruidExpression.fromColumn("j0.k")),
-                            JoinType.LEFT
+                            equalsCondition(DruidExpression.fromColumn("k"), DruidExpression.fromColumn("j0.dim2")),
+                            JoinType.RIGHT
                         )
                     )
                     .setInterval(querySegmentSpec(Filtration.eternity()))
-                    .setDimFilter(not(selector("j0.v", "xa", null)))
+                    .setDimFilter(not(selector("v", "xa", null)))
                     .setGranularity(Granularities.ALL)
-                    .setDimensions(dimensions(new DefaultDimensionSpec("j0.v", "d0")))
+                    .setDimensions(dimensions(new DefaultDimensionSpec("v", "d0")))
                     .setAggregatorSpecs(aggregators(new CountAggregatorFactory("a0")))
                     .setContext(QUERY_CONTEXT_DEFAULT)
                     .build()
@@ -8380,6 +8391,387 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
     );
   }

+  @Test
+  public void testInnerJoinTwoLookupsToTableUsingNumericColumn() throws Exception
+  {
+    // Regression test for https://github.com/apache/druid/issues/9646.
+
+    // Cannot vectorize JOIN operator.
+    cannotVectorize();
+
+    testQuery(
+        "SELECT COUNT(*)\n"
+        + "FROM foo\n"
+        + "INNER JOIN lookup.lookyloo l1 ON l1.k = foo.m1\n"
+        + "INNER JOIN lookup.lookyloo l2 ON l2.k = l1.k",
+        ImmutableList.of(
+            Druids.newTimeseriesQueryBuilder()
+                  .dataSource(
+                      join(
+                          join(
+                              new TableDataSource(CalciteTests.DATASOURCE1),
+                              new QueryDataSource(
+                                  newScanQueryBuilder()
+                                      .dataSource(new LookupDataSource("lookyloo"))
+                                      .intervals(querySegmentSpec(Filtration.eternity()))
+                                      .virtualColumns(
+                                          expressionVirtualColumn("v0", "CAST(\"k\", 'DOUBLE')", ValueType.FLOAT)
+                                      )
+                                      .columns("k", "v0")
+                                      .context(QUERY_CONTEXT_DEFAULT)
+                                      .build()
+                              ),
+                              "j0.",
+                              equalsCondition(DruidExpression.fromColumn("m1"), DruidExpression.fromColumn("j0.v0")),
+                              JoinType.INNER
+                          ),
+                          new LookupDataSource("lookyloo"),
+                          "_j0.",
+                          equalsCondition(DruidExpression.fromColumn("j0.k"), DruidExpression.fromColumn("_j0.k")),
+                          JoinType.INNER
+                      )
+                  )
+                  .intervals(querySegmentSpec(Filtration.eternity()))
+                  .granularity(Granularities.ALL)
+                  .aggregators(new CountAggregatorFactory("a0"))
+                  .context(TIMESERIES_CONTEXT_DEFAULT)
+                  .build()
+        ),
+        ImmutableList.of(
+            new Object[]{1L}
+        )
+    );
+  }
+
+  @Test
+  public void testInnerJoinTwoLookupsToTableUsingNumericColumnInReverse() throws Exception
+  {
+    // Like "testInnerJoinTwoLookupsToTableUsingNumericColumn", but the tables are specified backwards.
+
+    cannotVectorize();
+
+    testQuery(
+        "SELECT COUNT(*)\n"
+        + "FROM lookup.lookyloo l1\n"
+        + "INNER JOIN lookup.lookyloo l2 ON l1.k = l2.k\n"
+        + "INNER JOIN foo on l2.k = foo.m1",
+        ImmutableList.of(
+            Druids.newTimeseriesQueryBuilder()
+                  .dataSource(
+                      join(
+                          join(
+                              new LookupDataSource("lookyloo"),
+                              new LookupDataSource("lookyloo"),
+                              "j0.",
+                              equalsCondition(DruidExpression.fromColumn("k"), DruidExpression.fromColumn("j0.k")),
+                              JoinType.INNER
+                          ),
+                          new QueryDataSource(
+                              newScanQueryBuilder()
+                                  .dataSource(CalciteTests.DATASOURCE1)
+                                  .intervals(querySegmentSpec(Filtration.eternity()))
+                                  .columns("m1")
+                                  .context(QUERY_CONTEXT_DEFAULT)
+                                  .build()
+                          ),
+                          "_j0.",
+                          equalsCondition(
+                              DruidExpression.fromExpression("CAST(\"j0.k\", 'DOUBLE')"),
+                              DruidExpression.fromColumn("_j0.m1")
+                          ),
+                          JoinType.INNER
+                      )
+                  )
+                  .intervals(querySegmentSpec(Filtration.eternity()))
+                  .granularity(Granularities.ALL)
+                  .aggregators(new CountAggregatorFactory("a0"))
+                  .context(TIMESERIES_CONTEXT_DEFAULT)
+                  .build()
+        ),
+        ImmutableList.of(
+            new Object[]{1L}
+        )
+    );
+  }
+
+  @Test
+  public void testInnerJoinLookupTableTable() throws Exception
+  {
+    // Regression test for https://github.com/apache/druid/issues/9646.
+
+    // Cannot vectorize JOIN operator.
+    cannotVectorize();
+
+    testQuery(
+        "SELECT l.k, l.v, SUM(f.m1), SUM(nf.m1)\n"
+        + "FROM lookup.lookyloo l\n"
+        + "INNER JOIN druid.foo f on f.dim1 = l.k\n"
+        + "INNER JOIN druid.numfoo nf on nf.dim1 = l.k\n"
+        + "GROUP BY 1, 2 ORDER BY 2",
+        ImmutableList.of(
+            GroupByQuery.builder()
+                        .setDataSource(
+                            join(
+                                join(
+                                    new LookupDataSource("lookyloo"),
+                                    new QueryDataSource(
+                                        newScanQueryBuilder()
+                                            .dataSource(CalciteTests.DATASOURCE1)
+                                            .intervals(querySegmentSpec(Filtration.eternity()))
+                                            .columns("dim1", "m1")
+                                            .context(QUERY_CONTEXT_DEFAULT)
+                                            .build()
+                                    ),
+                                    "j0.",
+                                    equalsCondition(
+                                        DruidExpression.fromColumn("k"),
+                                        DruidExpression.fromColumn("j0.dim1")
+                                    ),
+                                    JoinType.INNER
+                                ),
+                                new QueryDataSource(
+                                    newScanQueryBuilder()
+                                        .dataSource(CalciteTests.DATASOURCE3)
+                                        .intervals(querySegmentSpec(Filtration.eternity()))
+                                        .columns("dim1", "m1")
+                                        .context(QUERY_CONTEXT_DEFAULT)
+                                        .build()
+                                ),
+                                "_j0.",
+                                equalsCondition(
+                                    DruidExpression.fromColumn("k"),
+                                    DruidExpression.fromColumn("_j0.dim1")
+                                ),
+                                JoinType.INNER
+                            )
+                        )
+                        .setInterval(querySegmentSpec(Filtration.eternity()))
+                        .setGranularity(Granularities.ALL)
+                        .setDimensions(
+                            dimensions(
+                                new DefaultDimensionSpec("k", "d0"),
+                                new DefaultDimensionSpec("v", "d1")
+                            )
+                        )
+                        .setAggregatorSpecs(
+                            aggregators(
+                                new DoubleSumAggregatorFactory("a0", "j0.m1"),
+                                new DoubleSumAggregatorFactory("a1", "_j0.m1")
+                            )
+                        )
+                        .setLimitSpec(
+                            new DefaultLimitSpec(
+                                ImmutableList.of(new OrderByColumnSpec("d1", Direction.ASCENDING)),
+                                null
+                            )
+                        )
+                        .setContext(QUERY_CONTEXT_DEFAULT)
+                        .build()
+        ),
+        ImmutableList.of(
+            new Object[]{"abc", "xabc", 6d, 6d}
+        )
+    );
+  }
+
+  @Test
+  public void testInnerJoinLookupTableTableChained() throws Exception
+  {
+    // Cannot vectorize JOIN operator.
+    cannotVectorize();
+
+    testQuery(
+        "SELECT l.k, l.v, SUM(f.m1), SUM(nf.m1)\n"
+        + "FROM lookup.lookyloo l\n"
+        + "INNER JOIN druid.foo f on f.dim1 = l.k\n"
+        + "INNER JOIN druid.numfoo nf on nf.dim1 = f.dim1\n"
+        + "GROUP BY 1, 2 ORDER BY 2",
+        ImmutableList.of(
+            GroupByQuery.builder()
+                        .setDataSource(
+                            join(
+                                join(
+                                    new LookupDataSource("lookyloo"),
+                                    new QueryDataSource(
+                                        newScanQueryBuilder()
+                                            .dataSource(CalciteTests.DATASOURCE1)
+                                            .intervals(querySegmentSpec(Filtration.eternity()))
+                                            .columns("dim1", "m1")
+                                            .context(QUERY_CONTEXT_DEFAULT)
+                                            .build()
+                                    ),
+                                    "j0.",
+                                    equalsCondition(
+                                        DruidExpression.fromColumn("k"),
+                                        DruidExpression.fromColumn("j0.dim1")
+                                    ),
+                                    JoinType.INNER
+                                ),
+                                new QueryDataSource(
+                                    newScanQueryBuilder()
+                                        .dataSource(CalciteTests.DATASOURCE3)
+                                        .intervals(querySegmentSpec(Filtration.eternity()))
+                                        .columns("dim1", "m1")
+                                        .context(QUERY_CONTEXT_DEFAULT)
+                                        .build()
+                                ),
+                                "_j0.",
+                                equalsCondition(
+                                    DruidExpression.fromColumn("j0.dim1"),
+                                    DruidExpression.fromColumn("_j0.dim1")
+                                ),
+                                JoinType.INNER
+                            )
+                        )
+                        .setInterval(querySegmentSpec(Filtration.eternity()))
+                        .setGranularity(Granularities.ALL)
+                        .setDimensions(
+                            dimensions(
+                                new DefaultDimensionSpec("k", "d0"),
+                                new DefaultDimensionSpec("v", "d1")
+                            )
+                        )
+                        .setAggregatorSpecs(
+                            aggregators(
+                                new DoubleSumAggregatorFactory("a0", "j0.m1"),
+                                new DoubleSumAggregatorFactory("a1", "_j0.m1")
+                            )
+                        )
+                        .setLimitSpec(
+                            new DefaultLimitSpec(
+                                ImmutableList.of(new OrderByColumnSpec("d1", Direction.ASCENDING)),
+                                null
+                            )
+                        )
+                        .setContext(QUERY_CONTEXT_DEFAULT)
+                        .build()
+        ),
+        ImmutableList.of(
+            new Object[]{"abc", "xabc", 6d, 6d}
+        )
+    );
+  }
+
+  @Test
+  public void testWhereInSelectNullFromLookup() throws Exception
+  {
+    // Regression test for https://github.com/apache/druid/issues/9646.
+
+    testQuery(
+        "SELECT * FROM foo where dim1 IN (SELECT NULL FROM lookup.lookyloo)",
+        ImmutableList.of(),
+        ImmutableList.of()
+    );
+  }
+
+  @Test
+  public void testCommaJoinLeftFunction() throws Exception
+  {
+    testQuery(
+        "SELECT foo.dim1, foo.dim2, l.k, l.v\n"
+        + "FROM foo, lookup.lookyloo l\n"
+        + "WHERE SUBSTRING(foo.dim2, 1, 1) = l.k\n",
+        ImmutableList.of(
+            newScanQueryBuilder()
+                .dataSource(
+                    join(
+                        new TableDataSource(CalciteTests.DATASOURCE1),
+                        new LookupDataSource("lookyloo"),
+                        "j0.",
+                        equalsCondition(
+                            DruidExpression.fromExpression("substring(\"dim2\", 0, 1)"),
+                            DruidExpression.fromColumn("j0.k")
+                        ),
+                        JoinType.INNER
+                    )
+                )
+                .intervals(querySegmentSpec(Filtration.eternity()))
+                .columns("dim1", "dim2", "j0.k", "j0.v")
+                .context(QUERY_CONTEXT_DEFAULT)
+                .build()
+        ),
+        ImmutableList.of(
+            new Object[]{"", "a", "a", "xa"},
+            new Object[]{"1", "a", "a", "xa"},
+            new Object[]{"def", "abc", "a", "xa"}
+        )
+    );
+  }
+
+  @Test
+  public void testCommaJoinTableLookupTableMismatchedTypes() throws Exception
+  {
+    // Regression test for https://github.com/apache/druid/issues/9646.
+
+    // Cannot vectorize JOIN operator.
+    cannotVectorize();
+
+    testQuery(
+        "SELECT COUNT(*)\n"
+        + "FROM foo, lookup.lookyloo l, numfoo\n"
+        + "WHERE foo.cnt = l.k AND l.k = numfoo.cnt\n",
+        ImmutableList.of(
+            Druids.newTimeseriesQueryBuilder()
+                  .dataSource(
+                      join(
+                          join(
+                              new TableDataSource(CalciteTests.DATASOURCE1),
+                              new QueryDataSource(
+                                  newScanQueryBuilder()
+                                      .dataSource(new LookupDataSource("lookyloo"))
+                                      .intervals(querySegmentSpec(Filtration.eternity()))
+                                      .virtualColumns(
+                                          expressionVirtualColumn("v0", "CAST(\"k\", 'LONG')", ValueType.LONG)
+                                      )
+                                      .resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST)
+                                      .columns("k", "v0")
+                                      .context(QUERY_CONTEXT_DEFAULT)
+                                      .build()
+                              ),
+                              "j0.",
+                              equalsCondition(
+                                  DruidExpression.fromColumn("cnt"),
+                                  DruidExpression.fromColumn("j0.v0")
+                              ),
+                              JoinType.INNER
+                          ),
+                          new QueryDataSource(
+                              newScanQueryBuilder()
+                                  .dataSource(CalciteTests.DATASOURCE3)
+                                  .intervals(querySegmentSpec(Filtration.eternity()))
+                                  .resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST)
+                                  .columns("cnt")
+                                  .context(QUERY_CONTEXT_DEFAULT)
+                                  .build()
+                          ),
+                          "_j0.",
+                          equalsCondition(
+                              DruidExpression.fromExpression("CAST(\"j0.k\", 'LONG')"),
+                              DruidExpression.fromColumn("_j0.cnt")
+                          ),
+                          JoinType.INNER
+                      )
+                  )
+                  .intervals(querySegmentSpec(Filtration.eternity()))
+                  .granularity(Granularities.ALL)
+                  .aggregators(new CountAggregatorFactory("a0"))
+                  .context(TIMESERIES_CONTEXT_DEFAULT)
+                  .build()
+        ),
+        ImmutableList.of()
+    );
+  }
+
   @Test
   public void testInnerJoinCastLeft() throws Exception
   {