Throw exceptions in SqlValidator when DISTINCT used over WINDOW (#16738)

* Throw an exception if DISTINCT is used in a window function aggregate call
* Improve the error message when unsupported aggregations are used with window functions
This commit is contained in:
Sree Charan Manamala 2024-07-22 19:59:46 +05:30 committed by GitHub
parent c9aae9d8e6
commit 149d7c5207
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
9 changed files with 123 additions and 96 deletions

View File

@ -29,6 +29,7 @@ import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.data.input.impl.DoubleDimensionSchema; import org.apache.druid.data.input.impl.DoubleDimensionSchema;
import org.apache.druid.data.input.impl.FloatDimensionSchema; import org.apache.druid.data.input.impl.FloatDimensionSchema;
import org.apache.druid.data.input.impl.LongDimensionSchema; import org.apache.druid.data.input.impl.LongDimensionSchema;
import org.apache.druid.error.DruidException;
import org.apache.druid.guice.DruidInjectorBuilder; import org.apache.druid.guice.DruidInjectorBuilder;
import org.apache.druid.java.util.common.granularity.Granularities; import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.math.expr.ExprMacroTable; import org.apache.druid.math.expr.ExprMacroTable;
@ -64,6 +65,7 @@ import org.apache.druid.sql.calcite.SqlTestFrameworkConfig;
import org.apache.druid.sql.calcite.TempDirProducer; import org.apache.druid.sql.calcite.TempDirProducer;
import org.apache.druid.sql.calcite.filtration.Filtration; import org.apache.druid.sql.calcite.filtration.Filtration;
import org.apache.druid.sql.calcite.planner.PlannerContext; import org.apache.druid.sql.calcite.planner.PlannerContext;
import org.apache.druid.sql.calcite.run.EngineFeature;
import org.apache.druid.sql.calcite.util.CalciteTests; import org.apache.druid.sql.calcite.util.CalciteTests;
import org.apache.druid.sql.calcite.util.SqlTestFramework.StandardComponentSupplier; import org.apache.druid.sql.calcite.util.SqlTestFramework.StandardComponentSupplier;
import org.apache.druid.sql.calcite.util.TestDataBuilder; import org.apache.druid.sql.calcite.util.TestDataBuilder;
@ -73,6 +75,9 @@ import org.junit.jupiter.api.Test;
import java.util.List; import java.util.List;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
@SqlTestFrameworkConfig.ComponentSupplier(VarianceComponentSupplier.class) @SqlTestFrameworkConfig.ComponentSupplier(VarianceComponentSupplier.class)
public class VarianceSqlAggregatorTest extends BaseCalciteQueryTest public class VarianceSqlAggregatorTest extends BaseCalciteQueryTest
{ {
@ -724,4 +729,23 @@ public class VarianceSqlAggregatorTest extends BaseCalciteQueryTest
)) ))
.run(); .run();
} }
/**
 * Verifies that {@code stddev(m1) OVER ()} is rejected with a {@link DruidException} whose
 * message reports STDDEV as an aggregation that is not supported for window functions.
 * The test only runs when the engine advertises {@code EngineFeature.WINDOW_FUNCTIONS}.
 */
@Test
public void testStddevNotSupportedOverWindow()
{
  assumeFeatureAvailable(EngineFeature.WINDOW_FUNCTIONS);

  final String expectedMessage =
      "Query could not be planned. A possible reason is [Aggregation [STDDEV] is currently not supported for window functions]";

  // Window functions are switched on through the query context so the failure comes from
  // the aggregation itself rather than from the feature being disabled.
  final DruidException thrown = assertThrows(
      DruidException.class,
      () -> testBuilder()
          .queryContext(ImmutableMap.of(PlannerContext.CTX_ENABLE_WINDOW_FNS, true))
          .sql("SELECT stddev(m1) OVER () from numfoo")
          .run()
  );

  assertEquals(expectedMessage, thrown.getMessage());
}
} }

View File

