add exploratory msqPlan cmd

This commit is contained in:
Zoltan Haindrich 2024-07-17 19:48:08 +00:00
parent 8b26e490e9
commit 70ff2a3e97
4 changed files with 265 additions and 1 deletions

View File

@ -20,6 +20,8 @@
package org.apache.druid.sql.avatica;
//package org.apache.druid.msq.exec;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import com.google.inject.Inject;
import org.apache.calcite.avatica.Meta;
@ -35,6 +37,7 @@ import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.server.security.AuthenticatorMapper;
import org.apache.druid.sql.SqlStatementFactory;
import org.apache.druid.sql.calcite.planner.DruidTypeSystem;
import org.apache.druid.sql.calcite.run.DruidHook;
import org.apache.druid.sql.calcite.table.RowSignatures;
import java.util.Collections;
@ -43,6 +46,7 @@ import java.util.List;
public class MSQDruidMeta extends DruidMeta
{
protected final MSQTestOverlordServiceClient overlordClient;
private final ObjectMapper objectMapper;
@Inject
public MSQDruidMeta(
@ -50,10 +54,13 @@ public class MSQDruidMeta extends DruidMeta
final AvaticaServerConfig config,
final ErrorHandler errorHandler,
final AuthenticatorMapper authMapper,
final MSQTestOverlordServiceClient overlordClient)
final MSQTestOverlordServiceClient overlordClient,
final ObjectMapper objectMapper
)
{
super(sqlStatementFactory, config, errorHandler, authMapper);
this.overlordClient = overlordClient;
this.objectMapper = objectMapper;
}
@Override
@ -84,6 +91,16 @@ public class MSQDruidMeta extends DruidMeta
// convertColumnAndTypeToRowSignature(payload.getResults().getSignature()), resultRows
// )
// );
try {
String str = objectMapper
.writerWithDefaultPrettyPrinter()
.writeValueAsString(payload.getStages());
DruidHook.dispatch(DruidHook.MSQ_PLAN, str);
}
catch (JsonProcessingException e) {
DruidHook.dispatch(DruidHook.MSQ_PLAN, "error happened during json serialization");
}
Signature signature = makeSignature(druidStatement, payload.getResults().getSignature());
@SuppressWarnings("unchecked")

View File

@ -16,6 +16,215 @@ order by 1;
(2 rows)
!ok
[ {
"stageNumber" : 0,
"definition" : {
"id" : "query-8e797c48-d9eb-4bfb-831a-35ebd71c32b0_0",
"input" : [ {
"type" : "table",
"dataSource" : "wikipedia",
"intervals" : [ "-146136543-09-08T08:23:32.096Z/146140482-04-24T15:36:27.903Z" ],
"filter" : {
"type" : "inType",
"column" : "cityName",
"matchValueType" : "STRING",
"sortedValues" : [ "Aarhus", "New York" ]
},
"filterFields" : [ "cityName" ]
} ],
"processor" : {
"type" : "groupByPreShuffle",
"query" : {
"queryType" : "groupBy",
"dataSource" : {
"type" : "inputNumber",
"inputNumber" : 0
},
"intervals" : {
"type" : "intervals",
"intervals" : [ "-146136543-09-08T08:23:32.096Z/146140482-04-24T15:36:27.903Z" ]
},
"filter" : {
"type" : "inType",
"column" : "cityName",
"matchValueType" : "STRING",
"sortedValues" : [ "Aarhus", "New York" ]
},
"granularity" : {
"type" : "all"
},
"dimensions" : [ {
"type" : "default",
"dimension" : "cityName",
"outputName" : "d0",
"outputType" : "STRING"
} ],
"aggregations" : [ {
"type" : "filtered",
"aggregator" : {
"type" : "count",
"name" : "a0"
},
"filter" : {
"type" : "and",
"fields" : [ {
"type" : "not",
"field" : {
"type" : "null",
"column" : "channel"
}
}, {
"type" : "range",
"column" : "delta",
"matchValueType" : "LONG",
"lower" : 0,
"lowerOpen" : true
} ]
},
"name" : "a0"
}, {
"type" : "count",
"name" : "a1"
} ],
"limitSpec" : {
"type" : "NoopLimitSpec"
},
"context" : {
"__user" : null,
"finalize" : true,
"maxParseExceptions" : 0,
"plannerStrategy" : "DECOUPLED",
"sqlQueryId" : "8e797c48-d9eb-4bfb-831a-35ebd71c32b0",
"sqlStringifyArrays" : false
}
}
},
"signature" : [ {
"name" : "d0",
"type" : "STRING"
}, {
"name" : "a0",
"type" : "LONG"
}, {
"name" : "a1",
"type" : "LONG"
} ],
"shuffleSpec" : {
"type" : "maxCount",
"clusterBy" : {
"columns" : [ {
"columnName" : "d0",
"order" : "ASCENDING"
} ]
},
"partitions" : 1,
"aggregate" : true
},
"maxWorkerCount" : 1
},
"phase" : "FINISHED",
"workerCount" : 1,
"partitionCount" : 1,
"shuffle" : "globalSort",
"output" : "localStorage",
"startTime" : "2024-07-17T19:47:18.571Z",
"duration" : 664,
"sort" : true
}, {
"stageNumber" : 1,
"definition" : {
"id" : "query-8e797c48-d9eb-4bfb-831a-35ebd71c32b0_1",
"input" : [ {
"type" : "stage",
"stage" : 0
} ],
"processor" : {
"type" : "groupByPostShuffle",
"query" : {
"queryType" : "groupBy",
"dataSource" : {
"type" : "inputNumber",
"inputNumber" : 0
},
"intervals" : {
"type" : "intervals",
"intervals" : [ "-146136543-09-08T08:23:32.096Z/146140482-04-24T15:36:27.903Z" ]
},
"filter" : {
"type" : "inType",
"column" : "cityName",
"matchValueType" : "STRING",
"sortedValues" : [ "Aarhus", "New York" ]
},
"granularity" : {
"type" : "all"
},
"dimensions" : [ {
"type" : "default",
"dimension" : "cityName",
"outputName" : "d0",
"outputType" : "STRING"
} ],
"aggregations" : [ {
"type" : "filtered",
"aggregator" : {
"type" : "count",
"name" : "a0"
},
"filter" : {
"type" : "and",
"fields" : [ {
"type" : "not",
"field" : {
"type" : "null",
"column" : "channel"
}
}, {
"type" : "range",
"column" : "delta",
"matchValueType" : "LONG",
"lower" : 0,
"lowerOpen" : true
} ]
},
"name" : "a0"
}, {
"type" : "count",
"name" : "a1"
} ],
"limitSpec" : {
"type" : "NoopLimitSpec"
},
"context" : {
"__user" : null,
"finalize" : true,
"maxParseExceptions" : 0,
"plannerStrategy" : "DECOUPLED",
"sqlQueryId" : "8e797c48-d9eb-4bfb-831a-35ebd71c32b0",
"sqlStringifyArrays" : false
}
}
},
"signature" : [ {
"name" : "d0",
"type" : "STRING"
}, {
"name" : "a0",
"type" : "LONG"
}, {
"name" : "a1",
"type" : "LONG"
} ],
"maxWorkerCount" : 1
},
"phase" : "FINISHED",
"workerCount" : 1,
"partitionCount" : 1,
"output" : "localStorage",
"startTime" : "2024-07-17T19:47:19.234Z",
"duration" : 2
} ]
!msqPlan
LogicalSort(sort0=[$0], dir0=[ASC])
LogicalAggregate(group=[{0}], cnt=[COUNT($1)], aall=[COUNT()])
LogicalProject(cityName=[$2], $f1=[CASE(>($17, 0), $1, null:VARCHAR)])

