From a7e802c9d42a93830ee387bca38bf1d5f5c29b95 Mon Sep 17 00:00:00 2001 From: Himanshu Date: Mon, 16 Oct 2017 14:02:30 -0500 Subject: [PATCH] greater-than/less-than/equal-to havingSpec to call AggregatorFactory.finalizeComputation(..) (#4883) * greater-than/less-than/equal-to havingSpec to call AggregatorFactory.finalizeComputation(..) * fix the unit test and expect having to work on hyperUnique agg * test fix * fix style errors --- .../io/druid/query/groupby/GroupByQuery.java | 9 +++++++ .../query/groupby/having/AndHavingSpec.java | 9 +++++++ .../query/groupby/having/BaseHavingSpec.java | 7 +++++ .../groupby/having/EqualToHavingSpec.java | 13 +++++++++- .../groupby/having/GreaterThanHavingSpec.java | 13 +++++++++- .../query/groupby/having/HavingSpec.java | 3 +++ .../having/HavingSpecMetricComparator.java | 26 ++++++++++++++++--- .../groupby/having/LessThanHavingSpec.java | 13 +++++++++- .../query/groupby/having/NotHavingSpec.java | 7 +++++ .../query/groupby/having/OrHavingSpec.java | 9 +++++++ .../query/groupby/GroupByQueryRunnerTest.java | 4 --- 11 files changed, 102 insertions(+), 11 deletions(-) diff --git a/processing/src/main/java/io/druid/query/groupby/GroupByQuery.java b/processing/src/main/java/io/druid/query/groupby/GroupByQuery.java index f017cad0d2a..eed69f34384 100644 --- a/processing/src/main/java/io/druid/query/groupby/GroupByQuery.java +++ b/processing/src/main/java/io/druid/query/groupby/GroupByQuery.java @@ -68,6 +68,7 @@ import javax.annotation.Nullable; import java.util.ArrayList; import java.util.Arrays; import java.util.Comparator; +import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; @@ -144,6 +145,7 @@ public class GroupByQuery extends BaseQuery postProcessingFn, (Sequence input) -> { havingSpec.setRowSignature(GroupByQueryHelper.rowSignatureFor(GroupByQuery.this)); + havingSpec.setAggregators(getAggregatorsMap(aggregatorSpecs)); return Sequences.filter(input, havingSpec::eval); } ); @@ -696,6 +698,13 @@ public class GroupByQuery extends BaseQuery } } + private static Map getAggregatorsMap(List aggregatorSpecs) + { + Map map = new HashMap<>(aggregatorSpecs.size()); + aggregatorSpecs.stream().forEach(v -> map.put(v.getName(), v)); + return map; + } + public static class Builder { private DataSource dataSource; diff --git a/processing/src/main/java/io/druid/query/groupby/having/AndHavingSpec.java b/processing/src/main/java/io/druid/query/groupby/having/AndHavingSpec.java index 4a5f962bcdc..2436c91613f 100644 --- a/processing/src/main/java/io/druid/query/groupby/having/AndHavingSpec.java +++ b/processing/src/main/java/io/druid/query/groupby/having/AndHavingSpec.java @@ -23,6 +23,7 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.collect.ImmutableList; import io.druid.data.input.Row; +import io.druid.query.aggregation.AggregatorFactory; import io.druid.segment.column.ValueType; import java.util.List; @@ -55,6 +56,14 @@ public class AndHavingSpec extends BaseHavingSpec } } + @Override + public void setAggregators(Map aggregators) + { + for (HavingSpec havingSpec : havingSpecs) { + havingSpec.setAggregators(aggregators); + } + } + @Override public boolean eval(Row row) { diff --git a/processing/src/main/java/io/druid/query/groupby/having/BaseHavingSpec.java b/processing/src/main/java/io/druid/query/groupby/having/BaseHavingSpec.java index 5700e89dea2..be28eec2e4c 100644 --- a/processing/src/main/java/io/druid/query/groupby/having/BaseHavingSpec.java +++ b/processing/src/main/java/io/druid/query/groupby/having/BaseHavingSpec.java @@ -19,6 +19,7 @@ package io.druid.query.groupby.having; +import io.druid.query.aggregation.AggregatorFactory; import io.druid.segment.column.ValueType; import java.util.Map; @@ -30,4 +31,10 @@ public abstract class BaseHavingSpec implements HavingSpec { // Do nothing. } + + @Override + public void setAggregators(Map aggregators) + { + + } } diff --git a/processing/src/main/java/io/druid/query/groupby/having/EqualToHavingSpec.java b/processing/src/main/java/io/druid/query/groupby/having/EqualToHavingSpec.java index 89698285b80..2e382c4dd9c 100644 --- a/processing/src/main/java/io/druid/query/groupby/having/EqualToHavingSpec.java +++ b/processing/src/main/java/io/druid/query/groupby/having/EqualToHavingSpec.java @@ -22,6 +22,9 @@ package io.druid.query.groupby.having; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; import io.druid.data.input.Row; +import io.druid.query.aggregation.AggregatorFactory; + +import java.util.Map; /** * The "=" operator in a "having" clause. This is similar to SQL's "having aggregation = value", @@ -32,6 +35,8 @@ public class EqualToHavingSpec extends BaseHavingSpec private final String aggregationName; private final Number value; + private volatile Map aggregators; + @JsonCreator public EqualToHavingSpec( @JsonProperty("aggregation") String aggName, @@ -54,10 +59,16 @@ public class EqualToHavingSpec extends BaseHavingSpec return aggregationName; } + @Override + public void setAggregators(Map aggregators) + { + this.aggregators = aggregators; + } + @Override public boolean eval(Row row) { - return HavingSpecMetricComparator.compare(row, aggregationName, value) == 0; + return HavingSpecMetricComparator.compare(row, aggregationName, value, aggregators) == 0; } /** diff --git a/processing/src/main/java/io/druid/query/groupby/having/GreaterThanHavingSpec.java b/processing/src/main/java/io/druid/query/groupby/having/GreaterThanHavingSpec.java index d75333b74fa..1c7ec437eec 100644 --- a/processing/src/main/java/io/druid/query/groupby/having/GreaterThanHavingSpec.java +++ b/processing/src/main/java/io/druid/query/groupby/having/GreaterThanHavingSpec.java @@ -22,6 +22,9 @@ package io.druid.query.groupby.having; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; import io.druid.data.input.Row; +import io.druid.query.aggregation.AggregatorFactory; + +import java.util.Map; /** * The ">" operator in a "having" clause. This is similar to SQL's "having aggregation > value", @@ -32,6 +35,8 @@ public class GreaterThanHavingSpec extends BaseHavingSpec private final String aggregationName; private final Number value; + private volatile Map aggregators; + @JsonCreator public GreaterThanHavingSpec( @JsonProperty("aggregation") String aggName, @@ -54,10 +59,16 @@ public class GreaterThanHavingSpec extends BaseHavingSpec return value; } + @Override + public void setAggregators(Map aggregators) + { + this.aggregators = aggregators; + } + @Override public boolean eval(Row row) { - return HavingSpecMetricComparator.compare(row, aggregationName, value) > 0; + return HavingSpecMetricComparator.compare(row, aggregationName, value, aggregators) > 0; } /** diff --git a/processing/src/main/java/io/druid/query/groupby/having/HavingSpec.java b/processing/src/main/java/io/druid/query/groupby/having/HavingSpec.java index d3bc31c9989..a2431c65f60 100644 --- a/processing/src/main/java/io/druid/query/groupby/having/HavingSpec.java +++ b/processing/src/main/java/io/druid/query/groupby/having/HavingSpec.java @@ -22,6 +22,7 @@ package io.druid.query.groupby.having; import com.fasterxml.jackson.annotation.JsonSubTypes; import com.fasterxml.jackson.annotation.JsonTypeInfo; import io.druid.data.input.Row; +import io.druid.query.aggregation.AggregatorFactory; import io.druid.segment.column.ValueType; import java.util.Map; @@ -58,6 +59,8 @@ public interface HavingSpec */ void setRowSignature(Map rowSignature); + void setAggregators(Map aggregators); + /** * Evaluates if a given row satisfies the having spec. * diff --git a/processing/src/main/java/io/druid/query/groupby/having/HavingSpecMetricComparator.java b/processing/src/main/java/io/druid/query/groupby/having/HavingSpecMetricComparator.java index cae152a1d09..ff08202bf7c 100644 --- a/processing/src/main/java/io/druid/query/groupby/having/HavingSpecMetricComparator.java +++ b/processing/src/main/java/io/druid/query/groupby/having/HavingSpecMetricComparator.java @@ -19,9 +19,15 @@ package io.druid.query.groupby.having; +import com.google.common.primitives.Doubles; +import com.google.common.primitives.Floats; +import com.google.common.primitives.Ints; +import com.google.common.primitives.Longs; import io.druid.data.input.Row; +import io.druid.query.aggregation.AggregatorFactory; import io.druid.java.util.common.ISE; +import java.util.Map; import java.util.regex.Pattern; /** @@ -30,16 +36,28 @@ class HavingSpecMetricComparator { static final Pattern LONG_PAT = Pattern.compile("[-|+]?\\d+"); - static int compare(Row row, String aggregationName, Number value) + static int compare(Row row, String aggregationName, Number value, Map aggregators) { + Object metricValueObj = row.getRaw(aggregationName); + if (metricValueObj != null) { + if (aggregators != null && aggregators.containsKey(aggregationName)) { + metricValueObj = aggregators.get(aggregationName).finalizeComputation(metricValueObj); + } + if (metricValueObj instanceof Long) { - return Long.compare((Long) metricValueObj, value.longValue()); + long l = ((Long) metricValueObj).longValue(); + return Longs.compare(l, value.longValue()); } else if (metricValueObj instanceof Float) { - return Float.compare((Float) metricValueObj, value.floatValue()); + float l = ((Float) metricValueObj).floatValue(); + return Floats.compare(l, value.floatValue()); } else if (metricValueObj instanceof Double) { - return Double.compare((Double) metricValueObj, value.doubleValue()); + double l = ((Double) metricValueObj).longValue(); + return Doubles.compare(l, value.doubleValue()); + } else if (metricValueObj instanceof Integer) { + int l = ((Integer) metricValueObj).intValue(); + return Ints.compare(l, value.intValue()); } else if (metricValueObj instanceof String) { String metricValueStr = (String) metricValueObj; if (LONG_PAT.matcher(metricValueStr).matches()) { diff --git a/processing/src/main/java/io/druid/query/groupby/having/LessThanHavingSpec.java b/processing/src/main/java/io/druid/query/groupby/having/LessThanHavingSpec.java index a60d5adc123..37d6268b866 100644 --- a/processing/src/main/java/io/druid/query/groupby/having/LessThanHavingSpec.java +++ b/processing/src/main/java/io/druid/query/groupby/having/LessThanHavingSpec.java @@ -21,6 +21,9 @@ package io.druid.query.groupby.having; import com.fasterxml.jackson.annotation.JsonProperty; import io.druid.data.input.Row; +import io.druid.query.aggregation.AggregatorFactory; + +import java.util.Map; /** * The "<" operator in a "having" clause. This is similar to SQL's "having aggregation < value", @@ -31,6 +34,8 @@ public class LessThanHavingSpec extends BaseHavingSpec private final String aggregationName; private final Number value; + private volatile Map aggregators; + public LessThanHavingSpec( @JsonProperty("aggregation") String aggName, @JsonProperty("value") Number value @@ -52,10 +57,16 @@ public class LessThanHavingSpec extends BaseHavingSpec return value; } + @Override + public void setAggregators(Map aggregators) + { + this.aggregators = aggregators; + } + @Override public boolean eval(Row row) { - return HavingSpecMetricComparator.compare(row, aggregationName, value) < 0; + return HavingSpecMetricComparator.compare(row, aggregationName, value, aggregators) < 0; } /** diff --git a/processing/src/main/java/io/druid/query/groupby/having/NotHavingSpec.java b/processing/src/main/java/io/druid/query/groupby/having/NotHavingSpec.java index b049fd668ce..c36a35b2881 100644 --- a/processing/src/main/java/io/druid/query/groupby/having/NotHavingSpec.java +++ b/processing/src/main/java/io/druid/query/groupby/having/NotHavingSpec.java @@ -22,6 +22,7 @@ package io.druid.query.groupby.having; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; import io.druid.data.input.Row; +import io.druid.query.aggregation.AggregatorFactory; import io.druid.segment.column.ValueType; import java.util.Map; @@ -51,6 +52,12 @@ public class NotHavingSpec extends BaseHavingSpec havingSpec.setRowSignature(rowSignature); } + @Override + public void setAggregators(Map aggregators) + { + havingSpec.setAggregators(aggregators); + } + @Override public boolean eval(Row row) { diff --git a/processing/src/main/java/io/druid/query/groupby/having/OrHavingSpec.java b/processing/src/main/java/io/druid/query/groupby/having/OrHavingSpec.java index 67eb4edd3da..66ebbf8dedc 100644 --- a/processing/src/main/java/io/druid/query/groupby/having/OrHavingSpec.java +++ b/processing/src/main/java/io/druid/query/groupby/having/OrHavingSpec.java @@ -23,6 +23,7 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.collect.ImmutableList; import io.druid.data.input.Row; +import io.druid.query.aggregation.AggregatorFactory; import io.druid.segment.column.ValueType; import java.util.List; @@ -55,6 +56,14 @@ public class OrHavingSpec extends BaseHavingSpec } } + @Override + public void setAggregators(Map aggregators) + { + for (HavingSpec havingSpec : havingSpecs) { + havingSpec.setAggregators(aggregators); + } + } + @Override public boolean eval(Row row) { diff --git a/processing/src/test/java/io/druid/query/groupby/GroupByQueryRunnerTest.java b/processing/src/test/java/io/druid/query/groupby/GroupByQueryRunnerTest.java index dcb64932e9f..fc9054889fc 100644 --- a/processing/src/test/java/io/druid/query/groupby/GroupByQueryRunnerTest.java +++ b/processing/src/test/java/io/druid/query/groupby/GroupByQueryRunnerTest.java @@ -3318,10 +3318,6 @@ public class GroupByQueryRunnerTest ) ); - // havingSpec equalTo/greaterThan/lessThan do not work on complex aggregators, even if they could be finalized. - // See also: https://github.com/druid-io/druid/issues/2507 - expectedException.expect(ISE.class); - expectedException.expectMessage("Unknown type of metric value"); Iterable results = GroupByQueryRunnerTestHelper.runQuery(factory, runner, query); TestHelper.assertExpectedObjects(expectedResults, results, "order-limit"); }