SQL version of unnest native druid function (#13576)

* adds the SQL component of Druid's native unnest functionality, allowing SQL queries to unnest a table dimension, a virtual column, or a constant array, and converting them into native Druid queries
* unnest in SQL is implemented as a combination of Correlate (the comma-join part) and Uncollect (the unnest part)
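As an illustrative sketch (condensed from the testUnnest case added in this commit; numfoo and dim3 come from the test fixtures, and querySegmentSpec/Filtration.eternity() are test helpers), a query such as

    SELECT d3 FROM druid.numfoo, UNNEST(MV_TO_ARRAY(dim3)) AS unnested (d3)

now plans into a native scan over an UnnestDataSource:

    Druids.newScanQueryBuilder()
          .dataSource(UnnestDataSource.create(
              new TableDataSource("numfoo"), // base datasource whose rows are unnested
              "dim3",                        // the multi-value dimension being unnested
              "EXPR$0",                      // output name of the unnested column
              null                           // no allow-list filtering of unnested values
          ))
          .intervals(querySegmentSpec(Filtration.eternity()))
          .resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST)
          .columns(ImmutableList.of("EXPR$0"))
          .build()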
Authored by somu-imply on 2023-01-23 12:53:31 -08:00; committed by GitHub
parent f76acccff2
commit 90d445536d
15 changed files with 1720 additions and 44 deletions

UnnestDataSource.java

@ -215,6 +215,18 @@ public class UnnestDataSource implements DataSource
{
return Objects.hash(base, column, outputName);
}
@Override
public String toString()
{
return "UnnestDataSource{" +
"base=" + base +
", column='" + column + '\'' +
", outputName='" + outputName + '\'' +
", allowList=" + allowList +
'}';
}
}

UnnestColumnValueSelectorCursor.java

@ -100,7 +100,31 @@ public class UnnestColumnValueSelectorCursor implements Cursor
if (!outputName.equals(dimensionSpec.getDimension())) {
return baseColumnSelectorFactory.makeDimensionSelector(dimensionSpec);
}
throw new UOE("Unsupported dimension selector while using column value selector for column [%s]", outputName);
// This is done to support virtual columns.
// In the future, a developer should move towards ensuring that
// for all dictionary-encoded cases we only get the dimension selector.
return new BaseSingleValueDimensionSelector()
{
final ColumnValueSelector colSelector = makeColumnValueSelector(dimensionSpec.getDimension());
@Nullable
@Override
protected String getValue()
{
final Object returnedObj = colSelector.getObject();
if (returnedObj == null) {
return null;
} else {
return String.valueOf(returnedObj);
}
}
@Override
public void inspectRuntimeShape(RuntimeShapeInspector inspector)
{
colSelector.inspectRuntimeShape(inspector);
}
};
}
@Override

UnnestDimensionCursor.java

@ -83,7 +83,7 @@ public class UnnestDimensionCursor implements Cursor
private final BitSet allowedBitSet;
private final ColumnSelectorFactory baseColumnSelectorFactory;
private int index;
private IndexedInts indexedIntsForCurrentRow;
@Nullable private IndexedInts indexedIntsForCurrentRow;
private boolean needInitialization;
private SingleIndexInts indexIntsForRow;
@ -181,7 +181,7 @@ public class UnnestDimensionCursor implements Cursor
@Override
public Object getObject()
{
if (indexedIntsForCurrentRow == null) {
if (indexedIntsForCurrentRow == null || indexedIntsForCurrentRow.size() == 0) {
return null;
}
if (allowedBitSet.isEmpty()) {
@ -319,6 +319,7 @@ public class UnnestDimensionCursor implements Cursor
* This would also create a bitset for dictionary-encoded columns to
* check for matching values specified in allowedList of UnnestDataSource.
*/
@Nullable
private void initialize()
{
IdLookup idLookup = dimSelector.idLookup();
@ -409,7 +410,12 @@ public class UnnestDimensionCursor implements Cursor
@Override
public int get(int idx)
{
return indexedIntsForCurrentRow.get(index);
// Read the value from the indexed ints
// only if they are non-null and hold at least 1 value
if (indexedIntsForCurrentRow != null && indexedIntsForCurrentRow.size() > 0) {
return indexedIntsForCurrentRow.get(index);
}
return 0;
}
}
}

UnnestColumnValueSelectorCursorTest.java

@ -19,7 +19,6 @@
package org.apache.druid.segment;
import org.apache.druid.java.util.common.UOE;
import org.apache.druid.query.dimension.DefaultDimensionSpec;
import org.apache.druid.testing.InitializedNullHandlingTest;
import org.junit.Assert;
@ -62,7 +61,7 @@ public class UnnestColumnValueSelectorCursorTest extends InitializedNullHandling
int j = 0;
while (!unnestCursor.isDone()) {
Object colSelectorVal = unnestColumnValueSelector.getObject();
Assert.assertEquals(colSelectorVal.toString(), String.valueOf(j));
Assert.assertEquals(String.valueOf(j), colSelectorVal.toString());
j++;
unnestCursor.advance();
}
@ -96,7 +95,7 @@ public class UnnestColumnValueSelectorCursorTest extends InitializedNullHandling
int k = 0;
while (!unnestCursor.isDone()) {
Object valueSelectorVal = unnestColumnValueSelector.getObject();
Assert.assertEquals(valueSelectorVal.toString(), expectedResults.get(k));
Assert.assertEquals(expectedResults.get(k), valueSelectorVal.toString());
k++;
unnestCursor.advance();
}
@ -112,8 +111,6 @@ public class UnnestColumnValueSelectorCursorTest extends InitializedNullHandling
Collections.singletonList(null)
);
List<String> expectedResults = Arrays.asList(null, null, null, null);
//Create base cursor
ListCursor listCursor = new ListCursor(inputList);
@ -168,9 +165,9 @@ public class UnnestColumnValueSelectorCursorTest extends InitializedNullHandling
while (!unnestCursor.isDone()) {
Object valueSelectorVal = unnestColumnValueSelector.getObject();
if (valueSelectorVal == null) {
Assert.assertEquals(null, expectedResults.get(k));
Assert.assertEquals(expectedResults.get(k), null);
} else {
Assert.assertEquals(valueSelectorVal.toString(), expectedResults.get(k));
Assert.assertEquals(expectedResults.get(k), valueSelectorVal.toString());
}
k++;
unnestCursor.advance();
@ -201,7 +198,7 @@ public class UnnestColumnValueSelectorCursorTest extends InitializedNullHandling
int k = 0;
while (!unnestCursor.isDone()) {
Object valueSelectorVal = unnestColumnValueSelector.getObject();
Assert.assertEquals(valueSelectorVal.toString(), expectedResults.get(k));
Assert.assertEquals(expectedResults.get(k), valueSelectorVal.toString());
k++;
unnestCursor.advance();
}
@ -231,7 +228,7 @@ public class UnnestColumnValueSelectorCursorTest extends InitializedNullHandling
int k = 0;
while (!unnestCursor.isDone()) {
Object valueSelectorVal = unnestColumnValueSelector.getObject();
Assert.assertEquals(valueSelectorVal.toString(), expectedResults.get(k));
Assert.assertEquals(expectedResults.get(k), valueSelectorVal.toString());
k++;
unnestCursor.advance();
}
@ -265,7 +262,7 @@ public class UnnestColumnValueSelectorCursorTest extends InitializedNullHandling
int k = 0;
while (!unnestCursor.isDone()) {
Object valueSelectorVal = unnestColumnValueSelector.getObject();
Assert.assertEquals(valueSelectorVal.toString(), expectedResults.get(k).toString());
Assert.assertEquals(expectedResults.get(k).toString(), valueSelectorVal.toString());
k++;
unnestCursor.advance();
}
@ -306,7 +303,7 @@ public class UnnestColumnValueSelectorCursorTest extends InitializedNullHandling
int k = 0;
while (!parentCursor.isDone()) {
Object valueSelectorVal = unnestColumnValueSelector.getObject();
Assert.assertEquals(valueSelectorVal.toString(), expectedResults.get(k).toString());
Assert.assertEquals(expectedResults.get(k).toString(), valueSelectorVal.toString());
k++;
parentCursor.advance();
}
@ -344,12 +341,12 @@ public class UnnestColumnValueSelectorCursorTest extends InitializedNullHandling
if (valueSelectorVal == null) {
Assert.assertEquals(null, expectedResults.get(k));
} else {
Assert.assertEquals(valueSelectorVal.toString(), expectedResults.get(k));
Assert.assertEquals(expectedResults.get(k), valueSelectorVal.toString());
}
k++;
unnestCursor.advance();
}
Assert.assertEquals(k, expectedResults.size());
Assert.assertEquals(expectedResults.size(), k);
}
@Test
@ -382,7 +379,7 @@ public class UnnestColumnValueSelectorCursorTest extends InitializedNullHandling
if (valueSelectorVal == null) {
Assert.assertEquals(null, expectedResults.get(k));
} else {
Assert.assertEquals(valueSelectorVal.toString(), expectedResults.get(k));
Assert.assertEquals(expectedResults.get(k), valueSelectorVal.toString());
}
k++;
unnestCursor.advance();
@ -420,7 +417,7 @@ public class UnnestColumnValueSelectorCursorTest extends InitializedNullHandling
if (valueSelectorVal == null) {
Assert.assertEquals(null, expectedResults.get(k));
} else {
Assert.assertEquals(valueSelectorVal.toString(), expectedResults.get(k));
Assert.assertEquals(expectedResults.get(k), valueSelectorVal.toString());
}
k++;
unnestCursor.advance();
@ -455,7 +452,7 @@ public class UnnestColumnValueSelectorCursorTest extends InitializedNullHandling
int k = 0;
while (!unnestCursor.isDone()) {
Double valueSelectorVal = unnestColumnValueSelector.getDouble();
Assert.assertEquals(valueSelectorVal, expectedResults.get(k));
Assert.assertEquals(expectedResults.get(k), valueSelectorVal);
k++;
unnestCursor.advance();
}
@ -489,7 +486,7 @@ public class UnnestColumnValueSelectorCursorTest extends InitializedNullHandling
int k = 0;
while (!unnestCursor.isDone()) {
Float valueSelectorVal = unnestColumnValueSelector.getFloat();
Assert.assertEquals(valueSelectorVal, expectedResults.get(k));
Assert.assertEquals(expectedResults.get(k), valueSelectorVal);
k++;
unnestCursor.advance();
}
@ -526,7 +523,7 @@ public class UnnestColumnValueSelectorCursorTest extends InitializedNullHandling
Object obj = unnestColumnValueSelector.getObject();
Assert.assertNotNull(obj);
Long valueSelectorVal = unnestColumnValueSelector.getLong();
Assert.assertEquals(valueSelectorVal, expectedResults.get(k));
Assert.assertEquals(expectedResults.get(k), valueSelectorVal);
k++;
unnestCursor.advance();
}
@ -561,7 +558,7 @@ public class UnnestColumnValueSelectorCursorTest extends InitializedNullHandling
int k = 0;
while (!unnestCursor.isDone()) {
Object valueSelectorVal = unnestColumnValueSelector.getObject();
Assert.assertEquals(valueSelectorVal.toString(), expectedResults.get(k).toString());
Assert.assertEquals(expectedResults.get(k).toString(), valueSelectorVal.toString());
k++;
unnestCursor.advance();
}
@ -570,17 +567,16 @@ public class UnnestColumnValueSelectorCursorTest extends InitializedNullHandling
Assert.assertFalse(unnestCursor.isDoneOrInterrupted());
}
@Test(expected = UOE.class)
@Test
public void test_list_unnest_cursors_dimSelector()
{
List<Object> inputList = Arrays.asList(
Arrays.asList("a", "b", "c"),
Arrays.asList("e", "f", "g", "h", "i"),
Collections.singletonList("j")
Collections.singletonList(null)
);
List<String> expectedResults = Arrays.asList("a", "b", "c", "e", "f", "g", "h", "i", "j");
List<Object> expectedResults = Arrays.asList("a", "b", "c", "e", "f", "g", "h", "i");
//Create base cursor
ListCursor listCursor = new ListCursor(inputList);
@ -592,7 +588,25 @@ public class UnnestColumnValueSelectorCursorTest extends InitializedNullHandling
OUTPUT_NAME,
IGNORE_SET
);
unnestCursor.getColumnSelectorFactory().makeDimensionSelector(DefaultDimensionSpec.of(OUTPUT_NAME));
// this now returns a dimension selector backed by a column value selector for this case
BaseSingleValueDimensionSelector unnestDimSelector = (BaseSingleValueDimensionSelector) unnestCursor.getColumnSelectorFactory()
.makeDimensionSelector(
DefaultDimensionSpec.of(
OUTPUT_NAME));
unnestDimSelector.inspectRuntimeShape(null);
int k = 0;
while (!unnestCursor.isDone()) {
if (k < 8) {
Assert.assertEquals(expectedResults.get(k).toString(), unnestDimSelector.getValue());
} else {
Assert.assertNull(unnestDimSelector.getValue());
}
k++;
unnestCursor.advance();
}
Assert.assertEquals(9, k);
unnestCursor.reset();
Assert.assertNotNull(unnestDimSelector);
}
@Test
@ -622,7 +636,7 @@ public class UnnestColumnValueSelectorCursorTest extends InitializedNullHandling
int k = 0;
while (!unnestCursor.isDone()) {
Object valueSelectorVal = unnestColumnValueSelector.getObject();
Assert.assertEquals(valueSelectorVal.toString(), expectedResults.get(k).toString());
Assert.assertEquals(expectedResults.get(k).toString(), valueSelectorVal.toString());
k++;
unnestCursor.advance();
}

Expressions.java

@ -24,6 +24,7 @@ import com.google.common.collect.Iterables;
import org.apache.calcite.jdbc.JavaTypeFactoryImpl;
import org.apache.calcite.rel.core.Project;
import org.apache.calcite.rex.RexCall;
import org.apache.calcite.rex.RexFieldAccess;
import org.apache.calcite.rex.RexInputRef;
import org.apache.calcite.rex.RexLiteral;
import org.apache.calcite.rex.RexNode;
@ -34,6 +35,7 @@ import org.apache.calcite.sql.type.SqlTypeName;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.math.expr.Expr;
import org.apache.druid.math.expr.ExprMacroTable;
@ -58,6 +60,7 @@ import org.apache.druid.sql.calcite.filtration.Bounds;
import org.apache.druid.sql.calcite.filtration.Filtration;
import org.apache.druid.sql.calcite.planner.Calcites;
import org.apache.druid.sql.calcite.planner.PlannerContext;
import org.apache.druid.sql.calcite.rel.CannotBuildQueryException;
import org.apache.druid.sql.calcite.rel.VirtualColumnRegistry;
import org.apache.druid.sql.calcite.table.RowSignatures;
import org.joda.time.Interval;
@ -214,12 +217,51 @@ public class Expressions
return rexCallToDruidExpression(plannerContext, rowSignature, rexNode, postAggregatorVisitor);
} else if (kind == SqlKind.LITERAL) {
return literalToDruidExpression(plannerContext, rexNode);
} else if (kind == SqlKind.FIELD_ACCESS) {
return fieldAccessToDruidExpression(rowSignature, rexNode);
} else {
// Can't translate.
return null;
}
}
private static DruidExpression fieldAccessToDruidExpression(
final RowSignature rowSignature,
final RexNode rexNode
)
{
// Translate field references.
final RexFieldAccess ref = (RexFieldAccess) rexNode;
if (ref.getField().getIndex() > rowSignature.size()) {
// This case arises during a correlation where the rexNode points to a table from the left subtree
// while the underlying datasource is the scan stub created from LogicalValuesRule.
// In such a case we throw a CannotBuildQueryException so that Calcite does not go ahead with this path.
// The exception is caught, and false is returned, in the isValidDruidQuery() method.
throw new CannotBuildQueryException(StringUtils.format(
"Cannot build query as column name [%s] does not exist in row [%s]", ref.getField().getName(), rowSignature)
);
}
final String columnName = ref.getField().getName();
final int index = rowSignature.indexOf(columnName);
// This case arises when the rexNode refers to a name that is not present in the underlying stub
// created by DruidUnnestDatasourceRule. For example, a column named ZERO with row type LONG
// causes the index to be -1. In such a case we cannot build the query,
// so we throw an exception that is caught while returning false from the isValidDruidQuery() method.
if (index < 0) {
throw new CannotBuildQueryException(StringUtils.format(
"Expression referred to nonexistent index[%d] in row[%s]",
index,
rowSignature
));
}
final Optional<ColumnType> columnType = rowSignature.getColumnType(index);
return DruidExpression.ofColumn(columnType.get(), columnName);
}
private static DruidExpression inputRefToDruidExpression(
final RowSignature rowSignature,
final RexNode rexNode

DruidCorrelateUnnestRel.java

@ -0,0 +1,330 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.sql.calcite.rel;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import org.apache.calcite.plan.RelOptCluster;
import org.apache.calcite.plan.RelOptCost;
import org.apache.calcite.plan.RelOptPlanner;
import org.apache.calcite.plan.RelOptRule;
import org.apache.calcite.plan.RelTraitSet;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.RelWriter;
import org.apache.calcite.rel.core.Correlate;
import org.apache.calcite.rel.core.Filter;
import org.apache.calcite.rel.core.JoinRelType;
import org.apache.calcite.rel.logical.LogicalProject;
import org.apache.calcite.rel.metadata.RelMetadataQuery;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.query.DataSource;
import org.apache.druid.query.QueryDataSource;
import org.apache.druid.query.TableDataSource;
import org.apache.druid.query.UnnestDataSource;
import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.sql.calcite.expression.DruidExpression;
import org.apache.druid.sql.calcite.expression.Expressions;
import org.apache.druid.sql.calcite.planner.PlannerConfig;
import org.apache.druid.sql.calcite.planner.PlannerContext;
import org.apache.druid.sql.calcite.table.RowSignatures;
import javax.annotation.Nullable;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
/**
* This is the DruidRel to handle correlated rel nodes used for unnest.
* Each correlate can be perceived as a join with join type INNER.
* The left of a correlate, as seen in the rule {@link org.apache.druid.sql.calcite.rule.DruidCorrelateUnnestRule},
* is a {@link DruidQueryRel} while the right will always be a {@link DruidUnnestDatasourceRel}.
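* Schematically (as also pictured in the javadoc of {@link org.apache.druid.sql.calcite.rule.DruidCorrelateUnnestRule}):
* <pre>
*        Correlate
*        /       \
*  DruidQueryRel  DruidUnnestDatasourceRel
* </pre>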
*
* Since this is a subclass of DruidRel, it is automatically considered by other rules that involve DruidRels,
* for example the SELECT_PROJECT and SORT_PROJECT rules in {@link org.apache.druid.sql.calcite.rule.DruidRules.DruidQueryRule}.
*/
public class DruidCorrelateUnnestRel extends DruidRel<DruidCorrelateUnnestRel>
{
private static final TableDataSource DUMMY_DATA_SOURCE = new TableDataSource("unnest");
private final Filter leftFilter;
private final PartialDruidQuery partialQuery;
private final PlannerConfig plannerConfig;
private final Correlate correlateRel;
private RelNode left;
private RelNode right;
private DruidCorrelateUnnestRel(
RelOptCluster cluster,
RelTraitSet traitSet,
Correlate correlateRel,
PartialDruidQuery partialQuery,
Filter baseFilter,
PlannerContext plannerContext
)
{
super(cluster, traitSet, plannerContext);
this.correlateRel = correlateRel;
this.partialQuery = partialQuery;
this.left = correlateRel.getLeft();
this.right = correlateRel.getRight();
this.leftFilter = baseFilter;
this.plannerConfig = plannerContext.getPlannerConfig();
}
/**
* Create an instance from a Correlate with a {@link DruidRel} and a {@link DruidUnnestDatasourceRel} as inputs.
*/
public static DruidCorrelateUnnestRel create(
final Correlate correlateRel,
final Filter leftFilter,
final PlannerContext plannerContext
)
{
return new DruidCorrelateUnnestRel(
correlateRel.getCluster(),
correlateRel.getTraitSet(),
correlateRel,
PartialDruidQuery.create(correlateRel),
leftFilter,
plannerContext
);
}
@Nullable
@Override
public PartialDruidQuery getPartialDruidQuery()
{
return partialQuery;
}
@Override
public DruidCorrelateUnnestRel withPartialQuery(PartialDruidQuery newQueryBuilder)
{
return new DruidCorrelateUnnestRel(
getCluster(),
getTraitSet().plusAll(newQueryBuilder.getRelTraits()),
correlateRel,
newQueryBuilder,
leftFilter,
getPlannerContext()
);
}
@Override
public DruidQuery toDruidQuery(boolean finalizeAggregations)
{
final DruidRel<?> druidQueryRel = (DruidRel<?>) left;
final DruidQuery leftQuery = Preconditions.checkNotNull((druidQueryRel).toDruidQuery(false), "leftQuery");
final DataSource leftDataSource;
if (DruidJoinQueryRel.computeLeftRequiresSubquery(druidQueryRel)) {
leftDataSource = new QueryDataSource(leftQuery.getQuery());
} else {
leftDataSource = leftQuery.getDataSource();
}
final DruidUnnestDatasourceRel unnestDatasourceRel = (DruidUnnestDatasourceRel) right;
final RowSignature rowSignature = RowSignatures.fromRelDataType(
correlateRel.getRowType().getFieldNames(),
correlateRel.getRowType()
);
final DruidExpression expression = Expressions.toDruidExpression(
getPlannerContext(),
rowSignature,
unnestDatasourceRel.getUnnestProject().getProjects().get(0)
);
LogicalProject unnestProject = LogicalProject.create(
this,
ImmutableList.of(unnestDatasourceRel.getUnnestProject()
.getProjects()
.get(0)),
unnestDatasourceRel.getUnnestProject().getRowType()
);
// placeholder for dimension or expression to be unnested
final String dimOrExpToUnnest;
final VirtualColumnRegistry virtualColumnRegistry = VirtualColumnRegistry.create(
rowSignature,
getPlannerContext().getExprMacroTable(),
getPlannerContext().getPlannerConfig().isForceExpressionVirtualColumns()
);
// The unnest project is needed in case of a virtual column:
// unnest(mv_to_array(dim_1)) is reconciled as unnesting the MVD dim_1, which does not require a virtual column,
// while unnest(array(dim_2,dim_3)) is understood as unnesting a virtual column that is an array over the dim_2 and dim_3 elements
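// Illustrative examples (column names from the tests in this commit):
//   unnest(mv_to_array(dim3))  -> dimOrExpToUnnest = "dim3", unnestProjectNeeded stays false
//   unnest(array(dim4,dim5))   -> dimOrExpToUnnest = a registered virtual column (e.g. "v0" over array("dim4","dim5")), unnestProjectNeeded = true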
boolean unnestProjectNeeded = false;
getPlannerContext().setJoinExpressionVirtualColumnRegistry(virtualColumnRegistry);
// Handle the case where mv_to_array is used;
// no virtual column is needed in that case
if (StringUtils.toLowerCase(expression.getExpression()).startsWith("mv_to_array")) {
dimOrExpToUnnest = expression.getArguments().get(0).getSimpleExtraction().getColumn();
} else {
if (expression.isDirectColumnAccess()) {
dimOrExpToUnnest = expression.getDirectColumn();
} else {
// buckle up, time to create virtual columns on expressions
unnestProjectNeeded = true;
dimOrExpToUnnest = virtualColumnRegistry.getOrCreateVirtualColumnForExpression(
expression,
expression.getDruidType()
);
}
}
// add the unnest project to the partial query if required
// This is necessary to handle the virtual columns on the unnestProject
// Also create the unnest datasource to be used by the partial query
PartialDruidQuery partialDruidQuery = unnestProjectNeeded ? partialQuery.withUnnest(unnestProject) : partialQuery;
return partialDruidQuery.build(
UnnestDataSource.create(
leftDataSource,
dimOrExpToUnnest,
unnestDatasourceRel.getUnnestProject().getRowType().getFieldNames().get(0),
null
),
rowSignature,
getPlannerContext(),
getCluster().getRexBuilder(),
finalizeAggregations,
virtualColumnRegistry
);
}
@Override
protected DruidCorrelateUnnestRel clone()
{
return DruidCorrelateUnnestRel.create(correlateRel, leftFilter, getPlannerContext());
}
@Override
protected RelDataType deriveRowType()
{
return partialQuery.getRowType();
}
@Override
public DruidQuery toDruidQueryForExplaining()
{
return partialQuery.build(
DUMMY_DATA_SOURCE,
RowSignatures.fromRelDataType(
correlateRel.getRowType().getFieldNames(),
correlateRel.getRowType()
),
getPlannerContext(),
getCluster().getRexBuilder(),
false
);
}
// This must be overridden, as Calcite uses this method
// to maintain a map of equivalent DruidCorrelateUnnestRels (or, in general, any RelNodes).
// Without this override, multiple RelNodes would produce the same key,
// which makes the planner plan incorrectly.
@Override
public RelWriter explainTerms(RelWriter pw)
{
final String queryString;
final DruidQuery druidQuery = toDruidQueryForExplaining();
try {
queryString = getPlannerContext().getJsonMapper().writeValueAsString(druidQuery.getQuery());
}
catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
return pw.item("query", queryString)
.item("signature", druidQuery.getOutputRowSignature());
}
// This is called from the DruidRelToDruidRule which converts from the NONE convention to the DRUID convention
@Override
public DruidCorrelateUnnestRel asDruidConvention()
{
return new DruidCorrelateUnnestRel(
getCluster(),
getTraitSet().replace(DruidConvention.instance()),
correlateRel.copy(
correlateRel.getTraitSet(),
correlateRel.getInputs()
.stream()
.map(input -> RelOptRule.convert(input, DruidConvention.instance()))
.collect(Collectors.toList())
),
partialQuery,
leftFilter,
getPlannerContext()
);
}
@Override
public List<RelNode> getInputs()
{
return ImmutableList.of(left, right);
}
@Override
public RelNode copy(final RelTraitSet traitSet, final List<RelNode> inputs)
{
return new DruidCorrelateUnnestRel(
getCluster(),
traitSet,
correlateRel.copy(correlateRel.getTraitSet(), inputs),
getPartialDruidQuery(),
leftFilter,
getPlannerContext()
);
}
@Override
public RelOptCost computeSelfCost(final RelOptPlanner planner, final RelMetadataQuery mq)
{
double cost;
if (DruidJoinQueryRel.computeLeftRequiresSubquery(DruidJoinQueryRel.getSomeDruidChild(left))) {
cost = CostEstimates.COST_SUBQUERY;
} else {
cost = partialQuery.estimateCost();
if (correlateRel.getJoinType() == JoinRelType.INNER && plannerConfig.isComputeInnerJoinCostAsFilter()) {
cost *= CostEstimates.MULTIPLIER_FILTER;
}
}
return planner.getCostFactory().makeCost(cost, 0, 0);
}
@Override
public Set<String> getDataSourceNames()
{
final Set<String> retVal = new HashSet<>();
retVal.addAll(((DruidRel<?>) left).getDataSourceNames());
retVal.addAll(((DruidRel<?>) right).getDataSourceNames());
return retVal;
}
}

DruidJoinQueryRel.java

@ -353,7 +353,7 @@ public class DruidJoinQueryRel extends DruidRel<DruidJoinQueryRel>
}
}
private static boolean computeLeftRequiresSubquery(final DruidRel<?> left)
public static boolean computeLeftRequiresSubquery(final DruidRel<?> left)
{
// Left requires a subquery unless it's a scan or mapping on top of any table or a join.
return !DruidRels.isScanOrMapping(left, true);
@ -372,7 +372,7 @@ public class DruidJoinQueryRel extends DruidRel<DruidJoinQueryRel>
* Returns a Pair of "rightPrefix" (for JoinDataSource) and the signature of rows that will result from
* applying that prefix.
*/
private static Pair<String, RowSignature> computeJoinRowSignature(
static Pair<String, RowSignature> computeJoinRowSignature(
final RowSignature leftSignature,
final RowSignature rightSignature
)

DruidQuery.java

@ -139,6 +139,9 @@ public class DruidQuery
@Nullable
private final Projection selectProjection;
@Nullable
private final Projection unnestProjection;
@Nullable
private final Grouping grouping;
@ -159,6 +162,7 @@ public class DruidQuery
final PlannerContext plannerContext,
@Nullable final DimFilter filter,
@Nullable final Projection selectProjection,
@Nullable final Projection unnestProjection,
@Nullable final Grouping grouping,
@Nullable final Sorting sorting,
@Nullable final Windowing windowing,
@ -171,6 +175,7 @@ public class DruidQuery
this.plannerContext = Preconditions.checkNotNull(plannerContext, "plannerContext");
this.filter = filter;
this.selectProjection = selectProjection;
this.unnestProjection = unnestProjection;
this.grouping = grouping;
this.sorting = sorting;
this.windowing = windowing;
@ -210,6 +215,7 @@ public class DruidQuery
// Now the fun begins.
final DimFilter filter;
final Projection selectProjection;
final Projection unnestProjection;
final Grouping grouping;
final Sorting sorting;
final Windowing windowing;
@ -290,11 +296,25 @@ public class DruidQuery
windowing = null;
}
if (partialQuery.getUnnestProject() != null) {
unnestProjection = Preconditions.checkNotNull(
computeUnnestProjection(
partialQuery,
plannerContext,
computeOutputRowSignature(sourceRowSignature, null, null, null, null),
virtualColumnRegistry
)
);
} else {
unnestProjection = null;
}
return new DruidQuery(
dataSource,
plannerContext,
filter,
selectProjection,
unnestProjection,
grouping,
sorting,
windowing,
@ -371,6 +391,18 @@ public class DruidQuery
}
}
@Nonnull
private static Projection computeUnnestProjection(
final PartialDruidQuery partialQuery,
final PlannerContext plannerContext,
final RowSignature rowSignature,
final VirtualColumnRegistry virtualColumnRegistry
)
{
final Project project = Preconditions.checkNotNull(partialQuery.getUnnestProject(), "unnestProject");
return Projection.preAggregation(project, plannerContext, rowSignature, virtualColumnRegistry);
}
@Nonnull
private static Grouping computeGrouping(
final PartialDruidQuery partialQuery,
@ -762,6 +794,16 @@ public class DruidQuery
}
}
if (unnestProjection != null) {
for (String columnName : unnestProjection.getVirtualColumns()) {
if (virtualColumnRegistry.isVirtualColumnDefined(columnName)) {
virtualColumns.add(virtualColumnRegistry.getVirtualColumn(columnName));
}
}
}
for (String columnName : specialized) {
if (virtualColumnRegistry.isVirtualColumnDefined(columnName)) {
virtualColumns.add(virtualColumnRegistry.getVirtualColumn(columnName));

DruidUnnestDatasourceRel.java

@ -0,0 +1,180 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.sql.calcite.rel;
import org.apache.calcite.rel.RelWriter;
import org.apache.calcite.rel.core.Uncollect;
import org.apache.calcite.rel.logical.LogicalProject;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.druid.math.expr.Expr;
import org.apache.druid.math.expr.ExprEval;
import org.apache.druid.math.expr.ExpressionType;
import org.apache.druid.math.expr.InputBindings;
import org.apache.druid.query.InlineDataSource;
import org.apache.druid.query.UnnestDataSource;
import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.sql.calcite.expression.DruidExpression;
import org.apache.druid.sql.calcite.expression.Expressions;
import org.apache.druid.sql.calcite.planner.PlannerContext;
import org.apache.druid.sql.calcite.table.RowSignatures;
import javax.annotation.Nullable;
import java.util.Collections;
import java.util.Set;
/**
* The Rel node to capture the unnest (or uncollect) part in a query. This covers 2 cases:
*
* Case 1:
* If this is an unnest on a constant and no input table is required, the final query is built using
* an UnnestDataSource with a base InlineDataSource in this rel.
*
* Case 2:
* If the unnest has an input table, this rel resolves the unnest part and delegates the rel to be consumed by another
* rule ({@link org.apache.druid.sql.calcite.rule.DruidCorrelateUnnestRule}).
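*
* For example (from the tests added in this commit), Case 1 covers
* SELECT * FROM UNNEST(ARRAY[1,2,3]), while Case 2 covers
* SELECT d3 FROM druid.numfoo, UNNEST(MV_TO_ARRAY(dim3)) AS unnested (d3).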
*/
public class DruidUnnestDatasourceRel extends DruidRel<DruidUnnestDatasourceRel>
{
private final Uncollect uncollect;
private final DruidQueryRel druidQueryRel;
private final LogicalProject unnestProject;
public DruidUnnestDatasourceRel(
Uncollect uncollect,
DruidQueryRel queryRel,
LogicalProject unnestProject,
PlannerContext plannerContext
)
{
super(uncollect.getCluster(), uncollect.getTraitSet(), plannerContext);
this.uncollect = uncollect;
this.druidQueryRel = queryRel;
this.unnestProject = unnestProject;
}
public LogicalProject getUnnestProject()
{
return unnestProject;
}
@Nullable
@Override
public PartialDruidQuery getPartialDruidQuery()
{
return druidQueryRel.getPartialDruidQuery();
}
@Override
public DruidUnnestDatasourceRel withPartialQuery(PartialDruidQuery newQueryBuilder)
{
return new DruidUnnestDatasourceRel(
uncollect,
druidQueryRel.withPartialQuery(newQueryBuilder),
unnestProject,
getPlannerContext()
);
}
@Override
public DruidQuery toDruidQuery(boolean finalizeAggregations)
{
VirtualColumnRegistry virtualColumnRegistry = VirtualColumnRegistry.create(
druidQueryRel.getDruidTable().getRowSignature(),
getPlannerContext().getExprMacroTable(),
getPlannerContext().getPlannerConfig().isForceExpressionVirtualColumns()
);
getPlannerContext().setJoinExpressionVirtualColumnRegistry(virtualColumnRegistry);
final DruidExpression expression = Expressions.toDruidExpression(
getPlannerContext(),
druidQueryRel.getDruidTable().getRowSignature(),
unnestProject.getProjects().get(0)
);
if (expression == null) {
return null;
}
Expr parsed = expression.parse(getPlannerContext().getExprMacroTable());
ExprEval eval = parsed.eval(InputBindings.nilBindings());
// If the query unnests a constant expression and does not use any table,
// the unnest is done over an inline datasource,
// with the input column named "inline" in the native query
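// For example, SELECT * FROM UNNEST(ARRAY[1,2,3]) (see testUnnestInline in this commit) becomes an unnest
// over an InlineDataSource whose single row holds the array {1L, 2L, 3L} in a LONG_ARRAY column named "inline"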
UnnestDataSource dataSource = UnnestDataSource.create(
InlineDataSource.fromIterable(
Collections.singletonList(new Object[]{eval.value()}),
RowSignature.builder().add("inline", ExpressionType.toColumnType(eval.type())).build()
),
"inline",
druidQueryRel.getRowType().getFieldNames().get(0),
null
);
DruidQuery query = druidQueryRel.getPartialDruidQuery().build(
dataSource,
RowSignatures.fromRelDataType(uncollect.getRowType().getFieldNames(), uncollect.getRowType()),
getPlannerContext(),
getCluster().getRexBuilder(),
finalizeAggregations
);
getPlannerContext().setJoinExpressionVirtualColumnRegistry(null);
return query;
}
@Override
public DruidQuery toDruidQueryForExplaining()
{
return toDruidQuery(false);
}
@Override
public DruidUnnestDatasourceRel asDruidConvention()
{
return new DruidUnnestDatasourceRel(
new Uncollect(getCluster(), traitSet.replace(DruidConvention.instance()), uncollect.getInput(), false),
druidQueryRel.asDruidConvention(),
unnestProject,
getPlannerContext()
);
}
@Override
public RelWriter explainTerms(RelWriter pw)
{
return super.explainTerms(pw);
}
@Override
public Set<String> getDataSourceNames()
{
return druidQueryRel.getDruidTable().getDataSource().getTableNames();
}
@Override
protected RelDataType deriveRowType()
{
return uncollect.getRowType();
}
@Override
protected DruidUnnestDatasourceRel clone()
{
return new DruidUnnestDatasourceRel(uncollect, druidQueryRel, unnestProject, getPlannerContext());
}
}

PartialDruidQuery.java

@ -54,6 +54,7 @@ public class PartialDruidQuery
private final RelNode scan;
private final Filter whereFilter;
private final Project selectProject;
private final Project unnestProject;
private final Aggregate aggregate;
private final Filter havingFilter;
private final Project aggregateProject;
@ -80,7 +81,9 @@ public class PartialDruidQuery
SORT_PROJECT,
// WINDOW may be present only together with SCAN.
WINDOW
WINDOW,
UNNEST_PROJECT
}
private PartialDruidQuery(
@ -93,7 +96,8 @@ public class PartialDruidQuery
final Filter havingFilter,
final Sort sort,
final Project sortProject,
final Window window
final Window window,
final Project unnestProject
)
{
this.builderSupplier = Preconditions.checkNotNull(builderSupplier, "builderSupplier");
@ -106,6 +110,7 @@ public class PartialDruidQuery
this.sort = sort;
this.sortProject = sortProject;
this.window = window;
this.unnestProject = unnestProject;
}
public static PartialDruidQuery create(final RelNode scanRel)
@ -114,7 +119,7 @@ public class PartialDruidQuery
scanRel.getCluster(),
scanRel.getTable() != null ? scanRel.getTable().getRelOptSchema() : null
);
return new PartialDruidQuery(builderSupplier, scanRel, null, null, null, null, null, null, null, null);
return new PartialDruidQuery(builderSupplier, scanRel, null, null, null, null, null, null, null, null, null);
}
public RelNode getScan()
@ -132,6 +137,11 @@ public class PartialDruidQuery
return selectProject;
}
public Project getUnnestProject()
{
return unnestProject;
}
public Aggregate getAggregate()
{
return aggregate;
@ -175,7 +185,8 @@ public class PartialDruidQuery
havingFilter,
sort,
sortProject,
window
window,
unnestProject
);
}
@ -218,7 +229,8 @@ public class PartialDruidQuery
havingFilter,
sort,
sortProject,
window
window,
unnestProject
);
}
@ -235,7 +247,8 @@ public class PartialDruidQuery
havingFilter,
sort,
sortProject,
window
window,
unnestProject
);
}
@ -252,7 +265,8 @@ public class PartialDruidQuery
newHavingFilter,
sort,
sortProject,
window
window,
unnestProject
);
}
@ -269,7 +283,8 @@ public class PartialDruidQuery
havingFilter,
sort,
sortProject,
window
window,
unnestProject
);
}
@ -286,7 +301,8 @@ public class PartialDruidQuery
havingFilter,
newSort,
sortProject,
window
window,
unnestProject
);
}
@ -303,7 +319,8 @@ public class PartialDruidQuery
havingFilter,
sort,
newSortProject,
window
window,
unnestProject
);
}
@ -320,7 +337,25 @@ public class PartialDruidQuery
havingFilter,
sort,
sortProject,
newWindow
newWindow,
unnestProject
);
}
public PartialDruidQuery withUnnest(final Project newUnnestProject)
{
return new PartialDruidQuery(
builderSupplier,
scan,
whereFilter,
selectProject,
aggregate,
aggregateProject,
havingFilter,
sort,
sortProject,
window,
newUnnestProject
);
}
@ -575,6 +610,7 @@ public class PartialDruidQuery
", aggregateProject=" + aggregateProject +
", sort=" + sort +
", sortProject=" + sortProject +
", unnestProject=" + unnestProject +
'}';
}
}

DruidCorrelateUnnestRule.java

@ -0,0 +1,177 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.sql.calcite.rule;
import org.apache.calcite.plan.RelOptRule;
import org.apache.calcite.plan.RelOptRuleCall;
import org.apache.calcite.plan.RelOptUtil;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.core.Correlate;
import org.apache.calcite.rel.core.Filter;
import org.apache.calcite.rel.core.Project;
import org.apache.calcite.rex.RexBuilder;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.rex.RexUtil;
import org.apache.calcite.tools.RelBuilder;
import org.apache.druid.sql.calcite.planner.PlannerContext;
import org.apache.druid.sql.calcite.rel.DruidCorrelateUnnestRel;
import org.apache.druid.sql.calcite.rel.DruidQueryRel;
import org.apache.druid.sql.calcite.rel.DruidRel;
import org.apache.druid.sql.calcite.rel.DruidUnnestDatasourceRel;
import org.apache.druid.sql.calcite.rel.PartialDruidQuery;
import java.util.ArrayList;
import java.util.List;
/**
* This class defines the rule for handling correlations created during unnest.
* Typically, Calcite plans an unnest query such as
* SELECT * FROM numFoo, UNNEST(dim3) in the following way:
* 80:LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{3}])
* 6:LogicalTableScan(subset=[rel#74:Subset#0.NONE.[]], table=[[druid, numfoo]])
* 78:Uncollect(subset=[rel#79:Subset#3.NONE.[]])
* 76:LogicalProject(subset=[rel#77:Subset#2.NONE.[]], EXPR$0=[MV_TO_ARRAY($cor0.dim3)])
* 7:LogicalValues(subset=[rel#75:Subset#1.NONE.[0]], tuples=[[{ 0 }]])
*
* {@link DruidUnnestDatasourceRule} takes care of the Uncollect (the last 3 lines) to generate a {@link DruidUnnestDatasourceRel},
* thereby reducing the logical plan to:
* LogicalCorrelate
* / \
* DruidRel DruidUnnestDatasourceRel
*
* This forms the premise of this rule: the goal is to replace the above-mentioned structure in the tree
* with a new rel, {@link DruidCorrelateUnnestRel}, which is created here.
*
*/
public class DruidCorrelateUnnestRule extends RelOptRule
{
private final PlannerContext plannerContext;
private final boolean enableLeftScanDirect;
public DruidCorrelateUnnestRule(final PlannerContext plannerContext)
{
super(
operand(
Correlate.class,
operand(DruidRel.class, any()),
operand(DruidUnnestDatasourceRel.class, any())
)
);
this.plannerContext = plannerContext;
this.enableLeftScanDirect = plannerContext.queryContext().getEnableJoinLeftScanDirect();
}
@Override
public boolean matches(RelOptRuleCall call)
{
final DruidRel<?> druidRel = call.rel(1);
final DruidRel<?> uncollectRel = call.rel(2);
return druidRel.getPartialDruidQuery() != null
&& uncollectRel.getPartialDruidQuery() != null;
}
@Override
public void onMatch(RelOptRuleCall call)
{
final Correlate correlate = call.rel(0);
final DruidRel<?> druidRel = call.rel(1);
DruidUnnestDatasourceRel druidUnnestDatasourceRel = call.rel(2);
final RexBuilder rexBuilder = correlate.getCluster().getRexBuilder();
final Filter druidRelFilter;
final DruidRel<?> newDruidRelFilter;
final List<RexNode> newProjectExprs = new ArrayList<>();
final boolean isLeftDirectAccessPossible = enableLeftScanDirect && (druidRel instanceof DruidQueryRel);
if (druidRel.getPartialDruidQuery().stage() == PartialDruidQuery.Stage.SELECT_PROJECT
&& (isLeftDirectAccessPossible || druidRel.getPartialDruidQuery().getWhereFilter() == null)) {
// Swap the druidRel-side projection above the correlate, so the druidRel side is a simple scan or mapping.
// This helps us avoid subqueries.
final RelNode leftScan = druidRel.getPartialDruidQuery().getScan();
final Project leftProject = druidRel.getPartialDruidQuery().getSelectProject();
druidRelFilter = druidRel.getPartialDruidQuery().getWhereFilter();
// Left-side projection expressions rewritten to be on top of the correlate.
newProjectExprs.addAll(leftProject.getProjects());
newDruidRelFilter = druidRel.withPartialQuery(PartialDruidQuery.create(leftScan));
} else {
// Leave druidRel as-is. Write input refs that do nothing.
for (int i = 0; i < druidRel.getRowType().getFieldCount(); i++) {
newProjectExprs.add(rexBuilder.makeInputRef(correlate.getRowType().getFieldList().get(i).getType(), i));
}
newDruidRelFilter = druidRel;
druidRelFilter = null;
}
if (druidUnnestDatasourceRel.getPartialDruidQuery().stage() == PartialDruidQuery.Stage.SELECT_PROJECT) {
for (final RexNode rexNode : RexUtil.shift(
druidUnnestDatasourceRel.getPartialDruidQuery()
.getSelectProject()
.getProjects(),
newDruidRelFilter.getRowType().getFieldCount()
)) {
newProjectExprs.add(rexNode);
}
} else {
for (int i = 0; i < druidUnnestDatasourceRel.getRowType().getFieldCount(); i++) {
newProjectExprs.add(
rexBuilder.makeInputRef(
correlate.getRowType()
.getFieldList()
.get(druidRel.getRowType().getFieldCount() + i)
.getType(),
newDruidRelFilter.getRowType().getFieldCount() + i
)
);
}
}
final DruidCorrelateUnnestRel druidCorr = DruidCorrelateUnnestRel.create(
correlate.copy(
correlate.getTraitSet(),
newDruidRelFilter,
druidUnnestDatasourceRel,
correlate.getCorrelationId(),
correlate.getRequiredColumns(),
correlate.getJoinType()
),
druidRelFilter,
plannerContext
);
final RelBuilder relBuilder =
call.builder()
.push(druidCorr)
.project(RexUtil.fixUp(
rexBuilder,
newProjectExprs,
RelOptUtil.getFieldTypeList(druidCorr.getRowType())
));
call.transformTo(relBuilder.build());
}
}

DruidRelToDruidRule.java

@ -50,7 +50,8 @@ public class DruidRelToDruidRule extends ConverterRule
public RelNode convert(RelNode rel)
{
try {
return ((DruidRel<?>) rel).asDruidConvention();
final RelNode newRel = ((DruidRel<?>) rel).asDruidConvention();
return newRel;
}
catch (Exception e) {
log.error(e, "Conversion failed");

DruidRules.java

@ -97,7 +97,9 @@ public class DruidRules
new DruidUnionRule(plannerContext),
new DruidUnionDataSourceRule(plannerContext),
DruidSortUnionRule.instance(),
DruidJoinRule.instance(plannerContext)
DruidJoinRule.instance(plannerContext),
new DruidUnnestDatasourceRule(plannerContext),
new DruidCorrelateUnnestRule(plannerContext)
)
);

DruidUnnestDatasourceRule.java

@ -0,0 +1,107 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.sql.calcite.rule;
import com.google.common.collect.ImmutableList;
import org.apache.calcite.plan.RelOptRule;
import org.apache.calcite.plan.RelOptRuleCall;
import org.apache.calcite.rel.core.Uncollect;
import org.apache.calcite.rel.logical.LogicalProject;
import org.apache.calcite.rex.RexBuilder;
import org.apache.calcite.tools.RelBuilder;
import org.apache.druid.sql.calcite.planner.PlannerContext;
import org.apache.druid.sql.calcite.rel.DruidQueryRel;
import org.apache.druid.sql.calcite.rel.DruidUnnestDatasourceRel;
/**
* This class defines the rule for handling unnest (internally, uncollect) in Calcite.
* Typically, Calcite plans the *unnest* part of a query involving a table, such as
* SELECT * FROM numFoo, UNNEST(dim3),
* or even a standalone unnest query, such as
* SELECT * FROM UNNEST(ARRAY[1,2,3]), in the following way:
* 78:Uncollect(subset=[rel#79:Subset#3.NONE.[]])
* 76:LogicalProject(subset=[rel#77:Subset#2.NONE.[]], EXPR$0=[MV_TO_ARRAY($cor0.dim3)])
* 7:LogicalValues(subset=[rel#75:Subset#1.NONE.[0]], tuples=[[{ 0 }]])
*
* Calcite tackles plans bottom up. Therefore,
* {@link DruidLogicalValuesRule} converts the LogicalValues part into a leaf level {@link DruidQueryRel}
* thereby creating the following subtree in the call tree
*
* Uncollect
* \
* LogicalProject
* \
* DruidQueryRel
*
*
* This forms the premise of this rule: the goal is to replace the above-mentioned structure in the tree
* with a new rel, {@link DruidUnnestDatasourceRel}, which is created here.
*
*/
public class DruidUnnestDatasourceRule extends RelOptRule
{
private final PlannerContext plannerContext;
public DruidUnnestDatasourceRule(PlannerContext plannerContext)
{
super(
operand(
Uncollect.class,
operand(LogicalProject.class, operand(DruidQueryRel.class, none()))
)
);
this.plannerContext = plannerContext;
}
@Override
public boolean matches(RelOptRuleCall call)
{
return true;
}
@Override
public void onMatch(final RelOptRuleCall call)
{
final Uncollect uncollectRel = call.rel(0);
final LogicalProject logicalProject = call.rel(1);
final DruidQueryRel druidQueryRel = call.rel(2);
final RexBuilder rexBuilder = logicalProject.getCluster().getRexBuilder();
final LogicalProject queryProject = LogicalProject.create(
uncollectRel,
ImmutableList.of(rexBuilder.makeInputRef(uncollectRel.getRowType().getFieldList().get(0).getType(), 0)),
uncollectRel.getRowType()
);
DruidUnnestDatasourceRel unnestDatasourceRel = new DruidUnnestDatasourceRel(
uncollectRel,
druidQueryRel.withPartialQuery(druidQueryRel.getPartialDruidQuery().withSelectProject(queryProject)),
logicalProject,
plannerContext
);
final RelBuilder relBuilder =
call.builder()
.push(unnestDatasourceRel);
call.transformTo(relBuilder.build());
}
}

CalciteArraysQueryTest.java

@ -29,9 +29,11 @@ import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.math.expr.ExprMacroTable;
import org.apache.druid.math.expr.ExpressionProcessing;
import org.apache.druid.query.Druids;
import org.apache.druid.query.InlineDataSource;
import org.apache.druid.query.Query;
import org.apache.druid.query.QueryDataSource;
import org.apache.druid.query.TableDataSource;
import org.apache.druid.query.UnnestDataSource;
import org.apache.druid.query.aggregation.CountAggregatorFactory;
import org.apache.druid.query.aggregation.ExpressionLambdaAggregatorFactory;
import org.apache.druid.query.aggregation.FilteredAggregatorFactory;
@ -51,7 +53,9 @@ import org.apache.druid.query.ordering.StringComparators;
import org.apache.druid.query.scan.ScanQuery;
import org.apache.druid.query.topn.DimensionTopNMetricSpec;
import org.apache.druid.query.topn.TopNQuery;
import org.apache.druid.query.topn.TopNQueryBuilder;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.segment.join.JoinType;
import org.apache.druid.sql.calcite.filtration.Filtration;
import org.apache.druid.sql.calcite.util.CalciteTests;
@ -2613,4 +2617,703 @@ public class CalciteArraysQueryTest extends BaseCalciteQueryTest
Assert.assertEquals(path, expected, actual);
}
}
@Test
public void testUnnestInline()
{
skipVectorize();
cannotVectorize();
testQuery(
"SELECT * FROM UNNEST(ARRAY[1,2,3])",
ImmutableList.of(
Druids.newScanQueryBuilder()
.dataSource(
UnnestDataSource.create(
InlineDataSource.fromIterable(
ImmutableList.of(new Object[]{new Object[]{1L, 2L, 3L}}),
RowSignature.builder().add("inline", ColumnType.LONG_ARRAY).build()
),
"inline",
"EXPR$0",
null
)
)
.intervals(querySegmentSpec(Filtration.eternity()))
.resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST)
.legacy(false)
.context(QUERY_CONTEXT_DEFAULT)
.columns(ImmutableList.of(
"EXPR$0"
))
.build()
),
ImmutableList.of(
new Object[]{1},
new Object[]{2},
new Object[]{3}
)
);
}
@Test
public void testUnnest()
{
// This tells the test to skip generating the (vectorize = force) path
// Generates only 1 native query with vectorize = false
skipVectorize();
// This tells that both vectorize = force and vectorize = false take the same non-vectorized path
// Generates 2 native queries with the 2 different values of vectorize
cannotVectorize();
testQuery(
"SELECT d3 FROM druid.numfoo, UNNEST(MV_TO_ARRAY(dim3)) as unnested (d3)",
ImmutableList.of(
Druids.newScanQueryBuilder()
.dataSource(UnnestDataSource.create(
new TableDataSource(CalciteTests.DATASOURCE3),
"dim3",
"EXPR$0",
null
))
.intervals(querySegmentSpec(Filtration.eternity()))
.resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST)
.legacy(false)
.context(QUERY_CONTEXT_DEFAULT)
.columns(ImmutableList.of(
"EXPR$0"
))
.build()
),
useDefault ?
ImmutableList.of(
new Object[]{"a"},
new Object[]{"b"},
new Object[]{"b"},
new Object[]{"c"},
new Object[]{"d"},
new Object[]{""},
new Object[]{""},
new Object[]{""}
) :
ImmutableList.of(
new Object[]{"a"},
new Object[]{"b"},
new Object[]{"b"},
new Object[]{"c"},
new Object[]{"d"},
new Object[]{""},
new Object[]{null},
new Object[]{null}
)
);
}
@Test
public void testUnnestWithGroupBy()
{
// This tells the test to skip generating the (vectorize = force) path
// Generates only 1 native query with vectorize = false
skipVectorize();
// This tells that both vectorize = force and vectorize = false take the same non-vectorized path
// Generates 2 native queries with the 2 different values of vectorize
cannotVectorize();
testQuery(
"SELECT d3 FROM druid.numfoo, UNNEST(MV_TO_ARRAY(dim3)) as unnested (d3) GROUP BY d3 ",
ImmutableList.of(
GroupByQuery.builder()
.setDataSource(UnnestDataSource.create(
new TableDataSource(CalciteTests.DATASOURCE3),
"dim3",
"EXPR$0",
null
))
.setInterval(querySegmentSpec(Filtration.eternity()))
.setContext(QUERY_CONTEXT_DEFAULT)
.setDimensions(new DefaultDimensionSpec("EXPR$0", "_d0", ColumnType.STRING))
.setGranularity(Granularities.ALL)
.setContext(QUERY_CONTEXT_DEFAULT)
.build()
),
useDefault ?
ImmutableList.of(
new Object[]{""},
new Object[]{"a"},
new Object[]{"b"},
new Object[]{"c"},
new Object[]{"d"}
) :
ImmutableList.of(
new Object[]{null},
new Object[]{""},
new Object[]{"a"},
new Object[]{"b"},
new Object[]{"c"},
new Object[]{"d"}
)
);
}
@Test
public void testUnnestWithGroupByOrderBy()
{
// This tells the test to skip generating the (vectorize = force) path
// Generates only 1 native query with vectorize = false
skipVectorize();
// This tells that both vectorize = force and vectorize = false take the same non-vectorized path
// Generates 2 native queries with the 2 different values of vectorize
cannotVectorize();
testQuery(
"SELECT d3, COUNT(*) FROM druid.numfoo, UNNEST(MV_TO_ARRAY(dim3)) AS unnested(d3) GROUP BY d3 ORDER BY d3 DESC ",
ImmutableList.of(
GroupByQuery.builder()
.setDataSource(UnnestDataSource.create(
new TableDataSource(CalciteTests.DATASOURCE3),
"dim3",
"EXPR$0",
null
))
.setInterval(querySegmentSpec(Filtration.eternity()))
.setContext(QUERY_CONTEXT_DEFAULT)
.setDimensions(new DefaultDimensionSpec("EXPR$0", "_d0", ColumnType.STRING))
.setGranularity(Granularities.ALL)
.setLimitSpec(
DefaultLimitSpec
.builder()
.orderBy(new OrderByColumnSpec(
"_d0",
OrderByColumnSpec.Direction.DESCENDING,
StringComparators.LEXICOGRAPHIC
))
.build()
)
.setAggregatorSpecs(new CountAggregatorFactory("a0"))
.setContext(QUERY_CONTEXT_DEFAULT)
.build()
),
useDefault ?
ImmutableList.of(
new Object[]{"d", 1L},
new Object[]{"c", 1L},
new Object[]{"b", 2L},
new Object[]{"a", 1L},
new Object[]{"", 3L}
) :
ImmutableList.of(
new Object[]{"d", 1L},
new Object[]{"c", 1L},
new Object[]{"b", 2L},
new Object[]{"a", 1L},
new Object[]{"", 1L},
new Object[]{null, 2L}
)
);
}
@Test
public void testUnnestWithGroupByOrderByWithLimit()
{
// This tells the test to skip generating the (vectorize = force) path
// Generates only 1 native query with vectorize = false
skipVectorize();
// This tells that both vectorize = force and vectorize = false take the same non-vectorized path
// Generates 2 native queries with the 2 different values of vectorize
cannotVectorize();
testQuery(
"SELECT d3, COUNT(*) FROM druid.numfoo, UNNEST(MV_TO_ARRAY(dim3)) AS unnested(d3) GROUP BY d3 ORDER BY d3 ASC LIMIT 4 ",
ImmutableList.of(
new TopNQueryBuilder()
.dataSource(UnnestDataSource.create(
new TableDataSource(CalciteTests.DATASOURCE3),
"dim3",
"EXPR$0",
null
))
.intervals(querySegmentSpec(Filtration.eternity()))
.dimension(new DefaultDimensionSpec("EXPR$0", "_d0", ColumnType.STRING))
.metric(new DimensionTopNMetricSpec(null, StringComparators.LEXICOGRAPHIC))
.threshold(4)
.aggregators(aggregators(new CountAggregatorFactory("a0")))
.context(QUERY_CONTEXT_DEFAULT)
.build()
),
useDefault ?
ImmutableList.of(
new Object[]{"", 3L},
new Object[]{"a", 1L},
new Object[]{"b", 2L},
new Object[]{"c", 1L}
) :
ImmutableList.of(
new Object[]{null, 2L},
new Object[]{"", 1L},
new Object[]{"a", 1L},
new Object[]{"b", 2L}
)
);
}
@Test
public void testUnnestWithLimit()
{
// This tells the test to skip generating the (vectorize = force) path
// Generates only 1 native query with vectorize = false
skipVectorize();
// This tells that both vectorize = force and vectorize = false take the same non-vectorized path
// Generates 2 native queries with the 2 different values of vectorize
cannotVectorize();
testQuery(
"SELECT d3 FROM druid.numfoo, UNNEST(MV_TO_ARRAY(dim3)) as unnested (d3) LIMIT 3",
ImmutableList.of(
Druids.newScanQueryBuilder()
.dataSource(UnnestDataSource.create(
new TableDataSource(CalciteTests.DATASOURCE3),
"dim3",
"EXPR$0",
null
))
.intervals(querySegmentSpec(Filtration.eternity()))
.resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST)
.legacy(false)
.context(QUERY_CONTEXT_DEFAULT)
.columns(ImmutableList.of(
"EXPR$0"
))
.limit(3)
.build()
),
ImmutableList.of(
new Object[]{"a"},
new Object[]{"b"},
new Object[]{"b"}
)
);
}
@Test
public void testUnnestFirstQueryOnSelect()
{
// This tells the test to skip generating the (vectorize = force) path
// Generates only 1 native query with vectorize = false
skipVectorize();
// This tells that both vectorize = force and vectorize = false take the same non-vectorized path
// Generates 2 native queries with the 2 different values of vectorize
cannotVectorize();
testQuery(
"SELECT d3 FROM (select dim1, dim2, dim3 from druid.numfoo), UNNEST(MV_TO_ARRAY(dim3)) as unnested (d3)",
ImmutableList.of(
Druids.newScanQueryBuilder()
.dataSource(UnnestDataSource.create(
new TableDataSource(CalciteTests.DATASOURCE3),
"dim3",
"EXPR$0",
null
))
.intervals(querySegmentSpec(Filtration.eternity()))
.resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST)
.legacy(false)
.context(QUERY_CONTEXT_DEFAULT)
.columns(ImmutableList.of(
"EXPR$0"
))
.build()
),
useDefault ?
ImmutableList.of(
new Object[]{"a"},
new Object[]{"b"},
new Object[]{"b"},
new Object[]{"c"},
new Object[]{"d"},
new Object[]{""},
new Object[]{""},
new Object[]{""}
) :
ImmutableList.of(
new Object[]{"a"},
new Object[]{"b"},
new Object[]{"b"},
new Object[]{"c"},
new Object[]{"d"},
new Object[]{""},
new Object[]{null},
new Object[]{null}
)
);
}
@Test
public void testUnnestWithFilters()
{
// This tells the test to skip generating the (vectorize = force) path
// Generates only 1 native query with vectorize = false
skipVectorize();
// This tells that both vectorize = force and vectorize = false take the same non-vectorized path
// Generates 2 native queries with the 2 different values of vectorize
cannotVectorize();
testQuery(
"SELECT d3 FROM (select * from druid.numfoo where dim2='a'), UNNEST(MV_TO_ARRAY(dim3)) as unnested (d3)",
ImmutableList.of(
Druids.newScanQueryBuilder()
.dataSource(UnnestDataSource.create(
new QueryDataSource(
newScanQueryBuilder()
.dataSource(
new TableDataSource(CalciteTests.DATASOURCE3)
)
.intervals(querySegmentSpec(Filtration.eternity()))
.virtualColumns(expressionVirtualColumn("v0", "'a'", ColumnType.STRING))
.resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST)
.legacy(false)
.filters(new SelectorDimFilter("dim2", "a", null))
.columns(
"__time",
"cnt",
"d1",
"d2",
"dim1",
"dim3",
"dim4",
"dim5",
"dim6",
"f1",
"f2",
"l1",
"l2",
"m1",
"m2",
"unique_dim1",
"v0"
)
.context(QUERY_CONTEXT_DEFAULT)
.build()
),
"dim3",
"EXPR$0",
null
))
.intervals(querySegmentSpec(Filtration.eternity()))
.resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST)
.legacy(false)
.context(QUERY_CONTEXT_DEFAULT)
.columns(ImmutableList.of(
"EXPR$0"
))
.build()
),
ImmutableList.of(
new Object[]{"a"},
new Object[]{"b"},
new Object[]{""}
)
);
}

@Test
public void testUnnestWithInFilters()
{
// This tells the test to skip generating the (vectorize = force) path.
// Generates only 1 native query, with vectorize = false.
skipVectorize();
// This tells the test that vectorize = force and vectorize = false both take the
// same non-vectorized path. Generates 2 native queries, with the 2 values of vectorize.
cannotVectorize();
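// Same shape as testUnnestWithFilters, except the IN clause becomes an
// InDimFilter on the inner scan, and dim2 stays in the column list because it
// is not pinned to a single constant.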
testQuery(
"SELECT d3 FROM (select * from druid.numfoo where dim2 IN ('a','b','ab','abc')), UNNEST(MV_TO_ARRAY(dim3)) as unnested (d3)",
ImmutableList.of(
Druids.newScanQueryBuilder()
.dataSource(UnnestDataSource.create(
new QueryDataSource(
newScanQueryBuilder()
.dataSource(
new TableDataSource(CalciteTests.DATASOURCE3)
)
.intervals(querySegmentSpec(Filtration.eternity()))
.resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST)
.legacy(false)
.filters(new InDimFilter("dim2", ImmutableList.of("a", "b", "ab", "abc"), null))
.columns(
"__time",
"cnt",
"d1",
"d2",
"dim1",
"dim2",
"dim3",
"dim4",
"dim5",
"dim6",
"f1",
"f2",
"l1",
"l2",
"m1",
"m2",
"unique_dim1"
)
.context(QUERY_CONTEXT_DEFAULT)
.build()
),
"dim3",
"EXPR$0",
null
))
.intervals(querySegmentSpec(Filtration.eternity()))
.resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST)
.legacy(false)
.context(QUERY_CONTEXT_DEFAULT)
.columns(ImmutableList.of(
"EXPR$0"
))
.build()
),
ImmutableList.of(
new Object[]{"a"},
new Object[]{"b"},
new Object[]{""},
useDefault ?
new Object[]{""} : new Object[]{null}
)
);
}

@Test
public void testUnnestVirtualWithColumns()
{
// This tells the test to skip generating the (vectorize = force) path.
// Generates only 1 native query, with vectorize = false.
skipVectorize();
// This tells the test that vectorize = force and vectorize = false both take the
// same non-vectorized path. Generates 2 native queries, with the 2 values of vectorize.
cannotVectorize();
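// ARRAY[dim4, dim5] is not a stored column, so the plan below unnests the
// virtual column v0 = array("dim4","dim5") instead of a table column.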
testQuery(
"SELECT strings FROM druid.numfoo, UNNEST(ARRAY[dim4, dim5]) as unnested (strings)",
ImmutableList.of(
Druids.newScanQueryBuilder()
.dataSource(UnnestDataSource.create(
new TableDataSource(CalciteTests.DATASOURCE3),
"v0",
"EXPR$0",
null
))
.intervals(querySegmentSpec(Filtration.eternity()))
.virtualColumns(expressionVirtualColumn("v0", "array(\"dim4\",\"dim5\")", ColumnType.STRING_ARRAY))
.resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST)
.legacy(false)
.context(QUERY_CONTEXT_DEFAULT)
.columns(ImmutableList.of(
"EXPR$0"
))
.build()
),
ImmutableList.of(
new Object[]{"a"},
new Object[]{"aa"},
new Object[]{"a"},
new Object[]{"ab"},
new Object[]{"a"},
new Object[]{"ba"},
new Object[]{"b"},
new Object[]{"ad"},
new Object[]{"b"},
new Object[]{"aa"},
new Object[]{"b"},
new Object[]{"ab"}
)
);
}

@Test
public void testUnnestWithGroupByOrderByOnVirtualColumn()
{
skipVectorize();
cannotVectorize();
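// GROUP BY with ORDER BY ... DESC over the unnested virtual column plans as a
// groupBy query on the UnnestDataSource, with the ordering expressed through a
// DefaultLimitSpec on the grouping dimension.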
testQuery(
"SELECT d24, COUNT(*) FROM druid.numfoo, UNNEST(ARRAY[dim2, dim4]) AS unnested(d24) GROUP BY d24 ORDER BY d24 DESC ",
ImmutableList.of(
GroupByQuery.builder()
.setDataSource(UnnestDataSource.create(
new TableDataSource(CalciteTests.DATASOURCE3),
"v0",
"EXPR$0",
null
))
.setVirtualColumns(expressionVirtualColumn(
"v0",
"array(\"dim2\",\"dim4\")",
ColumnType.STRING_ARRAY
))
.setInterval(querySegmentSpec(Filtration.eternity()))
.setContext(QUERY_CONTEXT_DEFAULT)
.setDimensions(new DefaultDimensionSpec("EXPR$0", "_d0", ColumnType.STRING))
.setGranularity(Granularities.ALL)
.setLimitSpec(
DefaultLimitSpec
.builder()
.orderBy(new OrderByColumnSpec(
"_d0",
OrderByColumnSpec.Direction.DESCENDING,
StringComparators.LEXICOGRAPHIC
))
.build()
)
.setAggregatorSpecs(new CountAggregatorFactory("a0"))
.setContext(QUERY_CONTEXT_DEFAULT)
.build()
),
useDefault ?
ImmutableList.of(
new Object[]{"b", 3L},
new Object[]{"abc", 1L},
new Object[]{"a", 5L},
new Object[]{"", 3L}
) :
ImmutableList.of(
new Object[]{"b", 3L},
new Object[]{"abc", 1L},
new Object[]{"a", 5L},
new Object[]{"", 1L},
new Object[]{null, 2L}
)
);
}

@Test
public void testUnnestWithJoinOnTheLeft()
{
skipVectorize();
cannotVectorize();
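// The join in the FROM clause is planned first; the resulting join datasource
// becomes the base of the UnnestDataSource, so the unnest runs over joined rows.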
testQuery(
"SELECT d3 from (SELECT * from druid.numfoo JOIN (select dim2 as t from druid.numfoo where dim2 IN ('a','b','ab','abc')) ON dim2=t), UNNEST(MV_TO_ARRAY(dim3)) as unnested (d3)",
ImmutableList.of(
Druids.newScanQueryBuilder()
.dataSource(UnnestDataSource.create(
join(
new TableDataSource(CalciteTests.DATASOURCE3),
new QueryDataSource(
newScanQueryBuilder()
.dataSource(
new TableDataSource(CalciteTests.DATASOURCE3)
)
.intervals(querySegmentSpec(Filtration.eternity()))
.resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST)
.legacy(false)
.filters(new InDimFilter("dim2", ImmutableList.of("a", "b", "ab", "abc"), null))
.columns(
"dim2"
)
.context(QUERY_CONTEXT_DEFAULT)
.build()
),
"j0.",
"(\"dim2\" == \"j0.dim2\")",
JoinType.INNER
),
"dim3",
"EXPR$0",
null
))
.intervals(querySegmentSpec(Filtration.eternity()))
.resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST)
.legacy(false)
.context(QUERY_CONTEXT_DEFAULT)
.columns(ImmutableList.of(
"EXPR$0"
))
.build()
),
useDefault ?
ImmutableList.of(
new Object[]{"a"},
new Object[]{"b"},
new Object[]{"a"},
new Object[]{"b"},
new Object[]{""},
new Object[]{""},
new Object[]{""}
) :
ImmutableList.of(
new Object[]{"a"},
new Object[]{"b"},
new Object[]{"a"},
new Object[]{"b"},
new Object[]{""},
new Object[]{""},
new Object[]{null}
)
);
}

@Test
public void testUnnestWithConstant()
{
// Since the right-hand side is a constant array, there is nothing to correlate
// between left and right, so Druid plans this as a join query rather than a
// correlated unnest.
skipVectorize();
cannotVectorize();
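// The constant array becomes an InlineDataSource with a single LONG_ARRAY row.
// It is unnested on the right-hand side and cross-joined (condition "1") with the
// table, yielding the 3 constants once per left-hand row (6 rows x 3 = 18 results).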
testQuery(
"SELECT longs FROM druid.numfoo, UNNEST(ARRAY[1,2,3]) as unnested (longs)",
ImmutableList.of(
Druids.newScanQueryBuilder()
.dataSource(
join(
new TableDataSource(CalciteTests.DATASOURCE3),
new QueryDataSource(
newScanQueryBuilder()
.dataSource(
UnnestDataSource.create(
InlineDataSource.fromIterable(
ImmutableList.of(new Object[]{new Object[]{1L, 2L, 3L}}),
RowSignature.builder().add("inline", ColumnType.LONG_ARRAY).build()
),
"inline",
"EXPR$0",
null
)
)
.intervals(querySegmentSpec(Filtration.eternity()))
.resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST)
.legacy(false)
.context(QUERY_CONTEXT_DEFAULT)
.columns(ImmutableList.of(
"EXPR$0"
))
.build()
),
"j0.",
"1",
JoinType.INNER
)
)
.intervals(querySegmentSpec(Filtration.eternity()))
.resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST)
.legacy(false)
.context(QUERY_CONTEXT_DEFAULT)
.columns(ImmutableList.of(
"j0.EXPR$0"
))
.build()
),
ImmutableList.of(
new Object[]{1},
new Object[]{2},
new Object[]{3},
new Object[]{1},
new Object[]{2},
new Object[]{3},
new Object[]{1},
new Object[]{2},
new Object[]{3},
new Object[]{1},
new Object[]{2},
new Object[]{3},
new Object[]{1},
new Object[]{2},
new Object[]{3},
new Object[]{1},
new Object[]{2},
new Object[]{3}
)
);
}
}