rerwrite node so dynamic parameter applies to ingest node as well. (#17126)

This commit is contained in:
Abhishek Radhakrishnan 2024-09-23 12:49:46 -07:00 committed by GitHub
parent 67d361c9bf
commit 37a2a12d79
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 154 additions and 28 deletions

View File

@ -22,6 +22,8 @@ package org.apache.druid.msq.exec;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import org.apache.calcite.avatica.ColumnMetaData;
import org.apache.calcite.avatica.remote.TypedValue;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.data.input.impl.DimensionSchema;
import org.apache.druid.data.input.impl.DimensionsSpec;
@ -704,6 +706,71 @@ public class MSQReplaceTest extends MSQTestBase
.verifyResults();
}
@MethodSource("data")
@ParameterizedTest(name = "{index}:with context {0}")
public void testReplaceWithDynamicParameters(String contextName, Map<String, Object> context)
{
RowSignature rowSignature = RowSignature.builder()
.add("__time", ColumnType.LONG)
.add("m1", ColumnType.FLOAT)
.build();
testIngestQuery().setSql(
" REPLACE INTO foo OVERWRITE WHERE __time >= ? AND __time < ? "
+ "SELECT __time, m1 "
+ "FROM foo "
+ "WHERE __time >= ? AND __time < ? "
+ "PARTITIONED by DAY ")
.setDynamicParameters(ImmutableList.of(
TypedValue.ofLocal(ColumnMetaData.Rep.JAVA_SQL_TIMESTAMP, DateTimes.of("2000-01-02").getMillis()),
TypedValue.ofLocal(ColumnMetaData.Rep.JAVA_SQL_TIMESTAMP, DateTimes.of("2000-01-03").getMillis()),
TypedValue.ofLocal(ColumnMetaData.Rep.JAVA_SQL_TIMESTAMP, DateTimes.of("2000-01-02").getMillis()),
TypedValue.ofLocal(ColumnMetaData.Rep.JAVA_SQL_TIMESTAMP, DateTimes.of("2000-01-03").getMillis())
))
.setExpectedDataSource("foo")
.setExpectedDestinationIntervals(ImmutableList.of(Intervals.of(
"2000-01-02T00:00:00.000Z/2000-01-03T00:00:00.000Z")))
.setExpectedRowSignature(rowSignature)
.setQueryContext(context)
.setExpectedSegments(ImmutableSet.of(SegmentId.of(
"foo",
Intervals.of("2000-01-02T/P1D"),
"test",
0
)))
.setExpectedResultRows(ImmutableList.of(new Object[]{946771200000L, 2.0f}))
.setExpectedCountersForStageWorkerChannel(
CounterSnapshotMatcher
.with().totalFiles(1),
0, 0, "input0"
)
.setExpectedCountersForStageWorkerChannel(
CounterSnapshotMatcher
.with().rows(1).frames(1),
0, 0, "shuffle"
)
.setExpectedCountersForStageWorkerChannel(
CounterSnapshotMatcher
.with().rows(1).frames(1),
1, 0, "input0"
)
.setExpectedSegmentGenerationProgressCountersForStageWorker(
CounterSnapshotMatcher
.with().segmentRowsProcessed(1),
1, 0
)
.setExpectedLastCompactionState(
expectedCompactionState(
context,
Collections.emptyList(),
Collections.singletonList(new FloatDimensionSchema("m1")),
GranularityType.DAY,
Intervals.of("2000-01-02T/P1D")
)
)
.verifyResults();
}
@MethodSource("data")
@ParameterizedTest(name = "{index}:with context {0}")
public void testReplaceOnFoo1WithAllExtern(String contextName, Map<String, Object> context) throws IOException

View File

