Minor fixes and enhancements in UnionQuery handling (#17483)

* plan consistently with either UnionDataSource or UnionQuery for decoupled mode
* expose errors
* move decoupled related setting from PlannerConfig to QueryContexts
This commit is contained in:
Zoltan Haindrich 2024-11-28 10:05:12 +01:00 committed by GitHub
parent ddbb985369
commit c1ef38b052
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
23 changed files with 394 additions and 180 deletions

View File

@ -62,13 +62,12 @@ public class CalciteUnionQueryMSQTest extends CalciteUnionQueryTest
*/
@Test
@Override
public void testUnionIsUnplannable()
public void testUnionDifferentColumnOrder()
{
assertQueryIsUnplannable(
"SELECT dim2, dim1, m1 FROM foo2 UNION SELECT dim1, dim2, m1 FROM foo",
"SQL requires union between two tables and column names queried for each table are different Left: [dim2, dim1, m1], Right: [dim1, dim2, m1]."
);
}
@Disabled("Ignored till MSQ can plan UNION ALL with any operand")

View File

@ -661,4 +661,13 @@ public class QueryContext
"context=" + context +
'}';
}
/**
 * Tells whether this context selects the decoupled native-query SQL planning mode.
 *
 * The mode is looked up under {@link QueryContexts#CTX_NATIVE_QUERY_SQL_PLANNING_MODE}, with the coupled
 * mode passed as the default. The comparison is an exact (case-sensitive) string match against the
 * decoupled mode constant.
 *
 * @return {@code true} only if the configured mode equals the decoupled mode constant
 */
public boolean isDecoupledMode()
{
  return QueryContexts.NATIVE_QUERY_SQL_PLANNING_MODE_DECOUPLED.equals(
      getString(
          QueryContexts.CTX_NATIVE_QUERY_SQL_PLANNING_MODE,
          QueryContexts.NATIVE_QUERY_SQL_PLANNING_MODE_COUPLED
      )
  );
}
}

View File

@ -29,6 +29,7 @@ import org.apache.druid.java.util.common.Numbers;
import org.apache.druid.java.util.common.StringUtils;
import javax.annotation.Nullable;
import java.math.BigDecimal;
import java.util.Arrays;
import java.util.HashMap;
@ -105,6 +106,10 @@ public class QueryContexts
// SQL statement resource specific keys
public static final String CTX_EXECUTION_MODE = "executionMode";
public static final String CTX_NATIVE_QUERY_SQL_PLANNING_MODE = "plannerStrategy";
public static final String NATIVE_QUERY_SQL_PLANNING_MODE_COUPLED = "COUPLED";
public static final String NATIVE_QUERY_SQL_PLANNING_MODE_DECOUPLED = "DECOUPLED";
// Defaults
public static final boolean DEFAULT_BY_SEGMENT = false;
public static final boolean DEFAULT_POPULATE_CACHE = true;

View File

@ -198,4 +198,9 @@ public class UnionDataSource implements DataSource
"dataSources=" + dataSources +
'}';
}
/**
 * Checks whether the given datasource is a {@link TableDataSource} or an {@link InlineDataSource}.
 *
 * NOTE(review): presumably these are the only kinds that may be placed inside a {@link UnionDataSource};
 * confirm against the constructor's validation.
 *
 * @param dataSource the datasource to inspect
 * @return {@code true} for table and inline datasources, {@code false} for anything else
 */
public static boolean isCompatibleDataSource(DataSource dataSource)
{
  if (dataSource instanceof TableDataSource) {
    return true;
  }
  return dataSource instanceof InlineDataSource;
}
}

View File

@ -185,14 +185,13 @@ public class TimeseriesQuery extends BaseQuery<Result<TimeseriesResultValue>>
@Override
public RowSignature getResultRowSignature(Finalization finalization)
{
final Finalization finalization1 = finalization;
final RowSignature.Builder builder = RowSignature.builder();
builder.addTimeColumn();
String timestampResultField = getTimestampResultField();
if (StringUtils.isNotEmpty(timestampResultField)) {
builder.add(timestampResultField, ColumnType.LONG);
}
builder.addAggregators(aggregatorSpecs, finalization1);
builder.addAggregators(aggregatorSpecs, finalization);
builder.addPostAggregators(postAggregatorSpecs);
return builder.build();
}

View File

@ -96,6 +96,7 @@ public class UnionQueryLogic implements QueryLogic
Sequence run = runner.run(queryPlus.withQuery(q), responseContext);
seqs.add(run);
}
return Sequences.concat(seqs);
}
}

View File

@ -372,6 +372,28 @@ public class QueryContextTest
assertTrue(QueryContext.of(ImmutableMap.of(QueryContexts.ENABLE_DEBUG, true)).isDebug());
}
@Test
public void testIsDecoupled()
{
  // No planning mode configured at all.
  final QueryContext unconfigured = QueryContext.empty();
  assertFalse(unconfigured.isDecoupledMode());

  // Planning mode explicitly set to the decoupled constant.
  final QueryContext decoupled = QueryContext.of(
      ImmutableMap.of(
          QueryContexts.CTX_NATIVE_QUERY_SQL_PLANNING_MODE,
          QueryContexts.NATIVE_QUERY_SQL_PLANNING_MODE_DECOUPLED
      )
  );
  assertTrue(decoupled.isDecoupledMode());

  // An unrecognized planning mode value must not be treated as decoupled.
  final QueryContext unrecognized = QueryContext.of(
      ImmutableMap.of(
          QueryContexts.CTX_NATIVE_QUERY_SQL_PLANNING_MODE,
          "garbage"
      )
  );
  assertFalse(unrecognized.isDecoupledMode());
}
// This test is a bit silly. It is retained because another test uses the
// LegacyContextQuery test.
@Test