View File

@ -69,6 +69,7 @@ public interface DruidHook<T>
HookKey<RelNode> LOGICAL_PLAN = new HookKey<>("logicalPlan", RelNode.class);
HookKey<RelNode> DRUID_PLAN = new HookKey<>("druidPlan", RelNode.class);
HookKey<String> SQL = new HookKey<>("sql", String.class);
// Hook key labelled "msqPlan"; values dispatched under it are Strings.
HookKey<String> MSQ_PLAN = new HookKey<>("msqPlan", String.class);
void invoke(HookKey<T> key, T object);

View File

@ -64,6 +64,9 @@ public class DruidQuidemCommandHandler implements CommandHandler
if (line.startsWith("nativePlan")) {
return new NativePlanCommand(lines, content);
}
if (line.startsWith("msqPlan")) {
return new MSQPlanCommand(lines, content);
}
return null;
}
@ -186,6 +189,33 @@ public class DruidQuidemCommandHandler implements CommandHandler
}
}
/**
 * Handles plan commands whose output is a {@link String} captured via a {@link DruidHook}.
 *
 * While the query runs, every value dispatched for the configured hook key is collected; after
 * the query has completed and the hook has been closed, the collected values are echoed.
 */
abstract static class AbstractXPlanCommand extends AbstractPlanCommand
{
  // Assigned once in the constructor and never changed afterwards.
  final HookKey<String> hook;

  /**
   * @param lines   passed through to the {@link AbstractPlanCommand} constructor
   * @param content passed through to the {@link AbstractPlanCommand} constructor
   * @param hook    key of the hook whose dispatched strings are captured
   */
  AbstractXPlanCommand(List<String> lines, List<String> content, DruidHook.HookKey<String> hook)
  {
    super(lines, content);
    this.hook = hook;
  }

  /**
   * Runs the query with a temporary hook installed and echoes everything the hook received.
   *
   * @throws IOException declared by the overridden method; may also come from closing the hook
   */
  @Override
  protected final void executeExplain(Context x) throws IOException
  {
    final List<String> captured = new ArrayList<>();
    // The hook is only registered for the duration of the query; try-with-resources closes it.
    try (Closeable unhook = DruidHook.withHook(hook, (key, planText) -> captured.add(planText))) {
      executeQuery(x);
    }
    x.echo(captured);
  }
}
static class LogicalPlanCommand extends AbstractRelPlanCommand
{
LogicalPlanCommand(List<String> lines, List<String> content)
@ -209,4 +239,11 @@ public class DruidQuidemCommandHandler implements CommandHandler
super(lines, content, DruidHook.CONVERTED_PLAN);
}
}
/**
 * Plan command bound to {@link DruidHook#MSQ_PLAN}.
 */
static class MSQPlanCommand extends AbstractXPlanCommand
{
  /**
   * @param lines   forwarded unchanged to the superclass constructor
   * @param content forwarded unchanged to the superclass constructor
   */
  MSQPlanCommand(final List<String> lines, final List<String> content)
  {
    super(lines, content, DruidHook.MSQ_PLAN);
  }
}
}