diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQReplaceTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQReplaceTest.java index 2ae3eaf0383..8f8a0b569c9 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQReplaceTest.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQReplaceTest.java @@ -22,6 +22,8 @@ package org.apache.druid.msq.exec; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; +import org.apache.calcite.avatica.ColumnMetaData; +import org.apache.calcite.avatica.remote.TypedValue; import org.apache.druid.common.config.NullHandling; import org.apache.druid.data.input.impl.DimensionSchema; import org.apache.druid.data.input.impl.DimensionsSpec; @@ -704,6 +706,71 @@ public class MSQReplaceTest extends MSQTestBase .verifyResults(); } + @MethodSource("data") + @ParameterizedTest(name = "{index}:with context {0}") + public void testReplaceWithDynamicParameters(String contextName, Map context) + { + RowSignature rowSignature = RowSignature.builder() + .add("__time", ColumnType.LONG) + .add("m1", ColumnType.FLOAT) + .build(); + + testIngestQuery().setSql( + " REPLACE INTO foo OVERWRITE WHERE __time >= ? AND __time < ? " + + "SELECT __time, m1 " + + "FROM foo " + + "WHERE __time >= ? AND __time < ? " + + "PARTITIONED by DAY ") + .setDynamicParameters(ImmutableList.of( + TypedValue.ofLocal(ColumnMetaData.Rep.JAVA_SQL_TIMESTAMP, DateTimes.of("2000-01-02").getMillis()), + TypedValue.ofLocal(ColumnMetaData.Rep.JAVA_SQL_TIMESTAMP, DateTimes.of("2000-01-03").getMillis()), + TypedValue.ofLocal(ColumnMetaData.Rep.JAVA_SQL_TIMESTAMP, DateTimes.of("2000-01-02").getMillis()), + TypedValue.ofLocal(ColumnMetaData.Rep.JAVA_SQL_TIMESTAMP, DateTimes.of("2000-01-03").getMillis()) + )) + .setExpectedDataSource("foo") + .setExpectedDestinationIntervals(ImmutableList.of(Intervals.of( + "2000-01-02T00:00:00.000Z/2000-01-03T00:00:00.000Z"))) + .setExpectedRowSignature(rowSignature) + .setQueryContext(context) + .setExpectedSegments(ImmutableSet.of(SegmentId.of( + "foo", + Intervals.of("2000-01-02T/P1D"), + "test", + 0 + ))) + .setExpectedResultRows(ImmutableList.of(new Object[]{946771200000L, 2.0f})) + .setExpectedCountersForStageWorkerChannel( + CounterSnapshotMatcher + .with().totalFiles(1), + 0, 0, "input0" + ) + .setExpectedCountersForStageWorkerChannel( + CounterSnapshotMatcher + .with().rows(1).frames(1), + 0, 0, "shuffle" + ) + .setExpectedCountersForStageWorkerChannel( + CounterSnapshotMatcher + .with().rows(1).frames(1), + 1, 0, "input0" + ) + .setExpectedSegmentGenerationProgressCountersForStageWorker( + CounterSnapshotMatcher + .with().segmentRowsProcessed(1), + 1, 0 + ) + .setExpectedLastCompactionState( + expectedCompactionState( + context, + Collections.emptyList(), + Collections.singletonList(new FloatDimensionSchema("m1")), + GranularityType.DAY, + Intervals.of("2000-01-02T/P1D") + ) + ) + .verifyResults(); + } + @MethodSource("data") @ParameterizedTest(name = "{index}:with context {0}") public void testReplaceOnFoo1WithAllExtern(String contextName, Map context) throws IOException diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQSelectTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQSelectTest.java index a5822f3a0b7..91a1983bfd6 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQSelectTest.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQSelectTest.java @@ -22,6 +22,8 @@ package org.apache.druid.msq.exec; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; +import org.apache.calcite.avatica.ColumnMetaData; +import org.apache.calcite.avatica.remote.TypedValue; import org.apache.druid.common.config.NullHandling; import org.apache.druid.data.input.impl.CsvInputFormat; import org.apache.druid.data.input.impl.InlineInputSource; @@ -409,6 +411,53 @@ public class MSQSelectTest extends MSQTestBase .verifyResults(); } + @MethodSource("data") + @ParameterizedTest(name = "{index}:with context {0}") + public void testSelectWithDynamicParameters(String contextName, Map context) + { + RowSignature resultSignature = RowSignature.builder() + .add("cnt", ColumnType.LONG) + .add("dim1", ColumnType.STRING) + .build(); + + // Filter [__time >= timestamp '3000-01-01 00:00:00'] matches no segments at all. + testSelectQuery() + .setSql("select cnt,dim1 from foo where __time >= ?") + .setExpectedMSQSpec( + MSQSpec.builder() + .query( + newScanQueryBuilder() + .dataSource(CalciteTests.DATASOURCE1) + .intervals( + querySegmentSpec( + Intervals.utc( + DateTimes.of("3000").getMillis(), + Intervals.ETERNITY.getEndMillis() + ) + ) + ) + .columns("cnt", "dim1") + .context(defaultScanQueryContext(context, resultSignature)) + .build() + ) + .columnMappings(ColumnMappings.identity(resultSignature)) + .tuningConfig(MSQTuningConfig.defaultConfig()) + .destination(isDurableStorageDestination(contextName, context) + ? DurableStorageMSQDestination.INSTANCE + : TaskReportMSQDestination.INSTANCE) + .build() + ) + .setDynamicParameters( + ImmutableList.of( + TypedValue.ofLocal(ColumnMetaData.Rep.JAVA_SQL_TIMESTAMP, DateTimes.of("3000-01-01").getMillis()) + ) + ) + .setQueryContext(context) + .setExpectedRowSignature(resultSignature) + .setExpectedResultRows(ImmutableList.of()) + .verifyResults(); + } + @MethodSource("data") @ParameterizedTest(name = "{index}:with context {0}") public void testSelectOnFooWhereMatchesNoData(String contextName, Map context) diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java index e1ce49d8292..761a61337ea 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java @@ -36,6 +36,7 @@ import com.google.inject.Module; import com.google.inject.TypeLiteral; import com.google.inject.util.Modules; import com.google.inject.util.Providers; +import org.apache.calcite.avatica.remote.TypedValue; import org.apache.druid.client.ImmutableSegmentLoadInfo; import org.apache.druid.collections.ReferenceCountingResourceHolder; import org.apache.druid.collections.ResourceHolder; @@ -217,7 +218,6 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; -import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -784,13 +784,13 @@ public class MSQTestBase extends BaseCalciteQueryTest ); } - private String runMultiStageQuery(String query, Map context) + private String runMultiStageQuery(String query, Map context, List parameters) { final DirectStatement stmt = sqlStatementFactory.directStatement( new SqlQueryPlus( query, context, - Collections.emptyList(), + parameters, CalciteTests.REGULAR_USER_AUTH_RESULT ) ); @@ -886,6 +886,7 @@ public class MSQTestBase extends BaseCalciteQueryTest protected String sql = null; protected MSQControllerTask taskSpec = null; protected Map queryContext = DEFAULT_MSQ_CONTEXT; + protected List dynamicParameters = new ArrayList<>(); protected List expectedRowSignature = null; protected MSQSpec expectedMSQSpec = null; protected MSQTuningConfig expectedTuningConfig = null; @@ -924,6 +925,12 @@ public class MSQTestBase extends BaseCalciteQueryTest return asBuilder(); } + public Builder setDynamicParameters(List dynamicParameters) + { + this.dynamicParameters = dynamicParameters; + return asBuilder(); + } + public Builder setExpectedRowSignature(List expectedRowSignature) { Preconditions.checkArgument(!expectedRowSignature.isEmpty(), "Row signature cannot be empty"); @@ -1057,7 +1064,7 @@ public class MSQTestBase extends BaseCalciteQueryTest final Throwable e = Assert.assertThrows( Throwable.class, - () -> runMultiStageQuery(sql, queryContext) + () -> runMultiStageQuery(sql, queryContext, dynamicParameters) ); assertThat(e, expectedValidationErrorMatcher); @@ -1209,7 +1216,7 @@ public class MSQTestBase extends BaseCalciteQueryTest String controllerId; if (sql != null) { // Run the sql command. - controllerId = runMultiStageQuery(sql, queryContext); + controllerId = runMultiStageQuery(sql, queryContext, dynamicParameters); } else { // Run the task spec directly instead. controllerId = TEST_CONTROLLER_TASK_ID; @@ -1426,7 +1433,7 @@ public class MSQTestBase extends BaseCalciteQueryTest try { String controllerId; if (sql != null) { - controllerId = runMultiStageQuery(sql, queryContext); + controllerId = runMultiStageQuery(sql, queryContext, dynamicParameters); } else { // Run the task spec directly instead. controllerId = TEST_CONTROLLER_TASK_ID; @@ -1468,7 +1475,7 @@ public class MSQTestBase extends BaseCalciteQueryTest Preconditions.checkArgument(sql == null || queryContext != null, "queryContext cannot be null"); try { - String controllerId = runMultiStageQuery(sql, queryContext); + String controllerId = runMultiStageQuery(sql, queryContext, dynamicParameters); if (expectedMSQFault != null || expectedMSQFaultClass != null) { MSQErrorReport msqErrorReport = getErrorReportOrThrow(controllerId); diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/planner/DruidPlanner.java b/sql/src/main/java/org/apache/druid/sql/calcite/planner/DruidPlanner.java index cf1d22eb39b..03ef94656c5 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/planner/DruidPlanner.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/planner/DruidPlanner.java @@ -64,7 +64,6 @@ import java.util.function.Function; */ public class DruidPlanner implements Closeable { - public static final Joiner SPACE_JOINER = Joiner.on(" "); public static final Joiner COMMA_JOINER = Joiner.on(", "); @@ -148,6 +147,7 @@ public class DruidPlanner implements Closeable catch (SqlParseException e1) { throw translateException(e1); } + root = rewriteParameters(root); hook.captureSqlNode(root); handler = createHandler(root); handler.validate(); @@ -158,6 +158,7 @@ public class DruidPlanner implements Closeable private SqlStatementHandler createHandler(final SqlNode node) { SqlNode query = node; + SqlExplain explain = null; if (query.getKind() == SqlKind.EXPLAIN) { explain = (SqlExplain) query; @@ -179,6 +180,27 @@ public class DruidPlanner implements Closeable throw InvalidSqlInput.exception("Unsupported SQL statement [%s]", node.getKind()); } + /** + * Uses {@link SqlParameterizerShuttle} to rewrite {@link SqlNode} to swap out any + * {@link org.apache.calcite.sql.SqlDynamicParam} early for their {@link org.apache.calcite.sql.SqlLiteral} + * replacement. + * + * @return a rewritten {@link SqlNode} with any dynamic parameters rewritten in the provided {@code original} node, + * if they were present. + */ + private SqlNode rewriteParameters(final SqlNode original) + { + // Parameter replacement is done only if the client provides parameter values. + // If this is a PREPARE-only, then there will be no values even if the statement contains + // parameters. If this is a PLAN, then we'll catch later the case that the statement + // contains parameters, but no values were provided. + if (plannerContext.getParameters().isEmpty()) { + return original; + } else { + return original.accept(new SqlParameterizerShuttle(plannerContext)); // the rewrite happens here. + } + } + /** * Prepare a SQL query for execution, including some initial parsing and * validation and any dynamic parameter type resolution, to support prepared diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/planner/IngestHandler.java b/sql/src/main/java/org/apache/druid/sql/calcite/planner/IngestHandler.java index 92f2ef2ea81..e0b5ffdb08e 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/planner/IngestHandler.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/planner/IngestHandler.java @@ -341,7 +341,6 @@ public abstract class IngestHandler extends QueryHandler protected static DruidSqlReplace convertQuery(DruidSqlReplace sqlNode) { SqlNode query = convertSourceQuery(sqlNode); - return DruidSqlReplace.create( new SqlInsert( sqlNode.getParserPosition(), diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/planner/QueryHandler.java b/sql/src/main/java/org/apache/druid/sql/calcite/planner/QueryHandler.java index dfc374f51bb..a915833cd3f 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/planner/QueryHandler.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/planner/QueryHandler.java @@ -113,7 +113,7 @@ public abstract class QueryHandler extends SqlStatementHandler.BaseStatementHand CalcitePlanner planner = handlerContext.planner(); SqlNode validatedQueryNode; try { - validatedQueryNode = planner.validate(rewriteParameters(root)); + validatedQueryNode = planner.validate(root); } catch (ValidationException e) { throw DruidPlanner.translateException(e); @@ -129,24 +129,6 @@ public abstract class QueryHandler extends SqlStatementHandler.BaseStatementHand return validatedQueryNode; } - private SqlNode rewriteParameters(SqlNode original) - { - // Uses {@link SqlParameterizerShuttle} to rewrite {@link SqlNode} to swap out any - // {@link org.apache.calcite.sql.SqlDynamicParam} early for their {@link SqlLiteral} - // replacement. - // - // Parameter replacement is done only if the client provides parameter values. - // If this is a PREPARE-only, then there will be no values even if the statement contains - // parameters. If this is a PLAN, then we'll catch later the case that the statement - // contains parameters, but no values were provided. - PlannerContext plannerContext = handlerContext.plannerContext(); - if (plannerContext.getParameters().isEmpty()) { - return original; - } else { - return original.accept(new SqlParameterizerShuttle(plannerContext)); - } - } - @Override public void prepare() {