View File

@ -24,6 +24,8 @@ import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import nl.jqno.equalsverifier.EqualsVerifier;
import org.apache.druid.segment.TestHelper;
import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.segment.join.NoopDataSource;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
@ -62,6 +64,17 @@ public class UnionDataSourceTest
new UnionDataSource(Collections.emptyList());
}
@Test
public void test_isCompatible()
{
  // Table datasources are accepted.
  Assert.assertTrue(UnionDataSource.isCompatibleDataSource(new TableDataSource("foo")));

  // Inline datasources are accepted as well.
  Assert.assertTrue(
      UnionDataSource.isCompatibleDataSource(
          InlineDataSource.fromIterable(Collections.emptyList(), RowSignature.empty())
      )
  );

  // Any other datasource kind is rejected.
  Assert.assertFalse(UnionDataSource.isCompatibleDataSource(new NoopDataSource()));
}
@Test
public void test_getTableNames()
{
@ -131,7 +144,7 @@ public class UnionDataSourceTest
//noinspection unchecked
Assert.assertEquals(
new UnionDataSource(newDataSources),
unionDataSource.withChildren((List) newDataSources)
unionDataSource.withChildren(newDataSources)
);
}

View File

@ -165,6 +165,10 @@
<groupId>org.apache.calcite.avatica</groupId>
<artifactId>avatica-metrics</artifactId>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
</dependency>
<dependency>
<groupId>org.checkerframework</groupId>
<artifactId>checker-qual</artifactId>
@ -239,11 +243,6 @@
<version>1.3.0</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>pl.pragmatists</groupId>
<artifactId>JUnitParams</artifactId>

View File

@ -38,9 +38,7 @@ public class PlannerConfig
public static final String CTX_KEY_USE_NATIVE_QUERY_EXPLAIN = "useNativeQueryExplain";
public static final String CTX_KEY_FORCE_EXPRESSION_VIRTUAL_COLUMNS = "forceExpressionVirtualColumns";
public static final String CTX_MAX_NUMERIC_IN_FILTERS = "maxNumericInFilters";
public static final String CTX_NATIVE_QUERY_SQL_PLANNING_MODE = "plannerStrategy";
public static final int NUM_FILTER_NOT_USED = -1;
@JsonProperty
private int maxTopNLimit = 100_000;
@ -75,9 +73,7 @@ public class PlannerConfig
private int maxNumericInFilters = NUM_FILTER_NOT_USED;
@JsonProperty
private String nativeQuerySqlPlanningMode = NATIVE_QUERY_SQL_PLANNING_MODE_COUPLED; // can be COUPLED or DECOUPLED
public static final String NATIVE_QUERY_SQL_PLANNING_MODE_COUPLED = "COUPLED";
public static final String NATIVE_QUERY_SQL_PLANNING_MODE_DECOUPLED = "DECOUPLED";
private String nativeQuerySqlPlanningMode = QueryContexts.NATIVE_QUERY_SQL_PLANNING_MODE_COUPLED; // can be COUPLED or DECOUPLED
private boolean serializeComplexValues = true;
@ -383,7 +379,7 @@ public class PlannerConfig
maxNumericInFilters);
nativeQuerySqlPlanningMode = QueryContexts.parseString(
queryContext,
CTX_NATIVE_QUERY_SQL_PLANNING_MODE,
QueryContexts.CTX_NATIVE_QUERY_SQL_PLANNING_MODE,
nativeQuerySqlPlanningMode
);
return this;

View File

@ -29,6 +29,7 @@ import org.apache.calcite.avatica.remote.TypedValue;
import org.apache.calcite.linq4j.QueryProvider;
import org.apache.calcite.schema.SchemaPlus;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.error.InvalidSqlInput;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.Numbers;
@ -508,6 +509,9 @@ public class PlannerContext
*/
public void setPlanningError(String formatText, Object... arguments)
{
  if (!queryContext().isDecoupledMode()) {
    // Outside decoupled mode the message is only recorded on this context.
    planningError = StringUtils.nonStrictFormat(formatText, arguments);
    return;
  }
  // In decoupled mode the problem is raised immediately instead of being stored.
  throw InvalidSqlInput.exception(formatText, arguments);
}

View File

