SQL: Morph QueryMakerFactory into SqlEngine. (#12897)

* SQL: Morph QueryMakerFactory into SqlEngine.

Groundwork for introducing an indexing-service-task-based SQL engine
under the umbrella of #12262. Also includes some other changes related
to improving error behavior.

Main changes:

1) Elevate the QueryMakerFactory interface (an extension point that allows
   customization of how queries are made) into SqlEngine. SQL engines
   can influence planner behavior through EngineFeatures, and can fully
   control the mechanics of query execution using QueryMakers.

2) Remove the server-wide QueryMakerFactory choice, in favor of the choice
   being made by the SQL entrypoint. The indexing-service-task-based
   SQL engine would be associated with its own entrypoint, like
   /druid/v2/sql/task.

Other changes:

1) Adjust DruidPlanner to try either DRUID or BINDABLE convention based
   on analysis of the planned rels; never try both. In particular, we
   no longer try BINDABLE when DRUID fails. This simplifies the logic
   and improves error messages.

2) Adjust error message "Cannot build plan for query" to omit the SQL
   query text. Useful because the text can be quite long, which makes it
   easy to miss the text about the problem.

3) Add a feature to block context parameters used internally by the SQL
   planner from being supplied by end users.

4) Add a feature to enable adding row signature to the context for
   Scan queries. This is useful in building the task-based engine.

5) Add saffron.properties file that turns off sets and graphviz dumps
   in "cannot plan" errors. Significantly reduces log spam on the Broker.

* Fixes from CI.

* Changes from review.

* Can vectorize, now that join-to-filter is on by default.

* Checkstyle! And variable renames!

* Remove throws from test.
This commit is contained in:
Gian Merlino 2022-08-14 23:31:19 -07:00 committed by GitHub
parent 846345669d
commit 6c5a43106a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
57 changed files with 1648 additions and 683 deletions

View File

