diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/sql/avatica/MSQDruidMeta.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/sql/avatica/MSQDruidMeta.java index ced8d46766f..972ef5ea072 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/sql/avatica/MSQDruidMeta.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/sql/avatica/MSQDruidMeta.java @@ -20,6 +20,8 @@ package org.apache.druid.sql.avatica; //package org.apache.druid.msq.exec; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.ImmutableList; import com.google.inject.Inject; import org.apache.calcite.avatica.Meta; @@ -35,6 +37,7 @@ import org.apache.druid.segment.column.RowSignature; import org.apache.druid.server.security.AuthenticatorMapper; import org.apache.druid.sql.SqlStatementFactory; import org.apache.druid.sql.calcite.planner.DruidTypeSystem; +import org.apache.druid.sql.calcite.run.DruidHook; import org.apache.druid.sql.calcite.table.RowSignatures; import java.util.Collections; @@ -43,6 +46,7 @@ import java.util.List; public class MSQDruidMeta extends DruidMeta { protected final MSQTestOverlordServiceClient overlordClient; + private final ObjectMapper objectMapper; @Inject public MSQDruidMeta( @@ -50,10 +54,13 @@ public class MSQDruidMeta extends DruidMeta final AvaticaServerConfig config, final ErrorHandler errorHandler, final AuthenticatorMapper authMapper, - final MSQTestOverlordServiceClient overlordClient) + final MSQTestOverlordServiceClient overlordClient, + final ObjectMapper objectMapper + ) { super(sqlStatementFactory, config, errorHandler, authMapper); this.overlordClient = overlordClient; + this.objectMapper = objectMapper; } @Override @@ -84,6 +91,16 @@ public class MSQDruidMeta extends DruidMeta // convertColumnAndTypeToRowSignature(payload.getResults().getSignature()), resultRows // ) // ); + try { + String str = objectMapper + .writerWithDefaultPrettyPrinter() + .writeValueAsString(payload.getStages()); + DruidHook.dispatch(DruidHook.MSQ_PLAN, str); + } + catch (JsonProcessingException e) { + DruidHook.dispatch(DruidHook.MSQ_PLAN, "error happened during json serialization"); + } + Signature signature = makeSignature(druidStatement, payload.getResults().getSignature()); @SuppressWarnings("unchecked") diff --git a/extensions-core/multi-stage-query/src/test/quidem/org.apache.druid.msq.exec.MSQQuidemTest/decoupled.iq b/extensions-core/multi-stage-query/src/test/quidem/org.apache.druid.msq.exec.MSQQuidemTest/decoupled.iq index 393ce6e8f43..8b23c38b821 100644 --- a/extensions-core/multi-stage-query/src/test/quidem/org.apache.druid.msq.exec.MSQQuidemTest/decoupled.iq +++ b/extensions-core/multi-stage-query/src/test/quidem/org.apache.druid.msq.exec.MSQQuidemTest/decoupled.iq @@ -16,6 +16,215 @@ order by 1; (2 rows) !ok +[ { + "stageNumber" : 0, + "definition" : { + "id" : "query-8e797c48-d9eb-4bfb-831a-35ebd71c32b0_0", + "input" : [ { + "type" : "table", + "dataSource" : "wikipedia", + "intervals" : [ "-146136543-09-08T08:23:32.096Z/146140482-04-24T15:36:27.903Z" ], + "filter" : { + "type" : "inType", + "column" : "cityName", + "matchValueType" : "STRING", + "sortedValues" : [ "Aarhus", "New York" ] + }, + "filterFields" : [ "cityName" ] + } ], + "processor" : { + "type" : "groupByPreShuffle", + "query" : { + "queryType" : "groupBy", + "dataSource" : { + "type" : "inputNumber", + "inputNumber" : 0 + }, + "intervals" : { + "type" : "intervals", + "intervals" : [ "-146136543-09-08T08:23:32.096Z/146140482-04-24T15:36:27.903Z" ] + }, + "filter" : { + "type" : "inType", + "column" : "cityName", + "matchValueType" : "STRING", + "sortedValues" : [ "Aarhus", "New York" ] + }, + "granularity" : { + "type" : "all" + }, + "dimensions" : [ { + "type" : "default", + "dimension" : "cityName", + "outputName" : "d0", + "outputType" : "STRING" + } ], + "aggregations" : [ { + "type" : "filtered", + "aggregator" : { + "type" : "count", + "name" : "a0" + }, + "filter" : { + "type" : "and", + "fields" : [ { + "type" : "not", + "field" : { + "type" : "null", + "column" : "channel" + } + }, { + "type" : "range", + "column" : "delta", + "matchValueType" : "LONG", + "lower" : 0, + "lowerOpen" : true + } ] + }, + "name" : "a0" + }, { + "type" : "count", + "name" : "a1" + } ], + "limitSpec" : { + "type" : "NoopLimitSpec" + }, + "context" : { + "__user" : null, + "finalize" : true, + "maxParseExceptions" : 0, + "plannerStrategy" : "DECOUPLED", + "sqlQueryId" : "8e797c48-d9eb-4bfb-831a-35ebd71c32b0", + "sqlStringifyArrays" : false + } + } + }, + "signature" : [ { + "name" : "d0", + "type" : "STRING" + }, { + "name" : "a0", + "type" : "LONG" + }, { + "name" : "a1", + "type" : "LONG" + } ], + "shuffleSpec" : { + "type" : "maxCount", + "clusterBy" : { + "columns" : [ { + "columnName" : "d0", + "order" : "ASCENDING" + } ] + }, + "partitions" : 1, + "aggregate" : true + }, + "maxWorkerCount" : 1 + }, + "phase" : "FINISHED", + "workerCount" : 1, + "partitionCount" : 1, + "shuffle" : "globalSort", + "output" : "localStorage", + "startTime" : "2024-07-17T19:47:18.571Z", + "duration" : 664, + "sort" : true +}, { + "stageNumber" : 1, + "definition" : { + "id" : "query-8e797c48-d9eb-4bfb-831a-35ebd71c32b0_1", + "input" : [ { + "type" : "stage", + "stage" : 0 + } ], + "processor" : { + "type" : "groupByPostShuffle", + "query" : { + "queryType" : "groupBy", + "dataSource" : { + "type" : "inputNumber", + "inputNumber" : 0 + }, + "intervals" : { + "type" : "intervals", + "intervals" : [ "-146136543-09-08T08:23:32.096Z/146140482-04-24T15:36:27.903Z" ] + }, + "filter" : { + "type" : "inType", + "column" : "cityName", + "matchValueType" : "STRING", + "sortedValues" : [ "Aarhus", "New York" ] + }, + "granularity" : { + "type" : "all" + }, + "dimensions" : [ { + "type" : "default", + "dimension" : "cityName", + "outputName" : "d0", + "outputType" : "STRING" + } ], + "aggregations" : [ { + "type" : "filtered", + "aggregator" : { + "type" : "count", + "name" : "a0" + }, + "filter" : { + "type" : "and", + "fields" : [ { + "type" : "not", + "field" : { + "type" : "null", + "column" : "channel" + } + }, { + "type" : "range", + "column" : "delta", + "matchValueType" : "LONG", + "lower" : 0, + "lowerOpen" : true + } ] + }, + "name" : "a0" + }, { + "type" : "count", + "name" : "a1" + } ], + "limitSpec" : { + "type" : "NoopLimitSpec" + }, + "context" : { + "__user" : null, + "finalize" : true, + "maxParseExceptions" : 0, + "plannerStrategy" : "DECOUPLED", + "sqlQueryId" : "8e797c48-d9eb-4bfb-831a-35ebd71c32b0", + "sqlStringifyArrays" : false + } + } + }, + "signature" : [ { + "name" : "d0", + "type" : "STRING" + }, { + "name" : "a0", + "type" : "LONG" + }, { + "name" : "a1", + "type" : "LONG" + } ], + "maxWorkerCount" : 1 + }, + "phase" : "FINISHED", + "workerCount" : 1, + "partitionCount" : 1, + "output" : "localStorage", + "startTime" : "2024-07-17T19:47:19.234Z", + "duration" : 2 +} ] +!msqPlan LogicalSort(sort0=[$0], dir0=[ASC]) LogicalAggregate(group=[{0}], cnt=[COUNT($1)], aall=[COUNT()]) LogicalProject(cityName=[$2], $f1=[CASE(>($17, 0), $1, null:VARCHAR)]) diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/run/DruidHook.java b/sql/src/main/java/org/apache/druid/sql/calcite/run/DruidHook.java index 99aa44da0a3..19022fff148 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/run/DruidHook.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/run/DruidHook.java @@ -69,6 +69,7 @@ public interface DruidHook HookKey LOGICAL_PLAN = new HookKey<>("logicalPlan", RelNode.class); HookKey DRUID_PLAN = new HookKey<>("druidPlan", RelNode.class); HookKey SQL = new HookKey<>("sql", String.class); + HookKey MSQ_PLAN = new HookKey<>("msqPlan", String.class);; void invoke(HookKey key, T object); diff --git a/sql/src/test/java/org/apache/druid/quidem/DruidQuidemCommandHandler.java b/sql/src/test/java/org/apache/druid/quidem/DruidQuidemCommandHandler.java index 86876cb90ee..c4fb14553d1 100644 --- a/sql/src/test/java/org/apache/druid/quidem/DruidQuidemCommandHandler.java +++ b/sql/src/test/java/org/apache/druid/quidem/DruidQuidemCommandHandler.java @@ -64,6 +64,9 @@ public class DruidQuidemCommandHandler implements CommandHandler if (line.startsWith("nativePlan")) { return new NativePlanCommand(lines, content); } + if (line.startsWith("msqPlan")) { + return new MSQPlanCommand(lines, content); + } return null; } @@ -186,6 +189,33 @@ public class DruidQuidemCommandHandler implements CommandHandler } } + /** + * Handles plan commands captured via {@link Hook}. + */ + abstract static class AbstractXPlanCommand extends AbstractPlanCommand + { + HookKey hook; + + AbstractXPlanCommand(List lines, List content, DruidHook.HookKey hook) + { + super(lines, content); + this.hook = hook; + } + + @Override + protected final void executeExplain(Context x) throws IOException + { + List logged = new ArrayList<>(); + try (Closeable unhook = DruidHook.withHook(hook, (key, relNode) -> { + logged.add(relNode); + })) { + executeQuery(x); + } + + x.echo(logged); + } + } + static class LogicalPlanCommand extends AbstractRelPlanCommand { LogicalPlanCommand(List lines, List content) @@ -209,4 +239,11 @@ public class DruidQuidemCommandHandler implements CommandHandler super(lines, content, DruidHook.CONVERTED_PLAN); } } + static class MSQPlanCommand extends AbstractXPlanCommand + { + MSQPlanCommand(List lines, List content) + { + super(lines, content, DruidHook.MSQ_PLAN); + } + } }