mirror of https://github.com/apache/druid.git
ExpressionPostAggregator: Automatically finalize inputs. (#4406)
* ExpressionPostAggregator: Automatically finalize inputs. Raw HyperLogLogCollectors and such aren't very useful. When writing expressions like `x / y` users will expect `x` and `y` to be finalized. * Fix un-merge. * Code review comments. * Remove unnecessary ImmutableMap.copyOf.
This commit is contained in:
parent
b87f037b77
commit
22aad08a59
|
@ -22,8 +22,11 @@ package io.druid.query.aggregation.post;
|
|||
import com.fasterxml.jackson.annotation.JacksonInject;
|
||||
import com.fasterxml.jackson.annotation.JsonCreator;
|
||||
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||
import com.google.common.base.Function;
|
||||
import com.google.common.base.Preconditions;
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import com.google.common.collect.ImmutableSet;
|
||||
import com.google.common.collect.Maps;
|
||||
import io.druid.java.util.common.guava.Comparators;
|
||||
import io.druid.math.expr.Expr;
|
||||
import io.druid.math.expr.ExprMacroTable;
|
||||
|
@ -32,10 +35,12 @@ import io.druid.query.aggregation.AggregatorFactory;
|
|||
import io.druid.query.aggregation.PostAggregator;
|
||||
import io.druid.query.cache.CacheKeyBuilder;
|
||||
|
||||
import javax.annotation.Nullable;
|
||||
import java.util.Comparator;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
import java.util.Set;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
public class ExpressionPostAggregator implements PostAggregator
|
||||
{
|
||||
|
@ -55,6 +60,7 @@ public class ExpressionPostAggregator implements PostAggregator
|
|||
private final Comparator<Comparable> comparator;
|
||||
private final String ordering;
|
||||
private final ExprMacroTable macroTable;
|
||||
private final Map<String, Function<Object, Object>> finalizers;
|
||||
|
||||
private final Expr parsed;
|
||||
private final Set<String> dependentFields;
|
||||
|
@ -69,6 +75,20 @@ public class ExpressionPostAggregator implements PostAggregator
|
|||
@JsonProperty("ordering") String ordering,
|
||||
@JacksonInject ExprMacroTable macroTable
|
||||
)
|
||||
{
|
||||
this(name, expression, ordering, macroTable, ImmutableMap.of());
|
||||
}
|
||||
|
||||
/**
|
||||
* Constructor for {@link #decorate(Map)}.
|
||||
*/
|
||||
private ExpressionPostAggregator(
|
||||
final String name,
|
||||
final String expression,
|
||||
@Nullable final String ordering,
|
||||
final ExprMacroTable macroTable,
|
||||
final Map<String, Function<Object, Object>> finalizers
|
||||
)
|
||||
{
|
||||
Preconditions.checkArgument(expression != null, "expression cannot be null");
|
||||
|
||||
|
@ -77,15 +97,12 @@ public class ExpressionPostAggregator implements PostAggregator
|
|||
this.ordering = ordering;
|
||||
this.comparator = ordering == null ? DEFAULT_COMPARATOR : Ordering.valueOf(ordering);
|
||||
this.macroTable = macroTable;
|
||||
this.finalizers = finalizers;
|
||||
|
||||
this.parsed = Parser.parse(expression, macroTable);
|
||||
this.dependentFields = ImmutableSet.copyOf(Parser.findRequiredBindings(parsed));
|
||||
}
|
||||
|
||||
public ExpressionPostAggregator(String name, String fnName)
|
||||
{
|
||||
this(name, fnName, null, ExprMacroTable.nil());
|
||||
}
|
||||
|
||||
@Override
|
||||
public Set<String> getDependentFields()
|
||||
{
|
||||
|
@ -101,7 +118,16 @@ public class ExpressionPostAggregator implements PostAggregator
|
|||
@Override
|
||||
public Object compute(Map<String, Object> values)
|
||||
{
|
||||
return parsed.eval(Parser.withMap(values)).value();
|
||||
// Maps.transformEntries is lazy, will only finalize values we actually read.
|
||||
final Map<String, Object> finalizedValues = Maps.transformEntries(
|
||||
values,
|
||||
(String k, Object v) -> {
|
||||
final Function<Object, Object> finalizer = finalizers.get(k);
|
||||
return finalizer != null ? finalizer.apply(v) : v;
|
||||
}
|
||||
);
|
||||
|
||||
return parsed.eval(Parser.withMap(finalizedValues)).value();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -112,9 +138,20 @@ public class ExpressionPostAggregator implements PostAggregator
|
|||
}
|
||||
|
||||
@Override
|
||||
public ExpressionPostAggregator decorate(Map<String, AggregatorFactory> aggregators)
|
||||
public ExpressionPostAggregator decorate(final Map<String, AggregatorFactory> aggregators)
|
||||
{
|
||||
return this;
|
||||
return new ExpressionPostAggregator(
|
||||
name,
|
||||
expression,
|
||||
ordering,
|
||||
macroTable,
|
||||
aggregators.entrySet().stream().collect(
|
||||
Collectors.toMap(
|
||||
entry -> entry.getKey(),
|
||||
entry -> entry.getValue()::finalizeComputation
|
||||
)
|
||||
)
|
||||
);
|
||||
}
|
||||
|
||||
@JsonProperty("expression")
|
||||
|
|
|
@ -640,6 +640,56 @@ public class TopNQueryRunnerTest
|
|||
assertExpectedResults(expectedResults, query);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testTopNOverHyperUniqueExpression()
|
||||
{
|
||||
TopNQuery query = new TopNQueryBuilder()
|
||||
.dataSource(QueryRunnerTestHelper.dataSource)
|
||||
.granularity(QueryRunnerTestHelper.allGran)
|
||||
.dimension(QueryRunnerTestHelper.marketDimension)
|
||||
.metric(QueryRunnerTestHelper.hyperUniqueFinalizingPostAggMetric)
|
||||
.threshold(3)
|
||||
.intervals(QueryRunnerTestHelper.fullOnInterval)
|
||||
.aggregators(
|
||||
Arrays.<AggregatorFactory>asList(QueryRunnerTestHelper.qualityUniques)
|
||||
)
|
||||
.postAggregators(
|
||||
Collections.singletonList(new ExpressionPostAggregator(
|
||||
QueryRunnerTestHelper.hyperUniqueFinalizingPostAggMetric,
|
||||
"uniques + 1",
|
||||
null,
|
||||
TestExprMacroTable.INSTANCE
|
||||
))
|
||||
)
|
||||
.build();
|
||||
|
||||
List<Result<TopNResultValue>> expectedResults = Arrays.asList(
|
||||
new Result<>(
|
||||
new DateTime("2011-01-12T00:00:00.000Z"),
|
||||
new TopNResultValue(
|
||||
Arrays.<Map<String, Object>>asList(
|
||||
ImmutableMap.<String, Object>builder()
|
||||
.put("market", "spot")
|
||||
.put(QueryRunnerTestHelper.uniqueMetric, QueryRunnerTestHelper.UNIQUES_9)
|
||||
.put(QueryRunnerTestHelper.hyperUniqueFinalizingPostAggMetric, QueryRunnerTestHelper.UNIQUES_9 + 1)
|
||||
.build(),
|
||||
ImmutableMap.<String, Object>builder()
|
||||
.put("market", "total_market")
|
||||
.put(QueryRunnerTestHelper.uniqueMetric, QueryRunnerTestHelper.UNIQUES_2)
|
||||
.put(QueryRunnerTestHelper.hyperUniqueFinalizingPostAggMetric, QueryRunnerTestHelper.UNIQUES_2 + 1)
|
||||
.build(),
|
||||
ImmutableMap.<String, Object>builder()
|
||||
.put("market", "upfront")
|
||||
.put(QueryRunnerTestHelper.uniqueMetric, QueryRunnerTestHelper.UNIQUES_2)
|
||||
.put(QueryRunnerTestHelper.hyperUniqueFinalizingPostAggMetric, QueryRunnerTestHelper.UNIQUES_2 + 1)
|
||||
.build()
|
||||
)
|
||||
)
|
||||
)
|
||||
);
|
||||
assertExpectedResults(expectedResults, query);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testTopNOverFirstLastAggregator()
|
||||
{
|
||||
|
|
|
@ -210,7 +210,8 @@ public class Expressions
|
|||
final String name,
|
||||
final List<String> rowOrder,
|
||||
final List<PostAggregatorFactory> finalizingPostAggregatorFactories,
|
||||
final RexNode expression
|
||||
final RexNode expression,
|
||||
final PlannerContext plannerContext
|
||||
)
|
||||
{
|
||||
final PostAggregator retVal;
|
||||
|
@ -226,7 +227,7 @@ public class Expressions
|
|||
// types internally and there isn't much we can do to respect
|
||||
// TODO(gianm): Probably not a good idea to ignore CAST like this.
|
||||
final RexNode operand = ((RexCall) expression).getOperands().get(0);
|
||||
retVal = toPostAggregator(name, rowOrder, finalizingPostAggregatorFactories, operand);
|
||||
retVal = toPostAggregator(name, rowOrder, finalizingPostAggregatorFactories, operand, plannerContext);
|
||||
} else if (expression.getKind() == SqlKind.LITERAL
|
||||
&& SqlTypeName.NUMERIC_TYPES.contains(expression.getType().getSqlTypeName())) {
|
||||
retVal = new ConstantPostAggregator(name, (Number) RexLiteral.value(expression));
|
||||
|
@ -246,7 +247,8 @@ public class Expressions
|
|||
null,
|
||||
rowOrder,
|
||||
finalizingPostAggregatorFactories,
|
||||
operand
|
||||
operand,
|
||||
plannerContext
|
||||
);
|
||||
if (translatedOperand == null) {
|
||||
return null;
|
||||
|
@ -260,7 +262,7 @@ public class Expressions
|
|||
if (mathExpression == null) {
|
||||
retVal = null;
|
||||
} else {
|
||||
retVal = new ExpressionPostAggregator(name, mathExpression);
|
||||
retVal = new ExpressionPostAggregator(name, mathExpression, null, plannerContext.getExprMacroTable());
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -540,7 +540,8 @@ public class GroupByRules
|
|||
postAggregatorName,
|
||||
rowOrder,
|
||||
finalizingPostAggregatorFactories,
|
||||
projectExpression
|
||||
projectExpression,
|
||||
druidRel.getPlannerContext()
|
||||
);
|
||||
if (postAggregator != null) {
|
||||
newAggregations.add(Aggregation.create(postAggregator));
|
||||
|
|
|
@ -1529,7 +1529,7 @@ public class CalciteQueryTest
|
|||
new DoubleSumAggregatorFactory("a2", "m1", null, macroTable)
|
||||
))
|
||||
.postAggregators(ImmutableList.of(
|
||||
new ExpressionPostAggregator("a3", "log((\"a1\" + \"a2\"))"),
|
||||
EXPRESSION_POST_AGG("a3", "log((\"a1\" + \"a2\"))"),
|
||||
new ArithmeticPostAggregator("a4", "quotient", ImmutableList.of(
|
||||
new FieldAccessPostAggregator(null, "a1"),
|
||||
new ConstantPostAggregator(null, 0.25)
|
||||
|
@ -4416,4 +4416,9 @@ public class CalciteQueryTest
|
|||
{
|
||||
return Arrays.asList(aggregators);
|
||||
}
|
||||
|
||||
private static ExpressionPostAggregator EXPRESSION_POST_AGG(final String name, final String expression)
|
||||
{
|
||||
return new ExpressionPostAggregator(name, expression, null, CalciteTests.createExprMacroTable());
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue