This commit is contained in:
Zoltan Haindrich 2024-08-06 05:50:11 +00:00
parent c8f9147810
commit 12cfde805e
1 changed files with 23 additions and 4 deletions

View File

@ -22,9 +22,11 @@ package org.apache.druid.sql.avatica;
import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.inject.Inject; import com.google.inject.Inject;
import org.apache.calcite.avatica.Meta; import org.apache.calcite.avatica.Meta;
import org.apache.calcite.rel.type.RelDataType; import org.apache.calcite.rel.type.RelDataType;
import org.apache.commons.lang3.RegExUtils;
import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.ISE;
import org.apache.druid.msq.guice.MultiStageQuery; import org.apache.druid.msq.guice.MultiStageQuery;
import org.apache.druid.msq.indexing.report.MSQResultsReport.ColumnAndType; import org.apache.druid.msq.indexing.report.MSQResultsReport.ColumnAndType;
@ -42,6 +44,9 @@ import org.apache.druid.sql.hook.DruidHookDispatcher;
import java.util.Collections; import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.regex.Pattern;
public class MSQDruidMeta extends DruidMeta public class MSQDruidMeta extends DruidMeta
{ {
@ -93,10 +98,8 @@ public class MSQDruidMeta extends DruidMeta
String str = objectMapper String str = objectMapper
.writerWithDefaultPrettyPrinter() .writerWithDefaultPrettyPrinter()
.writeValueAsString(payload.getStages()); .writeValueAsString(payload.getStages());
str = str.replaceAll(taskId, "<taskId>")
.replaceAll("\"startTime\" : .*", "\"startTime\" : __TIMESTAMP__") str = maskMSQPlan(str, taskId);
.replaceAll("\"duration\" : .*", "\"duration\" : __DURATION__")
.replaceAll("\"sqlQueryId\" : .*", "\"sqlQueryId\" : __SQL_QUERY_ID__");
hookDispatcher.dispatch(DruidHook.MSQ_PLAN, str); hookDispatcher.dispatch(DruidHook.MSQ_PLAN, str);
} }
@ -120,6 +123,22 @@ public class MSQDruidMeta extends DruidMeta
); );
} }
private static final Map<Pattern, String> REPLACEMENT_MAP = ImmutableMap.<Pattern, String>builder()
.put(Pattern.compile("\"startTime\" : .*"), "\"startTime\" : __TIMESTAMP__")
.put(Pattern.compile("\"duration\" : .*"), "\"duration\" : __DURATION__")
.put(Pattern.compile("\"sqlQueryId\" : .*"), "\"sqlQueryId\" : __SQL_QUERY_ID__")
.build();
private String maskMSQPlan(String str, String taskId)
{
Pattern taskIdPattern = Pattern.compile(Pattern.quote(taskId));
str = RegExUtils.replaceAll(str, taskIdPattern, "<taskId>");
for (Entry<Pattern, String> entry : REPLACEMENT_MAP.entrySet()) {
str = RegExUtils.replaceAll(str, entry.getKey(), entry.getValue());
}
return str;
}
private Signature makeSignature(AbstractDruidJdbcStatement druidStatement, List<ColumnAndType> cat) private Signature makeSignature(AbstractDruidJdbcStatement druidStatement, List<ColumnAndType> cat)
{ {
RowSignature sig = ColumnAndType.toRowSignature(cat); RowSignature sig = ColumnAndType.toRowSignature(cat);