Move INSERT & REPLACE validation to the Calcite validator (#15908)

This PR contains a portion of the changes from @paul-rogers's inactive draft PR for integrating the catalog with the Calcite planner, https://github.com/apache/druid/pull/13686. It refactors IngestHandler and its subclasses to produce a validated SqlInsert node instead of the previously used Insert source node; the SqlInsert node is then validated in the Calcite validator. The validation implemented as part of this PR covers only the source node, plus some of the validation that was previously done in the ingest handlers. As part of this change, the PARTITIONED BY clause can be supplied by the table's catalog metadata, if it exists, and can then be omitted from the ingest-time query, as sketched below.
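A minimal sketch of the new behavior, based on the new CatalogIngestionTest added in this PR: assuming a catalog entry for a datasource hourDs with segment granularity PT1H, both statements below are accepted. The first takes HOUR partitioning from the catalog; the second overrides it with an explicit PARTITIONED BY.

-- Partitioning comes from the catalog metadata for hourDs (PT1H, i.e. HOUR).
INSERT INTO hourDs
SELECT * FROM foo

-- An explicit PARTITIONED BY still takes precedence over the catalog value.
INSERT INTO hourDs
SELECT * FROM foo
PARTITIONED BY day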
zachjsh 2024-02-22 14:01:59 -05:00 committed by GitHub
parent f37d019fe6
commit 8ebf237576
16 changed files with 876 additions and 117 deletions

View File

@ -0,0 +1,174 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.catalog.sql;
import org.apache.druid.catalog.CatalogException;
import org.apache.druid.catalog.model.Columns;
import org.apache.druid.catalog.model.TableMetadata;
import org.apache.druid.catalog.model.table.TableBuilder;
import org.apache.druid.catalog.storage.CatalogStorage;
import org.apache.druid.catalog.storage.CatalogTests;
import org.apache.druid.catalog.sync.CachedMetadataCatalog;
import org.apache.druid.catalog.sync.MetadataCatalog;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.metadata.TestDerbyConnector;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.sql.calcite.CalciteIngestionDmlTest;
import org.apache.druid.sql.calcite.filtration.Filtration;
import org.apache.druid.sql.calcite.planner.CatalogResolver;
import org.apache.druid.sql.calcite.util.CalciteTests;
import org.apache.druid.sql.calcite.util.SqlTestFramework;
import org.junit.ClassRule;
import org.junit.Test;
import java.util.Arrays;
import static org.junit.Assert.fail;
/**
* Test the use of catalog specs to drive MSQ ingestion.
*/
public class CatalogIngestionTest extends CalciteIngestionDmlTest
{
@ClassRule
public static final TestDerbyConnector.DerbyConnectorRule DERBY_CONNECTION_RULE =
new TestDerbyConnector.DerbyConnectorRule();
/**
* Signature for the foo datasource after applying catalog metadata.
*/
private static final RowSignature FOO_SIGNATURE = RowSignature.builder()
.add("__time", ColumnType.LONG)
.add("extra1", ColumnType.STRING)
.add("dim2", ColumnType.STRING)
.add("dim1", ColumnType.STRING)
.add("cnt", ColumnType.LONG)
.add("m1", ColumnType.DOUBLE)
.add("extra2", ColumnType.LONG)
.add("extra3", ColumnType.STRING)
.add("m2", ColumnType.DOUBLE)
.build();
private static CatalogStorage storage;
@Override
public CatalogResolver createCatalogResolver()
{
CatalogTests.DbFixture dbFixture = new CatalogTests.DbFixture(DERBY_CONNECTION_RULE);
storage = dbFixture.storage;
MetadataCatalog catalog = new CachedMetadataCatalog(
storage,
storage.schemaRegistry(),
storage.jsonMapper()
);
return new LiveCatalogResolver(catalog);
}
@Override
public void finalizeTestFramework(SqlTestFramework sqlTestFramework)
{
super.finalizeTestFramework(sqlTestFramework);
buildTargetDatasources();
buildFooDatasource();
}
private void buildTargetDatasources()
{
TableMetadata spec = TableBuilder.datasource("hourDs", "PT1H")
.build();
createTableMetadata(spec);
}
public void buildFooDatasource()
{
TableMetadata spec = TableBuilder.datasource("foo", "ALL")
.timeColumn()
.column("extra1", null)
.column("dim2", null)
.column("dim1", null)
.column("cnt", null)
.column("m1", Columns.DOUBLE)
.column("extra2", Columns.LONG)
.column("extra3", Columns.STRING)
.hiddenColumns(Arrays.asList("dim3", "unique_dim1"))
.sealed(true)
.build();
createTableMetadata(spec);
}
private void createTableMetadata(TableMetadata table)
{
try {
storage.tables().create(table);
}
catch (CatalogException e) {
fail(e.getMessage());
}
}
/**
* If the segment grain is given in the catalog, then that value is used.
*/
@Test
public void testInsertHourGrain()
{
testIngestionQuery()
.sql("INSERT INTO hourDs\n" +
"SELECT * FROM foo")
.authentication(CalciteTests.SUPER_USER_AUTH_RESULT)
.expectTarget("hourDs", FOO_SIGNATURE)
.expectResources(dataSourceWrite("hourDs"), dataSourceRead("foo"))
.expectQuery(
newScanQueryBuilder()
.dataSource("foo")
.intervals(querySegmentSpec(Filtration.eternity()))
.columns("__time", "cnt", "dim1", "dim2", "extra1", "extra2", "extra3", "m1", "m2")
.context(queryContextWithGranularity(Granularities.HOUR))
.build()
)
.verify();
}
/**
* If the segment grain is given in the catalog, and also by PARTITIONED BY, then
* the query value is used.
*/
@Test
public void testInsertHourGrainWithDay()
{
testIngestionQuery()
.sql("INSERT INTO hourDs\n" +
"SELECT * FROM foo\n" +
"PARTITIONED BY day")
.authentication(CalciteTests.SUPER_USER_AUTH_RESULT)
.expectTarget("hourDs", FOO_SIGNATURE)
.expectResources(dataSourceWrite("hourDs"), dataSourceRead("foo"))
.expectQuery(
newScanQueryBuilder()
.dataSource("foo")
.intervals(querySegmentSpec(Filtration.eternity()))
.columns("__time", "cnt", "dim1", "dim2", "extra1", "extra2", "extra3", "m1", "m2")
.context(queryContextWithGranularity(Granularities.DAY))
.build()
)
.verify();
}
}

View File

@ -162,6 +162,9 @@ import org.apache.druid.sql.calcite.export.TestExportStorageConnector;
import org.apache.druid.sql.calcite.export.TestExportStorageConnectorProvider;
import org.apache.druid.sql.calcite.external.ExternalDataSource;
import org.apache.druid.sql.calcite.external.ExternalOperatorConversion;
import org.apache.druid.sql.calcite.external.HttpOperatorConversion;
import org.apache.druid.sql.calcite.external.InlineOperatorConversion;
import org.apache.druid.sql.calcite.external.LocalOperatorConversion;
import org.apache.druid.sql.calcite.planner.CalciteRulesManager;
import org.apache.druid.sql.calcite.planner.CatalogResolver;
import org.apache.druid.sql.calcite.planner.PlannerConfig;
@ -351,6 +354,9 @@ public class MSQTestBase extends BaseCalciteQueryTest
binder.install(new NestedDataModule());
NestedDataModule.registerHandlersAndSerde();
SqlBindings.addOperatorConversion(binder, ExternalOperatorConversion.class);
SqlBindings.addOperatorConversion(binder, HttpOperatorConversion.class);
SqlBindings.addOperatorConversion(binder, InlineOperatorConversion.class);
SqlBindings.addOperatorConversion(binder, LocalOperatorConversion.class);
}
@Override

View File

@ -107,14 +107,14 @@ SqlTypeNameSpec DruidType() :
}
// Parses the supported file formats for export.
String FileFormat() :
SqlIdentifier FileFormat() :
{
SqlNode format;
}
{
format = SimpleIdentifier()
{
return format.toString();
return (SqlIdentifier) format;
}
}

View File

@ -35,7 +35,7 @@ SqlNode DruidSqlInsertEof() :
final Pair<SqlNodeList, SqlNodeList> p;
SqlGranularityLiteral partitionedBy = null;
SqlNodeList clusteredBy = null;
String exportFileFormat = null;
SqlIdentifier exportFileFormat = null;
}
{
(

View File

@ -30,7 +30,7 @@ SqlNode DruidSqlReplaceEof() :
SqlNodeList clusteredBy = null;
final Pair<SqlNodeList, SqlNodeList> p;
SqlNode replaceTimeQuery = null;
String exportFileFormat = null;
SqlIdentifier exportFileFormat = null;
}
{
<REPLACE> { s = span(); }
@ -90,7 +90,7 @@ SqlNode DruidSqlReplaceEof() :
<EOF>
{
sqlInsert = new SqlInsert(s.end(source), SqlNodeList.EMPTY, destination, source, columnList);
return DruidSqlReplace.create(sqlInsert, partitionedBy, clusteredBy, replaceTimeQuery, exportFileFormat);
return DruidSqlReplace.create(sqlInsert, partitionedBy, clusteredBy, exportFileFormat, replaceTimeQuery);
}
}

View File

@ -19,6 +19,7 @@
package org.apache.druid.sql.calcite.parser;
import org.apache.calcite.sql.SqlIdentifier;
import org.apache.calcite.sql.SqlInsert;
import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.sql.SqlNodeList;
@ -43,7 +44,7 @@ public abstract class DruidSqlIngest extends SqlInsert
@Nullable
protected final SqlNodeList clusteredBy;
@Nullable
private final String exportFileFormat;
private final SqlIdentifier exportFileFormat;
public DruidSqlIngest(
SqlParserPos pos,
@ -53,7 +54,7 @@ public abstract class DruidSqlIngest extends SqlInsert
SqlNodeList columnList,
@Nullable SqlGranularityLiteral partitionedBy,
@Nullable SqlNodeList clusteredBy,
@Nullable String exportFileFormat
@Nullable SqlIdentifier exportFileFormat
)
{
super(pos, keywords, targetTable, source, columnList);
@ -76,7 +77,7 @@ public abstract class DruidSqlIngest extends SqlInsert
}
@Nullable
public String getExportFileFormat()
public SqlIdentifier getExportFileFormat()
{
return exportFileFormat;
}
@ -88,6 +89,7 @@ public abstract class DruidSqlIngest extends SqlInsert
.addAll(super.getOperandList())
.add(partitionedBy)
.add(clusteredBy)
.add(exportFileFormat)
.build();
}
}

View File

@ -19,12 +19,14 @@
package org.apache.druid.sql.calcite.parser;
import org.apache.calcite.sql.SqlIdentifier;
import org.apache.calcite.sql.SqlInsert;
import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.sql.SqlNodeList;
import org.apache.calcite.sql.SqlOperator;
import org.apache.calcite.sql.SqlWriter;
import org.apache.calcite.sql.parser.SqlParserPos;
import org.apache.druid.sql.calcite.planner.DruidSqlIngestOperator;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
@ -39,13 +41,13 @@ public class DruidSqlInsert extends DruidSqlIngest
public static final String SQL_INSERT_SEGMENT_GRANULARITY = "sqlInsertSegmentGranularity";
// This allows reusing super.unparse
public static final SqlOperator OPERATOR = SqlInsert.OPERATOR;
public static final SqlOperator OPERATOR = DruidSqlIngestOperator.INSERT_OPERATOR;
public static DruidSqlInsert create(
@Nonnull SqlInsert insertNode,
@Nullable SqlGranularityLiteral partitionedBy,
@Nullable SqlNodeList clusteredBy,
@Nullable String exportFileFormat
@Nullable SqlIdentifier exportFileFormat
)
{
return new DruidSqlInsert(
@ -74,7 +76,7 @@ public class DruidSqlInsert extends DruidSqlIngest
SqlNodeList columnList,
@Nullable SqlGranularityLiteral partitionedBy,
@Nullable SqlNodeList clusteredBy,
@Nullable String exportFileFormat
@Nullable SqlIdentifier exportFileFormat
)
{
super(
@ -110,7 +112,7 @@ public class DruidSqlInsert extends DruidSqlIngest
writer.newlineAndIndent();
if (getExportFileFormat() != null) {
writer.keyword("AS");
writer.print(getExportFileFormat());
writer.print(getExportFileFormat().toString());
writer.newlineAndIndent();
}
getSource().unparse(writer, 0, 0);

View File

@ -19,16 +19,16 @@
package org.apache.druid.sql.calcite.parser;
import org.apache.calcite.sql.SqlIdentifier;
import org.apache.calcite.sql.SqlInsert;
import org.apache.calcite.sql.SqlKind;
import org.apache.calcite.sql.SqlLiteral;
import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.sql.SqlNodeList;
import org.apache.calcite.sql.SqlOperator;
import org.apache.calcite.sql.SqlSpecialOperator;
import org.apache.calcite.sql.SqlWriter;
import org.apache.calcite.sql.parser.SqlParserPos;
import org.apache.calcite.util.ImmutableNullableList;
import org.apache.druid.sql.calcite.planner.DruidSqlIngestOperator;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
@ -43,7 +43,7 @@ public class DruidSqlReplace extends DruidSqlIngest
{
public static final String SQL_REPLACE_TIME_CHUNKS = "sqlReplaceTimeChunks";
public static final SqlOperator OPERATOR = new SqlSpecialOperator("REPLACE", SqlKind.OTHER);
public static final SqlOperator OPERATOR = DruidSqlIngestOperator.REPLACE_OPERATOR;
private final SqlNode replaceTimeQuery;
@ -51,8 +51,8 @@ public class DruidSqlReplace extends DruidSqlIngest
@Nonnull SqlInsert insertNode,
@Nullable SqlGranularityLiteral partitionedBy,
@Nullable SqlNodeList clusteredBy,
@Nullable SqlNode replaceTimeQuery,
@Nullable String exportFileFormat
@Nullable SqlIdentifier exportFileFormat,
@Nullable SqlNode replaceTimeQuery
)
{
return new DruidSqlReplace(
@ -63,8 +63,8 @@ public class DruidSqlReplace extends DruidSqlIngest
insertNode.getTargetColumnList(),
partitionedBy,
clusteredBy,
replaceTimeQuery,
exportFileFormat
exportFileFormat,
replaceTimeQuery
);
}
@ -82,8 +82,8 @@ public class DruidSqlReplace extends DruidSqlIngest
SqlNodeList columnList,
@Nullable SqlGranularityLiteral partitionedBy,
@Nullable SqlNodeList clusteredBy,
@Nullable SqlNode replaceTimeQuery,
@Nullable String exportFileFormat
@Nullable SqlIdentifier exportFileFormat,
@Nullable SqlNode replaceTimeQuery
)
{
super(
@ -137,7 +137,7 @@ public class DruidSqlReplace extends DruidSqlIngest
if (getExportFileFormat() != null) {
writer.keyword("AS");
writer.print(getExportFileFormat());
writer.print(getExportFileFormat().toString());
writer.newlineAndIndent();
}

View File

@ -283,7 +283,7 @@ public class CalcitePlanner implements Planner, ViewExpander
public RelRoot rel(SqlNode sql)
{
ensure(CalcitePlanner.State.STATE_4_VALIDATED);
SqlNode validatedSqlNode = Objects.requireNonNull(
Objects.requireNonNull(
this.validatedSqlNode,
"validatedSqlNode is null. Need to call #validate() first"
);
@ -295,11 +295,11 @@ public class CalcitePlanner implements Planner, ViewExpander
final SqlToRelConverter.Config config =
sqlToRelConverterConfig.withTrimUnusedFields(false);
final SqlToRelConverter sqlToRelConverter =
new SqlToRelConverter(this, validator,
new DruidSqlToRelConverter(this, validator,
createCatalogReader(), cluster, convertletTable, config
);
RelRoot root =
sqlToRelConverter.convertQuery(validatedSqlNode, false, true);
sqlToRelConverter.convertQuery(sql, false, true);
root = root.withRel(sqlToRelConverter.flattenTypes(root.rel, true));
final RelBuilder relBuilder =
config.getRelBuilderFactory().create(cluster, null);

View File

@ -0,0 +1,112 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.sql.calcite.planner;
import org.apache.calcite.sql.SqlCall;
import org.apache.calcite.sql.SqlIdentifier;
import org.apache.calcite.sql.SqlKind;
import org.apache.calcite.sql.SqlLiteral;
import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.sql.SqlNodeList;
import org.apache.calcite.sql.SqlSpecialOperator;
import org.apache.calcite.sql.parser.SqlParserPos;
import org.apache.druid.server.security.ResourceAction;
import org.apache.druid.sql.calcite.expression.AuthorizableOperator;
import org.apache.druid.sql.calcite.parser.DruidSqlInsert;
import org.apache.druid.sql.calcite.parser.DruidSqlReplace;
import org.apache.druid.sql.calcite.parser.SqlGranularityLiteral;
import java.util.HashSet;
import java.util.Set;
public class DruidSqlIngestOperator extends SqlSpecialOperator implements AuthorizableOperator
{
public static final SqlSpecialOperator INSERT_OPERATOR =
new DruidSqlInsertOperator();
public static final SqlSpecialOperator REPLACE_OPERATOR =
new DruidSqlReplaceOperator();
public static class DruidSqlInsertOperator extends DruidSqlIngestOperator
{
public DruidSqlInsertOperator()
{
super("INSERT");
}
@Override
public SqlCall createCall(
SqlLiteral functionQualifier,
SqlParserPos pos,
SqlNode... operands
)
{
return new DruidSqlInsert(
pos,
(SqlNodeList) operands[0],
operands[1],
operands[2],
(SqlNodeList) operands[3],
(SqlGranularityLiteral) operands[4],
(SqlNodeList) operands[5],
(SqlIdentifier) operands[6]
);
}
}
public static class DruidSqlReplaceOperator extends DruidSqlIngestOperator
{
public DruidSqlReplaceOperator()
{
super("REPLACE");
}
@Override
public SqlCall createCall(
SqlLiteral functionQualifier,
SqlParserPos pos,
SqlNode... operands
)
{
return new DruidSqlReplace(
pos,
(SqlNodeList) operands[0],
operands[1],
operands[2],
(SqlNodeList) operands[3],
(SqlGranularityLiteral) operands[4],
(SqlNodeList) operands[5],
(SqlIdentifier) operands[6],
operands[7]
);
}
}
public DruidSqlIngestOperator(String name)
{
super(name, SqlKind.INSERT);
}
@Override
public Set<ResourceAction> computeResources(SqlCall call, boolean inputSourceTypeSecurityEnabled)
{
// resource actions are computed in the respective ingest handlers.
return new HashSet<>();
}
}

View File

@ -0,0 +1,69 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.sql.calcite.planner;
import org.apache.calcite.plan.RelOptCluster;
import org.apache.calcite.plan.RelOptTable.ViewExpander;
import org.apache.calcite.prepare.Prepare.CatalogReader;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.sql.SqlInsert;
import org.apache.calcite.sql.validate.SqlValidator;
import org.apache.calcite.sql2rel.SqlRexConvertletTable;
import org.apache.calcite.sql2rel.SqlToRelConverter;
public class DruidSqlToRelConverter extends SqlToRelConverter
{
public DruidSqlToRelConverter(
final ViewExpander viewExpander,
final SqlValidator validator,
final CatalogReader catalogReader,
RelOptCluster cluster,
final SqlRexConvertletTable convertletTable,
final Config config
)
{
super(viewExpander, validator, catalogReader, cluster, convertletTable, config);
}
/**
* Convert a Druid {@code INSERT} or {@code REPLACE} statement. The code is the same
* as the normal conversion, except we don't actually create the final modify node.
* Druid has its own special way to handle inserts. (This should probably change at
* some point in the future, but doing so requires changes in the SQL engine and MSQ, which is a bit
* invasive.)
*/
@Override
protected RelNode convertInsert(SqlInsert call)
{
// Get the target type: the column types we want to write into the target datasource.
final RelDataType targetRowType = validator.getValidatedNodeType(call);
assert targetRowType != null;
// Convert the underlying SELECT. We pushed the CLUSTERED BY clause into the SELECT
// as its ORDER BY. We claim this is the top query because MSQ doesn't actually
// use the Calcite insert node.
RelNode sourceRel = convertQueryRecursive(call.getSource(), true, targetRowType).project();
// We omit the column mapping and insert node that Calcite normally provides.
// Presumably MSQ does these in its own way.
return sourceRel;
}
}

View File

@ -19,32 +19,72 @@
package org.apache.druid.sql.calcite.planner;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.calcite.adapter.java.JavaTypeFactory;
import org.apache.calcite.prepare.BaseDruidSqlValidator;
import org.apache.calcite.prepare.CalciteCatalogReader;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeField;
import org.apache.calcite.rel.type.RelRecordType;
import org.apache.calcite.runtime.CalciteContextException;
import org.apache.calcite.runtime.CalciteException;
import org.apache.calcite.sql.SqlCall;
import org.apache.calcite.sql.SqlIdentifier;
import org.apache.calcite.sql.SqlInsert;
import org.apache.calcite.sql.SqlKind;
import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.sql.SqlOperatorTable;
import org.apache.calcite.sql.SqlSelect;
import org.apache.calcite.sql.SqlUtil;
import org.apache.calcite.sql.SqlWindow;
import org.apache.calcite.sql.parser.SqlParserPos;
import org.apache.calcite.sql.type.SqlTypeName;
import org.apache.calcite.sql.validate.IdentifierNamespace;
import org.apache.calcite.sql.validate.SqlValidatorException;
import org.apache.calcite.sql.validate.SqlValidatorNamespace;
import org.apache.calcite.sql.validate.SqlValidatorScope;
import org.apache.calcite.sql.validate.SqlValidatorTable;
import org.apache.calcite.util.Pair;
import org.apache.calcite.util.Util;
import org.apache.druid.catalog.model.facade.DatasourceFacade;
import org.apache.druid.common.utils.IdUtils;
import org.apache.druid.error.InvalidSqlInput;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.query.QueryContexts;
import org.apache.druid.sql.calcite.parser.DruidSqlIngest;
import org.apache.druid.sql.calcite.parser.DruidSqlInsert;
import org.apache.druid.sql.calcite.parser.DruidSqlParserUtils;
import org.apache.druid.sql.calcite.parser.ExternalDestinationSqlIdentifier;
import org.apache.druid.sql.calcite.run.EngineFeature;
import org.apache.druid.sql.calcite.table.DatasourceTable;
import org.checkerframework.checker.nullness.qual.Nullable;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;
/**
* Druid extended SQL validator. (At present, it doesn't actually
* have any extensions yet, but it will soon.)
*/
class DruidSqlValidator extends BaseDruidSqlValidator
{
private static final Pattern UNNAMED_COLUMN_PATTERN = Pattern.compile("^EXPR\\$\\d+$", Pattern.CASE_INSENSITIVE);
// Copied here from MSQE since that extension is not visible here.
public static final String CTX_ROWS_PER_SEGMENT = "msqRowsPerSegment";
public interface ValidatorContext
{
Map<String, Object> queryContextMap();
CatalogResolver catalog();
String druidSchemaName();
ObjectMapper jsonMapper();
}
private final PlannerContext plannerContext;
protected DruidSqlValidator(
@ -113,6 +153,302 @@ class DruidSqlValidator extends BaseDruidSqlValidator
super.validateWindow(windowOrId, scope, call);
}
@Override
public void validateInsert(final SqlInsert insert)
{
final DruidSqlIngest ingestNode = (DruidSqlIngest) insert;
if (insert.isUpsert()) {
throw InvalidSqlInput.exception("UPSERT is not supported.");
}
// SQL-style INSERT INTO dst (a, b, c) is not (yet) supported.
final String operationName = insert.getOperator().getName();
if (insert.getTargetColumnList() != null) {
throw InvalidSqlInput.exception(
"Operation [%s] cannot be run with a target column list, given [%s (%s)]",
operationName,
ingestNode.getTargetTable(), ingestNode.getTargetColumnList()
);
}
// The target namespace is both the target table ID and the row type for that table.
final SqlValidatorNamespace targetNamespace = getNamespace(insert);
final IdentifierNamespace insertNs = (IdentifierNamespace) targetNamespace;
// The target is a new or existing datasource.
final DatasourceTable table = validateInsertTarget(targetNamespace, insertNs, operationName);
// An existing datasource may have metadata.
final DatasourceFacade tableMetadata = table == null ? null : table.effectiveMetadata().catalogMetadata();
// Validate segment granularity, which depends on nothing else.
if (!(ingestNode.getTargetTable() instanceof ExternalDestinationSqlIdentifier)) {
Granularity effectiveGranularity = getEffectiveGranularity(operationName, ingestNode, tableMetadata);
// Note: though this is the validator, we cheat a bit and write the target
// granularity into the query context. Perhaps this step should be done
// during conversion, however, we've just worked out the granularity, so we
// do it here instead.
try {
plannerContext.queryContextMap().put(
DruidSqlInsert.SQL_INSERT_SEGMENT_GRANULARITY,
plannerContext.getPlannerToolbox().jsonMapper().writeValueAsString(effectiveGranularity)
);
}
catch (JsonProcessingException e) {
throw InvalidSqlInput.exception(e, "Invalid partition granularity [%s]", effectiveGranularity);
}
}
// The source must be a SELECT
final SqlNode source = insert.getSource();
// Validate the source statement.
// Because of the non-standard Druid semantics, we can't define the target type: we don't know
// the target columns yet, and we can't infer types when they must come from the SELECT.
// Normally, the target type is known, and is pushed into the SELECT. In Druid, the SELECT
// usually defines the target types, unless the catalog says otherwise. Since catalog entries
// are optional, we don't know the target type until we validate the SELECT. (Also, we won't
// know names and we match by name.) Thus, we'd have to validate (to know names and types)
// to get the target types, but we need the target types to validate. Catch-22. So, we punt.
final SqlValidatorScope scope;
if (source instanceof SqlSelect) {
final SqlSelect sqlSelect = (SqlSelect) source;
validateSelect(sqlSelect, unknownType);
scope = null;
} else {
scope = scopes.get(source);
validateQuery(source, scope, unknownType);
}
final SqlValidatorNamespace sourceNamespace = namespaces.get(source);
final RelRecordType sourceType = (RelRecordType) sourceNamespace.getRowType();
// Determine the output (target) schema.
final RelDataType targetType = validateTargetType(scope, insertNs, insert, sourceType, tableMetadata);
// Set the type for the INSERT/REPLACE node
setValidatedNodeType(insert, targetType);
// Segment size
if (tableMetadata != null && !plannerContext.queryContextMap().containsKey(CTX_ROWS_PER_SEGMENT)) {
final Integer targetSegmentRows = tableMetadata.targetSegmentRows();
if (targetSegmentRows != null) {
plannerContext.queryContextMap().put(CTX_ROWS_PER_SEGMENT, targetSegmentRows);
}
}
}
/**
* Validate the target table. Druid {@code INSERT/REPLACE} can create a new datasource,
* or insert into an existing one. If the target exists, it must be a datasource. If it
* does not exist, the target must be in the datasource schema, normally "druid".
*/
private DatasourceTable validateInsertTarget(
final SqlValidatorNamespace targetNamespace,
final IdentifierNamespace insertNs,
final String operationName
)
{
// Get the target table ID
final SqlIdentifier destId = insertNs.getId();
if (destId.names.isEmpty()) {
// I don't think this can happen, but include a branch for it just in case.
throw InvalidSqlInput.exception("Operation [%s] requires a target table", operationName);
}
// Druid does not support 3+ part names.
final int n = destId.names.size();
if (n > 2) {
throw InvalidSqlInput.exception("Druid does not support 3+ part names: [%s]", destId, operationName);
}
String tableName = destId.names.get(n - 1);
// If this is a 2-part name, the first part must be the datasource schema.
if (n == 2 && !plannerContext.getPlannerToolbox().druidSchemaName().equals(destId.names.get(0))) {
throw InvalidSqlInput.exception(
"Table [%s] does not support operation [%s] because it is not a Druid datasource",
destId,
operationName
);
}
try {
// Try to resolve the table. Will fail if this is an INSERT into a new table.
validateNamespace(targetNamespace, unknownType);
SqlValidatorTable target = insertNs.resolve().getTable();
try {
return target.unwrap(DatasourceTable.class);
}
catch (Exception e) {
throw InvalidSqlInput.exception(
"Table [%s] does not support operation [%s] because it is not a Druid datasource",
destId,
operationName
);
}
}
catch (CalciteContextException e) {
// Something failed. Let's make sure it was the table lookup.
// The check is kind of a hack, but it's the best we can do given that Calcite
// didn't expect this non-SQL use case.
if (e.getCause() instanceof SqlValidatorException && e.getMessage()
.contains(StringUtils.format("Object '%s' not found", tableName))) {
// The catalog implementation may be "strict" and require that the target
// table already exists, rather than the default "lenient" mode that can
// create a new table.
if (plannerContext.getPlannerToolbox().catalogResolver().ingestRequiresExistingTable()) {
throw InvalidSqlInput.exception("Cannot %s into [%s] because it does not exist", operationName, destId);
}
// New table. Validate the shape of the name.
IdUtils.validateId("table", tableName);
return null;
}
throw e;
}
}
/**
* Gets the effective PARTITIONED BY granularity. Resolves the granularity from the value specified on the
* ingest node and from the table metadata stored in the catalog, if any. Mismatches between the two granularities
* are allowed if both are specified; the granularity specified on the ingest node is taken to be the effective
* granularity in that case. If no granularity is specified on either the ingest node or in the table catalog entry
* for the table, an error is thrown.
*
* @param operationName The operation name
* @param ingestNode The ingest node.
* @param tableMetadata The table metadata as stored in the catalog, if any.
*
* @return The effective granularity
* @throws org.apache.druid.error.DruidException indicating invalid SQL if both the ingest node and table metadata
* for the respective target table have no PARTITIONED BY granularity defined.
*/
private Granularity getEffectiveGranularity(
final String operationName,
final DruidSqlIngest ingestNode,
@Nullable final DatasourceFacade tableMetadata
)
{
Granularity effectiveGranularity = null;
final Granularity ingestionGranularity = ingestNode.getPartitionedBy() != null
? ingestNode.getPartitionedBy().getGranularity()
: null;
if (ingestionGranularity != null) {
DruidSqlParserUtils.validateSupportedGranularityForPartitionedBy(ingestNode, ingestionGranularity);
effectiveGranularity = ingestionGranularity;
} else {
final Granularity definedGranularity = tableMetadata == null
? null
: tableMetadata.segmentGranularity();
if (definedGranularity != null) {
// Should already have been checked when creating the catalog entry
DruidSqlParserUtils.validateSupportedGranularityForPartitionedBy(null, definedGranularity);
effectiveGranularity = definedGranularity;
}
}
if (effectiveGranularity == null) {
throw InvalidSqlInput.exception(
"Operation [%s] requires a PARTITIONED BY to be explicitly defined, but none was found.",
operationName);
}
return effectiveGranularity;
}
/**
* Compute and validate the target type. In normal SQL, the engine would insert
* a project operator after the SELECT before the write to cast columns from the
* input type to the (compatible) defined output type. Druid doesn't work that way.
* In MSQ, the output type is just the input type. If the user wants to control the
* output type, then the user must manually insert any required CAST: Druid is not
* in the business of changing the type to suit the catalog.
* <p>
* As a result, we first propagate column names and types using Druid rules: the
* output is exactly what SELECT says it is. We then apply restrictions from the
* catalog. If the table is strict, only column names from the catalog can be
* used.
*/
private RelDataType validateTargetType(
SqlValidatorScope scope,
final IdentifierNamespace insertNs,
SqlInsert insert,
RelRecordType sourceType,
DatasourceFacade tableMetadata
)
{
final List<RelDataTypeField> sourceFields = sourceType.getFieldList();
for (final RelDataTypeField sourceField : sourceFields) {
// Check that there are no unnamed columns in the insert.
if (UNNAMED_COLUMN_PATTERN.matcher(sourceField.getName()).matches()) {
throw InvalidSqlInput.exception(
"Insertion requires columns to be named, but at least one of the columns was unnamed. This is usually "
+ "the result of applying a function without having an AS clause, please ensure that all function calls"
+ "are named with an AS clause as in \"func(X) as myColumn\"."
);
}
}
if (tableMetadata == null) {
return sourceType;
}
final boolean isStrict = tableMetadata.isSealed();
final List<Map.Entry<String, RelDataType>> fields = new ArrayList<>();
for (RelDataTypeField sourceField : sourceFields) {
final String colName = sourceField.getName();
final DatasourceFacade.ColumnFacade definedCol = tableMetadata.column(colName);
if (definedCol == null) {
if (isStrict) {
throw InvalidSqlInput.exception(
"Column [%s] is not defined in the target table [%s] strict schema",
colName,
insert.getTargetTable()
);
}
// Table is not strict: add a new column based on the SELECT column.
fields.add(Pair.of(colName, sourceField.getType()));
continue;
}
// If the column name is defined, but no type is given then, use the
// column type from SELECT.
if (!definedCol.hasType()) {
fields.add(Pair.of(colName, sourceField.getType()));
continue;
}
// Both the column name and type are provided. Use the name and type
// from the catalog.
// Note to future readers: this check is preliminary. It works for the
// simple column types and has not yet been extended to complex types, aggregates,
// types defined in extensions, etc. It may be that SQL
// has types that Druid cannot store. This may crop up with types defined in
// extensions which are not loaded. Those details are not known at the time
// of this code so we are not yet in a position to make the right decision.
// This is a task to be revisited when we have more information.
final String sqlTypeName = definedCol.sqlStorageType();
if (sqlTypeName == null) {
// Don't know the storage type. Just skip this one: Druid types are
// fluid so let Druid sort out what to store. This is probably not the right
// answer, but should avoid problems until full type system support is completed.
fields.add(Pair.of(colName, sourceField.getType()));
continue;
}
RelDataType relType = typeFactory.createSqlType(SqlTypeName.get(sqlTypeName));
fields.add(Pair.of(
colName,
typeFactory.createTypeWithNullability(relType, true)
));
}
// Perform the SQL-standard check: that the SELECT column can be
// converted to the target type. This check is retained to mimic SQL
// behavior, but doesn't do anything because we enforced exact type
// matches above.
final RelDataType targetType = typeFactory.createStructType(fields);
final SqlValidatorTable target = insertNs.resolve().getTable();
checkTypeAssignment(scope, target, sourceType, targetType, insert);
return targetType;
}
private boolean isPrecedingOrFollowing(@Nullable SqlNode bound)
{
if (bound == null) {

View File

@ -33,7 +33,6 @@ import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.sql.SqlNodeList;
import org.apache.calcite.sql.SqlOrderBy;
import org.apache.calcite.tools.ValidationException;
import org.apache.calcite.util.Pair;
import org.apache.druid.common.utils.IdUtils;
import org.apache.druid.error.DruidException;
import org.apache.druid.error.InvalidSqlInput;
@ -55,28 +54,23 @@ import org.apache.druid.sql.destination.TableDestination;
import org.apache.druid.storage.ExportStorageProvider;
import java.util.List;
import java.util.regex.Pattern;
public abstract class IngestHandler extends QueryHandler
{
private static final Pattern UNNAMED_COLUMN_PATTERN = Pattern.compile("^EXPR\\$\\d+$", Pattern.CASE_INSENSITIVE);
protected final Granularity ingestionGranularity;
protected Granularity ingestionGranularity;
protected IngestDestination targetDatasource;
private SqlNode validatedQueryNode;
IngestHandler(
HandlerContext handlerContext,
DruidSqlIngest ingestNode,
SqlNode queryNode,
SqlExplain explain
)
{
super(handlerContext, queryNode, explain);
ingestionGranularity = ingestNode.getPartitionedBy() != null ? ingestNode.getPartitionedBy().getGranularity() : null;
handlerContext.hook().captureInsert(ingestNode);
super(handlerContext, explain);
}
protected static SqlNode convertQuery(DruidSqlIngest sqlNode)
protected static SqlNode convertSourceQuery(DruidSqlIngest sqlNode)
{
SqlNode query = sqlNode.getSource();
@ -98,6 +92,7 @@ public abstract class IngestHandler extends QueryHandler
if (!query.isA(SqlKind.QUERY)) {
throw InvalidSqlInput.exception("Unexpected SQL statement type [%s], expected it to be a QUERY", query.getKind());
}
return query;
}
@ -123,7 +118,7 @@ public abstract class IngestHandler extends QueryHandler
.build("Export statements do not support a PARTITIONED BY or CLUSTERED BY clause.");
}
final String exportFileFormat = ingestNode().getExportFileFormat();
final SqlIdentifier exportFileFormat = ingestNode().getExportFileFormat();
if (exportFileFormat == null) {
throw InvalidSqlInput.exception(
"Exporting rows into an EXTERN destination requires an AS clause to specify the format, but none was found.",
@ -132,7 +127,7 @@ public abstract class IngestHandler extends QueryHandler
} else {
handlerContext.plannerContext().queryContextMap().put(
DruidSqlIngest.SQL_EXPORT_FILE_FORMAT,
exportFileFormat
exportFileFormat.toString()
);
}
}
@ -143,13 +138,6 @@ public abstract class IngestHandler extends QueryHandler
if (ingestNode().getTargetTable() instanceof ExternalDestinationSqlIdentifier) {
validateExport();
} else {
if (ingestNode().getPartitionedBy() == null) {
throw InvalidSqlInput.exception(
"Operation [%s] requires a PARTITIONED BY to be explicitly defined, but none was found.",
operationName()
);
}
if (ingestNode().getExportFileFormat() != null) {
throw InvalidSqlInput.exception(
"The AS <format> clause should only be specified while exporting rows into an EXTERN destination.",
@ -158,19 +146,6 @@ public abstract class IngestHandler extends QueryHandler
}
}
try {
PlannerContext plannerContext = handlerContext.plannerContext();
if (ingestionGranularity != null) {
plannerContext.queryContextMap().put(
DruidSqlInsert.SQL_INSERT_SEGMENT_GRANULARITY,
plannerContext.getJsonMapper().writeValueAsString(ingestionGranularity)
);
}
}
catch (JsonProcessingException e) {
throw InvalidSqlInput.exception(e, "Invalid partition granularity [%s]", ingestionGranularity);
}
super.validate();
// Check if CTX_SQL_OUTER_LIMIT is specified and fail the query if it is. CTX_SQL_OUTER_LIMIT being provided causes
// the number of rows inserted to be limited which is likely to be confusing and unintended.
if (handlerContext.queryContextMap().get(PlannerContext.CTX_SQL_OUTER_LIMIT) != null) {
@ -180,9 +155,31 @@ public abstract class IngestHandler extends QueryHandler
operationName()
);
}
DruidSqlIngest ingestNode = ingestNode();
DruidSqlIngest validatedNode = (DruidSqlIngest) validate(ingestNode);
validatedQueryNode = validatedNode.getSource();
// This context key is set during validation in
// org.apache.druid.sql.calcite.planner.DruidSqlValidator.validateInsert.
String effectiveGranularity = (String) handlerContext.queryContextMap()
.get(DruidSqlInsert.SQL_INSERT_SEGMENT_GRANULARITY);
try {
ingestionGranularity = effectiveGranularity != null
? handlerContext.jsonMapper().readValue(effectiveGranularity, Granularity.class)
: null;
}
catch (JsonProcessingException e) {
// this should never happen, since the granularity value is validated before being written to contextMap.
throw InvalidSqlInput.exception(e, "Invalid partition granularity [%s]", effectiveGranularity);
}
targetDatasource = validateAndGetDataSourceForIngest();
}
@Override
protected SqlNode validatedQueryNode()
{
return validatedQueryNode;
}
@Override
protected RelDataType returnedRowType()
{
@ -202,27 +199,11 @@ public abstract class IngestHandler extends QueryHandler
private IngestDestination validateAndGetDataSourceForIngest()
{
final SqlInsert insert = ingestNode();
if (insert.isUpsert()) {
throw InvalidSqlInput.exception("UPSERT is not supported.");
}
if (insert.getTargetColumnList() != null) {
throw InvalidSqlInput.exception(
"Operation [%s] cannot be run with a target column list, given [%s (%s)]",
operationName(),
insert.getTargetTable(), insert.getTargetColumnList()
);
}
final SqlIdentifier tableIdentifier = (SqlIdentifier) insert.getTargetTable();
final IngestDestination dataSource;
if (tableIdentifier.names.isEmpty()) {
// I don't think this can happen, but include a branch for it just in case.
throw DruidException.forPersona(DruidException.Persona.USER)
.ofCategory(DruidException.Category.DEFENSIVE)
.build("Operation [%s] requires a target table", operationName());
} else if (tableIdentifier instanceof ExternalDestinationSqlIdentifier) {
if (tableIdentifier instanceof ExternalDestinationSqlIdentifier) {
ExternalDestinationSqlIdentifier externalDestination = ((ExternalDestinationSqlIdentifier) tableIdentifier);
ExportStorageProvider storageProvider = externalDestination.toExportStorageProvider(handlerContext.jsonMapper());
dataSource = new ExportDestination(storageProvider);
@ -264,7 +245,6 @@ public abstract class IngestHandler extends QueryHandler
@Override
protected QueryMaker buildQueryMaker(final RelRoot rootQueryRel) throws ValidationException
{
validateColumnsForIngestion(rootQueryRel);
return handlerContext.engine().buildQueryMakerForInsert(
targetDatasource,
rootQueryRel,
@ -272,20 +252,6 @@ public abstract class IngestHandler extends QueryHandler
);
}
private void validateColumnsForIngestion(RelRoot rootQueryRel)
{
// Check that there are no unnamed columns in the insert.
for (Pair<Integer, String> field : rootQueryRel.fields) {
if (UNNAMED_COLUMN_PATTERN.matcher(field.right).matches()) {
throw InvalidSqlInput.exception(
"Insertion requires columns to be named, but at least one of the columns was unnamed. This is usually "
+ "the result of applying a function without having an AS clause, please ensure that all function calls"
+ "are named with an AS clause as in \"func(X) as myColumn\"."
);
}
}
}
/**
* Handler for the INSERT statement.
*/
@ -299,13 +265,24 @@ public abstract class IngestHandler extends QueryHandler
SqlExplain explain
)
{
super(
handlerContext,
sqlNode,
convertQuery(sqlNode),
explain
);
this.sqlNode = sqlNode;
super(handlerContext, explain);
this.sqlNode = convertQuery(sqlNode);
handlerContext.hook().captureInsert(sqlNode);
}
protected static DruidSqlInsert convertQuery(DruidSqlIngest sqlNode)
{
SqlNode query = convertSourceQuery(sqlNode);
return DruidSqlInsert.create(new SqlInsert(
sqlNode.getParserPosition(),
(SqlNodeList) sqlNode.getOperandList().get(0),
sqlNode.getOperandList().get(1),
query,
(SqlNodeList) sqlNode.getOperandList().get(3)),
sqlNode.getPartitionedBy(),
sqlNode.getClusteredBy(),
sqlNode.getExportFileFormat());
}
@Override
@ -355,11 +332,29 @@ public abstract class IngestHandler extends QueryHandler
{
super(
handlerContext,
sqlNode,
convertQuery(sqlNode),
explain
);
this.sqlNode = sqlNode;
this.sqlNode = convertQuery(sqlNode);
handlerContext.hook().captureInsert(sqlNode);
}
protected static DruidSqlReplace convertQuery(DruidSqlReplace sqlNode)
{
SqlNode query = convertSourceQuery(sqlNode);
return DruidSqlReplace.create(
new SqlInsert(
sqlNode.getParserPosition(),
(SqlNodeList) sqlNode.getOperandList().get(0),
sqlNode.getOperandList().get(1),
query,
(SqlNodeList) sqlNode.getOperandList().get(3)
),
sqlNode.getPartitionedBy(),
sqlNode.getClusteredBy(),
sqlNode.getExportFileFormat(),
sqlNode.getReplaceTimeQuery()
);
}
@Override
@ -390,12 +385,12 @@ public abstract class IngestHandler extends QueryHandler
);
}
super.validate();
List<String> replaceIntervalsList = DruidSqlParserUtils.validateQueryAndConvertToIntervals(
replaceTimeQuery,
ingestionGranularity,
handlerContext.timeZone()
);
super.validate();
if (replaceIntervalsList != null) {
replaceIntervals = String.join(",", replaceIntervalsList);
handlerContext.queryContextMap().put(

View File

@ -93,27 +93,24 @@ public abstract class QueryHandler extends SqlStatementHandler.BaseStatementHand
{
static final EmittingLogger log = new EmittingLogger(QueryHandler.class);
protected SqlNode queryNode;
protected SqlExplain explain;
protected SqlNode validatedQueryNode;
private boolean isPrepared;
protected RelRoot rootQueryRel;
private PrepareResult prepareResult;
protected RexBuilder rexBuilder;
public QueryHandler(SqlStatementHandler.HandlerContext handlerContext, SqlNode sqlNode, SqlExplain explain)
public QueryHandler(HandlerContext handlerContext, SqlExplain explain)
{
super(handlerContext);
this.queryNode = sqlNode;
this.explain = explain;
}
@Override
public void validate()
protected SqlNode validate(SqlNode root)
{
CalcitePlanner planner = handlerContext.planner();
SqlNode validatedQueryNode;
try {
validatedQueryNode = planner.validate(rewriteParameters());
validatedQueryNode = planner.validate(rewriteParameters(root));
}
catch (ValidationException e) {
throw DruidPlanner.translateException(e);
@ -126,9 +123,10 @@ public abstract class QueryHandler extends SqlStatementHandler.BaseStatementHand
);
validatedQueryNode.accept(resourceCollectorShuttle);
resourceActions = resourceCollectorShuttle.getResourceActions();
return validatedQueryNode;
}
private SqlNode rewriteParameters()
private SqlNode rewriteParameters(SqlNode original)
{
// Uses {@link SqlParameterizerShuttle} to rewrite {@link SqlNode} to swap out any
// {@link org.apache.calcite.sql.SqlDynamicParam} early for their {@link SqlLiteral}
@ -140,9 +138,9 @@ public abstract class QueryHandler extends SqlStatementHandler.BaseStatementHand
// contains parameters, but no values were provided.
PlannerContext plannerContext = handlerContext.plannerContext();
if (plannerContext.getParameters().isEmpty()) {
return queryNode;
return original;
} else {
return queryNode.accept(new SqlParameterizerShuttle(plannerContext));
return original.accept(new SqlParameterizerShuttle(plannerContext));
}
}
@ -153,6 +151,7 @@ public abstract class QueryHandler extends SqlStatementHandler.BaseStatementHand
return;
}
isPrepared = true;
SqlNode validatedQueryNode = validatedQueryNode();
rootQueryRel = handlerContext.planner().rel(validatedQueryNode);
handlerContext.hook().captureQueryRel(rootQueryRel);
final RelDataTypeFactory typeFactory = rootQueryRel.rel.getCluster().getTypeFactory();
@ -177,6 +176,8 @@ public abstract class QueryHandler extends SqlStatementHandler.BaseStatementHand
return prepareResult;
}
protected abstract SqlNode validatedQueryNode();
protected abstract RelDataType returnedRowType();
private static RelDataType getExplainStructType(RelDataTypeFactory typeFactory)
@ -690,13 +691,17 @@ public abstract class QueryHandler extends SqlStatementHandler.BaseStatementHand
public static class SelectHandler extends QueryHandler
{
private final SqlNode queryNode;
private SqlNode validatedQueryNode;
public SelectHandler(
HandlerContext handlerContext,
SqlNode sqlNode,
SqlExplain explain
)
{
super(handlerContext, sqlNode, explain);
super(handlerContext, explain);
this.queryNode = sqlNode;
}
@Override
@ -705,7 +710,13 @@ public abstract class QueryHandler extends SqlStatementHandler.BaseStatementHand
if (!handlerContext.plannerContext().featureAvailable(EngineFeature.CAN_SELECT)) {
throw InvalidSqlInput.exception("Cannot execute SELECT with SQL engine [%s]", handlerContext.engine().name());
}
super.validate();
validatedQueryNode = validate(queryNode);
}
@Override
protected SqlNode validatedQueryNode()
{
return validatedQueryNode;
}
@Override

View File

@ -23,6 +23,7 @@ import com.fasterxml.jackson.databind.Module;
import com.fasterxml.jackson.databind.module.SimpleModule;
import com.google.common.collect.ImmutableList;
import com.google.inject.Binder;
import org.apache.calcite.avatica.SqlType;
import org.apache.druid.error.DruidException;
import org.apache.druid.guice.DruidInjectorBuilder;
import org.apache.druid.initialization.DruidModule;
@ -37,6 +38,7 @@ import org.apache.druid.sql.calcite.export.TestExportStorageConnector;
import org.apache.druid.sql.calcite.filtration.Filtration;
import org.apache.druid.sql.calcite.util.CalciteTests;
import org.apache.druid.sql.destination.ExportDestination;
import org.apache.druid.sql.http.SqlParameter;
import org.apache.druid.storage.StorageConfig;
import org.apache.druid.storage.StorageConnector;
import org.apache.druid.storage.local.LocalFileExportStorageProvider;
@ -46,6 +48,7 @@ import org.junit.Ignore;
import org.junit.Test;
import org.junit.internal.matchers.ThrowableMessageMatcher;
import java.util.Collections;
import java.util.List;
public class CalciteExportTest extends CalciteIngestionDmlTest
@ -176,6 +179,59 @@ public class CalciteExportTest extends CalciteIngestionDmlTest
.verify();
}
@Test
public void testInsertIntoExternParameterized()
{
testIngestionQuery()
.sql(StringUtils.format("INSERT INTO EXTERN(%s()) "
+ "AS CSV "
+ "SELECT dim2 FROM foo WHERE dim2=?", TestExportStorageConnector.TYPE_NAME))
.parameters(Collections.singletonList(new SqlParameter(SqlType.VARCHAR, "val")))
.expectQuery(
Druids.newScanQueryBuilder()
.dataSource(
"foo"
)
.intervals(querySegmentSpec(Filtration.eternity()))
.filters(equality("dim2", "val", ColumnType.STRING))
.columns("dim2")
.resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST)
.legacy(false)
.build()
)
.expectResources(dataSourceRead("foo"), externalWrite(TestExportStorageConnector.TYPE_NAME))
.expectTarget(ExportDestination.TYPE_KEY, RowSignature.builder().add("dim2", ColumnType.STRING).build())
.verify();
}
// Disabled until replace supports external destinations. To be enabled after that point.
@Test
@Ignore
public void testReplaceIntoExternParameterized()
{
testIngestionQuery()
.sql(StringUtils.format("REPLACE INTO EXTERN(%s()) "
+ "AS CSV "
+ "SELECT dim2 FROM foo WHERE dim2=?", TestExportStorageConnector.TYPE_NAME))
.parameters(Collections.singletonList(new SqlParameter(SqlType.VARCHAR, "val")))
.expectQuery(
Druids.newScanQueryBuilder()
.dataSource(
"foo"
)
.intervals(querySegmentSpec(Filtration.eternity()))
.filters(equality("dim2", "val", ColumnType.STRING))
.columns("dim2")
.resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST)
.legacy(false)
.build()
)
.expectResources(dataSourceRead("foo"), externalWrite(TestExportStorageConnector.TYPE_NAME))
.expectTarget(ExportDestination.TYPE_KEY, RowSignature.builder().add("dim2", ColumnType.STRING).build())
.verify();
}
@Test
public void testExportWithoutFormat()
{

View File

@ -1679,7 +1679,6 @@ public class CalciteInsertDmlTest extends CalciteIngestionDmlTest
+ "partitioned by DAY\n"
+ "clustered by channel";
HashMap<String, Object> context = new HashMap<>(DEFAULT_CONTEXT);
context.put(PlannerContext.CTX_SQL_OUTER_LIMIT, 100);
testIngestionQuery().context(context).sql(sqlString)
.expectValidationError(
new DruidExceptionMatcher(
@ -1708,7 +1707,6 @@ public class CalciteInsertDmlTest extends CalciteIngestionDmlTest
+ "partitioned by DAY\n"
+ "clustered by channel";
HashMap<String, Object> context = new HashMap<>(DEFAULT_CONTEXT);
context.put(PlannerContext.CTX_SQL_OUTER_LIMIT, 100);
testIngestionQuery().context(context).sql(sqlString)
.expectValidationError(
new DruidExceptionMatcher(
@ -1736,7 +1734,6 @@ public class CalciteInsertDmlTest extends CalciteIngestionDmlTest
+ "partitioned by DAY\n"
+ "clustered by channel";
HashMap<String, Object> context = new HashMap<>(DEFAULT_CONTEXT);
context.put(PlannerContext.CTX_SQL_OUTER_LIMIT, 100);
testIngestionQuery().context(context).sql(sqlString)
.expectValidationError(
new DruidExceptionMatcher(
@ -1765,7 +1762,6 @@ public class CalciteInsertDmlTest extends CalciteIngestionDmlTest
+ "partitioned by DAY\n"
+ "clustered by channel";
HashMap<String, Object> context = new HashMap<>(DEFAULT_CONTEXT);
context.put(PlannerContext.CTX_SQL_OUTER_LIMIT, 100);
testIngestionQuery().context(context).sql(sqlString)
.expectValidationError(
new DruidExceptionMatcher(