@ -29,6 +29,7 @@ import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.query.QueryContext;
import org.apache.druid.query.QueryContexts;
import org.apache.druid.query.QueryRunnerFactoryConglomerate;
import org.apache.druid.query.aggregation.datasketches.hll.sql.HllSketchApproxCountDistinctSqlAggregator;
@ -53,6 +54,7 @@ import org.apache.druid.sql.calcite.planner.DruidPlanner;
import org.apache.druid.sql.calcite.planner.PlannerConfig;
import org.apache.druid.sql.calcite.planner.PlannerFactory;
import org.apache.druid.sql.calcite.planner.PlannerResult;
import org.apache.druid.sql.calcite.run.SqlEngine;
import org.apache.druid.sql.calcite.schema.DruidSchemaCatalog;
import org.apache.druid.sql.calcite.util.CalciteTests;
import org.apache.druid.sql.calcite.util.SpecificSegmentsQuerySegmentWalker;
@ -405,6 +407,7 @@ public class SqlBenchmark
@Param({STORAGE_MMAP, STORAGE_FRAME_ROW, STORAGE_FRAME_COLUMNAR})
private String storageType;
private SqlEngine engine;
@Nullable
private PlannerFactory plannerFactory;
private final Closer closer = Closer.create();
@ -436,9 +439,9 @@ public class SqlBenchmark
final DruidSchemaCatalog rootSchema =
CalciteTests.createMockRootSchema(conglomerate, walker, plannerConfig, AuthTestUtils.TEST_AUTHORIZER_MAPPER);
engine = CalciteTests.createMockSqlEngine(walker, conglomerate);
plannerFactory = new PlannerFactory(
rootSchema,
CalciteTests.createMockQueryMakerFactory(walker, conglomerate),
createOperatorTable(),
CalciteTests.createExprMacroTable(),
plannerConfig,
@ -513,7 +516,7 @@ public class SqlBenchmark
QueryContexts.VECTORIZE_VIRTUAL_COLUMNS_KEY, vectorize
);
final String sql = QUERIES.get(Integer.parseInt(query));
try (final DruidPlanner planner = plannerFactory.createPlannerForTesting(context, sql)) {
try (final DruidPlanner planner = plannerFactory.createPlannerForTesting(engine, sql, new QueryContext(context))) {
final PlannerResult plannerResult = planner.plan();
final Sequence<Object[]> resultSequence = plannerResult.run();
final Object[] lastRow = resultSequence.accumulate(null, (accumulated, in) -> in);
@ -531,7 +534,7 @@ public class SqlBenchmark
QueryContexts.VECTORIZE_VIRTUAL_COLUMNS_KEY, vectorize
);
final String sql = QUERIES.get(Integer.parseInt(query));
try (final DruidPlanner planner = plannerFactory.createPlannerForTesting(context, sql)) {
try (final DruidPlanner planner = plannerFactory.createPlannerForTesting(engine, sql, new QueryContext(context))) {
final PlannerResult plannerResult = planner.plan();
blackhole.consume(plannerResult);
}

View File

@ -29,6 +29,7 @@ import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.math.expr.ExpressionProcessing;
import org.apache.druid.query.DruidProcessingConfig;
import org.apache.druid.query.QueryContext;
import org.apache.druid.query.QueryContexts;
import org.apache.druid.query.QueryRunnerFactoryConglomerate;
import org.apache.druid.segment.QueryableIndex;
@ -43,6 +44,7 @@ import org.apache.druid.sql.calcite.planner.DruidPlanner;
import org.apache.druid.sql.calcite.planner.PlannerConfig;
import org.apache.druid.sql.calcite.planner.PlannerFactory;
import org.apache.druid.sql.calcite.planner.PlannerResult;
import org.apache.druid.sql.calcite.run.SqlEngine;
import org.apache.druid.sql.calcite.schema.DruidSchemaCatalog;
import org.apache.druid.sql.calcite.util.CalciteTests;
import org.apache.druid.sql.calcite.util.SpecificSegmentsQuerySegmentWalker;
@ -274,6 +276,7 @@ public class SqlExpressionBenchmark
})
private String query;
private SqlEngine engine;
@Nullable
private PlannerFactory plannerFactory;
private Closer closer = Closer.create();
@ -310,9 +313,9 @@ public class SqlExpressionBenchmark
final DruidSchemaCatalog rootSchema =
CalciteTests.createMockRootSchema(conglomerate, walker, plannerConfig, AuthTestUtils.TEST_AUTHORIZER_MAPPER);
engine = CalciteTests.createMockSqlEngine(walker, conglomerate);
plannerFactory = new PlannerFactory(
rootSchema,
CalciteTests.createMockQueryMakerFactory(walker, conglomerate),
CalciteTests.createOperatorTable(),
CalciteTests.createExprMacroTable(),
plannerConfig,
@ -349,7 +352,7 @@ public class SqlExpressionBenchmark
QueryContexts.VECTORIZE_VIRTUAL_COLUMNS_KEY, vectorize
);
final String sql = QUERIES.get(Integer.parseInt(query));
try (final DruidPlanner planner = plannerFactory.createPlannerForTesting(context, sql)) {
try (final DruidPlanner planner = plannerFactory.createPlannerForTesting(engine, sql, new QueryContext(context))) {
final PlannerResult plannerResult = planner.plan();
final Sequence<Object[]> resultSequence = plannerResult.run();
final Object[] lastRow = resultSequence.accumulate(null, (accumulated, in) -> in);

View File

@ -31,6 +31,7 @@ import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.math.expr.ExpressionProcessing;
import org.apache.druid.query.DruidProcessingConfig;
import org.apache.druid.query.QueryContext;
import org.apache.druid.query.QueryContexts;
import org.apache.druid.query.QueryRunnerFactoryConglomerate;
import org.apache.druid.query.expression.TestExprMacroTable;
@ -49,6 +50,7 @@ import org.apache.druid.sql.calcite.planner.DruidPlanner;
import org.apache.druid.sql.calcite.planner.PlannerConfig;
import org.apache.druid.sql.calcite.planner.PlannerFactory;
import org.apache.druid.sql.calcite.planner.PlannerResult;
import org.apache.druid.sql.calcite.run.SqlEngine;
import org.apache.druid.sql.calcite.schema.DruidSchemaCatalog;
import org.apache.druid.sql.calcite.util.CalciteTests;
import org.apache.druid.sql.calcite.util.SpecificSegmentsQuerySegmentWalker;
@ -216,6 +218,7 @@ public class SqlNestedDataBenchmark
})
private String query;
private SqlEngine engine;
@Nullable
private PlannerFactory plannerFactory;
private Closer closer = Closer.create();
@ -234,7 +237,6 @@ public class SqlNestedDataBenchmark
.build();
final PlannerConfig plannerConfig = new PlannerConfig();
final SegmentGenerator segmentGenerator = closer.register(new SegmentGenerator());
@ -277,9 +279,9 @@ public class SqlNestedDataBenchmark
final DruidSchemaCatalog rootSchema =
CalciteTests.createMockRootSchema(conglomerate, walker, plannerConfig, AuthTestUtils.TEST_AUTHORIZER_MAPPER);
engine = CalciteTests.createMockSqlEngine(walker, conglomerate);
plannerFactory = new PlannerFactory(
rootSchema,
CalciteTests.createMockQueryMakerFactory(walker, conglomerate),
CalciteTests.createOperatorTable(),
CalciteTests.createExprMacroTable(),
plannerConfig,
@ -316,7 +318,7 @@ public class SqlNestedDataBenchmark
QueryContexts.VECTORIZE_VIRTUAL_COLUMNS_KEY, vectorize
);
final String sql = QUERIES.get(Integer.parseInt(query));
try (final DruidPlanner planner = plannerFactory.createPlannerForTesting(context, sql)) {
try (final DruidPlanner planner = plannerFactory.createPlannerForTesting(engine, sql, new QueryContext(context))) {
final PlannerResult plannerResult = planner.plan();
final Sequence<Object[]> resultSequence = plannerResult.run();
final Object[] lastRow = resultSequence.accumulate(null, (accumulated, in) -> in);

View File

@ -26,6 +26,7 @@ import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.query.QueryContext;
import org.apache.druid.query.QueryPlus;
import org.apache.druid.query.QueryRunnerFactoryConglomerate;
import org.apache.druid.query.aggregation.CountAggregatorFactory;
@ -44,6 +45,7 @@ import org.apache.druid.sql.calcite.planner.DruidPlanner;
import org.apache.druid.sql.calcite.planner.PlannerConfig;
import org.apache.druid.sql.calcite.planner.PlannerFactory;
import org.apache.druid.sql.calcite.planner.PlannerResult;
import org.apache.druid.sql.calcite.run.SqlEngine;
import org.apache.druid.sql.calcite.schema.DruidSchemaCatalog;
import org.apache.druid.sql.calcite.util.CalciteTests;
import org.apache.druid.sql.calcite.util.SpecificSegmentsQuerySegmentWalker;
@ -85,6 +87,7 @@ public class SqlVsNativeBenchmark
}
private SpecificSegmentsQuerySegmentWalker walker;
private SqlEngine engine;
private PlannerFactory plannerFactory;
private GroupByQuery groupByQuery;
private String sqlQuery;
@ -115,9 +118,9 @@ public class SqlVsNativeBenchmark
this.walker = closer.register(new SpecificSegmentsQuerySegmentWalker(conglomerate).add(dataSegment, index));
final DruidSchemaCatalog rootSchema =
CalciteTests.createMockRootSchema(conglomerate, walker, plannerConfig, AuthTestUtils.TEST_AUTHORIZER_MAPPER);
engine = CalciteTests.createMockSqlEngine(walker, conglomerate);
plannerFactory = new PlannerFactory(
rootSchema,
CalciteTests.createMockQueryMakerFactory(walker, conglomerate),
CalciteTests.createOperatorTable(),
CalciteTests.createExprMacroTable(),
plannerConfig,
@ -164,7 +167,7 @@ public class SqlVsNativeBenchmark
@OutputTimeUnit(TimeUnit.MILLISECONDS)
public void queryPlanner(Blackhole blackhole) throws Exception
{
try (final DruidPlanner planner = plannerFactory.createPlannerForTesting(null, sqlQuery)) {
try (final DruidPlanner planner = plannerFactory.createPlannerForTesting(engine, sqlQuery, new QueryContext())) {
final PlannerResult plannerResult = planner.plan();
final Sequence<Object[]> resultSequence = plannerResult.run();
final Object[] lastRow = resultSequence.accumulate(null, (accumulated, in) -> in);

View File

@ -104,6 +104,7 @@ public class DirectStatement extends AbstractStatement implements Cancelable
public Sequence<Object[]> execute()
{
try (DruidPlanner planner = sqlToolbox.plannerFactory.createPlanner(
sqlToolbox.engine,
queryPlus.sql(),
queryPlus.context())) {
validate(planner);

View File

@ -25,7 +25,6 @@ import org.apache.druid.server.security.ResourceAction;
import org.apache.druid.sql.http.SqlQuery;
import javax.servlet.http.HttpServletRequest;
import java.util.Set;
import java.util.function.Function;

View File

@ -20,7 +20,6 @@
package org.apache.druid.sql;
import org.apache.calcite.avatica.remote.TypedValue;
import org.apache.calcite.tools.ValidationException;
import org.apache.druid.sql.calcite.planner.DruidPlanner;
import org.apache.druid.sql.calcite.planner.PrepareResult;
@ -67,20 +66,16 @@ public class PreparedStatement extends AbstractStatement
public PrepareResult prepare()
{
try (DruidPlanner planner = sqlToolbox.plannerFactory.createPlanner(
sqlToolbox.engine,
queryPlus.sql(),
queryPlus.context())) {
validate(planner);
authorize(planner, authorizer());
// Do the prepare step.
try {
this.prepareResult = planner.prepare();
return prepareResult;
}
catch (ValidationException e) {
throw new SqlPlanningException(e);
}
}
catch (RuntimeException e) {
reporter.failed(e);
throw e;

View File

@ -19,61 +19,15 @@
package org.apache.druid.sql;
import com.google.common.base.Supplier;
import com.google.inject.Inject;
import org.apache.druid.guice.LazySingleton;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.query.DefaultQueryConfig;
import org.apache.druid.server.QueryScheduler;
import org.apache.druid.server.log.RequestLogger;
import org.apache.druid.server.security.AuthConfig;
import org.apache.druid.sql.calcite.planner.PlannerFactory;
import org.apache.druid.sql.http.SqlQuery;
import javax.servlet.http.HttpServletRequest;
@LazySingleton
public class SqlStatementFactory
public interface SqlStatementFactory
{
protected final SqlToolbox lifecycleToolbox;
HttpStatement httpStatement(SqlQuery sqlQuery, HttpServletRequest req);
@Inject
public SqlStatementFactory(
final PlannerFactory plannerFactory,
final ServiceEmitter emitter,
final RequestLogger requestLogger,
final QueryScheduler queryScheduler,
final AuthConfig authConfig,
final Supplier<DefaultQueryConfig> defaultQueryConfig,
final SqlLifecycleManager sqlLifecycleManager
)
{
this.lifecycleToolbox = new SqlToolbox(
plannerFactory,
emitter,
requestLogger,
queryScheduler,
authConfig,
defaultQueryConfig.get(),
sqlLifecycleManager
);
}
DirectStatement directStatement(SqlQueryPlus sqlRequest);
public HttpStatement httpStatement(
final SqlQuery sqlQuery,
final HttpServletRequest req
)
{
return new HttpStatement(lifecycleToolbox, sqlQuery, req);
}
public DirectStatement directStatement(final SqlQueryPlus sqlRequest)
{
return new DirectStatement(lifecycleToolbox, sqlRequest);
}
public PreparedStatement preparedStatement(final SqlQueryPlus sqlRequest)
{
return new PreparedStatement(lifecycleToolbox, sqlRequest);
}
PreparedStatement preparedStatement(SqlQueryPlus sqlRequest);
}

View File

@ -0,0 +1,105 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.sql;
import com.google.common.base.Supplier;
import com.google.inject.Inject;
import org.apache.druid.guice.LazySingleton;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.query.DefaultQueryConfig;
import org.apache.druid.server.QueryScheduler;
import org.apache.druid.server.log.RequestLogger;
import org.apache.druid.server.security.AuthConfig;
import org.apache.druid.sql.calcite.planner.PlannerFactory;
import org.apache.druid.sql.calcite.run.SqlEngine;
import org.apache.druid.sql.http.SqlQuery;
import javax.servlet.http.HttpServletRequest;
/**
* Factory factories: when design patterns go too far.
*
* Almost everything we need to create a {@link SqlStatementFactory} is injectable, except for the {@link SqlEngine}.
* So this class exists to produce {@link SqlStatementFactory} once the engine for a query is known.
*/
@LazySingleton
public class SqlStatementFactoryFactory
{
protected final SqlToolbox lifecycleToolbox;
@Inject
public SqlStatementFactoryFactory(
final PlannerFactory plannerFactory,
final ServiceEmitter emitter,
final RequestLogger requestLogger,
final QueryScheduler queryScheduler,
final AuthConfig authConfig,
final Supplier<DefaultQueryConfig> defaultQueryConfig,
final SqlLifecycleManager sqlLifecycleManager
)
{
this.lifecycleToolbox = new SqlToolbox(
null,
plannerFactory,
emitter,
requestLogger,
queryScheduler,
authConfig,
defaultQueryConfig.get(),
sqlLifecycleManager
);
}
public SqlStatementFactory factorize(final SqlEngine engine)
{
return new FactoryImpl(lifecycleToolbox.withEngine(engine));
}
private static class FactoryImpl implements SqlStatementFactory
{
private final SqlToolbox lifecycleToolbox;
public FactoryImpl(SqlToolbox lifecycleToolbox)
{
this.lifecycleToolbox = lifecycleToolbox;
}
@Override
public HttpStatement httpStatement(
final SqlQuery sqlQuery,
final HttpServletRequest req
)
{
return new HttpStatement(lifecycleToolbox, sqlQuery, req);
}
@Override
public DirectStatement directStatement(final SqlQueryPlus sqlRequest)
{
return new DirectStatement(lifecycleToolbox, sqlRequest);
}
@Override
public PreparedStatement preparedStatement(final SqlQueryPlus sqlRequest)
{
return new PreparedStatement(lifecycleToolbox, sqlRequest);
}
}
}

View File

@ -26,12 +26,14 @@ import org.apache.druid.server.QueryScheduler;
import org.apache.druid.server.log.RequestLogger;
import org.apache.druid.server.security.AuthConfig;
import org.apache.druid.sql.calcite.planner.PlannerFactory;
import org.apache.druid.sql.calcite.run.SqlEngine;
/**
* Provides the plan and execution resources to process SQL queries.
*/
public class SqlToolbox
{
final SqlEngine engine;
final PlannerFactory plannerFactory;
final ServiceEmitter emitter;
final RequestLogger requestLogger;
@ -41,6 +43,7 @@ public class SqlToolbox
final SqlLifecycleManager sqlLifecycleManager;
public SqlToolbox(
final SqlEngine engine,
final PlannerFactory plannerFactory,
final ServiceEmitter emitter,
final RequestLogger requestLogger,
@ -50,6 +53,7 @@ public class SqlToolbox
final SqlLifecycleManager sqlLifecycleManager
)
{
this.engine = engine;
this.plannerFactory = plannerFactory;
this.emitter = emitter;
this.requestLogger = requestLogger;
@ -58,4 +62,18 @@ public class SqlToolbox
this.defaultQueryConfig = defaultQueryConfig;
this.sqlLifecycleManager = Preconditions.checkNotNull(sqlLifecycleManager, "sqlLifecycleManager");
}
public SqlToolbox withEngine(final SqlEngine engine)
{
return new SqlToolbox(
engine,
plannerFactory,
emitter,
requestLogger,
queryScheduler,
authConfig,
defaultQueryConfig,
sqlLifecycleManager
);
}
}

View File

@ -77,7 +77,7 @@ public abstract class AbstractDruidJdbcStatement implements Closeable
params.add(createParameter(field, type));
}
return Meta.Signature.create(
createColumnMetaData(prepareResult.getRowType()),
createColumnMetaData(prepareResult.getReturnedRowType()),
sql,
params,
Meta.CursorFactory.ARRAY,

View File

@ -78,7 +78,7 @@ public class DruidConnection
return connectionId;
}
public DruidJdbcStatement createStatement(SqlStatementFactory sqlLifecycleFactory)
public DruidJdbcStatement createStatement(SqlStatementFactory sqlStatementFactory)
{
final int statementId = statementCounter.incrementAndGet();
@ -98,7 +98,7 @@ public class DruidConnection
connectionId,
statementId,
context,
sqlLifecycleFactory
sqlStatementFactory
);
statements.put(statementId, statement);
@ -108,7 +108,7 @@ public class DruidConnection
}
public DruidJdbcPreparedStatement createPreparedStatement(
SqlStatementFactory sqlLifecycleFactory,
SqlStatementFactory sqlStatementFactory,
SqlQueryPlus sqlRequest,
final long maxRowCount)
{
@ -126,7 +126,7 @@ public class DruidConnection
}
@SuppressWarnings("GuardedBy")
final PreparedStatement statement = sqlLifecycleFactory.preparedStatement(
final PreparedStatement statement = sqlStatementFactory.preparedStatement(
sqlRequest.withContext(context)
);
final DruidJdbcPreparedStatement jdbcStmt = new DruidJdbcPreparedStatement(

View File

@ -21,7 +21,6 @@ package org.apache.druid.sql.avatica;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
@ -38,9 +37,9 @@ import org.apache.calcite.avatica.QueryState;
import org.apache.calcite.avatica.remote.AvaticaRuntimeException;
import org.apache.calcite.avatica.remote.Service.ErrorResponse;
import org.apache.calcite.avatica.remote.TypedValue;
import org.apache.druid.guice.LazySingleton;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.UOE;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.query.QueryContext;
@ -50,13 +49,14 @@ import org.apache.druid.server.security.AuthenticatorMapper;
import org.apache.druid.server.security.ForbiddenException;
import org.apache.druid.sql.SqlQueryPlus;
import org.apache.druid.sql.SqlStatementFactory;
import org.apache.druid.sql.SqlStatementFactoryFactory;
import org.apache.druid.sql.calcite.planner.Calcites;
import org.apache.druid.sql.calcite.planner.PlannerContext;
import org.apache.druid.sql.calcite.run.NativeSqlEngine;
import org.joda.time.Interval;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
@ -70,6 +70,7 @@ import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
@LazySingleton
public class DruidMeta extends MetaImpl
{
/**
@ -105,7 +106,7 @@ public class DruidMeta extends MetaImpl
"user", "password"
);
private final SqlStatementFactory sqlLifecycleFactory;
private final SqlStatementFactory sqlStatementFactory;
private final ScheduledExecutorService exec;
private final AvaticaServerConfig config;
private final List<Authenticator> authenticators;
@ -124,25 +125,41 @@ public class DruidMeta extends MetaImpl
@Inject
public DruidMeta(
final SqlStatementFactory sqlLifecycleFactory,
final NativeSqlEngine engine,
final SqlStatementFactoryFactory sqlStatementFactoryFactory,
final AvaticaServerConfig config,
final ErrorHandler errorHandler,
final Injector injector
)
{
super(null);
this.sqlLifecycleFactory = Preconditions.checkNotNull(sqlLifecycleFactory, "sqlLifecycleFactory");
this.config = config;
this.errorHandler = errorHandler;
this.exec = Executors.newSingleThreadScheduledExecutor(
this(
sqlStatementFactoryFactory.factorize(engine),
config,
errorHandler,
Executors.newSingleThreadScheduledExecutor(
new ThreadFactoryBuilder()
.setNameFormat(StringUtils.format("DruidMeta@%s-ScheduledExecutor", Integer.toHexString(hashCode())))
.setNameFormat("DruidMeta-ScheduledExecutor-%d")
.setDaemon(true)
.build()
),
injector.getInstance(AuthenticatorMapper.class).getAuthenticatorChain()
);
}
final AuthenticatorMapper authenticatorMapper = injector.getInstance(AuthenticatorMapper.class);
this.authenticators = authenticatorMapper.getAuthenticatorChain();
public DruidMeta(
final SqlStatementFactory sqlStatementFactory,
final AvaticaServerConfig config,
final ErrorHandler errorHandler,
final ScheduledExecutorService exec,
final List<Authenticator> authenticators
)
{
super(null);
this.sqlStatementFactory = sqlStatementFactory;
this.config = config;
this.errorHandler = errorHandler;
this.exec = exec;
this.authenticators = authenticators;
}
@Override
@ -219,7 +236,7 @@ public class DruidMeta extends MetaImpl
public StatementHandle createStatement(final ConnectionHandle ch)
{
try {
final DruidJdbcStatement druidStatement = getDruidConnection(ch.id).createStatement(sqlLifecycleFactory);
final DruidJdbcStatement druidStatement = getDruidConnection(ch.id).createStatement(sqlStatementFactory);
return new StatementHandle(ch.id, druidStatement.getStatementId(), null);
}
catch (NoSuchConnectionException e) {
@ -251,7 +268,7 @@ public class DruidMeta extends MetaImpl
doAuthenticate(druidConnection)
);
DruidJdbcPreparedStatement stmt = getDruidConnection(ch.id).createPreparedStatement(
sqlLifecycleFactory,
sqlStatementFactory,
sqlReq,
maxRowCount);
stmt.prepare();

View File

@ -23,7 +23,7 @@ import org.apache.calcite.plan.RelOptRule;
import org.apache.calcite.plan.RelOptRuleCall;
import org.apache.druid.sql.calcite.planner.PlannerContext;
import org.apache.druid.sql.calcite.rel.DruidQueryRel;
import org.apache.druid.sql.calcite.run.QueryFeature;
import org.apache.druid.sql.calcite.run.EngineFeature;
/**
* Rule that converts an {@link ExternalTableScan} to a call to {@link DruidQueryRel#scanExternal}.
@ -43,10 +43,15 @@ public class ExternalTableScanRule extends RelOptRule
@Override
public boolean matches(RelOptRuleCall call)
{
if (plannerContext.getQueryMaker().feature(QueryFeature.CAN_READ_EXTERNAL_DATA)) {
if (plannerContext.engineHasFeature(EngineFeature.READ_EXTERNAL_DATA)) {
return super.matches(call);
} else {
plannerContext.setPlanningError("SQL query requires scanning external datasources that is not suported.");
plannerContext.setPlanningError(
"Cannot use '%s' with SQL engine '%s'.",
ExternalOperatorConversion.FUNCTION_NAME,
plannerContext.getEngine().name()
);
return false;
}
}
@ -54,6 +59,11 @@ public class ExternalTableScanRule extends RelOptRule
@Override
public void onMatch(final RelOptRuleCall call)
{
if (!plannerContext.engineHasFeature(EngineFeature.READ_EXTERNAL_DATA)) {
// Not called because "matches" returns false.
throw new UnsupportedOperationException();
}
final ExternalTableScan scan = call.rel(0);
call.transformTo(DruidQueryRel.scanExternal(scan, plannerContext));
}

View File

@ -20,40 +20,21 @@
package org.apache.druid.sql.calcite.planner;
import com.google.inject.Binder;
import com.google.inject.Key;
import com.google.inject.Module;
import com.google.inject.multibindings.Multibinder;
import org.apache.druid.guice.JsonConfigProvider;
import org.apache.druid.guice.LazySingleton;
import org.apache.druid.guice.PolyBind;
import org.apache.druid.sql.calcite.rule.ExtensionCalciteRuleProvider;
import org.apache.druid.sql.calcite.run.NativeQueryMakerFactory;
import org.apache.druid.sql.calcite.run.QueryMakerFactory;
/**
* The module responsible for provide bindings for the Calcite Planner.
*/
public class CalcitePlannerModule implements Module
{
public static final String PROPERTY_SQL_EXECUTOR_TYPE = "druid.sql.executor.type";
@Override
public void configure(Binder binder)
{
JsonConfigProvider.bind(binder, "druid.sql.planner", PlannerConfig.class);
PolyBind.optionBinder(binder, Key.get(QueryMakerFactory.class))
.addBinding(NativeQueryMakerFactory.TYPE)
.to(NativeQueryMakerFactory.class)
.in(LazySingleton.class);
PolyBind.createChoiceWithDefault(
binder,
PROPERTY_SQL_EXECUTOR_TYPE,
Key.get(QueryMakerFactory.class),
NativeQueryMakerFactory.TYPE
);
binder.bind(PlannerFactory.class).in(LazySingleton.class);
binder.bind(DruidOperatorTable.class).in(LazySingleton.class);
Multibinder.newSetBinder(binder, ExtensionCalciteRuleProvider.class);

View File

@ -58,7 +58,6 @@ import org.joda.time.format.DateTimeFormatterBuilder;
import org.joda.time.format.ISODateTimeFormat;
import javax.annotation.Nullable;
import java.math.BigDecimal;
import java.nio.charset.Charset;
import java.sql.Date;

View File

@ -24,6 +24,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.base.Supplier;
import com.google.common.base.Suppliers;
@ -37,15 +38,19 @@ import org.apache.calcite.jdbc.CalciteSchema;
import org.apache.calcite.linq4j.Enumerable;
import org.apache.calcite.linq4j.Enumerator;
import org.apache.calcite.plan.RelOptPlanner;
import org.apache.calcite.plan.RelOptTable;
import org.apache.calcite.plan.RelOptUtil;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.RelRoot;
import org.apache.calcite.rel.RelVisitor;
import org.apache.calcite.rel.core.Sort;
import org.apache.calcite.rel.core.TableScan;
import org.apache.calcite.rel.logical.LogicalSort;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeFactory;
import org.apache.calcite.rex.RexBuilder;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.schema.ScannableTable;
import org.apache.calcite.sql.SqlExplain;
import org.apache.calcite.sql.SqlIdentifier;
import org.apache.calcite.sql.SqlInsert;
@ -81,13 +86,14 @@ import org.apache.druid.sql.calcite.rel.DruidConvention;
import org.apache.druid.sql.calcite.rel.DruidQuery;
import org.apache.druid.sql.calcite.rel.DruidRel;
import org.apache.druid.sql.calcite.rel.DruidUnionRel;
import org.apache.druid.sql.calcite.run.EngineFeature;
import org.apache.druid.sql.calcite.run.QueryMaker;
import org.apache.druid.sql.calcite.run.QueryMakerFactory;
import org.apache.druid.sql.calcite.run.SqlEngine;
import org.apache.druid.sql.calcite.table.DruidTable;
import org.apache.druid.utils.Throwables;
import org.joda.time.DateTimeZone;
import javax.annotation.Nullable;
import java.io.Closeable;
import java.util.ArrayList;
import java.util.HashSet;
@ -127,7 +133,7 @@ public class DruidPlanner implements Closeable
private final FrameworkConfig frameworkConfig;
private final CalcitePlanner planner;
private final PlannerContext plannerContext;
private final QueryMakerFactory queryMakerFactory;
private final SqlEngine engine;
private State state = State.START;
private ParsedNodes parsed;
private SqlNode validatedQueryNode;
@ -140,13 +146,13 @@ public class DruidPlanner implements Closeable
DruidPlanner(
final FrameworkConfig frameworkConfig,
final PlannerContext plannerContext,
final QueryMakerFactory queryMakerFactory
final SqlEngine engine
)
{
this.frameworkConfig = frameworkConfig;
this.planner = new CalcitePlanner(frameworkConfig);
this.plannerContext = plannerContext;
this.queryMakerFactory = queryMakerFactory;
this.engine = engine;
}
/**
@ -158,9 +164,22 @@ public class DruidPlanner implements Closeable
public void validate() throws SqlParseException, ValidationException
{
Preconditions.checkState(state == State.START);
// Validate query context.
engine.validateContext(plannerContext.getQueryContext());
// Parse the query string.
SqlNode root = planner.parse(plannerContext.getSql());
parsed = ParsedNodes.create(root, plannerContext.getTimeZone());
if (parsed.isSelect() && !plannerContext.engineHasFeature(EngineFeature.CAN_SELECT)) {
throw new ValidationException(StringUtils.format("Cannot execute SELECT with SQL engine '%s'.", engine.name()));
} else if (parsed.isInsert() && !plannerContext.engineHasFeature(EngineFeature.CAN_INSERT)) {
throw new ValidationException(StringUtils.format("Cannot execute INSERT with SQL engine '%s'.", engine.name()));
} else if (parsed.isReplace() && !plannerContext.engineHasFeature(EngineFeature.CAN_REPLACE)) {
throw new ValidationException(StringUtils.format("Cannot execute REPLACE with SQL engine '%s'.", engine.name()));
}
try {
if (parsed.getIngestionGranularity() != null) {
plannerContext.getQueryContext().addSystemParam(
@ -205,11 +224,17 @@ public class DruidPlanner implements Closeable
resourceActions = new HashSet<>(resourceCollectorShuttle.getResourceActions());
if (parsed.getInsertOrReplace() != null) {
if (parsed.isInsert() || parsed.isReplace()) {
// Check if CTX_SQL_OUTER_LIMIT is specified and fail the query if it is. CTX_SQL_OUTER_LIMIT being provided causes
// the number of rows inserted to be limited which is likely to be confusing and unintended.
if (plannerContext.getQueryContext().get(PlannerContext.CTX_SQL_OUTER_LIMIT) != null) {
throw new ValidationException(PlannerContext.CTX_SQL_OUTER_LIMIT + " cannot be provided on INSERT or REPLACE queries.");
throw new ValidationException(
StringUtils.format(
"%s cannot be provided with %s.",
PlannerContext.CTX_SQL_OUTER_LIMIT,
parsed.getInsertOrReplace().getOperator().getName()
)
);
}
final String targetDataSource = validateAndGetDataSourceForIngest(parsed.getInsertOrReplace());
resourceActions.add(new ResourceAction(new Resource(targetDataSource, ResourceType.DATASOURCE), Action.WRITE));
@ -232,17 +257,17 @@ public class DruidPlanner implements Closeable
* {@link org.apache.druid.sql.calcite.view.DruidViewMacro} prepares
* a view while having no information about the user of that view.
*/
public PrepareResult prepare() throws ValidationException
public PrepareResult prepare()
{
Preconditions.checkState(state == State.VALIDATED);
rootQueryRel = planner.rel(validatedQueryNode);
doPrepare(null);
doPrepare();
state = State.PREPARED;
return prepareResult;
}
private void doPrepare(@Nullable QueryMaker queryMaker) throws ValidationException
private void doPrepare()
{
final RelDataTypeFactory typeFactory = rootQueryRel.rel.getCluster().getTypeFactory();
final SqlValidator validator = planner.getValidator();
@ -251,14 +276,14 @@ public class DruidPlanner implements Closeable
if (parsed.getExplainNode() != null) {
returnedRowType = getExplainStructType(typeFactory);
} else if (parsed.isSelect()) {
returnedRowType = engine.resultTypeForSelect(typeFactory, rootQueryRel.validatedRowType);
} else {
if (queryMaker == null) {
queryMaker = buildQueryMaker(rootQueryRel, parsed.getInsertOrReplace());
}
returnedRowType = queryMaker.getResultType();
assert parsed.insertOrReplace != null;
returnedRowType = engine.resultTypeForInsert(typeFactory, rootQueryRel.validatedRowType);
}
prepareResult = new PrepareResult(returnedRowType, parameterTypes);
prepareResult = new PrepareResult(rootQueryRel.validatedRowType, returnedRowType, parameterTypes);
}
/**
@ -266,6 +291,7 @@ public class DruidPlanner implements Closeable
* step within the planner's state machine.
*
* @param authorizer a function from resource actions to a {@link Access} result.
*
* @return the return value from the authorizer
*/
public Access authorize(Function<Set<ResourceAction>, Access> authorizer, boolean authorizeContextParams)
@ -318,31 +344,47 @@ public class DruidPlanner implements Closeable
rootQueryRel = planner.rel(validatedQueryNode);
}
final Set<RelOptTable> bindableTables = getBindableTables(rootQueryRel.rel);
// the planner's type factory is not available until after parsing
this.rexBuilder = new RexBuilder(planner.getTypeFactory());
state = State.PLANNED;
try {
if (!bindableTables.isEmpty()) {
// Consider BINDABLE convention when necessary. Used for metadata tables.
if (parsed.isInsert() || parsed.isReplace()) {
// Throws ValidationException if the target table is itself bindable.
validateAndGetDataSourceForIngest(parsed.getInsertOrReplace());
}
if (!plannerContext.engineHasFeature(EngineFeature.ALLOW_BINDABLE_PLAN)) {
throw new ValidationException(
StringUtils.format(
"Cannot query table%s [%s] with SQL engine '%s'.",
bindableTables.size() != 1 ? "s" : "",
bindableTables.stream()
.map(table -> Joiner.on(".").join(table.getQualifiedName()))
.collect(Collectors.joining(", ")),
engine.name()
)
);
}
return planWithBindableConvention(rootQueryRel, parsed.getExplainNode());
} else {
// DRUID convention is used whenever there are no tables that require BINDABLE.
return planWithDruidConvention(rootQueryRel, parsed.getExplainNode(), parsed.getInsertOrReplace());
}
}
catch (Exception e) {
Throwable cannotPlanException = Throwables.getCauseOfType(e, RelOptPlanner.CannotPlanException.class);
if (null == cannotPlanException) {
// Not a CannotPlanException, rethrow without trying with bindable
// Not a CannotPlanException, rethrow without logging.
throw e;
}
// If there isn't any ingestion clause, then we should try again with BINDABLE convention. And return without
// any error, if it is plannable by the bindable convention
if (parsed.getInsertOrReplace() == null) {
// Try again with BINDABLE convention. Used for querying Values and metadata tables.
try {
return planWithBindableConvention(rootQueryRel, parsed.getExplainNode());
}
catch (Exception e2) {
e.addSuppressed(e2);
}
}
Logger logger = log;
if (!plannerContext.getQueryContext().isDebug()) {
logger = log.noStackTrace();
@ -382,7 +424,7 @@ public class DruidPlanner implements Closeable
final QueryMaker queryMaker = buildQueryMaker(possiblyLimitedRoot, insertOrReplace);
plannerContext.setQueryMaker(queryMaker);
if (prepareResult == null) {
doPrepare(queryMaker);
doPrepare();
}
// Fall-back dynamic parameter substitution using {@link RelParameterizerShuttle}
@ -408,8 +450,19 @@ public class DruidPlanner implements Closeable
if (explain != null) {
return planExplanation(druidRel, explain, true);
} else {
final Supplier<Sequence<Object[]>> resultsSupplier = () -> {
// Compute row type.
final RelDataTypeFactory typeFactory = rootQueryRel.rel.getCluster().getTypeFactory();
final RelDataType rowType;
if (parsed.isSelect()) {
rowType = engine.resultTypeForSelect(typeFactory, rootQueryRel.validatedRowType);
} else {
assert parsed.insertOrReplace != null;
rowType = engine.resultTypeForInsert(typeFactory, rootQueryRel.validatedRowType);
}
// Start the query.
final Supplier<Sequence<Object[]>> resultsSupplier = () -> {
// sanity check
final Set<ResourceAction> readResourceActions =
plannerContext.getResourceActions()
@ -428,7 +481,7 @@ public class DruidPlanner implements Closeable
return druidRel.runQuery();
};
return new PlannerResult(resultsSupplier, queryMaker.getResultType());
return new PlannerResult(resultsSupplier, rowType);
}
}
@ -449,6 +502,10 @@ public class DruidPlanner implements Closeable
@Nullable final SqlExplain explain
)
{
if (prepareResult == null) {
doPrepare();
}
BindableRel bindableRel = (BindableRel) planner.transform(
CalciteRulesManager.BINDABLE_CONVENTION_RULES,
planner.getEmptyTraitSet().replace(BindableConvention.INSTANCE).plus(root.collation),
@ -641,6 +698,7 @@ public class DruidPlanner implements Closeable
* the web console, allowing it to apply a limit to queries without rewriting the original SQL.
*
* @param root root node
*
* @return root node wrapped with a limiting logical sort if a limit is specified in the query context.
*/
@Nullable
@ -691,9 +749,9 @@ public class DruidPlanner implements Closeable
if (insertOrReplace != null) {
final String targetDataSource = validateAndGetDataSourceForIngest(insertOrReplace);
validateColumnsForIngestion(rootQueryRel);
return queryMakerFactory.buildForInsert(targetDataSource, rootQueryRel, plannerContext);
return engine.buildQueryMakerForInsert(targetDataSource, rootQueryRel, plannerContext);
} else {
return queryMakerFactory.buildForSelect(rootQueryRel, plannerContext);
return engine.buildQueryMakerForSelect(rootQueryRel, plannerContext);
}
}
@ -741,7 +799,12 @@ public class DruidPlanner implements Closeable
dataSource = tableIdentifier.names.get(1);
} else {
throw new ValidationException(
StringUtils.format("Cannot %s into [%s] because it is not a Druid datasource.", operatorName, tableIdentifier)
StringUtils.format(
"Cannot %s into [%s] because it is not a Druid datasource (schema = %s).",
operatorName,
tableIdentifier,
defaultSchemaName
)
);
}
}
@ -773,13 +836,39 @@ public class DruidPlanner implements Closeable
errorMessage = exception.getMessage();
}
if (null == errorMessage) {
errorMessage = "Please check broker logs for more details";
errorMessage = "Please check Broker logs for more details.";
} else {
// Re-phrase since planning errors are more like hints
errorMessage = "Possible error: " + errorMessage;
}
// Finally, add the query itself to error message that user will get.
return StringUtils.format("Cannot build plan for query: %s. %s", plannerContext.getSql(), errorMessage);
return StringUtils.format("Cannot build plan for query. %s", errorMessage);
}
private static Set<RelOptTable> getBindableTables(final RelNode relNode)
{
class HasBindableVisitor extends RelVisitor
{
private final Set<RelOptTable> found = new HashSet<>();
@Override
public void visit(RelNode node, int ordinal, RelNode parent)
{
if (node instanceof TableScan) {
RelOptTable table = node.getTable();
if (table.unwrap(ScannableTable.class) != null && table.unwrap(DruidTable.class) == null) {
found.add(table);
return;
}
}
super.visit(node, ordinal, parent);
}
}
final HasBindableVisitor visitor = new HasBindableVisitor();
visitor.go(relNode);
return visitor.found;
}
private static class EnumeratorIterator<T> implements Iterator<T>
@ -924,6 +1013,21 @@ public class DruidPlanner implements Closeable
return explain;
}
public boolean isSelect()
{
return insertOrReplace == null;
}
public boolean isInsert()
{
return insertOrReplace != null && !isReplace();
}
public boolean isReplace()
{
return insertOrReplace instanceof DruidSqlReplace;
}
@Nullable
public SqlInsert getInsertOrReplace()
{

View File

@ -39,7 +39,9 @@ import org.apache.druid.server.security.Access;
import org.apache.druid.server.security.AuthenticationResult;
import org.apache.druid.server.security.ResourceAction;
import org.apache.druid.sql.calcite.rel.VirtualColumnRegistry;
import org.apache.druid.sql.calcite.run.EngineFeature;
import org.apache.druid.sql.calcite.run.QueryMaker;
import org.apache.druid.sql.calcite.run.SqlEngine;
import org.apache.druid.sql.calcite.schema.DruidSchemaCatalog;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
@ -79,6 +81,7 @@ public class PlannerContext
private final PlannerConfig plannerConfig;
private final DateTime localNow;
private final DruidSchemaCatalog rootSchema;
private final SqlEngine engine;
private final QueryContext queryContext;
private final String sqlQueryId;
private final boolean stringifyArrays;
@ -106,6 +109,7 @@ public class PlannerContext
final DateTime localNow,
final boolean stringifyArrays,
final DruidSchemaCatalog rootSchema,
final SqlEngine engine,
final QueryContext queryContext
)
{
@ -115,6 +119,7 @@ public class PlannerContext
this.jsonMapper = jsonMapper;
this.plannerConfig = Preconditions.checkNotNull(plannerConfig, "plannerConfig");
this.rootSchema = rootSchema;
this.engine = engine;
this.queryContext = queryContext;
this.localNow = Preconditions.checkNotNull(localNow, "localNow");
this.stringifyArrays = stringifyArrays;
@ -134,6 +139,7 @@ public class PlannerContext
final ObjectMapper jsonMapper,
final PlannerConfig plannerConfig,
final DruidSchemaCatalog rootSchema,
final SqlEngine engine,
final QueryContext queryContext
)
{
@ -172,6 +178,7 @@ public class PlannerContext
utcNow.withZone(timeZone),
stringifyArrays,
rootSchema,
engine,
queryContext
);
}
@ -383,6 +390,16 @@ public class PlannerContext
this.queryMaker = Preconditions.checkNotNull(queryMaker, "queryMaker");
}
public SqlEngine getEngine()
{
return engine;
}
public boolean engineHasFeature(final EngineFeature feature)
{
return engine.feature(feature, this);
}
public QueryMaker getQueryMaker()
{
return Preconditions.checkNotNull(queryMaker, "QueryMaker not available");

View File

@ -45,11 +45,10 @@ import org.apache.druid.server.security.AuthorizerMapper;
import org.apache.druid.server.security.NoopEscalator;
import org.apache.druid.sql.calcite.parser.DruidSqlParserImplFactory;
import org.apache.druid.sql.calcite.planner.convertlet.DruidConvertletTable;
import org.apache.druid.sql.calcite.run.QueryMakerFactory;
import org.apache.druid.sql.calcite.run.SqlEngine;
import org.apache.druid.sql.calcite.schema.DruidSchemaCatalog;
import org.apache.druid.sql.calcite.schema.DruidSchemaName;
import java.util.Map;
import java.util.Properties;
public class PlannerFactory
@ -65,7 +64,6 @@ public class PlannerFactory
.build();
private final DruidSchemaCatalog rootSchema;
private final QueryMakerFactory queryMakerFactory;
private final DruidOperatorTable operatorTable;
private final ExprMacroTable macroTable;
private final PlannerConfig plannerConfig;
@ -77,7 +75,6 @@ public class PlannerFactory
@Inject
public PlannerFactory(
final DruidSchemaCatalog rootSchema,
final QueryMakerFactory queryMakerFactory,
final DruidOperatorTable operatorTable,
final ExprMacroTable macroTable,
final PlannerConfig plannerConfig,
@ -88,7 +85,6 @@ public class PlannerFactory
)
{
this.rootSchema = rootSchema;
this.queryMakerFactory = queryMakerFactory;
this.operatorTable = operatorTable;
this.macroTable = macroTable;
this.plannerConfig = plannerConfig;
@ -101,7 +97,7 @@ public class PlannerFactory
/**
* Create a Druid query planner from an initial query context
*/
public DruidPlanner createPlanner(final String sql, final QueryContext queryContext)
public DruidPlanner createPlanner(final SqlEngine engine, final String sql, final QueryContext queryContext)
{
final PlannerContext context = PlannerContext.create(
sql,
@ -110,10 +106,11 @@ public class PlannerFactory
jsonMapper,
plannerConfig,
rootSchema,
engine,
queryContext
);
return new DruidPlanner(buildFrameworkConfig(context), context, queryMakerFactory);
return new DruidPlanner(buildFrameworkConfig(context), context, engine);
}
/**
@ -121,9 +118,9 @@ public class PlannerFactory
* and ready to go authorization result.
*/
@VisibleForTesting
public DruidPlanner createPlannerForTesting(final Map<String, Object> queryContext, String query)
public DruidPlanner createPlannerForTesting(final SqlEngine engine, final String sql, final QueryContext queryContext)
{
final DruidPlanner thePlanner = createPlanner(query, new QueryContext(queryContext));
final DruidPlanner thePlanner = createPlanner(engine, sql, queryContext);
thePlanner.getPlannerContext()
.setAuthenticationResult(NoopEscalator.getInstance().createEscalatedAuthenticationResult());
try {

View File

@ -56,6 +56,9 @@ public class PlannerResult
return resultsSupplier.get();
}
/**
* Row type returned to the end user. Equivalent to {@link PrepareResult#getReturnedRowType()}.
*/
public RelDataType rowType()
{
return rowType;

View File

@ -27,20 +27,46 @@ import org.apache.calcite.rel.type.RelDataType;
*/
public class PrepareResult
{
private final RelDataType rowType;
private final RelDataType validatedRowType;
private final RelDataType returnedRowType;
private final RelDataType parameterRowType;
public PrepareResult(final RelDataType rowType, final RelDataType parameterRowType)
public PrepareResult(
final RelDataType validatedRowType,
final RelDataType returnedRowType,
final RelDataType parameterRowType
)
{
this.rowType = rowType;
this.validatedRowType = validatedRowType;
this.returnedRowType = returnedRowType;
this.parameterRowType = parameterRowType;
}
public RelDataType getRowType()
/**
* Row type from {@link org.apache.calcite.rel.RelRoot#validatedRowType} prepared by {@link DruidPlanner#prepare()}.
* Corresponds to the SELECT portion of a SQL statement. For SELECT, this is the row type of the SELECT itself.
* For EXPLAIN PLAN FOR SELECT, INSERT ... SELECT, or REPLACE ... SELECT, this is the row type of the
* embedded SELECT.
*/
public RelDataType getValidatedRowType()
{
return rowType;
return validatedRowType;
}
/**
* Row type for the result that the end user will receive. Different from {@link #getValidatedRowType()} in
* cases like EXPLAIN (where the user gets an explanation, not the query results) and other non-SELECT
* statements.
*/
public RelDataType getReturnedRowType()
{
return returnedRowType;
}
/**
* Row type from {@link org.apache.calcite.sql.validate.SqlValidator#getParameterRowType} containing the
* name and type of each parameter.
*/
public RelDataType getParameterRowType()
{
return parameterRowType;

View File

@ -19,6 +19,7 @@
package org.apache.druid.sql.calcite.rel;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
@ -48,6 +49,7 @@ import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.query.DataSource;
import org.apache.druid.query.JoinDataSource;
import org.apache.druid.query.Query;
import org.apache.druid.query.QueryContext;
import org.apache.druid.query.QueryDataSource;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.LongMaxAggregatorFactory;
@ -72,6 +74,7 @@ import org.apache.druid.query.topn.TopNMetricSpec;
import org.apache.druid.query.topn.TopNQuery;
import org.apache.druid.segment.VirtualColumn;
import org.apache.druid.segment.VirtualColumns;
import org.apache.druid.segment.column.ColumnCapabilities;
import org.apache.druid.segment.column.ColumnHolder;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.column.RowSignature;
@ -86,8 +89,7 @@ import org.apache.druid.sql.calcite.planner.Calcites;
import org.apache.druid.sql.calcite.planner.OffsetLimit;
import org.apache.druid.sql.calcite.planner.PlannerContext;
import org.apache.druid.sql.calcite.rule.GroupByRules;
import org.apache.druid.sql.calcite.run.QueryFeature;
import org.apache.druid.sql.calcite.run.QueryFeatureInspector;
import org.apache.druid.sql.calcite.run.EngineFeature;
import org.apache.druid.sql.calcite.table.RowSignatures;
import javax.annotation.Nonnull;
@ -111,6 +113,11 @@ import java.util.stream.Collectors;
*/
public class DruidQuery
{
/**
* Native query context key that is set when {@link EngineFeature#SCAN_NEEDS_SIGNATURE}.
*/
public static final String CTX_SCAN_SIGNATURE = "scanSignature";
private final DataSource dataSource;
private final PlannerContext plannerContext;
@ -130,6 +137,7 @@ public class DruidQuery
private final RowSignature outputRowSignature;
private final RelDataType outputRowType;
private final VirtualColumnRegistry virtualColumnRegistry;
private final RowSignature sourceRowSignature;
private DruidQuery(
final DataSource dataSource,
@ -149,10 +157,12 @@ public class DruidQuery
this.selectProjection = selectProjection;
this.grouping = grouping;
this.sorting = sorting;
this.sourceRowSignature = sourceRowSignature;
this.outputRowSignature = computeOutputRowSignature(sourceRowSignature, selectProjection, grouping, sorting);
this.outputRowType = Preconditions.checkNotNull(outputRowType, "outputRowType");
this.virtualColumnRegistry = Preconditions.checkNotNull(virtualColumnRegistry, "virtualColumnRegistry");
this.query = computeQuery(plannerContext.getQueryMaker());
this.query = computeQuery();
}
public static DruidQuery fromPartialQuery(
@ -796,40 +806,40 @@ public class DruidQuery
*
* @return Druid query
*/
private Query computeQuery(final QueryFeatureInspector queryFeatureInspector)
private Query computeQuery()
{
if (dataSource instanceof QueryDataSource) {
// If there is a subquery, then we prefer the outer query to be a groupBy if possible, since this potentially
// enables more efficient execution. (The groupBy query toolchest can handle some subqueries by itself, without
// requiring the Broker to inline results.)
final GroupByQuery outerQuery = toGroupByQuery(queryFeatureInspector);
final GroupByQuery outerQuery = toGroupByQuery();
if (outerQuery != null) {
return outerQuery;
}
}
final TimeBoundaryQuery timeBoundaryQuery = toTimeBoundaryQuery(queryFeatureInspector);
final TimeBoundaryQuery timeBoundaryQuery = toTimeBoundaryQuery();
if (timeBoundaryQuery != null) {
return timeBoundaryQuery;
}
final TimeseriesQuery tsQuery = toTimeseriesQuery(queryFeatureInspector);
final TimeseriesQuery tsQuery = toTimeseriesQuery();
if (tsQuery != null) {
return tsQuery;
}
final TopNQuery topNQuery = toTopNQuery(queryFeatureInspector);
final TopNQuery topNQuery = toTopNQuery();
if (topNQuery != null) {
return topNQuery;
}
final GroupByQuery groupByQuery = toGroupByQuery(queryFeatureInspector);
final GroupByQuery groupByQuery = toGroupByQuery();
if (groupByQuery != null) {
return groupByQuery;
}
final ScanQuery scanQuery = toScanQuery(queryFeatureInspector);
final ScanQuery scanQuery = toScanQuery();
if (scanQuery != null) {
return scanQuery;
}
@ -843,9 +853,9 @@ public class DruidQuery
* @return a TimeBoundaryQuery if possible. null if it is not possible to construct one.
*/
@Nullable
private TimeBoundaryQuery toTimeBoundaryQuery(QueryFeatureInspector queryFeatureInspector)
private TimeBoundaryQuery toTimeBoundaryQuery()
{
if (!queryFeatureInspector.feature(QueryFeature.CAN_RUN_TIME_BOUNDARY)
if (!plannerContext.engineHasFeature(EngineFeature.TIME_BOUNDARY_QUERY)
|| grouping == null
|| grouping.getSubtotals().hasEffect(grouping.getDimensionSpecs())
|| grouping.getHavingFilter() != null
@ -907,9 +917,9 @@ public class DruidQuery
* @return query
*/
@Nullable
private TimeseriesQuery toTimeseriesQuery(final QueryFeatureInspector queryFeatureInspector)
private TimeseriesQuery toTimeseriesQuery()
{
if (!queryFeatureInspector.feature(QueryFeature.CAN_RUN_TIMESERIES)
if (!plannerContext.engineHasFeature(EngineFeature.TIMESERIES_QUERY)
|| grouping == null
|| grouping.getSubtotals().hasEffect(grouping.getDimensionSpecs())
|| grouping.getHavingFilter() != null) {
@ -1019,10 +1029,10 @@ public class DruidQuery
* @return query or null
*/
@Nullable
private TopNQuery toTopNQuery(final QueryFeatureInspector queryFeatureInspector)
private TopNQuery toTopNQuery()
{
// Must be allowed by the QueryMaker.
if (!queryFeatureInspector.feature(QueryFeature.CAN_RUN_TOPN)) {
if (!plannerContext.engineHasFeature(EngineFeature.TOPN_QUERY)) {
return null;
}
@ -1114,7 +1124,7 @@ public class DruidQuery
* @return query or null
*/
@Nullable
private GroupByQuery toGroupByQuery(final QueryFeatureInspector queryFeatureInspector)
private GroupByQuery toGroupByQuery()
{
if (grouping == null) {
return null;
@ -1227,7 +1237,7 @@ public class DruidQuery
* @return query or null
*/
@Nullable
private ScanQuery toScanQuery(final QueryFeatureInspector queryFeatureInspector)
private ScanQuery toScanQuery()
{
if (grouping != null) {
// Scan cannot GROUP BY.
@ -1278,7 +1288,7 @@ public class DruidQuery
orderByColumns = Collections.emptyList();
}
if (!queryFeatureInspector.feature(QueryFeature.SCAN_CAN_ORDER_BY_NON_TIME) && !orderByColumns.isEmpty()) {
if (!plannerContext.engineHasFeature(EngineFeature.SCAN_ORDER_BY_NON_TIME) && !orderByColumns.isEmpty()) {
if (orderByColumns.size() > 1 || !ColumnHolder.TIME_COLUMN_NAME.equals(orderByColumns.get(0).getColumnName())) {
// Cannot handle this ordering.
// Scan cannot ORDER BY non-time columns.
@ -1303,10 +1313,13 @@ public class DruidQuery
final SortedSet<String> scanColumns = new TreeSet<>(outputRowSignature.getColumnNames());
orderByColumns.forEach(column -> scanColumns.add(column.getColumnName()));
final VirtualColumns virtualColumns = getVirtualColumns(true);
final ImmutableList<String> scanColumnsList = ImmutableList.copyOf(scanColumns);
return new ScanQuery(
newDataSource,
filtration.getQuerySegmentSpec(),
getVirtualColumns(true),
virtualColumns,
ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST,
0,
scanOffset,
@ -1314,9 +1327,56 @@ public class DruidQuery
null,
orderByColumns,
filtration.getDimFilter(),
ImmutableList.copyOf(scanColumns),
scanColumnsList,
false,
ImmutableSortedMap.copyOf(plannerContext.getQueryContext().getMergedParams())
withScanSignatureIfNeeded(
virtualColumns,
scanColumnsList,
plannerContext.getQueryContext()
).getMergedParams()
);
}
/**
* Returns a copy of "queryContext" with {@link #CTX_SCAN_SIGNATURE} added if the execution context has the
* {@link EngineFeature#SCAN_NEEDS_SIGNATURE} feature.
*/
private QueryContext withScanSignatureIfNeeded(
final VirtualColumns virtualColumns,
final List<String> scanColumns,
final QueryContext queryContext
)
{
if (plannerContext.engineHasFeature(EngineFeature.SCAN_NEEDS_SIGNATURE)) {
// Compute the signature of the columns that we are selecting.
final RowSignature.Builder scanSignatureBuilder = RowSignature.builder();
for (final String columnName : scanColumns) {
final ColumnCapabilities capabilities =
virtualColumns.getColumnCapabilitiesWithFallback(sourceRowSignature, columnName);
if (capabilities == null) {
// No type for this column. This is a planner bug.
throw new ISE("No type for column [%s]", columnName);
}
scanSignatureBuilder.add(columnName, capabilities.toColumnType());
}
final RowSignature signature = scanSignatureBuilder.build();
try {
queryContext.addSystemParam(
CTX_SCAN_SIGNATURE,
plannerContext.getJsonMapper().writeValueAsString(signature)
);
return queryContext;
}
catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
} else {
return queryContext;
}
}
}

View File

@ -20,35 +20,63 @@
package org.apache.druid.sql.calcite.run;
import org.apache.druid.sql.calcite.external.ExternalDataSource;
import org.apache.druid.sql.calcite.planner.PlannerContext;
/**
* Arguments to {@link QueryFeatureInspector#feature(QueryFeature)}.
* Arguments to {@link SqlEngine#feature(EngineFeature, PlannerContext)}.
*/
public enum QueryFeature
public enum EngineFeature
{
/**
* Can execute SELECT statements.
*/
CAN_SELECT,
/**
* Can execute INSERT statements.
*/
CAN_INSERT,
/**
* Can execute REPLACE statements.
*/
CAN_REPLACE,
/**
* Queries of type {@link org.apache.druid.query.timeseries.TimeseriesQuery} are usable.
*/
CAN_RUN_TIMESERIES,
TIMESERIES_QUERY,
/**
* Queries of type {@link org.apache.druid.query.topn.TopNQuery} are usable.
*/
CAN_RUN_TOPN,
TOPN_QUERY,
/**
* Queries of type {@link org.apache.druid.query.timeboundary.TimeBoundaryQuery} are usable.
*/
TIME_BOUNDARY_QUERY,
/**
* Queries can use {@link ExternalDataSource}.
*/
CAN_READ_EXTERNAL_DATA,
READ_EXTERNAL_DATA,
/**
* Scan queries can use {@link org.apache.druid.query.scan.ScanQuery#getOrderBys()} that are based on something
* other than the "__time" column.
*/
SCAN_CAN_ORDER_BY_NON_TIME,
SCAN_ORDER_BY_NON_TIME,
/**
* Queries of type {@link org.apache.druid.query.timeboundary.TimeBoundaryQuery} are usable.
* Scan queries must have {@link org.apache.druid.sql.calcite.rel.DruidQuery#CTX_SCAN_SIGNATURE} set in their
* query contexts.
*/
CAN_RUN_TIME_BOUNDARY
SCAN_NEEDS_SIGNATURE,
/**
* Planner is permitted to use a {@link org.apache.calcite.runtime.Bindable} plan on local resources, instead
* of {@link QueryMaker}, for SELECT query implementation. Used for system tables and the like.
*/
ALLOW_BINDABLE_PLAN
}

View File

@ -25,14 +25,12 @@ import com.google.common.collect.Iterables;
import com.google.common.primitives.Ints;
import it.unimi.dsi.fastutil.objects.Object2IntMap;
import it.unimi.dsi.fastutil.objects.Object2IntOpenHashMap;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.runtime.Hook;
import org.apache.calcite.sql.type.SqlTypeName;
import org.apache.calcite.util.NlsString;
import org.apache.calcite.util.Pair;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.StringUtils;
@ -42,7 +40,6 @@ import org.apache.druid.java.util.common.guava.Sequences;
import org.apache.druid.math.expr.Evals;
import org.apache.druid.query.InlineDataSource;
import org.apache.druid.query.Query;
import org.apache.druid.query.QueryContexts;
import org.apache.druid.query.QueryToolChest;
import org.apache.druid.query.filter.BoundDimFilter;
import org.apache.druid.query.filter.DimFilter;
@ -80,44 +77,18 @@ public class NativeQueryMaker implements QueryMaker
private final PlannerContext plannerContext;
private final ObjectMapper jsonMapper;
private final List<Pair<Integer, String>> fieldMapping;
private final RelDataType resultType;
public NativeQueryMaker(
final QueryLifecycleFactory queryLifecycleFactory,
final PlannerContext plannerContext,
final ObjectMapper jsonMapper,
final List<Pair<Integer, String>> fieldMapping,
final RelDataType resultType
final List<Pair<Integer, String>> fieldMapping
)
{
this.queryLifecycleFactory = queryLifecycleFactory;
this.plannerContext = plannerContext;
this.jsonMapper = jsonMapper;
this.fieldMapping = fieldMapping;
this.resultType = resultType;
}
@Override
public RelDataType getResultType()
{
return resultType;
}
@Override
public boolean feature(QueryFeature feature)
{
switch (feature) {
case CAN_RUN_TIMESERIES:
case CAN_RUN_TOPN:
return true;
case CAN_READ_EXTERNAL_DATA:
case SCAN_CAN_ORDER_BY_NON_TIME:
return false;
case CAN_RUN_TIME_BOUNDARY:
return QueryContexts.isTimeBoundaryPlanningEnabled(plannerContext.getQueryContext().getMergedParams());
default:
throw new IAE("Unrecognized feature: %s", feature);
}
}
@Override

View File

@ -1,69 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.sql.calcite.run;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.inject.Inject;
import org.apache.calcite.rel.RelRoot;
import org.apache.calcite.tools.ValidationException;
import org.apache.druid.guice.LazySingleton;
import org.apache.druid.server.QueryLifecycleFactory;
import org.apache.druid.sql.calcite.planner.PlannerContext;
@LazySingleton
public class NativeQueryMakerFactory implements QueryMakerFactory
{
public static final String TYPE = "native";
private final QueryLifecycleFactory queryLifecycleFactory;
private final ObjectMapper jsonMapper;
@Inject
public NativeQueryMakerFactory(
final QueryLifecycleFactory queryLifecycleFactory,
final ObjectMapper jsonMapper
)
{
this.queryLifecycleFactory = queryLifecycleFactory;
this.jsonMapper = jsonMapper;
}
@Override
public QueryMaker buildForSelect(final RelRoot relRoot, final PlannerContext plannerContext)
{
return new NativeQueryMaker(
queryLifecycleFactory,
plannerContext,
jsonMapper,
relRoot.fields,
relRoot.validatedRowType
);
}
@Override
public QueryMaker buildForInsert(
final String targetDataSource,
final RelRoot relRoot,
final PlannerContext plannerContext
) throws ValidationException
{
throw new ValidationException("Cannot execute INSERT queries.");
}
}

View File

@ -0,0 +1,138 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.sql.calcite.run;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableSet;
import com.google.inject.Inject;
import org.apache.calcite.rel.RelRoot;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeFactory;
import org.apache.calcite.tools.ValidationException;
import org.apache.druid.guice.LazySingleton;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.query.QueryContext;
import org.apache.druid.query.QueryContexts;
import org.apache.druid.query.groupby.GroupByQuery;
import org.apache.druid.query.timeboundary.TimeBoundaryQuery;
import org.apache.druid.server.QueryLifecycleFactory;
import org.apache.druid.sql.calcite.parser.DruidSqlInsert;
import org.apache.druid.sql.calcite.parser.DruidSqlReplace;
import org.apache.druid.sql.calcite.planner.PlannerContext;
import org.apache.druid.sql.calcite.rel.DruidQuery;
import java.util.Set;
@LazySingleton
public class NativeSqlEngine implements SqlEngine
{
public static final Set<String> SYSTEM_CONTEXT_PARAMETERS = ImmutableSet.of(
TimeBoundaryQuery.MAX_TIME_ARRAY_OUTPUT_NAME,
TimeBoundaryQuery.MIN_TIME_ARRAY_OUTPUT_NAME,
GroupByQuery.CTX_TIMESTAMP_RESULT_FIELD,
GroupByQuery.CTX_TIMESTAMP_RESULT_FIELD_GRANULARITY,
GroupByQuery.CTX_TIMESTAMP_RESULT_FIELD_INDEX,
DruidQuery.CTX_SCAN_SIGNATURE,
DruidSqlInsert.SQL_INSERT_SEGMENT_GRANULARITY,
DruidSqlReplace.SQL_REPLACE_TIME_CHUNKS
);
private static final String NAME = "native";
private final QueryLifecycleFactory queryLifecycleFactory;
private final ObjectMapper jsonMapper;
@Inject
public NativeSqlEngine(
final QueryLifecycleFactory queryLifecycleFactory,
final ObjectMapper jsonMapper
)
{
this.queryLifecycleFactory = queryLifecycleFactory;
this.jsonMapper = jsonMapper;
}
@Override
public String name()
{
return NAME;
}
@Override
public void validateContext(QueryContext queryContext) throws ValidationException
{
SqlEngines.validateNoSpecialContextKeys(queryContext, SYSTEM_CONTEXT_PARAMETERS);
}
@Override
public RelDataType resultTypeForSelect(RelDataTypeFactory typeFactory, RelDataType validatedRowType)
{
return validatedRowType;
}
@Override
public RelDataType resultTypeForInsert(RelDataTypeFactory typeFactory, RelDataType validatedRowType)
{
throw new UnsupportedOperationException();
}
@Override
public boolean feature(EngineFeature feature, PlannerContext plannerContext)
{
switch (feature) {
case CAN_SELECT:
case ALLOW_BINDABLE_PLAN:
case TIMESERIES_QUERY:
case TOPN_QUERY:
return true;
case TIME_BOUNDARY_QUERY:
return QueryContexts.isTimeBoundaryPlanningEnabled(plannerContext.getQueryContext().getMergedParams());
case CAN_INSERT:
case CAN_REPLACE:
case READ_EXTERNAL_DATA:
case SCAN_ORDER_BY_NON_TIME:
case SCAN_NEEDS_SIGNATURE:
return false;
default:
throw new IAE("Unrecognized feature: %s", feature);
}
}
@Override
public QueryMaker buildQueryMakerForSelect(final RelRoot relRoot, final PlannerContext plannerContext)
{
return new NativeQueryMaker(
queryLifecycleFactory,
plannerContext,
jsonMapper,
relRoot.fields
);
}
@Override
public QueryMaker buildQueryMakerForInsert(
final String targetDataSource,
final RelRoot relRoot,
final PlannerContext plannerContext
)
{
throw new UnsupportedOperationException();
}
}

View File

@ -1,32 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.sql.calcite.run;
/**
* Gives the SQL-to-Druid query translator information about what features are supporetd by the {@link QueryMaker}
* that will execute the query.
*/
public interface QueryFeatureInspector
{
/**
* Returns whether a feature is present or not.
*/
boolean feature(QueryFeature feature);
}

View File

@ -19,24 +19,19 @@
package org.apache.druid.sql.calcite.run;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.sql.calcite.rel.DruidQuery;
/**
* Interface for executing Druid queries. Each one is created by a {@link QueryMakerFactory} and is tied to a
* specific SQL query. Extends {@link QueryFeatureInspector}, so calling code can tell what this executor supports.
* Interface for executing Druid queries. Each one is created by a {@link SqlEngine} and is tied to a
* specific SQL query.
*/
public interface QueryMaker extends QueryFeatureInspector
public interface QueryMaker
{
/**
* Returns the SQL row type for this query.
*/
RelDataType getResultType();
/**
* Executes a given Druid query, which is expected to correspond to the SQL query that this QueryMaker was originally
* created for. The returned arrays match the row type given by {@link #getResultType()}.
* created for. The returned arrays match the row type given by {@link SqlEngine#resultTypeForSelect} or
* {@link SqlEngine#resultTypeForInsert}, depending on the nature of the statement.
*/
Sequence<Object[]> runQuery(DruidQuery druidQuery);
}

View File

@ -1,60 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.sql.calcite.run;
import org.apache.calcite.rel.RelRoot;
import org.apache.calcite.tools.ValidationException;
import org.apache.druid.sql.calcite.planner.PlannerContext;
/**
* Interface for creating {@link QueryMaker}, which in turn are used to execute Druid queries.
*/
public interface QueryMakerFactory
{
/**
* Create a {@link QueryMaker} for a SELECT query.
*
* @param relRoot planned and validated rel
* @param plannerContext context for this query
*
* @return an executor for the provided query
*
* @throws ValidationException if this factory cannot build an executor for the provided query
*/
@SuppressWarnings("RedundantThrows")
QueryMaker buildForSelect(RelRoot relRoot, PlannerContext plannerContext) throws ValidationException;
/**
* Create a {@link QueryMaker} for an INSERT ... SELECT query.
*
* @param targetDataSource datasource for the INSERT portion of the query
* @param relRoot planned and validated rel for the SELECT portion of the query
* @param plannerContext context for this query
*
* @return an executor for the provided query
*
* @throws ValidationException if this factory cannot build an executor for the provided query
*/
QueryMaker buildForInsert(
String targetDataSource,
RelRoot relRoot,
PlannerContext plannerContext
) throws ValidationException;
}

View File

@ -0,0 +1,98 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.sql.calcite.run;
import org.apache.calcite.rel.RelRoot;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeFactory;
import org.apache.calcite.tools.ValidationException;
import org.apache.druid.query.QueryContext;
import org.apache.druid.sql.calcite.planner.PlannerContext;
/**
* Engine for running SQL queries.
*/
public interface SqlEngine
{
/**
* Name of this engine. Appears in user-visible messages.
*/
String name();
/**
* Whether a feature applies to this engine or not.
*/
boolean feature(EngineFeature feature, PlannerContext plannerContext);
/**
* Validates a provided query context. Returns quietly if the context is OK; throws {@link ValidationException}
* if the context has a problem.
*/
void validateContext(QueryContext queryContext) throws ValidationException;
/**
* SQL row type that would be emitted by the {@link QueryMaker} from {@link #buildQueryMakerForSelect}.
* Called for SELECT. Not called for EXPLAIN, which is handled by the planner itself.
*
* @param typeFactory type factory
* @param validatedRowType row type from Calcite's validator
*/
RelDataType resultTypeForSelect(RelDataTypeFactory typeFactory, RelDataType validatedRowType);
/**
* SQL row type that would be emitted by the {@link QueryMaker} from {@link #buildQueryMakerForInsert}.
* Called for INSERT and REPLACE. Not called for EXPLAIN, which is handled by the planner itself.
*
* @param typeFactory type factory
* @param validatedRowType row type from Calcite's validator
*/
RelDataType resultTypeForInsert(RelDataTypeFactory typeFactory, RelDataType validatedRowType);
/**
* Create a {@link QueryMaker} for a SELECT query.
*
* @param relRoot planned and validated rel
* @param plannerContext context for this query
*
* @return an executor for the provided query
*
* @throws ValidationException if this engine cannot build an executor for the provided query
*/
@SuppressWarnings("RedundantThrows")
QueryMaker buildQueryMakerForSelect(RelRoot relRoot, PlannerContext plannerContext) throws ValidationException;
/**
* Create a {@link QueryMaker} for an INSERT ... SELECT query.
*
* @param targetDataSource datasource for the INSERT portion of the query
* @param relRoot planned and validated rel for the SELECT portion of the query
* @param plannerContext context for this query
*
* @return an executor for the provided query
*
* @throws ValidationException if this engine cannot build an executor for the provided query
*/
@SuppressWarnings("RedundantThrows")
QueryMaker buildQueryMakerForInsert(
String targetDataSource,
RelRoot relRoot,
PlannerContext plannerContext
) throws ValidationException;
}

View File

@ -0,0 +1,52 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.sql.calcite.run;
import org.apache.calcite.tools.ValidationException;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.query.QueryContext;
import java.util.Set;
public class SqlEngines
{
/**
* Validates that a provided query context does not have any of the internal, special context keys listed in the
* {@code specialContextKeys} collection.
*
* Returns quietly if the context is OK; throws {@link ValidationException} if there is a problem.
*
* This is a helper function used by {@link SqlEngine#validateContext} implementations.
*/
public static void validateNoSpecialContextKeys(final QueryContext queryContext, final Set<String> specialContextKeys)
throws ValidationException
{
for (String contextParameterName : queryContext.getMergedParams().keySet()) {
if (specialContextKeys.contains(contextParameterName)) {
throw new ValidationException(
StringUtils.format(
"Cannot execute query with context parameter [%s]",
contextParameterName
)
);
}
}
}
}

View File

@ -57,9 +57,10 @@ public class DruidViewMacro implements TableMacro
public TranslatableTable apply(final List<Object> arguments)
{
final RelDataType rowType;
try (final DruidPlanner planner = plannerFactory.createPlanner(viewSql, new QueryContext())) {
try (final DruidPlanner planner =
plannerFactory.createPlanner(ViewSqlEngine.INSTANCE, viewSql, new QueryContext())) {
planner.validate();
rowType = planner.prepare().getRowType();
rowType = planner.prepare().getValidatedRowType();
}
catch (Exception e) {
throw new RuntimeException(e);

View File

@ -0,0 +1,113 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.sql.calcite.view;
import org.apache.calcite.rel.RelRoot;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeFactory;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.query.QueryContext;
import org.apache.druid.sql.calcite.planner.PlannerContext;
import org.apache.druid.sql.calcite.run.EngineFeature;
import org.apache.druid.sql.calcite.run.QueryMaker;
import org.apache.druid.sql.calcite.run.SqlEngine;
/**
* Engine used for getting the row type of views. Does not do any actual planning or execution of the view.
*/
public class ViewSqlEngine implements SqlEngine
{
public static final ViewSqlEngine INSTANCE = new ViewSqlEngine();
private static final String NAME = "view";
private ViewSqlEngine()
{
// Singleton.
}
@Override
public String name()
{
return NAME;
}
@Override
public boolean feature(EngineFeature feature, PlannerContext plannerContext)
{
switch (feature) {
// Use most permissive set of SELECT features, since our goal is to get the row type of the view.
// Later on, the query involving the view will be executed with an actual engine with a different set of
// features, and planning may fail then. But we don't want it to fail now.
case CAN_SELECT:
case ALLOW_BINDABLE_PLAN:
case READ_EXTERNAL_DATA:
case SCAN_ORDER_BY_NON_TIME:
return true;
// Views can't sit on top of INSERT or REPLACE.
case CAN_INSERT:
case CAN_REPLACE:
return false;
// Simplify planning by sticking to basic query types.
case TOPN_QUERY:
case TIMESERIES_QUERY:
case TIME_BOUNDARY_QUERY:
case SCAN_NEEDS_SIGNATURE:
return false;
default:
throw new IAE("Unrecognized feature: %s", feature);
}
}
@Override
public void validateContext(QueryContext queryContext)
{
// No query context validation for view row typing.
}
@Override
public RelDataType resultTypeForSelect(RelDataTypeFactory typeFactory, RelDataType validatedRowType)
{
return validatedRowType;
}
@Override
public RelDataType resultTypeForInsert(RelDataTypeFactory typeFactory, RelDataType validatedRowType)
{
// Can't have views of INSERT or REPLACE statements.
throw new UnsupportedOperationException();
}
@Override
public QueryMaker buildQueryMakerForSelect(RelRoot relRoot, PlannerContext plannerContext)
{
// View engine does not execute queries.
throw new UnsupportedOperationException();
}
@Override
public QueryMaker buildQueryMakerForInsert(String targetDataSource, RelRoot relRoot, PlannerContext plannerContext)
{
// Can't have views of INSERT or REPLACE statements.
throw new UnsupportedOperationException();
}
}

View File

@ -21,6 +21,7 @@ package org.apache.druid.sql.http;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.io.CountingOutputStream;
import com.google.inject.Inject;
@ -50,6 +51,8 @@ import org.apache.druid.sql.SqlLifecycleManager.Cancelable;
import org.apache.druid.sql.SqlPlanningException;
import org.apache.druid.sql.SqlRowTransformer;
import org.apache.druid.sql.SqlStatementFactory;
import org.apache.druid.sql.SqlStatementFactoryFactory;
import org.apache.druid.sql.calcite.run.NativeSqlEngine;
import javax.annotation.Nullable;
import javax.servlet.http.HttpServletRequest;
@ -64,7 +67,6 @@ import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.Response.Status;
import javax.ws.rs.core.StreamingOutput;
import java.io.IOException;
import java.util.List;
import java.util.Set;
@ -80,7 +82,7 @@ public class SqlResource
private final ObjectMapper jsonMapper;
private final AuthorizerMapper authorizerMapper;
private final SqlStatementFactory sqlLifecycleFactory;
private final SqlStatementFactory sqlStatementFactory;
private final SqlLifecycleManager sqlLifecycleManager;
private final ServerConfig serverConfig;
@ -88,16 +90,35 @@ public class SqlResource
public SqlResource(
@Json ObjectMapper jsonMapper,
AuthorizerMapper authorizerMapper,
SqlStatementFactory sqlLifecycleFactory,
NativeSqlEngine engine,
SqlStatementFactoryFactory sqlStatementFactoryFactory,
SqlLifecycleManager sqlLifecycleManager,
ServerConfig serverConfig
)
{
this(
jsonMapper,
authorizerMapper,
sqlStatementFactoryFactory.factorize(engine),
sqlLifecycleManager,
serverConfig
);
}
@VisibleForTesting
SqlResource(
final ObjectMapper jsonMapper,
final AuthorizerMapper authorizerMapper,
final SqlStatementFactory sqlStatementFactory,
final SqlLifecycleManager sqlLifecycleManager,
final ServerConfig serverConfig
)
{
this.jsonMapper = Preconditions.checkNotNull(jsonMapper, "jsonMapper");
this.authorizerMapper = Preconditions.checkNotNull(authorizerMapper, "authorizerMapper");
this.sqlLifecycleFactory = Preconditions.checkNotNull(sqlLifecycleFactory, "sqlLifecycleFactory");
this.sqlStatementFactory = Preconditions.checkNotNull(sqlStatementFactory, "sqlStatementFactory");
this.sqlLifecycleManager = Preconditions.checkNotNull(sqlLifecycleManager, "sqlLifecycleManager");
this.serverConfig = serverConfig;
this.serverConfig = Preconditions.checkNotNull(serverConfig, "serverConfig");
}
@POST
@ -108,7 +129,7 @@ public class SqlResource
@Context final HttpServletRequest req
) throws IOException
{
final HttpStatement stmt = sqlLifecycleFactory.httpStatement(sqlQuery, req);
final HttpStatement stmt = sqlStatementFactory.httpStatement(sqlQuery, req);
final String sqlQueryId = stmt.sqlQueryId();
final String currThreadName = Thread.currentThread().getName();

View File

@ -19,10 +19,9 @@
# Properties for Calcite (formerly known as "Saffron").
# Set here to ensure that the properties are absolutely,
# positively read when Calcite first initializes.
#
# This file _should_ be redundant with Calcites, but
# there do seem to be race conditions at various times.
calcite.default.charset=UTF-16LE
calcite.default.nationalcharset=UTF-16LE
calcite.default.collation.name=UTF-16LE$en_US
calcite.volcano.dump.sets=false
calcite.volcano.dump.graphviz=false

View File

@ -68,7 +68,6 @@ import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import javax.servlet.http.HttpServletRequest;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
@ -89,7 +88,7 @@ public class SqlStatementTest
private SpecificSegmentsQuerySegmentWalker walker = null;
private TestRequestLogger testRequestLogger;
private ListeningExecutorService executorService;
private SqlStatementFactory sqlLifecycleFactory;
private SqlStatementFactory sqlStatementFactory;
private final DefaultQueryConfig defaultQueryConfig = new DefaultQueryConfig(
ImmutableMap.of("DEFAULT_KEY", "DEFAULT_VALUE"));
@ -145,7 +144,6 @@ public class SqlStatementTest
final PlannerFactory plannerFactory = new PlannerFactory(
rootSchema,
CalciteTests.createMockQueryMakerFactory(walker, conglomerate),
operatorTable,
macroTable,
plannerConfig,
@ -155,7 +153,7 @@ public class SqlStatementTest
new CalciteRulesManager(ImmutableSet.of())
);
this.sqlLifecycleFactory = new SqlStatementFactory(
this.sqlStatementFactory = new SqlStatementFactoryFactory(
plannerFactory,
new NoopServiceEmitter(),
testRequestLogger,
@ -163,7 +161,7 @@ public class SqlStatementTest
new AuthConfig(),
Suppliers.ofInstance(defaultQueryConfig),
new SqlLifecycleManager()
);
).factorize(CalciteTests.createMockSqlEngine(walker, conglomerate));
}
@After
@ -214,7 +212,7 @@ public class SqlStatementTest
SqlQueryPlus sqlReq = queryPlus(
"SELECT COUNT(*) AS cnt, 'foo' AS TheFoo FROM druid.foo",
CalciteTests.REGULAR_USER_AUTH_RESULT);
DirectStatement stmt = sqlLifecycleFactory.directStatement(sqlReq);
DirectStatement stmt = sqlStatementFactory.directStatement(sqlReq);
List<Object[]> results = stmt.execute().toList();
assertEquals(1, results.size());
assertEquals(6L, results.get(0)[0]);
@ -227,7 +225,7 @@ public class SqlStatementTest
SqlQueryPlus sqlReq = queryPlus(
"SELECT COUNT(*) AS cnt, 'foo' AS",
CalciteTests.REGULAR_USER_AUTH_RESULT);
DirectStatement stmt = sqlLifecycleFactory.directStatement(sqlReq);
DirectStatement stmt = sqlStatementFactory.directStatement(sqlReq);
try {
stmt.execute();
fail();
@ -244,7 +242,7 @@ public class SqlStatementTest
SqlQueryPlus sqlReq = queryPlus(
"SELECT COUNT(*) AS cnt, 'foo' AS TheFoo FROM druid.bogus",
CalciteTests.REGULAR_USER_AUTH_RESULT);
DirectStatement stmt = sqlLifecycleFactory.directStatement(sqlReq);
DirectStatement stmt = sqlStatementFactory.directStatement(sqlReq);
try {
stmt.execute();
fail();
@ -261,7 +259,7 @@ public class SqlStatementTest
SqlQueryPlus sqlReq = queryPlus(
"select count(*) from forbiddenDatasource",
CalciteTests.REGULAR_USER_AUTH_RESULT);
DirectStatement stmt = sqlLifecycleFactory.directStatement(sqlReq);
DirectStatement stmt = sqlStatementFactory.directStatement(sqlReq);
try {
stmt.execute();
fail();
@ -290,7 +288,7 @@ public class SqlStatementTest
@Test
public void testHttpHappyPath()
{
HttpStatement stmt = sqlLifecycleFactory.httpStatement(
HttpStatement stmt = sqlStatementFactory.httpStatement(
makeQuery("SELECT COUNT(*) AS cnt, 'foo' AS TheFoo FROM druid.foo"),
request(true)
);
@ -303,7 +301,7 @@ public class SqlStatementTest
@Test
public void testHttpSyntaxError()
{
HttpStatement stmt = sqlLifecycleFactory.httpStatement(
HttpStatement stmt = sqlStatementFactory.httpStatement(
makeQuery("SELECT COUNT(*) AS cnt, 'foo' AS"),
request(true)
);
@ -320,7 +318,7 @@ public class SqlStatementTest
@Test
public void testHttpValidationError()
{
HttpStatement stmt = sqlLifecycleFactory.httpStatement(
HttpStatement stmt = sqlStatementFactory.httpStatement(
makeQuery("SELECT COUNT(*) AS cnt, 'foo' AS TheFoo FROM druid.bogus"),
request(true)
);
@ -337,7 +335,7 @@ public class SqlStatementTest
@Test
public void testHttpPermissionError()
{
HttpStatement stmt = sqlLifecycleFactory.httpStatement(
HttpStatement stmt = sqlStatementFactory.httpStatement(
makeQuery("select count(*) from forbiddenDatasource"),
request(false)
);
@ -359,10 +357,10 @@ public class SqlStatementTest
SqlQueryPlus sqlReq = queryPlus(
"SELECT COUNT(*) AS cnt, 'foo' AS TheFoo FROM druid.foo",
CalciteTests.REGULAR_USER_AUTH_RESULT);
PreparedStatement stmt = sqlLifecycleFactory.preparedStatement(sqlReq);
PreparedStatement stmt = sqlStatementFactory.preparedStatement(sqlReq);
PrepareResult prepareResult = stmt.prepare();
RelDataType rowType = prepareResult.getRowType();
RelDataType rowType = prepareResult.getReturnedRowType();
assertEquals(2, rowType.getFieldCount());
List<RelDataTypeField> fields = rowType.getFieldList();
assertEquals("cnt", fields.get(0).getName());
@ -388,7 +386,7 @@ public class SqlStatementTest
SqlQueryPlus sqlReq = queryPlus(
"SELECT COUNT(*) AS cnt, 'foo' AS",
CalciteTests.REGULAR_USER_AUTH_RESULT);
PreparedStatement stmt = sqlLifecycleFactory.preparedStatement(sqlReq);
PreparedStatement stmt = sqlStatementFactory.preparedStatement(sqlReq);
try {
stmt.prepare();
fail();
@ -405,7 +403,7 @@ public class SqlStatementTest
SqlQueryPlus sqlReq = queryPlus(
"SELECT COUNT(*) AS cnt, 'foo' AS TheFoo FROM druid.bogus",
CalciteTests.REGULAR_USER_AUTH_RESULT);
PreparedStatement stmt = sqlLifecycleFactory.preparedStatement(sqlReq);
PreparedStatement stmt = sqlStatementFactory.preparedStatement(sqlReq);
try {
stmt.prepare();
fail();
@ -422,7 +420,7 @@ public class SqlStatementTest
SqlQueryPlus sqlReq = queryPlus(
"select count(*) from forbiddenDatasource",
CalciteTests.REGULAR_USER_AUTH_RESULT);
PreparedStatement stmt = sqlLifecycleFactory.preparedStatement(sqlReq);
PreparedStatement stmt = sqlStatementFactory.preparedStatement(sqlReq);
try {
stmt.prepare();
fail();
@ -443,7 +441,7 @@ public class SqlStatementTest
.context(ImmutableMap.of(QueryContexts.BY_SEGMENT_KEY, "true"))
.auth(CalciteTests.REGULAR_USER_AUTH_RESULT)
.build();
DirectStatement stmt = sqlLifecycleFactory.directStatement(sqlReq);
DirectStatement stmt = sqlStatementFactory.directStatement(sqlReq);
Map<String, Object> context = stmt.sqlRequest().context().getMergedParams();
Assert.assertEquals(2, context.size());
// should contain only query id, not bySegment since it is not valid for SQL
@ -458,7 +456,7 @@ public class SqlStatementTest
.context(ImmutableMap.of(QueryContexts.BY_SEGMENT_KEY, "true"))
.auth(CalciteTests.REGULAR_USER_AUTH_RESULT)
.build();
DirectStatement stmt = sqlLifecycleFactory.directStatement(sqlReq);
DirectStatement stmt = sqlStatementFactory.directStatement(sqlReq);
Map<String, Object> context = stmt.sqlRequest().context().getMergedParams();
Assert.assertEquals(2, context.size());
// Statement should contain default query context values

View File

@ -46,6 +46,7 @@ import org.apache.druid.initialization.CoreInjectorBuilder;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.math.expr.ExprMacroTable;
@ -71,8 +72,8 @@ import org.apache.druid.sql.calcite.planner.Calcites;
import org.apache.druid.sql.calcite.planner.DruidOperatorTable;
import org.apache.druid.sql.calcite.planner.PlannerConfig;
import org.apache.druid.sql.calcite.planner.PlannerFactory;
import org.apache.druid.sql.calcite.run.NativeQueryMakerFactory;
import org.apache.druid.sql.calcite.run.QueryMakerFactory;
import org.apache.druid.sql.calcite.run.NativeSqlEngine;
import org.apache.druid.sql.calcite.run.SqlEngine;
import org.apache.druid.sql.calcite.schema.DruidSchemaCatalog;
import org.apache.druid.sql.calcite.schema.DruidSchemaName;
import org.apache.druid.sql.calcite.schema.NamedSchema;
@ -116,6 +117,7 @@ import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadLocalRandom;
/**
@ -219,7 +221,7 @@ public class DruidAvaticaHandlerTest extends CalciteTestBase
binder.bind(QueryScheduler.class)
.toProvider(QuerySchedulerProvider.class)
.in(LazySingleton.class);
binder.bind(QueryMakerFactory.class).to(NativeQueryMakerFactory.class);
binder.bind(SqlEngine.class).to(NativeSqlEngine.class);
binder.bind(new TypeLiteral<Supplier<DefaultQueryConfig>>(){}).toInstance(Suppliers.ofInstance(new DefaultQueryConfig(ImmutableMap.of())));
binder.bind(CalciteRulesManager.class).toInstance(new CalciteRulesManager(ImmutableSet.of()));
}
@ -916,13 +918,14 @@ public class DruidAvaticaHandlerTest extends CalciteTestBase
final DruidOperatorTable operatorTable = CalciteTests.createOperatorTable();
final ExprMacroTable macroTable = CalciteTests.createExprMacroTable();
final List<Meta.Frame> frames = new ArrayList<>();
final ScheduledExecutorService exec = Execs.scheduledSingleThreaded("testMaxRowsPerFrame");
DruidSchemaCatalog rootSchema =
CalciteTests.createMockRootSchema(conglomerate, walker, plannerConfig, AuthTestUtils.TEST_AUTHORIZER_MAPPER);
DruidMeta smallFrameDruidMeta = new DruidMeta(
CalciteTests.createSqlLifecycleFactory(
CalciteTests.createSqlStatementFactory(
CalciteTests.createMockSqlEngine(walker, conglomerate),
new PlannerFactory(
rootSchema,
CalciteTests.createMockQueryMakerFactory(walker, conglomerate),
operatorTable,
macroTable,
plannerConfig,
@ -934,7 +937,8 @@ public class DruidAvaticaHandlerTest extends CalciteTestBase
),
smallFrameConfig,
new ErrorHandler(new ServerConfig()),
injector
exec,
injector.getInstance(AuthenticatorMapper.class).getAuthenticatorChain()
)
{
@Override
@ -975,6 +979,8 @@ public class DruidAvaticaHandlerTest extends CalciteTestBase
),
rows
);
exec.shutdown();
}
@Test
@ -1006,13 +1012,14 @@ public class DruidAvaticaHandlerTest extends CalciteTestBase
final DruidOperatorTable operatorTable = CalciteTests.createOperatorTable();
final ExprMacroTable macroTable = CalciteTests.createExprMacroTable();
final List<Meta.Frame> frames = new ArrayList<>();
final ScheduledExecutorService exec = Execs.scheduledSingleThreaded("testMaxRowsPerFrame");
DruidSchemaCatalog rootSchema =
CalciteTests.createMockRootSchema(conglomerate, walker, plannerConfig, AuthTestUtils.TEST_AUTHORIZER_MAPPER);
DruidMeta smallFrameDruidMeta = new DruidMeta(
CalciteTests.createSqlLifecycleFactory(
CalciteTests.createSqlStatementFactory(
CalciteTests.createMockSqlEngine(walker, conglomerate),
new PlannerFactory(
rootSchema,
CalciteTests.createMockQueryMakerFactory(walker, conglomerate),
operatorTable,
macroTable,
plannerConfig,
@ -1024,7 +1031,8 @@ public class DruidAvaticaHandlerTest extends CalciteTestBase
),
smallFrameConfig,
new ErrorHandler(new ServerConfig()),
injector
exec,
injector.getInstance(AuthenticatorMapper.class).getAuthenticatorChain()
)
{
@Override
@ -1069,6 +1077,8 @@ public class DruidAvaticaHandlerTest extends CalciteTestBase
),
rows
);
exec.shutdown();
}
@Test

View File

@ -92,7 +92,7 @@ public class DruidStatementTest extends CalciteTestBase
}
private SpecificSegmentsQuerySegmentWalker walker;
private SqlStatementFactory sqlLifecycleFactory;
private SqlStatementFactory sqlStatementFactory;
@Before
public void setUp() throws Exception
@ -105,7 +105,6 @@ public class DruidStatementTest extends CalciteTestBase
CalciteTests.createMockRootSchema(conglomerate, walker, plannerConfig, AuthTestUtils.TEST_AUTHORIZER_MAPPER);
final PlannerFactory plannerFactory = new PlannerFactory(
rootSchema,
CalciteTests.createMockQueryMakerFactory(walker, conglomerate),
operatorTable,
macroTable,
plannerConfig,
@ -114,7 +113,10 @@ public class DruidStatementTest extends CalciteTestBase
CalciteTests.DRUID_SCHEMA_NAME,
new CalciteRulesManager(ImmutableSet.of())
);
this.sqlLifecycleFactory = CalciteTests.createSqlLifecycleFactory(plannerFactory);
this.sqlStatementFactory = CalciteTests.createSqlStatementFactory(
CalciteTests.createMockSqlEngine(walker, conglomerate),
plannerFactory
);
}
@After
@ -138,7 +140,7 @@ public class DruidStatementTest extends CalciteTestBase
"",
0,
new QueryContext(),
sqlLifecycleFactory
sqlStatementFactory
);
}
@ -516,7 +518,7 @@ public class DruidStatementTest extends CalciteTestBase
return new DruidJdbcPreparedStatement(
"",
0,
sqlLifecycleFactory.preparedStatement(queryPlus),
sqlStatementFactory.preparedStatement(queryPlus),
Long.MAX_VALUE
);
}

View File

@ -96,6 +96,8 @@ import org.apache.druid.sql.calcite.planner.DruidOperatorTable;
import org.apache.druid.sql.calcite.planner.PlannerConfig;
import org.apache.druid.sql.calcite.planner.PlannerContext;
import org.apache.druid.sql.calcite.planner.PlannerFactory;
import org.apache.druid.sql.calcite.run.NativeSqlEngine;
import org.apache.druid.sql.calcite.run.SqlEngine;
import org.apache.druid.sql.calcite.schema.DruidSchemaCatalog;
import org.apache.druid.sql.calcite.schema.NoopDruidSchemaManager;
import org.apache.druid.sql.calcite.table.RowSignatures;
@ -120,10 +122,10 @@ import org.junit.rules.ExpectedException;
import org.junit.rules.TemporaryFolder;
import javax.annotation.Nullable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@ -259,6 +261,9 @@ public class BaseCalciteQueryTest extends CalciteTestBase
public static Closer resourceCloser;
public static int minTopNThreshold = TopNQueryConfig.DEFAULT_MIN_TOPN_THRESHOLD;
public final ObjectMapper queryJsonMapper;
@Nullable
public final SqlEngine engine0;
final boolean useDefault = NullHandling.replaceWithDefault();
@Rule
@ -271,10 +276,21 @@ public class BaseCalciteQueryTest extends CalciteTestBase
public boolean cannotVectorize = false;
public boolean skipVectorize = false;
public ObjectMapper queryJsonMapper;
public SpecificSegmentsQuerySegmentWalker walker = null;
public SqlEngine engine = null;
public QueryLogHook queryLogHook;
public BaseCalciteQueryTest()
{
this(null);
}
public BaseCalciteQueryTest(@Nullable final SqlEngine engine)
{
this.queryJsonMapper = createQueryJsonMapper();
this.engine0 = engine;
}
static {
TIMESERIES_CONTEXT_LOS_ANGELES.put(PlannerContext.CTX_SQL_QUERY_ID, DUMMY_SQL_ID);
TIMESERIES_CONTEXT_LOS_ANGELES.put(PlannerContext.CTX_SQL_CURRENT_TIMESTAMP, "2000-01-01T00:00:00Z");
@ -470,7 +486,6 @@ public class BaseCalciteQueryTest extends CalciteTestBase
@Rule
public QueryLogHook getQueryLogHook()
{
queryJsonMapper = createQueryJsonMapper();
return queryLogHook = QueryLogHook.create(queryJsonMapper);
}
@ -478,6 +493,7 @@ public class BaseCalciteQueryTest extends CalciteTestBase
public void setUp() throws Exception
{
walker = createQuerySegmentWalker();
engine = createEngine();
// also register the static injected mapper, though across multiple test runs
ObjectMapper mapper = CalciteTests.getJsonMapper();
@ -490,6 +506,7 @@ public class BaseCalciteQueryTest extends CalciteTestBase
{
walker.close();
walker = null;
engine = null;
}
public SpecificSegmentsQuerySegmentWalker createQuerySegmentWalker() throws IOException
@ -500,6 +517,18 @@ public class BaseCalciteQueryTest extends CalciteTestBase
);
}
public SqlEngine createEngine() throws IOException
{
if (engine0 == null) {
return new NativeSqlEngine(
CalciteTests.createMockQueryLifecycleFactory(walker, conglomerate),
queryJsonMapper
);
} else {
return engine0;
}
}
public ObjectMapper createQueryJsonMapper()
{
// ugly workaround, for some reason the static mapper from Calcite.INJECTOR seems to get messed up over
@ -571,9 +600,11 @@ public class BaseCalciteQueryTest extends CalciteTestBase
log.error(e, "Expected CannotPlanException for query: %s", sql);
Assert.fail(sql);
}
Assert.assertEquals(sql,
StringUtils.format("Cannot build plan for query: %s. %s", sql, expectedError),
e.getMessage());
Assert.assertEquals(
sql,
StringUtils.format("Cannot build plan for query. %s", expectedError),
e.getMessage()
);
}
/**
@ -728,11 +759,16 @@ public class BaseCalciteQueryTest extends CalciteTestBase
final List<Object[]> expectedResults
)
{
log.info("SQL: %s", sql);
queryLogHook.clearRecordedQueries();
final Pair<RowSignature, List<Object[]>> plannerResults =
getResults(plannerConfig, queryContext, DEFAULT_PARAMETERS, sql, authenticationResult);
verifyResults(sql, expectedQueries, expectedResults, plannerResults);
testQuery(
plannerConfig,
queryContext,
Collections.emptyList(),
sql,
authenticationResult,
expectedQueries,
expectedResults,
null
);
}
public void testQuery(
@ -780,6 +816,9 @@ public class BaseCalciteQueryTest extends CalciteTestBase
);
}
/**
* All testQuery roads lead to this method.
*/
public void testQuery(
final PlannerConfig plannerConfig,
final Map<String, Object> queryContext,
@ -824,7 +863,8 @@ public class BaseCalciteQueryTest extends CalciteTestBase
expectedExceptionInitializer.accept(expectedException);
}
final Pair<RowSignature, List<Object[]>> plannerResults = getResults(plannerConfig, theQueryContext, parameters, sql, authenticationResult);
final Pair<RowSignature, List<Object[]>> plannerResults =
getResults(plannerConfig, theQueryContext, parameters, sql, authenticationResult);
verifyResults(sql, theQueries, plannerResults, expectedResultsVerifier);
}
}
@ -850,7 +890,7 @@ public class BaseCalciteQueryTest extends CalciteTestBase
);
}
public Pair<RowSignature, List<Object[]>> getResults(
private Pair<RowSignature, List<Object[]>> getResults(
final PlannerConfig plannerConfig,
final Map<String, Object> queryContext,
final List<SqlParameter> parameters,
@ -862,7 +902,7 @@ public class BaseCalciteQueryTest extends CalciteTestBase
final ObjectMapper objectMapper
)
{
final SqlStatementFactory sqlLifecycleFactory = getSqlLifecycleFactory(
final SqlStatementFactory sqlStatementFactory = getSqlStatementFactory(
plannerConfig,
new AuthConfig(),
operatorTable,
@ -870,7 +910,7 @@ public class BaseCalciteQueryTest extends CalciteTestBase
authorizerMapper,
objectMapper
);
final DirectStatement stmt = sqlLifecycleFactory.directStatement(
final DirectStatement stmt = sqlStatementFactory.directStatement(
SqlQueryPlus.builder(sql)
.context(queryContext)
.sqlParameters(parameters)
@ -878,7 +918,7 @@ public class BaseCalciteQueryTest extends CalciteTestBase
.build()
);
Sequence<Object[]> results = stmt.execute();
RelDataType rowType = stmt.prepareResult().getRowType();
RelDataType rowType = stmt.prepareResult().getReturnedRowType();
return new Pair<>(
RowSignatures.fromRelDataType(rowType.getFieldNames(), rowType),
results.toList()
@ -1001,7 +1041,7 @@ public class BaseCalciteQueryTest extends CalciteTestBase
AuthenticationResult authenticationResult
)
{
SqlStatementFactory lifecycleFactory = getSqlLifecycleFactory(
SqlStatementFactory lifecycleFactory = getSqlStatementFactory(
plannerConfig,
authConfig,
createOperatorTable(),
@ -1019,7 +1059,7 @@ public class BaseCalciteQueryTest extends CalciteTestBase
return stmt.allResources();
}
public SqlStatementFactory getSqlLifecycleFactory(
public SqlStatementFactory getSqlStatementFactory(
PlannerConfig plannerConfig,
AuthConfig authConfig,
DruidOperatorTable operatorTable,
@ -1040,10 +1080,6 @@ public class BaseCalciteQueryTest extends CalciteTestBase
final PlannerFactory plannerFactory = new PlannerFactory(
rootSchema,
new TestQueryMakerFactory(
CalciteTests.createMockQueryLifecycleFactory(walker, conglomerate),
objectMapper
),
operatorTable,
macroTable,
plannerConfig,
@ -1052,7 +1088,11 @@ public class BaseCalciteQueryTest extends CalciteTestBase
CalciteTests.DRUID_SCHEMA_NAME,
new CalciteRulesManager(ImmutableSet.of())
);
final SqlStatementFactory sqlLifecycleFactory = CalciteTests.createSqlLifecycleFactory(plannerFactory, authConfig);
final SqlStatementFactory sqlStatementFactory = CalciteTests.createSqlStatementFactory(
engine,
plannerFactory,
authConfig
);
viewManager.createView(
plannerFactory,
@ -1097,7 +1137,7 @@ public class BaseCalciteQueryTest extends CalciteTestBase
"invalidView",
"SELECT __time, dim1, dim2, m1 FROM druid.invalidDatasource WHERE dim2 = 'a'"
);
return sqlLifecycleFactory;
return sqlStatementFactory;
}
protected void cannotVectorize()
@ -1214,7 +1254,7 @@ public class BaseCalciteQueryTest extends CalciteTestBase
}
/**
* Reset the walker and conglomerate with required number of merge buffers. Default value is 2.
* Reset the conglomerate, walker, and engine with required number of merge buffers. Default value is 2.
*/
protected void requireMergeBuffers(int numMergeBuffers) throws IOException
{
@ -1222,7 +1262,8 @@ public class BaseCalciteQueryTest extends CalciteTestBase
resourceCloser,
QueryStackTests.getProcessingConfig(true, numMergeBuffers)
);
walker = CalciteTests.createMockWalker(conglomerate, temporaryFolder.newFolder());
walker = createQuerySegmentWalker();
engine = createEngine();
}
protected Map<String, Object> withTimestampResultContext(

View File

@ -154,7 +154,7 @@ public class CalciteExplainQueryTest extends BaseCalciteQueryTest
// Skip vectorization since otherwise the "context" will change for each subtest.
skipVectorize();
String legacyExplanation = "DruidQueryRel(query=[{\"queryType\":\"scan\",\"dataSource\":{\"type\":\"table\",\"name\":\"foo\"},\"intervals\":{\"type\":\"intervals\",\"intervals\":[\"-146136543-09-08T08:23:32.096Z/146140482-04-24T15:36:27.903Z\"]},\"resultFormat\":\"compactedList\",\"columns\":[\"__time\",\"cnt\",\"dim1\",\"dim2\",\"dim3\",\"m1\",\"m2\",\"unique_dim1\"],\"legacy\":false,\"context\":{\"defaultTimeout\":300000,\"maxScatterGatherBytes\":9223372036854775807,\"sqlCurrentTimestamp\":\"2000-01-01T00:00:00Z\",\"sqlQueryId\":\"dummy\",\"vectorize\":\"false\",\"vectorizeVirtualColumns\":\"false\"},\"granularity\":{\"type\":\"all\"}}], signature=[{__time:LONG, dim1:STRING, dim2:STRING, dim3:STRING, cnt:LONG, m1:FLOAT, m2:DOUBLE, unique_dim1:COMPLEX<hyperUnique>}])\n";
String legacyExplanationWithContext = "DruidQueryRel(query=[{\"queryType\":\"scan\",\"dataSource\":{\"type\":\"table\",\"name\":\"foo\"},\"intervals\":{\"type\":\"intervals\",\"intervals\":[\"-146136543-09-08T08:23:32.096Z/146140482-04-24T15:36:27.903Z\"]},\"resultFormat\":\"compactedList\",\"columns\":[\"__time\",\"cnt\",\"dim1\",\"dim2\",\"dim3\",\"m1\",\"m2\",\"unique_dim1\"],\"legacy\":false,\"context\":{\"defaultTimeout\":300000,\"maxScatterGatherBytes\":9223372036854775807,\"sqlCurrentTimestamp\":\"2000-01-01T00:00:00Z\",\"sqlQueryId\":\"dummy\",\"useNativeQueryExplain\":false},\"granularity\":{\"type\":\"all\"}}], signature=[{__time:LONG, dim1:STRING, dim2:STRING, dim3:STRING, cnt:LONG, m1:FLOAT, m2:DOUBLE, unique_dim1:COMPLEX<hyperUnique>}])\n";
String legacyExplanationWithContext = "DruidQueryRel(query=[{\"queryType\":\"scan\",\"dataSource\":{\"type\":\"table\",\"name\":\"foo\"},\"intervals\":{\"type\":\"intervals\",\"intervals\":[\"-146136543-09-08T08:23:32.096Z/146140482-04-24T15:36:27.903Z\"]},\"resultFormat\":\"compactedList\",\"columns\":[\"__time\",\"cnt\",\"dim1\",\"dim2\",\"dim3\",\"m1\",\"m2\",\"unique_dim1\"],\"legacy\":false,\"context\":{\"defaultTimeout\":300000,\"maxScatterGatherBytes\":9223372036854775807,\"sqlCurrentTimestamp\":\"2000-01-01T00:00:00Z\",\"sqlQueryId\":\"dummy\",\"useNativeQueryExplain\":false,\"vectorize\":\"false\",\"vectorizeVirtualColumns\":\"false\"},\"granularity\":{\"type\":\"all\"}}], signature=[{__time:LONG, dim1:STRING, dim2:STRING, dim3:STRING, cnt:LONG, m1:FLOAT, m2:DOUBLE, unique_dim1:COMPLEX<hyperUnique>}])\n";
String explanation = "[{"
+ "\"query\":{\"queryType\":\"scan\","
+ "\"dataSource\":{\"type\":\"table\",\"name\":\"foo\"},"

View File

@ -92,6 +92,11 @@ public class CalciteIngestionDmlTest extends BaseCalciteQueryTest
protected boolean didTest = false;
public CalciteIngestionDmlTest()
{
super(IngestionTestSqlEngine.INSTANCE);
}
@After
@Override
public void tearDown() throws Exception
@ -267,7 +272,7 @@ public class CalciteIngestionDmlTest extends BaseCalciteQueryTest
throw new ISE("Test must not have expectedQuery");
}
final SqlStatementFactory sqlLifecycleFactory = getSqlLifecycleFactory(
final SqlStatementFactory sqlStatementFactory = getSqlStatementFactory(
plannerConfig,
new AuthConfig(),
createOperatorTable(),
@ -276,7 +281,7 @@ public class CalciteIngestionDmlTest extends BaseCalciteQueryTest
queryJsonMapper
);
DirectStatement stmt = sqlLifecycleFactory.directStatement(
DirectStatement stmt = sqlStatementFactory.directStatement(
SqlQueryPlus
.builder(sql)
.context(queryContext)

View File

@ -28,6 +28,7 @@ import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.java.util.common.jackson.JacksonUtils;
import org.apache.druid.query.QueryDataSource;
import org.apache.druid.query.aggregation.CountAggregatorFactory;
import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
import org.apache.druid.query.dimension.DefaultDimensionSpec;
@ -35,6 +36,7 @@ import org.apache.druid.query.groupby.GroupByQuery;
import org.apache.druid.query.scan.ScanQuery;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.segment.join.JoinType;
import org.apache.druid.server.security.ForbiddenException;
import org.apache.druid.sql.SqlPlanningException;
import org.apache.druid.sql.calcite.external.ExternalDataSource;
@ -51,6 +53,7 @@ import org.junit.Test;
import org.junit.internal.matchers.ThrowableMessageMatcher;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
@ -80,7 +83,7 @@ public class CalciteInsertDmlTest extends CalciteIngestionDmlTest
}
@Test
public void testInsertFromView()
public void testInsertFromViewA()
{
testIngestionQuery()
.sql("INSERT INTO dst SELECT * FROM view.aview PARTITIONED BY ALL TIME")
@ -99,6 +102,58 @@ public class CalciteInsertDmlTest extends CalciteIngestionDmlTest
.verify();
}
@Test
public void testInsertFromViewC()
{
final RowSignature expectedSignature =
RowSignature.builder()
.add("dim1_firstchar", ColumnType.STRING)
.add("dim2", ColumnType.STRING)
.add("l2", ColumnType.LONG)
.build();
testIngestionQuery()
.sql("INSERT INTO dst SELECT * FROM view.cview PARTITIONED BY ALL TIME")
.expectTarget("dst", expectedSignature)
.expectResources(viewRead("cview"), dataSourceWrite("dst"))
.expectQuery(
newScanQueryBuilder()
.dataSource(
join(
new QueryDataSource(
newScanQueryBuilder()
.dataSource("foo")
.intervals(querySegmentSpec(Filtration.eternity()))
.filters(selector("dim2", "a", null))
.columns("dim1", "dim2")
.context(PARTITIONED_BY_ALL_TIME_QUERY_CONTEXT)
.build()
),
new QueryDataSource(
newScanQueryBuilder()
.dataSource("numfoo")
.intervals(querySegmentSpec(Filtration.eternity()))
.columns("dim2", "l2")
.context(PARTITIONED_BY_ALL_TIME_QUERY_CONTEXT)
.build()
),
"j0.",
"(\"dim2\" == \"j0.dim2\")",
JoinType.INNER
)
)
.intervals(querySegmentSpec(Filtration.eternity()))
.virtualColumns(
expressionVirtualColumn("v0", "substring(\"dim1\", 0, 1)", ColumnType.STRING),
expressionVirtualColumn("v1", "'a'", ColumnType.STRING)
)
.columns("j0.l2", "v0", "v1")
.context(PARTITIONED_BY_ALL_TIME_QUERY_CONTEXT)
.build()
)
.verify();
}
@Test
public void testInsertIntoExistingTable()
{
@ -162,6 +217,20 @@ public class CalciteInsertDmlTest extends CalciteIngestionDmlTest
.verify();
}
@Test
public void testSelectFromSystemTable()
{
// TestInsertSqlEngine does not include ALLOW_BINDABLE_PLAN, so cannot query system tables.
testIngestionQuery()
.sql("INSERT INTO dst SELECT * FROM INFORMATION_SCHEMA.COLUMNS PARTITIONED BY ALL TIME")
.expectValidationError(
SqlPlanningException.class,
"Cannot query table [INFORMATION_SCHEMA.COLUMNS] with SQL engine 'ingestion-test'."
)
.verify();
}
@Test
public void testInsertIntoSystemTable()
{
@ -169,7 +238,7 @@ public class CalciteInsertDmlTest extends CalciteIngestionDmlTest
.sql("INSERT INTO INFORMATION_SCHEMA.COLUMNS SELECT * FROM foo PARTITIONED BY ALL TIME")
.expectValidationError(
SqlPlanningException.class,
"Cannot INSERT into [INFORMATION_SCHEMA.COLUMNS] because it is not a Druid datasource."
"Cannot INSERT into [INFORMATION_SCHEMA.COLUMNS] because it is not a Druid datasource (schema = druid)."
)
.verify();
}
@ -181,7 +250,7 @@ public class CalciteInsertDmlTest extends CalciteIngestionDmlTest
.sql("INSERT INTO view.aview SELECT * FROM foo PARTITIONED BY ALL TIME")
.expectValidationError(
SqlPlanningException.class,
"Cannot INSERT into [view.aview] because it is not a Druid datasource."
"Cannot INSERT into [view.aview] because it is not a Druid datasource (schema = druid)."
)
.verify();
}
@ -211,7 +280,7 @@ public class CalciteInsertDmlTest extends CalciteIngestionDmlTest
.sql("INSERT INTO nonexistent.dst SELECT * FROM foo PARTITIONED BY ALL TIME")
.expectValidationError(
SqlPlanningException.class,
"Cannot INSERT into [nonexistent.dst] because it is not a Druid datasource."
"Cannot INSERT into [nonexistent.dst] because it is not a Druid datasource (schema = druid)."
)
.verify();
}
@ -364,7 +433,10 @@ public class CalciteInsertDmlTest extends CalciteIngestionDmlTest
+ "SELECT __time, FLOOR(m1) as floor_m1, dim1, CEIL(m2) as ceil_m2 FROM foo "
+ "CLUSTERED BY 2, dim1 DESC, CEIL(m2)"
)
.expectValidationError(SqlPlanningException.class, "CLUSTERED BY found before PARTITIONED BY. In druid, the CLUSTERED BY clause has to be specified after the PARTITIONED BY clause")
.expectValidationError(
SqlPlanningException.class,
"CLUSTERED BY found before PARTITIONED BY. In druid, the CLUSTERED BY clause has to be specified after the PARTITIONED BY clause"
)
.verify();
}
@ -526,7 +598,7 @@ public class CalciteInsertDmlTest extends CalciteIngestionDmlTest
.columns("x", "y", "z")
.context(
queryJsonMapper.readValue(
"{\"defaultTimeout\":300000,\"maxScatterGatherBytes\":9223372036854775807,\"sqlCurrentTimestamp\":\"2000-01-01T00:00:00Z\",\"sqlInsertSegmentGranularity\":\"{\\\"type\\\":\\\"all\\\"}\",\"sqlQueryId\":\"dummy\",\"vectorize\":\"false\",\"vectorizeVirtualColumns\":\"false\"}",
"{\"sqlInsertSegmentGranularity\":\"{\\\"type\\\":\\\"all\\\"}\",\"sqlQueryId\":\"dummy\",\"vectorize\":\"false\",\"vectorizeVirtualColumns\":\"false\"}",
JacksonUtils.TYPE_REFERENCE_MAP_STRING_OBJECT
)
)
@ -540,18 +612,24 @@ public class CalciteInsertDmlTest extends CalciteIngestionDmlTest
// Use testQuery for EXPLAIN (not testIngestionQuery).
testQuery(
new PlannerConfig(),
ImmutableMap.of("sqlQueryId", "dummy"),
Collections.emptyList(),
StringUtils.format(
"EXPLAIN PLAN FOR INSERT INTO dst SELECT * FROM %s PARTITIONED BY ALL TIME",
externSql(externalDataSource)
),
CalciteTests.SUPER_USER_AUTH_RESULT,
ImmutableList.of(),
new DefaultResultsVerifier(
ImmutableList.of(
new Object[]{
expectedExplanation,
"[{\"name\":\"EXTERNAL\",\"type\":\"EXTERNAL\"},{\"name\":\"dst\",\"type\":\"DATASOURCE\"}]"
}
)
),
null
),
null
);
// Not using testIngestionQuery, so must set didTest manually to satisfy the check in tearDown.
@ -579,6 +657,18 @@ public class CalciteInsertDmlTest extends CalciteIngestionDmlTest
didTest = true;
}
@Test
public void testSurfaceErrorsWhenInsertingThroughIncorrectSelectStatment()
{
assertQueryIsUnplannable(
"INSERT INTO druid.dst SELECT dim2, dim1, m1 FROM foo2 UNION SELECT dim1, dim2, m1 FROM foo PARTITIONED BY ALL TIME",
"Possible error: SQL requires 'UNION' but only 'UNION ALL' is supported."
);
// Not using testIngestionQuery, so must set didTest manually to satisfy the check in tearDown.
didTest = true;
}
@Test
public void testInsertFromExternalUnauthorized()
{
@ -788,7 +878,10 @@ public class CalciteInsertDmlTest extends CalciteIngestionDmlTest
testIngestionQuery()
.context(context)
.sql("INSERT INTO dst SELECT * FROM foo PARTITIONED BY ALL TIME")
.expectValidationError(SqlPlanningException.class, "sqlOuterLimit cannot be provided on INSERT or REPLACE queries.")
.expectValidationError(
SqlPlanningException.class,
"sqlOuterLimit cannot be provided with INSERT."
)
.verify();
}
}

View File

@ -118,6 +118,7 @@ import org.apache.druid.sql.calcite.planner.PlannerContext;
import org.apache.druid.sql.calcite.rel.CannotBuildQueryException;
import org.apache.druid.sql.calcite.util.CalciteTests;
import org.hamcrest.CoreMatchers;
import org.hamcrest.MatcherAssert;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import org.joda.time.Interval;
@ -355,6 +356,46 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
);
}
@Test
public void testCannotInsertWithNativeEngine()
{
final SqlPlanningException e = Assert.assertThrows(
SqlPlanningException.class,
() -> testQuery(
"INSERT INTO dst SELECT * FROM foo PARTITIONED BY ALL",
ImmutableList.of(),
ImmutableList.of()
)
);
MatcherAssert.assertThat(
e,
ThrowableMessageMatcher.hasMessage(
CoreMatchers.equalTo("Cannot execute INSERT with SQL engine 'native'.")
)
);
}
@Test
public void testCannotReplaceWithNativeEngine()
{
final SqlPlanningException e = Assert.assertThrows(
SqlPlanningException.class,
() -> testQuery(
"REPLACE INTO dst OVERWRITE ALL SELECT * FROM foo PARTITIONED BY ALL",
ImmutableList.of(),
ImmutableList.of()
)
);
MatcherAssert.assertThat(
e,
ThrowableMessageMatcher.hasMessage(
CoreMatchers.equalTo("Cannot execute REPLACE with SQL engine 'native'.")
)
);
}
@Test
public void testAggregatorsOnInformationSchemaColumns()
{
@ -13890,15 +13931,6 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
);
}
@Test
public void testSurfaceErrorsWhenInsertingThroughIncorrectSelectStatment()
{
assertQueryIsUnplannable(
"INSERT INTO druid.dst SELECT dim2, dim1, m1 FROM foo2 UNION SELECT dim1, dim2, m1 FROM foo PARTITIONED BY ALL TIME",
"Possible error: SQL requires 'UNION' but only 'UNION ALL' is supported."
);
}
@Test
public void testPlanWithInFilterLessThanInSubQueryThreshold()
{

View File

@ -45,6 +45,7 @@ import org.junit.Assert;
import org.junit.Test;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
@ -436,7 +437,7 @@ public class CalciteReplaceDmlTest extends CalciteIngestionDmlTest
.sql("REPLACE INTO INFORMATION_SCHEMA.COLUMNS OVERWRITE ALL SELECT * FROM foo PARTITIONED BY ALL TIME")
.expectValidationError(
SqlPlanningException.class,
"Cannot REPLACE into [INFORMATION_SCHEMA.COLUMNS] because it is not a Druid datasource."
"Cannot REPLACE into [INFORMATION_SCHEMA.COLUMNS] because it is not a Druid datasource (schema = druid)."
)
.verify();
}
@ -448,7 +449,7 @@ public class CalciteReplaceDmlTest extends CalciteIngestionDmlTest
.sql("REPLACE INTO view.aview OVERWRITE ALL SELECT * FROM foo PARTITIONED BY ALL TIME")
.expectValidationError(
SqlPlanningException.class,
"Cannot REPLACE into [view.aview] because it is not a Druid datasource."
"Cannot REPLACE into [view.aview] because it is not a Druid datasource (schema = druid)."
)
.verify();
}
@ -478,7 +479,7 @@ public class CalciteReplaceDmlTest extends CalciteIngestionDmlTest
.sql("REPLACE INTO nonexistent.dst OVERWRITE ALL SELECT * FROM foo PARTITIONED BY ALL TIME")
.expectValidationError(
SqlPlanningException.class,
"Cannot REPLACE into [nonexistent.dst] because it is not a Druid datasource."
"Cannot REPLACE into [nonexistent.dst] because it is not a Druid datasource (schema = druid)."
)
.verify();
}
@ -605,7 +606,7 @@ public class CalciteReplaceDmlTest extends CalciteIngestionDmlTest
.columns("x", "y", "z")
.context(
queryJsonMapper.readValue(
"{\"defaultTimeout\":300000,\"maxScatterGatherBytes\":9223372036854775807,\"sqlCurrentTimestamp\":\"2000-01-01T00:00:00Z\",\"sqlInsertSegmentGranularity\":\"{\\\"type\\\":\\\"all\\\"}\",\"sqlQueryId\":\"dummy\",\"sqlReplaceTimeChunks\":\"all\",\"vectorize\":\"false\",\"vectorizeVirtualColumns\":\"false\"}",
"{\"sqlInsertSegmentGranularity\":\"{\\\"type\\\":\\\"all\\\"}\",\"sqlQueryId\":\"dummy\",\"sqlReplaceTimeChunks\":\"all\",\"vectorize\":\"false\",\"vectorizeVirtualColumns\":\"false\"}",
JacksonUtils.TYPE_REFERENCE_MAP_STRING_OBJECT
)
)
@ -619,18 +620,24 @@ public class CalciteReplaceDmlTest extends CalciteIngestionDmlTest
// Use testQuery for EXPLAIN (not testIngestionQuery).
testQuery(
new PlannerConfig(),
ImmutableMap.of("sqlQueryId", "dummy"),
Collections.emptyList(),
StringUtils.format(
"EXPLAIN PLAN FOR REPLACE INTO dst OVERWRITE ALL SELECT * FROM %s PARTITIONED BY ALL TIME",
externSql(externalDataSource)
),
CalciteTests.SUPER_USER_AUTH_RESULT,
ImmutableList.of(),
new DefaultResultsVerifier(
ImmutableList.of(
new Object[]{
expectedExplanation,
"[{\"name\":\"EXTERNAL\",\"type\":\"EXTERNAL\"},{\"name\":\"dst\",\"type\":\"DATASOURCE\"}]"
}
)
),
null
),
null
);
// Not using testIngestionQuery, so must set didTest manually to satisfy the check in tearDown.
@ -761,7 +768,7 @@ public class CalciteReplaceDmlTest extends CalciteIngestionDmlTest
testIngestionQuery()
.context(context)
.sql("REPLACE INTO dst OVERWRITE ALL SELECT * FROM foo PARTITIONED BY ALL TIME")
.expectValidationError(SqlPlanningException.class, "sqlOuterLimit cannot be provided on INSERT or REPLACE queries.")
.expectValidationError(SqlPlanningException.class, "sqlOuterLimit cannot be provided with REPLACE.")
.verify();
}
}

View File

@ -0,0 +1,142 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.sql.calcite;
import com.google.common.collect.ImmutableList;
import org.apache.calcite.rel.RelRoot;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeFactory;
import org.apache.calcite.tools.ValidationException;
import org.apache.druid.query.QueryContext;
import org.apache.druid.query.scan.ScanQuery;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.sql.calcite.filtration.Filtration;
import org.apache.druid.sql.calcite.planner.PlannerContext;
import org.apache.druid.sql.calcite.rel.DruidQuery;
import org.apache.druid.sql.calcite.run.EngineFeature;
import org.apache.druid.sql.calcite.run.QueryMaker;
import org.apache.druid.sql.calcite.run.SqlEngine;
import org.apache.druid.sql.calcite.util.CalciteTests;
import org.junit.Test;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
public class CalciteScanSignatureTest extends BaseCalciteQueryTest
{
public CalciteScanSignatureTest()
{
super(null);
}
@Test
public void testScanSignature()
{
final Map<String, Object> context = new HashMap<>(QUERY_CONTEXT_DEFAULT);
context.put(DruidQuery.CTX_SCAN_SIGNATURE, "[{\"name\":\"v0\",\"type\":\"STRING\"}]");
testQuery(
"SELECT CONCAT(dim1, '-', dim1, '_', dim1) as dimX FROM foo",
ImmutableList.of(
newScanQueryBuilder()
.dataSource(CalciteTests.DATASOURCE1)
.intervals(querySegmentSpec(Filtration.eternity()))
.virtualColumns(expressionVirtualColumn(
"v0",
"concat(\"dim1\",'-',\"dim1\",'_',\"dim1\")",
ColumnType.STRING
))
.columns("v0")
.resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST)
.context(context)
.build()
),
ImmutableList.of(
new Object[]{"-_"},
new Object[]{"10.1-10.1_10.1"},
new Object[]{"2-2_2"},
new Object[]{"1-1_1"},
new Object[]{"def-def_def"},
new Object[]{"abc-abc_abc"}
)
);
}
@Override
public SqlEngine createEngine() throws IOException
{
// Create an engine that says yes to EngineFeature.SCAN_NEEDS_SIGNATURE.
return new ScanSignatureTestSqlEngine(super.createEngine());
}
private static class ScanSignatureTestSqlEngine implements SqlEngine
{
private final SqlEngine parent;
public ScanSignatureTestSqlEngine(final SqlEngine parent)
{
this.parent = parent;
}
@Override
public String name()
{
return getClass().getName();
}
@Override
public boolean feature(EngineFeature feature, PlannerContext plannerContext)
{
return feature == EngineFeature.SCAN_NEEDS_SIGNATURE || parent.feature(feature, plannerContext);
}
@Override
public void validateContext(QueryContext queryContext)
{
// No validation.
}
@Override
public RelDataType resultTypeForSelect(RelDataTypeFactory typeFactory, RelDataType validatedRowType)
{
return validatedRowType;
}
@Override
public RelDataType resultTypeForInsert(RelDataTypeFactory typeFactory, RelDataType validatedRowType)
{
throw new UnsupportedOperationException();
}
@Override
public QueryMaker buildQueryMakerForSelect(RelRoot relRoot, PlannerContext plannerContext)
throws ValidationException
{
return parent.buildQueryMakerForSelect(relRoot, plannerContext);
}
@Override
public QueryMaker buildQueryMakerForInsert(String targetDataSource, RelRoot relRoot, PlannerContext plannerContext)
{
throw new UnsupportedOperationException();
}
}
}

View File

@ -0,0 +1,112 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.sql.calcite;
import com.google.common.collect.ImmutableList;
import org.apache.calcite.rel.RelRoot;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeFactory;
import org.apache.calcite.sql.type.SqlTypeName;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.query.QueryContext;
import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.sql.calcite.planner.PlannerContext;
import org.apache.druid.sql.calcite.run.EngineFeature;
import org.apache.druid.sql.calcite.run.QueryMaker;
import org.apache.druid.sql.calcite.run.SqlEngine;
import org.apache.druid.sql.calcite.table.RowSignatures;
public class IngestionTestSqlEngine implements SqlEngine
{
public static final IngestionTestSqlEngine INSTANCE = new IngestionTestSqlEngine();
private IngestionTestSqlEngine()
{
}
@Override
public String name()
{
return "ingestion-test";
}
@Override
public void validateContext(QueryContext queryContext)
{
// No validation.
}
@Override
public RelDataType resultTypeForSelect(RelDataTypeFactory typeFactory, RelDataType validatedRowType)
{
throw new UnsupportedOperationException();
}
@Override
public RelDataType resultTypeForInsert(RelDataTypeFactory typeFactory, RelDataType validatedRowType)
{
// Matches the return structure of TestInsertQueryMaker.
return typeFactory.createStructType(
ImmutableList.of(
typeFactory.createSqlType(SqlTypeName.VARCHAR),
typeFactory.createSqlType(SqlTypeName.OTHER)
),
ImmutableList.of("dataSource", "signature")
);
}
@Override
public boolean feature(final EngineFeature feature, final PlannerContext plannerContext)
{
switch (feature) {
case CAN_SELECT:
case ALLOW_BINDABLE_PLAN:
case TIMESERIES_QUERY:
case TOPN_QUERY:
case TIME_BOUNDARY_QUERY:
case SCAN_NEEDS_SIGNATURE:
return false;
case CAN_INSERT:
case CAN_REPLACE:
case READ_EXTERNAL_DATA:
case SCAN_ORDER_BY_NON_TIME:
return true;
default:
throw new IAE("Unrecognized feature: %s", feature);
}
}
@Override
public QueryMaker buildQueryMakerForSelect(RelRoot relRoot, PlannerContext plannerContext)
{
throw new UnsupportedOperationException();
}
@Override
public QueryMaker buildQueryMakerForInsert(String targetDataSource, RelRoot relRoot, PlannerContext plannerContext)
{
final RowSignature signature = RowSignatures.fromRelDataType(
relRoot.validatedRowType.getFieldNames(),
relRoot.validatedRowType
);
return new TestInsertQueryMaker(targetDataSource, signature);
}
}

View File

@ -31,6 +31,7 @@ import org.apache.druid.java.util.common.guava.Yielders;
import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.math.expr.ExpressionProcessing;
import org.apache.druid.query.QueryContext;
import org.apache.druid.query.QueryContexts;
import org.apache.druid.query.QueryRunnerFactoryConglomerate;
import org.apache.druid.segment.QueryableIndex;
@ -44,6 +45,7 @@ import org.apache.druid.sql.calcite.planner.DruidPlanner;
import org.apache.druid.sql.calcite.planner.PlannerConfig;
import org.apache.druid.sql.calcite.planner.PlannerFactory;
import org.apache.druid.sql.calcite.planner.PlannerResult;
import org.apache.druid.sql.calcite.run.SqlEngine;
import org.apache.druid.sql.calcite.schema.DruidSchemaCatalog;
import org.apache.druid.sql.calcite.util.CalciteTests;
import org.apache.druid.sql.calcite.util.SpecificSegmentsQuerySegmentWalker;
@ -58,10 +60,8 @@ import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import javax.annotation.Nullable;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
@RunWith(Parameterized.class)
@ -104,6 +104,7 @@ public class SqlVectorizedExpressionSanityTest extends InitializedNullHandlingTe
private static Closer CLOSER;
private static QueryRunnerFactoryConglomerate CONGLOMERATE;
private static SpecificSegmentsQuerySegmentWalker WALKER;
private static SqlEngine ENGINE;
@Nullable
private static PlannerFactory PLANNER_FACTORY;
@ -138,9 +139,9 @@ public class SqlVectorizedExpressionSanityTest extends InitializedNullHandlingTe
final PlannerConfig plannerConfig = new PlannerConfig();
final DruidSchemaCatalog rootSchema =
CalciteTests.createMockRootSchema(CONGLOMERATE, WALKER, plannerConfig, AuthTestUtils.TEST_AUTHORIZER_MAPPER);
ENGINE = CalciteTests.createMockSqlEngine(WALKER, CONGLOMERATE);
PLANNER_FACTORY = new PlannerFactory(
rootSchema,
CalciteTests.createMockQueryMakerFactory(WALKER, CONGLOMERATE),
CalciteTests.createOperatorTable(),
CalciteTests.createExprMacroTable(),
plannerConfig,
@ -180,18 +181,22 @@ public class SqlVectorizedExpressionSanityTest extends InitializedNullHandlingTe
public static void sanityTestVectorizedSqlQueries(PlannerFactory plannerFactory, String query)
throws ValidationException
{
final Map<String, Object> vector = ImmutableMap.of(
final QueryContext vector = new QueryContext(
ImmutableMap.of(
QueryContexts.VECTORIZE_KEY, "force",
QueryContexts.VECTORIZE_VIRTUAL_COLUMNS_KEY, "force"
)
);
final Map<String, Object> nonvector = ImmutableMap.of(
final QueryContext nonvector = new QueryContext(
ImmutableMap.of(
QueryContexts.VECTORIZE_KEY, "false",
QueryContexts.VECTORIZE_VIRTUAL_COLUMNS_KEY, "false"
)
);
try (
final DruidPlanner vectorPlanner = plannerFactory.createPlannerForTesting(vector, query);
final DruidPlanner nonVectorPlanner = plannerFactory.createPlannerForTesting(nonvector, query)
final DruidPlanner vectorPlanner = plannerFactory.createPlannerForTesting(ENGINE, query, vector);
final DruidPlanner nonVectorPlanner = plannerFactory.createPlannerForTesting(ENGINE, query, nonvector)
) {
final PlannerResult vectorPlan = vectorPlanner.plan();
final PlannerResult nonVectorPlan = nonVectorPlanner.plan();

View File

@ -20,16 +20,11 @@
package org.apache.druid.sql.calcite;
import com.google.common.collect.ImmutableList;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeFactory;
import org.apache.calcite.runtime.Hook;
import org.apache.calcite.sql.type.SqlTypeName;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.java.util.common.guava.Sequences;
import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.sql.calcite.rel.DruidQuery;
import org.apache.druid.sql.calcite.run.QueryFeature;
import org.apache.druid.sql.calcite.run.QueryMaker;
/**
@ -37,56 +32,18 @@ import org.apache.druid.sql.calcite.run.QueryMaker;
*/
public class TestInsertQueryMaker implements QueryMaker
{
private final RelDataType resultType;
private final String targetDataSource;
private final RowSignature signature;
public TestInsertQueryMaker(
final RelDataTypeFactory typeFactory,
final String targetDataSource,
final RowSignature signature
)
{
this.resultType = typeFactory.createStructType(
ImmutableList.of(
typeFactory.createSqlType(SqlTypeName.VARCHAR),
typeFactory.createSqlType(SqlTypeName.OTHER)
),
ImmutableList.of("dataSource", "signature")
);
this.targetDataSource = targetDataSource;
this.signature = signature;
}
@Override
public boolean feature(final QueryFeature feature)
{
switch (feature) {
// INSERT queries should stick to groupBy, scan.
case CAN_RUN_TIMESERIES:
case CAN_RUN_TOPN:
case CAN_RUN_TIME_BOUNDARY:
return false;
// INSERT uses external data.
case CAN_READ_EXTERNAL_DATA:
return true;
// INSERT uses Scan + ORDER BY.
case SCAN_CAN_ORDER_BY_NON_TIME:
return true;
default:
throw new IAE("Unrecognized feature: %s", feature);
}
}
@Override
public RelDataType getResultType()
{
return resultType;
}
@Override
public Sequence<Object[]> runQuery(final DruidQuery druidQuery)
{

View File

@ -1,62 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.sql.calcite;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.calcite.rel.RelRoot;
import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.server.QueryLifecycleFactory;
import org.apache.druid.sql.calcite.planner.PlannerContext;
import org.apache.druid.sql.calcite.run.NativeQueryMakerFactory;
import org.apache.druid.sql.calcite.run.QueryMaker;
import org.apache.druid.sql.calcite.run.QueryMakerFactory;
import org.apache.druid.sql.calcite.table.RowSignatures;
public class TestQueryMakerFactory implements QueryMakerFactory
{
private final QueryLifecycleFactory queryLifecycleFactory;
private final ObjectMapper jsonMapper;
public TestQueryMakerFactory(
final QueryLifecycleFactory queryLifecycleFactory,
final ObjectMapper jsonMapper
)
{
this.queryLifecycleFactory = queryLifecycleFactory;
this.jsonMapper = jsonMapper;
}
@Override
public QueryMaker buildForSelect(RelRoot relRoot, PlannerContext plannerContext)
{
return new NativeQueryMakerFactory(queryLifecycleFactory, jsonMapper).buildForSelect(relRoot, plannerContext);
}
@Override
public QueryMaker buildForInsert(String targetDataSource, RelRoot relRoot, PlannerContext plannerContext)
{
final RowSignature signature = RowSignatures.fromRelDataType(
relRoot.validatedRowType.getFieldNames(),
relRoot.validatedRowType
);
return new TestInsertQueryMaker(relRoot.rel.getCluster().getTypeFactory(), targetDataSource, signature);
}
}

View File

@ -63,7 +63,6 @@ import org.joda.time.DateTimeZone;
import org.junit.Assert;
import javax.annotation.Nullable;
import java.math.BigDecimal;
import java.util.Arrays;
import java.util.Collections;
@ -88,6 +87,7 @@ class ExpressionTestHelper
NamedViewSchema.NAME, new NamedViewSchema(EasyMock.createMock(ViewSchema.class))
)
),
null /* Don't need engine */,
new QueryContext()
);

View File

@ -23,12 +23,12 @@ import com.google.common.collect.ImmutableMap;
import org.apache.calcite.plan.RelOptRuleCall;
import org.apache.calcite.rel.RelRoot;
import org.apache.calcite.schema.SchemaPlus;
import org.apache.calcite.tools.ValidationException;
import org.apache.druid.query.QueryContext;
import org.apache.druid.query.QueryRunnerFactoryConglomerate;
import org.apache.druid.query.QuerySegmentWalker;
import org.apache.druid.sql.calcite.planner.PlannerConfig;
import org.apache.druid.sql.calcite.planner.PlannerContext;
import org.apache.druid.sql.calcite.run.NativeSqlEngine;
import org.apache.druid.sql.calcite.schema.DruidSchema;
import org.apache.druid.sql.calcite.schema.DruidSchemaCatalog;
import org.apache.druid.sql.calcite.schema.NamedDruidSchema;
@ -42,8 +42,12 @@ import org.junit.Test;
public class ExternalTableScanRuleTest
{
@Test
public void testMatchesWhenExternalScanUnsupported() throws ValidationException
public void testMatchesWhenExternalScanUnsupported()
{
final NativeSqlEngine engine = CalciteTests.createMockSqlEngine(
EasyMock.createMock(QuerySegmentWalker.class),
EasyMock.createMock(QueryRunnerFactoryConglomerate.class)
);
final PlannerContext plannerContext = PlannerContext.create(
"DUMMY", // The actual query isn't important for this test
CalciteTests.createOperatorTable(),
@ -57,20 +61,17 @@ public class ExternalTableScanRuleTest
NamedViewSchema.NAME, new NamedViewSchema(EasyMock.createMock(ViewSchema.class))
)
),
engine,
new QueryContext()
);
plannerContext.setQueryMaker(
CalciteTests.createMockQueryMakerFactory(
EasyMock.createMock(QuerySegmentWalker.class),
EasyMock.createMock(QueryRunnerFactoryConglomerate.class)
)
.buildForSelect(EasyMock.createMock(RelRoot.class), plannerContext)
engine.buildQueryMakerForSelect(EasyMock.createMock(RelRoot.class), plannerContext)
);
ExternalTableScanRule rule = new ExternalTableScanRule(plannerContext);
rule.matches(EasyMock.createMock(RelOptRuleCall.class));
Assert.assertEquals(
"SQL query requires scanning external datasources that is not suported.",
"Cannot use 'EXTERN' with SQL engine 'native'.",
plannerContext.getPlanningError()
);
}

View File

@ -173,6 +173,7 @@ public class CalcitePlannerModuleTest extends CalciteTestBase
new DefaultObjectMapper(),
injector.getInstance(PlannerConfig.class),
rootSchema,
null,
new QueryContext()
);
boolean containsCustomRule = injector.getInstance(CalciteRulesManager.class)

View File

@ -96,6 +96,7 @@ public class DruidRexExecutorTest extends InitializedNullHandlingTest
NamedViewSchema.NAME, new NamedViewSchema(EasyMock.createMock(ViewSchema.class))
)
),
null /* Don't need an engine */,
new QueryContext()
);

View File

@ -114,11 +114,12 @@ import org.apache.druid.server.security.NoopEscalator;
import org.apache.druid.server.security.ResourceType;
import org.apache.druid.sql.SqlLifecycleManager;
import org.apache.druid.sql.SqlStatementFactory;
import org.apache.druid.sql.SqlStatementFactoryFactory;
import org.apache.druid.sql.calcite.planner.DruidOperatorTable;
import org.apache.druid.sql.calcite.planner.PlannerConfig;
import org.apache.druid.sql.calcite.planner.PlannerFactory;
import org.apache.druid.sql.calcite.run.NativeQueryMakerFactory;
import org.apache.druid.sql.calcite.run.QueryMakerFactory;
import org.apache.druid.sql.calcite.run.NativeSqlEngine;
import org.apache.druid.sql.calcite.run.SqlEngine;
import org.apache.druid.sql.calcite.schema.DruidSchema;
import org.apache.druid.sql.calcite.schema.DruidSchemaCatalog;
import org.apache.druid.sql.calcite.schema.DruidSchemaManager;
@ -144,7 +145,6 @@ import org.joda.time.Duration;
import org.joda.time.chrono.ISOChronology;
import javax.annotation.Nullable;
import java.io.File;
import java.util.ArrayList;
import java.util.Arrays;
@ -758,12 +758,12 @@ public class CalciteTests
public static final DruidViewMacroFactory DRUID_VIEW_MACRO_FACTORY = new TestDruidViewMacroFactory();
public static QueryMakerFactory createMockQueryMakerFactory(
public static NativeSqlEngine createMockSqlEngine(
final QuerySegmentWalker walker,
final QueryRunnerFactoryConglomerate conglomerate
)
{
return new NativeQueryMakerFactory(createMockQueryLifecycleFactory(walker, conglomerate), getJsonMapper());
return new NativeSqlEngine(createMockQueryLifecycleFactory(walker, conglomerate), getJsonMapper());
}
public static QueryLifecycleFactory createMockQueryLifecycleFactory(
@ -790,17 +790,21 @@ public class CalciteTests
);
}
public static SqlStatementFactory createSqlLifecycleFactory(final PlannerFactory plannerFactory)
public static SqlStatementFactory createSqlStatementFactory(
final SqlEngine engine,
final PlannerFactory plannerFactory
)
{
return createSqlLifecycleFactory(plannerFactory, new AuthConfig());
return createSqlStatementFactory(engine, plannerFactory, new AuthConfig());
}
public static SqlStatementFactory createSqlLifecycleFactory(
public static SqlStatementFactory createSqlStatementFactory(
final SqlEngine engine,
final PlannerFactory plannerFactory,
final AuthConfig authConfig
)
{
return new SqlStatementFactory(
return new SqlStatementFactoryFactory(
plannerFactory,
new ServiceEmitter("dummy", "dummy", new NoopEmitter()),
new NoopRequestLogger(),
@ -808,7 +812,7 @@ public class CalciteTests
authConfig,
Suppliers.ofInstance(new DefaultQueryConfig(ImmutableMap.of())),
new SqlLifecycleManager()
);
).factorize(engine);
}
public static ObjectMapper getJsonMapper()

View File

@ -73,7 +73,6 @@ import org.junit.runner.RunWith;
import javax.validation.Validation;
import javax.validation.Validator;
import java.util.Map;
import java.util.Properties;

View File

@ -24,12 +24,17 @@ import com.google.inject.Guice;
import com.google.inject.Injector;
import com.google.inject.Key;
import com.google.inject.TypeLiteral;
import com.google.inject.util.Providers;
import org.apache.druid.guice.DruidGuiceExtensions;
import org.apache.druid.guice.LifecycleModule;
import org.apache.druid.guice.annotations.JSR311Resource;
import org.apache.druid.guice.annotations.Json;
import org.apache.druid.server.security.AuthorizerMapper;
import org.apache.druid.sql.SqlStatementFactory;
import org.apache.druid.sql.SqlStatementFactoryFactory;
import org.apache.druid.sql.calcite.run.NativeSqlEngine;
import org.easymock.Capture;
import org.easymock.EasyMock;
import org.easymock.EasyMockRunner;
import org.easymock.Mock;
import org.junit.Assert;
@ -46,7 +51,7 @@ public class SqlHttpModuleTest
@Mock
private ObjectMapper jsonMpper;
@Mock
private SqlStatementFactory sqlLifecycleFactory;
private SqlStatementFactoryFactory sqlStatementFactoryFactory;
private SqlHttpModule target;
private Injector injector;
@ -54,14 +59,20 @@ public class SqlHttpModuleTest
@Before
public void setUp()
{
EasyMock.expect(sqlStatementFactoryFactory.factorize(EasyMock.capture(Capture.newInstance())))
.andReturn(EasyMock.mock(SqlStatementFactory.class))
.anyTimes();
EasyMock.replay(sqlStatementFactoryFactory);
target = new SqlHttpModule();
injector = Guice.createInjector(
new LifecycleModule(),
new DruidGuiceExtensions(),
binder -> {
binder.bind(ObjectMapper.class).annotatedWith(Json.class).toInstance(jsonMpper);
binder.bind(SqlStatementFactory.class).toInstance(sqlLifecycleFactory);
binder.bind(SqlStatementFactoryFactory.class).toInstance(sqlStatementFactoryFactory);
binder.bind(AuthorizerMapper.class).toInstance(new AuthorizerMapper(Collections.emptyMap()));
binder.bind(NativeSqlEngine.class).toProvider(Providers.of(new NativeSqlEngine(null, null)));
},
target
);

View File

@ -23,7 +23,6 @@ import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Splitter;
import com.google.common.base.Strings;
import com.google.common.base.Suppliers;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
@ -73,10 +72,13 @@ import org.apache.druid.server.security.ForbiddenException;
import org.apache.druid.server.security.ResourceAction;
import org.apache.druid.sql.DirectStatement;
import org.apache.druid.sql.HttpStatement;
import org.apache.druid.sql.PreparedStatement;
import org.apache.druid.sql.SqlLifecycleManager;
import org.apache.druid.sql.SqlPlanningException.PlanningError;
import org.apache.druid.sql.SqlQueryPlus;
import org.apache.druid.sql.SqlStatementFactory;
import org.apache.druid.sql.SqlToolbox;
import org.apache.druid.sql.calcite.parser.DruidSqlInsert;
import org.apache.druid.sql.calcite.planner.CalciteRulesManager;
import org.apache.druid.sql.calcite.planner.DruidOperatorTable;
import org.apache.druid.sql.calcite.planner.DruidPlanner;
@ -85,12 +87,15 @@ import org.apache.druid.sql.calcite.planner.PlannerContext;
import org.apache.druid.sql.calcite.planner.PlannerFactory;
import org.apache.druid.sql.calcite.planner.PlannerResult;
import org.apache.druid.sql.calcite.planner.UnsupportedSQLQueryException;
import org.apache.druid.sql.calcite.run.NativeSqlEngine;
import org.apache.druid.sql.calcite.schema.DruidSchemaCatalog;
import org.apache.druid.sql.calcite.util.CalciteTestBase;
import org.apache.druid.sql.calcite.util.CalciteTests;
import org.apache.druid.sql.calcite.util.QueryLogHook;
import org.apache.druid.sql.calcite.util.SpecificSegmentsQuerySegmentWalker;
import org.easymock.EasyMock;
import org.hamcrest.CoreMatchers;
import org.hamcrest.MatcherAssert;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
@ -105,7 +110,6 @@ import javax.ws.rs.core.MultivaluedMap;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.Response.Status;
import javax.ws.rs.core.StreamingOutput;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
@ -153,7 +157,8 @@ public class SqlResourceTest extends CalciteTestBase
private HttpServletRequest req;
private ListeningExecutorService executorService;
private SqlLifecycleManager lifecycleManager;
private SqlStatementFactory sqlLifecycleFactory;
private NativeSqlEngine engine;
private SqlStatementFactory sqlStatementFactory;
private CountDownLatch lifecycleAddLatch;
private final SettableSupplier<NonnullPair<CountDownLatch, Boolean>> validateAndAuthorizeLatchSupplier = new SettableSupplier<>();
@ -225,7 +230,6 @@ public class SqlResourceTest extends CalciteTestBase
final PlannerFactory plannerFactory = new PlannerFactory(
rootSchema,
CalciteTests.createMockQueryMakerFactory(walker, conglomerate),
operatorTable,
macroTable,
plannerConfig,
@ -249,15 +253,18 @@ public class SqlResourceTest extends CalciteTestBase
final ServiceEmitter emitter = new NoopServiceEmitter();
final AuthConfig authConfig = new AuthConfig();
final DefaultQueryConfig defaultQueryConfig = new DefaultQueryConfig(ImmutableMap.of());
sqlLifecycleFactory = new SqlStatementFactory(
engine = CalciteTests.createMockSqlEngine(walker, conglomerate);
final SqlToolbox sqlToolbox = new SqlToolbox(
engine,
plannerFactory,
emitter,
testRequestLogger,
scheduler,
authConfig,
Suppliers.ofInstance(defaultQueryConfig),
defaultQueryConfig,
lifecycleManager
)
);
sqlStatementFactory = new SqlStatementFactory()
{
@Override
public HttpStatement httpStatement(
@ -266,7 +273,7 @@ public class SqlResourceTest extends CalciteTestBase
)
{
TestHttpStatement stmt = new TestHttpStatement(
lifecycleToolbox,
sqlToolbox,
sqlQuery,
req,
validateAndAuthorizeLatchSupplier,
@ -278,11 +285,23 @@ public class SqlResourceTest extends CalciteTestBase
onExecute = NULL_ACTION;
return stmt;
}
@Override
public DirectStatement directStatement(SqlQueryPlus sqlRequest)
{
throw new UnsupportedOperationException();
}
@Override
public PreparedStatement preparedStatement(SqlQueryPlus sqlRequest)
{
throw new UnsupportedOperationException();
}
};
resource = new SqlResource(
JSON_MAPPER,
CalciteTests.TEST_AUTHORIZER_MAPPER,
sqlLifecycleFactory,
sqlStatementFactory,
lifecycleManager,
new ServerConfig()
);
@ -667,7 +686,7 @@ public class SqlResourceTest extends CalciteTestBase
public void testArrayResultFormatWithHeader_nullColumnType() throws Exception
{
// Test a query that returns null header for some of the columns
final String query = "SELECT (1, 2)";
final String query = "SELECT (1, 2) FROM INFORMATION_SCHEMA.COLUMNS LIMIT 1";
Assert.assertEquals(
ImmutableList.of(
Collections.singletonList("EXPR$0"),
@ -783,7 +802,7 @@ public class SqlResourceTest extends CalciteTestBase
@Test
public void testArrayLinesResultFormatWithHeader_nullColumnType() throws Exception
{
final String query = "SELECT (1, 2)";
final String query = "SELECT (1, 2) FROM INFORMATION_SCHEMA.COLUMNS LIMIT 1";
final Pair<QueryException, String> pair = doPostRaw(
new SqlQuery(query, ResultFormat.ARRAYLINES, true, true, true, null, null)
);
@ -1040,7 +1059,7 @@ public class SqlResourceTest extends CalciteTestBase
@Test
public void testObjectLinesResultFormatWithFullHeader_nullColumnType() throws Exception
{
final String query = "SELECT (1, 2)";
final String query = "SELECT (1, 2) FROM INFORMATION_SCHEMA.COLUMNS LIMIT 1";
final Pair<QueryException, String> pair =
doPostRaw(new SqlQuery(query, ResultFormat.OBJECTLINES, true, true, true, null, null));
Assert.assertNull(pair.lhs);
@ -1116,7 +1135,7 @@ public class SqlResourceTest extends CalciteTestBase
@Test
public void testCsvResultFormatWithHeaders_nullColumnType() throws Exception
{
final String query = "SELECT (1, 2)";
final String query = "SELECT (1, 2) FROM INFORMATION_SCHEMA.COLUMNS LIMIT 1";
final Pair<QueryException, String> pair = doPostRaw(
new SqlQuery(query, ResultFormat.CSV, true, true, true, null, null)
);
@ -1209,7 +1228,7 @@ public class SqlResourceTest extends CalciteTestBase
Assert.assertEquals(PlanningError.UNSUPPORTED_SQL_ERROR.getErrorClass(), exception.getErrorClass());
Assert.assertTrue(
exception.getMessage()
.contains("Cannot build plan for query: SELECT dim1 FROM druid.foo ORDER BY dim1. " +
.contains("Cannot build plan for query. " +
"Possible error: SQL query requires order by non-time column [dim1 ASC] that is not supported.")
);
checkSqlRequestLog(false);
@ -1234,7 +1253,7 @@ public class SqlResourceTest extends CalciteTestBase
Assert.assertEquals(PlanningError.UNSUPPORTED_SQL_ERROR.getErrorClass(), exception.getErrorClass());
Assert.assertTrue(
exception.getMessage()
.contains("Cannot build plan for query: SELECT max(dim1) FROM druid.foo. " +
.contains("Cannot build plan for query. " +
"Possible error: Max aggregation is not supported for 'STRING' type")
);
checkSqlRequestLog(false);
@ -1346,7 +1365,7 @@ public class SqlResourceTest extends CalciteTestBase
resource = new SqlResource(
JSON_MAPPER,
CalciteTests.TEST_AUTHORIZER_MAPPER,
sqlLifecycleFactory,
sqlStatementFactory,
lifecycleManager,
new ServerConfig() {
@Override
@ -1391,7 +1410,7 @@ public class SqlResourceTest extends CalciteTestBase
resource = new SqlResource(
JSON_MAPPER,
CalciteTests.TEST_AUTHORIZER_MAPPER,
sqlLifecycleFactory,
sqlStatementFactory,
lifecycleManager,
new ServerConfig()
{
@ -1492,7 +1511,12 @@ public class SqlResourceTest extends CalciteTestBase
public void testQueryTimeoutException() throws Exception
{
final String sqlQueryId = "timeoutTest";
Map<String, Object> queryContext = ImmutableMap.of(QueryContexts.TIMEOUT_KEY, 1, BaseQuery.SQL_QUERY_ID, sqlQueryId);
Map<String, Object> queryContext = ImmutableMap.of(
QueryContexts.TIMEOUT_KEY,
1,
BaseQuery.SQL_QUERY_ID,
sqlQueryId
);
final QueryException timeoutException = doPost(
new SqlQuery(
"SELECT CAST(__time AS DATE), dim1, dim2, dim3 FROM druid.foo GROUP by __time, dim1, dim2, dim3 ORDER BY dim2 DESC",
@ -1626,7 +1650,12 @@ public class SqlResourceTest extends CalciteTestBase
public void testQueryContextException() throws Exception
{
final String sqlQueryId = "badQueryContextTimeout";
Map<String, Object> queryContext = ImmutableMap.of(QueryContexts.TIMEOUT_KEY, "2000'", BaseQuery.SQL_QUERY_ID, sqlQueryId);
Map<String, Object> queryContext = ImmutableMap.of(
QueryContexts.TIMEOUT_KEY,
"2000'",
BaseQuery.SQL_QUERY_ID,
sqlQueryId
);
final QueryException queryContextException = doPost(
new SqlQuery(
"SELECT 1337",
@ -1646,6 +1675,31 @@ public class SqlResourceTest extends CalciteTestBase
Assert.assertTrue(lifecycleManager.getAll(sqlQueryId).isEmpty());
}
@Test
public void testQueryContextKeyNotAllowed() throws Exception
{
Map<String, Object> queryContext = ImmutableMap.of(DruidSqlInsert.SQL_INSERT_SEGMENT_GRANULARITY, "all");
final QueryException queryContextException = doPost(
new SqlQuery(
"SELECT 1337",
ResultFormat.OBJECT,
false,
false,
false,
queryContext,
null
)
).lhs;
Assert.assertNotNull(queryContextException);
Assert.assertEquals(PlanningError.VALIDATION_ERROR.getErrorCode(), queryContextException.getErrorCode());
MatcherAssert.assertThat(
queryContextException.getMessage(),
CoreMatchers.containsString(
"Cannot execute query with context parameter [sqlInsertSegmentGranularity]")
);
checkSqlRequestLog(false);
}
@SuppressWarnings("unchecked")
private void checkSqlRequestLog(boolean success)
{