diff --git a/extensions-core/druid-catalog/src/test/java/org/apache/druid/server/http/catalog/EditorTest.java b/extensions-core/druid-catalog/src/test/java/org/apache/druid/server/http/catalog/EditorTest.java
index 8686414cc0b..2556ce74cf0 100644
--- a/extensions-core/druid-catalog/src/test/java/org/apache/druid/server/http/catalog/EditorTest.java
+++ b/extensions-core/druid-catalog/src/test/java/org/apache/druid/server/http/catalog/EditorTest.java
@@ -19,6 +19,8 @@
 
 package org.apache.druid.server.http.catalog;
 
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.ImmutableList;
 import org.apache.curator.shaded.com.google.common.collect.ImmutableMap;
 import org.apache.druid.catalog.CatalogException;
 import org.apache.druid.catalog.http.TableEditRequest;
@@ -34,10 +36,12 @@ import org.apache.druid.catalog.model.ColumnSpec;
 import org.apache.druid.catalog.model.Columns;
 import org.apache.druid.catalog.model.TableId;
 import org.apache.druid.catalog.model.TableMetadata;
+import org.apache.druid.catalog.model.table.ClusterKeySpec;
 import org.apache.druid.catalog.model.table.DatasourceDefn;
 import org.apache.druid.catalog.model.table.TableBuilder;
 import org.apache.druid.catalog.storage.CatalogStorage;
 import org.apache.druid.catalog.storage.CatalogTests;
+import org.apache.druid.jackson.DefaultObjectMapper;
 import org.apache.druid.metadata.TestDerbyConnector;
 import org.junit.After;
 import org.junit.Before;
@@ -56,6 +60,7 @@ import static org.junit.Assert.assertThrows;
 
 public class EditorTest
 {
+  private static final ObjectMapper MAPPER = new DefaultObjectMapper();
   @Rule
   public final TestDerbyConnector.DerbyConnectorRule derbyConnectorRule =
       new TestDerbyConnector.DerbyConnectorRule();
@@ -326,7 +331,6 @@ public class EditorTest
     // Can't test an empty property set: no table type allows empty
     // properties.
-
     // Remove a required property
     Map<String, Object> updates = new HashMap<>();
     updates.put(DatasourceDefn.SEGMENT_GRANULARITY_PROPERTY, null);
     cmd = new UpdateProperties(updates);
@@ -374,6 +378,37 @@ public class EditorTest
         expected,
         doEdit(tableName, cmd).spec().properties()
     );
+
+    // Add a DESC cluster key - should fail
+    Map<String, Object> updates1 = new HashMap<>();
+    updates1.put(DatasourceDefn.CLUSTER_KEYS_PROPERTY, ImmutableList.of(new ClusterKeySpec("clusterKeyA", true)));
+
+    assertThrows(
+        CatalogException.class,
+        () -> new TableEditor(
+            catalog,
+            table.id(),
+            new UpdateProperties(updates1)
+        ).go()
+    );
+
+    // Add an ASC cluster key - should succeed
+    updates = new HashMap<>();
+    updates.put(DatasourceDefn.CLUSTER_KEYS_PROPERTY, ImmutableList.of(new ClusterKeySpec("clusterKeyA", false)));
+    cmd = new UpdateProperties(updates);
+    expected = ImmutableMap.of(
+        DatasourceDefn.SEGMENT_GRANULARITY_PROPERTY, "PT1H",
+        DatasourceDefn.CLUSTER_KEYS_PROPERTY, ImmutableList.of(new ClusterKeySpec("clusterKeyA", false))
+    );
+    Map<String, Object> actual = doEdit(tableName, cmd).spec().properties();
+    actual.put(
+        DatasourceDefn.CLUSTER_KEYS_PROPERTY,
+        MAPPER.convertValue(actual.get(DatasourceDefn.CLUSTER_KEYS_PROPERTY), ClusterKeySpec.CLUSTER_KEY_LIST_TYPE_REF)
+    );
+    assertEquals(
+        expected,
+        actual
+    );
   }
 
   @Test
diff --git a/server/src/main/java/org/apache/druid/catalog/model/table/DatasourceDefn.java b/server/src/main/java/org/apache/druid/catalog/model/table/DatasourceDefn.java
index 46d21bf1279..b3dc953cf8e 100644
--- a/server/src/main/java/org/apache/druid/catalog/model/table/DatasourceDefn.java
+++ b/server/src/main/java/org/apache/druid/catalog/model/table/DatasourceDefn.java
@@ -110,6 +110,34 @@ public class DatasourceDefn extends TableDefn
     }
   }
 
+  public static class ClusterKeysDefn extends ModelProperties.ListPropertyDefn<ClusterKeySpec>
+  {
+    public ClusterKeysDefn()
+    {
+      super(
+          CLUSTER_KEYS_PROPERTY,
+          "ClusterKeySpec list",
+          new TypeReference<List<ClusterKeySpec>>() {}
+      );
+    }
+
+    @Override
+    public void validate(Object value, ObjectMapper jsonMapper)
+    {
+      if (value == null) {
+        return;
+      }
+      List<ClusterKeySpec> clusterKeys = decode(value, jsonMapper);
+      for (ClusterKeySpec clusterKey : clusterKeys) {
+        if (clusterKey.desc()) {
+          throw new IAE(
+              StringUtils.format("Cannot specify DESC clustering key [%s]. Only ASC is supported.", clusterKey)
+          );
+        }
+      }
+    }
+  }
+
   public DatasourceDefn()
   {
     super(
@@ -118,11 +146,7 @@ public class DatasourceDefn extends TableDefn
         Arrays.asList(
             new SegmentGranularityFieldDefn(),
             new ModelProperties.IntPropertyDefn(TARGET_SEGMENT_ROWS_PROPERTY),
-            new ModelProperties.ListPropertyDefn<ClusterKeySpec>(
-                CLUSTER_KEYS_PROPERTY,
-                "cluster keys",
-                new TypeReference<List<ClusterKeySpec>>() { }
-            ),
+            new ClusterKeysDefn(),
             new HiddenColumnsDefn(),
             new ModelProperties.BooleanPropertyDefn(SEALED_PROPERTY)
         ),
diff --git a/server/src/test/java/org/apache/druid/catalog/model/table/DatasourceTableTest.java b/server/src/test/java/org/apache/druid/catalog/model/table/DatasourceTableTest.java
index 1bccf580eb3..b93db97acc6 100644
--- a/server/src/test/java/org/apache/druid/catalog/model/table/DatasourceTableTest.java
+++ b/server/src/test/java/org/apache/druid/catalog/model/table/DatasourceTableTest.java
@@ -20,6 +20,7 @@
 package org.apache.druid.catalog.model.table;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import nl.jqno.equalsverifier.EqualsVerifier;
 import org.apache.druid.catalog.CatalogTest;
@@ -116,6 +117,28 @@ public class DatasourceTableTest
     }
   }
 
+  @Test
+  public void testSpecWithClusterKeyProp()
+  {
+    {
+      TableSpec spec = new TableSpec(
+          DatasourceDefn.TABLE_TYPE,
+          ImmutableMap.of(DatasourceDefn.CLUSTER_KEYS_PROPERTY, ImmutableList.of(new ClusterKeySpec("clusterKeyA", true))),
+          null
+      );
+      expectValidationFails(spec);
+    }
+
+    {
+      TableSpec spec = new TableSpec(
+          DatasourceDefn.TABLE_TYPE,
+          ImmutableMap.of(DatasourceDefn.CLUSTER_KEYS_PROPERTY, ImmutableList.of(new ClusterKeySpec("clusterKeyA", false))),
+          null
+      );
+      expectValidationSucceeds(spec);
+    }
+  }
+
   @Test
   public void testAllProperties()
   {
@@ -125,6 +148,7 @@ public class DatasourceTableTest
         .put(DatasourceDefn.TARGET_SEGMENT_ROWS_PROPERTY, 1_000_000)
         .put(DatasourceDefn.HIDDEN_COLUMNS_PROPERTY, Arrays.asList("foo", "bar"))
         .put(DatasourceDefn.SEALED_PROPERTY, true)
+        .put(DatasourceDefn.CLUSTER_KEYS_PROPERTY, ImmutableList.of(new ClusterKeySpec("clusterKeyA", false)))
         .build();
 
     TableSpec spec = new TableSpec(DatasourceDefn.TABLE_TYPE, props, null);
@@ -132,6 +156,7 @@ public class DatasourceTableTest
     assertEquals("P1D", facade.segmentGranularityString());
     assertEquals(1_000_000, (int) facade.targetSegmentRows());
     assertEquals(Arrays.asList("foo", "bar"), facade.hiddenColumns());
+    assertEquals(Collections.singletonList(new ClusterKeySpec("clusterKeyA", false)), facade.clusterKeys());
     assertTrue(facade.isSealed());
   }
 
diff --git a/sql/src/main/codegen/includes/insert.ftl b/sql/src/main/codegen/includes/insert.ftl
index 323757dd78d..0f053a4f655 100644
--- a/sql/src/main/codegen/includes/insert.ftl
+++ b/sql/src/main/codegen/includes/insert.ftl
@@ -93,11 +93,7 @@ SqlNode DruidSqlInsertEof() :
         clusteredBy = ClusteredBy()
     ]
     {
-        if (clusteredBy != null && partitionedBy == null) {
-            throw org.apache.druid.sql.calcite.parser.DruidSqlParserUtils.problemParsing(
-                "CLUSTERED BY found before PARTITIONED BY, CLUSTERED BY must come after the PARTITIONED BY clause"
-            );
-        }
+
     }
     // EOF is also present in SqlStmtEof but EOF is a special case and a single EOF can be consumed multiple times.
    // The reason for adding EOF here is to ensure that we create a DruidSqlInsert node after the syntax has been
diff --git a/sql/src/main/codegen/includes/replace.ftl b/sql/src/main/codegen/includes/replace.ftl
index b8851167036..10449c3fccb 100644
--- a/sql/src/main/codegen/includes/replace.ftl
+++ b/sql/src/main/codegen/includes/replace.ftl
@@ -77,11 +77,7 @@ SqlNode DruidSqlReplaceEof() :
        clusteredBy = ClusteredBy()
    ]
    {
-        if (clusteredBy != null && partitionedBy == null) {
-            throw org.apache.druid.sql.calcite.parser.DruidSqlParserUtils.problemParsing(
-                "CLUSTERED BY found before PARTITIONED BY, CLUSTERED BY must come after the PARTITIONED BY clause"
-            );
-        }
+
    }
    // EOF is also present in SqlStmtEof but EOF is a special case and a single EOF can be consumed multiple times.
    // The reason for adding EOF here is to ensure that we create a DruidSqlReplace node after the syntax has been
diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/planner/DruidSqlValidator.java b/sql/src/main/java/org/apache/druid/sql/calcite/planner/DruidSqlValidator.java
index aa04a55eb50..b4f006ce97e 100644
--- a/sql/src/main/java/org/apache/druid/sql/calcite/planner/DruidSqlValidator.java
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/planner/DruidSqlValidator.java
@@ -291,6 +291,7 @@ public class DruidSqlValidator extends BaseDruidSqlValidator
      SqlSelect select,
      SqlNode enclosingNode)
  {
+    SqlNodeList catalogClustering = null;
    if (enclosingNode instanceof DruidSqlIngest) {
      // The target is a new or existing datasource.
      // The target namespace is both the target table ID and the row type for that table.
@@ -308,11 +309,10 @@ public class DruidSqlValidator extends BaseDruidSqlValidator
      final DatasourceFacade tableMetadata = table == null
          ? null
          : table.effectiveMetadata().catalogMetadata();
-      // Convert CLUSTERED BY, or the catalog equivalent, to an ORDER BY clause
-      final SqlNodeList catalogClustering = convertCatalogClustering(tableMetadata);
-      rewriteClusteringToOrderBy(select, (DruidSqlIngest) enclosingNode, catalogClustering);
-      return new SelectNamespace(this, select, enclosingNode);
+      catalogClustering = convertCatalogClustering(tableMetadata);
      }
+      // Convert CLUSTERED BY, or the catalog equivalent, to an ORDER BY clause
+      rewriteClusteringToOrderBy(select, (DruidSqlIngest) enclosingNode, catalogClustering);
    }
    return super.createSelectNamespace(select, enclosingNode);
  }
@@ -408,6 +408,7 @@ public class DruidSqlValidator extends BaseDruidSqlValidator
    }
 
    final SqlSelect select = (SqlSelect) source;
+    DruidSqlParserUtils.validateClusteredByColumns(clusteredBy);
    select.setOrderBy(clusteredBy);
  }
 
@@ -451,6 +452,17 @@ public class DruidSqlValidator extends BaseDruidSqlValidator
    }
 
    if (effectiveGranularity == null) {
+      SqlNode source = ingestNode.getSource();
+      while (source instanceof SqlWith) {
+        source = ((SqlWith) source).getOperandList().get(1);
+      }
+      final SqlSelect select = (SqlSelect) source;
+
+      if (select.getOrderList() != null) {
+        throw DruidSqlParserUtils.problemParsing(
+            "CLUSTERED BY found before PARTITIONED BY, CLUSTERED BY must come after the PARTITIONED BY clause"
+        );
+      }
      throw InvalidSqlInput.exception(
          "Operation [%s] requires a PARTITIONED BY to be explicitly defined, but none was found.",
          operationName);
diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteCatalogIngestionDmlTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteCatalogIngestionDmlTest.java
index 45b4b40e41c..c85e6336de5 100644
--- a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteCatalogIngestionDmlTest.java
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteCatalogIngestionDmlTest.java
@@ -264,6 +264,55 @@ public abstract class CalciteCatalogIngestionDmlTest extends CalciteIngestionDml
                null,
                null
            ),
+            new TableSpec(
+                DatasourceDefn.TABLE_TYPE,
+                ImmutableMap.of(
+                    DatasourceDefn.SEGMENT_GRANULARITY_PROPERTY, "ALL",
+                    DatasourceDefn.CLUSTER_KEYS_PROPERTY,
+                    ImmutableList.of(
+                        new ClusterKeySpec("dim1", false),
+                        new ClusterKeySpec("dim2", false)
+                    )
+                ),
+                ImmutableList.of(
+                    new ColumnSpec("__time", Columns.TIME_COLUMN, null),
+                    new ColumnSpec("dim1", Columns.STRING, null),
+                    new ColumnSpec("dim2", Columns.STRING, null),
+                    new ColumnSpec("cnt", Columns.LONG, null)
+                )
+            ),
+            MAPPER
+        )),
+        DatasourceTable.EffectiveMetadata.toEffectiveColumns(RowSignature.builder()
+            .addTimeColumn()
+            .add("dim1", ColumnType.STRING)
+            .add("dim2", ColumnType.STRING)
+            .add("cnt", ColumnType.LONG)
+            .build()),
+        false
+        )
+    ),
+    "tableWithClusteringDesc", new DatasourceTable(
+        FOO_TABLE_SIGNATURE,
+        new DatasourceTable.PhysicalDatasourceMetadata(
+            new TableDataSource("tableWithClusteringDesc"),
+            RowSignature.builder()
+                .addTimeColumn()
+                .add("dim1", ColumnType.STRING)
+                .add("dim2", ColumnType.STRING)
+                .add("cnt", ColumnType.LONG)
+                .build(),
+            false,
+            false
+        ),
+        new DatasourceTable.EffectiveMetadata(
+            new DatasourceFacade(new ResolvedTable(
+                new TableDefn(
+                    "tableWithClusteringDesc",
+                    DatasourceDefn.TABLE_TYPE,
+                    null,
+                    null
+                ),
            new TableSpec(
                DatasourceDefn.TABLE_TYPE,
                ImmutableMap.of(
@@ -523,7 +572,7 @@ public abstract class CalciteCatalogIngestionDmlTest extends CalciteIngestionDml
        .orderBy(
            ImmutableList.of(
                new ScanQuery.OrderBy("b", ScanQuery.Order.ASCENDING),
-                new ScanQuery.OrderBy("d", ScanQuery.Order.DESCENDING)
+                new ScanQuery.OrderBy("d", ScanQuery.Order.ASCENDING)
            )
        )
        // Scan query lists columns in alphabetical order independent of the
@@ -570,7 +619,6 @@ public abstract class CalciteCatalogIngestionDmlTest extends CalciteIngestionDml
             "  data => ARRAY['2022-12-26T12:34:56,extra,10,\"20\",foo'],\n" +
             "  format => 'csv'))\n" +
             "  (a VARCHAR, b VARCHAR, c BIGINT, d VARCHAR, e VARCHAR)\n" +
-             "PARTITIONED BY ALL TIME\n" +
             "CLUSTERED BY dim1")
        .authentication(CalciteTests.SUPER_USER_AUTH_RESULT)
        .expectTarget("tableWithClustering", signature)
@@ -664,11 +712,11 @@ public abstract class CalciteCatalogIngestionDmlTest extends CalciteIngestionDml
  /**
   * Insert into a catalog table that has clustering defined on the table definition, but user specifies
-   * clustering on the ingest query on column that has not been defined in the table catalog definition.
-   * or in the select clause. Should fail with validation error.
+   * clustering on the ingest query on a column that has not been specified in the select clause. Should fail
+   * with a validation error.
   */
  @Test
-  public void testInsertTableWithClusteringWithClusteringOnBadColumn()
+  public void testInsertTableWithQueryDefinedClusteringOnNonSelectedColumn()
  {
    testIngestionQuery()
        .sql(StringUtils.format(dmlPrefixPattern, "tableWithClustering") + "\n" +
@@ -690,6 +738,84 @@ public abstract class CalciteCatalogIngestionDmlTest extends CalciteIngestionDml
        .verify();
  }
 
+  /**
+   * Insert into a catalog table that has clustering defined on the table definition, but one of the clustering
+   * columns is missing from the select clause. Should fail with a validation error.
+   */
+  @Test
+  public void testInsertTableWithCatalogDefinedClusteringOnNonSelectedColumn()
+  {
+    testIngestionQuery()
+        .sql(StringUtils.format(dmlPrefixPattern, "tableWithClustering") + "\n" +
+             "SELECT\n" +
+             "  TIME_PARSE(a) AS __time,\n" +
+             "  b AS dim1,\n" +
+             "  e AS dim3,\n" +
+             "  1 AS cnt\n" +
+             "FROM TABLE(inline(\n" +
+             "  data => ARRAY['2022-12-26T12:34:56,extra,10,\"20\",foo'],\n" +
+             "  format => 'csv'))\n" +
+             "  (a VARCHAR, b VARCHAR, c BIGINT, d VARCHAR, e VARCHAR)\n" +
+             "PARTITIONED BY ALL TIME")
+        .expectValidationError(
+            DruidException.class,
+            "Column 'dim2' not found in any table (line [0], column [0])")
+        .verify();
+  }
+
+  /**
+   * Insert into a catalog table whose definition declares a DESC clustering key. Should fail with a validation
+   * error, since only ASC clustering keys are supported.
+   */
+  @Test
+  public void testInsertTableWithCatalogDefinedClusteringDesc()
+  {
+    testIngestionQuery()
+        .sql(StringUtils.format(dmlPrefixPattern, "tableWithClusteringDesc") + "\n" +
+             "SELECT\n" +
+             "  TIME_PARSE(a) AS __time,\n" +
+             "  b AS dim1,\n" +
+             "  d AS dim2,\n" +
+             "  e AS dim3,\n" +
+             "  1 AS cnt\n" +
+             "FROM TABLE(inline(\n" +
+             "  data => ARRAY['2022-12-26T12:34:56,extra,10,\"20\",foo'],\n" +
+             "  format => 'csv'))\n" +
+             "  (a VARCHAR, b VARCHAR, c BIGINT, d VARCHAR, e VARCHAR)\n" +
+             "PARTITIONED BY ALL TIME")
+        .expectValidationError(
+            DruidException.class,
+            "Invalid CLUSTERED BY clause [`dim2` DESC]: cannot sort in descending order.")
+        .verify();
+  }
+
+  /**
+   * Insert into a catalog table where the ingest query itself specifies a DESC key in its CLUSTERED BY clause.
+   * Should fail with a validation error, since only ASC clustering keys are supported.
+   */
+  @Test
+  public void testInsertTableWithQueryDefinedClusteringDesc()
+  {
+    testIngestionQuery()
+        .sql(StringUtils.format(dmlPrefixPattern, "tableWithClusteringDesc") + "\n" +
+             "SELECT\n" +
+             "  TIME_PARSE(a) AS __time,\n" +
+             "  b AS dim1,\n" +
+             "  d AS dim2,\n" +
+             "  e AS dim3,\n" +
+             "  1 AS cnt\n" +
+             "FROM TABLE(inline(\n" +
+             "  data => ARRAY['2022-12-26T12:34:56,extra,10,\"20\",foo'],\n" +
+             "  format => 'csv'))\n" +
+             "  (a VARCHAR, b VARCHAR, c BIGINT, d VARCHAR, e VARCHAR)\n" +
+             "PARTITIONED BY ALL TIME\n" +
+             "CLUSTERED BY dim1 DESC")
+        .expectValidationError(
+            DruidException.class,
+            "Invalid CLUSTERED BY clause [`dim1` DESC]: cannot sort in descending order.")
+        .verify();
+  }
+
  /**
   * Adding a new column during group by ingestion that is not defined in a non-sealed table should succeed.
   */
diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteInsertDmlTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteInsertDmlTest.java
index 8b62bd66805..63e12aec2c3 100644
--- a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteInsertDmlTest.java
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteInsertDmlTest.java
@@ -1113,7 +1113,7 @@ public class CalciteInsertDmlTest extends CalciteIngestionDmlTest
        .sql(
            "INSERT INTO druid.dst "
            + "SELECT __time, FLOOR(m1) as floor_m1, dim1, CEIL(m2) as ceil_m2 FROM foo "
-            + "CLUSTERED BY 2, dim1 DESC, CEIL(m2)"
+            + "CLUSTERED BY 2, dim1, CEIL(m2)"
        )
        .expectValidationError(invalidSqlIs(
            "CLUSTERED BY found before PARTITIONED BY, CLUSTERED BY must come after the PARTITIONED BY clause"
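
Note on the EditorTest change: after an edit round-trips through catalog storage, the properties map is plain deserialized JSON, so the cluster-key entry presumably comes back as a list of generic maps rather than a List<ClusterKeySpec>. The test therefore normalizes the value with Jackson before comparing. A minimal sketch of that normalization, using only names that appear in this patch:

    // Convert the raw JSON-shaped value back into List<ClusterKeySpec> so that
    // assertEquals compares typed keys with typed keys.
    Map<String, Object> actual = doEdit(tableName, cmd).spec().properties();
    actual.put(
        DatasourceDefn.CLUSTER_KEYS_PROPERTY,
        MAPPER.convertValue(
            actual.get(DatasourceDefn.CLUSTER_KEYS_PROPERTY),
            ClusterKeySpec.CLUSTER_KEY_LIST_TYPE_REF
        )
    );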
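Note on the DatasourceDefn change: CLUSTER_KEYS_PROPERTY is now backed by the dedicated ClusterKeysDefn rather than a generic ListPropertyDefn, so a DESC key is rejected when the table spec itself is validated, not only when an ingest query runs. A minimal sketch of the behavior, using only types that appear in this patch (the exact call shapes are assumptions):

    // Validation of the cluster-keys property, as exercised by the tests above.
    ObjectMapper mapper = new DefaultObjectMapper();
    DatasourceDefn.ClusterKeysDefn defn = new DatasourceDefn.ClusterKeysDefn();

    // ASC key (desc = false): validation passes.
    defn.validate(ImmutableList.of(new ClusterKeySpec("clusterKeyA", false)), mapper);

    // DESC key (desc = true): throws IAE("Cannot specify DESC clustering key [...]. Only ASC is supported.").
    defn.validate(ImmutableList.of(new ClusterKeySpec("clusterKeyA", true)), mapper);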
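Note on the parser-to-validator move: the CLUSTERED BY-before-PARTITIONED BY check is deleted from insert.ftl and replace.ftl and re-raised in DruidSqlValidator instead. By the time granularity is checked, rewriteClusteringToOrderBy has already folded any CLUSTERED BY (from the query or from the catalog) into the source SELECT's ORDER BY, so a non-null getOrderList() with no PARTITIONED BY implies the old parse-time mistake; the SqlWith loop unwraps WITH ... SELECT so the check inspects the innermost SELECT. A hypothetical repro in the style of CalciteInsertDmlTest (table and column names are illustrative):

    // CLUSTERED BY without PARTITIONED BY: previously rejected by the grammar,
    // now rejected by DruidSqlValidator with the same message.
    String sql = "INSERT INTO druid.dst "
        + "SELECT __time, dim1 FROM foo "
        + "CLUSTERED BY dim1";
    // Expected error: "CLUSTERED BY found before PARTITIONED BY, CLUSTERED BY
    // must come after the PARTITIONED BY clause"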