diff --git a/docs/querying/sql.md b/docs/querying/sql.md
index be420de1e4d..dfa58e137b8 100644
--- a/docs/querying/sql.md
+++ b/docs/querying/sql.md
@@ -195,6 +195,10 @@ Only the COUNT aggregation can accept DISTINCT.
 |`STDDEV_POP(expr)`|Computes standard deviation population of `expr`. See [stats extension](../development/extensions-core/stats.html) documentation for additional details.|
 |`STDDEV_SAMP(expr)`|Computes standard deviation sample of `expr`. See [stats extension](../development/extensions-core/stats.html) documentation for additional details.|
 |`STDDEV(expr)`|Computes standard deviation sample of `expr`. See [stats extension](../development/extensions-core/stats.html) documentation for additional details.|
+|`EARLIEST(expr)`|Returns the earliest non-null value of `expr`, which must be numeric. If `expr` comes from a relation with a timestamp column (like a Druid datasource), then "earliest" is the value first encountered with the minimum overall timestamp of all values being aggregated. If `expr` does not come from a relation with a timestamp, then it is simply the first value encountered.|
+|`EARLIEST(expr, maxBytesPerString)`|Like `EARLIEST(expr)`, but for strings. The `maxBytesPerString` parameter determines how much aggregation space to allocate per string; strings longer than this limit are truncated. This parameter should be set as low as possible, since high values will lead to wasted memory.|
+|`LATEST(expr)`|Returns the latest non-null value of `expr`, which must be numeric. If `expr` comes from a relation with a timestamp column (like a Druid datasource), then "latest" is the value last encountered with the maximum overall timestamp of all values being aggregated. If `expr` does not come from a relation with a timestamp, then it is simply the last value encountered.|
+|`LATEST(expr, maxBytesPerString)`|Like `LATEST(expr)`, but for strings. The `maxBytesPerString` parameter determines how much aggregation space to allocate per string; strings longer than this limit are truncated. This parameter should be set as low as possible, since high values will lead to wasted memory.|
 
 For advice on choosing approximate aggregation functions, check out our [approximate aggregations documentation](aggregations.html#approx).
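For example, assuming a hypothetical datasource `sales` with a numeric `price` column and string dimensions `channel` and `payment_type` (illustrative names, not part of this patch), the aggregators documented above could be used along these lines:

```sql
-- First and last price per channel, picked by row timestamp (__time) when one is available.
SELECT
  channel,
  EARLIEST(price)           AS first_price,
  LATEST(price)             AS last_price,
  LATEST(payment_type, 128) AS last_payment_type  -- string form requires maxBytesPerString
FROM sales
GROUP BY channel
```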
diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/first/StringFirstAggregatorFactory.java b/processing/src/main/java/org/apache/druid/query/aggregation/first/StringFirstAggregatorFactory.java
index 983ed9b2202..1b30bf716fe 100644
--- a/processing/src/main/java/org/apache/druid/query/aggregation/first/StringFirstAggregatorFactory.java
+++ b/processing/src/main/java/org/apache/druid/query/aggregation/first/StringFirstAggregatorFactory.java
@@ -24,6 +24,7 @@ import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.annotation.JsonTypeName;
 import com.google.common.base.Preconditions;
 import com.google.common.primitives.Longs;
+import org.apache.druid.java.util.common.IAE;
 import org.apache.druid.query.aggregation.AggregateCombiner;
 import org.apache.druid.query.aggregation.Aggregator;
 import org.apache.druid.query.aggregation.AggregatorFactory;
@@ -102,6 +103,11 @@ public class StringFirstAggregatorFactory extends AggregatorFactory
   {
     Preconditions.checkNotNull(name, "Must have a valid, non-null aggregator name");
     Preconditions.checkNotNull(fieldName, "Must have a valid, non-null fieldName");
+
+    if (maxStringBytes != null && maxStringBytes < 0) {
+      throw new IAE("maxStringBytes must be greater than or equal to 0");
+    }
+
     this.name = name;
     this.fieldName = fieldName;
     this.maxStringBytes = maxStringBytes == null
diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/last/StringLastAggregatorFactory.java b/processing/src/main/java/org/apache/druid/query/aggregation/last/StringLastAggregatorFactory.java
index b024af022db..9277d0529dd 100644
--- a/processing/src/main/java/org/apache/druid/query/aggregation/last/StringLastAggregatorFactory.java
+++ b/processing/src/main/java/org/apache/druid/query/aggregation/last/StringLastAggregatorFactory.java
@@ -24,6 +24,7 @@ import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.annotation.JsonTypeName;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
+import org.apache.druid.java.util.common.IAE;
 import org.apache.druid.query.aggregation.AggregateCombiner;
 import org.apache.druid.query.aggregation.Aggregator;
 import org.apache.druid.query.aggregation.AggregatorFactory;
@@ -58,6 +59,11 @@ public class StringLastAggregatorFactory extends AggregatorFactory
   {
     Preconditions.checkNotNull(name, "Must have a valid, non-null aggregator name");
     Preconditions.checkNotNull(fieldName, "Must have a valid, non-null fieldName");
+
+    if (maxStringBytes != null && maxStringBytes < 0) {
+      throw new IAE("maxStringBytes must be greater than or equal to 0");
+    }
+
     this.name = name;
     this.fieldName = fieldName;
     this.maxStringBytes = maxStringBytes == null
diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/aggregation/Aggregations.java b/sql/src/main/java/org/apache/druid/sql/calcite/aggregation/Aggregations.java
index c7181424f69..d54b20caeb8 100644
--- a/sql/src/main/java/org/apache/druid/sql/calcite/aggregation/Aggregations.java
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/aggregation/Aggregations.java
@@ -30,6 +30,7 @@ import org.apache.druid.sql.calcite.table.RowSignature;
 
 import javax.annotation.Nullable;
 import java.util.List;
+import java.util.Objects;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 
@@ -40,18 +41,40 @@ public class Aggregations
     // No instantiation.
   }
 
+  /**
+   * Get Druid expressions that correspond to "simple" aggregator inputs.
+   * This is used by standard sum/min/max aggregators, which have the following properties:
+   *
+   * 1) They can take direct field accesses or expressions as inputs.
+   * 2) They cannot implicitly cast strings to numbers when using a direct field access.
+   *
+   * @param plannerContext SQL planner context
+   * @param rowSignature   input row signature
+   * @param call           aggregate call object
+   * @param project        project that should be applied before aggregation; may be null
+   *
+   * @return list of expressions corresponding to aggregator arguments, or null if any cannot be translated
+   */
   @Nullable
   public static List<DruidExpression> getArgumentsForSimpleAggregator(
       final PlannerContext plannerContext,
       final RowSignature rowSignature,
       final AggregateCall call,
-      final Project project
+      @Nullable final Project project
   )
   {
-    return call.getArgList().stream()
-               .map(i -> Expressions.fromFieldAccess(rowSignature, project, i))
-               .map(rexNode -> toDruidExpressionForSimpleAggregator(plannerContext, rowSignature, rexNode))
-               .collect(Collectors.toList());
+    final List<DruidExpression> args = call
+        .getArgList()
+        .stream()
+        .map(i -> Expressions.fromFieldAccess(rowSignature, project, i))
+        .map(rexNode -> toDruidExpressionForSimpleAggregator(plannerContext, rowSignature, rexNode))
+        .collect(Collectors.toList());
+
+    if (args.stream().noneMatch(Objects::isNull)) {
+      return args;
+    } else {
+      return null;
+    }
   }
 
   private static DruidExpression toDruidExpressionForSimpleAggregator(
@@ -68,7 +91,8 @@ public class Aggregations
     if (druidExpression.isSimpleExtraction() &&
         (!druidExpression.isDirectColumnAccess()
          || rowSignature.getColumnType(druidExpression.getDirectColumn()) == ValueType.STRING)) {
-      // Aggregators are unable to implicitly cast strings to numbers. So remove the simple extraction in this case.
+      // Aggregators are unable to implicitly cast strings to numbers.
+      // So remove the simple extraction, which forces the expression to be used instead of the direct column access.
       return druidExpression.map(simpleExtraction -> null, Function.identity());
     } else {
       return druidExpression;
diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/aggregation/builtin/EarliestLatestSqlAggregator.java b/sql/src/main/java/org/apache/druid/sql/calcite/aggregation/builtin/EarliestLatestSqlAggregator.java
new file mode 100644
index 00000000000..347080f9bd8
--- /dev/null
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/aggregation/builtin/EarliestLatestSqlAggregator.java
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.sql.calcite.aggregation.builtin;
+
+import org.apache.calcite.rel.core.AggregateCall;
+import org.apache.calcite.rel.core.Project;
+import org.apache.calcite.rex.RexBuilder;
+import org.apache.calcite.rex.RexLiteral;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.sql.SqlAggFunction;
+import org.apache.calcite.sql.SqlFunctionCategory;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.sql.type.InferTypes;
+import org.apache.calcite.sql.type.OperandTypes;
+import org.apache.calcite.sql.type.ReturnTypes;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.query.aggregation.AggregatorFactory;
+import org.apache.druid.query.aggregation.first.DoubleFirstAggregatorFactory;
+import org.apache.druid.query.aggregation.first.FloatFirstAggregatorFactory;
+import org.apache.druid.query.aggregation.first.LongFirstAggregatorFactory;
+import org.apache.druid.query.aggregation.first.StringFirstAggregatorFactory;
+import org.apache.druid.query.aggregation.last.DoubleLastAggregatorFactory;
+import org.apache.druid.query.aggregation.last.FloatLastAggregatorFactory;
+import org.apache.druid.query.aggregation.last.LongLastAggregatorFactory;
+import org.apache.druid.query.aggregation.last.StringLastAggregatorFactory;
+import org.apache.druid.query.aggregation.post.FinalizingFieldAccessPostAggregator;
+import org.apache.druid.segment.VirtualColumn;
+import org.apache.druid.segment.column.ValueType;
+import org.apache.druid.sql.calcite.aggregation.Aggregation;
+import org.apache.druid.sql.calcite.aggregation.SqlAggregator;
+import org.apache.druid.sql.calcite.expression.DruidExpression;
+import org.apache.druid.sql.calcite.expression.Expressions;
+import org.apache.druid.sql.calcite.planner.Calcites;
+import org.apache.druid.sql.calcite.planner.PlannerContext;
+import org.apache.druid.sql.calcite.rel.VirtualColumnRegistry;
+import org.apache.druid.sql.calcite.table.RowSignature;
+
+import javax.annotation.Nullable;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+public class EarliestLatestSqlAggregator implements SqlAggregator
+{
+  public static final SqlAggregator EARLIEST = new EarliestLatestSqlAggregator(EarliestOrLatest.EARLIEST);
+  public static final SqlAggregator LATEST = new EarliestLatestSqlAggregator(EarliestOrLatest.LATEST);
+
+  enum EarliestOrLatest
+  {
+    EARLIEST {
+      @Override
+      AggregatorFactory createAggregatorFactory(String name, String fieldName, ValueType type, int maxStringBytes)
+      {
+        switch (type) {
+          case LONG:
+            return new LongFirstAggregatorFactory(name, fieldName);
+          case FLOAT:
+            return new FloatFirstAggregatorFactory(name, fieldName);
+          case DOUBLE:
+            return new DoubleFirstAggregatorFactory(name, fieldName);
+          case STRING:
+            return new StringFirstAggregatorFactory(name, fieldName, maxStringBytes);
+          default:
+            throw new ISE("Cannot build aggregatorFactory for type[%s]", type);
+        }
+      }
+    },
+
+    LATEST {
+      @Override
+      AggregatorFactory createAggregatorFactory(String name, String fieldName, ValueType type, int maxStringBytes)
+      {
+        switch (type) {
+          case LONG:
+            return new LongLastAggregatorFactory(name, fieldName);
+          case FLOAT:
+            return new FloatLastAggregatorFactory(name, fieldName);
+          case DOUBLE:
+            return new DoubleLastAggregatorFactory(name, fieldName);
+          case STRING:
+            return new StringLastAggregatorFactory(name, fieldName, maxStringBytes);
+          default:
+            throw new ISE("Cannot build aggregatorFactory for type[%s]", type);
+        }
+      }
+    };
+
+    abstract AggregatorFactory createAggregatorFactory(
+        String name,
+        String fieldName,
+        ValueType outputType,
+        int maxStringBytes
+    );
+  }
+
+  private final EarliestOrLatest earliestOrLatest;
+  private final SqlAggFunction function;
+
+  private EarliestLatestSqlAggregator(final EarliestOrLatest earliestOrLatest)
+  {
+    this.earliestOrLatest = earliestOrLatest;
+    this.function = new EarliestLatestSqlAggFunction(earliestOrLatest);
+  }
+
+  @Override
+  public SqlAggFunction calciteFunction()
+  {
+    return function;
+  }
+
+  @Nullable
+  @Override
+  public Aggregation toDruidAggregation(
+      final PlannerContext plannerContext,
+      final RowSignature rowSignature,
+      final VirtualColumnRegistry virtualColumnRegistry,
+      final RexBuilder rexBuilder,
+      final String name,
+      final AggregateCall aggregateCall,
+      final Project project,
+      final List<Aggregation> existingAggregations,
+      final boolean finalizeAggregations
+  )
+  {
+    final List<RexNode> rexNodes = aggregateCall
+        .getArgList()
+        .stream()
+        .map(i -> Expressions.fromFieldAccess(rowSignature, project, i))
+        .collect(Collectors.toList());
+
+    final List<DruidExpression> args = Expressions.toDruidExpressions(plannerContext, rowSignature, rexNodes);
+
+    if (args == null) {
+      return null;
+    }
+
+    final String aggregatorName = finalizeAggregations ? Calcites.makePrefixedName(name, "a") : name;
+    final String fieldName;
+
+    if (args.get(0).isDirectColumnAccess()) {
+      fieldName = args.get(0).getDirectColumn();
+    } else {
+      final SqlTypeName sqlTypeName = rexNodes.get(0).getType().getSqlTypeName();
+      final VirtualColumn virtualColumn =
+          virtualColumnRegistry.getOrCreateVirtualColumnForExpression(plannerContext, args.get(0), sqlTypeName);
+      fieldName = virtualColumn.getOutputName();
+    }
+
+    // Second arg must be a literal, if it exists (the type signature below requires it).
+    final int maxBytes = rexNodes.size() > 1 ? RexLiteral.intValue(rexNodes.get(1)) : -1;
+
+    final ValueType outputType = Calcites.getValueTypeForSqlTypeName(aggregateCall.getType().getSqlTypeName());
+    if (outputType == null) {
+      throw new ISE(
+          "Cannot translate output sqlTypeName[%s] to Druid type for aggregator[%s]",
+          aggregateCall.getType().getSqlTypeName(),
+          aggregateCall.getName()
+      );
+    }
+
+    return Aggregation.create(
+        Stream.of(virtualColumnRegistry.getVirtualColumn(fieldName))
+              .filter(Objects::nonNull)
+              .collect(Collectors.toList()),
+        Collections.singletonList(
+            earliestOrLatest.createAggregatorFactory(
+                aggregatorName,
+                fieldName,
+                outputType,
+                maxBytes
+            )
+        ),
+        finalizeAggregations ?
+        new FinalizingFieldAccessPostAggregator(name, aggregatorName) : null
+    );
+  }
+
+  private static class EarliestLatestSqlAggFunction extends SqlAggFunction
+  {
+    EarliestLatestSqlAggFunction(EarliestOrLatest earliestOrLatest)
+    {
+      super(
+          earliestOrLatest.name(),
+          null,
+          SqlKind.OTHER_FUNCTION,
+          ReturnTypes.ARG0,
+          InferTypes.RETURN_TYPE,
+          OperandTypes.or(
+              OperandTypes.or(OperandTypes.NUMERIC, OperandTypes.BOOLEAN),
+              OperandTypes.sequence(
+                  "'" + earliestOrLatest.name() + "(expr, maxBytesPerString)'\n",
+                  OperandTypes.STRING,
+                  OperandTypes.and(OperandTypes.NUMERIC, OperandTypes.LITERAL)
+              )
+          ),
+          SqlFunctionCategory.STRING,
+          false,
+          false
+      );
+    }
+  }
+}
diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/expression/Expressions.java b/sql/src/main/java/org/apache/druid/sql/calcite/expression/Expressions.java
index 4764cf41468..9b6ceaa6bff 100644
--- a/sql/src/main/java/org/apache/druid/sql/calcite/expression/Expressions.java
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/expression/Expressions.java
@@ -88,7 +88,7 @@ public class Expressions
    */
   public static RexNode fromFieldAccess(
       final RowSignature rowSignature,
-      final Project project,
+      @Nullable final Project project,
       final int fieldNumber
   )
   {
diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/planner/DruidOperatorTable.java b/sql/src/main/java/org/apache/druid/sql/calcite/planner/DruidOperatorTable.java
index a6b7ce842a5..e010be99019 100644
--- a/sql/src/main/java/org/apache/druid/sql/calcite/planner/DruidOperatorTable.java
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/planner/DruidOperatorTable.java
@@ -35,6 +35,7 @@ import org.apache.druid.sql.calcite.aggregation.SqlAggregator;
 import org.apache.druid.sql.calcite.aggregation.builtin.ApproxCountDistinctSqlAggregator;
 import org.apache.druid.sql.calcite.aggregation.builtin.AvgSqlAggregator;
 import org.apache.druid.sql.calcite.aggregation.builtin.CountSqlAggregator;
+import org.apache.druid.sql.calcite.aggregation.builtin.EarliestLatestSqlAggregator;
 import org.apache.druid.sql.calcite.aggregation.builtin.MaxSqlAggregator;
 import org.apache.druid.sql.calcite.aggregation.builtin.MinSqlAggregator;
 import org.apache.druid.sql.calcite.aggregation.builtin.SumSqlAggregator;
@@ -117,6 +118,8 @@ public class DruidOperatorTable implements SqlOperatorTable
         .add(new ApproxCountDistinctSqlAggregator())
         .add(new AvgSqlAggregator())
         .add(new CountSqlAggregator())
+        .add(EarliestLatestSqlAggregator.EARLIEST)
+        .add(EarliestLatestSqlAggregator.LATEST)
         .add(new MinSqlAggregator())
         .add(new MaxSqlAggregator())
         .add(new SumSqlAggregator())
diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java
index 4b411da4a85..251272b3195 100644
--- a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java
@@ -43,10 +43,17 @@ import org.apache.druid.query.aggregation.LongMaxAggregatorFactory;
 import org.apache.druid.query.aggregation.LongMinAggregatorFactory;
 import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
 import org.apache.druid.query.aggregation.cardinality.CardinalityAggregatorFactory;
+import org.apache.druid.query.aggregation.first.FloatFirstAggregatorFactory;
+import org.apache.druid.query.aggregation.first.LongFirstAggregatorFactory;
+import org.apache.druid.query.aggregation.first.StringFirstAggregatorFactory;
 import org.apache.druid.query.aggregation.hyperloglog.HyperUniqueFinalizingPostAggregator;
 import org.apache.druid.query.aggregation.hyperloglog.HyperUniquesAggregatorFactory;
+import org.apache.druid.query.aggregation.last.FloatLastAggregatorFactory;
+import org.apache.druid.query.aggregation.last.LongLastAggregatorFactory;
+import org.apache.druid.query.aggregation.last.StringLastAggregatorFactory;
 import org.apache.druid.query.aggregation.post.ArithmeticPostAggregator;
 import org.apache.druid.query.aggregation.post.FieldAccessPostAggregator;
+import org.apache.druid.query.aggregation.post.FinalizingFieldAccessPostAggregator;
 import org.apache.druid.query.dimension.DefaultDimensionSpec;
 import org.apache.druid.query.dimension.ExtractionDimensionSpec;
 import org.apache.druid.query.extraction.RegexDimExtractionFn;
@@ -314,21 +321,21 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
             + "WHERE TABLE_TYPE IN ('SYSTEM_TABLE', 'TABLE', 'VIEW')",
         ImmutableList.of(),
         ImmutableList.<Object[]>builder()
-            .add(new Object[]{"druid", CalciteTests.DATASOURCE1, "TABLE"})
-            .add(new Object[]{"druid", CalciteTests.DATASOURCE2, "TABLE"})
-            .add(new Object[]{"druid", CalciteTests.DATASOURCE4, "TABLE"})
-            .add(new Object[]{"druid", CalciteTests.DATASOURCE3, "TABLE"})
-            .add(new Object[]{"druid", "aview", "VIEW"})
-            .add(new Object[]{"druid", "bview", "VIEW"})
-            .add(new Object[]{"INFORMATION_SCHEMA", "COLUMNS", "SYSTEM_TABLE"})
-            .add(new Object[]{"INFORMATION_SCHEMA", "SCHEMATA", "SYSTEM_TABLE"})
-            .add(new Object[]{"INFORMATION_SCHEMA", "TABLES", "SYSTEM_TABLE"})
-            .add(new Object[]{"sys", "segments", "SYSTEM_TABLE"})
-            .add(new Object[]{"sys", "server_segments", "SYSTEM_TABLE"})
-            .add(new Object[]{"sys", "servers", "SYSTEM_TABLE"})
-            .add(new Object[]{"sys", "supervisors", "SYSTEM_TABLE"})
-            .add(new Object[]{"sys", "tasks", "SYSTEM_TABLE"})
-            .build()
+                     .add(new Object[]{"druid", CalciteTests.DATASOURCE1, "TABLE"})
+                     .add(new Object[]{"druid", CalciteTests.DATASOURCE2, "TABLE"})
+                     .add(new Object[]{"druid", CalciteTests.DATASOURCE4, "TABLE"})
+                     .add(new Object[]{"druid", CalciteTests.DATASOURCE3, "TABLE"})
+                     .add(new Object[]{"druid", "aview", "VIEW"})
+                     .add(new Object[]{"druid", "bview", "VIEW"})
+                     .add(new Object[]{"INFORMATION_SCHEMA", "COLUMNS", "SYSTEM_TABLE"})
+                     .add(new Object[]{"INFORMATION_SCHEMA", "SCHEMATA", "SYSTEM_TABLE"})
+                     .add(new Object[]{"INFORMATION_SCHEMA", "TABLES", "SYSTEM_TABLE"})
+                     .add(new Object[]{"sys", "segments", "SYSTEM_TABLE"})
+                     .add(new Object[]{"sys", "server_segments", "SYSTEM_TABLE"})
+                     .add(new Object[]{"sys", "servers", "SYSTEM_TABLE"})
+                     .add(new Object[]{"sys", "supervisors", "SYSTEM_TABLE"})
+                     .add(new Object[]{"sys", "tasks", "SYSTEM_TABLE"})
+                     .build()
     );
 
     testQuery(
@@ -889,6 +896,123 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
     );
   }
 
+  @Test
+  public void testEarliestAggregators() throws Exception
+  {
+    // Cannot vectorize EARLIEST aggregator.
+    skipVectorize();
+
+    testQuery(
+        "SELECT "
+        + "EARLIEST(cnt), EARLIEST(m1), EARLIEST(dim1, 10), "
+        + "EARLIEST(cnt + 1), EARLIEST(m1 + 1), EARLIEST(dim1 || CAST(cnt AS VARCHAR), 10) "
+        + "FROM druid.foo",
+        ImmutableList.of(
+            Druids.newTimeseriesQueryBuilder()
+                  .dataSource(CalciteTests.DATASOURCE1)
+                  .intervals(querySegmentSpec(Filtration.eternity()))
+                  .granularity(Granularities.ALL)
+                  .virtualColumns(
+                      expressionVirtualColumn("v0", "(\"cnt\" + 1)", ValueType.LONG),
+                      expressionVirtualColumn("v1", "(\"m1\" + 1)", ValueType.FLOAT),
+                      expressionVirtualColumn("v2", "concat(\"dim1\",CAST(\"cnt\", 'STRING'))", ValueType.STRING)
+                  )
+                  .aggregators(
+                      aggregators(
+                          new LongFirstAggregatorFactory("a0", "cnt"),
+                          new FloatFirstAggregatorFactory("a1", "m1"),
+                          new StringFirstAggregatorFactory("a2", "dim1", 10),
+                          new LongFirstAggregatorFactory("a3", "v0"),
+                          new FloatFirstAggregatorFactory("a4", "v1"),
+                          new StringFirstAggregatorFactory("a5", "v2", 10)
+                      )
+                  )
+                  .context(TIMESERIES_CONTEXT_DEFAULT)
+                  .build()
+        ),
+        ImmutableList.of(
+            new Object[]{1L, 1.0f, NullHandling.sqlCompatible() ? "" : "10.1", 2L, 2.0f, "1"}
+        )
+    );
+  }
+
+  @Test
+  public void testLatestAggregators() throws Exception
+  {
+    // Cannot vectorize LATEST aggregator.
+    skipVectorize();
+
+    testQuery(
+        "SELECT "
+        + "LATEST(cnt), LATEST(m1), LATEST(dim1, 10), "
+        + "LATEST(cnt + 1), LATEST(m1 + 1), LATEST(dim1 || CAST(cnt AS VARCHAR), 10) "
+        + "FROM druid.foo",
+        ImmutableList.of(
+            Druids.newTimeseriesQueryBuilder()
+                  .dataSource(CalciteTests.DATASOURCE1)
+                  .intervals(querySegmentSpec(Filtration.eternity()))
+                  .granularity(Granularities.ALL)
+                  .virtualColumns(
+                      expressionVirtualColumn("v0", "(\"cnt\" + 1)", ValueType.LONG),
+                      expressionVirtualColumn("v1", "(\"m1\" + 1)", ValueType.FLOAT),
+                      expressionVirtualColumn("v2", "concat(\"dim1\",CAST(\"cnt\", 'STRING'))", ValueType.STRING)
+                  )
+                  .aggregators(
+                      aggregators(
+                          new LongLastAggregatorFactory("a0", "cnt"),
+                          new FloatLastAggregatorFactory("a1", "m1"),
+                          new StringLastAggregatorFactory("a2", "dim1", 10),
+                          new LongLastAggregatorFactory("a3", "v0"),
+                          new FloatLastAggregatorFactory("a4", "v1"),
+                          new StringLastAggregatorFactory("a5", "v2", 10)
+                      )
+                  )
+                  .context(TIMESERIES_CONTEXT_DEFAULT)
+                  .build()
+        ),
+        ImmutableList.of(
+            new Object[]{1L, 6.0f, "abc", 2L, 7.0f, "abc1"}
+        )
+    );
+  }
+
+  @Test
+  public void testLatestInSubquery() throws Exception
+  {
+    // Cannot vectorize LATEST aggregator.
+    skipVectorize();
+
+    testQuery(
+        "SELECT SUM(val) FROM (SELECT dim2, LATEST(m1) AS val FROM foo GROUP BY dim2)",
+        ImmutableList.of(
+            GroupByQuery.builder()
+                        .setDataSource(
+                            GroupByQuery.builder()
+                                        .setDataSource(CalciteTests.DATASOURCE1)
+                                        .setInterval(querySegmentSpec(Filtration.eternity()))
+                                        .setGranularity(Granularities.ALL)
+                                        .setDimensions(dimensions(new DefaultDimensionSpec("dim2", "d0")))
+                                        .setAggregatorSpecs(aggregators(new FloatLastAggregatorFactory("a0:a", "m1")))
+                                        .setPostAggregatorSpecs(
+                                            ImmutableList.of(
+                                                new FinalizingFieldAccessPostAggregator("a0", "a0:a")
+                                            )
+                                        )
+                                        .setContext(QUERY_CONTEXT_DEFAULT)
+                                        .build()
+                        )
+                        .setInterval(querySegmentSpec(Filtration.eternity()))
+                        .setGranularity(Granularities.ALL)
+                        .setAggregatorSpecs(aggregators(new DoubleSumAggregatorFactory("_a0", "a0")))
+                        .setContext(QUERY_CONTEXT_DEFAULT)
+                        .build()
+        ),
+        ImmutableList.of(
+            new Object[]{NullHandling.sqlCompatible() ? 18.0 : 15.0}
+        )
+    );
+  }
+
   @Test
   public void testGroupByLong() throws Exception
   {
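As the operand-type signature in `EarliestLatestSqlAggFunction` suggests, the one-argument form accepts numeric (or boolean) inputs, while string inputs are expected to go through the two-argument form with a literal `maxBytesPerString`. A minimal sketch of the distinction, against the same `druid.foo` test datasource used by the tests above:

```sql
-- Numeric column: the one-argument form is sufficient.
SELECT LATEST(m1) FROM druid.foo;

-- String column: pass a literal byte limit; a bare LATEST(dim1) would not match
-- the declared operand types.
SELECT LATEST(dim1, 10) FROM druid.foo;
```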