@ -39,6 +39,7 @@ import org.apache.calcite.tools.FrameworkConfig;
import org.apache.calcite.tools.Frameworks;
import org.apache.druid.guice.annotations.Json;
import org.apache.druid.math.expr.ExprMacroTable;
import org.apache.druid.query.QueryContexts;
import org.apache.druid.segment.join.JoinableFactoryWrapper;
import org.apache.druid.server.security.Access;
import org.apache.druid.server.security.AuthConfig;
@ -196,7 +197,7 @@ public class PlannerFactory extends PlannerToolbox
}
});
if (PlannerConfig.NATIVE_QUERY_SQL_PLANNING_MODE_DECOUPLED
if (QueryContexts.NATIVE_QUERY_SQL_PLANNING_MODE_DECOUPLED
.equals(plannerConfig().getNativeQuerySqlPlanningMode())
) {
frameworkConfigBuilder.costFactory(new DruidVolcanoCost.Factory());

View File

@ -62,6 +62,7 @@ import org.apache.druid.java.util.common.guava.BaseSequence;
import org.apache.druid.java.util.common.guava.Sequences;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.query.Query;
import org.apache.druid.query.QueryContexts;
import org.apache.druid.server.QueryResponse;
import org.apache.druid.server.security.Action;
import org.apache.druid.server.security.Resource;
@ -540,7 +541,7 @@ public abstract class QueryHandler extends SqlStatementHandler.BaseStatementHand
if (plannerContext.getPlannerConfig()
.getNativeQuerySqlPlanningMode()
.equals(PlannerConfig.NATIVE_QUERY_SQL_PLANNING_MODE_DECOUPLED)
.equals(QueryContexts.NATIVE_QUERY_SQL_PLANNING_MODE_DECOUPLED)
) {
RelNode newRoot = parameterized;
newRoot = planner.transform(

View File

@ -27,10 +27,13 @@ import org.apache.calcite.rel.core.Project;
import org.apache.calcite.rel.core.Sort;
import org.apache.calcite.rel.core.Window;
import org.apache.calcite.rex.RexBuilder;
import org.apache.calcite.rex.RexInputRef;
import org.apache.calcite.rex.RexNode;
import org.apache.druid.error.DruidException;
import org.apache.druid.query.DataSource;
import org.apache.druid.query.FilteredDataSource;
import org.apache.druid.query.QueryDataSource;
import org.apache.druid.query.UnionDataSource;
import org.apache.druid.query.filter.DimFilter;
import org.apache.druid.sql.calcite.filtration.Filtration;
import org.apache.druid.sql.calcite.planner.PlannerContext;
@ -43,6 +46,7 @@ import org.apache.druid.sql.calcite.rel.logical.DruidAggregate;
import org.apache.druid.sql.calcite.rel.logical.DruidJoin;
import org.apache.druid.sql.calcite.rel.logical.DruidLogicalNode;
import org.apache.druid.sql.calcite.rel.logical.DruidSort;
import org.apache.druid.sql.calcite.rel.logical.DruidUnion;
import java.util.ArrayList;
import java.util.List;
@ -202,31 +206,22 @@ public class DruidQueryGenerator
SourceDesc unwrapSourceDesc();
}
enum JoinSupportTweaks
private static class VertexTweaks
{
NONE,
LEFT,
RIGHT;
public final JoinPosition joinType;
public final boolean isParentUnion;
static JoinSupportTweaks analyze(DruidNodeStack stack)
public VertexTweaks(JoinPosition joinType, boolean isParentUnion)
{
if (stack.size() < 2) {
return NONE;
}
DruidLogicalNode possibleJoin = stack.parentNode();
if (!(possibleJoin instanceof DruidJoin)) {
return NONE;
}
if (stack.peekOperandIndex() == 0) {
return LEFT;
} else {
return RIGHT;
}
this.joinType = joinType;
this.isParentUnion = isParentUnion;
}
boolean finalizeSubQuery()
static VertexTweaks analyze(DruidNodeStack stack)
{
return this == NONE;
JoinPosition joinType = JoinPosition.analyze(stack);
boolean isParentUnion = stack.size() > 2 && stack.parentNode() instanceof DruidUnion;
return new VertexTweaks(joinType, isParentUnion);
}
boolean forceSubQuery(SourceDesc sourceDesc)
@ -234,12 +229,43 @@ public class DruidQueryGenerator
if (sourceDesc.dataSource.isGlobal()) {
return false;
}
return this == RIGHT;
return joinType == JoinPosition.RIGHT;
}
boolean filteredDatasourceAllowed()
{
return this == NONE;
return joinType == JoinPosition.NONE;
}
boolean finalizeSubQuery()
{
return joinType == JoinPosition.NONE;
}
boolean mayUnwrapWithRename()
{
return !isParentUnion;
}
/**
 * Position of the node on top of the stack relative to a {@link DruidJoin} parent.
 */
enum JoinPosition
{
  NONE, LEFT, RIGHT;

  /**
   * Classifies the node on top of the stack.
   *
   * @return {@link #NONE} when the stack holds fewer than two entries or the parent node is not a
   *         {@link DruidJoin}; {@link #LEFT} when the current operand index is 0; {@link #RIGHT} otherwise
   */
  public static JoinPosition analyze(DruidNodeStack stack)
  {
    // The size check comes first so parentNode() is never consulted on a stack without a parent.
    final boolean hasParent = stack.size() >= 2;
    if (!hasParent || !(stack.parentNode() instanceof DruidJoin)) {
      return NONE;
    }
    return stack.peekOperandIndex() == 0 ? LEFT : RIGHT;
  }
}
}
@ -259,22 +285,22 @@ public class DruidQueryGenerator
Vertex createVertex(DruidNodeStack stack, PartialDruidQuery partialDruidQuery, List<Vertex> inputs)
{
JoinSupportTweaks jst = JoinSupportTweaks.analyze(stack);
return new PDQVertex(partialDruidQuery, inputs, jst);
VertexTweaks tweaks = VertexTweaks.analyze(stack);
return new PDQVertex(partialDruidQuery, inputs, tweaks);
}
public class PDQVertex implements Vertex
{
final PartialDruidQuery partialDruidQuery;
final List<Vertex> inputs;
final JoinSupportTweaks jst;
final VertexTweaks tweaks;
private SourceDesc source;
public PDQVertex(PartialDruidQuery partialDruidQuery, List<Vertex> inputs, JoinSupportTweaks jst)
public PDQVertex(PartialDruidQuery partialDruidQuery, List<Vertex> inputs, VertexTweaks tweaks)
{
this.partialDruidQuery = partialDruidQuery;
this.inputs = inputs;
this.jst = jst;
this.tweaks = tweaks;
}
@Override
@ -286,7 +312,7 @@ public class DruidQueryGenerator
source.rowSignature,
plannerContext,
rexBuilder,
!(topLevel) && jst.finalizeSubQuery()
!(topLevel) && tweaks.finalizeSubQuery()
);
}
@ -304,9 +330,10 @@ public class DruidQueryGenerator
private SourceDesc realGetSource()
{
List<SourceDesc> sourceDescs = new ArrayList<>();
boolean mayUnwrap = mayUnwrapInputs();
for (Vertex inputVertex : inputs) {
final SourceDesc desc;
if (inputVertex.canUnwrapSourceDesc()) {
if (mayUnwrap && inputVertex.canUnwrapSourceDesc()) {
desc = inputVertex.unwrapSourceDesc();
} else {
DruidQuery inputQuery = inputVertex.buildQuery(false);
@ -325,6 +352,20 @@ public class DruidQueryGenerator
throw DruidException.defensive("Unable to create SourceDesc for Operator [%s]", scan);
}
/**
 * Decides whether the inputs of this vertex may be unwrapped into plain source descriptors.
 *
 * For anything but a {@link DruidUnion} scan this is always allowed. For a union it is allowed only when
 * every input reports that it can be unwrapped.
 */
private boolean mayUnwrapInputs()
{
  final boolean scanIsUnion = partialDruidQuery.getScan() instanceof DruidUnion;
  if (!scanIsUnion) {
    return true;
  }
  boolean everyInputUnwrappable = true;
  for (Vertex input : inputs) {
    // Deliberately non-short-circuit: every input is consulted, even after the first refusal.
    everyInputUnwrappable &= input.canUnwrapSourceDesc();
  }
  return everyInputUnwrappable;
}
/**
* Extends the current partial query with the new parent if possible.
*/
@ -424,22 +465,65 @@ public class DruidQueryGenerator
@Override
public boolean canUnwrapSourceDesc()
{
if (jst.forceSubQuery(getSource())) {
if (tweaks.forceSubQuery(getSource())) {
return false;
}
if (partialDruidQuery.stage() == Stage.SCAN) {
return true;
}
if (jst.filteredDatasourceAllowed() && partialDruidQuery.stage() == PartialDruidQuery.Stage.WHERE_FILTER) {
if (tweaks.filteredDatasourceAllowed() && partialDruidQuery.stage() == PartialDruidQuery.Stage.WHERE_FILTER) {
return true;
}
if (partialDruidQuery.stage() == PartialDruidQuery.Stage.SELECT_PROJECT &&
(jst.filteredDatasourceAllowed() || partialDruidQuery.getWhereFilter() == null) &&
partialDruidQuery.getSelectProject().isMapping()) {
(tweaks.filteredDatasourceAllowed() || partialDruidQuery.getWhereFilter() == null) &&
mayDiscardSelectProject()) {
return true;
}
return false;
}
/**
 * Decides whether the select-project of this partial query may be dropped when unwrapping the source.
 *
 * The project must be a mapping. When the parent is not a union that is sufficient. Under a union the
 * project must additionally keep every column under its input name, and - unless the source is a
 * datasource accepted by {@link UnionDataSource#isCompatibleDataSource} - the output columns must be
 * exactly the leading columns of the input, in the same order.
 *
 * @return {@code true} if the project can be discarded, {@code false} otherwise
 */
private boolean mayDiscardSelectProject()
{
  if (!partialDruidQuery.getSelectProject().isMapping()) {
    return false;
  }
  if (!tweaks.isParentUnion) {
    return true;
  }
  SourceDesc src = getSource();
  List<String> inputFieldNames = src.rowSignature.getColumnNames();
  List<String> outputFieldNames = partialDruidQuery.getRowType().getFieldNames();

  if (!isNameConsistentMapping(partialDruidQuery.getSelectProject(), inputFieldNames, outputFieldNames)) {
    return false;
  }

  // NOTE(review): presumably table/inline datasources are matched by column name inside a union, so the
  // column order of the project does not matter for them - confirm.
  if (UnionDataSource.isCompatibleDataSource(src.dataSource)) {
    return true;
  }

  // For every other source the outputs must be a prefix of the inputs. Guard the bound first: subList
  // would throw IndexOutOfBoundsException if there were more outputs than inputs, and such a projection
  // cannot be a prefix anyway.
  int outputCount = outputFieldNames.size();
  if (outputCount > inputFieldNames.size()) {
    return false;
  }
  return outputFieldNames.equals(inputFieldNames.subList(0, outputCount));
}
/**
 * Checks that every projected expression is a plain input reference whose output column name equals the
 * name of the input column it refers to.
 *
 * @param selectProject    the project whose expressions are inspected
 * @param inputFieldNames  column names of the project's input, indexed by input position
 * @param outputFieldNames column names of the project's output, indexed by output position
 * @return {@code true} if no column is renamed by the project; {@code false} if any column is renamed or
 *         any expression is not a {@link RexInputRef}
 */
private boolean isNameConsistentMapping(
    Project selectProject,
    List<String> inputFieldNames,
    List<String> outputFieldNames)
{
  List<RexNode> projects = selectProject.getProjects();
  for (int i = 0; i < projects.size(); i++) {
    RexNode project = projects.get(i);
    // Do not rely on the caller having verified isMapping(): anything other than an input reference
    // cannot be a name-preserving mapping, so report that instead of failing with ClassCastException.
    if (!(project instanceof RexInputRef)) {
      return false;
    }
    String inputName = inputFieldNames.get(((RexInputRef) project).getIndex());
    String outputName = outputFieldNames.get(i);
    if (!inputName.equals(outputName)) {
      return false;
    }
  }
  return true;
}
}
}

View File

@ -19,6 +19,7 @@
package org.apache.druid.sql.calcite.planner.querygen;
import org.apache.commons.lang3.builder.ToStringBuilder;
import org.apache.druid.query.DataSource;
import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.sql.calcite.planner.PlannerContext;
@ -55,6 +56,12 @@ public interface SourceDescProducer
this.rowSignature = rowSignature;
this.virtualColumnRegistry = virtualColumnRegistry;
}
@Override
public String toString()
{
  // Rendering is delegated to commons-lang3's reflection-based ToStringBuilder.
  final String rendered = ToStringBuilder.reflectionToString(this);
  return rendered;
}
}
SourceDesc getSourceDesc(PlannerContext plannerContext, List<SourceDesc> sources);

View File

@ -1654,21 +1654,24 @@ public class DruidQuery
: Order.ASCENDING
)
).collect(Collectors.toList());
} else {
orderByColumns = Collections.emptyList();
}
if (!plannerContext.featureAvailable(EngineFeature.SCAN_ORDER_BY_NON_TIME) && !orderByColumns.isEmpty()) {
if (orderByColumns.size() > 1
|| orderByColumns.stream()
.anyMatch(orderBy -> !orderBy.getColumnName().equals(ColumnHolder.TIME_COLUMN_NAME))) {
// We cannot handle this ordering, but we encounter this ordering as part of the exploration of the volcano
// planner, which means that the query that we are looking right now might only be doing this as one of the
// potential branches of exploration rather than being a semantic requirement of the query itself. So, it is
// not safe to send an error message telling the end-user exactly what is happening, instead we need to set the
// planning error and hope.
setPlanningErrorOrderByNonTimeIsUnsupported();
|| orderByColumns.stream().anyMatch(orderBy -> !orderBy.getColumnName().equals(ColumnHolder.TIME_COLUMN_NAME))) {
if (!plannerContext.queryContext().isDecoupledMode()) {
// We cannot handle this ordering, but we encounter this ordering as part of the exploration of the volcano
// planner, which means that the query that we are looking right now might only be doing this as one of the
// potential branches of exploration rather than being a semantic requirement of the query itself. So, it is
// not safe to send an error message telling the end-user exactly what is happening, instead we need to set the
// planning error and hope.
setPlanningErrorOrderByNonTimeIsUnsupported();
}
return null;
}
}

