Fix the error case when there are multi top level unions (#12017)

This is a follow-up to PR #11908. It fixes the bug in top-level UNION ALL queries when more than two SQL subqueries are present.
This commit is contained in:
Laksh Singla 2021-12-07 01:12:02 +05:30 committed by GitHub
parent 590cf993c0
commit 44b2fb71ab
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 102 additions and 21 deletions

View File

@ -66,7 +66,6 @@ import org.apache.calcite.tools.Planner;
import org.apache.calcite.tools.RelConversionException;
import org.apache.calcite.tools.ValidationException;
import org.apache.calcite.util.Pair;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.guava.BaseSequence;
import org.apache.druid.java.util.common.guava.Sequence;
@ -399,8 +398,13 @@ public class DruidPlanner implements Closeable
// Show the native queries instead of Calcite's explain if the legacy flag is turned off
if (plannerContext.getPlannerConfig().isUseNativeQueryExplain()) {
DruidRel<?> druidRel = (DruidRel<?>) rel;
try {
explanation = explainSqlPlanAsNativeQueries(druidRel);
}
catch (Exception ex) {
log.warn(ex, "Unable to translate to a native Druid query. Resorting to legacy Druid explain plan");
}
}
}
final Set<Resource> resources =
plannerContext.getResourceActions().stream().map(ResourceAction::getResource).collect(Collectors.toSet());
@ -411,10 +415,6 @@ public class DruidPlanner implements Closeable
log.error(jpe, "Encountered exception while serializing Resources for explain output");
resourcesString = null;
}
catch (ISE ise) {
log.error(ise, "Unable to translate to a native Druid query. Resorting to legacy Druid explain plan");
resourcesString = null;
}
final Supplier<Sequence<Object[]>> resultsSupplier = Suppliers.ofInstance(
Sequences.simple(ImmutableList.of(new Object[]{explanation, resourcesString})));
return new PlannerResult(resultsSupplier, getExplainStructType(rel.getCluster().getTypeFactory()));
@ -431,25 +431,17 @@ public class DruidPlanner implements Closeable
*/
private String explainSqlPlanAsNativeQueries(DruidRel<?> rel) throws JsonProcessingException
{
// Only if rel is an instance of DruidUnionRel, do we run multiple native queries corresponding to single SQL query
// Also, DruidUnionRel can only be a top level node, so we don't need to check for this condition in the subsequent
// child nodes
ObjectMapper jsonMapper = plannerContext.getJsonMapper();
List<DruidQuery> druidQueryList;
if (rel instanceof DruidUnionRel) {
druidQueryList = rel.getInputs().stream().map(childRel -> (DruidRel<?>) childRel).map(childRel -> {
if (childRel instanceof DruidUnionRel) {
log.error("DruidUnionRel can only be the outermost RelNode. This error shouldn't be encountered");
throw new ISE("DruidUnionRel is only supported at the outermost RelNode.");
}
return childRel.toDruidQuery(false);
}).collect(Collectors.toList());
} else {
druidQueryList = ImmutableList.of(rel.toDruidQuery(false));
}
druidQueryList = flattenOutermostRel(rel)
.stream()
.map(druidRel -> druidRel.toDruidQuery(false))
.collect(Collectors.toList());
// Putting the queries as object node in an ArrayNode, since directly returning a list causes issues when
// serializing the "queryType"
// serializing the "queryType". Another method would be to create a POJO containing query and signature, and then
// serializing it using normal list method.
ArrayNode nativeQueriesArrayNode = jsonMapper.createArrayNode();
for (DruidQuery druidQuery : druidQueryList) {
@ -463,6 +455,47 @@ public class DruidPlanner implements Closeable
return jsonMapper.writeValueAsString(nativeQueriesArrayNode);
}
/**
* Given a {@link DruidRel}, this method recursively flattens the Rels if they are of the type {@link DruidUnionRel}
* It is implicitly assumed that the {@link DruidUnionRel} can never be the child of a non {@link DruidUnionRel}
* node
* For eg, a DruidRel structure of kind:
* DruidUnionRel
* DruidUnionRel
* DruidRel (A)
* DruidRel (B)
* DruidRel(C)
* will return [DruidRel(A), DruidRel(B), DruidRel(C)]
* @param outermostDruidRel The outermost rel which is to be flattened
* @return a list of DruidRel's which donot have a DruidUnionRel nested in between them
*/
private List<DruidRel<?>> flattenOutermostRel(DruidRel<?> outermostDruidRel)
{
List<DruidRel<?>> druidRels = new ArrayList<>();
flattenOutermostRel(outermostDruidRel, druidRels);
return druidRels;
}
/**
* Recursive function (DFS) which traverses the nodes and collects the corresponding {@link DruidRel} into a list if
* they are not of the type {@link DruidUnionRel} or else calls the method with the child nodes. The DFS order of the
* nodes are retained, since that is the order in which they will actually be called in {@link DruidUnionRel#runQuery()}
* @param druidRel The current relNode
* @param flattendListAccumulator Accumulator list which needs to be appended by this method
*/
private void flattenOutermostRel(DruidRel<?> druidRel, List<DruidRel<?>> flattendListAccumulator)
{
if (druidRel instanceof DruidUnionRel) {
DruidUnionRel druidUnionRel = (DruidUnionRel) druidRel;
druidUnionRel.getInputs().forEach(innerRelNode -> {
DruidRel<?> innerDruidRelNode = (DruidRel<?>) innerRelNode; // This type conversion should always be possible
flattenOutermostRel(innerDruidRelNode, flattendListAccumulator);
});
} else {
flattendListAccumulator.add(druidRel);
}
}
/**
* This method wraps the root with a {@link LogicalSort} that applies a limit (no ordering change). If the outer rel
* is already a {@link Sort}, we can merge our outerLimit into it, similar to what is going on in

View File

@ -133,6 +133,7 @@ import java.util.stream.Collectors;
public class CalciteQueryTest extends BaseCalciteQueryTest
{
@Test
public void testGroupByWithPostAggregatorReferencingTimeFloorColumnOnTimeseries() throws Exception
{
@ -6876,6 +6877,53 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
);
}
@Test
public void testExplainMultipleTopLevelUnionAllQueries() throws Exception
{
// Skip vectorization since otherwise the "context" will change for each subtest.
skipVectorize();
final String query = "EXPLAIN PLAN FOR SELECT dim1 FROM druid.foo\n"
+ "UNION ALL (SELECT dim1 FROM druid.foo WHERE dim1 = '42'\n"
+ "UNION ALL SELECT dim1 FROM druid.foo WHERE dim1 = '44')";
final String legacyExplanation = "DruidUnionRel(limit=[-1])\n"
+ " DruidQueryRel(query=[{\"queryType\":\"scan\",\"dataSource\":{\"type\":\"table\",\"name\":\"foo\"},\"intervals\":{\"type\":\"intervals\",\"intervals\":[\"-146136543-09-08T08:23:32.096Z/146140482-04-24T15:36:27.903Z\"]},\"virtualColumns\":[],\"resultFormat\":\"compactedList\",\"batchSize\":20480,\"filter\":null,\"columns\":[\"dim1\"],\"legacy\":false,\"context\":{\"defaultTimeout\":300000,\"maxScatterGatherBytes\":9223372036854775807,\"sqlCurrentTimestamp\":\"2000-01-01T00:00:00Z\",\"sqlQueryId\":\"dummy\",\"vectorize\":\"false\",\"vectorizeVirtualColumns\":\"false\"},\"descending\":false,\"granularity\":{\"type\":\"all\"}}], signature=[{dim1:STRING}])\n"
+ " DruidUnionRel(limit=[-1])\n"
+ " DruidQueryRel(query=[{\"queryType\":\"scan\",\"dataSource\":{\"type\":\"table\",\"name\":\"foo\"},\"intervals\":{\"type\":\"intervals\",\"intervals\":[\"-146136543-09-08T08:23:32.096Z/146140482-04-24T15:36:27.903Z\"]},\"virtualColumns\":[],\"resultFormat\":\"compactedList\",\"batchSize\":20480,\"filter\":{\"type\":\"selector\",\"dimension\":\"dim1\",\"value\":\"42\",\"extractionFn\":null},\"columns\":[\"dim1\"],\"legacy\":false,\"context\":{\"defaultTimeout\":300000,\"maxScatterGatherBytes\":9223372036854775807,\"sqlCurrentTimestamp\":\"2000-01-01T00:00:00Z\",\"sqlQueryId\":\"dummy\",\"vectorize\":\"false\",\"vectorizeVirtualColumns\":\"false\"},\"descending\":false,\"granularity\":{\"type\":\"all\"}}], signature=[{dim1:STRING}])\n"
+ " DruidQueryRel(query=[{\"queryType\":\"scan\",\"dataSource\":{\"type\":\"table\",\"name\":\"foo\"},\"intervals\":{\"type\":\"intervals\",\"intervals\":[\"-146136543-09-08T08:23:32.096Z/146140482-04-24T15:36:27.903Z\"]},\"virtualColumns\":[],\"resultFormat\":\"compactedList\",\"batchSize\":20480,\"filter\":{\"type\":\"selector\",\"dimension\":\"dim1\",\"value\":\"44\",\"extractionFn\":null},\"columns\":[\"dim1\"],\"legacy\":false,\"context\":{\"defaultTimeout\":300000,\"maxScatterGatherBytes\":9223372036854775807,\"sqlCurrentTimestamp\":\"2000-01-01T00:00:00Z\",\"sqlQueryId\":\"dummy\",\"vectorize\":\"false\",\"vectorizeVirtualColumns\":\"false\"},\"descending\":false,\"granularity\":{\"type\":\"all\"}}], signature=[{dim1:STRING}])\n";
final String explanation = "["
+ "{"
+ "\"query\":{\"queryType\":\"scan\",\"dataSource\":{\"type\":\"table\",\"name\":\"foo\"},\"intervals\":{\"type\":\"intervals\",\"intervals\":[\"-146136543-09-08T08:23:32.096Z/146140482-04-24T15:36:27.903Z\"]},\"virtualColumns\":[],\"resultFormat\":\"compactedList\",\"batchSize\":20480,\"filter\":null,\"columns\":[\"dim1\"],\"legacy\":false,\"context\":{\"defaultTimeout\":300000,\"maxScatterGatherBytes\":9223372036854775807,\"sqlCurrentTimestamp\":\"2000-01-01T00:00:00Z\",\"sqlQueryId\":\"dummy\",\"vectorize\":\"false\",\"vectorizeVirtualColumns\":\"false\"},\"descending\":false,\"granularity\":{\"type\":\"all\"}},"
+ "\"signature\":[{\"name\":\"dim1\",\"type\":\"STRING\"}]"
+ "},"
+ "{"
+ "\"query\":{\"queryType\":\"scan\",\"dataSource\":{\"type\":\"table\",\"name\":\"foo\"},\"intervals\":{\"type\":\"intervals\",\"intervals\":[\"-146136543-09-08T08:23:32.096Z/146140482-04-24T15:36:27.903Z\"]},\"virtualColumns\":[],\"resultFormat\":\"compactedList\",\"batchSize\":20480,\"filter\":{\"type\":\"selector\",\"dimension\":\"dim1\",\"value\":\"42\",\"extractionFn\":null},\"columns\":[\"dim1\"],\"legacy\":false,\"context\":{\"defaultTimeout\":300000,\"maxScatterGatherBytes\":9223372036854775807,\"sqlCurrentTimestamp\":\"2000-01-01T00:00:00Z\",\"sqlQueryId\":\"dummy\",\"vectorize\":\"false\",\"vectorizeVirtualColumns\":\"false\"},\"descending\":false,\"granularity\":{\"type\":\"all\"}},"
+ "\"signature\":[{\"name\":\"dim1\",\"type\":\"STRING\"}]"
+ "},"
+ "{"
+ "\"query\":{\"queryType\":\"scan\",\"dataSource\":{\"type\":\"table\",\"name\":\"foo\"},\"intervals\":{\"type\":\"intervals\",\"intervals\":[\"-146136543-09-08T08:23:32.096Z/146140482-04-24T15:36:27.903Z\"]},\"virtualColumns\":[],\"resultFormat\":\"compactedList\",\"batchSize\":20480,\"filter\":{\"type\":\"selector\",\"dimension\":\"dim1\",\"value\":\"44\",\"extractionFn\":null},\"columns\":[\"dim1\"],\"legacy\":false,\"context\":{\"defaultTimeout\":300000,\"maxScatterGatherBytes\":9223372036854775807,\"sqlCurrentTimestamp\":\"2000-01-01T00:00:00Z\",\"sqlQueryId\":\"dummy\",\"vectorize\":\"false\",\"vectorizeVirtualColumns\":\"false\"},\"descending\":false,\"granularity\":{\"type\":\"all\"}},"
+ "\"signature\":[{\"name\":\"dim1\",\"type\":\"STRING\"}]"
+ "}]";
final String resources = "[{\"name\":\"foo\",\"type\":\"DATASOURCE\"}]";
testQuery(
query,
ImmutableList.of(),
ImmutableList.of(
new Object[]{legacyExplanation, resources}
)
);
testQuery(
PLANNER_CONFIG_NATIVE_QUERY_EXPLAIN,
query,
CalciteTests.REGULAR_USER_AUTH_RESULT,
ImmutableList.of(),
ImmutableList.of(
new Object[]{explanation, resources}
)
);
}
@Test
public void testExactCountDistinctUsingSubqueryWithWherePushDown() throws Exception
{