mirror of https://github.com/apache/druid.git
SQL: Make row extractions extensible and add one for lookups. (#3991)
This is a reopening of #3989, since that PR was merged to master prematurely and accidentally.
This commit is contained in:
parent
bad250fe6d
commit
3216134f8c
|
@ -129,6 +129,16 @@ Druid's SQL language supports a number of time operations, including:
|
|||
By default, time operations use the UTC time zone. You can change the time zone for time operations by setting the
|
||||
connection context parameter "sqlTimeZone" to the name of the time zone, like "America/Los_Angeles".
|
||||
|
||||
### Query-time lookups
|
||||
|
||||
Druid [query-time lookups](lookups.html) can be accessed through the `LOOKUP(expression, lookupName)` function. The
|
||||
"lookupName" must refer to a lookup you have registered with Druid's lookup framework. For example, the following
|
||||
query can be used to perform a groupBy on looked-up values:
|
||||
|
||||
```sql
|
||||
SELECT LOOKUP(col, 'my_lookup') AS col_with_lookup FROM data_source GROUP BY LOOKUP(col, 'my_lookup')
|
||||
```
|
||||
|
||||
### Subqueries
|
||||
|
||||
Druid's SQL layer supports many types of subqueries, including the ones listed below.
|
||||
|
@ -231,7 +241,6 @@ language. Some unsupported SQL features include:
|
|||
Additionally, some Druid features are not supported by the SQL language. Some unsupported Druid features include:
|
||||
|
||||
- [Multi-value dimensions](multi-value-dimensions.html).
|
||||
- [Query-time lookups](lookups.html).
|
||||
- [DataSketches](../development/extensions-core/datasketches-aggregators.html).
|
||||
|
||||
## Third-party SQL libraries
|
||||
|
|
|
@ -33,6 +33,7 @@ import io.druid.sql.calcite.aggregation.Aggregations;
|
|||
import io.druid.sql.calcite.aggregation.SqlAggregator;
|
||||
import io.druid.sql.calcite.expression.Expressions;
|
||||
import io.druid.sql.calcite.expression.RowExtraction;
|
||||
import io.druid.sql.calcite.planner.DruidOperatorTable;
|
||||
import io.druid.sql.calcite.planner.PlannerContext;
|
||||
import io.druid.sql.calcite.table.RowSignature;
|
||||
import org.apache.calcite.rel.core.AggregateCall;
|
||||
|
@ -64,6 +65,7 @@ public class QuantileSqlAggregator implements SqlAggregator
|
|||
public Aggregation toDruidAggregation(
|
||||
final String name,
|
||||
final RowSignature rowSignature,
|
||||
final DruidOperatorTable operatorTable,
|
||||
final PlannerContext plannerContext,
|
||||
final List<Aggregation> existingAggregations,
|
||||
final Project project,
|
||||
|
@ -72,6 +74,7 @@ public class QuantileSqlAggregator implements SqlAggregator
|
|||
)
|
||||
{
|
||||
final RowExtraction rex = Expressions.toRowExtraction(
|
||||
operatorTable,
|
||||
plannerContext,
|
||||
rowSignature.getRowOrder(),
|
||||
Expressions.fromFieldAccess(
|
||||
|
|
|
@ -43,6 +43,7 @@ import io.druid.segment.QueryableIndex;
|
|||
import io.druid.segment.TestHelper;
|
||||
import io.druid.segment.incremental.IncrementalIndexSchema;
|
||||
import io.druid.sql.calcite.aggregation.SqlAggregator;
|
||||
import io.druid.sql.calcite.expression.SqlExtractionOperator;
|
||||
import io.druid.sql.calcite.filtration.Filtration;
|
||||
import io.druid.sql.calcite.planner.Calcites;
|
||||
import io.druid.sql.calcite.planner.DruidOperatorTable;
|
||||
|
@ -130,9 +131,8 @@ public class QuantileSqlAggregatorTest
|
|||
)
|
||||
);
|
||||
final DruidOperatorTable operatorTable = new DruidOperatorTable(
|
||||
ImmutableSet.<SqlAggregator>of(
|
||||
new QuantileSqlAggregator()
|
||||
)
|
||||
ImmutableSet.<SqlAggregator>of(new QuantileSqlAggregator()),
|
||||
ImmutableSet.<SqlExtractionOperator>of()
|
||||
);
|
||||
plannerFactory = new PlannerFactory(rootSchema, walker, operatorTable, plannerConfig);
|
||||
}
|
||||
|
|
|
@ -71,6 +71,11 @@
|
|||
<type>test-jar</type>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.easymock</groupId>
|
||||
<artifactId>easymock</artifactId>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>io.druid</groupId>
|
||||
<artifactId>druid-processing</artifactId>
|
||||
|
|
|
@ -33,6 +33,7 @@ import io.druid.segment.column.ValueType;
|
|||
import io.druid.sql.calcite.expression.Expressions;
|
||||
import io.druid.sql.calcite.expression.RowExtraction;
|
||||
import io.druid.sql.calcite.planner.Calcites;
|
||||
import io.druid.sql.calcite.planner.DruidOperatorTable;
|
||||
import io.druid.sql.calcite.planner.PlannerContext;
|
||||
import io.druid.sql.calcite.table.RowSignature;
|
||||
import org.apache.calcite.rel.core.AggregateCall;
|
||||
|
@ -63,6 +64,7 @@ public class ApproxCountDistinctSqlAggregator implements SqlAggregator
|
|||
public Aggregation toDruidAggregation(
|
||||
final String name,
|
||||
final RowSignature rowSignature,
|
||||
final DruidOperatorTable operatorTable,
|
||||
final PlannerContext plannerContext,
|
||||
final List<Aggregation> existingAggregations,
|
||||
final Project project,
|
||||
|
@ -76,6 +78,7 @@ public class ApproxCountDistinctSqlAggregator implements SqlAggregator
|
|||
Iterables.getOnlyElement(aggregateCall.getArgList())
|
||||
);
|
||||
final RowExtraction rex = Expressions.toRowExtraction(
|
||||
operatorTable,
|
||||
plannerContext,
|
||||
rowSignature.getRowOrder(),
|
||||
rexNode
|
||||
|
|
|
@ -20,6 +20,7 @@
|
|||
package io.druid.sql.calcite.aggregation;
|
||||
|
||||
import io.druid.query.filter.DimFilter;
|
||||
import io.druid.sql.calcite.planner.DruidOperatorTable;
|
||||
import io.druid.sql.calcite.planner.PlannerContext;
|
||||
import io.druid.sql.calcite.table.RowSignature;
|
||||
import org.apache.calcite.rel.core.AggregateCall;
|
||||
|
@ -46,6 +47,7 @@ public interface SqlAggregator
|
|||
*
|
||||
* @param name desired output name of the aggregation
|
||||
* @param rowSignature signature of the rows being aggregated
|
||||
* @param operatorTable Operator table that can be used to convert sub-expressions
|
||||
* @param plannerContext SQL planner context
|
||||
* @param existingAggregations existing aggregations for this query; useful for re-using aggregations. May be safely
|
||||
* ignored if you do not want to re-use existing aggregations.
|
||||
|
@ -59,6 +61,7 @@ public interface SqlAggregator
|
|||
Aggregation toDruidAggregation(
|
||||
final String name,
|
||||
final RowSignature rowSignature,
|
||||
final DruidOperatorTable operatorTable,
|
||||
final PlannerContext plannerContext,
|
||||
final List<Aggregation> existingAggregations,
|
||||
final Project project,
|
||||
|
|
|
@ -1,57 +0,0 @@
|
|||
/*
|
||||
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. Metamarkets licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package io.druid.sql.calcite.expression;
|
||||
|
||||
import org.apache.calcite.sql.SqlKind;
|
||||
|
||||
public abstract class AbstractExpressionConversion implements ExpressionConversion
|
||||
{
|
||||
private final SqlKind kind;
|
||||
private final String operatorName;
|
||||
|
||||
public AbstractExpressionConversion(SqlKind kind)
|
||||
{
|
||||
this(kind, null);
|
||||
}
|
||||
|
||||
public AbstractExpressionConversion(SqlKind kind, String operatorName)
|
||||
{
|
||||
this.kind = kind;
|
||||
this.operatorName = operatorName;
|
||||
|
||||
if (kind == SqlKind.OTHER_FUNCTION && operatorName == null) {
|
||||
throw new NullPointerException("operatorName must be non-null for kind OTHER_FUNCTION");
|
||||
} else if (kind != SqlKind.OTHER_FUNCTION && operatorName != null) {
|
||||
throw new NullPointerException("operatorName must be non-null for kind " + kind);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public SqlKind sqlKind()
|
||||
{
|
||||
return kind;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String operatorName()
|
||||
{
|
||||
return operatorName;
|
||||
}
|
||||
}
|
|
@ -20,37 +20,38 @@
|
|||
package io.druid.sql.calcite.expression;
|
||||
|
||||
import io.druid.query.extraction.StrlenExtractionFn;
|
||||
import io.druid.sql.calcite.planner.DruidOperatorTable;
|
||||
import io.druid.sql.calcite.planner.PlannerContext;
|
||||
import org.apache.calcite.rex.RexCall;
|
||||
import org.apache.calcite.rex.RexNode;
|
||||
import org.apache.calcite.sql.SqlKind;
|
||||
import org.apache.calcite.sql.SqlFunction;
|
||||
import org.apache.calcite.sql.fun.SqlStdOperatorTable;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
public class CharLengthExpressionConversion extends AbstractExpressionConversion
|
||||
public class CharacterLengthExtractionOperator implements SqlExtractionOperator
|
||||
{
|
||||
private static final CharLengthExpressionConversion INSTANCE = new CharLengthExpressionConversion();
|
||||
|
||||
private CharLengthExpressionConversion()
|
||||
@Override
|
||||
public SqlFunction calciteFunction()
|
||||
{
|
||||
super(SqlKind.OTHER_FUNCTION, "CHAR_LENGTH");
|
||||
}
|
||||
|
||||
public static CharLengthExpressionConversion instance()
|
||||
{
|
||||
return INSTANCE;
|
||||
return SqlStdOperatorTable.CHAR_LENGTH;
|
||||
}
|
||||
|
||||
@Override
|
||||
public RowExtraction convert(
|
||||
final ExpressionConverter converter,
|
||||
final DruidOperatorTable operatorTable,
|
||||
final PlannerContext plannerContext,
|
||||
final List<String> rowOrder,
|
||||
final RexNode expression
|
||||
)
|
||||
{
|
||||
final RexCall call = (RexCall) expression;
|
||||
final RowExtraction arg = converter.convert(plannerContext, rowOrder, call.getOperands().get(0));
|
||||
final RowExtraction arg = Expressions.toRowExtraction(
|
||||
operatorTable,
|
||||
plannerContext,
|
||||
rowOrder,
|
||||
call.getOperands().get(0)
|
||||
);
|
||||
if (arg == null) {
|
||||
return null;
|
||||
}
|
|
@ -1,123 +0,0 @@
|
|||
/*
|
||||
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. Metamarkets licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package io.druid.sql.calcite.expression;
|
||||
|
||||
import com.google.common.collect.Maps;
|
||||
import io.druid.java.util.common.ISE;
|
||||
import io.druid.sql.calcite.planner.PlannerContext;
|
||||
import org.apache.calcite.avatica.util.TimeUnitRange;
|
||||
import org.apache.calcite.rex.RexCall;
|
||||
import org.apache.calcite.rex.RexInputRef;
|
||||
import org.apache.calcite.rex.RexNode;
|
||||
import org.apache.calcite.sql.SqlKind;
|
||||
import org.apache.calcite.sql.type.SqlTypeName;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
public class ExpressionConverter
|
||||
{
|
||||
private final Map<SqlKind, ExpressionConversion> kindMap;
|
||||
private final Map<String, ExpressionConversion> otherFunctionMap;
|
||||
|
||||
private ExpressionConverter(
|
||||
Map<SqlKind, ExpressionConversion> kindMap,
|
||||
Map<String, ExpressionConversion> otherFunctionMap
|
||||
)
|
||||
{
|
||||
this.kindMap = kindMap;
|
||||
this.otherFunctionMap = otherFunctionMap;
|
||||
}
|
||||
|
||||
public static ExpressionConverter create(final List<ExpressionConversion> conversions)
|
||||
{
|
||||
final Map<SqlKind, ExpressionConversion> kindMap = Maps.newHashMap();
|
||||
final Map<String, ExpressionConversion> otherFunctionMap = Maps.newHashMap();
|
||||
|
||||
for (final ExpressionConversion conversion : conversions) {
|
||||
if (conversion.sqlKind() != SqlKind.OTHER_FUNCTION) {
|
||||
if (kindMap.put(conversion.sqlKind(), conversion) != null) {
|
||||
throw new ISE("Oops, can't have two conversions for sqlKind[%s]", conversion.sqlKind());
|
||||
}
|
||||
} else {
|
||||
// kind is OTHER_FUNCTION
|
||||
if (otherFunctionMap.put(conversion.operatorName(), conversion) != null) {
|
||||
throw new ISE(
|
||||
"Oops, can't have two conversions for sqlKind[%s], operatorName[%s]",
|
||||
conversion.sqlKind(),
|
||||
conversion.operatorName()
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return new ExpressionConverter(kindMap, otherFunctionMap);
|
||||
}
|
||||
|
||||
/**
|
||||
* Translate a row-expression to a Druid row extraction. Note that this signature will probably need to change
|
||||
* once we support extractions from multiple columns.
|
||||
*
|
||||
* @param plannerContext SQL planner context
|
||||
* @param rowOrder order of fields in the Druid rows to be extracted from
|
||||
* @param expression expression meant to be applied on top of the table
|
||||
*
|
||||
* @return (columnName, extractionFn) or null
|
||||
*/
|
||||
public RowExtraction convert(PlannerContext plannerContext, List<String> rowOrder, RexNode expression)
|
||||
{
|
||||
if (expression.getKind() == SqlKind.INPUT_REF) {
|
||||
final RexInputRef ref = (RexInputRef) expression;
|
||||
final String columnName = rowOrder.get(ref.getIndex());
|
||||
if (columnName == null) {
|
||||
throw new ISE("WTF?! Expression referred to nonexistent index[%d]", ref.getIndex());
|
||||
}
|
||||
|
||||
return RowExtraction.of(columnName, null);
|
||||
} else if (expression.getKind() == SqlKind.CAST) {
|
||||
final RexNode operand = ((RexCall) expression).getOperands().get(0);
|
||||
if (expression.getType().getSqlTypeName() == SqlTypeName.DATE
|
||||
&& operand.getType().getSqlTypeName() == SqlTypeName.TIMESTAMP) {
|
||||
// Handling casting TIMESTAMP to DATE by flooring to DAY.
|
||||
return FloorExpressionConversion.applyTimestampFloor(
|
||||
convert(plannerContext, rowOrder, operand),
|
||||
TimeUnits.toQueryGranularity(TimeUnitRange.DAY, plannerContext.getTimeZone())
|
||||
);
|
||||
} else {
|
||||
// Ignore other casts.
|
||||
// TODO(gianm): Probably not a good idea to ignore other CASTs like this.
|
||||
return convert(plannerContext, rowOrder, ((RexCall) expression).getOperands().get(0));
|
||||
}
|
||||
} else {
|
||||
// Try conversion using an ExpressionConversion specific to this operator.
|
||||
final RowExtraction retVal;
|
||||
|
||||
if (expression.getKind() == SqlKind.OTHER_FUNCTION) {
|
||||
final ExpressionConversion conversion = otherFunctionMap.get(((RexCall) expression).getOperator().getName());
|
||||
retVal = conversion != null ? conversion.convert(this, plannerContext, rowOrder, expression) : null;
|
||||
} else {
|
||||
final ExpressionConversion conversion = kindMap.get(expression.getKind());
|
||||
retVal = conversion != null ? conversion.convert(this, plannerContext, rowOrder, expression) : null;
|
||||
}
|
||||
|
||||
return retVal;
|
||||
}
|
||||
}
|
||||
}
|
|
@ -21,7 +21,6 @@ package io.druid.sql.calcite.expression;
|
|||
|
||||
import com.google.common.base.Joiner;
|
||||
import com.google.common.base.Preconditions;
|
||||
import com.google.common.collect.ImmutableList;
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import com.google.common.collect.Iterables;
|
||||
import com.google.common.collect.Lists;
|
||||
|
@ -51,8 +50,10 @@ import io.druid.sql.calcite.filtration.BoundRefKey;
|
|||
import io.druid.sql.calcite.filtration.Bounds;
|
||||
import io.druid.sql.calcite.filtration.Filtration;
|
||||
import io.druid.sql.calcite.planner.Calcites;
|
||||
import io.druid.sql.calcite.planner.DruidOperatorTable;
|
||||
import io.druid.sql.calcite.planner.PlannerContext;
|
||||
import io.druid.sql.calcite.table.RowSignature;
|
||||
import org.apache.calcite.avatica.util.TimeUnitRange;
|
||||
import org.apache.calcite.jdbc.JavaTypeFactoryImpl;
|
||||
import org.apache.calcite.rel.core.Project;
|
||||
import org.apache.calcite.rex.RexCall;
|
||||
|
@ -74,15 +75,6 @@ import java.util.Map;
|
|||
*/
|
||||
public class Expressions
|
||||
{
|
||||
private static final ExpressionConverter EXPRESSION_CONVERTER = ExpressionConverter.create(
|
||||
ImmutableList.<ExpressionConversion>of(
|
||||
CharLengthExpressionConversion.instance(),
|
||||
ExtractExpressionConversion.instance(),
|
||||
FloorExpressionConversion.instance(),
|
||||
SubstringExpressionConversion.instance()
|
||||
)
|
||||
);
|
||||
|
||||
private static final Map<String, String> MATH_FUNCTIONS = ImmutableMap.<String, String>builder()
|
||||
.put("ABS", "abs")
|
||||
.put("CEIL", "ceil")
|
||||
|
@ -153,17 +145,58 @@ public class Expressions
|
|||
* @return RowExtraction or null if not possible
|
||||
*/
|
||||
public static RowExtraction toRowExtraction(
|
||||
final DruidOperatorTable operatorTable,
|
||||
final PlannerContext plannerContext,
|
||||
final List<String> rowOrder,
|
||||
final RexNode expression
|
||||
)
|
||||
{
|
||||
return EXPRESSION_CONVERTER.convert(plannerContext, rowOrder, expression);
|
||||
if (expression.getKind() == SqlKind.INPUT_REF) {
|
||||
final RexInputRef ref = (RexInputRef) expression;
|
||||
final String columnName = rowOrder.get(ref.getIndex());
|
||||
if (columnName == null) {
|
||||
throw new ISE("WTF?! Expression referred to nonexistent index[%d]", ref.getIndex());
|
||||
}
|
||||
|
||||
return RowExtraction.of(columnName, null);
|
||||
} else if (expression.getKind() == SqlKind.CAST) {
|
||||
final RexNode operand = ((RexCall) expression).getOperands().get(0);
|
||||
if (expression.getType().getSqlTypeName() == SqlTypeName.DATE
|
||||
&& operand.getType().getSqlTypeName() == SqlTypeName.TIMESTAMP) {
|
||||
// Handling casting TIMESTAMP to DATE by flooring to DAY.
|
||||
return FloorExtractionOperator.applyTimestampFloor(
|
||||
toRowExtraction(operatorTable, plannerContext, rowOrder, operand),
|
||||
TimeUnits.toQueryGranularity(TimeUnitRange.DAY, plannerContext.getTimeZone())
|
||||
);
|
||||
} else {
|
||||
// Ignore other casts.
|
||||
// TODO(gianm): Probably not a good idea to ignore other CASTs like this.
|
||||
return toRowExtraction(operatorTable, plannerContext, rowOrder, ((RexCall) expression).getOperands().get(0));
|
||||
}
|
||||
} else {
|
||||
// Try conversion using a SqlExtractionOperator.
|
||||
final RowExtraction retVal;
|
||||
|
||||
if (expression instanceof RexCall) {
|
||||
final SqlExtractionOperator extractionOperator = operatorTable.lookupExtractionOperator(
|
||||
expression.getKind(),
|
||||
((RexCall) expression).getOperator().getName()
|
||||
);
|
||||
|
||||
retVal = extractionOperator != null
|
||||
? extractionOperator.convert(operatorTable, plannerContext, rowOrder, expression)
|
||||
: null;
|
||||
} else {
|
||||
retVal = null;
|
||||
}
|
||||
|
||||
return retVal;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Translate a Calcite row-expression to a Druid PostAggregator. One day, when possible, this could be folded
|
||||
* into {@link #toRowExtraction(PlannerContext, List, RexNode)}.
|
||||
* into {@link #toRowExtraction(DruidOperatorTable, PlannerContext, List, RexNode)} .
|
||||
*
|
||||
* @param name name of the PostAggregator
|
||||
* @param rowOrder order of fields in the Druid rows to be extracted from
|
||||
|
@ -241,7 +274,7 @@ public class Expressions
|
|||
|
||||
/**
|
||||
* Translate a row-expression to a Druid math expression. One day, when possible, this could be folded into
|
||||
* {@link #toRowExtraction(PlannerContext, List, RexNode)}.
|
||||
* {@link #toRowExtraction(DruidOperatorTable, PlannerContext, List, RexNode)}.
|
||||
*
|
||||
* @param rowOrder order of fields in the Druid rows to be extracted from
|
||||
* @param expression expression meant to be applied on top of the rows
|
||||
|
@ -367,6 +400,7 @@ public class Expressions
|
|||
* @param expression Calcite row expression
|
||||
*/
|
||||
public static DimFilter toFilter(
|
||||
final DruidOperatorTable operatorTable,
|
||||
final PlannerContext plannerContext,
|
||||
final RowSignature rowSignature,
|
||||
final RexNode expression
|
||||
|
@ -377,7 +411,7 @@ public class Expressions
|
|||
|| expression.getKind() == SqlKind.NOT) {
|
||||
final List<DimFilter> filters = Lists.newArrayList();
|
||||
for (final RexNode rexNode : ((RexCall) expression).getOperands()) {
|
||||
final DimFilter nextFilter = toFilter(plannerContext, rowSignature, rexNode);
|
||||
final DimFilter nextFilter = toFilter(operatorTable, plannerContext, rowSignature, rexNode);
|
||||
if (nextFilter == null) {
|
||||
return null;
|
||||
}
|
||||
|
@ -394,7 +428,7 @@ public class Expressions
|
|||
}
|
||||
} else {
|
||||
// Handle filter conditions on everything else.
|
||||
return toLeafFilter(plannerContext, rowSignature, expression);
|
||||
return toLeafFilter(operatorTable, plannerContext, rowSignature, expression);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -407,6 +441,7 @@ public class Expressions
|
|||
* @param expression Calcite row expression
|
||||
*/
|
||||
private static DimFilter toLeafFilter(
|
||||
final DruidOperatorTable operatorTable,
|
||||
final PlannerContext plannerContext,
|
||||
final RowSignature rowSignature,
|
||||
final RexNode expression
|
||||
|
@ -422,7 +457,8 @@ public class Expressions
|
|||
|
||||
if (kind == SqlKind.LIKE) {
|
||||
final List<RexNode> operands = ((RexCall) expression).getOperands();
|
||||
final RowExtraction rex = EXPRESSION_CONVERTER.convert(
|
||||
final RowExtraction rex = toRowExtraction(
|
||||
operatorTable,
|
||||
plannerContext,
|
||||
rowSignature.getRowOrder(),
|
||||
operands.get(0)
|
||||
|
@ -462,7 +498,7 @@ public class Expressions
|
|||
}
|
||||
|
||||
// lhs must be translatable to a RowExtraction to be filterable
|
||||
final RowExtraction rex = EXPRESSION_CONVERTER.convert(plannerContext, rowSignature.getRowOrder(), lhs);
|
||||
final RowExtraction rex = toRowExtraction(operatorTable, plannerContext, rowSignature.getRowOrder(), lhs);
|
||||
if (rex == null || !rex.isFilterable(rowSignature)) {
|
||||
return null;
|
||||
}
|
||||
|
|
|
@ -22,32 +22,28 @@ package io.druid.sql.calcite.expression;
|
|||
import io.druid.java.util.common.granularity.Granularity;
|
||||
import io.druid.query.extraction.ExtractionFn;
|
||||
import io.druid.query.extraction.TimeFormatExtractionFn;
|
||||
import io.druid.sql.calcite.planner.DruidOperatorTable;
|
||||
import io.druid.sql.calcite.planner.PlannerContext;
|
||||
import org.apache.calcite.avatica.util.TimeUnitRange;
|
||||
import org.apache.calcite.rex.RexCall;
|
||||
import org.apache.calcite.rex.RexLiteral;
|
||||
import org.apache.calcite.rex.RexNode;
|
||||
import org.apache.calcite.sql.SqlKind;
|
||||
import org.apache.calcite.sql.SqlFunction;
|
||||
import org.apache.calcite.sql.fun.SqlStdOperatorTable;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
public class ExtractExpressionConversion extends AbstractExpressionConversion
|
||||
public class ExtractExtractionOperator implements SqlExtractionOperator
|
||||
{
|
||||
private static final ExtractExpressionConversion INSTANCE = new ExtractExpressionConversion();
|
||||
|
||||
private ExtractExpressionConversion()
|
||||
@Override
|
||||
public SqlFunction calciteFunction()
|
||||
{
|
||||
super(SqlKind.EXTRACT);
|
||||
}
|
||||
|
||||
public static ExtractExpressionConversion instance()
|
||||
{
|
||||
return INSTANCE;
|
||||
return SqlStdOperatorTable.EXTRACT;
|
||||
}
|
||||
|
||||
@Override
|
||||
public RowExtraction convert(
|
||||
final ExpressionConverter converter,
|
||||
final DruidOperatorTable operatorTable,
|
||||
final PlannerContext plannerContext,
|
||||
final List<String> rowOrder,
|
||||
final RexNode expression
|
||||
|
@ -59,7 +55,7 @@ public class ExtractExpressionConversion extends AbstractExpressionConversion
|
|||
final TimeUnitRange timeUnit = (TimeUnitRange) flag.getValue();
|
||||
final RexNode expr = call.getOperands().get(1);
|
||||
|
||||
final RowExtraction rex = converter.convert(plannerContext, rowOrder, expr);
|
||||
final RowExtraction rex = Expressions.toRowExtraction(operatorTable, plannerContext, rowOrder, expr);
|
||||
if (rex == null) {
|
||||
return null;
|
||||
}
|
|
@ -21,29 +21,19 @@ package io.druid.sql.calcite.expression;
|
|||
|
||||
import io.druid.java.util.common.granularity.Granularity;
|
||||
import io.druid.query.extraction.BucketExtractionFn;
|
||||
import io.druid.sql.calcite.planner.DruidOperatorTable;
|
||||
import io.druid.sql.calcite.planner.PlannerContext;
|
||||
import org.apache.calcite.avatica.util.TimeUnitRange;
|
||||
import org.apache.calcite.rex.RexCall;
|
||||
import org.apache.calcite.rex.RexLiteral;
|
||||
import org.apache.calcite.rex.RexNode;
|
||||
import org.apache.calcite.sql.SqlKind;
|
||||
import org.apache.calcite.sql.SqlFunction;
|
||||
import org.apache.calcite.sql.fun.SqlStdOperatorTable;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
public class FloorExpressionConversion extends AbstractExpressionConversion
|
||||
public class FloorExtractionOperator implements SqlExtractionOperator
|
||||
{
|
||||
private static final FloorExpressionConversion INSTANCE = new FloorExpressionConversion();
|
||||
|
||||
private FloorExpressionConversion()
|
||||
{
|
||||
super(SqlKind.FLOOR);
|
||||
}
|
||||
|
||||
public static FloorExpressionConversion instance()
|
||||
{
|
||||
return INSTANCE;
|
||||
}
|
||||
|
||||
public static RowExtraction applyTimestampFloor(
|
||||
final RowExtraction rex,
|
||||
final Granularity queryGranularity
|
||||
|
@ -62,9 +52,15 @@ public class FloorExpressionConversion extends AbstractExpressionConversion
|
|||
);
|
||||
}
|
||||
|
||||
@Override
|
||||
public SqlFunction calciteFunction()
|
||||
{
|
||||
return SqlStdOperatorTable.FLOOR;
|
||||
}
|
||||
|
||||
@Override
|
||||
public RowExtraction convert(
|
||||
final ExpressionConverter converter,
|
||||
final DruidOperatorTable operatorTable,
|
||||
final PlannerContext plannerContext,
|
||||
final List<String> rowOrder,
|
||||
final RexNode expression
|
||||
|
@ -73,7 +69,7 @@ public class FloorExpressionConversion extends AbstractExpressionConversion
|
|||
final RexCall call = (RexCall) expression;
|
||||
final RexNode arg = call.getOperands().get(0);
|
||||
|
||||
final RowExtraction rex = converter.convert(plannerContext, rowOrder, arg);
|
||||
final RowExtraction rex = Expressions.toRowExtraction(operatorTable, plannerContext, rowOrder, arg);
|
||||
if (rex == null) {
|
||||
return null;
|
||||
} else if (call.getOperands().size() == 1) {
|
|
@ -0,0 +1,113 @@
|
|||
/*
|
||||
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. Metamarkets licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package io.druid.sql.calcite.expression;
|
||||
|
||||
import com.google.inject.Inject;
|
||||
import io.druid.query.lookup.LookupReferencesManager;
|
||||
import io.druid.query.lookup.RegisteredLookupExtractionFn;
|
||||
import io.druid.sql.calcite.planner.DruidOperatorTable;
|
||||
import io.druid.sql.calcite.planner.PlannerContext;
|
||||
import org.apache.calcite.rex.RexCall;
|
||||
import org.apache.calcite.rex.RexLiteral;
|
||||
import org.apache.calcite.rex.RexNode;
|
||||
import org.apache.calcite.sql.SqlFunction;
|
||||
import org.apache.calcite.sql.SqlFunctionCategory;
|
||||
import org.apache.calcite.sql.SqlKind;
|
||||
import org.apache.calcite.sql.type.OperandTypes;
|
||||
import org.apache.calcite.sql.type.ReturnTypes;
|
||||
import org.apache.calcite.sql.type.SqlTypeFamily;
|
||||
import org.apache.calcite.sql.type.SqlTypeName;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
public class LookupExtractionOperator implements SqlExtractionOperator
|
||||
{
|
||||
private static final String NAME = "LOOKUP";
|
||||
private static final SqlFunction SQL_FUNCTION = new LookupSqlFunction();
|
||||
|
||||
private final LookupReferencesManager lookupReferencesManager;
|
||||
|
||||
@Inject
|
||||
public LookupExtractionOperator(final LookupReferencesManager lookupReferencesManager)
|
||||
{
|
||||
this.lookupReferencesManager = lookupReferencesManager;
|
||||
}
|
||||
|
||||
@Override
|
||||
public SqlFunction calciteFunction()
|
||||
{
|
||||
return SQL_FUNCTION;
|
||||
}
|
||||
|
||||
@Override
|
||||
public RowExtraction convert(
|
||||
final DruidOperatorTable operatorTable,
|
||||
final PlannerContext plannerContext,
|
||||
final List<String> rowOrder,
|
||||
final RexNode expression
|
||||
)
|
||||
{
|
||||
final RexCall call = (RexCall) expression;
|
||||
final RowExtraction rex = Expressions.toRowExtraction(
|
||||
operatorTable,
|
||||
plannerContext,
|
||||
rowOrder,
|
||||
call.getOperands().get(0)
|
||||
);
|
||||
if (rex == null) {
|
||||
return null;
|
||||
}
|
||||
|
||||
final String lookupName = RexLiteral.stringValue(call.getOperands().get(1));
|
||||
final RegisteredLookupExtractionFn extractionFn = new RegisteredLookupExtractionFn(
|
||||
lookupReferencesManager,
|
||||
lookupName,
|
||||
false,
|
||||
null,
|
||||
false,
|
||||
true
|
||||
);
|
||||
|
||||
return RowExtraction.of(
|
||||
rex.getColumn(),
|
||||
ExtractionFns.compose(extractionFn, rex.getExtractionFn())
|
||||
);
|
||||
}
|
||||
|
||||
private static class LookupSqlFunction extends SqlFunction
|
||||
{
|
||||
private static final String SIGNATURE = "'" + NAME + "(expression, lookupName)'\n";
|
||||
|
||||
LookupSqlFunction()
|
||||
{
|
||||
super(
|
||||
NAME,
|
||||
SqlKind.OTHER_FUNCTION,
|
||||
ReturnTypes.explicit(SqlTypeName.VARCHAR),
|
||||
null,
|
||||
OperandTypes.and(
|
||||
OperandTypes.sequence(SIGNATURE, OperandTypes.CHARACTER, OperandTypes.LITERAL),
|
||||
OperandTypes.family(SqlTypeFamily.CHARACTER, SqlTypeFamily.CHARACTER)
|
||||
),
|
||||
SqlFunctionCategory.STRING
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
|
@ -19,41 +19,36 @@
|
|||
|
||||
package io.druid.sql.calcite.expression;
|
||||
|
||||
import io.druid.sql.calcite.planner.DruidOperatorTable;
|
||||
import io.druid.sql.calcite.planner.PlannerContext;
|
||||
import org.apache.calcite.rex.RexNode;
|
||||
import org.apache.calcite.sql.SqlKind;
|
||||
import org.apache.calcite.sql.SqlFunction;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
public interface ExpressionConversion
|
||||
public interface SqlExtractionOperator
|
||||
{
|
||||
/**
|
||||
* SQL kind that this converter knows how to convert.
|
||||
* Returns the SQL operator corresponding to this aggregation function. Should be a singleton.
|
||||
*
|
||||
* @return sql kind
|
||||
* @return operator
|
||||
*/
|
||||
SqlKind sqlKind();
|
||||
SqlFunction calciteFunction();
|
||||
|
||||
/**
|
||||
* Operator name, if {@link #sqlKind()} is {@code OTHER_FUNCTION}.
|
||||
* Returns the Druid {@link RowExtraction} corresponding to a SQL {@code RexNode}.
|
||||
*
|
||||
* @return operator name, or null
|
||||
*/
|
||||
String operatorName();
|
||||
|
||||
/**
|
||||
* Translate a row-expression to a Druid column reference. Note that this signature will probably need to change
|
||||
* once we support extractions from multiple columns.
|
||||
*
|
||||
* @param converter converter that can be used to convert sub-expressions
|
||||
* @param operatorTable Operator table that can be used to convert sub-expressions
|
||||
* @param plannerContext SQL planner context
|
||||
* @param rowOrder order of fields in the Druid rows to be extracted from
|
||||
* @param expression expression meant to be applied on top of the table
|
||||
*
|
||||
* @return (columnName, extractionFn) or null
|
||||
*
|
||||
* @see ExpressionConversion#convert(ExpressionConverter, PlannerContext, List, RexNode)
|
||||
*/
|
||||
RowExtraction convert(
|
||||
ExpressionConverter converter,
|
||||
DruidOperatorTable operatorTable,
|
||||
PlannerContext plannerContext,
|
||||
List<String> rowOrder,
|
||||
RexNode expression
|
|
@ -20,38 +20,39 @@
|
|||
package io.druid.sql.calcite.expression;
|
||||
|
||||
import io.druid.query.extraction.SubstringDimExtractionFn;
|
||||
import io.druid.sql.calcite.planner.DruidOperatorTable;
|
||||
import io.druid.sql.calcite.planner.PlannerContext;
|
||||
import org.apache.calcite.rex.RexCall;
|
||||
import org.apache.calcite.rex.RexLiteral;
|
||||
import org.apache.calcite.rex.RexNode;
|
||||
import org.apache.calcite.sql.SqlKind;
|
||||
import org.apache.calcite.sql.SqlFunction;
|
||||
import org.apache.calcite.sql.fun.SqlStdOperatorTable;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
public class SubstringExpressionConversion extends AbstractExpressionConversion
|
||||
public class SubstringExtractionOperator implements SqlExtractionOperator
|
||||
{
|
||||
private static final SubstringExpressionConversion INSTANCE = new SubstringExpressionConversion();
|
||||
|
||||
private SubstringExpressionConversion()
|
||||
@Override
|
||||
public SqlFunction calciteFunction()
|
||||
{
|
||||
super(SqlKind.OTHER_FUNCTION, "SUBSTRING");
|
||||
}
|
||||
|
||||
public static SubstringExpressionConversion instance()
|
||||
{
|
||||
return INSTANCE;
|
||||
return SqlStdOperatorTable.SUBSTRING;
|
||||
}
|
||||
|
||||
@Override
|
||||
public RowExtraction convert(
|
||||
final ExpressionConverter converter,
|
||||
final DruidOperatorTable operatorTable,
|
||||
final PlannerContext plannerContext,
|
||||
final List<String> rowOrder,
|
||||
final RexNode expression
|
||||
)
|
||||
{
|
||||
final RexCall call = (RexCall) expression;
|
||||
final RowExtraction arg = converter.convert(plannerContext, rowOrder, call.getOperands().get(0));
|
||||
final RowExtraction arg = Expressions.toRowExtraction(
|
||||
operatorTable,
|
||||
plannerContext,
|
||||
rowOrder,
|
||||
call.getOperands().get(0)
|
||||
);
|
||||
if (arg == null) {
|
||||
return null;
|
||||
}
|
|
@ -68,7 +68,7 @@ public class DruidConvertletTable implements SqlRexConvertletTable
|
|||
{
|
||||
if (call.getKind() == SqlKind.EXTRACT && call.getOperandList().get(1).getKind() != SqlKind.LITERAL) {
|
||||
// Avoid using the standard convertlet for EXTRACT(TIMEUNIT FROM col), since we want to handle it directly
|
||||
// in ExtractExpressionConversion.
|
||||
// in ExtractExtractionOperator.
|
||||
return BYPASS_CONVERTLET;
|
||||
} else {
|
||||
final SqlRexConvertlet convertlet = table.get(call.getOperator());
|
||||
|
|
|
@ -23,8 +23,10 @@ import com.google.common.collect.Maps;
|
|||
import com.google.inject.Inject;
|
||||
import io.druid.java.util.common.ISE;
|
||||
import io.druid.sql.calcite.aggregation.SqlAggregator;
|
||||
import io.druid.sql.calcite.expression.SqlExtractionOperator;
|
||||
import org.apache.calcite.sql.SqlFunctionCategory;
|
||||
import org.apache.calcite.sql.SqlIdentifier;
|
||||
import org.apache.calcite.sql.SqlKind;
|
||||
import org.apache.calcite.sql.SqlOperator;
|
||||
import org.apache.calcite.sql.SqlOperatorTable;
|
||||
import org.apache.calcite.sql.SqlSyntax;
|
||||
|
@ -40,17 +42,28 @@ public class DruidOperatorTable implements SqlOperatorTable
|
|||
private static final SqlStdOperatorTable STANDARD_TABLE = SqlStdOperatorTable.instance();
|
||||
|
||||
private final Map<String, SqlAggregator> aggregators;
|
||||
private final Map<String, SqlExtractionOperator> extractionOperators;
|
||||
|
||||
@Inject
|
||||
public DruidOperatorTable(
|
||||
final Set<SqlAggregator> aggregators
|
||||
final Set<SqlAggregator> aggregators,
|
||||
final Set<SqlExtractionOperator> extractionOperators
|
||||
)
|
||||
{
|
||||
this.aggregators = Maps.newHashMap();
|
||||
this.extractionOperators = Maps.newHashMap();
|
||||
|
||||
for (SqlAggregator aggregator : aggregators) {
|
||||
final String lcname = aggregator.calciteFunction().getName().toLowerCase();
|
||||
if (this.aggregators.put(lcname, aggregator) != null) {
|
||||
throw new ISE("Cannot have two aggregators with name[%s]", lcname);
|
||||
throw new ISE("Cannot have two operators with name[%s]", lcname);
|
||||
}
|
||||
}
|
||||
|
||||
for (SqlExtractionOperator extractionFunction : extractionOperators) {
|
||||
final String lcname = extractionFunction.calciteFunction().getName().toLowerCase();
|
||||
if (this.aggregators.containsKey(lcname) || this.extractionOperators.put(lcname, extractionFunction) != null) {
|
||||
throw new ISE("Cannot have two operators with name[%s]", lcname);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -60,6 +73,16 @@ public class DruidOperatorTable implements SqlOperatorTable
|
|||
return aggregators.get(opName.toLowerCase());
|
||||
}
|
||||
|
||||
public SqlExtractionOperator lookupExtractionOperator(final SqlKind kind, final String opName)
|
||||
{
|
||||
final SqlExtractionOperator extractionOperator = extractionOperators.get(opName.toLowerCase());
|
||||
if (extractionOperator != null && extractionOperator.calciteFunction().getKind() == kind) {
|
||||
return extractionOperator;
|
||||
} else {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void lookupOperatorOverloads(
|
||||
final SqlIdentifier opName,
|
||||
|
@ -68,12 +91,18 @@ public class DruidOperatorTable implements SqlOperatorTable
|
|||
final List<SqlOperator> operatorList
|
||||
)
|
||||
{
|
||||
if (opName.names.size() == 1) {
|
||||
if (opName.names.size() == 1 && syntax == SqlSyntax.FUNCTION) {
|
||||
final SqlAggregator aggregator = aggregators.get(opName.getSimple().toLowerCase());
|
||||
if (aggregator != null && syntax == SqlSyntax.FUNCTION) {
|
||||
if (aggregator != null) {
|
||||
operatorList.add(aggregator.calciteFunction());
|
||||
}
|
||||
|
||||
final SqlExtractionOperator extractionFunction = extractionOperators.get(opName.getSimple().toLowerCase());
|
||||
if (extractionFunction != null) {
|
||||
operatorList.add(extractionFunction.calciteFunction());
|
||||
}
|
||||
}
|
||||
|
||||
STANDARD_TABLE.lookupOperatorOverloads(opName, category, syntax, operatorList);
|
||||
}
|
||||
|
||||
|
@ -84,6 +113,9 @@ public class DruidOperatorTable implements SqlOperatorTable
|
|||
for (SqlAggregator aggregator : aggregators.values()) {
|
||||
retVal.add(aggregator.calciteFunction());
|
||||
}
|
||||
for (SqlExtractionOperator extractionFunction : extractionOperators.values()) {
|
||||
retVal.add(extractionFunction.calciteFunction());
|
||||
}
|
||||
retVal.addAll(STANDARD_TABLE.getOperatorList());
|
||||
return retVal;
|
||||
}
|
||||
|
|
|
@ -215,13 +215,13 @@ public class Rules
|
|||
|
||||
// Druid-specific rules.
|
||||
rules.add(new DruidTableScanRule(queryMaker));
|
||||
rules.add(DruidFilterRule.instance());
|
||||
rules.add(new DruidFilterRule(operatorTable));
|
||||
|
||||
if (plannerConfig.getMaxSemiJoinRowsInMemory() > 0) {
|
||||
rules.add(DruidSemiJoinRule.instance());
|
||||
}
|
||||
|
||||
rules.addAll(SelectRules.rules());
|
||||
rules.addAll(SelectRules.rules(operatorTable));
|
||||
rules.addAll(GroupByRules.rules(operatorTable));
|
||||
|
||||
return rules.build();
|
||||
|
|
|
@ -21,6 +21,7 @@ package io.druid.sql.calcite.rule;
|
|||
|
||||
import io.druid.query.filter.DimFilter;
|
||||
import io.druid.sql.calcite.expression.Expressions;
|
||||
import io.druid.sql.calcite.planner.DruidOperatorTable;
|
||||
import io.druid.sql.calcite.rel.DruidRel;
|
||||
import org.apache.calcite.plan.RelOptRule;
|
||||
import org.apache.calcite.plan.RelOptRuleCall;
|
||||
|
@ -28,16 +29,12 @@ import org.apache.calcite.rel.core.Filter;
|
|||
|
||||
public class DruidFilterRule extends RelOptRule
|
||||
{
|
||||
private static final DruidFilterRule INSTANCE = new DruidFilterRule();
|
||||
private final DruidOperatorTable operatorTable;
|
||||
|
||||
private DruidFilterRule()
|
||||
public DruidFilterRule(final DruidOperatorTable operatorTable)
|
||||
{
|
||||
super(operand(Filter.class, operand(DruidRel.class, none())));
|
||||
}
|
||||
|
||||
public static DruidFilterRule instance()
|
||||
{
|
||||
return INSTANCE;
|
||||
this.operatorTable = operatorTable;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -53,6 +50,7 @@ public class DruidFilterRule extends RelOptRule
|
|||
}
|
||||
|
||||
final DimFilter dimFilter = Expressions.toFilter(
|
||||
operatorTable,
|
||||
druidRel.getPlannerContext(),
|
||||
druidRel.getSourceRowSignature(),
|
||||
filter.getCondition()
|
||||
|
|
|
@ -94,7 +94,7 @@ public class GroupByRules
|
|||
new DruidAggregateProjectRule(operatorTable),
|
||||
new DruidAggregateProjectFilterRule(operatorTable),
|
||||
new DruidGroupByPostAggregationRule(),
|
||||
new DruidGroupByHavingRule(),
|
||||
new DruidGroupByHavingRule(operatorTable),
|
||||
new DruidGroupByLimitRule()
|
||||
);
|
||||
}
|
||||
|
@ -116,12 +116,13 @@ public class GroupByRules
|
|||
}
|
||||
|
||||
public static FieldOrExpression fromRexNode(
|
||||
final DruidOperatorTable operatorTable,
|
||||
final PlannerContext plannerContext,
|
||||
final List<String> rowOrder,
|
||||
final RexNode rexNode
|
||||
)
|
||||
{
|
||||
final RowExtraction rex = Expressions.toRowExtraction(plannerContext, rowOrder, rexNode);
|
||||
final RowExtraction rex = Expressions.toRowExtraction(operatorTable, plannerContext, rowOrder, rexNode);
|
||||
if (rex != null && rex.getExtractionFn() == null) {
|
||||
// This was a simple field access.
|
||||
return fieldName(rex.getColumn());
|
||||
|
@ -302,9 +303,12 @@ public class GroupByRules
|
|||
|
||||
public static class DruidGroupByHavingRule extends RelOptRule
|
||||
{
|
||||
private DruidGroupByHavingRule()
|
||||
private final DruidOperatorTable operatorTable;
|
||||
|
||||
private DruidGroupByHavingRule(final DruidOperatorTable operatorTable)
|
||||
{
|
||||
super(operand(Filter.class, operand(DruidRel.class, none())));
|
||||
this.operatorTable = operatorTable;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -319,7 +323,7 @@ public class GroupByRules
|
|||
{
|
||||
final Filter postFilter = call.rel(0);
|
||||
final DruidRel druidRel = call.rel(1);
|
||||
final DruidRel newDruidRel = GroupByRules.applyHaving(druidRel, postFilter);
|
||||
final DruidRel newDruidRel = GroupByRules.applyHaving(operatorTable, druidRel, postFilter);
|
||||
if (newDruidRel != null) {
|
||||
call.transformTo(newDruidRel);
|
||||
}
|
||||
|
@ -395,7 +399,12 @@ public class GroupByRules
|
|||
// Filter that should be applied before aggregating.
|
||||
final DimFilter filter;
|
||||
if (filter0 != null) {
|
||||
filter = Expressions.toFilter(druidRel.getPlannerContext(), sourceRowSignature, filter0.getCondition());
|
||||
filter = Expressions.toFilter(
|
||||
operatorTable,
|
||||
druidRel.getPlannerContext(),
|
||||
sourceRowSignature,
|
||||
filter0.getCondition()
|
||||
);
|
||||
if (filter == null) {
|
||||
// Can't plan this filter.
|
||||
return null;
|
||||
|
@ -435,6 +444,7 @@ public class GroupByRules
|
|||
} else {
|
||||
final RexNode rexNode = Expressions.fromFieldAccess(sourceRowSignature, project, i);
|
||||
final RowExtraction rex = Expressions.toRowExtraction(
|
||||
operatorTable,
|
||||
druidRel.getPlannerContext(),
|
||||
sourceRowSignature.getRowOrder(),
|
||||
rexNode
|
||||
|
@ -590,11 +600,16 @@ public class GroupByRules
|
|||
*
|
||||
* @return new rel, or null if the filter cannot be applied
|
||||
*/
|
||||
private static DruidRel applyHaving(final DruidRel druidRel, final Filter postFilter)
|
||||
private static DruidRel applyHaving(
|
||||
final DruidOperatorTable operatorTable,
|
||||
final DruidRel druidRel,
|
||||
final Filter postFilter
|
||||
)
|
||||
{
|
||||
Preconditions.checkState(canApplyHaving(druidRel), "Cannot applyHaving.");
|
||||
|
||||
final DimFilter dimFilter = Expressions.toFilter(
|
||||
operatorTable,
|
||||
druidRel.getPlannerContext(),
|
||||
druidRel.getOutputRowSignature(),
|
||||
postFilter.getCondition()
|
||||
|
@ -751,7 +766,7 @@ public class GroupByRules
|
|||
}
|
||||
|
||||
final RexNode expression = project.getChildExps().get(call.filterArg);
|
||||
final DimFilter filter = Expressions.toFilter(plannerContext, sourceRowSignature, expression);
|
||||
final DimFilter filter = Expressions.toFilter(operatorTable, plannerContext, sourceRowSignature, expression);
|
||||
if (filter == null) {
|
||||
return null;
|
||||
}
|
||||
|
@ -767,6 +782,7 @@ public class GroupByRules
|
|||
return approximateCountDistinct ? APPROX_COUNT_DISTINCT.toDruidAggregation(
|
||||
name,
|
||||
sourceRowSignature,
|
||||
operatorTable,
|
||||
plannerContext,
|
||||
existingAggregations,
|
||||
project,
|
||||
|
@ -785,7 +801,7 @@ public class GroupByRules
|
|||
|
||||
final int inputField = Iterables.getOnlyElement(call.getArgList());
|
||||
final RexNode rexNode = Expressions.fromFieldAccess(sourceRowSignature, project, inputField);
|
||||
final FieldOrExpression foe = FieldOrExpression.fromRexNode(plannerContext, rowOrder, rexNode);
|
||||
final FieldOrExpression foe = FieldOrExpression.fromRexNode(operatorTable, plannerContext, rowOrder, rexNode);
|
||||
|
||||
if (foe != null) {
|
||||
input = foe;
|
||||
|
@ -804,6 +820,7 @@ public class GroupByRules
|
|||
|
||||
// Operand 1: Filter
|
||||
final DimFilter filter = Expressions.toFilter(
|
||||
operatorTable,
|
||||
plannerContext,
|
||||
sourceRowSignature,
|
||||
caseCall.getOperands().get(0)
|
||||
|
@ -831,7 +848,7 @@ public class GroupByRules
|
|||
input = null;
|
||||
} else if (RexLiteral.isNullLiteral(arg2)) {
|
||||
// Maybe case A
|
||||
input = FieldOrExpression.fromRexNode(plannerContext, rowOrder, arg1);
|
||||
input = FieldOrExpression.fromRexNode(operatorTable, plannerContext, rowOrder, arg1);
|
||||
if (input == null) {
|
||||
return null;
|
||||
}
|
||||
|
@ -904,6 +921,7 @@ public class GroupByRules
|
|||
return sqlAggregator != null ? sqlAggregator.toDruidAggregation(
|
||||
name,
|
||||
sourceRowSignature,
|
||||
operatorTable,
|
||||
plannerContext,
|
||||
existingAggregations,
|
||||
project,
|
||||
|
|
|
@ -31,6 +31,7 @@ import io.druid.segment.column.ValueType;
|
|||
import io.druid.sql.calcite.expression.Expressions;
|
||||
import io.druid.sql.calcite.expression.RowExtraction;
|
||||
import io.druid.sql.calcite.planner.Calcites;
|
||||
import io.druid.sql.calcite.planner.DruidOperatorTable;
|
||||
import io.druid.sql.calcite.rel.DruidRel;
|
||||
import io.druid.sql.calcite.rel.SelectProjection;
|
||||
import io.druid.sql.calcite.table.RowSignature;
|
||||
|
@ -45,26 +46,27 @@ import java.util.List;
|
|||
|
||||
public class SelectRules
|
||||
{
|
||||
private static final List<RelOptRule> RULES = ImmutableList.of(
|
||||
new DruidSelectProjectionRule(),
|
||||
new DruidSelectSortRule()
|
||||
);
|
||||
|
||||
private SelectRules()
|
||||
{
|
||||
// No instantiation.
|
||||
}
|
||||
|
||||
public static List<RelOptRule> rules()
|
||||
public static List<RelOptRule> rules(final DruidOperatorTable operatorTable)
|
||||
{
|
||||
return RULES;
|
||||
return ImmutableList.of(
|
||||
new DruidSelectProjectionRule(operatorTable),
|
||||
new DruidSelectSortRule()
|
||||
);
|
||||
}
|
||||
|
||||
static class DruidSelectProjectionRule extends RelOptRule
|
||||
{
|
||||
private DruidSelectProjectionRule()
|
||||
private final DruidOperatorTable operatorTable;
|
||||
|
||||
public DruidSelectProjectionRule(final DruidOperatorTable operatorTable)
|
||||
{
|
||||
super(operand(Project.class, operand(DruidRel.class, none())));
|
||||
this.operatorTable = operatorTable;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -95,6 +97,7 @@ public class SelectRules
|
|||
for (int i = 0; i < project.getRowType().getFieldCount(); i++) {
|
||||
final RexNode rexNode = project.getChildExps().get(i);
|
||||
final RowExtraction rex = Expressions.toRowExtraction(
|
||||
operatorTable,
|
||||
druidRel.getPlannerContext(),
|
||||
sourceRowSignature.getRowOrder(),
|
||||
rexNode
|
||||
|
|
|
@ -22,6 +22,7 @@ package io.druid.sql.guice;
|
|||
import com.google.inject.Binder;
|
||||
import com.google.inject.multibindings.Multibinder;
|
||||
import io.druid.sql.calcite.aggregation.SqlAggregator;
|
||||
import io.druid.sql.calcite.expression.SqlExtractionOperator;
|
||||
|
||||
public class SqlBindings
|
||||
{
|
||||
|
@ -33,4 +34,13 @@ public class SqlBindings
|
|||
final Multibinder<SqlAggregator> setBinder = Multibinder.newSetBinder(binder, SqlAggregator.class);
|
||||
setBinder.addBinding().to(aggregatorClass);
|
||||
}
|
||||
|
||||
public static void addExtractionOperator(
|
||||
final Binder binder,
|
||||
final Class<? extends SqlExtractionOperator> clazz
|
||||
)
|
||||
{
|
||||
final Multibinder<SqlExtractionOperator> setBinder = Multibinder.newSetBinder(binder, SqlExtractionOperator.class);
|
||||
setBinder.addBinding().to(clazz);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -20,6 +20,7 @@
|
|||
package io.druid.sql.guice;
|
||||
|
||||
import com.google.common.base.Preconditions;
|
||||
import com.google.common.collect.ImmutableList;
|
||||
import com.google.inject.Binder;
|
||||
import com.google.inject.Inject;
|
||||
import com.google.inject.Module;
|
||||
|
@ -34,16 +35,36 @@ import io.druid.sql.avatica.AvaticaMonitor;
|
|||
import io.druid.sql.avatica.AvaticaServerConfig;
|
||||
import io.druid.sql.avatica.DruidAvaticaHandler;
|
||||
import io.druid.sql.calcite.aggregation.ApproxCountDistinctSqlAggregator;
|
||||
import io.druid.sql.calcite.aggregation.SqlAggregator;
|
||||
import io.druid.sql.calcite.expression.CharacterLengthExtractionOperator;
|
||||
import io.druid.sql.calcite.expression.ExtractExtractionOperator;
|
||||
import io.druid.sql.calcite.expression.FloorExtractionOperator;
|
||||
import io.druid.sql.calcite.expression.LookupExtractionOperator;
|
||||
import io.druid.sql.calcite.expression.SqlExtractionOperator;
|
||||
import io.druid.sql.calcite.expression.SubstringExtractionOperator;
|
||||
import io.druid.sql.calcite.planner.Calcites;
|
||||
import io.druid.sql.calcite.planner.PlannerConfig;
|
||||
import io.druid.sql.calcite.schema.DruidSchema;
|
||||
import io.druid.sql.http.SqlResource;
|
||||
import org.apache.calcite.schema.SchemaPlus;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Properties;
|
||||
|
||||
public class SqlModule implements Module
|
||||
{
|
||||
public static final List<Class<? extends SqlAggregator>> DEFAULT_AGGREGATOR_CLASSES = ImmutableList.<Class<? extends SqlAggregator>>of(
|
||||
ApproxCountDistinctSqlAggregator.class
|
||||
);
|
||||
|
||||
public static final List<Class<? extends SqlExtractionOperator>> DEFAULT_EXTRACTION_OPERATOR_CLASSES = ImmutableList.<Class<? extends SqlExtractionOperator>>of(
|
||||
CharacterLengthExtractionOperator.class,
|
||||
ExtractExtractionOperator.class,
|
||||
FloorExtractionOperator.class,
|
||||
LookupExtractionOperator.class,
|
||||
SubstringExtractionOperator.class
|
||||
);
|
||||
|
||||
private static final String PROPERTY_SQL_ENABLE = "druid.sql.enable";
|
||||
private static final String PROPERTY_SQL_ENABLE_JSON_OVER_HTTP = "druid.sql.http.enable";
|
||||
private static final String PROPERTY_SQL_ENABLE_AVATICA = "druid.sql.avatica.enable";
|
||||
|
@ -64,7 +85,14 @@ public class SqlModule implements Module
|
|||
JsonConfigProvider.bind(binder, "druid.sql.planner", PlannerConfig.class);
|
||||
JsonConfigProvider.bind(binder, "druid.sql.avatica", AvaticaServerConfig.class);
|
||||
LifecycleModule.register(binder, DruidSchema.class);
|
||||
SqlBindings.addAggregator(binder, ApproxCountDistinctSqlAggregator.class);
|
||||
|
||||
for (Class<? extends SqlAggregator> clazz : DEFAULT_AGGREGATOR_CLASSES) {
|
||||
SqlBindings.addAggregator(binder, clazz);
|
||||
}
|
||||
|
||||
for (Class<? extends SqlExtractionOperator> clazz : DEFAULT_EXTRACTION_OPERATOR_CLASSES) {
|
||||
SqlBindings.addExtractionOperator(binder, clazz);
|
||||
}
|
||||
|
||||
if (isJsonOverHttpEnabled()) {
|
||||
Jerseys.addResource(binder, SqlResource.class);
|
||||
|
|
|
@ -68,6 +68,7 @@ import io.druid.query.groupby.GroupByQuery;
|
|||
import io.druid.query.groupby.having.DimFilterHavingSpec;
|
||||
import io.druid.query.groupby.orderby.DefaultLimitSpec;
|
||||
import io.druid.query.groupby.orderby.OrderByColumnSpec;
|
||||
import io.druid.query.lookup.RegisteredLookupExtractionFn;
|
||||
import io.druid.query.ordering.StringComparator;
|
||||
import io.druid.query.ordering.StringComparators;
|
||||
import io.druid.query.select.PagingSpec;
|
||||
|
@ -3116,6 +3117,94 @@ public class CalciteQueryTest
|
|||
);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testFilterAndGroupByLookup() throws Exception
|
||||
{
|
||||
final RegisteredLookupExtractionFn extractionFn = new RegisteredLookupExtractionFn(
|
||||
null,
|
||||
"lookyloo",
|
||||
false,
|
||||
null,
|
||||
false,
|
||||
true
|
||||
);
|
||||
|
||||
testQuery(
|
||||
"SELECT LOOKUP(dim1, 'lookyloo'), COUNT(*) FROM foo\n"
|
||||
+ "WHERE LOOKUP(dim1, 'lookyloo') <> 'xxx'\n"
|
||||
+ "GROUP BY LOOKUP(dim1, 'lookyloo')",
|
||||
ImmutableList.<Query>of(
|
||||
GroupByQuery.builder()
|
||||
.setDataSource(CalciteTests.DATASOURCE1)
|
||||
.setInterval(QSS(Filtration.eternity()))
|
||||
.setGranularity(Granularities.ALL)
|
||||
.setDimFilter(
|
||||
NOT(SELECTOR(
|
||||
"dim1",
|
||||
"xxx",
|
||||
extractionFn
|
||||
))
|
||||
)
|
||||
.setDimensions(
|
||||
DIMS(
|
||||
new ExtractionDimensionSpec(
|
||||
"dim1",
|
||||
"d0",
|
||||
ValueType.STRING,
|
||||
extractionFn
|
||||
)
|
||||
)
|
||||
)
|
||||
.setAggregatorSpecs(
|
||||
AGGS(
|
||||
new CountAggregatorFactory("a0")
|
||||
)
|
||||
)
|
||||
.setContext(QUERY_CONTEXT_DEFAULT)
|
||||
.build()
|
||||
),
|
||||
ImmutableList.of(
|
||||
new Object[]{"", 5L},
|
||||
new Object[]{"xabc", 1L}
|
||||
)
|
||||
);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCountDistinctOfLookup() throws Exception
|
||||
{
|
||||
final RegisteredLookupExtractionFn extractionFn = new RegisteredLookupExtractionFn(
|
||||
null,
|
||||
"lookyloo",
|
||||
false,
|
||||
null,
|
||||
false,
|
||||
true
|
||||
);
|
||||
|
||||
testQuery(
|
||||
"SELECT COUNT(DISTINCT LOOKUP(dim1, 'lookyloo')) FROM foo",
|
||||
ImmutableList.<Query>of(
|
||||
Druids.newTimeseriesQueryBuilder()
|
||||
.dataSource(CalciteTests.DATASOURCE1)
|
||||
.intervals(QSS(Filtration.eternity()))
|
||||
.granularity(Granularities.ALL)
|
||||
.aggregators(AGGS(
|
||||
new CardinalityAggregatorFactory(
|
||||
"a0",
|
||||
ImmutableList.<DimensionSpec>of(new ExtractionDimensionSpec("dim1", null, extractionFn)),
|
||||
false
|
||||
)
|
||||
))
|
||||
.context(TIMESERIES_CONTEXT_DEFAULT)
|
||||
.build()
|
||||
),
|
||||
ImmutableList.of(
|
||||
new Object[]{2L}
|
||||
)
|
||||
);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testTimeseries() throws Exception
|
||||
{
|
||||
|
|
|
@ -24,7 +24,10 @@ import com.google.common.base.Suppliers;
|
|||
import com.google.common.base.Throwables;
|
||||
import com.google.common.collect.ImmutableList;
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import com.google.common.collect.ImmutableSet;
|
||||
import com.google.inject.Binder;
|
||||
import com.google.inject.Guice;
|
||||
import com.google.inject.Injector;
|
||||
import com.google.inject.Module;
|
||||
import io.druid.collections.StupidPool;
|
||||
import io.druid.data.input.InputRow;
|
||||
import io.druid.data.input.impl.DimensionsSpec;
|
||||
|
@ -42,10 +45,15 @@ import io.druid.query.aggregation.AggregatorFactory;
|
|||
import io.druid.query.aggregation.CountAggregatorFactory;
|
||||
import io.druid.query.aggregation.DoubleSumAggregatorFactory;
|
||||
import io.druid.query.aggregation.hyperloglog.HyperUniquesAggregatorFactory;
|
||||
import io.druid.query.extraction.MapLookupExtractor;
|
||||
import io.druid.query.groupby.GroupByQuery;
|
||||
import io.druid.query.groupby.GroupByQueryConfig;
|
||||
import io.druid.query.groupby.GroupByQueryRunnerTest;
|
||||
import io.druid.query.groupby.strategy.GroupByStrategySelector;
|
||||
import io.druid.query.lookup.LookupExtractor;
|
||||
import io.druid.query.lookup.LookupExtractorFactory;
|
||||
import io.druid.query.lookup.LookupIntrospectHandler;
|
||||
import io.druid.query.lookup.LookupReferencesManager;
|
||||
import io.druid.query.metadata.SegmentMetadataQueryConfig;
|
||||
import io.druid.query.metadata.SegmentMetadataQueryQueryToolChest;
|
||||
import io.druid.query.metadata.SegmentMetadataQueryRunnerFactory;
|
||||
|
@ -67,19 +75,24 @@ import io.druid.segment.IndexBuilder;
|
|||
import io.druid.segment.QueryableIndex;
|
||||
import io.druid.segment.TestHelper;
|
||||
import io.druid.segment.incremental.IncrementalIndexSchema;
|
||||
import io.druid.sql.calcite.aggregation.ApproxCountDistinctSqlAggregator;
|
||||
import io.druid.sql.calcite.aggregation.SqlAggregator;
|
||||
import io.druid.sql.calcite.expression.SqlExtractionOperator;
|
||||
import io.druid.sql.calcite.planner.DruidOperatorTable;
|
||||
import io.druid.sql.calcite.planner.PlannerConfig;
|
||||
import io.druid.sql.calcite.schema.DruidSchema;
|
||||
import io.druid.sql.guice.SqlModule;
|
||||
import io.druid.timeline.DataSegment;
|
||||
import io.druid.timeline.partition.LinearShardSpec;
|
||||
import org.easymock.EasyMock;
|
||||
import org.joda.time.DateTime;
|
||||
|
||||
import javax.annotation.Nullable;
|
||||
import java.io.File;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
|
||||
/**
|
||||
* Utility functions for Calcite tests.
|
||||
|
@ -269,7 +282,78 @@ public class CalciteTests
|
|||
|
||||
public static DruidOperatorTable createOperatorTable()
|
||||
{
|
||||
return new DruidOperatorTable(ImmutableSet.<SqlAggregator>of(new ApproxCountDistinctSqlAggregator()));
|
||||
try {
|
||||
final Injector injector = Guice.createInjector(
|
||||
new Module()
|
||||
{
|
||||
@Override
|
||||
public void configure(final Binder binder)
|
||||
{
|
||||
// This Module is just to get a LookupReferencesManager with a usable "lookyloo" lookup.
|
||||
|
||||
final LookupReferencesManager mock = EasyMock.createMock(LookupReferencesManager.class);
|
||||
EasyMock.expect(mock.get(EasyMock.eq("lookyloo"))).andReturn(
|
||||
new LookupExtractorFactory()
|
||||
{
|
||||
@Override
|
||||
public boolean start()
|
||||
{
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean close()
|
||||
{
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean replaces(@Nullable final LookupExtractorFactory other)
|
||||
{
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
||||
@Nullable
|
||||
@Override
|
||||
public LookupIntrospectHandler getIntrospectHandler()
|
||||
{
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
||||
@Override
|
||||
public LookupExtractor get()
|
||||
{
|
||||
return new MapLookupExtractor(
|
||||
ImmutableMap.of(
|
||||
"a", "xa",
|
||||
"abc", "xabc"
|
||||
),
|
||||
false
|
||||
);
|
||||
}
|
||||
}
|
||||
).anyTimes();
|
||||
EasyMock.replay(mock);
|
||||
binder.bind(LookupReferencesManager.class).toInstance(mock);
|
||||
}
|
||||
}
|
||||
);
|
||||
final Set<SqlAggregator> aggregators = new HashSet<>();
|
||||
final Set<SqlExtractionOperator> extractionOperators = new HashSet<>();
|
||||
|
||||
for (Class<? extends SqlAggregator> clazz : SqlModule.DEFAULT_AGGREGATOR_CLASSES) {
|
||||
aggregators.add(injector.getInstance(clazz));
|
||||
}
|
||||
|
||||
for (Class<? extends SqlExtractionOperator> clazz : SqlModule.DEFAULT_EXTRACTION_OPERATOR_CLASSES) {
|
||||
extractionOperators.add(injector.getInstance(clazz));
|
||||
}
|
||||
|
||||
return new DruidOperatorTable(aggregators, extractionOperators);
|
||||
}
|
||||
catch (Exception e) {
|
||||
throw Throwables.propagate(e);
|
||||
}
|
||||
}
|
||||
|
||||
public static DruidSchema createMockSchema(
|
||||
|
|
Loading…
Reference in New Issue