@ -22,6 +22,8 @@ package org.apache.druid.msq.exec;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import org.apache.calcite.avatica.ColumnMetaData;
import org.apache.calcite.avatica.remote.TypedValue;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.data.input.impl.CsvInputFormat;
import org.apache.druid.data.input.impl.InlineInputSource;
@ -409,6 +411,53 @@ public class MSQSelectTest extends MSQTestBase
.verifyResults();
}
@MethodSource("data")
@ParameterizedTest(name = "{index}:with context {0}")
public void testSelectWithDynamicParameters(String contextName, Map<String, Object> context)
{
RowSignature resultSignature = RowSignature.builder()
.add("cnt", ColumnType.LONG)
.add("dim1", ColumnType.STRING)
.build();
// Filter [__time >= timestamp '3000-01-01 00:00:00'] matches no segments at all.
testSelectQuery()
.setSql("select cnt,dim1 from foo where __time >= ?")
.setExpectedMSQSpec(
MSQSpec.builder()
.query(
newScanQueryBuilder()
.dataSource(CalciteTests.DATASOURCE1)
.intervals(
querySegmentSpec(
Intervals.utc(
DateTimes.of("3000").getMillis(),
Intervals.ETERNITY.getEndMillis()
)
)
)
.columns("cnt", "dim1")
.context(defaultScanQueryContext(context, resultSignature))
.build()
)
.columnMappings(ColumnMappings.identity(resultSignature))
.tuningConfig(MSQTuningConfig.defaultConfig())
.destination(isDurableStorageDestination(contextName, context)
? DurableStorageMSQDestination.INSTANCE
: TaskReportMSQDestination.INSTANCE)
.build()
)
.setDynamicParameters(
ImmutableList.of(
TypedValue.ofLocal(ColumnMetaData.Rep.JAVA_SQL_TIMESTAMP, DateTimes.of("3000-01-01").getMillis())
)
)
.setQueryContext(context)
.setExpectedRowSignature(resultSignature)
.setExpectedResultRows(ImmutableList.of())
.verifyResults();
}
@MethodSource("data")
@ParameterizedTest(name = "{index}:with context {0}")
public void testSelectOnFooWhereMatchesNoData(String contextName, Map<String, Object> context)

View File

