INSERT/REPLACE dimension target column types are validated against source input expressions (#15962)

* address remaining comments from https://github.com/apache/druid/pull/15836

* address remaining comments from https://github.com/apache/druid/pull/15908

* add test that exposes relational algebra issue

* simplify test exposing issue

* fix

* add tests for sealed / non-sealed

* update test descriptions

* fix test failure when -Ddruid.generic.useDefaultValueForNull=true

* check type assignment based on native Druid types

* add tests that cover missing jacoco coverage

* add replace tests

* add more tests and comments about column ordering

* simplify tests

* review comments

* remove commented line

* STRING family types should be validated as non-null
zachjsh 2024-03-25 12:34:07 -04:00 committed by GitHub
parent 82f443340d
commit 8370db106c
11 changed files with 1320 additions and 207 deletions
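
As a quick illustration of the behavior this change adds (taken from the testInsertIntoExistingWithIncompatibleTypeAssignment cases in the new tests below), an INSERT or REPLACE whose source expression type cannot be assigned to the declared catalog type of the target column now fails at validation time. With dim1 declared as STRING (VARCHAR) in the catalog for foo:

INSERT INTO foo
SELECT
  __time AS __time,
  ARRAY[dim1] AS dim1
FROM foo
PARTITIONED BY ALL TIME

-- fails with: Cannot assign to target field 'dim1' of type VARCHAR
-- from source field 'dim1' of type VARCHAR ARRAY (line [4], column [3])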


@ -1,173 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.catalog.sql;
import org.apache.druid.catalog.CatalogException;
import org.apache.druid.catalog.model.Columns;
import org.apache.druid.catalog.model.TableMetadata;
import org.apache.druid.catalog.model.table.TableBuilder;
import org.apache.druid.catalog.storage.CatalogStorage;
import org.apache.druid.catalog.storage.CatalogTests;
import org.apache.druid.catalog.sync.CachedMetadataCatalog;
import org.apache.druid.catalog.sync.MetadataCatalog;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.metadata.TestDerbyConnector.DerbyConnectorRule5;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.sql.calcite.CalciteIngestionDmlTest;
import org.apache.druid.sql.calcite.filtration.Filtration;
import org.apache.druid.sql.calcite.planner.CatalogResolver;
import org.apache.druid.sql.calcite.util.CalciteTests;
import org.apache.druid.sql.calcite.util.SqlTestFramework;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
import java.util.Arrays;
import static org.junit.Assert.fail;
/**
* Test the use of catalog specs to drive MSQ ingestion.
*/
public class CatalogIngestionTest extends CalciteIngestionDmlTest
{
@RegisterExtension
public static final DerbyConnectorRule5 DERBY_CONNECTION_RULE = new DerbyConnectorRule5();
/**
* Signature for the foo datasource after applying catalog metadata.
*/
private static final RowSignature FOO_SIGNATURE = RowSignature.builder()
.add("__time", ColumnType.LONG)
.add("extra1", ColumnType.STRING)
.add("dim2", ColumnType.STRING)
.add("dim1", ColumnType.STRING)
.add("cnt", ColumnType.LONG)
.add("m1", ColumnType.DOUBLE)
.add("extra2", ColumnType.LONG)
.add("extra3", ColumnType.STRING)
.add("m2", ColumnType.DOUBLE)
.build();
private static CatalogStorage storage;
@Override
public CatalogResolver createCatalogResolver()
{
CatalogTests.DbFixture dbFixture = new CatalogTests.DbFixture(DERBY_CONNECTION_RULE);
storage = dbFixture.storage;
MetadataCatalog catalog = new CachedMetadataCatalog(
storage,
storage.schemaRegistry(),
storage.jsonMapper()
);
return new LiveCatalogResolver(catalog);
}
@Override
public void finalizeTestFramework(SqlTestFramework sqlTestFramework)
{
super.finalizeTestFramework(sqlTestFramework);
buildTargetDatasources();
buildFooDatasource();
}
private void buildTargetDatasources()
{
TableMetadata spec = TableBuilder.datasource("hourDs", "PT1H")
.build();
createTableMetadata(spec);
}
public void buildFooDatasource()
{
TableMetadata spec = TableBuilder.datasource("foo", "ALL")
.timeColumn()
.column("extra1", null)
.column("dim2", null)
.column("dim1", null)
.column("cnt", null)
.column("m1", Columns.DOUBLE)
.column("extra2", Columns.LONG)
.column("extra3", Columns.STRING)
.hiddenColumns(Arrays.asList("dim3", "unique_dim1"))
.sealed(true)
.build();
createTableMetadata(spec);
}
private void createTableMetadata(TableMetadata table)
{
try {
storage.tables().create(table);
}
catch (CatalogException e) {
fail(e.getMessage());
}
}
/**
* If the segment grain is given in the catalog, then that value is used.
*/
@Test
public void testInsertHourGrain()
{
testIngestionQuery()
.sql("INSERT INTO hourDs\n" +
"SELECT * FROM foo")
.authentication(CalciteTests.SUPER_USER_AUTH_RESULT)
.expectTarget("hourDs", FOO_SIGNATURE)
.expectResources(dataSourceWrite("hourDs"), dataSourceRead("foo"))
.expectQuery(
newScanQueryBuilder()
.dataSource("foo")
.intervals(querySegmentSpec(Filtration.eternity()))
.columns("__time", "cnt", "dim1", "dim2", "extra1", "extra2", "extra3", "m1", "m2")
.context(queryContextWithGranularity(Granularities.HOUR))
.build()
)
.verify();
}
/**
* If the segment grain is given in the catalog, and also by PARTITIONED BY, then
* the query value is used.
*/
@Test
public void testInsertHourGrainWithDay()
{
testIngestionQuery()
.sql("INSERT INTO hourDs\n" +
"SELECT * FROM foo\n" +
"PARTITIONED BY day")
.authentication(CalciteTests.SUPER_USER_AUTH_RESULT)
.expectTarget("hourDs", FOO_SIGNATURE)
.expectResources(dataSourceWrite("hourDs"), dataSourceRead("foo"))
.expectQuery(
newScanQueryBuilder()
.dataSource("foo")
.intervals(querySegmentSpec(Filtration.eternity()))
.columns("__time", "cnt", "dim1", "dim2", "extra1", "extra2", "extra3", "m1", "m2")
.context(queryContextWithGranularity(Granularities.DAY))
.build()
)
.verify();
}
}


@ -0,0 +1,114 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.catalog.sql;
import org.apache.druid.catalog.CatalogException;
import org.apache.druid.catalog.model.TableMetadata;
import org.apache.druid.catalog.model.facade.DatasourceFacade;
import org.apache.druid.catalog.model.table.ClusterKeySpec;
import org.apache.druid.catalog.model.table.TableBuilder;
import org.apache.druid.catalog.storage.CatalogStorage;
import org.apache.druid.catalog.storage.CatalogTests;
import org.apache.druid.catalog.sync.CachedMetadataCatalog;
import org.apache.druid.catalog.sync.MetadataCatalog;
import org.apache.druid.metadata.TestDerbyConnector.DerbyConnectorRule5;
import org.apache.druid.sql.calcite.CalciteCatalogInsertTest;
import org.apache.druid.sql.calcite.planner.CatalogResolver;
import org.apache.druid.sql.calcite.table.DatasourceTable;
import org.apache.druid.sql.calcite.util.SqlTestFramework;
import org.junit.jupiter.api.extension.RegisterExtension;
import static org.junit.Assert.fail;
/**
* Test the use of catalog specs to drive MSQ ingestion.
*/
public class CatalogInsertTest extends CalciteCatalogInsertTest
{
@RegisterExtension
public static final DerbyConnectorRule5 DERBY_CONNECTION_RULE = new DerbyConnectorRule5();
private static CatalogStorage storage;
@Override
public CatalogResolver createCatalogResolver()
{
CatalogTests.DbFixture dbFixture = new CatalogTests.DbFixture(DERBY_CONNECTION_RULE);
storage = dbFixture.storage;
MetadataCatalog catalog = new CachedMetadataCatalog(
storage,
storage.schemaRegistry(),
storage.jsonMapper()
);
return new LiveCatalogResolver(catalog);
}
@Override
public void finalizeTestFramework(SqlTestFramework sqlTestFramework)
{
super.finalizeTestFramework(sqlTestFramework);
buildDatasources();
}
public void buildDatasources()
{
resolvedTables.forEach((datasourceName, datasourceTable) -> {
DatasourceFacade catalogMetadata = ((DatasourceTable) datasourceTable).effectiveMetadata().catalogMetadata();
TableBuilder tableBuilder = TableBuilder.datasource(datasourceName, catalogMetadata.segmentGranularityString());
catalogMetadata.columnFacades().forEach(
columnFacade -> {
tableBuilder.column(columnFacade.spec().name(), columnFacade.spec().dataType());
}
);
if (catalogMetadata.hiddenColumns() != null && !catalogMetadata.hiddenColumns().isEmpty()) {
tableBuilder.hiddenColumns(catalogMetadata.hiddenColumns());
}
if (catalogMetadata.isSealed()) {
tableBuilder.sealed(true);
}
if (catalogMetadata.clusterKeys() != null && !catalogMetadata.clusterKeys().isEmpty()) {
tableBuilder.clusterColumns(catalogMetadata.clusterKeys().toArray(new ClusterKeySpec[0]));
}
createTableMetadata(tableBuilder.build());
});
DatasourceFacade catalogMetadata =
((DatasourceTable) resolvedTables.get("foo")).effectiveMetadata().catalogMetadata();
TableBuilder tableBuilder = TableBuilder.datasource("foo", catalogMetadata.segmentGranularityString());
catalogMetadata.columnFacades().forEach(
columnFacade -> {
tableBuilder.column(columnFacade.spec().name(), columnFacade.spec().dataType());
}
);
}
private void createTableMetadata(TableMetadata table)
{
try {
storage.tables().create(table);
}
catch (CatalogException e) {
fail(e.getMessage());
}
}
}


@ -0,0 +1,113 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.catalog.sql;
import org.apache.druid.catalog.CatalogException;
import org.apache.druid.catalog.model.TableMetadata;
import org.apache.druid.catalog.model.facade.DatasourceFacade;
import org.apache.druid.catalog.model.table.ClusterKeySpec;
import org.apache.druid.catalog.model.table.TableBuilder;
import org.apache.druid.catalog.storage.CatalogStorage;
import org.apache.druid.catalog.storage.CatalogTests;
import org.apache.druid.catalog.sync.CachedMetadataCatalog;
import org.apache.druid.catalog.sync.MetadataCatalog;
import org.apache.druid.metadata.TestDerbyConnector.DerbyConnectorRule5;
import org.apache.druid.sql.calcite.CalciteCatalogReplaceTest;
import org.apache.druid.sql.calcite.planner.CatalogResolver;
import org.apache.druid.sql.calcite.table.DatasourceTable;
import org.apache.druid.sql.calcite.util.SqlTestFramework;
import org.junit.jupiter.api.extension.RegisterExtension;
import static org.junit.Assert.fail;
/**
* Test the use of catalog specs to drive MSQ ingestion.
*/
public class CatalogReplaceTest extends CalciteCatalogReplaceTest
{
@RegisterExtension
public static final DerbyConnectorRule5 DERBY_CONNECTION_RULE = new DerbyConnectorRule5();
private static CatalogStorage storage;
@Override
public CatalogResolver createCatalogResolver()
{
CatalogTests.DbFixture dbFixture = new CatalogTests.DbFixture(DERBY_CONNECTION_RULE);
storage = dbFixture.storage;
MetadataCatalog catalog = new CachedMetadataCatalog(
storage,
storage.schemaRegistry(),
storage.jsonMapper()
);
return new LiveCatalogResolver(catalog);
}
@Override
public void finalizeTestFramework(SqlTestFramework sqlTestFramework)
{
super.finalizeTestFramework(sqlTestFramework);
buildDatasources();
}
public void buildDatasources()
{
resolvedTables.forEach((datasourceName, datasourceTable) -> {
DatasourceFacade catalogMetadata = ((DatasourceTable) datasourceTable).effectiveMetadata().catalogMetadata();
TableBuilder tableBuilder = TableBuilder.datasource(datasourceName, catalogMetadata.segmentGranularityString());
catalogMetadata.columnFacades().forEach(
columnFacade -> {
tableBuilder.column(columnFacade.spec().name(), columnFacade.spec().dataType());
}
);
if (catalogMetadata.hiddenColumns() != null && !catalogMetadata.hiddenColumns().isEmpty()) {
tableBuilder.hiddenColumns(catalogMetadata.hiddenColumns());
}
if (catalogMetadata.isSealed()) {
tableBuilder.sealed(true);
}
if (catalogMetadata.clusterKeys() != null && !catalogMetadata.clusterKeys().isEmpty()) {
tableBuilder.clusterColumns(catalogMetadata.clusterKeys().toArray(new ClusterKeySpec[0]));
}
createTableMetadata(tableBuilder.build());
});
DatasourceFacade catalogMetadata =
((DatasourceTable) resolvedTables.get("foo")).effectiveMetadata().catalogMetadata();
TableBuilder tableBuilder = TableBuilder.datasource("foo", catalogMetadata.segmentGranularityString());
catalogMetadata.columnFacades().forEach(
columnFacade -> {
tableBuilder.column(columnFacade.spec().name(), columnFacade.spec().dataType());
}
);
}
private void createTableMetadata(TableMetadata table)
{
try {
storage.tables().create(table);
}
catch (CatalogException e) {
fail(e.getMessage());
}
}
}


@ -241,18 +241,7 @@ public class DruidSqlParserUtils
private static Granularity convertSqlLiteralCharToGranularity(SqlLiteral literal)
{
String value = literal.getValueAs(String.class);
try {
return Granularity.fromString(value);
}
catch (IllegalArgumentException e) {
try {
return new PeriodGranularity(new Period(value), null, null);
}
catch (Exception e2) {
throw makeInvalidPartitionByException(literal);
}
}
return convertStringToGranularity(literal.getValueAs(String.class), literal);
}
private static Granularity convertSqlIdentiferToGranularity(SqlIdentifier identifier)
@ -260,7 +249,11 @@ public class DruidSqlParserUtils
if (identifier.names.isEmpty()) {
throw makeInvalidPartitionByException(identifier);
}
String value = identifier.names.get(0);
return convertStringToGranularity(identifier.names.get(0), identifier);
}
private static Granularity convertStringToGranularity(String value, SqlNode node)
{
try {
return Granularity.fromString(value);
}
@ -269,7 +262,7 @@ public class DruidSqlParserUtils
return new PeriodGranularity(new Period(value), null, null);
}
catch (Exception e2) {
throw makeInvalidPartitionByException(identifier);
throw makeInvalidPartitionByException(node);
}
}
}


@ -20,7 +20,6 @@
package org.apache.druid.sql.calcite.planner;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.calcite.adapter.java.JavaTypeFactory;
import org.apache.calcite.prepare.BaseDruidSqlValidator;
import org.apache.calcite.prepare.CalciteCatalogReader;
@ -34,25 +33,34 @@ import org.apache.calcite.sql.SqlIdentifier;
import org.apache.calcite.sql.SqlInsert;
import org.apache.calcite.sql.SqlKind;
import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.sql.SqlNodeList;
import org.apache.calcite.sql.SqlOperatorTable;
import org.apache.calcite.sql.SqlSelect;
import org.apache.calcite.sql.SqlUpdate;
import org.apache.calcite.sql.SqlUtil;
import org.apache.calcite.sql.SqlWindow;
import org.apache.calcite.sql.SqlWith;
import org.apache.calcite.sql.parser.SqlParserPos;
import org.apache.calcite.sql.type.SqlTypeFamily;
import org.apache.calcite.sql.type.SqlTypeName;
import org.apache.calcite.sql.type.SqlTypeUtil;
import org.apache.calcite.sql.validate.IdentifierNamespace;
import org.apache.calcite.sql.validate.SqlNonNullableAccessors;
import org.apache.calcite.sql.validate.SqlValidatorException;
import org.apache.calcite.sql.validate.SqlValidatorNamespace;
import org.apache.calcite.sql.validate.SqlValidatorScope;
import org.apache.calcite.sql.validate.SqlValidatorTable;
import org.apache.calcite.util.Pair;
import org.apache.calcite.util.Static;
import org.apache.calcite.util.Util;
import org.apache.druid.catalog.model.facade.DatasourceFacade;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.common.utils.IdUtils;
import org.apache.druid.error.InvalidSqlInput;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.query.QueryContexts;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.sql.calcite.parser.DruidSqlIngest;
import org.apache.druid.sql.calcite.parser.DruidSqlInsert;
import org.apache.druid.sql.calcite.parser.DruidSqlParserUtils;
@ -64,6 +72,7 @@ import org.checkerframework.checker.nullness.qual.Nullable;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.regex.Pattern;
/**
@ -77,14 +86,6 @@ public class DruidSqlValidator extends BaseDruidSqlValidator
// Copied here from MSQE since that extension is not visible here.
public static final String CTX_ROWS_PER_SEGMENT = "msqRowsPerSegment";
public interface ValidatorContext
{
Map<String, Object> queryContextMap();
CatalogResolver catalog();
String druidSchemaName();
ObjectMapper jsonMapper();
}
private final PlannerContext plannerContext;
protected DruidSqlValidator(
@ -153,6 +154,12 @@ public class DruidSqlValidator extends BaseDruidSqlValidator
super.validateWindow(windowOrId, scope, call);
}
/**
* Most of the implementation here is copied over from {@link org.apache.calcite.sql.validate.SqlValidator#validateInsert(SqlInsert)}.
* We have extended and refactored it, extracted methods to fit our needs, and added comments where appropriate.
*
* @param insert INSERT statement
*/
@Override
public void validateInsert(final SqlInsert insert)
{
@ -173,7 +180,10 @@ public class DruidSqlValidator extends BaseDruidSqlValidator
}
// The target namespace is both the target table ID and the row type for that table.
final SqlValidatorNamespace targetNamespace = getNamespace(insert);
final SqlValidatorNamespace targetNamespace = Objects.requireNonNull(
getNamespace(insert),
() -> "namespace for " + insert
);
final IdentifierNamespace insertNs = (IdentifierNamespace) targetNamespace;
// The target is a new or existing datasource.
final DatasourceTable table = validateInsertTarget(targetNamespace, insertNs, operationName);
@ -226,6 +236,20 @@ public class DruidSqlValidator extends BaseDruidSqlValidator
// Determine the output (target) schema.
final RelDataType targetType = validateTargetType(scope, insertNs, insert, sourceType, tableMetadata);
// The type of a WITH node is computed recursively as the type of its body in
// org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery. If this computed type
// differs from the type validated and stored for the node in memory, a nasty relational
// algebra error will occur in org.apache.calcite.sql2rel.SqlToRelConverter.checkConvertedType.
// During the validateTargetType call above, the validated type of the WITH body node may be
// updated with any coercions applied. We update the validated node type of the WITH node here
// so that the two stay consistent.
if (source instanceof SqlWith) {
final RelDataType withBodyType = getValidatedNodeTypeIfKnown(((SqlWith) source).body);
if (withBodyType != null) {
setValidatedNodeType(source, withBodyType);
}
}
// Set the type for the INSERT/REPLACE node
setValidatedNodeType(insert, targetType);
@ -379,10 +403,11 @@ public class DruidSqlValidator extends BaseDruidSqlValidator
for (final RelDataTypeField sourceField : sourceFields) {
// Check that there are no unnamed columns in the insert.
if (UNNAMED_COLUMN_PATTERN.matcher(sourceField.getName()).matches()) {
throw InvalidSqlInput.exception(
throw buildCalciteContextException(
"Insertion requires columns to be named, but at least one of the columns was unnamed. This is usually "
+ "the result of applying a function without having an AS clause; please ensure that all function calls "
+ "are named with an AS clause as in \"func(X) as myColumn\"."
+ "are named with an AS clause as in \"func(X) as myColumn\".",
getSqlNodeFor(insert, sourceFields.indexOf(sourceField))
);
}
}
@ -424,19 +449,26 @@ public class DruidSqlValidator extends BaseDruidSqlValidator
// extensions which are not loaded. Those details are not known at the time
// of this code so we are not yet in a position to make the right decision.
// This is a task to be revisited when we have more information.
final String sqlTypeName = definedCol.sqlStorageType();
if (sqlTypeName == null) {
if (definedCol.sqlStorageType() == null) {
// Don't know the storage type. Just skip this one: Druid types are
// fluid so let Druid sort out what to store. This is probably not the right
// answer, but should avoid problems until full type system support is completed.
fields.add(Pair.of(colName, sourceField.getType()));
continue;
}
RelDataType relType = typeFactory.createSqlType(SqlTypeName.get(sqlTypeName));
fields.add(Pair.of(
colName,
typeFactory.createTypeWithNullability(relType, true)
));
SqlTypeName sqlTypeName = SqlTypeName.get(definedCol.sqlStorageType());
RelDataType relType = typeFactory.createSqlType(sqlTypeName);
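// When null values are replaced with defaults (-Ddruid.generic.useDefaultValueForNull=true), non-STRING
// catalog columns are added with the type created above as-is; STRING-family columns, and all columns
// when SQL-compatible null handling is enabled, are explicitly declared nullable.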
if (NullHandling.replaceWithDefault() && !SqlTypeFamily.STRING.contains(relType)) {
fields.add(Pair.of(
colName,
relType
));
} else {
fields.add(Pair.of(
colName,
typeFactory.createTypeWithNullability(relType, true)
));
}
}
// Perform the SQL-standard check: that the SELECT column can be
@ -449,6 +481,89 @@ public class DruidSqlValidator extends BaseDruidSqlValidator
return targetType;
}
@Override
protected void checkTypeAssignment(
@Nullable SqlValidatorScope sourceScope,
SqlValidatorTable table,
RelDataType sourceRowType,
RelDataType targetRowType,
final SqlNode query
)
{
final List<RelDataTypeField> sourceFields = sourceRowType.getFieldList();
List<RelDataTypeField> targetFields = targetRowType.getFieldList();
final int sourceCount = sourceFields.size();
for (int i = 0; i < sourceCount; ++i) {
RelDataType sourceFieldRelDataType = sourceFields.get(i).getType();
RelDataType targetFieldRelDataType = targetFields.get(i).getType();
ColumnType sourceFieldColumnType = Calcites.getColumnTypeForRelDataType(sourceFieldRelDataType);
ColumnType targetFieldColumnType = Calcites.getColumnTypeForRelDataType(targetFieldRelDataType);
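// Druid-level assignability check: the assignment is allowed only when the declared target type is
// already the least restrictive of the target/source pair. If the source expression would force the
// declared type to widen (e.g. a VARCHAR target fed by a VARCHAR ARRAY source), raise a
// "not assignable" validation error instead of relying on an implicit cast.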
if (targetFieldColumnType != ColumnType.leastRestrictiveType(targetFieldColumnType, sourceFieldColumnType)) {
SqlNode node = getNthExpr(query, i, sourceCount);
String targetTypeString;
String sourceTypeString;
if (SqlTypeUtil.areCharacterSetsMismatched(
sourceFieldRelDataType,
targetFieldRelDataType)) {
sourceTypeString = sourceFieldRelDataType.getFullTypeString();
targetTypeString = targetFieldRelDataType.getFullTypeString();
} else {
sourceTypeString = sourceFieldRelDataType.toString();
targetTypeString = targetFieldRelDataType.toString();
}
throw newValidationError(node,
Static.RESOURCE.typeNotAssignable(
targetFields.get(i).getName(), targetTypeString,
sourceFields.get(i).getName(), sourceTypeString));
}
}
// The call to the base class implementation will insert implicit casts / coercions where needed.
super.checkTypeAssignment(sourceScope, table, sourceRowType, targetRowType, query);
}
/**
* Locates the n'th expression in an INSERT or UPDATE query.
*
* @param query Query
* @param ordinal Ordinal of expression
* @param sourceCount Number of expressions
* @return Ordinal'th expression, never null
*/
private static SqlNode getNthExpr(SqlNode query, int ordinal, int sourceCount)
{
if (query instanceof SqlInsert) {
SqlInsert insert = (SqlInsert) query;
if (insert.getTargetColumnList() != null) {
return insert.getTargetColumnList().get(ordinal);
} else {
return getNthExpr(
insert.getSource(),
ordinal,
sourceCount);
}
} else if (query instanceof SqlUpdate) {
SqlUpdate update = (SqlUpdate) query;
if (update.getSourceExpressionList() != null) {
return update.getSourceExpressionList().get(ordinal);
} else {
return getNthExpr(
SqlNonNullableAccessors.getSourceSelect(update),
ordinal, sourceCount);
}
} else if (query instanceof SqlSelect) {
SqlSelect select = (SqlSelect) query;
SqlNodeList selectList = SqlNonNullableAccessors.getSelectList(select);
if (selectList.size() == sourceCount) {
return selectList.get(ordinal);
} else {
return query; // give up
}
} else {
return query; // give up
}
}
private boolean isPrecedingOrFollowing(@Nullable SqlNode bound)
{
if (bound == null) {
@ -530,4 +645,17 @@ public class DruidSqlValidator extends BaseDruidSqlValidator
pos.getEndColumnNum()
);
}
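/**
* Best-effort lookup of the source expression for the given column index, used to attach a position to
* the unnamed-column error above; falls back to the whole source node when the source is not a simple
* SELECT or the index is out of range.
*/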
private SqlNode getSqlNodeFor(SqlInsert insert, int idx)
{
SqlNode src = insert.getSource();
if (src instanceof SqlSelect) {
SqlSelect sqlSelect = (SqlSelect) src;
SqlNodeList selectList = sqlSelect.getSelectList();
if (idx < selectList.size()) {
return selectList.get(idx);
}
}
return src;
}
}


@ -175,7 +175,7 @@ public class DatasourceTable extends DruidTable
this.columns = columns;
}
private static Map<String, EffectiveColumnMetadata> toEffectiveColumns(RowSignature rowSignature)
public static Map<String, EffectiveColumnMetadata> toEffectiveColumns(RowSignature rowSignature)
{
Map<String, EffectiveColumnMetadata> columns = new HashMap<>();
for (int i = 0; i < rowSignature.size(); i++) {


@ -0,0 +1,220 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.sql.calcite;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import org.apache.druid.catalog.model.ColumnSpec;
import org.apache.druid.catalog.model.Columns;
import org.apache.druid.catalog.model.ResolvedTable;
import org.apache.druid.catalog.model.TableDefn;
import org.apache.druid.catalog.model.TableSpec;
import org.apache.druid.catalog.model.facade.DatasourceFacade;
import org.apache.druid.catalog.model.table.DatasourceDefn;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.query.TableDataSource;
import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.sql.calcite.planner.CatalogResolver;
import org.apache.druid.sql.calcite.table.DatasourceTable;
import org.apache.druid.sql.calcite.table.DruidTable;
public class CalciteCatalogIngestionDmlTest extends CalciteIngestionDmlTest
{
public ImmutableMap<String, DruidTable> resolvedTables = ImmutableMap.of(
"hourDs", new DatasourceTable(
RowSignature.builder().addTimeColumn().build(),
new DatasourceTable.PhysicalDatasourceMetadata(
new TableDataSource("hourDs"),
RowSignature.builder().addTimeColumn().build(),
false,
false
),
new DatasourceTable.EffectiveMetadata(
new DatasourceFacade(new ResolvedTable(
new TableDefn(
"foo",
DatasourceDefn.TABLE_TYPE,
null,
null
),
new TableSpec(
DatasourceDefn.TABLE_TYPE,
ImmutableMap.of(DatasourceDefn.SEGMENT_GRANULARITY_PROPERTY, "PT1H"),
ImmutableList.of(
new ColumnSpec("__time", Columns.TIME_COLUMN, null)
)
),
MAPPER
)),
DatasourceTable.EffectiveMetadata.toEffectiveColumns(
RowSignature.builder()
.addTimeColumn()
.build()),
false
)
),
"noPartitonedBy", new DatasourceTable(
RowSignature.builder().addTimeColumn().build(),
new DatasourceTable.PhysicalDatasourceMetadata(
new TableDataSource("hourDs"),
RowSignature.builder().addTimeColumn().build(),
false,
false
),
new DatasourceTable.EffectiveMetadata(
new DatasourceFacade(new ResolvedTable(
new TableDefn(
"foo",
DatasourceDefn.TABLE_TYPE,
null,
null
),
new TableSpec(
DatasourceDefn.TABLE_TYPE,
ImmutableMap.of(),
ImmutableList.of(
new ColumnSpec("__time", Columns.TIME_COLUMN, null)
)
),
MAPPER
)),
DatasourceTable.EffectiveMetadata.toEffectiveColumns(
RowSignature.builder()
.addTimeColumn()
.build()),
false
)
),
"strictTableWithNoDefinedSchema", new DatasourceTable(
RowSignature.builder().build(),
new DatasourceTable.PhysicalDatasourceMetadata(
new TableDataSource("strictTableWithNoDefinedSchema"),
RowSignature.builder().build(),
false,
false
),
new DatasourceTable.EffectiveMetadata(
new DatasourceFacade(new ResolvedTable(
new TableDefn(
"strictTableWithNoDefinedSchema",
DatasourceDefn.TABLE_TYPE,
null,
null
),
new TableSpec(DatasourceDefn.TABLE_TYPE, ImmutableMap.of(DatasourceDefn.SEALED_PROPERTY, true), null),
MAPPER
)),
DatasourceTable.EffectiveMetadata.toEffectiveColumns(RowSignature.builder().build()),
false
)
),
"foo", new DatasourceTable(
FOO_TABLE_SIGNATURE,
new DatasourceTable.PhysicalDatasourceMetadata(
new TableDataSource("foo"),
FOO_TABLE_SIGNATURE,
false,
false
),
new DatasourceTable.EffectiveMetadata(
new DatasourceFacade(new ResolvedTable(
new TableDefn(
"foo",
DatasourceDefn.TABLE_TYPE,
null,
null
),
new TableSpec(
DatasourceDefn.TABLE_TYPE,
ImmutableMap.of(),
ImmutableList.of(
new ColumnSpec("__time", Columns.TIME_COLUMN, null),
new ColumnSpec("dim1", Columns.STRING, null),
new ColumnSpec("dim2", Columns.STRING, null),
new ColumnSpec("dim3", Columns.STRING, null),
new ColumnSpec("cnt", Columns.LONG, null),
new ColumnSpec("m1", Columns.FLOAT, null),
new ColumnSpec("m2", Columns.DOUBLE, null)
)
),
MAPPER
)),
DatasourceTable.EffectiveMetadata.toEffectiveColumns(FOO_TABLE_SIGNATURE),
false
)
),
"fooSealed", new DatasourceTable(
FOO_TABLE_SIGNATURE,
new DatasourceTable.PhysicalDatasourceMetadata(
new TableDataSource("foo"),
FOO_TABLE_SIGNATURE,
false,
false
),
new DatasourceTable.EffectiveMetadata(
new DatasourceFacade(new ResolvedTable(
new TableDefn(
"foo",
DatasourceDefn.TABLE_TYPE,
null,
null
),
new TableSpec(
DatasourceDefn.TABLE_TYPE,
ImmutableMap.of(DatasourceDefn.SEALED_PROPERTY, true),
ImmutableList.of(
new ColumnSpec("__time", Columns.TIME_COLUMN, null),
new ColumnSpec("dim1", Columns.STRING, null),
new ColumnSpec("dim2", Columns.STRING, null),
new ColumnSpec("dim3", Columns.STRING, null),
new ColumnSpec("cnt", Columns.LONG, null),
new ColumnSpec("m1", Columns.FLOAT, null),
new ColumnSpec("m2", Columns.DOUBLE, null)
)
),
MAPPER
)),
DatasourceTable.EffectiveMetadata.toEffectiveColumns(FOO_TABLE_SIGNATURE),
false
)
)
);
private static final ObjectMapper MAPPER = new DefaultObjectMapper();
@Override
public CatalogResolver createCatalogResolver()
{
return new CatalogResolver.NullCatalogResolver() {
@Override
public DruidTable resolveDatasource(
final String tableName,
final DatasourceTable.PhysicalDatasourceMetadata dsMetadata
)
{
if (resolvedTables.get(tableName) != null) {
return resolvedTables.get(tableName);
}
return dsMetadata == null ? null : new DatasourceTable(dsMetadata);
}
};
}
}


@ -0,0 +1,320 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.sql.calcite;
import com.google.common.collect.ImmutableList;
import org.apache.druid.data.input.impl.CsvInputFormat;
import org.apache.druid.data.input.impl.InlineInputSource;
import org.apache.druid.error.DruidException;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.sql.calcite.external.ExternalDataSource;
import org.apache.druid.sql.calcite.external.Externals;
import org.apache.druid.sql.calcite.filtration.Filtration;
import org.apache.druid.sql.calcite.util.CalciteTests;
import org.junit.jupiter.api.Test;
/**
* Test for INSERT DML statements for tables defined in catalog.
*/
public class CalciteCatalogInsertTest extends CalciteCatalogIngestionDmlTest
{
/**
* If the segment grain is given in the catalog and absent in the PARTITIONED BY clause in the query, then use the
* value from the catalog.
*/
@Test
public void testInsertHourGrainPartitonedByFromCatalog()
{
testIngestionQuery()
.sql("INSERT INTO hourDs\n" +
"SELECT * FROM foo")
.authentication(CalciteTests.SUPER_USER_AUTH_RESULT)
.expectTarget("hourDs", FOO_TABLE_SIGNATURE)
.expectResources(dataSourceWrite("hourDs"), dataSourceRead("foo"))
.expectQuery(
newScanQueryBuilder()
.dataSource("foo")
.intervals(querySegmentSpec(Filtration.eternity()))
// Scan query lists columns in alphabetical order independent of the
// SQL project list or the defined schema.
.columns("__time", "cnt", "dim1", "dim2", "dim3", "m1", "m2", "unique_dim1")
.context(queryContextWithGranularity(Granularities.HOUR))
.build()
)
.verify();
}
/**
* If the segment grain is given in the catalog, and also by PARTITIONED BY, then
* the query value is used.
*/
@Test
public void testInsertHourGrainWithDayPartitonedByFromQuery()
{
testIngestionQuery()
.sql("INSERT INTO hourDs\n" +
"SELECT * FROM foo\n" +
"PARTITIONED BY day")
.authentication(CalciteTests.SUPER_USER_AUTH_RESULT)
.expectTarget("hourDs", FOO_TABLE_SIGNATURE)
.expectResources(dataSourceWrite("hourDs"), dataSourceRead("foo"))
.expectQuery(
newScanQueryBuilder()
.dataSource("foo")
.intervals(querySegmentSpec(Filtration.eternity()))
// Scan query lists columns in alphabetical order independent of the
// SQL project list or the defined schema.
.columns("__time", "cnt", "dim1", "dim2", "dim3", "m1", "m2", "unique_dim1")
.context(queryContextWithGranularity(Granularities.DAY))
.build()
)
.verify();
}
/**
* If the segment grain is absent in the catalog and absent in the PARTITIONED BY clause in the query, then
* a validation error is raised.
*/
@Test
public void testInsertNoPartitonedByFromCatalog()
{
testIngestionQuery()
.sql("INSERT INTO noPartitonedBy\n" +
"SELECT * FROM foo")
.authentication(CalciteTests.SUPER_USER_AUTH_RESULT)
.expectValidationError(
DruidException.class,
"Operation [INSERT] requires a PARTITIONED BY to be explicitly defined, but none was found."
)
.verify();
}
/**
* If the segment grain is absent in the catalog, but given by PARTITIONED BY, then
* the query value is used.
*/
@Test
public void testInsertNoPartitonedByWithDayPartitonedByFromQuery()
{
testIngestionQuery()
.sql("INSERT INTO noPartitonedBy\n" +
"SELECT * FROM foo\n" +
"PARTITIONED BY day")
.authentication(CalciteTests.SUPER_USER_AUTH_RESULT)
.expectTarget("noPartitonedBy", FOO_TABLE_SIGNATURE)
.expectResources(dataSourceWrite("noPartitonedBy"), dataSourceRead("foo"))
.expectQuery(
newScanQueryBuilder()
.dataSource("foo")
.intervals(querySegmentSpec(Filtration.eternity()))
// Scan query lists columns in alphabetical order independent of the
// SQL project list or the defined schema.
.columns("__time", "cnt", "dim1", "dim2", "dim3", "m1", "m2", "unique_dim1")
.context(queryContextWithGranularity(Granularities.DAY))
.build()
)
.verify();
}
/**
* Adding a new column during ingestion that is not defined in a non-sealed table should succeed.
*/
@Test
public void testInsertAddNonDefinedColumnIntoNonSealedCatalogTable()
{
ExternalDataSource externalDataSource = new ExternalDataSource(
new InlineInputSource("2022-12-26T12:34:56,extra,10,\"20\",foo\n"),
new CsvInputFormat(ImmutableList.of("a", "b", "c", "d", "e"), null, false, false, 0),
RowSignature.builder()
.add("a", ColumnType.STRING)
.add("b", ColumnType.STRING)
.add("c", ColumnType.LONG)
.add("d", ColumnType.STRING)
.add("e", ColumnType.STRING)
.build()
);
final RowSignature signature = RowSignature.builder()
.add("__time", ColumnType.LONG)
.add("dim1", ColumnType.STRING)
.add("cnt", ColumnType.LONG)
.add("m2", ColumnType.DOUBLE)
.add("extra2", ColumnType.LONG)
.add("extra3", ColumnType.STRING)
.build();
testIngestionQuery()
.sql("INSERT INTO foo\n" +
"SELECT\n" +
" TIME_PARSE(a) AS __time,\n" +
" b AS dim1,\n" +
" 1 AS cnt,\n" +
" c AS m2,\n" +
" CAST(d AS BIGINT) AS extra2,\n" +
" e AS extra3\n" +
"FROM TABLE(inline(\n" +
" data => ARRAY['2022-12-26T12:34:56,extra,10,\"20\",foo'],\n" +
" format => 'csv'))\n" +
" (a VARCHAR, b VARCHAR, c BIGINT, d VARCHAR, e VARCHAR)\n" +
"PARTITIONED BY ALL TIME")
.authentication(CalciteTests.SUPER_USER_AUTH_RESULT)
.expectTarget("foo", signature)
.expectResources(dataSourceWrite("foo"), Externals.externalRead("EXTERNAL"))
.expectQuery(
newScanQueryBuilder()
.dataSource(externalDataSource)
.intervals(querySegmentSpec(Filtration.eternity()))
.virtualColumns(
expressionVirtualColumn("v0", "timestamp_parse(\"a\",null,'UTC')", ColumnType.LONG),
expressionVirtualColumn("v1", "1", ColumnType.LONG),
expressionVirtualColumn("v2", "CAST(\"c\", 'DOUBLE')", ColumnType.DOUBLE),
expressionVirtualColumn("v3", "CAST(\"d\", 'LONG')", ColumnType.LONG)
)
// Scan query lists columns in alphabetical order independent of the
// SQL project list or the defined schema.
.columns("b", "e", "v0", "v1", "v2", "v3")
.context(CalciteIngestionDmlTest.PARTITIONED_BY_ALL_TIME_QUERY_CONTEXT)
.build()
)
.verify();
}
/**
* Adding a new column during ingestion that is not defined in a sealed table should fail with
* proper validation error.
*/
@Test
public void testInsertAddNonDefinedColumnIntoSealedCatalogTable()
{
testIngestionQuery()
.sql("INSERT INTO fooSealed\n" +
"SELECT\n" +
" TIME_PARSE(a) AS __time,\n" +
" b AS dim1,\n" +
" 1 AS cnt,\n" +
" c AS m2,\n" +
" CAST(d AS BIGINT) AS extra2\n" +
"FROM TABLE(inline(\n" +
" data => ARRAY['2022-12-26T12:34:56,extra,10,\"20\",foo'],\n" +
" format => 'csv'))\n" +
" (a VARCHAR, b VARCHAR, c BIGINT, d VARCHAR, e VARCHAR)\n" +
"PARTITIONED BY ALL TIME")
.authentication(CalciteTests.SUPER_USER_AUTH_RESULT)
.expectValidationError(
DruidException.class,
"Column [extra2] is not defined in the target table [druid.fooSealed] strict schema"
)
.verify();
}
/**
* Inserting into a catalog table with a WITH source succeeds
*/
@Test
public void testInsertWithSourceIntoCatalogTable()
{
ExternalDataSource externalDataSource = new ExternalDataSource(
new InlineInputSource("2022-12-26T12:34:56,extra,10,\"20\",foo\n"),
new CsvInputFormat(ImmutableList.of("a", "b", "c", "d", "e"), null, false, false, 0),
RowSignature.builder()
.add("a", ColumnType.STRING)
.add("b", ColumnType.STRING)
.add("c", ColumnType.LONG)
.add("d", ColumnType.STRING)
.add("e", ColumnType.STRING)
.build()
);
final RowSignature signature = RowSignature.builder()
.add("__time", ColumnType.LONG)
.add("dim1", ColumnType.STRING)
.add("cnt", ColumnType.LONG)
.add("m2", ColumnType.DOUBLE)
.add("extra2", ColumnType.LONG)
.add("extra3", ColumnType.STRING)
.build();
testIngestionQuery()
.sql("INSERT INTO \"foo\"\n" +
"WITH \"ext\" AS (\n" +
" SELECT *\n" +
"FROM TABLE(inline(\n" +
" data => ARRAY['2022-12-26T12:34:56,extra,10,\"20\",foo'],\n" +
" format => 'csv'))\n" +
" (a VARCHAR, b VARCHAR, c BIGINT, d VARCHAR, e VARCHAR)\n" +
")\n" +
"SELECT\n" +
" TIME_PARSE(a) AS __time,\n" +
" b AS dim1,\n" +
" 1 AS cnt,\n" +
" c AS m2,\n" +
" CAST(d AS BIGINT) AS extra2,\n" +
" e AS extra3\n" +
"FROM \"ext\"\n" +
"PARTITIONED BY ALL TIME")
.authentication(CalciteTests.SUPER_USER_AUTH_RESULT)
.expectTarget("foo", signature)
.expectResources(dataSourceWrite("foo"), Externals.externalRead("EXTERNAL"))
.expectQuery(
newScanQueryBuilder()
.dataSource(externalDataSource)
.intervals(querySegmentSpec(Filtration.eternity()))
.virtualColumns(
expressionVirtualColumn("v0", "timestamp_parse(\"a\",null,'UTC')", ColumnType.LONG),
expressionVirtualColumn("v1", "1", ColumnType.LONG),
expressionVirtualColumn("v2", "CAST(\"c\", 'DOUBLE')", ColumnType.DOUBLE),
expressionVirtualColumn("v3", "CAST(\"d\", 'LONG')", ColumnType.LONG)
)
// Scan query lists columns in alphabetical order independent of the
// SQL project list or the defined schema.
.columns("b", "e", "v0", "v1", "v2", "v3")
.context(CalciteIngestionDmlTest.PARTITIONED_BY_ALL_TIME_QUERY_CONTEXT)
.build()
)
.verify();
}
@Test
public void testInsertIntoExistingStrictNoDefinedSchema()
{
testIngestionQuery()
.sql("INSERT INTO strictTableWithNoDefinedSchema SELECT __time AS __time FROM foo PARTITIONED BY ALL TIME")
.expectValidationError(
DruidException.class,
"Column [__time] is not defined in the target table [druid.strictTableWithNoDefinedSchema] strict schema")
.verify();
}
@Test
public void testInsertIntoExistingWithIncompatibleTypeAssignment()
{
testIngestionQuery()
.sql("INSERT INTO foo\n"
+ "SELECT\n"
+ " __time AS __time,\n"
+ " ARRAY[dim1] AS dim1\n"
+ "FROM foo\n"
+ "PARTITIONED BY ALL TIME")
.expectValidationError(
DruidException.class,
"Cannot assign to target field 'dim1' of type VARCHAR from source field 'dim1' of type VARCHAR ARRAY (line [4], column [3])")
.verify();
}
}


@ -0,0 +1,316 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.sql.calcite;
import com.google.common.collect.ImmutableList;
import org.apache.druid.data.input.impl.CsvInputFormat;
import org.apache.druid.data.input.impl.InlineInputSource;
import org.apache.druid.error.DruidException;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.sql.calcite.external.ExternalDataSource;
import org.apache.druid.sql.calcite.external.Externals;
import org.apache.druid.sql.calcite.filtration.Filtration;
import org.apache.druid.sql.calcite.util.CalciteTests;
import org.junit.jupiter.api.Test;
/**
* Test for REPLACE DML statements for tables defined in catalog.
*/
public class CalciteCatalogReplaceTest extends CalciteCatalogIngestionDmlTest
{
/**
* If the segment grain is given in the catalog and absent in the PARTITIONED BY clause in the query, then use the
* value from the catalog.
*/
@Test
public void testReplaceHourGrainPartitonedByFromCatalog()
{
testIngestionQuery()
.sql("REPLACE INTO hourDs OVERWRITE ALL\n" +
"SELECT * FROM foo")
.authentication(CalciteTests.SUPER_USER_AUTH_RESULT)
.expectTarget("hourDs", FOO_TABLE_SIGNATURE)
.expectResources(dataSourceWrite("hourDs"), dataSourceRead("foo"))
.expectQuery(
newScanQueryBuilder()
.dataSource("foo")
.intervals(querySegmentSpec(Filtration.eternity()))
.columns("__time", "cnt", "dim1", "dim2", "dim3", "m1", "m2", "unique_dim1")
.context(queryContextWithGranularity(Granularities.HOUR))
.build()
)
.verify();
}
/**
* If the segment grain is given in the catalog, and also by PARTITIONED BY, then
* the query value is used.
*/
@Test
public void testReplaceHourGrainWithDayPartitonedByFromQuery()
{
testIngestionQuery()
.sql("REPLACE INTO hourDs OVERWRITE ALL\n" +
"SELECT * FROM foo\n" +
"PARTITIONED BY day")
.authentication(CalciteTests.SUPER_USER_AUTH_RESULT)
.expectTarget("hourDs", FOO_TABLE_SIGNATURE)
.expectResources(dataSourceWrite("hourDs"), dataSourceRead("foo"))
.expectQuery(
newScanQueryBuilder()
.dataSource("foo")
.intervals(querySegmentSpec(Filtration.eternity()))
.columns("__time", "cnt", "dim1", "dim2", "dim3", "m1", "m2", "unique_dim1")
.context(queryContextWithGranularity(Granularities.DAY))
.build()
)
.verify();
}
/**
* If the segment grain is absent in the catalog and absent in the PARTITIONED BY clause in the query, then
* a validation error is raised.
*/
@Test
public void testInsertNoPartitonedByFromCatalog()
{
testIngestionQuery()
.sql("REPLACE INTO noPartitonedBy OVERWRITE ALL\n" +
"SELECT * FROM foo")
.authentication(CalciteTests.SUPER_USER_AUTH_RESULT)
.expectValidationError(
DruidException.class,
"Operation [REPLACE] requires a PARTITIONED BY to be explicitly defined, but none was found."
)
.verify();
}
/**
* If the segment grain is absent in the catalog, but given by PARTITIONED BY, then
* the query value is used.
*/
@Test
public void testInsertNoPartitonedByWithDayPartitonedByFromQuery()
{
testIngestionQuery()
.sql("REPLACE INTO noPartitonedBy OVERWRITE ALL\n" +
"SELECT * FROM foo\n" +
"PARTITIONED BY day")
.authentication(CalciteTests.SUPER_USER_AUTH_RESULT)
.expectTarget("noPartitonedBy", FOO_TABLE_SIGNATURE)
.expectResources(dataSourceWrite("noPartitonedBy"), dataSourceRead("foo"))
.expectQuery(
newScanQueryBuilder()
.dataSource("foo")
.intervals(querySegmentSpec(Filtration.eternity()))
.columns("__time", "cnt", "dim1", "dim2", "dim3", "m1", "m2", "unique_dim1")
.context(queryContextWithGranularity(Granularities.DAY))
.build()
)
.verify();
}
/**
* Adding a new column during ingestion that is not defined in a non-sealed table should succeed.
*/
@Test
public void testReplaceAddNonDefinedColumnIntoNonSealedCatalogTable()
{
ExternalDataSource externalDataSource = new ExternalDataSource(
new InlineInputSource("2022-12-26T12:34:56,extra,10,\"20\",foo\n"),
new CsvInputFormat(ImmutableList.of("a", "b", "c", "d", "e"), null, false, false, 0),
RowSignature.builder()
.add("a", ColumnType.STRING)
.add("b", ColumnType.STRING)
.add("c", ColumnType.LONG)
.add("d", ColumnType.STRING)
.add("e", ColumnType.STRING)
.build()
);
final RowSignature signature = RowSignature.builder()
.add("__time", ColumnType.LONG)
.add("dim1", ColumnType.STRING)
.add("cnt", ColumnType.LONG)
.add("m2", ColumnType.DOUBLE)
.add("extra2", ColumnType.LONG)
.add("extra3", ColumnType.STRING)
.build();
testIngestionQuery()
.sql("REPLACE INTO foo OVERWRITE ALL\n" +
"SELECT\n" +
" TIME_PARSE(a) AS __time,\n" +
" b AS dim1,\n" +
" 1 AS cnt,\n" +
" c AS m2,\n" +
" CAST(d AS BIGINT) AS extra2,\n" +
" e AS extra3\n" +
"FROM TABLE(inline(\n" +
" data => ARRAY['2022-12-26T12:34:56,extra,10,\"20\",foo'],\n" +
" format => 'csv'))\n" +
" (a VARCHAR, b VARCHAR, c BIGINT, d VARCHAR, e VARCHAR)\n" +
"PARTITIONED BY ALL TIME")
.authentication(CalciteTests.SUPER_USER_AUTH_RESULT)
.expectTarget("foo", signature)
.expectResources(dataSourceWrite("foo"), Externals.externalRead("EXTERNAL"))
.expectQuery(
newScanQueryBuilder()
.dataSource(externalDataSource)
.intervals(querySegmentSpec(Filtration.eternity()))
.virtualColumns(
expressionVirtualColumn("v0", "timestamp_parse(\"a\",null,'UTC')", ColumnType.LONG),
expressionVirtualColumn("v1", "1", ColumnType.LONG),
expressionVirtualColumn("v2", "CAST(\"c\", 'DOUBLE')", ColumnType.DOUBLE),
expressionVirtualColumn("v3", "CAST(\"d\", 'LONG')", ColumnType.LONG)
)
// Scan query lists columns in alphabetical order independent of the
// SQL project list or the defined schema. Here we just check that the
// set of columns is correct, but not their order.
.columns("b", "e", "v0", "v1", "v2", "v3")
.context(CalciteIngestionDmlTest.PARTITIONED_BY_ALL_TIME_QUERY_CONTEXT)
.build()
)
.verify();
}
/**
* Adding a new column during ingestion that is not defined in a sealed table should fail with
* proper validation error.
*/
@Test
public void testReplaceAddNonDefinedColumnIntoSealedCatalogTable()
{
testIngestionQuery()
.sql("REPLACE INTO fooSealed OVERWRITE ALL\n" +
"SELECT\n" +
" TIME_PARSE(a) AS __time,\n" +
" b AS dim1,\n" +
" 1 AS cnt,\n" +
" c AS m2,\n" +
" CAST(d AS BIGINT) AS extra2\n" +
"FROM TABLE(inline(\n" +
" data => ARRAY['2022-12-26T12:34:56,extra,10,\"20\",foo'],\n" +
" format => 'csv'))\n" +
" (a VARCHAR, b VARCHAR, c BIGINT, d VARCHAR, e VARCHAR)\n" +
"PARTITIONED BY ALL TIME")
.authentication(CalciteTests.SUPER_USER_AUTH_RESULT)
.expectValidationError(
DruidException.class,
"Column [extra2] is not defined in the target table [druid.fooSealed] strict schema"
)
.verify();
}
/**
* Replacing into a catalog table with a WITH source succeeds
*/
@Test
public void testReplaceWithSourceIntoCatalogTable()
{
ExternalDataSource externalDataSource = new ExternalDataSource(
new InlineInputSource("2022-12-26T12:34:56,extra,10,\"20\",foo\n"),
new CsvInputFormat(ImmutableList.of("a", "b", "c", "d", "e"), null, false, false, 0),
RowSignature.builder()
.add("a", ColumnType.STRING)
.add("b", ColumnType.STRING)
.add("c", ColumnType.LONG)
.add("d", ColumnType.STRING)
.add("e", ColumnType.STRING)
.build()
);
final RowSignature signature = RowSignature.builder()
.add("__time", ColumnType.LONG)
.add("dim1", ColumnType.STRING)
.add("cnt", ColumnType.LONG)
.add("m2", ColumnType.DOUBLE)
.add("extra2", ColumnType.LONG)
.add("extra3", ColumnType.STRING)
.build();
testIngestionQuery()
.sql("REPLACE INTO \"foo\" OVERWRITE ALL\n" +
"WITH \"ext\" AS (\n" +
" SELECT *\n" +
"FROM TABLE(inline(\n" +
" data => ARRAY['2022-12-26T12:34:56,extra,10,\"20\",foo'],\n" +
" format => 'csv'))\n" +
" (a VARCHAR, b VARCHAR, c BIGINT, d VARCHAR, e VARCHAR)\n" +
")\n" +
"SELECT\n" +
" TIME_PARSE(a) AS __time,\n" +
" b AS dim1,\n" +
" 1 AS cnt,\n" +
" c AS m2,\n" +
" CAST(d AS BIGINT) AS extra2,\n" +
" e AS extra3\n" +
"FROM \"ext\"\n" +
"PARTITIONED BY ALL TIME")
.authentication(CalciteTests.SUPER_USER_AUTH_RESULT)
.expectTarget("foo", signature)
.expectResources(dataSourceWrite("foo"), Externals.externalRead("EXTERNAL"))
.expectQuery(
newScanQueryBuilder()
.dataSource(externalDataSource)
.intervals(querySegmentSpec(Filtration.eternity()))
.virtualColumns(
expressionVirtualColumn("v0", "timestamp_parse(\"a\",null,'UTC')", ColumnType.LONG),
expressionVirtualColumn("v1", "1", ColumnType.LONG),
expressionVirtualColumn("v2", "CAST(\"c\", 'DOUBLE')", ColumnType.DOUBLE),
expressionVirtualColumn("v3", "CAST(\"d\", 'LONG')", ColumnType.LONG)
)
// Scan query lists columns in alphabetical order independent of the
// SQL project list or the defined schema. Here we just check that the
// set of columns is correct, but not their order.
.columns("b", "e", "v0", "v1", "v2", "v3")
.context(CalciteIngestionDmlTest.PARTITIONED_BY_ALL_TIME_QUERY_CONTEXT)
.build()
)
.verify();
}
@Test
public void testReplaceIntoExistingStrictNoDefinedSchema()
{
testIngestionQuery()
.sql("REPLACE INTO strictTableWithNoDefinedSchema OVERWRITE ALL SELECT __time AS __time FROM foo PARTITIONED BY ALL TIME")
.expectValidationError(
DruidException.class,
"Column [__time] is not defined in the target table [druid.strictTableWithNoDefinedSchema] strict schema")
.verify();
}
@Test
public void testReplaceIntoExistingWithIncompatibleTypeAssignment()
{
testIngestionQuery()
.sql("REPLACE INTO foo OVERWRITE ALL\n"
+ "SELECT\n"
+ " __time AS __time,\n"
+ " ARRAY[dim1] AS dim1\n"
+ "FROM foo\n"
+ "PARTITIONED BY ALL TIME")
.expectValidationError(
DruidException.class,
"Cannot assign to target field 'dim1' of type VARCHAR from source field 'dim1' of type VARCHAR ARRAY (line [4], column [3])")
.verify();
}
}


@ -1644,6 +1644,15 @@ public class CalciteInsertDmlTest extends CalciteIngestionDmlTest
.verify();
}
@Test
public void testInsertWithInvalidColumnName2InIngest()
{
testIngestionQuery()
.sql("INSERT INTO t SELECT __time, 1+1 FROM foo PARTITIONED BY ALL")
.expectValidationError(invalidSqlContains("Insertion requires columns to be named"))
.verify();
}
@Test
public void testInsertWithUnnamedColumnInNestedSelectStatement()
{


@ -0,0 +1,73 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.sql.calcite;
import org.apache.druid.error.DruidException;
import org.apache.druid.sql.calcite.filtration.Filtration;
import org.apache.druid.sql.calcite.planner.CatalogResolver;
import org.apache.druid.sql.calcite.planner.CatalogResolver.NullCatalogResolver;
import org.junit.jupiter.api.Test;
/**
* Test for the "strict" feature of the catalog which can restrict INSERT statements
* to only work with existing datasources. The strict option is a config option which
* we enable only for this one test.
*/
public class CalciteStrictInsertTest extends CalciteIngestionDmlTest
{
@Override
public CatalogResolver createCatalogResolver()
{
return new NullCatalogResolver() {
@Override
public boolean ingestRequiresExistingTable()
{
return true;
}
};
}
@Test
public void testInsertIntoNewTable()
{
testIngestionQuery()
.sql("INSERT INTO dst SELECT * FROM foo PARTITIONED BY ALL TIME")
.expectValidationError(DruidException.class, "Cannot INSERT into [dst] because it does not exist")
.verify();
}
@Test
public void testInsertIntoExisting()
{
testIngestionQuery()
.sql("INSERT INTO druid.numfoo SELECT * FROM foo PARTITIONED BY ALL TIME")
.expectTarget("numfoo", FOO_TABLE_SIGNATURE)
.expectResources(dataSourceRead("foo"), dataSourceWrite("numfoo"))
.expectQuery(
newScanQueryBuilder()
.dataSource("foo")
.intervals(querySegmentSpec(Filtration.eternity()))
.columns("__time", "cnt", "dim1", "dim2", "dim3", "m1", "m2", "unique_dim1")
.context(PARTITIONED_BY_ALL_TIME_QUERY_CONTEXT)
.build()
)
.verify();
}
}