diff --git a/extensions-core/stats/src/test/java/org/apache/druid/query/aggregation/variance/sql/VarianceSqlAggregatorTest.java b/extensions-core/stats/src/test/java/org/apache/druid/query/aggregation/variance/sql/VarianceSqlAggregatorTest.java index 687e26e9ff2..9c62e2969ca 100644 --- a/extensions-core/stats/src/test/java/org/apache/druid/query/aggregation/variance/sql/VarianceSqlAggregatorTest.java +++ b/extensions-core/stats/src/test/java/org/apache/druid/query/aggregation/variance/sql/VarianceSqlAggregatorTest.java @@ -29,6 +29,7 @@ import org.apache.druid.data.input.impl.DimensionsSpec; import org.apache.druid.data.input.impl.DoubleDimensionSchema; import org.apache.druid.data.input.impl.FloatDimensionSchema; import org.apache.druid.data.input.impl.LongDimensionSchema; +import org.apache.druid.error.DruidException; import org.apache.druid.guice.DruidInjectorBuilder; import org.apache.druid.java.util.common.granularity.Granularities; import org.apache.druid.math.expr.ExprMacroTable; @@ -64,6 +65,7 @@ import org.apache.druid.sql.calcite.SqlTestFrameworkConfig; import org.apache.druid.sql.calcite.TempDirProducer; import org.apache.druid.sql.calcite.filtration.Filtration; import org.apache.druid.sql.calcite.planner.PlannerContext; +import org.apache.druid.sql.calcite.run.EngineFeature; import org.apache.druid.sql.calcite.util.CalciteTests; import org.apache.druid.sql.calcite.util.SqlTestFramework.StandardComponentSupplier; import org.apache.druid.sql.calcite.util.TestDataBuilder; @@ -73,6 +75,9 @@ import org.junit.jupiter.api.Test; import java.util.List; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; + @SqlTestFrameworkConfig.ComponentSupplier(VarianceComponentSupplier.class) public class VarianceSqlAggregatorTest extends BaseCalciteQueryTest { @@ -724,4 +729,23 @@ public class VarianceSqlAggregatorTest extends BaseCalciteQueryTest )) .run(); } + + @Test + public void testStddevNotSupportedOverWindow() + { + assumeFeatureAvailable(EngineFeature.WINDOW_FUNCTIONS); + + DruidException e = assertThrows( + DruidException.class, + () -> testBuilder() + .queryContext(ImmutableMap.of(PlannerContext.CTX_ENABLE_WINDOW_FNS, true)) + .sql("SELECT stddev(m1) OVER () from numfoo") + .run() + ); + + assertEquals( + "Query could not be planned. A possible reason is [Aggregation [STDDEV] is currently not supported for window functions]", + e.getMessage() + ); + } } diff --git a/processing/src/main/java/org/apache/druid/query/rowsandcols/ArrayListRowsAndColumns.java b/processing/src/main/java/org/apache/druid/query/rowsandcols/ArrayListRowsAndColumns.java index 04f9eddbff0..05b9dee5458 100644 --- a/processing/src/main/java/org/apache/druid/query/rowsandcols/ArrayListRowsAndColumns.java +++ b/processing/src/main/java/org/apache/druid/query/rowsandcols/ArrayListRowsAndColumns.java @@ -212,15 +212,14 @@ public class ArrayListRowsAndColumns implements AppendableRowsAndColumn @Override public void addColumn(String name, Column column) { - if (rows.size() == numRows()) { + if (rows.size() == numRows() && column.as(ColumnValueSwapper.class) != null) { extraColumns.put(name, column); columnNames.add(name); return; } // When an ArrayListRowsAndColumns is only a partial view, but adds a column, it believes that the same column - // will eventually be added for all of the rows so we pre-allocate storage for the entire set of data and - // copy. + // will eventually be added for all the rows so we pre-allocate storage for the entire set of data and copy. final ColumnAccessor columnAccessor = column.toAccessor(); if (columnAccessor.numRows() != numRows()) { diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/planner/DruidSqlValidator.java b/sql/src/main/java/org/apache/druid/sql/calcite/planner/DruidSqlValidator.java index 75778daf559..18638b32afd 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/planner/DruidSqlValidator.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/planner/DruidSqlValidator.java @@ -36,6 +36,7 @@ import org.apache.calcite.sql.SqlNode; import org.apache.calcite.sql.SqlNodeList; import org.apache.calcite.sql.SqlOperatorTable; import org.apache.calcite.sql.SqlSelect; +import org.apache.calcite.sql.SqlSelectKeyword; import org.apache.calcite.sql.SqlUpdate; import org.apache.calcite.sql.SqlUtil; import org.apache.calcite.sql.SqlWindow; @@ -110,6 +111,10 @@ public class DruidSqlValidator extends BaseDruidSqlValidator @Override public void validateWindow(SqlNode windowOrId, SqlValidatorScope scope, @Nullable SqlCall call) { + if (isSqlCallDistinct(call)) { + throw buildCalciteContextException("DISTINCT is not supported for window functions", windowOrId); + } + final SqlWindow targetWindow; switch (windowOrId.getKind()) { case IDENTIFIER: @@ -857,4 +862,11 @@ public class DruidSqlValidator extends BaseDruidSqlValidator } return src; } + + private boolean isSqlCallDistinct(@Nullable SqlCall call) + { + return call != null + && call.getFunctionQuantifier() != null + && call.getFunctionQuantifier().getValue() == SqlSelectKeyword.DISTINCT; + } } diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/rel/Windowing.java b/sql/src/main/java/org/apache/druid/sql/calcite/rel/Windowing.java index afd775ef4ee..39c180530f0 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/rel/Windowing.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/rel/Windowing.java @@ -160,15 +160,13 @@ public class Windowing Collections.emptyList(), aggName, aggregateCall, - false // Windowed aggregations don't currently finalize. This means that sketches won't work as expected. + false // Windowed aggregations finalize later when we write the computed value to result RAC ); if (aggregation == null || aggregation.getPostAggregator() != null || aggregation.getAggregatorFactories().size() != 1) { - if (null == plannerContext.getPlanningError()) { - plannerContext.setPlanningError("Aggregation [%s] is not supported", aggregateCall); - } + plannerContext.setPlanningError("Aggregation [%s] is currently not supported for window functions", aggregateCall.getAggregation().getName()); throw new CannotBuildQueryException(window, aggregateCall); } diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java index 0ead2f248aa..29751279339 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java @@ -15562,6 +15562,22 @@ public class CalciteQueryTest extends BaseCalciteQueryTest assertThat(e, invalidSqlContains("Framing of NTILE is not supported")); } + @Test + public void testDistinctNotSupportedWithWindow() + { + assumeFeatureAvailable(EngineFeature.WINDOW_FUNCTIONS); + + DruidException e = assertThrows( + DruidException.class, + () -> testBuilder() + .queryContext(ImmutableMap.of(PlannerContext.CTX_ENABLE_WINDOW_FNS, true)) + .sql("SELECT count(distinct dim1) OVER () from druid.foo") + .run() + ); + + assertThat(e, invalidSqlContains("DISTINCT is not supported for window functions")); + } + @Test public void testInGroupByLimitOutGroupByOrderBy() { diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteWindowQueryTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteWindowQueryTest.java index ab20925107f..9fdd73fb9c7 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteWindowQueryTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteWindowQueryTest.java @@ -231,95 +231,6 @@ public class CalciteWindowQueryTest extends BaseCalciteQueryTest } } - @Test - public void testEmptyWindowInSubquery() - { - testBuilder() - .sql( - "select c from (\n" - + " select channel, row_number() over () as c\n" - + " from wikipedia\n" - + " group by channel\n" - + ") LIMIT 5" - ) - .queryContext(ImmutableMap.of( - PlannerContext.CTX_ENABLE_WINDOW_FNS, true, - QueryContexts.ENABLE_DEBUG, true - )) - .expectedResults(ImmutableList.of( - new Object[]{1L}, - new Object[]{2L}, - new Object[]{3L}, - new Object[]{4L}, - new Object[]{5L} - )) - .run(); - } - - @Test - public void testWindow() - { - testBuilder() - .sql("SELECT\n" + - "(rank() over (order by count(*) desc)),\n" + - "(rank() over (order by count(*) desc))\n" + - "FROM \"wikipedia\"") - .queryContext(ImmutableMap.of( - PlannerContext.CTX_ENABLE_WINDOW_FNS, true, - QueryContexts.ENABLE_DEBUG, true - )) - .expectedResults(ImmutableList.of( - new Object[]{1L, 1L} - )) - .run(); - } - - @Test - public void testWindowAllBoundsCombination() - { - testBuilder() - .sql("select\n" - + "cityName,\n" - + "count(*) over (partition by cityName order by countryName rows between unbounded preceding and 1 preceding) c1,\n" - + "count(*) over (partition by cityName order by countryName rows between unbounded preceding and current row) c2,\n" - + "count(*) over (partition by cityName order by countryName rows between unbounded preceding and 1 following) c3,\n" - + "count(*) over (partition by cityName order by countryName rows between unbounded preceding and unbounded following) c4,\n" - + "count(*) over (partition by cityName order by countryName rows between 3 preceding and 1 preceding) c5,\n" - + "count(*) over (partition by cityName order by countryName rows between 1 preceding and current row) c6,\n" - + "count(*) over (partition by cityName order by countryName rows between 1 preceding and 1 FOLLOWING) c7,\n" - + "count(*) over (partition by cityName order by countryName rows between 1 preceding and unbounded FOLLOWING) c8,\n" - + "count(*) over (partition by cityName order by countryName rows between 1 FOLLOWING and unbounded FOLLOWING) c9,\n" - + "count(*) over (partition by cityName order by countryName rows between 1 FOLLOWING and 3 FOLLOWING) c10,\n" - + "count(*) over (partition by cityName order by countryName rows between current row and 1 following) c11,\n" - + "count(*) over (partition by cityName order by countryName rows between current row and unbounded following) c12\n" - + "from wikipedia\n" - + "where cityName in ('Vienna', 'Seoul')\n" - + "group by countryName, cityName, added") - .queryContext(ImmutableMap.of( - PlannerContext.CTX_ENABLE_WINDOW_FNS, true, - QueryContexts.ENABLE_DEBUG, true - )) - .expectedResults(ImmutableList.of( - new Object[]{"Seoul", 0L, 1L, 2L, 13L, 0L, 1L, 2L, 13L, 12L, 3L, 2L, 13L}, - new Object[]{"Seoul", 1L, 2L, 3L, 13L, 1L, 2L, 3L, 13L, 11L, 3L, 2L, 12L}, - new Object[]{"Seoul", 2L, 3L, 4L, 13L, 2L, 2L, 3L, 12L, 10L, 3L, 2L, 11L}, - new Object[]{"Seoul", 3L, 4L, 5L, 13L, 3L, 2L, 3L, 11L, 9L, 3L, 2L, 10L}, - new Object[]{"Seoul", 4L, 5L, 6L, 13L, 3L, 2L, 3L, 10L, 8L, 3L, 2L, 9L}, - new Object[]{"Seoul", 5L, 6L, 7L, 13L, 3L, 2L, 3L, 9L, 7L, 3L, 2L, 8L}, - new Object[]{"Seoul", 6L, 7L, 8L, 13L, 3L, 2L, 3L, 8L, 6L, 3L, 2L, 7L}, - new Object[]{"Seoul", 7L, 8L, 9L, 13L, 3L, 2L, 3L, 7L, 5L, 3L, 2L, 6L}, - new Object[]{"Seoul", 8L, 9L, 10L, 13L, 3L, 2L, 3L, 6L, 4L, 3L, 2L, 5L}, - new Object[]{"Seoul", 9L, 10L, 11L, 13L, 3L, 2L, 3L, 5L, 3L, 3L, 2L, 4L}, - new Object[]{"Seoul", 10L, 11L, 12L, 13L, 3L, 2L, 3L, 4L, 2L, 2L, 2L, 3L}, - new Object[]{"Seoul", 11L, 12L, 13L, 13L, 3L, 2L, 3L, 3L, 1L, 1L, 2L, 2L}, - new Object[]{"Seoul", 12L, 13L, 13L, 13L, 3L, 2L, 2L, 2L, 0L, 0L, 1L, 1L}, - new Object[]{"Vienna", 0L, 1L, 2L, 3L, 0L, 1L, 2L, 3L, 2L, 2L, 2L, 3L}, - new Object[]{"Vienna", 1L, 2L, 3L, 3L, 1L, 2L, 3L, 3L, 1L, 1L, 2L, 2L}, - new Object[]{"Vienna", 2L, 3L, 3L, 3L, 2L, 2L, 2L, 2L, 0L, 0L, 1L, 1L} - )) - .run(); - } - @Test public void testWithArrayConcat() { diff --git a/sql/src/test/resources/calcite/tests/window/allBoundsCombination.sqlTest b/sql/src/test/resources/calcite/tests/window/allBoundsCombination.sqlTest new file mode 100644 index 00000000000..af6b0451761 --- /dev/null +++ b/sql/src/test/resources/calcite/tests/window/allBoundsCombination.sqlTest @@ -0,0 +1,38 @@ +type: "operatorValidation" + +sql: | + select + cityName, + count(*) over (partition by cityName order by countryName rows between unbounded preceding and 1 preceding) c1, + count(*) over (partition by cityName order by countryName rows between unbounded preceding and current row) c2, + count(*) over (partition by cityName order by countryName rows between unbounded preceding and 1 following) c3, + count(*) over (partition by cityName order by countryName rows between unbounded preceding and unbounded following) c4, + count(*) over (partition by cityName order by countryName rows between 3 preceding and 1 preceding) c5, + count(*) over (partition by cityName order by countryName rows between 1 preceding and current row) c6, + count(*) over (partition by cityName order by countryName rows between 1 preceding and 1 FOLLOWING) c7, + count(*) over (partition by cityName order by countryName rows between 1 preceding and unbounded FOLLOWING) c8, + count(*) over (partition by cityName order by countryName rows between 1 FOLLOWING and unbounded FOLLOWING) c9, + count(*) over (partition by cityName order by countryName rows between 1 FOLLOWING and 3 FOLLOWING) c10, + count(*) over (partition by cityName order by countryName rows between current row and 1 following) c11, + count(*) over (partition by cityName order by countryName rows between current row and unbounded following) c12 + from wikipedia + where cityName in ('Vienna', 'Seoul') + group by countryName, cityName, added + +expectedResults: + - ["Seoul",0,1,2,13,0,1,2,13,12,3,2,13] + - ["Seoul",1,2,3,13,1,2,3,13,11,3,2,12] + - ["Seoul",2,3,4,13,2,2,3,12,10,3,2,11] + - ["Seoul",3,4,5,13,3,2,3,11,9,3,2,10] + - ["Seoul",4,5,6,13,3,2,3,10,8,3,2,9] + - ["Seoul",5,6,7,13,3,2,3,9,7,3,2,8] + - ["Seoul",6,7,8,13,3,2,3,8,6,3,2,7] + - ["Seoul",7,8,9,13,3,2,3,7,5,3,2,6] + - ["Seoul",8,9,10,13,3,2,3,6,4,3,2,5] + - ["Seoul",9,10,11,13,3,2,3,5,3,3,2,4] + - ["Seoul",10,11,12,13,3,2,3,4,2,2,2,3] + - ["Seoul",11,12,13,13,3,2,3,3,1,1,2,2] + - ["Seoul",12,13,13,13,3,2,2,2,0,0,1,1] + - ["Vienna",0,1,2,3,0,1,2,3,2,2,2,3] + - ["Vienna",1,2,3,3,1,2,3,3,1,1,2,2] + - ["Vienna",2,3,3,3,2,2,2,2,0,0,1,1] diff --git a/sql/src/test/resources/calcite/tests/window/duplicateAggregation.sqlTest b/sql/src/test/resources/calcite/tests/window/duplicateAggregation.sqlTest new file mode 100644 index 00000000000..fd79234a562 --- /dev/null +++ b/sql/src/test/resources/calcite/tests/window/duplicateAggregation.sqlTest @@ -0,0 +1,10 @@ +type: "operatorValidation" + +sql: | + select + rank() over (order by count(*) desc), + rank() over (order by count(*) desc) + from wikipedia + +expectedResults: + - [1,1] diff --git a/sql/src/test/resources/calcite/tests/window/windowInsideSubquery.sqlTest b/sql/src/test/resources/calcite/tests/window/windowInsideSubquery.sqlTest new file mode 100644 index 00000000000..5672a5a17f3 --- /dev/null +++ b/sql/src/test/resources/calcite/tests/window/windowInsideSubquery.sqlTest @@ -0,0 +1,19 @@ +type: "operatorValidation" + +sql: | + select + c + from + ( + select channel, row_number() over () as c + from wikipedia + group by channel + ) + LIMIT 5 + +expectedResults: + - [1] + - [2] + - [3] + - [4] + - [5]