mirror of https://github.com/apache/druid.git
SQL: EARLIEST, LATEST aggregators. (#8815)
* SQL: EARLIEST, LATEST aggregators. I chose these names instead of FIRST, LAST because those are already reserved functions in Calcite that mean something different. I think these are also better names anyway. * Finalify. * SQL updates. * Adjust aggregator calls. * Validations, test updates. * Review docs.
This commit is contained in:
parent
6eacaf446f
commit
0e8c3f74d0
|
@ -195,6 +195,10 @@ Only the COUNT aggregation can accept DISTINCT.
|
||||||
|`STDDEV_POP(expr)`|Computes standard deviation population of `expr`. See [stats extension](../development/extensions-core/stats.html) documentation for additional details.|
|
|`STDDEV_POP(expr)`|Computes standard deviation population of `expr`. See [stats extension](../development/extensions-core/stats.html) documentation for additional details.|
|
||||||
|`STDDEV_SAMP(expr)`|Computes standard deviation sample of `expr`. See [stats extension](../development/extensions-core/stats.html) documentation for additional details.|
|
|`STDDEV_SAMP(expr)`|Computes standard deviation sample of `expr`. See [stats extension](../development/extensions-core/stats.html) documentation for additional details.|
|
||||||
|`STDDEV(expr)`|Computes standard deviation sample of `expr`. See [stats extension](../development/extensions-core/stats.html) documentation for additional details.|
|
|`STDDEV(expr)`|Computes standard deviation sample of `expr`. See [stats extension](../development/extensions-core/stats.html) documentation for additional details.|
|
||||||
|
|`EARLIEST(expr)`|Returns the earliest non-null value of `expr`, which must be numeric. If `expr` comes from a relation with a timestamp column (like a Druid datasource) then "earliest" is the value first encountered with the minimum overall timestamp of all values being aggregated. If `expr` does not come from a relation with a timestamp, then it is simply the first value encountered.|
|
||||||
|
|`EARLIEST(expr, maxBytesPerString)`|Like `EARLIEST(expr)`, but for strings. The `maxBytesPerString` parameter determines how much aggregation space to allocate per string. Strings longer than this limit will be truncated. This parameter should be set as low as possible, since high values will lead to wasted memory.|
|
||||||
|
|`LATEST(expr)`|Returns the latest non-null value of `expr`, which must be numeric. If `expr` comes from a relation with a timestamp column (like a Druid datasource) then "latest" is the value last encountered with the maximum overall timestamp of all values being aggregated. If `expr` does not come from a relation with a timestamp, then it is simply the last value encountered.|
|
||||||
|
|`LATEST(expr, maxBytesPerString)`|Like `LATEST(expr)`, but for strings. The `maxBytesPerString` parameter determines how much aggregation space to allocate per string. Strings longer than this limit will be truncated. This parameter should be set as low as possible, since high values will lead to wasted memory.|
|
||||||
|
|
||||||
For advice on choosing approximate aggregation functions, check out our [approximate aggregations documentation](aggregations.html#approx).
|
For advice on choosing approximate aggregation functions, check out our [approximate aggregations documentation](aggregations.html#approx).
|
||||||
|
|
||||||
|
|
|
@ -24,6 +24,7 @@ import com.fasterxml.jackson.annotation.JsonProperty;
|
||||||
import com.fasterxml.jackson.annotation.JsonTypeName;
|
import com.fasterxml.jackson.annotation.JsonTypeName;
|
||||||
import com.google.common.base.Preconditions;
|
import com.google.common.base.Preconditions;
|
||||||
import com.google.common.primitives.Longs;
|
import com.google.common.primitives.Longs;
|
||||||
|
import org.apache.druid.java.util.common.IAE;
|
||||||
import org.apache.druid.query.aggregation.AggregateCombiner;
|
import org.apache.druid.query.aggregation.AggregateCombiner;
|
||||||
import org.apache.druid.query.aggregation.Aggregator;
|
import org.apache.druid.query.aggregation.Aggregator;
|
||||||
import org.apache.druid.query.aggregation.AggregatorFactory;
|
import org.apache.druid.query.aggregation.AggregatorFactory;
|
||||||
|
@ -102,6 +103,11 @@ public class StringFirstAggregatorFactory extends AggregatorFactory
|
||||||
{
|
{
|
||||||
Preconditions.checkNotNull(name, "Must have a valid, non-null aggregator name");
|
Preconditions.checkNotNull(name, "Must have a valid, non-null aggregator name");
|
||||||
Preconditions.checkNotNull(fieldName, "Must have a valid, non-null fieldName");
|
Preconditions.checkNotNull(fieldName, "Must have a valid, non-null fieldName");
|
||||||
|
|
||||||
|
if (maxStringBytes != null && maxStringBytes < 0) {
|
||||||
|
throw new IAE("maxStringBytes must be greater than 0");
|
||||||
|
}
|
||||||
|
|
||||||
this.name = name;
|
this.name = name;
|
||||||
this.fieldName = fieldName;
|
this.fieldName = fieldName;
|
||||||
this.maxStringBytes = maxStringBytes == null
|
this.maxStringBytes = maxStringBytes == null
|
||||||
|
|
|
@ -24,6 +24,7 @@ import com.fasterxml.jackson.annotation.JsonProperty;
|
||||||
import com.fasterxml.jackson.annotation.JsonTypeName;
|
import com.fasterxml.jackson.annotation.JsonTypeName;
|
||||||
import com.google.common.base.Preconditions;
|
import com.google.common.base.Preconditions;
|
||||||
import com.google.common.collect.ImmutableList;
|
import com.google.common.collect.ImmutableList;
|
||||||
|
import org.apache.druid.java.util.common.IAE;
|
||||||
import org.apache.druid.query.aggregation.AggregateCombiner;
|
import org.apache.druid.query.aggregation.AggregateCombiner;
|
||||||
import org.apache.druid.query.aggregation.Aggregator;
|
import org.apache.druid.query.aggregation.Aggregator;
|
||||||
import org.apache.druid.query.aggregation.AggregatorFactory;
|
import org.apache.druid.query.aggregation.AggregatorFactory;
|
||||||
|
@ -58,6 +59,11 @@ public class StringLastAggregatorFactory extends AggregatorFactory
|
||||||
{
|
{
|
||||||
Preconditions.checkNotNull(name, "Must have a valid, non-null aggregator name");
|
Preconditions.checkNotNull(name, "Must have a valid, non-null aggregator name");
|
||||||
Preconditions.checkNotNull(fieldName, "Must have a valid, non-null fieldName");
|
Preconditions.checkNotNull(fieldName, "Must have a valid, non-null fieldName");
|
||||||
|
|
||||||
|
if (maxStringBytes != null && maxStringBytes < 0) {
|
||||||
|
throw new IAE("maxStringBytes must be greater than 0");
|
||||||
|
}
|
||||||
|
|
||||||
this.name = name;
|
this.name = name;
|
||||||
this.fieldName = fieldName;
|
this.fieldName = fieldName;
|
||||||
this.maxStringBytes = maxStringBytes == null
|
this.maxStringBytes = maxStringBytes == null
|
||||||
|
|
|
@ -30,6 +30,7 @@ import org.apache.druid.sql.calcite.table.RowSignature;
|
||||||
|
|
||||||
import javax.annotation.Nullable;
|
import javax.annotation.Nullable;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
import java.util.Objects;
|
||||||
import java.util.function.Function;
|
import java.util.function.Function;
|
||||||
import java.util.stream.Collectors;
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
|
@ -40,18 +41,40 @@ public class Aggregations
|
||||||
// No instantiation.
|
// No instantiation.
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get Druid expressions that correspond to "simple" aggregator inputs. This is used by standard sum/min/max
|
||||||
|
* aggregators, which have the following properties:
|
||||||
|
*
|
||||||
|
* 1) They can take direct field accesses or expressions as inputs.
|
||||||
|
* 2) They cannot implicitly cast strings to numbers when using a direct field access.
|
||||||
|
*
|
||||||
|
* @param plannerContext SQL planner context
|
||||||
|
* @param rowSignature input row signature
|
||||||
|
* @param call aggregate call object
|
||||||
|
* @param project project that should be applied before aggregation; may be null
|
||||||
|
*
|
||||||
|
* @return list of expressions corresponding to aggregator arguments, or null if any cannot be translated
|
||||||
|
*/
|
||||||
@Nullable
|
@Nullable
|
||||||
public static List<DruidExpression> getArgumentsForSimpleAggregator(
|
public static List<DruidExpression> getArgumentsForSimpleAggregator(
|
||||||
final PlannerContext plannerContext,
|
final PlannerContext plannerContext,
|
||||||
final RowSignature rowSignature,
|
final RowSignature rowSignature,
|
||||||
final AggregateCall call,
|
final AggregateCall call,
|
||||||
final Project project
|
@Nullable final Project project
|
||||||
)
|
)
|
||||||
{
|
{
|
||||||
return call.getArgList().stream()
|
final List<DruidExpression> args = call
|
||||||
|
.getArgList()
|
||||||
|
.stream()
|
||||||
.map(i -> Expressions.fromFieldAccess(rowSignature, project, i))
|
.map(i -> Expressions.fromFieldAccess(rowSignature, project, i))
|
||||||
.map(rexNode -> toDruidExpressionForSimpleAggregator(plannerContext, rowSignature, rexNode))
|
.map(rexNode -> toDruidExpressionForSimpleAggregator(plannerContext, rowSignature, rexNode))
|
||||||
.collect(Collectors.toList());
|
.collect(Collectors.toList());
|
||||||
|
|
||||||
|
if (args.stream().noneMatch(Objects::isNull)) {
|
||||||
|
return args;
|
||||||
|
} else {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private static DruidExpression toDruidExpressionForSimpleAggregator(
|
private static DruidExpression toDruidExpressionForSimpleAggregator(
|
||||||
|
@ -68,7 +91,8 @@ public class Aggregations
|
||||||
if (druidExpression.isSimpleExtraction() &&
|
if (druidExpression.isSimpleExtraction() &&
|
||||||
(!druidExpression.isDirectColumnAccess()
|
(!druidExpression.isDirectColumnAccess()
|
||||||
|| rowSignature.getColumnType(druidExpression.getDirectColumn()) == ValueType.STRING)) {
|
|| rowSignature.getColumnType(druidExpression.getDirectColumn()) == ValueType.STRING)) {
|
||||||
// Aggregators are unable to implicitly cast strings to numbers. So remove the simple extraction in this case.
|
// Aggregators are unable to implicitly cast strings to numbers.
|
||||||
|
// So remove the simple extraction, which forces the expression to be used instead of the direct column access.
|
||||||
return druidExpression.map(simpleExtraction -> null, Function.identity());
|
return druidExpression.map(simpleExtraction -> null, Function.identity());
|
||||||
} else {
|
} else {
|
||||||
return druidExpression;
|
return druidExpression;
|
||||||
|
|
|
@ -0,0 +1,221 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing,
|
||||||
|
* software distributed under the License is distributed on an
|
||||||
|
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||||
|
* KIND, either express or implied. See the License for the
|
||||||
|
* specific language governing permissions and limitations
|
||||||
|
* under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.druid.sql.calcite.aggregation.builtin;
|
||||||
|
|
||||||
|
import org.apache.calcite.rel.core.AggregateCall;
|
||||||
|
import org.apache.calcite.rel.core.Project;
|
||||||
|
import org.apache.calcite.rex.RexBuilder;
|
||||||
|
import org.apache.calcite.rex.RexLiteral;
|
||||||
|
import org.apache.calcite.rex.RexNode;
|
||||||
|
import org.apache.calcite.sql.SqlAggFunction;
|
||||||
|
import org.apache.calcite.sql.SqlFunctionCategory;
|
||||||
|
import org.apache.calcite.sql.SqlKind;
|
||||||
|
import org.apache.calcite.sql.type.InferTypes;
|
||||||
|
import org.apache.calcite.sql.type.OperandTypes;
|
||||||
|
import org.apache.calcite.sql.type.ReturnTypes;
|
||||||
|
import org.apache.calcite.sql.type.SqlTypeName;
|
||||||
|
import org.apache.druid.java.util.common.ISE;
|
||||||
|
import org.apache.druid.query.aggregation.AggregatorFactory;
|
||||||
|
import org.apache.druid.query.aggregation.first.DoubleFirstAggregatorFactory;
|
||||||
|
import org.apache.druid.query.aggregation.first.FloatFirstAggregatorFactory;
|
||||||
|
import org.apache.druid.query.aggregation.first.LongFirstAggregatorFactory;
|
||||||
|
import org.apache.druid.query.aggregation.first.StringFirstAggregatorFactory;
|
||||||
|
import org.apache.druid.query.aggregation.last.DoubleLastAggregatorFactory;
|
||||||
|
import org.apache.druid.query.aggregation.last.FloatLastAggregatorFactory;
|
||||||
|
import org.apache.druid.query.aggregation.last.LongLastAggregatorFactory;
|
||||||
|
import org.apache.druid.query.aggregation.last.StringLastAggregatorFactory;
|
||||||
|
import org.apache.druid.query.aggregation.post.FinalizingFieldAccessPostAggregator;
|
||||||
|
import org.apache.druid.segment.VirtualColumn;
|
||||||
|
import org.apache.druid.segment.column.ValueType;
|
||||||
|
import org.apache.druid.sql.calcite.aggregation.Aggregation;
|
||||||
|
import org.apache.druid.sql.calcite.aggregation.SqlAggregator;
|
||||||
|
import org.apache.druid.sql.calcite.expression.DruidExpression;
|
||||||
|
import org.apache.druid.sql.calcite.expression.Expressions;
|
||||||
|
import org.apache.druid.sql.calcite.planner.Calcites;
|
||||||
|
import org.apache.druid.sql.calcite.planner.PlannerContext;
|
||||||
|
import org.apache.druid.sql.calcite.rel.VirtualColumnRegistry;
|
||||||
|
import org.apache.druid.sql.calcite.table.RowSignature;
|
||||||
|
|
||||||
|
import javax.annotation.Nullable;
|
||||||
|
import java.util.Collections;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Objects;
|
||||||
|
import java.util.stream.Collectors;
|
||||||
|
import java.util.stream.Stream;
|
||||||
|
|
||||||
|
public class EarliestLatestSqlAggregator implements SqlAggregator
|
||||||
|
{
|
||||||
|
public static final SqlAggregator EARLIEST = new EarliestLatestSqlAggregator(EarliestOrLatest.EARLIEST);
|
||||||
|
public static final SqlAggregator LATEST = new EarliestLatestSqlAggregator(EarliestOrLatest.LATEST);
|
||||||
|
|
||||||
|
enum EarliestOrLatest
|
||||||
|
{
|
||||||
|
EARLIEST {
|
||||||
|
@Override
|
||||||
|
AggregatorFactory createAggregatorFactory(String name, String fieldName, ValueType type, int maxStringBytes)
|
||||||
|
{
|
||||||
|
switch (type) {
|
||||||
|
case LONG:
|
||||||
|
return new LongFirstAggregatorFactory(name, fieldName);
|
||||||
|
case FLOAT:
|
||||||
|
return new FloatFirstAggregatorFactory(name, fieldName);
|
||||||
|
case DOUBLE:
|
||||||
|
return new DoubleFirstAggregatorFactory(name, fieldName);
|
||||||
|
case STRING:
|
||||||
|
return new StringFirstAggregatorFactory(name, fieldName, maxStringBytes);
|
||||||
|
default:
|
||||||
|
throw new ISE("Cannot build aggregatorFactory for type[%s]", type);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
},
|
||||||
|
|
||||||
|
LATEST {
|
||||||
|
@Override
|
||||||
|
AggregatorFactory createAggregatorFactory(String name, String fieldName, ValueType type, int maxStringBytes)
|
||||||
|
{
|
||||||
|
switch (type) {
|
||||||
|
case LONG:
|
||||||
|
return new LongLastAggregatorFactory(name, fieldName);
|
||||||
|
case FLOAT:
|
||||||
|
return new FloatLastAggregatorFactory(name, fieldName);
|
||||||
|
case DOUBLE:
|
||||||
|
return new DoubleLastAggregatorFactory(name, fieldName);
|
||||||
|
case STRING:
|
||||||
|
return new StringLastAggregatorFactory(name, fieldName, maxStringBytes);
|
||||||
|
default:
|
||||||
|
throw new ISE("Cannot build aggregatorFactory for type[%s]", type);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
abstract AggregatorFactory createAggregatorFactory(
|
||||||
|
String name,
|
||||||
|
String fieldName,
|
||||||
|
ValueType outputType,
|
||||||
|
int maxStringBytes
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
private final EarliestOrLatest earliestOrLatest;
|
||||||
|
private final SqlAggFunction function;
|
||||||
|
|
||||||
|
private EarliestLatestSqlAggregator(final EarliestOrLatest earliestOrLatest)
|
||||||
|
{
|
||||||
|
this.earliestOrLatest = earliestOrLatest;
|
||||||
|
this.function = new EarliestLatestSqlAggFunction(earliestOrLatest);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public SqlAggFunction calciteFunction()
|
||||||
|
{
|
||||||
|
return function;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Nullable
|
||||||
|
@Override
|
||||||
|
public Aggregation toDruidAggregation(
|
||||||
|
final PlannerContext plannerContext,
|
||||||
|
final RowSignature rowSignature,
|
||||||
|
final VirtualColumnRegistry virtualColumnRegistry,
|
||||||
|
final RexBuilder rexBuilder,
|
||||||
|
final String name,
|
||||||
|
final AggregateCall aggregateCall,
|
||||||
|
final Project project,
|
||||||
|
final List<Aggregation> existingAggregations,
|
||||||
|
final boolean finalizeAggregations
|
||||||
|
)
|
||||||
|
{
|
||||||
|
final List<RexNode> rexNodes = aggregateCall
|
||||||
|
.getArgList()
|
||||||
|
.stream()
|
||||||
|
.map(i -> Expressions.fromFieldAccess(rowSignature, project, i))
|
||||||
|
.collect(Collectors.toList());
|
||||||
|
|
||||||
|
final List<DruidExpression> args = Expressions.toDruidExpressions(plannerContext, rowSignature, rexNodes);
|
||||||
|
|
||||||
|
if (args == null) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
final String aggregatorName = finalizeAggregations ? Calcites.makePrefixedName(name, "a") : name;
|
||||||
|
final String fieldName;
|
||||||
|
|
||||||
|
if (args.get(0).isDirectColumnAccess()) {
|
||||||
|
fieldName = args.get(0).getDirectColumn();
|
||||||
|
} else {
|
||||||
|
final SqlTypeName sqlTypeName = rexNodes.get(0).getType().getSqlTypeName();
|
||||||
|
final VirtualColumn virtualColumn =
|
||||||
|
virtualColumnRegistry.getOrCreateVirtualColumnForExpression(plannerContext, args.get(0), sqlTypeName);
|
||||||
|
fieldName = virtualColumn.getOutputName();
|
||||||
|
}
|
||||||
|
|
||||||
|
// Second arg must be a literal, if it exists (the type signature below requires it).
|
||||||
|
final int maxBytes = rexNodes.size() > 1 ? RexLiteral.intValue(rexNodes.get(1)) : -1;
|
||||||
|
|
||||||
|
final ValueType outputType = Calcites.getValueTypeForSqlTypeName(aggregateCall.getType().getSqlTypeName());
|
||||||
|
if (outputType == null) {
|
||||||
|
throw new ISE(
|
||||||
|
"Cannot translate output sqlTypeName[%s] to Druid type for aggregator[%s]",
|
||||||
|
aggregateCall.getType().getSqlTypeName(),
|
||||||
|
aggregateCall.getName()
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
return Aggregation.create(
|
||||||
|
Stream.of(virtualColumnRegistry.getVirtualColumn(fieldName))
|
||||||
|
.filter(Objects::nonNull)
|
||||||
|
.collect(Collectors.toList()),
|
||||||
|
Collections.singletonList(
|
||||||
|
earliestOrLatest.createAggregatorFactory(
|
||||||
|
aggregatorName,
|
||||||
|
fieldName,
|
||||||
|
outputType,
|
||||||
|
maxBytes
|
||||||
|
)
|
||||||
|
),
|
||||||
|
finalizeAggregations ? new FinalizingFieldAccessPostAggregator(name, aggregatorName) : null
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
private static class EarliestLatestSqlAggFunction extends SqlAggFunction
|
||||||
|
{
|
||||||
|
EarliestLatestSqlAggFunction(EarliestOrLatest earliestOrLatest)
|
||||||
|
{
|
||||||
|
super(
|
||||||
|
earliestOrLatest.name(),
|
||||||
|
null,
|
||||||
|
SqlKind.OTHER_FUNCTION,
|
||||||
|
ReturnTypes.ARG0,
|
||||||
|
InferTypes.RETURN_TYPE,
|
||||||
|
OperandTypes.or(
|
||||||
|
OperandTypes.or(OperandTypes.NUMERIC, OperandTypes.BOOLEAN),
|
||||||
|
OperandTypes.sequence(
|
||||||
|
"'" + earliestOrLatest.name() + "(expr, maxBytesPerString)'\n",
|
||||||
|
OperandTypes.STRING,
|
||||||
|
OperandTypes.and(OperandTypes.NUMERIC, OperandTypes.LITERAL)
|
||||||
|
)
|
||||||
|
),
|
||||||
|
SqlFunctionCategory.STRING,
|
||||||
|
false,
|
||||||
|
false
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -88,7 +88,7 @@ public class Expressions
|
||||||
*/
|
*/
|
||||||
public static RexNode fromFieldAccess(
|
public static RexNode fromFieldAccess(
|
||||||
final RowSignature rowSignature,
|
final RowSignature rowSignature,
|
||||||
final Project project,
|
@Nullable final Project project,
|
||||||
final int fieldNumber
|
final int fieldNumber
|
||||||
)
|
)
|
||||||
{
|
{
|
||||||
|
|
|
@ -35,6 +35,7 @@ import org.apache.druid.sql.calcite.aggregation.SqlAggregator;
|
||||||
import org.apache.druid.sql.calcite.aggregation.builtin.ApproxCountDistinctSqlAggregator;
|
import org.apache.druid.sql.calcite.aggregation.builtin.ApproxCountDistinctSqlAggregator;
|
||||||
import org.apache.druid.sql.calcite.aggregation.builtin.AvgSqlAggregator;
|
import org.apache.druid.sql.calcite.aggregation.builtin.AvgSqlAggregator;
|
||||||
import org.apache.druid.sql.calcite.aggregation.builtin.CountSqlAggregator;
|
import org.apache.druid.sql.calcite.aggregation.builtin.CountSqlAggregator;
|
||||||
|
import org.apache.druid.sql.calcite.aggregation.builtin.EarliestLatestSqlAggregator;
|
||||||
import org.apache.druid.sql.calcite.aggregation.builtin.MaxSqlAggregator;
|
import org.apache.druid.sql.calcite.aggregation.builtin.MaxSqlAggregator;
|
||||||
import org.apache.druid.sql.calcite.aggregation.builtin.MinSqlAggregator;
|
import org.apache.druid.sql.calcite.aggregation.builtin.MinSqlAggregator;
|
||||||
import org.apache.druid.sql.calcite.aggregation.builtin.SumSqlAggregator;
|
import org.apache.druid.sql.calcite.aggregation.builtin.SumSqlAggregator;
|
||||||
|
@ -117,6 +118,8 @@ public class DruidOperatorTable implements SqlOperatorTable
|
||||||
.add(new ApproxCountDistinctSqlAggregator())
|
.add(new ApproxCountDistinctSqlAggregator())
|
||||||
.add(new AvgSqlAggregator())
|
.add(new AvgSqlAggregator())
|
||||||
.add(new CountSqlAggregator())
|
.add(new CountSqlAggregator())
|
||||||
|
.add(EarliestLatestSqlAggregator.EARLIEST)
|
||||||
|
.add(EarliestLatestSqlAggregator.LATEST)
|
||||||
.add(new MinSqlAggregator())
|
.add(new MinSqlAggregator())
|
||||||
.add(new MaxSqlAggregator())
|
.add(new MaxSqlAggregator())
|
||||||
.add(new SumSqlAggregator())
|
.add(new SumSqlAggregator())
|
||||||
|
|
|
@ -43,10 +43,17 @@ import org.apache.druid.query.aggregation.LongMaxAggregatorFactory;
|
||||||
import org.apache.druid.query.aggregation.LongMinAggregatorFactory;
|
import org.apache.druid.query.aggregation.LongMinAggregatorFactory;
|
||||||
import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
|
import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
|
||||||
import org.apache.druid.query.aggregation.cardinality.CardinalityAggregatorFactory;
|
import org.apache.druid.query.aggregation.cardinality.CardinalityAggregatorFactory;
|
||||||
|
import org.apache.druid.query.aggregation.first.FloatFirstAggregatorFactory;
|
||||||
|
import org.apache.druid.query.aggregation.first.LongFirstAggregatorFactory;
|
||||||
|
import org.apache.druid.query.aggregation.first.StringFirstAggregatorFactory;
|
||||||
import org.apache.druid.query.aggregation.hyperloglog.HyperUniqueFinalizingPostAggregator;
|
import org.apache.druid.query.aggregation.hyperloglog.HyperUniqueFinalizingPostAggregator;
|
||||||
import org.apache.druid.query.aggregation.hyperloglog.HyperUniquesAggregatorFactory;
|
import org.apache.druid.query.aggregation.hyperloglog.HyperUniquesAggregatorFactory;
|
||||||
|
import org.apache.druid.query.aggregation.last.FloatLastAggregatorFactory;
|
||||||
|
import org.apache.druid.query.aggregation.last.LongLastAggregatorFactory;
|
||||||
|
import org.apache.druid.query.aggregation.last.StringLastAggregatorFactory;
|
||||||
import org.apache.druid.query.aggregation.post.ArithmeticPostAggregator;
|
import org.apache.druid.query.aggregation.post.ArithmeticPostAggregator;
|
||||||
import org.apache.druid.query.aggregation.post.FieldAccessPostAggregator;
|
import org.apache.druid.query.aggregation.post.FieldAccessPostAggregator;
|
||||||
|
import org.apache.druid.query.aggregation.post.FinalizingFieldAccessPostAggregator;
|
||||||
import org.apache.druid.query.dimension.DefaultDimensionSpec;
|
import org.apache.druid.query.dimension.DefaultDimensionSpec;
|
||||||
import org.apache.druid.query.dimension.ExtractionDimensionSpec;
|
import org.apache.druid.query.dimension.ExtractionDimensionSpec;
|
||||||
import org.apache.druid.query.extraction.RegexDimExtractionFn;
|
import org.apache.druid.query.extraction.RegexDimExtractionFn;
|
||||||
|
@ -889,6 +896,123 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testEarliestAggregators() throws Exception
|
||||||
|
{
|
||||||
|
// Cannot vectorize EARLIEST aggregator.
|
||||||
|
skipVectorize();
|
||||||
|
|
||||||
|
testQuery(
|
||||||
|
"SELECT "
|
||||||
|
+ "EARLIEST(cnt), EARLIEST(m1), EARLIEST(dim1, 10), "
|
||||||
|
+ "EARLIEST(cnt + 1), EARLIEST(m1 + 1), EARLIEST(dim1 || CAST(cnt AS VARCHAR), 10) "
|
||||||
|
+ "FROM druid.foo",
|
||||||
|
ImmutableList.of(
|
||||||
|
Druids.newTimeseriesQueryBuilder()
|
||||||
|
.dataSource(CalciteTests.DATASOURCE1)
|
||||||
|
.intervals(querySegmentSpec(Filtration.eternity()))
|
||||||
|
.granularity(Granularities.ALL)
|
||||||
|
.virtualColumns(
|
||||||
|
expressionVirtualColumn("v0", "(\"cnt\" + 1)", ValueType.LONG),
|
||||||
|
expressionVirtualColumn("v1", "(\"m1\" + 1)", ValueType.FLOAT),
|
||||||
|
expressionVirtualColumn("v2", "concat(\"dim1\",CAST(\"cnt\", 'STRING'))", ValueType.STRING)
|
||||||
|
)
|
||||||
|
.aggregators(
|
||||||
|
aggregators(
|
||||||
|
new LongFirstAggregatorFactory("a0", "cnt"),
|
||||||
|
new FloatFirstAggregatorFactory("a1", "m1"),
|
||||||
|
new StringFirstAggregatorFactory("a2", "dim1", 10),
|
||||||
|
new LongFirstAggregatorFactory("a3", "v0"),
|
||||||
|
new FloatFirstAggregatorFactory("a4", "v1"),
|
||||||
|
new StringFirstAggregatorFactory("a5", "v2", 10)
|
||||||
|
)
|
||||||
|
)
|
||||||
|
.context(TIMESERIES_CONTEXT_DEFAULT)
|
||||||
|
.build()
|
||||||
|
),
|
||||||
|
ImmutableList.of(
|
||||||
|
new Object[]{1L, 1.0f, NullHandling.sqlCompatible() ? "" : "10.1", 2L, 2.0f, "1"}
|
||||||
|
)
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testLatestAggregators() throws Exception
|
||||||
|
{
|
||||||
|
// Cannot vectorize LATEST aggregator.
|
||||||
|
skipVectorize();
|
||||||
|
|
||||||
|
testQuery(
|
||||||
|
"SELECT "
|
||||||
|
+ "LATEST(cnt), LATEST(m1), LATEST(dim1, 10), "
|
||||||
|
+ "LATEST(cnt + 1), LATEST(m1 + 1), LATEST(dim1 || CAST(cnt AS VARCHAR), 10) "
|
||||||
|
+ "FROM druid.foo",
|
||||||
|
ImmutableList.of(
|
||||||
|
Druids.newTimeseriesQueryBuilder()
|
||||||
|
.dataSource(CalciteTests.DATASOURCE1)
|
||||||
|
.intervals(querySegmentSpec(Filtration.eternity()))
|
||||||
|
.granularity(Granularities.ALL)
|
||||||
|
.virtualColumns(
|
||||||
|
expressionVirtualColumn("v0", "(\"cnt\" + 1)", ValueType.LONG),
|
||||||
|
expressionVirtualColumn("v1", "(\"m1\" + 1)", ValueType.FLOAT),
|
||||||
|
expressionVirtualColumn("v2", "concat(\"dim1\",CAST(\"cnt\", 'STRING'))", ValueType.STRING)
|
||||||
|
)
|
||||||
|
.aggregators(
|
||||||
|
aggregators(
|
||||||
|
new LongLastAggregatorFactory("a0", "cnt"),
|
||||||
|
new FloatLastAggregatorFactory("a1", "m1"),
|
||||||
|
new StringLastAggregatorFactory("a2", "dim1", 10),
|
||||||
|
new LongLastAggregatorFactory("a3", "v0"),
|
||||||
|
new FloatLastAggregatorFactory("a4", "v1"),
|
||||||
|
new StringLastAggregatorFactory("a5", "v2", 10)
|
||||||
|
)
|
||||||
|
)
|
||||||
|
.context(TIMESERIES_CONTEXT_DEFAULT)
|
||||||
|
.build()
|
||||||
|
),
|
||||||
|
ImmutableList.of(
|
||||||
|
new Object[]{1L, 6.0f, "abc", 2L, 7.0f, "abc1"}
|
||||||
|
)
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testLatestInSubquery() throws Exception
|
||||||
|
{
|
||||||
|
// Cannot vectorize LATEST aggregator.
|
||||||
|
skipVectorize();
|
||||||
|
|
||||||
|
testQuery(
|
||||||
|
"SELECT SUM(val) FROM (SELECT dim2, LATEST(m1) AS val FROM foo GROUP BY dim2)",
|
||||||
|
ImmutableList.of(
|
||||||
|
GroupByQuery.builder()
|
||||||
|
.setDataSource(
|
||||||
|
GroupByQuery.builder()
|
||||||
|
.setDataSource(CalciteTests.DATASOURCE1)
|
||||||
|
.setInterval(querySegmentSpec(Filtration.eternity()))
|
||||||
|
.setGranularity(Granularities.ALL)
|
||||||
|
.setDimensions(dimensions(new DefaultDimensionSpec("dim2", "d0")))
|
||||||
|
.setAggregatorSpecs(aggregators(new FloatLastAggregatorFactory("a0:a", "m1")))
|
||||||
|
.setPostAggregatorSpecs(
|
||||||
|
ImmutableList.of(
|
||||||
|
new FinalizingFieldAccessPostAggregator("a0", "a0:a")
|
||||||
|
)
|
||||||
|
)
|
||||||
|
.setContext(QUERY_CONTEXT_DEFAULT)
|
||||||
|
.build()
|
||||||
|
)
|
||||||
|
.setInterval(querySegmentSpec(Filtration.eternity()))
|
||||||
|
.setGranularity(Granularities.ALL)
|
||||||
|
.setAggregatorSpecs(aggregators(new DoubleSumAggregatorFactory("_a0", "a0")))
|
||||||
|
.setContext(QUERY_CONTEXT_DEFAULT)
|
||||||
|
.build()
|
||||||
|
),
|
||||||
|
ImmutableList.of(
|
||||||
|
new Object[]{NullHandling.sqlCompatible() ? 18.0 : 15.0}
|
||||||
|
)
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testGroupByLong() throws Exception
|
public void testGroupByLong() throws Exception
|
||||||
{
|
{
|
||||||
|
|
Loading…
Reference in New Issue