Fail query planning if a `CLUSTERED BY` column contains descending order (#14436)

* Throw a ValidationException if a CLUSTERED BY column is specified in descending order; this fails query planning.

* Some more tests.

* fixup existing comment

* Update comment

* checkstyle fix: remove unused imports

* Remove InsertCannotOrderByDescendingFault and deprecate the fault in readme.

* move deprecated field to the bottom
Abhishek Radhakrishnan 2023-06-16 15:10:12 -07:00 committed by GitHub
parent 64af9bfe5b
commit 04fb75719e
13 changed files with 192 additions and 116 deletions
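
In short, this commit moves the descending-order check out of the MSQ controller, where it previously surfaced at runtime as `InsertCannotOrderByDescendingFault`, and into SQL planning, where it now throws a `ValidationException`. Below is a minimal sketch of that planning-time rule, condensed from the `validateClusteredByColumns` helper added in this commit; the class and method names in the sketch are illustrative, not the production ones.

```java
// Sketch of the planning-time rejection of descending CLUSTERED BY columns.
// Condensed from DruidSqlParserUtils.validateClusteredByColumns; not verbatim production code.
import com.google.common.collect.ImmutableSet;
import org.apache.calcite.sql.SqlKind;
import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.sql.SqlNodeList;
import org.apache.calcite.tools.ValidationException;

final class ClusteredByValidationSketch
{
  static void rejectDescendingColumns(final SqlNodeList clusteredByNodes) throws ValidationException
  {
    if (clusteredByNodes == null) {
      return; // No CLUSTERED BY clause, nothing to validate.
    }
    for (final SqlNode node : clusteredByNodes.getList()) {
      // A column written as "col DESC" parses to a node of kind DESCENDING.
      if (node.isA(ImmutableSet.of(SqlKind.DESCENDING))) {
        // Planning fails here, rather than the controller throwing the now-removed
        // InsertCannotOrderByDescendingFault at runtime.
        throw new ValidationException(
            "[" + node + "] is invalid. CLUSTERED BY columns cannot be sorted in descending order."
        );
      }
    }
  }
}
```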

View File

@ -425,7 +425,6 @@ The following table describes error codes you may encounter in the `multiStageQu
| <a name="error_ColumnTypeNotSupported">`ColumnTypeNotSupported`</a> | The column type is not supported. This can be because:<br /> <br /><ul><li>Support for writing or reading from a particular column type is not supported.</li><li>The query attempted to use a column type that is not supported by the frame format. This occurs with ARRAY types, which are not yet implemented for frames.</li></ul> | `columnName`: The column name with an unsupported type.<br /> <br />`columnType`: The unknown column type. | | <a name="error_ColumnTypeNotSupported">`ColumnTypeNotSupported`</a> | The column type is not supported. This can be because:<br /> <br /><ul><li>Support for writing or reading from a particular column type is not supported.</li><li>The query attempted to use a column type that is not supported by the frame format. This occurs with ARRAY types, which are not yet implemented for frames.</li></ul> | `columnName`: The column name with an unsupported type.<br /> <br />`columnType`: The unknown column type. |
| <a name="error_InsertCannotAllocateSegment">`InsertCannotAllocateSegment`</a> | The controller task could not allocate a new segment ID due to conflict with existing segments or pending segments. Common reasons for such conflicts:<br /> <br /><ul><li>Attempting to mix different granularities in the same intervals of the same datasource.</li><li>Prior ingestions that used non-extendable shard specs.</li></ul>| `dataSource`<br /> <br />`interval`: The interval for the attempted new segment allocation. | | <a name="error_InsertCannotAllocateSegment">`InsertCannotAllocateSegment`</a> | The controller task could not allocate a new segment ID due to conflict with existing segments or pending segments. Common reasons for such conflicts:<br /> <br /><ul><li>Attempting to mix different granularities in the same intervals of the same datasource.</li><li>Prior ingestions that used non-extendable shard specs.</li></ul>| `dataSource`<br /> <br />`interval`: The interval for the attempted new segment allocation. |
| <a name="error_InsertCannotBeEmpty">`InsertCannotBeEmpty`</a> | An INSERT or REPLACE query did not generate any output rows in a situation where output rows are required for success. This can happen for INSERT or REPLACE queries with `PARTITIONED BY` set to something other than `ALL` or `ALL TIME`. | `dataSource` | | <a name="error_InsertCannotBeEmpty">`InsertCannotBeEmpty`</a> | An INSERT or REPLACE query did not generate any output rows in a situation where output rows are required for success. This can happen for INSERT or REPLACE queries with `PARTITIONED BY` set to something other than `ALL` or `ALL TIME`. | `dataSource` |
| <a name="error_InsertCannotOrderByDescending">`InsertCannotOrderByDescending`</a> | An INSERT query contained a `CLUSTERED BY` expression in descending order. Druid's segment generation code only supports ascending order. | `columnName` |
| <a name="error_InsertLockPreempted">`InsertLockPreempted`</a> | An INSERT or REPLACE query was canceled by a higher-priority ingestion job, such as a real-time ingestion task. | | | <a name="error_InsertLockPreempted">`InsertLockPreempted`</a> | An INSERT or REPLACE query was canceled by a higher-priority ingestion job, such as a real-time ingestion task. | |
| <a name="error_InsertTimeNull">`InsertTimeNull`</a> | An INSERT or REPLACE query encountered a null timestamp in the `__time` field.<br /><br />This can happen due to using an expression like `TIME_PARSE(timestamp) AS __time` with a timestamp that cannot be parsed. ([`TIME_PARSE`](../querying/sql-scalar.md#date-and-time-functions) returns null when it cannot parse a timestamp.) In this case, try parsing your timestamps using a different function or pattern. Or, if your timestamps may genuinely be null, consider using [`COALESCE`](../querying/sql-scalar.md#other-scalar-functions) to provide a default value. One option is [`CURRENT_TIMESTAMP`](../querying/sql-scalar.md#date-and-time-functions), which represents the start time of the job.| | <a name="error_InsertTimeNull">`InsertTimeNull`</a> | An INSERT or REPLACE query encountered a null timestamp in the `__time` field.<br /><br />This can happen due to using an expression like `TIME_PARSE(timestamp) AS __time` with a timestamp that cannot be parsed. ([`TIME_PARSE`](../querying/sql-scalar.md#date-and-time-functions) returns null when it cannot parse a timestamp.) In this case, try parsing your timestamps using a different function or pattern. Or, if your timestamps may genuinely be null, consider using [`COALESCE`](../querying/sql-scalar.md#other-scalar-functions) to provide a default value. One option is [`CURRENT_TIMESTAMP`](../querying/sql-scalar.md#date-and-time-functions), which represents the start time of the job.|
| <a name="error_InsertTimeOutOfBounds">`InsertTimeOutOfBounds`</a> | A REPLACE query generated a timestamp outside the bounds of the TIMESTAMP parameter for your OVERWRITE WHERE clause.<br /> <br />To avoid this error, verify that the you specified is valid. | `interval`: time chunk interval corresponding to the out-of-bounds timestamp | | <a name="error_InsertTimeOutOfBounds">`InsertTimeOutOfBounds`</a> | A REPLACE query generated a timestamp outside the bounds of the TIMESTAMP parameter for your OVERWRITE WHERE clause.<br /> <br />To avoid this error, verify that the you specified is valid. | `interval`: time chunk interval corresponding to the out-of-bounds timestamp |
@ -449,3 +448,4 @@ The following table describes error codes you may encounter in the `multiStageQu
| <a name="error_WorkerFailed">`WorkerFailed`</a> | A worker task failed unexpectedly. | `errorMsg`<br /><br />`workerTaskId`: The ID of the worker task. | | <a name="error_WorkerFailed">`WorkerFailed`</a> | A worker task failed unexpectedly. | `errorMsg`<br /><br />`workerTaskId`: The ID of the worker task. |
| <a name="error_WorkerRpcFailed">`WorkerRpcFailed`</a> | A remote procedure call to a worker task failed and could not recover. | `workerTaskId`: the id of the worker task | | <a name="error_WorkerRpcFailed">`WorkerRpcFailed`</a> | A remote procedure call to a worker task failed and could not recover. | `workerTaskId`: the id of the worker task |
| <a name="error_UnknownError">`UnknownError`</a> | All other errors. | `message` | | <a name="error_UnknownError">`UnknownError`</a> | All other errors. | `message` |
| <a name="error_InsertCannotOrderByDescending">`InsertCannotOrderByDescending`</a> | Deprecated. An INSERT query contained a `CLUSTERED BY` expression in descending order. Druid's segment generation code only supports ascending order. The query returns a `ValidationException` instead of the fault. | `columnName` |

View File

@ -101,7 +101,6 @@ import org.apache.druid.msq.indexing.error.CannotParseExternalDataFault;
import org.apache.druid.msq.indexing.error.FaultsExceededChecker;
import org.apache.druid.msq.indexing.error.InsertCannotAllocateSegmentFault;
import org.apache.druid.msq.indexing.error.InsertCannotBeEmptyFault;
import org.apache.druid.msq.indexing.error.InsertCannotOrderByDescendingFault;
import org.apache.druid.msq.indexing.error.InsertLockPreemptedFault;
import org.apache.druid.msq.indexing.error.InsertTimeOutOfBoundsFault;
import org.apache.druid.msq.indexing.error.MSQErrorReport;
@ -1854,10 +1853,6 @@ public class ControllerImpl implements Controller
// Such fields in CLUSTERED BY still control partitioning as expected, but do not affect sort order of rows
// within an individual segment.
for (final KeyColumn clusterByColumn : queryClusterBy.getColumns()) {
if (clusterByColumn.order() == KeyOrder.DESCENDING) {
throw new MSQException(new InsertCannotOrderByDescendingFault(clusterByColumn.columnName()));
}
final IntList outputColumns = columnMappings.getOutputColumnsForQueryColumn(clusterByColumn.columnName());
for (final int outputColumn : outputColumns) {
outputColumnsInOrder.add(columnMappings.getOutputColumnName(outputColumn));

View File

@ -45,7 +45,6 @@ import org.apache.druid.msq.indexing.error.ColumnTypeNotSupportedFault;
import org.apache.druid.msq.indexing.error.DurableStorageConfigurationFault;
import org.apache.druid.msq.indexing.error.InsertCannotAllocateSegmentFault;
import org.apache.druid.msq.indexing.error.InsertCannotBeEmptyFault;
import org.apache.druid.msq.indexing.error.InsertCannotOrderByDescendingFault;
import org.apache.druid.msq.indexing.error.InsertLockPreemptedFault;
import org.apache.druid.msq.indexing.error.InsertTimeNullFault;
import org.apache.druid.msq.indexing.error.InsertTimeOutOfBoundsFault;
@ -113,7 +112,6 @@ public class MSQIndexingModule implements DruidModule
DurableStorageConfigurationFault.class,
InsertCannotAllocateSegmentFault.class,
InsertCannotBeEmptyFault.class,
InsertCannotOrderByDescendingFault.class,
InsertLockPreemptedFault.class,
InsertTimeNullFault.class,
InsertTimeOutOfBoundsFault.class,

View File

@ -1,72 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.msq.indexing.error;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeName;
import com.google.common.base.Preconditions;
import java.util.Objects;
@JsonTypeName(InsertCannotOrderByDescendingFault.CODE)
public class InsertCannotOrderByDescendingFault extends BaseMSQFault
{
static final String CODE = "InsertCannotOrderByDescending";
private final String columnName;
@JsonCreator
public InsertCannotOrderByDescendingFault(
@JsonProperty("columnName") final String columnName
)
{
super(CODE, "Cannot ingest column [%s] in descending order", columnName);
this.columnName = Preconditions.checkNotNull(columnName, "columnName");
}
@JsonProperty
public String getColumnName()
{
return columnName;
}
@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
if (!super.equals(o)) {
return false;
}
InsertCannotOrderByDescendingFault that = (InsertCannotOrderByDescendingFault) o;
return Objects.equals(columnName, that.columnName);
}
@Override
public int hashCode()
{
return Objects.hash(super.hashCode(), columnName);
}
}

View File

@ -25,7 +25,6 @@ import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.msq.indexing.error.InsertCannotAllocateSegmentFault;
import org.apache.druid.msq.indexing.error.InsertCannotBeEmptyFault;
import org.apache.druid.msq.indexing.error.InsertCannotOrderByDescendingFault;
import org.apache.druid.msq.indexing.error.InsertTimeNullFault;
import org.apache.druid.msq.indexing.error.InsertTimeOutOfBoundsFault;
import org.apache.druid.msq.indexing.error.TooManyClusteredByColumnsFault;
@ -92,23 +91,6 @@ public class MSQFaultsTest extends MSQTestBase
.verifyResults();
}
@Test
public void testInsertCannotOrderByDescendingFault()
{
RowSignature rowSignature = RowSignature.builder()
.add("__time", ColumnType.LONG)
.add("dim1", ColumnType.STRING)
.add("cnt", ColumnType.LONG).build();
// Add an DESC clustered by column, which should not be allowed
testIngestQuery().setSql(
"insert into foo1 select __time, dim1 , count(*) as cnt from foo where dim1 is not null and __time < TIMESTAMP '2000-01-02 00:00:00' group by 1, 2 PARTITIONED by day clustered by dim1 DESC")
.setExpectedDataSource("foo1")
.setExpectedRowSignature(rowSignature)
.setExpectedMSQFault(new InsertCannotOrderByDescendingFault("d1"))
.verifyResults();
}
@Test
public void testInsertTimeOutOfBoundsFault()
{

View File

@ -736,6 +736,25 @@ public class MSQInsertTest extends MSQTestBase
}
@Test
public void testInsertWithClusteredByDescendingThrowsException()
{
// Add a DESC clustered by column, which should not be allowed
testIngestQuery().setSql("INSERT INTO foo1 "
+ "SELECT __time, dim1 , count(*) as cnt "
+ "FROM foo "
+ "GROUP BY 1, 2"
+ "PARTITIONED BY DAY "
+ "CLUSTERED BY dim1 DESC"
)
.setExpectedValidationErrorMatcher(CoreMatchers.allOf(
CoreMatchers.instanceOf(SqlPlanningException.class),
ThrowableMessageMatcher.hasMessage(CoreMatchers.startsWith(
"[`dim1` DESC] is invalid. CLUSTERED BY columns cannot be sorted in descending order."))
))
.verifyPlanningErrors();
}
@Test
public void testRollUpOnFoo1WithTimeFunctionComplexCol()
{

View File

@ -678,7 +678,7 @@ public class MSQReplaceTest extends MSQTestBase
}
@Test
public void testInsertOnFoo1Range()
public void testReplaceOnFoo1Range()
{
RowSignature rowSignature = RowSignature.builder()
.add("__time", ColumnType.LONG)
@ -731,6 +731,25 @@ public class MSQReplaceTest extends MSQTestBase
.verifyResults();
}
@Test
public void testReplaceWithClusteredByDescendingThrowsException()
{
// Add a DESC clustered by column, which should not be allowed
testIngestQuery().setSql(" REPLACE INTO foobar "
+ "OVERWRITE ALL "
+ "SELECT __time, m1, m2 "
+ "FROM foo "
+ "PARTITIONED BY ALL TIME "
+ "CLUSTERED BY m2, m1 DESC"
)
.setExpectedValidationErrorMatcher(CoreMatchers.allOf(
CoreMatchers.instanceOf(SqlPlanningException.class),
ThrowableMessageMatcher.hasMessage(CoreMatchers.startsWith(
"[`m1` DESC] is invalid. CLUSTERED BY columns cannot be sorted in descending order."))
))
.verifyPlanningErrors();
}
@Test
public void testReplaceTombstonesOverPartiallyOverlappingSegments()
{

View File

@ -56,7 +56,6 @@ public class MSQFaultSerdeTest
assertFaultSerde(new ColumnNameRestrictedFault("the column"));
assertFaultSerde(new InsertCannotAllocateSegmentFault("the datasource", Intervals.ETERNITY));
assertFaultSerde(new InsertCannotBeEmptyFault("the datasource"));
assertFaultSerde(new InsertCannotOrderByDescendingFault("the column"));
assertFaultSerde(InsertLockPreemptedFault.INSTANCE);
assertFaultSerde(InsertTimeNullFault.INSTANCE);
assertFaultSerde(new InsertTimeOutOfBoundsFault(Intervals.ETERNITY));

View File

@ -21,6 +21,7 @@ package org.apache.druid.sql.calcite.parser;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import org.apache.calcite.sql.SqlBasicCall;
import org.apache.calcite.sql.SqlCall;
import org.apache.calcite.sql.SqlIdentifier;
@ -256,9 +257,12 @@ public class DruidSqlParserUtils
* @param query sql query
* @param clusteredByList List of clustered by columns
* @return SqlOrderBy node containing the clusteredByList information
* @throws ValidationException if any of the clustered by columns contain DESCENDING order.
*/
public static SqlOrderBy convertClusterByToOrderBy(SqlNode query, SqlNodeList clusteredByList)
throws ValidationException
{
validateClusteredByColumns(clusteredByList);
// If we have a CLUSTERED BY clause, extract the information in that CLUSTERED BY and create a new
// SqlOrderBy node
SqlNode offset = null;
@ -266,9 +270,9 @@ public class DruidSqlParserUtils
if (query instanceof SqlOrderBy) {
SqlOrderBy sqlOrderBy = (SqlOrderBy) query;
// This represents the underlying query free of OFFSET, FETCH and ORDER BY clauses
// For a sqlOrderBy.query like "SELECT dim1, sum(dim2) FROM foo OFFSET 10 FETCH 30 ORDER BY dim1 GROUP
// BY dim1 this would represent the "SELECT dim1, sum(dim2) from foo GROUP BY dim1
// query represents the underlying query free of OFFSET, FETCH and ORDER BY clauses
// For a sqlOrderBy.query like "SELECT dim1, sum(dim2) FROM foo GROUP BY dim1 ORDER BY dim1 FETCH 30 OFFSET 10",
// this would represent the "SELECT dim1, sum(dim2) from foo GROUP BY dim1
query = sqlOrderBy.query;
offset = sqlOrderBy.offset;
fetch = sqlOrderBy.fetch;
@ -283,6 +287,29 @@ public class DruidSqlParserUtils
);
}
/**
* Validates the clustered by columns to ensure that it does not contain DESCENDING order columns.
*
* @param clusteredByNodes List of SqlNodes representing columns to be clustered by.
* @throws ValidationException if any of the clustered by columns contain DESCENDING order.
*/
public static void validateClusteredByColumns(final SqlNodeList clusteredByNodes) throws ValidationException
{
if (clusteredByNodes == null) {
return;
}
for (final SqlNode clusteredByNode : clusteredByNodes.getList()) {
if (clusteredByNode.isA(ImmutableSet.of(SqlKind.DESCENDING))) {
throw new ValidationException(
StringUtils.format("[%s] is invalid."
+ " CLUSTERED BY columns cannot be sorted in descending order.", clusteredByNode.toString()
)
);
}
}
}
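A quick way to exercise the new helper outside the SQL layer is to hand it a SqlNodeList in which one entry is wrapped in a DESC postfix call, mirroring the unit test added later in this commit; this is a sketch only, and the column names are made up.

```java
// Sketch: one plain identifier (allowed) and one DESC-wrapped identifier (rejected).
final SqlNodeList clusteredBy = new SqlNodeList(SqlParserPos.ZERO);
clusteredBy.add(new SqlIdentifier("page", SqlParserPos.ZERO));
clusteredBy.add(new SqlBasicCall(
    new SqlPostfixOperator("DESC", SqlKind.DESCENDING, 2, null, null, null),
    new SqlNode[]{new SqlIdentifier("channel", SqlParserPos.ZERO)},
    SqlParserPos.ZERO
));

try {
  DruidSqlParserUtils.validateClusteredByColumns(clusteredBy);
}
catch (ValidationException e) {
  // Expected: [`channel` DESC] is invalid. CLUSTERED BY columns cannot be sorted in descending order.
}
```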
/**
* This method is used to convert an {@link SqlNode} representing a query into a {@link DimFilter} for the same query.
* It takes the timezone as a separate parameter, as Sql timestamps don't contain that information. Supported functions

View File

@ -653,11 +653,11 @@ public class CalciteInsertDmlTest extends CalciteIngestionDmlTest
skipVectorize();
final String resources = "[{\"name\":\"dst\",\"type\":\"DATASOURCE\"},{\"name\":\"foo\",\"type\":\"DATASOURCE\"}]";
final String attributes = "{\"statementType\":\"INSERT\",\"targetDataSource\":\"druid.dst\",\"partitionedBy\":\"DAY\",\"clusteredBy\":\"2, `dim1` DESC, CEIL(`m2`)\"}";
final String attributes = "{\"statementType\":\"INSERT\",\"targetDataSource\":\"druid.dst\",\"partitionedBy\":\"DAY\",\"clusteredBy\":\"2, `dim1`, CEIL(`m2`)\"}";
final String sql = "EXPLAIN PLAN FOR INSERT INTO druid.dst "
+ "SELECT __time, FLOOR(m1) as floor_m1, dim1, CEIL(m2) as ceil_m2 FROM foo "
+ "PARTITIONED BY FLOOR(__time TO DAY) CLUSTERED BY 2, dim1 DESC, CEIL(m2)";
+ "PARTITIONED BY FLOOR(__time TO DAY) CLUSTERED BY 2, dim1, CEIL(m2)";
ObjectMapper queryJsonMapper = queryFramework().queryJsonMapper();
final ScanQuery expectedQuery = newScanQueryBuilder()
@ -671,7 +671,7 @@ public class CalciteInsertDmlTest extends CalciteIngestionDmlTest
.orderBy(
ImmutableList.of(
new ScanQuery.OrderBy("v0", ScanQuery.Order.ASCENDING),
new ScanQuery.OrderBy("dim1", ScanQuery.Order.DESCENDING),
new ScanQuery.OrderBy("dim1", ScanQuery.Order.ASCENDING),
new ScanQuery.OrderBy("v1", ScanQuery.Order.ASCENDING)
)
)
@ -718,7 +718,7 @@ public class CalciteInsertDmlTest extends CalciteIngestionDmlTest
+ "\"virtualColumns\":[{\"type\":\"expression\",\"name\":\"v0\",\"expression\":\"floor(\\\"m1\\\")\",\"outputType\":\"FLOAT\"}," + "\"virtualColumns\":[{\"type\":\"expression\",\"name\":\"v0\",\"expression\":\"floor(\\\"m1\\\")\",\"outputType\":\"FLOAT\"},"
+ "{\"type\":\"expression\",\"name\":\"v1\",\"expression\":\"ceil(\\\"m2\\\")\",\"outputType\":\"DOUBLE\"}]," + "{\"type\":\"expression\",\"name\":\"v1\",\"expression\":\"ceil(\\\"m2\\\")\",\"outputType\":\"DOUBLE\"}],"
+ "\"resultFormat\":\"compactedList\"," + "\"resultFormat\":\"compactedList\","
+ "\"orderBy\":[{\"columnName\":\"v0\",\"order\":\"ascending\"},{\"columnName\":\"dim1\",\"order\":\"descending\"}," + "\"orderBy\":[{\"columnName\":\"v0\",\"order\":\"ascending\"},{\"columnName\":\"dim1\",\"order\":\"ascending\"},"
+ "{\"columnName\":\"v1\",\"order\":\"ascending\"}],\"columns\":[\"__time\",\"dim1\",\"v0\",\"v1\"],\"legacy\":false," + "{\"columnName\":\"v1\",\"order\":\"ascending\"}],\"columns\":[\"__time\",\"dim1\",\"v0\",\"v1\"],\"legacy\":false,"
+ "\"context\":{\"sqlInsertSegmentGranularity\":\"\\\"DAY\\\"\",\"sqlQueryId\":\"dummy\",\"vectorize\":\"false\",\"vectorizeVirtualColumns\":\"false\"},\"granularity\":{\"type\":\"all\"}}," + "\"context\":{\"sqlInsertSegmentGranularity\":\"\\\"DAY\\\"\",\"sqlQueryId\":\"dummy\",\"vectorize\":\"false\",\"vectorizeVirtualColumns\":\"false\"},\"granularity\":{\"type\":\"all\"}},"
+ "\"signature\":[{\"name\":\"__time\",\"type\":\"LONG\"},{\"name\":\"v0\",\"type\":\"FLOAT\"},{\"name\":\"dim1\",\"type\":\"STRING\"}," + "\"signature\":[{\"name\":\"__time\",\"type\":\"LONG\"},{\"name\":\"v0\",\"type\":\"FLOAT\"},{\"name\":\"dim1\",\"type\":\"STRING\"},"
@ -751,6 +751,24 @@ public class CalciteInsertDmlTest extends CalciteIngestionDmlTest
didTest = true;
}
@Test
public void testExplainPlanInsertWithClusteredByDescThrowsException()
{
skipVectorize();
final String sql = "EXPLAIN PLAN FOR INSERT INTO druid.dst "
+ "SELECT __time, FLOOR(m1) as floor_m1, dim1, CEIL(m2) as ceil_m2 FROM foo "
+ "PARTITIONED BY FLOOR(__time TO DAY) CLUSTERED BY 2, dim1 DESC, CEIL(m2)";
testIngestionQuery()
.sql(sql)
.expectValidationError(
SqlPlanningException.class,
"[`dim1` DESC] is invalid. CLUSTERED BY columns cannot be sorted in descending order."
)
.verify();
}
@Test
public void testInsertWithClusteredBy()
{
@ -765,7 +783,7 @@ public class CalciteInsertDmlTest extends CalciteIngestionDmlTest
.sql(
"INSERT INTO druid.dst "
+ "SELECT __time, FLOOR(m1) as floor_m1, dim1, CEIL(m2) as ceil_m2 FROM foo "
+ "PARTITIONED BY FLOOR(__time TO DAY) CLUSTERED BY 2, dim1 DESC, CEIL(m2)"
+ "PARTITIONED BY FLOOR(__time TO DAY) CLUSTERED BY 2, dim1, CEIL(m2)"
)
.expectTarget("dst", targetRowSignature)
.expectResources(dataSourceRead("foo"), dataSourceWrite("dst"))
@ -781,7 +799,7 @@ public class CalciteInsertDmlTest extends CalciteIngestionDmlTest
.orderBy(
ImmutableList.of(
new ScanQuery.OrderBy("v0", ScanQuery.Order.ASCENDING),
new ScanQuery.OrderBy("dim1", ScanQuery.Order.DESCENDING),
new ScanQuery.OrderBy("dim1", ScanQuery.Order.ASCENDING),
new ScanQuery.OrderBy("v1", ScanQuery.Order.ASCENDING)
)
)
@ -1052,7 +1070,7 @@ public class CalciteInsertDmlTest extends CalciteIngestionDmlTest
final String query = "EXPLAIN PLAN FOR INSERT INTO druid.dst "
+ "SELECT __time, FLOOR(m1) as floor_m1, dim1, CEIL(m2) as ceil_m2 FROM foo "
+ "PARTITIONED BY FLOOR(__time TO DAY) CLUSTERED BY 2, dim1 DESC, CEIL(m2)";
+ "PARTITIONED BY FLOOR(__time TO DAY) CLUSTERED BY 2, dim1 ASC, CEIL(m2)";
ObjectMapper queryJsonMapper = queryFramework().queryJsonMapper();
final ScanQuery expectedQuery = newScanQueryBuilder()
@ -1066,7 +1084,7 @@ public class CalciteInsertDmlTest extends CalciteIngestionDmlTest
.orderBy(
ImmutableList.of(
new ScanQuery.OrderBy("v0", ScanQuery.Order.ASCENDING),
new ScanQuery.OrderBy("dim1", ScanQuery.Order.DESCENDING),
new ScanQuery.OrderBy("dim1", ScanQuery.Order.ASCENDING),
new ScanQuery.OrderBy("v1", ScanQuery.Order.ASCENDING)
)
)
@ -1091,7 +1109,7 @@ public class CalciteInsertDmlTest extends CalciteIngestionDmlTest
+ "\"virtualColumns\":[{\"type\":\"expression\",\"name\":\"v0\",\"expression\":\"floor(\\\"m1\\\")\",\"outputType\":\"FLOAT\"}," + "\"virtualColumns\":[{\"type\":\"expression\",\"name\":\"v0\",\"expression\":\"floor(\\\"m1\\\")\",\"outputType\":\"FLOAT\"},"
+ "{\"type\":\"expression\",\"name\":\"v1\",\"expression\":\"ceil(\\\"m2\\\")\",\"outputType\":\"DOUBLE\"}]," + "{\"type\":\"expression\",\"name\":\"v1\",\"expression\":\"ceil(\\\"m2\\\")\",\"outputType\":\"DOUBLE\"}],"
+ "\"resultFormat\":\"compactedList\"," + "\"resultFormat\":\"compactedList\","
+ "\"orderBy\":[{\"columnName\":\"v0\",\"order\":\"ascending\"},{\"columnName\":\"dim1\",\"order\":\"descending\"}," + "\"orderBy\":[{\"columnName\":\"v0\",\"order\":\"ascending\"},{\"columnName\":\"dim1\",\"order\":\"ascending\"},"
+ "{\"columnName\":\"v1\",\"order\":\"ascending\"}],\"columns\":[\"__time\",\"dim1\",\"v0\",\"v1\"],\"legacy\":false," + "{\"columnName\":\"v1\",\"order\":\"ascending\"}],\"columns\":[\"__time\",\"dim1\",\"v0\",\"v1\"],\"legacy\":false,"
+ "\"context\":{\"sqlInsertSegmentGranularity\":\"\\\"DAY\\\"\",\"sqlQueryId\":\"dummy\",\"vectorize\":\"false\",\"vectorizeVirtualColumns\":\"false\"},\"granularity\":{\"type\":\"all\"}}," + "\"context\":{\"sqlInsertSegmentGranularity\":\"\\\"DAY\\\"\",\"sqlQueryId\":\"dummy\",\"vectorize\":\"false\",\"vectorizeVirtualColumns\":\"false\"},\"granularity\":{\"type\":\"all\"}},"
+ "\"signature\":[{\"name\":\"__time\",\"type\":\"LONG\"},{\"name\":\"v0\",\"type\":\"FLOAT\"},{\"name\":\"dim1\",\"type\":\"STRING\"}," + "\"signature\":[{\"name\":\"__time\",\"type\":\"LONG\"},{\"name\":\"v0\",\"type\":\"FLOAT\"},{\"name\":\"dim1\",\"type\":\"STRING\"},"
@ -1101,7 +1119,7 @@ public class CalciteInsertDmlTest extends CalciteIngestionDmlTest
+ "}]"; + "}]";
final String resources = "[{\"name\":\"dst\",\"type\":\"DATASOURCE\"},{\"name\":\"foo\",\"type\":\"DATASOURCE\"}]"; final String resources = "[{\"name\":\"dst\",\"type\":\"DATASOURCE\"},{\"name\":\"foo\",\"type\":\"DATASOURCE\"}]";
final String attributes = "{\"statementType\":\"INSERT\",\"targetDataSource\":\"druid.dst\",\"partitionedBy\":\"DAY\",\"clusteredBy\":\"2, `dim1` DESC, CEIL(`m2`)\"}"; final String attributes = "{\"statementType\":\"INSERT\",\"targetDataSource\":\"druid.dst\",\"partitionedBy\":\"DAY\",\"clusteredBy\":\"2, `dim1`, CEIL(`m2`)\"}";
// Use testQuery for EXPLAIN (not testIngestionQuery). // Use testQuery for EXPLAIN (not testIngestionQuery).
testQuery( testQuery(

View File

@ -765,6 +765,26 @@ public class CalciteReplaceDmlTest extends CalciteIngestionDmlTest
didTest = true;
}
@Test
public void testExplainPlanReplaceWithClusteredByDescThrowsException()
{
skipVectorize();
final String sql = "EXPLAIN PLAN FOR"
+ " REPLACE INTO dst"
+ " OVERWRITE WHERE __time >= TIMESTAMP '2000-01-01 00:00:00' AND __time < TIMESTAMP '2000-01-02 00:00:00' "
+ "SELECT * FROM foo PARTITIONED BY DAY CLUSTERED BY dim1 DESC";
testIngestionQuery()
.sql(sql)
.expectValidationError(
SqlPlanningException.class,
"[`dim1` DESC] is invalid. CLUSTERED BY columns cannot be sorted in descending order."
)
.verify();
}
@Test
public void testExplainReplaceFromExternalUnauthorized()
{

View File

@ -21,13 +21,17 @@ package org.apache.druid.sql.calcite.parser;
import com.google.common.collect.ImmutableList;
import org.apache.calcite.avatica.util.TimeUnit;
import org.apache.calcite.sql.SqlBasicCall;
import org.apache.calcite.sql.SqlIdentifier;
import org.apache.calcite.sql.SqlIntervalQualifier;
import org.apache.calcite.sql.SqlKind;
import org.apache.calcite.sql.SqlLiteral;
import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.sql.SqlNodeList;
import org.apache.calcite.sql.SqlPostfixOperator;
import org.apache.calcite.sql.fun.SqlStdOperatorTable;
import org.apache.calcite.sql.parser.SqlParserPos;
import org.apache.calcite.tools.ValidationException;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.sql.calcite.expression.builtin.TimeFloorOperatorConversion;
@ -119,7 +123,74 @@ public class DruidSqlParserUtilsTest
}
}
public static class FloorToGranularityConversionTestErrors
public static class ClusteredByColumnsValidationTest
{
/**
* Tests an empty CLUSTERED BY clause
*/
@Test
public void testEmptyClusteredByColumnsValid()
{
final SqlNodeList clusteredByArgs = new SqlNodeList(SqlParserPos.ZERO);
try {
DruidSqlParserUtils.validateClusteredByColumns(clusteredByArgs);
}
catch (ValidationException e) {
Assert.fail("Did not expect an exception" + e.getMessage());
}
}
/**
* Tests clause "CLUSTERED BY DIM1, DIM2 ASC, 3"
*/
@Test
public void testClusteredByColumnsValid()
{
final SqlNodeList clusteredByArgs = new SqlNodeList(SqlParserPos.ZERO);
clusteredByArgs.add(new SqlIdentifier("DIM1", SqlParserPos.ZERO));
clusteredByArgs.add(new SqlIdentifier("DIM2 ASC", SqlParserPos.ZERO));
clusteredByArgs.add(SqlLiteral.createExactNumeric("3", SqlParserPos.ZERO));
try {
DruidSqlParserUtils.validateClusteredByColumns(clusteredByArgs);
}
catch (ValidationException e) {
Assert.fail("Did not expect an exception" + e.getMessage());
}
}
/**
* Tests clause "CLUSTERED BY DIM1, DIM2 ASC, 3, DIM4 DESC"
*/
@Test
public void testClusteredByColumnsWithDescThrowsException()
{
final SqlNodeList clusteredByArgs = new SqlNodeList(SqlParserPos.ZERO);
clusteredByArgs.add(new SqlIdentifier("DIM1", SqlParserPos.ZERO));
clusteredByArgs.add(new SqlIdentifier("DIM2 ASC", SqlParserPos.ZERO));
clusteredByArgs.add(SqlLiteral.createExactNumeric("3", SqlParserPos.ZERO));
final SqlBasicCall sqlBasicCall = new SqlBasicCall(
new SqlPostfixOperator("DESC", SqlKind.DESCENDING, 2, null, null, null),
new SqlNode[]{
new SqlIdentifier("DIM4", SqlParserPos.ZERO)
},
new SqlParserPos(0, 3)
);
clusteredByArgs.add(sqlBasicCall);
ValidationException e = Assert.assertThrows(
ValidationException.class,
() -> DruidSqlParserUtils.validateClusteredByColumns(clusteredByArgs)
);
Assert.assertEquals(
"[`DIM4` DESC] is invalid. CLUSTERED BY columns cannot be sorted in descending order.",
e.getMessage()
);
}
}
public static class FloorToGranularityConversionErrorsTest
{
/**
* Tests clause like "PARTITIONED BY 'day'"

View File

@ -1,4 +1,4 @@
LogicalInsert(target=[druid.dst], partitionedBy=[{type=period, period=P1D, timeZone=UTC, origin=null}], clusteredBy=[2, `dim1` DESC, CEIL(`m2`)])
LogicalInsert(target=[druid.dst], partitionedBy=[{type=period, period=P1D, timeZone=UTC, origin=null}], clusteredBy=[2, `dim1`, CEIL(`m2`)])
LogicalSort(sort0=[$1], sort1=[$2], sort2=[$3], dir0=[ASC], dir1=[DESC], dir2=[ASC])
LogicalSort(sort0=[$1], sort1=[$2], sort2=[$3], dir0=[ASC], dir1=[ASC], dir2=[ASC])
LogicalProject(__time=[$0], floor_m1=[FLOOR($5)], dim1=[$1], ceil_m2=[CEIL($6)])
LogicalTableScan(table=[[druid, foo]])