View File

@ -29,16 +29,16 @@ import org.apache.calcite.rel.core.Union;
import org.apache.calcite.rel.hint.RelHint;
import org.apache.calcite.rel.metadata.RelMetadataQuery;
import org.apache.druid.error.DruidException;
import org.apache.druid.error.InvalidSqlInput;
import org.apache.druid.query.DataSource;
import org.apache.druid.query.InlineDataSource;
import org.apache.druid.query.Query;
import org.apache.druid.query.QueryDataSource;
import org.apache.druid.query.TableDataSource;
import org.apache.druid.query.UnionDataSource;
import org.apache.druid.query.union.UnionQuery;
import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.sql.calcite.planner.PlannerContext;
import org.apache.druid.sql.calcite.planner.querygen.SourceDescProducer;
import org.apache.druid.sql.calcite.table.RowSignatures;
import java.util.ArrayList;
import java.util.List;
@ -53,6 +53,9 @@ public class DruidUnion extends Union implements DruidLogicalNode, SourceDescPro
boolean all)
{
super(cluster, traits, hints, inputs, all);
if (!all) {
throw InvalidSqlInput.exception("SQL requires 'UNION' but only 'UNION ALL' is supported.");
}
}
@Override
@ -70,79 +73,47 @@ public class DruidUnion extends Union implements DruidLogicalNode, SourceDescPro
@Override
public SourceDesc getSourceDesc(PlannerContext plannerContext, List<SourceDesc> sources)
{
if (mayUseUnionDataSource(sources)) {
List<DataSource> dataSources = new ArrayList<>();
RowSignature signature = null;
for (SourceDesc sourceDesc : sources) {
checkDataSourceSupported(sourceDesc.dataSource);
dataSources.add(sourceDesc.dataSource);
if (signature == null) {
signature = sourceDesc.rowSignature;
} else {
if (!signature.equals(sourceDesc.rowSignature)) {
throw DruidException.defensive(
"Row signature mismatch in Union inputs [%s] and [%s]",
signature,
sourceDesc.rowSignature
);
}
}
}
return new SourceDesc(new UnionDataSource(dataSources), signature);
}
if (mayUseUnionQuery(sources)) {
RowSignature signature = null;
List<Query<?>> queries = new ArrayList<>();
for (SourceDesc sourceDesc : sources) {
QueryDataSource qds = (QueryDataSource) sourceDesc.dataSource;
queries.add(qds.getQuery());
if (signature == null) {
signature = sourceDesc.rowSignature;
} else {
if (!signature.equals(sourceDesc.rowSignature)) {
throw DruidException.defensive(
"Row signature mismatch in Union inputs [%s] and [%s]",
signature,
sourceDesc.rowSignature
);
}
}
}
return new SourceDesc(new QueryDataSource(new UnionQuery(queries)), signature);
}
RowSignature signature = RowSignatures.fromRelDataType(
sources.get(0).rowSignature.getColumnNames(),
getRowType()
);
UnionDataSource unionDataSource = buildUnionDataSource(sources);
if (unionDataSource != null) {
return new SourceDesc(unionDataSource, signature);
throw DruidException.defensive("Union with input [%s] is not supported. This should not happen.", sources);
}
// all other cases are handled via UnionQuery
UnionQuery unionQuery = makeUnionQuery(sources);
return new SourceDesc(new QueryDataSource(unionQuery), signature);
}
private boolean mayUseUnionQuery(List<SourceDesc> sources)
/**
 * Tries to combine the datasources of all inputs into a single {@link UnionDataSource}.
 *
 * @param sources the inputs of the union
 * @return a {@link UnionDataSource} over the inputs' datasources, or {@code null} as soon as one input's
 *         datasource is rejected by {@link UnionDataSource#isCompatibleDataSource}
 */
