Support explain in decoupled planning and log native plan consistently with DruidHook (#17101)

* enables to use DruidHook for native plan logging
* quidem tests don't necessarily need to run the query to get an explain — this helps during development, since even if there is a runtime issue the query can still be explained in the test
This commit is contained in:
Zoltan Haindrich 2024-09-20 10:53:43 +02:00 committed by GitHub
parent 635e418131
commit 2eee470f6e
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
11 changed files with 132 additions and 58 deletions

View File

@ -68,6 +68,7 @@ import org.apache.druid.sql.calcite.table.RowSignatures;
import org.apache.druid.sql.destination.ExportDestination;
import org.apache.druid.sql.destination.IngestDestination;
import org.apache.druid.sql.destination.TableDestination;
import org.apache.druid.sql.hook.DruidHook;
import org.apache.druid.sql.http.ResultFormat;
import org.joda.time.Interval;
@ -117,6 +118,8 @@ public class MSQTaskQueryMaker implements QueryMaker
public QueryResponse<Object[]> runQuery(final DruidQuery druidQuery)
{
Hook.QUERY_PLAN.run(druidQuery.getQuery());
plannerContext.dispatchHook(DruidHook.NATIVE_PLAN, druidQuery.getQuery());
String taskId = MSQTasks.controllerTaskId(plannerContext.getSqlQueryId());
// SQL query context: context provided by the user, and potentially modified by handlers during planning.

View File

@ -80,6 +80,12 @@ public class MSQDrillWindowQueryTest extends DrillWindowQueryTest
{
return injector.getInstance(MSQTaskSqlEngine.class);
}
@Override
public Boolean isExplainSupported()
{
// MSQ right now needs a full query run (see SqlTestFramework javadoc), so plain EXPLAIN is disabled here.
return false;
}
}
@Override

View File

@ -25,7 +25,6 @@ import java.io.File;
public class MSQQuidemTest extends DruidQuidemTestBase
{
public MSQQuidemTest()
{
super();

View File

@ -336,4 +336,10 @@ public class ExposedAsBrokerQueryComponentSupplierWrapper implements QueryCompon
}
);
}
@Override
public Boolean isExplainSupported()
{
// Forward the capability flag to the wrapped component supplier.
return delegate.isExplainSupported();
}
}

View File

