Add syntax support for PARTITIONED BY/CLUSTERED BY in INSERT queries (#12163)

This PR adds parser support for PARTITIONED BY and CLUSTERED BY in INSERT queries, as proposed in issue #11929.
Laksh Singla 2022-02-08 16:23:15 +05:30 committed by GitHub
parent ae71e05fc5
commit 4add2510ed
14 changed files with 1686 additions and 29 deletions
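
As a quick, hypothetical sketch of the new syntax and the classes this PR introduces, the statement below could be parsed with Calcite's SqlParser configured with the DruidSqlParserImplFactory added in this change. The class name is illustrative only, and the real planner additionally sets casing, quoting and conformance options (see the PlannerFactory diff below); error handling is omitted.

import org.apache.calcite.sql.parser.SqlParser;
import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.sql.calcite.parser.DruidSqlInsert;
import org.apache.druid.sql.calcite.parser.DruidSqlParserImplFactory;

public class ParsePartitionedByExample
{
  public static void main(String[] args) throws Exception
  {
    // Point Calcite's parser at the Druid-specific implementation generated in this PR.
    SqlParser.Config config = SqlParser.configBuilder()
        .setParserFactory(new DruidSqlParserImplFactory())
        .build();

    SqlParser parser = SqlParser.create(
        "INSERT INTO dst SELECT __time, dim1 FROM foo PARTITIONED BY DAY CLUSTERED BY dim1",
        config
    );

    // The INSERT statement parses into the DruidSqlInsert node defined in this PR.
    DruidSqlInsert insert = (DruidSqlInsert) parser.parseStmt();
    Granularity partitionedBy = insert.getPartitionedBy(); // Granularities.DAY
    System.out.println(partitionedBy + " / CLUSTERED BY " + insert.getClusteredBy());
  }
}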


@ -28,6 +28,17 @@
Reference: https://github.com/apache/druid/pull/7894/files
-->
<FindBugsFilter>
<Match>
<Or>
<Class name="org.apache.druid.sql.calcite.parser.SimpleCharStream"/>
<Class name="org.apache.druid.sql.calcite.parser.TokenMgrError"/>
<Class name="org.apache.druid.sql.calcite.parser.Token"/>
<Class name="org.apache.druid.sql.calcite.parser.DruidSqlParserImplTokenManager"/>
<Class name="org.apache.druid.sql.calcite.parser.DruidSqlParserImplConstants"/>
<Class name="org.apache.druid.sql.calcite.parser.DruidSqlParserImpl"/>
<Class name="org.apache.druid.sql.calcite.parser.DruidSqlParserImpl$JJCalls"/>
</Or>
</Match>
<!-- Ignore "equals" bugs for JsonInclude filter classes. They rely on strange-looking "equals" methods. -->
<Match>
<And>

pom.xml

@ -82,6 +82,8 @@
<apache.ranger.gson.version>2.2.4</apache.ranger.gson.version>
<avatica.version>1.17.0</avatica.version>
<avro.version>1.9.2</avro.version>
<!-- sql/src/main/codegen/config.fmpp is based on a file from calcite-core, and needs to be
updated when upgrading Calcite. Refer to the top-level comments in that file for details. -->
<calcite.version>1.21.0</calcite.version>
<datasketches.version>3.0.0</datasketches.version>
<datasketches.memory.version>2.0.0</datasketches.memory.version>
@ -1314,6 +1316,9 @@
</signaturesFiles>
<excludes>
<exclude>**/SomeAvroDatum.class</exclude>
<exclude>**/DruidSqlParserImpl.class</exclude>
<exclude>**/DruidSqlParserImplTokenManager.class</exclude>
<exclude>**/SimpleCharStream.class</exclude>
</excludes>
<suppressAnnotations>
<annotation>**.SuppressForbidden</annotation>
@ -1501,7 +1506,12 @@
</plugins>
<pluginManagement>
<plugins>
<plugin>
<plugin>
<groupId>com.googlecode.fmpp-maven-plugin</groupId>
<artifactId>fmpp-maven-plugin</artifactId>
<version>1.0</version>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<version>2.22.2</version>
@ -1601,6 +1611,16 @@
<artifactId>exec-maven-plugin</artifactId>
<version>1.6.0</version>
</plugin>
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>javacc-maven-plugin</artifactId>
<version>2.4</version>
</plugin>
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>build-helper-maven-plugin</artifactId>
<version>3.1.0</version>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-javadoc-plugin</artifactId>
@ -1888,6 +1908,7 @@
<exclude>**/dependency-reduced-pom.xml</exclude>
<exclude>.editorconfig</exclude>
<exclude>**/hadoop.indexer.libs.version</exclude>
<exclude>**/codegen/**</exclude>
</excludes>
</configuration>
</plugin>


@ -180,6 +180,11 @@
<artifactId>validation-api</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<scope>provided</scope>
</dependency>
<!-- Tests -->
<dependency>
@ -255,7 +260,126 @@
</execution>
</executions>
</plugin>
<!-- The following plugins and their configurations are used to generate Druid's custom Calcite-based SQL parser -->
<!-- Extracts Parser.jj from Calcite into ${project.build.directory}, where all the Freemarker templates are -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-dependency-plugin</artifactId>
<executions>
<execution>
<id>unpack-parser-template</id>
<phase>initialize</phase>
<goals>
<goal>unpack</goal>
</goals>
<configuration>
<artifactItems>
<artifactItem>
<groupId>org.apache.calcite</groupId>
<artifactId>calcite-core</artifactId>
<version>${calcite.version}</version>
<type>jar</type>
<overWrite>true</overWrite>
<outputDirectory>${project.build.directory}/</outputDirectory>
<includes>**/Parser.jj</includes>
</artifactItem>
</artifactItems>
</configuration>
</execution>
</executions>
</plugin>
<!-- Copies the templates from druid-sql's codegen directory, which contain the custom SQL rules, to
${project.build.directory}/codegen -->
<plugin>
<artifactId>maven-resources-plugin</artifactId>
<executions>
<execution>
<id>copy-fmpp-resources</id>
<phase>initialize</phase>
<goals>
<goal>copy-resources</goal>
</goals>
<configuration>
<outputDirectory>${project.build.directory}/codegen</outputDirectory>
<resources>
<resource>
<directory>src/main/codegen</directory>
<filtering>false</filtering>
</resource>
</resources>
</configuration>
</execution>
</executions>
</plugin>
<!-- "Plugs in" the Calcite's Parser.jj with the variables present in config.fmpp. These contain the custom rules
as well as the class to which the custom implementation will get generated -->
<plugin>
<groupId>com.googlecode.fmpp-maven-plugin</groupId>
<artifactId>fmpp-maven-plugin</artifactId>
<executions>
<execution>
<id>generate-fmpp-sources</id>
<phase>generate-sources</phase>
<goals>
<goal>generate</goal>
</goals>
<configuration>
<cfgFile>${project.build.directory}/codegen/config.fmpp</cfgFile>
<outputDirectory>${project.build.directory}/generated-sources/annotations</outputDirectory>
<templateDirectory>${project.build.directory}/codegen/templates</templateDirectory>
</configuration>
</execution>
</executions>
</plugin>
<!-- Creates a Java class for the custom parser from the Parser.jj -->
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>javacc-maven-plugin</artifactId>
<executions>
<execution>
<phase>generate-sources</phase>
<id>javacc</id>
<goals>
<goal>javacc</goal>
</goals>
<configuration>
<sourceDirectory>${project.build.directory}/generated-sources/annotations</sourceDirectory>
<includes>
<include>**/Parser.jj</include>
</includes>
<lookAhead>2</lookAhead>
<isStatic>false</isStatic>
<outputDirectory>${project.build.directory}/generated-sources/annotations</outputDirectory>
</configuration>
</execution>
</executions>
</plugin>
<!-- Adds the directory containing the generated parser sources to the build's source path -->
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>build-helper-maven-plugin</artifactId>
<executions>
<execution>
<id>add-source</id>
<phase>generate-sources</phase>
<goals>
<goal>add-source</goal>
</goals>
<configuration>
<sources>
<source>src/generated-sources/annotations</source>
</sources>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
<!-- Build for Druid's custom SQL parser complete -->
</project>


@ -0,0 +1,447 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to you under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# This file is an FMPP (http://fmpp.sourceforge.net/) configuration file to
# allow clients to extend Calcite's SQL parser to support application specific
# SQL statements, literals or data types.
#
# Calcite's parser grammar file (Parser.jj) is written in javacc
# (http://javacc.java.net/) with Freemarker (http://freemarker.org/) variables
# to allow clients to:
# 1. have custom parser implementation class and package name.
# 2. insert new parser method implementations written in javacc to parse
# custom:
# a) SQL statements.
# b) literals.
# c) data types.
# 3. add new keywords to support custom SQL constructs added as part of (2).
# 4. add import statements needed by inserted custom parser implementations.
#
# Parser template file (Parser.jj) along with this file are packaged as
# part of the calcite-core-<version>.jar under "codegen" directory.
# This file is directly copied from calcite-core-1.21.0.jar/codegen/config.fmpp, and then modified slightly.
# While not strictly required, it should ideally be kept in line with the calcite-core version. In newer
# Calcite versions, there is a default_config.fmpp which will free us from maintaining this file.
#
# Following clauses are modified in the file:
# 1. data.parser.package & data.parser.class
# 2. data.parser.imports
# 3. data.parser.keywords (Added "CLUSTERED", "PARTITIONED")
# 4. data.parser.statementParserMethods
# 5. data.parser.implementationFiles
data: {
parser: {
# Generated parser implementation package and class name.
package: "org.apache.druid.sql.calcite.parser",
class: "DruidSqlParserImpl",
# List of additional classes and packages to import.
# Example. "org.apache.calcite.sql.*", "java.util.List".
imports: [
"org.apache.calcite.sql.SqlNode"
"org.apache.calcite.sql.SqlInsert"
"org.apache.druid.java.util.common.granularity.Granularity"
"org.apache.druid.java.util.common.granularity.Granularities"
"org.apache.druid.sql.calcite.parser.DruidSqlInsert"
"org.apache.druid.sql.calcite.parser.DruidSqlParserUtils"
]
# List of new keywords. Example: "DATABASES", "TABLES". If the keyword is not a reserved
# keyword add it to 'nonReservedKeywords' section.
keywords: [
"CLUSTERED"
"PARTITIONED"
]
# List of keywords from "keywords" section that are not reserved.
nonReservedKeywords: [
"A"
"ABSENT"
"ABSOLUTE"
"ACTION"
"ADA"
"ADD"
"ADMIN"
"AFTER"
"ALWAYS"
"APPLY"
"ASC"
"ASSERTION"
"ASSIGNMENT"
"ATTRIBUTE"
"ATTRIBUTES"
"BEFORE"
"BERNOULLI"
"BREADTH"
"C"
"CASCADE"
"CATALOG"
"CATALOG_NAME"
"CENTURY"
"CHAIN"
"CHARACTER_SET_CATALOG"
"CHARACTER_SET_NAME"
"CHARACTER_SET_SCHEMA"
"CHARACTERISTICS"
"CHARACTERS"
"CLASS_ORIGIN"
"COBOL"
"COLLATION"
"COLLATION_CATALOG"
"COLLATION_NAME"
"COLLATION_SCHEMA"
"COLUMN_NAME"
"COMMAND_FUNCTION"
"COMMAND_FUNCTION_CODE"
"COMMITTED"
"CONDITION_NUMBER"
"CONDITIONAL"
"CONNECTION"
"CONNECTION_NAME"
"CONSTRAINT_CATALOG"
"CONSTRAINT_NAME"
"CONSTRAINT_SCHEMA"
"CONSTRAINTS"
"CONSTRUCTOR"
"CONTINUE"
"CURSOR_NAME"
"DATA"
"DATABASE"
"DATETIME_INTERVAL_CODE"
"DATETIME_INTERVAL_PRECISION"
"DECADE"
"DEFAULTS"
"DEFERRABLE"
"DEFERRED"
"DEFINED"
"DEFINER"
"DEGREE"
"DEPTH"
"DERIVED"
"DESC"
"DESCRIPTION"
"DESCRIPTOR"
"DIAGNOSTICS"
"DISPATCH"
"DOMAIN"
"DOW"
"DOY"
"DYNAMIC_FUNCTION"
"DYNAMIC_FUNCTION_CODE"
"ENCODING"
"EPOCH"
"ERROR"
"EXCEPTION"
"EXCLUDE"
"EXCLUDING"
"FINAL"
"FIRST"
"FOLLOWING"
"FORMAT"
"FORTRAN"
"FOUND"
"FRAC_SECOND"
"G"
"GENERAL"
"GENERATED"
"GEOMETRY"
"GO"
"GOTO"
"GRANTED"
"HIERARCHY"
"IGNORE"
"IMMEDIATE"
"IMMEDIATELY"
"IMPLEMENTATION"
"INCLUDING"
"INCREMENT"
"INITIALLY"
"INPUT"
"INSTANCE"
"INSTANTIABLE"
"INVOKER"
"ISODOW"
"ISOYEAR"
"ISOLATION"
"JAVA"
"JSON"
"K"
"KEY"
"KEY_MEMBER"
"KEY_TYPE"
"LABEL"
"LAST"
"LENGTH"
"LEVEL"
"LIBRARY"
"LOCATOR"
"M"
"MAP"
"MATCHED"
"MAXVALUE"
"MICROSECOND"
"MESSAGE_LENGTH"
"MESSAGE_OCTET_LENGTH"
"MESSAGE_TEXT"
"MILLISECOND"
"MILLENNIUM"
"MINVALUE"
"MORE_"
"MUMPS"
"NAME"
"NAMES"
"NANOSECOND"
"NESTING"
"NORMALIZED"
"NULLABLE"
"NULLS"
"NUMBER"
"OBJECT"
"OCTETS"
"OPTION"
"OPTIONS"
"ORDERING"
"ORDINALITY"
"OTHERS"
"OUTPUT"
"OVERRIDING"
"PAD"
"PARAMETER_MODE"
"PARAMETER_NAME"
"PARAMETER_ORDINAL_POSITION"
"PARAMETER_SPECIFIC_CATALOG"
"PARAMETER_SPECIFIC_NAME"
"PARAMETER_SPECIFIC_SCHEMA"
"PARTIAL"
"PASCAL"
"PASSING"
"PASSTHROUGH"
"PAST"
"PATH"
"PLACING"
"PLAN"
"PLI"
"PRECEDING"
"PRESERVE"
"PRIOR"
"PRIVILEGES"
"PUBLIC"
"QUARTER"
"READ"
"RELATIVE"
"REPEATABLE"
"REPLACE"
"RESPECT"
"RESTART"
"RESTRICT"
"RETURNED_CARDINALITY"
"RETURNED_LENGTH"
"RETURNED_OCTET_LENGTH"
"RETURNED_SQLSTATE"
"RETURNING"
"ROLE"
"ROUTINE"
"ROUTINE_CATALOG"
"ROUTINE_NAME"
"ROUTINE_SCHEMA"
"ROW_COUNT"
"SCALAR"
"SCALE"
"SCHEMA"
"SCHEMA_NAME"
"SCOPE_CATALOGS"
"SCOPE_NAME"
"SCOPE_SCHEMA"
"SECTION"
"SECURITY"
"SELF"
"SEQUENCE"
"SERIALIZABLE"
"SERVER"
"SERVER_NAME"
"SESSION"
"SETS"
"SIMPLE"
"SIZE"
"SOURCE"
"SPACE"
"SPECIFIC_NAME"
"SQL_BIGINT"
"SQL_BINARY"
"SQL_BIT"
"SQL_BLOB"
"SQL_BOOLEAN"
"SQL_CHAR"
"SQL_CLOB"
"SQL_DATE"
"SQL_DECIMAL"
"SQL_DOUBLE"
"SQL_FLOAT"
"SQL_INTEGER"
"SQL_INTERVAL_DAY"
"SQL_INTERVAL_DAY_TO_HOUR"
"SQL_INTERVAL_DAY_TO_MINUTE"
"SQL_INTERVAL_DAY_TO_SECOND"
"SQL_INTERVAL_HOUR"
"SQL_INTERVAL_HOUR_TO_MINUTE"
"SQL_INTERVAL_HOUR_TO_SECOND"
"SQL_INTERVAL_MINUTE"
"SQL_INTERVAL_MINUTE_TO_SECOND"
"SQL_INTERVAL_MONTH"
"SQL_INTERVAL_SECOND"
"SQL_INTERVAL_YEAR"
"SQL_INTERVAL_YEAR_TO_MONTH"
"SQL_LONGVARBINARY"
"SQL_LONGVARNCHAR"
"SQL_LONGVARCHAR"
"SQL_NCHAR"
"SQL_NCLOB"
"SQL_NUMERIC"
"SQL_NVARCHAR"
"SQL_REAL"
"SQL_SMALLINT"
"SQL_TIME"
"SQL_TIMESTAMP"
"SQL_TINYINT"
"SQL_TSI_DAY"
"SQL_TSI_FRAC_SECOND"
"SQL_TSI_HOUR"
"SQL_TSI_MICROSECOND"
"SQL_TSI_MINUTE"
"SQL_TSI_MONTH"
"SQL_TSI_QUARTER"
"SQL_TSI_SECOND"
"SQL_TSI_WEEK"
"SQL_TSI_YEAR"
"SQL_VARBINARY"
"SQL_VARCHAR"
"STATE"
"STATEMENT"
"STRUCTURE"
"STYLE"
"SUBCLASS_ORIGIN"
"SUBSTITUTE"
"TABLE_NAME"
"TEMPORARY"
"TIES"
"TIMESTAMPADD"
"TIMESTAMPDIFF"
"TOP_LEVEL_COUNT"
"TRANSACTION"
"TRANSACTIONS_ACTIVE"
"TRANSACTIONS_COMMITTED"
"TRANSACTIONS_ROLLED_BACK"
"TRANSFORM"
"TRANSFORMS"
"TRIGGER_CATALOG"
"TRIGGER_NAME"
"TRIGGER_SCHEMA"
"TYPE"
"UNBOUNDED"
"UNCOMMITTED"
"UNCONDITIONAL"
"UNDER"
"UNNAMED"
"USAGE"
"USER_DEFINED_TYPE_CATALOG"
"USER_DEFINED_TYPE_CODE"
"USER_DEFINED_TYPE_NAME"
"USER_DEFINED_TYPE_SCHEMA"
"UTF8"
"UTF16"
"UTF32"
"VERSION"
"VIEW"
"WEEK"
"WRAPPER"
"WORK"
"WRITE"
"XML"
"ZONE"
]
# List of additional join types. Each is a method with no arguments.
# Example: LeftSemiJoin()
joinTypes: [
]
# List of methods for parsing custom SQL statements.
# Return type of method implementation should be 'SqlNode'.
# Example: SqlShowDatabases(), SqlShowTables().
statementParserMethods: [
"DruidSqlInsert()"
]
# List of methods for parsing custom literals.
# Return type of method implementation should be "SqlNode".
# Example: ParseJsonLiteral().
literalParserMethods: [
]
# List of methods for parsing custom data types.
# Return type of method implementation should be "SqlTypeNameSpec".
# Example: SqlParseTimeStampZ().
dataTypeParserMethods: [
]
# List of methods for parsing builtin function calls.
# Return type of method implementation should be "SqlNode".
# Example: DateFunctionCall().
builtinFunctionCallMethods: [
]
# List of methods for parsing extensions to "ALTER <scope>" calls.
# Each must accept arguments "(SqlParserPos pos, String scope)".
# Example: "SqlUploadJarNode"
alterStatementParserMethods: [
]
# List of methods for parsing extensions to "CREATE [OR REPLACE]" calls.
# Each must accept arguments "(SqlParserPos pos, boolean replace)".
createStatementParserMethods: [
]
# List of methods for parsing extensions to "DROP" calls.
# Each must accept arguments "(SqlParserPos pos)".
dropStatementParserMethods: [
]
# Binary operators tokens
binaryOperatorsTokens: [
]
# Binary operators initialization
extraBinaryExpressions: [
]
# List of files in @includes directory that have parser method
# implementations for parsing custom SQL statements, literals or types
# given as part of "statementParserMethods", "literalParserMethods" or
# "dataTypeParserMethods".
implementationFiles: [
"insert.ftl"
]
includePosixOperators: false
includeCompoundIdentifier: true
includeBraces: true
includeAdditionalDeclarations: false
}
}
freemarkerLinks: {
includes: includes/
}


@ -0,0 +1,112 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
// Use the fully qualified name for the Pair class, since Calcite's Parser.jj already uses a class with the same name
SqlNode DruidSqlInsert() :
{
SqlNode insertNode;
org.apache.druid.java.util.common.Pair<Granularity, String> partitionedBy = null;
SqlNodeList clusteredBy = null;
}
{
insertNode = SqlInsert()
<PARTITIONED> <BY>
partitionedBy = PartitionGranularity()
[
<CLUSTERED> <BY>
clusteredBy = ClusterItems()
]
{
if (!(insertNode instanceof SqlInsert)) {
// This shouldn't be encountered, but is handled defensively since SqlInsert() always returns a node of type
// SqlInsert
return insertNode;
}
SqlInsert sqlInsert = (SqlInsert) insertNode;
return new DruidSqlInsert(sqlInsert, partitionedBy.lhs, partitionedBy.rhs, clusteredBy);
}
}
SqlNodeList ClusterItems() :
{
List<SqlNode> list;
final Span s;
SqlNode e;
}
{
e = OrderItem() {
s = span();
list = startList(e);
}
(
LOOKAHEAD(2) <COMMA> e = OrderItem() { list.add(e); }
)*
{
return new SqlNodeList(list, s.addAll(list).pos());
}
}
org.apache.druid.java.util.common.Pair<Granularity, String> PartitionGranularity() :
{
SqlNode e = null;
org.apache.druid.java.util.common.granularity.Granularity granularity = null;
String unparseString = null;
}
{
(
<HOUR>
{
granularity = Granularities.HOUR;
unparseString = "HOUR";
}
|
<DAY>
{
granularity = Granularities.DAY;
unparseString = "DAY";
}
|
<MONTH>
{
granularity = Granularities.MONTH;
unparseString = "MONTH";
}
|
<YEAR>
{
granularity = Granularities.YEAR;
unparseString = "YEAR";
}
|
<ALL> <TIME>
{
granularity = Granularities.ALL;
unparseString = "ALL TIME";
}
|
e = Expression(ExprContext.ACCEPT_SUB_QUERY)
{
granularity = DruidSqlParserUtils.convertSqlNodeToGranularityThrowingParseExceptions(e);
unparseString = e.toString();
}
)
{
return new org.apache.druid.java.util.common.Pair(granularity, unparseString);
}
}


@ -52,8 +52,9 @@ import java.util.stream.Collectors;
public class TimeFloorOperatorConversion implements SqlOperatorConversion
{
private static final SqlFunction SQL_FUNCTION = OperatorConversions
.operatorBuilder("TIME_FLOOR")
public static final String SQL_FUNCTION_NAME = "TIME_FLOOR";
public static final SqlFunction SQL_FUNCTION = OperatorConversions
.operatorBuilder(SQL_FUNCTION_NAME)
.operandTypes(SqlTypeFamily.TIMESTAMP, SqlTypeFamily.CHARACTER, SqlTypeFamily.TIMESTAMP, SqlTypeFamily.CHARACTER)
.requiredOperands(2)
.returnTypeCascadeNullable(SqlTypeName.TIMESTAMP)


@ -0,0 +1,105 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.sql.calcite.parser;
import com.google.common.base.Preconditions;
import org.apache.calcite.sql.SqlInsert;
import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.sql.SqlNodeList;
import org.apache.calcite.sql.SqlOperator;
import org.apache.calcite.sql.SqlWriter;
import org.apache.druid.java.util.common.granularity.Granularity;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
/**
* Extends the 'insert' call to hold custom parameters specific to Druid, i.e. PARTITIONED BY and CLUSTERED BY.
* This class extends {@link SqlInsert} so that the SqlNode can be converted into a RelNode by
* {@link org.apache.calcite.sql2rel.SqlToRelConverter} and processed further
*/
public class DruidSqlInsert extends SqlInsert
{
public static final String SQL_INSERT_SEGMENT_GRANULARITY = "sqlInsertSegmentGranularity";
// This allows reusing super.unparse
public static final SqlOperator OPERATOR = SqlInsert.OPERATOR;
private final Granularity partitionedBy;
private final String partitionedByStringForUnparse;
@Nullable
private final SqlNodeList clusteredBy;
public DruidSqlInsert(
@Nonnull SqlInsert insertNode,
@Nonnull Granularity partitionedBy,
@Nonnull String partitionedByStringForUnparse,
@Nullable SqlNodeList clusteredBy
)
{
super(
insertNode.getParserPosition(),
(SqlNodeList) insertNode.getOperandList().get(0), // No better getter to extract this
insertNode.getTargetTable(),
insertNode.getSource(),
insertNode.getTargetColumnList()
);
Preconditions.checkNotNull(partitionedBy); // Shouldn't hit due to how the parser is written
this.partitionedBy = partitionedBy;
Preconditions.checkNotNull(partitionedByStringForUnparse);
this.partitionedByStringForUnparse = partitionedByStringForUnparse;
this.clusteredBy = clusteredBy;
}
@Nullable
public SqlNodeList getClusteredBy()
{
return clusteredBy;
}
public Granularity getPartitionedBy()
{
return partitionedBy;
}
@Nonnull
@Override
public SqlOperator getOperator()
{
return OPERATOR;
}
@Override
public void unparse(SqlWriter writer, int leftPrec, int rightPrec)
{
super.unparse(writer, leftPrec, rightPrec);
writer.keyword("PARTITIONED BY");
writer.keyword(partitionedByStringForUnparse);
if (getClusteredBy() != null) {
writer.keyword("CLUSTERED BY");
SqlWriter.Frame frame = writer.startList("", "");
for (SqlNode clusterByOpts : getClusteredBy().getList()) {
clusterByOpts.unparse(writer, leftPrec, rightPrec);
}
writer.endList(frame);
}
}
}
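
For reference, a minimal sketch of rendering a parsed DruidSqlInsert back to SQL text via the unparse method above. The class and helper names are hypothetical, and Calcite's SqlPrettyWriter is assumed; the exact quoting and casing in the output depend on the parser and dialect configuration.

import org.apache.calcite.sql.dialect.CalciteSqlDialect;
import org.apache.calcite.sql.pretty.SqlPrettyWriter;
import org.apache.druid.sql.calcite.parser.DruidSqlInsert;

public class UnparseSketch
{
  // Hypothetical helper: round-trip a parsed DruidSqlInsert back into SQL text.
  static String toSql(DruidSqlInsert insert)
  {
    SqlPrettyWriter writer = new SqlPrettyWriter(CalciteSqlDialect.DEFAULT);
    insert.unparse(writer, 0, 0);
    // For an insert parsed with "PARTITIONED BY DAY CLUSTERED BY dim1", the output ends roughly with:
    // ... PARTITIONED BY DAY CLUSTERED BY "dim1"
    return writer.toSqlString().getSql();
  }
}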


@ -0,0 +1,35 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.sql.calcite.parser;
import org.apache.calcite.sql.parser.SqlAbstractParserImpl;
import org.apache.calcite.sql.parser.SqlParserImplFactory;
import java.io.Reader;
public class DruidSqlParserImplFactory implements SqlParserImplFactory
{
@Override
public SqlAbstractParserImpl getParser(Reader stream)
{
return new DruidSqlParserImpl(stream);
}
}


@ -0,0 +1,170 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.sql.calcite.parser;
import com.google.common.base.Preconditions;
import org.apache.calcite.sql.SqlCall;
import org.apache.calcite.sql.SqlIdentifier;
import org.apache.calcite.sql.SqlIntervalQualifier;
import org.apache.calcite.sql.SqlKind;
import org.apache.calcite.sql.SqlLiteral;
import org.apache.calcite.sql.SqlNode;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.java.util.common.granularity.PeriodGranularity;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.segment.column.ColumnHolder;
import org.apache.druid.sql.calcite.expression.TimeUnits;
import org.apache.druid.sql.calcite.expression.builtin.TimeFloorOperatorConversion;
import org.joda.time.Period;
import java.util.List;
public class DruidSqlParserUtils
{
private static final Logger log = new Logger(DruidSqlParserUtils.class);
/**
* Delegates to {@code convertSqlNodeToGranularity} and converts the exceptions to {@link ParseException}
* with the underlying message
*/
public static Granularity convertSqlNodeToGranularityThrowingParseExceptions(SqlNode sqlNode) throws ParseException
{
try {
return convertSqlNodeToGranularity(sqlNode);
}
catch (Exception e) {
log.debug(e, StringUtils.format("Unable to convert %s to a valid granularity.", sqlNode.toString()));
throw new ParseException(e.getMessage());
}
}
/**
* This method is used to extract the granularity from a SqlNode representing following function calls:
* 1. FLOOR(__time TO TimeUnit)
* 2. TIME_FLOOR(__time, 'PT1H')
*
* Validation on the sqlNode is contingent on the following conditions:
* 1. sqlNode is an instance of SqlCall
* 2. Operator is either one of TIME_FLOOR or FLOOR
* 3. Number of operands in the call is 2
* 4. First operand is a SimpleIdentifier representing __time
* 5. If operator is TIME_FLOOR, the second argument is a literal, and can be converted to the Granularity class
* 6. If operator is FLOOR, the second argument is a TimeUnit, and can be mapped using {@link TimeUnits}
*
* Since it is to be used primarily while parsing the SqlNode, it is wrapped in {@code convertSqlNodeToGranularityThrowingParseExceptions}
*
* @param sqlNode SqlNode representing a call to a function
* @return Granularity as intended by the function call
* @throws ParseException if the SqlNode cannot be converted to a granularity
*/
public static Granularity convertSqlNodeToGranularity(SqlNode sqlNode) throws ParseException
{
final String genericParseFailedMessageFormatString = "Encountered %s after PARTITIONED BY. "
+ "Expected HOUR, DAY, MONTH, YEAR, ALL TIME, FLOOR function or %s function";
if (!(sqlNode instanceof SqlCall)) {
throw new ParseException(StringUtils.format(
genericParseFailedMessageFormatString,
sqlNode.toString(),
TimeFloorOperatorConversion.SQL_FUNCTION_NAME
));
}
SqlCall sqlCall = (SqlCall) sqlNode;
String operatorName = sqlCall.getOperator().getName();
Preconditions.checkArgument(
"FLOOR".equalsIgnoreCase(operatorName)
|| TimeFloorOperatorConversion.SQL_FUNCTION_NAME.equalsIgnoreCase(operatorName),
StringUtils.format(
"PARTITIONED BY clause only supports FLOOR(__time TO <unit> and %s(__time, period) functions",
TimeFloorOperatorConversion.SQL_FUNCTION_NAME
)
);
List<SqlNode> operandList = sqlCall.getOperandList();
Preconditions.checkArgument(
operandList.size() == 2,
StringUtils.format("%s in PARTITIONED BY clause must have two arguments", operatorName)
);
// Check if the first argument passed in the floor function is __time
SqlNode timeOperandSqlNode = operandList.get(0);
Preconditions.checkArgument(
timeOperandSqlNode.getKind().equals(SqlKind.IDENTIFIER),
StringUtils.format("First argument to %s in PARTITIONED BY clause can only be __time", operatorName)
);
SqlIdentifier timeOperandSqlIdentifier = (SqlIdentifier) timeOperandSqlNode;
Preconditions.checkArgument(
timeOperandSqlIdentifier.getSimple().equals(ColumnHolder.TIME_COLUMN_NAME),
StringUtils.format("First argument to %s in PARTITIONED BY clause can only be __time", operatorName)
);
// If the floor function is of form TIME_FLOOR(__time, 'PT1H')
if (operatorName.equalsIgnoreCase(TimeFloorOperatorConversion.SQL_FUNCTION_NAME)) {
SqlNode granularitySqlNode = operandList.get(1);
Preconditions.checkArgument(
granularitySqlNode.getKind().equals(SqlKind.LITERAL),
"Second argument to TIME_FLOOR in PARTITIONED BY clause must be a period like 'PT1H'"
);
String granularityString = SqlLiteral.unchain(granularitySqlNode).toValue();
Period period;
try {
period = new Period(granularityString);
}
catch (IllegalArgumentException e) {
throw new ParseException(StringUtils.format("%s is an invalid period string", granularitySqlNode.toString()));
}
return new PeriodGranularity(period, null, null);
} else if ("FLOOR".equalsIgnoreCase(operatorName)) { // If the floor function is of form FLOOR(__time TO DAY)
SqlNode granularitySqlNode = operandList.get(1);
// In future versions of Calcite, this can be checked via
// granularitySqlNode.getKind().equals(SqlKind.INTERVAL_QUALIFIER)
Preconditions.checkArgument(
granularitySqlNode instanceof SqlIntervalQualifier,
"Second argument to the FLOOR function in PARTITIONED BY clause is not a valid granularity. "
+ "Please refer to the documentation of FLOOR function"
);
SqlIntervalQualifier granularityIntervalQualifier = (SqlIntervalQualifier) granularitySqlNode;
Period period = TimeUnits.toPeriod(granularityIntervalQualifier.timeUnitRange);
Preconditions.checkNotNull(
period,
StringUtils.format(
"%s is not a valid granularity for ingestion",
granularityIntervalQualifier.timeUnitRange.toString()
)
);
return new PeriodGranularity(period, null, null);
}
// Shouldn't reach here
throw new ParseException(StringUtils.format(
genericParseFailedMessageFormatString,
sqlNode.toString(),
TimeFloorOperatorConversion.SQL_FUNCTION_NAME
));
}
}
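
As a minimal sketch of the FLOOR(__time TO <unit>) path described in the javadoc above (the TIME_FLOOR path is exercised by the parameterized test added later in this PR). The class name is hypothetical; the Calcite node construction mirrors what the parser would produce.

import org.apache.calcite.avatica.util.TimeUnit;
import org.apache.calcite.sql.SqlIdentifier;
import org.apache.calcite.sql.SqlIntervalQualifier;
import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.sql.fun.SqlStdOperatorTable;
import org.apache.calcite.sql.parser.SqlParserPos;
import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.sql.calcite.parser.DruidSqlParserUtils;

public class FloorGranularitySketch
{
  public static void main(String[] args) throws Exception
  {
    // Build the SqlNode the parser would produce for FLOOR(__time TO DAY).
    SqlNode floorCall = SqlStdOperatorTable.FLOOR.createCall(
        SqlParserPos.ZERO,
        new SqlIdentifier("__time", SqlParserPos.ZERO),
        new SqlIntervalQualifier(TimeUnit.DAY, null, SqlParserPos.ZERO)
    );
    // Yields a PeriodGranularity equal to Granularities.DAY.
    Granularity granularity = DruidSqlParserUtils.convertSqlNodeToGranularity(floorCall);
    System.out.println(granularity);
  }
}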


@ -56,6 +56,8 @@ import org.apache.calcite.sql.SqlIdentifier;
import org.apache.calcite.sql.SqlInsert;
import org.apache.calcite.sql.SqlKind;
import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.sql.SqlNodeList;
import org.apache.calcite.sql.SqlOrderBy;
import org.apache.calcite.sql.parser.SqlParseException;
import org.apache.calcite.sql.type.SqlTypeName;
import org.apache.calcite.sql.validate.SqlValidator;
@ -68,6 +70,7 @@ import org.apache.calcite.tools.ValidationException;
import org.apache.calcite.util.Pair;
import org.apache.druid.common.utils.IdUtils;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.java.util.common.guava.BaseSequence;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.java.util.common.guava.Sequences;
@ -80,6 +83,7 @@ import org.apache.druid.server.security.Action;
import org.apache.druid.server.security.Resource;
import org.apache.druid.server.security.ResourceAction;
import org.apache.druid.server.security.ResourceType;
import org.apache.druid.sql.calcite.parser.DruidSqlInsert;
import org.apache.druid.sql.calcite.rel.DruidConvention;
import org.apache.druid.sql.calcite.rel.DruidQuery;
import org.apache.druid.sql.calcite.rel.DruidRel;
@ -199,6 +203,18 @@ public class DruidPlanner implements Closeable
final ParsedNodes parsed = ParsedNodes.create(planner.parse(plannerContext.getSql()));
try {
if (parsed.getIngestionGranularity() != null) {
plannerContext.getQueryContext().put(
DruidSqlInsert.SQL_INSERT_SEGMENT_GRANULARITY,
plannerContext.getJsonMapper().writeValueAsString(parsed.getIngestionGranularity())
);
}
}
catch (JsonProcessingException e) {
throw new ValidationException("Unable to serialize partition granularity.");
}
// the planner's type factory is not available until after parsing
this.rexBuilder = new RexBuilder(planner.getTypeFactory());
final SqlNode parameterizedQueryNode = rewriteDynamicParameters(parsed.getQueryNode());
@ -481,6 +497,7 @@ public class DruidPlanner implements Closeable
* DruidRel (B)
* DruidRel(C)
* will return [DruidRel(A), DruidRel(B), DruidRel(C)]
*
* @param outermostDruidRel The outermost rel which is to be flattened
* @return a list of DruidRel's which do not have a DruidUnionRel nested in between them
*/
@ -495,7 +512,8 @@ public class DruidPlanner implements Closeable
* Recursive function (DFS) which traverses the nodes and collects the corresponding {@link DruidRel} into a list if
* they are not of the type {@link DruidUnionRel} or else calls the method with the child nodes. The DFS order of the
* nodes are retained, since that is the order in which they will actually be called in {@link DruidUnionRel#runQuery()}
* @param druidRel The current relNode
*
* @param druidRel The current relNode
* @param flattendListAccumulator Accumulator list which needs to be appended by this method
*/
private void flattenOutermostRel(DruidRel<?> druidRel, List<DruidRel<?>> flattendListAccumulator)
@ -737,18 +755,27 @@ public class DruidPlanner implements Closeable
private static class ParsedNodes
{
@Nullable
private SqlExplain explain;
private final SqlExplain explain;
@Nullable
private SqlInsert insert;
private final SqlInsert insert;
private SqlNode query;
private final SqlNode query;
private ParsedNodes(@Nullable SqlExplain explain, @Nullable SqlInsert insert, SqlNode query)
@Nullable
private final Granularity ingestionGranularity;
private ParsedNodes(
@Nullable SqlExplain explain,
@Nullable SqlInsert insert,
SqlNode query,
@Nullable Granularity ingestionGranularity
)
{
this.explain = explain;
this.insert = insert;
this.query = query;
this.ingestionGranularity = ingestionGranularity;
}
static ParsedNodes create(final SqlNode node) throws ValidationException
@ -756,6 +783,7 @@ public class DruidPlanner implements Closeable
SqlExplain explain = null;
SqlInsert insert = null;
SqlNode query = node;
Granularity ingestionGranularity = null;
if (query.getKind() == SqlKind.EXPLAIN) {
explain = (SqlExplain) query;
@ -765,13 +793,55 @@ public class DruidPlanner implements Closeable
if (query.getKind() == SqlKind.INSERT) {
insert = (SqlInsert) query;
query = insert.getSource();
// Check that the underlying query does not have an ORDER BY clause
if (query instanceof SqlOrderBy) {
SqlOrderBy sqlOrderBy = (SqlOrderBy) query;
SqlNodeList orderByList = sqlOrderBy.orderList;
if (!(orderByList == null || orderByList.equals(SqlNodeList.EMPTY))) {
throw new ValidationException("Cannot have ORDER BY on an INSERT query, use CLUSTERED BY instead.");
}
}
// Processing to be done when the original query has a PARTITIONED BY or CLUSTERED BY clause
// The following condition should always be true; it is checked defensively
if (insert instanceof DruidSqlInsert) {
DruidSqlInsert druidSqlInsert = (DruidSqlInsert) insert;
ingestionGranularity = druidSqlInsert.getPartitionedBy();
if (druidSqlInsert.getClusteredBy() != null) {
// If we have a CLUSTERED BY clause, extract the information in that CLUSTERED BY and create a new SqlOrderBy
// node
SqlNode offset = null;
SqlNode fetch = null;
if (query instanceof SqlOrderBy) {
SqlOrderBy sqlOrderBy = (SqlOrderBy) query;
// sqlOrderBy.query represents the underlying query, free of OFFSET, FETCH and ORDER BY clauses
// For a query like "SELECT dim1, sum(dim2) FROM foo GROUP BY dim1 ORDER BY dim1 OFFSET 10 FETCH 30",
// sqlOrderBy.query would be "SELECT dim1, sum(dim2) FROM foo GROUP BY dim1"
query = sqlOrderBy.query;
offset = sqlOrderBy.offset;
fetch = sqlOrderBy.fetch;
}
// Creates a new SqlOrderBy node whose ORDER BY list is taken from the CLUSTERED BY clause
query = new SqlOrderBy(
query.getParserPosition(),
query,
druidSqlInsert.getClusteredBy(),
offset,
fetch
);
}
}
}
if (!query.isA(SqlKind.QUERY)) {
throw new ValidationException(StringUtils.format("Cannot execute [%s].", query.getKind()));
}
return new ParsedNodes(explain, insert, query);
return new ParsedNodes(explain, insert, query, ingestionGranularity);
}
@Nullable
@ -790,5 +860,11 @@ public class DruidPlanner implements Closeable
{
return query;
}
@Nullable
public Granularity getIngestionGranularity()
{
return ingestionGranularity;
}
}
}
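
The granularity stored in the query context above travels with the query as a JSON string. A hypothetical consumer could read it back roughly as follows, assuming an ObjectMapper that knows Druid's granularity serde (as the planner's JSON mapper does); the class and helper names are illustrative only.

import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException;
import java.util.Map;
import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.sql.calcite.parser.DruidSqlInsert;

public class IngestGranularityContextSketch
{
  // Hypothetical helper: pull the ingestion granularity back out of a query context map.
  static Granularity readIngestionGranularity(Map<String, Object> queryContext, ObjectMapper jsonMapper)
      throws IOException
  {
    // e.g. {"type":"all"} for PARTITIONED BY ALL TIME, as asserted in the tests below.
    String json = (String) queryContext.get(DruidSqlInsert.SQL_INSERT_SEGMENT_GRANULARITY);
    return json == null ? null : jsonMapper.readValue(json, Granularity.class);
  }
}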


@ -41,6 +41,7 @@ import org.apache.druid.math.expr.ExprMacroTable;
import org.apache.druid.server.security.Access;
import org.apache.druid.server.security.AuthorizerMapper;
import org.apache.druid.server.security.NoopEscalator;
import org.apache.druid.sql.calcite.parser.DruidSqlParserImplFactory;
import org.apache.druid.sql.calcite.run.QueryMakerFactory;
import org.apache.druid.sql.calcite.schema.DruidSchemaCatalog;
import org.apache.druid.sql.calcite.schema.DruidSchemaName;
@ -57,6 +58,7 @@ public class PlannerFactory
.setQuotedCasing(Casing.UNCHANGED)
.setQuoting(Quoting.DOUBLE_QUOTE)
.setConformance(DruidConformance.instance())
.setParserFactory(new DruidSqlParserImplFactory()) // Custom sql parser factory
.build();
private final DruidSchemaCatalog rootSchema;


@ -29,6 +29,7 @@ import org.apache.druid.data.input.impl.InlineInputSource;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.java.util.common.jackson.JacksonUtils;
import org.apache.druid.query.Query;
import org.apache.druid.query.aggregation.CountAggregatorFactory;
@ -51,6 +52,7 @@ import org.apache.druid.sql.SqlPlanningException;
import org.apache.druid.sql.calcite.external.ExternalDataSource;
import org.apache.druid.sql.calcite.external.ExternalOperatorConversion;
import org.apache.druid.sql.calcite.filtration.Filtration;
import org.apache.druid.sql.calcite.parser.DruidSqlInsert;
import org.apache.druid.sql.calcite.planner.Calcites;
import org.apache.druid.sql.calcite.planner.PlannerConfig;
import org.apache.druid.sql.calcite.planner.PlannerContext;
@ -60,6 +62,7 @@ import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.junit.After;
import org.junit.Assert;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.internal.matchers.ThrowableMessageMatcher;
@ -97,6 +100,11 @@ public class CalciteInsertDmlTest extends BaseCalciteQueryTest
.build()
);
private static final Map<String, Object> PARTITIONED_BY_ALL_TIME_QUERY_CONTEXT = ImmutableMap.of(
DruidSqlInsert.SQL_INSERT_SEGMENT_GRANULARITY,
"{\"type\":\"all\"}"
);
private boolean didTest = false;
@After
@ -115,7 +123,7 @@ public class CalciteInsertDmlTest extends BaseCalciteQueryTest
public void testInsertFromTable()
{
testInsertQuery()
.sql("INSERT INTO dst SELECT * FROM foo")
.sql("INSERT INTO dst SELECT * FROM foo PARTITIONED BY ALL TIME")
.expectTarget("dst", FOO_TABLE_SIGNATURE)
.expectResources(dataSourceRead("foo"), dataSourceWrite("dst"))
.expectQuery(
@ -123,6 +131,7 @@ public class CalciteInsertDmlTest extends BaseCalciteQueryTest
.dataSource("foo")
.intervals(querySegmentSpec(Filtration.eternity()))
.columns("__time", "cnt", "dim1", "dim2", "dim3", "m1", "m2", "unique_dim1")
.context(PARTITIONED_BY_ALL_TIME_QUERY_CONTEXT)
.build()
)
.verify();
@ -132,7 +141,7 @@ public class CalciteInsertDmlTest extends BaseCalciteQueryTest
public void testInsertFromView()
{
testInsertQuery()
.sql("INSERT INTO dst SELECT * FROM view.aview")
.sql("INSERT INTO dst SELECT * FROM view.aview PARTITIONED BY ALL TIME")
.expectTarget("dst", RowSignature.builder().add("dim1_firstchar", ColumnType.STRING).build())
.expectResources(viewRead("aview"), dataSourceWrite("dst"))
.expectQuery(
@ -142,6 +151,7 @@ public class CalciteInsertDmlTest extends BaseCalciteQueryTest
.virtualColumns(expressionVirtualColumn("v0", "substring(\"dim1\", 0, 1)", ColumnType.STRING))
.filters(selector("dim2", "a", null))
.columns("v0")
.context(PARTITIONED_BY_ALL_TIME_QUERY_CONTEXT)
.build()
)
.verify();
@ -151,7 +161,7 @@ public class CalciteInsertDmlTest extends BaseCalciteQueryTest
public void testInsertIntoExistingTable()
{
testInsertQuery()
.sql("INSERT INTO foo SELECT * FROM foo")
.sql("INSERT INTO foo SELECT * FROM foo PARTITIONED BY ALL TIME")
.expectTarget("foo", FOO_TABLE_SIGNATURE)
.expectResources(dataSourceRead("foo"), dataSourceWrite("foo"))
.expectQuery(
@ -159,6 +169,7 @@ public class CalciteInsertDmlTest extends BaseCalciteQueryTest
.dataSource("foo")
.intervals(querySegmentSpec(Filtration.eternity()))
.columns("__time", "cnt", "dim1", "dim2", "dim3", "m1", "m2", "unique_dim1")
.context(PARTITIONED_BY_ALL_TIME_QUERY_CONTEXT)
.build()
)
.verify();
@ -168,7 +179,7 @@ public class CalciteInsertDmlTest extends BaseCalciteQueryTest
public void testInsertIntoQualifiedTable()
{
testInsertQuery()
.sql("INSERT INTO druid.dst SELECT * FROM foo")
.sql("INSERT INTO druid.dst SELECT * FROM foo PARTITIONED BY ALL TIME")
.expectTarget("dst", FOO_TABLE_SIGNATURE)
.expectResources(dataSourceRead("foo"), dataSourceWrite("dst"))
.expectQuery(
@ -176,6 +187,7 @@ public class CalciteInsertDmlTest extends BaseCalciteQueryTest
.dataSource("foo")
.intervals(querySegmentSpec(Filtration.eternity()))
.columns("__time", "cnt", "dim1", "dim2", "dim3", "m1", "m2", "unique_dim1")
.context(PARTITIONED_BY_ALL_TIME_QUERY_CONTEXT)
.build()
)
.verify();
@ -185,7 +197,7 @@ public class CalciteInsertDmlTest extends BaseCalciteQueryTest
public void testInsertIntoInvalidDataSourceName()
{
testInsertQuery()
.sql("INSERT INTO \"in/valid\" SELECT dim1, dim2 FROM foo")
.sql("INSERT INTO \"in/valid\" SELECT dim1, dim2 FROM foo PARTITIONED BY ALL TIME")
.expectValidationError(SqlPlanningException.class, "INSERT dataSource cannot contain the '/' character.")
.verify();
}
@ -194,7 +206,7 @@ public class CalciteInsertDmlTest extends BaseCalciteQueryTest
public void testInsertUsingColumnList()
{
testInsertQuery()
.sql("INSERT INTO dst (foo, bar) SELECT dim1, dim2 FROM foo")
.sql("INSERT INTO dst (foo, bar) SELECT dim1, dim2 FROM foo PARTITIONED BY ALL TIME")
.expectValidationError(SqlPlanningException.class, "INSERT with target column list is not supported.")
.verify();
}
@ -203,7 +215,7 @@ public class CalciteInsertDmlTest extends BaseCalciteQueryTest
public void testUpsert()
{
testInsertQuery()
.sql("UPSERT INTO dst SELECT * FROM foo")
.sql("UPSERT INTO dst SELECT * FROM foo PARTITIONED BY ALL TIME")
.expectValidationError(SqlPlanningException.class, "UPSERT is not supported.")
.verify();
}
@ -212,7 +224,7 @@ public class CalciteInsertDmlTest extends BaseCalciteQueryTest
public void testInsertIntoSystemTable()
{
testInsertQuery()
.sql("INSERT INTO INFORMATION_SCHEMA.COLUMNS SELECT * FROM foo")
.sql("INSERT INTO INFORMATION_SCHEMA.COLUMNS SELECT * FROM foo PARTITIONED BY ALL TIME")
.expectValidationError(
SqlPlanningException.class,
"Cannot INSERT into [INFORMATION_SCHEMA.COLUMNS] because it is not a Druid datasource."
@ -224,7 +236,7 @@ public class CalciteInsertDmlTest extends BaseCalciteQueryTest
public void testInsertIntoView()
{
testInsertQuery()
.sql("INSERT INTO view.aview SELECT * FROM foo")
.sql("INSERT INTO view.aview SELECT * FROM foo PARTITIONED BY ALL TIME")
.expectValidationError(
SqlPlanningException.class,
"Cannot INSERT into [view.aview] because it is not a Druid datasource."
@ -236,7 +248,7 @@ public class CalciteInsertDmlTest extends BaseCalciteQueryTest
public void testInsertFromUnauthorizedDataSource()
{
testInsertQuery()
.sql("INSERT INTO dst SELECT * FROM \"%s\"", CalciteTests.FORBIDDEN_DATASOURCE)
.sql("INSERT INTO dst SELECT * FROM \"%s\" PARTITIONED BY ALL TIME", CalciteTests.FORBIDDEN_DATASOURCE)
.expectValidationError(ForbiddenException.class)
.verify();
}
@ -245,7 +257,7 @@ public class CalciteInsertDmlTest extends BaseCalciteQueryTest
public void testInsertIntoUnauthorizedDataSource()
{
testInsertQuery()
.sql("INSERT INTO \"%s\" SELECT * FROM foo", CalciteTests.FORBIDDEN_DATASOURCE)
.sql("INSERT INTO \"%s\" SELECT * FROM foo PARTITIONED BY ALL TIME", CalciteTests.FORBIDDEN_DATASOURCE)
.expectValidationError(ForbiddenException.class)
.verify();
}
@ -254,7 +266,7 @@ public class CalciteInsertDmlTest extends BaseCalciteQueryTest
public void testInsertIntoNonexistentSchema()
{
testInsertQuery()
.sql("INSERT INTO nonexistent.dst SELECT * FROM foo")
.sql("INSERT INTO nonexistent.dst SELECT * FROM foo PARTITIONED BY ALL TIME")
.expectValidationError(
SqlPlanningException.class,
"Cannot INSERT into [nonexistent.dst] because it is not a Druid datasource."
@ -266,7 +278,7 @@ public class CalciteInsertDmlTest extends BaseCalciteQueryTest
public void testInsertFromExternal()
{
testInsertQuery()
.sql("INSERT INTO dst SELECT * FROM %s", externSql(externalDataSource))
.sql("INSERT INTO dst SELECT * FROM %s PARTITIONED BY ALL TIME", externSql(externalDataSource))
.authentication(CalciteTests.SUPER_USER_AUTH_RESULT)
.expectTarget("dst", externalDataSource.getSignature())
.expectResources(dataSourceWrite("dst"), ExternalOperatorConversion.EXTERNAL_RESOURCE_ACTION)
@ -275,11 +287,282 @@ public class CalciteInsertDmlTest extends BaseCalciteQueryTest
.dataSource(externalDataSource)
.intervals(querySegmentSpec(Filtration.eternity()))
.columns("x", "y", "z")
.context(PARTITIONED_BY_ALL_TIME_QUERY_CONTEXT)
.build()
)
.verify();
}
@Test
public void testInsertWithPartitionedBy()
{
// Test correctness of the query when only the PARTITIONED BY clause is present
RowSignature targetRowSignature = RowSignature.builder()
.add("__time", ColumnType.LONG)
.add("floor_m1", ColumnType.FLOAT)
.add("dim1", ColumnType.STRING)
.build();
testInsertQuery()
.sql(
"INSERT INTO druid.dst SELECT __time, FLOOR(m1) as floor_m1, dim1 FROM foo PARTITIONED BY TIME_FLOOR(__time, 'PT1H')")
.expectTarget("dst", targetRowSignature)
.expectResources(dataSourceRead("foo"), dataSourceWrite("dst"))
.expectQuery(
newScanQueryBuilder()
.dataSource("foo")
.intervals(querySegmentSpec(Filtration.eternity()))
.columns("__time", "dim1", "v0")
.virtualColumns(expressionVirtualColumn("v0", "floor(\"m1\")", ColumnType.FLOAT))
.context(queryContextWithGranularity(Granularities.HOUR))
.build()
)
.verify();
}
@Test
public void testPartitionedBySupportedClauses()
{
RowSignature targetRowSignature = RowSignature.builder()
.add("__time", ColumnType.LONG)
.add("dim1", ColumnType.STRING)
.build();
Map<String, Granularity> partitionedByArgumentToGranularityMap =
ImmutableMap.<String, Granularity>builder()
.put("HOUR", Granularities.HOUR)
.put("DAY", Granularities.DAY)
.put("MONTH", Granularities.MONTH)
.put("YEAR", Granularities.YEAR)
.put("ALL TIME", Granularities.ALL)
.put("FLOOR(__time TO QUARTER)", Granularities.QUARTER)
.put("TIME_FLOOR(__time, 'PT1H')", Granularities.HOUR)
.build();
partitionedByArgumentToGranularityMap.forEach((partitionedByArgument, expectedGranularity) -> {
Map<String, Object> queryContext = null;
try {
queryContext = ImmutableMap.of(
DruidSqlInsert.SQL_INSERT_SEGMENT_GRANULARITY, queryJsonMapper.writeValueAsString(expectedGranularity)
);
}
catch (JsonProcessingException e) {
// Won't reach here
Assert.fail(e.getMessage());
}
testInsertQuery()
.sql(StringUtils.format(
"INSERT INTO druid.dst SELECT __time, dim1 FROM foo PARTITIONED BY %s",
partitionedByArgument
))
.expectTarget("dst", targetRowSignature)
.expectResources(dataSourceRead("foo"), dataSourceWrite("dst"))
.expectQuery(
newScanQueryBuilder()
.dataSource("foo")
.intervals(querySegmentSpec(Filtration.eternity()))
.columns("__time", "dim1")
.context(queryContext)
.build()
)
.verify();
didTest = false;
});
didTest = true;
}
@Test
public void testInsertWithClusteredBy()
{
// Test correctness of the query when only the CLUSTERED BY clause is present
RowSignature targetRowSignature = RowSignature.builder()
.add("__time", ColumnType.LONG)
.add("floor_m1", ColumnType.FLOAT)
.add("dim1", ColumnType.STRING)
.add("EXPR$3", ColumnType.DOUBLE)
.build();
testInsertQuery()
.sql(
"INSERT INTO druid.dst "
+ "SELECT __time, FLOOR(m1) as floor_m1, dim1, CEIL(m2) FROM foo "
+ "PARTITIONED BY FLOOR(__time TO DAY) CLUSTERED BY 2, dim1 DESC, CEIL(m2)"
)
.expectTarget("dst", targetRowSignature)
.expectResources(dataSourceRead("foo"), dataSourceWrite("dst"))
.expectQuery(
newScanQueryBuilder()
.dataSource("foo")
.intervals(querySegmentSpec(Filtration.eternity()))
.columns("__time", "dim1", "v0", "v1")
.virtualColumns(
expressionVirtualColumn("v0", "floor(\"m1\")", ColumnType.FLOAT),
expressionVirtualColumn("v1", "ceil(\"m2\")", ColumnType.DOUBLE)
)
.orderBy(
ImmutableList.of(
new ScanQuery.OrderBy("v0", ScanQuery.Order.ASCENDING),
new ScanQuery.OrderBy("dim1", ScanQuery.Order.DESCENDING),
new ScanQuery.OrderBy("v1", ScanQuery.Order.ASCENDING)
)
)
.context(queryContextWithGranularity(Granularities.DAY))
.build()
)
.verify();
}
@Test
public void testInsertWithPartitionedByAndClusteredBy()
{
// Test correctness of the query when both PARTITIONED BY and CLUSTERED BY clauses are present
RowSignature targetRowSignature = RowSignature.builder()
.add("__time", ColumnType.LONG)
.add("floor_m1", ColumnType.FLOAT)
.add("dim1", ColumnType.STRING)
.build();
testInsertQuery()
.sql(
"INSERT INTO druid.dst SELECT __time, FLOOR(m1) as floor_m1, dim1 FROM foo PARTITIONED BY DAY CLUSTERED BY 2, dim1")
.expectTarget("dst", targetRowSignature)
.expectResources(dataSourceRead("foo"), dataSourceWrite("dst"))
.expectQuery(
newScanQueryBuilder()
.dataSource("foo")
.intervals(querySegmentSpec(Filtration.eternity()))
.columns("__time", "dim1", "v0")
.virtualColumns(expressionVirtualColumn("v0", "floor(\"m1\")", ColumnType.FLOAT))
.orderBy(
ImmutableList.of(
new ScanQuery.OrderBy("v0", ScanQuery.Order.ASCENDING),
new ScanQuery.OrderBy("dim1", ScanQuery.Order.ASCENDING)
)
)
.context(queryContextWithGranularity(Granularities.DAY))
.build()
)
.verify();
}
@Test
public void testInsertWithPartitionedByAndLimitOffset()
{
RowSignature targetRowSignature = RowSignature.builder()
.add("__time", ColumnType.LONG)
.add("floor_m1", ColumnType.FLOAT)
.add("dim1", ColumnType.STRING)
.build();
testInsertQuery()
.sql(
"INSERT INTO druid.dst SELECT __time, FLOOR(m1) as floor_m1, dim1 FROM foo LIMIT 10 OFFSET 20 PARTITIONED BY DAY")
.expectTarget("dst", targetRowSignature)
.expectResources(dataSourceRead("foo"), dataSourceWrite("dst"))
.expectQuery(
newScanQueryBuilder()
.dataSource("foo")
.intervals(querySegmentSpec(Filtration.eternity()))
.columns("__time", "dim1", "v0")
.virtualColumns(expressionVirtualColumn("v0", "floor(\"m1\")", ColumnType.FLOAT))
.limit(10)
.offset(20)
.context(queryContextWithGranularity(Granularities.DAY))
.build()
)
.verify();
}
@Test
public void testInsertWithClusteredByAndOrderBy() throws Exception
{
try {
testQuery(
StringUtils.format(
"INSERT INTO dst SELECT * FROM %s ORDER BY 2 PARTITIONED BY ALL TIME",
externSql(externalDataSource)
),
ImmutableList.of(),
ImmutableList.of()
);
Assert.fail("Exception should be thrown");
}
catch (SqlPlanningException e) {
Assert.assertEquals(
"Cannot have ORDER BY on an INSERT query, use CLUSTERED BY instead.",
e.getMessage()
);
}
didTest = true;
}
@Test
public void testInsertWithPartitionedByContainingInvalidGranularity() throws Exception
{
// Throws a ValidationException, which gets converted to a SqlPlanningException before being thrown to the end user
try {
testQuery(
"INSERT INTO dst SELECT * FROM foo PARTITIONED BY 'invalid_granularity'",
ImmutableList.of(),
ImmutableList.of()
);
Assert.fail("Exception should be thrown");
}
catch (SqlPlanningException e) {
Assert.assertEquals(
"Encountered 'invalid_granularity' after PARTITIONED BY. Expected HOUR, DAY, MONTH, YEAR, ALL TIME, FLOOR function or TIME_FLOOR function",
e.getMessage()
);
}
didTest = true;
}
@Test
public void testInsertWithOrderBy() throws Exception
{
try {
testQuery(
StringUtils.format(
"INSERT INTO dst SELECT * FROM %s ORDER BY 2 PARTITIONED BY ALL TIME",
externSql(externalDataSource)
),
ImmutableList.of(),
ImmutableList.of()
);
Assert.fail("Exception should be thrown");
}
catch (SqlPlanningException e) {
Assert.assertEquals(
"Cannot have ORDER BY on an INSERT query, use CLUSTERED BY instead.",
e.getMessage()
);
}
finally {
didTest = true;
}
}
// Currently EXPLAIN PLAN FOR doesn't work with the modified syntax
@Ignore
@Test
public void testExplainInsertWithPartitionedByAndClusteredBy()
{
Assert.assertThrows(
SqlPlanningException.class,
() ->
testQuery(
StringUtils.format(
"EXPLAIN PLAN FOR INSERT INTO dst SELECT * FROM %s PARTITIONED BY DAY CLUSTERED BY 1",
externSql(externalDataSource)
),
ImmutableList.of(),
ImmutableList.of()
)
);
didTest = true;
}
@Ignore
@Test
public void testExplainInsertFromExternal() throws Exception
{
@ -306,7 +589,10 @@ public class CalciteInsertDmlTest extends BaseCalciteQueryTest
// Use testQuery for EXPLAIN (not testInsertQuery).
testQuery(
new PlannerConfig(),
StringUtils.format("EXPLAIN PLAN FOR INSERT INTO dst SELECT * FROM %s", externSql(externalDataSource)),
StringUtils.format(
"EXPLAIN PLAN FOR INSERT INTO dst SELECT * FROM %s PARTITIONED BY ALL TIME",
externSql(externalDataSource)
),
CalciteTests.SUPER_USER_AUTH_RESULT,
ImmutableList.of(),
ImmutableList.of(
@ -321,6 +607,7 @@ public class CalciteInsertDmlTest extends BaseCalciteQueryTest
didTest = true;
}
@Ignore
@Test
public void testExplainInsertFromExternalUnauthorized()
{
@ -329,7 +616,10 @@ public class CalciteInsertDmlTest extends BaseCalciteQueryTest
ForbiddenException.class,
() ->
testQuery(
StringUtils.format("EXPLAIN PLAN FOR INSERT INTO dst SELECT * FROM %s", externSql(externalDataSource)),
StringUtils.format(
"EXPLAIN PLAN FOR INSERT INTO dst SELECT * FROM %s PARTITIONED BY ALL TIME",
externSql(externalDataSource)
),
ImmutableList.of(),
ImmutableList.of()
)
@ -343,7 +633,7 @@ public class CalciteInsertDmlTest extends BaseCalciteQueryTest
public void testInsertFromExternalUnauthorized()
{
testInsertQuery()
.sql("INSERT INTO dst SELECT * FROM %s", externSql(externalDataSource))
.sql("INSERT INTO dst SELECT * FROM %s PARTITIONED BY ALL TIME", externSql(externalDataSource))
.expectValidationError(ForbiddenException.class)
.verify();
}
@ -354,7 +644,10 @@ public class CalciteInsertDmlTest extends BaseCalciteQueryTest
// INSERT with a particular column ordering.
testInsertQuery()
.sql("INSERT INTO dst SELECT x || y AS xy, z FROM %s ORDER BY 1, 2", externSql(externalDataSource))
.sql(
"INSERT INTO dst SELECT x || y AS xy, z FROM %s PARTITIONED BY ALL TIME CLUSTERED BY 1, 2",
externSql(externalDataSource)
)
.authentication(CalciteTests.SUPER_USER_AUTH_RESULT)
.expectTarget("dst", RowSignature.builder().add("xy", ColumnType.STRING).add("z", ColumnType.LONG).build())
.expectResources(dataSourceWrite("dst"), ExternalOperatorConversion.EXTERNAL_RESOURCE_ACTION)
@ -370,6 +663,7 @@ public class CalciteInsertDmlTest extends BaseCalciteQueryTest
new ScanQuery.OrderBy("z", ScanQuery.Order.ASCENDING)
)
)
.context(PARTITIONED_BY_ALL_TIME_QUERY_CONTEXT)
.build()
)
.verify();
@ -382,7 +676,7 @@ public class CalciteInsertDmlTest extends BaseCalciteQueryTest
testInsertQuery()
.sql(
"INSERT INTO dst SELECT x, SUM(z) AS sum_z, COUNT(*) AS cnt FROM %s GROUP BY 1",
"INSERT INTO dst SELECT x, SUM(z) AS sum_z, COUNT(*) AS cnt FROM %s GROUP BY 1 PARTITIONED BY ALL TIME",
externSql(externalDataSource)
)
.authentication(CalciteTests.SUPER_USER_AUTH_RESULT)
@ -405,6 +699,7 @@ public class CalciteInsertDmlTest extends BaseCalciteQueryTest
new LongSumAggregatorFactory("a0", "z"),
new CountAggregatorFactory("a1")
)
.setContext(PARTITIONED_BY_ALL_TIME_QUERY_CONTEXT)
.build()
)
.verify();
@ -417,7 +712,7 @@ public class CalciteInsertDmlTest extends BaseCalciteQueryTest
testInsertQuery()
.sql(
"INSERT INTO dst SELECT COUNT(*) AS cnt FROM %s",
"INSERT INTO dst SELECT COUNT(*) AS cnt FROM %s PARTITIONED BY ALL TIME",
externSql(externalDataSource)
)
.authentication(CalciteTests.SUPER_USER_AUTH_RESULT)
@ -434,6 +729,7 @@ public class CalciteInsertDmlTest extends BaseCalciteQueryTest
.setInterval(querySegmentSpec(Filtration.eternity()))
.setGranularity(Granularities.ALL)
.setAggregatorSpecs(new CountAggregatorFactory("a0"))
.setContext(PARTITIONED_BY_ALL_TIME_QUERY_CONTEXT)
.build()
)
.verify();
@ -454,6 +750,18 @@ public class CalciteInsertDmlTest extends BaseCalciteQueryTest
}
}
private Map<String, Object> queryContextWithGranularity(Granularity granularity)
{
String granularityString = null;
try {
granularityString = queryJsonMapper.writeValueAsString(granularity);
}
catch (JsonProcessingException e) {
Assert.fail(e.getMessage());
}
return ImmutableMap.of(DruidSqlInsert.SQL_INSERT_SEGMENT_GRANULARITY, granularityString);
}
private InsertDmlTester testInsertQuery()
{
return new InsertDmlTester();


@ -13758,7 +13758,7 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
public void testSurfaceErrorsWhenInsertingThroughIncorrectSelectStatment()
{
assertQueryIsUnplannable(
"INSERT INTO druid.dst SELECT dim2, dim1, m1 FROM foo2 UNION SELECT dim1, dim2, m1 FROM foo",
"INSERT INTO druid.dst SELECT dim2, dim1, m1 FROM foo2 UNION SELECT dim1, dim2, m1 FROM foo PARTITIONED BY ALL TIME",
"Possible error: SQL requires 'UNION' but only 'UNION ALL' is supported."
);
}

View File

@ -0,0 +1,245 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.sql.calcite.parser;
import com.google.common.collect.ImmutableList;
import org.apache.calcite.avatica.util.TimeUnit;
import org.apache.calcite.sql.SqlIdentifier;
import org.apache.calcite.sql.SqlIntervalQualifier;
import org.apache.calcite.sql.SqlLiteral;
import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.sql.SqlNodeList;
import org.apache.calcite.sql.fun.SqlStdOperatorTable;
import org.apache.calcite.sql.parser.SqlParserPos;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.sql.calcite.expression.builtin.TimeFloorOperatorConversion;
import org.junit.Assert;
import org.junit.Test;
import org.junit.experimental.runners.Enclosed;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
@RunWith(Enclosed.class)
public class DruidSqlParserUtilsTest
{
/**
* Sanity check that TIME_FLOOR(__time, Period) arguments are converted to the expected granularities
*/
@RunWith(Parameterized.class)
public static class TimeFloorToGranularityConversionTest
{
@Parameterized.Parameters(name = "{1}")
public static Iterable<Object[]> constructorFeeder()
{
return ImmutableList.of(
new Object[]{"PT1H", Granularities.HOUR}
);
}
String periodString;
Granularity expectedGranularity;
public TimeFloorToGranularityConversionTest(String periodString, Granularity expectedGranularity)
{
this.periodString = periodString;
this.expectedGranularity = expectedGranularity;
}
@Test
public void testGranularityFromTimeFloor() throws ParseException
{
final SqlNodeList args = new SqlNodeList(SqlParserPos.ZERO);
args.add(new SqlIdentifier("__time", SqlParserPos.ZERO));
args.add(SqlLiteral.createCharString(this.periodString, SqlParserPos.ZERO));
final SqlNode timeFloorCall = TimeFloorOperatorConversion.SQL_FUNCTION.createCall(args);
Granularity actualGranularity = DruidSqlParserUtils.convertSqlNodeToGranularityThrowingParseExceptions(
timeFloorCall);
Assert.assertEquals(expectedGranularity, actualGranularity);
}
}
/**
* Sanity check that FLOOR(__time TO &lt;TimeUnit&gt;) works as intended for the supported granularities
*/
@RunWith(Parameterized.class)
public static class FloorToGranularityConversionTest
{
@Parameterized.Parameters(name = "{1}")
public static Iterable<Object[]> constructorFeeder()
{
return ImmutableList.of(
new Object[]{TimeUnit.SECOND, Granularities.SECOND},
new Object[]{TimeUnit.MINUTE, Granularities.MINUTE},
new Object[]{TimeUnit.HOUR, Granularities.HOUR},
new Object[]{TimeUnit.DAY, Granularities.DAY},
new Object[]{TimeUnit.WEEK, Granularities.WEEK},
new Object[]{TimeUnit.MONTH, Granularities.MONTH},
new Object[]{TimeUnit.QUARTER, Granularities.QUARTER},
new Object[]{TimeUnit.YEAR, Granularities.YEAR}
);
}
TimeUnit timeUnit;
Granularity expectedGranularity;
public FloorToGranularityConversionTest(TimeUnit timeUnit, Granularity expectedGranularity)
{
this.timeUnit = timeUnit;
this.expectedGranularity = expectedGranularity;
}
@Test
public void testGetGranularityFromFloor() throws ParseException
{
// parserPos doesn't matter
final SqlNodeList args = new SqlNodeList(SqlParserPos.ZERO);
args.add(new SqlIdentifier("__time", SqlParserPos.ZERO));
args.add(new SqlIntervalQualifier(this.timeUnit, null, SqlParserPos.ZERO));
final SqlNode floorCall = SqlStdOperatorTable.FLOOR.createCall(args);
Granularity actualGranularity = DruidSqlParserUtils.convertSqlNodeToGranularityThrowingParseExceptions(floorCall);
Assert.assertEquals(expectedGranularity, actualGranularity);
}
}
public static class FloorToGranularityConversionTestErrors
{
/**
* Tests a clause like "PARTITIONED BY 'day'"
*/
@Test
public void testConvertSqlNodeToGranularityWithIncorrectNode()
{
SqlNode sqlNode = SqlLiteral.createCharString("day", SqlParserPos.ZERO);
ParseException e = Assert.assertThrows(
ParseException.class,
() -> DruidSqlParserUtils.convertSqlNodeToGranularityThrowingParseExceptions(sqlNode)
);
Assert.assertEquals(
"Encountered 'day' after PARTITIONED BY. Expected HOUR, DAY, MONTH, YEAR, ALL TIME, FLOOR function or TIME_FLOOR function",
e.getMessage()
);
}
/**
* Tests a clause like "PARTITIONED BY CEIL(__time TO DAY)"
*/
@Test
public void testConvertSqlNodeToGranularityWithIncorrectFunctionCall()
{
final SqlNodeList args = new SqlNodeList(SqlParserPos.ZERO);
args.add(new SqlIdentifier("__time", SqlParserPos.ZERO));
args.add(new SqlIntervalQualifier(TimeUnit.DAY, null, SqlParserPos.ZERO));
final SqlNode sqlNode = SqlStdOperatorTable.CEIL.createCall(args);
ParseException e = Assert.assertThrows(
ParseException.class,
() -> DruidSqlParserUtils.convertSqlNodeToGranularityThrowingParseExceptions(sqlNode)
);
Assert.assertEquals(
"PARTITIONED BY clause only supports FLOOR(__time TO <unit> and TIME_FLOOR(__time, period) functions",
e.getMessage()
);
}
/**
* Tests a clause like "PARTITIONED BY FLOOR(__time)"
*/
@Test
public void testConvertSqlNodeToGranularityWithIncorrectNumberOfArguments()
{
final SqlNodeList args = new SqlNodeList(SqlParserPos.ZERO);
args.add(new SqlIdentifier("__time", SqlParserPos.ZERO));
final SqlNode sqlNode = SqlStdOperatorTable.FLOOR.createCall(args);
ParseException e = Assert.assertThrows(
ParseException.class,
() -> DruidSqlParserUtils.convertSqlNodeToGranularityThrowingParseExceptions(sqlNode)
);
Assert.assertEquals("FLOOR in PARTITIONED BY clause must have two arguments", e.getMessage());
}
/**
* Tests a clause like "PARTITIONED BY FLOOR(timestamps TO DAY)"
*/
@Test
public void testConvertSqlNodeToGranularityWithWrongIdentifierInFloorFunction()
{
final SqlNodeList args = new SqlNodeList(SqlParserPos.ZERO);
args.add(new SqlIdentifier("timestamps", SqlParserPos.ZERO));
args.add(new SqlIntervalQualifier(TimeUnit.DAY, null, SqlParserPos.ZERO));
final SqlNode sqlNode = SqlStdOperatorTable.FLOOR.createCall(args);
ParseException e = Assert.assertThrows(
ParseException.class,
() -> DruidSqlParserUtils.convertSqlNodeToGranularityThrowingParseExceptions(sqlNode)
);
Assert.assertEquals("First argument to FLOOR in PARTITIONED BY clause can only be __time", e.getMessage());
}
/**
* Tests a clause like "PARTITIONED BY TIME_FLOOR(timestamps, 'PT1H')"
*/
@Test
public void testConvertSqlNodeToGranularityWithWrongIdentifierInTimeFloorFunction()
{
final SqlNodeList args = new SqlNodeList(SqlParserPos.ZERO);
args.add(new SqlIdentifier("timestamps", SqlParserPos.ZERO));
args.add(SqlLiteral.createCharString("PT1H", SqlParserPos.ZERO));
final SqlNode sqlNode = TimeFloorOperatorConversion.SQL_FUNCTION.createCall(args);
ParseException e = Assert.assertThrows(
ParseException.class,
() -> DruidSqlParserUtils.convertSqlNodeToGranularityThrowingParseExceptions(sqlNode)
);
Assert.assertEquals("First argument to TIME_FLOOR in PARTITIONED BY clause can only be __time", e.getMessage());
}
/**
* Tests a clause like "PARTITIONED BY FLOOR(__time TO ISOYEAR)"
*/
@Test
public void testConvertSqlNodeToGranularityWithIncorrectIngestionGranularityInFloorFunction()
{
final SqlNodeList args = new SqlNodeList(SqlParserPos.ZERO);
args.add(new SqlIdentifier("__time", SqlParserPos.ZERO));
args.add(new SqlIntervalQualifier(TimeUnit.ISOYEAR, null, SqlParserPos.ZERO));
final SqlNode sqlNode = SqlStdOperatorTable.FLOOR.createCall(args);
ParseException e = Assert.assertThrows(
ParseException.class,
() -> DruidSqlParserUtils.convertSqlNodeToGranularityThrowingParseExceptions(sqlNode)
);
Assert.assertEquals("ISOYEAR is not a valid granularity for ingestion", e.getMessage());
}
/**
* Tests a clause like "PARTITIONED BY TIME_FLOOR(__time, 'abc')"
*/
@Test
public void testConvertSqlNodeToGranularityWithIncorrectIngestionGranularityInTimeFloorFunction()
{
final SqlNodeList args = new SqlNodeList(SqlParserPos.ZERO);
args.add(new SqlIdentifier("__time", SqlParserPos.ZERO));
args.add(SqlLiteral.createCharString("abc", SqlParserPos.ZERO));
final SqlNode sqlNode = TimeFloorOperatorConversion.SQL_FUNCTION.createCall(args);
ParseException e = Assert.assertThrows(
ParseException.class,
() -> DruidSqlParserUtils.convertSqlNodeToGranularityThrowingParseExceptions(sqlNode)
);
Assert.assertEquals("'abc' is an invalid period string", e.getMessage());
}
}
}
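
For context, a rough sketch of the conversion these tests exercise, reconstructed purely from the assertions above. It is not the implementation added by this PR; the real DruidSqlParserUtils.convertSqlNodeToGranularityThrowingParseExceptions may differ in structure and wording.

import org.apache.calcite.avatica.util.TimeUnit;
import org.apache.calcite.sql.SqlCall;
import org.apache.calcite.sql.SqlIdentifier;
import org.apache.calcite.sql.SqlIntervalQualifier;
import org.apache.calcite.sql.SqlLiteral;
import org.apache.calcite.sql.SqlNode;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.java.util.common.granularity.PeriodGranularity;
import org.apache.druid.sql.calcite.parser.ParseException;
import org.joda.time.Period;

import java.util.List;

public class GranularityConversionSketch
{
  // Sketch only: maps FLOOR(__time TO <unit>) and TIME_FLOOR(__time, '<period>') onto ingestion
  // granularities and rejects everything else, mirroring the error messages asserted above.
  static Granularity convertSqlNodeToGranularity(SqlNode node) throws ParseException
  {
    if (!(node instanceof SqlCall)) {
      // e.g. PARTITIONED BY 'day'
      throw new ParseException(
          "Encountered " + node + " after PARTITIONED BY. "
          + "Expected HOUR, DAY, MONTH, YEAR, ALL TIME, FLOOR function or TIME_FLOOR function"
      );
    }
    final SqlCall call = (SqlCall) node;
    final String operator = call.getOperator().getName();
    final boolean isTimeFloor = "TIME_FLOOR".equalsIgnoreCase(operator);
    if (!isTimeFloor && !"FLOOR".equalsIgnoreCase(operator)) {
      // e.g. PARTITIONED BY CEIL(__time TO DAY)
      throw new ParseException(
          "PARTITIONED BY clause only supports FLOOR(__time TO <unit> and TIME_FLOOR(__time, period) functions"
      );
    }
    final List<SqlNode> operands = call.getOperandList();
    if (operands.size() != 2) {
      // e.g. PARTITIONED BY FLOOR(__time)
      throw new ParseException(operator + " in PARTITIONED BY clause must have two arguments");
    }
    if (!(operands.get(0) instanceof SqlIdentifier) || !"__time".equals(operands.get(0).toString())) {
      // e.g. PARTITIONED BY FLOOR(timestamps TO DAY)
      throw new ParseException("First argument to " + operator + " in PARTITIONED BY clause can only be __time");
    }
    if (isTimeFloor) {
      // TIME_FLOOR(__time, '<period>'): parse the ISO period literal into a period granularity.
      final String period = ((SqlLiteral) operands.get(1)).toValue();
      try {
        return new PeriodGranularity(new Period(period), null, null);
      }
      catch (IllegalArgumentException e) {
        throw new ParseException("'" + period + "' is an invalid period string");
      }
    }
    // FLOOR(__time TO <unit>): only units with a matching ingestion granularity are accepted.
    final TimeUnit unit = ((SqlIntervalQualifier) operands.get(1)).timeUnitRange.startUnit;
    switch (unit) {
      case SECOND: return Granularities.SECOND;
      case MINUTE: return Granularities.MINUTE;
      case HOUR: return Granularities.HOUR;
      case DAY: return Granularities.DAY;
      case WEEK: return Granularities.WEEK;
      case MONTH: return Granularities.MONTH;
      case QUARTER: return Granularities.QUARTER;
      case YEAR: return Granularities.YEAR;
      default:
        // e.g. PARTITIONED BY FLOOR(__time TO ISOYEAR)
        throw new ParseException(unit + " is not a valid granularity for ingestion");
    }
  }
}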