mirror of https://github.com/apache/druid.git
Fix up incorrect `PARTITIONED BY` error messages (#15961)
* Fix up typos, inaccuracies and clean up code related to PARTITIONED BY.
* Remove wrapper function and update tests to use DruidExceptionMatcher.
* Checkstyle and IntelliJ inspection fixes.
parent ddfc31d7ed
commit 67a6224d91
@@ -256,12 +256,12 @@ For more information, see [Overwrite data with REPLACE](concepts.md#replace).
 ### `PARTITIONED BY`
 
 The `PARTITIONED BY <time granularity>` clause is required for [INSERT](#insert) and [REPLACE](#replace). See
-[Partitioning](concepts.md#partitioning) for details.
+[Partitioning](concepts.md#partitioning-by-time) for details.
 
 The following granularity arguments are accepted:
 
 - Time unit keywords: `HOUR`, `DAY`, `MONTH`, or `YEAR`. Equivalent to `FLOOR(__time TO TimeUnit)`.
-- Time units as ISO 8601 period strings: :`'PT1H'`, '`P1D`, etc. (Druid 26.0 and later.)
+- Time units as ISO 8601 period strings: `'PT1H'`, `'P1D'`, etc. (Druid 26.0 and later.)
 - `TIME_FLOOR(__time, 'granularity_string')`, where granularity_string is one of the ISO 8601 periods listed below. The
   first argument must be `__time`.
 - `FLOOR(__time TO TimeUnit)`, where `TimeUnit` is any unit supported by the [FLOOR function](../querying/sql-scalar.md#date-and-time-functions). The first argument must be `__time`.
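For reference, the four accepted forms look like this at the end of an ingest query. This is a minimal sketch; the `dst` and `foo` names are placeholders, not part of this change:

```sql
INSERT INTO dst SELECT __time, dim1 FROM foo PARTITIONED BY DAY                        -- time unit keyword
INSERT INTO dst SELECT __time, dim1 FROM foo PARTITIONED BY 'PT1H'                     -- ISO 8601 period string
INSERT INTO dst SELECT __time, dim1 FROM foo PARTITIONED BY TIME_FLOOR(__time, 'P1D')  -- TIME_FLOOR form
INSERT INTO dst SELECT __time, dim1 FROM foo PARTITIONED BY FLOOR(__time TO HOUR)      -- FLOOR form
```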
@@ -296,8 +296,6 @@ The string constant can also include any of the keywords mentioned above:
 - `ALL TIME`
 - `ALL` - Alias for `ALL TIME`
 
-The `WEEK` granularity is deprecated and not supported in MSQ.
-
 Examples:
 
 ```SQL
@@ -411,7 +409,7 @@ The query reads `products` and `customers` and then broadcasts both to
 the stage that reads `orders`. That stage loads the broadcast inputs (`products` and `customers`) in memory and walks
 through `orders` row by row. The results are aggregated and written to the table `orders_enriched`.
 
-```
+```sql
 REPLACE INTO orders_enriched
 OVERWRITE ALL
 SELECT
@@ -448,7 +446,7 @@ When using the sort-merge algorithm, keep the following in mind:
 The following example runs using a single sort-merge join stage that receives `eventstream`
 (partitioned on `user_id`) and `users` (partitioned on `id`) as inputs. There is no limit on the size of either input.
 
-```
+```sql
 REPLACE INTO eventstream_enriched
 OVERWRITE ALL
 SELECT
@@ -63,7 +63,7 @@ SqlGranularityLiteral PartitionGranularity() :
 
   e = Expression(ExprContext.ACCEPT_SUB_QUERY)
   {
-    granularity = DruidSqlParserUtils.convertSqlNodeToGranularityThrowingParseExceptions(e);
+    granularity = DruidSqlParserUtils.convertSqlNodeToGranularity(e);
     unparseString = e.toString();
   }
 )
@@ -84,7 +84,7 @@ SqlNode DruidSqlInsertEof() :
   source = OrderedQueryOrExpr(ExprContext.ACCEPT_QUERY)
   )
   // PARTITIONED BY is necessary, but is kept optional in the grammar. It is asserted that it is not missing in the
-  // DruidSqlInsert constructor so that we can return a custom error message.
+  // IngestHandler#validate() so that we can return a custom error message.
   [
     <PARTITIONED> <BY>
     partitionedBy = PartitionGranularity()
@@ -68,7 +68,7 @@ SqlNode DruidSqlReplaceEof() :
   ]
   source = OrderedQueryOrExpr(ExprContext.ACCEPT_QUERY)
   // PARTITIONED BY is necessary, but is kept optional in the grammar. It is asserted that it is not missing in the
-  // DruidSqlInsert constructor so that we can return a custom error message.
+  // IngestHandler#validate() so that we can return a custom error message.
   [
     <PARTITIONED> <BY>
     partitionedBy = PartitionGranularity()
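The grammar keeps `PARTITIONED BY` optional only so that validation, rather than the JavaCC parser, can reject its absence with a friendly message. A statement like the following (names illustrative) parses cleanly but fails in `IngestHandler#validate()` with a pointed error instead of a cryptic parse failure:

```sql
-- Parses, but validation rejects it because PARTITIONED BY is missing:
INSERT INTO dst SELECT __time, dim1 FROM foo
```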
@@ -63,10 +63,11 @@ public class DruidSqlInsert extends DruidSqlIngest
   }
 
   /**
-   * While partitionedBy can be null as arguments to the constructor, this is
-   * disallowed (semantically) and the constructor performs checks to ensure that. This helps in producing friendly
-   * errors when the PARTITIONED BY custom clause is not present, and keeps its error separate from JavaCC/Calcite's
-   * custom errors which can be cryptic when someone accidentally forgets to explicitly specify the PARTITIONED BY clause
+   * While partitionedBy can be null as arguments to the constructor, this is disallowed (semantically) and
+   * {@link org.apache.druid.sql.calcite.planner.IngestHandler#validate()} performs checks to ensure that. This helps
+   * in producing friendly errors when the PARTITIONED BY custom clause is not present, and keeps its error separate
+   * from JavaCC/Calcite's custom errors which can be cryptic when someone accidentally forgets to explicitly specify
+   * the PARTITIONED BY clause
    */
   public DruidSqlInsert(
       SqlParserPos pos,
@@ -20,7 +20,6 @@
 package org.apache.druid.sql.calcite.parser;
 
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableSet;
 import org.apache.calcite.sql.SqlAsOperator;
@@ -37,13 +36,11 @@ import org.apache.calcite.sql.SqlOrderBy;
 import org.apache.calcite.sql.SqlTimestampLiteral;
 import org.apache.calcite.sql.SqlUnknownLiteral;
 import org.apache.calcite.sql.dialect.CalciteSqlDialect;
-import org.apache.calcite.sql.parser.SqlParserUtil;
+import org.apache.calcite.sql.fun.SqlStdOperatorTable;
 import org.apache.calcite.sql.type.SqlTypeName;
-import org.apache.calcite.tools.ValidationException;
 import org.apache.calcite.util.Pair;
 import org.apache.druid.error.DruidException;
 import org.apache.druid.error.InvalidSqlInput;
-import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.granularity.Granularity;
 import org.apache.druid.java.util.common.granularity.GranularityType;
 import org.apache.druid.java.util.common.granularity.PeriodGranularity;
@@ -78,40 +75,16 @@ public class DruidSqlParserUtils
   private static final Logger log = new Logger(DruidSqlParserUtils.class);
   public static final String ALL = "all";
 
-  private static final List<GranularityType> DOCUMENTED_GRANULARTIES = Arrays.stream(GranularityType.values())
-      .filter(g -> g != GranularityType.WEEK)
+  private static final List<GranularityType> DOCUMENTED_GRANULARITIES = Arrays.stream(GranularityType.values())
+      .filter(g -> g != GranularityType.WEEK &&
+                   g != GranularityType.NONE)
       .collect(Collectors.toList());
-  @VisibleForTesting
-  public static final String PARTITION_ERROR_MESSAGE =
-      "Invalid granularity[%s] specified after PARTITIONED BY clause. "
-      + "Expected "
-      + StringUtils.replace(StringUtils.replace(DOCUMENTED_GRANULARTIES.toString(), "[", ""), "]", ",").trim()
-      + " ALL TIME, FLOOR() or TIME_FLOOR()";
-
-  /**
-   * Delegates to {@code convertSqlNodeToGranularity} and converts the exceptions to {@link ParseException}
-   * with the underlying message
-   */
-  public static Granularity convertSqlNodeToGranularityThrowingParseExceptions(SqlNode sqlNode) throws ParseException
-  {
-    try {
-      return convertSqlNodeToGranularity(sqlNode);
-    }
-    catch (DruidException e) {
-      throw e;
-    }
-    catch (Exception e) {
-      log.debug(e, StringUtils.format("Unable to convert %s to a valid granularity.", sqlNode.toString()));
-      throw new ParseException(e.getMessage());
-    }
-  }
 
   /**
    * This method is used to extract the granularity from a SqlNode which represents
    * the argument to the {@code PARTITIONED BY} clause. The node can be any of the following:
    * <ul>
-   * <li>A literal with a string that matches the SQL keywords
-   * {@code HOUR, DAY, MONTH, YEAR, ALL [TIME]}</li>
+   * <li>A literal with a string that matches the SQL keywords from {@link #DOCUMENTED_GRANULARITIES} </li>
    * <li>A literal string with a period in ISO 8601 format.</li>
    * <li>Function call: {@code FLOOR(__time TO TimeUnit)}</li>
    * <li>Function call: TIME_FLOOR(__time, 'PT1H')}</li>
@@ -143,22 +116,23 @@ public class DruidSqlParserUtils
       return null;
     }
 
-    if (sqlNode instanceof SqlLiteral) {
     final Granularity retVal;
 
+    // Check if argument is a literal such as DAY or "DAY".
+    if (sqlNode instanceof SqlLiteral) {
       SqlLiteral literal = (SqlLiteral) sqlNode;
       if (SqlLiteral.valueMatchesType(literal.getValue(), SqlTypeName.CHAR)) {
         retVal = convertSqlLiteralCharToGranularity(literal);
       } else {
         throw makeInvalidPartitionByException(literal);
       }
 
       validateSupportedGranularityForPartitionedBy(sqlNode, retVal);
       return retVal;
     }
 
+    // Check if argument is an ISO 8601 period such as P1D or "P1D".
     if (sqlNode instanceof SqlIdentifier) {
       SqlIdentifier identifier = (SqlIdentifier) sqlNode;
-      final Granularity retVal;
       retVal = convertSqlIdentiferToGranularity(identifier);
       validateSupportedGranularityForPartitionedBy(sqlNode, retVal);
       return retVal;
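Both spellings reach this method, as the parser tests later in this commit exercise (names illustrative; the literal and identifier cases mirror the test javadoc "PARTITIONED BY 'day'", "PARTITIONED BY PT1D", and "PARTITIONED BY 'PT1D'"):

```sql
INSERT INTO dst SELECT * FROM foo PARTITIONED BY 'day'  -- keyword as a character literal
INSERT INTO dst SELECT * FROM foo PARTITIONED BY P1D    -- ISO 8601 period as an identifier
INSERT INTO dst SELECT * FROM foo PARTITIONED BY 'P1D'  -- ISO 8601 period as a character literal
```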
@@ -171,41 +145,56 @@ public class DruidSqlParserUtils
 
     String operatorName = sqlCall.getOperator().getName();
 
-    Preconditions.checkArgument(
-        "FLOOR".equalsIgnoreCase(operatorName)
-        || TimeFloorOperatorConversion.SQL_FUNCTION_NAME.equalsIgnoreCase(operatorName),
-        StringUtils.format(
-            "PARTITIONED BY clause only supports FLOOR(__time TO <unit> and %s(__time, period) functions",
-            TimeFloorOperatorConversion.SQL_FUNCTION_NAME
-        )
-    );
+    if (!(SqlStdOperatorTable.FLOOR.getName().equalsIgnoreCase(operatorName) ||
+          TimeFloorOperatorConversion.SQL_FUNCTION_NAME.equalsIgnoreCase(operatorName))) {
+      throw InvalidSqlInput.exception(
+          "Invalid operator[%s] specified. "
+          + "PARTITIONED BY clause only supports %s(__time TO <unit>) and %s(__time, period) functions.",
+          operatorName,
+          SqlStdOperatorTable.FLOOR.getName(),
+          TimeFloorOperatorConversion.SQL_FUNCTION_NAME
+      );
+    }
 
     List<SqlNode> operandList = sqlCall.getOperandList();
-    Preconditions.checkArgument(
-        operandList.size() == 2,
-        StringUtils.format("%s in PARTITIONED BY clause must have two arguments", operatorName)
-    );
+    if (operandList.size() != 2) {
+      throw InvalidSqlInput.exception(
+          "%s in PARTITIONED BY clause must have 2 arguments, but only [%d] provided.",
+          operatorName,
+          operandList.size()
+      );
+    }
 
     // Check if the first argument passed in the floor function is __time
     SqlNode timeOperandSqlNode = operandList.get(0);
-    Preconditions.checkArgument(
-        timeOperandSqlNode.getKind().equals(SqlKind.IDENTIFIER),
-        StringUtils.format("First argument to %s in PARTITIONED BY clause can only be __time", operatorName)
-    );
+    if (!SqlKind.IDENTIFIER.equals(timeOperandSqlNode.getKind())) {
+      throw InvalidSqlInput.exception(
+          "Invalid argument type[%s] provided. The first argument to %s in PARTITIONED BY clause must be __time.",
+          timeOperandSqlNode.getKind(),
+          operatorName
+      );
+    }
 
     SqlIdentifier timeOperandSqlIdentifier = (SqlIdentifier) timeOperandSqlNode;
-    Preconditions.checkArgument(
-        timeOperandSqlIdentifier.getSimple().equals(ColumnHolder.TIME_COLUMN_NAME),
-        StringUtils.format("First argument to %s in PARTITIONED BY clause can only be __time", operatorName)
-    );
+    if (!ColumnHolder.TIME_COLUMN_NAME.equals(timeOperandSqlIdentifier.getSimple())) {
+      throw InvalidSqlInput.exception(
+          "Invalid argument[%s] provided. The first argument to %s in PARTITIONED BY clause must be __time.",
+          timeOperandSqlIdentifier.getSimple(),
+          operatorName
+      );
+    }
 
     // If the floor function is of form TIME_FLOOR(__time, 'PT1H')
-    if (operatorName.equalsIgnoreCase(TimeFloorOperatorConversion.SQL_FUNCTION_NAME)) {
+    if (TimeFloorOperatorConversion.SQL_FUNCTION_NAME.equalsIgnoreCase(operatorName)) {
       SqlNode granularitySqlNode = operandList.get(1);
-      Preconditions.checkArgument(
-          granularitySqlNode.getKind().equals(SqlKind.LITERAL),
-          "Second argument to TIME_FLOOR in PARTITIONED BY clause must be a period like 'PT1H'"
-      );
+      if (!SqlKind.LITERAL.equals(granularitySqlNode.getKind())) {
+        throw InvalidSqlInput.exception(
+            "Invalid argument[%s] provided. The second argument to %s in PARTITIONED BY clause must be a period like 'PT1H'.",
+            granularitySqlNode.getKind(),
+            TimeFloorOperatorConversion.SQL_FUNCTION_NAME
+        );
+      }
 
       String granularityString = SqlLiteral.unchain(granularitySqlNode).toValue();
       Period period;
       try {
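The reworked operator check now names the offending function in the error. Per the updated unit test later in this commit, a disallowed operator is reported like this (names illustrative):

```sql
-- Fails with: Invalid operator[CEIL] specified. PARTITIONED BY clause only supports
-- FLOOR(__time TO <unit>) and TIME_FLOOR(__time, period) functions.
INSERT INTO dst SELECT * FROM foo PARTITIONED BY CEIL(__time TO DAY)
```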
@@ -213,33 +202,35 @@ public class DruidSqlParserUtils
       }
       catch (IllegalArgumentException e) {
         throw InvalidSqlInput.exception(
-            StringUtils.format("granularity[%s] is an invalid period string", granularitySqlNode.toString()),
-            sqlNode);
+            "granularity[%s] is an invalid period literal.",
+            granularitySqlNode.toString()
+        );
       }
-      final PeriodGranularity retVal = new PeriodGranularity(period, null, null);
+      retVal = new PeriodGranularity(period, null, null);
       validateSupportedGranularityForPartitionedBy(sqlNode, retVal);
       return retVal;
 
-    } else if ("FLOOR".equalsIgnoreCase(operatorName)) { // If the floor function is of form FLOOR(__time TO DAY)
+    } else if (SqlStdOperatorTable.FLOOR.getName().equalsIgnoreCase(operatorName)) { // If the floor function is of form FLOOR(__time TO DAY)
       SqlNode granularitySqlNode = operandList.get(1);
       // In future versions of Calcite, this can be checked via
       // granularitySqlNode.getKind().equals(SqlKind.INTERVAL_QUALIFIER)
-      Preconditions.checkArgument(
-          granularitySqlNode instanceof SqlIntervalQualifier,
-          "Second argument to the FLOOR function in PARTITIONED BY clause is not a valid granularity. "
-          + "Please refer to the documentation of FLOOR function"
-      );
+      if (!(granularitySqlNode instanceof SqlIntervalQualifier)) {
+        throw InvalidSqlInput.exception(
+            "Second argument[%s] to the FLOOR function in PARTITIONED BY clause is not a valid granularity. "
+            + "Please refer to the documentation of FLOOR function.",
+            granularitySqlNode.toString()
+        );
+      }
       SqlIntervalQualifier granularityIntervalQualifier = (SqlIntervalQualifier) granularitySqlNode;
 
       Period period = TimeUnits.toPeriod(granularityIntervalQualifier.timeUnitRange);
-      Preconditions.checkNotNull(
-          period,
-          StringUtils.format(
-              "%s is not a valid granularity for ingestion",
-              granularityIntervalQualifier.timeUnitRange.toString()
-          )
-      );
-      final PeriodGranularity retVal = new PeriodGranularity(period, null, null);
+      if (period == null) {
+        throw InvalidSqlInput.exception(
+            "%s is not a valid period granularity for ingestion.",
+            granularityIntervalQualifier.timeUnitRange.toString()
+        );
+      }
+      retVal = new PeriodGranularity(period, null, null);
       validateSupportedGranularityForPartitionedBy(sqlNode, retVal);
       return retVal;
     }
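The period-handling branches likewise carry their reworded messages through to users, matching the assertions in the updated parser tests below (names illustrative):

```sql
-- Fails with: granularity['abc'] is an invalid period literal.
INSERT INTO dst SELECT * FROM foo PARTITIONED BY TIME_FLOOR(__time, 'abc')

-- Fails with: ISOYEAR is not a valid period granularity for ingestion.
INSERT INTO dst SELECT * FROM foo PARTITIONED BY FLOOR(__time TO ISOYEAR)
```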
@@ -286,7 +277,11 @@ public class DruidSqlParserUtils
   private static DruidException makeInvalidPartitionByException(SqlNode sqlNode)
   {
     return InvalidSqlInput.exception(
-        PARTITION_ERROR_MESSAGE,
+        "Invalid granularity[%s] specified after PARTITIONED BY clause. Expected "
+        + DOCUMENTED_GRANULARITIES.stream()
+                                  .map(granularityType -> "'" + granularityType.name() + "'")
+                                  .collect(Collectors.joining(", "))
+        + ", ALL TIME, FLOOR() or TIME_FLOOR()",
         sqlNode
     );
   }
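Built this way, the message enumerates exactly the documented granularities (no `WEEK`, no `NONE`). The full text, as asserted verbatim in the DML tests below (names illustrative):

```sql
-- Fails with: Invalid granularity['invalid_granularity'] specified after PARTITIONED BY clause.
-- Expected 'SECOND', 'MINUTE', 'FIVE_MINUTE', 'TEN_MINUTE', 'FIFTEEN_MINUTE', 'THIRTY_MINUTE', 'HOUR',
-- 'SIX_HOUR', 'EIGHT_HOUR', 'DAY', 'MONTH', 'QUARTER', 'YEAR', 'ALL', ALL TIME, FLOOR() or TIME_FLOOR()
INSERT INTO dst SELECT * FROM foo PARTITIONED BY 'invalid_granularity'
```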
@@ -307,7 +302,7 @@ public class DruidSqlParserUtils
   * @param granularity granularity of the query for validation
   * @param dateTimeZone timezone
   * @return List of string representation of intervals
-  * @throws ValidationException if the SqlNode cannot be converted to a list of intervals
+  * @throws DruidException if the SqlNode cannot be converted to a list of intervals
   */
  public static List<String> validateQueryAndConvertToIntervals(
      SqlNode replaceTimeQuery,
@@ -365,7 +360,7 @@ public class DruidSqlParserUtils
   * @param query sql query
   * @param clusteredByList List of clustered by columns
   * @return SqlOrderBy node containing the clusteredByList information
-  * @throws ValidationException if any of the clustered by columns contain DESCENDING order.
+  * @throws DruidException if any of the clustered by columns contain DESCENDING order.
   */
  public static SqlOrderBy convertClusterByToOrderBy(SqlNode query, SqlNodeList clusteredByList)
  {
@@ -472,6 +467,7 @@ public class DruidSqlParserUtils
   *
   * @param clusteredByNodes List of SqlNodes representing columns to be clustered by.
   */
+  @VisibleForTesting
  public static void validateClusteredByColumns(final SqlNodeList clusteredByNodes)
  {
    if (clusteredByNodes == null) {
@@ -508,9 +504,9 @@ public class DruidSqlParserUtils
   * @param replaceTimeQuery Sql node representing the query
   * @param dateTimeZone timezone
   * @return Dimfilter for the query
-  * @throws ValidationException if the SqlNode cannot be converted a Dimfilter
+  * @throws DruidException if the SqlNode cannot be converted a Dimfilter
   */
- public static DimFilter convertQueryToDimFilter(SqlNode replaceTimeQuery, DateTimeZone dateTimeZone)
+ private static DimFilter convertQueryToDimFilter(SqlNode replaceTimeQuery, DateTimeZone dateTimeZone)
  {
    if (!(replaceTimeQuery instanceof SqlBasicCall)) {
      throw InvalidSqlInput.exception(
@@ -667,44 +663,10 @@ public class DruidSqlParserUtils
   )
   {
     if (!GranularityType.isStandard(granularity)) {
-      throw InvalidSqlInput.exception(
-          "The granularity specified in PARTITIONED BY [%s] is not supported. Valid options: [%s]",
-          originalNode == null ? granularity : originalNode,
-          Arrays.stream(GranularityType.values())
-                .filter(granularityType -> !granularityType.equals(GranularityType.NONE))
-                .map(Enum::name)
-                .map(StringUtils::toLowerCase)
-                .collect(Collectors.joining(", "))
-      );
+      throw makeInvalidPartitionByException(originalNode);
     }
   }
 
-  /**
-   * Get the timestamp string from a TIMESTAMP (or TIMESTAMP WITH LOCAL TIME ZONE) literal.
-   *
-   * @return string, or null if the provided node is not a timestamp literal
-   */
-  @Nullable
-  private static String getTimestampStringFromLiteral(final SqlNode sqlNode)
-  {
-    if (sqlNode instanceof SqlTimestampLiteral) {
-      return ((SqlTimestampLiteral) sqlNode).toFormattedString();
-    }
-
-    if (sqlNode instanceof SqlUnknownLiteral) {
-      // SqlUnknownLiteral represents a type that is unknown until validation time. The tag is resolved to a proper
-      // type name later on; for example TIMESTAMP may become TIMESTAMP WITH LOCAL TIME ZONE.
-      final SqlUnknownLiteral sqlUnknownLiteral = (SqlUnknownLiteral) sqlNode;
-
-      if (SqlTypeName.TIMESTAMP.getSpaceName().equals(sqlUnknownLiteral.tag)
-          || SqlTypeName.TIMESTAMP_WITH_LOCAL_TIME_ZONE.getSpaceName().equals(sqlUnknownLiteral.tag)) {
-        return SqlParserUtil.parseTimestampLiteral(sqlUnknownLiteral.getValue(), sqlNode.getParserPosition())
-                            .toFormattedString();
-      }
-    }
-
-    return null;
-  }
-
   public static DruidException problemParsing(String message)
   {
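Unsupported standard-granularity variants now reuse `makeInvalidPartitionByException` instead of a separate wording, so a non-standard period such as two hours yields the same message shape, as asserted in `CalciteInsertDmlTest` below (names illustrative):

```sql
-- Fails with: Invalid granularity[`time_floor`(`__time`, 'PT2H')] specified after PARTITIONED BY clause. Expected ...
INSERT INTO dst SELECT * FROM foo PARTITIONED BY TIME_FLOOR(__time, 'PT2H')
```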
@@ -69,10 +69,11 @@ public class DruidSqlReplace extends DruidSqlIngest
   }
 
   /**
-   * While partitionedBy can be null as arguments to the constructor, this is
-   * disallowed (semantically) and the constructor performs checks to ensure that. This helps in producing friendly
-   * errors when the PARTITIONED BY custom clause is not present, and keeps its error separate from JavaCC/Calcite's
-   * custom errors which can be cryptic when someone accidentally forgets to explicitly specify the PARTITIONED BY clause
+   * While partitionedBy can be null as arguments to the constructor, this is disallowed (semantically) and
+   * {@link org.apache.druid.sql.calcite.planner.IngestHandler#validate()} performs checks to ensure that. This helps
+   * in producing friendly errors when the PARTITIONED BY custom clause is not present, and keeps its error separate
+   * from JavaCC/Calcite's custom errors which can be cryptic when someone accidentally forgets to explicitly specify
+   * the PARTITIONED BY clause
    */
   public DruidSqlReplace(
       SqlParserPos pos,
@@ -19,24 +19,22 @@
 
 package org.apache.druid.sql.calcite.parser;
 
-import org.apache.calcite.sql.SqlIdentifier;
 import org.apache.calcite.sql.SqlLiteral;
 import org.apache.calcite.sql.SqlWriter;
 import org.apache.calcite.sql.parser.SqlParserPos;
 import org.apache.calcite.sql.type.SqlTypeName;
 import org.apache.calcite.util.NlsString;
-import org.apache.druid.error.DruidException;
 import org.apache.druid.java.util.common.granularity.Granularity;
 
 import javax.annotation.Nonnull;
 
 /**
- * Extends the {@link SqlIdentifier} to hold parameters for the PARTITIONED BY clause.
+ * Extends the {@link SqlLiteral} to hold parameters for the PARTITIONED BY clause.
  */
 public class SqlGranularityLiteral extends SqlLiteral
 {
-  private String unparseString;
-  private Granularity granularity;
+  private final String unparseString;
+  private final Granularity granularity;
 
   public SqlGranularityLiteral(
       @Nonnull Granularity granularity,
@@ -54,13 +52,6 @@ public class SqlGranularityLiteral extends SqlLiteral
     return new SqlGranularityLiteral(granularity, unparseString, pos);
   }
 
-  @Override
-  @Deprecated
-  public Object clone()
-  {
-    throw DruidException.defensive("Function is deprecated, please use clone(SqlNode) instead.");
-  }
-
   @Nonnull
   public Granularity getGranularity()
   {
@@ -70,8 +61,6 @@ public class SqlGranularityLiteral extends SqlLiteral
   @Override
   public void unparse(SqlWriter writer, int leftPrec, int rightPrec)
   {
-    if (unparseString != null) {
-      writer.keyword(unparseString);
-    }
+    writer.keyword(unparseString);
   }
 }
@@ -31,6 +31,7 @@ import org.apache.druid.error.DruidExceptionMatcher;
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.granularity.Granularities;
 import org.apache.druid.java.util.common.granularity.Granularity;
+import org.apache.druid.java.util.common.granularity.GranularityType;
 import org.apache.druid.java.util.common.jackson.JacksonUtils;
 import org.apache.druid.query.QueryDataSource;
 import org.apache.druid.query.aggregation.CountAggregatorFactory;
@@ -47,7 +48,6 @@ import org.apache.druid.sql.calcite.external.ExternalDataSource;
 import org.apache.druid.sql.calcite.external.Externals;
 import org.apache.druid.sql.calcite.filtration.Filtration;
 import org.apache.druid.sql.calcite.parser.DruidSqlInsert;
-import org.apache.druid.sql.calcite.parser.DruidSqlParserUtils;
 import org.apache.druid.sql.calcite.planner.Calcites;
 import org.apache.druid.sql.calcite.planner.PlannerContext;
 import org.apache.druid.sql.calcite.util.CalciteTests;
@@ -59,9 +59,11 @@ import org.junit.internal.matchers.ThrowableMessageMatcher;
 
 import java.io.File;
 import java.io.IOException;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.stream.Collectors;
 
 import static org.apache.druid.segment.column.ColumnType.DOUBLE;
 import static org.apache.druid.segment.column.ColumnType.FLOAT;
@@ -657,6 +659,52 @@ public class CalciteInsertDmlTest extends CalciteIngestionDmlTest
     didTest = true;
   }
 
+  @Test
+  public void testPartitionedBySupportedGranularityLiteralClauses()
+  {
+    final RowSignature targetRowSignature = RowSignature.builder()
+        .add("__time", ColumnType.LONG)
+        .add("dim1", ColumnType.STRING)
+        .build();
+
+    final Map<String, Granularity> partitionedByToGranularity =
+        Arrays.stream(GranularityType.values())
+              .collect(Collectors.toMap(GranularityType::name, GranularityType::getDefaultGranularity));
+
+    final ObjectMapper queryJsonMapper = queryFramework().queryJsonMapper();
+    partitionedByToGranularity.forEach((partitionedByArgument, expectedGranularity) -> {
+      Map<String, Object> queryContext = null;
+      try {
+        queryContext = ImmutableMap.of(
+            DruidSqlInsert.SQL_INSERT_SEGMENT_GRANULARITY, queryJsonMapper.writeValueAsString(expectedGranularity)
+        );
+      }
+      catch (JsonProcessingException e) {
+        // Won't reach here
+        Assert.fail(e.getMessage());
+      }
+
+      testIngestionQuery()
+          .sql(StringUtils.format(
+              "INSERT INTO druid.dst SELECT __time, dim1 FROM foo PARTITIONED BY '%s'",
+              partitionedByArgument
+          ))
+          .expectTarget("dst", targetRowSignature)
+          .expectResources(dataSourceRead("foo"), dataSourceWrite("dst"))
+          .expectQuery(
+              newScanQueryBuilder()
                  .dataSource("foo")
                  .intervals(querySegmentSpec(Filtration.eternity()))
                  .columns("__time", "dim1")
                  .context(queryContext)
                  .build()
+          )
+          .verify();
+      didTest = false;
+    });
+    didTest = true;
+  }
+
   @Test
   public void testExplainPlanInsertWithClusteredBy() throws JsonProcessingException
   {
@@ -1165,7 +1213,6 @@ public class CalciteInsertDmlTest extends CalciteIngestionDmlTest
   @Test
   public void testInsertWithPartitionedByContainingInvalidGranularity()
   {
-    // Throws a ValidationException, which gets converted to a DruidException before throwing to end user
     try {
       testQuery(
           "INSERT INTO dst SELECT * FROM foo PARTITIONED BY 'invalid_granularity'",
@@ -1178,7 +1225,10 @@ public class CalciteInsertDmlTest extends CalciteIngestionDmlTest
       MatcherAssert.assertThat(
           e,
           invalidSqlIs(
-              StringUtils.format(DruidSqlParserUtils.PARTITION_ERROR_MESSAGE, "'invalid_granularity'")
+              "Invalid granularity['invalid_granularity'] specified after PARTITIONED BY clause."
+              + " Expected 'SECOND', 'MINUTE', 'FIVE_MINUTE', 'TEN_MINUTE', 'FIFTEEN_MINUTE', 'THIRTY_MINUTE', 'HOUR',"
+              + " 'SIX_HOUR', 'EIGHT_HOUR', 'DAY', 'MONTH', 'QUARTER', 'YEAR', 'ALL', ALL TIME, FLOOR()"
+              + " or TIME_FLOOR()"
          ));
     }
     didTest = true;
@@ -1624,9 +1674,10 @@ public class CalciteInsertDmlTest extends CalciteIngestionDmlTest
         CoreMatchers.allOf(
             CoreMatchers.instanceOf(DruidException.class),
             ThrowableMessageMatcher.hasMessage(CoreMatchers.containsString(
-                "The granularity specified in PARTITIONED BY [`time_floor`(`__time`, 'PT2H')] is not supported. "
-                + "Valid options: [second, minute, five_minute, ten_minute, fifteen_minute, thirty_minute, hour, "
-                + "six_hour, eight_hour, day, week, month, quarter, year, all]"))
+                "Invalid granularity[`time_floor`(`__time`, 'PT2H')] specified after PARTITIONED BY clause."
+                + " Expected 'SECOND', 'MINUTE', 'FIVE_MINUTE', 'TEN_MINUTE', 'FIFTEEN_MINUTE', 'THIRTY_MINUTE',"
+                + " 'HOUR', 'SIX_HOUR', 'EIGHT_HOUR', 'DAY', 'MONTH', 'QUARTER', 'YEAR', 'ALL',"
+                + " ALL TIME, FLOOR() or TIME_FLOOR()"))
         )
     )
     .verify();
@@ -19,6 +19,7 @@
 
 package org.apache.druid.sql.calcite;
 
+import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
@@ -26,6 +27,8 @@ import org.apache.druid.error.DruidException;
 import org.apache.druid.error.DruidExceptionMatcher;
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.granularity.Granularities;
+import org.apache.druid.java.util.common.granularity.Granularity;
+import org.apache.druid.java.util.common.granularity.GranularityType;
 import org.apache.druid.java.util.common.jackson.JacksonUtils;
 import org.apache.druid.query.aggregation.CountAggregatorFactory;
 import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
@@ -47,9 +50,11 @@ import org.junit.Assert;
 import org.junit.Test;
 
 import java.io.IOException;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.stream.Collectors;
 
 import static org.apache.druid.segment.column.ColumnType.DOUBLE;
 import static org.apache.druid.segment.column.ColumnType.FLOAT;
@@ -594,10 +599,55 @@ public class CalciteReplaceDmlTest extends CalciteIngestionDmlTest
         .verify();
   }
 
+  @Test
+  public void testPartitionedBySupportedGranularityLiteralClauses()
+  {
+    final RowSignature targetRowSignature = RowSignature.builder()
+        .add("__time", ColumnType.LONG)
+        .add("dim1", ColumnType.STRING)
+        .build();
+
+    final Map<String, Granularity> partitionedByToGranularity =
+        Arrays.stream(GranularityType.values())
+              .collect(Collectors.toMap(GranularityType::name, GranularityType::getDefaultGranularity));
+
+    final ObjectMapper queryJsonMapper = queryFramework().queryJsonMapper();
+    partitionedByToGranularity.forEach((partitionedByArgument, expectedGranularity) -> {
+      Map<String, Object> queryContext = null;
+      try {
+        queryContext = ImmutableMap.of(
+            DruidSqlInsert.SQL_INSERT_SEGMENT_GRANULARITY, queryJsonMapper.writeValueAsString(expectedGranularity)
+        );
+      }
+      catch (JsonProcessingException e) {
+        // Won't reach here
+        Assert.fail(e.getMessage());
+      }
+
+      testIngestionQuery()
+          .sql(StringUtils.format(
+              "REPLACE INTO druid.dst OVERWRITE ALL SELECT __time, dim1 FROM foo PARTITIONED BY '%s'",
+              partitionedByArgument
+          ))
+          .expectTarget("dst", targetRowSignature)
+          .expectResources(dataSourceRead("foo"), dataSourceWrite("dst"))
+          .expectQuery(
+              newScanQueryBuilder()
                  .dataSource("foo")
                  .intervals(querySegmentSpec(Filtration.eternity()))
                  .columns("__time", "dim1")
                  .context(queryContext)
                  .build()
+          )
+          .verify();
+      didTest = false;
+    });
+    didTest = true;
+  }
+
   @Test
   public void testReplaceWithPartitionedByContainingInvalidGranularity()
   {
-    // Throws a ValidationException, which gets converted to a DruidException before throwing to end user
     try {
       testQuery(
           "REPLACE INTO dst OVERWRITE ALL SELECT * FROM foo PARTITIONED BY 'invalid_granularity'",
@@ -610,7 +660,10 @@ public class CalciteReplaceDmlTest extends CalciteIngestionDmlTest
       MatcherAssert.assertThat(
          e,
          invalidSqlIs(
-              StringUtils.format(DruidSqlParserUtils.PARTITION_ERROR_MESSAGE, "'invalid_granularity'")
+              "Invalid granularity['invalid_granularity'] specified after PARTITIONED BY clause."
+              + " Expected 'SECOND', 'MINUTE', 'FIVE_MINUTE', 'TEN_MINUTE', 'FIFTEEN_MINUTE', 'THIRTY_MINUTE', 'HOUR',"
+              + " 'SIX_HOUR', 'EIGHT_HOUR', 'DAY', 'MONTH', 'QUARTER', 'YEAR', 'ALL', ALL TIME, FLOOR()"
+              + " or TIME_FLOOR()"
          ));
     }
    didTest = true;
@@ -83,13 +83,13 @@ public class DruidSqlParserUtilsTest
   }
 
   @Test
-  public void testGranularityFromTimeFloor() throws ParseException
+  public void testGranularityFromTimeFloor()
   {
     final SqlNodeList args = new SqlNodeList(SqlParserPos.ZERO);
     args.add(new SqlIdentifier("__time", SqlParserPos.ZERO));
     args.add(SqlLiteral.createCharString(this.periodString, SqlParserPos.ZERO));
     final SqlNode timeFloorCall = TimeFloorOperatorConversion.SQL_FUNCTION.createCall(args);
-    Granularity actualGranularity = DruidSqlParserUtils.convertSqlNodeToGranularityThrowingParseExceptions(
+    Granularity actualGranularity = DruidSqlParserUtils.convertSqlNodeToGranularity(
         timeFloorCall);
     Assert.assertEquals(expectedGranularity, actualGranularity);
   }
@@ -128,14 +128,14 @@ public class DruidSqlParserUtilsTest
   }
 
   @Test
-  public void testGetGranularityFromFloor() throws ParseException
+  public void testGetGranularityFromFloor()
   {
     // parserPos doesn't matter
     final SqlNodeList args = new SqlNodeList(SqlParserPos.ZERO);
     args.add(new SqlIdentifier("__time", SqlParserPos.ZERO));
     args.add(new SqlIntervalQualifier(this.timeUnit, null, SqlParserPos.ZERO));
     final SqlNode floorCall = SqlStdOperatorTable.FLOOR.createCall(args);
-    Granularity actualGranularity = DruidSqlParserUtils.convertSqlNodeToGranularityThrowingParseExceptions(floorCall);
+    Granularity actualGranularity = DruidSqlParserUtils.convertSqlNodeToGranularity(floorCall);
     Assert.assertEquals(expectedGranularity, actualGranularity);
   }
 
@@ -143,10 +143,10 @@ public class DruidSqlParserUtilsTest
    * Tests clause like "PARTITIONED BY 'day'"
    */
   @Test
-  public void testConvertSqlNodeToGranularityAsLiteral() throws ParseException
+  public void testConvertSqlNodeToGranularityAsLiteral()
   {
     SqlNode sqlNode = SqlLiteral.createCharString(timeUnit.name(), SqlParserPos.ZERO);
-    Granularity actualGranularity = DruidSqlParserUtils.convertSqlNodeToGranularityThrowingParseExceptions(sqlNode);
+    Granularity actualGranularity = DruidSqlParserUtils.convertSqlNodeToGranularity(sqlNode);
     Assert.assertEquals(expectedGranularity, actualGranularity);
   }
 
@@ -154,10 +154,10 @@ public class DruidSqlParserUtilsTest
    * Tests clause like "PARTITIONED BY PT1D"
    */
   @Test
-  public void testConvertSqlNodeToPeriodFormGranularityAsIdentifier() throws ParseException
+  public void testConvertSqlNodeToPeriodFormGranularityAsIdentifier()
   {
     SqlNode sqlNode = new SqlIdentifier(period.toString(), SqlParserPos.ZERO);
-    Granularity actualGranularity = DruidSqlParserUtils.convertSqlNodeToGranularityThrowingParseExceptions(sqlNode);
+    Granularity actualGranularity = DruidSqlParserUtils.convertSqlNodeToGranularity(sqlNode);
     Assert.assertEquals(expectedGranularity, actualGranularity);
   }
 
@@ -165,10 +165,10 @@ public class DruidSqlParserUtilsTest
    * Tests clause like "PARTITIONED BY 'PT1D'"
    */
   @Test
-  public void testConvertSqlNodeToPeriodFormGranularityAsLiteral() throws ParseException
+  public void testConvertSqlNodeToPeriodFormGranularityAsLiteral()
   {
     SqlNode sqlNode = SqlLiteral.createCharString(period.toString(), SqlParserPos.ZERO);
-    Granularity actualGranularity = DruidSqlParserUtils.convertSqlNodeToGranularityThrowingParseExceptions(sqlNode);
+    Granularity actualGranularity = DruidSqlParserUtils.convertSqlNodeToGranularity(sqlNode);
     Assert.assertEquals(expectedGranularity, actualGranularity);
   }
 }
@@ -353,14 +353,13 @@ public class DruidSqlParserUtilsTest
     args.add(new SqlIdentifier("__time", SqlParserPos.ZERO));
     args.add(new SqlIntervalQualifier(TimeUnit.DAY, null, SqlParserPos.ZERO));
     final SqlNode sqlNode = SqlStdOperatorTable.CEIL.createCall(args);
-    ParseException e = Assert.assertThrows(
-        ParseException.class,
-        () -> DruidSqlParserUtils.convertSqlNodeToGranularityThrowingParseExceptions(sqlNode)
-    );
-    Assert.assertEquals(
-        "PARTITIONED BY clause only supports FLOOR(__time TO <unit> and TIME_FLOOR(__time, period) functions",
-        e.getMessage()
-    );
+    DruidExceptionMatcher
+        .invalidSqlInput()
+        .expectMessageIs(
+            "Invalid operator[CEIL] specified. PARTITIONED BY clause only supports FLOOR(__time TO <unit>)"
+            + " and TIME_FLOOR(__time, period) functions."
+        )
+        .assertThrowsAndMatches(() -> DruidSqlParserUtils.convertSqlNodeToGranularity(sqlNode));
   }
 
   /**
@@ -372,11 +371,12 @@ public class DruidSqlParserUtilsTest
     final SqlNodeList args = new SqlNodeList(SqlParserPos.ZERO);
     args.add(new SqlIdentifier("__time", SqlParserPos.ZERO));
     final SqlNode sqlNode = SqlStdOperatorTable.FLOOR.createCall(args);
-    ParseException e = Assert.assertThrows(
-        ParseException.class,
-        () -> DruidSqlParserUtils.convertSqlNodeToGranularityThrowingParseExceptions(sqlNode)
-    );
-    Assert.assertEquals("FLOOR in PARTITIONED BY clause must have two arguments", e.getMessage());
+    DruidExceptionMatcher
+        .invalidSqlInput()
+        .expectMessageIs(
+            "FLOOR in PARTITIONED BY clause must have 2 arguments, but only [1] provided."
+        )
+        .assertThrowsAndMatches(() -> DruidSqlParserUtils.convertSqlNodeToGranularity(sqlNode));
   }
 
   /**
@@ -389,11 +389,13 @@ public class DruidSqlParserUtilsTest
     args.add(new SqlIdentifier("timestamps", SqlParserPos.ZERO));
     args.add(new SqlIntervalQualifier(TimeUnit.DAY, null, SqlParserPos.ZERO));
     final SqlNode sqlNode = SqlStdOperatorTable.FLOOR.createCall(args);
-    ParseException e = Assert.assertThrows(
-        ParseException.class,
-        () -> DruidSqlParserUtils.convertSqlNodeToGranularityThrowingParseExceptions(sqlNode)
-    );
-    Assert.assertEquals("First argument to FLOOR in PARTITIONED BY clause can only be __time", e.getMessage());
+    DruidExceptionMatcher
+        .invalidSqlInput()
+        .expectMessageIs(
+            "Invalid argument[timestamps] provided. The first argument to FLOOR in PARTITIONED BY"
+            + " clause must be __time."
+        )
+        .assertThrowsAndMatches(() -> DruidSqlParserUtils.convertSqlNodeToGranularity(sqlNode));
   }
 
   /**
@@ -406,11 +408,13 @@ public class DruidSqlParserUtilsTest
     args.add(new SqlIdentifier("timestamps", SqlParserPos.ZERO));
     args.add(SqlLiteral.createCharString("PT1H", SqlParserPos.ZERO));
     final SqlNode sqlNode = TimeFloorOperatorConversion.SQL_FUNCTION.createCall(args);
-    ParseException e = Assert.assertThrows(
-        ParseException.class,
-        () -> DruidSqlParserUtils.convertSqlNodeToGranularityThrowingParseExceptions(sqlNode)
-    );
-    Assert.assertEquals("First argument to TIME_FLOOR in PARTITIONED BY clause can only be __time", e.getMessage());
+    DruidExceptionMatcher
+        .invalidSqlInput()
+        .expectMessageIs(
+            "Invalid argument[timestamps] provided. The first argument to TIME_FLOOR in"
+            + " PARTITIONED BY clause must be __time."
+        )
+        .assertThrowsAndMatches(() -> DruidSqlParserUtils.convertSqlNodeToGranularity(sqlNode));
   }
 
  /**
@@ -423,11 +427,12 @@ public class DruidSqlParserUtilsTest
     args.add(new SqlIdentifier("__time", SqlParserPos.ZERO));
     args.add(new SqlIntervalQualifier(TimeUnit.ISOYEAR, null, SqlParserPos.ZERO));
     final SqlNode sqlNode = SqlStdOperatorTable.FLOOR.createCall(args);
-    ParseException e = Assert.assertThrows(
-        ParseException.class,
-        () -> DruidSqlParserUtils.convertSqlNodeToGranularityThrowingParseExceptions(sqlNode)
-    );
-    Assert.assertEquals("ISOYEAR is not a valid granularity for ingestion", e.getMessage());
+    DruidExceptionMatcher
+        .invalidSqlInput()
+        .expectMessageIs(
+            "ISOYEAR is not a valid period granularity for ingestion."
+        )
+        .assertThrowsAndMatches(() -> DruidSqlParserUtils.convertSqlNodeToGranularity(sqlNode));
   }
 
  /**
@@ -440,11 +445,12 @@ public class DruidSqlParserUtilsTest
     args.add(new SqlIdentifier("__time", SqlParserPos.ZERO));
     args.add(SqlLiteral.createCharString("abc", SqlParserPos.ZERO));
     final SqlNode sqlNode = TimeFloorOperatorConversion.SQL_FUNCTION.createCall(args);
-    DruidException e = Assert.assertThrows(
-        DruidException.class,
-        () -> DruidSqlParserUtils.convertSqlNodeToGranularityThrowingParseExceptions(sqlNode)
-    );
-    Assert.assertEquals("granularity['abc'] is an invalid period string", e.getMessage());
+    DruidExceptionMatcher
+        .invalidSqlInput()
+        .expectMessageIs(
+            "granularity['abc'] is an invalid period literal."
+        )
+        .assertThrowsAndMatches(() -> DruidSqlParserUtils.convertSqlNodeToGranularity(sqlNode));
   }
 }