Add exploratory msqPlan command

This commit is contained in:
Zoltan Haindrich 2024-07-17 19:48:08 +00:00
parent 8b26e490e9
commit 70ff2a3e97
4 changed files with 265 additions and 1 deletions

View File

@ -20,6 +20,8 @@
package org.apache.druid.sql.avatica; package org.apache.druid.sql.avatica;
//package org.apache.druid.msq.exec; //package org.apache.druid.msq.exec;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableList;
import com.google.inject.Inject; import com.google.inject.Inject;
import org.apache.calcite.avatica.Meta; import org.apache.calcite.avatica.Meta;
@ -35,6 +37,7 @@ import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.server.security.AuthenticatorMapper; import org.apache.druid.server.security.AuthenticatorMapper;
import org.apache.druid.sql.SqlStatementFactory; import org.apache.druid.sql.SqlStatementFactory;
import org.apache.druid.sql.calcite.planner.DruidTypeSystem; import org.apache.druid.sql.calcite.planner.DruidTypeSystem;
import org.apache.druid.sql.calcite.run.DruidHook;
import org.apache.druid.sql.calcite.table.RowSignatures; import org.apache.druid.sql.calcite.table.RowSignatures;
import java.util.Collections; import java.util.Collections;
@ -43,6 +46,7 @@ import java.util.List;
public class MSQDruidMeta extends DruidMeta public class MSQDruidMeta extends DruidMeta
{ {
protected final MSQTestOverlordServiceClient overlordClient; protected final MSQTestOverlordServiceClient overlordClient;
private final ObjectMapper objectMapper;
@Inject @Inject
public MSQDruidMeta( public MSQDruidMeta(
@ -50,10 +54,13 @@ public class MSQDruidMeta extends DruidMeta
final AvaticaServerConfig config, final AvaticaServerConfig config,
final ErrorHandler errorHandler, final ErrorHandler errorHandler,
final AuthenticatorMapper authMapper, final AuthenticatorMapper authMapper,
final MSQTestOverlordServiceClient overlordClient) final MSQTestOverlordServiceClient overlordClient,
final ObjectMapper objectMapper
)
{ {
super(sqlStatementFactory, config, errorHandler, authMapper); super(sqlStatementFactory, config, errorHandler, authMapper);
this.overlordClient = overlordClient; this.overlordClient = overlordClient;
this.objectMapper = objectMapper;
} }
@Override @Override
@ -84,6 +91,16 @@ public class MSQDruidMeta extends DruidMeta
// convertColumnAndTypeToRowSignature(payload.getResults().getSignature()), resultRows // convertColumnAndTypeToRowSignature(payload.getResults().getSignature()), resultRows
// ) // )
// ); // );
try {
String str = objectMapper
.writerWithDefaultPrettyPrinter()
.writeValueAsString(payload.getStages());
DruidHook.dispatch(DruidHook.MSQ_PLAN, str);
}
catch (JsonProcessingException e) {
DruidHook.dispatch(DruidHook.MSQ_PLAN, "error happened during json serialization");
}
Signature signature = makeSignature(druidStatement, payload.getResults().getSignature()); Signature signature = makeSignature(druidStatement, payload.getResults().getSignature());
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")

View File

@ -16,6 +16,215 @@ order by 1;
(2 rows) (2 rows)
!ok !ok
[ {
"stageNumber" : 0,
"definition" : {
"id" : "query-8e797c48-d9eb-4bfb-831a-35ebd71c32b0_0",
"input" : [ {
"type" : "table",
"dataSource" : "wikipedia",
"intervals" : [ "-146136543-09-08T08:23:32.096Z/146140482-04-24T15:36:27.903Z" ],
"filter" : {
"type" : "inType",
"column" : "cityName",
"matchValueType" : "STRING",
"sortedValues" : [ "Aarhus", "New York" ]
},
"filterFields" : [ "cityName" ]
} ],
"processor" : {
"type" : "groupByPreShuffle",
"query" : {
"queryType" : "groupBy",
"dataSource" : {
"type" : "inputNumber",
"inputNumber" : 0
},
"intervals" : {
"type" : "intervals",
"intervals" : [ "-146136543-09-08T08:23:32.096Z/146140482-04-24T15:36:27.903Z" ]
},
"filter" : {
"type" : "inType",
"column" : "cityName",
"matchValueType" : "STRING",
"sortedValues" : [ "Aarhus", "New York" ]
},
"granularity" : {
"type" : "all"
},
"dimensions" : [ {
"type" : "default",
"dimension" : "cityName",
"outputName" : "d0",
"outputType" : "STRING"
} ],
"aggregations" : [ {
"type" : "filtered",
"aggregator" : {
"type" : "count",
"name" : "a0"
},
"filter" : {
"type" : "and",
"fields" : [ {
"type" : "not",
"field" : {
"type" : "null",
"column" : "channel"
}
}, {
"type" : "range",
"column" : "delta",
"matchValueType" : "LONG",
"lower" : 0,
"lowerOpen" : true
} ]
},
"name" : "a0"
}, {
"type" : "count",
"name" : "a1"
} ],
"limitSpec" : {
"type" : "NoopLimitSpec"
},
"context" : {
"__user" : null,
"finalize" : true,
"maxParseExceptions" : 0,
"plannerStrategy" : "DECOUPLED",
"sqlQueryId" : "8e797c48-d9eb-4bfb-831a-35ebd71c32b0",
"sqlStringifyArrays" : false
}
}
},
"signature" : [ {
"name" : "d0",
"type" : "STRING"
}, {
"name" : "a0",
"type" : "LONG"
}, {
"name" : "a1",
"type" : "LONG"
} ],
"shuffleSpec" : {
"type" : "maxCount",
"clusterBy" : {
"columns" : [ {
"columnName" : "d0",
"order" : "ASCENDING"
} ]
},
"partitions" : 1,
"aggregate" : true
},
"maxWorkerCount" : 1
},
"phase" : "FINISHED",
"workerCount" : 1,
"partitionCount" : 1,
"shuffle" : "globalSort",
"output" : "localStorage",
"startTime" : "2024-07-17T19:47:18.571Z",
"duration" : 664,
"sort" : true
}, {
"stageNumber" : 1,
"definition" : {
"id" : "query-8e797c48-d9eb-4bfb-831a-35ebd71c32b0_1",
"input" : [ {
"type" : "stage",
"stage" : 0
} ],
"processor" : {
"type" : "groupByPostShuffle",
"query" : {
"queryType" : "groupBy",
"dataSource" : {
"type" : "inputNumber",
"inputNumber" : 0
},
"intervals" : {
"type" : "intervals",
"intervals" : [ "-146136543-09-08T08:23:32.096Z/146140482-04-24T15:36:27.903Z" ]
},
"filter" : {
"type" : "inType",
"column" : "cityName",
"matchValueType" : "STRING",
"sortedValues" : [ "Aarhus", "New York" ]
},
"granularity" : {
"type" : "all"
},
"dimensions" : [ {
"type" : "default",
"dimension" : "cityName",
"outputName" : "d0",
"outputType" : "STRING"
} ],
"aggregations" : [ {
"type" : "filtered",
"aggregator" : {
"type" : "count",
"name" : "a0"
},
"filter" : {
"type" : "and",
"fields" : [ {
"type" : "not",
"field" : {
"type" : "null",
"column" : "channel"
}
}, {
"type" : "range",
"column" : "delta",
"matchValueType" : "LONG",
"lower" : 0,
"lowerOpen" : true
} ]
},
"name" : "a0"
}, {
"type" : "count",
"name" : "a1"
} ],
"limitSpec" : {
"type" : "NoopLimitSpec"
},
"context" : {
"__user" : null,
"finalize" : true,
"maxParseExceptions" : 0,
"plannerStrategy" : "DECOUPLED",
"sqlQueryId" : "8e797c48-d9eb-4bfb-831a-35ebd71c32b0",
"sqlStringifyArrays" : false
}
}
},
"signature" : [ {
"name" : "d0",
"type" : "STRING"
}, {
"name" : "a0",
"type" : "LONG"
}, {
"name" : "a1",
"type" : "LONG"
} ],
"maxWorkerCount" : 1
},
"phase" : "FINISHED",
"workerCount" : 1,
"partitionCount" : 1,
"output" : "localStorage",
"startTime" : "2024-07-17T19:47:19.234Z",
"duration" : 2
} ]
!msqPlan
LogicalSort(sort0=[$0], dir0=[ASC]) LogicalSort(sort0=[$0], dir0=[ASC])
LogicalAggregate(group=[{0}], cnt=[COUNT($1)], aall=[COUNT()]) LogicalAggregate(group=[{0}], cnt=[COUNT($1)], aall=[COUNT()])
LogicalProject(cityName=[$2], $f1=[CASE(>($17, 0), $1, null:VARCHAR)]) LogicalProject(cityName=[$2], $f1=[CASE(>($17, 0), $1, null:VARCHAR)])

View File

@ -69,6 +69,7 @@ public interface DruidHook<T>
HookKey<RelNode> LOGICAL_PLAN = new HookKey<>("logicalPlan", RelNode.class); HookKey<RelNode> LOGICAL_PLAN = new HookKey<>("logicalPlan", RelNode.class);
HookKey<RelNode> DRUID_PLAN = new HookKey<>("druidPlan", RelNode.class); HookKey<RelNode> DRUID_PLAN = new HookKey<>("druidPlan", RelNode.class);
HookKey<String> SQL = new HookKey<>("sql", String.class); HookKey<String> SQL = new HookKey<>("sql", String.class);
// MSQ plan rendered as a string; MSQDruidMeta dispatches the stage list as pretty-printed JSON.
HookKey<String> MSQ_PLAN = new HookKey<>("msqPlan", String.class);
void invoke(HookKey<T> key, T object); void invoke(HookKey<T> key, T object);

View File

@ -64,6 +64,9 @@ public class DruidQuidemCommandHandler implements CommandHandler
if (line.startsWith("nativePlan")) { if (line.startsWith("nativePlan")) {
return new NativePlanCommand(lines, content); return new NativePlanCommand(lines, content);
} }
if (line.startsWith("msqPlan")) {
return new MSQPlanCommand(lines, content);
}
return null; return null;
} }
@ -186,6 +189,33 @@ public class DruidQuidemCommandHandler implements CommandHandler
} }
} }
/**
 * Handles plan commands whose output is a plain {@link String} captured via a {@link DruidHook}.
 *
 * While the query runs, every string dispatched for the configured hook key is collected;
 * the collected strings are echoed after the query has completed.
 */
