Window planning: use collation traits, improve subquery logic. (#13902)

* Window planning: use collation traits, improve subquery logic.

SQL changes:

1) Attach the RelCollation (sorting) trait to any PartialDruidQuery
   that ends in AGGREGATE or AGGREGATE_PROJECT. This allows planning to
   take advantage of the fact that Druid sorts by dimensions when
   doing aggregations. (See the first sketch below this list.)

2) Windowing: inspect the RelCollation trait from the input, and insert a
   naiveSort operator if, and only if, it is necessary. (Second sketch below.)

3) Windowing: add support for a Project after a Window, when the Project
   is a simple mapping. This helps eliminate subqueries. (Third sketch below.)

4) DruidRules: update the logic for considering subqueries to reflect that
   subqueries are not required to be GroupBys, and that a number of new
   Stages now exist. After all of that evolution, the old logic no longer
   made sense. (Fourth sketch below.)

Native changes:

1) Use merge sort (stable) rather than quicksort when sorting
   RowsAndColumns. This makes it easier to write test cases for plans that
   involve re-sorting the data. (See the sketch below.)
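
To illustrate the point about stability, a minimal demo using java.util's
stable TimSort as a stand-in for the fastutil swapper-based Arrays.mergeSort
used in the patch: rows with equal sort keys keep their input order, so
re-sorted results are deterministic run to run.

import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

class StableSortDemo
{
  public static void main(String[] args)
  {
    // Each row is {country, delta}; SV and MA tie on delta, as do TW and BD.
    final List<String[]> rows = new ArrayList<>(List.of(
        new String[]{"SV", "-1"},
        new String[]{"MA", "-1"},
        new String[]{"TW", "0"},
        new String[]{"BD", "0"}
    ));

    // List.sort is stable (TimSort): tied rows keep their input order, so the
    // output is identical on every run. An unstable quicksort may emit SV/MA
    // or TW/BD in either order, which made window-query test expectations flaky.
    rows.sort(Comparator.comparingInt(r -> Integer.parseInt(r[1])));
    rows.forEach(r -> System.out.println(r[0] + " " + r[1]));
  }
}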

* Changes from review.

* Mark the bad test as failing.

* Additional update.

* Fix failingTest.

* Fix tests.

* Mark a var final.
Gian Merlino 2023-03-09 15:48:13 -08:00 committed by GitHub
parent fe9d0c46d5
commit bf39b4d313
17 changed files with 1550 additions and 342 deletions

View File

@@ -291,7 +291,8 @@ public class ArrayListRowsAndColumns<RowType> implements AppendableRowsAndColumn
swappers.add(swapper);
}
Arrays.quickSort(
// Use stable sort, so peer rows retain original order.
Arrays.mergeSort(
0,
rows.size(),
(lhs, rhs) -> {

View File

@@ -97,7 +97,9 @@ public class DefaultNaiveSortMaker implements NaiveSortMaker
}
final int numColsToCompare = index;
Arrays.quickSort(
// Use stable sort, so peer rows retain original order.
Arrays.mergeSort(
0,
rac.numRows(),
(k1, k2) -> {

View File

@@ -122,7 +122,7 @@ public class DruidCorrelateUnnestRel extends DruidRel<DruidCorrelateUnnestRel>
{
return new DruidCorrelateUnnestRel(
getCluster(),
getTraitSet().plusAll(newQueryBuilder.getRelTraits()),
newQueryBuilder.getTraitSet(getConvention()),
correlateRel,
newQueryBuilder,
leftFilter,

View File

@@ -124,7 +124,7 @@ public class DruidJoinQueryRel extends DruidRel<DruidJoinQueryRel>
{
return new DruidJoinQueryRel(
getCluster(),
getTraitSet().plusAll(newQueryBuilder.getRelTraits()),
newQueryBuilder.getTraitSet(getConvention()),
joinRel,
leftFilter,
newQueryBuilder,
@@ -136,7 +136,7 @@ public class DruidJoinQueryRel extends DruidRel<DruidJoinQueryRel>
public DruidQuery toDruidQuery(final boolean finalizeAggregations)
{
final DruidRel<?> leftDruidRel = (DruidRel<?>) left;
final DruidQuery leftQuery = Preconditions.checkNotNull((leftDruidRel).toDruidQuery(false), "leftQuery");
final DruidQuery leftQuery = Preconditions.checkNotNull(leftDruidRel.toDruidQuery(false), "leftQuery");
final RowSignature leftSignature = leftQuery.getOutputRowSignature();
final DataSource leftDataSource;

View File

@@ -73,7 +73,7 @@ public class DruidOuterQueryRel extends DruidRel<DruidOuterQueryRel>
{
return new DruidOuterQueryRel(
sourceRel.getCluster(),
sourceRel.getTraitSet().plusAll(partialQuery.getRelTraits()),
partialQuery.getTraitSet(sourceRel.getConvention()),
sourceRel,
partialQuery,
sourceRel.getPlannerContext()
@@ -91,7 +91,7 @@ public class DruidOuterQueryRel extends DruidRel<DruidOuterQueryRel>
{
return new DruidOuterQueryRel(
getCluster(),
getTraitSet().plusAll(newQueryBuilder.getRelTraits()),
newQueryBuilder.getTraitSet(getConvention()),
sourceRel,
newQueryBuilder,
getPlannerContext()

View File

@@ -473,6 +473,7 @@ public class DruidQuery
* @param virtualColumnRegistry re-usable virtual column references
* @param typeFactory factory for SQL types
* @return dimensions
*
* @throws CannotBuildQueryException if dimensions cannot be computed
*/
private static List<DimensionExpression> computeDimensions(
@@ -582,7 +583,9 @@ public class DruidQuery
* @param finalizeAggregations true if this query should include explicit finalization for all of its
* aggregators, where required. Useful for subqueries where Druid's native query layer
* does not do this automatically.
*
* @return aggregations
*
* @throws CannotBuildQueryException if dimensions cannot be computed
*/
private static List<Aggregation> computeAggregations(
@@ -875,8 +878,6 @@ public class DruidQuery
* <p>
* Necessary because some combinations are unsafe, mainly because they would lead to the creation of too many
* time-granular buckets during query processing.
*
* @see Granularity#getIterable(Interval) the problematic method call we are trying to avoid
*/
private static boolean canUseQueryGranularity(
final DataSource dataSource,
@@ -953,11 +954,6 @@
*/
private Query<?> computeQuery()
{
if (windowing != null) {
// Windowing can only be handled by window queries.
return toWindowQuery();
}
if (dataSource instanceof QueryDataSource) {
// If there is a subquery, then we prefer the outer query to be a groupBy if possible, since this potentially
// enables more efficient execution. (The groupBy query toolchest can handle some subqueries by itself, without
@@ -969,6 +965,11 @@
}
}
final WindowOperatorQuery operatorQuery = toWindowQuery();
if (operatorQuery != null) {
return operatorQuery;
}
final TimeBoundaryQuery timeBoundaryQuery = toTimeBoundaryQuery();
if (timeBoundaryQuery != null) {
return timeBoundaryQuery;
@@ -1009,7 +1010,8 @@
|| grouping == null
|| grouping.getSubtotals().hasEffect(grouping.getDimensionSpecs())
|| grouping.getHavingFilter() != null
|| selectProjection != null) {
|| selectProjection != null
|| windowing != null) {
return null;
}
@@ -1073,7 +1075,8 @@
if (!plannerContext.engineHasFeature(EngineFeature.TIMESERIES_QUERY)
|| grouping == null
|| grouping.getSubtotals().hasEffect(grouping.getDimensionSpecs())
|| grouping.getHavingFilter() != null) {
|| grouping.getHavingFilter() != null
|| windowing != null) {
return null;
}
@@ -1193,7 +1196,7 @@
}
// Must have GROUP BY one column, no GROUPING SETS, ORDER BY 1 column, LIMIT > 0 and maxTopNLimit,
// no OFFSET, no HAVING.
// no OFFSET, no HAVING, no windowing.
final boolean topNOk = grouping != null
&& grouping.getDimensions().size() == 1
&& !grouping.getSubtotals().hasEffect(grouping.getDimensionSpecs())
@@ -1204,7 +1207,8 @@
&& sorting.getOffsetLimit().getLimit() <= plannerContext.getPlannerConfig()
.getMaxTopNLimit()
&& !sorting.getOffsetLimit().hasOffset())
&& grouping.getHavingFilter() == null;
&& grouping.getHavingFilter() == null
&& windowing == null;
if (!topNOk) {
return null;
@@ -1283,7 +1287,7 @@
@Nullable
private GroupByQuery toGroupByQuery()
{
if (grouping == null) {
if (grouping == null || windowing != null) {
return null;
}
@@ -1428,8 +1432,8 @@
@Nullable
private ScanQuery toScanQuery()
{
if (grouping != null) {
// Scan cannot GROUP BY.
if (grouping != null || windowing != null) {
// Scan cannot GROUP BY or do windows.
return null;
}
@@ -1483,16 +1487,16 @@
// Cannot handle this ordering.
// Scan cannot ORDER BY non-time columns.
plannerContext.setPlanningError(
"SQL query requires order by non-time column %s that is not supported.",
"SQL query requires order by non-time column %s, which is not supported.",
orderByColumns
);
return null;
}
if (!dataSource.isConcrete()) {
// Cannot handle this ordering.
// Scan cannot ORDER BY non-time columns.
// Scan cannot ORDER BY non-concrete datasources on _any_ column.
plannerContext.setPlanningError(
"SQL query is a scan and requires order by on a datasource[%s], which is not supported.",
"SQL query requires order by on non-concrete datasource [%s], which is not supported.",
dataSource
);
return null;

View File

@@ -37,7 +37,6 @@ import org.apache.druid.sql.calcite.planner.PlannerContext;
import org.apache.druid.sql.calcite.table.DruidTable;
import javax.annotation.Nullable;
import java.util.Set;
/**
@@ -170,7 +169,7 @@ public class DruidQueryRel extends DruidRel<DruidQueryRel>
{
return new DruidQueryRel(
getCluster(),
getTraitSet().plusAll(newQueryBuilder.getRelTraits()),
newQueryBuilder.getTraitSet(getConvention()),
table,
druidTable,
getPlannerContext(),

View File

@@ -107,7 +107,7 @@ public class DruidUnionDataSourceRel extends DruidRel<DruidUnionDataSourceRel>
{
return new DruidUnionDataSourceRel(
getCluster(),
getTraitSet().plusAll(newQueryBuilder.getRelTraits()),
newQueryBuilder.getTraitSet(getConvention()),
unionRel,
unionColumnNames,
newQueryBuilder,

View File

@@ -20,8 +20,15 @@
package org.apache.druid.sql.calcite.rel;
import com.google.common.base.Preconditions;
import it.unimi.dsi.fastutil.ints.Int2IntMap;
import it.unimi.dsi.fastutil.ints.Int2IntOpenHashMap;
import org.apache.calcite.plan.Convention;
import org.apache.calcite.plan.RelOptUtil;
import org.apache.calcite.plan.RelTrait;
import org.apache.calcite.plan.RelTraitSet;
import org.apache.calcite.rel.RelCollation;
import org.apache.calcite.rel.RelCollationTraitDef;
import org.apache.calcite.rel.RelCollations;
import org.apache.calcite.rel.RelFieldCollation;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.core.Aggregate;
import org.apache.calcite.rel.core.Filter;
@@ -31,6 +38,7 @@ import org.apache.calcite.rel.core.Sort;
import org.apache.calcite.rel.core.Window;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rex.RexBuilder;
import org.apache.calcite.rex.RexInputRef;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.rex.RexUtil;
import org.apache.calcite.sql.SqlKind;
@@ -41,6 +49,7 @@ import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.sql.calcite.planner.PlannerContext;
import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.function.Supplier;
@@ -61,6 +70,7 @@ public class PartialDruidQuery
private final Sort sort;
private final Project sortProject;
private final Window window;
private final Project windowProject;
public enum Stage
{
@@ -71,19 +81,53 @@
WHERE_FILTER,
SELECT_PROJECT,
// AGGREGATE, HAVING_FILTER, AGGREGATE_PROJECT can only be present on non-WINDOW aggregating queries.
// AGGREGATE, HAVING_FILTER, AGGREGATE_PROJECT can be present on non-WINDOW aggregating queries.
AGGREGATE,
HAVING_FILTER,
AGGREGATE_PROJECT,
HAVING_FILTER {
@Override
public boolean canFollow(Stage stage)
{
return stage == AGGREGATE;
}
},
AGGREGATE_PROJECT {
@Override
public boolean canFollow(Stage stage)
{
return stage == AGGREGATE || stage == HAVING_FILTER;
}
},
// SORT, SORT_PROJECT may be present on any query, except ones with WINDOW.
SORT,
SORT_PROJECT,
SORT_PROJECT {
@Override
public boolean canFollow(Stage stage)
{
return stage == SORT;
}
},
// WINDOW may be present only together with SCAN.
WINDOW,
// WINDOW, WINDOW_PROJECT may be present only together with SCAN.
WINDOW {
@Override
public boolean canFollow(Stage stage)
{
return stage == SCAN;
}
},
WINDOW_PROJECT {
@Override
public boolean canFollow(Stage stage)
{
return stage == WINDOW;
}
};
UNNEST_PROJECT
public boolean canFollow(final Stage stage)
{
return stage.compareTo(this) < 0;
}
}
private PartialDruidQuery(
@@ -97,6 +141,7 @@ public class PartialDruidQuery
final Sort sort,
final Project sortProject,
final Window window,
final Project windowProject,
final Project unnestProject
)
{
@@ -110,16 +155,28 @@
this.sort = sort;
this.sortProject = sortProject;
this.window = window;
this.windowProject = windowProject;
this.unnestProject = unnestProject;
}
public static PartialDruidQuery create(final RelNode scanRel)
public static PartialDruidQuery create(final RelNode inputRel)
{
final Supplier<RelBuilder> builderSupplier = () -> RelFactories.LOGICAL_BUILDER.create(
scanRel.getCluster(),
scanRel.getTable() != null ? scanRel.getTable().getRelOptSchema() : null
inputRel.getCluster(),
inputRel.getTable() != null ? inputRel.getTable().getRelOptSchema() : null
);
return new PartialDruidQuery(builderSupplier, inputRel, null, null, null, null, null, null, null, null, null, null);
}
public static PartialDruidQuery createOuterQuery(final PartialDruidQuery inputQuery)
{
final RelNode inputRel = inputQuery.leafRel();
return create(
inputRel.copy(
inputQuery.getTraitSet(inputRel.getConvention()),
inputRel.getInputs()
)
);
return new PartialDruidQuery(builderSupplier, scanRel, null, null, null, null, null, null, null, null, null);
}
public RelNode getScan()
@@ -172,6 +229,11 @@
return window;
}
public Project getWindowProject()
{
return windowProject;
}
public PartialDruidQuery withWhereFilter(final Filter newWhereFilter)
{
validateStage(Stage.WHERE_FILTER);
@@ -186,6 +248,7 @@
sort,
sortProject,
window,
windowProject,
unnestProject
);
}
@@ -230,6 +293,7 @@
sort,
sortProject,
window,
windowProject,
unnestProject
);
}
@@ -248,6 +312,7 @@
sort,
sortProject,
window,
windowProject,
unnestProject
);
}
@@ -266,6 +331,7 @@
sort,
sortProject,
window,
windowProject,
unnestProject
);
}
@@ -284,6 +350,7 @@
sort,
sortProject,
window,
windowProject,
unnestProject
);
}
@@ -302,6 +369,7 @@
newSort,
sortProject,
window,
windowProject,
unnestProject
);
}
@@ -320,6 +388,7 @@
sort,
newSortProject,
window,
windowProject,
unnestProject
);
}
@@ -338,6 +407,7 @@
sort,
sortProject,
newWindow,
windowProject,
unnestProject
);
}
@@ -355,18 +425,96 @@
sort,
sortProject,
window,
windowProject,
newUnnestProject
);
}
public PartialDruidQuery withWindowProject(final Project newWindowProject)
{
validateStage(Stage.WINDOW_PROJECT);
return new PartialDruidQuery(
builderSupplier,
scan,
whereFilter,
selectProject,
aggregate,
aggregateProject,
havingFilter,
sort,
sortProject,
window,
newWindowProject,
unnestProject
);
}
public RelDataType getRowType()
{
return leafRel().getRowType();
}
public RelTrait[] getRelTraits()
/**
* Get traits for this partial query.
*
* This is the traits from {@link #leafRel()}, plus {@link RelCollationTraitDef} if {@link #stage()} is
* {@link Stage#AGGREGATE} or {@link Stage#AGGREGATE_PROJECT} (to represent the fact that Druid sorts by grouping
* keys when grouping).
*
* @param convention convention to include in the returned array
*/
public RelTraitSet getTraitSet(final Convention convention)
{
return leafRel().getTraitSet().toArray(new RelTrait[0]);
final RelTraitSet leafRelTraits = leafRel().getTraitSet();
final Stage currentStage = stage();
switch (currentStage) {
case AGGREGATE:
case AGGREGATE_PROJECT:
final RelCollation collation = leafRelTraits.getTrait(RelCollationTraitDef.INSTANCE);
if ((collation == null || collation.getFieldCollations().isEmpty()) && aggregate.getGroupSets().size() == 1) {
// Druid sorts by grouping keys when grouping. Add the collation.
// Note: [aggregate.getGroupSets().size() == 1] above means that collation isn't added for GROUPING SETS.
final List<RelFieldCollation> sortFields = new ArrayList<>();
if (currentStage == Stage.AGGREGATE) {
for (int i = 0; i < aggregate.getGroupCount(); i++) {
sortFields.add(new RelFieldCollation(i));
}
} else {
// AGGREGATE_PROJECT
final List<RexNode> projectExprs = aggregateProject.getProjects();
// Build a map of all Project exprs that are input refs. Project expr index -> dimension index.
final Int2IntMap dimensionMapping = new Int2IntOpenHashMap();
dimensionMapping.defaultReturnValue(-1);
for (int i = 0; i < projectExprs.size(); i++) {
RexNode projectExpr = projectExprs.get(i);
if (projectExpr.isA(SqlKind.INPUT_REF)) {
dimensionMapping.put(((RexInputRef) projectExpr).getIndex(), i);
}
}
// Add collations for dimensions so long as they are all mappings.
for (int i = 0; i < aggregate.getGroupCount(); i++) {
final int mapping = dimensionMapping.applyAsInt(i);
if (mapping >= 0) {
sortFields.add(new RelFieldCollation(mapping));
} else {
// As soon as we see a non-mapping, stop adding.
break;
}
}
}
return leafRelTraits.plus(convention).plus(RelCollations.of(sortFields));
}
// Fall through.
default:
return leafRelTraits.plus(convention);
}
}
public DruidQuery build(
@@ -410,26 +558,7 @@
public boolean canAccept(final Stage stage)
{
final Stage currentStage = stage();
if (stage == Stage.WINDOW) {
// Special case: WINDOW can only be provided along with SCAN.
return currentStage == Stage.SCAN;
} else if (currentStage == Stage.SELECT_PROJECT && stage == Stage.SELECT_PROJECT) {
// Special case: allow layering SELECT_PROJECT on top of SELECT_PROJECT. Calcite's builtin rules cannot
// always collapse these, so we have to (one example: testSemiJoinWithOuterTimeExtract). See
// withSelectProject for the code here that handles this.
return true;
} else if (stage.compareTo(currentStage) <= 0) {
// Cannot go backwards.
return false;
} else if (stage.compareTo(Stage.AGGREGATE) > 0 && stage.compareTo(Stage.SORT) < 0 && aggregate == null) {
// Cannot do post-aggregation stages without an aggregation.
return false;
} else {
// If we are after the SORT phase, make sure we have a sort...
return stage.compareTo(Stage.SORT) <= 0 || sort != null;
}
return stage.canFollow(stage());
}
/**
@@ -441,7 +570,9 @@
@SuppressWarnings("VariableNotUsedInsideIf")
public Stage stage()
{
if (window != null) {
if (windowProject != null) {
return Stage.WINDOW_PROJECT;
} else if (window != null) {
return Stage.WINDOW;
} else if (sortProject != null) {
return Stage.SORT_PROJECT;
@@ -472,6 +603,8 @@
final Stage currentStage = stage();
switch (currentStage) {
case WINDOW_PROJECT:
return windowProject;
case WINDOW:
return window;
case SORT_PROJECT:
@@ -504,7 +637,7 @@
// Account for the cost of post-scan expressions.
if (getSelectProject() != null) {
for (final RexNode rexNode : getSelectProject().getChildExps()) {
for (final RexNode rexNode : getSelectProject().getProjects()) {
if (!rexNode.isA(SqlKind.INPUT_REF)) {
cost += CostEstimates.COST_EXPRESSION;
}
@@ -534,7 +667,7 @@
// Account for the cost of post-aggregation expressions.
if (getAggregateProject() != null) {
for (final RexNode rexNode : getAggregateProject().getChildExps()) {
for (final RexNode rexNode : getAggregateProject().getProjects()) {
if (!rexNode.isA(SqlKind.INPUT_REF)) {
cost += CostEstimates.COST_EXPRESSION;
}
@@ -543,7 +676,7 @@
// Account for the cost of post-sort expressions.
if (getSortProject() != null) {
for (final RexNode rexNode : getSortProject().getChildExps()) {
for (final RexNode rexNode : getSortProject().getProjects()) {
if (!rexNode.isA(SqlKind.INPUT_REF)) {
cost += CostEstimates.COST_EXPRESSION;
}
@@ -564,7 +697,7 @@
}
@Override
public boolean equals(final Object o)
public boolean equals(Object o)
{
if (this == o) {
return true;
@@ -572,15 +705,18 @@
if (o == null || getClass() != o.getClass()) {
return false;
}
final PartialDruidQuery that = (PartialDruidQuery) o;
return Objects.equals(scan, that.scan) &&
Objects.equals(whereFilter, that.whereFilter) &&
Objects.equals(selectProject, that.selectProject) &&
Objects.equals(aggregate, that.aggregate) &&
Objects.equals(havingFilter, that.havingFilter) &&
Objects.equals(aggregateProject, that.aggregateProject) &&
Objects.equals(sort, that.sort) &&
Objects.equals(sortProject, that.sortProject);
PartialDruidQuery that = (PartialDruidQuery) o;
return Objects.equals(scan, that.scan)
&& Objects.equals(whereFilter, that.whereFilter)
&& Objects.equals(selectProject, that.selectProject)
&& Objects.equals(aggregate, that.aggregate)
&& Objects.equals(havingFilter, that.havingFilter)
&& Objects.equals(aggregateProject, that.aggregateProject)
&& Objects.equals(sort, that.sort)
&& Objects.equals(sortProject, that.sortProject)
&& Objects.equals(window, that.window)
&& Objects.equals(windowProject, that.windowProject)
&& Objects.equals(unnestProject, that.unnestProject);
}
@Override
@@ -594,7 +730,10 @@
havingFilter,
aggregateProject,
sort,
sortProject
sortProject,
window,
windowProject,
unnestProject
);
}
@@ -610,6 +749,8 @@
", aggregateProject=" + aggregateProject +
", sort=" + sort +
", sortProject=" + sortProject +
", window=" + window +
", windowProject=" + windowProject +
", unnestProject=" + unnestProject +
'}';
}

View File

@@ -22,6 +22,8 @@ package org.apache.druid.sql.calcite.rel;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import org.apache.calcite.rel.RelCollation;
import org.apache.calcite.rel.RelCollationTraitDef;
import org.apache.calcite.rel.RelFieldCollation;
import org.apache.calcite.rel.core.AggregateCall;
import org.apache.calcite.rel.core.Project;
@@ -32,6 +34,7 @@ import org.apache.calcite.rex.RexInputRef;
import org.apache.calcite.rex.RexLiteral;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.rex.RexWindowBound;
import org.apache.calcite.util.mapping.Mappings;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.query.QueryException;
@@ -65,6 +68,7 @@ import org.apache.druid.sql.calcite.table.RowSignatures;
import javax.annotation.Nonnull;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.List;
@@ -111,7 +115,7 @@ public class Windowing
public static Windowing fromCalciteStuff(
final PartialDruidQuery partialQuery,
final PlannerContext plannerContext,
final RowSignature rowSignature,
final RowSignature sourceRowSignature,
final RexBuilder rexBuilder
)
{
@@ -119,31 +123,45 @@
ArrayList<OperatorFactory> ops = new ArrayList<>();
final List<String> expectedOutputColumns = new ArrayList<>(rowSignature.getColumnNames());
final String outputNamePrefix = Calcites.findUnusedPrefixForDigits("w", rowSignature.getColumnNames());
final List<String> windowOutputColumns = new ArrayList<>(sourceRowSignature.getColumnNames());
final String outputNamePrefix = Calcites.findUnusedPrefixForDigits("w", sourceRowSignature.getColumnNames());
int outputNameCounter = 0;
// Track prior partition columns and sort columns group-to-group, so we only insert sorts and repartitions if
// we really need to.
List<String> priorPartitionColumns = null;
LinkedHashSet<ColumnWithDirection> priorSortColumns = new LinkedHashSet<>();
final RelCollation priorCollation = partialQuery.getScan().getTraitSet().getTrait(RelCollationTraitDef.INSTANCE);
if (priorCollation != null) {
// Populate initial priorSortColumns using collation of the input to the window operation. Allows us to skip
// the initial sort operator if the rows were already in the desired order.
priorSortColumns = computeSortColumnsFromRelCollation(priorCollation, sourceRowSignature);
}
for (int i = 0; i < window.groups.size(); ++i) {
final WindowGroup group = new WindowGroup(window, window.groups.get(i), rowSignature);
final WindowGroup group = new WindowGroup(window, window.groups.get(i), sourceRowSignature);
if (i > 0) {
LinkedHashSet<ColumnWithDirection> sortColumns = new LinkedHashSet<>();
for (String partitionColumn : group.getPartitionColumns()) {
sortColumns.add(ColumnWithDirection.ascending(partitionColumn));
}
sortColumns.addAll(group.getOrdering());
final LinkedHashSet<ColumnWithDirection> sortColumns = new LinkedHashSet<>();
for (String partitionColumn : group.getPartitionColumns()) {
sortColumns.add(ColumnWithDirection.ascending(partitionColumn));
}
sortColumns.addAll(group.getOrdering());
// Add sorting and partitioning if needed.
if (!sortMatches(priorSortColumns, sortColumns)) {
// Sort order needs to change. Resort and repartition.
ops.add(new NaiveSortOperatorFactory(new ArrayList<>(sortColumns)));
ops.add(new NaivePartitioningOperatorFactory(group.getPartitionColumns()));
priorSortColumns = sortColumns;
priorPartitionColumns = group.getPartitionColumns();
} else if (!group.getPartitionColumns().equals(priorPartitionColumns)) {
// Sort order doesn't need to change, but partitioning does. Only repartition.
ops.add(new NaivePartitioningOperatorFactory(group.getPartitionColumns()));
priorPartitionColumns = group.getPartitionColumns();
}
// Presently, the order by keys are not validated to ensure that the incoming query has pre-sorted the data
// as required by the window query. This should be done. In order to do it, we will need to know what the
// sub-query that we are running against actually looks like in order to then validate that the data will
// come back in the order expected. Unfortunately, the way that the queries are re-written to DruidRels
// loses all the context of sub-queries, making it not possible to validate this without changing how the
// various Druid rules work (i.e. a very large blast radius change). For now, it is easy enough to validate
// this when we build the native query, so we validate it there.
// Aggregations.
// Add aggregations.
final List<AggregateCall> aggregateCalls = group.getAggregateCalls();
final List<Processor> processors = new ArrayList<>();
@@ -151,13 +169,13 @@ public class Windowing
for (AggregateCall aggregateCall : aggregateCalls) {
final String aggName = outputNamePrefix + outputNameCounter++;
expectedOutputColumns.add(aggName);
windowOutputColumns.add(aggName);
ProcessorMaker maker = KNOWN_WINDOW_FNS.get(aggregateCall.getAggregation().getName());
if (maker == null) {
final Aggregation aggregation = GroupByRules.translateAggregateCall(
plannerContext,
rowSignature,
sourceRowSignature,
null,
rexBuilder,
partialQuery.getSelectProject(),
@@ -182,7 +200,7 @@ public class Windowing
new WindowAggregate(
aggName,
aggregateCall,
rowSignature,
sourceRowSignature,
plannerContext,
partialQuery.getSelectProject(),
window.constants,
@@ -206,20 +224,37 @@
throw new ISE("No processors from Window[%s], why was this code called?", window);
}
// The ordering required for partitioning is actually not important for the semantics. However, it *is*
// important that it be consistent across the query. Because if the incoming data is sorted descending
// and we try to partition on an ascending sort, we will think the data is not sorted correctly
ops.add(new NaivePartitioningOperatorFactory(group.getPartitionColumns()));
ops.add(new WindowOperatorFactory(
processors.size() == 1 ?
processors.get(0) : new ComposingProcessor(processors.toArray(new Processor[0]))
));
}
return new Windowing(
RowSignatures.fromRelDataType(expectedOutputColumns, window.getRowType()),
ops
);
// Apply windowProject, if present.
if (partialQuery.getWindowProject() != null) {
// We know windowProject is a mapping due to the isMapping() check in DruidRules. Check for null anyway,
// as defensive programming.
final Mappings.TargetMapping mapping = Preconditions.checkNotNull(
partialQuery.getWindowProject().getMapping(),
"mapping for windowProject[%s]", partialQuery.getWindowProject()
);
final List<String> windowProjectOutputColumns = new ArrayList<>();
for (int i = 0; i < mapping.size(); i++) {
windowProjectOutputColumns.add(windowOutputColumns.get(mapping.getSourceOpt(i)));
}
return new Windowing(
RowSignatures.fromRelDataType(windowProjectOutputColumns, partialQuery.getWindowProject().getRowType()),
ops
);
} else {
// No windowProject.
return new Windowing(
RowSignatures.fromRelDataType(windowOutputColumns, window.getRowType()),
ops
);
}
}
private final RowSignature signature;
@@ -400,4 +435,68 @@ public class Windowing
return ((Number) getConstantArgument(argPosition).getValue()).intValue();
}
}
/**
* Return a list of {@link ColumnWithDirection} corresponding to a {@link RelCollation}.
*
* @param collation collation
* @param sourceRowSignature signature of the collated rows
*/
private static LinkedHashSet<ColumnWithDirection> computeSortColumnsFromRelCollation(
final RelCollation collation,
final RowSignature sourceRowSignature
)
{
final LinkedHashSet<ColumnWithDirection> retVal = new LinkedHashSet<>();
for (RelFieldCollation fieldCollation : collation.getFieldCollations()) {
final ColumnWithDirection.Direction direction;
switch (fieldCollation.getDirection()) {
case ASCENDING:
case STRICTLY_ASCENDING:
direction = ColumnWithDirection.Direction.ASC;
break;
case DESCENDING:
case STRICTLY_DESCENDING:
direction = ColumnWithDirection.Direction.DESC;
break;
default:
// Not a useful direction. Return whatever we've come up with so far.
return retVal;
}
final ColumnWithDirection columnWithDirection = new ColumnWithDirection(
sourceRowSignature.getColumnName(fieldCollation.getFieldIndex()),
direction
);
retVal.add(columnWithDirection);
}
return retVal;
}
/**
* Whether currentSort is a prefix of priorSort. (i.e., whether data sorted by priorSort is *also* sorted
* by currentSort.)
*/
private static boolean sortMatches(
final Iterable<ColumnWithDirection> priorSort,
final Iterable<ColumnWithDirection> currentSort
)
{
final Iterator<ColumnWithDirection> priorIterator = priorSort.iterator();
final Iterator<ColumnWithDirection> currentIterator = currentSort.iterator();
while (currentIterator.hasNext()) {
if (!priorIterator.hasNext() || !currentIterator.next().equals(priorIterator.next())) {
return false;
}
}
return true;
}
}

View File

@@ -105,6 +105,14 @@ public class DruidRules
if (plannerContext.queryContext().getBoolean(DruidQuery.CTX_ENABLE_WINDOW_FNS, false)) {
retVal.add(new DruidQueryRule<>(Window.class, PartialDruidQuery.Stage.WINDOW, PartialDruidQuery::withWindow));
retVal.add(
new DruidQueryRule<>(
Project.class,
PartialDruidQuery.Stage.WINDOW_PROJECT,
Project::isMapping, // We can remap fields, but not apply expressions
PartialDruidQuery::withWindowProject
)
);
retVal.add(DruidOuterQueryRule.WINDOW);
}
return retVal;
@@ -113,12 +121,14 @@ public class DruidRules
public static class DruidQueryRule<RelType extends RelNode> extends RelOptRule
{
private final PartialDruidQuery.Stage stage;
private final BiFunction<PartialDruidQuery, RelType, PartialDruidQuery> f;
private final Predicate<RelType> matchesFn;
private final BiFunction<PartialDruidQuery, RelType, PartialDruidQuery> applyFn;
public DruidQueryRule(
final Class<RelType> relClass,
final PartialDruidQuery.Stage stage,
final BiFunction<PartialDruidQuery, RelType, PartialDruidQuery> f
final Predicate<RelType> matchesFn,
final BiFunction<PartialDruidQuery, RelType, PartialDruidQuery> applyFn
)
{
super(
@@ -126,24 +136,35 @@ public class DruidRules
StringUtils.format("%s(%s)", DruidQueryRule.class.getSimpleName(), stage)
);
this.stage = stage;
this.f = f;
this.matchesFn = matchesFn;
this.applyFn = applyFn;
}
public DruidQueryRule(
final Class<RelType> relClass,
final PartialDruidQuery.Stage stage,
final BiFunction<PartialDruidQuery, RelType, PartialDruidQuery> applyFn
)
{
this(relClass, stage, r -> true, applyFn);
}
@Override
public boolean matches(final RelOptRuleCall call)
{
final DruidRel druidRel = call.rel(1);
return druidRel.getPartialDruidQuery().canAccept(stage);
final RelType otherRel = call.rel(0);
final DruidRel<?> druidRel = call.rel(1);
return druidRel.getPartialDruidQuery().canAccept(stage) && matchesFn.test(otherRel);
}
@Override
public void onMatch(final RelOptRuleCall call)
{
final RelType otherRel = call.rel(0);
final DruidRel druidRel = call.rel(1);
final DruidRel<?> druidRel = call.rel(1);
final PartialDruidQuery newPartialDruidQuery = f.apply(druidRel.getPartialDruidQuery(), otherRel);
final DruidRel newDruidRel = druidRel.withPartialQuery(newPartialDruidQuery);
final PartialDruidQuery newPartialDruidQuery = applyFn.apply(druidRel.getPartialDruidQuery(), otherRel);
final DruidRel<?> newDruidRel = druidRel.withPartialQuery(newPartialDruidQuery);
if (newDruidRel.isValidDruidQuery()) {
call.transformTo(newDruidRel);
@@ -154,6 +175,7 @@ public class DruidRules
public abstract static class DruidOuterQueryRule extends RelOptRule
{
public static final RelOptRule AGGREGATE = new DruidOuterQueryRule(
PartialDruidQuery.Stage.AGGREGATE,
operand(Aggregate.class, operandJ(DruidRel.class, null, CAN_BUILD_ON, any())),
"AGGREGATE"
)
@ -166,7 +188,7 @@ public class DruidRules
final DruidOuterQueryRel outerQueryRel = DruidOuterQueryRel.create(
druidRel,
PartialDruidQuery.create(druidRel.getPartialDruidQuery().leafRel())
PartialDruidQuery.createOuterQuery(druidRel.getPartialDruidQuery())
.withAggregate(aggregate)
);
if (outerQueryRel.isValidDruidQuery()) {
@@ -176,6 +198,7 @@ public class DruidRules
};
public static final RelOptRule WHERE_FILTER = new DruidOuterQueryRule(
PartialDruidQuery.Stage.WHERE_FILTER,
operand(Filter.class, operandJ(DruidRel.class, null, CAN_BUILD_ON, any())),
"WHERE_FILTER"
)
@ -188,7 +211,7 @@ public class DruidRules
final DruidOuterQueryRel outerQueryRel = DruidOuterQueryRel.create(
druidRel,
PartialDruidQuery.create(druidRel.getPartialDruidQuery().leafRel())
PartialDruidQuery.createOuterQuery(druidRel.getPartialDruidQuery())
.withWhereFilter(filter)
);
if (outerQueryRel.isValidDruidQuery()) {
@@ -198,6 +221,7 @@ public class DruidRules
};
public static final RelOptRule SELECT_PROJECT = new DruidOuterQueryRule(
PartialDruidQuery.Stage.SELECT_PROJECT,
operand(Project.class, operandJ(DruidRel.class, null, CAN_BUILD_ON, any())),
"SELECT_PROJECT"
)
@ -210,7 +234,7 @@ public class DruidRules
final DruidOuterQueryRel outerQueryRel = DruidOuterQueryRel.create(
druidRel,
PartialDruidQuery.create(druidRel.getPartialDruidQuery().leafRel())
PartialDruidQuery.createOuterQuery(druidRel.getPartialDruidQuery())
.withSelectProject(filter)
);
if (outerQueryRel.isValidDruidQuery()) {
@@ -220,6 +244,7 @@ public class DruidRules
};
public static final RelOptRule SORT = new DruidOuterQueryRule(
PartialDruidQuery.Stage.SORT,
operand(Sort.class, operandJ(DruidRel.class, null, CAN_BUILD_ON, any())),
"SORT"
)
@ -232,7 +257,7 @@ public class DruidRules
final DruidOuterQueryRel outerQueryRel = DruidOuterQueryRel.create(
druidRel,
PartialDruidQuery.create(druidRel.getPartialDruidQuery().leafRel())
PartialDruidQuery.createOuterQuery(druidRel.getPartialDruidQuery())
.withSort(sort)
);
if (outerQueryRel.isValidDruidQuery()) {
@@ -242,6 +267,7 @@ public class DruidRules
};
public static final RelOptRule WINDOW = new DruidOuterQueryRule(
PartialDruidQuery.Stage.WINDOW,
operand(Window.class, operandJ(DruidRel.class, null, CAN_BUILD_ON, any())),
"WINDOW"
)
@ -254,7 +280,7 @@ public class DruidRules
final DruidOuterQueryRel outerQueryRel = DruidOuterQueryRel.create(
druidRel,
PartialDruidQuery.create(druidRel.getPartialDruidQuery().leafRel())
PartialDruidQuery.createOuterQuery(druidRel.getPartialDruidQuery())
.withWindow(window)
);
if (outerQueryRel.isValidDruidQuery()) {
@@ -263,17 +289,24 @@ public class DruidRules
}
};
public DruidOuterQueryRule(final RelOptRuleOperand op, final String description)
private final PartialDruidQuery.Stage stage;
public DruidOuterQueryRule(
final PartialDruidQuery.Stage stage,
final RelOptRuleOperand op,
final String description
)
{
super(op, StringUtils.format("%s(%s)", DruidOuterQueryRel.class.getSimpleName(), description));
this.stage = stage;
}
@Override
public boolean matches(final RelOptRuleCall call)
{
// Subquery must be a groupBy, so stage must be >= AGGREGATE.
final DruidRel druidRel = call.rel(call.getRelList().size() - 1);
return druidRel.getPartialDruidQuery().stage().compareTo(PartialDruidQuery.Stage.AGGREGATE) >= 0;
// Only consider doing a subquery when the stage cannot be fused into a single query.
final DruidRel<?> druidRel = call.rel(call.getRelList().size() - 1);
return !stage.canFollow(druidRel.getPartialDruidQuery().stage());
}
}
}

View File

@@ -68,6 +68,7 @@ import org.apache.druid.query.filter.SelectorDimFilter;
import org.apache.druid.query.groupby.GroupByQuery;
import org.apache.druid.query.groupby.ResultRow;
import org.apache.druid.query.groupby.orderby.DefaultLimitSpec;
import org.apache.druid.query.groupby.orderby.NoopLimitSpec;
import org.apache.druid.query.groupby.orderby.OrderByColumnSpec;
import org.apache.druid.query.ordering.StringComparators;
import org.apache.druid.query.scan.ScanQuery;
@@ -2715,8 +2716,7 @@ public class CalciteJoinQueryTest extends BaseCalciteQueryTest
)
)
.intervals(querySegmentSpec(Filtration.eternity()))
.virtualColumns(expressionVirtualColumn("_v0", "\'10.1\'", ColumnType.STRING))
.columns("__time", "_v0")
.columns("__time", "v0")
.filters(new SelectorDimFilter("v0", "10.1", null))
.context(queryContext)
.build()
@@ -2827,9 +2827,7 @@ public class CalciteJoinQueryTest extends BaseCalciteQueryTest
)
)
.intervals(querySegmentSpec(Filtration.eternity()))
.virtualColumns(expressionVirtualColumn("_v0", "\'10.1\'", ColumnType.STRING))
.columns("__time", "_v0")
.filters(new SelectorDimFilter("v0", "10.1", null))
.columns("__time", "v0")
.context(queryContext)
.build()
),
@@ -3022,8 +3020,7 @@ public class CalciteJoinQueryTest extends BaseCalciteQueryTest
)
)
.intervals(querySegmentSpec(Filtration.eternity()))
.virtualColumns(expressionVirtualColumn("_v0", "\'10.1\'", ColumnType.STRING))
.columns("__time", "_v0")
.columns("__time", "v0")
.context(queryContext);
testQuery(
@@ -3866,18 +3863,7 @@ public class CalciteJoinQueryTest extends BaseCalciteQueryTest
)
)
)
.setLimitSpec(
new DefaultLimitSpec(
ImmutableList.of(
new OrderByColumnSpec(
"d0",
OrderByColumnSpec.Direction.ASCENDING,
StringComparators.NUMERIC
)
),
Integer.MAX_VALUE
)
)
.setLimitSpec(NoopLimitSpec.instance())
.setContext(QUERY_CONTEXT_DEFAULT)
.build()
),
@@ -4086,18 +4072,7 @@ public class CalciteJoinQueryTest extends BaseCalciteQueryTest
.setGranularity(Granularities.ALL)
.setDimensions(dimensions(new DefaultDimensionSpec("dim1", "d0")))
.setAggregatorSpecs(aggregators(new LongSumAggregatorFactory("a0", "cnt")))
.setLimitSpec(
new DefaultLimitSpec(
ImmutableList.of(
new OrderByColumnSpec(
"d0",
OrderByColumnSpec.Direction.ASCENDING,
StringComparators.LEXICOGRAPHIC
)
),
Integer.MAX_VALUE
)
)
.setLimitSpec(NoopLimitSpec.instance())
.setContext(queryContext)
.build()
),
@@ -4208,18 +4183,7 @@ public class CalciteJoinQueryTest extends BaseCalciteQueryTest
.setGranularity(Granularities.ALL)
.setDimensions(dimensions(new DefaultDimensionSpec("dim1", "d0")))
.setAggregatorSpecs(aggregators(new LongSumAggregatorFactory("a0", "cnt")))
.setLimitSpec(
new DefaultLimitSpec(
ImmutableList.of(
new OrderByColumnSpec(
"d0",
OrderByColumnSpec.Direction.ASCENDING,
StringComparators.LEXICOGRAPHIC
)
),
Integer.MAX_VALUE
)
)
.setLimitSpec(NoopLimitSpec.instance())
.setContext(queryContext)
.build()
),

View File

@@ -93,6 +93,7 @@ import org.apache.druid.query.filter.SelectorDimFilter;
import org.apache.druid.query.groupby.GroupByQuery;
import org.apache.druid.query.groupby.GroupByQueryConfig;
import org.apache.druid.query.groupby.orderby.DefaultLimitSpec;
import org.apache.druid.query.groupby.orderby.NoopLimitSpec;
import org.apache.druid.query.groupby.orderby.OrderByColumnSpec;
import org.apache.druid.query.groupby.orderby.OrderByColumnSpec.Direction;
import org.apache.druid.query.lookup.RegisteredLookupExtractionFn;
@@ -2032,18 +2033,7 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
.setGranularity(Granularities.ALL)
.setDimensions(dimensions(new DefaultDimensionSpec("cnt", "d0", ColumnType.LONG)))
.setAggregatorSpecs(aggregators(new CountAggregatorFactory("a0")))
.setLimitSpec(
DefaultLimitSpec
.builder()
.orderBy(
new OrderByColumnSpec(
"d0",
OrderByColumnSpec.Direction.ASCENDING,
StringComparators.NUMERIC
)
)
.build()
)
.setLimitSpec(NoopLimitSpec.instance())
.setContext(QUERY_CONTEXT_DEFAULT)
.build()
),
@@ -5620,7 +5610,7 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
final Map<String, String> queries = ImmutableMap.of(
// SELECT query with order by non-__time.
"SELECT dim1 FROM druid.foo ORDER BY dim1",
"Possible error: SQL query requires order by non-time column [dim1 ASC] that is not supported.",
"Possible error: SQL query requires order by non-time column [dim1 ASC], which is not supported.",
// JOIN condition with not-equals (<>).
"SELECT foo.dim1, foo.dim2, l.k, l.v\n"
@@ -6947,10 +6937,15 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
.setDataSource(CalciteTests.DATASOURCE1)
.setInterval(querySegmentSpec(Filtration.eternity()))
.setGranularity(Granularities.ALL)
.setDimensions(dimensions(
new DefaultDimensionSpec("dim1", "d0"),
new DefaultDimensionSpec("m2", "d1", ColumnType.DOUBLE)
))
.setDimensions(
useDefault ? dimensions(
new DefaultDimensionSpec("m2", "d0", ColumnType.DOUBLE),
new DefaultDimensionSpec("dim1", "d1")
) : dimensions(
new DefaultDimensionSpec("dim1", "d0"),
new DefaultDimensionSpec("m2", "d1", ColumnType.DOUBLE)
)
)
.setDimFilter(new SelectorDimFilter("m1", "5.0", null))
.setAggregatorSpecs(aggregators(new LongMaxAggregatorFactory("a0", "__time")))
.setContext(QUERY_CONTEXT_DEFAULT)
@@ -6967,7 +6962,7 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
)
.setDimensions(dimensions(
new DefaultDimensionSpec("v0", "_d0", ColumnType.LONG),
new DefaultDimensionSpec("d0", "_d1", ColumnType.STRING)
new DefaultDimensionSpec(useDefault ? "d1" : "d0", "_d1", ColumnType.STRING)
))
.setAggregatorSpecs(
aggregators(
@@ -7309,8 +7304,7 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
)
.setGranularity(Granularities.ALL)
.setAggregatorSpecs(
useDefault
? aggregators(
aggregators(
new LongMaxAggregatorFactory("_a0", "a0"),
new LongMinAggregatorFactory("_a1", "a0"),
new LongSumAggregatorFactory("_a2:sum", "a0"),
@@ -7318,17 +7312,6 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
new LongMaxAggregatorFactory("_a3", "d0"),
new CountAggregatorFactory("_a4")
)
: aggregators(
new LongMaxAggregatorFactory("_a0", "a0"),
new LongMinAggregatorFactory("_a1", "a0"),
new LongSumAggregatorFactory("_a2:sum", "a0"),
new FilteredAggregatorFactory(
new CountAggregatorFactory("_a2:count"),
not(selector("a0", null, null))
),
new LongMaxAggregatorFactory("_a3", "d0"),
new CountAggregatorFactory("_a4")
)
)
.setPostAggregatorSpecs(
ImmutableList.of(
@@ -9285,18 +9268,7 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
)
.setDimensions(dimensions(new DefaultDimensionSpec("v0", "d0", ColumnType.LONG)))
.setAggregatorSpecs(aggregators(new LongSumAggregatorFactory("a0", "cnt")))
.setLimitSpec(
new DefaultLimitSpec(
ImmutableList.of(
new OrderByColumnSpec(
"d0",
OrderByColumnSpec.Direction.ASCENDING,
StringComparators.NUMERIC
)
),
Integer.MAX_VALUE
)
)
.setLimitSpec(NoopLimitSpec.instance())
.setContext(QUERY_CONTEXT_DEFAULT)
.build()
),
@@ -9334,18 +9306,7 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
)
.setDimensions(dimensions(new DefaultDimensionSpec("v0", "d0", ColumnType.LONG)))
.setAggregatorSpecs(aggregators(new LongSumAggregatorFactory("a0", "cnt")))
.setLimitSpec(
new DefaultLimitSpec(
ImmutableList.of(
new OrderByColumnSpec(
"d0",
OrderByColumnSpec.Direction.ASCENDING,
StringComparators.NUMERIC
)
),
Integer.MAX_VALUE
)
)
.setLimitSpec(NoopLimitSpec.instance())
.setContext(QUERY_CONTEXT_DEFAULT)
.build()
),
@@ -10176,18 +10137,7 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
)
.setDimensions(dimensions(new DefaultDimensionSpec("v0", "d0", ColumnType.LONG)))
.setAggregatorSpecs(aggregators(new LongSumAggregatorFactory("a0", "cnt")))
.setLimitSpec(
new DefaultLimitSpec(
ImmutableList.of(
new OrderByColumnSpec(
"d0",
OrderByColumnSpec.Direction.ASCENDING,
StringComparators.NUMERIC
)
),
Integer.MAX_VALUE
)
)
.setLimitSpec(NoopLimitSpec.instance())
.setContext(QUERY_CONTEXT_DEFAULT)
.build()
),
@@ -10226,18 +10176,7 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
)
.setDimensions(dimensions(new DefaultDimensionSpec("v0", "d0", ColumnType.STRING)))
.setAggregatorSpecs(aggregators(new LongSumAggregatorFactory("a0", "cnt")))
.setLimitSpec(
new DefaultLimitSpec(
ImmutableList.of(
new OrderByColumnSpec(
"d0",
OrderByColumnSpec.Direction.ASCENDING,
StringComparators.LEXICOGRAPHIC
)
),
Integer.MAX_VALUE
)
)
.setLimitSpec(NoopLimitSpec.instance())
.setContext(QUERY_CONTEXT_DEFAULT)
.build()
),
@@ -10477,19 +10416,7 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
)
)
.setAggregatorSpecs(aggregators(new LongSumAggregatorFactory("a0", "cnt")))
.setLimitSpec(
new DefaultLimitSpec(
ImmutableList.of(
new OrderByColumnSpec("d0", OrderByColumnSpec.Direction.ASCENDING),
new OrderByColumnSpec(
"d1",
OrderByColumnSpec.Direction.ASCENDING,
StringComparators.NUMERIC
)
),
Integer.MAX_VALUE
)
)
.setLimitSpec(NoopLimitSpec.instance())
.setContext(withTimestampResultContext(QUERY_CONTEXT_DEFAULT, "d1", 1, Granularities.MONTH))
.build()
),
@@ -11771,15 +11698,10 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
.setInterval(querySegmentSpec(Filtration.eternity()))
.setGranularity(Granularities.ALL)
.setDimensions(
useDefault
? dimensions(
dimensions(
new DefaultDimensionSpec("__time", "d0", ColumnType.LONG),
new DefaultDimensionSpec("m2", "d1", ColumnType.DOUBLE),
new DefaultDimensionSpec("dim1", "d2")
) : dimensions(
new DefaultDimensionSpec("__time", "d0", ColumnType.LONG),
new DefaultDimensionSpec("dim1", "d1"),
new DefaultDimensionSpec("m2", "d2", ColumnType.DOUBLE)
)
)
.setContext(QUERY_CONTEXT_DEFAULT)
@@ -11788,13 +11710,9 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
.setInterval(querySegmentSpec(Filtration.eternity()))
.setGranularity(Granularities.ALL)
.setDimensions(
useDefault
? dimensions(
dimensions(
new DefaultDimensionSpec("d0", "_d0", ColumnType.LONG),
new DefaultDimensionSpec("d2", "_d1", ColumnType.STRING)
) : dimensions(
new DefaultDimensionSpec("d0", "_d0", ColumnType.LONG),
new DefaultDimensionSpec("d1", "_d1", ColumnType.STRING)
)
)
.setAggregatorSpecs(
@@ -11803,7 +11721,7 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
? new CountAggregatorFactory("a0")
: new FilteredAggregatorFactory(
new CountAggregatorFactory("a0"),
not(selector("d2", null, null))
not(selector("d1", null, null))
)
)
)

View File

@@ -1373,7 +1373,7 @@ public class SqlResourceTest extends CalciteTestBase
Assert.assertTrue(
exception.getMessage()
.contains("Query not supported. " +
"Possible error: SQL query requires order by non-time column [dim1 ASC] that is not supported.")
"Possible error: SQL query requires order by non-time column [dim1 ASC], which is not supported.")
);
checkSqlRequestLog(false);
Assert.assertTrue(lifecycleManager.getAll("id").isEmpty());

View File

@@ -43,13 +43,13 @@ expectedResults:
- ["CL",1442019600000,-370,-154,1]
- ["HK",1442019600000,-113,2090,2]
- ["MY",1442019600000,-7,1018,3]
- ["SV",1442019600000,-1,114,4]
- ["MA",1442019600000,-1,-34,5]
- ["TW",1442019600000,0,869,6]
- ["BD",1442019600000,0,930,7]
- ["DE",1442019600000,0,604,8]
- ["PY",1442019600000,1,634,9]
- ["AR",1442019600000,1,3159,10]
- ["MA",1442019600000,-1,-34,4]
- ["SV",1442019600000,-1,114,5]
- ["BD",1442019600000,0,930,6]
- ["DE",1442019600000,0,604,7]
- ["TW",1442019600000,0,869,8]
- ["AR",1442019600000,1,3159,9]
- ["PY",1442019600000,1,634,10]
- ["PH",1442019600000,6,1375,11]
- ["RS",1442019600000,6,19,12]
- ["CO",1442019600000,12,478,13]
@@ -80,8 +80,8 @@ expectedResults:
- ["GT",1442023200000,-167,7,1]
- ["IN",1442023200000,-142,2319,2]
- ["CN",1442023200000,-13,239,3]
- ["VN",1442023200000,-9,25,4]
- ["EC",1442023200000,-9,-346,5]
- ["EC",1442023200000,-9,-346,4]
- ["VN",1442023200000,-9,25,5]
- ["ES",1442023200000,-5,49,6]
- ["SE",1442023200000,3,233,7]
- ["DO",1442023200000,8,243,8]
@@ -117,8 +117,8 @@ expectedResults:
- ["VE",1442026800000,-17,147,5]
- ["PE",1442026800000,-12,362,6]
- ["HN",1442026800000,-1,-1,7]
- ["PA",1442026800000,0,0,8]
- ["IR",1442026800000,0,367,9]
- ["IR",1442026800000,0,367,8]
- ["PA",1442026800000,0,0,9]
- ["RU",1442026800000,0,3247,10]
- ["IE",1442026800000,1,29,11]
- ["EG",1442026800000,16,142,12]
@@ -152,10 +152,10 @@ expectedResults:
- ["IT",1442030400000,-17,1496,4]
- ["AR",1442030400000,-3,3449,5]
- ["MY",1442030400000,-3,1953,6]
- ["TW",1442030400000,0,1186,7]
- ["EC",1442030400000,0,222,8]
- ["SG",1442030400000,1,2821,9]
- ["IE",1442030400000,1,1091,10]
- ["EC",1442030400000,0,222,7]
- ["TW",1442030400000,0,1186,8]
- ["IE",1442030400000,1,1091,9]
- ["SG",1442030400000,1,2821,10]
- ["PR",1442030400000,2,-6,11]
- ["PH",1442030400000,26,1477,12]
- ["BR",1442030400000,30,1620,13]
@@ -185,8 +185,8 @@ expectedResults:
- ["ZA",1442034000000,-3,126,5]
- ["VE",1442034000000,-2,585,6]
- ["CL",1442034000000,-1,-315,7]
- ["NL",1442034000000,0,1319,8]
- ["GR",1442034000000,0,63,9]
- ["GR",1442034000000,0,63,8]
- ["NL",1442034000000,0,1319,9]
- ["TH",1442034000000,0,113,10]
- ["ID",1442034000000,19,4,11]
- ["BR",1442034000000,21,1251,12]
@@ -217,13 +217,13 @@ expectedResults:
- ["SA",1442037600000,-97,-72,3]
- ["VN",1442037600000,-11,-1,4]
- ["IT",1442037600000,-9,2705,5]
- ["UA",1442037600000,-1,3821,6]
- ["SK",1442037600000,-1,18,7]
- ["SK",1442037600000,-1,18,6]
- ["UA",1442037600000,-1,3821,7]
- ["UY",1442037600000,1,913,8]
- ["CL",1442037600000,2,445,9]
- ["SG",1442037600000,3,2898,10]
- ["AU",1442037600000,3,664,11]
- ["ES",1442037600000,3,-2,12]
- ["AU",1442037600000,3,664,10]
- ["ES",1442037600000,3,-2,11]
- ["SG",1442037600000,3,2898,12]
- ["MX",1442037600000,4,4668,13]
- ["DK",1442037600000,10,41,14]
- ["FI",1442037600000,14,1703,15]
@@ -241,8 +241,8 @@ expectedResults:
- ["HU",1442037600000,197,881,27]
- ["TW",1442037600000,266,1479,28]
- ["BR",1442037600000,267,443,29]
- ["GB",1442037600000,544,3469,30]
- ["DE",1442037600000,544,2515,31]
- ["DE",1442037600000,544,2515,30]
- ["GB",1442037600000,544,3469,31]
- ["HK",1442037600000,636,1725,32]
- ["JP",1442037600000,2181,7873,33]
- ["US",1442037600000,3675,12996,34]
@@ -254,12 +254,12 @@ expectedResults:
- ["HK",1442041200000,-15,1937,4]
- ["CL",1442041200000,-1,418,5]
- ["IQ",1442041200000,-1,5,6]
- ["VN",1442041200000,0,98,7]
- ["PH",1442041200000,0,219,8]
- ["PH",1442041200000,0,219,7]
- ["VN",1442041200000,0,98,8]
- ["TR",1442041200000,1,436,9]
- ["ID",1442041200000,2,-58,10]
- ["TH",1442041200000,3,91,11]
- ["BR",1442041200000,3,624,12]
- ["BR",1442041200000,3,624,11]
- ["TH",1442041200000,3,91,12]
- ["CA",1442041200000,5,43,13]
- ["GR",1442041200000,7,71,14]
- ["BG",1442041200000,9,19136,15]
@@ -274,8 +274,8 @@ expectedResults:
- ["SG",1442041200000,59,2950,24]
- ["UA",1442041200000,74,3823,25]
- ["IN",1442041200000,80,5952,26]
- ["SE",1442041200000,91,89,27]
- ["HU",1442041200000,91,1380,28]
- ["HU",1442041200000,91,1380,27]
- ["SE",1442041200000,91,89,28]
- ["ES",1442041200000,118,53,29]
- ["ZM",1442041200000,133,133,30]
- ["AU",1442041200000,194,611,31]
@@ -383,13 +383,13 @@ expectedResults:
- ["TH",1442052000000,-22,45,3]
- ["AO",1442052000000,-18,740,4]
- ["ES",1442052000000,-4,1684,5]
- ["KR",1442052000000,-3,4320,6]
- ["AE",1442052000000,-3,182,7]
- ["AE",1442052000000,-3,182,6]
- ["KR",1442052000000,-3,4320,7]
- ["US",1442052000000,-2,1837,8]
- ["BE",1442052000000,-1,279,9]
- ["OM",1442052000000,0,0,10]
- ["CN",1442052000000,0,622,10]
- ["IQ",1442052000000,0,3,11]
- ["CN",1442052000000,0,622,12]
- ["OM",1442052000000,0,0,12]
- ["SE",1442052000000,1,1448,13]
- ["SK",1442052000000,13,464,14]
- ["HK",1442052000000,15,828,15]
@@ -431,11 +431,11 @@ expectedResults:
- ["LB",1442055600000,-67,-67,3]
- ["AR",1442055600000,-54,475,4]
- ["SE",1442055600000,-5,1432,5]
- ["JO",1442055600000,-2,2,6]
- ["KW",1442055600000,-2,1811,7]
- ["HU",1442055600000,-2,1427,8]
- ["TH",1442055600000,0,11,9]
- ["CH",1442055600000,0,50,10]
- ["HU",1442055600000,-2,1427,6]
- ["JO",1442055600000,-2,2,7]
- ["KW",1442055600000,-2,1811,8]
- ["CH",1442055600000,0,50,9]
- ["TH",1442055600000,0,11,10]
- ["BY",1442055600000,1,2061,11]
- ["MY",1442055600000,1,1459,12]
- ["IL",1442055600000,4,4377,13]
@@ -478,13 +478,13 @@ expectedResults:
- ["CL",1442059200000,-12,355,5]
- ["AE",1442059200000,-11,6387,6]
- ["UA",1442059200000,-2,15681,7]
- ["SA",1442059200000,0,1206,8]
- ["DK",1442059200000,0,490,8]
- ["JO",1442059200000,0,2,9]
- ["MY",1442059200000,0,525,10]
- ["DK",1442059200000,0,490,11]
- ["SA",1442059200000,0,1206,11]
- ["HK",1442059200000,2,862,12]
- ["VN",1442059200000,8,1077,13]
- ["CN",1442059200000,8,345,14]
- ["CN",1442059200000,8,345,13]
- ["VN",1442059200000,8,1077,14]
- ["US",1442059200000,11,518,15]
- ["ID",1442059200000,17,-290,16]
- ["CZ",1442059200000,21,1358,17]
@@ -523,9 +523,9 @@ expectedResults:
- ["TH",1442062800000,-46,8,6]
- ["NL",1442062800000,-30,311,7]
- ["PE",1442062800000,-12,360,8]
- ["MA",1442062800000,0,221,9]
- ["CZ",1442062800000,0,3331,10]
- ["KZ",1442062800000,0,191,11]
- ["CZ",1442062800000,0,3331,9]
- ["KZ",1442062800000,0,191,10]
- ["MA",1442062800000,0,221,11]
- ["DK",1442062800000,1,445,12]
- ["HK",1442062800000,1,1175,13]
- ["SK",1442062800000,6,372,14]
@@ -565,11 +565,11 @@ expectedResults:
- ["CN",1442066400000,-15,51,4]
- ["MX",1442066400000,-1,631,5]
- ["AR",1442066400000,0,-29,6]
- ["SG",1442066400000,0,517,7]
- ["AT",1442066400000,0,4908,7]
- ["KZ",1442066400000,0,-243,8]
- ["AT",1442066400000,0,4908,9]
- ["LU",1442066400000,0,606,9]
- ["RS",1442066400000,0,832,10]
- ["LU",1442066400000,0,606,11]
- ["SG",1442066400000,0,517,11]
- ["MY",1442066400000,1,642,12]
- ["IL",1442066400000,3,1472,13]
- ["PT",1442066400000,12,3692,14]
@@ -611,8 +611,8 @@ expectedResults:
- ["RO",1442070000000,-29,872,4]
- ["AU",1442070000000,-12,1326,5]
- ["MX",1442070000000,-1,3561,6]
- ["ZA",1442070000000,0,127,7]
- ["TH",1442070000000,0,-67,8]
- ["TH",1442070000000,0,-67,7]
- ["ZA",1442070000000,0,127,8]
- ["UG",1442070000000,1,1,9]
- ["GR",1442070000000,2,-235,10]
- ["MM",1442070000000,3,28,11]
@@ -662,10 +662,10 @@ expectedResults:
- ["MX",1442073600000,-21,3181,5]
- ["MV",1442073600000,-3,-3,6]
- ["FI",1442073600000,-1,912,7]
- ["ME",1442073600000,0,0,8]
- ["HR",1442073600000,0,310,9]
- ["MY",1442073600000,1,732,10]
- ["EG",1442073600000,1,170,11]
- ["HR",1442073600000,0,310,8]
- ["ME",1442073600000,0,0,9]
- ["EG",1442073600000,1,170,10]
- ["MY",1442073600000,1,732,11]
- ["SA",1442073600000,2,1697,12]
- ["KG",1442073600000,6,6,13]
- ["RO",1442073600000,15,1377,14]
@@ -754,23 +754,23 @@ expectedResults:
- ["BR",1442080800000,-267,5316,2]
- ["PT",1442080800000,-79,3750,3]
- ["SI",1442080800000,-45,-36,4]
- ["KW",1442080800000,-33,1778,5]
- ["KR",1442080800000,-33,4286,6]
- ["KR",1442080800000,-33,4286,5]
- ["KW",1442080800000,-33,1778,6]
- ["CZ",1442080800000,-28,2308,7]
- ["GE",1442080800000,-27,-140,8]
- ["CN",1442080800000,-10,51,9]
- ["PE",1442080800000,-2,-276,10]
- ["UA",1442080800000,-1,5776,11]
- ["TR",1442080800000,-1,2968,12]
- ["IN",1442080800000,0,19268,13]
- ["RS",1442080800000,0,900,14]
- ["HK",1442080800000,0,5894,15]
- ["TR",1442080800000,-1,2968,11]
- ["UA",1442080800000,-1,5776,12]
- ["HK",1442080800000,0,5894,13]
- ["IN",1442080800000,0,19268,14]
- ["RS",1442080800000,0,900,15]
- ["AR",1442080800000,1,122,16]
- ["BE",1442080800000,1,497,17]
- ["BO",1442080800000,4,4,18]
- ["JO",1442080800000,4,2,19]
- ["PY",1442080800000,5,634,20]
- ["MA",1442080800000,5,207,21]
- ["MA",1442080800000,5,207,20]
- ["PY",1442080800000,5,634,21]
- ["PL",1442080800000,7,866,22]
- ["MR",1442080800000,10,10,23]
- ["LT",1442080800000,12,-12,24]
@@ -805,8 +805,8 @@ expectedResults:
- ["PE",1442084400000,-68,1597,4]
- ["NZ",1442084400000,-52,1032,5]
- ["KZ",1442084400000,-22,-248,6]
- ["VN",1442084400000,-10,1426,7]
- ["HR",1442084400000,-10,350,8]
- ["HR",1442084400000,-10,350,7]
- ["VN",1442084400000,-10,1426,8]
- ["AR",1442084400000,-5,752,9]
- ["LK",1442084400000,-3,131,10]
- ["AT",1442084400000,-2,7254,11]
@@ -817,9 +817,9 @@ expectedResults:
- ["UA",1442084400000,5,3655,16]
- ["DO",1442084400000,8,264,17]
- ["CH",1442084400000,13,457,18]
- ["TH",1442084400000,13,-67,19]
- ["JP",1442084400000,13,1931,19]
- ["PL",1442084400000,13,1622,20]
- ["JP",1442084400000,13,1931,21]
- ["TH",1442084400000,13,-67,21]
- ["SE",1442084400000,37,278,22]
- ["PH",1442084400000,39,670,23]
- ["CA",1442084400000,44,3222,24]
@@ -982,8 +982,8 @@ expectedResults:
- ["SE",1442098800000,0,97,9]
- ["GT",1442098800000,1,7,10]
- ["CZ",1442098800000,2,2140,11]
- ["PT",1442098800000,2,345,12]
- ["NO",1442098800000,2,31,13]
- ["NO",1442098800000,2,31,12]
- ["PT",1442098800000,2,345,13]
- ["IL",1442098800000,3,1847,14]
- ["NL",1442098800000,4,105,15]
- ["SK",1442098800000,7,367,16]
@@ -1010,4 +1010,4 @@ expectedResults:
- ["PE",1442098800000,1861,1772,37]
- ["US",1442098800000,3575,8184,38]
- ["RU",1442098800000,12098,18578,39]
- ["",1442098800000,276159,1468959,40]
- ["",1442098800000,276159,1468959,40]

View File

@@ -0,0 +1,23 @@
type: "failingTest"
sql: |
SELECT
__time,
"user",
page,
LAG(page, 1) OVER (PARTITION BY "user" ORDER BY __time) as priorPage
FROM wikipedia
expectedOperators:
- { type: "naiveSort", columns: [ { column: "user", direction: "ASC" }, { column: "__time", direction: "ASC" } ]}
- { type: "naivePartition", partitionColumns: [ "user" ] }
- type: "window"
processor:
type: "offset"
inputColumn: page
outputColumn: w0
offset: -1
# Not correct: there should actually be results for this query. But when the query runs, at the time I write this, it
# does not return any results at all. Therefore, we leave expectedResults empty and mark the test as a failingTest.
expectedResults: [ [ "to be added later, when fixing this test" ] ]