@ -36,6 +36,7 @@ import com.google.inject.Module;
import com.google.inject.TypeLiteral;
import com.google.inject.util.Modules;
import com.google.inject.util.Providers;
import org.apache.calcite.avatica.remote.TypedValue;
import org.apache.druid.client.ImmutableSegmentLoadInfo;
import org.apache.druid.collections.ReferenceCountingResourceHolder;
import org.apache.druid.collections.ResourceHolder;
@ -217,7 +218,6 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@ -784,13 +784,13 @@ public class MSQTestBase extends BaseCalciteQueryTest
);
}
private String runMultiStageQuery(String query, Map<String, Object> context)
private String runMultiStageQuery(String query, Map<String, Object> context, List<TypedValue> parameters)
{
final DirectStatement stmt = sqlStatementFactory.directStatement(
new SqlQueryPlus(
query,
context,
Collections.emptyList(),
parameters,
CalciteTests.REGULAR_USER_AUTH_RESULT
)
);
@ -886,6 +886,7 @@ public class MSQTestBase extends BaseCalciteQueryTest
protected String sql = null;
protected MSQControllerTask taskSpec = null;
protected Map<String, Object> queryContext = DEFAULT_MSQ_CONTEXT;
protected List<TypedValue> dynamicParameters = new ArrayList<>();
protected List<MSQResultsReport.ColumnAndType> expectedRowSignature = null;
protected MSQSpec expectedMSQSpec = null;
protected MSQTuningConfig expectedTuningConfig = null;
@ -924,6 +925,12 @@ public class MSQTestBase extends BaseCalciteQueryTest
return asBuilder();
}
public Builder setDynamicParameters(List<TypedValue> dynamicParameters)
{
this.dynamicParameters = dynamicParameters;
return asBuilder();
}
public Builder setExpectedRowSignature(List<MSQResultsReport.ColumnAndType> expectedRowSignature)
{
Preconditions.checkArgument(!expectedRowSignature.isEmpty(), "Row signature cannot be empty");
@ -1057,7 +1064,7 @@ public class MSQTestBase extends BaseCalciteQueryTest
final Throwable e = Assert.assertThrows(
Throwable.class,
() -> runMultiStageQuery(sql, queryContext)
() -> runMultiStageQuery(sql, queryContext, dynamicParameters)
);
assertThat(e, expectedValidationErrorMatcher);
@ -1209,7 +1216,7 @@ public class MSQTestBase extends BaseCalciteQueryTest
String controllerId;
if (sql != null) {
// Run the sql command.
controllerId = runMultiStageQuery(sql, queryContext);
controllerId = runMultiStageQuery(sql, queryContext, dynamicParameters);
} else {
// Run the task spec directly instead.
controllerId = TEST_CONTROLLER_TASK_ID;
@ -1426,7 +1433,7 @@ public class MSQTestBase extends BaseCalciteQueryTest
try {
String controllerId;
if (sql != null) {
controllerId = runMultiStageQuery(sql, queryContext);
controllerId = runMultiStageQuery(sql, queryContext, dynamicParameters);
} else {
// Run the task spec directly instead.
controllerId = TEST_CONTROLLER_TASK_ID;
@ -1468,7 +1475,7 @@ public class MSQTestBase extends BaseCalciteQueryTest
Preconditions.checkArgument(sql == null || queryContext != null, "queryContext cannot be null");
try {
String controllerId = runMultiStageQuery(sql, queryContext);
String controllerId = runMultiStageQuery(sql, queryContext, dynamicParameters);
if (expectedMSQFault != null || expectedMSQFaultClass != null) {
MSQErrorReport msqErrorReport = getErrorReportOrThrow(controllerId);

View File

@ -64,7 +64,6 @@ import java.util.function.Function;
*/
public class DruidPlanner implements Closeable
{
public static final Joiner SPACE_JOINER = Joiner.on(" ");
public static final Joiner COMMA_JOINER = Joiner.on(", ");
@ -148,6 +147,7 @@ public class DruidPlanner implements Closeable
catch (SqlParseException e1) {
throw translateException(e1);
}
root = rewriteParameters(root);
hook.captureSqlNode(root);
handler = createHandler(root);
handler.validate();
@ -158,6 +158,7 @@ public class DruidPlanner implements Closeable
private SqlStatementHandler createHandler(final SqlNode node)
{
SqlNode query = node;
SqlExplain explain = null;
if (query.getKind() == SqlKind.EXPLAIN) {
explain = (SqlExplain) query;
@ -179,6 +180,27 @@ public class DruidPlanner implements Closeable
throw InvalidSqlInput.exception("Unsupported SQL statement [%s]", node.getKind());
}
/**
* Uses {@link SqlParameterizerShuttle} to rewrite {@link SqlNode} to swap out any
* {@link org.apache.calcite.sql.SqlDynamicParam} early for their {@link org.apache.calcite.sql.SqlLiteral}
* replacement.
*
* @return a rewritten {@link SqlNode} with any dynamic parameters rewritten in the provided {@code original} node,
* if they were present.
*/
private SqlNode rewriteParameters(final SqlNode original)
{
// Parameter replacement is done only if the client provides parameter values.
// If this is a PREPARE-only, then there will be no values even if the statement contains
// parameters. If this is a PLAN, then we'll catch later the case that the statement
// contains parameters, but no values were provided.
if (plannerContext.getParameters().isEmpty()) {
return original;
} else {
return original.accept(new SqlParameterizerShuttle(plannerContext)); // the rewrite happens here.
}
}
/**
* Prepare a SQL query for execution, including some initial parsing and
* validation and any dynamic parameter type resolution, to support prepared

View File

@ -341,7 +341,6 @@ public abstract class IngestHandler extends QueryHandler
protected static DruidSqlReplace convertQuery(DruidSqlReplace sqlNode)
{
SqlNode query = convertSourceQuery(sqlNode);
return DruidSqlReplace.create(
new SqlInsert(
sqlNode.getParserPosition(),

View File

@ -113,7 +113,7 @@ public abstract class QueryHandler extends SqlStatementHandler.BaseStatementHand
CalcitePlanner planner = handlerContext.planner();
SqlNode validatedQueryNode;
try {
validatedQueryNode = planner.validate(rewriteParameters(root));
validatedQueryNode = planner.validate(root);
}
catch (ValidationException e) {
throw DruidPlanner.translateException(e);
@ -129,24 +129,6 @@ public abstract class QueryHandler extends SqlStatementHandler.BaseStatementHand
return validatedQueryNode;
}
private SqlNode rewriteParameters(SqlNode original)
{
// Uses {@link SqlParameterizerShuttle} to rewrite {@link SqlNode} to swap out any
// {@link org.apache.calcite.sql.SqlDynamicParam} early for their {@link SqlLiteral}
// replacement.
//
// Parameter replacement is done only if the client provides parameter values.
// If this is a PREPARE-only, then there will be no values even if the statement contains
// parameters. If this is a PLAN, then we'll catch later the case that the statement
// contains parameters, but no values were provided.
PlannerContext plannerContext = handlerContext.plannerContext();
if (plannerContext.getParameters().isEmpty()) {
return original;
} else {
return original.accept(new SqlParameterizerShuttle(plannerContext));
}
}
@Override
public void prepare()
{