From 90d445536da54fdd48631feea73eff63fb6d476f Mon Sep 17 00:00:00 2001 From: somu-imply <93540295+somu-imply@users.noreply.github.com> Date: Mon, 23 Jan 2023 12:53:31 -0800 Subject: [PATCH] SQL version of unnest native druid function (#13576) * adds the SQL component of the native unnest functionality in Druid to unnest SQL queries on a table dimension, virtual column or a constant array and convert them into native Druid queries * unnest in SQL is implemented as a combination of Correlate (the comma join part) and Uncollect (the unnest part) --- .../apache/druid/query/UnnestDataSource.java | 12 + .../UnnestColumnValueSelectorCursor.java | 26 +- .../druid/segment/UnnestDimensionCursor.java | 12 +- .../UnnestColumnValueSelectorCursorTest.java | 64 +- .../sql/calcite/expression/Expressions.java | 42 ++ .../calcite/rel/DruidCorrelateUnnestRel.java | 330 ++++++++ .../sql/calcite/rel/DruidJoinQueryRel.java | 4 +- .../druid/sql/calcite/rel/DruidQuery.java | 42 ++ .../calcite/rel/DruidUnnestDatasourceRel.java | 180 +++++ .../sql/calcite/rel/PartialDruidQuery.java | 58 +- .../rule/DruidCorrelateUnnestRule.java | 177 +++++ .../sql/calcite/rule/DruidRelToDruidRule.java | 3 +- .../druid/sql/calcite/rule/DruidRules.java | 4 +- .../rule/DruidUnnestDatasourceRule.java | 107 +++ .../sql/calcite/CalciteArraysQueryTest.java | 703 ++++++++++++++++++ 15 files changed, 1720 insertions(+), 44 deletions(-) create mode 100644 sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidCorrelateUnnestRel.java create mode 100644 sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidUnnestDatasourceRel.java create mode 100644 sql/src/main/java/org/apache/druid/sql/calcite/rule/DruidCorrelateUnnestRule.java create mode 100644 sql/src/main/java/org/apache/druid/sql/calcite/rule/DruidUnnestDatasourceRule.java diff --git a/processing/src/main/java/org/apache/druid/query/UnnestDataSource.java b/processing/src/main/java/org/apache/druid/query/UnnestDataSource.java index f548944948b..407aea5c39b 100644 --- a/processing/src/main/java/org/apache/druid/query/UnnestDataSource.java +++ b/processing/src/main/java/org/apache/druid/query/UnnestDataSource.java @@ -215,6 +215,18 @@ public class UnnestDataSource implements DataSource { return Objects.hash(base, column, outputName); } + + @Override + public String toString() + { + return "UnnestDataSource{" + + "base=" + base + + ", column='" + column + '\'' + + ", outputName='" + outputName + '\'' + + ", allowList=" + allowList + + '}'; + } + } diff --git a/processing/src/main/java/org/apache/druid/segment/UnnestColumnValueSelectorCursor.java b/processing/src/main/java/org/apache/druid/segment/UnnestColumnValueSelectorCursor.java index db4acb893ea..5d434032989 100644 --- a/processing/src/main/java/org/apache/druid/segment/UnnestColumnValueSelectorCursor.java +++ b/processing/src/main/java/org/apache/druid/segment/UnnestColumnValueSelectorCursor.java @@ -100,7 +100,31 @@ public class UnnestColumnValueSelectorCursor implements Cursor if (!outputName.equals(dimensionSpec.getDimension())) { return baseColumnSelectorFactory.makeDimensionSelector(dimensionSpec); } - throw new UOE("Unsupported dimension selector while using column value selector for column [%s]", outputName); + // this is done to support virtual columns + // In future a developer should move towards making sure that + // for all dictionary encoded cases we only get the dimension selector + return new BaseSingleValueDimensionSelector() + { + final ColumnValueSelector colSelector = makeColumnValueSelector(dimensionSpec.getDimension()); + + @Nullable + @Override + protected String getValue() + { + final Object returnedObj = colSelector.getObject(); + if (returnedObj == null) { + return null; + } else { + return String.valueOf(returnedObj); + } + } + + @Override + public void inspectRuntimeShape(RuntimeShapeInspector inspector) + { + colSelector.inspectRuntimeShape(inspector); + } + }; } @Override diff --git a/processing/src/main/java/org/apache/druid/segment/UnnestDimensionCursor.java b/processing/src/main/java/org/apache/druid/segment/UnnestDimensionCursor.java index 46a2c626caf..93a56767bbf 100644 --- a/processing/src/main/java/org/apache/druid/segment/UnnestDimensionCursor.java +++ b/processing/src/main/java/org/apache/druid/segment/UnnestDimensionCursor.java @@ -83,7 +83,7 @@ public class UnnestDimensionCursor implements Cursor private final BitSet allowedBitSet; private final ColumnSelectorFactory baseColumnSelectorFactory; private int index; - private IndexedInts indexedIntsForCurrentRow; + @Nullable private IndexedInts indexedIntsForCurrentRow; private boolean needInitialization; private SingleIndexInts indexIntsForRow; @@ -181,7 +181,7 @@ public class UnnestDimensionCursor implements Cursor @Override public Object getObject() { - if (indexedIntsForCurrentRow == null) { + if (indexedIntsForCurrentRow == null || indexedIntsForCurrentRow.size() == 0) { return null; } if (allowedBitSet.isEmpty()) { @@ -319,6 +319,7 @@ public class UnnestDimensionCursor implements Cursor * This would also create a bitset for dictonary encoded columns to * check for matching values specified in allowedList of UnnestDataSource. */ + @Nullable private void initialize() { IdLookup idLookup = dimSelector.idLookup(); @@ -409,7 +410,12 @@ public class UnnestDimensionCursor implements Cursor @Override public int get(int idx) { - return indexedIntsForCurrentRow.get(index); + // need to get value from the indexed ints + // only if it is non null and has at least 1 value + if (indexedIntsForCurrentRow != null && indexedIntsForCurrentRow.size() > 0) { + return indexedIntsForCurrentRow.get(index); + } + return 0; } } } diff --git a/processing/src/test/java/org/apache/druid/segment/UnnestColumnValueSelectorCursorTest.java b/processing/src/test/java/org/apache/druid/segment/UnnestColumnValueSelectorCursorTest.java index b3346e1e562..cf4a98c8803 100644 --- a/processing/src/test/java/org/apache/druid/segment/UnnestColumnValueSelectorCursorTest.java +++ b/processing/src/test/java/org/apache/druid/segment/UnnestColumnValueSelectorCursorTest.java @@ -19,7 +19,6 @@ package org.apache.druid.segment; -import org.apache.druid.java.util.common.UOE; import org.apache.druid.query.dimension.DefaultDimensionSpec; import org.apache.druid.testing.InitializedNullHandlingTest; import org.junit.Assert; @@ -62,7 +61,7 @@ public class UnnestColumnValueSelectorCursorTest extends InitializedNullHandling int j = 0; while (!unnestCursor.isDone()) { Object colSelectorVal = unnestColumnValueSelector.getObject(); - Assert.assertEquals(colSelectorVal.toString(), String.valueOf(j)); + Assert.assertEquals(String.valueOf(j), colSelectorVal.toString()); j++; unnestCursor.advance(); } @@ -96,7 +95,7 @@ public class UnnestColumnValueSelectorCursorTest extends InitializedNullHandling int k = 0; while (!unnestCursor.isDone()) { Object valueSelectorVal = unnestColumnValueSelector.getObject(); - Assert.assertEquals(valueSelectorVal.toString(), expectedResults.get(k)); + Assert.assertEquals(expectedResults.get(k), valueSelectorVal.toString()); k++; unnestCursor.advance(); } @@ -112,8 +111,6 @@ public class UnnestColumnValueSelectorCursorTest extends InitializedNullHandling Collections.singletonList(null) ); - List expectedResults = Arrays.asList(null, null, null, null); - //Create base cursor ListCursor listCursor = new ListCursor(inputList); @@ -168,9 +165,9 @@ public class UnnestColumnValueSelectorCursorTest extends InitializedNullHandling while (!unnestCursor.isDone()) { Object valueSelectorVal = unnestColumnValueSelector.getObject(); if (valueSelectorVal == null) { - Assert.assertEquals(null, expectedResults.get(k)); + Assert.assertEquals(expectedResults.get(k), null); } else { - Assert.assertEquals(valueSelectorVal.toString(), expectedResults.get(k)); + Assert.assertEquals(expectedResults.get(k), valueSelectorVal.toString()); } k++; unnestCursor.advance(); @@ -201,7 +198,7 @@ public class UnnestColumnValueSelectorCursorTest extends InitializedNullHandling int k = 0; while (!unnestCursor.isDone()) { Object valueSelectorVal = unnestColumnValueSelector.getObject(); - Assert.assertEquals(valueSelectorVal.toString(), expectedResults.get(k)); + Assert.assertEquals(expectedResults.get(k), valueSelectorVal.toString()); k++; unnestCursor.advance(); } @@ -231,7 +228,7 @@ public class UnnestColumnValueSelectorCursorTest extends InitializedNullHandling int k = 0; while (!unnestCursor.isDone()) { Object valueSelectorVal = unnestColumnValueSelector.getObject(); - Assert.assertEquals(valueSelectorVal.toString(), expectedResults.get(k)); + Assert.assertEquals(expectedResults.get(k), valueSelectorVal.toString()); k++; unnestCursor.advance(); } @@ -265,7 +262,7 @@ public class UnnestColumnValueSelectorCursorTest extends InitializedNullHandling int k = 0; while (!unnestCursor.isDone()) { Object valueSelectorVal = unnestColumnValueSelector.getObject(); - Assert.assertEquals(valueSelectorVal.toString(), expectedResults.get(k).toString()); + Assert.assertEquals(expectedResults.get(k).toString(), valueSelectorVal.toString()); k++; unnestCursor.advance(); } @@ -306,7 +303,7 @@ public class UnnestColumnValueSelectorCursorTest extends InitializedNullHandling int k = 0; while (!parentCursor.isDone()) { Object valueSelectorVal = unnestColumnValueSelector.getObject(); - Assert.assertEquals(valueSelectorVal.toString(), expectedResults.get(k).toString()); + Assert.assertEquals(expectedResults.get(k).toString(), valueSelectorVal.toString()); k++; parentCursor.advance(); } @@ -344,12 +341,12 @@ public class UnnestColumnValueSelectorCursorTest extends InitializedNullHandling if (valueSelectorVal == null) { Assert.assertEquals(null, expectedResults.get(k)); } else { - Assert.assertEquals(valueSelectorVal.toString(), expectedResults.get(k)); + Assert.assertEquals(expectedResults.get(k), valueSelectorVal.toString()); } k++; unnestCursor.advance(); } - Assert.assertEquals(k, expectedResults.size()); + Assert.assertEquals(expectedResults.size(), k); } @Test @@ -382,7 +379,7 @@ public class UnnestColumnValueSelectorCursorTest extends InitializedNullHandling if (valueSelectorVal == null) { Assert.assertEquals(null, expectedResults.get(k)); } else { - Assert.assertEquals(valueSelectorVal.toString(), expectedResults.get(k)); + Assert.assertEquals(expectedResults.get(k), valueSelectorVal.toString()); } k++; unnestCursor.advance(); @@ -420,7 +417,7 @@ public class UnnestColumnValueSelectorCursorTest extends InitializedNullHandling if (valueSelectorVal == null) { Assert.assertEquals(null, expectedResults.get(k)); } else { - Assert.assertEquals(valueSelectorVal.toString(), expectedResults.get(k)); + Assert.assertEquals(expectedResults.get(k), valueSelectorVal.toString()); } k++; unnestCursor.advance(); @@ -455,7 +452,7 @@ public class UnnestColumnValueSelectorCursorTest extends InitializedNullHandling int k = 0; while (!unnestCursor.isDone()) { Double valueSelectorVal = unnestColumnValueSelector.getDouble(); - Assert.assertEquals(valueSelectorVal, expectedResults.get(k)); + Assert.assertEquals(expectedResults.get(k), valueSelectorVal); k++; unnestCursor.advance(); } @@ -489,7 +486,7 @@ public class UnnestColumnValueSelectorCursorTest extends InitializedNullHandling int k = 0; while (!unnestCursor.isDone()) { Float valueSelectorVal = unnestColumnValueSelector.getFloat(); - Assert.assertEquals(valueSelectorVal, expectedResults.get(k)); + Assert.assertEquals(expectedResults.get(k), valueSelectorVal); k++; unnestCursor.advance(); } @@ -526,7 +523,7 @@ public class UnnestColumnValueSelectorCursorTest extends InitializedNullHandling Object obj = unnestColumnValueSelector.getObject(); Assert.assertNotNull(obj); Long valueSelectorVal = unnestColumnValueSelector.getLong(); - Assert.assertEquals(valueSelectorVal, expectedResults.get(k)); + Assert.assertEquals(expectedResults.get(k), valueSelectorVal); k++; unnestCursor.advance(); } @@ -561,7 +558,7 @@ public class UnnestColumnValueSelectorCursorTest extends InitializedNullHandling int k = 0; while (!unnestCursor.isDone()) { Object valueSelectorVal = unnestColumnValueSelector.getObject(); - Assert.assertEquals(valueSelectorVal.toString(), expectedResults.get(k).toString()); + Assert.assertEquals(expectedResults.get(k).toString(), valueSelectorVal.toString()); k++; unnestCursor.advance(); } @@ -570,17 +567,16 @@ public class UnnestColumnValueSelectorCursorTest extends InitializedNullHandling Assert.assertFalse(unnestCursor.isDoneOrInterrupted()); } - @Test(expected = UOE.class) + @Test public void test_list_unnest_cursors_dimSelector() { List inputList = Arrays.asList( Arrays.asList("a", "b", "c"), Arrays.asList("e", "f", "g", "h", "i"), - Collections.singletonList("j") + Collections.singletonList(null) ); - List expectedResults = Arrays.asList("a", "b", "c", "e", "f", "g", "h", "i", "j"); - + List expectedResults = Arrays.asList("a", "b", "c", "e", "f", "g", "h", "i"); //Create base cursor ListCursor listCursor = new ListCursor(inputList); @@ -592,7 +588,25 @@ public class UnnestColumnValueSelectorCursorTest extends InitializedNullHandling OUTPUT_NAME, IGNORE_SET ); - unnestCursor.getColumnSelectorFactory().makeDimensionSelector(DefaultDimensionSpec.of(OUTPUT_NAME)); + // should return a column value selector for this case + BaseSingleValueDimensionSelector unnestDimSelector = (BaseSingleValueDimensionSelector) unnestCursor.getColumnSelectorFactory() + .makeDimensionSelector( + DefaultDimensionSpec.of( + OUTPUT_NAME)); + unnestDimSelector.inspectRuntimeShape(null); + int k = 0; + while (!unnestCursor.isDone()) { + if (k < 8) { + Assert.assertEquals(expectedResults.get(k).toString(), unnestDimSelector.getValue()); + } else { + Assert.assertNull(unnestDimSelector.getValue()); + } + k++; + unnestCursor.advance(); + } + Assert.assertEquals(k, 9); + unnestCursor.reset(); + Assert.assertNotNull(unnestDimSelector); } @Test @@ -622,7 +636,7 @@ public class UnnestColumnValueSelectorCursorTest extends InitializedNullHandling int k = 0; while (!unnestCursor.isDone()) { Object valueSelectorVal = unnestColumnValueSelector.getObject(); - Assert.assertEquals(valueSelectorVal.toString(), expectedResults.get(k).toString()); + Assert.assertEquals(expectedResults.get(k).toString(), valueSelectorVal.toString()); k++; unnestCursor.advance(); } diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/expression/Expressions.java b/sql/src/main/java/org/apache/druid/sql/calcite/expression/Expressions.java index b1805519ade..ff92f83c377 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/expression/Expressions.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/expression/Expressions.java @@ -24,6 +24,7 @@ import com.google.common.collect.Iterables; import org.apache.calcite.jdbc.JavaTypeFactoryImpl; import org.apache.calcite.rel.core.Project; import org.apache.calcite.rex.RexCall; +import org.apache.calcite.rex.RexFieldAccess; import org.apache.calcite.rex.RexInputRef; import org.apache.calcite.rex.RexLiteral; import org.apache.calcite.rex.RexNode; @@ -34,6 +35,7 @@ import org.apache.calcite.sql.type.SqlTypeName; import org.apache.druid.common.config.NullHandling; import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.ISE; +import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.granularity.Granularity; import org.apache.druid.math.expr.Expr; import org.apache.druid.math.expr.ExprMacroTable; @@ -58,6 +60,7 @@ import org.apache.druid.sql.calcite.filtration.Bounds; import org.apache.druid.sql.calcite.filtration.Filtration; import org.apache.druid.sql.calcite.planner.Calcites; import org.apache.druid.sql.calcite.planner.PlannerContext; +import org.apache.druid.sql.calcite.rel.CannotBuildQueryException; import org.apache.druid.sql.calcite.rel.VirtualColumnRegistry; import org.apache.druid.sql.calcite.table.RowSignatures; import org.joda.time.Interval; @@ -214,12 +217,51 @@ public class Expressions return rexCallToDruidExpression(plannerContext, rowSignature, rexNode, postAggregatorVisitor); } else if (kind == SqlKind.LITERAL) { return literalToDruidExpression(plannerContext, rexNode); + } else if (kind == SqlKind.FIELD_ACCESS) { + return fieldAccessToDruidExpression(rowSignature, rexNode); } else { // Can't translate. return null; } } + private static DruidExpression fieldAccessToDruidExpression( + final RowSignature rowSignature, + final RexNode rexNode + ) + { + // Translate field references. + final RexFieldAccess ref = (RexFieldAccess) rexNode; + if (ref.getField().getIndex() > rowSignature.size()) { + // This case arises in the case of a correlation where the rexNode points to a table from the left subtree + // while the underlying datasource is the scan stub created from LogicalValuesRule + // In such a case we throw a CannotBuildQueryException so that Calcite does not go ahead with this path + // This exception is caught while returning false from isValidDruidQuery() method + throw new CannotBuildQueryException(StringUtils.format( + "Cannot build query as column name [%s] does not exist in row [%s]", ref.getField().getName(), rowSignature) + ); + } + + final String columnName = ref.getField().getName(); + final int index = rowSignature.indexOf(columnName); + + // This case arises when the rexNode has a name which is not in the underlying stub created using DruidUnnestDataSourceRule + // The column name has name ZERO with rowtype as LONG + // causes the index to be -1. In such a case we cannot build the query + // and throw an exception while returning false from isValidDruidQuery() method + if (index < 0) { + throw new CannotBuildQueryException(StringUtils.format( + "Expression referred to nonexistent index[%d] in row[%s]", + index, + rowSignature + )); + } + + final Optional columnType = rowSignature.getColumnType(index); + + return DruidExpression.ofColumn(columnType.get(), columnName); + } + private static DruidExpression inputRefToDruidExpression( final RowSignature rowSignature, final RexNode rexNode diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidCorrelateUnnestRel.java b/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidCorrelateUnnestRel.java new file mode 100644 index 00000000000..1751201acc1 --- /dev/null +++ b/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidCorrelateUnnestRel.java @@ -0,0 +1,330 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.sql.calcite.rel; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableList; +import org.apache.calcite.plan.RelOptCluster; +import org.apache.calcite.plan.RelOptCost; +import org.apache.calcite.plan.RelOptPlanner; +import org.apache.calcite.plan.RelOptRule; +import org.apache.calcite.plan.RelTraitSet; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.RelWriter; +import org.apache.calcite.rel.core.Correlate; +import org.apache.calcite.rel.core.Filter; +import org.apache.calcite.rel.core.JoinRelType; +import org.apache.calcite.rel.logical.LogicalProject; +import org.apache.calcite.rel.metadata.RelMetadataQuery; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.query.DataSource; +import org.apache.druid.query.QueryDataSource; +import org.apache.druid.query.TableDataSource; +import org.apache.druid.query.UnnestDataSource; +import org.apache.druid.segment.column.RowSignature; +import org.apache.druid.sql.calcite.expression.DruidExpression; +import org.apache.druid.sql.calcite.expression.Expressions; +import org.apache.druid.sql.calcite.planner.PlannerConfig; +import org.apache.druid.sql.calcite.planner.PlannerContext; +import org.apache.druid.sql.calcite.table.RowSignatures; + +import javax.annotation.Nullable; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.stream.Collectors; + +/** + * This is the DruidRel to handle correlated rel nodes to be used for unnest. + * Each correlate can be perceived as a join with the join type being inner + * the left of a correlate as seen in the rule {@link org.apache.druid.sql.calcite.rule.DruidCorrelateUnnestRule} + * is the {@link DruidQueryRel} while the right will always be an {@link DruidUnnestDatasourceRel}. + * + * Since this is a subclass of DruidRel it is automatically considered by other rules that involves DruidRels. + * Some example being SELECT_PROJECT and SORT_PROJECT rules in {@link org.apache.druid.sql.calcite.rule.DruidRules.DruidQueryRule} + */ +public class DruidCorrelateUnnestRel extends DruidRel +{ + private static final TableDataSource DUMMY_DATA_SOURCE = new TableDataSource("unnest"); + + private final Filter leftFilter; + private final PartialDruidQuery partialQuery; + private final PlannerConfig plannerConfig; + private final Correlate correlateRel; + private RelNode left; + private RelNode right; + + private DruidCorrelateUnnestRel( + RelOptCluster cluster, + RelTraitSet traitSet, + Correlate correlateRel, + PartialDruidQuery partialQuery, + Filter baseFilter, + PlannerContext plannerContext + ) + { + super(cluster, traitSet, plannerContext); + this.correlateRel = correlateRel; + this.partialQuery = partialQuery; + this.left = correlateRel.getLeft(); + this.right = correlateRel.getRight(); + this.leftFilter = baseFilter; + this.plannerConfig = plannerContext.getPlannerConfig(); + } + + /** + * Create an instance from a Correlate that is based on a {@link DruidRel} and a {@link DruidUnnestDatasourceRel} inputs. + */ + public static DruidCorrelateUnnestRel create( + final Correlate correlateRel, + final Filter leftFilter, + final PlannerContext plannerContext + ) + { + return new DruidCorrelateUnnestRel( + correlateRel.getCluster(), + correlateRel.getTraitSet(), + correlateRel, + PartialDruidQuery.create(correlateRel), + leftFilter, + plannerContext + ); + } + + @Nullable + @Override + public PartialDruidQuery getPartialDruidQuery() + { + return partialQuery; + } + + @Override + public DruidCorrelateUnnestRel withPartialQuery(PartialDruidQuery newQueryBuilder) + { + return new DruidCorrelateUnnestRel( + getCluster(), + getTraitSet().plusAll(newQueryBuilder.getRelTraits()), + correlateRel, + newQueryBuilder, + leftFilter, + getPlannerContext() + ); + } + + @Override + public DruidQuery toDruidQuery(boolean finalizeAggregations) + { + final DruidRel druidQueryRel = (DruidRel) left; + final DruidQuery leftQuery = Preconditions.checkNotNull((druidQueryRel).toDruidQuery(false), "leftQuery"); + final DataSource leftDataSource; + + if (DruidJoinQueryRel.computeLeftRequiresSubquery(druidQueryRel)) { + leftDataSource = new QueryDataSource(leftQuery.getQuery()); + } else { + leftDataSource = leftQuery.getDataSource(); + } + + final DruidUnnestDatasourceRel unnestDatasourceRel = (DruidUnnestDatasourceRel) right; + + + final RowSignature rowSignature = RowSignatures.fromRelDataType( + correlateRel.getRowType().getFieldNames(), + correlateRel.getRowType() + ); + + final DruidExpression expression = Expressions.toDruidExpression( + getPlannerContext(), + rowSignature, + unnestDatasourceRel.getUnnestProject().getProjects().get(0) + ); + + LogicalProject unnestProject = LogicalProject.create( + this, + ImmutableList.of(unnestDatasourceRel.getUnnestProject() + .getProjects() + .get(0)), + unnestDatasourceRel.getUnnestProject().getRowType() + ); + + // placeholder for dimension or expression to be unnested + final String dimOrExpToUnnest; + final VirtualColumnRegistry virtualColumnRegistry = VirtualColumnRegistry.create( + rowSignature, + getPlannerContext().getExprMacroTable(), + getPlannerContext().getPlannerConfig().isForceExpressionVirtualColumns() + ); + + // the unnest project is needed in case of a virtual column + // unnest(mv_to_array(dim_1)) is reconciled as unnesting a MVD dim_1 not requiring a virtual column + // while unnest(array(dim_2,dim_3)) is understood as unnesting a virtual column which is an array over dim_2 and dim_3 elements + boolean unnestProjectNeeded = false; + getPlannerContext().setJoinExpressionVirtualColumnRegistry(virtualColumnRegistry); + + // handling for case when mv_to_array is used + // No need to use virtual column in such a case + if (StringUtils.toLowerCase(expression.getExpression()).startsWith("mv_to_array")) { + dimOrExpToUnnest = expression.getArguments().get(0).getSimpleExtraction().getColumn(); + } else { + if (expression.isDirectColumnAccess()) { + dimOrExpToUnnest = expression.getDirectColumn(); + } else { + // buckle up time to create virtual columns on expressions + unnestProjectNeeded = true; + dimOrExpToUnnest = virtualColumnRegistry.getOrCreateVirtualColumnForExpression( + expression, + expression.getDruidType() + ); + } + } + + // add the unnest project to the partial query if required + // This is necessary to handle the virtual columns on the unnestProject + // Also create the unnest datasource to be used by the partial query + PartialDruidQuery partialDruidQuery = unnestProjectNeeded ? partialQuery.withUnnest(unnestProject) : partialQuery; + return partialDruidQuery.build( + UnnestDataSource.create( + leftDataSource, + dimOrExpToUnnest, + unnestDatasourceRel.getUnnestProject().getRowType().getFieldNames().get(0), + null + ), + rowSignature, + getPlannerContext(), + getCluster().getRexBuilder(), + finalizeAggregations, + virtualColumnRegistry + ); + } + + @Override + protected DruidCorrelateUnnestRel clone() + { + return DruidCorrelateUnnestRel.create(correlateRel, leftFilter, getPlannerContext()); + } + + @Override + protected RelDataType deriveRowType() + { + return partialQuery.getRowType(); + } + + @Override + public DruidQuery toDruidQueryForExplaining() + { + return partialQuery.build( + DUMMY_DATA_SOURCE, + RowSignatures.fromRelDataType( + correlateRel.getRowType().getFieldNames(), + correlateRel.getRowType() + ), + getPlannerContext(), + getCluster().getRexBuilder(), + false + ); + } + + // This is required to be overwritten as Calcite uses this method + // to maintain a map of equivalent DruidCorrelateUnnestRel or in general any Rel nodes. + // Without this method overwritten multiple RelNodes will produce the same key + // which makes the planner plan incorrectly. + @Override + public RelWriter explainTerms(RelWriter pw) + { + final String queryString; + final DruidQuery druidQuery = toDruidQueryForExplaining(); + + try { + queryString = getPlannerContext().getJsonMapper().writeValueAsString(druidQuery.getQuery()); + } + catch (JsonProcessingException e) { + throw new RuntimeException(e); + } + + return pw.item("query", queryString) + .item("signature", druidQuery.getOutputRowSignature()); + } + + // This is called from the DruidRelToDruidRule which converts from the NONE convention to the DRUID convention + @Override + public DruidCorrelateUnnestRel asDruidConvention() + { + return new DruidCorrelateUnnestRel( + getCluster(), + getTraitSet().replace(DruidConvention.instance()), + correlateRel.copy( + correlateRel.getTraitSet(), + correlateRel.getInputs() + .stream() + .map(input -> RelOptRule.convert(input, DruidConvention.instance())) + .collect(Collectors.toList()) + ), + partialQuery, + leftFilter, + getPlannerContext() + ); + } + + @Override + public List getInputs() + { + return ImmutableList.of(left, right); + } + + @Override + public RelNode copy(final RelTraitSet traitSet, final List inputs) + { + return new DruidCorrelateUnnestRel( + getCluster(), + traitSet, + correlateRel.copy(correlateRel.getTraitSet(), inputs), + getPartialDruidQuery(), + leftFilter, + getPlannerContext() + ); + } + + @Override + public RelOptCost computeSelfCost(final RelOptPlanner planner, final RelMetadataQuery mq) + { + double cost; + + if (DruidJoinQueryRel.computeLeftRequiresSubquery(DruidJoinQueryRel.getSomeDruidChild(left))) { + cost = CostEstimates.COST_SUBQUERY; + } else { + cost = partialQuery.estimateCost(); + if (correlateRel.getJoinType() == JoinRelType.INNER && plannerConfig.isComputeInnerJoinCostAsFilter()) { + cost *= CostEstimates.MULTIPLIER_FILTER; + } + } + + return planner.getCostFactory().makeCost(cost, 0, 0); + } + + @Override + public Set getDataSourceNames() + { + final Set retVal = new HashSet<>(); + retVal.addAll(((DruidRel) left).getDataSourceNames()); + retVal.addAll(((DruidRel) right).getDataSourceNames()); + return retVal; + } +} diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidJoinQueryRel.java b/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidJoinQueryRel.java index 9ff0a3cf1ba..dd97bbf3a13 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidJoinQueryRel.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidJoinQueryRel.java @@ -353,7 +353,7 @@ public class DruidJoinQueryRel extends DruidRel } } - private static boolean computeLeftRequiresSubquery(final DruidRel left) + public static boolean computeLeftRequiresSubquery(final DruidRel left) { // Left requires a subquery unless it's a scan or mapping on top of any table or a join. return !DruidRels.isScanOrMapping(left, true); @@ -372,7 +372,7 @@ public class DruidJoinQueryRel extends DruidRel * Returns a Pair of "rightPrefix" (for JoinDataSource) and the signature of rows that will result from * applying that prefix. */ - private static Pair computeJoinRowSignature( + static Pair computeJoinRowSignature( final RowSignature leftSignature, final RowSignature rightSignature ) diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidQuery.java b/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidQuery.java index 4c7340a14d0..5019f6fbaf7 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidQuery.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidQuery.java @@ -139,6 +139,9 @@ public class DruidQuery @Nullable private final Projection selectProjection; + @Nullable + private final Projection unnestProjection; + @Nullable private final Grouping grouping; @@ -159,6 +162,7 @@ public class DruidQuery final PlannerContext plannerContext, @Nullable final DimFilter filter, @Nullable final Projection selectProjection, + @Nullable final Projection unnestProjection, @Nullable final Grouping grouping, @Nullable final Sorting sorting, @Nullable final Windowing windowing, @@ -171,6 +175,7 @@ public class DruidQuery this.plannerContext = Preconditions.checkNotNull(plannerContext, "plannerContext"); this.filter = filter; this.selectProjection = selectProjection; + this.unnestProjection = unnestProjection; this.grouping = grouping; this.sorting = sorting; this.windowing = windowing; @@ -210,6 +215,7 @@ public class DruidQuery // Now the fun begins. final DimFilter filter; final Projection selectProjection; + final Projection unnestProjection; final Grouping grouping; final Sorting sorting; final Windowing windowing; @@ -290,11 +296,25 @@ public class DruidQuery windowing = null; } + if (partialQuery.getUnnestProject() != null) { + unnestProjection = Preconditions.checkNotNull( + computeUnnestProjection( + partialQuery, + plannerContext, + computeOutputRowSignature(sourceRowSignature, null, null, null, null), + virtualColumnRegistry + ) + ); + } else { + unnestProjection = null; + } + return new DruidQuery( dataSource, plannerContext, filter, selectProjection, + unnestProjection, grouping, sorting, windowing, @@ -371,6 +391,18 @@ public class DruidQuery } } + @Nonnull + private static Projection computeUnnestProjection( + final PartialDruidQuery partialQuery, + final PlannerContext plannerContext, + final RowSignature rowSignature, + final VirtualColumnRegistry virtualColumnRegistry + ) + { + final Project project = Preconditions.checkNotNull(partialQuery.getUnnestProject(), "unnestProject"); + return Projection.preAggregation(project, plannerContext, rowSignature, virtualColumnRegistry); + } + @Nonnull private static Grouping computeGrouping( final PartialDruidQuery partialQuery, @@ -762,6 +794,16 @@ public class DruidQuery } } + + if (unnestProjection != null) { + for (String columnName : unnestProjection.getVirtualColumns()) { + if (virtualColumnRegistry.isVirtualColumnDefined(columnName)) { + virtualColumns.add(virtualColumnRegistry.getVirtualColumn(columnName)); + } + } + } + + for (String columnName : specialized) { if (virtualColumnRegistry.isVirtualColumnDefined(columnName)) { virtualColumns.add(virtualColumnRegistry.getVirtualColumn(columnName)); diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidUnnestDatasourceRel.java b/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidUnnestDatasourceRel.java new file mode 100644 index 00000000000..eaa544d4200 --- /dev/null +++ b/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidUnnestDatasourceRel.java @@ -0,0 +1,180 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.sql.calcite.rel; + +import org.apache.calcite.rel.RelWriter; +import org.apache.calcite.rel.core.Uncollect; +import org.apache.calcite.rel.logical.LogicalProject; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.druid.math.expr.Expr; +import org.apache.druid.math.expr.ExprEval; +import org.apache.druid.math.expr.ExpressionType; +import org.apache.druid.math.expr.InputBindings; +import org.apache.druid.query.InlineDataSource; +import org.apache.druid.query.UnnestDataSource; +import org.apache.druid.segment.column.RowSignature; +import org.apache.druid.sql.calcite.expression.DruidExpression; +import org.apache.druid.sql.calcite.expression.Expressions; +import org.apache.druid.sql.calcite.planner.PlannerContext; +import org.apache.druid.sql.calcite.table.RowSignatures; + +import javax.annotation.Nullable; +import java.util.Collections; +import java.util.Set; + +/** + * The Rel node to capture the unnest (or uncollect) part in a query. This covers 2 cases: + * + * Case 1: + * If this is an unnest on a constant and no input table is required, the final query is built using + * an UnnestDataSource with a base InlineDataSource in this rel. + * + * Case 2: + * If the unnest has an input table, this rel resolves the unnest part and delegates the rel to be consumed by other + * rule ({@link org.apache.druid.sql.calcite.rule.DruidCorrelateUnnestRule} + */ +public class DruidUnnestDatasourceRel extends DruidRel +{ + private final Uncollect uncollect; + private final DruidQueryRel druidQueryRel; + private final LogicalProject unnestProject; + + public DruidUnnestDatasourceRel( + Uncollect uncollect, + DruidQueryRel queryRel, + LogicalProject unnestProject, + PlannerContext plannerContext + ) + { + super(uncollect.getCluster(), uncollect.getTraitSet(), plannerContext); + this.uncollect = uncollect; + this.druidQueryRel = queryRel; + this.unnestProject = unnestProject; + } + + public LogicalProject getUnnestProject() + { + return unnestProject; + } + + @Nullable + @Override + public PartialDruidQuery getPartialDruidQuery() + { + return druidQueryRel.getPartialDruidQuery(); + } + + @Override + public DruidUnnestDatasourceRel withPartialQuery(PartialDruidQuery newQueryBuilder) + { + return new DruidUnnestDatasourceRel( + uncollect, + druidQueryRel.withPartialQuery(newQueryBuilder), + unnestProject, + getPlannerContext() + ); + } + + @Override + public DruidQuery toDruidQuery(boolean finalizeAggregations) + { + VirtualColumnRegistry virtualColumnRegistry = VirtualColumnRegistry.create( + druidQueryRel.getDruidTable().getRowSignature(), + getPlannerContext().getExprMacroTable(), + getPlannerContext().getPlannerConfig().isForceExpressionVirtualColumns() + ); + getPlannerContext().setJoinExpressionVirtualColumnRegistry(virtualColumnRegistry); + + final DruidExpression expression = Expressions.toDruidExpression( + getPlannerContext(), + druidQueryRel.getDruidTable().getRowSignature(), + unnestProject.getProjects().get(0) + ); + if (expression == null) { + return null; + } + Expr parsed = expression.parse(getPlannerContext().getExprMacroTable()); + ExprEval eval = parsed.eval(InputBindings.nilBindings()); + + // If query unnests a constant expression and not use any table + // the unnest would be on an inline data source + // with the input column being called "inline" in the native query + UnnestDataSource dataSource = UnnestDataSource.create( + InlineDataSource.fromIterable( + Collections.singletonList(new Object[]{eval.value()}), + RowSignature.builder().add("inline", ExpressionType.toColumnType(eval.type())).build() + ), + "inline", + druidQueryRel.getRowType().getFieldNames().get(0), + null + ); + + DruidQuery query = druidQueryRel.getPartialDruidQuery().build( + dataSource, + RowSignatures.fromRelDataType(uncollect.getRowType().getFieldNames(), uncollect.getRowType()), + getPlannerContext(), + getCluster().getRexBuilder(), + finalizeAggregations + ); + getPlannerContext().setJoinExpressionVirtualColumnRegistry(null); + return query; + } + + @Override + public DruidQuery toDruidQueryForExplaining() + { + return toDruidQuery(false); + } + + @Override + public DruidUnnestDatasourceRel asDruidConvention() + { + return new DruidUnnestDatasourceRel( + new Uncollect(getCluster(), traitSet.replace(DruidConvention.instance()), uncollect.getInput(), false), + druidQueryRel.asDruidConvention(), + unnestProject, + getPlannerContext() + ); + } + + @Override + public RelWriter explainTerms(RelWriter pw) + { + return super.explainTerms(pw); + } + + @Override + public Set getDataSourceNames() + { + return druidQueryRel.getDruidTable().getDataSource().getTableNames(); + } + + @Override + protected RelDataType deriveRowType() + { + return uncollect.getRowType(); + } + + @Override + protected DruidUnnestDatasourceRel clone() + { + return new DruidUnnestDatasourceRel(uncollect, druidQueryRel, unnestProject, getPlannerContext()); + } +} diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/rel/PartialDruidQuery.java b/sql/src/main/java/org/apache/druid/sql/calcite/rel/PartialDruidQuery.java index bdd4a4f735f..0cd71af8e5e 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/rel/PartialDruidQuery.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/rel/PartialDruidQuery.java @@ -54,6 +54,7 @@ public class PartialDruidQuery private final RelNode scan; private final Filter whereFilter; private final Project selectProject; + private final Project unnestProject; private final Aggregate aggregate; private final Filter havingFilter; private final Project aggregateProject; @@ -80,7 +81,9 @@ public class PartialDruidQuery SORT_PROJECT, // WINDOW may be present only together with SCAN. - WINDOW + WINDOW, + + UNNEST_PROJECT } private PartialDruidQuery( @@ -93,7 +96,8 @@ public class PartialDruidQuery final Filter havingFilter, final Sort sort, final Project sortProject, - final Window window + final Window window, + final Project unnestProject ) { this.builderSupplier = Preconditions.checkNotNull(builderSupplier, "builderSupplier"); @@ -106,6 +110,7 @@ public class PartialDruidQuery this.sort = sort; this.sortProject = sortProject; this.window = window; + this.unnestProject = unnestProject; } public static PartialDruidQuery create(final RelNode scanRel) @@ -114,7 +119,7 @@ public class PartialDruidQuery scanRel.getCluster(), scanRel.getTable() != null ? scanRel.getTable().getRelOptSchema() : null ); - return new PartialDruidQuery(builderSupplier, scanRel, null, null, null, null, null, null, null, null); + return new PartialDruidQuery(builderSupplier, scanRel, null, null, null, null, null, null, null, null, null); } public RelNode getScan() @@ -132,6 +137,11 @@ public class PartialDruidQuery return selectProject; } + public Project getUnnestProject() + { + return unnestProject; + } + public Aggregate getAggregate() { return aggregate; @@ -175,7 +185,8 @@ public class PartialDruidQuery havingFilter, sort, sortProject, - window + window, + unnestProject ); } @@ -218,7 +229,8 @@ public class PartialDruidQuery havingFilter, sort, sortProject, - window + window, + unnestProject ); } @@ -235,7 +247,8 @@ public class PartialDruidQuery havingFilter, sort, sortProject, - window + window, + unnestProject ); } @@ -252,7 +265,8 @@ public class PartialDruidQuery newHavingFilter, sort, sortProject, - window + window, + unnestProject ); } @@ -269,7 +283,8 @@ public class PartialDruidQuery havingFilter, sort, sortProject, - window + window, + unnestProject ); } @@ -286,7 +301,8 @@ public class PartialDruidQuery havingFilter, newSort, sortProject, - window + window, + unnestProject ); } @@ -303,7 +319,8 @@ public class PartialDruidQuery havingFilter, sort, newSortProject, - window + window, + unnestProject ); } @@ -320,7 +337,25 @@ public class PartialDruidQuery havingFilter, sort, sortProject, - newWindow + newWindow, + unnestProject + ); + } + + public PartialDruidQuery withUnnest(final Project newUnnestProject) + { + return new PartialDruidQuery( + builderSupplier, + scan, + whereFilter, + selectProject, + aggregate, + aggregateProject, + havingFilter, + sort, + sortProject, + window, + newUnnestProject ); } @@ -575,6 +610,7 @@ public class PartialDruidQuery ", aggregateProject=" + aggregateProject + ", sort=" + sort + ", sortProject=" + sortProject + + ", unnestProject=" + unnestProject + '}'; } } diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/rule/DruidCorrelateUnnestRule.java b/sql/src/main/java/org/apache/druid/sql/calcite/rule/DruidCorrelateUnnestRule.java new file mode 100644 index 00000000000..1870e11dd75 --- /dev/null +++ b/sql/src/main/java/org/apache/druid/sql/calcite/rule/DruidCorrelateUnnestRule.java @@ -0,0 +1,177 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.sql.calcite.rule; + +import org.apache.calcite.plan.RelOptRule; +import org.apache.calcite.plan.RelOptRuleCall; +import org.apache.calcite.plan.RelOptUtil; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.core.Correlate; +import org.apache.calcite.rel.core.Filter; +import org.apache.calcite.rel.core.Project; +import org.apache.calcite.rex.RexBuilder; +import org.apache.calcite.rex.RexNode; +import org.apache.calcite.rex.RexUtil; +import org.apache.calcite.tools.RelBuilder; +import org.apache.druid.sql.calcite.planner.PlannerContext; +import org.apache.druid.sql.calcite.rel.DruidCorrelateUnnestRel; +import org.apache.druid.sql.calcite.rel.DruidQueryRel; +import org.apache.druid.sql.calcite.rel.DruidRel; +import org.apache.druid.sql.calcite.rel.DruidUnnestDatasourceRel; +import org.apache.druid.sql.calcite.rel.PartialDruidQuery; + +import java.util.ArrayList; +import java.util.List; + +/** + * This class creates the rule to abide by for creating correlations during unnest. + * Typically, Calcite plans the unnest query such as + * SELECT * from numFoo, unnest(dim3) in the following way: + * 80:LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{3}]) + * 6:LogicalTableScan(subset=[rel#74:Subset#0.NONE.[]], table=[[druid, numfoo]]) + * 78:Uncollect(subset=[rel#79:Subset#3.NONE.[]]) + * 76:LogicalProject(subset=[rel#77:Subset#2.NONE.[]], EXPR$0=[MV_TO_ARRAY($cor0.dim3)]) + * 7:LogicalValues(subset=[rel#75:Subset#1.NONE.[0]], tuples=[[{ 0 }]]) + * + * {@link DruidUnnestDatasourceRule} takes care of the Uncollect(last 3 lines) to generate a {@link DruidUnnestDatasourceRel} + * thereby reducing the logical plan to: + * LogicalCorrelate + * / \ + * DruidRel DruidUnnestDataSourceRel + * + * This forms the premise of this rule. The goal is to transform the above-mentioned structure in the tree + * with a new rel {@link DruidCorrelateUnnestRel} which shall be created here. + * + */ +public class DruidCorrelateUnnestRule extends RelOptRule +{ + private final PlannerContext plannerContext; + private final boolean enableLeftScanDirect; + + public DruidCorrelateUnnestRule(final PlannerContext plannerContext) + { + super( + operand( + Correlate.class, + operand(DruidRel.class, any()), + operand(DruidUnnestDatasourceRel.class, any()) + ) + ); + + this.plannerContext = plannerContext; + this.enableLeftScanDirect = plannerContext.queryContext().getEnableJoinLeftScanDirect(); + } + + @Override + public boolean matches(RelOptRuleCall call) + { + final DruidRel druidRel = call.rel(1); + final DruidRel uncollectRel = call.rel(2); + + return druidRel.getPartialDruidQuery() != null + && uncollectRel.getPartialDruidQuery() != null; + } + + + @Override + public void onMatch(RelOptRuleCall call) + { + final Correlate correlate = call.rel(0); + final DruidRel druidRel = call.rel(1); + DruidUnnestDatasourceRel druidUnnestDatasourceRel = call.rel(2); + + + final RexBuilder rexBuilder = correlate.getCluster().getRexBuilder(); + + final Filter druidRelFilter; + final DruidRel newDruidRelFilter; + final List newProjectExprs = new ArrayList<>(); + + final boolean isLeftDirectAccessPossible = enableLeftScanDirect && (druidRel instanceof DruidQueryRel); + + if (druidRel.getPartialDruidQuery().stage() == PartialDruidQuery.Stage.SELECT_PROJECT + && (isLeftDirectAccessPossible || druidRel.getPartialDruidQuery().getWhereFilter() == null)) { + // Swap the druidRel-side projection above the correlate, so the druidRel side is a simple scan or mapping. + // This helps us avoid subqueries. + final RelNode leftScan = druidRel.getPartialDruidQuery().getScan(); + final Project leftProject = druidRel.getPartialDruidQuery().getSelectProject(); + druidRelFilter = druidRel.getPartialDruidQuery().getWhereFilter(); + + // Left-side projection expressions rewritten to be on top of the correlate. + newProjectExprs.addAll(leftProject.getProjects()); + newDruidRelFilter = druidRel.withPartialQuery(PartialDruidQuery.create(leftScan)); + } else { + // Leave druidRel as-is. Write input refs that do nothing. + for (int i = 0; i < druidRel.getRowType().getFieldCount(); i++) { + newProjectExprs.add(rexBuilder.makeInputRef(correlate.getRowType().getFieldList().get(i).getType(), i)); + } + newDruidRelFilter = druidRel; + druidRelFilter = null; + } + + if (druidUnnestDatasourceRel.getPartialDruidQuery().stage() == PartialDruidQuery.Stage.SELECT_PROJECT) { + for (final RexNode rexNode : RexUtil.shift( + druidUnnestDatasourceRel.getPartialDruidQuery() + .getSelectProject() + .getProjects(), + newDruidRelFilter.getRowType().getFieldCount() + )) { + newProjectExprs.add(rexNode); + } + } else { + for (int i = 0; i < druidUnnestDatasourceRel.getRowType().getFieldCount(); i++) { + newProjectExprs.add( + rexBuilder.makeInputRef( + correlate.getRowType() + .getFieldList() + .get(druidRel.getRowType().getFieldCount() + i) + .getType(), + newDruidRelFilter.getRowType().getFieldCount() + i + ) + ); + } + } + + final DruidCorrelateUnnestRel druidCorr = DruidCorrelateUnnestRel.create( + correlate.copy( + correlate.getTraitSet(), + newDruidRelFilter, + druidUnnestDatasourceRel, + correlate.getCorrelationId(), + correlate.getRequiredColumns(), + correlate.getJoinType() + ), + druidRelFilter, + plannerContext + ); + + + final RelBuilder relBuilder = + call.builder() + .push(druidCorr) + .project(RexUtil.fixUp( + rexBuilder, + newProjectExprs, + RelOptUtil.getFieldTypeList(druidCorr.getRowType()) + )); + + call.transformTo(relBuilder.build()); + } +} diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/rule/DruidRelToDruidRule.java b/sql/src/main/java/org/apache/druid/sql/calcite/rule/DruidRelToDruidRule.java index 8acc5c906b4..c24ebc6f01c 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/rule/DruidRelToDruidRule.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/rule/DruidRelToDruidRule.java @@ -50,7 +50,8 @@ public class DruidRelToDruidRule extends ConverterRule public RelNode convert(RelNode rel) { try { - return ((DruidRel) rel).asDruidConvention(); + final RelNode newRel = ((DruidRel) rel).asDruidConvention(); + return newRel; } catch (Exception e) { log.error(e, "Conversion failed"); diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/rule/DruidRules.java b/sql/src/main/java/org/apache/druid/sql/calcite/rule/DruidRules.java index 8316b1e9868..4276a48b2f1 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/rule/DruidRules.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/rule/DruidRules.java @@ -97,7 +97,9 @@ public class DruidRules new DruidUnionRule(plannerContext), new DruidUnionDataSourceRule(plannerContext), DruidSortUnionRule.instance(), - DruidJoinRule.instance(plannerContext) + DruidJoinRule.instance(plannerContext), + new DruidUnnestDatasourceRule(plannerContext), + new DruidCorrelateUnnestRule(plannerContext) ) ); diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/rule/DruidUnnestDatasourceRule.java b/sql/src/main/java/org/apache/druid/sql/calcite/rule/DruidUnnestDatasourceRule.java new file mode 100644 index 00000000000..e8123fe0670 --- /dev/null +++ b/sql/src/main/java/org/apache/druid/sql/calcite/rule/DruidUnnestDatasourceRule.java @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.sql.calcite.rule; + +import com.google.common.collect.ImmutableList; +import org.apache.calcite.plan.RelOptRule; +import org.apache.calcite.plan.RelOptRuleCall; +import org.apache.calcite.rel.core.Uncollect; +import org.apache.calcite.rel.logical.LogicalProject; +import org.apache.calcite.rex.RexBuilder; +import org.apache.calcite.tools.RelBuilder; +import org.apache.druid.sql.calcite.planner.PlannerContext; +import org.apache.druid.sql.calcite.rel.DruidQueryRel; +import org.apache.druid.sql.calcite.rel.DruidUnnestDatasourceRel; + +/** + * This class creates the rule to abide by for creating unnest (internally uncollect) in Calcite. + * Typically, Calcite plans the *unnest* part of the query involving a table such as + * SELECT * from numFoo, unnest(dim3) + * or even a standalone unnest query such as + * SELECT * from unnest(ARRAY[1,2,3]) in the following way: + * 78:Uncollect(subset=[rel#79:Subset#3.NONE.[]]) + * 76:LogicalProject(subset=[rel#77:Subset#2.NONE.[]], EXPR$0=[MV_TO_ARRAY($cor0.dim3)]) + * 7:LogicalValues(subset=[rel#75:Subset#1.NONE.[0]], tuples=[[{ 0 }]]) + * + * Calcite tackles plans bottom up. Therefore, + * {@link DruidLogicalValuesRule} converts the LogicalValues part into a leaf level {@link DruidQueryRel} + * thereby creating the following subtree in the call tree + * + * Uncollect + * \ + * LogicalProject + * \ + * DruidQueryRel + * + * + * This forms the premise of this rule. The goal is to transform the above-mentioned structure in the tree + * with a new rel {@link DruidUnnestDatasourceRel} which shall be created here. + * + */ +public class DruidUnnestDatasourceRule extends RelOptRule +{ + private final PlannerContext plannerContext; + + public DruidUnnestDatasourceRule(PlannerContext plannerContext) + { + super( + operand( + Uncollect.class, + operand(LogicalProject.class, operand(DruidQueryRel.class, none())) + ) + ); + this.plannerContext = plannerContext; + } + + @Override + public boolean matches(RelOptRuleCall call) + { + return true; + } + + @Override + public void onMatch(final RelOptRuleCall call) + { + final Uncollect uncollectRel = call.rel(0); + final LogicalProject logicalProject = call.rel(1); + final DruidQueryRel druidQueryRel = call.rel(2); + + final RexBuilder rexBuilder = logicalProject.getCluster().getRexBuilder(); + + final LogicalProject queryProject = LogicalProject.create( + uncollectRel, + ImmutableList.of(rexBuilder.makeInputRef(uncollectRel.getRowType().getFieldList().get(0).getType(), 0)), + uncollectRel.getRowType() + ); + + DruidUnnestDatasourceRel unnestDatasourceRel = new DruidUnnestDatasourceRel( + uncollectRel, + druidQueryRel.withPartialQuery(druidQueryRel.getPartialDruidQuery().withSelectProject(queryProject)), + logicalProject, + plannerContext + ); + + final RelBuilder relBuilder = + call.builder() + .push(unnestDatasourceRel); + + call.transformTo(relBuilder.build()); + } +} diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteArraysQueryTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteArraysQueryTest.java index b517f2377ff..ead3b506e3c 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteArraysQueryTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteArraysQueryTest.java @@ -29,9 +29,11 @@ import org.apache.druid.java.util.common.granularity.Granularities; import org.apache.druid.math.expr.ExprMacroTable; import org.apache.druid.math.expr.ExpressionProcessing; import org.apache.druid.query.Druids; +import org.apache.druid.query.InlineDataSource; import org.apache.druid.query.Query; import org.apache.druid.query.QueryDataSource; import org.apache.druid.query.TableDataSource; +import org.apache.druid.query.UnnestDataSource; import org.apache.druid.query.aggregation.CountAggregatorFactory; import org.apache.druid.query.aggregation.ExpressionLambdaAggregatorFactory; import org.apache.druid.query.aggregation.FilteredAggregatorFactory; @@ -51,7 +53,9 @@ import org.apache.druid.query.ordering.StringComparators; import org.apache.druid.query.scan.ScanQuery; import org.apache.druid.query.topn.DimensionTopNMetricSpec; import org.apache.druid.query.topn.TopNQuery; +import org.apache.druid.query.topn.TopNQueryBuilder; import org.apache.druid.segment.column.ColumnType; +import org.apache.druid.segment.column.RowSignature; import org.apache.druid.segment.join.JoinType; import org.apache.druid.sql.calcite.filtration.Filtration; import org.apache.druid.sql.calcite.util.CalciteTests; @@ -2613,4 +2617,703 @@ public class CalciteArraysQueryTest extends BaseCalciteQueryTest Assert.assertEquals(path, expected, actual); } } + + @Test + public void testUnnestInline() + { + skipVectorize(); + cannotVectorize(); + testQuery( + "SELECT * FROM UNNEST(ARRAY[1,2,3])", + ImmutableList.of( + Druids.newScanQueryBuilder() + .dataSource( + UnnestDataSource.create( + InlineDataSource.fromIterable( + ImmutableList.of(new Object[]{new Object[]{1L, 2L, 3L}}), + RowSignature.builder().add("inline", ColumnType.LONG_ARRAY).build() + ), + "inline", + "EXPR$0", + null + ) + ) + .intervals(querySegmentSpec(Filtration.eternity())) + .resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST) + .legacy(false) + .context(QUERY_CONTEXT_DEFAULT) + .columns(ImmutableList.of( + "EXPR$0" + )) + .build() + ), + ImmutableList.of( + new Object[]{1}, + new Object[]{2}, + new Object[]{3} + ) + ); + } + + @Test + public void testUnnest() + { + // This tells the test to skip generating (vectorize = force) path + // Generates only 1 native query with vectorize = false + skipVectorize(); + // This tells that both vectorize = force and vectorize = false takes the same path of non vectorization + // Generates 2 native queries with 2 different values of vectorize + cannotVectorize(); + testQuery( + "SELECT d3 FROM druid.numfoo, UNNEST(MV_TO_ARRAY(dim3)) as unnested (d3)", + ImmutableList.of( + Druids.newScanQueryBuilder() + .dataSource(UnnestDataSource.create( + new TableDataSource(CalciteTests.DATASOURCE3), + "dim3", + "EXPR$0", + null + )) + .intervals(querySegmentSpec(Filtration.eternity())) + .resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST) + .legacy(false) + .context(QUERY_CONTEXT_DEFAULT) + .columns(ImmutableList.of( + "EXPR$0" + )) + .build() + ), + useDefault ? + ImmutableList.of( + new Object[]{"a"}, + new Object[]{"b"}, + new Object[]{"b"}, + new Object[]{"c"}, + new Object[]{"d"}, + new Object[]{""}, + new Object[]{""}, + new Object[]{""} + ) : + ImmutableList.of( + new Object[]{"a"}, + new Object[]{"b"}, + new Object[]{"b"}, + new Object[]{"c"}, + new Object[]{"d"}, + new Object[]{""}, + new Object[]{null}, + new Object[]{null} + ) + ); + } + + @Test + public void testUnnestWithGroupBy() + { + // This tells the test to skip generating (vectorize = force) path + // Generates only 1 native query with vectorize = false + skipVectorize(); + // This tells that both vectorize = force and vectorize = false takes the same path of non vectorization + // Generates 2 native queries with 2 different values of vectorize + cannotVectorize(); + testQuery( + "SELECT d3 FROM druid.numfoo, UNNEST(MV_TO_ARRAY(dim3)) as unnested (d3) GROUP BY d3 ", + ImmutableList.of( + GroupByQuery.builder() + .setDataSource(UnnestDataSource.create( + new TableDataSource(CalciteTests.DATASOURCE3), + "dim3", + "EXPR$0", + null + )) + .setInterval(querySegmentSpec(Filtration.eternity())) + .setContext(QUERY_CONTEXT_DEFAULT) + .setDimensions(new DefaultDimensionSpec("EXPR$0", "_d0", ColumnType.STRING)) + .setGranularity(Granularities.ALL) + .setContext(QUERY_CONTEXT_DEFAULT) + .build() + ), + useDefault ? + ImmutableList.of( + new Object[]{""}, + new Object[]{"a"}, + new Object[]{"b"}, + new Object[]{"c"}, + new Object[]{"d"} + ) : + ImmutableList.of( + new Object[]{null}, + new Object[]{""}, + new Object[]{"a"}, + new Object[]{"b"}, + new Object[]{"c"}, + new Object[]{"d"} + ) + ); + } + + @Test + public void testUnnestWithGroupByOrderBy() + { + // This tells the test to skip generating (vectorize = force) path + // Generates only 1 native query with vectorize = false + skipVectorize(); + // This tells that both vectorize = force and vectorize = false takes the same path of non vectorization + // Generates 2 native queries with 2 different values of vectorize + cannotVectorize(); + testQuery( + "SELECT d3, COUNT(*) FROM druid.numfoo, UNNEST(MV_TO_ARRAY(dim3)) AS unnested(d3) GROUP BY d3 ORDER BY d3 DESC ", + ImmutableList.of( + GroupByQuery.builder() + .setDataSource(UnnestDataSource.create( + new TableDataSource(CalciteTests.DATASOURCE3), + "dim3", + "EXPR$0", + null + )) + .setInterval(querySegmentSpec(Filtration.eternity())) + .setContext(QUERY_CONTEXT_DEFAULT) + .setDimensions(new DefaultDimensionSpec("EXPR$0", "_d0", ColumnType.STRING)) + .setGranularity(Granularities.ALL) + .setLimitSpec( + DefaultLimitSpec + .builder() + .orderBy(new OrderByColumnSpec( + "_d0", + OrderByColumnSpec.Direction.DESCENDING, + StringComparators.LEXICOGRAPHIC + )) + .build() + ) + .setAggregatorSpecs(new CountAggregatorFactory("a0")) + .setContext(QUERY_CONTEXT_DEFAULT) + .build() + ), + useDefault ? + ImmutableList.of( + new Object[]{"d", 1L}, + new Object[]{"c", 1L}, + new Object[]{"b", 2L}, + new Object[]{"a", 1L}, + new Object[]{"", 3L} + ) : + ImmutableList.of( + new Object[]{"d", 1L}, + new Object[]{"c", 1L}, + new Object[]{"b", 2L}, + new Object[]{"a", 1L}, + new Object[]{"", 1L}, + new Object[]{null, 2L} + ) + ); + } + + @Test + public void testUnnestWithGroupByOrderByWithLimit() + { + // This tells the test to skip generating (vectorize = force) path + // Generates only 1 native query with vectorize = false + skipVectorize(); + // This tells that both vectorize = force and vectorize = false takes the same path of non vectorization + // Generates 2 native queries with 2 different values of vectorize + cannotVectorize(); + testQuery( + "SELECT d3, COUNT(*) FROM druid.numfoo, UNNEST(MV_TO_ARRAY(dim3)) AS unnested(d3) GROUP BY d3 ORDER BY d3 ASC LIMIT 4 ", + ImmutableList.of( + new TopNQueryBuilder() + .dataSource(UnnestDataSource.create( + new TableDataSource(CalciteTests.DATASOURCE3), + "dim3", + "EXPR$0", + null + )) + .intervals(querySegmentSpec(Filtration.eternity())) + .dimension(new DefaultDimensionSpec("EXPR$0", "_d0", ColumnType.STRING)) + .metric(new DimensionTopNMetricSpec(null, StringComparators.LEXICOGRAPHIC)) + .threshold(4) + .aggregators(aggregators(new CountAggregatorFactory("a0"))) + .context(QUERY_CONTEXT_DEFAULT) + .build() + ), + useDefault ? + ImmutableList.of( + new Object[]{"", 3L}, + new Object[]{"a", 1L}, + new Object[]{"b", 2L}, + new Object[]{"c", 1L} + ) : + ImmutableList.of( + new Object[]{null, 2L}, + new Object[]{"", 1L}, + new Object[]{"a", 1L}, + new Object[]{"b", 2L} + ) + ); + } + + + @Test + public void testUnnestWithLimit() + { + // This tells the test to skip generating (vectorize = force) path + // Generates only 1 native query with vectorize = false + skipVectorize(); + // This tells that both vectorize = force and vectorize = false takes the same path of non vectorization + // Generates 2 native queries with 2 different values of vectorize + cannotVectorize(); + testQuery( + "SELECT d3 FROM druid.numfoo, UNNEST(MV_TO_ARRAY(dim3)) as unnested (d3) LIMIT 3", + ImmutableList.of( + Druids.newScanQueryBuilder() + .dataSource(UnnestDataSource.create( + new TableDataSource(CalciteTests.DATASOURCE3), + "dim3", + "EXPR$0", + null + )) + .intervals(querySegmentSpec(Filtration.eternity())) + .resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST) + .legacy(false) + .context(QUERY_CONTEXT_DEFAULT) + .columns(ImmutableList.of( + "EXPR$0" + )) + .limit(3) + .build() + ), + ImmutableList.of( + new Object[]{"a"}, + new Object[]{"b"}, + new Object[]{"b"} + ) + ); + } + + @Test + public void testUnnestFirstQueryOnSelect() + { + // This tells the test to skip generating (vectorize = force) path + // Generates only 1 native query with vectorize = false + skipVectorize(); + // This tells that both vectorize = force and vectorize = false takes the same path of non vectorization + // Generates 2 native queries with 2 different values of vectorize + cannotVectorize(); + testQuery( + "SELECT d3 FROM (select dim1, dim2, dim3 from druid.numfoo), UNNEST(MV_TO_ARRAY(dim3)) as unnested (d3)", + ImmutableList.of( + Druids.newScanQueryBuilder() + .dataSource(UnnestDataSource.create( + new TableDataSource(CalciteTests.DATASOURCE3), + "dim3", + "EXPR$0", + null + )) + .intervals(querySegmentSpec(Filtration.eternity())) + .resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST) + .legacy(false) + .context(QUERY_CONTEXT_DEFAULT) + .columns(ImmutableList.of( + "EXPR$0" + )) + .build() + ), + useDefault ? + ImmutableList.of( + new Object[]{"a"}, + new Object[]{"b"}, + new Object[]{"b"}, + new Object[]{"c"}, + new Object[]{"d"}, + new Object[]{""}, + new Object[]{""}, + new Object[]{""} + ) : + ImmutableList.of( + new Object[]{"a"}, + new Object[]{"b"}, + new Object[]{"b"}, + new Object[]{"c"}, + new Object[]{"d"}, + new Object[]{""}, + new Object[]{null}, + new Object[]{null} + ) + ); + } + + @Test + public void testUnnestWithFilters() + { + // This tells the test to skip generating (vectorize = force) path + // Generates only 1 native query with vectorize = false + skipVectorize(); + // This tells that both vectorize = force and vectorize = false takes the same path of non vectorization + // Generates 2 native queries with 2 different values of vectorize + cannotVectorize(); + testQuery( + "SELECT d3 FROM (select * from druid.numfoo where dim2='a'), UNNEST(MV_TO_ARRAY(dim3)) as unnested (d3)", + ImmutableList.of( + Druids.newScanQueryBuilder() + .dataSource(UnnestDataSource.create( + new QueryDataSource( + newScanQueryBuilder() + .dataSource( + new TableDataSource(CalciteTests.DATASOURCE3) + ) + .intervals(querySegmentSpec(Filtration.eternity())) + .virtualColumns(expressionVirtualColumn("v0", "'a'", ColumnType.STRING)) + .resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST) + .legacy(false) + .filters(new SelectorDimFilter("dim2", "a", null)) + .columns( + "__time", + "cnt", + "d1", + "d2", + "dim1", + "dim3", + "dim4", + "dim5", + "dim6", + "f1", + "f2", + "l1", + "l2", + "m1", + "m2", + "unique_dim1", + "v0" + ) + .context(QUERY_CONTEXT_DEFAULT) + .build() + ), + "dim3", + "EXPR$0", + null + )) + .intervals(querySegmentSpec(Filtration.eternity())) + .resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST) + .legacy(false) + .context(QUERY_CONTEXT_DEFAULT) + .columns(ImmutableList.of( + "EXPR$0" + )) + .build() + ), + ImmutableList.of( + new Object[]{"a"}, + new Object[]{"b"}, + new Object[]{""} + ) + ); + } + + @Test + public void testUnnestWithInFilters() + { + // This tells the test to skip generating (vectorize = force) path + // Generates only 1 native query with vectorize = false + skipVectorize(); + // This tells that both vectorize = force and vectorize = false takes the same path of non vectorization + // Generates 2 native queries with 2 different values of vectorize + cannotVectorize(); + testQuery( + "SELECT d3 FROM (select * from druid.numfoo where dim2 IN ('a','b','ab','abc')), UNNEST(MV_TO_ARRAY(dim3)) as unnested (d3)", + ImmutableList.of( + Druids.newScanQueryBuilder() + .dataSource(UnnestDataSource.create( + new QueryDataSource( + newScanQueryBuilder() + .dataSource( + new TableDataSource(CalciteTests.DATASOURCE3) + ) + .intervals(querySegmentSpec(Filtration.eternity())) + .resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST) + .legacy(false) + .filters(new InDimFilter("dim2", ImmutableList.of("a", "b", "ab", "abc"), null)) + .columns( + "__time", + "cnt", + "d1", + "d2", + "dim1", + "dim2", + "dim3", + "dim4", + "dim5", + "dim6", + "f1", + "f2", + "l1", + "l2", + "m1", + "m2", + "unique_dim1" + ) + .context(QUERY_CONTEXT_DEFAULT) + .build() + ), + "dim3", + "EXPR$0", + null + )) + .intervals(querySegmentSpec(Filtration.eternity())) + .resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST) + .legacy(false) + .context(QUERY_CONTEXT_DEFAULT) + .columns(ImmutableList.of( + "EXPR$0" + )) + .build() + ), + ImmutableList.of( + new Object[]{"a"}, + new Object[]{"b"}, + new Object[]{""}, + useDefault ? + new Object[]{""} : new Object[]{null} + ) + ); + } + + @Test + public void testUnnestVirtualWithColumns() + { + // This tells the test to skip generating (vectorize = force) path + // Generates only 1 native query with vectorize = false + skipVectorize(); + // This tells that both vectorize = force and vectorize = false takes the same path of non vectorization + // Generates 2 native queries with 2 different values of vectorize + cannotVectorize(); + testQuery( + "SELECT strings FROM druid.numfoo, UNNEST(ARRAY[dim4, dim5]) as unnested (strings)", + ImmutableList.of( + Druids.newScanQueryBuilder() + .dataSource(UnnestDataSource.create( + new TableDataSource(CalciteTests.DATASOURCE3), + "v0", + "EXPR$0", + null + )) + .intervals(querySegmentSpec(Filtration.eternity())) + .virtualColumns(expressionVirtualColumn("v0", "array(\"dim4\",\"dim5\")", ColumnType.STRING_ARRAY)) + .resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST) + .legacy(false) + .context(QUERY_CONTEXT_DEFAULT) + .columns(ImmutableList.of( + "EXPR$0" + )) + .build() + ), + ImmutableList.of( + new Object[]{"a"}, + new Object[]{"aa"}, + new Object[]{"a"}, + new Object[]{"ab"}, + new Object[]{"a"}, + new Object[]{"ba"}, + new Object[]{"b"}, + new Object[]{"ad"}, + new Object[]{"b"}, + new Object[]{"aa"}, + new Object[]{"b"}, + new Object[]{"ab"} + ) + ); + } + + @Test + public void testUnnestWithGroupByOrderByOnVirtualColumn() + { + skipVectorize(); + cannotVectorize(); + testQuery( + "SELECT d24, COUNT(*) FROM druid.numfoo, UNNEST(ARRAY[dim2, dim4]) AS unnested(d24) GROUP BY d24 ORDER BY d24 DESC ", + ImmutableList.of( + GroupByQuery.builder() + .setDataSource(UnnestDataSource.create( + new TableDataSource(CalciteTests.DATASOURCE3), + "v0", + "EXPR$0", + null + )) + .setVirtualColumns(expressionVirtualColumn( + "v0", + "array(\"dim2\",\"dim4\")", + ColumnType.STRING_ARRAY + )) + .setInterval(querySegmentSpec(Filtration.eternity())) + .setContext(QUERY_CONTEXT_DEFAULT) + .setDimensions(new DefaultDimensionSpec("EXPR$0", "_d0", ColumnType.STRING)) + .setGranularity(Granularities.ALL) + .setLimitSpec( + DefaultLimitSpec + .builder() + .orderBy(new OrderByColumnSpec( + "_d0", + OrderByColumnSpec.Direction.DESCENDING, + StringComparators.LEXICOGRAPHIC + )) + .build() + ) + .setAggregatorSpecs(new CountAggregatorFactory("a0")) + .setContext(QUERY_CONTEXT_DEFAULT) + .build() + ), + useDefault ? + ImmutableList.of( + new Object[]{"b", 3L}, + new Object[]{"abc", 1L}, + new Object[]{"a", 5L}, + new Object[]{"", 3L} + ) : + ImmutableList.of( + new Object[]{"b", 3L}, + new Object[]{"abc", 1L}, + new Object[]{"a", 5L}, + new Object[]{"", 1L}, + new Object[]{null, 2L} + ) + ); + } + + @Test + public void testUnnestWithJoinOnTheLeft() + { + skipVectorize(); + cannotVectorize(); + testQuery( + "SELECT d3 from (SELECT * from druid.numfoo JOIN (select dim2 as t from druid.numfoo where dim2 IN ('a','b','ab','abc')) ON dim2=t), UNNEST(MV_TO_ARRAY(dim3)) as unnested (d3)", + ImmutableList.of( + Druids.newScanQueryBuilder() + .dataSource(UnnestDataSource.create( + join( + new TableDataSource(CalciteTests.DATASOURCE3), + new QueryDataSource( + newScanQueryBuilder() + .dataSource( + new TableDataSource(CalciteTests.DATASOURCE3) + ) + .intervals(querySegmentSpec(Filtration.eternity())) + .resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST) + .legacy(false) + .filters(new InDimFilter("dim2", ImmutableList.of("a", "b", "ab", "abc"), null)) + .columns( + "dim2" + ) + .context(QUERY_CONTEXT_DEFAULT) + .build() + ), + "j0.", + "(\"dim2\" == \"j0.dim2\")", + JoinType.INNER + ), + "dim3", + "EXPR$0", + null + )) + .intervals(querySegmentSpec(Filtration.eternity())) + .resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST) + .legacy(false) + .context(QUERY_CONTEXT_DEFAULT) + .columns(ImmutableList.of( + "EXPR$0" + )) + .build() + ), + useDefault ? + ImmutableList.of( + new Object[]{"a"}, + new Object[]{"b"}, + new Object[]{"a"}, + new Object[]{"b"}, + new Object[]{""}, + new Object[]{""}, + new Object[]{""} + ) : + ImmutableList.of( + new Object[]{"a"}, + new Object[]{"b"}, + new Object[]{"a"}, + new Object[]{"b"}, + new Object[]{""}, + new Object[]{""}, + new Object[]{null} + ) + ); + } + + @Test + public void testUnnestWithConstant() + { + // Since there is a constant on the right, + // Druid will plan this as a join query + // as there is nothing to correlate between left and right + skipVectorize(); + cannotVectorize(); + testQuery( + "SELECT longs FROM druid.numfoo, UNNEST(ARRAY[1,2,3]) as unnested (longs)", + ImmutableList.of( + Druids.newScanQueryBuilder() + .dataSource( + join( + new TableDataSource(CalciteTests.DATASOURCE3), + new QueryDataSource( + newScanQueryBuilder() + .dataSource( + UnnestDataSource.create( + InlineDataSource.fromIterable( + ImmutableList.of(new Object[]{new Object[]{1L, 2L, 3L}}), + RowSignature.builder().add("inline", ColumnType.LONG_ARRAY).build() + ), + "inline", + "EXPR$0", + null + ) + ) + .intervals(querySegmentSpec(Filtration.eternity())) + .resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST) + .legacy(false) + .context(QUERY_CONTEXT_DEFAULT) + .columns(ImmutableList.of( + "EXPR$0" + )) + .build() + ), + "j0.", + "1", + JoinType.INNER + ) + ) + .intervals(querySegmentSpec(Filtration.eternity())) + .resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST) + .legacy(false) + .context(QUERY_CONTEXT_DEFAULT) + .columns(ImmutableList.of( + "j0.EXPR$0" + )) + .build() + ), + ImmutableList.of( + new Object[]{1}, + new Object[]{2}, + new Object[]{3}, + new Object[]{1}, + new Object[]{2}, + new Object[]{3}, + new Object[]{1}, + new Object[]{2}, + new Object[]{3}, + new Object[]{1}, + new Object[]{2}, + new Object[]{3}, + new Object[]{1}, + new Object[]{2}, + new Object[]{3}, + new Object[]{1}, + new Object[]{2}, + new Object[]{3} + ) + ); + } }