Finalize fields in postaggs (#3957)

* initial commits for finalizeFieldAccess #2433

* fix some bugs to run a query

* change name of method Queries.verifyAggregations to Queries.prepareAggregations

* add Uts

* fix Ut failures

* rebased to master

* address comments and add a Ut for arithmetic post aggregators

* rebased to the master

* address the comment of injection within arithmetic post aggregator

* address comments and introduce decorate() in the PostAggregator interface.

* Address comments. 1. Implements getComparator in FinalizingFieldAccessPostAggregator and add Uts for it 2. Some minor changes like renaming a method name.

* Fix a code style mismatch.

* Rebased to the master
This commit is contained in:
Gian Merlino 2017-02-21 16:32:14 -08:00 committed by GitHub
parent a47206eaf8
commit 985203b634
30 changed files with 639 additions and 30 deletions

View File

@ -25,6 +25,7 @@ import com.google.common.base.Preconditions;
import com.google.common.collect.Ordering;
import com.google.common.collect.Sets;
import com.google.common.primitives.Doubles;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.PostAggregator;
import io.druid.query.aggregation.post.PostAggregatorIds;
import io.druid.query.cache.CacheKeyBuilder;
@ -97,6 +98,12 @@ public class SketchEstimatePostAggregator implements PostAggregator
return name;
}
@Override
public PostAggregator decorate(Map<String, AggregatorFactory> aggregators)
{
return this;
}
@JsonProperty
public PostAggregator getField()
{

View File

@ -24,6 +24,7 @@ import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.Sets;
import com.yahoo.sketches.Util;
import io.druid.java.util.common.IAE;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.PostAggregator;
import io.druid.query.aggregation.post.PostAggregatorIds;
import io.druid.query.cache.CacheKeyBuilder;
@ -93,6 +94,12 @@ public class SketchSetPostAggregator implements PostAggregator
return name;
}
@Override
public PostAggregator decorate(Map<String, AggregatorFactory> aggregators)
{
return this;
}
@JsonProperty
public String getFunc()
{

View File

@ -24,6 +24,8 @@ import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeName;
import com.google.common.collect.Sets;
import io.druid.java.util.common.IAE;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.PostAggregator;
import io.druid.query.aggregation.post.PostAggregatorIds;
import io.druid.query.cache.CacheKeyBuilder;
@ -65,6 +67,12 @@ public class BucketsPostAggregator extends ApproximateHistogramPostAggregator
return ah.toHistogram(bucketSize, offset);
}
@Override
public PostAggregator decorate(Map<String, AggregatorFactory> aggregators)
{
return this;
}
@JsonProperty
public float getBucketSize()
{

View File

@ -23,6 +23,8 @@ import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeName;
import com.google.common.collect.Sets;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.PostAggregator;
import io.druid.query.aggregation.post.PostAggregatorIds;
import io.druid.query.cache.CacheKeyBuilder;
@ -61,6 +63,12 @@ public class CustomBucketsPostAggregator extends ApproximateHistogramPostAggrega
return ah.toHistogram(breaks);
}
@Override
public PostAggregator decorate(Map<String, AggregatorFactory> aggregators)
{
return this;
}
@JsonProperty
public float[] getBreaks()
{

View File

@ -24,6 +24,8 @@ import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeName;
import com.google.common.collect.Sets;
import io.druid.java.util.common.IAE;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.PostAggregator;
import io.druid.query.aggregation.post.PostAggregatorIds;
import io.druid.query.cache.CacheKeyBuilder;
@ -62,6 +64,12 @@ public class EqualBucketsPostAggregator extends ApproximateHistogramPostAggregat
return ah.toHistogram(numBuckets);
}
@Override
public PostAggregator decorate(Map<String, AggregatorFactory> aggregators)
{
return this;
}
@JsonProperty
public int getNumBuckets()
{

View File

@ -23,6 +23,8 @@ import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeName;
import com.google.common.collect.Sets;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.PostAggregator;
import io.druid.query.aggregation.post.PostAggregatorIds;
import io.druid.query.cache.CacheKeyBuilder;
@ -70,6 +72,12 @@ public class MaxPostAggregator extends ApproximateHistogramPostAggregator
return ah.getMax();
}
@Override
public PostAggregator decorate(Map<String, AggregatorFactory> aggregators)
{
return this;
}
@Override
public String toString()
{

View File

@ -23,6 +23,8 @@ import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeName;
import com.google.common.collect.Sets;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.PostAggregator;
import io.druid.query.aggregation.post.PostAggregatorIds;
import io.druid.query.cache.CacheKeyBuilder;
@ -70,6 +72,12 @@ public class MinPostAggregator extends ApproximateHistogramPostAggregator
return ah.getMin();
}
@Override
public PostAggregator decorate(Map<String, AggregatorFactory> aggregators)
{
return this;
}
@Override
public String toString()
{

View File

@ -24,6 +24,8 @@ import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeName;
import com.google.common.collect.Sets;
import io.druid.java.util.common.IAE;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.PostAggregator;
import io.druid.query.aggregation.post.PostAggregatorIds;
import io.druid.query.cache.CacheKeyBuilder;
@ -81,6 +83,12 @@ public class QuantilePostAggregator extends ApproximateHistogramPostAggregator
return ah.getQuantiles(new float[]{probability})[0];
}
@Override
public PostAggregator decorate(Map<String, AggregatorFactory> aggregators)
{
return this;
}
@JsonProperty
public float getProbability()
{

View File

@ -24,6 +24,8 @@ import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeName;
import com.google.common.collect.Sets;
import io.druid.java.util.common.IAE;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.PostAggregator;
import io.druid.query.aggregation.post.PostAggregatorIds;
import io.druid.query.cache.CacheKeyBuilder;
@ -74,6 +76,12 @@ public class QuantilesPostAggregator extends ApproximateHistogramPostAggregator
return new Quantiles(probabilities, ah.getQuantiles(probabilities), ah.getMin(), ah.getMax());
}
@Override
public PostAggregator decorate(Map<String, AggregatorFactory> aggregators)
{
return this;
}
@JsonProperty
public float[] getProbabilities()
{

View File

@ -24,6 +24,7 @@ import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeName;
import com.google.common.base.Preconditions;
import com.google.common.collect.Sets;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.PostAggregator;
import io.druid.query.aggregation.post.ArithmeticPostAggregator;
import io.druid.query.aggregation.post.PostAggregatorIds;
@ -82,6 +83,12 @@ public class StandardDeviationPostAggregator implements PostAggregator
return name;
}
@Override
public PostAggregator decorate(Map<String, AggregatorFactory> aggregators)
{
return this;
}
@JsonProperty("fieldName")
public String getFieldName()
{

View File

@ -50,6 +50,7 @@ import io.druid.query.aggregation.post.DoubleGreatestPostAggregator;
import io.druid.query.aggregation.post.DoubleLeastPostAggregator;
import io.druid.query.aggregation.post.ExpressionPostAggregator;
import io.druid.query.aggregation.post.FieldAccessPostAggregator;
import io.druid.query.aggregation.post.FinalizingFieldAccessPostAggregator;
import io.druid.query.aggregation.post.JavaScriptPostAggregator;
import io.druid.query.aggregation.post.LongGreatestPostAggregator;
import io.druid.query.aggregation.post.LongLeastPostAggregator;
@ -103,6 +104,7 @@ public class AggregatorsModule extends SimpleModule
@JsonSubTypes.Type(name = "expression", value = ExpressionPostAggregator.class),
@JsonSubTypes.Type(name = "arithmetic", value = ArithmeticPostAggregator.class),
@JsonSubTypes.Type(name = "fieldAccess", value = FieldAccessPostAggregator.class),
@JsonSubTypes.Type(name = "finalizingFieldAccess", value = FinalizingFieldAccessPostAggregator.class),
@JsonSubTypes.Type(name = "constant", value = ConstantPostAggregator.class),
@JsonSubTypes.Type(name = "javascript", value = JavaScriptPostAggregator.class),
@JsonSubTypes.Type(name = "hyperUniqueCardinality", value = HyperUniqueFinalizingPostAggregator.class),

View File

@ -20,33 +20,49 @@
package io.druid.query;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.PostAggregator;
import java.util.List;
import java.util.Map;
import java.util.Set;
/**
*/
public class Queries
{
public static void verifyAggregations(
public static List<PostAggregator> decoratePostAggregators(List<PostAggregator> postAggs,
Map<String, AggregatorFactory> aggFactories)
{
List<PostAggregator> decorated = Lists.newArrayListWithExpectedSize(postAggs.size());
for (PostAggregator aggregator : postAggs) {
decorated.add(aggregator.decorate(aggFactories));
}
return decorated;
}
public static List<PostAggregator> prepareAggregations(
List<AggregatorFactory> aggFactories,
List<PostAggregator> postAggs
)
{
Preconditions.checkNotNull(aggFactories, "aggregations cannot be null");
final Set<String> aggNames = Sets.newHashSet();
final Map<String, AggregatorFactory> aggsFactoryMap = Maps.newHashMap();
for (AggregatorFactory aggFactory : aggFactories) {
Preconditions.checkArgument(aggNames.add(aggFactory.getName()), "[%s] already defined", aggFactory.getName());
Preconditions.checkArgument(!aggsFactoryMap.containsKey(aggFactory.getName()),
"[%s] already defined", aggFactory.getName());
aggsFactoryMap.put(aggFactory.getName(), aggFactory);
}
if (postAggs != null && !postAggs.isEmpty()) {
final Set<String> combinedAggNames = Sets.newHashSet(aggNames);
final Set<String> combinedAggNames = Sets.newHashSet(aggsFactoryMap.keySet());
for (PostAggregator postAgg : postAggs) {
List<PostAggregator> decorated = Lists.newArrayListWithExpectedSize(postAggs.size());
for (final PostAggregator postAgg : postAggs) {
final Set<String> dependencies = postAgg.getDependentFields();
final Set<String> missing = Sets.difference(dependencies, combinedAggNames);
@ -54,8 +70,14 @@ public class Queries
missing.isEmpty(),
"Missing fields [%s] for postAggregator [%s]", missing, postAgg.getName()
);
Preconditions.checkArgument(combinedAggNames.add(postAgg.getName()), "[%s] already defined", postAgg.getName());
Preconditions.checkArgument(combinedAggNames.add(postAgg.getName()),
"[%s] already defined", postAgg.getName());
decorated.add(postAgg.decorate(aggsFactoryMap));
}
return decorated;
}
return postAggs;
}
}

View File

@ -37,4 +37,13 @@ public interface PostAggregator extends Cacheable
Object compute(Map<String, Object> combinedAggregators);
String getName();
/**
* Returns a richer post aggregator which are built from the given aggregators with their names and some accessible
* environmental variables such as ones in the object scope.
*
* @param aggregators A map of aggregator factories with their names.
*
*/
PostAggregator decorate(Map<String, AggregatorFactory> aggregators);
}

View File

@ -24,6 +24,7 @@ import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import com.google.common.collect.Ordering;
import com.google.common.collect.Sets;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.PostAggregator;
import io.druid.query.aggregation.post.PostAggregatorIds;
import io.druid.query.cache.CacheKeyBuilder;
@ -86,6 +87,12 @@ public class HyperUniqueFinalizingPostAggregator implements PostAggregator
return name;
}
@Override
public HyperUniqueFinalizingPostAggregator decorate(Map<String, AggregatorFactory> aggregators)
{
return this;
}
@JsonProperty("fieldName")
public String getFieldName()
{

View File

@ -25,6 +25,8 @@ import com.google.common.base.Preconditions;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import io.druid.java.util.common.IAE;
import io.druid.query.Queries;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.PostAggregator;
import io.druid.query.cache.CacheKeyBuilder;
@ -53,6 +55,7 @@ public class ArithmeticPostAggregator implements PostAggregator
private final Ops op;
private final Comparator comparator;
private final String ordering;
private Map<String, AggregatorFactory> aggFactoryMap;
public ArithmeticPostAggregator(
String name,
@ -124,6 +127,12 @@ public class ArithmeticPostAggregator implements PostAggregator
return name;
}
@Override
public ArithmeticPostAggregator decorate(Map<String, AggregatorFactory> aggregators)
{
return new ArithmeticPostAggregator(name, fnName, Queries.decoratePostAggregators(fields, aggregators), ordering);
}
@Override
public byte[] getCacheKey()
{

View File

@ -23,6 +23,7 @@ import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import com.google.common.collect.Sets;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.PostAggregator;
import io.druid.query.cache.CacheKeyBuilder;
@ -80,6 +81,12 @@ public class ConstantPostAggregator implements PostAggregator
return name;
}
@Override
public ConstantPostAggregator decorate(Map<String, AggregatorFactory> aggregators)
{
return this;
}
@JsonProperty("value")
public Number getConstantValue()
{

View File

@ -23,6 +23,8 @@ import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import com.google.common.collect.Sets;
import io.druid.query.Queries;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.PostAggregator;
import io.druid.query.cache.CacheKeyBuilder;
@ -99,6 +101,12 @@ public class DoubleGreatestPostAggregator implements PostAggregator
return name;
}
@Override
public DoubleGreatestPostAggregator decorate(Map<String, AggregatorFactory> aggregators)
{
return new DoubleGreatestPostAggregator(name, Queries.decoratePostAggregators(fields, aggregators));
}
@JsonProperty
public List<PostAggregator> getFields()
{

View File

@ -23,6 +23,8 @@ import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import com.google.common.collect.Sets;
import io.druid.query.Queries;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.PostAggregator;
import io.druid.query.cache.CacheKeyBuilder;
@ -99,6 +101,12 @@ public class DoubleLeastPostAggregator implements PostAggregator
return name;
}
@Override
public DoubleLeastPostAggregator decorate(Map<String, AggregatorFactory> aggregators)
{
return new DoubleLeastPostAggregator(name, Queries.decoratePostAggregators(fields, aggregators));
}
@JsonProperty
public List<PostAggregator> getFields()
{

View File

@ -25,6 +25,7 @@ import com.google.common.base.Preconditions;
import com.google.common.collect.Sets;
import io.druid.math.expr.Expr;
import io.druid.math.expr.Parser;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.PostAggregator;
import io.druid.query.cache.CacheKeyBuilder;
@ -106,6 +107,12 @@ public class ExpressionPostAggregator implements PostAggregator
return name;
}
@Override
public ExpressionPostAggregator decorate(Map<String, AggregatorFactory> aggregators)
{
return this;
}
@JsonProperty("expression")
public String getExpression()
{

View File

@ -23,6 +23,7 @@ import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import com.google.common.collect.Sets;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.PostAggregator;
import io.druid.query.cache.CacheKeyBuilder;
@ -73,6 +74,12 @@ public class FieldAccessPostAggregator implements PostAggregator
return name;
}
@Override
public FieldAccessPostAggregator decorate(Map<String, AggregatorFactory> aggregators)
{
return this;
}
@Override
public byte[] getCacheKey()
{

View File

@ -0,0 +1,167 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.query.aggregation.post;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Ordering;
import com.google.common.collect.Sets;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.PostAggregator;
import io.druid.query.cache.CacheKeyBuilder;
import java.util.Comparator;
import java.util.Map;
import java.util.Set;
public class FinalizingFieldAccessPostAggregator implements PostAggregator
{
private final String name;
private final String fieldName;
@JsonCreator
public FinalizingFieldAccessPostAggregator(
@JsonProperty("name") String name,
@JsonProperty("fieldName") String fieldName
)
{
this.name = name;
this.fieldName = fieldName;
}
@Override
public Set<String> getDependentFields()
{
return Sets.newHashSet(fieldName);
}
@Override
public Comparator getComparator()
{
throw new UnsupportedOperationException();
}
@Override
public Object compute(Map<String, Object> combinedAggregators)
{
throw new UnsupportedOperationException("No decorated");
}
@Override
@JsonProperty
public String getName()
{
return name;
}
@Override
public FinalizingFieldAccessPostAggregator decorate(final Map<String, AggregatorFactory> aggregators)
{
return new FinalizingFieldAccessPostAggregator(name, fieldName)
{
@Override
public Comparator getComparator()
{
if (aggregators != null && aggregators.containsKey(fieldName)) {
return aggregators.get(fieldName).getComparator();
} else {
return Ordering.natural().nullsFirst();
}
}
@Override
public Object compute(Map<String, Object> combinedAggregators)
{
if (aggregators != null && aggregators.containsKey(fieldName)) {
return aggregators.get(fieldName).finalizeComputation(
combinedAggregators.get(fieldName)
);
} else {
return combinedAggregators.get(fieldName);
}
}
};
}
@JsonProperty
public String getFieldName()
{
return fieldName;
}
@Override
public byte[] getCacheKey()
{
return new CacheKeyBuilder(PostAggregatorIds.FINALIZING_FIELD_ACCESS)
.appendString(fieldName)
.build();
}
@Override
public String toString()
{
return "FinalizingFieldAccessPostAggregator{" +
"name='" + name + '\'' +
", fieldName='" + fieldName + '\'' +
'}';
}
@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
FinalizingFieldAccessPostAggregator that = (FinalizingFieldAccessPostAggregator) o;
if (fieldName != null ? !fieldName.equals(that.fieldName) : that.fieldName != null) {
return false;
}
if (name != null ? !name.equals(that.name) : that.name != null) {
return false;
}
return true;
}
@Override
public int hashCode()
{
int result = name != null ? name.hashCode() : 0;
result = 31 * result + (fieldName != null ? fieldName.hashCode() : 0);
return result;
}
@VisibleForTesting
static FinalizingFieldAccessPostAggregator buildDecorated(
String name,
String fieldName,
Map<String, AggregatorFactory> aggregators
)
{
FinalizingFieldAccessPostAggregator ret = new FinalizingFieldAccessPostAggregator(name, fieldName);
return ret.decorate(aggregators);
}
}

View File

@ -25,6 +25,7 @@ import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import com.google.common.collect.Sets;
import io.druid.js.JavaScriptConfig;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.PostAggregator;
import io.druid.query.cache.CacheKeyBuilder;
import org.mozilla.javascript.Context;
@ -144,6 +145,12 @@ public class JavaScriptPostAggregator implements PostAggregator
return name;
}
@Override
public JavaScriptPostAggregator decorate(Map<String, AggregatorFactory> aggregators)
{
return this;
}
@JsonProperty
public List<String> getFieldNames()
{

View File

@ -24,6 +24,8 @@ import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import com.google.common.collect.Sets;
import com.google.common.primitives.Longs;
import io.druid.query.Queries;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.PostAggregator;
import io.druid.query.cache.CacheKeyBuilder;
@ -100,6 +102,12 @@ public class LongGreatestPostAggregator implements PostAggregator
return name;
}
@Override
public LongGreatestPostAggregator decorate(Map<String, AggregatorFactory> aggregators)
{
return new LongGreatestPostAggregator(name, Queries.decoratePostAggregators(fields, aggregators));
}
@JsonProperty
public List<PostAggregator> getFields()
{

View File

@ -24,6 +24,8 @@ import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import com.google.common.collect.Sets;
import com.google.common.primitives.Longs;
import io.druid.query.Queries;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.PostAggregator;
import io.druid.query.cache.CacheKeyBuilder;
@ -100,6 +102,12 @@ public class LongLeastPostAggregator implements PostAggregator
return name;
}
@Override
public LongLeastPostAggregator decorate(Map<String, AggregatorFactory> aggregators)
{
return new LongLeastPostAggregator(name, Queries.decoratePostAggregators(fields, aggregators));
}
@JsonProperty
public List<PostAggregator> getFields()
{

View File

@ -41,4 +41,5 @@ public class PostAggregatorIds
public static final byte DATA_SKETCHES_SKETCH_ESTIMATE = 17;
public static final byte DATA_SKETCHES_SKETCH_SET = 18;
public static final byte VARIANCE_STANDARD_DEVIATION = 19;
public static final byte FINALIZING_FIELD_ACCESS = 20;
}