@ -212,15 +212,14 @@ public class ArrayListRowsAndColumns<RowType> implements AppendableRowsAndColumn
@Override @Override
public void addColumn(String name, Column column) public void addColumn(String name, Column column)
{ {
if (rows.size() == numRows()) { if (rows.size() == numRows() && column.as(ColumnValueSwapper.class) != null) {
extraColumns.put(name, column); extraColumns.put(name, column);
columnNames.add(name); columnNames.add(name);
return; return;
} }
// When an ArrayListRowsAndColumns is only a partial view, but adds a column, it believes that the same column // When an ArrayListRowsAndColumns is only a partial view, but adds a column, it believes that the same column
// will eventually be added for all of the rows so we pre-allocate storage for the entire set of data and // will eventually be added for all the rows so we pre-allocate storage for the entire set of data and copy.
// copy.
final ColumnAccessor columnAccessor = column.toAccessor(); final ColumnAccessor columnAccessor = column.toAccessor();
if (columnAccessor.numRows() != numRows()) { if (columnAccessor.numRows() != numRows()) {

View File

@ -36,6 +36,7 @@ import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.sql.SqlNodeList; import org.apache.calcite.sql.SqlNodeList;
import org.apache.calcite.sql.SqlOperatorTable; import org.apache.calcite.sql.SqlOperatorTable;
import org.apache.calcite.sql.SqlSelect; import org.apache.calcite.sql.SqlSelect;
import org.apache.calcite.sql.SqlSelectKeyword;
import org.apache.calcite.sql.SqlUpdate; import org.apache.calcite.sql.SqlUpdate;
import org.apache.calcite.sql.SqlUtil; import org.apache.calcite.sql.SqlUtil;
import org.apache.calcite.sql.SqlWindow; import org.apache.calcite.sql.SqlWindow;
@ -110,6 +111,10 @@ public class DruidSqlValidator extends BaseDruidSqlValidator
@Override @Override
public void validateWindow(SqlNode windowOrId, SqlValidatorScope scope, @Nullable SqlCall call) public void validateWindow(SqlNode windowOrId, SqlValidatorScope scope, @Nullable SqlCall call)
{ {
if (isSqlCallDistinct(call)) {
throw buildCalciteContextException("DISTINCT is not supported for window functions", windowOrId);
}
final SqlWindow targetWindow; final SqlWindow targetWindow;
switch (windowOrId.getKind()) { switch (windowOrId.getKind()) {
case IDENTIFIER: case IDENTIFIER:
@ -857,4 +862,11 @@ public class DruidSqlValidator extends BaseDruidSqlValidator
} }
return src; return src;
} }
/**
 * Tells whether the given call carries the DISTINCT function quantifier.
 *
 * @param call the call to inspect; may be null
 * @return true only when {@code call} is non-null, has a non-null function quantifier, and that
 *         quantifier's value is {@link SqlSelectKeyword#DISTINCT}
 */
private boolean isSqlCallDistinct(@Nullable SqlCall call)
{
  // Guard clauses: no call, or a call without any quantifier, cannot be DISTINCT.
  if (call == null || call.getFunctionQuantifier() == null) {
    return false;
  }
  return call.getFunctionQuantifier().getValue() == SqlSelectKeyword.DISTINCT;
}
} }

View File

@ -160,15 +160,13 @@ public class Windowing
Collections.emptyList(), Collections.emptyList(),
aggName, aggName,
aggregateCall, aggregateCall,
false // Windowed aggregations don't currently finalize. This means that sketches won't work as expected. false // Windowed aggregations finalize later when we write the computed value to result RAC
); );
if (aggregation == null if (aggregation == null
|| aggregation.getPostAggregator() != null || aggregation.getPostAggregator() != null
|| aggregation.getAggregatorFactories().size() != 1) { || aggregation.getAggregatorFactories().size() != 1) {
if (null == plannerContext.getPlanningError()) { plannerContext.setPlanningError("Aggregation [%s] is currently not supported for window functions", aggregateCall.getAggregation().getName());
plannerContext.setPlanningError("Aggregation [%s] is not supported", aggregateCall);
}
throw new CannotBuildQueryException(window, aggregateCall); throw new CannotBuildQueryException(window, aggregateCall);
} }

