Add replace statement to sql parser (#12386)

Relevant Issue: #11929

- Add custom replace statement to Druid SQL parser.
- Edit DruidPlanner to convert relevant fields to Query Context.
- Refactor common code with INSERT statements to reuse them for REPLACE where possible.
This commit is contained in:
Adarsh Sanjeev 2022-05-13 10:56:40 +05:30 committed by GitHub
parent 9177515be2
commit 39b3487aa9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 1764 additions and 463 deletions

View File

@ -51,6 +51,7 @@ data: {
# List of additional classes and packages to import.
# Example. "org.apache.calcite.sql.*", "java.util.List".
imports: [
"java.util.List"
"org.apache.calcite.sql.SqlNode"
"org.apache.calcite.sql.SqlInsert"
"org.apache.druid.java.util.common.granularity.Granularity"
@ -63,6 +64,7 @@ data: {
# keyword add it to 'nonReservedKeywords' section.
keywords: [
"CLUSTERED"
"OVERWRITE"
"PARTITIONED"
]
@ -218,6 +220,7 @@ data: {
"OTHERS"
"OUTPUT"
"OVERRIDING"
"OVERWRITE"
"PAD"
"PARAMETER_MODE"
"PARAMETER_NAME"
@ -384,6 +387,7 @@ data: {
statementParserMethods: [
"DruidSqlInsertEof()"
"DruidSqlExplain()"
"DruidSqlReplaceEof()"
]
# List of methods for parsing custom literals.
@ -433,8 +437,10 @@ data: {
# given as part of "statementParserMethods", "literalParserMethods" or
# "dataTypeParserMethods".
implementationFiles: [
"common.ftl"
"insert.ftl"
"explain.ftl"
"replace.ftl"
]
includePosixOperators: false

View File

@ -0,0 +1,74 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
// Using fully qualified name for Pair class, since Calcite also has a same class name being used in the Parser.jj
org.apache.druid.java.util.common.Pair<Granularity, String> PartitionGranularity() :
{
SqlNode e;
Granularity granularity;
String unparseString;
}
{
(
<HOUR>
{
granularity = Granularities.HOUR;
unparseString = "HOUR";
}
|
<DAY>
{
granularity = Granularities.DAY;
unparseString = "DAY";
}
|
<MONTH>
{
granularity = Granularities.MONTH;
unparseString = "MONTH";
}
|
<YEAR>
{
granularity = Granularities.YEAR;
unparseString = "YEAR";
}
|
<ALL>
{
granularity = Granularities.ALL;
unparseString = "ALL";
}
[
<TIME>
{
unparseString += " TIME";
}
]
|
e = Expression(ExprContext.ACCEPT_SUB_QUERY)
{
granularity = DruidSqlParserUtils.convertSqlNodeToGranularityThrowingParseExceptions(e);
unparseString = e.toString();
}
)
{
return new org.apache.druid.java.util.common.Pair(granularity, unparseString);
}
}

View File

@ -62,6 +62,8 @@ SqlNode DruidQueryOrSqlQueryOrDml() :
{
(
stmt = DruidSqlInsertEof()
|
stmt = DruidSqlReplaceEof()
|
stmt = SqlQueryOrDml()
)

View File

@ -70,58 +70,3 @@ SqlNodeList ClusterItems() :
return new SqlNodeList(list, s.addAll(list).pos());
}
}
org.apache.druid.java.util.common.Pair<Granularity, String> PartitionGranularity() :
{
SqlNode e = null;
Granularity granularity = null;
String unparseString = null;
}
{
(
<HOUR>
{
granularity = Granularities.HOUR;
unparseString = "HOUR";
}
|
<DAY>
{
granularity = Granularities.DAY;
unparseString = "DAY";
}
|
<MONTH>
{
granularity = Granularities.MONTH;
unparseString = "MONTH";
}
|
<YEAR>
{
granularity = Granularities.YEAR;
unparseString = "YEAR";
}
|
<ALL>
{
granularity = Granularities.ALL;
unparseString = "ALL";
}
[
<TIME>
{
unparseString += " TIME";
}
]
|
e = Expression(ExprContext.ACCEPT_SUB_QUERY)
{
granularity = DruidSqlParserUtils.convertSqlNodeToGranularityThrowingParseExceptions(e);
unparseString = e.toString();
}
)
{
return new org.apache.druid.java.util.common.Pair(granularity, unparseString);
}
}

View File

@ -0,0 +1,78 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
// Taken from syntax of SqlInsert statement from calcite parser, edited for replace syntax
SqlNode DruidSqlReplaceEof() :
{
SqlNode table;
SqlNode source;
SqlNodeList columnList = null;
final Span s;
SqlInsert sqlInsert;
// Using fully qualified name for Pair class, since Calcite also has a same class name being used in the Parser.jj
org.apache.druid.java.util.common.Pair<Granularity, String> partitionedBy = new org.apache.druid.java.util.common.Pair(null, null);
final Pair<SqlNodeList, SqlNodeList> p;
final SqlNode replaceTimeQuery;
}
{
<REPLACE> { s = span(); }
<INTO>
table = CompoundIdentifier()
[
p = ParenthesizedCompoundIdentifierList() {
if (p.left.size() > 0) {
columnList = p.left;
}
}
]
<OVERWRITE>
replaceTimeQuery = ReplaceTimeQuery()
source = OrderedQueryOrExpr(ExprContext.ACCEPT_QUERY)
// PARTITIONED BY is necessary, but is kept optional in the grammar. It is asserted that it is not missing in the
// DruidSqlInsert constructor so that we can return a custom error message.
[
<PARTITIONED> <BY>
partitionedBy = PartitionGranularity()
]
// EOF is also present in SqlStmtEof but EOF is a special case and a single EOF can be consumed multiple times.
// The reason for adding EOF here is to ensure that we create a DruidSqlReplace node after the syntax has been
// validated and throw SQL syntax errors before performing validations in the DruidSqlReplace which can overshadow the
// actual error message.
<EOF>
{
sqlInsert = new SqlInsert(s.end(source), SqlNodeList.EMPTY, table, source, columnList);
return new DruidSqlReplace(sqlInsert, partitionedBy.lhs, partitionedBy.rhs, replaceTimeQuery);
}
}
SqlNode ReplaceTimeQuery() :
{
SqlNode replaceQuery;
}
{
(
<ALL> { replaceQuery = SqlLiteral.createCharString("ALL", getPos()); }
|
// We parse all types of conditions and throw an exception if it is not supported to keep the parsing simple
replaceQuery = WhereOpt()
)
{
return replaceQuery;
}
}

View File

@ -43,6 +43,8 @@ public class DruidSqlInsert extends SqlInsert
public static final SqlOperator OPERATOR = SqlInsert.OPERATOR;
private final Granularity partitionedBy;
// Used in the unparse function to generate the original query since we convert the string to an enum
private final String partitionedByStringForUnparse;
@Nullable

View File

@ -20,27 +20,48 @@
package org.apache.druid.sql.calcite.parser;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import org.apache.calcite.sql.SqlBasicCall;
import org.apache.calcite.sql.SqlCall;
import org.apache.calcite.sql.SqlIdentifier;
import org.apache.calcite.sql.SqlIntervalQualifier;
import org.apache.calcite.sql.SqlKind;
import org.apache.calcite.sql.SqlLiteral;
import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.sql.SqlTimestampLiteral;
import org.apache.calcite.tools.ValidationException;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.java.util.common.granularity.PeriodGranularity;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.query.filter.AndDimFilter;
import org.apache.druid.query.filter.BoundDimFilter;
import org.apache.druid.query.filter.DimFilter;
import org.apache.druid.query.filter.NotDimFilter;
import org.apache.druid.query.filter.OrDimFilter;
import org.apache.druid.query.ordering.StringComparators;
import org.apache.druid.segment.column.ColumnHolder;
import org.apache.druid.sql.calcite.expression.TimeUnits;
import org.apache.druid.sql.calcite.expression.builtin.TimeFloorOperatorConversion;
import org.apache.druid.sql.calcite.filtration.Filtration;
import org.apache.druid.sql.calcite.filtration.MoveTimeFiltersToIntervals;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import org.joda.time.Interval;
import org.joda.time.Period;
import org.joda.time.base.AbstractInterval;
import java.sql.Timestamp;
import java.time.ZonedDateTime;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
public class DruidSqlParserUtils
{
private static final Logger log = new Logger(DruidSqlParserUtils.class);
public static final String ALL = "all";
/**
* Delegates to {@code convertSqlNodeToGranularity} and converts the exceptions to {@link ParseException}
@ -167,4 +188,195 @@ public class DruidSqlParserUtils
TimeFloorOperatorConversion.SQL_FUNCTION_NAME
));
}
/**
* This method validates and converts a {@link SqlNode} representing a query into an optmizied list of intervals to
* be used in creating an ingestion spec. If the sqlNode is an SqlLiteral of {@link #ALL}, returns a singleton list of
* "ALL". Otherwise, it converts and optimizes the query using {@link MoveTimeFiltersToIntervals} into a list of
* intervals which contain all valid values of time as per the query.
*
* The following validations are performed
* 1. Only __time column and timestamp literals are present in the query
* 2. The interval after optimization is not empty
* 3. The operands in the expression are supported
* 4. The intervals after adjusting for timezone are aligned with the granularity parameter
*
* @param replaceTimeQuery Sql node representing the query
* @param granularity granularity of the query for validation
* @param dateTimeZone timezone
* @return List of string representation of intervals
* @throws ValidationException if the SqlNode cannot be converted to a list of intervals
*/
public static List<String> validateQueryAndConvertToIntervals(
SqlNode replaceTimeQuery,
Granularity granularity,
DateTimeZone dateTimeZone
) throws ValidationException
{
if (replaceTimeQuery instanceof SqlLiteral && ALL.equalsIgnoreCase(((SqlLiteral) replaceTimeQuery).toValue())) {
return ImmutableList.of(ALL);
}
DimFilter dimFilter = convertQueryToDimFilter(replaceTimeQuery, dateTimeZone);
Filtration filtration = Filtration.create(dimFilter);
filtration = MoveTimeFiltersToIntervals.instance().apply(filtration);
List<Interval> intervals = filtration.getIntervals();
if (filtration.getDimFilter() != null) {
throw new ValidationException("Only " + ColumnHolder.TIME_COLUMN_NAME + " column is supported in OVERWRITE WHERE clause");
}
if (intervals.isEmpty()) {
throw new ValidationException("Intervals for replace are empty");
}
for (Interval interval : intervals) {
DateTime intervalStart = interval.getStart();
DateTime intervalEnd = interval.getEnd();
if (!granularity.bucketStart(intervalStart).equals(intervalStart) || !granularity.bucketStart(intervalEnd).equals(intervalEnd)) {
throw new ValidationException("OVERWRITE WHERE clause contains an interval " + intervals +
" which is not aligned with PARTITIONED BY granularity " + granularity);
}
}
return intervals
.stream()
.map(AbstractInterval::toString)
.collect(Collectors.toList());
}
/**
* This method is used to convert an {@link SqlNode} representing a query into a {@link DimFilter} for the same query.
* It takes the timezone as a separate parameter, as Sql timestamps don't contain that information. Supported functions
* are AND, OR, NOT, >, <, >=, <= and BETWEEN operators in the sql query.
*
* @param replaceTimeQuery Sql node representing the query
* @param dateTimeZone timezone
* @return Dimfilter for the query
* @throws ValidationException if the SqlNode cannot be converted a Dimfilter
*/
public static DimFilter convertQueryToDimFilter(SqlNode replaceTimeQuery, DateTimeZone dateTimeZone)
throws ValidationException
{
if (!(replaceTimeQuery instanceof SqlBasicCall)) {
log.error("Expected SqlBasicCall during parsing, but found " + replaceTimeQuery.getClass().getName());
throw new ValidationException("Invalid OVERWRITE WHERE clause");
}
String columnName;
SqlBasicCall sqlBasicCall = (SqlBasicCall) replaceTimeQuery;
List<SqlNode> operandList = sqlBasicCall.getOperandList();
switch (sqlBasicCall.getOperator().getKind()) {
case AND:
List<DimFilter> dimFilters = new ArrayList<>();
for (SqlNode sqlNode : sqlBasicCall.getOperandList()) {
dimFilters.add(convertQueryToDimFilter(sqlNode, dateTimeZone));
}
return new AndDimFilter(dimFilters);
case OR:
dimFilters = new ArrayList<>();
for (SqlNode sqlNode : sqlBasicCall.getOperandList()) {
dimFilters.add(convertQueryToDimFilter(sqlNode, dateTimeZone));
}
return new OrDimFilter(dimFilters);
case NOT:
return new NotDimFilter(convertQueryToDimFilter(sqlBasicCall.getOperandList().get(0), dateTimeZone));
case GREATER_THAN_OR_EQUAL:
columnName = parseColumnName(operandList.get(0));
return new BoundDimFilter(
columnName,
parseTimeStampWithTimeZone(operandList.get(1), dateTimeZone),
null,
false,
null,
null,
null,
StringComparators.NUMERIC
);
case LESS_THAN_OR_EQUAL:
columnName = parseColumnName(operandList.get(0));
return new BoundDimFilter(
columnName,
null,
parseTimeStampWithTimeZone(operandList.get(1), dateTimeZone),
null,
false,
null,
null,
StringComparators.NUMERIC
);
case GREATER_THAN:
columnName = parseColumnName(operandList.get(0));
return new BoundDimFilter(
columnName,
parseTimeStampWithTimeZone(operandList.get(1), dateTimeZone),
null,
true,
null,
null,
null,
StringComparators.NUMERIC
);
case LESS_THAN:
columnName = parseColumnName(operandList.get(0));
return new BoundDimFilter(
columnName,
null,
parseTimeStampWithTimeZone(operandList.get(1), dateTimeZone),
null,
true,
null,
null,
StringComparators.NUMERIC
);
case BETWEEN:
columnName = parseColumnName(operandList.get(0));
return new BoundDimFilter(
columnName,
parseTimeStampWithTimeZone(operandList.get(1), dateTimeZone),
parseTimeStampWithTimeZone(operandList.get(2), dateTimeZone),
false,
false,
null,
null,
StringComparators.NUMERIC
);
default:
throw new ValidationException("Unsupported operation in OVERWRITE WHERE clause: " + sqlBasicCall.getOperator().getName());
}
}
/**
* Converts a {@link SqlNode} identifier into a string representation
*
* @param sqlNode the sql node
* @return string representing the column name
* @throws ValidationException if the sql node is not an SqlIdentifier
*/
public static String parseColumnName(SqlNode sqlNode) throws ValidationException
{
if (!(sqlNode instanceof SqlIdentifier)) {
throw new ValidationException("Expressions must be of the form __time <operator> TIMESTAMP");
}
return ((SqlIdentifier) sqlNode).getSimple();
}
/**
* Converts a {@link SqlNode} into a timestamp, taking into account the timezone
*
* @param sqlNode the sql node
* @param timeZone timezone
* @return the timestamp string as milliseconds from epoch
* @throws ValidationException if the sql node is not a SqlTimestampLiteral
*/
public static String parseTimeStampWithTimeZone(SqlNode sqlNode, DateTimeZone timeZone) throws ValidationException
{
if (!(sqlNode instanceof SqlTimestampLiteral)) {
throw new ValidationException("Expressions must be of the form __time <operator> TIMESTAMP");
}
Timestamp sqlTimestamp = Timestamp.valueOf(((SqlTimestampLiteral) sqlNode).toFormattedString());
ZonedDateTime zonedTimestamp = sqlTimestamp.toLocalDateTime().atZone(timeZone.toTimeZone().toZoneId());
return String.valueOf(zonedTimestamp.toInstant().toEpochMilli());
}
}

View File

@ -0,0 +1,129 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.sql.calcite.parser;
import com.google.common.base.Preconditions;
import org.apache.calcite.sql.SqlInsert;
import org.apache.calcite.sql.SqlKind;
import org.apache.calcite.sql.SqlLiteral;
import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.sql.SqlNodeList;
import org.apache.calcite.sql.SqlOperator;
import org.apache.calcite.sql.SqlSpecialOperator;
import org.apache.calcite.sql.SqlWriter;
import org.apache.druid.java.util.common.granularity.Granularity;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
/**
* Extends the 'replace' call to hold custom parameters specific to Druid i.e. PARTITIONED BY and the PARTITION SPECS
* This class extends the {@link SqlInsert} so that this SqlNode can be used in
* {@link org.apache.calcite.sql2rel.SqlToRelConverter} for getting converted into RelNode, and further processing
*/
public class DruidSqlReplace extends SqlInsert
{
public static final String SQL_REPLACE_TIME_CHUNKS = "sqlReplaceTimeChunks";
public static final SqlOperator OPERATOR = new SqlSpecialOperator("REPLACE", SqlKind.OTHER);
private final Granularity partitionedBy;
// Used in the unparse function to generate the original query since we convert the string to an enum
private final String partitionedByStringForUnparse;
private final SqlNode replaceTimeQuery;
/**
* While partitionedBy and partitionedByStringForUnparse can be null as arguments to the constructor, this is
* disallowed (semantically) and the constructor performs checks to ensure that. This helps in producing friendly
* errors when the PARTITIONED BY custom clause is not present, and keeps its error separate from JavaCC/Calcite's
* custom errors which can be cryptic when someone accidentally forgets to explicitly specify the PARTITIONED BY clause
*/
public DruidSqlReplace(
@Nonnull SqlInsert insertNode,
@Nullable Granularity partitionedBy,
@Nullable String partitionedByStringForUnparse,
@Nonnull SqlNode replaceTimeQuery
) throws ParseException
{
super(
insertNode.getParserPosition(),
(SqlNodeList) insertNode.getOperandList().get(0), // No better getter to extract this
insertNode.getTargetTable(),
insertNode.getSource(),
insertNode.getTargetColumnList()
);
if (partitionedBy == null) {
throw new ParseException("REPLACE statements must specify PARTITIONED BY clause explictly");
}
this.partitionedBy = partitionedBy;
this.partitionedByStringForUnparse = Preconditions.checkNotNull(partitionedByStringForUnparse);
this.replaceTimeQuery = replaceTimeQuery;
}
public SqlNode getReplaceTimeQuery()
{
return replaceTimeQuery;
}
public Granularity getPartitionedBy()
{
return partitionedBy;
}
@Nonnull
@Override
public SqlOperator getOperator()
{
return OPERATOR;
}
@Override
public void unparse(SqlWriter writer, int leftPrec, int rightPrec)
{
writer.startList(SqlWriter.FrameTypeEnum.SELECT);
writer.sep("REPLACE INTO");
final int opLeft = getOperator().getLeftPrec();
final int opRight = getOperator().getRightPrec();
getTargetTable().unparse(writer, opLeft, opRight);
if (getTargetColumnList() != null) {
getTargetColumnList().unparse(writer, opLeft, opRight);
}
writer.newlineAndIndent();
writer.keyword("OVERWRITE");
if (replaceTimeQuery instanceof SqlLiteral) {
writer.keyword("ALL");
} else {
replaceTimeQuery.unparse(writer, leftPrec, rightPrec);
}
writer.newlineAndIndent();
getSource().unparse(writer, 0, 0);
writer.newlineAndIndent();
writer.keyword("PARTITIONED BY");
writer.keyword(partitionedByStringForUnparse);
}
}

View File

@ -55,6 +55,7 @@ import org.apache.calcite.sql.SqlExplain;
import org.apache.calcite.sql.SqlIdentifier;
import org.apache.calcite.sql.SqlInsert;
import org.apache.calcite.sql.SqlKind;
import org.apache.calcite.sql.SqlLiteral;
import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.sql.SqlNodeList;
import org.apache.calcite.sql.SqlOrderBy;
@ -83,6 +84,8 @@ import org.apache.druid.server.security.Resource;
import org.apache.druid.server.security.ResourceAction;
import org.apache.druid.server.security.ResourceType;
import org.apache.druid.sql.calcite.parser.DruidSqlInsert;
import org.apache.druid.sql.calcite.parser.DruidSqlParserUtils;
import org.apache.druid.sql.calcite.parser.DruidSqlReplace;
import org.apache.druid.sql.calcite.rel.DruidConvention;
import org.apache.druid.sql.calcite.rel.DruidQuery;
import org.apache.druid.sql.calcite.rel.DruidRel;
@ -90,6 +93,7 @@ import org.apache.druid.sql.calcite.rel.DruidUnionRel;
import org.apache.druid.sql.calcite.run.QueryMaker;
import org.apache.druid.sql.calcite.run.QueryMakerFactory;
import org.apache.druid.utils.Throwables;
import org.joda.time.DateTimeZone;
import javax.annotation.Nullable;
import java.io.Closeable;
@ -134,7 +138,7 @@ public class DruidPlanner implements Closeable
public ValidationResult validate(boolean authorizeContextParams) throws SqlParseException, ValidationException
{
resetPlanner();
final ParsedNodes parsed = ParsedNodes.create(planner.parse(plannerContext.getSql()));
final ParsedNodes parsed = ParsedNodes.create(planner.parse(plannerContext.getSql()), plannerContext.getTimeZone());
final SqlValidator validator = getValidator();
final SqlNode validatedQueryNode;
@ -150,8 +154,8 @@ public class DruidPlanner implements Closeable
final Set<ResourceAction> resourceActions = new HashSet<>(resourceCollectorShuttle.getResourceActions());
if (parsed.getInsertNode() != null) {
final String targetDataSource = validateAndGetDataSourceForInsert(parsed.getInsertNode());
if (parsed.getInsertOrReplace() != null) {
final String targetDataSource = validateAndGetDataSourceForIngest(parsed.getInsertOrReplace());
resourceActions.add(new ResourceAction(new Resource(targetDataSource, ResourceType.DATASOURCE), Action.WRITE));
}
if (authorizeContextParams) {
@ -175,7 +179,7 @@ public class DruidPlanner implements Closeable
{
resetPlanner();
final ParsedNodes parsed = ParsedNodes.create(planner.parse(plannerContext.getSql()));
final ParsedNodes parsed = ParsedNodes.create(planner.parse(plannerContext.getSql()), plannerContext.getTimeZone());
final SqlNode validatedQueryNode = planner.validate(parsed.getQueryNode());
final RelRoot rootQueryRel = planner.rel(validatedQueryNode);
@ -187,7 +191,7 @@ public class DruidPlanner implements Closeable
if (parsed.getExplainNode() != null) {
returnedRowType = getExplainStructType(typeFactory);
} else {
returnedRowType = buildQueryMaker(rootQueryRel, parsed.getInsertNode()).getResultType();
returnedRowType = buildQueryMaker(rootQueryRel, parsed.getInsertOrReplace()).getResultType();
}
return new PrepareResult(returnedRowType, parameterTypes);
@ -206,7 +210,7 @@ public class DruidPlanner implements Closeable
{
resetPlanner();
final ParsedNodes parsed = ParsedNodes.create(planner.parse(plannerContext.getSql()));
final ParsedNodes parsed = ParsedNodes.create(planner.parse(plannerContext.getSql()), plannerContext.getTimeZone());
try {
if (parsed.getIngestionGranularity() != null) {
@ -220,6 +224,13 @@ public class DruidPlanner implements Closeable
throw new ValidationException("Unable to serialize partition granularity.");
}
if (parsed.getReplaceIntervals() != null) {
plannerContext.getQueryContext().addSystemParam(
DruidSqlReplace.SQL_REPLACE_TIME_CHUNKS,
String.join(",", parsed.getReplaceIntervals())
);
}
// the planner's type factory is not available until after parsing
this.rexBuilder = new RexBuilder(planner.getTypeFactory());
final SqlNode parameterizedQueryNode = rewriteDynamicParameters(parsed.getQueryNode());
@ -227,7 +238,7 @@ public class DruidPlanner implements Closeable
final RelRoot rootQueryRel = planner.rel(validatedQueryNode);
try {
return planWithDruidConvention(rootQueryRel, parsed.getExplainNode(), parsed.getInsertNode());
return planWithDruidConvention(rootQueryRel, parsed.getExplainNode(), parsed.getInsertOrReplace());
}
catch (Exception e) {
Throwable cannotPlanException = Throwables.getCauseOfType(e, RelOptPlanner.CannotPlanException.class);
@ -236,9 +247,9 @@ public class DruidPlanner implements Closeable
throw e;
}
// If there isn't any INSERT clause, then we should try again with BINDABLE convention. And return without
// If there isn't any ingestion clause, then we should try again with BINDABLE convention. And return without
// any error, if it is plannable by the bindable convention
if (parsed.getInsertNode() == null) {
if (parsed.getInsertOrReplace() == null) {
// Try again with BINDABLE convention. Used for querying Values and metadata tables.
try {
return planWithBindableConvention(rootQueryRel, parsed.getExplainNode());
@ -295,12 +306,11 @@ public class DruidPlanner implements Closeable
private PlannerResult planWithDruidConvention(
final RelRoot root,
@Nullable final SqlExplain explain,
@Nullable final SqlInsert insert
@Nullable final SqlInsert insertOrReplace
) throws ValidationException, RelConversionException
{
final RelRoot possiblyLimitedRoot = possiblyWrapRootWithOuterLimitFromContext(root);
final QueryMaker queryMaker = buildQueryMaker(root, insert);
final QueryMaker queryMaker = buildQueryMaker(root, insertOrReplace);
plannerContext.setQueryMaker(queryMaker);
RelNode parameterized = rewriteRelDynamicParameters(possiblyLimitedRoot.rel);
@ -625,7 +635,7 @@ public class DruidPlanner implements Closeable
/**
* Uses {@link SqlParameterizerShuttle} to rewrite {@link SqlNode} to swap out any
* {@link org.apache.calcite.sql.SqlDynamicParam} early for their {@link org.apache.calcite.sql.SqlLiteral}
* {@link org.apache.calcite.sql.SqlDynamicParam} early for their {@link SqlLiteral}
* replacement
*/
private SqlNode rewriteDynamicParameters(SqlNode parsed)
@ -650,11 +660,11 @@ public class DruidPlanner implements Closeable
private QueryMaker buildQueryMaker(
final RelRoot rootQueryRel,
@Nullable final SqlInsert insert
@Nullable final SqlInsert insertOrReplace
) throws ValidationException
{
if (insert != null) {
final String targetDataSource = validateAndGetDataSourceForInsert(insert);
if (insertOrReplace != null) {
final String targetDataSource = validateAndGetDataSourceForIngest(insertOrReplace);
validateColumnsForIngestion(rootQueryRel);
return queryMakerFactory.buildForInsert(targetDataSource, rootQueryRel, plannerContext);
} else {
@ -674,17 +684,17 @@ public class DruidPlanner implements Closeable
}
/**
* Extract target datasource from a {@link SqlInsert}, and also validate that the INSERT is of a form we support.
* Expects the INSERT target to be either an unqualified name, or a name qualified by the default schema.
* Extract target datasource from a {@link SqlInsert}, and also validate that the ingestion is of a form we support.
* Expects the target datasource to be either an unqualified name, or a name qualified by the default schema.
*/
private String validateAndGetDataSourceForInsert(final SqlInsert insert) throws ValidationException
private String validateAndGetDataSourceForIngest(final SqlInsert insert) throws ValidationException
{
if (insert.isUpsert()) {
throw new ValidationException("UPSERT is not supported.");
}
if (insert.getTargetColumnList() != null) {
throw new ValidationException("INSERT with target column list is not supported.");
throw new ValidationException("Ingestion with target column list is not supported.");
}
final SqlIdentifier tableIdentifier = (SqlIdentifier) insert.getTargetTable();
@ -692,7 +702,7 @@ public class DruidPlanner implements Closeable
if (tableIdentifier.names.isEmpty()) {
// I don't think this can happen, but include a branch for it just in case.
throw new ValidationException("INSERT requires target table.");
throw new ValidationException("Ingestion requires target table.");
} else if (tableIdentifier.names.size() == 1) {
// Unqualified name.
dataSource = Iterables.getOnlyElement(tableIdentifier.names);
@ -705,13 +715,13 @@ public class DruidPlanner implements Closeable
dataSource = tableIdentifier.names.get(1);
} else {
throw new ValidationException(
StringUtils.format("Cannot INSERT into [%s] because it is not a Druid datasource.", tableIdentifier)
StringUtils.format("Cannot ingest into [%s] because it is not a Druid datasource.", tableIdentifier)
);
}
}
try {
IdUtils.validateId("INSERT dataSource", dataSource);
IdUtils.validateId("Ingestion dataSource", dataSource);
}
catch (IllegalArgumentException e) {
throw new ValidationException(e.getMessage());
@ -777,76 +787,45 @@ public class DruidPlanner implements Closeable
private final SqlExplain explain;
@Nullable
private final DruidSqlInsert insert;
private final SqlInsert insertOrReplace;
private final SqlNode query;
@Nullable
private final Granularity ingestionGranularity;
@Nullable
private final List<String> replaceIntervals;
private ParsedNodes(
@Nullable SqlExplain explain,
@Nullable DruidSqlInsert insert,
@Nullable SqlInsert insertOrReplace,
SqlNode query,
@Nullable Granularity ingestionGranularity
@Nullable Granularity ingestionGranularity,
@Nullable List<String> replaceIntervals
)
{
this.explain = explain;
this.insert = insert;
this.insertOrReplace = insertOrReplace;
this.query = query;
this.ingestionGranularity = ingestionGranularity;
this.replaceIntervals = replaceIntervals;
}
static ParsedNodes create(final SqlNode node) throws ValidationException
static ParsedNodes create(final SqlNode node, DateTimeZone dateTimeZone) throws ValidationException
{
SqlExplain explain = null;
DruidSqlInsert druidSqlInsert = null;
SqlNode query = node;
Granularity ingestionGranularity = null;
SqlExplain explain = null;
if (query.getKind() == SqlKind.EXPLAIN) {
explain = (SqlExplain) query;
query = explain.getExplicandum();
}
if (query.getKind() == SqlKind.INSERT) {
druidSqlInsert = (DruidSqlInsert) query;
query = druidSqlInsert.getSource();
// Check if ORDER BY clause is not provided to the underlying query
if (query instanceof SqlOrderBy) {
SqlOrderBy sqlOrderBy = (SqlOrderBy) query;
SqlNodeList orderByList = sqlOrderBy.orderList;
if (!(orderByList == null || orderByList.equals(SqlNodeList.EMPTY))) {
throw new ValidationException("Cannot have ORDER BY on an INSERT query, use CLUSTERED BY instead.");
}
}
ingestionGranularity = druidSqlInsert.getPartitionedBy();
if (druidSqlInsert.getClusteredBy() != null) {
// If we have a CLUSTERED BY clause, extract the information in that CLUSTERED BY and create a new SqlOrderBy
// node
SqlNode offset = null;
SqlNode fetch = null;
if (query instanceof SqlOrderBy) {
SqlOrderBy sqlOrderBy = (SqlOrderBy) query;
// This represents the underlying query free of OFFSET, FETCH and ORDER BY clauses
// For a sqlOrderBy.query like "SELECT dim1, sum(dim2) FROM foo OFFSET 10 FETCH 30 ORDER BY dim1 GROUP BY dim1
// this would represent the "SELECT dim1, sum(dim2) from foo GROUP BY dim1
query = sqlOrderBy.query;
offset = sqlOrderBy.offset;
fetch = sqlOrderBy.fetch;
}
// Creates a new SqlOrderBy query, which may have our CLUSTERED BY overwritten
query = new SqlOrderBy(
query.getParserPosition(),
query,
druidSqlInsert.getClusteredBy(),
offset,
fetch
);
if (query instanceof DruidSqlInsert) {
return handleInsert(explain, (DruidSqlInsert) query);
} else if (query instanceof DruidSqlReplace) {
return handleReplace(explain, (DruidSqlReplace) query, dateTimeZone);
}
}
@ -854,7 +833,83 @@ public class DruidPlanner implements Closeable
throw new ValidationException(StringUtils.format("Cannot execute [%s].", query.getKind()));
}
return new ParsedNodes(explain, druidSqlInsert, query, ingestionGranularity);
return new ParsedNodes(explain, null, query, null, null);
}
static ParsedNodes handleInsert(SqlExplain explain, DruidSqlInsert druidSqlInsert) throws ValidationException
{
SqlNode query = druidSqlInsert.getSource();
// Check if ORDER BY clause is not provided to the underlying query
if (query instanceof SqlOrderBy) {
SqlOrderBy sqlOrderBy = (SqlOrderBy) query;
SqlNodeList orderByList = sqlOrderBy.orderList;
if (!(orderByList == null || orderByList.equals(SqlNodeList.EMPTY))) {
throw new ValidationException("Cannot have ORDER BY on an INSERT query, use CLUSTERED BY instead.");
}
}
Granularity ingestionGranularity = druidSqlInsert.getPartitionedBy();
if (druidSqlInsert.getClusteredBy() != null) {
// If we have a CLUSTERED BY clause, extract the information in that CLUSTERED BY and create a new
// SqlOrderBy node
SqlNode offset = null;
SqlNode fetch = null;
if (query instanceof SqlOrderBy) {
SqlOrderBy sqlOrderBy = (SqlOrderBy) query;
// This represents the underlying query free of OFFSET, FETCH and ORDER BY clauses
// For a sqlOrderBy.query like "SELECT dim1, sum(dim2) FROM foo OFFSET 10 FETCH 30 ORDER BY dim1 GROUP
// BY dim1 this would represent the "SELECT dim1, sum(dim2) from foo GROUP BY dim1
query = sqlOrderBy.query;
offset = sqlOrderBy.offset;
fetch = sqlOrderBy.fetch;
}
// Creates a new SqlOrderBy query, which may have our CLUSTERED BY overwritten
query = new SqlOrderBy(
query.getParserPosition(),
query,
druidSqlInsert.getClusteredBy(),
offset,
fetch
);
}
if (!query.isA(SqlKind.QUERY)) {
throw new ValidationException(StringUtils.format("Cannot execute [%s].", query.getKind()));
}
return new ParsedNodes(explain, druidSqlInsert, query, ingestionGranularity, null);
}
static ParsedNodes handleReplace(SqlExplain explain, DruidSqlReplace druidSqlReplace, DateTimeZone dateTimeZone)
throws ValidationException
{
SqlNode query = druidSqlReplace.getSource();
// Check if ORDER BY clause is not provided to the underlying query
if (query instanceof SqlOrderBy) {
SqlOrderBy sqlOrderBy = (SqlOrderBy) query;
SqlNodeList orderByList = sqlOrderBy.orderList;
if (!(orderByList == null || orderByList.equals(SqlNodeList.EMPTY))) {
throw new ValidationException("Cannot have ORDER BY on a REPLACE query.");
}
}
SqlNode replaceTimeQuery = druidSqlReplace.getReplaceTimeQuery();
if (replaceTimeQuery == null) {
throw new ValidationException("Missing time chunk information in DELETE WHERE clause for replace.");
}
Granularity ingestionGranularity = druidSqlReplace.getPartitionedBy();
List<String> replaceIntervals = DruidSqlParserUtils.validateQueryAndConvertToIntervals(replaceTimeQuery, ingestionGranularity, dateTimeZone);
if (!query.isA(SqlKind.QUERY)) {
throw new ValidationException(StringUtils.format("Cannot execute [%s].", query.getKind()));
}
return new ParsedNodes(explain, druidSqlReplace, query, ingestionGranularity, replaceIntervals);
}
@Nullable
@ -864,9 +919,15 @@ public class DruidPlanner implements Closeable
}
@Nullable
public DruidSqlInsert getInsertNode()
public SqlInsert getInsertOrReplace()
{
return insert;
return insertOrReplace;
}
@Nullable
public List<String> getReplaceIntervals()
{
return replaceIntervals;
}
public SqlNode getQueryNode()

View File

@ -0,0 +1,339 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.sql.calcite;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import org.apache.druid.data.input.impl.CsvInputFormat;
import org.apache.druid.data.input.impl.InlineInputSource;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.query.Query;
import org.apache.druid.query.QueryContext;
import org.apache.druid.query.aggregation.hyperloglog.HyperUniquesAggregatorFactory;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.server.security.Action;
import org.apache.druid.server.security.AuthConfig;
import org.apache.druid.server.security.AuthenticationResult;
import org.apache.druid.server.security.Resource;
import org.apache.druid.server.security.ResourceAction;
import org.apache.druid.server.security.ResourceType;
import org.apache.druid.sql.SqlLifecycle;
import org.apache.druid.sql.SqlLifecycleFactory;
import org.apache.druid.sql.calcite.external.ExternalDataSource;
import org.apache.druid.sql.calcite.parser.DruidSqlInsert;
import org.apache.druid.sql.calcite.planner.Calcites;
import org.apache.druid.sql.calcite.planner.PlannerConfig;
import org.apache.druid.sql.calcite.planner.PlannerContext;
import org.apache.druid.sql.calcite.util.CalciteTests;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.junit.After;
import org.junit.Assert;
import org.junit.internal.matchers.ThrowableMessageMatcher;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
public class CalciteIngestionDmlTest extends BaseCalciteQueryTest
{
protected static final Map<String, Object> DEFAULT_CONTEXT =
ImmutableMap.<String, Object>builder()
.put(PlannerContext.CTX_SQL_QUERY_ID, DUMMY_SQL_ID)
.build();
protected static final RowSignature FOO_TABLE_SIGNATURE =
RowSignature.builder()
.addTimeColumn()
.add("cnt", ColumnType.LONG)
.add("dim1", ColumnType.STRING)
.add("dim2", ColumnType.STRING)
.add("dim3", ColumnType.STRING)
.add("m1", ColumnType.FLOAT)
.add("m2", ColumnType.DOUBLE)
.add("unique_dim1", HyperUniquesAggregatorFactory.TYPE)
.build();
protected final ExternalDataSource externalDataSource = new ExternalDataSource(
new InlineInputSource("a,b,1\nc,d,2\n"),
new CsvInputFormat(ImmutableList.of("x", "y", "z"), null, false, false, 0),
RowSignature.builder()
.add("x", ColumnType.STRING)
.add("y", ColumnType.STRING)
.add("z", ColumnType.LONG)
.build()
);
protected boolean didTest = false;
@After
@Override
public void tearDown() throws Exception
{
super.tearDown();
// Catch situations where tests forgot to call "verify" on their tester.
if (!didTest) {
throw new ISE("Test was not run; did you call verify() on a tester?");
}
}
protected String externSql(final ExternalDataSource externalDataSource)
{
try {
return StringUtils.format(
"TABLE(extern(%s, %s, %s))",
Calcites.escapeStringLiteral(queryJsonMapper.writeValueAsString(externalDataSource.getInputSource())),
Calcites.escapeStringLiteral(queryJsonMapper.writeValueAsString(externalDataSource.getInputFormat())),
Calcites.escapeStringLiteral(queryJsonMapper.writeValueAsString(externalDataSource.getSignature()))
);
}
catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
}
protected Map<String, Object> queryContextWithGranularity(Granularity granularity)
{
String granularityString = null;
try {
granularityString = queryJsonMapper.writeValueAsString(granularity);
}
catch (JsonProcessingException e) {
Assert.fail(e.getMessage());
}
return ImmutableMap.of(DruidSqlInsert.SQL_INSERT_SEGMENT_GRANULARITY, granularityString);
}
protected IngestionDmlTester testIngestionQuery()
{
return new IngestionDmlTester();
}
public class IngestionDmlTester
{
private String sql;
private PlannerConfig plannerConfig = new PlannerConfig();
private Map<String, Object> queryContext = DEFAULT_CONTEXT;
private AuthenticationResult authenticationResult = CalciteTests.REGULAR_USER_AUTH_RESULT;
private String expectedTargetDataSource;
private RowSignature expectedTargetSignature;
private List<ResourceAction> expectedResources;
private Query expectedQuery;
private Matcher<Throwable> validationErrorMatcher;
private IngestionDmlTester()
{
// Nothing to do.
}
public IngestionDmlTester sql(final String sql)
{
this.sql = sql;
return this;
}
protected IngestionDmlTester sql(final String sqlPattern, final Object arg, final Object... otherArgs)
{
final Object[] args = new Object[otherArgs.length + 1];
args[0] = arg;
System.arraycopy(otherArgs, 0, args, 1, otherArgs.length);
this.sql = StringUtils.format(sqlPattern, args);
return this;
}
public IngestionDmlTester context(final Map<String, Object> context)
{
this.queryContext = context;
return this;
}
public IngestionDmlTester authentication(final AuthenticationResult authenticationResult)
{
this.authenticationResult = authenticationResult;
return this;
}
public IngestionDmlTester expectTarget(
final String expectedTargetDataSource,
final RowSignature expectedTargetSignature
)
{
this.expectedTargetDataSource = Preconditions.checkNotNull(expectedTargetDataSource, "expectedTargetDataSource");
this.expectedTargetSignature = Preconditions.checkNotNull(expectedTargetSignature, "expectedTargetSignature");
return this;
}
public IngestionDmlTester expectResources(final ResourceAction... expectedResources)
{
this.expectedResources = Arrays.asList(expectedResources);
return this;
}
@SuppressWarnings("rawtypes")
public IngestionDmlTester expectQuery(final Query expectedQuery)
{
this.expectedQuery = expectedQuery;
return this;
}
public IngestionDmlTester expectValidationError(Matcher<Throwable> validationErrorMatcher)
{
this.validationErrorMatcher = validationErrorMatcher;
return this;
}
public IngestionDmlTester expectValidationError(Class<? extends Throwable> clazz)
{
return expectValidationError(CoreMatchers.instanceOf(clazz));
}
public IngestionDmlTester expectValidationError(Class<? extends Throwable> clazz, String message)
{
return expectValidationError(
CoreMatchers.allOf(
CoreMatchers.instanceOf(clazz),
ThrowableMessageMatcher.hasMessage(CoreMatchers.equalTo(message))
)
);
}
public void verify()
{
if (didTest) {
// It's good form to only do one test per method.
// This also helps us ensure that "verify" actually does get called.
throw new ISE("Use one @Test method per tester");
}
didTest = true;
if (sql == null) {
throw new ISE("Test must have SQL statement");
}
try {
log.info("SQL: %s", sql);
queryLogHook.clearRecordedQueries();
if (validationErrorMatcher != null) {
verifyValidationError();
} else {
verifySuccess();
}
}
catch (Exception e) {
throw new RuntimeException(e);
}
}
private void verifyValidationError()
{
if (expectedTargetDataSource != null) {
throw new ISE("Test must not have expectedTargetDataSource");
}
if (expectedResources != null) {
throw new ISE("Test must not have expectedResources");
}
if (expectedQuery != null) {
throw new ISE("Test must not have expectedQuery");
}
final SqlLifecycleFactory sqlLifecycleFactory = getSqlLifecycleFactory(
plannerConfig,
new AuthConfig(),
createOperatorTable(),
createMacroTable(),
CalciteTests.TEST_AUTHORIZER_MAPPER,
queryJsonMapper
);
final SqlLifecycle sqlLifecycle = sqlLifecycleFactory.factorize();
sqlLifecycle.initialize(sql, new QueryContext(queryContext));
final Throwable e = Assert.assertThrows(
Throwable.class,
() -> {
sqlLifecycle.validateAndAuthorize(authenticationResult);
sqlLifecycle.plan();
}
);
MatcherAssert.assertThat(e, validationErrorMatcher);
Assert.assertTrue(queryLogHook.getRecordedQueries().isEmpty());
}
private void verifySuccess() throws Exception
{
if (expectedTargetDataSource == null) {
throw new ISE("Test must have expectedTargetDataSource");
}
if (expectedResources == null) {
throw new ISE("Test must have expectedResources");
}
final List<Query> expectedQueries =
expectedQuery == null
? Collections.emptyList()
: Collections.singletonList(recursivelyOverrideContext(expectedQuery, queryContext));
Assert.assertEquals(
ImmutableSet.copyOf(expectedResources),
analyzeResources(plannerConfig, new AuthConfig(), sql, queryContext, authenticationResult)
);
final List<Object[]> results =
getResults(plannerConfig, queryContext, Collections.emptyList(), sql, authenticationResult);
verifyResults(
sql,
expectedQueries,
Collections.singletonList(new Object[]{expectedTargetDataSource, expectedTargetSignature}),
results
);
}
}
protected static ResourceAction viewRead(final String viewName)
{
return new ResourceAction(new Resource(viewName, ResourceType.VIEW), Action.READ);
}
protected static ResourceAction dataSourceRead(final String dataSource)
{
return new ResourceAction(new Resource(dataSource, ResourceType.DATASOURCE), Action.READ);
}
protected static ResourceAction dataSourceWrite(final String dataSource)
{
return new ResourceAction(new Resource(dataSource, ResourceType.DATASOURCE), Action.WRITE);
}
}

View File

@ -20,110 +20,44 @@
package org.apache.druid.sql.calcite;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import org.apache.druid.data.input.impl.CsvInputFormat;
import org.apache.druid.data.input.impl.InlineInputSource;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.java.util.common.jackson.JacksonUtils;
import org.apache.druid.query.Query;
import org.apache.druid.query.QueryContext;
import org.apache.druid.query.aggregation.CountAggregatorFactory;
import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
import org.apache.druid.query.aggregation.hyperloglog.HyperUniquesAggregatorFactory;
import org.apache.druid.query.dimension.DefaultDimensionSpec;
import org.apache.druid.query.groupby.GroupByQuery;
import org.apache.druid.query.scan.ScanQuery;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.server.security.Action;
import org.apache.druid.server.security.AuthConfig;
import org.apache.druid.server.security.AuthenticationResult;
import org.apache.druid.server.security.ForbiddenException;
import org.apache.druid.server.security.Resource;
import org.apache.druid.server.security.ResourceAction;
import org.apache.druid.server.security.ResourceType;
import org.apache.druid.sql.SqlLifecycle;
import org.apache.druid.sql.SqlLifecycleFactory;
import org.apache.druid.sql.SqlPlanningException;
import org.apache.druid.sql.calcite.external.ExternalDataSource;
import org.apache.druid.sql.calcite.external.ExternalOperatorConversion;
import org.apache.druid.sql.calcite.filtration.Filtration;
import org.apache.druid.sql.calcite.parser.DruidSqlInsert;
import org.apache.druid.sql.calcite.planner.Calcites;
import org.apache.druid.sql.calcite.planner.PlannerConfig;
import org.apache.druid.sql.calcite.planner.PlannerContext;
import org.apache.druid.sql.calcite.util.CalciteTests;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.junit.After;
import org.junit.Assert;
import org.junit.Test;
import org.junit.internal.matchers.ThrowableMessageMatcher;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
public class CalciteInsertDmlTest extends BaseCalciteQueryTest
public class CalciteInsertDmlTest extends CalciteIngestionDmlTest
{
private static final Map<String, Object> DEFAULT_CONTEXT =
ImmutableMap.<String, Object>builder()
.put(PlannerContext.CTX_SQL_QUERY_ID, DUMMY_SQL_ID)
.build();
private static final RowSignature FOO_TABLE_SIGNATURE =
RowSignature.builder()
.addTimeColumn()
.add("cnt", ColumnType.LONG)
.add("dim1", ColumnType.STRING)
.add("dim2", ColumnType.STRING)
.add("dim3", ColumnType.STRING)
.add("m1", ColumnType.FLOAT)
.add("m2", ColumnType.DOUBLE)
.add("unique_dim1", HyperUniquesAggregatorFactory.TYPE)
.build();
private final ExternalDataSource externalDataSource = new ExternalDataSource(
new InlineInputSource("a,b,1\nc,d,2\n"),
new CsvInputFormat(ImmutableList.of("x", "y", "z"), null, false, false, 0),
RowSignature.builder()
.add("x", ColumnType.STRING)
.add("y", ColumnType.STRING)
.add("z", ColumnType.LONG)
.build()
);
private static final Map<String, Object> PARTITIONED_BY_ALL_TIME_QUERY_CONTEXT = ImmutableMap.of(
DruidSqlInsert.SQL_INSERT_SEGMENT_GRANULARITY,
"{\"type\":\"all\"}"
);
private boolean didTest = false;
@After
@Override
public void tearDown() throws Exception
{
super.tearDown();
// Catch situations where tests forgot to call "verify" on their tester.
if (!didTest) {
throw new ISE("Test was not run; did you call verify() on a tester?");
}
}
@Test
public void testInsertFromTable()
{
testInsertQuery()
testIngestionQuery()
.sql("INSERT INTO dst SELECT * FROM foo PARTITIONED BY ALL TIME")
.expectTarget("dst", FOO_TABLE_SIGNATURE)
.expectResources(dataSourceRead("foo"), dataSourceWrite("dst"))
@ -141,7 +75,7 @@ public class CalciteInsertDmlTest extends BaseCalciteQueryTest
@Test
public void testInsertFromView()
{
testInsertQuery()
testIngestionQuery()
.sql("INSERT INTO dst SELECT * FROM view.aview PARTITIONED BY ALL TIME")
.expectTarget("dst", RowSignature.builder().add("dim1_firstchar", ColumnType.STRING).build())
.expectResources(viewRead("aview"), dataSourceWrite("dst"))
@ -161,7 +95,7 @@ public class CalciteInsertDmlTest extends BaseCalciteQueryTest
@Test
public void testInsertIntoExistingTable()
{
testInsertQuery()
testIngestionQuery()
.sql("INSERT INTO foo SELECT * FROM foo PARTITIONED BY ALL TIME")
.expectTarget("foo", FOO_TABLE_SIGNATURE)
.expectResources(dataSourceRead("foo"), dataSourceWrite("foo"))
@ -179,7 +113,7 @@ public class CalciteInsertDmlTest extends BaseCalciteQueryTest
@Test
public void testInsertIntoQualifiedTable()
{
testInsertQuery()
testIngestionQuery()
.sql("INSERT INTO druid.dst SELECT * FROM foo PARTITIONED BY ALL TIME")
.expectTarget("dst", FOO_TABLE_SIGNATURE)
.expectResources(dataSourceRead("foo"), dataSourceWrite("dst"))
@ -197,25 +131,25 @@ public class CalciteInsertDmlTest extends BaseCalciteQueryTest
@Test
public void testInsertIntoInvalidDataSourceName()
{
testInsertQuery()
testIngestionQuery()
.sql("INSERT INTO \"in/valid\" SELECT dim1, dim2 FROM foo PARTITIONED BY ALL TIME")
.expectValidationError(SqlPlanningException.class, "INSERT dataSource cannot contain the '/' character.")
.expectValidationError(SqlPlanningException.class, "Ingestion dataSource cannot contain the '/' character.")
.verify();
}
@Test
public void testInsertUsingColumnList()
{
testInsertQuery()
testIngestionQuery()
.sql("INSERT INTO dst (foo, bar) SELECT dim1, dim2 FROM foo PARTITIONED BY ALL TIME")
.expectValidationError(SqlPlanningException.class, "INSERT with target column list is not supported.")
.expectValidationError(SqlPlanningException.class, "Ingestion with target column list is not supported.")
.verify();
}
@Test
public void testUpsert()
{
testInsertQuery()
testIngestionQuery()
.sql("UPSERT INTO dst SELECT * FROM foo PARTITIONED BY ALL TIME")
.expectValidationError(SqlPlanningException.class, "UPSERT is not supported.")
.verify();
@ -224,11 +158,11 @@ public class CalciteInsertDmlTest extends BaseCalciteQueryTest
@Test
public void testInsertIntoSystemTable()
{
testInsertQuery()
testIngestionQuery()
.sql("INSERT INTO INFORMATION_SCHEMA.COLUMNS SELECT * FROM foo PARTITIONED BY ALL TIME")
.expectValidationError(
SqlPlanningException.class,
"Cannot INSERT into [INFORMATION_SCHEMA.COLUMNS] because it is not a Druid datasource."
"Cannot ingest into [INFORMATION_SCHEMA.COLUMNS] because it is not a Druid datasource."
)
.verify();
}
@ -236,11 +170,11 @@ public class CalciteInsertDmlTest extends BaseCalciteQueryTest
@Test
public void testInsertIntoView()
{
testInsertQuery()
testIngestionQuery()
.sql("INSERT INTO view.aview SELECT * FROM foo PARTITIONED BY ALL TIME")
.expectValidationError(
SqlPlanningException.class,
"Cannot INSERT into [view.aview] because it is not a Druid datasource."
"Cannot ingest into [view.aview] because it is not a Druid datasource."
)
.verify();
}
@ -248,7 +182,7 @@ public class CalciteInsertDmlTest extends BaseCalciteQueryTest
@Test
public void testInsertFromUnauthorizedDataSource()
{
testInsertQuery()
testIngestionQuery()
.sql("INSERT INTO dst SELECT * FROM \"%s\" PARTITIONED BY ALL TIME", CalciteTests.FORBIDDEN_DATASOURCE)
.expectValidationError(ForbiddenException.class)
.verify();
@ -257,7 +191,7 @@ public class CalciteInsertDmlTest extends BaseCalciteQueryTest
@Test
public void testInsertIntoUnauthorizedDataSource()
{
testInsertQuery()
testIngestionQuery()
.sql("INSERT INTO \"%s\" SELECT * FROM foo PARTITIONED BY ALL TIME", CalciteTests.FORBIDDEN_DATASOURCE)
.expectValidationError(ForbiddenException.class)
.verify();
@ -266,11 +200,11 @@ public class CalciteInsertDmlTest extends BaseCalciteQueryTest
@Test
public void testInsertIntoNonexistentSchema()
{
testInsertQuery()
testIngestionQuery()
.sql("INSERT INTO nonexistent.dst SELECT * FROM foo PARTITIONED BY ALL TIME")
.expectValidationError(
SqlPlanningException.class,
"Cannot INSERT into [nonexistent.dst] because it is not a Druid datasource."
"Cannot ingest into [nonexistent.dst] because it is not a Druid datasource."
)
.verify();
}
@ -278,7 +212,7 @@ public class CalciteInsertDmlTest extends BaseCalciteQueryTest
@Test
public void testInsertFromExternal()
{
testInsertQuery()
testIngestionQuery()
.sql("INSERT INTO dst SELECT * FROM %s PARTITIONED BY ALL TIME", externSql(externalDataSource))
.authentication(CalciteTests.SUPER_USER_AUTH_RESULT)
.expectTarget("dst", externalDataSource.getSignature())
@ -304,7 +238,7 @@ public class CalciteInsertDmlTest extends BaseCalciteQueryTest
.add("dim1", ColumnType.STRING)
.build();
testInsertQuery()
testIngestionQuery()
.sql(
"INSERT INTO druid.dst SELECT __time, FLOOR(m1) as floor_m1, dim1 FROM foo PARTITIONED BY TIME_FLOOR(__time, 'PT1H')")
.expectTarget("dst", targetRowSignature)
@ -353,7 +287,7 @@ public class CalciteInsertDmlTest extends BaseCalciteQueryTest
Assert.fail(e.getMessage());
}
testInsertQuery()
testIngestionQuery()
.sql(StringUtils.format(
"INSERT INTO druid.dst SELECT __time, dim1 FROM foo PARTITIONED BY %s",
partitionedByArgument
@ -384,7 +318,7 @@ public class CalciteInsertDmlTest extends BaseCalciteQueryTest
.add("dim1", ColumnType.STRING)
.add("ceil_m2", ColumnType.DOUBLE)
.build();
testInsertQuery()
testIngestionQuery()
.sql(
"INSERT INTO druid.dst "
+ "SELECT __time, FLOOR(m1) as floor_m1, dim1, CEIL(m2) as ceil_m2 FROM foo "
@ -424,7 +358,7 @@ public class CalciteInsertDmlTest extends BaseCalciteQueryTest
.add("dim1", ColumnType.STRING)
.build();
testInsertQuery()
testIngestionQuery()
.sql(
"INSERT INTO druid.dst SELECT __time, FLOOR(m1) as floor_m1, dim1 FROM foo PARTITIONED BY DAY CLUSTERED BY 2, dim1")
.expectTarget("dst", targetRowSignature)
@ -456,7 +390,7 @@ public class CalciteInsertDmlTest extends BaseCalciteQueryTest
.add("dim1", ColumnType.STRING)
.build();
testInsertQuery()
testIngestionQuery()
.sql(
"INSERT INTO druid.dst SELECT __time, FLOOR(m1) as floor_m1, dim1 FROM foo LIMIT 10 OFFSET 20 PARTITIONED BY DAY")
.expectTarget("dst", targetRowSignature)
@ -583,7 +517,7 @@ public class CalciteInsertDmlTest extends BaseCalciteQueryTest
+ queryJsonMapper.writeValueAsString(expectedQuery)
+ "], signature=[{x:STRING, y:STRING, z:LONG}])\n";
// Use testQuery for EXPLAIN (not testInsertQuery).
// Use testQuery for EXPLAIN (not testIngestionQuery).
testQuery(
new PlannerConfig(),
StringUtils.format(
@ -600,14 +534,14 @@ public class CalciteInsertDmlTest extends BaseCalciteQueryTest
)
);
// Not using testInsertQuery, so must set didTest manually to satisfy the check in tearDown.
// Not using testIngestionQuery, so must set didTest manually to satisfy the check in tearDown.
didTest = true;
}
@Test
public void testExplainInsertFromExternalUnauthorized()
{
// Use testQuery for EXPLAIN (not testInsertQuery).
// Use testQuery for EXPLAIN (not testIngestionQuery).
Assert.assertThrows(
ForbiddenException.class,
() ->
@ -621,14 +555,14 @@ public class CalciteInsertDmlTest extends BaseCalciteQueryTest
)
);
// Not using testInsertQuery, so must set didTest manually to satisfy the check in tearDown.
// Not using testIngestionQuery, so must set didTest manually to satisfy the check in tearDown.
didTest = true;
}
@Test
public void testInsertFromExternalUnauthorized()
{
testInsertQuery()
testIngestionQuery()
.sql("INSERT INTO dst SELECT * FROM %s PARTITIONED BY ALL TIME", externSql(externalDataSource))
.expectValidationError(ForbiddenException.class)
.verify();
@ -639,7 +573,7 @@ public class CalciteInsertDmlTest extends BaseCalciteQueryTest
{
// INSERT with a particular column ordering.
testInsertQuery()
testIngestionQuery()
.sql(
"INSERT INTO dst SELECT x || y AS xy, z FROM %s PARTITIONED BY ALL TIME CLUSTERED BY 1, 2",
externSql(externalDataSource)
@ -670,7 +604,7 @@ public class CalciteInsertDmlTest extends BaseCalciteQueryTest
{
// INSERT with rollup.
testInsertQuery()
testIngestionQuery()
.sql(
"INSERT INTO dst SELECT x, SUM(z) AS sum_z, COUNT(*) AS cnt FROM %s GROUP BY 1 PARTITIONED BY ALL TIME",
externSql(externalDataSource)
@ -706,7 +640,7 @@ public class CalciteInsertDmlTest extends BaseCalciteQueryTest
{
// INSERT with rollup into a single row (no GROUP BY exprs).
testInsertQuery()
testIngestionQuery()
.sql(
"INSERT INTO dst SELECT COUNT(*) AS cnt FROM %s PARTITIONED BY ALL TIME",
externSql(externalDataSource)
@ -734,7 +668,7 @@ public class CalciteInsertDmlTest extends BaseCalciteQueryTest
@Test
public void testInsertWithInvalidSelectStatement()
{
testInsertQuery()
testIngestionQuery()
.sql("INSERT INTO t SELECT channel, added as count FROM foo PARTITIONED BY ALL") // count is a keyword
.expectValidationError(
CoreMatchers.allOf(
@ -748,7 +682,7 @@ public class CalciteInsertDmlTest extends BaseCalciteQueryTest
@Test
public void testInsertWithUnnamedColumnInSelectStatement()
{
testInsertQuery()
testIngestionQuery()
.sql("INSERT INTO t SELECT dim1, dim2 || '-lol' FROM foo PARTITIONED BY ALL")
.expectValidationError(
SqlPlanningException.class,
@ -763,7 +697,7 @@ public class CalciteInsertDmlTest extends BaseCalciteQueryTest
@Test
public void testInsertWithInvalidColumnNameInIngest()
{
testInsertQuery()
testIngestionQuery()
.sql("INSERT INTO t SELECT __time, dim1 AS EXPR$0 FROM foo PARTITIONED BY ALL")
.expectValidationError(
SqlPlanningException.class,
@ -778,7 +712,7 @@ public class CalciteInsertDmlTest extends BaseCalciteQueryTest
@Test
public void testInsertWithUnnamedColumnInNestedSelectStatement()
{
testInsertQuery()
testIngestionQuery()
.sql("INSERT INTO test "
+ "SELECT __time, * FROM "
+ "(SELECT __time, LOWER(dim1) FROM foo) PARTITIONED BY ALL TIME")
@ -791,238 +725,4 @@ public class CalciteInsertDmlTest extends BaseCalciteQueryTest
)
.verify();
}
private String externSql(final ExternalDataSource externalDataSource)
{
try {
return StringUtils.format(
"TABLE(extern(%s, %s, %s))",
Calcites.escapeStringLiteral(queryJsonMapper.writeValueAsString(externalDataSource.getInputSource())),
Calcites.escapeStringLiteral(queryJsonMapper.writeValueAsString(externalDataSource.getInputFormat())),
Calcites.escapeStringLiteral(queryJsonMapper.writeValueAsString(externalDataSource.getSignature()))
);
}
catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
}
private Map<String, Object> queryContextWithGranularity(Granularity granularity)
{
String granularityString = null;
try {
granularityString = queryJsonMapper.writeValueAsString(granularity);
}
catch (JsonProcessingException e) {
Assert.fail(e.getMessage());
}
return ImmutableMap.of(DruidSqlInsert.SQL_INSERT_SEGMENT_GRANULARITY, granularityString);
}
private InsertDmlTester testInsertQuery()
{
return new InsertDmlTester();
}
public class InsertDmlTester
{
private String sql;
private PlannerConfig plannerConfig = new PlannerConfig();
private Map<String, Object> queryContext = DEFAULT_CONTEXT;
private AuthenticationResult authenticationResult = CalciteTests.REGULAR_USER_AUTH_RESULT;
private String expectedTargetDataSource;
private RowSignature expectedTargetSignature;
private List<ResourceAction> expectedResources;
private Query expectedQuery;
private Matcher<Throwable> validationErrorMatcher;
private InsertDmlTester()
{
// Nothing to do.
}
public InsertDmlTester sql(final String sql)
{
this.sql = sql;
return this;
}
private InsertDmlTester sql(final String sqlPattern, final Object arg, final Object... otherArgs)
{
final Object[] args = new Object[otherArgs.length + 1];
args[0] = arg;
System.arraycopy(otherArgs, 0, args, 1, otherArgs.length);
this.sql = StringUtils.format(sqlPattern, args);
return this;
}
public InsertDmlTester context(final Map<String, Object> context)
{
this.queryContext = context;
return this;
}
public InsertDmlTester authentication(final AuthenticationResult authenticationResult)
{
this.authenticationResult = authenticationResult;
return this;
}
public InsertDmlTester expectTarget(
final String expectedTargetDataSource,
final RowSignature expectedTargetSignature
)
{
this.expectedTargetDataSource = Preconditions.checkNotNull(expectedTargetDataSource, "expectedTargetDataSource");
this.expectedTargetSignature = Preconditions.checkNotNull(expectedTargetSignature, "expectedTargetSignature");
return this;
}
public InsertDmlTester expectResources(final ResourceAction... expectedResources)
{
this.expectedResources = Arrays.asList(expectedResources);
return this;
}
@SuppressWarnings("rawtypes")
public InsertDmlTester expectQuery(final Query expectedQuery)
{
this.expectedQuery = expectedQuery;
return this;
}
public InsertDmlTester expectValidationError(Matcher<Throwable> validationErrorMatcher)
{
this.validationErrorMatcher = validationErrorMatcher;
return this;
}
public InsertDmlTester expectValidationError(Class<? extends Throwable> clazz)
{
return expectValidationError(CoreMatchers.instanceOf(clazz));
}
public InsertDmlTester expectValidationError(Class<? extends Throwable> clazz, String message)
{
return expectValidationError(
CoreMatchers.allOf(
CoreMatchers.instanceOf(clazz),
ThrowableMessageMatcher.hasMessage(CoreMatchers.equalTo(message))
)
);
}
public void verify()
{
if (didTest) {
// It's good form to only do one test per method.
// This also helps us ensure that "verify" actually does get called.
throw new ISE("Use one @Test method per tester");
}
didTest = true;
if (sql == null) {
throw new ISE("Test must have SQL statement");
}
try {
log.info("SQL: %s", sql);
queryLogHook.clearRecordedQueries();
if (validationErrorMatcher != null) {
verifyValidationError();
} else {
verifySuccess();
}
}
catch (Exception e) {
throw new RuntimeException(e);
}
}
private void verifyValidationError()
{
if (expectedTargetDataSource != null) {
throw new ISE("Test must not have expectedTargetDataSource");
}
if (expectedResources != null) {
throw new ISE("Test must not have expectedResources");
}
if (expectedQuery != null) {
throw new ISE("Test must not have expectedQuery");
}
final SqlLifecycleFactory sqlLifecycleFactory = getSqlLifecycleFactory(
plannerConfig,
new AuthConfig(),
createOperatorTable(),
createMacroTable(),
CalciteTests.TEST_AUTHORIZER_MAPPER,
queryJsonMapper
);
final SqlLifecycle sqlLifecycle = sqlLifecycleFactory.factorize();
sqlLifecycle.initialize(sql, new QueryContext(queryContext));
final Throwable e = Assert.assertThrows(
Throwable.class,
() -> {
sqlLifecycle.validateAndAuthorize(authenticationResult);
sqlLifecycle.plan();
}
);
MatcherAssert.assertThat(e, validationErrorMatcher);
Assert.assertTrue(queryLogHook.getRecordedQueries().isEmpty());
}
private void verifySuccess() throws Exception
{
if (expectedTargetDataSource == null) {
throw new ISE("Test must have expectedTargetDataSource");
}
if (expectedResources == null) {
throw new ISE("Test must have expectedResources");
}
final List<Query> expectedQueries =
expectedQuery == null
? Collections.emptyList()
: Collections.singletonList(recursivelyOverrideContext(expectedQuery, queryContext));
Assert.assertEquals(
ImmutableSet.copyOf(expectedResources),
analyzeResources(plannerConfig, sql, authenticationResult)
);
final List<Object[]> results =
getResults(plannerConfig, queryContext, Collections.emptyList(), sql, authenticationResult);
verifyResults(
sql,
expectedQueries,
Collections.singletonList(new Object[]{expectedTargetDataSource, expectedTargetSignature}),
results
);
}
}
private static ResourceAction viewRead(final String viewName)
{
return new ResourceAction(new Resource(viewName, ResourceType.VIEW), Action.READ);
}
private static ResourceAction dataSourceRead(final String dataSource)
{
return new ResourceAction(new Resource(dataSource, ResourceType.DATASOURCE), Action.READ);
}
private static ResourceAction dataSourceWrite(final String dataSource)
{
return new ResourceAction(new Resource(dataSource, ResourceType.DATASOURCE), Action.WRITE);
}
}

View File

@ -0,0 +1,656 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.sql.calcite;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.jackson.JacksonUtils;
import org.apache.druid.query.aggregation.CountAggregatorFactory;
import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
import org.apache.druid.query.dimension.DefaultDimensionSpec;
import org.apache.druid.query.groupby.GroupByQuery;
import org.apache.druid.query.scan.ScanQuery;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.server.security.ForbiddenException;
import org.apache.druid.sql.SqlPlanningException;
import org.apache.druid.sql.calcite.external.ExternalOperatorConversion;
import org.apache.druid.sql.calcite.filtration.Filtration;
import org.apache.druid.sql.calcite.parser.DruidSqlInsert;
import org.apache.druid.sql.calcite.parser.DruidSqlReplace;
import org.apache.druid.sql.calcite.planner.PlannerConfig;
import org.apache.druid.sql.calcite.planner.PlannerContext;
import org.apache.druid.sql.calcite.util.CalciteTests;
import org.junit.Assert;
import org.junit.Test;
import java.util.HashMap;
import java.util.Map;
import static org.junit.Assert.assertEquals;
public class CalciteReplaceDmlTest extends CalciteIngestionDmlTest
{
private static final Map<String, Object> REPLACE_ALL_TIME_CHUNKS = ImmutableMap.of(
DruidSqlInsert.SQL_INSERT_SEGMENT_GRANULARITY,
"{\"type\":\"all\"}",
DruidSqlReplace.SQL_REPLACE_TIME_CHUNKS,
"all"
);
protected Map<String, Object> addReplaceTimeChunkToQueryContext(Map<String, Object> context, String replaceTimeChunks)
{
return ImmutableMap.<String, Object>builder()
.putAll(context)
.put(DruidSqlReplace.SQL_REPLACE_TIME_CHUNKS, replaceTimeChunks)
.build();
}
@Test
public void testReplaceFromTableWithReplaceAll()
{
testIngestionQuery()
.sql("REPLACE INTO dst OVERWRITE ALL SELECT * FROM foo PARTITIONED BY ALL TIME")
.expectTarget("dst", FOO_TABLE_SIGNATURE)
.expectResources(dataSourceRead("foo"), dataSourceWrite("dst"))
.expectQuery(
newScanQueryBuilder()
.dataSource("foo")
.intervals(querySegmentSpec(Filtration.eternity()))
.columns("__time", "cnt", "dim1", "dim2", "dim3", "m1", "m2", "unique_dim1")
.context(REPLACE_ALL_TIME_CHUNKS)
.build()
)
.verify();
}
@Test
public void testReplaceFromTableWithDeleteWhereClause()
{
testIngestionQuery()
.sql("REPLACE INTO dst OVERWRITE WHERE __time >= TIMESTAMP '2000-01-01 00:00:00' AND __time < TIMESTAMP '2000-01-02 00:00:00' "
+ "SELECT * FROM foo PARTITIONED BY DAY")
.expectTarget("dst", FOO_TABLE_SIGNATURE)
.expectResources(dataSourceRead("foo"), dataSourceWrite("dst"))
.expectQuery(
newScanQueryBuilder()
.dataSource("foo")
.intervals(querySegmentSpec(Filtration.eternity()))
.columns("__time", "cnt", "dim1", "dim2", "dim3", "m1", "m2", "unique_dim1")
.context(
addReplaceTimeChunkToQueryContext(
queryContextWithGranularity(Granularities.DAY),
"2000-01-01T00:00:00.000Z/2000-01-02T00:00:00.000Z"
)
)
.build()
)
.verify();
}
@Test
public void testReplaceFromTableWithTimeZoneInQueryContext()
{
HashMap<String, Object> context = new HashMap<>(DEFAULT_CONTEXT);
context.put(PlannerContext.CTX_SQL_TIME_ZONE, "+05:30");
testIngestionQuery()
.context(context)
.sql("REPLACE INTO dst OVERWRITE WHERE __time >= TIMESTAMP '2000-01-01 05:30:00' AND __time < TIMESTAMP '2000-01-02 05:30:00' "
+ "SELECT * FROM foo PARTITIONED BY DAY")
.expectTarget("dst", FOO_TABLE_SIGNATURE)
.expectResources(dataSourceRead("foo"), dataSourceWrite("dst"))
.expectQuery(
newScanQueryBuilder()
.dataSource("foo")
.intervals(querySegmentSpec(Filtration.eternity()))
.columns("__time", "cnt", "dim1", "dim2", "dim3", "m1", "m2", "unique_dim1")
.context(
addReplaceTimeChunkToQueryContext(
queryContextWithGranularity(Granularities.DAY),
"2000-01-01T00:00:00.000Z/2000-01-02T00:00:00.000Z"
)
)
.build()
)
.verify();
}
@Test
public void testReplaceFromTableWithIntervalLargerThanOneGranularity()
{
testIngestionQuery()
.sql("REPLACE INTO dst OVERWRITE WHERE "
+ "__time >= TIMESTAMP '2000-01-01' AND __time < TIMESTAMP '2000-05-01' "
+ "SELECT * FROM foo PARTITIONED BY MONTH")
.expectTarget("dst", FOO_TABLE_SIGNATURE)
.expectResources(dataSourceRead("foo"), dataSourceWrite("dst"))
.expectQuery(
newScanQueryBuilder()
.dataSource("foo")
.intervals(querySegmentSpec(Filtration.eternity()))
.columns("__time", "cnt", "dim1", "dim2", "dim3", "m1", "m2", "unique_dim1")
.context(
addReplaceTimeChunkToQueryContext(
queryContextWithGranularity(Granularities.MONTH),
"2000-01-01T00:00:00.000Z/2000-05-01T00:00:00.000Z"
)
)
.build()
)
.verify();
}
@Test
public void testReplaceFromTableWithComplexDeleteWhereClause()
{
testIngestionQuery()
.sql("REPLACE INTO dst OVERWRITE WHERE "
+ "__time >= TIMESTAMP '2000-01-01' AND __time < TIMESTAMP '2000-02-01' "
+ "OR __time >= TIMESTAMP '2000-03-01' AND __time < TIMESTAMP '2000-04-01' "
+ "SELECT * FROM foo PARTITIONED BY MONTH")
.expectTarget("dst", FOO_TABLE_SIGNATURE)
.expectResources(dataSourceRead("foo"), dataSourceWrite("dst"))
.expectQuery(
newScanQueryBuilder()
.dataSource("foo")
.intervals(querySegmentSpec(Filtration.eternity()))
.columns("__time", "cnt", "dim1", "dim2", "dim3", "m1", "m2", "unique_dim1")
.context(
addReplaceTimeChunkToQueryContext(
queryContextWithGranularity(Granularities.MONTH),
"2000-01-01T00:00:00.000Z/2000-02-01T00:00:00.000Z,2000-03-01T00:00:00.000Z/2000-04-01T00:00:00.000Z"
)
)
.build()
)
.verify();
}
@Test
public void testReplaceFromTableWithBetweenClause()
{
testIngestionQuery()
.sql("REPLACE INTO dst OVERWRITE WHERE "
+ "__time BETWEEN TIMESTAMP '2000-01-01' AND TIMESTAMP '2000-01-31 23:59:59.999' "
+ "SELECT * FROM foo PARTITIONED BY MONTH")
.expectTarget("dst", FOO_TABLE_SIGNATURE)
.expectResources(dataSourceRead("foo"), dataSourceWrite("dst"))
.expectQuery(
newScanQueryBuilder()
.dataSource("foo")
.intervals(querySegmentSpec(Filtration.eternity()))
.columns("__time", "cnt", "dim1", "dim2", "dim3", "m1", "m2", "unique_dim1")
.context(
addReplaceTimeChunkToQueryContext(
queryContextWithGranularity(Granularities.MONTH),
"2000-01-01T00:00:00.000Z/2000-02-01T00:00:00.000Z"
)
)
.build()
)
.verify();
}
@Test
public void testReplaceForUnsupportedDeleteWhereClause()
{
testIngestionQuery()
.sql("REPLACE INTO dst OVERWRITE WHERE __time LIKE '20__-02-01' SELECT * FROM foo PARTITIONED BY MONTH")
.expectValidationError(
SqlPlanningException.class,
"Unsupported operation in OVERWRITE WHERE clause: LIKE"
)
.verify();
}
@Test
public void testReplaceForInvalidDeleteWhereClause()
{
testIngestionQuery()
.sql("REPLACE INTO dst OVERWRITE WHERE TRUE SELECT * FROM foo PARTITIONED BY MONTH")
.expectValidationError(
SqlPlanningException.class,
"Invalid OVERWRITE WHERE clause"
)
.verify();
}
@Test
public void testReplaceForDeleteWhereClauseOnUnsupportedColumns()
{
testIngestionQuery()
.sql("REPLACE INTO dst OVERWRITE WHERE dim1 > TIMESTAMP '2000-01-05 00:00:00' SELECT * FROM foo PARTITIONED BY ALL TIME")
.expectValidationError(
SqlPlanningException.class,
"Only __time column is supported in OVERWRITE WHERE clause"
)
.verify();
}
@Test
public void testReplaceWithOrderBy()
{
testIngestionQuery()
.sql("REPLACE INTO dst OVERWRITE ALL SELECT * FROM foo ORDER BY dim1 PARTITIONED BY ALL TIME")
.expectValidationError(SqlPlanningException.class, "Cannot have ORDER BY on a REPLACE query.")
.verify();
}
@Test
public void testReplaceForMisalignedPartitionInterval()
{
testIngestionQuery()
.sql("REPLACE INTO dst OVERWRITE WHERE __time >= TIMESTAMP '2000-01-05 00:00:00' AND __time <= TIMESTAMP '2000-01-06 00:00:00' SELECT * FROM foo PARTITIONED BY MONTH")
.expectValidationError(
SqlPlanningException.class,
"OVERWRITE WHERE clause contains an interval [2000-01-05T00:00:00.000Z/2000-01-06T00:00:00.001Z] which is not aligned with PARTITIONED BY granularity {type=period, period=P1M, timeZone=UTC, origin=null}"
)
.verify();
}
@Test
public void testReplaceForInvalidPartition()
{
testIngestionQuery()
.sql("REPLACE INTO dst OVERWRITE WHERE __time >= TIMESTAMP '2000-01-05 00:00:00' AND __time <= TIMESTAMP '2000-02-05 00:00:00' SELECT * FROM foo PARTITIONED BY ALL TIME")
.expectValidationError(
SqlPlanningException.class,
"OVERWRITE WHERE clause contains an interval [2000-01-05T00:00:00.000Z/2000-02-05T00:00:00.001Z] which is not aligned with PARTITIONED BY granularity AllGranularity"
)
.verify();
}
@Test
public void testReplaceFromTableWithEmptyInterval()
{
testIngestionQuery()
.sql("REPLACE INTO dst OVERWRITE WHERE "
+ "__time < TIMESTAMP '2000-01-01' AND __time > TIMESTAMP '2000-01-01' "
+ "SELECT * FROM foo PARTITIONED BY MONTH")
.expectValidationError(
SqlPlanningException.class,
"Intervals for replace are empty"
)
.verify();
}
@Test
public void testReplaceForWithInvalidInterval()
{
testIngestionQuery()
.sql("REPLACE INTO dst OVERWRITE WHERE __time >= TIMESTAMP '2000-01-INVALID0:00' AND __time <= TIMESTAMP '2000-02-05 00:00:00' SELECT * FROM foo PARTITIONED BY ALL TIME")
.expectValidationError(SqlPlanningException.class)
.verify();
}
@Test
public void testReplaceForWithoutPartitionSpec()
{
testIngestionQuery()
.sql("REPLACE INTO dst SELECT * FROM foo PARTITIONED BY ALL TIME")
.expectValidationError(SqlPlanningException.class)
.verify();
}
@Test
public void testReplaceFromView()
{
testIngestionQuery()
.sql("REPLACE INTO dst OVERWRITE ALL SELECT * FROM view.aview PARTITIONED BY ALL TIME")
.expectTarget("dst", RowSignature.builder().add("dim1_firstchar", ColumnType.STRING).build())
.expectResources(viewRead("aview"), dataSourceWrite("dst"))
.expectQuery(
newScanQueryBuilder()
.dataSource("foo")
.intervals(querySegmentSpec(Filtration.eternity()))
.virtualColumns(expressionVirtualColumn("v0", "substring(\"dim1\", 0, 1)", ColumnType.STRING))
.filters(selector("dim2", "a", null))
.columns("v0")
.context(REPLACE_ALL_TIME_CHUNKS)
.build()
)
.verify();
}
@Test
public void testReplaceIntoQualifiedTable()
{
testIngestionQuery()
.sql("REPLACE INTO druid.dst OVERWRITE ALL SELECT * FROM foo PARTITIONED BY ALL TIME")
.expectTarget("dst", FOO_TABLE_SIGNATURE)
.expectResources(dataSourceRead("foo"), dataSourceWrite("dst"))
.expectQuery(
newScanQueryBuilder()
.dataSource("foo")
.intervals(querySegmentSpec(Filtration.eternity()))
.columns("__time", "cnt", "dim1", "dim2", "dim3", "m1", "m2", "unique_dim1")
.context(REPLACE_ALL_TIME_CHUNKS)
.build()
)
.verify();
}
@Test
public void testReplaceIntoInvalidDataSourceName()
{
testIngestionQuery()
.sql("REPLACE INTO \"in/valid\" OVERWRITE ALL SELECT dim1, dim2 FROM foo PARTITIONED BY ALL TIME")
.expectValidationError(SqlPlanningException.class, "Ingestion dataSource cannot contain the '/' character.")
.verify();
}
@Test
public void testReplaceUsingColumnList()
{
testIngestionQuery()
.sql("REPLACE INTO dst (foo, bar) OVERWRITE ALL SELECT dim1, dim2 FROM foo PARTITIONED BY ALL TIME")
.expectValidationError(SqlPlanningException.class, "Ingestion with target column list is not supported.")
.verify();
}
@Test
public void testReplaceIntoSystemTable()
{
testIngestionQuery()
.sql("REPLACE INTO INFORMATION_SCHEMA.COLUMNS OVERWRITE ALL SELECT * FROM foo PARTITIONED BY ALL TIME")
.expectValidationError(
SqlPlanningException.class,
"Cannot ingest into [INFORMATION_SCHEMA.COLUMNS] because it is not a Druid datasource."
)
.verify();
}
@Test
public void testReplaceIntoView()
{
testIngestionQuery()
.sql("REPLACE INTO view.aview OVERWRITE ALL SELECT * FROM foo PARTITIONED BY ALL TIME")
.expectValidationError(
SqlPlanningException.class,
"Cannot ingest into [view.aview] because it is not a Druid datasource."
)
.verify();
}
@Test
public void testReplaceFromUnauthorizedDataSource()
{
testIngestionQuery()
.sql("REPLACE INTO dst OVERWRITE ALL SELECT * FROM \"%s\" PARTITIONED BY ALL TIME", CalciteTests.FORBIDDEN_DATASOURCE)
.expectValidationError(ForbiddenException.class)
.verify();
}
@Test
public void testReplaceIntoUnauthorizedDataSource()
{
testIngestionQuery()
.sql("REPLACE INTO \"%s\" OVERWRITE ALL SELECT * FROM foo PARTITIONED BY ALL TIME", CalciteTests.FORBIDDEN_DATASOURCE)
.expectValidationError(ForbiddenException.class)
.verify();
}
@Test
public void testReplaceIntoNonexistentSchema()
{
testIngestionQuery()
.sql("REPLACE INTO nonexistent.dst OVERWRITE ALL SELECT * FROM foo PARTITIONED BY ALL TIME")
.expectValidationError(
SqlPlanningException.class,
"Cannot ingest into [nonexistent.dst] because it is not a Druid datasource."
)
.verify();
}
@Test
public void testReplaceFromExternal()
{
testIngestionQuery()
.sql("REPLACE INTO dst OVERWRITE ALL SELECT * FROM %s PARTITIONED BY ALL TIME", externSql(externalDataSource))
.authentication(CalciteTests.SUPER_USER_AUTH_RESULT)
.expectTarget("dst", externalDataSource.getSignature())
.expectResources(dataSourceWrite("dst"), ExternalOperatorConversion.EXTERNAL_RESOURCE_ACTION)
.expectQuery(
newScanQueryBuilder()
.dataSource(externalDataSource)
.intervals(querySegmentSpec(Filtration.eternity()))
.columns("x", "y", "z")
.context(REPLACE_ALL_TIME_CHUNKS)
.build()
)
.verify();
}
@Test
public void testReplaceWithPartitionedByAndLimitOffset()
{
RowSignature targetRowSignature = RowSignature.builder()
.add("__time", ColumnType.LONG)
.add("floor_m1", ColumnType.FLOAT)
.add("dim1", ColumnType.STRING)
.build();
testIngestionQuery()
.sql(
"REPLACE INTO druid.dst OVERWRITE ALL SELECT __time, FLOOR(m1) as floor_m1, dim1 FROM foo LIMIT 10 OFFSET 20 PARTITIONED BY DAY")
.expectTarget("dst", targetRowSignature)
.expectResources(dataSourceRead("foo"), dataSourceWrite("dst"))
.expectQuery(
newScanQueryBuilder()
.dataSource("foo")
.intervals(querySegmentSpec(Filtration.eternity()))
.columns("__time", "dim1", "v0")
.virtualColumns(expressionVirtualColumn("v0", "floor(\"m1\")", ColumnType.FLOAT))
.limit(10)
.offset(20)
.context(
addReplaceTimeChunkToQueryContext(
queryContextWithGranularity(Granularities.DAY),
"all"
)
)
.build()
)
.verify();
}
@Test
public void testReplaceWithPartitionedByContainingInvalidGranularity() throws Exception
{
// Throws a ValidationException, which gets converted to a SqlPlanningException before throwing to end user
try {
testQuery(
"REPLACE INTO dst OVERWRITE ALL SELECT * FROM foo PARTITIONED BY 'invalid_granularity'",
ImmutableList.of(),
ImmutableList.of()
);
Assert.fail("Exception should be thrown");
}
catch (SqlPlanningException e) {
assertEquals(
"Encountered 'invalid_granularity' after PARTITIONED BY. Expected HOUR, DAY, MONTH, YEAR, ALL TIME, FLOOR function or TIME_FLOOR function",
e.getMessage()
);
}
didTest = true;
}
@Test
public void testExplainReplaceFromExternal() throws Exception
{
// Skip vectorization since otherwise the "context" will change for each subtest.
skipVectorize();
final ScanQuery expectedQuery = newScanQueryBuilder()
.dataSource(externalDataSource)
.intervals(querySegmentSpec(Filtration.eternity()))
.columns("x", "y", "z")
.context(
queryJsonMapper.readValue(
"{\"defaultTimeout\":300000,\"maxScatterGatherBytes\":9223372036854775807,\"sqlCurrentTimestamp\":\"2000-01-01T00:00:00Z\",\"sqlInsertSegmentGranularity\":\"{\\\"type\\\":\\\"all\\\"}\",\"sqlQueryId\":\"dummy\",\"sqlReplaceTimeChunks\":\"all\",\"vectorize\":\"false\",\"vectorizeVirtualColumns\":\"false\"}",
JacksonUtils.TYPE_REFERENCE_MAP_STRING_OBJECT
)
)
.build();
final String expectedExplanation =
"DruidQueryRel(query=["
+ queryJsonMapper.writeValueAsString(expectedQuery)
+ "], signature=[{x:STRING, y:STRING, z:LONG}])\n";
// Use testQuery for EXPLAIN (not testIngestionQuery).
testQuery(
new PlannerConfig(),
StringUtils.format(
"EXPLAIN PLAN FOR REPLACE INTO dst OVERWRITE ALL SELECT * FROM %s PARTITIONED BY ALL TIME",
externSql(externalDataSource)
),
CalciteTests.SUPER_USER_AUTH_RESULT,
ImmutableList.of(),
ImmutableList.of(
new Object[]{
expectedExplanation,
"[{\"name\":\"EXTERNAL\",\"type\":\"EXTERNAL\"},{\"name\":\"dst\",\"type\":\"DATASOURCE\"}]"
}
)
);
// Not using testIngestionQuery, so must set didTest manually to satisfy the check in tearDown.
didTest = true;
}
@Test
public void testExplainReplaceFromExternalUnauthorized()
{
// Use testQuery for EXPLAIN (not testIngestionQuery).
Assert.assertThrows(
ForbiddenException.class,
() ->
testQuery(
StringUtils.format(
"EXPLAIN PLAN FOR REPLACE INTO dst OVERWRITE ALL SELECT * FROM %s PARTITIONED BY ALL TIME",
externSql(externalDataSource)
),
ImmutableList.of(),
ImmutableList.of()
)
);
// Not using testIngestionQuery, so must set didTest manually to satisfy the check in tearDown.
didTest = true;
}
@Test
public void testReplaceFromExternalUnauthorized()
{
testIngestionQuery()
.sql("REPLACE INTO dst OVERWRITE ALL SELECT * FROM %s PARTITIONED BY ALL TIME", externSql(externalDataSource))
.expectValidationError(ForbiddenException.class)
.verify();
}
@Test
public void testReplaceFromExternalProjectSort()
{
testIngestionQuery()
.sql(
"REPLACE INTO dst OVERWRITE ALL SELECT x || y AS xy, z FROM %s PARTITIONED BY ALL TIME",
externSql(externalDataSource)
)
.authentication(CalciteTests.SUPER_USER_AUTH_RESULT)
.expectTarget("dst", RowSignature.builder().add("xy", ColumnType.STRING).add("z", ColumnType.LONG).build())
.expectResources(dataSourceWrite("dst"), ExternalOperatorConversion.EXTERNAL_RESOURCE_ACTION)
.expectQuery(
newScanQueryBuilder()
.dataSource(externalDataSource)
.intervals(querySegmentSpec(Filtration.eternity()))
.virtualColumns(expressionVirtualColumn("v0", "concat(\"x\",\"y\")", ColumnType.STRING))
.columns("v0", "z")
.context(REPLACE_ALL_TIME_CHUNKS)
.build()
)
.verify();
}
@Test
public void testReplaceFromExternalAggregate()
{
testIngestionQuery()
.sql(
"REPLACE INTO dst OVERWRITE ALL SELECT x, SUM(z) AS sum_z, COUNT(*) AS cnt FROM %s GROUP BY 1 PARTITIONED BY ALL TIME",
externSql(externalDataSource)
)
.authentication(CalciteTests.SUPER_USER_AUTH_RESULT)
.expectTarget(
"dst",
RowSignature.builder()
.add("x", ColumnType.STRING)
.add("sum_z", ColumnType.LONG)
.add("cnt", ColumnType.LONG)
.build()
)
.expectResources(dataSourceWrite("dst"), ExternalOperatorConversion.EXTERNAL_RESOURCE_ACTION)
.expectQuery(
GroupByQuery.builder()
.setDataSource(externalDataSource)
.setInterval(querySegmentSpec(Filtration.eternity()))
.setGranularity(Granularities.ALL)
.setDimensions(dimensions(new DefaultDimensionSpec("x", "d0")))
.setAggregatorSpecs(
new LongSumAggregatorFactory("a0", "z"),
new CountAggregatorFactory("a1")
)
.setContext(REPLACE_ALL_TIME_CHUNKS)
.build()
)
.verify();
}
@Test
public void testReplaceFromExternalAggregateAll()
{
testIngestionQuery()
.sql(
"REPLACE INTO dst OVERWRITE ALL SELECT COUNT(*) AS cnt FROM %s PARTITIONED BY ALL TIME",
externSql(externalDataSource)
)
.authentication(CalciteTests.SUPER_USER_AUTH_RESULT)
.expectTarget(
"dst",
RowSignature.builder()
.add("cnt", ColumnType.LONG)
.build()
)
.expectResources(dataSourceWrite("dst"), ExternalOperatorConversion.EXTERNAL_RESOURCE_ACTION)
.expectQuery(
GroupByQuery.builder()
.setDataSource(externalDataSource)
.setInterval(querySegmentSpec(Filtration.eternity()))
.setGranularity(Granularities.ALL)
.setAggregatorSpecs(new CountAggregatorFactory("a0"))
.setContext(REPLACE_ALL_TIME_CHUNKS)
.build()
)
.verify();
}
}

View File

@ -0,0 +1,97 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.sql.calcite.parser;
import org.apache.calcite.avatica.util.Casing;
import org.apache.calcite.sql.SqlWriter;
import org.apache.calcite.sql.dialect.CalciteSqlDialect;
import org.apache.calcite.sql.pretty.SqlPrettyWriter;
import org.junit.Test;
import java.io.StringReader;
import static org.junit.Assert.assertEquals;
/**
* A class containing unit tests for testing implmentations of {@link org.apache.calcite.sql.SqlNode#unparse(SqlWriter, int, int)}
* in custom Druid SqlNode classes, like {@link DruidSqlInsert} and {@link DruidSqlReplace}.
*/
public class DruidSqlUnparseTest
{
@Test
public void testUnparseInsert() throws ParseException
{
String sqlQuery = "INSERT INTO dst SELECT * FROM foo PARTITIONED BY ALL TIME";
String prettySqlQuery = "INSERT INTO \"dst\"\n"
+ "(SELECT *\n"
+ " FROM \"foo\") PARTITIONED BY ALL TIME";
DruidSqlParserImpl druidSqlParser = createTestParser(sqlQuery);
DruidSqlInsert druidSqlReplace = (DruidSqlInsert) druidSqlParser.DruidSqlInsertEof();
druidSqlReplace.unparse(sqlWriter, 0, 0);
assertEquals(prettySqlQuery, sqlWriter.toSqlString().getSql());
}
@Test
public void testUnparseReplaceAll() throws ParseException
{
String sqlQuery = "REPLACE INTO dst OVERWRITE ALL SELECT * FROM foo PARTITIONED BY ALL TIME";
String prettySqlQuery = "REPLACE INTO \"dst\"\n"
+ "OVERWRITE ALL\n"
+ "(SELECT *\n"
+ " FROM \"foo\")\n"
+ "PARTITIONED BY ALL TIME";
DruidSqlParserImpl druidSqlParser = createTestParser(sqlQuery);
DruidSqlReplace druidSqlReplace = (DruidSqlReplace) druidSqlParser.DruidSqlReplaceEof();
druidSqlReplace.unparse(sqlWriter, 0, 0);
assertEquals(prettySqlQuery, sqlWriter.toSqlString().getSql());
}
@Test
public void testUnparseReplaceWhere() throws ParseException
{
String sqlQuery = "REPLACE INTO dst OVERWRITE WHERE __time >= TIMESTAMP '2000-01-01 00:00:00' AND __time < TIMESTAMP '2000-01-02 00:00:00' SELECT * FROM foo PARTITIONED BY DAY";
String prettySqlQuery = "REPLACE INTO \"dst\"\n"
+ "OVERWRITE \"__time\" >= TIMESTAMP '2000-01-01 00:00:00' AND \"__time\" < TIMESTAMP '2000-01-02 00:00:00'\n"
+ "(SELECT *\n"
+ " FROM \"foo\")\n"
+ "PARTITIONED BY DAY";
DruidSqlParserImpl druidSqlParser = createTestParser(sqlQuery);
DruidSqlReplace druidSqlReplace = (DruidSqlReplace) druidSqlParser.DruidSqlReplaceEof();
druidSqlReplace.unparse(sqlWriter, 0, 0);
assertEquals(prettySqlQuery, sqlWriter.toSqlString().getSql());
}
private final SqlWriter sqlWriter = new SqlPrettyWriter(CalciteSqlDialect.DEFAULT);
private static DruidSqlParserImpl createTestParser(String parseString)
{
DruidSqlParserImplFactory druidSqlParserImplFactory = new DruidSqlParserImplFactory();
DruidSqlParserImpl druidSqlParser = (DruidSqlParserImpl) druidSqlParserImplFactory.getParser(new StringReader(parseString));
druidSqlParser.setUnquotedCasing(Casing.TO_LOWER);
druidSqlParser.setQuotedCasing(Casing.TO_LOWER);
druidSqlParser.setIdentifierMaxLength(20);
return druidSqlParser;
}
}