mirror of https://github.com/apache/druid.git
SQL: Finalize aggregations for inner queries when necessary. (#6221)
* SQL: Finalize aggregations for inner queries when necessary. Fixes #5779. * Fixed test method name.
This commit is contained in:
parent
9803ce954a
commit
28e6ae3664
|
@ -48,6 +48,7 @@ import org.apache.calcite.sql.type.ReturnTypes;
|
|||
import org.apache.calcite.sql.type.SqlTypeFamily;
|
||||
import org.apache.calcite.sql.type.SqlTypeName;
|
||||
|
||||
import javax.annotation.Nullable;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
||||
|
@ -62,6 +63,7 @@ public class QuantileSqlAggregator implements SqlAggregator
|
|||
return FUNCTION_INSTANCE;
|
||||
}
|
||||
|
||||
@Nullable
|
||||
@Override
|
||||
public Aggregation toDruidAggregation(
|
||||
final PlannerContext plannerContext,
|
||||
|
@ -70,7 +72,8 @@ public class QuantileSqlAggregator implements SqlAggregator
|
|||
final String name,
|
||||
final AggregateCall aggregateCall,
|
||||
final Project project,
|
||||
final List<Aggregation> existingAggregations
|
||||
final List<Aggregation> existingAggregations,
|
||||
final boolean finalizeAggregations
|
||||
)
|
||||
{
|
||||
final DruidExpression input = Expressions.toDruidExpression(
|
||||
|
|
|
@ -20,13 +20,13 @@
|
|||
package io.druid.sql.calcite.aggregation;
|
||||
|
||||
import com.google.common.collect.ImmutableList;
|
||||
import io.druid.java.util.common.StringUtils;
|
||||
import io.druid.math.expr.ExprMacroTable;
|
||||
import io.druid.query.dimension.DefaultDimensionSpec;
|
||||
import io.druid.query.dimension.DimensionSpec;
|
||||
import io.druid.segment.VirtualColumn;
|
||||
import io.druid.segment.column.ValueType;
|
||||
import io.druid.sql.calcite.expression.DruidExpression;
|
||||
import io.druid.sql.calcite.planner.Calcites;
|
||||
|
||||
import javax.annotation.Nullable;
|
||||
import java.util.List;
|
||||
|
@ -80,7 +80,7 @@ public class DimensionExpression
|
|||
@Nullable
|
||||
public String getVirtualColumnName()
|
||||
{
|
||||
return expression.isSimpleExtraction() ? null : StringUtils.format("%s:v", outputName);
|
||||
return expression.isSimpleExtraction() ? null : Calcites.makePrefixedName(outputName, "v");
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -53,6 +53,9 @@ public interface SqlAggregator
|
|||
* @param project project that should be applied before aggregation; may be null
|
||||
* @param existingAggregations existing aggregations for this query; useful for re-using aggregations. May be safely
|
||||
* ignored if you do not want to re-use existing aggregations.
|
||||
* @param finalizeAggregations true if this query should include explicit finalization for all of its
|
||||
* aggregators, where required. This is set for subqueries where Druid's native query
|
||||
* layer does not do this automatically.
|
||||
*
|
||||
* @return aggregation, or null if the call cannot be translated
|
||||
*/
|
||||
|
@ -64,6 +67,7 @@ public interface SqlAggregator
|
|||
String name,
|
||||
AggregateCall aggregateCall,
|
||||
Project project,
|
||||
List<Aggregation> existingAggregations
|
||||
List<Aggregation> existingAggregations,
|
||||
boolean finalizeAggregations
|
||||
);
|
||||
}
|
||||
|
|
|
@ -22,9 +22,9 @@ package io.druid.sql.calcite.aggregation.builtin;
|
|||
import com.google.common.collect.ImmutableList;
|
||||
import com.google.common.collect.Iterables;
|
||||
import io.druid.java.util.common.ISE;
|
||||
import io.druid.java.util.common.StringUtils;
|
||||
import io.druid.query.aggregation.AggregatorFactory;
|
||||
import io.druid.query.aggregation.cardinality.CardinalityAggregatorFactory;
|
||||
import io.druid.query.aggregation.hyperloglog.HyperUniqueFinalizingPostAggregator;
|
||||
import io.druid.query.aggregation.hyperloglog.HyperUniquesAggregatorFactory;
|
||||
import io.druid.query.dimension.DefaultDimensionSpec;
|
||||
import io.druid.query.dimension.DimensionSpec;
|
||||
|
@ -52,6 +52,7 @@ import org.apache.calcite.sql.type.SqlTypeName;
|
|||
|
||||
import javax.annotation.Nullable;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
|
||||
public class ApproxCountDistinctSqlAggregator implements SqlAggregator
|
||||
|
@ -74,7 +75,8 @@ public class ApproxCountDistinctSqlAggregator implements SqlAggregator
|
|||
final String name,
|
||||
final AggregateCall aggregateCall,
|
||||
final Project project,
|
||||
final List<Aggregation> existingAggregations
|
||||
final List<Aggregation> existingAggregations,
|
||||
final boolean finalizeAggregations
|
||||
)
|
||||
{
|
||||
// Don't use Aggregations.getArgumentsForSimpleAggregator, since it won't let us use direct column access
|
||||
|
@ -92,14 +94,15 @@ public class ApproxCountDistinctSqlAggregator implements SqlAggregator
|
|||
|
||||
final List<VirtualColumn> virtualColumns = new ArrayList<>();
|
||||
final AggregatorFactory aggregatorFactory;
|
||||
final String aggregatorName = finalizeAggregations ? Calcites.makePrefixedName(name, "a") : name;
|
||||
|
||||
if (arg.isDirectColumnAccess() && rowSignature.getColumnType(arg.getDirectColumn()) == ValueType.COMPLEX) {
|
||||
aggregatorFactory = new HyperUniquesAggregatorFactory(name, arg.getDirectColumn(), false, true);
|
||||
aggregatorFactory = new HyperUniquesAggregatorFactory(aggregatorName, arg.getDirectColumn(), false, true);
|
||||
} else {
|
||||
final SqlTypeName sqlTypeName = rexNode.getType().getSqlTypeName();
|
||||
final ValueType inputType = Calcites.getValueTypeForSqlTypeName(sqlTypeName);
|
||||
if (inputType == null) {
|
||||
throw new ISE("Cannot translate sqlTypeName[%s] to Druid type for field[%s]", sqlTypeName, name);
|
||||
throw new ISE("Cannot translate sqlTypeName[%s] to Druid type for field[%s]", sqlTypeName, aggregatorName);
|
||||
}
|
||||
|
||||
final DimensionSpec dimensionSpec;
|
||||
|
@ -108,7 +111,7 @@ public class ApproxCountDistinctSqlAggregator implements SqlAggregator
|
|||
dimensionSpec = arg.getSimpleExtraction().toDimensionSpec(null, inputType);
|
||||
} else {
|
||||
final ExpressionVirtualColumn virtualColumn = arg.toVirtualColumn(
|
||||
StringUtils.format("%s:v", name),
|
||||
Calcites.makePrefixedName(name, "v"),
|
||||
inputType,
|
||||
plannerContext.getExprMacroTable()
|
||||
);
|
||||
|
@ -116,10 +119,20 @@ public class ApproxCountDistinctSqlAggregator implements SqlAggregator
|
|||
virtualColumns.add(virtualColumn);
|
||||
}
|
||||
|
||||
aggregatorFactory = new CardinalityAggregatorFactory(name, null, ImmutableList.of(dimensionSpec), false, true);
|
||||
aggregatorFactory = new CardinalityAggregatorFactory(
|
||||
aggregatorName,
|
||||
null,
|
||||
ImmutableList.of(dimensionSpec),
|
||||
false,
|
||||
true
|
||||
);
|
||||
}
|
||||
|
||||
return Aggregation.create(virtualColumns, aggregatorFactory);
|
||||
return Aggregation.create(
|
||||
virtualColumns,
|
||||
Collections.singletonList(aggregatorFactory),
|
||||
finalizeAggregations ? new HyperUniqueFinalizingPostAggregator(name, aggregatorFactory.getName()) : null
|
||||
);
|
||||
}
|
||||
|
||||
private static class ApproxCountDistinctSqlAggFunction extends SqlAggFunction
|
||||
|
|
|
@ -21,7 +21,6 @@ package io.druid.sql.calcite.aggregation.builtin;
|
|||
|
||||
import com.google.common.collect.ImmutableList;
|
||||
import com.google.common.collect.Iterables;
|
||||
import io.druid.java.util.common.StringUtils;
|
||||
import io.druid.math.expr.ExprMacroTable;
|
||||
import io.druid.query.aggregation.AggregatorFactory;
|
||||
import io.druid.query.aggregation.CountAggregatorFactory;
|
||||
|
@ -32,6 +31,7 @@ import io.druid.sql.calcite.aggregation.Aggregation;
|
|||
import io.druid.sql.calcite.aggregation.Aggregations;
|
||||
import io.druid.sql.calcite.aggregation.SqlAggregator;
|
||||
import io.druid.sql.calcite.expression.DruidExpression;
|
||||
import io.druid.sql.calcite.planner.Calcites;
|
||||
import io.druid.sql.calcite.planner.PlannerContext;
|
||||
import io.druid.sql.calcite.table.RowSignature;
|
||||
import org.apache.calcite.rel.core.AggregateCall;
|
||||
|
@ -61,7 +61,8 @@ public class AvgSqlAggregator implements SqlAggregator
|
|||
final String name,
|
||||
final AggregateCall aggregateCall,
|
||||
final Project project,
|
||||
final List<Aggregation> existingAggregations
|
||||
final List<Aggregation> existingAggregations,
|
||||
final boolean finalizeAggregations
|
||||
)
|
||||
{
|
||||
if (aggregateCall.isDistinct()) {
|
||||
|
@ -102,8 +103,8 @@ public class AvgSqlAggregator implements SqlAggregator
|
|||
expression = arg.getExpression();
|
||||
}
|
||||
|
||||
final String sumName = StringUtils.format("%s:sum", name);
|
||||
final String countName = StringUtils.format("%s:count", name);
|
||||
final String sumName = Calcites.makePrefixedName(name, "sum");
|
||||
final String countName = Calcites.makePrefixedName(name, "count");
|
||||
final AggregatorFactory sum = SumSqlAggregator.createSumAggregatorFactory(
|
||||
sumType,
|
||||
sumName,
|
||||
|
|
|
@ -60,7 +60,8 @@ public class CountSqlAggregator implements SqlAggregator
|
|||
final String name,
|
||||
final AggregateCall aggregateCall,
|
||||
final Project project,
|
||||
final List<Aggregation> existingAggregations
|
||||
final List<Aggregation> existingAggregations,
|
||||
final boolean finalizeAggregations
|
||||
)
|
||||
{
|
||||
final List<DruidExpression> args = Aggregations.getArgumentsForSimpleAggregator(
|
||||
|
@ -87,7 +88,8 @@ public class CountSqlAggregator implements SqlAggregator
|
|||
name,
|
||||
aggregateCall,
|
||||
project,
|
||||
existingAggregations
|
||||
existingAggregations,
|
||||
finalizeAggregations
|
||||
);
|
||||
} else {
|
||||
return null;
|
||||
|
|
|
@ -60,7 +60,8 @@ public class MaxSqlAggregator implements SqlAggregator
|
|||
final String name,
|
||||
final AggregateCall aggregateCall,
|
||||
final Project project,
|
||||
final List<Aggregation> existingAggregations
|
||||
final List<Aggregation> existingAggregations,
|
||||
final boolean finalizeAggregations
|
||||
)
|
||||
{
|
||||
if (aggregateCall.isDistinct()) {
|
||||
|
|
|
@ -60,7 +60,8 @@ public class MinSqlAggregator implements SqlAggregator
|
|||
final String name,
|
||||
final AggregateCall aggregateCall,
|
||||
final Project project,
|
||||
final List<Aggregation> existingAggregations
|
||||
final List<Aggregation> existingAggregations,
|
||||
final boolean finalizeAggregations
|
||||
)
|
||||
{
|
||||
if (aggregateCall.isDistinct()) {
|
||||
|
|
|
@ -60,7 +60,8 @@ public class SumSqlAggregator implements SqlAggregator
|
|||
final String name,
|
||||
final AggregateCall aggregateCall,
|
||||
final Project project,
|
||||
final List<Aggregation> existingAggregations
|
||||
final List<Aggregation> existingAggregations,
|
||||
final boolean finalizeAggregations
|
||||
)
|
||||
{
|
||||
if (aggregateCall.isDistinct()) {
|
||||
|
|
|
@ -339,21 +339,26 @@ public class Calcites
|
|||
return rexNode instanceof RexLiteral && SqlTypeName.INT_TYPES.contains(rexNode.getType().getSqlTypeName());
|
||||
}
|
||||
|
||||
public static String findOutputNamePrefix(final String basePrefix, final NavigableSet<String> strings)
|
||||
public static String findUnusedPrefix(final String basePrefix, final NavigableSet<String> strings)
|
||||
{
|
||||
String prefix = basePrefix;
|
||||
|
||||
while (!isUsablePrefix(strings, prefix)) {
|
||||
while (!isUnusedPrefix(prefix, strings)) {
|
||||
prefix = "_" + prefix;
|
||||
}
|
||||
|
||||
return prefix;
|
||||
}
|
||||
|
||||
private static boolean isUsablePrefix(final NavigableSet<String> strings, final String prefix)
|
||||
private static boolean isUnusedPrefix(final String prefix, final NavigableSet<String> strings)
|
||||
{
|
||||
// ":" is one character after "9"
|
||||
final NavigableSet<String> subSet = strings.subSet(prefix + "0", true, prefix + ":", false);
|
||||
return subSet.isEmpty();
|
||||
}
|
||||
|
||||
public static String makePrefixedName(final String prefix, final String suffix)
|
||||
{
|
||||
return StringUtils.format("%s:%s", prefix, suffix);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -88,7 +88,11 @@ public class DruidOuterQueryRel extends DruidRel<DruidOuterQueryRel>
|
|||
@Override
|
||||
public Sequence<Object[]> runQuery()
|
||||
{
|
||||
final DruidQuery query = toDruidQuery();
|
||||
// runQuery doesn't need to finalize aggregations, because the fact that runQuery is happening suggests this
|
||||
// is the outermost query and it will actually get run as a native query. Druid's native query layer will
|
||||
// finalize aggregations for the outermost query even if we don't explicitly ask it to.
|
||||
|
||||
final DruidQuery query = toDruidQuery(false);
|
||||
if (query != null) {
|
||||
return getQueryMaker().runQuery(query);
|
||||
} else {
|
||||
|
@ -116,9 +120,11 @@ public class DruidOuterQueryRel extends DruidRel<DruidOuterQueryRel>
|
|||
|
||||
@Nullable
|
||||
@Override
|
||||
public DruidQuery toDruidQuery()
|
||||
public DruidQuery toDruidQuery(final boolean finalizeAggregations)
|
||||
{
|
||||
final DruidQuery subQuery = ((DruidRel) sourceRel).toDruidQuery();
|
||||
// Must finalize aggregations on subqueries.
|
||||
|
||||
final DruidQuery subQuery = ((DruidRel) sourceRel).toDruidQuery(true);
|
||||
if (subQuery == null) {
|
||||
return null;
|
||||
}
|
||||
|
@ -128,7 +134,8 @@ public class DruidOuterQueryRel extends DruidRel<DruidOuterQueryRel>
|
|||
new QueryDataSource(subQuery.toGroupByQuery()),
|
||||
sourceRowSignature,
|
||||
getPlannerContext(),
|
||||
getCluster().getRexBuilder()
|
||||
getCluster().getRexBuilder(),
|
||||
finalizeAggregations
|
||||
);
|
||||
}
|
||||
|
||||
|
@ -142,7 +149,8 @@ public class DruidOuterQueryRel extends DruidRel<DruidOuterQueryRel>
|
|||
sourceRel.getRowType()
|
||||
),
|
||||
getPlannerContext(),
|
||||
getCluster().getRexBuilder()
|
||||
getCluster().getRexBuilder(),
|
||||
false
|
||||
);
|
||||
}
|
||||
|
||||
|
|
|
@ -131,7 +131,8 @@ public class DruidQuery
|
|||
final DataSource dataSource,
|
||||
final RowSignature sourceRowSignature,
|
||||
final PlannerContext plannerContext,
|
||||
final RexBuilder rexBuilder
|
||||
final RexBuilder rexBuilder,
|
||||
final boolean finalizeAggregations
|
||||
)
|
||||
{
|
||||
this.dataSource = dataSource;
|
||||
|
@ -142,7 +143,7 @@ public class DruidQuery
|
|||
// Now the fun begins.
|
||||
this.filter = computeWhereFilter(partialQuery, sourceRowSignature, plannerContext);
|
||||
this.selectProjection = computeSelectProjection(partialQuery, plannerContext, sourceRowSignature);
|
||||
this.grouping = computeGrouping(partialQuery, plannerContext, sourceRowSignature, rexBuilder);
|
||||
this.grouping = computeGrouping(partialQuery, plannerContext, sourceRowSignature, rexBuilder, finalizeAggregations);
|
||||
|
||||
final RowSignature sortingInputRowSignature;
|
||||
|
||||
|
@ -222,7 +223,7 @@ public class DruidQuery
|
|||
final List<VirtualColumn> virtualColumns = new ArrayList<>();
|
||||
final List<String> rowOrder = new ArrayList<>();
|
||||
|
||||
final String virtualColumnPrefix = Calcites.findOutputNamePrefix(
|
||||
final String virtualColumnPrefix = Calcites.findUnusedPrefix(
|
||||
"v",
|
||||
new TreeSet<>(sourceRowSignature.getRowOrder())
|
||||
);
|
||||
|
@ -254,7 +255,8 @@ public class DruidQuery
|
|||
final PartialDruidQuery partialQuery,
|
||||
final PlannerContext plannerContext,
|
||||
final RowSignature sourceRowSignature,
|
||||
final RexBuilder rexBuilder
|
||||
final RexBuilder rexBuilder,
|
||||
final boolean finalizeAggregations
|
||||
)
|
||||
{
|
||||
final Aggregate aggregate = partialQuery.getAggregate();
|
||||
|
@ -269,7 +271,8 @@ public class DruidQuery
|
|||
partialQuery,
|
||||
plannerContext,
|
||||
sourceRowSignature,
|
||||
rexBuilder
|
||||
rexBuilder,
|
||||
finalizeAggregations
|
||||
);
|
||||
|
||||
final RowSignature aggregateRowSignature = RowSignature.from(
|
||||
|
@ -428,7 +431,7 @@ public class DruidQuery
|
|||
{
|
||||
final Aggregate aggregate = Preconditions.checkNotNull(partialQuery.getAggregate());
|
||||
final List<DimensionExpression> dimensions = new ArrayList<>();
|
||||
final String outputNamePrefix = Calcites.findOutputNamePrefix("d", new TreeSet<>(sourceRowSignature.getRowOrder()));
|
||||
final String outputNamePrefix = Calcites.findUnusedPrefix("d", new TreeSet<>(sourceRowSignature.getRowOrder()));
|
||||
int outputNameCounter = 0;
|
||||
|
||||
for (int i : aggregate.getGroupSet()) {
|
||||
|
@ -460,10 +463,13 @@ public class DruidQuery
|
|||
/**
|
||||
* Returns aggregations corresponding to {@code aggregate.getAggCallList()}, in the same order.
|
||||
*
|
||||
* @param partialQuery partial query
|
||||
* @param plannerContext planner context
|
||||
* @param sourceRowSignature source row signature
|
||||
* @param rexBuilder calcite RexBuilder
|
||||
* @param partialQuery partial query
|
||||
* @param plannerContext planner context
|
||||
* @param sourceRowSignature source row signature
|
||||
* @param rexBuilder calcite RexBuilder
|
||||
* @param finalizeAggregations true if this query should include explicit finalization for all of its
|
||||
* aggregators, where required. Useful for subqueries where Druid's native query layer
|
||||
* does not do this automatically.
|
||||
*
|
||||
* @return aggregations
|
||||
*
|
||||
|
@ -473,12 +479,13 @@ public class DruidQuery
|
|||
final PartialDruidQuery partialQuery,
|
||||
final PlannerContext plannerContext,
|
||||
final RowSignature sourceRowSignature,
|
||||
final RexBuilder rexBuilder
|
||||
final RexBuilder rexBuilder,
|
||||
final boolean finalizeAggregations
|
||||
)
|
||||
{
|
||||
final Aggregate aggregate = Preconditions.checkNotNull(partialQuery.getAggregate());
|
||||
final List<Aggregation> aggregations = new ArrayList<>();
|
||||
final String outputNamePrefix = Calcites.findOutputNamePrefix("a", new TreeSet<>(sourceRowSignature.getRowOrder()));
|
||||
final String outputNamePrefix = Calcites.findUnusedPrefix("a", new TreeSet<>(sourceRowSignature.getRowOrder()));
|
||||
|
||||
for (int i = 0; i < aggregate.getAggCallList().size(); i++) {
|
||||
final String aggName = outputNamePrefix + i;
|
||||
|
@ -490,7 +497,8 @@ public class DruidQuery
|
|||
partialQuery.getSelectProject(),
|
||||
aggCall,
|
||||
aggregations,
|
||||
aggName
|
||||
aggName,
|
||||
finalizeAggregations
|
||||
);
|
||||
|
||||
if (aggregation == null) {
|
||||
|
|
|
@ -93,20 +93,21 @@ public class DruidQueryRel extends DruidRel<DruidQueryRel>
|
|||
|
||||
@Override
|
||||
@Nonnull
|
||||
public DruidQuery toDruidQuery()
|
||||
public DruidQuery toDruidQuery(final boolean finalizeAggregations)
|
||||
{
|
||||
return partialQuery.build(
|
||||
druidTable.getDataSource(),
|
||||
druidTable.getRowSignature(),
|
||||
getPlannerContext(),
|
||||
getCluster().getRexBuilder()
|
||||
getCluster().getRexBuilder(),
|
||||
finalizeAggregations
|
||||
);
|
||||
}
|
||||
|
||||
@Override
|
||||
public DruidQuery toDruidQueryForExplaining()
|
||||
{
|
||||
return toDruidQuery();
|
||||
return toDruidQuery(false);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -169,7 +170,11 @@ public class DruidQueryRel extends DruidRel<DruidQueryRel>
|
|||
@Override
|
||||
public Sequence<Object[]> runQuery()
|
||||
{
|
||||
return getQueryMaker().runQuery(toDruidQuery());
|
||||
// runQuery doesn't need to finalize aggregations, because the fact that runQuery is happening suggests this
|
||||
// is the outermost query and it will actually get run as a native query. Druid's native query layer will
|
||||
// finalize aggregations for the outermost query even if we don't explicitly ask it to.
|
||||
|
||||
return getQueryMaker().runQuery(toDruidQuery(false));
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -76,12 +76,16 @@ public abstract class DruidRel<T extends DruidRel> extends AbstractRelNode imple
|
|||
*
|
||||
* This method may return null if it knows that this rel will yield an empty result set.
|
||||
*
|
||||
* @param finalizeAggregations true if this query should include explicit finalization for all of its
|
||||
* aggregators, where required. Useful for subqueries where Druid's native query layer
|
||||
* does not do this automatically.
|
||||
*
|
||||
* @return query, or null if it is known in advance that this rel will yield an empty result set.
|
||||
*
|
||||
* @throws CannotBuildQueryException
|
||||
*/
|
||||
@Nullable
|
||||
public abstract DruidQuery toDruidQuery();
|
||||
public abstract DruidQuery toDruidQuery(boolean finalizeAggregations);
|
||||
|
||||
/**
|
||||
* Convert this DruidRel to a DruidQuery for purposes of explaining. This must be an inexpensive operation. For
|
||||
|
|
|
@ -143,10 +143,10 @@ public class DruidSemiJoin extends DruidRel<DruidSemiJoin>
|
|||
|
||||
@Nullable
|
||||
@Override
|
||||
public DruidQuery toDruidQuery()
|
||||
public DruidQuery toDruidQuery(final boolean finalizeAggregations)
|
||||
{
|
||||
final DruidRel rel = getLeftRelWithFilter();
|
||||
return rel != null ? rel.toDruidQuery() : null;
|
||||
return rel != null ? rel.toDruidQuery(finalizeAggregations) : null;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -278,10 +278,11 @@ public class PartialDruidQuery
|
|||
final DataSource dataSource,
|
||||
final RowSignature sourceRowSignature,
|
||||
final PlannerContext plannerContext,
|
||||
final RexBuilder rexBuilder
|
||||
final RexBuilder rexBuilder,
|
||||
final boolean finalizeAggregations
|
||||
)
|
||||
{
|
||||
return new DruidQuery(this, dataSource, sourceRowSignature, plannerContext, rexBuilder);
|
||||
return new DruidQuery(this, dataSource, sourceRowSignature, plannerContext, rexBuilder, finalizeAggregations);
|
||||
}
|
||||
|
||||
public boolean canAccept(final Stage stage)
|
||||
|
|
|
@ -107,7 +107,7 @@ public class DruidRules
|
|||
{
|
||||
super(
|
||||
operand(relClass, operand(DruidRel.class, any())),
|
||||
StringUtils.format("%s:%s", DruidQueryRule.class.getSimpleName(), stage)
|
||||
StringUtils.format("%s(%s)", DruidQueryRule.class.getSimpleName(), stage)
|
||||
);
|
||||
this.stage = stage;
|
||||
this.f = f;
|
||||
|
@ -261,7 +261,7 @@ public class DruidRules
|
|||
|
||||
public DruidOuterQueryRule(final RelOptRuleOperand op, final String description)
|
||||
{
|
||||
super(op, StringUtils.format("%s:%s", DruidOuterQueryRel.class.getSimpleName(), description));
|
||||
super(op, StringUtils.format("%s(%s)", DruidOuterQueryRel.class.getSimpleName(), description));
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -57,7 +57,8 @@ public class GroupByRules
|
|||
final Project project,
|
||||
final AggregateCall call,
|
||||
final List<Aggregation> existingAggregations,
|
||||
final String name
|
||||
final String name,
|
||||
final boolean finalizeAggregations
|
||||
)
|
||||
{
|
||||
final DimFilter filter;
|
||||
|
@ -125,7 +126,8 @@ public class GroupByRules
|
|||
name,
|
||||
call,
|
||||
project,
|
||||
existingAggregationsWithSameFilter
|
||||
existingAggregationsWithSameFilter,
|
||||
finalizeAggregations
|
||||
);
|
||||
|
||||
if (retVal == null) {
|
||||
|
|
|
@ -50,6 +50,7 @@ import io.druid.query.aggregation.LongMaxAggregatorFactory;
|
|||
import io.druid.query.aggregation.LongMinAggregatorFactory;
|
||||
import io.druid.query.aggregation.LongSumAggregatorFactory;
|
||||
import io.druid.query.aggregation.cardinality.CardinalityAggregatorFactory;
|
||||
import io.druid.query.aggregation.hyperloglog.HyperUniqueFinalizingPostAggregator;
|
||||
import io.druid.query.aggregation.hyperloglog.HyperUniquesAggregatorFactory;
|
||||
import io.druid.query.aggregation.post.ArithmeticPostAggregator;
|
||||
import io.druid.query.aggregation.post.ExpressionPostAggregator;
|
||||
|
@ -2560,8 +2561,10 @@ public class CalciteQueryTest extends CalciteTestBase
|
|||
.setInterval(QSS(Filtration.eternity()))
|
||||
.setGranularity(Granularities.ALL)
|
||||
.setDimensions(DIMS(new DefaultDimensionSpec("dim1", "d0")))
|
||||
.setAggregatorSpecs(new FloatMinAggregatorFactory("a0", "m1"),
|
||||
new FloatMaxAggregatorFactory("a1", "m1"))
|
||||
.setAggregatorSpecs(
|
||||
new FloatMinAggregatorFactory("a0", "m1"),
|
||||
new FloatMaxAggregatorFactory("a1", "m1")
|
||||
)
|
||||
.setPostAggregatorSpecs(ImmutableList.of(EXPRESSION_POST_AGG("p0", "(\"a0\" + \"a1\")")))
|
||||
.setLimitSpec(
|
||||
new DefaultLimitSpec(
|
||||
|
@ -2602,8 +2605,10 @@ public class CalciteQueryTest extends CalciteTestBase
|
|||
.setInterval(QSS(Filtration.eternity()))
|
||||
.setGranularity(Granularities.ALL)
|
||||
.setDimensions(DIMS(new DefaultDimensionSpec("dim1", "d0")))
|
||||
.setAggregatorSpecs(new FloatMinAggregatorFactory("a0", "m1"),
|
||||
new FloatMaxAggregatorFactory("a1", "m1"))
|
||||
.setAggregatorSpecs(
|
||||
new FloatMinAggregatorFactory("a0", "m1"),
|
||||
new FloatMaxAggregatorFactory("a1", "m1")
|
||||
)
|
||||
.setPostAggregatorSpecs(
|
||||
ImmutableList.of(
|
||||
EXPRESSION_POST_AGG("p0", "(\"a0\" + \"a1\")")
|
||||
|
@ -4384,6 +4389,74 @@ public class CalciteQueryTest extends CalciteTestBase
|
|||
);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testAvgDailyCountDistinct() throws Exception
|
||||
{
|
||||
testQuery(
|
||||
"SELECT\n"
|
||||
+ " AVG(u)\n"
|
||||
+ "FROM (SELECT FLOOR(__time TO DAY), APPROX_COUNT_DISTINCT(cnt) AS u FROM druid.foo GROUP BY 1)",
|
||||
ImmutableList.of(
|
||||
GroupByQuery.builder()
|
||||
.setDataSource(
|
||||
new QueryDataSource(
|
||||
GroupByQuery.builder()
|
||||
.setDataSource(CalciteTests.DATASOURCE1)
|
||||
.setInterval(QSS(Filtration.eternity()))
|
||||
.setGranularity(Granularities.ALL)
|
||||
.setVirtualColumns(
|
||||
EXPRESSION_VIRTUAL_COLUMN(
|
||||
"d0:v",
|
||||
"timestamp_floor(\"__time\",'P1D',null,'UTC')",
|
||||
ValueType.LONG
|
||||
)
|
||||
)
|
||||
.setDimensions(DIMS(new DefaultDimensionSpec("d0:v", "d0", ValueType.LONG)))
|
||||
.setAggregatorSpecs(
|
||||
AGGS(
|
||||
new CardinalityAggregatorFactory(
|
||||
"a0:a",
|
||||
null,
|
||||
DIMS(new DefaultDimensionSpec("cnt", "cnt", ValueType.LONG)),
|
||||
false,
|
||||
true
|
||||
)
|
||||
)
|
||||
)
|
||||
.setPostAggregatorSpecs(
|
||||
ImmutableList.of(
|
||||
new HyperUniqueFinalizingPostAggregator("a0", "a0:a")
|
||||
)
|
||||
)
|
||||
.setContext(QUERY_CONTEXT_DEFAULT)
|
||||
.build()
|
||||
)
|
||||
)
|
||||
.setInterval(QSS(Filtration.eternity()))
|
||||
.setGranularity(Granularities.ALL)
|
||||
.setAggregatorSpecs(AGGS(
|
||||
new LongSumAggregatorFactory("_a0:sum", "a0"),
|
||||
new CountAggregatorFactory("_a0:count")
|
||||
))
|
||||
.setPostAggregatorSpecs(
|
||||
ImmutableList.of(
|
||||
new ArithmeticPostAggregator(
|
||||
"_a0",
|
||||
"quotient",
|
||||
ImmutableList.of(
|
||||
new FieldAccessPostAggregator(null, "_a0:sum"),
|
||||
new FieldAccessPostAggregator(null, "_a0:count")
|
||||
)
|
||||
)
|
||||
)
|
||||
)
|
||||
.setContext(QUERY_CONTEXT_DEFAULT)
|
||||
.build()
|
||||
),
|
||||
ImmutableList.of(new Object[]{1L})
|
||||
);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testTopNFilterJoin() throws Exception
|
||||
{
|
||||
|
@ -6897,7 +6970,10 @@ public class CalciteQueryTest extends CalciteTestBase
|
|||
.setAggregatorSpecs(
|
||||
AGGS(new CountAggregatorFactory("a0"), new DoubleSumAggregatorFactory("a1", "m2"))
|
||||
)
|
||||
.setPostAggregatorSpecs(Collections.singletonList(EXPRESSION_POST_AGG("p0", "(\"a1\" / \"a0\")")))
|
||||
.setPostAggregatorSpecs(Collections.singletonList(EXPRESSION_POST_AGG(
|
||||
"p0",
|
||||
"(\"a1\" / \"a0\")"
|
||||
)))
|
||||
.setLimitSpec(
|
||||
new DefaultLimitSpec(
|
||||
Collections.singletonList(
|
||||
|
|
|
@ -40,16 +40,16 @@ public class CalcitesTest extends CalciteTestBase
|
|||
}
|
||||
|
||||
@Test
|
||||
public void testFindOutputNamePrefix()
|
||||
public void testFindUnusedPrefix()
|
||||
{
|
||||
Assert.assertEquals("x", Calcites.findOutputNamePrefix("x", ImmutableSortedSet.of("foo", "bar")));
|
||||
Assert.assertEquals("x", Calcites.findOutputNamePrefix("x", ImmutableSortedSet.of("foo", "bar", "x")));
|
||||
Assert.assertEquals("_x", Calcites.findOutputNamePrefix("x", ImmutableSortedSet.of("foo", "bar", "x0")));
|
||||
Assert.assertEquals("_x", Calcites.findOutputNamePrefix("x", ImmutableSortedSet.of("foo", "bar", "x4")));
|
||||
Assert.assertEquals("__x", Calcites.findOutputNamePrefix("x", ImmutableSortedSet.of("foo", "xa", "_x2xx", "x0")));
|
||||
Assert.assertEquals("x", Calcites.findOutputNamePrefix("x", ImmutableSortedSet.of("foo", "xa", "_x2xx", " x")));
|
||||
Assert.assertEquals("x", Calcites.findOutputNamePrefix("x", ImmutableSortedSet.of("foo", "_xbxx")));
|
||||
Assert.assertEquals("x", Calcites.findOutputNamePrefix("x", ImmutableSortedSet.of("foo", "xa", "_x")));
|
||||
Assert.assertEquals("__x", Calcites.findOutputNamePrefix("x", ImmutableSortedSet.of("foo", "x1a", "_x90")));
|
||||
Assert.assertEquals("x", Calcites.findUnusedPrefix("x", ImmutableSortedSet.of("foo", "bar")));
|
||||
Assert.assertEquals("x", Calcites.findUnusedPrefix("x", ImmutableSortedSet.of("foo", "bar", "x")));
|
||||
Assert.assertEquals("_x", Calcites.findUnusedPrefix("x", ImmutableSortedSet.of("foo", "bar", "x0")));
|
||||
Assert.assertEquals("_x", Calcites.findUnusedPrefix("x", ImmutableSortedSet.of("foo", "bar", "x4")));
|
||||
Assert.assertEquals("__x", Calcites.findUnusedPrefix("x", ImmutableSortedSet.of("foo", "xa", "_x2xx", "x0")));
|
||||
Assert.assertEquals("x", Calcites.findUnusedPrefix("x", ImmutableSortedSet.of("foo", "xa", "_x2xx", " x")));
|
||||
Assert.assertEquals("x", Calcites.findUnusedPrefix("x", ImmutableSortedSet.of("foo", "_xbxx")));
|
||||
Assert.assertEquals("x", Calcites.findUnusedPrefix("x", ImmutableSortedSet.of("foo", "xa", "_x")));
|
||||
Assert.assertEquals("__x", Calcites.findUnusedPrefix("x", ImmutableSortedSet.of("foo", "x1a", "_x90")));
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue