Convert the Druid planner to use statement handlers (#12905)

* Converted Druid planner to use statement handlers

Converts the large collection of if-statements that dispatched on statement
type into a set of handler classes, one per supported statement type (a
dispatch sketch follows the commit metadata below). Also cleans up a few
error messages.

* Revisions from review comments

* Build fix

* Build fix

* Resolve merge conflict.

* More merges with QueryResponse PR

* More parameterized type cleanup

Forces a rebuild due to a flaky test
Paul Rogers 2022-09-19 08:28:45 +02:00 committed by GitHub
parent bb0b810b1d
commit 8ce03eb094
29 changed files with 1334 additions and 995 deletions
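
For orientation, here is a minimal dispatch sketch of the shape this refactoring introduces: the former chain of if-statements on statement type becomes a factory that hands each parsed statement to its own SqlStatementHandler implementation. The createHandler method below is hypothetical (the real wiring lives in DruidPlanner, which is not shown in this excerpt); the handler classes and the SqlStatementHandler lifecycle it uses are the ones added in the diffs that follow.

    // Hypothetical sketch only; not code from this commit.
    static SqlStatementHandler createHandler(
        final SqlStatementHandler.HandlerContext handlerContext,
        final SqlNode node,
        final SqlExplain explain
    ) throws ValidationException
    {
      if (node instanceof DruidSqlInsert) {
        return new IngestHandler.InsertHandler(handlerContext, (DruidSqlInsert) node, explain);
      }
      if (node instanceof DruidSqlReplace) {
        return new IngestHandler.ReplaceHandler(handlerContext, (DruidSqlReplace) node, explain);
      }
      // Anything else (a plain SELECT, possibly wrapped in EXPLAIN) goes to the query handler.
      return new QueryHandler.SelectHandler(handlerContext, node, explain);
    }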

View File

@ -105,7 +105,7 @@ public class MSQTaskQueryMaker implements QueryMaker
}
@Override
public QueryResponse runQuery(final DruidQuery druidQuery)
public QueryResponse<Object[]> runQuery(final DruidQuery druidQuery)
{
String taskId = MSQTasks.controllerTaskId(plannerContext.getSqlQueryId());

View File

@ -147,8 +147,8 @@ public class SqlTaskResource
final String sqlQueryId = stmt.sqlQueryId();
try {
final DirectStatement.ResultSet plan = stmt.plan();
final QueryResponse response = plan.run();
final Sequence sequence = response.getResults();
final QueryResponse<Object[]> response = plan.run();
final Sequence<Object[]> sequence = response.getResults();
final SqlRowTransformer rowTransformer = plan.createRowTransformer();
final boolean isTaskStruct = MSQTaskSqlEngine.TASK_STRUCT_FIELD_NAMES.equals(rowTransformer.getFieldList());

View File

@ -439,7 +439,7 @@ public class MSQInsertTest extends MSQTestBase
.setExpectedValidationErrorMatcher(CoreMatchers.allOf(
CoreMatchers.instanceOf(SqlPlanningException.class),
ThrowableMessageMatcher.hasMessage(CoreMatchers.startsWith(
"CLUSTERED BY found before PARTITIONED BY. In druid, the CLUSTERED BY clause has to be specified after the PARTITIONED BY clause"))
"CLUSTERED BY found before PARTITIONED BY. In Druid, the CLUSTERED BY clause must follow the PARTITIONED BY clause"))
))
.verifyPlanningErrors();
}

View File

@ -187,7 +187,7 @@ public class MSQReplaceTest extends MSQTestBase
CoreMatchers.allOf(
CoreMatchers.instanceOf(SqlPlanningException.class),
ThrowableMessageMatcher.hasMessage(CoreMatchers.containsString(
"Missing time chunk information in OVERWRITE clause for REPLACE, set it to OVERWRITE WHERE <__time based condition> or set it to overwrite the entire table with OVERWRITE ALL."))
"Missing time chunk information in OVERWRITE clause for REPLACE. Use OVERWRITE WHERE <__time based condition> or OVERWRITE ALL to overwrite the entire table."))
)
)
.verifyPlanningErrors();

View File

@ -697,7 +697,7 @@ public class MSQSelectTest extends MSQTestBase
CoreMatchers.allOf(
CoreMatchers.instanceOf(SqlPlanningException.class),
ThrowableMessageMatcher.hasMessage(CoreMatchers.startsWith(
"Cannot query table [INFORMATION_SCHEMA.SCHEMATA] with SQL engine 'msq-task'."))
"Cannot query table INFORMATION_SCHEMA.SCHEMATA with SQL engine 'msq-task'."))
)
)
.verifyPlanningErrors();
@ -712,7 +712,7 @@ public class MSQSelectTest extends MSQTestBase
CoreMatchers.allOf(
CoreMatchers.instanceOf(SqlPlanningException.class),
ThrowableMessageMatcher.hasMessage(CoreMatchers.startsWith(
"Cannot query table [sys.segments] with SQL engine 'msq-task'."))
"Cannot query table sys.segments with SQL engine 'msq-task'."))
)
)
.verifyPlanningErrors();
@ -727,7 +727,7 @@ public class MSQSelectTest extends MSQTestBase
CoreMatchers.allOf(
CoreMatchers.instanceOf(SqlPlanningException.class),
ThrowableMessageMatcher.hasMessage(CoreMatchers.startsWith(
"Cannot query table [sys.segments] with SQL engine 'msq-task'."))
"Cannot query table sys.segments with SQL engine 'msq-task'."))
)
)
.verifyPlanningErrors();
@ -743,7 +743,7 @@ public class MSQSelectTest extends MSQTestBase
CoreMatchers.allOf(
CoreMatchers.instanceOf(SqlPlanningException.class),
ThrowableMessageMatcher.hasMessage(CoreMatchers.startsWith(
"Cannot query table [sys.segments] with SQL engine 'msq-task'."))
"Cannot query table sys.segments with SQL engine 'msq-task'."))
)
)
.verifyPlanningErrors();

View File

@ -128,7 +128,6 @@ public class QueryLifecycle
this.startNs = startNs;
}
/**
* For callers who have already authorized their query, and where simplicity is desired over flexibility. This method
* does it all in one call. Logs and metrics are emitted when the Sequence is either fully iterated or throws an
@ -140,8 +139,7 @@ public class QueryLifecycle
*
* @return results
*/
@SuppressWarnings("unchecked")
public <T> QueryResponse runSimple(
public <T> QueryResponse<T> runSimple(
final Query<T> query,
final AuthenticationResult authenticationResult,
final Access authorizationResult
@ -151,7 +149,7 @@ public class QueryLifecycle
final Sequence<T> results;
final QueryResponse queryResponse;
final QueryResponse<T> queryResponse;
try {
preAuthorized(authenticationResult, authorizationResult);
if (!authorizationResult.isAllowed()) {
@ -172,7 +170,7 @@ public class QueryLifecycle
* cannot be moved into execute(). We leave this as an exercise for the future, however as this oddity
* was discovered while just trying to expose HTTP response headers
*/
return new QueryResponse(
return new QueryResponse<T>(
Sequences.wrap(
results,
new SequenceWrapper()
@ -193,8 +191,7 @@ public class QueryLifecycle
*
* @param baseQuery the query
*/
@SuppressWarnings("unchecked")
public void initialize(final Query baseQuery)
public void initialize(final Query<?> baseQuery)
{
transition(State.NEW, State.INITIALIZED);
@ -282,17 +279,18 @@ public class QueryLifecycle
*
* @return result sequence and response context
*/
public QueryResponse execute()
public <T> QueryResponse<T> execute()
{
transition(State.AUTHORIZED, State.EXECUTING);
final ResponseContext responseContext = DirectDruidClient.makeResponseContextForQuery();
final Sequence<?> res = QueryPlus.wrap(baseQuery)
@SuppressWarnings("unchecked")
final Sequence<T> res = QueryPlus.wrap((Query<T>) baseQuery)
.withIdentity(authenticationResult.getIdentity())
.run(texasRanger, responseContext);
return new QueryResponse(res == null ? Sequences.empty() : res, responseContext);
return new QueryResponse<T>(res == null ? Sequences.empty() : res, responseContext);
}
/**

View File

@ -200,7 +200,7 @@ public class QueryResource implements QueryCountStatsProvider
throw new ForbiddenException(authResult.toString());
}
final QueryResponse queryResponse = queryLifecycle.execute();
final QueryResponse<?> queryResponse = queryLifecycle.execute();
final Sequence<?> results = queryResponse.getResults();
final ResponseContext responseContext = queryResponse.getResponseContext();
final String prevEtag = getPreviousEtag(req);
@ -477,8 +477,8 @@ public class QueryResource implements QueryCountStatsProvider
}
ObjectWriter newOutputWriter(
@Nullable QueryToolChest toolChest,
@Nullable Query query,
@Nullable QueryToolChest<?, Query<?>> toolChest,
@Nullable Query<?> query,
boolean serializeDateTimeAsLong
)
{

View File

@ -22,23 +22,23 @@ package org.apache.druid.server;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.query.context.ResponseContext;
public class QueryResponse
public class QueryResponse<T>
{
public static QueryResponse withEmptyContext(Sequence results)
{
return new QueryResponse(results, ResponseContext.createEmpty());
}
private final Sequence results;
private final Sequence<T> results;
private final ResponseContext responseContext;
public QueryResponse(final Sequence results, final ResponseContext responseContext)
public QueryResponse(final Sequence<T> results, final ResponseContext responseContext)
{
this.results = results;
this.responseContext = responseContext;
}
public Sequence getResults()
public static <T> QueryResponse<T> withEmptyContext(Sequence<T> results)
{
return new QueryResponse<T>(results, ResponseContext.createEmpty());
}
public Sequence<T> getResults()
{
return results;
}
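
With the class parameterized, call sites carry the row type through to the result sequence instead of handling raw Sequence objects, as the SqlTaskResource and DirectStatement hunks in this changeset show. A minimal usage sketch, assuming a DirectStatement as used elsewhere in this diff:

    // Illustrative only: consuming a typed response from a DirectStatement.
    static Sequence<Object[]> rowsOf(final DirectStatement stmt)
    {
      // execute() now returns QueryResponse<Object[]> rather than a raw QueryResponse.
      final QueryResponse<Object[]> response = stmt.execute();
      // The response context (headers, trailers) still travels alongside the typed
      // results via response.getResponseContext().
      return response.getResults();
    }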

View File

@ -38,7 +38,7 @@ SqlNode DruidSqlInsertEof() :
]
{
if (clusteredBy != null && partitionedBy.lhs == null) {
throw new ParseException("CLUSTERED BY found before PARTITIONED BY. In druid, the CLUSTERED BY clause has to be specified after the PARTITIONED BY clause");
throw new ParseException("CLUSTERED BY found before PARTITIONED BY. In Druid, the CLUSTERED BY clause must follow the PARTITIONED BY clause");
}
}
// EOF is also present in SqlStmtEof but EOF is a special case and a single EOF can be consumed multiple times.

View File

@ -58,7 +58,7 @@ SqlNode DruidSqlReplaceEof() :
]
{
if (clusteredBy != null && partitionedBy.lhs == null) {
throw new ParseException("CLUSTERED BY found before PARTITIONED BY. In druid, the CLUSTERED BY clause has to be specified after the PARTITIONED BY clause");
throw new ParseException("CLUSTERED BY found before PARTITIONED BY. In Druid, the CLUSTERED BY clause must follow the PARTITIONED BY clause");
}
}
// EOF is also present in SqlStmtEof but EOF is a special case and a single EOF can be consumed multiple times.

View File

@ -98,7 +98,7 @@ public class DirectStatement extends AbstractStatement implements Cancelable
* Do the actual execute step which allows subclasses to wrap the sequence,
* as is sometimes needed for testing.
*/
public QueryResponse run()
public QueryResponse<Object[]> run()
{
try {
// Check cancellation. Required for SqlResourceTest to work.
@ -176,7 +176,7 @@ public class DirectStatement extends AbstractStatement implements Cancelable
*
* @return sequence which delivers query results
*/
public QueryResponse execute()
public QueryResponse<Object[]> execute()
{
return plan().run();
}

View File

@ -0,0 +1,72 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.sql.calcite.parser;
import org.apache.calcite.sql.SqlInsert;
import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.sql.SqlNodeList;
import org.apache.calcite.sql.parser.SqlParserPos;
import org.apache.druid.java.util.common.granularity.Granularity;
import javax.annotation.Nullable;
/**
* Common base class to the two Druid "ingest" statements: INSERT and REPLACE.
* Allows Planner code to work with these two statements generically where they
* share common clauses.
*/
public abstract class DruidSqlIngest extends SqlInsert
{
protected final Granularity partitionedBy;
// Used in the unparse function to generate the original query since we convert the string to an enum
protected final String partitionedByStringForUnparse;
@Nullable
protected final SqlNodeList clusteredBy;
public DruidSqlIngest(SqlParserPos pos,
SqlNodeList keywords,
SqlNode targetTable,
SqlNode source,
SqlNodeList columnList,
@Nullable Granularity partitionedBy,
@Nullable String partitionedByStringForUnparse,
@Nullable SqlNodeList clusteredBy
)
{
super(pos, keywords, targetTable, source, columnList);
this.partitionedByStringForUnparse = partitionedByStringForUnparse;
this.partitionedBy = partitionedBy;
this.clusteredBy = clusteredBy;
}
public Granularity getPartitionedBy()
{
return partitionedBy;
}
@Nullable
public SqlNodeList getClusteredBy()
{
return clusteredBy;
}
}

View File

@ -19,7 +19,6 @@
package org.apache.druid.sql.calcite.parser;
import com.google.common.base.Preconditions;
import org.apache.calcite.sql.SqlInsert;
import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.sql.SqlNodeList;
@ -32,24 +31,16 @@ import javax.annotation.Nullable;
/**
* Extends the 'insert' call to hold custom parameters specific to Druid i.e. PARTITIONED BY and CLUSTERED BY
* This class extends the {@link SqlInsert} so that this SqlNode can be used in
* This class extends the {@link DruidSqlIngest} so that this SqlNode can be used in
* {@link org.apache.calcite.sql2rel.SqlToRelConverter} for getting converted into RelNode, and further processing
*/
public class DruidSqlInsert extends SqlInsert
public class DruidSqlInsert extends DruidSqlIngest
{
public static final String SQL_INSERT_SEGMENT_GRANULARITY = "sqlInsertSegmentGranularity";
// This allows reusing super.unparse
public static final SqlOperator OPERATOR = SqlInsert.OPERATOR;
private final Granularity partitionedBy;
// Used in the unparse function to generate the original query since we convert the string to an enum
private final String partitionedByStringForUnparse;
@Nullable
private final SqlNodeList clusteredBy;
/**
* While partitionedBy and partitionedByStringForUnparse can be null as arguments to the constructor, this is
* disallowed (semantically) and the constructor performs checks to ensure that. This helps in producing friendly
@ -61,35 +52,18 @@ public class DruidSqlInsert extends SqlInsert
@Nullable Granularity partitionedBy,
@Nullable String partitionedByStringForUnparse,
@Nullable SqlNodeList clusteredBy
) throws ParseException
)
{
super(
insertNode.getParserPosition(),
(SqlNodeList) insertNode.getOperandList().get(0), // No better getter to extract this
insertNode.getTargetTable(),
insertNode.getSource(),
insertNode.getTargetColumnList()
insertNode.getTargetColumnList(),
partitionedBy,
partitionedByStringForUnparse,
clusteredBy
);
if (partitionedBy == null) {
throw new ParseException("INSERT statements must specify PARTITIONED BY clause explicitly");
}
this.partitionedBy = partitionedBy;
Preconditions.checkNotNull(partitionedByStringForUnparse);
this.partitionedByStringForUnparse = partitionedByStringForUnparse;
this.clusteredBy = clusteredBy;
}
@Nullable
public SqlNodeList getClusteredBy()
{
return clusteredBy;
}
public Granularity getPartitionedBy()
{
return partitionedBy;
}
@Nonnull

View File

@ -19,7 +19,6 @@
package org.apache.druid.sql.calcite.parser;
import com.google.common.base.Preconditions;
import org.apache.calcite.sql.SqlInsert;
import org.apache.calcite.sql.SqlKind;
import org.apache.calcite.sql.SqlLiteral;
@ -38,22 +37,14 @@ import javax.annotation.Nullable;
* This class extends the {@link SqlInsert} so that this SqlNode can be used in
* {@link org.apache.calcite.sql2rel.SqlToRelConverter} for getting converted into RelNode, and further processing
*/
public class DruidSqlReplace extends SqlInsert
public class DruidSqlReplace extends DruidSqlIngest
{
public static final String SQL_REPLACE_TIME_CHUNKS = "sqlReplaceTimeChunks";
public static final SqlOperator OPERATOR = new SqlSpecialOperator("REPLACE", SqlKind.OTHER);
private final Granularity partitionedBy;
// Used in the unparse function to generate the original query since we convert the string to an enum
private final String partitionedByStringForUnparse;
private final SqlNode replaceTimeQuery;
@Nullable
private final SqlNodeList clusteredBy;
/**
* While partitionedBy and partitionedByStringForUnparse can be null as arguments to the constructor, this is
* disallowed (semantically) and the constructor performs checks to ensure that. This helps in producing friendly
@ -66,28 +57,20 @@ public class DruidSqlReplace extends SqlInsert
@Nullable String partitionedByStringForUnparse,
@Nullable SqlNodeList clusteredBy,
@Nullable SqlNode replaceTimeQuery
) throws ParseException
)
{
super(
insertNode.getParserPosition(),
(SqlNodeList) insertNode.getOperandList().get(0), // No better getter to extract this
insertNode.getTargetTable(),
insertNode.getSource(),
insertNode.getTargetColumnList()
insertNode.getTargetColumnList(),
partitionedBy,
partitionedByStringForUnparse,
clusteredBy
);
if (replaceTimeQuery == null) {
throw new ParseException("Missing time chunk information in OVERWRITE clause for REPLACE, set it to OVERWRITE WHERE <__time based condition> or set it to overwrite the entire table with OVERWRITE ALL.");
}
if (partitionedBy == null) {
throw new ParseException("REPLACE statements must specify PARTITIONED BY clause explicitly");
}
this.partitionedBy = partitionedBy;
this.partitionedByStringForUnparse = Preconditions.checkNotNull(partitionedByStringForUnparse);
this.replaceTimeQuery = replaceTimeQuery;
this.clusteredBy = clusteredBy;
}
public SqlNode getReplaceTimeQuery()
@ -95,17 +78,6 @@ public class DruidSqlReplace extends SqlInsert
return replaceTimeQuery;
}
public Granularity getPartitionedBy()
{
return partitionedBy;
}
@Nullable
public SqlNodeList getClusteredBy()
{
return clusteredBy;
}
@Nonnull
@Override
public SqlOperator getOperator()

View File

@ -0,0 +1,346 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.sql.calcite.planner;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Iterables;
import org.apache.calcite.jdbc.CalciteSchema;
import org.apache.calcite.rel.RelRoot;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeFactory;
import org.apache.calcite.sql.SqlExplain;
import org.apache.calcite.sql.SqlIdentifier;
import org.apache.calcite.sql.SqlInsert;
import org.apache.calcite.sql.SqlKind;
import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.sql.SqlNodeList;
import org.apache.calcite.sql.SqlOrderBy;
import org.apache.calcite.tools.ValidationException;
import org.apache.calcite.util.Pair;
import org.apache.druid.common.utils.IdUtils;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.server.security.Action;
import org.apache.druid.server.security.Resource;
import org.apache.druid.server.security.ResourceAction;
import org.apache.druid.server.security.ResourceType;
import org.apache.druid.sql.calcite.parser.DruidSqlIngest;
import org.apache.druid.sql.calcite.parser.DruidSqlInsert;
import org.apache.druid.sql.calcite.parser.DruidSqlParserUtils;
import org.apache.druid.sql.calcite.parser.DruidSqlReplace;
import org.apache.druid.sql.calcite.run.EngineFeature;
import org.apache.druid.sql.calcite.run.QueryMaker;
import java.util.List;
import java.util.regex.Pattern;
public abstract class IngestHandler extends QueryHandler
{
private static final Pattern UNNAMED_COLUMN_PATTERN = Pattern.compile("^EXPR\\$\\d+$", Pattern.CASE_INSENSITIVE);
@VisibleForTesting
public static final String UNNAMED_INGESTION_COLUMN_ERROR =
"Cannot ingest expressions that do not have an alias "
+ "or columns with names like EXPR$[digit].\n"
+ "E.g. if you are ingesting \"func(X)\", then you can rewrite it as "
+ "\"func(X) as myColumn\"";
protected final Granularity ingestionGranularity;
protected String targetDatasource;
IngestHandler(
HandlerContext handlerContext,
DruidSqlIngest ingestNode,
SqlNode queryNode,
SqlExplain explain
)
{
super(handlerContext, queryNode, explain);
this.ingestionGranularity = ingestNode.getPartitionedBy();
}
protected static SqlNode convertQuery(DruidSqlIngest sqlNode) throws ValidationException
{
SqlNode query = sqlNode.getSource();
// Check if ORDER BY clause is not provided to the underlying query
if (query instanceof SqlOrderBy) {
SqlOrderBy sqlOrderBy = (SqlOrderBy) query;
SqlNodeList orderByList = sqlOrderBy.orderList;
if (!(orderByList == null || orderByList.equals(SqlNodeList.EMPTY))) {
String opName = sqlNode.getOperator().getName();
throw new ValidationException(StringUtils.format(
"Cannot have ORDER BY on %s %s statement, use CLUSTERED BY instead.",
"INSERT".equals(opName) ? "an" : "a",
opName
));
}
}
if (sqlNode.getClusteredBy() != null) {
query = DruidSqlParserUtils.convertClusterByToOrderBy(query, sqlNode.getClusteredBy());
}
if (!query.isA(SqlKind.QUERY)) {
throw new ValidationException(StringUtils.format("Cannot execute [%s].", query.getKind()));
}
return query;
}
protected String operationName()
{
return ingestNode().getOperator().getName();
}
protected abstract DruidSqlIngest ingestNode();
@Override
public void validate() throws ValidationException
{
if (ingestNode().getPartitionedBy() == null) {
throw new ValidationException(StringUtils.format(
"%s statements must specify PARTITIONED BY clause explicitly",
operationName()
));
}
try {
PlannerContext plannerContext = handlerContext.plannerContext();
if (ingestionGranularity != null) {
plannerContext.getQueryContext().addSystemParam(
DruidSqlInsert.SQL_INSERT_SEGMENT_GRANULARITY,
plannerContext.getJsonMapper().writeValueAsString(ingestionGranularity)
);
}
}
catch (JsonProcessingException e) {
throw new ValidationException("Unable to serialize partition granularity.");
}
super.validate();
// Check if CTX_SQL_OUTER_LIMIT is specified and fail the query if it is. CTX_SQL_OUTER_LIMIT being provided causes
// the number of rows inserted to be limited which is likely to be confusing and unintended.
if (handlerContext.queryContext().get(PlannerContext.CTX_SQL_OUTER_LIMIT) != null) {
throw new ValidationException(
StringUtils.format(
"%s cannot be provided with %s.",
PlannerContext.CTX_SQL_OUTER_LIMIT,
operationName()
)
);
}
targetDatasource = validateAndGetDataSourceForIngest();
resourceActions.add(new ResourceAction(new Resource(targetDatasource, ResourceType.DATASOURCE), Action.WRITE));
}
@Override
protected RelDataType returnedRowType()
{
final RelDataTypeFactory typeFactory = rootQueryRel.rel.getCluster().getTypeFactory();
return handlerContext.engine().resultTypeForInsert(
typeFactory,
rootQueryRel.validatedRowType);
}
/**
* Extract target datasource from a {@link SqlInsert}, and also validate that the ingestion is of a form we support.
* Expects the target datasource to be either an unqualified name, or a name qualified by the default schema.
*/
private String validateAndGetDataSourceForIngest() throws ValidationException
{
final SqlInsert insert = ingestNode();
if (insert.isUpsert()) {
throw new ValidationException("UPSERT is not supported.");
}
if (insert.getTargetColumnList() != null) {
throw new ValidationException(operationName() + " with a target column list is not supported.");
}
final SqlIdentifier tableIdentifier = (SqlIdentifier) insert.getTargetTable();
final String dataSource;
if (tableIdentifier.names.isEmpty()) {
// I don't think this can happen, but include a branch for it just in case.
throw new ValidationException(operationName() + " requires a target table.");
} else if (tableIdentifier.names.size() == 1) {
// Unqualified name.
dataSource = Iterables.getOnlyElement(tableIdentifier.names);
} else {
// Qualified name.
final String defaultSchemaName =
Iterables.getOnlyElement(CalciteSchema.from(handlerContext.defaultSchema()).path(null));
if (tableIdentifier.names.size() == 2 && defaultSchemaName.equals(tableIdentifier.names.get(0))) {
dataSource = tableIdentifier.names.get(1);
} else {
throw new ValidationException(
StringUtils.format(
"Cannot %s into %s because it is not a Druid datasource.",
operationName(),
tableIdentifier
)
);
}
}
try {
IdUtils.validateId(operationName() + " dataSource", dataSource);
}
catch (IllegalArgumentException e) {
throw new ValidationException(e.getMessage());
}
return dataSource;
}
@Override
protected PlannerResult planForDruid() throws ValidationException
{
return planWithDruidConvention();
}
@Override
protected QueryMaker buildQueryMaker(final RelRoot rootQueryRel) throws ValidationException
{
validateColumnsForIngestion(rootQueryRel);
return handlerContext.engine().buildQueryMakerForInsert(
targetDatasource,
rootQueryRel,
handlerContext.plannerContext());
}
private void validateColumnsForIngestion(RelRoot rootQueryRel) throws ValidationException
{
// Check that there are no unnamed columns in the insert.
for (Pair<Integer, String> field : rootQueryRel.fields) {
if (UNNAMED_COLUMN_PATTERN.matcher(field.right).matches()) {
throw new ValidationException(UNNAMED_INGESTION_COLUMN_ERROR);
}
}
}
/**
* Handler for the INSERT statement.
*/
protected static class InsertHandler extends IngestHandler
{
private final DruidSqlInsert sqlNode;
public InsertHandler(
SqlStatementHandler.HandlerContext handlerContext,
DruidSqlInsert sqlNode,
SqlExplain explain
) throws ValidationException
{
super(
handlerContext,
sqlNode,
convertQuery(sqlNode),
explain);
this.sqlNode = sqlNode;
}
@Override
public SqlNode sqlNode()
{
return sqlNode;
}
@Override
protected DruidSqlIngest ingestNode()
{
return sqlNode;
}
@Override
public void validate() throws ValidationException
{
if (!handlerContext.plannerContext().engineHasFeature(EngineFeature.CAN_INSERT)) {
throw new ValidationException(StringUtils.format(
"Cannot execute INSERT with SQL engine '%s'.",
handlerContext.engine().name())
);
}
super.validate();
}
}
/**
* Handler for the REPLACE statement.
*/
protected static class ReplaceHandler extends IngestHandler
{
private final DruidSqlReplace sqlNode;
private List<String> replaceIntervals;
public ReplaceHandler(
SqlStatementHandler.HandlerContext handlerContext,
DruidSqlReplace sqlNode,
SqlExplain explain
) throws ValidationException
{
super(
handlerContext,
sqlNode,
convertQuery(sqlNode),
explain
);
this.sqlNode = sqlNode;
}
@Override
public SqlNode sqlNode()
{
return sqlNode;
}
@Override
protected DruidSqlIngest ingestNode()
{
return sqlNode;
}
@Override
public void validate() throws ValidationException
{
if (!handlerContext.plannerContext().engineHasFeature(EngineFeature.CAN_REPLACE)) {
throw new ValidationException(StringUtils.format(
"Cannot execute REPLACE with SQL engine '%s'.",
handlerContext.engine().name())
);
}
SqlNode replaceTimeQuery = sqlNode.getReplaceTimeQuery();
if (replaceTimeQuery == null) {
throw new ValidationException("Missing time chunk information in OVERWRITE clause for REPLACE. Use "
+ "OVERWRITE WHERE <__time based condition> or OVERWRITE ALL to overwrite the entire table.");
}
replaceIntervals = DruidSqlParserUtils.validateQueryAndConvertToIntervals(
replaceTimeQuery,
ingestionGranularity,
handlerContext.timeZone());
super.validate();
if (replaceIntervals != null) {
handlerContext.queryContext().addSystemParam(
DruidSqlReplace.SQL_REPLACE_TIME_CHUNKS,
String.join(",", replaceIntervals)
);
}
}
}
}
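
Worth noting: the PARTITIONED BY and OVERWRITE checks that the parser classes previously threw as ParseException (see the removed constructor code in the DruidSqlInsert and DruidSqlReplace hunks above) now run in these handlers' validate() methods as ValidationException. A condensed before/after of the REPLACE time-chunk check, taken from the hunks in this diff rather than describing new behavior:

    // Before (DruidSqlReplace constructor, at parse time):
    //   if (replaceTimeQuery == null) {
    //     throw new ParseException("Missing time chunk information in OVERWRITE clause for REPLACE, ...");
    //   }
    // After (ReplaceHandler.validate(), at validation time):
    if (sqlNode.getReplaceTimeQuery() == null) {
      throw new ValidationException(
          "Missing time chunk information in OVERWRITE clause for REPLACE. Use "
          + "OVERWRITE WHERE <__time based condition> or OVERWRITE ALL to overwrite the entire table.");
    }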

View File

@ -32,12 +32,12 @@ import java.util.concurrent.atomic.AtomicBoolean;
*/
public class PlannerResult
{
private final Supplier<QueryResponse> resultsSupplier;
private final Supplier<QueryResponse<Object[]>> resultsSupplier;
private final RelDataType rowType;
private final AtomicBoolean didRun = new AtomicBoolean();
public PlannerResult(
final Supplier<QueryResponse> resultsSupplier,
final Supplier<QueryResponse<Object[]>> resultsSupplier,
final RelDataType rowType
)
{
@ -53,7 +53,7 @@ public class PlannerResult
/**
* Run the query
*/
public QueryResponse run()
public QueryResponse<Object[]> run()
{
if (!didRun.compareAndSet(false, true)) {
// Safety check.

View File

@ -0,0 +1,675 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.sql.calcite.planner;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.base.Supplier;
import com.google.common.base.Suppliers;
import com.google.common.collect.ImmutableList;
import org.apache.calcite.DataContext;
import org.apache.calcite.interpreter.BindableConvention;
import org.apache.calcite.interpreter.BindableRel;
import org.apache.calcite.interpreter.Bindables;
import org.apache.calcite.linq4j.Enumerable;
import org.apache.calcite.linq4j.Enumerator;
import org.apache.calcite.plan.RelOptPlanner;
import org.apache.calcite.plan.RelOptTable;
import org.apache.calcite.plan.RelOptUtil;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.RelRoot;
import org.apache.calcite.rel.RelVisitor;
import org.apache.calcite.rel.core.Sort;
import org.apache.calcite.rel.core.TableScan;
import org.apache.calcite.rel.logical.LogicalSort;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeFactory;
import org.apache.calcite.rex.RexBuilder;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.schema.ScannableTable;
import org.apache.calcite.sql.SqlExplain;
import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.sql.type.SqlTypeName;
import org.apache.calcite.sql.validate.SqlValidator;
import org.apache.calcite.tools.ValidationException;
import org.apache.calcite.util.Pair;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.guava.BaseSequence;
import org.apache.druid.java.util.common.guava.Sequences;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.query.Query;
import org.apache.druid.segment.DimensionHandlerUtils;
import org.apache.druid.server.QueryResponse;
import org.apache.druid.server.security.Action;
import org.apache.druid.server.security.Resource;
import org.apache.druid.server.security.ResourceAction;
import org.apache.druid.sql.calcite.rel.DruidConvention;
import org.apache.druid.sql.calcite.rel.DruidQuery;
import org.apache.druid.sql.calcite.rel.DruidRel;
import org.apache.druid.sql.calcite.rel.DruidUnionRel;
import org.apache.druid.sql.calcite.run.EngineFeature;
import org.apache.druid.sql.calcite.run.QueryMaker;
import org.apache.druid.sql.calcite.table.DruidTable;
import org.apache.druid.utils.Throwables;
import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
/**
* Abstract base class for handlers that revolve around queries: SELECT,
* INSERT and REPLACE. This class handles the common SELECT portion of the statement.
*/
public abstract class QueryHandler extends SqlStatementHandler.BaseStatementHandler
{
static final EmittingLogger log = new EmittingLogger(QueryHandler.class);
protected SqlNode queryNode;
protected SqlExplain explain;
protected SqlNode validatedQueryNode;
private boolean isPrepared;
protected RelRoot rootQueryRel;
private PrepareResult prepareResult;
protected RexBuilder rexBuilder;
public QueryHandler(SqlStatementHandler.HandlerContext handlerContext, SqlNode sqlNode, SqlExplain explain)
{
super(handlerContext);
this.queryNode = sqlNode;
this.explain = explain;
}
@Override
public void validate() throws ValidationException
{
CalcitePlanner planner = handlerContext.planner();
validatedQueryNode = planner.validate(rewriteParameters());
final SqlValidator validator = planner.getValidator();
SqlResourceCollectorShuttle resourceCollectorShuttle = new SqlResourceCollectorShuttle(
validator,
handlerContext.plannerContext()
);
validatedQueryNode.accept(resourceCollectorShuttle);
resourceActions = resourceCollectorShuttle.getResourceActions();
}
private SqlNode rewriteParameters()
{
// Uses {@link SqlParameterizerShuttle} to rewrite {@link SqlNode} to swap out any
// {@link org.apache.calcite.sql.SqlDynamicParam} early for their {@link SqlLiteral}
// replacement.
//
// Parameter replacement is done only if the client provides parameter values.
// If this is a PREPARE-only, then there will be no values even if the statement contains
// parameters. If this is a PLAN, then we'll catch later the case that the statement
// contains parameters, but no values were provided.
PlannerContext plannerContext = handlerContext.plannerContext();
if (plannerContext.getParameters().isEmpty()) {
return queryNode;
} else {
return queryNode.accept(new SqlParameterizerShuttle(plannerContext));
}
}
@Override
public void prepare()
{
if (isPrepared) {
return;
}
isPrepared = true;
rootQueryRel = handlerContext.planner().rel(validatedQueryNode);
final RelDataTypeFactory typeFactory = rootQueryRel.rel.getCluster().getTypeFactory();
final SqlValidator validator = handlerContext.planner().getValidator();
final RelDataType parameterTypes = validator.getParameterRowType(validatedQueryNode);
final RelDataType returnedRowType;
if (explain != null) {
returnedRowType = getExplainStructType(typeFactory);
} else {
returnedRowType = returnedRowType();
}
prepareResult = new PrepareResult(rootQueryRel.validatedRowType, returnedRowType, parameterTypes);
}
@Override
public PrepareResult prepareResult()
{
return prepareResult;
}
protected abstract RelDataType returnedRowType();
private static RelDataType getExplainStructType(RelDataTypeFactory typeFactory)
{
return typeFactory.createStructType(
ImmutableList.of(
Calcites.createSqlType(typeFactory, SqlTypeName.VARCHAR),
Calcites.createSqlType(typeFactory, SqlTypeName.VARCHAR)
),
ImmutableList.of("PLAN", "RESOURCES")
);
}
@Override
public PlannerResult plan() throws ValidationException
{
prepare();
final Set<RelOptTable> bindableTables = getBindableTables(rootQueryRel.rel);
// the planner's type factory is not available until after parsing
rexBuilder = new RexBuilder(handlerContext.planner().getTypeFactory());
try {
if (!bindableTables.isEmpty()) {
// Consider BINDABLE convention when necessary. Used for metadata tables.
if (!handlerContext.plannerContext().engineHasFeature(EngineFeature.ALLOW_BINDABLE_PLAN)) {
throw new ValidationException(
StringUtils.format(
"Cannot query table%s %s with SQL engine '%s'.",
bindableTables.size() != 1 ? "s" : "",
bindableTables.stream()
.map(table -> Joiner.on(".").join(table.getQualifiedName()))
.collect(Collectors.joining(", ")),
handlerContext.engine().name()
)
);
}
return planWithBindableConvention();
} else {
// Druid convention is used whenever there are no tables that require BINDABLE.
return planForDruid();
}
}
catch (Exception e) {
Throwable cannotPlanException = Throwables.getCauseOfType(e, RelOptPlanner.CannotPlanException.class);
if (null == cannotPlanException) {
// Not a CannotPlanException, rethrow without logging.
throw e;
}
Logger logger = log;
if (!handlerContext.queryContext().isDebug()) {
logger = log.noStackTrace();
}
String errorMessage = buildSQLPlanningErrorMessage(cannotPlanException);
logger.warn(e, errorMessage);
throw new UnsupportedSQLQueryException(errorMessage);
}
}
private static Set<RelOptTable> getBindableTables(final RelNode relNode)
{
class HasBindableVisitor extends RelVisitor
{
private final Set<RelOptTable> found = new HashSet<>();
@Override
public void visit(RelNode node, int ordinal, RelNode parent)
{
if (node instanceof TableScan) {
RelOptTable table = node.getTable();
if (table.unwrap(ScannableTable.class) != null && table.unwrap(DruidTable.class) == null) {
found.add(table);
return;
}
}
super.visit(node, ordinal, parent);
}
}
final HasBindableVisitor visitor = new HasBindableVisitor();
visitor.go(relNode);
return visitor.found;
}
/**
* Construct a {@link PlannerResult} for a fall-back 'bindable' rel, for
* things that are not directly translatable to native Druid queries such
* as system tables and just a general purpose (but definitely not optimized)
* fall-back.
*
* See {@link #planWithDruidConvention} which will handle things which are
* directly translatable to native Druid queries.
*
* The bindable path handles parameter substitution of any values not
* bound by the earlier steps.
*/
private PlannerResult planWithBindableConvention()
{
CalcitePlanner planner = handlerContext.planner();
BindableRel bindableRel = (BindableRel) planner.transform(
CalciteRulesManager.BINDABLE_CONVENTION_RULES,
planner.getEmptyTraitSet().replace(BindableConvention.INSTANCE).plus(rootQueryRel.collation),
rootQueryRel.rel
);
if (!rootQueryRel.isRefTrivial()) {
// Add a projection on top to accommodate root.fields.
final List<RexNode> projects = new ArrayList<>();
final RexBuilder rexBuilder = bindableRel.getCluster().getRexBuilder();
for (int field : Pair.left(rootQueryRel.fields)) {
projects.add(rexBuilder.makeInputRef(bindableRel, field));
}
bindableRel = new Bindables.BindableProject(
bindableRel.getCluster(),
bindableRel.getTraitSet(),
bindableRel,
projects,
rootQueryRel.validatedRowType
);
}
PlannerContext plannerContext = handlerContext.plannerContext();
if (explain != null) {
return planExplanation(bindableRel, false);
} else {
final BindableRel theRel = bindableRel;
final DataContext dataContext = plannerContext.createDataContext(
planner.getTypeFactory(),
plannerContext.getParameters()
);
final Supplier<QueryResponse<Object[]>> resultsSupplier = () -> {
final Enumerable<?> enumerable = theRel.bind(dataContext);
final Enumerator<?> enumerator = enumerable.enumerator();
return QueryResponse.withEmptyContext(
Sequences.withBaggage(new BaseSequence<>(
new BaseSequence.IteratorMaker<Object[], QueryHandler.EnumeratorIterator<Object[]>>()
{
@Override
public QueryHandler.EnumeratorIterator<Object[]> make()
{
return new QueryHandler.EnumeratorIterator<>(new Iterator<Object[]>()
{
@Override
public boolean hasNext()
{
return enumerator.moveNext();
}
@Override
public Object[] next()
{
return (Object[]) enumerator.current();
}
});
}
@Override
public void cleanup(QueryHandler.EnumeratorIterator<Object[]> iterFromMake)
{
}
}
), enumerator::close)
);
};
return new PlannerResult(resultsSupplier, rootQueryRel.validatedRowType);
}
}
/**
* Construct a {@link PlannerResult} for an 'explain' query from a {@link RelNode}
*/
protected PlannerResult planExplanation(
final RelNode rel,
final boolean isDruidConventionExplanation
)
{
PlannerContext plannerContext = handlerContext.plannerContext();
String explanation = RelOptUtil.dumpPlan("", rel, explain.getFormat(), explain.getDetailLevel());
String resourcesString;
try {
if (isDruidConventionExplanation && rel instanceof DruidRel) {
// Show the native queries instead of Calcite's explain if the legacy flag is turned off
if (plannerContext.getPlannerConfig().isUseNativeQueryExplain()) {
DruidRel<?> druidRel = (DruidRel<?>) rel;
try {
explanation = explainSqlPlanAsNativeQueries(druidRel);
}
catch (Exception ex) {
log.warn(ex, "Unable to translate to a native Druid query. Resorting to legacy Druid explain plan.");
}
}
}
final Set<Resource> resources =
plannerContext.getResourceActions().stream().map(ResourceAction::getResource).collect(Collectors.toSet());
resourcesString = plannerContext.getJsonMapper().writeValueAsString(resources);
}
catch (JsonProcessingException jpe) {
// this should never happen, we create the Resources here, not a user
log.error(jpe, "Encountered exception while serializing resources for explain output");
resourcesString = null;
}
final Supplier<QueryResponse<Object[]>> resultsSupplier = Suppliers.ofInstance(
QueryResponse.withEmptyContext(
Sequences.simple(ImmutableList.of(new Object[]{explanation, resourcesString}))
)
);
return new PlannerResult(resultsSupplier, getExplainStructType(rel.getCluster().getTypeFactory()));
}
/**
* This method doesn't utilize the Calcite's internal {@link RelOptUtil#dumpPlan} since that tends to be verbose
* and not indicative of the native Druid Queries which will get executed
* This method assumes that the Planner has converted the RelNodes to DruidRels, and thereby we can implicitly cast it
*
* @param rel Instance of the root {@link DruidRel} which is formed by running the planner transformations on it
* @return A string representing an array of native queries that correspond to the given SQL query, in JSON format
* @throws JsonProcessingException
*/
private String explainSqlPlanAsNativeQueries(DruidRel<?> rel) throws JsonProcessingException
{
ObjectMapper jsonMapper = handlerContext.jsonMapper();
List<DruidQuery> druidQueryList;
druidQueryList = flattenOutermostRel(rel)
.stream()
.map(druidRel -> druidRel.toDruidQuery(false))
.collect(Collectors.toList());
// Putting the queries as object node in an ArrayNode, since directly returning a list causes issues when
// serializing the "queryType". Another method would be to create a POJO containing query and signature, and then
// serializing it using normal list method.
ArrayNode nativeQueriesArrayNode = jsonMapper.createArrayNode();
for (DruidQuery druidQuery : druidQueryList) {
Query<?> nativeQuery = druidQuery.getQuery();
ObjectNode objectNode = jsonMapper.createObjectNode();
objectNode.put("query", jsonMapper.convertValue(nativeQuery, ObjectNode.class));
objectNode.put("signature", jsonMapper.convertValue(druidQuery.getOutputRowSignature(), ArrayNode.class));
nativeQueriesArrayNode.add(objectNode);
}
return jsonMapper.writeValueAsString(nativeQueriesArrayNode);
}
/**
* Given a {@link DruidRel}, this method recursively flattens the Rels if they are of the type {@link DruidUnionRel}
* It is implicitly assumed that the {@link DruidUnionRel} can never be the child of a non {@link DruidUnionRel}
* node
* E.g. a DruidRel structure of kind:<pre><code>
* DruidUnionRel
* DruidUnionRel
* DruidRel (A)
* DruidRel (B)
* DruidRel(C)
</code></pre> will return {@code [DruidRel(A), DruidRel(B), DruidRel(C)]}.
*
* @param outermostDruidRel The outermost rel which is to be flattened
* @return a list of DruidRel's which do not have a DruidUnionRel nested in between them
*/
private List<DruidRel<?>> flattenOutermostRel(DruidRel<?> outermostDruidRel)
{
List<DruidRel<?>> druidRels = new ArrayList<>();
flattenOutermostRel(outermostDruidRel, druidRels);
return druidRels;
}
/**
* Recursive function (DFS) which traverses the nodes and collects the corresponding {@link DruidRel} into a list if
* they are not of the type {@link DruidUnionRel} or else calls the method with the child nodes. The DFS order of the
* nodes are retained, since that is the order in which they will actually be called in {@link DruidUnionRel#runQuery()}
*
* @param druidRel The current relNode
* @param flattendListAccumulator Accumulator list which needs to be appended by this method
*/
private void flattenOutermostRel(DruidRel<?> druidRel, List<DruidRel<?>> flattendListAccumulator)
{
if (druidRel instanceof DruidUnionRel) {
DruidUnionRel druidUnionRel = (DruidUnionRel) druidRel;
druidUnionRel.getInputs().forEach(innerRelNode -> {
DruidRel<?> innerDruidRelNode = (DruidRel<?>) innerRelNode; // This type conversion should always be possible
flattenOutermostRel(innerDruidRelNode, flattendListAccumulator);
});
} else {
flattendListAccumulator.add(druidRel);
}
}
protected abstract PlannerResult planForDruid() throws ValidationException;
/**
* Construct a {@link PlannerResult} for a {@link RelNode} that is directly translatable to a native Druid query.
*/
protected PlannerResult planWithDruidConvention() throws ValidationException
{
final RelRoot possiblyLimitedRoot = possiblyWrapRootWithOuterLimitFromContext(rootQueryRel);
final QueryMaker queryMaker = buildQueryMaker(possiblyLimitedRoot);
PlannerContext plannerContext = handlerContext.plannerContext();
plannerContext.setQueryMaker(queryMaker);
// Fall-back dynamic parameter substitution using {@link RelParameterizerShuttle}
// in the event that {@link #rewriteDynamicParameters(SqlNode)} was unable to
// successfully substitute all parameter values, and will cause a failure if any
// dynamic parameters are not bound. This occurs at least for DATE parameters
// with integer values.
//
// This check also catches the case where we did not do a parameter check earlier
// because no values were provided. (Values are not required in the PREPARE case
// but now that we're planning, we require them.)
RelNode parameterized = possiblyLimitedRoot.rel.accept(
new RelParameterizerShuttle(plannerContext)
);
CalcitePlanner planner = handlerContext.planner();
final DruidRel<?> druidRel = (DruidRel<?>) planner.transform(
CalciteRulesManager.DRUID_CONVENTION_RULES,
planner.getEmptyTraitSet()
.replace(DruidConvention.instance())
.plus(rootQueryRel.collation),
parameterized
);
if (explain != null) {
return planExplanation(druidRel, true);
} else {
// Compute row type.
final RelDataType rowType = prepareResult.getReturnedRowType();
// Start the query.
final Supplier<QueryResponse<Object[]>> resultsSupplier = () -> {
// sanity check
final Set<ResourceAction> readResourceActions =
plannerContext.getResourceActions()
.stream()
.filter(action -> action.getAction() == Action.READ)
.collect(Collectors.toSet());
Preconditions.checkState(
readResourceActions.isEmpty() == druidRel.getDataSourceNames().isEmpty()
// The resources found in the plannerContext can be less than the datasources in
// the query plan, because the query planner can eliminate empty tables by replacing
// them with InlineDataSource of empty rows.
|| readResourceActions.size() >= druidRel.getDataSourceNames().size(),
"Authorization sanity check failed"
);
return druidRel.runQuery();
};
return new PlannerResult(resultsSupplier, rowType);
}
}
/**
* This method wraps the root with a {@link LogicalSort} that applies a limit (no ordering change). If the outer rel
* is already a {@link Sort}, we can merge our outerLimit into it, similar to what is going on in
* {@link org.apache.druid.sql.calcite.rule.SortCollapseRule}.
*
* The {@link PlannerContext#CTX_SQL_OUTER_LIMIT} flag that controls this wrapping is meant for internal use only by
* the web console, allowing it to apply a limit to queries without rewriting the original SQL.
*
* @param root root node
*
* @return root node wrapped with a limiting logical sort if a limit is specified in the query context.
*/
@Nullable
private RelRoot possiblyWrapRootWithOuterLimitFromContext(RelRoot root)
{
Object outerLimitObj = handlerContext.queryContext().get(PlannerContext.CTX_SQL_OUTER_LIMIT);
Long outerLimit = DimensionHandlerUtils.convertObjectToLong(outerLimitObj, true);
if (outerLimit == null) {
return root;
}
final LogicalSort newRootRel;
if (root.rel instanceof Sort) {
Sort sort = (Sort) root.rel;
final OffsetLimit originalOffsetLimit = OffsetLimit.fromSort(sort);
final OffsetLimit newOffsetLimit = originalOffsetLimit.andThen(new OffsetLimit(0, outerLimit));
if (newOffsetLimit.equals(originalOffsetLimit)) {
// nothing to do, don't bother to make a new sort
return root;
}
newRootRel = LogicalSort.create(
sort.getInput(),
sort.collation,
newOffsetLimit.getOffsetAsRexNode(rexBuilder),
newOffsetLimit.getLimitAsRexNode(rexBuilder)
);
} else {
newRootRel = LogicalSort.create(
root.rel,
root.collation,
null,
new OffsetLimit(0, outerLimit).getLimitAsRexNode(rexBuilder)
);
}
return new RelRoot(newRootRel, root.validatedRowType, root.kind, root.fields, root.collation);
}
protected abstract QueryMaker buildQueryMaker(RelRoot rootQueryRel) throws ValidationException;
private String buildSQLPlanningErrorMessage(Throwable exception)
{
String errorMessage = handlerContext.plannerContext().getPlanningError();
if (null == errorMessage && exception instanceof UnsupportedSQLQueryException) {
errorMessage = exception.getMessage();
}
if (null == errorMessage) {
errorMessage = "Please check Broker logs for additional details.";
} else {
// Planning errors are more like hints: it isn't guaranteed that the planning error is actually what went wrong.
errorMessage = "Possible error: " + errorMessage;
}
// Finally, add the query itself to error message that user will get.
return StringUtils.format(
"Query not supported. %s SQL was: %s", errorMessage,
handlerContext.plannerContext().getSql()
);
}
public static class SelectHandler extends QueryHandler
{
private final SqlNode sqlNode;
public SelectHandler(
HandlerContext handlerContext,
SqlNode sqlNode,
SqlExplain explain)
{
super(handlerContext, sqlNode, explain);
this.sqlNode = sqlNode;
}
@Override
public SqlNode sqlNode()
{
return sqlNode;
}
@Override
public void validate() throws ValidationException
{
if (!handlerContext.plannerContext().engineHasFeature(EngineFeature.CAN_SELECT)) {
throw new ValidationException(StringUtils.format(
"Cannot execute SELECT with SQL engine '%s'.",
handlerContext.engine().name())
);
}
super.validate();
}
@Override
protected RelDataType returnedRowType()
{
final RelDataTypeFactory typeFactory = rootQueryRel.rel.getCluster().getTypeFactory();
return handlerContext.engine().resultTypeForSelect(
typeFactory,
rootQueryRel.validatedRowType
);
}
@Override
protected PlannerResult planForDruid() throws ValidationException
{
return planWithDruidConvention();
}
@Override
protected QueryMaker buildQueryMaker(final RelRoot rootQueryRel) throws ValidationException
{
return handlerContext.engine().buildQueryMakerForSelect(
rootQueryRel,
handlerContext.plannerContext());
}
}
private static class EnumeratorIterator<T> implements Iterator<T>
{
private final Iterator<T> it;
EnumeratorIterator(Iterator<T> it)
{
this.it = it;
}
@Override
public boolean hasNext()
{
return it.hasNext();
}
@Override
public T next()
{
return it.next();
}
}
}

View File

@ -0,0 +1,76 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.sql.calcite.planner;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.calcite.schema.SchemaPlus;
import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.tools.ValidationException;
import org.apache.druid.query.QueryContext;
import org.apache.druid.server.security.ResourceAction;
import org.apache.druid.sql.calcite.run.SqlEngine;
import org.joda.time.DateTimeZone;
import java.util.Set;
/**
* Handler for a SQL statement. Follows the same lifecycle as the planner,
* however this class handles one specific kind of SQL statement.
*/
public interface SqlStatementHandler
{
SqlNode sqlNode();
void validate() throws ValidationException;
Set<ResourceAction> resourceActions();
void prepare();
PrepareResult prepareResult();
PlannerResult plan() throws ValidationException;
/**
* Context available to statement handlers.
*/
interface HandlerContext
{
PlannerContext plannerContext();
SqlEngine engine();
CalcitePlanner planner();
QueryContext queryContext();
SchemaPlus defaultSchema();
ObjectMapper jsonMapper();
DateTimeZone timeZone();
}
abstract class BaseStatementHandler implements SqlStatementHandler
{
protected final HandlerContext handlerContext;
protected Set<ResourceAction> resourceActions;
protected BaseStatementHandler(HandlerContext handlerContext)
{
this.handlerContext = handlerContext;
}
@Override
public Set<ResourceAction> resourceActions()
{
return resourceActions;
}
}
}
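
The interface above is the contract that each of the new handler classes (SelectHandler, InsertHandler, ReplaceHandler) implements. A minimal sketch of driving the lifecycle; the actual driver is DruidPlanner, which is not part of this excerpt, so treat the helper below as illustrative only:

    // Illustrative lifecycle: validate, then prepare, then plan.
    static PlannerResult validateAndPlan(final SqlStatementHandler handler) throws ValidationException
    {
      handler.validate();    // statement-specific checks; populates resourceActions()
      handler.prepare();     // computes row types, available afterwards via prepareResult()
      return handler.plan(); // PlannerResult.run() then yields a QueryResponse<Object[]>
    }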

View File

@ -133,7 +133,7 @@ public class DruidQuery
@Nullable
private final Sorting sorting;
private final Query query;
private final Query<?> query;
private final RowSignature outputRowSignature;
private final RelDataType outputRowType;
private final VirtualColumnRegistry virtualColumnRegistry;
@ -795,7 +795,7 @@ public class DruidQuery
return outputRowSignature;
}
public Query getQuery()
public Query<?> getQuery()
{
return query;
}
@ -806,7 +806,7 @@ public class DruidQuery
*
* @return Druid query
*/
private Query computeQuery()
private Query<?> computeQuery()
{
if (dataSource instanceof QueryDataSource) {
// If there is a subquery, then we prefer the outer query to be a groupBy if possible, since this potentially

View File

@ -28,7 +28,7 @@ import org.apache.druid.sql.calcite.planner.PlannerContext;
import javax.annotation.Nullable;
import java.util.Set;
public abstract class DruidRel<T extends DruidRel> extends AbstractRelNode
public abstract class DruidRel<T extends DruidRel<?>> extends AbstractRelNode
{
private final PlannerContext plannerContext;
@ -45,7 +45,7 @@ public abstract class DruidRel<T extends DruidRel> extends AbstractRelNode
@Nullable
public abstract PartialDruidQuery getPartialDruidQuery();
public QueryResponse runQuery()
public QueryResponse<Object[]> runQuery()
{
// runQuery doesn't need to finalize aggregations, because the fact that runQuery is happening suggests this
// is the outermost query, and it will actually get run as a native query. Druid's native query layer will

View File

@ -107,11 +107,11 @@ public class DruidUnionRel extends DruidRel<DruidUnionRel>
@Override
@SuppressWarnings({"unchecked", "rawtypes"})
public QueryResponse runQuery()
public QueryResponse<Object[]> runQuery()
{
// Lazy: run each query in sequence, not all at once.
if (limit == 0) {
return new QueryResponse(Sequences.empty(), ResponseContext.createEmpty());
return new QueryResponse<Object[]>(Sequences.empty(), ResponseContext.createEmpty());
} else {
// We run the first rel here for two reasons:
@ -122,10 +122,10 @@ public class DruidUnionRel extends DruidRel<DruidUnionRel>
// is also sub-optimal as it would consume parallel query resources and potentially starve the system.
// Instead, we only return the headers from the first query and potentially exception out and fail the query
// if there are any response headers that come from subsequent queries that are correctness concerns
final QueryResponse queryResponse = ((DruidRel) rels.get(0)).runQuery();
final QueryResponse<Object[]> queryResponse = ((DruidRel) rels.get(0)).runQuery();
final List<Sequence<Object>> firstAsList = Collections.singletonList(queryResponse.getResults());
final Iterable<Sequence<Object>> theRestTransformed = FluentIterable
final List<Sequence<Object[]>> firstAsList = Collections.singletonList(queryResponse.getResults());
final Iterable<Sequence<Object[]>> theRestTransformed = FluentIterable
.from(rels.subList(1, rels.size()))
.transform(
rel -> {
@ -144,10 +144,10 @@ public class DruidUnionRel extends DruidRel<DruidUnionRel>
}
);
final Iterable<Sequence<Object>> recombinedSequences = Iterables.concat(firstAsList, theRestTransformed);
final Iterable<Sequence<Object[]>> recombinedSequences = Iterables.concat(firstAsList, theRestTransformed);
final Sequence returnSequence = Sequences.concat(recombinedSequences);
return new QueryResponse(
return new QueryResponse<Object[]>(
limit > 0 ? returnSequence.limit(limit) : returnSequence,
queryResponse.getResponseContext()
);

View File

@ -94,7 +94,7 @@ public class NativeQueryMaker implements QueryMaker
}
@Override
public QueryResponse runQuery(final DruidQuery druidQuery)
public QueryResponse<Object[]> runQuery(final DruidQuery druidQuery)
{
final Query<?> query = druidQuery.getQuery();
@ -173,7 +173,8 @@ public class NativeQueryMaker implements QueryMaker
.orElseGet(query::getIntervals);
}
private <T> QueryResponse execute(Query<T> query, final List<String> newFields, final List<SqlTypeName> newTypes)
@SuppressWarnings("unchecked")
private <T> QueryResponse<Object[]> execute(Query<?> query, final List<String> newFields, final List<SqlTypeName> newTypes)
{
Hook.QUERY_PLAN.run(query);
@ -195,14 +196,19 @@ public class NativeQueryMaker implements QueryMaker
// otherwise it won't yet be initialized. (A bummer, since ideally, we'd verify the toolChest exists and can do
// array-based results before starting the query; but in practice we don't expect this to happen since we keep
// tight control over which query types we generate in the SQL layer. They all support array-based results.)
final QueryResponse results = queryLifecycle.runSimple(query, authenticationResult, authorizationResult);
final QueryResponse<T> results = queryLifecycle.runSimple((Query<T>) query, authenticationResult, authorizationResult);
return mapResultSequence(results, queryLifecycle.getToolChest(), query, newFields, newTypes);
return mapResultSequence(
results,
(QueryToolChest<T, Query<T>>) queryLifecycle.getToolChest(),
(Query<T>) query,
newFields,
newTypes
);
}
private <T> QueryResponse mapResultSequence(
final QueryResponse results,
private <T> QueryResponse<Object[]> mapResultSequence(
final QueryResponse<T> results,
final QueryToolChest<T, Query<T>> toolChest,
final Query<T> query,
final List<String> newFields,
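The @SuppressWarnings("unchecked") on execute() above reflects a common Java pattern: the method receives only a wildcard Query<?>, but needs the same unknown element type T in several places at once (the query, its tool chest, and the result sequence), so it introduces <T> and casts. A self-contained illustration of the pattern with plain JDK types (not Druid code):
import java.util.List;
import java.util.function.Consumer;

class UncheckedCastSketch
{
  // The caller only has wildcards, but the helper must feed rows of the *same* type T into
  // the sink, so it declares <T> and performs the unchecked casts once, up front.
  @SuppressWarnings("unchecked")
  static <T> void feed(List<?> rows, Consumer<?> sink)
  {
    final List<T> typedRows = (List<T>) rows;          // unchecked: trusted to match the sink
    final Consumer<T> typedSink = (Consumer<T>) sink;  // same trust as the Query/QueryToolChest casts above
    typedRows.forEach(typedSink);
  }
}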

View File

@ -33,5 +33,5 @@ public interface QueryMaker
* created for. The returned arrays match the row type given by {@link SqlEngine#resultTypeForSelect} or
* {@link SqlEngine#resultTypeForInsert}, depending on the nature of the statement.
*/
QueryResponse runQuery(DruidQuery druidQuery);
QueryResponse<Object[]> runQuery(DruidQuery druidQuery);
}
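To make that contract concrete (an illustrative example, not from the patch): for a declared result row type of (dim1 VARCHAR, cnt BIGINT), each Object[] produced by runQuery() is a two-element array in that column order.
final Object[] row = new Object[]{"abc", 7L};  // illustrative row only; values are made up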

View File

@ -128,7 +128,7 @@ public class SqlResource
try {
Thread.currentThread().setName(StringUtils.format("sql[%s]", sqlQueryId));
ResultSet resultSet = stmt.plan();
final QueryResponse response = resultSet.run();
final QueryResponse<Object[]> response = resultSet.run();
final SqlRowTransformer rowTransformer = resultSet.createRowTransformer();
final Yielder<Object[]> finalYielder = Yielders.each(response.getResults());
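A hedged sketch of how the typed results are then drained through the yielder (JSON serialization and error handling simplified; the enclosing method is assumed to declare IOException for Yielder#close):
// Sketch only: streaming the typed results row by row.
Yielder<Object[]> yielder = Yielders.each(response.getResults());
try {
  while (!yielder.isDone()) {
    final Object[] row = yielder.get();   // one SQL result row
    // ... transform via rowTransformer and write to the output stream ...
    yielder = yielder.next(null);         // advance; may block while more rows are produced
  }
}
finally {
  yielder.close();                        // releases query resources even on early termination
}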

View File

@ -43,7 +43,7 @@ import org.apache.druid.sql.calcite.external.ExternalDataSource;
import org.apache.druid.sql.calcite.external.ExternalOperatorConversion;
import org.apache.druid.sql.calcite.filtration.Filtration;
import org.apache.druid.sql.calcite.parser.DruidSqlInsert;
import org.apache.druid.sql.calcite.planner.DruidPlanner;
import org.apache.druid.sql.calcite.planner.IngestHandler;
import org.apache.druid.sql.calcite.planner.PlannerConfig;
import org.apache.druid.sql.calcite.planner.PlannerContext;
import org.apache.druid.sql.calcite.util.CalciteTests;
@ -204,7 +204,7 @@ public class CalciteInsertDmlTest extends CalciteIngestionDmlTest
{
testIngestionQuery()
.sql("INSERT INTO dst (foo, bar) SELECT dim1, dim2 FROM foo PARTITIONED BY ALL TIME")
.expectValidationError(SqlPlanningException.class, "INSERT with target column list is not supported.")
.expectValidationError(SqlPlanningException.class, "INSERT with a target column list is not supported.")
.verify();
}
@ -226,7 +226,7 @@ public class CalciteInsertDmlTest extends CalciteIngestionDmlTest
.sql("INSERT INTO dst SELECT * FROM INFORMATION_SCHEMA.COLUMNS PARTITIONED BY ALL TIME")
.expectValidationError(
SqlPlanningException.class,
"Cannot query table [INFORMATION_SCHEMA.COLUMNS] with SQL engine 'ingestion-test'."
"Cannot query table INFORMATION_SCHEMA.COLUMNS with SQL engine 'ingestion-test'."
)
.verify();
}
@ -238,7 +238,7 @@ public class CalciteInsertDmlTest extends CalciteIngestionDmlTest
.sql("INSERT INTO INFORMATION_SCHEMA.COLUMNS SELECT * FROM foo PARTITIONED BY ALL TIME")
.expectValidationError(
SqlPlanningException.class,
"Cannot INSERT into [INFORMATION_SCHEMA.COLUMNS] because it is not a Druid datasource (schema = druid)."
"Cannot INSERT into INFORMATION_SCHEMA.COLUMNS because it is not a Druid datasource."
)
.verify();
}
@ -250,7 +250,7 @@ public class CalciteInsertDmlTest extends CalciteIngestionDmlTest
.sql("INSERT INTO view.aview SELECT * FROM foo PARTITIONED BY ALL TIME")
.expectValidationError(
SqlPlanningException.class,
"Cannot INSERT into [view.aview] because it is not a Druid datasource (schema = druid)."
"Cannot INSERT into view.aview because it is not a Druid datasource."
)
.verify();
}
@ -280,7 +280,7 @@ public class CalciteInsertDmlTest extends CalciteIngestionDmlTest
.sql("INSERT INTO nonexistent.dst SELECT * FROM foo PARTITIONED BY ALL TIME")
.expectValidationError(
SqlPlanningException.class,
"Cannot INSERT into [nonexistent.dst] because it is not a Druid datasource (schema = druid)."
"Cannot INSERT into nonexistent.dst because it is not a Druid datasource."
)
.verify();
}
@ -435,7 +435,7 @@ public class CalciteInsertDmlTest extends CalciteIngestionDmlTest
)
.expectValidationError(
SqlPlanningException.class,
"CLUSTERED BY found before PARTITIONED BY. In druid, the CLUSTERED BY clause has to be specified after the PARTITIONED BY clause"
"CLUSTERED BY found before PARTITIONED BY. In Druid, the CLUSTERED BY clause must follow the PARTITIONED BY clause"
)
.verify();
}
@ -517,7 +517,7 @@ public class CalciteInsertDmlTest extends CalciteIngestionDmlTest
}
catch (SqlPlanningException e) {
Assert.assertEquals(
"Cannot have ORDER BY on an INSERT query, use CLUSTERED BY instead.",
"Cannot have ORDER BY on an INSERT statement, use CLUSTERED BY instead.",
e.getMessage()
);
}
@ -561,7 +561,7 @@ public class CalciteInsertDmlTest extends CalciteIngestionDmlTest
}
catch (SqlPlanningException e) {
Assert.assertEquals(
"Cannot have ORDER BY on an INSERT query, use CLUSTERED BY instead.",
"Cannot have ORDER BY on an INSERT statement, use CLUSTERED BY instead.",
e.getMessage()
);
}
@ -796,7 +796,7 @@ public class CalciteInsertDmlTest extends CalciteIngestionDmlTest
.sql("INSERT INTO t SELECT dim1, dim2 || '-lol' FROM foo PARTITIONED BY ALL")
.expectValidationError(
SqlPlanningException.class,
DruidPlanner.UNNAMED_INGESTION_COLUMN_ERROR
IngestHandler.UNNAMED_INGESTION_COLUMN_ERROR
)
.verify();
}
@ -808,7 +808,7 @@ public class CalciteInsertDmlTest extends CalciteIngestionDmlTest
.sql("INSERT INTO t SELECT __time, dim1 AS EXPR$0 FROM foo PARTITIONED BY ALL")
.expectValidationError(
SqlPlanningException.class,
DruidPlanner.UNNAMED_INGESTION_COLUMN_ERROR
IngestHandler.UNNAMED_INGESTION_COLUMN_ERROR
)
.verify();
}
@ -822,7 +822,7 @@ public class CalciteInsertDmlTest extends CalciteIngestionDmlTest
+ "(SELECT __time, LOWER(dim1) FROM foo) PARTITIONED BY ALL TIME")
.expectValidationError(
SqlPlanningException.class,
DruidPlanner.UNNAMED_INGESTION_COLUMN_ERROR
IngestHandler.UNNAMED_INGESTION_COLUMN_ERROR
)
.verify();
}

View File

@ -255,7 +255,7 @@ public class CalciteReplaceDmlTest extends CalciteIngestionDmlTest
{
testIngestionQuery()
.sql("REPLACE INTO dst OVERWRITE ALL SELECT * FROM foo ORDER BY dim1 PARTITIONED BY ALL TIME")
.expectValidationError(SqlPlanningException.class, "Cannot have ORDER BY on a REPLACE query, use CLUSTERED BY instead.")
.expectValidationError(SqlPlanningException.class, "Cannot have ORDER BY on a REPLACE statement, use CLUSTERED BY instead.")
.verify();
}
@ -390,7 +390,7 @@ public class CalciteReplaceDmlTest extends CalciteIngestionDmlTest
{
testIngestionQuery()
.sql("REPLACE INTO dst (foo, bar) OVERWRITE ALL SELECT dim1, dim2 FROM foo PARTITIONED BY ALL TIME")
.expectValidationError(SqlPlanningException.class, "REPLACE with target column list is not supported.")
.expectValidationError(SqlPlanningException.class, "REPLACE with a target column list is not supported.")
.verify();
}
@ -408,7 +408,7 @@ public class CalciteReplaceDmlTest extends CalciteIngestionDmlTest
{
testIngestionQuery()
.sql("REPLACE INTO dst OVERWRITE ALL SELECT __time, FLOOR(m1) as floor_m1, dim1 FROM foo CLUSTERED BY dim1")
.expectValidationError(SqlPlanningException.class, "CLUSTERED BY found before PARTITIONED BY. In druid, the CLUSTERED BY clause has to be specified after the PARTITIONED BY clause")
.expectValidationError(SqlPlanningException.class, "CLUSTERED BY found before PARTITIONED BY. In Druid, the CLUSTERED BY clause must follow the PARTITIONED BY clause")
.verify();
}
@ -417,7 +417,7 @@ public class CalciteReplaceDmlTest extends CalciteIngestionDmlTest
{
testIngestionQuery()
.sql("REPLACE INTO dst SELECT * FROM foo PARTITIONED BY ALL TIME")
.expectValidationError(SqlPlanningException.class, "Missing time chunk information in OVERWRITE clause for REPLACE, set it to OVERWRITE WHERE <__time based condition> or set it to overwrite the entire table with OVERWRITE ALL.")
.expectValidationError(SqlPlanningException.class, "Missing time chunk information in OVERWRITE clause for REPLACE. Use OVERWRITE WHERE <__time based condition> or OVERWRITE ALL to overwrite the entire table.")
.verify();
}
@ -426,7 +426,7 @@ public class CalciteReplaceDmlTest extends CalciteIngestionDmlTest
{
testIngestionQuery()
.sql("REPLACE INTO dst OVERWRITE SELECT * FROM foo PARTITIONED BY ALL TIME")
.expectValidationError(SqlPlanningException.class, "Missing time chunk information in OVERWRITE clause for REPLACE, set it to OVERWRITE WHERE <__time based condition> or set it to overwrite the entire table with OVERWRITE ALL.")
.expectValidationError(SqlPlanningException.class, "Missing time chunk information in OVERWRITE clause for REPLACE. Use OVERWRITE WHERE <__time based condition> or OVERWRITE ALL to overwrite the entire table.")
.verify();
}
@ -437,7 +437,7 @@ public class CalciteReplaceDmlTest extends CalciteIngestionDmlTest
.sql("REPLACE INTO INFORMATION_SCHEMA.COLUMNS OVERWRITE ALL SELECT * FROM foo PARTITIONED BY ALL TIME")
.expectValidationError(
SqlPlanningException.class,
"Cannot REPLACE into [INFORMATION_SCHEMA.COLUMNS] because it is not a Druid datasource (schema = druid)."
"Cannot REPLACE into INFORMATION_SCHEMA.COLUMNS because it is not a Druid datasource."
)
.verify();
}
@ -449,7 +449,7 @@ public class CalciteReplaceDmlTest extends CalciteIngestionDmlTest
.sql("REPLACE INTO view.aview OVERWRITE ALL SELECT * FROM foo PARTITIONED BY ALL TIME")
.expectValidationError(
SqlPlanningException.class,
"Cannot REPLACE into [view.aview] because it is not a Druid datasource (schema = druid)."
"Cannot REPLACE into view.aview because it is not a Druid datasource."
)
.verify();
}
@ -479,7 +479,7 @@ public class CalciteReplaceDmlTest extends CalciteIngestionDmlTest
.sql("REPLACE INTO nonexistent.dst OVERWRITE ALL SELECT * FROM foo PARTITIONED BY ALL TIME")
.expectValidationError(
SqlPlanningException.class,
"Cannot REPLACE into [nonexistent.dst] because it is not a Druid datasource (schema = druid)."
"Cannot REPLACE into nonexistent.dst because it is not a Druid datasource."
)
.verify();
}

View File

@ -45,7 +45,7 @@ public class TestInsertQueryMaker implements QueryMaker
}
@Override
public QueryResponse runQuery(final DruidQuery druidQuery)
public QueryResponse<Object[]> runQuery(final DruidQuery druidQuery)
{
// Don't actually execute anything, but do record information that tests will check for.
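A hedged sketch of that "record, don't execute" shape (the recordedQueries field is illustrative, not necessarily what the test class actually uses):
@Override
public QueryResponse<Object[]> runQuery(final DruidQuery druidQuery)
{
  recordedQueries.add(druidQuery.getQuery());  // assumption: a List<Query<?>> the tests inspect later
  return new QueryResponse<Object[]>(Sequences.empty(), ResponseContext.createEmpty());
}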

View File

@ -2077,7 +2077,7 @@ public class SqlResourceTest extends CalciteTestBase
return new ResultSet(plannerResult)
{
@Override
public QueryResponse run()
public QueryResponse<Object[]> run()
{
final Function<Sequence<Object[]>, Sequence<Object[]>> sequenceMapFn =
Optional.ofNullable(sequenceMapFnSupplier.get()).orElse(Function.identity());
@ -2085,12 +2085,12 @@ public class SqlResourceTest extends CalciteTestBase
final NonnullPair<CountDownLatch, Boolean> executeLatch = executeLatchSupplier.get();
if (executeLatch != null) {
if (executeLatch.rhs) {
final QueryResponse resp = super.run();
final QueryResponse<Object[]> resp = super.run();
Sequence<Object[]> sequence = sequenceMapFn.apply(resp.getResults());
executeLatch.lhs.countDown();
final ResponseContext respContext = resp.getResponseContext();
respContext.merge(responseContextSupplier.get());
return new QueryResponse(sequence, respContext);
return new QueryResponse<>(sequence, respContext);
} else {
try {
if (!executeLatch.lhs.await(WAIT_TIMEOUT_SECS, TimeUnit.SECONDS)) {
@ -2103,11 +2103,11 @@ public class SqlResourceTest extends CalciteTestBase
}
}
final QueryResponse resp = super.run();
final QueryResponse<Object[]> resp = super.run();
Sequence<Object[]> sequence = sequenceMapFn.apply(resp.getResults());
final ResponseContext respContext = resp.getResponseContext();
respContext.merge(responseContextSupplier.get());
return new QueryResponse(sequence, respContext);
return new QueryResponse<>(sequence, respContext);
}
};
}