mirror of
https://github.com/apache/druid.git
synced 2025-02-26 04:55:24 +00:00
greater-than/less-than/equal-to havingSpec to call AggregatorFactory.finalizeComputation(..) (#4883)
* greater-than/less-than/equal-to havingSpec to call AggregatorFactory.finalizeComputation(..) * fix the unit test and expect having to work on hyperUnique agg * test fix * fix style errors
This commit is contained in:
parent
dc7cb117a1
commit
a7e802c9d4
@ -68,6 +68,7 @@ import javax.annotation.Nullable;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Comparator;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
@ -144,6 +145,7 @@ public class GroupByQuery extends BaseQuery<Row>
|
||||
postProcessingFn,
|
||||
(Sequence<Row> input) -> {
|
||||
havingSpec.setRowSignature(GroupByQueryHelper.rowSignatureFor(GroupByQuery.this));
|
||||
havingSpec.setAggregators(getAggregatorsMap(aggregatorSpecs));
|
||||
return Sequences.filter(input, havingSpec::eval);
|
||||
}
|
||||
);
|
||||
@ -696,6 +698,13 @@ public class GroupByQuery extends BaseQuery<Row>
|
||||
}
|
||||
}
|
||||
|
||||
private static Map<String, AggregatorFactory> getAggregatorsMap(List<AggregatorFactory> aggregatorSpecs)
|
||||
{
|
||||
Map<String, AggregatorFactory> map = new HashMap<>(aggregatorSpecs.size());
|
||||
aggregatorSpecs.stream().forEach(v -> map.put(v.getName(), v));
|
||||
return map;
|
||||
}
|
||||
|
||||
public static class Builder
|
||||
{
|
||||
private DataSource dataSource;
|
||||
|
@ -23,6 +23,7 @@ import com.fasterxml.jackson.annotation.JsonCreator;
|
||||
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||
import com.google.common.collect.ImmutableList;
|
||||
import io.druid.data.input.Row;
|
||||
import io.druid.query.aggregation.AggregatorFactory;
|
||||
import io.druid.segment.column.ValueType;
|
||||
|
||||
import java.util.List;
|
||||
@ -55,6 +56,14 @@ public class AndHavingSpec extends BaseHavingSpec
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setAggregators(Map<String, AggregatorFactory> aggregators)
|
||||
{
|
||||
for (HavingSpec havingSpec : havingSpecs) {
|
||||
havingSpec.setAggregators(aggregators);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean eval(Row row)
|
||||
{
|
||||
|
@ -19,6 +19,7 @@
|
||||
|
||||
package io.druid.query.groupby.having;
|
||||
|
||||
import io.druid.query.aggregation.AggregatorFactory;
|
||||
import io.druid.segment.column.ValueType;
|
||||
|
||||
import java.util.Map;
|
||||
@ -30,4 +31,10 @@ public abstract class BaseHavingSpec implements HavingSpec
|
||||
{
|
||||
// Do nothing.
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setAggregators(Map<String, AggregatorFactory> aggregators)
|
||||
{
|
||||
|
||||
}
|
||||
}
|
||||
|
@ -22,6 +22,9 @@ package io.druid.query.groupby.having;
|
||||
import com.fasterxml.jackson.annotation.JsonCreator;
|
||||
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||
import io.druid.data.input.Row;
|
||||
import io.druid.query.aggregation.AggregatorFactory;
|
||||
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* The "=" operator in a "having" clause. This is similar to SQL's "having aggregation = value",
|
||||
@ -32,6 +35,8 @@ public class EqualToHavingSpec extends BaseHavingSpec
|
||||
private final String aggregationName;
|
||||
private final Number value;
|
||||
|
||||
private volatile Map<String, AggregatorFactory> aggregators;
|
||||
|
||||
@JsonCreator
|
||||
public EqualToHavingSpec(
|
||||
@JsonProperty("aggregation") String aggName,
|
||||
@ -54,10 +59,16 @@ public class EqualToHavingSpec extends BaseHavingSpec
|
||||
return aggregationName;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setAggregators(Map<String, AggregatorFactory> aggregators)
|
||||
{
|
||||
this.aggregators = aggregators;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean eval(Row row)
|
||||
{
|
||||
return HavingSpecMetricComparator.compare(row, aggregationName, value) == 0;
|
||||
return HavingSpecMetricComparator.compare(row, aggregationName, value, aggregators) == 0;
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -22,6 +22,9 @@ package io.druid.query.groupby.having;
|
||||
import com.fasterxml.jackson.annotation.JsonCreator;
|
||||
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||
import io.druid.data.input.Row;
|
||||
import io.druid.query.aggregation.AggregatorFactory;
|
||||
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* The ">" operator in a "having" clause. This is similar to SQL's "having aggregation > value",
|
||||
@ -32,6 +35,8 @@ public class GreaterThanHavingSpec extends BaseHavingSpec
|
||||
private final String aggregationName;
|
||||
private final Number value;
|
||||
|
||||
private volatile Map<String, AggregatorFactory> aggregators;
|
||||
|
||||
@JsonCreator
|
||||
public GreaterThanHavingSpec(
|
||||
@JsonProperty("aggregation") String aggName,
|
||||
@ -54,10 +59,16 @@ public class GreaterThanHavingSpec extends BaseHavingSpec
|
||||
return value;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setAggregators(Map<String, AggregatorFactory> aggregators)
|
||||
{
|
||||
this.aggregators = aggregators;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean eval(Row row)
|
||||
{
|
||||
return HavingSpecMetricComparator.compare(row, aggregationName, value) > 0;
|
||||
return HavingSpecMetricComparator.compare(row, aggregationName, value, aggregators) > 0;
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -22,6 +22,7 @@ package io.druid.query.groupby.having;
|
||||
import com.fasterxml.jackson.annotation.JsonSubTypes;
|
||||
import com.fasterxml.jackson.annotation.JsonTypeInfo;
|
||||
import io.druid.data.input.Row;
|
||||
import io.druid.query.aggregation.AggregatorFactory;
|
||||
import io.druid.segment.column.ValueType;
|
||||
|
||||
import java.util.Map;
|
||||
@ -58,6 +59,8 @@ public interface HavingSpec
|
||||
*/
|
||||
void setRowSignature(Map<String, ValueType> rowSignature);
|
||||
|
||||
void setAggregators(Map<String, AggregatorFactory> aggregators);
|
||||
|
||||
/**
|
||||
* Evaluates if a given row satisfies the having spec.
|
||||
*
|
||||
|
@ -19,9 +19,15 @@
|
||||
|
||||
package io.druid.query.groupby.having;
|
||||
|
||||
import com.google.common.primitives.Doubles;
|
||||
import com.google.common.primitives.Floats;
|
||||
import com.google.common.primitives.Ints;
|
||||
import com.google.common.primitives.Longs;
|
||||
import io.druid.data.input.Row;
|
||||
import io.druid.query.aggregation.AggregatorFactory;
|
||||
import io.druid.java.util.common.ISE;
|
||||
|
||||
import java.util.Map;
|
||||
import java.util.regex.Pattern;
|
||||
|
||||
/**
|
||||
@ -30,16 +36,28 @@ class HavingSpecMetricComparator
|
||||
{
|
||||
static final Pattern LONG_PAT = Pattern.compile("[-|+]?\\d+");
|
||||
|
||||
static int compare(Row row, String aggregationName, Number value)
|
||||
static int compare(Row row, String aggregationName, Number value, Map<String, AggregatorFactory> aggregators)
|
||||
{
|
||||
|
||||
Object metricValueObj = row.getRaw(aggregationName);
|
||||
|
||||
if (metricValueObj != null) {
|
||||
if (aggregators != null && aggregators.containsKey(aggregationName)) {
|
||||
metricValueObj = aggregators.get(aggregationName).finalizeComputation(metricValueObj);
|
||||
}
|
||||
|
||||
if (metricValueObj instanceof Long) {
|
||||
return Long.compare((Long) metricValueObj, value.longValue());
|
||||
long l = ((Long) metricValueObj).longValue();
|
||||
return Longs.compare(l, value.longValue());
|
||||
} else if (metricValueObj instanceof Float) {
|
||||
return Float.compare((Float) metricValueObj, value.floatValue());
|
||||
float l = ((Float) metricValueObj).floatValue();
|
||||
return Floats.compare(l, value.floatValue());
|
||||
} else if (metricValueObj instanceof Double) {
|
||||
return Double.compare((Double) metricValueObj, value.doubleValue());
|
||||
double l = ((Double) metricValueObj).longValue();
|
||||
return Doubles.compare(l, value.doubleValue());
|
||||
} else if (metricValueObj instanceof Integer) {
|
||||
int l = ((Integer) metricValueObj).intValue();
|
||||
return Ints.compare(l, value.intValue());
|
||||
} else if (metricValueObj instanceof String) {
|
||||
String metricValueStr = (String) metricValueObj;
|
||||
if (LONG_PAT.matcher(metricValueStr).matches()) {
|
||||
|
@ -21,6 +21,9 @@ package io.druid.query.groupby.having;
|
||||
|
||||
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||
import io.druid.data.input.Row;
|
||||
import io.druid.query.aggregation.AggregatorFactory;
|
||||
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* The "<" operator in a "having" clause. This is similar to SQL's "having aggregation < value",
|
||||
@ -31,6 +34,8 @@ public class LessThanHavingSpec extends BaseHavingSpec
|
||||
private final String aggregationName;
|
||||
private final Number value;
|
||||
|
||||
private volatile Map<String, AggregatorFactory> aggregators;
|
||||
|
||||
public LessThanHavingSpec(
|
||||
@JsonProperty("aggregation") String aggName,
|
||||
@JsonProperty("value") Number value
|
||||
@ -52,10 +57,16 @@ public class LessThanHavingSpec extends BaseHavingSpec
|
||||
return value;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setAggregators(Map<String, AggregatorFactory> aggregators)
|
||||
{
|
||||
this.aggregators = aggregators;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean eval(Row row)
|
||||
{
|
||||
return HavingSpecMetricComparator.compare(row, aggregationName, value) < 0;
|
||||
return HavingSpecMetricComparator.compare(row, aggregationName, value, aggregators) < 0;
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -22,6 +22,7 @@ package io.druid.query.groupby.having;
|
||||
import com.fasterxml.jackson.annotation.JsonCreator;
|
||||
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||
import io.druid.data.input.Row;
|
||||
import io.druid.query.aggregation.AggregatorFactory;
|
||||
import io.druid.segment.column.ValueType;
|
||||
|
||||
import java.util.Map;
|
||||
@ -51,6 +52,12 @@ public class NotHavingSpec extends BaseHavingSpec
|
||||
havingSpec.setRowSignature(rowSignature);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setAggregators(Map<String, AggregatorFactory> aggregators)
|
||||
{
|
||||
havingSpec.setAggregators(aggregators);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean eval(Row row)
|
||||
{
|
||||
|
@ -23,6 +23,7 @@ import com.fasterxml.jackson.annotation.JsonCreator;
|
||||
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||
import com.google.common.collect.ImmutableList;
|
||||
import io.druid.data.input.Row;
|
||||
import io.druid.query.aggregation.AggregatorFactory;
|
||||
import io.druid.segment.column.ValueType;
|
||||
|
||||
import java.util.List;
|
||||
@ -55,6 +56,14 @@ public class OrHavingSpec extends BaseHavingSpec
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setAggregators(Map<String, AggregatorFactory> aggregators)
|
||||
{
|
||||
for (HavingSpec havingSpec : havingSpecs) {
|
||||
havingSpec.setAggregators(aggregators);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean eval(Row row)
|
||||
{
|
||||
|
@ -3318,10 +3318,6 @@ public class GroupByQueryRunnerTest
|
||||
)
|
||||
);
|
||||
|
||||
// havingSpec equalTo/greaterThan/lessThan do not work on complex aggregators, even if they could be finalized.
|
||||
// See also: https://github.com/druid-io/druid/issues/2507
|
||||
expectedException.expect(ISE.class);
|
||||
expectedException.expectMessage("Unknown type of metric value");
|
||||
Iterable<Row> results = GroupByQueryRunnerTestHelper.runQuery(factory, runner, query);
|
||||
TestHelper.assertExpectedObjects(expectedResults, results, "order-limit");
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user