UNION ALLs in MSQ (#14981)

MSQ now supports UNION ALL with UnionDataSource
This commit is contained in:
Laksh Singla 2023-10-09 18:18:15 +05:30 committed by GitHub
parent 40a6dc4631
commit 549ef56288
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
22 changed files with 962 additions and 569 deletions

View File

@ -51,6 +51,7 @@ import org.apache.druid.query.LookupDataSource;
import org.apache.druid.query.QueryContext;
import org.apache.druid.query.QueryDataSource;
import org.apache.druid.query.TableDataSource;
import org.apache.druid.query.UnionDataSource;
import org.apache.druid.query.UnnestDataSource;
import org.apache.druid.query.filter.DimFilter;
import org.apache.druid.query.planning.DataSourceAnalysis;
@ -170,6 +171,18 @@ public class DataSourcePlan
minStageNumber,
broadcast
);
} else if (dataSource instanceof UnionDataSource) {
return forUnion(
queryKit,
queryId,
queryContext,
(UnionDataSource) dataSource,
querySegmentSpec,
filter,
maxWorkerCount,
minStageNumber,
broadcast
);
} else if (dataSource instanceof JoinDataSource) {
final JoinAlgorithm preferredJoinAlgorithm = PlannerContext.getJoinAlgorithm(queryContext);
final JoinAlgorithm deducedJoinAlgorithm = deduceJoinAlgorithm(
@ -458,6 +471,54 @@ public class DataSourcePlan
);
}
private static DataSourcePlan forUnion(
final QueryKit queryKit,
final String queryId,
final QueryContext queryContext,
final UnionDataSource unionDataSource,
final QuerySegmentSpec querySegmentSpec,
@Nullable DimFilter filter,
final int maxWorkerCount,
final int minStageNumber,
final boolean broadcast
)
{
// This is done to prevent loss of generality since MSQ can plan any type of DataSource.
List<DataSource> children = unionDataSource.getDataSources();
final QueryDefinitionBuilder subqueryDefBuilder = QueryDefinition.builder();
final List<DataSource> newChildren = new ArrayList<>();
final List<InputSpec> inputSpecs = new ArrayList<>();
final IntSet broadcastInputs = new IntOpenHashSet();
for (DataSource child : children) {
DataSourcePlan childDataSourcePlan = forDataSource(
queryKit,
queryId,
queryContext,
child,
querySegmentSpec,
filter,
maxWorkerCount,
Math.max(minStageNumber, subqueryDefBuilder.getNextStageNumber()),
broadcast
);
int shift = inputSpecs.size();
newChildren.add(shiftInputNumbers(childDataSourcePlan.getNewDataSource(), shift));
inputSpecs.addAll(childDataSourcePlan.getInputSpecs());
childDataSourcePlan.getSubQueryDefBuilder().ifPresent(subqueryDefBuilder::addAll);
childDataSourcePlan.getBroadcastInputs().forEach(inp -> broadcastInputs.add(inp + shift));
}
return new DataSourcePlan(
new UnionDataSource(newChildren),
inputSpecs,
broadcastInputs,
subqueryDefBuilder
);
}
/**
* Build a plan for broadcast hash-join.
*/

View File

@ -114,6 +114,7 @@ public class MSQTaskSqlEngine implements SqlEngine
case TIME_BOUNDARY_QUERY:
case GROUPING_SETS:
case WINDOW_FUNCTIONS:
case ALLOW_TOP_LEVEL_UNION_ALL:
return false;
case UNNEST:
case CAN_SELECT:

View File

@ -20,6 +20,8 @@
package org.apache.druid.msq.exec;
import com.google.common.collect.ImmutableMap;
import org.apache.druid.error.DruidException;
import org.apache.druid.error.DruidExceptionMatcher;
import org.apache.druid.indexing.common.actions.SegmentAllocateAction;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.StringUtils;
@ -330,4 +332,46 @@ public class MSQFaultsTest extends MSQTestBase
.setExpectedMSQFault(new TooManyInputFilesFault(numFiles, Limits.MAX_INPUT_FILES_PER_WORKER, 2))
.verifyResults();
}
@Test
public void testUnionAllWithDifferentColumnNames()
{
// This test fails till MSQ can support arbitrary column names and column types for UNION ALL
testIngestQuery()
.setSql(
"INSERT INTO druid.dst "
+ "SELECT dim2, dim1, m1 FROM foo2 "
+ "UNION ALL "
+ "SELECT dim1, dim2, m1 FROM foo "
+ "PARTITIONED BY ALL TIME")
.setExpectedValidationErrorMatcher(
new DruidExceptionMatcher(
DruidException.Persona.ADMIN,
DruidException.Category.INVALID_INPUT,
"general"
).expectMessageContains("SQL requires union between two tables and column names queried for each table are different "
+ "Left: [dim2, dim1, m1], Right: [dim1, dim2, m1]."))
.verifyPlanningErrors();
}
@Test
public void testTopLevelUnionAllWithJoins()
{
// This test fails becaues it is a top level UNION ALL which cannot be planned using MSQ. It will be supported once
// we support arbitrary types and column names for UNION ALL
testSelectQuery()
.setSql(
"(SELECT COUNT(*) FROM foo INNER JOIN lookup.lookyloo ON foo.dim1 = lookyloo.k) "
+ "UNION ALL "
+ "(SELECT SUM(cnt) FROM foo)"
)
.setExpectedValidationErrorMatcher(
new DruidExceptionMatcher(
DruidException.Persona.ADMIN,
DruidException.Category.INVALID_INPUT,
"general"
).expectMessageContains(
"SQL requires union between inputs that are not simple table scans and involve a filter or aliasing"))
.verifyPlanningErrors();
}
}

View File