private UnionDataSource buildUnionDataSource(List<SourceDesc> sources)
{
  final List<DataSource> children = new ArrayList<>();
  for (SourceDesc source : sources) {
    final DataSource child = source.dataSource;
    if (UnionDataSource.isCompatibleDataSource(child)) {
      children.add(child);
    } else {
      // A single incompatible input rules out the UnionDataSource form altogether.
      return null;
    }
  }
  return new UnionDataSource(children);
}
private UnionQuery makeUnionQuery(List<SourceDesc> sources)
{
List<Query<?>> queries = new ArrayList<>();
for (SourceDesc sourceDesc : sources) {
DataSource dataSource = sourceDesc.dataSource;
if (dataSource instanceof QueryDataSource) {
continue;
queries.add(((QueryDataSource) dataSource).getQuery());
} else {
throw DruidException.defensive(
"Expected that all inputs are QueryDataSource-s! Encountered something else [%s].",
dataSource
);
}
return false;
}
return true;
}
private boolean mayUseUnionDataSource(List<SourceDesc> sources)
{
for (SourceDesc sourceDesc : sources) {
DataSource dataSource = sourceDesc.dataSource;
if (dataSource instanceof TableDataSource || dataSource instanceof InlineDataSource) {
continue;
}
return false;
}
return true;
}
private void checkDataSourceSupported(DataSource dataSource)
{
if (dataSource instanceof TableDataSource || dataSource instanceof InlineDataSource) {
return;
}
throw DruidException.defensive("Only Table and Values are supported as inputs for Union [%s]", dataSource);
return new UnionQuery(queries);
}
}

