Catalog clustering keys fixes (#16351)

* Add another catalog clustering columns unit test

* Disallow clusterKeys with descending order

* Make it clearer that clustering is rewritten into the ingest node
whether or not the target is a catalog table

* When partitionedBy is stored in the catalog, the user shouldn't need to specify
it in order to specify clustering (see the SQL sketch below)

* Fix IntelliJ inspection failure
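
A minimal Druid SQL sketch of the behavior these changes target. The datasource name `events`, its columns, and the inline input are illustrative only (not from this patch); the sketch assumes a catalog entry for `events` that already stores segmentGranularity and only ASC cluster keys:

-- Allowed after this change: segmentGranularity comes from the catalog entry,
-- so CLUSTERED BY no longer requires an explicit PARTITIONED BY clause in the query.
INSERT INTO events
SELECT TIME_PARSE(a) AS __time, b AS page
FROM TABLE(inline(data => ARRAY['2022-12-26T12:34:56,foo'], format => 'csv'))
  (a VARCHAR, b VARCHAR)
CLUSTERED BY page

-- Rejected after this change, whether the key is written in the query (as here) or
-- stored in the catalog clusterKeys property: descending cluster keys are not supported.
-- Fails with an error like: Invalid CLUSTERED BY clause [`page` DESC]: cannot sort in descending order.
INSERT INTO events
SELECT TIME_PARSE(a) AS __time, b AS page
FROM TABLE(inline(data => ARRAY['2022-12-26T12:34:56,foo'], format => 'csv'))
  (a VARCHAR, b VARCHAR)
CLUSTERED BY page DESC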
zachjsh 2024-05-03 14:02:56 -04:00 committed by GitHub
parent 4d62c4a917
commit fb7c84fb5d
8 changed files with 240 additions and 26 deletions


@ -19,6 +19,8 @@
package org.apache.druid.server.http.catalog;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import org.apache.druid.catalog.CatalogException;
import org.apache.druid.catalog.http.TableEditRequest;
@ -34,10 +36,12 @@ import org.apache.druid.catalog.model.ColumnSpec;
import org.apache.druid.catalog.model.Columns;
import org.apache.druid.catalog.model.TableId;
import org.apache.druid.catalog.model.TableMetadata;
import org.apache.druid.catalog.model.table.ClusterKeySpec;
import org.apache.druid.catalog.model.table.DatasourceDefn;
import org.apache.druid.catalog.model.table.TableBuilder;
import org.apache.druid.catalog.storage.CatalogStorage;
import org.apache.druid.catalog.storage.CatalogTests;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.metadata.TestDerbyConnector;
import org.junit.After;
import org.junit.Before;
@ -56,6 +60,7 @@ import static org.junit.Assert.assertThrows;
public class EditorTest
{
private static final ObjectMapper MAPPER = new DefaultObjectMapper();
@Rule
public final TestDerbyConnector.DerbyConnectorRule derbyConnectorRule = new TestDerbyConnector.DerbyConnectorRule();
@ -326,7 +331,6 @@ public class EditorTest
// Can't test an empty property set: no table type allows empty
// properties.
// Remove a required property
Map<String, Object> updates = new HashMap<>();
updates.put(DatasourceDefn.SEGMENT_GRANULARITY_PROPERTY, null);
cmd = new UpdateProperties(updates);
@ -374,6 +378,37 @@ public class EditorTest
expected,
doEdit(tableName, cmd).spec().properties()
);
// Add a DESC cluster key - should fail
Map<String, Object> updates1 = new HashMap<>();
updates1.put(DatasourceDefn.CLUSTER_KEYS_PROPERTY, ImmutableList.of(new ClusterKeySpec("clusterKeyA", true)));
assertThrows(
CatalogException.class,
() -> new TableEditor(
catalog,
table.id(),
new UpdateProperties(updates1)
).go()
);
// Add an ASC cluster key - should succeed
updates = new HashMap<>();
updates.put(DatasourceDefn.CLUSTER_KEYS_PROPERTY, ImmutableList.of(new ClusterKeySpec("clusterKeyA", false)));
cmd = new UpdateProperties(updates);
expected = ImmutableMap.of(
DatasourceDefn.SEGMENT_GRANULARITY_PROPERTY, "PT1H",
DatasourceDefn.CLUSTER_KEYS_PROPERTY, ImmutableList.of(new ClusterKeySpec("clusterKeyA", false))
);
Map<String, Object> actual = doEdit(tableName, cmd).spec().properties();
actual.put(
DatasourceDefn.CLUSTER_KEYS_PROPERTY,
MAPPER.convertValue(actual.get(DatasourceDefn.CLUSTER_KEYS_PROPERTY), ClusterKeySpec.CLUSTER_KEY_LIST_TYPE_REF)
);
assertEquals(
expected,
actual
);
}
@Test


@ -110,6 +110,34 @@ public class DatasourceDefn extends TableDefn
}
}
public static class ClusterKeysDefn extends ModelProperties.ListPropertyDefn<ClusterKeySpec>
{
public ClusterKeysDefn()
{
super(
CLUSTER_KEYS_PROPERTY,
"ClusterKeySpec list",
new TypeReference<List<ClusterKeySpec>>() {}
);
}
@Override
public void validate(Object value, ObjectMapper jsonMapper)
{
if (value == null) {
return;
}
List<ClusterKeySpec> clusterKeys = decode(value, jsonMapper);
for (ClusterKeySpec clusterKey : clusterKeys) {
if (clusterKey.desc()) {
throw new IAE(
StringUtils.format("Cannot specify DESC clustering key [%s]. Only ASC is supported.", clusterKey)
);
}
}
}
}
public DatasourceDefn()
{
super(
@ -118,11 +146,7 @@ public class DatasourceDefn extends TableDefn
Arrays.asList(
new SegmentGranularityFieldDefn(),
new ModelProperties.IntPropertyDefn(TARGET_SEGMENT_ROWS_PROPERTY),
new ModelProperties.ListPropertyDefn<ClusterKeySpec>(
CLUSTER_KEYS_PROPERTY,
"cluster keys",
new TypeReference<List<ClusterKeySpec>>() { }
),
new ClusterKeysDefn(),
new HiddenColumnsDefn(),
new ModelProperties.BooleanPropertyDefn(SEALED_PROPERTY)
),


@ -20,6 +20,7 @@
package org.apache.druid.catalog.model.table;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import nl.jqno.equalsverifier.EqualsVerifier;
import org.apache.druid.catalog.CatalogTest;
@ -116,6 +117,28 @@ public class DatasourceTableTest
}
}
@Test
public void testSpecWithClusterKeyProp()
{
{
TableSpec spec = new TableSpec(
DatasourceDefn.TABLE_TYPE,
ImmutableMap.of(DatasourceDefn.CLUSTER_KEYS_PROPERTY, ImmutableList.of(new ClusterKeySpec("clusterKeyA", true))),
null
);
expectValidationFails(spec);
}
{
TableSpec spec = new TableSpec(
DatasourceDefn.TABLE_TYPE,
ImmutableMap.of(DatasourceDefn.CLUSTER_KEYS_PROPERTY, ImmutableList.of(new ClusterKeySpec("clusterKeyA", false))),
null
);
expectValidationSucceeds(spec);
}
}
@Test
public void testAllProperties()
{
@ -125,6 +148,7 @@ public class DatasourceTableTest
.put(DatasourceDefn.TARGET_SEGMENT_ROWS_PROPERTY, 1_000_000)
.put(DatasourceDefn.HIDDEN_COLUMNS_PROPERTY, Arrays.asList("foo", "bar"))
.put(DatasourceDefn.SEALED_PROPERTY, true)
.put(DatasourceDefn.CLUSTER_KEYS_PROPERTY, ImmutableList.of(new ClusterKeySpec("clusterKeyA", false)))
.build();
TableSpec spec = new TableSpec(DatasourceDefn.TABLE_TYPE, props, null);
@ -132,6 +156,7 @@ public class DatasourceTableTest
assertEquals("P1D", facade.segmentGranularityString());
assertEquals(1_000_000, (int) facade.targetSegmentRows());
assertEquals(Arrays.asList("foo", "bar"), facade.hiddenColumns());
assertEquals(Collections.singletonList(new ClusterKeySpec("clusterKeyA", false)), facade.clusterKeys());
assertTrue(facade.isSealed());
}


@ -93,11 +93,7 @@ SqlNode DruidSqlInsertEof() :
clusteredBy = ClusteredBy()
]
{
if (clusteredBy != null && partitionedBy == null) {
throw org.apache.druid.sql.calcite.parser.DruidSqlParserUtils.problemParsing(
"CLUSTERED BY found before PARTITIONED BY, CLUSTERED BY must come after the PARTITIONED BY clause"
);
}
}
// EOF is also present in SqlStmtEof but EOF is a special case and a single EOF can be consumed multiple times.
// The reason for adding EOF here is to ensure that we create a DruidSqlInsert node after the syntax has been


@ -77,11 +77,7 @@ SqlNode DruidSqlReplaceEof() :
clusteredBy = ClusteredBy()
]
{
if (clusteredBy != null && partitionedBy == null) {
throw org.apache.druid.sql.calcite.parser.DruidSqlParserUtils.problemParsing(
"CLUSTERED BY found before PARTITIONED BY, CLUSTERED BY must come after the PARTITIONED BY clause"
);
}
}
// EOF is also present in SqlStmtEof but EOF is a special case and a single EOF can be consumed multiple times.
// The reason for adding EOF here is to ensure that we create a DruidSqlReplace node after the syntax has been


@ -291,6 +291,7 @@ public class DruidSqlValidator extends BaseDruidSqlValidator
SqlSelect select,
SqlNode enclosingNode)
{
SqlNodeList catalogClustering = null;
if (enclosingNode instanceof DruidSqlIngest) {
// The target is a new or existing datasource.
// The target namespace is both the target table ID and the row type for that table.
@ -308,11 +309,10 @@ public class DruidSqlValidator extends BaseDruidSqlValidator
final DatasourceFacade tableMetadata = table == null
? null
: table.effectiveMetadata().catalogMetadata();
// Convert CLUSTERED BY, or the catalog equivalent, to an ORDER BY clause
final SqlNodeList catalogClustering = convertCatalogClustering(tableMetadata);
rewriteClusteringToOrderBy(select, (DruidSqlIngest) enclosingNode, catalogClustering);
return new SelectNamespace(this, select, enclosingNode);
catalogClustering = convertCatalogClustering(tableMetadata);
}
// Convert CLUSTERED BY, or the catalog equivalent, to an ORDER BY clause
rewriteClusteringToOrderBy(select, (DruidSqlIngest) enclosingNode, catalogClustering);
}
return super.createSelectNamespace(select, enclosingNode);
}
@ -408,6 +408,7 @@ public class DruidSqlValidator extends BaseDruidSqlValidator
}
final SqlSelect select = (SqlSelect) source;
DruidSqlParserUtils.validateClusteredByColumns(clusteredBy);
select.setOrderBy(clusteredBy);
}
@ -451,6 +452,17 @@ public class DruidSqlValidator extends BaseDruidSqlValidator
}
if (effectiveGranularity == null) {
SqlNode source = ingestNode.getSource();
while (source instanceof SqlWith) {
source = ((SqlWith) source).getOperandList().get(1);
}
final SqlSelect select = (SqlSelect) source;
if (select.getOrderList() != null) {
throw DruidSqlParserUtils.problemParsing(
"CLUSTERED BY found before PARTITIONED BY, CLUSTERED BY must come after the PARTITIONED BY clause"
);
}
throw InvalidSqlInput.exception(
"Operation [%s] requires a PARTITIONED BY to be explicitly defined, but none was found.",
operationName);


@ -264,6 +264,55 @@ public abstract class CalciteCatalogIngestionDmlTest extends CalciteIngestionDml
null,
null
),
new TableSpec(
DatasourceDefn.TABLE_TYPE,
ImmutableMap.of(
DatasourceDefn.SEGMENT_GRANULARITY_PROPERTY, "ALL",
DatasourceDefn.CLUSTER_KEYS_PROPERTY,
ImmutableList.of(
new ClusterKeySpec("dim1", false),
new ClusterKeySpec("dim2", false)
)
),
ImmutableList.of(
new ColumnSpec("__time", Columns.TIME_COLUMN, null),
new ColumnSpec("dim1", Columns.STRING, null),
new ColumnSpec("dim2", Columns.STRING, null),
new ColumnSpec("cnt", Columns.LONG, null)
)
),
MAPPER
)),
DatasourceTable.EffectiveMetadata.toEffectiveColumns(RowSignature.builder()
.addTimeColumn()
.add("dim1", ColumnType.STRING)
.add("dim2", ColumnType.STRING)
.add("cnt", ColumnType.LONG)
.build()),
false
)
),
"tableWithClusteringDesc", new DatasourceTable(
FOO_TABLE_SIGNATURE,
new DatasourceTable.PhysicalDatasourceMetadata(
new TableDataSource("tableWithClusteringDesc"),
RowSignature.builder()
.addTimeColumn()
.add("dim1", ColumnType.STRING)
.add("dim2", ColumnType.STRING)
.add("cnt", ColumnType.LONG)
.build(),
false,
false
),
new DatasourceTable.EffectiveMetadata(
new DatasourceFacade(new ResolvedTable(
new TableDefn(
"tableWithClusteringDesc",
DatasourceDefn.TABLE_TYPE,
null,
null
),
new TableSpec(
DatasourceDefn.TABLE_TYPE,
ImmutableMap.of(
@ -523,7 +572,7 @@ public abstract class CalciteCatalogIngestionDmlTest extends CalciteIngestionDml
.orderBy(
ImmutableList.of(
new ScanQuery.OrderBy("b", ScanQuery.Order.ASCENDING),
new ScanQuery.OrderBy("d", ScanQuery.Order.DESCENDING)
new ScanQuery.OrderBy("d", ScanQuery.Order.ASCENDING)
)
)
// Scan query lists columns in alphabetical order independent of the
@ -570,7 +619,6 @@ public abstract class CalciteCatalogIngestionDmlTest extends CalciteIngestionDml
" data => ARRAY['2022-12-26T12:34:56,extra,10,\"20\",foo'],\n" +
" format => 'csv'))\n" +
" (a VARCHAR, b VARCHAR, c BIGINT, d VARCHAR, e VARCHAR)\n" +
"PARTITIONED BY ALL TIME\n" +
"CLUSTERED BY dim1")
.authentication(CalciteTests.SUPER_USER_AUTH_RESULT)
.expectTarget("tableWithClustering", signature)
@ -664,11 +712,11 @@ public abstract class CalciteCatalogIngestionDmlTest extends CalciteIngestionDml
/**
* Insert into a catalog table that has clustering defined on the table definition, but user specifies
* clustering on the ingest query on column that has not been defined in the table catalog definition.
* or in the select clause. Should fail with validation error.
* clustering on the ingest query on column that has not been specified in the select clause. Should fail with
* validation error.
*/
@Test
public void testInsertTableWithClusteringWithClusteringOnBadColumn()
public void testInsertTableWithQueryDefinedClusteringOnNonSelectedColumn()
{
testIngestionQuery()
.sql(StringUtils.format(dmlPrefixPattern, "tableWithClustering") + "\n" +
@ -690,6 +738,84 @@ public abstract class CalciteCatalogIngestionDmlTest extends CalciteIngestionDml
.verify();
}
/**
* Insert into a catalog table that has clustering defined on the table definition, but one of the
* catalog-defined clustering columns is not present in the select clause. Should fail with validation error.
*/
@Test
public void testInsertTableWithCatalogDefinedClusteringOnNonSelectedColumn()
{
testIngestionQuery()
.sql(StringUtils.format(dmlPrefixPattern, "tableWithClustering") + "\n" +
"SELECT\n" +
" TIME_PARSE(a) AS __time,\n" +
" b AS dim1,\n" +
" e AS dim3,\n" +
" 1 AS cnt\n" +
"FROM TABLE(inline(\n" +
" data => ARRAY['2022-12-26T12:34:56,extra,10,\"20\",foo'],\n" +
" format => 'csv'))\n" +
" (a VARCHAR, b VARCHAR, c BIGINT, d VARCHAR, e VARCHAR)\n" +
"PARTITIONED BY ALL TIME")
.expectValidationError(
DruidException.class,
"Column 'dim2' not found in any table (line [0], column [0])")
.verify();
}
/**
* Insert into a catalog table that has DESC clustering defined on the table definition. Should fail with
* validation error, since only ASC clustering keys are supported.
*/
@Test
public void testInsertTableWithCatalogDefinedClusteringDesc()
{
testIngestionQuery()
.sql(StringUtils.format(dmlPrefixPattern, "tableWithClusteringDesc") + "\n" +
"SELECT\n" +
" TIME_PARSE(a) AS __time,\n" +
" b AS dim1,\n" +
" d AS dim2,\n" +
" e AS dim3,\n" +
" 1 AS cnt\n" +
"FROM TABLE(inline(\n" +
" data => ARRAY['2022-12-26T12:34:56,extra,10,\"20\",foo'],\n" +
" format => 'csv'))\n" +
" (a VARCHAR, b VARCHAR, c BIGINT, d VARCHAR, e VARCHAR)\n" +
"PARTITIONED BY ALL TIME")
.expectValidationError(
DruidException.class,
"Invalid CLUSTERED BY clause [`dim2` DESC]: cannot sort in descending order.")
.verify();
}
/**
* Insert into a catalog table, specifying DESC clustering in the ingest query. Should fail with
* validation error, since only ASC clustering keys are supported.
*/
@Test
public void testInsertTableWithQueryDefinedClusteringDesc()
{
testIngestionQuery()
.sql(StringUtils.format(dmlPrefixPattern, "tableWithClusteringDesc") + "\n" +
"SELECT\n" +
" TIME_PARSE(a) AS __time,\n" +
" b AS dim1,\n" +
" d AS dim2,\n" +
" e AS dim3,\n" +
" 1 AS cnt\n" +
"FROM TABLE(inline(\n" +
" data => ARRAY['2022-12-26T12:34:56,extra,10,\"20\",foo'],\n" +
" format => 'csv'))\n" +
" (a VARCHAR, b VARCHAR, c BIGINT, d VARCHAR, e VARCHAR)\n" +
"PARTITIONED BY ALL TIME\n" +
"CLUSTERED BY dim1 DESC")
.expectValidationError(
DruidException.class,
"Invalid CLUSTERED BY clause [`dim1` DESC]: cannot sort in descending order.")
.verify();
}
/**
* Adding a new column during group by ingestion that is not defined in a non-sealed table should succeed.
*/


@ -1113,7 +1113,7 @@ public class CalciteInsertDmlTest extends CalciteIngestionDmlTest
.sql(
"INSERT INTO druid.dst "
+ "SELECT __time, FLOOR(m1) as floor_m1, dim1, CEIL(m2) as ceil_m2 FROM foo "
+ "CLUSTERED BY 2, dim1 DESC, CEIL(m2)"
+ "CLUSTERED BY 2, dim1, CEIL(m2)"
)
.expectValidationError(invalidSqlIs(
"CLUSTERED BY found before PARTITIONED BY, CLUSTERED BY must come after the PARTITIONED BY clause"