wire-in hookdispatcher thru connection/etc

This commit is contained in:
Zoltan Haindrich 2024-07-30 12:29:36 +00:00
parent 78b75d3e8e
commit 9ac26e3a89
9 changed files with 58 additions and 18 deletions

View File

@ -22,6 +22,7 @@ package org.apache.druid.quidem;
import com.google.inject.Inject;
import org.apache.druid.guice.LazySingleton;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.sql.hook.DruidHookDispatcher;
import javax.inject.Named;
import javax.ws.rs.GET;
@ -40,11 +41,13 @@ public class QuidemCaptureResource
.getPathFromProjectRoot("quidem-it/src/test/quidem/org.apache.druid.quidem.QTest");
private URI quidemURI;
private QuidemRecorder recorder = null;
private DruidHookDispatcher hookDispatcher;
@Inject
public QuidemCaptureResource(@Named("quidem") URI quidemURI)
public QuidemCaptureResource(@Named("quidem") URI quidemURI, DruidHookDispatcher hookDispatcher)
{
this.quidemURI = quidemURI;
this.hookDispatcher = hookDispatcher;
}
@GET
@ -55,6 +58,7 @@ public class QuidemCaptureResource
stopIfRunning();
recorder = new QuidemRecorder(
quidemURI,
hookDispatcher,
genRecordFilePath()
);
return recorder.toString();

View File

@ -20,6 +20,7 @@
package org.apache.druid.quidem;
import org.apache.druid.sql.hook.DruidHook;
import org.apache.druid.sql.hook.DruidHookDispatcher;
import java.io.File;
import java.io.FileNotFoundException;
@ -34,9 +35,11 @@ public class QuidemRecorder implements AutoCloseable, DruidHook<String>
{
private PrintStream printStream;
private File file;
private DruidHookDispatcher hookDispatcher;
public QuidemRecorder(URI quidemURI, File file)
public QuidemRecorder(URI quidemURI, DruidHookDispatcher hookDispatcher, File file)
{
this.hookDispatcher = hookDispatcher;
this.file = file;
try {
this.printStream = new PrintStream(new FileOutputStream(file), true, StandardCharsets.UTF_8.name());
@ -47,7 +50,7 @@ public class QuidemRecorder implements AutoCloseable, DruidHook<String>
printStream.println("#started " + new Date());
printStream.println("!use " + quidemURI);
printStream.println("!set outputformat mysql");
DruidHook.register(DruidHook.SQL, this);
hookDispatcher.register(DruidHook.SQL, this);
}
@Override
@ -57,7 +60,7 @@ public class QuidemRecorder implements AutoCloseable, DruidHook<String>
printStream.close();
printStream = null;
}
DruidHook.unregister(DruidHook.SQL, this);
hookDispatcher.unregister(DruidHook.SQL, this);
}
@Override

View File

@ -395,7 +395,8 @@ public class CalciteRulesManager
public RelNode run(RelOptPlanner planner, RelNode rel, RelTraitSet requiredOutputTraits,
List<RelOptMaterialization> materializations, List<RelOptLattice> lattices)
{
DruidHook.dispatch(DruidHook.LOGICAL_PLAN, rel);
PlannerContext pctx = planner.getContext().unwrapOrThrow(PlannerContext.class);
pctx.dispatchHook(DruidHook.LOGICAL_PLAN, rel);
return rel;
}
}

View File