View File

@ -75,7 +75,6 @@ import org.apache.druid.segment.virtual.NestedFieldVirtualColumn;
import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory;
import org.apache.druid.server.SpecificSegmentsQuerySegmentWalker;
import org.apache.druid.sql.calcite.CalciteNestedDataQueryTest.NestedComponentSupplier;
import org.apache.druid.sql.calcite.NotYetSupported.Modes;
import org.apache.druid.sql.calcite.filtration.Filtration;
import org.apache.druid.sql.calcite.util.SqlTestFramework.StandardComponentSupplier;
import org.apache.druid.sql.calcite.util.TestDataBuilder;
@ -7373,13 +7372,12 @@ public class CalciteNestedDataQueryTest extends BaseCalciteQueryTest
);
}
@NotYetSupported(Modes.ERROR_CANNOT_TRANSLATE_COUNT_DISTINCT)
@Test
public void testApproxCountDistinctOnUnsupportedComplexColumn()
{
assertQueryIsUnplannable(
"SELECT COUNT(DISTINCT nester) FROM druid.nested",
"Query could not be planned. A possible reason is [Using APPROX_COUNT_DISTINCT() or enabling "
"Using APPROX_COUNT_DISTINCT() or enabling "
+ "approximation with COUNT(DISTINCT) is not supported for column type [COMPLEX<json>]. "
+ "You can disable approximation by setting [useApproximateCountDistinct: false] in the query context."
);

View File

@ -6414,7 +6414,6 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
);
}
@NotYetSupported(Modes.ERROR_HANDLING)
@Test
public void testUnplannableScanOrderByNonTime()
{
@ -6531,7 +6530,6 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
);
}
@NotYetSupported(Modes.ERROR_HANDLING)
@Test
public void testUnplannableExactCountDistinctOnSketch()
{

View File

@ -20,6 +20,7 @@
package org.apache.druid.sql.calcite;
import com.google.common.collect.ImmutableList;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.error.DruidException;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.query.TableDataSource;
@ -28,7 +29,6 @@ import org.apache.druid.query.aggregation.CountAggregatorFactory;
import org.apache.druid.query.aggregation.DoubleSumAggregatorFactory;
import org.apache.druid.query.dimension.DefaultDimensionSpec;
import org.apache.druid.query.groupby.GroupByQuery;
import org.apache.druid.sql.calcite.NotYetSupported.Modes;
import org.apache.druid.sql.calcite.filtration.Filtration;
import org.apache.druid.sql.calcite.util.CalciteTests;
import org.junit.Assert;
@ -142,7 +142,6 @@ public class CalciteUnionQueryTest extends BaseCalciteQueryTest
}
}
@NotYetSupported(Modes.UNION_MORE_STRICT_ROWTYPE_CHECK)
@Test
public void testUnionAllTablesColumnTypeMismatchFloatLong()
{
@ -189,66 +188,157 @@ public class CalciteUnionQueryTest extends BaseCalciteQueryTest
);
}
@NotYetSupported(Modes.ERROR_HANDLING)
@Test
public void testUnionFlipFlop()
{
  // The second branch of the union swaps dim1 and dim2.
  final String sql = "SELECT\n"
                     + "dim1,dim2,count(1) \n"
                     + "FROM (SELECT dim1 as dim1, dim2 as dim2 FROM foo UNION ALL SELECT dim2 as dim1, dim1 as dim2 FROM foo)\n"
                     + "WHERE dim1 = 'def' OR dim2 = 'def'\n"
                     + "GROUP BY 1, 2";

  if (!testBuilder().isDecoupledMode()) {
    // Outside decoupled mode the query is expected to be rejected by the planner.
    assertQueryIsUnplannable(
        sql,
        "names queried for each table are different"
    );
    return;
  }

  // Decoupled mode is expected to plan and execute the query.
  cannotVectorize();
  testBuilder()
      .sql(sql)
      .expectedResults(
          ImmutableList.of(
              new Object[] {"abc", "def", 1L},
              new Object[] {"def", "abc", 1L}
          )
      )
      .run();
}
@Test
public void testUnionAllTablesColumnTypeMismatchStringLong()
{
// "dim3" has a different type in foo and foo2 (string vs long), which requires a casting subquery, so this
// query cannot be planned.
assertQueryIsUnplannable(
"SELECT\n"
String sql = "SELECT\n"
+ "dim3, dim2, SUM(m1), COUNT(*)\n"
+ "FROM (SELECT dim3, dim2, m1 FROM foo2 UNION ALL SELECT dim3, dim2, m1 FROM foo)\n"
+ "WHERE dim2 = 'a' OR dim2 = 'en'\n"
+ "GROUP BY 1, 2",
"SQL requires union between inputs that are not simple table scans and involve a " +
"filter or aliasing. Or column types of tables being unioned are not of same type."
);
+ "GROUP BY 1, 2";
if (testBuilder().isDecoupledMode()) {
cannotVectorize();
testBuilder()
.sql(sql)
.expectedResults(
ImmutableList.of(
new Object[] {"", "a", 4.0D, 1L},
new Object[] {"11", "en", 1.0D, 1L},
new Object[] {"a", "a", 1.0D, 1L},
new Object[] {"b", "a", 1.0D, 1L}
)
)
.run();
} else {
// "dim3" has a different type in foo and foo2 (string vs long), which requires a casting subquery, so this
// query cannot be planned.
assertQueryIsUnplannable(
sql,
"SQL requires union between inputs that are not simple table scans and involve a " +
"filter or aliasing. Or column types of tables being unioned are not of same type."
);
}
}
@NotYetSupported(Modes.ERROR_HANDLING)
@Test
public void testUnionAllTablesWhenMappingIsRequired()
{
// Cannot plan this UNION ALL operation, because the column swap would require generating a subquery.
assertQueryIsUnplannable(
"SELECT\n"
String sql = "SELECT\n"
+ "c, COUNT(*)\n"
+ "FROM (SELECT dim1 AS c, m1 FROM foo UNION ALL SELECT dim2 AS c, m1 FROM numfoo)\n"
+ "WHERE c = 'a' OR c = 'def'\n"
+ "GROUP BY 1",
"SQL requires union between two tables " +
"and column names queried for each table are different Left: [dim1], Right: [dim2]."
);
+ "GROUP BY 1";
if (testBuilder().isDecoupledMode()) {
cannotVectorize();
testBuilder()
.sql(sql)
.expectedResults(
ImmutableList.of(
new Object[] {"a", 2L},
new Object[] {"def", 1L}
)
)
.run();
} else {
// Cannot plan this UNION ALL operation, because the column swap would require generating a subquery.
assertQueryIsUnplannable(
sql,
"SQL requires union between two tables " +
"and column names queried for each table are different Left: [dim1], Right: [dim2]."
);
}
}
@NotYetSupported(Modes.ERROR_HANDLING)
@Test
public void testUnionIsUnplannable()
public void testUnionDifferentColumnOrder()
{
// Cannot plan this UNION operation
assertQueryIsUnplannable(
"SELECT dim2, dim1, m1 FROM foo2 UNION SELECT dim1, dim2, m1 FROM foo",
"SQL requires 'UNION' but only 'UNION ALL' is supported."
);
String sql = "SELECT dim2, dim1, m1 FROM foo2 UNION SELECT dim1, dim2, m1 FROM foo";
if (testBuilder().isDecoupledMode()) {
// UnionToDistinctRule
testBuilder()
.sql(sql)
.expectedResults(ImmutableList.of(new Object[] {"def", 2L}))
.expectedResults(
ResultMatchMode.RELAX_NULLS,
ImmutableList.of(
new Object[] {null, "10.1", 2.0F},
// these 2 results invert order because in compatible mode `null` becomes "" and thus they change order
NullHandling.sqlCompatible() ? new Object[] {null, "abc", 6.0F} : new Object[] {"", "2", 3.0F},
NullHandling.sqlCompatible() ? new Object[] {"", "2", 3.0F} : new Object[] {null, "abc", 6.0F},
new Object[] {"a", "", 1.0F},
new Object[] {"a", "1", 4.0F},
new Object[] {"abc", "def", 5.0F},
new Object[] {"en", "druid", 1.0F},
new Object[] {"he", "\u05D3\u05E8\u05D5\u05D0\u05D9\u05D3", 1.0F},
new Object[] {"ru", "\u0434\u0440\u0443\u0438\u0434", 1.0F}
)
)
.run();
} else {
// Cannot plan this UNION operation
assertQueryIsUnplannable(
sql,
"SQL requires 'UNION' but only 'UNION ALL' is supported."
);
}
}
@NotYetSupported(Modes.ERROR_HANDLING)
@Test
public void testUnionAllTablesWhenCastAndMappingIsRequired()
{
// Cannot plan this UNION ALL operation, because the column swap would require generating a subquery.
assertQueryIsUnplannable(
"SELECT\n"
String sql = "SELECT\n"
+ "c, COUNT(*)\n"
+ "FROM (SELECT dim1 AS c, m1 FROM foo UNION ALL SELECT cnt AS c, m1 FROM numfoo)\n"
+ "WHERE c = 'a' OR c = 'def'\n"
+ "GROUP BY 1",
"SQL requires union between inputs that are not simple table scans and involve " +
"a filter or aliasing. Or column types of tables being unioned are not of same type."
);
+ "WHERE c = '1' OR c = 'def'\n"
+ "GROUP BY 1";
if (testBuilder().isDecoupledMode()) {
cannotVectorize();
testBuilder()
.sql(sql)
.expectedResults(
ImmutableList.of(
new Object[]{"1", 7L},
new Object[]{"def", 1L}
)
)
.run();
} else {
// Cannot plan this UNION ALL operation, because the column swap would require generating a subquery.
assertQueryIsUnplannable(
sql,
"SQL requires union between inputs that are not simple table scans and involve " +
"a filter or aliasing. Or column types of tables being unioned are not of same type."
);
}
}
@Test
@ -335,19 +425,31 @@ public class CalciteUnionQueryTest extends BaseCalciteQueryTest
);
}
@NotYetSupported(Modes.ERROR_HANDLING)
@Test
public void testUnionAllSameTableTwiceWithDifferentMapping()
{
// Cannot plan this UNION ALL operation, because the column swap would require generating a subquery.
assertQueryIsUnplannable(
"SELECT\n"
String sql = "SELECT\n"
+ "dim1, dim2, SUM(m1), COUNT(*)\n"
+ "FROM (SELECT dim1, dim2, m1 FROM foo UNION ALL SELECT dim2, dim1, m1 FROM foo)\n"
+ "WHERE dim2 = 'a' OR dim2 = 'def'\n"
+ "GROUP BY 1, 2",
"SQL requires union between two tables and column names queried for each table are different Left: [dim1, dim2, m1], Right: [dim2, dim1, m1]."
);
+ "GROUP BY 1, 2";
if (testBuilder().isDecoupledMode()) {
testBuilder()
.sql(sql)
.expectedResults(
ImmutableList.of(
new Object[] {"", "a", 2.0D, 2L},
new Object[] {"1", "a", 8.0D, 2L}
)
)
.run();
} else {
// Cannot plan this UNION ALL operation, because the column swap would require generating a subquery.
assertQueryIsUnplannable(
sql,
"SQL requires union between two tables and column names queried for each table are different Left: [dim1, dim2, m1], Right: [dim2, dim1, m1]."
);
}
}
@Test

