diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQWindowTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQWindowTest.java
index 1ffa89ab247..b3f6ac20d53 100644
--- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQWindowTest.java
+++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQWindowTest.java
@@ -217,11 +217,11 @@ public class MSQWindowTest extends MSQTestBase
                     .add("w1", ColumnType.DOUBLE)
                     .build(),
         ImmutableList.of(
+            new NaivePartitioningOperatorFactory(ImmutableList.of()),
+            new WindowOperatorFactory(proc1),
             new NaiveSortOperatorFactory(ImmutableList.of(ColumnWithDirection.ascending("d0"))),
             new NaivePartitioningOperatorFactory(ImmutableList.of("d0")),
-            new WindowOperatorFactory(proc),
-            new NaivePartitioningOperatorFactory(ImmutableList.of()),
-            new WindowOperatorFactory(proc1)
+            new WindowOperatorFactory(proc)
         ),
         null
     );
@@ -245,12 +245,12 @@ public class MSQWindowTest extends MSQTestBase
                                    .build())
         .setExpectedRowSignature(rowSignature)
         .setExpectedResultRows(ImmutableList.of(
-            new Object[]{1.0f, 1.0, 1.0, 1.0},
-            new Object[]{2.0f, 2.0, 2.0, 2.0},
-            new Object[]{3.0f, 3.0, 3.0, 3.0},
-            new Object[]{4.0f, 4.0, 4.0, 4.0},
-            new Object[]{5.0f, 5.0, 5.0, 5.0},
-            new Object[]{6.0f, 6.0, 6.0, 6.0}
+            new Object[]{1.0f, 1.0, 1.0, 21.0},
+            new Object[]{2.0f, 2.0, 2.0, 21.0},
+            new Object[]{3.0f, 3.0, 3.0, 21.0},
+            new Object[]{4.0f, 4.0, 4.0, 21.0},
+            new Object[]{5.0f, 5.0, 5.0, 21.0},
+            new Object[]{6.0f, 6.0, 6.0, 21.0}
         ))
         .setQueryContext(context)
         .setExpectedCountersForStageWorkerChannel(
diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/rel/Windowing.java b/sql/src/main/java/org/apache/druid/sql/calcite/rel/Windowing.java
index 20c672ce924..4f0f0eda21b 100644
--- a/sql/src/main/java/org/apache/druid/sql/calcite/rel/Windowing.java
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/rel/Windowing.java
@@ -68,9 +68,11 @@ import org.apache.druid.sql.calcite.table.RowSignatures;
 import javax.annotation.Nonnull;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.Comparator;
 import java.util.Iterator;
 import java.util.LinkedHashSet;
 import java.util.List;
+import java.util.Objects;
 
 /**
  * Maps onto a {@link org.apache.druid.query.operator.WindowOperatorQuery}.
@@ -123,45 +125,14 @@ public class Windowing
   {
     final Window window = Preconditions.checkNotNull(partialQuery.getWindow(), "window");
 
-    ArrayList<OperatorFactory> ops = new ArrayList<>();
-
+    final List<WindowComputationProcessor> windowGroupProcessors = new ArrayList<>();
     final List<String> windowOutputColumns = new ArrayList<>(sourceRowSignature.getColumnNames());
+
     final String outputNamePrefix = Calcites.findUnusedPrefixForDigits("w", sourceRowSignature.getColumnNames());
     int outputNameCounter = 0;
 
-    // Track prior partition columns and sort columns group-to-group, so we only insert sorts and repartitions if
-    // we really need to.
-    List<String> priorPartitionColumns = null;
-    LinkedHashSet<ColumnWithDirection> priorSortColumns = new LinkedHashSet<>();
-
-    final RelCollation priorCollation = partialQuery.getScan().getTraitSet().getTrait(RelCollationTraitDef.INSTANCE);
-    if (priorCollation != null) {
-      // Populate initial priorSortColumns using collation of the input to the window operation. Allows us to skip
-      // the initial sort operator if the rows were already in the desired order.
-      priorSortColumns = computeSortColumnsFromRelCollation(priorCollation, sourceRowSignature);
-    }
-
-    for (int i = 0; i < window.groups.size(); ++i) {
-      final WindowGroup group = new WindowGroup(window, window.groups.get(i), sourceRowSignature);
-
-      final LinkedHashSet<ColumnWithDirection> sortColumns = new LinkedHashSet<>();
-      for (String partitionColumn : group.getPartitionColumns()) {
-        sortColumns.add(ColumnWithDirection.ascending(partitionColumn));
-      }
-      sortColumns.addAll(group.getOrdering());
-
-      // Add sorting and partitioning if needed.
-      if (!sortMatches(priorSortColumns, sortColumns)) {
-        // Sort order needs to change. Resort and repartition.
-        ops.add(new NaiveSortOperatorFactory(new ArrayList<>(sortColumns)));
-        ops.add(new NaivePartitioningOperatorFactory(group.getPartitionColumns()));
-        priorSortColumns = sortColumns;
-        priorPartitionColumns = group.getPartitionColumns();
-      } else if (!group.getPartitionColumns().equals(priorPartitionColumns)) {
-        // Sort order doesn't need to change, but partitioning does. Only repartition.
-        ops.add(new NaivePartitioningOperatorFactory(group.getPartitionColumns()));
-        priorPartitionColumns = group.getPartitionColumns();
-      }
+    for (Window.Group windowGroup : window.groups) {
+      final WindowGroup group = new WindowGroup(window, windowGroup, sourceRowSignature);
 
       // Add aggregations.
       final List<AggregateCall> aggregateCalls = group.getAggregateCalls();
@@ -184,7 +155,8 @@ public class Windowing
             InputAccessor.buildFor(
                 window,
                 partialQuery.getSelectProject(),
-                sourceRowSignature),
+                sourceRowSignature
+            ),
             Collections.emptyList(),
             aggName,
             aggregateCall,
@@ -232,12 +204,14 @@ public class Windowing
         throw new ISE("No processors from Window[%s], why was this code called?", window);
       }
 
-      ops.add(new WindowOperatorFactory(
+      windowGroupProcessors.add(new WindowComputationProcessor(group, new WindowOperatorFactory(
           processors.size() == 1 ?
          processors.get(0) :
          new ComposingProcessor(processors.toArray(new Processor[0]))
-      ));
+      )));
     }
+    List<OperatorFactory> ops = computeWindowOperations(partialQuery, sourceRowSignature, windowGroupProcessors);
+
     // Apply windowProject, if present.
     if (partialQuery.getWindowProject() != null) {
       // We know windowProject is a mapping due to the isMapping() check in DruidRules.
@@ -270,6 +244,119 @@ public class Windowing
     }
   }
 
+  /**
+   * Computes the list of operators that are to be applied in an optimised order
+   */
+  private static List<OperatorFactory> computeWindowOperations(
+      final PartialDruidQuery partialQuery,
+      final RowSignature sourceRowSignature,
+      List<WindowComputationProcessor> windowGroupProcessors
+  )
+  {
+    final List<OperatorFactory> ops = new ArrayList<>();
+    // Track prior partition columns and sort columns group-to-group, so we only insert sorts and repartitions if
+    // we really need to.
+    List<String> priorPartitionColumns = null;
+    LinkedHashSet<ColumnWithDirection> priorSortColumns = new LinkedHashSet<>();
+
+    final RelCollation priorCollation = partialQuery.getScan().getTraitSet().getTrait(RelCollationTraitDef.INSTANCE);
+    if (priorCollation != null) {
+      // Populate initial priorSortColumns using collation of the input to the window operation. Allows us to skip
+      // the initial sort operator if the rows were already in the desired order.
+      priorSortColumns = computeSortColumnsFromRelCollation(priorCollation, sourceRowSignature);
+    }
+
+    // sort the processors to optimise the order of window operators
+    // currently we are moving the empty groups to the front
+    windowGroupProcessors.sort(WindowComputationProcessor.MOVE_EMPTY_GROUPS_FIRST);
+
+    for (WindowComputationProcessor windowComputationProcessor : windowGroupProcessors) {
+      final WindowGroup group = windowComputationProcessor.getGroup();
+      final LinkedHashSet<ColumnWithDirection> sortColumns = new LinkedHashSet<>();
+      for (String partitionColumn : group.getPartitionColumns()) {
+        sortColumns.add(ColumnWithDirection.ascending(partitionColumn));
+      }
+      sortColumns.addAll(group.getOrdering());
+
+      // Add sorting and partitioning if needed.
+      if (!sortMatches(priorSortColumns, sortColumns)) {
+        // Sort order needs to change. Resort and repartition.
+        ops.add(new NaiveSortOperatorFactory(new ArrayList<>(sortColumns)));
+        ops.add(new NaivePartitioningOperatorFactory(group.getPartitionColumns()));
+        priorSortColumns = sortColumns;
+        priorPartitionColumns = group.getPartitionColumns();
+      } else if (!group.getPartitionColumns().equals(priorPartitionColumns)) {
+        // Sort order doesn't need to change, but partitioning does. Only repartition.
+        ops.add(new NaivePartitioningOperatorFactory(group.getPartitionColumns()));
+        priorPartitionColumns = group.getPartitionColumns();
+      }
+
+      ops.add(windowComputationProcessor.getProcessorOperatorFactory());
+    }
+
+    return ops;
+  }
+
+  private static class WindowComputationProcessor
+  {
+    private final WindowGroup group;
+    private final OperatorFactory processorOperatorFactory;
+
+    public WindowComputationProcessor(WindowGroup group, OperatorFactory processorOperatorFactory)
+    {
+      this.group = group;
+      this.processorOperatorFactory = processorOperatorFactory;
+    }
+
+    public WindowGroup getGroup()
+    {
+      return group;
+    }
+
+    public OperatorFactory getProcessorOperatorFactory()
+    {
+      return processorOperatorFactory;
+    }
+
+    /**
+     * Comparator to move the empty windows to the front
+     */
+    public static final Comparator<WindowComputationProcessor> MOVE_EMPTY_GROUPS_FIRST = (o1, o2) -> {
+      if (o1.getGroup().getPartitionColumns().isEmpty() && o2.getGroup().getPartitionColumns().isEmpty()) {
+        return 0;
+      }
+      if (o1.getGroup().getPartitionColumns().isEmpty()) {
+        return -1;
+      }
+      if (o2.getGroup().getPartitionColumns().isEmpty()) {
+        return 1;
+      }
+      return 0;
+    };
+
+    @Override
+    public boolean equals(Object o)
+    {
+      if (this == o) {
+        return true;
+      }
+      if (o == null || getClass() != o.getClass()) {
+        return false;
+      }
+      WindowComputationProcessor obj = (WindowComputationProcessor) o;
+      return Objects.equals(group, obj.group) && Objects.equals(
+          processorOperatorFactory,
+          obj.processorOperatorFactory
+      );
+    }
+
+    @Override
+    public int hashCode()
+    {
+      return Objects.hash(group, processorOperatorFactory);
+    }
+  }
+
   private final RowSignature signature;
 
   public Windowing(
diff --git a/sql/src/test/resources/calcite/tests/window/WindowOpReorder.sqlTest b/sql/src/test/resources/calcite/tests/window/WindowOpReorder.sqlTest
new file mode 100644
index 00000000000..0c9d88b5041
--- /dev/null
+++ b/sql/src/test/resources/calcite/tests/window/WindowOpReorder.sqlTest
@@ -0,0 +1,51 @@
+type: "operatorValidation"
+
+sql: |
+  SELECT
+    m1,
+    m2,
+    SUM(m1) OVER(PARTITION BY m2) as sum1,
+    SUM(m2) OVER() as sum2
+  from druid.numfoo
+  GROUP BY m1,m2
+
+expectedOperators:
+  - type: "naivePartition"
+    partitionColumns: [ ]
+  - type: "window"
+    processor:
+      type: "framedAgg"
+      frame:
+        peerType: "ROWS"
+        lowUnbounded: true
+        lowOffset: 0
+        uppUnbounded: true
+        uppOffset: 0
+        orderBy: null
+      aggregations:
+        - { "type": "doubleSum", "name": "w1", "fieldName": "_d1" }
+  - type: "naiveSort"
+    columns:
+      - column: "_d1"
+        direction: "ASC"
+  - type: "naivePartition"
+    partitionColumns: [ "_d1" ]
+  - type: "window"
+    processor:
+      type: "framedAgg"
+      frame:
+        peerType: "ROWS"
+        lowUnbounded: true
+        lowOffset: 0
+        uppUnbounded: true
+        uppOffset: 0
+        orderBy: null
+      aggregations:
+        - { "type": "doubleSum", "name": "w0", "fieldName": "_d0" }
+expectedResults:
+  - [1.0, 1.0, 1.0, 21.0]
+  - [2.0, 2.0, 2.0, 21.0]
+  - [3.0, 3.0, 3.0, 21.0]
+  - [4.0, 4.0, 4.0, 21.0]
+  - [5.0, 5.0, 5.0, 21.0]
+  - [6.0, 6.0, 6.0, 21.0]
\ No newline at end of file