@ -155,7 +155,7 @@ public abstract class QueryHandler extends SqlStatementHandler.BaseStatementHand
isPrepared = true;
SqlNode validatedQueryNode = validatedQueryNode();
rootQueryRel = handlerContext.planner().rel(validatedQueryNode);
DruidHook.dispatch(DruidHook.CONVERTED_PLAN, rootQueryRel.rel);
handlerContext.plannerContext().dispatchHook(DruidHook.CONVERTED_PLAN, rootQueryRel.rel);
handlerContext.hook().captureQueryRel(rootQueryRel);
final RelDataTypeFactory typeFactory = rootQueryRel.rel.getCluster().getTypeFactory();
final SqlValidator validator = handlerContext.planner().getValidator();
@ -592,7 +592,7 @@ public abstract class QueryHandler extends SqlStatementHandler.BaseStatementHand
handlerContext.hook().captureDruidRel(druidRel);
DruidHook.dispatch(DruidHook.DRUID_PLAN, druidRel);
plannerContext.dispatchHook(DruidHook.DRUID_PLAN, druidRel);
if (explain != null) {
return planExplanation(possiblyLimitedRoot, druidRel, true);

View File

@ -78,31 +78,31 @@ public interface DruidHook<T>
@SuppressFBWarnings({"MS_OOI_PKGPROTECT"})
Map<HookKey<?>, List<DruidHook<?>>> GLOBAL = new HashMap<>();
static void register(HookKey<?> label, DruidHook<?> hook)
static void register1(HookKey<?> label, DruidHook<?> hook)
{
GLOBAL.computeIfAbsent(label, k -> new ArrayList<>()).add(hook);
}
static void unregister(HookKey<?> key, DruidHook<?> hook)
static void unregister1(HookKey<?> key, DruidHook<?> hook)
{
GLOBAL.get(key).remove(hook);
}
static <T> Closeable withHook(HookKey<T> key, DruidHook<T> hook)
static <T> Closeable withHook1(HookKey<T> key, DruidHook<T> hook)
{
register(key, hook);
register1(key, hook);
return new Closeable()
{
@Override
public void close()
{
unregister(key, hook);
unregister1(key, hook);
}
};
}
@SuppressWarnings({"rawtypes", "unchecked"})
static <T> void dispatch(HookKey<T> key, T object)
static <T> void dispatch12(HookKey<T> key, T object)
{
List<DruidHook<?>> hooks = GLOBAL.get(key);
if (hooks != null) {

View File

@ -20,6 +20,7 @@
package org.apache.druid.sql.hook;
import com.google.inject.Inject;
import org.apache.druid.guice.LazySingleton;
import org.apache.druid.sql.hook.DruidHook.HookKey;
import java.io.Closeable;
@ -28,6 +29,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
@LazySingleton
public class DruidHookDispatcher
{
@Inject

View File

@ -70,6 +70,7 @@ import org.apache.druid.sql.calcite.util.SqlTestFramework.Builder;
import org.apache.druid.sql.calcite.util.SqlTestFramework.PlannerComponentSupplier;
import org.apache.druid.sql.calcite.util.SqlTestFramework.QueryComponentSupplier;
import org.apache.druid.sql.guice.SqlModule;
import org.apache.druid.sql.hook.DruidHookDispatcher;
import org.apache.http.client.utils.URIBuilder;
import org.eclipse.jetty.server.Server;
@ -138,9 +139,9 @@ public class DruidAvaticaTestDriver implements Driver
@Provides
@LazySingleton
public DruidConnectionExtras getConnectionExtras(ObjectMapper objectMapper)
public DruidConnectionExtras getConnectionExtras(ObjectMapper objectMapper, DruidHookDispatcher druidHookDispatcher)
{
return new DruidConnectionExtras.DruidConnectionExtrasImpl(objectMapper);
return new DruidConnectionExtras.DruidConnectionExtrasImpl(objectMapper, druidHookDispatcher);
}
@Provides

View File

@ -20,18 +20,25 @@
package org.apache.druid.quidem;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.druid.sql.hook.DruidHookDispatcher;
import java.sql.Connection;
public interface DruidConnectionExtras
{
ObjectMapper getObjectMapper();
DruidHookDispatcher getDruidHookDispatcher();
class DruidConnectionExtrasImpl implements DruidConnectionExtras
{
private final ObjectMapper objectMapper;
private final DruidHookDispatcher druidHookDispatcher;
public DruidConnectionExtrasImpl(ObjectMapper objectMapper)
public DruidConnectionExtrasImpl(ObjectMapper objectMapper, DruidHookDispatcher druidHookDispatcher)
{
this.objectMapper = objectMapper;
this.druidHookDispatcher = druidHookDispatcher;
}
@Override
@ -39,5 +46,20 @@ public interface DruidConnectionExtras
{
return objectMapper;
}
@Override
public DruidHookDispatcher getDruidHookDispatcher()
{
return druidHookDispatcher;
}
}
static DruidConnectionExtras unwrapOrThrow(Connection connection)
{
if(connection instanceof DruidConnectionExtras ) {
return (DruidConnectionExtras) connection;
}
throw new UnsupportedOperationException("Expected DruidConnectionExtras to be implemented by connection!");
}
}

View File

@ -37,10 +37,12 @@ import org.apache.druid.sql.calcite.rel.DruidRel;
import org.apache.druid.sql.calcite.util.QueryLogHook;
import org.apache.druid.sql.hook.DruidHook;
import org.apache.druid.sql.hook.DruidHook.HookKey;
import org.apache.druid.sql.hook.DruidHookDispatcher;
import java.io.Closeable;
import java.io.IOException;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.List;
@ -167,10 +169,11 @@ public class DruidQuidemCommandHandler implements CommandHandler
}
@Override
protected final void executeExplain(Context x) throws IOException
protected final void executeExplain(Context x) throws IOException, SQLException
{
DruidHookDispatcher dhp = unwrapDruidHookDispatcher(x);
List<RelNode> logged = new ArrayList<>();
try (Closeable unhook = DruidHook.withHook(hook, (key, relNode) -> {
try (Closeable unhook = dhp.withHook(hook, (key, relNode) -> {
logged.add(relNode);
})) {
executeQuery(x);
@ -184,6 +187,10 @@ public class DruidQuidemCommandHandler implements CommandHandler
x.echo(ImmutableList.of(str));
}
}
protected final DruidHookDispatcher unwrapDruidHookDispatcher(Context x) {
return DruidConnectionExtras.unwrapOrThrow(x.connection()).getDruidHookDispatcher();
}
}
static class LogicalPlanCommand extends AbstractRelPlanCommand