Add initial SQL support for non-expression sketch postaggs (#8487)

* Add initial SQL support for non-expression sketch postaggs

* Checkstyle, spotbugs

* checkstyle

* imports

* Update SQL docs

* Checkstyle

* Fix theta sketch operator docs

* PR comments

* Checkstyle fixes

* Add missing entries for HLL sketch module

* PR comments, add round param to HLL estimate operator, fix optional HLL param
This commit is contained in:
Jonathan Wei 2019-10-18 14:59:44 -07:00 committed by GitHub
parent 30c15900be
commit d88075237a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
39 changed files with 3047 additions and 200 deletions

View File

@ -67,8 +67,26 @@ druid.extensions.loadList=["druid-datasketches"]
### Post Aggregators
#### Estimate
Returns the distinct count estimate as a double.
```
{
"type" : "HLLSketchEstimate",
"name": <output name>,
"field" : <post aggregator that returns an HLL Sketch>,
"round" : <if true, round the estimate. Default is false>
}
```
#### Estimate with bounds
Returns a distinct count estimate and error bounds from an HLL sketch.
The result will be an array containing three double values: estimate, lower bound and upper bound.
The bounds are provided at a given number of standard deviations (optional, defaults to 1).
This must be an integer value of 1, 2 or 3 corresponding to approximately 68.3%, 95.4% and 99.7% confidence intervals.
```
{
"type" : "HLLSketchEstimateWithBounds",
@ -92,7 +110,7 @@ druid.extensions.loadList=["druid-datasketches"]
#### Sketch to string
Human-readable sketch summary for debugging
Human-readable sketch summary for debugging.
```
{
@ -100,5 +118,4 @@ Human-readable sketch summary for debugging
"name": <output name>,
"field" : <post aggregator that returns an HLL Sketch>
}
```

View File

@ -180,9 +180,12 @@ Only the COUNT aggregation can accept DISTINCT.
|`APPROX_COUNT_DISTINCT(expr)`|Counts distinct values of expr, which can be a regular column or a hyperUnique column. This is always approximate, regardless of the value of "useApproximateCountDistinct". This uses Druid's built-in "cardinality" or "hyperUnique" aggregators. See also `COUNT(DISTINCT expr)`.|
|`APPROX_COUNT_DISTINCT_DS_HLL(expr, [lgK, tgtHllType])`|Counts distinct values of expr, which can be a regular column or an [HLL sketch](../development/extensions-core/datasketches-hll.html) column. The `lgK` and `tgtHllType` parameters are described in the HLL sketch documentation. This is always approximate, regardless of the value of "useApproximateCountDistinct". See also `COUNT(DISTINCT expr)`. The [DataSketches extension](../development/extensions-core/datasketches-extension.html) must be loaded to use this function.|
|`APPROX_COUNT_DISTINCT_DS_THETA(expr, [size])`|Counts distinct values of expr, which can be a regular column or a [Theta sketch](../development/extensions-core/datasketches-theta.html) column. The `size` parameter is described in the Theta sketch documentation. This is always approximate, regardless of the value of "useApproximateCountDistinct". See also `COUNT(DISTINCT expr)`. The [DataSketches extension](../development/extensions-core/datasketches-extension.html) must be loaded to use this function.|
|`DS_HLL(expr, [lgK, tgtHllType])`|Creates an [HLL sketch](../development/extensions-core/datasketches-hll.html) on the values of expr, which can be a regular column or a column containing HLL sketches. The `lgK` and `tgtHllType` parameters are described in the HLL sketch documentation. The [DataSketches extension](../development/extensions-core/datasketches-extension.html) must be loaded to use this function.|
|`DS_THETA(expr, [size])`|Creates a [Theta sketch](../development/extensions-core/datasketches-theta.html) on the values of expr, which can be a regular column or a column containing Theta sketches. The `size` parameter is described in the Theta sketch documentation. The [DataSketches extension](../development/extensions-core/datasketches-extension.html) must be loaded to use this function.|
|`APPROX_QUANTILE(expr, probability, [resolution])`|Computes approximate quantiles on numeric or [approxHistogram](../development/extensions-core/approximate-histograms.html#approximate-histogram-aggregator) exprs. The "probability" should be between 0 and 1 (exclusive). The "resolution" is the number of centroids to use for the computation. Higher resolutions will give more precise results but also have higher overhead. If not provided, the default resolution is 50. The [approximate histogram extension](../development/extensions-core/approximate-histograms.html) must be loaded to use this function.|
|`APPROX_QUANTILE_DS(expr, probability, [k])`|Computes approximate quantiles on numeric or [Quantiles sketch](../development/extensions-core/datasketches-quantiles.html) exprs. The "probability" should be between 0 and 1 (exclusive). The `k` parameter is described in the Quantiles sketch documentation. The [DataSketches extension](../development/extensions-core/datasketches-extension.html) must be loaded to use this function.|
|`APPROX_QUANTILE_FIXED_BUCKETS(expr, probability, numBuckets, lowerLimit, upperLimit, [outlierHandlingMode])`|Computes approximate quantiles on numeric or [fixed buckets histogram](../development/extensions-core/approximate-histograms.html#fixed-buckets-histogram) exprs. The "probability" should be between 0 and 1 (exclusive). The `numBuckets`, `lowerLimit`, `upperLimit`, and `outlierHandlingMode` parameters are described in the fixed buckets histogram documentation. The [approximate histogram extension](../development/extensions-core/approximate-histograms.html) must be loaded to use this function.|
|`DS_QUANTILES_SKETCH(expr, [k])`|Creates a [Quantiles sketch](../development/extensions-core/datasketches-quantiles.html) on the values of expr, which can be a regular column or a column containing quantiles sketches. The `k` parameter is described in the Quantiles sketch documentation. The [DataSketches extension](../development/extensions-core/datasketches-extension.html) must be loaded to use this function.|
|`BLOOM_FILTER(expr, numEntries)`|Computes a bloom filter from values produced by `expr`, with `numEntries` maximum number of distinct values before false positive rate increases. See [bloom filter extension](../development/extensions-core/bloom-filter.html) documentation for additional details.|
|`TDIGEST_QUANTILE(expr, quantileFraction, [compression])`|Builds a T-Digest sketch on values produced by `expr` and returns the value for the quantile. Compression parameter (default value 100) determines the accuracy and size of the sketch. Higher compression means higher accuracy but more space to store sketches. See [t-digest extension](../development/extensions-contrib/tdigestsketch-quantiles.html) documentation for additional details.|
|`TDIGEST_GENERATE_SKETCH(expr, [compression])`|Builds a T-Digest sketch on values produced by `expr`. Compression parameter (default value 100) determines the accuracy and size of the sketch Higher compression means higher accuracy but more space to store sketches. See [t-digest extension](../development/extensions-contrib/tdigestsketch-quantiles.html) documentation for additional details.|
@ -363,6 +366,44 @@ All 'array' references in the multi-value string function documentation can refe
| `MV_TO_STRING(arr,str)` | joins all elements of arr by the delimiter specified by str |
| `STRING_TO_MV(str1,str2)` | splits str1 into an array on the delimiter specified by str2 |
### Sketch operators
These functions operate on expressions or columns that return sketch objects.
#### HLL sketch operators
The following functions operate on [DataSketches HLL sketches](../development/extensions-core/datasketches-hll.html).
The [DataSketches extension](../development/extensions-core/datasketches-extension.html) must be loaded to use the following functions.
|Function|Notes|
|--------|-----|
|`HLL_SKETCH_ESTIMATE(expr, [round])`|Returns the distinct count estimate from an HLL sketch. `expr` must return an HLL sketch. The optional `round` boolean parameter will round the estimate if set to `true`, with a default of `false`.|
|`HLL_SKETCH_ESTIMATE_WITH_ERROR_BOUNDS(expr, [numStdDev])`|Returns the distinct count estimate and error bounds from an HLL sketch. `expr` must return an HLL sketch. An optional `numStdDev` argument can be provided.|
|`HLL_SKETCH_UNION([lgK, tgtHllType], expr0, expr1, ...)`|Returns a union of HLL sketches, where each input expression must return an HLL sketch. The `lgK` and `tgtHllType` can be optionally specified as the first parameter; if provided, both optional parameters must be specified.|
|`HLL_SKETCH_TO_STRING(expr)`|Returns a human-readable string representation of an HLL sketch for debugging. `expr` must return an HLL sketch.|
#### Theta sketch operators
The following functions operate on [theta sketches](../development/extensions-core/datasketches-theta.html).
The [DataSketches extension](../development/extensions-core/datasketches-extension.html) must be loaded to use the following functions.
|Function|Notes|
|--------|-----|
|`THETA_SKETCH_ESTIMATE(expr)`|Returns the distinct count estimate from a theta sketch. `expr` must return a theta sketch.|
|`THETA_SKETCH_ESTIMATE_WITH_ERROR_BOUNDS(expr, errorBoundsStdDev)`|Returns the distinct count estimate and error bounds from a theta sketch. `expr` must return a theta sketch.|
|`THETA_SKETCH_UNION([size], expr0, expr1, ...)`|Returns a union of theta sketches, where each input expression must return a theta sketch. The `size` can be optionally specified as the first parameter.|
|`THETA_SKETCH_INTERSECT([size], expr0, expr1, ...)`|Returns an intersection of theta sketches, where each input expression must return a theta sketch. The `size` can be optionally specified as the first parameter.|
|`THETA_SKETCH_NOT([size], expr0, expr1, ...)`|Returns a set difference of theta sketches, where each input expression must return a theta sketch. The `size` can be optionally specified as the first parameter.|
#### Quantiles sketch operators
The following functions operate on [quantiles sketches](../development/extensions-core/datasketches-quantiles.html).
The [DataSketches extension](../development/extensions-core/datasketches-extension.html) must be loaded to use the following functions.
|Function|Notes|
|--------|-----|
|`DS_GET_QUANTILE(expr, fraction)`|Returns the quantile estimate corresponding to `fraction` from a quantiles sketch. `expr` must return a quantiles sketch.|
### Other functions
|Function|Notes|
@ -588,8 +629,6 @@ Connection context can be specified as JDBC connection properties or as a "conte
|`useApproximateCountDistinct`|Whether to use an approximate cardinality algorithm for `COUNT(DISTINCT foo)`.|druid.sql.planner.useApproximateCountDistinct on the Broker (default: true)|
|`useApproximateTopN`|Whether to use approximate [TopN queries](topnquery.html) when a SQL query could be expressed as such. If false, exact [GroupBy queries](groupbyquery.html) will be used instead.|druid.sql.planner.useApproximateTopN on the Broker (default: true)|
## Metadata tables
Druid Brokers infer table and column metadata for each datasource from segments loaded in the cluster, and use this to

View File

@ -40,6 +40,7 @@ import java.util.Objects;
*/
public abstract class HllSketchAggregatorFactory extends AggregatorFactory
{
public static final boolean DEFAULT_ROUND = false;
public static final int DEFAULT_LG_K = 12;
public static final TgtHllType DEFAULT_TGT_HLL_TYPE = TgtHllType.HLL_4;

View File

@ -26,7 +26,12 @@ import com.google.common.annotations.VisibleForTesting;
import com.google.inject.Binder;
import com.yahoo.sketches.hll.HllSketch;
import org.apache.druid.initialization.DruidModule;
import org.apache.druid.query.aggregation.datasketches.hll.sql.HllSketchSqlAggregator;
import org.apache.druid.query.aggregation.datasketches.hll.sql.HllSketchApproxCountDistinctSqlAggregator;
import org.apache.druid.query.aggregation.datasketches.hll.sql.HllSketchEstimateOperatorConversion;
import org.apache.druid.query.aggregation.datasketches.hll.sql.HllSketchEstimateWithErrorBoundsOperatorConversion;
import org.apache.druid.query.aggregation.datasketches.hll.sql.HllSketchObjectSqlAggregator;
import org.apache.druid.query.aggregation.datasketches.hll.sql.HllSketchSetUnionOperatorConversion;
import org.apache.druid.query.aggregation.datasketches.hll.sql.HllSketchToStringOperatorConversion;
import org.apache.druid.segment.serde.ComplexMetrics;
import org.apache.druid.sql.guice.SqlBindings;
@ -46,12 +51,20 @@ public class HllSketchModule implements DruidModule
public static final String TO_STRING_TYPE_NAME = "HLLSketchToString";
public static final String UNION_TYPE_NAME = "HLLSketchUnion";
public static final String ESTIMATE_WITH_BOUNDS_TYPE_NAME = "HLLSketchEstimateWithBounds";
public static final String ESTIMATE_TYPE_NAME = "HLLSketchEstimate";
@Override
public void configure(final Binder binder)
{
registerSerde();
SqlBindings.addAggregator(binder, HllSketchSqlAggregator.class);
SqlBindings.addAggregator(binder, HllSketchApproxCountDistinctSqlAggregator.class);
SqlBindings.addAggregator(binder, HllSketchObjectSqlAggregator.class);
SqlBindings.addOperatorConversion(binder, HllSketchEstimateOperatorConversion.class);
SqlBindings.addOperatorConversion(binder, HllSketchEstimateWithErrorBoundsOperatorConversion.class);
SqlBindings.addOperatorConversion(binder, HllSketchSetUnionOperatorConversion.class);
SqlBindings.addOperatorConversion(binder, HllSketchToStringOperatorConversion.class);
}
@Override
@ -64,7 +77,8 @@ public class HllSketchModule implements DruidModule
new NamedType(HllSketchMergeAggregatorFactory.class, TYPE_NAME),
new NamedType(HllSketchToStringPostAggregator.class, TO_STRING_TYPE_NAME),
new NamedType(HllSketchUnionPostAggregator.class, UNION_TYPE_NAME),
new NamedType(HllSketchToEstimateWithBoundsPostAggregator.class, ESTIMATE_WITH_BOUNDS_TYPE_NAME)
new NamedType(HllSketchToEstimateWithBoundsPostAggregator.class, ESTIMATE_WITH_BOUNDS_TYPE_NAME),
new NamedType(HllSketchToEstimatePostAggregator.class, ESTIMATE_TYPE_NAME)
).addSerializer(HllSketch.class, new HllSketchJsonSerializer())
);
}

View File

@ -0,0 +1,143 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.query.aggregation.datasketches.hll;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.yahoo.sketches.hll.HllSketch;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.PostAggregator;
import org.apache.druid.query.aggregation.post.ArithmeticPostAggregator;
import org.apache.druid.query.aggregation.post.PostAggregatorIds;
import org.apache.druid.query.cache.CacheKeyBuilder;
import java.util.Comparator;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
/**
* Returns a distinct count estimate a from a given {@link HllSketch}.
* The result will be a double value.
*/
public class HllSketchToEstimatePostAggregator implements PostAggregator
{
private final String name;
private final PostAggregator field;
private final boolean round;
@JsonCreator
public HllSketchToEstimatePostAggregator(
@JsonProperty("name") final String name,
@JsonProperty("field") final PostAggregator field,
@JsonProperty("round") boolean round
)
{
this.name = name;
this.field = field;
this.round = round;
}
@Override
@JsonProperty
public String getName()
{
return name;
}
@JsonProperty
public PostAggregator getField()
{
return field;
}
@JsonProperty
public boolean isRound()
{
return round;
}
@Override
public Set<String> getDependentFields()
{
return field.getDependentFields();
}
@Override
public Comparator<Double> getComparator()
{
return ArithmeticPostAggregator.DEFAULT_COMPARATOR;
}
@Override
public Object compute(final Map<String, Object> combinedAggregators)
{
final HllSketch sketch = (HllSketch) field.compute(combinedAggregators);
return round ? Math.round(sketch.getEstimate()) : sketch.getEstimate();
}
@Override
public PostAggregator decorate(final Map<String, AggregatorFactory> aggregators)
{
return this;
}
@Override
public String toString()
{
return getClass().getSimpleName() + "{" +
"name='" + name + '\'' +
", field=" + field +
"}";
}
@Override
public boolean equals(final Object o)
{
if (this == o) {
return true;
}
if (!(o instanceof HllSketchToEstimatePostAggregator)) {
return false;
}
final HllSketchToEstimatePostAggregator that = (HllSketchToEstimatePostAggregator) o;
if (!name.equals(that.name)) {
return false;
}
return field.equals(that.field);
}
@Override
public int hashCode()
{
return Objects.hash(name, field);
}
@Override
public byte[] getCacheKey()
{
return new CacheKeyBuilder(PostAggregatorIds.HLL_SKETCH_TO_ESTIMATE_CACHE_TYPE_ID)
.appendCacheable(field)
.build();
}
}

View File

@ -43,6 +43,7 @@ import java.util.Set;
*/
public class HllSketchToEstimateWithBoundsPostAggregator implements PostAggregator
{
public static final int DEFAULT_NUM_STD_DEVS = 1;
private final String name;
private final PostAggregator field;
@ -57,7 +58,7 @@ public class HllSketchToEstimateWithBoundsPostAggregator implements PostAggregat
{
this.name = name;
this.field = field;
this.numStdDevs = numStdDevs == null ? 1 : numStdDevs;
this.numStdDevs = numStdDevs == null ? DEFAULT_NUM_STD_DEVS : numStdDevs;
}
@Override

View File

@ -0,0 +1,93 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.query.aggregation.datasketches.hll.sql;
import org.apache.calcite.sql.SqlAggFunction;
import org.apache.calcite.sql.SqlFunctionCategory;
import org.apache.calcite.sql.SqlKind;
import org.apache.calcite.sql.type.InferTypes;
import org.apache.calcite.sql.type.OperandTypes;
import org.apache.calcite.sql.type.ReturnTypes;
import org.apache.calcite.sql.type.SqlTypeFamily;
import org.apache.calcite.sql.type.SqlTypeName;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.post.FinalizingFieldAccessPostAggregator;
import org.apache.druid.segment.VirtualColumn;
import org.apache.druid.sql.calcite.aggregation.Aggregation;
import org.apache.druid.sql.calcite.aggregation.SqlAggregator;
import java.util.Collections;
import java.util.List;
public class HllSketchApproxCountDistinctSqlAggregator extends HllSketchBaseSqlAggregator implements SqlAggregator
{
private static final SqlAggFunction FUNCTION_INSTANCE = new HllSketchApproxCountDistinctSqlAggFunction();
private static final String NAME = "APPROX_COUNT_DISTINCT_DS_HLL";
@Override
public SqlAggFunction calciteFunction()
{
return FUNCTION_INSTANCE;
}
@Override
protected Aggregation toAggregation(
String name,
boolean finalizeAggregations,
List<VirtualColumn> virtualColumns,
AggregatorFactory aggregatorFactory
)
{
return Aggregation.create(
virtualColumns,
Collections.singletonList(aggregatorFactory),
finalizeAggregations ? new FinalizingFieldAccessPostAggregator(
name,
aggregatorFactory.getName()
) : null
);
}
private static class HllSketchApproxCountDistinctSqlAggFunction extends SqlAggFunction
{
private static final String SIGNATURE = "'" + NAME + "(column, lgK, tgtHllType)'\n";
HllSketchApproxCountDistinctSqlAggFunction()
{
super(
NAME,
null,
SqlKind.OTHER_FUNCTION,
ReturnTypes.explicit(SqlTypeName.BIGINT),
InferTypes.VARCHAR_1024,
OperandTypes.or(
OperandTypes.ANY,
OperandTypes.and(
OperandTypes.sequence(SIGNATURE, OperandTypes.ANY, OperandTypes.LITERAL, OperandTypes.LITERAL),
OperandTypes.family(SqlTypeFamily.ANY, SqlTypeFamily.NUMERIC, SqlTypeFamily.STRING)
)
),
SqlFunctionCategory.NUMERIC,
false,
false
);
}
}
}

View File

@ -24,20 +24,13 @@ import org.apache.calcite.rel.core.Project;
import org.apache.calcite.rex.RexBuilder;
import org.apache.calcite.rex.RexLiteral;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.sql.SqlAggFunction;
import org.apache.calcite.sql.SqlFunctionCategory;
import org.apache.calcite.sql.SqlKind;
import org.apache.calcite.sql.type.InferTypes;
import org.apache.calcite.sql.type.OperandTypes;
import org.apache.calcite.sql.type.ReturnTypes;
import org.apache.calcite.sql.type.SqlTypeFamily;
import org.apache.calcite.sql.type.SqlTypeName;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.datasketches.hll.HllSketchAggregatorFactory;
import org.apache.druid.query.aggregation.datasketches.hll.HllSketchBuildAggregatorFactory;
import org.apache.druid.query.aggregation.datasketches.hll.HllSketchMergeAggregatorFactory;
import org.apache.druid.query.aggregation.post.FinalizingFieldAccessPostAggregator;
import org.apache.druid.query.dimension.DefaultDimensionSpec;
import org.apache.druid.query.dimension.DimensionSpec;
import org.apache.druid.segment.VirtualColumn;
@ -53,21 +46,12 @@ import org.apache.druid.sql.calcite.table.RowSignature;
import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
public class HllSketchSqlAggregator implements SqlAggregator
public abstract class HllSketchBaseSqlAggregator implements SqlAggregator
{
private static final SqlAggFunction FUNCTION_INSTANCE = new HllSketchSqlAggFunction();
private static final String NAME = "APPROX_COUNT_DISTINCT_DS_HLL";
private static final boolean ROUND = true;
@Override
public SqlAggFunction calciteFunction()
{
return FUNCTION_INSTANCE;
}
@Nullable
@Override
public Aggregation toDruidAggregation(
@ -174,39 +158,18 @@ public class HllSketchSqlAggregator implements SqlAggregator
);
}
return Aggregation.create(
return toAggregation(
name,
finalizeAggregations,
virtualColumns,
Collections.singletonList(aggregatorFactory),
finalizeAggregations ? new FinalizingFieldAccessPostAggregator(
name,
aggregatorFactory.getName()
) : null
aggregatorFactory
);
}
private static class HllSketchSqlAggFunction extends SqlAggFunction
{
private static final String SIGNATURE = "'" + NAME + "(column, lgK, tgtHllType)'\n";
HllSketchSqlAggFunction()
{
super(
NAME,
null,
SqlKind.OTHER_FUNCTION,
ReturnTypes.explicit(SqlTypeName.BIGINT),
InferTypes.VARCHAR_1024,
OperandTypes.or(
OperandTypes.ANY,
OperandTypes.and(
OperandTypes.sequence(SIGNATURE, OperandTypes.ANY, OperandTypes.LITERAL, OperandTypes.LITERAL),
OperandTypes.family(SqlTypeFamily.ANY, SqlTypeFamily.NUMERIC, SqlTypeFamily.STRING)
)
),
SqlFunctionCategory.NUMERIC,
false,
false
);
}
}
protected abstract Aggregation toAggregation(
String name,
boolean finalizeAggregations,
List<VirtualColumn> virtualColumns,
AggregatorFactory aggregatorFactory
);
}

View File

@ -0,0 +1,111 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.query.aggregation.datasketches.hll.sql;
import org.apache.calcite.rex.RexCall;
import org.apache.calcite.rex.RexLiteral;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.sql.SqlFunction;
import org.apache.calcite.sql.SqlKind;
import org.apache.calcite.sql.SqlOperator;
import org.apache.calcite.sql.type.ReturnTypes;
import org.apache.calcite.sql.type.SqlTypeFamily;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.query.aggregation.PostAggregator;
import org.apache.druid.query.aggregation.datasketches.hll.HllSketchAggregatorFactory;
import org.apache.druid.query.aggregation.datasketches.hll.HllSketchToEstimatePostAggregator;
import org.apache.druid.sql.calcite.expression.DirectOperatorConversion;
import org.apache.druid.sql.calcite.expression.DruidExpression;
import org.apache.druid.sql.calcite.expression.OperatorConversions;
import org.apache.druid.sql.calcite.expression.PostAggregatorVisitor;
import org.apache.druid.sql.calcite.planner.PlannerContext;
import org.apache.druid.sql.calcite.table.RowSignature;
import javax.annotation.Nullable;
import java.util.List;
public class HllSketchEstimateOperatorConversion extends DirectOperatorConversion
{
private static final String FUNCTION_NAME = "HLL_SKETCH_ESTIMATE";
private static final SqlFunction SQL_FUNCTION = OperatorConversions
.operatorBuilder(StringUtils.toUpperCase(FUNCTION_NAME))
.operandTypes(SqlTypeFamily.ANY, SqlTypeFamily.BOOLEAN)
.requiredOperands(1)
.returnTypeInference(ReturnTypes.DOUBLE)
.build();
public HllSketchEstimateOperatorConversion()
{
super(SQL_FUNCTION, FUNCTION_NAME);
}
@Override
public SqlOperator calciteOperator()
{
return SQL_FUNCTION;
}
@Override
public DruidExpression toDruidExpression(
PlannerContext plannerContext,
RowSignature rowSignature,
RexNode rexNode
)
{
return null;
}
@Nullable
@Override
public PostAggregator toPostAggregator(
PlannerContext plannerContext,
RowSignature rowSignature,
RexNode rexNode,
PostAggregatorVisitor postAggregatorVisitor
)
{
final List<RexNode> operands = ((RexCall) rexNode).getOperands();
final PostAggregator firstOperand = OperatorConversions.toPostAggregator(
plannerContext,
rowSignature,
operands.get(0),
postAggregatorVisitor
);
if (firstOperand == null) {
return null;
}
boolean round = HllSketchAggregatorFactory.DEFAULT_ROUND;
if (operands.size() == 2) {
if (!operands.get(1).isA(SqlKind.LITERAL)) {
return null;
}
round = RexLiteral.booleanValue(operands.get(1));
}
return new HllSketchToEstimatePostAggregator(
postAggregatorVisitor.getOutputNamePrefix() + postAggregatorVisitor.getAndIncrementCounter(),
firstOperand,
round
);
}
}

View File

@ -0,0 +1,112 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.query.aggregation.datasketches.hll.sql;
import org.apache.calcite.rex.RexCall;
import org.apache.calcite.rex.RexLiteral;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.sql.SqlFunction;
import org.apache.calcite.sql.SqlKind;
import org.apache.calcite.sql.SqlOperator;
import org.apache.calcite.sql.type.SqlTypeFamily;
import org.apache.calcite.sql.type.SqlTypeName;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.query.aggregation.PostAggregator;
import org.apache.druid.query.aggregation.datasketches.hll.HllSketchToEstimateWithBoundsPostAggregator;
import org.apache.druid.sql.calcite.expression.DirectOperatorConversion;
import org.apache.druid.sql.calcite.expression.DruidExpression;
import org.apache.druid.sql.calcite.expression.OperatorConversions;
import org.apache.druid.sql.calcite.expression.PostAggregatorVisitor;
import org.apache.druid.sql.calcite.planner.PlannerContext;
import org.apache.druid.sql.calcite.table.RowSignature;
import javax.annotation.Nullable;
import java.util.List;
public class HllSketchEstimateWithErrorBoundsOperatorConversion extends DirectOperatorConversion
{
private static final String FUNCTION_NAME = "HLL_SKETCH_ESTIMATE_WITH_ERROR_BOUNDS";
private static final SqlFunction SQL_FUNCTION = OperatorConversions
.operatorBuilder(StringUtils.toUpperCase(FUNCTION_NAME))
.operandTypes(SqlTypeFamily.ANY, SqlTypeFamily.INTEGER)
.requiredOperands(1)
.returnType(SqlTypeName.OTHER)
.build();
public HllSketchEstimateWithErrorBoundsOperatorConversion()
{
super(SQL_FUNCTION, FUNCTION_NAME);
}
@Override
public SqlOperator calciteOperator()
{
return SQL_FUNCTION;
}
@Override
public DruidExpression toDruidExpression(
PlannerContext plannerContext,
RowSignature rowSignature,
RexNode rexNode
)
{
return null;
}
@Nullable
@Override
public PostAggregator toPostAggregator(
PlannerContext plannerContext,
RowSignature rowSignature,
RexNode rexNode,
PostAggregatorVisitor postAggregatorVisitor
)
{
final List<RexNode> operands = ((RexCall) rexNode).getOperands();
final PostAggregator firstOperand = OperatorConversions.toPostAggregator(
plannerContext,
rowSignature,
operands.get(0),
postAggregatorVisitor
);
if (firstOperand == null) {
return null;
}
Integer numStdDev = null;
if (operands.size() == 2) {
if (!operands.get(1).isA(SqlKind.LITERAL)) {
return null;
}
numStdDev = ((Number) RexLiteral.value(operands.get(1))).intValue();
}
return new HllSketchToEstimateWithBoundsPostAggregator(
postAggregatorVisitor.getOutputNamePrefix() + postAggregatorVisitor.getAndIncrementCounter(),
firstOperand,
numStdDev
);
}
}

View File

@ -0,0 +1,89 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.query.aggregation.datasketches.hll.sql;
import org.apache.calcite.sql.SqlAggFunction;
import org.apache.calcite.sql.SqlFunctionCategory;
import org.apache.calcite.sql.SqlKind;
import org.apache.calcite.sql.type.InferTypes;
import org.apache.calcite.sql.type.OperandTypes;
import org.apache.calcite.sql.type.ReturnTypes;
import org.apache.calcite.sql.type.SqlTypeFamily;
import org.apache.calcite.sql.type.SqlTypeName;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.segment.VirtualColumn;
import org.apache.druid.sql.calcite.aggregation.Aggregation;
import org.apache.druid.sql.calcite.aggregation.SqlAggregator;
import java.util.Collections;
import java.util.List;
public class HllSketchObjectSqlAggregator extends HllSketchBaseSqlAggregator implements SqlAggregator
{
private static final SqlAggFunction FUNCTION_INSTANCE = new HllSketchSqlAggFunction();
private static final String NAME = "DS_HLL";
@Override
public SqlAggFunction calciteFunction()
{
return FUNCTION_INSTANCE;
}
@Override
protected Aggregation toAggregation(
String name,
boolean finalizeAggregations,
List<VirtualColumn> virtualColumns,
AggregatorFactory aggregatorFactory
)
{
return Aggregation.create(
virtualColumns,
Collections.singletonList(aggregatorFactory),
null
);
}
private static class HllSketchSqlAggFunction extends SqlAggFunction
{
private static final String SIGNATURE = "'" + NAME + "(column, lgK, tgtHllType)'\n";
HllSketchSqlAggFunction()
{
super(
NAME,
null,
SqlKind.OTHER_FUNCTION,
ReturnTypes.explicit(SqlTypeName.OTHER),
InferTypes.VARCHAR_1024,
OperandTypes.or(
OperandTypes.ANY,
OperandTypes.and(
OperandTypes.sequence(SIGNATURE, OperandTypes.ANY, OperandTypes.LITERAL, OperandTypes.LITERAL),
OperandTypes.family(SqlTypeFamily.ANY, SqlTypeFamily.NUMERIC, SqlTypeFamily.STRING)
)
),
SqlFunctionCategory.USER_DEFINED_FUNCTION,
false,
false
);
}
}
}

View File

@ -0,0 +1,140 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.query.aggregation.datasketches.hll.sql;
import org.apache.calcite.rex.RexCall;
import org.apache.calcite.rex.RexLiteral;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.sql.SqlFunction;
import org.apache.calcite.sql.SqlFunctionCategory;
import org.apache.calcite.sql.SqlKind;
import org.apache.calcite.sql.SqlOperator;
import org.apache.calcite.sql.type.OperandTypes;
import org.apache.calcite.sql.type.ReturnTypes;
import org.apache.calcite.sql.type.SqlOperandCountRanges;
import org.apache.calcite.sql.type.SqlTypeName;
import org.apache.druid.query.aggregation.PostAggregator;
import org.apache.druid.query.aggregation.datasketches.hll.HllSketchUnionPostAggregator;
import org.apache.druid.sql.calcite.expression.DruidExpression;
import org.apache.druid.sql.calcite.expression.OperatorConversions;
import org.apache.druid.sql.calcite.expression.PostAggregatorVisitor;
import org.apache.druid.sql.calcite.expression.SqlOperatorConversion;
import org.apache.druid.sql.calcite.planner.Calcites;
import org.apache.druid.sql.calcite.planner.PlannerContext;
import org.apache.druid.sql.calcite.table.RowSignature;
import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.List;
public class HllSketchSetUnionOperatorConversion implements SqlOperatorConversion
{
private static final SqlFunction SQL_FUNCTION = new SqlFunction(
"HLL_SKETCH_UNION",
SqlKind.OTHER_FUNCTION,
ReturnTypes.explicit(
factory -> Calcites.createSqlType(factory, SqlTypeName.OTHER)
),
null,
OperandTypes.variadic(SqlOperandCountRanges.from(2)),
SqlFunctionCategory.USER_DEFINED_FUNCTION
);
@Override
public SqlOperator calciteOperator()
{
return SQL_FUNCTION;
}
@Nullable
@Override
public DruidExpression toDruidExpression(
PlannerContext plannerContext,
RowSignature rowSignature,
RexNode rexNode
)
{
return null;
}
@Nullable
@Override
public PostAggregator toPostAggregator(
PlannerContext plannerContext,
RowSignature rowSignature,
RexNode rexNode,
PostAggregatorVisitor postAggregatorVisitor
)
{
final List<RexNode> operands = ((RexCall) rexNode).getOperands();
final List<PostAggregator> inputPostAggs = new ArrayList<>();
Integer lgK = null;
String tgtHllType = null;
int operandCounter = 0;
for (RexNode operand : operands) {
final PostAggregator convertedPostAgg = OperatorConversions.toPostAggregator(
plannerContext,
rowSignature,
operand,
postAggregatorVisitor
);
if (convertedPostAgg == null) {
if (operandCounter == 0) {
if (!operand.isA(SqlKind.LITERAL)) {
return null;
}
try {
lgK = RexLiteral.intValue(operand);
}
catch (ClassCastException re) {
return null;
}
} else if (operandCounter == 1) {
if (!operand.isA(SqlKind.LITERAL)) {
return null;
}
// both lgK and tgtHllType must be specified
if (lgK == null) {
return null;
}
try {
tgtHllType = RexLiteral.stringValue(operand);
}
catch (ClassCastException re) {
return null;
}
} else {
return null;
}
} else {
inputPostAggs.add(convertedPostAgg);
operandCounter++;
}
}
return new HllSketchUnionPostAggregator(
postAggregatorVisitor.getOutputNamePrefix() + postAggregatorVisitor.getAndIncrementCounter(),
inputPostAggs,
lgK,
tgtHllType
);
}
}

View File

@ -0,0 +1,97 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.query.aggregation.datasketches.hll.sql;
import org.apache.calcite.rex.RexCall;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.sql.SqlFunction;
import org.apache.calcite.sql.SqlOperator;
import org.apache.calcite.sql.type.SqlTypeFamily;
import org.apache.calcite.sql.type.SqlTypeName;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.query.aggregation.PostAggregator;
import org.apache.druid.query.aggregation.datasketches.hll.HllSketchToStringPostAggregator;
import org.apache.druid.sql.calcite.expression.DirectOperatorConversion;
import org.apache.druid.sql.calcite.expression.DruidExpression;
import org.apache.druid.sql.calcite.expression.OperatorConversions;
import org.apache.druid.sql.calcite.expression.PostAggregatorVisitor;
import org.apache.druid.sql.calcite.planner.PlannerContext;
import org.apache.druid.sql.calcite.table.RowSignature;
import javax.annotation.Nullable;
import java.util.List;
public class HllSketchToStringOperatorConversion extends DirectOperatorConversion
{
private static final String FUNCTION_NAME = "HLL_SKETCH_TO_STRING";
private static final SqlFunction SQL_FUNCTION = OperatorConversions
.operatorBuilder(StringUtils.toUpperCase(FUNCTION_NAME))
.operandTypes(SqlTypeFamily.ANY)
.returnType(SqlTypeName.VARCHAR)
.build();
public HllSketchToStringOperatorConversion()
{
super(SQL_FUNCTION, FUNCTION_NAME);
}
@Override
public SqlOperator calciteOperator()
{
return SQL_FUNCTION;
}
@Override
public DruidExpression toDruidExpression(
PlannerContext plannerContext,
RowSignature rowSignature,
RexNode rexNode
)
{
return null;
}
@Nullable
@Override
public PostAggregator toPostAggregator(
PlannerContext plannerContext,
RowSignature rowSignature,
RexNode rexNode,
PostAggregatorVisitor postAggregatorVisitor
)
{
final List<RexNode> operands = ((RexCall) rexNode).getOperands();
final PostAggregator firstOperand = OperatorConversions.toPostAggregator(
plannerContext,
rowSignature,
operands.get(0),
postAggregatorVisitor
);
if (firstOperand == null) {
return null;
}
return new HllSketchToStringPostAggregator(
postAggregatorVisitor.getOutputNamePrefix() + postAggregatorVisitor.getAndIncrementCounter(),
firstOperand
);
}
}

View File

@ -26,7 +26,9 @@ import com.google.common.annotations.VisibleForTesting;
import com.google.inject.Binder;
import com.yahoo.sketches.quantiles.DoublesSketch;
import org.apache.druid.initialization.DruidModule;
import org.apache.druid.query.aggregation.datasketches.quantiles.sql.DoublesSketchSqlAggregator;
import org.apache.druid.query.aggregation.datasketches.quantiles.sql.DoublesSketchApproxQuantileSqlAggregator;
import org.apache.druid.query.aggregation.datasketches.quantiles.sql.DoublesSketchObjectSqlAggregator;
import org.apache.druid.query.aggregation.datasketches.quantiles.sql.DoublesSketchQuantileOperatorConversion;
import org.apache.druid.segment.serde.ComplexMetrics;
import org.apache.druid.sql.guice.SqlBindings;
@ -50,7 +52,10 @@ public class DoublesSketchModule implements DruidModule
public void configure(final Binder binder)
{
registerSerde();
SqlBindings.addAggregator(binder, DoublesSketchSqlAggregator.class);
SqlBindings.addAggregator(binder, DoublesSketchApproxQuantileSqlAggregator.class);
SqlBindings.addAggregator(binder, DoublesSketchObjectSqlAggregator.class);
SqlBindings.addOperatorConversion(binder, DoublesSketchQuantileOperatorConversion.class);
}
@Override

View File

@ -51,9 +51,9 @@ import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.List;
public class DoublesSketchSqlAggregator implements SqlAggregator
public class DoublesSketchApproxQuantileSqlAggregator implements SqlAggregator
{
private static final SqlAggFunction FUNCTION_INSTANCE = new DoublesSketchSqlAggFunction();
private static final SqlAggFunction FUNCTION_INSTANCE = new DoublesSketchApproxQuantileSqlAggFunction();
private static final String NAME = "APPROX_QUANTILE_DS";
@Override
@ -206,12 +206,12 @@ public class DoublesSketchSqlAggregator implements SqlAggregator
);
}
private static class DoublesSketchSqlAggFunction extends SqlAggFunction
private static class DoublesSketchApproxQuantileSqlAggFunction extends SqlAggFunction
{
private static final String SIGNATURE1 = "'" + NAME + "(column, probability)'\n";
private static final String SIGNATURE2 = "'" + NAME + "(column, probability, k)'\n";
DoublesSketchSqlAggFunction()
DoublesSketchApproxQuantileSqlAggFunction()
{
super(
NAME,

View File

@ -0,0 +1,166 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.query.aggregation.datasketches.quantiles.sql;
import com.google.common.collect.ImmutableList;
import org.apache.calcite.rel.core.AggregateCall;
import org.apache.calcite.rel.core.Project;
import org.apache.calcite.rex.RexBuilder;
import org.apache.calcite.rex.RexLiteral;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.sql.SqlAggFunction;
import org.apache.calcite.sql.SqlFunctionCategory;
import org.apache.calcite.sql.SqlKind;
import org.apache.calcite.sql.type.OperandTypes;
import org.apache.calcite.sql.type.ReturnTypes;
import org.apache.calcite.sql.type.SqlTypeFamily;
import org.apache.calcite.sql.type.SqlTypeName;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.datasketches.quantiles.DoublesSketchAggregatorFactory;
import org.apache.druid.segment.VirtualColumn;
import org.apache.druid.sql.calcite.aggregation.Aggregation;
import org.apache.druid.sql.calcite.aggregation.SqlAggregator;
import org.apache.druid.sql.calcite.expression.DruidExpression;
import org.apache.druid.sql.calcite.expression.Expressions;
import org.apache.druid.sql.calcite.planner.PlannerContext;
import org.apache.druid.sql.calcite.rel.VirtualColumnRegistry;
import org.apache.druid.sql.calcite.table.RowSignature;
import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.List;
public class DoublesSketchObjectSqlAggregator implements SqlAggregator
{
private static final SqlAggFunction FUNCTION_INSTANCE = new DoublesSketchSqlAggFunction();
private static final String NAME = "DS_QUANTILES_SKETCH";
@Override
public SqlAggFunction calciteFunction()
{
return FUNCTION_INSTANCE;
}
@Nullable
@Override
public Aggregation toDruidAggregation(
final PlannerContext plannerContext,
final RowSignature rowSignature,
final VirtualColumnRegistry virtualColumnRegistry,
final RexBuilder rexBuilder,
final String name,
final AggregateCall aggregateCall,
final Project project,
final List<Aggregation> existingAggregations,
final boolean finalizeAggregations
)
{
final DruidExpression input = Expressions.toDruidExpression(
plannerContext,
rowSignature,
Expressions.fromFieldAccess(
rowSignature,
project,
aggregateCall.getArgList().get(0)
)
);
if (input == null) {
return null;
}
final AggregatorFactory aggregatorFactory;
final String histogramName = StringUtils.format("%s:agg", name);
final int k;
if (aggregateCall.getArgList().size() >= 2) {
final RexNode resolutionArg = Expressions.fromFieldAccess(
rowSignature,
project,
aggregateCall.getArgList().get(1)
);
if (!resolutionArg.isA(SqlKind.LITERAL)) {
// Resolution must be a literal in order to plan.
return null;
}
k = ((Number) RexLiteral.value(resolutionArg)).intValue();
} else {
k = DoublesSketchAggregatorFactory.DEFAULT_K;
}
// No existing match found. Create a new one.
final List<VirtualColumn> virtualColumns = new ArrayList<>();
if (input.isDirectColumnAccess()) {
aggregatorFactory = new DoublesSketchAggregatorFactory(
histogramName,
input.getDirectColumn(),
k
);
} else {
VirtualColumn virtualColumn = virtualColumnRegistry.getOrCreateVirtualColumnForExpression(
plannerContext,
input,
SqlTypeName.FLOAT
);
virtualColumns.add(virtualColumn);
aggregatorFactory = new DoublesSketchAggregatorFactory(
histogramName,
virtualColumn.getOutputName(),
k
);
}
return Aggregation.create(
virtualColumns,
ImmutableList.of(aggregatorFactory),
null
);
}
private static class DoublesSketchSqlAggFunction extends SqlAggFunction
{
private static final String SIGNATURE1 = "'" + NAME + "(column)'\n";
private static final String SIGNATURE2 = "'" + NAME + "(column, k)'\n";
DoublesSketchSqlAggFunction()
{
super(
NAME,
null,
SqlKind.OTHER_FUNCTION,
ReturnTypes.explicit(SqlTypeName.OTHER),
null,
OperandTypes.or(
OperandTypes.ANY,
OperandTypes.and(
OperandTypes.sequence(SIGNATURE2, OperandTypes.ANY, OperandTypes.LITERAL),
OperandTypes.family(SqlTypeFamily.ANY, SqlTypeFamily.EXACT_NUMERIC)
)
),
SqlFunctionCategory.USER_DEFINED_FUNCTION,
false,
false
);
}
}
}

View File

@ -0,0 +1,107 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.query.aggregation.datasketches.quantiles.sql;
import org.apache.calcite.rex.RexCall;
import org.apache.calcite.rex.RexLiteral;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.sql.SqlFunction;
import org.apache.calcite.sql.SqlKind;
import org.apache.calcite.sql.SqlOperator;
import org.apache.calcite.sql.type.SqlTypeFamily;
import org.apache.calcite.sql.type.SqlTypeName;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.query.aggregation.PostAggregator;
import org.apache.druid.query.aggregation.datasketches.quantiles.DoublesSketchToQuantilePostAggregator;
import org.apache.druid.sql.calcite.expression.DirectOperatorConversion;
import org.apache.druid.sql.calcite.expression.DruidExpression;
import org.apache.druid.sql.calcite.expression.OperatorConversions;
import org.apache.druid.sql.calcite.expression.PostAggregatorVisitor;
import org.apache.druid.sql.calcite.planner.PlannerContext;
import org.apache.druid.sql.calcite.table.RowSignature;
import javax.annotation.Nullable;
import java.util.List;
public class DoublesSketchQuantileOperatorConversion extends DirectOperatorConversion
{
private static final String FUNCTION_NAME = "DS_GET_QUANTILE";
private static final SqlFunction SQL_FUNCTION = OperatorConversions
.operatorBuilder(StringUtils.toUpperCase(FUNCTION_NAME))
.operandTypes(SqlTypeFamily.ANY, SqlTypeFamily.NUMERIC)
.returnType(SqlTypeName.DOUBLE)
.build();
public DoublesSketchQuantileOperatorConversion()
{
super(SQL_FUNCTION, FUNCTION_NAME);
}
@Override
public SqlOperator calciteOperator()
{
return SQL_FUNCTION;
}
@Override
public DruidExpression toDruidExpression(
PlannerContext plannerContext,
RowSignature rowSignature,
RexNode rexNode
)
{
return null;
}
@Nullable
@Override
public PostAggregator toPostAggregator(
PlannerContext plannerContext,
RowSignature rowSignature,
RexNode rexNode,
PostAggregatorVisitor postAggregatorVisitor
)
{
final List<RexNode> operands = ((RexCall) rexNode).getOperands();
final PostAggregator firstOperand = OperatorConversions.toPostAggregator(
plannerContext,
rowSignature,
operands.get(0),
postAggregatorVisitor
);
if (firstOperand == null) {
return null;
}
if (!operands.get(1).isA(SqlKind.LITERAL)) {
return null;
}
final float probability = ((Number) RexLiteral.value(operands.get(1))).floatValue();
return new DoublesSketchToQuantilePostAggregator(
postAggregatorVisitor.getOutputNamePrefix() + postAggregatorVisitor.getAndIncrementCounter(),
firstOperand,
probability
);
}
}

View File

@ -25,7 +25,13 @@ import com.fasterxml.jackson.databind.module.SimpleModule;
import com.google.common.annotations.VisibleForTesting;
import com.google.inject.Binder;
import org.apache.druid.initialization.DruidModule;
import org.apache.druid.query.aggregation.datasketches.theta.sql.ThetaSketchSqlAggregator;
import org.apache.druid.query.aggregation.datasketches.theta.sql.ThetaSketchApproxCountDistinctSqlAggregator;
import org.apache.druid.query.aggregation.datasketches.theta.sql.ThetaSketchEstimateOperatorConversion;
import org.apache.druid.query.aggregation.datasketches.theta.sql.ThetaSketchEstimateWithErrorBoundsOperatorConversion;
import org.apache.druid.query.aggregation.datasketches.theta.sql.ThetaSketchObjectSqlAggregator;
import org.apache.druid.query.aggregation.datasketches.theta.sql.ThetaSketchSetIntersectOperatorConversion;
import org.apache.druid.query.aggregation.datasketches.theta.sql.ThetaSketchSetNotOperatorConversion;
import org.apache.druid.query.aggregation.datasketches.theta.sql.ThetaSketchSetUnionOperatorConversion;
import org.apache.druid.segment.serde.ComplexMetrics;
import org.apache.druid.sql.guice.SqlBindings;
@ -48,7 +54,14 @@ public class SketchModule implements DruidModule
public void configure(Binder binder)
{
registerSerde();
SqlBindings.addAggregator(binder, ThetaSketchSqlAggregator.class);
SqlBindings.addAggregator(binder, ThetaSketchApproxCountDistinctSqlAggregator.class);
SqlBindings.addAggregator(binder, ThetaSketchObjectSqlAggregator.class);
SqlBindings.addOperatorConversion(binder, ThetaSketchEstimateOperatorConversion.class);
SqlBindings.addOperatorConversion(binder, ThetaSketchEstimateWithErrorBoundsOperatorConversion.class);
SqlBindings.addOperatorConversion(binder, ThetaSketchSetIntersectOperatorConversion.class);
SqlBindings.addOperatorConversion(binder, ThetaSketchSetUnionOperatorConversion.class);
SqlBindings.addOperatorConversion(binder, ThetaSketchSetNotOperatorConversion.class);
}
@Override

View File

@ -0,0 +1,93 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.query.aggregation.datasketches.theta.sql;
import org.apache.calcite.sql.SqlAggFunction;
import org.apache.calcite.sql.SqlFunctionCategory;
import org.apache.calcite.sql.SqlKind;
import org.apache.calcite.sql.type.InferTypes;
import org.apache.calcite.sql.type.OperandTypes;
import org.apache.calcite.sql.type.ReturnTypes;
import org.apache.calcite.sql.type.SqlTypeFamily;
import org.apache.calcite.sql.type.SqlTypeName;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.post.FinalizingFieldAccessPostAggregator;
import org.apache.druid.segment.VirtualColumn;
import org.apache.druid.sql.calcite.aggregation.Aggregation;
import org.apache.druid.sql.calcite.aggregation.SqlAggregator;
import java.util.Collections;
import java.util.List;
public class ThetaSketchApproxCountDistinctSqlAggregator extends ThetaSketchBaseSqlAggregator implements SqlAggregator
{
private static final SqlAggFunction FUNCTION_INSTANCE = new ThetaSketchSqlAggFunction();
private static final String NAME = "APPROX_COUNT_DISTINCT_DS_THETA";
@Override
public SqlAggFunction calciteFunction()
{
return FUNCTION_INSTANCE;
}
@Override
protected Aggregation toAggregation(
String name,
boolean finalizeAggregations,
List<VirtualColumn> virtualColumns,
AggregatorFactory aggregatorFactory
)
{
return Aggregation.create(
virtualColumns,
Collections.singletonList(aggregatorFactory),
finalizeAggregations ? new FinalizingFieldAccessPostAggregator(
name,
aggregatorFactory.getName()
) : null
);
}
private static class ThetaSketchSqlAggFunction extends SqlAggFunction
{
private static final String SIGNATURE = "'" + NAME + "(column, size)'\n";
ThetaSketchSqlAggFunction()
{
super(
NAME,
null,
SqlKind.OTHER_FUNCTION,
ReturnTypes.explicit(SqlTypeName.BIGINT),
InferTypes.VARCHAR_1024,
OperandTypes.or(
OperandTypes.ANY,
OperandTypes.and(
OperandTypes.sequence(SIGNATURE, OperandTypes.ANY, OperandTypes.LITERAL),
OperandTypes.family(SqlTypeFamily.ANY, SqlTypeFamily.NUMERIC)
)
),
SqlFunctionCategory.NUMERIC,
false,
false
);
}
}
}

View File

@ -24,19 +24,12 @@ import org.apache.calcite.rel.core.Project;
import org.apache.calcite.rex.RexBuilder;
import org.apache.calcite.rex.RexLiteral;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.sql.SqlAggFunction;
import org.apache.calcite.sql.SqlFunctionCategory;
import org.apache.calcite.sql.SqlKind;
import org.apache.calcite.sql.type.InferTypes;
import org.apache.calcite.sql.type.OperandTypes;
import org.apache.calcite.sql.type.ReturnTypes;
import org.apache.calcite.sql.type.SqlTypeFamily;
import org.apache.calcite.sql.type.SqlTypeName;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.datasketches.theta.SketchAggregatorFactory;
import org.apache.druid.query.aggregation.datasketches.theta.SketchMergeAggregatorFactory;
import org.apache.druid.query.aggregation.post.FinalizingFieldAccessPostAggregator;
import org.apache.druid.query.dimension.DefaultDimensionSpec;
import org.apache.druid.query.dimension.DimensionSpec;
import org.apache.druid.segment.VirtualColumn;
@ -52,20 +45,10 @@ import org.apache.druid.sql.calcite.table.RowSignature;
import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
public class ThetaSketchSqlAggregator implements SqlAggregator
public abstract class ThetaSketchBaseSqlAggregator implements SqlAggregator
{
private static final SqlAggFunction FUNCTION_INSTANCE = new ThetaSketchSqlAggFunction();
private static final String NAME = "APPROX_COUNT_DISTINCT_DS_THETA";
@Override
public SqlAggFunction calciteFunction()
{
return FUNCTION_INSTANCE;
}
@Nullable
@Override
public Aggregation toDruidAggregation(
@ -155,39 +138,18 @@ public class ThetaSketchSqlAggregator implements SqlAggregator
);
}
return Aggregation.create(
return toAggregation(
name,
finalizeAggregations,
virtualColumns,
Collections.singletonList(aggregatorFactory),
finalizeAggregations ? new FinalizingFieldAccessPostAggregator(
name,
aggregatorFactory.getName()
) : null
aggregatorFactory
);
}
private static class ThetaSketchSqlAggFunction extends SqlAggFunction
{
private static final String SIGNATURE = "'" + NAME + "(column, size)'\n";
ThetaSketchSqlAggFunction()
{
super(
NAME,
null,
SqlKind.OTHER_FUNCTION,
ReturnTypes.explicit(SqlTypeName.BIGINT),
InferTypes.VARCHAR_1024,
OperandTypes.or(
OperandTypes.ANY,
OperandTypes.and(
OperandTypes.sequence(SIGNATURE, OperandTypes.ANY, OperandTypes.LITERAL),
OperandTypes.family(SqlTypeFamily.ANY, SqlTypeFamily.NUMERIC)
)
),
SqlFunctionCategory.NUMERIC,
false,
false
);
}
}
protected abstract Aggregation toAggregation(
String name,
boolean finalizeAggregations,
List<VirtualColumn> virtualColumns,
AggregatorFactory aggregatorFactory
);
}

View File

@ -0,0 +1,99 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.query.aggregation.datasketches.theta.sql;
import org.apache.calcite.rex.RexCall;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.sql.SqlFunction;
import org.apache.calcite.sql.SqlOperator;
import org.apache.calcite.sql.type.ReturnTypes;
import org.apache.calcite.sql.type.SqlTypeFamily;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.query.aggregation.PostAggregator;
import org.apache.druid.query.aggregation.datasketches.theta.SketchEstimatePostAggregator;
import org.apache.druid.sql.calcite.expression.DirectOperatorConversion;
import org.apache.druid.sql.calcite.expression.DruidExpression;
import org.apache.druid.sql.calcite.expression.OperatorConversions;
import org.apache.druid.sql.calcite.expression.PostAggregatorVisitor;
import org.apache.druid.sql.calcite.planner.PlannerContext;
import org.apache.druid.sql.calcite.table.RowSignature;
import javax.annotation.Nullable;
import java.util.List;
public class ThetaSketchEstimateOperatorConversion extends DirectOperatorConversion
{
private static final String FUNCTION_NAME = "THETA_SKETCH_ESTIMATE";
private static final SqlFunction SQL_FUNCTION = OperatorConversions
.operatorBuilder(StringUtils.toUpperCase(FUNCTION_NAME))
.operandTypes(SqlTypeFamily.ANY)
.returnTypeInference(ReturnTypes.DOUBLE)
.build();
public ThetaSketchEstimateOperatorConversion()
{
super(SQL_FUNCTION, FUNCTION_NAME);
}
@Override
public SqlOperator calciteOperator()
{
return SQL_FUNCTION;
}
@Override
public DruidExpression toDruidExpression(
PlannerContext plannerContext,
RowSignature rowSignature,
RexNode rexNode
)
{
return null;
}
@Nullable
@Override
public PostAggregator toPostAggregator(
PlannerContext plannerContext,
RowSignature rowSignature,
RexNode rexNode,
PostAggregatorVisitor postAggregatorVisitor
)
{
final List<RexNode> operands = ((RexCall) rexNode).getOperands();
final PostAggregator firstOperand = OperatorConversions.toPostAggregator(
plannerContext,
rowSignature,
operands.get(0),
postAggregatorVisitor
);
if (firstOperand == null) {
return null;
}
return new SketchEstimatePostAggregator(
postAggregatorVisitor.getOutputNamePrefix() + postAggregatorVisitor.getAndIncrementCounter(),
firstOperand,
null
);
}
}

View File

@ -0,0 +1,107 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.query.aggregation.datasketches.theta.sql;
import org.apache.calcite.rex.RexCall;
import org.apache.calcite.rex.RexLiteral;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.sql.SqlFunction;
import org.apache.calcite.sql.SqlKind;
import org.apache.calcite.sql.SqlOperator;
import org.apache.calcite.sql.type.SqlTypeFamily;
import org.apache.calcite.sql.type.SqlTypeName;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.query.aggregation.PostAggregator;
import org.apache.druid.query.aggregation.datasketches.theta.SketchEstimatePostAggregator;
import org.apache.druid.sql.calcite.expression.DirectOperatorConversion;
import org.apache.druid.sql.calcite.expression.DruidExpression;
import org.apache.druid.sql.calcite.expression.OperatorConversions;
import org.apache.druid.sql.calcite.expression.PostAggregatorVisitor;
import org.apache.druid.sql.calcite.planner.PlannerContext;
import org.apache.druid.sql.calcite.table.RowSignature;
import javax.annotation.Nullable;
import java.util.List;
public class ThetaSketchEstimateWithErrorBoundsOperatorConversion extends DirectOperatorConversion
{
private static final String FUNCTION_NAME = "THETA_SKETCH_ESTIMATE_WITH_ERROR_BOUNDS";
private static final SqlFunction SQL_FUNCTION = OperatorConversions
.operatorBuilder(StringUtils.toUpperCase(FUNCTION_NAME))
.operandTypes(SqlTypeFamily.ANY, SqlTypeFamily.INTEGER)
.returnType(SqlTypeName.OTHER)
.build();
public ThetaSketchEstimateWithErrorBoundsOperatorConversion()
{
super(SQL_FUNCTION, FUNCTION_NAME);
}
@Override
public SqlOperator calciteOperator()
{
return SQL_FUNCTION;
}
@Override
public DruidExpression toDruidExpression(
PlannerContext plannerContext,
RowSignature rowSignature,
RexNode rexNode
)
{
return null;
}
@Nullable
@Override
public PostAggregator toPostAggregator(
PlannerContext plannerContext,
RowSignature rowSignature,
RexNode rexNode,
PostAggregatorVisitor postAggregatorVisitor
)
{
final List<RexNode> operands = ((RexCall) rexNode).getOperands();
final PostAggregator firstOperand = OperatorConversions.toPostAggregator(
plannerContext,
rowSignature,
operands.get(0),
postAggregatorVisitor
);
if (firstOperand == null) {
return null;
}
if (!operands.get(1).isA(SqlKind.LITERAL)) {
return null;
}
final int errorBoundsStdDev = ((Number) RexLiteral.value(operands.get(1))).intValue();
return new SketchEstimatePostAggregator(
postAggregatorVisitor.getOutputNamePrefix() + postAggregatorVisitor.getAndIncrementCounter(),
firstOperand,
errorBoundsStdDev
);
}
}

View File

@ -0,0 +1,89 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.query.aggregation.datasketches.theta.sql;
import org.apache.calcite.sql.SqlAggFunction;
import org.apache.calcite.sql.SqlFunctionCategory;
import org.apache.calcite.sql.SqlKind;
import org.apache.calcite.sql.type.InferTypes;
import org.apache.calcite.sql.type.OperandTypes;
import org.apache.calcite.sql.type.ReturnTypes;
import org.apache.calcite.sql.type.SqlTypeFamily;
import org.apache.calcite.sql.type.SqlTypeName;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.segment.VirtualColumn;
import org.apache.druid.sql.calcite.aggregation.Aggregation;
import org.apache.druid.sql.calcite.aggregation.SqlAggregator;
import java.util.Collections;
import java.util.List;
public class ThetaSketchObjectSqlAggregator extends ThetaSketchBaseSqlAggregator implements SqlAggregator
{
private static final SqlAggFunction FUNCTION_INSTANCE = new ThetaSketchObjectSqlAggFunction();
private static final String NAME = "DS_THETA";
@Override
public SqlAggFunction calciteFunction()
{
return FUNCTION_INSTANCE;
}
@Override
protected Aggregation toAggregation(
String name,
boolean finalizeAggregations,
List<VirtualColumn> virtualColumns,
AggregatorFactory aggregatorFactory
)
{
return Aggregation.create(
virtualColumns,
Collections.singletonList(aggregatorFactory),
null
);
}
private static class ThetaSketchObjectSqlAggFunction extends SqlAggFunction
{
private static final String SIGNATURE = "'" + NAME + "(column, size)'\n";
ThetaSketchObjectSqlAggFunction()
{
super(
NAME,
null,
SqlKind.OTHER_FUNCTION,
ReturnTypes.explicit(SqlTypeName.OTHER),
InferTypes.VARCHAR_1024,
OperandTypes.or(
OperandTypes.ANY,
OperandTypes.and(
OperandTypes.sequence(SIGNATURE, OperandTypes.ANY, OperandTypes.LITERAL),
OperandTypes.family(SqlTypeFamily.ANY, SqlTypeFamily.NUMERIC)
)
),
SqlFunctionCategory.USER_DEFINED_FUNCTION,
false,
false
);
}
}
}

View File

@ -0,0 +1,141 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.query.aggregation.datasketches.theta.sql;
import org.apache.calcite.rex.RexCall;
import org.apache.calcite.rex.RexLiteral;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.sql.SqlFunction;
import org.apache.calcite.sql.SqlFunctionCategory;
import org.apache.calcite.sql.SqlKind;
import org.apache.calcite.sql.SqlOperator;
import org.apache.calcite.sql.type.OperandTypes;
import org.apache.calcite.sql.type.ReturnTypes;
import org.apache.calcite.sql.type.SqlOperandCountRanges;
import org.apache.calcite.sql.type.SqlTypeName;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.query.aggregation.PostAggregator;
import org.apache.druid.query.aggregation.datasketches.theta.SketchSetPostAggregator;
import org.apache.druid.sql.calcite.expression.DruidExpression;
import org.apache.druid.sql.calcite.expression.OperatorConversions;
import org.apache.druid.sql.calcite.expression.PostAggregatorVisitor;
import org.apache.druid.sql.calcite.expression.SqlOperatorConversion;
import org.apache.druid.sql.calcite.planner.Calcites;
import org.apache.druid.sql.calcite.planner.PlannerContext;
import org.apache.druid.sql.calcite.table.RowSignature;
import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.List;
public abstract class ThetaSketchSetBaseOperatorConversion implements SqlOperatorConversion
{
public ThetaSketchSetBaseOperatorConversion()
{
}
@Override
public SqlOperator calciteOperator()
{
return makeSqlFunction();
}
@Nullable
@Override
public DruidExpression toDruidExpression(
PlannerContext plannerContext,
RowSignature rowSignature,
RexNode rexNode
)
{
return null;
}
@Nullable
@Override
public PostAggregator toPostAggregator(
PlannerContext plannerContext,
RowSignature rowSignature,
RexNode rexNode,
PostAggregatorVisitor postAggregatorVisitor
)
{
final List<RexNode> operands = ((RexCall) rexNode).getOperands();
final List<PostAggregator> inputPostAggs = new ArrayList<>();
Integer size = null;
int operandCounter = 0;
for (RexNode operand : operands) {
final PostAggregator convertedPostAgg = OperatorConversions.toPostAggregator(
plannerContext,
rowSignature,
operand,
postAggregatorVisitor
);
if (convertedPostAgg == null) {
if (operandCounter == 0) {
try {
if (!operand.isA(SqlKind.LITERAL)) {
return null;
}
size = RexLiteral.intValue(operand);
}
catch (ClassCastException cce) {
return null;
}
} else {
return null;
}
} else {
inputPostAggs.add(convertedPostAgg);
operandCounter++;
}
}
return new SketchSetPostAggregator(
postAggregatorVisitor.getOutputNamePrefix() + postAggregatorVisitor.getAndIncrementCounter(),
getSetOperationName(),
size,
inputPostAggs
);
}
private SqlFunction makeSqlFunction()
{
return new SqlFunction(
getFunctionName(),
SqlKind.OTHER_FUNCTION,
ReturnTypes.explicit(
factory -> Calcites.createSqlType(factory, SqlTypeName.OTHER)
),
null,
OperandTypes.variadic(SqlOperandCountRanges.from(2)),
SqlFunctionCategory.USER_DEFINED_FUNCTION
);
}
public abstract String getSetOperationName();
public String getFunctionName()
{
return StringUtils.format("THETA_SKETCH_%s", getSetOperationName());
}
}

View File

@ -0,0 +1,29 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.query.aggregation.datasketches.theta.sql;
public class ThetaSketchSetIntersectOperatorConversion extends ThetaSketchSetBaseOperatorConversion
{
@Override
public String getSetOperationName()
{
return "INTERSECT";
}
}

View File

@ -0,0 +1,29 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.query.aggregation.datasketches.theta.sql;
public class ThetaSketchSetNotOperatorConversion extends ThetaSketchSetBaseOperatorConversion
{
@Override
public String getSetOperationName()
{
return "NOT";
}
}

View File

@ -0,0 +1,29 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.query.aggregation.datasketches.theta.sql;
public class ThetaSketchSetUnionOperatorConversion extends ThetaSketchSetBaseOperatorConversion
{
@Override
public String getSetOperationName()
{
return "UNION";
}
}

View File

@ -26,6 +26,7 @@ import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.query.Druids;
@ -39,7 +40,11 @@ import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
import org.apache.druid.query.aggregation.datasketches.hll.HllSketchBuildAggregatorFactory;
import org.apache.druid.query.aggregation.datasketches.hll.HllSketchMergeAggregatorFactory;
import org.apache.druid.query.aggregation.datasketches.hll.HllSketchModule;
import org.apache.druid.query.aggregation.datasketches.hll.HllSketchToEstimatePostAggregator;
import org.apache.druid.query.aggregation.datasketches.hll.HllSketchToEstimateWithBoundsPostAggregator;
import org.apache.druid.query.aggregation.datasketches.hll.HllSketchToStringPostAggregator;
import org.apache.druid.query.aggregation.post.ArithmeticPostAggregator;
import org.apache.druid.query.aggregation.post.ExpressionPostAggregator;
import org.apache.druid.query.aggregation.post.FieldAccessPostAggregator;
import org.apache.druid.query.aggregation.post.FinalizingFieldAccessPostAggregator;
import org.apache.druid.query.dimension.DefaultDimensionSpec;
@ -48,6 +53,7 @@ import org.apache.druid.query.groupby.GroupByQuery;
import org.apache.druid.query.spec.MultipleIntervalSegmentSpec;
import org.apache.druid.segment.IndexBuilder;
import org.apache.druid.segment.QueryableIndex;
import org.apache.druid.segment.TestHelper;
import org.apache.druid.segment.column.ValueType;
import org.apache.druid.segment.incremental.IncrementalIndexSchema;
import org.apache.druid.segment.virtual.ExpressionVirtualColumn;
@ -116,7 +122,7 @@ public class HllSketchSqlAggregatorTest extends CalciteTestBase
public TemporaryFolder temporaryFolder = new TemporaryFolder();
@Rule
public QueryLogHook queryLogHook = QueryLogHook.create();
public QueryLogHook queryLogHook = QueryLogHook.create(TestHelper.JSON_MAPPER);
private SpecificSegmentsQuerySegmentWalker walker;
private SqlLifecycleFactory sqlLifecycleFactory;
@ -127,6 +133,7 @@ public class HllSketchSqlAggregatorTest extends CalciteTestBase
HllSketchModule.registerSerde();
for (Module mod : new HllSketchModule().getJacksonModules()) {
CalciteTests.getJsonMapper().registerModule(mod);
TestHelper.JSON_MAPPER.registerModule(mod);
}
final QueryableIndex index = IndexBuilder.create()
@ -165,8 +172,16 @@ public class HllSketchSqlAggregatorTest extends CalciteTestBase
final DruidSchema druidSchema = CalciteTests.createMockSchema(conglomerate, walker, plannerConfig);
final SystemSchema systemSchema = CalciteTests.createMockSystemSchema(druidSchema, walker, plannerConfig);
final DruidOperatorTable operatorTable = new DruidOperatorTable(
ImmutableSet.of(new HllSketchSqlAggregator()),
ImmutableSet.of()
ImmutableSet.of(
new HllSketchApproxCountDistinctSqlAggregator(),
new HllSketchObjectSqlAggregator()
),
ImmutableSet.of(
new HllSketchEstimateOperatorConversion(),
new HllSketchEstimateWithErrorBoundsOperatorConversion(),
new HllSketchSetUnionOperatorConversion(),
new HllSketchToStringOperatorConversion()
)
);
sqlLifecycleFactory = CalciteTests.createSqlLifecycleFactory(
@ -419,4 +434,229 @@ public class HllSketchSqlAggregatorTest extends CalciteTestBase
final int expected = NullHandling.replaceWithDefault() ? 1 : 2;
Assert.assertEquals(expected, results.size());
}
@Test
public void testHllSketchPostAggs() throws Exception
{
SqlLifecycle sqlLifecycle = sqlLifecycleFactory.factorize();
final String sql = "SELECT\n"
+ " DS_HLL(dim2),\n"
+ " DS_HLL(m1),\n"
+ " HLL_SKETCH_ESTIMATE(DS_HLL(dim2)),\n"
+ " HLL_SKETCH_ESTIMATE(DS_HLL(dim2)) + 1,\n"
+ " HLL_SKETCH_ESTIMATE(DS_HLL(CONCAT(dim2, 'hello'))),\n"
+ " ABS(HLL_SKETCH_ESTIMATE(DS_HLL(dim2))),\n"
+ " HLL_SKETCH_ESTIMATE_WITH_ERROR_BOUNDS(DS_HLL(dim2), 2),\n"
+ " HLL_SKETCH_ESTIMATE_WITH_ERROR_BOUNDS(DS_HLL(dim2)),\n"
+ " DS_HLL(POWER(ABS(m1 + 100), 2)),\n"
+ " APPROX_COUNT_DISTINCT_DS_HLL(dim2),\n"
+ " HLL_SKETCH_TO_STRING(DS_HLL(dim2)),\n"
+ " UPPER(HLL_SKETCH_TO_STRING(DS_HLL(dim2))),\n"
+ " HLL_SKETCH_ESTIMATE(DS_HLL(dim2), true)\n"
+ "FROM druid.foo";
// Verify results
final List<Object[]> results = sqlLifecycle.runSimple(sql, QUERY_CONTEXT_DEFAULT, authenticationResult).toList();
final List<Object[]> expectedResults = ImmutableList.of(
new Object[]{
"\"AgEHDAMIAgDhUv8P63iABQ==\"",
"\"AgEHDAMIBgALpZ0PjpTfBY5ElQo+C7UE4jA+DKfcYQQ=\"",
2.000000004967054d,
3.000000004967054d,
3.000000014901161d,
2.000000004967054d,
"[2.000000004967054,2.0,2.0001997319422404]",
"[2.000000004967054,2.0,2.000099863468538]",
"\"AgEHDAMIBgC1EYgH1mlHBwsKPwu5SK8MIiUxB7iZVwU=\"",
2L,
"### HLL SKETCH SUMMARY: \n"
+ " Log Config K : 12\n"
+ " Hll Target : HLL_4\n"
+ " Current Mode : LIST\n"
+ " LB : 2.0\n"
+ " Estimate : 2.000000004967054\n"
+ " UB : 2.000099863468538\n"
+ " OutOfOrder Flag: false\n"
+ " Coupon Count : 2\n",
"### HLL SKETCH SUMMARY: \n"
+ " LOG CONFIG K : 12\n"
+ " HLL TARGET : HLL_4\n"
+ " CURRENT MODE : LIST\n"
+ " LB : 2.0\n"
+ " ESTIMATE : 2.000000004967054\n"
+ " UB : 2.000099863468538\n"
+ " OUTOFORDER FLAG: FALSE\n"
+ " COUPON COUNT : 2\n",
2.0
}
);
Assert.assertEquals(expectedResults.size(), results.size());
for (int i = 0; i < expectedResults.size(); i++) {
Assert.assertArrayEquals(expectedResults.get(i), results.get(i));
}
Query actualQuery = Iterables.getOnlyElement(queryLogHook.getRecordedQueries());
Query expectedQuery =
Druids.newTimeseriesQueryBuilder()
.dataSource(CalciteTests.DATASOURCE1)
.intervals(new MultipleIntervalSegmentSpec(ImmutableList.of(Filtration.eternity())))
.granularity(Granularities.ALL)
.virtualColumns(
new ExpressionVirtualColumn(
"v0",
"concat(\"dim2\",'hello')",
ValueType.STRING,
TestExprMacroTable.INSTANCE
),
new ExpressionVirtualColumn(
"v1",
"pow(abs((\"m1\" + 100)),2)",
ValueType.DOUBLE,
TestExprMacroTable.INSTANCE
)
)
.aggregators(
ImmutableList.of(
new HllSketchBuildAggregatorFactory(
"a0",
"dim2",
null,
null,
true
),
new HllSketchBuildAggregatorFactory(
"a1",
"m1",
null,
null,
true
),
new HllSketchBuildAggregatorFactory(
"a2",
"v0",
null,
null,
true
),
new HllSketchBuildAggregatorFactory(
"a3",
"v1",
null,
null,
true
),
new HllSketchBuildAggregatorFactory(
"a4",
"dim2",
null,
null,
true
)
)
)
.postAggregators(
ImmutableList.of(
new FieldAccessPostAggregator("p0", "a0"),
new FieldAccessPostAggregator("p1", "a1"),
new HllSketchToEstimatePostAggregator("p3", new FieldAccessPostAggregator("p2", "a0"), false),
new HllSketchToEstimatePostAggregator("p5", new FieldAccessPostAggregator("p4", "a0"), false),
new ExpressionPostAggregator("p6", "(p5 + 1)", null, TestExprMacroTable.INSTANCE),
new HllSketchToEstimatePostAggregator("p8", new FieldAccessPostAggregator("p7", "a2"), false),
new HllSketchToEstimatePostAggregator("p10", new FieldAccessPostAggregator("p9", "a0"), false),
new ExpressionPostAggregator("p11", "abs(p10)", null, TestExprMacroTable.INSTANCE),
new HllSketchToEstimateWithBoundsPostAggregator(
"p13",
new FieldAccessPostAggregator("p12", "a0"),
2
),
new HllSketchToEstimateWithBoundsPostAggregator(
"p15",
new FieldAccessPostAggregator("p14", "a0"),
1
),
new FieldAccessPostAggregator("p16", "a3"),
new HllSketchToStringPostAggregator("p18", new FieldAccessPostAggregator("p17", "a0")),
new HllSketchToStringPostAggregator("p20", new FieldAccessPostAggregator("p19", "a0")),
new ExpressionPostAggregator("p21", "upper(p20)", null, TestExprMacroTable.INSTANCE),
new HllSketchToEstimatePostAggregator("p23", new FieldAccessPostAggregator("p22", "a0"), true)
)
)
.context(ImmutableMap.of(
"skipEmptyBuckets", true,
PlannerContext.CTX_SQL_QUERY_ID, "dummy"
))
.build();
// Verify query
Assert.assertEquals(expectedQuery, actualQuery);
}
@Test
public void testtHllSketchPostAggsPostSort() throws Exception
{
SqlLifecycle sqlLifecycle = sqlLifecycleFactory.factorize();
final String sql = "SELECT DS_HLL(dim2) as y FROM druid.foo ORDER BY HLL_SKETCH_ESTIMATE(DS_HLL(dim2)) DESC LIMIT 10";
final String sql2 = StringUtils.format("SELECT HLL_SKETCH_ESTIMATE(y), HLL_SKETCH_TO_STRING(y) from (%s)", sql);
// Verify results
final List<Object[]> results = sqlLifecycle.runSimple(sql2, QUERY_CONTEXT_DEFAULT, authenticationResult).toList();
final List<Object[]> expectedResults = ImmutableList.of(
new Object[]{
2.000000004967054d,
"### HLL SKETCH SUMMARY: \n"
+ " Log Config K : 12\n"
+ " Hll Target : HLL_4\n"
+ " Current Mode : LIST\n"
+ " LB : 2.0\n"
+ " Estimate : 2.000000004967054\n"
+ " UB : 2.000099863468538\n"
+ " OutOfOrder Flag: false\n"
+ " Coupon Count : 2\n"
}
);
Assert.assertEquals(expectedResults.size(), results.size());
for (int i = 0; i < expectedResults.size(); i++) {
Assert.assertArrayEquals(expectedResults.get(i), results.get(i));
}
Query actualQuery = Iterables.getOnlyElement(queryLogHook.getRecordedQueries());
Query expectedQuery =
Druids.newTimeseriesQueryBuilder()
.dataSource(CalciteTests.DATASOURCE1)
.intervals(new MultipleIntervalSegmentSpec(ImmutableList.of(Filtration.eternity())))
.granularity(Granularities.ALL)
.aggregators(
ImmutableList.of(
new HllSketchBuildAggregatorFactory(
"a0",
"dim2",
null,
null,
true
)
)
)
.postAggregators(
ImmutableList.of(
new FieldAccessPostAggregator("p0", "a0"),
new HllSketchToEstimatePostAggregator("p2", new FieldAccessPostAggregator("p1", "a0"), false),
new HllSketchToEstimatePostAggregator("s1", new FieldAccessPostAggregator("s0", "p0"), false),
new HllSketchToStringPostAggregator("s3", new FieldAccessPostAggregator("s2", "p0"))
)
)
.context(ImmutableMap.of(
"skipEmptyBuckets", true,
PlannerContext.CTX_SQL_QUERY_ID, "dummy"
))
.build();
// Verify query
Assert.assertEquals(expectedQuery, actualQuery);
}
}

View File

@ -26,19 +26,23 @@ import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.query.Druids;
import org.apache.druid.query.Query;
import org.apache.druid.query.QueryDataSource;
import org.apache.druid.query.QueryRunnerFactoryConglomerate;
import org.apache.druid.query.aggregation.CountAggregatorFactory;
import org.apache.druid.query.aggregation.DoubleSumAggregatorFactory;
import org.apache.druid.query.aggregation.FilteredAggregatorFactory;
import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
import org.apache.druid.query.aggregation.PostAggregator;
import org.apache.druid.query.aggregation.datasketches.quantiles.DoublesSketchAggregatorFactory;
import org.apache.druid.query.aggregation.datasketches.quantiles.DoublesSketchModule;
import org.apache.druid.query.aggregation.datasketches.quantiles.DoublesSketchToQuantilePostAggregator;
import org.apache.druid.query.aggregation.post.ArithmeticPostAggregator;
import org.apache.druid.query.aggregation.post.ExpressionPostAggregator;
import org.apache.druid.query.aggregation.post.FieldAccessPostAggregator;
import org.apache.druid.query.dimension.DefaultDimensionSpec;
import org.apache.druid.query.expression.TestExprMacroTable;
@ -48,6 +52,7 @@ import org.apache.druid.query.groupby.GroupByQuery;
import org.apache.druid.query.spec.MultipleIntervalSegmentSpec;
import org.apache.druid.segment.IndexBuilder;
import org.apache.druid.segment.QueryableIndex;
import org.apache.druid.segment.TestHelper;
import org.apache.druid.segment.column.ValueType;
import org.apache.druid.segment.incremental.IncrementalIndexSchema;
import org.apache.druid.segment.virtual.ExpressionVirtualColumn;
@ -123,6 +128,7 @@ public class DoublesSketchSqlAggregatorTest extends CalciteTestBase
DoublesSketchModule.registerSerde();
for (Module mod : new DoublesSketchModule().getJacksonModules()) {
CalciteTests.getJsonMapper().registerModule(mod);
TestHelper.JSON_MAPPER.registerModule(mod);
}
final QueryableIndex index =
@ -160,8 +166,13 @@ public class DoublesSketchSqlAggregatorTest extends CalciteTestBase
final DruidSchema druidSchema = CalciteTests.createMockSchema(conglomerate, walker, plannerConfig);
final SystemSchema systemSchema = CalciteTests.createMockSystemSchema(druidSchema, walker, plannerConfig);
final DruidOperatorTable operatorTable = new DruidOperatorTable(
ImmutableSet.of(new DoublesSketchSqlAggregator()),
ImmutableSet.of()
ImmutableSet.of(
new DoublesSketchApproxQuantileSqlAggregator(),
new DoublesSketchObjectSqlAggregator()
),
ImmutableSet.of(
new DoublesSketchQuantileOperatorConversion()
)
);
sqlLifecycleFactory = CalciteTests.createSqlLifecycleFactory(
@ -469,6 +480,167 @@ public class DoublesSketchSqlAggregatorTest extends CalciteTestBase
);
}
@Test
public void testDoublesSketchPostAggs() throws Exception
{
SqlLifecycle sqlLifecycle = sqlLifecycleFactory.factorize();
final String sql = "SELECT\n"
+ " SUM(cnt),\n"
+ " APPROX_QUANTILE_DS(cnt, 0.5) + 1,\n"
+ " DS_GET_QUANTILE(DS_QUANTILES_SKETCH(cnt), 0.5) + 1000,\n"
+ " DS_GET_QUANTILE(DS_QUANTILES_SKETCH(cnt + 123), 0.5) + 1000,\n"
+ " ABS(DS_GET_QUANTILE(DS_QUANTILES_SKETCH(cnt), 0.5))\n"
+ "FROM foo";
// Verify results
final List<Object[]> results = sqlLifecycle.runSimple(sql, QUERY_CONTEXT_DEFAULT, authenticationResult).toList();
final List<Object[]> expectedResults = ImmutableList.of(
new Object[]{
6L,
2.0d,
1001.0d,
1124.0d,
1.0d
}
);
Assert.assertEquals(expectedResults.size(), results.size());
for (int i = 0; i < expectedResults.size(); i++) {
Assert.assertArrayEquals(expectedResults.get(i), results.get(i));
}
Query actualQuery = Iterables.getOnlyElement(queryLogHook.getRecordedQueries());
Query expectedQuery = Druids.newTimeseriesQueryBuilder()
.dataSource(CalciteTests.DATASOURCE1)
.intervals(new MultipleIntervalSegmentSpec(ImmutableList.of(Filtration.eternity())))
.granularity(Granularities.ALL)
.virtualColumns(
new ExpressionVirtualColumn(
"v0",
"(\"cnt\" + 123)",
ValueType.FLOAT,
TestExprMacroTable.INSTANCE
)
)
.aggregators(ImmutableList.of(
new LongSumAggregatorFactory("a0", "cnt"),
new DoublesSketchAggregatorFactory("a1:agg", "cnt", 128),
new DoublesSketchAggregatorFactory("a2:agg", "cnt", 128),
new DoublesSketchAggregatorFactory("a3:agg", "v0", 128)
))
.postAggregators(
new DoublesSketchToQuantilePostAggregator(
"a1",
makeFieldAccessPostAgg("a1:agg"),
0.5f
),
new ExpressionPostAggregator(
"p0",
"(\"a1\" + 1)",
null,
TestExprMacroTable.INSTANCE
),
new DoublesSketchToQuantilePostAggregator(
"p2",
new FieldAccessPostAggregator(
"p1",
"a2:agg"
),
0.5f
),
new ExpressionPostAggregator(
"p3",
"(p2 + 1000)",
null,
TestExprMacroTable.INSTANCE
),
new DoublesSketchToQuantilePostAggregator(
"p5",
new FieldAccessPostAggregator(
"p4",
"a3:agg"
),
0.5f
),
new ExpressionPostAggregator(
"p6",
"(p5 + 1000)",
null,
TestExprMacroTable.INSTANCE
),
new DoublesSketchToQuantilePostAggregator(
"p8",
new FieldAccessPostAggregator(
"p7",
"a2:agg"
),
0.5f
),
new ExpressionPostAggregator("p9", "abs(p8)", null, TestExprMacroTable.INSTANCE)
)
.context(ImmutableMap.of(
"skipEmptyBuckets",
true,
PlannerContext.CTX_SQL_QUERY_ID,
"dummy"
))
.build();
// Verify query
Assert.assertEquals(expectedQuery, actualQuery);
}
@Test
public void testDoublesSketchPostAggsPostSort() throws Exception
{
SqlLifecycle sqlLifecycle = sqlLifecycleFactory.factorize();
final String sql = "SELECT DS_QUANTILES_SKETCH(m1) as y FROM druid.foo ORDER BY DS_GET_QUANTILE(DS_QUANTILES_SKETCH(m1), 0.5) DESC LIMIT 10";
final String sql2 = StringUtils.format("SELECT DS_GET_QUANTILE(y, 0.5), DS_GET_QUANTILE(y, 0.98) from (%s)", sql);
// Verify results
final List<Object[]> results = sqlLifecycle.runSimple(sql2, QUERY_CONTEXT_DEFAULT, authenticationResult).toList();
final List<Object[]> expectedResults = ImmutableList.of(
new Object[]{
4.0d,
6.0d
}
);
Assert.assertEquals(expectedResults.size(), results.size());
for (int i = 0; i < expectedResults.size(); i++) {
Assert.assertArrayEquals(expectedResults.get(i), results.get(i));
}
Query actualQuery = Iterables.getOnlyElement(queryLogHook.getRecordedQueries());
Query expectedQuery =
Druids.newTimeseriesQueryBuilder()
.dataSource(CalciteTests.DATASOURCE1)
.intervals(new MultipleIntervalSegmentSpec(ImmutableList.of(Filtration.eternity())))
.granularity(Granularities.ALL)
.aggregators(
ImmutableList.of(
new DoublesSketchAggregatorFactory("a0:agg", "m1", 128)
)
)
.postAggregators(
ImmutableList.of(
new FieldAccessPostAggregator("p0", "a0:agg"),
new DoublesSketchToQuantilePostAggregator("p2", new FieldAccessPostAggregator("p1", "a0:agg"), 0.5),
new DoublesSketchToQuantilePostAggregator("s1", new FieldAccessPostAggregator("s0", "p0"), 0.5),
new DoublesSketchToQuantilePostAggregator("s3", new FieldAccessPostAggregator("s2", "p0"), 0.9800000190734863)
)
)
.context(ImmutableMap.of(
"skipEmptyBuckets", true,
PlannerContext.CTX_SQL_QUERY_ID, "dummy"
))
.build();
// Verify query
Assert.assertEquals(expectedQuery, actualQuery);
}
private static PostAggregator makeFieldAccessPostAgg(String name)
{
return new FieldAccessPostAggregator(name, name);

View File

@ -26,6 +26,7 @@ import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.query.Druids;
@ -36,8 +37,10 @@ import org.apache.druid.query.aggregation.CountAggregatorFactory;
import org.apache.druid.query.aggregation.DoubleSumAggregatorFactory;
import org.apache.druid.query.aggregation.FilteredAggregatorFactory;
import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
import org.apache.druid.query.aggregation.datasketches.theta.SketchEstimatePostAggregator;
import org.apache.druid.query.aggregation.datasketches.theta.SketchMergeAggregatorFactory;
import org.apache.druid.query.aggregation.datasketches.theta.SketchModule;
import org.apache.druid.query.aggregation.datasketches.theta.SketchSetPostAggregator;
import org.apache.druid.query.aggregation.post.ArithmeticPostAggregator;
import org.apache.druid.query.aggregation.post.FieldAccessPostAggregator;
import org.apache.druid.query.aggregation.post.FinalizingFieldAccessPostAggregator;
@ -47,6 +50,7 @@ import org.apache.druid.query.groupby.GroupByQuery;
import org.apache.druid.query.spec.MultipleIntervalSegmentSpec;
import org.apache.druid.segment.IndexBuilder;
import org.apache.druid.segment.QueryableIndex;
import org.apache.druid.segment.TestHelper;
import org.apache.druid.segment.column.ValueType;
import org.apache.druid.segment.incremental.IncrementalIndexSchema;
import org.apache.druid.segment.virtual.ExpressionVirtualColumn;
@ -125,6 +129,7 @@ public class ThetaSketchSqlAggregatorTest extends CalciteTestBase
SketchModule.registerSerde();
for (Module mod : new SketchModule().getJacksonModules()) {
CalciteTests.getJsonMapper().registerModule(mod);
TestHelper.JSON_MAPPER.registerModule(mod);
}
final QueryableIndex index = IndexBuilder.create()
@ -164,8 +169,17 @@ public class ThetaSketchSqlAggregatorTest extends CalciteTestBase
final DruidSchema druidSchema = CalciteTests.createMockSchema(conglomerate, walker, plannerConfig);
final SystemSchema systemSchema = CalciteTests.createMockSystemSchema(druidSchema, walker, plannerConfig);
final DruidOperatorTable operatorTable = new DruidOperatorTable(
ImmutableSet.of(new ThetaSketchSqlAggregator()),
ImmutableSet.of()
ImmutableSet.of(
new ThetaSketchApproxCountDistinctSqlAggregator(),
new ThetaSketchObjectSqlAggregator()
),
ImmutableSet.of(
new ThetaSketchEstimateOperatorConversion(),
new ThetaSketchEstimateWithErrorBoundsOperatorConversion(),
new ThetaSketchSetIntersectOperatorConversion(),
new ThetaSketchSetUnionOperatorConversion(),
new ThetaSketchSetNotOperatorConversion()
)
);
sqlLifecycleFactory = CalciteTests.createSqlLifecycleFactory(
@ -399,4 +413,246 @@ public class ThetaSketchSqlAggregatorTest extends CalciteTestBase
// Verify query
Assert.assertEquals(expected, actual);
}
@Test
public void testThetaSketchPostAggs() throws Exception
{
SqlLifecycle sqlLifecycle = sqlLifecycleFactory.factorize();
final String sql = "SELECT\n"
+ " SUM(cnt),\n"
+ " theta_sketch_estimate(DS_THETA(dim2)),\n"
+ " theta_sketch_estimate(DS_THETA(CONCAT(dim2, 'hello'))),\n"
+ " theta_sketch_estimate_with_error_bounds(DS_THETA(dim2), 10),\n"
+ " THETA_SKETCH_INTERSECT(DS_THETA(dim2), DS_THETA(dim1)),\n"
+ " THETA_SKETCH_UNION(DS_THETA(dim2), DS_THETA(dim1)),\n"
+ " THETA_SKETCH_NOT(DS_THETA(dim2), DS_THETA(dim1)),\n"
+ " THETA_SKETCH_INTERSECT(32768, DS_THETA(dim2), DS_THETA(dim1)),\n"
+ " theta_sketch_estimate(THETA_SKETCH_INTERSECT(THETA_SKETCH_INTERSECT(DS_THETA(dim2), DS_THETA(dim1)), DS_THETA(dim2)))\n"
+ "FROM druid.foo";
// Verify results
final List<Object[]> results = sqlLifecycle.runSimple(sql, QUERY_CONTEXT_DEFAULT, authenticationResult).toList();
final List<Object[]> expectedResults;
if (NullHandling.replaceWithDefault()) {
expectedResults = ImmutableList.of(
new Object[]{
6L,
2.0d,
3.0d,
"{\"estimate\":2.0,\"highBound\":2.0,\"lowBound\":2.0,\"numStdDev\":10}",
"\"AQMDAAAazJOQxkPsNomrZQ==\"",
"\"AgMDAAAazJMGAAAAAACAP1XTBztMIcMJ+HOoBBne1zKQxkPsNomrZUeWbJt3n+VpF8EdUoUHAXvxsLkOSE0lfQ==\"",
"\"AQMDAAAazJMXwR1ShQcBew==\"",
"\"AQMDAAAazJOQxkPsNomrZQ==\"",
1.0d
}
);
} else {
expectedResults = ImmutableList.of(
new Object[]{
6L,
2.0d,
3.0d,
"{\"estimate\":2.0,\"highBound\":2.0,\"lowBound\":2.0,\"numStdDev\":10}",
"\"AQMDAAAazJOQxkPsNomrZQ==\"",
"\"AgMDAAAazJMGAAAAAACAP1XTBztMIcMJ+HOoBBne1zKQxkPsNomrZUeWbJt3n+VpF8EdUoUHAXvxsLkOSE0lfQ==\"",
"\"AQMDAAAazJMXwR1ShQcBew==\"",
"\"AQMDAAAazJOQxkPsNomrZQ==\"",
1.0d
}
);
}
Assert.assertEquals(expectedResults.size(), results.size());
for (int i = 0; i < expectedResults.size(); i++) {
Assert.assertArrayEquals(expectedResults.get(i), results.get(i));
}
Query actualQuery = Iterables.getOnlyElement(queryLogHook.getRecordedQueries());
Query expectedQuery =
Druids.newTimeseriesQueryBuilder()
.dataSource(CalciteTests.DATASOURCE1)
.intervals(new MultipleIntervalSegmentSpec(ImmutableList.of(Filtration.eternity())))
.granularity(Granularities.ALL)
.virtualColumns(
new ExpressionVirtualColumn(
"v0",
"concat(\"dim2\",'hello')",
ValueType.STRING,
TestExprMacroTable.INSTANCE
)
)
.aggregators(
ImmutableList.of(
new LongSumAggregatorFactory("a0", "cnt"),
new SketchMergeAggregatorFactory(
"a1",
"dim2",
null,
null,
null,
null
),
new SketchMergeAggregatorFactory(
"a2",
"v0",
null,
null,
null,
null
),
new SketchMergeAggregatorFactory(
"a3",
"dim1",
null,
null,
null,
null
)
)
)
.postAggregators(
new SketchEstimatePostAggregator(
"p1",
new FieldAccessPostAggregator("p0", "a1"),
null
),
new SketchEstimatePostAggregator(
"p3",
new FieldAccessPostAggregator("p2", "a2"),
null
),
new SketchEstimatePostAggregator(
"p5",
new FieldAccessPostAggregator("p4", "a1"),
10
),
new SketchSetPostAggregator(
"p8",
"INTERSECT",
null,
ImmutableList.of(
new FieldAccessPostAggregator("p6", "a1"),
new FieldAccessPostAggregator("p7", "a3")
)
),
new SketchSetPostAggregator(
"p11",
"UNION",
null,
ImmutableList.of(
new FieldAccessPostAggregator("p9", "a1"),
new FieldAccessPostAggregator("p10", "a3")
)
),
new SketchSetPostAggregator(
"p14",
"NOT",
null,
ImmutableList.of(
new FieldAccessPostAggregator("p12", "a1"),
new FieldAccessPostAggregator("p13", "a3")
)
),
new SketchSetPostAggregator(
"p17",
"INTERSECT",
32768,
ImmutableList.of(
new FieldAccessPostAggregator("p15", "a1"),
new FieldAccessPostAggregator("p16", "a3")
)
),
new SketchEstimatePostAggregator(
"p23",
new SketchSetPostAggregator(
"p22",
"INTERSECT",
null,
ImmutableList.of(
new SketchSetPostAggregator(
"p20",
"INTERSECT",
null,
ImmutableList.of(
new FieldAccessPostAggregator("p18", "a1"),
new FieldAccessPostAggregator("p19", "a3")
)
),
new FieldAccessPostAggregator("p21", "a1")
)
),
null
)
)
.context(ImmutableMap.of("skipEmptyBuckets", true, PlannerContext.CTX_SQL_QUERY_ID, "dummy"))
.build();
// Verify query
Assert.assertEquals(expectedQuery, actualQuery);
}
@Test
public void testThetaSketchPostAggsPostSort() throws Exception
{
SqlLifecycle sqlLifecycle = sqlLifecycleFactory.factorize();
final String sql = "SELECT DS_THETA(dim2) as y FROM druid.foo ORDER BY THETA_SKETCH_ESTIMATE(DS_THETA(dim2)) DESC LIMIT 10";
final String sql2 = StringUtils.format("SELECT THETA_SKETCH_ESTIMATE(y) from (%s)", sql);
// Verify results
final List<Object[]> results = sqlLifecycle.runSimple(sql2, QUERY_CONTEXT_DEFAULT, authenticationResult).toList();
final List<Object[]> expectedResults = ImmutableList.of(
new Object[]{
2.0d
}
);
Assert.assertEquals(expectedResults.size(), results.size());
for (int i = 0; i < expectedResults.size(); i++) {
Assert.assertArrayEquals(expectedResults.get(i), results.get(i));
}
Query actualQuery = Iterables.getOnlyElement(queryLogHook.getRecordedQueries());
Query expectedQuery =
Druids.newTimeseriesQueryBuilder()
.dataSource(CalciteTests.DATASOURCE1)
.intervals(new MultipleIntervalSegmentSpec(ImmutableList.of(Filtration.eternity())))
.granularity(Granularities.ALL)
.aggregators(
ImmutableList.of(
new SketchMergeAggregatorFactory(
"a0",
"dim2",
null,
null,
null,
null
)
)
)
.postAggregators(
new FieldAccessPostAggregator("p0", "a0"),
new SketchEstimatePostAggregator(
"p2",
new FieldAccessPostAggregator("p1", "a0"),
null
),
new SketchEstimatePostAggregator(
"s1",
new FieldAccessPostAggregator("s0", "p0"),
null
)
)
.context(ImmutableMap.of("skipEmptyBuckets", true, PlannerContext.CTX_SQL_QUERY_ID, "dummy"))
.build();
// Verify query
Assert.assertEquals(expectedQuery, actualQuery);
}
}

View File

@ -53,4 +53,5 @@ public class PostAggregatorIds
public static final byte THETA_SKETCH_TO_STRING = 29;
public static final byte TDIGEST_SKETCH_TO_QUANTILES_CACHE_TYPE_ID = 30;
public static final byte TDIGEST_SKETCH_TO_QUANTILE_CACHE_TYPE_ID = 31;
public static final byte HLL_SKETCH_TO_ESTIMATE_CACHE_TYPE_ID = 32;
}

View File

@ -51,7 +51,7 @@ import java.util.stream.IntStream;
*/
public class TestHelper
{
private static final ObjectMapper JSON_MAPPER = makeJsonMapper();
public static final ObjectMapper JSON_MAPPER = makeJsonMapper();
public static IndexMergerV9 getTestIndexMergerV9(SegmentWriteOutMediumFactory segmentWriteOutMediumFactory)
{

View File

@ -27,6 +27,7 @@ import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.sql.calcite.planner.PlannerContext;
import org.apache.druid.sql.calcite.table.RowSignature;
import javax.annotation.Nullable;
import java.util.stream.Collectors;
public class BinaryOperatorConversion implements SqlOperatorConversion
@ -75,4 +76,37 @@ public class BinaryOperatorConversion implements SqlOperatorConversion
}
);
}
@Nullable
@Override
public DruidExpression toDruidExpressionWithPostAggOperands(
PlannerContext plannerContext,
RowSignature rowSignature,
RexNode rexNode,
PostAggregatorVisitor postAggregatorVisitor
)
{
return OperatorConversions.convertCallWithPostAggOperands(
plannerContext,
rowSignature,
rexNode,
operands -> {
if (operands.size() < 2) {
throw new ISE("WTF?! Got binary operator[%s] with %s args?", operator.getName(), operands.size());
}
return DruidExpression.fromExpression(
StringUtils.format(
"(%s)",
joiner.join(
operands.stream()
.map(DruidExpression::getExpression)
.collect(Collectors.toList())
)
)
);
},
postAggregatorVisitor
);
}
}

View File

@ -24,6 +24,8 @@ import org.apache.calcite.sql.SqlOperator;
import org.apache.druid.sql.calcite.planner.PlannerContext;
import org.apache.druid.sql.calcite.table.RowSignature;
import javax.annotation.Nullable;
public class DirectOperatorConversion implements SqlOperatorConversion
{
private final SqlOperator operator;
@ -60,4 +62,22 @@ public class DirectOperatorConversion implements SqlOperatorConversion
{
return druidFunctionName;
}
@Nullable
@Override
public DruidExpression toDruidExpressionWithPostAggOperands(
PlannerContext plannerContext,
RowSignature rowSignature,
RexNode rexNode,
PostAggregatorVisitor postAggregatorVisitor
)
{
return OperatorConversions.convertCallWithPostAggOperands(
plannerContext,
rowSignature,
rexNode,
operands -> DruidExpression.fromExpression(DruidExpression.functionCall(druidFunctionName, operands)),
postAggregatorVisitor
);
}
}

View File

@ -39,6 +39,7 @@ import org.apache.druid.math.expr.Expr;
import org.apache.druid.math.expr.ExprMacroTable;
import org.apache.druid.math.expr.ExprType;
import org.apache.druid.math.expr.Parser;
import org.apache.druid.query.aggregation.PostAggregator;
import org.apache.druid.query.expression.TimestampFloorExprMacro;
import org.apache.druid.query.extraction.ExtractionFn;
import org.apache.druid.query.extraction.TimeFormatExtractionFn;
@ -128,6 +129,43 @@ public class Expressions
return retVal;
}
/**
* Translate a list of Calcite {@code RexNode} to Druid expressions, with the possibility of having postagg operands.
*
* @param plannerContext SQL planner context
* @param rowSignature signature of the rows to be extracted from
* @param rexNodes list of Calcite expressions meant to be applied on top of the rows
* @param postAggregatorVisitor visitor that manages postagg names and tracks postaggs that were created as
* by the translation
*
* @return list of Druid expressions in the same order as rexNodes, or null if not possible.
* If a non-null list is returned, all elements will be non-null.
*/
@Nullable
public static List<DruidExpression> toDruidExpressionsWithPostAggOperands(
final PlannerContext plannerContext,
final RowSignature rowSignature,
final List<RexNode> rexNodes,
final PostAggregatorVisitor postAggregatorVisitor
)
{
final List<DruidExpression> retVal = new ArrayList<>(rexNodes.size());
for (RexNode rexNode : rexNodes) {
final DruidExpression druidExpression = toDruidExpressionWithPostAggOperands(
plannerContext,
rowSignature,
rexNode,
postAggregatorVisitor
);
if (druidExpression == null) {
return null;
}
retVal.add(druidExpression);
}
return retVal;
}
/**
* Translate a Calcite {@code RexNode} to a Druid expressions.
*
@ -143,68 +181,138 @@ public class Expressions
final RowSignature rowSignature,
final RexNode rexNode
)
{
return toDruidExpressionWithPostAggOperands(
plannerContext,
rowSignature,
rexNode,
null
);
}
@Nullable
public static DruidExpression toDruidExpressionWithPostAggOperands(
final PlannerContext plannerContext,
final RowSignature rowSignature,
final RexNode rexNode,
@Nullable final PostAggregatorVisitor postAggregatorVisitor
)
{
final SqlKind kind = rexNode.getKind();
final SqlTypeName sqlTypeName = rexNode.getType().getSqlTypeName();
if (kind == SqlKind.INPUT_REF) {
// Translate field references.
final RexInputRef ref = (RexInputRef) rexNode;
final String columnName = rowSignature.getRowOrder().get(ref.getIndex());
if (columnName == null) {
throw new ISE("WTF?! Expression referred to nonexistent index[%d]", ref.getIndex());
}
return DruidExpression.fromColumn(columnName);
return inputRefToDruidExpression(rowSignature, rexNode);
} else if (rexNode instanceof RexCall) {
final SqlOperator operator = ((RexCall) rexNode).getOperator();
final SqlOperatorConversion conversion = plannerContext.getOperatorTable()
.lookupOperatorConversion(operator);
if (conversion == null) {
return null;
} else {
return conversion.toDruidExpression(plannerContext, rowSignature, rexNode);
}
return rexCallToDruidExpression(plannerContext, rowSignature, rexNode, postAggregatorVisitor);
} else if (kind == SqlKind.LITERAL) {
// Translate literal.
if (RexLiteral.isNullLiteral(rexNode)) {
return DruidExpression.fromExpression(DruidExpression.nullLiteral());
} else if (SqlTypeName.NUMERIC_TYPES.contains(sqlTypeName)) {
return DruidExpression.fromExpression(DruidExpression.numberLiteral((Number) RexLiteral.value(rexNode)));
} else if (SqlTypeFamily.INTERVAL_DAY_TIME == sqlTypeName.getFamily()) {
// Calcite represents DAY-TIME intervals in milliseconds.
final long milliseconds = ((Number) RexLiteral.value(rexNode)).longValue();
return DruidExpression.fromExpression(DruidExpression.numberLiteral(milliseconds));
} else if (SqlTypeFamily.INTERVAL_YEAR_MONTH == sqlTypeName.getFamily()) {
// Calcite represents YEAR-MONTH intervals in months.
final long months = ((Number) RexLiteral.value(rexNode)).longValue();
return DruidExpression.fromExpression(DruidExpression.numberLiteral(months));
} else if (SqlTypeName.STRING_TYPES.contains(sqlTypeName)) {
return DruidExpression.fromExpression(DruidExpression.stringLiteral(RexLiteral.stringValue(rexNode)));
} else if (SqlTypeName.TIMESTAMP == sqlTypeName || SqlTypeName.DATE == sqlTypeName) {
if (RexLiteral.isNullLiteral(rexNode)) {
return DruidExpression.fromExpression(DruidExpression.nullLiteral());
} else {
return DruidExpression.fromExpression(
DruidExpression.numberLiteral(
Calcites.calciteDateTimeLiteralToJoda(rexNode, plannerContext.getTimeZone()).getMillis()
)
);
}
} else if (SqlTypeName.BOOLEAN == sqlTypeName) {
return DruidExpression.fromExpression(DruidExpression.numberLiteral(RexLiteral.booleanValue(rexNode) ? 1 : 0));
} else {
// Can't translate other literals.
return null;
}
return literalToDruidExpression(plannerContext, rexNode);
} else {
// Can't translate.
return null;
}
}
private static DruidExpression inputRefToDruidExpression(
final RowSignature rowSignature,
final RexNode rexNode
)
{
// Translate field references.
final RexInputRef ref = (RexInputRef) rexNode;
final String columnName = rowSignature.getRowOrder().get(ref.getIndex());
if (columnName == null) {
throw new ISE("WTF?! Expression referred to nonexistent index[%d]", ref.getIndex());
}
return DruidExpression.fromColumn(columnName);
}
private static DruidExpression rexCallToDruidExpression(
final PlannerContext plannerContext,
final RowSignature rowSignature,
final RexNode rexNode,
final PostAggregatorVisitor postAggregatorVisitor
)
{
final SqlOperator operator = ((RexCall) rexNode).getOperator();
final SqlOperatorConversion conversion = plannerContext.getOperatorTable()
.lookupOperatorConversion(operator);
if (conversion == null) {
return null;
} else {
if (postAggregatorVisitor != null) {
// try making postagg first
PostAggregator postAggregator = conversion.toPostAggregator(
plannerContext,
rowSignature,
rexNode,
postAggregatorVisitor
);
if (postAggregator != null) {
postAggregatorVisitor.addPostAgg(postAggregator);
String exprName = postAggregator.getName();
return DruidExpression.of(SimpleExtraction.of(exprName, null), exprName);
}
}
DruidExpression expression = conversion.toDruidExpressionWithPostAggOperands(
plannerContext,
rowSignature,
rexNode,
postAggregatorVisitor
);
if (expression == null) {
return null;
} else {
return expression;
}
}
}
private static DruidExpression literalToDruidExpression(
final PlannerContext plannerContext,
final RexNode rexNode
)
{
final SqlTypeName sqlTypeName = rexNode.getType().getSqlTypeName();
// Translate literal.
if (RexLiteral.isNullLiteral(rexNode)) {
return DruidExpression.fromExpression(DruidExpression.nullLiteral());
} else if (SqlTypeName.NUMERIC_TYPES.contains(sqlTypeName)) {
return DruidExpression.fromExpression(DruidExpression.numberLiteral((Number) RexLiteral.value(rexNode)));
} else if (SqlTypeFamily.INTERVAL_DAY_TIME == sqlTypeName.getFamily()) {
// Calcite represents DAY-TIME intervals in milliseconds.
final long milliseconds = ((Number) RexLiteral.value(rexNode)).longValue();
return DruidExpression.fromExpression(DruidExpression.numberLiteral(milliseconds));
} else if (SqlTypeFamily.INTERVAL_YEAR_MONTH == sqlTypeName.getFamily()) {
// Calcite represents YEAR-MONTH intervals in months.
final long months = ((Number) RexLiteral.value(rexNode)).longValue();
return DruidExpression.fromExpression(DruidExpression.numberLiteral(months));
} else if (SqlTypeName.STRING_TYPES.contains(sqlTypeName)) {
return DruidExpression.fromExpression(DruidExpression.stringLiteral(RexLiteral.stringValue(rexNode)));
} else if (SqlTypeName.TIMESTAMP == sqlTypeName || SqlTypeName.DATE == sqlTypeName) {
if (RexLiteral.isNullLiteral(rexNode)) {
return DruidExpression.fromExpression(DruidExpression.nullLiteral());
} else {
return DruidExpression.fromExpression(
DruidExpression.numberLiteral(
Calcites.calciteDateTimeLiteralToJoda(rexNode, plannerContext.getTimeZone()).getMillis()
)
);
}
} else if (SqlTypeName.BOOLEAN == sqlTypeName) {
return DruidExpression.fromExpression(DruidExpression.numberLiteral(RexLiteral.booleanValue(rexNode) ? 1 : 0));
} else {
// Can't translate other literals.
return null;
}
}
/**
* Translates "condition" to a Druid filter, or returns null if we cannot translate the condition.
*

View File

@ -25,6 +25,7 @@ import it.unimi.dsi.fastutil.ints.IntArraySet;
import it.unimi.dsi.fastutil.ints.IntSet;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rex.RexCall;
import org.apache.calcite.rex.RexInputRef;
import org.apache.calcite.rex.RexLiteral;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.sql.SqlCallBinding;
@ -43,7 +44,10 @@ import org.apache.calcite.sql.type.SqlReturnTypeInference;
import org.apache.calcite.sql.type.SqlTypeFamily;
import org.apache.calcite.sql.type.SqlTypeName;
import org.apache.calcite.util.Static;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.query.aggregation.PostAggregator;
import org.apache.druid.query.aggregation.post.FieldAccessPostAggregator;
import org.apache.druid.sql.calcite.planner.Calcites;
import org.apache.druid.sql.calcite.planner.PlannerContext;
import org.apache.druid.sql.calcite.table.RowSignature;
@ -136,6 +140,85 @@ public class OperatorConversions
}
}
@Nullable
public static DruidExpression convertCallWithPostAggOperands(
final PlannerContext plannerContext,
final RowSignature rowSignature,
final RexNode rexNode,
final Function<List<DruidExpression>, DruidExpression> expressionFunction,
final PostAggregatorVisitor postAggregatorVisitor
)
{
final RexCall call = (RexCall) rexNode;
final List<DruidExpression> druidExpressions = Expressions.toDruidExpressionsWithPostAggOperands(
plannerContext,
rowSignature,
call.getOperands(),
postAggregatorVisitor
);
if (druidExpressions == null) {
return null;
}
return expressionFunction.apply(druidExpressions);
}
/**
* Translate a Calcite {@code RexNode} to a Druid PostAggregator
*
* @param plannerContext SQL planner context
* @param rowSignature signature of the rows to be extracted from
* @param rexNode expression meant to be applied on top of the rows
*
* @param postAggregatorVisitor visitor that manages postagg names and tracks postaggs that were created
* by the translation
* @return rexNode referring to fields in rowOrder, or null if not possible
*/
@Nullable
public static PostAggregator toPostAggregator(
final PlannerContext plannerContext,
final RowSignature rowSignature,
final RexNode rexNode,
final PostAggregatorVisitor postAggregatorVisitor
)
{
final SqlKind kind = rexNode.getKind();
if (kind == SqlKind.INPUT_REF) {
// Translate field references.
final RexInputRef ref = (RexInputRef) rexNode;
final String columnName = rowSignature.getRowOrder().get(ref.getIndex());
if (columnName == null) {
throw new ISE("WTF?! PostAgg referred to nonexistent index[%d]", ref.getIndex());
}
return new FieldAccessPostAggregator(
postAggregatorVisitor.getOutputNamePrefix() + postAggregatorVisitor.getAndIncrementCounter(),
columnName
);
} else if (rexNode instanceof RexCall) {
final SqlOperator operator = ((RexCall) rexNode).getOperator();
final SqlOperatorConversion conversion = plannerContext.getOperatorTable()
.lookupOperatorConversion(operator);
if (conversion == null) {
return null;
} else {
return conversion.toPostAggregator(
plannerContext,
rowSignature,
rexNode,
postAggregatorVisitor
);
}
} else if (kind == SqlKind.LITERAL) {
return null;
} else {
throw new IAE("Unknown rexnode kind: " + kind);
}
}
public static OperatorBuilder operatorBuilder(final String name)
{
return new OperatorBuilder(name);

View File

@ -0,0 +1,63 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.sql.calcite.expression;
import org.apache.druid.query.aggregation.PostAggregator;
import java.util.ArrayList;
import java.util.List;
/**
* This class serves as a tracking structure for managing post aggregator column names and any post aggs that
* are created as part of translation of a Calcite {@code RexNode} into native Druid structures.
*/
public class PostAggregatorVisitor
{
private String outputNamePrefix;
private int counter = 0;
private List<PostAggregator> postAggs = new ArrayList<>();
public PostAggregatorVisitor(
String outputNamePrefix
)
{
this.outputNamePrefix = outputNamePrefix;
}
public int getAndIncrementCounter()
{
return counter++;
}
public String getOutputNamePrefix()
{
return outputNamePrefix;
}
public List<PostAggregator> getPostAggs()
{
return postAggs;
}
public void addPostAgg(PostAggregator postAggregator)
{
postAggs.add(postAggregator);
}
}

View File

@ -21,6 +21,7 @@ package org.apache.druid.sql.calcite.expression;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.sql.SqlOperator;
import org.apache.druid.query.aggregation.PostAggregator;
import org.apache.druid.query.filter.DimFilter;
import org.apache.druid.sql.calcite.planner.PlannerContext;
import org.apache.druid.sql.calcite.rel.VirtualColumnRegistry;
@ -49,7 +50,35 @@ public interface SqlOperatorConversion
* @see Expressions#toDruidExpression(PlannerContext, RowSignature, RexNode)
*/
@Nullable
DruidExpression toDruidExpression(PlannerContext plannerContext, RowSignature rowSignature, RexNode rexNode);
DruidExpression toDruidExpression(
PlannerContext plannerContext,
RowSignature rowSignature,
RexNode rexNode
);
/**
* Translate a Calcite {@code RexNode} to a Druid expression, with the possibility of having postagg operands.
*
* @param plannerContext SQL planner context
* @param rowSignature signature of the rows to be extracted from
* @param rexNode expression meant to be applied on top of the rows
* @param postAggregatorVisitor visitor that manages postagg names and tracks postaggs that were created as
* by the translation
*
* @return Druid expression, or null if translation is not possible
*
* @see Expressions#toDruidExpression(PlannerContext, RowSignature, RexNode)
*/
@Nullable
default DruidExpression toDruidExpressionWithPostAggOperands(
PlannerContext plannerContext,
RowSignature rowSignature,
RexNode rexNode,
PostAggregatorVisitor postAggregatorVisitor
)
{
return toDruidExpression(plannerContext, rowSignature, rexNode);
}
/**
* Returns a Druid filter corresponding to a Calcite {@code RexNode} used as a filter condition.
@ -71,4 +100,27 @@ public interface SqlOperatorConversion
{
return null;
}
/**
* Returns a Druid PostAggregator corresponding to a Calcite {@link RexNode} used to transform a row after
* aggregation has occurred.
*
* @param plannerContext SQL planner context
* @param querySignature signature of the rows to be extracted from
* @param rexNode expression meant to be applied on top of the rows
*
* @param postAggregatorVisitor visitor that manages postagg names and tracks postaggs that were created
* by the translation
* @return filter, or null if the call cannot be translated
*/
@Nullable
default PostAggregator toPostAggregator(
PlannerContext plannerContext,
RowSignature querySignature,
RexNode rexNode,
PostAggregatorVisitor postAggregatorVisitor
)
{
return null;
}
}

View File

@ -23,14 +23,18 @@ import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import org.apache.calcite.rel.core.Project;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.sql.SqlKind;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.math.expr.ExprType;
import org.apache.druid.query.aggregation.PostAggregator;
import org.apache.druid.query.aggregation.post.ExpressionPostAggregator;
import org.apache.druid.query.aggregation.post.FieldAccessPostAggregator;
import org.apache.druid.segment.VirtualColumn;
import org.apache.druid.segment.column.ValueType;
import org.apache.druid.sql.calcite.expression.DruidExpression;
import org.apache.druid.sql.calcite.expression.Expressions;
import org.apache.druid.sql.calcite.expression.OperatorConversions;
import org.apache.druid.sql.calcite.expression.PostAggregatorVisitor;
import org.apache.druid.sql.calcite.planner.Calcites;
import org.apache.druid.sql.calcite.planner.PlannerContext;
import org.apache.druid.sql.calcite.table.RowSignature;
@ -79,6 +83,112 @@ public class Projection
this.outputRowSignature = outputRowSignature;
}
private static void postAggregationHandleInputRefOrLiteral(
final Project project,
final PlannerContext plannerContext,
final RowSignature inputRowSignature,
final RexNode postAggregatorRexNode,
final List<String> rowOrder,
final PostAggregatorVisitor postAggregatorVisitor
)
{
// Attempt to convert to PostAggregator.
final DruidExpression postAggregatorExpression = Expressions.toDruidExpression(
plannerContext,
inputRowSignature,
postAggregatorRexNode
);
if (postAggregatorExpression == null) {
throw new CannotBuildQueryException(project, postAggregatorRexNode);
}
handlePostAggregatorExpression(
plannerContext,
inputRowSignature,
postAggregatorRexNode,
rowOrder,
postAggregatorVisitor,
postAggregatorExpression
);
}
private static void postAggregationHandleOtherKinds(
final Project project,
final PlannerContext plannerContext,
final RowSignature inputRowSignature,
final RexNode postAggregatorRexNode,
final List<String> rowOrder,
final PostAggregatorVisitor postAggregatorVisitor
)
{
PostAggregator pagg = OperatorConversions.toPostAggregator(
plannerContext,
inputRowSignature,
postAggregatorRexNode,
postAggregatorVisitor
);
if (pagg != null) {
postAggregatorVisitor.addPostAgg(pagg);
rowOrder.add(pagg.getName());
} else {
final DruidExpression postAggregatorExpression = Expressions.toDruidExpressionWithPostAggOperands(
plannerContext,
inputRowSignature,
postAggregatorRexNode,
postAggregatorVisitor
);
if (postAggregatorExpression == null) {
throw new CannotBuildQueryException(project, postAggregatorRexNode);
}
handlePostAggregatorExpression(
plannerContext,
inputRowSignature,
postAggregatorRexNode,
rowOrder,
postAggregatorVisitor,
postAggregatorExpression
);
}
}
private static void handlePostAggregatorExpression(
final PlannerContext plannerContext,
final RowSignature inputRowSignature,
final RexNode postAggregatorRexNode,
final List<String> rowOrder,
final PostAggregatorVisitor postAggregatorVisitor,
final DruidExpression postAggregatorExpression
)
{
if (postAggregatorComplexDirectColumnIsOk(inputRowSignature, postAggregatorExpression, postAggregatorRexNode)) {
// Direct column access on a COMPLEX column, expressions cannot operate on complex columns, only postaggs
// Wrap the column access in a field access postagg so that other postaggs can use it
final PostAggregator postAggregator = new FieldAccessPostAggregator(
postAggregatorVisitor.getOutputNamePrefix() + postAggregatorVisitor.getAndIncrementCounter(),
postAggregatorExpression.getDirectColumn()
);
postAggregatorVisitor.addPostAgg(postAggregator);
rowOrder.add(postAggregator.getName());
} else if (postAggregatorDirectColumnIsOk(inputRowSignature, postAggregatorExpression, postAggregatorRexNode)) {
// Direct column access, without any type cast as far as Druid's runtime is concerned.
// (There might be a SQL-level type cast that we don't care about)
rowOrder.add(postAggregatorExpression.getDirectColumn());
} else {
final PostAggregator postAggregator = new ExpressionPostAggregator(
postAggregatorVisitor.getOutputNamePrefix() + postAggregatorVisitor.getAndIncrementCounter(),
postAggregatorExpression.getExpression(),
null,
plannerContext.getExprMacroTable()
);
postAggregatorVisitor.addPostAgg(postAggregator);
rowOrder.add(postAggregator.getName());
}
}
public static Projection postAggregation(
final Project project,
final PlannerContext plannerContext,
@ -87,43 +197,35 @@ public class Projection
)
{
final List<String> rowOrder = new ArrayList<>();
final List<PostAggregator> postAggregators = new ArrayList<>();
final String outputNamePrefix = Calcites.findUnusedPrefix(
basePrefix,
new TreeSet<>(inputRowSignature.getRowOrder())
);
final PostAggregatorVisitor postAggVisitor = new PostAggregatorVisitor(outputNamePrefix);
int outputNameCounter = 0;
for (final RexNode postAggregatorRexNode : project.getChildExps()) {
// Attempt to convert to PostAggregator.
final DruidExpression postAggregatorExpression = Expressions.toDruidExpression(
plannerContext,
inputRowSignature,
postAggregatorRexNode
);
if (postAggregatorExpression == null) {
throw new CannotBuildQueryException(project, postAggregatorRexNode);
}
if (postAggregatorDirectColumnIsOk(inputRowSignature, postAggregatorExpression, postAggregatorRexNode)) {
// Direct column access, without any type cast as far as Druid's runtime is concerned.
// (There might be a SQL-level type cast that we don't care about)
rowOrder.add(postAggregatorExpression.getDirectColumn());
} else {
final String postAggregatorName = outputNamePrefix + outputNameCounter++;
final PostAggregator postAggregator = new ExpressionPostAggregator(
postAggregatorName,
postAggregatorExpression.getExpression(),
null,
plannerContext.getExprMacroTable()
if (postAggregatorRexNode.getKind() == SqlKind.INPUT_REF || postAggregatorRexNode.getKind() == SqlKind.LITERAL) {
postAggregationHandleInputRefOrLiteral(
project,
plannerContext,
inputRowSignature,
postAggregatorRexNode,
rowOrder,
postAggVisitor
);
} else {
postAggregationHandleOtherKinds(
project,
plannerContext,
inputRowSignature,
postAggregatorRexNode,
rowOrder,
postAggVisitor
);
postAggregators.add(postAggregator);
rowOrder.add(postAggregator.getName());
}
}
return new Projection(postAggregators, null, RowSignature.from(rowOrder, project.getRowType()));
return new Projection(postAggVisitor.getPostAggs(), null, RowSignature.from(rowOrder, project.getRowType()));
}
public static Projection preAggregation(
@ -209,6 +311,33 @@ public class Projection
return toExprType.equals(fromExprType);
}
/**
* Returns true if a post-aggregation "expression" can be realized as a direct field access. This is true if it's
* a direct column access that doesn't require an implicit cast.
*
* @param aggregateRowSignature signature of the aggregation
* @param expression post-aggregation expression
* @param rexNode RexNode for the post-aggregation expression
*
* @return yes or no
*/
private static boolean postAggregatorComplexDirectColumnIsOk(
final RowSignature aggregateRowSignature,
final DruidExpression expression,
final RexNode rexNode
)
{
if (!expression.isDirectColumnAccess()) {
return false;
}
// Check if a cast is necessary.
final ValueType toValueType = aggregateRowSignature.getColumnType(expression.getDirectColumn());
final ValueType fromValueType = Calcites.getValueTypeForSqlTypeName(rexNode.getType().getSqlTypeName());
return toValueType == ValueType.COMPLEX && fromValueType == ValueType.COMPLEX;
}
public List<PostAggregator> getPostAggregators()
{
// If you ever see this error, it probably means a Projection was created in pre-aggregation mode, but then