View File

@ -120,12 +120,15 @@ public class GroupByQuery extends BaseQuery<Row>
Preconditions.checkArgument(spec != null, "dimensions has null DimensionSpec");
}
this.aggregatorSpecs = aggregatorSpecs == null ? ImmutableList.<AggregatorFactory>of() : aggregatorSpecs;
this.postAggregatorSpecs = postAggregatorSpecs == null ? ImmutableList.<PostAggregator>of() : postAggregatorSpecs;
this.postAggregatorSpecs = Queries.prepareAggregations(
this.aggregatorSpecs,
postAggregatorSpecs == null ? ImmutableList.<PostAggregator>of() : postAggregatorSpecs
);
this.havingSpec = havingSpec;
this.limitSpec = (limitSpec == null) ? new NoopLimitSpec() : limitSpec;
Preconditions.checkNotNull(this.granularity, "Must specify a granularity");
Queries.verifyAggregations(this.aggregatorSpecs, this.postAggregatorSpecs);
// Verify no duplicate names between dimensions, aggregators, and postAggregators.
// They will all end up in the same namespace in the returned Rows and we can't have them clobbering each other.

View File

@ -63,9 +63,11 @@ public class TimeseriesQuery extends BaseQuery<Result<TimeseriesResultValue>>
this.dimFilter = dimFilter;
this.granularity = granularity;
this.aggregatorSpecs = aggregatorSpecs == null ? ImmutableList.<AggregatorFactory>of() : aggregatorSpecs;
this.postAggregatorSpecs = postAggregatorSpecs == null ? ImmutableList.<PostAggregator>of() : postAggregatorSpecs;
Queries.verifyAggregations(this.aggregatorSpecs, this.postAggregatorSpecs);
this.postAggregatorSpecs = Queries.prepareAggregations(this.aggregatorSpecs,
postAggregatorSpecs == null
? ImmutableList.<PostAggregator>of()
: postAggregatorSpecs
);
}
@Override
@ -176,15 +178,15 @@ public class TimeseriesQuery extends BaseQuery<Result<TimeseriesResultValue>>
public String toString()
{
return "TimeseriesQuery{" +
"dataSource='" + getDataSource() + '\'' +
", querySegmentSpec=" + getQuerySegmentSpec() +
", descending=" + isDescending() +
", dimFilter=" + dimFilter +
", granularity='" + granularity + '\'' +
", aggregatorSpecs=" + aggregatorSpecs +
", postAggregatorSpecs=" + postAggregatorSpecs +
", context=" + getContext() +
'}';
"dataSource='" + getDataSource() + '\'' +
", querySegmentSpec=" + getQuerySegmentSpec() +
", descending=" + isDescending() +
", dimFilter=" + dimFilter +
", granularity='" + granularity + '\'' +
", aggregatorSpecs=" + aggregatorSpecs +
", postAggregatorSpecs=" + postAggregatorSpecs +
", context=" + getContext() +
'}';
}
@Override
@ -211,7 +213,9 @@ public class TimeseriesQuery extends BaseQuery<Result<TimeseriesResultValue>>
if (granularity != null ? !granularity.equals(that.granularity) : that.granularity != null) {
return false;
}
if (postAggregatorSpecs != null ? !postAggregatorSpecs.equals(that.postAggregatorSpecs) : that.postAggregatorSpecs != null) {
if (postAggregatorSpecs != null
? !postAggregatorSpecs.equals(that.postAggregatorSpecs)
: that.postAggregatorSpecs != null) {
return false;
}

View File

@ -74,15 +74,17 @@ public class TopNQuery extends BaseQuery<Result<TopNResultValue>>
this.dimFilter = dimFilter;
this.granularity = granularity;
this.aggregatorSpecs = aggregatorSpecs == null ? ImmutableList.<AggregatorFactory>of() : aggregatorSpecs;
this.postAggregatorSpecs = postAggregatorSpecs == null ? ImmutableList.<PostAggregator>of() : postAggregatorSpecs;
this.postAggregatorSpecs = Queries.prepareAggregations(this.aggregatorSpecs,
postAggregatorSpecs == null
? ImmutableList.<PostAggregator>of()
: postAggregatorSpecs
);
Preconditions.checkNotNull(dimensionSpec, "dimensionSpec can't be null");
Preconditions.checkNotNull(topNMetricSpec, "must specify a metric");
Preconditions.checkArgument(threshold != 0, "Threshold cannot be equal to 0.");
topNMetricSpec.verifyPreconditions(this.aggregatorSpecs, this.postAggregatorSpecs);
Queries.verifyAggregations(this.aggregatorSpecs, this.postAggregatorSpecs);
}
@Override
@ -316,7 +318,9 @@ public class TopNQuery extends BaseQuery<Result<TopNResultValue>>
if (threshold != topNQuery.threshold) {
return false;
}
if (aggregatorSpecs != null ? !aggregatorSpecs.equals(topNQuery.aggregatorSpecs) : topNQuery.aggregatorSpecs != null) {
if (aggregatorSpecs != null
? !aggregatorSpecs.equals(topNQuery.aggregatorSpecs)
: topNQuery.aggregatorSpecs != null) {
return false;
}
if (dimFilter != null ? !dimFilter.equals(topNQuery.dimFilter) : topNQuery.dimFilter != null) {
@ -328,7 +332,9 @@ public class TopNQuery extends BaseQuery<Result<TopNResultValue>>
if (granularity != null ? !granularity.equals(topNQuery.granularity) : topNQuery.granularity != null) {
return false;
}
if (postAggregatorSpecs != null ? !postAggregatorSpecs.equals(topNQuery.postAggregatorSpecs) : topNQuery.postAggregatorSpecs != null) {
if (postAggregatorSpecs != null
? !postAggregatorSpecs.equals(topNQuery.postAggregatorSpecs)
: topNQuery.postAggregatorSpecs != null) {
return false;
}
if (topNMetricSpec != null ? !topNMetricSpec.equals(topNQuery.topNMetricSpec) : topNQuery.topNMetricSpec != null) {

View File

@ -59,7 +59,7 @@ public class QueriesTest
boolean exceptionOccured = false;
try {
Queries.verifyAggregations(aggFactories, postAggs);
Queries.prepareAggregations(aggFactories, postAggs);
}
catch (IllegalArgumentException e) {
exceptionOccured = true;
@ -91,7 +91,7 @@ public class QueriesTest
boolean exceptionOccured = false;
try {
Queries.verifyAggregations(aggFactories, postAggs);
Queries.prepareAggregations(aggFactories, postAggs);
}
catch (IllegalArgumentException e) {
exceptionOccured = true;
@ -145,7 +145,7 @@ public class QueriesTest
boolean exceptionOccured = false;
try {
Queries.verifyAggregations(aggFactories, postAggs);
Queries.prepareAggregations(aggFactories, postAggs);
}
catch (IllegalArgumentException e) {
exceptionOccured = true;
@ -199,7 +199,7 @@ public class QueriesTest
boolean exceptionOccured = false;
try {
Queries.verifyAggregations(aggFactories, postAggs);
Queries.prepareAggregations(aggFactories, postAggs);
}
catch (IllegalArgumentException e) {
exceptionOccured = true;

View File

@ -0,0 +1,242 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.query.aggregation.post;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Ordering;
import io.druid.data.input.MapBasedRow;
import io.druid.granularity.QueryGranularities;
import io.druid.jackson.AggregatorsModule;
import io.druid.java.util.common.guava.Sequence;
import io.druid.java.util.common.guava.Sequences;
import io.druid.query.aggregation.AggregationTestHelper;
import io.druid.query.aggregation.Aggregator;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.CountAggregator;
import io.druid.query.aggregation.PostAggregator;
import io.druid.query.groupby.GroupByQueryRunnerTest;
import org.easymock.EasyMock;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import java.io.File;
import java.util.Collections;
import java.util.List;
import java.util.Map;
public class FinalizingFieldAccessPostAggregatorTest
{
@Rule
public final TemporaryFolder tempFoler = new TemporaryFolder();
@Test(expected = UnsupportedOperationException.class)
public void testComputeWithoutFinalizing()
{
String aggName = "rows";
Aggregator agg = new CountAggregator();
agg.aggregate();
agg.aggregate();
agg.aggregate();
Map<String, Object> metricValues = Maps.newHashMap();
metricValues.put(aggName, agg.get());
FinalizingFieldAccessPostAggregator postAgg = new FinalizingFieldAccessPostAggregator("final_rows", aggName);
Assert.assertEquals(new Long(3L), postAgg.compute(metricValues));
}
@Test
public void testComputedWithFinalizing()
{
String aggName = "biily";
AggregatorFactory aggFactory = EasyMock.createMock(AggregatorFactory.class);
EasyMock.expect(aggFactory.finalizeComputation("test"))
.andReturn(new Long(3L))
.times(1);
EasyMock.replay(aggFactory);
FinalizingFieldAccessPostAggregator postAgg = FinalizingFieldAccessPostAggregator.buildDecorated(
"final_billy", aggName, ImmutableMap.of(aggName, aggFactory)
);
Map<String, Object> metricValues = Maps.newHashMap();
metricValues.put(aggName, "test");
Assert.assertEquals(new Long(3L), postAgg.compute(metricValues));
EasyMock.verify(aggFactory);
}
@Test
public void testComputedInArithmeticPostAggregator()
{
String aggName = "billy";
AggregatorFactory aggFactory = EasyMock.createMock(AggregatorFactory.class);
EasyMock.expect(aggFactory.finalizeComputation("test"))
.andReturn(new Long(3L))
.times(1);
EasyMock.replay(aggFactory);
FinalizingFieldAccessPostAggregator postAgg = FinalizingFieldAccessPostAggregator.buildDecorated(
"final_billy", aggName, ImmutableMap.of(aggName, aggFactory)
);
Map<String, Object> metricValues = Maps.newHashMap();
metricValues.put(aggName, "test");
List<PostAggregator> postAggsList = Lists.newArrayList(
new ConstantPostAggregator("roku", 6), postAgg);
ArithmeticPostAggregator arithmeticPostAggregator = new ArithmeticPostAggregator("add", "+", postAggsList);
Assert.assertEquals(new Double(9.0f), arithmeticPostAggregator.compute(metricValues));
EasyMock.verify();
}
@Test
public void testComparatorsWithFinalizing() throws Exception
{
String aggName = "billy";
AggregatorFactory aggFactory = EasyMock.createMock(AggregatorFactory.class);
EasyMock.expect(aggFactory.finalizeComputation("test_val1"))
.andReturn(new Long(10L))
.times(1);
EasyMock.expect(aggFactory.finalizeComputation("test_val2"))
.andReturn(new Long(21))
.times(1);
EasyMock.expect(aggFactory.finalizeComputation("test_val3"))
.andReturn(new Long(3))
.times(1);
EasyMock.expect(aggFactory.finalizeComputation("test_val4"))
.andReturn(null)
.times(1);
EasyMock.expect(aggFactory.getComparator())
.andReturn(Ordering.natural().<Long>nullsLast())
.times(1);
EasyMock.replay(aggFactory);
FinalizingFieldAccessPostAggregator postAgg = FinalizingFieldAccessPostAggregator.buildDecorated(
"final_billy", aggName, ImmutableMap.of(aggName, aggFactory)
);
List<Object> computedValues = Lists.newArrayList();
computedValues.add(postAgg.compute(ImmutableMap.of(aggName, (Object)"test_val1")));
computedValues.add(postAgg.compute(ImmutableMap.of(aggName, (Object)"test_val2")));
computedValues.add(postAgg.compute(ImmutableMap.of(aggName, (Object)"test_val3")));
computedValues.add(postAgg.compute(ImmutableMap.of(aggName, (Object)"test_val4")));
Collections.sort(computedValues, postAgg.getComparator());
Assert.assertArrayEquals(new Object[]{3L, 10L, 21L, null}, computedValues.toArray(new Object[]{}));
EasyMock.verify();
}
@Test
public void testComparatorsWithFinalizingAndComparatorNull() throws Exception
{
String aggName = "billy";
AggregatorFactory aggFactory = EasyMock.createMock(AggregatorFactory.class);
EasyMock.expect(aggFactory.getComparator())
.andReturn(null)
.times(1);
EasyMock.replay(aggFactory);
FinalizingFieldAccessPostAggregator postAgg = FinalizingFieldAccessPostAggregator.buildDecorated(
"final_billy", "joe", ImmutableMap.of(aggName, aggFactory));
List<Object> computedValues = Lists.newArrayList();
Map<String, Object> forNull = Maps.newHashMap();
forNull.put("joe", (Object)null); // guava does not allow the value to be null.
computedValues.add(postAgg.compute(ImmutableMap.of("joe", (Object)"test_val1")));
computedValues.add(postAgg.compute(ImmutableMap.of("joe", (Object)"test_val2")));
computedValues.add(postAgg.compute(forNull));
computedValues.add(postAgg.compute(ImmutableMap.of("joe", (Object)"test_val4")));
Collections.sort(computedValues, postAgg.getComparator());
Assert.assertArrayEquals(new Object[]{null, "test_val1", "test_val2", "test_val4"}, computedValues.toArray(new Object[]{}));
EasyMock.verify();
}
@Test
public void testIngestAndQueryWithArithmeticPostAggregator() throws Exception
{
AggregationTestHelper helper = AggregationTestHelper.createGroupByQueryAggregationTestHelper(
Lists.newArrayList(new AggregatorsModule()),
GroupByQueryRunnerTest.testConfigs().get(0),
tempFoler
);
String metricSpec = "[{\"type\": \"hyperUnique\", \"name\": \"hll_market\", \"fieldName\": \"market\"},"
+ "{\"type\": \"hyperUnique\", \"name\": \"hll_quality\", \"fieldName\": \"quality\"}]";
String parseSpec = "{"
+ "\"type\" : \"string\","
+ "\"parseSpec\" : {"
+ " \"format\" : \"tsv\","
+ " \"timestampSpec\" : {"
+ " \"column\" : \"timestamp\","
+ " \"format\" : \"auto\""
+ "},"
+ " \"dimensionsSpec\" : {"
+ " \"dimensions\": [],"
+ " \"dimensionExclusions\" : [],"
+ " \"spatialDimensions\" : []"
+ " },"
+ " \"columns\": [\"timestamp\", \"market\", \"quality\", \"placement\", \"placementish\", \"index\"]"
+ " }"
+ "}";
String query = "{"
+ "\"queryType\": \"groupBy\","
+ "\"dataSource\": \"test_datasource\","
+ "\"granularity\": \"ALL\","
+ "\"dimensions\": [],"
+ "\"aggregations\": ["
+ " { \"type\": \"hyperUnique\", \"name\": \"hll_market\", \"fieldName\": \"hll_market\" },"
+ " { \"type\": \"hyperUnique\", \"name\": \"hll_quality\", \"fieldName\": \"hll_quality\" }"
+ "],"
+ "\"postAggregations\": ["
+ " { \"type\": \"arithmetic\", \"name\": \"uniq_add\", \"fn\": \"+\", \"fields\":["
+ " { \"type\": \"finalizingFieldAccess\", \"name\": \"uniq_market\", \"fieldName\": \"hll_market\" },"
+ " { \"type\": \"finalizingFieldAccess\", \"name\": \"uniq_quality\", \"fieldName\": \"hll_quality\" }]"
+ " }"
+ "],"
+ "\"intervals\": [ \"1970/2050\" ]"
+ "}";
Sequence seq = helper.createIndexAndRunQueryOnSegment(
new File(this.getClass().getClassLoader().getResource("druid.sample.tsv").getFile()),
parseSpec,
metricSpec,
0,
QueryGranularities.NONE,
50000,
query
);
MapBasedRow row = (MapBasedRow) Sequences.toList(seq, Lists.newArrayList()).get(0);
Assert.assertEquals(3.0, row.getFloatMetric("hll_market"), 0.1);
Assert.assertEquals(9.0, row.getFloatMetric("hll_quality"), 0.1);
Assert.assertEquals(12.0, row.getFloatMetric("uniq_add"), 0.1);
}
}