@ -51,6 +51,7 @@ import org.apache.druid.query.LookupDataSource;
import org.apache.druid.query.Query;
import org.apache.druid.query.QueryDataSource;
import org.apache.druid.query.TableDataSource;
import org.apache.druid.query.UnionDataSource;
import org.apache.druid.query.UnnestDataSource;
import org.apache.druid.query.aggregation.CountAggregatorFactory;
import org.apache.druid.query.aggregation.DoubleSumAggregatorFactory;
@ -1929,8 +1930,8 @@ public class MSQSelectTest extends MSQTestBase
new ColumnMappings(ImmutableList.of(
new ColumnMapping("d0", "cnt"),
new ColumnMapping("a0", "cnt1")
)
))
)
.tuningConfig(MSQTuningConfig.defaultConfig())
.destination(isDurableStorageDestination()
? DurableStorageMSQDestination.INSTANCE
@ -2322,6 +2323,64 @@ public class MSQSelectTest extends MSQTestBase
.verifyResults();
}
@Test
public void testUnionAllUsingUnionDataSource()
{
final RowSignature rowSignature = RowSignature.builder()
.add("__time", ColumnType.LONG)
.add("dim1", ColumnType.STRING)
.build();
final List<Object[]> results = ImmutableList.of(
new Object[]{946684800000L, ""},
new Object[]{946684800000L, ""},
new Object[]{946771200000L, "10.1"},
new Object[]{946771200000L, "10.1"},
new Object[]{946857600000L, "2"},
new Object[]{946857600000L, "2"},
new Object[]{978307200000L, "1"},
new Object[]{978307200000L, "1"},
new Object[]{978393600000L, "def"},
new Object[]{978393600000L, "def"},
new Object[]{978480000000L, "abc"},
new Object[]{978480000000L, "abc"}
);
// This plans the query using DruidUnionDataSourceRule since the DruidUnionDataSourceRule#isCompatible
// returns true (column names, types match, and it is a union on the table data sources).
// It gets planned correctly, however MSQ engine cannot plan the query correctly
testSelectQuery()
.setSql("SELECT __time, dim1 FROM foo\n"
+ "UNION ALL\n"
+ "SELECT __time, dim1 FROM foo\n")
.setExpectedRowSignature(rowSignature)
.setExpectedMSQSpec(
MSQSpec.builder()
.query(newScanQueryBuilder()
.dataSource(new UnionDataSource(
ImmutableList.of(new TableDataSource("foo"), new TableDataSource("foo"))
))
.intervals(querySegmentSpec(Filtration.eternity()))
.resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST)
.legacy(false)
.context(defaultScanQueryContext(
context,
rowSignature
))
.columns(ImmutableList.of("__time", "dim1"))
.build())
.columnMappings(ColumnMappings.identity(rowSignature))
.tuningConfig(MSQTuningConfig.defaultConfig())
.destination(isDurableStorageDestination()
? DurableStorageMSQDestination.INSTANCE
: TaskReportMSQDestination.INSTANCE)
.build()
)
.setQueryContext(context)
.setExpectedResultRows(results)
.verifyResults();
}
@Nonnull
private List<Object[]> expectedMultiValueFooRowsGroup()
{

View File

@ -0,0 +1,183 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.msq.test;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import com.google.inject.Injector;
import com.google.inject.Module;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.guice.DruidInjectorBuilder;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.msq.exec.WorkerMemoryParameters;
import org.apache.druid.msq.sql.MSQTaskSqlEngine;
import org.apache.druid.query.QueryDataSource;
import org.apache.druid.query.TableDataSource;
import org.apache.druid.query.UnionDataSource;
import org.apache.druid.query.aggregation.CountAggregatorFactory;
import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
import org.apache.druid.query.dimension.DefaultDimensionSpec;
import org.apache.druid.query.groupby.GroupByQuery;
import org.apache.druid.query.groupby.TestGroupByBuffers;
import org.apache.druid.server.QueryLifecycleFactory;
import org.apache.druid.sql.calcite.BaseCalciteQueryTest;
import org.apache.druid.sql.calcite.CalciteUnionQueryTest;
import org.apache.druid.sql.calcite.QueryTestBuilder;
import org.apache.druid.sql.calcite.filtration.Filtration;
import org.apache.druid.sql.calcite.run.SqlEngine;
import org.apache.druid.sql.calcite.util.CalciteTests;
import org.junit.After;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
/**
* Runs {@link CalciteUnionQueryTest} but with MSQ engine
*/
public class CalciteUnionQueryMSQTest extends CalciteUnionQueryTest
{
private TestGroupByBuffers groupByBuffers;
@Before
public void setup2()
{
groupByBuffers = TestGroupByBuffers.createDefault();
}
@After
public void teardown2()
{
groupByBuffers.close();
}
@Override
public void configureGuice(DruidInjectorBuilder builder)
{
super.configureGuice(builder);
builder.addModules(CalciteMSQTestsHelper.fetchModules(temporaryFolder, groupByBuffers).toArray(new Module[0]));
}
@Override
public SqlEngine createEngine(
QueryLifecycleFactory qlf,
ObjectMapper queryJsonMapper,
Injector injector
)
{
final WorkerMemoryParameters workerMemoryParameters =
WorkerMemoryParameters.createInstance(
WorkerMemoryParameters.PROCESSING_MINIMUM_BYTES * 50,
2,
10,
2,
0,
0
);
final MSQTestOverlordServiceClient indexingServiceClient = new MSQTestOverlordServiceClient(
queryJsonMapper,
injector,
new MSQTestTaskActionClient(queryJsonMapper),
workerMemoryParameters
);
return new MSQTaskSqlEngine(indexingServiceClient, queryJsonMapper);
}
@Override
protected QueryTestBuilder testBuilder()
{
return new QueryTestBuilder(new BaseCalciteQueryTest.CalciteTestConfig(true))
.addCustomRunner(new ExtractResultsFactory(() -> (MSQTestOverlordServiceClient) ((MSQTaskSqlEngine) queryFramework().engine()).overlordClient()))
.skipVectorize(true)
.verifyNativeQueries(new VerifyMSQSupportedNativeQueriesPredicate())
.msqCompatible(msqCompatible);
}
/**
* Generates a different error hint than what is required by the native engine, since planner does try to plan "UNION"
* using group by, however fails due to the column name mismatch.
* MSQ does wnat to support any type of data source, with least restrictive column names and types, therefore it
* should eventually work.
*/
@Test
@Override
public void testUnionIsUnplannable()
{
assertQueryIsUnplannable(
"SELECT dim2, dim1, m1 FROM foo2 UNION SELECT dim1, dim2, m1 FROM foo",
"SQL requires union between two tables and column names queried for each table are different Left: [dim2, dim1, m1], Right: [dim1, dim2, m1]."
);
}
@Ignore("Ignored till MSQ can plan UNION ALL with any operand")
@Test
public void testUnionOnSubqueries()
{
testQuery(
"SELECT\n"
+ " SUM(cnt),\n"
+ " COUNT(*)\n"
+ "FROM (\n"
+ " (SELECT dim2, SUM(cnt) AS cnt FROM druid.foo GROUP BY dim2)\n"
+ " UNION ALL\n"
+ " (SELECT dim2, SUM(cnt) AS cnt FROM druid.foo GROUP BY dim2)\n"
+ ")",
ImmutableList.of(
GroupByQuery.builder()
.setDataSource(
new QueryDataSource(
GroupByQuery.builder()
.setDataSource(
new UnionDataSource(
ImmutableList.of(
new TableDataSource(CalciteTests.DATASOURCE1),
new TableDataSource(CalciteTests.DATASOURCE1)
)
)
)
.setInterval(querySegmentSpec(Filtration.eternity()))
.setGranularity(Granularities.ALL)
.setDimensions(dimensions(new DefaultDimensionSpec("dim2", "d0")))
.setAggregatorSpecs(aggregators(new LongSumAggregatorFactory("a0", "cnt")))
.setContext(QUERY_CONTEXT_DEFAULT)
.build()
)
)
.setInterval(querySegmentSpec(Filtration.eternity()))
.setGranularity(Granularities.ALL)
.setAggregatorSpecs(aggregators(
new LongSumAggregatorFactory("_a0", "a0"),
new CountAggregatorFactory("_a1")
))
.setContext(QUERY_CONTEXT_DEFAULT)
.build()
),
NullHandling.replaceWithDefault() ?
ImmutableList.of(
new Object[]{12L, 3L}
) :
ImmutableList.of(
new Object[]{12L, 4L}
)
);
}
}

View File

@ -1412,9 +1412,11 @@ public class MSQTestBase extends BaseCalciteQueryTest
public void verifyResults()
{
Preconditions.checkArgument(expectedResultRows != null, "Result rows cannot be null");
Preconditions.checkArgument(expectedRowSignature != null, "Row signature cannot be null");
Preconditions.checkArgument(expectedMSQSpec != null, "MultiStageQuery Query spec cannot be null ");
if (expectedMSQFault == null) {
Preconditions.checkArgument(expectedResultRows != null, "Result rows cannot be null");
Preconditions.checkArgument(expectedRowSignature != null, "Row signature cannot be null");
Preconditions.checkArgument(expectedMSQSpec != null, "MultiStageQuery Query spec cannot be null ");
}
Pair<MSQSpec, Pair<List<MSQResultsReport.ColumnAndType>, List<Object[]>>> specAndResults = runQueryWithResult();
if (specAndResults == null) { // A fault was expected and the assertion has been done in the runQueryWithResult

View File

@ -23,11 +23,12 @@ package org.apache.druid.query;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import org.apache.druid.error.DruidException;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.query.planning.DataSourceAnalysis;
import org.apache.druid.segment.SegmentReference;
import org.apache.druid.utils.CollectionUtils;
import java.util.Collections;
import java.util.List;
@ -36,13 +37,24 @@ import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import java.util.stream.Collectors;
/**
* Reperesents a UNION ALL of two or more datasources.
*
* Native engine can only work with table datasources that are scans or simple mappings (column rename without any
* expression applied on top). Therefore, it uses methods like {@link #getTableNames()} and
* {@link #getDataSourcesAsTableDataSources()} to assert that the children were TableDataSources.
*
* MSQ should be able to plan and work with arbitrary datasources. It also needs to replace the datasource with the
* InputNumberDataSource while preparing the query plan.
*/
public class UnionDataSource implements DataSource
{
@JsonProperty
private final List<TableDataSource> dataSources;
@JsonProperty("dataSources")
private final List<DataSource> dataSources;
@JsonCreator
public UnionDataSource(@JsonProperty("dataSources") List<TableDataSource> dataSources)
public UnionDataSource(@JsonProperty("dataSources") List<DataSource> dataSources)
{
if (dataSources == null || dataSources.isEmpty()) {
throw new ISE("'dataSources' must be non-null and non-empty for 'union'");
@ -51,18 +63,45 @@ public class UnionDataSource implements DataSource
this.dataSources = dataSources;
}
public List<DataSource> getDataSources()
{
return dataSources;
}
/**
* Asserts that the children of the union are all table data sources before returning the table names
*/
@Override
public Set<String> getTableNames()
{
return dataSources.stream()
.map(input -> Iterables.getOnlyElement(input.getTableNames()))
.collect(Collectors.toSet());
return dataSources
.stream()
.map(input -> {
if (!(input instanceof TableDataSource)) {
throw DruidException.defensive("should be table");
}
return CollectionUtils.getOnlyElement(
input.getTableNames(),
xs -> DruidException.defensive("Expected only single table name in TableDataSource")
);
})
.collect(Collectors.toSet());
}
@JsonProperty
public List<TableDataSource> getDataSources()
/**
* Asserts that the children of the union are all table data sources
*/
public List<TableDataSource> getDataSourcesAsTableDataSources()
{
return dataSources;
return dataSources.stream()
.map(input -> {
if (!(input instanceof TableDataSource)) {
throw DruidException.defensive("should be table");
}
return (TableDataSource) input;
})
.collect(Collectors.toList());
}
@Override
@ -78,13 +117,7 @@ public class UnionDataSource implements DataSource
throw new IAE("Expected [%d] children, got [%d]", dataSources.size(), children.size());
}
if (!children.stream().allMatch(dataSource -> dataSource instanceof TableDataSource)) {
throw new IAE("All children must be tables");
}
return new UnionDataSource(
children.stream().map(dataSource -> (TableDataSource) dataSource).collect(Collectors.toList())
);
return new UnionDataSource(children);
}
@Override
@ -149,11 +182,7 @@ public class UnionDataSource implements DataSource
UnionDataSource that = (UnionDataSource) o;
if (!dataSources.equals(that.dataSources)) {
return false;
}
return true;
return dataSources.equals(that.dataSources);
}
@Override

View File

@ -57,16 +57,16 @@ public class UnionQueryRunner<T> implements QueryRunner<T>
final UnionDataSource unionDataSource = analysis.getBaseUnionDataSource().get();
if (unionDataSource.getDataSources().isEmpty()) {
if (unionDataSource.getDataSourcesAsTableDataSources().isEmpty()) {
// Shouldn't happen, because UnionDataSource doesn't allow empty unions.
throw new ISE("Unexpectedly received empty union");
} else if (unionDataSource.getDataSources().size() == 1) {
} else if (unionDataSource.getDataSourcesAsTableDataSources().size() == 1) {
// Single table. Run as a normal query.
return baseRunner.run(
queryPlus.withQuery(
Queries.withBaseDataSource(
query,
Iterables.getOnlyElement(unionDataSource.getDataSources())
Iterables.getOnlyElement(unionDataSource.getDataSourcesAsTableDataSources())
)
),
responseContext
@ -77,8 +77,8 @@ public class UnionQueryRunner<T> implements QueryRunner<T>
query.getResultOrdering(),
Sequences.simple(
Lists.transform(
IntStream.range(0, unionDataSource.getDataSources().size())
.mapToObj(i -> new Pair<>(unionDataSource.getDataSources().get(i), i + 1))
IntStream.range(0, unionDataSource.getDataSourcesAsTableDataSources().size())
.mapToObj(i -> new Pair<>(unionDataSource.getDataSourcesAsTableDataSources().get(i), i + 1))
.collect(Collectors.toList()),
(Function<Pair<TableDataSource, Integer>, Sequence<T>>) singleSourceWithIndex ->
baseRunner.run(

View File

@ -89,7 +89,7 @@ public class DataSourceTest
Assert.assertTrue(dataSource instanceof UnionDataSource);
Assert.assertEquals(
Lists.newArrayList(new TableDataSource("ds1"), new TableDataSource("ds2")),
Lists.newArrayList(((UnionDataSource) dataSource).getDataSources())
Lists.newArrayList(((UnionDataSource) dataSource).getDataSourcesAsTableDataSources())
);
Assert.assertEquals(
ImmutableSet.of("ds1", "ds2"),

View File

@ -123,7 +123,7 @@ public class UnionDataSourceTest
@Test
public void test_withChildren_sameNumber()
{
final List<TableDataSource> newDataSources = ImmutableList.of(
final List<DataSource> newDataSources = ImmutableList.of(
new TableDataSource("baz"),
new TableDataSource("qux")
);

View File

@ -118,7 +118,7 @@ public class DruidUnionDataSourceRel extends DruidRel<DruidUnionDataSourceRel>
@Override
public DruidQuery toDruidQuery(final boolean finalizeAggregations)
{
final List<TableDataSource> dataSources = new ArrayList<>();
final List<DataSource> dataSources = new ArrayList<>();
RowSignature signature = null;
for (final RelNode relNode : unionRel.getInputs()) {

View File

@ -95,13 +95,16 @@ public class DruidRules
DruidOuterQueryRule.WHERE_FILTER,
DruidOuterQueryRule.SELECT_PROJECT,
DruidOuterQueryRule.SORT,
new DruidUnionRule(plannerContext),
new DruidUnionRule(plannerContext), // Add top level union rule since it helps in constructing a cleaner error message for the user
new DruidUnionDataSourceRule(plannerContext),
DruidSortUnionRule.instance(),
DruidJoinRule.instance(plannerContext)
)
);
if (plannerContext.featureAvailable(EngineFeature.ALLOW_TOP_LEVEL_UNION_ALL)) {
retVal.add(DruidSortUnionRule.instance());
}
if (plannerContext.featureAvailable(EngineFeature.WINDOW_FUNCTIONS)) {
retVal.add(new DruidQueryRule<>(Window.class, PartialDruidQuery.Stage.WINDOW, PartialDruidQuery::withWindow));
retVal.add(

View File

@ -32,6 +32,7 @@ import java.util.Collections;
*/
public class DruidSortUnionRule extends RelOptRule
{
private static final DruidSortUnionRule INSTANCE = new DruidSortUnionRule();
private DruidSortUnionRule()

View File

@ -112,7 +112,12 @@ public class DruidUnionDataSourceRule extends RelOptRule
// Can only do UNION ALL of inputs that have compatible schemas (or schema mappings) and right side
// is a simple table scan
public static boolean isCompatible(final Union unionRel, final DruidRel<?> first, final DruidRel<?> second, @Nullable PlannerContext plannerContext)
public static boolean isCompatible(
final Union unionRel,
final DruidRel<?> first,
final DruidRel<?> second,
@Nullable PlannerContext plannerContext
)
{
if (!(second instanceof DruidQueryRel)) {
return false;

View File

@ -26,6 +26,7 @@ import org.apache.calcite.rel.core.Union;
import org.apache.druid.sql.calcite.planner.PlannerContext;
import org.apache.druid.sql.calcite.rel.DruidRel;
import org.apache.druid.sql.calcite.rel.DruidUnionRel;
import org.apache.druid.sql.calcite.run.EngineFeature;
import java.util.List;
@ -51,6 +52,10 @@ public class DruidUnionRule extends RelOptRule
@Override
public boolean matches(RelOptRuleCall call)
{
if (plannerContext != null && !plannerContext.featureAvailable(EngineFeature.ALLOW_TOP_LEVEL_UNION_ALL)) {
plannerContext.setPlanningError("Queries cannot be planned using top level union all");
return false;
}
// Make DruidUnionRule and DruidUnionDataSourceRule mutually exclusive.
final Union unionRel = call.rel(0);
final DruidRel<?> firstDruidRel = call.rel(1);

View File

@ -102,5 +102,21 @@ public enum EngineFeature
* that it actually *does* generate correct results in native when the join is processed on the Broker. It is much
* less likely that MSQ will plan in such a way that generates correct results.
*/
ALLOW_BROADCAST_RIGHTY_JOIN;
ALLOW_BROADCAST_RIGHTY_JOIN,
/**
* Planner is permitted to use {@link org.apache.druid.sql.calcite.rel.DruidUnionRel} to plan the top level UNION ALL.
* This is to dissuade planner from accepting and running the UNION ALL queries that are not supported by engines
* (primarily MSQ).
*
* Due to the nature of the exeuction of the top level UNION ALLs (we run the individual queries and concat the
* results), it only makes sense to enable this on engines where the queries return the results synchronously
*
* Planning queries with top level UNION_ALL leads to undesirable behaviour with asynchronous engines like MSQ.
* To enumerate this behaviour for MSQ, the broker attempts to run the individual queries as MSQ queries in succession,
* submits the first query correctly, fails on the rest of the queries (due to conflicting taskIds),
* and cannot concat the results together (as * the result for broker is the query id). Therefore, we don't get the
* correct result back, while the MSQ engine is executing the partial query
*/
ALLOW_TOP_LEVEL_UNION_ALL;
}

View File

@ -105,6 +105,7 @@ public class NativeSqlEngine implements SqlEngine
case WINDOW_FUNCTIONS:
case UNNEST:
case ALLOW_BROADCAST_RIGHTY_JOIN:
case ALLOW_TOP_LEVEL_UNION_ALL:
return true;
case TIME_BOUNDARY_QUERY:
return plannerContext.queryContext().isTimeBoundaryPlanningEnabled();

View File

@ -63,6 +63,7 @@ public class ViewSqlEngine implements SqlEngine
case GROUPING_SETS:
case WINDOW_FUNCTIONS:
case UNNEST:
case ALLOW_TOP_LEVEL_UNION_ALL:
return true;
// Views can't sit on top of INSERT or REPLACE.
case CAN_INSERT:

View File

@ -2874,476 +2874,6 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
);
}
@DecoupledIgnore
@Test
public void testUnionAllDifferentTablesWithMapping()
{
msqIncompatible();
testQuery(
"SELECT\n"
+ "dim1, dim2, SUM(m1), COUNT(*)\n"
+ "FROM (SELECT dim1, dim2, m1 FROM foo UNION ALL SELECT dim1, dim2, m1 FROM numfoo)\n"
+ "WHERE dim2 = 'a' OR dim2 = 'def'\n"
+ "GROUP BY 1, 2",
ImmutableList.of(
GroupByQuery.builder()
.setDataSource(
new UnionDataSource(
ImmutableList.of(
new TableDataSource(CalciteTests.DATASOURCE1),
new TableDataSource(CalciteTests.DATASOURCE3)
)
)
)
.setInterval(querySegmentSpec(Filtration.eternity()))
.setGranularity(Granularities.ALL)
.setDimFilter(in("dim2", ImmutableList.of("def", "a"), null))
.setDimensions(
new DefaultDimensionSpec("dim1", "d0"),
new DefaultDimensionSpec("dim2", "d1")
)
.setAggregatorSpecs(
aggregators(
new DoubleSumAggregatorFactory("a0", "m1"),
new CountAggregatorFactory("a1")
)
)
.setContext(QUERY_CONTEXT_DEFAULT)
.build()
),
ImmutableList.of(
new Object[]{"", "a", 2.0, 2L},
new Object[]{"1", "a", 8.0, 2L}
)
);
}
@DecoupledIgnore(mode = Modes.NOT_ENOUGH_RULES)
@Test
public void testJoinUnionAllDifferentTablesWithMapping()
{
msqIncompatible();
testQuery(
"SELECT\n"
+ "dim1, dim2, SUM(m1), COUNT(*)\n"
+ "FROM (SELECT dim1, dim2, m1 FROM foo UNION ALL SELECT dim1, dim2, m1 FROM numfoo)\n"
+ "WHERE dim2 = 'a' OR dim2 = 'def'\n"
+ "GROUP BY 1, 2",
ImmutableList.of(
GroupByQuery.builder()
.setDataSource(
new UnionDataSource(
ImmutableList.of(
new TableDataSource(CalciteTests.DATASOURCE1),
new TableDataSource(CalciteTests.DATASOURCE3)
)
)
)
.setInterval(querySegmentSpec(Filtration.eternity()))
.setGranularity(Granularities.ALL)
.setDimFilter(in("dim2", ImmutableList.of("def", "a"), null))
.setDimensions(
new DefaultDimensionSpec("dim1", "d0"),
new DefaultDimensionSpec("dim2", "d1")
)
.setAggregatorSpecs(
aggregators(
new DoubleSumAggregatorFactory("a0", "m1"),
new CountAggregatorFactory("a1")
)
)
.setContext(QUERY_CONTEXT_DEFAULT)
.build()
),
ImmutableList.of(
new Object[]{"", "a", 2.0, 2L},
new Object[]{"1", "a", 8.0, 2L}
)
);
}
@Test
public void testUnionAllTablesColumnCountMismatch()
{
try {
testQuery(
"SELECT\n"
+ "dim1, dim2, SUM(m1), COUNT(*)\n"
+ "FROM (SELECT * FROM foo UNION ALL SELECT * FROM numfoo)\n"
+ "WHERE dim2 = 'a' OR dim2 = 'def'\n"
+ "GROUP BY 1, 2",
ImmutableList.of(),
ImmutableList.of()
);
Assert.fail("query execution should fail");
}
catch (DruidException e) {
MatcherAssert.assertThat(e, invalidSqlIs("Column count mismatch in UNION ALL (line [3], column [42])"));
}
}
@DecoupledIgnore(mode = Modes.NOT_ENOUGH_RULES)
@Test
public void testUnionAllTablesColumnTypeMismatchFloatLong()
{
msqIncompatible();
// "m1" has a different type in foo and foo2 (float vs long), but this query is OK anyway because they can both
// be implicitly cast to double.
testQuery(
"SELECT\n"
+ "dim1, dim2, SUM(m1), COUNT(*)\n"
+ "FROM (SELECT dim1, dim2, m1 FROM foo2 UNION ALL SELECT dim1, dim2, m1 FROM foo)\n"
+ "WHERE dim2 = 'a' OR dim2 = 'en'\n"
+ "GROUP BY 1, 2",
ImmutableList.of(
GroupByQuery.builder()
.setDataSource(
new UnionDataSource(
ImmutableList.of(
new TableDataSource(CalciteTests.DATASOURCE2),
new TableDataSource(CalciteTests.DATASOURCE1)
)
)
)
.setInterval(querySegmentSpec(Filtration.eternity()))
.setGranularity(Granularities.ALL)
.setDimFilter(in("dim2", ImmutableList.of("en", "a"), null))
.setDimensions(
new DefaultDimensionSpec("dim1", "d0"),
new DefaultDimensionSpec("dim2", "d1")
)
.setAggregatorSpecs(
aggregators(
new DoubleSumAggregatorFactory("a0", "m1"),
new CountAggregatorFactory("a1")
)
)
.setContext(QUERY_CONTEXT_DEFAULT)
.build()
),
ImmutableList.of(
new Object[]{"", "a", 1.0, 1L},
new Object[]{"1", "a", 4.0, 1L},
new Object[]{"druid", "en", 1.0, 1L}
)
);
}
@DecoupledIgnore(mode = Modes.ERROR_HANDLING)
@Test
public void testUnionAllTablesColumnTypeMismatchStringLong()
{
// "dim3" has a different type in foo and foo2 (string vs long), which requires a casting subquery, so this
// query cannot be planned.
assertQueryIsUnplannable(
"SELECT\n"
+ "dim3, dim2, SUM(m1), COUNT(*)\n"
+ "FROM (SELECT dim3, dim2, m1 FROM foo2 UNION ALL SELECT dim3, dim2, m1 FROM foo)\n"
+ "WHERE dim2 = 'a' OR dim2 = 'en'\n"
+ "GROUP BY 1, 2",
"SQL requires union between inputs that are not simple table scans and involve a " +
"filter or aliasing. Or column types of tables being unioned are not of same type."
);
}
@DecoupledIgnore(mode = Modes.ERROR_HANDLING)
@Test
public void testUnionAllTablesWhenMappingIsRequired()
{
// Cannot plan this UNION ALL operation, because the column swap would require generating a subquery.
assertQueryIsUnplannable(
"SELECT\n"
+ "c, COUNT(*)\n"
+ "FROM (SELECT dim1 AS c, m1 FROM foo UNION ALL SELECT dim2 AS c, m1 FROM numfoo)\n"
+ "WHERE c = 'a' OR c = 'def'\n"
+ "GROUP BY 1",
"SQL requires union between two tables " +
"and column names queried for each table are different Left: [dim1], Right: [dim2]."
);
}
@DecoupledIgnore(mode = Modes.ERROR_HANDLING)
@Test
public void testUnionIsUnplannable()
{
// Cannot plan this UNION operation
assertQueryIsUnplannable(
"SELECT dim2, dim1, m1 FROM foo2 UNION SELECT dim1, dim2, m1 FROM foo",
"SQL requires 'UNION' but only 'UNION ALL' is supported."
);
}
@DecoupledIgnore(mode = Modes.ERROR_HANDLING)
@Test
public void testUnionAllTablesWhenCastAndMappingIsRequired()
{
// Cannot plan this UNION ALL operation, because the column swap would require generating a subquery.
assertQueryIsUnplannable(
"SELECT\n"
+ "c, COUNT(*)\n"
+ "FROM (SELECT dim1 AS c, m1 FROM foo UNION ALL SELECT cnt AS c, m1 FROM numfoo)\n"
+ "WHERE c = 'a' OR c = 'def'\n"
+ "GROUP BY 1",
"SQL requires union between inputs that are not simple table scans and involve " +
"a filter or aliasing. Or column types of tables being unioned are not of same type."
);
}
@DecoupledIgnore
@Test
public void testUnionAllSameTableTwice()
{
msqIncompatible();
testQuery(
"SELECT\n"
+ "dim1, dim2, SUM(m1), COUNT(*)\n"
+ "FROM (SELECT * FROM foo UNION ALL SELECT * FROM foo)\n"
+ "WHERE dim2 = 'a' OR dim2 = 'def'\n"
+ "GROUP BY 1, 2",
ImmutableList.of(
GroupByQuery.builder()
.setDataSource(
new UnionDataSource(
ImmutableList.of(
new TableDataSource(CalciteTests.DATASOURCE1),
new TableDataSource(CalciteTests.DATASOURCE1)
)
)
)
.setInterval(querySegmentSpec(Filtration.eternity()))
.setGranularity(Granularities.ALL)
.setDimFilter(in("dim2", ImmutableList.of("def", "a"), null))
.setDimensions(
new DefaultDimensionSpec("dim1", "d0"),
new DefaultDimensionSpec("dim2", "d1")
)
.setAggregatorSpecs(
aggregators(
new DoubleSumAggregatorFactory("a0", "m1"),
new CountAggregatorFactory("a1")
)
)
.setContext(QUERY_CONTEXT_DEFAULT)
.build()
),
ImmutableList.of(
new Object[]{"", "a", 2.0, 2L},
new Object[]{"1", "a", 8.0, 2L}
)
);
}
@DecoupledIgnore(mode = Modes.NOT_ENOUGH_RULES)
@Test
public void testUnionAllSameTableTwiceWithSameMapping()
{
msqIncompatible();
testQuery(
"SELECT\n"
+ "dim1, dim2, SUM(m1), COUNT(*)\n"
+ "FROM (SELECT dim1, dim2, m1 FROM foo UNION ALL SELECT dim1, dim2, m1 FROM foo)\n"
+ "WHERE dim2 = 'a' OR dim2 = 'def'\n"
+ "GROUP BY 1, 2",
ImmutableList.of(
GroupByQuery.builder()
.setDataSource(
new UnionDataSource(
ImmutableList.of(
new TableDataSource(CalciteTests.DATASOURCE1),
new TableDataSource(CalciteTests.DATASOURCE1)
)
)
)
.setInterval(querySegmentSpec(Filtration.eternity()))
.setGranularity(Granularities.ALL)
.setDimFilter(in("dim2", ImmutableList.of("def", "a"), null))
.setDimensions(
new DefaultDimensionSpec("dim1", "d0"),
new DefaultDimensionSpec("dim2", "d1")
)
.setAggregatorSpecs(
aggregators(
new DoubleSumAggregatorFactory("a0", "m1"),
new CountAggregatorFactory("a1")
)
)
.setContext(QUERY_CONTEXT_DEFAULT)
.build()
),
ImmutableList.of(
new Object[]{"", "a", 2.0, 2L},
new Object[]{"1", "a", 8.0, 2L}
)
);
}
@DecoupledIgnore(mode = Modes.ERROR_HANDLING)
@Test
public void testUnionAllSameTableTwiceWithDifferentMapping()
{
// Cannot plan this UNION ALL operation, because the column swap would require generating a subquery.
assertQueryIsUnplannable(
"SELECT\n"
+ "dim1, dim2, SUM(m1), COUNT(*)\n"
+ "FROM (SELECT dim1, dim2, m1 FROM foo UNION ALL SELECT dim2, dim1, m1 FROM foo)\n"
+ "WHERE dim2 = 'a' OR dim2 = 'def'\n"
+ "GROUP BY 1, 2",
"SQL requires union between two tables and column names queried for each table are different Left: [dim1, dim2, m1], Right: [dim2, dim1, m1]."
);
}
@DecoupledIgnore
@Test
public void testUnionAllSameTableThreeTimes()
{
msqIncompatible();
testQuery(
"SELECT\n"
+ "dim1, dim2, SUM(m1), COUNT(*)\n"
+ "FROM (SELECT * FROM foo UNION ALL SELECT * FROM foo UNION ALL SELECT * FROM foo)\n"
+ "WHERE dim2 = 'a' OR dim2 = 'def'\n"
+ "GROUP BY 1, 2",
ImmutableList.of(
GroupByQuery.builder()
.setDataSource(
new UnionDataSource(
ImmutableList.of(
new TableDataSource(CalciteTests.DATASOURCE1),
new TableDataSource(CalciteTests.DATASOURCE1),
new TableDataSource(CalciteTests.DATASOURCE1)
)
)
)
.setInterval(querySegmentSpec(Filtration.eternity()))
.setGranularity(Granularities.ALL)
.setDimFilter(in("dim2", ImmutableList.of("def", "a"), null))
.setDimensions(
new DefaultDimensionSpec("dim1", "d0"),
new DefaultDimensionSpec("dim2", "d1")
)
.setAggregatorSpecs(
aggregators(
new DoubleSumAggregatorFactory("a0", "m1"),
new CountAggregatorFactory("a1")
)
)
.setContext(QUERY_CONTEXT_DEFAULT)
.build()
),
ImmutableList.of(
new Object[]{"", "a", 3.0, 3L},
new Object[]{"1", "a", 12.0, 3L}
)
);
}
@Test
public void testUnionAllThreeTablesColumnCountMismatch1()
{
try {
testQuery(
"SELECT\n"
+ "dim1, dim2, SUM(m1), COUNT(*)\n"
+ "FROM (SELECT * FROM numfoo UNION ALL SELECT * FROM foo UNION ALL SELECT * from foo)\n"
+ "WHERE dim2 = 'a' OR dim2 = 'def'\n"
+ "GROUP BY 1, 2",
ImmutableList.of(),
ImmutableList.of()
);
Assert.fail("query execution should fail");
}
catch (DruidException e) {
MatcherAssert.assertThat(e, invalidSqlIs("Column count mismatch in UNION ALL (line [3], column [45])"));
}
}
@Test
public void testUnionAllThreeTablesColumnCountMismatch2()
{
try {
testQuery(
"SELECT\n"
+ "dim1, dim2, SUM(m1), COUNT(*)\n"
+ "FROM (SELECT * FROM numfoo UNION ALL SELECT * FROM foo UNION ALL SELECT * from foo)\n"
+ "WHERE dim2 = 'a' OR dim2 = 'def'\n"
+ "GROUP BY 1, 2",
ImmutableList.of(),
ImmutableList.of()
);
Assert.fail("query execution should fail");
}
catch (DruidException e) {
MatcherAssert.assertThat(e, invalidSqlIs("Column count mismatch in UNION ALL (line [3], column [45])"));
}
}
@Test
public void testUnionAllThreeTablesColumnCountMismatch3()
{
try {
testQuery(
"SELECT\n"
+ "dim1, dim2, SUM(m1), COUNT(*)\n"
+ "FROM (SELECT * FROM foo UNION ALL SELECT * FROM foo UNION ALL SELECT * from numfoo)\n"
+ "WHERE dim2 = 'a' OR dim2 = 'def'\n"
+ "GROUP BY 1, 2",
ImmutableList.of(),
ImmutableList.of()
);
Assert.fail("query execution should fail");
}
catch (DruidException e) {
MatcherAssert.assertThat(e, invalidSqlIs("Column count mismatch in UNION ALL (line [3], column [70])"));
}
}
@DecoupledIgnore
@Test
public void testUnionAllSameTableThreeTimesWithSameMapping()
{
msqIncompatible();
testQuery(
"SELECT\n"
+ "dim1, dim2, SUM(m1), COUNT(*)\n"
+ "FROM (SELECT dim1, dim2, m1 FROM foo UNION ALL SELECT dim1, dim2, m1 FROM foo UNION ALL SELECT dim1, dim2, m1 FROM foo)\n"
+ "WHERE dim2 = 'a' OR dim2 = 'def'\n"
+ "GROUP BY 1, 2",
ImmutableList.of(
GroupByQuery.builder()
.setDataSource(
new UnionDataSource(
ImmutableList.of(
new TableDataSource(CalciteTests.DATASOURCE1),
new TableDataSource(CalciteTests.DATASOURCE1),
new TableDataSource(CalciteTests.DATASOURCE1)
)
)
)
.setInterval(querySegmentSpec(Filtration.eternity()))
.setGranularity(Granularities.ALL)
.setDimFilter(in("dim2", ImmutableList.of("def", "a"), null))
.setDimensions(
new DefaultDimensionSpec("dim1", "d0"),
new DefaultDimensionSpec("dim2", "d1")
)
.setAggregatorSpecs(
aggregators(
new DoubleSumAggregatorFactory("a0", "m1"),
new CountAggregatorFactory("a1")
)
)
.setContext(QUERY_CONTEXT_DEFAULT)
.build()
),
ImmutableList.of(
new Object[]{"", "a", 3.0, 3L},
new Object[]{"1", "a", 12.0, 3L}
)
);
}
@Test
public void testPruneDeadAggregators()
{
@ -3669,6 +3199,107 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
);
}
/**
* This test case should be in {@link CalciteUnionQueryTest}. However, there's a bug in the test framework that
* doesn't reset framework once the merge buffers
*/
@DecoupledIgnore
@Test
public void testUnionAllSameTableThreeTimes()
{
testQuery(
"SELECT\n"
+ "dim1, dim2, SUM(m1), COUNT(*)\n"
+ "FROM (SELECT * FROM foo UNION ALL SELECT * FROM foo UNION ALL SELECT * FROM foo)\n"
+ "WHERE dim2 = 'a' OR dim2 = 'def'\n"
+ "GROUP BY 1, 2",
ImmutableList.of(
GroupByQuery.builder()
.setDataSource(
new UnionDataSource(
ImmutableList.of(
new TableDataSource(CalciteTests.DATASOURCE1),
new TableDataSource(CalciteTests.DATASOURCE1),
new TableDataSource(CalciteTests.DATASOURCE1)
)
)
)
.setInterval(querySegmentSpec(Filtration.eternity()))
.setGranularity(Granularities.ALL)
.setDimFilter(in("dim2", ImmutableList.of("def", "a"), null))
.setDimensions(
new DefaultDimensionSpec("dim1", "d0"),
new DefaultDimensionSpec("dim2", "d1")
)
.setAggregatorSpecs(
aggregators(
new DoubleSumAggregatorFactory("a0", "m1"),
new CountAggregatorFactory("a1")
)
)
.setContext(QUERY_CONTEXT_DEFAULT)
.build()
),
ImmutableList.of(
new Object[]{"", "a", 3.0, 3L},
new Object[]{"1", "a", 12.0, 3L}
)
);
}
@DecoupledIgnore(mode = Modes.NOT_ENOUGH_RULES)
@Test
public void testExactCountDistinctUsingSubqueryOnUnionAllTables()
{
testQuery(
"SELECT\n"
+ " SUM(cnt),\n"
+ " COUNT(*)\n"
+ "FROM (\n"
+ " SELECT dim2, SUM(cnt) AS cnt\n"
+ " FROM (SELECT * FROM druid.foo UNION ALL SELECT * FROM druid.foo)\n"
+ " GROUP BY dim2\n"
+ ")",
ImmutableList.of(
GroupByQuery.builder()
.setDataSource(
new QueryDataSource(
GroupByQuery.builder()
.setDataSource(
new UnionDataSource(
ImmutableList.of(
new TableDataSource(CalciteTests.DATASOURCE1),
new TableDataSource(CalciteTests.DATASOURCE1)
)
)
)
.setInterval(querySegmentSpec(Filtration.eternity()))
.setGranularity(Granularities.ALL)
.setDimensions(dimensions(new DefaultDimensionSpec("dim2", "d0")))
.setAggregatorSpecs(aggregators(new LongSumAggregatorFactory("a0", "cnt")))
.setContext(QUERY_CONTEXT_DEFAULT)
.build()
)
)
.setInterval(querySegmentSpec(Filtration.eternity()))
.setGranularity(Granularities.ALL)
.setAggregatorSpecs(aggregators(
new LongSumAggregatorFactory("_a0", "a0"),
new CountAggregatorFactory("_a1")
))
.setContext(QUERY_CONTEXT_DEFAULT)
.build()
),
NullHandling.replaceWithDefault() ?
ImmutableList.of(
new Object[]{12L, 3L}
) :
ImmutableList.of(
new Object[]{12L, 4L}
)
);
}
@Test
public void testNullDoubleTopN()
{
@ -7330,60 +6961,6 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
);
}
@DecoupledIgnore(mode = Modes.NOT_ENOUGH_RULES)
@Test
public void testExactCountDistinctUsingSubqueryOnUnionAllTables()
{
msqIncompatible();
testQuery(
"SELECT\n"
+ " SUM(cnt),\n"
+ " COUNT(*)\n"
+ "FROM (\n"
+ " SELECT dim2, SUM(cnt) AS cnt\n"
+ " FROM (SELECT * FROM druid.foo UNION ALL SELECT * FROM druid.foo)\n"
+ " GROUP BY dim2\n"
+ ")",
ImmutableList.of(
GroupByQuery.builder()
.setDataSource(
new QueryDataSource(
GroupByQuery.builder()
.setDataSource(
new UnionDataSource(
ImmutableList.of(
new TableDataSource(CalciteTests.DATASOURCE1),
new TableDataSource(CalciteTests.DATASOURCE1)
)
)
)
.setInterval(querySegmentSpec(Filtration.eternity()))
.setGranularity(Granularities.ALL)
.setDimensions(dimensions(new DefaultDimensionSpec("dim2", "d0")))
.setAggregatorSpecs(aggregators(new LongSumAggregatorFactory("a0", "cnt")))
.setContext(QUERY_CONTEXT_DEFAULT)
.build()
)
)
.setInterval(querySegmentSpec(Filtration.eternity()))
.setGranularity(Granularities.ALL)
.setAggregatorSpecs(aggregators(
new LongSumAggregatorFactory("_a0", "a0"),
new CountAggregatorFactory("_a1")
))
.setContext(QUERY_CONTEXT_DEFAULT)
.build()
),
NullHandling.replaceWithDefault() ?
ImmutableList.of(
new Object[]{12L, 3L}
) :
ImmutableList.of(
new Object[]{12L, 4L}
)
);
}
@Test
public void testAvgDailyCountDistinct()
{

View File

@ -0,0 +1,405 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.sql.calcite;
import com.google.common.collect.ImmutableList;
import org.apache.druid.error.DruidException;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.query.TableDataSource;
import org.apache.druid.query.UnionDataSource;
import org.apache.druid.query.aggregation.CountAggregatorFactory;
import org.apache.druid.query.aggregation.DoubleSumAggregatorFactory;
import org.apache.druid.query.dimension.DefaultDimensionSpec;
import org.apache.druid.query.groupby.GroupByQuery;
import org.apache.druid.sql.calcite.filtration.Filtration;
import org.apache.druid.sql.calcite.util.CalciteTests;
import org.hamcrest.MatcherAssert;
import org.junit.Assert;
import org.junit.Test;
public class CalciteUnionQueryTest extends BaseCalciteQueryTest
{
@Test
public void testUnionAllDifferentTablesWithMapping()
{
testQuery(
"SELECT\n"
+ "dim1, dim2, SUM(m1), COUNT(*)\n"
+ "FROM (SELECT dim1, dim2, m1 FROM foo UNION ALL SELECT dim1, dim2, m1 FROM numfoo)\n"
+ "WHERE dim2 = 'a' OR dim2 = 'def'\n"
+ "GROUP BY 1, 2",
ImmutableList.of(
GroupByQuery.builder()
.setDataSource(
new UnionDataSource(
ImmutableList.of(
new TableDataSource(CalciteTests.DATASOURCE1),
new TableDataSource(CalciteTests.DATASOURCE3)
)
)
)
.setInterval(querySegmentSpec(Filtration.eternity()))
.setGranularity(Granularities.ALL)
.setDimFilter(in("dim2", ImmutableList.of("def", "a"), null))
.setDimensions(
new DefaultDimensionSpec("dim1", "d0"),
new DefaultDimensionSpec("dim2", "d1")
)
.setAggregatorSpecs(
aggregators(
new DoubleSumAggregatorFactory("a0", "m1"),
new CountAggregatorFactory("a1")
)
)
.setContext(QUERY_CONTEXT_DEFAULT)
.build()
),
ImmutableList.of(
new Object[]{"", "a", 2.0, 2L},
new Object[]{"1", "a", 8.0, 2L}
)
);
}
@Test
public void testJoinUnionAllDifferentTablesWithMapping()
{
testQuery(
"SELECT\n"
+ "dim1, dim2, SUM(m1), COUNT(*)\n"
+ "FROM (SELECT dim1, dim2, m1 FROM foo UNION ALL SELECT dim1, dim2, m1 FROM numfoo)\n"
+ "WHERE dim2 = 'a' OR dim2 = 'def'\n"
+ "GROUP BY 1, 2",
ImmutableList.of(
GroupByQuery.builder()
.setDataSource(
new UnionDataSource(
ImmutableList.of(
new TableDataSource(CalciteTests.DATASOURCE1),
new TableDataSource(CalciteTests.DATASOURCE3)
)
)
)
.setInterval(querySegmentSpec(Filtration.eternity()))
.setGranularity(Granularities.ALL)
.setDimFilter(in("dim2", ImmutableList.of("def", "a"), null))
.setDimensions(
new DefaultDimensionSpec("dim1", "d0"),
new DefaultDimensionSpec("dim2", "d1")
)
.setAggregatorSpecs(
aggregators(
new DoubleSumAggregatorFactory("a0", "m1"),
new CountAggregatorFactory("a1")
)
)
.setContext(QUERY_CONTEXT_DEFAULT)
.build()
),
ImmutableList.of(
new Object[]{"", "a", 2.0, 2L},
new Object[]{"1", "a", 8.0, 2L}
)
);
}
@Test
public void testUnionAllTablesColumnCountMismatch()
{
try {
testQuery(
"SELECT\n"
+ "dim1, dim2, SUM(m1), COUNT(*)\n"
+ "FROM (SELECT * FROM foo UNION ALL SELECT * FROM numfoo)\n"
+ "WHERE dim2 = 'a' OR dim2 = 'def'\n"
+ "GROUP BY 1, 2",
ImmutableList.of(),
ImmutableList.of()
);
Assert.fail("query execution should fail");
}
catch (DruidException e) {
MatcherAssert.assertThat(e, invalidSqlIs("Column count mismatch in UNION ALL (line [3], column [42])"));
}
}
@Test
public void testUnionAllTablesColumnTypeMismatchFloatLong()
{
// "m1" has a different type in foo and foo2 (float vs long), but this query is OK anyway because they can both
// be implicitly cast to double.
testQuery(
"SELECT\n"
+ "dim1, dim2, SUM(m1), COUNT(*)\n"
+ "FROM (SELECT dim1, dim2, m1 FROM foo2 UNION ALL SELECT dim1, dim2, m1 FROM foo)\n"
+ "WHERE dim2 = 'a' OR dim2 = 'en'\n"
+ "GROUP BY 1, 2",
ImmutableList.of(
GroupByQuery.builder()
.setDataSource(
new UnionDataSource(
ImmutableList.of(
new TableDataSource(CalciteTests.DATASOURCE2),
new TableDataSource(CalciteTests.DATASOURCE1)
)
)
)
.setInterval(querySegmentSpec(Filtration.eternity()))
.setGranularity(Granularities.ALL)
.setDimFilter(in("dim2", ImmutableList.of("en", "a"), null))
.setDimensions(
new DefaultDimensionSpec("dim1", "d0"),
new DefaultDimensionSpec("dim2", "d1")
)
.setAggregatorSpecs(
aggregators(
new DoubleSumAggregatorFactory("a0", "m1"),
new CountAggregatorFactory("a1")
)
)
.setContext(QUERY_CONTEXT_DEFAULT)
.build()
),
ImmutableList.of(
new Object[]{"", "a", 1.0, 1L},
new Object[]{"1", "a", 4.0, 1L},
new Object[]{"druid", "en", 1.0, 1L}
)
);
}
@Test
public void testUnionAllTablesColumnTypeMismatchStringLong()
{
// "dim3" has a different type in foo and foo2 (string vs long), which requires a casting subquery, so this
// query cannot be planned.
assertQueryIsUnplannable(
"SELECT\n"
+ "dim3, dim2, SUM(m1), COUNT(*)\n"
+ "FROM (SELECT dim3, dim2, m1 FROM foo2 UNION ALL SELECT dim3, dim2, m1 FROM foo)\n"
+ "WHERE dim2 = 'a' OR dim2 = 'en'\n"
+ "GROUP BY 1, 2",
"SQL requires union between inputs that are not simple table scans and involve a " +
"filter or aliasing. Or column types of tables being unioned are not of same type."
);
}
@Test
public void testUnionAllTablesWhenMappingIsRequired()
{
// Cannot plan this UNION ALL operation, because the column swap would require generating a subquery.
assertQueryIsUnplannable(
"SELECT\n"
+ "c, COUNT(*)\n"
+ "FROM (SELECT dim1 AS c, m1 FROM foo UNION ALL SELECT dim2 AS c, m1 FROM numfoo)\n"
+ "WHERE c = 'a' OR c = 'def'\n"
+ "GROUP BY 1",
"SQL requires union between two tables " +
"and column names queried for each table are different Left: [dim1], Right: [dim2]."
);
}
@Test
public void testUnionIsUnplannable()
{
// Cannot plan this UNION operation
assertQueryIsUnplannable(
"SELECT dim2, dim1, m1 FROM foo2 UNION SELECT dim1, dim2, m1 FROM foo",
"SQL requires 'UNION' but only 'UNION ALL' is supported."
);
}
@Test
public void testUnionAllTablesWhenCastAndMappingIsRequired()
{
// Cannot plan this UNION ALL operation, because the column swap would require generating a subquery.
assertQueryIsUnplannable(
"SELECT\n"
+ "c, COUNT(*)\n"
+ "FROM (SELECT dim1 AS c, m1 FROM foo UNION ALL SELECT cnt AS c, m1 FROM numfoo)\n"
+ "WHERE c = 'a' OR c = 'def'\n"
+ "GROUP BY 1",
"SQL requires union between inputs that are not simple table scans and involve " +
"a filter or aliasing. Or column types of tables being unioned are not of same type."
);
}
@Test
public void testUnionAllSameTableTwice()
{
testQuery(
"SELECT\n"
+ "dim1, dim2, SUM(m1), COUNT(*)\n"
+ "FROM (SELECT * FROM foo UNION ALL SELECT * FROM foo)\n"
+ "WHERE dim2 = 'a' OR dim2 = 'def'\n"
+ "GROUP BY 1, 2",
ImmutableList.of(
GroupByQuery.builder()
.setDataSource(
new UnionDataSource(
ImmutableList.of(
new TableDataSource(CalciteTests.DATASOURCE1),
new TableDataSource(CalciteTests.DATASOURCE1)
)
)
)
.setInterval(querySegmentSpec(Filtration.eternity()))
.setGranularity(Granularities.ALL)
.setDimFilter(in("dim2", ImmutableList.of("def", "a"), null))
.setDimensions(
new DefaultDimensionSpec("dim1", "d0"),
new DefaultDimensionSpec("dim2", "d1")
)
.setAggregatorSpecs(
aggregators(
new DoubleSumAggregatorFactory("a0", "m1"),
new CountAggregatorFactory("a1")
)
)
.setContext(QUERY_CONTEXT_DEFAULT)
.build()
),
ImmutableList.of(
new Object[]{"", "a", 2.0, 2L},
new Object[]{"1", "a", 8.0, 2L}
)
);
}
@Test
public void testUnionAllSameTableTwiceWithSameMapping()
{
testQuery(
"SELECT\n"
+ "dim1, dim2, SUM(m1), COUNT(*)\n"
+ "FROM (SELECT dim1, dim2, m1 FROM foo UNION ALL SELECT dim1, dim2, m1 FROM foo)\n"
+ "WHERE dim2 = 'a' OR dim2 = 'def'\n"
+ "GROUP BY 1, 2",
ImmutableList.of(
GroupByQuery.builder()
.setDataSource(
new UnionDataSource(
ImmutableList.of(
new TableDataSource(CalciteTests.DATASOURCE1),
new TableDataSource(CalciteTests.DATASOURCE1)
)
)
)
.setInterval(querySegmentSpec(Filtration.eternity()))
.setGranularity(Granularities.ALL)
.setDimFilter(in("dim2", ImmutableList.of("def", "a"), null))
.setDimensions(
new DefaultDimensionSpec("dim1", "d0"),
new DefaultDimensionSpec("dim2", "d1")
)
.setAggregatorSpecs(
aggregators(
new DoubleSumAggregatorFactory("a0", "m1"),
new CountAggregatorFactory("a1")
)
)
.setContext(QUERY_CONTEXT_DEFAULT)
.build()
),
ImmutableList.of(
new Object[]{"", "a", 2.0, 2L},
new Object[]{"1", "a", 8.0, 2L}
)
);
}
@Test
public void testUnionAllSameTableTwiceWithDifferentMapping()
{
// Cannot plan this UNION ALL operation, because the column swap would require generating a subquery.
assertQueryIsUnplannable(
"SELECT\n"
+ "dim1, dim2, SUM(m1), COUNT(*)\n"
+ "FROM (SELECT dim1, dim2, m1 FROM foo UNION ALL SELECT dim2, dim1, m1 FROM foo)\n"
+ "WHERE dim2 = 'a' OR dim2 = 'def'\n"
+ "GROUP BY 1, 2",
"SQL requires union between two tables and column names queried for each table are different Left: [dim1, dim2, m1], Right: [dim2, dim1, m1]."
);
}
@Test
public void testUnionAllThreeTablesColumnCountMismatch1()
{
try {
testQuery(
"SELECT\n"
+ "dim1, dim2, SUM(m1), COUNT(*)\n"
+ "FROM (SELECT * FROM numfoo UNION ALL SELECT * FROM foo UNION ALL SELECT * from foo)\n"
+ "WHERE dim2 = 'a' OR dim2 = 'def'\n"
+ "GROUP BY 1, 2",
ImmutableList.of(),
ImmutableList.of()
);
Assert.fail("query execution should fail");
}
catch (DruidException e) {
MatcherAssert.assertThat(e, invalidSqlIs("Column count mismatch in UNION ALL (line [3], column [45])"));
}
}
@Test
public void testUnionAllThreeTablesColumnCountMismatch2()
{
try {
testQuery(
"SELECT\n"
+ "dim1, dim2, SUM(m1), COUNT(*)\n"
+ "FROM (SELECT * FROM numfoo UNION ALL SELECT * FROM foo UNION ALL SELECT * from foo)\n"
+ "WHERE dim2 = 'a' OR dim2 = 'def'\n"
+ "GROUP BY 1, 2",
ImmutableList.of(),
ImmutableList.of()
);
Assert.fail("query execution should fail");
}
catch (DruidException e) {
MatcherAssert.assertThat(e, invalidSqlIs("Column count mismatch in UNION ALL (line [3], column [45])"));
}
}
@Test
public void testUnionAllThreeTablesColumnCountMismatch3()
{
try {
testQuery(
"SELECT\n"
+ "dim1, dim2, SUM(m1), COUNT(*)\n"
+ "FROM (SELECT * FROM foo UNION ALL SELECT * FROM foo UNION ALL SELECT * from numfoo)\n"
+ "WHERE dim2 = 'a' OR dim2 = 'def'\n"
+ "GROUP BY 1, 2",
ImmutableList.of(),
ImmutableList.of()
);
Assert.fail("query execution should fail");
}
catch (DruidException e) {
MatcherAssert.assertThat(e, invalidSqlIs("Column count mismatch in UNION ALL (line [3], column [70])"));
}
}
}

View File

@ -90,6 +90,7 @@ public class IngestionTestSqlEngine implements SqlEngine
case READ_EXTERNAL_DATA:
case SCAN_ORDER_BY_NON_TIME:
case ALLOW_BROADCAST_RIGHTY_JOIN:
case ALLOW_TOP_LEVEL_UNION_ALL:
return true;
default:
throw SqlEngines.generateUnrecognizedFeatureException(IngestionTestSqlEngine.class.getSimpleName(), feature);

View File

@ -19,6 +19,7 @@
package org.apache.druid.sql.calcite.planner;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableSet;
import com.google.inject.Guice;
import com.google.inject.Injector;
@ -42,7 +43,7 @@ import org.apache.druid.server.security.ResourceType;
import org.apache.druid.sql.calcite.aggregation.SqlAggregator;
import org.apache.druid.sql.calcite.expression.SqlOperatorConversion;
import org.apache.druid.sql.calcite.rule.ExtensionCalciteRuleProvider;
import org.apache.druid.sql.calcite.run.SqlEngine;
import org.apache.druid.sql.calcite.run.NativeSqlEngine;
import org.apache.druid.sql.calcite.schema.DruidSchemaCatalog;
import org.apache.druid.sql.calcite.schema.DruidSchemaName;
import org.apache.druid.sql.calcite.schema.NamedSchema;
@ -90,8 +91,6 @@ public class CalcitePlannerModuleTest extends CalciteTestBase
@Mock
private DruidSchemaCatalog rootSchema;
@Mock
private SqlEngine engine;
private Set<SqlAggregator> aggregators;
private Set<SqlOperatorConversion> operatorConversions;
@ -175,10 +174,11 @@ public class CalcitePlannerModuleTest extends CalciteTestBase
@Test
public void testExtensionCalciteRule()
{
ObjectMapper mapper = new DefaultObjectMapper();
PlannerToolbox toolbox = new PlannerToolbox(
injector.getInstance(DruidOperatorTable.class),
macroTable,
new DefaultObjectMapper(),
mapper,
injector.getInstance(PlannerConfig.class),
rootSchema,
joinableFactoryWrapper,
@ -189,11 +189,10 @@ public class CalcitePlannerModuleTest extends CalciteTestBase
AuthConfig.newBuilder().build()
);
PlannerContext context = PlannerContext.create(
toolbox,
"SELECT 1",
engine,
new NativeSqlEngine(queryLifecycleFactory, mapper),
Collections.emptyMap(),
null
);