MSQ: Plan without implicit sorting. (#16073)

* MSQ: Plan without implicit sorting.

This patch adds an EngineFeature "GROUPBY_IMPLICITLY_SORTS" and sets
it to true for native and false for MSQ. It's useful for two reasons:

1) In the future we'll likely want MSQ to hash-partition for GROUP BY
   instead of using a global sort, which would mean MSQ would not
   implicitly ORDER BY when there is a GROUP BY.

2) When doing REPLACE with MSQ, CLUSTERED BY is transformed to ORDER BY.
   We should retain that ORDER BY, as it may be a subset of the GROUP BY,
   and it is important to remember which fields the user wanted to include in
   range shard specs (see the example below).
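
   For example, the new test testReplaceOnFoo1RangeClusteredBySubset added in
   this patch runs:

       REPLACE INTO foo1
       OVERWRITE ALL
       SELECT dim1, m1, COUNT(*) AS cnt
       FROM foo
       GROUP BY dim1, m1
       PARTITIONED BY ALL
       CLUSTERED BY dim1

   Here CLUSTERED BY dim1 becomes ORDER BY dim1, a subset of the GROUP BY
   columns; retaining that ORDER BY records that dim1 is the field the user
   wanted to drive the range shard specs (DimensionRangeShardSpec).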

* Fix tests.

* Fix tests for real.

* Fix test.

Gian Merlino 2024-03-13 08:27:39 -07:00 committed by GitHub
parent 818cc9eedf
commit 910124d4de
20 changed files with 179 additions and 49 deletions

@ -107,7 +107,7 @@ public class MSQTaskSqlEngine implements SqlEngine
}
@Override
public boolean featureAvailable(EngineFeature feature, PlannerContext plannerContext)
public boolean featureAvailable(EngineFeature feature)
{
switch (feature) {
case ALLOW_BINDABLE_PLAN:
@ -118,6 +118,7 @@ public class MSQTaskSqlEngine implements SqlEngine
case GROUPING_SETS:
case WINDOW_FUNCTIONS:
case ALLOW_TOP_LEVEL_UNION_ALL:
case GROUPBY_IMPLICITLY_SORTS:
return false;
case UNNEST:
case CAN_SELECT:

@ -676,7 +676,7 @@ public class MSQReplaceTest extends MSQTestBase
+ "PARTITIONED BY MONTH")
.setQueryContext(context)
.setExpectedValidationErrorMatcher(invalidSqlContains(
"INSERT and REPLACE queries cannot have a LIMIT unless PARTITIONED BY is \"ALL\""
"INSERT and REPLACE queries cannot have a LIMIT unless PARTITIONED BY is \"ALL\""
))
.verifyPlanningErrors();
}
@ -692,7 +692,7 @@ public class MSQReplaceTest extends MSQTestBase
+ "OFFSET 10"
+ "PARTITIONED BY ALL TIME")
.setExpectedValidationErrorMatcher(invalidSqlContains(
"INSERT and REPLACE queries cannot have an OFFSET"
"INSERT and REPLACE queries cannot have an OFFSET"
))
.setQueryContext(context)
.verifyPlanningErrors();
@ -875,6 +875,44 @@ public class MSQReplaceTest extends MSQTestBase
}
@Test
public void testReplaceOnFoo1RangeClusteredBySubset()
{
RowSignature rowSignature = RowSignature.builder()
.add("__time", ColumnType.LONG)
.add("dim1", ColumnType.STRING)
.add("m1", ColumnType.FLOAT)
.add("cnt", ColumnType.LONG)
.build();
testIngestQuery().setSql(
"REPLACE INTO foo1\n"
+ "OVERWRITE ALL\n"
+ "SELECT dim1, m1, COUNT(*) AS cnt\n"
+ "FROM foo\n"
+ "GROUP BY dim1, m1\n"
+ "PARTITIONED BY ALL\n"
+ "CLUSTERED BY dim1"
)
.setExpectedDataSource("foo1")
.setQueryContext(DEFAULT_MSQ_CONTEXT)
.setExpectedShardSpec(DimensionRangeShardSpec.class)
.setExpectedRowSignature(rowSignature)
.setQueryContext(context)
.setExpectedSegment(ImmutableSet.of(SegmentId.of("foo1", Intervals.ETERNITY, "test", 0)))
.setExpectedResultRows(
ImmutableList.of(
new Object[]{0L, NullHandling.sqlCompatible() ? "" : null, 1.0f, 1L},
new Object[]{0L, "1", 4.0f, 1L},
new Object[]{0L, "10.1", 2.0f, 1L},
new Object[]{0L, "2", 3.0f, 1L},
new Object[]{0L, "abc", 6.0f, 1L},
new Object[]{0L, "def", 5.0f, 1L}
)
)
.verifyResults();
}
@Test
public void testReplaceSegmentsInsertIntoNewTable()
{
@ -916,7 +954,7 @@ public class MSQReplaceTest extends MSQTestBase
+ "FROM foo "
+ "PARTITIONED BY ALL TIME "
+ "CLUSTERED BY m2, m1 DESC"
)
.setExpectedValidationErrorMatcher(
invalidSqlIs("Invalid CLUSTERED BY clause [`m1` DESC]: cannot sort in descending order.")
)

@ -122,10 +122,9 @@ public abstract class CalciteSelectJoinQueryMSQTest
return new MSQTaskSqlEngine(indexingServiceClient, queryJsonMapper)
{
@Override
public boolean featureAvailable(EngineFeature feature, PlannerContext plannerContext)
public boolean featureAvailable(EngineFeature feature)
{
plannerContext.queryContextMap().put(PlannerContext.CTX_SQL_JOIN_ALGORITHM, joinAlgorithm.toString());
return super.featureAvailable(feature, plannerContext);
return super.featureAvailable(feature);
}
@Override

@ -574,9 +574,8 @@ public class PlannerContext
/**
* Checks if the current {@link SqlEngine} supports a particular feature.
*
* When executing a specific query, use this method instead of
* {@link SqlEngine#featureAvailable(EngineFeature, PlannerContext)}, because it also verifies feature flags such as
* {@link #CTX_ENABLE_WINDOW_FNS}.
* When executing a specific query, use this method instead of {@link SqlEngine#featureAvailable(EngineFeature)},
* because it also verifies feature flags such as {@link #CTX_ENABLE_WINDOW_FNS}.
*/
public boolean featureAvailable(final EngineFeature feature)
{
@ -585,7 +584,11 @@ public class PlannerContext
// Short-circuit: feature requires context flag.
return false;
}
return engine.featureAvailable(feature, this);
if (feature == EngineFeature.TIME_BOUNDARY_QUERY && !queryContext().isTimeBoundaryPlanningEnabled()) {
// Short-circuit: feature requires context flag.
return false;
}
return engine.featureAvailable(feature);
}
public QueryMaker getQueryMaker()

@ -92,7 +92,7 @@ public class DruidQueryGenerator
return newVertex.get();
}
inputVertex = vertexFactory.createVertex(
PartialDruidQuery.createOuterQuery(((PDQVertex) inputVertex).partialDruidQuery),
PartialDruidQuery.createOuterQuery(((PDQVertex) inputVertex).partialDruidQuery, vertexFactory.plannerContext),
ImmutableList.of(inputVertex)
);
newVertex = inputVertex.extendWith(stack);

@ -137,7 +137,7 @@ public class DruidCorrelateUnnestRel extends DruidRel<DruidCorrelateUnnestRel>
{
return new DruidCorrelateUnnestRel(
getCluster(),
newQueryBuilder.getTraitSet(getConvention()),
newQueryBuilder.getTraitSet(getConvention(), getPlannerContext()),
correlateRel,
newQueryBuilder,
getPlannerContext()

@ -136,7 +136,7 @@ public class DruidJoinQueryRel extends DruidRel<DruidJoinQueryRel>
{
return new DruidJoinQueryRel(
getCluster(),
newQueryBuilder.getTraitSet(getConvention()),
newQueryBuilder.getTraitSet(getConvention(), getPlannerContext()),
joinRel,
leftFilter,
newQueryBuilder,

@ -83,7 +83,7 @@ public class DruidOuterQueryRel extends DruidRel<DruidOuterQueryRel>
{
return new DruidOuterQueryRel(
sourceRel.getCluster(),
partialQuery.getTraitSet(sourceRel.getConvention()),
partialQuery.getTraitSet(sourceRel.getConvention(), sourceRel.getPlannerContext()),
sourceRel,
partialQuery,
sourceRel.getPlannerContext()
@ -101,7 +101,7 @@ public class DruidOuterQueryRel extends DruidRel<DruidOuterQueryRel>
{
return new DruidOuterQueryRel(
getCluster(),
newQueryBuilder.getTraitSet(getConvention()),
newQueryBuilder.getTraitSet(getConvention(), getPlannerContext()),
sourceRel,
newQueryBuilder,
getPlannerContext()

@ -171,7 +171,7 @@ public class DruidQueryRel extends DruidRel<DruidQueryRel>
{
return new DruidQueryRel(
getCluster(),
newQueryBuilder.getTraitSet(getConvention()),
newQueryBuilder.getTraitSet(getConvention(), getPlannerContext()),
table,
druidTable,
getPlannerContext(),

@ -106,7 +106,7 @@ public class DruidUnionDataSourceRel extends DruidRel<DruidUnionDataSourceRel>
{
return new DruidUnionDataSourceRel(
getCluster(),
newQueryBuilder.getTraitSet(getConvention()),
newQueryBuilder.getTraitSet(getConvention(), getPlannerContext()),
unionRel,
unionColumnNames,
newQueryBuilder,

@ -47,6 +47,7 @@ import org.apache.druid.java.util.common.ISE;
import org.apache.druid.query.DataSource;
import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.sql.calcite.planner.PlannerContext;
import org.apache.druid.sql.calcite.run.EngineFeature;
import javax.annotation.Nullable;
import java.util.ArrayList;
@ -172,12 +173,12 @@ public class PartialDruidQuery
return new PartialDruidQuery(builderSupplier, inputRel, null, null, null, null, null, null, null, null, null);
}
public static PartialDruidQuery createOuterQuery(final PartialDruidQuery inputQuery)
public static PartialDruidQuery createOuterQuery(final PartialDruidQuery inputQuery, PlannerContext plannerContext)
{
final RelNode inputRel = inputQuery.leafRel();
return create(
inputRel.copy(
inputQuery.getTraitSet(inputRel.getConvention()),
inputQuery.getTraitSet(inputRel.getConvention(), plannerContext),
inputRel.getInputs()
)
);
@ -457,7 +458,7 @@ public class PartialDruidQuery
*
* @param convention convention to include in the returned array
*/
public RelTraitSet getTraitSet(final Convention convention)
public RelTraitSet getTraitSet(final Convention convention, final PlannerContext plannerContext)
{
final RelTraitSet leafRelTraits = leafRel().getTraitSet();
@ -467,7 +468,9 @@ public class PartialDruidQuery
case AGGREGATE:
case AGGREGATE_PROJECT:
final RelCollation collation = leafRelTraits.getTrait(RelCollationTraitDef.INSTANCE);
if ((collation == null || collation.getFieldCollations().isEmpty()) && aggregate.getGroupSets().size() == 1) {
if (plannerContext.featureAvailable(EngineFeature.GROUPBY_IMPLICITLY_SORTS)
&& (collation == null || collation.getFieldCollations().isEmpty())
&& aggregate.getGroupSets().size() == 1) {
// Druid sorts by grouping keys when grouping. Add the collation.
// Note: [aggregate.getGroupSets().size() == 1] above means that collation isn't added for GROUPING SETS.
final List<RelFieldCollation> sortFields = new ArrayList<>();

@ -200,7 +200,7 @@ public class DruidRules
final DruidOuterQueryRel outerQueryRel = DruidOuterQueryRel.create(
druidRel,
PartialDruidQuery.createOuterQuery(druidRel.getPartialDruidQuery())
PartialDruidQuery.createOuterQuery(druidRel.getPartialDruidQuery(), druidRel.getPlannerContext())
.withAggregate(aggregate)
);
if (outerQueryRel.isValidDruidQuery()) {
@ -223,7 +223,7 @@ public class DruidRules
final DruidOuterQueryRel outerQueryRel = DruidOuterQueryRel.create(
druidRel,
PartialDruidQuery.createOuterQuery(druidRel.getPartialDruidQuery())
PartialDruidQuery.createOuterQuery(druidRel.getPartialDruidQuery(), druidRel.getPlannerContext())
.withWhereFilter(filter)
);
if (outerQueryRel.isValidDruidQuery()) {
@ -246,7 +246,7 @@ public class DruidRules
final DruidOuterQueryRel outerQueryRel = DruidOuterQueryRel.create(
druidRel,
PartialDruidQuery.createOuterQuery(druidRel.getPartialDruidQuery())
PartialDruidQuery.createOuterQuery(druidRel.getPartialDruidQuery(), druidRel.getPlannerContext())
.withSelectProject(filter)
);
if (outerQueryRel.isValidDruidQuery()) {
@ -269,7 +269,7 @@ public class DruidRules
final DruidOuterQueryRel outerQueryRel = DruidOuterQueryRel.create(
druidRel,
PartialDruidQuery.createOuterQuery(druidRel.getPartialDruidQuery())
PartialDruidQuery.createOuterQuery(druidRel.getPartialDruidQuery(), druidRel.getPlannerContext())
.withSort(sort)
);
if (outerQueryRel.isValidDruidQuery()) {
@ -292,7 +292,7 @@ public class DruidRules
final DruidOuterQueryRel outerQueryRel = DruidOuterQueryRel.create(
druidRel,
PartialDruidQuery.createOuterQuery(druidRel.getPartialDruidQuery())
PartialDruidQuery.createOuterQuery(druidRel.getPartialDruidQuery(), druidRel.getPlannerContext())
.withWindow(window)
);
if (outerQueryRel.isValidDruidQuery()) {

@ -20,10 +20,9 @@
package org.apache.druid.sql.calcite.run;
import org.apache.druid.sql.calcite.external.ExternalDataSource;
import org.apache.druid.sql.calcite.planner.PlannerContext;
/**
* Arguments to {@link SqlEngine#featureAvailable(EngineFeature, PlannerContext)}.
* Arguments to {@link SqlEngine#featureAvailable(EngineFeature)}.
*/
public enum EngineFeature
{
@ -125,5 +124,16 @@ public enum EngineFeature
/**
* Queries can write to an external datasource using {@link org.apache.druid.sql.destination.ExportDestination}
*/
WRITE_EXTERNAL_DATA;
WRITE_EXTERNAL_DATA,
/**
* Whether GROUP BY implies an ORDER BY on the same fields.
* There are two reasons we need this:
* (1) We may want MSQ to hash-partition for GROUP BY instead of using a global sort, which would mean MSQ would not
* implicitly ORDER BY when there is a GROUP BY.
* (2) When doing REPLACE with MSQ, CLUSTERED BY is transformed to ORDER BY. We need to retain that ORDER BY, as it
* may be a subset of the GROUP BY, and it is important to remember which fields the user wanted to include in
* {@link org.apache.druid.timeline.partition.DimensionRangeShardSpec}.
*/
GROUPBY_IMPLICITLY_SORTS
}
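
For orientation, a minimal sketch (not part of the diff) of how the new flag is
consulted; plannerContext stands for the PlannerContext available during planning:

    // Callers go through PlannerContext.featureAvailable(), which applies context-based
    // checks first and then delegates to SqlEngine.featureAvailable(feature).
    if (plannerContext.featureAvailable(EngineFeature.GROUPBY_IMPLICITLY_SORTS)) {
      // Native engine: grouping sorts by the grouping keys, so PartialDruidQuery.getTraitSet()
      // attaches a matching RelCollation to the aggregate.
    } else {
      // MSQ: no implicit sort is assumed, so an explicit ORDER BY (for example one derived
      // from CLUSTERED BY during REPLACE) is preserved rather than implied by the GROUP BY.
    }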

@ -95,7 +95,7 @@ public class NativeSqlEngine implements SqlEngine
}
@Override
public boolean featureAvailable(EngineFeature feature, PlannerContext plannerContext)
public boolean featureAvailable(EngineFeature feature)
{
switch (feature) {
case CAN_SELECT:
@ -107,9 +107,9 @@ public class NativeSqlEngine implements SqlEngine
case UNNEST:
case ALLOW_BROADCAST_RIGHTY_JOIN:
case ALLOW_TOP_LEVEL_UNION_ALL:
return true;
case TIME_BOUNDARY_QUERY:
return plannerContext.queryContext().isTimeBoundaryPlanningEnabled();
case GROUPBY_IMPLICITLY_SORTS:
return true;
case CAN_INSERT:
case CAN_REPLACE:
case READ_EXTERNAL_DATA:

@ -39,9 +39,11 @@ public interface SqlEngine
String name();
/**
* Whether a feature applies to this engine or not.
* Whether a feature applies to this engine or not. Most callers should use
* {@link PlannerContext#featureAvailable(EngineFeature)} instead, which also checks feature flags in context
* parameters.
*/
boolean featureAvailable(EngineFeature feature, PlannerContext plannerContext);
boolean featureAvailable(EngineFeature feature);
/**
* Validates a provided query context. Returns quietly if the context is OK; throws {@link ValidationException}

@ -51,7 +51,7 @@ public class ViewSqlEngine implements SqlEngine
}
@Override
public boolean featureAvailable(EngineFeature feature, PlannerContext plannerContext)
public boolean featureAvailable(EngineFeature feature)
{
switch (feature) {
// Use most permissive set of SELECT features, since our goal is to get the row type of the view.
@ -77,6 +77,7 @@ public class ViewSqlEngine implements SqlEngine
case TIMESERIES_QUERY:
case TIME_BOUNDARY_QUERY:
case SCAN_NEEDS_SIGNATURE:
case GROUPBY_IMPLICITLY_SORTS:
return false;
default:

@ -94,7 +94,6 @@ import org.apache.druid.server.security.ResourceAction;
import org.apache.druid.sql.SqlStatementFactory;
import org.apache.druid.sql.calcite.QueryTestRunner.QueryResults;
import org.apache.druid.sql.calcite.expression.DruidExpression;
import org.apache.druid.sql.calcite.expression.ExpressionTestHelper;
import org.apache.druid.sql.calcite.planner.Calcites;
import org.apache.druid.sql.calcite.planner.PlannerConfig;
import org.apache.druid.sql.calcite.planner.PlannerContext;
@ -130,7 +129,6 @@ import org.junit.internal.matchers.ThrowableMessageMatcher;
import org.junit.rules.TemporaryFolder;
import javax.annotation.Nullable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
@ -754,8 +752,7 @@ public class BaseCalciteQueryTest extends CalciteTestBase
public void assumeFeatureAvailable(EngineFeature feature)
{
boolean featureAvailable = queryFramework().engine()
.featureAvailable(feature, ExpressionTestHelper.PLANNER_CONTEXT);
boolean featureAvailable = queryFramework().engine().featureAvailable(feature);
assumeTrue(StringUtils.format("test disabled; feature [%s] is not available!", feature), featureAvailable);
}

@ -2252,7 +2252,20 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
.setGranularity(Granularities.ALL)
.setDimensions(dimensions(new DefaultDimensionSpec("cnt", "d0", ColumnType.LONG)))
.setAggregatorSpecs(aggregators(new CountAggregatorFactory("a0")))
.setLimitSpec(NoopLimitSpec.instance())
.setLimitSpec(
queryFramework().engine().featureAvailable(EngineFeature.GROUPBY_IMPLICITLY_SORTS)
? NoopLimitSpec.instance()
: new DefaultLimitSpec(
ImmutableList.of(
new OrderByColumnSpec(
"d0",
Direction.ASCENDING,
StringComparators.NUMERIC
)
),
Integer.MAX_VALUE
)
)
.setContext(QUERY_CONTEXT_DEFAULT)
.build()
),
@ -9276,7 +9289,20 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
)
.setDimensions(dimensions(new DefaultDimensionSpec("v0", "d0", ColumnType.LONG)))
.setAggregatorSpecs(aggregators(new LongSumAggregatorFactory("a0", "cnt")))
.setLimitSpec(NoopLimitSpec.instance())
.setLimitSpec(
queryFramework().engine().featureAvailable(EngineFeature.GROUPBY_IMPLICITLY_SORTS)
? NoopLimitSpec.instance()
: new DefaultLimitSpec(
ImmutableList.of(
new OrderByColumnSpec(
"d0",
Direction.ASCENDING,
StringComparators.NUMERIC
)
),
Integer.MAX_VALUE
)
)
.setContext(QUERY_CONTEXT_DEFAULT)
.build()
),
@ -9313,7 +9339,20 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
)
.setDimensions(dimensions(new DefaultDimensionSpec("v0", "d0", ColumnType.LONG)))
.setAggregatorSpecs(aggregators(new LongSumAggregatorFactory("a0", "cnt")))
.setLimitSpec(NoopLimitSpec.instance())
.setLimitSpec(
queryFramework().engine().featureAvailable(EngineFeature.GROUPBY_IMPLICITLY_SORTS)
? NoopLimitSpec.instance()
: new DefaultLimitSpec(
ImmutableList.of(
new OrderByColumnSpec(
"d0",
Direction.ASCENDING,
StringComparators.NUMERIC
)
),
Integer.MAX_VALUE
)
)
.setContext(QUERY_CONTEXT_DEFAULT)
.build()
),
@ -10207,7 +10246,20 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
)
.setDimensions(dimensions(new DefaultDimensionSpec("v0", "d0", ColumnType.LONG)))
.setAggregatorSpecs(aggregators(new LongSumAggregatorFactory("a0", "cnt")))
.setLimitSpec(NoopLimitSpec.instance())
.setLimitSpec(
queryFramework().engine().featureAvailable(EngineFeature.GROUPBY_IMPLICITLY_SORTS)
? NoopLimitSpec.instance()
: new DefaultLimitSpec(
ImmutableList.of(
new OrderByColumnSpec(
"d0",
Direction.ASCENDING,
StringComparators.NUMERIC
)
),
Integer.MAX_VALUE
)
)
.setContext(QUERY_CONTEXT_DEFAULT)
.build()
),
@ -10245,7 +10297,20 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
)
.setDimensions(dimensions(new DefaultDimensionSpec("v0", "d0", ColumnType.STRING)))
.setAggregatorSpecs(aggregators(new LongSumAggregatorFactory("a0", "cnt")))
.setLimitSpec(NoopLimitSpec.instance())
.setLimitSpec(
queryFramework().engine().featureAvailable(EngineFeature.GROUPBY_IMPLICITLY_SORTS)
? NoopLimitSpec.instance()
: new DefaultLimitSpec(
ImmutableList.of(
new OrderByColumnSpec(
"d0",
Direction.ASCENDING,
StringComparators.LEXICOGRAPHIC
)
),
Integer.MAX_VALUE
)
)
.setContext(QUERY_CONTEXT_DEFAULT)
.build()
),
@ -10479,7 +10544,17 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
)
)
.setAggregatorSpecs(aggregators(new LongSumAggregatorFactory("a0", "cnt")))
.setLimitSpec(NoopLimitSpec.instance())
.setLimitSpec(
queryFramework().engine().featureAvailable(EngineFeature.GROUPBY_IMPLICITLY_SORTS)
? NoopLimitSpec.instance()
: new DefaultLimitSpec(
ImmutableList.of(
new OrderByColumnSpec("d0", Direction.ASCENDING, StringComparators.LEXICOGRAPHIC),
new OrderByColumnSpec("d1", Direction.ASCENDING, StringComparators.NUMERIC)
),
Integer.MAX_VALUE
)
)
.setContext(withTimestampResultContext(QUERY_CONTEXT_DEFAULT, "d1", 1, Granularities.MONTH))
.build()
),

@ -138,9 +138,9 @@ public class CalciteScanSignatureTest extends BaseCalciteQueryTest
}
@Override
public boolean featureAvailable(EngineFeature feature, PlannerContext plannerContext)
public boolean featureAvailable(EngineFeature feature)
{
return feature == EngineFeature.SCAN_NEEDS_SIGNATURE || parent.featureAvailable(feature, plannerContext);
return feature == EngineFeature.SCAN_NEEDS_SIGNATURE || parent.featureAvailable(feature);
}
@Override

@ -75,7 +75,7 @@ public class IngestionTestSqlEngine implements SqlEngine
}
@Override
public boolean featureAvailable(final EngineFeature feature, final PlannerContext plannerContext)
public boolean featureAvailable(final EngineFeature feature)
{
switch (feature) {
case CAN_SELECT:
@ -85,6 +85,7 @@ public class IngestionTestSqlEngine implements SqlEngine
case TIME_BOUNDARY_QUERY:
case SCAN_NEEDS_SIGNATURE:
case UNNEST:
case GROUPBY_IMPLICITLY_SORTS:
return false;
case CAN_INSERT:
case CAN_REPLACE: