SQL: Add context and contextual functions to planner. (#3919)

* SQL: Add context and contextual functions to planner.

Added support for context parameters specified as JDBC connection properties
or a JSON object for SQL-over-JSON-over-HTTP.

Also added features that depend on context functionality:

- Added CURRENT_DATE, CURRENT_TIME, CURRENT_TIMESTAMP functions.
- Added support for time zones other than UTC via a "timeZone" context.
- Pass down query context to Druid queries too.

Also some bug fixes:

- Fix DATE handling, it was largely done incorrectly before.
- Fix CAST(__time TO DATE) which should do a floor-to-day.
- Fix non-equality comparisons to FLOOR(__time TO X).
- Fix maxQueryCount property.

* Pass down context to nested queries too.
This commit is contained in:
Gian Merlino 2017-02-15 14:09:14 -08:00 committed by Jonathan Wei
parent 3c54fc912a
commit 16ef513c7d
45 changed files with 1940 additions and 574 deletions

View File

@ -48,10 +48,10 @@ import io.druid.segment.TestHelper;
import io.druid.segment.column.ValueType;
import io.druid.segment.serde.ComplexMetrics;
import io.druid.sql.calcite.planner.Calcites;
import io.druid.sql.calcite.planner.DruidPlanner;
import io.druid.sql.calcite.planner.PlannerConfig;
import io.druid.sql.calcite.planner.PlannerFactory;
import io.druid.sql.calcite.planner.PlannerResult;
import io.druid.sql.calcite.rel.QueryMaker;
import io.druid.sql.calcite.table.DruidTable;
import io.druid.sql.calcite.table.RowSignature;
import io.druid.sql.calcite.util.CalciteTests;
@ -61,7 +61,6 @@ import io.druid.timeline.partition.LinearShardSpec;
import org.apache.calcite.schema.Schema;
import org.apache.calcite.schema.Table;
import org.apache.calcite.schema.impl.AbstractSchema;
import org.apache.calcite.tools.Planner;
import org.apache.commons.io.FileUtils;
import org.joda.time.Interval;
import org.openjdk.jmh.annotations.Benchmark;
@ -157,7 +156,6 @@ public class SqlBenchmark
final Map<String, Table> tableMap = ImmutableMap.<String, Table>of(
"foo",
new DruidTable(
new QueryMaker(walker, plannerConfig),
new TableDataSource("foo"),
RowSignature.builder()
.add("__time", ValueType.LONG)
@ -177,6 +175,7 @@ public class SqlBenchmark
};
plannerFactory = new PlannerFactory(
Calcites.createRootSchema(druidSchema),
walker,
CalciteTests.createOperatorTable(),
plannerConfig
);
@ -233,8 +232,8 @@ public class SqlBenchmark
@OutputTimeUnit(TimeUnit.MILLISECONDS)
public void queryPlanner(Blackhole blackhole) throws Exception
{
try (final Planner planner = plannerFactory.createPlanner()) {
final PlannerResult plannerResult = Calcites.plan(planner, sqlQuery);
try (final DruidPlanner planner = plannerFactory.createPlanner(null)) {
final PlannerResult plannerResult = planner.plan(sqlQuery);
final ArrayList<Object[]> results = Sequences.toList(plannerResult.run(), Lists.<Object[]>newArrayList());
blackhole.consume(results);
}

View File

@ -5,7 +5,7 @@ layout: doc_page
Query Context
=============
The query context is used for various query configuration parameters.
The query context is used for various query configuration parameters. The following parameters apply to all queries.
|property |default | description |
|-----------------|---------------------|----------------------|
@ -17,7 +17,21 @@ The query context is used for various query configuration parameters.
|bySegment | `false` | Return "by segment" results. Primarily used for debugging, setting it to `true` returns results associated with the data segment they came from |
|finalize | `true` | Flag indicating whether to "finalize" aggregation results. Primarily used for debugging. For instance, the `hyperUnique` aggregator will return the full HyperLogLog sketch instead of the estimated cardinality when this flag is set to `false` |
|chunkPeriod | `0` (off) | At the broker node level, long interval queries (of any type) may be broken into shorter interval queries, reducing the impact on resources. Use ISO 8601 periods. For example, if this property is set to `P1M` (one month), then a query covering a year would be broken into 12 smaller queries. All the query chunks will be processed asynchronously inside query processing executor service. Make sure "druid.processing.numThreads" is configured appropriately on the broker. |
In addition, some query types offer context parameters specific to that query type.
### TopN queries
|property |default | description |
|-----------------|---------------------|----------------------|
|minTopNThreshold | `1000` | The top minTopNThreshold local results from each segment are returned for merging to determine the global topN. |
|`maxResults`|500000|Maximum number of results groupBy query can process. Default value used can be changed by `druid.query.groupBy.maxResults` in druid configuration at broker and historical nodes. At query time you can only lower the value.|
|`maxIntermediateRows`|50000|Maximum number of intermediate rows while processing single segment for groupBy query. Default value used can be changed by `druid.query.groupBy.maxIntermediateRows` in druid configuration at broker and historical nodes. At query time you can only lower the value.|
|`groupByIsSingleThreaded`|false|Whether to run single threaded group By queries. Default value used can be changed by `druid.query.groupBy.singleThreaded` in druid configuration at historical nodes.|
### Timeseries queries
|property |default | description |
|-----------------|---------------------|----------------------|
|skipEmptyBuckets | `false` | Disable timeseries zero-filling behavior, so only buckets with results will be returned. |
### GroupBy queries
See [GroupBy query context](groupbyquery.html#query-context).

View File

@ -31,14 +31,24 @@ jdbc:avatica:remote:url=http://BROKER:8082/druid/v2/sql/avatica/
Example code:
```java
Connection connection = DriverManager.getConnection("jdbc:avatica:remote:url=http://localhost:8082/druid/v2/sql/avatica/");
ResultSet resultSet = connection.createStatement().executeQuery("SELECT COUNT(*) AS cnt FROM data_source");
while (resultSet.next()) {
// Do something
// Connect to /druid/v2/sql/avatica/ on your broker.
String url = "jdbc:avatica:remote:url=http://localhost:8082/druid/v2/sql/avatica/";
// Set any connection context parameters you need here (see "Connection context" below).
// Or leave empty for default behavior.
Properties connectionProperties = new Properties();
try (Connection connection = DriverManager.getConnection(url, connectionProperties)) {
try (ResultSet resultSet = connection.createStatement().executeQuery("SELECT COUNT(*) AS cnt FROM data_source")) {
while (resultSet.next()) {
// Do something
}
}
}
```
Table metadata is available over JDBC using `connection.getMetaData()`.
Table metadata is available over JDBC using `connection.getMetaData()` or by querying the "INFORMATION_SCHEMA" tables
(see below).
Parameterized queries don't work properly, so avoid those.
@ -61,6 +71,17 @@ curl -XPOST -H'Content-Type: application/json' http://BROKER:8082/druid/v2/sql/
Metadata is only available over the HTTP API by querying the "INFORMATION_SCHEMA" tables (see below).
You can provide [connection context parameters](#connection-context) by adding a "context" map, like:
```json
{
"query" : "SELECT COUNT(*) FROM data_source WHERE foo = 'bar' AND __time > TIMESTAMP '2000-01-01 00:00:00'",
"context" : {
"sqlTimeZone" : "America/Los_Angeles"
}
}
```
### Metadata
Druid brokers cache column type metadata for each dataSource and use it to plan SQL queries. This cache is updated
@ -77,7 +98,7 @@ SELECT * FROM INFORMATION_SCHEMA.COLUMNS WHERE SCHEMA_NAME = 'druid' AND TABLE_N
See the [INFORMATION_SCHEMA tables](#information_schema-tables) section below for details on the available metadata.
You can also access table and column metadata through JDBC using `connection.getMetaData()`.
You can access table and column metadata through JDBC using `connection.getMetaData()`.
### Approximate queries
@ -91,8 +112,8 @@ algorithm.
- TopN-style queries with a single grouping column, like
`SELECT col1, SUM(col2) FROM data_source GROUP BY col1 ORDER BY SUM(col2) DESC LIMIT 100`, by default will be executed
as [TopN queries](topnquery.html), which use an approximate algorithm. To disable this behavior, and use exact
algorithms for topN-style queries, set
[druid.sql.planner.useApproximateTopN](../configuration/broker.html#sql-planner-configuration) to "false".
algorithms for topN-style queries, set "useApproximateTopN" to "false", either through query context or through broker
configuration.
### Time functions
@ -101,6 +122,10 @@ Druid's SQL language supports a number of time operations, including:
- `FLOOR(__time TO <granularity>)` for grouping or filtering on time buckets, like `SELECT FLOOR(__time TO MONTH), SUM(cnt) FROM data_source GROUP BY FLOOR(__time TO MONTH)`
- `EXTRACT(<granularity> FROM __time)` for grouping or filtering on time parts, like `SELECT EXTRACT(HOUR FROM __time), SUM(cnt) FROM data_source GROUP BY EXTRACT(HOUR FROM __time)`
- Comparisons to `TIMESTAMP '<time string>'` for time filters, like `SELECT COUNT(*) FROM data_source WHERE __time >= TIMESTAMP '2000-01-01 00:00:00' AND __time < TIMESTAMP '2001-01-01 00:00:00'`
- `CURRENT_TIMESTAMP` for the current time, usable in filters like `SELECT COUNT(*) FROM data_source WHERE __time >= CURRENT_TIMESTAMP - INTERVAL '1' HOUR`
By default, time operations use the UTC time zone. You can change the time zone for time operations by setting the
connection context parameter "sqlTimeZone" to the name of the time zone, like "America/Los_Angeles".
### Subqueries
@ -138,6 +163,21 @@ For this query, the broker will first translate the inner select on data_source_
configuration parameter `druid.sql.planner.maxSemiJoinRowsInMemory` controls the maximum number of values that will be
materialized for this kind of plan.
### Connection context
Druid's SQL layer supports a connection context that influences SQL query planning and Druid native query execution.
The parameters in the table below affect SQL planning. All other context parameters you provide will be attached to
Druid queries and can affect how they run. See [Query context](query-context.html) for details on the possible options.
|Parameter|Description|Default value|
|---------|-----------|-------------|
|`sqlTimeZone`|Sets the time zone for this connection. Should be a time zone name like "America/Los_Angeles".|UTC|
|`useApproximateCountDistinct`|Whether to use an approximate cardinalty algorithm for `COUNT(DISTINCT foo)`.|druid.sql.planner.useApproximateCountDistinct on the broker|
|`useApproximateTopN`|Whether to use approximate [TopN queries](topnquery.html) when a SQL query could be expressed as such. If false, exact [GroupBy queries](groupbyquery.html) will be used instead.|druid.sql.planner.useApproximateTopN on the broker|
|`useFallback`|Whether to evaluate operations on the broker when they cannot be expressed as Druid queries. This option is not recommended for production since it can generate unscalable query plans. If false, SQL queries that cannot be translated to Druid queries will fail.|druid.sql.planner.useFallback on the broker|
Connection context can be specified as JDBC connection properties or as a "context" object in the JSON API.
### Configuration
Druid's SQL layer can be configured on the [Broker node](../configuration/broker.html#sql-planner-configuration).

View File

@ -33,6 +33,7 @@ import io.druid.sql.calcite.aggregation.Aggregations;
import io.druid.sql.calcite.aggregation.SqlAggregator;
import io.druid.sql.calcite.expression.Expressions;
import io.druid.sql.calcite.expression.RowExtraction;
import io.druid.sql.calcite.planner.PlannerContext;
import io.druid.sql.calcite.table.RowSignature;
import org.apache.calcite.rel.core.AggregateCall;
import org.apache.calcite.rel.core.Project;
@ -63,6 +64,7 @@ public class QuantileSqlAggregator implements SqlAggregator
public Aggregation toDruidAggregation(
final String name,
final RowSignature rowSignature,
final PlannerContext plannerContext,
final List<Aggregation> existingAggregations,
final Project project,
final AggregateCall aggregateCall,
@ -70,6 +72,7 @@ public class QuantileSqlAggregator implements SqlAggregator
)
{
final RowExtraction rex = Expressions.toRowExtraction(
plannerContext,
rowSignature.getRowOrder(),
Expressions.fromFieldAccess(
rowSignature,

View File

@ -20,6 +20,7 @@
package io.druid.query.aggregation.histogram.sql;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import io.druid.granularity.QueryGranularities;
@ -41,11 +42,11 @@ import io.druid.segment.IndexBuilder;
import io.druid.segment.QueryableIndex;
import io.druid.segment.TestHelper;
import io.druid.segment.incremental.IncrementalIndexSchema;
import io.druid.sql.calcite.CalciteQueryTest;
import io.druid.sql.calcite.aggregation.SqlAggregator;
import io.druid.sql.calcite.filtration.Filtration;
import io.druid.sql.calcite.planner.Calcites;
import io.druid.sql.calcite.planner.DruidOperatorTable;
import io.druid.sql.calcite.planner.DruidPlanner;
import io.druid.sql.calcite.planner.PlannerConfig;
import io.druid.sql.calcite.planner.PlannerFactory;
import io.druid.sql.calcite.planner.PlannerResult;
@ -55,7 +56,6 @@ import io.druid.sql.calcite.util.SpecificSegmentsQuerySegmentWalker;
import io.druid.timeline.DataSegment;
import io.druid.timeline.partition.LinearShardSpec;
import org.apache.calcite.schema.SchemaPlus;
import org.apache.calcite.tools.Planner;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
@ -134,7 +134,7 @@ public class QuantileSqlAggregatorTest
new QuantileSqlAggregator()
)
);
plannerFactory = new PlannerFactory(rootSchema, operatorTable, plannerConfig);
plannerFactory = new PlannerFactory(rootSchema, walker, operatorTable, plannerConfig);
}
@After
@ -147,7 +147,7 @@ public class QuantileSqlAggregatorTest
@Test
public void testQuantileOnFloatAndLongs() throws Exception
{
try (final Planner planner = plannerFactory.createPlanner()) {
try (final DruidPlanner planner = plannerFactory.createPlanner(null)) {
final String sql = "SELECT\n"
+ "APPROX_QUANTILE(m1, 0.01),\n"
+ "APPROX_QUANTILE(m1, 0.5, 50),\n"
@ -159,7 +159,7 @@ public class QuantileSqlAggregatorTest
+ "APPROX_QUANTILE(cnt, 0.5)\n"
+ "FROM foo";
final PlannerResult plannerResult = Calcites.plan(planner, sql);
final PlannerResult plannerResult = planner.plan(sql);
// Verify results
final List<Object[]> results = Sequences.toList(plannerResult.run(), new ArrayList<Object[]>());
@ -200,7 +200,7 @@ public class QuantileSqlAggregatorTest
new QuantilePostAggregator("a6", "a4:agg", 0.999f),
new QuantilePostAggregator("a7", "a7:agg", 0.50f)
))
.context(CalciteQueryTest.TIMESERIES_CONTEXT)
.context(ImmutableMap.<String, Object>of("skipEmptyBuckets", true))
.build(),
Iterables.getOnlyElement(queryLogHook.getRecordedQueries())
);
@ -210,7 +210,7 @@ public class QuantileSqlAggregatorTest
@Test
public void testQuantileOnComplexColumn() throws Exception
{
try (final Planner planner = plannerFactory.createPlanner()) {
try (final DruidPlanner planner = plannerFactory.createPlanner(null)) {
final String sql = "SELECT\n"
+ "APPROX_QUANTILE(hist_m1, 0.01),\n"
+ "APPROX_QUANTILE(hist_m1, 0.5, 50),\n"
@ -221,7 +221,7 @@ public class QuantileSqlAggregatorTest
+ "APPROX_QUANTILE(hist_m1, 0.999) FILTER(WHERE dim1 = 'abc')\n"
+ "FROM foo";
final PlannerResult plannerResult = Calcites.plan(planner, sql);
final PlannerResult plannerResult = planner.plan(sql);
// Verify results
final List<Object[]> results = Sequences.toList(plannerResult.run(), new ArrayList<Object[]>());
@ -260,7 +260,7 @@ public class QuantileSqlAggregatorTest
new QuantilePostAggregator("a5", "a5:agg", 0.999f),
new QuantilePostAggregator("a6", "a4:agg", 0.999f)
))
.context(CalciteQueryTest.TIMESERIES_CONTEXT)
.context(ImmutableMap.<String, Object>of("skipEmptyBuckets", true))
.build(),
Iterables.getOnlyElement(queryLogHook.getRecordedQueries())
);

View File

@ -19,6 +19,8 @@
package io.druid.sql.avatica;
import com.google.common.collect.ImmutableMap;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Future;
@ -28,14 +30,21 @@ import java.util.concurrent.Future;
*/
public class DruidConnection
{
private final Map<String, Object> context;
private final Map<Integer, DruidStatement> statements;
private Future<?> timeoutFuture;
public DruidConnection()
public DruidConnection(final Map<String, Object> context)
{
this.context = ImmutableMap.copyOf(context);
this.statements = new HashMap<>();
}
public Map<String, Object> context()
{
return context;
}
public Map<Integer, DruidStatement> statements()
{
return statements;

View File

@ -24,6 +24,7 @@ import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.io.Closer;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
@ -45,6 +46,7 @@ import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
@ -53,6 +55,7 @@ import java.util.concurrent.atomic.AtomicInteger;
public class DruidMeta extends MetaImpl
{
private static final Logger log = new Logger(DruidMeta.class);
private static final Set<String> SKIP_PROPERTIES = ImmutableSet.of("user", "password");
private final PlannerFactory plannerFactory;
private final ScheduledExecutorService exec;
@ -82,7 +85,14 @@ public class DruidMeta extends MetaImpl
@Override
public void openConnection(final ConnectionHandle ch, final Map<String, String> info)
{
getDruidConnection(ch.id, true);
// Build connection context.
final ImmutableMap.Builder<String, Object> context = ImmutableMap.builder();
for (Map.Entry<String, String> entry : info.entrySet()) {
if (!SKIP_PROPERTIES.contains(entry.getKey())) {
context.put(entry);
}
}
openDruidConnection(ch.id, context.build());
}
@Override
@ -136,7 +146,7 @@ public class DruidMeta extends MetaImpl
throw new ISE("Too many open statements, limit is[%,d]", config.getMaxStatementsPerConnection());
}
connection.statements().put(statement.id, new DruidStatement(ch.id, statement.id));
connection.statements().put(statement.id, new DruidStatement(ch.id, statement.id, connection.context()));
log.debug("Connection[%s] opened statement[%s].", ch.id, statement.id);
return statement;
}
@ -483,35 +493,38 @@ public class DruidMeta extends MetaImpl
return sqlResultSet(ch, sql);
}
private DruidConnection getDruidConnection(final String connectionId)
private DruidConnection openDruidConnection(final String connectionId, final Map<String, Object> context)
{
return getDruidConnection(connectionId, false);
synchronized (connections) {
if (connections.containsKey(connectionId)) {
throw new ISE("Connection[%s] already open.", connectionId);
}
if (connections.size() >= config.getMaxConnections()) {
throw new ISE("Too many connections, limit is[%,d]", config.getMaxConnections());
}
connections.put(connectionId, new DruidConnection(context));
log.debug("Connection[%s] opened.", connectionId);
// Call getDruidConnection to start the timeout timer.
return getDruidConnection(connectionId);
}
}
private DruidConnection getDruidConnection(final String connectionId, final boolean createIfNotExists)
private DruidConnection getDruidConnection(final String connectionId)
{
DruidConnection connection;
final DruidConnection connection;
synchronized (connections) {
connection = connections.get(connectionId);
if (connection == null && createIfNotExists) {
if (connections.size() >= config.getMaxConnections()) {
throw new ISE("Too many connections, limit is[%,d]", config.getMaxConnections());
}
connection = new DruidConnection();
connections.put(connectionId, connection);
log.debug("Connection[%s] opened.", connectionId);
}
if (connection == null) {
throw new ISE("Connection[%s] not open", connectionId);
}
}
final DruidConnection finalConnection = connection;
return finalConnection.sync(
return connection.sync(
exec.schedule(
new Runnable()
{
@ -521,8 +534,8 @@ public class DruidMeta extends MetaImpl
final List<DruidStatement> statements = new ArrayList<>();
synchronized (connections) {
if (connections.remove(connectionId) == finalConnection) {
statements.addAll(finalConnection.statements().values());
if (connections.remove(connectionId) == connection) {
statements.addAll(connection.statements().values());
log.debug("Connection[%s] timed out, closing %,d statements.", connectionId, statements.size());
}
}

View File

@ -26,7 +26,7 @@ import io.druid.java.util.common.guava.Sequence;
import io.druid.java.util.common.guava.Sequences;
import io.druid.java.util.common.guava.Yielder;
import io.druid.java.util.common.guava.Yielders;
import io.druid.sql.calcite.planner.Calcites;
import io.druid.sql.calcite.planner.DruidPlanner;
import io.druid.sql.calcite.planner.PlannerFactory;
import io.druid.sql.calcite.planner.PlannerResult;
import io.druid.sql.calcite.rel.QueryMaker;
@ -35,7 +35,6 @@ import org.apache.calcite.avatica.ColumnMetaData;
import org.apache.calcite.avatica.Meta;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeField;
import org.apache.calcite.tools.Planner;
import javax.annotation.concurrent.GuardedBy;
import java.io.Closeable;
@ -43,6 +42,7 @@ import java.io.IOException;
import java.sql.DatabaseMetaData;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
/**
* Statement handle for {@link DruidMeta}. Thread-safe.
@ -61,6 +61,7 @@ public class DruidStatement implements Closeable
private final String connectionId;
private final int statementId;
private final Map<String, Object> queryContext;
private final Object lock = new Object();
private State state = State.NEW;
@ -71,10 +72,11 @@ public class DruidStatement implements Closeable
private Yielder<Object[]> yielder;
private int offset = 0;
public DruidStatement(final String connectionId, final int statementId)
public DruidStatement(final String connectionId, final int statementId, final Map<String, Object> queryContext)
{
this.connectionId = connectionId;
this.statementId = statementId;
this.queryContext = queryContext;
}
public static List<ColumnMetaData> createColumnMetaData(final RelDataType rowType)
@ -123,10 +125,10 @@ public class DruidStatement implements Closeable
public DruidStatement prepare(final PlannerFactory plannerFactory, final String query, final long maxRowCount)
{
try (final Planner planner = plannerFactory.createPlanner()) {
try (final DruidPlanner planner = plannerFactory.createPlanner(queryContext)) {
synchronized (lock) {
ensure(State.NEW);
this.plannerResult = Calcites.plan(planner, query);
this.plannerResult = planner.plan(query);
this.maxRowCount = maxRowCount;
this.query = query;
this.signature = Meta.Signature.create(

View File

@ -33,6 +33,7 @@ import io.druid.segment.column.ValueType;
import io.druid.sql.calcite.expression.Expressions;
import io.druid.sql.calcite.expression.RowExtraction;
import io.druid.sql.calcite.planner.Calcites;
import io.druid.sql.calcite.planner.PlannerContext;
import io.druid.sql.calcite.table.RowSignature;
import org.apache.calcite.rel.core.AggregateCall;
import org.apache.calcite.rel.core.Project;
@ -62,6 +63,7 @@ public class ApproxCountDistinctSqlAggregator implements SqlAggregator
public Aggregation toDruidAggregation(
final String name,
final RowSignature rowSignature,
final PlannerContext plannerContext,
final List<Aggregation> existingAggregations,
final Project project,
final AggregateCall aggregateCall,
@ -74,6 +76,7 @@ public class ApproxCountDistinctSqlAggregator implements SqlAggregator
Iterables.getOnlyElement(aggregateCall.getArgList())
);
final RowExtraction rex = Expressions.toRowExtraction(
plannerContext,
rowSignature.getRowOrder(),
rexNode
);

View File

@ -20,6 +20,7 @@
package io.druid.sql.calcite.aggregation;
import io.druid.query.filter.DimFilter;
import io.druid.sql.calcite.planner.PlannerContext;
import io.druid.sql.calcite.table.RowSignature;
import org.apache.calcite.rel.core.AggregateCall;
import org.apache.calcite.rel.core.Project;
@ -45,6 +46,7 @@ public interface SqlAggregator
*
* @param name desired output name of the aggregation
* @param rowSignature signature of the rows being aggregated
* @param plannerContext SQL planner context
* @param existingAggregations existing aggregations for this query; useful for re-using aggregations. May be safely
* ignored if you do not want to re-use existing aggregations.
* @param project SQL projection to apply before the aggregate call, may be null
@ -57,6 +59,7 @@ public interface SqlAggregator
Aggregation toDruidAggregation(
final String name,
final RowSignature rowSignature,
final PlannerContext plannerContext,
final List<Aggregation> existingAggregations,
final Project project,
final AggregateCall aggregateCall,

View File

@ -20,6 +20,7 @@
package io.druid.sql.calcite.expression;
import io.druid.query.extraction.StrlenExtractionFn;
import io.druid.sql.calcite.planner.PlannerContext;
import org.apache.calcite.rex.RexCall;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.sql.SqlKind;
@ -43,12 +44,13 @@ public class CharLengthExpressionConversion extends AbstractExpressionConversion
@Override
public RowExtraction convert(
final ExpressionConverter converter,
final PlannerContext plannerContext,
final List<String> rowOrder,
final RexNode expression
)
{
final RexCall call = (RexCall) expression;
final RowExtraction arg = converter.convert(rowOrder, call.getOperands().get(0));
final RowExtraction arg = converter.convert(plannerContext, rowOrder, call.getOperands().get(0));
if (arg == null) {
return null;
}

View File

@ -19,6 +19,7 @@
package io.druid.sql.calcite.expression;
import io.druid.sql.calcite.planner.PlannerContext;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.sql.SqlKind;
@ -44,11 +45,17 @@ public interface ExpressionConversion
* Translate a row-expression to a Druid column reference. Note that this signature will probably need to change
* once we support extractions from multiple columns.
*
* @param converter converter that can be used to convert sub-expressions
* @param rowOrder order of fields in the Druid rows to be extracted from
* @param expression expression meant to be applied on top of the table
* @param converter converter that can be used to convert sub-expressions
* @param plannerContext SQL planner context
* @param rowOrder order of fields in the Druid rows to be extracted from
* @param expression expression meant to be applied on top of the table
*
* @return (columnName, extractionFn) or null
*/
RowExtraction convert(ExpressionConverter converter, List<String> rowOrder, RexNode expression);
RowExtraction convert(
ExpressionConverter converter,
PlannerContext plannerContext,
List<String> rowOrder,
RexNode expression
);
}

View File

@ -21,10 +21,13 @@ package io.druid.sql.calcite.expression;
import com.google.common.collect.Maps;
import io.druid.java.util.common.ISE;
import io.druid.sql.calcite.planner.PlannerContext;
import org.apache.calcite.avatica.util.TimeUnitRange;
import org.apache.calcite.rex.RexCall;
import org.apache.calcite.rex.RexInputRef;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.sql.SqlKind;
import org.apache.calcite.sql.type.SqlTypeName;
import java.util.List;
import java.util.Map;
@ -72,12 +75,13 @@ public class ExpressionConverter
* Translate a row-expression to a Druid row extraction. Note that this signature will probably need to change
* once we support extractions from multiple columns.
*
* @param rowOrder order of fields in the Druid rows to be extracted from
* @param expression expression meant to be applied on top of the table
* @param plannerContext SQL planner context
* @param rowOrder order of fields in the Druid rows to be extracted from
* @param expression expression meant to be applied on top of the table
*
* @return (columnName, extractionFn) or null
*/
public RowExtraction convert(List<String> rowOrder, RexNode expression)
public RowExtraction convert(PlannerContext plannerContext, List<String> rowOrder, RexNode expression)
{
if (expression.getKind() == SqlKind.INPUT_REF) {
final RexInputRef ref = (RexInputRef) expression;
@ -88,18 +92,29 @@ public class ExpressionConverter
return RowExtraction.of(columnName, null);
} else if (expression.getKind() == SqlKind.CAST) {
// TODO(gianm): Probably not a good idea to ignore CAST like this.
return convert(rowOrder, ((RexCall) expression).getOperands().get(0));
final RexNode operand = ((RexCall) expression).getOperands().get(0);
if (expression.getType().getSqlTypeName() == SqlTypeName.DATE
&& operand.getType().getSqlTypeName() == SqlTypeName.TIMESTAMP) {
// Handling casting TIMESTAMP to DATE by flooring to DAY.
return FloorExpressionConversion.applyTimestampFloor(
convert(plannerContext, rowOrder, operand),
TimeUnits.toQueryGranularity(TimeUnitRange.DAY, plannerContext.getTimeZone())
);
} else {
// Ignore other casts.
// TODO(gianm): Probably not a good idea to ignore other CASTs like this.
return convert(plannerContext, rowOrder, ((RexCall) expression).getOperands().get(0));
}
} else {
// Try conversion using an ExpressionConversion specific to this operator.
final RowExtraction retVal;
if (expression.getKind() == SqlKind.OTHER_FUNCTION) {
final ExpressionConversion conversion = otherFunctionMap.get(((RexCall) expression).getOperator().getName());
retVal = conversion != null ? conversion.convert(this, rowOrder, expression) : null;
retVal = conversion != null ? conversion.convert(this, plannerContext, rowOrder, expression) : null;
} else {
final ExpressionConversion conversion = kindMap.get(expression.getKind());
retVal = conversion != null ? conversion.convert(this, rowOrder, expression) : null;
retVal = conversion != null ? conversion.convert(this, plannerContext, rowOrder, expression) : null;
}
return retVal;

View File

@ -28,6 +28,7 @@ import com.google.common.collect.Lists;
import com.google.common.io.BaseEncoding;
import com.google.common.primitives.Chars;
import io.druid.granularity.QueryGranularity;
import io.druid.java.util.common.IAE;
import io.druid.java.util.common.ISE;
import io.druid.math.expr.ExprType;
import io.druid.query.aggregation.PostAggregator;
@ -36,8 +37,8 @@ import io.druid.query.aggregation.post.ConstantPostAggregator;
import io.druid.query.aggregation.post.ExpressionPostAggregator;
import io.druid.query.aggregation.post.FieldAccessPostAggregator;
import io.druid.query.extraction.ExtractionFn;
import io.druid.query.extraction.TimeFormatExtractionFn;
import io.druid.query.filter.AndDimFilter;
import io.druid.query.filter.BoundDimFilter;
import io.druid.query.filter.DimFilter;
import io.druid.query.filter.LikeDimFilter;
import io.druid.query.filter.NotDimFilter;
@ -46,7 +47,11 @@ import io.druid.query.ordering.StringComparator;
import io.druid.query.ordering.StringComparators;
import io.druid.segment.column.Column;
import io.druid.sql.calcite.aggregation.PostAggregatorFactory;
import io.druid.sql.calcite.filtration.BoundRefKey;
import io.druid.sql.calcite.filtration.Bounds;
import io.druid.sql.calcite.filtration.Filtration;
import io.druid.sql.calcite.planner.Calcites;
import io.druid.sql.calcite.planner.PlannerContext;
import io.druid.sql.calcite.table.RowSignature;
import org.apache.calcite.jdbc.JavaTypeFactoryImpl;
import org.apache.calcite.rel.core.Project;
@ -56,6 +61,8 @@ import org.apache.calcite.rex.RexLiteral;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.sql.SqlKind;
import org.apache.calcite.sql.type.SqlTypeName;
import org.joda.time.DateTimeZone;
import org.joda.time.Interval;
import java.util.Calendar;
import java.util.List;
@ -138,22 +145,24 @@ public class Expressions
* Translate a Calcite row-expression to a Druid row extraction. Note that this signature will probably need to
* change once we support extractions from multiple columns.
*
* @param rowOrder order of fields in the Druid rows to be extracted from
* @param expression expression meant to be applied on top of the rows
* @param plannerContext SQL planner context
* @param rowOrder order of fields in the Druid rows to be extracted from
* @param expression expression meant to be applied on top of the rows
*
* @return RowExtraction or null if not possible
*/
public static RowExtraction toRowExtraction(
final PlannerContext plannerContext,
final List<String> rowOrder,
final RexNode expression
)
{
return EXPRESSION_CONVERTER.convert(rowOrder, expression);
return EXPRESSION_CONVERTER.convert(plannerContext, rowOrder, expression);
}
/**
* Translate a Calcite row-expression to a Druid PostAggregator. One day, when possible, this could be folded
* into {@link #toRowExtraction(List, RexNode)}.
* into {@link #toRowExtraction(PlannerContext, List, RexNode)}.
*
* @param name name of the PostAggregator
* @param rowOrder order of fields in the Druid rows to be extracted from
@ -231,7 +240,7 @@ public class Expressions
/**
* Translate a row-expression to a Druid math expression. One day, when possible, this could be folded into
* {@link #toRowExtraction(List, RexNode)}.
* {@link #toRowExtraction(PlannerContext, List, RexNode)}.
*
* @param rowOrder order of fields in the Druid rows to be extracted from
* @param expression expression meant to be applied on top of the rows
@ -329,13 +338,35 @@ public class Expressions
}
}
/**
* Translates "literal" (a TIMESTAMP or DATE literal) to milliseconds since the epoch using the provided
* session time zone.
*
* @param literal TIMESTAMP or DATE literal
* @param timeZone session time zone
*
* @return milliseconds time
*/
public static long toMillisLiteral(final RexNode literal, final DateTimeZone timeZone)
{
final SqlTypeName typeName = literal.getType().getSqlTypeName();
if (literal.getKind() != SqlKind.LITERAL || (typeName != SqlTypeName.TIMESTAMP && typeName != SqlTypeName.DATE)) {
throw new IAE("Expected TIMESTAMP or DATE literal but got[%s:%s]", literal.getKind(), typeName);
}
final Calendar calendar = (Calendar) RexLiteral.value(literal);
return Calcites.calciteTimestampToJoda(calendar.getTimeInMillis(), timeZone).getMillis();
}
/**
* Translates "condition" to a Druid filter, or returns null if we cannot translate the condition.
*
* @param rowSignature row signature of the dataSource to be filtered
* @param expression Calcite row expression
* @param plannerContext planner context
* @param rowSignature row signature of the dataSource to be filtered
* @param expression Calcite row expression
*/
public static DimFilter toFilter(
final PlannerContext plannerContext,
final RowSignature rowSignature,
final RexNode expression
)
@ -345,7 +376,7 @@ public class Expressions
|| expression.getKind() == SqlKind.NOT) {
final List<DimFilter> filters = Lists.newArrayList();
for (final RexNode rexNode : ((RexCall) expression).getOperands()) {
final DimFilter nextFilter = toFilter(rowSignature, rexNode);
final DimFilter nextFilter = toFilter(plannerContext, rowSignature, rexNode);
if (nextFilter == null) {
return null;
}
@ -362,7 +393,7 @@ public class Expressions
}
} else {
// Handle filter conditions on everything else.
return toLeafFilter(rowSignature, expression);
return toLeafFilter(plannerContext, rowSignature, expression);
}
}
@ -370,10 +401,12 @@ public class Expressions
* Translates "condition" to a Druid filter, assuming it does not contain any boolean expressions. Returns null
* if we cannot translate the condition.
*
* @param rowSignature row signature of the dataSource to be filtered
* @param expression Calcite row expression
* @param plannerContext planner context
* @param rowSignature row signature of the dataSource to be filtered
* @param expression Calcite row expression
*/
private static DimFilter toLeafFilter(
final PlannerContext plannerContext,
final RowSignature rowSignature,
final RexNode expression
)
@ -388,7 +421,11 @@ public class Expressions
if (kind == SqlKind.LIKE) {
final List<RexNode> operands = ((RexCall) expression).getOperands();
final RowExtraction rex = EXPRESSION_CONVERTER.convert(rowSignature.getRowOrder(), operands.get(0));
final RowExtraction rex = EXPRESSION_CONVERTER.convert(
plannerContext,
rowSignature.getRowOrder(),
operands.get(0)
);
if (rex == null || !rex.isFilterable(rowSignature)) {
return null;
}
@ -424,7 +461,7 @@ public class Expressions
}
// lhs must be translatable to a RowExtraction to be filterable
final RowExtraction rex = EXPRESSION_CONVERTER.convert(rowSignature.getRowOrder(), lhs);
final RowExtraction rex = EXPRESSION_CONVERTER.convert(plannerContext, rowSignature.getRowOrder(), lhs);
if (rex == null || !rex.isFilterable(rowSignature)) {
return null;
}
@ -432,25 +469,48 @@ public class Expressions
final String column = rex.getColumn();
final ExtractionFn extractionFn = rex.getExtractionFn();
if (column.equals(Column.TIME_COLUMN_NAME) && ExtractionFns.toQueryGranularity(extractionFn) != null) {
// lhs is FLOOR(__time TO gran); convert to range
final QueryGranularity gran = ExtractionFns.toQueryGranularity(extractionFn);
final long rhsMillis = ((Calendar) RexLiteral.value(rhs)).getTimeInMillis();
if (gran.truncate(rhsMillis) != rhsMillis) {
// Nothing matches.
return Filtration.matchNothing();
} else {
// Match any __time within the granular bucket.
return new BoundDimFilter(
Column.TIME_COLUMN_NAME,
String.valueOf(gran.truncate(rhsMillis)),
String.valueOf(gran.next(gran.truncate(rhsMillis))),
false,
true,
null,
null,
StringComparators.NUMERIC
if (column.equals(Column.TIME_COLUMN_NAME) && extractionFn instanceof TimeFormatExtractionFn) {
// Check if we can strip the extractionFn and convert the filter to a direct filter on __time.
// This allows potential conversion to query-level "intervals" later on, which is ideal for Druid queries.
final QueryGranularity granularity = ExtractionFns.toQueryGranularity(extractionFn);
if (granularity != null) {
// lhs is FLOOR(__time TO granularity); rhs must be a timestamp
final long rhsMillis = toMillisLiteral(rhs, plannerContext.getTimeZone());
final Interval rhsInterval = new Interval(
granularity.truncate(rhsMillis),
granularity.next(granularity.truncate(rhsMillis))
);
// Is rhs aligned on granularity boundaries?
final boolean rhsAligned = rhsInterval.getStartMillis() == rhsMillis;
// Create a BoundRefKey that strips the extractionFn and compares __time as a number.
final BoundRefKey boundRefKey = new BoundRefKey(column, null, StringComparators.NUMERIC);
if (kind == SqlKind.EQUALS) {
return rhsAligned
? Bounds.interval(boundRefKey, rhsInterval)
: Filtration.matchNothing();
} else if (kind == SqlKind.NOT_EQUALS) {
return rhsAligned
? new NotDimFilter(Bounds.interval(boundRefKey, rhsInterval))
: Filtration.matchEverything();
} else if ((!flip && kind == SqlKind.GREATER_THAN) || (flip && kind == SqlKind.LESS_THAN)) {
return Bounds.greaterThanOrEqualTo(boundRefKey, String.valueOf(rhsInterval.getEndMillis()));
} else if ((!flip && kind == SqlKind.GREATER_THAN_OR_EQUAL) || (flip && kind == SqlKind.LESS_THAN_OR_EQUAL)) {
return rhsAligned
? Bounds.greaterThanOrEqualTo(boundRefKey, String.valueOf(rhsInterval.getStartMillis()))
: Bounds.greaterThanOrEqualTo(boundRefKey, String.valueOf(rhsInterval.getEndMillis()));
} else if ((!flip && kind == SqlKind.LESS_THAN) || (flip && kind == SqlKind.GREATER_THAN)) {
return rhsAligned
? Bounds.lessThan(boundRefKey, String.valueOf(rhsInterval.getStartMillis()))
: Bounds.lessThan(boundRefKey, String.valueOf(rhsInterval.getEndMillis()));
} else if ((!flip && kind == SqlKind.LESS_THAN_OR_EQUAL) || (flip && kind == SqlKind.GREATER_THAN_OR_EQUAL)) {
return Bounds.lessThan(boundRefKey, String.valueOf(rhsInterval.getEndMillis()));
} else {
throw new IllegalStateException("WTF?! Shouldn't have got here...");
}
}
}
@ -458,37 +518,37 @@ public class Expressions
final RexLiteral rhsLiteral = (RexLiteral) rhs;
if (SqlTypeName.NUMERIC_TYPES.contains(rhsLiteral.getTypeName())) {
val = String.valueOf(RexLiteral.value(rhsLiteral));
} else if (rhsLiteral.getTypeName() == SqlTypeName.CHAR) {
} else if (SqlTypeName.CHAR_TYPES.contains(rhsLiteral.getTypeName())) {
val = String.valueOf(RexLiteral.stringValue(rhsLiteral));
} else if (SqlTypeName.DATETIME_TYPES.contains(rhsLiteral.getTypeName())) {
val = String.valueOf(((Calendar) RexLiteral.value(rhsLiteral)).getTimeInMillis());
} else if (SqlTypeName.TIMESTAMP == rhsLiteral.getTypeName() || SqlTypeName.DATE == rhsLiteral.getTypeName()) {
val = String.valueOf(toMillisLiteral(rhsLiteral, plannerContext.getTimeZone()));
} else {
// Hope for the best.
val = String.valueOf(RexLiteral.value(rhsLiteral));
// Don't know how to filter on this kind of literal.
return null;
}
// Numeric lhs needs a numeric comparison.
final boolean lhsIsNumeric = SqlTypeName.NUMERIC_TYPES.contains(lhs.getType().getSqlTypeName())
|| SqlTypeName.DATETIME_TYPES.contains(lhs.getType().getSqlTypeName());
|| SqlTypeName.TIMESTAMP == lhs.getType().getSqlTypeName()
|| SqlTypeName.DATE == lhs.getType().getSqlTypeName();
final StringComparator comparator = lhsIsNumeric ? StringComparators.NUMERIC : StringComparators.LEXICOGRAPHIC;
final BoundRefKey boundRefKey = new BoundRefKey(column, extractionFn, comparator);
final DimFilter filter;
// Always use BoundDimFilters, to simplify filter optimization later (it helps to remember the comparator).
if (kind == SqlKind.EQUALS) {
filter = new BoundDimFilter(column, val, val, false, false, null, extractionFn, comparator);
filter = Bounds.equalTo(boundRefKey, val);
} else if (kind == SqlKind.NOT_EQUALS) {
filter = new NotDimFilter(
new BoundDimFilter(column, val, val, false, false, null, extractionFn, comparator)
);
filter = new NotDimFilter(Bounds.equalTo(boundRefKey, val));
} else if ((!flip && kind == SqlKind.GREATER_THAN) || (flip && kind == SqlKind.LESS_THAN)) {
filter = new BoundDimFilter(column, val, null, true, false, null, extractionFn, comparator);
filter = Bounds.greaterThan(boundRefKey, val);
} else if ((!flip && kind == SqlKind.GREATER_THAN_OR_EQUAL) || (flip && kind == SqlKind.LESS_THAN_OR_EQUAL)) {
filter = new BoundDimFilter(column, val, null, false, false, null, extractionFn, comparator);
filter = Bounds.greaterThanOrEqualTo(boundRefKey, val);
} else if ((!flip && kind == SqlKind.LESS_THAN) || (flip && kind == SqlKind.GREATER_THAN)) {
filter = new BoundDimFilter(column, null, val, false, true, null, extractionFn, comparator);
filter = Bounds.lessThan(boundRefKey, val);
} else if ((!flip && kind == SqlKind.LESS_THAN_OR_EQUAL) || (flip && kind == SqlKind.GREATER_THAN_OR_EQUAL)) {
filter = new BoundDimFilter(column, null, val, false, false, null, extractionFn, comparator);
filter = Bounds.lessThanOrEqualTo(boundRefKey, val);
} else {
throw new IllegalStateException("WTF?! Shouldn't have got here...");
}

View File

@ -19,8 +19,10 @@
package io.druid.sql.calcite.expression;
import io.druid.granularity.QueryGranularity;
import io.druid.query.extraction.ExtractionFn;
import io.druid.query.extraction.TimeFormatExtractionFn;
import io.druid.sql.calcite.planner.PlannerContext;
import org.apache.calcite.avatica.util.TimeUnitRange;
import org.apache.calcite.rex.RexCall;
import org.apache.calcite.rex.RexLiteral;
@ -46,6 +48,7 @@ public class ExtractExpressionConversion extends AbstractExpressionConversion
@Override
public RowExtraction convert(
final ExpressionConverter converter,
final PlannerContext plannerContext,
final List<String> rowOrder,
final RexNode expression
)
@ -56,7 +59,7 @@ public class ExtractExpressionConversion extends AbstractExpressionConversion
final TimeUnitRange timeUnit = (TimeUnitRange) flag.getValue();
final RexNode expr = call.getOperands().get(1);
final RowExtraction rex = converter.convert(rowOrder, expr);
final RowExtraction rex = converter.convert(plannerContext, rowOrder, expr);
if (rex == null) {
return null;
}
@ -76,10 +79,22 @@ public class ExtractExpressionConversion extends AbstractExpressionConversion
baseExtractionFn = rex.getExtractionFn();
}
if (baseExtractionFn instanceof TimeFormatExtractionFn) {
final TimeFormatExtractionFn baseTimeFormatFn = (TimeFormatExtractionFn) baseExtractionFn;
final QueryGranularity queryGranularity = ExtractionFns.toQueryGranularity(baseTimeFormatFn);
if (queryGranularity != null) {
// Combine EXTRACT(X FROM FLOOR(Y TO Z)) into a single extractionFn.
return RowExtraction.of(
rex.getColumn(),
new TimeFormatExtractionFn(dateTimeFormat, plannerContext.getTimeZone(), null, queryGranularity, true)
);
}
}
return RowExtraction.of(
rex.getColumn(),
ExtractionFns.compose(
new TimeFormatExtractionFn(dateTimeFormat, null, null, null, true),
new TimeFormatExtractionFn(dateTimeFormat, plannerContext.getTimeZone(), null, null, true),
baseExtractionFn
)
);

View File

@ -31,17 +31,18 @@ import java.util.List;
public class ExtractionFns
{
/**
* Converts extractionFn to a QueryGranularity, if possible.
* Converts extractionFn to a QueryGranularity, if possible. This is the inverse of
* {@link #fromQueryGranularity(QueryGranularity)}.
*
* @param extractionFn function
*
* @return
* @return query granularity, or null if extractionFn cannot be translated
*/
public static QueryGranularity toQueryGranularity(final ExtractionFn extractionFn)
{
if (extractionFn instanceof TimeFormatExtractionFn) {
final TimeFormatExtractionFn fn = (TimeFormatExtractionFn) extractionFn;
if (fn.getFormat() == null && fn.getTimeZone() == null && fn.getLocale() == null) {
if (fn.getFormat() == null && fn.getTimeZone() == null && fn.getLocale() == null && fn.isAsMillis()) {
return fn.getGranularity();
}
}
@ -49,6 +50,24 @@ public class ExtractionFns
return null;
}
/**
* Converts a QueryGranularity to an extractionFn, if possible. This is the inverse of
* {@link #toQueryGranularity(ExtractionFn)}. This will always return a non-null extractionFn if
* queryGranularity is non-null.
*
* @param queryGranularity granularity
*
* @return extractionFn, or null if queryGranularity is null
*/
public static ExtractionFn fromQueryGranularity(final QueryGranularity queryGranularity)
{
if (queryGranularity == null) {
return null;
} else {
return new TimeFormatExtractionFn(null, null, null, queryGranularity, true);
}
}
/**
* Compose f and g, returning an ExtractionFn that computes f(g(x)). Null f or g are treated like identity functions.
*

View File

@ -21,7 +21,7 @@ package io.druid.sql.calcite.expression;
import io.druid.granularity.QueryGranularity;
import io.druid.query.extraction.BucketExtractionFn;
import io.druid.query.extraction.TimeFormatExtractionFn;
import io.druid.sql.calcite.planner.PlannerContext;
import org.apache.calcite.avatica.util.TimeUnitRange;
import org.apache.calcite.rex.RexCall;
import org.apache.calcite.rex.RexLiteral;
@ -44,9 +44,28 @@ public class FloorExpressionConversion extends AbstractExpressionConversion
return INSTANCE;
}
public static RowExtraction applyTimestampFloor(
final RowExtraction rex,
final QueryGranularity queryGranularity
)
{
if (rex == null || queryGranularity == null) {
return null;
}
return RowExtraction.of(
rex.getColumn(),
ExtractionFns.compose(
ExtractionFns.fromQueryGranularity(queryGranularity),
rex.getExtractionFn()
)
);
}
@Override
public RowExtraction convert(
final ExpressionConverter converter,
final PlannerContext plannerContext,
final List<String> rowOrder,
final RexNode expression
)
@ -54,7 +73,7 @@ public class FloorExpressionConversion extends AbstractExpressionConversion
final RexCall call = (RexCall) expression;
final RexNode arg = call.getOperands().get(0);
final RowExtraction rex = converter.convert(rowOrder, arg);
final RowExtraction rex = converter.convert(plannerContext, rowOrder, arg);
if (rex == null) {
return null;
} else if (call.getOperands().size() == 1) {
@ -67,20 +86,7 @@ public class FloorExpressionConversion extends AbstractExpressionConversion
// FLOOR(expr TO timeUnit)
final RexLiteral flag = (RexLiteral) call.getOperands().get(1);
final TimeUnitRange timeUnit = (TimeUnitRange) flag.getValue();
final QueryGranularity queryGranularity = TimeUnits.toQueryGranularity(timeUnit);
if (queryGranularity != null) {
return RowExtraction.of(
rex.getColumn(),
ExtractionFns.compose(
new TimeFormatExtractionFn(null, null, null, queryGranularity, true),
rex.getExtractionFn()
)
);
} else {
// We don't have a queryGranularity for this timeUnit.
return null;
}
return applyTimestampFloor(rex, TimeUnits.toQueryGranularity(timeUnit, plannerContext.getTimeZone()));
} else {
// WTF? FLOOR with 3 arguments?
return null;

View File

@ -20,6 +20,7 @@
package io.druid.sql.calcite.expression;
import io.druid.query.extraction.SubstringDimExtractionFn;
import io.druid.sql.calcite.planner.PlannerContext;
import org.apache.calcite.rex.RexCall;
import org.apache.calcite.rex.RexLiteral;
import org.apache.calcite.rex.RexNode;
@ -44,12 +45,13 @@ public class SubstringExpressionConversion extends AbstractExpressionConversion
@Override
public RowExtraction convert(
final ExpressionConverter converter,
final PlannerContext plannerContext,
final List<String> rowOrder,
final RexNode expression
)
{
final RexCall call = (RexCall) expression;
final RowExtraction arg = converter.convert(rowOrder, call.getOperands().get(0));
final RowExtraction arg = converter.convert(plannerContext, rowOrder, call.getOperands().get(0));
if (arg == null) {
return null;
}
@ -61,11 +63,12 @@ public class SubstringExpressionConversion extends AbstractExpressionConversion
length = null;
}
return RowExtraction.of(arg.getColumn(),
ExtractionFns.compose(
new SubstringDimExtractionFn(index, length),
arg.getExtractionFn()
)
return RowExtraction.of(
arg.getColumn(),
ExtractionFns.compose(
new SubstringDimExtractionFn(index, length),
arg.getExtractionFn()
)
);
}
}

View File

@ -20,23 +20,25 @@
package io.druid.sql.calcite.expression;
import com.google.common.collect.ImmutableMap;
import io.druid.granularity.QueryGranularities;
import io.druid.granularity.PeriodGranularity;
import io.druid.granularity.QueryGranularity;
import org.apache.calcite.avatica.util.TimeUnitRange;
import org.joda.time.DateTimeZone;
import org.joda.time.Period;
import java.util.Map;
public class TimeUnits
{
private static final Map<TimeUnitRange, QueryGranularity> QUERY_GRANULARITY_MAP = ImmutableMap.<TimeUnitRange, QueryGranularity>builder()
.put(TimeUnitRange.SECOND, QueryGranularities.SECOND)
.put(TimeUnitRange.MINUTE, QueryGranularities.MINUTE)
.put(TimeUnitRange.HOUR, QueryGranularities.HOUR)
.put(TimeUnitRange.DAY, QueryGranularities.DAY)
.put(TimeUnitRange.WEEK, QueryGranularities.WEEK)
.put(TimeUnitRange.MONTH, QueryGranularities.MONTH)
.put(TimeUnitRange.QUARTER, QueryGranularities.QUARTER)
.put(TimeUnitRange.YEAR, QueryGranularities.YEAR)
private static final Map<TimeUnitRange, Period> PERIOD_MAP = ImmutableMap.<TimeUnitRange, Period>builder()
.put(TimeUnitRange.SECOND, Period.seconds(1))
.put(TimeUnitRange.MINUTE, Period.minutes(1))
.put(TimeUnitRange.HOUR, Period.hours(1))
.put(TimeUnitRange.DAY, Period.days(1))
.put(TimeUnitRange.WEEK, Period.weeks(1))
.put(TimeUnitRange.MONTH, Period.months(1))
.put(TimeUnitRange.QUARTER, Period.months(3))
.put(TimeUnitRange.YEAR, Period.years(1))
.build();
// Note that QUARTER is not supported here.
@ -53,19 +55,25 @@ public class TimeUnits
/**
* Returns the Druid QueryGranularity corresponding to a Calcite TimeUnitRange, or null if there is none.
*
* @param timeUnitRange timeUnit
* @param timeUnitRange time unit
* @param timeZone session time zone
*
* @return queryGranularity, or null
*/
public static QueryGranularity toQueryGranularity(final TimeUnitRange timeUnitRange)
public static QueryGranularity toQueryGranularity(final TimeUnitRange timeUnitRange, final DateTimeZone timeZone)
{
return QUERY_GRANULARITY_MAP.get(timeUnitRange);
final Period period = PERIOD_MAP.get(timeUnitRange);
if (period == null) {
return null;
}
return new PeriodGranularity(period, null, timeZone);
}
/**
* Returns the Joda format string corresponding to extracting on a Calcite TimeUnitRange, or null if there is none.
*
* @param timeUnitRange timeUnit
* @param timeUnitRange time unit
*
* @return queryGranularity, or null
*/

View File

@ -24,7 +24,10 @@ import com.google.common.collect.BoundType;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.google.common.collect.Range;
import io.druid.java.util.common.ISE;
import io.druid.query.filter.BoundDimFilter;
import io.druid.query.ordering.StringComparators;
import org.joda.time.Interval;
import java.util.List;
@ -114,4 +117,93 @@ public class Bounds
boundRefKey.getComparator()
);
}
public static BoundDimFilter equalTo(final BoundRefKey boundRefKey, final String value)
{
return new BoundDimFilter(
boundRefKey.getDimension(),
value,
value,
false,
false,
null,
boundRefKey.getExtractionFn(),
boundRefKey.getComparator()
);
}
public static BoundDimFilter greaterThan(final BoundRefKey boundRefKey, final String value)
{
return new BoundDimFilter(
boundRefKey.getDimension(),
value,
null,
true,
false,
null,
boundRefKey.getExtractionFn(),
boundRefKey.getComparator()
);
}
public static BoundDimFilter greaterThanOrEqualTo(final BoundRefKey boundRefKey, final String value)
{
return new BoundDimFilter(
boundRefKey.getDimension(),
value,
null,
false,
false,
null,
boundRefKey.getExtractionFn(),
boundRefKey.getComparator()
);
}
public static BoundDimFilter lessThan(final BoundRefKey boundRefKey, final String value)
{
return new BoundDimFilter(
boundRefKey.getDimension(),
null,
value,
false,
true,
null,
boundRefKey.getExtractionFn(),
boundRefKey.getComparator()
);
}
public static BoundDimFilter lessThanOrEqualTo(final BoundRefKey boundRefKey, final String value)
{
return new BoundDimFilter(
boundRefKey.getDimension(),
null,
value,
false,
false,
null,
boundRefKey.getExtractionFn(),
boundRefKey.getComparator()
);
}
public static BoundDimFilter interval(final BoundRefKey boundRefKey, final Interval interval)
{
if (!boundRefKey.getComparator().equals(StringComparators.NUMERIC)) {
// Interval comparison only works with NUMERIC comparator.
throw new ISE("Comparator must be NUMERIC but was[%s]", boundRefKey.getComparator());
}
return new BoundDimFilter(
boundRefKey.getDimension(),
String.valueOf(interval.getStartMillis()),
String.valueOf(interval.getEndMillis()),
false,
true,
null,
boundRefKey.getExtractionFn(),
boundRefKey.getComparator()
);
}
}

View File

@ -19,53 +19,24 @@
package io.druid.sql.calcite.planner;
import com.google.common.base.Function;
import com.google.common.base.Supplier;
import com.google.common.base.Suppliers;
import com.google.common.collect.ImmutableList;
import com.google.common.io.BaseEncoding;
import com.google.common.primitives.Chars;
import io.druid.java.util.common.guava.Sequence;
import io.druid.java.util.common.guava.Sequences;
import io.druid.segment.column.ValueType;
import io.druid.sql.calcite.rel.DruidConvention;
import io.druid.sql.calcite.rel.DruidRel;
import io.druid.sql.calcite.schema.DruidSchema;
import io.druid.sql.calcite.schema.InformationSchema;
import org.apache.calcite.DataContext;
import org.apache.calcite.adapter.java.JavaTypeFactory;
import org.apache.calcite.interpreter.BindableConvention;
import org.apache.calcite.interpreter.BindableRel;
import org.apache.calcite.interpreter.Bindables;
import org.apache.calcite.jdbc.CalciteSchema;
import org.apache.calcite.linq4j.Enumerable;
import org.apache.calcite.linq4j.QueryProvider;
import org.apache.calcite.plan.RelOptPlanner;
import org.apache.calcite.plan.RelOptUtil;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.RelRoot;
import org.apache.calcite.rel.type.RelDataTypeFactory;
import org.apache.calcite.rex.RexBuilder;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.schema.Schema;
import org.apache.calcite.schema.SchemaPlus;
import org.apache.calcite.sql.SqlExplain;
import org.apache.calcite.sql.SqlKind;
import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.sql.parser.SqlParseException;
import org.apache.calcite.sql.type.SqlTypeName;
import org.apache.calcite.tools.Planner;
import org.apache.calcite.tools.RelConversionException;
import org.apache.calcite.tools.ValidationException;
import org.apache.calcite.util.ConversionUtil;
import org.apache.calcite.util.Pair;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import org.joda.time.Days;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.List;
/**
* Entry points for Calcite.
* Utility functions for Calcite.
*/
public class Calcites
{
@ -124,40 +95,13 @@ public class Calcites
}
}
public static PlannerResult plan(
final Planner planner,
final String sql
) throws SqlParseException, ValidationException, RelConversionException
{
SqlExplain explain = null;
SqlNode parsed = planner.parse(sql);
if (parsed.getKind() == SqlKind.EXPLAIN) {
explain = (SqlExplain) parsed;
parsed = explain.getExplicandum();
}
final SqlNode validated = planner.validate(parsed);
final RelRoot root = planner.rel(validated);
try {
return planWithDruidConvention(planner, explain, root);
}
catch (RelOptPlanner.CannotPlanException e) {
// Try again with BINDABLE convention. Used for querying Values, metadata tables, and fallback.
try {
return planWithBindableConvention(planner, explain, root);
}
catch (Exception e2) {
e.addSuppressed(e2);
throw e;
}
}
}
public static ValueType getValueTypeForSqlTypeName(SqlTypeName sqlTypeName)
{
if (SqlTypeName.APPROX_TYPES.contains(sqlTypeName)) {
return ValueType.FLOAT;
} else if (SqlTypeName.DATETIME_TYPES.contains(sqlTypeName) || SqlTypeName.EXACT_TYPES.contains(sqlTypeName)) {
} else if (SqlTypeName.TIMESTAMP == sqlTypeName
|| SqlTypeName.DATE == sqlTypeName
|| SqlTypeName.EXACT_TYPES.contains(sqlTypeName)) {
return ValueType.LONG;
} else if (SqlTypeName.CHAR_TYPES.contains(sqlTypeName)) {
return ValueType.STRING;
@ -168,142 +112,56 @@ public class Calcites
}
}
private static PlannerResult planWithDruidConvention(
final Planner planner,
final SqlExplain explain,
final RelRoot root
) throws RelConversionException
/**
* Calcite expects "TIMESTAMP" types to be an instant that has the expected local time fields if printed as UTC.
*
* @param dateTime joda timestamp
* @param timeZone session time zone
*
* @return Calcite style millis
*/
public static long jodaToCalciteTimestamp(final DateTime dateTime, final DateTimeZone timeZone)
{
final DruidRel<?> druidRel = (DruidRel<?>) planner.transform(
Rules.DRUID_CONVENTION_RULES,
planner.getEmptyTraitSet()
.replace(DruidConvention.instance())
.plus(root.collation),
root.rel
);
if (explain != null) {
return planExplanation(druidRel, explain);
} else {
final Supplier<Sequence<Object[]>> resultsSupplier = new Supplier<Sequence<Object[]>>()
{
@Override
public Sequence<Object[]> get()
{
if (root.isRefTrivial()) {
return druidRel.runQuery();
} else {
// Add a mapping on top to accommodate root.fields.
return Sequences.map(
druidRel.runQuery(),
new Function<Object[], Object[]>()
{
@Override
public Object[] apply(final Object[] input)
{
final Object[] retVal = new Object[root.fields.size()];
for (int i = 0; i < root.fields.size(); i++) {
retVal[i] = input[root.fields.get(i).getKey()];
}
return retVal;
}
}
);
}
}
};
return new PlannerResult(resultsSupplier, root.validatedRowType);
}
return dateTime.withZone(timeZone).withZoneRetainFields(DateTimeZone.UTC).getMillis();
}
private static PlannerResult planWithBindableConvention(
final Planner planner,
final SqlExplain explain,
final RelRoot root
) throws RelConversionException
/**
* Calcite expects "DATE" types to be number of days from the epoch to the UTC date matching the local time fields.
*
* @param dateTime joda timestamp
* @param timeZone session time zone
*
* @return Calcite style date
*/
public static int jodaToCalciteDate(final DateTime dateTime, final DateTimeZone timeZone)
{
BindableRel bindableRel = (BindableRel) planner.transform(
Rules.BINDABLE_CONVENTION_RULES,
planner.getEmptyTraitSet()
.replace(BindableConvention.INSTANCE)
.plus(root.collation),
root.rel
);
if (!root.isRefTrivial()) {
// Add a projection on top to accommodate root.fields.
final List<RexNode> projects = new ArrayList<>();
final RexBuilder rexBuilder = bindableRel.getCluster().getRexBuilder();
for (int field : Pair.left(root.fields)) {
projects.add(rexBuilder.makeInputRef(bindableRel, field));
}
bindableRel = new Bindables.BindableProject(
bindableRel.getCluster(),
bindableRel.getTraitSet(),
bindableRel,
projects,
root.validatedRowType
);
}
if (explain != null) {
return planExplanation(bindableRel, explain);
} else {
final BindableRel theRel = bindableRel;
final DataContext dataContext = new DataContext()
{
@Override
public SchemaPlus getRootSchema()
{
return null;
}
@Override
public JavaTypeFactory getTypeFactory()
{
return (JavaTypeFactory) planner.getTypeFactory();
}
@Override
public QueryProvider getQueryProvider()
{
return null;
}
@Override
public Object get(final String name)
{
return null;
}
};
final Supplier<Sequence<Object[]>> resultsSupplier = new Supplier<Sequence<Object[]>>()
{
@Override
public Sequence<Object[]> get()
{
final Enumerable enumerable = theRel.bind(dataContext);
return Sequences.simple(enumerable);
}
};
return new PlannerResult(resultsSupplier, root.validatedRowType);
}
final DateTime date = dateTime.withZone(timeZone).dayOfMonth().roundFloorCopy();
return Days.daysBetween(new DateTime(0L, DateTimeZone.UTC), date.withZoneRetainFields(DateTimeZone.UTC)).getDays();
}
private static PlannerResult planExplanation(
final RelNode rel,
final SqlExplain explain
)
/**
* The inverse of {@link #jodaToCalciteTimestamp(DateTime, DateTimeZone)}.
*
* @param timestamp Calcite style timestamp
* @param timeZone session time zone
*
* @return joda timestamp, with time zone set to the session time zone
*/
public static DateTime calciteTimestampToJoda(final long timestamp, final DateTimeZone timeZone)
{
final String explanation = RelOptUtil.dumpPlan("", rel, explain.getFormat(), explain.getDetailLevel());
final Supplier<Sequence<Object[]>> resultsSupplier = Suppliers.ofInstance(
Sequences.simple(ImmutableList.of(new Object[]{explanation})));
final RelDataTypeFactory typeFactory = rel.getCluster().getTypeFactory();
return new PlannerResult(
resultsSupplier,
typeFactory.createStructType(
ImmutableList.of(typeFactory.createSqlType(SqlTypeName.VARCHAR)),
ImmutableList.of("PLAN")
)
);
return new DateTime(timestamp, DateTimeZone.UTC).withZoneRetainFields(timeZone);
}
/**
* The inverse of {@link #jodaToCalciteDate(DateTime, DateTimeZone)}.
*
* @param date Calcite style date
* @param timeZone session time zone
*
* @return joda timestamp, with time zone set to the session time zone
*/
public static DateTime calciteDateToJoda(final int date, final DateTimeZone timeZone)
{
return new DateTime(0L, DateTimeZone.UTC).plusDays(date).withZoneRetainFields(timeZone);
}
}

View File

@ -19,41 +19,86 @@
package io.druid.sql.calcite.planner;
import com.google.common.collect.ImmutableMap;
import io.druid.java.util.common.ISE;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.sql.SqlCall;
import org.apache.calcite.sql.SqlKind;
import org.apache.calcite.sql.SqlOperator;
import org.apache.calcite.sql.fun.SqlStdOperatorTable;
import org.apache.calcite.sql2rel.SqlRexContext;
import org.apache.calcite.sql2rel.SqlRexConvertlet;
import org.apache.calcite.sql2rel.SqlRexConvertletTable;
import org.apache.calcite.sql2rel.StandardConvertletTable;
import java.util.Locale;
import java.util.Map;
public class DruidConvertletTable implements SqlRexConvertletTable
{
private static final DruidConvertletTable INSTANCE = new DruidConvertletTable();
private DruidConvertletTable()
private static final SqlRexConvertlet BYPASS_CONVERTLET = new SqlRexConvertlet()
{
}
@Override
public RexNode convertCall(SqlRexContext cx, SqlCall call)
{
return StandardConvertletTable.INSTANCE.convertCall(cx, call);
}
};
public static DruidConvertletTable instance()
private final PlannerContext plannerContext;
private final Map<SqlOperator, SqlRexConvertlet> table;
public DruidConvertletTable(final PlannerContext plannerContext)
{
return INSTANCE;
this.plannerContext = plannerContext;
final SqlRexConvertlet currentTimestampAndFriendsConvertlet = new CurrentTimestampAndFriendsConvertlet();
this.table = ImmutableMap.<SqlOperator, SqlRexConvertlet>builder()
.put(SqlStdOperatorTable.CURRENT_TIMESTAMP, currentTimestampAndFriendsConvertlet)
.put(SqlStdOperatorTable.CURRENT_TIME, currentTimestampAndFriendsConvertlet)
.put(SqlStdOperatorTable.CURRENT_DATE, currentTimestampAndFriendsConvertlet)
.put(SqlStdOperatorTable.LOCALTIMESTAMP, currentTimestampAndFriendsConvertlet)
.put(SqlStdOperatorTable.LOCALTIME, currentTimestampAndFriendsConvertlet)
.build();
}
@Override
public SqlRexConvertlet get(SqlCall call)
{
if (call.getKind() == SqlKind.EXTRACT && call.getOperandList().get(1).getKind() != SqlKind.LITERAL) {
return new SqlRexConvertlet()
{
@Override
public RexNode convertCall(SqlRexContext cx, SqlCall call)
{
return StandardConvertletTable.INSTANCE.convertCall(cx, call);
}
};
// Avoid using the standard convertlet for EXTRACT(TIMEUNIT FROM col), since we want to handle it directly
// in ExtractExpressionConversion.
return BYPASS_CONVERTLET;
} else {
return StandardConvertletTable.INSTANCE.get(call);
final SqlRexConvertlet convertlet = table.get(call.getOperator());
return convertlet != null ? convertlet : StandardConvertletTable.INSTANCE.get(call);
}
}
private class CurrentTimestampAndFriendsConvertlet implements SqlRexConvertlet
{
@Override
public RexNode convertCall(final SqlRexContext cx, final SqlCall call)
{
final SqlOperator operator = call.getOperator();
if (operator == SqlStdOperatorTable.CURRENT_TIMESTAMP || operator == SqlStdOperatorTable.LOCALTIMESTAMP) {
return cx.getRexBuilder().makeTimestampLiteral(
plannerContext.getLocalNow().toCalendar(Locale.getDefault()),
RelDataType.PRECISION_NOT_SPECIFIED
);
} else if (operator == SqlStdOperatorTable.CURRENT_TIME || operator == SqlStdOperatorTable.LOCALTIME) {
return cx.getRexBuilder().makeTimeLiteral(
plannerContext.getLocalNow().toCalendar(Locale.getDefault()),
RelDataType.PRECISION_NOT_SPECIFIED
);
} else if (operator == SqlStdOperatorTable.CURRENT_DATE) {
return cx.getRexBuilder().makeDateLiteral(
plannerContext.getLocalNow().hourOfDay().roundFloorCopy().toCalendar(Locale.getDefault())
);
} else {
throw new ISE("WTF?! Should not have got here, operator was: %s", operator);
}
}
}
}

View File

@ -0,0 +1,216 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.sql.calcite.planner;
import com.google.common.base.Function;
import com.google.common.base.Supplier;
import com.google.common.base.Suppliers;
import com.google.common.collect.ImmutableList;
import io.druid.java.util.common.guava.Sequence;
import io.druid.java.util.common.guava.Sequences;
import io.druid.sql.calcite.rel.DruidConvention;
import io.druid.sql.calcite.rel.DruidRel;
import org.apache.calcite.DataContext;
import org.apache.calcite.adapter.java.JavaTypeFactory;
import org.apache.calcite.interpreter.BindableConvention;
import org.apache.calcite.interpreter.BindableRel;
import org.apache.calcite.interpreter.Bindables;
import org.apache.calcite.linq4j.Enumerable;
import org.apache.calcite.plan.RelOptPlanner;
import org.apache.calcite.plan.RelOptUtil;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.RelRoot;
import org.apache.calcite.rel.type.RelDataTypeFactory;
import org.apache.calcite.rex.RexBuilder;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.sql.SqlExplain;
import org.apache.calcite.sql.SqlKind;
import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.sql.parser.SqlParseException;
import org.apache.calcite.sql.type.SqlTypeName;
import org.apache.calcite.tools.Planner;
import org.apache.calcite.tools.RelConversionException;
import org.apache.calcite.tools.ValidationException;
import org.apache.calcite.util.Pair;
import java.io.Closeable;
import java.util.ArrayList;
import java.util.List;
public class DruidPlanner implements Closeable
{
private final Planner planner;
private final PlannerContext plannerContext;
public DruidPlanner(final Planner planner, final PlannerContext plannerContext)
{
this.planner = planner;
this.plannerContext = plannerContext;
}
public PlannerResult plan(final String sql) throws SqlParseException, ValidationException, RelConversionException
{
SqlExplain explain = null;
SqlNode parsed = planner.parse(sql);
if (parsed.getKind() == SqlKind.EXPLAIN) {
explain = (SqlExplain) parsed;
parsed = explain.getExplicandum();
}
final SqlNode validated = planner.validate(parsed);
final RelRoot root = planner.rel(validated);
try {
return planWithDruidConvention(explain, root);
}
catch (RelOptPlanner.CannotPlanException e) {
// Try again with BINDABLE convention. Used for querying Values, metadata tables, and fallback.
try {
return planWithBindableConvention(explain, root);
}
catch (Exception e2) {
e.addSuppressed(e2);
throw e;
}
}
}
public PlannerContext getPlannerContext()
{
return plannerContext;
}
@Override
public void close()
{
planner.close();
}
private PlannerResult planWithDruidConvention(
final SqlExplain explain,
final RelRoot root
) throws RelConversionException
{
final DruidRel<?> druidRel = (DruidRel<?>) planner.transform(
Rules.DRUID_CONVENTION_RULES,
planner.getEmptyTraitSet()
.replace(DruidConvention.instance())
.plus(root.collation),
root.rel
);
if (explain != null) {
return planExplanation(druidRel, explain);
} else {
final Supplier<Sequence<Object[]>> resultsSupplier = new Supplier<Sequence<Object[]>>()
{
@Override
public Sequence<Object[]> get()
{
if (root.isRefTrivial()) {
return druidRel.runQuery();
} else {
// Add a mapping on top to accommodate root.fields.
return Sequences.map(
druidRel.runQuery(),
new Function<Object[], Object[]>()
{
@Override
public Object[] apply(final Object[] input)
{
final Object[] retVal = new Object[root.fields.size()];
for (int i = 0; i < root.fields.size(); i++) {
retVal[i] = input[root.fields.get(i).getKey()];
}
return retVal;
}
}
);
}
}
};
return new PlannerResult(resultsSupplier, root.validatedRowType);
}
}
private PlannerResult planWithBindableConvention(
final SqlExplain explain,
final RelRoot root
) throws RelConversionException
{
BindableRel bindableRel = (BindableRel) planner.transform(
Rules.BINDABLE_CONVENTION_RULES,
planner.getEmptyTraitSet()
.replace(BindableConvention.INSTANCE)
.plus(root.collation),
root.rel
);
if (!root.isRefTrivial()) {
// Add a projection on top to accommodate root.fields.
final List<RexNode> projects = new ArrayList<>();
final RexBuilder rexBuilder = bindableRel.getCluster().getRexBuilder();
for (int field : Pair.left(root.fields)) {
projects.add(rexBuilder.makeInputRef(bindableRel, field));
}
bindableRel = new Bindables.BindableProject(
bindableRel.getCluster(),
bindableRel.getTraitSet(),
bindableRel,
projects,
root.validatedRowType
);
}
if (explain != null) {
return planExplanation(bindableRel, explain);
} else {
final BindableRel theRel = bindableRel;
final DataContext dataContext = plannerContext.createDataContext((JavaTypeFactory) planner.getTypeFactory());
final Supplier<Sequence<Object[]>> resultsSupplier = new Supplier<Sequence<Object[]>>()
{
@Override
public Sequence<Object[]> get()
{
final Enumerable enumerable = theRel.bind(dataContext);
return Sequences.simple(enumerable);
}
};
return new PlannerResult(resultsSupplier, root.validatedRowType);
}
}
private PlannerResult planExplanation(
final RelNode rel,
final SqlExplain explain
)
{
final String explanation = RelOptUtil.dumpPlan("", rel, explain.getFormat(), explain.getDetailLevel());
final Supplier<Sequence<Object[]>> resultsSupplier = Suppliers.ofInstance(
Sequences.simple(ImmutableList.of(new Object[]{explanation})));
final RelDataTypeFactory typeFactory = rel.getCluster().getTypeFactory();
return new PlannerResult(
resultsSupplier,
typeFactory.createStructType(
ImmutableList.of(typeFactory.createSqlType(SqlTypeName.VARCHAR)),
ImmutableList.of("PLAN")
)
);
}
}

View File

@ -20,10 +20,17 @@
package io.druid.sql.calcite.planner;
import com.fasterxml.jackson.annotation.JsonProperty;
import io.druid.java.util.common.IAE;
import org.joda.time.Period;
import java.util.Map;
public class PlannerConfig
{
public static final String CTX_KEY_USE_APPROXIMATE_COUNT_DISTINCT = "useApproximateCountDistinct";
public static final String CTX_KEY_USE_APPROXIMATE_TOPN = "useApproximateTopN";
public static final String CTX_KEY_USE_FALLBACK = "useFallback";
@JsonProperty
private Period metadataRefreshPeriod = new Period("PT1M");
@ -65,7 +72,7 @@ public class PlannerConfig
public int getMaxQueryCount()
{
return maxTopNLimit;
return maxQueryCount;
}
public int getSelectThreshold()
@ -88,8 +95,56 @@ public class PlannerConfig
return useFallback;
}
public PlannerConfig withOverrides(final Map<String, Object> context)
{
if (context == null) {
return this;
}
final PlannerConfig newConfig = new PlannerConfig();
newConfig.metadataRefreshPeriod = getMetadataRefreshPeriod();
newConfig.maxSemiJoinRowsInMemory = getMaxSemiJoinRowsInMemory();
newConfig.maxTopNLimit = getMaxTopNLimit();
newConfig.maxQueryCount = getMaxQueryCount();
newConfig.selectThreshold = getSelectThreshold();
newConfig.useApproximateCountDistinct = getContextBoolean(
context,
CTX_KEY_USE_APPROXIMATE_COUNT_DISTINCT,
isUseApproximateCountDistinct()
);
newConfig.useApproximateTopN = getContextBoolean(
context,
CTX_KEY_USE_APPROXIMATE_TOPN,
isUseApproximateTopN()
);
newConfig.useFallback = getContextBoolean(
context,
CTX_KEY_USE_FALLBACK,
isUseFallback()
);
return newConfig;
}
private static boolean getContextBoolean(
final Map<String, Object> context,
final String parameter,
final boolean defaultValue
)
{
final Object value = context.get(parameter);
if (value == null) {
return defaultValue;
} else if (value instanceof String) {
return Boolean.parseBoolean((String) value);
} else if (value instanceof Boolean) {
return (Boolean) value;
} else {
throw new IAE("Expected parameter[%s] to be boolean", parameter);
}
}
@Override
public boolean equals(Object o)
public boolean equals(final Object o)
{
if (this == o) {
return true;
@ -98,7 +153,7 @@ public class PlannerConfig
return false;
}
PlannerConfig that = (PlannerConfig) o;
final PlannerConfig that = (PlannerConfig) o;
if (maxSemiJoinRowsInMemory != that.maxSemiJoinRowsInMemory) {
return false;

View File

@ -0,0 +1,150 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.sql.calcite.planner;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import org.apache.calcite.DataContext;
import org.apache.calcite.adapter.java.JavaTypeFactory;
import org.apache.calcite.linq4j.QueryProvider;
import org.apache.calcite.schema.SchemaPlus;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import org.joda.time.Interval;
import java.util.Map;
/**
* Like {@link PlannerConfig}, but that has static configuration and this class contains dynamic, per-query
* configuration.
*/
public class PlannerContext
{
public static final String CTX_SQL_CURRENT_TIMESTAMP = "sqlCurrentTimestamp";
public static final String CTX_SQL_TIME_ZONE = "sqlTimeZone";
private final PlannerConfig plannerConfig;
private final DateTime localNow;
private final Map<String, Object> queryContext;
private PlannerContext(
final PlannerConfig plannerConfig,
final DateTime localNow,
final Map<String, Object> queryContext
)
{
this.plannerConfig = Preconditions.checkNotNull(plannerConfig, "plannerConfig");
this.queryContext = queryContext != null ? ImmutableMap.copyOf(queryContext) : ImmutableMap.<String, Object>of();
this.localNow = Preconditions.checkNotNull(localNow, "localNow");
}
public static PlannerContext create(
final PlannerConfig plannerConfig,
final Map<String, Object> queryContext
)
{
final DateTime utcNow;
final DateTimeZone timeZone;
if (queryContext != null) {
final Object tsParam = queryContext.get(CTX_SQL_CURRENT_TIMESTAMP);
final Object tzParam = queryContext.get(CTX_SQL_TIME_ZONE);
if (tsParam != null) {
utcNow = new DateTime(tsParam, DateTimeZone.UTC);
} else {
utcNow = new DateTime(DateTimeZone.UTC);
}
if (tzParam != null) {
timeZone = DateTimeZone.forID(String.valueOf(tzParam));
} else {
timeZone = DateTimeZone.UTC;
}
} else {
utcNow = new DateTime(DateTimeZone.UTC);
timeZone = DateTimeZone.UTC;
}
return new PlannerContext(plannerConfig.withOverrides(queryContext), utcNow.withZone(timeZone), queryContext);
}
public PlannerConfig getPlannerConfig()
{
return plannerConfig;
}
public DateTime getLocalNow()
{
return localNow;
}
public DateTimeZone getTimeZone()
{
return localNow.getZone();
}
public Map<String, Object> getQueryContext()
{
return queryContext;
}
public DataContext createDataContext(final JavaTypeFactory typeFactory)
{
class DruidDataContext implements DataContext
{
private final Map<String, Object> context = ImmutableMap.<String, Object>of(
DataContext.Variable.UTC_TIMESTAMP.camelName, localNow.getMillis(),
DataContext.Variable.CURRENT_TIMESTAMP.camelName, localNow.getMillis(),
DataContext.Variable.LOCAL_TIMESTAMP.camelName, new Interval(
new DateTime("1970-01-01T00:00:00.000", localNow.getZone()),
localNow
).toDurationMillis(),
DataContext.Variable.TIME_ZONE.camelName, localNow.getZone().toTimeZone()
);
@Override
public SchemaPlus getRootSchema()
{
throw new UnsupportedOperationException();
}
@Override
public JavaTypeFactory getTypeFactory()
{
return typeFactory;
}
@Override
public QueryProvider getQueryProvider()
{
throw new UnsupportedOperationException();
}
@Override
public Object get(final String name)
{
return context.get(name);
}
}
return new DruidDataContext();
}
}

View File

@ -20,6 +20,8 @@
package io.druid.sql.calcite.planner;
import com.google.inject.Inject;
import io.druid.query.QuerySegmentWalker;
import io.druid.sql.calcite.rel.QueryMaker;
import io.druid.sql.calcite.schema.DruidSchema;
import org.apache.calcite.avatica.util.Casing;
import org.apache.calcite.avatica.util.Quoting;
@ -33,28 +35,34 @@ import org.apache.calcite.schema.Schemas;
import org.apache.calcite.sql.parser.SqlParser;
import org.apache.calcite.tools.FrameworkConfig;
import org.apache.calcite.tools.Frameworks;
import org.apache.calcite.tools.Planner;
import java.util.Map;
public class PlannerFactory
{
private final SchemaPlus rootSchema;
private final QuerySegmentWalker walker;
private final DruidOperatorTable operatorTable;
private final PlannerConfig plannerConfig;
@Inject
public PlannerFactory(
final SchemaPlus rootSchema,
final QuerySegmentWalker walker,
final DruidOperatorTable operatorTable,
final PlannerConfig plannerConfig
)
{
this.rootSchema = rootSchema;
this.walker = walker;
this.operatorTable = operatorTable;
this.plannerConfig = plannerConfig;
}
public Planner createPlanner()
public DruidPlanner createPlanner(final Map<String, Object> queryContext)
{
final PlannerContext plannerContext = PlannerContext.create(plannerConfig, queryContext);
final QueryMaker queryMaker = new QueryMaker(walker, plannerContext);
final FrameworkConfig frameworkConfig = Frameworks
.newConfigBuilder()
.parserConfig(
@ -67,15 +75,15 @@ public class PlannerFactory
)
.defaultSchema(rootSchema)
.traitDefs(ConventionTraitDef.INSTANCE, RelCollationTraitDef.INSTANCE)
.convertletTable(DruidConvertletTable.instance())
.convertletTable(new DruidConvertletTable(plannerContext))
.operatorTable(operatorTable)
.programs(Rules.programs(operatorTable, plannerConfig))
.programs(Rules.programs(queryMaker, operatorTable))
.executor(new RexExecutorImpl(Schemas.createDataContext(null)))
.context(Contexts.EMPTY_CONTEXT)
.typeSystem(RelDataTypeSystem.DEFAULT)
.defaultSchema(rootSchema.getSubSchema(DruidSchema.NAME))
.build();
return Frameworks.getPlanner(frameworkConfig);
return new DruidPlanner(Frameworks.getPlanner(frameworkConfig), plannerContext);
}
}

View File

@ -20,10 +20,12 @@
package io.druid.sql.calcite.planner;
import com.google.common.collect.ImmutableList;
import io.druid.sql.calcite.rel.QueryMaker;
import io.druid.sql.calcite.rule.DruidFilterRule;
import io.druid.sql.calcite.rule.DruidRelToBindableRule;
import io.druid.sql.calcite.rule.DruidRelToDruidRule;
import io.druid.sql.calcite.rule.DruidSemiJoinRule;
import io.druid.sql.calcite.rule.DruidTableScanRule;
import io.druid.sql.calcite.rule.GroupByRules;
import io.druid.sql.calcite.rule.SelectRules;
import org.apache.calcite.interpreter.Bindables;
@ -36,7 +38,6 @@ import org.apache.calcite.rel.rules.AggregateRemoveRule;
import org.apache.calcite.rel.rules.AggregateStarTableRule;
import org.apache.calcite.rel.rules.AggregateValuesRule;
import org.apache.calcite.rel.rules.CalcRemoveRule;
import org.apache.calcite.rel.rules.DateRangeRules;
import org.apache.calcite.rel.rules.FilterAggregateTransposeRule;
import org.apache.calcite.rel.rules.FilterJoinRule;
import org.apache.calcite.rel.rules.FilterMergeRule;
@ -133,6 +134,7 @@ public class Rules
);
// Rules from RelOptUtil's registerAbstractRels.
// Omit DateRangeRules due to https://issues.apache.org/jira/browse/CALCITE-1601
private static final List<RelOptRule> RELOPTUTIL_ABSTRACT_RULES =
ImmutableList.of(
AggregateProjectPullUpConstantsRule.INSTANCE2,
@ -147,8 +149,7 @@ public class Rules
PruneEmptyRules.SORT_FETCH_ZERO_INSTANCE,
UnionMergeRule.INSTANCE,
ProjectToWindowRule.PROJECT,
FilterMergeRule.INSTANCE,
DateRangeRules.FILTER_INSTANCE
FilterMergeRule.INSTANCE
);
private Rules()
@ -156,38 +157,42 @@ public class Rules
// No instantiation.
}
public static List<Program> programs(final DruidOperatorTable operatorTable, final PlannerConfig plannerConfig)
public static List<Program> programs(final QueryMaker queryMaker, final DruidOperatorTable operatorTable)
{
return ImmutableList.of(
Programs.ofRules(druidConventionRuleSet(operatorTable, plannerConfig)),
Programs.ofRules(bindableConventionRuleSet(operatorTable, plannerConfig))
Programs.ofRules(druidConventionRuleSet(queryMaker, operatorTable)),
Programs.ofRules(bindableConventionRuleSet(queryMaker, operatorTable))
);
}
private static List<RelOptRule> druidConventionRuleSet(
final DruidOperatorTable operatorTable,
final PlannerConfig plannerConfig
final QueryMaker queryMaker,
final DruidOperatorTable operatorTable
)
{
return ImmutableList.<RelOptRule>builder()
.addAll(baseRuleSet(operatorTable, plannerConfig))
.addAll(baseRuleSet(queryMaker, operatorTable))
.add(DruidRelToDruidRule.instance())
.build();
}
private static List<RelOptRule> bindableConventionRuleSet(
final DruidOperatorTable operatorTable,
final PlannerConfig plannerConfig
final QueryMaker queryMaker,
final DruidOperatorTable operatorTable
)
{
return ImmutableList.<RelOptRule>builder()
.addAll(baseRuleSet(operatorTable, plannerConfig))
.addAll(baseRuleSet(queryMaker, operatorTable))
.addAll(Bindables.RULES)
.build();
}
private static List<RelOptRule> baseRuleSet(final DruidOperatorTable operatorTable, final PlannerConfig plannerConfig)
private static List<RelOptRule> baseRuleSet(
final QueryMaker queryMaker,
final DruidOperatorTable operatorTable
)
{
final PlannerConfig plannerConfig = queryMaker.getPlannerContext().getPlannerConfig();
final ImmutableList.Builder<RelOptRule> rules = ImmutableList.builder();
// Calcite rules.
@ -202,14 +207,15 @@ public class Rules
}
// Druid-specific rules.
rules.add(new DruidTableScanRule(queryMaker));
rules.add(DruidFilterRule.instance());
if (plannerConfig.getMaxSemiJoinRowsInMemory() > 0) {
rules.add(DruidSemiJoinRule.create(plannerConfig));
rules.add(DruidSemiJoinRule.instance());
}
rules.addAll(SelectRules.rules());
rules.addAll(GroupByRules.rules(operatorTable, plannerConfig));
rules.addAll(GroupByRules.rules(operatorTable));
return rules.build();
}

View File

@ -131,7 +131,11 @@ public class DruidNestedGroupBy extends DruidRel<DruidNestedGroupBy>
return null;
} else {
return new QueryDataSource(
queryBuilder.toGroupByQuery(queryDataSource, sourceRel.getOutputRowSignature())
queryBuilder.toGroupByQuery(
queryDataSource,
sourceRel.getOutputRowSignature(),
getPlannerContext().getQueryContext()
)
);
}
}

View File

@ -281,12 +281,14 @@ public class DruidQueryBuilder
*
* @param dataSource data source to query
* @param sourceRowSignature row signature of the dataSource
* @param context query context
*
* @return query or null
*/
public TimeseriesQuery toTimeseriesQuery(
final DataSource dataSource,
final RowSignature sourceRowSignature
final RowSignature sourceRowSignature,
final Map<String, Object> context
)
{
if (grouping == null || having != null) {
@ -331,8 +333,9 @@ public class DruidQueryBuilder
descending = false;
}
final Map<String, Object> context = Maps.newHashMap();
context.put("skipEmptyBuckets", true);
final Map<String, Object> theContext = Maps.newHashMap();
theContext.put("skipEmptyBuckets", true);
theContext.putAll(context);
return new TimeseriesQuery(
dataSource,
@ -342,7 +345,7 @@ public class DruidQueryBuilder
queryGranularity,
grouping.getAggregatorFactories(),
grouping.getPostAggregators(),
context
theContext
);
}
@ -351,6 +354,7 @@ public class DruidQueryBuilder
*
* @param dataSource data source to query
* @param sourceRowSignature row signature of the dataSource
* @param context query context
* @param maxTopNLimit maxTopNLimit from a PlannerConfig
* @param useApproximateTopN from a PlannerConfig
*
@ -359,6 +363,7 @@ public class DruidQueryBuilder
public TopNQuery toTopNQuery(
final DataSource dataSource,
final RowSignature sourceRowSignature,
final Map<String, Object> context,
final int maxTopNLimit,
final boolean useApproximateTopN
)
@ -417,7 +422,7 @@ public class DruidQueryBuilder
QueryGranularities.ALL,
grouping.getAggregatorFactories(),
grouping.getPostAggregators(),
null
context
);
}
@ -426,12 +431,14 @@ public class DruidQueryBuilder
*
* @param dataSource data source to query
* @param sourceRowSignature row signature of the dataSource
* @param context query context
*
* @return query or null
*/
public GroupByQuery toGroupByQuery(
final DataSource dataSource,
final RowSignature sourceRowSignature
final RowSignature sourceRowSignature,
final Map<String, Object> context
)
{
if (grouping == null) {
@ -450,7 +457,7 @@ public class DruidQueryBuilder
grouping.getPostAggregators(),
having != null ? new DimFilterHavingSpec(having) : null,
limitSpec,
null
context
);
}
@ -459,12 +466,14 @@ public class DruidQueryBuilder
*
* @param dataSource data source to query
* @param sourceRowSignature row signature of the dataSource
* @param context query context
*
* @return query or null
*/
public SelectQuery toSelectQuery(
final DataSource dataSource,
final RowSignature sourceRowSignature
final RowSignature sourceRowSignature,
final Map<String, Object> context
)
{
if (grouping != null) {
@ -499,7 +508,7 @@ public class DruidQueryBuilder
selectProjection != null ? selectProjection.getMetrics() : ImmutableList.<String>of(),
null,
new PagingSpec(null, 0) /* dummy -- will be replaced */,
null
context
);
}

View File

@ -56,10 +56,11 @@ public class DruidQueryRel extends DruidRel<DruidQueryRel>
final RelTraitSet traitSet,
final RelOptTable table,
final DruidTable druidTable,
final QueryMaker queryMaker,
final DruidQueryBuilder queryBuilder
)
{
super(cluster, traitSet, druidTable.getQueryMaker());
super(cluster, traitSet, queryMaker);
this.table = Preconditions.checkNotNull(table, "table");
this.druidTable = Preconditions.checkNotNull(druidTable, "druidTable");
this.queryBuilder = Preconditions.checkNotNull(queryBuilder, "queryBuilder");
@ -71,7 +72,8 @@ public class DruidQueryRel extends DruidRel<DruidQueryRel>
public static DruidQueryRel fullScan(
final RelOptCluster cluster,
final RelOptTable table,
final DruidTable druidTable
final DruidTable druidTable,
final QueryMaker queryMaker
)
{
return new DruidQueryRel(
@ -79,6 +81,7 @@ public class DruidQueryRel extends DruidRel<DruidQueryRel>
cluster.traitSetOf(Convention.NONE),
table,
druidTable,
queryMaker,
DruidQueryBuilder.fullScan(druidTable.getRowSignature(), cluster.getTypeFactory())
);
}
@ -88,7 +91,8 @@ public class DruidQueryRel extends DruidRel<DruidQueryRel>
{
final GroupByQuery groupByQuery = getQueryBuilder().toGroupByQuery(
druidTable.getDataSource(),
druidTable.getRowSignature()
druidTable.getRowSignature(),
getPlannerContext().getQueryContext()
);
if (groupByQuery == null) {
@ -109,6 +113,7 @@ public class DruidQueryRel extends DruidRel<DruidQueryRel>
getTraitSet().plus(BindableConvention.INSTANCE),
table,
druidTable,
getQueryMaker(),
queryBuilder
);
}
@ -121,6 +126,7 @@ public class DruidQueryRel extends DruidRel<DruidQueryRel>
getTraitSet().replace(DruidConvention.instance()),
table,
druidTable,
getQueryMaker(),
queryBuilder
);
}
@ -145,6 +151,7 @@ public class DruidQueryRel extends DruidRel<DruidQueryRel>
getTraitSet().plusAll(newQueryBuilder.getRelTraits()),
table,
druidTable,
getQueryMaker(),
newQueryBuilder
);
}

View File

@ -23,6 +23,7 @@ import com.google.common.base.Throwables;
import io.druid.java.util.common.guava.Accumulator;
import io.druid.java.util.common.guava.Sequence;
import io.druid.query.QueryDataSource;
import io.druid.sql.calcite.planner.PlannerContext;
import io.druid.sql.calcite.table.RowSignature;
import org.apache.calcite.DataContext;
import org.apache.calcite.interpreter.BindableRel;
@ -82,6 +83,11 @@ public abstract class DruidRel<T extends DruidRel> extends AbstractRelNode imple
return queryMaker;
}
public PlannerContext getPlannerContext()
{
return queryMaker.getPlannerContext();
}
public abstract T asDruidConvention();
@Override

View File

@ -46,7 +46,8 @@ import io.druid.query.topn.DimensionAndMetricValueExtractor;
import io.druid.query.topn.TopNQuery;
import io.druid.query.topn.TopNResultValue;
import io.druid.segment.column.Column;
import io.druid.sql.calcite.planner.PlannerConfig;
import io.druid.sql.calcite.planner.Calcites;
import io.druid.sql.calcite.planner.PlannerContext;
import io.druid.sql.calcite.table.RowSignature;
import org.apache.calcite.avatica.ColumnMetaData;
import org.apache.calcite.rel.type.RelDataTypeField;
@ -56,7 +57,6 @@ import org.apache.calcite.util.NlsString;
import org.joda.time.DateTime;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@ -67,15 +67,20 @@ import java.util.concurrent.atomic.AtomicReference;
public class QueryMaker
{
private final QuerySegmentWalker walker;
private final PlannerConfig plannerConfig;
private final PlannerContext plannerContext;
public QueryMaker(
final QuerySegmentWalker walker,
final PlannerConfig plannerConfig
final PlannerContext plannerContext
)
{
this.walker = walker;
this.plannerConfig = plannerConfig;
this.plannerContext = plannerContext;
}
public PlannerContext getPlannerContext()
{
return plannerContext;
}
public Sequence<Object[]> runQuery(
@ -85,7 +90,12 @@ public class QueryMaker
)
{
if (dataSource instanceof QueryDataSource) {
final GroupByQuery outerQuery = queryBuilder.toGroupByQuery(dataSource, sourceRowSignature);
final GroupByQuery outerQuery = queryBuilder.toGroupByQuery(
dataSource,
sourceRowSignature,
plannerContext.getQueryContext()
);
if (outerQuery == null) {
// Bug in the planner rules. They shouldn't allow this to happen.
throw new IllegalStateException("Can't use QueryDataSource without an outer groupBy query!");
@ -94,7 +104,11 @@ public class QueryMaker
return executeGroupBy(queryBuilder, outerQuery);
}
final TimeseriesQuery timeseriesQuery = queryBuilder.toTimeseriesQuery(dataSource, sourceRowSignature);
final TimeseriesQuery timeseriesQuery = queryBuilder.toTimeseriesQuery(
dataSource,
sourceRowSignature,
plannerContext.getQueryContext()
);
if (timeseriesQuery != null) {
return executeTimeseries(queryBuilder, timeseriesQuery);
}
@ -102,19 +116,28 @@ public class QueryMaker
final TopNQuery topNQuery = queryBuilder.toTopNQuery(
dataSource,
sourceRowSignature,
plannerConfig.getMaxTopNLimit(),
plannerConfig.isUseApproximateTopN()
plannerContext.getQueryContext(),
plannerContext.getPlannerConfig().getMaxTopNLimit(),
plannerContext.getPlannerConfig().isUseApproximateTopN()
);
if (topNQuery != null) {
return executeTopN(queryBuilder, topNQuery);
}
final GroupByQuery groupByQuery = queryBuilder.toGroupByQuery(dataSource, sourceRowSignature);
final GroupByQuery groupByQuery = queryBuilder.toGroupByQuery(
dataSource,
sourceRowSignature,
plannerContext.getQueryContext()
);
if (groupByQuery != null) {
return executeGroupBy(queryBuilder, groupByQuery);
}
final SelectQuery selectQuery = queryBuilder.toSelectQuery(dataSource, sourceRowSignature);
final SelectQuery selectQuery = queryBuilder.toSelectQuery(
dataSource,
sourceRowSignature,
plannerContext.getQueryContext()
);
if (selectQuery != null) {
return executeSelect(queryBuilder, selectQuery);
}
@ -155,22 +178,22 @@ public class QueryMaker
@Override
public Sequence<Object[]> next()
{
final SelectQuery query = baseQuery.withPagingSpec(
final SelectQuery queryWithPagination = baseQuery.withPagingSpec(
new PagingSpec(
pagingIdentifiers.get(),
plannerConfig.getSelectThreshold(),
plannerContext.getPlannerConfig().getSelectThreshold(),
true
)
);
Hook.QUERY_PLAN.run(query);
Hook.QUERY_PLAN.run(queryWithPagination);
morePages.set(false);
final AtomicBoolean gotResult = new AtomicBoolean();
return Sequences.concat(
Sequences.map(
query.run(walker, Maps.<String, Object>newHashMap()),
queryWithPagination.run(walker, Maps.<String, Object>newHashMap()),
new Function<Result<SelectResultValue>, Sequence<Object[]>>()
{
@Override
@ -335,8 +358,10 @@ public class QueryMaker
{
if (SqlTypeName.CHAR_TYPES.contains(sqlType)) {
return ColumnMetaData.Rep.of(String.class);
} else if (SqlTypeName.DATETIME_TYPES.contains(sqlType)) {
} else if (sqlType == SqlTypeName.TIMESTAMP) {
return ColumnMetaData.Rep.of(Long.class);
} else if (sqlType == SqlTypeName.DATE) {
return ColumnMetaData.Rep.of(Integer.class);
} else if (sqlType == SqlTypeName.INTEGER) {
return ColumnMetaData.Rep.of(Integer.class);
} else if (sqlType == SqlTypeName.BIGINT) {
@ -350,7 +375,7 @@ public class QueryMaker
}
}
private static Object coerce(final Object value, final SqlTypeName sqlType)
private Object coerce(final Object value, final SqlTypeName sqlType)
{
final Object coercedValue;
@ -367,24 +392,33 @@ public class QueryMaker
} else if (value == null) {
coercedValue = null;
} else if (sqlType == SqlTypeName.DATE) {
final Long millis = (Long) coerce(value, SqlTypeName.TIMESTAMP);
if (millis == null) {
return null;
} else {
return new DateTime(millis.longValue()).dayOfMonth().roundFloorCopy().getMillis();
}
} else if (sqlType == SqlTypeName.TIMESTAMP) {
final DateTime dateTime;
if (value instanceof Number) {
coercedValue = new DateTime(((Number) value).longValue()).getMillis();
dateTime = new DateTime(((Number) value).longValue());
} else if (value instanceof String) {
coercedValue = Long.parseLong((String) value);
} else if (value instanceof Calendar) {
coercedValue = ((Calendar) value).getTimeInMillis();
dateTime = new DateTime(Long.parseLong((String) value));
} else if (value instanceof DateTime) {
coercedValue = ((DateTime) value).getMillis();
dateTime = (DateTime) value;
} else {
throw new ISE("Cannot coerce[%s] to %s", value.getClass().getName(), sqlType);
}
return Calcites.jodaToCalciteDate(dateTime, plannerContext.getTimeZone());
} else if (sqlType == SqlTypeName.TIMESTAMP) {
final DateTime dateTime;
if (value instanceof Number) {
dateTime = new DateTime(((Number) value).longValue());
} else if (value instanceof String) {
dateTime = new DateTime(Long.parseLong((String) value));
} else if (value instanceof DateTime) {
dateTime = (DateTime) value;
} else {
throw new ISE("Cannot coerce[%s] to %s", value.getClass().getName(), sqlType);
}
return Calcites.jodaToCalciteTimestamp(dateTime, plannerContext.getTimeZone());
} else if (sqlType == SqlTypeName.INTEGER) {
if (value instanceof String) {
coercedValue = Ints.tryParse((String) value);

View File

@ -53,6 +53,7 @@ public class DruidFilterRule extends RelOptRule
}
final DimFilter dimFilter = Expressions.toFilter(
druidRel.getPlannerContext(),
druidRel.getSourceRowSignature(),
filter.getCondition()
);

View File

@ -70,9 +70,9 @@ public class DruidSemiJoinRule extends RelOptRule
}
};
private final PlannerConfig plannerConfig;
private static final DruidSemiJoinRule INSTANCE = new DruidSemiJoinRule();
private DruidSemiJoinRule(final PlannerConfig plannerConfig)
private DruidSemiJoinRule()
{
super(
operand(
@ -88,12 +88,11 @@ public class DruidSemiJoinRule extends RelOptRule
)
)
);
this.plannerConfig = plannerConfig;
}
public static DruidSemiJoinRule create(final PlannerConfig plannerConfig)
public static DruidSemiJoinRule instance()
{
return new DruidSemiJoinRule(plannerConfig);
return INSTANCE;
}
@Override
@ -129,6 +128,7 @@ public class DruidSemiJoinRule extends RelOptRule
}
final RelBuilder relBuilder = call.builder();
final PlannerConfig plannerConfig = left.getPlannerContext().getPlannerConfig();
if (join.getJoinType() == JoinRelType.LEFT) {
// Join can be eliminated since the right-hand side cannot have any effect (nothing is being selected,

View File

@ -0,0 +1,54 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.sql.calcite.rule;
import io.druid.sql.calcite.rel.DruidQueryRel;
import io.druid.sql.calcite.rel.QueryMaker;
import io.druid.sql.calcite.table.DruidTable;
import org.apache.calcite.plan.RelOptRule;
import org.apache.calcite.plan.RelOptRuleCall;
import org.apache.calcite.plan.RelOptTable;
import org.apache.calcite.rel.logical.LogicalTableScan;
public class DruidTableScanRule extends RelOptRule
{
private final QueryMaker queryMaker;
public DruidTableScanRule(
final QueryMaker queryMaker
)
{
super(operand(LogicalTableScan.class, any()));
this.queryMaker = queryMaker;
}
@Override
public void onMatch(final RelOptRuleCall call)
{
final LogicalTableScan scan = call.rel(0);
final RelOptTable table = scan.getTable();
final DruidTable druidTable = table.unwrap(DruidTable.class);
if (druidTable != null) {
call.transformTo(
DruidQueryRel.fullScan(scan.getCluster(), table, druidTable, queryMaker)
);
}
}
}

View File

@ -54,7 +54,7 @@ import io.druid.sql.calcite.expression.RowExtraction;
import io.druid.sql.calcite.filtration.Filtration;
import io.druid.sql.calcite.planner.Calcites;
import io.druid.sql.calcite.planner.DruidOperatorTable;
import io.druid.sql.calcite.planner.PlannerConfig;
import io.druid.sql.calcite.planner.PlannerContext;
import io.druid.sql.calcite.rel.DruidNestedGroupBy;
import io.druid.sql.calcite.rel.DruidRel;
import io.druid.sql.calcite.rel.Grouping;
@ -87,12 +87,12 @@ public class GroupByRules
// No instantiation.
}
public static List<RelOptRule> rules(final DruidOperatorTable operatorTable, final PlannerConfig plannerConfig)
public static List<RelOptRule> rules(final DruidOperatorTable operatorTable)
{
return ImmutableList.of(
new DruidAggregateRule(operatorTable, plannerConfig),
new DruidAggregateProjectRule(operatorTable, plannerConfig),
new DruidAggregateProjectFilterRule(operatorTable, plannerConfig),
new DruidAggregateRule(operatorTable),
new DruidAggregateProjectRule(operatorTable),
new DruidAggregateProjectFilterRule(operatorTable),
new DruidGroupByPostAggregationRule(),
new DruidGroupByHavingRule(),
new DruidGroupByLimitRule()
@ -115,9 +115,13 @@ public class GroupByRules
Preconditions.checkArgument(fieldName == null ^ expression == null, "must have either fieldName or expression");
}
public static FieldOrExpression fromRexNode(final List<String> rowOrder, final RexNode rexNode)
public static FieldOrExpression fromRexNode(
final PlannerContext plannerContext,
final List<String> rowOrder,
final RexNode rexNode
)
{
final RowExtraction rex = Expressions.toRowExtraction(rowOrder, rexNode);
final RowExtraction rex = Expressions.toRowExtraction(plannerContext, rowOrder, rexNode);
if (rex != null && rex.getExtractionFn() == null) {
// This was a simple field access.
return fieldName(rex.getColumn());
@ -156,13 +160,11 @@ public class GroupByRules
public static class DruidAggregateRule extends RelOptRule
{
private final DruidOperatorTable operatorTable;
private final PlannerConfig plannerConfig;
private DruidAggregateRule(final DruidOperatorTable operatorTable, final PlannerConfig plannerConfig)
private DruidAggregateRule(final DruidOperatorTable operatorTable)
{
super(operand(Aggregate.class, operand(DruidRel.class, none())));
this.operatorTable = operatorTable;
this.plannerConfig = plannerConfig;
}
@Override
@ -184,7 +186,7 @@ public class GroupByRules
null,
aggregate,
operatorTable,
plannerConfig.isUseApproximateCountDistinct()
druidRel.getPlannerContext().getPlannerConfig().isUseApproximateCountDistinct()
);
if (newDruidRel != null) {
call.transformTo(newDruidRel);
@ -195,13 +197,11 @@ public class GroupByRules
public static class DruidAggregateProjectRule extends RelOptRule
{
private final DruidOperatorTable operatorTable;
private final PlannerConfig plannerConfig;
private DruidAggregateProjectRule(final DruidOperatorTable operatorTable, final PlannerConfig plannerConfig)
private DruidAggregateProjectRule(final DruidOperatorTable operatorTable)
{
super(operand(Aggregate.class, operand(Project.class, operand(DruidRel.class, none()))));
this.operatorTable = operatorTable;
this.plannerConfig = plannerConfig;
}
@Override
@ -225,7 +225,7 @@ public class GroupByRules
project,
aggregate,
operatorTable,
plannerConfig.isUseApproximateCountDistinct()
druidRel.getPlannerContext().getPlannerConfig().isUseApproximateCountDistinct()
);
if (newDruidRel != null) {
call.transformTo(newDruidRel);
@ -236,13 +236,11 @@ public class GroupByRules
public static class DruidAggregateProjectFilterRule extends RelOptRule
{
private final DruidOperatorTable operatorTable;
private final PlannerConfig plannerConfig;
private DruidAggregateProjectFilterRule(final DruidOperatorTable operatorTable, final PlannerConfig plannerConfig)
private DruidAggregateProjectFilterRule(final DruidOperatorTable operatorTable)
{
super(operand(Aggregate.class, operand(Project.class, operand(Filter.class, operand(DruidRel.class, none())))));
this.operatorTable = operatorTable;
this.plannerConfig = plannerConfig;
}
@Override
@ -268,7 +266,7 @@ public class GroupByRules
project,
aggregate,
operatorTable,
plannerConfig.isUseApproximateCountDistinct()
druidRel.getPlannerContext().getPlannerConfig().isUseApproximateCountDistinct()
);
if (newDruidRel != null) {
call.transformTo(newDruidRel);
@ -397,7 +395,7 @@ public class GroupByRules
// Filter that should be applied before aggregating.
final DimFilter filter;
if (filter0 != null) {
filter = Expressions.toFilter(sourceRowSignature, filter0.getCondition());
filter = Expressions.toFilter(druidRel.getPlannerContext(), sourceRowSignature, filter0.getCondition());
if (filter == null) {
// Can't plan this filter.
return null;
@ -437,6 +435,7 @@ public class GroupByRules
} else {
final RexNode rexNode = Expressions.fromFieldAccess(sourceRowSignature, project, i);
final RowExtraction rex = Expressions.toRowExtraction(
druidRel.getPlannerContext(),
sourceRowSignature.getRowOrder(),
rexNode
);
@ -467,6 +466,7 @@ public class GroupByRules
for (int i = 0; i < aggregate.getAggCallList().size(); i++) {
final AggregateCall aggCall = aggregate.getAggCallList().get(i);
final Aggregation aggregation = translateAggregateCall(
druidRel.getPlannerContext(),
sourceRowSignature,
project,
aggCall,
@ -595,6 +595,7 @@ public class GroupByRules
Preconditions.checkState(canApplyHaving(druidRel), "Cannot applyHaving.");
final DimFilter dimFilter = Expressions.toFilter(
druidRel.getPlannerContext(),
druidRel.getOutputRowSignature(),
postFilter.getCondition()
);
@ -698,8 +699,10 @@ public class GroupByRules
throw new ISE("WTF?! Don't know what to do with direction[%s]", collation.getDirection());
}
if (SqlTypeName.NUMERIC_TYPES.contains(sortExpression.getType().getSqlTypeName())
|| SqlTypeName.DATETIME_TYPES.contains(sortExpression.getType().getSqlTypeName())) {
final SqlTypeName sortExpressionType = sortExpression.getType().getSqlTypeName();
if (SqlTypeName.NUMERIC_TYPES.contains(sortExpressionType)
|| SqlTypeName.TIMESTAMP == sortExpressionType
|| SqlTypeName.DATE == sortExpressionType) {
comparator = StringComparators.NUMERIC;
} else {
comparator = StringComparators.LEXICOGRAPHIC;
@ -724,6 +727,7 @@ public class GroupByRules
* @return translated aggregation, or null if translation failed.
*/
private static Aggregation translateAggregateCall(
final PlannerContext plannerContext,
final RowSignature sourceRowSignature,
final Project project,
final AggregateCall call,
@ -747,7 +751,7 @@ public class GroupByRules
}
final RexNode expression = project.getChildExps().get(call.filterArg);
final DimFilter filter = Expressions.toFilter(sourceRowSignature, expression);
final DimFilter filter = Expressions.toFilter(plannerContext, sourceRowSignature, expression);
if (filter == null) {
return null;
}
@ -763,6 +767,7 @@ public class GroupByRules
return approximateCountDistinct ? APPROX_COUNT_DISTINCT.toDruidAggregation(
name,
sourceRowSignature,
plannerContext,
existingAggregations,
project,
call,
@ -780,7 +785,7 @@ public class GroupByRules
final int inputField = Iterables.getOnlyElement(call.getArgList());
final RexNode rexNode = Expressions.fromFieldAccess(sourceRowSignature, project, inputField);
final FieldOrExpression foe = FieldOrExpression.fromRexNode(rowOrder, rexNode);
final FieldOrExpression foe = FieldOrExpression.fromRexNode(plannerContext, rowOrder, rexNode);
if (foe != null) {
input = foe;
@ -799,6 +804,7 @@ public class GroupByRules
// Operand 1: Filter
final DimFilter filter = Expressions.toFilter(
plannerContext,
sourceRowSignature,
caseCall.getOperands().get(0)
);
@ -825,7 +831,7 @@ public class GroupByRules
input = null;
} else if (RexLiteral.isNullLiteral(arg2)) {
// Maybe case A
input = FieldOrExpression.fromRexNode(rowOrder, arg1);
input = FieldOrExpression.fromRexNode(plannerContext, rowOrder, arg1);
if (input == null) {
return null;
}
@ -852,7 +858,8 @@ public class GroupByRules
final String expression = input.getExpression();
final boolean isLong = SqlTypeName.INT_TYPES.contains(outputType)
|| SqlTypeName.DATETIME_TYPES.contains(outputType);
|| SqlTypeName.TIMESTAMP == outputType
|| SqlTypeName.DATE == outputType;
if (kind == SqlKind.SUM || kind == SqlKind.SUM0) {
retVal = isLong
@ -897,6 +904,7 @@ public class GroupByRules
return sqlAggregator != null ? sqlAggregator.toDruidAggregation(
name,
sourceRowSignature,
plannerContext,
existingAggregations,
project,
call,

View File

@ -45,6 +45,11 @@ import java.util.List;
public class SelectRules
{
private static final List<RelOptRule> RULES = ImmutableList.of(
new DruidSelectProjectionRule(),
new DruidSelectSortRule()
);
private SelectRules()
{
// No instantiation.
@ -52,10 +57,7 @@ public class SelectRules
public static List<RelOptRule> rules()
{
return ImmutableList.of(
new DruidSelectProjectionRule(),
new DruidSelectSortRule()
);
return RULES;
}
static class DruidSelectProjectionRule extends RelOptRule
@ -93,6 +95,7 @@ public class SelectRules
for (int i = 0; i < project.getRowType().getFieldCount(); i++) {
final RexNode rexNode = project.getChildExps().get(i);
final RowExtraction rex = Expressions.toRowExtraction(
druidRel.getPlannerContext(),
sourceRowSignature.getRowOrder(),
rexNode
);

View File

@ -48,7 +48,6 @@ import io.druid.query.metadata.metadata.SegmentMetadataQuery;
import io.druid.segment.column.ValueType;
import io.druid.server.coordination.DruidServerMetadata;
import io.druid.sql.calcite.planner.PlannerConfig;
import io.druid.sql.calcite.rel.QueryMaker;
import io.druid.sql.calcite.table.DruidTable;
import io.druid.sql.calcite.table.RowSignature;
import io.druid.timeline.DataSegment;
@ -75,7 +74,6 @@ public class DruidSchema extends AbstractSchema
private final TimelineServerView serverView;
private final PlannerConfig config;
private final ExecutorService cacheExec;
private final QueryMaker queryMaker;
private final ConcurrentMap<String, Table> tables;
// For awaitInitialization.
@ -101,7 +99,6 @@ public class DruidSchema extends AbstractSchema
this.serverView = Preconditions.checkNotNull(serverView, "serverView");
this.config = Preconditions.checkNotNull(config, "config");
this.cacheExec = ScheduledExecutors.fixed(1, "DruidSchema-Cache-%d");
this.queryMaker = new QueryMaker(walker, config);
this.tables = Maps.newConcurrentMap();
}
@ -319,7 +316,6 @@ public class DruidSchema extends AbstractSchema
}
return new DruidTable(
queryMaker,
new TableDataSource(dataSource),
rowSignature.build()
);

View File

@ -21,11 +21,9 @@ package io.druid.sql.calcite.table;
import com.google.common.base.Preconditions;
import io.druid.query.DataSource;
import io.druid.sql.calcite.rel.DruidQueryRel;
import io.druid.sql.calcite.rel.QueryMaker;
import org.apache.calcite.plan.RelOptCluster;
import org.apache.calcite.plan.RelOptTable;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.logical.LogicalTableScan;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeFactory;
import org.apache.calcite.schema.Schema;
@ -35,26 +33,18 @@ import org.apache.calcite.schema.TranslatableTable;
public class DruidTable implements TranslatableTable
{
private final QueryMaker queryMaker;
private final DataSource dataSource;
private final RowSignature rowSignature;
public DruidTable(
final QueryMaker queryMaker,
final DataSource dataSource,
final RowSignature rowSignature
)
{
this.queryMaker = Preconditions.checkNotNull(queryMaker, "queryMaker");
this.dataSource = Preconditions.checkNotNull(dataSource, "dataSource");
this.rowSignature = Preconditions.checkNotNull(rowSignature, "rowSignature");
}
public QueryMaker getQueryMaker()
{
return queryMaker;
}
public DataSource getDataSource()
{
return dataSource;
@ -86,8 +76,7 @@ public class DruidTable implements TranslatableTable
@Override
public RelNode toRel(final RelOptTable.ToRelContext context, final RelOptTable table)
{
final RelOptCluster cluster = context.getCluster();
return DruidQueryRel.fullScan(cluster, table, this);
return LogicalTableScan.create(context.getCluster(), table);
}
@Override

View File

@ -22,17 +22,23 @@ package io.druid.sql.http;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import java.util.Map;
public class SqlQuery
{
private final String query;
private final Map<String, Object> context;
@JsonCreator
public SqlQuery(
@JsonProperty("query") final String query
@JsonProperty("query") final String query,
@JsonProperty("context") final Map<String, Object> context
)
{
this.query = Preconditions.checkNotNull(query, "query");
this.context = context == null ? ImmutableMap.<String, Object>of() : context;
}
@JsonProperty
@ -41,8 +47,14 @@ public class SqlQuery
return query;
}
@JsonProperty
public Map<String, Object> getContext()
{
return context;
}
@Override
public boolean equals(Object o)
public boolean equals(final Object o)
{
if (this == o) {
return true;
@ -51,15 +63,20 @@ public class SqlQuery
return false;
}
SqlQuery sqlQuery = (SqlQuery) o;
final SqlQuery sqlQuery = (SqlQuery) o;
return query != null ? query.equals(sqlQuery.query) : sqlQuery.query == null;
if (query != null ? !query.equals(sqlQuery.query) : sqlQuery.query != null) {
return false;
}
return context != null ? context.equals(sqlQuery.context) : sqlQuery.context == null;
}
@Override
public int hashCode()
{
return query != null ? query.hashCode() : 0;
int result = query != null ? query.hashCode() : 0;
result = 31 * result + (context != null ? context.hashCode() : 0);
return result;
}
@Override
@ -67,6 +84,7 @@ public class SqlQuery
{
return "SqlQuery{" +
"query='" + query + '\'' +
", context=" + context +
'}';
}
}

View File

@ -30,13 +30,14 @@ import io.druid.java.util.common.guava.Yielders;
import io.druid.java.util.common.logger.Logger;
import io.druid.query.QueryInterruptedException;
import io.druid.sql.calcite.planner.Calcites;
import io.druid.sql.calcite.planner.DruidPlanner;
import io.druid.sql.calcite.planner.PlannerFactory;
import io.druid.sql.calcite.planner.PlannerResult;
import org.apache.calcite.plan.RelOptPlanner;
import org.apache.calcite.rel.type.RelDataTypeField;
import org.apache.calcite.sql.type.SqlTypeName;
import org.apache.calcite.tools.Planner;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import org.joda.time.format.ISODateTimeFormat;
import javax.ws.rs.Consumes;
import javax.ws.rs.POST;
@ -78,9 +79,11 @@ public class SqlResource
// (Non-trivial since we don't know the dataSources up-front)
final PlannerResult plannerResult;
final DateTimeZone timeZone;
try (final Planner planner = plannerFactory.createPlanner()) {
plannerResult = Calcites.plan(planner, sqlQuery.getQuery());
try (final DruidPlanner planner = plannerFactory.createPlanner(sqlQuery.getContext())) {
plannerResult = planner.plan(sqlQuery.getQuery());
timeZone = planner.getPlannerContext().getTimeZone();
}
catch (Exception e) {
log.warn(e, "Failed to handle query: %s", sqlQuery);
@ -102,9 +105,11 @@ public class SqlResource
// Remember which columns are time-typed, so we can emit ISO8601 instead of millis values.
final List<RelDataTypeField> fieldList = plannerResult.rowType().getFieldList();
final boolean[] timeColumns = new boolean[fieldList.size()];
final boolean[] dateColumns = new boolean[fieldList.size()];
for (int i = 0; i < fieldList.size(); i++) {
final SqlTypeName sqlTypeName = fieldList.get(i).getType().getSqlTypeName();
timeColumns[i] = SqlTypeName.DATETIME_TYPES.contains(sqlTypeName);
timeColumns[i] = sqlTypeName == SqlTypeName.TIMESTAMP;
dateColumns[i] = sqlTypeName == SqlTypeName.DATE;
}
final Yielder<Object[]> yielder0 = Yielders.each(plannerResult.run());
@ -127,7 +132,13 @@ public class SqlResource
final Object value;
if (timeColumns[i]) {
value = new DateTime((long) row[i]);
value = ISODateTimeFormat.dateTime().print(
Calcites.calciteTimestampToJoda((long) row[i], timeZone)
);
} else if (dateColumns[i]) {
value = ISODateTimeFormat.dateTime().print(
Calcites.calciteDateToJoda((int) row[i], timeZone)
);
} else {
value = row[i];
}

View File

@ -37,6 +37,7 @@ import org.apache.calcite.avatica.AvaticaClientRuntimeException;
import org.apache.calcite.schema.SchemaPlus;
import org.eclipse.jetty.server.Server;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
@ -48,14 +49,17 @@ import org.junit.rules.TemporaryFolder;
import java.net.InetSocketAddress;
import java.sql.Connection;
import java.sql.DatabaseMetaData;
import java.sql.Date;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.sql.Statement;
import java.sql.Timestamp;
import java.sql.Types;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Random;
import java.util.Set;
@ -88,6 +92,7 @@ public class DruidAvaticaHandlerTest
private SpecificSegmentsQuerySegmentWalker walker;
private Server server;
private Connection client;
private Connection clientLosAngeles;
@Before
public void setUp() throws Exception
@ -103,7 +108,7 @@ public class DruidAvaticaHandlerTest
);
final DruidOperatorTable operatorTable = CalciteTests.createOperatorTable();
final DruidAvaticaHandler handler = new DruidAvaticaHandler(
new DruidMeta(new PlannerFactory(rootSchema, operatorTable, plannerConfig), AVATICA_CONFIG),
new DruidMeta(new PlannerFactory(rootSchema, walker, operatorTable, plannerConfig), AVATICA_CONFIG),
new DruidNode("dummy", "dummy", 1),
new AvaticaMonitor()
);
@ -117,16 +122,22 @@ public class DruidAvaticaHandlerTest
DruidAvaticaHandler.AVATICA_PATH
);
client = DriverManager.getConnection(url);
final Properties propertiesLosAngeles = new Properties();
propertiesLosAngeles.setProperty("sqlTimeZone", "America/Los_Angeles");
clientLosAngeles = DriverManager.getConnection(url, propertiesLosAngeles);
}
@After
public void tearDown() throws Exception
{
client.close();
clientLosAngeles.close();
server.stop();
walker.close();
walker = null;
client = null;
clientLosAngeles = null;
server = null;
}
@ -160,17 +171,43 @@ public class DruidAvaticaHandlerTest
public void testTimestampsInResponse() throws Exception
{
final ResultSet resultSet = client.createStatement().executeQuery(
"SELECT __time FROM druid.foo LIMIT 1"
"SELECT __time, CAST(__time AS DATE) AS t2 FROM druid.foo LIMIT 1"
);
Assert.assertEquals(
ImmutableList.of(
ImmutableMap.of("__time", new DateTime("2000-01-01T00:00:00.000Z").toDate())
ImmutableMap.of(
"__time", new Timestamp(new DateTime("2000-01-01T00:00:00.000Z").getMillis()),
"t2", new Date(new DateTime("2000-01-01").getMillis())
)
),
getRows(resultSet)
);
}
@Test
public void testTimestampsInResponseLosAngelesTimeZone() throws Exception
{
final ResultSet resultSet = clientLosAngeles.createStatement().executeQuery(
"SELECT __time, CAST(__time AS DATE) AS t2 FROM druid.foo LIMIT 1"
);
final DateTimeZone timeZone = DateTimeZone.forID("America/Los_Angeles");
final DateTime localDateTime = new DateTime("2000-01-01T00Z", timeZone);
final List<Map<String, Object>> resultRows = getRows(resultSet);
Assert.assertEquals(
ImmutableList.of(
ImmutableMap.of(
"__time", new Timestamp(Calcites.jodaToCalciteTimestamp(localDateTime, timeZone)),
"t2", new Date(Calcites.jodaToCalciteTimestamp(localDateTime.dayOfMonth().roundFloorCopy(), timeZone))
)
),
resultRows
);
}
@Test
public void testFieldAliasingSelect() throws Exception
{

View File

@ -65,7 +65,7 @@ public class DruidStatementTest
)
);
final DruidOperatorTable operatorTable = CalciteTests.createOperatorTable();
plannerFactory = new PlannerFactory(rootSchema, operatorTable, plannerConfig);
plannerFactory = new PlannerFactory(rootSchema, walker, operatorTable, plannerConfig);
}
@After
@ -79,7 +79,7 @@ public class DruidStatementTest
public void testSignature() throws Exception
{
final String sql = "SELECT * FROM druid.foo";
final DruidStatement statement = new DruidStatement("", 0).prepare(plannerFactory, sql, -1);
final DruidStatement statement = new DruidStatement("", 0, null).prepare(plannerFactory, sql, -1);
// Check signature.
final Meta.Signature signature = statement.getSignature();
@ -117,7 +117,7 @@ public class DruidStatementTest
public void testSelectAllInFirstFrame() throws Exception
{
final String sql = "SELECT __time, cnt, dim1, dim2, m1 FROM druid.foo";
final DruidStatement statement = new DruidStatement("", 0).prepare(plannerFactory, sql, -1);
final DruidStatement statement = new DruidStatement("", 0, null).prepare(plannerFactory, sql, -1);
// First frame, ask for all rows.
Meta.Frame frame = statement.execute().nextFrame(DruidStatement.START_OFFSET, 6);
@ -143,7 +143,7 @@ public class DruidStatementTest
public void testSelectSplitOverTwoFrames() throws Exception
{
final String sql = "SELECT __time, cnt, dim1, dim2, m1 FROM druid.foo";
final DruidStatement statement = new DruidStatement("", 0).prepare(plannerFactory, sql, -1);
final DruidStatement statement = new DruidStatement("", 0, null).prepare(plannerFactory, sql, -1);
// First frame, ask for 2 rows.
Meta.Frame frame = statement.execute().nextFrame(DruidStatement.START_OFFSET, 2);

File diff suppressed because it is too large Load Diff

View File

@ -28,11 +28,15 @@ import io.druid.query.QueryInterruptedException;
import io.druid.sql.calcite.planner.Calcites;
import io.druid.sql.calcite.planner.DruidOperatorTable;
import io.druid.sql.calcite.planner.PlannerConfig;
import io.druid.sql.calcite.planner.PlannerContext;
import io.druid.sql.calcite.planner.PlannerFactory;
import io.druid.sql.calcite.util.CalciteTests;
import io.druid.sql.calcite.util.QueryLogHook;
import io.druid.sql.calcite.util.SpecificSegmentsQuerySegmentWalker;
import io.druid.sql.http.SqlQuery;
import io.druid.sql.http.SqlResource;
import org.apache.calcite.schema.SchemaPlus;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
@ -56,28 +60,38 @@ public class SqlResourceTest
@Rule
public TemporaryFolder temporaryFolder = new TemporaryFolder();
@Rule
public QueryLogHook queryLogHook = QueryLogHook.create();
private SpecificSegmentsQuerySegmentWalker walker = null;
private SqlResource resource;
@Before
public void setUp() throws Exception
{
Calcites.setSystemProperties();
walker = CalciteTests.createMockWalker(temporaryFolder.newFolder());
final PlannerConfig plannerConfig = new PlannerConfig();
final SchemaPlus rootSchema = Calcites.createRootSchema(
CalciteTests.createMockSchema(
CalciteTests.createMockWalker(temporaryFolder.newFolder()),
plannerConfig
)
CalciteTests.createMockSchema(walker, plannerConfig)
);
final DruidOperatorTable operatorTable = CalciteTests.createOperatorTable();
resource = new SqlResource(JSON_MAPPER, new PlannerFactory(rootSchema, operatorTable, plannerConfig));
resource = new SqlResource(JSON_MAPPER, new PlannerFactory(rootSchema, walker, operatorTable, plannerConfig));
}
@After
public void tearDown() throws Exception
{
walker.close();
walker = null;
}
@Test
public void testCountStar() throws Exception
{
final List<Map<String, Object>> rows = doPost(
new SqlQuery("SELECT COUNT(*) AS cnt FROM druid.foo")
new SqlQuery("SELECT COUNT(*) AS cnt FROM druid.foo", null)
);
Assert.assertEquals(
@ -92,12 +106,30 @@ public class SqlResourceTest
public void testTimestampsInResponse() throws Exception
{
final List<Map<String, Object>> rows = doPost(
new SqlQuery("SELECT __time FROM druid.foo LIMIT 1")
new SqlQuery("SELECT __time, CAST(__time AS DATE) AS t2 FROM druid.foo LIMIT 1", null)
);
Assert.assertEquals(
ImmutableList.of(
ImmutableMap.of("__time", "2000-01-01T00:00:00.000Z")
ImmutableMap.of("__time", "2000-01-01T00:00:00.000Z", "t2", "2000-01-01T00:00:00.000Z")
),
rows
);
}
@Test
public void testTimestampsInResponseLosAngelesTimeZone() throws Exception
{
final List<Map<String, Object>> rows = doPost(
new SqlQuery(
"SELECT __time, CAST(__time AS DATE) AS t2 FROM druid.foo LIMIT 1",
ImmutableMap.<String, Object>of(PlannerContext.CTX_SQL_TIME_ZONE, "America/Los_Angeles")
)
);
Assert.assertEquals(
ImmutableList.of(
ImmutableMap.of("__time", "1999-12-31T16:00:00.000-08:00", "t2", "1999-12-31T00:00:00.000-08:00")
),
rows
);
@ -107,7 +139,7 @@ public class SqlResourceTest
public void testFieldAliasingSelect() throws Exception
{
final List<Map<String, Object>> rows = doPost(
new SqlQuery("SELECT dim2 \"x\", dim2 \"y\" FROM druid.foo LIMIT 1")
new SqlQuery("SELECT dim2 \"x\", dim2 \"y\" FROM druid.foo LIMIT 1", null)
);
Assert.assertEquals(
@ -122,7 +154,7 @@ public class SqlResourceTest
public void testFieldAliasingGroupBy() throws Exception
{
final List<Map<String, Object>> rows = doPost(
new SqlQuery("SELECT dim2 \"x\", dim2 \"y\" FROM druid.foo GROUP BY dim2")
new SqlQuery("SELECT dim2 \"x\", dim2 \"y\" FROM druid.foo GROUP BY dim2", null)
);
Assert.assertEquals(
@ -139,7 +171,7 @@ public class SqlResourceTest
public void testExplainCountStar() throws Exception
{
final List<Map<String, Object>> rows = doPost(
new SqlQuery("EXPLAIN PLAN FOR SELECT COUNT(*) AS cnt FROM druid.foo")
new SqlQuery("EXPLAIN PLAN FOR SELECT COUNT(*) AS cnt FROM druid.foo", null)
);
Assert.assertEquals(
@ -160,7 +192,7 @@ public class SqlResourceTest
expectedException.expectMessage("Column 'dim3' not found in any table");
doPost(
new SqlQuery("SELECT dim3 FROM druid.foo")
new SqlQuery("SELECT dim3 FROM druid.foo", null)
);
Assert.fail();
@ -173,7 +205,7 @@ public class SqlResourceTest
expectedException.expectMessage("Cannot build plan for query: SELECT TRIM(dim1) FROM druid.foo");
// TRIM unsupported
doPost(new SqlQuery("SELECT TRIM(dim1) FROM druid.foo"));
doPost(new SqlQuery("SELECT TRIM(dim1) FROM druid.foo", null));
Assert.fail();
}