From 7e3fab5bf961e2fbfdfc60e24b63ccc0b200e900 Mon Sep 17 00:00:00 2001 From: Zoltan Haindrich Date: Thu, 25 Jul 2024 04:57:36 +0200 Subject: [PATCH] Make WindowFrames more specific (#16741) Changes the WindowFrame internals / representation a bit; introduces dedicated frametypes for rows and groups which corresponds to the implemented processing methods --- docs/querying/sql-window-functions.md | 3 - .../apache/druid/msq/exec/MSQWindowTest.java | 69 ++--- .../query/operator/window/WindowFrame.java | 277 ++++++++++-------- .../DefaultFramedOnHeapAggregatable.java | 46 ++- .../operator/window/WindowFrameTest.java | 70 +++++ .../WindowFramedAggregateProcessorTest.java | 4 +- .../FramedOnHeapAggregatableTest.java | 77 ++--- .../druid/sql/calcite/rel/Windowing.java | 23 +- .../druid/sql/calcite/CalciteQueryTest.java | 2 +- .../sql/calcite/CalciteWindowQueryTest.java | 52 ++-- .../tests/window/WindowOpReorder.sqlTest | 16 +- .../tests/window/aggregateConstant.sqlTest | 2 +- .../window/defaultBoundCurrentRow.sqlTest | 43 +-- .../calcite/tests/window/no_grouping.sqlTest | 28 +- .../calcite/tests/window/no_grouping2.sqlTest | 2 +- .../tests/window/rank_handling.sqlTest | 3 + .../calcite/tests/window/simpleSum.sqlTest | 9 +- .../tests/window/virtualColumns.sqlTest | 2 +- ...ipediaAggregationsMultipleOrdering.sqlTest | 9 +- .../window/wikipediaCumulativeOrdered.sqlTest | 11 +- .../wikipediaFramedAggregations.sqlTest | 8 +- .../window/wikipediaSimplePartition.sqlTest | 2 +- ...ikipediaSimplePartitionInitialSort.sqlTest | 2 +- .../tests/window/windowed_long_null.sqlTest | 4 +- 24 files changed, 385 insertions(+), 379 deletions(-) create mode 100644 processing/src/test/java/org/apache/druid/query/operator/window/WindowFrameTest.java diff --git a/docs/querying/sql-window-functions.md b/docs/querying/sql-window-functions.md index d64538779f0..7c2c3aef53e 100644 --- a/docs/querying/sql-window-functions.md +++ b/docs/querying/sql-window-functions.md @@ -246,11 +246,8 @@ Druid has guardrail logic to prevent you from executing window function queries For example: - You cannot set expressions as bounds for window frames. -- You cannot use two FOLLOWING expressions in the window frame. For example: `ROWS BETWEEN 2 ROWS FOLLOWING and 3 ROWS FOLLOWING`. - You can only use a RANGE frames when both endpoints are unbounded or current row. -If you write a query that violates one of these conditions, Druid throws an error: "The query contains a window frame which may return incorrect results. To disregard this warning, set `windowingStrictValidation` to false in the query context." - ## Window function reference |Function|Notes| diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQWindowTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQWindowTest.java index afbedb7d704..5cc84ac6ee6 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQWindowTest.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQWindowTest.java @@ -106,7 +106,7 @@ public class MSQWindowTest extends MSQTestBase .build(); - final WindowFrame theFrame = new WindowFrame(WindowFrame.PeerType.ROWS, true, 0, true, 0, null); + final WindowFrame theFrame = WindowFrame.unbounded(); final AggregatorFactory[] theAggs = { new DoubleSumAggregatorFactory("w0", "d0") }; @@ -196,7 +196,7 @@ public class MSQWindowTest extends MSQTestBase .build(); - final WindowFrame theFrame = new WindowFrame(WindowFrame.PeerType.ROWS, true, 0, true, 0, null); + final WindowFrame theFrame = WindowFrame.unbounded(); final AggregatorFactory[] theAggs = { new DoubleSumAggregatorFactory("w0", "d1") }; @@ -306,7 +306,7 @@ public class MSQWindowTest extends MSQTestBase .build(); - final WindowFrame theFrame = new WindowFrame(WindowFrame.PeerType.ROWS, true, 0, true, 0, null); + final WindowFrame theFrame = WindowFrame.unbounded(); final AggregatorFactory[] theAggs = { new DoubleSumAggregatorFactory("w0", "d1") }; @@ -419,7 +419,7 @@ public class MSQWindowTest extends MSQTestBase .build(); - final WindowFrame theFrame = new WindowFrame(WindowFrame.PeerType.ROWS, true, 0, true, 0, null); + final WindowFrame theFrame = WindowFrame.unbounded(); final AggregatorFactory[] theAggs = { new DoubleSumAggregatorFactory("w0", "d0") }; @@ -523,7 +523,7 @@ public class MSQWindowTest extends MSQTestBase .build(); - final WindowFrame theFrame = new WindowFrame(WindowFrame.PeerType.ROWS, true, 0, true, 0, null); + final WindowFrame theFrame = WindowFrame.unbounded(); final AggregatorFactory[] theAggs = { new DoubleSumAggregatorFactory("w0", "d0") }; @@ -589,7 +589,7 @@ public class MSQWindowTest extends MSQTestBase .add("cc", ColumnType.DOUBLE) .build(); - final WindowFrame theFrame = new WindowFrame(WindowFrame.PeerType.ROWS, true, 0, true, 0, null); + final WindowFrame theFrame = WindowFrame.unbounded(); final AggregatorFactory[] theAggs = { new DoubleSumAggregatorFactory("w0", "m1") }; @@ -654,7 +654,7 @@ public class MSQWindowTest extends MSQTestBase .add("cc", ColumnType.DOUBLE) .build(); - final WindowFrame theFrame = new WindowFrame(WindowFrame.PeerType.ROWS, true, 0, true, 0, null); + final WindowFrame theFrame = WindowFrame.unbounded(); final AggregatorFactory[] theAggs = { new DoubleSumAggregatorFactory("w0", "m1") }; @@ -725,7 +725,7 @@ public class MSQWindowTest extends MSQTestBase .add("cc", ColumnType.DOUBLE) .build(); - final WindowFrame theFrame = new WindowFrame(WindowFrame.PeerType.ROWS, true, 0, true, 0, null); + final WindowFrame theFrame = WindowFrame.unbounded(); final AggregatorFactory[] theAggs = { new DoubleSumAggregatorFactory("w0", "m1") }; @@ -793,7 +793,7 @@ public class MSQWindowTest extends MSQTestBase .add("cc", ColumnType.DOUBLE) .build(); - final WindowFrame theFrame = new WindowFrame(WindowFrame.PeerType.ROWS, true, 0, true, 0, null); + final WindowFrame theFrame = WindowFrame.unbounded(); final AggregatorFactory[] theAggs = { new DoubleSumAggregatorFactory("w0", "d1") }; @@ -878,7 +878,7 @@ public class MSQWindowTest extends MSQTestBase .add("cc", ColumnType.DOUBLE) .build(); - final WindowFrame theFrame = new WindowFrame(WindowFrame.PeerType.ROWS, true, 0, true, 0, null); + final WindowFrame theFrame = WindowFrame.unbounded(); final AggregatorFactory[] theAggs = { new DoubleSumAggregatorFactory("w0", "m1") }; @@ -951,7 +951,7 @@ public class MSQWindowTest extends MSQTestBase .add("cc", ColumnType.DOUBLE) .build(); - final WindowFrame theFrame = new WindowFrame(WindowFrame.PeerType.ROWS, true, 0, true, 0, null); + final WindowFrame theFrame = WindowFrame.unbounded(); final AggregatorFactory[] theAggs = { new DoubleSumAggregatorFactory("w0", "m1") }; @@ -1028,17 +1028,7 @@ public class MSQWindowTest extends MSQTestBase .add("m2", ColumnType.DOUBLE) .build(); - final WindowFrame theFrame = new WindowFrame( - WindowFrame.PeerType.RANGE, - true, - 0, - false, - 0, - ImmutableList.of(new ColumnWithDirection( - "m1", - ColumnWithDirection.Direction.ASC - )) - ); + final WindowFrame theFrame = WindowFrame.forOrderBy("m1"); final AggregatorFactory[] theAggs = { new DoubleSumAggregatorFactory("w0", "m1") }; @@ -1142,14 +1132,7 @@ public class MSQWindowTest extends MSQTestBase .add("m2", ColumnType.DOUBLE) .build(); - final WindowFrame theFrame = new WindowFrame( - WindowFrame.PeerType.ROWS, - true, - 0, - true, - 0, - null - ); + final WindowFrame theFrame = WindowFrame.unbounded(); final AggregatorFactory[] theAggs = { new DoubleSumAggregatorFactory("w0", "m1") }; @@ -1233,7 +1216,7 @@ public class MSQWindowTest extends MSQTestBase .add("cc", ColumnType.DOUBLE) .build(); - final WindowFrame theFrame = new WindowFrame(WindowFrame.PeerType.ROWS, true, 0, true, 0, null); + final WindowFrame theFrame = WindowFrame.unbounded(); final AggregatorFactory[] theAggs = { new DoubleSumAggregatorFactory("w0", "m1") }; @@ -1322,14 +1305,7 @@ public class MSQWindowTest extends MSQTestBase .add("d3", ColumnType.STRING) .build(); - final WindowFrame theFrame = new WindowFrame( - WindowFrame.PeerType.ROWS, - true, - 0, - true, - 0, - null - ); + final WindowFrame theFrame = WindowFrame.unbounded(); final AggregatorFactory[] theAggs = { new DoubleSumAggregatorFactory("w0", "m1") }; @@ -1412,14 +1388,7 @@ public class MSQWindowTest extends MSQTestBase .add("d3", ColumnType.STRING) .build(); - final WindowFrame theFrame = new WindowFrame( - WindowFrame.PeerType.ROWS, - true, - 0, - true, - 0, - null - ); + final WindowFrame theFrame = WindowFrame.unbounded(); final AggregatorFactory[] theAggs = { new DoubleSumAggregatorFactory("w0", "m1") }; @@ -1796,7 +1765,7 @@ public class MSQWindowTest extends MSQTestBase .add("cc", ColumnType.LONG) .build(); - final WindowFrame theFrame = new WindowFrame(WindowFrame.PeerType.ROWS, true, 0, true, 0, null); + final WindowFrame theFrame = WindowFrame.unbounded(); final AggregatorFactory[] theAggs = { new LongSumAggregatorFactory("w0", "added") }; @@ -1887,7 +1856,7 @@ public class MSQWindowTest extends MSQTestBase .add("cc", ColumnType.LONG) .build(); - final WindowFrame theFrame = new WindowFrame(WindowFrame.PeerType.ROWS, true, 0, true, 0, null); + final WindowFrame theFrame = WindowFrame.unbounded(); final AggregatorFactory[] theAggs = { new LongSumAggregatorFactory("w0", "added") }; @@ -2001,7 +1970,7 @@ public class MSQWindowTest extends MSQTestBase .build(); - final WindowFrame theFrame = new WindowFrame(WindowFrame.PeerType.ROWS, true, 0, true, 0, null); + final WindowFrame theFrame = WindowFrame.unbounded(); final AggregatorFactory[] theAggs = { new LongSumAggregatorFactory("w0", "d1") }; diff --git a/processing/src/main/java/org/apache/druid/query/operator/window/WindowFrame.java b/processing/src/main/java/org/apache/druid/query/operator/window/WindowFrame.java index fca50c25b28..2dd827d323e 100644 --- a/processing/src/main/java/org/apache/druid/query/operator/window/WindowFrame.java +++ b/processing/src/main/java/org/apache/druid/query/operator/window/WindowFrame.java @@ -21,159 +21,192 @@ package org.apache.druid.query.operator.window; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.annotation.JsonSubTypes; +import com.fasterxml.jackson.annotation.JsonTypeInfo; +import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; -import org.apache.druid.query.operator.ColumnWithDirection; +import org.apache.druid.annotations.SubclassesMustOverrideEqualsAndHashCode; + +import javax.annotation.Nullable; -import java.util.Collections; import java.util.List; import java.util.Objects; -import java.util.stream.Collectors; -public class WindowFrame +@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type") +@JsonSubTypes(value = { + @JsonSubTypes.Type(name = "rows", value = WindowFrame.Rows.class), + @JsonSubTypes.Type(name = "groups", value = WindowFrame.Groups.class), +}) +@SubclassesMustOverrideEqualsAndHashCode +public interface WindowFrame { - public static WindowFrame unbounded() + static WindowFrame unbounded() { - return new WindowFrame(PeerType.ROWS, true, 0, true, 0, null); + return rows(null, null); } - @SuppressWarnings("unused") - public enum PeerType + static Rows rows(Integer lowerOffset, Integer upperOffset) { - ROWS, - RANGE + return new WindowFrame.Rows(lowerOffset, upperOffset); } - // Will likely need to add the order by columns to also be able to deal with RANGE peer type. - private final PeerType peerType; - private final boolean lowerUnbounded; - private final int lowerOffset; - private final boolean upperUnbounded; - private final int upperOffset; - private final List orderBy; - - @JsonCreator - public WindowFrame( - @JsonProperty("peerType") PeerType peerType, - @JsonProperty("lowUnbounded") boolean lowerUnbounded, - @JsonProperty("lowOffset") int lowerOffset, - @JsonProperty("uppUnbounded") boolean upperUnbounded, - @JsonProperty("uppOffset") int upperOffset, - @JsonProperty("orderBy") List orderBy - ) + static Groups groups(Integer lowerOffset, Integer upperOffset, List orderByColumns) { - this.peerType = peerType; - this.lowerUnbounded = lowerUnbounded; - this.lowerOffset = lowerOffset; - this.upperUnbounded = upperUnbounded; - this.upperOffset = upperOffset; - this.orderBy = orderBy; + return new WindowFrame.Groups(lowerOffset, upperOffset, orderByColumns); } - @JsonProperty("peerType") - public PeerType getPeerType() + static WindowFrame forOrderBy(String... orderByColumns) { - return peerType; + return groups(null, 0, Lists.newArrayList(orderByColumns)); } - @JsonProperty("lowUnbounded") - public boolean isLowerUnbounded() + abstract class OffsetFrame implements WindowFrame { - return lowerUnbounded; - } + @JsonProperty + public final Integer lowerOffset; + @JsonProperty + public final Integer upperOffset; - @JsonProperty("lowOffset") - public int getLowerOffset() - { - return lowerOffset; - } - - @JsonProperty("uppUnbounded") - public boolean isUpperUnbounded() - { - return upperUnbounded; - } - - @JsonProperty("uppOffset") - public int getUpperOffset() - { - return upperOffset; - } - - @JsonProperty("orderBy") - public List getOrderBy() - { - return orderBy; - } - - @Override - public boolean equals(Object o) - { - if (this == o) { - return true; + @JsonCreator + public OffsetFrame( + @JsonProperty("lowerOffset") Integer lowerOffset, + @JsonProperty("upperOffset") Integer upperOffset) + { + this.lowerOffset = lowerOffset; + this.upperOffset = upperOffset; } - if (!(o instanceof WindowFrame)) { - return false; + + @Override + public int hashCode() + { + return Objects.hash(lowerOffset, upperOffset); } - WindowFrame that = (WindowFrame) o; - return lowerUnbounded == that.lowerUnbounded - && lowerOffset == that.lowerOffset - && upperUnbounded == that.upperUnbounded - && upperOffset == that.upperOffset - && peerType == that.peerType - && Objects.equals(orderBy, that.orderBy); - } - @Override - public int hashCode() - { - return Objects.hash(peerType, lowerUnbounded, lowerOffset, upperUnbounded, upperOffset, orderBy); - } - - @Override - public String toString() - { - return "WindowFrame{" + - "peerType=" + peerType + - ", lowerUnbounded=" + lowerUnbounded + - ", lowerOffset=" + lowerOffset + - ", upperUnbounded=" + upperUnbounded + - ", upperOffset=" + upperOffset + - ", orderBy=" + orderBy + - '}'; - } - - public static WindowFrame forOrderBy(ColumnWithDirection... orderBy) - { - return new WindowFrame(PeerType.RANGE, true, 0, false, 0, Lists.newArrayList(orderBy)); - } - - public List getOrderByColNames() - { - if (orderBy == null) { - return Collections.emptyList(); + /** + * Calculates the applicable lower offset if the max number of rows is + * known. + */ + public int getLowerOffsetClamped(int maxRows) + { + if (lowerOffset == null) { + return -maxRows; + } + return Math.max(-maxRows, lowerOffset); } - return orderBy.stream().map(ColumnWithDirection::getColumn).collect(Collectors.toList()); + + /** + * Calculates the applicable upper offset if the max number of rows is + * known. + */ + public int getUpperOffsetClamped(int maxRows) + { + if (upperOffset == null) { + return maxRows; + } + return Math.min(maxRows, upperOffset); + } + + @Override + public boolean equals(Object obj) + { + if (this == obj) { + return true; + } + if (obj == null) { + return false; + } + if (getClass() != obj.getClass()) { + return false; + } + OffsetFrame other = (OffsetFrame) obj; + return Objects.equals(lowerOffset, other.lowerOffset) && Objects.equals(upperOffset, other.upperOffset); + } + + @Override + public abstract String toString(); } - /** - * Calculates the applicable lower offset if the max number of rows is known. - */ - public int getLowerOffsetClamped(int maxRows) + class Rows extends OffsetFrame { - if (lowerUnbounded) { - return -maxRows; + @JsonCreator + public Rows( + @JsonProperty("lowerOffset") Integer lowerOffset, + @JsonProperty("upperOffset") Integer upperOffset) + { + super(lowerOffset, upperOffset); + } + + @Override + public String toString() + { + return "WindowFrame.Rows [" + + "lowerOffset=" + lowerOffset + + ", upperOffset=" + upperOffset + + "]"; } - return Math.max(-maxRows, lowerOffset); } - /** - * Calculates the applicable upper offset if the max number of rows is known. - */ - public int getUpperOffsetClamped(int maxRows) + class Groups extends OffsetFrame { - if (upperUnbounded) { - return maxRows; + @JsonProperty + private final ImmutableList orderByColumns; + + @JsonCreator + public Groups( + @JsonProperty("lowerOffset") Integer lowerOffset, + @JsonProperty("upperOffset") Integer upperOffset, + @JsonProperty("orderByColumns") List orderByColumns) + { + super(lowerOffset, upperOffset); + this.orderByColumns = ImmutableList.copyOf(orderByColumns); } - return Math.min(maxRows, upperOffset); + + public List getOrderByColumns() + { + return orderByColumns; + } + + @Override + public int hashCode() + { + return Objects.hash(lowerOffset, orderByColumns, upperOffset); + } + + @Override + public boolean equals(Object obj) + { + if (this == obj) { + return true; + } + if (obj == null) { + return false; + } + if (getClass() != obj.getClass()) { + return false; + } + Groups other = (Groups) obj; + return Objects.equals(lowerOffset, other.lowerOffset) + && Objects.equals(orderByColumns, other.orderByColumns) + && Objects.equals(upperOffset, other.upperOffset); + } + + @Override + public String toString() + { + return "WindowFrame.Groups [" + + "lowerOffset=" + lowerOffset + + ", upperOffset=" + upperOffset + + ", orderByColumns=" + orderByColumns + "]"; + } + } + + @SuppressWarnings("unchecked") + @Nullable + default T unwrap(Class clazz) + { + if (clazz.isInstance(this)) { + return (T) this; + } + return null; } } diff --git a/processing/src/main/java/org/apache/druid/query/rowsandcols/semantic/DefaultFramedOnHeapAggregatable.java b/processing/src/main/java/org/apache/druid/query/rowsandcols/semantic/DefaultFramedOnHeapAggregatable.java index 83952873050..7130fafd867 100644 --- a/processing/src/main/java/org/apache/druid/query/rowsandcols/semantic/DefaultFramedOnHeapAggregatable.java +++ b/processing/src/main/java/org/apache/druid/query/rowsandcols/semantic/DefaultFramedOnHeapAggregatable.java @@ -21,6 +21,7 @@ package org.apache.druid.query.rowsandcols.semantic; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; +import org.apache.druid.error.DruidException; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.UOE; import org.apache.druid.query.aggregation.Aggregator; @@ -28,6 +29,9 @@ import org.apache.druid.query.aggregation.AggregatorFactory; import org.apache.druid.query.dimension.DimensionSpec; import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector; import org.apache.druid.query.operator.window.WindowFrame; +import org.apache.druid.query.operator.window.WindowFrame.Groups; +import org.apache.druid.query.operator.window.WindowFrame.OffsetFrame; +import org.apache.druid.query.operator.window.WindowFrame.Rows; import org.apache.druid.query.rowsandcols.RowsAndColumns; import org.apache.druid.query.rowsandcols.column.ObjectArrayColumn; import org.apache.druid.segment.ColumnSelectorFactory; @@ -106,22 +110,38 @@ public class DefaultFramedOnHeapAggregatable implements FramedOnHeapAggregatable public static Iterable buildIteratorFor(AppendableRowsAndColumns rac, WindowFrame frame) { int numRows = rac.numRows(); - if (frame.getLowerOffsetClamped(numRows) == -numRows && frame.getUpperOffsetClamped(numRows) == numRows) { - return buildUnboundedIteratorFor(rac, frame); - } else if (frame.getPeerType() == WindowFrame.PeerType.RANGE) { - return buildGroupIteratorFor(rac, frame); - } else { - return buildRowIteratorFor(rac, frame); + if (isEffectivelyUnbounded(frame, numRows)) { + return buildUnboundedIteratorFor(rac); } + Rows rowsFrame = frame.unwrap(WindowFrame.Rows.class); + if (rowsFrame != null) { + return buildRowIteratorFor(rac, rowsFrame); + } + Groups groupsFrame = frame.unwrap(WindowFrame.Groups.class); + if (groupsFrame != null) { + return buildGroupIteratorFor(rac, groupsFrame); + } + throw DruidException.defensive("Unable to handle WindowFrame [%s]!", frame); } - private static Iterable buildUnboundedIteratorFor(AppendableRowsAndColumns rac, WindowFrame frame) + private static boolean isEffectivelyUnbounded(WindowFrame frame, int numRows) { - int[] groupBoundaries = new int[]{0, rac.numRows()}; - return new GroupIteratorForWindowFrame(frame, groupBoundaries); + OffsetFrame offsetFrame = frame.unwrap(WindowFrame.OffsetFrame.class); + if (offsetFrame.getLowerOffsetClamped(numRows) == -numRows + && offsetFrame.getUpperOffsetClamped(numRows) == numRows) { + // regardless the actual mode; all rows will be inside the frame! + return true; + } + return false; } - private static Iterable buildRowIteratorFor(AppendableRowsAndColumns rac, WindowFrame frame) + private static Iterable buildUnboundedIteratorFor(AppendableRowsAndColumns rac) + { + int[] groupBoundaries = new int[] {0, rac.numRows()}; + return new GroupIteratorForWindowFrame(WindowFrame.rows(null, null), groupBoundaries); + } + + private static Iterable buildRowIteratorFor(AppendableRowsAndColumns rac, WindowFrame.Rows frame) { int[] groupBoundaries = new int[rac.numRows() + 1]; for (int j = 0; j < groupBoundaries.length; j++) { @@ -130,9 +150,9 @@ public class DefaultFramedOnHeapAggregatable implements FramedOnHeapAggregatable return new GroupIteratorForWindowFrame(frame, groupBoundaries); } - private static Iterable buildGroupIteratorFor(AppendableRowsAndColumns rac, WindowFrame frame) + private static Iterable buildGroupIteratorFor(AppendableRowsAndColumns rac, WindowFrame.Groups frame) { - int[] groupBoundaries = ClusteredGroupPartitioner.fromRAC(rac).computeBoundaries(frame.getOrderByColNames()); + int[] groupBoundaries = ClusteredGroupPartitioner.fromRAC(rac).computeBoundaries(frame.getOrderByColumns()); return new GroupIteratorForWindowFrame(frame, groupBoundaries); } @@ -145,7 +165,7 @@ public class DefaultFramedOnHeapAggregatable implements FramedOnHeapAggregatable // upper exclusive private final int upperOffset; - public GroupIteratorForWindowFrame(WindowFrame frame, int[] groupBoundaries) + public GroupIteratorForWindowFrame(WindowFrame.OffsetFrame frame, int[] groupBoundaries) { this.groupBoundaries = groupBoundaries; numGroups = groupBoundaries.length - 1; diff --git a/processing/src/test/java/org/apache/druid/query/operator/window/WindowFrameTest.java b/processing/src/test/java/org/apache/druid/query/operator/window/WindowFrameTest.java new file mode 100644 index 00000000000..855f4694f43 --- /dev/null +++ b/processing/src/test/java/org/apache/druid/query/operator/window/WindowFrameTest.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query.operator.window; + +import nl.jqno.equalsverifier.EqualsVerifier; +import org.apache.druid.query.operator.window.WindowFrame.OffsetFrame; +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +public class WindowFrameTest +{ + @Test + public void testEqualsRows() + { + EqualsVerifier.forClass(WindowFrame.Rows.class) + .usingGetClass() + .verify(); + } + + @Test + public void testEqualsGroups() + { + EqualsVerifier.forClass(WindowFrame.Groups.class) + .usingGetClass() + .verify(); + } + + @Test + public void testOffsetFrameUnbounded() + { + OffsetFrame of = new WindowFrame.Rows(null, null); + assertEquals(-100, of.getLowerOffsetClamped(100)); + assertEquals(100, of.getUpperOffsetClamped(100)); + } + + @Test + public void testOffsetFrameNormal() + { + OffsetFrame of = new WindowFrame.Rows(-1, 2); + assertEquals(-1, of.getLowerOffsetClamped(100)); + assertEquals(2, of.getUpperOffsetClamped(100)); + } + + @Test + public void testOffsetFrameUnbounded2() + { + OffsetFrame of = new WindowFrame.Rows(-200, 200); + assertEquals(-100, of.getLowerOffsetClamped(100)); + assertEquals(100, of.getUpperOffsetClamped(100)); + } + +} diff --git a/processing/src/test/java/org/apache/druid/query/operator/window/WindowFramedAggregateProcessorTest.java b/processing/src/test/java/org/apache/druid/query/operator/window/WindowFramedAggregateProcessorTest.java index 5af321b53c8..9bae78bc2cc 100644 --- a/processing/src/test/java/org/apache/druid/query/operator/window/WindowFramedAggregateProcessorTest.java +++ b/processing/src/test/java/org/apache/druid/query/operator/window/WindowFramedAggregateProcessorTest.java @@ -46,7 +46,7 @@ public class WindowFramedAggregateProcessorTest @Test public void testIsPassThruWhenRACReturnsSemanticInterface() { - final WindowFrame theFrame = new WindowFrame(WindowFrame.PeerType.ROWS, true, 0, false, 0, null); + final WindowFrame theFrame = WindowFrame.rows(null, 0); final AggregatorFactory[] theAggs = { new LongMaxAggregatorFactory("cummMax", "intCol"), new DoubleSumAggregatorFactory("cummSum", "doubleCol") @@ -78,7 +78,7 @@ public class WindowFramedAggregateProcessorTest @Test public void testDoesStuffWhenNoSemanticInterfacesAvailable() { - final WindowFrame theFrame = new WindowFrame(WindowFrame.PeerType.ROWS, true, 0, false, 0, null); + final WindowFrame theFrame = WindowFrame.rows(null, 0); final AggregatorFactory[] theAggs = { new LongSumAggregatorFactory("sum", "intCol") }; diff --git a/processing/src/test/java/org/apache/druid/query/rowsandcols/semantic/FramedOnHeapAggregatableTest.java b/processing/src/test/java/org/apache/druid/query/rowsandcols/semantic/FramedOnHeapAggregatableTest.java index d5b11f7a612..41ceb315a04 100644 --- a/processing/src/test/java/org/apache/druid/query/rowsandcols/semantic/FramedOnHeapAggregatableTest.java +++ b/processing/src/test/java/org/apache/druid/query/rowsandcols/semantic/FramedOnHeapAggregatableTest.java @@ -25,10 +25,8 @@ import org.apache.druid.query.aggregation.DoubleSumAggregatorFactory; import org.apache.druid.query.aggregation.LongMaxAggregatorFactory; import org.apache.druid.query.aggregation.LongMinAggregatorFactory; import org.apache.druid.query.aggregation.LongSumAggregatorFactory; -import org.apache.druid.query.operator.ColumnWithDirection; import org.apache.druid.query.operator.window.RowsAndColumnsHelper; import org.apache.druid.query.operator.window.WindowFrame; -import org.apache.druid.query.operator.window.WindowFrame.PeerType; import org.apache.druid.query.rowsandcols.MapOfColumnsRowsAndColumns; import org.apache.druid.query.rowsandcols.RowsAndColumns; import org.apache.druid.query.rowsandcols.column.Column; @@ -65,7 +63,7 @@ public class FramedOnHeapAggregatableTest extends SemanticTestBase FramedOnHeapAggregatable agger = FramedOnHeapAggregatable.fromRAC(rac); final RowsAndColumns results = agger.aggregateAll( - new WindowFrame(WindowFrame.PeerType.ROWS, false, 0, false, 0, null), + WindowFrame.rows(0, 0), new AggregatorFactory[]{ new LongSumAggregatorFactory("sumFromLong", "intCol"), new DoubleMaxAggregatorFactory("maxFromInt", "intCol"), @@ -91,7 +89,7 @@ public class FramedOnHeapAggregatableTest extends SemanticTestBase FramedOnHeapAggregatable agger = FramedOnHeapAggregatable.fromRAC(rac); final RowsAndColumns results = agger.aggregateAll( - new WindowFrame(WindowFrame.PeerType.ROWS, false, -1, false, 2, null), + WindowFrame.rows(-1, 2), new AggregatorFactory[]{ new LongSumAggregatorFactory("sumFromLong", "intCol"), new DoubleMaxAggregatorFactory("maxFromInt", "intCol"), @@ -117,7 +115,7 @@ public class FramedOnHeapAggregatableTest extends SemanticTestBase FramedOnHeapAggregatable agger = FramedOnHeapAggregatable.fromRAC(rac); final RowsAndColumns results = agger.aggregateAll( - new WindowFrame(WindowFrame.PeerType.ROWS, false, 0, false, 2, null), + WindowFrame.rows(0, 2), new AggregatorFactory[]{ new LongSumAggregatorFactory("sumFromLong", "intCol"), new DoubleMaxAggregatorFactory("maxFromInt", "intCol"), @@ -143,7 +141,7 @@ public class FramedOnHeapAggregatableTest extends SemanticTestBase FramedOnHeapAggregatable agger = FramedOnHeapAggregatable.fromRAC(rac); final RowsAndColumns results = agger.aggregateAll( - new WindowFrame(WindowFrame.PeerType.ROWS, false, -2, false, 0, null), + WindowFrame.rows(-2, 0), new AggregatorFactory[]{ new LongSumAggregatorFactory("sumFromLong", "intCol"), new DoubleMaxAggregatorFactory("maxFromInt", "intCol"), @@ -169,7 +167,7 @@ public class FramedOnHeapAggregatableTest extends SemanticTestBase FramedOnHeapAggregatable agger = FramedOnHeapAggregatable.fromRAC(rac); final RowsAndColumns results = agger.aggregateAll( - new WindowFrame(WindowFrame.PeerType.ROWS, false, -5, false, 7, null), + WindowFrame.rows(-5, 7), new AggregatorFactory[]{ new LongSumAggregatorFactory("sumFromLong", "intCol"), new DoubleMaxAggregatorFactory("maxFromInt", "intCol"), @@ -197,7 +195,7 @@ public class FramedOnHeapAggregatableTest extends SemanticTestBase FramedOnHeapAggregatable agger = FramedOnHeapAggregatable.fromRAC(rac); final RowsAndColumns results = agger.aggregateAll( - new WindowFrame(WindowFrame.PeerType.ROWS, false, -5, false, 1, null), + WindowFrame.rows(-5, 1), new AggregatorFactory[]{ new LongSumAggregatorFactory("sumFromLong", "intCol"), new DoubleMaxAggregatorFactory("maxFromInt", "intCol"), @@ -225,7 +223,7 @@ public class FramedOnHeapAggregatableTest extends SemanticTestBase FramedOnHeapAggregatable agger = FramedOnHeapAggregatable.fromRAC(rac); final RowsAndColumns results = agger.aggregateAll( - new WindowFrame(WindowFrame.PeerType.ROWS, false, -5, false, 0, null), + WindowFrame.rows(-5, 0), new AggregatorFactory[]{ new LongSumAggregatorFactory("sumFromLong", "intCol"), new DoubleMaxAggregatorFactory("maxFromInt", "intCol"), @@ -253,7 +251,7 @@ public class FramedOnHeapAggregatableTest extends SemanticTestBase FramedOnHeapAggregatable agger = FramedOnHeapAggregatable.fromRAC(rac); final RowsAndColumns results = agger.aggregateAll( - new WindowFrame(WindowFrame.PeerType.ROWS, false, -1, false, 7, null), + WindowFrame.rows(-1, 7), new AggregatorFactory[]{ new LongSumAggregatorFactory("sumFromLong", "intCol"), new DoubleMaxAggregatorFactory("maxFromInt", "intCol"), @@ -281,7 +279,7 @@ public class FramedOnHeapAggregatableTest extends SemanticTestBase FramedOnHeapAggregatable agger = FramedOnHeapAggregatable.fromRAC(rac); final RowsAndColumns results = agger.aggregateAll( - new WindowFrame(WindowFrame.PeerType.ROWS, false, 0, false, 7, null), + WindowFrame.rows(0, 7), new AggregatorFactory[]{ new LongSumAggregatorFactory("sumFromLong", "intCol"), new DoubleMaxAggregatorFactory("maxFromInt", "intCol"), @@ -309,7 +307,7 @@ public class FramedOnHeapAggregatableTest extends SemanticTestBase FramedOnHeapAggregatable agger = FramedOnHeapAggregatable.fromRAC(rac); final RowsAndColumns results = agger.aggregateAll( - new WindowFrame(WindowFrame.PeerType.ROWS, false, 0, false, 7, null), + WindowFrame.rows(0, 7), new AggregatorFactory[]{ new LongSumAggregatorFactory("sumFromLong", "intCol"), new DoubleMaxAggregatorFactory("maxFromInt", "intCol"), @@ -337,7 +335,7 @@ public class FramedOnHeapAggregatableTest extends SemanticTestBase FramedOnHeapAggregatable agger = FramedOnHeapAggregatable.fromRAC(rac); final RowsAndColumns results = agger.aggregateAll( - new WindowFrame(WindowFrame.PeerType.ROWS, false, -5, false, 0, null), + WindowFrame.rows(-5, 0), new AggregatorFactory[]{ new LongSumAggregatorFactory("sumFromLong", "intCol"), new DoubleMaxAggregatorFactory("maxFromInt", "intCol"), @@ -371,7 +369,7 @@ public class FramedOnHeapAggregatableTest extends SemanticTestBase FramedOnHeapAggregatable agger = FramedOnHeapAggregatable.fromRAC(rac); final RowsAndColumns results = agger.aggregateAll( - new WindowFrame(WindowFrame.PeerType.ROWS, true, 0, true, 0, null), + WindowFrame.unbounded(), new AggregatorFactory[]{ new LongSumAggregatorFactory("sumFromLong", "intCol"), new LongSumAggregatorFactory("sumFromDouble", "doubleCol"), @@ -409,7 +407,7 @@ public class FramedOnHeapAggregatableTest extends SemanticTestBase FramedOnHeapAggregatable agger = FramedOnHeapAggregatable.fromRAC(rac); final RowsAndColumns results = agger.aggregateAll( - new WindowFrame(WindowFrame.PeerType.ROWS, true, 0, false, 0, null), + WindowFrame.rows(null, 0), new AggregatorFactory[]{ new LongMaxAggregatorFactory("cummMax", "intCol"), new DoubleSumAggregatorFactory("cummSum", "doubleCol") @@ -443,7 +441,7 @@ public class FramedOnHeapAggregatableTest extends SemanticTestBase FramedOnHeapAggregatable agger = FramedOnHeapAggregatable.fromRAC(rac); final RowsAndColumns results = agger.aggregateAll( - new WindowFrame(WindowFrame.PeerType.ROWS, false, 0, true, 0, null), + WindowFrame.rows(0, null), new AggregatorFactory[]{ new LongMaxAggregatorFactory("cummMax", "intCol"), new DoubleSumAggregatorFactory("cummSum", "doubleCol") @@ -465,7 +463,7 @@ public class FramedOnHeapAggregatableTest extends SemanticTestBase @Test public void testRangeOrderBy() { - WindowFrame frame = WindowFrame.forOrderBy(ColumnWithDirection.ascending("c1")); + WindowFrame frame = WindowFrame.forOrderBy("c1"); int[] c1Vals = new int[] {0, 0, 0, 1, 1, 1, 2, 2, 2, 2}; int[] c2Vals = new int[] {1, 1, 2, 1, 1, 2, 1, 1, 1, 2}; int[] resVals = new int[] {4, 4, 4, 8, 8, 8, 13, 13, 13, 13}; @@ -476,14 +474,7 @@ public class FramedOnHeapAggregatableTest extends SemanticTestBase @Test public void testRangeB1() { - WindowFrame frame = new WindowFrame( - PeerType.RANGE, - false, - -1, - false, - 0, - Collections.singletonList(ColumnWithDirection.ascending("c1")) - ); + WindowFrame frame = WindowFrame.groups(-1, 0, Collections.singletonList("c1")); int[] c1Vals = new int[] {0, 1, 2, 2, 3, 4, 5}; int[] c2Vals = new int[] {0, 1, 1, 1, 3, 4, 5}; @@ -495,14 +486,7 @@ public class FramedOnHeapAggregatableTest extends SemanticTestBase @Test public void testRangeA1() { - WindowFrame frame = new WindowFrame( - PeerType.RANGE, - false, - 0, - false, - 1, - Collections.singletonList(ColumnWithDirection.ascending("c1")) - ); + WindowFrame frame = WindowFrame.groups(0, 1, Collections.singletonList("c1")); int[] c1Vals = new int[] {0, 1, 2, 2, 3, 4, 5}; int[] c2Vals = new int[] {0, 1, 1, 1, 3, 4, 5}; @@ -514,14 +498,7 @@ public class FramedOnHeapAggregatableTest extends SemanticTestBase @Test public void testRangeB1A1() { - WindowFrame frame = new WindowFrame( - PeerType.RANGE, - false, - -1, - false, - 1, - Collections.singletonList(ColumnWithDirection.ascending("c1")) - ); + WindowFrame frame = WindowFrame.groups(-1, 1, Collections.singletonList("c1")); int[] c1Vals = new int[] {0, 1, 2, 3, 4, 5}; int[] c2Vals = new int[] {0, 1, 2, 3, 4, 5}; @@ -534,14 +511,7 @@ public class FramedOnHeapAggregatableTest extends SemanticTestBase @Test public void testRangeB1A1_2() { - WindowFrame frame = new WindowFrame( - PeerType.RANGE, - false, - -1, - false, - 1, - Collections.singletonList(ColumnWithDirection.ascending("c1")) - ); + WindowFrame frame = WindowFrame.groups(-1, 1, Collections.singletonList("c1")); int[] c1Vals = new int[] {0, 0, 1, 2, 3, 3, 4, 4, 5}; int[] c2Vals = new int[] {0, 0, 1, 2, 2, 1, 2, 2, 5}; @@ -553,14 +523,7 @@ public class FramedOnHeapAggregatableTest extends SemanticTestBase @Test public void testRangeB1A2() { - WindowFrame frame = new WindowFrame( - PeerType.RANGE, - false, - -1, - false, - 2, - Collections.singletonList(ColumnWithDirection.ascending("c1")) - ); + WindowFrame frame = WindowFrame.groups(-1, 2, Collections.singletonList("c1")); int[] c1Vals = new int[] {0, 0, 0, 1, 1, 1, 2, 2, 2, 2, 3, 3, 3}; int[] c2Vals = new int[] {1, 1, 2, 1, 1, 2, 1, 1, 1, 2, 1, 1, 1}; diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/rel/Windowing.java b/sql/src/main/java/org/apache/druid/sql/calcite/rel/Windowing.java index 39c180530f0..8e1fc3ee275 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/rel/Windowing.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/rel/Windowing.java @@ -449,19 +449,22 @@ public class Windowing if (group.lowerBound.isUnbounded() && group.upperBound.isUnbounded()) { return WindowFrame.unbounded(); } - return new WindowFrame( - group.isRows ? WindowFrame.PeerType.ROWS : WindowFrame.PeerType.RANGE, - group.lowerBound.isUnbounded(), - figureOutOffset(group.lowerBound), - group.upperBound.isUnbounded(), - figureOutOffset(group.upperBound), - group.isRows ? null : getOrdering() - ); + if (group.isRows) { + return WindowFrame.rows(getBoundAsInteger(group.lowerBound), getBoundAsInteger(group.upperBound)); + } else { + /* Right now we support GROUPS based framing in the native layer; + * but the SQL layer doesn't accept that as of now. + */ + return WindowFrame.groups(getBoundAsInteger(group.lowerBound), getBoundAsInteger(group.upperBound), getOrderingColumNames()); + } } - private int figureOutOffset(RexWindowBound bound) + private Integer getBoundAsInteger(RexWindowBound bound) { - if (bound.isUnbounded() || bound.isCurrentRow()) { + if (bound.isUnbounded()) { + return null; + } + if (bound.isCurrentRow()) { return 0; } diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java index 7b5749bd8a1..5ceba91eb37 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java @@ -15900,7 +15900,7 @@ public class CalciteQueryTest extends BaseCalciteQueryTest OperatorFactoryBuilders.naivePartitionOperator(), OperatorFactoryBuilders.windowOperators( OperatorFactoryBuilders.framedAggregateProcessor( - WindowFrame.forOrderBy(ColumnWithDirection.ascending("d0")), + WindowFrame.forOrderBy("d0"), new LongSumAggregatorFactory("w0", "a0") ) ) diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteWindowQueryTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteWindowQueryTest.java index 9fdd73fb9c7..165b4aa3f63 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteWindowQueryTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteWindowQueryTest.java @@ -45,8 +45,10 @@ import org.junit.jupiter.params.provider.MethodSource; import java.io.File; import java.net.URL; import java.util.Arrays; +import java.util.Collections; import java.util.List; import java.util.Locale; +import java.util.Map; import java.util.Objects; import static org.junit.Assert.assertEquals; @@ -66,6 +68,11 @@ public class CalciteWindowQueryTest extends BaseCalciteQueryTest private static final ObjectMapper YAML_JACKSON = new DefaultObjectMapper(new YAMLFactory(), "tests"); + private static final Map DEFAULT_QUERY_CONTEXT = ImmutableMap.of( + PlannerContext.CTX_ENABLE_WINDOW_FNS, true, + QueryContexts.ENABLE_DEBUG, true + ); + public static Object[] parametersForWindowQueryTest() throws Exception { final URL windowFolderUrl = ClassLoader.getSystemResource("calcite/tests/window"); @@ -184,6 +191,11 @@ public class CalciteWindowQueryTest extends BaseCalciteQueryTest log.info("Actual results:\n%s", sb.toString()); } } + + public Map getQueryContext() + { + return input.queryContext == null ? Collections.emptyMap() : input.queryContext; + } } @MethodSource("parametersForWindowQueryTest") @@ -199,32 +211,11 @@ public class CalciteWindowQueryTest extends BaseCalciteQueryTest testBuilder() .skipVectorize(true) .sql(testCase.getSql()) - .queryContext(ImmutableMap.of( - PlannerContext.CTX_ENABLE_WINDOW_FNS, true, - QueryContexts.ENABLE_DEBUG, true - )) - .addCustomVerification(QueryVerification.ofResults(testCase)) - .run(); - } - } - - @MethodSource("parametersForWindowQueryTest") - @ParameterizedTest(name = "{0}") - @SuppressWarnings("unchecked") - public void windowQueryTestWithCustomContextMaxSubqueryBytes(String filename) throws Exception - { - TestCase testCase = new TestCase(filename); - - assumeTrue(testCase.getType() != TestType.failingTest); - - if (testCase.getType() == TestType.operatorValidation) { - testBuilder() - .skipVectorize(true) - .sql(testCase.getSql()) - .queryContext(ImmutableMap.of(QueryContexts.ENABLE_DEBUG, true, - PlannerContext.CTX_ENABLE_WINDOW_FNS, true, - QueryContexts.MAX_SUBQUERY_BYTES_KEY, "100000" - ) + .queryContext( + ImmutableMap.builder() + .putAll(DEFAULT_QUERY_CONTEXT) + .putAll(testCase.getQueryContext()) + .build() ) .addCustomVerification(QueryVerification.ofResults(testCase)) .run(); @@ -241,10 +232,7 @@ public class CalciteWindowQueryTest extends BaseCalciteQueryTest + "where countryName in ('Austria', 'Republic of Korea') " + "and (cityName in ('Vienna', 'Seoul') or cityName is null)\n" + "group by countryName, cityName, channel") - .queryContext(ImmutableMap.of( - PlannerContext.CTX_ENABLE_WINDOW_FNS, true, - QueryContexts.ENABLE_DEBUG, true - )) + .queryContext(DEFAULT_QUERY_CONTEXT) .expectedResults( ResultMatchMode.RELAX_NULLS, ImmutableList.of( @@ -277,9 +265,13 @@ public class CalciteWindowQueryTest extends BaseCalciteQueryTest failingTest, operatorValidation } + @JsonProperty public TestType type; + @JsonProperty + public Map queryContext; + @JsonProperty public String sql; diff --git a/sql/src/test/resources/calcite/tests/window/WindowOpReorder.sqlTest b/sql/src/test/resources/calcite/tests/window/WindowOpReorder.sqlTest index 0c9d88b5041..a2f82ff2905 100644 --- a/sql/src/test/resources/calcite/tests/window/WindowOpReorder.sqlTest +++ b/sql/src/test/resources/calcite/tests/window/WindowOpReorder.sqlTest @@ -15,13 +15,7 @@ expectedOperators: - type: "window" processor: type: "framedAgg" - frame: - peerType: "ROWS" - lowUnbounded: true - lowOffset: 0 - uppUnbounded: true - uppOffset: 0 - orderBy: null + frame: { type: "rows" } aggregations: - { "type": "doubleSum", "name": "w1", "fieldName": "_d1" } - type: "naiveSort" @@ -33,13 +27,7 @@ expectedOperators: - type: "window" processor: type: "framedAgg" - frame: - peerType: "ROWS" - lowUnbounded: true - lowOffset: 0 - uppUnbounded: true - uppOffset: 0 - orderBy: null + frame: { type: "rows" } aggregations: - { "type": "doubleSum", "name": "w0", "fieldName": "_d0" } expectedResults: diff --git a/sql/src/test/resources/calcite/tests/window/aggregateConstant.sqlTest b/sql/src/test/resources/calcite/tests/window/aggregateConstant.sqlTest index 16dbe924fdb..e65e27e8794 100644 --- a/sql/src/test/resources/calcite/tests/window/aggregateConstant.sqlTest +++ b/sql/src/test/resources/calcite/tests/window/aggregateConstant.sqlTest @@ -13,7 +13,7 @@ expectedOperators: - type: "window" processor: type: "framedAgg" - frame: { peerType: "ROWS", lowUnbounded: true, lowOffset: 0, uppUnbounded: true, uppOffset: 0 } + frame: { type: "rows" } aggregations: - { type: "count", name: "w0" } diff --git a/sql/src/test/resources/calcite/tests/window/defaultBoundCurrentRow.sqlTest b/sql/src/test/resources/calcite/tests/window/defaultBoundCurrentRow.sqlTest index d5a324c9e2d..aa0a4a2a019 100644 --- a/sql/src/test/resources/calcite/tests/window/defaultBoundCurrentRow.sqlTest +++ b/sql/src/test/resources/calcite/tests/window/defaultBoundCurrentRow.sqlTest @@ -19,60 +19,43 @@ expectedOperators: processor: type: "framedAgg" frame: - peerType: "ROWS" - lowUnbounded: true - lowOffset: 0 - uppUnbounded: false - uppOffset: 0 - orderBy: null + type: "rows" + upperOffset: 0 aggregations: - { type: "count", name: "w0" } - type: "window" processor: type: "framedAgg" frame: - peerType: "ROWS" - lowUnbounded: false - lowOffset: -1 - uppUnbounded: false - uppOffset: 0 - orderBy: null + type: "rows" + lowerOffset: -1 + upperOffset: 0 aggregations: - { type: "count", name: "w1" } - type: "window" processor: type: "framedAgg" frame: - peerType: "ROWS" - lowUnbounded: false - lowOffset: 0 - uppUnbounded: false - uppOffset: 0 - orderBy: null + type: "rows" + lowerOffset: 0 + upperOffset: 0 aggregations: - { type: "count", name: "w2" } - type: "window" processor: type: "framedAgg" frame: - peerType: "ROWS" - lowUnbounded: false - lowOffset: 0 - uppUnbounded: false - uppOffset: 1 - orderBy: null + type: "rows" + lowerOffset: 0 + upperOffset: 1 aggregations: - { type: "count", name: "w3" } - type: "window" processor: type: "framedAgg" frame: - peerType: "ROWS" - lowUnbounded: false - lowOffset: 0 - uppUnbounded: true - uppOffset: 0 - orderBy: null + type: "rows" + lowerOffset: 0 aggregations: - { type: "count", name: "w4" } diff --git a/sql/src/test/resources/calcite/tests/window/no_grouping.sqlTest b/sql/src/test/resources/calcite/tests/window/no_grouping.sqlTest index 7c9dae4aad3..2b6f7f7fddb 100644 --- a/sql/src/test/resources/calcite/tests/window/no_grouping.sqlTest +++ b/sql/src/test/resources/calcite/tests/window/no_grouping.sqlTest @@ -1,9 +1,9 @@ -type: "failingTest" +type: "operatorValidation" sql: | SELECT m1, - COUNT(m1) OVER () cc + SUM(m1) OVER () cc FROM druid.foo expectedOperators: @@ -12,18 +12,16 @@ expectedOperators: - type: "window" processor: type: "framedAgg" - frame: { peerType: "ROWS", lowUnbounded: true, lowOffset: 0, uppUnbounded: true, uppOffset: 0 } + frame: { type: "rows" } aggregations: - - type: "filtered" - aggregator: {"type":"count","name":"w0"} - filter: - type: not - field: {"type":"null","column":"m1"} - name: null + - type: doubleSum + name: w0 + fieldName: m1 + expectedResults: - - [1.0,6] - - [2.0,6] - - [3.0,6] - - [4.0,6] - - [5.0,6] - - [6.0,6] + - [1.0,21.0] + - [2.0,21.0] + - [3.0,21.0] + - [4.0,21.0] + - [5.0,21.0] + - [6.0,21.0] diff --git a/sql/src/test/resources/calcite/tests/window/no_grouping2.sqlTest b/sql/src/test/resources/calcite/tests/window/no_grouping2.sqlTest index 7a579c3fc33..4d78b197e31 100644 --- a/sql/src/test/resources/calcite/tests/window/no_grouping2.sqlTest +++ b/sql/src/test/resources/calcite/tests/window/no_grouping2.sqlTest @@ -12,7 +12,7 @@ expectedOperators: - type: "window" processor: type: "framedAgg" - frame: { peerType: "ROWS", lowUnbounded: true, lowOffset: 0, uppUnbounded: true, uppOffset: 0 } + frame: { type: rows } aggregations: - type: "doubleSum" name: "w0" diff --git a/sql/src/test/resources/calcite/tests/window/rank_handling.sqlTest b/sql/src/test/resources/calcite/tests/window/rank_handling.sqlTest index 0e66ed87460..1e4de22dfca 100644 --- a/sql/src/test/resources/calcite/tests/window/rank_handling.sqlTest +++ b/sql/src/test/resources/calcite/tests/window/rank_handling.sqlTest @@ -1,5 +1,8 @@ type: "operatorValidation" +queryContext: + maxSubqueryBytes: 100000 + sql: | SELECT __time diff --git a/sql/src/test/resources/calcite/tests/window/simpleSum.sqlTest b/sql/src/test/resources/calcite/tests/window/simpleSum.sqlTest index 9ca9f88e850..84bd5ca71af 100644 --- a/sql/src/test/resources/calcite/tests/window/simpleSum.sqlTest +++ b/sql/src/test/resources/calcite/tests/window/simpleSum.sqlTest @@ -14,12 +14,9 @@ expectedOperators: processor: type: "framedAgg" frame: - peerType: "RANGE" - lowUnbounded: true - lowOffset: 0 - uppUnbounded: false - uppOffset: 0 - orderBy: [ {column: "d0", direction: ASC} ] + type: groups + upperOffset: 0 + orderByColumns: [ "d0" ] aggregations: - { type: "longSum", name: "w0", fieldName: "a0" } diff --git a/sql/src/test/resources/calcite/tests/window/virtualColumns.sqlTest b/sql/src/test/resources/calcite/tests/window/virtualColumns.sqlTest index 0a86a691e26..9b5aa6e1365 100644 --- a/sql/src/test/resources/calcite/tests/window/virtualColumns.sqlTest +++ b/sql/src/test/resources/calcite/tests/window/virtualColumns.sqlTest @@ -19,7 +19,7 @@ expectedOperators: - type: "window" processor: type: "framedAgg" - frame: { peerType: "ROWS", lowUnbounded: true, lowOffset: 0, uppUnbounded: true, uppOffset: 0 } + frame: { type: "rows" } aggregations: - { type: "doubleMin", name: "w0", fieldName: "_v0" } - { type: "longMin", name: "w1", fieldName: "v1" } diff --git a/sql/src/test/resources/calcite/tests/window/wikipediaAggregationsMultipleOrdering.sqlTest b/sql/src/test/resources/calcite/tests/window/wikipediaAggregationsMultipleOrdering.sqlTest index cc59868482a..b4ef8006ea9 100644 --- a/sql/src/test/resources/calcite/tests/window/wikipediaAggregationsMultipleOrdering.sqlTest +++ b/sql/src/test/resources/calcite/tests/window/wikipediaAggregationsMultipleOrdering.sqlTest @@ -16,12 +16,9 @@ expectedOperators: processor: type: "framedAgg" frame: - peerType: "ROWS" - lowUnbounded: false - lowOffset: -3 - uppUnbounded: false - uppOffset: 2 - orderBy: null + type: rows + lowerOffset: -3 + upperOffset: 2 aggregations: - { type: "longSum", name: "w0", fieldName: "a0" } - { type: "naiveSort", columns: [ { column: "d1", direction: "ASC" }, { column: "a0", direction: "ASC"} ]} diff --git a/sql/src/test/resources/calcite/tests/window/wikipediaCumulativeOrdered.sqlTest b/sql/src/test/resources/calcite/tests/window/wikipediaCumulativeOrdered.sqlTest index 9368f00e9b4..ebcc060eaa5 100644 --- a/sql/src/test/resources/calcite/tests/window/wikipediaCumulativeOrdered.sqlTest +++ b/sql/src/test/resources/calcite/tests/window/wikipediaCumulativeOrdered.sqlTest @@ -39,14 +39,9 @@ expectedOperators: - { "type": "cumeDist", "group": [ "a0" ], "outputColumn": "w9" } - type: "framedAgg" frame: - peerType: "RANGE" - lowUnbounded: true - lowOffset: 0 - uppUnbounded: false - uppOffset: 0 - orderBy: - - column: a0 - direction: ASC + type: groups + upperOffset: 0 + orderByColumns: [ a0 ] aggregations: - { "type": "longSum", "name": "w0", "fieldName": "a0" } diff --git a/sql/src/test/resources/calcite/tests/window/wikipediaFramedAggregations.sqlTest b/sql/src/test/resources/calcite/tests/window/wikipediaFramedAggregations.sqlTest index c25f1ff0352..87873d44c48 100644 --- a/sql/src/test/resources/calcite/tests/window/wikipediaFramedAggregations.sqlTest +++ b/sql/src/test/resources/calcite/tests/window/wikipediaFramedAggregations.sqlTest @@ -15,11 +15,9 @@ expectedOperators: processor: type: "framedAgg" frame: - peerType: "ROWS" - lowUnbounded: false - lowOffset: -3 - uppUnbounded: false - uppOffset: 2 + type: "rows" + lowerOffset: -3 + upperOffset: 2 orderBy: null aggregations: - { type: "longSum", name: "w0", fieldName: "a0" } diff --git a/sql/src/test/resources/calcite/tests/window/wikipediaSimplePartition.sqlTest b/sql/src/test/resources/calcite/tests/window/wikipediaSimplePartition.sqlTest index 1e75e69b97b..3843519aa79 100644 --- a/sql/src/test/resources/calcite/tests/window/wikipediaSimplePartition.sqlTest +++ b/sql/src/test/resources/calcite/tests/window/wikipediaSimplePartition.sqlTest @@ -19,7 +19,7 @@ expectedOperators: - type: "window" processor: type: "framedAgg" - frame: { peerType: "ROWS", lowUnbounded: true, lowOffset: 0, uppUnbounded: true, uppOffset: 0 } + frame: { type: "rows" } aggregations: - { "type": "longSum", "name": "w0", "fieldName": "a0" } - type: "window" diff --git a/sql/src/test/resources/calcite/tests/window/wikipediaSimplePartitionInitialSort.sqlTest b/sql/src/test/resources/calcite/tests/window/wikipediaSimplePartitionInitialSort.sqlTest index d310f6a8f1c..4939057621e 100644 --- a/sql/src/test/resources/calcite/tests/window/wikipediaSimplePartitionInitialSort.sqlTest +++ b/sql/src/test/resources/calcite/tests/window/wikipediaSimplePartitionInitialSort.sqlTest @@ -28,7 +28,7 @@ expectedOperators: - { "type": "last", "inputColumn": "a0", "outputColumn": "w2" } - { "type": "percentile", "outputColumn": "w3", "numBuckets": 3 } - type: "framedAgg" - frame: { peerType: "ROWS", lowUnbounded: true, lowOffset: 0, uppUnbounded: true, uppOffset: 0 } + frame: { type: "rows" } aggregations: - { "type": "longSum", "name": "w0", "fieldName": "a0" } - type: "window" diff --git a/sql/src/test/resources/calcite/tests/window/windowed_long_null.sqlTest b/sql/src/test/resources/calcite/tests/window/windowed_long_null.sqlTest index c96b979c0da..7c7fd03c3c8 100644 --- a/sql/src/test/resources/calcite/tests/window/windowed_long_null.sqlTest +++ b/sql/src/test/resources/calcite/tests/window/windowed_long_null.sqlTest @@ -19,7 +19,7 @@ expectedOperators: - type: "window" processor: type: "framedAgg" - frame: { peerType: "ROWS", lowUnbounded: true, lowOffset: 0, uppUnbounded: true, uppOffset: 0 } + frame: { type: rows } aggregations: - { type: "longMin", name: "w0", fieldName: "l2" } - type: "naiveSort" @@ -31,7 +31,7 @@ expectedOperators: - type: "window" processor: type: "framedAgg" - frame: { peerType: "RANGE", lowUnbounded: true, lowOffset: 0, uppUnbounded: false, uppOffset: 0, orderBy: [{ column: l1, direction: ASC }] } + frame: { type: groups, upperOffset: 0, orderByColumns: [ l1 ] } aggregations: - { type: "longMin", name: "w1", fieldName: "l2" }