Add cluster by support for replace syntax (#12524)

* Add cluster by support for replace syntax

* Add unit test for with list
This commit is contained in:
Adarsh Sanjeev 2022-05-17 15:15:29 +05:30 committed by GitHub
parent b23ddc5939
commit fcb1c0b7bf
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 156 additions and 50 deletions

View File

@ -72,3 +72,22 @@ org.apache.druid.java.util.common.Pair<Granularity, String> PartitionGranularity
return new org.apache.druid.java.util.common.Pair(granularity, unparseString); return new org.apache.druid.java.util.common.Pair(granularity, unparseString);
} }
} }
SqlNodeList ClusterItems() :
{
List<SqlNode> list;
final Span s;
SqlNode e;
}
{
e = OrderItem() {
s = span();
list = startList(e);
}
(
LOOKAHEAD(2) <COMMA> e = OrderItem() { list.add(e); }
)*
{
return new SqlNodeList(list, s.addAll(list).pos());
}
}

View File

@ -51,22 +51,3 @@ SqlNode DruidSqlInsertEof() :
return new DruidSqlInsert(sqlInsert, partitionedBy.lhs, partitionedBy.rhs, clusteredBy); return new DruidSqlInsert(sqlInsert, partitionedBy.lhs, partitionedBy.rhs, clusteredBy);
} }
} }
SqlNodeList ClusterItems() :
{
List<SqlNode> list;
final Span s;
SqlNode e;
}
{
e = OrderItem() {
s = span();
list = startList(e);
}
(
LOOKAHEAD(2) <COMMA> e = OrderItem() { list.add(e); }
)*
{
return new SqlNodeList(list, s.addAll(list).pos());
}
}

View File

@ -27,6 +27,7 @@ SqlNode DruidSqlReplaceEof() :
SqlInsert sqlInsert; SqlInsert sqlInsert;
// Using fully qualified name for Pair class, since Calcite also has a same class name being used in the Parser.jj // Using fully qualified name for Pair class, since Calcite also has a same class name being used in the Parser.jj
org.apache.druid.java.util.common.Pair<Granularity, String> partitionedBy = new org.apache.druid.java.util.common.Pair(null, null); org.apache.druid.java.util.common.Pair<Granularity, String> partitionedBy = new org.apache.druid.java.util.common.Pair(null, null);
SqlNodeList clusteredBy = null;
final Pair<SqlNodeList, SqlNodeList> p; final Pair<SqlNodeList, SqlNodeList> p;
SqlNode replaceTimeQuery = null; SqlNode replaceTimeQuery = null;
} }
@ -51,6 +52,10 @@ SqlNode DruidSqlReplaceEof() :
<PARTITIONED> <BY> <PARTITIONED> <BY>
partitionedBy = PartitionGranularity() partitionedBy = PartitionGranularity()
] ]
[
<CLUSTERED> <BY>
clusteredBy = ClusterItems()
]
// EOF is also present in SqlStmtEof but EOF is a special case and a single EOF can be consumed multiple times. // EOF is also present in SqlStmtEof but EOF is a special case and a single EOF can be consumed multiple times.
// The reason for adding EOF here is to ensure that we create a DruidSqlReplace node after the syntax has been // The reason for adding EOF here is to ensure that we create a DruidSqlReplace node after the syntax has been
// validated and throw SQL syntax errors before performing validations in the DruidSqlReplace which can overshadow the // validated and throw SQL syntax errors before performing validations in the DruidSqlReplace which can overshadow the
@ -58,7 +63,7 @@ SqlNode DruidSqlReplaceEof() :
<EOF> <EOF>
{ {
sqlInsert = new SqlInsert(s.end(source), SqlNodeList.EMPTY, table, source, columnList); sqlInsert = new SqlInsert(s.end(source), SqlNodeList.EMPTY, table, source, columnList);
return new DruidSqlReplace(sqlInsert, partitionedBy.lhs, partitionedBy.rhs, replaceTimeQuery); return new DruidSqlReplace(sqlInsert, partitionedBy.lhs, partitionedBy.rhs, clusteredBy, replaceTimeQuery);
} }
} }

View File