@ -406,7 +406,7 @@ public abstract class QueryHandler extends SqlStatementHandler.BaseStatementHand
if (plannerContext.getPlannerConfig().isUseNativeQueryExplain()) {
DruidRel<?> druidRel = (DruidRel<?>) rel;
try {
explanation = explainSqlPlanAsNativeQueries(relRoot, druidRel);
explanation = explainSqlPlanAsNativeQueries(plannerContext, relRoot, druidRel);
}
catch (Exception ex) {
log.warn(ex, "Unable to translate to a native Druid query. Resorting to legacy Druid explain plan.");
@ -453,7 +453,7 @@ public abstract class QueryHandler extends SqlStatementHandler.BaseStatementHand
* @return A string representing an array of native queries that correspond to the given SQL query, in JSON format
* @throws JsonProcessingException
*/
private String explainSqlPlanAsNativeQueries(final RelRoot relRoot, DruidRel<?> rel) throws JsonProcessingException
private String explainSqlPlanAsNativeQueries(PlannerContext plannerContext, final RelRoot relRoot, DruidRel<?> rel) throws JsonProcessingException
{
ObjectMapper jsonMapper = handlerContext.jsonMapper();
List<DruidQuery> druidQueryList;
@ -470,6 +470,9 @@ public abstract class QueryHandler extends SqlStatementHandler.BaseStatementHand
for (DruidQuery druidQuery : druidQueryList) {
Query<?> nativeQuery = druidQuery.getQuery();
plannerContext.dispatchHook(DruidHook.NATIVE_PLAN, nativeQuery);
ObjectNode objectNode = jsonMapper.createObjectNode();
objectNode.set("query", jsonMapper.convertValue(nativeQuery, ObjectNode.class));
objectNode.set("signature", jsonMapper.convertValue(druidQuery.getOutputRowSignature(), ArrayNode.class));
@ -582,6 +585,11 @@ public abstract class QueryHandler extends SqlStatementHandler.BaseStatementHand
DruidQuery finalBaseQuery = baseQuery;
final Supplier<QueryResponse<Object[]>> resultsSupplier = () -> plannerContext.getQueryMaker().runQuery(finalBaseQuery);
if (explain != null) {
plannerContext.dispatchHook(DruidHook.NATIVE_PLAN, finalBaseQuery.getQuery());
return planExplanation(possiblyLimitedRoot, newRoot, true);
}
return new PlannerResult(resultsSupplier, finalBaseQuery.getOutputRowType());
} else {
final DruidRel<?> druidRel = (DruidRel<?>) planner.transform(

View File

@ -51,6 +51,7 @@ import org.apache.druid.sql.calcite.planner.PlannerConfig;
import org.apache.druid.sql.calcite.planner.PlannerContext;
import org.apache.druid.sql.calcite.rel.CannotBuildQueryException;
import org.apache.druid.sql.calcite.rel.DruidQuery;
import org.apache.druid.sql.hook.DruidHook;
import org.joda.time.Interval;
import java.util.ArrayList;
@ -171,6 +172,7 @@ public class NativeQueryMaker implements QueryMaker
)
{
Hook.QUERY_PLAN.run(query);
plannerContext.dispatchHook(DruidHook.NATIVE_PLAN, query);
if (query.getId() == null) {
final String queryId = UUID.randomUUID().toString();

View File

@ -21,6 +21,7 @@ package org.apache.druid.sql.hook;
import com.google.errorprone.annotations.Immutable;
import org.apache.calcite.rel.RelNode;
import org.apache.druid.query.Query;
import java.util.Objects;
/**
@ -36,6 +37,8 @@ public interface DruidHook<T>
HookKey<RelNode> DRUID_PLAN = new HookKey<>("druidPlan", RelNode.class);
HookKey<String> SQL = new HookKey<>("sql", String.class);
HookKey<String> MSQ_PLAN = new HookKey<>("msqPlan", String.class);
@SuppressWarnings("rawtypes")
HookKey<Query> NATIVE_PLAN = new HookKey<>("nativePlan", Query.class);
@Immutable
class HookKey<T>

View File

@ -23,6 +23,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.inject.Binder;
import com.google.inject.Injector;
import com.google.inject.Provides;
import com.google.inject.name.Named;
import com.google.inject.name.Names;
import org.apache.calcite.avatica.server.AbstractAvaticaHandler;
import org.apache.druid.guice.DruidInjectorBuilder;
@ -120,9 +121,13 @@ public class DruidAvaticaTestDriver implements Driver
@Provides
@LazySingleton
public DruidConnectionExtras getConnectionExtras(ObjectMapper objectMapper, DruidHookDispatcher druidHookDispatcher)
public DruidConnectionExtras getConnectionExtras(
ObjectMapper objectMapper,
DruidHookDispatcher druidHookDispatcher,
@Named("isExplainSupported") Boolean isExplainSupported
)
{
return new DruidConnectionExtras.DruidConnectionExtrasImpl(objectMapper, druidHookDispatcher);
return new DruidConnectionExtras.DruidConnectionExtrasImpl(objectMapper, druidHookDispatcher, isExplainSupported);
}
@Provides
@ -281,6 +286,12 @@ public class DruidAvaticaTestDriver implements Driver
{
return delegate.getPlannerComponentSupplier();
}
@Override
public Boolean isExplainSupported()
{
// Forward the capability flag to the delegate component supplier.
return delegate.isExplainSupported();
}
}
protected File createTempFolder(String prefix)

View File

@ -30,15 +30,19 @@ public interface DruidConnectionExtras
DruidHookDispatcher getDruidHookDispatcher();
boolean isExplainSupported();
class DruidConnectionExtrasImpl implements DruidConnectionExtras
{
private final ObjectMapper objectMapper;
private final DruidHookDispatcher druidHookDispatcher;
private final boolean isExplainSupported;
/**
 * @param isExplainSupported whether the engine backing this connection can execute EXPLAIN
 */
public DruidConnectionExtrasImpl(ObjectMapper objectMapper, DruidHookDispatcher druidHookDispatcher, boolean isExplainSupported)
{
  this.objectMapper = objectMapper;
  this.druidHookDispatcher = druidHookDispatcher;
  this.isExplainSupported = isExplainSupported;
}
@Override
@ -52,6 +56,12 @@ public interface DruidConnectionExtras
{
return druidHookDispatcher;
}
@Override
public boolean isExplainSupported()
{
// Value fixed at construction time in DruidConnectionExtrasImpl's constructor.
return isExplainSupported;
}
}
static DruidConnectionExtras unwrapOrThrow(Connection connection)
@ -61,4 +71,5 @@ public interface DruidConnectionExtras
}
throw new UnsupportedOperationException("Expected DruidConnectionExtras to be implemented by connection!");
}
}

View File

@ -34,7 +34,6 @@ import org.apache.calcite.util.Util;
import org.apache.druid.query.Query;
import org.apache.druid.sql.calcite.BaseCalciteQueryTest;
import org.apache.druid.sql.calcite.rel.DruidRel;
import org.apache.druid.sql.calcite.util.QueryLogHook;
import org.apache.druid.sql.hook.DruidHook;
import org.apache.druid.sql.hook.DruidHook.HookKey;
import org.apache.druid.sql.hook.DruidHookDispatcher;
@ -45,11 +44,9 @@ import java.sql.ResultSet;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
public class DruidQuidemCommandHandler implements CommandHandler
{
@Override
public Command parseCommand(List<String> lines, List<String> content, String line)
{
@ -83,33 +80,63 @@ public class DruidQuidemCommandHandler implements CommandHandler
}
@Override
public final String describe(Context x)
public final String describe(Context context)
{
return commandName() + " [sql: " + x.previousSqlCommand().sql + "]";
return commandName() + " [sql: " + context.previousSqlCommand().sql + "]";
}
@Override
public final void execute(Context x, boolean execute)
public final void execute(Context context, boolean execute)
{
if (execute) {
try {
executeExplain(x);
executeExplain(context);
}
catch (Exception e) {
throw new Error(e);
}
} else {
x.echo(content);
context.echo(content);
}
x.echo(lines);
context.echo(lines);
}
protected final void executeQuery(Context x)
protected final <T> List<T> executeExplainCollectHookValues(Context context, HookKey<T> hook) throws IOException
{
DruidHookDispatcher dhp = unwrapDruidHookDispatcher(context);
List<T> logged = new ArrayList<>();
try (Closeable unhook = dhp.withHook(hook, (key, value) -> {
logged.add(value);
})) {
executeExplainQuery(context);
}
return logged;
}
/** Executes the previous SQL command as-is. */
protected final void executeQuery(Context context)
{
  executeQuery(context, context.previousSqlCommand().sql);
}
/**
 * Runs the previous SQL command as an EXPLAIN when the engine supports it;
 * otherwise falls back to executing the query itself.
 */
protected final void executeExplainQuery(Context context)
{
  boolean isExplainSupported = DruidConnectionExtras.unwrapOrThrow(context.connection()).isExplainSupported();
  final SqlCommand sqlCommand = context.previousSqlCommand();
  String sql = isExplainSupported ? "explain plan for " + sqlCommand.sql : sqlCommand.sql;
  executeQuery(context, sql);
}
protected final void executeQuery(Context context, String sql)
{
final SqlCommand sqlCommand = x.previousSqlCommand();
try (
final Statement statement = x.connection().createStatement();
final ResultSet resultSet = statement.executeQuery(sqlCommand.sql)) {
final Statement statement = context.connection().createStatement();
final ResultSet resultSet = statement.executeQuery(sql)) {
// throw away all results
while (resultSet.next()) {
Util.discard(false);
@ -120,12 +147,12 @@ public class DruidQuidemCommandHandler implements CommandHandler
}
}
protected final DruidHookDispatcher unwrapDruidHookDispatcher(Context x)
protected final DruidHookDispatcher unwrapDruidHookDispatcher(Context context)
{
return DruidConnectionExtras.unwrapOrThrow(x.connection()).getDruidHookDispatcher();
return DruidConnectionExtras.unwrapOrThrow(context.connection()).getDruidHookDispatcher();
}
protected abstract void executeExplain(Context x) throws Exception;
protected abstract void executeExplain(Context context) throws Exception;
}
/** Command that prints the plan for the current query. */
@ -137,27 +164,18 @@ public class DruidQuidemCommandHandler implements CommandHandler
}
@Override
protected void executeExplain(Context x) throws Exception
@SuppressWarnings("rawtypes")
protected void executeExplain(Context context) throws Exception
{
DruidConnectionExtras connectionExtras = (DruidConnectionExtras) x.connection();
DruidConnectionExtras connectionExtras = DruidConnectionExtras.unwrapOrThrow(context.connection());
ObjectMapper objectMapper = connectionExtras.getObjectMapper();
QueryLogHook qlh = new QueryLogHook(objectMapper);
qlh.logQueriesForGlobal(
() -> {
executeQuery(x);
}
);
List<Query<?>> queries = qlh.getRecordedQueries();
List<Query> logged = executeExplainCollectHookValues(context, DruidHook.NATIVE_PLAN);
queries = queries
.stream()
.map(q -> BaseCalciteQueryTest.recursivelyClearContext(q, objectMapper))
.collect(Collectors.toList());
for (Query<?> query : queries) {
for (Query<?> query : logged) {
query = BaseCalciteQueryTest.recursivelyClearContext(query, objectMapper);
String str = objectMapper.writerWithDefaultPrettyPrinter().writeValueAsString(query);
x.echo(ImmutableList.of(str));
context.echo(ImmutableList.of(str));
}
}
}
@ -176,22 +194,16 @@ public class DruidQuidemCommandHandler implements CommandHandler
}
@Override
protected final void executeExplain(Context x) throws IOException
protected final void executeExplain(Context context) throws IOException
{
DruidHookDispatcher dhp = unwrapDruidHookDispatcher(x);
List<RelNode> logged = new ArrayList<>();
try (Closeable unhook = dhp.withHook(hook, (key, relNode) -> {
logged.add(relNode);
})) {
executeQuery(x);
}
List<RelNode> logged = executeExplainCollectHookValues(context, hook);
for (RelNode node : logged) {
if (node instanceof DruidRel<?>) {
node = ((DruidRel) node).unwrapLogicalPlan();
node = ((DruidRel<?>) node).unwrapLogicalPlan();
}
String str = RelOptUtil.dumpPlan("", node, SqlExplainFormat.TEXT, SqlExplainLevel.EXPPLAN_ATTRIBUTES);
x.echo(ImmutableList.of(str));
context.echo(ImmutableList.of(str));
}
}
}
@ -210,17 +222,10 @@ public class DruidQuidemCommandHandler implements CommandHandler
}
@Override
protected final void executeExplain(Context x) throws IOException
protected final void executeExplain(Context context) throws IOException
{
DruidHookDispatcher dhp = unwrapDruidHookDispatcher(x);
List<String> logged = new ArrayList<>();
try (Closeable unhook = dhp.withHook(hook, (key, relNode) -> {
logged.add(relNode);
})) {
executeQuery(x);
}
x.echo(logged);
List<String> logged = executeExplainCollectHookValues(context, hook);
context.echo(logged);
}
}

View File

@ -197,6 +197,13 @@ public class SqlTestFramework
{
configureGuice(injectorBuilder);
}
/**
 * Communicates whether EXPLAIN is supported by this engine.
 *
 * MSQ right now needs a full query run.
 */
Boolean isExplainSupported();
}
public interface PlannerComponentSupplier
@ -335,6 +342,12 @@ public class SqlTestFramework
{
tempDirProducer.close();
}
@Override
public Boolean isExplainSupported()
{
// Standard engines support EXPLAIN; MSQ-based suppliers override this to return false.
return true;
}
}
public static class StandardPlannerComponentSupplier implements PlannerComponentSupplier
@ -668,6 +681,13 @@ public class SqlTestFramework
{
return getTestConfig().getDruidTestURI();
}
@Provides
@Named("isExplainSupported")
public Boolean isExplainSupported()
{
// Expose the component supplier's capability flag for @Named injection.
return builder.componentSupplier.isExplainSupported();
}
}
public static final DruidViewMacroFactory DRUID_VIEW_MACRO_FACTORY = new TestDruidViewMacroFactory();