View File

@ -56,7 +56,7 @@ public class DecoupledExtension implements BeforeEachCallback
private static final ImmutableMap<String, Object> CONTEXT_OVERRIDES = ImmutableMap.<String, Object>builder()
.putAll(BaseCalciteQueryTest.QUERY_CONTEXT_DEFAULT)
.put(PlannerConfig.CTX_NATIVE_QUERY_SQL_PLANNING_MODE, PlannerConfig.NATIVE_QUERY_SQL_PLANNING_MODE_DECOUPLED)
.put(QueryContexts.CTX_NATIVE_QUERY_SQL_PLANNING_MODE, QueryContexts.NATIVE_QUERY_SQL_PLANNING_MODE_DECOUPLED)
.put(QueryContexts.ENABLE_DEBUG, true)
.build();

View File

@ -78,7 +78,6 @@ public @interface NotYetSupported
{
// @formatter:off
DISTINCT_AGGREGATE_NOT_SUPPORTED(DruidException.class, "DISTINCT is not supported"),
ERROR_HANDLING(AssertionError.class, "targetPersona: is <[A-Z]+> and category: is <[A-Z_]+> and errorCode: is"),
EXPRESSION_NOT_GROUPED(DruidException.class, "Expression '[a-z]+' is not being grouped"),
NULLS_FIRST_LAST(DruidException.class, "NULLS (FIRST|LAST)"),
BIGINT_TO_DATE(DruidException.class, "BIGINT to type (DATE|TIME)"),
@ -89,11 +88,9 @@ public @interface NotYetSupported
RESULT_MISMATCH(AssertionError.class, "(assertResulEquals|AssertionError: column content mismatch)"),
LONG_CASTING(AssertionError.class, "expected: java.lang.Long"),
UNSUPPORTED_NULL_ORDERING(DruidException.class, "(A|DE)SCENDING ordering with NULLS (LAST|FIRST)"),
UNION_MORE_STRICT_ROWTYPE_CHECK(DruidException.class, "Row signature mismatch in Union inputs"),
SORT_REMOVE_TROUBLE(DruidException.class, "Calcite assertion violated.*Sort\\.<init>"),
SORT_REMOVE_CONSTANT_KEYS_CONFLICT(DruidException.class, "not enough rules"),
REQUIRE_TIME_CONDITION(CannotBuildQueryException.class, "requireTimeCondition is enabled"),
ERROR_CANNOT_TRANSLATE_COUNT_DISTINCT(AssertionError.class, "Cannot translate aggregator.COUNT.DISTINCT"),
UNNEST_INLINED(Exception.class, "Missing conversion is Uncollect"),
UNNEST_RESULT_MISMATCH(AssertionError.class, "(Result count mismatch|column content mismatch)");
// @formatter:on

View File

@ -311,8 +311,8 @@ public class QueryTestBuilder
public boolean isDecoupledMode()
{
String mode = (String) queryContext.getOrDefault(PlannerConfig.CTX_NATIVE_QUERY_SQL_PLANNING_MODE, "");
return PlannerConfig.NATIVE_QUERY_SQL_PLANNING_MODE_DECOUPLED.equalsIgnoreCase(mode);
String mode = (String) queryContext.getOrDefault(QueryContexts.CTX_NATIVE_QUERY_SQL_PLANNING_MODE, "");
return QueryContexts.NATIVE_QUERY_SQL_PLANNING_MODE_DECOUPLED.equalsIgnoreCase(mode);
}
}