Restore context flag for window functions (#16229)

This commit is contained in:
Soumyava 2024-04-03 01:27:13 -07:00 committed by GitHub
parent 218513ad55
commit 4bea865697
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
8 changed files with 58 additions and 3 deletions

View File

@ -76,6 +76,7 @@ import org.apache.druid.server.SpecificSegmentsQuerySegmentWalker;
import org.apache.druid.sql.calcite.BaseCalciteQueryTest; import org.apache.druid.sql.calcite.BaseCalciteQueryTest;
import org.apache.druid.sql.calcite.SqlTestFrameworkConfig; import org.apache.druid.sql.calcite.SqlTestFrameworkConfig;
import org.apache.druid.sql.calcite.filtration.Filtration; import org.apache.druid.sql.calcite.filtration.Filtration;
import org.apache.druid.sql.calcite.planner.PlannerContext;
import org.apache.druid.sql.calcite.util.CacheTestHelperModule.ResultCacheMode; import org.apache.druid.sql.calcite.util.CacheTestHelperModule.ResultCacheMode;
import org.apache.druid.sql.calcite.util.CalciteTests; import org.apache.druid.sql.calcite.util.CalciteTests;
import org.apache.druid.sql.calcite.util.TestDataBuilder; import org.apache.druid.sql.calcite.util.TestDataBuilder;
@ -1163,6 +1164,7 @@ public class HllSketchSqlAggregatorTest extends BaseCalciteQueryTest
public void testHllWithOrderedWindowing() public void testHllWithOrderedWindowing()
{ {
testBuilder() testBuilder()
.queryContext(ImmutableMap.of(PlannerContext.CTX_ENABLE_WINDOW_FNS, true))
.sql( .sql(
"SELECT dim1,coalesce(cast(l1 as integer),-999)," "SELECT dim1,coalesce(cast(l1 as integer),-999),"
+ " HLL_SKETCH_ESTIMATE( DS_HLL(dim1) OVER ( ORDER BY l1 ), true)" + " HLL_SKETCH_ESTIMATE( DS_HLL(dim1) OVER ( ORDER BY l1 ), true)"
@ -1189,6 +1191,7 @@ public class HllSketchSqlAggregatorTest extends BaseCalciteQueryTest
skipVectorize(); skipVectorize();
for (int i = 0; i < 2; i++) { for (int i = 0; i < 2; i++) {
testBuilder() testBuilder()
.queryContext(ImmutableMap.of(PlannerContext.CTX_ENABLE_WINDOW_FNS, true))
.sql( .sql(
"SELECT " "SELECT "
+ " TIME_FLOOR(__time, 'P1D') as dayLvl,\n" + " TIME_FLOOR(__time, 'P1D') as dayLvl,\n"

View File

@ -168,6 +168,7 @@ import org.apache.druid.sql.calcite.external.LocalOperatorConversion;
import org.apache.druid.sql.calcite.planner.CalciteRulesManager; import org.apache.druid.sql.calcite.planner.CalciteRulesManager;
import org.apache.druid.sql.calcite.planner.CatalogResolver; import org.apache.druid.sql.calcite.planner.CatalogResolver;
import org.apache.druid.sql.calcite.planner.PlannerConfig; import org.apache.druid.sql.calcite.planner.PlannerConfig;
import org.apache.druid.sql.calcite.planner.PlannerContext;
import org.apache.druid.sql.calcite.planner.PlannerFactory; import org.apache.druid.sql.calcite.planner.PlannerFactory;
import org.apache.druid.sql.calcite.rel.DruidQuery; import org.apache.druid.sql.calcite.rel.DruidQuery;
import org.apache.druid.sql.calcite.run.SqlEngine; import org.apache.druid.sql.calcite.run.SqlEngine;
@ -259,6 +260,7 @@ public class MSQTestBase extends BaseCalciteQueryTest
.put(MultiStageQueryContext.CTX_MAX_NUM_TASKS, 2) .put(MultiStageQueryContext.CTX_MAX_NUM_TASKS, 2)
.put(MSQWarnings.CTX_MAX_PARSE_EXCEPTIONS_ALLOWED, 0) .put(MSQWarnings.CTX_MAX_PARSE_EXCEPTIONS_ALLOWED, 0)
.put(MSQTaskQueryMaker.USER_KEY, "allowAll") .put(MSQTaskQueryMaker.USER_KEY, "allowAll")
.put(PlannerContext.CTX_ENABLE_WINDOW_FNS, true)
.build(); .build();
public static final Map<String, Object> DURABLE_STORAGE_MSQ_CONTEXT = public static final Map<String, Object> DURABLE_STORAGE_MSQ_CONTEXT =

View File

@ -65,6 +65,7 @@ import org.apache.druid.sql.calcite.parser.DruidSqlIngest;
import org.apache.druid.sql.calcite.parser.DruidSqlInsert; import org.apache.druid.sql.calcite.parser.DruidSqlInsert;
import org.apache.druid.sql.calcite.parser.DruidSqlParserUtils; import org.apache.druid.sql.calcite.parser.DruidSqlParserUtils;
import org.apache.druid.sql.calcite.parser.ExternalDestinationSqlIdentifier; import org.apache.druid.sql.calcite.parser.ExternalDestinationSqlIdentifier;
import org.apache.druid.sql.calcite.run.EngineFeature;
import org.apache.druid.sql.calcite.table.DatasourceTable; import org.apache.druid.sql.calcite.table.DatasourceTable;
import org.checkerframework.checker.nullness.qual.Nullable; import org.checkerframework.checker.nullness.qual.Nullable;
@ -621,6 +622,15 @@ public class DruidSqlValidator extends BaseDruidSqlValidator
@Override @Override
public void validateCall(SqlCall call, SqlValidatorScope scope) public void validateCall(SqlCall call, SqlValidatorScope scope)
{ {
if (call.getKind() == SqlKind.OVER) {
if (!plannerContext.featureAvailable(EngineFeature.WINDOW_FUNCTIONS)) {
throw buildCalciteContextException(
StringUtils.format(
"The query contains window functions; To run these window functions, specify [%s] in query context.",
PlannerContext.CTX_ENABLE_WINDOW_FNS),
call);
}
}
if (call.getKind() == SqlKind.NULLS_FIRST) { if (call.getKind() == SqlKind.NULLS_FIRST) {
SqlNode op0 = call.getOperandList().get(0); SqlNode op0 = call.getOperandList().get(0);
if (op0.getKind() == SqlKind.DESCENDING) { if (op0.getKind() == SqlKind.DESCENDING) {

View File

@ -84,6 +84,10 @@ public class PlannerContext
*/ */
public static final String CTX_SQL_OUTER_LIMIT = "sqlOuterLimit"; public static final String CTX_SQL_OUTER_LIMIT = "sqlOuterLimit";
/**
* Key to enable window functions.
*/
public static final String CTX_ENABLE_WINDOW_FNS = "enableWindowing";
/** /**
* Context key for {@link PlannerContext#isUseBoundsAndSelectors()}. * Context key for {@link PlannerContext#isUseBoundsAndSelectors()}.
@ -571,9 +575,15 @@ public class PlannerContext
* Checks if the current {@link SqlEngine} supports a particular feature. * Checks if the current {@link SqlEngine} supports a particular feature.
* *
* When executing a specific query, use this method instead of {@link SqlEngine#featureAvailable(EngineFeature)} * When executing a specific query, use this method instead of {@link SqlEngine#featureAvailable(EngineFeature)}
* because it also verifies feature flags such as {@link #CTX_ENABLE_WINDOW_FNS}.
*/ */
public boolean featureAvailable(final EngineFeature feature) public boolean featureAvailable(final EngineFeature feature)
{ {
if (feature == EngineFeature.WINDOW_FUNCTIONS &&
!QueryContexts.getAsBoolean(CTX_ENABLE_WINDOW_FNS, queryContext.get(CTX_ENABLE_WINDOW_FNS), false)) {
// Short-circuit: feature requires context flag.
return false;
}
if (feature == EngineFeature.TIME_BOUNDARY_QUERY && !queryContext().isTimeBoundaryPlanningEnabled()) { if (feature == EngineFeature.TIME_BOUNDARY_QUERY && !queryContext().isTimeBoundaryPlanningEnabled()) {
// Short-circuit: feature requires context flag. // Short-circuit: feature requires context flag.
return false; return false;

View File

@ -14927,10 +14927,22 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
)); ));
} }
@Test
public void testWindowingErrorWithoutFeatureFlag()
{
DruidException e = assertThrows(DruidException.class, () -> testBuilder()
.queryContext(ImmutableMap.of(PlannerContext.CTX_ENABLE_WINDOW_FNS, false))
.sql("SELECT dim1,ROW_NUMBER() OVER () from druid.foo")
.run());
assertThat(e, invalidSqlIs("The query contains window functions; To run these window functions, specify [enableWindowing] in query context. (line [1], column [13])"));
}
@Test @Test
public void testUnSupportedNullsFirst() public void testUnSupportedNullsFirst()
{ {
DruidException e = assertThrows(DruidException.class, () -> testBuilder() DruidException e = assertThrows(DruidException.class, () -> testBuilder()
.queryContext(ImmutableMap.of(PlannerContext.CTX_ENABLE_WINDOW_FNS, true))
.sql("SELECT dim1,ROW_NUMBER() OVER (ORDER BY dim1 DESC NULLS FIRST) from druid.foo") .sql("SELECT dim1,ROW_NUMBER() OVER (ORDER BY dim1 DESC NULLS FIRST) from druid.foo")
.run()); .run());
@ -14941,6 +14953,7 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
public void testUnSupportedNullsLast() public void testUnSupportedNullsLast()
{ {
DruidException e = assertThrows(DruidException.class, () -> testBuilder() DruidException e = assertThrows(DruidException.class, () -> testBuilder()
.queryContext(ImmutableMap.of(PlannerContext.CTX_ENABLE_WINDOW_FNS, true))
.sql("SELECT dim1,ROW_NUMBER() OVER (ORDER BY dim1 NULLS LAST) from druid.foo") .sql("SELECT dim1,ROW_NUMBER() OVER (ORDER BY dim1 NULLS LAST) from druid.foo")
.run()); .run());
assertThat(e, invalidSqlIs("ASCENDING ordering with NULLS LAST is not supported! (line [1], column [41])")); assertThat(e, invalidSqlIs("ASCENDING ordering with NULLS LAST is not supported! (line [1], column [41])"));
@ -14952,6 +14965,7 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
assumeFeatureAvailable(EngineFeature.WINDOW_FUNCTIONS); assumeFeatureAvailable(EngineFeature.WINDOW_FUNCTIONS);
DruidException e = assertThrows(DruidException.class, () -> testBuilder() DruidException e = assertThrows(DruidException.class, () -> testBuilder()
.queryContext(ImmutableMap.of(PlannerContext.CTX_ENABLE_WINDOW_FNS, true))
.sql("SELECT dim1,ROW_NUMBER() OVER (ORDER BY dim1 RANGE BETWEEN 3 PRECEDING AND 2 FOLLOWING) from druid.foo") .sql("SELECT dim1,ROW_NUMBER() OVER (ORDER BY dim1 RANGE BETWEEN 3 PRECEDING AND 2 FOLLOWING) from druid.foo")
.run()); .run());
assertThat(e, invalidSqlIs("The query contains a window frame which may return incorrect results. To disregard this warning, set [windowingStrictValidation] to false in the query context. (line [1], column [31])")); assertThat(e, invalidSqlIs("The query contains a window frame which may return incorrect results. To disregard this warning, set [windowingStrictValidation] to false in the query context. (line [1], column [31])"));
@ -14963,6 +14977,7 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
assumeFeatureAvailable(EngineFeature.WINDOW_FUNCTIONS); assumeFeatureAvailable(EngineFeature.WINDOW_FUNCTIONS);
DruidException e = assertThrows(DruidException.class, () -> testBuilder() DruidException e = assertThrows(DruidException.class, () -> testBuilder()
.queryContext(ImmutableMap.of(PlannerContext.CTX_ENABLE_WINDOW_FNS, true))
.sql("SELECT dim1,ROW_NUMBER() OVER (ORDER BY dim1 ROWS BETWEEN dim1 PRECEDING AND dim1 FOLLOWING) from druid.foo") .sql("SELECT dim1,ROW_NUMBER() OVER (ORDER BY dim1 ROWS BETWEEN dim1 PRECEDING AND dim1 FOLLOWING) from druid.foo")
.run()); .run());
assertThat(e, invalidSqlIs("Window frames with expression based lower/upper bounds are not supported. (line [1], column [31])")); assertThat(e, invalidSqlIs("Window frames with expression based lower/upper bounds are not supported. (line [1], column [31])"));
@ -14976,11 +14991,13 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
DruidException e; DruidException e;
e = assertThrows(DruidException.class, () -> testBuilder() e = assertThrows(DruidException.class, () -> testBuilder()
.queryContext(ImmutableMap.of(PlannerContext.CTX_ENABLE_WINDOW_FNS, true))
.sql("SELECT dim1,ROW_NUMBER() OVER (ORDER BY dim1 ROWS BETWEEN 1 PRECEDING AND 1 PRECEDING) from druid.foo") .sql("SELECT dim1,ROW_NUMBER() OVER (ORDER BY dim1 ROWS BETWEEN 1 PRECEDING AND 1 PRECEDING) from druid.foo")
.run()); .run());
assertThat(e, invalidSqlIs("Query bounds with both lower and upper bounds as PRECEDING or FOLLOWING is not supported. (line [1], column [31])")); assertThat(e, invalidSqlIs("Query bounds with both lower and upper bounds as PRECEDING or FOLLOWING is not supported. (line [1], column [31])"));
e = assertThrows(DruidException.class, () -> testBuilder() e = assertThrows(DruidException.class, () -> testBuilder()
.queryContext(ImmutableMap.of(PlannerContext.CTX_ENABLE_WINDOW_FNS, true))
.sql("SELECT dim1,ROW_NUMBER() OVER (ORDER BY dim1 ROWS BETWEEN 1 FOLLOWING AND 1 FOLLOWING) from druid.foo") .sql("SELECT dim1,ROW_NUMBER() OVER (ORDER BY dim1 ROWS BETWEEN 1 FOLLOWING AND 1 FOLLOWING) from druid.foo")
.run()); .run());
assertThat(e, invalidSqlIs("Query bounds with both lower and upper bounds as PRECEDING or FOLLOWING is not supported. (line [1], column [31])")); assertThat(e, invalidSqlIs("Query bounds with both lower and upper bounds as PRECEDING or FOLLOWING is not supported. (line [1], column [31])"));
@ -14994,6 +15011,7 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
DruidException e = assertThrows( DruidException e = assertThrows(
DruidException.class, DruidException.class,
() -> testBuilder() () -> testBuilder()
.queryContext(ImmutableMap.of(PlannerContext.CTX_ENABLE_WINDOW_FNS, true))
.sql("SELECT ntile(4) OVER (ORDER BY dim1 ROWS BETWEEN 1 FOLLOWING AND CURRENT ROW) from druid.foo") .sql("SELECT ntile(4) OVER (ORDER BY dim1 ROWS BETWEEN 1 FOLLOWING AND CURRENT ROW) from druid.foo")
.run() .run()
); );
@ -15201,6 +15219,7 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
testBuilder() testBuilder()
.sql(sql) .sql(sql)
.queryContext(ImmutableMap.of(PlannerContext.CTX_ENABLE_WINDOW_FNS, true))
.expectedQuery( .expectedQuery(
WindowOperatorQueryBuilder.builder() WindowOperatorQueryBuilder.builder()
.setDataSource( .setDataSource(
@ -15288,6 +15307,7 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
) )
.queryContext( .queryContext(
ImmutableMap.of( ImmutableMap.of(
PlannerContext.CTX_ENABLE_WINDOW_FNS, true,
QueryContexts.ENABLE_DEBUG, true QueryContexts.ENABLE_DEBUG, true
) )
) )
@ -15388,6 +15408,7 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
) )
.queryContext( .queryContext(
ImmutableMap.of( ImmutableMap.of(
PlannerContext.CTX_ENABLE_WINDOW_FNS, true,
QueryContexts.ENABLE_DEBUG, true QueryContexts.ENABLE_DEBUG, true
) )
) )

View File

@ -20,8 +20,10 @@
package org.apache.druid.sql.calcite; package org.apache.druid.sql.calcite;
import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import org.apache.druid.sql.calcite.NotYetSupported.Modes; import org.apache.druid.sql.calcite.NotYetSupported.Modes;
import org.apache.druid.sql.calcite.NotYetSupported.NotYetSupportedProcessor; import org.apache.druid.sql.calcite.NotYetSupported.NotYetSupportedProcessor;
import org.apache.druid.sql.calcite.planner.PlannerContext;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith; import org.junit.jupiter.api.extension.ExtendWith;
@ -52,6 +54,7 @@ public class CalciteSysQueryTest extends BaseCalciteQueryTest
msqIncompatible(); msqIncompatible();
testBuilder() testBuilder()
.queryContext(ImmutableMap.of(PlannerContext.CTX_ENABLE_WINDOW_FNS, true))
.sql("select datasource, sum(duration) over () from sys.tasks group by datasource") .sql("select datasource, sum(duration) over () from sys.tasks group by datasource")
.expectedResults(ImmutableList.of( .expectedResults(ImmutableList.of(
new Object[]{"foo", 11L}, new Object[]{"foo", 11L},

View File

@ -35,6 +35,7 @@ import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.sql.calcite.CalciteWindowQueryTest.WindowQueryTestInputClass.TestType; import org.apache.druid.sql.calcite.CalciteWindowQueryTest.WindowQueryTestInputClass.TestType;
import org.apache.druid.sql.calcite.QueryTestRunner.QueryResults; import org.apache.druid.sql.calcite.QueryTestRunner.QueryResults;
import org.apache.druid.sql.calcite.QueryVerification.QueryResultsVerifier; import org.apache.druid.sql.calcite.QueryVerification.QueryResultsVerifier;
import org.apache.druid.sql.calcite.planner.PlannerContext;
import org.junit.Assert; import org.junit.Assert;
import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource; import org.junit.jupiter.params.provider.MethodSource;
@ -197,6 +198,7 @@ public class CalciteWindowQueryTest extends BaseCalciteQueryTest
.skipVectorize(true) .skipVectorize(true)
.sql(testCase.getSql()) .sql(testCase.getSql())
.queryContext(ImmutableMap.of( .queryContext(ImmutableMap.of(
PlannerContext.CTX_ENABLE_WINDOW_FNS, true,
QueryContexts.ENABLE_DEBUG, true, QueryContexts.ENABLE_DEBUG, true,
QueryContexts.WINDOWING_STRICT_VALIDATION, false QueryContexts.WINDOWING_STRICT_VALIDATION, false
)) ))
@ -219,6 +221,7 @@ public class CalciteWindowQueryTest extends BaseCalciteQueryTest
.skipVectorize(true) .skipVectorize(true)
.sql(testCase.getSql()) .sql(testCase.getSql())
.queryContext(ImmutableMap.of(QueryContexts.ENABLE_DEBUG, true, .queryContext(ImmutableMap.of(QueryContexts.ENABLE_DEBUG, true,
PlannerContext.CTX_ENABLE_WINDOW_FNS, true,
QueryContexts.MAX_SUBQUERY_BYTES_KEY, "100000", QueryContexts.MAX_SUBQUERY_BYTES_KEY, "100000",
QueryContexts.WINDOWING_STRICT_VALIDATION, false QueryContexts.WINDOWING_STRICT_VALIDATION, false
) )

View File

@ -57,6 +57,7 @@ import org.apache.druid.sql.calcite.NotYetSupported.Modes;
import org.apache.druid.sql.calcite.NotYetSupported.NotYetSupportedProcessor; import org.apache.druid.sql.calcite.NotYetSupported.NotYetSupportedProcessor;
import org.apache.druid.sql.calcite.QueryTestRunner.QueryResults; import org.apache.druid.sql.calcite.QueryTestRunner.QueryResults;
import org.apache.druid.sql.calcite.planner.PlannerCaptureHook; import org.apache.druid.sql.calcite.planner.PlannerCaptureHook;
import org.apache.druid.sql.calcite.planner.PlannerContext;
import org.apache.druid.timeline.DataSegment; import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.partition.NumberedShardSpec; import org.apache.druid.timeline.partition.NumberedShardSpec;
import org.joda.time.DateTime; import org.joda.time.DateTime;
@ -480,8 +481,10 @@ public class DrillWindowQueryTest extends BaseCalciteQueryTest
testBuilder() testBuilder()
.skipVectorize(true) .skipVectorize(true)
.queryContext(ImmutableMap.of( .queryContext(ImmutableMap.of(
PlannerContext.CTX_ENABLE_WINDOW_FNS, true,
PlannerCaptureHook.NEED_CAPTURE_HOOK, true, PlannerCaptureHook.NEED_CAPTURE_HOOK, true,
QueryContexts.ENABLE_DEBUG, true) QueryContexts.ENABLE_DEBUG, true
)
) )
.sql(testCase.getQueryString()) .sql(testCase.getQueryString())
.expectedResults(new TextualResultsVerifier(testCase.getExpectedResults(), null)) .expectedResults(new TextualResultsVerifier(testCase.getExpectedResults(), null))