abstract static class AbstractXPlanCommand extends AbstractPlanCommand
{
  /** Key of the hook whose dispatched strings are captured; assigned once in the constructor. */
  final DruidHook.HookKey<String> hook;

  /**
   * @param lines   passed through to {@link AbstractPlanCommand}
   * @param content passed through to {@link AbstractPlanCommand}
   * @param hook    key of the string-valued hook to listen on while the query executes
   */
  AbstractXPlanCommand(List<String> lines, List<String> content, DruidHook.HookKey<String> hook)
  {
    super(lines, content);
    this.hook = hook;
  }

  /**
   * Runs the query with a listener registered for {@link #hook} and echoes everything the
   * listener received.
   *
   * @throws IOException declared because the {@link Closeable} returned by
   *                     {@code DruidHook.withHook} is closed here
   */
  @Override
  protected final void executeExplain(Context x) throws IOException
  {
    final List<String> captured = new ArrayList<>();
    // The Closeable returned by withHook is closed as soon as the query finishes (or throws),
    // so the listener is only held for the duration of executeQuery.
    try (Closeable unhook = DruidHook.withHook(hook, (key, plan) -> captured.add(plan))) {
      executeQuery(x);
    }
    // Reached only when executeQuery and close() completed without throwing.
    x.echo(captured);
  }
}
static class LogicalPlanCommand extends AbstractRelPlanCommand static class LogicalPlanCommand extends AbstractRelPlanCommand
{ {
LogicalPlanCommand(List<String> lines, List<String> content) LogicalPlanCommand(List<String> lines, List<String> content)
@ -209,4 +239,11 @@ public class DruidQuidemCommandHandler implements CommandHandler
super(lines, content, DruidHook.CONVERTED_PLAN); super(lines, content, DruidHook.CONVERTED_PLAN);
} }
} }
/**
 * Command created for quidem lines starting with {@code msqPlan}; it listens on
 * {@link DruidHook#MSQ_PLAN} and leaves all capturing and echoing to
 * {@link AbstractXPlanCommand}.
 */
static class MSQPlanCommand extends AbstractXPlanCommand
{
  MSQPlanCommand(final List<String> lines, final List<String> content)
  {
    super(lines, content, DruidHook.MSQ_PLAN);
  }
}
} }