SQL: Allow select-sort-project query shapes. (#7769)

* SQL: Allow select-sort-project query shapes.

Fixes #7768.

Design changes:

- In PartialDruidQuery, allow projection after select + sort by removing
  the SELECT_SORT query stage and instead allowing the SORT and
  SORT_PROJECT stages to apply either after aggregation or after a plain
  non-aggregating select. This is different from prior behavior, where
  SORT and SORT_PROJECT were only considered valid after aggregation
  stages. This logic change is in the "canAccept" method.
- In DruidQuery, represent either kind of sorting with a single "Sorting"
  class (instead of DefaultLimitSpec). The Sorting class is still
  convertible into a DefaultLimitSpec, but is also convertible into the
  sorting parameters accepted by a Scan query.
- In DruidQuery, represent post-select and post-sorting projections with
  a single "Projection" class. This obsoletes the SortProject and
  SelectProjection classes, and simplifies the DruidQuery by allowing us
  to move virtual-column and post-aggregator-creation logic into the
  new Projection class.
- Split "DruidQuerySignature" into RowSignature and VirtualColumnRegistry.
  This effectively means that instead of having mutable and immutable
  versions of DruidQuerySignature, we instead have RowSignature (always
  immutable) and VirtualColumnRegistry (always mutable, but sometimes
  null). This change wasn't required, but IMO this makes the logic
  involving them easier to follow, and makes it more clear when the
  virtual column registry is active and when it's not.

Other changes:

- ConvertBoundsToSelectors now just accepts a RowSignature, but we
  use the VirtualColumnRegistry.getFullRowSignature() method to get
  a signature that includes all columns, and therefore allows us to
  simplify the logic (no need to special-case virtual columns).
- Add `__time` to the Scan column list if the query is ordering by time.

* Remove unused import.
This commit is contained in:
Gian Merlino 2019-05-30 12:56:29 -07:00 committed by GitHub
parent 245eded350
commit 8649b8ab4c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
30 changed files with 1010 additions and 804 deletions

View File

@ -48,7 +48,7 @@ import org.apache.druid.sql.calcite.expression.DruidExpression;
import org.apache.druid.sql.calcite.expression.Expressions;
import org.apache.druid.sql.calcite.planner.Calcites;
import org.apache.druid.sql.calcite.planner.PlannerContext;
import org.apache.druid.sql.calcite.rel.DruidQuerySignature;
import org.apache.druid.sql.calcite.rel.VirtualColumnRegistry;
import org.apache.druid.sql.calcite.table.RowSignature;
import javax.annotation.Nullable;
@ -71,7 +71,8 @@ public class HllSketchSqlAggregator implements SqlAggregator
@Override
public Aggregation toDruidAggregation(
PlannerContext plannerContext,
DruidQuerySignature querySignature,
RowSignature rowSignature,
VirtualColumnRegistry virtualColumnRegistry,
RexBuilder rexBuilder,
String name,
AggregateCall aggregateCall,
@ -80,7 +81,6 @@ public class HllSketchSqlAggregator implements SqlAggregator
boolean finalizeAggregations
)
{
final RowSignature rowSignature = querySignature.getRowSignature();
// Don't use Aggregations.getArgumentsForSimpleAggregator, since it won't let us use direct column access
// for string columns.
final RexNode columnRexNode = Expressions.fromFieldAccess(
@ -148,7 +148,7 @@ public class HllSketchSqlAggregator implements SqlAggregator
if (columnArg.isDirectColumnAccess()) {
dimensionSpec = columnArg.getSimpleExtraction().toDimensionSpec(null, inputType);
} else {
VirtualColumn virtualColumn = querySignature.getOrCreateVirtualColumnForExpression(
VirtualColumn virtualColumn = virtualColumnRegistry.getOrCreateVirtualColumnForExpression(
plannerContext,
columnArg,
sqlTypeName

View File

@ -44,7 +44,7 @@ import org.apache.druid.sql.calcite.aggregation.SqlAggregator;
import org.apache.druid.sql.calcite.expression.DruidExpression;
import org.apache.druid.sql.calcite.expression.Expressions;
import org.apache.druid.sql.calcite.planner.PlannerContext;
import org.apache.druid.sql.calcite.rel.DruidQuerySignature;
import org.apache.druid.sql.calcite.rel.VirtualColumnRegistry;
import org.apache.druid.sql.calcite.table.RowSignature;
import javax.annotation.Nullable;
@ -66,7 +66,8 @@ public class DoublesSketchSqlAggregator implements SqlAggregator
@Override
public Aggregation toDruidAggregation(
final PlannerContext plannerContext,
final DruidQuerySignature querySignature,
final RowSignature rowSignature,
final VirtualColumnRegistry virtualColumnRegistry,
final RexBuilder rexBuilder,
final String name,
final AggregateCall aggregateCall,
@ -75,7 +76,6 @@ public class DoublesSketchSqlAggregator implements SqlAggregator
final boolean finalizeAggregations
)
{
final RowSignature rowSignature = querySignature.getRowSignature();
final DruidExpression input = Expressions.toDruidExpression(
plannerContext,
rowSignature,
@ -179,7 +179,7 @@ public class DoublesSketchSqlAggregator implements SqlAggregator
k
);
} else {
VirtualColumn virtualColumn = querySignature.getOrCreateVirtualColumnForExpression(
VirtualColumn virtualColumn = virtualColumnRegistry.getOrCreateVirtualColumnForExpression(
plannerContext,
input,
SqlTypeName.FLOAT

View File

@ -47,7 +47,7 @@ import org.apache.druid.sql.calcite.expression.DruidExpression;
import org.apache.druid.sql.calcite.expression.Expressions;
import org.apache.druid.sql.calcite.planner.Calcites;
import org.apache.druid.sql.calcite.planner.PlannerContext;
import org.apache.druid.sql.calcite.rel.DruidQuerySignature;
import org.apache.druid.sql.calcite.rel.VirtualColumnRegistry;
import org.apache.druid.sql.calcite.table.RowSignature;
import javax.annotation.Nullable;
@ -70,7 +70,8 @@ public class ThetaSketchSqlAggregator implements SqlAggregator
@Override
public Aggregation toDruidAggregation(
PlannerContext plannerContext,
DruidQuerySignature querySignature,
RowSignature rowSignature,
VirtualColumnRegistry virtualColumnRegistry,
RexBuilder rexBuilder,
String name,
AggregateCall aggregateCall,
@ -79,7 +80,6 @@ public class ThetaSketchSqlAggregator implements SqlAggregator
boolean finalizeAggregations
)
{
final RowSignature rowSignature = querySignature.getRowSignature();
// Don't use Aggregations.getArgumentsForSimpleAggregator, since it won't let us use direct column access
// for string columns.
final RexNode columnRexNode = Expressions.fromFieldAccess(
@ -136,7 +136,7 @@ public class ThetaSketchSqlAggregator implements SqlAggregator
if (columnArg.isDirectColumnAccess()) {
dimensionSpec = columnArg.getSimpleExtraction().toDimensionSpec(null, inputType);
} else {
VirtualColumn virtualColumn = querySignature.getOrCreateVirtualColumnForExpression(
VirtualColumn virtualColumn = virtualColumnRegistry.getOrCreateVirtualColumnForExpression(
plannerContext,
columnArg,
sqlTypeName

View File

@ -46,7 +46,7 @@ import org.apache.druid.sql.calcite.expression.DruidExpression;
import org.apache.druid.sql.calcite.expression.Expressions;
import org.apache.druid.sql.calcite.planner.Calcites;
import org.apache.druid.sql.calcite.planner.PlannerContext;
import org.apache.druid.sql.calcite.rel.DruidQuerySignature;
import org.apache.druid.sql.calcite.rel.VirtualColumnRegistry;
import org.apache.druid.sql.calcite.table.RowSignature;
import javax.annotation.Nullable;
@ -68,7 +68,8 @@ public class BloomFilterSqlAggregator implements SqlAggregator
@Override
public Aggregation toDruidAggregation(
PlannerContext plannerContext,
DruidQuerySignature querySignature,
RowSignature rowSignature,
VirtualColumnRegistry virtualColumnRegistry,
RexBuilder rexBuilder,
String name,
AggregateCall aggregateCall,
@ -77,7 +78,6 @@ public class BloomFilterSqlAggregator implements SqlAggregator
boolean finalizeAggregations
)
{
final RowSignature rowSignature = querySignature.getRowSignature();
final RexNode inputOperand = Expressions.fromFieldAccess(
rowSignature,
project,
@ -168,7 +168,7 @@ public class BloomFilterSqlAggregator implements SqlAggregator
input.getSimpleExtraction().getExtractionFn()
);
} else {
VirtualColumn virtualColumn = querySignature.getOrCreateVirtualColumnForExpression(
VirtualColumn virtualColumn = virtualColumnRegistry.getOrCreateVirtualColumnForExpression(
plannerContext,
input,
inputOperand.getType().getSqlTypeName()

View File

@ -39,7 +39,8 @@ import org.apache.druid.sql.calcite.expression.DruidExpression;
import org.apache.druid.sql.calcite.expression.Expressions;
import org.apache.druid.sql.calcite.expression.OperatorConversions;
import org.apache.druid.sql.calcite.planner.PlannerContext;
import org.apache.druid.sql.calcite.rel.DruidQuerySignature;
import org.apache.druid.sql.calcite.rel.VirtualColumnRegistry;
import org.apache.druid.sql.calcite.table.RowSignature;
import javax.annotation.Nullable;
import java.io.IOException;
@ -68,14 +69,15 @@ public class BloomFilterOperatorConversion extends DirectOperatorConversion
@Override
public DimFilter toDruidFilter(
final PlannerContext plannerContext,
final DruidQuerySignature querySignature,
RowSignature rowSignature,
@Nullable VirtualColumnRegistry virtualColumnRegistry,
final RexNode rexNode
)
{
final List<RexNode> operands = ((RexCall) rexNode).getOperands();
final DruidExpression druidExpression = Expressions.toDruidExpression(
plannerContext,
querySignature.getRowSignature(),
rowSignature,
operands.get(0)
);
if (druidExpression == null) {
@ -100,8 +102,8 @@ public class BloomFilterOperatorConversion extends DirectOperatorConversion
holder,
druidExpression.getSimpleExtraction().getExtractionFn()
);
} else {
VirtualColumn virtualColumn = querySignature.getOrCreateVirtualColumnForExpression(
} else if (virtualColumnRegistry != null) {
VirtualColumn virtualColumn = virtualColumnRegistry.getOrCreateVirtualColumnForExpression(
plannerContext,
druidExpression,
operands.get(0).getType().getSqlTypeName()
@ -114,6 +116,8 @@ public class BloomFilterOperatorConversion extends DirectOperatorConversion
holder,
null
);
} else {
return null;
}
}
}

View File

@ -44,7 +44,7 @@ import org.apache.druid.sql.calcite.aggregation.SqlAggregator;
import org.apache.druid.sql.calcite.expression.DruidExpression;
import org.apache.druid.sql.calcite.expression.Expressions;
import org.apache.druid.sql.calcite.planner.PlannerContext;
import org.apache.druid.sql.calcite.rel.DruidQuerySignature;
import org.apache.druid.sql.calcite.rel.VirtualColumnRegistry;
import org.apache.druid.sql.calcite.table.RowSignature;
import javax.annotation.Nullable;
@ -66,7 +66,8 @@ public class FixedBucketsHistogramQuantileSqlAggregator implements SqlAggregator
@Override
public Aggregation toDruidAggregation(
PlannerContext plannerContext,
DruidQuerySignature querySignature,
RowSignature rowSignature,
VirtualColumnRegistry virtualColumnRegistry,
RexBuilder rexBuilder,
String name,
AggregateCall aggregateCall,
@ -75,7 +76,6 @@ public class FixedBucketsHistogramQuantileSqlAggregator implements SqlAggregator
boolean finalizeAggregations
)
{
final RowSignature rowSignature = querySignature.getRowSignature();
final DruidExpression input = Expressions.toDruidExpression(
plannerContext,
rowSignature,
@ -234,7 +234,7 @@ public class FixedBucketsHistogramQuantileSqlAggregator implements SqlAggregator
outlierHandlingMode
);
} else {
VirtualColumn virtualColumn = querySignature.getOrCreateVirtualColumnForExpression(
VirtualColumn virtualColumn = virtualColumnRegistry.getOrCreateVirtualColumnForExpression(
plannerContext,
input,
SqlTypeName.FLOAT

View File

@ -46,7 +46,7 @@ import org.apache.druid.sql.calcite.aggregation.SqlAggregator;
import org.apache.druid.sql.calcite.expression.DruidExpression;
import org.apache.druid.sql.calcite.expression.Expressions;
import org.apache.druid.sql.calcite.planner.PlannerContext;
import org.apache.druid.sql.calcite.rel.DruidQuerySignature;
import org.apache.druid.sql.calcite.rel.VirtualColumnRegistry;
import org.apache.druid.sql.calcite.table.RowSignature;
import javax.annotation.Nullable;
@ -68,7 +68,8 @@ public class QuantileSqlAggregator implements SqlAggregator
@Override
public Aggregation toDruidAggregation(
final PlannerContext plannerContext,
DruidQuerySignature querySignature,
final RowSignature rowSignature,
final VirtualColumnRegistry virtualColumnRegistry,
final RexBuilder rexBuilder,
final String name,
final AggregateCall aggregateCall,
@ -77,7 +78,6 @@ public class QuantileSqlAggregator implements SqlAggregator
final boolean finalizeAggregations
)
{
final RowSignature rowSignature = querySignature.getRowSignature();
final DruidExpression input = Expressions.toDruidExpression(
plannerContext,
rowSignature,
@ -196,7 +196,7 @@ public class QuantileSqlAggregator implements SqlAggregator
}
} else {
final VirtualColumn virtualColumn =
querySignature.getOrCreateVirtualColumnForExpression(plannerContext, input, SqlTypeName.FLOAT);
virtualColumnRegistry.getOrCreateVirtualColumnForExpression(plannerContext, input, SqlTypeName.FLOAT);
virtualColumns.add(virtualColumn);
aggregatorFactory = new ApproximateHistogramAggregatorFactory(
histogramName,

View File

@ -31,7 +31,8 @@ import org.apache.druid.query.filter.AndDimFilter;
import org.apache.druid.query.filter.DimFilter;
import org.apache.druid.segment.VirtualColumn;
import org.apache.druid.sql.calcite.filtration.Filtration;
import org.apache.druid.sql.calcite.rel.DruidQuerySignature;
import org.apache.druid.sql.calcite.rel.VirtualColumnRegistry;
import org.apache.druid.sql.calcite.table.RowSignature;
import javax.annotation.Nullable;
import java.util.ArrayList;
@ -150,7 +151,11 @@ public class Aggregation
: Iterables.getOnlyElement(aggregatorFactories).getName();
}
public Aggregation filter(final DruidQuerySignature querySignature, final DimFilter filter)
public Aggregation filter(
final RowSignature rowSignature,
final VirtualColumnRegistry virtualColumnRegistry,
final DimFilter filter
)
{
if (filter == null) {
return this;
@ -173,13 +178,13 @@ public class Aggregation
}
final DimFilter baseOptimizedFilter = Filtration.create(filter)
.optimizeFilterOnly(querySignature)
.optimizeFilterOnly(virtualColumnRegistry.getFullRowSignature())
.getDimFilter();
Set<VirtualColumn> aggVirtualColumnsPlusFilterColumns = new HashSet<>(virtualColumns);
for (String column : baseOptimizedFilter.getRequiredColumns()) {
if (querySignature.isVirtualColumnDefined(column)) {
aggVirtualColumnsPlusFilterColumns.add(querySignature.getVirtualColumn(column));
if (virtualColumnRegistry.isVirtualColumnDefined(column)) {
aggVirtualColumnsPlusFilterColumns.add(virtualColumnRegistry.getVirtualColumn(column));
}
}
final List<AggregatorFactory> newAggregators = new ArrayList<>();
@ -187,15 +192,15 @@ public class Aggregation
if (agg instanceof FilteredAggregatorFactory) {
final FilteredAggregatorFactory filteredAgg = (FilteredAggregatorFactory) agg;
for (String column : filteredAgg.getFilter().getRequiredColumns()) {
if (querySignature.isVirtualColumnDefined(column)) {
aggVirtualColumnsPlusFilterColumns.add(querySignature.getVirtualColumn(column));
if (virtualColumnRegistry.isVirtualColumnDefined(column)) {
aggVirtualColumnsPlusFilterColumns.add(virtualColumnRegistry.getVirtualColumn(column));
}
}
newAggregators.add(
new FilteredAggregatorFactory(
filteredAgg.getAggregator(),
Filtration.create(new AndDimFilter(ImmutableList.of(filteredAgg.getFilter(), baseOptimizedFilter)))
.optimizeFilterOnly(querySignature)
.optimizeFilterOnly(virtualColumnRegistry.getFullRowSignature())
.getDimFilter()
)
);

View File

@ -24,7 +24,8 @@ import org.apache.calcite.rel.core.Project;
import org.apache.calcite.rex.RexBuilder;
import org.apache.calcite.sql.SqlAggFunction;
import org.apache.druid.sql.calcite.planner.PlannerContext;
import org.apache.druid.sql.calcite.rel.DruidQuerySignature;
import org.apache.druid.sql.calcite.rel.VirtualColumnRegistry;
import org.apache.druid.sql.calcite.table.RowSignature;
import javax.annotation.Nullable;
import java.util.List;
@ -45,26 +46,29 @@ public interface SqlAggregator
* Returns a Druid Aggregation corresponding to a SQL {@link AggregateCall}. This method should ignore filters;
* they will be applied to your aggregator in a later step.
*
* @param plannerContext SQL planner context
* @param querySignature signature of the rows row signature and re-usable virtual column references
* @param rexBuilder a rexBuilder, in case you need one
* @param name desired output name of the aggregation
* @param aggregateCall aggregate call object
* @param project project that should be applied before aggregation; may be null
* @param existingAggregations existing aggregations for this query; useful for re-using aggregations. May be safely
* ignored if you do not want to re-use existing aggregations.
* @param finalizeAggregations true if this query should include explicit finalization for all of its
* aggregators, where required. This is set for subqueries where Druid's native query
* layer does not do this automatically.
* @param plannerContext SQL planner context
* @param rowSignature input row signature
* @param virtualColumnRegistry re-usable virtual column references
* @param rexBuilder a rexBuilder, in case you need one
* @param name desired output name of the aggregation
* @param aggregateCall aggregate call object
* @param project project that should be applied before aggregation; may be null
* @param existingAggregations existing aggregations for this query; useful for re-using aggregations. May be safely
* ignored if you do not want to re-use existing aggregations.
* @param finalizeAggregations true if this query should include explicit finalization for all of its
* aggregators, where required. This is set for subqueries where Druid's native query
* layer does not do this automatically.
*
* @return aggregation, or null if the call cannot be translated
*/
@Nullable
Aggregation toDruidAggregation(
PlannerContext plannerContext,
DruidQuerySignature querySignature,
RowSignature rowSignature,
VirtualColumnRegistry virtualColumnRegistry,
RexBuilder rexBuilder,
String name, AggregateCall aggregateCall,
String name,
AggregateCall aggregateCall,
Project project,
List<Aggregation> existingAggregations,
boolean finalizeAggregations

View File

@ -47,7 +47,7 @@ import org.apache.druid.sql.calcite.expression.DruidExpression;
import org.apache.druid.sql.calcite.expression.Expressions;
import org.apache.druid.sql.calcite.planner.Calcites;
import org.apache.druid.sql.calcite.planner.PlannerContext;
import org.apache.druid.sql.calcite.rel.DruidQuerySignature;
import org.apache.druid.sql.calcite.rel.VirtualColumnRegistry;
import org.apache.druid.sql.calcite.table.RowSignature;
import javax.annotation.Nullable;
@ -70,7 +70,8 @@ public class ApproxCountDistinctSqlAggregator implements SqlAggregator
@Override
public Aggregation toDruidAggregation(
final PlannerContext plannerContext,
DruidQuerySignature querySignature,
final RowSignature rowSignature,
final VirtualColumnRegistry virtualColumnRegistry,
final RexBuilder rexBuilder,
final String name,
final AggregateCall aggregateCall,
@ -79,7 +80,6 @@ public class ApproxCountDistinctSqlAggregator implements SqlAggregator
final boolean finalizeAggregations
)
{
final RowSignature rowSignature = querySignature.getRowSignature();
// Don't use Aggregations.getArgumentsForSimpleAggregator, since it won't let us use direct column access
// for string columns.
final RexNode rexNode = Expressions.fromFieldAccess(
@ -112,7 +112,7 @@ public class ApproxCountDistinctSqlAggregator implements SqlAggregator
dimensionSpec = arg.getSimpleExtraction().toDimensionSpec(null, inputType);
} else {
VirtualColumn virtualColumn =
querySignature.getOrCreateVirtualColumnForExpression(plannerContext, arg, sqlTypeName);
virtualColumnRegistry.getOrCreateVirtualColumnForExpression(plannerContext, arg, sqlTypeName);
dimensionSpec = new DefaultDimensionSpec(virtualColumn.getOutputName(), null, inputType);
myvirtualColumns.add(virtualColumn);
}

View File

@ -36,7 +36,8 @@ import org.apache.druid.sql.calcite.aggregation.SqlAggregator;
import org.apache.druid.sql.calcite.expression.DruidExpression;
import org.apache.druid.sql.calcite.expression.Expressions;
import org.apache.druid.sql.calcite.planner.PlannerContext;
import org.apache.druid.sql.calcite.rel.DruidQuerySignature;
import org.apache.druid.sql.calcite.rel.VirtualColumnRegistry;
import org.apache.druid.sql.calcite.table.RowSignature;
import javax.annotation.Nullable;
import java.util.List;
@ -55,7 +56,8 @@ public class CountSqlAggregator implements SqlAggregator
@Override
public Aggregation toDruidAggregation(
final PlannerContext plannerContext,
final DruidQuerySignature querySignature,
final RowSignature rowSignature,
final VirtualColumnRegistry virtualColumnRegistry,
final RexBuilder rexBuilder,
final String name,
final AggregateCall aggregateCall,
@ -66,7 +68,7 @@ public class CountSqlAggregator implements SqlAggregator
{
final List<DruidExpression> args = Aggregations.getArgumentsForSimpleAggregator(
plannerContext,
querySignature.getRowSignature(),
rowSignature,
aggregateCall,
project
);
@ -83,7 +85,8 @@ public class CountSqlAggregator implements SqlAggregator
if (plannerContext.getPlannerConfig().isUseApproximateCountDistinct()) {
return APPROX_COUNT_DISTINCT.toDruidAggregation(
plannerContext,
querySignature,
rowSignature,
virtualColumnRegistry,
rexBuilder,
name, aggregateCall, project, existingAggregations,
finalizeAggregations
@ -96,7 +99,7 @@ public class CountSqlAggregator implements SqlAggregator
// COUNT(x) should count all non-null values of x.
final RexNode rexNode = Expressions.fromFieldAccess(
querySignature.getRowSignature(),
rowSignature,
project,
Iterables.getOnlyElement(aggregateCall.getArgList())
);
@ -104,7 +107,8 @@ public class CountSqlAggregator implements SqlAggregator
if (rexNode.getType().isNullable()) {
final DimFilter nonNullFilter = Expressions.toFilter(
plannerContext,
querySignature,
rowSignature,
virtualColumnRegistry,
rexBuilder.makeCall(SqlStdOperatorTable.IS_NOT_NULL, ImmutableList.of(rexNode))
);
@ -113,7 +117,8 @@ public class CountSqlAggregator implements SqlAggregator
throw new ISE("Could not create not-null filter for rexNode[%s]", rexNode);
}
return Aggregation.create(new CountAggregatorFactory(name)).filter(querySignature, nonNullFilter);
return Aggregation.create(new CountAggregatorFactory(name))
.filter(rowSignature, virtualColumnRegistry, nonNullFilter);
} else {
return Aggregation.create(new CountAggregatorFactory(name));
}

View File

@ -29,7 +29,7 @@ import org.apache.druid.sql.calcite.aggregation.Aggregations;
import org.apache.druid.sql.calcite.aggregation.SqlAggregator;
import org.apache.druid.sql.calcite.expression.DruidExpression;
import org.apache.druid.sql.calcite.planner.PlannerContext;
import org.apache.druid.sql.calcite.rel.DruidQuerySignature;
import org.apache.druid.sql.calcite.rel.VirtualColumnRegistry;
import org.apache.druid.sql.calcite.table.RowSignature;
import javax.annotation.Nullable;
@ -44,7 +44,8 @@ public abstract class SimpleSqlAggregator implements SqlAggregator
@Override
public Aggregation toDruidAggregation(
final PlannerContext plannerContext,
final DruidQuerySignature querySignature,
final RowSignature rowSignature,
final VirtualColumnRegistry virtualColumnRegistry,
final RexBuilder rexBuilder,
final String name,
final AggregateCall aggregateCall,
@ -56,7 +57,6 @@ public abstract class SimpleSqlAggregator implements SqlAggregator
if (aggregateCall.isDistinct()) {
return null;
}
final RowSignature rowSignature = querySignature.getRowSignature();
final List<DruidExpression> arguments = Aggregations.getArgumentsForSimpleAggregator(
plannerContext,

View File

@ -58,7 +58,7 @@ import org.apache.druid.sql.calcite.filtration.Bounds;
import org.apache.druid.sql.calcite.filtration.Filtration;
import org.apache.druid.sql.calcite.planner.Calcites;
import org.apache.druid.sql.calcite.planner.PlannerContext;
import org.apache.druid.sql.calcite.rel.DruidQuerySignature;
import org.apache.druid.sql.calcite.rel.VirtualColumnRegistry;
import org.apache.druid.sql.calcite.table.RowSignature;
import org.joda.time.Interval;
@ -208,14 +208,16 @@ public class Expressions
/**
* Translates "condition" to a Druid filter, or returns null if we cannot translate the condition.
*
* @param plannerContext planner context
* @param querySignature row signature of the dataSource to be filtered
* @param expression Calcite row expression
* @param plannerContext planner context
* @param rowSignature input row signature
* @param virtualColumnRegistry re-usable virtual column references, may be null if virtual columns aren't allowed
* @param expression Calcite row expression
*/
@Nullable
public static DimFilter toFilter(
final PlannerContext plannerContext,
final DruidQuerySignature querySignature,
final RowSignature rowSignature,
@Nullable final VirtualColumnRegistry virtualColumnRegistry,
final RexNode expression
)
{
@ -224,20 +226,27 @@ public class Expressions
if (kind == SqlKind.IS_TRUE || kind == SqlKind.IS_NOT_FALSE) {
return toFilter(
plannerContext,
querySignature,
rowSignature,
virtualColumnRegistry,
Iterables.getOnlyElement(((RexCall) expression).getOperands())
);
} else if (kind == SqlKind.IS_FALSE || kind == SqlKind.IS_NOT_TRUE) {
return new NotDimFilter(
toFilter(
plannerContext,
querySignature,
rowSignature,
virtualColumnRegistry,
Iterables.getOnlyElement(((RexCall) expression).getOperands())
)
);
} else if (kind == SqlKind.CAST && expression.getType().getSqlTypeName() == SqlTypeName.BOOLEAN) {
// Calcite sometimes leaves errant, useless cast-to-booleans inside filters. Strip them and continue.
return toFilter(plannerContext, querySignature, Iterables.getOnlyElement(((RexCall) expression).getOperands()));
return toFilter(
plannerContext,
rowSignature,
virtualColumnRegistry,
Iterables.getOnlyElement(((RexCall) expression).getOperands())
);
} else if (kind == SqlKind.AND
|| kind == SqlKind.OR
|| kind == SqlKind.NOT) {
@ -245,7 +254,8 @@ public class Expressions
for (final RexNode rexNode : ((RexCall) expression).getOperands()) {
final DimFilter nextFilter = toFilter(
plannerContext,
querySignature,
rowSignature,
virtualColumnRegistry,
rexNode
);
if (nextFilter == null) {
@ -264,7 +274,7 @@ public class Expressions
}
} else {
// Handle filter conditions on everything else.
return toLeafFilter(plannerContext, querySignature, expression);
return toLeafFilter(plannerContext, rowSignature, virtualColumnRegistry, expression);
}
}
@ -272,14 +282,16 @@ public class Expressions
* Translates "condition" to a Druid filter, assuming it does not contain any boolean expressions. Returns null
* if we cannot translate the condition.
*
* @param plannerContext planner context
* @param querySignature row signature of the dataSource to be filtered
* @param rexNode Calcite row expression
* @param plannerContext planner context
* @param rowSignature input row signature
* @param virtualColumnRegistry re-usable virtual column references, may be null if virtual columns aren't allowed
* @param rexNode Calcite row expression
*/
@Nullable
private static DimFilter toLeafFilter(
final PlannerContext plannerContext,
final DruidQuerySignature querySignature,
final RowSignature rowSignature,
@Nullable final VirtualColumnRegistry virtualColumnRegistry,
final RexNode rexNode
)
{
@ -291,21 +303,29 @@ public class Expressions
final DimFilter simpleFilter = toSimpleLeafFilter(
plannerContext,
querySignature,
rowSignature,
virtualColumnRegistry,
rexNode
);
return simpleFilter != null
? simpleFilter
: toExpressionLeafFilter(plannerContext, querySignature.getRowSignature(), rexNode);
: toExpressionLeafFilter(plannerContext, rowSignature, rexNode);
}
/**
* Translates to a simple leaf filter, i.e. is not an expression filter.
* Translates to a simple leaf filter, i.e. not an "expression" type filter. Note that the filter may still
* reference expression virtual columns, if and only if "virtualColumnRegistry" is defined.
*
* @param plannerContext planner context
* @param rowSignature input row signature
* @param virtualColumnRegistry re-usable virtual column references, may be null if virtual columns aren't allowed
* @param rexNode Calcite row expression
*/
@Nullable
private static DimFilter toSimpleLeafFilter(
final PlannerContext plannerContext,
final DruidQuerySignature querySignature,
final RowSignature rowSignature,
@Nullable final VirtualColumnRegistry virtualColumnRegistry,
final RexNode rexNode
)
{
@ -314,24 +334,23 @@ public class Expressions
if (kind == SqlKind.IS_TRUE || kind == SqlKind.IS_NOT_FALSE) {
return toSimpleLeafFilter(
plannerContext,
querySignature,
rowSignature,
virtualColumnRegistry,
Iterables.getOnlyElement(((RexCall) rexNode).getOperands())
);
} else if (kind == SqlKind.IS_FALSE || kind == SqlKind.IS_NOT_TRUE) {
return new NotDimFilter(
toSimpleLeafFilter(
plannerContext,
querySignature,
rowSignature,
virtualColumnRegistry,
Iterables.getOnlyElement(((RexCall) rexNode).getOperands())
)
);
} else if (kind == SqlKind.IS_NULL || kind == SqlKind.IS_NOT_NULL) {
final RexNode operand = Iterables.getOnlyElement(((RexCall) rexNode).getOperands());
// operand must be translatable to a SimpleExtraction to be simple-filterable
final DruidExpression druidExpression =
toDruidExpression(plannerContext, querySignature.getRowSignature(), operand);
final DruidExpression druidExpression = toDruidExpression(plannerContext, rowSignature, operand);
if (druidExpression == null) {
return null;
}
@ -343,20 +362,20 @@ public class Expressions
NullHandling.defaultStringValue(),
druidExpression.getSimpleExtraction().getExtractionFn()
);
} else {
final VirtualColumn virtualColumn = querySignature.getOrCreateVirtualColumnForExpression(
} else if (virtualColumnRegistry != null) {
final VirtualColumn virtualColumn = virtualColumnRegistry.getOrCreateVirtualColumnForExpression(
plannerContext,
druidExpression,
operand.getType().getSqlTypeName()
);
if (virtualColumn == null) {
return null;
}
equalFilter = new SelectorDimFilter(
virtualColumn.getOutputName(),
NullHandling.defaultStringValue(),
null
);
} else {
return null;
}
return kind == SqlKind.IS_NOT_NULL ? new NotDimFilter(equalFilter) : equalFilter;
@ -414,7 +433,7 @@ public class Expressions
}
// Translate lhs to a DruidExpression.
final DruidExpression lhsExpression = toDruidExpression(plannerContext, querySignature.getRowSignature(), lhs);
final DruidExpression lhsExpression = toDruidExpression(plannerContext, rowSignature, lhs);
if (lhsExpression == null) {
return null;
}
@ -432,17 +451,17 @@ public class Expressions
if (lhsExpression.isSimpleExtraction()) {
column = lhsExpression.getSimpleExtraction().getColumn();
extractionFn = lhsExpression.getSimpleExtraction().getExtractionFn();
} else {
VirtualColumn virtualLhs = querySignature.getOrCreateVirtualColumnForExpression(
} else if (virtualColumnRegistry != null) {
VirtualColumn virtualLhs = virtualColumnRegistry.getOrCreateVirtualColumnForExpression(
plannerContext,
lhsExpression,
lhs.getType().getSqlTypeName()
);
if (virtualLhs == null) {
return null;
}
column = virtualLhs.getOutputName();
extractionFn = null;
} else {
return null;
}
if (column.equals(ColumnHolder.TIME_COLUMN_NAME) && extractionFn instanceof TimeFormatExtractionFn) {
@ -515,22 +534,16 @@ public class Expressions
return filter;
} else if (rexNode instanceof RexCall) {
final SqlOperator operator = ((RexCall) rexNode).getOperator();
final SqlOperatorConversion conversion =
plannerContext.getOperatorTable().lookupOperatorConversion(operator);
final SqlOperatorConversion conversion = plannerContext.getOperatorTable().lookupOperatorConversion(operator);
if (conversion == null) {
return null;
} else {
DimFilter filter =
conversion.toDruidFilter(plannerContext, querySignature, rexNode);
if (filter != null) {
return filter;
}
return null;
return conversion.toDruidFilter(plannerContext, rowSignature, virtualColumnRegistry, rexNode);
}
} else {
return null;
}
return null;
}
/**

View File

@ -23,7 +23,7 @@ import org.apache.calcite.rex.RexNode;
import org.apache.calcite.sql.SqlOperator;
import org.apache.druid.query.filter.DimFilter;
import org.apache.druid.sql.calcite.planner.PlannerContext;
import org.apache.druid.sql.calcite.rel.DruidQuerySignature;
import org.apache.druid.sql.calcite.rel.VirtualColumnRegistry;
import org.apache.druid.sql.calcite.table.RowSignature;
import javax.annotation.Nullable;
@ -54,16 +54,18 @@ public interface SqlOperatorConversion
/**
* Returns a Druid filter corresponding to a Calcite {@code RexNode} used as a filter condition.
*
* @param plannerContext SQL planner context
* @param querySignature signature of the rows being filtered, and any expression column references
* @param rexNode filter expression rex node
* @param plannerContext SQL planner context
* @param rowSignature input row signature
* @param virtualColumnRegistry re-usable virtual column references
* @param rexNode filter expression rex node
*
* @return filter, or null if the call cannot be translated
* @return filter, or null if the call cannot be translated to a filter
*/
@Nullable
default DimFilter toDruidFilter(
PlannerContext plannerContext,
DruidQuerySignature querySignature,
RowSignature rowSignature,
@Nullable VirtualColumnRegistry virtualColumnRegistry,
RexNode rexNode
)
{

View File

@ -31,7 +31,8 @@ import org.apache.druid.sql.calcite.expression.DirectOperatorConversion;
import org.apache.druid.sql.calcite.expression.DruidExpression;
import org.apache.druid.sql.calcite.expression.Expressions;
import org.apache.druid.sql.calcite.planner.PlannerContext;
import org.apache.druid.sql.calcite.rel.DruidQuerySignature;
import org.apache.druid.sql.calcite.rel.VirtualColumnRegistry;
import org.apache.druid.sql.calcite.table.RowSignature;
import javax.annotation.Nullable;
import java.util.List;
@ -55,14 +56,15 @@ public class LikeOperatorConversion extends DirectOperatorConversion
@Override
public DimFilter toDruidFilter(
PlannerContext plannerContext,
DruidQuerySignature querySignature,
RowSignature rowSignature,
@Nullable VirtualColumnRegistry virtualColumnRegistry,
RexNode rexNode
)
{
final List<RexNode> operands = ((RexCall) rexNode).getOperands();
final DruidExpression druidExpression = Expressions.toDruidExpression(
plannerContext,
querySignature.getRowSignature(),
rowSignature,
operands.get(0)
);
if (druidExpression == null) {
@ -76,21 +78,21 @@ public class LikeOperatorConversion extends DirectOperatorConversion
operands.size() > 2 ? RexLiteral.stringValue(operands.get(2)) : null,
druidExpression.getSimpleExtraction().getExtractionFn()
);
} else {
VirtualColumn v = querySignature.getOrCreateVirtualColumnForExpression(
} else if (virtualColumnRegistry != null) {
VirtualColumn v = virtualColumnRegistry.getOrCreateVirtualColumnForExpression(
plannerContext,
druidExpression,
operands.get(0).getType().getSqlTypeName()
);
if (v == null) {
return null;
}
return new LikeDimFilter(
v.getOutputName(),
RexLiteral.stringValue(operands.get(1)),
operands.size() > 2 ? RexLiteral.stringValue(operands.get(2)) : null,
null
);
} else {
return null;
}
}
}

View File

@ -24,20 +24,20 @@ import org.apache.druid.query.filter.DimFilter;
import org.apache.druid.query.filter.SelectorDimFilter;
import org.apache.druid.query.ordering.StringComparator;
import org.apache.druid.sql.calcite.expression.SimpleExtraction;
import org.apache.druid.sql.calcite.rel.DruidQuerySignature;
import org.apache.druid.sql.calcite.table.RowSignature;
public class ConvertBoundsToSelectors extends BottomUpTransform
{
private final DruidQuerySignature querySignature;
private final RowSignature rowSignature;
private ConvertBoundsToSelectors(final DruidQuerySignature querySignature)
private ConvertBoundsToSelectors(final RowSignature rowSignature)
{
this.querySignature = querySignature;
this.rowSignature = rowSignature;
}
public static ConvertBoundsToSelectors create(final DruidQuerySignature querySignature)
public static ConvertBoundsToSelectors create(final RowSignature rowSignature)
{
return new ConvertBoundsToSelectors(querySignature);
return new ConvertBoundsToSelectors(rowSignature);
}
@Override
@ -45,7 +45,7 @@ public class ConvertBoundsToSelectors extends BottomUpTransform
{
if (filter instanceof BoundDimFilter) {
final BoundDimFilter bound = (BoundDimFilter) filter;
final StringComparator comparator = querySignature.getRowSignature().naturalStringComparator(
final StringComparator comparator = rowSignature.naturalStringComparator(
SimpleExtraction.of(bound.getDimension(), bound.getExtractionFn())
);
@ -54,7 +54,7 @@ public class ConvertBoundsToSelectors extends BottomUpTransform
&& bound.getUpper().equals(bound.getLower())
&& !bound.isUpperStrict()
&& !bound.isLowerStrict()
&& (querySignature.isVirtualColumnDefined(bound.getDimension()) || bound.getOrdering().equals(comparator))) {
&& bound.getOrdering().equals(comparator)) {
return new SelectorDimFilter(
bound.getDimension(),
bound.getUpper(),

View File

@ -28,7 +28,7 @@ import org.apache.druid.query.filter.DimFilter;
import org.apache.druid.query.filter.ExpressionDimFilter;
import org.apache.druid.query.spec.MultipleIntervalSegmentSpec;
import org.apache.druid.query.spec.QuerySegmentSpec;
import org.apache.druid.sql.calcite.rel.DruidQuerySignature;
import org.apache.druid.sql.calcite.table.RowSignature;
import org.joda.time.Interval;
import java.util.List;
@ -108,15 +108,15 @@ public class Filtration
*
* @return equivalent Filtration
*/
public Filtration optimize(final DruidQuerySignature querySignature)
public Filtration optimize(final RowSignature rowSignature)
{
return transform(
this,
ImmutableList.of(
CombineAndSimplifyBounds.instance(),
MoveTimeFiltersToIntervals.instance(),
ConvertBoundsToSelectors.create(querySignature),
ConvertSelectorsToIns.create(querySignature.getRowSignature()),
ConvertBoundsToSelectors.create(rowSignature),
ConvertSelectorsToIns.create(rowSignature),
MoveMarkerFiltersToIntervals.instance(),
ValidateNoMarkerFiltersRemain.instance()
)
@ -128,7 +128,7 @@ public class Filtration
*
* @return equivalent Filtration
*/
public Filtration optimizeFilterOnly(final DruidQuerySignature querySignature)
public Filtration optimizeFilterOnly(final RowSignature rowSignature)
{
if (!intervals.equals(ImmutableList.of(eternity()))) {
throw new ISE("Cannot optimizeFilterOnly when intervals are set");
@ -138,8 +138,8 @@ public class Filtration
this,
ImmutableList.of(
CombineAndSimplifyBounds.instance(),
ConvertBoundsToSelectors.create(querySignature),
ConvertSelectorsToIns.create(querySignature.getRowSignature())
ConvertBoundsToSelectors.create(rowSignature),
ConvertSelectorsToIns.create(rowSignature)
)
);

View File

@ -21,11 +21,11 @@ package org.apache.druid.sql.calcite.rel;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.ImmutableSortedMap;
import com.google.common.collect.Iterables;
import com.google.common.collect.Iterators;
import com.google.common.collect.Ordering;
import com.google.common.primitives.Ints;
import org.apache.calcite.plan.RelOptUtil;
import org.apache.calcite.rel.RelFieldCollation;
import org.apache.calcite.rel.core.Aggregate;
@ -44,18 +44,17 @@ import org.apache.calcite.util.ImmutableBitSet;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.math.expr.ExprType;
import org.apache.druid.math.expr.Parser;
import org.apache.druid.query.DataSource;
import org.apache.druid.query.Query;
import org.apache.druid.query.QueryDataSource;
import org.apache.druid.query.aggregation.PostAggregator;
import org.apache.druid.query.aggregation.post.ExpressionPostAggregator;
import org.apache.druid.query.dimension.DimensionSpec;
import org.apache.druid.query.filter.DimFilter;
import org.apache.druid.query.groupby.GroupByQuery;
import org.apache.druid.query.groupby.having.DimFilterHavingSpec;
import org.apache.druid.query.groupby.orderby.DefaultLimitSpec;
import org.apache.druid.query.groupby.orderby.NoopLimitSpec;
import org.apache.druid.query.groupby.orderby.OrderByColumnSpec;
import org.apache.druid.query.ordering.StringComparator;
import org.apache.druid.query.ordering.StringComparators;
@ -105,25 +104,19 @@ public class DruidQuery
private final DimFilter filter;
@Nullable
private final SelectProjection selectProjection;
private final Projection selectProjection;
@Nullable
private final Grouping grouping;
@Nullable
private final SortProject sortProject;
@Nullable
private final DefaultLimitSpec limitSpec;
@Nullable
private final RelDataType outputRowType;
private final Sorting sorting;
private final Query query;
private final DruidQuerySignature sourceQuerySignature;
private final RowSignature sourceRowSignature;
private final RowSignature outputRowSignature;
private final RelDataType outputRowType;
private final VirtualColumnRegistry virtualColumnRegistry;
public DruidQuery(
final PartialDruidQuery partialQuery,
@ -136,58 +129,88 @@ public class DruidQuery
{
this.dataSource = dataSource;
this.outputRowType = partialQuery.leafRel().getRowType();
this.sourceQuerySignature = new DruidQuerySignature(sourceRowSignature);
this.sourceRowSignature = sourceRowSignature;
this.virtualColumnRegistry = VirtualColumnRegistry.create(sourceRowSignature);
this.plannerContext = plannerContext;
// Now the fun begins.
this.filter = computeWhereFilter(partialQuery, plannerContext, sourceQuerySignature);
this.selectProjection = computeSelectProjection(partialQuery, plannerContext, sourceQuerySignature);
this.grouping = computeGrouping(
partialQuery,
plannerContext,
sourceQuerySignature,
rexBuilder,
finalizeAggregations
);
final RowSignature sortingInputRowSignature;
if (this.selectProjection != null) {
sortingInputRowSignature = this.selectProjection.getOutputRowSignature();
} else if (this.grouping != null) {
sortingInputRowSignature = this.grouping.getOutputRowSignature();
if (partialQuery.getWhereFilter() != null) {
this.filter = Preconditions.checkNotNull(
computeWhereFilter(
partialQuery,
plannerContext,
sourceRowSignature,
virtualColumnRegistry
)
);
} else {
sortingInputRowSignature = sourceRowSignature;
this.filter = null;
}
this.sortProject = computeSortProject(partialQuery, plannerContext, sortingInputRowSignature);
// Only compute "selectProjection" if this is a non-aggregating query. (For aggregating queries, "grouping" will
// reflect select-project from partialQuery on its own.)
if (partialQuery.getSelectProject() != null && partialQuery.getAggregate() == null) {
this.selectProjection = Preconditions.checkNotNull(
computeSelectProjection(
partialQuery,
plannerContext,
computeOutputRowSignature(),
virtualColumnRegistry
)
);
} else {
this.selectProjection = null;
}
this.outputRowSignature = sortProject == null ? sortingInputRowSignature : sortProject.getOutputRowSignature();
if (partialQuery.getAggregate() != null) {
this.grouping = Preconditions.checkNotNull(
computeGrouping(
partialQuery,
plannerContext,
computeOutputRowSignature(),
virtualColumnRegistry,
rexBuilder,
finalizeAggregations
)
);
} else {
this.grouping = null;
}
this.limitSpec = computeLimitSpec(partialQuery, sortingInputRowSignature);
if (partialQuery.getSort() != null) {
this.sorting = Preconditions.checkNotNull(
computeSorting(
partialQuery,
plannerContext,
computeOutputRowSignature(),
// When sorting follows grouping, virtual columns cannot be used
partialQuery.getAggregate() != null ? null : virtualColumnRegistry
)
);
} else {
this.sorting = null;
}
this.outputRowSignature = computeOutputRowSignature();
this.query = computeQuery();
}
@Nullable
@Nonnull
private static DimFilter computeWhereFilter(
final PartialDruidQuery partialQuery,
final PlannerContext plannerContext,
final DruidQuerySignature querySignature
final RowSignature rowSignature,
final VirtualColumnRegistry virtualColumnRegistry
)
{
final Filter whereFilter = partialQuery.getWhereFilter();
if (whereFilter == null) {
return null;
}
return getDimFilter(plannerContext, querySignature, whereFilter);
return getDimFilter(plannerContext, rowSignature, virtualColumnRegistry, partialQuery.getWhereFilter());
}
@Nullable
private static DimFilter computeHavingFilter(
final PartialDruidQuery partialQuery,
final PlannerContext plannerContext,
final DruidQuerySignature querySignature
final RowSignature aggregateSignature
)
{
final Filter havingFilter = partialQuery.getHavingFilter();
@ -196,20 +219,23 @@ public class DruidQuery
return null;
}
return getDimFilter(plannerContext, querySignature, havingFilter);
// null virtualColumnRegistry, since virtual columns cannot be referenced by "having" filters.
return getDimFilter(plannerContext, aggregateSignature, null, havingFilter);
}
@Nonnull
private static DimFilter getDimFilter(
final PlannerContext plannerContext,
final DruidQuerySignature querySignature,
Filter filter
final RowSignature rowSignature,
@Nullable final VirtualColumnRegistry virtualColumnRegistry,
final Filter filter
)
{
final RexNode condition = filter.getCondition();
final DimFilter dimFilter = Expressions.toFilter(
plannerContext,
querySignature,
rowSignature,
virtualColumnRegistry,
condition
);
if (dimFilter == null) {
@ -219,83 +245,48 @@ public class DruidQuery
}
}
@Nullable
private static SelectProjection computeSelectProjection(
@Nonnull
private static Projection computeSelectProjection(
final PartialDruidQuery partialQuery,
final PlannerContext plannerContext,
final DruidQuerySignature queryColumns
final RowSignature rowSignature,
final VirtualColumnRegistry virtualColumnRegistry
)
{
final Project project = partialQuery.getSelectProject();
final Project project = Preconditions.checkNotNull(partialQuery.getSelectProject(), "selectProject");
if (project == null || partialQuery.getAggregate() != null) {
return null;
if (partialQuery.getAggregate() != null) {
throw new ISE("Cannot have both 'selectProject' and 'aggregate', how can this be?");
} else {
return Projection.preAggregation(project, plannerContext, rowSignature, virtualColumnRegistry);
}
final List<DruidExpression> expressions = new ArrayList<>();
for (final RexNode rexNode : project.getChildExps()) {
final DruidExpression expression = Expressions.toDruidExpression(
plannerContext,
queryColumns.getRowSignature(),
rexNode
);
if (expression == null) {
throw new CannotBuildQueryException(project, rexNode);
} else {
expressions.add(expression);
}
}
final List<String> directColumns = new ArrayList<>();
final Set<VirtualColumn> virtualColumns = new HashSet<>();
final List<String> rowOrder = new ArrayList<>();
for (int i = 0; i < expressions.size(); i++) {
final DruidExpression expression = expressions.get(i);
if (expression.isDirectColumnAccess()) {
directColumns.add(expression.getDirectColumn());
rowOrder.add(expression.getDirectColumn());
} else {
VirtualColumn virtualColumn = queryColumns.getOrCreateVirtualColumnForExpression(
plannerContext,
expression,
project.getChildExps().get(i).getType().getSqlTypeName()
);
virtualColumns.add(virtualColumn);
rowOrder.add(virtualColumn.getOutputName());
}
}
return new SelectProjection(
directColumns,
ImmutableList.copyOf(virtualColumns),
RowSignature.from(rowOrder, project.getRowType())
);
}
@Nullable
@Nonnull
private static Grouping computeGrouping(
final PartialDruidQuery partialQuery,
final PlannerContext plannerContext,
final DruidQuerySignature queryColumns,
final RowSignature rowSignature,
final VirtualColumnRegistry virtualColumnRegistry,
final RexBuilder rexBuilder,
final boolean finalizeAggregations
)
{
final Aggregate aggregate = partialQuery.getAggregate();
final Aggregate aggregate = Preconditions.checkNotNull(partialQuery.getAggregate(), "aggregate");
final Project aggregateProject = partialQuery.getAggregateProject();
if (aggregate == null) {
return null;
}
final List<DimensionExpression> dimensions = computeDimensions(
partialQuery,
plannerContext,
rowSignature,
virtualColumnRegistry
);
final List<DimensionExpression> dimensions = computeDimensions(partialQuery, plannerContext, queryColumns);
final List<Aggregation> aggregations = computeAggregations(
partialQuery,
plannerContext,
queryColumns,
rowSignature,
virtualColumnRegistry,
rexBuilder,
finalizeAggregations
);
@ -310,23 +301,23 @@ public class DruidQuery
aggregate.getRowType()
);
DruidQuerySignature aggregateSignature = queryColumns.asAggregateSignature(aggregateRowSignature);
final DimFilter havingFilter = computeHavingFilter(
partialQuery,
plannerContext,
aggregateSignature
aggregateRowSignature
);
if (aggregateProject == null) {
return Grouping.create(dimensions, aggregations, havingFilter, aggregateRowSignature);
} else {
final ProjectRowOrderAndPostAggregations projectRowOrderAndPostAggregations = computePostAggregations(
final Projection postAggregationProjection = Projection.postAggregation(
aggregateProject,
plannerContext,
aggregateRowSignature,
aggregateProject,
"p"
);
projectRowOrderAndPostAggregations.postAggregations.forEach(
postAggregationProjection.getPostAggregators().forEach(
postAggregator -> aggregations.add(Aggregation.create(postAggregator))
);
@ -342,106 +333,17 @@ public class DruidQuery
}
}
return Grouping.create(
dimensions,
aggregations,
havingFilter,
RowSignature.from(projectRowOrderAndPostAggregations.rowOrder, aggregateProject.getRowType())
);
return Grouping.create(dimensions, aggregations, havingFilter, postAggregationProjection.getOutputRowSignature());
}
}
@Nullable
private SortProject computeSortProject(
PartialDruidQuery partialQuery,
PlannerContext plannerContext,
RowSignature sortingInputRowSignature
)
{
final Project sortProject = partialQuery.getSortProject();
if (sortProject == null) {
return null;
} else {
final ProjectRowOrderAndPostAggregations projectRowOrderAndPostAggregations = computePostAggregations(
plannerContext,
sortingInputRowSignature,
sortProject,
"s"
);
return new SortProject(
sortingInputRowSignature,
projectRowOrderAndPostAggregations.postAggregations,
RowSignature.from(projectRowOrderAndPostAggregations.rowOrder, sortProject.getRowType())
);
}
}
private static class ProjectRowOrderAndPostAggregations
{
private final List<String> rowOrder;
private final List<PostAggregator> postAggregations;
ProjectRowOrderAndPostAggregations(List<String> rowOrder, List<PostAggregator> postAggregations)
{
this.rowOrder = rowOrder;
this.postAggregations = postAggregations;
}
}
private static ProjectRowOrderAndPostAggregations computePostAggregations(
PlannerContext plannerContext,
RowSignature inputRowSignature,
Project project,
String basePrefix
)
{
final List<String> rowOrder = new ArrayList<>();
final List<PostAggregator> aggregations = new ArrayList<>();
final String outputNamePrefix = Calcites.findUnusedPrefix(
basePrefix,
new TreeSet<>(inputRowSignature.getRowOrder())
);
int outputNameCounter = 0;
for (final RexNode postAggregatorRexNode : project.getChildExps()) {
// Attempt to convert to PostAggregator.
final DruidExpression postAggregatorExpression = Expressions.toDruidExpression(
plannerContext,
inputRowSignature,
postAggregatorRexNode
);
if (postAggregatorExpression == null) {
throw new CannotBuildQueryException(project, postAggregatorRexNode);
}
if (postAggregatorDirectColumnIsOk(inputRowSignature, postAggregatorExpression, postAggregatorRexNode)) {
// Direct column access, without any type cast as far as Druid's runtime is concerned.
// (There might be a SQL-level type cast that we don't care about)
rowOrder.add(postAggregatorExpression.getDirectColumn());
} else {
final String postAggregatorName = outputNamePrefix + outputNameCounter++;
final PostAggregator postAggregator = new ExpressionPostAggregator(
postAggregatorName,
postAggregatorExpression.getExpression(),
null,
plannerContext.getExprMacroTable()
);
aggregations.add(postAggregator);
rowOrder.add(postAggregator.getName());
}
}
return new ProjectRowOrderAndPostAggregations(rowOrder, aggregations);
}
/**
* Returns dimensions corresponding to {@code aggregate.getGroupSet()}, in the same order.
*
* @param partialQuery partial query
* @param plannerContext planner context
* @param querySignature source row signature and re-usable virtual column references
* @param partialQuery partial query
* @param plannerContext planner context
* @param rowSignature source row signature
* @param virtualColumnRegistry re-usable virtual column references
*
* @return dimensions
*
@ -450,29 +352,23 @@ public class DruidQuery
private static List<DimensionExpression> computeDimensions(
final PartialDruidQuery partialQuery,
final PlannerContext plannerContext,
final DruidQuerySignature querySignature
final RowSignature rowSignature,
final VirtualColumnRegistry virtualColumnRegistry
)
{
final Aggregate aggregate = Preconditions.checkNotNull(partialQuery.getAggregate());
final List<DimensionExpression> dimensions = new ArrayList<>();
final String outputNamePrefix = Calcites.findUnusedPrefix(
"d",
new TreeSet<>(querySignature.getRowSignature().getRowOrder())
);
final String outputNamePrefix = Calcites.findUnusedPrefix("d", new TreeSet<>(rowSignature.getRowOrder()));
int outputNameCounter = 0;
for (int i : aggregate.getGroupSet()) {
// Dimension might need to create virtual columns. Avoid giving it a name that would lead to colliding columns.
final RexNode rexNode = Expressions.fromFieldAccess(
querySignature.getRowSignature(),
rowSignature,
partialQuery.getSelectProject(),
i
);
final DruidExpression druidExpression = Expressions.toDruidExpression(
plannerContext,
querySignature.getRowSignature(),
rexNode
);
final DruidExpression druidExpression = Expressions.toDruidExpression(plannerContext, rowSignature, rexNode);
if (druidExpression == null) {
throw new CannotBuildQueryException(aggregate, rexNode);
}
@ -488,7 +384,7 @@ public class DruidQuery
final String dimOutputName;
if (!druidExpression.isSimpleExtraction()) {
virtualColumn = querySignature.getOrCreateVirtualColumnForExpression(
virtualColumn = virtualColumnRegistry.getOrCreateVirtualColumnForExpression(
plannerContext,
druidExpression,
sqlTypeName
@ -507,13 +403,14 @@ public class DruidQuery
/**
* Returns aggregations corresponding to {@code aggregate.getAggCallList()}, in the same order.
*
* @param partialQuery partial query
* @param plannerContext planner context
* @param querySignature source row signature and re-usable virtual column references
* @param rexBuilder calcite RexBuilder
* @param finalizeAggregations true if this query should include explicit finalization for all of its
* aggregators, where required. Useful for subqueries where Druid's native query layer
* does not do this automatically.
* @param partialQuery partial query
* @param plannerContext planner context
* @param rowSignature source row signature
* @param virtualColumnRegistry re-usable virtual column references
* @param rexBuilder calcite RexBuilder
* @param finalizeAggregations true if this query should include explicit finalization for all of its
* aggregators, where required. Useful for subqueries where Druid's native query layer
* does not do this automatically.
*
* @return aggregations
*
@ -522,24 +419,23 @@ public class DruidQuery
private static List<Aggregation> computeAggregations(
final PartialDruidQuery partialQuery,
final PlannerContext plannerContext,
final DruidQuerySignature querySignature,
final RowSignature rowSignature,
final VirtualColumnRegistry virtualColumnRegistry,
final RexBuilder rexBuilder,
final boolean finalizeAggregations
)
{
final Aggregate aggregate = Preconditions.checkNotNull(partialQuery.getAggregate());
final List<Aggregation> aggregations = new ArrayList<>();
final String outputNamePrefix = Calcites.findUnusedPrefix(
"a",
new TreeSet<>(querySignature.getRowSignature().getRowOrder())
);
final String outputNamePrefix = Calcites.findUnusedPrefix("a", new TreeSet<>(rowSignature.getRowOrder()));
for (int i = 0; i < aggregate.getAggCallList().size(); i++) {
final String aggName = outputNamePrefix + i;
final AggregateCall aggCall = aggregate.getAggCallList().get(i);
final Aggregation aggregation = GroupByRules.translateAggregateCall(
plannerContext,
querySignature,
rowSignature,
virtualColumnRegistry,
rexBuilder,
partialQuery.getSelectProject(),
aggregations,
@ -558,29 +454,23 @@ public class DruidQuery
return aggregations;
}
@Nullable
private static DefaultLimitSpec computeLimitSpec(
@Nonnull
private static Sorting computeSorting(
final PartialDruidQuery partialQuery,
final RowSignature outputRowSignature
final PlannerContext plannerContext,
final RowSignature rowSignature,
@Nullable final VirtualColumnRegistry virtualColumnRegistry
)
{
final Sort sort;
final Sort sort = Preconditions.checkNotNull(partialQuery.getSort(), "sort");
final Project sortProject = partialQuery.getSortProject();
if (partialQuery.getAggregate() == null) {
sort = partialQuery.getSelectSort();
} else {
sort = partialQuery.getSort();
}
if (sort == null) {
return null;
}
final Integer limit = sort.fetch != null ? RexLiteral.intValue(sort.fetch) : null;
// Extract limit.
final Long limit = sort.fetch != null ? ((Number) RexLiteral.value(sort.fetch)).longValue() : null;
final List<OrderByColumnSpec> orderBys = new ArrayList<>(sort.getChildExps().size());
if (sort.offset != null) {
// LimitSpecs don't accept offsets.
// Druid cannot currently handle LIMIT with OFFSET.
throw new CannotBuildQueryException(sort);
}
@ -610,7 +500,7 @@ public class DruidQuery
if (sortExpression.isA(SqlKind.INPUT_REF)) {
final RexInputRef ref = (RexInputRef) sortExpression;
final String fieldName = outputRowSignature.getRowOrder().get(ref.getIndex());
final String fieldName = rowSignature.getRowOrder().get(ref.getIndex());
orderBys.add(new OrderByColumnSpec(fieldName, direction, comparator));
} else {
// We don't support sorting by anything other than refs which actually appear in the query result.
@ -618,42 +508,25 @@ public class DruidQuery
}
}
return new DefaultLimitSpec(orderBys, limit);
}
// Extract any post-sort Projection.
final Projection projection;
/**
* Returns true if a post-aggregation "expression" can be realized as a direct field access. This is true if it's
* a direct column access that doesn't require an implicit cast.
*
* @param aggregateRowSignature signature of the aggregation
* @param expression post-aggregation expression
* @param rexNode RexNode for the post-aggregation expression
*
* @return yes or no
*/
private static boolean postAggregatorDirectColumnIsOk(
final RowSignature aggregateRowSignature,
final DruidExpression expression,
final RexNode rexNode
)
{
if (!expression.isDirectColumnAccess()) {
return false;
if (sortProject == null) {
projection = null;
} else if (partialQuery.getAggregate() == null) {
if (virtualColumnRegistry == null) {
throw new ISE("Must provide 'virtualColumnRegistry' for pre-aggregation Projection!");
}
projection = Projection.preAggregation(sortProject, plannerContext, rowSignature, virtualColumnRegistry);
} else {
projection = Projection.postAggregation(sortProject, plannerContext, rowSignature, "s");
}
// Check if a cast is necessary.
final ExprType toExprType = Expressions.exprTypeForValueType(
aggregateRowSignature.getColumnType(expression.getDirectColumn())
);
final ExprType fromExprType = Expressions.exprTypeForValueType(
Calcites.getValueTypeForSqlTypeName(rexNode.getType().getSqlTypeName())
);
return toExprType.equals(fromExprType);
return Sorting.create(orderBys, limit, projection);
}
public VirtualColumns getVirtualColumns(final boolean includeDimensions)
private VirtualColumns getVirtualColumns(final boolean includeDimensions)
{
// 'sourceRowSignature' could provide a list of all defined virtual columns while constructing a query, but we
// still want to collect the set of VirtualColumns this way to ensure we only add what is still being used after
@ -663,27 +536,33 @@ public class DruidQuery
// we always want to add any virtual columns used by the query level DimFilter
if (filter != null) {
for (String columnName : filter.getRequiredColumns()) {
if (sourceQuerySignature.isVirtualColumnDefined(columnName)) {
virtualColumns.add(sourceQuerySignature.getVirtualColumn(columnName));
if (virtualColumnRegistry.isVirtualColumnDefined(columnName)) {
virtualColumns.add(virtualColumnRegistry.getVirtualColumn(columnName));
}
}
}
if (selectProjection != null) {
virtualColumns.addAll(selectProjection.getVirtualColumns());
} else {
if (grouping != null) {
if (includeDimensions) {
for (DimensionExpression expression : grouping.getDimensions()) {
if (sourceQuerySignature.isVirtualColumnDefined(expression.getOutputName())) {
virtualColumns.add(sourceQuerySignature.getVirtualColumn(expression.getOutputName()));
}
}
if (grouping != null) {
if (includeDimensions) {
for (DimensionExpression expression : grouping.getDimensions()) {
if (virtualColumnRegistry.isVirtualColumnDefined(expression.getOutputName())) {
virtualColumns.add(virtualColumnRegistry.getVirtualColumn(expression.getOutputName()));
}
}
for (Aggregation aggregation : grouping.getAggregations()) {
virtualColumns.addAll(aggregation.getVirtualColumns());
}
}
for (Aggregation aggregation : grouping.getAggregations()) {
virtualColumns.addAll(aggregation.getVirtualColumns());
}
}
if (sorting != null && sorting.getProjection() != null && grouping == null) {
// Sorting without grouping means we might have some post-sort Projection virtual columns.
virtualColumns.addAll(sorting.getProjection().getVirtualColumns());
}
// sort for predictable output
@ -692,21 +571,12 @@ public class DruidQuery
return VirtualColumns.create(columns);
}
@Nullable
public Grouping getGrouping()
{
return grouping;
}
public DefaultLimitSpec getLimitSpec()
{
return limitSpec;
}
public SortProject getSortProject()
{
return sortProject;
}
public RelDataType getOutputRowType()
{
return outputRowType;
@ -722,6 +592,26 @@ public class DruidQuery
return query;
}
/**
* Return the {@link RowSignature} corresponding to the output of this query. This method may be called during
* construction, in which case it returns the output row signature at whatever phase of construction this method
* is called at. At the end of construction, the final result is assigned to {@link #outputRowSignature}.
*/
private RowSignature computeOutputRowSignature()
{
if (sorting != null && sorting.getProjection() != null) {
return sorting.getProjection().getOutputRowSignature();
} else if (grouping != null) {
// Sanity check: cannot have both "grouping" and "selectProjection".
Preconditions.checkState(selectProjection == null, "Cannot have both 'grouping' and 'selectProjection'");
return grouping.getOutputRowSignature();
} else if (selectProjection != null) {
return selectProjection.getOutputRowSignature();
} else {
return sourceRowSignature;
}
}
/**
* Return this query as some kind of Druid query. The returned query will either be {@link TopNQuery},
* {@link TimeseriesQuery}, {@link GroupByQuery}, {@link ScanQuery}, or {@link SelectQuery}.
@ -794,26 +684,24 @@ public class DruidQuery
// Timeseries only applies if the single dimension is granular __time.
return null;
}
if (limitSpec != null) {
// If there is a limit spec, set timeseriesLimit to given value if less than Integer.Max_VALUE
if (limitSpec.isLimited()) {
timeseriesLimit = limitSpec.getLimit();
if (sorting != null) {
// If there is sorting, set timeseriesLimit to given value if less than Integer.Max_VALUE
if (sorting.isLimited()) {
timeseriesLimit = Ints.checkedCast(sorting.getLimit());
}
if (limitSpec.getColumns().isEmpty()) {
descending = false;
} else {
// We're ok if the first order by is time (since every time value is distinct, the rest of the columns
// wouldn't matter anyway).
final OrderByColumnSpec firstOrderBy = limitSpec.getColumns().get(0);
if (firstOrderBy.getDimension().equals(dimensionExpression.getOutputName())) {
// Order by time.
descending = firstOrderBy.getDirection() == OrderByColumnSpec.Direction.DESCENDING;
} else {
// Order by something else.
switch (sorting.getSortKind(dimensionExpression.getOutputName())) {
case UNORDERED:
case TIME_ASCENDING:
descending = false;
break;
case TIME_DESCENDING:
descending = true;
break;
default:
// Sorting on a metric, maybe. Timeseries cannot handle.
return null;
}
}
} else {
// No sorting.
@ -824,11 +712,11 @@ public class DruidQuery
return null;
}
final Filtration filtration = Filtration.create(filter).optimize(sourceQuerySignature);
final Filtration filtration = Filtration.create(filter).optimize(virtualColumnRegistry.getFullRowSignature());
final List<PostAggregator> postAggregators = new ArrayList<>(grouping.getPostAggregators());
if (sortProject != null) {
postAggregators.addAll(sortProject.getPostAggregators());
if (sorting != null && sorting.getProjection() != null) {
postAggregators.addAll(sorting.getProjection().getPostAggregators());
}
final Map<String, Object> theContext = new HashMap<>();
theContext.put("skipEmptyBuckets", true);
@ -859,9 +747,10 @@ public class DruidQuery
// Must have GROUP BY one column, ORDER BY zero or one column, limit less than maxTopNLimit, and no HAVING.
final boolean topNOk = grouping != null
&& grouping.getDimensions().size() == 1
&& limitSpec != null
&& (limitSpec.getColumns().size() <= 1
&& limitSpec.getLimit() <= plannerContext.getPlannerConfig().getMaxTopNLimit())
&& sorting != null
&& (sorting.getOrderBys().size() <= 1
&& sorting.isLimited() && sorting.getLimit() <= plannerContext.getPlannerConfig()
.getMaxTopNLimit())
&& grouping.getHavingFilter() == null;
if (!topNOk) {
@ -870,14 +759,14 @@ public class DruidQuery
final DimensionSpec dimensionSpec = Iterables.getOnlyElement(grouping.getDimensions()).toDimensionSpec();
final OrderByColumnSpec limitColumn;
if (limitSpec.getColumns().isEmpty()) {
if (sorting.getOrderBys().isEmpty()) {
limitColumn = new OrderByColumnSpec(
dimensionSpec.getOutputName(),
OrderByColumnSpec.Direction.ASCENDING,
Calcites.getStringComparatorForValueType(dimensionSpec.getOutputType())
);
} else {
limitColumn = Iterables.getOnlyElement(limitSpec.getColumns());
limitColumn = Iterables.getOnlyElement(sorting.getOrderBys());
}
final TopNMetricSpec topNMetricSpec;
@ -900,11 +789,11 @@ public class DruidQuery
return null;
}
final Filtration filtration = Filtration.create(filter).optimize(sourceQuerySignature);
final Filtration filtration = Filtration.create(filter).optimize(virtualColumnRegistry.getFullRowSignature());
final List<PostAggregator> postAggregators = new ArrayList<>(grouping.getPostAggregators());
if (sortProject != null) {
postAggregators.addAll(sortProject.getPostAggregators());
if (sorting.getProjection() != null) {
postAggregators.addAll(sorting.getProjection().getPostAggregators());
}
return new TopNQuery(
@ -912,7 +801,7 @@ public class DruidQuery
getVirtualColumns(true),
dimensionSpec,
topNMetricSpec,
limitSpec.getLimit(),
Ints.checkedCast(sorting.getLimit()),
filtration.getQuerySegmentSpec(),
filtration.getDimFilter(),
Granularities.ALL,
@ -934,20 +823,22 @@ public class DruidQuery
return null;
}
final Filtration filtration = Filtration.create(filter).optimize(sourceQuerySignature);
final Filtration filtration = Filtration.create(filter).optimize(virtualColumnRegistry.getFullRowSignature());
final DimFilterHavingSpec havingSpec;
if (grouping.getHavingFilter() != null) {
havingSpec = new DimFilterHavingSpec(
Filtration.create(grouping.getHavingFilter()).optimizeFilterOnly(sourceQuerySignature).getDimFilter(),
Filtration.create(grouping.getHavingFilter())
.optimizeFilterOnly(grouping.getOutputRowSignature())
.getDimFilter(),
true
);
} else {
havingSpec = null;
}
final List<PostAggregator> postAggregators = new ArrayList<>(grouping.getPostAggregators());
if (sortProject != null) {
postAggregators.addAll(sortProject.getPostAggregators());
if (sorting != null && sorting.getProjection() != null) {
postAggregators.addAll(sorting.getProjection().getPostAggregators());
}
return new GroupByQuery(
@ -960,7 +851,9 @@ public class DruidQuery
grouping.getAggregatorFactories(),
postAggregators,
havingSpec,
limitSpec,
sorting != null
? new DefaultLimitSpec(sorting.getOrderBys(), sorting.isLimited() ? Ints.checkedCast(sorting.getLimit()) : null)
: NoopLimitSpec.instance(),
null,
ImmutableSortedMap.copyOf(plannerContext.getQueryContext())
);
@ -978,34 +871,44 @@ public class DruidQuery
// Scan cannot GROUP BY.
return null;
}
if (limitSpec != null &&
(limitSpec.getColumns().size() > 1
|| (limitSpec.getColumns().size() == 1 && !Iterables.getOnlyElement(limitSpec.getColumns())
.getDimension()
.equals(ColumnHolder.TIME_COLUMN_NAME)))) {
// Scan cannot ORDER BY non-time columns.
return null;
}
if (outputRowSignature.getRowOrder().isEmpty()) {
// Should never do a scan query without any columns that we're interested in. This is probably a planner bug.
throw new ISE("WTF?! Attempting to convert to Scan query without any columns?");
}
final Filtration filtration = Filtration.create(filter).optimize(sourceQuerySignature);
final Filtration filtration = Filtration.create(filter).optimize(virtualColumnRegistry.getFullRowSignature());
final ScanQuery.Order order;
long scanLimit = 0L;
// DefaultLimitSpec (which we use to "remember" limits) is int typed, and Integer.MAX_VALUE means "no limit".
final long scanLimit = limitSpec == null || limitSpec.getLimit() == Integer.MAX_VALUE
? 0L
: (long) limitSpec.getLimit();
if (sorting != null) {
if (sorting.isLimited()) {
scanLimit = sorting.getLimit();
}
ScanQuery.Order order;
if (limitSpec == null || limitSpec.getColumns().size() == 0) {
order = ScanQuery.Order.NONE;
} else if (limitSpec.getColumns().get(0).getDirection() == OrderByColumnSpec.Direction.ASCENDING) {
order = ScanQuery.Order.ASCENDING;
final Sorting.SortKind sortKind = sorting.getSortKind(ColumnHolder.TIME_COLUMN_NAME);
if (sortKind == Sorting.SortKind.UNORDERED) {
order = ScanQuery.Order.NONE;
} else if (sortKind == Sorting.SortKind.TIME_ASCENDING) {
order = ScanQuery.Order.ASCENDING;
} else if (sortKind == Sorting.SortKind.TIME_DESCENDING) {
order = ScanQuery.Order.DESCENDING;
} else {
assert sortKind == Sorting.SortKind.NON_TIME;
// Scan cannot ORDER BY non-time columns.
return null;
}
} else {
order = ScanQuery.Order.DESCENDING;
order = ScanQuery.Order.NONE;
}
// Compute the list of columns to select.
final Set<String> columns = new HashSet<>(outputRowSignature.getRowOrder());
if (order != ScanQuery.Order.NONE) {
columns.add(ColumnHolder.TIME_COLUMN_NAME);
}
return new ScanQuery(
@ -1015,9 +918,9 @@ public class DruidQuery
ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST,
0,
scanLimit,
order, // Will default to "none"
order,
filtration.getDimFilter(),
Ordering.natural().sortedCopy(ImmutableSet.copyOf(outputRowSignature.getRowOrder())),
Ordering.natural().sortedCopy(columns),
false,
ImmutableSortedMap.copyOf(plannerContext.getQueryContext())
);

View File

@ -344,8 +344,7 @@ public class DruidSemiJoin extends DruidRel<DruidSemiJoin>
PartialDruidQuery newPartialQuery = PartialDruidQuery.create(leftPartialQuery.getScan())
.withWhereFilter(newWhereFilter)
.withSelectProject(leftPartialQuery.getSelectProject())
.withSelectSort(leftPartialQuery.getSelectSort());
.withSelectProject(leftPartialQuery.getSelectProject());
if (leftPartialQuery.getAggregate() != null) {
newPartialQuery = newPartialQuery.withAggregate(leftPartialQuery.getAggregate());

View File

@ -36,6 +36,15 @@ import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;
/**
* Represents Druid's concept of a "group by": dimensions, aggregations, post-aggregations, and 'having' filters. This
* is always something that can be handled by a groupBy query, and in some cases, it may be handleable by a timeseries
* or topN query type as well.
*
* This corresponds to a Calcite Aggregate + optional Filter + optional Project.
*
* It does not include sorting, limiting, or post-sorting projections: for this, see the {@link Sorting} class.
*/
public class Grouping
{
private final List<DimensionExpression> dimensions;

View File

@ -51,7 +51,6 @@ public class PartialDruidQuery
private final RelNode scan;
private final Filter whereFilter;
private final Project selectProject;
private final Sort selectSort;
private final Aggregate aggregate;
private final Filter havingFilter;
private final Project aggregateProject;
@ -60,13 +59,19 @@ public class PartialDruidQuery
public enum Stage
{
// SCAN must be present on all queries.
SCAN,
// WHERE_FILTER, SELECT_PROJECT may be present on any query.
WHERE_FILTER,
SELECT_PROJECT,
SELECT_SORT,
// AGGREGATE, HAVING_FILTER, AGGREGATE_PROJECT can only be present on aggregating queries.
AGGREGATE,
HAVING_FILTER,
AGGREGATE_PROJECT,
// SORT, SORT_PROJECT may be present on any query.
SORT,
SORT_PROJECT
}
@ -76,7 +81,6 @@ public class PartialDruidQuery
final RelNode scan,
final Filter whereFilter,
final Project selectProject,
final Sort selectSort,
final Aggregate aggregate,
final Project aggregateProject,
final Filter havingFilter,
@ -88,7 +92,6 @@ public class PartialDruidQuery
this.scan = Preconditions.checkNotNull(scan, "scan");
this.whereFilter = whereFilter;
this.selectProject = selectProject;
this.selectSort = selectSort;
this.aggregate = aggregate;
this.aggregateProject = aggregateProject;
this.havingFilter = havingFilter;
@ -102,7 +105,7 @@ public class PartialDruidQuery
scanRel.getCluster(),
scanRel.getTable().getRelOptSchema()
);
return new PartialDruidQuery(builderSupplier, scanRel, null, null, null, null, null, null, null, null);
return new PartialDruidQuery(builderSupplier, scanRel, null, null, null, null, null, null, null);
}
public RelNode getScan()
@ -120,11 +123,6 @@ public class PartialDruidQuery
return selectProject;
}
public Sort getSelectSort()
{
return selectSort;
}
public Aggregate getAggregate()
{
return aggregate;
@ -158,7 +156,6 @@ public class PartialDruidQuery
scan,
newWhereFilter,
selectProject,
selectSort,
aggregate,
aggregateProject,
havingFilter,
@ -200,24 +197,6 @@ public class PartialDruidQuery
scan,
whereFilter,
theProject,
selectSort,
aggregate,
aggregateProject,
havingFilter,
sort,
sortProject
);
}
public PartialDruidQuery withSelectSort(final Sort newSelectSort)
{
validateStage(Stage.SELECT_SORT);
return new PartialDruidQuery(
builderSupplier,
scan,
whereFilter,
selectProject,
newSelectSort,
aggregate,
aggregateProject,
havingFilter,
@ -234,7 +213,6 @@ public class PartialDruidQuery
scan,
whereFilter,
selectProject,
selectSort,
newAggregate,
aggregateProject,
havingFilter,
@ -251,7 +229,6 @@ public class PartialDruidQuery
scan,
whereFilter,
selectProject,
selectSort,
aggregate,
aggregateProject,
newHavingFilter,
@ -268,7 +245,6 @@ public class PartialDruidQuery
scan,
whereFilter,
selectProject,
selectSort,
aggregate,
newAggregateProject,
havingFilter,
@ -285,7 +261,6 @@ public class PartialDruidQuery
scan,
whereFilter,
selectProject,
selectSort,
aggregate,
aggregateProject,
havingFilter,
@ -302,7 +277,6 @@ public class PartialDruidQuery
scan,
whereFilter,
selectProject,
selectSort,
aggregate,
aggregateProject,
havingFilter,
@ -344,14 +318,11 @@ public class PartialDruidQuery
} else if (stage.compareTo(currentStage) <= 0) {
// Cannot go backwards.
return false;
} else if (stage.compareTo(Stage.AGGREGATE) > 0 && aggregate == null) {
} else if (stage.compareTo(Stage.AGGREGATE) > 0 && stage.compareTo(Stage.SORT) < 0 && aggregate == null) {
// Cannot do post-aggregation stages without an aggregation.
return false;
} else if (stage.compareTo(Stage.AGGREGATE) >= 0 && selectSort != null) {
// Cannot do any aggregations after a select + sort.
return false;
} else if (stage.compareTo(Stage.SORT) > 0 && sort == null) {
// Cannot add sort project without a sort
// Cannot do post-sort stages without a sort.
return false;
} else {
// Looks good.
@ -378,8 +349,6 @@ public class PartialDruidQuery
return Stage.HAVING_FILTER;
} else if (aggregate != null) {
return Stage.AGGREGATE;
} else if (selectSort != null) {
return Stage.SELECT_SORT;
} else if (selectProject != null) {
return Stage.SELECT_PROJECT;
} else if (whereFilter != null) {
@ -409,8 +378,6 @@ public class PartialDruidQuery
return havingFilter;
case AGGREGATE:
return aggregate;
case SELECT_SORT:
return selectSort;
case SELECT_PROJECT:
return selectProject;
case WHERE_FILTER:
@ -442,7 +409,6 @@ public class PartialDruidQuery
return Objects.equals(scan, that.scan) &&
Objects.equals(whereFilter, that.whereFilter) &&
Objects.equals(selectProject, that.selectProject) &&
Objects.equals(selectSort, that.selectSort) &&
Objects.equals(aggregate, that.aggregate) &&
Objects.equals(havingFilter, that.havingFilter) &&
Objects.equals(aggregateProject, that.aggregateProject) &&
@ -457,7 +423,6 @@ public class PartialDruidQuery
scan,
whereFilter,
selectProject,
selectSort,
aggregate,
havingFilter,
aggregateProject,
@ -473,7 +438,6 @@ public class PartialDruidQuery
"scan=" + scan +
", whereFilter=" + whereFilter +
", selectProject=" + selectProject +
", selectSort=" + selectSort +
", aggregate=" + aggregate +
", havingFilter=" + havingFilter +
", aggregateProject=" + aggregateProject +

View File

@ -0,0 +1,257 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.sql.calcite.rel;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import org.apache.calcite.rel.core.Project;
import org.apache.calcite.rex.RexNode;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.math.expr.ExprType;
import org.apache.druid.query.aggregation.PostAggregator;
import org.apache.druid.query.aggregation.post.ExpressionPostAggregator;
import org.apache.druid.segment.VirtualColumn;
import org.apache.druid.sql.calcite.expression.DruidExpression;
import org.apache.druid.sql.calcite.expression.Expressions;
import org.apache.druid.sql.calcite.planner.Calcites;
import org.apache.druid.sql.calcite.planner.PlannerContext;
import org.apache.druid.sql.calcite.table.RowSignature;
import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.TreeSet;
/**
* Used to represent projections (Calcite "Project"). These are embedded in {@link Sorting} and {@link Grouping} to
* store post-sorting and post-grouping projections, as well as directly in {@link DruidQuery} to store potential
* post-selection projections. They may be built using either virtual columns (pre-aggregation) or post-aggregators.
*
* It is expected that callers will create and use Projection instances in the same context (pre- or post-aggregation).
* If this isn't done properly (i.e. a caller creates a pre-aggregation Projection but then calls
* {@link #getPostAggregators()} then an exception will be thrown.
*/
public class Projection
{
@Nullable
private final List<PostAggregator> postAggregators;
@Nullable
private final List<VirtualColumn> virtualColumns;
private final RowSignature outputRowSignature;
private Projection(
@Nullable final List<PostAggregator> postAggregators,
@Nullable final List<VirtualColumn> virtualColumns,
final RowSignature outputRowSignature
)
{
if (postAggregators == null && virtualColumns == null) {
throw new IAE("postAggregators and virtualColumns cannot both be null");
} else if (postAggregators != null && virtualColumns != null) {
throw new IAE("postAggregators and virtualColumns cannot both be nonnull");
}
this.postAggregators = postAggregators;
this.virtualColumns = virtualColumns;
this.outputRowSignature = outputRowSignature;
}
public static Projection postAggregation(
final Project project,
final PlannerContext plannerContext,
final RowSignature inputRowSignature,
final String basePrefix
)
{
final List<String> rowOrder = new ArrayList<>();
final List<PostAggregator> postAggregators = new ArrayList<>();
final String outputNamePrefix = Calcites.findUnusedPrefix(
basePrefix,
new TreeSet<>(inputRowSignature.getRowOrder())
);
int outputNameCounter = 0;
for (final RexNode postAggregatorRexNode : project.getChildExps()) {
// Attempt to convert to PostAggregator.
final DruidExpression postAggregatorExpression = Expressions.toDruidExpression(
plannerContext,
inputRowSignature,
postAggregatorRexNode
);
if (postAggregatorExpression == null) {
throw new CannotBuildQueryException(project, postAggregatorRexNode);
}
if (postAggregatorDirectColumnIsOk(inputRowSignature, postAggregatorExpression, postAggregatorRexNode)) {
// Direct column access, without any type cast as far as Druid's runtime is concerned.
// (There might be a SQL-level type cast that we don't care about)
rowOrder.add(postAggregatorExpression.getDirectColumn());
} else {
final String postAggregatorName = outputNamePrefix + outputNameCounter++;
final PostAggregator postAggregator = new ExpressionPostAggregator(
postAggregatorName,
postAggregatorExpression.getExpression(),
null,
plannerContext.getExprMacroTable()
);
postAggregators.add(postAggregator);
rowOrder.add(postAggregator.getName());
}
}
return new Projection(postAggregators, null, RowSignature.from(rowOrder, project.getRowType()));
}
public static Projection preAggregation(
final Project project,
final PlannerContext plannerContext,
final RowSignature inputRowSignature,
final VirtualColumnRegistry virtualColumnRegistry
)
{
final List<DruidExpression> expressions = new ArrayList<>();
for (final RexNode rexNode : project.getChildExps()) {
final DruidExpression expression = Expressions.toDruidExpression(
plannerContext,
inputRowSignature,
rexNode
);
if (expression == null) {
throw new CannotBuildQueryException(project, rexNode);
} else {
expressions.add(expression);
}
}
final Set<VirtualColumn> virtualColumns = new HashSet<>();
final List<String> rowOrder = new ArrayList<>();
for (int i = 0; i < expressions.size(); i++) {
final DruidExpression expression = expressions.get(i);
if (expression.isDirectColumnAccess()) {
rowOrder.add(expression.getDirectColumn());
} else {
final VirtualColumn virtualColumn = virtualColumnRegistry.getOrCreateVirtualColumnForExpression(
plannerContext,
expression,
project.getChildExps().get(i).getType().getSqlTypeName()
);
virtualColumns.add(virtualColumn);
rowOrder.add(virtualColumn.getOutputName());
}
}
return new Projection(
null,
ImmutableList.copyOf(virtualColumns),
RowSignature.from(rowOrder, project.getRowType())
);
}
/**
* Returns true if a post-aggregation "expression" can be realized as a direct field access. This is true if it's
* a direct column access that doesn't require an implicit cast.
*
* @param aggregateRowSignature signature of the aggregation
* @param expression post-aggregation expression
* @param rexNode RexNode for the post-aggregation expression
*
* @return yes or no
*/
private static boolean postAggregatorDirectColumnIsOk(
final RowSignature aggregateRowSignature,
final DruidExpression expression,
final RexNode rexNode
)
{
if (!expression.isDirectColumnAccess()) {
return false;
}
// Check if a cast is necessary.
final ExprType toExprType = Expressions.exprTypeForValueType(
aggregateRowSignature.getColumnType(expression.getDirectColumn())
);
final ExprType fromExprType = Expressions.exprTypeForValueType(
Calcites.getValueTypeForSqlTypeName(rexNode.getType().getSqlTypeName())
);
return toExprType.equals(fromExprType);
}
public List<PostAggregator> getPostAggregators()
{
// If you ever see this error, it probably means a Projection was created in pre-aggregation mode, but then
// used in a post-aggregation context. This is likely a bug somewhere in DruidQuery. See class-level Javadocs.
return Preconditions.checkNotNull(postAggregators, "postAggregators");
}
public List<VirtualColumn> getVirtualColumns()
{
// If you ever see this error, it probably means a Projection was created in post-aggregation mode, but then
// used in a pre-aggregation context. This is likely a bug somewhere in DruidQuery. See class-level Javadocs.
return Preconditions.checkNotNull(virtualColumns, "virtualColumns");
}
public RowSignature getOutputRowSignature()
{
return outputRowSignature;
}
@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
Projection that = (Projection) o;
return Objects.equals(postAggregators, that.postAggregators) &&
Objects.equals(virtualColumns, that.virtualColumns) &&
Objects.equals(outputRowSignature, that.outputRowSignature);
}
@Override
public int hashCode()
{
return Objects.hash(postAggregators, virtualColumns, outputRowSignature);
}
@Override
public String toString()
{
return "PostSortingExpressions{" +
"postAggregators=" + postAggregators +
", virtualColumns=" + virtualColumns +
", outputRowSignature=" + outputRowSignature +
'}';
}
}

View File

@ -1,90 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.sql.calcite.rel;
import org.apache.druid.segment.VirtualColumn;
import org.apache.druid.sql.calcite.table.RowSignature;
import java.util.List;
import java.util.Objects;
/**
 * Represents a non-aggregating projection: a mix of columns read directly from the datasource and virtual
 * (computed) columns, together with the row signature the projection produces.
 */
public class SelectProjection
{
  // Columns selected as-is from the underlying datasource.
  private final List<String> directColumns;

  // Computed columns backing the non-direct selections.
  private final List<VirtualColumn> virtualColumns;

  // Signature of the rows this projection emits.
  private final RowSignature outputRowSignature;

  public SelectProjection(
      final List<String> directColumns,
      final List<VirtualColumn> virtualColumns,
      final RowSignature outputRowSignature
  )
  {
    this.directColumns = directColumns;
    this.virtualColumns = virtualColumns;
    this.outputRowSignature = outputRowSignature;
  }

  public List<String> getDirectColumns()
  {
    return directColumns;
  }

  public List<VirtualColumn> getVirtualColumns()
  {
    return virtualColumns;
  }

  public RowSignature getOutputRowSignature()
  {
    return outputRowSignature;
  }

  @Override
  public boolean equals(final Object o)
  {
    if (this == o) {
      return true;
    }
    if (o == null || getClass() != o.getClass()) {
      return false;
    }
    final SelectProjection other = (SelectProjection) o;
    return Objects.equals(directColumns, other.directColumns)
           && Objects.equals(virtualColumns, other.virtualColumns)
           && Objects.equals(outputRowSignature, other.outputRowSignature);
  }

  @Override
  public int hashCode()
  {
    return Objects.hash(directColumns, virtualColumns, outputRowSignature);
  }

  @Override
  public String toString()
  {
    return "SelectProjection{" +
           "directColumns=" + directColumns +
           ", virtualColumns=" + virtualColumns +
           ", outputRowSignature=" + outputRowSignature +
           '}';
  }
}

View File

@ -1,109 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.sql.calcite.rel;
import com.google.common.base.Preconditions;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.query.aggregation.PostAggregator;
import org.apache.druid.sql.calcite.table.RowSignature;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;
/**
 * Represents a post-sorting projection: post-aggregators applied on top of a sorted row stream, plus the
 * resulting output signature. The constructor validates that post-aggregator names do not collide with input
 * columns and that every output field is resolvable.
 */
public class SortProject
{
  // Signature of the rows entering this projection.
  private final RowSignature inputRowSignature;

  // Post-aggregators computed on top of the input rows.
  private final List<PostAggregator> postAggregators;

  // Signature of the rows this projection emits.
  private final RowSignature outputRowSignature;

  SortProject(
      RowSignature inputRowSignature,
      List<PostAggregator> postAggregators,
      RowSignature outputRowSignature
  )
  {
    this.inputRowSignature = Preconditions.checkNotNull(inputRowSignature, "inputRowSignature");
    this.postAggregators = Preconditions.checkNotNull(postAggregators, "postAggregators");
    this.outputRowSignature = Preconditions.checkNotNull(outputRowSignature, "outputRowSignature");

    final Set<String> inputColumnNames = new HashSet<>(inputRowSignature.getRowOrder());
    final Set<String> postAggregatorNames = new HashSet<>();
    for (PostAggregator postAggregator : postAggregators) {
      postAggregatorNames.add(postAggregator.getName());
    }

    // Verify no collisions between inputs and outputs.
    for (String postAggregatorName : postAggregatorNames) {
      if (inputColumnNames.contains(postAggregatorName)) {
        throw new ISE("Duplicate field name: %s", postAggregatorName);
      }
    }

    // Verify that items in the output signature exist.
    for (String field : outputRowSignature.getRowOrder()) {
      if (!inputColumnNames.contains(field) && !postAggregatorNames.contains(field)) {
        throw new ISE("Missing field in rowOrder: %s", field);
      }
    }
  }

  public List<PostAggregator> getPostAggregators()
  {
    return postAggregators;
  }

  public RowSignature getOutputRowSignature()
  {
    return outputRowSignature;
  }

  @Override
  public boolean equals(Object o)
  {
    if (this == o) {
      return true;
    }
    if (o == null || getClass() != o.getClass()) {
      return false;
    }
    SortProject other = (SortProject) o;
    return Objects.equals(inputRowSignature, other.inputRowSignature)
           && Objects.equals(postAggregators, other.postAggregators)
           && Objects.equals(outputRowSignature, other.outputRowSignature);
  }

  @Override
  public int hashCode()
  {
    return Objects.hash(inputRowSignature, postAggregators, outputRowSignature);
  }

  @Override
  public String toString()
  {
    return "SortProject{" +
           "inputRowSignature=" + inputRowSignature +
           ", postAggregators=" + postAggregators +
           ", outputRowSignature=" + outputRowSignature +
           '}';
  }
}

View File

@ -0,0 +1,146 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.sql.calcite.rel;
import com.google.common.base.Preconditions;
import com.google.common.collect.Iterables;
import org.apache.druid.query.groupby.orderby.OrderByColumnSpec;
import javax.annotation.Nullable;
import java.util.List;
import java.util.Objects;
/**
* Represents Druid's concept of sorting and limiting, including post-sort projections. The sorting and limiting piece
* may map onto multiple Druid concepts: LimitSpec (for groupBy), TopNMetricSpec and threshold (for topN),
* "descending" (for timeseries), or ScanQuery.Order (for scan). The post-sort projections will map onto either
* post-aggregations (for query types that aggregate) or virtual columns (for query types that don't).
*
* This corresponds to a Calcite Sort + optional Project.
*/
public class Sorting
{
  /**
   * Classification of an order-by list relative to a designated time column. Used by DruidQuery to decide whether
   * a sort can be handled by timeseries ("descending" flag) or scan (ScanQuery.Order) query types.
   */
  enum SortKind
  {
    UNORDERED,
    TIME_ASCENDING,
    TIME_DESCENDING,
    NON_TIME
  }

  // Order-by columns, possibly empty.
  private final List<OrderByColumnSpec> orderBys;

  // Optional post-sort projection.
  @Nullable
  private final Projection projection;

  // Row limit; null means unlimited.
  @Nullable
  private final Long limit;

  private Sorting(
      final List<OrderByColumnSpec> orderBys,
      @Nullable final Long limit,
      @Nullable final Projection projection
  )
  {
    this.orderBys = Preconditions.checkNotNull(orderBys, "orderBys");
    this.limit = limit;
    this.projection = projection;
  }

  public static Sorting create(
      final List<OrderByColumnSpec> orderBys,
      @Nullable final Long limit,
      @Nullable final Projection projection
  )
  {
    return new Sorting(orderBys, limit, projection);
  }

  /**
   * Classify this sort relative to {@code timeColumn}: no order-bys is UNORDERED; a single order-by on the time
   * column is TIME_ASCENDING or TIME_DESCENDING depending on direction; anything else is NON_TIME.
   */
  public SortKind getSortKind(final String timeColumn)
  {
    if (orderBys.isEmpty()) {
      return SortKind.UNORDERED;
    }

    if (orderBys.size() == 1) {
      final OrderByColumnSpec soleOrderBy = orderBys.get(0);
      if (soleOrderBy.getDimension().equals(timeColumn)) {
        return soleOrderBy.getDirection() == OrderByColumnSpec.Direction.ASCENDING
               ? SortKind.TIME_ASCENDING
               : SortKind.TIME_DESCENDING;
      }
    }

    return SortKind.NON_TIME;
  }

  public List<OrderByColumnSpec> getOrderBys()
  {
    return orderBys;
  }

  @Nullable
  public Projection getProjection()
  {
    return projection;
  }

  public boolean isLimited()
  {
    return limit != null;
  }

  @Nullable
  public Long getLimit()
  {
    return limit;
  }

  @Override
  public boolean equals(Object o)
  {
    if (this == o) {
      return true;
    }
    if (o == null || getClass() != o.getClass()) {
      return false;
    }
    Sorting other = (Sorting) o;
    return Objects.equals(orderBys, other.orderBys)
           && Objects.equals(projection, other.projection)
           && Objects.equals(limit, other.limit);
  }

  @Override
  public int hashCode()
  {
    return Objects.hash(orderBys, projection, limit);
  }

  @Override
  public String toString()
  {
    return "Sorting{" +
           "orderBys=" + orderBys +
           ", projection=" + projection +
           ", limit=" + limit +
           '}';
  }
}

View File

@ -32,52 +32,38 @@ import java.util.Map;
import java.util.TreeSet;
/**
* Wraps a {@link RowSignature} and provides facilities to re-use {@link VirtualColumn} definitions for dimensions,
* filters, and filtered aggregators while constructing a {@link DruidQuery}
* Provides facilities to create and re-use {@link VirtualColumn} definitions for dimensions, filters, and filtered
* aggregators while constructing a {@link DruidQuery}.
*/
public class DruidQuerySignature
public class VirtualColumnRegistry
{
private final RowSignature rowSignature;
private final boolean isAggregateSignature;
private final RowSignature baseRowSignature;
private final Map<String, VirtualColumn> virtualColumnsByExpression;
private final Map<String, VirtualColumn> virtualColumnsByName;
private final String virtualColumnPrefix;
private int virtualColumnCounter;
public DruidQuerySignature(RowSignature rowSignature)
{
this.isAggregateSignature = false;
this.rowSignature = rowSignature;
this.virtualColumnPrefix = rowSignature == null ? "v" : Calcites.findUnusedPrefix(
"v",
new TreeSet<>(rowSignature.getRowOrder())
);
this.virtualColumnsByExpression = new HashMap<>();
this.virtualColumnsByName = new HashMap<>();
}
private DruidQuerySignature(
RowSignature rowSignature,
String prefix,
private VirtualColumnRegistry(
RowSignature baseRowSignature,
String virtualColumnPrefix,
Map<String, VirtualColumn> virtualColumnsByExpression,
Map<String, VirtualColumn> virtualColumnsByName,
boolean isAggregateSignature
Map<String, VirtualColumn> virtualColumnsByName
)
{
this.isAggregateSignature = isAggregateSignature;
this.rowSignature = rowSignature;
this.virtualColumnPrefix = prefix;
this.baseRowSignature = baseRowSignature;
this.virtualColumnPrefix = virtualColumnPrefix;
this.virtualColumnsByExpression = virtualColumnsByExpression;
this.virtualColumnsByName = virtualColumnsByName;
}
/**
* Get {@link RowSignature} of {@link DruidQuery} under construction
*/
public RowSignature getRowSignature()
public static VirtualColumnRegistry create(final RowSignature rowSignature)
{
return rowSignature;
return new VirtualColumnRegistry(
rowSignature,
Calcites.findUnusedPrefix("v", new TreeSet<>(rowSignature.getRowOrder())),
new HashMap<>(),
new HashMap<>()
);
}
/**
@ -88,19 +74,16 @@ public class DruidQuerySignature
return virtualColumnsByName.containsKey(virtualColumnName);
}
/**
* Get existing or create new (if not {@link DruidQuerySignature#isAggregateSignature}) {@link VirtualColumn} for a given
* {@link DruidExpression}
* Get existing or create new {@link VirtualColumn} for a given {@link DruidExpression}.
*/
@Nullable
public VirtualColumn getOrCreateVirtualColumnForExpression(
PlannerContext plannerContext,
DruidExpression expression,
SqlTypeName typeName
)
{
if (!isAggregateSignature && !virtualColumnsByExpression.containsKey(expression.getExpression())) {
if (!virtualColumnsByExpression.containsKey(expression.getExpression())) {
final String virtualColumnName = virtualColumnPrefix + virtualColumnCounter++;
final VirtualColumn virtualColumn = expression.toVirtualColumn(
virtualColumnName,
@ -126,23 +109,25 @@ public class DruidQuerySignature
@Nullable
public VirtualColumn getVirtualColumn(String virtualColumnName)
{
return virtualColumnsByName.getOrDefault(virtualColumnName, null);
return virtualColumnsByName.get(virtualColumnName);
}
/**
* Create as an "immutable" "aggregate" signature for a grouping, so that post aggregations and having filters
* can not define new virtual columns
* @param sourceSignature
* @return
* Get a signature representing the base signature plus all registered virtual columns.
*/
public DruidQuerySignature asAggregateSignature(RowSignature sourceSignature)
public RowSignature getFullRowSignature()
{
return new DruidQuerySignature(
sourceSignature,
virtualColumnPrefix,
virtualColumnsByExpression,
virtualColumnsByName,
true
);
final RowSignature.Builder builder = RowSignature.builder();
for (String columnName : baseRowSignature.getRowOrder()) {
builder.add(columnName, baseRowSignature.getColumnType(columnName));
}
for (VirtualColumn virtualColumn : virtualColumnsByName.values()) {
final String columnName = virtualColumn.getOutputName();
builder.add(columnName, virtualColumn.capabilities(columnName).getType());
}
return builder.build();
}
}

View File

@ -59,11 +59,6 @@ public class DruidRules
PartialDruidQuery.Stage.SELECT_PROJECT,
PartialDruidQuery::withSelectProject
),
new DruidQueryRule<>(
Sort.class,
PartialDruidQuery.Stage.SELECT_SORT,
PartialDruidQuery::withSelectSort
),
new DruidQueryRule<>(
Aggregate.class,
PartialDruidQuery.Stage.AGGREGATE,

View File

@ -31,7 +31,8 @@ import org.apache.druid.sql.calcite.aggregation.SqlAggregator;
import org.apache.druid.sql.calcite.expression.Expressions;
import org.apache.druid.sql.calcite.filtration.Filtration;
import org.apache.druid.sql.calcite.planner.PlannerContext;
import org.apache.druid.sql.calcite.rel.DruidQuerySignature;
import org.apache.druid.sql.calcite.rel.VirtualColumnRegistry;
import org.apache.druid.sql.calcite.table.RowSignature;
import java.util.ArrayList;
import java.util.List;
@ -52,7 +53,8 @@ public class GroupByRules
*/
public static Aggregation translateAggregateCall(
final PlannerContext plannerContext,
final DruidQuerySignature querySignature,
final RowSignature rowSignature,
final VirtualColumnRegistry virtualColumnRegistry,
final RexBuilder rexBuilder,
final Project project,
final List<Aggregation> existingAggregations,
@ -71,11 +73,18 @@ public class GroupByRules
}
final RexNode expression = project.getChildExps().get(call.filterArg);
final DimFilter nonOptimizedFilter = Expressions.toFilter(plannerContext, querySignature, expression);
final DimFilter nonOptimizedFilter = Expressions.toFilter(
plannerContext,
rowSignature,
virtualColumnRegistry,
expression
);
if (nonOptimizedFilter == null) {
return null;
} else {
filter = Filtration.create(nonOptimizedFilter).optimizeFilterOnly(querySignature).getDimFilter();
filter = Filtration.create(nonOptimizedFilter)
.optimizeFilterOnly(virtualColumnRegistry.getFullRowSignature())
.getDimFilter();
}
} else {
filter = null;
@ -121,9 +130,13 @@ public class GroupByRules
final Aggregation retVal = sqlAggregator.toDruidAggregation(
plannerContext,
querySignature,
rowSignature,
virtualColumnRegistry,
rexBuilder,
name, call, project, existingAggregationsWithSameFilter,
name,
call,
project,
existingAggregationsWithSameFilter,
finalizeAggregations
);
@ -134,7 +147,7 @@ public class GroupByRules
if (isUsingExistingAggregation(retVal, existingAggregationsWithSameFilter)) {
return retVal;
} else {
return retVal.filter(querySignature, filter);
return retVal.filter(rowSignature, virtualColumnRegistry, filter);
}
}
}

View File

@ -743,6 +743,106 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
);
}
@Test
public void testSelectStarFromSelectSingleColumnWithLimitDescending() throws Exception
{
// An outer LIMIT over an inner "ORDER BY __time DESC" select should plan as a single Scan
// query with descending time order and a limit of 2. Note that "__time" is included in the
// Scan column list even though only dim1 is projected, because the query orders by it.
testQuery(
"SELECT * FROM (SELECT dim1 FROM druid.foo ORDER BY __time DESC) LIMIT 2",
ImmutableList.of(
newScanQueryBuilder()
.dataSource(CalciteTests.DATASOURCE1)
.intervals(querySegmentSpec(Filtration.eternity()))
.columns(ImmutableList.of("__time", "dim1"))
.limit(2)
.order(ScanQuery.Order.DESCENDING)
.resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST)
.context(QUERY_CONTEXT_DEFAULT)
.build()
),
// The two most recent rows of druid.foo, by __time descending.
ImmutableList.of(
new Object[]{"abc"},
new Object[]{"def"}
)
);
}
@Test
public void testSelectProjectionFromSelectSingleColumnWithInnerLimitDescending() throws Exception
{
// A projection ('beep ' || dim1) over a sorted, limited select should still plan as a single
// Scan query: the projection becomes virtual column "v0" and the inner limit/order carry
// through. "__time" stays in the column list because the inner query orders by it.
testQuery(
"SELECT 'beep ' || dim1 FROM (SELECT dim1 FROM druid.foo ORDER BY __time DESC LIMIT 2)",
ImmutableList.of(
newScanQueryBuilder()
.dataSource(CalciteTests.DATASOURCE1)
.intervals(querySegmentSpec(Filtration.eternity()))
.virtualColumns(expressionVirtualColumn("v0", "concat('beep ',\"dim1\")", ValueType.STRING))
.columns(ImmutableList.of("__time", "v0"))
.limit(2)
.order(ScanQuery.Order.DESCENDING)
.resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST)
.context(QUERY_CONTEXT_DEFAULT)
.build()
),
ImmutableList.of(
new Object[]{"beep abc"},
new Object[]{"beep def"}
)
);
}
@Test
public void testSelectProjectionFromSelectSingleColumnDescending() throws Exception
{
// Regression test for https://github.com/apache/incubator-druid/issues/7768.
// A projection applied after a plain select + sort (no limit, no aggregation) — the
// select-sort-project shape that was previously rejected by the planner. Expected to plan
// as one Scan query with the projection as virtual column "v0" and descending time order.
testQuery(
"SELECT 'beep ' || dim1 FROM (SELECT dim1 FROM druid.foo ORDER BY __time DESC)",
ImmutableList.of(
newScanQueryBuilder()
.dataSource(CalciteTests.DATASOURCE1)
.intervals(querySegmentSpec(Filtration.eternity()))
.virtualColumns(expressionVirtualColumn("v0", "concat('beep ',\"dim1\")", ValueType.STRING))
.columns(ImmutableList.of("__time", "v0"))
.order(ScanQuery.Order.DESCENDING)
.resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST)
.context(QUERY_CONTEXT_DEFAULT)
.build()
),
// All rows of druid.foo, newest first.
ImmutableList.of(
new Object[]{"beep abc"},
new Object[]{"beep def"},
new Object[]{"beep 1"},
new Object[]{"beep 2"},
new Object[]{"beep 10.1"},
new Object[]{"beep "}
)
);
}
@Test
public void testSelectProjectionFromSelectSingleColumnWithInnerAndOuterLimitDescending() throws Exception
{
// Inner LIMIT 4 and outer LIMIT 2 should combine: the planned Scan query carries the
// tighter limit (2) along with the projection virtual column and descending time order.
testQuery(
"SELECT 'beep ' || dim1 FROM (SELECT dim1 FROM druid.foo ORDER BY __time DESC LIMIT 4) LIMIT 2",
ImmutableList.of(
newScanQueryBuilder()
.dataSource(CalciteTests.DATASOURCE1)
.intervals(querySegmentSpec(Filtration.eternity()))
.virtualColumns(expressionVirtualColumn("v0", "concat('beep ',\"dim1\")", ValueType.STRING))
.columns(ImmutableList.of("__time", "v0"))
.limit(2)
.order(ScanQuery.Order.DESCENDING)
.resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST)
.context(QUERY_CONTEXT_DEFAULT)
.build()
),
ImmutableList.of(
new Object[]{"beep abc"},
new Object[]{"beep def"}
)
);
}
@Test
public void testGroupBySingleColumnDescendingNoTopN() throws Exception
{
@ -1823,7 +1923,7 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
// It's also here so when we do support these features, we can have "real" tests for these queries.
final List<String> queries = ImmutableList.of(
"SELECT dim1 FROM druid.foo ORDER BY dim1", // SELECT query with order by
"SELECT dim1 FROM druid.foo ORDER BY dim1", // SELECT query with order by non-__time
"SELECT COUNT(*) FROM druid.foo x, druid.foo y", // Self-join
"SELECT DISTINCT dim2 FROM druid.foo ORDER BY dim2 LIMIT 2 OFFSET 5", // DISTINCT with OFFSET
"SELECT COUNT(*) FROM foo WHERE dim1 NOT IN (SELECT dim1 FROM foo WHERE dim2 = 'a')", // NOT IN subquery

View File

@ -25,7 +25,6 @@ import org.apache.druid.query.filter.IntervalDimFilter;
import org.apache.druid.query.filter.NotDimFilter;
import org.apache.druid.segment.column.ColumnHolder;
import org.apache.druid.segment.column.ValueType;
import org.apache.druid.sql.calcite.rel.DruidQuerySignature;
import org.apache.druid.sql.calcite.table.RowSignature;
import org.apache.druid.sql.calcite.util.CalciteTestBase;
import org.junit.Assert;
@ -45,7 +44,7 @@ public class FiltrationTest extends CalciteTestBase
)
),
null
).optimize(new DruidQuerySignature(RowSignature.builder().add(ColumnHolder.TIME_COLUMN_NAME, ValueType.LONG).build()));
).optimize(RowSignature.builder().add(ColumnHolder.TIME_COLUMN_NAME, ValueType.LONG).build());
Assert.assertEquals(
ImmutableList.of(Filtration.eternity()),