Tuple sketch SQL support (#13887)

This PR is a follow-up to #13819. It makes the Tuple sketch functionality available in SQL, both for ingestion with Multi-Stage Queries (MSQ) and for analytic queries against Tuple sketch columns.
frankgrimes97 2023-03-28 09:17:12 -04:00 committed by GitHub
parent c2fe6a4956
commit 2f98675285
17 changed files with 1116 additions and 4 deletions

.gitignore

@@ -33,3 +33,9 @@ integration-tests/gen-scripts/
**/.ipython/
**/.jupyter/
**/.local/
# ignore NetBeans IDE specific files
nbproject
nbactions.xml
nb-configuration.xml

@@ -139,6 +139,16 @@ Load the [DataSketches extension](../development/extensions-core/datasketches-ex
|`DS_QUANTILES_SKETCH(expr, [k])`|Creates a [Quantiles sketch](../development/extensions-core/datasketches-quantiles.md) on the values of `expr`, which can be a regular column or a column containing quantiles sketches. The `k` parameter is described in the Quantiles sketch documentation.<br/><br/>See the [known issue](sql-translation.md#approximations) with this function.|`'0'` (STRING)|
### Tuple sketch functions
Load the [DataSketches extension](../development/extensions-core/datasketches-extension.md) to use the following functions.
|Function|Notes|Default|
|--------|-----|-------|
|`DS_TUPLE_DOUBLES(expr, [nominalEntries])`|Creates a [Tuple sketch](../development/extensions-core/datasketches-tuple.md) on the values of `expr`, which must be a column of Tuple sketches whose Summary Objects are arrays of double values. The optional `nominalEntries` override parameter is described in the Tuple sketch documentation.| |
|`DS_TUPLE_DOUBLES(dimensionColumnExpr, metricColumnExpr, ..., [nominalEntries])`|Creates a [Tuple sketch](../development/extensions-core/datasketches-tuple.md) whose Summary Object is an array of double values, built from the dimension value of `dimensionColumnExpr` and the numeric metric values in one or more `metricColumnExpr` columns. If the last argument is a numeric literal, Druid treats it as an override for [nominal entries](../development/extensions-core/datasketches-tuple.md).| |
### T-Digest sketch functions
Load the T-Digest extension to use the following functions. See the [T-Digest extension](../development/extensions-contrib/tdigestsketch-quantiles.md) for additional details and for more information on these functions.

@@ -491,6 +491,48 @@ Returns an approximate rank between 0 and 1 of a given value, in which the rank
Creates a Theta sketch on a column containing Theta sketches or a regular column.
## DS_TUPLE_DOUBLES
`DS_TUPLE_DOUBLES(expr, [nominalEntries])`
`DS_TUPLE_DOUBLES(dimensionColumnExpr, metricColumnExpr, ..., [nominalEntries])`
**Function type:** [Aggregation](sql-aggregations.md)
Creates a Tuple sketch whose Summary Object is an array of double values. If the last argument is a numeric literal, Druid treats it as an override for [nominal entries](../development/extensions-core/datasketches-tuple.md).
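
The trailing-literal rule can be pictured with a small, self-contained sketch. This is an illustration only, not Druid's planner code: the class `TupleSketchArgs`, its `parse` method, and the constant `ASSUMED_DEFAULT_NOMINAL_ENTRIES` are hypothetical names, and a Java `Number` stands in for a numeric SQL literal while a `String` stands in for a column expression.

```java
import java.util.List;

// Hypothetical illustration of the "last argument is a numeric literal" rule.
final class TupleSketchArgs
{
  // Assumed value for this sketch; the real default is supplied by the DataSketches library.
  static final int ASSUMED_DEFAULT_NOMINAL_ENTRIES = 4096;

  final List<Object> expressions;
  final int nominalEntries;

  private TupleSketchArgs(List<Object> expressions, int nominalEntries)
  {
    this.expressions = expressions;
    this.nominalEntries = nominalEntries;
  }

  // Splits the argument list into column expressions and an optional nominalEntries override.
  static TupleSketchArgs parse(List<Object> args)
  {
    if (args.isEmpty()) {
      throw new IllegalArgumentException("at least one argument is required");
    }
    final int last = args.size() - 1;
    if (args.get(last) instanceof Number) {
      // Trailing numeric literal: treat it as the override and drop it from the expressions.
      return new TupleSketchArgs(args.subList(0, last), ((Number) args.get(last)).intValue());
    }
    // No trailing literal: every argument is an expression and the default applies.
    return new TupleSketchArgs(args, ASSUMED_DEFAULT_NOMINAL_ENTRIES);
  }

  public static void main(String[] unused)
  {
    TupleSketchArgs withOverride = parse(List.of("dim2", "m1", 1024));
    System.out.println(withOverride.expressions + " " + withOverride.nominalEntries);
    TupleSketchArgs withDefault = parse(List.of("dim2", "m1"));
    System.out.println(withDefault.expressions + " " + withDefault.nominalEntries);
  }
}
```

One consequence of this rule, under the stated assumptions, is that a numeric literal can never be passed as the last metric expression, because it would be read as the override instead.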
## DS_TUPLE_DOUBLES_INTERSECT
`DS_TUPLE_DOUBLES_INTERSECT(expr, ..., [nominalEntries])`
**Function type:** [Scalar, sketch](sql-scalar.md#tuple-sketch-functions)
Returns an intersection of Tuple sketches, each of which contains an array of double values as its Summary Object. The values in the Summary Objects are summed when combined. If the last argument is a numeric literal, Druid treats it as an override for [nominal entries](../development/extensions-core/datasketches-tuple.md).
## DS_TUPLE_DOUBLES_METRICS_SUM_ESTIMATE
`DS_TUPLE_DOUBLES_METRICS_SUM_ESTIMATE(expr)`
**Function type:** [Scalar, sketch](sql-scalar.md#tuple-sketch-functions)
Computes approximate sums of the values in a Tuple sketch whose Summary Object is an array of double values, returning one sum per array position.
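
As a rough mental model, the estimate can be thought of as a column-wise sum over the entries the sketch retained, scaled up by the sampling fraction. The sketch below assumes exactly that estimator; it is not a transcription of Druid's post-aggregator, and `MetricsSumEstimateSketch`, `estimateSums`, and the `theta` parameter are hypothetical names chosen for the illustration.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical illustration: per-column sums over retained entries, scaled by 1/theta.
final class MetricsSumEstimateSketch
{
  // retained: the double[] summary of each retained key.
  // numValues: length of every summary array.
  // theta: assumed fraction of keys the sketch kept, in (0, 1].
  static double[] estimateSums(List<double[]> retained, int numValues, double theta)
  {
    final double[] sums = new double[numValues];
    for (double[] summary : retained) {
      for (int i = 0; i < numValues; i++) {
        sums[i] += summary[i];
      }
    }
    for (int i = 0; i < numValues; i++) {
      // Scale the sampled sum up to an estimate for the full key set.
      sums[i] /= theta;
    }
    return sums;
  }

  public static void main(String[] unused)
  {
    List<double[]> retained = List.of(new double[]{8.0}, new double[]{22.0});
    System.out.println(Arrays.toString(estimateSums(retained, 1, 1.0)));
    System.out.println(Arrays.toString(estimateSums(retained, 1, 0.5)));
  }
}
```

When `theta` is 1.0 no sampling has occurred and the result is the plain column sum; smaller values of `theta` inflate the sampled sum proportionally.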
## DS_TUPLE_DOUBLES_NOT
`DS_TUPLE_DOUBLES_NOT(expr, ..., [nominalEntries])`
**Function type:** [Scalar, sketch](sql-scalar.md#tuple-sketch-functions)
Returns a set difference of Tuple sketches, each of which contains an array of double values as its Summary Object. The values in the Summary Object are preserved as is. If the last argument is a numeric literal, Druid treats it as an override for [nominal entries](../development/extensions-core/datasketches-tuple.md).
## DS_TUPLE_DOUBLES_UNION
`DS_TUPLE_DOUBLES_UNION(expr, ..., [nominalEntries])`
**Function type:** [Scalar, sketch](sql-scalar.md#tuple-sketch-functions)
Returns a union of Tuple sketches, each of which contains an array of double values as its Summary Object. The values in the Summary Objects are summed when combined. If the last argument is a numeric literal, Druid treats it as an override for [nominal entries](../development/extensions-core/datasketches-tuple.md).
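
The difference between "summed when combined" and "preserved as is" is easiest to see on exact data. The following sketch models each Tuple sketch as a plain map from key to summary array and ignores sampling entirely; `TupleSetOpsSketch` and its methods are hypothetical names for illustration, not Druid or DataSketches APIs.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical illustration of the three set operations on key -> double[] summaries.
final class TupleSetOpsSketch
{
  // Element-wise sum of two summaries of equal length.
  static double[] add(double[] a, double[] b)
  {
    final double[] out = new double[a.length];
    for (int i = 0; i < a.length; i++) {
      out[i] = a[i] + b[i];
    }
    return out;
  }

  // UNION: every key from either side; summaries of shared keys are summed.
  static Map<String, double[]> union(Map<String, double[]> a, Map<String, double[]> b)
  {
    final Map<String, double[]> out = new LinkedHashMap<>(a);
    b.forEach((key, summary) -> out.merge(key, summary, TupleSetOpsSketch::add));
    return out;
  }

  // INTERSECT: only keys present on both sides; their summaries are summed.
  static Map<String, double[]> intersect(Map<String, double[]> a, Map<String, double[]> b)
  {
    final Map<String, double[]> out = new LinkedHashMap<>();
    a.forEach((key, summary) -> {
      if (b.containsKey(key)) {
        out.put(key, add(summary, b.get(key)));
      }
    });
    return out;
  }

  // NOT: keys of the first side that are absent from the second; summaries are kept unchanged.
  static Map<String, double[]> not(Map<String, double[]> a, Map<String, double[]> b)
  {
    final Map<String, double[]> out = new LinkedHashMap<>();
    a.forEach((key, summary) -> {
      if (!b.containsKey(key)) {
        out.put(key, summary);
      }
    });
    return out;
  }

  public static void main(String[] unused)
  {
    Map<String, double[]> a = new LinkedHashMap<>();
    a.put("x", new double[]{1.0});
    a.put("y", new double[]{2.0});
    Map<String, double[]> b = new LinkedHashMap<>();
    b.put("y", new double[]{5.0});
    b.put("z", new double[]{7.0});
    System.out.println(union(a, b).keySet());
    System.out.println(intersect(a, b).keySet());
    System.out.println(not(a, b).keySet());
  }
}
```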
## EARLIEST
`EARLIEST(expr)`

@@ -248,6 +248,19 @@ The [DataSketches extension](../development/extensions-core/datasketches-extensi
|`DS_RANK(expr, value)`|Returns an approximation to the rank of a given value that is the fraction of the distribution less than that value from a quantiles sketch. `expr` must return a quantiles sketch.|
|`DS_QUANTILE_SUMMARY(expr)`|Returns a string summary of a quantiles sketch, useful for debugging. `expr` must return a quantiles sketch.|
### Tuple sketch functions
The following functions operate on [tuple sketches](../development/extensions-core/datasketches-tuple.md).
The [DataSketches extension](../development/extensions-core/datasketches-extension.md) must be loaded to use the following functions.
|Function|Notes|
|--------|-----|
|`DS_TUPLE_DOUBLES_METRICS_SUM_ESTIMATE(expr)`|Computes approximate sums of the values in a [Tuple sketch](../development/extensions-core/datasketches-tuple.md#estimated-metrics-values-for-each-column-of-arrayofdoublessketch) column whose Summary Object is an array of double values.|
|`DS_TUPLE_DOUBLES_INTERSECT(expr, ..., [nominalEntries])`|Returns an intersection of tuple sketches, where each input expression must return a tuple sketch which contains an array of double values as its Summary Object. The values in the Summary Objects are summed when combined. If the last argument is a numeric literal, Druid treats it as an override for [nominal entries](../development/extensions-core/datasketches-tuple.md).|
|`DS_TUPLE_DOUBLES_NOT(expr, ..., [nominalEntries])`|Returns a set difference of tuple sketches, where each input expression must return a tuple sketch which contains an array of double values as its Summary Object. The values in the Summary Object are preserved as is. If the last argument is a numeric literal, Druid treats it as an override for [nominal entries](../development/extensions-core/datasketches-tuple.md).|
|`DS_TUPLE_DOUBLES_UNION(expr, ..., [nominalEntries])`|Returns a union of tuple sketches, where each input expression must return a tuple sketch which contains an array of double values as its Summary Object. The values in the Summary Objects are summed when combined. If the last argument is a numeric literal, Druid treats it as an override for [nominal entries](../development/extensions-core/datasketches-tuple.md).|
## Other scalar functions
|Function|Notes|

@@ -22,11 +22,18 @@ package org.apache.druid.query.aggregation.datasketches.tuple;
import com.fasterxml.jackson.databind.Module;
import com.fasterxml.jackson.databind.jsontype.NamedType;
import com.fasterxml.jackson.databind.module.SimpleModule;
import com.google.common.annotations.VisibleForTesting;
import com.google.inject.Binder;
import org.apache.datasketches.tuple.arrayofdoubles.ArrayOfDoublesSketch;
import org.apache.druid.initialization.DruidModule;
import org.apache.druid.query.aggregation.datasketches.tuple.sql.ArrayOfDoublesSketchMetricsSumEstimateOperatorConversion;
import org.apache.druid.query.aggregation.datasketches.tuple.sql.ArrayOfDoublesSketchSetIntersectOperatorConversion;
import org.apache.druid.query.aggregation.datasketches.tuple.sql.ArrayOfDoublesSketchSetNotOperatorConversion;
import org.apache.druid.query.aggregation.datasketches.tuple.sql.ArrayOfDoublesSketchSetUnionOperatorConversion;
import org.apache.druid.query.aggregation.datasketches.tuple.sql.ArrayOfDoublesSketchSqlAggregator;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.serde.ComplexMetrics;
import org.apache.druid.sql.guice.SqlBindings;
import java.util.Collections;
import java.util.List;
@@ -58,9 +65,14 @@ public class ArrayOfDoublesSketchModule implements DruidModule
@Override
public void configure(final Binder binder)
{
-ComplexMetrics.registerSerde(ARRAY_OF_DOUBLES_SKETCH, new ArrayOfDoublesSketchMergeComplexMetricSerde());
-ComplexMetrics.registerSerde(ARRAY_OF_DOUBLES_SKETCH_MERGE_AGG, new ArrayOfDoublesSketchMergeComplexMetricSerde());
-ComplexMetrics.registerSerde(ARRAY_OF_DOUBLES_SKETCH_BUILD_AGG, new ArrayOfDoublesSketchBuildComplexMetricSerde());
+registerSerde();
+SqlBindings.addAggregator(binder, ArrayOfDoublesSketchSqlAggregator.class);
+SqlBindings.addOperatorConversion(binder, ArrayOfDoublesSketchMetricsSumEstimateOperatorConversion.class);
+SqlBindings.addOperatorConversion(binder, ArrayOfDoublesSketchSetIntersectOperatorConversion.class);
+SqlBindings.addOperatorConversion(binder, ArrayOfDoublesSketchSetUnionOperatorConversion.class);
+SqlBindings.addOperatorConversion(binder, ArrayOfDoublesSketchSetNotOperatorConversion.class);
}
@Override
@@ -124,4 +136,13 @@ public class ArrayOfDoublesSketchModule implements DruidModule
);
}
@VisibleForTesting
public static void registerSerde()
{
ComplexMetrics.registerSerde(ARRAY_OF_DOUBLES_SKETCH, new ArrayOfDoublesSketchMergeComplexMetricSerde());
ComplexMetrics.registerSerde(ARRAY_OF_DOUBLES_SKETCH_MERGE_AGG, new ArrayOfDoublesSketchMergeComplexMetricSerde());
ComplexMetrics.registerSerde(ARRAY_OF_DOUBLES_SKETCH_BUILD_AGG, new ArrayOfDoublesSketchBuildComplexMetricSerde());
}
}

@@ -55,6 +55,9 @@ public class ArrayOfDoublesSketchToMetricsSumEstimatePostAggregator extends Arra
public double[] compute(final Map<String, Object> combinedAggregators)
{
final ArrayOfDoublesSketch sketch = (ArrayOfDoublesSketch) getField().compute(combinedAggregators);
if (sketch == null) {
return null;
}
final SummaryStatistics[] stats = new SummaryStatistics[sketch.getNumValues()];
Arrays.setAll(stats, i -> new SummaryStatistics());
final ArrayOfDoublesSketchIterator it = sketch.iterator();

@@ -0,0 +1,95 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.query.aggregation.datasketches.tuple.sql;
import org.apache.calcite.rex.RexCall;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.sql.SqlFunction;
import org.apache.calcite.sql.SqlOperator;
import org.apache.calcite.sql.type.SqlTypeFamily;
import org.apache.calcite.sql.type.SqlTypeName;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.query.aggregation.PostAggregator;
import org.apache.druid.query.aggregation.datasketches.tuple.ArrayOfDoublesSketchToMetricsSumEstimatePostAggregator;
import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.sql.calcite.expression.DruidExpression;
import org.apache.druid.sql.calcite.expression.OperatorConversions;
import org.apache.druid.sql.calcite.expression.PostAggregatorVisitor;
import org.apache.druid.sql.calcite.expression.SqlOperatorConversion;
import org.apache.druid.sql.calcite.planner.PlannerContext;
import javax.annotation.Nullable;
import java.util.List;
public class ArrayOfDoublesSketchMetricsSumEstimateOperatorConversion implements SqlOperatorConversion
{
private static final String FUNCTION_NAME = "DS_TUPLE_DOUBLES_METRICS_SUM_ESTIMATE";
private static final SqlFunction SQL_FUNCTION = OperatorConversions
.operatorBuilder(StringUtils.toUpperCase(FUNCTION_NAME))
.operandTypes(SqlTypeFamily.ANY)
.returnTypeNullableArrayWithNullableElements(SqlTypeName.DOUBLE)
.build();
@Override
public SqlOperator calciteOperator()
{
return SQL_FUNCTION;
}
@Override
public DruidExpression toDruidExpression(
PlannerContext plannerContext,
RowSignature rowSignature,
RexNode rexNode
)
{
return null;
}
@Nullable
@Override
public PostAggregator toPostAggregator(
PlannerContext plannerContext,
RowSignature rowSignature,
RexNode rexNode,
PostAggregatorVisitor postAggregatorVisitor
)
{
final List<RexNode> operands = ((RexCall) rexNode).getOperands();
final PostAggregator firstOperand = OperatorConversions.toPostAggregator(
plannerContext,
rowSignature,
operands.get(0),
postAggregatorVisitor,
true
);
if (firstOperand == null) {
return null;
}
return new ArrayOfDoublesSketchToMetricsSumEstimatePostAggregator(
postAggregatorVisitor.getOutputNamePrefix() + postAggregatorVisitor.getAndIncrementCounter(),
firstOperand
);
}
}

@@ -0,0 +1,144 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.query.aggregation.datasketches.tuple.sql;
import org.apache.calcite.rex.RexCall;
import org.apache.calcite.rex.RexLiteral;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.sql.SqlFunction;
import org.apache.calcite.sql.SqlFunctionCategory;
import org.apache.calcite.sql.SqlKind;
import org.apache.calcite.sql.SqlOperator;
import org.apache.calcite.sql.type.OperandTypes;
import org.apache.calcite.sql.type.SqlOperandCountRanges;
import org.apache.datasketches.Util;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.query.aggregation.PostAggregator;
import org.apache.druid.query.aggregation.datasketches.tuple.ArrayOfDoublesSketchSetOpPostAggregator;
import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.sql.calcite.expression.DruidExpression;
import org.apache.druid.sql.calcite.expression.OperatorConversions;
import org.apache.druid.sql.calcite.expression.PostAggregatorVisitor;
import org.apache.druid.sql.calcite.expression.SqlOperatorConversion;
import org.apache.druid.sql.calcite.planner.PlannerContext;
import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.List;
public abstract class ArrayOfDoublesSketchSetBaseOperatorConversion implements SqlOperatorConversion
{
public ArrayOfDoublesSketchSetBaseOperatorConversion()
{
}
@Override
public SqlOperator calciteOperator()
{
return makeSqlFunction();
}
@Nullable
@Override
public DruidExpression toDruidExpression(
PlannerContext plannerContext,
RowSignature rowSignature,
RexNode rexNode
)
{
plannerContext.setPlanningError("%s can only be used on aggregates. " +
"It cannot be used directly on a column or on a scalar expression.", getFunctionName());
return null;
}
@Nullable
@Override
public PostAggregator toPostAggregator(
PlannerContext plannerContext,
RowSignature rowSignature,
RexNode rexNode,
PostAggregatorVisitor postAggregatorVisitor
)
{
final List<RexNode> operands = ((RexCall) rexNode).getOperands();
final List<PostAggregator> inputPostAggs = new ArrayList<>();
final int nominalEntries;
Integer numberOfvalues = null;
final int metricExpressionEndIndex;
final int lastArgIndex = operands.size() - 1;
final RexNode potentialNominalEntriesArg = operands.get(lastArgIndex);
if (potentialNominalEntriesArg.isA(SqlKind.LITERAL) &&
RexLiteral.value(potentialNominalEntriesArg) instanceof Number) {
nominalEntries = ((Number) RexLiteral.value(potentialNominalEntriesArg)).intValue();
metricExpressionEndIndex = lastArgIndex - 1;
} else {
nominalEntries = Util.DEFAULT_NOMINAL_ENTRIES;
metricExpressionEndIndex = lastArgIndex;
}
for (int i = 0; i <= metricExpressionEndIndex; i++) {
RexNode operand = operands.get(i);
final PostAggregator convertedPostAgg = OperatorConversions.toPostAggregator(
plannerContext,
rowSignature,
operand,
postAggregatorVisitor,
true
);
if (convertedPostAgg == null) {
return null;
} else {
inputPostAggs.add(convertedPostAgg);
}
}
return new ArrayOfDoublesSketchSetOpPostAggregator(
postAggregatorVisitor.getOutputNamePrefix() + postAggregatorVisitor.getAndIncrementCounter(),
getSetOperationName(),
nominalEntries,
numberOfvalues,
inputPostAggs
);
}
private SqlFunction makeSqlFunction()
{
return new SqlFunction(
getFunctionName(),
SqlKind.OTHER_FUNCTION,
ArrayOfDoublesSketchSqlOperators.RETURN_TYPE_INFERENCE,
null,
OperandTypes.variadic(SqlOperandCountRanges.from(2)),
SqlFunctionCategory.USER_DEFINED_FUNCTION
);
}
public abstract String getSetOperationName();
public String getFunctionName()
{
return StringUtils.format("DS_TUPLE_DOUBLES_%s", getSetOperationName());
}
}

@@ -0,0 +1,29 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.query.aggregation.datasketches.tuple.sql;
public class ArrayOfDoublesSketchSetIntersectOperatorConversion extends ArrayOfDoublesSketchSetBaseOperatorConversion
{
@Override
public String getSetOperationName()
{
return "INTERSECT";
}
}

@@ -0,0 +1,29 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.query.aggregation.datasketches.tuple.sql;
public class ArrayOfDoublesSketchSetNotOperatorConversion extends ArrayOfDoublesSketchSetBaseOperatorConversion
{
@Override
public String getSetOperationName()
{
return "NOT";
}
}

@@ -0,0 +1,29 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.query.aggregation.datasketches.tuple.sql;
public class ArrayOfDoublesSketchSetUnionOperatorConversion extends ArrayOfDoublesSketchSetBaseOperatorConversion
{
@Override
public String getSetOperationName()
{
return "UNION";
}
}

@@ -0,0 +1,193 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.query.aggregation.datasketches.tuple.sql;
import org.apache.calcite.rel.core.AggregateCall;
import org.apache.calcite.rel.core.Project;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rex.RexBuilder;
import org.apache.calcite.rex.RexLiteral;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.sql.SqlAggFunction;
import org.apache.calcite.sql.SqlFunctionCategory;
import org.apache.calcite.sql.SqlKind;
import org.apache.calcite.sql.type.InferTypes;
import org.apache.calcite.sql.type.OperandTypes;
import org.apache.calcite.util.Optionality;
import org.apache.datasketches.Util;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.datasketches.tuple.ArrayOfDoublesSketchAggregatorFactory;
import org.apache.druid.query.dimension.DefaultDimensionSpec;
import org.apache.druid.query.dimension.DimensionSpec;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.segment.column.ValueType;
import org.apache.druid.sql.calcite.aggregation.Aggregation;
import org.apache.druid.sql.calcite.aggregation.SqlAggregator;
import org.apache.druid.sql.calcite.expression.DruidExpression;
import org.apache.druid.sql.calcite.expression.Expressions;
import org.apache.druid.sql.calcite.planner.Calcites;
import org.apache.druid.sql.calcite.planner.PlannerContext;
import org.apache.druid.sql.calcite.rel.VirtualColumnRegistry;
import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
public class ArrayOfDoublesSketchSqlAggregator implements SqlAggregator
{
private static final SqlAggFunction FUNCTION_INSTANCE = new ArrayOfDoublesSqlAggFunction();
private static final String NAME = "DS_TUPLE_DOUBLES";
@Override
public SqlAggFunction calciteFunction()
{
return FUNCTION_INSTANCE;
}
@Nullable
@Override
public Aggregation toDruidAggregation(
PlannerContext plannerContext,
RowSignature rowSignature,
VirtualColumnRegistry virtualColumnRegistry,
RexBuilder rexBuilder,
String name,
AggregateCall aggregateCall,
Project project,
List<Aggregation> existingAggregations,
boolean finalizeAggregations
)
{
final List<Integer> argList = aggregateCall.getArgList();
// check whether the last argument is a numeric literal overriding nominalEntries
final int nominalEntries;
final int metricExpressionEndIndex;
final int lastArgIndex = argList.size() - 1;
final RexNode potentialNominalEntriesArg = Expressions.fromFieldAccess(
rexBuilder.getTypeFactory(),
rowSignature,
project,
argList.get(lastArgIndex)
);
if (potentialNominalEntriesArg.isA(SqlKind.LITERAL) &&
RexLiteral.value(potentialNominalEntriesArg) instanceof Number) {
nominalEntries = ((Number) RexLiteral.value(potentialNominalEntriesArg)).intValue();
metricExpressionEndIndex = lastArgIndex - 1;
} else {
nominalEntries = Util.DEFAULT_NOMINAL_ENTRIES;
metricExpressionEndIndex = lastArgIndex;
}
final List<String> fieldNames = new ArrayList<>();
for (int i = 0; i <= metricExpressionEndIndex; i++) {
final String fieldName;
final RexNode columnRexNode = Expressions.fromFieldAccess(
rexBuilder.getTypeFactory(),
rowSignature,
project,
argList.get(i)
);
final DruidExpression columnArg = Expressions.toDruidExpression(
plannerContext,
rowSignature,
columnRexNode
);
if (columnArg == null) {
return null;
}
if (columnArg.isDirectColumnAccess() &&
rowSignature.getColumnType(columnArg.getDirectColumn())
.map(type -> type.is(ValueType.COMPLEX))
.orElse(false)) {
fieldName = columnArg.getDirectColumn();
} else {
final RelDataType dataType = columnRexNode.getType();
final ColumnType inputType = Calcites.getColumnTypeForRelDataType(dataType);
if (inputType == null) {
throw new ISE(
"Cannot translate sqlTypeName[%s] to Druid type for field[%s]",
dataType.getSqlTypeName(),
name
);
}
final DimensionSpec dimensionSpec;
if (columnArg.isDirectColumnAccess()) {
dimensionSpec = columnArg.getSimpleExtraction().toDimensionSpec(null, inputType);
} else {
String virtualColumnName = virtualColumnRegistry.getOrCreateVirtualColumnForExpression(
columnArg,
dataType
);
dimensionSpec = new DefaultDimensionSpec(virtualColumnName, null, inputType);
}
fieldName = dimensionSpec.getDimension();
}
fieldNames.add(fieldName);
}
final AggregatorFactory aggregatorFactory;
final List<String> metricColumns = fieldNames.size() > 1 ? fieldNames.subList(1, fieldNames.size()) : null;
aggregatorFactory = new ArrayOfDoublesSketchAggregatorFactory(
name,
fieldNames.get(0), // first field is dimension
nominalEntries,
metricColumns,
null
);
return Aggregation.create(
Collections.singletonList(aggregatorFactory),
null);
}
private static class ArrayOfDoublesSqlAggFunction extends SqlAggFunction
{
ArrayOfDoublesSqlAggFunction()
{
super(
NAME,
null,
SqlKind.OTHER_FUNCTION,
ArrayOfDoublesSketchSqlOperators.RETURN_TYPE_INFERENCE,
InferTypes.VARCHAR_1024,
OperandTypes.VARIADIC,
SqlFunctionCategory.USER_DEFINED_FUNCTION,
false,
false,
Optionality.FORBIDDEN
);
}
}
}

@@ -0,0 +1,35 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.query.aggregation.datasketches.tuple.sql;
import org.apache.calcite.sql.type.SqlReturnTypeInference;
import org.apache.druid.query.aggregation.datasketches.tuple.ArrayOfDoublesSketchModule;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.sql.calcite.table.RowSignatures;
public class ArrayOfDoublesSketchSqlOperators
{
public static final SqlReturnTypeInference RETURN_TYPE_INFERENCE =
opBinding -> RowSignatures.makeComplexType(
opBinding.getTypeFactory(),
ColumnType.ofComplex(ArrayOfDoublesSketchModule.ARRAY_OF_DOUBLES_SKETCH),
true
);
}

@@ -55,8 +55,8 @@ public class ArrayOfDoublesSketchAggregationTest extends InitializedNullHandling
public ArrayOfDoublesSketchAggregationTest(final GroupByQueryConfig config)
{
+ArrayOfDoublesSketchModule.registerSerde();
DruidModule module = new ArrayOfDoublesSketchModule();
-module.configure(null);
helper = AggregationTestHelper.createGroupByQueryAggregationTestHelper(
module.getJacksonModules(), config, tempFolder);
tsHelper = AggregationTestHelper.createTimeseriesQueryAggregationTestHelper(module.getJacksonModules(), tempFolder);

@@ -0,0 +1,452 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.query.aggregation.datasketches.tuple.sql;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.inject.Injector;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.guice.DruidInjectorBuilder;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.query.Druids;
import org.apache.druid.query.QueryRunnerFactoryConglomerate;
import org.apache.druid.query.aggregation.CountAggregatorFactory;
import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
import org.apache.druid.query.aggregation.datasketches.tuple.ArrayOfDoublesSketchAggregatorFactory;
import org.apache.druid.query.aggregation.datasketches.tuple.ArrayOfDoublesSketchModule;
import org.apache.druid.query.aggregation.datasketches.tuple.ArrayOfDoublesSketchOperations;
import org.apache.druid.query.aggregation.datasketches.tuple.ArrayOfDoublesSketchSetOpPostAggregator;
import org.apache.druid.query.aggregation.datasketches.tuple.ArrayOfDoublesSketchToMetricsSumEstimatePostAggregator;
import org.apache.druid.query.aggregation.post.ExpressionPostAggregator;
import org.apache.druid.query.aggregation.post.FieldAccessPostAggregator;
import org.apache.druid.query.dimension.DefaultDimensionSpec;
import org.apache.druid.query.groupby.GroupByQuery;
import org.apache.druid.segment.IndexBuilder;
import org.apache.druid.segment.QueryableIndex;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.incremental.IncrementalIndexSchema;
import org.apache.druid.segment.join.JoinableFactoryWrapper;
import org.apache.druid.segment.virtual.ExpressionVirtualColumn;
import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory;
import org.apache.druid.sql.calcite.BaseCalciteQueryTest;
import org.apache.druid.sql.calcite.filtration.Filtration;
import org.apache.druid.sql.calcite.util.CalciteTests;
import org.apache.druid.sql.calcite.util.SpecificSegmentsQuerySegmentWalker;
import org.apache.druid.sql.calcite.util.TestDataBuilder;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.partition.LinearShardSpec;
import org.junit.Test;
import java.io.IOException;
import java.util.List;
import java.util.stream.Collectors;
public class ArrayOfDoublesSketchSqlAggregatorTest extends BaseCalciteQueryTest
{
private static final String DATA_SOURCE = "foo";
// built from ArrayOfDoublesUpdatableSketch.update("FEDCAB", new double[] {0.0}).compact()
private static final String COMPACT_BASE_64_ENCODED_SKETCH_FOR_INTERSECTION = "AQEJAwgBzJP/////////fwEAAAAAAAAAjFnadZuMrkgAAAAAAAAAAA==";
private static final List<InputRow> ROWS = ImmutableList.of(
ImmutableMap.<String, Object>builder()
.put("t", "2000-01-01")
.put("dim1", "CA")
.put("dim2", "FEDCAB")
.put("m1", 5)
.build(),
ImmutableMap.<String, Object>builder()
.put("t", "2000-01-01")
.put("dim1", "US")
.put("dim2", "ABCDEF")
.put("m1", 12)
.build(),
ImmutableMap.<String, Object>builder()
.put("t", "2000-01-02")
.put("dim1", "CA")
.put("dim2", "FEDCAB")
.put("m1", 3)
.build(),
ImmutableMap.<String, Object>builder()
.put("t", "2000-01-02")
.put("dim1", "US")
.put("dim2", "ABCDEF")
.put("m1", 8)
.build(),
ImmutableMap.<String, Object>builder()
.put("t", "2000-01-03")
.put("dim1", "US")
.put("dim2", "ABCDEF")
.put("m1", 2)
.build()
).stream().map(TestDataBuilder::createRow).collect(Collectors.toList());
@Override
public void configureGuice(DruidInjectorBuilder builder)
{
super.configureGuice(builder);
builder.addModule(new ArrayOfDoublesSketchModule());
}
@Override
public SpecificSegmentsQuerySegmentWalker createQuerySegmentWalker(
final QueryRunnerFactoryConglomerate conglomerate,
final JoinableFactoryWrapper joinableFactory,
final Injector injector
) throws IOException
{
ArrayOfDoublesSketchModule.registerSerde();
final QueryableIndex index = IndexBuilder.create()
.tmpDir(temporaryFolder.newFolder())
.segmentWriteOutMediumFactory(
OffHeapMemorySegmentWriteOutMediumFactory.instance()
)
.schema(
new IncrementalIndexSchema.Builder()
.withMetrics(
new CountAggregatorFactory("cnt"),
new ArrayOfDoublesSketchAggregatorFactory(
"tuplesketch_dim2",
"dim2",
null,
ImmutableList.of("m1"),
1
),
new LongSumAggregatorFactory("m1", "m1")
)
.withRollup(false)
.build()
)
.rows(ROWS)
.buildMMappedIndex();
return new SpecificSegmentsQuerySegmentWalker(conglomerate).add(
DataSegment.builder()
.dataSource(DATA_SOURCE)
.interval(index.getDataInterval())
.version("1")
.shardSpec(new LinearShardSpec(0))
.size(0)
.build(),
index
);
}
@Test
public void testMetricsSumEstimate()
{
cannotVectorize();
final String sql = "SELECT\n"
+ " dim1,\n"
+ " SUM(cnt),\n"
+ " DS_TUPLE_DOUBLES_METRICS_SUM_ESTIMATE(DS_TUPLE_DOUBLES(tuplesketch_dim2)),\n"
+ " DS_TUPLE_DOUBLES_METRICS_SUM_ESTIMATE(DS_TUPLE_DOUBLES(dim2, m1)),\n"
+ " DS_TUPLE_DOUBLES_METRICS_SUM_ESTIMATE(DS_TUPLE_DOUBLES(dim2, m1, 256))\n"
+ "FROM druid.foo\n"
+ "GROUP BY dim1";
final List<Object[]> expectedResults;
expectedResults = ImmutableList.of(
new Object[]{
"CA",
2L,
"[8.0]",
"[8.0]",
"[8.0]"
},
new Object[]{
"US",
3L,
"[22.0]",
"[22.0]",
"[22.0]"
}
);
testQuery(
sql,
ImmutableList.of(
GroupByQuery.builder()
.setDataSource(CalciteTests.DATASOURCE1)
.setInterval(querySegmentSpec(Filtration.eternity()))
.setGranularity(Granularities.ALL)
.setDimensions(new DefaultDimensionSpec("dim1", "d0", ColumnType.STRING))
.setAggregatorSpecs(
aggregators(
new LongSumAggregatorFactory("a0", "cnt"),
new ArrayOfDoublesSketchAggregatorFactory(
"a1",
"tuplesketch_dim2",
null,
null,
null
),
new ArrayOfDoublesSketchAggregatorFactory(
"a2",
"dim2",
null,
ImmutableList.of("m1"),
null
),
new ArrayOfDoublesSketchAggregatorFactory(
"a3",
"dim2",
256,
ImmutableList.of("m1"),
null
)
)
)
.setPostAggregatorSpecs(
ImmutableList.of(
new ArrayOfDoublesSketchToMetricsSumEstimatePostAggregator(
"p1",
new FieldAccessPostAggregator("p0", "a1")
),
new ArrayOfDoublesSketchToMetricsSumEstimatePostAggregator(
"p3",
new FieldAccessPostAggregator("p2", "a2")
),
new ArrayOfDoublesSketchToMetricsSumEstimatePostAggregator(
"p5",
new FieldAccessPostAggregator("p4", "a3")
)
)
)
.setContext(QUERY_CONTEXT_DEFAULT)
.build()
),
expectedResults
);
}
@Test
public void testMetricsSumEstimateIntersect()
{
cannotVectorize();
final String sql = "SELECT\n"
+ " SUM(cnt),\n"
+ " DS_TUPLE_DOUBLES_METRICS_SUM_ESTIMATE(DS_TUPLE_DOUBLES(tuplesketch_dim2)) AS all_sum_estimates,\n"
+ StringUtils.replace(
"DS_TUPLE_DOUBLES_METRICS_SUM_ESTIMATE(DS_TUPLE_DOUBLES_INTERSECT(COMPLEX_DECODE_BASE64('arrayOfDoublesSketch', '%s'), DS_TUPLE_DOUBLES(tuplesketch_dim2), 128)) AS intersect_sum_estimates\n",
"%s",
COMPACT_BASE_64_ENCODED_SKETCH_FOR_INTERSECTION
)
+ "FROM druid.foo";
final List<Object[]> expectedResults;
expectedResults = ImmutableList.of(
new Object[]{
5L,
"[30.0]",
"[8.0]"
}
);
final String expectedBase64Constant = "'"
+ StringUtils.replace(COMPACT_BASE_64_ENCODED_SKETCH_FOR_INTERSECTION, "=", "\\u003D")
+ "'";
testQuery(
sql,
ImmutableList.of(
Druids.newTimeseriesQueryBuilder()
.dataSource(CalciteTests.DATASOURCE1)
.intervals(querySegmentSpec(Filtration.eternity()))
.granularity(Granularities.ALL)
.aggregators(
ImmutableList.of(
new LongSumAggregatorFactory("a0", "cnt"),
new ArrayOfDoublesSketchAggregatorFactory(
"a1",
"tuplesketch_dim2",
null,
null,
null
)
)
)
.postAggregators(
ImmutableList.of(
new ArrayOfDoublesSketchToMetricsSumEstimatePostAggregator(
"p1",
new FieldAccessPostAggregator("p0", "a1")
),
new ArrayOfDoublesSketchToMetricsSumEstimatePostAggregator(
"p5",
new ArrayOfDoublesSketchSetOpPostAggregator(
"p4",
"INTERSECT",
128,
null,
ImmutableList.of(
new ExpressionPostAggregator(
"p2",
"complex_decode_base64('arrayOfDoublesSketch'," + expectedBase64Constant + ")",
null,
queryFramework().macroTable()
),
new FieldAccessPostAggregator("p3", "a1")
)
)
)
)
)
.context(QUERY_CONTEXT_DEFAULT)
.build()
),
expectedResults
);
}
@Test
public void testNullInputs()
{
cannotVectorize();
final String sql = "SELECT\n"
+ " DS_TUPLE_DOUBLES(NULL),\n"
+ " DS_TUPLE_DOUBLES_METRICS_SUM_ESTIMATE(NULL),\n"
+ " DS_TUPLE_DOUBLES_UNION(NULL, NULL),\n"
+ " DS_TUPLE_DOUBLES_UNION(NULL, DS_TUPLE_DOUBLES(tuplesketch_dim2)),\n"
+ " DS_TUPLE_DOUBLES_UNION(DS_TUPLE_DOUBLES(tuplesketch_dim2), NULL)\n"
+ "FROM druid.foo";
final List<Object[]> expectedResults;
expectedResults = ImmutableList.of(
new Object[]{
"0.0",
null,
"\"AQEJAwQBzJP/////////fw==\"",
"\"AQEJAwgBzJP/////////fwIAAAAAAAAAjFnadZuMrkg6WYAWZ8t1NgAAAAAAACBAAAAAAAAANkA=\"",
"\"AQEJAwgBzJP/////////fwIAAAAAAAAAjFnadZuMrkg6WYAWZ8t1NgAAAAAAACBAAAAAAAAANkA=\"",
}
);
testQuery(
sql,
ImmutableList.of(
Druids.newTimeseriesQueryBuilder()
.dataSource(CalciteTests.DATASOURCE1)
.intervals(querySegmentSpec(Filtration.eternity()))
.granularity(Granularities.ALL)
.virtualColumns(
new ExpressionVirtualColumn(
"v0",
"null",
ColumnType.STRING,
queryFramework().macroTable()
)
)
.aggregators(
ImmutableList.of(
new ArrayOfDoublesSketchAggregatorFactory(
"a0",
"v0",
null,
null,
null
),
new ArrayOfDoublesSketchAggregatorFactory(
"a1",
"tuplesketch_dim2",
null,
null,
null
)
)
)
.postAggregators(
ImmutableList.of(
new ArrayOfDoublesSketchToMetricsSumEstimatePostAggregator(
"p1",
new ExpressionPostAggregator("p0", "null", null, queryFramework().macroTable())
),
new ArrayOfDoublesSketchSetOpPostAggregator(
"p4",
ArrayOfDoublesSketchOperations.Operation.UNION.name(),
null,
null,
ImmutableList.of(
new ExpressionPostAggregator("p2", "null", null, queryFramework().macroTable()),
new ExpressionPostAggregator("p3", "null", null, queryFramework().macroTable())
)
),
new ArrayOfDoublesSketchSetOpPostAggregator(
"p7",
ArrayOfDoublesSketchOperations.Operation.UNION.name(),
null,
null,
ImmutableList.of(
new ExpressionPostAggregator("p5", "null", null, queryFramework().macroTable()),
new FieldAccessPostAggregator("p6", "a1")
)
),
new ArrayOfDoublesSketchSetOpPostAggregator(
"p10",
ArrayOfDoublesSketchOperations.Operation.UNION.name(),
null,
null,
ImmutableList.of(
new FieldAccessPostAggregator("p8", "a1"),
new ExpressionPostAggregator("p9", "null", null, queryFramework().macroTable())
)
)
)
)
.context(QUERY_CONTEXT_DEFAULT)
.build()
),
expectedResults
);
}
@Test
public void testArrayOfDoublesSketchIntersectOnScalarExpression()
{
assertQueryIsUnplannable("SELECT DS_TUPLE_DOUBLES_INTERSECT(NULL, NULL) FROM foo",
"Possible error: DS_TUPLE_DOUBLES_INTERSECT can only be used on aggregates. " +
"It cannot be used directly on a column or on a scalar expression.");
}
@Test
public void testArrayOfDoublesSketchNotOnScalarExpression()
{
assertQueryIsUnplannable("SELECT DS_TUPLE_DOUBLES_NOT(NULL, NULL) FROM foo",
"Possible error: DS_TUPLE_DOUBLES_NOT can only be used on aggregates. " +
"It cannot be used directly on a column or on a scalar expression.");
}
@Test
public void testArrayOfDoublesSketchUnionOnScalarExpression()
{
assertQueryIsUnplannable("SELECT DS_TUPLE_DOUBLES_UNION(NULL, NULL) FROM foo",
"Possible error: DS_TUPLE_DOUBLES_UNION can only be used on aggregates. " +
"It cannot be used directly on a column or on a scalar expression.");
}
}
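
The expected values in `testMetricsSumEstimate` and `testMetricsSumEstimateIntersect` can be cross-checked by hand against `ROWS`. The sketch below is only that cross-check: it sums `m1` per `dim1` with ordinary arithmetic and does not use the DataSketches library or any Druid class. The class name `ExpectedSumsSketch` and the helper `sumByDim1` are hypothetical, introduced here for illustration. It assumes the sketches hold so few distinct keys that their estimates equal the plain sums.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class ExpectedSumsSketch
{
  // {dim1, dim2, m1} triples copied from ROWS in the test class.
  static final Object[][] ROWS = {
      {"CA", "FEDCAB", 5.0},
      {"US", "ABCDEF", 12.0},
      {"CA", "FEDCAB", 3.0},
      {"US", "ABCDEF", 8.0},
      {"US", "ABCDEF", 2.0}
  };

  // Hypothetical helper: plain sum of m1 per dim1.
  // This is ordinary arithmetic, not the Tuple sketch algorithm.
  static Map<String, Double> sumByDim1(Object[][] rows)
  {
    final Map<String, Double> sums = new LinkedHashMap<>();
    for (Object[] row : rows) {
      sums.merge((String) row[0], (Double) row[2], Double::sum);
    }
    return sums;
  }

  public static void main(String[] args)
  {
    // prints {CA=8.0, US=22.0}
    System.out.println(sumByDim1(ROWS));
  }
}
```

The per-group sums 8.0 and 22.0 match the `"[8.0]"` and `"[22.0]"` rows, and their total of 30.0 matches `all_sum_estimates`. The intersection result `"[8.0]"` corresponds to the single key `FEDCAB`, whose rows carry 5 and 3.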

@ -138,6 +138,12 @@
<skipTests>false</skipTests>
<skipUTs>${skipTests}</skipUTs>
<skipITs>${skipTests}</skipITs>
<!-- NetBeans has a problem when we use late binding with @ in the surefire arg line.
Therefore we set this empty property here.
ref: https://github.com/mapstruct/mapstruct/pull/1241
-->
<jacocoArgLine />
</properties>
<modules>

@ -2259,6 +2259,11 @@ DS_QUANTILE_SUMMARY
DS_QUANTILES_SKETCH
DS_RANK
DS_THETA
DS_TUPLE_DOUBLES
DS_TUPLE_DOUBLES_INTERSECT
DS_TUPLE_DOUBLES_METRICS_SUM_ESTIMATE
DS_TUPLE_DOUBLES_NOT
DS_TUPLE_DOUBLES_UNION
EARLIEST_BY
_e_
HLL_SKETCH_ESTIMATE