@ -28,6 +28,8 @@ import org.apache.calcite.sql.SqlIntervalQualifier;
import org.apache.calcite.sql.SqlKind; import org.apache.calcite.sql.SqlKind;
import org.apache.calcite.sql.SqlLiteral; import org.apache.calcite.sql.SqlLiteral;
import org.apache.calcite.sql.SqlNode; import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.sql.SqlNodeList;
import org.apache.calcite.sql.SqlOrderBy;
import org.apache.calcite.sql.SqlTimestampLiteral; import org.apache.calcite.sql.SqlTimestampLiteral;
import org.apache.calcite.tools.ValidationException; import org.apache.calcite.tools.ValidationException;
import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.StringUtils;
@ -245,6 +247,38 @@ public class DruidSqlParserUtils
.collect(Collectors.toList()); .collect(Collectors.toList());
} }
/**
* Extracts and converts the information in the CLUSTERED BY clause to a new SqlOrderBy node.
*
* @param query sql query
* @param clusteredByList List of clustered by columns
* @return SqlOrderBy node containing the clusteredByList information
*/
public static SqlOrderBy convertClusterByToOrderBy(SqlNode query, SqlNodeList clusteredByList)
{
// If we have a CLUSTERED BY clause, extract the information in that CLUSTERED BY and create a new
// SqlOrderBy node
SqlNode offset = null;
SqlNode fetch = null;
if (query instanceof SqlOrderBy) {
SqlOrderBy sqlOrderBy = (SqlOrderBy) query;
// This represents the underlying query free of OFFSET, FETCH and ORDER BY clauses
// For a sqlOrderBy.query like "SELECT dim1, sum(dim2) FROM foo OFFSET 10 FETCH 30 ORDER BY dim1 GROUP
// BY dim1 this would represent the "SELECT dim1, sum(dim2) from foo GROUP BY dim1
query = sqlOrderBy.query;
offset = sqlOrderBy.offset;
fetch = sqlOrderBy.fetch;
}
// Creates a new SqlOrderBy query, which may have our CLUSTERED BY overwritten
return new SqlOrderBy(
query.getParserPosition(),
query,
clusteredByList,
offset,
fetch
);
}
/** /**
* This method is used to convert an {@link SqlNode} representing a query into a {@link DimFilter} for the same query. * This method is used to convert an {@link SqlNode} representing a query into a {@link DimFilter} for the same query.

View File

@ -51,6 +51,9 @@ public class DruidSqlReplace extends SqlInsert
private final SqlNode replaceTimeQuery; private final SqlNode replaceTimeQuery;
@Nullable
private final SqlNodeList clusteredBy;
/** /**
* While partitionedBy and partitionedByStringForUnparse can be null as arguments to the constructor, this is * While partitionedBy and partitionedByStringForUnparse can be null as arguments to the constructor, this is
* disallowed (semantically) and the constructor performs checks to ensure that. This helps in producing friendly * disallowed (semantically) and the constructor performs checks to ensure that. This helps in producing friendly
@ -61,6 +64,7 @@ public class DruidSqlReplace extends SqlInsert
@Nonnull SqlInsert insertNode, @Nonnull SqlInsert insertNode,
@Nullable Granularity partitionedBy, @Nullable Granularity partitionedBy,
@Nullable String partitionedByStringForUnparse, @Nullable String partitionedByStringForUnparse,
@Nullable SqlNodeList clusteredBy,
@Nullable SqlNode replaceTimeQuery @Nullable SqlNode replaceTimeQuery
) throws ParseException ) throws ParseException
{ {
@ -82,6 +86,8 @@ public class DruidSqlReplace extends SqlInsert
this.partitionedByStringForUnparse = Preconditions.checkNotNull(partitionedByStringForUnparse); this.partitionedByStringForUnparse = Preconditions.checkNotNull(partitionedByStringForUnparse);
this.replaceTimeQuery = replaceTimeQuery; this.replaceTimeQuery = replaceTimeQuery;
this.clusteredBy = clusteredBy;
} }
public SqlNode getReplaceTimeQuery() public SqlNode getReplaceTimeQuery()
@ -94,6 +100,12 @@ public class DruidSqlReplace extends SqlInsert
return partitionedBy; return partitionedBy;
} }
@Nullable
public SqlNodeList getClusteredBy()
{
return clusteredBy;
}
@Nonnull @Nonnull
@Override @Override
public SqlOperator getOperator() public SqlOperator getOperator()
@ -128,5 +140,14 @@ public class DruidSqlReplace extends SqlInsert
writer.keyword("PARTITIONED BY"); writer.keyword("PARTITIONED BY");
writer.keyword(partitionedByStringForUnparse); writer.keyword(partitionedByStringForUnparse);
if (getClusteredBy() != null) {
writer.keyword("CLUSTERED BY");
SqlWriter.Frame frame = writer.startList("", "");
for (SqlNode clusterByOpts : getClusteredBy().getList()) {
clusterByOpts.unparse(writer, leftPrec, rightPrec);
}
writer.endList(frame);
}
} }
} }

View File

@ -852,28 +852,7 @@ public class DruidPlanner implements Closeable
Granularity ingestionGranularity = druidSqlInsert.getPartitionedBy(); Granularity ingestionGranularity = druidSqlInsert.getPartitionedBy();
if (druidSqlInsert.getClusteredBy() != null) { if (druidSqlInsert.getClusteredBy() != null) {
// If we have a CLUSTERED BY clause, extract the information in that CLUSTERED BY and create a new query = DruidSqlParserUtils.convertClusterByToOrderBy(query, druidSqlInsert.getClusteredBy());
// SqlOrderBy node
SqlNode offset = null;
SqlNode fetch = null;
if (query instanceof SqlOrderBy) {
SqlOrderBy sqlOrderBy = (SqlOrderBy) query;
// This represents the underlying query free of OFFSET, FETCH and ORDER BY clauses
// For a sqlOrderBy.query like "SELECT dim1, sum(dim2) FROM foo OFFSET 10 FETCH 30 ORDER BY dim1 GROUP
// BY dim1 this would represent the "SELECT dim1, sum(dim2) from foo GROUP BY dim1
query = sqlOrderBy.query;
offset = sqlOrderBy.offset;
fetch = sqlOrderBy.fetch;
}
// Creates a new SqlOrderBy query, which may have our CLUSTERED BY overwritten
query = new SqlOrderBy(
query.getParserPosition(),
query,
druidSqlInsert.getClusteredBy(),
offset,
fetch
);
} }
if (!query.isA(SqlKind.QUERY)) { if (!query.isA(SqlKind.QUERY)) {
@ -893,7 +872,7 @@ public class DruidPlanner implements Closeable
SqlOrderBy sqlOrderBy = (SqlOrderBy) query; SqlOrderBy sqlOrderBy = (SqlOrderBy) query;
SqlNodeList orderByList = sqlOrderBy.orderList; SqlNodeList orderByList = sqlOrderBy.orderList;
if (!(orderByList == null || orderByList.equals(SqlNodeList.EMPTY))) { if (!(orderByList == null || orderByList.equals(SqlNodeList.EMPTY))) {
throw new ValidationException("Cannot have ORDER BY on a REPLACE query."); throw new ValidationException("Cannot have ORDER BY on a REPLACE query, use CLUSTERED BY instead.");
} }
} }
@ -905,6 +884,10 @@ public class DruidPlanner implements Closeable
Granularity ingestionGranularity = druidSqlReplace.getPartitionedBy(); Granularity ingestionGranularity = druidSqlReplace.getPartitionedBy();
List<String> replaceIntervals = DruidSqlParserUtils.validateQueryAndConvertToIntervals(replaceTimeQuery, ingestionGranularity, dateTimeZone); List<String> replaceIntervals = DruidSqlParserUtils.validateQueryAndConvertToIntervals(replaceTimeQuery, ingestionGranularity, dateTimeZone);
if (druidSqlReplace.getClusteredBy() != null) {
query = DruidSqlParserUtils.convertClusterByToOrderBy(query, druidSqlReplace.getClusteredBy());
}
if (!query.isA(SqlKind.QUERY)) { if (!query.isA(SqlKind.QUERY)) {
throw new ValidationException(StringUtils.format("Cannot execute [%s].", query.getKind())); throw new ValidationException(StringUtils.format("Cannot execute [%s].", query.getKind()));
} }

View File

@ -36,6 +36,7 @@ import org.apache.druid.sql.SqlPlanningException;
import org.apache.druid.sql.calcite.external.ExternalOperatorConversion; import org.apache.druid.sql.calcite.external.ExternalOperatorConversion;
import org.apache.druid.sql.calcite.filtration.Filtration; import org.apache.druid.sql.calcite.filtration.Filtration;
import org.apache.druid.sql.calcite.parser.DruidSqlInsert; import org.apache.druid.sql.calcite.parser.DruidSqlInsert;
import org.apache.druid.sql.calcite.parser.DruidSqlParserUtils;
import org.apache.druid.sql.calcite.parser.DruidSqlReplace; import org.apache.druid.sql.calcite.parser.DruidSqlReplace;
import org.apache.druid.sql.calcite.planner.PlannerConfig; import org.apache.druid.sql.calcite.planner.PlannerConfig;
import org.apache.druid.sql.calcite.planner.PlannerContext; import org.apache.druid.sql.calcite.planner.PlannerContext;
@ -54,7 +55,7 @@ public class CalciteReplaceDmlTest extends CalciteIngestionDmlTest
DruidSqlInsert.SQL_INSERT_SEGMENT_GRANULARITY, DruidSqlInsert.SQL_INSERT_SEGMENT_GRANULARITY,
"{\"type\":\"all\"}", "{\"type\":\"all\"}",
DruidSqlReplace.SQL_REPLACE_TIME_CHUNKS, DruidSqlReplace.SQL_REPLACE_TIME_CHUNKS,
"all" DruidSqlParserUtils.ALL
); );
protected Map<String, Object> addReplaceTimeChunkToQueryContext(Map<String, Object> context, String replaceTimeChunks) protected Map<String, Object> addReplaceTimeChunkToQueryContext(Map<String, Object> context, String replaceTimeChunks)
@ -252,7 +253,7 @@ public class CalciteReplaceDmlTest extends CalciteIngestionDmlTest
{ {
testIngestionQuery() testIngestionQuery()
.sql("REPLACE INTO dst OVERWRITE ALL SELECT * FROM foo ORDER BY dim1 PARTITIONED BY ALL TIME") .sql("REPLACE INTO dst OVERWRITE ALL SELECT * FROM foo ORDER BY dim1 PARTITIONED BY ALL TIME")
.expectValidationError(SqlPlanningException.class, "Cannot have ORDER BY on a REPLACE query.") .expectValidationError(SqlPlanningException.class, "Cannot have ORDER BY on a REPLACE query, use CLUSTERED BY instead.")
.verify(); .verify();
} }
@ -350,6 +351,29 @@ public class CalciteReplaceDmlTest extends CalciteIngestionDmlTest
.verify(); .verify();
} }
@Test
public void testReplaceContainingWithList()
{
testIngestionQuery()
.sql("REPLACE INTO dst OVERWRITE ALL WITH foo_data AS (SELECT * FROM foo) SELECT dim1, dim3 FROM foo_data PARTITIONED BY ALL TIME")
.authentication(CalciteTests.SUPER_USER_AUTH_RESULT)
.expectTarget("dst", RowSignature.builder()
.add("dim1", ColumnType.STRING)
.add("dim3", ColumnType.STRING)
.build()
)
.expectResources(dataSourceRead("foo"), dataSourceWrite("dst"))
.expectQuery(
newScanQueryBuilder()
.dataSource("foo")
.intervals(querySegmentSpec(Filtration.eternity()))
.columns("dim1", "dim3")
.context(REPLACE_ALL_TIME_CHUNKS)
.build()
)
.verify();
}
@Test @Test
public void testReplaceIntoInvalidDataSourceName() public void testReplaceIntoInvalidDataSourceName()
{ {
@ -484,7 +508,7 @@ public class CalciteReplaceDmlTest extends CalciteIngestionDmlTest
.context( .context(
addReplaceTimeChunkToQueryContext( addReplaceTimeChunkToQueryContext(
queryContextWithGranularity(Granularities.DAY), queryContextWithGranularity(Granularities.DAY),
"all" DruidSqlParserUtils.ALL
) )
) )
.build() .build()
@ -492,6 +516,43 @@ public class CalciteReplaceDmlTest extends CalciteIngestionDmlTest
.verify(); .verify();
} }
@Test
public void testReplaceWithClusteredBy()
{
// Test correctness of the query when CLUSTERED BY clause is present
RowSignature targetRowSignature = RowSignature.builder()
.add("__time", ColumnType.LONG)
.add("floor_m1", ColumnType.FLOAT)
.add("dim1", ColumnType.STRING)
.build();
testIngestionQuery()
.sql(
"REPLACE INTO druid.dst OVERWRITE ALL SELECT __time, FLOOR(m1) as floor_m1, dim1 FROM foo PARTITIONED BY DAY CLUSTERED BY 2, dim1")
.expectTarget("dst", targetRowSignature)
.expectResources(dataSourceRead("foo"), dataSourceWrite("dst"))
.expectQuery(
newScanQueryBuilder()
.dataSource("foo")
.intervals(querySegmentSpec(Filtration.eternity()))
.columns("__time", "dim1", "v0")
.virtualColumns(expressionVirtualColumn("v0", "floor(\"m1\")", ColumnType.FLOAT))
.orderBy(
ImmutableList.of(
new ScanQuery.OrderBy("v0", ScanQuery.Order.ASCENDING),
new ScanQuery.OrderBy("dim1", ScanQuery.Order.ASCENDING)
)
)
.context(
addReplaceTimeChunkToQueryContext(
queryContextWithGranularity(Granularities.DAY),
DruidSqlParserUtils.ALL)
)
.build()
)
.verify();
}
@Test @Test
public void testReplaceWithPartitionedByContainingInvalidGranularity() throws Exception public void testReplaceWithPartitionedByContainingInvalidGranularity() throws Exception
{ {

View File

@ -53,12 +53,13 @@ public class DruidSqlUnparseTest
@Test @Test
public void testUnparseReplaceAll() throws ParseException public void testUnparseReplaceAll() throws ParseException
{ {
String sqlQuery = "REPLACE INTO dst OVERWRITE ALL SELECT * FROM foo PARTITIONED BY ALL TIME"; String sqlQuery = "REPLACE INTO dst OVERWRITE ALL SELECT * FROM foo PARTITIONED BY ALL TIME CLUSTERED BY dim1";
String prettySqlQuery = "REPLACE INTO \"dst\"\n" String prettySqlQuery = "REPLACE INTO \"dst\"\n"
+ "OVERWRITE ALL\n" + "OVERWRITE ALL\n"
+ "(SELECT *\n" + "(SELECT *\n"
+ " FROM \"foo\")\n" + " FROM \"foo\")\n"
+ "PARTITIONED BY ALL TIME"; + "PARTITIONED BY ALL TIME "
+ "CLUSTERED BY \"dim1\"";
DruidSqlParserImpl druidSqlParser = createTestParser(sqlQuery); DruidSqlParserImpl druidSqlParser = createTestParser(sqlQuery);
DruidSqlReplace druidSqlReplace = (DruidSqlReplace) druidSqlParser.DruidSqlReplaceEof(); DruidSqlReplace druidSqlReplace = (DruidSqlReplace) druidSqlParser.DruidSqlReplaceEof();
@ -70,12 +71,13 @@ public class DruidSqlUnparseTest
@Test @Test
public void testUnparseReplaceWhere() throws ParseException public void testUnparseReplaceWhere() throws ParseException
{ {
String sqlQuery = "REPLACE INTO dst OVERWRITE WHERE __time >= TIMESTAMP '2000-01-01 00:00:00' AND __time < TIMESTAMP '2000-01-02 00:00:00' SELECT * FROM foo PARTITIONED BY DAY"; String sqlQuery = "REPLACE INTO dst OVERWRITE WHERE __time >= TIMESTAMP '2000-01-01 00:00:00' AND __time < TIMESTAMP '2000-01-02 00:00:00' SELECT * FROM foo PARTITIONED BY DAY CLUSTERED BY dim1";
String prettySqlQuery = "REPLACE INTO \"dst\"\n" String prettySqlQuery = "REPLACE INTO \"dst\"\n"
+ "OVERWRITE \"__time\" >= TIMESTAMP '2000-01-01 00:00:00' AND \"__time\" < TIMESTAMP '2000-01-02 00:00:00'\n" + "OVERWRITE \"__time\" >= TIMESTAMP '2000-01-01 00:00:00' AND \"__time\" < TIMESTAMP '2000-01-02 00:00:00'\n"
+ "(SELECT *\n" + "(SELECT *\n"
+ " FROM \"foo\")\n" + " FROM \"foo\")\n"
+ "PARTITIONED BY DAY"; + "PARTITIONED BY DAY "
+ "CLUSTERED BY \"dim1\"";
DruidSqlParserImpl druidSqlParser = createTestParser(sqlQuery); DruidSqlParserImpl druidSqlParser = createTestParser(sqlQuery);
DruidSqlReplace druidSqlReplace = (DruidSqlReplace) druidSqlParser.DruidSqlReplaceEof(); DruidSqlReplace druidSqlReplace = (DruidSqlReplace) druidSqlParser.DruidSqlReplaceEof();