diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/DataSourcePlan.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/DataSourcePlan.java index d8481bf7a09..16eaef63c49 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/DataSourcePlan.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/DataSourcePlan.java @@ -51,6 +51,7 @@ import org.apache.druid.query.LookupDataSource; import org.apache.druid.query.QueryContext; import org.apache.druid.query.QueryDataSource; import org.apache.druid.query.TableDataSource; +import org.apache.druid.query.UnionDataSource; import org.apache.druid.query.UnnestDataSource; import org.apache.druid.query.filter.DimFilter; import org.apache.druid.query.planning.DataSourceAnalysis; @@ -170,6 +171,18 @@ public class DataSourcePlan minStageNumber, broadcast ); + } else if (dataSource instanceof UnionDataSource) { + return forUnion( + queryKit, + queryId, + queryContext, + (UnionDataSource) dataSource, + querySegmentSpec, + filter, + maxWorkerCount, + minStageNumber, + broadcast + ); } else if (dataSource instanceof JoinDataSource) { final JoinAlgorithm preferredJoinAlgorithm = PlannerContext.getJoinAlgorithm(queryContext); final JoinAlgorithm deducedJoinAlgorithm = deduceJoinAlgorithm( @@ -458,6 +471,54 @@ public class DataSourcePlan ); } + private static DataSourcePlan forUnion( + final QueryKit queryKit, + final String queryId, + final QueryContext queryContext, + final UnionDataSource unionDataSource, + final QuerySegmentSpec querySegmentSpec, + @Nullable DimFilter filter, + final int maxWorkerCount, + final int minStageNumber, + final boolean broadcast + ) + { + // This is done to prevent loss of generality since MSQ can plan any type of DataSource. 
+ List children = unionDataSource.getDataSources(); + + final QueryDefinitionBuilder subqueryDefBuilder = QueryDefinition.builder(); + final List newChildren = new ArrayList<>(); + final List inputSpecs = new ArrayList<>(); + final IntSet broadcastInputs = new IntOpenHashSet(); + + for (DataSource child : children) { + DataSourcePlan childDataSourcePlan = forDataSource( + queryKit, + queryId, + queryContext, + child, + querySegmentSpec, + filter, + maxWorkerCount, + Math.max(minStageNumber, subqueryDefBuilder.getNextStageNumber()), + broadcast + ); + + int shift = inputSpecs.size(); + + newChildren.add(shiftInputNumbers(childDataSourcePlan.getNewDataSource(), shift)); + inputSpecs.addAll(childDataSourcePlan.getInputSpecs()); + childDataSourcePlan.getSubQueryDefBuilder().ifPresent(subqueryDefBuilder::addAll); + childDataSourcePlan.getBroadcastInputs().forEach(inp -> broadcastInputs.add(inp + shift)); + } + return new DataSourcePlan( + new UnionDataSource(newChildren), + inputSpecs, + broadcastInputs, + subqueryDefBuilder + ); + } + /** * Build a plan for broadcast hash-join. 
*/ diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskSqlEngine.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskSqlEngine.java index e6578388a40..cb331760ca3 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskSqlEngine.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskSqlEngine.java @@ -114,6 +114,7 @@ public class MSQTaskSqlEngine implements SqlEngine case TIME_BOUNDARY_QUERY: case GROUPING_SETS: case WINDOW_FUNCTIONS: + case ALLOW_TOP_LEVEL_UNION_ALL: return false; case UNNEST: case CAN_SELECT: diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQFaultsTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQFaultsTest.java index 42bb1506a30..4b77dd78b33 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQFaultsTest.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQFaultsTest.java @@ -20,6 +20,8 @@ package org.apache.druid.msq.exec; import com.google.common.collect.ImmutableMap; +import org.apache.druid.error.DruidException; +import org.apache.druid.error.DruidExceptionMatcher; import org.apache.druid.indexing.common.actions.SegmentAllocateAction; import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.StringUtils; @@ -330,4 +332,46 @@ public class MSQFaultsTest extends MSQTestBase .setExpectedMSQFault(new TooManyInputFilesFault(numFiles, Limits.MAX_INPUT_FILES_PER_WORKER, 2)) .verifyResults(); } + + @Test + public void testUnionAllWithDifferentColumnNames() + { + // This test fails till MSQ can support arbitrary column names and column types for UNION ALL + testIngestQuery() + .setSql( + "INSERT INTO druid.dst " + + "SELECT dim2, dim1, m1 FROM foo2 " + + "UNION ALL " + + "SELECT dim1, dim2, m1 FROM foo " + + "PARTITIONED 
BY ALL TIME") + .setExpectedValidationErrorMatcher( + new DruidExceptionMatcher( + DruidException.Persona.ADMIN, + DruidException.Category.INVALID_INPUT, + "general" + ).expectMessageContains("SQL requires union between two tables and column names queried for each table are different " + + "Left: [dim2, dim1, m1], Right: [dim1, dim2, m1].")) + .verifyPlanningErrors(); + } + + @Test + public void testTopLevelUnionAllWithJoins() + { + // This test fails because it is a top level UNION ALL which cannot be planned using MSQ. It will be supported once + // we support arbitrary types and column names for UNION ALL + testSelectQuery() + .setSql( + "(SELECT COUNT(*) FROM foo INNER JOIN lookup.lookyloo ON foo.dim1 = lookyloo.k) " + + "UNION ALL " + + "(SELECT SUM(cnt) FROM foo)" + ) + .setExpectedValidationErrorMatcher( + new DruidExceptionMatcher( + DruidException.Persona.ADMIN, + DruidException.Category.INVALID_INPUT, + "general" + ).expectMessageContains( + "SQL requires union between inputs that are not simple table scans and involve a filter or aliasing")) + .verifyPlanningErrors(); + } } diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQSelectTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQSelectTest.java index ac9ca855a63..d771f7497a8 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQSelectTest.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQSelectTest.java @@ -51,6 +51,7 @@ import org.apache.druid.query.LookupDataSource; import org.apache.druid.query.Query; import org.apache.druid.query.QueryDataSource; import org.apache.druid.query.TableDataSource; +import org.apache.druid.query.UnionDataSource; import org.apache.druid.query.UnnestDataSource; import org.apache.druid.query.aggregation.CountAggregatorFactory; import org.apache.druid.query.aggregation.DoubleSumAggregatorFactory; @@ -1929,8 +1930,8 @@ public 
class MSQSelectTest extends MSQTestBase new ColumnMappings(ImmutableList.of( new ColumnMapping("d0", "cnt"), new ColumnMapping("a0", "cnt1") - ) )) + ) .tuningConfig(MSQTuningConfig.defaultConfig()) .destination(isDurableStorageDestination() ? DurableStorageMSQDestination.INSTANCE @@ -2322,6 +2323,64 @@ public class MSQSelectTest extends MSQTestBase .verifyResults(); } + @Test + public void testUnionAllUsingUnionDataSource() + { + + final RowSignature rowSignature = RowSignature.builder() + .add("__time", ColumnType.LONG) + .add("dim1", ColumnType.STRING) + .build(); + + final List results = ImmutableList.of( + new Object[]{946684800000L, ""}, + new Object[]{946684800000L, ""}, + new Object[]{946771200000L, "10.1"}, + new Object[]{946771200000L, "10.1"}, + new Object[]{946857600000L, "2"}, + new Object[]{946857600000L, "2"}, + new Object[]{978307200000L, "1"}, + new Object[]{978307200000L, "1"}, + new Object[]{978393600000L, "def"}, + new Object[]{978393600000L, "def"}, + new Object[]{978480000000L, "abc"}, + new Object[]{978480000000L, "abc"} + ); + // This plans the query using DruidUnionDataSourceRule since the DruidUnionDataSourceRule#isCompatible + // returns true (column names, types match, and it is a union on the table data sources). 
+ // It gets planned correctly, however MSQ engine cannot plan the query correctly + testSelectQuery() + .setSql("SELECT __time, dim1 FROM foo\n" + + "UNION ALL\n" + + "SELECT __time, dim1 FROM foo\n") + .setExpectedRowSignature(rowSignature) + .setExpectedMSQSpec( + MSQSpec.builder() + .query(newScanQueryBuilder() + .dataSource(new UnionDataSource( + ImmutableList.of(new TableDataSource("foo"), new TableDataSource("foo")) + )) + .intervals(querySegmentSpec(Filtration.eternity())) + .resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST) + .legacy(false) + .context(defaultScanQueryContext( + context, + rowSignature + )) + .columns(ImmutableList.of("__time", "dim1")) + .build()) + .columnMappings(ColumnMappings.identity(rowSignature)) + .tuningConfig(MSQTuningConfig.defaultConfig()) + .destination(isDurableStorageDestination() + ? DurableStorageMSQDestination.INSTANCE + : TaskReportMSQDestination.INSTANCE) + .build() + ) + .setQueryContext(context) + .setExpectedResultRows(results) + .verifyResults(); + } + @Nonnull private List expectedMultiValueFooRowsGroup() { diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/CalciteUnionQueryMSQTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/CalciteUnionQueryMSQTest.java new file mode 100644 index 00000000000..8ee9e78c838 --- /dev/null +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/CalciteUnionQueryMSQTest.java @@ -0,0 +1,183 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.msq.test; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.ImmutableList; +import com.google.inject.Injector; +import com.google.inject.Module; +import org.apache.druid.common.config.NullHandling; +import org.apache.druid.guice.DruidInjectorBuilder; +import org.apache.druid.java.util.common.granularity.Granularities; +import org.apache.druid.msq.exec.WorkerMemoryParameters; +import org.apache.druid.msq.sql.MSQTaskSqlEngine; +import org.apache.druid.query.QueryDataSource; +import org.apache.druid.query.TableDataSource; +import org.apache.druid.query.UnionDataSource; +import org.apache.druid.query.aggregation.CountAggregatorFactory; +import org.apache.druid.query.aggregation.LongSumAggregatorFactory; +import org.apache.druid.query.dimension.DefaultDimensionSpec; +import org.apache.druid.query.groupby.GroupByQuery; +import org.apache.druid.query.groupby.TestGroupByBuffers; +import org.apache.druid.server.QueryLifecycleFactory; +import org.apache.druid.sql.calcite.BaseCalciteQueryTest; +import org.apache.druid.sql.calcite.CalciteUnionQueryTest; +import org.apache.druid.sql.calcite.QueryTestBuilder; +import org.apache.druid.sql.calcite.filtration.Filtration; +import org.apache.druid.sql.calcite.run.SqlEngine; +import org.apache.druid.sql.calcite.util.CalciteTests; +import org.junit.After; +import org.junit.Before; +import org.junit.Ignore; +import org.junit.Test; + +/** + * Runs {@link CalciteUnionQueryTest} but with MSQ engine + */ +public class CalciteUnionQueryMSQTest extends 
CalciteUnionQueryTest +{ + private TestGroupByBuffers groupByBuffers; + + @Before + public void setup2() + { + groupByBuffers = TestGroupByBuffers.createDefault(); + } + + @After + public void teardown2() + { + groupByBuffers.close(); + } + + @Override + public void configureGuice(DruidInjectorBuilder builder) + { + super.configureGuice(builder); + builder.addModules(CalciteMSQTestsHelper.fetchModules(temporaryFolder, groupByBuffers).toArray(new Module[0])); + } + + + @Override + public SqlEngine createEngine( + QueryLifecycleFactory qlf, + ObjectMapper queryJsonMapper, + Injector injector + ) + { + final WorkerMemoryParameters workerMemoryParameters = + WorkerMemoryParameters.createInstance( + WorkerMemoryParameters.PROCESSING_MINIMUM_BYTES * 50, + 2, + 10, + 2, + 0, + 0 + ); + final MSQTestOverlordServiceClient indexingServiceClient = new MSQTestOverlordServiceClient( + queryJsonMapper, + injector, + new MSQTestTaskActionClient(queryJsonMapper), + workerMemoryParameters + ); + return new MSQTaskSqlEngine(indexingServiceClient, queryJsonMapper); + } + + @Override + protected QueryTestBuilder testBuilder() + { + return new QueryTestBuilder(new BaseCalciteQueryTest.CalciteTestConfig(true)) + .addCustomRunner(new ExtractResultsFactory(() -> (MSQTestOverlordServiceClient) ((MSQTaskSqlEngine) queryFramework().engine()).overlordClient())) + .skipVectorize(true) + .verifyNativeQueries(new VerifyMSQSupportedNativeQueriesPredicate()) + .msqCompatible(msqCompatible); + } + + /** + * Generates a different error hint than what is required by the native engine, since planner does try to plan "UNION" + * using group by, however fails due to the column name mismatch. + * MSQ does want to support any type of data source, with least restrictive column names and types, therefore it + * should eventually work. 
+ */ + @Test + @Override + public void testUnionIsUnplannable() + { + assertQueryIsUnplannable( + "SELECT dim2, dim1, m1 FROM foo2 UNION SELECT dim1, dim2, m1 FROM foo", + "SQL requires union between two tables and column names queried for each table are different Left: [dim2, dim1, m1], Right: [dim1, dim2, m1]." + ); + + } + + @Ignore("Ignored till MSQ can plan UNION ALL with any operand") + @Test + public void testUnionOnSubqueries() + { + testQuery( + "SELECT\n" + + " SUM(cnt),\n" + + " COUNT(*)\n" + + "FROM (\n" + + " (SELECT dim2, SUM(cnt) AS cnt FROM druid.foo GROUP BY dim2)\n" + + " UNION ALL\n" + + " (SELECT dim2, SUM(cnt) AS cnt FROM druid.foo GROUP BY dim2)\n" + + ")", + ImmutableList.of( + GroupByQuery.builder() + .setDataSource( + new QueryDataSource( + GroupByQuery.builder() + .setDataSource( + new UnionDataSource( + ImmutableList.of( + new TableDataSource(CalciteTests.DATASOURCE1), + new TableDataSource(CalciteTests.DATASOURCE1) + ) + ) + ) + .setInterval(querySegmentSpec(Filtration.eternity())) + .setGranularity(Granularities.ALL) + .setDimensions(dimensions(new DefaultDimensionSpec("dim2", "d0"))) + .setAggregatorSpecs(aggregators(new LongSumAggregatorFactory("a0", "cnt"))) + .setContext(QUERY_CONTEXT_DEFAULT) + .build() + ) + ) + .setInterval(querySegmentSpec(Filtration.eternity())) + .setGranularity(Granularities.ALL) + .setAggregatorSpecs(aggregators( + new LongSumAggregatorFactory("_a0", "a0"), + new CountAggregatorFactory("_a1") + )) + .setContext(QUERY_CONTEXT_DEFAULT) + .build() + ), + NullHandling.replaceWithDefault() ? 
+ ImmutableList.of( + new Object[]{12L, 3L} + ) : + ImmutableList.of( + new Object[]{12L, 4L} + ) + ); + } + +} diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java index 4d97b911a09..31ece253ebd 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java @@ -1412,9 +1412,11 @@ public class MSQTestBase extends BaseCalciteQueryTest public void verifyResults() { - Preconditions.checkArgument(expectedResultRows != null, "Result rows cannot be null"); - Preconditions.checkArgument(expectedRowSignature != null, "Row signature cannot be null"); - Preconditions.checkArgument(expectedMSQSpec != null, "MultiStageQuery Query spec cannot be null "); + if (expectedMSQFault == null) { + Preconditions.checkArgument(expectedResultRows != null, "Result rows cannot be null"); + Preconditions.checkArgument(expectedRowSignature != null, "Row signature cannot be null"); + Preconditions.checkArgument(expectedMSQSpec != null, "MultiStageQuery Query spec cannot be null "); + } Pair, List>> specAndResults = runQueryWithResult(); if (specAndResults == null) { // A fault was expected and the assertion has been done in the runQueryWithResult diff --git a/processing/src/main/java/org/apache/druid/query/UnionDataSource.java b/processing/src/main/java/org/apache/druid/query/UnionDataSource.java index 3f538f5ad5a..27a0113d76f 100644 --- a/processing/src/main/java/org/apache/druid/query/UnionDataSource.java +++ b/processing/src/main/java/org/apache/druid/query/UnionDataSource.java @@ -23,11 +23,12 @@ package org.apache.druid.query; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.collect.ImmutableList; -import 
com.google.common.collect.Iterables; +import org.apache.druid.error.DruidException; import org.apache.druid.java.util.common.IAE; import org.apache.druid.java.util.common.ISE; import org.apache.druid.query.planning.DataSourceAnalysis; import org.apache.druid.segment.SegmentReference; +import org.apache.druid.utils.CollectionUtils; import java.util.Collections; import java.util.List; @@ -36,13 +37,24 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.function.Function; import java.util.stream.Collectors; +/** + * Represents a UNION ALL of two or more datasources. + * + * Native engine can only work with table datasources that are scans or simple mappings (column rename without any + * expression applied on top). Therefore, it uses methods like {@link #getTableNames()} and + * {@link #getDataSourcesAsTableDataSources()} to assert that the children were TableDataSources. + * + * MSQ should be able to plan and work with arbitrary datasources. It also needs to replace the datasource with the + * InputNumberDataSource while preparing the query plan. 
+ */ public class UnionDataSource implements DataSource { - @JsonProperty - private final List dataSources; + + @JsonProperty("dataSources") + private final List dataSources; @JsonCreator - public UnionDataSource(@JsonProperty("dataSources") List dataSources) + public UnionDataSource(@JsonProperty("dataSources") List dataSources) { if (dataSources == null || dataSources.isEmpty()) { throw new ISE("'dataSources' must be non-null and non-empty for 'union'"); @@ -51,18 +63,45 @@ public class UnionDataSource implements DataSource this.dataSources = dataSources; } + public List getDataSources() + { + return dataSources; + } + + + /** + * Asserts that the children of the union are all table data sources before returning the table names + */ @Override public Set getTableNames() { - return dataSources.stream() - .map(input -> Iterables.getOnlyElement(input.getTableNames())) - .collect(Collectors.toSet()); + return dataSources + .stream() + .map(input -> { + if (!(input instanceof TableDataSource)) { + throw DruidException.defensive("should be table"); + } + return CollectionUtils.getOnlyElement( + input.getTableNames(), + xs -> DruidException.defensive("Expected only single table name in TableDataSource") + ); + }) + .collect(Collectors.toSet()); } - @JsonProperty - public List getDataSources() + /** + * Asserts that the children of the union are all table data sources + */ + public List getDataSourcesAsTableDataSources() { - return dataSources; + return dataSources.stream() + .map(input -> { + if (!(input instanceof TableDataSource)) { + throw DruidException.defensive("should be table"); + } + return (TableDataSource) input; + }) + .collect(Collectors.toList()); } @Override @@ -78,13 +117,7 @@ public class UnionDataSource implements DataSource throw new IAE("Expected [%d] children, got [%d]", dataSources.size(), children.size()); } - if (!children.stream().allMatch(dataSource -> dataSource instanceof TableDataSource)) { - throw new IAE("All children must be tables"); - } 
- - return new UnionDataSource( - children.stream().map(dataSource -> (TableDataSource) dataSource).collect(Collectors.toList()) - ); + return new UnionDataSource(children); } @Override @@ -149,11 +182,7 @@ public class UnionDataSource implements DataSource UnionDataSource that = (UnionDataSource) o; - if (!dataSources.equals(that.dataSources)) { - return false; - } - - return true; + return dataSources.equals(that.dataSources); } @Override diff --git a/processing/src/main/java/org/apache/druid/query/UnionQueryRunner.java b/processing/src/main/java/org/apache/druid/query/UnionQueryRunner.java index aeb3897e644..5459e1d8c22 100644 --- a/processing/src/main/java/org/apache/druid/query/UnionQueryRunner.java +++ b/processing/src/main/java/org/apache/druid/query/UnionQueryRunner.java @@ -57,16 +57,16 @@ public class UnionQueryRunner implements QueryRunner final UnionDataSource unionDataSource = analysis.getBaseUnionDataSource().get(); - if (unionDataSource.getDataSources().isEmpty()) { + if (unionDataSource.getDataSourcesAsTableDataSources().isEmpty()) { // Shouldn't happen, because UnionDataSource doesn't allow empty unions. throw new ISE("Unexpectedly received empty union"); - } else if (unionDataSource.getDataSources().size() == 1) { + } else if (unionDataSource.getDataSourcesAsTableDataSources().size() == 1) { // Single table. Run as a normal query. 
return baseRunner.run( queryPlus.withQuery( Queries.withBaseDataSource( query, - Iterables.getOnlyElement(unionDataSource.getDataSources()) + Iterables.getOnlyElement(unionDataSource.getDataSourcesAsTableDataSources()) ) ), responseContext @@ -77,8 +77,8 @@ public class UnionQueryRunner implements QueryRunner query.getResultOrdering(), Sequences.simple( Lists.transform( - IntStream.range(0, unionDataSource.getDataSources().size()) - .mapToObj(i -> new Pair<>(unionDataSource.getDataSources().get(i), i + 1)) + IntStream.range(0, unionDataSource.getDataSourcesAsTableDataSources().size()) + .mapToObj(i -> new Pair<>(unionDataSource.getDataSourcesAsTableDataSources().get(i), i + 1)) .collect(Collectors.toList()), (Function, Sequence>) singleSourceWithIndex -> baseRunner.run( diff --git a/processing/src/test/java/org/apache/druid/query/DataSourceTest.java b/processing/src/test/java/org/apache/druid/query/DataSourceTest.java index 7c7f50f281b..e7850953a60 100644 --- a/processing/src/test/java/org/apache/druid/query/DataSourceTest.java +++ b/processing/src/test/java/org/apache/druid/query/DataSourceTest.java @@ -89,7 +89,7 @@ public class DataSourceTest Assert.assertTrue(dataSource instanceof UnionDataSource); Assert.assertEquals( Lists.newArrayList(new TableDataSource("ds1"), new TableDataSource("ds2")), - Lists.newArrayList(((UnionDataSource) dataSource).getDataSources()) + Lists.newArrayList(((UnionDataSource) dataSource).getDataSourcesAsTableDataSources()) ); Assert.assertEquals( ImmutableSet.of("ds1", "ds2"), diff --git a/processing/src/test/java/org/apache/druid/query/UnionDataSourceTest.java b/processing/src/test/java/org/apache/druid/query/UnionDataSourceTest.java index f408e71abf2..12522df08df 100644 --- a/processing/src/test/java/org/apache/druid/query/UnionDataSourceTest.java +++ b/processing/src/test/java/org/apache/druid/query/UnionDataSourceTest.java @@ -123,7 +123,7 @@ public class UnionDataSourceTest @Test public void test_withChildren_sameNumber() { - 
final List newDataSources = ImmutableList.of( + final List newDataSources = ImmutableList.of( new TableDataSource("baz"), new TableDataSource("qux") ); diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidUnionDataSourceRel.java b/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidUnionDataSourceRel.java index 5e213de711c..dbbcfa0f9a3 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidUnionDataSourceRel.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidUnionDataSourceRel.java @@ -118,7 +118,7 @@ public class DruidUnionDataSourceRel extends DruidRel @Override public DruidQuery toDruidQuery(final boolean finalizeAggregations) { - final List dataSources = new ArrayList<>(); + final List dataSources = new ArrayList<>(); RowSignature signature = null; for (final RelNode relNode : unionRel.getInputs()) { diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/rule/DruidRules.java b/sql/src/main/java/org/apache/druid/sql/calcite/rule/DruidRules.java index 8ca4ab076d9..dfcf1652c0d 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/rule/DruidRules.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/rule/DruidRules.java @@ -95,13 +95,16 @@ public class DruidRules DruidOuterQueryRule.WHERE_FILTER, DruidOuterQueryRule.SELECT_PROJECT, DruidOuterQueryRule.SORT, - new DruidUnionRule(plannerContext), + new DruidUnionRule(plannerContext), // Add top level union rule since it helps in constructing a cleaner error message for the user new DruidUnionDataSourceRule(plannerContext), - DruidSortUnionRule.instance(), DruidJoinRule.instance(plannerContext) ) ); + if (plannerContext.featureAvailable(EngineFeature.ALLOW_TOP_LEVEL_UNION_ALL)) { + retVal.add(DruidSortUnionRule.instance()); + } + if (plannerContext.featureAvailable(EngineFeature.WINDOW_FUNCTIONS)) { retVal.add(new DruidQueryRule<>(Window.class, PartialDruidQuery.Stage.WINDOW, PartialDruidQuery::withWindow)); retVal.add( diff --git 
a/sql/src/main/java/org/apache/druid/sql/calcite/rule/DruidSortUnionRule.java b/sql/src/main/java/org/apache/druid/sql/calcite/rule/DruidSortUnionRule.java index daf1162ac44..d06c39d72b5 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/rule/DruidSortUnionRule.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/rule/DruidSortUnionRule.java @@ -32,8 +32,9 @@ import java.util.Collections; */ public class DruidSortUnionRule extends RelOptRule { - private static final DruidSortUnionRule INSTANCE = new DruidSortUnionRule(); + private static final DruidSortUnionRule INSTANCE = new DruidSortUnionRule(); + private DruidSortUnionRule() { super(operand(Sort.class, operand(DruidUnionRel.class, any()))); diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/rule/DruidUnionDataSourceRule.java b/sql/src/main/java/org/apache/druid/sql/calcite/rule/DruidUnionDataSourceRule.java index 99f6248b37d..e4a72776315 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/rule/DruidUnionDataSourceRule.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/rule/DruidUnionDataSourceRule.java @@ -112,7 +112,12 @@ public class DruidUnionDataSourceRule extends RelOptRule // Can only do UNION ALL of inputs that have compatible schemas (or schema mappings) and right side // is a simple table scan - public static boolean isCompatible(final Union unionRel, final DruidRel first, final DruidRel second, @Nullable PlannerContext plannerContext) + public static boolean isCompatible( + final Union unionRel, + final DruidRel first, + final DruidRel second, + @Nullable PlannerContext plannerContext + ) { if (!(second instanceof DruidQueryRel)) { return false; diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/rule/DruidUnionRule.java b/sql/src/main/java/org/apache/druid/sql/calcite/rule/DruidUnionRule.java index 40cb2161c15..58fddbf933f 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/rule/DruidUnionRule.java +++ 
b/sql/src/main/java/org/apache/druid/sql/calcite/rule/DruidUnionRule.java @@ -26,6 +26,7 @@ import org.apache.calcite.rel.core.Union; import org.apache.druid.sql.calcite.planner.PlannerContext; import org.apache.druid.sql.calcite.rel.DruidRel; import org.apache.druid.sql.calcite.rel.DruidUnionRel; +import org.apache.druid.sql.calcite.run.EngineFeature; import java.util.List; @@ -51,6 +52,10 @@ public class DruidUnionRule extends RelOptRule @Override public boolean matches(RelOptRuleCall call) { + if (plannerContext != null && !plannerContext.featureAvailable(EngineFeature.ALLOW_TOP_LEVEL_UNION_ALL)) { + plannerContext.setPlanningError("Queries cannot be planned using top level union all"); + return false; + } // Make DruidUnionRule and DruidUnionDataSourceRule mutually exclusive. final Union unionRel = call.rel(0); final DruidRel firstDruidRel = call.rel(1); diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/run/EngineFeature.java b/sql/src/main/java/org/apache/druid/sql/calcite/run/EngineFeature.java index 94827c2955d..778c7ec03b6 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/run/EngineFeature.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/run/EngineFeature.java @@ -102,5 +102,21 @@ public enum EngineFeature * that it actually *does* generate correct results in native when the join is processed on the Broker. It is much * less likely that MSQ will plan in such a way that generates correct results. */ - ALLOW_BROADCAST_RIGHTY_JOIN; + ALLOW_BROADCAST_RIGHTY_JOIN, + + /** + * Planner is permitted to use {@link org.apache.druid.sql.calcite.rel.DruidUnionRel} to plan the top level UNION ALL. + * This is to dissuade planner from accepting and running the UNION ALL queries that are not supported by engines + * (primarily MSQ). 
+ * + * Due to the nature of the execution of the top level UNION ALLs (we run the individual queries and concat the + * results), it only makes sense to enable this on engines where the queries return the results synchronously. + * + * Planning queries with top level UNION_ALL leads to undesirable behaviour with asynchronous engines like MSQ. + * To enumerate this behaviour for MSQ, the broker attempts to run the individual queries as MSQ queries in succession, + * submits the first query correctly, fails on the rest of the queries (due to conflicting taskIds), + * and cannot concat the results together (as the result for broker is the query id). Therefore, we don't get the + * correct result back, while the MSQ engine is executing the partial query + */ + ALLOW_TOP_LEVEL_UNION_ALL; } diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/run/NativeSqlEngine.java b/sql/src/main/java/org/apache/druid/sql/calcite/run/NativeSqlEngine.java index d7fc7d043b6..164e02a0ca8 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/run/NativeSqlEngine.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/run/NativeSqlEngine.java @@ -105,6 +105,7 @@ public class NativeSqlEngine implements SqlEngine case WINDOW_FUNCTIONS: case UNNEST: case ALLOW_BROADCAST_RIGHTY_JOIN: + case ALLOW_TOP_LEVEL_UNION_ALL: return true; case TIME_BOUNDARY_QUERY: return plannerContext.queryContext().isTimeBoundaryPlanningEnabled(); diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/view/ViewSqlEngine.java b/sql/src/main/java/org/apache/druid/sql/calcite/view/ViewSqlEngine.java index cd719d7f29f..e2ce813a37f 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/view/ViewSqlEngine.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/view/ViewSqlEngine.java @@ -63,6 +63,7 @@ public class ViewSqlEngine implements SqlEngine case GROUPING_SETS: case WINDOW_FUNCTIONS: case UNNEST: + case ALLOW_TOP_LEVEL_UNION_ALL: return true; // Views can't sit on top of INSERT or REPLACE. 
case CAN_INSERT: diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java index 4042def2750..d49f7de9dd5 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java @@ -2874,476 +2874,6 @@ public class CalciteQueryTest extends BaseCalciteQueryTest ); } - @DecoupledIgnore - @Test - public void testUnionAllDifferentTablesWithMapping() - { - msqIncompatible(); - testQuery( - "SELECT\n" - + "dim1, dim2, SUM(m1), COUNT(*)\n" - + "FROM (SELECT dim1, dim2, m1 FROM foo UNION ALL SELECT dim1, dim2, m1 FROM numfoo)\n" - + "WHERE dim2 = 'a' OR dim2 = 'def'\n" - + "GROUP BY 1, 2", - ImmutableList.of( - GroupByQuery.builder() - .setDataSource( - new UnionDataSource( - ImmutableList.of( - new TableDataSource(CalciteTests.DATASOURCE1), - new TableDataSource(CalciteTests.DATASOURCE3) - ) - ) - ) - .setInterval(querySegmentSpec(Filtration.eternity())) - .setGranularity(Granularities.ALL) - .setDimFilter(in("dim2", ImmutableList.of("def", "a"), null)) - .setDimensions( - new DefaultDimensionSpec("dim1", "d0"), - new DefaultDimensionSpec("dim2", "d1") - ) - .setAggregatorSpecs( - aggregators( - new DoubleSumAggregatorFactory("a0", "m1"), - new CountAggregatorFactory("a1") - ) - ) - .setContext(QUERY_CONTEXT_DEFAULT) - .build() - ), - ImmutableList.of( - new Object[]{"", "a", 2.0, 2L}, - new Object[]{"1", "a", 8.0, 2L} - ) - ); - } - - @DecoupledIgnore(mode = Modes.NOT_ENOUGH_RULES) - @Test - public void testJoinUnionAllDifferentTablesWithMapping() - { - msqIncompatible(); - testQuery( - "SELECT\n" - + "dim1, dim2, SUM(m1), COUNT(*)\n" - + "FROM (SELECT dim1, dim2, m1 FROM foo UNION ALL SELECT dim1, dim2, m1 FROM numfoo)\n" - + "WHERE dim2 = 'a' OR dim2 = 'def'\n" - + "GROUP BY 1, 2", - ImmutableList.of( - GroupByQuery.builder() - .setDataSource( - new UnionDataSource( - ImmutableList.of( - new 
TableDataSource(CalciteTests.DATASOURCE1), - new TableDataSource(CalciteTests.DATASOURCE3) - ) - ) - ) - .setInterval(querySegmentSpec(Filtration.eternity())) - .setGranularity(Granularities.ALL) - .setDimFilter(in("dim2", ImmutableList.of("def", "a"), null)) - .setDimensions( - new DefaultDimensionSpec("dim1", "d0"), - new DefaultDimensionSpec("dim2", "d1") - ) - .setAggregatorSpecs( - aggregators( - new DoubleSumAggregatorFactory("a0", "m1"), - new CountAggregatorFactory("a1") - ) - ) - .setContext(QUERY_CONTEXT_DEFAULT) - .build() - ), - ImmutableList.of( - new Object[]{"", "a", 2.0, 2L}, - new Object[]{"1", "a", 8.0, 2L} - ) - ); - } - - @Test - public void testUnionAllTablesColumnCountMismatch() - { - try { - testQuery( - "SELECT\n" - + "dim1, dim2, SUM(m1), COUNT(*)\n" - + "FROM (SELECT * FROM foo UNION ALL SELECT * FROM numfoo)\n" - + "WHERE dim2 = 'a' OR dim2 = 'def'\n" - + "GROUP BY 1, 2", - ImmutableList.of(), - ImmutableList.of() - ); - Assert.fail("query execution should fail"); - } - catch (DruidException e) { - MatcherAssert.assertThat(e, invalidSqlIs("Column count mismatch in UNION ALL (line [3], column [42])")); - } - } - - @DecoupledIgnore(mode = Modes.NOT_ENOUGH_RULES) - @Test - public void testUnionAllTablesColumnTypeMismatchFloatLong() - { - msqIncompatible(); - // "m1" has a different type in foo and foo2 (float vs long), but this query is OK anyway because they can both - // be implicitly cast to double. 
- - testQuery( - "SELECT\n" - + "dim1, dim2, SUM(m1), COUNT(*)\n" - + "FROM (SELECT dim1, dim2, m1 FROM foo2 UNION ALL SELECT dim1, dim2, m1 FROM foo)\n" - + "WHERE dim2 = 'a' OR dim2 = 'en'\n" - + "GROUP BY 1, 2", - ImmutableList.of( - GroupByQuery.builder() - .setDataSource( - new UnionDataSource( - ImmutableList.of( - new TableDataSource(CalciteTests.DATASOURCE2), - new TableDataSource(CalciteTests.DATASOURCE1) - ) - ) - ) - .setInterval(querySegmentSpec(Filtration.eternity())) - .setGranularity(Granularities.ALL) - .setDimFilter(in("dim2", ImmutableList.of("en", "a"), null)) - .setDimensions( - new DefaultDimensionSpec("dim1", "d0"), - new DefaultDimensionSpec("dim2", "d1") - ) - .setAggregatorSpecs( - aggregators( - new DoubleSumAggregatorFactory("a0", "m1"), - new CountAggregatorFactory("a1") - ) - ) - .setContext(QUERY_CONTEXT_DEFAULT) - .build() - ), - ImmutableList.of( - new Object[]{"", "a", 1.0, 1L}, - new Object[]{"1", "a", 4.0, 1L}, - new Object[]{"druid", "en", 1.0, 1L} - ) - ); - } - - @DecoupledIgnore(mode = Modes.ERROR_HANDLING) - @Test - public void testUnionAllTablesColumnTypeMismatchStringLong() - { - // "dim3" has a different type in foo and foo2 (string vs long), which requires a casting subquery, so this - // query cannot be planned. - - assertQueryIsUnplannable( - "SELECT\n" - + "dim3, dim2, SUM(m1), COUNT(*)\n" - + "FROM (SELECT dim3, dim2, m1 FROM foo2 UNION ALL SELECT dim3, dim2, m1 FROM foo)\n" - + "WHERE dim2 = 'a' OR dim2 = 'en'\n" - + "GROUP BY 1, 2", - "SQL requires union between inputs that are not simple table scans and involve a " + - "filter or aliasing. Or column types of tables being unioned are not of same type." - ); - } - - @DecoupledIgnore(mode = Modes.ERROR_HANDLING) - @Test - public void testUnionAllTablesWhenMappingIsRequired() - { - // Cannot plan this UNION ALL operation, because the column swap would require generating a subquery. 
- - assertQueryIsUnplannable( - "SELECT\n" - + "c, COUNT(*)\n" - + "FROM (SELECT dim1 AS c, m1 FROM foo UNION ALL SELECT dim2 AS c, m1 FROM numfoo)\n" - + "WHERE c = 'a' OR c = 'def'\n" - + "GROUP BY 1", - "SQL requires union between two tables " + - "and column names queried for each table are different Left: [dim1], Right: [dim2]." - ); - } - - @DecoupledIgnore(mode = Modes.ERROR_HANDLING) - @Test - public void testUnionIsUnplannable() - { - // Cannot plan this UNION operation - assertQueryIsUnplannable( - "SELECT dim2, dim1, m1 FROM foo2 UNION SELECT dim1, dim2, m1 FROM foo", - "SQL requires 'UNION' but only 'UNION ALL' is supported." - ); - } - - @DecoupledIgnore(mode = Modes.ERROR_HANDLING) - @Test - public void testUnionAllTablesWhenCastAndMappingIsRequired() - { - // Cannot plan this UNION ALL operation, because the column swap would require generating a subquery. - assertQueryIsUnplannable( - "SELECT\n" - + "c, COUNT(*)\n" - + "FROM (SELECT dim1 AS c, m1 FROM foo UNION ALL SELECT cnt AS c, m1 FROM numfoo)\n" - + "WHERE c = 'a' OR c = 'def'\n" - + "GROUP BY 1", - "SQL requires union between inputs that are not simple table scans and involve " + - "a filter or aliasing. Or column types of tables being unioned are not of same type." 
- ); - } - - @DecoupledIgnore - @Test - public void testUnionAllSameTableTwice() - { - msqIncompatible(); - testQuery( - "SELECT\n" - + "dim1, dim2, SUM(m1), COUNT(*)\n" - + "FROM (SELECT * FROM foo UNION ALL SELECT * FROM foo)\n" - + "WHERE dim2 = 'a' OR dim2 = 'def'\n" - + "GROUP BY 1, 2", - ImmutableList.of( - GroupByQuery.builder() - .setDataSource( - new UnionDataSource( - ImmutableList.of( - new TableDataSource(CalciteTests.DATASOURCE1), - new TableDataSource(CalciteTests.DATASOURCE1) - ) - ) - ) - .setInterval(querySegmentSpec(Filtration.eternity())) - .setGranularity(Granularities.ALL) - .setDimFilter(in("dim2", ImmutableList.of("def", "a"), null)) - .setDimensions( - new DefaultDimensionSpec("dim1", "d0"), - new DefaultDimensionSpec("dim2", "d1") - ) - .setAggregatorSpecs( - aggregators( - new DoubleSumAggregatorFactory("a0", "m1"), - new CountAggregatorFactory("a1") - ) - ) - .setContext(QUERY_CONTEXT_DEFAULT) - .build() - ), - ImmutableList.of( - new Object[]{"", "a", 2.0, 2L}, - new Object[]{"1", "a", 8.0, 2L} - ) - ); - } - - @DecoupledIgnore(mode = Modes.NOT_ENOUGH_RULES) - @Test - public void testUnionAllSameTableTwiceWithSameMapping() - { - msqIncompatible(); - testQuery( - "SELECT\n" - + "dim1, dim2, SUM(m1), COUNT(*)\n" - + "FROM (SELECT dim1, dim2, m1 FROM foo UNION ALL SELECT dim1, dim2, m1 FROM foo)\n" - + "WHERE dim2 = 'a' OR dim2 = 'def'\n" - + "GROUP BY 1, 2", - ImmutableList.of( - GroupByQuery.builder() - .setDataSource( - new UnionDataSource( - ImmutableList.of( - new TableDataSource(CalciteTests.DATASOURCE1), - new TableDataSource(CalciteTests.DATASOURCE1) - ) - ) - ) - .setInterval(querySegmentSpec(Filtration.eternity())) - .setGranularity(Granularities.ALL) - .setDimFilter(in("dim2", ImmutableList.of("def", "a"), null)) - .setDimensions( - new DefaultDimensionSpec("dim1", "d0"), - new DefaultDimensionSpec("dim2", "d1") - ) - .setAggregatorSpecs( - aggregators( - new DoubleSumAggregatorFactory("a0", "m1"), - new 
CountAggregatorFactory("a1") - ) - ) - .setContext(QUERY_CONTEXT_DEFAULT) - .build() - ), - ImmutableList.of( - new Object[]{"", "a", 2.0, 2L}, - new Object[]{"1", "a", 8.0, 2L} - ) - ); - } - - @DecoupledIgnore(mode = Modes.ERROR_HANDLING) - @Test - public void testUnionAllSameTableTwiceWithDifferentMapping() - { - // Cannot plan this UNION ALL operation, because the column swap would require generating a subquery. - assertQueryIsUnplannable( - "SELECT\n" - + "dim1, dim2, SUM(m1), COUNT(*)\n" - + "FROM (SELECT dim1, dim2, m1 FROM foo UNION ALL SELECT dim2, dim1, m1 FROM foo)\n" - + "WHERE dim2 = 'a' OR dim2 = 'def'\n" - + "GROUP BY 1, 2", - "SQL requires union between two tables and column names queried for each table are different Left: [dim1, dim2, m1], Right: [dim2, dim1, m1]." - ); - } - @DecoupledIgnore - @Test - public void testUnionAllSameTableThreeTimes() - { - msqIncompatible(); - testQuery( - "SELECT\n" - + "dim1, dim2, SUM(m1), COUNT(*)\n" - + "FROM (SELECT * FROM foo UNION ALL SELECT * FROM foo UNION ALL SELECT * FROM foo)\n" - + "WHERE dim2 = 'a' OR dim2 = 'def'\n" - + "GROUP BY 1, 2", - ImmutableList.of( - GroupByQuery.builder() - .setDataSource( - new UnionDataSource( - ImmutableList.of( - new TableDataSource(CalciteTests.DATASOURCE1), - new TableDataSource(CalciteTests.DATASOURCE1), - new TableDataSource(CalciteTests.DATASOURCE1) - ) - ) - ) - .setInterval(querySegmentSpec(Filtration.eternity())) - .setGranularity(Granularities.ALL) - .setDimFilter(in("dim2", ImmutableList.of("def", "a"), null)) - .setDimensions( - new DefaultDimensionSpec("dim1", "d0"), - new DefaultDimensionSpec("dim2", "d1") - ) - .setAggregatorSpecs( - aggregators( - new DoubleSumAggregatorFactory("a0", "m1"), - new CountAggregatorFactory("a1") - ) - ) - .setContext(QUERY_CONTEXT_DEFAULT) - .build() - ), - ImmutableList.of( - new Object[]{"", "a", 3.0, 3L}, - new Object[]{"1", "a", 12.0, 3L} - ) - ); - } - - @Test - public void testUnionAllThreeTablesColumnCountMismatch1() - { 
- try { - testQuery( - "SELECT\n" - + "dim1, dim2, SUM(m1), COUNT(*)\n" - + "FROM (SELECT * FROM numfoo UNION ALL SELECT * FROM foo UNION ALL SELECT * from foo)\n" - + "WHERE dim2 = 'a' OR dim2 = 'def'\n" - + "GROUP BY 1, 2", - ImmutableList.of(), - ImmutableList.of() - ); - Assert.fail("query execution should fail"); - } - catch (DruidException e) { - MatcherAssert.assertThat(e, invalidSqlIs("Column count mismatch in UNION ALL (line [3], column [45])")); - } - } - - @Test - public void testUnionAllThreeTablesColumnCountMismatch2() - { - try { - testQuery( - "SELECT\n" - + "dim1, dim2, SUM(m1), COUNT(*)\n" - + "FROM (SELECT * FROM numfoo UNION ALL SELECT * FROM foo UNION ALL SELECT * from foo)\n" - + "WHERE dim2 = 'a' OR dim2 = 'def'\n" - + "GROUP BY 1, 2", - ImmutableList.of(), - ImmutableList.of() - ); - Assert.fail("query execution should fail"); - } - catch (DruidException e) { - MatcherAssert.assertThat(e, invalidSqlIs("Column count mismatch in UNION ALL (line [3], column [45])")); - } - } - - @Test - public void testUnionAllThreeTablesColumnCountMismatch3() - { - try { - testQuery( - "SELECT\n" - + "dim1, dim2, SUM(m1), COUNT(*)\n" - + "FROM (SELECT * FROM foo UNION ALL SELECT * FROM foo UNION ALL SELECT * from numfoo)\n" - + "WHERE dim2 = 'a' OR dim2 = 'def'\n" - + "GROUP BY 1, 2", - ImmutableList.of(), - ImmutableList.of() - ); - Assert.fail("query execution should fail"); - } - catch (DruidException e) { - MatcherAssert.assertThat(e, invalidSqlIs("Column count mismatch in UNION ALL (line [3], column [70])")); - } - } - - @DecoupledIgnore - @Test - public void testUnionAllSameTableThreeTimesWithSameMapping() - { - msqIncompatible(); - testQuery( - "SELECT\n" - + "dim1, dim2, SUM(m1), COUNT(*)\n" - + "FROM (SELECT dim1, dim2, m1 FROM foo UNION ALL SELECT dim1, dim2, m1 FROM foo UNION ALL SELECT dim1, dim2, m1 FROM foo)\n" - + "WHERE dim2 = 'a' OR dim2 = 'def'\n" - + "GROUP BY 1, 2", - ImmutableList.of( - GroupByQuery.builder() - .setDataSource( - new 
UnionDataSource( - ImmutableList.of( - new TableDataSource(CalciteTests.DATASOURCE1), - new TableDataSource(CalciteTests.DATASOURCE1), - new TableDataSource(CalciteTests.DATASOURCE1) - ) - ) - ) - .setInterval(querySegmentSpec(Filtration.eternity())) - .setGranularity(Granularities.ALL) - .setDimFilter(in("dim2", ImmutableList.of("def", "a"), null)) - .setDimensions( - new DefaultDimensionSpec("dim1", "d0"), - new DefaultDimensionSpec("dim2", "d1") - ) - .setAggregatorSpecs( - aggregators( - new DoubleSumAggregatorFactory("a0", "m1"), - new CountAggregatorFactory("a1") - ) - ) - .setContext(QUERY_CONTEXT_DEFAULT) - .build() - ), - ImmutableList.of( - new Object[]{"", "a", 3.0, 3L}, - new Object[]{"1", "a", 12.0, 3L} - ) - ); - } - @Test public void testPruneDeadAggregators() { @@ -3669,6 +3199,107 @@ public class CalciteQueryTest extends BaseCalciteQueryTest ); } + /** + * This test case should be in {@link CalciteUnionQueryTest}. However, there's a bug in the test framework: it + * doesn't reset the framework once the merge buffers are in use (TODO: confirm; the original note was cut off). + */ + @DecoupledIgnore + @Test + public void testUnionAllSameTableThreeTimes() + { + testQuery( + "SELECT\n" + + "dim1, dim2, SUM(m1), COUNT(*)\n" + + "FROM (SELECT * FROM foo UNION ALL SELECT * FROM foo UNION ALL SELECT * FROM foo)\n" + + "WHERE dim2 = 'a' OR dim2 = 'def'\n" + + "GROUP BY 1, 2", + ImmutableList.of( + GroupByQuery.builder() + .setDataSource( + new UnionDataSource( + ImmutableList.of( + new TableDataSource(CalciteTests.DATASOURCE1), + new TableDataSource(CalciteTests.DATASOURCE1), + new TableDataSource(CalciteTests.DATASOURCE1) + ) + ) + ) + .setInterval(querySegmentSpec(Filtration.eternity())) + .setGranularity(Granularities.ALL) + .setDimFilter(in("dim2", ImmutableList.of("def", "a"), null)) + .setDimensions( + new DefaultDimensionSpec("dim1", "d0"), + new DefaultDimensionSpec("dim2", "d1") + ) + .setAggregatorSpecs( + aggregators( + new DoubleSumAggregatorFactory("a0", "m1"), + new CountAggregatorFactory("a1") + ) + ) +
.setContext(QUERY_CONTEXT_DEFAULT) + .build() + ), + ImmutableList.of( + new Object[]{"", "a", 3.0, 3L}, + new Object[]{"1", "a", 12.0, 3L} + ) + ); + } + + @DecoupledIgnore(mode = Modes.NOT_ENOUGH_RULES) + @Test + public void testExactCountDistinctUsingSubqueryOnUnionAllTables() + { + testQuery( + "SELECT\n" + + " SUM(cnt),\n" + + " COUNT(*)\n" + + "FROM (\n" + + " SELECT dim2, SUM(cnt) AS cnt\n" + + " FROM (SELECT * FROM druid.foo UNION ALL SELECT * FROM druid.foo)\n" + + " GROUP BY dim2\n" + + ")", + ImmutableList.of( + GroupByQuery.builder() + .setDataSource( + new QueryDataSource( + GroupByQuery.builder() + .setDataSource( + new UnionDataSource( + ImmutableList.of( + new TableDataSource(CalciteTests.DATASOURCE1), + new TableDataSource(CalciteTests.DATASOURCE1) + ) + ) + ) + .setInterval(querySegmentSpec(Filtration.eternity())) + .setGranularity(Granularities.ALL) + .setDimensions(dimensions(new DefaultDimensionSpec("dim2", "d0"))) + .setAggregatorSpecs(aggregators(new LongSumAggregatorFactory("a0", "cnt"))) + .setContext(QUERY_CONTEXT_DEFAULT) + .build() + ) + ) + .setInterval(querySegmentSpec(Filtration.eternity())) + .setGranularity(Granularities.ALL) + .setAggregatorSpecs(aggregators( + new LongSumAggregatorFactory("_a0", "a0"), + new CountAggregatorFactory("_a1") + )) + .setContext(QUERY_CONTEXT_DEFAULT) + .build() + ), + NullHandling.replaceWithDefault() ? 
+ ImmutableList.of( + new Object[]{12L, 3L} + ) : + ImmutableList.of( + new Object[]{12L, 4L} + ) + ); + } + @Test public void testNullDoubleTopN() { @@ -7330,60 +6961,6 @@ public class CalciteQueryTest extends BaseCalciteQueryTest ); } - @DecoupledIgnore(mode = Modes.NOT_ENOUGH_RULES) - @Test - public void testExactCountDistinctUsingSubqueryOnUnionAllTables() - { - msqIncompatible(); - testQuery( - "SELECT\n" - + " SUM(cnt),\n" - + " COUNT(*)\n" - + "FROM (\n" - + " SELECT dim2, SUM(cnt) AS cnt\n" - + " FROM (SELECT * FROM druid.foo UNION ALL SELECT * FROM druid.foo)\n" - + " GROUP BY dim2\n" - + ")", - ImmutableList.of( - GroupByQuery.builder() - .setDataSource( - new QueryDataSource( - GroupByQuery.builder() - .setDataSource( - new UnionDataSource( - ImmutableList.of( - new TableDataSource(CalciteTests.DATASOURCE1), - new TableDataSource(CalciteTests.DATASOURCE1) - ) - ) - ) - .setInterval(querySegmentSpec(Filtration.eternity())) - .setGranularity(Granularities.ALL) - .setDimensions(dimensions(new DefaultDimensionSpec("dim2", "d0"))) - .setAggregatorSpecs(aggregators(new LongSumAggregatorFactory("a0", "cnt"))) - .setContext(QUERY_CONTEXT_DEFAULT) - .build() - ) - ) - .setInterval(querySegmentSpec(Filtration.eternity())) - .setGranularity(Granularities.ALL) - .setAggregatorSpecs(aggregators( - new LongSumAggregatorFactory("_a0", "a0"), - new CountAggregatorFactory("_a1") - )) - .setContext(QUERY_CONTEXT_DEFAULT) - .build() - ), - NullHandling.replaceWithDefault() ? 
- ImmutableList.of( - new Object[]{12L, 3L} - ) : - ImmutableList.of( - new Object[]{12L, 4L} - ) - ); - } - @Test public void testAvgDailyCountDistinct() { diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteUnionQueryTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteUnionQueryTest.java new file mode 100644 index 00000000000..773e1776857 --- /dev/null +++ b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteUnionQueryTest.java @@ -0,0 +1,405 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.sql.calcite; + +import com.google.common.collect.ImmutableList; +import org.apache.druid.error.DruidException; +import org.apache.druid.java.util.common.granularity.Granularities; +import org.apache.druid.query.TableDataSource; +import org.apache.druid.query.UnionDataSource; +import org.apache.druid.query.aggregation.CountAggregatorFactory; +import org.apache.druid.query.aggregation.DoubleSumAggregatorFactory; +import org.apache.druid.query.dimension.DefaultDimensionSpec; +import org.apache.druid.query.groupby.GroupByQuery; +import org.apache.druid.sql.calcite.filtration.Filtration; +import org.apache.druid.sql.calcite.util.CalciteTests; +import org.hamcrest.MatcherAssert; +import org.junit.Assert; +import org.junit.Test; + +public class CalciteUnionQueryTest extends BaseCalciteQueryTest +{ + @Test + public void testUnionAllDifferentTablesWithMapping() + { + testQuery( + "SELECT\n" + + "dim1, dim2, SUM(m1), COUNT(*)\n" + + "FROM (SELECT dim1, dim2, m1 FROM foo UNION ALL SELECT dim1, dim2, m1 FROM numfoo)\n" + + "WHERE dim2 = 'a' OR dim2 = 'def'\n" + + "GROUP BY 1, 2", + ImmutableList.of( + GroupByQuery.builder() + .setDataSource( + new UnionDataSource( + ImmutableList.of( + new TableDataSource(CalciteTests.DATASOURCE1), + new TableDataSource(CalciteTests.DATASOURCE3) + ) + ) + ) + .setInterval(querySegmentSpec(Filtration.eternity())) + .setGranularity(Granularities.ALL) + .setDimFilter(in("dim2", ImmutableList.of("def", "a"), null)) + .setDimensions( + new DefaultDimensionSpec("dim1", "d0"), + new DefaultDimensionSpec("dim2", "d1") + ) + .setAggregatorSpecs( + aggregators( + new DoubleSumAggregatorFactory("a0", "m1"), + new CountAggregatorFactory("a1") + ) + ) + .setContext(QUERY_CONTEXT_DEFAULT) + .build() + ), + ImmutableList.of( + new Object[]{"", "a", 2.0, 2L}, + new Object[]{"1", "a", 8.0, 2L} + ) + ); + } + + @Test + public void testJoinUnionAllDifferentTablesWithMapping() + { + testQuery( + "SELECT\n" + + "dim1, dim2, 
SUM(m1), COUNT(*)\n" + + "FROM (SELECT dim1, dim2, m1 FROM foo UNION ALL SELECT dim1, dim2, m1 FROM numfoo)\n" + + "WHERE dim2 = 'a' OR dim2 = 'def'\n" + + "GROUP BY 1, 2", + ImmutableList.of( + GroupByQuery.builder() + .setDataSource( + new UnionDataSource( + ImmutableList.of( + new TableDataSource(CalciteTests.DATASOURCE1), + new TableDataSource(CalciteTests.DATASOURCE3) + ) + ) + ) + .setInterval(querySegmentSpec(Filtration.eternity())) + .setGranularity(Granularities.ALL) + .setDimFilter(in("dim2", ImmutableList.of("def", "a"), null)) + .setDimensions( + new DefaultDimensionSpec("dim1", "d0"), + new DefaultDimensionSpec("dim2", "d1") + ) + .setAggregatorSpecs( + aggregators( + new DoubleSumAggregatorFactory("a0", "m1"), + new CountAggregatorFactory("a1") + ) + ) + .setContext(QUERY_CONTEXT_DEFAULT) + .build() + ), + ImmutableList.of( + new Object[]{"", "a", 2.0, 2L}, + new Object[]{"1", "a", 8.0, 2L} + ) + ); + } + + @Test + public void testUnionAllTablesColumnCountMismatch() + { + try { + testQuery( + "SELECT\n" + + "dim1, dim2, SUM(m1), COUNT(*)\n" + + "FROM (SELECT * FROM foo UNION ALL SELECT * FROM numfoo)\n" + + "WHERE dim2 = 'a' OR dim2 = 'def'\n" + + "GROUP BY 1, 2", + ImmutableList.of(), + ImmutableList.of() + ); + Assert.fail("query execution should fail"); + } + catch (DruidException e) { + MatcherAssert.assertThat(e, invalidSqlIs("Column count mismatch in UNION ALL (line [3], column [42])")); + } + } + + @Test + public void testUnionAllTablesColumnTypeMismatchFloatLong() + { + // "m1" has a different type in foo and foo2 (float vs long), but this query is OK anyway because they can both + // be implicitly cast to double. 
+ + testQuery( + "SELECT\n" + + "dim1, dim2, SUM(m1), COUNT(*)\n" + + "FROM (SELECT dim1, dim2, m1 FROM foo2 UNION ALL SELECT dim1, dim2, m1 FROM foo)\n" + + "WHERE dim2 = 'a' OR dim2 = 'en'\n" + + "GROUP BY 1, 2", + ImmutableList.of( + GroupByQuery.builder() + .setDataSource( + new UnionDataSource( + ImmutableList.of( + new TableDataSource(CalciteTests.DATASOURCE2), + new TableDataSource(CalciteTests.DATASOURCE1) + ) + ) + ) + .setInterval(querySegmentSpec(Filtration.eternity())) + .setGranularity(Granularities.ALL) + .setDimFilter(in("dim2", ImmutableList.of("en", "a"), null)) + .setDimensions( + new DefaultDimensionSpec("dim1", "d0"), + new DefaultDimensionSpec("dim2", "d1") + ) + .setAggregatorSpecs( + aggregators( + new DoubleSumAggregatorFactory("a0", "m1"), + new CountAggregatorFactory("a1") + ) + ) + .setContext(QUERY_CONTEXT_DEFAULT) + .build() + ), + ImmutableList.of( + new Object[]{"", "a", 1.0, 1L}, + new Object[]{"1", "a", 4.0, 1L}, + new Object[]{"druid", "en", 1.0, 1L} + ) + ); + } + + @Test + public void testUnionAllTablesColumnTypeMismatchStringLong() + { + // "dim3" has a different type in foo and foo2 (string vs long), which requires a casting subquery, so this + // query cannot be planned. + + assertQueryIsUnplannable( + "SELECT\n" + + "dim3, dim2, SUM(m1), COUNT(*)\n" + + "FROM (SELECT dim3, dim2, m1 FROM foo2 UNION ALL SELECT dim3, dim2, m1 FROM foo)\n" + + "WHERE dim2 = 'a' OR dim2 = 'en'\n" + + "GROUP BY 1, 2", + "SQL requires union between inputs that are not simple table scans and involve a " + + "filter or aliasing. Or column types of tables being unioned are not of same type." + ); + } + + @Test + public void testUnionAllTablesWhenMappingIsRequired() + { + // Cannot plan this UNION ALL operation, because the column swap would require generating a subquery. 
+ + assertQueryIsUnplannable( + "SELECT\n" + + "c, COUNT(*)\n" + + "FROM (SELECT dim1 AS c, m1 FROM foo UNION ALL SELECT dim2 AS c, m1 FROM numfoo)\n" + + "WHERE c = 'a' OR c = 'def'\n" + + "GROUP BY 1", + "SQL requires union between two tables " + + "and column names queried for each table are different Left: [dim1], Right: [dim2]." + ); + } + + @Test + public void testUnionIsUnplannable() + { + // Cannot plan this UNION operation + assertQueryIsUnplannable( + "SELECT dim2, dim1, m1 FROM foo2 UNION SELECT dim1, dim2, m1 FROM foo", + "SQL requires 'UNION' but only 'UNION ALL' is supported." + ); + } + + @Test + public void testUnionAllTablesWhenCastAndMappingIsRequired() + { + // Cannot plan this UNION ALL operation, because the column swap would require generating a subquery. + assertQueryIsUnplannable( + "SELECT\n" + + "c, COUNT(*)\n" + + "FROM (SELECT dim1 AS c, m1 FROM foo UNION ALL SELECT cnt AS c, m1 FROM numfoo)\n" + + "WHERE c = 'a' OR c = 'def'\n" + + "GROUP BY 1", + "SQL requires union between inputs that are not simple table scans and involve " + + "a filter or aliasing. Or column types of tables being unioned are not of same type." 
+ ); + } + + @Test + public void testUnionAllSameTableTwice() + { + testQuery( + "SELECT\n" + + "dim1, dim2, SUM(m1), COUNT(*)\n" + + "FROM (SELECT * FROM foo UNION ALL SELECT * FROM foo)\n" + + "WHERE dim2 = 'a' OR dim2 = 'def'\n" + + "GROUP BY 1, 2", + ImmutableList.of( + GroupByQuery.builder() + .setDataSource( + new UnionDataSource( + ImmutableList.of( + new TableDataSource(CalciteTests.DATASOURCE1), + new TableDataSource(CalciteTests.DATASOURCE1) + ) + ) + ) + .setInterval(querySegmentSpec(Filtration.eternity())) + .setGranularity(Granularities.ALL) + .setDimFilter(in("dim2", ImmutableList.of("def", "a"), null)) + .setDimensions( + new DefaultDimensionSpec("dim1", "d0"), + new DefaultDimensionSpec("dim2", "d1") + ) + .setAggregatorSpecs( + aggregators( + new DoubleSumAggregatorFactory("a0", "m1"), + new CountAggregatorFactory("a1") + ) + ) + .setContext(QUERY_CONTEXT_DEFAULT) + .build() + ), + ImmutableList.of( + new Object[]{"", "a", 2.0, 2L}, + new Object[]{"1", "a", 8.0, 2L} + ) + ); + } + + @Test + public void testUnionAllSameTableTwiceWithSameMapping() + { + testQuery( + "SELECT\n" + + "dim1, dim2, SUM(m1), COUNT(*)\n" + + "FROM (SELECT dim1, dim2, m1 FROM foo UNION ALL SELECT dim1, dim2, m1 FROM foo)\n" + + "WHERE dim2 = 'a' OR dim2 = 'def'\n" + + "GROUP BY 1, 2", + ImmutableList.of( + GroupByQuery.builder() + .setDataSource( + new UnionDataSource( + ImmutableList.of( + new TableDataSource(CalciteTests.DATASOURCE1), + new TableDataSource(CalciteTests.DATASOURCE1) + ) + ) + ) + .setInterval(querySegmentSpec(Filtration.eternity())) + .setGranularity(Granularities.ALL) + .setDimFilter(in("dim2", ImmutableList.of("def", "a"), null)) + .setDimensions( + new DefaultDimensionSpec("dim1", "d0"), + new DefaultDimensionSpec("dim2", "d1") + ) + .setAggregatorSpecs( + aggregators( + new DoubleSumAggregatorFactory("a0", "m1"), + new CountAggregatorFactory("a1") + ) + ) + .setContext(QUERY_CONTEXT_DEFAULT) + .build() + ), + ImmutableList.of( + new Object[]{"", "a", 
2.0, 2L}, + new Object[]{"1", "a", 8.0, 2L} + ) + ); + } + + @Test + public void testUnionAllSameTableTwiceWithDifferentMapping() + { + // Cannot plan this UNION ALL operation, because the column swap would require generating a subquery. + assertQueryIsUnplannable( + "SELECT\n" + + "dim1, dim2, SUM(m1), COUNT(*)\n" + + "FROM (SELECT dim1, dim2, m1 FROM foo UNION ALL SELECT dim2, dim1, m1 FROM foo)\n" + + "WHERE dim2 = 'a' OR dim2 = 'def'\n" + + "GROUP BY 1, 2", + "SQL requires union between two tables and column names queried for each table are different Left: [dim1, dim2, m1], Right: [dim2, dim1, m1]." + ); + } + + @Test + public void testUnionAllThreeTablesColumnCountMismatch1() + { + try { + testQuery( + "SELECT\n" + + "dim1, dim2, SUM(m1), COUNT(*)\n" + + "FROM (SELECT * FROM numfoo UNION ALL SELECT * FROM foo UNION ALL SELECT * from foo)\n" + + "WHERE dim2 = 'a' OR dim2 = 'def'\n" + + "GROUP BY 1, 2", + ImmutableList.of(), + ImmutableList.of() + ); + Assert.fail("query execution should fail"); + } + catch (DruidException e) { + MatcherAssert.assertThat(e, invalidSqlIs("Column count mismatch in UNION ALL (line [3], column [45])")); + } + } + + @Test + public void testUnionAllThreeTablesColumnCountMismatch2() + { + try { + testQuery( + "SELECT\n" + + "dim1, dim2, SUM(m1), COUNT(*)\n" + + "FROM (SELECT * FROM numfoo UNION ALL SELECT * FROM foo UNION ALL SELECT * from foo)\n" + + "WHERE dim2 = 'a' OR dim2 = 'def'\n" + + "GROUP BY 1, 2", + ImmutableList.of(), + ImmutableList.of() + ); + Assert.fail("query execution should fail"); + } + catch (DruidException e) { + MatcherAssert.assertThat(e, invalidSqlIs("Column count mismatch in UNION ALL (line [3], column [45])")); + } + } + + @Test + public void testUnionAllThreeTablesColumnCountMismatch3() + { + try { + testQuery( + "SELECT\n" + + "dim1, dim2, SUM(m1), COUNT(*)\n" + + "FROM (SELECT * FROM foo UNION ALL SELECT * FROM foo UNION ALL SELECT * from numfoo)\n" + + "WHERE dim2 = 'a' OR dim2 = 'def'\n" + + "GROUP BY 1, 
2", + ImmutableList.of(), + ImmutableList.of() + ); + Assert.fail("query execution should fail"); + } + catch (DruidException e) { + MatcherAssert.assertThat(e, invalidSqlIs("Column count mismatch in UNION ALL (line [3], column [70])")); + } + } + +} diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/IngestionTestSqlEngine.java b/sql/src/test/java/org/apache/druid/sql/calcite/IngestionTestSqlEngine.java index 46fb40fddad..b0bf0bd7b29 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/IngestionTestSqlEngine.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/IngestionTestSqlEngine.java @@ -90,6 +90,7 @@ public class IngestionTestSqlEngine implements SqlEngine case READ_EXTERNAL_DATA: case SCAN_ORDER_BY_NON_TIME: case ALLOW_BROADCAST_RIGHTY_JOIN: + case ALLOW_TOP_LEVEL_UNION_ALL: return true; default: throw SqlEngines.generateUnrecognizedFeatureException(IngestionTestSqlEngine.class.getSimpleName(), feature); diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/planner/CalcitePlannerModuleTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/planner/CalcitePlannerModuleTest.java index 48e7ee2423b..12db32d4f01 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/planner/CalcitePlannerModuleTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/planner/CalcitePlannerModuleTest.java @@ -19,6 +19,7 @@ package org.apache.druid.sql.calcite.planner; +import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.ImmutableSet; import com.google.inject.Guice; import com.google.inject.Injector; @@ -42,7 +43,7 @@ import org.apache.druid.server.security.ResourceType; import org.apache.druid.sql.calcite.aggregation.SqlAggregator; import org.apache.druid.sql.calcite.expression.SqlOperatorConversion; import org.apache.druid.sql.calcite.rule.ExtensionCalciteRuleProvider; -import org.apache.druid.sql.calcite.run.SqlEngine; +import org.apache.druid.sql.calcite.run.NativeSqlEngine; import 
org.apache.druid.sql.calcite.schema.DruidSchemaCatalog; import org.apache.druid.sql.calcite.schema.DruidSchemaName; import org.apache.druid.sql.calcite.schema.NamedSchema; @@ -90,8 +91,6 @@ public class CalcitePlannerModuleTest extends CalciteTestBase @Mock private DruidSchemaCatalog rootSchema; - @Mock - private SqlEngine engine; private Set aggregators; private Set operatorConversions; @@ -175,10 +174,11 @@ public class CalcitePlannerModuleTest extends CalciteTestBase @Test public void testExtensionCalciteRule() { + ObjectMapper mapper = new DefaultObjectMapper(); PlannerToolbox toolbox = new PlannerToolbox( injector.getInstance(DruidOperatorTable.class), macroTable, - new DefaultObjectMapper(), + mapper, injector.getInstance(PlannerConfig.class), rootSchema, joinableFactoryWrapper, @@ -189,11 +189,10 @@ public class CalcitePlannerModuleTest extends CalciteTestBase AuthConfig.newBuilder().build() ); - PlannerContext context = PlannerContext.create( toolbox, "SELECT 1", - engine, + new NativeSqlEngine(queryLifecycleFactory, mapper), Collections.emptyMap(), null );