View File

@ -15562,6 +15562,22 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
assertThat(e, invalidSqlContains("Framing of NTILE is not supported")); assertThat(e, invalidSqlContains("Framing of NTILE is not supported"));
} }
/**
 * Verifies that {@code count(distinct dim1) OVER ()} fails with a {@link DruidException}
 * matching {@code invalidSqlContains("DISTINCT is not supported for window functions")}.
 * The test only runs when the engine advertises {@code EngineFeature.WINDOW_FUNCTIONS}.
 */
@Test
public void testDistinctNotSupportedWithWindow()
{
  assumeFeatureAvailable(EngineFeature.WINDOW_FUNCTIONS);

  final String distinctOverWindowSql = "SELECT count(distinct dim1) OVER () from druid.foo";

  final DruidException thrown = assertThrows(
      DruidException.class,
      () -> testBuilder()
          .queryContext(ImmutableMap.of(PlannerContext.CTX_ENABLE_WINDOW_FNS, true))
          .sql(distinctOverWindowSql)
          .run()
  );

  assertThat(thrown, invalidSqlContains("DISTINCT is not supported for window functions"));
}
@Test @Test
public void testInGroupByLimitOutGroupByOrderBy() public void testInGroupByLimitOutGroupByOrderBy()
{ {

View File

@ -231,95 +231,6 @@ public class CalciteWindowQueryTest extends BaseCalciteQueryTest
} }
} }
/**
 * Runs {@code row_number() over ()} inside a grouped subquery on {@code wikipedia}, selects
 * the row number from the outer query with {@code LIMIT 5}, and expects the values 1 through 5.
 */
@Test
public void testEmptyWindowInSubquery()
{
  final String rowNumberInSubquerySql =
      "select c from (\n"
      + " select channel, row_number() over () as c\n"
      + " from wikipedia\n"
      + " group by channel\n"
      + ") LIMIT 5";

  testBuilder()
      .sql(rowNumberInSubquerySql)
      .queryContext(
          ImmutableMap.of(
              PlannerContext.CTX_ENABLE_WINDOW_FNS, true,
              QueryContexts.ENABLE_DEBUG, true
          )
      )
      .expectedResults(
          ImmutableList.of(
              new Object[]{1L},
              new Object[]{2L},
              new Object[]{3L},
              new Object[]{4L},
              new Object[]{5L}
          )
      )
      .run();
}
/**
 * Selects the same {@code rank() over (order by count(*) desc)} expression twice from
 * {@code wikipedia} and expects a single row holding rank 1 in both columns.
 */
@Test
public void testWindow()
{
  final String duplicatedRankSql =
      "SELECT\n"
      + "(rank() over (order by count(*) desc)),\n"
      + "(rank() over (order by count(*) desc))\n"
      + "FROM \"wikipedia\"";

  testBuilder()
      .sql(duplicatedRankSql)
      .queryContext(
          ImmutableMap.of(
              PlannerContext.CTX_ENABLE_WINDOW_FNS, true,
              QueryContexts.ENABLE_DEBUG, true
          )
      )
      .expectedResults(ImmutableList.of(new Object[]{1L, 1L}))
      .run();
}
/**
 * Evaluates {@code count(*)} over the same window (partition by cityName, order by countryName)
 * under twelve different ROWS frame bounds (columns c1..c12), restricted to the cities
 * 'Vienna' and 'Seoul', and checks the resulting count for every row.
 */
