From 04fb75719e7dd76d051dbd4cf8f9c2712b227793 Mon Sep 17 00:00:00 2001 From: Abhishek Radhakrishnan Date: Fri, 16 Jun 2023 15:10:12 -0700 Subject: [PATCH] Fail query planning if a `CLUSTERED BY` column contains descending order (#14436) * Throw ValidationException if CLUSTERED BY column descending order is specified. - Fails query planning * Some more tests. * fixup existing comment * Update comment * checkstyle fix: remove unused imports * Remove InsertCannotOrderByDescendingFault and deprecate the fault in readme. * move deprecated field to the bottom --- docs/multi-stage-query/reference.md | 2 +- .../apache/druid/msq/exec/ControllerImpl.java | 5 -- .../druid/msq/guice/MSQIndexingModule.java | 2 - .../InsertCannotOrderByDescendingFault.java | 72 ------------------ .../apache/druid/msq/exec/MSQFaultsTest.java | 18 ----- .../apache/druid/msq/exec/MSQInsertTest.java | 19 +++++ .../apache/druid/msq/exec/MSQReplaceTest.java | 21 +++++- .../msq/indexing/error/MSQFaultSerdeTest.java | 1 - .../calcite/parser/DruidSqlParserUtils.java | 33 ++++++++- .../sql/calcite/CalciteInsertDmlTest.java | 38 +++++++--- .../sql/calcite/CalciteReplaceDmlTest.java | 20 +++++ .../parser/DruidSqlParserUtilsTest.java | 73 ++++++++++++++++++- .../insertWithClusteredBy-logicalPlan.txt | 4 +- 13 files changed, 192 insertions(+), 116 deletions(-) delete mode 100644 extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/error/InsertCannotOrderByDescendingFault.java diff --git a/docs/multi-stage-query/reference.md b/docs/multi-stage-query/reference.md index b28b3a2f5be..3e54abfce8b 100644 --- a/docs/multi-stage-query/reference.md +++ b/docs/multi-stage-query/reference.md @@ -425,7 +425,6 @@ The following table describes error codes you may encounter in the `multiStageQu | `ColumnTypeNotSupported` | The column type is not supported. This can be because:

| `columnName`: The column name with an unsupported type.

`columnType`: The unknown column type. | | `InsertCannotAllocateSegment` | The controller task could not allocate a new segment ID due to conflict with existing segments or pending segments. Common reasons for such conflicts:

| `dataSource`

`interval`: The interval for the attempted new segment allocation. | | `InsertCannotBeEmpty` | An INSERT or REPLACE query did not generate any output rows in a situation where output rows are required for success. This can happen for INSERT or REPLACE queries with `PARTITIONED BY` set to something other than `ALL` or `ALL TIME`. | `dataSource` | -| `InsertCannotOrderByDescending` | An INSERT query contained a `CLUSTERED BY` expression in descending order. Druid's segment generation code only supports ascending order. | `columnName` | | `InsertLockPreempted` | An INSERT or REPLACE query was canceled by a higher-priority ingestion job, such as a real-time ingestion task. | | | `InsertTimeNull` | An INSERT or REPLACE query encountered a null timestamp in the `__time` field.

This can happen due to using an expression like `TIME_PARSE(timestamp) AS __time` with a timestamp that cannot be parsed. ([`TIME_PARSE`](../querying/sql-scalar.md#date-and-time-functions) returns null when it cannot parse a timestamp.) In this case, try parsing your timestamps using a different function or pattern. Or, if your timestamps may genuinely be null, consider using [`COALESCE`](../querying/sql-scalar.md#other-scalar-functions) to provide a default value. One option is [`CURRENT_TIMESTAMP`](../querying/sql-scalar.md#date-and-time-functions), which represents the start time of the job.| | `InsertTimeOutOfBounds` | A REPLACE query generated a timestamp outside the bounds of the TIMESTAMP parameter for your OVERWRITE WHERE clause.

To avoid this error, verify that the interval you specified is valid. | `interval`: time chunk interval corresponding to the out-of-bounds timestamp | @@ -449,3 +448,4 @@ The following table describes error codes you may encounter in the `multiStageQu | `WorkerFailed` | A worker task failed unexpectedly. | `errorMsg`&lt;br /&gt;

`workerTaskId`: The ID of the worker task. | | `WorkerRpcFailed` | A remote procedure call to a worker task failed and could not recover. | `workerTaskId`: the id of the worker task | | `UnknownError` | All other errors. | `message` | +| `InsertCannotOrderByDescending` | Deprecated. An INSERT query contained a `CLUSTERED BY` expression in descending order. Druid's segment generation code only supports ascending order. The query returns a `ValidationException` instead of the fault. | `columnName` | diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java index a5ff643e8bf..dd163d406d3 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java @@ -101,7 +101,6 @@ import org.apache.druid.msq.indexing.error.CannotParseExternalDataFault; import org.apache.druid.msq.indexing.error.FaultsExceededChecker; import org.apache.druid.msq.indexing.error.InsertCannotAllocateSegmentFault; import org.apache.druid.msq.indexing.error.InsertCannotBeEmptyFault; -import org.apache.druid.msq.indexing.error.InsertCannotOrderByDescendingFault; import org.apache.druid.msq.indexing.error.InsertLockPreemptedFault; import org.apache.druid.msq.indexing.error.InsertTimeOutOfBoundsFault; import org.apache.druid.msq.indexing.error.MSQErrorReport; @@ -1854,10 +1853,6 @@ public class ControllerImpl implements Controller // Such fields in CLUSTERED BY still control partitioning as expected, but do not affect sort order of rows // within an individual segment. for (final KeyColumn clusterByColumn : queryClusterBy.getColumns()) { - if (clusterByColumn.order() == KeyOrder.DESCENDING) { - throw new MSQException(new InsertCannotOrderByDescendingFault(clusterByColumn.columnName())); - } - final IntList outputColumns = columnMappings.getOutputColumnsForQueryColumn(clusterByColumn.columnName()); for (final int outputColumn : outputColumns) { outputColumnsInOrder.add(columnMappings.getOutputColumnName(outputColumn)); diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/guice/MSQIndexingModule.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/guice/MSQIndexingModule.java index 8cc7ab35bde..59300316d58 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/guice/MSQIndexingModule.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/guice/MSQIndexingModule.java @@ -45,7 +45,6 @@ import org.apache.druid.msq.indexing.error.ColumnTypeNotSupportedFault; import org.apache.druid.msq.indexing.error.DurableStorageConfigurationFault; import org.apache.druid.msq.indexing.error.InsertCannotAllocateSegmentFault; import org.apache.druid.msq.indexing.error.InsertCannotBeEmptyFault; -import org.apache.druid.msq.indexing.error.InsertCannotOrderByDescendingFault; import org.apache.druid.msq.indexing.error.InsertLockPreemptedFault; import org.apache.druid.msq.indexing.error.InsertTimeNullFault; import org.apache.druid.msq.indexing.error.InsertTimeOutOfBoundsFault; @@ -113,7 +112,6 @@ public class MSQIndexingModule implements DruidModule DurableStorageConfigurationFault.class, InsertCannotAllocateSegmentFault.class, InsertCannotBeEmptyFault.class, - InsertCannotOrderByDescendingFault.class, InsertLockPreemptedFault.class, InsertTimeNullFault.class, 
InsertTimeOutOfBoundsFault.class, diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/error/InsertCannotOrderByDescendingFault.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/error/InsertCannotOrderByDescendingFault.java deleted file mode 100644 index 43b50e87827..00000000000 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/error/InsertCannotOrderByDescendingFault.java +++ /dev/null @@ -1,72 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.druid.msq.indexing.error; - -import com.fasterxml.jackson.annotation.JsonCreator; -import com.fasterxml.jackson.annotation.JsonProperty; -import com.fasterxml.jackson.annotation.JsonTypeName; -import com.google.common.base.Preconditions; - -import java.util.Objects; - -@JsonTypeName(InsertCannotOrderByDescendingFault.CODE) -public class InsertCannotOrderByDescendingFault extends BaseMSQFault -{ - static final String CODE = "InsertCannotOrderByDescending"; - - private final String columnName; - - @JsonCreator - public InsertCannotOrderByDescendingFault( - @JsonProperty("columnName") final String columnName - ) - { - super(CODE, "Cannot ingest column [%s] in descending order", columnName); - this.columnName = Preconditions.checkNotNull(columnName, "columnName"); - } - - @JsonProperty - public String getColumnName() - { - return columnName; - } - - @Override - public boolean equals(Object o) - { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - if (!super.equals(o)) { - return false; - } - InsertCannotOrderByDescendingFault that = (InsertCannotOrderByDescendingFault) o; - return Objects.equals(columnName, that.columnName); - } - - @Override - public int hashCode() - { - return Objects.hash(super.hashCode(), columnName); - } -} diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQFaultsTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQFaultsTest.java index c1da6ebc8e7..646286acaf5 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQFaultsTest.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQFaultsTest.java @@ -25,7 +25,6 @@ import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.msq.indexing.error.InsertCannotAllocateSegmentFault; import org.apache.druid.msq.indexing.error.InsertCannotBeEmptyFault; -import org.apache.druid.msq.indexing.error.InsertCannotOrderByDescendingFault; import org.apache.druid.msq.indexing.error.InsertTimeNullFault; import 
org.apache.druid.msq.indexing.error.InsertTimeOutOfBoundsFault; import org.apache.druid.msq.indexing.error.TooManyClusteredByColumnsFault; @@ -92,23 +91,6 @@ public class MSQFaultsTest extends MSQTestBase .verifyResults(); } - @Test - public void testInsertCannotOrderByDescendingFault() - { - RowSignature rowSignature = RowSignature.builder() - .add("__time", ColumnType.LONG) - .add("dim1", ColumnType.STRING) - .add("cnt", ColumnType.LONG).build(); - - // Add an DESC clustered by column, which should not be allowed - testIngestQuery().setSql( - "insert into foo1 select __time, dim1 , count(*) as cnt from foo where dim1 is not null and __time < TIMESTAMP '2000-01-02 00:00:00' group by 1, 2 PARTITIONED by day clustered by dim1 DESC") - .setExpectedDataSource("foo1") - .setExpectedRowSignature(rowSignature) - .setExpectedMSQFault(new InsertCannotOrderByDescendingFault("d1")) - .verifyResults(); - } - @Test public void testInsertTimeOutOfBoundsFault() { diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQInsertTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQInsertTest.java index 548e5e01664..b55de6c165c 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQInsertTest.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQInsertTest.java @@ -736,6 +736,25 @@ public class MSQInsertTest extends MSQTestBase } + @Test + public void testInsertWithClusteredByDescendingThrowsException() + { + // Add a DESC clustered by column, which should not be allowed + testIngestQuery().setSql("INSERT INTO foo1 " + + "SELECT __time, dim1 , count(*) as cnt " + + "FROM foo " + + "GROUP BY 1, 2 " + + "PARTITIONED BY DAY " + + "CLUSTERED BY dim1 DESC" + ) + .setExpectedValidationErrorMatcher(CoreMatchers.allOf( + CoreMatchers.instanceOf(SqlPlanningException.class), + ThrowableMessageMatcher.hasMessage(CoreMatchers.startsWith( + "[`dim1` DESC] is invalid. CLUSTERED BY columns cannot be sorted in descending order.")) + )) + .verifyPlanningErrors(); + } + @Test public void testRollUpOnFoo1WithTimeFunctionComplexCol() { diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQReplaceTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQReplaceTest.java index eb1c459e66b..1dfd7742146 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQReplaceTest.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQReplaceTest.java @@ -678,7 +678,7 @@ public class MSQReplaceTest extends MSQTestBase } @Test - public void testInsertOnFoo1Range() + public void testReplaceOnFoo1Range() { RowSignature rowSignature = RowSignature.builder() .add("__time", ColumnType.LONG) @@ -731,6 +731,25 @@ public class MSQReplaceTest extends MSQTestBase .verifyResults(); } + @Test + public void testReplaceWithClusteredByDescendingThrowsException() + { + // Add a DESC clustered by column, which should not be allowed + testIngestQuery().setSql(" REPLACE INTO foobar " + + "OVERWRITE ALL " + + "SELECT __time, m1, m2 " + + "FROM foo " + + "PARTITIONED BY ALL TIME " + + "CLUSTERED BY m2, m1 DESC" + ) + .setExpectedValidationErrorMatcher(CoreMatchers.allOf( + CoreMatchers.instanceOf(SqlPlanningException.class), + ThrowableMessageMatcher.hasMessage(CoreMatchers.startsWith( + "[`m1` DESC] is invalid. 
CLUSTERED BY columns cannot be sorted in descending order.")) + )) + .verifyPlanningErrors(); + } + @Test public void testReplaceTombstonesOverPartiallyOverlappingSegments() { diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/error/MSQFaultSerdeTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/error/MSQFaultSerdeTest.java index 256397e9a23..484989e7bad 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/error/MSQFaultSerdeTest.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/error/MSQFaultSerdeTest.java @@ -56,7 +56,6 @@ public class MSQFaultSerdeTest assertFaultSerde(new ColumnNameRestrictedFault("the column")); assertFaultSerde(new InsertCannotAllocateSegmentFault("the datasource", Intervals.ETERNITY)); assertFaultSerde(new InsertCannotBeEmptyFault("the datasource")); - assertFaultSerde(new InsertCannotOrderByDescendingFault("the column")); assertFaultSerde(InsertLockPreemptedFault.INSTANCE); assertFaultSerde(InsertTimeNullFault.INSTANCE); assertFaultSerde(new InsertTimeOutOfBoundsFault(Intervals.ETERNITY)); diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/parser/DruidSqlParserUtils.java b/sql/src/main/java/org/apache/druid/sql/calcite/parser/DruidSqlParserUtils.java index cc67cff1199..9009237b780 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/parser/DruidSqlParserUtils.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/parser/DruidSqlParserUtils.java @@ -21,6 +21,7 @@ package org.apache.druid.sql.calcite.parser; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSet; import org.apache.calcite.sql.SqlBasicCall; import org.apache.calcite.sql.SqlCall; import org.apache.calcite.sql.SqlIdentifier; @@ -256,9 +257,12 @@ public class DruidSqlParserUtils * @param query sql query * @param clusteredByList List of clustered by columns * @return SqlOrderBy node containing the clusteredByList information + * @throws ValidationException if any of the clustered by columns contain DESCENDING order. */ public static SqlOrderBy convertClusterByToOrderBy(SqlNode query, SqlNodeList clusteredByList) + throws ValidationException { + validateClusteredByColumns(clusteredByList); // If we have a CLUSTERED BY clause, extract the information in that CLUSTERED BY and create a new // SqlOrderBy node SqlNode offset = null; @@ -266,9 +270,9 @@ public class DruidSqlParserUtils if (query instanceof SqlOrderBy) { SqlOrderBy sqlOrderBy = (SqlOrderBy) query; - // This represents the underlying query free of OFFSET, FETCH and ORDER BY clauses - // For a sqlOrderBy.query like "SELECT dim1, sum(dim2) FROM foo OFFSET 10 FETCH 30 ORDER BY dim1 GROUP - // BY dim1 this would represent the "SELECT dim1, sum(dim2) from foo GROUP BY dim1 + // query represents the underlying query free of OFFSET, FETCH and ORDER BY clauses + // For a sqlOrderBy.query like "SELECT dim1, sum(dim2) FROM foo GROUP BY dim1 ORDER BY dim1 FETCH 30 OFFSET 10", + // this would represent the "SELECT dim1, sum(dim2) from foo GROUP BY dim1 query = sqlOrderBy.query; offset = sqlOrderBy.offset; fetch = sqlOrderBy.fetch; @@ -283,6 +287,29 @@ public class DruidSqlParserUtils ); } + /** + * Validates the clustered by columns to ensure that it does not contain DESCENDING order columns. + * + * @param clusteredByNodes List of SqlNodes representing columns to be clustered by. 
+ * @throws ValidationException if any of the clustered by columns contain DESCENDING order. + */ + public static void validateClusteredByColumns(final SqlNodeList clusteredByNodes) throws ValidationException + { + if (clusteredByNodes == null) { + return; + } + + for (final SqlNode clusteredByNode : clusteredByNodes.getList()) { + if (clusteredByNode.isA(ImmutableSet.of(SqlKind.DESCENDING))) { + throw new ValidationException( + StringUtils.format("[%s] is invalid." + + " CLUSTERED BY columns cannot be sorted in descending order.", clusteredByNode.toString() + ) + ); + } + } + } + /** * This method is used to convert an {@link SqlNode} representing a query into a {@link DimFilter} for the same query. * It takes the timezone as a separate parameter, as Sql timestamps don't contain that information. Supported functions diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteInsertDmlTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteInsertDmlTest.java index 641b0ea49b4..aeabf5241a0 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteInsertDmlTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteInsertDmlTest.java @@ -653,11 +653,11 @@ public class CalciteInsertDmlTest extends CalciteIngestionDmlTest skipVectorize(); final String resources = "[{\"name\":\"dst\",\"type\":\"DATASOURCE\"},{\"name\":\"foo\",\"type\":\"DATASOURCE\"}]"; - final String attributes = "{\"statementType\":\"INSERT\",\"targetDataSource\":\"druid.dst\",\"partitionedBy\":\"DAY\",\"clusteredBy\":\"2, `dim1` DESC, CEIL(`m2`)\"}"; + final String attributes = "{\"statementType\":\"INSERT\",\"targetDataSource\":\"druid.dst\",\"partitionedBy\":\"DAY\",\"clusteredBy\":\"2, `dim1`, CEIL(`m2`)\"}"; final String sql = "EXPLAIN PLAN FOR INSERT INTO druid.dst " + "SELECT __time, FLOOR(m1) as floor_m1, dim1, CEIL(m2) as ceil_m2 FROM foo " - + "PARTITIONED BY FLOOR(__time TO DAY) CLUSTERED BY 2, dim1 DESC, CEIL(m2)"; + + "PARTITIONED BY FLOOR(__time TO DAY) CLUSTERED BY 2, dim1, CEIL(m2)"; ObjectMapper queryJsonMapper = queryFramework().queryJsonMapper(); final ScanQuery expectedQuery = newScanQueryBuilder() @@ -671,7 +671,7 @@ public class CalciteInsertDmlTest extends CalciteIngestionDmlTest .orderBy( ImmutableList.of( new ScanQuery.OrderBy("v0", ScanQuery.Order.ASCENDING), - new ScanQuery.OrderBy("dim1", ScanQuery.Order.DESCENDING), + new ScanQuery.OrderBy("dim1", ScanQuery.Order.ASCENDING), new ScanQuery.OrderBy("v1", ScanQuery.Order.ASCENDING) ) ) @@ -718,7 +718,7 @@ public class CalciteInsertDmlTest extends CalciteIngestionDmlTest + "\"virtualColumns\":[{\"type\":\"expression\",\"name\":\"v0\",\"expression\":\"floor(\\\"m1\\\")\",\"outputType\":\"FLOAT\"}," + "{\"type\":\"expression\",\"name\":\"v1\",\"expression\":\"ceil(\\\"m2\\\")\",\"outputType\":\"DOUBLE\"}]," + "\"resultFormat\":\"compactedList\"," - + "\"orderBy\":[{\"columnName\":\"v0\",\"order\":\"ascending\"},{\"columnName\":\"dim1\",\"order\":\"descending\"}," + + "\"orderBy\":[{\"columnName\":\"v0\",\"order\":\"ascending\"},{\"columnName\":\"dim1\",\"order\":\"ascending\"}," + "{\"columnName\":\"v1\",\"order\":\"ascending\"}],\"columns\":[\"__time\",\"dim1\",\"v0\",\"v1\"],\"legacy\":false," + "\"context\":{\"sqlInsertSegmentGranularity\":\"\\\"DAY\\\"\",\"sqlQueryId\":\"dummy\",\"vectorize\":\"false\",\"vectorizeVirtualColumns\":\"false\"},\"granularity\":{\"type\":\"all\"}}," + "\"signature\":[{\"name\":\"__time\",\"type\":\"LONG\"},{\"name\":\"v0\",\"type\":\"FLOAT\"},{\"name\":\"dim1\",\"type\":\"STRING\"}," 
@@ -751,6 +751,24 @@ public class CalciteInsertDmlTest extends CalciteIngestionDmlTest didTest = true; } + @Test + public void testExplainPlanInsertWithClusteredByDescThrowsException() + { + skipVectorize(); + + final String sql = "EXPLAIN PLAN FOR INSERT INTO druid.dst " + + "SELECT __time, FLOOR(m1) as floor_m1, dim1, CEIL(m2) as ceil_m2 FROM foo " + + "PARTITIONED BY FLOOR(__time TO DAY) CLUSTERED BY 2, dim1 DESC, CEIL(m2)"; + + testIngestionQuery() + .sql(sql) + .expectValidationError( + SqlPlanningException.class, + "[`dim1` DESC] is invalid. CLUSTERED BY columns cannot be sorted in descending order." + ) + .verify(); + } + @Test public void testInsertWithClusteredBy() { @@ -765,7 +783,7 @@ public class CalciteInsertDmlTest extends CalciteIngestionDmlTest .sql( "INSERT INTO druid.dst " + "SELECT __time, FLOOR(m1) as floor_m1, dim1, CEIL(m2) as ceil_m2 FROM foo " - + "PARTITIONED BY FLOOR(__time TO DAY) CLUSTERED BY 2, dim1 DESC, CEIL(m2)" + + "PARTITIONED BY FLOOR(__time TO DAY) CLUSTERED BY 2, dim1, CEIL(m2)" ) .expectTarget("dst", targetRowSignature) .expectResources(dataSourceRead("foo"), dataSourceWrite("dst")) @@ -781,7 +799,7 @@ public class CalciteInsertDmlTest extends CalciteIngestionDmlTest .orderBy( ImmutableList.of( new ScanQuery.OrderBy("v0", ScanQuery.Order.ASCENDING), - new ScanQuery.OrderBy("dim1", ScanQuery.Order.DESCENDING), + new ScanQuery.OrderBy("dim1", ScanQuery.Order.ASCENDING), new ScanQuery.OrderBy("v1", ScanQuery.Order.ASCENDING) ) ) @@ -1052,7 +1070,7 @@ public class CalciteInsertDmlTest extends CalciteIngestionDmlTest final String query = "EXPLAIN PLAN FOR INSERT INTO druid.dst " + "SELECT __time, FLOOR(m1) as floor_m1, dim1, CEIL(m2) as ceil_m2 FROM foo " - + "PARTITIONED BY FLOOR(__time TO DAY) CLUSTERED BY 2, dim1 DESC, CEIL(m2)"; + + "PARTITIONED BY FLOOR(__time TO DAY) CLUSTERED BY 2, dim1 ASC, CEIL(m2)"; ObjectMapper queryJsonMapper = queryFramework().queryJsonMapper(); final ScanQuery expectedQuery = newScanQueryBuilder() @@ -1066,7 +1084,7 @@ public class CalciteInsertDmlTest extends CalciteIngestionDmlTest .orderBy( ImmutableList.of( new ScanQuery.OrderBy("v0", ScanQuery.Order.ASCENDING), - new ScanQuery.OrderBy("dim1", ScanQuery.Order.DESCENDING), + new ScanQuery.OrderBy("dim1", ScanQuery.Order.ASCENDING), new ScanQuery.OrderBy("v1", ScanQuery.Order.ASCENDING) ) ) @@ -1091,7 +1109,7 @@ public class CalciteInsertDmlTest extends CalciteIngestionDmlTest + "\"virtualColumns\":[{\"type\":\"expression\",\"name\":\"v0\",\"expression\":\"floor(\\\"m1\\\")\",\"outputType\":\"FLOAT\"}," + "{\"type\":\"expression\",\"name\":\"v1\",\"expression\":\"ceil(\\\"m2\\\")\",\"outputType\":\"DOUBLE\"}]," + "\"resultFormat\":\"compactedList\"," - + "\"orderBy\":[{\"columnName\":\"v0\",\"order\":\"ascending\"},{\"columnName\":\"dim1\",\"order\":\"descending\"}," + + "\"orderBy\":[{\"columnName\":\"v0\",\"order\":\"ascending\"},{\"columnName\":\"dim1\",\"order\":\"ascending\"}," + "{\"columnName\":\"v1\",\"order\":\"ascending\"}],\"columns\":[\"__time\",\"dim1\",\"v0\",\"v1\"],\"legacy\":false," + "\"context\":{\"sqlInsertSegmentGranularity\":\"\\\"DAY\\\"\",\"sqlQueryId\":\"dummy\",\"vectorize\":\"false\",\"vectorizeVirtualColumns\":\"false\"},\"granularity\":{\"type\":\"all\"}}," + "\"signature\":[{\"name\":\"__time\",\"type\":\"LONG\"},{\"name\":\"v0\",\"type\":\"FLOAT\"},{\"name\":\"dim1\",\"type\":\"STRING\"}," @@ -1101,7 +1119,7 @@ public class CalciteInsertDmlTest extends CalciteIngestionDmlTest + "}]"; final String resources = 
"[{\"name\":\"dst\",\"type\":\"DATASOURCE\"},{\"name\":\"foo\",\"type\":\"DATASOURCE\"}]"; - final String attributes = "{\"statementType\":\"INSERT\",\"targetDataSource\":\"druid.dst\",\"partitionedBy\":\"DAY\",\"clusteredBy\":\"2, `dim1` DESC, CEIL(`m2`)\"}"; + final String attributes = "{\"statementType\":\"INSERT\",\"targetDataSource\":\"druid.dst\",\"partitionedBy\":\"DAY\",\"clusteredBy\":\"2, `dim1`, CEIL(`m2`)\"}"; // Use testQuery for EXPLAIN (not testIngestionQuery). testQuery( diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteReplaceDmlTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteReplaceDmlTest.java index fb922ffe772..0c1f016600d 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteReplaceDmlTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteReplaceDmlTest.java @@ -765,6 +765,26 @@ public class CalciteReplaceDmlTest extends CalciteIngestionDmlTest didTest = true; } + + @Test + public void testExplainPlanReplaceWithClusteredByDescThrowsException() + { + skipVectorize(); + + final String sql = "EXPLAIN PLAN FOR" + + " REPLACE INTO dst" + + " OVERWRITE WHERE __time >= TIMESTAMP '2000-01-01 00:00:00' AND __time < TIMESTAMP '2000-01-02 00:00:00' " + + "SELECT * FROM foo PARTITIONED BY DAY CLUSTERED BY dim1 DESC"; + + testIngestionQuery() + .sql(sql) + .expectValidationError( + SqlPlanningException.class, + "[`dim1` DESC] is invalid. CLUSTERED BY columns cannot be sorted in descending order." + ) + .verify(); + } + @Test public void testExplainReplaceFromExternalUnauthorized() { diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/parser/DruidSqlParserUtilsTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/parser/DruidSqlParserUtilsTest.java index 75c7f039941..1f295ea3587 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/parser/DruidSqlParserUtilsTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/parser/DruidSqlParserUtilsTest.java @@ -21,13 +21,17 @@ package org.apache.druid.sql.calcite.parser; import com.google.common.collect.ImmutableList; import org.apache.calcite.avatica.util.TimeUnit; +import org.apache.calcite.sql.SqlBasicCall; import org.apache.calcite.sql.SqlIdentifier; import org.apache.calcite.sql.SqlIntervalQualifier; +import org.apache.calcite.sql.SqlKind; import org.apache.calcite.sql.SqlLiteral; import org.apache.calcite.sql.SqlNode; import org.apache.calcite.sql.SqlNodeList; +import org.apache.calcite.sql.SqlPostfixOperator; import org.apache.calcite.sql.fun.SqlStdOperatorTable; import org.apache.calcite.sql.parser.SqlParserPos; +import org.apache.calcite.tools.ValidationException; import org.apache.druid.java.util.common.granularity.Granularities; import org.apache.druid.java.util.common.granularity.Granularity; import org.apache.druid.sql.calcite.expression.builtin.TimeFloorOperatorConversion; @@ -119,7 +123,74 @@ public class DruidSqlParserUtilsTest } } - public static class FloorToGranularityConversionTestErrors + public static class ClusteredByColumnsValidationTest + { + /** + * Tests an empty CLUSTERED BY clause + */ + @Test + public void testEmptyClusteredByColumnsValid() + { + final SqlNodeList clusteredByArgs = new SqlNodeList(SqlParserPos.ZERO); + try { + DruidSqlParserUtils.validateClusteredByColumns(clusteredByArgs); + } + catch (ValidationException e) { + Assert.fail("Did not expect an exception" + e.getMessage()); + } + } + + /** + * Tests clause "CLUSTERED BY DIM1, DIM2 ASC, 3" + */ + @Test + public void testClusteredByColumnsValid() + { + final 
SqlNodeList clusteredByArgs = new SqlNodeList(SqlParserPos.ZERO); + clusteredByArgs.add(new SqlIdentifier("DIM1", SqlParserPos.ZERO)); + clusteredByArgs.add(new SqlIdentifier("DIM2 ASC", SqlParserPos.ZERO)); + clusteredByArgs.add(SqlLiteral.createExactNumeric("3", SqlParserPos.ZERO)); + + try { + DruidSqlParserUtils.validateClusteredByColumns(clusteredByArgs); + } + catch (ValidationException e) { + Assert.fail("Did not expect an exception" + e.getMessage()); + } + } + + /** + * Tests clause "CLUSTERED BY DIM1, DIM2 ASC, 3, DIM4 DESC" + */ + @Test + public void testClusteredByColumnsWithDescThrowsException() + { + final SqlNodeList clusteredByArgs = new SqlNodeList(SqlParserPos.ZERO); + clusteredByArgs.add(new SqlIdentifier("DIM1", SqlParserPos.ZERO)); + clusteredByArgs.add(new SqlIdentifier("DIM2 ASC", SqlParserPos.ZERO)); + clusteredByArgs.add(SqlLiteral.createExactNumeric("3", SqlParserPos.ZERO)); + + final SqlBasicCall sqlBasicCall = new SqlBasicCall( + new SqlPostfixOperator("DESC", SqlKind.DESCENDING, 2, null, null, null), + new SqlNode[]{ + new SqlIdentifier("DIM4", SqlParserPos.ZERO) + }, + new SqlParserPos(0, 3) + ); + clusteredByArgs.add(sqlBasicCall); + + ValidationException e = Assert.assertThrows( + ValidationException.class, + () -> DruidSqlParserUtils.validateClusteredByColumns(clusteredByArgs) + ); + Assert.assertEquals( + "[`DIM4` DESC] is invalid. CLUSTERED BY columns cannot be sorted in descending order.", + e.getMessage() + ); + } + } + + public static class FloorToGranularityConversionErrorsTest { /** * Tests clause like "PARTITIONED BY 'day'" diff --git a/sql/src/test/resources/calcite/expected/ingest/insertWithClusteredBy-logicalPlan.txt b/sql/src/test/resources/calcite/expected/ingest/insertWithClusteredBy-logicalPlan.txt index eb2d8501d5a..9eb25a81ec9 100644 --- a/sql/src/test/resources/calcite/expected/ingest/insertWithClusteredBy-logicalPlan.txt +++ b/sql/src/test/resources/calcite/expected/ingest/insertWithClusteredBy-logicalPlan.txt @@ -1,4 +1,4 @@ -LogicalInsert(target=[druid.dst], partitionedBy=[{type=period, period=P1D, timeZone=UTC, origin=null}], clusteredBy=[2, `dim1` DESC, CEIL(`m2`)]) - LogicalSort(sort0=[$1], sort1=[$2], sort2=[$3], dir0=[ASC], dir1=[DESC], dir2=[ASC]) +LogicalInsert(target=[druid.dst], partitionedBy=[{type=period, period=P1D, timeZone=UTC, origin=null}], clusteredBy=[2, `dim1`, CEIL(`m2`)]) + LogicalSort(sort0=[$1], sort1=[$2], sort2=[$3], dir0=[ASC], dir1=[ASC], dir2=[ASC]) LogicalProject(__time=[$0], floor_m1=[FLOOR($5)], dim1=[$1], ceil_m2=[CEIL($6)]) LogicalTableScan(table=[[druid, foo]])
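
Note: to make the new planning-time check concrete, below is a minimal standalone sketch that drives the validateClusteredByColumns method added in this patch against a hand-built CLUSTERED BY node list, mirroring testClusteredByColumnsWithDescThrowsException above. The ClusteredByDescExample wrapper class and its main method are illustrative only (they are not part of the patch), and the sketch assumes the Calcite and druid-sql modules from this change are on the classpath.

import org.apache.calcite.sql.SqlBasicCall;
import org.apache.calcite.sql.SqlIdentifier;
import org.apache.calcite.sql.SqlKind;
import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.sql.SqlNodeList;
import org.apache.calcite.sql.SqlPostfixOperator;
import org.apache.calcite.sql.parser.SqlParserPos;
import org.apache.calcite.tools.ValidationException;
import org.apache.druid.sql.calcite.parser.DruidSqlParserUtils;

// Hypothetical wrapper class for illustration; not part of this patch.
public class ClusteredByDescExample
{
  public static void main(String[] args)
  {
    // Build the node list for "CLUSTERED BY DIM1, DIM4 DESC".
    final SqlNodeList clusteredByArgs = new SqlNodeList(SqlParserPos.ZERO);
    clusteredByArgs.add(new SqlIdentifier("DIM1", SqlParserPos.ZERO));

    // A descending column parses as a SqlBasicCall wrapping the identifier,
    // with a postfix operator of kind SqlKind.DESCENDING (same construction
    // as in DruidSqlParserUtilsTest).
    clusteredByArgs.add(
        new SqlBasicCall(
            new SqlPostfixOperator("DESC", SqlKind.DESCENDING, 2, null, null, null),
            new SqlNode[]{new SqlIdentifier("DIM4", SqlParserPos.ZERO)},
            SqlParserPos.ZERO
        )
    );

    try {
      // The new check rejects any node of kind DESCENDING before the
      // CLUSTERED BY list is converted into an ORDER BY.
      DruidSqlParserUtils.validateClusteredByColumns(clusteredByArgs);
    }
    catch (ValidationException e) {
      // Prints:
      // [`DIM4` DESC] is invalid. CLUSTERED BY columns cannot be sorted in descending order.
      System.out.println(e.getMessage());
    }
  }
}

Because validateClusteredByColumns now runs at the top of convertClusterByToOrderBy during query planning, a descending CLUSTERED BY column fails fast with a ValidationException (surfaced to SQL clients as a SqlPlanningException, as the updated tests assert) instead of reaching the controller and raising the now-removed InsertCannotOrderByDescending fault at runtime.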