SQL: Allow Scans to be used as outer queries. (#11831)

* SQL: Allow Scans to be used as outer queries.

This has been possible in the native query system for a while, but the capability
hasn't yet propagated into the SQL layer. One example of where this is useful is
a query like:

  SELECT * FROM (... LIMIT X) WHERE <filter>

Because this expands the kinds of subquery structures the SQL layer will consider,
it was also necessary to improve the cost calculations. These changes appear in
PartialDruidQuery and DruidOuterQueryRel. The ideas are:

- Attach per-column penalties to the output signature of each query, instead of to
  the initial projection that starts a query. This encourages moving projections
  into subqueries instead of leaving them on outer queries.
- Only attach penalties to projections if there are actually expressions happening.
  So, now, projections that simply reorder or remove fields are free.
- Attach a constant penalty to every outer query. This discourages creating them
  when they are not needed.

The changes are generally beneficial to the test cases we have in CalciteQueryTest.
Most plans are unchanged, or are changed in purely cosmetic ways. Two have changed
for the better:

- testUsingSubqueryWithLimit now returns a constant from the subquery, instead of
  returning every column.
- testJoinOuterGroupByAndSubqueryHasLimit returns a minimal set of columns from
  the innermost subquery; two unnecessary columns are no longer there.

* Fix various DS operator conversions.

These were all implemented as direct conversions, which isn't appropriate
because they do not actually map onto native functions. These are only
usable as post-aggregations.

* Test case adjustment.
This commit is contained in:
Gian Merlino 2021-10-23 17:18:43 -07:00 committed by GitHub
parent 98ecbb21cd
commit d4cace385f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
19 changed files with 197 additions and 176 deletions

View File

@ -32,16 +32,16 @@ import org.apache.druid.query.aggregation.PostAggregator;
import org.apache.druid.query.aggregation.datasketches.hll.HllSketchAggregatorFactory;
import org.apache.druid.query.aggregation.datasketches.hll.HllSketchToEstimatePostAggregator;
import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.sql.calcite.expression.DirectOperatorConversion;
import org.apache.druid.sql.calcite.expression.DruidExpression;
import org.apache.druid.sql.calcite.expression.OperatorConversions;
import org.apache.druid.sql.calcite.expression.PostAggregatorVisitor;
import org.apache.druid.sql.calcite.expression.SqlOperatorConversion;
import org.apache.druid.sql.calcite.planner.PlannerContext;
import javax.annotation.Nullable;
import java.util.List;
public class HllSketchEstimateOperatorConversion extends DirectOperatorConversion
public class HllSketchEstimateOperatorConversion implements SqlOperatorConversion
{
private static final String FUNCTION_NAME = "HLL_SKETCH_ESTIMATE";
private static final SqlFunction SQL_FUNCTION = OperatorConversions
@ -51,11 +51,6 @@ public class HllSketchEstimateOperatorConversion extends DirectOperatorConversio
.returnTypeInference(ReturnTypes.DOUBLE)
.build();
public HllSketchEstimateOperatorConversion()
{
super(SQL_FUNCTION, FUNCTION_NAME);
}
@Override
public SqlOperator calciteOperator()
{

View File

@ -31,16 +31,16 @@ import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.query.aggregation.PostAggregator;
import org.apache.druid.query.aggregation.datasketches.hll.HllSketchToEstimateWithBoundsPostAggregator;
import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.sql.calcite.expression.DirectOperatorConversion;
import org.apache.druid.sql.calcite.expression.DruidExpression;
import org.apache.druid.sql.calcite.expression.OperatorConversions;
import org.apache.druid.sql.calcite.expression.PostAggregatorVisitor;
import org.apache.druid.sql.calcite.expression.SqlOperatorConversion;
import org.apache.druid.sql.calcite.planner.PlannerContext;
import javax.annotation.Nullable;
import java.util.List;
public class HllSketchEstimateWithErrorBoundsOperatorConversion extends DirectOperatorConversion
public class HllSketchEstimateWithErrorBoundsOperatorConversion implements SqlOperatorConversion
{
private static final String FUNCTION_NAME = "HLL_SKETCH_ESTIMATE_WITH_ERROR_BOUNDS";
private static final SqlFunction SQL_FUNCTION = OperatorConversions
@ -50,12 +50,6 @@ public class HllSketchEstimateWithErrorBoundsOperatorConversion extends DirectOp
.returnTypeNonNull(SqlTypeName.OTHER)
.build();
public HllSketchEstimateWithErrorBoundsOperatorConversion()
{
super(SQL_FUNCTION, FUNCTION_NAME);
}
@Override
public SqlOperator calciteOperator()
{

View File

@ -29,16 +29,16 @@ import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.query.aggregation.PostAggregator;
import org.apache.druid.query.aggregation.datasketches.hll.HllSketchToStringPostAggregator;
import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.sql.calcite.expression.DirectOperatorConversion;
import org.apache.druid.sql.calcite.expression.DruidExpression;
import org.apache.druid.sql.calcite.expression.OperatorConversions;
import org.apache.druid.sql.calcite.expression.PostAggregatorVisitor;
import org.apache.druid.sql.calcite.expression.SqlOperatorConversion;
import org.apache.druid.sql.calcite.planner.PlannerContext;
import javax.annotation.Nullable;
import java.util.List;
public class HllSketchToStringOperatorConversion extends DirectOperatorConversion
public class HllSketchToStringOperatorConversion implements SqlOperatorConversion
{
private static final String FUNCTION_NAME = "HLL_SKETCH_TO_STRING";
private static final SqlFunction SQL_FUNCTION = OperatorConversions
@ -47,11 +47,6 @@ public class HllSketchToStringOperatorConversion extends DirectOperatorConversio
.returnTypeNonNull(SqlTypeName.VARCHAR)
.build();
public HllSketchToStringOperatorConversion()
{
super(SQL_FUNCTION, FUNCTION_NAME);
}
@Override
public SqlOperator calciteOperator()
{

View File

@ -40,7 +40,7 @@ public class DoublesSketchQuantileOperatorConversion extends DoublesSketchSingle
public DoublesSketchQuantileOperatorConversion()
{
super(SQL_FUNCTION, FUNCTION_NAME);
super(SQL_FUNCTION);
}
@Override

View File

@ -39,7 +39,7 @@ public class DoublesSketchRankOperatorConversion extends DoublesSketchSingleArgB
public DoublesSketchRankOperatorConversion()
{
super(SQL_FUNCTION, FUNCTION_NAME);
super(SQL_FUNCTION);
}
@Override

View File

@ -26,23 +26,28 @@ import org.apache.calcite.sql.SqlKind;
import org.apache.calcite.sql.SqlOperator;
import org.apache.druid.query.aggregation.PostAggregator;
import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.sql.calcite.expression.DirectOperatorConversion;
import org.apache.druid.sql.calcite.expression.DruidExpression;
import org.apache.druid.sql.calcite.expression.OperatorConversions;
import org.apache.druid.sql.calcite.expression.PostAggregatorVisitor;
import org.apache.druid.sql.calcite.expression.SqlOperatorConversion;
import org.apache.druid.sql.calcite.planner.PlannerContext;
import javax.annotation.Nullable;
import java.util.List;
public abstract class DoublesSketchSingleArgBaseOperatorConversion extends DirectOperatorConversion
public abstract class DoublesSketchSingleArgBaseOperatorConversion implements SqlOperatorConversion
{
protected DoublesSketchSingleArgBaseOperatorConversion(
SqlOperator operator,
String druidFunctionName
)
private final SqlOperator operator;
protected DoublesSketchSingleArgBaseOperatorConversion(SqlOperator operator)
{
super(operator, druidFunctionName);
this.operator = operator;
}
@Override
public SqlOperator calciteOperator()
{
return operator;
}
@Override

View File

@ -29,16 +29,16 @@ import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.query.aggregation.PostAggregator;
import org.apache.druid.query.aggregation.datasketches.quantiles.DoublesSketchToStringPostAggregator;
import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.sql.calcite.expression.DirectOperatorConversion;
import org.apache.druid.sql.calcite.expression.DruidExpression;
import org.apache.druid.sql.calcite.expression.OperatorConversions;
import org.apache.druid.sql.calcite.expression.PostAggregatorVisitor;
import org.apache.druid.sql.calcite.expression.SqlOperatorConversion;
import org.apache.druid.sql.calcite.planner.PlannerContext;
import javax.annotation.Nullable;
import java.util.List;
public class DoublesSketchSummaryOperatorConversion extends DirectOperatorConversion
public class DoublesSketchSummaryOperatorConversion implements SqlOperatorConversion
{
private static final String FUNCTION_NAME = "DS_QUANTILE_SUMMARY";
private static final SqlFunction SQL_FUNCTION = OperatorConversions
@ -47,11 +47,6 @@ public class DoublesSketchSummaryOperatorConversion extends DirectOperatorConver
.returnTypeNonNull(SqlTypeName.VARCHAR)
.build();
public DoublesSketchSummaryOperatorConversion()
{
super(SQL_FUNCTION, FUNCTION_NAME);
}
@Override
public SqlOperator calciteOperator()
{

View File

@ -29,16 +29,16 @@ import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.query.aggregation.PostAggregator;
import org.apache.druid.query.aggregation.datasketches.theta.SketchEstimatePostAggregator;
import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.sql.calcite.expression.DirectOperatorConversion;
import org.apache.druid.sql.calcite.expression.DruidExpression;
import org.apache.druid.sql.calcite.expression.OperatorConversions;
import org.apache.druid.sql.calcite.expression.PostAggregatorVisitor;
import org.apache.druid.sql.calcite.expression.SqlOperatorConversion;
import org.apache.druid.sql.calcite.planner.PlannerContext;
import javax.annotation.Nullable;
import java.util.List;
public class ThetaSketchEstimateOperatorConversion extends DirectOperatorConversion
public class ThetaSketchEstimateOperatorConversion implements SqlOperatorConversion
{
private static final String FUNCTION_NAME = "THETA_SKETCH_ESTIMATE";
private static final SqlFunction SQL_FUNCTION = OperatorConversions
@ -47,11 +47,6 @@ public class ThetaSketchEstimateOperatorConversion extends DirectOperatorConvers
.returnTypeInference(ReturnTypes.DOUBLE)
.build();
public ThetaSketchEstimateOperatorConversion()
{
super(SQL_FUNCTION, FUNCTION_NAME);
}
@Override
public SqlOperator calciteOperator()
{

View File

@ -31,16 +31,16 @@ import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.query.aggregation.PostAggregator;
import org.apache.druid.query.aggregation.datasketches.theta.SketchEstimatePostAggregator;
import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.sql.calcite.expression.DirectOperatorConversion;
import org.apache.druid.sql.calcite.expression.DruidExpression;
import org.apache.druid.sql.calcite.expression.OperatorConversions;
import org.apache.druid.sql.calcite.expression.PostAggregatorVisitor;
import org.apache.druid.sql.calcite.expression.SqlOperatorConversion;
import org.apache.druid.sql.calcite.planner.PlannerContext;
import javax.annotation.Nullable;
import java.util.List;
public class ThetaSketchEstimateWithErrorBoundsOperatorConversion extends DirectOperatorConversion
public class ThetaSketchEstimateWithErrorBoundsOperatorConversion implements SqlOperatorConversion
{
private static final String FUNCTION_NAME = "THETA_SKETCH_ESTIMATE_WITH_ERROR_BOUNDS";
private static final SqlFunction SQL_FUNCTION = OperatorConversions
@ -49,12 +49,6 @@ public class ThetaSketchEstimateWithErrorBoundsOperatorConversion extends Direct
.returnTypeNonNull(SqlTypeName.OTHER)
.build();
public ThetaSketchEstimateWithErrorBoundsOperatorConversion()
{
super(SQL_FUNCTION, FUNCTION_NAME);
}
@Override
public SqlOperator calciteOperator()
{

View File

@ -26,6 +26,9 @@ import org.apache.druid.sql.calcite.planner.PlannerContext;
import javax.annotation.Nullable;
/**
* Conversion for SQL operators that map 1-1 onto native functions.
*/
public class DirectOperatorConversion implements SqlOperatorConversion
{
private final SqlOperator operator;

View File

@ -35,9 +35,9 @@ public class CostEstimates
static final double COST_BASE = 1;
/**
* Cost to read a value out of a column directly.
* Cost to include a column in query output.
*/
static final double COST_COLUMN_READ = 0.05;
static final double COST_OUTPUT_COLUMN = 0.05;
/**
* Cost to compute and read an expression.
@ -77,14 +77,14 @@ public class CostEstimates
static final double MULTIPLIER_OUTER_QUERY = .1;
/**
* Cost to add to a join when either side is a subquery. Strongly encourages avoiding subqueries, since they must be
* inlined and then the join must run on the Broker.
* Cost to add to a subquery. Strongly encourages avoiding subqueries, since they must be inlined and then the join
* must run on the Broker.
*/
static final double COST_JOIN_SUBQUERY = 1e5;
static final double COST_SUBQUERY = 1e5;
/**
* Cost to perform a cross join. Strongly encourages pushing down filters into join conditions, even if it means
* we need to add a subquery (this is higher than {@link #COST_JOIN_SUBQUERY}).
* we need to add a subquery (this is higher than {@link #COST_SUBQUERY}).
*/
static final double COST_JOIN_CROSS = 1e8;

View File

@ -316,7 +316,7 @@ public class DruidJoinQueryRel extends DruidRel<DruidJoinQueryRel>
double cost;
if (computeLeftRequiresSubquery(getSomeDruidChild(left))) {
cost = CostEstimates.COST_JOIN_SUBQUERY;
cost = CostEstimates.COST_SUBQUERY;
} else {
cost = partialQuery.estimateCost();
if (joinRel.getJoinType() == JoinRelType.INNER && plannerConfig.isComputeInnerJoinCostAsFilter()) {
@ -325,7 +325,7 @@ public class DruidJoinQueryRel extends DruidRel<DruidJoinQueryRel>
}
if (computeRightRequiresSubquery(getSomeDruidChild(right))) {
cost += CostEstimates.COST_JOIN_SUBQUERY;
cost += CostEstimates.COST_SUBQUERY;
}
if (joinRel.getCondition().isA(SqlKind.LITERAL) && !joinRel.getCondition().isAlwaysFalse()) {

View File

@ -72,7 +72,7 @@ public class DruidOuterQueryRel extends DruidRel<DruidOuterQueryRel>
{
return new DruidOuterQueryRel(
sourceRel.getCluster(),
sourceRel.getTraitSet(),
sourceRel.getTraitSet().plusAll(partialQuery.getRelTraits()),
sourceRel,
partialQuery,
sourceRel.getQueryMaker()
@ -217,6 +217,7 @@ public class DruidOuterQueryRel extends DruidRel<DruidOuterQueryRel>
{
return planner.getCostFactory()
.makeCost(partialQuery.estimateCost(), 0, 0)
.multiplyBy(CostEstimates.MULTIPLIER_OUTER_QUERY);
.multiplyBy(CostEstimates.MULTIPLIER_OUTER_QUERY)
.plus(planner.getCostFactory().makeCost(CostEstimates.COST_SUBQUERY, 0, 0));
}
}

View File

@ -735,16 +735,15 @@ public class DruidQuery
private Query computeQuery()
{
if (dataSource instanceof QueryDataSource) {
// If there is a subquery then the outer query must be a groupBy.
// If there is a subquery, then we prefer the outer query to be a groupBy if possible, since this potentially
// enables more efficient execution. (The groupBy query toolchest can handle some subqueries by itself, without
// requiring the Broker to inline results.)
final GroupByQuery outerQuery = toGroupByQuery();
if (outerQuery == null) {
// Bug in the planner rules. They shouldn't allow this to happen.
throw new IllegalStateException("Can't use QueryDataSource without an outer groupBy query!");
}
if (outerQuery != null) {
return outerQuery;
}
}
final TimeseriesQuery tsQuery = toTimeseriesQuery();
if (tsQuery != null) {

View File

@ -404,11 +404,10 @@ public class PartialDruidQuery
{
double cost = CostEstimates.COST_BASE;
// Account for the cost of post-scan expressions.
if (getSelectProject() != null) {
for (final RexNode rexNode : getSelectProject().getChildExps()) {
if (rexNode.isA(SqlKind.INPUT_REF)) {
cost += CostEstimates.COST_COLUMN_READ;
} else {
if (!rexNode.isA(SqlKind.INPUT_REF)) {
cost += CostEstimates.COST_EXPRESSION;
}
}
@ -421,12 +420,6 @@ public class PartialDruidQuery
}
if (getAggregate() != null) {
if (getSelectProject() == null) {
// No projection before aggregation, that means the aggregate operator is reading things directly.
// Account for the costs.
cost += CostEstimates.COST_COLUMN_READ * getAggregate().getGroupSet().size();
}
cost += CostEstimates.COST_DIMENSION * getAggregate().getGroupSet().size();
cost += CostEstimates.COST_AGGREGATION * getAggregate().getAggCallList().size();
}
@ -441,13 +434,26 @@ public class PartialDruidQuery
}
}
// Account for the cost of post-aggregation expressions.
if (getAggregateProject() != null) {
cost += CostEstimates.COST_EXPRESSION * getAggregateProject().getChildExps().size();
for (final RexNode rexNode : getAggregateProject().getChildExps()) {
if (!rexNode.isA(SqlKind.INPUT_REF)) {
cost += CostEstimates.COST_EXPRESSION;
}
}
}
// Account for the cost of post-sort expressions.
if (getSortProject() != null) {
cost += CostEstimates.COST_EXPRESSION * getSortProject().getChildExps().size();
for (final RexNode rexNode : getSortProject().getChildExps()) {
if (!rexNode.isA(SqlKind.INPUT_REF)) {
cost += CostEstimates.COST_EXPRESSION;
}
}
}
// Account for the cost of generating outputs.
cost += CostEstimates.COST_OUTPUT_COLUMN * getRowType().getFieldCount();
return cost;
}

View File

@ -112,7 +112,11 @@ public class DruidJoinRule extends RelOptRule
// Already verified to be present in "matches", so just call "get".
// Can't be final, because we're going to reassign it up to a couple of times.
ConditionAnalysis conditionAnalysis = analyzeCondition(join.getCondition(), join.getLeft().getRowType(), right).get();
ConditionAnalysis conditionAnalysis = analyzeCondition(
join.getCondition(),
join.getLeft().getRowType(),
right
).get();
final boolean isLeftDirectAccessPossible = enableLeftScanDirect && (left instanceof DruidQueryRel);
if (left.getPartialDruidQuery().stage() == PartialDruidQuery.Stage.SELECT_PROJECT
@ -147,10 +151,14 @@ public class DruidJoinRule extends RelOptRule
final Project rightProject = right.getPartialDruidQuery().getSelectProject();
// Right-side projection expressions rewritten to be on top of the join.
Iterables.addAll(
newProjectExprs,
RexUtil.shift(rightProject.getProjects(), newLeft.getRowType().getFieldCount())
);
for (final RexNode rexNode : RexUtil.shift(rightProject.getProjects(), newLeft.getRowType().getFieldCount())) {
if (join.getJoinType().generatesNullsOnRight()) {
newProjectExprs.add(makeNullableIfLiteral(rexNode, rexBuilder));
} else {
newProjectExprs.add(rexNode);
}
}
newRight = right.withPartialQuery(PartialDruidQuery.create(rightScan));
conditionAnalysis = conditionAnalysis.pushThroughRightProject(rightProject);
} else {
@ -195,6 +203,19 @@ public class DruidJoinRule extends RelOptRule
call.transformTo(relBuilder.build());
}
private static RexNode makeNullableIfLiteral(final RexNode rexNode, final RexBuilder rexBuilder)
{
if (rexNode.isA(SqlKind.LITERAL)) {
return rexBuilder.makeLiteral(
RexLiteral.value(rexNode),
rexBuilder.getTypeFactory().createTypeWithNullability(rexNode.getType(), true),
false
);
} else {
return rexNode;
}
}
/**
* Returns whether {@link #analyzeCondition} would return something.
*/
@ -208,7 +229,11 @@ public class DruidJoinRule extends RelOptRule
* If this condition is an AND of some combination of (1) literals; (2) equality conditions of the form
* {@code f(LeftRel) = RightColumn}, then return a {@link ConditionAnalysis}.
*/
private static Optional<ConditionAnalysis> analyzeCondition(final RexNode condition, final RelDataType leftRowType, DruidRel<?> right)
private static Optional<ConditionAnalysis> analyzeCondition(
final RexNode condition,
final RelDataType leftRowType,
DruidRel<?> right
)
{
final List<RexNode> subConditions = decomposeAnd(condition);
final List<Pair<RexNode, RexInputRef>> equalitySubConditions = new ArrayList<>();

View File

@ -19,7 +19,6 @@
package org.apache.druid.sql.calcite.rule;
import com.google.common.base.Predicate;
import com.google.common.collect.ImmutableList;
import org.apache.calcite.plan.RelOptRule;
import org.apache.calcite.plan.RelOptRuleCall;
@ -38,9 +37,11 @@ import org.apache.druid.sql.calcite.rel.PartialDruidQuery;
import java.util.List;
import java.util.function.BiFunction;
import java.util.function.Predicate;
public class DruidRules
{
@SuppressWarnings("rawtypes")
public static final Predicate<DruidRel> CAN_BUILD_ON = druidRel -> druidRel.getPartialDruidQuery() != null;
private DruidRules()
@ -88,10 +89,9 @@ public class DruidRules
PartialDruidQuery::withSortProject
),
DruidOuterQueryRule.AGGREGATE,
DruidOuterQueryRule.FILTER_AGGREGATE,
DruidOuterQueryRule.FILTER_PROJECT_AGGREGATE,
DruidOuterQueryRule.PROJECT_AGGREGATE,
DruidOuterQueryRule.AGGREGATE_SORT_PROJECT,
DruidOuterQueryRule.WHERE_FILTER,
DruidOuterQueryRule.SELECT_PROJECT,
DruidOuterQueryRule.SORT,
DruidUnionRule.instance(),
DruidUnionDataSourceRule.instance(),
DruidSortUnionRule.instance(),
@ -111,7 +111,7 @@ public class DruidRules
)
{
super(
operand(relClass, operand(DruidRel.class, null, CAN_BUILD_ON, any())),
operand(relClass, operandJ(DruidRel.class, null, CAN_BUILD_ON, any())),
StringUtils.format("%s(%s)", DruidQueryRule.class.getSimpleName(), stage)
);
this.stage = stage;
@ -143,7 +143,7 @@ public class DruidRules
public abstract static class DruidOuterQueryRule extends RelOptRule
{
public static final RelOptRule AGGREGATE = new DruidOuterQueryRule(
operand(Aggregate.class, operand(DruidRel.class, null, CAN_BUILD_ON, any())),
operand(Aggregate.class, operandJ(DruidRel.class, null, CAN_BUILD_ON, any())),
"AGGREGATE"
)
{
@ -164,23 +164,21 @@ public class DruidRules
}
};
public static final RelOptRule FILTER_AGGREGATE = new DruidOuterQueryRule(
operand(Aggregate.class, operand(Filter.class, operand(DruidRel.class, null, CAN_BUILD_ON, any()))),
"FILTER_AGGREGATE"
public static final RelOptRule WHERE_FILTER = new DruidOuterQueryRule(
operand(Filter.class, operandJ(DruidRel.class, null, CAN_BUILD_ON, any())),
"WHERE_FILTER"
)
{
@Override
public void onMatch(final RelOptRuleCall call)
{
final Aggregate aggregate = call.rel(0);
final Filter filter = call.rel(1);
final DruidRel druidRel = call.rel(2);
final Filter filter = call.rel(0);
final DruidRel druidRel = call.rel(1);
final DruidOuterQueryRel outerQueryRel = DruidOuterQueryRel.create(
druidRel,
PartialDruidQuery.create(druidRel.getPartialDruidQuery().leafRel())
.withWhereFilter(filter)
.withAggregate(aggregate)
);
if (outerQueryRel.isValidDruidQuery()) {
call.transformTo(outerQueryRel);
@ -188,28 +186,21 @@ public class DruidRules
}
};
public static final RelOptRule FILTER_PROJECT_AGGREGATE = new DruidOuterQueryRule(
operand(
Aggregate.class,
operand(Project.class, operand(Filter.class, operand(DruidRel.class, null, CAN_BUILD_ON, any())))
),
"FILTER_PROJECT_AGGREGATE"
public static final RelOptRule SELECT_PROJECT = new DruidOuterQueryRule(
operand(Project.class, operandJ(DruidRel.class, null, CAN_BUILD_ON, any())),
"SELECT_PROJECT"
)
{
@Override
public void onMatch(final RelOptRuleCall call)
{
final Aggregate aggregate = call.rel(0);
final Project project = call.rel(1);
final Filter filter = call.rel(2);
final DruidRel druidRel = call.rel(3);
final Project filter = call.rel(0);
final DruidRel druidRel = call.rel(1);
final DruidOuterQueryRel outerQueryRel = DruidOuterQueryRel.create(
druidRel,
PartialDruidQuery.create(druidRel.getPartialDruidQuery().leafRel())
.withWhereFilter(filter)
.withSelectProject(project)
.withAggregate(aggregate)
.withSelectProject(filter)
);
if (outerQueryRel.isValidDruidQuery()) {
call.transformTo(outerQueryRel);
@ -217,52 +208,21 @@ public class DruidRules
}
};
public static final RelOptRule PROJECT_AGGREGATE = new DruidOuterQueryRule(
operand(Aggregate.class, operand(Project.class, operand(DruidRel.class, null, CAN_BUILD_ON, any()))),
"PROJECT_AGGREGATE"
public static final RelOptRule SORT = new DruidOuterQueryRule(
operand(Sort.class, operandJ(DruidRel.class, null, CAN_BUILD_ON, any())),
"SORT"
)
{
@Override
public void onMatch(final RelOptRuleCall call)
{
final Aggregate aggregate = call.rel(0);
final Project project = call.rel(1);
final DruidRel druidRel = call.rel(2);
final Sort sort = call.rel(0);
final DruidRel druidRel = call.rel(1);
final DruidOuterQueryRel outerQueryRel = DruidOuterQueryRel.create(
druidRel,
PartialDruidQuery.create(druidRel.getPartialDruidQuery().leafRel())
.withSelectProject(project)
.withAggregate(aggregate)
);
if (outerQueryRel.isValidDruidQuery()) {
call.transformTo(outerQueryRel);
}
}
};
public static final RelOptRule AGGREGATE_SORT_PROJECT = new DruidOuterQueryRule(
operand(
Project.class,
operand(Sort.class, operand(Aggregate.class, operand(DruidRel.class, null, CAN_BUILD_ON, any())))
),
"AGGREGATE_SORT_PROJECT"
)
{
@Override
public void onMatch(RelOptRuleCall call)
{
final Project sortProject = call.rel(0);
final Sort sort = call.rel(1);
final Aggregate aggregate = call.rel(2);
final DruidRel druidRel = call.rel(3);
final DruidOuterQueryRel outerQueryRel = DruidOuterQueryRel.create(
druidRel,
PartialDruidQuery.create(druidRel.getPartialDruidQuery().leafRel())
.withAggregate(aggregate)
.withSort(sort)
.withSortProject(sortProject)
);
if (outerQueryRel.isValidDruidQuery()) {
call.transformTo(outerQueryRel);

View File

@ -131,7 +131,12 @@ public class CalciteCorrelatedQueryTest extends BaseCalciteQueryTest
.setDimensions(new DefaultDimensionSpec("d1", "_d0"))
.setAggregatorSpecs(
new LongSumAggregatorFactory("_a0:sum", "a0"),
new CountAggregatorFactory("_a0:count")
useDefault
? new CountAggregatorFactory("_a0:count")
: new FilteredAggregatorFactory(
new CountAggregatorFactory("_a0:count"),
not(selector("a0", null, null))
)
)
.setPostAggregatorSpecs(Collections.singletonList(new ArithmeticPostAggregator(
"_a0",

View File

@ -437,7 +437,7 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
)
.intervals(querySegmentSpec(Filtration.eternity()))
.limit(10)
.columns("dim2", "j0.m1", "m1", "m2")
.columns("dim2", "m2")
.context(QUERY_CONTEXT_DEFAULT)
.build()
)
@ -2161,6 +2161,42 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
);
}
@Test
public void testOrderThenLimitThenFilter() throws Exception
{
testQuery(
"SELECT dim1 FROM "
+ "(SELECT __time, dim1 FROM druid.foo ORDER BY __time DESC LIMIT 4) "
+ "WHERE dim1 IN ('abc', 'def')",
ImmutableList.of(
newScanQueryBuilder()
.dataSource(
new QueryDataSource(
newScanQueryBuilder()
.dataSource(CalciteTests.DATASOURCE1)
.intervals(querySegmentSpec(Filtration.eternity()))
.columns(ImmutableList.of("__time", "dim1"))
.limit(4)
.order(ScanQuery.Order.DESCENDING)
.resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST)
.context(QUERY_CONTEXT_DEFAULT)
.build()
)
)
.intervals(querySegmentSpec(Filtration.eternity()))
.columns(ImmutableList.of("dim1"))
.filters(in("dim1", Arrays.asList("abc", "def"), null))
.resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST)
.context(QUERY_CONTEXT_DEFAULT)
.build()
),
ImmutableList.of(
new Object[]{"abc"},
new Object[]{"def"}
)
);
}
@Test
public void testGroupBySingleColumnDescendingNoTopN() throws Exception
{
@ -7966,8 +8002,8 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
.setInterval(querySegmentSpec(Filtration.eternity()))
.setGranularity(Granularities.ALL)
.setDimensions(dimensions(
new DefaultDimensionSpec("dim2", "d0"),
new DefaultDimensionSpec("dim1", "d1")
new DefaultDimensionSpec("dim1", "d0"),
new DefaultDimensionSpec("dim2", "d1")
))
.setAggregatorSpecs(aggregators(new LongSumAggregatorFactory("a0", "cnt")))
.setContext(QUERY_CONTEXT_DEFAULT)
@ -7976,12 +8012,12 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
)
.setInterval(querySegmentSpec(Filtration.eternity()))
.setGranularity(Granularities.ALL)
.setDimensions(dimensions(new DefaultDimensionSpec("d0", "_d0")))
.setDimensions(dimensions(new DefaultDimensionSpec("d1", "_d0")))
.setAggregatorSpecs(aggregators(
new LongSumAggregatorFactory("_a0", "a0"),
new FilteredAggregatorFactory(
new CountAggregatorFactory("_a1"),
not(selector("d1", null, null))
not(selector("d0", null, null))
)
))
.setContext(QUERY_CONTEXT_DEFAULT)
@ -8212,8 +8248,8 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
.setInterval(querySegmentSpec(Filtration.eternity()))
.setGranularity(Granularities.ALL)
.setDimensions(dimensions(
new DefaultDimensionSpec("m2", "d0", ColumnType.DOUBLE),
new DefaultDimensionSpec("dim1", "d1")
new DefaultDimensionSpec("dim1", "d0"),
new DefaultDimensionSpec("m2", "d1", ColumnType.DOUBLE)
))
.setDimFilter(new SelectorDimFilter("m1", "5.0", null))
.setAggregatorSpecs(aggregators(new LongMaxAggregatorFactory("a0", "__time")))
@ -8231,7 +8267,7 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
)
.setDimensions(dimensions(
new DefaultDimensionSpec("v0", "_d0", ColumnType.LONG),
new DefaultDimensionSpec("d1", "_d1", ColumnType.STRING)
new DefaultDimensionSpec("d0", "_d1", ColumnType.STRING)
))
.setAggregatorSpecs(
aggregators(
@ -8239,7 +8275,7 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
? new CountAggregatorFactory("_a0")
: new FilteredAggregatorFactory(
new CountAggregatorFactory("_a0"),
not(selector("d0", null, null))
not(selector("d1", null, null))
)
)
)
@ -8513,10 +8549,9 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
new FieldAccessPostAggregator(null, "_a2:count")
)
),
expressionPostAgg("s0", "timestamp_extract(\"_a3\",'EPOCH','UTC')")
expressionPostAgg("p0", "timestamp_extract(\"_a3\",'EPOCH','UTC')")
)
)
.setLimit(1)
.setContext(QUERY_CONTEXT_DEFAULT)
.build()
),
@ -9478,8 +9513,8 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
.setGranularity(Granularities.ALL)
.setDimensions(
dimensions(
new DefaultDimensionSpec("dim2", "d0"),
new DefaultDimensionSpec("dim1", "d1")
new DefaultDimensionSpec("dim1", "d0"),
new DefaultDimensionSpec("dim2", "d1")
)
)
.setAggregatorSpecs(
@ -9490,7 +9525,7 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
.setLimitSpec(
new DefaultLimitSpec(
ImmutableList.of(
new OrderByColumnSpec("d1", OrderByColumnSpec.Direction.ASCENDING)
new OrderByColumnSpec("d0", OrderByColumnSpec.Direction.ASCENDING)
),
4
)
@ -15352,7 +15387,8 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
newScanQueryBuilder()
.dataSource(CalciteTests.DATASOURCE1)
.intervals(querySegmentSpec(Filtration.eternity()))
.columns("__time", "cnt", "dim1", "dim2", "dim3", "m1", "m2", "unique_dim1")
.virtualColumns(expressionVirtualColumn("v0", "0", ColumnType.LONG))
.columns("v0")
.limit(10)
.context(QUERY_CONTEXT_DEFAULT)
.build()
@ -15593,27 +15629,40 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
.setDataSource(CalciteTests.DATASOURCE1)
.setInterval(querySegmentSpec(Filtration.eternity()))
.setGranularity(Granularities.ALL)
.setDimensions(dimensions(
.setDimensions(
useDefault
? dimensions(
new DefaultDimensionSpec("__time", "d0", ColumnType.LONG),
new DefaultDimensionSpec("m2", "d1", ColumnType.DOUBLE),
new DefaultDimensionSpec("dim1", "d2")
))
) : dimensions(
new DefaultDimensionSpec("__time", "d0", ColumnType.LONG),
new DefaultDimensionSpec("dim1", "d1"),
new DefaultDimensionSpec("m2", "d2", ColumnType.DOUBLE)
)
)
.setContext(QUERY_CONTEXT_DEFAULT)
.build()
)
.setInterval(querySegmentSpec(Filtration.eternity()))
.setGranularity(Granularities.ALL)
.setDimensions(dimensions(
.setDimensions(
useDefault
? dimensions(
new DefaultDimensionSpec("d0", "_d0", ColumnType.LONG),
new DefaultDimensionSpec("d2", "_d1", ColumnType.STRING)
))
) : dimensions(
new DefaultDimensionSpec("d0", "_d0", ColumnType.LONG),
new DefaultDimensionSpec("d1", "_d1", ColumnType.STRING)
)
)
.setAggregatorSpecs(
aggregators(
useDefault
? new CountAggregatorFactory("a0")
: new FilteredAggregatorFactory(
new CountAggregatorFactory("a0"),
not(selector("d1", null, null))
not(selector("d2", null, null))
)
)
)
@ -17621,7 +17670,7 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
if (useDefault) {
rightTable = InlineDataSource.fromIterable(
ImmutableList.of(),
RowSignature.builder().add("dim2", ColumnType.STRING).build()
RowSignature.builder().add("dim2", ColumnType.STRING).add("m2", ColumnType.DOUBLE).build()
);
} else {
rightTable = new QueryDataSource(