@Test
public void testWindowAllBoundsCombination()
{
testBuilder()
.sql("select\n"
+ "cityName,\n"
+ "count(*) over (partition by cityName order by countryName rows between unbounded preceding and 1 preceding) c1,\n"
+ "count(*) over (partition by cityName order by countryName rows between unbounded preceding and current row) c2,\n"
+ "count(*) over (partition by cityName order by countryName rows between unbounded preceding and 1 following) c3,\n"
+ "count(*) over (partition by cityName order by countryName rows between unbounded preceding and unbounded following) c4,\n"
+ "count(*) over (partition by cityName order by countryName rows between 3 preceding and 1 preceding) c5,\n"
+ "count(*) over (partition by cityName order by countryName rows between 1 preceding and current row) c6,\n"
+ "count(*) over (partition by cityName order by countryName rows between 1 preceding and 1 FOLLOWING) c7,\n"
+ "count(*) over (partition by cityName order by countryName rows between 1 preceding and unbounded FOLLOWING) c8,\n"
+ "count(*) over (partition by cityName order by countryName rows between 1 FOLLOWING and unbounded FOLLOWING) c9,\n"
+ "count(*) over (partition by cityName order by countryName rows between 1 FOLLOWING and 3 FOLLOWING) c10,\n"
+ "count(*) over (partition by cityName order by countryName rows between current row and 1 following) c11,\n"
+ "count(*) over (partition by cityName order by countryName rows between current row and unbounded following) c12\n"
+ "from wikipedia\n"
+ "where cityName in ('Vienna', 'Seoul')\n"
+ "group by countryName, cityName, added")
// Window functions and debug mode are both enabled through the query context.
.queryContext(ImmutableMap.of(
PlannerContext.CTX_ENABLE_WINDOW_FNS, true,
QueryContexts.ENABLE_DEBUG, true
))
// Each expected row is: cityName followed by the counts c1..c12 in select order.
.expectedResults(ImmutableList.of(
new Object[]{"Seoul", 0L, 1L, 2L, 13L, 0L, 1L, 2L, 13L, 12L, 3L, 2L, 13L},
new Object[]{"Seoul", 1L, 2L, 3L, 13L, 1L, 2L, 3L, 13L, 11L, 3L, 2L, 12L},
new Object[]{"Seoul", 2L, 3L, 4L, 13L, 2L, 2L, 3L, 12L, 10L, 3L, 2L, 11L},
new Object[]{"Seoul", 3L, 4L, 5L, 13L, 3L, 2L, 3L, 11L, 9L, 3L, 2L, 10L},
new Object[]{"Seoul", 4L, 5L, 6L, 13L, 3L, 2L, 3L, 10L, 8L, 3L, 2L, 9L},
new Object[]{"Seoul", 5L, 6L, 7L, 13L, 3L, 2L, 3L, 9L, 7L, 3L, 2L, 8L},
new Object[]{"Seoul", 6L, 7L, 8L, 13L, 3L, 2L, 3L, 8L, 6L, 3L, 2L, 7L},
new Object[]{"Seoul", 7L, 8L, 9L, 13L, 3L, 2L, 3L, 7L, 5L, 3L, 2L, 6L},
new Object[]{"Seoul", 8L, 9L, 10L, 13L, 3L, 2L, 3L, 6L, 4L, 3L, 2L, 5L},
new Object[]{"Seoul", 9L, 10L, 11L, 13L, 3L, 2L, 3L, 5L, 3L, 3L, 2L, 4L},
new Object[]{"Seoul", 10L, 11L, 12L, 13L, 3L, 2L, 3L, 4L, 2L, 2L, 2L, 3L},
new Object[]{"Seoul", 11L, 12L, 13L, 13L, 3L, 2L, 3L, 3L, 1L, 1L, 2L, 2L},
new Object[]{"Seoul", 12L, 13L, 13L, 13L, 3L, 2L, 2L, 2L, 0L, 0L, 1L, 1L},
new Object[]{"Vienna", 0L, 1L, 2L, 3L, 0L, 1L, 2L, 3L, 2L, 2L, 2L, 3L},
new Object[]{"Vienna", 1L, 2L, 3L, 3L, 1L, 2L, 3L, 3L, 1L, 1L, 2L, 2L},
new Object[]{"Vienna", 2L, 3L, 3L, 3L, 2L, 2L, 2L, 2L, 0L, 0L, 1L, 1L}
))
.run();
}
@Test @Test
public void testWithArrayConcat() public void testWithArrayConcat()
{ {

View File

@ -0,0 +1,38 @@
# Test case: count(*) over one window (partition by cityName, order by countryName) under
# twelve different ROWS frame bounds (c1..c12), for cityName in ('Vienna', 'Seoul').
# Each expectedResults row lists cityName followed by c1..c12 in select order.
type: "operatorValidation"
sql: |
select
cityName,
count(*) over (partition by cityName order by countryName rows between unbounded preceding and 1 preceding) c1,
count(*) over (partition by cityName order by countryName rows between unbounded preceding and current row) c2,
count(*) over (partition by cityName order by countryName rows between unbounded preceding and 1 following) c3,
count(*) over (partition by cityName order by countryName rows between unbounded preceding and unbounded following) c4,
count(*) over (partition by cityName order by countryName rows between 3 preceding and 1 preceding) c5,
count(*) over (partition by cityName order by countryName rows between 1 preceding and current row) c6,
count(*) over (partition by cityName order by countryName rows between 1 preceding and 1 FOLLOWING) c7,
count(*) over (partition by cityName order by countryName rows between 1 preceding and unbounded FOLLOWING) c8,
count(*) over (partition by cityName order by countryName rows between 1 FOLLOWING and unbounded FOLLOWING) c9,
count(*) over (partition by cityName order by countryName rows between 1 FOLLOWING and 3 FOLLOWING) c10,
count(*) over (partition by cityName order by countryName rows between current row and 1 following) c11,
count(*) over (partition by cityName order by countryName rows between current row and unbounded following) c12
from wikipedia
where cityName in ('Vienna', 'Seoul')
group by countryName, cityName, added
expectedResults:
- ["Seoul",0,1,2,13,0,1,2,13,12,3,2,13]
- ["Seoul",1,2,3,13,1,2,3,13,11,3,2,12]
- ["Seoul",2,3,4,13,2,2,3,12,10,3,2,11]
- ["Seoul",3,4,5,13,3,2,3,11,9,3,2,10]
- ["Seoul",4,5,6,13,3,2,3,10,8,3,2,9]
- ["Seoul",5,6,7,13,3,2,3,9,7,3,2,8]
- ["Seoul",6,7,8,13,3,2,3,8,6,3,2,7]
- ["Seoul",7,8,9,13,3,2,3,7,5,3,2,6]
- ["Seoul",8,9,10,13,3,2,3,6,4,3,2,5]
- ["Seoul",9,10,11,13,3,2,3,5,3,3,2,4]
- ["Seoul",10,11,12,13,3,2,3,4,2,2,2,3]
- ["Seoul",11,12,13,13,3,2,3,3,1,1,2,2]
- ["Seoul",12,13,13,13,3,2,2,2,0,0,1,1]
- ["Vienna",0,1,2,3,0,1,2,3,2,2,2,3]
- ["Vienna",1,2,3,3,1,2,3,3,1,1,2,2]
- ["Vienna",2,3,3,3,2,2,2,2,0,0,1,1]

View File

@ -0,0 +1,10 @@
# Test case: the same rank() over (order by count(*) desc) expression selected twice from
# wikipedia; the single expected row holds rank 1 in both columns.
type: "operatorValidation"
sql: |
select
rank() over (order by count(*) desc),
rank() over (order by count(*) desc)
from wikipedia
expectedResults:
- [1,1]

View File

@ -0,0 +1,19 @@
# Test case: row_number() over an empty window inside a grouped subquery on wikipedia;
# the outer query keeps the first five row numbers via LIMIT 5.
type: "operatorValidation"
sql: |
select
c
from
(
select channel, row_number() over () as c
from wikipedia
group by channel
)
LIMIT 5
expectedResults:
- [1]
- [2]
- [3]
- [4]
- [5]